Documentation
Index ¶
- Variables
- func NewExecutor(objectDownloader object.Downloader[object.GlobalReference], ...) ...
- type Computer
- type ComputerFactory
- type ComputerForTesting
- type Environment
- type Evaluation
- type KeyState
- type NestedError
- type ObjectManagerForTesting
- type RecursiveComputer
- func (rc *RecursiveComputer[TReference, TMetadata]) GetAllEvaluations() iter.Seq[Evaluation[TReference]]
- func (rc *RecursiveComputer[TReference, TMetadata]) GetOrCreateKeyState(key model_core.TopLevelMessage[proto.Message, TReference]) (*KeyState[TReference, TMetadata], error)
- func (rc *RecursiveComputer[TReference, TMetadata]) GetProgress() (model_core.PatchedMessage[*model_evaluation_pb.Progress, TMetadata], error)
- func (rc *RecursiveComputer[TReference, TMetadata]) InjectKeyState(key model_core.TopLevelMessage[proto.Message, TReference], ...) error
- func (rc *RecursiveComputer[TReference, TMetadata]) ProcessNextQueuedKey(ctx context.Context, rcq *RecursiveComputerQueue[TReference, TMetadata]) bool
- func (rc *RecursiveComputer[TReference, TMetadata]) WaitForMessageValue(ctx context.Context, ks *KeyState[TReference, TMetadata]) (model_core.Message[proto.Message, TReference], error)
- type RecursiveComputerQueue
- type RecursiveComputerQueuePicker
- type RecursiveComputerQueues
- type RecursiveComputerQueuesFactory
Constants ¶
This section is empty.
Variables ¶
var ErrMissingDependency = errors.New("missing dependency")
ErrMissingDependency is an error that can be returned by Computer.Compute*Value() to indicate that a value could not be computed because the value of a dependency is missing. Retrying the computation after the values of the missing dependencies have been computed will allow further progress.
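The retry behavior described above can be illustrated with a minimal, self-contained sketch. It does not use this package: `errMissingDependency`, `store`, `computeSum`, and `evaluate` are hypothetical names introduced only for illustration, and dependencies are modeled as plain map entries.

```go
package main

import (
	"errors"
	"fmt"
)

// errMissingDependency is a local stand-in for ErrMissingDependency.
var errMissingDependency = errors.New("missing dependency")

// store holds values that have already been computed (hypothetical).
type store map[string]int

// computeSum is a hypothetical computation for the key "sum" that
// depends on the values of the keys "a" and "b".
func computeSum(s store) (int, error) {
	a, okA := s["a"]
	b, okB := s["b"]
	if !okA || !okB {
		// Wrapping with %w keeps the sentinel detectable via errors.Is.
		return 0, fmt.Errorf("computing sum: %w", errMissingDependency)
	}
	return a + b, nil
}

// evaluate retries computeSum until it no longer reports a missing
// dependency, and returns the value and the number of attempts.
func evaluate(s store) (value int, attempts int, err error) {
	for {
		attempts++
		value, err = computeSum(s)
		if !errors.Is(err, errMissingDependency) {
			return value, attempts, err
		}
		// Supply the missing dependencies before retrying.
		s["a"], s["b"] = 1, 2
	}
}

func main() {
	value, attempts, err := evaluate(store{})
	fmt.Println(value, attempts, err) // prints "3 2 <nil>"
}
```

The first attempt fails because both dependencies are absent; the second succeeds once they have been supplied.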
Functions ¶
func NewExecutor ¶
func NewExecutor(
	objectDownloader object.Downloader[object.GlobalReference],
	computerFactory ComputerFactory[buffered.Reference, *model_core.LeakCheckingReferenceMetadata[buffered.ReferenceMetadata]],
	queuesFactory RecursiveComputerQueuesFactory[buffered.Reference, buffered.ReferenceMetadata],
	parsedObjectPool *model_parser.ParsedObjectPool,
	dagUploaderClient dag_pb.UploaderClient,
	objectContentsWalkerSemaphore *semaphore.Weighted,
	clock clock.Clock,
) remoteworker.Executor[*model_executewithstorage.Action[object.GlobalReference], model_core.Decodable[object.LocalReference], model_core.Decodable[object.LocalReference]]
Types ¶
type Computer ¶
type Computer[TReference any, TMetadata model_core.ReferenceMetadata] interface {
	ComputeMessageValue(ctx context.Context, key model_core.Message[proto.Message, TReference], e Environment[TReference, TMetadata]) (model_core.PatchedMessage[proto.Message, TMetadata], error)
	ComputeNativeValue(ctx context.Context, key model_core.Message[proto.Message, TReference], e Environment[TReference, TMetadata]) (any, error)
}
Computer computes the values belonging to keys. Keys are always provided in the form of a Protobuf message. The resulting values can either be Protobuf messages or values of native Go types.
func NewLeakCheckingComputer ¶
func NewLeakCheckingComputer[TReference any, TMetadata model_core.ReferenceMetadata](base Computer[TReference, *model_core.LeakCheckingReferenceMetadata[TMetadata]]) Computer[TReference, TMetadata]
NewLeakCheckingComputer creates a decorator for Computer that keeps track of all ReferenceMetadata objects that a call to Compute*Value() creates. When any computation causes ReferenceMetadata objects to leak, an error is reported.
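As a rough illustration of the decorator idea, the sketch below counts objects that a computation creates and reports an error if any remain unreleased when it returns. It is not the package's implementation: `tracker`, `metadata`, `computeFunc`, and `withLeakCheck` are hypothetical, and the real decorator may track ReferenceMetadata objects quite differently.

```go
package main

import "fmt"

// tracker counts metadata objects that were created but not yet released.
type tracker struct{ live int }

// metadata is a hypothetical stand-in for a ReferenceMetadata object.
type metadata struct {
	t        *tracker
	released bool
}

// create hands out a new metadata object and counts it as live.
func (t *tracker) create() *metadata {
	t.live++
	return &metadata{t: t}
}

// release marks the object as no longer in use. Calling it twice is harmless.
func (m *metadata) release() {
	if !m.released {
		m.released = true
		m.t.live--
	}
}

// computeFunc is a simplified stand-in for Computer.Compute*Value().
type computeFunc func(t *tracker) (int, error)

// withLeakCheck decorates base so that objects that are still live
// after the computation returns are reported as an error.
func withLeakCheck(base computeFunc) func() (int, error) {
	return func() (int, error) {
		t := &tracker{}
		value, err := base(t)
		if err != nil {
			return 0, err
		}
		if t.live != 0 {
			return 0, fmt.Errorf("computation leaked %d metadata object(s)", t.live)
		}
		return value, nil
	}
}

// clean releases everything it creates.
func clean(t *tracker) (int, error) {
	m := t.create()
	defer m.release()
	return 42, nil
}

// leaky creates an object and never releases it.
func leaky(t *tracker) (int, error) {
	t.create()
	return 42, nil
}

func main() {
	fmt.Println(withLeakCheck(clean)())
	fmt.Println(withLeakCheck(leaky)())
}
```

Wrapping the computation, rather than changing it, keeps the check optional: callers that do not need it can use the base computation directly.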
type ComputerFactory ¶
type ComputerFactory[TReference any, TMetadata model_core.ReferenceMetadata] interface {
	NewComputer(
		namespace object.Namespace,
		parsedObjectPoolIngester *model_parser.ParsedObjectPoolIngester[TReference],
		objectExporter model_core.ObjectExporter[TReference, object.LocalReference],
	) Computer[TReference, TMetadata]
}
type ComputerForTesting ¶
type ComputerForTesting Computer[object.LocalReference, model_core.ReferenceMetadata]
type Environment ¶
type Environment[TReference any, TMetadata model_core.ReferenceMetadata] interface {
	model_core.ObjectManager[TReference, TMetadata]

	// Methods that implementations of Computer can invoke to get
	// access to the value of another key.
	GetMessageValue(key model_core.PatchedMessage[proto.Message, TMetadata]) model_core.Message[proto.Message, TReference]
	GetNativeValue(key model_core.PatchedMessage[proto.Message, TMetadata]) (any, bool)
}
Environment that is provided to Computer.Compute*Value() to obtain access to values of other keys, and to attach Merkle tree nodes to computed keys and values.
type Evaluation ¶
type Evaluation[TReference any] struct {
	Key          model_core.TopLevelMessage[proto.Message, TReference]
	Value        model_core.Message[proto.Message, TReference]
	Dependencies []model_core.TopLevelMessage[proto.Message, TReference]
}
type KeyState ¶
type KeyState[TReference object.BasicReference, TMetadata model_core.ReferenceMetadata] struct {
	// contains filtered or unexported fields
}
KeyState contains all of the evaluation state of RecursiveComputer for a given key. If evaluation has not yet completed, it stores the list of keys that are currently blocked on its completion (i.e., its reverse dependencies). Upon completion, it stores the value associated with the key or any error that occurred computing it.
type NestedError ¶
type NestedError[TReference object.BasicReference] struct {
	Key model_core.TopLevelMessage[proto.Message, TReference]
	Err error
}
NestedError is used to wrap errors that occurred while evaluating a dependency of a given key. The key of the dependency is included, meaning that repeated unwrapping can be used to obtain a stack trace.
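The repeated unwrapping described above can be sketched as follows. This is a simplified illustration, not the package's code: `nestedError` uses a string key instead of a Protobuf message, its `Unwrap` method is an assumption of this sketch, and `keyTrace` and `errNotFound` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical root cause used by the example.
var errNotFound = errors.New("file not found")

// nestedError is a simplified stand-in for NestedError with a string key.
type nestedError struct {
	Key string
	Err error
}

func (e nestedError) Error() string { return fmt.Sprintf("%s: %s", e.Key, e.Err) }

// Unwrap is an assumption of this sketch; it lets errors.Is and
// errors.As look through the wrapper.
func (e nestedError) Unwrap() error { return e.Err }

// keyTrace collects the keys of all nested errors, outermost first,
// and returns them together with the innermost (root cause) error.
func keyTrace(err error) ([]string, error) {
	var keys []string
	for {
		var ne nestedError
		if !errors.As(err, &ne) {
			return keys, err
		}
		keys = append(keys, ne.Key)
		err = ne.Err
	}
}

func main() {
	err := nestedError{Key: "target", Err: nestedError{Key: "package", Err: errNotFound}}
	keys, cause := keyTrace(err)
	fmt.Println(keys, cause) // prints "[target package] file not found"
}
```

Each level of nesting contributes one key, so the resulting slice reads like a stack trace from the requested key down to the dependency that failed.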
func (NestedError[TReference]) Error ¶
func (e NestedError[TReference]) Error() string
type ObjectManagerForTesting ¶
type ObjectManagerForTesting = model_core.ObjectManager[object.LocalReference, model_core.ReferenceMetadata]
type RecursiveComputer ¶
type RecursiveComputer[TReference object.BasicReference, TMetadata model_core.ReferenceMetadata] struct {
	// contains filtered or unexported fields
}
RecursiveComputer can be used to compute values, taking dependencies between keys into account.
Whenever the computation function requests the value for a key that has not been computed before, the key of the dependency is placed in a queue. Once the values of all previously missing dependencies are available, computation of the original key is restarted. This process repeats itself until all requested keys are exhausted.
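The restart-on-missing-dependency scheme can be sketched in a single-threaded form. This is an illustration under simplifying assumptions, not the actual implementation: keys are strings, values are integers, there is a single queue and no concurrency, and `computeFunc`, `keyState`, `evaluate`, and `sumOfDependencies` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errMissingDependency is a local stand-in for ErrMissingDependency.
var errMissingDependency = errors.New("missing dependency")

// computeFunc computes the value of a key. It calls get to request the
// values of other keys; get reports false if a value is not available yet.
type computeFunc func(key string, get func(dep string) (int, bool)) (int, error)

// keyState is a simplified stand-in for KeyState.
type keyState struct {
	pending int      // Missing dependencies that are still outstanding.
	waiters []string // Keys blocked on this key (reverse dependencies).
	done    bool
	value   int
}

// evaluate computes the value of root, queueing missing dependencies
// and restarting blocked keys once their dependencies are available.
func evaluate(root string, compute computeFunc) (int, error) {
	states := map[string]*keyState{root: &keyState{}}
	queue := []string{root}
	for len(queue) > 0 {
		key := queue[0]
		queue = queue[1:]
		ks := states[key]

		// Record every dependency that is requested but not yet computed.
		var missing []string
		seen := map[string]bool{}
		get := func(dep string) (int, bool) {
			if ds, ok := states[dep]; ok && ds.done {
				return ds.value, true
			}
			if !seen[dep] {
				seen[dep] = true
				missing = append(missing, dep)
			}
			return 0, false
		}

		value, err := compute(key, get)
		if errors.Is(err, errMissingDependency) && len(missing) > 0 {
			for _, dep := range missing {
				ds, ok := states[dep]
				if !ok {
					ds = &keyState{}
					states[dep] = ds
					queue = append(queue, dep)
				}
				ds.waiters = append(ds.waiters, key)
				ks.pending++
			}
			continue
		}
		if err != nil {
			return 0, fmt.Errorf("computing %q: %w", key, err)
		}

		// The key is done: requeue waiters that have nothing left to wait for.
		ks.done, ks.value = true, value
		for _, waiter := range ks.waiters {
			ws := states[waiter]
			ws.pending--
			if ws.pending == 0 {
				queue = append(queue, waiter)
			}
		}
		ks.waiters = nil
	}
	if !states[root].done {
		return 0, errors.New("root was never computed; dependencies may be cyclic")
	}
	return states[root].value, nil
}

// sumOfDependencies is a hypothetical computation: the value of a key
// is 1 plus the values of its dependencies.
func sumOfDependencies(key string, get func(string) (int, bool)) (int, error) {
	deps := map[string][]string{"total": {"a", "b"}, "a": {"c"}}
	sum, missing := 1, false
	for _, dep := range deps[key] {
		v, ok := get(dep)
		if !ok {
			missing = true
		}
		sum += v
	}
	if missing {
		return 0, errMissingDependency
	}
	return sum, nil
}

func main() {
	value, err := evaluate("total", sumOfDependencies)
	fmt.Println(value, err) // prints "4 <nil>"
}
```

Note that a restarted key is computed from scratch, so in this sketch the computation function must be safe to call repeatedly for the same key.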
func NewRecursiveComputer ¶
func NewRecursiveComputer[TReference object.BasicReference, TMetadata model_core.ReferenceMetadata](
	base Computer[TReference, TMetadata],
	queuePicker RecursiveComputerQueuePicker[TReference, TMetadata],
	objectManager model_core.ObjectManager[TReference, TMetadata],
	clock clock.Clock,
) *RecursiveComputer[TReference, TMetadata]
NewRecursiveComputer creates a new RecursiveComputer that is in the initial state (i.e., having no queued or evaluated keys).
func (*RecursiveComputer[TReference, TMetadata]) GetAllEvaluations ¶
func (rc *RecursiveComputer[TReference, TMetadata]) GetAllEvaluations() iter.Seq[Evaluation[TReference]]
func (*RecursiveComputer[TReference, TMetadata]) GetOrCreateKeyState ¶
func (rc *RecursiveComputer[TReference, TMetadata]) GetOrCreateKeyState(key model_core.TopLevelMessage[proto.Message, TReference]) (*KeyState[TReference, TMetadata], error)
func (*RecursiveComputer[TReference, TMetadata]) GetProgress ¶
func (rc *RecursiveComputer[TReference, TMetadata]) GetProgress() (model_core.PatchedMessage[*model_evaluation_pb.Progress, TMetadata], error)
func (*RecursiveComputer[TReference, TMetadata]) InjectKeyState ¶
func (rc *RecursiveComputer[TReference, TMetadata]) InjectKeyState(key model_core.TopLevelMessage[proto.Message, TReference], value model_core.Message[proto.Message, TReference]) error
func (*RecursiveComputer[TReference, TMetadata]) ProcessNextQueuedKey ¶
func (rc *RecursiveComputer[TReference, TMetadata]) ProcessNextQueuedKey(ctx context.Context, rcq *RecursiveComputerQueue[TReference, TMetadata]) bool
func (*RecursiveComputer[TReference, TMetadata]) WaitForMessageValue ¶
func (rc *RecursiveComputer[TReference, TMetadata]) WaitForMessageValue(ctx context.Context, ks *KeyState[TReference, TMetadata]) (model_core.Message[proto.Message, TReference], error)
type RecursiveComputerQueue ¶
type RecursiveComputerQueue[TReference object.BasicReference, TMetadata model_core.ReferenceMetadata] struct {
	// contains filtered or unexported fields
}
RecursiveComputerQueue represents a queue of evaluation keys that are currently not blocked and are ready to be evaluated.
Instances of RecursiveComputer can make use of multiple queues. This can be used to enforce that different types of keys are evaluated with different amounts of concurrency. For example, keys that are CPU intensive to evaluate can be executed with a concurrency proportional to the number of locally available CPU cores, while keys that perform long-running network requests can use a higher amount of concurrency.
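The idea of routing keys to queues with different concurrency limits can be sketched with plain channels and goroutines. Everything in this example is hypothetical and independent of the package: `queue`, `queues`, `newQueues`, `pickQueue`, and `processAll` are illustrative names, routing on a `fetch:` key prefix is an arbitrary choice, and the worker count of 64 is a placeholder.

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
	"sync"
)

// queue is a simplified stand-in for RecursiveComputerQueue: a channel
// of ready keys that is drained by a fixed number of workers.
type queue struct {
	keys    chan string
	workers int
}

// queues groups one queue for CPU-bound keys and one for network-bound keys.
type queues struct {
	cpu, network *queue
}

func newQueues() *queues {
	return &queues{
		// CPU-bound work: concurrency proportional to the number of cores.
		cpu: &queue{keys: make(chan string), workers: runtime.NumCPU()},
		// Network-bound work: mostly waiting, so allow more concurrency.
		network: &queue{keys: make(chan string), workers: 64},
	}
}

// pickQueue plays the role of a queue picker. Routing on the "fetch:"
// prefix is an assumption made for this example.
func (q *queues) pickQueue(key string) *queue {
	if strings.HasPrefix(key, "fetch:") {
		return q.network
	}
	return q.cpu
}

// processAll starts the workers of both queues, dispatches every key
// to the queue chosen by pickQueue, and waits for all keys to finish.
func (q *queues) processAll(keys []string, evaluate func(key string)) {
	var wg sync.WaitGroup
	for _, qu := range []*queue{q.cpu, q.network} {
		for i := 0; i < qu.workers; i++ {
			wg.Add(1)
			go func(qu *queue) {
				defer wg.Done()
				for key := range qu.keys {
					evaluate(key)
				}
			}(qu)
		}
	}
	for _, key := range keys {
		q.pickQueue(key).keys <- key
	}
	close(q.cpu.keys)
	close(q.network.keys)
	wg.Wait()
}

func main() {
	var mu sync.Mutex
	var evaluated []string
	keys := []string{"compile:a.c", "fetch:b.tar", "compile:c.c"}
	newQueues().processAll(keys, func(key string) {
		mu.Lock()
		defer mu.Unlock()
		evaluated = append(evaluated, key)
	})
	fmt.Println(len(evaluated)) // prints "3"
}
```

Because each queue has its own pool of workers, slow network-bound keys cannot occupy the workers reserved for CPU-bound keys.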
func NewRecursiveComputerQueue ¶
func NewRecursiveComputerQueue[TReference object.BasicReference, TMetadata model_core.ReferenceMetadata]() *RecursiveComputerQueue[TReference, TMetadata]
NewRecursiveComputerQueue creates a new RecursiveComputerQueue that does not have any queued keys.
type RecursiveComputerQueuePicker ¶
type RecursiveComputerQueuePicker[TReference object.BasicReference, TMetadata model_core.ReferenceMetadata] interface {
	PickQueue(model_core.Message[proto.Message, TReference]) *RecursiveComputerQueue[TReference, TMetadata]
}
RecursiveComputerQueuePicker is used by RecursiveComputer to pick a RecursiveComputerQueue to which a given key should be assigned.
type RecursiveComputerQueues ¶
type RecursiveComputerQueues[TReference object.BasicReference, TMetadata model_core.ReferenceMetadata] interface {
	RecursiveComputerQueuePicker[TReference, TMetadata]
	ProcessAllQueuedKeys(group program.Group, computer *RecursiveComputer[TReference, TMetadata])
}
RecursiveComputerQueues represents a set of queues that RecursiveComputer may use to schedule the evaluation of keys.
Simple implementations may place all keys in a single queue, but this has the disadvantage that all work is subject to the same concurrency limit. This may be sufficient if all work is CPU intensive, but may lead to low utilization if some work calls into remote services and may block for long periods of time.
type RecursiveComputerQueuesFactory ¶
type RecursiveComputerQueuesFactory[TReference object.BasicReference, TMetadata model_core.ReferenceMetadata] interface {
	NewQueues() RecursiveComputerQueues[TReference, TMetadata]
}
RecursiveComputerQueuesFactory is invoked by Executor to create the queues that are necessary to schedule the evaluation of keys.
func NewSimpleRecursiveComputerQueuesFactory ¶
func NewSimpleRecursiveComputerQueuesFactory[TReference object.BasicReference, TMetadata model_core.ReferenceMetadata](concurrency uint32) RecursiveComputerQueuesFactory[TReference, TMetadata]
NewSimpleRecursiveComputerQueuesFactory creates a RecursiveComputerQueuesFactory that always returns RecursiveComputerQueues instances backed by a single queue.
This implementation may be sufficient for testing, or can be used as a base type for more advanced implementations that create multiple queues.