Documentation
¶
Index ¶
- Constants
- type ExecutionOptionPreparer
- type JobDefinition
- func (jd *JobDefinition[T]) GetName() string
- func (jd *JobDefinition[T]) GetStep(stepName string) (StepDefinitionMeta, bool)
- func (jd *JobDefinition[T]) Seal()
- func (jd *JobDefinition[T]) Sealed() bool
- func (jd *JobDefinition[T]) Start(ctx context.Context, input T, jobOptions ...JobOptionPreparer) *JobInstance[T]
- func (jd *JobDefinition[T]) Visualize() (string, error)
- type JobDefinitionMeta
- type JobDefinitionWithResult
- type JobError
- type JobErrorCode
- type JobExecutionOptions
- type JobInstance
- func (ji *JobInstance[T]) GetJobDefinition() JobDefinitionMeta
- func (ji *JobInstance[T]) GetJobInstanceId() string
- func (ji *JobInstance[T]) GetStepInstance(stepName string) (StepInstanceMeta, bool)
- func (jd *JobInstance[T]) Visualize() (string, error)
- func (ji *JobInstance[T]) Wait(ctx context.Context) error
- type JobInstanceMeta
- type JobInstanceWithResult
- type JobOptionPreparer
- type MessageError
- type RetryPolicy
- type RetryReport
- type StepContextPolicy
- type StepDefinition
- func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, ...) (*StepDefinition[ST], error)
- func AddStepWithStaticFunc[JT, ST any](j *JobDefinition[JT], stepName string, stepFunc asynctask.AsyncFunc[ST], ...) (*StepDefinition[ST], error)
- func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], ...) (*StepDefinition[ST], error)
- func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], ...) (*StepDefinition[ST], error)
- func StepAfterBothWithStaticFunc[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], ...) (*StepDefinition[ST], error)
- func StepAfterWithStaticFunc[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], ...) (*StepDefinition[ST], error)
- type StepDefinitionMeta
- type StepErrorPolicy
- type StepExecutionData
- type StepExecutionOptions
- type StepInstance
- func (si *StepInstance[T]) DotSpec() *graph.DotNodeSpec
- func (si *StepInstance[T]) EnrichContext(ctx context.Context) (result context.Context)
- func (si *StepInstance[T]) ExecutionData() *StepExecutionData
- func (si *StepInstance[T]) GetJobInstance() JobInstanceMeta
- func (si *StepInstance[T]) GetName() string
- func (si *StepInstance[T]) GetState() StepState
- func (si *StepInstance[T]) GetStepDefinition() StepDefinitionMeta
- func (si *StepInstance[T]) Waitable() asynctask.Waitable
- type StepInstanceMeta
- type StepState
Constants ¶
const ( ErrPrecedentStepFailed JobErrorCode = "PrecedentStepFailed" ErrStepFailed JobErrorCode = "StepFailed" ErrRefStepNotInJob JobErrorCode = "RefStepNotInJob" MsgRefStepNotInJob string = "trying to reference to step %q, but it is not registered in job" ErrAddStepInSealedJob JobErrorCode = "AddStepInSealedJob" MsgAddStepInSealedJob string = "trying to add step %q to a sealed job definition" ErrAddExistingStep JobErrorCode = "AddExistingStep" MsgAddExistingStep string = "trying to add step %q to job definition, but it already exists" ErrDuplicateInputParentStep JobErrorCode = "DuplicateInputParentStep" MsgDuplicateInputParentStep string = "at least 2 input parentSteps are same" ErrRuntimeStepNotFound JobErrorCode = "RuntimeStepNotFound" MsgRuntimeStepNotFound string = "runtime step %q not found, must be a bug in asyncjob" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ExecutionOptionPreparer ¶
type ExecutionOptionPreparer func(*StepExecutionOptions) *StepExecutionOptions
func ExecuteAfter ¶ added in v0.1.1
func ExecuteAfter(step StepDefinitionMeta) ExecutionOptionPreparer
Add a preceding step that must execute before this step,
without taking input from it (use StepAfter/StepAfterBoth otherwise).
func WithContextEnrichment ¶ added in v0.5.0
func WithContextEnrichment(contextPolicy StepContextPolicy) ExecutionOptionPreparer
func WithRetry ¶ added in v0.2.2
func WithRetry(retryPolicy RetryPolicy) ExecutionOptionPreparer
Allow retry of a step on error.
type JobDefinition ¶ added in v0.5.0
type JobDefinition[T any] struct { // contains filtered or unexported fields }
JobDefinition defines a job with child steps; the steps are organized in a Directed Acyclic Graph (DAG).
func NewJobDefinition ¶ added in v0.5.0
func NewJobDefinition[T any](name string) *JobDefinition[T]
Create new JobDefinition
It is suggested to build the JobDefinition statically at process start, and reuse it for each job instance.
func (*JobDefinition[T]) GetName ¶ added in v0.5.0
func (jd *JobDefinition[T]) GetName() string
func (*JobDefinition[T]) GetStep ¶ added in v0.5.0
func (jd *JobDefinition[T]) GetStep(stepName string) (StepDefinitionMeta, bool)
GetStep returns the stepDefinition by name
func (*JobDefinition[T]) Seal ¶ added in v0.5.0
func (jd *JobDefinition[T]) Seal()
func (*JobDefinition[T]) Sealed ¶ added in v0.5.0
func (jd *JobDefinition[T]) Sealed() bool
func (*JobDefinition[T]) Start ¶ added in v0.5.0
func (jd *JobDefinition[T]) Start(ctx context.Context, input T, jobOptions ...JobOptionPreparer) *JobInstance[T]
Start execution of the job definition.
This will create and return a new instance of the job; the caller will then be able to wait for the job instance.
func (*JobDefinition[T]) Visualize ¶ added in v0.5.0
func (jd *JobDefinition[T]) Visualize() (string, error)
Visualize the job definition in graphviz dot format
type JobDefinitionMeta ¶ added in v0.5.0
type JobDefinitionMeta interface { GetName() string GetStep(stepName string) (StepDefinitionMeta, bool) // TODO: switch bool to error Seal() Sealed() bool Visualize() (string, error) // contains filtered or unexported methods }
Interface for a job definition
type JobDefinitionWithResult ¶ added in v0.5.0
type JobDefinitionWithResult[Tin, Tout any] struct { *JobDefinition[Tin] // contains filtered or unexported fields }
func JobWithResult ¶ added in v0.5.0
func JobWithResult[Tin, Tout any](jd *JobDefinition[Tin], resultStep *StepDefinition[Tout]) (*JobDefinitionWithResult[Tin, Tout], error)
func (*JobDefinitionWithResult[Tin, Tout]) Start ¶ added in v0.5.0
func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input Tin, jobOptions ...JobOptionPreparer) *JobInstanceWithResult[Tin, Tout]
type JobError ¶ added in v0.2.2
type JobError struct { Code JobErrorCode StepError error StepInstance StepInstanceMeta Message string }
type JobErrorCode ¶ added in v0.2.2
type JobErrorCode string
func (JobErrorCode) Error ¶ added in v0.2.2
func (code JobErrorCode) Error() string
func (JobErrorCode) WithMessage ¶ added in v0.6.0
func (code JobErrorCode) WithMessage(msg string) *MessageError
type JobExecutionOptions ¶ added in v0.5.0
type JobInstance ¶ added in v0.5.0
type JobInstance[T any] struct { Definition *JobDefinition[T] // contains filtered or unexported fields }
JobInstance is the instance of a jobDefinition
func (*JobInstance[T]) GetJobDefinition ¶ added in v0.5.0
func (ji *JobInstance[T]) GetJobDefinition() JobDefinitionMeta
func (*JobInstance[T]) GetJobInstanceId ¶ added in v0.5.0
func (ji *JobInstance[T]) GetJobInstanceId() string
func (*JobInstance[T]) GetStepInstance ¶ added in v0.5.0
func (ji *JobInstance[T]) GetStepInstance(stepName string) (StepInstanceMeta, bool)
GetStepInstance returns the stepInstance by name
func (*JobInstance[T]) Visualize ¶ added in v0.5.0
func (jd *JobInstance[T]) Visualize() (string, error)
Visualize the job instance in graphviz dot format
type JobInstanceMeta ¶ added in v0.5.0
type JobInstanceMeta interface { GetJobInstanceId() string GetJobDefinition() JobDefinitionMeta GetStepInstance(stepName string) (StepInstanceMeta, bool) Wait(context.Context) error Visualize() (string, error) // contains filtered or unexported methods }
type JobInstanceWithResult ¶ added in v0.5.0
type JobInstanceWithResult[Tin, Tout any] struct { *JobInstance[Tin] // contains filtered or unexported fields }
type JobOptionPreparer ¶ added in v0.5.0
type JobOptionPreparer func(*JobExecutionOptions) *JobExecutionOptions
func WithJobId ¶ added in v0.5.0
func WithJobId(jobId string) JobOptionPreparer
func WithSequentialExecution ¶ added in v0.5.0
func WithSequentialExecution() JobOptionPreparer
type MessageError ¶ added in v0.6.0
type MessageError struct { Code JobErrorCode Message string }
func (*MessageError) Error ¶ added in v0.6.0
func (me *MessageError) Error() string
func (*MessageError) Unwrap ¶ added in v0.6.0
func (me *MessageError) Unwrap() error
type RetryPolicy ¶ added in v0.2.2
type RetryReport ¶ added in v0.2.2
type RetryReport struct {
Count uint
}
RetryReport records the retry count (it could be extended to include each retry duration, ...).
type StepContextPolicy ¶ added in v0.2.2
type StepContextPolicy func(context.Context, StepInstanceMeta) context.Context
StepContextPolicy allows context enrichment before passing to step.
With StepInstanceMeta you can access StepInstance, StepDefinition, JobInstance, JobDefinition.
type StepDefinition ¶ added in v0.5.0
type StepDefinition[T any] struct { // contains filtered or unexported fields }
StepDefinition defines a step and its dependencies in a job definition.
func AddStep ¶
func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator func(input JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)
AddStep adds a step to the job definition.
func AddStepWithStaticFunc ¶ added in v0.5.0
func AddStepWithStaticFunc[JT, ST any](j *JobDefinition[JT], stepName string, stepFunc asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)
AddStepWithStaticFunc is the same as AddStep, but the stepFunc passed in shouldn't have a receiver (or you get shared state between job instances).
func StepAfter ¶
func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepAfterFuncCreator func(input JT) asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)
StepAfter adds a step after a preceding step, and also takes input from that preceding step.
func StepAfterBoth ¶
func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepAfterBothFuncCreator func(input JT) asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)
StepAfterBoth adds a step after both preceding steps, and also takes input from both preceding steps.
func StepAfterBothWithStaticFunc ¶ added in v0.5.0
func StepAfterBothWithStaticFunc[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepFunc asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)
StepAfterBothWithStaticFunc is the same as StepAfterBoth, but the stepFunc passed in shouldn't have a receiver (or you get shared state between job instances).
func StepAfterWithStaticFunc ¶ added in v0.5.0
func StepAfterWithStaticFunc[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepFunc asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)
StepAfterWithStaticFunc is the same as StepAfter, but the stepFunc passed in shouldn't have a receiver (or you get shared state between job instances).
func (*StepDefinition[T]) DependsOn ¶ added in v0.5.0
func (sd *StepDefinition[T]) DependsOn() []string
func (*StepDefinition[T]) DotSpec ¶ added in v0.5.0
func (sd *StepDefinition[T]) DotSpec() *graph.DotNodeSpec
func (*StepDefinition[T]) GetName ¶ added in v0.5.0
func (sd *StepDefinition[T]) GetName() string
type StepDefinitionMeta ¶ added in v0.5.0
type StepDefinitionMeta interface { // GetName return name of the step GetName() string // DependsOn return the list of step names that this step depends on DependsOn() []string // DotSpec used for generating graphviz graph DotSpec() *graph.DotNodeSpec // contains filtered or unexported methods }
StepDefinitionMeta is the interface for a step definition
type StepErrorPolicy ¶
type StepErrorPolicy struct{}
type StepExecutionData ¶ added in v0.2.1
type StepExecutionData struct { StartTime time.Time Duration time.Duration Retried *RetryReport }
StepExecutionData records the step execution time and retry report.
type StepExecutionOptions ¶
type StepExecutionOptions struct { ErrorPolicy StepErrorPolicy RetryPolicy RetryPolicy ContextPolicy StepContextPolicy // dependencies that are not input. DependOn []string }
type StepInstance ¶ added in v0.5.0
type StepInstance[T any] struct { Definition *StepDefinition[T] JobInstance JobInstanceMeta // contains filtered or unexported fields }
StepInstance is the instance of a step, within a job instance.
func (*StepInstance[T]) DotSpec ¶ added in v0.5.0
func (si *StepInstance[T]) DotSpec() *graph.DotNodeSpec
func (*StepInstance[T]) EnrichContext ¶ added in v0.5.0
func (si *StepInstance[T]) EnrichContext(ctx context.Context) (result context.Context)
func (*StepInstance[T]) ExecutionData ¶ added in v0.5.0
func (si *StepInstance[T]) ExecutionData() *StepExecutionData
func (*StepInstance[T]) GetJobInstance ¶ added in v0.5.0
func (si *StepInstance[T]) GetJobInstance() JobInstanceMeta
func (*StepInstance[T]) GetName ¶ added in v0.5.0
func (si *StepInstance[T]) GetName() string
func (*StepInstance[T]) GetState ¶ added in v0.5.0
func (si *StepInstance[T]) GetState() StepState
func (*StepInstance[T]) GetStepDefinition ¶ added in v0.5.0
func (si *StepInstance[T]) GetStepDefinition() StepDefinitionMeta
func (*StepInstance[T]) Waitable ¶ added in v0.5.0
func (si *StepInstance[T]) Waitable() asynctask.Waitable
type StepInstanceMeta ¶ added in v0.5.0
type StepInstanceMeta interface { GetName() string ExecutionData() *StepExecutionData GetState() StepState GetJobInstance() JobInstanceMeta GetStepDefinition() StepDefinitionMeta Waitable() asynctask.Waitable DotSpec() *graph.DotNodeSpec }
StepInstanceMeta is the interface for a step instance