mqmg

package module
v0.0.0-...-c091936 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2025 License: Apache-2.0 Imports: 8 Imported by: 0

README

mqmg

介绍

对github.com/confluentinc/confluent-kafka-go/kafka进行生产者和消费者,以及消息封装的库。简化对 kafka 的使用

包特性
  1. 生产者和消费者,在创建时能处理什么消息就确定了,也就是,一对生产者消费者,只能处理同一种消息。

  2. 消费者的默认组,就是自定义消息的名字。也就是说,默认情况下,不同消息,不会在一个go消费组中,不会竞争消费的消息。

  3. 不同的自定义消息,其实可以topic。只不过不是自己的消息会自己丢弃。

  4. 我个人推荐这种一个消息一个消费组的模式。

    1. 如果某种消息量很大,用一个topic,就很正常。
    2. 如果很多个消息,消息量并不大,希望复用topic,虽然消费者会丢弃很多不是自己的消息,但是消息量毕竟不大。可以接受必定使用消息队列大多数是内网。
    3. 但是这样确有一个额外的好处,就是不同消息,之间的commit id不会相互干扰,不会有一种消息,处理不了,后其他消费者也得停下来这种情况。
    4. 再有一个好处,就是,不同消息直接,完全独立,互相不干扰,会提供代码的可读性。
使用说明
  1. go get gitee.com/gudongkun/mqmg
  2. 定义自己的消息
type User struct {
	Name   string
	Age    string
	Gander string
}
  1. 生产者
package main

import (
	"log"

	"gitee.com/gudongkun/mqmg"
)

type User struct {
	Name   string
	Age    string
	Gander string
}

func main() {
	user := User{
		Name:   "test02",
		Age:    "2",
		Gander: "male",
	}

	producer, err := mqmg.NewMqMg[User]([]mqmg.Options{
		{Key: "broker", Val: "192.168.56.2:9092"},
		{Key: "topic", Val: "test"},
	})
	if err != nil {
		log.Println("NewMqMg err:", err)
	}

	err = producer.SendMsg(user)
	if err != nil {
		log.Println("send msg err:", err)
	}

}
  1. 消费者
package main

import (
	"fmt"
	"log"
	"time"

	"gitee.com/gudongkun/mqmg"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

type User struct {
	Name   string
	Age    string
	Gander string
}

func main() {
	consumer, err := mqmg.NewMqMg[User]([]mqmg.Options{
		{Key: "topic", Val: "test"},
	})
	if err != nil {
		log.Println("NewMqMg err:", err)
	}
	go func(c *mqmg.MqMg[User]) {
		c.RunConsumer([]mqmg.Options{
			{Key: "broker", Val: "192.168.56.2:9092"},
			{Key: "topic", Val: "test"},
			{Key: "handler", Val: func(data User, h ...kafka.Header) error {
				fmt.Println("recv msg:", data)
				return nil
			}},
		})
	}(consumer)

	time.Sleep(30 * time.Second)
	consumer.Run = false
	time.Sleep(2 * time.Second)
}



  1. 生产者也可以传入kafka客户端,进行复用
package main

import (
	"log"

	"gitee.com/gudongkun/mqmg"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

type User struct {
	Name   string
	Age    string
	Gander string
}

func main() {
	user := User{
		Name:   "test02",
		Age:    "2",
		Gander: "male",
	}

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "192.168.56.2:9092"})
	if err != nil {
		log.Fatal("NewMqMg NewProducer err:", err)
	}

	producer, err := mqmg.NewMqMg[User]([]mqmg.Options{
		{Key: "producer", Val: p},
		{Key: "topic", Val: "test"},
	})

	if err != nil {
		log.Println("NewMqMg err:", err)
	}

	err = producer.SendMsg(user)
	if err != nil {
		log.Println("send msg err:", err)
	}

}


  1. 可以设置日志打印类
mqmg.SetLogger(logger Logger)
参与贡献
  1. Fork 本仓库
  2. 新建 Feat_xxx 分支
  3. 提交代码

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetLogger

func SetLogger(l Logger)

SetLogger 设置自定义的日志记录器

Types

type Logger

type Logger interface {
	Printf(format string, v ...interface{})
}

Logger 定义日志接口

type MqMg

type MqMg[T any] struct {
	Topic             string
	Broker            string
	NumPartition      int
	ReplicationFactor int
	Producer          *kafka.Producer
	Handler           func(data T, headers ...kafka.Header) error
	GroupName         string
	Run               bool
	MsgTypeName       string
}

消息队列管理类,支持生产者,消费者(自定义处理函数)

func NewMqMg

func NewMqMg[T any](opts []Options) (*MqMg[T], error)

func (*MqMg[T]) RunConsumer

func (m *MqMg[T]) RunConsumer(opts []Options) error

func (*MqMg[T]) SendMsg

func (m *MqMg[T]) SendMsg(msg T, headers ...kafka.Header) error

func (*MqMg[T]) SetOpt

func (m *MqMg[T]) SetOpt(opts []Options) error

type MsgPack

type MsgPack[T any] struct {
	Msg     T
	MsgType string
}

消息打包,支持json,msgpack

func (*MsgPack[T]) SetTypeName

func (p *MsgPack[T]) SetTypeName()

type Options

type Options struct {
	Key string
	Val any
}

配置信息 支持的key :topic, broker, num_partition, replication_factor, producer, consumer, handler, run 这样,SetOpt,中有全部参数设置方法,设置有一定的可以阅读性(ide 不友好) 但是修改设置可以更灵活,比如,在初始化的时候,就可以设置好,也可以在运行时修改,参数也可以不固定。 编程处理也更方便,比如配置文件,循环设置参数,可以只修改配置文件,就能增减配置。 真正的配置还是,在结构体中定义,可读性,和易用性都没有牺牲。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL