bus

package module
v0.0.0-...-a68e852 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2025 License: MIT Imports: 20 Imported by: 0

README

go-bus

中文版本

A simple and reliable PostgreSQL-based publish/subscribe message bus system for Go applications. Built on top of github.com/tnclong/go-que.

Features

Flexible topic subscription patterns: Support for NATS-style topic matching with exact matching, single-level wildcards (*), and multi-level wildcards (>)
Persistent message queue: PostgreSQL-based storage ensures reliable message delivery
Multiple queue support: Multiple queues can subscribe to the same topic patterns
Custom retry strategies: Each subscription can configure its own message processing retry strategy
Message header support: Support for custom message metadata
Context propagation: Full integration with Go's context package

Installation

go get github.com/qor5/go-bus

For proper functionality, you need to add the following replace directive to your go.mod file:

replace github.com/tnclong/go-que => github.com/molon/go-que v0.0.0-20250417171457-4715a14d4ddb

Quick Start

Creating a Bus Instance
import (
    "database/sql"
    "github.com/qor5/go-bus/pgbus"
    _ "github.com/lib/pq"
)

// Connect to PostgreSQL
db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
if err != nil {
    log.Fatalf("Failed to connect to database: %v", err)
}

// Create a new bus instance
bus, err := pgbus.New(db)
if err != nil {
    log.Fatalf("Failed to create bus: %v", err)
}
defer bus.Close()
Creating Subscriptions
ctx := context.Background()

// Get a queue
queue := bus.Queue("my_service_queue")

// Create subscriptions - supporting various patterns
exactSub, err := queue.Subscribe(ctx, "orders.created")                // Exact match
wildcardSub, err := queue.Subscribe(ctx, "products.*.category.*.info") // Single-level wildcard at multiple positions
multiLevelSub, err := queue.Subscribe(ctx, "notifications.>")          // Multi-level wildcard

// Subscription with custom configuration
customPlan := bus.PlanConfig{
    RunAtDelta: 200 * time.Millisecond,
    RetryPolicy: que.RetryPolicy{
        InitialInterval:        2 * time.Second,
        MaxInterval:            20 * time.Second,
        NextIntervalMultiplier: 2.0,
        IntervalRandomPercent:  20,
        MaxRetryCount:          5,
    },
}

customSub, err := queue.Subscribe(ctx, "payments.processed", bus.WithPlanConfig(customPlan))
Publishing Messages
// Basic publish
err = bus.Publish(ctx, "orders.created", []byte(`{"id": "12345", "total": 99.99}`))

// Publishing with unique ID (for deduplication)
err = bus.Publish(ctx, "orders.created", []byte(`{"id": "12345", "total": 99.99}`), bus.WithUniqueID("order-12345-created-1"))

// Publishing with headers
err = bus.Publish(ctx, "orders.created", []byte(`{"id": "12345", "total": 99.99}`), bus.WithHeader(bus.Header{
    "Content-Type": []string{"application/json"},
    "X-Request-ID": []string{"req-123456"},
}))

// Publishing with an Outbound object
outbound := &bus.Outbound{
    Message: bus.Message{
        Subject: "orders.created",
        Header:  bus.Header{"Content-Type": []string{"application/json"}},
        Payload: []byte(`{"id": "12345", "total": 99.99}`),
    },
    UniqueID: "order-12345-created-event", // Optional unique ID for message deduplication
}
err = bus.Dispatch(ctx, outbound)

// Publishing multiple messages at once
outbound1 := &bus.Outbound{
    Message: bus.Message{
        Subject: "orders.created",
        Payload: []byte(`{"id": "12345", "total": 99.99}`),
    },
    UniqueID: "order-12345-created",
}
outbound2 := &bus.Outbound{
    Message: bus.Message{
        Subject: "notifications.sent",
        Payload: []byte(`{"user_id": "user123", "message": "Your order has been created"}`),
    },
    UniqueID: "notification-user123-order-created",
}
// Dispatch supports publishing multiple outbound messages in a single call
err = bus.Dispatch(ctx, outbound1, outbound2)
Consuming Messages
// Basic consumption
err = queue.Consume(ctx, func(ctx context.Context, msg *bus.Inbound) error {
    fmt.Printf("Received message: subject=%s, payload=%s\n", msg.Subject, string(msg.Payload))

    // Reading headers
    if contentType := msg.Header.Get("Content-Type"); contentType != "" {
        fmt.Printf("Content-Type: %s\n", contentType)
    }

    // Mark message as done after processing
    return msg.Done(ctx)
})

// Consumption with custom worker configuration
workerConfig := bus.WorkerConfig{
    MaxLockPerSecond:          5,
    MaxBufferJobsCount:        10,
    MaxPerformPerSecond:       5,
    MaxConcurrentPerformCount: 2,
}

err = queue.Consume(ctx, func(ctx context.Context, msg *bus.Inbound) error {
    // Process message...

    // If you want to discard the message, use Destroy instead of Done
    return msg.Destroy(ctx)
}, bus.WithWorkerConfig(workerConfig))
Finding Matching Subscriptions
// Find subscriptions matching a specific subject
subs, err := bus.BySubject(ctx, "orders.created")
for _, sub := range subs {
    fmt.Printf("Queue %s matches with pattern %s\n", sub.Queue(), sub.Pattern())
}

// Get all subscriptions for a specific queue
queueSubs, err := queue.Subscriptions(ctx)
for _, sub := range queueSubs {
    fmt.Printf("Pattern: %s, Created at: %s\n", sub.Pattern(), sub.CreatedAt())
}
Unsubscribing
// Unsubscribe from a specific subscription
err = subscription.Unsubscribe(ctx)

Advanced Usage

Using Random Queue Names for Distributed Broadcast Reception

In distributed environments (like Kubernetes), when you need to ensure that each instance receives the same broadcast message, you can create queues with random names for each instance. This way, each instance will independently receive the same message, achieving a broadcast effect.

import (
    "github.com/google/uuid"
    "github.com/qor5/go-bus/pgbus"
    "context"
)

// Create a unique queue name for each service instance (like a K8s Pod)
instanceQueueName := fmt.Sprintf("broadcast-receiver-%s", uuid.New().String())
instanceQueue := bus.Queue(instanceQueueName)

// Subscribe to broadcast topics
sub, err := instanceQueue.Subscribe(ctx, "broadcast.events.>")
if err != nil {
    log.Fatalf("Failed to create broadcast subscription: %v", err)
}

// Start consuming messages (starts workers but doesn't block)
err = instanceQueue.Consume(context.Background(), func(ctx context.Context, msg *bus.Inbound) error {
    log.Printf("Instance %s received broadcast message: %s - %s",
        instanceQueueName, msg.Subject, string(msg.Payload))
    return msg.Done(ctx)
})
if err != nil {
    log.Printf("Failed to start consumer: %v", err)
}

// Clean up resources on service shutdown
shutdown := func() {
    ctx := context.Background()
    if err := sub.Unsubscribe(ctx); err != nil {
        log.Printf("Failed to unsubscribe: %v", err)
    }
    // Other cleanup work...
}

// Register shutdown hooks
// e.g., signal.Notify(...) or k8s graceful shutdown handling

This pattern is particularly useful for:

  • Broadcasting configuration changes or system notifications to all service instances
  • Ensuring each instance in the cluster independently processes the same message, implementing a reliable broadcast mechanism
  • Implementing event-driven system-wide notifications in microservice architectures

Each instance creates a queue with a unique name, so each message is processed independently by each subscribed instance, achieving a true broadcast effect.

Topic Pattern Explanation

go-bus supports three types of topic matching patterns, following the NATS messaging system style:

  1. Exact Match: Matches the exact topic string

    • Example: orders.created only matches orders.created
  2. Single-Level Wildcard (*): Matches any string in a single level

    • Example: products.*.category.*.info matches products.xyz.category.abc.info and products.123.category.456.info, but not products.category.info or products.xyz.category.abc.def.info
  3. Multi-Level Wildcard (>): Matches zero or more levels

    • Example: orders.> matches orders.created, orders.updated, and orders.items.created

Important Notes

Avoid Overlapping Subscription Patterns

Do not subscribe to potentially overlapping patterns in the same queue. When a message matches multiple subscriptions in a queue, the system will only use the configuration (such as retry strategy) from the earliest created subscription and ignore others.

Problem Example

Suppose you create these two subscriptions in the same queue:

// First created subscription - using default configuration
sub1, err := queue.Subscribe(ctx, "orders.>")

// Later created subscription - with custom retry strategy
customPlan := bus.PlanConfig{
    RetryPolicy: que.RetryPolicy{
        MaxRetryCount: 10,
        // Other configurations...
    },
}
sub2, err := queue.Subscribe(ctx, "orders.created", bus.WithPlanConfig(customPlan))

When publishing a message with the subject orders.created:

  • The message matches both patterns: orders.> and orders.created
  • The system will use the configuration from sub1 (the earlier created orders.> subscription)
  • The custom retry strategy in sub2 (MaxRetryCount: 10) will be ignored
Correct Approach

To avoid this issue, use different queues for potentially overlapping patterns:

// First queue handles general orders events
queue1 := bus.Queue("orders_general_queue")
sub1, err := queue1.Subscribe(ctx, "orders.>")

// Second queue specifically handles orders.created events with custom configuration
queue2 := bus.Queue("orders_created_queue")
customPlan := bus.PlanConfig{
    RetryPolicy: que.RetryPolicy{
        MaxRetryCount: 10,
        // Other configurations...
    },
}
sub2, err := queue2.Subscribe(ctx, "orders.created", bus.WithPlanConfig(customPlan))

This way, the two subscriptions will process messages independently in their respective queues, and each configuration will be effective.

License

This project is licensed under the MIT License.

Acknowledgments

This project is based on github.com/tnclong/go-que - a high-performance PostgreSQL backend job queue.

Documentation

Overview

Package bus implements a publish-subscribe pattern on top of go-que.

Package bus implements a publish-subscribe pattern on top of go-que. It allows publishing messages to subjects which can be subscribed to by multiple queues. The subject matching follows NATS-style wildcards pattern.

Package bus implements a publish-subscribe pattern on top of go-que. It allows publishing messages to subjects which can be subscribed to by multiple queues. The subject matching follows NATS-style wildcards pattern: - '*' matches any single token in a subject (e.g., "foo.*.baz" matches "foo.bar.baz") - '>' matches one or more tokens at the end of a subject (e.g., "foo.>" matches "foo.bar" and "foo.bar.baz")

The package uses a pluggable architecture with the Dialect interface to support different database backends. A PostgreSQL implementation is provided in the pgbus subpackage.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidQueue indicates that the queue name is invalid.
	ErrInvalidQueue = errors.New("queue is invalid")

	// ErrInvalidMessage indicates that the message is invalid.
	ErrInvalidMessage = errors.New("message is invalid")

	// ErrInvalidSubject indicates that the subject is invalid.
	ErrInvalidSubject = errors.New("subject is invalid")

	// ErrInvalidPattern indicates that the pattern is invalid.
	ErrInvalidPattern = errors.New("pattern is invalid")

	// ErrQueueClosed indicates an operation was attempted on a closed queue.
	ErrQueueClosed = errors.New("queue is closed")

	// ErrBusClosed indicates an operation was attempted on a closed bus.
	ErrBusClosed = errors.New("bus is closed")

	// ErrSubscriptionNotFound is returned when no subscription is found.
	ErrSubscriptionNotFound = errors.New("subscription not found")

	// ErrWorkerAlreadyRunning is returned when a worker is already processing messages.
	ErrWorkerAlreadyRunning = errors.New("worker already running for this queue")

	// ErrOverlappingPatterns is returned when a queue has multiple patterns matching the same subject.
	ErrOverlappingPatterns = errors.New("queue has overlapping patterns which may cause unexpected behavior; only the first subscription will be triggered")
)

Common errors returned by the bus package.

View Source
var DefaultConsumeBackOffFactory = func() backoff.BackOff {
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.InitialInterval = 1 * time.Second
	expBackoff.MaxInterval = 30 * time.Second
	expBackoff.Multiplier = 2.0
	return expBackoff
}

DefaultConsumeBackOffFactory generates the default backoff strategy for worker reconnection.

View Source
var DefaultMaxEnqueuePerBatch = 100

DefaultMaxEnqueuePerBatch defines the default maximum number of plans that can be enqueued in a single transaction.

View Source
var DefaultPlanConfig = PlanConfig{
	RetryPolicy:     DefaultRetryPolicy,
	RunAtDelta:      0,
	UniqueLifecycle: que.Ignore,
}

DefaultPlanConfig provides default settings for subscription jobs.

View Source
var DefaultRetryPolicy = que.RetryPolicy{
	InitialInterval:        30 * time.Second,
	MaxInterval:            600 * time.Second,
	NextIntervalMultiplier: 2,
	IntervalRandomPercent:  33,
	MaxRetryCount:          3,
}

DefaultRetryPolicy provides a default retry policy for published messages.

View Source
var DefaultWorkerConfig = WorkerConfig{
	MaxLockPerSecond:          5,
	MaxBufferJobsCount:        0,
	MaxPerformPerSecond:       1000,
	MaxConcurrentPerformCount: 200,
}

DefaultWorkerConfig provides default settings for workers.

View Source
var UniqueIDGenerator = func() string {
	return "_gobus_:" + xid.New().String()
}

Functions

func ArgsFromMessageRaw

func ArgsFromMessageRaw(msgRaw json.RawMessage, pattern string) []byte

func ToRegexPattern

func ToRegexPattern(pattern string) (string, error)

ToRegexPattern converts a NATS-style wildcard pattern to a regex pattern that follows NATS specification for subject matching. It returns an error if the pattern contains invalid characters or structure.

func ValidatePattern

func ValidatePattern(pattern string) error

ValidatePattern validates that a pattern follows NATS wildcard rules. It checks for:

  • Empty patterns are not allowed
  • Empty tokens (segments between dots) are not allowed
  • Only lowercase alphanumeric characters, '_', and '-' are allowed in non-wildcard tokens
  • Wildcards can be '*' (single token) or '>' (multiple tokens)
  • The '>' wildcard can only appear at the end of a pattern

func ValidateSubject

func ValidateSubject(subject string) error

ValidateSubject validates that a subject follows NATS subject rules. It checks for empty subjects, invalid characters, subjects starting or ending with a dot, and empty tokens.

Types

type Bus

type Bus interface {
	io.Closer

	// Queue returns a queue with the specified name.
	Queue(name string) Queue

	// Publish sends a payload to all queues with subscriptions matching the subject.
	Publish(ctx context.Context, subject string, payload []byte, opts ...PublishOption) error

	// Dispatch sends outbound messages to all queues with subscriptions matching the subject.
	// All messages are processed in a single transaction.
	Dispatch(ctx context.Context, msgs ...*Outbound) error

	// BySubject returns all subscriptions with patterns matching the given subject.
	BySubject(ctx context.Context, subject string) ([]Subscription, error)
}

Bus provides publish-subscribe capabilities on top of go-que. It manages subject-queue mappings and handles subject pattern matching.

func New

func New(dialect Dialect, opts ...BusOption) (Bus, error)

New creates a new Bus instance with the given dialect and options.

dialect is the database dialect used for storing subscriptions. Different database backends can be supported by implementing this interface. A PostgreSQL implementation is provided in the pgbus package.

type BusImpl

type BusImpl struct {
	// contains filtered or unexported fields
}

BusImpl is a generic implementation of the Bus interface.

func (*BusImpl) BySubject

func (b *BusImpl) BySubject(ctx context.Context, subject string) ([]Subscription, error)

BySubject returns all subscriptions with patterns matching the given subject.

func (*BusImpl) Close

func (b *BusImpl) Close() error

Close closes the bus and releases any associated resources.

func (*BusImpl) Dispatch

func (b *BusImpl) Dispatch(ctx context.Context, msgs ...*Outbound) (xerr error)

Dispatch sends outbound messages to all queues with subscriptions matching the subject. All messages are processed in a single transaction.

func (*BusImpl) Publish

func (b *BusImpl) Publish(ctx context.Context, subject string, payload []byte, opts ...PublishOption) error

Publish sends a payload to all queues with subscriptions matching the subject.

func (*BusImpl) Queue

