Documentation
¶
Index ¶
- Constants
- type Consumer
- type MessageCarrier
- type Option
- func WithConsumerSpanTimeout(timeout time.Duration) Option
- func WithCustomAttributeInjector(fn func(msg *kafka.Message) []attribute.KeyValue) Option
- func WithMeterProvider(meterProvider metric.MeterProvider) Option
- func WithPropagator(propagator propagation.TextMapPropagator) Option
- func WithTracerProvider(tracerProvider trace.TracerProvider) Option
- type Producer
Constants ¶
const ( MinPort = 0 MaxPort = 65535 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
Consumer is a wrapper around kafka.Consumer that adds OpenTelemetry tracing and metrics.
func NewConsumer ¶
NewConsumer returns a new kafka.Consumer instance with OpenTelemetry features, configured with the provided kafka.ConfigMap and otelkafka.Option(s). It returns an error if the Kafka consumer cannot be created or if metrics initialization fails.
func WrapOTELConsumer ¶
WrapOTELConsumer decorates an existing kafka.Consumer instance with OpenTelemetry tracing and metrics capabilities. It returns the wrapped Consumer or an error if metrics initialization fails.
func (*Consumer) Close ¶
Close closes the underlying confluent-kafka-go Consumer and waits for any active spans to be ended.
type MessageCarrier ¶
type MessageCarrier struct {
// contains filtered or unexported fields
}
MessageCarrier implements the opentelemetry-go propagation.TextMapCarrier interface for confluent-kafka-go messages.
func NewMessageCarrier ¶
func NewMessageCarrier(msg *kafka.Message) *MessageCarrier
NewMessageCarrier creates a new MessageCarrier for the given kafka.Message.
func (*MessageCarrier) Get ¶
func (c *MessageCarrier) Get(key string) string
Get returns the value associated with the passed key.
func (*MessageCarrier) Keys ¶
func (c *MessageCarrier) Keys() []string
Keys returns a slice of all keys in the carrier.
func (*MessageCarrier) Set ¶
func (c *MessageCarrier) Set(key string, value string)
Set sets the key-value pair in the carrier. If a key already exists, its value is updated.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option is an interface for setting optional otelConfig properties.
func WithConsumerSpanTimeout ¶
WithConsumerSpanTimeout specifies the time.Duration after which a consumer span should be closed if no new message is polled. A value of 0 or less means no timeout will be applied.
func WithCustomAttributeInjector ¶
WithCustomAttributeInjector provides a custom function to inject additional OpenTelemetry attributes into the consumer span based on the kafka.Message.
func WithMeterProvider ¶
func WithMeterProvider(meterProvider metric.MeterProvider) Option
WithMeterProvider specifies a MeterProvider to use for creating a Meter. If not specified, the global MeterProvider is used.
func WithPropagator ¶
func WithPropagator(propagator propagation.TextMapPropagator) Option
WithPropagator specifies the TextMapPropagator to use for extracting and injecting OpenTelemetry context from/into Kafka message headers. If no propagator is specified, the global TextMapPropagator is used.
func WithTracerProvider ¶
func WithTracerProvider(tracerProvider trace.TracerProvider) Option
WithTracerProvider specifies a TracerProvider to use for creating a Tracer. If not specified, the global TracerProvider is used.
type Producer ¶
Producer is a wrapper around kafka.Producer that adds OpenTelemetry tracing and metrics.
func NewProducer ¶
NewProducer returns a new kafka.Producer instance with OpenTelemetry features, configured with the provided kafka.ConfigMap and otelkafka.Option(s). It returns an error if the Kafka producer cannot be created or if metrics initialization fails.
func WrapOTELProducer ¶
WrapOTELProducer decorates an existing kafka.Producer instance with OpenTelemetry tracing and metrics capabilities. It returns the wrapped Producer or an error if metrics initialization fails.
func (*Producer) Produce ¶
Produce calls the underlying kafka.Producer to send a message to Kafka, while also tracing the production operation and recording relevant metrics. The OpenTelemetry context is propagated through Kafka message headers. The `deliveryChan` is used to deliver the production result asynchronously.