Documentation
¶
Index ¶
- Constants
- Variables
- func SettingsToURI(settings *ConnectionSettings) string
- type AMQPError
- type Action
- type Binding
- type BindingOptions
- type Config
- type Connection
- func (c *Connection) Close() error
- func (c *Connection) DecodeDeliveryBody(delivery Delivery, v any) error
- func (c *Connection) InspectQueue(name string) (*QueueInfo, error)
- func (c *Connection) Name() string
- func (c *Connection) NotifyErrors() <-chan error
- func (c *Connection) Recover() error
- func (c *Connection) RemoveBinding(queueName, routingKey, exchangeName string, args Table) error
- func (c *Connection) RemoveExchange(name string, ifUnused, noWait bool) error
- func (c *Connection) RemoveQueue(name string, ifUnused, ifEmpty, noWait bool) (int, error)
- func (c *Connection) Renew(uri ...string) error
- func (c *Connection) SetBackOffFactor(factor int)
- func (c *Connection) SetDecoder(decoder JSONDecoder)
- func (c *Connection) SetEncoder(encoder JSONEncoder)
- func (c *Connection) SetLoggers(loggers ...Logger)
- func (c *Connection) SetMaxRecoveryRetries(maxRetries int)
- func (c *Connection) SetRecoveryInterval(interval time.Duration)
- func (c *Connection) SetReturnHandler(returnHandler ReturnHandler)
- type ConnectionOption
- func WithConnectionOptionAMQPConfig(config *Config) ConnectionOption
- func WithConnectionOptionBackOffFactor(factor int) ConnectionOption
- func WithConnectionOptionConnectionName(name string) ConnectionOption
- func WithConnectionOptionDecoder(decoder JSONDecoder) ConnectionOption
- func WithConnectionOptionEncoder(encoder JSONEncoder) ConnectionOption
- func WithConnectionOptionLoggers(loggers ...Logger) ConnectionOption
- func WithConnectionOptionMaxRecoveryRetries(maxRetries int) ConnectionOption
- func WithConnectionOptionPrefetchCount(count int) ConnectionOption
- func WithConnectionOptionRecoveryInterval(interval time.Duration) ConnectionOption
- func WithConnectionOptionReturnHandler(returnHandler ReturnHandler) ConnectionOption
- func WithCustomConnectionOptions(options *ConnectionOptions) ConnectionOption
- type ConnectionOptions
- type ConnectionSettings
- type ConsumeOption
- func WithBindingOptionCustomBinding(binding Binding) ConsumeOption
- func WithConsumerOptionConsumeAfterCreation(consumeAfterCreation bool) ConsumeOption
- func WithConsumerOptionConsumerAutoAck(autoAck bool) ConsumeOption
- func WithConsumerOptionConsumerExclusive(exclusive bool) ConsumeOption
- func WithConsumerOptionConsumerName(consumerName string) ConsumeOption
- func WithConsumerOptionDeadLetterRetry(options *RetryOptions) ConsumeOption
- func WithConsumerOptionHandlerQuantity(concurrency int) ConsumeOption
- func WithConsumerOptionNoWait(noWait bool) ConsumeOption
- func WithConsumerOptionRoutingKey(routingKey string) ConsumeOption
- func WithCustomConsumeOptions(options *ConsumeOptions) ConsumeOption
- func WithExchangeOptionArgs(args Table) ConsumeOption
- func WithExchangeOptionAutoDelete(autoDelete bool) ConsumeOption
- func WithExchangeOptionDeclare(declare bool) ConsumeOption
- func WithExchangeOptionDurable(durable bool) ConsumeOption
- func WithExchangeOptionInternal(internal bool) ConsumeOption
- func WithExchangeOptionKind(kind string) ConsumeOption
- func WithExchangeOptionName(name string) ConsumeOption
- func WithExchangeOptionNoWait(noWait bool) ConsumeOption
- func WithExchangeOptionPassive(passive bool) ConsumeOption
- func WithQueueOptionArgs(args Table) ConsumeOption
- func WithQueueOptionAutoDelete(autoDelete bool) ConsumeOption
- func WithQueueOptionDeclare(declare bool) ConsumeOption
- func WithQueueOptionDurable(durable bool) ConsumeOption
- func WithQueueOptionExclusive(exclusive bool) ConsumeOption
- func WithQueueOptionNoWait(noWait bool) ConsumeOption
- func WithQueueOptionPassive(passive bool) ConsumeOption
- func WithQueueOptionPriority(maxPriority Priority) ConsumeOption
- func WithQueueOptionQuorum() ConsumeOption
- type ConsumeOptions
- type Consumer
- type ConsumerOptions
- type Delivery
- type DeliveryMode
- type ExchangeOptions
- type HandlerFunc
- type JSONDecoder
- type JSONEncoder
- type Logger
- type MaxRetriesExceededHandler
- type Priority
- type PublishOptions
- type Publisher
- func (p *Publisher) Close() error
- func (p *Publisher) Publish(ctx context.Context, target string, data any) error
- func (p *Publisher) PublishCachedMessages(ctx context.Context, cacheLen int) error
- func (p *Publisher) PublishWithOptions(ctx context.Context, targets []string, data any, options ...PublisherOption) error
- type PublisherOption
- func WithCustomPublishOptions(options *PublisherOptions) PublisherOption
- func WithPublishOptionAppID(appID string) PublisherOption
- func WithPublishOptionContentEncoding(contentEncoding string) PublisherOption
- func WithPublishOptionContentType(contentType string) PublisherOption
- func WithPublishOptionDeliveryMode(deliveryMode DeliveryMode) PublisherOption
- func WithPublishOptionExchange(exchange string) PublisherOption
- func WithPublishOptionExpiration(expiration string) PublisherOption
- func WithPublishOptionHeaders(headers Table) PublisherOption
- func WithPublishOptionMandatory(mandatory bool) PublisherOption
- func WithPublishOptionMessageID(messageID string) PublisherOption
- func WithPublishOptionPriority(priority Priority) PublisherOption
- func WithPublishOptionReplyTo(replyTo string) PublisherOption
- func WithPublishOptionTimestamp(timestamp time.Time) PublisherOption
- func WithPublishOptionTracing(correlationID string) PublisherOption
- func WithPublishOptionType(messageType string) PublisherOption
- func WithPublishOptionUserID(userID string) PublisherOption
- func WithPublisherOptionPublisherName(publisherName string) PublisherOption
- func WithPublisherOptionPublishingCache(cache PublishingCache) PublisherOption
- type PublisherOptions
- type Publishing
- type PublishingCache
- type QueueInfo
- type QueueOptions
- type RecoveryFailedError
- type RetryOptions
- type Return
- type ReturnHandler
- type SlogLogger
- func (s *SlogLogger) Debug(ctx context.Context, msg string, attrs ...any)
- func (s *SlogLogger) Error(ctx context.Context, msg string, err error, attrs ...any)
- func (s *SlogLogger) Info(ctx context.Context, msg string, attrs ...any)
- func (s *SlogLogger) Warn(ctx context.Context, msg string, attrs ...any)
- type Table
Constants ¶
const ( // Constant for RabbitMQ's default exchange (direct exchange). ExchangeDefault string = amqp.DefaultExchange // Constant for standard AMQP 0-9-1 direct exchange type. ExchangeDirect string = amqp.ExchangeDirect // Constant for standard AMQP 0-9-1 fanout exchange type. ExchangeFanout string = amqp.ExchangeFanout // Constant for standard AMQP 0-9-1 topic exchange type. ExchangeTopic string = amqp.ExchangeTopic // Constant for standard AMQP 0-9-1 headers exchange type. ExchangeHeaders string = amqp.ExchangeHeaders )
const ( ArgDLX string = "x-dead-letter-exchange" ArgDLK string = "x-dead-letter-routing-key" ArgTTL string = "x-message-ttl" )
Variables ¶
var ErrCacheNotSet = errors.New("publishing cache is not set")
ErrCacheNotSet occurs when the publishing cache is not set.
var ErrConsumerAlreadyRunning = errors.New("consumer is running")
ErrConsumerAlreadyRunning occurs when an attempt is made to start a consumer that is already running.
var ErrHealthyConnection = errors.New("connection is healthy, no need to recover")
ErrHealthyConnection occurs when a manual recovery is triggered but the connection persists.
var ErrInvalidConnection = errors.New("invalid connection")
ErrInvalidConnection occurs when an invalid connection is passed to a publisher or a consumer.
var ErrMaxRetriesExceeded = errors.New("max retries exceeded")
ErrMaxRetriesExceeded occurs when the maximum number of retries is exceeded.
var ErrNoActiveConnection = errors.New("no active connection to broker")
ErrNoActiveConnection occurs when there is no active connection while trying to get the failed recovery notification channel.
var ErrPublishFailedChannelClosed = errors.New("channel is closed")
ErrPublishFailedChannelClosed occurs when the channel is accessed while being closed.
var ErrPublishFailedChannelClosedCached = errors.New("channel is closed: publishing was cached")
ErrPublishFailedChannelClosedCached occurs when the channel is accessed while being closed but publishing was cached.
Functions ¶
func SettingsToURI ¶
func SettingsToURI(settings *ConnectionSettings) string
SettingsToURI can be used to convert a ConnectionSettings struct to a valid AMQP URI to ensure correct escaping.
Types ¶
type Action ¶
type Action int
Action is an action that is taken after a delivery has been processed.
const ( // Ack default ack this msg after you have successfully processed this delivery. Ack Action = iota // NackDiscard the message will be dropped or delivered to a broker configured dead-letter queue. NackDiscard // NackRequeue deliver this message to a different consumer. NackRequeue // Message acknowledgement is left to the user using the msg.Ack() method. Manual )
type Binding ¶
type Binding struct { *BindingOptions RoutingKey string QueueName string ExchangeName string }
Binding describes the binding of a queue to a routing key to an exchange.
type BindingOptions ¶
type BindingOptions struct { // Are used by plugins and broker-specific features such as message TTL, queue length limit, etc. Args Table // If true, the client does not wait for a reply method. If the broker could not complete the method it will raise a channel or connection exception. NoWait bool // If true, the binding will be declared if it does not already exist. Declare bool }
BindingOptions describes the options a binding can have.
type Config ¶
Config is used in DialConfig and Open to specify the desired tuning parameters used during a connection open handshake. The negotiated tuning will be stored in the returned connection's Config field.
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
func NewConnection ¶
func NewConnection(uri string, options ...ConnectionOption) (*Connection, error)
NewConnection creates a new connection.
Must be closed with the Close() method to conserve resources!
func (*Connection) Close ¶
func (c *Connection) Close() error
Close gracefully closes the connection to the broker.
func (*Connection) DecodeDeliveryBody ¶
func (c *Connection) DecodeDeliveryBody(delivery Delivery, v any) error
DecodeDeliveryBody can be used to decode the body of a delivery into v.
func (*Connection) InspectQueue ¶ added in v2.2.0
func (c *Connection) InspectQueue(name string) (*QueueInfo, error)
func (*Connection) Name ¶
func (c *Connection) Name() string
Name returns the name of the connection if specified, otherwise returns an empty string.
func (*Connection) NotifyErrors ¶
func (c *Connection) NotifyErrors() <-chan error
NotifyErrors returns a channel that delivers errors that happen concurrently.
func (*Connection) Recover ¶
func (c *Connection) Recover() error
Recover can be used to manually start the recovery.
func (*Connection) RemoveBinding ¶
func (c *Connection) RemoveBinding(queueName, routingKey, exchangeName string, args Table) error
RemoveBinding removes a binding between an exchange and queue matching the key and arguments.
It is possible to send an empty string for the exchange name which means to unbind the queue from the default exchange.
func (*Connection) RemoveExchange ¶
func (c *Connection) RemoveExchange(name string, ifUnused, noWait bool) error
RemoveExchange removes the named exchange from the broker. When an exchange is deleted all queue bindings on the exchange are also deleted. If this exchange does not exist, the channel will be closed with an error.
When ifUnused is true, the broker will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the broker does not delete it but closes the channel with an exception instead. Set this to true if you are not the sole owner of the exchange.
When noWait is true, do not wait for a broker confirmation that the exchange has been deleted. Failing to delete the exchange could close the channel. Add a NotifyClose listener to respond to these channel exceptions.
func (*Connection) RemoveQueue ¶
func (c *Connection) RemoveQueue(name string, ifUnused, ifEmpty, noWait bool) (int, error)
RemoveQueue removes the queue from the broker including all bindings then purges the messages based on broker configuration, returning the number of messages purged.
When ifUnused is true, the queue will not be deleted if there are any consumers on the queue. If there are consumers, an error will be returned and the channel will be closed.
When ifEmpty is true, the queue will not be deleted if there are any messages remaining on the queue. If there are messages, an error will be returned and the channel will be closed.
func (*Connection) Renew ¶
func (c *Connection) Renew(uri ...string) error
Renew can be used to establish a new connection. If new URI is provided, it will be used to renew the connection instead of the current URI.
func (*Connection) SetBackOffFactor ¶
func (c *Connection) SetBackOffFactor(factor int)
SetBackOffFactor sets the exponential back-off factor.
Default: 2.
func (*Connection) SetDecoder ¶
func (c *Connection) SetDecoder(decoder JSONDecoder)
SetDecoder provides the possibility to set the JSON decoder.
func (*Connection) SetEncoder ¶
func (c *Connection) SetEncoder(encoder JSONEncoder)
SetEncoder provides the possibility to set the JSON encoder.
func (*Connection) SetLoggers ¶
func (c *Connection) SetLoggers(loggers ...Logger)
SetLoggers provides possibility to add loggers.
func (*Connection) SetMaxRecoveryRetries ¶
func (c *Connection) SetMaxRecoveryRetries(maxRetries int)
SetMaxRecoveryRetries sets the limit for maximum retries.
Default: 10.
func (*Connection) SetRecoveryInterval ¶
func (c *Connection) SetRecoveryInterval(interval time.Duration)
SetRecoveryInterval sets the recovery interval.
Default: 1s.
func (*Connection) SetReturnHandler ¶
func (c *Connection) SetReturnHandler(returnHandler ReturnHandler)
SetReturnHandler provides possibility to add a return handler.
type ConnectionOption ¶
type ConnectionOption func(*ConnectionOptions)
ConnectionOption is an option for a Connection.
func WithConnectionOptionAMQPConfig ¶
func WithConnectionOptionAMQPConfig(config *Config) ConnectionOption
WithConnectionOptionAMQPConfig sets the amqp.Config that will be used to create the connection.
Warning: this will override any values set in the connection config.
func WithConnectionOptionBackOffFactor ¶
func WithConnectionOptionBackOffFactor(factor int) ConnectionOption
WithConnectionOptionBackOffFactor sets the exponential back-off factor.
Default: 2.
func WithConnectionOptionConnectionName ¶
func WithConnectionOptionConnectionName(name string) ConnectionOption
WithConnectionOptionConnectionName sets the name of the connection.
func WithConnectionOptionDecoder ¶
func WithConnectionOptionDecoder(decoder JSONDecoder) ConnectionOption
WithConnectionOptionDecoder sets the decoder that will be used to decode messages.
func WithConnectionOptionEncoder ¶
func WithConnectionOptionEncoder(encoder JSONEncoder) ConnectionOption
WithConnectionOptionEncoder sets the encoder that will be used to encode messages.
func WithConnectionOptionLoggers ¶
func WithConnectionOptionLoggers(loggers ...Logger) ConnectionOption
WithConnectionOptionLoggers adds multiple loggers.
func WithConnectionOptionMaxRecoveryRetries ¶
func WithConnectionOptionMaxRecoveryRetries(maxRetries int) ConnectionOption
WithConnectionOptionMaxRecoveryRetries sets the limit for maximum retries.
Default: 10.
func WithConnectionOptionPrefetchCount ¶
func WithConnectionOptionPrefetchCount(count int) ConnectionOption
WithConnectionOptionPrefetchCount sets the number of messages that will be prefetched.
func WithConnectionOptionRecoveryInterval ¶
func WithConnectionOptionRecoveryInterval(interval time.Duration) ConnectionOption
WithConnectionOptionRecoveryInterval sets the initial recovery interval.
Default: 1s.
func WithConnectionOptionReturnHandler ¶
func WithConnectionOptionReturnHandler(returnHandler ReturnHandler) ConnectionOption
WithConnectionOptionReturnHandler sets a handler that can be used to handle undeliverable publishes.
When a publish is undeliverable from being mandatory, it will be returned and can be handled by this return handler.
func WithCustomConnectionOptions ¶
func WithCustomConnectionOptions(options *ConnectionOptions) ConnectionOption
WithCustomConnectionOptions sets the connection options.
It can be used to set all connection options at once.
type ConnectionOptions ¶
type ConnectionOptions struct { ReturnHandler Config *Config PrefetchCount int RecoveryInterval time.Duration MaxRecoveryRetries int BackOffFactor int // contains filtered or unexported fields }
ConnectionOptions are used to describe how a new connection will be created.
type ConnectionSettings ¶
type ConnectionSettings struct { // UserName contains the username of the broker user. UserName string // Password contains the password of the broker user. Password string // Host contains the hostname or ip of the broker. Host string // Port contains the port number the broker is listening on. Port int }
ConnectionSettings holds settings for a broker connection.
func (*ConnectionSettings) ToURI ¶
func (c *ConnectionSettings) ToURI() string
ToURI returns the URI representation of the ConnectionSettings. Includes url escaping for safe usage.
type ConsumeOption ¶
type ConsumeOption func(*ConsumeOptions)
ConsumeOption is an option for a Consumer.
func WithBindingOptionCustomBinding ¶
func WithBindingOptionCustomBinding(binding Binding) ConsumeOption
WithBindingOptionCustomBinding adds a new binding to the queue which allows you to set the binding options on a per-binding basis. Keep in mind that everything in the BindingOptions struct will default to the zero value. If you want to declare your bindings for example, be sure to set Declare=true.
func WithConsumerOptionConsumeAfterCreation ¶
func WithConsumerOptionConsumeAfterCreation(consumeAfterCreation bool) ConsumeOption
WithConsumerOptionConsumeAfterCreation sets the consume after creation property of the consumer. If true the consumer will immediately start consuming messages from the queue after creation.
func WithConsumerOptionConsumerAutoAck ¶
func WithConsumerOptionConsumerAutoAck(autoAck bool) ConsumeOption
WithConsumerOptionConsumerAutoAck sets the auto acknowledge property of the consumer.
Default: false.
func WithConsumerOptionConsumerExclusive ¶
func WithConsumerOptionConsumerExclusive(exclusive bool) ConsumeOption
WithConsumerOptionConsumerExclusive sets the exclusive property of this consumer, which means the broker will ensure that this is the only consumer from this queue. When exclusive is false, the broker will fairly distribute deliveries across multiple consumers.
Default: false.
func WithConsumerOptionConsumerName ¶
func WithConsumerOptionConsumerName(consumerName string) ConsumeOption
WithConsumerOptionConsumerName sets the name of the consumer.
If unset a random name will be given.
func WithConsumerOptionDeadLetterRetry ¶
func WithConsumerOptionDeadLetterRetry(options *RetryOptions) ConsumeOption
WithConsumerOptionDeadLetterRetry enables the dead letter retry.
For each `delay` a dead letter queue will be declared.
After exceeding `maxRetries` the delivery will be dropped.
func WithConsumerOptionHandlerQuantity ¶
func WithConsumerOptionHandlerQuantity(concurrency int) ConsumeOption
WithConsumerOptionHandlerQuantity sets the number of message handlers, that will run concurrently.
func WithConsumerOptionNoWait ¶
func WithConsumerOptionNoWait(noWait bool) ConsumeOption
WithConsumerOptionNoWait sets the no-wait property of this consumer, which means it does not wait for the broker to confirm the request and immediately begins deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed.
Default: false.
func WithConsumerOptionRoutingKey ¶
func WithConsumerOptionRoutingKey(routingKey string) ConsumeOption
WithConsumerOptionRoutingKey binds the queue to a routing key with the default binding options.
func WithCustomConsumeOptions ¶
func WithCustomConsumeOptions(options *ConsumeOptions) ConsumeOption
WithCustomConsumeOptions sets the consumer options.
It can be used to set all consumer options at once.
func WithExchangeOptionArgs ¶
func WithExchangeOptionArgs(args Table) ConsumeOption
WithExchangeOptionArgs adds optional args to the exchange.
func WithExchangeOptionAutoDelete ¶
func WithExchangeOptionAutoDelete(autoDelete bool) ConsumeOption
WithExchangeOptionAutoDelete sets whether the exchange is an auto-delete exchange.
Default: false.
func WithExchangeOptionDeclare ¶
func WithExchangeOptionDeclare(declare bool) ConsumeOption
WithExchangeOptionDeclare sets whether the exchange should be declared on startup if it doesn't already exist.
Default: false.
func WithExchangeOptionDurable ¶
func WithExchangeOptionDurable(durable bool) ConsumeOption
WithExchangeOptionDurable sets whether the exchange is a durable exchange.
Default: false.
func WithExchangeOptionInternal ¶
func WithExchangeOptionInternal(internal bool) ConsumeOption
WithExchangeOptionInternal sets whether the exchange is an internal exchange.
Default: false.
func WithExchangeOptionKind ¶
func WithExchangeOptionKind(kind string) ConsumeOption
WithExchangeOptionKind sets the kind (type) of the exchange, e.g. direct, fanout, topic or headers.
func WithExchangeOptionName ¶
func WithExchangeOptionName(name string) ConsumeOption
WithExchangeOptionName sets the exchange name.
func WithExchangeOptionNoWait ¶
func WithExchangeOptionNoWait(noWait bool) ConsumeOption
WithExchangeOptionNoWait sets whether the exchange is a no-wait exchange.
Default: false.
func WithExchangeOptionPassive ¶
func WithExchangeOptionPassive(passive bool) ConsumeOption
WithExchangeOptionPassive sets whether the exchange is a passive exchange.
Default: false.
func WithQueueOptionArgs ¶
func WithQueueOptionArgs(args Table) ConsumeOption
WithQueueOptionArgs adds optional args to the queue.
func WithQueueOptionAutoDelete ¶
func WithQueueOptionAutoDelete(autoDelete bool) ConsumeOption
WithQueueOptionAutoDelete sets whether the queue is an auto-delete queue.
Default: false.
func WithQueueOptionDeclare ¶
func WithQueueOptionDeclare(declare bool) ConsumeOption
WithQueueOptionDeclare sets whether the queue should be declared upon startup if it doesn't already exist.
Default: true.
func WithQueueOptionDurable ¶
func WithQueueOptionDurable(durable bool) ConsumeOption
WithQueueOptionDurable sets whether the queue is a durable queue.
Default: false.
func WithQueueOptionExclusive ¶
func WithQueueOptionExclusive(exclusive bool) ConsumeOption
WithQueueOptionExclusive sets whether the queue is an exclusive queue.
Default: false.
func WithQueueOptionNoWait ¶
func WithQueueOptionNoWait(noWait bool) ConsumeOption
WithQueueOptionNoWait sets whether the queue is a no-wait queue.
Default: false.
func WithQueueOptionPassive ¶
func WithQueueOptionPassive(passive bool) ConsumeOption
WithQueueOptionPassive sets whether the queue is a passive queue.
Default: false.
func WithQueueOptionPriority ¶
func WithQueueOptionPriority(maxPriority Priority) ConsumeOption
WithQueueOptionPriority declares the queue as a priority queue with the given maximum priority.
func WithQueueOptionQuorum ¶
func WithQueueOptionQuorum() ConsumeOption
WithQueueOptionQuorum sets the queue quorum type, which means multiple nodes in the cluster will have the messages distributed amongst them for higher reliability.
type ConsumeOptions ¶
type ConsumeOptions struct { ConsumerOptions *ConsumerOptions QueueOptions *QueueOptions ExchangeOptions *ExchangeOptions RetryOptions *RetryOptions Bindings []Binding // The number of message handlers, that will run concurrently. HandlerQuantity int // If true, the consumer will start consuming messages instantly after successful creation. // Default: false. ConsumeAfterCreation bool }
ConsumeOptions are used to describe how a new consumer will be configured.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a consumer for AMQP messages.
func NewConsumer ¶
func NewConsumer(conn *Connection, queueName string, handler HandlerFunc, options ...ConsumeOption) (*Consumer, error)
NewConsumer creates a new Consumer instance. Options can be passed to customize the behavior of the Consumer.
type ConsumerOptions ¶
type ConsumerOptions struct { // Application or exchange specific fields, // the headers exchange will inspect this field. Args Table // The name of the consumer / consumer-tag. Name string // Auto client acknowledgment for each message. AutoAck bool // Ensures that this is the sole consumer from the queue. Exclusive bool // If true, the client does not wait for a reply method. If the broker could not complete the method it will raise a channel or connection exception. NoWait bool }
ConsumerOptions are used to configure the consumer.
type Delivery ¶
Delivery captures the fields for a previously delivered message resident in a queue to be delivered by the broker to a consumer from Channel.Consume or Channel.Get.
type DeliveryMode ¶
type DeliveryMode uint8
The delivery mode of a message can be either transient or persistent.
const ( // TransientDelivery indicates that the message should be published as transient message. TransientDelivery DeliveryMode = iota + 1 // PersistentDelivery indicates that the message should be published as persistent message. PersistentDelivery )
type ExchangeOptions ¶
type ExchangeOptions struct { // Are used by plugins and broker-specific features such as message TTL, queue length limit, etc. Args Table // Exchange name. Name string // Exchange type. Possible values: empty string for default exchange or direct, topic, fanout Kind string // If true, the exchange survives broker restart. Durable bool // If true, the exchange is deleted when last queue is unbound from it. AutoDelete bool // If true, clients cannot publish to this exchange directly. It can only be used with exchange to exchange bindings. Internal bool // If true, the client does not wait for a reply method. If the broker could not complete the method it will raise a channel or connection exception. NoWait bool // If false, a missing exchange will be created on the broker. Passive bool // If true, the exchange will be created only if it does not already exist. Declare bool }
ExchangeOptions are used to configure an exchange. If the Passive flag is set the client will only check if the exchange exists on the broker and that the settings match, no creation attempt will be made.
type HandlerFunc ¶
HandlerFunc defines the handler of each Delivery and return Action.
type JSONDecoder ¶
JSONDecoder parses JSON-encoded data and stores the result in the value pointed to by v. If v is nil or not a pointer, Unmarshal returns an InvalidUnmarshalError.
type JSONEncoder ¶
JSONEncoder returns the JSON encoding of v.
type Logger ¶
type Logger interface { Debug(ctx context.Context, msg string, args ...any) Error(ctx context.Context, msg string, err error, args ...any) Info(ctx context.Context, msg string, args ...any) Warn(ctx context.Context, msg string, args ...any) }
Logger is an interface that is used for log messages.
type Priority ¶
type Priority uint8
Priority of a message can be either no priority, lowest, low, medium, high or highest.
const ( // NoPriority indicates that the message should be published with no priority. NoPriority Priority = iota // LowestPriority indicates that the message should be published with lowest priority. LowestPriority // LowPriority indicates that the message should be published with low priority. LowPriority // MediumPriority indicates that the message should be published with medium priority. MediumPriority // HighPriority indicates that the message should be published with high priority. HighPriority // HighestPriority indicates that the message should be published with highest priority. HighestPriority )
type PublishOptions ¶
type PublishOptions struct { // Application or exchange specific fields, // the headers exchange will inspect this field. Headers Table // Message timestamp. Timestamp time.Time // Exchange name. Exchange string // MIME content type. ContentType string // Expiration time in ms that a message will expire from a queue. // See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers Expiration string // MIME content encoding. ContentEncoding string // Correlation identifier. CorrelationID string // Address to reply to (ex: RPC). ReplyTo string // Message identifier. MessageID string // Message type name. Type string // Creating user id - default: "guest". UserID string // creating application id. AppID string // Mandatory fails to publish if there are no queues // bound to the routing key. Mandatory bool // Message priority level from 1 to 5 (0 == no priority). Priority Priority // Transient (0 or 1) or Persistent (2). DeliveryMode DeliveryMode }
PublishOptions are used to control how data is published.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher is a publisher for AMQP messages.
func NewPublisher ¶
func NewPublisher(conn *Connection, options ...PublisherOption) (*Publisher, error)
NewPublisher creates a new Publisher instance. Options can be passed to customize the behavior of the Publisher.
func (*Publisher) Close ¶
Close closes the Publisher.
When using the publishing cache, the publisher must be closed to clear the cache.
func (*Publisher) Publish ¶
Publish publishes a message with the publish options configured in the Publisher.
target can be a queue name for direct publishing or a routing key.
func (*Publisher) PublishCachedMessages ¶
func (*Publisher) PublishWithOptions ¶
func (p *Publisher) PublishWithOptions(ctx context.Context, targets []string, data any, options ...PublisherOption) error
PublishWithOptions publishes a message to one or multiple targets.
Targets can be queue names for direct publishing or routing keys.
Options can be passed to override the default options just for this publish.
type PublisherOption ¶
type PublisherOption func(*PublisherOptions)
PublisherOption is an option for a Publisher.
func WithCustomPublishOptions ¶
func WithCustomPublishOptions(options *PublisherOptions) PublisherOption
WithCustomPublishOptions sets the publish options.
It can be used to set all publisher options at once.
func WithPublishOptionAppID ¶
func WithPublishOptionAppID(appID string) PublisherOption
WithPublishOptionAppID sets the application id.
func WithPublishOptionContentEncoding ¶
func WithPublishOptionContentEncoding(contentEncoding string) PublisherOption
WithPublishOptionContentEncoding sets the content encoding, e.g. "utf-8".
func WithPublishOptionContentType ¶
func WithPublishOptionContentType(contentType string) PublisherOption
WithPublishOptionContentType sets the content type, e.g. "application/json".
func WithPublishOptionDeliveryMode ¶
func WithPublishOptionDeliveryMode(deliveryMode DeliveryMode) PublisherOption
WithPublishOptionDeliveryMode sets the message delivery mode. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during broker restart. By default publishings are transient.
func WithPublishOptionExchange ¶
func WithPublishOptionExchange(exchange string) PublisherOption
WithPublishOptionExchange sets the exchange to publish to.
func WithPublishOptionExpiration ¶
func WithPublishOptionExpiration(expiration string) PublisherOption
WithPublishOptionExpiration sets the expiry/TTL of a message. As per the RabbitMQ spec, it must be a string value in milliseconds.
func WithPublishOptionHeaders ¶
func WithPublishOptionHeaders(headers Table) PublisherOption
WithPublishOptionHeaders sets message header values, i.e. "msg-id".
func WithPublishOptionMandatory ¶
func WithPublishOptionMandatory(mandatory bool) PublisherOption
WithPublishOptionMandatory sets whether the publishing is mandatory, which means when a queue is not bound to the routing key a message will be sent back on the returns channel for you to handle.
Default: false.
func WithPublishOptionMessageID ¶
func WithPublishOptionMessageID(messageID string) PublisherOption
WithPublishOptionMessageID sets the message identifier.
func WithPublishOptionPriority ¶
func WithPublishOptionPriority(priority Priority) PublisherOption
WithPublishOptionPriority sets the content priority from 0 to 9.
func WithPublishOptionReplyTo ¶
func WithPublishOptionReplyTo(replyTo string) PublisherOption
WithPublishOptionReplyTo sets the reply to field.
func WithPublishOptionTimestamp ¶
func WithPublishOptionTimestamp(timestamp time.Time) PublisherOption
WithPublishOptionTimestamp sets the timestamp for the message.
func WithPublishOptionTracing ¶
func WithPublishOptionTracing(correlationID string) PublisherOption
WithPublishOptionTracing sets the content correlation identifier.
func WithPublishOptionType ¶
func WithPublishOptionType(messageType string) PublisherOption
WithPublishOptionType sets the message type name.
func WithPublishOptionUserID ¶
func WithPublishOptionUserID(userID string) PublisherOption
WithPublishOptionUserID sets the user id e.g. "user".
func WithPublisherOptionPublisherName ¶
func WithPublisherOptionPublisherName(publisherName string) PublisherOption
WithPublisherOptionPublisherName sets the name of the publisher.
If unset a random name will be given.
func WithPublisherOptionPublishingCache ¶
func WithPublisherOptionPublishingCache(cache PublishingCache) PublisherOption
WithPublisherOptionPublishingCache enables the publishing cache.
An implementation of the PublishingCache interface must be provided.
type PublisherOptions ¶
type PublisherOptions struct { // PublisherName is the name of the publisher. PublisherName string // PublishingCache is the publishing cache. PublishingCache PublishingCache // PublishingOptions are the options for publishing messages. PublishingOptions *PublishOptions }
PublisherOptions are the options for a publisher.
type Publishing ¶
type Publishing interface { ID() string GetTargets() []string GetData() any GetOptions() *PublishOptions }
Publishing is an interface for messages that are published to a broker.
type PublishingCache ¶
type PublishingCache interface { // Put adds a publishing to the cache. Put(p Publishing) error // PopAll gets all publishings from the cache and removes them. PopAll() ([]Publishing, error) // Len returns the number of publishings in the cache. Len() int // Flush removes all publishings from the cache. Flush() error }
PublishingCache is an interface for a cache of messages that could not be published due to a missing broker connection.
type QueueInfo ¶ added in v2.2.0
QueueInfo represents the current server state of a queue on the server.
type QueueOptions ¶
type QueueOptions struct { // Are used by plugins and broker-specific features such as message TTL, queue length limit, etc. Args Table // If true, the queue will survive a broker restart. Durable bool // If true, the queue is deleted when last consumer unsubscribes. AutoDelete bool // If true, the queue is used by only one connection and will be deleted when that connection closes. Exclusive bool // If true, the client does not wait for a reply method. If the broker could not complete the method it will raise a channel or connection exception. NoWait bool // If false, a missing queue will be created on the broker. Passive bool // If true, the queue will be declared if it does not already exist. Declare bool // contains filtered or unexported fields }
QueueOptions are used to configure a queue. A passive queue is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent queue will cause RabbitMQ to throw an exception.
type RecoveryFailedError ¶
RecoveryFailedError occurs when the recovery failed after a connection loss.
func (*RecoveryFailedError) Error ¶
func (e *RecoveryFailedError) Error() string
Error implements the Error method of the error interface.
type RetryOptions ¶
type RetryOptions struct { // Is used to handle the retries on a separate connection. // If not specified, a connection will be created. RetryConn *Connection // The delays which a message will be exponentially redelivered with. Delays []time.Duration // The maximum number of times a message will be redelivered. MaxRetries int64 // When enabled, all retry-related queues and exchanges associated with the consumer are removed when the consumer gets closed. // // Warning: Existing messages on the retry queues will be purged. Cleanup bool MaxRetriesExceededHandler MaxRetriesExceededHandler // contains filtered or unexported fields }
RetryOptions are used to describe how the retry will be configured.
type Return ¶
Return captures a flattened struct of fields returned by the broker when a publishing is unable to be delivered due to the `mandatory` flag set and no route found.
type ReturnHandler ¶
type ReturnHandler func(Return)
type SlogLogger ¶ added in v2.1.0
type SlogLogger struct {
// contains filtered or unexported fields
}
SlogLogger is a clarimq.Logger implementation that uses slog.Logger.
func NewSlogLogger ¶ added in v2.1.0
func NewSlogLogger(logger *slog.Logger) *SlogLogger
NewSlogLogger creates a new instance of SlogLogger. If a logger is not provided, it will use the default slog.Logger.
Parameters: - logger: A pointer to a slog.Logger instance. If nil, it will use the default logger.
Returns: - A new SlogLogger instance that implements the clarimq.Logger.
func (*SlogLogger) Debug ¶ added in v2.1.0
func (s *SlogLogger) Debug(ctx context.Context, msg string, attrs ...any)
Debug logs a debug message with the provided attributes.
func (*SlogLogger) Error ¶ added in v2.1.0
Error logs an error message with the provided attributes and error.
type Table ¶
Table stores user supplied fields of the following types:
bool byte float32 float64 int int16 int32 int64 nil string time.Time amqp.Decimal amqp.Table []byte []interface{} - containing above types
Functions taking a table will immediately fail when the table contains a value of an unsupported type.
The caller must be specific in which precision of integer it wishes to encode.
Use a type assertion when reading values from a table for type conversion.
RabbitMQ expects int32 for integer values.