Documentation
¶
Overview ¶
Package rxd provides a reactive daemon framework for managing concurrent services. It allows you to define services, manage their lifecycles, and handle inter-service communication. The daemon can be configured with various options such as prestart pipelines, service managers, and logging.
The reactive part of the daemon lies in the callers ability to define custom service runners that can react to lifecycles of other services, handle state transitions, and communicate with the daemon. The daemon itself reacts to OS signals and is able to gracefully signal shutdown to all services, the caller's implementation must be context-aware from within their service runners to handle the lifecycle of their services properly.
Index ¶
- Variables
- func IsCancelled(err error) bool
- type CommandHandler
- type Daemon
- type DaemonLog
- type DaemonOption
- func WithCustomPrestartPipeline(prestart Pipeline) DaemonOption
- func WithInternalLogger(logger log.Logger) DaemonOption
- func WithInternalLogging(filepath string, level log.Level) DaemonOption
- func WithPrestart(conf PrestartConfig, stages ...Stage) DaemonOption
- func WithRPC(cfg RPCConfig) DaemonOption
- func WithReportAlive(timeoutSecs uint64) DaemonOption
- func WithServiceLogger(logger log.Logger) DaemonOption
- func WithSignals(signals ...os.Signal) DaemonOption
- type DaemonService
- type ErrUninitialized
- type Error
- type FilterMode
- type ManagerOption
- type ManagerStateTimeouts
- type Pipeline
- type PrestartConfig
- type RPCConfig
- type RPCServer
- type RunContinuousManager
- type RunUntilSuccessManager
- type Service
- type ServiceAction
- type ServiceContext
- type ServiceFilter
- type ServiceLogger
- type ServiceManager
- type ServiceOption
- type ServiceRunner
- type ServiceStateUpdate
- type ServiceStates
- type ServiceWatcher
- type Stage
- type StageFunc
- type State
- type StateTransition
- type StateUpdate
Constants ¶
This section is empty.
Variables ¶
var NoFilter = ServiceFilter{Mode: None, Names: map[string]struct{}{}}
NoFilter is a predefined ServiceFilter that does not filter any services.
Functions ¶
func IsCancelled ¶ added in v1.4.5
IsCancelled checks if the error is a context cancellation error.
Types ¶
type CommandHandler ¶ added in v1.2.0
type CommandHandler struct {
// contains filtered or unexported fields
}
CommandHandler is a struct that implements the RPC service for handling commands.
func (CommandHandler) ChangeLogLevel ¶ added in v1.2.0
func (h CommandHandler) ChangeLogLevel(level log.Level, _ *error) error
ChangeLogLevel changes the log level for the service and internal logger.
type Daemon ¶ added in v1.2.0
type Daemon interface { AddServices(services ...Service) error AddService(service Service) error Start(ctx context.Context) error }
Daemon is the interface for the reactive daemon. It allows you to add services, start the daemon, and manage the lifecycle of the services.
func NewDaemon ¶
func NewDaemon(name string, options ...DaemonOption) Daemon
NewDaemon creates and return an instance of the reactive daemon NOTE: The service logger runs with a default stdout logger. This can be optionally changed by passing the WithServiceLogger option in NewDaemon The internal logger is disabled by default and can be enabled by passing the WithInternalLogger option in NewDaemon
type DaemonOption ¶ added in v1.2.0
type DaemonOption func(*daemon)
DaemonOption is a functional option type for configuring a Daemon.
func WithCustomPrestartPipeline ¶ added in v1.2.1
func WithCustomPrestartPipeline(prestart Pipeline) DaemonOption
WithCustomPrestartPipeline allows you to set a custom prestart pipeline for the daemon. This is useful if you have a prestart pipeline that does not fit the standard configuration
func WithInternalLogger ¶ added in v1.2.0
func WithInternalLogger(logger log.Logger) DaemonOption
WithInternalLogger sets a custom logger for the daemon to use for internal logging. by default, the daemon will use a noop logger since this logger is used for rxd internals.
func WithInternalLogging ¶ added in v1.2.0
func WithInternalLogging(filepath string, level log.Level) DaemonOption
WithInternalLogging enables the internal logger to write to the filepath using the provided log level.
func WithPrestart ¶ added in v1.2.1
func WithPrestart(conf PrestartConfig, stages ...Stage) DaemonOption
WithPrestart sets the prestart pipeline configration and stages for the daemon. The prestart pipeline will run before the daemon starts and can be used to perform any necessary setup or checks before the main daemon logic begins.
func WithRPC ¶ added in v1.2.0
func WithRPC(cfg RPCConfig) DaemonOption
WithRPC enables an RPC server to run alongside the daemon. The RPC server will be available at the provided address and port. Currently the RPC server only supports a single method to change log level. An RPC client is provided in the pkg/rxrpc package for external use.
func WithReportAlive ¶ added in v1.2.0
func WithReportAlive(timeoutSecs uint64) DaemonOption
WithReportAlive sets the interval in seconds for when the daemon should report that it is still alive to the service manager. If the value is set to 0, the daemon will not interact with the service manager.
func WithServiceLogger ¶ added in v1.2.0
func WithServiceLogger(logger log.Logger) DaemonOption
WithServiceLogger sets a custom logger for the daemon to pass along for all services to use.
func WithSignals ¶ added in v1.2.0
func WithSignals(signals ...os.Signal) DaemonOption
WithSignals sets the OS signals that the daemon should listen for. If no signals are provided, the daemon will listen for SIGINT and SIGTERM by default.
type DaemonService ¶ added in v1.2.0
type DaemonService struct { Name string Runner ServiceRunner }
DaemonService is a DTO passed to a service manager containing the Name of the service along with the the ServiceRunner implementation for the manager to use. Every service is wrapped in a Service Manager to handle the lifecycle of the service.
type ErrUninitialized ¶ added in v1.2.0
ErrUninitialized is an error type that indicates a struct is uninitialized
func (ErrUninitialized) Error ¶ added in v1.2.0
func (e ErrUninitialized) Error() string
type Error ¶
type Error string
Error is a simple string type that implements the error interface.
const ( // ErrLifecycleDone indicates that the lifecycle has completed. ErrLifecycleDone Error = Error("lifecycle done") // ErrDaemonStarted indicates that the daemon has already been started. ErrDaemonStarted Error = Error("daemon has already been started") // ErrDuplicateServiceName indicates that a service with the same name already exists. ErrDuplicateServiceName Error = Error("duplicate service name found") // ErrNoServices indicates that there are no services to run. ErrNoServices Error = Error("no services to run") // ErrNoServiceName indicates that no service name was provided. ErrNoServiceName Error = Error("no service name provided") // ErrNilService indicates that a nil service was provided. ErrNilService Error = Error("nil service provided") // ErrDuplicateServicePolicy indicates that a service policy with the same name already exists. ErrDuplicateServicePolicy Error = Error("duplicate service policy found") // ErrAddingServiceOnceStarted indicates that a service cannot be added after the daemon has started. ErrAddingServiceOnceStarted Error = Error("cannot add a service once the daemon is started") )
type FilterMode ¶ added in v1.2.0
type FilterMode uint8
FilterMode represents the mode of filtering services based on their names.
const ( // None represents no filtering mode. // In this mode, all services are considered, and no names are filtered out. None FilterMode = iota // Include represents a filtering mode where only specified service names are included. // In this mode, only the services whose names are in the Names set will be included in operations. Include // Exclude represents a filtering mode where specified service names are excluded. // In this mode, all services are considered except those whose names are in the Names set. Exclude )
type ManagerOption ¶ added in v1.2.0
type ManagerOption func(m *RunContinuousManager)
ManagerOption is a functional option type for configuring a RunContinuousManager.
func WithInitDelay ¶ added in v1.2.0
func WithInitDelay(delay time.Duration) ManagerOption
WithInitDelay allows setting a custom delay before the service is initialized. This is useful for services that need to wait for other dependencies to be ready.
func WithTransitionTimeouts ¶ added in v1.2.0
func WithTransitionTimeouts(t ManagerStateTimeouts) ManagerOption
WithTransitionTimeouts allows setting specific timeouts for state transitions.
type ManagerStateTimeouts ¶ added in v1.2.0
ManagerStateTimeouts is a map of State to time.Duration. This is used to define the timeouts for each state transition in a service manager.
type Pipeline ¶ added in v1.2.1
Pipeline is an interface that defines a sequence of stages to be executed in order. Each stage is a function that takes a context and returns an error.
func NewPrestartPipeline ¶ added in v1.2.1
func NewPrestartPipeline(conf PrestartConfig, stages ...Stage) Pipeline
NewPrestartPipeline creates a new prestart pipeline with the given configuration and stages. The pipeline will run the stages in order and can be configured to restart on error with a delay. If RestartOnError is true, the pipeline will restart from the beginning if an error occurs.
type PrestartConfig ¶ added in v1.2.1
PrestartConfig holds the configuration for a prestart pipeline.
type RPCConfig ¶ added in v1.2.0
RPCConfig holds the configuration for the RPC server. It includes the address and port on which the server will listen. The address can be an IP or a hostname, and the port is a uint16. The server will listen on the specified address and port for incoming RPC requests.
type RPCServer ¶ added in v1.2.0
type RPCServer struct {
// contains filtered or unexported fields
}
RPCServer is a struct that represents an RPC server. It contains an HTTP server that listens for incoming RPC requests.
func NewRPCHandler ¶ added in v1.2.0
NewRPCHandler creates a new RPC server with the given configuration. It initializes the HTTP server and registers the CommandHandler service. The CommandHandler service provides methods for changing log levels and sending commands to services.
type RunContinuousManager ¶ added in v1.2.0
type RunContinuousManager struct { DefaultDelay time.Duration StartupDelay time.Duration StateTimeouts ManagerStateTimeouts }
RunContinuousManager is a service handler that does its best to run the service moving the service to the next desired state returned from each lifecycle The handle will override the state transition if the context is cancelled and force the service to Exit.
func NewDefaultManager ¶ added in v1.2.0
func NewDefaultManager(opts ...ManagerOption) RunContinuousManager
NewDefaultManager creates a new RunContinuousManager with the provided options. The StartupDelay is used to delay the first run of the service after the manager is started. The DefaultDelay is used for all subsequent state transitions if not overridden in the StateTimeouts map.
func (RunContinuousManager) Manage ¶ added in v1.2.0
func (m RunContinuousManager) Manage(sctx ServiceContext, ds DaemonService, updateC chan<- ServiceStateUpdate)
Manage will run the service continuously until the context is cancelled. It will start from the init state, then idle, run, stop and back to init. The only condition that will cause the service to exit is if the context is cancelled. It is up to the caller to decide how to track and handle waiting between any state transitions beyond the defaults. The default state timeouts are used if not overridden in the StateTimeouts map.
type RunUntilSuccessManager ¶ added in v1.2.1
RunUntilSuccessManager represents a service manager that will run the service lifecycles until the service exits Run with nil error. It will retry the service lifecycles with a delay between attempts. The manager will start with a startup delay and then use the default delay for subsequent attempts. If the service stops, it will re-init
func NewRunUntilSuccessManager ¶ added in v1.2.1
func NewRunUntilSuccessManager(defaultDelay, startupDelay time.Duration) RunUntilSuccessManager
NewRunUntilSuccessManager creates a new RunUntilSuccessManager with the provided default and startup delays. The startup delay is used to delay the first run of the service after the manager is started. The default delay is used for subsequent runs after the service has stopped or exited.
func (RunUntilSuccessManager) Manage ¶ added in v1.2.1
func (m RunUntilSuccessManager) Manage(sctx ServiceContext, ds DaemonService, updateC chan<- ServiceStateUpdate)
Manage will run the service until it exits Run with a nil error. It will start with a startup delay, then run the service lifecycles in a loop. This manager is ideal for situation where you have a service that needs to succeed only a single time during its Run lifecycle. This manager usually indicates that the service isn't intended to run continuously for as long as the daemon is running, but rather it is expected to run until it succeeds and then exit. It can be use to perform one-off tasks or used to signal to other rxd services that it has completed its task before they move to their next state(s).
type Service ¶
type Service struct { Name string Runner ServiceRunner Manager ServiceManager }
Service acts as a DTO that carries the Name of the service along with the ServiceRunner implementation and an optional ServiceManager. This is normally passed to the Daemon via AddService or AddServices methods.
func NewService ¶
func NewService(name string, runner ServiceRunner, opts ...ServiceOption) Service
NewService creates a new Service DTO with the provided name, runner and options. This DTO is used to register a service with the Daemon via AddService or AddServices methods.
type ServiceAction ¶ added in v1.2.0
type ServiceAction uint8
ServiceAction represents different actions that can be watched for in a service's lifecycle. It is used by the ServiceWatcher to determine how to filter what to watch when inspecting service states.
const ( // Entering indicates the service is entering a state. Entering ServiceAction = iota // 0 // Exited idicates the service has exited a state. Exited // 1 // Changing indicates the service has changed its state. // This would trigger at least twice per state: exited previous state and then entered new state. Changing // NotIn used to include all states except a given state. // This would also trigger at least twice per state: exited previous state and then entered new state. // But it would not trigger for the state that is being excluded. NotIn // 6 )
func (ServiceAction) String ¶ added in v1.2.0
func (s ServiceAction) String() string
type ServiceContext ¶
type ServiceContext interface { context.Context ServiceWatcher ServiceLogger // WithParent replaces the original context with the provided parent context. // It returns the original ServiceContext with the new context and a cancel function. // This allows for detaching the service context from its parent and allowing the caller to // cancel the context at an earlier time if needed. WithParent(ctx context.Context) (ServiceContext, context.CancelFunc) // WithName allows the caller to create a new CHILD service context with a specific name. // This is useful for creating child contexts to be used for services launches additional // worker goroutines where the parent service is managing its own lifecycles of those workers. // The returned ServiceContext is composed from the original parent context, therefore it has // if the parent is cancelled, the child context will also be cancelled when the daemon is shutting down. WithName(name string) (ServiceContext, context.CancelFunc) // IntracomRegistry returns the embedded intracom registry used by the daemon for all services. // The caller can use this to register new topics, create subscriptions, and publish messages // between services. // NOTE: Avoid using topics with an _rxd prefix as these are reserved for internal use by the daemon. // overlapping this topic name may cause unexpected behavior with internal states tracking by the daemon. IntracomRegistry() *intracom.Intracom // Logger returns the service logger used to log messages related to the service. // This logger should be prefixed with its original service=<service_name> field. Logger() log.Logger }
ServiceContext is an interface that combines context.Context, ServiceWatcher, and ServiceLogger. It provides a way to manage service-specific contexts with logging and state watching capabilities. It allows for creating child contexts with specific names and fields, and provides methods to log messages
type ServiceFilter ¶ added in v1.2.0
type ServiceFilter struct { Mode FilterMode Names map[string]struct{} }
ServiceFilter is used to filter services based on their names. It can be used to include or exclude services from operations based on their names. The Mode field determines whether the names are included or excluded. The Names field is a set of service names that are either included or excluded based on the Mode. This is useful for operations that need to target specific services or avoid certain services.
func NewServiceFilter ¶ added in v1.2.0
func NewServiceFilter(mode FilterMode, names ...string) ServiceFilter
NewServiceFilter creates a new ServiceFilter with the specified mode and names. The names are stored in a set for efficient lookup.
type ServiceLogger ¶ added in v1.2.0
type ServiceLogger interface { // Log logs a message with the specified level, message, and any additional fields. Log(level log.Level, message string, extra ...log.Field) }
ServiceLogger is an interface for logging messages within a service context. It allows logging at different levels with additional fields for context. This interface is used by the ServiceContext to log messages related to service operations. It provides a way to log messages with a specific level and additional fields for context.
type ServiceManager ¶ added in v1.2.0
type ServiceManager interface {
Manage(ctx ServiceContext, dService DaemonService, updateC chan<- ServiceStateUpdate)
}
ServiceManager interface defines the behavior of a service manager. It is responsible for managing the lifecycle of a service, including transitioning between different states such as Init, Idle, Run, Stop, and Exit.
type ServiceOption ¶
type ServiceOption func(*Service)
ServiceOption is a functional option type for configuring a Service.
func WithManager ¶ added in v1.2.0
func WithManager(manager ServiceManager) ServiceOption
WithManager allows overriding the default ServiceManager for a Service.
type ServiceRunner ¶ added in v1.2.0
type ServiceRunner interface { Init(ServiceContext) error Idle(ServiceContext) error Run(ServiceContext) error Stop(ServiceContext) error }
ServiceRunner is an interface that defines the lifecycle methods for a service.
type ServiceStateUpdate ¶ added in v1.4.0
type ServiceStateUpdate struct { Name string // e.g. "service1" State State // e.g. StateRun Transition StateTransition // e.g. TransitionEntering }
ServiceStateUpdate is used to signal a state change in a service. Used by the service manager to notify the daemon states watcher of state changes happening with the service it is managing.
type ServiceStates ¶ added in v1.2.0
type ServiceStates map[string]StateUpdate
ServiceStates is a map of service names to their most recent state update. This is used by the states watcher to build a map of all services and their states.
type ServiceWatcher ¶ added in v1.2.0
type ServiceWatcher interface { // WatchAllStates allows the caller to watch all service states and filter them based on the provided filter. // It returns a channel that will signal every time there is a change in the states of the services. // The channel will send out the current states of all services that match the filter criteria. // If no filter is provided, it will send out all the states of all services for every single change. // The filter can be used to include or exclude specific services based on their names. // This is useful for scenarios where you want to monitor the states of all services from another reporting-like service. // // NOTE: The 2nd return value is a cancel function that can be used to stop watching the services. // The channel will stay open until this cancel function is called or the parent context is done. // The caller should always call the cancel function to avoid leaking resources. // The channel will send out the states of all services that match the filter criteria. WatchAllStates(ServiceFilter) (<-chan ServiceStates, context.CancelFunc) // WatchAnyServices allows the caller to specify a list of services by unique name and their target state and action. // It returns a channel that will signal when ANY of the services given match the action and target state. // The moment a single service reaches the desired state, the channel will send out the states of those services as a signal. // This is useful for scenarios where you want to be notified when any group of services reaches a specific state. // // NOTE: The 2nd return value is a cancel function that can be used to stop watching the services. // The channel will stay open until this cancel function is called or the parent context is done. // The caller should always call the cancel function to avoid leaking resources. // This is useful for scenarios where you want to be notified when any of the services reaches a specific state. WatchAnyServices(action ServiceAction, target State, services ...string) (<-chan ServiceStates, context.CancelFunc) // WatchAllServices allows the caller to specify a list of services by unique name and their target state and action. // It returns a channel that will only signal when ALL the services given match the action and target state. // Only once all the services reach the desired state, the channel will send out the states of those services as a signal. // This is useful for scenarios where you want to wait for multiple services to reach a specific state before proceeding. // // NOTE: The 2nd return value is a cancel function that can be used to stop watching the services. // The channel will stay open until this cancel function is called or the parent context is done. // The caller should always call the cancel function to avoid leaking resources. WatchAllServices(action ServiceAction, target State, services ...string) (<-chan ServiceStates, context.CancelFunc) }
ServiceWatcher is an interface for watching service states.
type Stage ¶ added in v1.2.1
Stage represents a single stage in the pipeline. It has a name for identification and a function to execute.
type StageFunc ¶ added in v1.2.1
StageFunc is a function type that represents a stage in the pipeline. It takes a context and returns an error if the stage fails.
type State ¶
type State uint8
State respresents the given state of a service.
const ( // StateUnknown is the default state of a service when it is not initialized or has no state. StateUnknown State = iota // 0 // StateExit is the state when a service is exiting or has exited. StateExit // 1 // StateInit is the state when a service is being initialized. StateInit // 2 // StateIdle is the state when a service is idle and ready to run. StateIdle // 3 // StateRun is the state when a service is running. StateRun // 4 // StateStop is the state when a service is stopping or has stopped. StateStop // 5 )
type StateTransition ¶ added in v1.4.0
type StateTransition uint8
StateTransition represents the transition of a service from one state to another.
const ( // TransitionUnknown is the default transition when a service has no state or is not initialized. TransitionUnknown StateTransition = iota // 0 // TransitionEntering is the transition when a service is entering a new state. TransitionEntering // 1 // TransitionExited is the transition when a service has exited a state. TransitionExited // 2 )
type StateUpdate ¶ added in v1.0.0
type StateUpdate struct { State State // e.g. StateRun Transition StateTransition // e.g. TransitionEntering }
StateUpdate is a DTO used by the states watcher to build a states map of all services.
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
examples
|
|
multi_service
For this example we will create two services that will run until they are stopped.
|
For this example we will create two services that will run until they are stopped. |
single_service
For this example we will create a simple API service that will run until it is stopped either via the context timeout (30s) or an OS signal (SIGINT, SIGTERM).
|
For this example we will create a simple API service that will run until it is stopped either via the context timeout (30s) or an OS signal (SIGINT, SIGTERM). |
Package intracom provides a lightweight way to create topics and subscriptions used for communication between different parts of an application running concurrently.
|
Package intracom provides a lightweight way to create topics and subscriptions used for communication between different parts of an application running concurrently. |
Package log provides a simple logging interface with different levels of logging.
|
Package log provides a simple logging interface with different levels of logging. |
journald
Package journald provides a log handler that writes logs to journald.
|
Package journald provides a log handler that writes logs to journald. |
pkg
|
|
rpc
Package rpc provides a client for making remote procedure calls (RPC) to a server.
|
Package rpc provides a client for making remote procedure calls (RPC) to a server. |