datasources

package
v0.9.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 15, 2025 License: MPL-2.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultWatcherBufferSize = 1024
View Source
const MongoDefaultIDField = "_id"
View Source
const MySQLDefaultIDField = "id"
View Source
const PostgresDefaultIDField = "id"
View Source
const RedisKeyspacePattern = "__keyspace@*__:"

Variables

View Source
var (
	ErrDatastoreClosed = errors.New("datasource closed")
)

Functions

func ConnectToMongoDB

func ConnectToMongoDB(ctx *context.Context, config MongoDatasourceConfigs) *mongo.Client

ConnectToMongoDB connects to MongoDB using the given configuration and returns the resulting client.

func ConnectToMySQL added in v0.7.0

func ConnectToMySQL[T any](ctx *context.Context, config MySQLDatasourceConfigs[T]) *gorm.DB

ConnectToMySQL connects to MySQL using GORM and returns the resulting *gorm.DB handle.

func ConnectToPostgreSQL added in v0.5.0

func ConnectToPostgreSQL[T any](ctx *context.Context, config PostgresDatasourceConfigs[T]) *gorm.DB

ConnectToPostgreSQL connects to PostgreSQL using GORM and returns the resulting *gorm.DB handle.

func ConnectToRedis added in v0.4.0

func ConnectToRedis(ctx *context.Context, config RedisDatasourceConfigs) *redis.Client

ConnectToRedis connects to Redis using the given configuration and returns the resulting client.

func ConvertBSON added in v0.4.0

func ConvertBSON(val any) any

ConvertBSON converts a BSON value to a generic any value

func LoadCSV

func LoadCSV(
	ctx *context.Context,
	ds Datasource,
	path string,
	batchSize uint64,
) error

LoadCSV loads CSV data from the file at path into the datasource, in batches of batchSize.

func LoadParquet added in v0.9.0

func LoadParquet(
	ctx *context.Context,
	ds Datasource,
	path string,
	batchSize uint64,
) error

LoadParquet loads Parquet data from the file at path into the datasource, in batches of batchSize.

func ParseGormFieldValue added in v0.7.0

func ParseGormFieldValue(field *schema.Field, value any) (any, error)

ParseGormFieldValue parses value according to the type of the given GORM field. It returns the parsed value, or an error if parsing fails. Note: this is a best-effort implementation and may not cover all edge cases; validate the parsed values before using them in database operations.

func RedisOutputSchemaToJSON added in v0.4.0

RedisOutputSchemaToJSON converts Redis output schema to JSON

func SaveCSV added in v0.6.0

func SaveCSV(
	ctx *context.Context,
	ds Datasource,
	path string,
	batchSize uint64,
) error

SaveCSV saves the datasource's data to a CSV file at path, in batches of batchSize.

func SaveParquet added in v0.9.0

func SaveParquet(
	ctx *context.Context,
	ds Datasource,
	path string,
	batchSize uint64,
	fields map[string]reflect.Type,
) error

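SaveParquet saves the datasource's data to a Parquet file at path, in batches of batchSize, using the field-name-to-type mapping given in fields.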

func StreamChanges

func StreamChanges(
	ctx *context.Context,
	name string,
	watcher <-chan DatasourcePushRequest,
	request *DatasourceStreamRequest) <-chan DatasourceStreamResult

StreamChanges processes a datasource change stream and adds features such as batching and de-duplication within a batch window.

Types

type Datasource

type Datasource interface {
	// Allowed actions
	Actions() *DatasourceActionResult

	// Get total count of items based on the provided request.
	// Note: The count should reflect the given 'Size' and number of 'IDs'
	Count(ctx *context.Context, request *DatasourceFetchRequest) uint64
	// Fetch data
	Fetch(ctx *context.Context, request *DatasourceFetchRequest) DatasourceFetchResult
	// Insert, Update, Delete data (in that order)
	Push(ctx *context.Context, request *DatasourcePushRequest) (DatasourcePushCount, error)
	// Listen to Change Data Streams or periodically watch for changes
	Watch(ctx *context.Context, request *DatasourceStreamRequest) <-chan DatasourceStreamResult
	// Clear data source
	Clear(ctx *context.Context) error
	// Close data source
	Close(ctx *context.Context) error

	// Import data
	Import(ctx *context.Context, request DatasourceImportRequest) error
	// Export data
	Export(ctx *context.Context, request DatasourceExportRequest) error
}

type DatasourceActionResult added in v0.8.0

type DatasourceActionResult struct {
	Read   bool
	Write  bool
	Stream bool
}

type DatasourceExportDestination added in v0.6.0

type DatasourceExportDestination string
const (
	DatasourceExportDestinationFile DatasourceExportDestination = "file"
)

type DatasourceExportRequest added in v0.6.0

type DatasourceExportRequest struct {
	Type        DatasourceExportType
	Destination DatasourceExportDestination
	Location    string
	BatchSize   uint64
}

type DatasourceExportType added in v0.6.0

type DatasourceExportType string
const (
	DatasourceExportTypeCSV     DatasourceExportType = "csv"
	DatasourceExportTypeParquet DatasourceExportType = "parquet"
)

type DatasourceFetchRequest

type DatasourceFetchRequest struct {
	// List of IDs to fetch (optional)
	IDs []string
	// Number of items to fetch. 0 to fetch all (optional)
	Size uint64
	// Offset to start fetching from. 0 to start from beginning (optional)
	Offset uint64
}

type DatasourceFetchResult

type DatasourceFetchResult struct {
	Err  error
	Docs []map[string]any
	// Start offset, zero-based (the first item has offset 0)
	Start uint64
	// End offset, zero-based (the first item has offset 0)
	End uint64
}

type DatasourceImportRequest added in v0.6.0

type DatasourceImportRequest struct {
	Type      DatasourceImportType
	Source    DatasourceImportSource
	Location  string
	BatchSize uint64
}

type DatasourceImportSource added in v0.6.0

type DatasourceImportSource string
const (
	DatasourceImportSourceFile DatasourceImportSource = "file"
)

type DatasourceImportType added in v0.6.0

type DatasourceImportType string
const (
	DatasourceImportTypeCSV     DatasourceImportType = "csv"
	DatasourceImportTypeParquet DatasourceImportType = "parquet"
)

type DatasourcePushCount

type DatasourcePushCount struct {
	Inserts uint64
	Updates uint64
	Deletes uint64
}

type DatasourcePushRequest

type DatasourcePushRequest struct {
	Inserts []map[string]any
	Updates []map[string]any
	Deletes []string
}

type DatasourceStreamRequest

type DatasourceStreamRequest struct {
	// Number of items to batch. 0 to disable batching (optional)
	BatchSize uint64
	// How long to wait to accumulate batch (optional)
	BatchWindowSeconds uint64
}

type DatasourceStreamResult

type DatasourceStreamResult struct {
	Err  error
	Docs DatasourcePushRequest
}

type DatasourceTransformer

type DatasourceTransformer func(data map[string]any) (map[string]any, error)

type MemoryDatasource

type MemoryDatasource struct {
	// contains filtered or unexported fields
}

func NewMemoryDatasource

func NewMemoryDatasource(name string, idField string) *MemoryDatasource

func (*MemoryDatasource) Actions added in v0.8.0

func (*MemoryDatasource) Clear

func (m *MemoryDatasource) Clear(_ *context.Context) error

func (*MemoryDatasource) Close added in v0.4.0

func (m *MemoryDatasource) Close(ctx *context.Context) error

func (*MemoryDatasource) Count

func (*MemoryDatasource) Export added in v0.6.0

func (*MemoryDatasource) Fetch

func (*MemoryDatasource) Import added in v0.6.0

func (*MemoryDatasource) Push

func (*MemoryDatasource) Watch

type MongoDatasource

type MongoDatasource struct {
	// contains filtered or unexported fields
}

func NewMongoDatasource

func NewMongoDatasource(ctx *context.Context,
	config MongoDatasourceConfigs,
) *MongoDatasource

NewMongoDatasource creates and returns a new instance of MongoDatasource.

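  • ctx: Context.
  • config: Connection and collection settings — the database connection URI (URI), the database name (DatabaseName) and the collection name (CollectionName) — plus optional filter, sort, custom client, transformer and init hooks.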

func (*MongoDatasource) Actions added in v0.8.0

func (ds *MongoDatasource) Actions() *DatasourceActionResult

func (*MongoDatasource) Clear

func (ds *MongoDatasource) Clear(ctx *context.Context) error

Clear data source

func (*MongoDatasource) Client

func (ds *MongoDatasource) Client() *mongo.Client

func (*MongoDatasource) Close added in v0.4.0

func (ds *MongoDatasource) Close(ctx *context.Context) error

Close data source

func (*MongoDatasource) Count

func (ds *MongoDatasource) Count(ctx *context.Context, request *DatasourceFetchRequest) uint64

Get total count

func (*MongoDatasource) Export added in v0.6.0

func (ds *MongoDatasource) Export(ctx *context.Context, request DatasourceExportRequest) error

Export data from data source

func (*MongoDatasource) Fetch

Get data

func (*MongoDatasource) Import added in v0.6.0

func (ds *MongoDatasource) Import(ctx *context.Context, request DatasourceImportRequest) error

Import data into data source

func (*MongoDatasource) Push

Insert/Update/Delete data

func (*MongoDatasource) Watch

Listen to Change Data Streams (CDC) if available

type MongoDatasourceConfigs

type MongoDatasourceConfigs struct {
	URI            string
	DatabaseName   string
	CollectionName string

	// MongoDB Filter Query
	Filter map[string]any
	// MongoDB Sort Query
	Sort map[string]any
	// Use accurate counting instead of estimated count (slower)
	AccurateCount bool

	// Provide custom mongo client
	WithClient func(ctx *context.Context, uri string) (*mongo.Client, error)
	// Provide custom transformer. E.g To convert item's ID to mongo ID
	WithTransformer DatasourceTransformer
	// Perform actions on init
	OnInit func(client *mongo.Client) error
}

type MySQLConnectionInfo added in v0.7.0

type MySQLConnectionInfo struct {
	Addr     string
	User     string
	Password string
	Database string
}

MySQLConnectionInfo holds connection details parsed from DSN

type MySQLDatasource added in v0.7.0

type MySQLDatasource[T any] struct {
	// contains filtered or unexported fields
}

func NewMySQLDatasource added in v0.7.0

func NewMySQLDatasource[T any](ctx *context.Context, config MySQLDatasourceConfigs[T]) *MySQLDatasource[T]

NewMySQLDatasource creates and returns a new instance of MySQLDatasource

func (*MySQLDatasource[T]) Actions added in v0.8.0

func (ds *MySQLDatasource[T]) Actions() *DatasourceActionResult

func (*MySQLDatasource[T]) Clear added in v0.7.0

func (ds *MySQLDatasource[T]) Clear(ctx *context.Context) error

Clear data source

func (*MySQLDatasource[T]) Close added in v0.7.0

func (ds *MySQLDatasource[T]) Close(ctx *context.Context) error

Close data source

func (*MySQLDatasource[T]) Count added in v0.7.0

func (ds *MySQLDatasource[T]) Count(ctx *context.Context, request *DatasourceFetchRequest) uint64

Get total count

func (*MySQLDatasource[T]) DB added in v0.7.0

func (ds *MySQLDatasource[T]) DB() *gorm.DB

func (*MySQLDatasource[T]) Export added in v0.7.0

func (ds *MySQLDatasource[T]) Export(ctx *context.Context, request DatasourceExportRequest) error

Export data from the data source

func (*MySQLDatasource[T]) Fetch added in v0.7.0

Get data

func (*MySQLDatasource[T]) Import added in v0.7.0

func (ds *MySQLDatasource[T]) Import(ctx *context.Context, request DatasourceImportRequest) error

Import data into the data source

func (*MySQLDatasource[T]) Push added in v0.7.0

Insert/Update/Delete data

func (*MySQLDatasource[T]) Watch added in v0.7.0

Watch listens to the MySQL binlog replication stream using the go-mysql canal package.

type MySQLDatasourceConfigs added in v0.7.0

type MySQLDatasourceConfigs[T any] struct {
	// Database connection string
	DSN string
	// Table name to work with
	TableName string
	// GORM model struct for the table.
	// @see: https://gorm.io/docs/models.html
	Model *T
	// ID Field for the table. Default is "id".
	// This would very likely correspond to the primary key field in the table
	IDField string
	// Database flavor: "mysql" (default) or "mariadb"
	DBFlavor MySQLFlavor

	// Filter conditions (WHERE clause)
	Filter MySQLDatasourceFilter
	// Sort order (ORDER BY clause)
	Sort map[string]any

	// Provide custom GORM DB instance
	WithDB func(ctx *context.Context, dsn string) (*gorm.DB, error)
	// Provide custom transformer
	WithTransformer MySQLDatasourceTransformer[T]
	// Perform actions on init
	OnInit func(db *gorm.DB) error

	// Binlog replication settings
	DisableReplication  bool
	BinlogStartPosition mysql.Position

	// Connection pool settings
	DBMaxIdleConns    int
	DBMaxOpenConns    int
	DBMaxConnLifetime time.Duration

	// Whether to disable auto-migration of the table schema
	// Use this if you want to manage the table schema manually
	//   or if the table already exists with a different schema
	DisableAutoMigrate bool
}

type MySQLDatasourceFilter added in v0.7.0

type MySQLDatasourceFilter struct {
	// SQL WHERE clause (without the "WHERE" keyword)
	// Use "?" for parameters, e.g. "status = ? AND age > ?"
	Query string
	// Query parameters
	Params []any
}

type MySQLDatasourceTransformer added in v0.7.0

type MySQLDatasourceTransformer[T any] func(data map[string]any) (T, error)

type MySQLEventHandler added in v0.7.0

type MySQLEventHandler[T any] struct {
	// contains filtered or unexported fields
}

MySQLEventHandler handles MySQL binlog events

func (*MySQLEventHandler[T]) OnDDL added in v0.7.0

func (h *MySQLEventHandler[T]) OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error

OnDDL handles DDL events

func (*MySQLEventHandler[T]) OnGTID added in v0.7.0

OnGTID handles GTID events

func (*MySQLEventHandler[T]) OnPosSynced added in v0.7.0

func (h *MySQLEventHandler[T]) OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error

OnPosSynced handles position sync events

func (*MySQLEventHandler[T]) OnRotate added in v0.7.0

OnRotate handles binlog rotation events

func (*MySQLEventHandler[T]) OnRow added in v0.7.0

func (h *MySQLEventHandler[T]) OnRow(e *canal.RowsEvent) error

OnRow handles row-level binlog events

func (*MySQLEventHandler[T]) OnRowsQueryEvent added in v0.7.0

func (h *MySQLEventHandler[T]) OnRowsQueryEvent(e *replication.RowsQueryEvent) error

OnRowsQueryEvent handles rows query events

func (*MySQLEventHandler[T]) OnTableChanged added in v0.7.0

func (h *MySQLEventHandler[T]) OnTableChanged(header *replication.EventHeader, schema, table string) error

OnTableChanged handles table structure changes

func (*MySQLEventHandler[T]) OnXID added in v0.7.0

func (h *MySQLEventHandler[T]) OnXID(header *replication.EventHeader, nextPos mysql.Position) error

OnXID handles transaction commit events

func (*MySQLEventHandler[T]) String added in v0.7.0

func (h *MySQLEventHandler[T]) String() string

String returns string representation

type MySQLFlavor added in v0.8.0

type MySQLFlavor string
const (
	MySQLFlavorDefault MySQLFlavor = "mysql"
	MySQLFlavorMariaDB MySQLFlavor = "mariadb"
)

type PostgresDatasource added in v0.5.0

type PostgresDatasource[T any] struct {
	// contains filtered or unexported fields
}

func NewPostgresDatasource added in v0.5.0

func NewPostgresDatasource[T any](ctx *context.Context, config PostgresDatasourceConfigs[T]) *PostgresDatasource[T]

NewPostgresDatasource creates and returns a new instance of PostgresDatasource

func (*PostgresDatasource[T]) Actions added in v0.8.0

func (ds *PostgresDatasource[T]) Actions() *DatasourceActionResult

func (*PostgresDatasource[T]) Clear added in v0.5.0

func (ds *PostgresDatasource[T]) Clear(ctx *context.Context) error

Clear data source

func (*PostgresDatasource[T]) Close added in v0.5.0

func (ds *PostgresDatasource[T]) Close(ctx *context.Context) error

Close data source

func (*PostgresDatasource[T]) Count added in v0.5.0

func (ds *PostgresDatasource[T]) Count(ctx *context.Context, request *DatasourceFetchRequest) uint64

Get total count

func (*PostgresDatasource[T]) DB added in v0.5.0

func (ds *PostgresDatasource[T]) DB() *gorm.DB

func (*PostgresDatasource[T]) Export added in v0.6.0

func (ds *PostgresDatasource[T]) Export(ctx *context.Context, request DatasourceExportRequest) error

Export data from the data source

func (*PostgresDatasource[T]) Fetch added in v0.5.0

Get data

func (*PostgresDatasource[T]) Import added in v0.6.0

func (ds *PostgresDatasource[T]) Import(ctx *context.Context, request DatasourceImportRequest) error

Import data into the data source

func (*PostgresDatasource[T]) Push added in v0.5.0

Insert/Update/Delete data

func (*PostgresDatasource[T]) Watch added in v0.5.0

Watch listens to the logical replication stream using the pgx replication protocol.

type PostgresDatasourceConfigs added in v0.5.0

type PostgresDatasourceConfigs[T any] struct {
	// Database connection string
	DSN string
	// Table name to work with
	TableName string
	// GORM model struct for the table.
	// @see: https://gorm.io/docs/models.html
	Model *T
	// ID Field for the table. Default is "id".
	// This would very likely correspond to the primary key field in the table
	IDField string

	// Filter conditions (WHERE clause)
	Filter PostgresDatasourceFilter
	// Sort order (ORDER BY clause)
	Sort map[string]any

	// Provide custom GORM DB instance
	WithDB func(ctx *context.Context, dsn string) (*gorm.DB, error)
	// Provide custom transformer
	WithTransformer PostgresDatasourceTransformer[T]
	// Perform actions on init
	OnInit func(db *gorm.DB) error

	// Logical replication settings
	PublicationName     string
	ReplicationSlotName string
	DisableReplication  bool

	// Connection pool settings
	DBMaxIdleConns    int
	DBMaxOpenConns    int
	DBMaxConnLifetime time.Duration

	// Whether to disable auto-migration of the table schema
	// Use this if you want to manage the table schema manually
	//   or if the table already exists with a different schema
	DisableAutoMigrate bool
}

type PostgresDatasourceFilter added in v0.5.0

type PostgresDatasourceFilter struct {
	// SQL WHERE clause (without the "WHERE" keyword)
	// Use "?" for parameters, e.g. "status = ? AND age > ?"
	Query string
	// Query parameters
	Params []any
}

type PostgresDatasourceTransformer added in v0.5.0

type PostgresDatasourceTransformer[T any] func(data map[string]any) (T, error)

type RedisDatasource added in v0.4.0

type RedisDatasource struct {
	// contains filtered or unexported fields
}

func NewRedisDatasource added in v0.4.0

func NewRedisDatasource(ctx *context.Context, config RedisDatasourceConfigs) *RedisDatasource

NewRedisDatasource creates and returns a new instance of RedisDatasource

func (*RedisDatasource) Actions added in v0.8.0

func (ds *RedisDatasource) Actions() *DatasourceActionResult

func (*RedisDatasource) Clear added in v0.4.0

func (ds *RedisDatasource) Clear(ctx *context.Context) error

Clear data source

func (*RedisDatasource) Client added in v0.4.0

func (ds *RedisDatasource) Client() *redis.Client

func (*RedisDatasource) Close added in v0.4.0

func (ds *RedisDatasource) Close(ctx *context.Context) error

Close data source

func (*RedisDatasource) Count added in v0.4.0

func (ds *RedisDatasource) Count(ctx *context.Context, request *DatasourceFetchRequest) uint64

Get total count

func (*RedisDatasource) Export added in v0.6.0

func (ds *RedisDatasource) Export(ctx *context.Context, request DatasourceExportRequest) error

Export data from data source

func (*RedisDatasource) Fetch added in v0.4.0

Get data

func (*RedisDatasource) Import added in v0.6.0

func (ds *RedisDatasource) Import(ctx *context.Context, request DatasourceImportRequest) error

Import data into data source

func (*RedisDatasource) Push added in v0.4.0

Insert/Update/Delete data

func (*RedisDatasource) Watch added in v0.4.0

Listen to keyspace notifications if available

type RedisDatasourceConfigs added in v0.4.0

type RedisDatasourceConfigs struct {
	// Redis connection URI
	URI string
	// Key prefix to use for saving and fetching items. E.g. `user:profile:`
	KeyPrefix string
	// ID Field for input JSON
	IDField string
	// Size of scan operations. E.g. '1000' to scan a thousand at a time. Min = 1.
	ScanSize uint64

	// Custom redis client provider
	WithClient func(ctx *context.Context, uri string) (*redis.Client, error)
	// Transform data before saving to redis
	WithTransformer RedisDatasourceTransformer
	// Perform actions on init
	OnInit func(client *redis.Client) error
}

type RedisDatasourceTransformer added in v0.4.0

type RedisDatasourceTransformer func(data map[string]any) (RedisInputSchema, error)

type RedisDsType added in v0.4.0

type RedisDsType string
const (
	RedisDsString RedisDsType = "string"
	RedisDsList   RedisDsType = "list"
	RedisDsSet    RedisDsType = "set"
	RedisDsHash   RedisDsType = "hash"
	RedisDsJSON   RedisDsType = "rejson-rl"
	RedisDsZSet   RedisDsType = "zset"
)

type RedisInputSchema added in v0.4.0

type RedisInputSchema struct {
	String    string
	List      []any
	Set       []string
	Hash      map[string]string
	JSON      map[string]any
	SortedSet []map[string]float64
}

func JSONToRedisHashInputSchema added in v0.4.0

func JSONToRedisHashInputSchema(data map[string]any) RedisInputSchema

JSONToRedisHashInputSchema converts JSON to Redis input schema (using Redis Hash)

func JSONToRedisJSONInputSchema added in v0.4.0

func JSONToRedisJSONInputSchema(data map[string]any) RedisInputSchema

JSONToRedisJSONInputSchema converts JSON to Redis input schema (using ReJSON)

type RedisOutputHashSchema added in v0.4.0

type RedisOutputHashSchema map[string]string

type RedisOutputJSONSchema added in v0.4.0

type RedisOutputJSONSchema map[string]any

type RedisOutputListSchema added in v0.4.0

type RedisOutputListSchema struct {
	Key     string   `json:"key"`
	Members []string `json:"array"`
}

type RedisOutputSetSchema added in v0.4.0

type RedisOutputSetSchema struct {
	Key     string   `json:"key"`
	Members []string `json:"members"`
}

type RedisOutputSortedSetSchema added in v0.4.0

type RedisOutputSortedSetSchema struct {
	Key     string                 `json:"key"`
	Members []RedisSortedSetSchema `json:"members"`
}

type RedisOutputStringSchema added in v0.4.0

type RedisOutputStringSchema struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

type RedisSortedSetSchema added in v0.4.0

type RedisSortedSetSchema struct {
	Value string      `json:"value"`
	Score json.Number `json:"score"`
}

type ReplicationEvent added in v0.5.0

type ReplicationEvent struct {
	Action ReplicationEventAction `json:"action"`
	Data   map[string]any         `json:"data"`
}

type ReplicationEventAction added in v0.5.0

type ReplicationEventAction string

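ReplicationEvent represents a logical replication event; ReplicationEventAction identifies the kind of change the event carries (INSERT, UPDATE, or DELETE).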

const (
	ReplicationEventActionInsert ReplicationEventAction = "INSERT"
	ReplicationEventActionUpdate ReplicationEventAction = "UPDATE"
	ReplicationEventActionDelete ReplicationEventAction = "DELETE"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL