gubgub

package module
v0.0.0-...-80e284c Latest
Published: Sep 23, 2024 License: MIT Imports: 4 Imported by: 0

README

GubGub

Yet another in-memory Go PubSub library. I started developing what is now GubGub for one of my personal projects, but I soon found myself using it in other, completely unrelated projects, so I thought it would be a nice thing to share.

Getting started

go get -u gitlab.com/naterciom/gubgub

Example

We'll be ignoring errors for code brevity!

package main

import (
	"fmt"
	"time"

	"gitlab.com/naterciom/gubgub"
)

type MyMessage struct {
	Name string
}

func consumer(msg MyMessage) {
	fmt.Printf("Hello %s\n", msg.Name)
}

func main() {
	topic := gubgub.NewAsyncTopic[MyMessage]()
	defer topic.Close() // Returns after all messages are delivered

	_ = topic.Subscribe(gubgub.Forever(consumer))

	// The AsyncTopic doesn't wait for the subscriber to be registered so, for the purposes of this
	// example, we sleep on it.
	time.Sleep(time.Millisecond)

	_ = topic.Publish(MyMessage{Name: "John Smith"}) // Returns immediately
}

Topics

Topics are what this is all about. You publish to a topic and you subscribe to a topic. That is it.

A Subscriber is just a callback func. A message is considered delivered when all subscribers have been called for that message and returned.

If Publish succeeds (returns no error), the message is guaranteed to be delivered before any call to Close returns.

Topics are meant to live as long as the application, but you should call the Close method on shutdown to fulfill this publishing promise. Use the WithOnClose option when creating the topic to perform any extra cleanup you need when the topic is closed.

GubGub offers 2 kinds of topics:

  • SyncTopic - Publishing blocks until the message has been delivered to all subscribers. Subscribing blocks until the subscriber is registered.

  • AsyncTopic - Publishing schedules the message to be eventually delivered. Subscribing schedules a subscriber to be eventually registered. Only message delivery is guaranteed.

The type of topic does not determine how messages are actually delivered. Currently, messages are delivered sequentially (each subscriber gets the message one after the other).

Benchmarks

  • SyncTopic - The speed and number of subscribers have a direct impact on publishing performance. Under the right conditions (few and fast subscribers) this is the most performant topic.

  • AsyncTopic - The speed and number of subscribers do not directly impact publishing performance, at the cost of some publishing overhead. This is generally the most scalable topic.

The following benchmarks are only meant to compare the topics, showing how the number of subscribers and their speed affect publishing performance:

BenchmarkAsyncTopic_Publish/10_NoOp_Subscribers-8         	 2047338	       498.7 ns/op
BenchmarkAsyncTopic_Publish/100_NoOp_Subscribers-8        	 3317646	       535.0 ns/op
BenchmarkAsyncTopic_Publish/1K_NoOp_Subscribers-8         	 3239110	       578.9 ns/op
BenchmarkAsyncTopic_Publish/10K_NoOp_Subscribers-8        	 1871702	       691.2 ns/op
BenchmarkAsyncTopic_Publish/10_Slow_Subscribers-8         	 2615269	       433.4 ns/op
BenchmarkAsyncTopic_Publish/20_Slow_Subscribers-8         	 3127874	       470.4 ns/op
BenchmarkSyncTopic_Publish/10_NoOp_Subscribers-8          	24740354	        59.69 ns/op
BenchmarkSyncTopic_Publish/100_NoOp_Subscribers-8         	 4135681	       488.9 ns/op
BenchmarkSyncTopic_Publish/1K_NoOp_Subscribers-8          	  474122	      4320 ns/op
BenchmarkSyncTopic_Publish/10K_NoOp_Subscribers-8         	   45790	     35583 ns/op
BenchmarkSyncTopic_Publish/10_Slow_Subscribers-8          	  357253	      3393 ns/op
BenchmarkSyncTopic_Publish/20_Slow_Subscribers-8          	  179725	      6688 ns/op

Documentation

Constants

This section is empty.

Variables

var ErrTopicClosed = fmt.Errorf("topic is closed")
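
Since ErrTopicClosed is an exported sentinel, callers can presumably match it with errors.Is. The sketch below assumes that Publish returns this error once the topic is closed, which the documentation implies but does not state. closedTopic and tryPublish are hypothetical names, and errTopicClosed stands in for gubgub.ErrTopicClosed so the example runs without the module.

```go
package main

import (
	"errors"
	"fmt"
)

// errTopicClosed stands in for gubgub.ErrTopicClosed.
var errTopicClosed = fmt.Errorf("topic is closed")

// Publishable mirrors the documented interface.
type Publishable[T any] interface {
	Publish(msg T) error
}

// closedTopic is a hypothetical stub that behaves like an already closed topic.
type closedTopic[T any] struct{}

func (closedTopic[T]) Publish(T) error { return errTopicClosed }

// tryPublish reports whether msg was accepted. A closed topic is treated as
// an expected shutdown condition; any other error is returned to the caller.
func tryPublish[T any](p Publishable[T], msg T) (bool, error) {
	err := p.Publish(msg)
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, errTopicClosed):
		return false, nil
	default:
		return false, err
	}
}

func main() {
	ok, err := tryPublish[string](closedTopic[string]{}, "late message")
	fmt.Println(ok, err) // false <nil>
}
```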

Functions

func Feed

func Feed[T any](t Subscribable[T], buffered bool) (iter.Seq[T], error)

Feed lets you consume future published messages with a for/range loop. The supporting subscriber will eventually be discarded after you exit the loop.

Types

type AsyncTopic

type AsyncTopic[T any] struct {
	// contains filtered or unexported fields
}

AsyncTopic allows any message T to be broadcast to subscribers. Publishing and subscribing both happen asynchronously (as non-blocking as possible). Closing the topic guarantees that published messages will be delivered and that no further messages or subscribers will be accepted. Delivery order is NOT guaranteed.

func NewAsyncTopic

func NewAsyncTopic[T any](opts ...TopicOption) *AsyncTopic[T]

NewAsyncTopic creates an AsyncTopic.

func (*AsyncTopic[T]) Close

func (t *AsyncTopic[T]) Close()

Close terminates background goroutines and prevents further publishing and subscribing. All published messages are guaranteed to have been delivered once Close returns. Close is idempotent.

func (*AsyncTopic[T]) Publish

func (t *AsyncTopic[T]) Publish(msg T) error

Publish broadcasts a msg to all subscribers asynchronously.

func (*AsyncTopic[T]) Subscribe

func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error

Subscribe registers a Subscriber func asynchronously.

type Publishable

type Publishable[T any] interface {
	Publish(msg T) error
}

type Subscribable

type Subscribable[T any] interface {
	Subscribe(Subscriber[T]) error
}

type Subscriber

type Subscriber[T any] func(T) bool

Subscriber is a func that processes a message and returns true if it should continue processing more messages.

func Buffered

func Buffered[T any](subscriber Subscriber[T]) Subscriber[T]

Buffered returns a subscriber that buffers messages if they can't be delivered immediately. There is no artificial limit on how many items can be buffered; the buffer is bounded only by available memory. This is useful if message publishing is surge-prone and message processing is slow or unpredictable (for example, the subscriber makes a network request). IMPORTANT: messages are considered delivered even if they are still in the buffer, which means that buffered subscribers are NOT COVERED by the publishing promise. The average message processing rate must still be higher than the average publishing rate, otherwise the buffer will eventually cause memory issues. If that is your scenario, you will need a different strategy.

func Forever

func Forever[T any](fn func(T)) Subscriber[T]

Forever wraps a func in a subscriber that never stops consuming messages. This saves you from writing subscribers that always return true.

func NoOp

func NoOp[T any]() Subscriber[T]

NoOp creates a subscriber that does absolutely nothing forever. This is mostly useful for testing.

func Once

func Once[T any](fn func(T)) Subscriber[T]

Once wraps a func in a subscriber that consumes only one message. This saves you from writing subscribers that always return false.
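
Given the documented Subscriber signature, wrappers like Forever and Once can be written in a few lines. The versions below are a sketch under that assumption, with hypothetical names (foreverSketch, onceSketch); the package's actual implementations may differ.

```go
package main

import "fmt"

// Subscriber mirrors the documented signature: return true to keep receiving.
type Subscriber[T any] func(T) bool

// foreverSketch adapts fn into a subscriber that always asks for more messages.
func foreverSketch[T any](fn func(T)) Subscriber[T] {
	return func(msg T) bool {
		fn(msg)
		return true
	}
}

// onceSketch adapts fn into a subscriber that stops after its first message.
func onceSketch[T any](fn func(T)) Subscriber[T] {
	return func(msg T) bool {
		fn(msg)
		return false
	}
}

func main() {
	greet := func(name string) { fmt.Println("Hello", name) }
	fmt.Println(foreverSketch(greet)("Ann")) // prints "Hello Ann", then true
	fmt.Println(onceSketch(greet)("Bob"))    // prints "Hello Bob", then false
}
```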

type SyncTopic

type SyncTopic[T any] struct {
	// contains filtered or unexported fields
}

SyncTopic is the simplest and most naive topic. It allows any message T to be broadcast to subscribers. Publishing and subscribing happen synchronously (both calls block).

func NewSyncTopic

func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T]

NewSyncTopic creates a SyncTopic with the specified options.

func (*SyncTopic[T]) Close

func (t *SyncTopic[T]) Close()

Close will prevent further publishing and subscribing.

func (*SyncTopic[T]) Publish

func (t *SyncTopic[T]) Publish(msg T) error

Publish broadcasts a message to all subscribers.

func (*SyncTopic[T]) Subscribe

func (t *SyncTopic[T]) Subscribe(fn Subscriber[T]) error

Subscribe adds a Subscriber func that will consume future published messages.

type Topic

type Topic[T any] interface {
	Publishable[T]
	Subscribable[T]
}

Topic is just a convenience interface you can expect all topics to implement.

type TopicOption

type TopicOption func(*TopicOptions)

func WithOnClose

func WithOnClose(fn func()) TopicOption

func WithOnSubscribe

func WithOnSubscribe(fn func()) TopicOption

type TopicOptions

type TopicOptions struct {
	// contains filtered or unexported fields
}

TopicOptions holds common options for topics.
