Documentation
¶
Index ¶
- Constants
- func BeholderNoopLoggerProvider() *sdklog.LoggerProvider
- func BuildAuthHeaders(privKey ed25519.PrivateKey) map[string]string
- func DefaultWriterClientConfig() writerClientConfig
- func ExtractAttributes(attrKVs ...any) map[string]any
- func ExtractSourceAndType(attrKVs ...any) (string, string, error)
- func GetLogger() otellog.Logger
- func GetMeter() otelmetric.Meter
- func GetTracer() oteltrace.Tracer
- func NewAuthHeaders(ed25519Signer crypto.Signer) (map[string]string, error)
- func NewMetadataValidator() (*validator.Validate, error)
- func NewStaticAuthHeaderProvider(headers map[string]string) chipingress.HeaderProvider
- func OtelAttr(key string, value any) otellog.KeyValue
- func SetClient(client *Client)
- func SetGlobalOtelProviders()
- func ToSchemaFullName(m proto.Message) string
- type Attributes
- type BeholderClient
- type ChipIngressClient
- type ChipIngressEmitter
- type Client
- func GetClient() *Client
- func NewClient(cfg Config) (*Client, error)
- func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, error)
- func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, error)
- func NewNoopClient() *Client
- func NewStdoutClient() (*Client, error)
- func NewWriterClient(w io.Writer) (*Client, error)
- type Config
- type DualSourceEmitter
- type Emitter
- type Message
- type Metadata
- type MetricInfo
- func (m MetricInfo) NewFloat64Gauge(meter metric.Meter) (metric.Float64Gauge, error)
- func (m MetricInfo) NewFloat64Histogram(meter metric.Meter) (metric.Float64Histogram, error)
- func (m MetricInfo) NewInt64Counter(meter metric.Meter) (metric.Int64Counter, error)
- func (m MetricInfo) NewInt64Gauge(meter metric.Meter) (metric.Int64Gauge, error)
- func (m MetricInfo) NewInt64Histogram(meter metric.Meter) (metric.Int64Histogram, error)
- type OtelAttributes
- type ProtoEmitter
- type ProtoProcessor
- type RetryConfig
Examples ¶
Constants ¶
const ( AttrKeyDataSchema = "beholder_data_schema" AttrKeyEntity = "beholder_entity" AttrKeyDomain = "beholder_domain" AttrKeyDataType = "beholder_data_type" )
const (
// Helper keys to avoid duplicating attributes
CtxKeySkipAppendAttrs = "skip_append_attrs"
)
Variables ¶
This section is empty.
Functions ¶
func BeholderNoopLoggerProvider ¶ added in v0.7.1
func BeholderNoopLoggerProvider() *sdklog.LoggerProvider
BeholderNoopLoggerProvider returns a *sdklog.LoggerProvider (the same type as sdklog.NewLoggerProvider) that drops all logs.
func BuildAuthHeaders ¶ added in v0.4.0
func BuildAuthHeaders(privKey ed25519.PrivateKey) map[string]string
BuildAuthHeaders creates the auth header value to be included on requests. The current format for the header is:
<version>:<public_key_hex>:<signature_hex>
where the byte value of <public_key_hex> is what's being signed Deprecated: use NewAuthHeaders
func DefaultWriterClientConfig ¶
func DefaultWriterClientConfig() writerClientConfig
func ExtractAttributes ¶ added in v0.7.1
func ExtractSourceAndType ¶ added in v0.7.0
ExtractSourceAndType extracts source domain and entity from the attributes
func GetMeter ¶
func GetMeter() otelmetric.Meter
func NewAuthHeaders ¶ added in v0.6.0
NewAuthHeaders creates the auth header value to be included on requests. The current format for the header is:
<version>:<public_key_hex>:<signature_hex>
where the byte value of <public_key_hex> is what's being signed
func NewMetadataValidator ¶ added in v0.4.0
func NewMetadataValidator() (*validator.Validate, error)
func NewStaticAuthHeaderProvider ¶ added in v0.7.1
func NewStaticAuthHeaderProvider(headers map[string]string) chipingress.HeaderProvider
func SetGlobalOtelProviders ¶
func SetGlobalOtelProviders()
Sets global OTel logger, tracer, meter providers from Client. Makes them accessible from anywhere in the code via global otel getters. Any package that relies on go.opentelemetry.io will be able to pick up configured global providers e.g [otelgrpc](https://pkg.go.dev/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc#example-NewServerHandler)
func ToSchemaFullName ¶ added in v0.7.1
toSchemaName returns a protobuf message name (full)
Types ¶
type Attributes ¶
type BeholderClient ¶ added in v0.7.1
type BeholderClient struct {
*Client
ProtoEmitter ProtoEmitter
}
BeholderClient is a Beholder client extension with a custom ProtoEmitter
type ChipIngressClient ¶ added in v0.9.5
type ChipIngressClient interface {
RegisterSchema(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error)
}
func NewChipIngressClient ¶ added in v0.9.5
func NewChipIngressClient(client chipingress.Client) (ChipIngressClient, error)
type ChipIngressEmitter ¶ added in v0.7.0
type ChipIngressEmitter struct {
// contains filtered or unexported fields
}
type Client ¶
type Client struct {
Config Config
// Logger
Logger otellog.Logger
// Tracer
Tracer oteltrace.Tracer
// Meter
Meter otelmetric.Meter
// Message Emitter
Emitter Emitter
// Chip
Chip ChipIngressClient
// Providers
LoggerProvider otellog.LoggerProvider
TracerProvider oteltrace.TracerProvider
MeterProvider otelmetric.MeterProvider
MessageLoggerProvider otellog.LoggerProvider
// OnClose
OnClose func() error
}
func GetClient ¶
func GetClient() *Client
Returns the global Beholder Client Its thread-safe and can be used concurrently
func NewClient ¶
NewClient creates a new Client with initialized OpenTelemetry components To handle OpenTelemetry errors use otel.SetErrorHandler(https://pkg.go.dev/go.opentelemetry.io/otel#SetErrorHandler)
Example ¶
package main
import (
"context"
"fmt"
"log"
"go.opentelemetry.io/otel"
"google.golang.org/protobuf/proto"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
)
func main() {
config := beholder.DefaultConfig()
// Initialize beholder otel client which sets up OTel components
client, err := beholder.NewClient(config)
if err != nil {
log.Fatalf("Error creating Beholder client: %v", err)
}
// Handle OTel errors
otel.SetErrorHandler(otelErrPrinter)
// Set global client so it will be accessible from anywhere through beholder functions
beholder.SetClient(client)
// Define a custom protobuf payload to emit
payload := &pb.TestCustomMessage{
BoolVal: true,
IntVal: 42,
FloatVal: 3.14,
StringVal: "Hello, World!",
}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
log.Fatalf("Failed to marshal protobuf")
}
// Emit the custom message anywhere from application logic
fmt.Println("Emit custom messages")
for range 10 {
err := beholder.GetEmitter().Emit(context.Background(), payloadBytes,
beholder.AttrKeyDataSchema, "/custom-message/versions/1", // required
beholder.AttrKeyDomain, "ExampleDomain", // required
beholder.AttrKeyEntity, "ExampleEntity", // required
beholder.AttrKeyDataType, "custom_message",
"foo", "bar",
)
if err != nil {
log.Printf("Error emitting message: %v", err)
}
}
}
var otelErrPrinter = otel.ErrorHandlerFunc(func(err error) {
log.Printf("otel error: %v", err)
})
Output: Emit custom messages
func NewGRPCClient ¶ added in v0.6.0
NewGRPCClient creates a GRPC based beholder Client. Use NewClient to create a client from a Config which will pick the best client type from the Config.
func NewHTTPClient ¶ added in v0.6.0
NewHTTPClient creates a HTTP based beholder Client. Use NewClient to create a client from a Config which will pick the best client type from the Config.
func NewNoopClient ¶
func NewNoopClient() *Client
Default client to fallback when is is not initialized properly
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)
func main() {
fmt.Println("Beholder is not initialized. Fall back to Noop OTel Client")
fmt.Println("Emitting custom message via noop otel client")
err := beholder.GetEmitter().Emit(context.Background(), []byte("test message"),
beholder.AttrKeyDataSchema, "/custom-message/versions/1", // required
beholder.AttrKeyDomain, "ExampleDomain", // required
beholder.AttrKeyEntity, "ExampleEntity", // required
)
if err != nil {
log.Printf("Error emitting message: %v", err)
}
}
Output: Beholder is not initialized. Fall back to Noop OTel Client Emitting custom message via noop otel client
func NewStdoutClient ¶
NewStdoutClient creates a new Client with exporters which send telemetry data to standard output Used for testing and debugging
func NewWriterClient ¶
NewWriterClient creates a new Client with otel exporters which send telemetry data to custom io.Writer
func (Client) ForName ¶ added in v0.4.0
ForName returns a new Client with the same configuration but with a different name. For global package-scoped telemetry, use the package name. For injected component-scoped telemetry, use a fully qualified name that uniquely identifies this instance.
func (Client) ForPackage ¶
Returns a new Client with the same configuration but with a different package name Deprecated: Use ForName
type Config ¶
type Config struct {
InsecureConnection bool
CACertFile string
OtelExporterGRPCEndpoint string
OtelExporterHTTPEndpoint string
// OTel Resource
ResourceAttributes []attribute.KeyValue
// Message Emitter
EmitterExportTimeout time.Duration
EmitterExportInterval time.Duration
EmitterExportMaxBatchSize int
EmitterMaxQueueSize int
EmitterBatchProcessor bool // Enabled by default. Disable only for testing.
// OTel Trace
TraceSampleRatio float64
TraceBatchTimeout time.Duration
TraceSpanExporter trace.SpanExporter // optional additional exporter
TraceRetryConfig *RetryConfig
// OTel Metric
MetricReaderInterval time.Duration
MetricRetryConfig *RetryConfig
MetricViews []metric.View
// Custom Events via Chip Ingress Emitter
ChipIngressEmitterEnabled bool
ChipIngressEmitterGRPCEndpoint string
ChipIngressInsecureConnection bool // Disables TLS for Chip Ingress Emitter
// OTel Log
LogExportTimeout time.Duration
LogExportInterval time.Duration
LogExportMaxBatchSize int
LogMaxQueueSize int
LogBatchProcessor bool // Enabled by default. Disable only for testing.
// Retry config for shared log exporter, used by Emitter and Logger
LogRetryConfig *RetryConfig
LogStreamingEnabled bool // Enable logs streaming to the OTel log exporter
// Auth
AuthPublicKeyHex string
AuthHeaders map[string]string
}
Example ¶
package main
import (
"fmt"
"time"
otelattr "go.opentelemetry.io/otel/attribute"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)
const (
packageName = "beholder"
)
func main() {
config := beholder.Config{
InsecureConnection: true,
CACertFile: "",
OtelExporterGRPCEndpoint: "localhost:4317",
OtelExporterHTTPEndpoint: "localhost:4318",
// Resource
ResourceAttributes: []otelattr.KeyValue{
otelattr.String("package_name", packageName),
otelattr.String("sender", "beholderclient"),
},
// Message Emitter
EmitterExportTimeout: 1 * time.Second,
EmitterExportMaxBatchSize: 512,
EmitterExportInterval: 1 * time.Second,
EmitterMaxQueueSize: 2048,
EmitterBatchProcessor: true,
// OTel message log exporter retry config
LogRetryConfig: nil,
// Trace
TraceSampleRatio: 1,
TraceBatchTimeout: 1 * time.Second,
// OTel trace exporter retry config
TraceRetryConfig: nil,
// Metric
MetricReaderInterval: 1 * time.Second,
// OTel metric exporter retry config
MetricRetryConfig: nil,
// Log
LogExportTimeout: 1 * time.Second,
LogExportMaxBatchSize: 512,
LogExportInterval: 1 * time.Second,
LogMaxQueueSize: 2048,
LogBatchProcessor: true,
LogStreamingEnabled: false, // Disable streaming logs by default
}
fmt.Printf("%+v\n", config)
config.LogRetryConfig = &beholder.RetryConfig{
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 1 * time.Minute, // Set to zero to disable retry
}
fmt.Printf("%+v\n", *config.LogRetryConfig)
}
Output: {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false AuthPublicKeyHex: AuthHeaders:map[]} {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
func DefaultConfig ¶
func DefaultConfig() Config
func TestDefaultConfig ¶
func TestDefaultConfig() Config
func TestDefaultConfigHTTPClient ¶ added in v0.4.0
func TestDefaultConfigHTTPClient() Config
type DualSourceEmitter ¶ added in v0.7.0
type DualSourceEmitter struct {
// contains filtered or unexported fields
}
dualSourceEmitter emits both to chip ingress and to the otel collector this is to help transition from sending custom messages via OTLP to instead use chip-ingress we want to send to both during the transition period, then cutover to using chipIngressEmitter only
type Emitter ¶
type Emitter interface {
// Sends message with bytes and attributes to OTel Collector
Emit(ctx context.Context, body []byte, attrKVs ...any) error
}
func GetEmitter ¶
func GetEmitter() Emitter
func NewChipIngressEmitter ¶ added in v0.7.0
func NewChipIngressEmitter(client chipingress.Client) (Emitter, error)
func NewDualSourceEmitter ¶ added in v0.7.0
func NewMessageEmitter ¶ added in v0.7.0
NewMessageEmitter creates a new message emitter that emits messages to the otel collector
type Message ¶
type Message struct {
Attrs Attributes
Body []byte
}
Example ¶
package main
import (
"fmt"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)
func main() {
// Create message with body and attributes
m1 := beholder.NewMessage([]byte{1}, beholder.Attributes{"key_string": "value"})
fmt.Println("#1", m1)
// Create attributes
additionalAttributes := beholder.Attributes{
"key_string": "new value",
"key_int32": int32(1),
}
// Add attributes to message
m1.AddAttributes(additionalAttributes)
fmt.Println("#2", m1)
// Create mmpty message struct
m2 := beholder.Message{}
fmt.Println("#3", m2)
// Add attributes to message
m2.AddAttributes(beholder.Attributes{"key_int": 1})
fmt.Println("#4", m2)
// Update attribute key_int
m2.AddAttributes(beholder.Attributes{"key_int": 2})
fmt.Println("#5", m2)
// Set message body
m2.Body = []byte("0123")
fmt.Println("#6", m2)
// Reset attributes
m2.Attrs = beholder.Attributes{}
fmt.Println("#7", m2)
// Reset body
m2.Body = nil
fmt.Println("#8", m2)
// Shalow copy of message
m3 := beholder.NewMessage(m1.Body, m1.Attrs)
fmt.Println("#9", m3)
m1.Body[0] = byte(2) // Wil mutate m3
fmt.Println("#10", m3)
// Deep copy
m4 := m1.Copy()
fmt.Println("#11", m4)
m1.Body[0] = byte(3) // Should not mutate m4
fmt.Println("#12", m4)
// Create message with mixed attributes: kv pairs and maps
m5 := beholder.NewMessage([]byte{1},
// Add attributes from the map
map[string]any{
"key1": "value1",
},
// Add attributes from KV pair
"key2", "value2",
// Add attributes from Attributes map
beholder.Attributes{"key3": "value3"},
// Add attributes from KV pair
"key4", "value4",
// Modify key1
"key1", "value5",
// Modify key2
map[string]any{
"key2": "value6",
},
)
fmt.Println("#13", m5)
// Create message with no attributes
m6 := beholder.NewMessage([]byte{1}, beholder.Attributes{})
// Add attributes using AddAttributes
m6.AddAttributes(
"key1", "value1",
"key2", "value2",
)
fmt.Println("#14", m6)
}
Output: #1 Message{Attrs: map[key_string:value], Body: [1]} #2 Message{Attrs: map[key_int32:1 key_string:new value], Body: [1]} #3 Message{Attrs: map[], Body: []} #4 Message{Attrs: map[key_int:1], Body: []} #5 Message{Attrs: map[key_int:2], Body: []} #6 Message{Attrs: map[key_int:2], Body: [48 49 50 51]} #7 Message{Attrs: map[], Body: [48 49 50 51]} #8 Message{Attrs: map[], Body: []} #9 Message{Attrs: map[key_int32:1 key_string:new value], Body: [1]} #10 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]} #11 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]} #12 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]} #13 Message{Attrs: map[key1:value5 key2:value6 key3:value3 key4:value4], Body: [1]} #14 Message{Attrs: map[key1:value1 key2:value2], Body: [1]}
func NewMessage ¶
func (*Message) AddAttributes ¶
func (*Message) OtelRecord ¶
type Metadata ¶
type Metadata struct {
// REQUIRED FIELDS
// Schema Registry URI to fetch schema
BeholderDomain string `validate:"required,domain_entity"`
BeholderEntity string `validate:"required,domain_entity"`
BeholderDataSchema string `validate:"required"`
// OPTIONAL FIELDS
// The version of the CL node.
NodeVersion string
// mTLS public key for the node operator. This is used as an identity key but with the added benefit of being able to provide signatures.
NodeCsaKey string
// Signature from CSA private key.
NodeCsaSignature string
DonID string
// The RDD network name the CL node is operating with.
NetworkName []string
WorkflowID string
WorkflowName string
WorkflowOwnerAddress string
// Hash of the workflow spec.
WorkflowSpecID string
// The unique execution of a workflow.
WorkflowExecutionID string
// The address for the contract.
CapabilityContractAddress string
CapabilityID string
CapabilityVersion string
CapabilityName string
NetworkChainID string
}
Example ¶
package main
import (
"fmt"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)
func testMetadata() beholder.Metadata {
return beholder.Metadata{
NodeVersion: "v1.0.0",
NodeCsaKey: "test_key",
NodeCsaSignature: "test_signature",
DonID: "test_don_id",
NetworkName: []string{"test_network"},
WorkflowID: "test_workflow_id",
WorkflowName: "test_workflow_name",
WorkflowOwnerAddress: "test_owner_address",
WorkflowSpecID: "test_spec_id",
WorkflowExecutionID: "test_execution_id",
BeholderDomain: "TestDomain",
BeholderEntity: "TestEntity",
BeholderDataSchema: "/schemas/ids/test_schema",
CapabilityContractAddress: "test_contract_address",
CapabilityID: "test_capability_id",
CapabilityVersion: "test_capability_version",
CapabilityName: "test_capability_name",
NetworkChainID: "test_chain_id",
}
}
func main() {
m := testMetadata()
fmt.Printf("%#v\n", m)
fmt.Println(m.Attributes())
}
Output: beholder.Metadata{BeholderDomain:"TestDomain", BeholderEntity:"TestEntity", BeholderDataSchema:"/schemas/ids/test_schema", NodeVersion:"v1.0.0", NodeCsaKey:"test_key", NodeCsaSignature:"test_signature", DonID:"test_don_id", NetworkName:[]string{"test_network"}, WorkflowID:"test_workflow_id", WorkflowName:"test_workflow_name", WorkflowOwnerAddress:"test_owner_address", WorkflowSpecID:"test_spec_id", WorkflowExecutionID:"test_execution_id", CapabilityContractAddress:"test_contract_address", CapabilityID:"test_capability_id", CapabilityVersion:"test_capability_version", CapabilityName:"test_capability_name", NetworkChainID:"test_chain_id"} map[beholder_data_schema:/schemas/ids/test_schema beholder_domain:TestDomain beholder_entity:TestEntity capability_contract_address:test_contract_address capability_id:test_capability_id capability_name:test_capability_name capability_version:test_capability_version don_id:test_don_id network_chain_id:test_chain_id network_name:[test_network] node_csa_key:test_key node_csa_signature:test_signature node_version:v1.0.0 workflow_execution_id:test_execution_id workflow_id:test_workflow_id workflow_name:test_workflow_name workflow_owner_address:test_owner_address workflow_spec_id:test_spec_id]
func NewMetadata ¶
func NewMetadata(attrs Attributes) *Metadata
func (Metadata) Attributes ¶
func (m Metadata) Attributes() Attributes
func (*Metadata) FromAttributes ¶
func (m *Metadata) FromAttributes(attrs Attributes) *Metadata
Sets metadata fields from attributes
func (*Metadata) Validate ¶
Example ¶
package main
import (
"fmt"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)
func main() {
validate, err := beholder.NewMetadataValidator()
if err != nil {
fmt.Println(err)
}
metadata := beholder.Metadata{
BeholderDomain: "TestDomain",
BeholderEntity: "TestEntity",
}
if err := validate.Struct(metadata); err != nil {
fmt.Println(err)
}
metadata.BeholderDataSchema = "example.proto"
if err := validate.Struct(metadata); err != nil {
fmt.Println(err)
}
metadata.BeholderDataSchema = "/schemas/ids/test_schema"
if err := validate.Struct(metadata); err != nil {
fmt.Println(err)
} else {
fmt.Println("Metadata is valid")
}
}
Output: Key: 'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'required' tag Metadata is valid
type MetricInfo ¶ added in v0.7.1
func (MetricInfo) NewFloat64Gauge ¶ added in v0.7.1
func (m MetricInfo) NewFloat64Gauge(meter metric.Meter) (metric.Float64Gauge, error)
NewFloat64Gauge creates a new Float64Gauge metric
func (MetricInfo) NewFloat64Histogram ¶ added in v0.7.1
func (m MetricInfo) NewFloat64Histogram(meter metric.Meter) (metric.Float64Histogram, error)
NewFloat64Histogram creates a new Float64Histogram metric
func (MetricInfo) NewInt64Counter ¶ added in v0.7.1
func (m MetricInfo) NewInt64Counter(meter metric.Meter) (metric.Int64Counter, error)
NewInt64Counter creates a new Int64Counter metric
func (MetricInfo) NewInt64Gauge ¶ added in v0.7.1
func (m MetricInfo) NewInt64Gauge(meter metric.Meter) (metric.Int64Gauge, error)
NewInt64Gauge creates a new Int64Gauge metric
func (MetricInfo) NewInt64Histogram ¶ added in v0.7.1
func (m MetricInfo) NewInt64Histogram(meter metric.Meter) (metric.Int64Histogram, error)
NewInt64Histogram creates a new Int64Histogram metric
type OtelAttributes ¶ added in v0.7.1
func (OtelAttributes) AsStringAttributes ¶ added in v0.7.1
func (a OtelAttributes) AsStringAttributes() (attributes []attribute.KeyValue)
type ProtoEmitter ¶ added in v0.7.1
type ProtoEmitter interface {
// Sends message with bytes and attributes to OTel Collector
Emit(ctx context.Context, m proto.Message, attrKVs ...any) error
EmitWithLog(ctx context.Context, m proto.Message, attrKVs ...any) error
}
ProtoEmitter is an interface for emitting protobuf messages
func NewProtoEmitter ¶ added in v0.7.1
func NewProtoEmitter(lggr logger.Logger, client *Client, schemaBasePath string) ProtoEmitter
type ProtoProcessor ¶ added in v0.7.1
type ProtoProcessor interface {
Process(ctx context.Context, m proto.Message, attrKVs ...any) error
}
ProtoProcessor is an interface for processing emitted protobuf messages
type RetryConfig ¶ added in v0.4.0
type RetryConfig struct {
// InitialInterval the time to wait after the first failure before
// retrying.
InitialInterval time.Duration
// MaxInterval is the upper bound on backoff interval. Once this value is
// reached the delay between consecutive retries will always be
// `MaxInterval`.
MaxInterval time.Duration
// MaxElapsedTime is the maximum amount of time (including retries) spent
// trying to send a request/batch. Once this value is reached, the data
// is discarded.
// Set to zero to disable retry
MaxElapsedTime time.Duration
}
func (*RetryConfig) Copy ¶ added in v0.4.0
func (c *RetryConfig) Copy() *RetryConfig
func (*RetryConfig) Enabled ¶ added in v0.4.0
func (c *RetryConfig) Enabled() bool
Calculate if retry is enabled
func (*RetryConfig) GetInitialInterval ¶ added in v0.4.0
func (c *RetryConfig) GetInitialInterval() time.Duration
Implement getters for fields to avoid nil pointer dereference in case the config is not set
func (*RetryConfig) GetMaxElapsedTime ¶ added in v0.4.0
func (c *RetryConfig) GetMaxElapsedTime() time.Duration
func (*RetryConfig) GetMaxInterval ¶ added in v0.4.0
func (c *RetryConfig) GetMaxInterval() time.Duration