simulator

package
v0.0.0-...-61698a3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2026 License: MIT Imports: 18 Imported by: 6

Documentation

Overview

Package simulator provides the core simulation engine and infrastructure for stochadex simulations. It includes the main simulation loop, state management, partition coordination, and execution control mechanisms.

Key Features:

  • Partition-based simulation architecture
  • Concurrent execution with goroutine coordination
  • State history management and time tracking
  • Configurable termination and output conditions
  • Flexible timestep control
  • Storage and persistence utilities

Architecture: The simulator uses a partition-based approach where simulations are divided into independent partitions that can be executed concurrently. Each partition maintains its own state history and can communicate with other partitions through defined interfaces.

Usage Patterns:

  • Configure and run complex multi-partition simulations
  • Manage simulation state across multiple timesteps
  • Coordinate concurrent execution of simulation components
  • Store and retrieve simulation results and intermediate states
  • Implement custom termination and output conditions

Index

Constants

This section is empty.

Variables

View Source
var File_cmd_messages_partition_state_proto protoreflect.FileDescriptor

Functions

func RunWithHarnesses

func RunWithHarnesses(settings *Settings, implementations *Implementations) error

RunWithHarnesses runs all iterations, each wrapped in a test harness and returns any errors if found. The simulation is also run twice to check for statefulness residues.

Types

type ConfigGenerator

type ConfigGenerator struct {
	// contains filtered or unexported fields
}

ConfigGenerator builds Settings and Implementations programmatically and can generate runnable configs on demand.

func NewConfigGenerator

func NewConfigGenerator() *ConfigGenerator

NewConfigGenerator creates a new ConfigGenerator with empty ordering.

func (*ConfigGenerator) GenerateConfigs

func (c *ConfigGenerator) GenerateConfigs() (*Settings, *Implementations)

GenerateConfigs constructs Settings and Implementations ready to run. It computes state widths, converts named references, and configures iterations with their partition indices.

func (*ConfigGenerator) GetGlobalSeed

func (c *ConfigGenerator) GetGlobalSeed() uint64

GetGlobalSeed returns the current global seed.

func (*ConfigGenerator) GetPartition

func (c *ConfigGenerator) GetPartition(name string) *PartitionConfig

GetPartition retrieves a partition config by name.

func (*ConfigGenerator) GetSimulation

func (c *ConfigGenerator) GetSimulation() *SimulationConfig

GetSimulation returns the current simulation config.

func (*ConfigGenerator) ResetPartition

func (c *ConfigGenerator) ResetPartition(name string, config *PartitionConfig)

ResetPartition replaces the config for a partition by name.

func (*ConfigGenerator) SetGlobalSeed

func (c *ConfigGenerator) SetGlobalSeed(seed uint64)

SetGlobalSeed assigns a random seed to each partition derived from the provided global seed.

func (*ConfigGenerator) SetPartition

func (c *ConfigGenerator) SetPartition(config *PartitionConfig)

SetPartition adds a new partition config. Names must be unique.

func (*ConfigGenerator) SetSimulation

func (c *ConfigGenerator) SetSimulation(config *SimulationConfig)

SetSimulation sets the current simulation config.

type ConstantTimestepFunction

type ConstantTimestepFunction struct {
	Stepsize float64
}

ConstantTimestepFunction uses a fixed stepsize.

func (*ConstantTimestepFunction) NextIncrement

func (t *ConstantTimestepFunction) NextIncrement(
	timestepsHistory *CumulativeTimestepsHistory,
) float64

type CumulativeTimestepsHistory

type CumulativeTimestepsHistory struct {
	NextIncrement     float64
	Values            *mat.VecDense
	CurrentStepNumber int
	StateHistoryDepth int
}

CumulativeTimestepsHistory is a rolling window of cumulative timesteps with NextIncrement and CurrentStepNumber.

type DownstreamStateValues

type DownstreamStateValues struct {
	Channel chan []float64
	Copies  int
}

DownstreamStateValues contains information to broadcast state values to downstream iterators via channel.

type EveryNStepsOutputCondition

type EveryNStepsOutputCondition struct {
	N int
	// contains filtered or unexported fields
}

