Documentation
¶
Index ¶
- type ExecutionOptionPreparer
- type JobDefinition
- type JobDefinitionMeta
- type JobDefinitionWithResult
- type JobError
- type JobErrorCode
- type JobExecutionOptions
- type JobInstance
- func (ji *JobInstance[T]) GetJobDefinition() JobDefinitionMeta
- func (ji *JobInstance[T]) GetJobInstanceId() string
- func (ji *JobInstance[T]) GetStepInstance(stepName string) (StepInstanceMeta, bool)
- func (jd *JobInstance[T]) Visualize() (string, error)
- func (ji *JobInstance[T]) Wait(ctx context.Context) error
- type JobInstanceMeta
- type JobInstanceWithResult
- type JobOptionPreparer
- type RetryPolicy
- type RetryReport
- type StepContextPolicy
- type StepDefinition
- func AddStep[JT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, ...) (*StepDefinition[ST], error)
- func AddStepWithStaticFunc[JT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, ...) (*StepDefinition[ST], error)
- func StepAfter[JT, PT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, ...) (*StepDefinition[ST], error)
- func StepAfterBoth[JT, PT1, PT2, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, ...) (*StepDefinition[ST], error)
- func StepAfterBothWithStaticFunc[JT, PT1, PT2, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, ...) (*StepDefinition[ST], error)
- func StepAfterWithStaticFunc[JT, PT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, ...) (*StepDefinition[ST], error)
- type StepDefinitionMeta
- type StepErrorPolicy
- type StepExecutionData
- type StepExecutionOptions
- type StepInstance
- func (si *StepInstance[T]) DotSpec() *graph.DotNodeSpec
- func (si *StepInstance[T]) EnrichContext(ctx context.Context) (result context.Context)
- func (si *StepInstance[T]) ExecutionData() *StepExecutionData
- func (si *StepInstance[T]) GetJobInstance() JobInstanceMeta
- func (si *StepInstance[T]) GetName() string
- func (si *StepInstance[T]) GetState() StepState
- func (si *StepInstance[T]) GetStepDefinition() StepDefinitionMeta
- func (si *StepInstance[T]) Waitable() asynctask.Waitable
- type StepInstanceMeta
- type StepState
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ExecutionOptionPreparer ¶
type ExecutionOptionPreparer func(*StepExecutionOptions) *StepExecutionOptions
func ExecuteAfter ¶
func ExecuteAfter(step StepDefinitionMeta) ExecutionOptionPreparer
Add precedence to a step.
without taking input from it(use StepAfter/StepAfterBoth otherwise)
func WithContextEnrichment ¶
func WithContextEnrichment(contextPolicy StepContextPolicy) ExecutionOptionPreparer
func WithRetry ¶
func WithRetry(retryPolicy RetryPolicy) ExecutionOptionPreparer
Allow retry of a step on error.
func WithTimeout ¶
func WithTimeout(timeout time.Duration) ExecutionOptionPreparer
Limit time spend on a step.
type JobDefinition ¶
JobDefinition defines a job with child steps, and step is organized in a Directed Acyclic Graph (DAG).
func NewJobDefinition ¶
func NewJobDefinition[T any](name string) *JobDefinition[T]
Create new JobDefinition
it is suggest to build jobDefinition statically on process start, and reuse it for each job instance.
func (*JobDefinition[T]) GetName ¶
func (jd *JobDefinition[T]) GetName() string
func (*JobDefinition[T]) GetStep ¶
func (jd *JobDefinition[T]) GetStep(stepName string) (StepDefinitionMeta, bool)
GetStep returns the stepDefinition by name
func (*JobDefinition[T]) Start ¶
func (jd *JobDefinition[T]) Start(ctx context.Context, input *T, jobOptions ...JobOptionPreparer) *JobInstance[T]
Start execution of the job definition.
this will create and return new instance of the job caller will then be able to wait for the job instance
func (*JobDefinition[T]) Visualize ¶
func (jd *JobDefinition[T]) Visualize() (string, error)
Visualize the job definition in graphviz dot format
type JobDefinitionMeta ¶
type JobDefinitionMeta interface { GetName() string GetStep(stepName string) (StepDefinitionMeta, bool) // TODO: switch bool to error // contains filtered or unexported methods }
Interface for a job definition
type JobDefinitionWithResult ¶
type JobDefinitionWithResult[Tin, Tout any] struct { *JobDefinition[Tin] // contains filtered or unexported fields }
func JobWithResult ¶
func JobWithResult[Tin, Tout any](jd *JobDefinition[Tin], resultStep *StepDefinition[Tout]) (*JobDefinitionWithResult[Tin, Tout], error)
func (*JobDefinitionWithResult[Tin, Tout]) Start ¶
func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input *Tin) *JobInstanceWithResult[Tin, Tout]
type JobError ¶
type JobError struct { Code JobErrorCode StepError error StepInstance StepInstanceMeta Message string }
type JobErrorCode ¶
type JobErrorCode string
const ( ErrPrecedentStepFailure JobErrorCode = "precedent step failed" ErrStepFailed JobErrorCode = "step failed" ErrStepNotInJob JobErrorCode = "trying to reference to a step not registered in job" )
func (JobErrorCode) Error ¶
func (code JobErrorCode) Error() string
type JobExecutionOptions ¶
type JobInstance ¶
type JobInstance[T any] struct { Definition *JobDefinition[T] // contains filtered or unexported fields }
JobInstance is the instance of a jobDefinition
func (*JobInstance[T]) GetJobDefinition ¶
func (ji *JobInstance[T]) GetJobDefinition() JobDefinitionMeta
func (*JobInstance[T]) GetJobInstanceId ¶
func (ji *JobInstance[T]) GetJobInstanceId() string
func (*JobInstance[T]) GetStepInstance ¶
func (ji *JobInstance[T]) GetStepInstance(stepName string) (StepInstanceMeta, bool)
GetStepInstance returns the stepInstance by name
func (*JobInstance[T]) Visualize ¶
func (jd *JobInstance[T]) Visualize() (string, error)
Visualize the job instance in graphviz dot format
type JobInstanceMeta ¶
type JobInstanceMeta interface { GetJobDefinition() JobDefinitionMeta GetStepInstance(stepName string) (StepInstanceMeta, bool) Wait(context.Context) error // contains filtered or unexported methods }
type JobInstanceWithResult ¶
type JobInstanceWithResult[Tin, Tout any] struct { *JobInstance[Tin] // contains filtered or unexported fields }
type JobOptionPreparer ¶
type JobOptionPreparer func(*JobExecutionOptions) *JobExecutionOptions
func WithJobId ¶
func WithJobId(jobId string) JobOptionPreparer
func WithSequentialExecution ¶
func WithSequentialExecution() JobOptionPreparer
type RetryReport ¶
type RetryReport struct {
Count int
}
RetryReport would record the retry count (could extend to include each retry duration, ...)
type StepContextPolicy ¶
type StepContextPolicy func(context.Context, StepInstanceMeta) context.Context
StepContextPolicy allows context enrichment before passing to step.
With StepInstanceMeta you can access StepInstance, StepDefinition, JobInstance, JobDefinition.
type StepDefinition ¶
type StepDefinition[T any] struct { // contains filtered or unexported fields }
StepDefinition defines a step and it's dependencies in a job definition.
func AddStep ¶
func AddStep[JT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, stepFuncCreator func(input *JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)
AddStep adds a step to the job definition.
func AddStepWithStaticFunc ¶
func AddStepWithStaticFunc[JT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, stepFunc asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)
AddStepWithStaticFunc is same as AddStep, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances)
func StepAfter ¶
func StepAfter[JT, PT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepAfterFuncCreator func(input *JT) asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)
StepAfter add a step after a preceding step, also take input from that preceding step
func StepAfterBoth ¶
func StepAfterBoth[JT, PT1, PT2, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepAfterBothFuncCreator func(input *JT) asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)
StepAfterBoth add a step after both preceding steps, also take input from both preceding steps
func StepAfterBothWithStaticFunc ¶
func StepAfterBothWithStaticFunc[JT, PT1, PT2, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepFunc asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)
StepAfterBothWithStaticFunc is same as StepAfterBoth, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances)
func StepAfterWithStaticFunc ¶
func StepAfterWithStaticFunc[JT, PT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepFunc asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)
StepAfterWithStaticFunc is same as StepAfter, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances)
func (*StepDefinition[T]) DependsOn ¶
func (sd *StepDefinition[T]) DependsOn() []string
func (*StepDefinition[T]) DotSpec ¶
func (sd *StepDefinition[T]) DotSpec() *graph.DotNodeSpec
func (*StepDefinition[T]) ExecutionPolicy ¶
func (sd *StepDefinition[T]) ExecutionPolicy() *StepExecutionOptions
func (*StepDefinition[T]) GetName ¶
func (sd *StepDefinition[T]) GetName() string
type StepDefinitionMeta ¶
type StepDefinitionMeta interface { // GetName return name of the step GetName() string // DependsOn return the list of step names that this step depends on DependsOn() []string // ExecutionPolicy return the execution policy of the step ExecutionPolicy() *StepExecutionOptions // DotSpec used for generating graphviz graph DotSpec() *graph.DotNodeSpec // contains filtered or unexported methods }
StepDefinitionMeta is the interface for a step definition
type StepErrorPolicy ¶
type StepErrorPolicy struct{}
type StepExecutionData ¶
type StepExecutionData struct { StartTime time.Time Duration time.Duration Retried *RetryReport }
StepExecutionData would measure the step execution time and retry report.
type StepExecutionOptions ¶
type StepExecutionOptions struct { Timeout time.Duration ErrorPolicy StepErrorPolicy RetryPolicy RetryPolicy ContextPolicy StepContextPolicy // dependencies that are not input. DependOn []string }
type StepInstance ¶
type StepInstance[T any] struct { Definition *StepDefinition[T] JobInstance JobInstanceMeta // contains filtered or unexported fields }
StepInstance is the instance of a step, within a job instance.
func (*StepInstance[T]) DotSpec ¶
func (si *StepInstance[T]) DotSpec() *graph.DotNodeSpec
func (*StepInstance[T]) EnrichContext ¶
func (si *StepInstance[T]) EnrichContext(ctx context.Context) (result context.Context)
func (*StepInstance[T]) ExecutionData ¶
func (si *StepInstance[T]) ExecutionData() *StepExecutionData
func (*StepInstance[T]) GetJobInstance ¶
func (si *StepInstance[T]) GetJobInstance() JobInstanceMeta
func (*StepInstance[T]) GetName ¶
func (si *StepInstance[T]) GetName() string
func (*StepInstance[T]) GetState ¶
func (si *StepInstance[T]) GetState() StepState
func (*StepInstance[T]) GetStepDefinition ¶
func (si *StepInstance[T]) GetStepDefinition() StepDefinitionMeta
func (*StepInstance[T]) Waitable ¶
func (si *StepInstance[T]) Waitable() asynctask.Waitable
type StepInstanceMeta ¶
type StepInstanceMeta interface { GetName() string ExecutionData() *StepExecutionData GetState() StepState GetJobInstance() JobInstanceMeta GetStepDefinition() StepDefinitionMeta Waitable() asynctask.Waitable DotSpec() *graph.DotNodeSpec }
StepInstanceMeta is the interface for a step instance