Documentation
¶
Index ¶
- func KafkaPipe(log logrus.FieldLogger) (*FakeKafkaConsumer, *FakeKafkaProducer)
- type FakeKafkaConsumer
- func (f *FakeKafkaConsumer) Close() error
- func (f *FakeKafkaConsumer) CommitMessage(msg *kafka.Message) error
- func (f *FakeKafkaConsumer) FetchMessage(ctx context.Context) (*kafka.Message, error)
- func (f *FakeKafkaConsumer) Seek(offset int64, _ time.Duration) error
- func (f *FakeKafkaConsumer) SetInitialOffset(offset int64) error
- type FakeKafkaProducer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func KafkaPipe ¶
func KafkaPipe(log logrus.FieldLogger) (*FakeKafkaConsumer, *FakeKafkaProducer)
Types ¶
type FakeKafkaConsumer ¶
type FakeKafkaConsumer struct {
// contains filtered or unexported fields
}
func NewFakeKafkaConsumer ¶
func NewFakeKafkaConsumer(log logrus.FieldLogger, distri <-chan *kafka.Message) *FakeKafkaConsumer
func (*FakeKafkaConsumer) Close ¶
func (f *FakeKafkaConsumer) Close() error
func (*FakeKafkaConsumer) CommitMessage ¶
func (f *FakeKafkaConsumer) CommitMessage(msg *kafka.Message) error
func (*FakeKafkaConsumer) FetchMessage ¶
func (*FakeKafkaConsumer) Seek ¶
func (f *FakeKafkaConsumer) Seek(offset int64, _ time.Duration) error
func (*FakeKafkaConsumer) SetInitialOffset ¶
func (f *FakeKafkaConsumer) SetInitialOffset(offset int64) error
type FakeKafkaProducer ¶
type FakeKafkaProducer struct {
// contains filtered or unexported fields
}
func NewFakeKafkaProducer ¶
func NewFakeKafkaProducer(distris ...chan<- *kafka.Message) *FakeKafkaProducer
func (*FakeKafkaProducer) AddDistri ¶
func (f *FakeKafkaProducer) AddDistri(d chan<- *kafka.Message)
func (*FakeKafkaProducer) Close ¶
func (f *FakeKafkaProducer) Close() error
func (*FakeKafkaProducer) WaitForKey ¶
func (f *FakeKafkaProducer) WaitForKey(key []byte) (gotKey bool)
Click to show internal directories.
Click to hide internal directories.