Documentation
¶
Index ¶
- Constants
- type Config
- type Endpoint
- type Kafka
- func (x *Kafka) AddRouter(router endpointApi.Router, params ...interface{}) (string, error)
- func (x *Kafka) BeginShutdown(ctx context.Context) error
- func (x *Kafka) Close() error
- func (x *Kafka) Destroy()
- func (x *Kafka) GetShutdownTimeout() time.Duration
- func (x *Kafka) Id() string
- func (x *Kafka) Init(ruleConfig types.Config, configuration types.Configuration) error
- func (x *Kafka) IsShuttingDown() bool
- func (x *Kafka) New() types.Node
- func (x *Kafka) Printf(format string, v ...interface{})
- func (x *Kafka) RemoveRouter(routerId string, params ...interface{}) error
- func (x *Kafka) Start() error
- func (x *Kafka) Type() string
- type RequestMessage
- func (r *RequestMessage) Body() []byte
- func (r *RequestMessage) From() string
- func (r *RequestMessage) GetError() error
- func (r *RequestMessage) GetMsg() *types.RuleMsg
- func (r *RequestMessage) GetParam(key string) string
- func (r *RequestMessage) Headers() textproto.MIMEHeader
- func (r *RequestMessage) SetBody(body []byte)
- func (r *RequestMessage) SetError(err error)
- func (r *RequestMessage) SetMsg(msg *types.RuleMsg)
- func (r *RequestMessage) SetStatusCode(statusCode int)
- type ResponseMessage
- func (r *ResponseMessage) Body() []byte
- func (r *ResponseMessage) From() string
- func (r *ResponseMessage) GetError() error
- func (r *ResponseMessage) GetMsg() *types.RuleMsg
- func (r *ResponseMessage) GetParam(key string) string
- func (r *ResponseMessage) Headers() textproto.MIMEHeader
- func (r *ResponseMessage) SetBody(body []byte)
- func (r *ResponseMessage) SetError(err error)
- func (r *ResponseMessage) SetMsg(msg *types.RuleMsg)
- func (r *ResponseMessage) SetStatusCode(statusCode int)
- type SASLConfig
- type TLSConfig
Constants ¶
View Source
const ( //Topic 消息主题 Topic = "topic" //Key 消息key Key = "key" //Partition 消费分区 Partition = "partition" )
View Source
const ( // KeyResponseTopic 响应主题metadataKey KeyResponseTopic = "responseTopic" // KeyResponseKey 响应key metadataKey KeyResponseKey = "key" // KeyResponsePartition 响应 消费分区metadataKey KeyResponsePartition = "partition" )
View Source
const Type = types.EndpointTypePrefix + "kafka"
Type 组件类型
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // kafka服务器地址列表,多个用逗号隔开 Server string `json:"server"` // GroupId 消费者组Id GroupId string `json:"groupId"` // SASL认证配置 SASL SASLConfig `json:"sasl"` // TLS配置 TLS TLSConfig `json:"tls"` }
type Kafka ¶
type Kafka struct { impl.BaseEndpoint RuleConfig types.Config //Config 配置 Config Config // contains filtered or unexported fields }
Kafka 是 Kafka 接收端的端点。
func (*Kafka) AddRouter ¶
func (x *Kafka) AddRouter(router endpointApi.Router, params ...interface{}) (string, error)
func (*Kafka) BeginShutdown ¶ added in v0.32.0
BeginShutdown 实现 GracefulShutdown 接口,开始优雅关闭过程
func (*Kafka) GetShutdownTimeout ¶ added in v0.32.0
GetShutdownTimeout 实现 ShutdownTimeout 接口,返回关闭超时时间
func (*Kafka) IsShuttingDown ¶ added in v0.32.0
IsShuttingDown 实现 GracefulShutdown 接口,检查是否正在关闭
func (*Kafka) RemoveRouter ¶
type RequestMessage ¶
type RequestMessage struct {
// contains filtered or unexported fields
}
RequestMessage Kafka请求消息
func (*RequestMessage) Body ¶
func (r *RequestMessage) Body() []byte
func (*RequestMessage) From ¶
func (r *RequestMessage) From() string
func (*RequestMessage) GetError ¶
func (r *RequestMessage) GetError() error
func (*RequestMessage) GetMsg ¶
func (r *RequestMessage) GetMsg() *types.RuleMsg
func (*RequestMessage) GetParam ¶
func (r *RequestMessage) GetParam(key string) string
func (*RequestMessage) Headers ¶
func (r *RequestMessage) Headers() textproto.MIMEHeader
func (*RequestMessage) SetBody ¶
func (r *RequestMessage) SetBody(body []byte)
func (*RequestMessage) SetError ¶
func (r *RequestMessage) SetError(err error)
func (*RequestMessage) SetMsg ¶
func (r *RequestMessage) SetMsg(msg *types.RuleMsg)
func (*RequestMessage) SetStatusCode ¶
func (r *RequestMessage) SetStatusCode(statusCode int)
type ResponseMessage ¶
type ResponseMessage struct {
// contains filtered or unexported fields
}
ResponseMessage Kafka响应消息
func (*ResponseMessage) Body ¶
func (r *ResponseMessage) Body() []byte
func (*ResponseMessage) From ¶
func (r *ResponseMessage) From() string
func (*ResponseMessage) GetError ¶
func (r *ResponseMessage) GetError() error
func (*ResponseMessage) GetMsg ¶
func (r *ResponseMessage) GetMsg() *types.RuleMsg
func (*ResponseMessage) GetParam ¶
func (r *ResponseMessage) GetParam(key string) string
func (*ResponseMessage) Headers ¶
func (r *ResponseMessage) Headers() textproto.MIMEHeader
func (*ResponseMessage) SetBody ¶
func (r *ResponseMessage) SetBody(body []byte)
func (*ResponseMessage) SetError ¶
func (r *ResponseMessage) SetError(err error)
func (*ResponseMessage) SetMsg ¶
func (r *ResponseMessage) SetMsg(msg *types.RuleMsg)
func (*ResponseMessage) SetStatusCode ¶
func (r *ResponseMessage) SetStatusCode(statusCode int)
type SASLConfig ¶ added in v0.32.0
type SASLConfig struct { // Enable 是否启用SASL认证 Enable bool `json:"enable"` // Mechanism 认证机制,支持 PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 Mechanism string `json:"mechanism"` // Username 用户名 Username string `json:"username"` // Password 密码 Password string `json:"password"` }
SASLConfig SASL认证配置
Click to show internal directories.
Click to hide internal directories.