request

package
v1.4.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2025 License: Apache-2.0 Imports: 25 Imported by: 5

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AstraBundleConfig

// AstraBundleConfig is the input accepted by NewAstraCqlStoreConfig,
// NewAstraCqlStore and NewAstraS3Buffer. The mapstructure tag on each field
// names the configuration key the field is decoded from.
type AstraBundleConfig struct {
	// SecureConnectionBundleBase64 is decoded from "secure-connection-bundle-base64".
	// NOTE(review): presumably the base64-encoded Astra secure connect bundle — confirm.
	SecureConnectionBundleBase64 string `mapstructure:"secure-connection-bundle-base64"`
	// GatewayUser is decoded from "gateway-user".
	GatewayUser                  string `mapstructure:"gateway-user"`
	// GatewayPassword is decoded from "gateway-password".
	GatewayPassword              string `mapstructure:"gateway-password"`
}

type AstraCqlStoreConfig

// AstraCqlStoreConfig defines configuration for gocql needed to connect to
// AstraDB. NewAstraCqlStoreConfig returns a pointer to this type built from an
// AstraBundleConfig.
type AstraCqlStoreConfig struct {
	// GatewayHost and GatewayPort are both plain strings; the port is not
	// stored as an integer.
	GatewayHost string
	GatewayPort string
	// GatewayUser and GatewayPass carry the gateway credentials.
	// NOTE(review): presumably copied from AstraBundleConfig.GatewayUser and
	// AstraBundleConfig.GatewayPassword — confirm against NewAstraCqlStoreConfig.
	GatewayUser string
	GatewayPass string
	// TlsConfig is a pointer to a crypto/tls Config.
	// NOTE(review): whether it may be nil is not visible here — confirm.
	TlsConfig   *tls.Config
}

AstraCqlStoreConfig defines configuration for gocql needed to connect to AstraDB

func NewAstraCqlStoreConfig

func NewAstraCqlStoreConfig(logger klog.Logger, config *AstraBundleConfig) *AstraCqlStoreConfig

type Buffer

// Buffer is the method set shared by the request buffers in this package.
// DefaultBuffer declares all seven methods with these exact signatures.
type Buffer interface {
	// Get takes a request id and an algorithm name and returns a checkpointed
	// request or an error.
	Get(requestId string, algorithmName string) (*models.CheckpointedRequest, error)
	// GetBuffered takes a host name and returns an iterator yielding
	// (checkpoint, error) pairs, plus an error for the call itself.
	GetBuffered(host string) (iter.Seq2[*models.CheckpointedRequest, error], error)
	// GetTagged takes a tag and returns an iterator yielding
	// (checkpoint, error) pairs, plus an error for the call itself.
	GetTagged(tag string) (iter.Seq2[*models.CheckpointedRequest, error], error)
	// Update takes a checkpoint and reports failure through the returned error.
	Update(checkpoint *models.CheckpointedRequest) error
	// GetBufferedEntry takes a checkpoint and returns a submission buffer
	// entry or an error.
	GetBufferedEntry(checkpoint *models.CheckpointedRequest) (*models.SubmissionBufferEntry, error)
	// Add accepts the same parameter list as NewBufferInput and returns only
	// an error.
	Add(requestId string, algorithmName string, request *models.AlgorithmRequest, config *v1.NexusAlgorithmSpec, workgroup *v1.NexusAlgorithmWorkgroupSpec, parent *metav1.OwnerReference, isDryRun bool) error
	// Start is given a pipeline.StageActor parameterized by *BufferOutput and
	// types.UID and returns nothing.
	// NOTE(review): presumably the downstream stage that submits buffered
	// requests — confirm.
	Start(submitter pipeline.StageActor[*BufferOutput, types.UID])
}

type BufferConfig

// BufferConfig groups buffer settings. It is referenced by pointer from
// S3BufferConfig.BufferConfig. The mapstructure tag on each field names the
// configuration key the field is decoded from.
type BufferConfig struct {
	// PayloadStoragePath is decoded from "payload-storage-path".
	PayloadStoragePath         string        `mapstructure:"payload-storage-path,omitempty"`
	// PayloadValidFor is a duration decoded from "payload-valid-for".
	PayloadValidFor            time.Duration `mapstructure:"payload-valid-for,omitempty"`
	// FailureRateBaseDelay and FailureRateMaxDelay are durations.
	// NOTE(review): presumably the lower and upper bounds of a failure
	// backoff — confirm.
	FailureRateBaseDelay       time.Duration `mapstructure:"failure-rate-base-delay,omitempty"`
	FailureRateMaxDelay        time.Duration `mapstructure:"failure-rate-max-delay,omitempty"`
	// RateLimitElementsPerSecond and RateLimitElementsBurst are integers.
	// NOTE(review): presumably the rate and burst size of a rate limiter —
	// confirm.
	RateLimitElementsPerSecond int           `mapstructure:"rate-limit-elements-per-second,omitempty"`
	RateLimitElementsBurst     int           `mapstructure:"rate-limit-elements-burst,omitempty"`
	// Workers is an integer decoded from "workers".
	// NOTE(review): presumably a worker count — confirm.
	Workers                    int           `mapstructure:"workers,omitempty"`
}

type BufferInput

// BufferInput is the value returned by NewBufferInput. It has one method,
// Tags, which returns a map[string]string.
type BufferInput struct {
	// Checkpoint points to the checkpointed request carried by this input.
	Checkpoint        *models.CheckpointedRequest
	// ResolvedWorkgroup and ResolvedParent have the same types as the
	// workgroup and parent parameters of NewBufferInput.
	ResolvedWorkgroup *v1.NexusAlgorithmWorkgroupSpec
	ResolvedParent    *metav1.OwnerReference
	// SerializedPayload is a pointer to a byte slice.
	// NOTE(review): presumably the serialized request payload; the encoding
	// is not visible here — confirm.
	SerializedPayload *[]byte
	// Config has the same type as the config parameter of NewBufferInput.
	Config            *v1.NexusAlgorithmSpec
	// IsDryRun has the same type as the isDryRun parameter of NewBufferInput.
	IsDryRun          bool
}

func NewBufferInput added in v1.1.0

func NewBufferInput(requestId string, algorithmName string, request *models.AlgorithmRequest, config *v1.NexusAlgorithmSpec, workgroup *v1.NexusAlgorithmWorkgroupSpec, parent *metav1.OwnerReference, isDryRun bool) (*BufferInput, error)

func (*BufferInput) Tags added in v1.1.0

func (input *BufferInput) Tags() map[string]string

type BufferOutput

// BufferOutput is the first type argument of the pipeline.StageActor passed
// to Buffer.Start.
type BufferOutput struct {
	// Checkpoint points to the checkpointed request for this output.
	Checkpoint      *models.CheckpointedRequest
	// Entry points to a submission buffer entry, the same type returned by
	// Buffer.GetBufferedEntry.
	Entry           *models.SubmissionBufferEntry
	// Workgroup points to a workgroup spec.
	Workgroup       *v1.NexusAlgorithmWorkgroupSpec
	// ParentReference points to a Kubernetes owner reference.
	ParentReference *metav1.OwnerReference
	// IsDryRun is a boolean flag.
	// NOTE(review): presumably copied from the dry-run flag given to Add —
	// confirm.
	IsDryRun        bool
}

type CheckpointStore

// CheckpointStore is the method set for storing and reading checkpointed
// requests. CqlStore declares all four methods with these exact signatures.
type CheckpointStore interface {
	// UpsertCheckpoint takes a checkpoint and reports failure through the
	// returned error.
	UpsertCheckpoint(checkpoint *models.CheckpointedRequest) error
	// ReadCheckpoint takes an algorithm name followed by an id and returns a
	// checkpoint or an error. Note the argument order is the reverse of
	// Buffer.Get, which takes the request id first.
	ReadCheckpoint(algorithm string, id string) (*models.CheckpointedRequest, error)
	// ReadBufferedCheckpointsByHost takes a host name and returns an iterator
	// yielding (checkpoint, error) pairs, plus an error for the call itself.
	ReadBufferedCheckpointsByHost(host string) (iter.Seq2[*models.CheckpointedRequest, error], error)
	// ReadCheckpointsByTag takes a request tag and returns an iterator
	// yielding (checkpoint, error) pairs, plus an error for the call itself.
	ReadCheckpointsByTag(requestTag string) (iter.Seq2[*models.CheckpointedRequest, error], error)
}

type CqlStore

// CqlStore is returned by NewAstraCqlStore, NewCqlStore and NewScyllaCqlStore.
// Its methods match every signature in the CheckpointStore and MetadataStore
// interfaces.
type CqlStore struct {
	// contains filtered or unexported fields
}

func NewAstraCqlStore

func NewAstraCqlStore(logger klog.Logger, bundle *AstraBundleConfig) *CqlStore

NewAstraCqlStore creates a CqlStore connected to a DataStax AstraDB serverless instance

func NewCqlStore

func NewCqlStore(cluster *gocql.ClusterConfig, logger klog.Logger) *CqlStore

NewCqlStore creates a generic connected CqlStore (Apache Cassandra/Scylla)

func NewScyllaCqlStore added in v1.2.2

func NewScyllaCqlStore(logger klog.Logger, config *ScyllaCqlStoreConfig) *CqlStore

func (*CqlStore) ReadBufferedCheckpointsByHost added in v1.1.0

func (cqls *CqlStore) ReadBufferedCheckpointsByHost(host string) (iter.Seq2[*models.CheckpointedRequest, error], error)

func (*CqlStore) ReadCheckpoint

func (cqls *CqlStore) ReadCheckpoint(algorithm string, id string) (*models.CheckpointedRequest, error)

func (*CqlStore) ReadCheckpointsByTag added in v1.1.0

func (cqls *CqlStore) ReadCheckpointsByTag(requestTag string) (iter.Seq2[*models.CheckpointedRequest, error], error)

func (*CqlStore) ReadMetadata

func (cqls *CqlStore) ReadMetadata(checkpoint *models.CheckpointedRequest) (*models.SubmissionBufferEntry, error)

func (*CqlStore) UpsertCheckpoint

func (cqls *CqlStore) UpsertCheckpoint(checkpoint *models.CheckpointedRequest) error

func (*CqlStore) UpsertMetadata

func (cqls *CqlStore) UpsertMetadata(entry *models.SubmissionBufferEntry) error

type DefaultBuffer

// DefaultBuffer is returned by NewAstraS3Buffer and NewScyllaS3Buffer. Its
// methods match every signature in the Buffer interface.
type DefaultBuffer struct {
	// contains filtered or unexported fields
}

func NewAstraS3Buffer added in v1.2.2

func NewAstraS3Buffer(ctx context.Context, config *S3BufferConfig, astraConfig *AstraBundleConfig, metricTags map[string]string) *DefaultBuffer

NewAstraS3Buffer creates a default buffer that uses Astra DB for checkpointing and S3-compatible storage for payload persistence

func NewScyllaS3Buffer added in v1.2.2

func NewScyllaS3Buffer(ctx context.Context, config *S3BufferConfig, scyllaConfig *ScyllaCqlStoreConfig, metricTags map[string]string) *DefaultBuffer

NewScyllaS3Buffer creates a default buffer that uses ScyllaDB for checkpointing and S3-compatible storage for payload persistence

func (*DefaultBuffer) Add

func (buffer *DefaultBuffer) Add(requestId string, algorithmName string, request *models.AlgorithmRequest, config *v1.NexusAlgorithmSpec, workgroup *v1.NexusAlgorithmWorkgroupSpec, parent *metav1.OwnerReference, isDryRun bool) error

func (*DefaultBuffer) Get added in v0.4.0

func (buffer *DefaultBuffer) Get(requestId string, algorithmName string) (*models.CheckpointedRequest, error)

func (*DefaultBuffer) GetBuffered added in v1.1.0

func (buffer *DefaultBuffer) GetBuffered(host string) (iter.Seq2[*models.CheckpointedRequest, error], error)

func (*DefaultBuffer) GetBufferedEntry added in v1.1.0

func (buffer *DefaultBuffer) GetBufferedEntry(checkpoint *models.CheckpointedRequest) (*models.SubmissionBufferEntry, error)

func (*DefaultBuffer) GetTagged added in v1.1.0

func (buffer *DefaultBuffer) GetTagged(tag string) (iter.Seq2[*models.CheckpointedRequest, error], error)

func (*DefaultBuffer) Start

func (buffer *DefaultBuffer) Start(submitter pipeline.StageActor[*BufferOutput, types.UID])

func (*DefaultBuffer) Update added in v1.1.0

func (buffer *DefaultBuffer) Update(checkpoint *models.CheckpointedRequest) error

type MemoryPassthroughBuffer added in v1.3.1

// MemoryPassthroughBuffer is returned by NewMemoryPassthroughBuffer. Per that
// constructor's documentation it keeps all information in application memory
// and is intended for tests only, not production.
type MemoryPassthroughBuffer struct {
	// Checkpoints is an exported slice of checkpointed requests.
	// NOTE(review): presumably the in-memory checkpoint storage — confirm.
	Checkpoints     []*models.CheckpointedRequest
	// BufferedEntries is an exported slice of submission buffer entries.
	// NOTE(review): presumably the in-memory entry storage — confirm.
	BufferedEntries []*models.SubmissionBufferEntry
	// contains filtered or unexported fields
}

func NewMemoryPassthroughBuffer added in v1.3.1

func NewMemoryPassthroughBuffer(ctx context.Context, metricTags map[string]string) *MemoryPassthroughBuffer

NewMemoryPassthroughBuffer creates a default buffer that does not persist payloads. This buffer keeps ALL information in application memory and is intended ONLY for use in tests. DO NOT USE THIS IN PRODUCTION. Some methods of this buffer behave differently from the Cassandra-backed buffers; for example, Update replaces the checkpoint instead of updating its properties.

func (*MemoryPassthroughBuffer) Add added in v1.3.1

func (buffer *MemoryPassthroughBuffer) Add(requestId string, algorithmName string, request *models.AlgorithmRequest, config *v1.NexusAlgorithmSpec, workgroup *v1.NexusAlgorithmWorkgroupSpec, parent *metav1.OwnerReference, isDryRun bool) error

func (*MemoryPassthroughBuffer) Get added in v1.3.1

func (buffer *MemoryPassthroughBuffer) Get(requestId string, algorithmName string) (*models.CheckpointedRequest, error)

func (*MemoryPassthroughBuffer) GetBuffered added in v1.3.1

func (buffer *MemoryPassthroughBuffer) GetBuffered(host string) (iter.Seq2[*models.CheckpointedRequest, error], error)

func (*MemoryPassthroughBuffer) GetBufferedEntry added in v1.3.1

func (buffer *MemoryPassthroughBuffer) GetBufferedEntry(checkpoint *models.CheckpointedRequest) (*models.SubmissionBufferEntry, error)

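func (*MemoryPassthroughBuffer) GetTagged added in v1.3.1

func (buffer *MemoryPassthroughBuffer) GetTagged(tag string) (iter.Seq2[*models.CheckpointedRequest, error], error)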

func (*MemoryPassthroughBuffer) Start added in v1.3.1

func (buffer *MemoryPassthroughBuffer) Start(submitter pipeline.StageActor[*BufferOutput, types.UID])

func (*MemoryPassthroughBuffer) Update added in v1.3.1

func (buffer *MemoryPassthroughBuffer) Update(checkpoint *models.CheckpointedRequest) error

type MetadataStore

// MetadataStore is the method set for storing and reading submission buffer
// entries. CqlStore declares both methods with these exact signatures.
type MetadataStore interface {
	// UpsertMetadata takes a submission buffer entry and reports failure
	// through the returned error.
	UpsertMetadata(entry *models.SubmissionBufferEntry) error
	// ReadMetadata takes a checkpoint and returns a submission buffer entry
	// or an error.
	ReadMetadata(checkpoint *models.CheckpointedRequest) (*models.SubmissionBufferEntry, error)
}

type S3BufferConfig added in v1.1.0

// S3BufferConfig is the configuration accepted by NewAstraS3Buffer and
// NewScyllaS3Buffer. The mapstructure tag on each field names the
// configuration key the field is decoded from.
type S3BufferConfig struct {
	// BufferConfig points to the nested buffer settings, decoded from
	// "buffer-config".
	BufferConfig    *BufferConfig `mapstructure:"buffer-config,omitempty"`
	// AccessKeyID and SecretAccessKey are decoded from "access-key-id" and
	// "secret-access-key".
	// NOTE(review): presumably the S3 credentials — confirm.
	AccessKeyID     string        `mapstructure:"access-key-id,omitempty"`
	SecretAccessKey string        `mapstructure:"secret-access-key,omitempty"`
	// Region is decoded from "region".
	Region          string        `mapstructure:"region,omitempty"`
	// Endpoint is decoded from "endpoint".
	// NOTE(review): presumably the URL of the S3-compatible service — confirm.
	Endpoint        string        `mapstructure:"endpoint,omitempty"`
}

type ScyllaCqlStoreConfig added in v1.2.2

// ScyllaCqlStoreConfig defines configuration for gocql needed to connect to
// ScyllaDB. It is accepted by NewScyllaCqlStore and NewScyllaS3Buffer. The
// mapstructure tag on each field names the configuration key the field is
// decoded from.
type ScyllaCqlStoreConfig struct {
	// Hosts is a list of host strings decoded from "hosts".
	Hosts    []string `mapstructure:"hosts"`
	// Port is decoded from "port" and is stored as a string, not an integer.
	Port     string   `mapstructure:"port"`
	// User and Password are decoded from "user" and "password".
	User     string   `mapstructure:"user"`
	Password string   `mapstructure:"password"`
	// LocalDC is decoded from "local-dc".
	// NOTE(review): presumably the local datacenter name used for
	// DC-aware routing — confirm.
	LocalDC  string   `mapstructure:"local-dc"`
}

ScyllaCqlStoreConfig defines configuration for gocql needed to connect to ScyllaDB

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL