otelkafka

Version: v0.1.2
Published: Jul 18, 2025 | License: Apache-2.0 | Imports: 17 | Imported by: 0

README

otel-kafka

OpenTelemetry instrumentation library for confluent-kafka-go.

Installation

go get -u github.com/ekazakas/otel-kafka

Note: otel-kafka uses Go Modules to manage dependencies.

What is otel-kafka?

otel-kafka is an OpenTelemetry instrumentation library for applications built on confluent-kafka-go. It wraps Kafka producers and consumers so that produce and consume operations emit OpenTelemetry traces and metrics, which supports monitoring, troubleshooting, and performance analysis of Kafka-based systems.

It runs inside the instrumented application and provides:

  • Instrumenting Kafka producers and consumers
  • Propagating OpenTelemetry context (trace and span IDs) within Kafka message headers
  • Collecting traces related to Kafka operations
  • Collecting metrics related to Kafka operations
  • Providing configuration options for instrumentation behavior
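
To make the context-propagation item concrete: otel-kafka uses whichever TextMapPropagator is configured, so the exact headers depend on your setup. Assuming the W3C Trace Context propagator is in use (an assumption, not something this README states), the trace and span IDs travel in a traceparent header laid out as version-traceid-spanid-flags. The sketch below parses such a value with the standard library only; parseTraceparent is a hypothetical helper and not part of otel-kafka.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// parseTraceparent is a hypothetical helper (not part of otel-kafka) that
// splits a version-00 W3C Trace Context "traceparent" value into its fields.
func parseTraceparent(v string) (traceID, spanID string, sampled bool, err error) {
	parts := strings.Split(v, "-")
	if len(parts) != 4 {
		return "", "", false, errors.New("traceparent: expected 4 dash-separated fields")
	}
	// Expected hex lengths: version, trace ID, span ID, flags.
	for i, n := range []int{2, 32, 16, 2} {
		if len(parts[i]) != n {
			return "", "", false, fmt.Errorf("traceparent: field %d has length %d, want %d", i, len(parts[i]), n)
		}
		if _, decErr := hex.DecodeString(parts[i]); decErr != nil {
			return "", "", false, fmt.Errorf("traceparent: field %d is not hex: %w", i, decErr)
		}
	}
	flags, _ := hex.DecodeString(parts[3]) // already validated above
	// The lowest flag bit marks the trace as sampled.
	return parts[1], parts[2], flags[0]&0x01 == 1, nil
}

func main() {
	// Example value taken from the W3C Trace Context specification.
	traceID, spanID, sampled, err := parseTraceparent(
		"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	fmt.Println(traceID, spanID, sampled, err)
}
```

A consumer that finds this header can continue the producer's trace, which is what links both sides of a Kafka hop into one distributed trace.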

Why otel-kafka?

Applications that rely on Kafka need observability to understand system behavior, identify bottlenecks, and troubleshoot issues.

otel-kafka provides the following benefits:

  • Seamless OpenTelemetry Integration: Easily instruments your confluent-kafka-go applications to work with the OpenTelemetry ecosystem.
  • Comprehensive Observability: Enables the collection of vital traces and metrics from your Kafka operations, offering deep insights into message flow and performance.
  • Simplified Troubleshooting: By propagating OpenTelemetry context (like trace and span IDs) through Kafka message headers, it makes distributed tracing across your Kafka consumers and producers straightforward.
  • Performance Analysis: The collected telemetry data helps in monitoring the health and performance of your Kafka interactions, allowing for proactive optimization.
  • Configurable Instrumentation: Offers options to customize how Kafka operations are instrumented, giving you control over the data collected.

In short, otel-kafka simplifies gaining visibility into Kafka-driven microservices.

Documentation

Constants

const (
	MinPort = 0
	MaxPort = 65535
)
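
The documentation does not say how these constants are used. One plausible use, shown here purely as an assumption, is range-checking the port of a broker address. The sketch redeclares the constants locally so that it compiles on its own; parsePort is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// Local copies of the documented constants, so the sketch is self-contained.
const (
	MinPort = 0
	MaxPort = 65535
)

// parsePort is a hypothetical helper: it extracts the port from a
// "host:port" address and checks it against [MinPort, MaxPort].
func parsePort(addr string) (int, error) {
	_, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return 0, err
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return 0, fmt.Errorf("port %q is not a number: %w", portStr, err)
	}
	if port < MinPort || port > MaxPort {
		return 0, fmt.Errorf("port %d is outside [%d, %d]", port, MinPort, MaxPort)
	}
	return port, nil
}

func main() {
	port, err := parsePort("localhost:9092")
	fmt.Println(port, err)

	_, err = parsePort("localhost:70000")
	fmt.Println(err)
}
```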

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	*kafka.Consumer
	// contains filtered or unexported fields
}

Consumer is a wrapper around kafka.Consumer that adds OpenTelemetry tracing and metrics.

func NewConsumer

func NewConsumer(config kafka.ConfigMap, opts ...Option) (*Consumer, error)

NewConsumer creates a kafka.Consumer from the provided kafka.ConfigMap and returns it wrapped in a Consumer with OpenTelemetry tracing and metrics, configured by the given otelkafka.Option(s). It returns an error if the Kafka consumer cannot be created or if metrics initialization fails.

func WrapOTELConsumer

func WrapOTELConsumer(consumer *kafka.Consumer, opts ...Option) (*Consumer, error)

WrapOTELConsumer decorates an existing kafka.Consumer instance with OpenTelemetry tracing and metrics capabilities. It returns the wrapped Consumer or an error if metrics initialization fails.

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the underlying confluent-kafka-go Consumer and waits for any active spans to be ended.

func (*Consumer) Poll

func (c *Consumer) Poll(timeoutMs int) (event kafka.Event)

Poll polls the consumer for messages or events using the underlying kafka.Consumer. Messages received will be automatically traced and associated metrics will be recorded.

func (*Consumer) ReadMessage

func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error)

ReadMessage polls the consumer for a single message using the underlying kafka.Consumer. Messages received will be automatically traced and associated metrics will be recorded.

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields
}

MessageCarrier implements the opentelemetry-go propagation.TextMapCarrier interface for confluent-kafka-go messages.

func NewMessageCarrier

func NewMessageCarrier(msg *kafka.Message) *MessageCarrier

NewMessageCarrier creates a new MessageCarrier for the given kafka.Message.

func (*MessageCarrier) Get

func (c *MessageCarrier) Get(key string) string

Get returns the value associated with the passed key.

func (*MessageCarrier) Keys

func (c *MessageCarrier) Keys() []string

Keys returns a slice of all keys in the carrier.

func (*MessageCarrier) Set

func (c *MessageCarrier) Set(key string, value string)

Set sets the key-value pair in the carrier. If a key already exists, its value is updated.
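
The Get, Keys, and Set semantics described above can be sketched over a plain header slice. This is a standard-library illustration of the carrier contract, not the library's implementation: header and message are stand-ins that mirror the Key/Value header shape of a Kafka message, and headerCarrier is a hypothetical type.

```go
package main

import "fmt"

// header and message are stand-ins for the Kafka message types, reduced to
// the fields a carrier needs. They are assumptions made for illustration.
type header struct {
	Key   string
	Value []byte
}

type message struct {
	Headers []header
}

// headerCarrier is a hypothetical carrier backed by a message's headers.
type headerCarrier struct {
	msg *message
}

// Get returns the value of the first header with the given key, or "".
func (c headerCarrier) Get(key string) string {
	for _, h := range c.msg.Headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

// Keys lists the header keys in order of appearance.
func (c headerCarrier) Keys() []string {
	keys := make([]string, 0, len(c.msg.Headers))
	for _, h := range c.msg.Headers {
		keys = append(keys, h.Key)
	}
	return keys
}

// Set updates an existing header in place, or appends a new one.
func (c headerCarrier) Set(key, value string) {
	for i := range c.msg.Headers {
		if c.msg.Headers[i].Key == key {
			c.msg.Headers[i].Value = []byte(value)
			return
		}
	}
	c.msg.Headers = append(c.msg.Headers, header{Key: key, Value: []byte(value)})
}

func main() {
	msg := &message{}
	c := headerCarrier{msg: msg}
	c.Set("traceparent", "first")
	c.Set("traceparent", "second") // overwrites instead of adding a duplicate
	fmt.Println(c.Get("traceparent"), c.Keys(), len(msg.Headers))
}
```

Updating in place keeps a single header per key, so a propagator that injects the same key twice does not leave stale values behind for the consumer to pick up.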

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is an interface for setting optional otelConfig properties.

func WithConsumerSpanTimeout

func WithConsumerSpanTimeout(timeout time.Duration) Option

WithConsumerSpanTimeout specifies the time.Duration after which a consumer span should be closed if no new message is polled. A value of 0 or less means no timeout will be applied.
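
The timeout rule can be pictured with a timer that ends the span unless it is cancelled first. This is a minimal sketch of the documented behavior using only the standard library, not the library's code; armSpanTimeout and the endSpan callback are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// armSpanTimeout is a hypothetical helper: it schedules endSpan to run after
// timeout and returns the timer so the caller can Stop it when a new message
// arrives. A timeout of 0 or less disables the mechanism and returns nil.
func armSpanTimeout(timeout time.Duration, endSpan func()) *time.Timer {
	if timeout <= 0 {
		return nil // no timeout: the span stays open until ended explicitly
	}
	return time.AfterFunc(timeout, endSpan)
}

func main() {
	ended := make(chan struct{})
	timer := armSpanTimeout(20*time.Millisecond, func() { close(ended) })
	<-ended // no new message was polled, so the timer fired
	fmt.Println("span ended by timeout; timer armed:", timer != nil)

	fmt.Println("zero timeout arms a timer:", armSpanTimeout(0, func() {}) != nil)
}
```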

func WithCustomAttributeInjector

func WithCustomAttributeInjector(fn func(msg *kafka.Message) []attribute.KeyValue) Option

WithCustomAttributeInjector provides a custom function to inject additional OpenTelemetry attributes into the consumer span based on the kafka.Message.

func WithMeterProvider

func WithMeterProvider(meterProvider metric.MeterProvider) Option

WithMeterProvider specifies a MeterProvider to use for creating a Meter. If not specified, the global MeterProvider is used.

func WithPropagator

func WithPropagator(propagator propagation.TextMapPropagator) Option

WithPropagator specifies the TextMapPropagator to use for extracting and injecting OpenTelemetry context from/into Kafka message headers. If no propagator is specified, the global TextMapPropagator is used.

func WithTracerProvider

func WithTracerProvider(tracerProvider trace.TracerProvider) Option

WithTracerProvider specifies a TracerProvider to use for creating a Tracer. If not specified, the global TracerProvider is used.

type Producer

type Producer struct {
	*kafka.Producer
	// contains filtered or unexported fields
}

Producer is a wrapper around kafka.Producer that adds OpenTelemetry tracing and metrics.

func NewProducer

func NewProducer(config kafka.ConfigMap, opts ...Option) (*Producer, error)

NewProducer creates a kafka.Producer from the provided kafka.ConfigMap and returns it wrapped in a Producer with OpenTelemetry tracing and metrics, configured by the given otelkafka.Option(s). It returns an error if the Kafka producer cannot be created or if metrics initialization fails.

func WrapOTELProducer

func WrapOTELProducer(producer *kafka.Producer, opts ...Option) (*Producer, error)

WrapOTELProducer decorates an existing kafka.Producer instance with OpenTelemetry tracing and metrics capabilities. It returns the wrapped Producer or an error if metrics initialization fails.

func (*Producer) Produce

func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error

Produce calls the underlying kafka.Producer to send a message to Kafka, tracing the produce operation and recording the relevant metrics. The OpenTelemetry context is propagated through the Kafka message headers. The delivery result is reported asynchronously on deliveryChan.
