Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var DefaultJetStreamStreamerConfig = JetStreamStreamerConfig{
	AckWait: 30 * time.Second,
}
DefaultJetStreamStreamerConfig holds the default settings for the JetStream streamer.
var MetadataIndexStream = &nats.StreamConfig{
	Name: "MetadataIndexStream",
	Subjects: []string{
		"MetadataIndex.*",
	},
	MaxAge:   24 * time.Hour,
	Replicas: 5,
}
MetadataIndexStream is the stream config for MetadataIndex messages.
var V2CDurableStream = &nats.StreamConfig{
	Name: "V2CStream",
	Subjects: []string{
		"v2c.*.*.*",
	},
	MaxAge:   15 * time.Minute,
	Replicas: 5,
}
V2CDurableStream is the stream config for Durable v2c messages.
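Both stream configs list subjects that use the NATS `*` wildcard, which matches exactly one dot-separated token. As a rough illustration of which subjects each stream would capture, the check can be sketched as below. This is not the NATS server's matcher: `matchSubject` is a hypothetical helper that handles only `*` (not `>`), and the concrete subjects in `main` are made up for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches pattern, where each "*" in
// pattern stands for exactly one dot-separated token. Hypothetical helper for
// illustration only; the ">" wildcard is not handled.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	if len(p) != len(s) {
		return false
	}
	for i := range p {
		if s[i] == "" {
			return false // empty tokens are not valid in a subject
		}
		if p[i] != "*" && p[i] != s[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(matchSubject("v2c.*.*.*", "v2c.a.b.c"))             // true
	fmt.Println(matchSubject("v2c.*.*.*", "v2c.a.b"))               // false: too few tokens
	fmt.Println(matchSubject("MetadataIndex.*", "MetadataIndex.x")) // true
}
```

Under this reading, a subject needs exactly three tokens after `v2c` to land in V2CStream, and exactly one token after `MetadataIndex` to land in MetadataIndexStream.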
Functions ¶
func MustConnectJetStream ¶
func MustConnectJetStream(nc *nats.Conn) nats.JetStreamContext
MustConnectJetStream creates a new JetStream connection.
func MustConnectNATS ¶
func MustConnectNATS() *nats.Conn
MustConnectNATS attempts to connect to the NATS message bus.
Types ¶
type JetStreamStreamerConfig ¶
type JetStreamStreamerConfig struct {
// AckWait is how long JetStream waits for an Ack() before it treats the message as unacknowledged and resends it.
AckWait time.Duration
}
JetStreamStreamerConfig contains options that can be set for a JetStream Streamer.
type Msg ¶
type Msg interface {
// Data returns the serialized data stored in the message.
Data() []byte
// Ack acknowledges the message.
Ack() error
}
Msg is the interface for a message sent over the stream.
type PersistentSub ¶
type PersistentSub interface {
// Close the subscription, but allow future PersistentSubs to read from the sub starting after
// the last acked message.
Close() error
}
PersistentSub is the interface to an active persistent subscription.
type Streamer ¶
type Streamer interface {
// PersistentSubscribe creates a persistent subscription on a subject, calling the message
// handler callback on each message that arrives on the sub.
//
// Here persistence means that if the subscription closes or dies and later resumes,
// the subscription will continue from the earliest message that was not acked.
//
// This position in the stream will be tracked according to the (subject, persistentName) pair.
// * If you need a new subscription to see all of the available stream messages, you can receive them
// by invoking PersistentSubscribe() on the same subject but a new persistentName.
// * If you call PersistentSubscribe() with a new subject but an existing persistentName, the implementation
// should treat it as a new persistent subscription and send all data available on the subscription.
//
// Parallel callers of PersistentSubscribe that use the same subject + persistentName pair will be added
// to the same WorkQueue: messages published on that subject will be assigned to one of
// the callers. If the assigned caller does not Ack() a message within an implementation's
// timeout, then the message will be reassigned to another worker.
PersistentSubscribe(subject, persistentName string, cb MsgHandler) (PersistentSub, error)
// Publish publishes the data to the specific subject.
Publish(subject string, data []byte) error
// PeekLatestMessage returns the last message published on a subject. If no messages
// exist for the subject, the method returns `nil`.
//
// PeekLatestMessage does not care about the state of any Sub. It strictly returns the last message sent from a Publish()
// call.
PeekLatestMessage(subject string) (Msg, error)
}
Streamer is an interface for any streaming handler.
func NewJetStreamStreamer ¶
func NewJetStreamStreamer(nc *nats.Conn, js nats.JetStreamContext, sCfg *nats.StreamConfig) (Streamer, error)
NewJetStreamStreamer creates a new Streamer implemented using JetStream with default configuration.
func NewJetStreamStreamerWithConfig ¶
func NewJetStreamStreamerWithConfig(nc *nats.Conn, js nats.JetStreamContext, sCfg *nats.StreamConfig, cfg JetStreamStreamerConfig) (Streamer, error)
NewJetStreamStreamerWithConfig creates a new Streamer implemented using JetStream with specific configuration.