worker

package
v0.0.1-beta-8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MaxItemsInStreamQueue = 10
	StreamMaxDelay        = 30 * time.Second
)

Variables

View Source
var (
	ErrStreamQueueTimeout = fmt.Errorf("stream queue timeout")
)

Functions

This section is empty.

Types

type CoordinatorConnection

type CoordinatorConnection interface {
	JoinToCoordinator(ctx context.Context) error
	LeaveCoordinator(ctx context.Context) error
	GetClient() pb.CoordinatorClient
	UpdateWorkerStreamStatus(ctx context.Context, workerStreamID int64, status pb.WorkerStreamStatus) error
	IngestMetrics(ctx context.Context, workerStreamID int64, inputEvents, processorErrors, outputEvents uint64) error
}

func NewCoordinatorConnection

func NewCoordinatorConnection(grpcConn *grpc.ClientConn, grpcPort uint32) CoordinatorConnection

type IngestResult

type IngestResult struct {
	StatusCode int
	Response   []byte
}

type ServiceStream

type ServiceStream struct {
	Stream         *service.Stream
	Cancel         context.CancelFunc
	Mux            *http.ServeMux
	Status         persistence.WorkerStreamStatus
	TracingSummary *service.TracingSummary
}

type StreamManager

type StreamManager interface {
	AddStream(workerStreamID int64, config string) error
	GetStream(workerStreamID int64) (*ServiceStream, bool)
	GetStreamStatus(workerStreamID int64) (*persistence.WorkerStreamStatus, error)
	DeleteStream(workerStreamID int64) error
	IngestData(workerStreamID int64, method, path, contentType string, payload []byte) (*IngestResult, error)
	GetAllStreams() map[int64]*ServiceStream
	StopAllStreams()
	StartStream(ctx context.Context, workerStreamID int64)
}

func NewStreamManager

func NewStreamManager(coordinatorConnection CoordinatorConnection, vaultProvider vault.VaultProvider) StreamManager

type StreamQueue

type StreamQueue interface {
	AddStreamToQueue(workerStreamID int64, config string) error
	ConsumeStreamQueue(ctx context.Context)
}

func NewStreamQueue

func NewStreamQueue(streamManager StreamManager) StreamQueue

type StreamQueueItem

type StreamQueueItem struct {
	WorkerStreamID int64
	Config         string
}

type WorkerExecutor

type WorkerExecutor interface {
	JoinToCoordinator(context.Context) error
	LeaveCoordinator(context.Context) error
	AddStreamToQueue(ctx context.Context, workerStreamID int64, config string) error
	FetchWorkerStreamStatus(ctx context.Context, workerStreamID int64) (*persistence.WorkerStreamStatus, error)
	DeleteWorkerStream(ctx context.Context, workerStreamID int64) error
	ShipLogs(context.Context)
	ShipMetrics(context.Context)
	ConsumeStreamQueue(context.Context)
	IngestData(ctx context.Context, workerStreamID int64, method, path, contentType string, payload []byte) (*IngestResult, error)
}

func NewWorkerExecutor

func NewWorkerExecutor(grpcConn *grpc.ClientConn, grpcPort uint32, vaultProvider vault.VaultProvider) WorkerExecutor

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL