Documentation
¶
Index ¶
- func EventStreamSampleGroup(factory func() EventStream)
- type Bracket
- type Event
- type EventBuilder
- type EventHandler
- type EventStream
- type EventStreamWrapper
- func (s *EventStreamWrapper) After(duration time.Duration) EventBuilder
- func (s *EventStreamWrapper) Agg(a ...string) EventBuilder
- func (s *EventStreamWrapper) DefAgg() EventBuilder
- func (s *EventStreamWrapper) Emit(builders ...EventBuilder) (*Event, error)
- func (s *EventStreamWrapper) IncrBy(duration time.Duration) EventBuilder
- func (s *EventStreamWrapper) Stream() EventStream
- func (s *EventStreamWrapper) Type(t string) EventBuilder
- type SelectOption
- type Selector
- type SequenceStore
- type Subscription
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func EventStreamSampleGroup ¶
func EventStreamSampleGroup(factory func() EventStream)
Types ¶
type Event ¶
type Event struct {
Sequence int64 `json:"sequence,omitempty" yaml:"sequence,omitempty"`
Aggregate []string `json:"aggregate,omitempty" yaml:"aggregate,omitempty"`
Type string `json:"type,omitempty" yaml:"type,omitempty"`
OccurredAt time.Time `json:"occurred_at,omitempty" yaml:"occurred_at,omitempty"`
Payload map[string]interface{} `json:"payload,omitempty" yaml:"payload,omitempty"`
}
type EventBuilder ¶
type EventBuilder = func(e *Event)
type EventHandler ¶
type EventStream ¶
type EventStream interface {
Store(event *Event) (int64, error)
LastSequence() int64
Get(sequence int64) (*Event, error)
Stream(ctx context.Context, sel Selector, bracket Bracket, handler EventHandler) error
Subscribe(ctx context.Context, persistentClientID string, sel Selector, handler EventHandler) (Subscription, error)
// Returns all currently known Subscriptions.
Subscriptions() []Subscription
}
type EventStreamWrapper ¶
type EventStreamWrapper struct {
// contains filtered or unexported fields
}
func NewWrapper ¶
func NewWrapper(stream EventStream) *EventStreamWrapper
func NewWrapperWithStartTime ¶
func NewWrapperWithStartTime(stream EventStream, startTime time.Time) *EventStreamWrapper
func (*EventStreamWrapper) After ¶
func (s *EventStreamWrapper) After(duration time.Duration) EventBuilder
func (*EventStreamWrapper) Agg ¶
func (s *EventStreamWrapper) Agg(a ...string) EventBuilder
func (*EventStreamWrapper) DefAgg ¶
func (s *EventStreamWrapper) DefAgg() EventBuilder
func (*EventStreamWrapper) Emit ¶
func (s *EventStreamWrapper) Emit(builders ...EventBuilder) (*Event, error)
func (*EventStreamWrapper) IncrBy ¶
func (s *EventStreamWrapper) IncrBy(duration time.Duration) EventBuilder
func (*EventStreamWrapper) Stream ¶
func (s *EventStreamWrapper) Stream() EventStream
func (*EventStreamWrapper) Type ¶
func (s *EventStreamWrapper) Type(t string) EventBuilder
type SelectOption ¶
type SelectOption func(s *Selector)
func SelectAggregate ¶
func SelectAggregate(agg ...string) SelectOption
func SelectType ¶
func SelectType(t string) SelectOption
type Selector ¶
func ParseSelector ¶
func Select ¶
func Select(options ...SelectOption) Selector
func (*Selector) IsComplete ¶
type SequenceStore ¶
type Subscription ¶
type Subscription interface {
PersistentID() string
// Returns the currently active Selector.
ActiveSelector() Selector
LastAcknowledgedSequence() (int64, error)
Acknowledge(sequence int64) error
// Returns whether this Subscription is currently active.
Active() bool
// Returns the time this Subscription last became inactive.
InactiveSince() time.Time
// Wait for the Subscription to become inactive (disconnected)
Wait() error
// Returns how often this Subscription has dropped out of the live stream.
DropOuts() int
// Closes this Subscription and removes all associated state. A Subscription can not be resumed after this call.
Shutdown()
}
Click to show internal directories.
Click to hide internal directories.