queue

package
v1.10.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2025 License: Apache-2.0 Imports: 18 Imported by: 23

Documentation

Overview

Deprecated: this package is deprecated.

Index

Constants

View Source
const MainQueueName = "main"

Variables

View Source
var (
	DefaultWaitLoopCheckInterval    = 125 * time.Millisecond
	DefaultDelayOnQueueIsEmpty      = 250 * time.Millisecond
	DefaultInitialDelayOnFailedTask = 5 * time.Second
	DefaultDelayOnRepeat            = 25 * time.Millisecond
)

Functions

This section is empty.

Types

type TaskCounter added in v1.10.2

type TaskCounter struct {
	// contains filtered or unexported fields
}

func NewTaskCounter added in v1.10.2

func NewTaskCounter(name string, countableTypes map[task.TaskType]struct{}, metricStorage metric.Storage) *TaskCounter

func (*TaskCounter) Add added in v1.10.2

func (tc *TaskCounter) Add(task task.Task)

func (*TaskCounter) GetReachedCap added in v1.10.2

func (tc *TaskCounter) GetReachedCap() map[string]struct{}

func (*TaskCounter) IsAnyCapReached added in v1.10.2

func (tc *TaskCounter) IsAnyCapReached() bool

func (*TaskCounter) Remove added in v1.10.2

func (tc *TaskCounter) Remove(task task.Task)

func (*TaskCounter) ResetReachedCap added in v1.10.2

func (tc *TaskCounter) ResetReachedCap()

type TaskQueue

type TaskQueue struct {
	Name    string
	Handler func(ctx context.Context, t task.Task) TaskResult
	Status  string

	// Timing settings.
	WaitLoopCheckInterval time.Duration
	DelayOnQueueIsEmpty   time.Duration
	DelayOnRepeat         time.Duration
	ExponentialBackoffFn  func(failureCount int) time.Duration
	// contains filtered or unexported fields
}

func NewTasksQueue

func NewTasksQueue(metricStorage metric.Storage, opts ...TaskQueueOption) *TaskQueue

NewTasksQueue creates a new TaskQueue with the provided options

func (*TaskQueue) AddAfter

func (q *TaskQueue) AddAfter(id string, tasks ...task.Task)

AddAfter inserts a task after the task with specified id.

func (*TaskQueue) AddBefore

func (q *TaskQueue) AddBefore(id string, newTask task.Task)

AddBefore inserts a task before the task with specified id.

func (*TaskQueue) AddFirst

func (q *TaskQueue) AddFirst(tasks ...task.Task)

AddFirst adds new head element.

func (*TaskQueue) AddLast

func (q *TaskQueue) AddLast(tasks ...task.Task)

AddLast adds new tail element.

func (*TaskQueue) CancelTaskDelay added in v1.0.11

func (q *TaskQueue) CancelTaskDelay()

CancelTaskDelay breaks wait loop. Useful to break the possible long sleep delay.

func (*TaskQueue) Filter

func (q *TaskQueue) Filter(filterFn func(task.Task) bool)

Filter runs filterFn on every task and removes each task for which it returns false.

func (*TaskQueue) Get

func (q *TaskQueue) Get(id string) task.Task

Get returns a task by id.

func (*TaskQueue) GetFirst

func (q *TaskQueue) GetFirst() task.Task

GetFirst returns a head element.

func (*TaskQueue) GetLast

func (q *TaskQueue) GetLast() task.Task

GetLast returns a tail element.

func (*TaskQueue) GetStatus added in v1.6.0

func (q *TaskQueue) GetStatus() string

func (*TaskQueue) IsEmpty

func (q *TaskQueue) IsEmpty() bool

func (*TaskQueue) Iterate

func (q *TaskQueue) Iterate(doFn func(task.Task))

Iterate runs doFn for every task.

func (*TaskQueue) Length

func (q *TaskQueue) Length() int

func (*TaskQueue) MeasureActionTime

func (q *TaskQueue) MeasureActionTime(action string) func()

MeasureActionTime is a helper to measure execution time of queue's actions

func (*TaskQueue) Remove

func (q *TaskQueue) Remove(id string) task.Task

Remove finds element by id and deletes it.

func (*TaskQueue) RemoveFirst

func (q *TaskQueue) RemoveFirst() task.Task

RemoveFirst deletes a head element, so head is moved.

func (*TaskQueue) RemoveLast

func (q *TaskQueue) RemoveLast() task.Task

RemoveLast deletes a tail element, so tail is moved.

func (*TaskQueue) SetStatus added in v1.6.0

func (q *TaskQueue) SetStatus(status string)

func (*TaskQueue) Start

func (q *TaskQueue) Start(ctx context.Context)

func (*TaskQueue) Stop

func (q *TaskQueue) Stop()

func (*TaskQueue) String

func (q *TaskQueue) String() string

String dumps the tasks in the queue to one line.

type TaskQueueOption added in v1.10.2

type TaskQueueOption func(*TaskQueue)

TaskQueueOption defines a functional option for TaskQueue configuration

func WithCompactableTypes added in v1.10.1

func WithCompactableTypes(taskTypes ...task.TaskType) TaskQueueOption

WithCompactableTypes sets the compactable task types for the TaskQueue

func WithCompactionCallback added in v1.10.1

func WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) TaskQueueOption

func WithContext added in v1.10.2

func WithContext(ctx context.Context) TaskQueueOption

WithContext sets the context for the TaskQueue

func WithHandler added in v1.10.2

func WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) TaskQueueOption

WithHandler sets the task handler for the TaskQueue

func WithLogger added in v1.10.1

func WithLogger(logger *log.Logger) TaskQueueOption

func WithName added in v1.10.2

func WithName(name string) TaskQueueOption

WithName sets the name for the TaskQueue

type TaskQueueSet

type TaskQueueSet struct {
	MainName string

	Queues map[string]*TaskQueue
	// contains filtered or unexported fields
}

TaskQueueSet is a manager for a set of named queues

func NewTaskQueueSet

func NewTaskQueueSet() *TaskQueueSet

func (*TaskQueueSet) Add

func (tqs *TaskQueueSet) Add(queue *TaskQueue)

func (*TaskQueueSet) DoWithLock

func (tqs *TaskQueueSet) DoWithLock(fn func(tqs *TaskQueueSet))
taskQueueSet.DoWithLock(func(tqs *TaskQueueSet){
   tqs.GetMain().RemoveFirst()
})

func (*TaskQueueSet) GetByName

func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue

func (*TaskQueueSet) GetMain

func (tqs *TaskQueueSet) GetMain() *TaskQueue

func (*TaskQueueSet) Iterate

func (tqs *TaskQueueSet) Iterate(doFn func(queue *TaskQueue))

Iterate runs doFn for every queue in the set.

func (*TaskQueueSet) NewNamedQueue

func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, opts ...TaskQueueOption)

func (*TaskQueueSet) Remove

func (tqs *TaskQueueSet) Remove(name string)

func (*TaskQueueSet) Start

func (tqs *TaskQueueSet) Start(ctx context.Context)

func (*TaskQueueSet) StartMain

func (tqs *TaskQueueSet) StartMain(ctx context.Context)

func (*TaskQueueSet) Stop

func (tqs *TaskQueueSet) Stop()

func (*TaskQueueSet) WaitStopWithTimeout

func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration)

func (*TaskQueueSet) WithContext

func (tqs *TaskQueueSet) WithContext(ctx context.Context)

func (*TaskQueueSet) WithMainName

func (tqs *TaskQueueSet) WithMainName(name string)

func (*TaskQueueSet) WithMetricStorage

func (tqs *TaskQueueSet) WithMetricStorage(mstor metric.Storage) *TaskQueueSet

type TaskQueueSlice added in v1.10.0

type TaskQueueSlice struct {
	CompactableTypes map[task.TaskType]struct{}

	Name    string
	Handler func(ctx context.Context, t task.Task) TaskResult
	Status  string

	// Callback for task compaction events
	CompactionCallback func(compactedTasks []task.Task, targetTask task.Task)

	// Timing settings.
	WaitLoopCheckInterval time.Duration
	DelayOnQueueIsEmpty   time.Duration
	DelayOnRepeat         time.Duration
	ExponentialBackoffFn  func(failureCount int) time.Duration
	// contains filtered or unexported fields
}

func NewTasksQueueSlice added in v1.10.0

func NewTasksQueueSlice() *TaskQueueSlice

func (*TaskQueueSlice) AddAfter added in v1.10.0

func (q *TaskQueueSlice) AddAfter(id string, newTask task.Task)

AddAfter inserts a task after the task with specified id.

func (*TaskQueueSlice) AddBefore added in v1.10.0

func (q *TaskQueueSlice) AddBefore(id string, newTask task.Task)

AddBefore inserts a task before the task with specified id.

func (*TaskQueueSlice) AddFirst added in v1.10.0

func (q *TaskQueueSlice) AddFirst(tasks ...task.Task)

AddFirst adds new head element.

func (*TaskQueueSlice) AddLast added in v1.10.0

func (q *TaskQueueSlice) AddLast(tasks ...task.Task)

AddLast adds new tail element.

func (*TaskQueueSlice) CancelTaskDelay added in v1.10.0

func (q *TaskQueueSlice) CancelTaskDelay()

CancelTaskDelay breaks wait loop. Useful to break the possible long sleep delay.

func (*TaskQueueSlice) Filter added in v1.10.0

func (q *TaskQueueSlice) Filter(filterFn func(task.Task) bool)

