Documentation
¶
Index ¶
Constants ¶
View Source
const ( MaxItemsInStreamQueue = 10 StreamMaxDelay = 30 * time.Second )
Variables ¶
View Source
var (
ErrStreamQueueTimeout = fmt.Errorf("stream queue timeout")
)
Functions ¶
This section is empty.
Types ¶
type CoordinatorConnection ¶
type CoordinatorConnection interface { JoinToCoordinator(ctx context.Context) error LeaveCoordinator(ctx context.Context) error GetClient() pb.CoordinatorClient UpdateWorkerStreamStatus(ctx context.Context, workerStreamID int64, status pb.WorkerStreamStatus) error IngestMetrics(ctx context.Context, workerStreamID int64, inputEvents, processorErrors, outputEvents uint64) error }
func NewCoordinatorConnection ¶
func NewCoordinatorConnection(grpcConn *grpc.ClientConn, grpcPort uint32) CoordinatorConnection
type IngestResult ¶
type ServiceStream ¶
type ServiceStream struct { Stream *service.Stream Cancel context.CancelFunc Mux *http.ServeMux Status persistence.WorkerStreamStatus TracingSummary *service.TracingSummary }
type StreamManager ¶
type StreamManager interface { AddStream(workerStreamID int64, config string) error GetStream(workerStreamID int64) (*ServiceStream, bool) GetStreamStatus(workerStreamID int64) (*persistence.WorkerStreamStatus, error) DeleteStream(workerStreamID int64) error IngestData(workerStreamID int64, method, path, contentType string, payload []byte) (*IngestResult, error) GetAllStreams() map[int64]*ServiceStream StopAllStreams() StartStream(ctx context.Context, workerStreamID int64) }
func NewStreamManager ¶
func NewStreamManager(coordinatorConnection CoordinatorConnection, vaultProvider vault.VaultProvider) StreamManager
type StreamQueue ¶
type StreamQueue interface { AddStreamToQueue(workerStreamID int64, config string) error ConsumeStreamQueue(ctx context.Context) }
func NewStreamQueue ¶
func NewStreamQueue(streamManager StreamManager) StreamQueue
type StreamQueueItem ¶
type WorkerExecutor ¶
type WorkerExecutor interface { JoinToCoordinator(context.Context) error LeaveCoordinator(context.Context) error AddStreamToQueue(ctx context.Context, workerStreamID int64, config string) error FetchWorkerStreamStatus(ctx context.Context, workerStreamID int64) (*persistence.WorkerStreamStatus, error) DeleteWorkerStream(ctx context.Context, workerStreamID int64) error ShipLogs(context.Context) ShipMetrics(context.Context) ConsumeStreamQueue(context.Context) IngestData(ctx context.Context, workerStreamID int64, method, path, contentType string, payload []byte) (*IngestResult, error) }
func NewWorkerExecutor ¶
func NewWorkerExecutor(grpcConn *grpc.ClientConn, grpcPort uint32, vaultProvider vault.VaultProvider) WorkerExecutor
Click to show internal directories.
Click to hide internal directories.