kafkatest

package
v0.58.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2021 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FakeKafkaConsumerFactory added in v0.46.0

func FakeKafkaConsumerFactory(distri <-chan *kafkalib.Message) kafka.ConsumerFactory

func KafkaConsumerFactoryFromConsumer added in v0.46.0

func KafkaConsumerFactoryFromConsumer(c kafka.Consumer) kafka.ConsumerFactory

Types

type FakeKafkaConsumer

type FakeKafkaConsumer struct {
	// contains filtered or unexported fields
}

func NewFakeKafkaConsumer

func NewFakeKafkaConsumer(log logrus.FieldLogger, distri <-chan *kafkalib.Message) *FakeKafkaConsumer

func (*FakeKafkaConsumer) AssignPartitionByID added in v0.45.2

func (f *FakeKafkaConsumer) AssignPartitionByID(id int32) error

func (*FakeKafkaConsumer) AssignPartitionByKey added in v0.45.2

func (f *FakeKafkaConsumer) AssignPartitionByKey(key string, algorithm kafka.PartitionerAlgorithm) error

func (*FakeKafkaConsumer) Close

func (f *FakeKafkaConsumer) Close() error

func (*FakeKafkaConsumer) CommitMessage

func (f *FakeKafkaConsumer) CommitMessage(msg *kafkalib.Message) error

func (*FakeKafkaConsumer) FetchMessage

func (f *FakeKafkaConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error)

func (*FakeKafkaConsumer) GetMetadata added in v0.45.2

func (f *FakeKafkaConsumer) GetMetadata(allTopics bool) (*kafkalib.Metadata, error)

func (*FakeKafkaConsumer) GetPartitions added in v0.45.2

func (f *FakeKafkaConsumer) GetPartitions() ([]int32, error)

func (*FakeKafkaConsumer) Pause added in v0.52.1

func (*FakeKafkaConsumer) Resume added in v0.52.1

func (*FakeKafkaConsumer) Seek

func (f *FakeKafkaConsumer) Seek(offset int64) error

func (*FakeKafkaConsumer) SeekToTime added in v0.45.2

func (f *FakeKafkaConsumer) SeekToTime(t time.Time) error

func (*FakeKafkaConsumer) SetInitialOffset

func (f *FakeKafkaConsumer) SetInitialOffset(offset int64) error

func (*FakeKafkaConsumer) StoreOffset added in v0.52.2

func (f *FakeKafkaConsumer) StoreOffset(msg *kafkalib.Message) error

type FakeKafkaProducer

type FakeKafkaProducer struct {
	// contains filtered or unexported fields
}

func NewFakeKafkaProducer

func NewFakeKafkaProducer(distris ...chan<- *kafkalib.Message) *FakeKafkaProducer

func (*FakeKafkaProducer) AddDistri

func (f *FakeKafkaProducer) AddDistri(d chan<- *kafkalib.Message)

func (*FakeKafkaProducer) Close

func (f *FakeKafkaProducer) Close() error

func (*FakeKafkaProducer) Produce

func (f *FakeKafkaProducer) Produce(ctx context.Context, msgs ...*kafkalib.Message) error

func (*FakeKafkaProducer) WaitForKey

func (f *FakeKafkaProducer) WaitForKey(key []byte) (gotKey bool)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL