mqmg
介绍
对github.com/confluentinc/confluent-kafka-go/kafka进行生产者和消费者,以及消息封装的库。简化对 kafka 的使用
包特性
-
生产者和消费者,在创建时能处理什么消息就确定了,也就是,一对生产者消费者,只能处理同一种消息。
-
消费者的默认组,就是自定义消息的名字。也就是说,默认情况下,不同消息,不会在一个go消费组中,不会竞争消费的消息。
-
不同的自定义消息,其实可以topic。只不过不是自己的消息会自己丢弃。
-
我个人推荐这种一个消息一个消费组的模式。
- 如果某种消息量很大,用一个topic,就很正常。
- 如果很多个消息,消息量并不大,希望复用topic,虽然消费者会丢弃很多不是自己的消息,但是消息量毕竟不大。可以接受必定使用消息队列大多数是内网。
- 但是这样确有一个额外的好处,就是不同消息,之间的commit id不会相互干扰,不会有一种消息,处理不了,后其他消费者也得停下来这种情况。
- 再有一个好处,就是,不同消息直接,完全独立,互相不干扰,会提供代码的可读性。
使用说明
- go get gitee.com/gudongkun/mqmg
- 定义自己的消息
type User struct {
Name string
Age string
Gander string
}
- 生产者
package main
import (
"log"
"gitee.com/gudongkun/mqmg"
)
type User struct {
Name string
Age string
Gander string
}
func main() {
user := User{
Name: "test02",
Age: "2",
Gander: "male",
}
producer, err := mqmg.NewMqMg[User]([]mqmg.Options{
{Key: "broker", Val: "192.168.56.2:9092"},
{Key: "topic", Val: "test"},
})
if err != nil {
log.Println("NewMqMg err:", err)
}
err = producer.SendMsg(user)
if err != nil {
log.Println("send msg err:", err)
}
}
- 消费者
package main
import (
"fmt"
"log"
"time"
"gitee.com/gudongkun/mqmg"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
type User struct {
Name string
Age string
Gander string
}
func main() {
consumer, err := mqmg.NewMqMg[User]([]mqmg.Options{
{Key: "topic", Val: "test"},
})
if err != nil {
log.Println("NewMqMg err:", err)
}
go func(c *mqmg.MqMg[User]) {
c.RunConsumer([]mqmg.Options{
{Key: "broker", Val: "192.168.56.2:9092"},
{Key: "topic", Val: "test"},
{Key: "handler", Val: func(data User, h ...kafka.Header) error {
fmt.Println("recv msg:", data)
return nil
}},
})
}(consumer)
time.Sleep(30 * time.Second)
consumer.Run = false
time.Sleep(2 * time.Second)
}
- 生产者也可以传入kafka客户端,进行复用
package main
import (
"log"
"gitee.com/gudongkun/mqmg"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
type User struct {
Name string
Age string
Gander string
}
func main() {
user := User{
Name: "test02",
Age: "2",
Gander: "male",
}
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "192.168.56.2:9092"})
if err != nil {
log.Fatal("NewMqMg NewProducer err:", err)
}
producer, err := mqmg.NewMqMg[User]([]mqmg.Options{
{Key: "producer", Val: p},
{Key: "topic", Val: "test"},
})
if err != nil {
log.Println("NewMqMg err:", err)
}
err = producer.SendMsg(user)
if err != nil {
log.Println("send msg err:", err)
}
}
- 可以设置日志打印类
mqmg.SetLogger(logger Logger)
参与贡献
- Fork 本仓库
- 新建 Feat_xxx 分支
- 提交代码