Documentation
¶
Overview ¶
Package tasker provides utilities to background task management to achieve simplicity.
Example (SequenceMixedWithConcurrence) ¶
package main import ( "context" "go.llib.dev/frameless/pkg/tasker" ) func main() { _ = tasker.Sequence( tasker.Concurrence( func() { /* migration task 1 */ }, func() { /* migration task 2 */ }, ), tasker.Concurrence( func() { /* task dependent on migrations */ }, func() { /* task dependent on migrations */ }, func() { /* task dependent on migrations */ }, ), )(context.Background()) }
Index ¶
- func Background[TFN genericTask](ctx context.Context, tasks ...TFN) func() error
- func HTTPServerPortFromENV(envKeys ...string) httpServerTaskOption
- func Main[TFN genericTask](ctx context.Context, tasks ...TFN) error
- type Daily
- type Interval
- type Monthly
- type Repository
- type Runnable
- type Scheduler
- type State
- type StateID
- type StateRepository
- type Task
- func Concurrence[TFN genericTask](tfns ...TFN) Task
- func HTTPServerTask(srv *http.Server, opts ...httpServerTaskOption) Task
- func IgnoreError[TFN genericTask](tfn TFN, errsToIgnore ...error) Task
- func OnError[TFN genericTask, EHFN genericErrorHandler](tfn TFN, ehfn EHFN) Task
- func Sequence[TFN genericTask](tfns ...TFN) Task
- func ToTask[TFN genericTask](tfn TFN) Task
- func WithRepeat[TFN genericTask](interval Interval, tfn TFN) Task
- func WithSchedule[TFN genericTask](s Scheduler, id StateID, interval Interval, tsk TFN) Task
- func WithShutdown[StartFn, StopFn genericTask](start StartFn, stop StopFn) Task
- func WithSignalNotify[TFN genericTask](tfn TFN, shutdownSignals ...os.Signal) Task
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Background ¶ added in v0.204.0
Example ¶
package main import ( "context" "go.llib.dev/frameless/pkg/logger" "go.llib.dev/frameless/pkg/logging" "go.llib.dev/frameless/pkg/tasker" ) func main() { var ctx context.Context var ( task1 tasker.Task task2 tasker.Task ) join := tasker.Background(ctx, task1, task2, ) if err := join(); err != nil { logger.Error(ctx, "error in background task", logging.ErrField(err)) } }
func HTTPServerPortFromENV ¶ added in v0.195.0
func HTTPServerPortFromENV(envKeys ...string) httpServerTaskOption
func Main ¶
Main helps to manage concurrent background Tasks in your main. Each Task will run in its own goroutine. If any of the Task encounters a failure, the other tasker will receive a cancellation signal.
Example (WithRunnable) ¶
package main import ( "context" "os" "go.llib.dev/frameless/pkg/logger" "go.llib.dev/frameless/pkg/logging" "go.llib.dev/frameless/pkg/tasker" ) func main() { var ( ctx = context.Background() task tasker.Runnable ) if err := tasker.Main(ctx, &task); err != nil { logger.Fatal(ctx, "error in main", logging.ErrField(err)) os.Exit(1) } }
Example (WithTask) ¶
package main import ( "context" "os" "go.llib.dev/frameless/pkg/logger" "go.llib.dev/frameless/pkg/logging" "go.llib.dev/frameless/pkg/tasker" ) func main() { var ( ctx = context.Background() task tasker.Task ) if err := tasker.Main(ctx, task); err != nil { logger.Fatal(ctx, "error in main", logging.ErrField(err)) os.Exit(1) } }
Types ¶
type Repository ¶ added in v0.223.0
type Repository interface { Locks() guard.LockerFactory[StateID] States() StateRepository }
type Scheduler ¶ added in v0.223.0
type Scheduler struct{ Repository Repository }
func (Scheduler) WithSchedule ¶ added in v0.223.0
Example ¶
package main import ( "context" "log" "go.llib.dev/frameless/pkg/tasker" ) func main() { scheduler := tasker.Scheduler{ Repository: nil, // &postgresql.TaskerScheduleRepository{CM: cm}, } task := scheduler.WithSchedule("db maintenance", tasker.Monthly{Day: 1}, func(ctx context.Context) error { return nil }) if err := task(context.Background()); err != nil { log.Println("ERROR", err.Error()) } }
type StateRepository ¶ added in v0.223.0
type Task ¶
Task is the basic unit of tasker package, which represents an executable work.
Task at its core is nothing more than a synchronous function. Working with synchronous functions removes the complexity of thinking about how to run your application. Your components become more stateless and focus on the domain rather than the lifecycle management. This less stateful approach can help to make testing your Task also easier.
func Concurrence ¶
func Concurrence[TFN genericTask](tfns ...TFN) Task
Concurrence is a construct that allows you to execute a list of Task concurrently. If any of the Task fails with an error, all Task will receive cancellation signal.
Example ¶
package main import ( "context" "go.llib.dev/frameless/pkg/tasker" ) func main() { err := tasker.Concurrence( func(ctx context.Context) error { // concurrent task 1 return nil }, func(ctx context.Context) error { // concurrent task 2 return nil }, ).Run(context.Background()) _ = err }
func HTTPServerTask ¶
HTTPServerTask is designed to encapsulate your `http.Server`, enabling graceful shutdown with the server and presenting it as a Task.
Example ¶
package main import ( "context" "net/http" "go.llib.dev/frameless/pkg/tasker" ) func main() { srv := &http.Server{ Addr: "localhost:58080", Handler: http.NewServeMux(), } tasker.HTTPServerTask(srv). Run(context.Background()) }
Example (WithMain) ¶
package main import ( "context" "net/http" "go.llib.dev/frameless/pkg/tasker" ) func main() { srv := &http.Server{Handler: http.NewServeMux()} tasker.Main(context.Background(), tasker.HTTPServerTask(srv, tasker.HTTPServerPortFromENV("PORT", "LOYALIFY_WEB_PORT"))) }
func IgnoreError ¶
func OnError ¶
func OnError[TFN genericTask, EHFN genericErrorHandler](tfn TFN, ehfn EHFN) Task
Example ¶
package main import ( "context" "go.llib.dev/frameless/pkg/logger" "go.llib.dev/frameless/pkg/tasker" ) func main() { withErrorHandling := tasker.OnError( func(ctx context.Context) error { return nil }, // task func(ctx context.Context, err error) error { logger.Error(ctx, err.Error()); return nil }, // error handling ) _ = withErrorHandling }
func Sequence ¶
func Sequence[TFN genericTask](tfns ...TFN) Task
Example ¶
package main import ( "context" "go.llib.dev/frameless/pkg/tasker" ) func main() { err := tasker.Sequence( func(ctx context.Context) error { // first task to execute return nil }, func(ctx context.Context) error { // follow-up task to execute return nil }, ).Run(context.Background()) _ = err }
func WithRepeat ¶
WithRepeat will keep repeating a given Task until shutdown is signaled. It is most suitable for Task(s) meant to be short-lived and executed continuously until the shutdown signal.
Example ¶
package main import ( "context" "log" "time" "go.llib.dev/frameless/pkg/tasker" ) func main() { task := tasker.WithRepeat(tasker.Every(time.Second), func(ctx context.Context) error { // I'm a short-lived task, and prefer to be constantly executed, // Repeat will keep repeating me every second until shutdown is signaled. return nil }) ctx, cancel := context.WithTimeout(context.Background(), time.Hour) defer cancel() if err := task(ctx); err != nil { log.Println("ERROR", err.Error()) } }
func WithSchedule ¶ added in v0.223.0
WithSchedule will repeatedly execute a given task according to its schedule, ensuring coordination across application instances. It prevents different application nodes from running the same task concurrently through using the Scheduler.
Example ¶
package main import ( "context" "log" "go.llib.dev/frameless/pkg/tasker" ) func main() { scheduler := tasker.Scheduler{ Repository: nil, // &postgresql.TaskerScheduleRepository{CM: cm}, } task := tasker.WithSchedule(scheduler, "db maintenance", tasker.Monthly{Day: 1}, func(ctx context.Context) error { return nil }) if err := task(context.Background()); err != nil { log.Println("ERROR", err.Error()) } }
func WithShutdown ¶
func WithShutdown[StartFn, StopFn genericTask](start StartFn, stop StopFn) Task
WithShutdown will combine the start and stop/shutdown function into a single Task function. It supports a graceful shutdown period; upon reaching the deadline, it will cancel the context passed to the shutdown function. WithShutdown makes it easy to use components with graceful shutdown support as a Task, such as the http.Server.
tasker.WithShutdown(srv.ListenAndServe, srv.Shutdown)
Example ¶
package main import ( "context" "log" "go.llib.dev/frameless/pkg/tasker" ) func main() { task := tasker.WithShutdown( func(ctx context.Context) error { // start working <-ctx.Done() return nil }, func(ctx context.Context) error { // graceful stop for work <-ctx.Done() return nil }, ) ctx, cancel := context.WithCancel(context.Background()) // listen to a cancellation signal and then call the cancel func // or use ShutdownManager. _ = cancel if err := task(ctx); err != nil { log.Println("ERROR", err.Error()) } }
Example (HttpServer) ¶
package main import ( "context" "log" "net/http" "go.llib.dev/frameless/pkg/tasker" ) func main() { srv := http.Server{ Addr: "localhost:8080", Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusTeapot) }), } httpServerTask := tasker.WithShutdown( tasker.IgnoreError(srv.ListenAndServe, http.ErrServerClosed), srv.Shutdown, ) if err := tasker.WithSignalNotify(httpServerTask)(context.Background()); err != nil { log.Println("ERROR", err.Error()) } }
func WithSignalNotify ¶
Example ¶
package main import ( "context" "log" "net/http" "go.llib.dev/frameless/pkg/tasker" ) func main() { srv := http.Server{ Addr: "localhost:8080", Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusTeapot) }), } task := tasker.WithShutdown(srv.ListenAndServe, srv.Shutdown) task = tasker.WithSignalNotify(task) if err := task(context.Background()); err != nil { log.Println("ERROR", err.Error()) } }