EveryNStepsOutputCondition emits output once every N steps.

func (*EveryNStepsOutputCondition) IsOutputStep

func (c *EveryNStepsOutputCondition) IsOutputStep(
	partitionName string,
	state []float64,
	cumulativeTimesteps float64,
) bool

type EveryStepOutputCondition

type EveryStepOutputCondition struct{}

EveryStepOutputCondition calls the OutputFunction at every step.

func (*EveryStepOutputCondition) IsOutputStep

func (c *EveryStepOutputCondition) IsOutputStep(
	partitionName string,
	state []float64,
	cumulativeTimesteps float64,
) bool

type ExponentialDistributionTimestepFunction

type ExponentialDistributionTimestepFunction struct {
	Mean float64
	Seed uint64
	// contains filtered or unexported fields
}

ExponentialDistributionTimestepFunction draws dt from an exponential distribution parameterised by Mean and Seed.

func NewExponentialDistributionTimestepFunction

func NewExponentialDistributionTimestepFunction(
	mean float64,
	seed uint64,
) *ExponentialDistributionTimestepFunction

NewExponentialDistributionTimestepFunction constructs an exponential-dt timestep function given mean and seed.

func (*ExponentialDistributionTimestepFunction) NextIncrement

func (t *ExponentialDistributionTimestepFunction) NextIncrement(
	timestepsHistory *CumulativeTimestepsHistory,
) float64

type Implementations

type Implementations struct {
	Iterations           []Iteration
	OutputCondition      OutputCondition
	OutputFunction       OutputFunction
	TerminationCondition TerminationCondition
	TimestepFunction     TimestepFunction
}

Implementations provides concrete implementations for a simulation run.

type Iteration

type Iteration interface {
	Configure(partitionIndex int, settings *Settings)
	Iterate(
		params *Params,
		partitionIndex int,
		stateHistories []*StateHistory,
		timestepsHistory *CumulativeTimestepsHistory,
	) []float64
}

Iteration defines the interface for per-partition state update functions in stochadex simulations.

The Iteration interface is the fundamental building block for defining how simulation state evolves over time. Each partition in a simulation uses an Iteration to compute its next state values based on the current state, parameters, and time information.

Design Philosophy: The Iteration interface emphasizes modularity and composability. By providing a simple, well-defined interface, it enables the creation of complex simulations through the combination of simple, focused iterations. This design supports both built-in iteration types and custom user-defined iterations.

Interface Methods:

  • Configure: Initialize the iteration with simulation settings (called once)
  • Iterate: Compute the next state values (called each simulation step)

Configuration Phase: Configure is called once per partition during simulation setup. It receives:

  • partitionIndex: The index of this partition in the simulation
  • settings: Global simulation settings and configuration

This phase is used for:

  • Initializing random number generators
  • Setting up internal data structures
  • Configuring iteration-specific parameters
  • Validating configuration parameters

Iteration Phase: Iterate is called each simulation step to compute the next state values. It receives:

  • params: Current simulation parameters for this partition
  • partitionIndex: The index of this partition
  • stateHistories: State histories for all partitions (for cross-partition access)
  • timestepsHistory: Time and timestep information

It must return:

  • []float64: The next state values for this partition

Implementation Requirements:

  • Configure must be called before Iterate
  • Iterate must return a slice of the correct length (matching state width)
  • Iterate should not modify the input parameters or state histories
  • Iterate should be deterministic given the same inputs and initial seed (for reproducible simulations)

Example Usage:

type MyIteration struct {
    // Internal state
}

func (m *MyIteration) Configure(partitionIndex int, settings *Settings) {
    // Initialize iteration
}

func (m *MyIteration) Iterate(params *Params, partitionIndex int,
                              stateHistories []*StateHistory,
                              timestepsHistory *CumulativeTimestepsHistory) []float64 {
    // Compute next state values
    return []float64{newValue1, newValue2, ...}
}

Common Iteration Types:

  • Stochastic processes: WienerProcessIteration, PoissonProcessIteration
  • Deterministic functions: ValuesFunctionIteration, ConstantValuesIteration
  • Aggregation functions: VectorMeanIteration, GroupedAggregationIteration
  • User-defined iterations: Custom implementations for specific needs

Performance Considerations:

  • Iterate is called frequently during simulation execution
  • Implementations should be optimized for performance
  • Avoid expensive computations or memory allocations in Iterate
  • Consider caching expensive computations in Configure

Thread Safety:

  • Iterate may be called concurrently from multiple goroutines
  • Implementations should be thread-safe or stateless

type IterationSettings

type IterationSettings struct {
	Name               string                    `yaml:"name"`
	Params             Params                    `yaml:"params"`
	ParamsFromUpstream map[string]UpstreamConfig `yaml:"params_from_upstream,omitempty"`
	InitStateValues    []float64                 `yaml:"init_state_values"`
	Seed               uint64                    `yaml:"seed"`
	StateWidth         int                       `yaml:"state_width"`
	StateHistoryDepth  int                       `yaml:"state_history_depth"`
}

IterationSettings is the YAML-loadable per-partition configuration.

Usage hints:

  • Name is used to address partitions in other configs and params maps.
  • ParamsFromUpstream forwards outputs from upstream partitions into Params.
  • StateWidth and StateHistoryDepth control the size and depth of state.

type IterationTestHarness

type IterationTestHarness struct {
	Iteration Iteration
	Err       error
	// contains filtered or unexported fields
}

IterationTestHarness wraps an iteration and performs checks on its behaviour while running.

func (*IterationTestHarness) Configure

func (h *IterationTestHarness) Configure(
	partitionIndex int,
	settings *Settings,
)

func (*IterationTestHarness) Iterate

func (h *IterationTestHarness) Iterate(
	params *Params,
	partitionIndex int,
	stateHistories []*StateHistory,
	timestepsHistory *CumulativeTimestepsHistory,
) []float64

type IteratorInputMessage

type IteratorInputMessage struct {
	StateHistories   []*StateHistory
	TimestepsHistory *CumulativeTimestepsHistory
}

IteratorInputMessage carries shared histories into iterator jobs.

type JsonLogChannelOutputFunction

type JsonLogChannelOutputFunction struct {
	// contains filtered or unexported fields
}

JsonLogChannelOutputFunction writes JSON log entries via a background goroutine using a channel for improved throughput.

func NewJsonLogChannelOutputFunction

func NewJsonLogChannelOutputFunction(
	filePath string,
) *JsonLogChannelOutputFunction

NewJsonLogChannelOutputFunction creates a JsonLogChannelOutputFunction. Call Close (defer it) to ensure flushing at the end of a run.

func (*JsonLogChannelOutputFunction) Close

func (j *JsonLogChannelOutputFunction) Close()

Close ensures that the log channel flushes at the end of a run.

func (*JsonLogChannelOutputFunction) Output

func (j *JsonLogChannelOutputFunction) Output(
	partitionName string,
	state []float64,
	cumulativeTimesteps float64,
)

type JsonLogEntry

type JsonLogEntry struct {
	PartitionName       string    `json:"partition_name"`
	State               []float64 `json:"state"`
	CumulativeTimesteps float64   `json:"time"`
}

JsonLogEntry is the serialised record format used by JSON log outputs.

type JsonLogOutputFunction

type JsonLogOutputFunction struct {
	// contains filtered or unexported fields
}

JsonLogOutputFunction writes newline-delimited JSON log entries.

func NewJsonLogOutputFunction

func NewJsonLogOutputFunction(
	filePath string,
) *JsonLogOutputFunction

NewJsonLogOutputFunction creates a new JsonLogOutputFunction.

func (*JsonLogOutputFunction) Output

func (j *JsonLogOutputFunction) Output(
	partitionName string,
	state []float64,
	cumulativeTimesteps float64,
)

type NamedPartitionIndex

type NamedPartitionIndex struct {
	Name  string
	Index int
}

NamedPartitionIndex pairs the name of a partition with the partition index assigned to it by the PartitionCoordinator.

type NamedUpstreamConfig

type NamedUpstreamConfig struct {
	Upstream string `yaml:"upstream"`
	Indices  []int  `yaml:"indices,omitempty"`
}

