rxd

package module
v1.4.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 23, 2025 License: Apache-2.0 Imports: 18 Imported by: 0

README

RxDaemon (RxD)

codecov

A simple (alpha) reactive services daemon

NOTE: RxD has been through another refactor to make it more flexible with the introduction of injectable "Service Managers" which are effectively supervisors of a given service. Intracom (pub/sub package) has become a subpackage of rxd.

RxD leverages Intracom for internal comms which allows for the ability of your individual RxD services to subscribe interest to the lifecycle states of other RxD services running alongside each other. This means each service can be notified independently of what state another service is in. You can ultimately have any service "watch and react" to a state change of another service.

A good example to imagine here would be something like ServiceA that has subscribed interest in another service, ServiceB, where ServiceB happens to be a health check service. It might maintain a live TCP connection to an external service, run interval queries against a database engine, or health check a docker container, it doesnt really matter. The goal here is to NOT have alot of services individually doing their own health checks against the same resource because the more services you have the more checks you are potentially doing against the same resource which might be creating socket connections or doing file I/O not to mention the potential code duplication lines for each service to do their own check. Why not instead, write a service that can do the main logic of the check then signal using its own lifecycle states to anyone who is interested in that health check logic. RxD gives us that ability.

Note about service managers like Systemd

In rxd when setting the WithReportAlive option on the daemon, this effectively causes the daemon during Start to launch a routine to interact with the underlying system service manager to report in with alive checks. For now this is purely meant for Systemd. This does also require you to set the appropriate configurations in your .service file. You would be required by systemd to set the WatchdogSec property.

This WatchdogSec property should be less for rxd because this is the amount of time systemd will wait to hear from your running daemon. After this time if your service has not reported in, systemd will consider it frozen/hung and will make attempts to stop or restart it.

By default, systemd has notify turned off. So it is possible to just not set or set a zero-value for the UsingReportAlive which will disable rxds notifier, by default RxD leaves this disabled.

[Unit]
Description=My Notifying Service

[Service]
Type=notify # WatchdogSec is required if this is set.
ExecStart=/path/to/my-service
NotifyAccess=main
WatchdogSec=10s  # Service must send a watchdog notification every 10 seconds, required it Type=notify is set.

Documentation

Overview

Package rxd provides a reactive daemon framework for managing concurrent services. It allows you to define services, manage their lifecycles, and handle inter-service communication. The daemon can be configured with various options such as prestart pipelines, service managers, and logging.

The reactive part of the daemon lies in the callers ability to define custom service runners that can react to lifecycles of other services, handle state transitions, and communicate with the daemon. The daemon itself reacts to OS signals and is able to gracefully signal shutdown to all services, the caller's implementation must be context-aware from within their service runners to handle the lifecycle of their services properly.

Index

Constants

This section is empty.

Variables

View Source
var NoFilter = ServiceFilter{Mode: None, Names: map[string]struct{}{}}

NoFilter is a predefined ServiceFilter that does not filter any services.

Functions

func IsCancelled added in v1.4.5

func IsCancelled(err error) bool

IsCancelled checks if the error is a context cancellation error.

Types

type CommandHandler added in v1.2.0

type CommandHandler struct {
	// contains filtered or unexported fields
}

CommandHandler is a struct that implements the RPC service for handling commands.

func (CommandHandler) ChangeLogLevel added in v1.2.0

func (h CommandHandler) ChangeLogLevel(level log.Level, _ *error) error

ChangeLogLevel changes the log level for the service and internal logger.

type Daemon added in v1.2.0

type Daemon interface {
	AddServices(services ...Service) error
	AddService(service Service) error
	Start(ctx context.Context) error
}

Daemon is the interface for the reactive daemon. It allows you to add services, start the daemon, and manage the lifecycle of the services.

func NewDaemon

func NewDaemon(name string, options ...DaemonOption) Daemon

NewDaemon creates and return an instance of the reactive daemon NOTE: The service logger runs with a default stdout logger. This can be optionally changed by passing the WithServiceLogger option in NewDaemon The internal logger is disabled by default and can be enabled by passing the WithInternalLogger option in NewDaemon

type DaemonLog added in v1.2.0

type DaemonLog struct {
	Level   log.Level
	Message string
	Fields  []log.Field
}

DaemonLog represents a log entry for the daemon.

func (DaemonLog) String added in v1.2.0

func (l DaemonLog) String() string

type DaemonOption added in v1.2.0

type DaemonOption func(*daemon)

DaemonOption is a functional option type for configuring a Daemon.

func WithCustomPrestartPipeline added in v1.2.1

func WithCustomPrestartPipeline(prestart Pipeline) DaemonOption

WithCustomPrestartPipeline allows you to set a custom prestart pipeline for the daemon. This is useful if you have a prestart pipeline that does not fit the standard configuration

func WithInternalLogger added in v1.2.0

func WithInternalLogger(logger log.Logger) DaemonOption

WithInternalLogger sets a custom logger for the daemon to use for internal logging. by default, the daemon will use a noop logger since this logger is used for rxd internals.

func WithInternalLogging added in v1.2.0

func WithInternalLogging(filepath string, level log.Level) DaemonOption

WithInternalLogging enables the internal logger to write to the filepath using the provided log level.

func WithPrestart added in v1.2.1

func WithPrestart(conf PrestartConfig, stages ...Stage) DaemonOption

WithPrestart sets the prestart pipeline configration and stages for the daemon. The prestart pipeline will run before the daemon starts and can be used to perform any necessary setup or checks before the main daemon logic begins.

func WithRPC added in v1.2.0

func WithRPC(cfg RPCConfig) DaemonOption

WithRPC enables an RPC server to run alongside the daemon. The RPC server will be available at the provided address and port. Currently the RPC server only supports a single method to change log level. An RPC client is provided in the pkg/rxrpc package for external use.

func WithReportAlive added in v1.2.0

func WithReportAlive(timeoutSecs uint64) DaemonOption

WithReportAlive sets the interval in seconds for when the daemon should report that it is still alive to the service manager. If the value is set to 0, the daemon will not interact with the service manager.

func WithServiceLogger added in v1.2.0

func WithServiceLogger(logger log.Logger) DaemonOption

WithServiceLogger sets a custom logger for the daemon to pass along for all services to use.

func WithSignals added in v1.2.0

func WithSignals(signals ...os.Signal) DaemonOption

WithSignals sets the OS signals that the daemon should listen for. If no signals are provided, the daemon will listen for SIGINT and SIGTERM by default.

type DaemonService added in v1.2.0

type DaemonService struct {
	Name   string
	Runner ServiceRunner
}

DaemonService is a DTO passed to a service manager containing the Name of the service along with the the ServiceRunner implementation for the manager to use. Every service is wrapped in a Service Manager to handle the lifecycle of the service.

type ErrUninitialized added in v1.2.0

type ErrUninitialized struct {
	StructName string
	Method     string
}

ErrUninitialized is an error type that indicates a struct is uninitialized

func (ErrUninitialized) Error added in v1.2.0

func (e ErrUninitialized) Error() string

type Error

type Error string

Error is a simple string type that implements the error interface.

const (
	// ErrLifecycleDone indicates that the lifecycle has completed.
	ErrLifecycleDone Error = Error("lifecycle done")
	// ErrDaemonStarted indicates that the daemon has already been started.
	ErrDaemonStarted Error = Error("daemon has already been started")
	// ErrDuplicateServiceName indicates that a service with the same name already exists.
	ErrDuplicateServiceName Error = Error("duplicate service name found")
	// ErrNoServices indicates that there are no services to run.
	ErrNoServices Error = Error("no services to run")
	// ErrNoServiceName indicates that no service name was provided.
	ErrNoServiceName Error = Error("no service name provided")
	// ErrNilService indicates that a nil service was provided.
	ErrNilService Error = Error("nil service provided")
	// ErrDuplicateServicePolicy indicates that a service policy with the same name already exists.
	ErrDuplicateServicePolicy Error = Error("duplicate service policy found")
	// ErrAddingServiceOnceStarted indicates that a service cannot be added after the daemon has started.
	ErrAddingServiceOnceStarted Error = Error("cannot add a service once the daemon is started")
)

func (Error) Error added in v1.2.0

func (e Error) Error() string

Error implements the error interface for the Error type.

type FilterMode added in v1.2.0

type FilterMode uint8

FilterMode represents the mode of filtering services based on their names.

const (
	// None represents no filtering mode.
	// In this mode, all services are considered, and no names are filtered out.
	None FilterMode = iota
	// Include represents a filtering mode where only specified service names are included.
	// In this mode, only the services whose names are in the Names set will be included in operations.
	Include
	// Exclude represents a filtering mode where specified service names are excluded.
	// In this mode, all services are considered except those whose names are in the Names set.
	Exclude
)

type ManagerOption added in v1.2.0

type ManagerOption func(m *RunContinuousManager)

ManagerOption is a functional option type for configuring a RunContinuousManager.

func WithInitDelay added in v1.2.0

func WithInitDelay(delay time.Duration) ManagerOption

WithInitDelay allows setting a custom delay before the service is initialized. This is useful for services that need to wait for other dependencies to be ready.

func WithTransitionTimeouts added in v1.2.0

func WithTransitionTimeouts(t ManagerStateTimeouts) ManagerOption

WithTransitionTimeouts allows setting specific timeouts for state transitions.

type ManagerStateTimeouts added in v1.2.0

type ManagerStateTimeouts map[State]time.Duration

ManagerStateTimeouts is a map of State to time.Duration. This is used to define the timeouts for each state transition in a service manager.

type Pipeline added in v1.2.1

type Pipeline interface {
	Add(stage Stage)
	Run(ctx context.Context) <-chan DaemonLog
}

Pipeline is an interface that defines a sequence of stages to be executed in order. Each stage is a function that takes a context and returns an error.

func NewPrestartPipeline added in v1.2.1

func NewPrestartPipeline(conf PrestartConfig, stages ...Stage) Pipeline

NewPrestartPipeline creates a new prestart pipeline with the given configuration and stages. The pipeline will run the stages in order and can be configured to restart on error with a delay. If RestartOnError is true, the pipeline will restart from the beginning if an error occurs.

type PrestartConfig added in v1.2.1

type PrestartConfig struct {
	RestartOnError bool
	RestartDelay   time.Duration
}

PrestartConfig holds the configuration for a prestart pipeline.

type RPCConfig added in v1.2.0

type RPCConfig struct {
	Addr string
	Port uint16
}

RPCConfig holds the configuration for the RPC server. It includes the address and port on which the server will listen. The address can be an IP or a hostname, and the port is a uint16. The server will listen on the specified address and port for incoming RPC requests.

type RPCServer added in v1.2.0

type RPCServer struct {
	// contains filtered or unexported fields
}

RPCServer is a struct that represents an RPC server. It contains an HTTP server that listens for incoming RPC requests.

func NewRPCHandler added in v1.2.0

func NewRPCHandler(cfg RPCConfig) (*RPCServer, error)

NewRPCHandler creates a new RPC server with the given configuration. It initializes the HTTP server and registers the CommandHandler service. The CommandHandler service provides methods for changing log levels and sending commands to services.

func (*RPCServer) Start added in v1.2.0

func (s *RPCServer) Start() error

Start starts the RPC server and begins listening for incoming requests.

func (*RPCServer) Stop added in v1.2.0

func (s *RPCServer) Stop() error

Stop stops the RPC server by closing the underlying HTTP server.

type RunContinuousManager added in v1.2.0

type RunContinuousManager struct {
	DefaultDelay  time.Duration
	StartupDelay  time.Duration
	StateTimeouts ManagerStateTimeouts
}

RunContinuousManager is a service handler that does its best to run the service moving the service to the next desired state returned from each lifecycle The handle will override the state transition if the context is cancelled and force the service to Exit.

func NewDefaultManager added in v1.2.0

func NewDefaultManager(opts ...ManagerOption) RunContinuousManager

NewDefaultManager creates a new RunContinuousManager with the provided options. The StartupDelay is used to delay the first run of the service after the manager is started. The DefaultDelay is used for all subsequent state transitions if not overridden in the StateTimeouts map.

func (RunContinuousManager) Manage added in v1.2.0

func (m RunContinuousManager) Manage(sctx ServiceContext, ds DaemonService, updateC chan<- ServiceStateUpdate)

Manage will run the service continuously until the context is cancelled. It will start from the init state, then idle, run, stop and back to init. The only condition that will cause the service to exit is if the context is cancelled. It is up to the caller to decide how to track and handle waiting between any state transitions beyond the defaults. The default state timeouts are used if not overridden in the StateTimeouts map.

type RunUntilSuccessManager added in v1.2.1

type RunUntilSuccessManager struct {
	StartupDelay time.Duration
	DefaultDelay time.Duration
}

RunUntilSuccessManager represents a service manager that will run the service lifecycles until the service exits Run with nil error. It will retry the service lifecycles with a delay between attempts. The manager will start with a startup delay and then use the default delay for subsequent attempts. If the service stops, it will re-init

func NewRunUntilSuccessManager added in v1.2.1

func NewRunUntilSuccessManager(defaultDelay, startupDelay time.Duration) RunUntilSuccessManager

NewRunUntilSuccessManager creates a new RunUntilSuccessManager with the provided default and startup delays. The startup delay is used to delay the first run of the service after the manager is started. The default delay is used for subsequent runs after the service has stopped or exited.

func (RunUntilSuccessManager) Manage added in v1.2.1

func (m RunUntilSuccessManager) Manage(sctx ServiceContext, ds DaemonService, updateC chan<- ServiceStateUpdate)

Manage will run the service until it exits Run with a nil error. It will start with a startup delay, then run the service lifecycles in a loop. This manager is ideal for situation where you have a service that needs to succeed only a single time during its Run lifecycle. This manager usually indicates that the service isn't intended to run continuously for as long as the daemon is running, but rather it is expected to run until it succeeds and then exit. It can be use to perform one-off tasks or used to signal to other rxd services that it has completed its task before they move to their next state(s).

type Service

type Service struct {
	Name    string
	Runner  ServiceRunner
	Manager ServiceManager
}

Service acts as a DTO that carries the Name of the service along with the ServiceRunner implementation and an optional ServiceManager. This is normally passed to the Daemon via AddService or AddServices methods.

func NewService

func NewService(name string, runner ServiceRunner, opts ...ServiceOption) Service

NewService creates a new Service DTO with the provided name, runner and options. This DTO is used to register a service with the Daemon via AddService or AddServices methods.

type ServiceAction added in v1.2.0

type ServiceAction uint8

ServiceAction represents different actions that can be watched for in a service's lifecycle. It is used by the ServiceWatcher to determine how to filter what to watch when inspecting service states.

const (
	// Entering indicates the service is entering a state.
	Entering ServiceAction = iota // 0
	// Exited idicates the service has exited a state.
	Exited // 1
	// Changing indicates the service has changed its state.
	// This would trigger at least twice per state: exited previous state and then entered new state.
	Changing
	// NotIn used to include all states except a given state.
	// This would also trigger at least twice per state: exited previous state and then entered new state.
	// But it would not trigger for the state that is being excluded.
	NotIn // 6
)

func (ServiceAction) String added in v1.2.0

func (s ServiceAction) String() string

type ServiceContext

type ServiceContext interface {
	context.Context
	ServiceWatcher
	ServiceLogger
	// WithParent replaces the original context with the provided parent context.
	// It returns the original ServiceContext with the new context and a cancel function.
	// This allows for detaching the service context from its parent and allowing the caller to
	// cancel the context at an earlier time if needed.
	WithParent(ctx context.Context) (ServiceContext, context.CancelFunc)
	// WithName allows the caller to create a new CHILD service context with a specific name.
	// This is useful for creating child contexts to be used for services launches additional
	// worker goroutines where the parent service is managing its own lifecycles of those workers.
	// The returned ServiceContext is composed from the original parent context, therefore it has
	// if the parent is cancelled, the child context will also be cancelled when the daemon is shutting down.
	WithName(name string) (ServiceContext, context.CancelFunc)
	// IntracomRegistry returns the embedded intracom registry used by the daemon for all services.
	// The caller can use this to register new topics, create subscriptions, and publish messages
	// between services.
	// NOTE: Avoid using topics with an _rxd prefix as these are reserved for internal use by the daemon.
	// overlapping this topic name may cause unexpected behavior with internal states tracking by the daemon.
	IntracomRegistry() *intracom.Intracom
	// Logger returns the service logger used to log messages related to the service.
	// This logger should be prefixed with its original service=<service_name> field.
	Logger() log.Logger
}

