Documentation
¶
Index ¶
- func FakeKafkaConsumerFactory(distri <-chan *kafkalib.Message) kafka.ConsumerFactory
- func KafkaConsumerFactoryFromConsumer(c kafka.Consumer) kafka.ConsumerFactory
- func KafkaPipe(log logrus.FieldLogger) (*FakeKafkaConsumer, *FakeKafkaProducer)
- type FakeKafkaConsumer
- func (f *FakeKafkaConsumer) AssignPartitionByID(id int32) error
- func (f *FakeKafkaConsumer) AssignPartitionByKey(key string, algorithm kafka.PartitionerAlgorithm) error
- func (f *FakeKafkaConsumer) Close() error
- func (f *FakeKafkaConsumer) CommitMessage(msg *kafkalib.Message) error
- func (f *FakeKafkaConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error)
- func (f *FakeKafkaConsumer) GetMetadata(allTopics bool) (*kafkalib.Metadata, error)
- func (f *FakeKafkaConsumer) GetPartitions() ([]int32, error)
- func (f *FakeKafkaConsumer) Pause(p []kafkalib.TopicPartition) error
- func (f *FakeKafkaConsumer) Resume(p []kafkalib.TopicPartition) error
- func (f *FakeKafkaConsumer) Seek(offset int64) error
- func (f *FakeKafkaConsumer) SeekToTime(t time.Time) error
- func (f *FakeKafkaConsumer) SetInitialOffset(offset int64) error
- func (f *FakeKafkaConsumer) StoreOffset(msg *kafkalib.Message) error
- type FakeKafkaProducer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FakeKafkaConsumerFactory ¶ added in v0.46.0
func FakeKafkaConsumerFactory(distri <-chan *kafkalib.Message) kafka.ConsumerFactory
func KafkaConsumerFactoryFromConsumer ¶ added in v0.46.0
func KafkaConsumerFactoryFromConsumer(c kafka.Consumer) kafka.ConsumerFactory
func KafkaPipe ¶
func KafkaPipe(log logrus.FieldLogger) (*FakeKafkaConsumer, *FakeKafkaProducer)
Types ¶
type FakeKafkaConsumer ¶
type FakeKafkaConsumer struct {
// contains filtered or unexported fields
}
func NewFakeKafkaConsumer ¶
func NewFakeKafkaConsumer(log logrus.FieldLogger, distri <-chan *kafkalib.Message) *FakeKafkaConsumer
func (*FakeKafkaConsumer) AssignPartitionByID ¶ added in v0.45.2
func (f *FakeKafkaConsumer) AssignPartitionByID(id int32) error
func (*FakeKafkaConsumer) AssignPartitionByKey ¶ added in v0.45.2
func (f *FakeKafkaConsumer) AssignPartitionByKey(key string, algorithm kafka.PartitionerAlgorithm) error
func (*FakeKafkaConsumer) Close ¶
func (f *FakeKafkaConsumer) Close() error
func (*FakeKafkaConsumer) CommitMessage ¶
func (f *FakeKafkaConsumer) CommitMessage(msg *kafkalib.Message) error
func (*FakeKafkaConsumer) FetchMessage ¶
func (*FakeKafkaConsumer) GetMetadata ¶ added in v0.45.2
func (f *FakeKafkaConsumer) GetMetadata(allTopics bool) (*kafkalib.Metadata, error)
func (*FakeKafkaConsumer) GetPartitions ¶ added in v0.45.2
func (f *FakeKafkaConsumer) GetPartitions() ([]int32, error)
func (*FakeKafkaConsumer) Pause ¶ added in v0.52.1
func (f *FakeKafkaConsumer) Pause(p []kafkalib.TopicPartition) error
func (*FakeKafkaConsumer) Resume ¶ added in v0.52.1
func (f *FakeKafkaConsumer) Resume(p []kafkalib.TopicPartition) error
func (*FakeKafkaConsumer) Seek ¶
func (f *FakeKafkaConsumer) Seek(offset int64) error
func (*FakeKafkaConsumer) SeekToTime ¶ added in v0.45.2
func (f *FakeKafkaConsumer) SeekToTime(t time.Time) error
func (*FakeKafkaConsumer) SetInitialOffset ¶
func (f *FakeKafkaConsumer) SetInitialOffset(offset int64) error
func (*FakeKafkaConsumer) StoreOffset ¶ added in v0.52.2
func (f *FakeKafkaConsumer) StoreOffset(msg *kafkalib.Message) error
type FakeKafkaProducer ¶
type FakeKafkaProducer struct {
// contains filtered or unexported fields
}
func NewFakeKafkaProducer ¶
func NewFakeKafkaProducer(distris ...chan<- *kafkalib.Message) *FakeKafkaProducer
func (*FakeKafkaProducer) AddDistri ¶
func (f *FakeKafkaProducer) AddDistri(d chan<- *kafkalib.Message)
func (*FakeKafkaProducer) Close ¶
func (f *FakeKafkaProducer) Close() error
func (*FakeKafkaProducer) WaitForKey ¶
func (f *FakeKafkaProducer) WaitForKey(key []byte) (gotKey bool)
Click to show internal directories.
Click to hide internal directories.