messenger

package
v0.0.0-...-205da4a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 26, 2025 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Nats  = "nats"
	Kafka = "kafka"
	GRPC  = "grpc"
)
View Source
const (
	Online  = apis.Online
	Offline = apis.Offline
)
View Source
const (
	DefaultStream       = "Chameleon"
	EnforcePolicyPrefix = "policy.enforce"
	HeartbeatPrefix     = "fort_heartbeat"
	GenericPrefix       = "fort_common"
)

Variables

This section is empty.

Functions

func Close

func Close() error

Close closes active messenger client

func CreateDefault

func CreateDefault(options ...Option) error

CreateDefault creates default messenger, which MUST invoke before any others calls bind with singleton object

func Messages

func Messages(ctx context.Context, options ...FetchOption) (<-chan *apis.Message, <-chan error)

Messages fetch messages from remote service(nats/kafka/grpc), and returns two channels of messages and errors, callers can read messages and errors from these channels.

options can be specified by options argument, and configure message fetch mode, topic, etc.

func Name

func Name() string

Name returns active messenger name, one of (nats, kafka, grpc)

func Write

func Write(ctx context.Context, msg *apis.Message, options ...WriteOption) error

Write writes a message to remote service(nats/kafka/grpc)

Types

type AuthConfig

type AuthConfig struct {
	Username string `json:"username"`
	Password string `json:"password"`
}

type Config

type Config struct {
	Kind       string `json:"kind"`
	Endpoint   string `json:"endpoint"`
	User       string `json:"user"`
	Credential string `json:"credential"`
	Insecure   bool   `json:"insecure"`
}

type FetchMode

type FetchMode uint8
const (
	Push FetchMode = iota
	Pull
)

type FetchOption

type FetchOption interface {
	// contains filtered or unexported methods
}

func Batch

func Batch(batch int) FetchOption

Batch configures maximum messages to fetch of each batch

func BindStream

func BindStream(stream string) FetchOption

BindStream binds current consumer to a specified stream

func From

func From(topic string) FetchOption

From configures from which topic current consumer receive message

func Interval

func Interval(interval time.Duration) FetchOption

Interval configures interval of fetch operation

func Mode

func Mode(mode FetchMode) FetchOption

Mode configures consumer fetch mode, Pull or Push

type Messenger

type Messenger interface {
	// Name returns active messenger name, one of (nats, kafka, grpc)
	Name() string

	// Messages fetch messages from remote service(nats/kafka/grpc), and returns two channels of messages
	// and errors, callers can read messages and errors from these channels.
	//
	// options can be specified by options argument, and configure message fetch mode, topic, etc.
	Messages(ctx context.Context, options ...FetchOption) (<-chan *apis.Message, <-chan error)

	// Write writes a message to remote service(nats/kafka/grpc)
	Write(ctx context.Context, msg *apis.Message, options ...WriteOption) error

	// Close closes active messenger client
	Close() error
}

func New

func New(ctx context.Context, options ...Option) (Messenger, error)

New creates a new messenger, supported services are nats/kafka/grpc

type Option

type Option interface {
	// contains filtered or unexported methods
}

func Authorization

func Authorization(auth AuthConfig) Option

Authorization configures messenger connection authorization infos

func Endpoint

func Endpoint(endpoint string) Option

Endpoint configures messenger service host address

func Insecure

func Insecure() Option

Insecure configures connect messenger service without client tls verify

func Kind

func Kind(kind string) Option

Kind configures messenger kind to use, such as Nats, Kafka and GRPC

func PluginEndpoint

func PluginEndpoint(endpoint string) Option

PluginEndpoint not used

func StateChange

func StateChange(stateChange func(State)) Option

StateChange configures callback for messenger connection Online or Offline state

func Stream

func Stream(streams ...*jetstream.StreamConfig) Option

Stream configures nats StreamConfig, which used to auto create or update nats stream

type Publisher

type Publisher interface {
	Publish(ctx context.Context, msg *apis.Message) error
}

type State

type State = apis.MessageType

type WriteOption

type WriteOption interface {
	// contains filtered or unexported methods
}

func JetStream

func JetStream(enable bool) WriteOption

func RawData

func RawData() WriteOption

RawData configures messenger only write apis.Message Data to remote service, which is used to write customized data without original apis.Message headers

func To

func To(topic string) WriteOption

To configures write topic for current message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL