retrypool (package module) · Published: Jan 29, 2025 · License: MIT · Imports: 15 · Imported by: 3

retrypool


retrypool is a Go library designed for scenarios where you have multiple workers that need to process the same type of tasks but each worker requires exclusive access to a resource. The pool handles task distribution, retries on failures, and lets you dynamically manage workers as your resource availability changes.

The key focus is on managing tasks that must be processed sequentially by workers with unique resources - rather than just parallelizing work across identical workers, retrypool helps you orchestrate task processing when your worker count is determined by how many unique resource instances (credentials, connections, etc.) you have available.

In many workloads, workers aren’t identical: they come with their own credentials, rate limits, or access restrictions. A simple example might be several API keys, each with different usage quotas. Rather than treating all workers as the same, retrypool recognizes these differences. It’s built so that each worker can carry its own conditions, while you just submit tasks and let the pool handle the rest.

If one worker frequently hits a rate limit, retrypool can reroute subsequent tasks to other workers that still have capacity. If another worker uses a premium credential, it can receive specialized tasks without manual effort. Over time, you can add, remove, pause, or resume workers as your environment changes, and the pool will smoothly adjust. Instead of juggling these details yourself, you define the basic rules, and retrypool ensures that tasks reach the most suitable worker—even as each worker’s unique constraints and availability evolve.

Features

  • Generic Task Support: Type-safe task processing with Go generics

  • Flexible Worker Management:

    • Dynamic worker addition/removal
    • Worker pause/resume capabilities
    • Synchronous or asynchronous operation modes
    • Built-in worker lifecycle hooks (OnStart, OnStop, OnPause, OnResume, OnRemove)
  • Task Processing:

    • Configurable retry attempts and delays
    • Multiple retry policies (fixed, exponential backoff)
    • Custom retry conditions
    • Task timeouts (per-attempt and total)
    • Immediate retry and bounce retry options
    • Dead task management with history tracking
  • Additional Features:

    • Rate limiting
    • Maximum queue size limits
    • Dead task limits
    • Request-response pattern support
    • Task state transitions tracking
    • Panic recovery and handling

Installation

go get github.com/davidroman0O/retrypool

Basic Usage

package main

import (
    "context"
    "fmt"
    "time"
    "github.com/davidroman0O/retrypool"
)

// Define your worker
type MyWorker struct {
    ID int
}

// Implement the Worker interface
func (w *MyWorker) Run(ctx context.Context, data int) error {
    fmt.Printf("Worker %d processing: %d\n", w.ID, data)
    return nil
}

func main() {
    ctx := context.Background()
    
    // Create workers
    workers := []retrypool.Worker[int]{
        &MyWorker{},
        &MyWorker{},
    }
    
    // Initialize pool with options
    pool := retrypool.New(ctx, workers,
        retrypool.WithAttempts[int](3),
        retrypool.WithDelay[int](time.Second),
        retrypool.WithRateLimit[int](10.0), // 10 tasks per second
    )

    // Submit tasks
    for i := 0; i < 5; i++ {
        if err := pool.Submit(i); err != nil {
            fmt.Printf("Failed to submit task: %v\n", err)
        }
    }

    // Wait for completion
    pool.WaitWithCallback(ctx, func(queueSize, processingCount, deadTaskCount int) bool {
        return queueSize > 0 || processingCount > 0
    }, time.Second)

    pool.Close()
}

Advanced Configuration

Custom Retry Policy
pool := retrypool.New(ctx, workers,
    retrypool.WithRetryPolicy[int](&retrypool.ExponentialBackoffRetryPolicy[int]{
        BaseDelay: time.Second,
        MaxDelay:  time.Minute,
        MaxJitter: time.Second,
    }),
)
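If neither built-in policy fits, delays can also be computed directly with the documented WithDelayFunc option; a minimal sketch using the DelayFunc signature from the reference below:

// Hedged sketch: linear backoff (500ms, 1s, 1.5s, ...) computed by hand.
pool := retrypool.New(ctx, workers,
    retrypool.WithDelayFunc[int](func(retries int, err error, cfg *retrypool.Config[int]) time.Duration {
        return time.Duration(retries+1) * 500 * time.Millisecond
    }),
)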
Worker Lifecycle Management
// Add a new worker
newWorker := &MyWorker{}
pool.Add(newWorker, nil)

// Pause a worker
pool.Pause(workerID)

// Resume a worker
pool.Resume(workerID)

// Remove a worker
pool.Remove(workerID)
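The worker IDs used above come from the pool itself; the documented Workers() method lists them. A small sketch that pauses and later resumes the first worker:

// List the pool's worker IDs, then pause/resume one of them.
ids, err := pool.Workers()
if err == nil && len(ids) > 0 {
    workerID := ids[0]
    if err := pool.Pause(workerID); err != nil {
        fmt.Printf("pause failed: %v\n", err)
    }
    // ... later ...
    if err := pool.Resume(workerID); err != nil {
        fmt.Printf("resume failed: %v\n", err)
    }
}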
Task Options
// Submit with immediate retry
pool.Submit(data, retrypool.WithTaskImmediateRetry[int]())

// Submit with timeout
pool.Submit(data, retrypool.WithTaskTimeout[int](time.Minute))

// Submit with bounce retry (retry on different workers)
pool.Submit(data, retrypool.WithTaskBounceRetry[int]())
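Task options can also attach per-task callbacks and notifications (WithTaskQueuedCb, WithTaskProcessedNotification, and friends in the reference below). A hedged sketch that blocks until a single task has been processed:

// Wait for one specific task using the documented ProcessedNotification.
processed := retrypool.NewProcessedNotification()
if err := pool.Submit(data,
    retrypool.WithTaskQueuedCb[int](func() { fmt.Println("task queued") }),
    retrypool.WithTaskProcessedNotification[int](processed),
); err != nil {
    fmt.Printf("submit failed: %v\n", err)
} else {
    processed.Wait() // blocks until the task has been processed
}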
Handling Tasks with Unique Credentials and Bounce Retry

Consider a scenario where you have multiple API tokens, each worker holding its own. Some tasks might fail with one token due to rate limiting, transient errors, or resource constraints, but could succeed under a different token’s conditions. By using bounce retry, retrypool automatically routes failed tasks to other workers with different tokens on subsequent attempts. You don’t need to manually shuffle tasks around—the pool takes care of it.

In the example below, two workers stand in for two different credentials. The worker with ID 1 always fails, simulating a credential that never works for these tasks. When a task fails there, retrypool retries it and, thanks to bounce retry, hands it to the other worker on the next attempt. Over time this improves the chance that tasks land in an environment where they can succeed, without any extra logic from you.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/davidroman0O/retrypool"
)

type APIWorker struct {
	ID int
}

func (w *APIWorker) OnStart(ctx context.Context) {
	fmt.Printf("Worker with id %d started\n", w.ID)
}

func (w *APIWorker) Run(ctx context.Context, task *retrypool.RequestResponse[string, error]) error {
	fmt.Printf("Task '%s' started with id %d\n", task.Request, w.ID)

	if w.ID == 1 {
		fmt.Printf("\tTask '%s' failed with id %d\n", task.Request, w.ID)
		return fmt.Errorf("failed with id %d", w.ID)
	}

	task.Complete(nil)
	fmt.Printf("Task '%s' succeeded with id %d\n", task.Request, w.ID)
	return nil
}

func main() {
	ctx := context.Background()

	workers := []retrypool.Worker[*retrypool.RequestResponse[string, error]]{
		&APIWorker{},
		&APIWorker{},
	}

	pool := retrypool.New(ctx, workers,
		retrypool.WithAttempts[*retrypool.RequestResponse[string, error]](5),
	)

	for i := 0; i < 20; i++ {
		req := retrypool.NewRequestResponse[string, error](fmt.Sprintf("Task #%d", i))

		if err := pool.Submit(req, retrypool.WithTaskBounceRetry[*retrypool.RequestResponse[string, error]]()); err != nil {
			fmt.Printf("Failed to submit task: %v\n", err)
		}
	}

	pool.WaitWithCallback(ctx, func(q, p, d int) bool {
		return q > 0 || p > 0
	}, 500*time.Millisecond)

	pool.Close()

	pool.RangeWorkerQueues(func(workerID int, queueSize int64) bool {
		fmt.Println("WorkerID:", workerID, "QueueSize:", queueSize)
		return true
	})

	pool.RangeTaskQueues(func(workerID int, queue retrypool.TaskQueue[*retrypool.RequestResponse[string, error]]) bool {
		fmt.Println("WorkerID:", workerID, "Size:", queue.Length())
		return true
	})
}
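Each RequestResponse can also be awaited on its own through the documented Wait method, rather than only waiting on the pool. A hedged sketch, assuming a pool whose workers call Complete with a string response:

req := retrypool.NewRequestResponse[string, string]("compute something")
// ... submit req to a pool of Worker[*retrypool.RequestResponse[string, string]] ...

// Wait blocks until a worker calls Complete or CompleteWithError.
if resp, err := req.Wait(ctx); err != nil {
	fmt.Println("request failed:", err)
} else {
	fmt.Println("request response:", resp)
}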

Examples

Examples demonstrating the various use cases and features above accompany the project.

Documentation

For detailed API documentation and more examples, see the reference section below.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation


Constants

const RetrypoolMetadataKey = "$retrypool"

Variables

var (
	ErrGroupNotFound     = errors.New("group not found")
	ErrGroupEnded        = errors.New("group already ended")
	ErrGroupFailed       = errors.New("group has failed")
	ErrInvalidGroupID    = errors.New("invalid group ID")
	ErrTaskGroupMismatch = errors.New("task group ID does not match submit group ID")
)
var (
	ErrPoolClosed           = errors.New("pool is closed")
	ErrRateLimitExceeded    = errors.New("rate limit exceeded")
	ErrNoWorkersAvailable   = errors.New("no workers available")
	ErrInvalidWorkerID      = errors.New("invalid worker ID")
	ErrMaxQueueSizeExceeded = errors.New("max queue size exceeded")
	ErrTaskTimeout          = errors.New("task timeout")
)

Predefined errors

Functions

func SetTaskMetadata

func SetTaskMetadata(ctx context.Context, metadata map[string]any) error
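The entry above carries no description; judging only from the signature and the RetrypoolMetadataKey constant, SetTaskMetadata presumably writes metadata for the current task through the context passed to Worker.Run. A heavily hedged sketch under that assumption, with AuditWorker as a hypothetical worker type:

type AuditWorker struct{}

func (w *AuditWorker) Run(ctx context.Context, data int) error {
	// Assumption: ctx is the context handed to Run by the pool, and the metadata set
	// here becomes visible to the OnTaskSuccess/OnTaskFailure callbacks' metadata map.
	if err := retrypool.SetTaskMetadata(ctx, map[string]any{"audited": true}); err != nil {
		return err
	}
	return nil
}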

Types

type BlockingConfig

type BlockingConfig[T any] struct {

	// Task callbacks
	OnTaskSubmitted func(task T)
	OnTaskStarted   func(task T)
	OnTaskCompleted func(task T)
	OnTaskFailed    func(task T, err error)
	// Group/Pool callbacks
	OnGroupCreated   func(groupID any)
	OnGroupCompleted func(groupID any, tasks []T)
	OnGroupFailed    func(groupID any, tasks []T)
	OnGroupRemoved   func(groupID any, tasks []T)
	OnPoolCreated    func(groupID any)
	OnPoolDestroyed  func(groupID any)
	OnWorkerAdded    func(groupID any, workerID int)
	OnWorkerRemoved  func(groupID any, workerID int)
	OnPoolClosed     func()
	// contains filtered or unexported fields
}

BlockingConfig holds basic config - each active group gets its own pool of workers

type BlockingDependentTask

type BlockingDependentTask[GID comparable, TID comparable] interface {
	GetGroupID() GID
	GetTaskID() TID
}

type BlockingMetricsSnapshot

type BlockingMetricsSnapshot[T any, GID comparable] struct {
	TotalTasksSubmitted int64
	TotalTasksProcessed int64
	TotalTasksSucceeded int64
	TotalTasksFailed    int64
	TotalDeadTasks      int64
	Metrics             []GroupBlockingMetricSnapshot[T, GID]
}

type BlockingPool

type BlockingPool[T any, GID comparable, TID comparable] struct {
	// contains filtered or unexported fields
}

BlockingPool manages multiple pools, assigning one to each active group

func NewBlockingPool

func NewBlockingPool[T any, GID comparable, TID comparable](
	ctx context.Context,
	opt ...BlockingPoolOption[T],
) (*BlockingPool[T, GID, TID], error)

NewBlockingPool constructs a BlockingPool with the given options.
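A hedged sketch of constructing and using a BlockingPool, built only from the options and methods documented in this section. BlockingWorker is a hypothetical worker type, the group/task ID choices are illustrative, and BlockingRequestResponse is used as the task type since it satisfies BlockingDependentTask through its GetGroupID/GetTaskID methods:

// BTask keys tasks by a string group ID and an int task ID.
type BTask = *retrypool.BlockingRequestResponse[string, string, string, int]

type BlockingWorker struct{}

func (w *BlockingWorker) Run(ctx context.Context, task BTask) error {
	task.Complete("done") // workers complete the request they receive
	return nil
}

func example(ctx context.Context) error {
	bp, err := retrypool.NewBlockingPool[BTask, string, int](ctx,
		retrypool.WithBlockingWorkerFactory[BTask](func() retrypool.Worker[BTask] {
			return &BlockingWorker{}
		}),
		retrypool.WithBlockingMaxActivePools[BTask](2),
		retrypool.WithBlockingMaxWorkersPerPool[BTask](4),
	)
	if err != nil {
		return err
	}
	defer bp.Close()

	task := retrypool.NewBlockingRequestResponse[string, string, string, int]("payload", "group-a", 1)
	if err := bp.Submit(task); err != nil {
		return err
	}
	return bp.WaitForTask("group-a", 1)
}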

func (*BlockingPool[T, GID, TID]) Close

func (p *BlockingPool[T, GID, TID]) Close() error

Close shuts down all pools

func (*BlockingPool[T, GID, TID]) GetSnapshot

func (p *BlockingPool[T, GID, TID]) GetSnapshot() BlockingMetricsSnapshot[T, GID]

func (*BlockingPool[T, GID, TID]) RangeTaskQueues

func (p *BlockingPool[T, GID, TID]) RangeTaskQueues(f func(workerID int, queue TaskQueue[T]) bool)

func (*BlockingPool[T, GID, TID]) RangeWorkerQueues

func (p *BlockingPool[T, GID, TID]) RangeWorkerQueues(f func(workerID int, queueSize int64) bool)

func (*BlockingPool[T, GID, TID]) RangeWorkers

func (p *BlockingPool[T, GID, TID]) RangeWorkers(f func(workerID int, state WorkerSnapshot[T]) bool)

func (*BlockingPool[T, GID, TID]) SetConcurrentPools

func (p *BlockingPool[T, GID, TID]) SetConcurrentPools(max int)

SetConcurrentPools scales the number of pools that can be active at the same time.

func (*BlockingPool[T, GID, TID]) SetConcurrentWorkers

func (p *BlockingPool[T, GID, TID]) SetConcurrentWorkers(max int)

SetConcurrentWorkers scales the number of workers that can be active at the same time in each pool.

func (*BlockingPool[T, GID, TID]) Submit

func (p *BlockingPool[T, GID, TID]) Submit(data T, opt ...SubmitOption[T]) error

Submit handles task submission, managing group pools as needed

func (*BlockingPool[T, GID, TID]) WaitForTask

func (p *BlockingPool[T, GID, TID]) WaitForTask(groupID GID, taskID TID) error

WaitForTask blocks until the specified task completes

func (*BlockingPool[T, GID, TID]) WaitWithCallback

func (p *BlockingPool[T, GID, TID]) WaitWithCallback(
	ctx context.Context,
	callback func(queueSize, processingCount, deadTaskCount int) bool,
	interval time.Duration,
) error

WaitWithCallback monitors all pool progress

type BlockingPoolOption

type BlockingPoolOption[T any] func(*BlockingConfig[T])

BlockingPoolOption is a functional option for configuring the blocking pool

func WithBlockingGetData

func WithBlockingGetData[T any](cb func(T) interface{}) BlockingPoolOption[T]

func WithBlockingLogger

func WithBlockingLogger[T any](logger Logger) BlockingPoolOption[T]

func WithBlockingMaxActivePools

func WithBlockingMaxActivePools[T any](max int) BlockingPoolOption[T]

func WithBlockingMaxWorkersPerPool

func WithBlockingMaxWorkersPerPool[T any](max int) BlockingPoolOption[T]

func WithBlockingOnGroupCompleted

func WithBlockingOnGroupCompleted[T any](cb func(groupID any, tasks []T)) BlockingPoolOption[T]

func WithBlockingOnGroupCreated

func WithBlockingOnGroupCreated[T any](cb func(groupID any)) BlockingPoolOption[T]

func WithBlockingOnGroupFailed

func WithBlockingOnGroupFailed[T any](cb func(groupID any, tasks []T)) BlockingPoolOption[T]

func WithBlockingOnGroupRemoved

func WithBlockingOnGroupRemoved[T any](cb func(groupID any, tasks []T)) BlockingPoolOption[T]

func WithBlockingOnPoolClosed

func WithBlockingOnPoolClosed[T any](cb func()) BlockingPoolOption[T]

func WithBlockingOnPoolCreated

func WithBlockingOnPoolCreated[T any](cb func(groupID any)) BlockingPoolOption[T]

func WithBlockingOnPoolDestroyed

func WithBlockingOnPoolDestroyed[T any](cb func(groupID any)) BlockingPoolOption[T]

func WithBlockingOnTaskCompleted

func WithBlockingOnTaskCompleted[T any](cb func(task T)) BlockingPoolOption[T]

func WithBlockingOnTaskFailed

func WithBlockingOnTaskFailed[T any](cb func(task T, err error)) BlockingPoolOption[T]

func WithBlockingOnTaskStarted

func WithBlockingOnTaskStarted[T any](cb func(task T)) BlockingPoolOption[T]

func WithBlockingOnTaskSubmitted

func WithBlockingOnTaskSubmitted[T any](cb func(task T)) BlockingPoolOption[T]

func WithBlockingOnWorkerAdded

func WithBlockingOnWorkerAdded[T any](cb func(groupID any, workerID int)) BlockingPoolOption[T]

func WithBlockingOnWorkerRemoved

func WithBlockingOnWorkerRemoved[T any](cb func(groupID any, workerID int)) BlockingPoolOption[T]

func WithBlockingSnapshotHandler

func WithBlockingSnapshotHandler[T any](cb func()) BlockingPoolOption[T]

func WithBlockingWorkerFactory

func WithBlockingWorkerFactory[T any](factory WorkerFactory[T]) BlockingPoolOption[T]

Configuration options

type BlockingRequestResponse

type BlockingRequestResponse[T any, R any, GID comparable, TID comparable] struct {
	// contains filtered or unexported fields
}

BlockingRequestResponse manages the lifecycle of a task request and its response

func NewBlockingRequestResponse

func NewBlockingRequestResponse[T any, R any, GID comparable, TID comparable](request T, gid GID, tid TID) *BlockingRequestResponse[T, R, GID, TID]

NewBlockingRequestResponse creates a new BlockingRequestResponse instance

func (*BlockingRequestResponse[T, R, GID, TID]) Complete

func (rr *BlockingRequestResponse[T, R, GID, TID]) Complete(response R)

Complete safely marks the request as complete with a response

func (*BlockingRequestResponse[T, R, GID, TID]) CompleteWithError

func (rr *BlockingRequestResponse[T, R, GID, TID]) CompleteWithError(err error)

CompleteWithError safely marks the request as complete with an error

func (*BlockingRequestResponse[T, R, GID, TID]) ConsultRequest

func (rr *BlockingRequestResponse[T, R, GID, TID]) ConsultRequest(fn func(T) error) error

Safely consults the request data

func (*BlockingRequestResponse[T, R, GID, TID]) Done

func (rr *BlockingRequestResponse[T, R, GID, TID]) Done() <-chan struct{}

Done returns a channel that's closed when the request is complete

func (*BlockingRequestResponse[T, R, GID, TID]) Err

func (rr *BlockingRequestResponse[T, R, GID, TID]) Err() error

Err returns any error that occurred during the request

func (BlockingRequestResponse[T, R, GID, TID]) GetGroupID

func (rr BlockingRequestResponse[T, R, GID, TID]) GetGroupID() GID

func (BlockingRequestResponse[T, R, GID, TID]) GetTaskID

func (rr BlockingRequestResponse[T, R, GID, TID]) GetTaskID() TID

func (*BlockingRequestResponse[T, R, GID, TID]) Wait

func (rr *BlockingRequestResponse[T, R, GID, TID]) Wait(ctx context.Context) (R, error)

Wait waits for the request to complete and returns the response and any error

type CircularQueue

type CircularQueue[T any] struct {
	// contains filtered or unexported fields
}

CircularQueue is a fixed-size circular queue

func (*CircularQueue[T]) Clear

func (q *CircularQueue[T]) Clear()

func (*CircularQueue[T]) Dequeue

func (q *CircularQueue[T]) Dequeue() (*Task[T], bool)

func (*CircularQueue[T]) Drain

func (q *CircularQueue[T]) Drain() []*Task[T]

func (*CircularQueue[T]) Enqueue

func (q *CircularQueue[T]) Enqueue(task *Task[T])

func (*CircularQueue[T]) Length

func (q *CircularQueue[T]) Length() int

func (*CircularQueue[T]) PriorityEnqueue

func (q *CircularQueue[T]) PriorityEnqueue(task *Task[T]) error

type Config

type Config[T any] struct {
	// contains filtered or unexported fields
}

Config holds configurations for the pool

type DeadTask

type DeadTask[T any] struct {
	Data          T
	Retries       int
	TotalDuration time.Duration
	Errors        []error
	Reason        string
	StateHistory  []*TaskStateTransition[T]
}

DeadTask represents a task that has failed permanently

type DelayFunc

type DelayFunc[T any] func(retries int, err error, config *Config[T]) time.Duration

DelayFunc defines a function to calculate delay before retrying a task

type DependencyGraph

type DependencyGraph[TID comparable] struct {
	Nodes map[TID]*Node[TID]
	Order []TID // Topologically sorted execution order
}

DependencyGraph represents the complete dependency structure for a task group

type DependencyMode

type DependencyMode int

DependencyMode defines how dependencies are interpreted

const (
	// ForwardMode means tasks depend on previous tasks (A depends on B means A waits for B)
	ForwardMode DependencyMode = iota
	// ReverseMode means tasks depend on subsequent tasks (A depends on B means B waits for A)
	ReverseMode
)

type ExponentialBackoffRetryPolicy

type ExponentialBackoffRetryPolicy[T any] struct {
	BaseDelay time.Duration
	MaxDelay  time.Duration
	MaxJitter time.Duration
}

ExponentialBackoffRetryPolicy implements exponential backoff with jitter

func (ExponentialBackoffRetryPolicy[T]) ComputeDelay

func (p ExponentialBackoffRetryPolicy[T]) ComputeDelay(retries int, err error, config *Config[T]) time.Duration

ComputeDelay computes the delay using exponential backoff with jitter

type FixedDelayRetryPolicy

type FixedDelayRetryPolicy[T any] struct {
	Delay     time.Duration
	MaxJitter time.Duration
}

FixedDelayRetryPolicy implements a fixed delay between retries

func (FixedDelayRetryPolicy[T]) ComputeDelay

func (p FixedDelayRetryPolicy[T]) ComputeDelay(retries int, err error, config *Config[T]) time.Duration

ComputeDelay computes the delay using fixed delay with jitter

type GroupBlockingMetricSnapshot

type GroupBlockingMetricSnapshot[T any, GID comparable] struct {
	GroupID GID
	MetricsSnapshot[T]
}

type GroupMetricSnapshot

type GroupMetricSnapshot[T any, GID comparable] struct {
	GroupID         GID
	PoolID          uint
	HasPool         bool
	IsPending       bool
	TasksPending    int
	MetricsSnapshot MetricsSnapshot[T]
}

type GroupMetricsSnapshot

type GroupMetricsSnapshot[T any, GID comparable] struct {
	// These totals are GLOBAL and never reset
	TotalTasksSubmitted int64
	TotalTasksProcessed int64
	TotalTasksSucceeded int64
	TotalTasksFailed    int64
	TotalDeadTasks      int64

	TotalPools        int
	ActivePools       int
	TotalPendingTasks int
	GroupsWithPending int

	Pools        []PoolMetricsSnapshot[T]
	PendingTasks []PendingTasksSnapshot[GID]
	Metrics      []GroupMetricSnapshot[T, GID]
}

func (GroupMetricsSnapshot[T, GID]) Clone

func (m GroupMetricsSnapshot[T, GID]) Clone() GroupMetricsSnapshot[T, GID]

Clone creates a deep copy of the snapshot

type GroupPool

type GroupPool[T GroupTask[GID], GID comparable] struct {
	// contains filtered or unexported fields
}

func NewGroupPool

func NewGroupPool[T GroupTask[GID], GID comparable](
	ctx context.Context,
	opts ...GroupPoolOption[T, GID],
) (*GroupPool[T, GID], error)

NewGroupPool creates a new GroupPool instance with provided options
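A hedged sketch of wiring a GroupPool together with the options and methods documented here. ReportTask and ReportWorker are hypothetical illustration types; the task type satisfies GroupTask[string] through GetGroupID:

type ReportTask struct {
	Group string
	Line  string
}

func (t ReportTask) GetGroupID() string { return t.Group }

type ReportWorker struct{}

func (w *ReportWorker) Run(ctx context.Context, t ReportTask) error {
	fmt.Printf("group %s: %s\n", t.Group, t.Line)
	return nil
}

func runGroups(ctx context.Context) error {
	gp, err := retrypool.NewGroupPool[ReportTask, string](ctx,
		retrypool.WithGroupPoolWorkerFactory[ReportTask, string](func() retrypool.Worker[ReportTask] {
			return &ReportWorker{}
		}),
		retrypool.WithGroupPoolMaxActivePools[ReportTask, string](2),
	)
	if err != nil {
		return err
	}
	defer gp.Close()

	if err := gp.Submit("invoices", ReportTask{Group: "invoices", Line: "row 1"}); err != nil {
		return err
	}
	if err := gp.EndGroup("invoices"); err != nil { // no more tasks for this group
		return err
	}
	return gp.WaitGroup(ctx, "invoices")
}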

func (*GroupPool[T, GID]) Close

func (gp *GroupPool[T, GID]) Close() error

Close gracefully shuts down all pools and cleans up resources

func (*GroupPool[T, GID]) EndGroup

func (gp *GroupPool[T, GID]) EndGroup(gid GID) error

EndGroup signals no more tasks for the group, but allows existing tasks to complete

func (*GroupPool[T, GID]) GetSnapshot

func (gp *GroupPool[T, GID]) GetSnapshot() GroupMetricsSnapshot[T, GID]

GetSnapshot returns current metrics snapshot

func (*GroupPool[T, GID]) SetMaxActivePools

func (gp *GroupPool[T, GID]) SetMaxActivePools(max int)

func (*GroupPool[T, GID]) SetMaxWorkersPerPool

func (gp *GroupPool[T, GID]) SetMaxWorkersPerPool(max int)

func (*GroupPool[T, GID]) ShouldWaitAll

func (gp *GroupPool[T, GID]) ShouldWaitAll() bool

func (*GroupPool[T, GID]) Submit

func (gp *GroupPool[T, GID]) Submit(gid GID, task T, opts ...GroupTaskOption[T, GID]) error

Submit task to a group

func (*GroupPool[T, GID]) WaitAll

func (gp *GroupPool[T, GID]) WaitAll(ctx context.Context) error

func (*GroupPool[T, GID]) WaitGroup

func (gp *GroupPool[T, GID]) WaitGroup(ctx context.Context, gid GID) error

WaitGroup waits for all tasks in a group to complete

type GroupPoolConfig

type GroupPoolConfig[T GroupTask[GID], GID comparable] struct {
	Logger            Logger
	WorkerFactory     WorkerFactory[T]
	MaxActivePools    int
	MaxWorkersPerPool int
	UseFreeWorkerOnly bool
	GroupMustSucceed  bool // By default a group may have dead tasks/failures; if true, a failure discards the group's pending tasks

	OnTaskSuccess func(gid GID, poolID uint, data T, metadata map[string]any)
	OnTaskFailure func(gid GID, poolID uint, data T, metadata map[string]any, err error) TaskAction

	// GroupPool-specific callback invoked after a task has been handled by either OnTaskSuccess or OnTaskFailure
	OnTaskExecuted func(gid GID, poolID uint, data T, metadata map[string]any, err error)

	OnSnapshot func(snapshot GroupMetricsSnapshot[T, GID])

	OnGroupStarts func(gid GID, poolID uint)
	OnGroupEnds   func(gid GID, poolID uint)
	OnGroupFails  func(gid GID, poolID uint, pendingTasks []T) // Only if GroupMustSucceed is enabled
	// contains filtered or unexported fields
}

GroupPoolConfig holds configuration options for the GroupPool

type GroupPoolOption

type GroupPoolOption[T GroupTask[GID], GID comparable] func(*GroupPoolConfig[T, GID])

GroupPoolOption is a functional option for configuring a GroupPool

func WithGroupPoolGroupMustSucceed

func WithGroupPoolGroupMustSucceed[T GroupTask[GID], GID comparable](must bool) GroupPoolOption[T, GID]

func WithGroupPoolLogger

func WithGroupPoolLogger[T GroupTask[GID], GID comparable](logger Logger) GroupPoolOption[T, GID]

func WithGroupPoolMaxActivePools

func WithGroupPoolMaxActivePools[T GroupTask[GID], GID comparable](max int) GroupPoolOption[T, GID]

func WithGroupPoolMaxWorkersPerPool

func WithGroupPoolMaxWorkersPerPool[T GroupTask[GID], GID comparable](max int) GroupPoolOption[T, GID]

func WithGroupPoolMetadata

func WithGroupPoolMetadata[T GroupTask[GID], GID comparable](m *Metadata) GroupPoolOption[T, GID]

func WithGroupPoolOnGroupEnds

func WithGroupPoolOnGroupEnds[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint)) GroupPoolOption[T, GID]

func WithGroupPoolOnGroupFails

func WithGroupPoolOnGroupFails[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint, pendingTasks []T)) GroupPoolOption[T, GID]

func WithGroupPoolOnGroupStarts

func WithGroupPoolOnGroupStarts[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint)) GroupPoolOption[T, GID]

func WithGroupPoolOnSnapshot

func WithGroupPoolOnSnapshot[T GroupTask[GID], GID comparable](cb func(snapshot GroupMetricsSnapshot[T, GID])) GroupPoolOption[T, GID]

func WithGroupPoolOnTaskExecuted

func WithGroupPoolOnTaskExecuted[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint, data T, metadata map[string]any, err error)) GroupPoolOption[T, GID]

func WithGroupPoolOnTaskFailure

func WithGroupPoolOnTaskFailure[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint, data T, metadata map[string]any, err error) TaskAction) GroupPoolOption[T, GID]

func WithGroupPoolOnTaskSuccess

func WithGroupPoolOnTaskSuccess[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint, data T, metadata map[string]any)) GroupPoolOption[T, GID]

func WithGroupPoolUseFreeWorkerOnly

func WithGroupPoolUseFreeWorkerOnly[T GroupTask[GID], GID comparable]() GroupPoolOption[T, GID]

func WithGroupPoolWorkerFactory

func WithGroupPoolWorkerFactory[T GroupTask[GID], GID comparable](wf WorkerFactory[T]) GroupPoolOption[T, GID]

type GroupRequestResponse

type GroupRequestResponse[T any, R any, GID comparable] struct {
	// contains filtered or unexported fields
}

GroupRequestResponse manages a request/response pair for tasks in a group.

func NewGroupRequestResponse

func NewGroupRequestResponse[T any, R any, GID comparable](req T, gid GID) *GroupRequestResponse[T, R, GID]

func (*GroupRequestResponse[T, R, GID]) Complete

func (rr *GroupRequestResponse[T, R, GID]) Complete(resp R)

func (*GroupRequestResponse[T, R, GID]) CompleteWithError

func (rr *GroupRequestResponse[T, R, GID]) CompleteWithError(err error)

func (*GroupRequestResponse[T, R, GID]) ConsultRequest

func (rr *GroupRequestResponse[T, R, GID]) ConsultRequest(fn func(T) error) error

func (*GroupRequestResponse[T, R, GID]) Done

func (rr *GroupRequestResponse[T, R, GID]) Done() <-chan struct{}

func (*GroupRequestResponse[T, R, GID]) Err

func (rr *GroupRequestResponse[T, R, GID]) Err() error

func (GroupRequestResponse[T, R, GID]) GetGroupID

func (rr GroupRequestResponse[T, R, GID]) GetGroupID() GID

func (*GroupRequestResponse[T, R, GID]) Wait

func (rr *GroupRequestResponse[T, R, GID]) Wait(ctx context.Context) (R, error)

type GroupTask

type GroupTask[GID comparable] interface {
	GetGroupID() GID
}

GroupTask is the interface for tasks that have a group ID

type GroupTaskOption

type GroupTaskOption[T any, GID comparable] func(*groupSubmitConfig)

func WithTaskGroupMetadata

func WithTaskGroupMetadata[T any, GID comparable](metadata map[string]any) GroupTaskOption[T, GID]

func WithTaskGroupProcessedNotification

func WithTaskGroupProcessedNotification[T any, GID comparable](notification *ProcessedNotification) GroupTaskOption[T, GID]

func WithTaskGroupQueueNotification

func WithTaskGroupQueueNotification[T any, GID comparable](notification *QueuedNotification) GroupTaskOption[T, GID]

type GrowingCircularQueue

type GrowingCircularQueue[T any] struct {
	// contains filtered or unexported fields
}

GrowingCircularQueue is a circular queue that grows when full

func (*GrowingCircularQueue[T]) Clear

func (q *GrowingCircularQueue[T]) Clear()

func (*GrowingCircularQueue[T]) Dequeue

func (q *GrowingCircularQueue[T]) Dequeue() (*Task[T], bool)

func (*GrowingCircularQueue[T]) Drain

func (q *GrowingCircularQueue[T]) Drain() []*Task[T]

func (*GrowingCircularQueue[T]) Enqueue

func (q *GrowingCircularQueue[T]) Enqueue(task *Task[T])

func (*GrowingCircularQueue[T]) Length

func (q *GrowingCircularQueue[T]) Length() int

func (*GrowingCircularQueue[T]) PriorityEnqueue

func (q *GrowingCircularQueue[T]) PriorityEnqueue(task *Task[T]) error

type GrowingRingBufferQueue

type GrowingRingBufferQueue[T any] struct {
	// contains filtered or unexported fields
}

GrowingRingBufferQueue is a ring buffer-based queue that grows when full

func (*GrowingRingBufferQueue[T]) Clear

func (q *GrowingRingBufferQueue[T]) Clear()

func (*GrowingRingBufferQueue[T]) Dequeue

func (q *GrowingRingBufferQueue[T]) Dequeue() (*Task[T], bool)

func (*GrowingRingBufferQueue[T]) Drain

func (q *GrowingRingBufferQueue[T]) Drain() []*Task[T]

func (*GrowingRingBufferQueue[T]) Enqueue

func (q *GrowingRingBufferQueue[T]) Enqueue(task *Task[T])

func (*GrowingRingBufferQueue[T]) Length

func (q *GrowingRingBufferQueue[T]) Length() int

func (*GrowingRingBufferQueue[T]) PriorityEnqueue

func (q *GrowingRingBufferQueue[T]) PriorityEnqueue(task *Task[T]) error

type IndependentConfig

type IndependentConfig[T any] struct {

	// Task callbacks
	OnTaskSubmitted func(task T)
	OnTaskStarted   func(task T)
	OnTaskCompleted func(task T)
	OnTaskFailed    func(task T, err error)
	// Group callbacks
	OnGroupCreated   func(groupID any)
	OnGroupCompleted func(groupID any)
	OnGroupRemoved   func(groupID any, tasks []T)
	// Pool events
	OnWorkerAdded   func(workerID int)
	OnWorkerRemoved func(workerID int)
	OnPoolClosed    func()
	// contains filtered or unexported fields
}

type IndependentDependentTask

type IndependentDependentTask[GID comparable, TID comparable] interface {
	GetDependencies() []TID
	GetGroupID() GID
	GetTaskID() TID
}

type IndependentPool

type IndependentPool[T any, GID comparable, TID comparable] struct {
	// contains filtered or unexported fields
}

func NewIndependentPool

func NewIndependentPool[T any, GID comparable, TID comparable](
	ctx context.Context,
	opt ...IndependentPoolOption[T],
) (*IndependentPool[T, GID, TID], error)

func (*IndependentPool[T, GID, TID]) Close

func (p *IndependentPool[T, GID, TID]) Close() error

Close gracefully shuts down the pool

func (*IndependentPool[T, GID, TID]) GetGroupStatus

func (p *IndependentPool[T, GID, TID]) GetGroupStatus(groupID GID) (completed, total int, err error)

GetGroupStatus returns the status of a task group

func (*IndependentPool[T, GID, TID]) PullDeadTask

func (i *IndependentPool[T, GID, TID]) PullDeadTask(id int) (*DeadTask[T], error)

func (*IndependentPool[T, GID, TID]) Submit

func (p *IndependentPool[T, GID, TID]) Submit(tasks []T) error

Submit submits a complete group of tasks with dependencies

func (*IndependentPool[T, GID, TID]) WaitForGroup

func (p *IndependentPool[T, GID, TID]) WaitForGroup(ctx context.Context, groupID GID) error

WaitForGroup waits for all tasks in a group to complete
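A hedged sketch of a dependency-aware group. Step and StepWorker are hypothetical types; Step implements the documented IndependentDependentTask interface through GetGroupID, GetTaskID, and GetDependencies:

type Step struct {
	Group string
	ID    int
	Deps  []int
}

func (s Step) GetGroupID() string     { return s.Group }
func (s Step) GetTaskID() int         { return s.ID }
func (s Step) GetDependencies() []int { return s.Deps }

type StepWorker struct{}

func (w *StepWorker) Run(ctx context.Context, s Step) error {
	fmt.Printf("running step %d of group %s\n", s.ID, s.Group)
	return nil
}

func runPipeline(ctx context.Context) error {
	ip, err := retrypool.NewIndependentPool[Step, string, int](ctx,
		retrypool.WithIndependentWorkerFactory[Step](func() retrypool.Worker[Step] {
			return &StepWorker{}
		}),
	)
	if err != nil {
		return err
	}
	defer ip.Close()

	// Step 2 depends on step 1 (ForwardMode semantics: 2 waits for 1).
	steps := []Step{
		{Group: "build", ID: 1},
		{Group: "build", ID: 2, Deps: []int{1}},
	}
	if err := ip.Submit(steps); err != nil {
		return err
	}
	return ip.WaitForGroup(ctx, "build")
}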

func (*IndependentPool[T, GID, TID]) WaitWithCallback

func (p *IndependentPool[T, GID, TID]) WaitWithCallback(
	ctx context.Context,
	callback func(queueSize, processingCount, deadTaskCount int) bool,
	interval time.Duration,
) error

WaitWithCallback waits for the pool to complete while calling a callback function

type IndependentPoolOption

type IndependentPoolOption[T any] func(*IndependentConfig[T])

func WithIndependentDependencyMode

func WithIndependentDependencyMode[T any](mode DependencyMode) IndependentPoolOption[T]

func WithIndependentOnDeadTask

func WithIndependentOnDeadTask[T any](handler func(deadTaskIndex int)) IndependentPoolOption[T]

func WithIndependentOnGroupCompleted

func WithIndependentOnGroupCompleted[T any](cb func(groupID any)) IndependentPoolOption[T]

func WithIndependentOnGroupCreated

func WithIndependentOnGroupCreated[T any](cb func(groupID any)) IndependentPoolOption[T]

func WithIndependentOnGroupRemoved

func WithIndependentOnGroupRemoved[T any](cb func(groupID any, tasks []T)) IndependentPoolOption[T]

func WithIndependentOnPoolClosed

func WithIndependentOnPoolClosed[T any](cb func()) IndependentPoolOption[T]

func WithIndependentOnTaskCompleted

func WithIndependentOnTaskCompleted[T any](cb func(task T)) IndependentPoolOption[T]

func WithIndependentOnTaskFailed

func WithIndependentOnTaskFailed[T any](cb func(task T, err error)) IndependentPoolOption[T]

func WithIndependentOnTaskStarted

func WithIndependentOnTaskStarted[T any](cb func(task T)) IndependentPoolOption[T]

func WithIndependentOnTaskSubmitted

func WithIndependentOnTaskSubmitted[T any](cb func(task T)) IndependentPoolOption[T]

func WithIndependentOnWorkerAdded

func WithIndependentOnWorkerAdded[T any](cb func(workerID int)) IndependentPoolOption[T]

func WithIndependentOnWorkerRemoved

func WithIndependentOnWorkerRemoved[T any](cb func(workerID int)) IndependentPoolOption[T]

func WithIndependentWorkerFactory

func WithIndependentWorkerFactory[T any](factory WorkerFactory[T]) IndependentPoolOption[T]

func WithIndependentWorkerLimits

func WithIndependentWorkerLimits[T any](min, max int) IndependentPoolOption[T]

type LogFormat

type LogFormat string
const (
	TextFormat LogFormat = "text"
	JSONFormat LogFormat = "json"
)

type LogLevel

type LogLevel = slog.Level

type LogOption

type LogOption func(*loggerConfig)

LogOption defines functional options for logger configuration

func WithFormat

func WithFormat(format LogFormat) LogOption

WithFormat sets the log format

func WithOutput

func WithOutput(output *os.File) LogOption

WithOutput sets a custom output file

type Logger

type Logger interface {
	Debug(ctx context.Context, msg string, keysAndValues ...any)
	Info(ctx context.Context, msg string, keysAndValues ...any)
	Warn(ctx context.Context, msg string, keysAndValues ...any)
	Error(ctx context.Context, msg string, keysAndValues ...any)
	WithFields(fields map[string]any) Logger
	Enable()
	Disable()
}

Logger provides structured logging with source tracking

func NewLogger

func NewLogger(level LogLevel, opts ...LogOption) Logger

NewLogger creates a new logger with the given options
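Since LogLevel is an alias for slog.Level, the standard library's level constants can be passed directly (this sketch assumes log/slog is imported); a debug-level JSON logger attached to a pool via the documented WithLogger option:

logger := retrypool.NewLogger(slog.LevelDebug, retrypool.WithFormat(retrypool.JSONFormat))

pool := retrypool.New(ctx, workers,
	retrypool.WithLogger[int](logger),
)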

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

func NewMetadata

func NewMetadata() *Metadata

func (*Metadata) Clone

func (m *Metadata) Clone() *Metadata

func (*Metadata) CloneStore

func (m *Metadata) CloneStore() map[string]any

func (*Metadata) Get

func (m *Metadata) Get(key string) any

func (*Metadata) Merge

func (m *Metadata) Merge(other *Metadata)

func (*Metadata) Set

func (m *Metadata) Set(key string, value any)

type Metrics

type Metrics struct {
	TasksSubmitted atomic.Int64
	TasksProcessed atomic.Int64
	TasksSucceeded atomic.Int64
	TasksFailed    atomic.Int64
	DeadTasks      atomic.Int64
}

Metrics holds metrics for the pool

type MetricsSnapshot

type MetricsSnapshot[T any] struct {
	TasksSubmitted int64
	TasksProcessed int64
	TasksSucceeded int64
	TasksFailed    int64
	DeadTasks      int64
	TaskQueues     map[int]int
	Workers        map[int]WorkerSnapshot[T]
}

func (MetricsSnapshot[T]) Clone

func (m MetricsSnapshot[T]) Clone() MetricsSnapshot[T]

type NoWorkerPolicy

type NoWorkerPolicy int

NoWorkerPolicy defines the behavior when no workers are available

const (
	NoWorkerPolicyReject         NoWorkerPolicy = iota // Default behavior: Reject the task submission with an error
	NoWorkerPolicyAddToDeadTasks                       // Add the task to dead tasks
)
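For example, to send submissions made while no workers are available to the dead-task list instead of rejecting them:

pool := retrypool.New(ctx, workers,
	retrypool.WithNoWorkerPolicy[int](retrypool.NoWorkerPolicyAddToDeadTasks),
)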

type Node

type Node[TID comparable] struct {
	ID           TID
	Dependencies []TID
	Dependents   []TID
	Visited      bool
	InProgress   bool // Used for cycle detection
}

type Option

type Option[T any] func(*Pool[T])

Option type for configuring the Pool

func WithAttempts

func WithAttempts[T any](attempts int) Option[T]

WithAttempts sets the maximum number of attempts

func WithDeadTasksLimit

func WithDeadTasksLimit[T any](limit int) Option[T]

WithDeadTasksLimit sets the limit for dead tasks

func WithDelay

func WithDelay[T any](delay time.Duration) Option[T]

WithDelay sets the delay between retries

func WithDelayFunc

func WithDelayFunc[T any](delayFunc DelayFunc[T]) Option[T]

WithDelayFunc sets a custom function to determine the delay between retries

func WithIfRetry

func WithIfRetry[T any](retryIf func(err error) bool) Option[T]

WithIfRetry sets the function to determine if an error is retryable

func WithLogger

func WithLogger[T any](logger Logger) Option[T]

WithLogger sets the logger for the pool and all its components

func WithLoopTicker

func WithLoopTicker[T any](d time.Duration) Option[T]

func WithMaxDelay

func WithMaxDelay[T any](maxDelay time.Duration) Option[T]

WithMaxDelay sets the maximum delay between retries

func WithMaxJitter

func WithMaxJitter[T any](maxJitter time.Duration) Option[T]

WithMaxJitter sets the maximum jitter for delay between retries

func WithMaxQueueSize

func WithMaxQueueSize[T any](size int) Option[T]

WithMaxQueueSize sets the maximum queue size for the pool

func WithMetadata

func WithMetadata[T any](metadata *Metadata) Option[T]

func WithNoWorkerPolicy

func WithNoWorkerPolicy[T any](policy NoWorkerPolicy) Option[T]

WithNoWorkerPolicy sets the policy when no workers are available

func WithOnDeadTask

func WithOnDeadTask[T any](handler func(deadTaskIndex int)) Option[T]

WithOnDeadTask sets a callback when a task is added to dead tasks
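A hedged sketch combining the dead-task options with the inspection methods documented on Pool:

pool := retrypool.New(ctx, workers,
	retrypool.WithAttempts[int](3),
	retrypool.WithDeadTasksLimit[int](100), // cap how many dead tasks are retained
	retrypool.WithOnDeadTask[int](func(deadTaskIndex int) {
		fmt.Println("task became dead at index", deadTaskIndex)
	}),
)

// Later, inspect the permanently failed tasks.
pool.RangeDeadTasks(func(dt *retrypool.DeadTask[int]) bool {
	fmt.Printf("data=%v retries=%d reason=%q errors=%d\n", dt.Data, dt.Retries, dt.Reason, len(dt.Errors))
	return true // continue iterating
})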

func WithOnPanic

func WithOnPanic[T any](handler func(recovery interface{}, stackTrace string)) Option[T]

WithOnPanic sets a custom panic handler for the pool

func WithOnTaskAttempt

func WithOnTaskAttempt[T any](handler func(task *Task[T], workerID int)) Option[T]

WithOnTaskAttempt sets a callback that is called whenever a worker attempts a task

func WithOnTaskFailure

func WithOnTaskFailure[T any](handler func(data T, metadata map[string]any, err error) TaskAction) Option[T]

WithOnTaskFailure sets a callback for task failure

func WithOnTaskSuccess

func WithOnTaskSuccess[T any](handler func(data T, metadata map[string]any)) Option[T]

WithOnTaskSuccess sets a callback for successful task completion

func WithOnWorkerPanic

func WithOnWorkerPanic[T any](handler func(workerID int, recovery interface{}, stackTrace string)) Option[T]

WithOnWorkerPanic sets a custom panic handler for workers

func WithRateLimit

func WithRateLimit[T any](rps float64) Option[T]

WithRateLimit sets the rate limit for task submissions. In async mode, the first burst of tasks (up to 2 * number of workers) may be processed immediately before the rate limit takes full effect. This provides faster startup while maintaining the desired steady-state rate.

func WithRetryPolicy

func WithRetryPolicy[T any](policy RetryPolicy[T]) Option[T]

WithRetryPolicy sets a custom retry policy for the pool

func WithRoundRobinDistribution

func WithRoundRobinDistribution[T any]() Option[T]

WithRoundRobinDistribution sets the task distribution to round-robin

func WithSnapshotCallback

func WithSnapshotCallback[T any](callback func(MetricsSnapshot[T])) Option[T]

WithSnapshotCallback sets a callback for snapshot updates

func WithSnapshotInterval

func WithSnapshotInterval[T any](interval time.Duration) Option[T]

WithSnapshotInterval sets the interval for periodic snapshots

func WithSnapshots

func WithSnapshots[T any]() Option[T]

WithSnapshots enables snapshot collection for the pool

func WithSynchronousMode

func WithSynchronousMode[T any]() Option[T]

WithSynchronousMode sets the pool to synchronous mode

func WithTaskQueueType

func WithTaskQueueType[T any](queueType TaskQueueType) Option[T]

WithTaskQueueType sets the type of task queue
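For instance, to back every worker queue with a growing ring buffer:

pool := retrypool.New(ctx, workers,
	retrypool.WithTaskQueueType[int](retrypool.TaskQueueTypeGrowingRingBuffer),
)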

type PendingTasksSnapshot

type PendingTasksSnapshot[GID comparable] struct {
	GroupID      GID
	TasksPending int
}

type Pool

type Pool[T any] struct {
	// contains filtered or unexported fields
}

Pool manages a set of workers and tasks

func New

func New[T any](ctx context.Context, workers []Worker[T], options ...Option[T]) *Pool[T]

New initializes the Pool with given workers and options

func (*Pool[T]) Add

func (p *Pool[T]) Add(worker Worker[T], queue TaskQueue[T]) error

Add adds a new worker to the pool

func (*Pool[T]) Close

func (p *Pool[T]) Close() error

Close gracefully shuts down the pool

func (*Pool[T]) DeadTaskCount

func (p *Pool[T]) DeadTaskCount() int64

DeadTaskCount returns the number of dead tasks

func (*Pool[T]) GetFreeWorkers

func (p *Pool[T]) GetFreeWorkers() []int

GetFreeWorkers returns a list of worker IDs that have no tasks in their queue

func (*Pool[T]) GetSnapshot

func (p *Pool[T]) GetSnapshot() MetricsSnapshot[T]

func (*Pool[T]) GetWorkerQueueSize

func (p *Pool[T]) GetWorkerQueueSize(workerID int) int64

func (*Pool[T]) IsAsync

func (p *Pool[T]) IsAsync() bool

func (*Pool[T]) IsRoundRobin

func (p *Pool[T]) IsRoundRobin() bool

func (*Pool[T]) NewTaskQueue

func (p *Pool[T]) NewTaskQueue(queueType TaskQueueType) TaskQueue[T]

NewTaskQueue creates a new task queue of the specified type

func (*Pool[T]) Pause

func (p *Pool[T]) Pause(id int) error

Pause pauses a worker

func (*Pool[T]) ProcessingCount

func (p *Pool[T]) ProcessingCount() int64

ProcessingCount returns the number of tasks currently being processed

func (*Pool[T]) PullDeadTask

func (p *Pool[T]) PullDeadTask(idx int) (*DeadTask[T], error)

PullDeadTask removes and returns a dead task from the pool

func (*Pool[T]) PullRangeDeadTasks

func (p *Pool[T]) PullRangeDeadTasks(from int, to int) ([]*DeadTask[T], error)

PullRangeDeadTasks removes and returns a range of dead tasks from the pool

func (*Pool[T]) QueueSize

func (p *Pool[T]) QueueSize() int64

QueueSize returns the total number of tasks in the queue

func (*Pool[T]) RangeDeadTasks

func (p *Pool[T]) RangeDeadTasks(fn func(*DeadTask[T]) bool)

RangeDeadTasks iterates over all dead tasks

func (*Pool[T]) RangeTaskQueues

func (p *Pool[T]) RangeTaskQueues(f func(workerID int, queue TaskQueue[T]) bool)

func (*Pool[T]) RangeWorkerQueues

func (p *Pool[T]) RangeWorkerQueues(f func(workerID int, queueSize int64) bool)

func (*Pool[T]) RangeWorkers

func (p *Pool[T]) RangeWorkers(f func(workerID int, state WorkerSnapshot[T]) bool)

func (*Pool[T]) RedistributeAllTasks

func (p *Pool[T]) RedistributeAllTasks()

Redistribute all tasks among workers

func (*Pool[T]) Remove

func (p *Pool[T]) Remove(id int) error

Remove removes a worker from the pool

func (*Pool[T]) Resume

func (p *Pool[T]) Resume(id int) error

Resume resumes a worker

func (*Pool[T]) RoundRobinIndex

func (p *Pool[T]) RoundRobinIndex() int

If you're using round-robin distribution, it can come in handy to know the current index

func (*Pool[T]) SetOnTaskAttempt

func (p *Pool[T]) SetOnTaskAttempt(handler func(task *Task[T], workerID int))

SetOnTaskAttempt allows setting the onTaskAttempt handler after pool creation

func (*Pool[T]) SetOnTaskFailure

func (p *Pool[T]) SetOnTaskFailure(handler func(data T, metadata map[string]any, err error) TaskAction)

SetOnTaskFailure allows setting the onTaskFailure handler after pool creation

func (*Pool[T]) SetOnTaskSuccess

func (p *Pool[T]) SetOnTaskSuccess(handler func(data T, metadata map[string]any))

SetOnTaskSuccess allows setting the onTaskSuccess handler after pool creation

func (*Pool[T]) StartMetricsUpdater

func (p *Pool[T]) StartMetricsUpdater(interval time.Duration)

func (*Pool[T]) StopMetricsUpdater

func (p *Pool[T]) StopMetricsUpdater()

func (*Pool[T]) Submit

func (p *Pool[T]) Submit(data T, options ...TaskOption[T]) error

Submit allows the developer to send data directly without pre-allocation.

func (*Pool[T]) SubmitToFreeWorker

func (p *Pool[T]) SubmitToFreeWorker(taskData T, options ...TaskOption[T]) error

SubmitToFreeWorker attempts to submit a task to a free worker

func (*Pool[T]) TransitionTaskState

func (p *Pool[T]) TransitionTaskState(task *Task[T], to TaskState, reason string) error

TransitionTaskState handles state changes and maintains counts

func (*Pool[T]) UpdateTotalQueueSize

func (p *Pool[T]) UpdateTotalQueueSize(delta int64)

func (*Pool[T]) UpdateWorkerQueueSize

func (p *Pool[T]) UpdateWorkerQueueSize(workerID int, delta int64)

func (*Pool[T]) WaitWithCallback

func (p *Pool[T]) WaitWithCallback(ctx context.Context, callback func(queueSize, processingCount, deadTaskCount int) bool, interval time.Duration) error

WaitWithCallback waits for the pool to complete while calling a callback function

func (*Pool[T]) Workers

func (p *Pool[T]) Workers() ([]int, error)

Workers returns the list of worker IDs

type PoolMetricsSnapshot

type PoolMetricsSnapshot[T any] struct {
	PoolID        uint
	AssignedGroup any
	IsActive      bool
	QueueSize     int64
	Workers       map[int]WorkerSnapshot[T]
}

PoolMetricsSnapshot represents metrics for a single pool

type Pooler

type Pooler[T any] interface {
	// SetOnTaskSuccess sets the handler that will be called when a task succeeds.
	SetOnTaskSuccess(handler func(data T, metadata map[string]any))

	// SetOnTaskFailure sets the handler that will be called when a task fails.
	// The handler should return a TaskAction indicating how the pool should proceed.
	SetOnTaskFailure(handler func(data T, metadata map[string]any, err error) TaskAction)

	SetOnTaskAttempt(handler func(task *Task[T], workerID int))

	// Add adds a new worker to the pool. If a queue is not provided, a new one will be created.
	Add(worker Worker[T], queue TaskQueue[T]) error

	// Remove removes a worker by its ID, redistributing its tasks to other workers.
	Remove(id int) error

	// Pause pauses a worker by its ID, redistributing its tasks and preventing it from processing further tasks.
	Pause(id int) error

	// Resume resumes a previously paused worker, allowing it to process tasks again.
	Resume(id int) error

	// Workers returns the list of worker IDs currently managed by the pool.
	Workers() ([]int, error)

	// GetFreeWorkers returns a list of worker IDs that have no tasks in their queue
	GetFreeWorkers() []int

	// SubmitToFreeWorker attempts to submit a task to a free worker
	SubmitToFreeWorker(taskData T, options ...TaskOption[T]) error

	// Submit allows submitting data directly as a task without pre-allocation. Optional TaskOptions can modify the task's behavior.
	Submit(data T, options ...TaskOption[T]) error

	// WaitWithCallback waits until the provided callback returns false, periodically invoking it at the given interval.
	// The callback receives the current queue size, processing count, and dead task count, and should return false to stop waiting.
	WaitWithCallback(ctx context.Context, callback func(queueSize, processingCount, deadTaskCount int) bool, interval time.Duration) error

	// Close gracefully shuts down the pool, stopping all workers and redistributing or discarding tasks as needed.
	Close() error

	// QueueSize returns the total number of tasks currently queued.
	QueueSize() int64

	// ProcessingCount returns the number of tasks currently being processed by workers.
	ProcessingCount() int64

	// DeadTaskCount returns the number of dead tasks that have permanently failed.
	DeadTaskCount() int64

	// RangeDeadTasks iterates over all dead tasks. If the callback returns false, iteration stops.
	RangeDeadTasks(fn func(*DeadTask[T]) bool)

	// PullDeadTask removes and returns a dead task at the specified index.
	PullDeadTask(idx int) (*DeadTask[T], error)

	// PullRangeDeadTasks removes and returns a range of dead tasks [from, to).
	PullRangeDeadTasks(from int, to int) ([]*DeadTask[T], error)

	// RangeWorkerQueues iterates over each worker's queue size. If the callback returns false, iteration stops.
	RangeWorkerQueues(f func(workerID int, queueSize int64) bool)

	// RangeTaskQueues iterates over each worker's TaskQueue. If the callback returns false, iteration stops.
	RangeTaskQueues(f func(workerID int, queue TaskQueue[T]) bool)

	// RangeWorkers iterates over each worker. If the callback returns false, iteration stops.
	RangeWorkers(f func(workerID int, state WorkerSnapshot[T]) bool)
}

Pooler is an interface that exposes all of the public methods of the Pool[T] struct.

type ProcessedNotification

type ProcessedNotification struct {
	// contains filtered or unexported fields
}

ProcessedNotification is used to notify when a task is processed

func NewProcessedNotification

func NewProcessedNotification() *ProcessedNotification

NewProcessedNotification creates a new ProcessedNotification

func (*ProcessedNotification) Done

func (n *ProcessedNotification) Done() <-chan struct{}

Done returns the channel that's closed when the task is processed

func (*ProcessedNotification) Notify

func (n *ProcessedNotification) Notify()

Notify notifies that the task has been processed

func (*ProcessedNotification) Wait

func (n *ProcessedNotification) Wait()

Wait waits for the notification

type QueuedNotification

type QueuedNotification struct {
	// contains filtered or unexported fields
}

QueuedNotification is used to notify when a task is queued

func NewQueuedNotification

func NewQueuedNotification() *QueuedNotification

NewQueuedNotification creates a new QueuedNotification

func (*QueuedNotification) Done

func (n *QueuedNotification) Done() <-chan struct{}

Done returns the channel that's closed when the task is queued

func (*QueuedNotification) Notify

func (n *QueuedNotification) Notify()

Notify notifies that the task has been queued

func (*QueuedNotification) Wait

func (n *QueuedNotification) Wait()

Wait waits for the notification

type RequestResponse

type RequestResponse[T any, R any] struct {
	// contains filtered or unexported fields
}

RequestResponse manages the lifecycle of a task request and its response

func NewRequestResponse

func NewRequestResponse[T any, R any](request T) *RequestResponse[T, R]

NewRequestResponse creates a new RequestResponse instance

func (*RequestResponse[T, R]) Complete

func (rr *RequestResponse[T, R]) Complete(response R)

Complete safely marks the request as complete with a response

func (*RequestResponse[T, R]) CompleteWithError

func (rr *RequestResponse[T, R]) CompleteWithError(err error)

CompleteWithError safely marks the request as complete with an error

func (*RequestResponse[T, R]) ConsultRequest

func (rr *RequestResponse[T, R]) ConsultRequest(fn func(T) error) error

Safely consults the request data

func (*RequestResponse[T, R]) Done

func (rr *RequestResponse[T, R]) Done() <-chan struct{}

Done returns a channel that's closed when the request is complete

func (*RequestResponse[T, R]) Err

func (rr *RequestResponse[T, R]) Err() error

Err returns any error that occurred during the request

func (*RequestResponse[T, R]) Wait

func (rr *RequestResponse[T, R]) Wait(ctx context.Context) (R, error)

Wait waits for the request to complete and returns the response and any error

type RetryPolicy

type RetryPolicy[T any] interface {
	ComputeDelay(retries int, err error, config *Config[T]) time.Duration
}

RetryPolicy defines an interface for retry policies
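Custom policies only need to implement ComputeDelay; a hedged sketch of a policy that grows the delay linearly and stops growing after five retries. LinearCapPolicy is a hypothetical type shown for illustration:

type LinearCapPolicy[T any] struct {
	Step time.Duration
}

func (p LinearCapPolicy[T]) ComputeDelay(retries int, err error, config *retrypool.Config[T]) time.Duration {
	if retries > 5 {
		retries = 5
	}
	return time.Duration(retries+1) * p.Step
}

// Plug it in with the documented WithRetryPolicy option:
// pool := retrypool.New(ctx, workers, retrypool.WithRetryPolicy[int](LinearCapPolicy[int]{Step: time.Second}))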

type RingBufferQueue

type RingBufferQueue[T any] struct {
	// contains filtered or unexported fields
}

RingBufferQueue is a ring buffer-based queue

func (*RingBufferQueue[T]) Clear

func (q *RingBufferQueue[T]) Clear()

func (*RingBufferQueue[T]) Dequeue

func (q *RingBufferQueue[T]) Dequeue() (*Task[T], bool)

func (*RingBufferQueue[T]) Drain

func (q *RingBufferQueue[T]) Drain() []*Task[T]

func (*RingBufferQueue[T]) Enqueue

func (q *RingBufferQueue[T]) Enqueue(task *Task[T])

func (*RingBufferQueue[T]) Length

func (q *RingBufferQueue[T]) Length() int

func (*RingBufferQueue[T]) PriorityEnqueue

func (q *RingBufferQueue[T]) PriorityEnqueue(task *Task[T]) error

type SubmitOption

type SubmitOption[T any] func(*submitConfig)

func WithBlockingMetadata

func WithBlockingMetadata[T any](metadata map[string]any) SubmitOption[T]

func WithBlockingProcessedNotification

func WithBlockingProcessedNotification[T any](notification *ProcessedNotification) SubmitOption[T]

func WithBlockingQueueNotification

func WithBlockingQueueNotification[T any](notification *QueuedNotification) SubmitOption[T]

type Task

type Task[T any] struct {
	// contains filtered or unexported fields
}

Task represents a task in the pool

func (*Task[T]) GetAttemptedWorkers

func (t *Task[T]) GetAttemptedWorkers() []int

func (*Task[T]) GetMetadata

func (t *Task[T]) GetMetadata() *Metadata

type TaskAction

type TaskAction int

TaskAction represents the action to take for a failed task

const (
	TaskActionRetry          TaskAction = iota + 1 // Task will retry using its own state
	TaskActionForceRetry                           // Force a retry, ignoring retry limits
	TaskActionRemove                               // Remove the task and recycle resources
	TaskActionAddToDeadTasks                       // Add the task to dead tasks
)
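These actions are returned from the failure handler set with WithOnTaskFailure (or SetOnTaskFailure); a hedged sketch that retries timeouts and buries everything else (assumes the errors and context packages are imported):

pool := retrypool.New(ctx, workers,
	retrypool.WithOnTaskFailure[int](func(data int, metadata map[string]any, err error) retrypool.TaskAction {
		if errors.Is(err, context.DeadlineExceeded) {
			return retrypool.TaskActionRetry // transient: let the task retry normally
		}
		return retrypool.TaskActionAddToDeadTasks // permanent: record it as a dead task
	}),
)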

type TaskMetadataSetter

type TaskMetadataSetter func(metadata map[string]any)

type TaskOption

type TaskOption[T any] func(*Task[T])

TaskOption functions for configuring individual tasks

func WithTaskBounceRetry

func WithTaskBounceRetry[T any]() TaskOption[T]

WithTaskBounceRetry enables retry on different workers

func WithTaskDuration

func WithTaskDuration[T any](d time.Duration) TaskOption[T]

WithTaskDuration sets a per-attempt time limit for the task

func WithTaskImmediateRetry

func WithTaskImmediateRetry[T any]() TaskOption[T]

WithTaskImmediateRetry allows the submitted task to retry immediately

func WithTaskMetadata

func WithTaskMetadata[T any](metadata *Metadata) TaskOption[T]

WithTaskMetadata sets metadata for the task

func WithTaskProcessedCb

func WithTaskProcessedCb[T any](cb func()) TaskOption[T]

WithTaskProcessedCb sets a callback for when a task is processed

func WithTaskProcessedNotification

func WithTaskProcessedNotification[T any](n *ProcessedNotification) TaskOption[T]

WithTaskProcessedNotification sets a notification for when a task is processed

func WithTaskQueuedCb

func WithTaskQueuedCb[T any](cb func()) TaskOption[T]

WithTaskQueuedCb sets a callback for when a task is queued

func WithTaskQueuedNotification

func WithTaskQueuedNotification[T any](n *QueuedNotification) TaskOption[T]

WithTaskQueuedNotification sets a notification for when a task is queued

func WithTaskRunningCb

func WithTaskRunningCb[T any](cb func()) TaskOption[T]

WithTaskRunningCb sets a callback for when a task is running

func WithTaskTimeout

func WithTaskTimeout[T any](d time.Duration) TaskOption[T]

WithTaskTimeout sets a total time limit for the task

type TaskQueue

type TaskQueue[T any] interface {
	Enqueue(task *Task[T])
	PriorityEnqueue(task *Task[T]) error // For immediate retry
	Dequeue() (*Task[T], bool)
	Length() int
	Clear()
	Drain() []*Task[T]
}

TaskQueue defines the interface for task queues

type TaskQueueType

type TaskQueueType int

TaskQueueType represents the type of task queue

const (
	TaskQueueTypeSlice TaskQueueType = iota
	TaskQueueTypeRingBuffer
	TaskQueueTypeCircularQueue
	TaskQueueTypeLinkedList
	TaskQueueTypeGrowingRingBuffer
	TaskQueueTypeGrowingCircularQueue
)

type TaskState

type TaskState int

TaskState represents the state of a task

const (
	TaskStateNone TaskState = iota
	TaskStateCreated
	TaskStatePending
	TaskStateQueued
	TaskStateRunning
	TaskStateCompleted
	TaskStateFailed
	TaskStateDead
)

func (TaskState) String

func (s TaskState) String() string

type TaskStateTransition

type TaskStateTransition[T any] struct {
	FromState TaskState
	ToState   TaskState
	Task      *Task[T]
	Reason    string
	Timestamp time.Time
}

TaskStateTransition represents a state change

type Worker

type Worker[T any] interface {
	Run(ctx context.Context, data T) error
}

Worker interface for task processing

type WorkerFactory

type WorkerFactory[T any] func() Worker[T]

type WorkerSnapshot

type WorkerSnapshot[T any] struct {
	ID       int
	Paused   bool
	Removed  bool
	HasTask  bool
	Metadata map[string]any
	State    WorkerState
}

type WorkerState

type WorkerState string
const (
	WorkerStateCreated    WorkerState = "Created"
	WorkerStateStarted    WorkerState = "Started"
	WorkerStateIdle       WorkerState = "Idle"
	WorkerStateEnqueuing  WorkerState = "Enqueuing"
	WorkerStateProcessing WorkerState = "Processing"
	WorkerStatePaused     WorkerState = "Paused"
	WorkerStateRemoved    WorkerState = "Removed"
)