NamedUpstreamConfig is like UpstreamConfig but refers to upstream by name.

type NilOutputCondition

type NilOutputCondition struct{}

NilOutputCondition never outputs.

func (*NilOutputCondition) IsOutputStep

func (c *NilOutputCondition) IsOutputStep(
	partitionName string,
	state []float64,
	cumulativeTimesteps float64,
) bool

type NilOutputFunction

type NilOutputFunction struct{}

NilOutputFunction outputs nothing from the simulation.

func (*NilOutputFunction) Output

func (f *NilOutputFunction) Output(
	partitionName string,
	state []float64,
	cumulativeTimesteps float64,
)

type NumberOfStepsTerminationCondition

type NumberOfStepsTerminationCondition struct {
	MaxNumberOfSteps int
}

NumberOfStepsTerminationCondition terminates after MaxNumberOfSteps.

func (*NumberOfStepsTerminationCondition) Terminate

func (t *NumberOfStepsTerminationCondition) Terminate(
	stateHistories []*StateHistory,
	timestepsHistory *CumulativeTimestepsHistory,
) bool

type OnlyGivenPartitionsOutputCondition

type OnlyGivenPartitionsOutputCondition struct {
	Partitions map[string]bool
}

OnlyGivenPartitionsOutputCondition emits output only for listed partitions.

func (*OnlyGivenPartitionsOutputCondition) IsOutputStep

func (o *OnlyGivenPartitionsOutputCondition) IsOutputStep(
	partitionName string,
	state []float64,
	cumulativeTimesteps float64,
) bool

type OutputCondition

type OutputCondition interface {
	IsOutputStep(partitionName string, state []float64, cumulativeTimesteps float64) bool
}

OutputCondition decides whether an output should be emitted this step.

type OutputFunction

type OutputFunction interface {
	Output(partitionName string, state []float64, cumulativeTimesteps float64)
}

OutputFunction writes state/time to an output sink when the OutputCondition is met.

type Params

type Params struct {
	Map map[string][]float64 `yaml:",inline"`
	// contains filtered or unexported fields
}

Params stores per-partition parameter values.

Usage hints:

  • Use Get/GetIndex helpers to retrieve, Set/SetIndex to update.
  • SetPartitionName improves error messages for missing params.

func NewParams

func NewParams(params map[string][]float64) Params

NewParams constructs a Params instance.

func (*Params) Get

func (p *Params) Get(name string) []float64

Get returns parameter values or panics with a helpful message.

func (*Params) GetCopy

func (p *Params) GetCopy(name string) []float64

GetCopy returns a copy of parameter values or panics with a helpful message.

func (*Params) GetCopyOk

func (p *Params) GetCopyOk(name string) ([]float64, bool)

GetCopyOk returns a copy of parameter values if present along with a flag.

func (*Params) GetIndex

func (p *Params) GetIndex(name string, index int) float64

GetIndex returns a single parameter value or panics.

func (*Params) GetOk

func (p *Params) GetOk(name string) ([]float64, bool)

GetOk returns parameter values if present along with a boolean flag.

func (*Params) Set

func (p *Params) Set(name string, values []float64)

Set creates or updates parameter values by name.

func (*Params) SetIndex

func (p *Params) SetIndex(name string, index int, value float64)

SetIndex updates a single parameter value or panics on invalid index.

func (*Params) SetPartitionName

func (p *Params) SetPartitionName(name string)

SetPartitionName attaches the owning partition name for better errors.

type PartitionConfig

type PartitionConfig struct {
	Name               string                         `yaml:"name"`
	Iteration          Iteration                      `yaml:"-"`
	Params             Params                         `yaml:"params"`
	ParamsAsPartitions map[string][]string            `yaml:"params_as_partitions,omitempty"`
	ParamsFromUpstream map[string]NamedUpstreamConfig `yaml:"params_from_upstream,omitempty"`
	InitStateValues    []float64                      `yaml:"init_state_values"`
	StateHistoryDepth  int                            `yaml:"state_history_depth"`
	Seed               uint64                         `yaml:"seed"`
}

PartitionConfig defines a partition to add to a simulation.

