queue

package
v1.11.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2020 License: MIT Imports: 13 Imported by: 17

Documentation ¶

Index ¶

Constants ¶

This section is empty.

Variables ¶

This section is empty.

Functions ¶

func IsErrInvalidConfiguration ¶

func IsErrInvalidConfiguration(err error) bool

IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration

func RegisteredTypesAsString ¶

func RegisteredTypesAsString() []string

RegisteredTypesAsString provides the list of registered types of queues as strings

Types ¶

type ChannelQueue ¶

type ChannelQueue struct {
	// contains filtered or unexported fields
}

ChannelQueue implements the Queue interface

func (*ChannelQueue) Name ¶

func (c *ChannelQueue) Name() string

Name returns the name of this queue

func (*ChannelQueue) Push ¶

func (c *ChannelQueue) Push(data Data) error

Push will push data into the queue

func (*ChannelQueue) Run ¶

func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()))

Run starts to run the queue

type ChannelQueueConfiguration ¶

type ChannelQueueConfiguration struct {
	QueueLength  int
	BatchLength  int
	Workers      int
	MaxWorkers   int
	BlockTimeout time.Duration
	BoostTimeout time.Duration
	BoostWorkers int
	Name         string
}

ChannelQueueConfiguration is the configuration for a ChannelQueue

type Data ¶

type Data interface{}

Data defines a type of queuable data

type DummyQueue ¶

type DummyQueue struct {
}

DummyQueue represents an empty queue

func (*DummyQueue) Push ¶

func (b *DummyQueue) Push(Data) error

Push pushes data to the queue

func (*DummyQueue) Run ¶

func (b *DummyQueue) Run(_, _ func(context.Context, func()))

Run starts to run the queue

type ErrInvalidConfiguration ¶

type ErrInvalidConfiguration struct {
	// contains filtered or unexported fields
}

ErrInvalidConfiguration is the error used when there is an invalid configuration for a queue

func (ErrInvalidConfiguration) Error ¶

func (err ErrInvalidConfiguration) Error() string

type HandlerFunc ¶

type HandlerFunc func(...Data)

HandlerFunc is a function that takes a variable amount of data and processes it

type LevelQueue ¶

type LevelQueue struct {
	// contains filtered or unexported fields
}

LevelQueue implements a disk library queue

func (*LevelQueue) Name ¶

func (l *LevelQueue) Name() string

Name returns the name of this queue

func (*LevelQueue) Push ¶

func (l *LevelQueue) Push(data Data) error

Push will push the data to the queue

func (*LevelQueue) Run ¶

func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func()))

Run starts to run the queue

func (*LevelQueue) Shutdown ¶

func (l *LevelQueue) Shutdown()

Shutdown this queue and stop processing

func (*LevelQueue) Terminate ¶

func (l *LevelQueue) Terminate()

Terminate this queue and close the queue

type LevelQueueConfiguration ¶

type LevelQueueConfiguration struct {
	DataDir      string
	QueueLength  int
	BatchLength  int
	Workers      int
	MaxWorkers   int
	BlockTimeout time.Duration
	BoostTimeout time.Duration
	BoostWorkers int
	Name         string
}

LevelQueueConfiguration is the configuration for a LevelQueue

type ManagedPool ¶

type ManagedPool interface {
	AddWorkers(number int, timeout time.Duration) context.CancelFunc
	NumberOfWorkers() int
	MaxNumberOfWorkers() int
	SetMaxNumberOfWorkers(int)
	BoostTimeout() time.Duration
	BlockTimeout() time.Duration
	BoostWorkers() int
	SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
}

ManagedPool is a simple interface to get certain details from a worker pool

type ManagedQueue ¶

type ManagedQueue struct {
	QID           int64
	Queue         Queue
	Type          Type
	Name          string
	Configuration interface{}
	ExemplarType  string
	Pool          ManagedPool

	PoolWorkers map[int64]*PoolWorkers
	// contains filtered or unexported fields
}

ManagedQueue represents a working queue tracked by the Manager.

func (*ManagedQueue) AddWorkers ¶

func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc

AddWorkers adds workers to the queue if it has registered an add worker function

func (*ManagedQueue) BlockTimeout ¶

func (q *ManagedQueue) BlockTimeout() time.Duration

BlockTimeout returns the timeout until the next boost

func (*ManagedQueue) BoostTimeout ¶

func (q *ManagedQueue) BoostTimeout() time.Duration

BoostTimeout returns the timeout of the next boost

func (*ManagedQueue) BoostWorkers ¶

func (q *ManagedQueue) BoostWorkers() int

BoostWorkers returns the number of workers for a boost

func (*ManagedQueue) CancelWorkers ¶

func (q *ManagedQueue) CancelWorkers(pid int64)

CancelWorkers cancels pooled workers with pid

func (*ManagedQueue) MaxNumberOfWorkers ¶

func (q *ManagedQueue) MaxNumberOfWorkers() int

MaxNumberOfWorkers returns the maximum number of workers for the pool

func (*ManagedQueue) NumberOfWorkers ¶

func (q *ManagedQueue) NumberOfWorkers() int

NumberOfWorkers returns the number of workers in the queue

func (*ManagedQueue) RegisterWorkers ¶

func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64

RegisterWorkers registers workers to this queue

func (*ManagedQueue) RemoveWorkers ¶

func (q *ManagedQueue) RemoveWorkers(pid int64)

RemoveWorkers deletes pooled workers with pid

func (*ManagedQueue) SetSettings ¶

func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)

SetSettings sets the settable boost values

func (*ManagedQueue) Workers ¶

func (q *ManagedQueue) Workers() []*PoolWorkers

Workers returns the poolworkers

type ManagedQueueList ¶

type ManagedQueueList []*ManagedQueue

ManagedQueueList implements the sort.Interface

func (ManagedQueueList) Len ¶

func (l ManagedQueueList) Len() int

func (ManagedQueueList) Less ¶

func (l ManagedQueueList) Less(i, j int) bool

func (ManagedQueueList) Swap ¶

func (l ManagedQueueList) Swap(i, j int)

type Manager ¶

type Manager struct {
	Queues map[int64]*ManagedQueue
	// contains filtered or unexported fields
}

Manager is a queue manager

func GetManager ¶

func GetManager() *Manager

GetManager returns a Manager and initializes one as singleton if there's none yet

func (*Manager) Add ¶

func (m *Manager) Add(queue Queue,
	t Type,
	configuration,
	exemplar interface{},
	pool ManagedPool) int64

Add adds a queue to this manager

func (*Manager) GetManagedQueue ¶

func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue

GetManagedQueue by qid

func (*Manager) ManagedQueues ¶

func (m *Manager) ManagedQueues() []*ManagedQueue

ManagedQueues returns the managed queues

func (*Manager) Remove ¶

func (m *Manager) Remove(qid int64)

Remove a queue from the Manager

type Named ¶

type Named interface {
	Name() string
}

Named represents a queue with a name

type NewQueueFunc ¶

type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error)

NewQueueFunc is a function that creates a queue

type PersistableChannelQueue ¶

type PersistableChannelQueue struct {
	*ChannelQueue
	// contains filtered or unexported fields
}

PersistableChannelQueue wraps a channel queue and level queue together

func (*PersistableChannelQueue) Name ¶

func (p *PersistableChannelQueue) Name() string

Name returns the name of this queue

func (*PersistableChannelQueue) Push ¶

func (p *PersistableChannelQueue) Push(data Data) error

Push will push the data to the queue

func (*PersistableChannelQueue) Run ¶

func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()))

Run starts to run the queue

func (*PersistableChannelQueue) Shutdown ¶

func (p *PersistableChannelQueue) Shutdown()

Shutdown processing this queue

func (*PersistableChannelQueue) Terminate ¶

func (p *PersistableChannelQueue) Terminate()

Terminate this queue and close the queue

type PersistableChannelQueueConfiguration ¶

type PersistableChannelQueueConfiguration struct {
	Name         string
	DataDir      string
	BatchLength  int
	QueueLength  int
	Timeout      time.Duration
	MaxAttempts  int
	Workers      int
	MaxWorkers   int
	BlockTimeout time.Duration
	BoostTimeout time.Duration
	BoostWorkers int
}

PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue

type PoolWorkers ¶

type PoolWorkers struct {
	PID        int64
	Workers    int
	Start      time.Time
	Timeout    time.Time
	HasTimeout bool
	Cancel     context.CancelFunc
}

PoolWorkers represents a group of workers registered with a ManagedQueue.

type PoolWorkersList ¶

type PoolWorkersList []*PoolWorkers

PoolWorkersList implements the sort.Interface

func (PoolWorkersList) Len ¶

func (l PoolWorkersList) Len() int

func (PoolWorkersList) Less ¶

func (l PoolWorkersList) Less(i, j int) bool

func (PoolWorkersList) Swap ¶

func (l PoolWorkersList) Swap(i, j int)

type Queue ¶

type Queue interface {
	Run(atShutdown, atTerminate func(context.Context, func()))
	Push(Data) error
}

Queue defines the interface of a queue: it can be run and can have data pushed to it

func CreateQueue ¶

func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue

CreateQueue for name with provided handler and exemplar

func NewChannelQueue ¶

func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewChannelQueue create a memory channel queue

func NewDummyQueue ¶

func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error)

NewDummyQueue creates a new DummyQueue

func NewLevelQueue ¶

func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewLevelQueue creates a ledis local queue

func NewPersistableChannelQueue ¶

func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewPersistableChannelQueue creates a wrapped batched channel queue with a persistable level queue backend used when shutting down. This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate.

func NewQueue ¶

func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error)

NewQueue takes a queue Type, a HandlerFunc, some options and possibly an exemplar, and returns a Queue or an error

func NewRedisQueue ¶

func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewRedisQueue creates a single redis or cluster redis queue

func NewWrappedQueue ¶

func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewWrappedQueue will attempt to create a queue of the provided type, but if there is a problem creating this queue it will instead create a WrappedQueue with delayed startup of the queue and a channel which will be redirected to the queue

type RedisQueue ¶

type RedisQueue struct {
	// contains filtered or unexported fields
}

RedisQueue is a queue backed by redis

func (*RedisQueue) Name ¶

func (r *RedisQueue) Name() string

Name returns the name of this queue

func (*RedisQueue) Push ¶

func (r *RedisQueue) Push(data Data) error

Push implements Queue

func (*RedisQueue) Run ¶

func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func()))

Run runs the redis queue

func (*RedisQueue) Shutdown ¶

func (r *RedisQueue) Shutdown()

Shutdown processing from this queue

func (*RedisQueue) Terminate ¶

func (r *RedisQueue) Terminate()

Terminate this queue and close the queue

type RedisQueueConfiguration ¶

type RedisQueueConfiguration struct {
	Network      string
	Addresses    string
	Password     string
	DBIndex      int
	BatchLength  int
	QueueLength  int
	QueueName    string
	Workers      int
	MaxWorkers   int
	BlockTimeout time.Duration
	BoostTimeout time.Duration
	BoostWorkers int
	Name         string
}

RedisQueueConfiguration is the configuration for the redis queue

type Shutdownable ¶

type Shutdownable interface {
	Shutdown()
	Terminate()
}

Shutdownable represents a queue that can be shutdown

type Type ¶

type Type string

Type is a type of Queue

const ChannelQueueType Type = "channel"

ChannelQueueType is the type for channel queue

const DummyQueueType Type = "dummy"

DummyQueueType is the type for the dummy queue

const LevelQueueType Type = "level"

LevelQueueType is the type for level queue

const PersistableChannelQueueType Type = "persistable-channel"

PersistableChannelQueueType is the type for persistable queue

const RedisQueueType Type = "redis"

RedisQueueType is the type for redis queue

const WrappedQueueType Type = "wrapped"

WrappedQueueType is the type for a wrapped delayed starting queue

func RegisteredTypes ¶

func RegisteredTypes() []Type

RegisteredTypes provides the list of registered types of queues

type WorkerPool ¶

type WorkerPool struct {
	// contains filtered or unexported fields
}

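WorkerPool is a pool of workers fed by an internal channel; the number of workers can be changed while it is running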

func (*WorkerPool) AddWorkers ¶

func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc

AddWorkers adds workers to the pool - this allows the number of workers to go above the limit

func (*WorkerPool) BlockTimeout ¶

func (p *WorkerPool) BlockTimeout() time.Duration

BlockTimeout returns the timeout until the next boost

func (*WorkerPool) BoostTimeout ¶

func (p *WorkerPool) BoostTimeout() time.Duration

BoostTimeout returns the timeout of the next boost

func (*WorkerPool) BoostWorkers ¶

func (p *WorkerPool) BoostWorkers() int

BoostWorkers returns the number of workers for a boost

func (*WorkerPool) CleanUp ¶

func (p *WorkerPool) CleanUp(ctx context.Context)

CleanUp will drain the remaining contents of the channel. This should be called after the AddWorkers context is closed.

func (*WorkerPool) MaxNumberOfWorkers ¶

func (p *WorkerPool) MaxNumberOfWorkers() int

MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool

func (*WorkerPool) NumberOfWorkers ¶

func (p *WorkerPool) NumberOfWorkers() int

NumberOfWorkers returns the number of current workers in the pool

func (*WorkerPool) Push ¶

func (p *WorkerPool) Push(data Data)

Push pushes the data to the internal channel

func (*WorkerPool) SetMaxNumberOfWorkers ¶

func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int)

SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool. Changing this number will not change the number of current workers but will change the limit for future additions.

func (*WorkerPool) SetSettings ¶

func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)

SetSettings sets the settable boost values

func (*WorkerPool) Wait ¶

func (p *WorkerPool) Wait()

Wait for WorkerPool to finish

type WrappedQueue ¶

type WrappedQueue struct {
	// contains filtered or unexported fields
}

WrappedQueue wraps a delayed starting queue

func (*WrappedQueue) Name ¶

func (q *WrappedQueue) Name() string

Name returns the name of the queue

func (*WrappedQueue) Push ¶

func (q *WrappedQueue) Push(data Data) error

Push will push the data to the internal channel checking it against the exemplar

func (*WrappedQueue) Run ¶

func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func()))

Run starts to run the queue and attempts to create the internal queue

func (*WrappedQueue) Shutdown ¶

func (q *WrappedQueue) Shutdown()

Shutdown this queue and stop processing

func (*WrappedQueue) Terminate ¶

func (q *WrappedQueue) Terminate()

Terminate this queue and close the queue

type WrappedQueueConfiguration ¶

type WrappedQueueConfiguration struct {
	Underlying  Type
	Timeout     time.Duration
	MaxAttempts int
	Config      interface{}
	QueueLength int
	Name        string
}

WrappedQueueConfiguration is the configuration for a WrappedQueue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL