Documentation
¶
Index ¶
- func BuildAuthHeaders(privKey ed25519.PrivateKey) map[string]string
- func DefaultWriterClientConfig() writerClientConfig
- func GetLogger() otellog.Logger
- func GetMeter() otelmetric.Meter
- func GetTracer() oteltrace.Tracer
- func NewMetadataValidator() (*validator.Validate, error)
- func OtelAttr(key string, value any) otellog.KeyValue
- func SetClient(client *Client)
- func SetGlobalOtelProviders()
- type Attributes
- type Client
- type Config
- type Emitter
- type Message
- type Metadata
- type RetryConfig
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildAuthHeaders ¶ added in v0.4.0
func BuildAuthHeaders(privKey ed25519.PrivateKey) map[string]string
BuildAuthHeaders creates the auth header value to be included on requests. The current format for the header is:
<version>:<public_key_hex>:<signature_hex>
where the byte value of <public_key_hex> is what's being signed
func DefaultWriterClientConfig ¶
func DefaultWriterClientConfig() writerClientConfig
func GetMeter ¶
func GetMeter() otelmetric.Meter
func NewMetadataValidator ¶ added in v0.4.0
func NewMetadataValidator() (*validator.Validate, error)
func SetGlobalOtelProviders ¶
func SetGlobalOtelProviders()
Sets global OTel logger, tracer, meter providers from Client. Makes them accessible from anywhere in the code via global otel getters. Any package that relies on go.opentelemetry.io will be able to pick up configured global providers e.g [otelgrpc](https://pkg.go.dev/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc#example-NewServerHandler)
Types ¶
type Attributes ¶
type Client ¶
type Client struct {
Config Config
// Logger
Logger otellog.Logger
// Tracer
Tracer oteltrace.Tracer
// Meter
Meter otelmetric.Meter
// Message Emitter
Emitter Emitter
// Providers
LoggerProvider otellog.LoggerProvider
TracerProvider oteltrace.TracerProvider
MeterProvider otelmetric.MeterProvider
MessageLoggerProvider otellog.LoggerProvider
// OnClose
OnClose func() error
}
func GetClient ¶
func GetClient() *Client
Returns the global Beholder Client Its thread-safe and can be used concurrently
func NewClient ¶
NewClient creates a new Client with initialized OpenTelemetry components To handle OpenTelemetry errors use otel.SetErrorHandler(https://pkg.go.dev/go.opentelemetry.io/otel#SetErrorHandler)
Example ¶
package main
import (
"context"
"fmt"
"log"
"go.opentelemetry.io/otel"
"google.golang.org/protobuf/proto"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
)
func main() {
config := beholder.DefaultConfig()
// Initialize beholder otel client which sets up OTel components
client, err := beholder.NewClient(config)
if err != nil {
log.Fatalf("Error creating Beholder client: %v", err)
}
// Handle OTel errors
otel.SetErrorHandler(otelErrPrinter)
// Set global client so it will be accessible from anywhere through beholder functions
beholder.SetClient(client)
// Define a custom protobuf payload to emit
payload := &pb.TestCustomMessage{
BoolVal: true,
IntVal: 42,
FloatVal: 3.14,
StringVal: "Hello, World!",
}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
log.Fatalf("Failed to marshal protobuf")
}
// Emit the custom message anywhere from application logic
fmt.Println("Emit custom messages")
for range 10 {
err := beholder.GetEmitter().Emit(context.Background(), payloadBytes,
"beholder_data_schema", "/custom-message/versions/1", // required
"beholder_domain", "ExampleDomain", // required
"beholder_entity", "ExampleEntity", // required
"beholder_data_type", "custom_message",
"foo", "bar",
)
if err != nil {
log.Printf("Error emitting message: %v", err)
}
}
}
var otelErrPrinter = otel.ErrorHandlerFunc(func(err error) {
log.Printf("otel error: %v", err)
})
Output: Emit custom messages
func NewNoopClient ¶
func NewNoopClient() *Client
Default client to fallback when is is not initialized properly
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)
func main() {
fmt.Println("Beholder is not initialized. Fall back to Noop OTel Client")
fmt.Println("Emitting custom message via noop otel client")
err := beholder.GetEmitter().Emit(context.Background(), []byte("test message"),
"beholder_data_schema", "/custom-message/versions/1", // required
"beholder_domain", "ExampleDomain", // required
"beholder_entity", "ExampleEntity", // required
)
if err != nil {
log.Printf("Error emitting message: %v", err)
}
}
Output: Beholder is not initialized. Fall back to Noop OTel Client Emitting custom message via noop otel client
func NewStdoutClient ¶
NewStdoutClient creates a new Client with exporters which send telemetry data to standard output Used for testing and debugging
func NewWriterClient ¶
NewWriterClient creates a new Client with otel exporters which send telemetry data to custom io.Writer
func (Client) ForName ¶ added in v0.4.0
ForName returns a new Client with the same configuration but with a different name. For global package-scoped telemetry, use the package name. For injected component-scoped telemetry, use a fully qualified name that uniquely identifies this instance.
func (Client) ForPackage ¶
Returns a new Client with the same configuration but with a different package name Deprecated: Use ForName
type Config ¶
type Config struct {
InsecureConnection bool
CACertFile string
OtelExporterGRPCEndpoint string
OtelExporterHTTPEndpoint string
// OTel Resource
ResourceAttributes []otelattr.KeyValue
// Message Emitter
EmitterExportTimeout time.Duration
// Batch processing is enabled by default
// Disable it only for testing
EmitterBatchProcessor bool
// OTel Trace
TraceSampleRatio float64
TraceBatchTimeout time.Duration
TraceSpanExporter sdktrace.SpanExporter // optional additional exporter
TraceRetryConfig *RetryConfig
// OTel Metric
MetricReaderInterval time.Duration
MetricRetryConfig *RetryConfig
MetricViews []sdkmetric.View
// OTel Log
LogExportTimeout time.Duration
// Batch processing is enabled by default
// Disable it only for testing
LogBatchProcessor bool
// Retry config for shared log exporter, used by Emitter and Logger
LogRetryConfig *RetryConfig
// Auth
AuthPublicKeyHex string
AuthHeaders map[string]string
}
Example ¶
package main
import (
"fmt"
"time"
otelattr "go.opentelemetry.io/otel/attribute"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)
const (
packageName = "beholder"
)
func main() {
config := beholder.Config{
InsecureConnection: true,
CACertFile: "",
OtelExporterGRPCEndpoint: "localhost:4317",
OtelExporterHTTPEndpoint: "localhost:4318",
// Resource
ResourceAttributes: []otelattr.KeyValue{
otelattr.String("package_name", packageName),
otelattr.String("sender", "beholderclient"),
},
// Message Emitter
EmitterExportTimeout: 1 * time.Second,
EmitterBatchProcessor: true,
// OTel message log exporter retry config
LogRetryConfig: nil,
// Trace
TraceSampleRatio: 1,
TraceBatchTimeout: 1 * time.Second,
// OTel trace exporter retry config
TraceRetryConfig: nil,
// Metric
MetricReaderInterval: 1 * time.Second,
// OTel metric exporter retry config
MetricRetryConfig: nil,
// Log
LogExportTimeout: 1 * time.Second,
LogBatchProcessor: true,
}
fmt.Printf("%+v\n", config)
config.LogRetryConfig = &beholder.RetryConfig{
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 1 * time.Minute, // Set to zero to disable retry
}
fmt.Printf("%+v\n", *config.LogRetryConfig)
}
Output: {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] LogExportTimeout:1s LogBatchProcessor:true LogRetryConfig:<nil> AuthPublicKeyHex: AuthHeaders:map[]} {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
func DefaultConfig ¶
func DefaultConfig() Config
func TestDefaultConfig ¶
func TestDefaultConfig() Config
func TestDefaultConfigHTTPClient ¶ added in v0.4.0
func TestDefaultConfigHTTPClient() Config
type Emitter ¶
type Emitter interface {
// Sends message with bytes and attributes to OTel Collector
Emit(ctx context.Context, body []byte, attrKVs ...any) error
}
func GetEmitter ¶
func GetEmitter() Emitter
type Message ¶
type Message struct {
Attrs Attributes
Body []byte
}
Example ¶
package main
import (
"fmt"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)
func main() {
// Create message with body and attributes
m1 := beholder.NewMessage([]byte{1}, beholder.Attributes{"key_string": "value"})
fmt.Println("#1", m1)
// Create attributes
additionalAttributes := beholder.Attributes{
"key_string": "new value",
"key_int32": int32(1),
}
// Add attributes to message
m1.AddAttributes(additionalAttributes)
fmt.Println("#2", m1)
// Create mmpty message struct
m2 := beholder.Message{}
fmt.Println("#3", m2)
// Add attributes to message
m2.AddAttributes(beholder.Attributes{"key_int": 1})
fmt.Println("#4", m2)
// Update attribute key_int
m2.AddAttributes(beholder.Attributes{"key_int": 2})
fmt.Println("#5", m2)
// Set message body
m2.Body = []byte("0123")
fmt.Println("#6", m2)
// Reset attributes
m2.Attrs = beholder.Attributes{}
fmt.Println("#7", m2)
// Reset body
m2.Body = nil
fmt.Println("#8", m2)
// Shalow copy of message
m3 := beholder.NewMessage(m1.Body, m1.Attrs)
fmt.Println("#9", m3)
m1.Body[0] = byte(2) // Wil mutate m3
fmt.Println("#10", m3)
// Deep copy
m4 := m1.Copy()
fmt.Println("#11", m4)
m1.Body[0] = byte(3) // Should not mutate m4
fmt.Println("#12", m4)
// Create message with mixed attributes: kv pairs and maps
m5 := beholder.NewMessage([]byte{1},
// Add attributes from the map
map[string]any{
"key1": "value1",
},
// Add attributes from KV pair
"key2", "value2",
// Add attributes from Attributes map
beholder.Attributes{"key3": "value3"},
// Add attributes from KV pair
"key4", "value4",
// Modify key1
"key1", "value5",
// Modify key2
map[string]any{
"key2": "value6",
},
)
fmt.Println("#13", m5)
// Create message with no attributes
m6 := beholder.NewMessage([]byte{1}, beholder.Attributes{})
// Add attributes using AddAttributes
m6.AddAttributes(
"key1", "value1",
"key2", "value2",
)
fmt.Println("#14", m6)
}
Output: #1 Message{Attrs: map[key_string:value], Body: [1]} #2 Message{Attrs: map[key_int32:1 key_string:new value], Body: [1]} #3 Message{Attrs: map[], Body: []} #4 Message{Attrs: map[key_int:1], Body: []} #5 Message{Attrs: map[key_int:2], Body: []} #6 Message{Attrs: map[key_int:2], Body: [48 49 50 51]} #7 Message{Attrs: map[], Body: [48 49 50 51]} #8 Message{Attrs: map[], Body: []} #9 Message{Attrs: map[key_int32:1 key_string:new value], Body: [1]} #10 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]} #11 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]} #12 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]} #13 Message{Attrs: map[key1:value5 key2:value6 key3:value3 key4:value4], Body: [1]} #14 Message{Attrs: map[key1:value1 key2:value2], Body: [1]}
func NewMessage ¶
func (*Message) AddAttributes ¶
func (*Message) OtelRecord ¶
type Metadata ¶
type Metadata struct {
// REQUIRED FIELDS
// Schema Registry URI to fetch schema
BeholderDomain string `validate:"required,domain_entity"`
BeholderEntity string `validate:"required,domain_entity"`
BeholderDataSchema string `validate:"required,uri"`
// OPTIONAL FIELDS
// The version of the CL node.
NodeVersion string
// mTLS public key for the node operator. This is used as an identity key but with the added benefit of being able to provide signatures.
NodeCsaKey string
// Signature from CSA private key.
NodeCsaSignature string
DonID string
// The RDD network name the CL node is operating with.
NetworkName []string
WorkflowID string
WorkflowName string
WorkflowOwnerAddress string
// Hash of the workflow spec.
WorkflowSpecID string
// The unique execution of a workflow.
WorkflowExecutionID string
// The address for the contract.
CapabilityContractAddress string
CapabilityID string
CapabilityVersion string
CapabilityName string
NetworkChainID string
}
Example ¶
package main
import (
"fmt"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)
func testMetadata() beholder.Metadata {
return beholder.Metadata{
NodeVersion: "v1.0.0",
NodeCsaKey: "test_key",
NodeCsaSignature: "test_signature",
DonID: "test_don_id",
NetworkName: []string{"test_network"},
WorkflowID: "test_workflow_id",
WorkflowName: "test_workflow_name",
WorkflowOwnerAddress: "test_owner_address",
WorkflowSpecID: "test_spec_id",
WorkflowExecutionID: "test_execution_id",
BeholderDomain: "TestDomain",
BeholderEntity: "TestEntity",
BeholderDataSchema: "/schemas/ids/test_schema",
CapabilityContractAddress: "test_contract_address",
CapabilityID: "test_capability_id",
CapabilityVersion: "test_capability_version",
CapabilityName: "test_capability_name",
NetworkChainID: "test_chain_id",
}
}
func main() {
m := testMetadata()
fmt.Printf("%#v\n", m)
fmt.Println(m.Attributes())
}
Output: beholder.Metadata{BeholderDomain:"TestDomain", BeholderEntity:"TestEntity", BeholderDataSchema:"/schemas/ids/test_schema", NodeVersion:"v1.0.0", NodeCsaKey:"test_key", NodeCsaSignature:"test_signature", DonID:"test_don_id", NetworkName:[]string{"test_network"}, WorkflowID:"test_workflow_id", WorkflowName:"test_workflow_name", WorkflowOwnerAddress:"test_owner_address", WorkflowSpecID:"test_spec_id", WorkflowExecutionID:"test_execution_id", CapabilityContractAddress:"test_contract_address", CapabilityID:"test_capability_id", CapabilityVersion:"test_capability_version", CapabilityName:"test_capability_name", NetworkChainID:"test_chain_id"} map[beholder_data_schema:/schemas/ids/test_schema beholder_domain:TestDomain beholder_entity:TestEntity capability_contract_address:test_contract_address capability_id:test_capability_id capability_name:test_capability_name capability_version:test_capability_version don_id:test_don_id network_chain_id:test_chain_id network_name:[test_network] node_csa_key:test_key node_csa_signature:test_signature node_version:v1.0.0 workflow_execution_id:test_execution_id workflow_id:test_workflow_id workflow_name:test_workflow_name workflow_owner_address:test_owner_address workflow_spec_id:test_spec_id]
func NewMetadata ¶
func NewMetadata(attrs Attributes) *Metadata
func (Metadata) Attributes ¶
func (m Metadata) Attributes() Attributes
func (*Metadata) FromAttributes ¶
func (m *Metadata) FromAttributes(attrs Attributes) *Metadata
Sets metadata fields from attributes
func (*Metadata) Validate ¶
Example ¶
package main
import (
"fmt"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)
func main() {
validate, err := beholder.NewMetadataValidator()
if err != nil {
fmt.Println(err)
}
metadata := beholder.Metadata{
BeholderDomain: "TestDomain",
BeholderEntity: "TestEntity",
}
if err := validate.Struct(metadata); err != nil {
fmt.Println(err)
}
metadata.BeholderDataSchema = "example.proto"
if err := validate.Struct(metadata); err != nil {
fmt.Println(err)
}
metadata.BeholderDataSchema = "/schemas/ids/test_schema"
if err := validate.Struct(metadata); err != nil {
fmt.Println(err)
} else {
fmt.Println("Metadata is valid")
}
}
Output: Key: 'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'required' tag Key: 'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'uri' tag Metadata is valid
type RetryConfig ¶ added in v0.4.0
type RetryConfig struct {
// InitialInterval the time to wait after the first failure before
// retrying.
InitialInterval time.Duration
// MaxInterval is the upper bound on backoff interval. Once this value is
// reached the delay between consecutive retries will always be
// `MaxInterval`.
MaxInterval time.Duration
// MaxElapsedTime is the maximum amount of time (including retries) spent
// trying to send a request/batch. Once this value is reached, the data
// is discarded.
// Set to zero to disable retry
MaxElapsedTime time.Duration
}
func (*RetryConfig) Copy ¶ added in v0.4.0
func (c *RetryConfig) Copy() *RetryConfig
func (*RetryConfig) Enabled ¶ added in v0.4.0
func (c *RetryConfig) Enabled() bool
Calculate if retry is enabled
func (*RetryConfig) GetInitialInterval ¶ added in v0.4.0
func (c *RetryConfig) GetInitialInterval() time.Duration
Implement getters for fields to avoid nil pointer dereference in case the config is not set
func (*RetryConfig) GetMaxElapsedTime ¶ added in v0.4.0
func (c *RetryConfig) GetMaxElapsedTime() time.Duration
func (*RetryConfig) GetMaxInterval ¶ added in v0.4.0
func (c *RetryConfig) GetMaxInterval() time.Duration