func (b *BusImpl) Queue(name string) Queue

Queue returns a queue with the specified name.

type BusOption

type BusOption func(*BusOptions)

func WithLogger

func WithLogger(logger *slog.Logger) BusOption

WithLogger sets the logger for the Bus implementation.

func WithMaxEnqueuePerBatch

func WithMaxEnqueuePerBatch(max int) BusOption

WithMaxEnqueuePerBatch sets the maximum number of plans that can be enqueued in a single transaction. If less than or equal to 0, DefaultMaxEnqueuePerBatch will be used.

func WithMigrate

func WithMigrate(migrate bool) BusOption

WithMigrate sets whether database migrations should be run during initialization.

type BusOptions

type BusOptions struct {
	// Migrate controls whether database migrations are run during initialization.
	Migrate bool

	// Logger is used for logging warnings and errors. If nil, a default logger will be used.
	Logger *slog.Logger

	// MaxEnqueuePerBatch limits the maximum number of plans that can be enqueued in a single transaction.
	// If the number of plans exceeds this limit, they will be split into multiple transactions.
	// If less than or equal to 0, DefaultMaxEnqueuePerBatch will be used.
	MaxEnqueuePerBatch int
}

BusOptions configures the Bus implementation.

type ConsumeOption

type ConsumeOption func(*ConsumeOptions)

ConsumeOption represents an option for customizing a worker.

func WithBackoff

func WithBackoff(b backoff.BackOff) ConsumeOption

WithBackoff sets the backoff strategy for reconnection.

func WithWorkerConfig

func WithWorkerConfig(config WorkerConfig) ConsumeOption

WithWorkerConfig sets the worker configuration for a worker.

type ConsumeOptions

type ConsumeOptions struct {
	// WorkerConfig contains the performance-related settings for a worker.
	WorkerConfig WorkerConfig

	// Backoff configures the reconnection backoff strategy.
	// If nil, DefaultBackOffFactory() will be used.
	Backoff backoff.BackOff
}

ConsumeOptions holds all the options for creating a worker.

type Dialect

type Dialect interface {
	// Migrate performs database migrations to initialize required tables.
	Migrate(ctx context.Context) error

	// GoQue returns the underlying GoQue instance.
	GoQue() que.Queue

	// BySubject finds all subscriptions with patterns matching the given subject.
	BySubject(ctx context.Context, subject string) ([]Subscription, error)

	// ByQueue returns all subscriptions for a specific queue.
	ByQueue(ctx context.Context, queue string) ([]Subscription, error)

	// Upsert creates or updates a subscription.
	Upsert(ctx context.Context, queue, pattern string, planConfig PlanConfig) (Subscription, error)

	// Delete removes a subscription.
	Delete(ctx context.Context, queue, pattern string) error

	// BeginTx starts a transaction.
	BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
}

Dialect defines the interface for database-specific implementations of the message bus. It abstracts storage operations and allows for different backend databases to be used.

type Handler

type Handler func(ctx context.Context, msg *Inbound) error

Handler represents a function that processes messages.

type Header = http.Header

type Inbound

type Inbound struct {
	// Job is the job that received the message.
	que.Job `json:"-"`

	// Message is the message content.
	Message

	// Pattern is the pattern this message matches against.
	Pattern string `json:"pattern"`
}

func InboundFromArgs

func InboundFromArgs(args []byte) (*Inbound, error)

type Message

type Message struct {
	// Subject is the topic this message is published to.
	Subject string `json:"subject"`

	// Header is the header of the message.
	Header Header `json:"header,omitempty"`

	// Payload is the actual content of the message.
	Payload []byte `json:"payload,omitempty"`
}

Message represents a message in the publish-subscribe system.

func (*Message) ToRaw

func (m *Message) ToRaw() json.RawMessage

type Outbound

type Outbound struct {
	// Message is the message content.
	Message

	// UniqueID is the unique identifier for this message.
	// If set, it will be used for message deduplication.
	UniqueID string `json:"uniqueId,omitempty"`
}

type PlanConfig