Usage hints:

  • Iteration is not YAML-serialised; set it programmatically.
  • ParamsAsPartitions allows passing partition indices via their names.
  • ParamsFromUpstream forwards outputs from named upstream partitions.

func LoadPartitionConfigFromYaml

func LoadPartitionConfigFromYaml(path string) *PartitionConfig

LoadPartitionConfigFromYaml loads PartitionConfig from a YAML file path.

Usage hints:

  • Calls Init to populate missing defaults after unmarshalling.

func (*PartitionConfig) Init

func (p *PartitionConfig) Init()

Init ensures params maps are initialised; call after unmarshalling YAML.

type PartitionConfigOrdering

type PartitionConfigOrdering struct {
	Names        []string
	IndexByName  map[string]int
	ConfigByName map[string]*PartitionConfig
}

PartitionConfigOrdering maintains the ordering and lookup for partitions. Can be updated dynamically via Append.

func (*PartitionConfigOrdering) Append

func (p *PartitionConfigOrdering) Append(config *PartitionConfig)

Append inserts another partition into the ordering and updates lookups.

type PartitionCoordinator

type PartitionCoordinator struct {
	Iterators            []*StateIterator
	Shared               *IteratorInputMessage
	TimestepFunction     TimestepFunction
	TerminationCondition TerminationCondition
	// contains filtered or unexported fields
}

PartitionCoordinator orchestrates iteration work across partitions and applies state/time history updates in a coordinated manner.

The PartitionCoordinator is the central component that manages the execution of all partitions in a simulation. It coordinates the timing, communication, and state updates across all partitions, ensuring proper synchronization and maintaining simulation consistency.

Architecture: The coordinator uses a two-phase execution model:

  1. Iteration Phase: All partitions compute their next state values
  2. Update Phase: State and time histories are updated with new values

This design ensures that all partitions see consistent state information during each iteration, preventing race conditions and maintaining simulation determinism.

Concurrency Model:

  • Each partition runs in its own goroutine for parallel execution
  • Channels are used for inter-partition communication
  • WaitGroups ensure proper synchronization between phases
  • Shared state is protected by the coordinator's control flow

Execution Flow:

  1. Compute next timestep increment using TimestepFunction
  2. Request iterations from all partitions (parallel execution)
  3. Wait for all iterations to complete
  4. Update state and time histories (parallel execution)
  5. Check termination condition
  6. Repeat until termination

Fields:

  • Iterators: List of StateIterators, one per partition
  • Shared: Shared state and time information accessible to all partitions
  • TimestepFunction: Function that determines the next timestep increment
  • TerminationCondition: Condition that determines when to stop the simulation
  • newWorkChannels: Communication channels for coordinating partition work

Example Usage:

coordinator := NewPartitionCoordinator(settings, implementations)

// Run simulation until termination
coordinator.Run()

// Or step-by-step control
for !coordinator.ReadyToTerminate() {
    var wg sync.WaitGroup
    coordinator.Step(&wg)
}

Performance:

  • O(p) time complexity where p is the number of partitions
  • Parallel execution of partition iterations
  • Efficient channel-based communication
  • Memory usage scales with partition count and state size

Thread Safety:

  • Safe for concurrent access to coordinator methods
  • Internal synchronization ensures consistent state updates
  • Partition communication is thread-safe through channels

func NewPartitionCoordinator

func NewPartitionCoordinator(
	settings *Settings,
	implementations *Implementations,
) *PartitionCoordinator

NewPartitionCoordinator wires Settings and Implementations into a runnable coordinator with initial state/time histories and channels.

func (*PartitionCoordinator) ReadyToTerminate

func (c *PartitionCoordinator) ReadyToTerminate() bool

ReadyToTerminate returns whether the TerminationCondition is met.

func (*PartitionCoordinator) RequestMoreIterations

func (c *PartitionCoordinator) RequestMoreIterations(wg *sync.WaitGroup)

RequestMoreIterations spawns a goroutine per partition to run ReceiveAndIteratePending.

func (*PartitionCoordinator) Run

func (c *PartitionCoordinator) Run()

Run advances by repeatedly calling Step until termination.

func (*PartitionCoordinator) Step

func (c *PartitionCoordinator) Step(wg *sync.WaitGroup)

Step performs one simulation tick: compute dt, request iterations, then apply state/time updates.

func (*PartitionCoordinator) UpdateHistory

func (c *PartitionCoordinator) UpdateHistory(wg *sync.WaitGroup)

UpdateHistory spawns a goroutine per partition to run UpdateHistory and shifts time history forward, adding NextIncrement to t[0].

type PartitionState

type PartitionState struct {
	CumulativeTimesteps float64   `protobuf:"fixed64,1,opt,name=cumulative_timesteps,json=cumulativeTimesteps,proto3" json:"cumulative_timesteps,omitempty"`
	PartitionName       string    `protobuf:"bytes,2,opt,name=partition_name,json=partitionName,proto3" json:"partition_name,omitempty"`
	State               []float64 `protobuf:"fixed64,3,rep,packed,name=state,proto3" json:"state,omitempty"`
	// contains filtered or unexported fields
}

func (*PartitionState) Descriptor deprecated

func (*PartitionState) Descriptor() ([]byte, []int)

Deprecated: Use PartitionState.ProtoReflect.Descriptor instead.

func (*PartitionState) GetCumulativeTimesteps

func (x *PartitionState) GetCumulativeTimesteps() float64

func (*PartitionState) GetPartitionName

func (x *PartitionState) GetPartitionName() string

func (*PartitionState) GetState

func (x *PartitionState) GetState() []float64

func (*PartitionState) ProtoMessage

func (*PartitionState) ProtoMessage()

func (*PartitionState) ProtoReflect

func (x *PartitionState) ProtoReflect() protoreflect.Message

func (*PartitionState) Reset

func (x *PartitionState) Reset()

func (*PartitionState) String

func (x *PartitionState) String() string

type Settings

type Settings struct {
	Iterations            []IterationSettings `yaml:"iterations"`
	InitTimeValue         float64             `yaml:"init_time_value"`
	TimestepsHistoryDepth int                 `yaml:"timesteps_history_depth"`
}

Settings is the YAML-loadable top-level simulation configuration.

func LoadSettingsFromYaml

func LoadSettingsFromYaml(path string) *Settings

LoadSettingsFromYaml loads Settings from a YAML file path.

Usage hints:

  • Calls Init to populate missing defaults after unmarshalling.

func (*Settings) Init

func (s *Settings) Init()

Init fills in defaults and ensures maps are initialised. Call immediately after unmarshalling from YAML.

type SimulationConfig

type SimulationConfig struct {
	OutputCondition      OutputCondition
	OutputFunction       OutputFunction
	TerminationCondition TerminationCondition
	TimestepFunction     TimestepFunction
	InitTimeValue        float64
}

SimulationConfig defines additional run-level configuration.

type SimulationConfigStrings

type SimulationConfigStrings struct {
	OutputCondition      string  `yaml:"output_condition"`
	OutputFunction       string  `yaml:"output_function"`
	TerminationCondition string  `yaml:"termination_condition"`
	TimestepFunction     string  `yaml:"timestep_function"`
	InitTimeValue        float64 `yaml:"init_time_value"`
}

SimulationConfigStrings is the YAML-loadable version of SimulationConfig, referring to implementations by type names for templating.

func LoadSimulationConfigStringsFromYaml

func LoadSimulationConfigStringsFromYaml(path string) *SimulationConfigStrings

LoadSimulationConfigStringsFromYaml loads SimulationConfigStrings from YAML.

type StateHistory

type StateHistory struct {
	// each row is a different state in the history, by convention,
	// starting with the most recent at index = 0
	Values *mat.Dense
	// should be of length = StateWidth
	NextValues        []float64
	StateWidth        int
	StateHistoryDepth int
}

StateHistory is a rolling window of state vectors.

Usage hints:

  • Values holds rows of state (row 0 is most recent by convention).
  • Use GetNextStateRowToUpdate when updating in multi-row histories.

func (*StateHistory) CopyStateRow

func (s *StateHistory) CopyStateRow(index int) []float64

CopyStateRow copies a row from the state history given the index.

func (*StateHistory) GetNextStateRowToUpdate

func (s *StateHistory) GetNextStateRowToUpdate() []float64

GetNextStateRowToUpdate determines whether or not it is necessary to copy the previous row or simply expose it based on whether a history longer than 1 is needed.

type StateIterator

type StateIterator struct {
	Iteration       Iteration
	Params          Params
	Partition       NamedPartitionIndex
	ValueChannels   StateValueChannels
	OutputCondition OutputCondition
	OutputFunction  OutputFunction
}

StateIterator runs an Iteration for a partition on a goroutine and manages reads/writes to history and output.

func NewStateIterator

func NewStateIterator(
	iteration Iteration,
	params Params,
	partitionName string,
	partitionIndex int,
	valueChannels StateValueChannels,
	outputCondition OutputCondition,
	outputFunction OutputFunction,
	initState []float64,
	initTime float64,
) *StateIterator

NewStateIterator creates a StateIterator and may emit initial output if the condition is met by the initial state/time.

func (*StateIterator) Iterate

func (s *StateIterator) Iterate(
	stateHistories []*StateHistory,
	timestepsHistory *CumulativeTimestepsHistory,
) []float64

Iterate runs the Iteration and optionally triggers output if the condition is met for the new state/time.

func (*StateIterator) ReceiveAndIteratePending

func (s *StateIterator) ReceiveAndIteratePending(
	inputChannel <-chan *IteratorInputMessage,
)

ReceiveAndIteratePending listens for an IteratorInputMessage, updates upstream-driven params, runs Iterate, and stores a pending state update.

func (*StateIterator) UpdateHistory

func (s *StateIterator) UpdateHistory(inputChannel <-chan *IteratorInputMessage)

UpdateHistory applies the pending state update to the partition history.

type StateTimeStorage

type StateTimeStorage struct {
	// contains filtered or unexported fields
}

StateTimeStorage provides thread-safe storage for simulation time series data with minimal contention and efficient access patterns.

StateTimeStorage is designed to handle concurrent access from multiple simulation partitions while maintaining data consistency and performance. It uses a mutex-protected design optimized for the common case of appending new data points during simulation execution.

Data Organization:

  • Time series are organized by partition name
  • Each partition can have multiple state dimensions
  • Time axis is shared across all partitions
  • Data is stored in row-major format for efficient access

Thread Safety:

  • ConcurrentAppend is safe for concurrent use from multiple goroutines
  • GetValues/GetTimes are safe for concurrent reads
  • SetValues/SetTimes should not be called concurrently with appends
  • Internal mutex protects against race conditions

Performance Characteristics:

  • O(1) lookup by partition name using hash map
  • O(1) append operations with minimal locking
  • Memory usage: O(total_samples * state_dimensions)
  • Efficient for high-frequency data collection

Usage Patterns:

  • Real-time data collection during simulation runs
  • Batch data loading from external sources
  • Result storage for post-simulation analysis
  • Intermediate storage for multi-stage simulations

Example Usage:

storage := NewStateTimeStorage()

// Concurrent appends from multiple partitions
go func() {
    storage.ConcurrentAppend("prices", 1.0, []float64{100.0, 101.0})
}()
go func() {
    storage.ConcurrentAppend("volumes", 1.0, []float64{1000.0})
}()

// Retrieve data after simulation
priceData := storage.GetValues("prices")
timeData := storage.GetTimes()

Memory Management:

  • Automatic memory allocation for new partitions
  • Efficient storage of sparse time series
  • No automatic cleanup (caller responsible for memory management)

Error Handling:

  • GetValues panics if partition name is not found
  • Provides helpful error messages with available partition names
  • ConcurrentAppend handles time deduplication automatically

func NewStateTimeStorage

func NewStateTimeStorage() *StateTimeStorage

NewStateTimeStorage constructs a new StateTimeStorage.

func (*StateTimeStorage) ConcurrentAppend

func (s *StateTimeStorage) ConcurrentAppend(
	name string,
	time float64,
	values []float64,
)

ConcurrentAppend appends values for name and updates the time axis at most once per unique timestamp. Safe for concurrent use.

func (*StateTimeStorage) GetIndex

