Documentation
¶
Overview ¶
logs.go
Index ¶
- Constants
- Variables
- func SetTaskMetadata(ctx context.Context, metadata map[string]any) error
- type BlockingConfig
- type BlockingDependentTask
- type BlockingMetricsSnapshot
- type BlockingPool
- func (p *BlockingPool[T, GID, TID]) Close() error
- func (p *BlockingPool[T, GID, TID]) GetSnapshot() BlockingMetricsSnapshot[T, GID]
- func (p *BlockingPool[T, GID, TID]) RangeTaskQueues(f func(workerID int, queue TaskQueue[T]) bool)
- func (p *BlockingPool[T, GID, TID]) RangeWorkerQueues(f func(workerID int, queueSize int64) bool)
- func (p *BlockingPool[T, GID, TID]) RangeWorkers(f func(workerID int, state WorkerSnapshot[T]) bool)
- func (p *BlockingPool[T, GID, TID]) SetConcurrentPools(max int)
- func (p *BlockingPool[T, GID, TID]) SetConcurrentWorkers(max int)
- func (p *BlockingPool[T, GID, TID]) Submit(data T, opt ...SubmitOption[T]) error
- func (p *BlockingPool[T, GID, TID]) WaitForTask(groupID GID, taskID TID) error
- func (p *BlockingPool[T, GID, TID]) WaitWithCallback(ctx context.Context, ...) error
- type BlockingPoolOption
- func WithBlockingGetData[T any](cb func(T) interface{}) BlockingPoolOption[T]
- func WithBlockingLogger[T any](logger Logger) BlockingPoolOption[T]
- func WithBlockingMaxActivePools[T any](max int) BlockingPoolOption[T]
- func WithBlockingMaxWorkersPerPool[T any](max int) BlockingPoolOption[T]
- func WithBlockingOnGroupCompleted[T any](cb func(groupID any, tasks []T)) BlockingPoolOption[T]
- func WithBlockingOnGroupCreated[T any](cb func(groupID any)) BlockingPoolOption[T]
- func WithBlockingOnGroupFailed[T any](cb func(groupID any, tasks []T)) BlockingPoolOption[T]
- func WithBlockingOnGroupRemoved[T any](cb func(groupID any, tasks []T)) BlockingPoolOption[T]
- func WithBlockingOnPoolClosed[T any](cb func()) BlockingPoolOption[T]
- func WithBlockingOnPoolCreated[T any](cb func(groupID any)) BlockingPoolOption[T]
- func WithBlockingOnPoolDestroyed[T any](cb func(groupID any)) BlockingPoolOption[T]
- func WithBlockingOnTaskCompleted[T any](cb func(task T)) BlockingPoolOption[T]
- func WithBlockingOnTaskFailed[T any](cb func(task T, err error)) BlockingPoolOption[T]
- func WithBlockingOnTaskStarted[T any](cb func(task T)) BlockingPoolOption[T]
- func WithBlockingOnTaskSubmitted[T any](cb func(task T)) BlockingPoolOption[T]
- func WithBlockingOnWorkerAdded[T any](cb func(groupID any, workerID int)) BlockingPoolOption[T]
- func WithBlockingOnWorkerRemoved[T any](cb func(groupID any, workerID int)) BlockingPoolOption[T]
- func WithBlockingSnapshotHandler[T any](cb func()) BlockingPoolOption[T]
- func WithBlockingWorkerFactory[T any](factory WorkerFactory[T]) BlockingPoolOption[T]
- type BlockingRequestResponse
- func (rr *BlockingRequestResponse[T, R, GID, TID]) Complete(response R)
- func (rr *BlockingRequestResponse[T, R, GID, TID]) CompleteWithError(err error)
- func (rr *BlockingRequestResponse[T, R, GID, TID]) ConsultRequest(fn func(T) error) error
- func (rr *BlockingRequestResponse[T, R, GID, TID]) Done() <-chan struct{}
- func (rr *BlockingRequestResponse[T, R, GID, TID]) Err() error
- func (rr BlockingRequestResponse[T, R, GID, TID]) GetGroupID() GID
- func (rr BlockingRequestResponse[T, R, GID, TID]) GetTaskID() TID
- func (rr *BlockingRequestResponse[T, R, GID, TID]) Wait(ctx context.Context) (R, error)
- type CircularQueue
- type Config
- type DeadTask
- type DelayFunc
- type DependencyGraph
- type DependencyMode
- type ExponentialBackoffRetryPolicy
- type FixedDelayRetryPolicy
- type GroupBlockingMetricSnapshot
- type GroupMetricSnapshot
- type GroupMetricsSnapshot
- type GroupPool
- func (gp *GroupPool[T, GID]) Close() error
- func (gp *GroupPool[T, GID]) EndGroup(gid GID) error
- func (gp *GroupPool[T, GID]) GetSnapshot() GroupMetricsSnapshot[T, GID]
- func (gp *GroupPool[T, GID]) SetMaxActivePools(max int)
- func (gp *GroupPool[T, GID]) SetMaxWorkersPerPool(max int)
- func (gp *GroupPool[T, GID]) ShouldWaitAll() bool
- func (gp *GroupPool[T, GID]) Submit(gid GID, task T, opts ...GroupTaskOption[T, GID]) error
- func (gp *GroupPool[T, GID]) WaitAll(ctx context.Context) error
- func (gp *GroupPool[T, GID]) WaitGroup(ctx context.Context, gid GID) error
- type GroupPoolConfig
- type GroupPoolOption
- func WithGroupPoolGroupMustSucceed[T GroupTask[GID], GID comparable](must bool) GroupPoolOption[T, GID]
- func WithGroupPoolLogger[T GroupTask[GID], GID comparable](logger Logger) GroupPoolOption[T, GID]
- func WithGroupPoolMaxActivePools[T GroupTask[GID], GID comparable](max int) GroupPoolOption[T, GID]
- func WithGroupPoolMaxWorkersPerPool[T GroupTask[GID], GID comparable](max int) GroupPoolOption[T, GID]
- func WithGroupPoolMetadata[T GroupTask[GID], GID comparable](m *Metadata) GroupPoolOption[T, GID]
- func WithGroupPoolOnGroupEnds[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint)) GroupPoolOption[T, GID]
- func WithGroupPoolOnGroupFails[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint, pendingTasks []T)) GroupPoolOption[T, GID]
- func WithGroupPoolOnGroupStarts[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint)) GroupPoolOption[T, GID]
- func WithGroupPoolOnSnapshot[T GroupTask[GID], GID comparable](cb func(snapshot GroupMetricsSnapshot[T, GID])) GroupPoolOption[T, GID]
- func WithGroupPoolOnTaskExecuted[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint, data T, metadata map[string]any, err error)) GroupPoolOption[T, GID]
- func WithGroupPoolOnTaskFailure[T GroupTask[GID], GID comparable](...) GroupPoolOption[T, GID]
- func WithGroupPoolOnTaskSuccess[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint, data T, metadata map[string]any)) GroupPoolOption[T, GID]
- func WithGroupPoolUseFreeWorkerOnly[T GroupTask[GID], GID comparable]() GroupPoolOption[T, GID]
- func WithGroupPoolWorkerFactory[T GroupTask[GID], GID comparable](wf WorkerFactory[T]) GroupPoolOption[T, GID]
- type GroupRequestResponse
- func (rr *GroupRequestResponse[T, R, GID]) Complete(resp R)
- func (rr *GroupRequestResponse[T, R, GID]) CompleteWithError(err error)
- func (rr *GroupRequestResponse[T, R, GID]) ConsultRequest(fn func(T) error) error
- func (rr *GroupRequestResponse[T, R, GID]) Done() <-chan struct{}
- func (rr *GroupRequestResponse[T, R, GID]) Err() error
- func (rr GroupRequestResponse[T, R, GID]) GetGroupID() GID
- func (rr *GroupRequestResponse[T, R, GID]) Wait(ctx context.Context) (R, error)
- type GroupTask
- type GroupTaskOption
- func WithTaskGroupMetadata[T any, GID comparable](metadata map[string]any) GroupTaskOption[T, GID]
- func WithTaskGroupProcessedNotification[T any, GID comparable](notification *ProcessedNotification) GroupTaskOption[T, GID]
- func WithTaskGroupQueueNotification[T any, GID comparable](notification *QueuedNotification) GroupTaskOption[T, GID]
- type GrowingCircularQueue
- func (q *GrowingCircularQueue[T]) Clear()
- func (q *GrowingCircularQueue[T]) Dequeue() (*Task[T], bool)
- func (q *GrowingCircularQueue[T]) Drain() []*Task[T]
- func (q *GrowingCircularQueue[T]) Enqueue(task *Task[T])
- func (q *GrowingCircularQueue[T]) Length() int
- func (q *GrowingCircularQueue[T]) PriorityEnqueue(task *Task[T]) error
- type GrowingRingBufferQueue
- func (q *GrowingRingBufferQueue[T]) Clear()
- func (q *GrowingRingBufferQueue[T]) Dequeue() (*Task[T], bool)
- func (q *GrowingRingBufferQueue[T]) Drain() []*Task[T]
- func (q *GrowingRingBufferQueue[T]) Enqueue(task *Task[T])
- func (q *GrowingRingBufferQueue[T]) Length() int
- func (q *GrowingRingBufferQueue[T]) PriorityEnqueue(task *Task[T]) error
- type IndependentConfig
- type IndependentDependentTask
- type IndependentPool
- func (p *IndependentPool[T, GID, TID]) Close() error
- func (p *IndependentPool[T, GID, TID]) GetGroupStatus(groupID GID) (completed, total int, err error)
- func (i *IndependentPool[T, GID, TID]) PullDeadTask(id int) (*DeadTask[T], error)
- func (p *IndependentPool[T, GID, TID]) Submit(tasks []T) error
- func (p *IndependentPool[T, GID, TID]) WaitForGroup(ctx context.Context, groupID GID) error
- func (p *IndependentPool[T, GID, TID]) WaitWithCallback(ctx context.Context, ...) error
- type IndependentPoolOption
- func WithIndependentDependencyMode[T any](mode DependencyMode) IndependentPoolOption[T]
- func WithIndependentOnDeadTask[T any](handler func(deadTaskIndex int)) IndependentPoolOption[T]
- func WithIndependentOnGroupCompleted[T any](cb func(groupID any)) IndependentPoolOption[T]
- func WithIndependentOnGroupCreated[T any](cb func(groupID any)) IndependentPoolOption[T]
- func WithIndependentOnGroupRemoved[T any](cb func(groupID any, tasks []T)) IndependentPoolOption[T]
- func WithIndependentOnPoolClosed[T any](cb func()) IndependentPoolOption[T]
- func WithIndependentOnTaskCompleted[T any](cb func(task T)) IndependentPoolOption[T]
- func WithIndependentOnTaskFailed[T any](cb func(task T, err error)) IndependentPoolOption[T]
- func WithIndependentOnTaskStarted[T any](cb func(task T)) IndependentPoolOption[T]
- func WithIndependentOnTaskSubmitted[T any](cb func(task T)) IndependentPoolOption[T]
- func WithIndependentOnWorkerAdded[T any](cb func(workerID int)) IndependentPoolOption[T]
- func WithIndependentOnWorkerRemoved[T any](cb func(workerID int)) IndependentPoolOption[T]
- func WithIndependentWorkerFactory[T any](factory WorkerFactory[T]) IndependentPoolOption[T]
- func WithIndependentWorkerLimits[T any](min, max int) IndependentPoolOption[T]
- type LogFormat
- type LogLevel
- type LogOption
- type Logger
- type Metadata
- type Metrics
- type MetricsSnapshot
- type NoWorkerPolicy
- type Node
- type Option
- func WithAttempts[T any](attempts int) Option[T]
- func WithDeadTasksLimit[T any](limit int) Option[T]
- func WithDelay[T any](delay time.Duration) Option[T]
- func WithDelayFunc[T any](delayFunc DelayFunc[T]) Option[T]
- func WithIfRetry[T any](retryIf func(err error) bool) Option[T]
- func WithLogger[T any](logger Logger) Option[T]
- func WithLoopTicker[T any](d time.Duration) Option[T]
- func WithMaxDelay[T any](maxDelay time.Duration) Option[T]
- func WithMaxJitter[T any](maxJitter time.Duration) Option[T]
- func WithMaxQueueSize[T any](size int) Option[T]
- func WithMetadata[T any](metadata *Metadata) Option[T]
- func WithNoWorkerPolicy[T any](policy NoWorkerPolicy) Option[T]
- func WithOnDeadTask[T any](handler func(deadTaskIndex int)) Option[T]
- func WithOnPanic[T any](handler func(recovery interface{}, stackTrace string)) Option[T]
- func WithOnTaskAttempt[T any](handler func(task *Task[T], workerID int)) Option[T]
- func WithOnTaskFailure[T any](handler func(data T, metadata map[string]any, err error) TaskAction) Option[T]
- func WithOnTaskSuccess[T any](handler func(data T, metadata map[string]any)) Option[T]
- func WithOnWorkerPanic[T any](handler func(workerID int, recovery interface{}, stackTrace string)) Option[T]
- func WithRateLimit[T any](rps float64) Option[T]
- func WithRetryPolicy[T any](policy RetryPolicy[T]) Option[T]
- func WithRoundRobinDistribution[T any]() Option[T]
- func WithSnapshotCallback[T any](callback func(MetricsSnapshot[T])) Option[T]
- func WithSnapshotInterval[T any](interval time.Duration) Option[T]
- func WithSnapshots[T any]() Option[T]
- func WithSynchronousMode[T any]() Option[T]
- func WithTaskQueueType[T any](queueType TaskQueueType) Option[T]
- type PendingTasksSnapshot
- type Pool
- func (p *Pool[T]) Add(worker Worker[T], queue TaskQueue[T]) error
- func (p *Pool[T]) Close() error
- func (p *Pool[T]) DeadTaskCount() int64
- func (p *Pool[T]) GetFreeWorkers() []int
- func (p *Pool[T]) GetSnapshot() MetricsSnapshot[T]
- func (p *Pool[T]) GetWorkerQueueSize(workerID int) int64
- func (p *Pool[T]) IsAsync() bool
- func (p *Pool[T]) IsRoundRobin() bool
- func (p *Pool[T]) NewTaskQueue(queueType TaskQueueType) TaskQueue[T]
- func (p *Pool[T]) Pause(id int) error
- func (p *Pool[T]) ProcessingCount() int64
- func (p *Pool[T]) PullDeadTask(idx int) (*DeadTask[T], error)
- func (p *Pool[T]) PullRangeDeadTasks(from int, to int) ([]*DeadTask[T], error)
- func (p *Pool[T]) QueueSize() int64
- func (p *Pool[T]) RangeDeadTasks(fn func(*DeadTask[T]) bool)
- func (p *Pool[T]) RangeTaskQueues(f func(workerID int, queue TaskQueue[T]) bool)
- func (p *Pool[T]) RangeWorkerQueues(f func(workerID int, queueSize int64) bool)
- func (p *Pool[T]) RangeWorkers(f func(workerID int, state WorkerSnapshot[T]) bool)
- func (p *Pool[T]) RedistributeAllTasks()
- func (p *Pool[T]) Remove(id int) error
- func (p *Pool[T]) Resume(id int) error
- func (p *Pool[T]) RoundRobinIndex() int
- func (p *Pool[T]) SetOnTaskAttempt(handler func(task *Task[T], workerID int))
- func (p *Pool[T]) SetOnTaskFailure(handler func(data T, metadata map[string]any, err error) TaskAction)
- func (p *Pool[T]) SetOnTaskSuccess(handler func(data T, metadata map[string]any))
- func (p *Pool[T]) StartMetricsUpdater(interval time.Duration)
- func (p *Pool[T]) StopMetricsUpdater()
- func (p *Pool[T]) Submit(data T, options ...TaskOption[T]) error
- func (p *Pool[T]) SubmitToFreeWorker(taskData T, options ...TaskOption[T]) error
- func (p *Pool[T]) TransitionTaskState(task *Task[T], to TaskState, reason string) error
- func (p *Pool[T]) UpdateTotalQueueSize(delta int64)
- func (p *Pool[T]) UpdateWorkerQueueSize(workerID int, delta int64)
- func (p *Pool[T]) WaitWithCallback(ctx context.Context, ...) error
- func (p *Pool[T]) Workers() ([]int, error)
- type PoolMetricsSnapshot
- type Pooler
- type ProcessedNotification
- type QueuedNotification
- type RequestResponse
- func (rr *RequestResponse[T, R]) Complete(response R)
- func (rr *RequestResponse[T, R]) CompleteWithError(err error)
- func (rr *RequestResponse[T, R]) ConsultRequest(fn func(T) error) error
- func (rr *RequestResponse[T, R]) Done() <-chan struct{}
- func (rr *RequestResponse[T, R]) Err() error
- func (rr *RequestResponse[T, R]) Wait(ctx context.Context) (R, error)
- type RetryPolicy
- type RingBufferQueue
- func (q *RingBufferQueue[T]) Clear()
- func (q *RingBufferQueue[T]) Dequeue() (*Task[T], bool)
- func (q *RingBufferQueue[T]) Drain() []*Task[T]
- func (q *RingBufferQueue[T]) Enqueue(task *Task[T])
- func (q *RingBufferQueue[T]) Length() int
- func (q *RingBufferQueue[T]) PriorityEnqueue(task *Task[T]) error
- type SubmitOption
- type Task
- type TaskAction
- type TaskMetadataSetter
- type TaskOption
- func WithTaskBounceRetry[T any]() TaskOption[T]
- func WithTaskDuration[T any](d time.Duration) TaskOption[T]
- func WithTaskImmediateRetry[T any]() TaskOption[T]
- func WithTaskMetadata[T any](metadata *Metadata) TaskOption[T]
- func WithTaskProcessedCb[T any](cb func()) TaskOption[T]
- func WithTaskProcessedNotification[T any](n *ProcessedNotification) TaskOption[T]
- func WithTaskQueuedCb[T any](cb func()) TaskOption[T]
- func WithTaskQueuedNotification[T any](n *QueuedNotification) TaskOption[T]
- func WithTaskRunningCb[T any](cb func()) TaskOption[T]
- func WithTaskTimeout[T any](d time.Duration) TaskOption[T]
- type TaskQueue
- type TaskQueueType
- type TaskState
- type TaskStateTransition
- type Worker
- type WorkerFactory
- type WorkerSnapshot
- type WorkerState
Constants ¶
const RetrypoolMetadataKey = "$retrypool"
Variables ¶
var ( ErrGroupNotFound = errors.New("group not found") ErrGroupEnded = errors.New("group already ended") ErrGroupFailed = errors.New("group has failed") ErrInvalidGroupID = errors.New("invalid group ID") ErrTaskGroupMismatch = errors.New("task group ID does not match submit group ID") )
var ( ErrPoolClosed = errors.New("pool is closed") ErrRateLimitExceeded = errors.New("rate limit exceeded") ErrNoWorkersAvailable = errors.New("no workers available") ErrInvalidWorkerID = errors.New("invalid worker ID") ErrMaxQueueSizeExceeded = errors.New("max queue size exceeded") ErrTaskTimeout = errors.New("task timeout") )
Predefined errors
Functions ¶
Types ¶
type BlockingConfig ¶
type BlockingConfig[T any] struct { // Task callbacks OnTaskSubmitted func(task T) OnTaskStarted func(task T) OnTaskCompleted func(task T) OnTaskFailed func(task T, err error) // Group/Pool callbacks OnGroupCreated func(groupID any) OnGroupCompleted func(groupID any, tasks []T) OnGroupFailed func(groupID any, tasks []T) OnGroupRemoved func(groupID any, tasks []T) OnPoolCreated func(groupID any) OnPoolDestroyed func(groupID any) OnWorkerAdded func(groupID any, workerID int) OnWorkerRemoved func(groupID any, workerID int) OnPoolClosed func() // contains filtered or unexported fields }
BlockingConfig holds basic config - each active group gets its own pool of workers
type BlockingDependentTask ¶
type BlockingDependentTask[GID comparable, TID comparable] interface { GetGroupID() GID GetTaskID() TID }
type BlockingMetricsSnapshot ¶
type BlockingMetricsSnapshot[T any, GID comparable] struct { TotalTasksSubmitted int64 TotalTasksProcessed int64 TotalTasksSucceeded int64 TotalTasksFailed int64 TotalDeadTasks int64 Metrics []GroupBlockingMetricSnapshot[T, GID] }
type BlockingPool ¶
type BlockingPool[T any, GID comparable, TID comparable] struct { // contains filtered or unexported fields }
BlockingPool manages multiple pools, assigning one to each active group
func NewBlockingPool ¶
func NewBlockingPool[T any, GID comparable, TID comparable]( ctx context.Context, opt ...BlockingPoolOption[T], ) (*BlockingPool[T, GID, TID], error)
NewBlockingPool constructs a BlockingPool with the given options.
func (*BlockingPool[T, GID, TID]) Close ¶
func (p *BlockingPool[T, GID, TID]) Close() error
Close shuts down all pools
func (*BlockingPool[T, GID, TID]) GetSnapshot ¶
func (p *BlockingPool[T, GID, TID]) GetSnapshot() BlockingMetricsSnapshot[T, GID]
func (*BlockingPool[T, GID, TID]) RangeTaskQueues ¶
func (p *BlockingPool[T, GID, TID]) RangeTaskQueues(f func(workerID int, queue TaskQueue[T]) bool)
func (*BlockingPool[T, GID, TID]) RangeWorkerQueues ¶
func (p *BlockingPool[T, GID, TID]) RangeWorkerQueues(f func(workerID int, queueSize int64) bool)
func (*BlockingPool[T, GID, TID]) RangeWorkers ¶
func (p *BlockingPool[T, GID, TID]) RangeWorkers(f func(workerID int, state WorkerSnapshot[T]) bool)
func (*BlockingPool[T, GID, TID]) SetConcurrentPools ¶
func (p *BlockingPool[T, GID, TID]) SetConcurrentPools(max int)
Scale the amount of parallel pools that can be active at the same time
func (*BlockingPool[T, GID, TID]) SetConcurrentWorkers ¶
func (p *BlockingPool[T, GID, TID]) SetConcurrentWorkers(max int)
Scale the amount of workers that can be active at the same time on each pool
func (*BlockingPool[T, GID, TID]) Submit ¶
func (p *BlockingPool[T, GID, TID]) Submit(data T, opt ...SubmitOption[T]) error
Submit handles task submission, managing group pools as needed
func (*BlockingPool[T, GID, TID]) WaitForTask ¶
func (p *BlockingPool[T, GID, TID]) WaitForTask(groupID GID, taskID TID) error
WaitForTask blocks until the specified task completes
func (*BlockingPool[T, GID, TID]) WaitWithCallback ¶
func (p *BlockingPool[T, GID, TID]) WaitWithCallback( ctx context.Context, callback func(queueSize, processingCount, deadTaskCount int) bool, interval time.Duration, ) error
WaitWithCallback monitors all pool progress
type BlockingPoolOption ¶
type BlockingPoolOption[T any] func(*BlockingConfig[T])
BlockingPoolOption is a functional option for configuring the blocking pool
func WithBlockingGetData ¶
func WithBlockingGetData[T any](cb func(T) interface{}) BlockingPoolOption[T]
func WithBlockingLogger ¶
func WithBlockingLogger[T any](logger Logger) BlockingPoolOption[T]
func WithBlockingMaxActivePools ¶
func WithBlockingMaxActivePools[T any](max int) BlockingPoolOption[T]
func WithBlockingMaxWorkersPerPool ¶
func WithBlockingMaxWorkersPerPool[T any](max int) BlockingPoolOption[T]
func WithBlockingOnGroupCompleted ¶
func WithBlockingOnGroupCompleted[T any](cb func(groupID any, tasks []T)) BlockingPoolOption[T]
func WithBlockingOnGroupCreated ¶
func WithBlockingOnGroupCreated[T any](cb func(groupID any)) BlockingPoolOption[T]
func WithBlockingOnGroupFailed ¶
func WithBlockingOnGroupFailed[T any](cb func(groupID any, tasks []T)) BlockingPoolOption[T]
func WithBlockingOnGroupRemoved ¶
func WithBlockingOnGroupRemoved[T any](cb func(groupID any, tasks []T)) BlockingPoolOption[T]
func WithBlockingOnPoolClosed ¶
func WithBlockingOnPoolClosed[T any](cb func()) BlockingPoolOption[T]
func WithBlockingOnPoolCreated ¶
func WithBlockingOnPoolCreated[T any](cb func(groupID any)) BlockingPoolOption[T]
func WithBlockingOnPoolDestroyed ¶
func WithBlockingOnPoolDestroyed[T any](cb func(groupID any)) BlockingPoolOption[T]
func WithBlockingOnTaskCompleted ¶
func WithBlockingOnTaskCompleted[T any](cb func(task T)) BlockingPoolOption[T]
func WithBlockingOnTaskFailed ¶
func WithBlockingOnTaskFailed[T any](cb func(task T, err error)) BlockingPoolOption[T]
func WithBlockingOnTaskStarted ¶
func WithBlockingOnTaskStarted[T any](cb func(task T)) BlockingPoolOption[T]
func WithBlockingOnTaskSubmitted ¶
func WithBlockingOnTaskSubmitted[T any](cb func(task T)) BlockingPoolOption[T]
func WithBlockingOnWorkerAdded ¶
func WithBlockingOnWorkerAdded[T any](cb func(groupID any, workerID int)) BlockingPoolOption[T]
func WithBlockingOnWorkerRemoved ¶
func WithBlockingOnWorkerRemoved[T any](cb func(groupID any, workerID int)) BlockingPoolOption[T]
func WithBlockingSnapshotHandler ¶
func WithBlockingSnapshotHandler[T any](cb func()) BlockingPoolOption[T]
func WithBlockingWorkerFactory ¶
func WithBlockingWorkerFactory[T any](factory WorkerFactory[T]) BlockingPoolOption[T]
Configuration options
type BlockingRequestResponse ¶
type BlockingRequestResponse[T any, R any, GID comparable, TID comparable] struct { // contains filtered or unexported fields }
BlockingRequestResponse manages the lifecycle of a task request and its response
func NewBlockingRequestResponse ¶
func NewBlockingRequestResponse[T any, R any, GID comparable, TID comparable](request T, gid GID, tid TID) *BlockingRequestResponse[T, R, GID, TID]
NewBlockingRequestResponse creates a new BlockingRequestResponse instance
func (*BlockingRequestResponse[T, R, GID, TID]) Complete ¶
func (rr *BlockingRequestResponse[T, R, GID, TID]) Complete(response R)
Complete safely marks the request as complete with a response
func (*BlockingRequestResponse[T, R, GID, TID]) CompleteWithError ¶
func (rr *BlockingRequestResponse[T, R, GID, TID]) CompleteWithError(err error)
CompleteWithError safely marks the request as complete with an error
func (*BlockingRequestResponse[T, R, GID, TID]) ConsultRequest ¶
func (rr *BlockingRequestResponse[T, R, GID, TID]) ConsultRequest(fn func(T) error) error
Safely consults the request data
func (*BlockingRequestResponse[T, R, GID, TID]) Done ¶
func (rr *BlockingRequestResponse[T, R, GID, TID]) Done() <-chan struct{}
Done returns a channel that's closed when the request is complete
func (*BlockingRequestResponse[T, R, GID, TID]) Err ¶
func (rr *BlockingRequestResponse[T, R, GID, TID]) Err() error
Err returns any error that occurred during the request
func (BlockingRequestResponse[T, R, GID, TID]) GetGroupID ¶
func (rr BlockingRequestResponse[T, R, GID, TID]) GetGroupID() GID
func (BlockingRequestResponse[T, R, GID, TID]) GetTaskID ¶
func (rr BlockingRequestResponse[T, R, GID, TID]) GetTaskID() TID
type CircularQueue ¶
type CircularQueue[T any] struct { // contains filtered or unexported fields }
CircularQueue is a fixed-size circular queue
func (*CircularQueue[T]) Clear ¶
func (q *CircularQueue[T]) Clear()
func (*CircularQueue[T]) Dequeue ¶
func (q *CircularQueue[T]) Dequeue() (*Task[T], bool)
func (*CircularQueue[T]) Drain ¶
func (q *CircularQueue[T]) Drain() []*Task[T]
func (*CircularQueue[T]) Enqueue ¶
func (q *CircularQueue[T]) Enqueue(task *Task[T])
func (*CircularQueue[T]) Length ¶
func (q *CircularQueue[T]) Length() int
func (*CircularQueue[T]) PriorityEnqueue ¶
func (q *CircularQueue[T]) PriorityEnqueue(task *Task[T]) error
type Config ¶
type Config[T any] struct { // contains filtered or unexported fields }
Config holds configurations for the pool
type DeadTask ¶
type DeadTask[T any] struct { Data T Retries int TotalDuration time.Duration Errors []error Reason string StateHistory []*TaskStateTransition[T] }
DeadTask represents a task that has failed permanently
type DependencyGraph ¶
type DependencyGraph[TID comparable] struct { Nodes map[TID]*Node[TID] Order []TID // Topologically sorted execution order }
DependencyGraph represents the complete dependency structure for a task group
type DependencyMode ¶
type DependencyMode int
DependencyMode defines how dependencies are interpreted
const ( // ForwardMode means tasks depend on previous tasks (A depends on B means A waits for B) ForwardMode DependencyMode = iota // ReverseMode means tasks depend on subsequent tasks (A depends on B means B waits for A) ReverseMode )
type ExponentialBackoffRetryPolicy ¶
type ExponentialBackoffRetryPolicy[T any] struct { BaseDelay time.Duration MaxDelay time.Duration MaxJitter time.Duration }
ExponentialBackoffRetryPolicy implements exponential backoff with jitter
func (ExponentialBackoffRetryPolicy[T]) ComputeDelay ¶
func (p ExponentialBackoffRetryPolicy[T]) ComputeDelay(retries int, err error, config *Config[T]) time.Duration
ComputeDelay computes the delay using exponential backoff with jitter
type FixedDelayRetryPolicy ¶
FixedDelayRetryPolicy implements a fixed delay between retries
func (FixedDelayRetryPolicy[T]) ComputeDelay ¶
func (p FixedDelayRetryPolicy[T]) ComputeDelay(retries int, err error, config *Config[T]) time.Duration
ComputeDelay computes the delay using fixed delay with jitter
type GroupBlockingMetricSnapshot ¶
type GroupBlockingMetricSnapshot[T any, GID comparable] struct { GroupID GID MetricsSnapshot[T] }
type GroupMetricSnapshot ¶
type GroupMetricSnapshot[T any, GID comparable] struct { GroupID GID PoolID uint HasPool bool IsPending bool TasksPending int MetricsSnapshot MetricsSnapshot[T] }
type GroupMetricsSnapshot ¶
type GroupMetricsSnapshot[T any, GID comparable] struct { // These totals are GLOBAL and never reset TotalTasksSubmitted int64 TotalTasksProcessed int64 TotalTasksSucceeded int64 TotalTasksFailed int64 TotalDeadTasks int64 TotalPools int ActivePools int TotalPendingTasks int GroupsWithPending int Pools []PoolMetricsSnapshot[T] PendingTasks []PendingTasksSnapshot[GID] Metrics []GroupMetricSnapshot[T, GID] }
func (GroupMetricsSnapshot[T, GID]) Clone ¶
func (m GroupMetricsSnapshot[T, GID]) Clone() GroupMetricsSnapshot[T, GID]
Clone creates a deep copy of the snapshot
type GroupPool ¶
type GroupPool[T GroupTask[GID], GID comparable] struct { // contains filtered or unexported fields }
func NewGroupPool ¶
func NewGroupPool[T GroupTask[GID], GID comparable]( ctx context.Context, opts ...GroupPoolOption[T, GID], ) (*GroupPool[T, GID], error)
NewGroupPool creates a new GroupPool instance with provided options
func (*GroupPool[T, GID]) EndGroup ¶
EndGroup signals no more tasks for the group, but allows existing tasks to complete
func (*GroupPool[T, GID]) GetSnapshot ¶
func (gp *GroupPool[T, GID]) GetSnapshot() GroupMetricsSnapshot[T, GID]
GetSnapshot returns current metrics snapshot
func (*GroupPool[T, GID]) SetMaxActivePools ¶
func (*GroupPool[T, GID]) SetMaxWorkersPerPool ¶
func (*GroupPool[T, GID]) ShouldWaitAll ¶
func (*GroupPool[T, GID]) Submit ¶
func (gp *GroupPool[T, GID]) Submit(gid GID, task T, opts ...GroupTaskOption[T, GID]) error
Submit task to a group
type GroupPoolConfig ¶
type GroupPoolConfig[T GroupTask[GID], GID comparable] struct { Logger Logger WorkerFactory WorkerFactory[T] MaxActivePools int MaxWorkersPerPool int UseFreeWorkerOnly bool GroupMustSucceed bool // By default a group can have deadtask/failure, if true the pending tasks will be discarded OnTaskSuccess func(gid GID, poolID uint, data T, metadata map[string]any) OnTaskFailure func(gid GID, poolID uint, data T, metadata map[string]any, err error) TaskAction // GroupPool specific callback handled either OnTaskSuccess or OnTaskFailure OnTaskExecuted func(gid GID, poolID uint, data T, metadata map[string]any, err error) OnSnapshot func(snapshot GroupMetricsSnapshot[T, GID]) OnGroupStarts func(gid GID, poolID uint) OnGroupEnds func(gid GID, poolID uint) OnGroupFails func(gid GID, poolID uint, pendingTasks []T) // Only if GroupMustSucceed is enabled // contains filtered or unexported fields }
GroupPoolConfig holds configuration options for the GroupPool
type GroupPoolOption ¶
type GroupPoolOption[T GroupTask[GID], GID comparable] func(*GroupPoolConfig[T, GID])
GroupOption is a functional option for configuring a GroupPool
func WithGroupPoolGroupMustSucceed ¶
func WithGroupPoolGroupMustSucceed[T GroupTask[GID], GID comparable](must bool) GroupPoolOption[T, GID]
func WithGroupPoolLogger ¶
func WithGroupPoolLogger[T GroupTask[GID], GID comparable](logger Logger) GroupPoolOption[T, GID]
func WithGroupPoolMaxActivePools ¶
func WithGroupPoolMaxActivePools[T GroupTask[GID], GID comparable](max int) GroupPoolOption[T, GID]
func WithGroupPoolMaxWorkersPerPool ¶
func WithGroupPoolMaxWorkersPerPool[T GroupTask[GID], GID comparable](max int) GroupPoolOption[T, GID]
func WithGroupPoolMetadata ¶
func WithGroupPoolMetadata[T GroupTask[GID], GID comparable](m *Metadata) GroupPoolOption[T, GID]
func WithGroupPoolOnGroupEnds ¶
func WithGroupPoolOnGroupEnds[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint)) GroupPoolOption[T, GID]
func WithGroupPoolOnGroupFails ¶
func WithGroupPoolOnGroupFails[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint, pendingTasks []T)) GroupPoolOption[T, GID]
func WithGroupPoolOnGroupStarts ¶
func WithGroupPoolOnGroupStarts[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint)) GroupPoolOption[T, GID]
func WithGroupPoolOnSnapshot ¶
func WithGroupPoolOnSnapshot[T GroupTask[GID], GID comparable](cb func(snapshot GroupMetricsSnapshot[T, GID])) GroupPoolOption[T, GID]
func WithGroupPoolOnTaskExecuted ¶
func WithGroupPoolOnTaskExecuted[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint, data T, metadata map[string]any, err error)) GroupPoolOption[T, GID]
func WithGroupPoolOnTaskFailure ¶
func WithGroupPoolOnTaskFailure[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint, data T, metadata map[string]any, err error) TaskAction) GroupPoolOption[T, GID]
func WithGroupPoolOnTaskSuccess ¶
func WithGroupPoolOnTaskSuccess[T GroupTask[GID], GID comparable](cb func(gid GID, poolID uint, data T, metadata map[string]any)) GroupPoolOption[T, GID]
func WithGroupPoolUseFreeWorkerOnly ¶
func WithGroupPoolUseFreeWorkerOnly[T GroupTask[GID], GID comparable]() GroupPoolOption[T, GID]
func WithGroupPoolWorkerFactory ¶
func WithGroupPoolWorkerFactory[T GroupTask[GID], GID comparable](wf WorkerFactory[T]) GroupPoolOption[T, GID]
type GroupRequestResponse ¶
type GroupRequestResponse[T any, R any, GID comparable] struct { // contains filtered or unexported fields }
GroupRequestResponse manages a request/response pair for tasks in a group.
func NewGroupRequestResponse ¶
func NewGroupRequestResponse[T any, R any, GID comparable](req T, gid GID) *GroupRequestResponse[T, R, GID]
func (*GroupRequestResponse[T, R, GID]) Complete ¶
func (rr *GroupRequestResponse[T, R, GID]) Complete(resp R)
func (*GroupRequestResponse[T, R, GID]) CompleteWithError ¶
func (rr *GroupRequestResponse[T, R, GID]) CompleteWithError(err error)
func (*GroupRequestResponse[T, R, GID]) ConsultRequest ¶
func (rr *GroupRequestResponse[T, R, GID]) ConsultRequest(fn func(T) error) error
func (*GroupRequestResponse[T, R, GID]) Done ¶
func (rr *GroupRequestResponse[T, R, GID]) Done() <-chan struct{}
func (*GroupRequestResponse[T, R, GID]) Err ¶
func (rr *GroupRequestResponse[T, R, GID]) Err() error
func (GroupRequestResponse[T, R, GID]) GetGroupID ¶
func (rr GroupRequestResponse[T, R, GID]) GetGroupID() GID
type GroupTask ¶
type GroupTask[GID comparable] interface { GetGroupID() GID }
GroupTask is the interface for tasks that have a group ID
type GroupTaskOption ¶
type GroupTaskOption[T any, GID comparable] func(*groupSubmitConfig)
func WithTaskGroupMetadata ¶
func WithTaskGroupMetadata[T any, GID comparable](metadata map[string]any) GroupTaskOption[T, GID]
func WithTaskGroupProcessedNotification ¶
func WithTaskGroupProcessedNotification[T any, GID comparable](notification *ProcessedNotification) GroupTaskOption[T, GID]
func WithTaskGroupQueueNotification ¶
func WithTaskGroupQueueNotification[T any, GID comparable](notification *QueuedNotification) GroupTaskOption[T, GID]
type GrowingCircularQueue ¶
type GrowingCircularQueue[T any] struct { // contains filtered or unexported fields }
GrowingCircularQueue is a circular queue that grows when full
func (*GrowingCircularQueue[T]) Clear ¶
func (q *GrowingCircularQueue[T]) Clear()
func (*GrowingCircularQueue[T]) Dequeue ¶
func (q *GrowingCircularQueue[T]) Dequeue() (*Task[T], bool)
func (*GrowingCircularQueue[T]) Drain ¶
func (q *GrowingCircularQueue[T]) Drain() []*Task[T]
func (*GrowingCircularQueue[T]) Enqueue ¶
func (q *GrowingCircularQueue[T]) Enqueue(task *Task[T])
func (*GrowingCircularQueue[T]) Length ¶
func (q *GrowingCircularQueue[T]) Length() int
func (*GrowingCircularQueue[T]) PriorityEnqueue ¶
func (q *GrowingCircularQueue[T]) PriorityEnqueue(task *Task[T]) error
type GrowingRingBufferQueue ¶
type GrowingRingBufferQueue[T any] struct { // contains filtered or unexported fields }
GrowingRingBufferQueue is a ring buffer-based queue that grows when full
func (*GrowingRingBufferQueue[T]) Clear ¶
func (q *GrowingRingBufferQueue[T]) Clear()
func (*GrowingRingBufferQueue[T]) Dequeue ¶
func (q *GrowingRingBufferQueue[T]) Dequeue() (*Task[T], bool)
func (*GrowingRingBufferQueue[T]) Drain ¶
func (q *GrowingRingBufferQueue[T]) Drain() []*Task[T]
func (*GrowingRingBufferQueue[T]) Enqueue ¶
func (q *GrowingRingBufferQueue[T]) Enqueue(task *Task[T])
func (*GrowingRingBufferQueue[T]) Length ¶
func (q *GrowingRingBufferQueue[T]) Length() int
func (*GrowingRingBufferQueue[T]) PriorityEnqueue ¶
func (q *GrowingRingBufferQueue[T]) PriorityEnqueue(task *Task[T]) error
type IndependentConfig ¶
type IndependentConfig[T any] struct { // Task callbacks OnTaskSubmitted func(task T) OnTaskStarted func(task T) OnTaskCompleted func(task T) OnTaskFailed func(task T, err error) // Group callbacks OnGroupCreated func(groupID any) OnGroupCompleted func(groupID any) OnGroupRemoved func(groupID any, tasks []T) // Pool events OnWorkerAdded func(workerID int) OnWorkerRemoved func(workerID int) OnPoolClosed func() // contains filtered or unexported fields }
type IndependentDependentTask ¶
type IndependentDependentTask[GID comparable, TID comparable] interface { GetDependencies() []TID GetGroupID() GID GetTaskID() TID }
type IndependentPool ¶
type IndependentPool[T any, GID comparable, TID comparable] struct { // contains filtered or unexported fields }
func NewIndependentPool ¶
func NewIndependentPool[T any, GID comparable, TID comparable]( ctx context.Context, opt ...IndependentPoolOption[T], ) (*IndependentPool[T, GID, TID], error)
func (*IndependentPool[T, GID, TID]) Close ¶
func (p *IndependentPool[T, GID, TID]) Close() error
Close gracefully shuts down the pool
func (*IndependentPool[T, GID, TID]) GetGroupStatus ¶
func (p *IndependentPool[T, GID, TID]) GetGroupStatus(groupID GID) (completed, total int, err error)
GetGroupStatus returns the status of a task group
func (*IndependentPool[T, GID, TID]) PullDeadTask ¶
func (i *IndependentPool[T, GID, TID]) PullDeadTask(id int) (*DeadTask[T], error)
func (*IndependentPool[T, GID, TID]) Submit ¶
func (p *IndependentPool[T, GID, TID]) Submit(tasks []T) error
Submit submits a complete group of tasks with dependencies
func (*IndependentPool[T, GID, TID]) WaitForGroup ¶
func (p *IndependentPool[T, GID, TID]) WaitForGroup(ctx context.Context, groupID GID) error
WaitForGroup waits for all tasks in a group to complete
func (*IndependentPool[T, GID, TID]) WaitWithCallback ¶
func (p *IndependentPool[T, GID, TID]) WaitWithCallback( ctx context.Context, callback func(queueSize, processingCount, deadTaskCount int) bool, interval time.Duration, ) error
WaitWithCallback waits for the pool to complete while calling a callback function
type IndependentPoolOption ¶
type IndependentPoolOption[T any] func(*IndependentConfig[T])
func WithIndependentDependencyMode ¶
func WithIndependentDependencyMode[T any](mode DependencyMode) IndependentPoolOption[T]
func WithIndependentOnDeadTask ¶
func WithIndependentOnDeadTask[T any](handler func(deadTaskIndex int)) IndependentPoolOption[T]
func WithIndependentOnGroupCompleted ¶
func WithIndependentOnGroupCompleted[T any](cb func(groupID any)) IndependentPoolOption[T]
func WithIndependentOnGroupCreated ¶
func WithIndependentOnGroupCreated[T any](cb func(groupID any)) IndependentPoolOption[T]
func WithIndependentOnGroupRemoved ¶
func WithIndependentOnGroupRemoved[T any](cb func(groupID any, tasks []T)) IndependentPoolOption[T]
func WithIndependentOnPoolClosed ¶
func WithIndependentOnPoolClosed[T any](cb func()) IndependentPoolOption[T]
func WithIndependentOnTaskCompleted ¶
func WithIndependentOnTaskCompleted[T any](cb func(task T)) IndependentPoolOption[T]
func WithIndependentOnTaskFailed ¶
func WithIndependentOnTaskFailed[T any](cb func(task T, err error)) IndependentPoolOption[T]
func WithIndependentOnTaskStarted ¶
func WithIndependentOnTaskStarted[T any](cb func(task T)) IndependentPoolOption[T]
func WithIndependentOnTaskSubmitted ¶
func WithIndependentOnTaskSubmitted[T any](cb func(task T)) IndependentPoolOption[T]
func WithIndependentOnWorkerAdded ¶
func WithIndependentOnWorkerAdded[T any](cb func(workerID int)) IndependentPoolOption[T]
func WithIndependentOnWorkerRemoved ¶
func WithIndependentOnWorkerRemoved[T any](cb func(workerID int)) IndependentPoolOption[T]
func WithIndependentWorkerFactory ¶
func WithIndependentWorkerFactory[T any](factory WorkerFactory[T]) IndependentPoolOption[T]
func WithIndependentWorkerLimits ¶
func WithIndependentWorkerLimits[T any](min, max int) IndependentPoolOption[T]
type LogOption ¶
type LogOption func(*loggerConfig)
LogOption defines functional options for logger configuration
type Logger ¶
type Logger interface { Debug(ctx context.Context, msg string, keysAndValues ...any) Info(ctx context.Context, msg string, keysAndValues ...any) Warn(ctx context.Context, msg string, keysAndValues ...any) Error(ctx context.Context, msg string, keysAndValues ...any) WithFields(fields map[string]any) Logger Enable() Disable() }
Logger provides structured logging with source tracking
type Metadata ¶
type Metadata struct {
// contains filtered or unexported fields
}
func NewMetadata ¶
func NewMetadata() *Metadata
func (*Metadata) CloneStore ¶
type Metrics ¶
type Metrics struct { TasksSubmitted atomic.Int64 TasksProcessed atomic.Int64 TasksSucceeded atomic.Int64 TasksFailed atomic.Int64 DeadTasks atomic.Int64 }
Metrics holds metrics for the pool
type MetricsSnapshot ¶
type MetricsSnapshot[T any] struct { TasksSubmitted int64 TasksProcessed int64 TasksSucceeded int64 TasksFailed int64 DeadTasks int64 TaskQueues map[int]int Workers map[int]WorkerSnapshot[T] }
func (MetricsSnapshot[T]) Clone ¶
func (m MetricsSnapshot[T]) Clone() MetricsSnapshot[T]
type NoWorkerPolicy ¶
type NoWorkerPolicy int
NoWorkerPolicy defines the behavior when no workers are available
const ( NoWorkerPolicyReject NoWorkerPolicy = iota // Default behavior: Reject the task submission with an error NoWorkerPolicyAddToDeadTasks // Add the task to dead tasks )
type Node ¶
type Node[TID comparable] struct { ID TID Dependencies []TID Dependents []TID Visited bool InProgress bool // Used for cycle detection }
type Option ¶
Option type for configuring the Pool
func WithAttempts ¶
WithAttempts sets the maximum number of attempts
func WithDeadTasksLimit ¶
WithDeadTasksLimit sets the limit for dead tasks
func WithDelayFunc ¶
WithDelayFunc sets a custom function to determine the delay between retries
func WithIfRetry ¶
WithIfRetry sets the function to determine if an error is retryable
func WithLogger ¶
WithLogger sets the logger for the pool and all its components
func WithMaxDelay ¶
WithMaxDelay sets the maximum delay between retries
func WithMaxJitter ¶
WithMaxJitter sets the maximum jitter for delay between retries
func WithMaxQueueSize ¶
WithMaxQueueSize sets the maximum queue size for the pool
func WithMetadata ¶
func WithNoWorkerPolicy ¶
func WithNoWorkerPolicy[T any](policy NoWorkerPolicy) Option[T]
WithNoWorkerPolicy sets the policy when no workers are available
func WithOnDeadTask ¶
WithOnDeadTask sets a callback when a task is added to dead tasks
func WithOnPanic ¶
WithOnPanic sets a custom panic handler for the pool
func WithOnTaskAttempt ¶
WithOnTaskAttempt sets a callback that is called whenever a worker attempts a task
func WithOnTaskFailure ¶
func WithOnTaskFailure[T any](handler func(data T, metadata map[string]any, err error) TaskAction) Option[T]
WithOnTaskFailure sets a callback for task failure
func WithOnTaskSuccess ¶
WithOnTaskSuccess sets a callback for successful task completion
func WithOnWorkerPanic ¶
func WithOnWorkerPanic[T any](handler func(workerID int, recovery interface{}, stackTrace string)) Option[T]
WithOnWorkerPanic sets a custom panic handler for workers
func WithRateLimit ¶
WithRateLimit sets the rate limit for task submissions. In async mode, the first burst of tasks (up to 2 * number of workers) may be processed immediately before the rate limit takes full effect. This provides faster startup while maintaining the desired steady-state rate.
func WithRetryPolicy ¶
func WithRetryPolicy[T any](policy RetryPolicy[T]) Option[T]
WithRetryPolicy sets a custom retry policy for the pool
func WithRoundRobinDistribution ¶
WithRoundRobinDistribution sets the task distribution to round-robin
func WithSnapshotCallback ¶
func WithSnapshotCallback[T any](callback func(MetricsSnapshot[T])) Option[T]
WithSnapshotCallback sets a callback for snapshot updates
func WithSnapshotInterval ¶
WithSnapshotInterval sets the interval for periodic snapshots
func WithSnapshots ¶
WithSnapshotInterval sets the interval for periodic snapshots
func WithSynchronousMode ¶
WithSynchronousMode sets the pool to synchronous mode
func WithTaskQueueType ¶
func WithTaskQueueType[T any](queueType TaskQueueType) Option[T]
WithTaskQueueType sets the type of task queue
type PendingTasksSnapshot ¶
type PendingTasksSnapshot[GID comparable] struct { GroupID GID TasksPending int }
type Pool ¶
type Pool[T any] struct { // contains filtered or unexported fields }
Pool manages a set of workers and tasks
func (*Pool[T]) DeadTaskCount ¶
DeadTaskCount returns the number of dead tasks
func (*Pool[T]) GetFreeWorkers ¶
GetFreeWorkers returns a list of worker IDs that have no tasks in their queue
func (*Pool[T]) GetSnapshot ¶
func (p *Pool[T]) GetSnapshot() MetricsSnapshot[T]
func (*Pool[T]) GetWorkerQueueSize ¶
func (*Pool[T]) IsRoundRobin ¶
func (*Pool[T]) NewTaskQueue ¶
func (p *Pool[T]) NewTaskQueue(queueType TaskQueueType) TaskQueue[T]
NewTaskQueue creates a new task queue of the specified type
func (*Pool[T]) ProcessingCount ¶
ProcessingCount returns the number of tasks currently being processed
func (*Pool[T]) PullDeadTask ¶
PullDeadTask removes and returns a dead task from the pool
func (*Pool[T]) PullRangeDeadTasks ¶
PullRangeDeadTasks removes and returns a range of dead tasks from the pool
func (*Pool[T]) RangeDeadTasks ¶
RangeDeadTasks iterates over all dead tasks
func (*Pool[T]) RangeTaskQueues ¶
func (*Pool[T]) RangeWorkerQueues ¶
func (*Pool[T]) RangeWorkers ¶
func (p *Pool[T]) RangeWorkers(f func(workerID int, state WorkerSnapshot[T]) bool)
func (*Pool[T]) RedistributeAllTasks ¶
func (p *Pool[T]) RedistributeAllTasks()
Redistribute all tasks among workers
func (*Pool[T]) RoundRobinIndex ¶
If you're using round robin, it might comes handy to know the current index
func (*Pool[T]) SetOnTaskAttempt ¶
SetOnTaskAttempt allows setting the onTaskAttempt handler after pool creation
func (*Pool[T]) SetOnTaskFailure ¶
func (p *Pool[T]) SetOnTaskFailure(handler func(data T, metadata map[string]any, err error) TaskAction)
SetOnTaskFailure allows setting the onTaskFailure handler after pool creation
func (*Pool[T]) SetOnTaskSuccess ¶
SetOnTaskSuccess allows setting the onTaskSuccess handler after pool creation
func (*Pool[T]) StartMetricsUpdater ¶
func (*Pool[T]) StopMetricsUpdater ¶
func (p *Pool[T]) StopMetricsUpdater()
func (*Pool[T]) Submit ¶
func (p *Pool[T]) Submit(data T, options ...TaskOption[T]) error
Submit allows the developer to send data directly without pre-allocation.
func (*Pool[T]) SubmitToFreeWorker ¶
func (p *Pool[T]) SubmitToFreeWorker(taskData T, options ...TaskOption[T]) error
SubmitToFreeWorker attempts to submit a task to a free worker
func (*Pool[T]) TransitionTaskState ¶
TransitionTaskState handles state changes and maintains counts
func (*Pool[T]) UpdateTotalQueueSize ¶
func (*Pool[T]) UpdateWorkerQueueSize ¶
type PoolMetricsSnapshot ¶
type PoolMetricsSnapshot[T any] struct { PoolID uint AssignedGroup any IsActive bool QueueSize int64 Workers map[int]WorkerSnapshot[T] }
PoolMetricsSnapshot represents metrics for a single pool
type Pooler ¶
type Pooler[T any] interface { // SetOnTaskSuccess sets the handler that will be called when a task succeeds. SetOnTaskSuccess(handler func(data T, metadata map[string]any)) // SetOnTaskFailure sets the handler that will be called when a task fails. // The handler should return a TaskAction indicating how the pool should proceed. SetOnTaskFailure(handler func(data T, metadata map[string]any, err error) TaskAction) SetOnTaskAttempt(handler func(task *Task[T], workerID int)) // Add adds a new worker to the pool. If a queue is not provided, a new one will be created. Add(worker Worker[T], queue TaskQueue[T]) error // Remove removes a worker by its ID, redistributing its tasks to other workers. Remove(id int) error // Pause pauses a worker by its ID, redistributing its tasks and preventing it from processing further tasks. Pause(id int) error // Resume resumes a previously paused worker, allowing it to process tasks again. Resume(id int) error // Workers returns the list of worker IDs currently managed by the pool. Workers() ([]int, error) // GetFreeWorkers returns a list of worker IDs that have no tasks in their queue GetFreeWorkers() []int // SubmitToFreeWorker attempts to submit a task to a free worker SubmitToFreeWorker(taskData T, options ...TaskOption[T]) error // Submit allows submitting data directly as a task without pre-allocation. Optional TaskOptions can modify the task's behavior. Submit(data T, options ...TaskOption[T]) error // WaitWithCallback waits until the provided callback returns false, periodically invoking it at the given interval. // The callback receives the current queue size, processing count, and dead task count, and should return false to stop waiting. WaitWithCallback(ctx context.Context, callback func(queueSize, processingCount, deadTaskCount int) bool, interval time.Duration) error // Close gracefully shuts down the pool, stopping all workers and redistributing or discarding tasks as needed. Close() error // QueueSize returns the total number of tasks currently queued. QueueSize() int64 // ProcessingCount returns the number of tasks currently being processed by workers. ProcessingCount() int64 // DeadTaskCount returns the number of dead tasks that have permanently failed. DeadTaskCount() int64 // RangeDeadTasks iterates over all dead tasks. If the callback returns false, iteration stops. RangeDeadTasks(fn func(*DeadTask[T]) bool) // PullDeadTask removes and returns a dead task at the specified index. PullDeadTask(idx int) (*DeadTask[T], error) // PullRangeDeadTasks removes and returns a range of dead tasks [from, to). PullRangeDeadTasks(from int, to int) ([]*DeadTask[T], error) // RangeWorkerQueues iterates over each worker's queue size. If the callback returns false, iteration stops. RangeWorkerQueues(f func(workerID int, queueSize int64) bool) // RangeTaskQueues iterates over each worker's TaskQueue. If the callback returns false, iteration stops. RangeTaskQueues(f func(workerID int, queue TaskQueue[T]) bool) // RangeWorkers iterates over each worker. If the callback returns false, iteration stops. RangeWorkers(f func(workerID int, state WorkerSnapshot[T]) bool) }
Pooler is an interface that exposes all of the public methods of the Pool[T] struct.
type ProcessedNotification ¶
type ProcessedNotification struct {
// contains filtered or unexported fields
}
ProcessedNotification is used to notify when a task is processed
func NewProcessedNotification ¶
func NewProcessedNotification() *ProcessedNotification
NewProcessedNotification creates a new ProcessedNotification
func (*ProcessedNotification) Done ¶
func (n *ProcessedNotification) Done() <-chan struct{}
Done returns the channel that's closed when the task is processed
func (*ProcessedNotification) Notify ¶
func (n *ProcessedNotification) Notify()
Notify notifies that the task has been processed
func (*ProcessedNotification) Wait ¶
func (n *ProcessedNotification) Wait()
Wait waits for the notification
type QueuedNotification ¶
type QueuedNotification struct {
// contains filtered or unexported fields
}
QueuedNotification is used to notify when a task is queued
func NewQueuedNotification ¶
func NewQueuedNotification() *QueuedNotification
NewQueuedNotification creates a new QueuedNotification
func (*QueuedNotification) Done ¶
func (n *QueuedNotification) Done() <-chan struct{}
Done returns the channel that's closed when the task is queued
func (*QueuedNotification) Notify ¶
func (n *QueuedNotification) Notify()
Notify notifies that the task has been queued
func (*QueuedNotification) Wait ¶
func (n *QueuedNotification) Wait()
Wait waits for the notification
type RequestResponse ¶
RequestResponse manages the lifecycle of a task request and its response
func NewRequestResponse ¶
func NewRequestResponse[T any, R any](request T) *RequestResponse[T, R]
NewRequestResponse creates a new RequestResponse instance
func (*RequestResponse[T, R]) Complete ¶
func (rr *RequestResponse[T, R]) Complete(response R)
Complete safely marks the request as complete with a response
func (*RequestResponse[T, R]) CompleteWithError ¶
func (rr *RequestResponse[T, R]) CompleteWithError(err error)
CompleteWithError safely marks the request as complete with an error
func (*RequestResponse[T, R]) ConsultRequest ¶
func (rr *RequestResponse[T, R]) ConsultRequest(fn func(T) error) error
Safely consults the request data
func (*RequestResponse[T, R]) Done ¶
func (rr *RequestResponse[T, R]) Done() <-chan struct{}
Done returns a channel that's closed when the request is complete
func (*RequestResponse[T, R]) Err ¶
func (rr *RequestResponse[T, R]) Err() error
Err returns any error that occurred during the request
type RetryPolicy ¶
type RetryPolicy[T any] interface { ComputeDelay(retries int, err error, config *Config[T]) time.Duration }
RetryPolicy defines an interface for retry policies
type RingBufferQueue ¶
type RingBufferQueue[T any] struct { // contains filtered or unexported fields }
RingBufferQueue is a ring buffer-based queue
func (*RingBufferQueue[T]) Clear ¶
func (q *RingBufferQueue[T]) Clear()
func (*RingBufferQueue[T]) Dequeue ¶
func (q *RingBufferQueue[T]) Dequeue() (*Task[T], bool)
func (*RingBufferQueue[T]) Drain ¶
func (q *RingBufferQueue[T]) Drain() []*Task[T]
func (*RingBufferQueue[T]) Enqueue ¶
func (q *RingBufferQueue[T]) Enqueue(task *Task[T])
func (*RingBufferQueue[T]) Length ¶
func (q *RingBufferQueue[T]) Length() int
func (*RingBufferQueue[T]) PriorityEnqueue ¶
func (q *RingBufferQueue[T]) PriorityEnqueue(task *Task[T]) error
type SubmitOption ¶
type SubmitOption[T any] func(*submitConfig)
func WithBlockingMetadata ¶
func WithBlockingMetadata[T any](metadata map[string]any) SubmitOption[T]
func WithBlockingProcessedNotification ¶
func WithBlockingProcessedNotification[T any](notification *ProcessedNotification) SubmitOption[T]
func WithBlockingQueueNotification ¶
func WithBlockingQueueNotification[T any](notification *QueuedNotification) SubmitOption[T]
type Task ¶
type Task[T any] struct { // contains filtered or unexported fields }
Task represents a task in the pool
func (*Task[T]) GetAttemptedWorkers ¶
func (*Task[T]) GetMetadata ¶
type TaskAction ¶
type TaskAction int
TaskAction represents the action to take for a failed task
const ( TaskActionRetry TaskAction = iota + 1 // Task will retry using its own state TaskActionForceRetry // Force a retry, ignoring retry limits TaskActionRemove // Remove the task and recycle resources TaskActionAddToDeadTasks // Add the task to dead tasks )
type TaskMetadataSetter ¶
type TaskOption ¶
TaskOption functions for configuring individual tasks
func WithTaskBounceRetry ¶
func WithTaskBounceRetry[T any]() TaskOption[T]
WithTaskBounceRetry enables retry on different workers
func WithTaskDuration ¶
func WithTaskDuration[T any](d time.Duration) TaskOption[T]
WithTaskDuration sets a per-attempt time limit for the task
func WithTaskImmediateRetry ¶
func WithTaskImmediateRetry[T any]() TaskOption[T]
WithTaskImmediateRetry allows the submitted task to retry immediately
func WithTaskMetadata ¶
func WithTaskMetadata[T any](metadata *Metadata) TaskOption[T]
WithMetadata sets metadata for the task
func WithTaskProcessedCb ¶
func WithTaskProcessedCb[T any](cb func()) TaskOption[T]
WithTaskProcessedCb sets a callback for when a task is processed
func WithTaskProcessedNotification ¶
func WithTaskProcessedNotification[T any](n *ProcessedNotification) TaskOption[T]
WithTaskProcessedNotification sets a notification for when a task is processed
func WithTaskQueuedCb ¶
func WithTaskQueuedCb[T any](cb func()) TaskOption[T]
WithTaskQueuedCb sets a callback for when a task is queued
func WithTaskQueuedNotification ¶
func WithTaskQueuedNotification[T any](n *QueuedNotification) TaskOption[T]
WithTaskQueuedNotification sets a notification for when a task is queued
func WithTaskRunningCb ¶
func WithTaskRunningCb[T any](cb func()) TaskOption[T]
WithTaskRunningCb sets a callback for when a task is running
func WithTaskTimeout ¶
func WithTaskTimeout[T any](d time.Duration) TaskOption[T]
WithTaskTimeout sets a total time limit for the task
type TaskQueue ¶
type TaskQueue[T any] interface { Enqueue(task *Task[T]) PriorityEnqueue(task *Task[T]) error // For immediate retry Dequeue() (*Task[T], bool) Length() int Clear() Drain() []*Task[T] }
TaskQueue defines the interface for task queues
type TaskQueueType ¶
type TaskQueueType int
TaskQueueType represents the type of task queue
const ( TaskQueueTypeSlice TaskQueueType = iota TaskQueueTypeRingBuffer TaskQueueTypeCircularQueue TaskQueueTypeLinkedList TaskQueueTypeGrowingRingBuffer TaskQueueTypeGrowingCircularQueue )
type TaskStateTransition ¶
type TaskStateTransition[T any] struct { FromState TaskState ToState TaskState Task *Task[T] Reason string Timestamp time.Time }
TaskStateTransition represents a state change
type WorkerFactory ¶
type WorkerSnapshot ¶
type WorkerState ¶
type WorkerState string
const ( WorkerStateCreated WorkerState = "Created" WorkerStateStarted WorkerState = "Started" WorkerStateIdle WorkerState = "Idle" WorkerStateEnqueuing WorkerState = "Enqueuing" WorkerStateProcessing WorkerState = "Processing" WorkerStatePaused WorkerState = "Paused" WorkerStateRemoved WorkerState = "Removed" )