Documentation
¶
Index ¶
- func RandConsumerName() string
- func WithChannelOptionContext(ctx context.Context) func(options *ChannelOptions)
- func WithChannelOptionDelay(delayer DelayProvider) func(options *ChannelOptions)
- func WithChannelOptionDown(down CallbackWhenDown) func(options *ChannelOptions)
- func WithChannelOptionName(name string) func(options *ChannelOptions)
- func WithChannelOptionNotification(ch chan Event) func(options *ChannelOptions)
- func WithChannelOptionNotifyPublish(publishNotifier CallbackNotifyPublish) func(options *ChannelOptions)
- func WithChannelOptionNotifyReturn(returnNotifier CallbackNotifyReturn) func(options *ChannelOptions)
- func WithChannelOptionProcessor(proc CallbackProcessMessages) func(options *ChannelOptions)
- func WithChannelOptionRecovering(recover CallbackWhenRecovering) func(options *ChannelOptions)
- func WithChannelOptionTopology(topology []*TopologyOptions) func(options *ChannelOptions)
- func WithChannelOptionUp(up CallbackWhenUp) func(options *ChannelOptions)
- func WithChannelOptionUsageParams(params ChanUsageParameters) func(options *ChannelOptions)
- func WithConnectionOptionContext(ctx context.Context) func(options *ConnectionOptions)
- func WithConnectionOptionDelay(delayer DelayProvider) func(options *ConnectionOptions)
- func WithConnectionOptionDown(down CallbackWhenDown) func(options *ConnectionOptions)
- func WithConnectionOptionName(name string) func(options *ConnectionOptions)
- func WithConnectionOptionNotification(ch chan Event) func(options *ConnectionOptions)
- func WithConnectionOptionPassword(credentials SecretProvider) func(options *ConnectionOptions)
- func WithConnectionOptionRecovering(recover CallbackWhenRecovering) func(options *ConnectionOptions)
- func WithConnectionOptionUp(up CallbackWhenUp) func(options *ConnectionOptions)
- type CallbackNotifyPublish
- type CallbackNotifyReturn
- type CallbackProcessMessages
- type CallbackWhenDown
- type CallbackWhenRecovering
- type CallbackWhenUp
- type ChanUsageParameters
- type Channel
- func (ch *Channel) Ack(tag uint64, multiple bool) error
- func (ch *Channel) Cancel(consumer string, noWait bool) error
- func (ch *Channel) Channel() *SafeBaseChan
- func (ch *Channel) Close() error
- func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
- func (ch *Channel) ExchangeDeclareWithTopology(t *TopologyOptions) error
- func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error
- func (ch *Channel) GetNextPublishSeqNo() uint64
- func (ch *Channel) IsClosed() bool
- func (ch *Channel) IsPaused() bool
- func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error
- func (ch *Channel) Name() string
- func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, ...) error
- func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, ...) (*amqp.DeferredConfirmation, error)
- func (ch *Channel) Queue() string
- func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
- func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
- func (ch *Channel) QueueDeclareWithTopology(t *TopologyOptions) (amqp.Queue, error)
- func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
- func (ch *Channel) QueueInspect(name string) (amqp.Queue, error) deprecated
- func (ch *Channel) QueuePurge(name string, noWait bool) (int, error)
- func (ch *Channel) Reject(tag uint64, requeue bool) error
- type ChannelOptions
- type ClientType
- type ConfirmationOutcome
- type Connection
- type ConnectionOptions
- type Consumer
- type ConsumerOptions
- func (opt *ConsumerOptions) WithArgs(args amqp.Table) *ConsumerOptions
- func (opt *ConsumerOptions) WithAutoAck(autoAck bool) *ConsumerOptions
- func (opt *ConsumerOptions) WithExclusive(exclusive bool) *ConsumerOptions
- func (opt *ConsumerOptions) WithName(name string) *ConsumerOptions
- func (opt *ConsumerOptions) WithNoLocal(noLocal bool) *ConsumerOptions
- func (opt *ConsumerOptions) WithNoWait(noWait bool) *ConsumerOptions
- func (opt *ConsumerOptions) WithPrefetchCount(count int) *ConsumerOptions
- func (opt *ConsumerOptions) WithPrefetchSize(size int) *ConsumerOptions
- func (opt *ConsumerOptions) WithPrefetchTimeout(timeout time.Duration) *ConsumerOptions
- func (opt *ConsumerOptions) WithQosGlobal(global bool) *ConsumerOptions
- func (opt *ConsumerOptions) WithQueue(queue string) *ConsumerOptions
- type ConsumerUsageOptions
- type DefaultDelayer
- type DeferredConfirmation
- type DelayProvider
- type DeliveriesProperties
- type DeliveryData
- type DeliveryPayload
- type Event
- type EventType
- type OptionalError
- type PersistentNotifiers
- type Publisher
- func (p *Publisher) Available() (bool, bool)
- func (p *Publisher) AwaitAvailable(timeout, pollFreq time.Duration) bool
- func (p *Publisher) AwaitDeferredConfirmation(d *DeferredConfirmation, tmr time.Duration) *DeferredConfirmation
- func (p *Publisher) Channel() *Channel
- func (p *Publisher) Close() error
- func (p *Publisher) Publish(msg amqp.Publishing) error
- func (p *Publisher) PublishDeferredConfirm(msg amqp.Publishing) (*DeferredConfirmation, error)
- func (p *Publisher) PublishDeferredConfirmWithOptions(opt PublisherOptions, msg amqp.Publishing) (*DeferredConfirmation, error)
- func (p *Publisher) PublishWithOptions(opt PublisherOptions, msg amqp.Publishing) error
- type PublisherOptions
- func (opt *PublisherOptions) WithConfirmationNoWait(confNoWait bool) *PublisherOptions
- func (opt *PublisherOptions) WithConfirmationsCount(count int) *PublisherOptions
- func (opt *PublisherOptions) WithContext(ctx context.Context) *PublisherOptions
- func (opt *PublisherOptions) WithExchange(exchange string) *PublisherOptions
- func (opt *PublisherOptions) WithImmediate(immediate bool) *PublisherOptions
- func (opt *PublisherOptions) WithKey(key string) *PublisherOptions
- func (opt *PublisherOptions) WithMandatory(mandatory bool) *PublisherOptions
- type PublisherUsageOptions
- type SafeBaseChan
- type SafeBaseConn
- type SafeBool
- type SecretProvider
- type TopologyBind
- type TopologyOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RandConsumerName ¶ added in v0.2.0
func RandConsumerName() string
RandConsumerName creates a random string for the consumers. It is used internally by DefaultConsumerOptions by setting the 'ConsumerName' property of ConsumerOptions
func WithChannelOptionContext ¶
func WithChannelOptionContext(ctx context.Context) func(options *ChannelOptions)
WithChannelOptionContext creates a function that sets the context of a ChannelOptions struct.
It takes a context.Context as a parameter and returns a function that takes a pointer to a ChannelOptions struct. The returned function sets the ctx field of the ChannelOptions struct to the provided context.
func WithChannelOptionDelay ¶
func WithChannelOptionDelay(delayer DelayProvider) func(options *ChannelOptions)
WithChannelOptionDelay returns a function that sets the "delayer" field of the ChannelOptions struct to the given DelayProvider.
Parameters: - delayer: The DelayProvider that will be set as the "delayer" field of ChannelOptions.
Return type: A function that takes a pointer to a ChannelOptions struct as its parameter.
func WithChannelOptionDown ¶
func WithChannelOptionDown(down CallbackWhenDown) func(options *ChannelOptions)
WithChannelOptionDown returns a function that sets the callback function to be called when the channel is down.
down - The callback function to be called when the channel is down. options - The ChannelOptions object to be modified.
func WithChannelOptionName ¶
func WithChannelOptionName(name string) func(options *ChannelOptions)
WithChannelOptionName creates a function that sets the name field of the ChannelOptions struct.
It takes a string parameter 'name' and returns a function that takes a pointer to the ChannelOptions struct as a parameter.
func WithChannelOptionNotification ¶
func WithChannelOptionNotification(ch chan Event) func(options *ChannelOptions)
WithChannelOptionNotification provides an application defined Event receiver to handle various alerts about the channel status.
func WithChannelOptionNotifyPublish ¶
func WithChannelOptionNotifyPublish(publishNotifier CallbackNotifyPublish) func(options *ChannelOptions)
WithChannelOptionNotifyPublish returns a function that sets the callback function for notifying the publish event in the ChannelOptions.
It takes a single parameter: - publishNotifier: the callback function for notifying the publish event.
It returns a function that takes a pointer to ChannelOptions as a parameter.
func WithChannelOptionNotifyReturn ¶
func WithChannelOptionNotifyReturn(returnNotifier CallbackNotifyReturn) func(options *ChannelOptions)
WithChannelOptionNotifyReturn generates a function that sets the returnNotifier callback for a ChannelOptions struct.
It takes a returnNotifier parameter of type CallbackNotifyReturn which represents a function that will be called when a return value is received.
The generated function takes an options parameter of type *ChannelOptions and sets the cbNotifyReturn field to the provided returnNotifier.
func WithChannelOptionProcessor ¶ added in v0.2.0
func WithChannelOptionProcessor(proc CallbackProcessMessages) func(options *ChannelOptions)
WithChannelOptionProcessor is a function that returns a function which sets the callback process messages for the ChannelOptions struct.
The parameter `proc` is a CallbackProcessMessages function that will be assigned to the `cbProcessMessages` field of the `ChannelOptions` struct.
The return type of the returned function is `func(options *ChannelOptions)`.
func WithChannelOptionRecovering ¶
func WithChannelOptionRecovering(recover CallbackWhenRecovering) func(options *ChannelOptions)
WithChannelOptionRecovering generates a function that sets the callback function to be called when recovering from an error in the ChannelOptions.
Parameters:
- recover: a CallbackWhenRecovering function that will be called when recovering from an error in the ChannelOptions.
Returns:
- A function that takes a pointer to ChannelOptions and sets the cbReconnect field to the provided recover function.
func WithChannelOptionTopology ¶
func WithChannelOptionTopology(topology []*TopologyOptions) func(options *ChannelOptions)
WithChannelOptionTopology returns a function that sets the topology options for a channel.
The function takes a slice of TopologyOptions as a parameter, which specifies the desired topology for the channel. It returns a function that takes a pointer to a ChannelOptions struct as a parameter. The function sets the topology field of the ChannelOptions struct to the provided topology slice.
func WithChannelOptionUp ¶
func WithChannelOptionUp(up CallbackWhenUp) func(options *ChannelOptions)
WithChannelOptionUp returns a function that sets the callback function to be executed when the channel is up.
up: the callback function to be executed when the channel is up. options: the ChannelOptions to be modified.
returns: a function that modifies the ChannelOptions by setting the callback function to be executed when the channel is up.
func WithChannelOptionUsageParams ¶
func WithChannelOptionUsageParams(params ChanUsageParameters) func(options *ChannelOptions)
WithChannelOptionUsageParams returns a function that sets the implementation parameters of the ChannelOptions struct.
It takes a parameter of type ChanUsageParameters and returns a function that takes a pointer to a ChannelOptions struct.
func WithConnectionOptionContext ¶
func WithConnectionOptionContext(ctx context.Context) func(options *ConnectionOptions)
WithConnectionOptionContext stores the application provided context. Cancelling this context will terminate the recovery loop and also close down the connection (and indirectly its channel dependents).
func WithConnectionOptionDelay ¶
func WithConnectionOptionDelay(delayer DelayProvider) func(options *ConnectionOptions)
WithConnectionOptionDelay provides an application space defined delay (between re-connection attempts) policy. An example of DelayProvider could be an exponential timeout routine based on the retry parameter.
func WithConnectionOptionDown ¶
func WithConnectionOptionDown(down CallbackWhenDown) func(options *ConnectionOptions)
WithConnectionOptionDown stores the application space callback for connection down events.
func WithConnectionOptionName ¶
func WithConnectionOptionName(name string) func(options *ConnectionOptions)
WithConnectionOptionName assigns a tag to this connection.
func WithConnectionOptionNotification ¶
func WithConnectionOptionNotification(ch chan Event) func(options *ConnectionOptions)
WithConnectionOptionNotification provides an application defined Event receiver to handle various alerts about the connection status.
func WithConnectionOptionPassword ¶
func WithConnectionOptionPassword(credentials SecretProvider) func(options *ConnectionOptions)
WithConnectionOptionPassword provides password refresh capabilities for dynamically protected services (future IAM)
func WithConnectionOptionRecovering ¶
func WithConnectionOptionRecovering(recover CallbackWhenRecovering) func(options *ConnectionOptions)
WithConnectionOptionRecovering stores the application space callback for connection recovering events.
func WithConnectionOptionUp ¶
func WithConnectionOptionUp(up CallbackWhenUp) func(options *ConnectionOptions)
WithConnectionOptionUp stores the application space callback for connection established events.
Types ¶
type CallbackNotifyPublish ¶
type CallbackNotifyPublish func(confirm amqp.Confirmation, ch *Channel)
CallbackNotifyPublish defines a function type for handling the publish notifications. Applications can define their own handler and pass it via WithChannelOptionNotifyPublish.
type CallbackNotifyReturn ¶
CallbackNotifyReturn defines a function type for handling the return notifications. Applications can define their own handler and pass it via WithChannelOptionNotifyReturn.
type CallbackProcessMessages ¶ added in v0.2.0
type CallbackProcessMessages func(props *DeliveriesProperties, messages []DeliveryData, mustAck bool, ch *Channel)
CallbackProcessMessages defines a user passed function for processing the received messages. Applications can define their own handler and pass it via WithChannelOptionProcessor.
type CallbackWhenDown ¶
type CallbackWhenDown func(name string, err OptionalError) bool
CallbackWhenDown defines a function type called when the connection is lost. It should return false to abort this connection. Pass your implementation via WithChannelOptionDown and WithConnectionOptionDown.
type CallbackWhenRecovering ¶
CallbackWhenRecovering defines a function called prior to recovering a connection. It should return false to abort this connection. Applications can define their own handler and pass it via WithChannelOptionRecovering and WithConnectionOptionRecovering.
type CallbackWhenUp ¶
type CallbackWhenUp func(name string)
CallbackWhenUp defines a function type used after a successful connection or channel recovery. Applications can define their own handler and pass it via WithConnectionOptionUp and WithChannelOptionUp.
type ChanUsageParameters ¶
type ChanUsageParameters struct { PublisherUsageOptions ConsumerUsageOptions }
ChanUsageParameters embeds PublisherUsageOptions and ConsumerUsageOptions. It is a private member of the ChannelOptions and can be passed via WithChannelOptionUsageParams.
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel wraps the base amqp channel by creating a managed channel.
func NewChannel ¶
func NewChannel(conn *Connection, optionFuncs ...func(*ChannelOptions)) *Channel
NewChannel creates a new managed Channel with the given Connection and optional ChannelOptions. There shouldn't be any need to have direct access and is recommended using a Consumer or Publisher instead.
The resulting channel inherits the events notifier, context and delayer from the master connection but all can be overridden by passing options. Use the 'WithChannelOption<OptionName>' for optionFuncs.
Example Usage:
ch := NewChannel(conn, WithChannelOptionName("myChannel"), WithChannelOptionDown(Down), WithChannelOptionUp(Up), WithChannelOptionRecovering(Reattempting), WithChannelOptionNotification(dataStatusChan), WithChannelOptionContext(ctx), )
Parameters:
- conn: The Connection to associate the Channel with.
- optionFuncs: An optional list of functions to modify the ChannelOptions.
Returns: A new Channel object.
func (*Channel) Channel ¶
func (ch *Channel) Channel() *SafeBaseChan
Channel returns the low level library channel for further direct access to its Super() low level channel. Use sparingly and prefer using the predefined Channel wrapping methods instead. Pair usage with the provided full [Lock][UnLock] or read [RLock][RUnlock] locking/unlocking mechanisms for safety!
func (*Channel) ExchangeDeclare ¶ added in v0.2.0
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
ExchangeDeclare safely wraps the base channel ExchangeDeclare. Prefer using the [ExchangeDeclareWithTopology] instead; that also supports bindings, see TopologyOptions
func (*Channel) ExchangeDeclareWithTopology ¶ added in v0.2.0
func (ch *Channel) ExchangeDeclareWithTopology(t *TopologyOptions) error
ExchangeDeclareWithTopology safely declares a desired exchange as described in the parameter; see TopologyOptions
func (*Channel) ExchangeDelete ¶ added in v1.0.0
ExchangeDelete safely wraps the base channel ExchangeDelete.
func (*Channel) GetNextPublishSeqNo ¶ added in v1.0.0
GetNextPublishSeqNo safely wraps the base channel GetNextPublishSeqNo
func (*Channel) Name ¶ added in v1.1.0
Name returns the tag defined originally when creating this channel
func (*Channel) PublishWithContext ¶ added in v0.2.1
func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
PublishWithContext safely wraps the base channel PublishWithContext.
func (*Channel) PublishWithDeferredConfirmWithContext ¶ added in v1.0.0
func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)
PublishWithDeferredConfirmWithContext safely wraps the base channel PublishWithDeferredConfirmWithContext.
func (*Channel) Queue ¶
Queue returns the active (as indicated by [IsDestination] option in topology options) queue name. Useful for finding the server assigned name.
func (*Channel) QueueDeclare ¶ added in v0.2.0
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
QueueDeclare safely wraps the base channel QueueDeclare. Prefer using the [QueueDeclareWithTopology] instead; that also supports bindings, see TopologyOptions
func (*Channel) QueueDeclarePassive ¶ added in v1.0.0
func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
QueueDeclarePassive safely wraps the base channel QueueDeclarePassive.
func (*Channel) QueueDeclareWithTopology ¶ added in v0.2.0
func (ch *Channel) QueueDeclareWithTopology(t *TopologyOptions) (amqp.Queue, error)
QueueDeclareWithTopology safely declares a desired queue as described in the parameter; see TopologyOptions
func (*Channel) QueueDelete ¶ added in v0.2.0
QueueDelete safely wraps the base channel QueueDelete.
func (*Channel) QueuePurge ¶ added in v0.2.0
QueuePurge safely wraps the base channel QueuePurge.
type ChannelOptions ¶
type ChannelOptions struct {
// contains filtered or unexported fields
}
ChannelOptions represents the options for configuring a channel.
type ClientType ¶
type ClientType int
ClientType defines the class of objects that interact with the amqp functionality. Used mostly for sending alerts about specific functionality areas.
const ( CliConnection ClientType = iota CliChannel )
func (ClientType) String ¶
func (i ClientType) String() string
type ConfirmationOutcome ¶ added in v1.1.0
type ConfirmationOutcome int
const ( ConfirmationTimeOut ConfirmationOutcome = iota // no timely response ConfirmationClosed // data confirmation channel is closed ConfirmationDisabled // base channel has not been put into confirm mode ConfirmationPrevious // lower sequence number than expected ConfirmationACK // ACK (publish confirmed) ConfirmationNAK // NAK (publish negative acknowledgement) )
func (ConfirmationOutcome) String ¶ added in v1.1.0
func (i ConfirmationOutcome) String() string
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection wraps a SafeBaseConn with additional attributes (impl. details: rabbit URL, ConnectionOptions and a cancelling context). Applications should obtain a connection using NewConnection.
func NewConnection ¶
func NewConnection(address string, config amqp.Config, optionFuncs ...func(*ConnectionOptions)) *Connection
NewConnection creates a new managed Connection object with the given address, configuration, and option functions.
Example Usage:
conn := NewConnection("amqp://guest:guest@localhost:5672/", amqp.Config{}, WithConnectionOptionContext(context.Background()), WithConnectionOptionName("default"), WithConnectionOptionDown(Down), WithConnectionOptionUp(Up), WithConnectionOptionRecovering(Reattempting), WithConnectionOptionNotification(connStatusChan), )
Parameters:
- address: the address of the connection.
- config: the AMQP configuration.
- optionFuncs: variadic option functions to customize the connection options.
Returns: a new Connection object.
func (*Connection) Channel ¶
func (conn *Connection) Channel() (*amqp.Channel, error)
Channel safely wraps the amqp connection Channel() function.
func (*Connection) Close ¶
func (conn *Connection) Close() error
Close safely wraps the amqp connection Close and terminates the maintenance loop.
func (*Connection) Connection ¶
func (conn *Connection) Connection() *SafeBaseConn
Connection returns the safe base connection and thus indirectly the low level library connection.
func (*Connection) IsBlocked ¶
func (conn *Connection) IsBlocked() bool
IsBlocked returns the TCP flow status of the base connection.
func (*Connection) IsClosed ¶
func (conn *Connection) IsClosed() bool
IsClosed safely wraps the amqp connection IsClosed
type ConnectionOptions ¶
type ConnectionOptions struct {
// contains filtered or unexported fields
}
ConnectionOptions defines a collection of attributes used internally by the Connection.
Attributes can be set via optionFuncs parameters of NewConnection via WithConnectionOption<Fct> family, ex: WithConnectionOptionDown, WithConnectionOptionContext, WithConnectionOptionNotification.
type Consumer ¶ added in v0.2.0
type Consumer struct {
// contains filtered or unexported fields
}
Consumer implements an object allowing calling applications to receive messages on already established connections. Create a consumer instance by calling NewConsumer.
func NewConsumer ¶ added in v0.2.0
func NewConsumer(conn *Connection, opt ConsumerOptions, optionFuncs ...func(*ChannelOptions)) *Consumer
NewConsumer creates a consumer with the desired options and then starts consuming. It creates and opens a new dedicated Channel using the passed shared connection. NOTE: It's advisable to use separate connections for Channel.Publish and Channel.Consume
func (*Consumer) Available ¶ added in v0.2.0
Available returns the status of both the underlying connection and channel.
func (*Consumer) AwaitAvailable ¶ added in v0.2.0
AwaitAvailable waits until the consumer infrastructure is ready or the timeout expires. Useful when the connections and channels are about to be created or are recovering. When a zero value is passed for a parameter, the defaults used are 7500 ms for the timeout and 330 ms for the polling frequency.
func (*Consumer) Cancel ¶ added in v0.2.0
Cancel wraps safely the base consumer channel cancellation.
func (*Consumer) Channel ¶ added in v0.2.0
Channel returns the managed Channel which can be further used to extract SafeBaseChan
type ConsumerOptions ¶ added in v0.2.0
type ConsumerOptions struct {
ConsumerUsageOptions
}
func DefaultConsumerOptions ¶ added in v0.2.0
func DefaultConsumerOptions() ConsumerOptions
DefaultConsumerOptions creates some sane defaults for consuming messages.
func (*ConsumerOptions) WithArgs ¶ added in v0.2.0
func (opt *ConsumerOptions) WithArgs(args amqp.Table) *ConsumerOptions
WithArgs sets the arguments for the consumer options.
args: The arguments to be set. Returns: The updated consumer options.
func (*ConsumerOptions) WithAutoAck ¶ added in v0.2.0
func (opt *ConsumerOptions) WithAutoAck(autoAck bool) *ConsumerOptions
WithAutoAck sets the ConsumerAutoAck field of the ConsumerOptions struct to the provided boolean value.
autoAck: A boolean value indicating whether the consumer should automatically acknowledge messages.
*ConsumerOptions: A pointer to the ConsumerOptions struct. Returns: A pointer to the updated ConsumerOptions struct.
func (*ConsumerOptions) WithExclusive ¶ added in v0.2.0
func (opt *ConsumerOptions) WithExclusive(exclusive bool) *ConsumerOptions
WithExclusive sets the exclusive flag for the ConsumerOptions.
exclusive: a boolean indicating whether the ConsumerOptions should be exclusive. Returns a pointer to the updated ConsumerOptions.
func (*ConsumerOptions) WithName ¶ added in v0.2.0
func (opt *ConsumerOptions) WithName(name string) *ConsumerOptions
WithName sets the name of the ConsumerOptions.
name: the name to set for the ConsumerOptions. return: the updated ConsumerOptions.
func (*ConsumerOptions) WithNoLocal ¶ added in v0.2.0
func (opt *ConsumerOptions) WithNoLocal(noLocal bool) *ConsumerOptions
WithNoLocal sets the ConsumerNoLocal field of the ConsumerOptions struct.
It takes a boolean parameter named noLocal. It returns a pointer to the ConsumerOptions struct.
func (*ConsumerOptions) WithNoWait ¶ added in v0.2.0
func (opt *ConsumerOptions) WithNoWait(noWait bool) *ConsumerOptions
WithNoWait sets the ConsumerNoWait field of ConsumerOptions struct and returns the modified ConsumerOptions object.
Parameters: - noWait: a boolean value indicating whether the consumer should wait or not.
Return type: - *ConsumerOptions: the modified ConsumerOptions object.
func (*ConsumerOptions) WithPrefetchCount ¶ added in v0.2.0
func (opt *ConsumerOptions) WithPrefetchCount(count int) *ConsumerOptions
WithPrefetchCount sets the prefetch count for the ConsumerOptions.
count: the number of messages to prefetch. returns: a pointer to the updated ConsumerOptions.
func (*ConsumerOptions) WithPrefetchSize ¶ added in v0.2.0
func (opt *ConsumerOptions) WithPrefetchSize(size int) *ConsumerOptions
WithPrefetchSize sets the prefetch size for the ConsumerOptions struct.
It takes an integer `size` as a parameter and sets the PrefetchSize field of the ConsumerOptions struct to that value. It returns a pointer to the modified ConsumerOptions struct.
func (*ConsumerOptions) WithPrefetchTimeout ¶ added in v0.2.0
func (opt *ConsumerOptions) WithPrefetchTimeout(timeout time.Duration) *ConsumerOptions
WithPrefetchTimeout sets the prefetch timeout for the ConsumerOptions struct.
timeout - The duration of the prefetch timeout. Returns the updated ConsumerOptions struct.
func (*ConsumerOptions) WithQosGlobal ¶ added in v0.2.0
func (opt *ConsumerOptions) WithQosGlobal(global bool) *ConsumerOptions
WithQosGlobal sets the global QoS option for the ConsumerOptions struct.
It takes a boolean value, `global`, to determine whether the QoS option should be set globally. The function returns a pointer to the updated ConsumerOptions struct.
func (*ConsumerOptions) WithQueue ¶ added in v0.2.0
func (opt *ConsumerOptions) WithQueue(queue string) *ConsumerOptions
WithQueue sets the consumer queue for the ConsumerOptions struct.
queue: the name of the queue. returns: the updated ConsumerOptions struct.
type ConsumerUsageOptions ¶ added in v0.2.0
type ConsumerUsageOptions struct { IsConsumer bool // indicates if this chan is used for consuming ConsumerName string // channel wide consumers unique identifier PrefetchTimeout time.Duration // how long to wait for PrefetchCount messages to arrive PrefetchCount int // Qos count PrefetchSize int // Qos payload size QosGlobal bool // all future channels ConsumerQueue string // queue name from which to receive. Overridden by engine assigned name. ConsumerAutoAck bool // see [amqp.Consume] ConsumerExclusive bool // see [amqp.Consume] ConsumerNoLocal bool // see [amqp.Consume] ConsumerNoWait bool // see [amqp.Consume] ConsumerArgs amqp.Table // core properties }
ConsumerUsageOptions defines parameters for driving the consumers behavior and indicating to the supporting channel to start consuming.
type DefaultDelayer ¶
DefaultDelayer allows defining a basic (constant) delay policy. The implementation default used by new connections and channels has a value of 7.5 seconds.
type DeferredConfirmation ¶ added in v1.1.0
type DeferredConfirmation struct { *amqp.DeferredConfirmation // wrapped low level confirmation Outcome ConfirmationOutcome // acknowledgment received stats RequestSequence uint64 // sequence of the original request (GetNextPublishSeqNo) ChannelName string // channel name of the publisher Queue string // queue name of the publisher }
DeferredConfirmation wraps amqp.DeferredConfirmation with additional data. It inherits (by embedding) all original fields and functionality from the amqp object.
type DelayProvider ¶
DelayProvider allows passing a bespoke method for providing the delay policy for waiting between reconnection attempts. See WithConnectionOptionDelay, WithChannelOptionDelay. TIP: one could pass an exponential delayer derived from the 'retry' counter.
type DeliveriesProperties ¶ added in v0.2.0
type DeliveriesProperties struct { Headers amqp.Table // Application or header exchange table // Properties; assume all are common ContentType string // MIME content type ContentEncoding string // MIME content encoding DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) Priority uint8 // queue implementation use - 0 to 9 ConsumerTag string // client tag as provided during consumer registration Exchange string // basic.publish exchange RoutingKey string // basic.publish routing key }
DeliveriesProperties captures the common attributes of multiple commonly grouped (i.e. received over same channel in one go) deliveries. It is an incomplete amqp.Delivery
func DeliveryPropsFrom ¶ added in v1.0.0
func DeliveryPropsFrom(d *amqp.Delivery) (prop DeliveriesProperties)
DeliveryPropsFrom generates a DeliveriesProperties struct from an amqp.Delivery.
Takes a pointer to an amqp.Delivery as the parameter and returns a DeliveriesProperties struct.
type DeliveryData ¶ added in v1.0.0
type DeliveryData struct { Body DeliveryPayload // actual data payload DeliveryTag uint64 // sequential number of this message Redelivered bool // message has been re-enqueued Expiration string // message expiration spec MessageId string // message identifier Timestamp time.Time // message timestamp Type string // message type name UserId string // user of the publishing connection AppId string // application id }
DeliveryData isolates the data part of each specific delivered message
func DeliveryDataFrom ¶ added in v1.0.0
func DeliveryDataFrom(d *amqp.Delivery) (data DeliveryData)
DeliveryDataFrom creates a DeliveryData object from an amqp.Delivery object.
It takes a pointer to an amqp.Delivery object as its parameter and returns a DeliveryData object.
type DeliveryPayload ¶ added in v0.2.0
type DeliveryPayload []byte
DeliveryPayload subtypes the actual content of deliveries
type Event ¶
type Event struct { SourceType ClientType // origin type SourceName string // origin tag TargetName string // affected tag Kind EventType // type of event Err OptionalError // low level error }
Event defines a simple body structure for the alerts received via the notification channels passed in WithChannelOptionNotification and WithConnectionOptionNotification.
type EventType ¶
type EventType int
EventType defines the class of alerts sent to the application layer.
type OptionalError ¶ added in v0.2.0
type OptionalError struct {
// contains filtered or unexported fields
}
func SomeErrFromError ¶ added in v0.2.0
func SomeErrFromError(err error, isSet bool) OptionalError
SomeErrFromError creates an OptionalError struct with the given error and isSet values.
Parameters: - err: The error to be assigned to the OptionalError struct. - isSet: A boolean value indicating whether the error is set or not.
Return: - OptionalError: The OptionalError struct with the assigned error and isSet values.
func SomeErrFromString ¶ added in v0.2.0
func SomeErrFromString(text string) OptionalError
SomeErrFromString creates an OptionalError from the specified text.
Parameters: - text: the string to create the error from.
Return type: - OptionalError: the created OptionalError.
func (OptionalError) Error ¶ added in v0.2.0
func (e OptionalError) Error() string
Error returns the error string representation of the OptionalError.
It calls the Or method on the OptionalError to get the error value and returns the result of that value's Error method.
func (OptionalError) IsSet ¶ added in v0.2.0
func (e OptionalError) IsSet() bool
IsSet returns a boolean value indicating whether the OptionalError is set.
This function does not take any parameters. It returns a boolean value.
func (OptionalError) Or ¶ added in v0.2.0
func (e OptionalError) Or(err error) error
Or returns the optional error if it is set, otherwise it returns the provided error.
Parameters: - err: the error to return if the optional error is not set. Returns: - error: the optional error if it is set, otherwise err.
type PersistentNotifiers ¶
type PersistentNotifiers struct { Published chan amqp.Confirmation // publishing confirmation Returned chan amqp.Return // returned messages Flow chan bool // flow control Closed chan *amqp.Error // channel closed Cancel chan string // channel cancelled Consumer <-chan amqp.Delivery // message intake }
PersistentNotifiers are channels that have the lifespan of the channel. They only need refreshing when recovering.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher implements an object allowing calling applications to publish messages on already established connections. Create a publisher instance by calling NewPublisher.
func NewPublisher ¶
func NewPublisher(conn *Connection, opt PublisherOptions, optionFuncs ...func(*ChannelOptions)) *Publisher
NewPublisher creates a publisher with the desired options. It creates and opens a new dedicated Channel using the passed shared connection.
func (*Publisher) Available ¶
Available returns the status of both the underlying connection and channel.
func (*Publisher) AwaitAvailable ¶
AwaitAvailable waits until the publisher infrastructure is ready or the timeout expires. Useful when the connections and channels are about to be created or are recovering. When zero-value parameters are passed, the defaults used are 7500 ms for the timeout and 330 ms for the polling frequency.
func (*Publisher) AwaitDeferredConfirmation ¶ added in v1.1.1
func (p *Publisher) AwaitDeferredConfirmation(d *DeferredConfirmation, tmr time.Duration) *DeferredConfirmation
AwaitDeferredConfirmation waits for the confirmation of a deferred action and updates its outcome.
It takes in a deferred confirmation object and a time duration for the timeout. It returns the updated deferred confirmation object.
func (*Publisher) Channel ¶ added in v0.2.0
Channel returns the managed Channel which can be further used to extract SafeBaseChan
func (*Publisher) Publish ¶
func (p *Publisher) Publish(msg amqp.Publishing) error
Publish wraps the amqp.PublishWithContext using the internal PublisherOptions cached when the publisher was created.
func (*Publisher) PublishDeferredConfirm ¶ added in v1.1.0
func (p *Publisher) PublishDeferredConfirm(msg amqp.Publishing) (*DeferredConfirmation, error)
PublishDeferredConfirm wraps the amqp.PublishWithDeferredConfirmWithContext using the internal PublisherOptions cached when the publisher was created.
func (*Publisher) PublishDeferredConfirmWithOptions ¶ added in v1.1.0
func (p *Publisher) PublishDeferredConfirmWithOptions(opt PublisherOptions, msg amqp.Publishing) (*DeferredConfirmation, error)
PublishDeferredConfirmWithOptions wraps the amqp.PublishWithDeferredConfirmWithContext using the passed options.
func (*Publisher) PublishWithOptions ¶
func (p *Publisher) PublishWithOptions(opt PublisherOptions, msg amqp.Publishing) error
PublishWithOptions wraps the amqp.PublishWithContext using the passed options.
type PublisherOptions ¶
type PublisherOptions struct { PublisherUsageOptions Context context.Context // controlling environment Exchange string // routing exchange Key string // routing key (usually queue name) Mandatory bool // delivery is mandatory Immediate bool // delivery is immediate }
PublisherOptions defines publisher specific parameters. Mostly used as defaults for sending messages and inner channel functionality.
func DefaultPublisherOptions ¶
func DefaultPublisherOptions() PublisherOptions
DefaultPublisherOptions creates some sane defaults for publishing messages. Note: The Message/payload itself must still be an amqp.Publishing object, fully under application's control.
func (*PublisherOptions) WithConfirmationNoWait ¶
func (opt *PublisherOptions) WithConfirmationNoWait(confNoWait bool) *PublisherOptions
WithConfirmationNoWait sets the ConfirmationNoWait field of the PublisherOptions struct.
It takes a boolean parameter `confNoWait` and updates the `ConfirmationNoWait` field of the `PublisherOptions` struct to the value of `confNoWait`. It returns a pointer to the `PublisherOptions` struct.
func (*PublisherOptions) WithConfirmationsCount ¶
func (opt *PublisherOptions) WithConfirmationsCount(count int) *PublisherOptions
WithConfirmationsCount sets the number of confirmations required for publishing.
count: The number of confirmations required. *PublisherOptions: The updated PublisherOptions object.
func (*PublisherOptions) WithContext ¶
func (opt *PublisherOptions) WithContext(ctx context.Context) *PublisherOptions
WithContext sets the context for the PublisherOptions.
ctx: The context to be set. Returns: A pointer to PublisherOptions.
func (*PublisherOptions) WithExchange ¶
func (opt *PublisherOptions) WithExchange(exchange string) *PublisherOptions
WithExchange sets the exchange for the PublisherOptions struct.
Parameters: - exchange: The exchange to set.
Returns: - *PublisherOptions: The updated PublisherOptions struct.
func (*PublisherOptions) WithImmediate ¶
func (opt *PublisherOptions) WithImmediate(immediate bool) *PublisherOptions
WithImmediate sets the immediate flag of the PublisherOptions struct.
It takes a boolean parameter `immediate` and returns a pointer to the updated PublisherOptions.
func (*PublisherOptions) WithKey ¶
func (opt *PublisherOptions) WithKey(key string) *PublisherOptions
WithKey sets the key for the PublisherOptions.
key: the key to set. returns: a pointer to the PublisherOptions.
func (*PublisherOptions) WithMandatory ¶
func (opt *PublisherOptions) WithMandatory(mandatory bool) *PublisherOptions
WithMandatory sets the mandatory flag in the PublisherOptions struct.
Parameters: - mandatory: a boolean indicating whether delivery is mandatory.
Returns: - *PublisherOptions: a pointer to the PublisherOptions struct.
type PublisherUsageOptions ¶ added in v0.2.0
type PublisherUsageOptions struct { ConfirmationCount int // size of publishing confirmations over the amqp channel ConfirmationNoWait bool // publisher confirmation mode parameter IsPublisher bool // indicates if this chan is used for publishing }
PublisherUsageOptions defines parameters for driving the publishers behavior and indicating to the supporting channel that publishing operations are enabled.
type SafeBaseChan ¶
type SafeBaseChan struct {
// contains filtered or unexported fields
}
SafeBaseChan wraps in a concurrency safe way the low level amqp.Channel.
func (*SafeBaseChan) IsSet ¶
func (c *SafeBaseChan) IsSet() bool
IsSet checks whether the SafeBaseChan's super field is set.
It does this by acquiring a read lock on the SafeBaseChan's mutex and then deferring its release.
Returns true if the super field is not nil, false otherwise.
func (*SafeBaseChan) Lock ¶ added in v0.2.0
func (c *SafeBaseChan) Lock()
Lock acquires locking of the low level channel [Super] for amqp operations. Use sparingly and fast as this locks-out the channel recovery!
func (*SafeBaseChan) RLock ¶ added in v1.0.0
func (c *SafeBaseChan) RLock()
RLock acquires read locking of the low level channel [Super] for amqp operations. Use sparingly and fast as this locks-out the channel recovery!
func (*SafeBaseChan) RUnlock ¶ added in v1.0.0
func (c *SafeBaseChan) RUnlock()
RUnlock releases the low level channel [Super] read lock.
func (*SafeBaseChan) Super ¶
func (c *SafeBaseChan) Super() *amqp.Channel
Super returns the low level amqp channel for direct interactions. Use sparingly and prefer using the predefined Channel wrapping methods instead. Pair usage with the locking/unlocking routines for safety!
func (*SafeBaseChan) UnLock ¶ added in v0.2.0
func (c *SafeBaseChan) UnLock()
UnLock releases the low level channel [Super] lock.
type SafeBaseConn ¶
type SafeBaseConn struct {
// contains filtered or unexported fields
}
SafeBaseConn wraps in a concurrency safe way the low level amqp.Connection.
func (*SafeBaseConn) IsSet ¶
func (c *SafeBaseConn) IsSet() bool
IsSet tests if the low level amqp connection is set.
func (*SafeBaseConn) Lock ¶ added in v0.2.0
func (c *SafeBaseConn) Lock()
Lock acquires locking of the low level connection [Super] for amqp operations. Use sparingly and fast as this locks-out the channel recovery!
func (*SafeBaseConn) Super ¶
func (c *SafeBaseConn) Super() *amqp.Connection
Super returns the low level amqp connection for direct interactions. Use sparingly and prefer using the predefined Connection wrapping methods instead. Pair usage with the locking/unlocking routines for safety!
func (*SafeBaseConn) UnLock ¶ added in v0.2.0
func (c *SafeBaseConn) UnLock()
UnLock releases the low level connection [Super] lock.
type SafeBool ¶
type SafeBool struct {
// contains filtered or unexported fields
}
SafeBool wraps a boolean in a concurrency safe way so it can be set, reset and tested from different goroutines.
type SecretProvider ¶
SecretProvider allows passing a bespoke method for providing the secret required when connecting to the Rabbit engine. See WithConnectionOptionPassword.
type TopologyBind ¶
type TopologyBind struct { Enabled bool // want this re-routing Peer string // other end of routing Key string // routing key / filter NoWait bool // re-routing confirmation required Args amqp.Table // core properties }
TopologyBind defines the possible binding relation between exchanges or queues and exchanges.
type TopologyOptions ¶
type TopologyOptions struct { Name string // tag of exchange or queue IsDestination bool // end target, i.e. if messages should be routed to it IsExchange bool // indicates if this is an exchange or queue Bind TopologyBind // complex routing Kind string // empty string for default exchange or: direct, topic, fanout, headers. Durable bool // maps the durable amqp attribute AutoDelete bool // maps the auto-delete amqp attribute Exclusive bool // if queue is exclusive Internal bool // maps the internal amqp attribute NoWait bool // maps the noWait amqp attribute Passive bool // if false, it will be created on the server when missing Args amqp.Table // wraps the amqp Table parameters Declare bool // gets created on start and also during recovery if Durable is false }
TopologyOptions defines the infrastructure topology, i.e. the exchange and queue definitions, for cases where they should be handled automatically on recovery or created one time.
func (*TopologyOptions) GetRouting ¶ added in v0.2.0
func (t *TopologyOptions) GetRouting() (source, destination string)
GetRouting returns the source and destination strings for the TopologyOptions struct.
The source and destination strings are determined based on whether IsDestination is true or false.
- if IsDestination is true, the source string is set to t.Bind.Peer and the destination string is set to t.Name.
- if IsDestination is false, the source string is set to t.Name and the destination string is set to t.Bind.Peer.
Returns the source and destination strings.
Source Files
¶
- channel.go
- channel_consume.go
- channel_notifiers.go
- channel_options.go
- channel_safebase.go
- channel_safewrappers.go
- clienttype_string.go
- confirmationoutcome_string.go
- connection.go
- connection_options.go
- connection_safebase.go
- connection_safewrappers.go
- consumer.go
- consumer_options.go
- error.go
- events.go
- eventtype_string.go
- general_options.go
- publisher.go
- publisher_options.go
- topology_options.go