kafka

package v0.49.0-ntoml-functions.0

Published: Feb 9, 2021 · License: MIT

README

github.com/netlify/netlify-commons/kafka

Package kafka provides a Consumer and a Producer for basic Kafka operations.

It relies on https://github.com/confluentinc/confluent-kafka-go, which is a Go wrapper on top of https://github.com/edenhill/librdkafka. This provides a reliable implementation, fully supported both by the community and by Confluent, the creators of Kafka.

Docs

Please find the generated godoc documentation, including some examples, on pkg.go.dev.

TODO

  • Support standalone consumers, not only consumers that are members of a consumer group.
  • Support seeking by timestamp (only seeking by offset is supported).
  • Integration tests

Documentation

Overview

Package kafka provides a Consumer and a Producer for basic Kafka operations.

It relies on https://github.com/confluentinc/confluent-kafka-go, which is a Go wrapper on top of https://github.com/edenhill/librdkafka. This provides a reliable implementation, fully supported both by the community and by Confluent, the creators of Kafka.


Constants

const (
	AuthTypePlain    = "plain"
	AuthTypeSCRAM256 = "scram-sha256"
	AuthTypeSCRAM512 = "scram-sha512"

	PartitionerRandom           = PartitionerAlgorithm("random")            // random distribution
	PartitionerConsistent       = PartitionerAlgorithm("consistent")        // CRC32 hash of key (Empty and NULL keys are mapped to single partition)
	PartitionerConsistentRandom = PartitionerAlgorithm("consistent_random") // CRC32 hash of key (Empty and NULL keys are randomly partitioned)
	PartitionerMurMur2          = PartitionerAlgorithm("murmur2")           // Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition)
	PartitionerMurMur2Random    = PartitionerAlgorithm("murmur2_random")    // Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. Default partitioner in the Java Producer.)
	PartitionerFNV1A            = PartitionerAlgorithm("fnv1a")             // FNV-1a hash of key (NULL keys are mapped to single partition)
	PartitionerFNV1ARandom      = PartitionerAlgorithm("fnv1a_random")      // FNV-1a hash of key (NULL keys are randomly partitioned).
	PartitionerFilebeat         = PartitionerAlgorithm("filebeat")          // This is to fix the stupidity that is in the filebeat code.

	DefaultTimeout = time.Duration(30 * time.Second) // Default timeout to be used if not set in the config
)

Supported auth types and partitioner algorithms, plus the default timeout.

const (
	// DefaultProducerDeliveryTimeoutMs configures `delivery.timeout.ms`, the timeout for the producer from sending a message until it is considered delivered.
	// This value should always be greater than DefaultProducerBufferMaxMs.
	// The default value in librdkafka is `300000`, but we reduced it to `5000`.
	DefaultProducerDeliveryTimeoutMs = 5000

	// DefaultProducerBufferMaxMs configures `queue.buffering.max.ms`, the maximum time in milliseconds the buffer waits before being sent to Kafka.
	// This value should always be lower than DefaultProducerDeliveryTimeoutMs.
	// The default value in librdkafka is `5`.
	DefaultProducerBufferMaxMs = 5

	// DefaultProducerBufferMaxMessages configures `queue.buffering.max.messages`, the maximum number of messages held in the buffer before sending to Kafka.
	// The default value in librdkafka is `100000`.
	DefaultProducerBufferMaxMessages = 100000
)
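
For reference, a minimal sketch of setting these three keys on a kafkalib.ConfigMap by hand. The package's Apply methods normally do this for you; the direct SetKey calls below are only illustrative:

cm := &kafkalib.ConfigMap{}
_ = cm.SetKey("delivery.timeout.ms", DefaultProducerDeliveryTimeoutMs)
_ = cm.SetKey("queue.buffering.max.ms", DefaultProducerBufferMaxMs)
_ = cm.SetKey("queue.buffering.max.messages", DefaultProducerBufferMaxMessages)
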
const DefaultLogLevel = logrus.ErrorLevel

DefaultLogLevel is the log level Kafka producers/consumers will use if none is set.

Variables

var ErrSeekTimedOut = errors.New("Kafka Seek timed out. Please try again.")

ErrSeekTimedOut is the error returned when a consumer timed out during Seek.
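
Since ErrSeekTimedOut signals a transient condition, callers can simply retry the seek. A minimal sketch, assuming c is a Consumer and log is a logrus logger as in the Consumer example below:

err := c.Seek(42)
if errors.Is(err, ErrSeekTimedOut) {
	// The seek timed out; the error message itself suggests trying again.
	err = c.Seek(42)
}
if err != nil {
	log.WithError(err).Fatal("error seeking to offset")
}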

Functions

func GetPartition added in v0.40.1

func GetPartition(key string, partitions []int32, algorithm PartitionerAlgorithm) int32
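
GetPartition carries no doc comment here; from its signature it appears to hash the key with the given algorithm and return one of the supplied partition IDs. A usage sketch with placeholder values:

partitions := []int32{0, 1, 2, 3}
partition := GetPartition("user-1234", partitions, PartitionerMurMur2)
fmt.Printf("key maps to partition %d\n", partition)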

Types

type Config

type Config struct {
	Brokers        []string       `json:"brokers"`
	Topic          string         `json:"topic"`
	Producer       ProducerConfig `json:"producer"`
	Consumer       ConsumerConfig `json:"consumer"`
	AuthType       string         `json:"auth" split_words:"true"`
	User           string         `json:"user"`
	Password       string         `json:"password"`
	CAPEMFile      string         `json:"ca_pem_file"`
	LogLevel       string         `json:"log_level" split_words:"true"`
	RequestTimeout time.Duration  `json:"request_timeout"`
}

Config holds all the configuration for this package.

Example (Auth)
_ = Config{
	// Append the following to your configuration (Consumer or Producer)
	AuthType:  AuthTypeSCRAM256,
	User:      "my-user",
	Password:  "my-secret-password",
	CAPEMFile: "/etc/certificate.pem",
}

type ConfigOpt

type ConfigOpt func(c *kafkalib.ConfigMap)

ConfigOpt configures Kafka consumers and producers.

func WithConsumerGroupID

func WithConsumerGroupID(groupID string) ConfigOpt

WithConsumerGroupID sets the consumer group ID for a Consumer.

func WithLogger

func WithLogger(log logrus.FieldLogger) ConfigOpt

WithLogger adds a logger to a Kafka consumer or producer.

func WithPartitionerAlgorithm added in v0.40.1

func WithPartitionerAlgorithm(algorithm PartitionerAlgorithm) ConfigOpt

WithPartitionerAlgorithm sets the partitioner algorithm.
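
ConfigOpt values are passed variadically to NewProducer and NewConsumer. A sketch combining the options above; the broker address and topic are placeholders:

log := logrus.New()
conf := Config{
	Brokers: []string{"localhost:9092"},
	Topic:   "example-topic",
}

p, err := NewProducer(conf,
	WithLogger(log),
	WithPartitionerAlgorithm(PartitionerMurMur2Random),
)
if err != nil {
	log.Fatal(err)
}
defer p.Close()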

type ConfluentConsumer

type ConfluentConsumer struct {
	// contains filtered or unexported fields
}

ConfluentConsumer implements Consumer interface.

func (*ConfluentConsumer) AssignPartitionByID added in v0.40.1

func (cc *ConfluentConsumer) AssignPartitionByID(id int32) error

AssignPartitionByID assigns the consumer to the partition with the given ID.

func (*ConfluentConsumer) AssignPartitionByKey added in v0.40.1

func (cc *ConfluentConsumer) AssignPartitionByKey(key string, algorithm PartitionerAlgorithm) error

AssignPartitionByKey assigns the consumer to the partition selected by the given key and algorithm. NOTE: only the murmur2 and fnv1a hashing algorithms are currently supported in the consumer.
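
A short sketch, assuming cc was obtained from NewDetachedConsumer and log is a logrus logger; the key is a placeholder:

// Pin the consumer to the partition the key hashes to.
if err := cc.AssignPartitionByKey("user-1234", PartitionerMurMur2); err != nil {
	log.WithError(err).Fatal("error assigning partition by key")
}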

func (*ConfluentConsumer) Close

func (cc *ConfluentConsumer) Close() error

Close closes the consumer.

func (*ConfluentConsumer) CommitMessage

func (cc *ConfluentConsumer) CommitMessage(msg *kafkalib.Message) error

CommitMessage commits the offset of a given message.

func (*ConfluentConsumer) FetchMessage

func (cc *ConfluentConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error)

FetchMessage fetches one message, if there is any available at the current offset.

func (*ConfluentConsumer) GetMetadata added in v0.40.1

func (cc *ConfluentConsumer) GetMetadata(allTopics bool) (*kafkalib.Metadata, error)

GetMetadata returns the Confluent consumer's metadata.

func (*ConfluentConsumer) GetPartitions added in v0.40.1

func (cc *ConfluentConsumer) GetPartitions() ([]int32, error)

GetPartitions returns the partition IDs of the configured topic.
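
For example, to inspect the partitions before picking one to assign (cc and log as above):

partitions, err := cc.GetPartitions()
if err != nil {
	log.WithError(err).Fatal("error getting partitions")
}
log.WithField("partitions", partitions).Info("partitions of the configured topic")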

func (*ConfluentConsumer) Seek

func (cc *ConfluentConsumer) Seek(offset int64) error

Seek seeks the assigned topic partitions to the given offset.

func (*ConfluentConsumer) SeekToTime added in v0.40.1

func (cc *ConfluentConsumer) SeekToTime(t time.Time) error

SeekToTime seeks to the specified time.
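
For instance, to replay the last hour of messages on the assigned partitions:

if err := cc.SeekToTime(time.Now().Add(-1 * time.Hour)); err != nil {
	log.WithError(err).Fatal("error seeking by time")
}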

type ConfluentProducer

type ConfluentProducer struct {
	// contains filtered or unexported fields
}

ConfluentProducer implements Producer interface.

func NewProducer

func NewProducer(conf Config, opts ...ConfigOpt) (w *ConfluentProducer, err error)

NewProducer creates a ConfluentProducer based on config.

func (ConfluentProducer) Close

func (w ConfluentProducer) Close() error

Close should be called when no more writes will be performed.

func (*ConfluentProducer) GetMetadata added in v0.40.1

func (w *ConfluentProducer) GetMetadata(allTopics bool) (*kafkalib.Metadata, error)

GetMetadata returns the Confluent producer's metadata.

func (*ConfluentProducer) GetPartions added in v0.40.1

func (w *ConfluentProducer) GetPartions() ([]int32, error)

GetPartions returns the partition IDs of the configured topic. (Note the spelling: the method name is GetPartions in this package.)

func (ConfluentProducer) Produce

func (w ConfluentProducer) Produce(ctx context.Context, msgs ...*kafkalib.Message) error

Produce produces messages into Kafka.

type Consumer

type Consumer interface {
	// AssignPartitionByKey sets the current consumer to read from a partition selected by hashing the key.
	AssignPartitionByKey(key string, algorithm PartitionerAlgorithm) error

	// AssignPartitionByID sets the current consumer to read from the specified partition.
	AssignPartitionByID(id int32) error

	// FetchMessage fetches one message, if there is any available at the current offset.
	FetchMessage(ctx context.Context) (*kafkalib.Message, error)

	// Close closes the consumer.
	Close() error

	// CommitMessage commits the offset of a given message.
	CommitMessage(msg *kafkalib.Message) error

	// GetMetadata gets the metadata for a consumer.
	GetMetadata(allTopics bool) (*kafkalib.Metadata, error)

	// GetPartitions returns the partitions on the consumer.
	GetPartitions() ([]int32, error)

	// Seek seeks the assigned topic partitions to the given offset.
	Seek(offset int64) error

	// SeekToTime seeks to the specified time.
	SeekToTime(t time.Time) error
}

Consumer reads messages from Kafka.

Example
conf := Config{
	Topic:   "example-topic",
	Brokers: []string{"localhost:9092"},
	Consumer: ConsumerConfig{
		GroupID: "example-group",
	},
}

log := logrus.New()
c, err := NewConsumer(log, conf)
if err != nil {
	log.Fatal(err)
}

defer c.Close()

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

// Consider implementing a retry mechanism.
for {
	// 1. Fetch the message.
	msg, err := c.FetchMessage(ctx)
	if err != nil {
		log.WithError(err).Fatal("error fetching message")
	}

	log.WithField("msg", msg.String()).Debug("message fetched")

	// 2. Do whatever you need to do with the msg.

	// 3. Then commit the message.
	if err := c.CommitMessage(msg); err != nil {
		log.WithError(err).Fatal("error commiting message")
	}
}

func NewConsumer

func NewConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consumer, error)

NewConsumer creates a ConfluentConsumer based on config.

  • NOTE: if the partition is set but the partition key is not set in the config, there is no way of knowing where to reassign the consumer in the case of a rebalance.

func NewDetachedConsumer added in v0.44.0

func NewDetachedConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consumer, error)

NewDetachedConsumer creates a Consumer detached from Consumer Groups for partition assignment and rebalance (see NOTE).

  • NOTE: either a partition or a partition key is required to be set (see the sketch below). A detached consumer works outside of consumer groups for partition assignment and rebalancing; however, it needs permission on the group coordinator for managing commits, so it still needs a consumer group in the broker. To simplify this, the default consumer group ID is copied from the configured topic name, so make sure you have a policy that grants permission to such a consumer group.
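
A minimal sketch of a detached consumer pinned by partition key, following the NOTE above; the broker address, topic, and key are placeholders:

conf := Config{
	Brokers: []string{"localhost:9092"},
	// The default consumer group ID is copied from the topic name.
	Topic: "example-topic",
	Consumer: ConsumerConfig{
		PartitionKey:         "user-1234",
		PartitionerAlgorithm: PartitionerMurMur2,
	},
}

log := logrus.New()
c, err := NewDetachedConsumer(log, conf)
if err != nil {
	log.Fatal(err)
}
defer c.Close()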

type ConsumerConfig

type ConsumerConfig struct {
	GroupID              string               `json:"group_id" split_words:"true"`
	Partition            *int32               `json:"partition"`
	PartitionKey         string               `json:"partition_key"`
	PartitionerAlgorithm PartitionerAlgorithm `json:"partition_algorithm"`
	InitialOffset        *int64               `json:"initial_offset"`
}

ConsumerConfig holds the specific configuration for a consumer.
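
Partition and InitialOffset are pointers so that an explicit zero can be distinguished from unset. A sketch of populating them; the values are placeholders:

partition := int32(3)
offset := int64(1000)
conf := ConsumerConfig{
	GroupID:       "example-group",
	Partition:     &partition,
	InitialOffset: &offset,
}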

func (ConsumerConfig) Apply

func (c ConsumerConfig) Apply(kafkaConf *kafkalib.ConfigMap)

Apply applies the specific configuration for a consumer.

type PartitionerAlgorithm added in v0.40.1

type PartitionerAlgorithm string

PartitionerAlgorithm names the hashing algorithm used to map message keys to partitions; see the Partitioner* constants for the supported values.

type Producer

type Producer interface {
	io.Closer
	Produce(ctx context.Context, msgs ...*kafkalib.Message) error
}

Producer produces messages into Kafka.

Example
conf := Config{
	Brokers: []string{"localhost:9092"},
}

log := logrus.New()
p, err := NewProducer(conf)
if err != nil {
	log.Fatal(err)
}
defer p.Close()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

topic := "example-topic"
msg := &kafkalib.Message{
	TopicPartition: kafkalib.TopicPartition{Topic: &topic},
	Key:            []byte("example"),
	Value:          []byte("Hello World!"),
	Timestamp:      time.Now(),
}
if err := p.Produce(ctx, msg); err != nil {
	log.WithError(err).Fatal("error producing message")
}

type ProducerConfig

type ProducerConfig struct {
	FlushPeriod     time.Duration `json:"flush_period" split_words:"true"`
	BatchSize       int           `json:"batch_size" split_words:"true"`
	DeliveryTimeout time.Duration `json:"delivery_timeout" split_words:"true"`
}

ProducerConfig holds the specific configuration for a producer.

func (ProducerConfig) Apply

func (c ProducerConfig) Apply(kafkaConf *kafkalib.ConfigMap)

Apply applies the specific configuration for a producer.
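
A sketch of applying a ProducerConfig to a kafkalib.ConfigMap directly. Which librdkafka keys each field maps to is not documented here, and the values below are placeholders:

kafkaConf := &kafkalib.ConfigMap{}
pc := ProducerConfig{
	FlushPeriod:     500 * time.Millisecond,
	BatchSize:       10000,
	DeliveryTimeout: 10 * time.Second,
}
pc.Apply(kafkaConf)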