type PlanConfig struct {
	// RetryPolicy defines how to retry failed job executions.
	RetryPolicy que.RetryPolicy `json:"retryPolicy"`

	// RunAtDelta specifies the duration to delay job execution from the time of message receipt.
	// Zero means execute immediately, positive values mean delayed execution.
	RunAtDelta time.Duration `json:"runAtDelta"`

	// UniqueLifecycle controls the uniqueness behavior of the job.
	UniqueLifecycle que.UniqueLifecycle `json:"uniqueLifecycle"`
}

PlanConfig defines how a queue processes messages for a specific subject pattern.

func (PlanConfig) Equal

func (p PlanConfig) Equal(other PlanConfig) bool

Equal compares this PlanConfig with another and returns true if they are equivalent.

type PublishOption

type PublishOption func(*PublishOptions)

func WithHeader

func WithHeader(header Header) PublishOption

func WithUniqueID

func WithUniqueID(uniqueID string) PublishOption

type PublishOptions

type PublishOptions struct {
	Header   Header
	UniqueID string
}

type Queue

type Queue interface {
	io.Closer

	// Subscribe registers the queue to receive messages published to subjects matching the pattern.
	// Pattern supports NATS-style wildcards: '*' for a single token, '>' for multiple trailing tokens.
	Subscribe(ctx context.Context, pattern string, opts ...SubscribeOption) (Subscription, error)

	// Subscriptions returns all subscriptions for the queue.
	Subscriptions(ctx context.Context) ([]Subscription, error)

	// Consume registers the queue to receive messages published to subjects matching the pattern.
	Consume(ctx context.Context, handler Handler, opts ...ConsumeOption) error
}

Queue represents a message queue that can subscribe to subjects.

type QueueImpl

type QueueImpl struct {
	// contains filtered or unexported fields
}

QueueImpl implements the Queue interface.

func (*QueueImpl) Close

func (q *QueueImpl) Close() error

Close closes the queue and stops any running workers.

func (*QueueImpl) Consume

func (q *QueueImpl) Consume(_ context.Context, handler Handler, opts ...ConsumeOption) error

Consume registers a worker to process messages for this queue.

func (*QueueImpl) Subscribe

func (q *QueueImpl) Subscribe(ctx context.Context, pattern string, opts ...SubscribeOption) (Subscription, error)

Subscribe registers the queue to receive messages published to subjects matching the pattern.

func (*QueueImpl) Subscriptions

func (q *QueueImpl) Subscriptions(ctx context.Context) ([]Subscription, error)

Subscriptions returns all subscriptions for the queue.

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

SubscribeOption represents an option for customizing a subscription.

func WithPlanConfig

func WithPlanConfig(config PlanConfig) SubscribeOption

WithPlanConfig sets the job configuration for a subscription.

type SubscribeOptions

type SubscribeOptions struct {
	// PlanConfig contains the settings for job execution.
	PlanConfig PlanConfig
}

SubscribeOptions holds all the options for creating a subscription.

type Subscription

type Subscription interface {
	// ID returns the unique identifier of the subscription.
	ID() int64

	// Queue returns the name of the queue that receives messages.
	Queue() string

	// Pattern returns the subject pattern this subscription matches against.
	Pattern() string

	// PlanConfig returns the configuration plan for this subscription.
	PlanConfig() PlanConfig

	// Unsubscribe removes this subscription.
	Unsubscribe(ctx context.Context) error
}

Subscription represents an active subscription that can be unsubscribed.

type WorkerConfig

type WorkerConfig struct {
	// MaxLockPerSecond is maximum frequency of calls to Lock() method of Queue.
	// Lower number uses lower database CPU resources.
	MaxLockPerSecond float64 `json:"maxLockPerSecond"`

	// MaxBufferJobsCount is maximum number of jobs in channel that are waiting for
	// a goroutine to execute them.
	MaxBufferJobsCount int `json:"maxBufferJobsCount"`

	// MaxPerformPerSecond is maximum frequency of Perform executions.
	MaxPerformPerSecond float64 `json:"maxPerformPerSecond"`

	// MaxConcurrentPerformCount is maximum number of goroutines executing Perform simultaneously.
	MaxConcurrentPerformCount int `json:"maxConcurrentPerformCount"`
}

WorkerConfig defines performance-related configuration for workers processing messages.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL