api

package
v2.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 23, 2025 License: Apache-2.0 Imports: 3 Imported by: 40

Documentation

Index

Constants

View Source
// Connection status values, expressed as plain strings.
// NOTE(review): presumably these are the values passed as the status argument
// of a StatusChangeHandler — confirm against the callers.
const (
	ConnectionConnected    = "connected"
	ConnectionConnecting   = "connecting"
	ConnectionDisconnected = "disconnected"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Bounded

// Bounded means the source can have an end.
type Bounded interface {
	// SetEofIngest hands the EOFIngest callback to the source.
	// NOTE(review): presumably the source invokes it once the end of the data
	// is reached — confirm with the implementations.
	SetEofIngest(eof EOFIngest)
}

Bounded means the source can have an end.

type BytesCollector

// BytesCollector is a Sink that additionally accepts RawTuple items.
type BytesCollector interface {
	Sink
	// Collect receives a single RawTuple and reports a failure through the
	// returned error.
	Collect(ctx StreamContext, item RawTuple) error
}

type BytesIngest

// BytesIngest is the callback signature used to hand over a bytes payload
// together with a metadata map and a timestamp. It is the ingest parameter of
// BytesSource.Subscribe and PullBytesSource.Pull.
// NOTE(review): ts is presumably the time the payload was received — confirm.
type BytesIngest func(ctx StreamContext, payload []byte, meta map[string]any, ts time.Time)

type BytesSource

// BytesSource receives the bytes payload pushed by the external source.
type BytesSource interface {
	Source
	// Subscribe is given the ingest callback for payloads and the ingestError
	// callback for errors.
	// NOTE(review): whether Subscribe blocks or returns right after
	// registration is not visible here — confirm.
	Subscribe(ctx StreamContext, ingest BytesIngest, ingestError ErrorIngest) error
}

BytesSource receives the bytes payload pushed by the external source

type Closable

// Closable is implemented by anything that can be closed. Nodelet embeds it.
type Closable interface {
	// Close closes the receiver and reports a failure through the returned error.
	Close(ctx StreamContext) error
}

type Connector

// Connector is the feature that allows a node to connect to the external
// system. Both Source and Sink embed it.
type Connector interface {
	// Connect establishes the connection and receives a StatusChangeHandler.
	// NOTE(review): presumably sch is to be called whenever the connection
	// status changes — confirm.
	Connect(ctx StreamContext, sch StatusChangeHandler) error
}

Connector is a source feature that allows the source to connect to the data source.

type EOFIngest

// EOFIngest is the callback signature accepted by Bounded.SetEofIngest. It
// carries a message string.
type EOFIngest func(ctx StreamContext, msg string)

type ErrorIngest

// ErrorIngest is the callback signature for reporting an error. It is the
// ingestError parameter of the Subscribe and Pull methods of the source
// interfaces.
type ErrorIngest func(ctx StreamContext, err error)

type Function

// Function is the contract of a function that can be validated and executed
// with a FunctionContext.
type Function interface {
	// Validate checks the arguments. Each element of args is an xsql.Expr.
	Validate(args []any) error
	// Exec executes the function and returns the result together with a flag
	// that tells whether the execution succeeded. If the execution fails, it
	// returns the error as the first value and false.
	Exec(ctx FunctionContext, args []any) (any, bool)
	// IsAggregate reports whether this function is an aggregate function.
	// Each parameter of an aggregate function will be a slice.
	IsAggregate() bool
}

type FunctionContext

// FunctionContext is a StreamContext that additionally exposes a function id.
// It is the context type passed to Function.Exec.
type FunctionContext interface {
	StreamContext
	// GetFuncId returns the function id as an int.
	GetFuncId() int
}

type HasDynamicProps

// HasDynamicProps gives access to dynamic properties.
type HasDynamicProps interface {
	// DynamicProps returns the transformed dynamic property for the given
	// template (typically used in a sink). The transformation itself should be
	// done in the transform op.
	// NOTE(review): the bool presumably reports whether a value exists for the
	// template — confirm.
	DynamicProps(template string) (string, bool)
	// AllProps returns properties as a string-to-string map.
	// NOTE(review): presumably all dynamic properties — confirm.
	AllProps() map[string]string
}

type Logger

// Logger is the logging contract returned by StreamContext.GetLogger. It
// offers four levels (debug, info, warn, error), each as a plain variant that
// takes a list of values and as an "f" variant that takes a format string
// followed by its arguments.
type Logger interface {
	Debug(args ...any)
	Info(args ...any)
	Warn(args ...any)
	Error(args ...any)
	Debugf(format string, args ...any)
	Infof(format string, args ...any)
	Warnf(format string, args ...any)
	Errorf(format string, args ...any)
}

type LookupBytesSource

// LookupBytesSource is a Source that is looked up on demand and answers with
// raw bytes payloads.
type LookupBytesSource interface {
	Source
	// Lookup returns multiple rows of bytes, one []byte per row.
	// NOTE(review): fields, keys and values are presumably the selected
	// columns and the key/value pairs to match, as in LookupSource — confirm.
	Lookup(ctx StreamContext, fields []string, keys []string, values []any) ([][]byte, error)
}

LookupBytesSource looks up with the bytes payload pushed by the external source

type LookupSource

// LookupSource is a source feature to query the source on demand.
type LookupSource interface {
	Source
	// Lookup receives the lookup values used to construct the query and returns
	// the query results, one map per row.
	// NOTE(review): keys and values are presumably parallel slices — confirm.
	Lookup(ctx StreamContext, fields []string, keys []string, values []any) ([]map[string]any, error)
}

LookupSource is a source feature to query the source on demand

type MessageTuple

// MessageTuple is composed of the interfaces embedded below; in this version
// that is only ReadonlyMessage. It is the item type of TupleCollector.Collect.
type MessageTuple interface {
	ReadonlyMessage
}

MessageTuple is an interface composed of the interfaces embedded in it (see the declaration above).

type MessageTupleList

// MessageTupleList is a list of MessageTuple values. It is the items type of
// TupleCollector.CollectList.
type MessageTupleList interface {
	// RangeOfTuples calls f with the index and the tuple of the list entries.
	// NOTE(review): the bool returned by f presumably decides whether the
	// iteration continues — confirm.
	RangeOfTuples(f func(index int, tuple MessageTuple) bool)
	// Len returns the length of the list as an int.
	Len() int
	// ToMaps returns the list as a slice of maps.
	ToMaps() []map[string]any
}

type MetaInfo

// MetaInfo gives access to metadata.
type MetaInfo interface {
	// Meta looks up a single metadata value by key and table.
	// NOTE(review): the bool presumably reports whether the key exists — confirm.
	Meta(key, table string) (any, bool)
	// Created returns a creation time.
	// NOTE(review): presumably the time the carrying record was created — confirm.
	Created() time.Time
	// AllMeta returns the metadata as a map.
	AllMeta() map[string]any
}

type ModuleInfo

// ModuleInfo describes a module: an id, a description and a constructor that
// returns a Nodelet.
type ModuleInfo struct {
	// Id is the identifier of the module.
	Id string
	// Description is the description text of the module.
	Description string
	// New returns a Nodelet.
	// NOTE(review): presumably a fresh instance on every call — confirm.
	New func() Nodelet
}

type Nodelet

// Nodelet is the base contract embedded by both Source and Sink: it can be
// provisioned with configs and closed.
type Nodelet interface {
	// Provision is called when the node is created, usually setting the configs. Do not put time-consuming operations here.
	Provision(ctx StreamContext, configs map[string]any) error
	Closable
}

type PullBytesSource

// PullBytesSource fetches the bytes payload from the external source at an
// interval. The interval property must be defined.
type PullBytesSource interface {
	Source
	// Pull receives the trigger time, the ingest callback for payloads and the
	// ingestError callback for errors. It has no return value.
	Pull(ctx StreamContext, trigger time.Time, ingest BytesIngest, ingestError ErrorIngest)
}

PullBytesSource fetches the bytes payload from the external source at an interval. The interval property must be defined.

type PullTupleSource

// PullTupleSource fetches the non-bytes payload from the external source at an
// interval. The interval property must be defined.
type PullTupleSource interface {
	Source
	// Pull receives the trigger time, the ingest callback for data and the
	// ingestError callback for errors. It has no return value.
	Pull(ctx StreamContext, trigger time.Time, ingest TupleIngest, ingestError ErrorIngest)
}

PullTupleSource fetches the non-bytes payload from the external source at an interval. The interval property must be defined.

type RawTuple

// RawTuple is a tuple that exposes a raw bytes payload. It is the item type of
// BytesCollector.Collect.
type RawTuple interface {
	// Raw returns the payload as a byte slice.
	Raw() []byte
	// Replace takes a byte slice.
	// NOTE(review): presumably it replaces the payload returned by Raw — confirm.
	Replace([]byte)
}

type ReadonlyMessage

// ReadonlyMessage is the interface that wraps each record. Use this interface
// to exchange data between different components. It is used in sinks.
type ReadonlyMessage interface {
	// Value looks up a single value by key and table.
	// NOTE(review): the bool presumably reports whether the key exists — confirm.
	Value(key, table string) (any, bool)
	// ToMap returns the record as a map.
	ToMap() map[string]any
}

ReadonlyMessage is the interface that wraps each record. Use this interface to exchange data between different components. It is used in sinks.

type Rewindable

// Rewindable is a source feature that allows the source to rewind to a
// specific offset.
type Rewindable interface {
	// GetOffset returns an offset value.
	// NOTE(review): presumably the current offset — confirm.
	GetOffset() (any, error)
	// Rewind takes an offset to rewind to.
	Rewind(offset any) error
	// ResetOffset takes a map as input.
	// NOTE(review): the expected keys of input are not visible here — confirm
	// with the implementations.
	ResetOffset(input map[string]any) error
}

Rewindable is a source feature that allows the source to rewind to a specific offset.

type Sink

// Sink is the interface that wraps the basic Sink methods. It is used to
// connect to the external system and send data to it. A sink must implement
// the Sink interface AND any collector interface.
// The lifecycle of a sink: Provision -> Connect -> Collect -> Close
type Sink interface {
	Nodelet
	Connector
}

Sink is the interface that wraps the basic Sink method. It is used to connect to the external system and send data to it. A sink must implement the Sink interface AND any collector interface. The lifecycle of a sink: Provision -> Connect -> Collect -> Close

type Source

// Source is the raw interface that wraps the basic Source methods. It cannot
// be used on its own; an implementation must also implement further source
// traits.
// The lifecycle of a source: Provision -> Connect -> Subscribe/Pull -> Close
type Source interface {
	Nodelet
	Connector
}

Source is the raw interface that wraps the basic Source methods. It cannot be used on its own; an implementation must also implement further source traits. The lifecycle of a source: Provision -> Connect -> Subscribe/Pull -> Close

type StatusChangeHandler

// StatusChangeHandler is the callback signature passed to Connector.Connect.
// It carries a status string and a message string.
// NOTE(review): status is presumably one of the Connection* constants — confirm.
type StatusChangeHandler func(status string, message string)

type Store

// Store is the state storage contract handed to StreamContext.WithMeta.
type Store interface {
	// SaveState takes a state map for the operator opId and the checkpoint
	// checkpointId.
	SaveState(checkpointId int64, opId string, state map[string]any) error
	// SaveCheckpoint saves the whole checkpoint state into storage.
	SaveCheckpoint(checkpointId int64) error
	// GetOpState returns the state of the operator opId as a *sync.Map.
	GetOpState(opId string) (*sync.Map, error)
	// Clean reports a failure through the returned error.
	// NOTE(review): presumably it removes the stored state — confirm.
	Clean() error
}

type StreamContext

// StreamContext is the context handed to every method of the contracts in this
// package. It embeds context.Context and adds accessors for the logger and the
// rule/op/instance identity, derivation of child contexts, tracing switches,
// keyed state handling and template/JSON-path parsing helpers.
type StreamContext interface {
	context.Context
	// Identity and environment accessors.
	GetLogger() Logger
	GetRuleId() string
	GetOpId() string
	GetInstanceId() int
	GetRootPath() string

	// WithMeta, WithInstance and WithCancel each return a StreamContext built
	// from the given values; WithCancel also returns a context.CancelFunc.
	WithMeta(ruleId string, opId string, store Store) StreamContext
	WithInstance(instanceId int) StreamContext
	WithCancel() (StreamContext, context.CancelFunc)
	// EnableTracer switches tracing on or off; IsTraceEnabled reports the
	// switch as a bool.
	EnableTracer(enabled bool)
	IsTraceEnabled() bool

	// SetError takes an error.
	// NOTE(review): presumably it records the error on the context — confirm.
	SetError(e error)

	// State handling.

	// IncrCounter takes a counter key and an amount.
	IncrCounter(key string, amount int) error
	// GetCounter returns the int stored for the counter key.
	GetCounter(key string) (int, error)
	// PutState takes a value to keep under key.
	PutState(key string, value any) error
	// GetState returns the value kept under key.
	GetState(key string) (any, error)
	// DeleteState takes the key whose state is to be deleted.
	DeleteState(key string) error

	// ParseTemplate parses the template string with the given data.
	ParseTemplate(template string, data any) (string, error)
	// ParseJsonPath parses the jsonPath string with the given data.
	ParseJsonPath(jsonPath string, data any) (any, error)
}

type TupleCollector

// TupleCollector is a Sink that additionally accepts MessageTuple items, either
// one at a time or as a MessageTupleList.
type TupleCollector interface {
	Sink
	// Collect receives a single MessageTuple and reports a failure through the
	// returned error.
	Collect(ctx StreamContext, item MessageTuple) error
	// CollectList receives a MessageTupleList and reports a failure through the
	// returned error.
	CollectList(ctx StreamContext, items MessageTupleList) error
}

type TupleIngest

// TupleIngest reads in structured data or a list of it, together with a
// metadata map and a timestamp. It supports map and []map for now.
// It is the ingest parameter of TupleSource.Subscribe and PullTupleSource.Pull.
type TupleIngest func(ctx StreamContext, data any, meta map[string]any, ts time.Time)

TupleIngest reads in structured data or a list of it. It supports map and []map for now.

type TupleSource

// TupleSource receives the non-bytes payload pushed by the external source.
type TupleSource interface {
	Source
	// Subscribe is given the ingest callback for data and the ingestError
	// callback for errors.
	// NOTE(review): whether Subscribe blocks or returns right after
	// registration is not visible here — confirm.
	Subscribe(ctx StreamContext, ingest TupleIngest, ingestError ErrorIngest) error
}

TupleSource receives the non-bytes payload pushed by the external source

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL