Documentation
¶
Overview ¶
!DEPRECATED
Index ¶
- Constants
- Variables
- type TaskCounter
- type TaskQueue
- func (q *TaskQueue) AddAfter(id string, tasks ...task.Task)
- func (q *TaskQueue) AddBefore(id string, newTask task.Task)
- func (q *TaskQueue) AddFirst(tasks ...task.Task)
- func (q *TaskQueue) AddLast(tasks ...task.Task)
- func (q *TaskQueue) CancelTaskDelay()
- func (q *TaskQueue) Filter(filterFn func(task.Task) bool)
- func (q *TaskQueue) Get(id string) task.Task
- func (q *TaskQueue) GetFirst() task.Task
- func (q *TaskQueue) GetLast() task.Task
- func (q *TaskQueue) GetStatus() string
- func (q *TaskQueue) IsEmpty() bool
- func (q *TaskQueue) Iterate(doFn func(task.Task))
- func (q *TaskQueue) Length() int
- func (q *TaskQueue) MeasureActionTime(action string) func()
- func (q *TaskQueue) Remove(id string) task.Task
- func (q *TaskQueue) RemoveFirst() task.Task
- func (q *TaskQueue) RemoveLast() task.Task
- func (q *TaskQueue) SetStatus(status string)
- func (q *TaskQueue) Start(ctx context.Context)
- func (q *TaskQueue) Stop()
- func (q *TaskQueue) String() string
- type TaskQueueOption
- func WithCompactableTypes(taskTypes ...task.TaskType) TaskQueueOption
- func WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) TaskQueueOption
- func WithContext(ctx context.Context) TaskQueueOption
- func WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) TaskQueueOption
- func WithLogger(logger *log.Logger) TaskQueueOption
- func WithName(name string) TaskQueueOption
- type TaskQueueSet
- func (tqs *TaskQueueSet) Add(queue *TaskQueue)
- func (tqs *TaskQueueSet) DoWithLock(fn func(tqs *TaskQueueSet))
- func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue
- func (tqs *TaskQueueSet) GetMain() *TaskQueue
- func (tqs *TaskQueueSet) Iterate(doFn func(queue *TaskQueue))
- func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, ...)
- func (tqs *TaskQueueSet) Remove(name string)
- func (tqs *TaskQueueSet) Start(ctx context.Context)
- func (tqs *TaskQueueSet) StartMain(ctx context.Context)
- func (tqs *TaskQueueSet) Stop()
- func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration)
- func (tqs *TaskQueueSet) WithContext(ctx context.Context)
- func (tqs *TaskQueueSet) WithMainName(name string)
- func (tqs *TaskQueueSet) WithMetricStorage(mstor metric.Storage) *TaskQueueSet
- type TaskQueueSlice
- func (q *TaskQueueSlice) AddAfter(id string, newTask task.Task)
- func (q *TaskQueueSlice) AddBefore(id string, newTask task.Task)
- func (q *TaskQueueSlice) AddFirst(tasks ...task.Task)
- func (q *TaskQueueSlice) AddLast(tasks ...task.Task)
- func (q *TaskQueueSlice) CancelTaskDelay()
- func (q *TaskQueueSlice) Filter(filterFn func(task.Task) bool)
- func (q *TaskQueueSlice) Get(id string) task.Task
- func (q *TaskQueueSlice) GetFirst() task.Task
- func (q *TaskQueueSlice) GetLast() task.Task
- func (q *TaskQueueSlice) GetStatus() string
- func (q *TaskQueueSlice) IsEmpty() bool
- func (q *TaskQueueSlice) Iterate(doFn func(task.Task))
- func (q *TaskQueueSlice) Length() int
- func (q *TaskQueueSlice) MeasureActionTime(action string) func()
- func (q *TaskQueueSlice) Remove(id string) task.Task
- func (q *TaskQueueSlice) RemoveFirst() task.Task
- func (q *TaskQueueSlice) RemoveLast() task.Task
- func (q *TaskQueueSlice) SetDebug(debug bool)
- func (q *TaskQueueSlice) SetStatus(status string)
- func (q *TaskQueueSlice) Start(ctx context.Context)
- func (q *TaskQueueSlice) Stop()
- func (q *TaskQueueSlice) String() string
- func (q *TaskQueueSlice) WithCompactableTypes(taskTypes []task.TaskType) *TaskQueueSlice
- func (q *TaskQueueSlice) WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) *TaskQueueSlice
- func (q *TaskQueueSlice) WithContext(ctx context.Context)
- func (q *TaskQueueSlice) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueueSlice
- func (q *TaskQueueSlice) WithLogger(logger *log.Logger)
- func (q *TaskQueueSlice) WithMetricStorage(mstor metric.Storage) *TaskQueueSlice
- func (q *TaskQueueSlice) WithName(name string) *TaskQueueSlice
- type TaskResult
- func (res *TaskResult) AddAfterTasks(t ...task.Task)
- func (res *TaskResult) AddHeadTasks(t ...task.Task)
- func (res *TaskResult) AddTailTasks(t ...task.Task)
- func (res *TaskResult) GetAfterTasks() []task.Task
- func (res *TaskResult) GetHeadTasks() []task.Task
- func (res *TaskResult) GetTailTasks() []task.Task
- type TaskStatus
Constants ¶
const MainQueueName = "main"
Variables ¶
var (
	DefaultWaitLoopCheckInterval    = 125 * time.Millisecond
	DefaultDelayOnQueueIsEmpty      = 250 * time.Millisecond
	DefaultInitialDelayOnFailedTask = 5 * time.Second
	DefaultDelayOnRepeat            = 25 * time.Millisecond
)
Functions ¶
This section is empty.
Types ¶
type TaskCounter ¶ added in v1.10.2
type TaskCounter struct {
// contains filtered or unexported fields
}
func NewTaskCounter ¶ added in v1.10.2
func (*TaskCounter) Add ¶ added in v1.10.2
func (tc *TaskCounter) Add(task task.Task)
func (*TaskCounter) GetReachedCap ¶ added in v1.10.2
func (tc *TaskCounter) GetReachedCap() map[string]struct{}
func (*TaskCounter) IsAnyCapReached ¶ added in v1.10.2
func (tc *TaskCounter) IsAnyCapReached() bool
func (*TaskCounter) Remove ¶ added in v1.10.2
func (tc *TaskCounter) Remove(task task.Task)
func (*TaskCounter) ResetReachedCap ¶ added in v1.10.2
func (tc *TaskCounter) ResetReachedCap()
type TaskQueue ¶
type TaskQueue struct {
	Name    string
	Handler func(ctx context.Context, t task.Task) TaskResult
	Status  string

	// Timing settings.
	WaitLoopCheckInterval time.Duration
	DelayOnQueueIsEmpty   time.Duration
	DelayOnRepeat         time.Duration
	ExponentialBackoffFn  func(failureCount int) time.Duration
	// contains filtered or unexported fields
}
func NewTasksQueue ¶
func NewTasksQueue(metricStorage metric.Storage, opts ...TaskQueueOption) *TaskQueue
NewTasksQueue creates a new TaskQueue with the provided options
func (*TaskQueue) CancelTaskDelay ¶ added in v1.0.11
func (q *TaskQueue) CancelTaskDelay()
CancelTaskDelay breaks wait loop. Useful to break the possible long sleep delay.
func (*TaskQueue) MeasureActionTime ¶
MeasureActionTime is a helper to measure execution time of queue's actions
func (*TaskQueue) RemoveFirst ¶
RemoveFirst deletes a head element, so head is moved.
func (*TaskQueue) RemoveLast ¶
RemoveLast deletes a tail element, so tail is moved.
type TaskQueueOption ¶ added in v1.10.2
type TaskQueueOption func(*TaskQueue)
TaskQueueOption defines a functional option for TaskQueue configuration
func WithCompactableTypes ¶ added in v1.10.1
func WithCompactableTypes(taskTypes ...task.TaskType) TaskQueueOption
WithCompactableTypes sets the compactable task types for the TaskQueue
func WithCompactionCallback ¶ added in v1.10.1
func WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) TaskQueueOption
func WithContext ¶ added in v1.10.2
func WithContext(ctx context.Context) TaskQueueOption
WithContext sets the context for the TaskQueue
func WithHandler ¶ added in v1.10.2
func WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) TaskQueueOption
WithHandler sets the task handler for the TaskQueue
func WithLogger ¶ added in v1.10.1
func WithLogger(logger *log.Logger) TaskQueueOption
func WithName ¶ added in v1.10.2
func WithName(name string) TaskQueueOption
WithName sets the name for the TaskQueue
type TaskQueueSet ¶
type TaskQueueSet struct {
	MainName string
	Queues   map[string]*TaskQueue
	// contains filtered or unexported fields
}
TaskQueueSet is a manager for a set of named queues
func NewTaskQueueSet ¶
func NewTaskQueueSet() *TaskQueueSet
func (*TaskQueueSet) Add ¶
func (tqs *TaskQueueSet) Add(queue *TaskQueue)
func (*TaskQueueSet) DoWithLock ¶
func (tqs *TaskQueueSet) DoWithLock(fn func(tqs *TaskQueueSet))
Example: taskQueueSet.DoWithLock(func(tqs *TaskQueueSet) { tqs.GetMain().RemoveFirst() })
func (*TaskQueueSet) GetByName ¶
func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue
func (*TaskQueueSet) GetMain ¶
func (tqs *TaskQueueSet) GetMain() *TaskQueue
func (*TaskQueueSet) Iterate ¶
func (tqs *TaskQueueSet) Iterate(doFn func(queue *TaskQueue))
Iterate runs doFn for every queue in the set.
func (*TaskQueueSet) NewNamedQueue ¶
func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, opts ...TaskQueueOption)
func (*TaskQueueSet) Remove ¶
func (tqs *TaskQueueSet) Remove(name string)
func (*TaskQueueSet) Start ¶
func (tqs *TaskQueueSet) Start(ctx context.Context)
func (*TaskQueueSet) StartMain ¶
func (tqs *TaskQueueSet) StartMain(ctx context.Context)
func (*TaskQueueSet) Stop ¶
func (tqs *TaskQueueSet) Stop()
func (*TaskQueueSet) WaitStopWithTimeout ¶
func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration)
func (*TaskQueueSet) WithContext ¶
func (tqs *TaskQueueSet) WithContext(ctx context.Context)
func (*TaskQueueSet) WithMainName ¶
func (tqs *TaskQueueSet) WithMainName(name string)
func (*TaskQueueSet) WithMetricStorage ¶
func (tqs *TaskQueueSet) WithMetricStorage(mstor metric.Storage) *TaskQueueSet
type TaskQueueSlice ¶ added in v1.10.0
type TaskQueueSlice struct {
	CompactableTypes map[task.TaskType]struct{}

	Name    string
	Handler func(ctx context.Context, t task.Task) TaskResult
	Status  string

	// Callback for task compaction events
	CompactionCallback func(compactedTasks []task.Task, targetTask task.Task)

	// Timing settings.
	WaitLoopCheckInterval time.Duration
	DelayOnQueueIsEmpty   time.Duration
	DelayOnRepeat         time.Duration
	ExponentialBackoffFn  func(failureCount int) time.Duration
	// contains filtered or unexported fields
}
func NewTasksQueueSlice ¶ added in v1.10.0
func NewTasksQueueSlice() *TaskQueueSlice
func (*TaskQueueSlice) AddAfter ¶ added in v1.10.0
func (q *TaskQueueSlice) AddAfter(id string, newTask task.Task)
AddAfter inserts a task after the task with specified id.
func (*TaskQueueSlice) AddBefore ¶ added in v1.10.0
func (q *TaskQueueSlice) AddBefore(id string, newTask task.Task)
AddBefore inserts a task before the task with specified id.
func (*TaskQueueSlice) AddFirst ¶ added in v1.10.0
func (q *TaskQueueSlice) AddFirst(tasks ...task.Task)
AddFirst adds new head element.
func (*TaskQueueSlice) AddLast ¶ added in v1.10.0
func (q *TaskQueueSlice) AddLast(tasks ...task.Task)
AddLast adds new tail element.
func (*TaskQueueSlice) CancelTaskDelay ¶ added in v1.10.0
func (q *TaskQueueSlice) CancelTaskDelay()
CancelTaskDelay breaks wait loop. Useful to break the possible long sleep delay.
func (*TaskQueueSlice) Filter ¶ added in v1.10.0
func (q *TaskQueueSlice) Filter(filterFn func(task.Task) bool)
Filter runs filterFn on every task and removes each task for which it returns false.
func (*TaskQueueSlice) Get ¶ added in v1.10.0
func (q *TaskQueueSlice) Get(id string) task.Task
Get returns a task by id.
func (*TaskQueueSlice) GetFirst ¶ added in v1.10.0
func (q *TaskQueueSlice) GetFirst() task.Task
GetFirst returns a head element.
func (*TaskQueueSlice) GetLast ¶ added in v1.10.0
func (q *TaskQueueSlice) GetLast() task.Task
GetLast returns a tail element.
func (*TaskQueueSlice) GetStatus ¶ added in v1.10.0
func (q *TaskQueueSlice) GetStatus() string
func (*TaskQueueSlice) IsEmpty ¶ added in v1.10.0
func (q *TaskQueueSlice) IsEmpty() bool
func (*TaskQueueSlice) Iterate ¶ added in v1.10.0
func (q *TaskQueueSlice) Iterate(doFn func(task.Task))
Iterate runs doFn for every task.
func (*TaskQueueSlice) Length ¶ added in v1.10.0
func (q *TaskQueueSlice) Length() int
func (*TaskQueueSlice) MeasureActionTime ¶ added in v1.10.0
func (q *TaskQueueSlice) MeasureActionTime(action string) func()
MeasureActionTime is a helper to measure execution time of queue's actions
func (*TaskQueueSlice) Remove ¶ added in v1.10.0
func (q *TaskQueueSlice) Remove(id string) task.Task
Remove finds element by id and deletes it.
func (*TaskQueueSlice) RemoveFirst ¶ added in v1.10.0
func (q *TaskQueueSlice) RemoveFirst() task.Task
RemoveFirst deletes a head element, so head is moved.
func (*TaskQueueSlice) RemoveLast ¶ added in v1.10.0
func (q *TaskQueueSlice) RemoveLast() task.Task
RemoveLast deletes a tail element, so tail is moved.
func (*TaskQueueSlice) SetDebug ¶ added in v1.10.0
func (q *TaskQueueSlice) SetDebug(debug bool)
func (*TaskQueueSlice) SetStatus ¶ added in v1.10.0
func (q *TaskQueueSlice) SetStatus(status string)
func (*TaskQueueSlice) Start ¶ added in v1.10.0
func (q *TaskQueueSlice) Start(ctx context.Context)
func (*TaskQueueSlice) Stop ¶ added in v1.10.0
func (q *TaskQueueSlice) Stop()
func (*TaskQueueSlice) String ¶ added in v1.10.0
func (q *TaskQueueSlice) String() string
String dumps the tasks in the queue into a single line.
func (*TaskQueueSlice) WithCompactableTypes ¶ added in v1.10.0
func (q *TaskQueueSlice) WithCompactableTypes(taskTypes []task.TaskType) *TaskQueueSlice
func (*TaskQueueSlice) WithCompactionCallback ¶ added in v1.10.0
func (q *TaskQueueSlice) WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) *TaskQueueSlice
func (*TaskQueueSlice) WithContext ¶ added in v1.10.0
func (q *TaskQueueSlice) WithContext(ctx context.Context)
func (*TaskQueueSlice) WithHandler ¶ added in v1.10.0
func (q *TaskQueueSlice) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueueSlice
func (*TaskQueueSlice) WithLogger ¶ added in v1.10.0
func (q *TaskQueueSlice) WithLogger(logger *log.Logger)
func (*TaskQueueSlice) WithMetricStorage ¶ added in v1.10.0
func (q *TaskQueueSlice) WithMetricStorage(mstor metric.Storage) *TaskQueueSlice
func (*TaskQueueSlice) WithName ¶ added in v1.10.0
func (q *TaskQueueSlice) WithName(name string) *TaskQueueSlice
type TaskResult ¶
type TaskResult struct {
	Status TaskStatus

	DelayBeforeNextTask time.Duration

	AfterHandle func()
	// contains filtered or unexported fields
}
func (*TaskResult) AddAfterTasks ¶ added in v1.10.1
func (res *TaskResult) AddAfterTasks(t ...task.Task)
func (*TaskResult) AddHeadTasks ¶ added in v1.10.1
func (res *TaskResult) AddHeadTasks(t ...task.Task)
func (*TaskResult) AddTailTasks ¶ added in v1.10.1
func (res *TaskResult) AddTailTasks(t ...task.Task)
func (*TaskResult) GetAfterTasks ¶ added in v1.10.1
func (res *TaskResult) GetAfterTasks() []task.Task
func (*TaskResult) GetHeadTasks ¶ added in v1.10.1
func (res *TaskResult) GetHeadTasks() []task.Task
func (*TaskResult) GetTailTasks ¶ added in v1.10.1
func (res *TaskResult) GetTailTasks() []task.Task
type TaskStatus ¶ added in v1.0.11
type TaskStatus string
const (
	Success TaskStatus = "Success"
	Fail    TaskStatus = "Fail"
	Repeat  TaskStatus = "Repeat"
	Keep    TaskStatus = "Keep"
)