Documentation
Overview
Package microbatch simplifies asynchronous microbatching.
Example (Asynchronous)

Example (Asynchronous) demonstrates how to use [Batcher.Submit] and await the results asynchronously. Note that you can shut down the batcher without waiting for the jobs to finish.
package main

import (
	"context"
	"fmt"
	"sync"

	"fillmore-labs.com/microbatch"
)

type (
	JobID int

	Job struct {
		ID JobID
	}

	JobResult struct {
		ID   JobID
		Body string
	}

	Jobs       []*Job
	JobResults []*JobResult
)

func (j *Job) JobID() JobID       { return j.ID }
func (j *JobResult) JobID() JobID { return j.ID }

// unwrap unwraps a JobResult to payload and error.
func unwrap(r *JobResult, err error) (string, error) {
	if err != nil {
		return "", err
	}
	return r.Body, nil
}

type RemoteProcessor struct{}

func (p *RemoteProcessor) ProcessJobs(jobs Jobs) (JobResults, error) {
	results := make(JobResults, 0, len(jobs))
	for _, job := range jobs {
		result := &JobResult{
			ID:   job.ID,
			Body: fmt.Sprintf("Processed job %d", job.ID),
		}
		results = append(results, result)
	}
	return results, nil
}

func main() {
	// Initialize
	processor := &RemoteProcessor{}
	batcher := microbatch.NewBatcher(
		processor.ProcessJobs,
		(*Job).JobID,
		(*JobResult).JobID,
		microbatch.WithSize(3),
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const iterations = 5
	var wg sync.WaitGroup

	for i := 1; i <= iterations; i++ {
		future := batcher.Submit(&Job{ID: JobID(i)})
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			result, err := unwrap(future.Await(ctx))
			if err == nil {
				fmt.Println(result)
			} else {
				fmt.Printf("Error executing job %d: %v\n", i, err)
			}
		}(i)
	}

	// Shut down
	batcher.Send()
	wg.Wait()
}
Output:

Processed job 1
Processed job 2
Processed job 3
Processed job 4
Processed job 5
Example (Blocking)

Example (Blocking) demonstrates how to use [Batcher.Execute] to submit a job and wait for its result in a single line.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"fillmore-labs.com/microbatch"
)

type (
	JobID int

	Job struct {
		ID JobID
	}

	JobResult struct {
		ID   JobID
		Body string
	}

	Jobs       []*Job
	JobResults []*JobResult
)

func (j *Job) JobID() JobID       { return j.ID }
func (j *JobResult) JobID() JobID { return j.ID }

// unwrap unwraps a JobResult to payload and error.
func unwrap(r *JobResult, err error) (string, error) {
	if err != nil {
		return "", err
	}
	return r.Body, nil
}

type RemoteProcessor struct{}

func (p *RemoteProcessor) ProcessJobs(jobs Jobs) (JobResults, error) {
	results := make(JobResults, 0, len(jobs))
	for _, job := range jobs {
		result := &JobResult{
			ID:   job.ID,
			Body: fmt.Sprintf("Processed job %d", job.ID),
		}
		results = append(results, result)
	}
	return results, nil
}

func main() {
	// Initialize
	processor := &RemoteProcessor{}
	batcher := microbatch.NewBatcher(
		processor.ProcessJobs,
		(*Job).JobID,
		(*JobResult).JobID,
		microbatch.WithSize(3),
		microbatch.WithTimeout(10*time.Millisecond),
	)

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	const iterations = 5
	var wg sync.WaitGroup

	// Submit jobs
	for i := 1; i <= iterations; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if result, err := unwrap(batcher.Execute(ctx, &Job{ID: JobID(i)})); err == nil {
				fmt.Println(result)
			}
		}(i) // https://go.dev/doc/faq#closures_and_goroutines
	}

	// Shut down
	wg.Wait()
	batcher.Send()
}
Output:

Processed job 1
Processed job 2
Processed job 3
Processed job 4
Processed job 5
Index

Examples

Constants

This section is empty.

Variables
var (
	// ErrNoResult is returned when the response from processJobs is missing a
	// matching correlation ID.
	ErrNoResult = errors.New("no result")

	// ErrDuplicateID is returned when a job has an already existing correlation ID.
	ErrDuplicateID = errors.New("duplicate correlation ID")
)
Functions

This section is empty.

Types
type Batcher
type Batcher[Q, R any] struct {
	// contains filtered or unexported fields
}
Batcher handles submitting requests in batches and returning results through channels.
func NewBatcher
func NewBatcher[Q, R any, C comparable, QQ ~[]Q, RR ~[]R](
	processJobs func(jobs QQ) (RR, error),
	correlateRequest func(request Q) C,
	correlateResult func(result R) C,
	opts ...Option,
) *Batcher[Q, R]
NewBatcher creates a new Batcher.
- processJobs is used to process batches of jobs.
- correlateRequest and correlateResult extract a common correlation key from a job and a result, so that each result can be matched back to its job.
- opts are used to configure the batch size and timeout.
The batch collector is run in a goroutine, which must be terminated with [Batcher.Send].
type Option added in v0.1.0
type Option interface {
	// contains filtered or unexported methods
}
Option defines configurations for NewBatcher.
func WithTimeout added in v0.1.0
WithTimeout is an option to configure the batch timeout.