package cdc

v0.3.0 · Published: Jun 14, 2025 · License: MIT · Imports: 30 · Imported by: 0

Documentation

Overview

Package cdc provides Change Data Capture functionality for real-time data replication

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchEventHandler

type BatchEventHandler func(ctx context.Context, events []ChangeEvent) error

BatchEventHandler is a function type for handling batches of change events

type CDCConfig

type CDCConfig struct {
	Type          ConnectorType          `json:"type" yaml:"type"`
	ConnectionStr string                 `json:"connection_string" yaml:"connection_string"`
	Database      string                 `json:"database" yaml:"database"`
	Tables        []string               `json:"tables" yaml:"tables"`
	StartPosition *Position              `json:"start_position,omitempty" yaml:"start_position,omitempty"`
	BatchSize     int                    `json:"batch_size" yaml:"batch_size"`
	PollInterval  time.Duration          `json:"poll_interval" yaml:"poll_interval"`
	BufferSize    int                    `json:"buffer_size" yaml:"buffer_size"`
	Options       map[string]interface{} `json:"options,omitempty" yaml:"options,omitempty"`
}

CDCConfig contains configuration for CDC connectors

func (*CDCConfig) Validate

func (c *CDCConfig) Validate() error

Validate validates the CDC configuration
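
A minimal configuration sketch. The import path, DSN, table name, and sizing values below are placeholders, not documented defaults:

package main

import (
	"log"
	"time"

	cdc "example.com/nebula/pkg/cdc" // placeholder import path; substitute this module's real path
)

func main() {
	cfg := cdc.CDCConfig{
		Type:          cdc.ConnectorPostgreSQL,
		ConnectionStr: "postgres://user:pass@localhost:5432/app", // placeholder DSN
		Database:      "app",
		Tables:        []string{"public.orders"},
		BatchSize:     500,         // example sizing, not a documented default
		PollInterval:  time.Second, // example sizing, not a documented default
		BufferSize:    10000,       // example sizing, not a documented default
	}
	if err := cfg.Validate(); err != nil {
		log.Fatalf("invalid CDC config: %v", err)
	}
}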

type CDCConnector

type CDCConnector interface {
	// Connect establishes connection to the data source
	Connect(config CDCConfig) error

	// Subscribe starts listening to changes on specified tables/collections
	Subscribe(tables []string) error

	// ReadChanges returns a channel of change events
	ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)

	// GetPosition returns the current replication position
	GetPosition() Position

	// Acknowledge confirms processing of events up to the given position
	Acknowledge(position Position) error

	// Stop gracefully shuts down the connector
	Stop() error

	// Health returns the health status of the connector
	Health() HealthStatus
}

CDCConnector defines the interface for Change Data Capture connectors
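
A hedged sketch of the intended lifecycle: connect, subscribe, stream, then acknowledge processed positions. It reuses cfg from the CDCConfig example and assumes imports of context, log, and go.uber.org/zap; process is a hypothetical user-supplied function:

logger := zap.NewNop() // substitute a real zap logger
conn := cdc.NewPostgreSQLConnector(logger)
if err := conn.Connect(cfg); err != nil {
	log.Fatal(err)
}
defer conn.Stop()

if err := conn.Subscribe([]string{"public.orders"}); err != nil {
	log.Fatal(err)
}

events, err := conn.ReadChanges(context.Background())
if err != nil {
	log.Fatal(err)
}
for ev := range events {
	if err := process(ev); err != nil { // process: hypothetical handler
		log.Printf("process failed: %v", err)
		continue
	}
	// Acknowledge so the connector can advance its replication position.
	if err := conn.Acknowledge(ev.Position); err != nil {
		log.Printf("acknowledge failed: %v", err)
	}
}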

type ChangeEvent

type ChangeEvent struct {
	ID            string                 `json:"id"`
	Operation     OperationType          `json:"operation"`
	Database      string                 `json:"database"`
	Table         string                 `json:"table"`
	Schema        string                 `json:"schema,omitempty"`
	Before        map[string]interface{} `json:"before,omitempty"`
	After         map[string]interface{} `json:"after,omitempty"`
	Timestamp     time.Time              `json:"timestamp"`
	Position      Position               `json:"position"`
	TransactionID string                 `json:"transaction_id,omitempty"`

	// Additional metadata
	Metadata map[string]interface{} `json:"metadata,omitempty"`
	Source   SourceInfo             `json:"source"`
}

ChangeEvent represents a single change event from the database

func (*ChangeEvent) ConvertToRecord

func (ce *ChangeEvent) ConvertToRecord() (*models.Record, error)

ConvertToRecord converts a ChangeEvent to the unified Nebula Record
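
A sketch of an EventHandler that converts each event to a Record before writing it downstream; sink is a hypothetical writer, not part of this package:

handler := cdc.EventHandler(func(ctx context.Context, ev cdc.ChangeEvent) error {
	rec, err := ev.ConvertToRecord()
	if err != nil {
		return err
	}
	// For deletes, Before carries the old row image (when the source
	// provides it); for inserts and updates, After carries the new one.
	return sink.Write(ctx, rec) // sink: hypothetical downstream writer
})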

type Checkpoint

type Checkpoint struct {
	ID          string                 `json:"id"`
	Position    Position               `json:"position"`
	Timestamp   time.Time              `json:"timestamp"`
	EventCount  int64                  `json:"event_count"`
	ProcessedAt time.Time              `json:"processed_at"`
	Metadata    map[string]interface{} `json:"metadata,omitempty"`
}

Checkpoint represents a savepoint in the event stream

type CheckpointStorage

type CheckpointStorage interface {
	Save(checkpoint Checkpoint) error
	Load() (Checkpoint, error)
	Delete(checkpointID string) error
	List() ([]Checkpoint, error)
}

CheckpointStorage defines the interface for checkpoint storage

type Checkpointer

type Checkpointer struct {
	// contains filtered or unexported fields
}

Checkpointer manages checkpoints for stream processing

func NewCheckpointer

func NewCheckpointer(storage CheckpointStorage, interval time.Duration, logger *zap.Logger) *Checkpointer

NewCheckpointer creates a new checkpointer

func (*Checkpointer) Checkpoint

func (c *Checkpointer) Checkpoint() error

Checkpoint saves the current processing state

func (*Checkpointer) GetLastCheckpoint

func (c *Checkpointer) GetLastCheckpoint() Checkpoint

GetLastCheckpoint returns the last saved checkpoint
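
A sketch wiring the checkpointer to the in-memory storage documented below; the one-minute interval is an arbitrary example value:

storage := cdc.NewMemoryCheckpointStorage()
cp := cdc.NewCheckpointer(storage, time.Minute, logger)

if err := cp.Checkpoint(); err != nil {
	log.Printf("checkpoint failed: %v", err)
}
last := cp.GetLastCheckpoint()
log.Printf("last checkpoint at %s, position %s", last.Timestamp, last.Position.String())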

type ColumnInfo

type ColumnInfo struct {
	Name     string
	Type     string
	Nullable bool
	Default  interface{}
	OID      uint32
}

ColumnInfo represents PostgreSQL column information

type ComponentHealth

type ComponentHealth struct {
	Name         string        `json:"name"`
	Status       string        `json:"status"`
	LastCheck    time.Time     `json:"last_check"`
	FailureCount int           `json:"failure_count"`
	SuccessCount int           `json:"success_count"`
	LastError    string        `json:"last_error,omitempty"`
	ResponseTime time.Duration `json:"response_time"`
}

ComponentHealth tracks the health of a single component

type ConnectorType

type ConnectorType string

ConnectorType represents the type of CDC connector

const (
	ConnectorPostgreSQL ConnectorType = "postgresql"
	ConnectorMySQL      ConnectorType = "mysql"
	ConnectorMongoDB    ConnectorType = "mongodb"
	ConnectorKafka      ConnectorType = "kafka"
)

type DeadLetterQueue

type DeadLetterQueue interface {
	Send(task ProcessingTask, err error) error
	Read(limit int) ([]ProcessingTask, error)
	Acknowledge(taskID string) error
	GetStats() DeadLetterStats
}

DeadLetterQueue defines the interface for handling failed events

type DeadLetterStats

type DeadLetterStats struct {
	TotalEvents     int64     `json:"total_events"`
	PendingEvents   int64     `json:"pending_events"`
	ProcessedEvents int64     `json:"processed_events"`
	OldestEvent     time.Time `json:"oldest_event"`
	LastAdded       time.Time `json:"last_added"`
}

DeadLetterStats contains statistics for the dead letter queue

type EventFilter

type EventFilter struct {
	IncludeTables []string          `json:"include_tables,omitempty"`
	ExcludeTables []string          `json:"exclude_tables,omitempty"`
	Operations    []OperationType   `json:"operations,omitempty"`
	Conditions    []FilterCondition `json:"conditions,omitempty"`
}

EventFilter defines filtering criteria for change events

func (*EventFilter) AddCondition

func (f *EventFilter) AddCondition(field, operator string, value interface{})

AddCondition adds a filter condition

func (*EventFilter) ShouldInclude

func (f *EventFilter) ShouldInclude(event ChangeEvent) bool

ShouldInclude checks if an event should be included based on the filter
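
A filter sketch that keeps only inserts and updates on one table. The "gt" operator is taken from the FilterCondition operator list below; the field name and threshold are illustrative, and ev is a ChangeEvent obtained elsewhere:

filter := cdc.EventFilter{
	IncludeTables: []string{"orders"},
	Operations:    []cdc.OperationType{cdc.OperationInsert, cdc.OperationUpdate},
}
filter.AddCondition("amount", "gt", 100) // field and threshold are illustrative

if filter.ShouldInclude(ev) {
	// forward ev downstream
}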

type EventHandler

type EventHandler func(ctx context.Context, event ChangeEvent) error

EventHandler is a function type for handling change events

type EventMetrics

type EventMetrics struct {
	EventsReceived    int64         `json:"events_received"`
	EventsProcessed   int64         `json:"events_processed"`
	EventsFiltered    int64         `json:"events_filtered"`
	EventsErrored     int64         `json:"events_errored"`
	ProcessingLatency time.Duration `json:"processing_latency"`
	ThroughputRPS     float64       `json:"throughput_rps"`
	LastEventTime     time.Time     `json:"last_event_time"`
	BacklogSize       int64         `json:"backlog_size"`
}

EventMetrics contains metrics for CDC operations

type EventProcessor

type EventProcessor interface {
	Process(ctx context.Context, event ChangeEvent) error
	ProcessBatch(ctx context.Context, events []ChangeEvent) error
	Stop() error
}

EventProcessor defines the interface for processing change events

type FilterCondition

type FilterCondition struct {
	Field    string      `json:"field"`
	Operator string      `json:"operator"` // eq, ne, lt, gt, in, like, etc.
	Value    interface{} `json:"value"`
}

FilterCondition represents a single filtering condition

type HandlerMetrics

type HandlerMetrics struct {
	EventsProcessed int64         `json:"events_processed"`
	EventsErrored   int64         `json:"events_errored"`
	AverageLatency  time.Duration `json:"average_latency"`
	LastExecution   time.Time     `json:"last_execution"`
}

HandlerMetrics contains metrics for a specific handler

type HealthCheckConfig

type HealthCheckConfig struct {
	Enabled          bool          `json:"enabled"`
	Interval         time.Duration `json:"interval"`
	Timeout          time.Duration `json:"timeout"`
	FailureThreshold int           `json:"failure_threshold"`
	SuccessThreshold int           `json:"success_threshold"`
}

HealthCheckConfig contains health monitoring configuration

type HealthMonitor

type HealthMonitor struct {
	// contains filtered or unexported fields
}

HealthMonitor monitors the health of CDC components

func NewHealthMonitor

func NewHealthMonitor(config HealthCheckConfig, manager *Manager, logger *zap.Logger) *HealthMonitor

NewHealthMonitor creates a new health monitor

func (*HealthMonitor) GetComponentHealth

func (hm *HealthMonitor) GetComponentHealth() map[string]*ComponentHealth

GetComponentHealth returns the health of all components

func (*HealthMonitor) Start

func (hm *HealthMonitor) Start()

Start starts the health monitor

func (*HealthMonitor) Stop

func (hm *HealthMonitor) Stop()

Stop stops the health monitor

type HealthStatus

type HealthStatus struct {
	Status     string                 `json:"status"`
	Message    string                 `json:"message,omitempty"`
	LastEvent  time.Time              `json:"last_event"`
	EventCount int64                  `json:"event_count"`
	ErrorCount int64                  `json:"error_count"`
	Lag        time.Duration          `json:"lag,omitempty"`
	Details    map[string]interface{} `json:"details,omitempty"`
}

HealthStatus represents the health of a CDC connector

func (HealthStatus) IsHealthy

func (h HealthStatus) IsHealthy() bool

IsHealthy returns true if the health status indicates the connector is healthy

type JSONMessageDeserializer

type JSONMessageDeserializer struct{}

JSONMessageDeserializer provides JSON deserialization

func (*JSONMessageDeserializer) ContentType

func (d *JSONMessageDeserializer) ContentType() string

ContentType returns the content type

func (*JSONMessageDeserializer) Deserialize

func (d *JSONMessageDeserializer) Deserialize(data []byte) (ChangeEvent, error)

Deserialize deserializes JSON data to a ChangeEvent

type JSONMessageSerializer

type JSONMessageSerializer struct{}

JSONMessageSerializer provides JSON serialization

func (*JSONMessageSerializer) ContentType

func (s *JSONMessageSerializer) ContentType() string

ContentType returns the content type

func (*JSONMessageSerializer) Serialize

func (s *JSONMessageSerializer) Serialize(event ChangeEvent) ([]byte, error)

Serialize serializes a ChangeEvent to JSON

func (*JSONMessageSerializer) SerializeKey

func (s *JSONMessageSerializer) SerializeKey(event ChangeEvent) ([]byte, error)

SerializeKey serializes the event key

type KafkaConfig

type KafkaConfig struct {
	Brokers               []string `json:"brokers"`
	SecurityProtocol      string   `json:"security_protocol"`
	SASLMechanism         string   `json:"sasl_mechanism"`
	SASLUsername          string   `json:"sasl_username"`
	SASLPassword          string   `json:"sasl_password"`
	EnableTLS             bool     `json:"enable_tls"`
	TLSInsecureSkipVerify bool     `json:"tls_insecure_skip_verify"`

	// Producer settings
	ProducerAcks        string `json:"producer_acks"` // all, 1, 0
	ProducerRetries     int    `json:"producer_retries"`
	ProducerBatchSize   int    `json:"producer_batch_size"`
	ProducerLingerMS    int    `json:"producer_linger_ms"`
	ProducerCompression string `json:"producer_compression"` // none, gzip, snappy, lz4
	EnableIdempotence   bool   `json:"enable_idempotence"`
	TransactionalID     string `json:"transactional_id"`

	// Consumer settings
	ConsumerGroupID     string `json:"consumer_group_id"`
	AutoOffsetReset     string `json:"auto_offset_reset"` // earliest, latest
	EnableAutoCommit    bool   `json:"enable_auto_commit"`
	SessionTimeoutMS    int    `json:"session_timeout_ms"`
	HeartbeatIntervalMS int    `json:"heartbeat_interval_ms"`
	MaxPollRecords      int    `json:"max_poll_records"`

	// Topic settings
	TopicPrefix  string            `json:"topic_prefix"`
	TopicSuffix  string            `json:"topic_suffix"`
	TopicMapping map[string]string `json:"topic_mapping"`
	DefaultTopic string            `json:"default_topic"`

	// Message settings
	MessageFormat   string `json:"message_format"` // json, avro, protobuf
	IncludeSchema   bool   `json:"include_schema"`
	CompressionType string `json:"compression_type"`

	// Exactly-once settings
	ExactlyOnce          bool `json:"exactly_once"`
	TransactionTimeoutMS int  `json:"transaction_timeout_ms"`
}

KafkaConfig contains Kafka-specific configuration
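
A sketch of a plaintext (no TLS, no SASL) configuration; every value is an example, not a documented default:

kcfg := cdc.KafkaConfig{
	Brokers:             []string{"localhost:9092"},
	ProducerAcks:        "all",
	ProducerCompression: "snappy",
	EnableIdempotence:   true,
	ConsumerGroupID:     "cdc-consumers",
	AutoOffsetReset:     "earliest",
	TopicPrefix:         "cdc.",
	MessageFormat:       "json",
}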

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

KafkaConsumer provides Kafka CDC event consumption

func NewKafkaConsumer

func NewKafkaConsumer(config KafkaConfig, logger *zap.Logger) *KafkaConsumer

NewKafkaConsumer creates a new Kafka consumer

func (*KafkaConsumer) Cleanup

func (kc *KafkaConsumer) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup implements sarama.ConsumerGroupHandler

func (*KafkaConsumer) Close

func (kc *KafkaConsumer) Close() error

Close closes the Kafka consumer

func (*KafkaConsumer) ConsumeClaim

func (kc *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim implements sarama.ConsumerGroupHandler

func (*KafkaConsumer) GetMetrics

func (kc *KafkaConsumer) GetMetrics() KafkaMetrics

GetMetrics returns Kafka consumer metrics

func (*KafkaConsumer) Setup

func (kc *KafkaConsumer) Setup(session sarama.ConsumerGroupSession) error

Setup implements sarama.ConsumerGroupHandler

func (*KafkaConsumer) Subscribe

func (kc *KafkaConsumer) Subscribe(topics []string, handler EventHandler) error

Subscribe subscribes to Kafka topics and starts consuming
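
A consumption sketch reusing kcfg from the KafkaConfig example; the topic name is a placeholder and handleEvent is a hypothetical EventHandler:

consumer := cdc.NewKafkaConsumer(kcfg, logger)
defer consumer.Close()

if err := consumer.Subscribe([]string{"cdc.app.orders"}, handleEvent); err != nil {
	log.Fatal(err)
}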

type KafkaMessage

type KafkaMessage struct {
	Key       string            `json:"key"`
	Value     ChangeEvent       `json:"value"`
	Headers   map[string]string `json:"headers"`
	Timestamp time.Time         `json:"timestamp"`
}

KafkaMessage represents a message sent to/from Kafka

type KafkaMetrics

type KafkaMetrics struct {
	MessagesProduced      int64         `json:"messages_produced"`
	MessagesConsumed      int64         `json:"messages_consumed"`
	MessagesFailed        int64         `json:"messages_failed"`
	MessagesRetried       int64         `json:"messages_retried"`
	BytesProduced         int64         `json:"bytes_produced"`
	BytesConsumed         int64         `json:"bytes_consumed"`
	ProducerLatency       time.Duration `json:"producer_latency"`
	ConsumerLatency       time.Duration `json:"consumer_latency"`
	TransactionsCommitted int64         `json:"transactions_committed"`
	TransactionsAborted   int64         `json:"transactions_aborted"`
	LastProducedTime      time.Time     `json:"last_produced_time"`
	LastConsumedTime      time.Time     `json:"last_consumed_time"`
}

KafkaMetrics contains Kafka-specific metrics

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

KafkaProducer provides Kafka integration for CDC events with exactly-once semantics

func NewKafkaProducer

func NewKafkaProducer(config KafkaConfig, logger *zap.Logger) *KafkaProducer

NewKafkaProducer creates a new Kafka producer

func (*KafkaProducer) Close

func (kp *KafkaProducer) Close() error

Close closes the Kafka producer

func (*KafkaProducer) Connect

func (kp *KafkaProducer) Connect() error

Connect establishes connection to Kafka

func (*KafkaProducer) GetMetrics

func (kp *KafkaProducer) GetMetrics() KafkaMetrics

GetMetrics returns Kafka producer metrics

func (*KafkaProducer) ProduceEvent

func (kp *KafkaProducer) ProduceEvent(ctx context.Context, event ChangeEvent) error

ProduceEvent produces a single CDC event to Kafka

func (*KafkaProducer) ProduceEvents

func (kp *KafkaProducer) ProduceEvents(ctx context.Context, events []ChangeEvent) error

ProduceEvents produces multiple CDC events to Kafka with exactly-once semantics
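
A production sketch, again reusing kcfg; events stands for a batch obtained elsewhere, for example from a connector's ReadChanges channel:

producer := cdc.NewKafkaProducer(kcfg, logger)
if err := producer.Connect(); err != nil {
	log.Fatal(err)
}
defer producer.Close()

if err := producer.ProduceEvents(ctx, events); err != nil {
	log.Printf("produce failed: %v", err)
}
log.Printf("messages produced so far: %d", producer.GetMetrics().MessagesProduced)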

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager coordinates multiple CDC connectors and event processing

func NewManager

func NewManager(config ManagerConfig, logger *zap.Logger) *Manager

NewManager creates a new CDC manager

func (*Manager) AddConnector

func (m *Manager) AddConnector(name string, config CDCConfig) error

AddConnector adds a new CDC connector

func (*Manager) GetStatus

func (m *Manager) GetStatus() ManagerStatus

GetStatus returns the current status of the CDC manager

func (*Manager) RemoveConnector

func (m *Manager) RemoveConnector(name string) error

RemoveConnector removes a CDC connector

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts the CDC manager and all its components

func (*Manager) Stop

func (m *Manager) Stop() error

Stop stops the CDC manager and all its components
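
A manager lifecycle sketch; the streaming values are illustrative and cfg is the connector configuration from the CDCConfig example:

mgr := cdc.NewManager(cdc.ManagerConfig{
	Streaming: cdc.StreamingConfig{
		MaxBatchSize:    1000,        // illustrative
		BatchTimeout:    time.Second, // illustrative
		ParallelWorkers: 4,           // illustrative
	},
}, logger)

if err := mgr.AddConnector("orders-pg", cfg); err != nil {
	log.Fatal(err)
}
if err := mgr.Start(ctx); err != nil {
	log.Fatal(err)
}
defer mgr.Stop()

log.Printf("overall health: %s", mgr.GetStatus().OverallHealth)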

type ManagerConfig

type ManagerConfig struct {
	// Connector configurations
	Connectors map[string]CDCConfig `json:"connectors"`

	// Stream processing configuration
	Streaming StreamingConfig `json:"streaming"`

	// Kafka configuration
	Kafka       KafkaConfig `json:"kafka"`
	EnableKafka bool        `json:"enable_kafka"`

	// Health monitoring
	HealthCheck HealthCheckConfig `json:"health_check"`

	// Global settings
	GlobalTimeout time.Duration `json:"global_timeout"`
	RetryPolicy   RetryPolicy   `json:"retry_policy"`

	// Monitoring and metrics
	MetricsEnabled bool `json:"metrics_enabled"`
	MetricsPort    int  `json:"metrics_port"`
}

ManagerConfig contains configuration for the CDC manager

type ManagerStatus

type ManagerStatus struct {
	Running         bool                        `json:"running"`
	StartTime       time.Time                   `json:"start_time"`
	Uptime          time.Duration               `json:"uptime"`
	Connectors      map[string]HealthStatus     `json:"connectors"`
	StreamProcessor *StreamMetrics              `json:"stream_processor,omitempty"`
	KafkaProducer   *KafkaMetrics               `json:"kafka_producer,omitempty"`
	ComponentHealth map[string]*ComponentHealth `json:"component_health"`
	OverallHealth   string                      `json:"overall_health"`
}

ManagerStatus represents the overall status of the CDC manager

type MemoryCheckpointStorage

type MemoryCheckpointStorage struct {
	// contains filtered or unexported fields
}

MemoryCheckpointStorage provides in-memory checkpoint storage

func NewMemoryCheckpointStorage

func NewMemoryCheckpointStorage() *MemoryCheckpointStorage

NewMemoryCheckpointStorage creates a new in-memory checkpoint storage

func (*MemoryCheckpointStorage) Delete

func (m *MemoryCheckpointStorage) Delete(checkpointID string) error

Delete deletes a checkpoint

func (*MemoryCheckpointStorage) List

func (m *MemoryCheckpointStorage) List() ([]Checkpoint, error)

List lists all checkpoints

func (*MemoryCheckpointStorage) Load

func (m *MemoryCheckpointStorage) Load() (Checkpoint, error)

Load loads the latest checkpoint

func (*MemoryCheckpointStorage) Save

func (m *MemoryCheckpointStorage) Save(checkpoint Checkpoint) error

Save saves a checkpoint

type MemoryDeadLetterQueue

type MemoryDeadLetterQueue struct {
	// contains filtered or unexported fields
}

MemoryDeadLetterQueue provides an in-memory dead letter queue implementation

func NewMemoryDeadLetterQueue

func NewMemoryDeadLetterQueue(maxSize int) *MemoryDeadLetterQueue

NewMemoryDeadLetterQueue creates a new in-memory dead letter queue

func (*MemoryDeadLetterQueue) Acknowledge

func (dlq *MemoryDeadLetterQueue) Acknowledge(taskID string) error

Acknowledge marks a task as processed

func (*MemoryDeadLetterQueue) GetStats

func (dlq *MemoryDeadLetterQueue) GetStats() DeadLetterStats

GetStats returns dead letter queue statistics

func (*MemoryDeadLetterQueue) Read

func (dlq *MemoryDeadLetterQueue) Read(limit int) ([]ProcessingTask, error)

Read retrieves pending tasks from the dead letter queue

func (*MemoryDeadLetterQueue) Send

func (dlq *MemoryDeadLetterQueue) Send(task ProcessingTask, err error) error

Send adds a failed task to the dead letter queue

type MessageDeserializer

type MessageDeserializer interface {
	Deserialize(data []byte) (ChangeEvent, error)
	ContentType() string
}

MessageDeserializer defines the interface for message deserialization

type MessageSerializer

type MessageSerializer interface {
	Serialize(event ChangeEvent) ([]byte, error)
	SerializeKey(event ChangeEvent) ([]byte, error)
	ContentType() string
}

MessageSerializer defines the interface for message serialization

type MongoChangeEvent

type MongoChangeEvent struct {
	ID                       bson.Raw               `bson:"_id"`
	OperationType            string                 `bson:"operationType"`
	ClusterTime              primitive.Timestamp    `bson:"clusterTime"`
	FullDocument             map[string]interface{} `bson:"fullDocument,omitempty"`
	FullDocumentBeforeChange map[string]interface{} `bson:"fullDocumentBeforeChange,omitempty"`
	DocumentKey              map[string]interface{} `bson:"documentKey,omitempty"`
	UpdateDescription        *UpdateDescription     `bson:"updateDescription,omitempty"`
	Namespace                Namespace              `bson:"ns"`
	TxnNumber                *int64                 `bson:"txnNumber,omitempty"`
	SessionID                *bson.Raw              `bson:"lsid,omitempty"`
}

MongoChangeEvent represents a MongoDB change event

type MongoDBConfig

type MongoDBConfig struct {
	ResumeToken              string               `json:"resume_token,omitempty"`
	StartAtOperationTime     *primitive.Timestamp `json:"start_at_operation_time,omitempty"`
	FullDocument             string               `json:"full_document"`               // updateLookup, default
	FullDocumentBeforeChange string               `json:"full_document_before_change"` // whenAvailable, required, off
	BatchSize                int32                `json:"batch_size"`
	MaxAwaitTime             time.Duration        `json:"max_await_time"`
	Collation                *options.Collation   `json:"collation,omitempty"`
	IncludeOperationTypes    []string             `json:"include_operation_types,omitempty"`
}

MongoDBConfig contains MongoDB-specific configuration

type MongoDBConnector

type MongoDBConnector struct {
	// contains filtered or unexported fields
}

MongoDBConnector implements CDC for MongoDB using change streams

func NewMongoDBConnector

func NewMongoDBConnector(logger *zap.Logger) *MongoDBConnector

NewMongoDBConnector creates a new MongoDB CDC connector

func (*MongoDBConnector) Acknowledge

func (c *MongoDBConnector) Acknowledge(position Position) error

Acknowledge confirms processing of events up to the given position

func (*MongoDBConnector) Connect

func (c *MongoDBConnector) Connect(config CDCConfig) error

Connect establishes connection to MongoDB and sets up change streams

func (*MongoDBConnector) GetPosition

func (c *MongoDBConnector) GetPosition() Position

GetPosition returns the current change stream position

func (*MongoDBConnector) Health

func (c *MongoDBConnector) Health() HealthStatus

Health returns the health status of the connector

func (*MongoDBConnector) ReadChanges

func (c *MongoDBConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)

ReadChanges returns a channel of change events

func (*MongoDBConnector) Stop

func (c *MongoDBConnector) Stop() error

Stop gracefully shuts down the connector

func (*MongoDBConnector) Subscribe

func (c *MongoDBConnector) Subscribe(collections []string) error

Subscribe starts listening to changes on specified collections

type MySQLConfig

type MySQLConfig struct {
	ServerID              uint32        `json:"server_id"`
	StartPosition         string        `json:"start_position,omitempty"`
	Flavor                string        `json:"flavor"` // mysql or mariadb
	GTIDEnabled           bool          `json:"gtid_enabled"`
	HeartbeatPeriod       time.Duration `json:"heartbeat_period"`
	ReadTimeout           time.Duration `json:"read_timeout"`
	UseDecimal            bool          `json:"use_decimal"`
	IgnoreJSONDecodeError bool          `json:"ignore_json_decode_error"`
}

MySQLConfig contains MySQL-specific configuration

type MySQLConnector

type MySQLConnector struct {
	// contains filtered or unexported fields
}

MySQLConnector implements CDC for MySQL using binary log replication

func NewMySQLConnector

func NewMySQLConnector(logger *zap.Logger) *MySQLConnector

NewMySQLConnector creates a new MySQL CDC connector

func (*MySQLConnector) Acknowledge

func (c *MySQLConnector) Acknowledge(position Position) error

Acknowledge confirms processing of events up to the given position

func (*MySQLConnector) Connect

func (c *MySQLConnector) Connect(config CDCConfig) error

Connect establishes connection to MySQL and sets up binary log replication

func (*MySQLConnector) GetPosition

func (c *MySQLConnector) GetPosition() Position

GetPosition returns the current replication position

func (*MySQLConnector) Health

func (c *MySQLConnector) Health() HealthStatus

Health returns the health status of the connector

func (*MySQLConnector) ReadChanges

func (c *MySQLConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)

ReadChanges returns a channel of change events

func (*MySQLConnector) Stop

func (c *MySQLConnector) Stop() error

Stop gracefully shuts down the connector

func (*MySQLConnector) Subscribe

func (c *MySQLConnector) Subscribe(tables []string) error

Subscribe starts listening to changes on specified tables

type Namespace

type Namespace struct {
	Database   string `bson:"db"`
	Collection string `bson:"coll"`
}

Namespace represents a MongoDB namespace (database.collection)

type OperationType

type OperationType string

OperationType represents the type of database operation

const (
	OperationInsert OperationType = "INSERT"
	OperationUpdate OperationType = "UPDATE"
	OperationDelete OperationType = "DELETE"
	OperationDDL    OperationType = "DDL"    // Schema changes
	OperationCommit OperationType = "COMMIT" // Transaction commit
)

type Position

type Position struct {
	Type     string                 `json:"type"`
	Value    interface{}            `json:"value"`
	Metadata map[string]interface{} `json:"metadata,omitempty"`
}

Position represents a replication position in the change stream

func (Position) Compare

func (p Position) Compare(other Position) int

Compare compares two positions (returns -1, 0, 1 for less, equal, greater)
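
For example, reusing conn and last from earlier sketches, to check whether a connector has advanced past the last saved checkpoint:

if conn.GetPosition().Compare(last.Position) > 0 {
	// the connector is ahead of the checkpoint; a new checkpoint is due
}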

func (Position) IsValid

func (p Position) IsValid() bool

IsValid checks if the position is valid

func (Position) String

func (p Position) String() string

String returns a string representation of the Position

type PostgreSQLConfig

type PostgreSQLConfig struct {
	SlotName       string `json:"slot_name"`
	Publication    string `json:"publication"`
	StartLSN       string `json:"start_lsn,omitempty"`
	TempSlot       bool   `json:"temp_slot"`
	PluginName     string `json:"plugin_name"`
	StatusInterval int    `json:"status_interval"`
}

PostgreSQLConfig contains PostgreSQL-specific configuration

type PostgreSQLConnector

type PostgreSQLConnector struct {
	// contains filtered or unexported fields
}

PostgreSQLConnector implements CDC for PostgreSQL using logical replication

func NewPostgreSQLConnector

func NewPostgreSQLConnector(logger *zap.Logger) *PostgreSQLConnector

NewPostgreSQLConnector creates a new PostgreSQL CDC connector

func (*PostgreSQLConnector) Acknowledge

func (c *PostgreSQLConnector) Acknowledge(position Position) error

Acknowledge confirms processing of events up to the given position

func (*PostgreSQLConnector) Connect

func (c *PostgreSQLConnector) Connect(config CDCConfig) error

Connect establishes connection to PostgreSQL and sets up logical replication

func (*PostgreSQLConnector) GetPosition

func (c *PostgreSQLConnector) GetPosition() Position

GetPosition returns the current replication position

func (*PostgreSQLConnector) Health

func (c *PostgreSQLConnector) Health() HealthStatus

Health returns the health status of the connector

func (*PostgreSQLConnector) ReadChanges

func (c *PostgreSQLConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)

ReadChanges returns a channel of change events

func (*PostgreSQLConnector) Stop

func (c *PostgreSQLConnector) Stop() error

Stop gracefully shuts down the connector

func (*PostgreSQLConnector) Subscribe

func (c *PostgreSQLConnector) Subscribe(tables []string) error

Subscribe starts listening to changes on specified tables

type ProcessingTask

type ProcessingTask struct {
	Events     []ChangeEvent
	Handler    string
	Partition  int
	Timestamp  time.Time
	RetryCount int
	MaxRetries int
}

ProcessingTask represents a task to process events

type RetryPolicy

type RetryPolicy struct {
	MaxRetries     int           `json:"max_retries"`
	InitialBackoff time.Duration `json:"initial_backoff"`
	MaxBackoff     time.Duration `json:"max_backoff"`
	Multiplier     float64       `json:"multiplier"`
}

RetryPolicy defines retry behavior
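
The usual reading of these fields is capped exponential backoff: start at InitialBackoff, multiply by Multiplier on each retry, never exceed MaxBackoff. Whether this package computes delays exactly this way is an assumption; a sketch:

// backoffDelay is a hypothetical helper showing one common interpretation
// of RetryPolicy; it is not part of this package's API.
func backoffDelay(p cdc.RetryPolicy, attempt int) time.Duration {
	d := p.InitialBackoff
	for i := 0; i < attempt; i++ {
		d = time.Duration(float64(d) * p.Multiplier)
		if d >= p.MaxBackoff {
			return p.MaxBackoff
		}
	}
	return d
}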

type SchemaChange

type SchemaChange struct {
	Type      string                 `json:"type"`   // CREATE, ALTER, DROP
	Object    string                 `json:"object"` // TABLE, INDEX, etc.
	Name      string                 `json:"name"`
	Statement string                 `json:"statement"`
	Before    map[string]interface{} `json:"before,omitempty"`
	After     map[string]interface{} `json:"after,omitempty"`
}

SchemaChange represents a DDL change event

type SourceInfo

type SourceInfo struct {
	Name          string        `json:"name"`
	Database      string        `json:"database"`
	Table         string        `json:"table"`
	ConnectorType ConnectorType `json:"connector_type"`
	Version       string        `json:"version,omitempty"`
	Timestamp     time.Time     `json:"timestamp"`
}

SourceInfo contains information about the data source

type StreamMetrics

type StreamMetrics struct {
	EventsReceived    int64         `json:"events_received"`
	EventsProcessed   int64         `json:"events_processed"`
	EventsFiltered    int64         `json:"events_filtered"`
	EventsErrored     int64         `json:"events_errored"`
	EventsRetried     int64         `json:"events_retried"`
	BatchesProcessed  int64         `json:"batches_processed"`
	ProcessingLatency time.Duration `json:"processing_latency"`
	ThroughputRPS     float64       `json:"throughput_rps"`
	BacklogSize       int64         `json:"backlog_size"`
	LastProcessedTime time.Time     `json:"last_processed_time"`

	// Per-handler metrics
	HandlerMetrics map[string]HandlerMetrics `json:"handler_metrics"`
}

StreamMetrics contains metrics for stream processing

type StreamProcessor

type StreamProcessor struct {
	// contains filtered or unexported fields
}

StreamProcessor handles real-time processing of CDC events

func NewStreamProcessor

func NewStreamProcessor(config StreamingConfig, logger *zap.Logger) *StreamProcessor

NewStreamProcessor creates a new stream processor

func (*StreamProcessor) AddFilter

func (sp *StreamProcessor) AddFilter(filter EventFilter)

AddFilter adds an event filter

func (*StreamProcessor) GetDeadLetterQueueStats

func (sp *StreamProcessor) GetDeadLetterQueueStats() DeadLetterStats

GetDeadLetterQueueStats returns dead letter queue statistics

func (*StreamProcessor) GetMetrics

func (sp *StreamProcessor) GetMetrics() StreamMetrics

GetMetrics returns current stream processing metrics

func (*StreamProcessor) ProcessEvent

func (sp *StreamProcessor) ProcessEvent(ctx context.Context, event ChangeEvent) error

ProcessEvent processes a single event

func (*StreamProcessor) ProcessEvents

func (sp *StreamProcessor) ProcessEvents(ctx context.Context, events []ChangeEvent) error

ProcessEvents processes multiple events

func (*StreamProcessor) RegisterBatchHandler

func (sp *StreamProcessor) RegisterBatchHandler(pattern string, handler BatchEventHandler)

RegisterBatchHandler registers a batch event handler

func (*StreamProcessor) RegisterHandler

func (sp *StreamProcessor) RegisterHandler(pattern string, handler EventHandler)

RegisterHandler registers an event handler for a specific event type or table
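
A registration sketch; the pattern-matching rules are not documented on this page, so the bare table name below is a guess at a valid pattern, and scfg is a StreamingConfig (see below):

sp := cdc.NewStreamProcessor(scfg, logger)
sp.RegisterHandler("orders", func(ctx context.Context, ev cdc.ChangeEvent) error {
	log.Printf("%s on %s.%s", ev.Operation, ev.Database, ev.Table)
	return nil
})

if err := sp.Start(ctx); err != nil {
	log.Fatal(err)
}
defer sp.Stop()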

func (*StreamProcessor) Start

func (sp *StreamProcessor) Start(ctx context.Context) error

Start starts the stream processor

func (*StreamProcessor) Stop

func (sp *StreamProcessor) Stop() error

Stop stops the stream processor gracefully

type StreamWorker

type StreamWorker struct {
	// contains filtered or unexported fields
}

StreamWorker processes events in parallel

type StreamingConfig

type StreamingConfig struct {
	MaxBatchSize    int           `json:"max_batch_size"`
	BatchTimeout    time.Duration `json:"batch_timeout"`
	MaxRetries      int           `json:"max_retries"`
	RetryBackoff    time.Duration `json:"retry_backoff"`
	DeadLetterQueue string        `json:"dead_letter_queue,omitempty"`
	ParallelWorkers int           `json:"parallel_workers"`
	OrderingKey     string        `json:"ordering_key,omitempty"`
	CompressionType string        `json:"compression_type,omitempty"`
	ExactlyOnce     bool          `json:"exactly_once"`
}

StreamingConfig contains configuration for event streaming

func (*StreamingConfig) Validate

func (c *StreamingConfig) Validate() error

Validate validates the streaming configuration

type TableSchema

type TableSchema struct {
	Name        string
	Columns     []ColumnInfo
	PrimaryKey  []string
	LastUpdated time.Time
}

TableSchema represents PostgreSQL table schema information

type TransactionInfo

type TransactionInfo struct {
	ID         string    `json:"id"`
	StartTime  time.Time `json:"start_time"`
	EndTime    time.Time `json:"end_time"`
	EventCount int       `json:"event_count"`
	Size       int64     `json:"size"`
}

TransactionInfo contains information about database transactions

type TruncatedArray

type TruncatedArray struct {
	Field   string `bson:"field"`
	NewSize int32  `bson:"newSize"`
}

TruncatedArray represents a truncated array field

type UpdateDescription

type UpdateDescription struct {
	UpdatedFields   map[string]interface{} `bson:"updatedFields,omitempty"`
	RemovedFields   []string               `bson:"removedFields,omitempty"`
	TruncatedArrays []TruncatedArray       `bson:"truncatedArrays,omitempty"`
}

UpdateDescription contains information about updated fields

type WorkerMetrics

type WorkerMetrics struct {
	TasksProcessed int64         `json:"tasks_processed"`
	TasksErrored   int64         `json:"tasks_errored"`
	AverageLatency time.Duration `json:"average_latency"`
	LastTaskTime   time.Time     `json:"last_task_time"`
}

WorkerMetrics contains metrics for a worker
