Documentation
¶
Index ¶
- Constants
- Variables
- func ConnectToMongoDB(ctx *context.Context, config MongoDatasourceConfigs) *mongo.Client
- func ConnectToMySQL[T any](ctx *context.Context, config MySQLDatasourceConfigs[T]) *gorm.DB
- func ConnectToPostgreSQL[T any](ctx *context.Context, config PostgresDatasourceConfigs[T]) *gorm.DB
- func ConnectToRedis(ctx *context.Context, config RedisDatasourceConfigs) *redis.Client
- func ConvertBSON(val any) any
- func LoadCSV(ctx *context.Context, ds Datasource, path string, batchSize uint64) error
- func LoadParquet(ctx *context.Context, ds Datasource, path string, batchSize uint64) error
- func ParseGormFieldValue(field *schema.Field, value any) (any, error)
- func RedisOutputSchemaToJSON[V ...](schema V) map[string]any
- func SaveCSV(ctx *context.Context, ds Datasource, path string, batchSize uint64) error
- func SaveParquet(ctx *context.Context, ds Datasource, path string, batchSize uint64, ...) error
- func StreamChanges(ctx *context.Context, name string, watcher <-chan DatasourcePushRequest, ...) <-chan DatasourceStreamResult
- type Datasource
- type DatasourceActionResult
- type DatasourceExportDestination
- type DatasourceExportRequest
- type DatasourceExportType
- type DatasourceFetchRequest
- type DatasourceFetchResult
- type DatasourceImportRequest
- type DatasourceImportSource
- type DatasourceImportType
- type DatasourcePushCount
- type DatasourcePushRequest
- type DatasourceStreamRequest
- type DatasourceStreamResult
- type DatasourceTransformer
- type MemoryDatasource
- func (m *MemoryDatasource) Actions() *DatasourceActionResult
- func (m *MemoryDatasource) Clear(_ *context.Context) error
- func (m *MemoryDatasource) Close(ctx *context.Context) error
- func (m *MemoryDatasource) Count(_ *context.Context, request *DatasourceFetchRequest) uint64
- func (m *MemoryDatasource) Export(ctx *context.Context, request DatasourceExportRequest) error
- func (m *MemoryDatasource) Fetch(ctx *context.Context, request *DatasourceFetchRequest) DatasourceFetchResult
- func (m *MemoryDatasource) Import(ctx *context.Context, request DatasourceImportRequest) error
- func (m *MemoryDatasource) Push(_ *context.Context, request *DatasourcePushRequest) (DatasourcePushCount, error)
- func (m *MemoryDatasource) Watch(ctx *context.Context, request *DatasourceStreamRequest) <-chan DatasourceStreamResult
- type MongoDatasource
- func (ds *MongoDatasource) Actions() *DatasourceActionResult
- func (ds *MongoDatasource) Clear(ctx *context.Context) error
- func (ds *MongoDatasource) Client() *mongo.Client
- func (ds *MongoDatasource) Close(ctx *context.Context) error
- func (ds *MongoDatasource) Count(ctx *context.Context, request *DatasourceFetchRequest) uint64
- func (ds *MongoDatasource) Export(ctx *context.Context, request DatasourceExportRequest) error
- func (ds *MongoDatasource) Fetch(ctx *context.Context, request *DatasourceFetchRequest) DatasourceFetchResult
- func (ds *MongoDatasource) Import(ctx *context.Context, request DatasourceImportRequest) error
- func (ds *MongoDatasource) Push(ctx *context.Context, request *DatasourcePushRequest) (DatasourcePushCount, error)
- func (ds *MongoDatasource) Watch(ctx *context.Context, request *DatasourceStreamRequest) <-chan DatasourceStreamResult
- type MongoDatasourceConfigs
- type MySQLConnectionInfo
- type MySQLDatasource
- func (ds *MySQLDatasource[T]) Actions() *DatasourceActionResult
- func (ds *MySQLDatasource[T]) Clear(ctx *context.Context) error
- func (ds *MySQLDatasource[T]) Close(ctx *context.Context) error
- func (ds *MySQLDatasource[T]) Count(ctx *context.Context, request *DatasourceFetchRequest) uint64
- func (ds *MySQLDatasource[T]) DB() *gorm.DB
- func (ds *MySQLDatasource[T]) Export(ctx *context.Context, request DatasourceExportRequest) error
- func (ds *MySQLDatasource[T]) Fetch(ctx *context.Context, request *DatasourceFetchRequest) DatasourceFetchResult
- func (ds *MySQLDatasource[T]) Import(ctx *context.Context, request DatasourceImportRequest) error
- func (ds *MySQLDatasource[T]) Push(ctx *context.Context, request *DatasourcePushRequest) (DatasourcePushCount, error)
- func (ds *MySQLDatasource[T]) Watch(ctx *context.Context, request *DatasourceStreamRequest) <-chan DatasourceStreamResult
- type MySQLDatasourceConfigs
- type MySQLDatasourceFilter
- type MySQLDatasourceTransformer
- type MySQLEventHandler
- func (h *MySQLEventHandler[T]) OnDDL(header *replication.EventHeader, nextPos mysql.Position, ...) error
- func (h *MySQLEventHandler[T]) OnGTID(header *replication.EventHeader, gtid mysql.BinlogGTIDEvent) error
- func (h *MySQLEventHandler[T]) OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, ...) error
- func (h *MySQLEventHandler[T]) OnRotate(header *replication.EventHeader, e *replication.RotateEvent) error
- func (h *MySQLEventHandler[T]) OnRow(e *canal.RowsEvent) error
- func (h *MySQLEventHandler[T]) OnRowsQueryEvent(e *replication.RowsQueryEvent) error
- func (h *MySQLEventHandler[T]) OnTableChanged(header *replication.EventHeader, schema, table string) error
- func (h *MySQLEventHandler[T]) OnXID(header *replication.EventHeader, nextPos mysql.Position) error
- func (h *MySQLEventHandler[T]) String() string
- type MySQLFlavor
- type PostgresDatasource
- func (ds *PostgresDatasource[T]) Actions() *DatasourceActionResult
- func (ds *PostgresDatasource[T]) Clear(ctx *context.Context) error
- func (ds *PostgresDatasource[T]) Close(ctx *context.Context) error
- func (ds *PostgresDatasource[T]) Count(ctx *context.Context, request *DatasourceFetchRequest) uint64
- func (ds *PostgresDatasource[T]) DB() *gorm.DB
- func (ds *PostgresDatasource[T]) Export(ctx *context.Context, request DatasourceExportRequest) error
- func (ds *PostgresDatasource[T]) Fetch(ctx *context.Context, request *DatasourceFetchRequest) DatasourceFetchResult
- func (ds *PostgresDatasource[T]) Import(ctx *context.Context, request DatasourceImportRequest) error
- func (ds *PostgresDatasource[T]) Push(ctx *context.Context, request *DatasourcePushRequest) (DatasourcePushCount, error)
- func (ds *PostgresDatasource[T]) Watch(ctx *context.Context, request *DatasourceStreamRequest) <-chan DatasourceStreamResult
- type PostgresDatasourceConfigs
- type PostgresDatasourceFilter
- type PostgresDatasourceTransformer
- type RedisDatasource
- func (ds *RedisDatasource) Actions() *DatasourceActionResult
- func (ds *RedisDatasource) Clear(ctx *context.Context) error
- func (ds *RedisDatasource) Client() *redis.Client
- func (ds *RedisDatasource) Close(ctx *context.Context) error
- func (ds *RedisDatasource) Count(ctx *context.Context, request *DatasourceFetchRequest) uint64
- func (ds *RedisDatasource) Export(ctx *context.Context, request DatasourceExportRequest) error
- func (ds *RedisDatasource) Fetch(ctx *context.Context, request *DatasourceFetchRequest) DatasourceFetchResult
- func (ds *RedisDatasource) Import(ctx *context.Context, request DatasourceImportRequest) error
- func (ds *RedisDatasource) Push(ctx *context.Context, request *DatasourcePushRequest) (DatasourcePushCount, error)
- func (ds *RedisDatasource) Watch(ctx *context.Context, request *DatasourceStreamRequest) <-chan DatasourceStreamResult
- type RedisDatasourceConfigs
- type RedisDatasourceTransformer
- type RedisDsType
- type RedisInputSchema
- type RedisOutputHashSchema
- type RedisOutputJSONSchema
- type RedisOutputListSchema
- type RedisOutputSetSchema
- type RedisOutputSortedSetSchema
- type RedisOutputStringSchema
- type RedisSortedSetSchema
- type ReplicationEvent
- type ReplicationEventAction
Constants ¶
const DefaultWatcherBufferSize = 1024
const MongoDefaultIDField = "_id"
const MySQLDefaultIDField = "id"
const PostgresDefaultIDField = "id"
const RedisKeyspacePattern = "__keyspace@*__:"
Variables ¶
var (
ErrDatastoreClosed = errors.New("datasource closed")
)
Functions ¶
func ConnectToMongoDB ¶
func ConnectToMongoDB(ctx *context.Context, config MongoDatasourceConfigs) *mongo.Client
Connect to MongoDB
func ConnectToMySQL ¶ added in v0.7.0
Connect to MySQL using GORM
func ConnectToPostgreSQL ¶ added in v0.5.0
Connect to PostgreSQL using GORM
func ConnectToRedis ¶ added in v0.4.0
func ConnectToRedis(ctx *context.Context, config RedisDatasourceConfigs) *redis.Client
Connect to Redis
func ConvertBSON ¶ added in v0.4.0
ConvertBSON converts a BSON value to a generic any value
func LoadParquet ¶ added in v0.9.0
Load Parquet data into datasource
func ParseGormFieldValue ¶ added in v0.7.0
Parse value based on GORM field type Returns parsed value or error if parsing fails Note: This is a best-effort implementation and may not cover all edge cases. It is recommended to validate the parsed values before using them in database operations.
func RedisOutputSchemaToJSON ¶ added in v0.4.0
func RedisOutputSchemaToJSON[V RedisOutputStringSchema | RedisOutputListSchema | RedisOutputSetSchema | RedisOutputHashSchema | RedisOutputJSONSchema | RedisOutputSortedSetSchema]( schema V, ) map[string]any
RedisOutputSchemaToJSON converts Redis output schema to JSON
func SaveParquet ¶ added in v0.9.0
func SaveParquet( ctx *context.Context, ds Datasource, path string, batchSize uint64, fields map[string]reflect.Type, ) error
Save datasource into Parquet
func StreamChanges ¶
func StreamChanges( ctx *context.Context, name string, watcher <-chan DatasourcePushRequest, request *DatasourceStreamRequest) <-chan DatasourceStreamResult
Process datasource change stream with additional features such as batching and de-duplication within a batch window
Types ¶
type Datasource ¶
type Datasource interface {
// Allowed actions
Actions() *DatasourceActionResult
// Get total count of items based on the provided request.
// Note: The count should reflect the given 'Size' and number of 'IDs'
Count(ctx *context.Context, request *DatasourceFetchRequest) uint64
// Fetch data
Fetch(ctx *context.Context, request *DatasourceFetchRequest) DatasourceFetchResult
// Insert, Update, Delete data (in that order)
Push(ctx *context.Context, request *DatasourcePushRequest) (DatasourcePushCount, error)
// Listen to Change Data Streams or periodically watch for changes
Watch(ctx *context.Context, request *DatasourceStreamRequest) <-chan DatasourceStreamResult
// Clear data source
Clear(ctx *context.Context) error
// Close data source
Close(ctx *context.Context) error
// Import data
Import(ctx *context.Context, request DatasourceImportRequest) error
// Export data
Export(ctx *context.Context, request DatasourceExportRequest) error
}
type DatasourceActionResult ¶ added in v0.8.0
type DatasourceExportDestination ¶ added in v0.6.0
type DatasourceExportDestination string
const (
DatasourceExportDestinationFile DatasourceExportDestination = "file"
)
type DatasourceExportRequest ¶ added in v0.6.0
type DatasourceExportRequest struct {
Type DatasourceExportType
Destination DatasourceExportDestination
Location string
BatchSize uint64
}
type DatasourceExportType ¶ added in v0.6.0
type DatasourceExportType string
const ( DatasourceExportTypeCSV DatasourceExportType = "csv" DatasourceExportTypeParquet DatasourceExportType = "parquet" )
type DatasourceFetchRequest ¶
type DatasourceFetchResult ¶
type DatasourceImportRequest ¶ added in v0.6.0
type DatasourceImportRequest struct {
Type DatasourceImportType
Source DatasourceImportSource
Location string
BatchSize uint64
}
type DatasourceImportSource ¶ added in v0.6.0
type DatasourceImportSource string
const (
DatasourceImportSourceFile DatasourceImportSource = "file"
)
type DatasourceImportType ¶ added in v0.6.0
type DatasourceImportType string
const ( DatasourceImportTypeCSV DatasourceImportType = "csv" DatasourceImportTypeParquet DatasourceImportType = "parquet" )
type DatasourcePushCount ¶
type DatasourcePushRequest ¶
type DatasourceStreamRequest ¶
type DatasourceStreamResult ¶
type DatasourceStreamResult struct {
Err error
Docs DatasourcePushRequest
}
type DatasourceTransformer ¶
type MemoryDatasource ¶
type MemoryDatasource struct {
// contains filtered or unexported fields
}
func NewMemoryDatasource ¶
func NewMemoryDatasource(name string, idField string) *MemoryDatasource
func (*MemoryDatasource) Actions ¶ added in v0.8.0
func (m *MemoryDatasource) Actions() *DatasourceActionResult
func (*MemoryDatasource) Close ¶ added in v0.4.0
func (m *MemoryDatasource) Close(ctx *context.Context) error
func (*MemoryDatasource) Count ¶
func (m *MemoryDatasource) Count(_ *context.Context, request *DatasourceFetchRequest) uint64
func (*MemoryDatasource) Export ¶ added in v0.6.0
func (m *MemoryDatasource) Export(ctx *context.Context, request DatasourceExportRequest) error
func (*MemoryDatasource) Fetch ¶
func (m *MemoryDatasource) Fetch(ctx *context.Context, request *DatasourceFetchRequest) DatasourceFetchResult
func (*MemoryDatasource) Import ¶ added in v0.6.0
func (m *MemoryDatasource) Import(ctx *context.Context, request DatasourceImportRequest) error
func (*MemoryDatasource) Push ¶
func (m *MemoryDatasource) Push(_ *context.Context, request *DatasourcePushRequest) (DatasourcePushCount, error)
func (*MemoryDatasource) Watch ¶
func (m *MemoryDatasource) Watch(ctx *context.Context, request *DatasourceStreamRequest) <-chan DatasourceStreamResult
type MongoDatasource ¶
type MongoDatasource struct {
// contains filtered or unexported fields
}
func NewMongoDatasource ¶
func NewMongoDatasource(ctx *context.Context, config MongoDatasourceConfigs, ) *MongoDatasource
NewMongoDatasource creates and returns a new instance of MongoDatasource.
- ctx: Context.
- uri: Database connection uri.
- databaseName: The database name.
- collectionName: The collection name.
func (*MongoDatasource) Actions ¶ added in v0.8.0
func (ds *MongoDatasource) Actions() *DatasourceActionResult
func (*MongoDatasource) Clear ¶
func (ds *MongoDatasource) Clear(ctx *context.Context) error
Clear data source
func (*MongoDatasource) Client ¶
func (ds *MongoDatasource) Client() *mongo.Client
func (*MongoDatasource) Close ¶ added in v0.4.0
func (ds *MongoDatasource) Close(ctx *context.Context) error
Close data source
func (*MongoDatasource) Count ¶
func (ds *MongoDatasource) Count(ctx *context.Context, request *DatasourceFetchRequest) uint64
Get total count
func (*MongoDatasource) Export ¶ added in v0.6.0
func (ds *MongoDatasource) Export(ctx *context.Context, request DatasourceExportRequest) error
Export data from data source
func (*MongoDatasource) Fetch ¶
func (ds *MongoDatasource) Fetch(ctx *context.Context, request *DatasourceFetchRequest) DatasourceFetchResult
Get data
func (*MongoDatasource) Import ¶ added in v0.6.0
func (ds *MongoDatasource) Import(ctx *context.Context, request DatasourceImportRequest) error
Import data into data source
func (*MongoDatasource) Push ¶
func (ds *MongoDatasource) Push(ctx *context.Context, request *DatasourcePushRequest) (DatasourcePushCount, error)
Insert/Update/Delete data
func (*MongoDatasource) Watch ¶
func (ds *MongoDatasource) Watch(ctx *context.Context, request *DatasourceStreamRequest) <-chan DatasourceStreamResult
Listen to Change Data Streams (CDC) if available
type MongoDatasourceConfigs ¶
type MongoDatasourceConfigs struct {
URI string
DatabaseName string
CollectionName string
// MongoDB Filter Query
Filter map[string]any
// MongoDB Sort Query
Sort map[string]any
// Use accurate counting instead of estimated count (slower)
AccurateCount bool
// Provide custom mongo client
WithClient func(ctx *context.Context, uri string) (*mongo.Client, error)
// Provide custom transformer. E.g To convert item's ID to mongo ID
WithTransformer DatasourceTransformer
// Perform actions on init
OnInit func(client *mongo.Client) error
}
type MySQLConnectionInfo ¶ added in v0.7.0
MySQLConnectionInfo holds connection details parsed from DSN
type MySQLDatasource ¶ added in v0.7.0
type MySQLDatasource[T any] struct { // contains filtered or unexported fields }
func NewMySQLDatasource ¶ added in v0.7.0
func NewMySQLDatasource[T any](ctx *context.Context, config MySQLDatasourceConfigs[T]) *MySQLDatasource[T]
NewMySQLDatasource creates and returns a new instance of MySQLDatasource
func (*MySQLDatasource[T]) Actions ¶ added in v0.8.0
func (ds *MySQLDatasource[T]) Actions() *DatasourceActionResult
func (*MySQLDatasource[T]) Clear ¶ added in v0.7.0
func (ds *MySQLDatasource[T]) Clear(ctx *context.Context) error
Clear data source
func (*MySQLDatasource[T]) Close ¶ added in v0.7.0
func (ds *MySQLDatasource[T]) Close(ctx *context.Context) error
Close data source
func (*MySQLDatasource[T]) Count ¶ added in v0.7.0
func (ds *MySQLDatasource[T]) Count(ctx *context.Context, request *DatasourceFetchRequest) uint64
Get total count
func (*MySQLDatasource[T]) DB ¶ added in v0.7.0
func (ds *MySQLDatasource[T]) DB() *gorm.DB
func (*MySQLDatasource[T]) Export ¶ added in v0.7.0
func (ds *MySQLDatasource[T]) Export(ctx *context.Context, request DatasourceExportRequest) error
Export data from the data source
func (*MySQLDatasource[T]) Fetch ¶ added in v0.7.0
func (ds *MySQLDatasource[T]) Fetch(ctx *context.Context, request *DatasourceFetchRequest) DatasourceFetchResult
Get data
func (*MySQLDatasource[T]) Import ¶ added in v0.7.0
func (ds *MySQLDatasource[T]) Import(ctx *context.Context, request DatasourceImportRequest) error
Import data into the data source
func (*MySQLDatasource[T]) Push ¶ added in v0.7.0
func (ds *MySQLDatasource[T]) Push(ctx *context.Context, request *DatasourcePushRequest) (DatasourcePushCount, error)
Insert/Update/Delete data
func (*MySQLDatasource[T]) Watch ¶ added in v0.7.0
func (ds *MySQLDatasource[T]) Watch(ctx *context.Context, request *DatasourceStreamRequest) <-chan DatasourceStreamResult
Listen to binlog replication stream using go-mysql canal
type MySQLDatasourceConfigs ¶ added in v0.7.0
type MySQLDatasourceConfigs[T any] struct { // Database connection string DSN string // Table name to work with TableName string // GORM model struct for the table. // @see: https://gorm.io/docs/models.html Model *T // ID Field for the table. Default is "id". // This would very likely correspond to the primary key field in the table IDField string // Database flavor: "mysql" (default) or "mariadb" DBFlavor MySQLFlavor // Filter conditions (WHERE clause) Filter MySQLDatasourceFilter // Sort order (ORDER BY clause) Sort map[string]any // Provide custom GORM DB instance WithDB func(ctx *context.Context, dsn string) (*gorm.DB, error) // Provide custom transformer WithTransformer MySQLDatasourceTransformer[T] // Perform actions on init OnInit func(db *gorm.DB) error // Binlog replication settings DisableReplication bool BinlogStartPosition mysql.Position // Connection pool settings DBMaxIdleConns int DBMaxOpenConns int DBMaxConnLifetime time.Duration // Whether to disable auto-migration of the table schema // Use this if you want to manage the table schema manually // or if the table already exists with a different schema DisableAutoMigrate bool }
type MySQLDatasourceFilter ¶ added in v0.7.0
type MySQLDatasourceTransformer ¶ added in v0.7.0
type MySQLEventHandler ¶ added in v0.7.0
type MySQLEventHandler[T any] struct { // contains filtered or unexported fields }
MySQLEventHandler handles MySQL binlog events
func (*MySQLEventHandler[T]) OnDDL ¶ added in v0.7.0
func (h *MySQLEventHandler[T]) OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error
OnDDL handles DDL events
func (*MySQLEventHandler[T]) OnGTID ¶ added in v0.7.0
func (h *MySQLEventHandler[T]) OnGTID(header *replication.EventHeader, gtid mysql.BinlogGTIDEvent) error
OnGTID handles GTID events
func (*MySQLEventHandler[T]) OnPosSynced ¶ added in v0.7.0
func (h *MySQLEventHandler[T]) OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
OnPosSynced handles position sync events
func (*MySQLEventHandler[T]) OnRotate ¶ added in v0.7.0
func (h *MySQLEventHandler[T]) OnRotate(header *replication.EventHeader, e *replication.RotateEvent) error
OnRotate handles binlog rotation events
func (*MySQLEventHandler[T]) OnRow ¶ added in v0.7.0
func (h *MySQLEventHandler[T]) OnRow(e *canal.RowsEvent) error
OnRow handles row-level binlog events
func (*MySQLEventHandler[T]) OnRowsQueryEvent ¶ added in v0.7.0
func (h *MySQLEventHandler[T]) OnRowsQueryEvent(e *replication.RowsQueryEvent) error
OnRowsQueryEvent handles rows query events
func (*MySQLEventHandler[T]) OnTableChanged ¶ added in v0.7.0
func (h *MySQLEventHandler[T]) OnTableChanged(header *replication.EventHeader, schema, table string) error
OnTableChanged handles table structure changes
func (*MySQLEventHandler[T]) OnXID ¶ added in v0.7.0
func (h *MySQLEventHandler[T]) OnXID(header *replication.EventHeader, nextPos mysql.Position) error
OnXID handles transaction commit events
func (*MySQLEventHandler[T]) String ¶ added in v0.7.0
func (h *MySQLEventHandler[T]) String() string
String returns string representation
type MySQLFlavor ¶ added in v0.8.0
type MySQLFlavor string
const ( MySQLFlavorDefault MySQLFlavor = "mysql" MySQLFlavorMariaDB MySQLFlavor = "mariadb" )
type PostgresDatasource ¶ added in v0.5.0
type PostgresDatasource[T any] struct { // contains filtered or unexported fields }
func NewPostgresDatasource ¶ added in v0.5.0
func NewPostgresDatasource[T any](ctx *context.Context, config PostgresDatasourceConfigs[T]) *PostgresDatasource[T]
NewPostgresDatasource creates and returns a new instance of PostgresDatasource
func (*PostgresDatasource[T]) Actions ¶ added in v0.8.0
func (ds *PostgresDatasource[T]) Actions() *DatasourceActionResult
func (*PostgresDatasource[T]) Clear ¶ added in v0.5.0
func (ds *PostgresDatasource[T]) Clear(ctx *context.Context) error
Clear data source
func (*PostgresDatasource[T]) Close ¶ added in v0.5.0
func (ds *PostgresDatasource[T]) Close(ctx *context.Context) error
Close data source
func (*PostgresDatasource[T]) Count ¶ added in v0.5.0
func (ds *PostgresDatasource[T]) Count(ctx *context.Context, request *DatasourceFetchRequest) uint64
Get total count
func (*PostgresDatasource[T]) DB ¶ added in v0.5.0
func (ds *PostgresDatasource[T]) DB() *gorm.DB
func (*PostgresDatasource[T]) Export ¶ added in v0.6.0
func (ds *PostgresDatasource[T]) Export(ctx *context.Context, request DatasourceExportRequest) error
Export data from the data source
func (*PostgresDatasource[T]) Fetch ¶ added in v0.5.0
func (ds *PostgresDatasource[T]) Fetch(ctx *context.Context, request *DatasourceFetchRequest) DatasourceFetchResult
Get data
func (*PostgresDatasource[T]) Import ¶ added in v0.6.0
func (ds *PostgresDatasource[T]) Import(ctx *context.Context, request DatasourceImportRequest) error
Import data into the data source
func (*PostgresDatasource[T]) Push ¶ added in v0.5.0
func (ds *PostgresDatasource[T]) Push(ctx *context.Context, request *DatasourcePushRequest) (DatasourcePushCount, error)
Insert/Update/Delete data
func (*PostgresDatasource[T]) Watch ¶ added in v0.5.0
func (ds *PostgresDatasource[T]) Watch(ctx *context.Context, request *DatasourceStreamRequest) <-chan DatasourceStreamResult
Listen to logical replication stream using pgx replication protocol
type PostgresDatasourceConfigs ¶ added in v0.5.0
type PostgresDatasourceConfigs[T any] struct { // Database connection string DSN string // Table name to work with TableName string // GORM model struct for the table. // @see: https://gorm.io/docs/models.html Model *T // ID Field for the table. Default is "id". // This would very likely correspond to the primary key field in the table IDField string // Filter conditions (WHERE clause) Filter PostgresDatasourceFilter // Sort order (ORDER BY clause) Sort map[string]any // Provide custom GORM DB instance WithDB func(ctx *context.Context, dsn string) (*gorm.DB, error) // Provide custom transformer WithTransformer PostgresDatasourceTransformer[T] // Perform actions on init OnInit func(db *gorm.DB) error // Logical replication settings PublicationName string ReplicationSlotName string DisableReplication bool // Connection pool settings DBMaxIdleConns int DBMaxOpenConns int DBMaxConnLifetime time.Duration // Whether to disable auto-migration of the table schema // Use this if you want to manage the table schema manually // or if the table already exists with a different schema DisableAutoMigrate bool }
type PostgresDatasourceFilter ¶ added in v0.5.0
type PostgresDatasourceTransformer ¶ added in v0.5.0
type RedisDatasource ¶ added in v0.4.0
type RedisDatasource struct {
// contains filtered or unexported fields
}
func NewRedisDatasource ¶ added in v0.4.0
func NewRedisDatasource(ctx *context.Context, config RedisDatasourceConfigs) *RedisDatasource
NewRedisDatasource creates and returns a new instance of RedisDatasource
func (*RedisDatasource) Actions ¶ added in v0.8.0
func (ds *RedisDatasource) Actions() *DatasourceActionResult
func (*RedisDatasource) Clear ¶ added in v0.4.0
func (ds *RedisDatasource) Clear(ctx *context.Context) error
Clear data source
func (*RedisDatasource) Client ¶ added in v0.4.0
func (ds *RedisDatasource) Client() *redis.Client
func (*RedisDatasource) Close ¶ added in v0.4.0
func (ds *RedisDatasource) Close(ctx *context.Context) error
Close data source
func (*RedisDatasource) Count ¶ added in v0.4.0
func (ds *RedisDatasource) Count(ctx *context.Context, request *DatasourceFetchRequest) uint64
Get total count
func (*RedisDatasource) Export ¶ added in v0.6.0
func (ds *RedisDatasource) Export(ctx *context.Context, request DatasourceExportRequest) error
Export data from data source
func (*RedisDatasource) Fetch ¶ added in v0.4.0
func (ds *RedisDatasource) Fetch(ctx *context.Context, request *DatasourceFetchRequest) DatasourceFetchResult
Get data
func (*RedisDatasource) Import ¶ added in v0.6.0
func (ds *RedisDatasource) Import(ctx *context.Context, request DatasourceImportRequest) error
Import data into data source
func (*RedisDatasource) Push ¶ added in v0.4.0
func (ds *RedisDatasource) Push(ctx *context.Context, request *DatasourcePushRequest) (DatasourcePushCount, error)
Insert/Update/Delete data
func (*RedisDatasource) Watch ¶ added in v0.4.0
func (ds *RedisDatasource) Watch(ctx *context.Context, request *DatasourceStreamRequest) <-chan DatasourceStreamResult
Listen to keyspace notifications if available
type RedisDatasourceConfigs ¶ added in v0.4.0
type RedisDatasourceConfigs struct {
// Redis connection URI
URI string
// Key prefix to use for saving and fetching items. E.g. `user:profile:`
KeyPrefix string
// ID Field for input JSON
IDField string
// Size of scan operations. E.g. '1000' to scan a thousand at a time. Min = 1.
ScanSize uint64
// Custom redis client provider
WithClient func(ctx *context.Context, uri string) (*redis.Client, error)
// Transform data before saving to redis
WithTransformer RedisDatasourceTransformer
// Perform actions on init
OnInit func(client *redis.Client) error
}
type RedisDatasourceTransformer ¶ added in v0.4.0
type RedisDatasourceTransformer func(data map[string]any) (RedisInputSchema, error)
type RedisDsType ¶ added in v0.4.0
type RedisDsType string
const ( RedisDsString RedisDsType = "string" RedisDsList RedisDsType = "list" RedisDsSet RedisDsType = "set" RedisDsHash RedisDsType = "hash" RedisDsJSON RedisDsType = "rejson-rl" RedisDsZSet RedisDsType = "zset" )
type RedisInputSchema ¶ added in v0.4.0
type RedisInputSchema struct {
String string
List []any
Set []string
Hash map[string]string
JSON map[string]any
SortedSet []map[string]float64
}
func JSONToRedisHashInputSchema ¶ added in v0.4.0
func JSONToRedisHashInputSchema(data map[string]any) RedisInputSchema
JSONToRedisHashInputSchema converts JSON to Redis input schema (using Redis Hash)
func JSONToRedisJSONInputSchema ¶ added in v0.4.0
func JSONToRedisJSONInputSchema(data map[string]any) RedisInputSchema
JSONToRedisJSONInputSchema converts JSON to Redis input schema (using ReJSON)
type RedisOutputHashSchema ¶ added in v0.4.0
type RedisOutputJSONSchema ¶ added in v0.4.0
type RedisOutputListSchema ¶ added in v0.4.0
type RedisOutputSetSchema ¶ added in v0.4.0
type RedisOutputSortedSetSchema ¶ added in v0.4.0
type RedisOutputSortedSetSchema struct {
Key string `json:"key"`
Members []RedisSortedSetSchema `json:"members"`
}
type RedisOutputStringSchema ¶ added in v0.4.0
type RedisSortedSetSchema ¶ added in v0.4.0
type ReplicationEvent ¶ added in v0.5.0
type ReplicationEvent struct {
Action ReplicationEventAction `json:"action"`
Data map[string]any `json:"data"`
}
type ReplicationEventAction ¶ added in v0.5.0
type ReplicationEventAction string
ReplicationEvent represents a logical replication event
const ( ReplicationEventActionInsert ReplicationEventAction = "INSERT" ReplicationEventActionUpdate ReplicationEventAction = "UPDATE" ReplicationEventActionDelete ReplicationEventAction = "DELETE" )