kafka

package
v0.32.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 8, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Topic is the key naming the message topic.
	Topic = "topic"
	// Key is the key naming the message key.
	Key = "key"
	// Partition is the key naming the consumed partition.
	Partition = "partition"
)
View Source
const (
	// KeyResponseTopic is the metadata key for the response topic.
	KeyResponseTopic = "responseTopic"
	// KeyResponseKey is the metadata key for the response message key.
	KeyResponseKey = "key"
	// KeyResponsePartition is the metadata key for the response partition.
	KeyResponsePartition = "partition"
)
View Source
// Type is the component type: types.EndpointTypePrefix followed by "kafka".
const Type = types.EndpointTypePrefix + "kafka"

Type 组件类型

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

// Config holds the settings of the Kafka endpoint.
type Config struct {
	// Server is the list of Kafka server addresses; separate multiple
	// addresses with commas.
	Server string `json:"server"`
	// GroupId is the consumer group ID.
	GroupId string `json:"groupId"`
	// SASL is the SASL authentication configuration.
	SASL SASLConfig `json:"sasl"`
	// TLS is the TLS configuration.
	TLS TLSConfig `json:"tls"`
}

type Endpoint

// Endpoint is an alias of Kafka.
type Endpoint = Kafka

Endpoint 别名

type Kafka

// Kafka is the Kafka receiving endpoint.
type Kafka struct {
	// BaseEndpoint is embedded from the impl package.
	impl.BaseEndpoint
	// RuleConfig has type types.Config; presumably the rule configuration
	// handed to Init — TODO confirm against the implementation.
	RuleConfig types.Config
	// Config is the endpoint configuration.
	Config Config
	// contains filtered or unexported fields
}

Kafka is the Kafka receiving endpoint.

func (*Kafka) AddRouter

func (x *Kafka) AddRouter(router endpointApi.Router, params ...interface{}) (string, error)

func (*Kafka) BeginShutdown added in v0.32.0

func (x *Kafka) BeginShutdown(ctx context.Context) error

BeginShutdown 实现 GracefulShutdown 接口,开始优雅关闭过程

func (*Kafka) Close

func (x *Kafka) Close() error

func (*Kafka) Destroy

func (x *Kafka) Destroy()

Destroy 销毁

func (*Kafka) GetShutdownTimeout added in v0.32.0

func (x *Kafka) GetShutdownTimeout() time.Duration

GetShutdownTimeout 实现 ShutdownTimeout 接口,返回关闭超时时间

func (*Kafka) Id

func (x *Kafka) Id() string

func (*Kafka) Init

func (x *Kafka) Init(ruleConfig types.Config, configuration types.Configuration) error

Init 初始化

func (*Kafka) IsShuttingDown added in v0.32.0

func (x *Kafka) IsShuttingDown() bool

IsShuttingDown 实现 GracefulShutdown 接口,检查是否正在关闭

func (*Kafka) New

func (x *Kafka) New() types.Node

func (*Kafka) Printf

func (x *Kafka) Printf(format string, v ...interface{})

func (*Kafka) RemoveRouter

func (x *Kafka) RemoveRouter(routerId string, params ...interface{}) error

func (*Kafka) Start

func (x *Kafka) Start() error

func (*Kafka) Type

func (x *Kafka) Type() string

Type 组件类型

type RequestMessage

// RequestMessage is the request message type of this package. All of its
// fields are unexported.
type RequestMessage struct {
	// contains filtered or unexported fields
}

RequestMessage is the Kafka request message.

func (*RequestMessage) Body

func (r *RequestMessage) Body() []byte

func (*RequestMessage) From

func (r *RequestMessage) From() string

func (*RequestMessage) GetError

func (r *RequestMessage) GetError() error

func (*RequestMessage) GetMsg

func (r *RequestMessage) GetMsg() *types.RuleMsg

func (*RequestMessage) GetParam

func (r *RequestMessage) GetParam(key string) string

func (*RequestMessage) Headers

func (r *RequestMessage) Headers() textproto.MIMEHeader

func (*RequestMessage) SetBody

func (r *RequestMessage) SetBody(body []byte)

func (*RequestMessage) SetError

func (r *RequestMessage) SetError(err error)

func (*RequestMessage) SetMsg

func (r *RequestMessage) SetMsg(msg *types.RuleMsg)

func (*RequestMessage) SetStatusCode

func (r *RequestMessage) SetStatusCode(statusCode int)

type ResponseMessage

// ResponseMessage is the response message type of this package. All of its
// fields are unexported.
type ResponseMessage struct {
	// contains filtered or unexported fields
}

ResponseMessage is the Kafka response message.

func (*ResponseMessage) Body

func (r *ResponseMessage) Body() []byte

func (*ResponseMessage) From

func (r *ResponseMessage) From() string

func (*ResponseMessage) GetError

func (r *ResponseMessage) GetError() error

func (*ResponseMessage) GetMsg

func (r *ResponseMessage) GetMsg() *types.RuleMsg

func (*ResponseMessage) GetParam

func (r *ResponseMessage) GetParam(key string) string

func (*ResponseMessage) Headers

func (r *ResponseMessage) Headers() textproto.MIMEHeader

func (*ResponseMessage) SetBody

func (r *ResponseMessage) SetBody(body []byte)

func (*ResponseMessage) SetError

func (r *ResponseMessage) SetError(err error)

func (*ResponseMessage) SetMsg

func (r *ResponseMessage) SetMsg(msg *types.RuleMsg)

func (*ResponseMessage) SetStatusCode

func (r *ResponseMessage) SetStatusCode(statusCode int)

type SASLConfig added in v0.32.0

// SASLConfig is the SASL authentication configuration.
type SASLConfig struct {
	// Enable reports whether SASL authentication is enabled.
	Enable bool `json:"enable"`
	// Mechanism is the authentication mechanism; supported values are
	// PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512.
	Mechanism string `json:"mechanism"`
	// Username is the user name.
	Username string `json:"username"`
	// Password is the password.
	Password string `json:"password"`
}

SASLConfig SASL认证配置

type TLSConfig added in v0.32.0

// TLSConfig is the TLS configuration.
type TLSConfig struct {
	// Enable reports whether TLS is enabled.
	Enable bool `json:"enable"`
	// InsecureSkipVerify reports whether certificate verification is skipped.
	InsecureSkipVerify bool `json:"insecureSkipVerify"`
}

TLSConfig TLS配置

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL