Documentation
¶
Overview ¶
Package batching provides a batcher to collect points and emit them as batches.
Example (Batcher) ¶
package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3/batching"
)
// main demonstrates the two ways of using a point Batcher:
// synchronously, by polling Ready and writing each emitted batch, and
// asynchronously, by registering callbacks that are invoked by the batcher.
func main() {
	// Number of points generated in each half of the example.
	const pointCount = 54

	// Create a random number generator with a fixed seed.
	r := rand.New(rand.NewSource(456))

	// Instantiate a client using your credentials.
	client, err := influxdb3.NewFromEnv()
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when finished and report any error returned by Close.
	defer func() {
		if err := client.Close(); err != nil {
			log.Fatal(err)
		}
	}()

	// Synchronous use
	// Create a Batcher with a size of 5
	b := batching.NewBatcher(batching.WithSize(5))

	// Start pointCount seconds in the past; every point is stamped one
	// second after the previous one.
	t := time.Now().Add(-pointCount * time.Second)

	// Add pointCount points to the batcher, writing a batch whenever one is ready
	for range pointCount {
		p := influxdb3.NewPoint("stat",
			map[string]string{"location": "Paris"},
			map[string]any{
				"temperature": 15 + r.Float64()*20,
				"humidity":    30 + r.Int63n(40),
			},
			t)

		// Add the point to the batcher
		b.Add(p)

		// Advance the timestamp for the next point
		t = t.Add(time.Second)

		// If the batcher is ready, emit the batch and write it to the client
		if b.Ready() {
			if err := client.WritePoints(context.Background(), b.Emit()); err != nil {
				log.Fatal(err)
			}
		}
	}

	// Write the final batch (the points that did not fill up a whole batch)
	if err := client.WritePoints(context.Background(), b.Emit()); err != nil {
		log.Fatal(err)
	}

	// Asynchronous use
	// Create a batcher with a size of 5, a ready callback and an emit callback to write the batch to the client
	b = batching.NewBatcher(
		batching.WithSize(5),
		batching.WithReadyCallback(func() { fmt.Println("ready") }),
		batching.WithEmitCallback(func(points []*influxdb3.Point) {
			// Keep the error local to the callback instead of overwriting
			// the variable captured from main.
			if err := client.WritePoints(context.Background(), points); err != nil {
				log.Fatal(err)
			}
		}),
	)

	// Restart the timestamps pointCount seconds in the past
	t = time.Now().Add(-pointCount * time.Second)

	// Add pointCount points; full batches are written by the emit callback
	for range pointCount {
		p := influxdb3.NewPoint("stat",
			map[string]string{"location": "Madrid"},
			map[string]any{
				"temperature": 15 + r.Float64()*20,
				"humidity":    30 + r.Int63n(40),
			},
			t)

		// Add the point to the batcher
		b.Add(p)

		// Advance the timestamp for the next point
		t = t.Add(time.Second)
	}

	// Write the final batch (the points that did not fill up a whole batch)
	if err := client.WritePoints(context.Background(), b.Emit()); err != nil {
		log.Fatal(err)
	}
}
Example (LineProtocol_batcher) ¶
package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3/batching"
)
// main demonstrates the two ways of using a line protocol batcher (LPBatcher):
// synchronously, by polling Ready and writing each emitted []byte batch, and
// asynchronously, by registering callbacks that are invoked by the batcher.
func main() {
	// Create a random number generator
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

	// Line protocol template: measurement and tag, two fields, then an
	// integer timestamp.
	dataTemplate := "cpu,host=%s load=%.3f,reg=%d %d"
	syncHosts := []string{"r2d2", "c3po", "robbie"}
	const recordCount = 200

	// Instantiate a client using your credentials.
	client, err := influxdb3.NewFromEnv()
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when finished and report any error returned by Close.
	defer func(client *influxdb3.Client) {
		if err := client.Close(); err != nil {
			log.Fatal(err)
		}
	}(client)

	// SYNCHRONOUS USAGE
	// create a new Line Protocol Batcher with a batch size of 4096 bytes
	slpb := batching.NewLPBatcher(batching.WithBufferSize(4096)) // Set buffer size

	// Start recordCount seconds in the past; every record is stamped one
	// second after the previous one.
	t := time.Now().Add(-recordCount * time.Second)

	// create and emit records
	for range recordCount {
		// The timestamp must be formatted as an integer. Passing the
		// time.Time value itself to %d would print its internal struct
		// fields and produce invalid line protocol.
		// NOTE(review): UnixNano assumes the client writes with nanosecond
		// precision - confirm against the client configuration.
		slpb.Add(fmt.Sprintf(dataTemplate,
			syncHosts[rnd.Intn(len(syncHosts))],
			rnd.Float64()*150,
			rnd.Intn(32),
			t.UnixNano()))

		// update time
		t = t.Add(time.Second)

		// If the batcher is ready, emit the batch and write it to the client
		if slpb.Ready() {
			if err := client.Write(context.Background(), slpb.Emit()); err != nil {
				log.Fatal(err)
			}
		}
	}

	// write any remaining records in batcher to client
	if err := client.Write(context.Background(), slpb.Emit()); err != nil {
		log.Fatal(err)
	}

	// ASYNCHRONOUS USAGE
	asyncHosts := []string{"Z80", "C64", "i8088"}

	// create a new Line Protocol Batcher with a batch size of 4096 bytes
	// ... a callback to handle when ready state reached and
	// ... a callback to handle emits of bytes
	alpb := batching.NewLPBatcher(batching.WithBufferSize(4096),
		batching.WithByteEmitReadyCallback(func() { fmt.Println("ready") }),
		batching.WithEmitBytesCallback(func(bytes []byte) {
			if err := client.Write(context.Background(), bytes); err != nil {
				log.Fatal(err)
			}
		}))

	// Restart the timestamps recordCount seconds in the past
	t = time.Now().Add(-recordCount * time.Second)

	// create and add data to the batcher; full batches are written by the
	// emit callback
	for range recordCount {
		alpb.Add(fmt.Sprintf(dataTemplate,
			asyncHosts[rnd.Intn(len(asyncHosts))],
			rnd.Float64()*150,
			rnd.Intn(32),
			t.UnixNano()))

		// update time
		t = t.Add(time.Second)
	}

	// write any remaining records in batcher to client
	if err := client.Write(context.Background(), alpb.Emit()); err != nil {
		log.Fatal(err)
	}
}
Index ¶
- Constants
- type Batcher
- func (b *Batcher) Add(p ...*influxdb3.Point)
- func (b *Batcher) CurrentLoadSize() int
- func (b *Batcher) Emit() []*influxdb3.Point
- func (b *Batcher) Flush() []*influxdb3.Point
- func (b *Batcher) Ready() bool
- func (b *Batcher) SetCapacity(c int)deprecated
- func (b *Batcher) SetEmitCallback(f func([]*influxdb3.Point))
- func (b *Batcher) SetInitialCapacity(c int)
- func (b *Batcher) SetReadyCallback(f func())
- func (b *Batcher) SetSize(s int)
- type ByteEmittable
- type Emittable
- type LPBatcher
- func (lpb *LPBatcher) Add(lines ...string)
- func (lpb *LPBatcher) CurrentLoadSize() int
- func (lpb *LPBatcher) Emit() []byte
- func (lpb *LPBatcher) Flush() []byte
- func (lpb *LPBatcher) Ready() bool
- func (lpb *LPBatcher) SetCapacity(c int)deprecated
- func (lpb *LPBatcher) SetEmitBytesCallback(f func([]byte))
- func (lpb *LPBatcher) SetInitialCapacity(c int)
- func (lpb *LPBatcher) SetReadyCallback(f func())
- func (lpb *LPBatcher) SetSize(s int)
- type LPOption
- type Option
- type PointEmittable
Examples ¶
Constants ¶
const DefaultBatchSize = 1000
DefaultBatchSize is the default number of points emitted
const DefaultBufferCapacity = DefaultInitialBufferCapacity
Deprecated: use DefaultInitialBufferCapacity
const DefaultByteBatchSize = 100000
const DefaultCapacity = DefaultInitialCapacity
Deprecated: use DefaultInitialCapacity
const DefaultInitialBufferCapacity = DefaultByteBatchSize * 2
const DefaultInitialCapacity = 2 * DefaultBatchSize
DefaultInitialCapacity is the default initial capacity of the point buffer
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
Batcher collects points and emits them as batches
func NewBatcher ¶
NewBatcher creates and initializes a new Batcher instance applying the specified options. By default, the batch size is DefaultBatchSize and the initial capacity is DefaultInitialCapacity.
func (*Batcher) CurrentLoadSize ¶
func (*Batcher) Emit ¶
Emit returns a new batch of points with the provided batch size or with the remaining points. Please drain the points at the end of your processing to get the remaining points not filling up a batch.
func (*Batcher) Flush ¶
Flush drains all points even if the internal buffer is currently larger than size. It does not call the callbackEmit function.
func (*Batcher) SetCapacity
deprecated
func (*Batcher) SetEmitCallback ¶
SetEmitCallback sets the callbackEmit function.
func (*Batcher) SetInitialCapacity ¶ added in v2.7.0
SetInitialCapacity sets the initial Capacity of the internal []*influxdb3.Point buffer.
func (*Batcher) SetReadyCallback ¶
func (b *Batcher) SetReadyCallback(f func())
SetReadyCallback sets the callbackReady function.
type ByteEmittable ¶
// ByteEmittable extends Emittable with a setter for the callback that
// receives emitted data as a []byte.
type ByteEmittable interface {
Emittable
SetEmitBytesCallback(ebcb func([]byte)) // callback for emitting bytes
}
ByteEmittable provides the basis for a type Emitting line protocol data as a byte array (i.e. []byte).
type Emittable ¶
// Emittable is the set of configuration setters shared by the batcher types:
// batch size, initial buffer capacity and the ready callback.
type Emittable interface {
SetSize(s int) // sets the batch size
SetInitialCapacity(c int) // sets the initial capacity of the internal buffer
SetReadyCallback(rcb func()) // sets the function called when a new batch is ready
SetCapacity(c int) // Deprecated: use SetInitialCapacity instead
}
Emittable provides the base for any type that will collect and then emit data upon reaching a ready state.
type LPBatcher ¶
LPBatcher collects line protocol strings storing them to a byte buffer and then emitting them as []byte.
Lines are added to the LPBatcher using the `Add()` method. Lines in the internal buffer are delimited by a '\n' byte, which is added automatically, if not already used to terminate a line.
As lines are added to LPBatcher a check is made to determine whether the `size` property has been exceeded. At that point the function `callbackByteEmit()` is automatically called using the internal `emitBytes()` method.
In the most common use case, a batch of lines is emitted up to but not exceeding the `size` property. When the first line in the buffer by itself exceeds this property, only that line is emitted.
func NewLPBatcher ¶
NewLPBatcher creates and initializes a new LPBatcher instance applying the supplied options. By default, the batch size is DefaultByteBatchSize and the initial capacity is DefaultInitialBufferCapacity.
func (*LPBatcher) Add ¶
Add lines to the buffer and call appropriate callbacks when the ready state is reached.
func (*LPBatcher) CurrentLoadSize ¶
CurrentLoadSize returns the current size of the internal buffer
func (*LPBatcher) Emit ¶
Emit returns a new batch of bytes up to the provided batch size, ending at the last newline character within the potential batch, or all the remaining bytes. Please drain the bytes at the end of your processing to get the remaining bytes not filling up a batch.
func (*LPBatcher) SetCapacity
deprecated
func (*LPBatcher) SetEmitBytesCallback ¶
SetEmitBytesCallback sets the callbackByteEmit function
func (*LPBatcher) SetInitialCapacity ¶ added in v2.7.0
SetInitialCapacity sets the initial capacity of the internal buffer
func (*LPBatcher) SetReadyCallback ¶
func (lpb *LPBatcher) SetReadyCallback(f func())
SetReadyCallback sets the ReadyCallback function
type LPOption ¶
// LPOption is a functional option that configures a ByteEmittable, such as
// the batcher created by NewLPBatcher.
type LPOption func(ByteEmittable)
func WithBufferCapacity
deprecated
func WithBufferSize ¶
WithBufferSize changes the batch size emitted by the LPBatcher. The unit is bytes.
func WithByteEmitReadyCallback ¶
func WithByteEmitReadyCallback(f func()) LPOption
WithByteEmitReadyCallback sets the function called when a new batch is ready. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a goroutine.
func WithEmitBytesCallback ¶
WithEmitBytesCallback sets the function called when a new batch is ready with the batch bytes. The batcher will wait for the callback to finish, so please return as quickly as possible and move any long-running processing to a goroutine.
func WithInitialBufferCapacity ¶ added in v2.7.0
WithInitialBufferCapacity changes the initial capacity of the internal buffer. The unit is bytes.
type Option ¶
// Option is a functional option that configures a PointEmittable, such as
// the batcher created by NewBatcher.
type Option func(PointEmittable)
func WithCapacity
deprecated
func WithEmitCallback ¶
WithEmitCallback sets the function called when a new batch is ready with the batch of points. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a goroutine.
func WithInitialCapacity ¶ added in v2.7.0
WithInitialCapacity changes the initial capacity of the internal buffer
func WithReadyCallback ¶
func WithReadyCallback(f func()) Option
WithReadyCallback sets the function called when a new batch is ready. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a goroutine.
type PointEmittable ¶
// PointEmittable extends Emittable with a setter for the callback that
// receives emitted data as a []*influxdb3.Point.
type PointEmittable interface {
Emittable
SetEmitCallback(epcb func([]*influxdb3.Point)) // callback for emitting points
}
PointEmittable provides the basis for any type emitting Point arrays as []*influxdb3.Point