Documentation ¶
Overview ¶
Package cdc provides Change Data Capture (CDC) functionality for real-time data replication.
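A minimal end-to-end sketch of the package (the import path, connection string, and table names are placeholders; error handling is abbreviated):

package main

import (
	"context"
	"log"
	"time"

	"go.uber.org/zap"

	cdc "example.com/nebula/pkg/cdc" // placeholder import path
)

func main() {
	logger := zap.NewNop()

	connector := cdc.NewPostgreSQLConnector(logger)
	cfg := cdc.CDCConfig{
		Type:          cdc.ConnectorPostgreSQL,
		ConnectionStr: "postgres://user:pass@localhost:5432/app",
		Database:      "app",
		Tables:        []string{"public.orders"},
		BatchSize:     100,
		PollInterval:  time.Second,
		BufferSize:    1000,
	}
	if err := connector.Connect(cfg); err != nil {
		log.Fatal(err)
	}
	defer connector.Stop()

	if err := connector.Subscribe(cfg.Tables); err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	events, err := connector.ReadChanges(ctx)
	if err != nil {
		log.Fatal(err)
	}
	for ev := range events {
		log.Printf("%s %s.%s at %v", ev.Operation, ev.Database, ev.Table, ev.Timestamp)
		// Confirm processing so the source can advance its replication position.
		if err := connector.Acknowledge(ev.Position); err != nil {
			log.Printf("ack failed: %v", err)
		}
	}
}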
Index ¶
- type BatchEventHandler
- type CDCConfig
- type CDCConnector
- type ChangeEvent
- type Checkpoint
- type CheckpointStorage
- type Checkpointer
- type ColumnInfo
- type ComponentHealth
- type ConnectorType
- type DeadLetterQueue
- type DeadLetterStats
- type EventFilter
- type EventHandler
- type EventMetrics
- type EventProcessor
- type FilterCondition
- type HandlerMetrics
- type HealthCheckConfig
- type HealthMonitor
- type HealthStatus
- type JSONMessageDeserializer
- type JSONMessageSerializer
- type KafkaConfig
- type KafkaConsumer
- func (kc *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error
- func (kc *KafkaConsumer) Close() error
- func (kc *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (kc *KafkaConsumer) GetMetrics() KafkaMetrics
- func (kc *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error
- func (kc *KafkaConsumer) Subscribe(topics []string, handler EventHandler) error
- type KafkaMessage
- type KafkaMetrics
- type KafkaProducer
- func (kp *KafkaProducer) Close() error
- func (kp *KafkaProducer) Connect() error
- func (kp *KafkaProducer) GetMetrics() KafkaMetrics
- func (kp *KafkaProducer) ProduceEvent(ctx context.Context, event ChangeEvent) error
- func (kp *KafkaProducer) ProduceEvents(ctx context.Context, events []ChangeEvent) error
- type Manager
- type ManagerConfig
- type ManagerStatus
- type MemoryCheckpointStorage
- type MemoryDeadLetterQueue
- type MessageDeserializer
- type MessageSerializer
- type MongoChangeEvent
- type MongoDBConfig
- type MongoDBConnector
- func (c *MongoDBConnector) Acknowledge(position Position) error
- func (c *MongoDBConnector) Connect(config CDCConfig) error
- func (c *MongoDBConnector) GetPosition() Position
- func (c *MongoDBConnector) Health() HealthStatus
- func (c *MongoDBConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)
- func (c *MongoDBConnector) Stop() error
- func (c *MongoDBConnector) Subscribe(collections []string) error
- type MySQLConfig
- type MySQLConnector
- func (c *MySQLConnector) Acknowledge(position Position) error
- func (c *MySQLConnector) Connect(config CDCConfig) error
- func (c *MySQLConnector) GetPosition() Position
- func (c *MySQLConnector) Health() HealthStatus
- func (c *MySQLConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)
- func (c *MySQLConnector) Stop() error
- func (c *MySQLConnector) Subscribe(tables []string) error
- type Namespace
- type OperationType
- type Position
- type PostgreSQLConfig
- type PostgreSQLConnector
- func (c *PostgreSQLConnector) Acknowledge(position Position) error
- func (c *PostgreSQLConnector) Connect(config CDCConfig) error
- func (c *PostgreSQLConnector) GetPosition() Position
- func (c *PostgreSQLConnector) Health() HealthStatus
- func (c *PostgreSQLConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)
- func (c *PostgreSQLConnector) Stop() error
- func (c *PostgreSQLConnector) Subscribe(tables []string) error
- type ProcessingTask
- type RetryPolicy
- type SchemaChange
- type SourceInfo
- type StreamMetrics
- type StreamProcessor
- func (sp *StreamProcessor) AddFilter(filter EventFilter)
- func (sp *StreamProcessor) GetDeadLetterQueueStats() DeadLetterStats
- func (sp *StreamProcessor) GetMetrics() StreamMetrics
- func (sp *StreamProcessor) ProcessEvent(ctx context.Context, event ChangeEvent) error
- func (sp *StreamProcessor) ProcessEvents(ctx context.Context, events []ChangeEvent) error
- func (sp *StreamProcessor) RegisterBatchHandler(pattern string, handler BatchEventHandler)
- func (sp *StreamProcessor) RegisterHandler(pattern string, handler EventHandler)
- func (sp *StreamProcessor) Start(ctx context.Context) error
- func (sp *StreamProcessor) Stop() error
- type StreamWorker
- type StreamingConfig
- type TableSchema
- type TransactionInfo
- type TruncatedArray
- type UpdateDescription
- type WorkerMetrics
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchEventHandler ¶
type BatchEventHandler func(ctx context.Context, events []ChangeEvent) error
BatchEventHandler is a function type for handling batches of change events
type CDCConfig ¶
type CDCConfig struct {
	Type          ConnectorType          `json:"type" yaml:"type"`
	ConnectionStr string                 `json:"connection_string" yaml:"connection_string"`
	Database      string                 `json:"database" yaml:"database"`
	Tables        []string               `json:"tables" yaml:"tables"`
	StartPosition *Position              `json:"start_position,omitempty" yaml:"start_position,omitempty"`
	BatchSize     int                    `json:"batch_size" yaml:"batch_size"`
	PollInterval  time.Duration          `json:"poll_interval" yaml:"poll_interval"`
	BufferSize    int                    `json:"buffer_size" yaml:"buffer_size"`
	Options       map[string]interface{} `json:"options,omitempty" yaml:"options,omitempty"`
}
CDCConfig contains configuration for CDC connectors
type CDCConnector ¶
type CDCConnector interface {
	// Connect establishes connection to the data source
	Connect(config CDCConfig) error

	// Subscribe starts listening to changes on specified tables/collections
	Subscribe(tables []string) error

	// ReadChanges returns a channel of change events
	ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)

	// GetPosition returns the current replication position
	GetPosition() Position

	// Acknowledge confirms processing of events up to the given position
	Acknowledge(position Position) error

	// Stop gracefully shuts down the connector
	Stop() error

	// Health returns the health status of the connector
	Health() HealthStatus
}
CDCConnector defines the interface for Change Data Capture connectors
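Because every connector satisfies this interface, a processing loop can be written once and reused across sources; a sketch with a hypothetical pumpChanges helper (import path is a placeholder):

package main

import (
	"context"
	"log"

	"go.uber.org/zap"

	cdc "example.com/nebula/pkg/cdc" // placeholder import path
)

// pumpChanges drains any CDCConnector with the supplied handler,
// acknowledging each event after it is handled.
func pumpChanges(ctx context.Context, c cdc.CDCConnector, handle cdc.EventHandler) error {
	events, err := c.ReadChanges(ctx)
	if err != nil {
		return err
	}
	for ev := range events {
		if err := handle(ctx, ev); err != nil {
			return err
		}
		if err := c.Acknowledge(ev.Position); err != nil {
			return err
		}
	}
	return c.Stop()
}

func main() {
	c := cdc.NewMySQLConnector(zap.NewNop()) // any connector satisfies CDCConnector
	// ... Connect and Subscribe as shown in the Overview ...
	err := pumpChanges(context.Background(), c, func(ctx context.Context, ev cdc.ChangeEvent) error {
		log.Printf("%s %s.%s", ev.Operation, ev.Database, ev.Table)
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}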
type ChangeEvent ¶
type ChangeEvent struct {
	ID            string                 `json:"id"`
	Operation     OperationType          `json:"operation"`
	Database      string                 `json:"database"`
	Table         string                 `json:"table"`
	Schema        string                 `json:"schema,omitempty"`
	Before        map[string]interface{} `json:"before,omitempty"`
	After         map[string]interface{} `json:"after,omitempty"`
	Timestamp     time.Time              `json:"timestamp"`
	Position      Position               `json:"position"`
	TransactionID string                 `json:"transaction_id,omitempty"`

	// Additional metadata
	Metadata map[string]interface{} `json:"metadata,omitempty"`
	Source   SourceInfo             `json:"source"`
}
ChangeEvent represents a single change event from the database
func (*ChangeEvent) ConvertToRecord ¶
func (ce *ChangeEvent) ConvertToRecord() (*models.Record, error)
ConvertToRecord converts a ChangeEvent to the unified Nebula Record
type Checkpoint ¶
type Checkpoint struct {
	ID          string                 `json:"id"`
	Position    Position               `json:"position"`
	Timestamp   time.Time              `json:"timestamp"`
	EventCount  int64                  `json:"event_count"`
	ProcessedAt time.Time              `json:"processed_at"`
	Metadata    map[string]interface{} `json:"metadata,omitempty"`
}
Checkpoint represents a savepoint in the event stream
type CheckpointStorage ¶
type CheckpointStorage interface {
	Save(checkpoint Checkpoint) error
	Load() (Checkpoint, error)
	Delete(checkpointID string) error
	List() ([]Checkpoint, error)
}
CheckpointStorage defines the interface for checkpoint storage
type Checkpointer ¶
type Checkpointer struct {
// contains filtered or unexported fields
}
Checkpointer manages checkpoints for stream processing
func NewCheckpointer ¶
func NewCheckpointer(storage CheckpointStorage, interval time.Duration, logger *zap.Logger) *Checkpointer
NewCheckpointer creates a new checkpointer
func (*Checkpointer) Checkpoint ¶
func (c *Checkpointer) Checkpoint() error
Checkpoint saves the current processing state
func (*Checkpointer) GetLastCheckpoint ¶
func (c *Checkpointer) GetLastCheckpoint() Checkpoint
GetLastCheckpoint returns the last saved checkpoint
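A sketch using the in-memory storage backend (the import path is a placeholder):

package main

import (
	"fmt"
	"log"
	"time"

	"go.uber.org/zap"

	cdc "example.com/nebula/pkg/cdc" // placeholder import path
)

func main() {
	storage := cdc.NewMemoryCheckpointStorage()
	cp := cdc.NewCheckpointer(storage, 30*time.Second, zap.NewNop())

	// Persist the current processing state.
	if err := cp.Checkpoint(); err != nil {
		log.Fatal(err)
	}

	last := cp.GetLastCheckpoint()
	fmt.Println(last.ID, last.Timestamp, last.EventCount)
}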
type ColumnInfo ¶
ColumnInfo represents PostgreSQL column information
type ComponentHealth ¶
type ComponentHealth struct {
	Name         string        `json:"name"`
	Status       string        `json:"status"`
	LastCheck    time.Time     `json:"last_check"`
	FailureCount int           `json:"failure_count"`
	SuccessCount int           `json:"success_count"`
	LastError    string        `json:"last_error,omitempty"`
	ResponseTime time.Duration `json:"response_time"`
}
ComponentHealth tracks the health of a single component
type ConnectorType ¶
type ConnectorType string
ConnectorType represents the type of CDC connector
const (
	ConnectorPostgreSQL ConnectorType = "postgresql"
	ConnectorMySQL      ConnectorType = "mysql"
	ConnectorMongoDB    ConnectorType = "mongodb"
	ConnectorKafka      ConnectorType = "kafka"
)
type DeadLetterQueue ¶
type DeadLetterQueue interface {
	Send(task ProcessingTask, err error) error
	Read(limit int) ([]ProcessingTask, error)
	Acknowledge(taskID string) error
	GetStats() DeadLetterStats
}
DeadLetterQueue defines the interface for handling failed events
type DeadLetterStats ¶
type DeadLetterStats struct {
	TotalEvents     int64     `json:"total_events"`
	PendingEvents   int64     `json:"pending_events"`
	ProcessedEvents int64     `json:"processed_events"`
	OldestEvent     time.Time `json:"oldest_event"`
	LastAdded       time.Time `json:"last_added"`
}
DeadLetterStats contains statistics for the dead letter queue
type EventFilter ¶
type EventFilter struct {
	IncludeTables []string          `json:"include_tables,omitempty"`
	ExcludeTables []string          `json:"exclude_tables,omitempty"`
	Operations    []OperationType   `json:"operations,omitempty"`
	Conditions    []FilterCondition `json:"conditions,omitempty"`
}
EventFilter defines filtering criteria for change events
func (*EventFilter) AddCondition ¶
func (f *EventFilter) AddCondition(field, operator string, value interface{})
AddCondition adds a filter condition
func (*EventFilter) ShouldInclude ¶
func (f *EventFilter) ShouldInclude(event ChangeEvent) bool
ShouldInclude checks if an event should be included based on the filter
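A sketch of filter usage (how conditions are evaluated against event payloads is assumed here; the import path is a placeholder):

package main

import (
	"fmt"

	cdc "example.com/nebula/pkg/cdc" // placeholder import path
)

func main() {
	filter := cdc.EventFilter{
		IncludeTables: []string{"orders"},
		Operations:    []cdc.OperationType{cdc.OperationInsert, cdc.OperationUpdate},
	}
	// Operator strings follow FilterCondition ("eq", "ne", "lt", "gt", "in", "like", etc.).
	filter.AddCondition("amount", "gt", 100)

	event := cdc.ChangeEvent{
		Operation: cdc.OperationInsert,
		Table:     "orders",
		After:     map[string]interface{}{"amount": 250},
	}
	fmt.Println(filter.ShouldInclude(event)) // true if the condition matches the event payload
}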
type EventHandler ¶
type EventHandler func(ctx context.Context, event ChangeEvent) error
EventHandler is a function type for handling change events
type EventMetrics ¶
type EventMetrics struct {
	EventsReceived    int64         `json:"events_received"`
	EventsProcessed   int64         `json:"events_processed"`
	EventsFiltered    int64         `json:"events_filtered"`
	EventsErrored     int64         `json:"events_errored"`
	ProcessingLatency time.Duration `json:"processing_latency"`
	ThroughputRPS     float64       `json:"throughput_rps"`
	LastEventTime     time.Time     `json:"last_event_time"`
	BacklogSize       int64         `json:"backlog_size"`
}
EventMetrics contains metrics for CDC operations
type EventProcessor ¶
type EventProcessor interface {
	Process(ctx context.Context, event ChangeEvent) error
	ProcessBatch(ctx context.Context, events []ChangeEvent) error
	Stop() error
}
EventProcessor defines the interface for processing change events
type FilterCondition ¶
type FilterCondition struct {
	Field    string      `json:"field"`
	Operator string      `json:"operator"` // eq, ne, lt, gt, in, like, etc.
	Value    interface{} `json:"value"`
}
FilterCondition represents a single filtering condition
type HandlerMetrics ¶
type HandlerMetrics struct {
	EventsProcessed int64         `json:"events_processed"`
	EventsErrored   int64         `json:"events_errored"`
	AverageLatency  time.Duration `json:"average_latency"`
	LastExecution   time.Time     `json:"last_execution"`
}
HandlerMetrics contains metrics for a specific handler
type HealthCheckConfig ¶
type HealthCheckConfig struct {
	Enabled          bool          `json:"enabled"`
	Interval         time.Duration `json:"interval"`
	Timeout          time.Duration `json:"timeout"`
	FailureThreshold int           `json:"failure_threshold"`
	SuccessThreshold int           `json:"success_threshold"`
}
HealthCheckConfig contains health monitoring configuration
type HealthMonitor ¶
type HealthMonitor struct {
// contains filtered or unexported fields
}
HealthMonitor monitors the health of CDC components
func NewHealthMonitor ¶
func NewHealthMonitor(config HealthCheckConfig, manager *Manager, logger *zap.Logger) *HealthMonitor
NewHealthMonitor creates a new health monitor
func (*HealthMonitor) GetComponentHealth ¶
func (hm *HealthMonitor) GetComponentHealth() map[string]*ComponentHealth
GetComponentHealth returns the health of all components
type HealthStatus ¶
type HealthStatus struct {
	Status     string                 `json:"status"`
	Message    string                 `json:"message,omitempty"`
	LastEvent  time.Time              `json:"last_event"`
	EventCount int64                  `json:"event_count"`
	ErrorCount int64                  `json:"error_count"`
	Lag        time.Duration          `json:"lag,omitempty"`
	Details    map[string]interface{} `json:"details,omitempty"`
}
HealthStatus represents the health of a CDC connector
func (HealthStatus) IsHealthy ¶
func (h HealthStatus) IsHealthy() bool
IsHealthy returns true if the health status indicates the connector is healthy
type JSONMessageDeserializer ¶
type JSONMessageDeserializer struct{}
JSONMessageDeserializer provides JSON deserialization
func (*JSONMessageDeserializer) ContentType ¶
func (d *JSONMessageDeserializer) ContentType() string
ContentType returns the content type
func (*JSONMessageDeserializer) Deserialize ¶
func (d *JSONMessageDeserializer) Deserialize(data []byte) (ChangeEvent, error)
Deserialize deserializes JSON data to a ChangeEvent
type JSONMessageSerializer ¶
type JSONMessageSerializer struct{}
JSONMessageSerializer provides JSON serialization
func (*JSONMessageSerializer) ContentType ¶
func (s *JSONMessageSerializer) ContentType() string
ContentType returns the content type
func (*JSONMessageSerializer) Serialize ¶
func (s *JSONMessageSerializer) Serialize(event ChangeEvent) ([]byte, error)
Serialize serializes a ChangeEvent to JSON
func (*JSONMessageSerializer) SerializeKey ¶
func (s *JSONMessageSerializer) SerializeKey(event ChangeEvent) ([]byte, error)
SerializeKey serializes the event key
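A round-trip sketch through the JSON serializer and deserializer (the exact content-type string and import path are assumptions):

package main

import (
	"fmt"
	"log"
	"time"

	cdc "example.com/nebula/pkg/cdc" // placeholder import path
)

func main() {
	event := cdc.ChangeEvent{
		ID:        "evt-1",
		Operation: cdc.OperationInsert,
		Database:  "app",
		Table:     "orders",
		After:     map[string]interface{}{"id": 1},
		Timestamp: time.Now(),
	}

	s := &cdc.JSONMessageSerializer{}
	data, err := s.Serialize(event)
	if err != nil {
		log.Fatal(err)
	}

	d := &cdc.JSONMessageDeserializer{}
	decoded, err := d.Deserialize(data)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(decoded.ID, s.ContentType()) // content type is presumably "application/json"
}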
type KafkaConfig ¶
type KafkaConfig struct {
	Brokers               []string `json:"brokers"`
	SecurityProtocol      string   `json:"security_protocol"`
	SASLMechanism         string   `json:"sasl_mechanism"`
	SASLUsername          string   `json:"sasl_username"`
	SASLPassword          string   `json:"sasl_password"`
	EnableTLS             bool     `json:"enable_tls"`
	TLSInsecureSkipVerify bool     `json:"tls_insecure_skip_verify"`

	// Producer settings
	ProducerAcks        string `json:"producer_acks"` // all, 1, 0
	ProducerRetries     int    `json:"producer_retries"`
	ProducerBatchSize   int    `json:"producer_batch_size"`
	ProducerLingerMS    int    `json:"producer_linger_ms"`
	ProducerCompression string `json:"producer_compression"` // none, gzip, snappy, lz4
	EnableIdempotence   bool   `json:"enable_idempotence"`
	TransactionalID     string `json:"transactional_id"`

	// Consumer settings
	ConsumerGroupID     string `json:"consumer_group_id"`
	AutoOffsetReset     string `json:"auto_offset_reset"` // earliest, latest
	EnableAutoCommit    bool   `json:"enable_auto_commit"`
	SessionTimeoutMS    int    `json:"session_timeout_ms"`
	HeartbeatIntervalMS int    `json:"heartbeat_interval_ms"`
	MaxPollRecords      int    `json:"max_poll_records"`

	// Topic settings
	TopicPrefix  string            `json:"topic_prefix"`
	TopicSuffix  string            `json:"topic_suffix"`
	TopicMapping map[string]string `json:"topic_mapping"`
	DefaultTopic string            `json:"default_topic"`

	// Message settings
	MessageFormat   string `json:"message_format"` // json, avro, protobuf
	IncludeSchema   bool   `json:"include_schema"`
	CompressionType string `json:"compression_type"`

	// Exactly-once settings
	ExactlyOnce          bool `json:"exactly_once"`
	TransactionTimeoutMS int  `json:"transaction_timeout_ms"`
}
KafkaConfig contains Kafka-specific configuration
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
KafkaConsumer provides Kafka CDC event consumption
func NewKafkaConsumer ¶
func NewKafkaConsumer(config KafkaConfig, logger *zap.Logger) *KafkaConsumer
NewKafkaConsumer creates a new Kafka consumer
func (*KafkaConsumer) Cleanup ¶
func (kc *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup implements sarama.ConsumerGroupHandler
func (*KafkaConsumer) Close ¶
func (kc *KafkaConsumer) Close() error
Close closes the Kafka consumer
func (*KafkaConsumer) ConsumeClaim ¶
func (kc *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim implements sarama.ConsumerGroupHandler
func (*KafkaConsumer) GetMetrics ¶
func (kc *KafkaConsumer) GetMetrics() KafkaMetrics
GetMetrics returns Kafka consumer metrics
func (*KafkaConsumer) Setup ¶
func (kc *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error
Setup implements sarama.ConsumerGroupHandler
func (*KafkaConsumer) Subscribe ¶
func (kc *KafkaConsumer) Subscribe(topics []string, handler EventHandler) error
Subscribe subscribes to Kafka topics and starts consuming
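A consumption sketch (broker address, group ID, topic name, and import path are placeholders):

package main

import (
	"context"
	"log"

	"go.uber.org/zap"

	cdc "example.com/nebula/pkg/cdc" // placeholder import path
)

func main() {
	cfg := cdc.KafkaConfig{
		Brokers:         []string{"localhost:9092"},
		ConsumerGroupID: "nebula-cdc",
		AutoOffsetReset: "earliest",
	}
	consumer := cdc.NewKafkaConsumer(cfg, zap.NewNop())
	defer consumer.Close()

	handler := func(ctx context.Context, ev cdc.ChangeEvent) error {
		log.Printf("consumed %s on %s.%s", ev.Operation, ev.Database, ev.Table)
		return nil
	}
	if err := consumer.Subscribe([]string{"cdc.orders"}, handler); err != nil {
		log.Fatal(err)
	}
	select {} // block while the consumer group runs in the background
}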
type KafkaMessage ¶
type KafkaMessage struct {
	Key       string            `json:"key"`
	Value     ChangeEvent       `json:"value"`
	Headers   map[string]string `json:"headers"`
	Timestamp time.Time         `json:"timestamp"`
}
KafkaMessage represents a message sent to/from Kafka
type KafkaMetrics ¶
type KafkaMetrics struct {
	MessagesProduced      int64         `json:"messages_produced"`
	MessagesConsumed      int64         `json:"messages_consumed"`
	MessagesFailed        int64         `json:"messages_failed"`
	MessagesRetried       int64         `json:"messages_retried"`
	BytesProduced         int64         `json:"bytes_produced"`
	BytesConsumed         int64         `json:"bytes_consumed"`
	ProducerLatency       time.Duration `json:"producer_latency"`
	ConsumerLatency       time.Duration `json:"consumer_latency"`
	TransactionsCommitted int64         `json:"transactions_committed"`
	TransactionsAborted   int64         `json:"transactions_aborted"`
	LastProducedTime      time.Time     `json:"last_produced_time"`
	LastConsumedTime      time.Time     `json:"last_consumed_time"`
}
KafkaMetrics contains Kafka-specific metrics
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
KafkaProducer provides Kafka integration for CDC events with exactly-once semantics
func NewKafkaProducer ¶
func NewKafkaProducer(config KafkaConfig, logger *zap.Logger) *KafkaProducer
NewKafkaProducer creates a new Kafka producer
func (*KafkaProducer) Close ¶
func (kp *KafkaProducer) Close() error
Close closes the Kafka producer
func (*KafkaProducer) Connect ¶
func (kp *KafkaProducer) Connect() error
Connect establishes connection to Kafka
func (*KafkaProducer) GetMetrics ¶
func (kp *KafkaProducer) GetMetrics() KafkaMetrics
GetMetrics returns Kafka producer metrics
func (*KafkaProducer) ProduceEvent ¶
func (kp *KafkaProducer) ProduceEvent(ctx context.Context, event ChangeEvent) error
ProduceEvent produces a single CDC event to Kafka
func (*KafkaProducer) ProduceEvents ¶
func (kp *KafkaProducer) ProduceEvents(ctx context.Context, events []ChangeEvent) error
ProduceEvents produces multiple CDC events to Kafka with exactly-once semantics
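A production sketch (broker address, topic settings, and import path are placeholders):

package main

import (
	"context"
	"log"

	"go.uber.org/zap"

	cdc "example.com/nebula/pkg/cdc" // placeholder import path
)

func main() {
	cfg := cdc.KafkaConfig{
		Brokers:           []string{"localhost:9092"},
		ProducerAcks:      "all",
		EnableIdempotence: true,
		ExactlyOnce:       true,
		TransactionalID:   "nebula-cdc-producer",
		TopicPrefix:       "cdc.",
	}
	producer := cdc.NewKafkaProducer(cfg, zap.NewNop())
	if err := producer.Connect(); err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	events := []cdc.ChangeEvent{
		{ID: "evt-1", Operation: cdc.OperationInsert, Database: "app", Table: "orders"},
	}
	// With ExactlyOnce set, the batch is presumably produced transactionally.
	if err := producer.ProduceEvents(context.Background(), events); err != nil {
		log.Fatal(err)
	}
	m := producer.GetMetrics()
	log.Printf("produced=%d failed=%d", m.MessagesProduced, m.MessagesFailed)
}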
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager coordinates multiple CDC connectors and event processing
func NewManager ¶
func NewManager(config ManagerConfig, logger *zap.Logger) *Manager
NewManager creates a new CDC manager
func (*Manager) AddConnector ¶
AddConnector adds a new CDC connector
func (*Manager) GetStatus ¶
func (m *Manager) GetStatus() ManagerStatus
GetStatus returns the current status of the CDC manager
func (*Manager) RemoveConnector ¶
RemoveConnector removes a CDC connector
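A sketch constructing a manager and inspecting its status (how the manager is started is not shown in this index; values and the import path are placeholders):

package main

import (
	"log"
	"time"

	"go.uber.org/zap"

	cdc "example.com/nebula/pkg/cdc" // placeholder import path
)

func main() {
	cfg := cdc.ManagerConfig{
		Connectors: map[string]cdc.CDCConfig{
			"orders-pg": {
				Type:          cdc.ConnectorPostgreSQL,
				ConnectionStr: "postgres://user:pass@localhost:5432/app",
				Database:      "app",
				Tables:        []string{"public.orders"},
			},
		},
		GlobalTimeout:  30 * time.Second,
		MetricsEnabled: true,
	}
	manager := cdc.NewManager(cfg, zap.NewNop())

	status := manager.GetStatus()
	log.Printf("running=%v overall=%s uptime=%s",
		status.Running, status.OverallHealth, status.Uptime)
	for name, health := range status.Connectors {
		log.Printf("connector %s: %s", name, health.Status)
	}
}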
type ManagerConfig ¶
type ManagerConfig struct {
	// Connector configurations
	Connectors map[string]CDCConfig `json:"connectors"`

	// Stream processing configuration
	Streaming StreamingConfig `json:"streaming"`

	// Kafka configuration
	Kafka       KafkaConfig `json:"kafka"`
	EnableKafka bool        `json:"enable_kafka"`

	// Health monitoring
	HealthCheck HealthCheckConfig `json:"health_check"`

	// Global settings
	GlobalTimeout time.Duration `json:"global_timeout"`
	RetryPolicy   RetryPolicy   `json:"retry_policy"`

	// Monitoring and metrics
	MetricsEnabled bool `json:"metrics_enabled"`
	MetricsPort    int  `json:"metrics_port"`
}
ManagerConfig contains configuration for the CDC manager
type ManagerStatus ¶
type ManagerStatus struct {
	Running         bool                        `json:"running"`
	StartTime       time.Time                   `json:"start_time"`
	Uptime          time.Duration               `json:"uptime"`
	Connectors      map[string]HealthStatus     `json:"connectors"`
	StreamProcessor *StreamMetrics              `json:"stream_processor,omitempty"`
	KafkaProducer   *KafkaMetrics               `json:"kafka_producer,omitempty"`
	ComponentHealth map[string]*ComponentHealth `json:"component_health"`
	OverallHealth   string                      `json:"overall_health"`
}
ManagerStatus represents the overall status of the CDC manager
type MemoryCheckpointStorage ¶
type MemoryCheckpointStorage struct {
// contains filtered or unexported fields
}
MemoryCheckpointStorage provides in-memory checkpoint storage
func NewMemoryCheckpointStorage ¶
func NewMemoryCheckpointStorage() *MemoryCheckpointStorage
NewMemoryCheckpointStorage creates a new in-memory checkpoint storage
func (*MemoryCheckpointStorage) Delete ¶
func (m *MemoryCheckpointStorage) Delete(checkpointID string) error
Delete deletes a checkpoint
func (*MemoryCheckpointStorage) List ¶
func (m *MemoryCheckpointStorage) List() ([]Checkpoint, error)
List lists all checkpoints
func (*MemoryCheckpointStorage) Load ¶
func (m *MemoryCheckpointStorage) Load() (Checkpoint, error)
Load loads the latest checkpoint
func (*MemoryCheckpointStorage) Save ¶
func (m *MemoryCheckpointStorage) Save(checkpoint Checkpoint) error
Save saves a checkpoint
type MemoryDeadLetterQueue ¶
type MemoryDeadLetterQueue struct {
// contains filtered or unexported fields
}
MemoryDeadLetterQueue provides in-memory dead letter queue implementation
func NewMemoryDeadLetterQueue ¶
func NewMemoryDeadLetterQueue(maxSize int) *MemoryDeadLetterQueue
NewMemoryDeadLetterQueue creates a new in-memory dead letter queue
func (*MemoryDeadLetterQueue) Acknowledge ¶
func (dlq *MemoryDeadLetterQueue) Acknowledge(taskID string) error
Acknowledge marks a task as processed
func (*MemoryDeadLetterQueue) GetStats ¶
func (dlq *MemoryDeadLetterQueue) GetStats() DeadLetterStats
GetStats returns dead letter queue statistics
func (*MemoryDeadLetterQueue) Read ¶
func (dlq *MemoryDeadLetterQueue) Read(limit int) ([]ProcessingTask, error)
Read retrieves pending tasks from the dead letter queue
func (*MemoryDeadLetterQueue) Send ¶
func (dlq *MemoryDeadLetterQueue) Send(task ProcessingTask, err error) error
Send adds a failed task to the dead letter queue
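A sketch of the in-memory queue (ProcessingTask exposes no ID field in this index, so how task IDs for Acknowledge are obtained is left open; the import path is a placeholder):

package main

import (
	"errors"
	"log"
	"time"

	cdc "example.com/nebula/pkg/cdc" // placeholder import path
)

func main() {
	dlq := cdc.NewMemoryDeadLetterQueue(1024)

	task := cdc.ProcessingTask{
		Events:     []cdc.ChangeEvent{{ID: "evt-1", Table: "orders"}},
		Handler:    "orders-handler",
		Timestamp:  time.Now(),
		MaxRetries: 3,
	}
	if err := dlq.Send(task, errors.New("downstream unavailable")); err != nil {
		log.Fatal(err)
	}

	// Drain up to 10 pending tasks for re-processing.
	tasks, err := dlq.Read(10)
	if err != nil {
		log.Fatal(err)
	}
	for range tasks {
		// Re-process, then mark done via Acknowledge(taskID); the task ID
		// source is assumed, since ProcessingTask has no exported ID here.
	}

	stats := dlq.GetStats()
	log.Printf("pending=%d total=%d", stats.PendingEvents, stats.TotalEvents)
}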
type MessageDeserializer ¶
type MessageDeserializer interface {
	Deserialize(data []byte) (ChangeEvent, error)
	ContentType() string
}
MessageDeserializer defines the interface for message deserialization
type MessageSerializer ¶
type MessageSerializer interface {
	Serialize(event ChangeEvent) ([]byte, error)
	SerializeKey(event ChangeEvent) ([]byte, error)
	ContentType() string
}
MessageSerializer defines the interface for message serialization
type MongoChangeEvent ¶
type MongoChangeEvent struct {
	ID                       bson.Raw               `bson:"_id"`
	OperationType            string                 `bson:"operationType"`
	ClusterTime              primitive.Timestamp    `bson:"clusterTime"`
	FullDocument             map[string]interface{} `bson:"fullDocument,omitempty"`
	FullDocumentBeforeChange map[string]interface{} `bson:"fullDocumentBeforeChange,omitempty"`
	DocumentKey              map[string]interface{} `bson:"documentKey,omitempty"`
	UpdateDescription        *UpdateDescription     `bson:"updateDescription,omitempty"`
	Namespace                Namespace              `bson:"ns"`
	TxnNumber                *int64                 `bson:"txnNumber,omitempty"`
	SessionID                *bson.Raw              `bson:"lsid,omitempty"`
}
MongoChangeEvent represents a MongoDB change event
type MongoDBConfig ¶
type MongoDBConfig struct {
	ResumeToken              string               `json:"resume_token,omitempty"`
	StartAtOperationTime     *primitive.Timestamp `json:"start_at_operation_time,omitempty"`
	FullDocument             string               `json:"full_document"`               // updateLookup, default
	FullDocumentBeforeChange string               `json:"full_document_before_change"` // whenAvailable, required, off
	BatchSize                int32                `json:"batch_size"`
	MaxAwaitTime             time.Duration        `json:"max_await_time"`
	Collation                *options.Collation   `json:"collation,omitempty"`
	IncludeOperationTypes    []string             `json:"include_operation_types,omitempty"`
}
MongoDBConfig contains MongoDB-specific configuration
type MongoDBConnector ¶
type MongoDBConnector struct {
// contains filtered or unexported fields
}
MongoDBConnector implements CDC for MongoDB using change streams
func NewMongoDBConnector ¶
func NewMongoDBConnector(logger *zap.Logger) *MongoDBConnector
NewMongoDBConnector creates a new MongoDB CDC connector
func (*MongoDBConnector) Acknowledge ¶
func (c *MongoDBConnector) Acknowledge(position Position) error
Acknowledge confirms processing of events up to the given position
func (*MongoDBConnector) Connect ¶
func (c *MongoDBConnector) Connect(config CDCConfig) error
Connect establishes connection to MongoDB and sets up change streams
func (*MongoDBConnector) GetPosition ¶
func (c *MongoDBConnector) GetPosition() Position
GetPosition returns the current change stream position
func (*MongoDBConnector) Health ¶
func (c *MongoDBConnector) Health() HealthStatus
Health returns the health status of the connector
func (*MongoDBConnector) ReadChanges ¶
func (c *MongoDBConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)
ReadChanges returns a channel of change events
func (*MongoDBConnector) Stop ¶
func (c *MongoDBConnector) Stop() error
Stop gracefully shuts down the connector
func (*MongoDBConnector) Subscribe ¶
func (c *MongoDBConnector) Subscribe(collections []string) error
Subscribe starts listening to changes on specified collections
type MySQLConfig ¶
type MySQLConfig struct {
	ServerID              uint32        `json:"server_id"`
	StartPosition         string        `json:"start_position,omitempty"`
	Flavor                string        `json:"flavor"` // mysql or mariadb
	GTIDEnabled           bool          `json:"gtid_enabled"`
	HeartbeatPeriod       time.Duration `json:"heartbeat_period"`
	ReadTimeout           time.Duration `json:"read_timeout"`
	UseDecimal            bool          `json:"use_decimal"`
	IgnoreJSONDecodeError bool          `json:"ignore_json_decode_error"`
}
MySQLConfig contains MySQL-specific configuration
type MySQLConnector ¶
type MySQLConnector struct {
// contains filtered or unexported fields
}
MySQLConnector implements CDC for MySQL using binary log replication
func NewMySQLConnector ¶
func NewMySQLConnector(logger *zap.Logger) *MySQLConnector
NewMySQLConnector creates a new MySQL CDC connector
func (*MySQLConnector) Acknowledge ¶
func (c *MySQLConnector) Acknowledge(position Position) error
Acknowledge confirms processing of events up to the given position
func (*MySQLConnector) Connect ¶
func (c *MySQLConnector) Connect(config CDCConfig) error
Connect establishes connection to MySQL and sets up binary log replication
func (*MySQLConnector) GetPosition ¶
func (c *MySQLConnector) GetPosition() Position
GetPosition returns the current replication position
func (*MySQLConnector) Health ¶
func (c *MySQLConnector) Health() HealthStatus
Health returns the health status of the connector
func (*MySQLConnector) ReadChanges ¶
func (c *MySQLConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)
ReadChanges returns a channel of change events
func (*MySQLConnector) Stop ¶
func (c *MySQLConnector) Stop() error
Stop gracefully shuts down the connector
func (*MySQLConnector) Subscribe ¶
func (c *MySQLConnector) Subscribe(tables []string) error
Subscribe starts listening to changes on specified tables
type OperationType ¶
type OperationType string
OperationType represents the type of database operation
const (
	OperationInsert OperationType = "INSERT"
	OperationUpdate OperationType = "UPDATE"
	OperationDelete OperationType = "DELETE"
	OperationDDL    OperationType = "DDL"    // Schema changes
	OperationCommit OperationType = "COMMIT" // Transaction commit
)
type Position ¶
type Position struct {
	Type     string                 `json:"type"`
	Value    interface{}            `json:"value"`
	Metadata map[string]interface{} `json:"metadata,omitempty"`
}
Position represents a replication position in the change stream
type PostgreSQLConfig ¶
type PostgreSQLConfig struct {
	SlotName       string `json:"slot_name"`
	Publication    string `json:"publication"`
	StartLSN       string `json:"start_lsn,omitempty"`
	TempSlot       bool   `json:"temp_slot"`
	PluginName     string `json:"plugin_name"`
	StatusInterval int    `json:"status_interval"`
}
PostgreSQLConfig contains PostgreSQL-specific configuration
type PostgreSQLConnector ¶
type PostgreSQLConnector struct {
// contains filtered or unexported fields
}
PostgreSQLConnector implements CDC for PostgreSQL using logical replication
func NewPostgreSQLConnector ¶
func NewPostgreSQLConnector(logger *zap.Logger) *PostgreSQLConnector
NewPostgreSQLConnector creates a new PostgreSQL CDC connector
func (*PostgreSQLConnector) Acknowledge ¶
func (c *PostgreSQLConnector) Acknowledge(position Position) error
Acknowledge confirms processing of events up to the given position
func (*PostgreSQLConnector) Connect ¶
func (c *PostgreSQLConnector) Connect(config CDCConfig) error
Connect establishes connection to PostgreSQL and sets up logical replication
func (*PostgreSQLConnector) GetPosition ¶
func (c *PostgreSQLConnector) GetPosition() Position
GetPosition returns the current replication position
func (*PostgreSQLConnector) Health ¶
func (c *PostgreSQLConnector) Health() HealthStatus
Health returns the health status of the connector
func (*PostgreSQLConnector) ReadChanges ¶
func (c *PostgreSQLConnector) ReadChanges(ctx context.Context) (<-chan ChangeEvent, error)
ReadChanges returns a channel of change events
func (*PostgreSQLConnector) Stop ¶
func (c *PostgreSQLConnector) Stop() error
Stop gracefully shuts down the connector
func (*PostgreSQLConnector) Subscribe ¶
func (c *PostgreSQLConnector) Subscribe(tables []string) error
Subscribe starts listening to changes on specified tables
type ProcessingTask ¶
type ProcessingTask struct {
	Events     []ChangeEvent
	Handler    string
	Partition  int
	Timestamp  time.Time
	RetryCount int
	MaxRetries int
}
ProcessingTask represents a task to process events
type RetryPolicy ¶
type RetryPolicy struct {
	MaxRetries     int           `json:"max_retries"`
	InitialBackoff time.Duration `json:"initial_backoff"`
	MaxBackoff     time.Duration `json:"max_backoff"`
	Multiplier     float64       `json:"multiplier"`
}
RetryPolicy defines retry behavior
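One plausible reading of these fields is capped exponential backoff; a standalone sketch (the type is mirrored locally here, and this is not necessarily the package's actual backoff computation):

package main

import (
	"fmt"
	"time"
)

// RetryPolicy mirrors the package type for this standalone sketch.
type RetryPolicy struct {
	MaxRetries     int
	InitialBackoff time.Duration
	MaxBackoff     time.Duration
	Multiplier     float64
}

// backoffFor returns the delay before the given attempt (0-based),
// growing geometrically and capped at MaxBackoff.
func backoffFor(p RetryPolicy, attempt int) time.Duration {
	d := p.InitialBackoff
	for i := 0; i < attempt; i++ {
		d = time.Duration(float64(d) * p.Multiplier)
		if d > p.MaxBackoff {
			return p.MaxBackoff
		}
	}
	return d
}

func main() {
	p := RetryPolicy{MaxRetries: 5, InitialBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, Multiplier: 2}
	for attempt := 0; attempt < p.MaxRetries; attempt++ {
		fmt.Println(attempt, backoffFor(p, attempt)) // 100ms, 200ms, 400ms, ...
	}
}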
type SchemaChange ¶
type SchemaChange struct {
	Type      string                 `json:"type"`   // CREATE, ALTER, DROP
	Object    string                 `json:"object"` // TABLE, INDEX, etc.
	Name      string                 `json:"name"`
	Statement string                 `json:"statement"`
	Before    map[string]interface{} `json:"before,omitempty"`
	After     map[string]interface{} `json:"after,omitempty"`
}
SchemaChange represents a DDL change event
type SourceInfo ¶
type SourceInfo struct {
	Name          string        `json:"name"`
	Database      string        `json:"database"`
	Table         string        `json:"table"`
	ConnectorType ConnectorType `json:"connector_type"`
	Version       string        `json:"version,omitempty"`
	Timestamp     time.Time     `json:"timestamp"`
}
SourceInfo contains information about the data source
type StreamMetrics ¶
type StreamMetrics struct {
	EventsReceived    int64         `json:"events_received"`
	EventsProcessed   int64         `json:"events_processed"`
	EventsFiltered    int64         `json:"events_filtered"`
	EventsErrored     int64         `json:"events_errored"`
	EventsRetried     int64         `json:"events_retried"`
	BatchesProcessed  int64         `json:"batches_processed"`
	ProcessingLatency time.Duration `json:"processing_latency"`
	ThroughputRPS     float64       `json:"throughput_rps"`
	BacklogSize       int64         `json:"backlog_size"`
	LastProcessedTime time.Time     `json:"last_processed_time"`

	// Per-handler metrics
	HandlerMetrics map[string]HandlerMetrics `json:"handler_metrics"`
}
StreamMetrics contains metrics for stream processing
type StreamProcessor ¶
type StreamProcessor struct {
// contains filtered or unexported fields
}
StreamProcessor handles real-time processing of CDC events
func NewStreamProcessor ¶
func NewStreamProcessor(config StreamingConfig, logger *zap.Logger) *StreamProcessor
NewStreamProcessor creates a new stream processor
func (*StreamProcessor) AddFilter ¶
func (sp *StreamProcessor) AddFilter(filter EventFilter)
AddFilter adds an event filter
func (*StreamProcessor) GetDeadLetterQueueStats ¶
func (sp *StreamProcessor) GetDeadLetterQueueStats() DeadLetterStats
GetDeadLetterQueueStats returns dead letter queue statistics
func (*StreamProcessor) GetMetrics ¶
func (sp *StreamProcessor) GetMetrics() StreamMetrics
GetMetrics returns current stream processing metrics
func (*StreamProcessor) ProcessEvent ¶
func (sp *StreamProcessor) ProcessEvent(ctx context.Context, event ChangeEvent) error
ProcessEvent processes a single event
func (*StreamProcessor) ProcessEvents ¶
func (sp *StreamProcessor) ProcessEvents(ctx context.Context, events []ChangeEvent) error
ProcessEvents processes multiple events
func (*StreamProcessor) RegisterBatchHandler ¶
func (sp *StreamProcessor) RegisterBatchHandler(pattern string, handler BatchEventHandler)
RegisterBatchHandler registers a batch event handler
func (*StreamProcessor) RegisterHandler ¶
func (sp *StreamProcessor) RegisterHandler(pattern string, handler EventHandler)
RegisterHandler registers an event handler for a specific event type or table
func (*StreamProcessor) Start ¶
func (sp *StreamProcessor) Start(ctx context.Context) error
Start starts the stream processor
func (*StreamProcessor) Stop ¶
func (sp *StreamProcessor) Stop() error
Stop stops the stream processor gracefully
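A sketch wiring the processor together (the handler pattern syntax is assumed, and the import path is a placeholder; Validate is shown for completeness):

package main

import (
	"context"
	"log"
	"time"

	"go.uber.org/zap"

	cdc "example.com/nebula/pkg/cdc" // placeholder import path
)

func main() {
	cfg := cdc.StreamingConfig{
		MaxBatchSize:    500,
		BatchTimeout:    time.Second,
		MaxRetries:      3,
		RetryBackoff:    200 * time.Millisecond,
		ParallelWorkers: 4,
	}
	if err := cfg.Validate(); err != nil {
		log.Fatal(err)
	}

	sp := cdc.NewStreamProcessor(cfg, zap.NewNop())

	// Handlers are keyed by pattern (e.g. a table name); the exact pattern
	// syntax is assumed here.
	sp.RegisterHandler("orders", func(ctx context.Context, ev cdc.ChangeEvent) error {
		log.Printf("handling %s on %s", ev.Operation, ev.Table)
		return nil
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := sp.Start(ctx); err != nil {
		log.Fatal(err)
	}
	defer sp.Stop()

	ev := cdc.ChangeEvent{ID: "evt-1", Operation: cdc.OperationInsert, Table: "orders"}
	if err := sp.ProcessEvent(ctx, ev); err != nil {
		log.Fatal(err)
	}
	log.Printf("metrics: %+v", sp.GetMetrics())
}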
type StreamWorker ¶
type StreamWorker struct {
// contains filtered or unexported fields
}
StreamWorker processes events in parallel
type StreamingConfig ¶
type StreamingConfig struct {
	MaxBatchSize    int           `json:"max_batch_size"`
	BatchTimeout    time.Duration `json:"batch_timeout"`
	MaxRetries      int           `json:"max_retries"`
	RetryBackoff    time.Duration `json:"retry_backoff"`
	DeadLetterQueue string        `json:"dead_letter_queue,omitempty"`
	ParallelWorkers int           `json:"parallel_workers"`
	OrderingKey     string        `json:"ordering_key,omitempty"`
	CompressionType string        `json:"compression_type,omitempty"`
	ExactlyOnce     bool          `json:"exactly_once"`
}
StreamingConfig contains configuration for event streaming
func (*StreamingConfig) Validate ¶
func (c *StreamingConfig) Validate() error
Validate validates the streaming configuration
type TableSchema ¶
type TableSchema struct {
	Name        string
	Columns     []ColumnInfo
	PrimaryKey  []string
	LastUpdated time.Time
}
TableSchema represents PostgreSQL table schema information
type TransactionInfo ¶
type TransactionInfo struct {
	ID         string    `json:"id"`
	StartTime  time.Time `json:"start_time"`
	EndTime    time.Time `json:"end_time"`
	EventCount int       `json:"event_count"`
	Size       int64     `json:"size"`
}
TransactionInfo contains information about database transactions
type TruncatedArray ¶
TruncatedArray represents a truncated array field
type UpdateDescription ¶
type UpdateDescription struct {
	UpdatedFields   map[string]interface{} `bson:"updatedFields,omitempty"`
	RemovedFields   []string               `bson:"removedFields,omitempty"`
	TruncatedArrays []TruncatedArray       `bson:"truncatedArrays,omitempty"`
}
UpdateDescription contains information about updated fields