Documentation
¶
Index ¶
- Constants
- func Close() error
- func CreateDefault(options ...Option) error
- func Messages(ctx context.Context, options ...FetchOption) (<-chan *apis.Message, <-chan error)
- func Name() string
- func Write(ctx context.Context, msg *apis.Message, options ...WriteOption) error
- type AuthConfig
- type Config
- type FetchMode
- type FetchOption
- type Messenger
- type Option
- type Publisher
- type State
- type WriteOption
Constants ¶
const ( Nats = "nats" Kafka = "kafka" GRPC = "grpc" )
const ( Online = apis.Online Offline = apis.Offline )
const ( DefaultStream = "Chameleon" EnforcePolicyPrefix = "policy.enforce" HeartbeatPrefix = "fort_heartbeat" GenericPrefix = "fort_common" )
Variables ¶
This section is empty.
Functions ¶
func CreateDefault ¶
CreateDefault creates default messenger, which MUST invoke before any others calls bind with singleton object
Types ¶
type AuthConfig ¶
type FetchOption ¶
type FetchOption interface {
// contains filtered or unexported methods
}
func Batch ¶
func Batch(batch int) FetchOption
Batch configures maximum messages to fetch of each batch
func BindStream ¶
func BindStream(stream string) FetchOption
BindStream binds current consumer to a specified stream
func From ¶
func From(topic string) FetchOption
From configures from which topic current consumer receive message
func Interval ¶
func Interval(interval time.Duration) FetchOption
Interval configures interval of fetch operation
type Messenger ¶
type Messenger interface {
// Name returns active messenger name, one of (nats, kafka, grpc)
Name() string
// Messages fetch messages from remote service(nats/kafka/grpc), and returns two channels of messages
// and errors, callers can read messages and errors from these channels.
//
// options can be specified by options argument, and configure message fetch mode, topic, etc.
Messages(ctx context.Context, options ...FetchOption) (<-chan *apis.Message, <-chan error)
// Write writes a message to remote service(nats/kafka/grpc)
Write(ctx context.Context, msg *apis.Message, options ...WriteOption) error
// Close closes active messenger client
Close() error
}
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
func Authorization ¶
func Authorization(auth AuthConfig) Option
Authorization configures messenger connection authorization infos
func Insecure ¶
func Insecure() Option
Insecure configures connect messenger service without client tls verify
func StateChange ¶
StateChange configures callback for messenger connection Online or Offline state
func Stream ¶
func Stream(streams ...*jetstream.StreamConfig) Option
Stream configures nats StreamConfig, which used to auto create or update nats stream
type State ¶
type State = apis.MessageType
type WriteOption ¶
type WriteOption interface {
// contains filtered or unexported methods
}
func JetStream ¶
func JetStream(enable bool) WriteOption
func RawData ¶
func RawData() WriteOption
RawData configures messenger only write apis.Message Data to remote service, which is used to write customized data without original apis.Message headers