Documentation ¶
Index ¶
- Constants
- Variables
- func CompileSequences(table map[uint64]bool) []uint64
- func DecodeSequence(key []byte) (uint64, error)
- func DecodeSequences(value []byte) ([]uint64, error)
- func EncodeSequence(s uint64, compact bool) []byte
- func EncodeSequences(list []uint64) []byte
- func GenerateSequence(n uint32) uint64
- func JoinSequence(ts time.Time, n uint32) uint64
- func SplitSequence(s uint64) (time.Time, uint32)
- type Buffer
- type Cleaner
- type CleanerConfig
- type Consumer
- type ConsumerConfig
- type DB
- type DBConfig
- type Entry
- type Ledger
- func (l *Ledger) Delete(sequence uint64) (int, error)
- func (l *Ledger) Head() uint64
- func (l *Ledger) Index(index int) (uint64, bool, error)
- func (l *Ledger) Length() int
- func (l *Ledger) Read(sequence uint64, amount int) ([]Entry, error)
- func (l *Ledger) Subscribe(receiver chan<- uint64)
- func (l *Ledger) Tail() uint64
- func (l *Ledger) Unsubscribe(receiver chan<- uint64)
- func (l *Ledger) Write(entries ...Entry) error
- type LedgerConfig
- type Producer
- type ProducerConfig
- type Queue
- type QueueConfig
- type Table
- type TableConfig
Constants ¶
const EncodedSequenceLength = 20
EncodedSequenceLength defines the expected length of an encoded sequence.
Variables ¶
var ErrConsumerClosed = errors.New("consumer closed")
ErrConsumerClosed is yielded to callbacks if the consumer has been closed.
var ErrConsumerDeadlock = errors.New("consumer deadlock")
ErrConsumerDeadlock is returned by the consumer if the specified deadline has been reached.
var ErrInvalidSequence = errors.New("invalid sequence")
ErrInvalidSequence is yielded to callbacks if the provided sequence has not yet been processed by the consumer.
var ErrLimitReached = errors.New("limit reached")
ErrLimitReached is returned for write attempts that go beyond the allowed ledger length.
var ErrNotMonotonic = errors.New("not monotonic")
ErrNotMonotonic is returned for write attempts that are not monotonic.
var ErrProducerClosed = errors.New("producer closed")
ErrProducerClosed is yielded to callbacks if the producer has been closed.
Functions ¶
func CompileSequences ¶ added in v0.5.0
CompileSequences will compile a list of positive sequences from the provided mark table. It will compress positive adjacent tail sequences and inject a fake positive sequence at the beginning if the first entry in the table is negative.
func DecodeSequence ¶
DecodeSequence will decode a sequence.
func DecodeSequences ¶ added in v0.2.0
DecodeSequences will decode a list of compacted sequences.
func EncodeSequence ¶
EncodeSequence will encode a sequence.
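The value of EncodedSequenceLength matches the number of decimal digits of the largest uint64 (18446744073709551615 has 20 digits). The following is a minimal sketch of a fixed-width encoding under that reading. It assumes the non-compact form is a zero-padded decimal string, which this page does not confirm; encodeSequence and decodeSequence are hypothetical stand-ins, not the package's functions, and the compact flag is not modeled.

```go
package main

import (
	"fmt"
	"strconv"
)

// encodedSequenceLength mirrors the documented EncodedSequenceLength constant.
const encodedSequenceLength = 20

// encodeSequence is a hypothetical stand-in for EncodeSequence. It assumes
// (not confirmed by the documentation) a zero-padded decimal encoding.
func encodeSequence(s uint64) []byte {
	return []byte(fmt.Sprintf("%0*d", encodedSequenceLength, s))
}

// decodeSequence is a hypothetical stand-in for DecodeSequence that parses
// the decimal string back into a number.
func decodeSequence(key []byte) (uint64, error) {
	return strconv.ParseUint(string(key), 10, 64)
}

func main() {
	key := encodeSequence(42)
	fmt.Printf("%s (%d bytes)\n", key, len(key))

	s, err := decodeSequence(key)
	fmt.Println(s, err)
}
```

A fixed-width decimal form has the property that the byte-wise order of the keys equals the numeric order of the sequences, which is convenient for ordered key-value stores.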
func EncodeSequences ¶ added in v0.2.0
EncodeSequences will encode a list of compacted sequences.
func GenerateSequence ¶
GenerateSequence will generate a locally monotonic sequence that consists of the current time and an ordinal number. The returned sequence is the first of n consecutive numbers. Sequences will overflow in 2106, or earlier if more than ca. 4 billion are generated within one second.
func JoinSequence ¶
JoinSequence constructs a sequence from a 32 bit timestamp and 32 bit ordinal number.
Types ¶
type Buffer ¶ added in v0.5.0
type Buffer struct {
// contains filtered or unexported fields
}
Buffer is a circular buffer to store entries.
func (*Buffer) Index ¶ added in v0.8.0
Index will return the entry on the specified position in the buffer. Negative indexes are counted backwards.
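Negative indexing on a circular buffer can be sketched as follows. The ring type below is hypothetical and stores plain uint64 values instead of entries; it is not the package's Buffer, and overwriting the oldest element when full is an assumption made only for this example. The capacity is assumed to be greater than zero.

```go
package main

import "fmt"

// ring is a hypothetical fixed-size circular buffer; it is not the
// package's Buffer type.
type ring struct {
	items []uint64
	head  int // position of the oldest element
	size  int // number of stored elements
}

// newRing allocates a ring; capacity must be greater than zero.
func newRing(capacity int) *ring {
	return &ring{items: make([]uint64, capacity)}
}

// push appends a value and overwrites the oldest one if the ring is full.
func (r *ring) push(v uint64) {
	if r.size < len(r.items) {
		r.items[(r.head+r.size)%len(r.items)] = v
		r.size++
		return
	}
	r.items[r.head] = v
	r.head = (r.head + 1) % len(r.items)
}

// index returns the element at position i. Negative positions are counted
// backwards from the newest element, so -1 is the last one.
func (r *ring) index(i int) (uint64, bool) {
	if i < 0 {
		i += r.size
	}
	if i < 0 || i >= r.size {
		return 0, false
	}
	return r.items[(r.head+i)%len(r.items)], true
}

func main() {
	r := newRing(3)
	for v := uint64(1); v <= 5; v++ {
		r.push(v)
	}
	fmt.Println(r.index(0))  // oldest stored element
	fmt.Println(r.index(-1)) // newest stored element
}
```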
type Cleaner ¶
type Cleaner struct {
// contains filtered or unexported fields
}
Cleaner will periodically delete entries from a ledger, honoring the configured retention and threshold as well as the positions from the specified tables. Failed cleanings are retried and the errors are yielded to the configured callback.
func NewCleaner ¶
func NewCleaner(ledger *Ledger, config CleanerConfig) *Cleaner
NewCleaner will create and return a new cleaner.
type CleanerConfig ¶ added in v0.3.0
type CleanerConfig struct {
	// The amount of entries to keep available in the ledger.
	Retention int

	// The maximum amount of entries to keep in the ledger.
	Threshold int

	// The interval of cleanings.
	Interval time.Duration

	// The tables to check for positions.
	Tables []*Table

	// The callback used to yield errors.
	Errors func(error)
}
CleanerConfig is used to configure a cleaner.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer manages consuming messages of a ledger.
func NewConsumer ¶
func NewConsumer(ledger *Ledger, table *Table, config ConsumerConfig) *Consumer
NewConsumer will create and return a new consumer.
func (*Consumer) Mark ¶ added in v0.5.0
Mark will acknowledge and mark the consumption of the specified sequence. The specified callback is called with the result of the processed mark. If Skip is configured, the callback might be called later, once the mark has been persisted. The method returns whether the mark has been successfully queued; if so, its callback will be called with the result, or with an error if the consumer is closed.
type ConsumerConfig ¶ added in v0.3.0
type ConsumerConfig struct {
	// The name of the persistent consumer. If empty, the consumer will not
	// persist its positions.
	Name string

	// The start position of the consumer if not recovered from the table.
	Start uint64

	// The channel on which available entries are sent.
	Entries chan<- Entry

	// The callback that is called with errors before the consumer dies.
	Errors func(error)

	// The amount of entries to fetch from the ledger at once.
	Batch int

	// The maximal size of the unmarked sequence range.
	Window int

	// The number of acks to skip before sequences are written to the table.
	Skip int

	// The time after which skipped marks are persisted to the table.
	Timeout time.Duration

	// The time after which the consumer crashes if it cannot make progress.
	Deadline time.Duration
}
ConsumerConfig is used to configure a consumer.
type DB ¶
DB is a generic database.
type DBConfig ¶ added in v0.3.0
type DBConfig struct {
	// Whether all writes should be synced.
	SyncWrites bool

	// The sink used for logging.
	Logger func(format string, args ...interface{})
}
DBConfig is used to configure a DB.
type Entry ¶
type Entry struct {
	// The entry's sequence (must be greater than zero).
	Sequence uint64

	// The entry's payload that is written to disk.
	Payload []byte

	// The reference to a shared object. This can be used with cached ledgers
	// to retain a reference to a decoded object of the entry.
	Object interface{}
}
Entry is a single entry in the ledger.
type Ledger ¶
type Ledger struct {
// contains filtered or unexported fields
}
Ledger manages the storage of sequential entries.
func CreateLedger ¶
func CreateLedger(db *DB, config LedgerConfig) (*Ledger, error)
CreateLedger will create a ledger that stores entries in the provided db. Read, write and delete requests can be issued concurrently to maximize performance. However, only one goroutine may write entries at the same time.
func (*Ledger) Delete ¶
Delete will remove all entries up to and including the specified sequence from the ledger.
func (*Ledger) Head ¶
Head will return the last committed sequence. This value can be checked periodically to assess whether new entries have been added.
func (*Ledger) Index ¶
Index will return the sequence of the specified index in the ledger. Negative indexes are counted backwards from the head. If the index exceeds the current length, the sequence of the last entry and false are returned. If the ledger is empty, the current head (zero if unused) and false will be returned.
func (*Ledger) Read ¶
Read will read entries from and including the specified sequence up to the requested amount of entries.
func (*Ledger) Subscribe ¶
Subscribe will subscribe the specified channel to changes to the last sequence stored in the ledger. Notifications will be skipped if the specified channel is not writable for some reason.
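Skipping notifications for receivers that are not writable corresponds to a non-blocking channel send in Go. The sketch below illustrates that pattern only; notify is a hypothetical helper and not part of the package, and how the ledger performs the send internally is not stated on this page.

```go
package main

import "fmt"

// notify is a hypothetical helper that sends head to every receiver without
// blocking. Receivers that cannot accept the value right now are skipped.
// It returns the number of receivers that got the value.
func notify(receivers []chan<- uint64, head uint64) int {
	delivered := 0
	for _, r := range receivers {
		select {
		case r <- head:
			delivered++
		default:
			// The channel is full or has no waiting reader: skip it.
		}
	}
	return delivered
}

func main() {
	ready := make(chan uint64, 1)
	full := make(chan uint64, 1)
	full <- 1 // this buffer is already occupied

	n := notify([]chan<- uint64{ready, full}, 2)
	fmt.Println(n, <-ready, <-full)
}
```

Under this pattern, a subscriber that uses a buffered channel of size one and re-reads Head after each notification does not miss the most recent change even when individual notifications are dropped.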
func (*Ledger) Tail ¶ added in v0.9.0
Tail will return the last deleted sequence. This value can be checked periodically to assess whether entries have been deleted.
func (*Ledger) Unsubscribe ¶
Unsubscribe will remove a previously subscribed receiver.
type LedgerConfig ¶ added in v0.3.0
type LedgerConfig struct {
	// The prefix for all ledger keys.
	Prefix string

	// The amount of entries to cache in memory.
	Cache int

	// The maximum length of the ledger. Write() will return ErrLimitReached if
	// the ledger is longer than this value.
	Limit int
}
LedgerConfig is used to configure a ledger.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer provides an interface to efficiently batch entries and write them to a ledger.
func NewProducer ¶
func NewProducer(ledger *Ledger, config ProducerConfig) *Producer
NewProducer will create and return a producer.
func (*Producer) Close ¶
func (p *Producer) Close()
Close will close the producer. Unprocessed entries will be canceled and the callbacks receive ErrProducerClosed if available.
type ProducerConfig ¶ added in v0.3.0
type ProducerConfig struct {
	// The maximum size of the written entry batches.
	Batch int

	// The timeout after which an unfinished batch is written in any case.
	Timeout time.Duration

	// The number of times a failed write due to ErrLimitReached is retried.
	Retry int

	// The time after which a failed write due to ErrLimitReached is retried.
	Delay time.Duration

	// If enabled, the producer will filter out entries that have a lower
	// sequence than the current ledger head.
	Filter bool
}
ProducerConfig is used to configure a producer.
type Queue ¶ added in v0.6.0
type Queue struct {
// contains filtered or unexported fields
}
Queue is a managed ledger and table with a cleaner.
func CreateQueue ¶ added in v0.6.0
func CreateQueue(db *DB, config QueueConfig) (*Queue, error)
CreateQueue will create and return a new queue based on the provided settings.
func (*Queue) Consumer ¶ added in v0.6.0
func (q *Queue) Consumer(config ConsumerConfig) *Consumer
Consumer will create a new consumer with the provided config.
func (*Queue) Producer ¶ added in v0.6.0
func (q *Queue) Producer(config ProducerConfig) *Producer
Producer will create a new producer with the provided config.
type QueueConfig ¶ added in v0.6.0
type QueueConfig struct {
	// The prefix for all keys.
	Prefix string

	// The amount of ledger entries to cache in memory.
	LedgerCache int

	// The maximum length of the ledger.
	LedgerLimit int

	// Whether the table should be fully cached in memory.
	TableCache bool

	// The amount of entries to keep around.
	CleanRetention int

	// The point after which entries are dropped no matter what.
	CleanThreshold int

	// The interval of periodic cleanings.
	CleanInterval time.Duration

	// The callback used to yield cleaner errors.
	CleanErrors func(error)
}
QueueConfig is used to configure a queue.
type Table ¶
type Table struct {
// contains filtered or unexported fields
}
Table manages the storage of position markers.
func CreateTable ¶
func CreateTable(db *DB, config TableConfig) (*Table, error)
CreateTable will create a table that stores position markers.
type TableConfig ¶ added in v0.3.0
type TableConfig struct {
	// The prefix for all table keys.
	Prefix string

	// Enable to keep all positions in memory.
	Cache bool
}
TableConfig is used to configure a table.