Documentation
Types ¶
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
func (*Hub) Unsubscribe ¶
func (hub *Hub) Unsubscribe(sub *subscription)
type Operation ¶
type Operation interface {
	IncrementTry()

	// Blocks the flow of execution till the operation is done.
	// Must be able to call Wait() from various points in code.
	Wait()

	// Done() signals that the Operation is done. Anyone waiting
	// with Wait() will resume.
	//
	// Unfortunate for you, this has to be idempotent.
	// Minimally, it has to not crash when called consecutively
	Done()
}
Operation - an interface representing a unit of work to be done by a worker. For example, if you want to define a pool of HTTP requests or database readers, then your operation can be a request object or an object that represents a database read. To capture the output, store it on the operation itself, in either an exported or an unexported field.
The interface assumes that Wait() blocks until Done() has been called on the operation. If an implementation does not honour this, the operation may behave unpredictably.
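As a minimal sketch of an implementation that satisfies these expectations, the following redeclares the Operation interface locally so that it compiles on its own. The fetchOp type, its URL and Body fields, and newFetchOp are hypothetical names used only for illustration. It makes Done() idempotent with sync.Once and lets Wait() block on a channel that Done() closes.

```go
package main

import (
	"fmt"
	"sync"
)

// Operation is redeclared here so the sketch is self-contained.
type Operation interface {
	IncrementTry()
	Wait()
	Done()
}

// fetchOp is a hypothetical operation; URL and Body are illustrative fields.
type fetchOp struct {
	URL   string
	Body  string // output captured on the operation itself
	tries int
	once  sync.Once
	done  chan struct{}
}

// Compile-time check that *fetchOp satisfies Operation.
var _ Operation = (*fetchOp)(nil)

func newFetchOp(url string) *fetchOp {
	return &fetchOp{URL: url, done: make(chan struct{})}
}

// IncrementTry records one more attempt. It is not goroutine-safe here;
// this sketch assumes a single goroutine processes an operation at a time.
func (op *fetchOp) IncrementTry() { op.tries++ }

// Wait blocks until Done has been called. Any number of goroutines may wait.
func (op *fetchOp) Wait() { <-op.done }

// Done closes the channel exactly once, so consecutive calls do not panic.
func (op *fetchOp) Done() { op.once.Do(func() { close(op.done) }) }

func main() {
	op := newFetchOp("https://example.com")
	go func() {
		op.IncrementTry()
		op.Body = "hello"
		op.Done()
		op.Done() // second call is a harmless no-op
	}()
	op.Wait()
	fmt.Println(op.tries, op.Body) // prints "1 hello"
}
```

Closing a channel rather than sending on it means every waiter is released, which is what allows Wait() to be called from several places in the code.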
type RingBuffer ¶
type RingBuffer struct {
// contains filtered or unexported fields
}
func NewRingBuffer ¶
func NewRingBuffer(capacity uint) *RingBuffer
func (*RingBuffer) Close ¶
func (buffer *RingBuffer) Close() error
type SubscriptionRing ¶
type SubscriptionRing struct {
// contains filtered or unexported fields
}
func NewSubscriptionRing ¶
func NewSubscriptionRing() *SubscriptionRing
func (*SubscriptionRing) Add ¶
func (rng *SubscriptionRing) Add(sub *subscription)
Add - adds a subscription as a ring item at the end
func (*SubscriptionRing) Current ¶
func (rng *SubscriptionRing) Current() *subscription
func (*SubscriptionRing) Do ¶
func (rng *SubscriptionRing) Do(f func(*subscription))
func (*SubscriptionRing) Len ¶
func (rng *SubscriptionRing) Len() int
Len - returns the number of nodes in the ring (see container/ring.Ring.Len)
func (*SubscriptionRing) Next ¶
func (rng *SubscriptionRing) Next()
Next - advances the underlying ring node forward by one
func (*SubscriptionRing) Remove ¶
func (rng *SubscriptionRing) Remove(sub *subscription) bool
Remove - removes the first occurrence, if any, of the given subscription and returns true. If no node is removed, the function returns false.
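The subscription type is unexported, so SubscriptionRing cannot be exercised directly from outside the package. As a rough illustration of the semantics described above, the sketch below builds a similar ring of strings on top of container/ring. The nameRing type is hypothetical and is not the package's implementation; it only assumes, based on the reference to container/ring.Ring.Len, that the real type wraps a container/ring.

```go
package main

import (
	"container/ring"
	"fmt"
)

// nameRing is a hypothetical stand-in that stores strings instead of
// subscriptions. cur points at the current node, or is nil when empty.
type nameRing struct{ cur *ring.Ring }

// Add appends a value at the end of the ring, i.e. just before the current node.
func (n *nameRing) Add(name string) {
	node := ring.New(1)
	node.Value = name
	if n.cur == nil {
		n.cur = node
		return
	}
	n.cur.Prev().Link(node)
}

// Current returns the value at the current node, or "" for an empty ring.
func (n *nameRing) Current() string {
	if n.cur == nil {
		return ""
	}
	return n.cur.Value.(string)
}

// Next advances the current node forward by one.
func (n *nameRing) Next() {
	if n.cur != nil {
		n.cur = n.cur.Next()
	}
}

// Len returns the number of nodes; Ring.Len reports 0 for a nil ring.
func (n *nameRing) Len() int { return n.cur.Len() }

// Remove deletes the first occurrence of name, reporting whether one was found.
func (n *nameRing) Remove(name string) bool {
	node := n.cur
	for i, l := 0, n.Len(); i < l; i++ {
		if node.Value == name {
			if l == 1 {
				n.cur = nil
				return true
			}
			if node == n.cur {
				n.cur = node.Next()
			}
			node.Prev().Unlink(1) // unlinks exactly the matching node
			return true
		}
		node = node.Next()
	}
	return false
}

func main() {
	r := &nameRing{}
	r.Add("a")
	r.Add("b")
	r.Add("c")
	r.Next()                        // current is now "b"
	fmt.Println(r.Remove("b"))      // prints "true"
	fmt.Println(r.Current(), r.Len()) // prints "c 2"
	fmt.Println(r.Remove("missing")) // prints "false"
}
```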
type Worker ¶
type Worker interface {
	// Init -
	Init() error
	Process(op Operation) (bool, error)
	HandleError(error, Operation)
	Equal(Worker) bool
	Cleanup()
}
Worker - an interface representing a client or processor that handles requests (operations).
For example, if you want to define a pool of map/reduce processes, then a worker may be a logical unit (possibly made up of several smaller units) that processes a request. If you want to define a pool of database clients, then a worker may be some struct that accepts a database query as an op and returns the seeker for the row.
Such a client/processor should know how to handle errors and clean up. Its main method is Process, which processes a given operation and reports whether it failed and whether the operation should be retried.
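A minimal sketch of a Worker follows, with both interfaces redeclared locally so that it compiles on its own. The upperOp and upperWorker types are hypothetical. The sketch assumes that the bool returned by Process means "retry this operation" and that a worker may call Done() itself once it has finished; neither assumption is confirmed by the documentation above.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// Operation and Worker are redeclared so the sketch is self-contained.
type Operation interface {
	IncrementTry()
	Wait()
	Done()
}

type Worker interface {
	Init() error
	Process(op Operation) (bool, error)
	HandleError(error, Operation)
	Equal(Worker) bool
	Cleanup()
}

// upperOp is a hypothetical operation: upper-case In and store it in Out.
type upperOp struct {
	In, Out string
	once    sync.Once
	done    chan struct{}
}

func newUpperOp(in string) *upperOp { return &upperOp{In: in, done: make(chan struct{})} }
func (o *upperOp) IncrementTry()    {}
func (o *upperOp) Wait()            { <-o.done }
func (o *upperOp) Done()            { o.once.Do(func() { close(o.done) }) }

var errUnsupported = errors.New("unsupported operation type")

// upperWorker is a hypothetical worker that only understands *upperOp.
type upperWorker struct {
	id      int
	ready   bool
	lastErr error
}

func (w *upperWorker) Init() error { w.ready = true; return nil }

// Process returns (retry, err). Assumption: true asks for another pass,
// for example so that a different worker can pick the operation up.
func (w *upperWorker) Process(op Operation) (bool, error) {
	u, ok := op.(*upperOp)
	if !ok {
		return true, errUnsupported
	}
	u.Out = strings.ToUpper(u.In)
	u.Done() // safe even if called again elsewhere, since Done is idempotent
	return false, nil
}

func (w *upperWorker) HandleError(err error, _ Operation) { w.lastErr = err }

func (w *upperWorker) Equal(other Worker) bool {
	o, ok := other.(*upperWorker)
	return ok && o.id == w.id
}

func (w *upperWorker) Cleanup() { w.ready = false }

func main() {
	var w Worker = &upperWorker{id: 1}
	if err := w.Init(); err != nil {
		panic(err)
	}
	defer w.Cleanup()

	op := newUpperOp("hello")
	retry, err := w.Process(op)
	op.Wait()
	fmt.Println(op.Out, retry, err) // prints "HELLO false <nil>"
}
```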
type WorkerPool ¶
type WorkerPool struct {
	Workers []Worker
	// contains filtered or unexported fields
}
WorkerPool - represents a pool of (possibly heterogeneous) Workers who will read messages off of a queue and process them. Because messages are not curated or dispatched to a specific worker, a message may go through several passes before being handled by the correct worker.
What the worker pool does, then, is to provide basic controls around spinning up the workers, assigning tasks to them, keeping them up, and shutting down the pool when everything is done.
It is built around three methods: Start, Execute, and Shutdown. With these we should be able to initialise the workers, push tasks to them, and reclaim resources when done. Do and Wrap, documented below, cover single operations and streaming input.
func NewPool ¶
func NewPool(workers []Worker) *WorkerPool
NewPool - creates a new pool of workers. Each worker in the given list will be initialised, registered to receive messages on a queue, and restarted when an error occurs. Each will be shut down appropriately when the shutdown sequence is called.
func (*WorkerPool) Do ¶
func (pool *WorkerPool) Do(op Operation) error
Do - executes a single operation asynchronously. It returns an error if the worker pool is not running (e.g. it has not been started or has been shut down), because the channels are then all closed.
func (*WorkerPool) Execute ¶
func (pool *WorkerPool) Execute(ops []Operation) (<-chan Operation, error)
Execute - executes a collection of operations (requests). It returns a channel that is closed once all the operations have either been processed successfully or failed.
If an error is returned, the channel is closed.
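To illustrate the "channel closed when everything is done" contract, here is a sketch that reproduces the described behaviour with plain goroutines and a sync.WaitGroup. The executeAll function, the job type, and the square helper are hypothetical stand-ins and are not the pool's implementation. The sketch also assumes that finished operations are delivered on the returned channel, which the `<-chan Operation` return type suggests but the text does not state.

```go
package main

import (
	"fmt"
	"sync"
)

// Operation is redeclared so the sketch is self-contained.
type Operation interface {
	IncrementTry()
	Wait()
	Done()
}

// job is a hypothetical operation that squares n into result.
type job struct {
	n, result int
	once      sync.Once
	done      chan struct{}
}

func newJob(n int) *job       { return &job{n: n, done: make(chan struct{})} }
func (j *job) IncrementTry() {}
func (j *job) Wait()         { <-j.done }
func (j *job) Done()         { j.once.Do(func() { close(j.done) }) }

// square is the hypothetical processing step applied to each operation.
func square(op Operation) {
	j := op.(*job)
	j.result = j.n * j.n
}

// executeAll is a stand-in for the behaviour described for Execute:
// every operation is processed concurrently, and the returned channel
// is closed only after all of them have been marked as done.
func executeAll(ops []Operation, process func(Operation)) <-chan Operation {
	out := make(chan Operation, len(ops)) // buffered so senders never block
	var wg sync.WaitGroup
	for _, op := range ops {
		wg.Add(1)
		go func(op Operation) {
			defer wg.Done()
			process(op)
			op.Done()
			out <- op
		}(op)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	ops := []Operation{newJob(2), newJob(3), newJob(4)}
	sum := 0
	for op := range executeAll(ops, square) { // loop ends when the channel closes
		sum += op.(*job).result
	}
	fmt.Println(sum) // prints "29"
}
```

A caller that only needs to know when the batch has finished can range over the channel and discard the values.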
func (*WorkerPool) Shutdown ¶
func (pool *WorkerPool) Shutdown()
Shutdown - shuts down the worker pool so that (some of) its resources can be reused, and notifies the workers to shut down.
This prevents any further requests from being executed. It does not affect requests that have already begun processing.
func (*WorkerPool) Start ¶
func (pool *WorkerPool) Start() error
Start - start the pool by setting up the workers to listen to the requests. A pool that isn't started cannot process any requests.
func (*WorkerPool) Wrap ¶
func (pool *WorkerPool) Wrap(inStream <-chan Operation) (<-chan Operation, error)
Wrap - wraps an input stream into an output stream. If the client has a streaming input, then rather than forcing it to batch operations and run them with Execute, it can send operations directly into an input channel. The pool processes them and, as each operation is done, puts it onto the output channel.
Closing the input stream does not by itself close the output stream. The output stream is closed only when all inbound requests have been handled, either successfully or unsuccessfully (but marked as done).
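The relationship between the two streams can be sketched with plain channels. The wrapStream function and the job type below are hypothetical and only mimic the described behaviour; they are not the pool's implementation. The point illustrated is that the output channel stays open after the input is closed, until every in-flight operation has been marked as done.

```go
package main

import (
	"fmt"
	"sync"
)

// Operation is redeclared so the sketch is self-contained.
type Operation interface {
	IncrementTry()
	Wait()
	Done()
}

// job is a hypothetical operation that squares n into result.
type job struct {
	n, result int
	once      sync.Once
	done      chan struct{}
}

func newJob(n int) *job       { return &job{n: n, done: make(chan struct{})} }
func (j *job) IncrementTry() {}
func (j *job) Wait()         { <-j.done }
func (j *job) Done()         { j.once.Do(func() { close(j.done) }) }

// wrapStream is a stand-in for the behaviour described for Wrap.
func wrapStream(in <-chan Operation, process func(Operation)) <-chan Operation {
	out := make(chan Operation)
	go func() {
		var wg sync.WaitGroup
		for op := range in {
			wg.Add(1)
			go func(op Operation) {
				defer wg.Done()
				process(op)
				op.Done()
				out <- op
			}(op)
		}
		// The input is closed here, but operations may still be in flight.
		// Only once all of them have finished is the output closed.
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan Operation)
	go func() {
		defer close(in)
		for i := 1; i <= 3; i++ {
			in <- newJob(i)
		}
	}()

	process := func(op Operation) {
		j := op.(*job)
		j.result = j.n * j.n
	}

	sum := 0
	for op := range wrapStream(in, process) {
		sum += op.(*job).result
	}
	fmt.Println(sum) // prints "14"
}
```

The consumer must keep draining the output channel in this sketch, because the channel is unbuffered and each finished operation blocks until it is received.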