Filter runs filterFn on every task and removes each task for which it returns false.

func (*TaskQueueSlice) Get added in v1.10.0

func (q *TaskQueueSlice) Get(id string) task.Task

Get returns a task by id.

func (*TaskQueueSlice) GetFirst added in v1.10.0

func (q *TaskQueueSlice) GetFirst() task.Task

GetFirst returns a head element.

func (*TaskQueueSlice) GetLast added in v1.10.0

func (q *TaskQueueSlice) GetLast() task.Task

GetLast returns a tail element.

func (*TaskQueueSlice) GetStatus added in v1.10.0

func (q *TaskQueueSlice) GetStatus() string

func (*TaskQueueSlice) IsEmpty added in v1.10.0

func (q *TaskQueueSlice) IsEmpty() bool

func (*TaskQueueSlice) Iterate added in v1.10.0

func (q *TaskQueueSlice) Iterate(doFn func(task.Task))

Iterate runs doFn for every task.

func (*TaskQueueSlice) Length added in v1.10.0

func (q *TaskQueueSlice) Length() int

func (*TaskQueueSlice) MeasureActionTime added in v1.10.0

func (q *TaskQueueSlice) MeasureActionTime(action string) func()

MeasureActionTime is a helper to measure execution time of queue's actions

func (*TaskQueueSlice) Remove added in v1.10.0

func (q *TaskQueueSlice) Remove(id string) task.Task

Remove finds element by id and deletes it.

func (*TaskQueueSlice) RemoveFirst added in v1.10.0

func (q *TaskQueueSlice) RemoveFirst() task.Task

RemoveFirst deletes a head element, so head is moved.

func (*TaskQueueSlice) RemoveLast added in v1.10.0

func (q *TaskQueueSlice) RemoveLast() task.Task

RemoveLast deletes a tail element, so tail is moved.

func (*TaskQueueSlice) SetDebug added in v1.10.0

func (q *TaskQueueSlice) SetDebug(debug bool)

func (*TaskQueueSlice) SetStatus added in v1.10.0

func (q *TaskQueueSlice) SetStatus(status string)

func (*TaskQueueSlice) Start added in v1.10.0

func (q *TaskQueueSlice) Start(ctx context.Context)

func (*TaskQueueSlice) Stop added in v1.10.0

func (q *TaskQueueSlice) Stop()

func (*TaskQueueSlice) String added in v1.10.0

func (q *TaskQueueSlice) String() string

String dumps the tasks in the queue to one line.

func (*TaskQueueSlice) WithCompactableTypes added in v1.10.0

func (q *TaskQueueSlice) WithCompactableTypes(taskTypes []task.TaskType) *TaskQueueSlice

func (*TaskQueueSlice) WithCompactionCallback added in v1.10.0

func (q *TaskQueueSlice) WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) *TaskQueueSlice

func (*TaskQueueSlice) WithContext added in v1.10.0

func (q *TaskQueueSlice) WithContext(ctx context.Context)

func (*TaskQueueSlice) WithHandler added in v1.10.0

func (q *TaskQueueSlice) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueueSlice

func (*TaskQueueSlice) WithLogger added in v1.10.0

func (q *TaskQueueSlice) WithLogger(logger *log.Logger)

func (*TaskQueueSlice) WithMetricStorage added in v1.10.0

func (q *TaskQueueSlice) WithMetricStorage(mstor metric.Storage) *TaskQueueSlice

func (*TaskQueueSlice) WithName added in v1.10.0

func (q *TaskQueueSlice) WithName(name string) *TaskQueueSlice

type TaskResult

type TaskResult struct {
	Status TaskStatus

	DelayBeforeNextTask time.Duration

	AfterHandle func()
	// contains filtered or unexported fields
}

func (*TaskResult) AddAfterTasks added in v1.10.1

func (res *TaskResult) AddAfterTasks(t ...task.Task)

func (*TaskResult) AddHeadTasks added in v1.10.1

func (res *TaskResult) AddHeadTasks(t ...task.Task)

func (*TaskResult) AddTailTasks added in v1.10.1

func (res *TaskResult) AddTailTasks(t ...task.Task)

func (*TaskResult) GetAfterTasks added in v1.10.1

func (res *TaskResult) GetAfterTasks() []task.Task

func (*TaskResult) GetHeadTasks added in v1.10.1

func (res *TaskResult) GetHeadTasks() []task.Task

func (*TaskResult) GetTailTasks added in v1.10.1

func (res *TaskResult) GetTailTasks() []task.Task

type TaskStatus added in v1.0.11

type TaskStatus string
const (
	Success TaskStatus = "Success"
	Fail    TaskStatus = "Fail"
	Repeat  TaskStatus = "Repeat"
	Keep    TaskStatus = "Keep"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL