tasker

package
v0.255.0
Published: Oct 10, 2024 License: Apache-2.0 Imports: 21 Imported by: 2

README

package tasker

Package tasker provides utilities for background task management.

A tasker.Task, at its core, is nothing more than a synchronous function.

func(ctx context.Context) error {
	return nil
}

Working with synchronous functions removes the complexity of thinking about how to run your application in main. Your components become more stateless and focus on the domain rather than on lifecycle management, such as implementing a graceful asynchronous shutdown. This less stateful approach can also make testing easier.
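
To illustrate the testing claim, here is a minimal sketch that uses only the standard library. The Task type below merely mirrors the shape of tasker.Task, and Greeter and checkGreeter are hypothetical names introduced for this example. It shows that a synchronous task can be called directly and its outcome inspected, with no goroutines or lifecycle code involved.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task mirrors the shape of tasker.Task so the sketch needs no third-party import.
type Task func(ctx context.Context) error

// Greeter is a hypothetical component whose Run method is a plain synchronous function.
type Greeter struct{ Greeted int }

// Run counts a greeting unless the context is already cancelled.
func (g *Greeter) Run(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	g.Greeted++
	return nil
}

// checkGreeter exercises the task the way a unit test would:
// call it synchronously, then inspect the state and the returned error.
func checkGreeter() (greeted int, sawCancel bool) {
	g := &Greeter{}
	var task Task = g.Run

	_ = task(context.Background()) // a live context: the greeting is counted

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	err := task(ctx) // a cancelled context: the task reports it and does nothing

	return g.Greeted, errors.Is(err, context.Canceled)
}

func main() {
	greeted, sawCancel := checkGreeter()
	fmt.Println(greeted, sawCancel)
}
```

Because the method value g.Run already has the Task signature, the component can be handed to a runner without any adapter code.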

Short-lived Jobs with Repeat

If your task is a short-lived interaction that is meant to be executed repeatedly at intervals, you can use tasker.WithRepeat to implement a continuous execution that stops on a shutdown signal.

task := tasker.WithRepeat(tasker.Every(time.Second), func(ctx context.Context) error {
	// I'm a short-lived task, and I prefer to be constantly executed.
	// Repeat will keep repeating me every second until shutdown is signalled.
	return nil
})

ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()

if err := task(ctx); err != nil {
	logger.Error(ctx, err.Error())
}
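
The idea behind a repeating task can be sketched with the standard library alone. This is not the implementation of tasker.WithRepeat; repeatEvery and countRuns are hypothetical helpers, and treating shutdown as a clean exit (returning nil) is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// repeatEvery is a hypothetical stand-in for the idea behind tasker.WithRepeat:
// run fn, wait for the interval, and stop as soon as ctx is done.
func repeatEvery(d time.Duration, fn func(context.Context) error) func(context.Context) error {
	return func(ctx context.Context) error {
		ticker := time.NewTicker(d)
		defer ticker.Stop()
		for {
			if ctx.Err() != nil {
				return nil // shutdown is treated as a clean exit in this sketch
			}
			if err := fn(ctx); err != nil {
				return err
			}
			select {
			case <-ctx.Done():
				return nil
			case <-ticker.C:
			}
		}
	}
}

// countRuns repeats a counting task until the task itself cancels the context.
func countRuns(limit int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	runs := 0
	task := repeatEvery(time.Millisecond, func(context.Context) error {
		runs++
		if runs == limit {
			cancel() // simulate the shutdown signal
		}
		return nil
	})
	_ = task(ctx)
	return runs
}

func main() {
	fmt.Println(countRuns(3))
}
```

The context check at the top of the loop guarantees that no further run starts after cancellation, even if a tick and the cancellation become ready at the same moment.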

Scheduling

The tasker package offers several ways to schedule your task.

  • Schedule by time duration interval

tasker.Every(time.Second) // schedule every second

  • Schedule on a daily basis

tasker.Daily{Hour: 12} // schedule every day at 12 o'clock

  • Schedule on a monthly basis

// schedule every month at 12 o'clock on the third day
tasker.Monthly{Day: 3, Hour: 12}

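
To make the daily option more concrete, the following sketch shows one way the wait until the next daily run could be computed. How tasker.Daily calculates this internally is not documented here, so untilNextDaily and demoDaily are hypothetical helpers and the rollover rule (move to tomorrow once today's slot has passed) is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// untilNextDaily returns how long to wait from now until the next
// occurrence of hour:minute, in the location carried by now.
func untilNextDaily(now time.Time, hour, minute int) time.Duration {
	next := time.Date(now.Year(), now.Month(), now.Day(), hour, minute, 0, 0, now.Location())
	if !next.After(now) {
		next = next.AddDate(0, 0, 1) // today's slot has passed, use tomorrow's
	}
	return next.Sub(now)
}

// demoDaily evaluates a 12:00 schedule before and after noon on a fixed UTC day.
func demoDaily() (beforeNoon, afterNoon string) {
	morning := time.Date(2024, time.October, 10, 10, 0, 0, 0, time.UTC)
	afternoon := time.Date(2024, time.October, 10, 13, 0, 0, 0, time.UTC)
	return untilNextDaily(morning, 12, 0).String(), untilNextDaily(afternoon, 12, 0).String()
}

func main() {
	beforeNoon, afterNoon := demoDaily()
	fmt.Println(beforeNoon, afterNoon)
}
```

Passing the current time in as an argument keeps the calculation deterministic, which makes schedule logic straightforward to test.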
Execution Order

If you wish to execute Jobs in a sequential order, use tasker.Sequence. It can express dependency between tasks if one should only execute if the previous one has already succeeded.

s := tasker.Sequence(
    func(ctx context.Context) error {
        // first task to execute
        return nil
    },
    func(ctx context.Context) error {
        // follow-up task to execute
        return nil
    },
)

err := s.Run(context.Background())
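
The dependency semantics described above can be sketched without the library. The sequence helper below is a hypothetical stand-in, not the implementation of tasker.Sequence; it assumes that the first failing task stops the chain and its error is returned.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// sequence runs the tasks one after another and stops at the first error,
// so a later task only runs if every earlier one succeeded.
func sequence(tasks ...func(context.Context) error) func(context.Context) error {
	return func(ctx context.Context) error {
		for _, task := range tasks {
			if err := ctx.Err(); err != nil {
				return err
			}
			if err := task(ctx); err != nil {
				return err
			}
		}
		return nil
	}
}

// demoSequence records which steps ran when the second step fails.
func demoSequence() (ran string, failed bool) {
	var order []string
	step := func(name string, err error) func(context.Context) error {
		return func(context.Context) error {
			order = append(order, name)
			return err
		}
	}
	task := sequence(
		step("migrate", nil),
		step("seed", errors.New("seed failed")),
		step("serve", nil), // never runs, because "seed" failed
	)
	err := task(context.Background())
	return strings.Join(order, ","), err != nil
}

func main() {
	ran, failed := demoSequence()
	fmt.Println(ran, failed)
}
```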

If you need to execute tasks concurrently, use tasker.Concurrence. It guarantees that if a task fails, you receive the error back. It also ensures that the tasks fail together as a unit, by signalling cancellation to all of them if any task fails.

c := tasker.Concurrence(
    func(ctx context.Context) error {
        return nil // It runs at the same time.
    },
    func(ctx context.Context) error {
        return nil // It runs at the same time.
    },
)

err := c.Run(context.Background())
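
The "fail together as a unit" behaviour can be illustrated with a standard-library sketch. The concurrence helper below is hypothetical and is not the implementation of tasker.Concurrence; collecting the errors with errors.Join is an assumption of this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errBoom = errors.New("boom")

// concurrence runs every task in its own goroutine. The first failure
// cancels the shared context so the remaining tasks can stop too.
func concurrence(tasks ...func(context.Context) error) func(context.Context) error {
	return func(ctx context.Context) error {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		var (
			wg   sync.WaitGroup
			mu   sync.Mutex
			errs []error
		)
		for _, task := range tasks {
			wg.Add(1)
			go func(task func(context.Context) error) {
				defer wg.Done()
				if err := task(ctx); err != nil {
					mu.Lock()
					errs = append(errs, err)
					mu.Unlock()
					cancel() // signal the sibling tasks to stop
				}
			}(task)
		}
		wg.Wait()
		return errors.Join(errs...)
	}
}

// demoConcurrence pairs a failing task with one that waits for cancellation.
func demoConcurrence() (gotBoom, siblingCancelled bool) {
	failing := func(context.Context) error { return errBoom }
	waiting := func(ctx context.Context) error {
		<-ctx.Done() // returns only once the group is cancelled
		siblingCancelled = true
		return nil
	}
	err := concurrence(failing, waiting)(context.Background())
	return errors.Is(err, errBoom), siblingCancelled
}

func main() {
	gotBoom, siblingCancelled := demoConcurrence()
	fmt.Println(gotBoom, siblingCancelled)
}
```

The waiting task would block forever without the cancellation, so the fact that the demo returns at all shows that the failure was propagated to it.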

You can model dependency between tasks by mixing "Sequence" and "Concurrence".

task := tasker.Sequence(
	tasker.Concurrence( // group 1 which is a prerequisite to group 2
		func(ctx context.Context) error { return nil /* some migration task 1 */ },
		func(ctx context.Context) error { return nil /* some migration task 2 */ },
	),
	tasker.Concurrence( // group 2 which depends on group 1 success
		func(ctx context.Context) error { return nil /* a task that depends on completed migration 1 */ },
		func(ctx context.Context) error { return nil /* a task that depends on completed migration 2 */ },
		func(ctx context.Context) error { return nil /* a task that depends on completed migration 3 */ },
	),
)

tasker.Main(context.Background(), task)

Long-lived Jobs

If your task requires continuous work, you can use the received context as a parent context to get notified about a shutdown event. This keeps your code simple, because you don't have to differentiate between cancelling operations due to a request cancellation and cancelling them due to a shutdown event. You can still separate the two cancellation types by using a background context.

func MyJob(signal context.Context) error {
	<-signal.Done() // work until shutdown signal
	return signal.Err() // returning the context error is not an issue.
}
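
A slightly fuller long-lived job might look like the sketch below: a consumer that keeps handling messages until either its queue is closed or the context signals shutdown. newConsumer and demoConsumer are hypothetical names, and the example uses only the standard library.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// newConsumer returns a long-lived task (same shape as tasker.Task) that
// handles messages until the queue is closed or the context is cancelled.
func newConsumer(queue <-chan string, handle func(string)) func(context.Context) error {
	return func(ctx context.Context) error {
		for {
			select {
			case <-ctx.Done():
				return ctx.Err() // shutdown signal received
			case msg, ok := <-queue:
				if !ok {
					return nil // queue closed, nothing left to do
				}
				handle(msg)
			}
		}
	}
}

// demoConsumer first drains a closed queue, then shows an idle consumer
// stopping because of a cancelled context.
func demoConsumer() (handled int, stoppedByShutdown bool) {
	queue := make(chan string, 2)
	queue <- "a"
	queue <- "b"
	close(queue)
	count := func(string) { handled++ }
	_ = newConsumer(queue, count)(context.Background())

	ctx, cancel := context.WithCancel(context.Background())
	cancel()                  // simulate the shutdown signal
	idle := make(chan string) // an open queue with no messages
	err := newConsumer(idle, count)(ctx)
	return handled, errors.Is(err, context.Canceled)
}

func main() {
	handled, stoppedByShutdown := demoConsumer()
	fmt.Println(handled, stoppedByShutdown)
}
```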

Cron-like scheduled Tasks with Scheduler.WithSchedule

If you need cron-like background tasks with the guarantee that your background tasks are serialised across your application instances, and only one scheduled task can run at a time, then you may use tasker.Scheduler, which solves that for you.

package main

import (
	"context"
	"database/sql"
	"os"

	"go.llib.dev/frameless/adapter/postgresql"
	"go.llib.dev/frameless/pkg/logger"
	"go.llib.dev/frameless/pkg/logging"
	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	ctx := context.Background()

	db, err := sql.Open("driverName", os.Getenv("DATABASE_URL"))
	if err != nil {
		logger.Error(ctx, "error during postgres db opening", logging.ErrField(err))
		os.Exit(1)
	}

	scheduler := tasker.Scheduler{
		LockerFactory: &postgresql.LockerFactory[string]{DB: db},
		Repository:    &postgresql.TaskerScheduleStateRepository{DB: db},
	}

	task1 := scheduler.WithSchedule("my scheduled task", tasker.Monthly{Day: 1}, func(ctx context.Context) error {
		// this task will only run in one instance every month, on the first day.
		return nil
	})

	if err := tasker.Main(ctx, task1); err != nil {
		logger.Error(ctx, "error during the application run", logging.ErrField(err))
		os.Exit(1)
	}
}

Using components as Job with Graceful shutdown support

If your application components signal shutdown through a method call, as http.Server does, then you can use tasker.WithShutdown to combine the entry-point method and the shutdown method into a single tasker.Task. The graceful shutdown has a timeout, and the shutdown context is cancelled once it expires.

srv := http.Server{
	Addr: "localhost:8080",
	Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusTeapot)
	}),
}

httpServerTask := tasker.WithShutdown(
	tasker.IgnoreError(srv.ListenAndServe, http.ErrServerClosed), 
	srv.Shutdown,
)
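
The start/stop pairing can be sketched with the standard library as follows. This is not the implementation of tasker.WithShutdown: withShutdown, fakeServer, and the explicit grace parameter are assumptions made for this illustration, and the fake component merely imitates the blocking behaviour of http.Server.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fakeServer is a hypothetical component shaped like http.Server:
// Serve blocks until Shutdown is called.
type fakeServer struct {
	quit    chan struct{}
	stopped bool
}

func (s *fakeServer) Serve() error { <-s.quit; return nil }

func (s *fakeServer) Shutdown(context.Context) error {
	s.stopped = true
	close(s.quit)
	return nil
}

// withShutdown runs start in the background and, once ctx is cancelled,
// calls stop with a fresh context that expires after the grace period.
func withShutdown(start func() error, stop func(context.Context) error, grace time.Duration) func(context.Context) error {
	return func(ctx context.Context) error {
		started := make(chan error, 1)
		go func() { started <- start() }()

		select {
		case err := <-started:
			return err // start returned on its own
		case <-ctx.Done():
		}

		stopCtx, cancel := context.WithTimeout(context.Background(), grace)
		defer cancel()
		stopErr := stop(stopCtx)
		return errors.Join(stopErr, <-started)
	}
}

// demoShutdown cancels the context up front and checks that the server was stopped.
func demoShutdown() (stopped, cleanExit bool) {
	srv := &fakeServer{quit: make(chan struct{})}
	task := withShutdown(srv.Serve, srv.Shutdown, time.Second)

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the shutdown signal arrives
	err := task(ctx)
	return srv.stopped, err == nil
}

func main() {
	stopped, cleanExit := demoShutdown()
	fmt.Println(stopped, cleanExit)
}
```

The stop context is derived from context.Background() rather than from the cancelled task context, so the component still gets its full grace period to finish in-flight work.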

Notify shutdown signals to tasks

tasker.WithSignalNotify listens for OS shutdown signals and cancels the context of your Task when one arrives. It is most suitable for use in the main function.

// The task will be notified about a shutdown signal as a context cancellation.
task := tasker.WithSignalNotify(MyTask)

ctx := context.Background()
if err := task(ctx); err != nil {
	logger.Error(ctx, err.Error())
}
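
The same idea can be expressed with the standard library's signal.NotifyContext. The withSignalNotify helper below is a hypothetical sketch rather than the library's implementation, and the default signal set (os.Interrupt and syscall.SIGTERM) is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// withSignalNotify wraps a task so that its context is cancelled when one
// of the given signals arrives, or when the parent context is cancelled.
func withSignalNotify(task func(context.Context) error, sigs ...os.Signal) func(context.Context) error {
	if len(sigs) == 0 {
		sigs = []os.Signal{os.Interrupt, syscall.SIGTERM} // assumed defaults
	}
	return func(ctx context.Context) error {
		ctx, stop := signal.NotifyContext(ctx, sigs...)
		defer stop() // restore the default signal behaviour afterwards
		return task(ctx)
	}
}

// demoSignalNotify uses a cancelled parent context as a stand-in for a signal.
func demoSignalNotify() (ranTask, sawCancel bool) {
	wrapped := withSignalNotify(func(ctx context.Context) error {
		ranTask = true
		<-ctx.Done()
		sawCancel = true
		return nil
	})

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_ = wrapped(ctx)
	return ranTask, sawCancel
}

func main() {
	ranTask, sawCancel := demoSignalNotify()
	fmt.Println(ranTask, sawCancel)
}
```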

Running your tasks in main

The most convenient way to run your tasks in your main is by using tasker.Main. It combines concurrent task execution with signal-based shutdown cancellation.

tasker.Main(ctx, task1, task2, task3)

Documentation

Overview

Package tasker provides utilities that keep background task management simple.

Example (SequenceMixedWithConcurrence)
package main

import (
	"context"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	_ = tasker.Sequence(
		tasker.Concurrence(
			func() { /* migration task 1 */ },
			func() { /* migration task 2 */ },
		),
		tasker.Concurrence(
			func() { /* task dependent on migrations */ },
			func() { /* task dependent on migrations */ },
			func() { /* task dependent on migrations */ },
		),
	)(context.Background())
}

Constants

const ErrAlive errorkit.Error = "ErrAlive"

Variables

This section is empty.

Functions

func HTTPServerPortFromENV added in v0.195.0

func HTTPServerPortFromENV(envKeys ...string) httpServerTaskOption

func Main

func Main[TFN genericTask](ctx context.Context, tfns ...TFN) error

Main helps to manage concurrent background Tasks in your main. Each Task will run in its own goroutine. If any Task encounters a failure, the other Tasks will receive a cancellation signal.

Example (WithRunnable)
package main

import (
	"context"
	"os"

	"go.llib.dev/frameless/pkg/logger"
	"go.llib.dev/frameless/pkg/logging"
	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	var (
		ctx  = context.Background()
		task tasker.Runnable
	)
	if err := tasker.Main(ctx, &task); err != nil {
		logger.Fatal(ctx, "error in main", logging.ErrField(err))
		os.Exit(1)
	}
}
Example (WithTask)
package main

import (
	"context"
	"os"

	"go.llib.dev/frameless/pkg/logger"
	"go.llib.dev/frameless/pkg/logging"
	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	var (
		ctx  = context.Background()
		task tasker.Task
	)
	if err := tasker.Main(ctx, task); err != nil {
		logger.Fatal(ctx, "error in main", logging.ErrField(err))
		os.Exit(1)
	}
}

Types

type Daily added in v0.223.0

type Daily struct {
	Hour, Minute int
	Location     *time.Location
}

func (Daily) UntilNext added in v0.223.0

func (i Daily) UntilNext(lastRanAt time.Time) time.Duration

type FireAndForget added in v0.250.0

type FireAndForget struct{}

type Interval

type Interval interface {
	UntilNext(lastRanAt time.Time) time.Duration
}

Interval is a scheduling component that reports how long to wait until the next run, given the time of the last run.

func Every added in v0.223.0

func Every(d time.Duration) Interval

Every returns an Interval whose scheduling frequency is the received time duration.
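
Because Interval is a one-method interface, custom schedules are easy to write. The sketch below redeclares the interface locally so it runs without the library, and implements a fixed-interval schedule with an injectable clock. fixedInterval and demoInterval are hypothetical names, and the rule "wait the remainder of the period, never less than zero" is an assumption, not the documented behaviour of Every.

```go
package main

import (
	"fmt"
	"time"
)

// Interval mirrors the tasker.Interval interface shown above.
type Interval interface {
	UntilNext(lastRanAt time.Time) time.Duration
}

// fixedInterval schedules a run every `every` after the previous run.
// The clock is injectable so that the calculation is deterministic in tests.
type fixedInterval struct {
	every time.Duration
	now   func() time.Time
}

// UntilNext returns the remaining wait, or zero when the run is overdue.
func (i fixedInterval) UntilNext(lastRanAt time.Time) time.Duration {
	wait := lastRanAt.Add(i.every).Sub(i.now())
	if wait < 0 {
		return 0
	}
	return wait
}

// demoInterval evaluates a 10-minute schedule against a frozen clock.
func demoInterval() (pending, overdue string) {
	frozen := time.Date(2024, time.October, 10, 12, 0, 0, 0, time.UTC)
	var every Interval = fixedInterval{
		every: 10 * time.Minute,
		now:   func() time.Time { return frozen },
	}
	return every.UntilNext(frozen.Add(-4 * time.Minute)).String(),
		every.UntilNext(frozen.Add(-15 * time.Minute)).String()
}

func main() {
	pending, overdue := demoInterval()
	fmt.Println(pending, overdue)
}
```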

type Job added in v0.247.0

type Job struct {
	// contains filtered or unexported fields
}

Job is a task that runs in the background. You can create one by using:

  • tasker.Background
  • tasker.JobGroup#Background
  • Or manually starting it with tasker.Job#Start.

Each method allows you to run tasks in the background.

func (*Job) Alive added in v0.247.0

func (j *Job) Alive() bool

func (*Job) Join added in v0.247.0

func (j *Job) Join() error

func (*Job) Start added in v0.247.0

func (j *Job) Start(ctx context.Context, tsk Task) error

func (*Job) Stop added in v0.247.0

func (j *Job) Stop() error

func (*Job) Wait added in v0.250.0

func (j *Job) Wait()

type JobGroup added in v0.247.0

type JobGroup[M Manual | FireAndForget] struct {
	// contains filtered or unexported fields
}

JobGroup is a job manager where you can start background tasks as jobs.

It supports two modes:

  • Manual: allows you to collect the results of the background jobs, and you need to call Join to free up their results. Ideal for job groups where you need to collect their error results.
  • FireAndForget: does things automatically, including collecting finished jobs and freeing their resources. Ideal for background job management where job results do not need to be collected.

func Background added in v0.204.0

func Background[TFN genericTask](ctx context.Context, tasks ...TFN) *JobGroup[Manual]
Example
package main

import (
	"context"

	"go.llib.dev/frameless/pkg/logger"
	"go.llib.dev/frameless/pkg/logging"
	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	var ctx = context.Background()
	var (
		task1 tasker.Task
		task2 tasker.Task
	)
	jobGroup := tasker.Background(ctx,
		task1,
		task2,
	)
	if err := jobGroup.Join(); err != nil {
		logger.Error(ctx, "error in background task", logging.ErrField(err))
	}
}

func (*JobGroup[M]) Alive added in v0.247.0

func (jg *JobGroup[M]) Alive() bool

func (*JobGroup[M]) Background added in v0.247.0

func (jg *JobGroup[M]) Background(ctx context.Context, tsk Task) *Job

func (*JobGroup[M]) Join added in v0.247.0

func (jg *JobGroup[M]) Join() error

func (*JobGroup[M]) Len added in v0.247.0

func (jg *JobGroup[M]) Len() int

func (*JobGroup[M]) Stop added in v0.247.0

func (jg *JobGroup[M]) Stop() error

func (*JobGroup[M]) Wait added in v0.250.0

func (jg *JobGroup[M]) Wait()

type Manual added in v0.250.0

type Manual struct{}

type Monthly added in v0.223.0

type Monthly struct {
	Day, Hour, Minute int
	Location          *time.Location
}

func (Monthly) UntilNext added in v0.223.0

func (i Monthly) UntilNext(lastRanAt time.Time) time.Duration

type Runnable

type Runnable interface{ Run(context.Context) error }

type ScheduleID added in v0.247.0

type ScheduleID string

type ScheduleState added in v0.242.0

type ScheduleState struct {
	ID        ScheduleID `ext:"id"`
	Timestamp time.Time
}

type Scheduler added in v0.223.0

type Scheduler struct {
	Locks  SchedulerLocks
	States ScheduleStateRepository
}

func (Scheduler) WithSchedule added in v0.223.0

func (s Scheduler) WithSchedule(id ScheduleID, interval Interval, job Task) Task
Example
package main

import (
	"context"
	"log"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	scheduler := tasker.Scheduler{}

	task := scheduler.WithSchedule("db maintenance", tasker.Monthly{Day: 1}, func(ctx context.Context) error {
		return nil
	})

	if err := task(context.Background()); err != nil {
		log.Println("ERROR", err.Error())
	}
}

type SchedulerLocks added in v0.246.0

type Task

type Task func(context.Context) error

Task is the basic unit of the tasker package; it represents an executable piece of work.

Task at its core is nothing more than a synchronous function. Working with synchronous functions removes the complexity of thinking about how to run your application. Your components become more stateless and focus on the domain rather than on lifecycle management. This less stateful approach can also make testing your Task easier.

func Concurrence

func Concurrence[TFN genericTask](tfns ...TFN) Task

Concurrence is a construct that allows you to execute a list of Tasks concurrently. If any Task fails with an error, all Tasks will receive a cancellation signal.

Example
package main

import (
	"context"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	err := tasker.Concurrence(
		func(ctx context.Context) error {
			// concurrent task 1
			return nil
		},
		func(ctx context.Context) error {
			// concurrent task 2
			return nil
		},
	).Run(context.Background())
	_ = err
}

func HTTPServerTask

func HTTPServerTask(srv *http.Server, opts ...httpServerTaskOption) Task

HTTPServerTask is designed to encapsulate your `http.Server`, enabling graceful shutdown with the server and presenting it as a Task.

Example
package main

import (
	"context"
	"net/http"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	srv := &http.Server{
		Addr:    "localhost:58080",
		Handler: http.NewServeMux(),
	}

	tasker.HTTPServerTask(srv).
		Run(context.Background())
}
Example (WithMain)
package main

import (
	"context"
	"net/http"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	srv := &http.Server{Handler: http.NewServeMux()}

	tasker.Main(context.Background(),
		tasker.HTTPServerTask(srv,
			tasker.HTTPServerPortFromENV("PORT", "LOYALIFY_WEB_PORT")))
}

func IgnoreError

func IgnoreError[TFN genericTask](tfn TFN, errsToIgnore ...error) Task
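
IgnoreError has no description here, but its use in the README (wrapping srv.ListenAndServe to drop http.ErrServerClosed) suggests the behaviour sketched below. ignoreError is a hypothetical stand-in written with the standard library; matching with errors.Is, so that wrapped errors are ignored too, is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errClosed = errors.New("server closed")

// ignoreError wraps a task and turns any of the listed errors into nil,
// while every other error is returned unchanged.
func ignoreError(task func(context.Context) error, ignored ...error) func(context.Context) error {
	return func(ctx context.Context) error {
		err := task(ctx)
		for _, target := range ignored {
			if errors.Is(err, target) {
				return nil
			}
		}
		return err
	}
}

// demoIgnoreError checks one ignored (wrapped) error and one unrelated error.
func demoIgnoreError() (ignored, passedThrough bool) {
	closing := func(context.Context) error { return fmt.Errorf("serve: %w", errClosed) }
	failing := func(context.Context) error { return errors.New("disk full") }

	ctx := context.Background()
	return ignoreError(closing, errClosed)(ctx) == nil,
		ignoreError(failing, errClosed)(ctx) != nil
}

func main() {
	ignored, passedThrough := demoIgnoreError()
	fmt.Println(ignored, passedThrough)
}
```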

func OnError

func OnError[TFN genericTask, EHFN genericErrorHandler](tfn TFN, ehfn EHFN) Task
Example
package main

import (
	"context"

	"go.llib.dev/frameless/pkg/logger"
	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	withErrorHandling := tasker.OnError(
		func(ctx context.Context) error { return nil },                                            // task
		func(ctx context.Context, err error) error { logger.Error(ctx, err.Error()); return nil }, // error handling
	)
	_ = withErrorHandling
}

func Sequence

func Sequence[TFN genericTask](tfns ...TFN) Task
Example
package main

import (
	"context"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	err := tasker.Sequence(
		func(ctx context.Context) error {
			// first task to execute
			return nil
		},
		func(ctx context.Context) error {
			// follow-up task to execute
			return nil
		},
	).Run(context.Background())
	_ = err
}

func ToTask

func ToTask[TFN genericTask](tfn TFN) Task

func WithNoOverlap added in v0.247.0

func WithNoOverlap(lock guard.NonBlockingLocker, job Task) Task

func WithRepeat

func WithRepeat[TFN genericTask](interval Interval, tfn TFN) Task

WithRepeat will keep repeating a given Task until shutdown is signaled. It is most suitable for Task(s) meant to be short-lived and executed continuously until the shutdown signal.

Example
package main

import (
	"context"
	"log"
	"time"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	task := tasker.WithRepeat(tasker.Every(time.Second), func(ctx context.Context) error {
		// I'm a short-lived task, and prefer to be constantly executed,
		// Repeat will keep repeating me every second until shutdown is signaled.
		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	if err := task(ctx); err != nil {
		log.Println("ERROR", err.Error())
	}
}

func WithShutdown

func WithShutdown[StartFn, StopFn genericTask](start StartFn, stop StopFn) Task

WithShutdown will combine the start and stop/shutdown function into a single Task function. It supports a graceful shutdown period; upon reaching the deadline, it will cancel the context passed to the shutdown function. WithShutdown makes it easy to use components with graceful shutdown support as a Task, such as the http.Server.

tasker.WithShutdown(srv.ListenAndServe, srv.Shutdown)
Example
package main

import (
	"context"
	"log"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	task := tasker.WithShutdown(
		func(ctx context.Context) error {
			// start working
			<-ctx.Done()
			return nil
		},
		func(ctx context.Context) error {
			// graceful stop for work
			<-ctx.Done()
			return nil
		},
	)

	ctx, cancel := context.WithCancel(context.Background())
	// listen to a cancellation signal and then call the cancel func
	// or use ShutdownManager.
	_ = cancel

	if err := task(ctx); err != nil {
		log.Println("ERROR", err.Error())
	}
}
Example (HttpServer)
package main

import (
	"context"
	"log"
	"net/http"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	srv := http.Server{
		Addr: "localhost:8080",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusTeapot)
		}),
	}
	httpServerTask := tasker.WithShutdown(
		tasker.IgnoreError(srv.ListenAndServe, http.ErrServerClosed),
		srv.Shutdown,
	)

	if err := tasker.WithSignalNotify(httpServerTask)(context.Background()); err != nil {
		log.Println("ERROR", err.Error())
	}
}

func WithSignalNotify

func WithSignalNotify[TFN genericTask](tfn TFN, shutdownSignals ...os.Signal) Task
Example
package main

import (
	"context"
	"log"
	"net/http"

	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	srv := http.Server{
		Addr: "localhost:8080",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusTeapot)
		}),
	}

	task := tasker.WithShutdown(srv.ListenAndServe, srv.Shutdown)
	task = tasker.WithSignalNotify(task)

	if err := task(context.Background()); err != nil {
		log.Println("ERROR", err.Error())
	}
}

func (Task) Run

func (fn Task) Run(ctx context.Context) error

Run implements the Runnable interface for Task.
