operations

package
v0.1.4 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 12, 2024 License: Apache-2.0 Imports: 16 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTableAlreadyAddedToRegistry           = errors.New("table already added to registry")
	ErrTableNotFound                         = errors.New("table not found")
	ErrSubscriptionNotFound                  = errors.New("subscription not found")
	ErrColumnNotFound                        = errors.New("column not found")
	ErrTupleColumnsDifferentThanSubscription = errors.New("tuple columns different than subscription")
	ErrUnsupportedArrowToAvroTypeConversion  = errors.New("unsupported arrow to avro type conversion")
	ErrMultipleColumnsFound                  = errors.New("multiple columns found")
	ErrIntegerRangeTypeNotImplemented        = errors.New("integer range type not implemented")
	ErrNoPartitionsAvailable                 = errors.New("no partitions available")
	ErrPartitionTuplesEmpty                  = errors.New("partition tuples empty")
	ErrPartitionColumnsEmpty                 = errors.New("partition columns empty")
)

Functions

func AppendArrowRow

func AppendArrowRow(schema *arrow.Schema, recordBuilder *array.RecordBuilder, avroData map[string]interface{}) error

func ArrowArrayValueToAvroValue

func ArrowArrayValueToAvroValue(arr arrow.Array, idx int) (interface{}, error)

func ArrowToAvro

func ArrowToAvro(tuples arrow.Record) ([][]byte, error)

Converts an Arrow record to an array of Avro-serialized rows.

func ArrowToAvroSchema

func ArrowToAvroSchema(arrowSchema *arrow.Schema) (*goavro.Codec, error)

func ArrowToAvroType

func ArrowToAvroType(arrowType arrow.DataType) (string, error)

func AvroToArrow

func AvroToArrow(allocator *memory.GoAllocator, table *elements.Table, subscriptionSourceName string, tuples []string) (arrow.Record, error)

func BuildTasker

func BuildTasker(ctx context.Context, logger *slog.Logger, options tasker.Options) (*tasker.Tasker, error)

func CastAvroTypesToArrowTypes

func CastAvroTypesToArrowTypes(schema *arrow.Schema, avroData map[string]interface{}) error

func PartitionColumns

func PartitionColumns(allocator *memory.GoAllocator, tuples arrow.Record, columns []elements.ColumnPartition) ([]arrow.Array, error)

Returns an array of partition arrays, one for each partitioned column. They are in the order in which the columns were passed in.

func PartitionKeys

func PartitionKeys(allocator *memory.GoAllocator, tuples arrow.Record, columns []elements.ColumnPartition) (*array.String, error)

func TablePartitionArrowSchema

func TablePartitionArrowSchema(table *elements.Table, subscriptionSourceName string) (*arrow.Schema, error)

func TablePartitionAvroSchema

func TablePartitionAvroSchema(table *elements.Table, subscriptionSourceName string) (*goavro.Codec, error)

Types

type IInserter

type IInserter interface {
	GetPartitionBatch(ctx context.Context, part elements.Partition) (storage.ILock, arrow.Record, error)
	GetPartition(ctx context.Context, tableName string, batchCount int, batchDelay time.Duration) (elements.Partition, storage.ILock, arrow.Record, error)
	InsertTuples(ctx context.Context, tableName, sourceName string, tuples arrow.Record) error
}

type IKeyStorage

type IKeyStorage interface {
	AddItemsToTablePartition(context.Context, elements.Partition, [][]byte) (int64, error)

	GetTablePartitionItems(context.Context, elements.Partition, int) ([]string, error)
	GetTablePartitions(context.Context, string, uint64, int64) ([]elements.Partition, error)

	GetTablePartitionTimestamp(context.Context, elements.Partition) (time.Time, error)
	SetTablePartitionTimestamp(context.Context, elements.Partition) (bool, error)
	DeleteTablePartitionTimestamp(context.Context, elements.Partition) (bool, error)

	ClaimPartition(context.Context, elements.Partition, time.Duration) (storage.ILock, error)
	ReleasePartitionLock(context.Context, storage.ILock) (bool, error)
}

type ITableRegistry

type ITableRegistry interface {
	AddTables(tables ...*elements.Table) error
	GetTable(tableName string) (*elements.Table, error)
	TableExists(tableName string) bool
	Tables() []*elements.Table
}

type Inserter

type Inserter struct {
	// contains filtered or unexported fields
}

func NewInserter

func NewInserter(
	logger *slog.Logger,
	tableRegistry ITableRegistry,
	keyStorage IKeyStorage,
	tasker *tasker.Tasker,
	allocator *memory.GoAllocator,
	options InserterOptions,
) *Inserter

func (*Inserter) GetPartition

func (obj *Inserter) GetPartition(
	ctx context.Context,
	tableName string,
	batchCount int,
	batchDelay time.Duration,
) (_ elements.Partition, _ storage.ILock, _ arrow.Record, err error)

func (*Inserter) GetPartitionBatch

func (obj *Inserter) GetPartitionBatch(
	ctx context.Context,
	part elements.Partition,
) (_ storage.ILock, _ arrow.Record, err error)

func (*Inserter) InsertTuples

func (obj *Inserter) InsertTuples(ctx context.Context, tableName, sourceName string, tuples arrow.Record) error

type InserterOptions

type InserterOptions struct {
	PartitionLockDuration time.Duration
}

type TableRegistry

type TableRegistry struct {
	// contains filtered or unexported fields
}

func NewTableRegistry

func NewTableRegistry(ctx context.Context, logger *slog.Logger) *TableRegistry

func (*TableRegistry) AddTables

func (obj *TableRegistry) AddTables(tables ...*elements.Table) error

func (*TableRegistry) GetTable

func (obj *TableRegistry) GetTable(tableName string) (*elements.Table, error)

func (*TableRegistry) TableExists

func (obj *TableRegistry) TableExists(tableName string) bool

func (*TableRegistry) Tables

func (obj *TableRegistry) Tables() []*elements.Table

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL