Documentation ¶
Overview ¶
Package bus implements a publish-subscribe pattern on top of go-que. It allows publishing messages to subjects, which can be subscribed to by multiple queues. Subject matching follows NATS-style wildcard patterns:
- '*' matches any single token in a subject (e.g., "foo.*.baz" matches "foo.bar.baz")
- '>' matches one or more tokens at the end of a subject (e.g., "foo.>" matches "foo.bar" and "foo.bar.baz")
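The wildcard rules above can be sketched as a token-by-token comparison. This is only an illustration: matchSubject is a hypothetical helper, not part of package bus, and it assumes both arguments have already been validated (no empty tokens).

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches a NATS-style pattern.
// Hypothetical helper for illustration; it is not the package's matcher
// and assumes pattern and subject contain no empty tokens.
func matchSubject(pattern, subject string) bool {
	pt := strings.Split(pattern, ".")
	st := strings.Split(subject, ".")
	for i, p := range pt {
		if p == ">" {
			// '>' must be the last token and must consume at least one token.
			return i == len(pt)-1 && len(st) > i
		}
		if i >= len(st) {
			return false // subject has fewer tokens than the pattern
		}
		if p != "*" && p != st[i] {
			return false // literal token mismatch
		}
	}
	// Without a trailing '>', the token counts must be equal.
	return len(pt) == len(st)
}

func main() {
	fmt.Println(matchSubject("foo.*.baz", "foo.bar.baz")) // true
	fmt.Println(matchSubject("foo.>", "foo.bar.baz"))     // true
	fmt.Println(matchSubject("foo.>", "foo"))             // false
	fmt.Println(matchSubject("foo.*", "foo.bar.baz"))     // false
}
```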
The package uses a pluggable architecture with the Dialect interface to support different database backends. A PostgreSQL implementation is provided in the pgbus subpackage.
Index ¶
- Variables
- func ArgsFromMessageRaw(msgRaw json.RawMessage, pattern string) []byte
- func ToRegexPattern(pattern string) (string, error)
- func ValidatePattern(pattern string) error
- func ValidateSubject(subject string) error
- type Bus
- type BusImpl
- func (b *BusImpl) BySubject(ctx context.Context, subject string) ([]Subscription, error)
- func (b *BusImpl) Close() error
- func (b *BusImpl) Dispatch(ctx context.Context, msgs ...*Outbound) (xerr error)
- func (b *BusImpl) Publish(ctx context.Context, subject string, payload []byte, opts ...PublishOption) error
- func (b *BusImpl) Queue(name string) Queue
- type BusOption
- type BusOptions
- type ConsumeOption
- type ConsumeOptions
- type Dialect
- type Handler
- type Header
- type Inbound
- type Message
- type Outbound
- type PlanConfig
- type PublishOption
- type PublishOptions
- type Queue
- type QueueImpl
- func (q *QueueImpl) Close() error
- func (q *QueueImpl) Consume(_ context.Context, handler Handler, opts ...ConsumeOption) error
- func (q *QueueImpl) Subscribe(ctx context.Context, pattern string, opts ...SubscribeOption) (Subscription, error)
- func (q *QueueImpl) Subscriptions(ctx context.Context) ([]Subscription, error)
- type SubscribeOption
- type SubscribeOptions
- type Subscription
- type WorkerConfig
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrInvalidQueue indicates that the queue name is invalid.
	ErrInvalidQueue = errors.New("queue is invalid")
	// ErrInvalidMessage indicates that the message is invalid.
	ErrInvalidMessage = errors.New("message is invalid")
	// ErrInvalidSubject indicates that the subject is invalid.
	ErrInvalidSubject = errors.New("subject is invalid")
	// ErrInvalidPattern indicates that the pattern is invalid.
	ErrInvalidPattern = errors.New("pattern is invalid")
	// ErrQueueClosed indicates an operation was attempted on a closed queue.
	ErrQueueClosed = errors.New("queue is closed")
	// ErrBusClosed indicates an operation was attempted on a closed bus.
	ErrBusClosed = errors.New("bus is closed")
	// ErrSubscriptionNotFound is returned when no subscription is found.
	ErrSubscriptionNotFound = errors.New("subscription not found")
	// ErrWorkerAlreadyRunning is returned when a worker is already processing messages.
	ErrWorkerAlreadyRunning = errors.New("worker already running for this queue")
	// ErrOverlappingPatterns is returned when a queue has multiple patterns matching the same subject.
	ErrOverlappingPatterns = errors.New("queue has overlapping patterns which may cause unexpected behavior; only the first subscription will be triggered")
)
Common errors returned by the bus package.
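Sentinel errors like these are usually checked with errors.Is rather than by comparing strings. The sketch below uses a local stand-in, errQueueClosed, and a hypothetical consume function; whether the package wraps its sentinels before returning them is an assumption, which is exactly why errors.Is is the safer check.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueClosed is a local stand-in for bus.ErrQueueClosed.
var errQueueClosed = errors.New("queue is closed")

// consume is a hypothetical operation that fails on a closed queue.
// It wraps the sentinel with %w to show that errors.Is still finds it.
func consume(closed bool) error {
	if closed {
		return fmt.Errorf("consume %q: %w", "orders", errQueueClosed)
	}
	return nil
}

// isClosed reports whether err is, or wraps, the closed-queue sentinel.
func isClosed(err error) bool {
	return errors.Is(err, errQueueClosed)
}

func main() {
	err := consume(true)
	fmt.Println(err)           // consume "orders": queue is closed
	fmt.Println(isClosed(err)) // true
}
```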
var DefaultConsumeBackOffFactory = func() backoff.BackOff {
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.InitialInterval = 1 * time.Second
	expBackoff.MaxInterval = 30 * time.Second
	expBackoff.Multiplier = 2.0
	return expBackoff
}
DefaultConsumeBackOffFactory generates the default backoff strategy for worker reconnection.
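To get a feel for the values above, the following sketch computes the nominal interval progression for an initial interval of 1s, a multiplier of 2.0, and a 30s cap. It deliberately ignores any randomization (jitter) or elapsed-time limit the backoff library may apply, so real delays can differ; intervals is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// intervals returns the first n nominal backoff intervals for an
// exponential policy, without jitter. Hypothetical helper; the actual
// backoff library may randomize each interval.
func intervals(initial, maxInterval time.Duration, multiplier float64, n int) []time.Duration {
	out := make([]time.Duration, 0, n)
	cur := initial
	for i := 0; i < n; i++ {
		out = append(out, cur)
		next := time.Duration(float64(cur) * multiplier)
		if next > maxInterval {
			next = maxInterval // cap growth at the maximum interval
		}
		cur = next
	}
	return out
}

func main() {
	fmt.Println(intervals(time.Second, 30*time.Second, 2.0, 7))
	// [1s 2s 4s 8s 16s 30s 30s]
}
```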
var DefaultMaxEnqueuePerBatch = 100
DefaultMaxEnqueuePerBatch defines the default maximum number of plans that can be enqueued in a single transaction.
var DefaultPlanConfig = PlanConfig{
	RetryPolicy:     DefaultRetryPolicy,
	RunAtDelta:      0,
	UniqueLifecycle: que.Ignore,
}
DefaultPlanConfig provides default settings for subscription jobs.
var DefaultRetryPolicy = que.RetryPolicy{
	InitialInterval:        30 * time.Second,
	MaxInterval:            600 * time.Second,
	NextIntervalMultiplier: 2,
	IntervalRandomPercent:  33,
	MaxRetryCount:          3,
}
DefaultRetryPolicy provides a default retry policy for published messages.
var DefaultWorkerConfig = WorkerConfig{
	MaxLockPerSecond:          5,
	MaxBufferJobsCount:        0,
	MaxPerformPerSecond:       1000,
	MaxConcurrentPerformCount: 200,
}
DefaultWorkerConfig provides default settings for workers.
var UniqueIDGenerator = func() string { return "_gobus_:" + xid.New().String() }
Functions ¶
func ArgsFromMessageRaw ¶
func ArgsFromMessageRaw(msgRaw json.RawMessage, pattern string) []byte
func ToRegexPattern ¶
func ToRegexPattern(pattern string) (string, error)
ToRegexPattern converts a NATS-style wildcard pattern to a regex pattern that follows the NATS specification for subject matching. It returns an error if the pattern contains invalid characters or structure.
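One plausible way to perform such a conversion is sketched below. toRegex and matches are hypothetical stand-ins, not the package's implementation: the exact regex that ToRegexPattern emits is not shown in this documentation, and this sketch skips validation and therefore returns no error.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// toRegex converts a NATS-style pattern into an anchored regular expression.
// Hypothetical sketch: it assumes the pattern is already valid.
func toRegex(pattern string) string {
	tokens := strings.Split(pattern, ".")
	parts := make([]string, len(tokens))
	for i, tok := range tokens {
		switch tok {
		case "*":
			parts[i] = `[^.]+` // exactly one token
		case ">":
			parts[i] = `[^.]+(\.[^.]+)*` // one or more trailing tokens
		default:
			parts[i] = regexp.QuoteMeta(tok) // literal token
		}
	}
	return "^" + strings.Join(parts, `\.`) + "$"
}

// matches compiles the converted pattern and tests it against subject.
func matches(pattern, subject string) bool {
	return regexp.MustCompile(toRegex(pattern)).MatchString(subject)
}

func main() {
	fmt.Println(toRegex("foo.*.baz"))            // ^foo\.[^.]+\.baz$
	fmt.Println(matches("foo.>", "foo.bar.baz")) // true
	fmt.Println(matches("foo.>", "foo"))         // false
}
```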
func ValidatePattern ¶
func ValidatePattern(pattern string) error
ValidatePattern validates that a pattern follows NATS wildcard rules. It enforces the following:
- The pattern must not be empty
- Empty tokens (segments between dots) are not allowed
- Only lowercase alphanumeric characters, '_', and '-' are allowed in non-wildcard tokens
- Wildcards can be '*' (single token) or '>' (multiple tokens)
- The '>' wildcard can only appear at the end of a pattern
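The rules listed above can be sketched as a single pass over the dot-separated tokens. validatePattern and errInvalidPattern are local, hypothetical stand-ins; the real function may report errors differently or apply additional checks.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errInvalidPattern is a local stand-in for bus.ErrInvalidPattern.
var errInvalidPattern = errors.New("pattern is invalid")

// validatePattern applies the documented rules to a pattern.
// Hypothetical sketch, not the package's implementation.
func validatePattern(pattern string) error {
	if pattern == "" {
		return errInvalidPattern // empty patterns are not allowed
	}
	tokens := strings.Split(pattern, ".")
	for i, tok := range tokens {
		switch {
		case tok == "":
			return errInvalidPattern // empty token, e.g. "a..b" or ".a"
		case tok == "*":
			continue // single-token wildcard is allowed anywhere
		case tok == ">":
			if i != len(tokens)-1 {
				return errInvalidPattern // '>' only at the end
			}
		default:
			for _, r := range tok {
				ok := (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '_' || r == '-'
				if !ok {
					return errInvalidPattern // disallowed character
				}
			}
		}
	}
	return nil
}

func main() {
	fmt.Println(validatePattern("orders.*.created")) // <nil>
	fmt.Println(validatePattern("orders.>.created")) // pattern is invalid
}
```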
func ValidateSubject ¶
func ValidateSubject(subject string) error
ValidateSubject validates that a subject follows NATS subject rules. It checks for empty subjects, invalid characters, subjects starting or ending with a dot, and empty tokens.
Types ¶
type Bus ¶
type Bus interface {
	io.Closer
	// Queue returns a queue with the specified name.
	Queue(name string) Queue
	// Publish sends a payload to all queues with subscriptions matching the subject.
	Publish(ctx context.Context, subject string, payload []byte, opts ...PublishOption) error
	// Dispatch sends outbound messages to all queues with subscriptions matching the subject.
	// All messages are processed in a single transaction.
	Dispatch(ctx context.Context, msgs ...*Outbound) error
	// BySubject returns all subscriptions with patterns matching the given subject.
	BySubject(ctx context.Context, subject string) ([]Subscription, error)
}
Bus provides publish-subscribe capabilities on top of go-que. It manages subject-queue mappings and handles subject pattern matching.
type BusImpl ¶
type BusImpl struct {
// contains filtered or unexported fields
}
BusImpl is a generic implementation of the Bus interface.
func (*BusImpl) BySubject ¶
func (b *BusImpl) BySubject(ctx context.Context, subject string) ([]Subscription, error)
BySubject returns all subscriptions with patterns matching the given subject.
func (*BusImpl) Dispatch ¶
func (b *BusImpl) Dispatch(ctx context.Context, msgs ...*Outbound) (xerr error)
Dispatch sends outbound messages to all queues with subscriptions matching the subject. All messages are processed in a single transaction.
type BusOption ¶
type BusOption func(*BusOptions)
func WithLogger ¶
WithLogger sets the logger for the Bus implementation.
func WithMaxEnqueuePerBatch ¶
WithMaxEnqueuePerBatch sets the maximum number of plans that can be enqueued in a single transaction. If less than or equal to 0, DefaultMaxEnqueuePerBatch will be used.
func WithMigrate ¶
WithMigrate sets whether database migrations should be run during initialization.
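BusOption follows Go's functional options pattern: each option is a function that mutates an options struct. The sketch below mirrors that shape with local, hypothetical types (busOptions, withMigrate, withMaxEnqueuePerBatch, applyOptions). How the real constructor applies options and defaults is an assumption, and the Logger field is omitted for brevity.

```go
package main

import "fmt"

// defaultMaxEnqueuePerBatch mirrors the documented default of 100.
const defaultMaxEnqueuePerBatch = 100

// busOptions is a local stand-in for a subset of bus.BusOptions.
type busOptions struct {
	Migrate            bool
	MaxEnqueuePerBatch int
}

// busOption is a local stand-in for bus.BusOption.
type busOption func(*busOptions)

// withMigrate is a hypothetical analogue of WithMigrate.
func withMigrate(migrate bool) busOption {
	return func(o *busOptions) { o.Migrate = migrate }
}

// withMaxEnqueuePerBatch is a hypothetical analogue of WithMaxEnqueuePerBatch.
func withMaxEnqueuePerBatch(n int) busOption {
	return func(o *busOptions) { o.MaxEnqueuePerBatch = n }
}

// applyOptions applies each option in order, then falls back to the
// default batch size when the configured value is not positive.
func applyOptions(opts ...busOption) busOptions {
	var o busOptions
	for _, opt := range opts {
		opt(&o)
	}
	if o.MaxEnqueuePerBatch <= 0 {
		o.MaxEnqueuePerBatch = defaultMaxEnqueuePerBatch
	}
	return o
}

func main() {
	o := applyOptions(withMigrate(true), withMaxEnqueuePerBatch(0))
	fmt.Printf("%+v\n", o) // {Migrate:true MaxEnqueuePerBatch:100}
}
```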
type BusOptions ¶
type BusOptions struct {
	// Migrate controls whether database migrations are run during initialization.
	Migrate bool
	// Logger is used for logging warnings and errors. If nil, a default logger will be used.
	Logger *slog.Logger
	// MaxEnqueuePerBatch limits the maximum number of plans that can be enqueued in a single transaction.
	// If the number of plans exceeds this limit, they will be split into multiple transactions.
	// If less than or equal to 0, DefaultMaxEnqueuePerBatch will be used.
	MaxEnqueuePerBatch int
}
BusOptions configures the Bus implementation.
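The splitting behavior described for MaxEnqueuePerBatch can be pictured as chunking a slice of plans. The sketch below is a generic illustration (Go 1.18+); chunk is a hypothetical helper, and the package's actual batching and transaction handling are not shown in this documentation.

```go
package main

import "fmt"

// defaultMaxEnqueuePerBatch mirrors the documented default of 100.
const defaultMaxEnqueuePerBatch = 100

// chunk splits items into consecutive batches of at most maxPerBatch
// elements. A non-positive maxPerBatch falls back to the default.
// Hypothetical helper; each batch would correspond to one transaction.
func chunk[T any](items []T, maxPerBatch int) [][]T {
	if maxPerBatch <= 0 {
		maxPerBatch = defaultMaxEnqueuePerBatch
	}
	var batches [][]T
	for start := 0; start < len(items); start += maxPerBatch {
		end := start + maxPerBatch
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	plans := make([]int, 250) // 250 placeholder plans
	for _, b := range chunk(plans, 0) {
		fmt.Println(len(b)) // prints 100, 100, 50 on separate lines
	}
}
```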
type ConsumeOption ¶
type ConsumeOption func(*ConsumeOptions)
ConsumeOption represents an option for customizing a worker.
func WithBackoff ¶
func WithBackoff(b backoff.BackOff) ConsumeOption
WithBackoff sets the backoff strategy for reconnection.
func WithWorkerConfig ¶
func WithWorkerConfig(config WorkerConfig) ConsumeOption
WithWorkerConfig sets the worker configuration for a worker.
type ConsumeOptions ¶
type ConsumeOptions struct {
	// WorkerConfig contains the performance-related settings for a worker.
	WorkerConfig WorkerConfig
	// Backoff configures the reconnection backoff strategy.
	// If nil, DefaultConsumeBackOffFactory() will be used.
	Backoff backoff.BackOff
}
ConsumeOptions holds all the options for creating a worker.
type Dialect ¶
type Dialect interface {
	// Migrate performs database migrations to initialize required tables.
	Migrate(ctx context.Context) error
	// GoQue returns the underlying GoQue instance.
	GoQue() que.Queue
	// BySubject finds all subscriptions with patterns matching the given subject.
	BySubject(ctx context.Context, subject string) ([]Subscription, error)
	// ByQueue returns all subscriptions for a specific queue.
	ByQueue(ctx context.Context, queue string) ([]Subscription, error)
	// Upsert creates or updates a subscription.
	Upsert(ctx context.Context, queue, pattern string, planConfig PlanConfig) (Subscription, error)
	// Delete removes a subscription.
	Delete(ctx context.Context, queue, pattern string) error
	// BeginTx starts a transaction.
	BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
}
Dialect defines the interface for database-specific implementations of the message bus. It abstracts storage operations and allows for different backend databases to be used.
type Inbound ¶
type Inbound struct {
	// Job is the job that received the message.
	que.Job `json:"-"`
	// Message is the message content.
	Message
	// Pattern is the pattern this message matches against.
	Pattern string `json:"pattern"`
}
func InboundFromArgs ¶
type Message ¶
type Message struct {
	// Subject is the topic this message is published to.
	Subject string `json:"subject"`
	// Header is the header of the message.
	Header Header `json:"header,omitempty"`
	// Payload is the actual content of the message.
	Payload []byte `json:"payload,omitempty"`
}
Message represents a message in the publish-subscribe system.
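Because Payload is a []byte, encoding/json serializes it as a base64 string, and the omitempty tags drop an empty header or payload. The sketch below shows this with a local mirror of the struct. The header type is an assumption (a plain string map), since the definition of Header is not shown here, and encode is a hypothetical helper rather than the package's ToRaw.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// header is an assumed shape for bus.Header; the real type may differ.
type header map[string]string

// message mirrors the documented fields and JSON tags of bus.Message.
type message struct {
	Subject string `json:"subject"`
	Header  header `json:"header,omitempty"`
	Payload []byte `json:"payload,omitempty"`
}

// encode marshals m to JSON. Hypothetical helper for illustration.
func encode(m message) string {
	b, err := json.Marshal(m)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(encode(message{Subject: "orders.created", Payload: []byte("hi")}))
	// {"subject":"orders.created","payload":"aGk="}
}
```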
func (*Message) ToRaw ¶
func (m *Message) ToRaw() json.RawMessage
type PlanConfig ¶
type PlanConfig struct {
	// RetryPolicy defines how to retry failed job executions.
	RetryPolicy que.RetryPolicy `json:"retryPolicy"`
	// RunAtDelta specifies the duration to delay job execution from the time of message receipt.
	// Zero means execute immediately, positive values mean delayed execution.
	RunAtDelta time.Duration `json:"runAtDelta"`
	// UniqueLifecycle controls the uniqueness behavior of the job.
	UniqueLifecycle que.UniqueLifecycle `json:"uniqueLifecycle"`
}
PlanConfig defines how a queue processes messages for a specific subject pattern.
func (PlanConfig) Equal ¶
func (p PlanConfig) Equal(other PlanConfig) bool
Equal compares this PlanConfig with another and returns true if they are equivalent.
type PublishOption ¶
type PublishOption func(*PublishOptions)
func WithHeader ¶
func WithHeader(header Header) PublishOption
func WithUniqueID ¶
func WithUniqueID(uniqueID string) PublishOption
type PublishOptions ¶
type Queue ¶
type Queue interface {
	io.Closer
	// Subscribe registers the queue to receive messages published to subjects matching the pattern.
	// Pattern supports NATS-style wildcards: '*' for a single token, '>' for multiple trailing tokens.
	Subscribe(ctx context.Context, pattern string, opts ...SubscribeOption) (Subscription, error)
	// Subscriptions returns all subscriptions for the queue.
	Subscriptions(ctx context.Context) ([]Subscription, error)
	// Consume starts a worker that processes messages delivered to the queue using the given handler.
	Consume(ctx context.Context, handler Handler, opts ...ConsumeOption) error
}
Queue represents a message queue that can subscribe to subjects.
type QueueImpl ¶
type QueueImpl struct {
// contains filtered or unexported fields
}
QueueImpl implements the Queue interface.
func (*QueueImpl) Subscribe ¶
func (q *QueueImpl) Subscribe(ctx context.Context, pattern string, opts ...SubscribeOption) (Subscription, error)
Subscribe registers the queue to receive messages published to subjects matching the pattern.
func (*QueueImpl) Subscriptions ¶
func (q *QueueImpl) Subscriptions(ctx context.Context) ([]Subscription, error)
Subscriptions returns all subscriptions for the queue.
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
SubscribeOption represents an option for customizing a subscription.
func WithPlanConfig ¶
func WithPlanConfig(config PlanConfig) SubscribeOption
WithPlanConfig sets the job configuration for a subscription.
type SubscribeOptions ¶
type SubscribeOptions struct {
	// PlanConfig contains the settings for job execution.
	PlanConfig PlanConfig
}
SubscribeOptions holds all the options for creating a subscription.
type Subscription ¶
type Subscription interface {
	// ID returns the unique identifier of the subscription.
	ID() int64
	// Queue returns the name of the queue that receives messages.
	Queue() string
	// Pattern returns the subject pattern this subscription matches against.
	Pattern() string
	// PlanConfig returns the configuration plan for this subscription.
	PlanConfig() PlanConfig
	// Unsubscribe removes this subscription.
	Unsubscribe(ctx context.Context) error
}
Subscription represents an active subscription that can be unsubscribed.
type WorkerConfig ¶
type WorkerConfig struct {
	// MaxLockPerSecond is maximum frequency of calls to Lock() method of Queue.
	// Lower number uses lower database CPU resources.
	MaxLockPerSecond float64 `json:"maxLockPerSecond"`
	// MaxBufferJobsCount is maximum number of jobs in channel that are waiting for
	// a goroutine to execute them.
	MaxBufferJobsCount int `json:"maxBufferJobsCount"`
	// MaxPerformPerSecond is maximum frequency of Perform executions.
	MaxPerformPerSecond float64 `json:"maxPerformPerSecond"`
	// MaxConcurrentPerformCount is maximum number of goroutines executing Perform simultaneously.
	MaxConcurrentPerformCount int `json:"maxConcurrentPerformCount"`
}
WorkerConfig defines performance-related configuration for workers processing messages.
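A limit such as MaxConcurrentPerformCount is commonly enforced with a counting semaphore. The sketch below illustrates that general technique with a buffered channel. runBounded is a hypothetical helper, and how the underlying worker actually enforces its limits is not described in this documentation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runBounded runs perform for each job ID while allowing at most limit
// invocations at once, and returns the peak concurrency it observed.
// Hypothetical sketch of a semaphore-based concurrency cap.
func runBounded(jobs, limit int, perform func(id int)) int {
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	var mu sync.Mutex
	running, peak := 0, 0

	for i := 0; i < jobs; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit jobs are in flight
		go func(id int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			perform(id)

			mu.Lock()
			running--
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runBounded(20, 3, func(int) { time.Sleep(time.Millisecond) })
	fmt.Println(peak <= 3) // true
}
```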