Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var (
	DefaultMaxItems = uint64(100) // maximum no of items packed inside a Batch
	DefaultMaxWait  = time.Duration(30) * time.Second //seconds
	DefaultBatchNo  = int32(1)
)
var (
DefaultWorkerPool = 10
)
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch struct {
	Item      chan interface{}
	Id        int
	Semaphore *Semaphore
	Islocked  bool
	Producer  *BatchProducer
	Consumer  *BatchConsumer
	Log       *log.Logger
}
Batch struct defines the structure payload for a Batch.
Item: the channel that carries the Resource objects sent by the client.
Id: the identifier assigned to each item that a client sends for processing.
Semaphore: handles the read/write locks that synchronize the batch processing session.
Islocked: set to true whenever a batch processing session starts, which prevents concurrent batch processing.
Producer: receives each BatchItem object for further processing.
Consumer: arranges the prepared []BatchItems for the Workerline.
Log: the logger; the batch processing library uses "github.com/sirupsen/logrus" as its logging tool.
func NewBatch ¶
func NewBatch(opts ...BatchOptions) *Batch
NewBatch creates a new Batch object with a BatchProducer and a BatchConsumer. The BatchOptions set MaxItems, the maximum number of items in a batch, and MaxWait, the maximum time to wait for a batch to complete.
func (*Batch) Close ¶
func (b *Batch) Close()
Close is the exit function to terminate the batch processing.
func (*Batch) ReadItems ¶
func (b *Batch) ReadItems()
ReadItems runs indefinitely, listening to the Resource channel. Each received object is wrapped in a BatchItem and then sent to the Producer's Watcher channel for further processing.
func (*Batch) SetDebugLogLevel ¶ added in v1.0.4
func (b *Batch) SetDebugLogLevel()
SetDebugLogLevel switches the log level from Info to Debug.
func (*Batch) StartBatchProcessing ¶
func (b *Batch) StartBatchProcessing()
StartBatchProcessing starts the batch processing library along with the Producer and Consumer listeners. The ReadItems goroutine keeps listening indefinitely for items from the source.
type BatchConsumer ¶
type BatchConsumer struct {
	ConsumerCh    chan []BatchItems
	BatchWorkerCh chan []BatchItems
	Supply        *BatchSupply
	Workerline    *sync.WaitGroup
	TerminateCh   chan os.Signal
	Quit          chan bool
	Log           *log.Logger
}
BatchConsumer struct defines the Consumer line for batch processing. Its Workerline manages concurrent scenarios in which a large set of []BatchItems needs to be sent to the client.
ConsumerCh: receives the []BatchItems from the Producer line.
BatchWorkerCh: feeds the set of workers that handle concurrent work under the Workerline [sync.WaitGroup].
Supply: the final link in the batch processing chain, which sends the []BatchItems to the client.
Workerline: the WaitGroup that synchronizes the workers sending []BatchItems to the supply chain.
TerminateCh: listens for an os.Signal and terminates processing accordingly, to support graceful shutdown.
Quit: the exit channel that tells the Consumer to end processing.
Log: the logger; the batch processing library uses "github.com/sirupsen/logrus" as its logging tool.
func NewBatchConsumer ¶
func NewBatchConsumer() *BatchConsumer
NewBatchConsumer sets up several production channels that work at different stages to release a Batch to the client. The ConsumerCh receives the Batch and sends it to the worker channel. The Workerline then arranges the workers under a WaitGroup to release the Batch to the supply channel.
The BatchSupply has a bidirectional channel that requests a Batch from the worker channel and receives the Batch on a response channel. BatchSupply also has a client channel that sends the released Batch to the client. The client needs to listen on ClientSupplyCh to receive each batch as soon as it is released.
func (*BatchConsumer) ConsumerBatch ¶
func (c *BatchConsumer) ConsumerBatch(ctx context.Context)
ConsumerBatch receives newly created []BatchItems from c.ConsumerCh and forwards them to the worker channel, which sends the batch items on to the supply line.
It also supports terminating the Consumer line, either through a graceful shutdown or through a forced exit from batch processing.
<-ctx.Done(): fires during a graceful shutdown and closes the worker channel.
<-c.Quit: exits batch processing on a forceful request from the client.
func (*BatchConsumer) ConsumerFunc ¶
func (c *BatchConsumer) ConsumerFunc(items []BatchItems)
ConsumerFunc works as a callback function through which the Producer line hands released []BatchItems to the Consumer. The batch items are then sent to the ConsumerCh channel for further processing.
func (*BatchConsumer) GetBatchSupply ¶
func (c *BatchConsumer) GetBatchSupply()
GetBatchSupply requests released []BatchItems from the worker channel. BatchSupplyCh works as a bidirectional request/response channel for the final []BatchItems, and ClientSupplyCh sends the []BatchItems to the client.
func (*BatchConsumer) Shutdown ¶
func (c *BatchConsumer) Shutdown()
func (*BatchConsumer) StartConsumer ¶
func (c *BatchConsumer) StartConsumer()
StartConsumer creates the worker pool [DefaultWorkerPool: 10] to handle the large sets of []BatchItems that are created frequently in highly concurrent scenarios. It also starts the ConsumerCh listener for incoming []BatchItems from the Producer line.
signal.Notify(c.TerminateCh, syscall.SIGINT, syscall.SIGTERM)
<-c.TerminateCh
To support graceful shutdown, the BatchConsumer listens for os.Signal values. TerminateCh works as the terminate channel when one of the signals [syscall.SIGINT, syscall.SIGTERM] is received. This lets the Workerline complete its remaining work before shutting down.
func (*BatchConsumer) WorkerFunc ¶
func (c *BatchConsumer) WorkerFunc(index int)
WorkerFunc is the final production stage for []BatchItems. Each worker sends its released []BatchItems to the supply channel.
type BatchItems ¶
type BatchItems struct {
	Id      int         `json:"id"`
	BatchNo int         `json:"batchNo"`
	Item    interface{} `json:"item"`
}
BatchItems struct defines the payload of each batch item, with an Id and the BatchNo of the batch it belongs to.
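The struct tags determine how a batch item is encoded as JSON. The sketch below copies the struct definition into a standalone program so it runs without the library. The encodeBatch helper is a hypothetical name introduced for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// BatchItems is copied from the struct definition in this documentation.
type BatchItems struct {
	Id      int         `json:"id"`
	BatchNo int         `json:"batchNo"`
	Item    interface{} `json:"item"`
}

// encodeBatch (hypothetical helper) marshals a batch to a JSON string.
func encodeBatch(items []BatchItems) (string, error) {
	b, err := json.Marshal(items)
	return string(b), err
}

func main() {
	batch := []BatchItems{
		{Id: 1, BatchNo: 1, Item: "first"},
		{Id: 2, BatchNo: 1, Item: map[string]int{"n": 2}},
	}
	out, err := encodeBatch(batch)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	// prints [{"id":1,"batchNo":1,"item":"first"},{"id":2,"batchNo":1,"item":{"n":2}}]
}
```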
type BatchOptions ¶
type BatchOptions func(b *BatchProducer)
func WithMaxItems ¶
func WithMaxItems(maxItems uint64) BatchOptions
func WithMaxWait ¶
func WithMaxWait(maxWait time.Duration) BatchOptions
type BatchProducer ¶
type BatchProducer struct {
	Watcher      chan *BatchItems
	MaxItems     uint64
	BatchNo      int32
	MaxWait      time.Duration
	ConsumerFunc ConsumerFunc
	Quit         chan bool
	Log          *log.Logger
}
BatchProducer struct defines the Producer fields that are required to create a []BatchItems object.
Watcher: the receiver channel that gets the BatchItems objects from the Batch reader.
MaxItems: the maximum number of BatchItems that can be packed into a released Batch.
BatchNo: the integer that marks every released []BatchItems.
MaxWait: the timeout that expires after an interval if a batch takes too long to fill.
ConsumerFunc: the callback function, provided by the Consumer, that is invoked with each released batch.
Quit: the exit channel that tells the Producer to end processing.
Log: the logger; the batch processing library uses "github.com/sirupsen/logrus" as its logging tool.
func NewBatchProducer ¶
func NewBatchProducer(callBackFn ConsumerFunc, opts ...BatchOptions) *BatchProducer
NewBatchProducer defines the producer line for creating a Batch. A Watcher channel receives each incoming BatchItem from the source. The ConsumerFunc works as a callback into the Consumer line to release the newly created set of BatchItems.
Each Batch is registered with a BatchNo, which is assigned when the Batch's itemCounter reaches the MaxItems value.
func (*BatchProducer) CheckRemainingItems ¶
func (p *BatchProducer) CheckRemainingItems(done chan bool)
CheckRemainingItems forces a re-check for any remaining batch items that are still available for processing.
func (*BatchProducer) WatchProducer ¶
func (p *BatchProducer) WatchProducer()
WatchProducer listens on the Watcher channel, which receives BatchItem objects from the Batch read-item channel. It marks each BatchItem with a BatchNo and appends it to the []BatchItems slice. When itemCounter reaches MaxItems [DefaultMaxItems: 100], the Batch is released to the Consumer callback function.
If batch processing stalls on the Watcher channel, the MaxWait [DefaultMaxWait: 30 sec] timer fires and checks the state so that the pending Batch can be released to the Consumer callback function.
type BatchSupply ¶
type BatchSupply struct {
	BatchSupplyCh  chan chan []BatchItems
	ClientSupplyCh chan []BatchItems
}
BatchSupply struct defines the supply line for the final delivery of []BatchItems to the client.
BatchSupplyCh: the bidirectional channel that requests []BatchItems from the Workerline and receives them in the response.
ClientSupplyCh: the delivery channel that works as the supply line; it sends the []BatchItems, and the client receives them by listening on the channel.
func NewBatchSupply ¶
func NewBatchSupply() *BatchSupply
NewBatchSupply creates the BatchSupply object, which has two supply channels. BatchSupplyCh works as a bidirectional channel: it requests []BatchItems from the Workerline and receives the batch items on the response channel. ClientSupplyCh sends the []BatchItems received from BatchSupplyCh to the client.
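A channel of channels, the type of BatchSupplyCh, is a common way to build request/response exchanges in Go: the requester sends a reply channel and then waits on it. The sketch below illustrates that idea only. The names serveBatches and requestBatch are hypothetical, []int stands in for []BatchItems, and the library's actual wiring may differ.

```go
package main

import "fmt"

// serveBatches answers each request by sending one batch from ready on the
// reply channel carried by the request. When ready is exhausted it closes
// the reply channel to signal that no more batches are available.
func serveBatches(requests <-chan chan []int, ready <-chan []int) {
	for reply := range requests {
		batch, ok := <-ready
		if !ok {
			close(reply)
			return
		}
		reply <- batch
	}
}

// requestBatch asks for the next batch and waits for the response.
// The boolean is false when the server has no more batches.
func requestBatch(requests chan<- chan []int) ([]int, bool) {
	reply := make(chan []int)
	requests <- reply
	batch, ok := <-reply
	return batch, ok
}

func main() {
	requests := make(chan chan []int)
	ready := make(chan []int, 2)
	ready <- []int{1, 2}
	ready <- []int{3}
	close(ready)

	go serveBatches(requests, ready)

	for {
		batch, ok := requestBatch(requests)
		if !ok {
			break
		}
		fmt.Println(batch)
	}
}
```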
type ConsumerFunc ¶
type ConsumerFunc func(items []BatchItems)
ConsumerFunc is the callback function provided by the Consumer; it is invoked with each released []BatchItems.
type Semaphore ¶
type Semaphore struct {
// contains filtered or unexported fields
}