Documentation
¶
Index ¶
- type AstraBundleConfig
- type AstraCqlStoreConfig
- type Buffer
- type BufferConfig
- type BufferInput
- type BufferOutput
- type CheckpointStore
- type CqlStore
- func (cqls *CqlStore) ReadBufferedCheckpointsByHost(host string) (iter.Seq2[*models.CheckpointedRequest, error], error)
- func (cqls *CqlStore) ReadCheckpoint(algorithm string, id string) (*models.CheckpointedRequest, error)
- func (cqls *CqlStore) ReadCheckpointsByTag(requestTag string) (iter.Seq2[*models.CheckpointedRequest, error], error)
- func (cqls *CqlStore) ReadMetadata(checkpoint *models.CheckpointedRequest) (*models.SubmissionBufferEntry, error)
- func (cqls *CqlStore) UpsertCheckpoint(checkpoint *models.CheckpointedRequest) error
- func (cqls *CqlStore) UpsertMetadata(entry *models.SubmissionBufferEntry) error
- type DefaultBuffer
- func (buffer *DefaultBuffer) Add(requestId string, algorithmName string, request *models.AlgorithmRequest, ...) error
- func (buffer *DefaultBuffer) Get(requestId string, algorithmName string) (*models.CheckpointedRequest, error)
- func (buffer *DefaultBuffer) GetBuffered(host string) (iter.Seq2[*models.CheckpointedRequest, error], error)
- func (buffer *DefaultBuffer) GetBufferedEntry(checkpoint *models.CheckpointedRequest) (*models.SubmissionBufferEntry, error)
- func (buffer *DefaultBuffer) GetTagged(tag string) (iter.Seq2[*models.CheckpointedRequest, error], error)
- func (buffer *DefaultBuffer) Start(submitter pipeline.StageActor[*BufferOutput, types.UID])
- func (buffer *DefaultBuffer) Update(checkpoint *models.CheckpointedRequest) error
- type MemoryPassthroughBuffer
- func (buffer *MemoryPassthroughBuffer) Add(requestId string, algorithmName string, request *models.AlgorithmRequest, ...) error
- func (buffer *MemoryPassthroughBuffer) Get(requestId string, algorithmName string) (*models.CheckpointedRequest, error)
- func (buffer *MemoryPassthroughBuffer) GetBuffered(host string) (iter.Seq2[*models.CheckpointedRequest, error], error)
- func (buffer *MemoryPassthroughBuffer) GetBufferedEntry(checkpoint *models.CheckpointedRequest) (*models.SubmissionBufferEntry, error)
- func (buffer *MemoryPassthroughBuffer) GetTagged(tag string) (iter.Seq2[*models.CheckpointedRequest, error], error)
- func (buffer *MemoryPassthroughBuffer) Start(submitter pipeline.StageActor[*BufferOutput, types.UID])
- func (buffer *MemoryPassthroughBuffer) Update(checkpoint *models.CheckpointedRequest) error
- type MetadataStore
- type S3BufferConfig
- type ScyllaCqlStoreConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AstraBundleConfig ¶
type AstraCqlStoreConfig ¶
type AstraCqlStoreConfig struct { GatewayHost string GatewayPort string GatewayUser string GatewayPass string TlsConfig *tls.Config }
AstraCqlStoreConfig defines configuration for gocql needed to connect to AstraDB
func NewAstraCqlStoreConfig ¶
func NewAstraCqlStoreConfig(logger klog.Logger, config *AstraBundleConfig) *AstraCqlStoreConfig
type Buffer ¶
type Buffer interface { Get(requestId string, algorithmName string) (*models.CheckpointedRequest, error) GetBuffered(host string) (iter.Seq2[*models.CheckpointedRequest, error], error) GetTagged(tag string) (iter.Seq2[*models.CheckpointedRequest, error], error) Update(checkpoint *models.CheckpointedRequest) error GetBufferedEntry(checkpoint *models.CheckpointedRequest) (*models.SubmissionBufferEntry, error) Add(requestId string, algorithmName string, request *models.AlgorithmRequest, config *v1.NexusAlgorithmSpec, workgroup *v1.NexusAlgorithmWorkgroupSpec, parent *metav1.OwnerReference, isDryRun bool) error Start(submitter pipeline.StageActor[*BufferOutput, types.UID]) }
type BufferConfig ¶
type BufferConfig struct { PayloadStoragePath string `mapstructure:"payload-storage-path,omitempty"` PayloadValidFor time.Duration `mapstructure:"payload-valid-for,omitempty"` FailureRateBaseDelay time.Duration `mapstructure:"failure-rate-base-delay,omitempty"` FailureRateMaxDelay time.Duration `mapstructure:"failure-rate-max-delay,omitempty"` RateLimitElementsPerSecond int `mapstructure:"rate-limit-elements-per-second,omitempty"` RateLimitElementsBurst int `mapstructure:"rate-limit-elements-burst,omitempty"` Workers int `mapstructure:"workers,omitempty"` }
type BufferInput ¶
type BufferInput struct { Checkpoint *models.CheckpointedRequest ResolvedWorkgroup *v1.NexusAlgorithmWorkgroupSpec ResolvedParent *metav1.OwnerReference SerializedPayload *[]byte Config *v1.NexusAlgorithmSpec IsDryRun bool }
func NewBufferInput ¶ added in v1.1.0
func NewBufferInput(requestId string, algorithmName string, request *models.AlgorithmRequest, config *v1.NexusAlgorithmSpec, workgroup *v1.NexusAlgorithmWorkgroupSpec, parent *metav1.OwnerReference, isDryRun bool) (*BufferInput, error)
func (*BufferInput) Tags ¶ added in v1.1.0
func (input *BufferInput) Tags() map[string]string
type BufferOutput ¶
type BufferOutput struct { Checkpoint *models.CheckpointedRequest Entry *models.SubmissionBufferEntry Workgroup *v1.NexusAlgorithmWorkgroupSpec ParentReference *metav1.OwnerReference IsDryRun bool }
type CheckpointStore ¶
type CheckpointStore interface { UpsertCheckpoint(checkpoint *models.CheckpointedRequest) error ReadCheckpoint(algorithm string, id string) (*models.CheckpointedRequest, error) ReadBufferedCheckpointsByHost(host string) (iter.Seq2[*models.CheckpointedRequest, error], error) ReadCheckpointsByTag(requestTag string) (iter.Seq2[*models.CheckpointedRequest, error], error) }
type CqlStore ¶
type CqlStore struct {
// contains filtered or unexported fields
}
func NewAstraCqlStore ¶
func NewAstraCqlStore(logger klog.Logger, bundle *AstraBundleConfig) *CqlStore
NewAstraCqlStore creates a CqlStore connected to DataStax AstraDB serverless instance
func NewCqlStore ¶
func NewCqlStore(cluster *gocql.ClusterConfig, logger klog.Logger) *CqlStore
NewCqlStore creates a generic connected CqlStore (Apache Cassandra/Scylla)
func NewScyllaCqlStore ¶ added in v1.2.2
func NewScyllaCqlStore(logger klog.Logger, config *ScyllaCqlStoreConfig) *CqlStore
func (*CqlStore) ReadBufferedCheckpointsByHost ¶ added in v1.1.0
func (*CqlStore) ReadCheckpoint ¶
func (*CqlStore) ReadCheckpointsByTag ¶ added in v1.1.0
func (*CqlStore) ReadMetadata ¶
func (cqls *CqlStore) ReadMetadata(checkpoint *models.CheckpointedRequest) (*models.SubmissionBufferEntry, error)
func (*CqlStore) UpsertCheckpoint ¶
func (cqls *CqlStore) UpsertCheckpoint(checkpoint *models.CheckpointedRequest) error
func (*CqlStore) UpsertMetadata ¶
func (cqls *CqlStore) UpsertMetadata(entry *models.SubmissionBufferEntry) error
type DefaultBuffer ¶
type DefaultBuffer struct {
// contains filtered or unexported fields
}
func NewAstraS3Buffer ¶ added in v1.2.2
func NewAstraS3Buffer(ctx context.Context, config *S3BufferConfig, astraConfig *AstraBundleConfig, metricTags map[string]string) *DefaultBuffer
NewAstraS3Buffer creates a default buffer that uses Astra DB for checkpointing and S3-compatible storage for payload persistence
func NewScyllaS3Buffer ¶ added in v1.2.2
func NewScyllaS3Buffer(ctx context.Context, config *S3BufferConfig, scyllaConfig *ScyllaCqlStoreConfig, metricTags map[string]string) *DefaultBuffer
NewScyllaS3Buffer creates a default buffer that uses ScyllaDb for checkpointing and S3-compatible storage for payload persistence
func (*DefaultBuffer) Add ¶
func (buffer *DefaultBuffer) Add(requestId string, algorithmName string, request *models.AlgorithmRequest, config *v1.NexusAlgorithmSpec, workgroup *v1.NexusAlgorithmWorkgroupSpec, parent *metav1.OwnerReference, isDryRun bool) error
func (*DefaultBuffer) Get ¶ added in v0.4.0
func (buffer *DefaultBuffer) Get(requestId string, algorithmName string) (*models.CheckpointedRequest, error)
func (*DefaultBuffer) GetBuffered ¶ added in v1.1.0
func (buffer *DefaultBuffer) GetBuffered(host string) (iter.Seq2[*models.CheckpointedRequest, error], error)
func (*DefaultBuffer) GetBufferedEntry ¶ added in v1.1.0
func (buffer *DefaultBuffer) GetBufferedEntry(checkpoint *models.CheckpointedRequest) (*models.SubmissionBufferEntry, error)
func (*DefaultBuffer) GetTagged ¶ added in v1.1.0
func (buffer *DefaultBuffer) GetTagged(tag string) (iter.Seq2[*models.CheckpointedRequest, error], error)
func (*DefaultBuffer) Start ¶
func (buffer *DefaultBuffer) Start(submitter pipeline.StageActor[*BufferOutput, types.UID])
func (*DefaultBuffer) Update ¶ added in v1.1.0
func (buffer *DefaultBuffer) Update(checkpoint *models.CheckpointedRequest) error
type MemoryPassthroughBuffer ¶ added in v1.3.1
type MemoryPassthroughBuffer struct { Checkpoints []*models.CheckpointedRequest BufferedEntries []*models.SubmissionBufferEntry // contains filtered or unexported fields }
func NewMemoryPassthroughBuffer ¶ added in v1.3.1
func NewMemoryPassthroughBuffer(ctx context.Context, metricTags map[string]string) *MemoryPassthroughBuffer
NewMemoryPassthroughBuffer creates a default buffer that does not persist payloads. This buffer persists ALL information in app memory and is ONLY intended to use in tests. DO NOT USE THIS IN PRODUCTION. Some methods in this buffer will behave different from Cassandra buffers, for example Update replaces the checkpoint instead of updating its properties.
func (*MemoryPassthroughBuffer) Add ¶ added in v1.3.1
func (buffer *MemoryPassthroughBuffer) Add(requestId string, algorithmName string, request *models.AlgorithmRequest, config *v1.NexusAlgorithmSpec, workgroup *v1.NexusAlgorithmWorkgroupSpec, parent *metav1.OwnerReference, isDryRun bool) error
func (*MemoryPassthroughBuffer) Get ¶ added in v1.3.1
func (buffer *MemoryPassthroughBuffer) Get(requestId string, algorithmName string) (*models.CheckpointedRequest, error)
func (*MemoryPassthroughBuffer) GetBuffered ¶ added in v1.3.1
func (buffer *MemoryPassthroughBuffer) GetBuffered(host string) (iter.Seq2[*models.CheckpointedRequest, error], error)
func (*MemoryPassthroughBuffer) GetBufferedEntry ¶ added in v1.3.1
func (buffer *MemoryPassthroughBuffer) GetBufferedEntry(checkpoint *models.CheckpointedRequest) (*models.SubmissionBufferEntry, error)
func (*MemoryPassthroughBuffer) GetTagged ¶ added in v1.3.1
func (buffer *MemoryPassthroughBuffer) GetTagged(tag string) (iter.Seq2[*models.CheckpointedRequest, error], error)
func (*MemoryPassthroughBuffer) Start ¶ added in v1.3.1
func (buffer *MemoryPassthroughBuffer) Start(submitter pipeline.StageActor[*BufferOutput, types.UID])
func (*MemoryPassthroughBuffer) Update ¶ added in v1.3.1
func (buffer *MemoryPassthroughBuffer) Update(checkpoint *models.CheckpointedRequest) error
type MetadataStore ¶
type MetadataStore interface { UpsertMetadata(entry *models.SubmissionBufferEntry) error ReadMetadata(checkpoint *models.CheckpointedRequest) (*models.SubmissionBufferEntry, error) }
type S3BufferConfig ¶ added in v1.1.0
type S3BufferConfig struct { BufferConfig *BufferConfig `mapstructure:"buffer-config,omitempty"` AccessKeyID string `mapstructure:"access-key-id,omitempty"` SecretAccessKey string `mapstructure:"secret-access-key,omitempty"` Region string `mapstructure:"region,omitempty"` Endpoint string `mapstructure:"endpoint,omitempty"` }
type ScyllaCqlStoreConfig ¶ added in v1.2.2
type ScyllaCqlStoreConfig struct { Hosts []string `mapstructure:"hosts"` Port string `mapstructure:"port"` User string `mapstructure:"user"` Password string `mapstructure:"password"` LocalDC string `mapstructure:"local-dc"` }
ScyllaCqlStoreConfig defines configuration for gocql needed to connect to ScyllaDB