Documentation ¶
Overview ¶
Package eventbus provides an in-process event bus.
An event bus connects publishers of typed events with subscribers interested in those events. Typically, there is one global event bus per process.
Usage ¶
To send or receive events, first use Bus.Client to register with the bus. Clients should register with a human-readable name that identifies the code using the client, to aid in debugging.
To publish events, use Publish on a Client to get a typed publisher for your event type, then call Publisher.Publish as needed. If your event is expensive to construct, you can optionally use Publisher.ShouldPublish to skip the work if nobody is listening for the event.
To receive events, use Subscribe to get a typed subscriber for each event type you're interested in. Receive the events themselves by selecting over all your Subscriber.Events channels, as well as Subscriber.Done for shutdown notifications.
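The receive pattern described above can be sketched with plain channels. This is only an illustration using the standard library: the event types RouteChange and LinkChange are hypothetical, and the channel parameters stand in for what Subscriber.Events and Subscriber.Done would return.

```go
package main

import "fmt"

// RouteChange and LinkChange are hypothetical event types for illustration.
type RouteChange struct{ Prefix string }
type LinkChange struct{ Up bool }

// consume selects over both event channels until done is closed, and
// returns how many events of each type it received. The parameters stand
// in for the channels returned by Subscriber.Events and Subscriber.Done.
func consume(routes <-chan RouteChange, links <-chan LinkChange, done <-chan struct{}) (nRoutes, nLinks int) {
	for {
		select {
		case <-routes:
			nRoutes++
		case <-links:
			nLinks++
		case <-done:
			return nRoutes, nLinks
		}
	}
}

func main() {
	routes := make(chan RouteChange)
	links := make(chan LinkChange)
	done := make(chan struct{})

	// A stand-in for the bus: deliver a few events, then signal shutdown.
	go func() {
		routes <- RouteChange{Prefix: "10.0.0.0/8"}
		links <- LinkChange{Up: true}
		routes <- RouteChange{Prefix: "192.168.0.0/16"}
		close(done)
	}()

	r, l := consume(routes, links, done)
	fmt.Println(r, l) // prints "2 1"
}
```

Handling every event channel and the shutdown channel in one select keeps all of a component's event processing in a single goroutine.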
Concurrency properties ¶
The bus serializes all published events across all publishers, and preserves that ordering when delivering to subscribers that are attached to the same Client. In more detail:
- An event is published to the bus at some instant between the start and end of the call to Publisher.Publish.
- Two events cannot be published at the same instant, and so are totally ordered by their publication time. Given two events E1 and E2, either E1 happens before E2, or E2 happens before E1.
- Clients dispatch events to their Subscribers in publication order: if E1 happens before E2, the client always delivers E1 before E2.
- Clients do not synchronize subscriptions with each other: given clients C1 and C2, both subscribed to events E1 and E2, C1 may deliver both E1 and E2 before C2 delivers E1.
Less formally: there is one true timeline of all published events. If you make a Client and subscribe to events, you will receive events one at a time, in the same order as the one true timeline. You will "skip over" events you didn't subscribe to, but your view of the world always moves forward in time, never backwards, and you will observe events in the same order as everyone else.
However, you cannot assume that what your client sees as "now" is the same as what other clients see. They may be further behind than you in working through the timeline, or running ahead of you. This means you should be careful about reaching out to another component directly after receiving an event, as its view of the world may not yet (or ever) be exactly consistent with yours.
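As a rough model of these ordering properties (not the package's implementation), the timeline can be treated as a slice, and each client's view as a filter over a prefix of it. The Event type, the view function, and the event kinds below are all hypothetical.

```go
package main

import "fmt"

// Event is a hypothetical event carrying its position in the global timeline.
type Event struct {
	Seq  int
	Kind string
}

// view returns what a client subscribed to kinds has observed after working
// through the first n events of the timeline. Publication order is preserved
// and events of other kinds are skipped.
func view(timeline []Event, n int, kinds ...string) []Event {
	if n < 0 {
		n = 0
	}
	if n > len(timeline) {
		n = len(timeline)
	}
	want := make(map[string]bool, len(kinds))
	for _, k := range kinds {
		want[k] = true
	}
	var out []Event
	for _, e := range timeline[:n] {
		if want[e.Kind] {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	timeline := []Event{{1, "route"}, {2, "link"}, {3, "dns"}, {4, "route"}}

	// C1 has worked through the whole timeline; C2 is two events behind.
	c1 := view(timeline, 4, "route", "link")
	c2 := view(timeline, 2, "route", "link")

	fmt.Println(c1) // prints "[{1 route} {2 link} {4 route}]"
	fmt.Println(c2) // prints "[{1 route} {2 link}]"
}
```

In this model, two clients with the same subscriptions never disagree about order: the slower client's view is always a prefix of the faster client's view.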
To make your code more testable and understandable, you should try to structure it following the actor model: you have some local state over which you have authority, but your only way to interact with state elsewhere in the program is to receive and process events coming from elsewhere, or to emit events of your own.
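A minimal sketch of that actor shape, using plain channels in place of the bus. The Deposit and BalanceChanged event types and the runAccount function are hypothetical names chosen for illustration.

```go
package main

import "fmt"

// Deposit and BalanceChanged are hypothetical event types.
type Deposit struct{ Amount int }
type BalanceChanged struct{ Balance int }

// runAccount is a small actor. It has sole authority over balance, learns
// about the rest of the program only through incoming Deposit events, and
// affects the rest of the program only by emitting BalanceChanged events.
// It stops, and closes out, when in is closed.
func runAccount(in <-chan Deposit, out chan<- BalanceChanged) {
	defer close(out)
	balance := 0 // local state, touched only by this goroutine
	for d := range in {
		balance += d.Amount
		out <- BalanceChanged{Balance: balance}
	}
}

func main() {
	in := make(chan Deposit)
	out := make(chan BalanceChanged)
	go runAccount(in, out)

	go func() {
		for _, a := range []int{5, 10, 20} {
			in <- Deposit{Amount: a}
		}
		close(in)
	}()

	for ev := range out {
		fmt.Println(ev.Balance) // prints 5, then 15, then 35
	}
}
```

Because no other goroutine can reach balance, the actor needs no locks, and a test can drive it entirely by feeding events in and checking the events that come out.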
Expected subscriber behavior ¶
Subscribers are expected to promptly receive their events on Subscriber.Events. The bus has a small, fixed amount of internal buffering, meaning that a slow subscriber will eventually cause backpressure and block publication of all further events.
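The effect of a small, fixed buffer can be illustrated with an ordinary buffered channel. This is a sketch only: the buffer size of 2 is arbitrary, and tryPublish is a hypothetical helper that reports where a blocking publisher would stall instead of actually blocking.

```go
package main

import "fmt"

// tryPublish attempts a non-blocking send and reports whether the queue had
// room. Where this returns false, a blocking publisher would stall until a
// subscriber receives.
func tryPublish(queue chan<- int, v int) bool {
	select {
	case queue <- v:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan int, 2) // small, fixed buffer; the size is arbitrary

	fmt.Println(tryPublish(queue, 1)) // true
	fmt.Println(tryPublish(queue, 2)) // true
	fmt.Println(tryPublish(queue, 3)) // false: buffer full, nobody receiving

	<-queue // the subscriber catches up by one event

	fmt.Println(tryPublish(queue, 3)) // true: there is room again
}
```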
In general, you should receive from your subscriber(s) in a loop, and only do fast state updates within that loop. Any heavier work should be offloaded to another goroutine.
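One way to structure such a loop is sketched below, with a plain channel standing in for Subscriber.Events. The Job type and receiveLoop function are hypothetical, and spawning one goroutine per event is only the simplest option.

```go
package main

import (
	"fmt"
	"sync"
)

// Job is a hypothetical event type.
type Job struct{ ID int }

// receiveLoop performs only a fast state update per event and hands the
// heavier work to a separate goroutine, so that the loop gets back to
// receiving quickly. It returns the number of events seen once events is
// closed and all offloaded work has finished.
func receiveLoop(events <-chan Job, heavy func(Job)) int {
	var wg sync.WaitGroup
	seen := 0
	for j := range events {
		seen++ // fast, local state update
		wg.Add(1)
		go func(j Job) { // heavier work runs outside the receive loop
			defer wg.Done()
			heavy(j)
		}(j)
	}
	wg.Wait()
	return seen
}

func main() {
	events := make(chan Job)
	go func() {
		for i := 1; i <= 3; i++ {
			events <- Job{ID: i}
		}
		close(events)
	}()

	results := make(chan int, 3)
	seen := receiveLoop(events, func(j Job) { results <- j.ID })
	close(results)

	sum := 0
	for id := range results {
		sum += id
	}
	fmt.Println(seen, sum) // prints "3 6"
}
```

A long-running program would probably bound the offloaded work, for example with a fixed pool of workers, rather than start an unbounded number of goroutines.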
Causing publishers to block from backpressure is considered a bug in the slow subscriber causing the backpressure, and should be addressed there. Publishers should assume that Publish will not block for extended periods of time, and should not make exceptional effort to behave gracefully if they do get blocked.
These blocking semantics are provisional and subject to change. Please speak up if this causes development pain, so that we can adapt the semantics to better suit our needs.
Debugging facilities ¶
The Debugger, obtained through Bus.Debugger, provides introspection facilities to monitor events flowing through the bus, and inspect publisher and subscriber state.
Index ¶
- type Bus
- type Client
- type Debugger
- func (d *Debugger) Clients() []*Client
- func (d *Debugger) PublishQueue() []PublishedEvent
- func (d *Debugger) PublishTypes(client *Client) []reflect.Type
- func (d *Debugger) RegisterHTTP(td *tsweb.DebugHandler)
- func (d *Debugger) SubscribeQueue(client *Client) []DeliveredEvent
- func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type
- func (d *Debugger) WatchBus() *Subscriber[RoutedEvent]
- func (d *Debugger) WatchPublish(client *Client) *Subscriber[PublishedEvent]
- func (d *Debugger) WatchSubscribe(client *Client) *Subscriber[DeliveredEvent]
- type DeliveredEvent
- type PublishedEvent
- type Publisher
- type RoutedEvent
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Bus ¶
type Bus struct {
// contains filtered or unexported fields
}
Bus is an event bus that distributes published events to interested subscribers.
func New ¶
func New() *Bus
New returns a new bus. Use Bus.Client to register a client with the bus, then use Publish to make event publishers and Subscribe to make event subscribers.
func (*Bus) Client ¶
Client returns a new client with no subscriptions. Use Subscribe to receive events, and Publish to emit events.
The client's name is used only for debugging, to tell humans what piece of code a publisher/subscriber belongs to. Aim for something short but unique, for example "kernel-route-monitor" or "taildrop", not "watcher".
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
A Client can publish and subscribe to events on its attached bus. See Publish to publish events, and Subscribe to receive events.
Subscribers that share the same client receive events one at a time, in the order they were published.
type Debugger ¶
type Debugger struct {
// contains filtered or unexported fields
}
A Debugger offers access to a bus's privileged introspection and debugging facilities.
The debugger's functionality is intended for humans and their tools to examine and troubleshoot bus clients, and should not be used in normal codepaths.
In particular, the debugger provides access to information that is deliberately withheld from bus clients to encourage more robust and maintainable code - for example, the sender of an event, or the event streams of other clients. Please don't use the debugger to circumvent these restrictions for purposes other than debugging.
func (*Debugger) PublishQueue ¶
func (d *Debugger) PublishQueue() []PublishedEvent
PublishQueue returns the contents of the publish queue.
The publish queue contains events that have been accepted by the bus from Publish() calls, but have not yet been routed to relevant subscribers.
This queue is expected to be almost empty in normal operation. A full publish queue indicates that a slow subscriber downstream is causing backpressure and stalling the bus.
func (*Debugger) PublishTypes ¶
func (d *Debugger) PublishTypes(client *Client) []reflect.Type
PublishTypes returns the list of types being published by client.
The returned types are those for which the client has obtained a Publisher. The client may not have ever sent the type in question.
func (*Debugger) RegisterHTTP ¶
func (d *Debugger) RegisterHTTP(td *tsweb.DebugHandler)
func (*Debugger) SubscribeQueue ¶
func (d *Debugger) SubscribeQueue(client *Client) []DeliveredEvent
SubscribeQueue returns the contents of the given client's subscribe queue.
The subscribe queue contains events that are to be delivered to the client, but haven't yet been handed off to the relevant Subscriber.
This queue is expected to be almost empty in normal operation. A full subscribe queue indicates that the client is accepting events too slowly, and may be causing the rest of the bus to stall.
func (*Debugger) SubscribeTypes ¶
func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type
SubscribeTypes returns the list of types being subscribed to by client.
The returned types are those for which the client has obtained a Subscriber. The client may not have ever received the type in question, and there may not be any publishers of the type.
func (*Debugger) WatchBus ¶
func (d *Debugger) WatchBus() *Subscriber[RoutedEvent]
WatchBus streams information about all events passing through the bus.
Monitored events are delivered in the bus's global publication order (see "Concurrency properties" in the package docs).
The caller must consume monitoring events promptly to avoid stalling the bus (see "Expected subscriber behavior" in the package docs).
func (*Debugger) WatchPublish ¶
func (d *Debugger) WatchPublish(client *Client) *Subscriber[PublishedEvent]
WatchPublish streams information about all events published by the given client.
Monitored events are delivered in the bus's global publication order (see "Concurrency properties" in the package docs).
The caller must consume monitoring events promptly to avoid stalling the bus (see "Expected subscriber behavior" in the package docs).
func (*Debugger) WatchSubscribe ¶
func (d *Debugger) WatchSubscribe(client *Client) *Subscriber[DeliveredEvent]
WatchSubscribe streams information about all events received by the given client.
Monitored events are delivered in the bus's global publication order (see "Concurrency properties" in the package docs).
The caller must consume monitoring events promptly to avoid stalling the bus (see "Expected subscriber behavior" in the package docs).
type DeliveredEvent ¶
type PublishedEvent ¶
type Publisher ¶
type Publisher[T any] struct {
// contains filtered or unexported fields
}
A Publisher publishes typed events on a bus.
func (*Publisher[T]) Close ¶
func (p *Publisher[T]) Close()
Close closes the publisher.
Calls to Publish after Close silently do nothing.
func (*Publisher[T]) Publish ¶
func (p *Publisher[T]) Publish(v T)
Publish publishes event v on the bus.
func (*Publisher[T]) ShouldPublish ¶
ShouldPublish reports whether anyone is subscribed to the events that this publisher emits.
ShouldPublish can be used to skip expensive event construction if nobody seems to care. Publishers must not assume that someone will definitely receive an event if ShouldPublish returns true.
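The intended call pattern can be sketched with a toy stand-in. The toyPublisher type below is hypothetical and is not the package's Publisher. It only mimics the two methods named above, with a plain subscriber count in place of real bus state.

```go
package main

import "fmt"

// toyPublisher is a hypothetical stand-in for illustration only.
type toyPublisher struct {
	subscribers int
	sent        []string
}

// ShouldPublish reports whether anyone appears to be listening.
func (p *toyPublisher) ShouldPublish() bool { return p.subscribers > 0 }

// Publish records v if there is a subscriber, and otherwise drops it.
func (p *toyPublisher) Publish(v string) {
	if p.subscribers > 0 {
		p.sent = append(p.sent, v)
	}
}

// publishReport calls build, which is assumed to be expensive, only when
// ShouldPublish says someone may be listening. It reports whether the
// event was built and handed to Publish.
func publishReport(p *toyPublisher, build func() string) bool {
	if !p.ShouldPublish() {
		return false
	}
	p.Publish(build())
	return true
}

func main() {
	builds := 0
	build := func() string { builds++; return "report" }

	idle := &toyPublisher{}
	busy := &toyPublisher{subscribers: 1}

	fmt.Println(publishReport(idle, build), builds) // prints "false 0"
	fmt.Println(publishReport(busy, build), builds) // prints "true 1"
}
```

The check is only an optimization hint. A true result does not guarantee delivery, so the publisher should not rely on the event being received.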
type RoutedEvent ¶
type Subscriber ¶
type Subscriber[T any] struct {
// contains filtered or unexported fields
}
A Subscriber delivers one type of event from a Client.
func Subscribe ¶
func Subscribe[T any](c *Client) *Subscriber[T]
Subscribe requests delivery of events of type T through the given Client. Panics if the client already has a subscriber for T.
func (*Subscriber[T]) Close ¶
func (s *Subscriber[T]) Close()
Close closes the Subscriber, indicating the caller no longer wishes to receive this event type. After Close, receives on Subscriber.Events block forever.
func (*Subscriber[T]) Done ¶
func (s *Subscriber[T]) Done() <-chan struct{}
Done returns a channel that is closed when the subscriber is closed.
func (*Subscriber[T]) Events ¶
func (s *Subscriber[T]) Events() <-chan T
Events returns a channel on which the subscriber's events are delivered.
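Because receives on Events block forever after Close, a consumer that reads only from Events would hang. The sketch below shows why selecting on Done as well matters. The toySubscriber type is a hypothetical stand-in that mimics the Events, Done, and Close methods documented above. It is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// toySubscriber is a hypothetical stand-in for illustration only.
type toySubscriber struct {
	events chan string
	done   chan struct{}
	once   sync.Once
}

func newToySubscriber() *toySubscriber {
	return &toySubscriber{
		events: make(chan string),
		done:   make(chan struct{}),
	}
}

func (s *toySubscriber) Events() <-chan string { return s.events }
func (s *toySubscriber) Done() <-chan struct{} { return s.done }

// Close closes the done channel exactly once. The events channel is left
// open, so a bare receive on Events after Close would block forever.
func (s *toySubscriber) Close() { s.once.Do(func() { close(s.done) }) }

// next returns the next event, or ok=false once the subscriber is closed.
func next(s *toySubscriber) (v string, ok bool) {
	select {
	case v := <-s.Events():
		return v, true
	case <-s.Done():
		return "", false
	}
}

func main() {
	s := newToySubscriber()
	go func() { s.events <- "hello" }()

	v, ok := next(s)
	fmt.Println(v, ok) // prints "hello true"

	s.Close()
	_, ok = next(s)
	fmt.Println(ok) // prints "false"
}
```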
Directories ¶
Path | Synopsis |
---|---|
debug-demo | debug-demo is a program that serves a bus's debug interface over HTTP, then generates some fake traffic from a handful of clients. |