Documentation
¶
Index ¶
- Variables
- func MergeContext(a, b context.Context) context.Context
- type EventConstraint
- type GetSequenceFunc
- type Option
- func WithCoreStoredEventsSize(size uint64) Option
- func WithDBProcessorErrorRetryTimer(d time.Duration) Option
- func WithDBProcessorRetryTimer(d time.Duration) Option
- func WithErrorLogger(fn func(err error)) Option
- func WithGetLastEventsLimit(limit uint64) Option
- func WithGetUnprocessedEventsLimit(limit uint64) Option
- func WithLogger(logger *zap.Logger) Option
- type Repository
- type RetentionJob
- type RetentionOption
- type RetentionRepository
- type RetryConsumer
- type RetryConsumerOption
- type Runner
- type SetSequenceFunc
- type Subscriber
- type SubscriberOption
- type Timer
Constants ¶
This section is empty.
Variables ¶
var ErrEventNotFound = errors.New("not found any events from a sequence")
ErrEventNotFound when select from events table not find events >= sequence (because of retention)
Functions ¶
Types ¶
type EventConstraint ¶ added in v0.3.0
type EventConstraint interface { // GetID returns the event id GetID() uint64 // GetSequence returns the event sequence number, = 0 if sequence is null GetSequence() uint64 // GetSize returns the approximate size (in bytes) of the event, for limit batch size by event data size // using WithSubscriberSizeLimit for configuring this limit GetSize() uint64 }
EventConstraint a type constraint for event
type GetSequenceFunc ¶ added in v0.5.0
GetSequenceFunc ...
type Option ¶
type Option func(opts *eventxOptions)
Option for configuration
func WithCoreStoredEventsSize ¶
WithCoreStoredEventsSize configures the size of stored events
func WithDBProcessorErrorRetryTimer ¶
WithDBProcessorErrorRetryTimer configures retry timer duration
func WithDBProcessorRetryTimer ¶
WithDBProcessorRetryTimer configures retry timer duration
func WithErrorLogger ¶ added in v0.5.1
WithErrorLogger configures callback func for errors
func WithGetLastEventsLimit ¶
WithGetLastEventsLimit configures GetLastEvents limit
func WithGetUnprocessedEventsLimit ¶
WithGetUnprocessedEventsLimit configures GetUnprocessedEvents limit
type Repository ¶
type Repository[E EventConstraint] interface { // GetLastEvents returns top *limit* events (events with the highest sequence numbers), // by sequence number in ascending order, ignore events with null sequence number GetLastEvents(ctx context.Context, limit uint64) ([]E, error) // GetUnprocessedEvents returns list of events with the smallest event *id* (not sequence number) // *AND* have NULL sequence numbers, in ascending order of event *id* // size of the list is limited by *limit* GetUnprocessedEvents(ctx context.Context, limit uint64) ([]E, error) // GetEventsFrom returns list of events with sequence number >= *from* // in ascending order of event sequence numbers, ignoring events with null sequence numbers // size of the list is limited by *limit* GetEventsFrom(ctx context.Context, from uint64, limit uint64) ([]E, error) // UpdateSequences updates only sequence numbers of *events* UpdateSequences(ctx context.Context, events []E) error }
Repository for accessing database, MUST be thread safe
type RetentionJob ¶ added in v0.4.0
type RetentionJob[E EventConstraint] struct { // contains filtered or unexported fields }
RetentionJob ...
func NewRetentionJob ¶ added in v0.4.0
func NewRetentionJob[E EventConstraint]( runner *Runner[E], repo RetentionRepository, options ...RetentionOption, ) *RetentionJob[E]
NewRetentionJob ...
func (*RetentionJob[E]) RunJob ¶ added in v0.4.0
func (j *RetentionJob[E]) RunJob(ctx context.Context)
RunJob will stop when the context object is cancelled / deadline exceeded
type RetentionOption ¶ added in v0.4.0
type RetentionOption func(opts *retentionOptions)
RetentionOption ...
func WithDeleteBatchSize ¶ added in v0.4.0
func WithDeleteBatchSize(size uint64) RetentionOption
WithDeleteBatchSize specifies number events to be deleted with DeleteEventsBefore() method
func WithMaxTotalEvents ¶ added in v0.4.0
func WithMaxTotalEvents(maxSize uint64) RetentionOption
WithMaxTotalEvents keep the number of events not more than *maxSize*
func WithRetentionErrorLogger ¶ added in v0.4.0
func WithRetentionErrorLogger(logger func(err error)) RetentionOption
WithRetentionErrorLogger config the error logger
func WithRetentionErrorRetryDuration ¶ added in v0.4.0
func WithRetentionErrorRetryDuration(d time.Duration) RetentionOption
WithRetentionErrorRetryDuration config the retry duration
type RetentionRepository ¶ added in v0.4.0
type RetentionRepository interface { // GetMinSequence returns the min sequence number of all events (except events with null sequence numbers) // returns null if no events with sequence number existed GetMinSequence(ctx context.Context) (sql.NullInt64, error) // DeleteEventsBefore deletes events with sequence number < *beforeSeq* DeleteEventsBefore(ctx context.Context, beforeSeq uint64) error }
RetentionRepository for delete old events
type RetryConsumer ¶ added in v0.5.0
type RetryConsumer[E EventConstraint] struct { // contains filtered or unexported fields }
RetryConsumer ...
func NewRetryConsumer ¶ added in v0.5.0
func NewRetryConsumer[E EventConstraint]( runner *Runner[E], repo Repository[E], getSequence GetSequenceFunc, setSequence SetSequenceFunc, handler func(ctx context.Context, events []E) error, options ...RetryConsumerOption, ) *RetryConsumer[E]
NewRetryConsumer ...
func (*RetryConsumer[E]) RunConsumer ¶ added in v0.5.0
func (c *RetryConsumer[E]) RunConsumer(ctx context.Context)
RunConsumer until the ctx is finished
type RetryConsumerOption ¶ added in v0.5.0
type RetryConsumerOption func(conf *retryConsumerConfig)
RetryConsumerOption ...
func WithConsumerRetryDuration ¶ added in v0.5.0
func WithConsumerRetryDuration(d time.Duration) RetryConsumerOption
WithConsumerRetryDuration ...
func WithRetryConsumerErrorLogger ¶ added in v0.5.0
func WithRetryConsumerErrorLogger(logger func(err error)) RetryConsumerOption
WithRetryConsumerErrorLogger ...
func WithRetryConsumerFetchLimit ¶ added in v0.5.0
func WithRetryConsumerFetchLimit(limit uint64) RetryConsumerOption
WithRetryConsumerFetchLimit ...
type Runner ¶
type Runner[E EventConstraint] struct { // contains filtered or unexported fields }
Runner for running event handling
func NewRunner ¶
func NewRunner[E EventConstraint]( repo Repository[E], setSequence func(event *E, seq uint64), options ...Option, ) *Runner[E]
NewRunner creates a Runner
func (*Runner[E]) NewSubscriber ¶
func (r *Runner[E]) NewSubscriber(from uint64, fetchLimit uint64, options ...SubscriberOption) *Subscriber[E]
NewSubscriber creates a subscriber
type SetSequenceFunc ¶ added in v0.5.0
SetSequenceFunc ...
type Subscriber ¶
type Subscriber[E EventConstraint] struct { // contains filtered or unexported fields }
Subscriber for subscribing to events
func (*Subscriber[E]) Fetch ¶
func (s *Subscriber[E]) Fetch(ctx context.Context) ([]E, error)
Fetch get events, if ctx is cancelled / deadline exceed then the fetch will be returned with error = ctx.Err(), and then it can be call again with a normal context object. The list of events returned will never be empty when err = nil
type SubscriberOption ¶
type SubscriberOption func(opts *subscriberOptions)
SubscriberOption for customizing subscribers
func WithSubscriberSizeLimit ¶
func WithSubscriberSizeLimit(sizeLimit uint64) SubscriberOption
WithSubscriberSizeLimit configures limit in size of Fetch batches