ServiceContext is an interface that combines context.Context, ServiceWatcher, and ServiceLogger. It provides a way to manage service-specific contexts with logging and state watching capabilities. It allows for creating child contexts with specific names and fields, and provides methods to log messages

type ServiceFilter added in v1.2.0

type ServiceFilter struct {
	Mode  FilterMode
	Names map[string]struct{}
}

ServiceFilter is used to filter services based on their names. It can be used to include or exclude services from operations based on their names. The Mode field determines whether the names are included or excluded. The Names field is a set of service names that are either included or excluded based on the Mode. This is useful for operations that need to target specific services or avoid certain services.

func NewServiceFilter added in v1.2.0

func NewServiceFilter(mode FilterMode, names ...string) ServiceFilter

NewServiceFilter creates a new ServiceFilter with the specified mode and names. The names are stored in a set for efficient lookup.

type ServiceLogger added in v1.2.0

type ServiceLogger interface {
	// Log logs a message with the specified level, message, and any additional fields.
	Log(level log.Level, message string, extra ...log.Field)
}

ServiceLogger is an interface for logging messages within a service context. It allows logging at different levels with additional fields for context. This interface is used by the ServiceContext to log messages related to service operations. It provides a way to log messages with a specific level and additional fields for context.

type ServiceManager added in v1.2.0

type ServiceManager interface {
	Manage(ctx ServiceContext, dService DaemonService, updateC chan<- ServiceStateUpdate)
}

ServiceManager interface defines the behavior of a service manager. It is responsible for managing the lifecycle of a service, including transitioning between different states such as Init, Idle, Run, Stop, and Exit.

type ServiceOption

type ServiceOption func(*Service)

ServiceOption is a functional option type for configuring a Service.

func WithManager added in v1.2.0

func WithManager(manager ServiceManager) ServiceOption

WithManager allows overriding the default ServiceManager for a Service.

type ServiceRunner added in v1.2.0

type ServiceRunner interface {
	Init(ServiceContext) error
	Idle(ServiceContext) error
	Run(ServiceContext) error
	Stop(ServiceContext) error
}

ServiceRunner is an interface that defines the lifecycle methods for a service.

type ServiceStateUpdate added in v1.4.0

type ServiceStateUpdate struct {
	Name       string          // e.g. "service1"
	State      State           // e.g. StateRun
	Transition StateTransition // e.g. TransitionEntering
}

ServiceStateUpdate is used to signal a state change in a service. Used by the service manager to notify the daemon states watcher of state changes happening with the service it is managing.

type ServiceStates added in v1.2.0

type ServiceStates map[string]StateUpdate

ServiceStates is a map of service names to their most recent state update. This is used by the states watcher to build a map of all services and their states.

type ServiceWatcher added in v1.2.0

type ServiceWatcher interface {
	// WatchAllStates allows the caller to watch all service states and filter them based on the provided filter.
	// It returns a channel that will signal every time there is a change in the states of the services.
	// The channel will send out the current states of all services that match the filter criteria.
	// If no filter is provided, it will send out all the states of all services for every single change.
	// The filter can be used to include or exclude specific services based on their names.
	// This is useful for scenarios where you want to monitor the states of all services from another reporting-like service.
	//
	// NOTE: The 2nd return value is a cancel function that can be used to stop watching the services.
	// The channel will stay open until this cancel function is called or the parent context is done.
	// The caller should always call the cancel function to avoid leaking resources.
	// The channel will send out the states of all services that match the filter criteria.
	WatchAllStates(ServiceFilter) (<-chan ServiceStates, context.CancelFunc)

	// WatchAnyServices allows the caller to specify a list of services by unique name and their target state and action.
	// It returns a channel that will signal when ANY of the services given match the action and target state.
	// The moment a single service reaches the desired state, the channel will send out the states of those services as a signal.
	// This is useful for scenarios where you want to be notified when any group of services reaches a specific state.
	//
	// NOTE: The 2nd return value is a cancel function that can be used to stop watching the services.
	// The channel will stay open until this cancel function is called or the parent context is done.
	// The caller should always call the cancel function to avoid leaking resources.
	// This is useful for scenarios where you want to be notified when any of the services reaches a specific state.
	WatchAnyServices(action ServiceAction, target State, services ...string) (<-chan ServiceStates, context.CancelFunc)

	// WatchAllServices allows the caller to specify a list of services by unique name and their target state and action.
	// It returns a channel that will only signal when ALL the services given match the action and target state.
	// Only once all the services reach the desired state, the channel will send out the states of those services as a signal.
	// This is useful for scenarios where you want to wait for multiple services to reach a specific state before proceeding.
	//
	// NOTE: The 2nd return value is a cancel function that can be used to stop watching the services.
	// The channel will stay open until this cancel function is called or the parent context is done.
	// The caller should always call the cancel function to avoid leaking resources.
	WatchAllServices(action ServiceAction, target State, services ...string) (<-chan ServiceStates, context.CancelFunc)
}

ServiceWatcher is an interface for watching service states.

type Stage added in v1.2.1

type Stage struct {
	Name string
	Func StageFunc
}

Stage represents a single stage in the pipeline. It has a name for identification and a function to execute.

type StageFunc added in v1.2.1

type StageFunc func(ctx context.Context) error

StageFunc is a function type that represents a stage in the pipeline. It takes a context and returns an error if the stage fails.

type State

type State uint8

State respresents the given state of a service.

const (
	// StateUnknown is the default state of a service when it is not initialized or has no state.
	StateUnknown State = iota // 0
	// StateExit is the state when a service is exiting or has exited.
	StateExit // 1
	// StateInit is the state when a service is being initialized.
	StateInit // 2
	// StateIdle is the state when a service is idle and ready to run.
	StateIdle // 3
	// StateRun is the state when a service is running.
	StateRun // 4
	// StateStop is the state when a service is stopping or has stopped.
	StateStop // 5
)

func (State) String added in v1.1.2

func (s State) String() string

type StateTransition added in v1.4.0

type StateTransition uint8

StateTransition represents the transition of a service from one state to another.

const (
	// TransitionUnknown is the default transition when a service has no state or is not initialized.
	TransitionUnknown StateTransition = iota // 0
	// TransitionEntering is the transition when a service is entering a new state.
	TransitionEntering // 1
	// TransitionExited is the transition when a service has exited a state.
	TransitionExited // 2
)

type StateUpdate added in v1.0.0

type StateUpdate struct {
	State      State           // e.g. StateRun
	Transition StateTransition // e.g. TransitionEntering
}

StateUpdate is a DTO used by the states watcher to build a states map of all services.

Directories

Path Synopsis
examples
multi_service
For this example we will create two services that will run until they are stopped.
For this example we will create two services that will run until they are stopped.
single_service
For this example we will create a simple API service that will run until it is stopped either via the context timeout (30s) or an OS signal (SIGINT, SIGTERM).
For this example we will create a simple API service that will run until it is stopped either via the context timeout (30s) or an OS signal (SIGINT, SIGTERM).
Package intracom provides a lightweight way to create topics and subscriptions used for communication between different parts of an application running concurrently.
Package intracom provides a lightweight way to create topics and subscriptions used for communication between different parts of an application running concurrently.
log
Package log provides a simple logging interface with different levels of logging.
Package log provides a simple logging interface with different levels of logging.
journald
Package journald provides a log handler that writes logs to journald.
Package journald provides a log handler that writes logs to journald.
pkg
rpc
Package rpc provides a client for making remote procedure calls (RPC) to a server.
Package rpc provides a client for making remote procedure calls (RPC) to a server.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL