emmq

package module
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2022 License: MIT Imports: 9 Imported by: 0

README

emmq

Build Status codecov Go Report Card

emmq is an embedded message queue backed by BadgerDB. It focusses on durability of messages in favour of strict ordering, but will generally achieve FIFO behaviour with a couple of caveats noted below.

The package was built to provide a simple message queue for embedded applications and as a learning exercise for BadgerDB.

Getting started

go get github.com/stevecallear/emmq@latest
e, err := emmq.Open("messages")
if err != nil {
    log.Fatal(err)
}
defer e.Close()

q, err := e.Declare("queue")
if err != nil {
    log.Fatal(err)
}

q.Bind("topic")

c, err := q.Consume(context.Background())
if err != nil {
    log.Fatal(err)
}

if err = e.Publish("topic", []byte("value")); err != nil {
    log.Fatal(err)
}

d := <-c
log.Print(string(d.Value))

if err = d.Delete(); err != nil {
    log.Fatal(err)
}

Message delivery

Messages are published for a specific topic. Each topic can be bound to multiple queues and a single queue can be bound to multiple topics. If no queues are bound to a specific topic, then published messages will be discarded.

All published messages are persisted prior to sending to consuming channels. If bound queues have not been configured to consume messages, then they will be persisted for immediate consumption. If bound queues have been configured to consume messages then they will be persisted with the configured visibility timeout and immediately sent to the consuming channel.

Messages can be delayed by using WithDelay or can be forced to wait for the next poll interval using WithWait.

Delivered messages that have been processed should be deleted by calling Delete. If the message is not deleted then it will be delivered again once the visibility timeout has expired.

Delivery order

Messages are generally sent to consumer channels in FIFO order to nanosecond precision. If multiple messages are published within the same nanosecond then consume order will be random for those messages only.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithDelay

func WithDelay(d time.Duration) func(*PublishOptions)

WithDelay delays message consumption until the first poll interval after the specified duration

func WithPolling

func WithPolling(interval time.Duration, batchSize int) func(*Options)

WithPolling configures the exchange poll interval and batch size

func WithWait

func WithWait() func(*PublishOptions)

WithWait delays message consumption until the next poll interval

Types

type Delivery

type Delivery struct {
	Key   Key
	Value []byte
	// contains filtered or unexported fields
}

Delivery represents a message delivery

func (*Delivery) Delete added in v0.3.0

func (d *Delivery) Delete() error

Delete deletes the message If a message is not deleted then it will be re-delivered after the visibility timeout

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

Exchange represents a message exchange

func Open

func Open(path string, optFns ...func(*Options)) (*Exchange, error)

Open opens a new exchange with the specified options

func (*Exchange) Close

func (e *Exchange) Close() error

Close closes the exchange and the underlying database

func (*Exchange) Declare added in v0.3.0

func (e *Exchange) Declare(queue string) (Queue, error)

Declare declares a new queue with the specified name

func (*Exchange) Publish

func (e *Exchange) Publish(topic string, value []byte, optFns ...func(*PublishOptions)) error

Publish publishes the specified message If no queues are bound to the topic then the message will be discarded

func (*Exchange) PurgeAll added in v0.3.0

func (e *Exchange) PurgeAll() error

PurgeAll purges all messages in all queues

type Key

type Key []byte

Key represents a message key

func NewKey

func NewKey(prefix string, dueAt time.Time) (Key, error)

NewKey returns a new key

func (Key) Delay added in v0.3.0

func (k Key) Delay(d time.Duration) Key

Delay returns a copy of the key with the specified due at delay

func (Key) DueAt

func (k Key) DueAt() time.Time

DueAt returns the due at time for the key

func (Key) Prefix added in v0.3.0

func (k Key) Prefix() []byte

Prefix returns the key prefix

func (Key) UUID

func (k Key) UUID() uuid.UUID

UUID returns the uuid for the key

type Options

type Options struct {
	PollInterval      time.Duration
	PollBatchSize     int
	VisibilityTimeout time.Duration
	BadgerOptions     badger.Options
	OnError           func(error)
}

Options represents exchange options

type PublishOptions

type PublishOptions struct {
	Wait  bool
	Delay time.Duration
}

PublishOptions represents message publish options

type Queue added in v0.3.0

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents a message queue

func (Queue) Bind added in v0.3.0

func (q Queue) Bind(topics ...string)

Bind binds the queue to the specified topics A topic may be bound to multiple queues

func (Queue) Consume added in v0.3.0

func (q Queue) Consume(ctx context.Context) (<-chan Delivery, error)

Consume consumes queue messages

func (Queue) Purge added in v0.3.0

func (q Queue) Purge() error

Purge purges all queue messages

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL