Documentation
¶
Index ¶
Constants ¶
View Source
const SASLTypePlaintext = sarama.SASLTypePlaintext
Variables ¶
This section is empty.
Functions ¶
func NewSaramaConfig ¶
func NewSaramaConfig(c *ProducerConfig) *sarama.Config
Types ¶
type Consumer ¶
type Consumer interface {
transport.Server //跟着gin框架一起启动
RegisterHandles(MsgHandleFunc, ...string) //注册处理方法
Subscribe(ctx context.Context) //开始处理消息,手动开启
}
func NewConsumerService ¶
func NewConsumerService(conf *ConsumerConfig) Consumer
type ConsumerConfig ¶
type CustomMessage ¶
type CustomMessage struct {
Header RawMessage `json:"header"`
Payload RawMessage `json:"payload"`
}
func NewMessage ¶
func NewMessage(ms ...map[string]any) *CustomMessage
func (*CustomMessage) NewPubMsg ¶
func (m *CustomMessage) NewPubMsg(key []byte) *PubMsg
type MessageHandleDef ¶
type MessageHandleDef struct {
Topic []string
Handle MsgHandleFunc
Options []Option
}
type MsgHandleFunc ¶
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
func WithAutoCommit ¶
func WithCleanup ¶
func WithCleanup(f func(sarama.ConsumerGroupSession) error) Option
func WithErrHandle ¶
type OptionFunc ¶
type OptionFunc func(*optionConfig)
type Producer ¶
type Producer interface {
Send(topic string, value []byte) error
SendWithKey(topic string, key []byte, value []byte) error
RetrySend(ctx context.Context, topic string, messageBody []byte) error
ListTopicGroupIds(topic string) ([]string, error)
Close() error
}
func NewSyncProducer ¶
func NewSyncProducer(c *ProducerConfig) (p Producer, err error)
type ProducerConfig ¶
type RawMessage ¶
func NewRawMessage ¶
func NewRawMessage() RawMessage
func (*RawMessage) Marshal ¶
func (m *RawMessage) Marshal() []byte
func (*RawMessage) RawMessageInject ¶
func (m *RawMessage) RawMessageInject(ctx context.Context)
Click to show internal directories.
Click to hide internal directories.