Documentation
¶
Overview ¶
Package queue provides several implementations of the amboy.Queue interface capable of processing amboy.Job implementations.
Local Shuffled Queue ¶
The shuffled queue is functionally similar to the LocalUnordered Queue (which is, in fact, a FIFO queue as a result of its implementation); however, the shuffled queue dispatches tasks randomized, using the properties of Go's map type, which is not dependent on insertion order.
Additionally this implementation does not using locking, which may improve performance for some workloads. Intentionally, the implementation retains pointers to all completed tasks, and does not cap the number of pending tasks.
Local Unordered Queue ¶
The unordered queue provides a basic, single-instance, amboy.Queue that runs jobs locally in the context of the application with no persistence layer. The unordered queue does not guarantee any particular execution order, nor does it compute dependences between jobs, but, as an implementation detail, dispatches jobs to workers in a first-in-first-out (e.g. FIFO) model.
By default, LocalUnordered uses the amboy/pool.Workers implementation of amboy.Runner interface.
Index ¶
- Constants
- func NewAdaptiveOrderedLocalQueue(workers, capacity int) amboy.Queue
- func NewLocalLimitedSize(workers, capacity int) amboy.Queue
- func NewLocalOrdered(workers int) amboy.Queue
- func NewLocalPriorityQueue(workers, capacity int) amboy.Queue
- func NewLocalQueueGroup(ctx context.Context, opts LocalQueueGroupOptions) (amboy.QueueGroup, error)
- func NewLocalUnordered(workers int) amboy.Queue
- func NewMgoRemoteSingleQueueGroup(ctx context.Context, opts RemoteQueueGroupOptions, session *mgo.Session, ...) (amboy.QueueGroup, error)
- func NewMongoRemoteQueueGroup(ctx context.Context, opts RemoteQueueGroupOptions, client *mongo.Client, ...) (amboy.QueueGroup, error)
- func NewMongoRemoteSingleQueueGroup(ctx context.Context, opts RemoteQueueGroupOptions, client *mongo.Client, ...) (amboy.QueueGroup, error)
- func NewSQSFifoQueue(queueName string, workers int) (amboy.Queue, error)
- func NewShuffledLocal(workers, capacity int) amboy.Queue
- type Driver
- func NewInternalDriver() Driver
- func NewMgoDriver(name string, opts MongoDBOptions) Driver
- func NewMgoGroupDriver(name string, opts MongoDBOptions, group string) Driver
- func NewMongoDriver(name string, opts MongoDBOptions) Driver
- func NewMongoGroupDriver(name string, opts MongoDBOptions, group string) Driver
- func NewPriorityDriver() Driver
- func OpenNewMgoDriver(ctx context.Context, name string, opts MongoDBOptions, session *mgo.Session) (Driver, error)
- func OpenNewMgoGroupDriver(ctx context.Context, name string, opts MongoDBOptions, group string, ...) (Driver, error)
- func OpenNewMongoDriver(ctx context.Context, name string, opts MongoDBOptions, client *mongo.Client) (Driver, error)
- func OpenNewMongoGroupDriver(ctx context.Context, name string, opts MongoDBOptions, group string, ...) (Driver, error)
- type GroupCache
- type LocalQueueGroupOptions
- type LockManager
- type MongoDBOptions
- type Remote
- type RemoteQueueGroupOptions
Constants ¶
const LockTimeout = 5 * time.Minute
LockTimeout reflects the distributed lock timeout period.
Variables ¶
This section is empty.
Functions ¶
func NewAdaptiveOrderedLocalQueue ¶
NewAdaptiveOrderedLocalQueue provides a queue implementation that stores jobs in memory, and dispatches tasks based on the dependency information.
Use this implementation rather than LocalOrderedQueue when you need to add jobs *after* starting the queue, and when you want to avoid the higher potential overhead of the remote-backed queues.
func NewLocalLimitedSize ¶
NewLocalLimitedSize constructs a LocalLimitedSize queue instance with the specified number of workers and capacity.
func NewLocalOrdered ¶
NewLocalOrdered constructs an LocalOrdered object. The "workers" argument is passed to a default pool.SimplePool object.
func NewLocalPriorityQueue ¶
NewLocalPriorityQueue constructs a new priority queue instance and initializes a local worker queue with the specified number of worker processes.
func NewLocalQueueGroup ¶
func NewLocalQueueGroup(ctx context.Context, opts LocalQueueGroupOptions) (amboy.QueueGroup, error)
NewLocalQueueGroup constructs a new local queue group. If ttl is 0, the queues will not be TTLed except when the client explicitly calls Prune.
func NewLocalUnordered ¶
NewLocalUnordered is a constructor for a local queue that does not respect dependency information in dispatching queue jobs.
All jobs are stored in memory and while there is a buffer of pending work, in general the number of buffered jobs is equal to twice the size of the worker pool, up to 64 jobs.
func NewMgoRemoteSingleQueueGroup ¶
func NewMgoRemoteSingleQueueGroup(ctx context.Context, opts RemoteQueueGroupOptions, session *mgo.Session, mdbopts MongoDBOptions) (amboy.QueueGroup, error)
NewMgoRemoteSingleQueueGroup constructs a new remote queue group where all queues are stored in a single collection, using the legacy driver. If ttl is 0, the queues will not be TTLed except when the client explicitly calls Prune.
func NewMongoRemoteQueueGroup ¶
func NewMongoRemoteQueueGroup(ctx context.Context, opts RemoteQueueGroupOptions, client *mongo.Client, mdbopts MongoDBOptions) (amboy.QueueGroup, error)
NewMongoRemoteQueueGroup constructs a new remote queue group. If ttl is 0, the queues will not be TTLed except when the client explicitly calls Prune.
The MongoRemoteQueue group creats a new collection for every queue, unlike the other remote queue group implementations. This is probably most viable for lower volume workloads; however, the caching mechanism may be more responsive in some situations.
func NewMongoRemoteSingleQueueGroup ¶
func NewMongoRemoteSingleQueueGroup(ctx context.Context, opts RemoteQueueGroupOptions, client *mongo.Client, mdbopts MongoDBOptions) (amboy.QueueGroup, error)
NewMongoRemoteSingleQueueGroup constructs a new remote queue group. If ttl is 0, the queues will not be TTLed except when the client explicitly calls Prune.
func NewSQSFifoQueue ¶
NewSQSFifoQueue constructs a AWS SQS backed Queue implementation. This queue, generally is ephemeral: tasks are removed from the queue, and therefore may not handle jobs across restarts.
func NewShuffledLocal ¶
NewShuffledLocal provides a queue implementation that shuffles the order of jobs, relative the insertion order.
Types ¶
type Driver ¶
type Driver interface { ID() string Open(context.Context) error Close() Get(context.Context, string) (amboy.Job, error) Put(context.Context, amboy.Job) error Save(context.Context, amboy.Job) error SaveStatus(context.Context, amboy.Job, amboy.JobStatusInfo) error Jobs(context.Context) <-chan amboy.Job Next(context.Context) amboy.Job Stats(context.Context) amboy.QueueStats JobStats(context.Context) <-chan amboy.JobStatusInfo LockManager }
Driver describes the interface between a queue and an out of process persistence layer, like a database.
func NewInternalDriver ¶
func NewInternalDriver() Driver
NewInternalDriver creates a local persistence layer object.
func NewMgoDriver ¶
func NewMgoDriver(name string, opts MongoDBOptions) Driver
NewMgoDriver creates a driver object given a name, which serves as a prefix for collection names, and a MongoDB connection
func NewMgoGroupDriver ¶
func NewMgoGroupDriver(name string, opts MongoDBOptions, group string) Driver
NewMgoGroupDriver creates a driver object given a name, which serves as a prefix for collection names, and a MongoDB connection
func NewMongoDriver ¶
func NewMongoDriver(name string, opts MongoDBOptions) Driver
NewMongoDriver constructs a MongoDB backed queue driver implementation using the go.mongodb.org/mongo-driver as the database interface.
func NewMongoGroupDriver ¶
func NewMongoGroupDriver(name string, opts MongoDBOptions, group string) Driver
NewMongoGroupDriver is similar to the MongoDriver, except it prefixes job ids with a prefix and adds the group field to the documents in the database which makes it possible to manage distinct queues with a single MongoDB collection.
func NewPriorityDriver ¶
func NewPriorityDriver() Driver
NewPriorityDriver returns an initialized Priority Driver instances.
func OpenNewMgoDriver ¶
func OpenNewMgoDriver(ctx context.Context, name string, opts MongoDBOptions, session *mgo.Session) (Driver, error)
OpenNewMgoDriver constructs and opens a new MongoDB driver instance using the specified session. It is equivalent to calling NewMgo() and calling *MongoDB.Open().
func OpenNewMgoGroupDriver ¶
func OpenNewMgoGroupDriver(ctx context.Context, name string, opts MongoDBOptions, group string, session *mgo.Session) (Driver, error)
OpenNewMgoGroupDriver constructs and opens a new MongoDB driver instance using the specified session. It is equivalent to calling NewMgoGroup() and calling *MongoDB.Open().
func OpenNewMongoDriver ¶
func OpenNewMongoDriver(ctx context.Context, name string, opts MongoDBOptions, client *mongo.Client) (Driver, error)
OpenNewMongoDriver constructs and opens a new MongoDB driver instance using the specified session. It is equivalent to calling NewMongoDriver() and calling driver.Open().
func OpenNewMongoGroupDriver ¶
func OpenNewMongoGroupDriver(ctx context.Context, name string, opts MongoDBOptions, group string, client *mongo.Client) (Driver, error)
OpenNewMongoGroupDriver constructs and opens a new MongoDB driver instance using the specified session. It is equivalent to calling NewMongoGroupDriver() and calling driver.Open().
type GroupCache ¶
type GroupCache interface { Set(string, amboy.Queue, time.Duration) error Get(string) amboy.Queue Remove(context.Context, string) error Prune(context.Context) error Close(context.Context) error Names() []string Len() int }
GroupCache provides a common mechanism for managing collections of queues, for use in specific group cache situations
func NewCacheWithCleanupHook ¶
func NewCacheWithCleanupHook(ttl time.Duration, hook func(ctx context.Context, id string) error) GroupCache
NewCacheWithCleanupHook defines a cache but allows implementations to add additional cleanup logic to the prune and Close operations.
func NewGroupCache ¶
func NewGroupCache(ttl time.Duration) GroupCache
NewGroupCache produces a GroupCache implementation that supports a default TTL setting, and supports cloning and closing operations.
type LocalQueueGroupOptions ¶
type LocalQueueGroupOptions struct { Constructor func(ctx context.Context) (amboy.Queue, error) TTL time.Duration }
LocalQueueGroupOptions describe options passed to NewLocalQueueGroup.
type LockManager ¶
type LockManager interface { Lock(context.Context, amboy.Job) error Unlock(context.Context, amboy.Job) error }
LockManager describes the component of the Driver interface that handles job mutexing.
func NewLockManager ¶
func NewLockManager(ctx context.Context, d Driver) LockManager
NewLockManager configures a Lock manager for use in Driver implementations. This operation does *not* start the background thread. The name *must* be unique per driver/queue combination, to ensure that each driver/queue can have exclusive locks over jobs.
type MongoDBOptions ¶
type MongoDBOptions struct { URI string DB string Priority bool CheckWaitUntil bool SkipIndexBuilds bool Format amboy.Format WaitInterval time.Duration }
MongoDBOptions is a struct passed to the NewMgo constructor to communicate mgoDriver specific settings about the driver's behavior and operation.
func DefaultMongoDBOptions ¶
func DefaultMongoDBOptions() MongoDBOptions
DefaultMongoDBOptions constructs a new options object with default values: connecting to a MongoDB instance on localhost, using the "amboy" database, and *not* using priority ordering of jobs.
type Remote ¶
Remote queues extend the queue interface to allow a pluggable-storage backend, or "driver"
func NewRemoteUnordered ¶
NewRemoteUnordered returns a queue that has been initialized with a local worker pool Runner instance of the specified size.
func NewSimpleRemoteOrdered ¶
NewSimpleRemoteOrdered returns a queue with a configured local runner with the specified number of workers.
type RemoteQueueGroupOptions ¶
type RemoteQueueGroupOptions struct { // Prefix is a string prepended to the queue collections. Prefix string // Ordered controls if an order-respecting queue will be // created, while default workers sets the defualt number of // workers new queues will have if the WorkerPoolSize function // is not set. Ordered bool DefaultWorkers int // WorkerPoolSize determines how many works will be allocated // to each queue, based on the queue ID passed to it. WorkerPoolSize func(string) int // PruneFrequency is how often Prune runs by default. PruneFrequency time.Duration // BackgroundCreateFrequency is how often the background queue // creation runs, in the case that queues may be created in // the background without BackgroundCreateFrequency time.Duration // TTL is how old the oldest task in the queue must be for the collection to be pruned. TTL time.Duration }
RemoteQueueGroupOptions describe options passed to NewRemoteQueueGroup.
Source Files
¶
- adaptive_order.go
- adaptive_order_storage.go
- doc.go
- driver.go
- driver_group_mgo.go
- driver_group_mongo.go
- driver_internal.go
- driver_locker.go
- driver_mgo.go
- driver_mongo.go
- driver_priority.go
- fixed.go
- fixed_storage.go
- group_local.go
- group_remote_mgo_single.go
- group_remote_mongo.go
- group_remote_mongo_single.go
- group_util.go
- ordered.go
- priority.go
- priority_storage.go
- remote.go
- remote_base.go
- remote_ordered.go
- shuffled.go
- sqs.go
- unordered.go
- util.go