Documentation
¶
Index ¶
- Variables
- func AppendArrowRow(schema *arrow.Schema, recordBuilder *array.RecordBuilder, ...) error
- func ArrowArrayValueToAvroValue(arr arrow.Array, idx int) (interface{}, error)
- func ArrowToAvro(tuples arrow.Record) ([][]byte, error)
- func ArrowToAvroSchema(arrowSchema *arrow.Schema) (*goavro.Codec, error)
- func ArrowToAvroType(arrowType arrow.DataType) (string, error)
- func AvroToArrow(allocator *memory.GoAllocator, table *elements.Table, ...) (arrow.Record, error)
- func BuildTasker(ctx context.Context, logger *slog.Logger, options tasker.Options) (*tasker.Tasker, error)
- func CastAvroTypesToArrowTypes(schema *arrow.Schema, avroData map[string]interface{}) error
- func PartitionColumns(allocator *memory.GoAllocator, tuples arrow.Record, ...) ([]arrow.Array, error)
- func PartitionKeys(allocator *memory.GoAllocator, tuples arrow.Record, ...) (*array.String, error)
- func TablePartitionArrowSchema(table *elements.Table, subscriptionSourceName string) (*arrow.Schema, error)
- func TablePartitionAvroSchema(table *elements.Table, subscriptionSourceName string) (*goavro.Codec, error)
- type IInserter
- type IKeyStorage
- type ITableRegistry
- type Inserter
- func (obj *Inserter) GetPartition(ctx context.Context, tableName string, batchCount int, ...) (_ elements.Partition, _ storage.ILock, _ arrow.Record, err error)
- func (obj *Inserter) GetPartitionBatch(ctx context.Context, part elements.Partition) (_ storage.ILock, _ arrow.Record, err error)
- func (obj *Inserter) InsertTuples(ctx context.Context, tableName, sourceName string, tuples arrow.Record) error
- type InserterOptions
- type TableRegistry
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrTableAlreadyAddedToRegistry = errors.New("table already added to registry") ErrTableNotFound = errors.New("table not found") ErrSubscriptionNotFound = errors.New("subscription not found") ErrColumnNotFound = errors.New("column not found") ErrTupleColumnsDifferentThanSubscription = errors.New("tuple columns different than subscription") ErrUnsupportedArrowToAvroTypeConversion = errors.New("unsupported arrow to avro type conversion") ErrMultipleColumnsFound = errors.New("multiple columns found") ErrIntegerRangeTypeNotImplemented = errors.New("integer range type not implemented") ErrNoPartitionsAvailable = errors.New("no partitions available") ErrPartitionTuplesEmpty = errors.New("partition tuples empty") ErrPartitionColumnsEmpty = errors.New("partition columns empty") )
Functions ¶
func AppendArrowRow ¶
func ArrowToAvro ¶
* Converts an Arrow record into an Avro array of serialized rows.
func ArrowToAvroSchema ¶
func AvroToArrow ¶
func BuildTasker ¶
func PartitionColumns ¶
func PartitionColumns(allocator *memory.GoAllocator, tuples arrow.Record, columns []elements.ColumnPartition) ([]arrow.Array, error)
* Returns an array of partition arrays, one for each partitioned column. They are in the order in which the columns were passed in.
func PartitionKeys ¶
func PartitionKeys(allocator *memory.GoAllocator, tuples arrow.Record, columns []elements.ColumnPartition) (*array.String, error)
Types ¶
type IInserter ¶
type IInserter interface { GetPartitionBatch(ctx context.Context, part elements.Partition) (storage.ILock, arrow.Record, error) GetPartition(ctx context.Context, tableName string, batchCount int, batchDelay time.Duration) (elements.Partition, storage.ILock, arrow.Record, error) InsertTuples(ctx context.Context, tableName, sourceName string, tuples arrow.Record) error }
type IKeyStorage ¶
type IKeyStorage interface { AddItemsToTablePartition(context.Context, elements.Partition, [][]byte) (int64, error) GetTablePartitionItems(context.Context, elements.Partition, int) ([]string, error) GetTablePartitions(context.Context, string, uint64, int64) ([]elements.Partition, error) GetTablePartitionTimestamp(context.Context, elements.Partition) (time.Time, error) SetTablePartitionTimestamp(context.Context, elements.Partition) (bool, error) DeleteTablePartitionTimestamp(context.Context, elements.Partition) (bool, error) ClaimPartition(context.Context, elements.Partition, time.Duration) (storage.ILock, error) ReleasePartitionLock(context.Context, storage.ILock) (bool, error) }
type ITableRegistry ¶
type Inserter ¶
type Inserter struct {
// contains filtered or unexported fields
}
func NewInserter ¶
func NewInserter( logger *slog.Logger, tableRegistry ITableRegistry, keyStorage IKeyStorage, tasker *tasker.Tasker, allocator *memory.GoAllocator, options InserterOptions, ) *Inserter
func (*Inserter) GetPartition ¶
func (*Inserter) GetPartitionBatch ¶
type InserterOptions ¶
type TableRegistry ¶
type TableRegistry struct {
// contains filtered or unexported fields
}
func NewTableRegistry ¶
func NewTableRegistry(ctx context.Context, logger *slog.Logger) *TableRegistry
func (*TableRegistry) AddTables ¶
func (obj *TableRegistry) AddTables(tables ...*elements.Table) error
func (*TableRegistry) GetTable ¶
func (obj *TableRegistry) GetTable(tableName string) (*elements.Table, error)
func (*TableRegistry) TableExists ¶
func (obj *TableRegistry) TableExists(tableName string) bool
func (*TableRegistry) Tables ¶
func (obj *TableRegistry) Tables() []*elements.Table
Click to show internal directories.
Click to hide internal directories.