Documentation
Overview ¶
Package tasker provides utilities that simplify background task management.
Example (SequenceMixedWithConcurrence) ¶
package main

import (
	"context"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	_ = tasker.Sequence(
		tasker.Concurrence(
			func() { /* migration task 1 */ },
			func() { /* migration task 2 */ },
		),
		tasker.Concurrence(
			func() { /* task dependent on migrations */ },
			func() { /* task dependent on migrations */ },
			func() { /* task dependent on migrations */ },
		),
	)(context.Background())
}
Index ¶
- Constants
- func HTTPServerPortFromENV(envKeys ...string) httpServerTaskOption
- func Main[TFN genericTask](ctx context.Context, tfns ...TFN) error
- type Daily
- type FireAndForget
- type Interval
- type Job
- type JobGroup
- type Manual
- type Monthly
- type Runnable
- type ScheduleID
- type ScheduleState
- type ScheduleStateRepository
- type Scheduler
- type SchedulerLocks
- type Task
- func Concurrence[TFN genericTask](tfns ...TFN) Task
- func HTTPServerTask(srv *http.Server, opts ...httpServerTaskOption) Task
- func IgnoreError[TFN genericTask](tfn TFN, errsToIgnore ...error) Task
- func OnError[TFN genericTask, EHFN genericErrorHandler](tfn TFN, ehfn EHFN) Task
- func Sequence[TFN genericTask](tfns ...TFN) Task
- func ToTask[TFN genericTask](tfn TFN) Task
- func WithNoOverlap(lock guard.NonBlockingLocker, job Task) Task
- func WithRepeat[TFN genericTask](interval Interval, tfn TFN) Task
- func WithShutdown[StartFn, StopFn genericTask](start StartFn, stop StopFn) Task
- func WithSignalNotify[TFN genericTask](tfn TFN, shutdownSignals ...os.Signal) Task
Constants ¶
const ErrAlive errorkit.Error = "ErrAlive"
Variables ¶
This section is empty.
Functions ¶
func HTTPServerPortFromENV ¶ added in v0.195.0
func HTTPServerPortFromENV(envKeys ...string) httpServerTaskOption
func Main ¶
func Main[TFN genericTask](ctx context.Context, tfns ...TFN) error
Main helps to manage concurrent background Tasks in your main function. Each Task runs in its own goroutine. If any Task fails, the other Tasks receive a cancellation signal.
Example (WithRunnable) ¶
package main

import (
	"context"
	"os"

	"go.llib.dev/frameless/pkg/logger"
	"go.llib.dev/frameless/pkg/logging"
	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	var (
		ctx  = context.Background()
		task tasker.Runnable
	)
	if err := tasker.Main(ctx, &task); err != nil {
		logger.Fatal(ctx, "error in main", logging.ErrField(err))
		os.Exit(1)
	}
}
Example (WithTask) ¶
package main

import (
	"context"
	"os"

	"go.llib.dev/frameless/pkg/logger"
	"go.llib.dev/frameless/pkg/logging"
	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	var (
		ctx  = context.Background()
		task tasker.Task
	)
	if err := tasker.Main(ctx, task); err != nil {
		logger.Fatal(ctx, "error in main", logging.ErrField(err))
		os.Exit(1)
	}
}
Types ¶
type FireAndForget ¶ added in v0.250.0
type FireAndForget struct{}
FireAndForget manages jobs automatically: it collects finished jobs and frees their resources.
Ideal for background job management where the job results do not need to be collected.
type Job ¶ added in v0.247.0
type Job struct {
// contains filtered or unexported fields
}
Job is a task that runs in the background. You can create one by using:
- tasker.Background
- tasker.JobGroup#Background
- Or manually starting it with tasker.Job#Start.
Each method allows you to run tasks in the background.
type JobGroup ¶ added in v0.247.0
type JobGroup[M Manual | FireAndForget] struct {
	// contains filtered or unexported fields
}
JobGroup is a job manager where you can start background tasks as jobs.
func Background ¶ added in v0.204.0
Example ¶
package main

import (
	"context"

	"go.llib.dev/frameless/pkg/logger"
	"go.llib.dev/frameless/pkg/logging"
	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	var ctx context.Context
	var (
		task1 tasker.Task
		task2 tasker.Task
	)
	jobGroup := tasker.Background(ctx,
		task1,
		task2,
	)
	if err := jobGroup.Join(); err != nil {
		logger.Error(ctx, "error in background task", logging.ErrField(err))
	}
}
func (*JobGroup[M]) Background ¶ added in v0.247.0
type Manual ¶ added in v0.250.0
type Manual struct{}
Manual lets you collect the results of the background jobs; you must call Join to release them. Ideal for job groups where you need to collect the error results.
type ScheduleID ¶ added in v0.247.0
type ScheduleID string
type ScheduleState ¶ added in v0.242.0
type ScheduleState struct {
	ID        ScheduleID `ext:"id"`
	Timestamp time.Time
}
type ScheduleStateRepository ¶ added in v0.242.0
type ScheduleStateRepository interface {
	crud.Creator[ScheduleState]
	crud.Updater[ScheduleState]
	crud.ByIDDeleter[ScheduleID]
	crud.ByIDFinder[ScheduleState, ScheduleID]
}
type Scheduler ¶ added in v0.223.0
type Scheduler struct {
	Locks  SchedulerLocks
	States ScheduleStateRepository
}
func (Scheduler) WithSchedule ¶ added in v0.223.0
func (s Scheduler) WithSchedule(id ScheduleID, interval Interval, job Task) Task
Example ¶
package main

import (
	"context"
	"log"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	scheduler := tasker.Scheduler{}

	task := scheduler.WithSchedule("db maintenance", tasker.Monthly{Day: 1}, func(ctx context.Context) error {
		return nil
	})

	if err := task(context.Background()); err != nil {
		log.Println("ERROR", err.Error())
	}
}
type SchedulerLocks ¶ added in v0.246.0
type SchedulerLocks interface {
	guard.LockerFactory[ScheduleID]
}
type Task ¶
Task is the basic unit of the tasker package; it represents an executable unit of work.
At its core, a Task is nothing more than a synchronous function. Working with synchronous functions removes the complexity of thinking about how to run your application. Your components become more stateless and focus on the domain rather than on lifecycle management. This less stateful approach can also make your Tasks easier to test.
func Concurrence ¶
func Concurrence[TFN genericTask](tfns ...TFN) Task
Concurrence is a construct that executes a list of Tasks concurrently. If any Task fails with an error, all Tasks receive a cancellation signal.
Example ¶
package main

import (
	"context"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	err := tasker.Concurrence(
		func(ctx context.Context) error {
			// concurrent task 1
			return nil
		},
		func(ctx context.Context) error {
			// concurrent task 2
			return nil
		},
	).Run(context.Background())
	_ = err
}
func HTTPServerTask ¶
func HTTPServerTask(srv *http.Server, opts ...httpServerTaskOption) Task
HTTPServerTask wraps your `http.Server` and presents it as a Task with graceful shutdown support.
Example ¶
package main

import (
	"context"
	"net/http"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	srv := &http.Server{
		Addr:    "localhost:58080",
		Handler: http.NewServeMux(),
	}

	tasker.HTTPServerTask(srv).
		Run(context.Background())
}
Example (WithMain) ¶
package main

import (
	"context"
	"net/http"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	srv := &http.Server{Handler: http.NewServeMux()}

	tasker.Main(context.Background(),
		tasker.HTTPServerTask(srv,
			tasker.HTTPServerPortFromENV("PORT", "LOYALIFY_WEB_PORT")))
}
func IgnoreError ¶
func IgnoreError[TFN genericTask](tfn TFN, errsToIgnore ...error) Task
func OnError ¶
func OnError[TFN genericTask, EHFN genericErrorHandler](tfn TFN, ehfn EHFN) Task
Example ¶
package main

import (
	"context"

	"go.llib.dev/frameless/pkg/logger"
	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	withErrorHandling := tasker.OnError(
		func(ctx context.Context) error { return nil }, // task
		func(ctx context.Context, err error) error { logger.Error(ctx, err.Error()); return nil }, // error handling
	)
	_ = withErrorHandling
}
func Sequence ¶
func Sequence[TFN genericTask](tfns ...TFN) Task
Example ¶
package main

import (
	"context"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	err := tasker.Sequence(
		func(ctx context.Context) error {
			// first task to execute
			return nil
		},
		func(ctx context.Context) error {
			// follow-up task to execute
			return nil
		},
	).Run(context.Background())
	_ = err
}
func WithNoOverlap ¶ added in v0.247.0
func WithNoOverlap(lock guard.NonBlockingLocker, job Task) Task
func WithRepeat ¶
func WithRepeat[TFN genericTask](interval Interval, tfn TFN) Task
WithRepeat repeats the given Task at the given interval until shutdown is signaled. It suits short-lived Tasks that should be executed continuously until the shutdown signal arrives.
Example ¶
package main

import (
	"context"
	"log"
	"time"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	task := tasker.WithRepeat(tasker.Every(time.Second), func(ctx context.Context) error {
		// I'm a short-lived task, and prefer to be constantly executed,
		// Repeat will keep repeating me every second until shutdown is signaled.
		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	if err := task(ctx); err != nil {
		log.Println("ERROR", err.Error())
	}
}
func WithShutdown ¶
func WithShutdown[StartFn, StopFn genericTask](start StartFn, stop StopFn) Task
WithShutdown will combine the start and stop/shutdown function into a single Task function. It supports a graceful shutdown period; upon reaching the deadline, it will cancel the context passed to the shutdown function. WithShutdown makes it easy to use components with graceful shutdown support as a Task, such as the http.Server.
tasker.WithShutdown(srv.ListenAndServe, srv.Shutdown)
Example ¶
package main

import (
	"context"
	"log"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	task := tasker.WithShutdown(
		func(ctx context.Context) error {
			// start working
			<-ctx.Done()
			return nil
		},
		func(ctx context.Context) error {
			// graceful stop for work
			<-ctx.Done()
			return nil
		},
	)

	ctx, cancel := context.WithCancel(context.Background())
	// listen to a cancellation signal and then call the cancel func
	// or use ShutdownManager.
	_ = cancel

	if err := task(ctx); err != nil {
		log.Println("ERROR", err.Error())
	}
}
Example (HttpServer) ¶
package main

import (
	"context"
	"log"
	"net/http"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	srv := http.Server{
		Addr: "localhost:8080",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusTeapot)
		}),
	}

	httpServerTask := tasker.WithShutdown(
		tasker.IgnoreError(srv.ListenAndServe, http.ErrServerClosed),
		srv.Shutdown,
	)

	if err := tasker.WithSignalNotify(httpServerTask)(context.Background()); err != nil {
		log.Println("ERROR", err.Error())
	}
}
func WithSignalNotify ¶
func WithSignalNotify[TFN genericTask](tfn TFN, shutdownSignals ...os.Signal) Task
Example ¶
package main

import (
	"context"
	"log"
	"net/http"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	srv := http.Server{
		Addr: "localhost:8080",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusTeapot)
		}),
	}

	task := tasker.WithShutdown(srv.ListenAndServe, srv.Shutdown)
	task = tasker.WithSignalNotify(task)

	if err := task(context.Background()); err != nil {
		log.Println("ERROR", err.Error())
	}
}