eventbus

package
v1.84.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2025 License: BSD-3-Clause Imports: 19 Imported by: 1

Documentation

Overview

Package eventbus provides an in-process event bus.

An event bus connects publishers of typed events with subscribers interested in those events. Typically, there is one global event bus per process.

Usage

To send or receive events, first use Bus.Client to register with the bus. Clients should register with a human-readable name that identifies the code using the client, to aid in debugging.

To publish events, use Publish on a Client to get a typed publisher for your event type, then call Publisher.Publish as needed. If your event is expensive to construct, you can optionally use Publisher.ShouldPublish to skip the work if nobody is listening for the event.

To receive events, use Subscribe to get a typed subscriber for each event type you're interested in. Receive the events themselves by selecting over all your Subscriber.Events channels, as well as Subscriber.Done for shutdown notifications.

Concurrency properties

The bus serializes all published events across all publishers, and preserves that ordering when delivering to subscribers that are attached to the same Client. In more detail:

  • An event is published to the bus at some instant between the start and end of the call to Publisher.Publish.
  • Two events cannot be published at the same instant, and so are totally ordered by their publication time. Given two events E1 and E2, either E1 happens before E2, or E2 happens before E1.
  • Clients dispatch events to their Subscribers in publication order: if E1 happens before E2, the client always delivers E1 before E2.
  • Clients do not synchronize subscriptions with each other: given clients C1 and C2, both subscribed to events E1 and E2, C1 may deliver both E1 and E2 before C2 delivers E1.

Less formally: there is one true timeline of all published events. If you make a Client and subscribe to events, you will receive events one at a time, in the same order as the one true timeline. You will "skip over" events you didn't subscribe to, but your view of the world always moves forward in time, never backwards, and you will observe events in the same order as everyone else.

However, you cannot assume that what your client see as "now" is the same as what other clients. They may be further behind you in working through the timeline, or running ahead of you. This means you should be careful about reaching out to another component directly after receiving an event, as its view of the world may not yet (or ever) be exactly consistent with yours.

To make your code more testable and understandable, you should try to structure it following the actor model: you have some local state over which you have authority, but your only way to interact with state elsewhere in the program is to receive and process events coming from elsewhere, or to emit events of your own.

Expected subscriber behavior

Subscribers are expected to promptly receive their events on Subscriber.Events. The bus has a small, fixed amount of internal buffering, meaning that a slow subscriber will eventually cause backpressure and block publication of all further events.

In general, you should receive from your subscriber(s) in a loop, and only do fast state updates within that loop. Any heavier work should be offloaded to another goroutine.

Causing publishers to block from backpressure is considered a bug in the slow subscriber causing the backpressure, and should be addressed there. Publishers should assume that Publish will not block for extended periods of time, and should not make exceptional effort to behave gracefully if they do get blocked.

These blocking semantics are provisional and subject to change. Please speak up if this causes development pain, so that we can adapt the semantics to better suit our needs.

Debugging facilities

The Debugger, obtained through Bus.Debugger, provides introspection facilities to monitor events flowing through the bus, and inspect publisher and subscriber state.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is an event bus that distributes published events to interested subscribers.

func New

func New() *Bus

New returns a new bus. Use [PublisherOf] to make event publishers, and [Bus.Queue] and Subscribe to make event subscribers.

func (*Bus) Client

func (b *Bus) Client(name string) *Client

Client returns a new client with no subscriptions. Use Subscribe to receive events, and Publish to emit events.

The client's name is used only for debugging, to tell humans what piece of code a publisher/subscriber belongs to. Aim for something short but unique, for example "kernel-route-monitor" or "taildrop", not "watcher".

func (*Bus) Close

func (b *Bus) Close()

Close closes the bus. Implicitly closes all clients, publishers and subscribers attached to the bus.

Close blocks until the bus is fully shut down. The bus is permanently unusable after closing.

func (*Bus) Debugger

func (b *Bus) Debugger() *Debugger

Debugger returns the debugging facility for the bus.

type Client

type Client struct {
	// contains filtered or unexported fields
}

A Client can publish and subscribe to events on its attached bus. See Publish to publish events, and Subscribe to receive events.

Subscribers that share the same client receive events one at a time, in the order they were published.

func (*Client) Close

func (c *Client) Close()

Close closes the client. Implicitly closes all publishers and subscribers obtained from this client.

func (*Client) Name

func (c *Client) Name() string

type Debugger

type Debugger struct {
	// contains filtered or unexported fields
}

A Debugger offers access to a bus's privileged introspection and debugging facilities.

The debugger's functionality is intended for humans and their tools to examine and troubleshoot bus clients, and should not be used in normal codepaths.

In particular, the debugger provides access to information that is deliberately withheld from bus clients to encourage more robust and maintainable code - for example, the sender of an event, or the event streams of other clients. Please don't use the debugger to circumvent these restrictions for purposes other than debugging.

func (*Debugger) Clients

func (d *Debugger) Clients() []*Client

Clients returns a list of all clients attached to the bus.

func (*Debugger) PublishQueue

func (d *Debugger) PublishQueue() []PublishedEvent

PublishQueue returns the contents of the publish queue.

The publish queue contains events that have been accepted by the bus from Publish() calls, but have not yet been routed to relevant subscribers.

This queue is expected to be almost empty in normal operation. A full publish queue indicates that a slow subscriber downstream is causing backpressure and stalling the bus.

func (*Debugger) PublishTypes

func (d *Debugger) PublishTypes(client *Client) []reflect.Type

PublishTypes returns the list of types being published by client.

The returned types are those for which the client has obtained a Publisher. The client may not have ever sent the type in question.

func (*Debugger) RegisterHTTP

func (d *Debugger) RegisterHTTP(td *tsweb.DebugHandler)

func (*Debugger) SubscribeQueue

func (d *Debugger) SubscribeQueue(client *Client) []DeliveredEvent

SubscribeQueue returns the contents of the given client's subscribe queue.

The subscribe queue contains events that are to be delivered to the client, but haven't yet been handed off to the relevant Subscriber.

This queue is expected to be almost empty in normal operation. A full subscribe queue indicates that the client is accepting events too slowly, and may be causing the rest of the bus to stall.

func (*Debugger) SubscribeTypes

func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type

SubscribeTypes returns the list of types being subscribed to by client.

The returned types are those for which the client has obtained a Subscriber. The client may not have ever received the type in question, and here may not be any publishers of the type.

func (*Debugger) WatchBus

func (d *Debugger) WatchBus() *Subscriber[RoutedEvent]

WatchBus streams information about all events passing through the bus.

Monitored events are delivered in the bus's global publication order (see "Concurrency properties" in the package docs).

The caller must consume monitoring events promptly to avoid stalling the bus (see "Expected subscriber behavior" in the package docs).

func (*Debugger) WatchPublish

func (d *Debugger) WatchPublish(client *Client) *Subscriber[PublishedEvent]

WatchPublish streams information about all events published by the given client.

Monitored events are delivered in the bus's global publication order (see "Concurrency properties" in the package docs).

The caller must consume monitoring events promptly to avoid stalling the bus (see "Expected subscriber behavior" in the package docs).

func (*Debugger) WatchSubscribe

func (d *Debugger) WatchSubscribe(client *Client) *Subscriber[DeliveredEvent]

WatchSubscribe streams information about all events received by the given client.

Monitored events are delivered in the bus's global publication order (see "Concurrency properties" in the package docs).

The caller must consume monitoring events promptly to avoid stalling the bus (see "Expected subscriber behavior" in the package docs).

type DeliveredEvent

type DeliveredEvent struct {
	Event any
	From  *Client
	To    *Client
}

type PublishedEvent

type PublishedEvent struct {
	Event any
	From  *Client
}

type Publisher

type Publisher[T any] struct {
	// contains filtered or unexported fields
}

A Publisher publishes typed events on a bus.

func Publish

func Publish[T any](c *Client) *Publisher[T]

Publisher returns a publisher for event type T using the given client.

func (*Publisher[T]) Close

func (p *Publisher[T]) Close()

Close closes the publisher.

Calls to Publish after Close silently do nothing.

func (*Publisher[T]) Publish

func (p *Publisher[T]) Publish(v T)

Publish publishes event v on the bus.

func (*Publisher[T]) ShouldPublish

func (p *Publisher[T]) ShouldPublish() bool

ShouldPublish reports whether anyone is subscribed to the events that this publisher emits.

ShouldPublish can be used to skip expensive event construction if nobody seems to care. Publishers must not assume that someone will definitely receive an event if ShouldPublish returns true.

type RoutedEvent

type RoutedEvent struct {
	Event any
	From  *Client
	To    []*Client
}

type Subscriber

type Subscriber[T any] struct {
	// contains filtered or unexported fields
}

A Subscriber delivers one type of event from a Client.

func Subscribe

func Subscribe[T any](c *Client) *Subscriber[T]

Subscribe requests delivery of events of type T through the given Queue. Panics if the queue already has a subscriber for T.

func (*Subscriber[T]) Close

func (s *Subscriber[T]) Close()

Close closes the Subscriber, indicating the caller no longer wishes to receive this event type. After Close, receives on Subscriber.Events block for ever.

func (*Subscriber[T]) Done

func (s *Subscriber[T]) Done() <-chan struct{}

Done returns a channel that is closed when the subscriber is closed.

func (*Subscriber[T]) Events

func (s *Subscriber[T]) Events() <-chan T

Events returns a channel on which the subscriber's events are delivered.

Directories

Path Synopsis
debug-demo is a program that serves a bus's debug interface over HTTP, then generates some fake traffic from a handful of clients.
debug-demo is a program that serves a bus's debug interface over HTTP, then generates some fake traffic from a handful of clients.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL