Documentation ¶
Overview ¶
Package simulator provides the core simulation engine and infrastructure for stochadex simulations. It includes the main simulation loop, state management, partition coordination, and execution control mechanisms.
Key Features:
- Partition-based simulation architecture
- Concurrent execution with goroutine coordination
- State history management and time tracking
- Configurable termination and output conditions
- Flexible timestep control
- Storage and persistence utilities
Architecture: The simulator uses a partition-based approach where simulations are divided into independent partitions that can be executed concurrently. Each partition maintains its own state history and can communicate with other partitions through defined interfaces.
Usage Patterns:
- Configure and run complex multi-partition simulations
- Manage simulation state across multiple timesteps
- Coordinate concurrent execution of simulation components
- Store and retrieve simulation results and intermediate states
- Implement custom termination and output conditions
Index ¶
- Variables
- func RunWithHarnesses(settings *Settings, implementations *Implementations) error
- type ConfigGenerator
- func (c *ConfigGenerator) GenerateConfigs() (*Settings, *Implementations)
- func (c *ConfigGenerator) GetGlobalSeed() uint64
- func (c *ConfigGenerator) GetPartition(name string) *PartitionConfig
- func (c *ConfigGenerator) GetSimulation() *SimulationConfig
- func (c *ConfigGenerator) ResetPartition(name string, config *PartitionConfig)
- func (c *ConfigGenerator) SetGlobalSeed(seed uint64)
- func (c *ConfigGenerator) SetPartition(config *PartitionConfig)
- func (c *ConfigGenerator) SetSimulation(config *SimulationConfig)
- type ConstantTimestepFunction
- type CumulativeTimestepsHistory
- type DownstreamStateValues
- type EveryNStepsOutputCondition
- type EveryStepOutputCondition
- type ExponentialDistributionTimestepFunction
- type Implementations
- type Iteration
- type IterationSettings
- type IterationTestHarness
- type IteratorInputMessage
- type JsonLogChannelOutputFunction
- type JsonLogEntry
- type JsonLogOutputFunction
- type NamedPartitionIndex
- type NamedUpstreamConfig
- type NilOutputCondition
- type NilOutputFunction
- type NumberOfStepsTerminationCondition
- type OnlyGivenPartitionsOutputCondition
- type OutputCondition
- type OutputFunction
- type Params
- func (p *Params) Get(name string) []float64
- func (p *Params) GetCopy(name string) []float64
- func (p *Params) GetCopyOk(name string) ([]float64, bool)
- func (p *Params) GetIndex(name string, index int) float64
- func (p *Params) GetOk(name string) ([]float64, bool)
- func (p *Params) Set(name string, values []float64)
- func (p *Params) SetIndex(name string, index int, value float64)
- func (p *Params) SetPartitionName(name string)
- type PartitionConfig
- type PartitionConfigOrdering
- type PartitionCoordinator
- type PartitionState
- func (*PartitionState) Descriptor() ([]byte, []int) deprecated
- func (x *PartitionState) GetCumulativeTimesteps() float64
- func (x *PartitionState) GetPartitionName() string
- func (x *PartitionState) GetState() []float64
- func (*PartitionState) ProtoMessage()
- func (x *PartitionState) ProtoReflect() protoreflect.Message
- func (x *PartitionState) Reset()
- func (x *PartitionState) String() string
- type Settings
- type SimulationConfig
- type SimulationConfigStrings
- type StateHistory
- type StateIterator
- type StateTimeStorage
- func (s *StateTimeStorage) ConcurrentAppend(name string, time float64, values []float64)
- func (s *StateTimeStorage) GetIndex(name string) int
- func (s *StateTimeStorage) GetNames() []string
- func (s *StateTimeStorage) GetTimes() []float64
- func (s *StateTimeStorage) GetValues(name string) [][]float64
- func (s *StateTimeStorage) SetTimes(times []float64)
- func (s *StateTimeStorage) SetValues(name string, values [][]float64)
- type StateTimeStorageOutputFunction
- type StateValueChannels
- type StdoutOutputFunction
- type TerminationCondition
- type TimeElapsedTerminationCondition
- type TimestepFunction
- type UpstreamConfig
- type UpstreamStateValues
- type WebsocketOutputFunction
Constants ¶
This section is empty.
Variables ¶
var File_cmd_messages_partition_state_proto protoreflect.FileDescriptor
Functions ¶
func RunWithHarnesses ¶
func RunWithHarnesses(settings *Settings, implementations *Implementations) error
RunWithHarnesses runs all iterations, each wrapped in a test harness, and returns an error if any check fails. The simulation is run twice to check for state left over from the first run.
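The idea behind running twice can be illustrated with a minimal sketch. It does not use the package: statefulStep, statelessStep, runOnce and repeatable are hypothetical names, and the checks the real harness performs are not documented here.

```go
package main

import "fmt"

// calls is package-level state that survives between runs.
var calls int

// statefulStep is a hypothetical update rule that leaks state through calls.
func statefulStep(x float64) float64 {
	calls++
	return x + float64(calls)
}

// statelessStep depends only on its input.
func statelessStep(x float64) float64 {
	return x + 1
}

// runOnce applies step n times starting from zero and records each value.
func runOnce(step func(float64) float64, n int) []float64 {
	out := make([]float64, 0, n)
	x := 0.0
	for i := 0; i < n; i++ {
		x = step(x)
		out = append(out, x)
	}
	return out
}

// repeatable runs the same simulation twice and reports whether both runs
// produced identical outputs.
func repeatable(step func(float64) float64, n int) bool {
	first := runOnce(step, n)
	second := runOnce(step, n)
	for i := range first {
		if first[i] != second[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(repeatable(statelessStep, 5)) // true
	fmt.Println(repeatable(statefulStep, 5))  // false
}
```

A second run that differs from the first suggests the iteration carries state that was not reset between runs.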
Types ¶
type ConfigGenerator ¶
type ConfigGenerator struct {
// contains filtered or unexported fields
}
ConfigGenerator builds Settings and Implementations programmatically and can generate runnable configs on demand.
func NewConfigGenerator ¶
func NewConfigGenerator() *ConfigGenerator
NewConfigGenerator creates a new ConfigGenerator with empty ordering.
func (*ConfigGenerator) GenerateConfigs ¶
func (c *ConfigGenerator) GenerateConfigs() (*Settings, *Implementations)
GenerateConfigs constructs Settings and Implementations ready to run. It computes state widths, converts named references, and configures iterations with their partition indices.
func (*ConfigGenerator) GetGlobalSeed ¶
func (c *ConfigGenerator) GetGlobalSeed() uint64
GetGlobalSeed returns the current global seed.
func (*ConfigGenerator) GetPartition ¶
func (c *ConfigGenerator) GetPartition(name string) *PartitionConfig
GetPartition retrieves a partition config by name.
func (*ConfigGenerator) GetSimulation ¶
func (c *ConfigGenerator) GetSimulation() *SimulationConfig
GetSimulation returns the current simulation config.
func (*ConfigGenerator) ResetPartition ¶
func (c *ConfigGenerator) ResetPartition(name string, config *PartitionConfig)
ResetPartition replaces the config for a partition by name.
func (*ConfigGenerator) SetGlobalSeed ¶
func (c *ConfigGenerator) SetGlobalSeed(seed uint64)
SetGlobalSeed assigns a random seed to each partition derived from the provided global seed.
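One way to derive per-partition seeds from a single global seed is sketched below. This is an assumption for illustration, not the package's documented algorithm: deriveSeeds is a hypothetical helper and the use of math/rand is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"math/rand"
)

// deriveSeeds is a hypothetical helper: it seeds one generator with the
// global seed and draws one uint64 seed per partition from it.
func deriveSeeds(globalSeed uint64, numPartitions int) []uint64 {
	rng := rand.New(rand.NewSource(int64(globalSeed)))
	seeds := make([]uint64, numPartitions)
	for i := range seeds {
		seeds[i] = rng.Uint64()
	}
	return seeds
}

func main() {
	a := deriveSeeds(42, 3)
	b := deriveSeeds(42, 3)
	// The same global seed reproduces the same per-partition seeds.
	fmt.Println(a[0] == b[0], a[1] == b[1], a[2] == b[2])
}
```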
func (*ConfigGenerator) SetPartition ¶
func (c *ConfigGenerator) SetPartition(config *PartitionConfig)
SetPartition adds a new partition config. Names must be unique.
func (*ConfigGenerator) SetSimulation ¶
func (c *ConfigGenerator) SetSimulation(config *SimulationConfig)
SetSimulation sets the current simulation config.
type ConstantTimestepFunction ¶
type ConstantTimestepFunction struct {
Stepsize float64
}
ConstantTimestepFunction uses a fixed stepsize.
func (*ConstantTimestepFunction) NextIncrement ¶
func (t *ConstantTimestepFunction) NextIncrement( timestepsHistory *CumulativeTimestepsHistory, ) float64
type CumulativeTimestepsHistory ¶
type CumulativeTimestepsHistory struct {
NextIncrement float64
Values *mat.VecDense
CurrentStepNumber int
StateHistoryDepth int
}
CumulativeTimestepsHistory is a rolling window of cumulative timesteps with NextIncrement and CurrentStepNumber.
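The rolling-window behaviour can be sketched with a plain slice standing in for *mat.VecDense. The type timeHistory and its advance method are hypothetical, and placing the most recent time at index 0 is an assumption based on the row-0 convention described for StateHistory.

```go
package main

import "fmt"

// timeHistory is a stand-in for CumulativeTimestepsHistory that uses a plain
// slice instead of *mat.VecDense. Index 0 holds the most recent time.
type timeHistory struct {
	NextIncrement     float64
	Values            []float64
	CurrentStepNumber int
}

// advance shifts every value one slot towards the back, dropping the oldest,
// then writes the new cumulative time into slot 0.
func (h *timeHistory) advance() {
	latest := h.Values[0]
	copy(h.Values[1:], h.Values[:len(h.Values)-1])
	h.Values[0] = latest + h.NextIncrement
	h.CurrentStepNumber++
}

func main() {
	h := &timeHistory{Values: []float64{0, 0, 0}}
	for _, dt := range []float64{1, 0.5, 2} {
		h.NextIncrement = dt
		h.advance()
	}
	fmt.Println(h.Values, h.CurrentStepNumber) // [3.5 1.5 1] 3
}
```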
type DownstreamStateValues ¶
DownstreamStateValues contains information to broadcast state values to downstream iterators via channel.
type EveryNStepsOutputCondition ¶
type EveryNStepsOutputCondition struct {
N int
// contains filtered or unexported fields
}
EveryNStepsOutputCondition emits output once every N steps.
func (*EveryNStepsOutputCondition) IsOutputStep ¶
func (c *EveryNStepsOutputCondition) IsOutputStep( partitionName string, state []float64, cumulativeTimesteps float64, ) bool
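A minimal sketch of an every-N-steps condition follows. The type everyN is a stand-in that mirrors the IsOutputStep signature; the per-partition counter is an assumption about what the unexported fields might hold, and outputSteps is a hypothetical helper.

```go
package main

import "fmt"

// everyN is a stand-in for EveryNStepsOutputCondition. The per-partition
// counter is an assumption about the unexported fields.
type everyN struct {
	N      int
	counts map[string]int
}

// IsOutputStep mirrors the OutputCondition method signature.
func (c *everyN) IsOutputStep(partitionName string, state []float64, cumulativeTimesteps float64) bool {
	if c.counts == nil {
		c.counts = make(map[string]int)
	}
	c.counts[partitionName]++
	if c.counts[partitionName] == c.N {
		c.counts[partitionName] = 0
		return true
	}
	return false
}

// outputSteps returns the 1-based step numbers on which the condition fires.
func outputSteps(c *everyN, steps int) []int {
	var fired []int
	for step := 1; step <= steps; step++ {
		if c.IsOutputStep("prices", nil, float64(step)) {
			fired = append(fired, step)
		}
	}
	return fired
}

func main() {
	fmt.Println(outputSteps(&everyN{N: 3}, 10)) // [3 6 9]
}
```

The map in this sketch is not protected by a lock, so it would need synchronisation if called from several goroutines at once.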
type EveryStepOutputCondition ¶
type EveryStepOutputCondition struct{}
EveryStepOutputCondition calls the OutputFunction at every step.
func (*EveryStepOutputCondition) IsOutputStep ¶
func (c *EveryStepOutputCondition) IsOutputStep( partitionName string, state []float64, cumulativeTimesteps float64, ) bool
type ExponentialDistributionTimestepFunction ¶
type ExponentialDistributionTimestepFunction struct {
Mean float64
Seed uint64
// contains filtered or unexported fields
}
ExponentialDistributionTimestepFunction draws dt from an exponential distribution with the given Mean, using a random number generator seeded with Seed.
func NewExponentialDistributionTimestepFunction ¶
func NewExponentialDistributionTimestepFunction( mean float64, seed uint64, ) *ExponentialDistributionTimestepFunction
NewExponentialDistributionTimestepFunction constructs an exponential-dt timestep function given mean and seed.
func (*ExponentialDistributionTimestepFunction) NextIncrement ¶
func (t *ExponentialDistributionTimestepFunction) NextIncrement( timestepsHistory *CumulativeTimestepsHistory, ) float64
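Drawing an exponentially distributed timestep can be sketched with the standard library. The type expTimestep and its methods are hypothetical stand-ins, and math/rand is chosen for this sketch only; the generator the package uses internally is not stated here.

```go
package main

import (
	"fmt"
	"math/rand"
)

// expTimestep is a stand-in for ExponentialDistributionTimestepFunction.
type expTimestep struct {
	Mean float64
	rng  *rand.Rand
}

// newExpTimestep seeds a private generator so that draws are reproducible.
func newExpTimestep(mean float64, seed uint64) *expTimestep {
	return &expTimestep{Mean: mean, rng: rand.New(rand.NewSource(int64(seed)))}
}

// next scales a unit-rate exponential draw so that its expected value is Mean.
func (t *expTimestep) next() float64 {
	return t.Mean * t.rng.ExpFloat64()
}

func main() {
	a, b := newExpTimestep(0.5, 7), newExpTimestep(0.5, 7)
	for i := 0; i < 3; i++ {
		x, y := a.next(), b.next()
		// Each draw is positive, and equal seeds give equal sequences.
		fmt.Println(x > 0, x == y)
	}
}
```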
type Implementations ¶
type Implementations struct {
Iterations []Iteration
OutputCondition OutputCondition
OutputFunction OutputFunction
TerminationCondition TerminationCondition
TimestepFunction TimestepFunction
}
Implementations provides concrete implementations for a simulation run.
type Iteration ¶
type Iteration interface {
Configure(partitionIndex int, settings *Settings)
Iterate(
params *Params,
partitionIndex int,
stateHistories []*StateHistory,
timestepsHistory *CumulativeTimestepsHistory,
) []float64
}
Iteration defines the interface for per-partition state update functions in stochadex simulations.
The Iteration interface is the fundamental building block for defining how simulation state evolves over time. Each partition in a simulation uses an Iteration to compute its next state values based on the current state, parameters, and time information.
Design Philosophy: The Iteration interface emphasizes modularity and composability. By providing a simple, well-defined interface, it enables the creation of complex simulations through the combination of simple, focused iterations. This design supports both built-in iteration types and custom user-defined iterations.
Interface Methods:
- Configure: Initialize the iteration with simulation settings (called once)
- Iterate: Compute the next state values (called each simulation step)
Configuration Phase: Configure is called once per partition during simulation setup. It receives:
- partitionIndex: The index of this partition in the simulation
- settings: Global simulation settings and configuration
This phase is used for:
- Initializing random number generators
- Setting up internal data structures
- Configuring iteration-specific parameters
- Validating configuration parameters
Iteration Phase: Iterate is called each simulation step to compute the next state values. It receives:
- params: Current simulation parameters for this partition
- partitionIndex: The index of this partition
- stateHistories: State histories for all partitions (for cross-partition access)
- timestepsHistory: Time and timestep information
It must return:
- []float64: The next state values for this partition
Implementation Requirements:
- Configure must be called before Iterate
- Iterate must return a slice of the correct length (matching state width)
- Iterate should not modify the input parameters or state histories
- Iterate should be deterministic given the same inputs and initial seed (for reproducible simulations)
Example Usage:
type MyIteration struct {
    // Internal state
}
func (m *MyIteration) Configure(partitionIndex int, settings *Settings) {
    // Initialize iteration
}
func (m *MyIteration) Iterate(params *Params, partitionIndex int,
    stateHistories []*StateHistory,
    timestepsHistory *CumulativeTimestepsHistory) []float64 {
    // Compute next state values; the slice length must match the state width
    newValue1, newValue2 := 0.0, 0.0 // placeholders for the computed values
    return []float64{newValue1, newValue2}
}
Common Iteration Types:
- Stochastic processes: WienerProcessIteration, PoissonProcessIteration
- Deterministic functions: ValuesFunctionIteration, ConstantValuesIteration
- Aggregation functions: VectorMeanIteration, GroupedAggregationIteration
- User-defined iterations: Custom implementations for specific needs
Performance Considerations:
- Iterate is called frequently during simulation execution
- Implementations should be optimized for performance
- Avoid expensive computations or memory allocations in Iterate
- Consider caching expensive computations in Configure
Thread Safety:
- Iterate may be called concurrently from multiple goroutines
- Implementations should be thread-safe or stateless
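The requirement that Iterate leaves its inputs untouched can be illustrated with a self-contained sketch. The types params, stateHistory and timesteps are reduced stand-ins for the package types, and decayIteration is a hypothetical iteration; only the Iterate half of the interface is shown.

```go
package main

import "fmt"

// Stand-ins for the package types, reduced to what this sketch needs.
type params map[string][]float64

type stateHistory struct {
	Values [][]float64 // row 0 is the most recent state
}

type timesteps struct {
	NextIncrement float64
}

// decayIteration moves each state value towards zero at rate params["rate"].
type decayIteration struct{}

func (d *decayIteration) Iterate(
	p params,
	partitionIndex int,
	stateHistories []*stateHistory,
	timestepsHistory *timesteps,
) []float64 {
	current := stateHistories[partitionIndex].Values[0]
	// Write into a fresh slice so the input history is never modified.
	next := make([]float64, len(current))
	for i, x := range current {
		next[i] = x - p["rate"][0]*x*timestepsHistory.NextIncrement
	}
	return next
}

func main() {
	histories := []*stateHistory{{Values: [][]float64{{8, 4}}}}
	it := &decayIteration{}
	next := it.Iterate(params{"rate": {0.5}}, 0, histories, &timesteps{NextIncrement: 0.5})
	fmt.Println(next, histories[0].Values[0]) // [6 3] [8 4]
}
```

The sketch allocates on every call for clarity; an implementation that follows the performance guidance above might reuse a buffer set up in Configure.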
type IterationSettings ¶
type IterationSettings struct {
Name string `yaml:"name"`
Params Params `yaml:"params"`
ParamsFromUpstream map[string]UpstreamConfig `yaml:"params_from_upstream,omitempty"`
InitStateValues []float64 `yaml:"init_state_values"`
Seed uint64 `yaml:"seed"`
StateWidth int `yaml:"state_width"`
StateHistoryDepth int `yaml:"state_history_depth"`
}
IterationSettings is the YAML-loadable per-partition configuration.
Usage hints:
- Name is used to address partitions in other configs and params maps.
- ParamsFromUpstream forwards outputs from upstream partitions into Params.
- StateWidth and StateHistoryDepth control the size and depth of state.
type IterationTestHarness ¶
type IterationTestHarness struct {
Iteration Iteration
Err error
// contains filtered or unexported fields
}
IterationTestHarness wraps an iteration and performs checks on its behaviour while running.
func (*IterationTestHarness) Configure ¶
func (h *IterationTestHarness) Configure( partitionIndex int, settings *Settings, )
func (*IterationTestHarness) Iterate ¶
func (h *IterationTestHarness) Iterate( params *Params, partitionIndex int, stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory, ) []float64
type IteratorInputMessage ¶
type IteratorInputMessage struct {
StateHistories []*StateHistory
TimestepsHistory *CumulativeTimestepsHistory
}
IteratorInputMessage carries shared histories into iterator jobs.
type JsonLogChannelOutputFunction ¶
type JsonLogChannelOutputFunction struct {
// contains filtered or unexported fields
}
JsonLogChannelOutputFunction writes JSON log entries via a background goroutine using a channel for improved throughput.
func NewJsonLogChannelOutputFunction ¶
func NewJsonLogChannelOutputFunction( filePath string, ) *JsonLogChannelOutputFunction
NewJsonLogChannelOutputFunction creates a JsonLogChannelOutputFunction. Call Close (defer it) to ensure flushing at the end of a run.
func (*JsonLogChannelOutputFunction) Close ¶
func (j *JsonLogChannelOutputFunction) Close()
Close ensures that the log channel flushes at the end of a run.
type JsonLogEntry ¶
type JsonLogEntry struct {
PartitionName string `json:"partition_name"`
State []float64 `json:"state"`
CumulativeTimesteps float64 `json:"time"`
}
JsonLogEntry is the serialised record format used by JSON log outputs.
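The record format implied by the struct tags can be seen by marshalling a local copy of the struct with encoding/json. The type logEntry and the helper encode are hypothetical; only the field names and tags are taken from JsonLogEntry.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// logEntry copies the fields and JSON tags of JsonLogEntry.
type logEntry struct {
	PartitionName       string    `json:"partition_name"`
	State               []float64 `json:"state"`
	CumulativeTimesteps float64   `json:"time"`
}

// encode returns the JSON form of one entry.
func encode(e logEntry) (string, error) {
	b, err := json.Marshal(e)
	return string(b), err
}

func main() {
	line, err := encode(logEntry{
		PartitionName:       "prices",
		State:               []float64{100, 101.5},
		CumulativeTimesteps: 1,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(line)
	// {"partition_name":"prices","state":[100,101.5],"time":1}
}
```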
type JsonLogOutputFunction ¶
type JsonLogOutputFunction struct {
// contains filtered or unexported fields
}
JsonLogOutputFunction writes newline-delimited JSON log entries.
func NewJsonLogOutputFunction ¶
func NewJsonLogOutputFunction( filePath string, ) *JsonLogOutputFunction
NewJsonLogOutputFunction creates a new JsonLogOutputFunction.
type NamedPartitionIndex ¶
NamedPartitionIndex pairs the name of a partition with the partition index assigned to it by the PartitionCoordinator.
type NamedUpstreamConfig ¶
type NamedUpstreamConfig struct {
Upstream string `yaml:"upstream"`
Indices []int `yaml:"indices,omitempty"`
}
NamedUpstreamConfig is like UpstreamConfig but refers to upstream by name.
type NilOutputCondition ¶
type NilOutputCondition struct{}
NilOutputCondition never outputs.
func (*NilOutputCondition) IsOutputStep ¶
func (c *NilOutputCondition) IsOutputStep( partitionName string, state []float64, cumulativeTimesteps float64, ) bool
type NilOutputFunction ¶
type NilOutputFunction struct{}
NilOutputFunction outputs nothing from the simulation.
type NumberOfStepsTerminationCondition ¶
type NumberOfStepsTerminationCondition struct {
MaxNumberOfSteps int
}
NumberOfStepsTerminationCondition terminates after MaxNumberOfSteps.
func (*NumberOfStepsTerminationCondition) Terminate ¶
func (t *NumberOfStepsTerminationCondition) Terminate( stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory, ) bool
type OnlyGivenPartitionsOutputCondition ¶
OnlyGivenPartitionsOutputCondition emits output only for listed partitions.
func (*OnlyGivenPartitionsOutputCondition) IsOutputStep ¶
func (o *OnlyGivenPartitionsOutputCondition) IsOutputStep( partitionName string, state []float64, cumulativeTimesteps float64, ) bool
type OutputCondition ¶
type OutputCondition interface {
IsOutputStep(partitionName string, state []float64, cumulativeTimesteps float64) bool
}
OutputCondition decides whether an output should be emitted this step.
type OutputFunction ¶
type OutputFunction interface {
Output(partitionName string, state []float64, cumulativeTimesteps float64)
}
OutputFunction writes state/time to an output sink when the OutputCondition is met.
type Params ¶
type Params struct {
Map map[string][]float64 `yaml:",inline"`
// contains filtered or unexported fields
}
Params stores per-partition parameter values.
Usage hints:
- Use Get/GetIndex helpers to retrieve, Set/SetIndex to update.
- SetPartitionName improves error messages for missing params.
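func (*Params) GetCopy ¶
func (p *Params) GetCopy(name string) []float64
GetCopy returns a copy of parameter values or panics with a helpful message.
func (*Params) GetCopyOk ¶
func (p *Params) GetCopyOk(name string) ([]float64, bool)
GetCopyOk returns a copy of parameter values, if present, along with a flag indicating whether they were found.
func (*Params) SetPartitionName ¶
func (p *Params) SetPartitionName(name string)
SetPartitionName attaches the owning partition name for better errors.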
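The difference between a copy and the stored values can be sketched with a small stand-in. The type paramStore and its getCopyOk method are hypothetical and only imitate the documented behaviour of GetCopyOk.

```go
package main

import "fmt"

// paramStore is a stand-in for Params with only the behaviour sketched here.
type paramStore struct {
	Map map[string][]float64
}

// getCopyOk returns a copy of the values and whether the name was present.
func (p *paramStore) getCopyOk(name string) ([]float64, bool) {
	values, ok := p.Map[name]
	if !ok {
		return nil, false
	}
	return append([]float64(nil), values...), true
}

func main() {
	p := &paramStore{Map: map[string][]float64{"rate": {0.5}}}
	values, ok := p.getCopyOk("rate")
	values[0] = 99 // changes the copy only
	_, found := p.getCopyOk("drift")
	fmt.Println(ok, found, p.Map["rate"][0]) // true false 0.5
}
```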
type PartitionConfig ¶
type PartitionConfig struct {
Name string `yaml:"name"`
Iteration Iteration `yaml:"-"`
Params Params `yaml:"params"`
ParamsAsPartitions map[string][]string `yaml:"params_as_partitions,omitempty"`
ParamsFromUpstream map[string]NamedUpstreamConfig `yaml:"params_from_upstream,omitempty"`
InitStateValues []float64 `yaml:"init_state_values"`
StateHistoryDepth int `yaml:"state_history_depth"`
Seed uint64 `yaml:"seed"`
}
PartitionConfig defines a partition to add to a simulation.
Usage hints:
- Iteration is not YAML-serialised; set it programmatically.
- ParamsAsPartitions allows passing partition indices via their names.
- ParamsFromUpstream forwards outputs from named upstream partitions.
func LoadPartitionConfigFromYaml ¶
func LoadPartitionConfigFromYaml(path string) *PartitionConfig
LoadPartitionConfigFromYaml loads PartitionConfig from a YAML file path.
Usage hints:
- Calls Init to populate missing defaults after unmarshalling.
func (*PartitionConfig) Init ¶
func (p *PartitionConfig) Init()
Init ensures params maps are initialised; call after unmarshalling YAML.
type PartitionConfigOrdering ¶
type PartitionConfigOrdering struct {
Names []string
IndexByName map[string]int
ConfigByName map[string]*PartitionConfig
}
PartitionConfigOrdering maintains the ordering and lookup for partitions. It can be updated dynamically via Append.
func (*PartitionConfigOrdering) Append ¶
func (p *PartitionConfigOrdering) Append(config *PartitionConfig)
Append inserts another partition into the ordering and updates lookups.
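How the three fields might stay in step on each Append is sketched below. The types config and ordering are stand-ins, and assigning the next free index is an assumption about the real method.

```go
package main

import "fmt"

// config is a stand-in for PartitionConfig with only a name.
type config struct{ Name string }

// ordering is a stand-in with the same three fields as PartitionConfigOrdering.
type ordering struct {
	Names        []string
	IndexByName  map[string]int
	ConfigByName map[string]*config
}

func newOrdering() *ordering {
	return &ordering{IndexByName: map[string]int{}, ConfigByName: map[string]*config{}}
}

// Append records the config under the next free index.
func (o *ordering) Append(c *config) {
	o.IndexByName[c.Name] = len(o.Names)
	o.ConfigByName[c.Name] = c
	o.Names = append(o.Names, c.Name)
}

func main() {
	o := newOrdering()
	o.Append(&config{Name: "prices"})
	o.Append(&config{Name: "volumes"})
	fmt.Println(o.Names, o.IndexByName["volumes"]) // [prices volumes] 1
}
```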
type PartitionCoordinator ¶
type PartitionCoordinator struct {
Iterators []*StateIterator
TimestepFunction TimestepFunction
TerminationCondition TerminationCondition
// contains filtered or unexported fields
}
PartitionCoordinator orchestrates iteration work across partitions and applies state/time history updates in a coordinated manner.
The PartitionCoordinator is the central component that manages the execution of all partitions in a simulation. It coordinates the timing, communication, and state updates across all partitions, ensuring proper synchronization and maintaining simulation consistency.
Architecture: The coordinator uses a two-phase execution model:
- Iteration Phase: All partitions compute their next state values
- Update Phase: State and time histories are updated with new values
This design ensures that all partitions see consistent state information during each iteration, preventing race conditions and maintaining simulation determinism.
Concurrency Model:
- Each partition runs in its own goroutine for parallel execution
- Channels are used for inter-partition communication
- WaitGroups ensure proper synchronization between phases
- Shared state is protected by the coordinator's control flow
Execution Flow:
- Compute next timestep increment using TimestepFunction
- Request iterations from all partitions (parallel execution)
- Wait for all iterations to complete
- Update state and time histories (parallel execution)
- Check termination condition
- Repeat until termination
Fields:
- Iterators: List of StateIterators, one per partition
- Shared: Shared state and time information accessible to all partitions
- TimestepFunction: Function that determines the next timestep increment
- TerminationCondition: Condition that determines when to stop the simulation
- newWorkChannels: Communication channels for coordinating partition work
Example Usage:
coordinator := NewPartitionCoordinator(settings, implementations)
// Run simulation until termination
coordinator.Run()
// Or step-by-step control
for !coordinator.ReadyToTerminate() {
    var wg sync.WaitGroup
    coordinator.Step(&wg)
}
Performance:
- O(p) coordination work per step, where p is the number of partitions
- Parallel execution of partition iterations
- Efficient channel-based communication
- Memory usage scales with partition count and state size
Thread Safety:
- Safe for concurrent access to coordinator methods
- Internal synchronization ensures consistent state updates
- Partition communication is thread-safe through channels
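The two-phase model can be sketched without the package. The function step below is hypothetical: it uses one goroutine per partition and a sync.WaitGroup as the barrier between phases, whereas the real coordinator also uses channels and runs its update phase in parallel.

```go
package main

import (
	"fmt"
	"sync"
)

// step runs one two-phase tick over states: every partition computes its
// pending value from the same snapshot, and only then are states overwritten.
func step(states []float64, update func(i int, snapshot []float64) float64) {
	pending := make([]float64, len(states))
	var wg sync.WaitGroup

	// Iteration phase: one goroutine per partition, reading states only.
	for i := range states {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			pending[i] = update(i, states)
		}(i)
	}
	wg.Wait()

	// Update phase: apply the pending values once all reads have finished.
	copy(states, pending)
}

func main() {
	states := []float64{1, 2}
	// Each partition takes the value of the other partition.
	swap := func(i int, snapshot []float64) float64 { return snapshot[1-i] }
	step(states, swap)
	fmt.Println(states) // [2 1]
}
```

Because no partition writes until every partition has read, the swap gives the same result regardless of goroutine scheduling.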
func NewPartitionCoordinator ¶
func NewPartitionCoordinator( settings *Settings, implementations *Implementations, ) *PartitionCoordinator
NewPartitionCoordinator wires Settings and Implementations into a runnable coordinator with initial state/time histories and channels.
func (*PartitionCoordinator) ReadyToTerminate ¶
func (c *PartitionCoordinator) ReadyToTerminate() bool
ReadyToTerminate returns whether the TerminationCondition is met.
func (*PartitionCoordinator) RequestMoreIterations ¶
func (c *PartitionCoordinator) RequestMoreIterations(wg *sync.WaitGroup)
RequestMoreIterations spawns a goroutine per partition to run ReceiveAndIteratePending.
func (*PartitionCoordinator) Run ¶
func (c *PartitionCoordinator) Run()
Run advances by repeatedly calling Step until termination.
func (*PartitionCoordinator) Step ¶
func (c *PartitionCoordinator) Step(wg *sync.WaitGroup)
Step performs one simulation tick: compute dt, request iterations, then apply state/time updates.
func (*PartitionCoordinator) UpdateHistory ¶
func (c *PartitionCoordinator) UpdateHistory(wg *sync.WaitGroup)
UpdateHistory spawns a goroutine per partition to run StateIterator.UpdateHistory, then shifts the time history forward and adds NextIncrement to t[0].
type PartitionState ¶
type PartitionState struct {
CumulativeTimesteps float64 `protobuf:"fixed64,1,opt,name=cumulative_timesteps,json=cumulativeTimesteps,proto3" json:"cumulative_timesteps,omitempty"`
PartitionName string `protobuf:"bytes,2,opt,name=partition_name,json=partitionName,proto3" json:"partition_name,omitempty"`
State []float64 `protobuf:"fixed64,3,rep,packed,name=state,proto3" json:"state,omitempty"`
// contains filtered or unexported fields
}
func (*PartitionState) Descriptor ¶
func (*PartitionState) Descriptor() ([]byte, []int)
Deprecated: Use PartitionState.ProtoReflect.Descriptor instead.
func (*PartitionState) GetCumulativeTimesteps ¶
func (x *PartitionState) GetCumulativeTimesteps() float64
func (*PartitionState) GetPartitionName ¶
func (x *PartitionState) GetPartitionName() string
func (*PartitionState) GetState ¶
func (x *PartitionState) GetState() []float64
func (*PartitionState) ProtoMessage ¶
func (*PartitionState) ProtoMessage()
func (*PartitionState) ProtoReflect ¶
func (x *PartitionState) ProtoReflect() protoreflect.Message
func (*PartitionState) Reset ¶
func (x *PartitionState) Reset()
func (*PartitionState) String ¶
func (x *PartitionState) String() string
type Settings ¶
type Settings struct {
Iterations []IterationSettings `yaml:"iterations"`
InitTimeValue float64 `yaml:"init_time_value"`
TimestepsHistoryDepth int `yaml:"timesteps_history_depth"`
}
Settings is the YAML-loadable top-level simulation configuration.
func LoadSettingsFromYaml ¶
LoadSettingsFromYaml loads Settings from a YAML file path.
Usage hints:
- Calls Init to populate missing defaults after unmarshalling.
type SimulationConfig ¶
type SimulationConfig struct {
OutputCondition OutputCondition
OutputFunction OutputFunction
TerminationCondition TerminationCondition
TimestepFunction TimestepFunction
InitTimeValue float64
}
SimulationConfig defines additional run-level configuration.
type SimulationConfigStrings ¶
type SimulationConfigStrings struct {
OutputCondition string `yaml:"output_condition"`
OutputFunction string `yaml:"output_function"`
TerminationCondition string `yaml:"termination_condition"`
TimestepFunction string `yaml:"timestep_function"`
InitTimeValue float64 `yaml:"init_time_value"`
}
SimulationConfigStrings is the YAML-loadable version of SimulationConfig, referring to implementations by type names for templating.
func LoadSimulationConfigStringsFromYaml ¶
func LoadSimulationConfigStringsFromYaml(path string) *SimulationConfigStrings
LoadSimulationConfigStringsFromYaml loads SimulationConfigStrings from YAML.
type StateHistory ¶
type StateHistory struct {
// each row is a different state in the history, by convention,
// starting with the most recent at index = 0
Values *mat.Dense
// should be of length = StateWidth
NextValues []float64
StateWidth int
StateHistoryDepth int
}
StateHistory is a rolling window of state vectors.
Usage hints:
- Values holds rows of state (row 0 is most recent by convention).
- Use GetNextStateRowToUpdate when updating in multi-row histories.
func (*StateHistory) CopyStateRow ¶
func (s *StateHistory) CopyStateRow(index int) []float64
CopyStateRow copies a row from the state history given the index.
func (*StateHistory) GetNextStateRowToUpdate ¶
func (s *StateHistory) GetNextStateRowToUpdate() []float64
GetNextStateRowToUpdate returns the most recent state row for updating. It copies the row when a history longer than 1 is needed and otherwise exposes the row directly.
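The copy-or-expose choice can be sketched with slices standing in for *mat.Dense. The type history and the method nextRowToUpdate are hypothetical and only imitate the behaviour described above.

```go
package main

import "fmt"

// history is a stand-in for StateHistory using slices instead of *mat.Dense.
type history struct {
	Values [][]float64 // row 0 is the most recent state
}

// nextRowToUpdate returns row 0 itself when only one row is kept, and a copy
// of it otherwise so that the stored row stays intact for shifting.
func (h *history) nextRowToUpdate() []float64 {
	if len(h.Values) == 1 {
		return h.Values[0]
	}
	return append([]float64(nil), h.Values[0]...)
}

func main() {
	single := &history{Values: [][]float64{{1, 2}}}
	single.nextRowToUpdate()[0] = 9 // writes through to the history

	deep := &history{Values: [][]float64{{1, 2}, {0, 0}}}
	deep.nextRowToUpdate()[0] = 9 // writes to a copy only

	fmt.Println(single.Values[0][0], deep.Values[0][0]) // 9 1
}
```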
type StateIterator ¶
type StateIterator struct {
Iteration Iteration
Params Params
Partition NamedPartitionIndex
ValueChannels StateValueChannels
OutputCondition OutputCondition
OutputFunction OutputFunction
}
StateIterator runs an Iteration for a partition on a goroutine and manages reads/writes to history and output.
func NewStateIterator ¶
func NewStateIterator( iteration Iteration, params Params, partitionName string, partitionIndex int, valueChannels StateValueChannels, outputCondition OutputCondition, outputFunction OutputFunction, initState []float64, initTime float64, ) *StateIterator
NewStateIterator creates a StateIterator and may emit initial output if the condition is met by the initial state/time.
func (*StateIterator) Iterate ¶
func (s *StateIterator) Iterate( stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory, ) []float64
Iterate runs the Iteration and optionally triggers output if the condition is met for the new state/time.
func (*StateIterator) ReceiveAndIteratePending ¶
func (s *StateIterator) ReceiveAndIteratePending( inputChannel <-chan *IteratorInputMessage, )
ReceiveAndIteratePending listens for an IteratorInputMessage, updates upstream-driven params, runs Iterate, and stores a pending state update.
func (*StateIterator) UpdateHistory ¶
func (s *StateIterator) UpdateHistory(inputChannel <-chan *IteratorInputMessage)
UpdateHistory applies the pending state update to the partition history.
type StateTimeStorage ¶
type StateTimeStorage struct {
// contains filtered or unexported fields
}
StateTimeStorage provides thread-safe storage for simulation time series data with minimal contention and efficient access patterns.
StateTimeStorage is designed to handle concurrent access from multiple simulation partitions while maintaining data consistency and performance. It uses a mutex-protected design optimized for the common case of appending new data points during simulation execution.
Data Organization:
- Time series are organized by partition name
- Each partition can have multiple state dimensions
- Time axis is shared across all partitions
- Data is stored in row-major format for efficient access
Thread Safety:
- ConcurrentAppend is safe for concurrent use from multiple goroutines
- GetValues/GetTimes are safe for concurrent reads
- SetValues/SetTimes should not be called concurrently with appends
- Internal mutex protects against race conditions
Performance Characteristics:
- O(1) lookup by partition name using hash map
- Amortised O(1) append operations with minimal locking
- Memory usage: O(total_samples * state_dimensions)
- Efficient for high-frequency data collection
Usage Patterns:
- Real-time data collection during simulation runs
- Batch data loading from external sources
- Result storage for post-simulation analysis
- Intermediate storage for multi-stage simulations
Example Usage:
storage := NewStateTimeStorage()
var wg sync.WaitGroup
wg.Add(2)
// Concurrent appends from multiple partitions
go func() {
    defer wg.Done()
    storage.ConcurrentAppend("prices", 1.0, []float64{100.0, 101.0})
}()
go func() {
    defer wg.Done()
    storage.ConcurrentAppend("volumes", 1.0, []float64{1000.0})
}()
// Wait for both appends to finish; GetValues panics on an unknown name
wg.Wait()
// Retrieve data after simulation
priceData := storage.GetValues("prices")
timeData := storage.GetTimes()
Memory Management:
- Automatic memory allocation for new partitions
- Efficient storage of sparse time series
- No automatic cleanup (caller responsible for memory management)
Error Handling:
- GetValues panics if partition name is not found
- Provides helpful error messages with available partition names
- ConcurrentAppend handles time deduplication automatically
func NewStateTimeStorage ¶
func NewStateTimeStorage() *StateTimeStorage
NewStateTimeStorage constructs a new StateTimeStorage.
func (*StateTimeStorage) ConcurrentAppend ¶
func (s *StateTimeStorage) ConcurrentAppend( name string, time float64, values []float64, )
ConcurrentAppend appends values for name and updates the time axis at most once per unique timestamp. Safe for concurrent use.
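A mutex-protected append with time deduplication might look like the sketch below. The type store is a stand-in, and comparing against the last recorded time is an assumption about how the time axis is updated at most once per timestamp.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a stand-in for StateTimeStorage. Comparing against the last
// recorded time is an assumption about how deduplication might work.
type store struct {
	mu     sync.Mutex
	times  []float64
	values map[string][][]float64
}

func newStore() *store {
	return &store{values: make(map[string][][]float64)}
}

// appendRow adds one row for name and extends the time axis only when the
// timestamp differs from the last one recorded.
func (s *store) appendRow(name string, t float64, row []float64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.values[name] = append(s.values[name], row)
	if n := len(s.times); n == 0 || s.times[n-1] != t {
		s.times = append(s.times, t)
	}
}

func main() {
	s := newStore()
	var wg sync.WaitGroup
	for _, name := range []string{"prices", "volumes"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			s.appendRow(name, 1.0, []float64{1})
		}(name)
	}
	wg.Wait()
	fmt.Println(len(s.times), len(s.values["prices"]), len(s.values["volumes"])) // 1 1 1
}
```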
func (*StateTimeStorage) GetIndex ¶
func (s *StateTimeStorage) GetIndex(name string) int
GetIndex returns or creates the index for a name.
func (*StateTimeStorage) GetNames ¶
func (s *StateTimeStorage) GetNames() []string
GetNames returns all partition names present in the store.
func (*StateTimeStorage) GetTimes ¶
func (s *StateTimeStorage) GetTimes() []float64
GetTimes returns the time axis.
func (*StateTimeStorage) GetValues ¶
func (s *StateTimeStorage) GetValues(name string) [][]float64
GetValues returns all time series for name, panicking if absent.
func (*StateTimeStorage) SetTimes ¶
func (s *StateTimeStorage) SetTimes(times []float64)
SetTimes replaces the time axis.
func (*StateTimeStorage) SetValues ¶
func (s *StateTimeStorage) SetValues(name string, values [][]float64)
SetValues replaces the entire series for name.
type StateTimeStorageOutputFunction ¶
type StateTimeStorageOutputFunction struct {
Store *StateTimeStorage
}
StateTimeStorageOutputFunction stores output into StateTimeStorage when the condition is met.
type StateValueChannels ¶
type StateValueChannels struct {
Upstreams map[string]*UpstreamStateValues
Downstream *DownstreamStateValues
}
StateValueChannels provides upstream/downstream channels for inter-iterator communication.
func (*StateValueChannels) BroadcastDownstream ¶
func (s *StateValueChannels) BroadcastDownstream(stateValues []float64)
BroadcastDownstream sends state values to all configured downstream copies.
func (*StateValueChannels) UpdateUpstreamParams ¶
func (s *StateValueChannels) UpdateUpstreamParams(params *Params)
UpdateUpstreamParams updates Params with values received from upstream channels.
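The upstream and downstream flow can be sketched with plain channels. The functions broadcast and receiveInto are hypothetical, as is the params key "upstream_values"; the real types carry additional information that is not shown on this page.

```go
package main

import "fmt"

// broadcast sends a copy of values on each downstream channel.
func broadcast(values []float64, downstream []chan []float64) {
	for _, ch := range downstream {
		ch <- append([]float64(nil), values...)
	}
}

// receiveInto reads one message from upstream and stores it in params under name.
func receiveInto(params map[string][]float64, name string, upstream <-chan []float64) {
	params[name] = <-upstream
}

func main() {
	// Buffered channels let the sender proceed before the receivers are ready.
	a, b := make(chan []float64, 1), make(chan []float64, 1)
	broadcast([]float64{1.5, 2.5}, []chan []float64{a, b})

	paramsA, paramsB := map[string][]float64{}, map[string][]float64{}
	receiveInto(paramsA, "upstream_values", a)
	receiveInto(paramsB, "upstream_values", b)
	fmt.Println(paramsA["upstream_values"], paramsB["upstream_values"]) // [1.5 2.5] [1.5 2.5]
}
```

Sending a separate copy to each receiver means one downstream partition cannot alter the values seen by another.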
type StdoutOutputFunction ¶
type StdoutOutputFunction struct{}
StdoutOutputFunction outputs the state to the terminal.
type TerminationCondition ¶
type TerminationCondition interface {
Terminate(
stateHistories []*StateHistory,
timestepsHistory *CumulativeTimestepsHistory,
) bool
}
TerminationCondition decides when the simulation should end.
type TimeElapsedTerminationCondition ¶
type TimeElapsedTerminationCondition struct {
MaxTimeElapsed float64
}
TimeElapsedTerminationCondition terminates after MaxTimeElapsed.
func (*TimeElapsedTerminationCondition) Terminate ¶
func (t *TimeElapsedTerminationCondition) Terminate( stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory, ) bool
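The two built-in termination rules can be compared in a self-contained sketch. The types timeline, maxSteps and maxTime are stand-ins with a simplified Terminate signature, and the use of >= for both comparisons is an assumption.

```go
package main

import "fmt"

// timeline is a stand-in for CumulativeTimestepsHistory; Values[0] is taken
// to be the latest cumulative time.
type timeline struct {
	Values            []float64
	CurrentStepNumber int
}

// maxSteps imitates NumberOfStepsTerminationCondition.
type maxSteps struct{ MaxNumberOfSteps int }

func (c maxSteps) Terminate(t *timeline) bool {
	return t.CurrentStepNumber >= c.MaxNumberOfSteps
}

// maxTime imitates TimeElapsedTerminationCondition.
type maxTime struct{ MaxTimeElapsed float64 }

func (c maxTime) Terminate(t *timeline) bool {
	return t.Values[0] >= c.MaxTimeElapsed
}

// stepsUntil advances a timeline by dt until stop returns true and reports
// how many steps were taken.
func stepsUntil(stop func(*timeline) bool, dt float64) int {
	t := &timeline{Values: []float64{0}}
	for !stop(t) {
		t.Values[0] += dt
		t.CurrentStepNumber++
	}
	return t.CurrentStepNumber
}

func main() {
	fmt.Println(stepsUntil(maxSteps{MaxNumberOfSteps: 4}.Terminate, 0.5)) // 4
	fmt.Println(stepsUntil(maxTime{MaxTimeElapsed: 1.5}.Terminate, 0.5))  // 3
}
```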
type TimestepFunction ¶
type TimestepFunction interface {
NextIncrement(
timestepsHistory *CumulativeTimestepsHistory,
) float64
}
TimestepFunction computes the next time increment.
type UpstreamConfig ¶
type UpstreamConfig struct {
Upstream int `yaml:"upstream"`
Indices []int `yaml:"indices,omitempty"`
}
UpstreamConfig is the YAML-loadable representation of a slice of data from the output of a partition which is computationally upstream.
type UpstreamStateValues ¶
UpstreamStateValues contains information to receive state values from an upstream iterator via channel.
type WebsocketOutputFunction ¶
type WebsocketOutputFunction struct {
// contains filtered or unexported fields
}
WebsocketOutputFunction serialises and sends outputs via a websocket connection when the condition is met.
func NewWebsocketOutputFunction ¶
func NewWebsocketOutputFunction( connection *websocket.Conn, mutex *sync.Mutex, ) *WebsocketOutputFunction
NewWebsocketOutputFunction constructs a WebsocketOutputFunction with a connection and a mutex for safe concurrent writes.