Documentation
Overview
Package kafka provides a Consumer and a Producer for basic Kafka operations.
It relies on https://github.com/confluentinc/confluent-kafka-go, which is a Go wrapper on top of https://github.com/edenhill/librdkafka. This provides a reliable implementation, supported both by the community and by Confluent, the creators of Kafka.
Index
- Constants
- Variables
- func GetPartition(key string, partitions []int32, algorithm PartitionerAlgorithm) int32
- type Config
- type ConfigOpt
- type ConfluentConsumer
- func (cc *ConfluentConsumer) AssignPartitionByID(id int32) error
- func (cc *ConfluentConsumer) AssignPartitionByKey(key string, algorithm PartitionerAlgorithm) error
- func (cc *ConfluentConsumer) Close() error
- func (cc *ConfluentConsumer) CommitMessage(msg *kafkalib.Message) error
- func (cc *ConfluentConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error)
- func (cc *ConfluentConsumer) GetMetadata(allTopics bool) (*kafkalib.Metadata, error)
- func (cc *ConfluentConsumer) GetPartitions() ([]int32, error)
- func (cc *ConfluentConsumer) Seek(offset int64) error
- func (cc *ConfluentConsumer) SeekToTime(t time.Time) error
- type ConfluentProducer
- type Consumer
- type ConsumerConfig
- type PartitionerAlgorithm
- type Producer
- type ProducerConfig
Examples

- Config (Auth)
- Consumer
- Producer
Constants
const (
	AuthTypePlain    = "plain"
	AuthTypeSCRAM256 = "scram-sha256"
	AuthTypeSCRAM512 = "scram-sha512"

	PartitionerRandom           = PartitionerAlgorithm("random")            // random distribution
	PartitionerConsistent       = PartitionerAlgorithm("consistent")        // CRC32 hash of key (empty and NULL keys are mapped to a single partition)
	PartitionerConsistentRandom = PartitionerAlgorithm("consistent_random") // CRC32 hash of key (empty and NULL keys are randomly partitioned)
	PartitionerMurMur2          = PartitionerAlgorithm("murmur2")           // Java Producer compatible Murmur2 hash of key (NULL keys are mapped to a single partition)
	PartitionerMurMur2Random    = PartitionerAlgorithm("murmur2_random")    // Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned; default partitioner in the Java Producer)
	PartitionerFNV1A            = PartitionerAlgorithm("fnv1a")             // FNV-1a hash of key (NULL keys are mapped to a single partition)
	PartitionerFNV1ARandom      = PartitionerAlgorithm("fnv1a_random")      // FNV-1a hash of key (NULL keys are randomly partitioned)
	PartitionerFilebeat         = PartitionerAlgorithm("filebeat")          // works around the partitioning behavior in the Filebeat code

	DefaultTimeout = time.Duration(30 * time.Second) // default timeout to be used if not set in the config
)

Supported auth types
const (
	// DefaultProducerDeliveryTimeoutMs configures `delivery.timeout.ms`. The timeout for the producer from sending a message until it is considered delivered.
	// This value should always be greater than DefaultProducerBufferMaxMs.
	// The default value in librdkafka is `300000`, but we reduced it to `5000`.
	DefaultProducerDeliveryTimeoutMs = 5000

	// DefaultProducerBufferMaxMs configures `queue.buffering.max.ms`. The maximum number of milliseconds the buffer will wait before sending it to Kafka.
	// This value should always be lower than DefaultProducerDeliveryTimeoutMs.
	// The default value in librdkafka is `5`.
	DefaultProducerBufferMaxMs = 5

	// DefaultProducerBufferMaxMessages configures `queue.buffering.max.messages`. The maximum number of messages in the buffer before sending to Kafka.
	// The default value in librdkafka is `100000`.
	DefaultProducerBufferMaxMessages = 100000
)
const DefaultLogLevel = logrus.ErrorLevel
DefaultLogLevel is the log level Kafka producers/consumers will use if none is set.
Variables
var ErrSeekTimedOut = errors.New("Kafka Seek timed out. Please try again.")
ErrSeekTimedOut is the error returned when a consumer timed out during Seek.
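Since a seek can time out transiently, callers may want to retry. A minimal sketch, assuming a consumer c and an offset are in scope; the retry count is illustrative:

for i := 0; i < 3; i++ {
	err := c.Seek(offset)
	if !errors.Is(err, ErrSeekTimedOut) {
		break // success, or a non-retryable error
	}
	// The seek timed out; try again, as the error message suggests.
}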
Functions
func GetPartition added in v0.40.1
func GetPartition(key string, partitions []int32, algorithm PartitionerAlgorithm) int32
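The function carries no doc comment, but from its signature it selects a partition for a key using the given algorithm. A minimal usage sketch; the key and partition list are illustrative:

partitions := []int32{0, 1, 2, 3}
// Map the key to a partition with the Java Producer compatible Murmur2 algorithm.
p := GetPartition("user-42", partitions, PartitionerMurMur2)
fmt.Printf("key maps to partition %d\n", p)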
Types
type Config
type Config struct {
	Brokers        []string       `json:"brokers"`
	Topic          string         `json:"topic"`
	Producer       ProducerConfig `json:"producer"`
	Consumer       ConsumerConfig `json:"consumer"`
	AuthType       string         `json:"auth" split_words:"true"`
	User           string         `json:"user"`
	Password       string         `json:"password"`
	CAPEMFile      string         `json:"ca_pem_file"`
	LogLevel       string         `json:"log_level" split_words:"true"`
	RequestTimeout time.Duration  `json:"request_timeout"`
}
Config holds all the configuration for this package.
Example (Auth)
_ = Config{
	// Append the following to your configuration (Consumer or Producer)
	AuthType:  AuthTypeSCRAM256,
	User:      "my-user",
	Password:  "my-secret-password",
	CAPEMFile: "/etc/certificate.pem",
}
type ConfigOpt
ConfigOpt configures Kafka consumers and producers.
func WithConsumerGroupID

WithConsumerGroupID sets the consumer group ID for a Consumer.
func WithLogger
func WithLogger(log logrus.FieldLogger) ConfigOpt
WithLogger adds a logger to a Kafka consumer or producer.
func WithPartitionerAlgorithm added in v0.40.1
func WithPartitionerAlgorithm(algorithm PartitionerAlgorithm) ConfigOpt
WithPartitionerAlgorithm sets the partitioner algorithm.
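Options are applied when constructing a producer or consumer. A minimal sketch combining the documented options; the broker address and topic are illustrative:

conf := Config{
	Brokers: []string{"localhost:9092"},
	Topic:   "example-topic",
}
log := logrus.New()
// Options are applied on top of the base config.
p, err := NewProducer(conf, WithLogger(log), WithPartitionerAlgorithm(PartitionerMurMur2))
if err != nil {
	log.Fatal(err)
}
defer p.Close()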
type ConfluentConsumer
type ConfluentConsumer struct {
// contains filtered or unexported fields
}
ConfluentConsumer implements Consumer interface.
func (*ConfluentConsumer) AssignPartitionByID added in v0.40.1
func (cc *ConfluentConsumer) AssignPartitionByID(id int32) error
AssignPartitionByID sets the partition to consume messages from by the passed partition ID.
func (*ConfluentConsumer) AssignPartitionByKey added in v0.40.1
func (cc *ConfluentConsumer) AssignPartitionByKey(key string, algorithm PartitionerAlgorithm) error
AssignPartitionByKey sets the partition to consume messages from by the passed key and algorithm. NOTE: we currently only support the murmur2 and fnv1a hashing algorithms in the consumer.
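For example, to read from the partition that a Murmur2-partitioning producer writes a given key to (the key is illustrative):

// cc is a *ConfluentConsumer; pin it to the partition computed for this key.
if err := cc.AssignPartitionByKey("user-42", PartitionerMurMur2); err != nil {
	log.WithError(err).Fatal("error assigning partition by key")
}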
func (*ConfluentConsumer) Close
func (cc *ConfluentConsumer) Close() error
Close closes the consumer.
func (*ConfluentConsumer) CommitMessage
func (cc *ConfluentConsumer) CommitMessage(msg *kafkalib.Message) error
CommitMessage commits the offset of a given message.
func (*ConfluentConsumer) FetchMessage

func (cc *ConfluentConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error)

FetchMessage fetches one message, if there is any available at the current offset.
func (*ConfluentConsumer) GetMetadata added in v0.40.1
func (cc *ConfluentConsumer) GetMetadata(allTopics bool) (*kafkalib.Metadata, error)
GetMetadata returns the Confluent consumer metadata.
func (*ConfluentConsumer) GetPartitions added in v0.40.1
func (cc *ConfluentConsumer) GetPartitions() ([]int32, error)
GetPartitions returns the partition IDs of the configured topic.
func (*ConfluentConsumer) Seek
func (cc *ConfluentConsumer) Seek(offset int64) error
Seek seeks the assigned topic partitions to the given offset.
func (*ConfluentConsumer) SeekToTime added in v0.40.1
func (cc *ConfluentConsumer) SeekToTime(t time.Time) error
SeekToTime seeks to the specified time.
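For example, to replay roughly the last hour of messages on the assigned partitions:

// Rewind the assigned partitions to one hour ago.
if err := cc.SeekToTime(time.Now().Add(-time.Hour)); err != nil {
	log.WithError(err).Fatal("error seeking to time")
}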
type ConfluentProducer
type ConfluentProducer struct {
// contains filtered or unexported fields
}
ConfluentProducer implements Producer interface.
func NewProducer
func NewProducer(conf Config, opts ...ConfigOpt) (w *ConfluentProducer, err error)
NewProducer creates a ConfluentProducer based on config.
func (ConfluentProducer) Close
func (w ConfluentProducer) Close() error
Close should be called when no more writes will be performed.
func (*ConfluentProducer) GetMetadata added in v0.40.1
func (w *ConfluentProducer) GetMetadata(allTopics bool) (*kafkalib.Metadata, error)
GetMetadata returns the Confluent producer's metadata.
func (*ConfluentProducer) GetPartions added in v0.40.1
func (w *ConfluentProducer) GetPartions() ([]int32, error)
GetPartions returns the partition IDs of a given topic.
type Consumer
type Consumer interface {
	// AssignPartitionByKey sets the current consumer to read from a partition by a hashed key.
	AssignPartitionByKey(key string, algorithm PartitionerAlgorithm) error

	// AssignPartitionByID sets the current consumer to read from the specified partition.
	AssignPartitionByID(id int32) error

	// FetchMessage fetches one message, if there is any available at the current offset.
	FetchMessage(ctx context.Context) (*kafkalib.Message, error)

	// Close closes the consumer.
	Close() error

	// CommitMessage commits the offset of a given message.
	CommitMessage(msg *kafkalib.Message) error

	// GetMetadata gets the metadata for a consumer.
	GetMetadata(allTopics bool) (*kafkalib.Metadata, error)

	// GetPartitions returns the partitions on the consumer.
	GetPartitions() ([]int32, error)

	// Seek seeks the assigned topic partitions to the given offset.
	Seek(offset int64) error

	// SeekToTime seeks to the specified time.
	SeekToTime(t time.Time) error
}
Consumer reads messages from Kafka.
Example
conf := Config{
	Topic:   "example-topic",
	Brokers: []string{"localhost:9092"},
	Consumer: ConsumerConfig{
		GroupID: "example-group",
	},
}

log := logrus.New()

c, err := NewConsumer(log, conf)
if err != nil {
	log.Fatal(err)
}
defer c.Close()

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

// Consider implementing a retry mechanism.
for {
	// 1. Fetch the message.
	msg, err := c.FetchMessage(ctx)
	if err != nil {
		log.WithError(err).Fatal("error fetching message")
	}
	log.WithField("msg", msg.String()).Debug("Msg got fetched")

	// 2. Do whatever you need to do with the msg.

	// 3. Then commit the message.
	if err := c.CommitMessage(msg); err != nil {
		log.WithError(err).Fatal("error committing message")
	}
}
func NewConsumer
NewConsumer creates a ConfluentConsumer based on config.
- NOTE if the partition is set and the partition key is not set in the config, we have no way of knowing where to assign the consumer in the case of a rebalance.
func NewDetachedConsumer added in v0.44.0
NewDetachedConsumer creates a Consumer detached from Consumer Groups for partition assignment and rebalance (see NOTE).
- NOTE Either a partition or a partition key is required to be set. A detached consumer works outside of consumer groups for partition assignment and rebalance; however, it needs permission on the group coordinator for managing commits, so it still needs a consumer group in the broker. To keep things simple, the default consumer group ID is copied from the configured topic name, so make sure you have a policy that grants permission to such a consumer group. A sketch follows below.
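A minimal sketch of a detached consumer pinned by partition key. Assumption: NewDetachedConsumer takes (log, conf, opts...) like the NewConsumer example above; the key and brokers are illustrative:

conf := Config{
	Topic:   "example-topic",
	Brokers: []string{"localhost:9092"},
	Consumer: ConsumerConfig{
		// Either Partition or PartitionKey must be set (see NOTE above).
		PartitionKey:         "user-42",
		PartitionerAlgorithm: PartitionerMurMur2,
	},
}
log := logrus.New()
c, err := NewDetachedConsumer(log, conf) // assumed call shape, mirroring NewConsumer
if err != nil {
	log.Fatal(err)
}
defer c.Close()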
type ConsumerConfig
type ConsumerConfig struct {
	GroupID              string               `json:"group_id" split_words:"true"`
	Partition            *int32               `json:"partition"`
	PartitionKey         string               `json:"partition_key"`
	PartitionerAlgorithm PartitionerAlgorithm `json:"partition_algorithm"`
	InitialOffset        *int64               `json:"initial_offset"`
}
ConsumerConfig holds the specific configuration for a consumer.
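Partition and InitialOffset are pointers, so an unset value can be distinguished from zero. A minimal sketch pinning a partition and an initial offset; the values are illustrative:

partition := int32(3)
offset := int64(1000)
_ = ConsumerConfig{
	GroupID:       "example-group",
	Partition:     &partition, // nil means no fixed partition
	InitialOffset: &offset,    // nil means the default initial offset
}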
func (ConsumerConfig) Apply
func (c ConsumerConfig) Apply(kafkaConf *kafkalib.ConfigMap)
Apply applies the specific configuration for a consumer.
type PartitionerAlgorithm added in v0.40.1
type PartitionerAlgorithm string
type Producer
Producer produces messages into Kafka.
Example
conf := Config{
	Brokers: []string{"localhost:9092"},
}

log := logrus.New()

p, err := NewProducer(conf)
if err != nil {
	log.Fatal(err)
}

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

topic := "example-topic"
msg := &kafkalib.Message{
	TopicPartition: kafkalib.TopicPartition{Topic: &topic},
	Key:            []byte("example"),
	Value:          []byte("Hello World!"),
	Timestamp:      time.Now(),
}

if err := p.Produce(ctx, msg); err != nil {
	log.WithError(err).Fatal("error producing message")
}
type ProducerConfig
type ProducerConfig struct {
	FlushPeriod     time.Duration `json:"flush_period" split_words:"true"`
	BatchSize       int           `json:"batch_size" split_words:"true"`
	DeliveryTimeout time.Duration `json:"delivery_timeout" split_words:"true"`
}
ProducerConfig holds the specific configuration for a producer.
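A minimal sketch of a producer configuration; the values are illustrative, and DeliveryTimeout should stay above the buffering delay for the reasons noted in the constants above:

_ = Config{
	Brokers: []string{"localhost:9092"},
	Topic:   "example-topic",
	Producer: ProducerConfig{
		FlushPeriod:     500 * time.Millisecond,
		BatchSize:       1000,
		DeliveryTimeout: 5 * time.Second, // keep greater than the buffering delay
	},
}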
func (ProducerConfig) Apply
func (c ProducerConfig) Apply(kafkaConf *kafkalib.ConfigMap)
Apply applies the specific configuration for a producer.