func (s *StateTimeStorage) GetIndex(name string) int

GetIndex returns or creates the index for a name.

func (*StateTimeStorage) GetNames

func (s *StateTimeStorage) GetNames() []string

GetNames returns all partition names present in the store.

func (*StateTimeStorage) GetTimes

func (s *StateTimeStorage) GetTimes() []float64

GetTimes returns the time axis.

func (*StateTimeStorage) GetValues

func (s *StateTimeStorage) GetValues(name string) [][]float64

GetValues returns all time series for name, panicking if absent.

func (*StateTimeStorage) SetTimes

func (s *StateTimeStorage) SetTimes(times []float64)

SetTimes replaces the time axis.

func (*StateTimeStorage) SetValues

func (s *StateTimeStorage) SetValues(name string, values [][]float64)

SetValues replaces the entire series for name.

type StateTimeStorageOutputFunction

type StateTimeStorageOutputFunction struct {
	Store *StateTimeStorage
}

StateTimeStorageOutputFunction stores output into StateTimeStorage when the condition is met.

func (*StateTimeStorageOutputFunction) Output

func (f *StateTimeStorageOutputFunction) Output(
	partitionName string,
	state []float64,
	cumulativeTimesteps float64,
)

type StateValueChannels

type StateValueChannels struct {
	Upstreams  map[string]*UpstreamStateValues
	Downstream *DownstreamStateValues
}

StateValueChannels provides upstream/downstream channels for inter-iterator communication.

func (*StateValueChannels) BroadcastDownstream

func (s *StateValueChannels) BroadcastDownstream(stateValues []float64)

BroadcastDownstream sends state values to all configured downstream copies.

func (*StateValueChannels) UpdateUpstreamParams

func (s *StateValueChannels) UpdateUpstreamParams(params *Params)

UpdateUpstreamParams updates Params with values received from upstream channels.

type StdoutOutputFunction

type StdoutOutputFunction struct{}

StdoutOutputFunction outputs the state to the terminal.

func (*StdoutOutputFunction) Output

func (s *StdoutOutputFunction) Output(
	partitionName string,
	state []float64,
	cumulativeTimesteps float64,
)

type TerminationCondition

type TerminationCondition interface {
	Terminate(
		stateHistories []*StateHistory,
		timestepsHistory *CumulativeTimestepsHistory,
	) bool
}

TerminationCondition decides when the simulation should end.

type TimeElapsedTerminationCondition

type TimeElapsedTerminationCondition struct {
	MaxTimeElapsed float64
}

TimeElapsedTerminationCondition terminates after MaxTimeElapsed.

func (*TimeElapsedTerminationCondition) Terminate

func (t *TimeElapsedTerminationCondition) Terminate(
	stateHistories []*StateHistory,
	timestepsHistory *CumulativeTimestepsHistory,
) bool

type TimestepFunction

type TimestepFunction interface {
	NextIncrement(
		timestepsHistory *CumulativeTimestepsHistory,
	) float64
}

TimestepFunction computes the next time increment.

type UpstreamConfig

type UpstreamConfig struct {
	Upstream int   `yaml:"upstream"`
	Indices  []int `yaml:"indices,omitempty"`
}

UpstreamConfig is the YAML-loadable representation of a slice of data from the output of a partition which is computationally upstream.

type UpstreamStateValues

type UpstreamStateValues struct {
	Channel chan []float64
	Indices []int
}

UpstreamStateValues contains information to receive state values from an upstream iterator via channel.

type WebsocketOutputFunction

type WebsocketOutputFunction struct {
	// contains filtered or unexported fields
}

WebsocketOutputFunction serialises and sends outputs via a websocket connection when the condition is met.

func NewWebsocketOutputFunction

func NewWebsocketOutputFunction(
	connection *websocket.Conn,
	mutex *sync.Mutex,
) *WebsocketOutputFunction

NewWebsocketOutputFunction constructs a WebsocketOutputFunction with a connection and a mutex for safe concurrent writes.

func (*WebsocketOutputFunction) Output

func (w *WebsocketOutputFunction) Output(
	partitionName string,
	state []float64,
	cumulativeTimesteps float64,
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL