Documentation
¶
Index ¶
- Variables
- func ConstructColumnNameAvroFieldMap(fields []types.QField) map[string]string
- func ItemsToJSON(items Items) (string, error)
- func SleepFuture(ctx workflow.Context, d time.Duration) workflow.Future
- type BaseRecord
- type CDCFlowSignal
- type CDCStream
- func (r *CDCStream[T]) AddRecord(ctx context.Context, record Record[T]) error
- func (r *CDCStream[T]) AddSchemaDelta(tableNameMapping map[string]NameAndExclude, delta *protos.TableSchemaDelta)
- func (r *CDCStream[T]) ChannelLen() int
- func (r *CDCStream[T]) Close()
- func (r *CDCStream[T]) GetLastCheckpoint() CdcCheckpoint
- func (r *CDCStream[T]) GetRecords() <-chan Record[T]
- func (r *CDCStream[T]) NeedsNormalize() bool
- func (r *CDCStream[T]) SignalAsEmpty()
- func (r *CDCStream[T]) SignalAsNotEmpty()
- func (r *CDCStream[T]) UpdateLatestCheckpointID(val int64)
- func (r *CDCStream[T]) UpdateLatestCheckpointText(val string)
- func (r *CDCStream[T]) WaitAndCheckEmpty() bool
- type CdcCheckpoint
- type CdcColumnNumericTruncator
- type CdcTableNumericTruncator
- type DeleteRecord
- type HeaderProvider
- type InsertRecord
- type Items
- type MessageRecord
- type NameAndExclude
- type NormalizeRecordsRequest
- type NormalizeResponse
- type NullMismatchTracker
- type Object
- type PgItems
- func (r PgItems) AddColumn(col string, val []byte)
- func (r PgItems) DeleteColName(colName string)
- func (r PgItems) GetBytesByColName(colName string) ([]byte, error)
- func (r PgItems) GetColumnValue(col string) []byte
- func (r PgItems) Len() int
- func (r PgItems) MarshalJSON() ([]byte, error)
- func (r PgItems) ToJSON() (string, error)
- func (r PgItems) ToJSONWithOptions(options ToJSONOptions) (string, error)
- func (r PgItems) UpdateIfNotExists(input_ Items) []string
- func (r PgItems) UpdateWithBaseRecord(baseRecord BaseRecord)
- type PullRecordsRequest
- type QObjectStream
- func (s *QObjectStream) Close(err error)
- func (s *QObjectStream) Err() error
- func (s *QObjectStream) Format() (QObjectStreamFormat, error)
- func (s *QObjectStream) HeaderProvider() (HeaderProvider, error)
- func (s *QObjectStream) Schema() (types.QRecordSchema, error)
- func (s *QObjectStream) SetFormat(format QObjectStreamFormat)
- func (s *QObjectStream) SetHeaderProvider(provider HeaderProvider)
- func (s *QObjectStream) SetSchema(schema types.QRecordSchema)
- type QObjectStreamFormat
- type QRecordAvroChunkSizeTracker
- type QRecordAvroConverter
- type QRecordAvroField
- type QRecordAvroSchema
- type QRecordAvroSchemaDefinition
- type QRecordBatch
- type QRecordCopyFromSource
- type QRecordStream
- func (s *QRecordStream) Close(err error)
- func (s *QRecordStream) Err() error
- func (s *QRecordStream) IsSchemaSet() bool
- func (s *QRecordStream) Schema() (types.QRecordSchema, error)
- func (s *QRecordStream) SchemaChan() <-chan struct{}
- func (s *QRecordStream) SchemaDebug() *types.NullableSchemaDebug
- func (s *QRecordStream) SetSchema(schema types.QRecordSchema)
- func (s *QRecordStream) SetSchemaDebug(debug *types.NullableSchemaDebug)
- type Record
- type RecordItems
- func (r RecordItems) AddColumn(col string, val types.QValue)
- func (r RecordItems) DeleteColName(colName string)
- func (r RecordItems) GetBytesByColName(colName string) ([]byte, error)
- func (r RecordItems) GetColumnValue(col string) types.QValue
- func (r RecordItems) GetValueByColName(colName string) (types.QValue, error)
- func (r RecordItems) Len() int
- func (r RecordItems) MarshalJSON() ([]byte, error)
- func (r RecordItems) MarshalJSONWithOptions(opts ToJSONOptions) ([]byte, error)
- func (r RecordItems) ToJSONWithOptions(options ToJSONOptions) (string, error)
- func (r RecordItems) UpdateIfNotExists(input_ Items) []string
- func (r RecordItems) UpdateWithBaseRecord(baseRecord BaseRecord)
- type RecordTypeCounts
- type RecordsToStreamRequest
- type RelationMessageMapping
- type RelationRecord
- type RemoveFlowDetailsFromCatalogRequest
- type SetupReplicationResult
- type SnapshotTableNumericTruncator
- type StreamNumericTruncator
- type SyncCompositeResponse
- type SyncRecordsRequest
- type SyncResponse
- type TableWithPkey
- type ToJSONOptions
- type TypedReceiveChannel
- func (self TypedReceiveChannel[T]) AddToSelector(selector workflow.Selector, f func(T, bool)) workflow.Selector
- func (self TypedReceiveChannel[T]) Receive(ctx workflow.Context) (T, bool)
- func (self TypedReceiveChannel[T]) ReceiveAsync() (T, bool)
- func (self TypedReceiveChannel[T]) ReceiveAsyncWithMoreFlag() (T, bool, bool)
- func (self TypedReceiveChannel[T]) ReceiveWithTimeout(ctx workflow.Context, timeout time.Duration) (T, bool, bool)
- type TypedSignal
- func (self TypedSignal[T]) GetSignalChannel(ctx workflow.Context) TypedReceiveChannel[T]
- func (self TypedSignal[T]) SignalChildWorkflow(ctx workflow.Context, wf workflow.ChildWorkflowFuture, value T) workflow.Future
- func (self TypedSignal[T]) SignalClientWorkflow(ctx context.Context, c client.Client, workflowID string, runID string, value T) error
- func (self TypedSignal[T]) SignalExternalWorkflow(ctx workflow.Context, workflowID string, runID string, value T) workflow.Future
- type UpdateRecord
Constants ¶
This section is empty.
Variables ¶
var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{
Name: "cdc-dynamic-properties",
}
var FlowSignal = TypedSignal[CDCFlowSignal]{
Name: "peer-flow-signal",
}
var FlowSignalStateChange = TypedSignal[*protos.FlowStateChangeRequest]{
Name: "flow-state-change-signal",
}
var StartMaintenanceSignal = TypedSignal[*protos.StartMaintenanceSignal]{
Name: "start-maintenance-signal",
}
Functions ¶
func ItemsToJSON ¶
Types ¶
type BaseRecord ¶
type BaseRecord struct {
// CheckpointID is the ID of the record.
CheckpointID int64 `json:"checkpointId"`
// CommitTimeNano is BeginMessage.CommitTime.UnixNano(); storing it as an int64 is 16 bytes smaller than storing a time.Time.
CommitTimeNano int64 `json:"commitTimeNano"`
// TransactionID is the `XID` corresponding to the transaction that committed this record.
TransactionID uint64 `json:"transactionId"`
}
func (*BaseRecord) GetCheckpointID ¶
func (r *BaseRecord) GetCheckpointID() int64
func (*BaseRecord) GetCommitTime ¶
func (r *BaseRecord) GetCommitTime() time.Time
func (*BaseRecord) GetTransactionID ¶
func (r *BaseRecord) GetTransactionID() uint64
type CDCFlowSignal ¶
type CDCFlowSignal int32
const ( NoopSignal CDCFlowSignal = iota PauseSignal TerminateSignal ResyncSignal )
func FlowSignalHandler ¶
func FlowSignalHandler(activeSignal CDCFlowSignal, v CDCFlowSignal, logger log.Logger, ) CDCFlowSignal
type CDCStream ¶
type CDCStream[T Items] struct { // Schema changes from slot SchemaDeltas []*protos.TableSchemaDelta // contains filtered or unexported fields }
func NewCDCStream ¶
func (*CDCStream[T]) AddSchemaDelta ¶
func (r *CDCStream[T]) AddSchemaDelta( tableNameMapping map[string]NameAndExclude, delta *protos.TableSchemaDelta, )
func (*CDCStream[T]) ChannelLen ¶
func (*CDCStream[T]) GetLastCheckpoint ¶
func (r *CDCStream[T]) GetLastCheckpoint() CdcCheckpoint
func (*CDCStream[T]) GetRecords ¶
func (*CDCStream[T]) NeedsNormalize ¶
func (*CDCStream[T]) SignalAsEmpty ¶
func (r *CDCStream[T]) SignalAsEmpty()
func (*CDCStream[T]) SignalAsNotEmpty ¶
func (r *CDCStream[T]) SignalAsNotEmpty()
func (*CDCStream[T]) UpdateLatestCheckpointID ¶
func (*CDCStream[T]) UpdateLatestCheckpointText ¶
func (*CDCStream[T]) WaitAndCheckEmpty ¶
type CdcCheckpoint ¶
type CdcColumnNumericTruncator ¶
type CdcColumnNumericTruncator struct {
Stat *qvalue.NumericStat
}
type CdcTableNumericTruncator ¶
type CdcTableNumericTruncator struct {
TruncatorsByColumn map[string]CdcColumnNumericTruncator
DestinationTable string
}
func NewCdcTableNumericTruncator ¶
func NewCdcTableNumericTruncator( destinationTable string, columnSettings []*protos.ColumnSetting, typesToSkip map[string]struct{}, ) CdcTableNumericTruncator
func (CdcTableNumericTruncator) CollectWarnings ¶
func (ts CdcTableNumericTruncator) CollectWarnings(warnings *shared.QRepWarnings)
func (CdcTableNumericTruncator) Get ¶
func (ts CdcTableNumericTruncator) Get(destinationColumn string) CdcColumnNumericTruncator
type DeleteRecord ¶
type DeleteRecord[T Items] struct { // Items is a map of column name to value. Items T // unchanged toast columns, filled from latest UpdateRecord UnchangedToastColumns map[string]struct{} // Name of the source table SourceTableName string // Name of the destination table DestinationTableName string BaseRecord }
func (*DeleteRecord[T]) GetDestinationTableName ¶
func (r *DeleteRecord[T]) GetDestinationTableName() string
func (*DeleteRecord[T]) GetItems ¶
func (r *DeleteRecord[T]) GetItems() T
func (*DeleteRecord[T]) GetSourceTableName ¶
func (r *DeleteRecord[T]) GetSourceTableName() string
func (*DeleteRecord[T]) Kind ¶
func (*DeleteRecord[T]) Kind() string
func (*DeleteRecord[T]) PopulateCountMap ¶
func (r *DeleteRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)
type HeaderProvider ¶
HeaderProvider provides HTTP headers for authenticating object requests.
type InsertRecord ¶
type InsertRecord[T Items] struct { // Items is a map of column name to value. Items T // Name of the source table SourceTableName string // Name of the destination table DestinationTableName string // CommitID is the ID of the commit corresponding to this record. CommitID int64 BaseRecord }
func (*InsertRecord[T]) GetDestinationTableName ¶
func (r *InsertRecord[T]) GetDestinationTableName() string
func (*InsertRecord[T]) GetItems ¶
func (r *InsertRecord[T]) GetItems() T
func (*InsertRecord[T]) GetSourceTableName ¶
func (r *InsertRecord[T]) GetSourceTableName() string
func (*InsertRecord[T]) Kind ¶
func (*InsertRecord[T]) Kind() string
func (*InsertRecord[T]) PopulateCountMap ¶
func (r *InsertRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)
type MessageRecord ¶
type MessageRecord[T Items] struct { Prefix string Content string BaseRecord }
func (*MessageRecord[T]) GetDestinationTableName ¶
func (r *MessageRecord[T]) GetDestinationTableName() string
func (*MessageRecord[T]) GetItems ¶
func (r *MessageRecord[T]) GetItems() T
func (*MessageRecord[T]) GetSourceTableName ¶
func (r *MessageRecord[T]) GetSourceTableName() string
func (*MessageRecord[T]) Kind ¶
func (*MessageRecord[T]) Kind() string
func (*MessageRecord[T]) PopulateCountMap ¶
func (r *MessageRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)
type NameAndExclude ¶
func NewNameAndExclude ¶
func NewNameAndExclude(name string, exclude []string) NameAndExclude
type NormalizeRecordsRequest ¶
type NormalizeResponse ¶
type NullMismatchTracker ¶
type NullMismatchTracker struct {
// contains filtered or unexported fields
}
NullMismatchTracker detects null values in columns that would be non-nullable under strict mode.
func NewNullMismatchTracker ¶
func NewNullMismatchTracker( schemaDebug *types.NullableSchemaDebug, ) *NullMismatchTracker
func (*NullMismatchTracker) LogIfMismatch ¶
func (t *NullMismatchTracker) LogIfMismatch(ctx context.Context, logger log.Logger)
func (*NullMismatchTracker) RecordNull ¶
func (t *NullMismatchTracker) RecordNull(fieldIdx int)
type PgItems ¶
encoding/gob cannot encode unexported fields
func NewPgItems ¶
func (PgItems) DeleteColName ¶
func (PgItems) GetBytesByColName ¶
func (PgItems) GetColumnValue ¶
func (PgItems) MarshalJSON ¶
func (PgItems) ToJSONWithOptions ¶
func (r PgItems) ToJSONWithOptions(options ToJSONOptions) (string, error)
func (PgItems) UpdateIfNotExists ¶
UpdateIfNotExists takes Items as input and, for every column that is present in the input but not in the current PgItems, sets that column in the current PgItems to the input's value. It returns the slice of column names that were updated.
func (PgItems) UpdateWithBaseRecord ¶
func (r PgItems) UpdateWithBaseRecord(baseRecord BaseRecord)
type PullRecordsRequest ¶
type PullRecordsRequest[T Items] struct { // record batch for pushing changes into RecordStream *CDCStream[T] // ConsumedOffset can be reported as committed to reduce slot size ConsumedOffset *atomic.Int64 // FlowJobName is the name of the flow job. FlowJobName string // relId to name Mapping SrcTableIDNameMapping map[uint32]string // source to destination table name mapping TableNameMapping map[string]NameAndExclude // tablename to schema mapping TableNameSchemaMapping map[string]*protos.TableSchema // overrides dynamic configuration Env map[string]string // override publication name OverridePublicationName string // override replication slot name OverrideReplicationSlotName string // LastOffset is the latest LSN that was synced. LastOffset CdcCheckpoint // MaxBatchSize is the max number of records to fetch. MaxBatchSize uint32 // peerdb versioning to prevent breaking changes InternalVersion uint32 // IdleTimeout is the timeout to wait for new records. IdleTimeout time.Duration }
type QObjectStream ¶
type QObjectStream struct {
Objects chan *Object
// contains filtered or unexported fields
}
QObjectStream is a stream of HTTP objects represented with URLs that are directly consumable by any HTTP client when combined with authentication headers from the HeaderProvider.
func NewQObjectStream ¶
func NewQObjectStream(buffer int) *QObjectStream
func (*QObjectStream) Close ¶
func (s *QObjectStream) Close(err error)
func (*QObjectStream) Err ¶
func (s *QObjectStream) Err() error
func (*QObjectStream) Format ¶
func (s *QObjectStream) Format() (QObjectStreamFormat, error)
func (*QObjectStream) HeaderProvider ¶
func (s *QObjectStream) HeaderProvider() (HeaderProvider, error)
func (*QObjectStream) Schema ¶
func (s *QObjectStream) Schema() (types.QRecordSchema, error)
func (*QObjectStream) SetFormat ¶
func (s *QObjectStream) SetFormat(format QObjectStreamFormat)
func (*QObjectStream) SetHeaderProvider ¶
func (s *QObjectStream) SetHeaderProvider(provider HeaderProvider)
func (*QObjectStream) SetSchema ¶
func (s *QObjectStream) SetSchema(schema types.QRecordSchema)
type QObjectStreamFormat ¶
type QObjectStreamFormat string
const ( // QObjectStreamBigQueryExportParquetFormat is the Parquet format used by BigQuery export. // More details: https://docs.cloud.google.com/bigquery/docs/exporting-data#parquet_export_details QObjectStreamBigQueryExportParquetFormat QObjectStreamFormat = "parquet" )
type QRecordAvroConverter ¶
type QRecordAvroConverter struct {
Schema *QRecordAvroSchemaDefinition
ColNames []string
TargetDWH protos.DBType
UnboundedNumericAsString bool
NullMismatchTracker *NullMismatchTracker
// contains filtered or unexported fields
}
func NewQRecordAvroConverter ¶
func (*QRecordAvroConverter) Convert ¶
func (qac *QRecordAvroConverter) Convert( ctx context.Context, env map[string]string, qrecord []types.QValue, typeConversions map[string]types.TypeConversion, numericTruncator SnapshotTableNumericTruncator, format internal.BinaryFormat, calcSize bool, ) (map[string]any, int64, error)
type QRecordAvroField ¶
type QRecordAvroSchema ¶
type QRecordAvroSchema struct {
Type string `json:"type"`
Name string `json:"name"`
Fields []QRecordAvroField `json:"fields"`
}
type QRecordAvroSchemaDefinition ¶
type QRecordAvroSchemaDefinition struct {
Schema *avro.RecordSchema
Fields []types.QField
}
func GetAvroSchemaDefinition ¶
type QRecordBatch ¶
type QRecordBatch struct {
Schema types.QRecordSchema
Records [][]types.QValue
}
QRecordBatch holds a schema together with a batch of records, each record being a []QValue slice.
func (*QRecordBatch) FeedToQRecordStream ¶
func (q *QRecordBatch) FeedToQRecordStream(stream *QRecordStream)
func (*QRecordBatch) ToQRecordStream ¶
func (q *QRecordBatch) ToQRecordStream(buffer int) *QRecordStream
type QRecordCopyFromSource ¶
type QRecordCopyFromSource struct {
// contains filtered or unexported fields
}
func NewQRecordCopyFromSource ¶
func NewQRecordCopyFromSource( stream *QRecordStream, ) *QRecordCopyFromSource
func (*QRecordCopyFromSource) Err ¶
func (src *QRecordCopyFromSource) Err() error
func (*QRecordCopyFromSource) Next ¶
func (src *QRecordCopyFromSource) Next() bool
func (*QRecordCopyFromSource) Values ¶
func (src *QRecordCopyFromSource) Values() ([]any, error)
type QRecordStream ¶
func NewQRecordStream ¶
func NewQRecordStream(buffer int) *QRecordStream
func (*QRecordStream) Close ¶
func (s *QRecordStream) Close(err error)
Close sets the error and closes the stream. When Close is called with multiple errors, only the first error is tracked and the call does not panic. Close(nil) after an error will not panic, but Close after Close(nil) will panic; this is enough to be able to safely `defer stream.Close(nil)`.
func (*QRecordStream) Err ¶
func (s *QRecordStream) Err() error
func (*QRecordStream) IsSchemaSet ¶
func (s *QRecordStream) IsSchemaSet() bool
func (*QRecordStream) Schema ¶
func (s *QRecordStream) Schema() (types.QRecordSchema, error)
func (*QRecordStream) SchemaChan ¶
func (s *QRecordStream) SchemaChan() <-chan struct{}
func (*QRecordStream) SchemaDebug ¶
func (s *QRecordStream) SchemaDebug() *types.NullableSchemaDebug
func (*QRecordStream) SetSchema ¶
func (s *QRecordStream) SetSchema(schema types.QRecordSchema)
func (*QRecordStream) SetSchemaDebug ¶
func (s *QRecordStream) SetSchemaDebug(debug *types.NullableSchemaDebug)
type RecordItems ¶
encoding/gob cannot encode unexported fields
func NewMongoRecordItems ¶
func NewMongoRecordItems(capacity int) RecordItems
func NewRecordItems ¶
func NewRecordItems(capacity int) RecordItems
func (RecordItems) DeleteColName ¶
func (r RecordItems) DeleteColName(colName string)
func (RecordItems) GetBytesByColName ¶
func (r RecordItems) GetBytesByColName(colName string) ([]byte, error)
func (RecordItems) GetColumnValue ¶
func (r RecordItems) GetColumnValue(col string) types.QValue
func (RecordItems) GetValueByColName ¶
func (r RecordItems) GetValueByColName(colName string) (types.QValue, error)
func (RecordItems) Len ¶
func (r RecordItems) Len() int
func (RecordItems) MarshalJSON ¶
func (r RecordItems) MarshalJSON() ([]byte, error)
func (RecordItems) MarshalJSONWithOptions ¶
func (r RecordItems) MarshalJSONWithOptions(opts ToJSONOptions) ([]byte, error)
func (RecordItems) ToJSONWithOptions ¶
func (r RecordItems) ToJSONWithOptions(options ToJSONOptions) (string, error)
func (RecordItems) UpdateIfNotExists ¶
func (r RecordItems) UpdateIfNotExists(input_ Items) []string
UpdateIfNotExists takes Items as input and, for every column that is present in the input but not in the current RecordItems, sets that column in the current RecordItems to the input's value. It returns the slice of column names that were updated.
func (RecordItems) UpdateWithBaseRecord ¶
func (r RecordItems) UpdateWithBaseRecord(baseRecord BaseRecord)
type RecordTypeCounts ¶
type RecordsToStreamRequest ¶
type RecordsToStreamRequest[T Items] struct { TableMapping map[string]*RecordTypeCounts BatchID int64 UnboundedNumericAsString bool TargetDWH protos.DBType // contains filtered or unexported fields }
func NewRecordsToStreamRequest ¶
func NewRecordsToStreamRequest[T Items]( records <-chan Record[T], tableMapping map[string]*RecordTypeCounts, batchID int64, unboundedNumericAsString bool, targetDWH protos.DBType, ) *RecordsToStreamRequest[T]
func (*RecordsToStreamRequest[T]) GetRecords ¶
func (r *RecordsToStreamRequest[T]) GetRecords() <-chan Record[T]
type RelationMessageMapping ¶
type RelationMessageMapping map[uint32]*pglogrepl.RelationMessage
type RelationRecord ¶
type RelationRecord[T Items] struct { TableSchemaDelta *protos.TableSchemaDelta `json:"tableSchemaDelta"` BaseRecord }
RelationRecord is a deliberate shortcut: it passes the table schema delta back as a regular record instead of requiring a heavy CDC refactoring.
func (*RelationRecord[T]) GetDestinationTableName ¶
func (r *RelationRecord[T]) GetDestinationTableName() string
func (*RelationRecord[T]) GetItems ¶
func (r *RelationRecord[T]) GetItems() T
func (*RelationRecord[T]) GetSourceTableName ¶
func (r *RelationRecord[T]) GetSourceTableName() string
func (*RelationRecord[T]) Kind ¶
func (*RelationRecord[T]) Kind() string
func (*RelationRecord[T]) PopulateCountMap ¶
func (r *RelationRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)
type SetupReplicationResult ¶
type SnapshotTableNumericTruncator ¶
type SnapshotTableNumericTruncator []qvalue.NumericStat
func NewSnapshotTableNumericTruncator ¶
func NewSnapshotTableNumericTruncator(destinationTable string, fields []types.QField) SnapshotTableNumericTruncator
func (SnapshotTableNumericTruncator) Get ¶
func (ts SnapshotTableNumericTruncator) Get(idx int) *qvalue.NumericStat
func (SnapshotTableNumericTruncator) Warnings ¶
func (ts SnapshotTableNumericTruncator) Warnings() shared.QRepWarnings
type StreamNumericTruncator ¶
type StreamNumericTruncator map[string]CdcTableNumericTruncator
func NewStreamNumericTruncator ¶
func NewStreamNumericTruncator(tableMappings []*protos.TableMapping, typesToSkip map[string]struct{}) StreamNumericTruncator
func (StreamNumericTruncator) Get ¶
func (ss StreamNumericTruncator) Get(destinationTable string) CdcTableNumericTruncator
func (StreamNumericTruncator) Warnings ¶
func (ss StreamNumericTruncator) Warnings() shared.QRepWarnings
type SyncCompositeResponse ¶
type SyncCompositeResponse struct {
SyncResponse *SyncResponse
NeedsNormalize bool
}
type SyncRecordsRequest ¶
type SyncRecordsRequest[T Items] struct { Records *CDCStream[T] // ConsumedOffset allows destination to confirm lsn for slot ConsumedOffset *atomic.Int64 // FlowJobName is the name of the flow job. FlowJobName string // destination table name -> schema mapping TableNameSchemaMapping map[string]*protos.TableSchema Env map[string]string // Staging path for AVRO files in CDC StagingPath string // Lua script Script string // source:destination mappings TableMappings []*protos.TableMapping SyncBatchID int64 Version uint32 }
type SyncResponse ¶
type SyncResponse struct {
// TableNameRowsMapping tells how many records need to be synced to each destination table.
TableNameRowsMapping map[string]*RecordTypeCounts
// to be carried to parent workflow
TableSchemaDeltas []*protos.TableSchemaDelta
// LastSyncedCheckpoint is the last state (eg LSN, GTID) that was synced.
LastSyncedCheckpoint CdcCheckpoint
// NumRecordsSynced is the number of records that were synced.
NumRecordsSynced int64
CurrentSyncBatchID int64
Warnings shared.QRepWarnings
}
type TableWithPkey ¶
type TableWithPkey struct {
TableName string
// SHA256 hash of the primary key columns
PkeyColVal [32]byte
}
func RecToTablePKey ¶
func RecToTablePKey[T Items]( tableNameSchemaMapping map[string]*protos.TableSchema, rec Record[T], ) (TableWithPkey, error)
type ToJSONOptions ¶
func NewToJSONOptions ¶
func NewToJSONOptions(unnestCols []string, hstoreAsJSON bool) ToJSONOptions
type TypedReceiveChannel ¶
type TypedReceiveChannel[T any] struct { Chan workflow.ReceiveChannel }
func (TypedReceiveChannel[T]) AddToSelector ¶
func (TypedReceiveChannel[T]) Receive ¶
func (self TypedReceiveChannel[T]) Receive(ctx workflow.Context) (T, bool)
func (TypedReceiveChannel[T]) ReceiveAsync ¶
func (self TypedReceiveChannel[T]) ReceiveAsync() (T, bool)
func (TypedReceiveChannel[T]) ReceiveAsyncWithMoreFlag ¶
func (self TypedReceiveChannel[T]) ReceiveAsyncWithMoreFlag() (T, bool, bool)
func (TypedReceiveChannel[T]) ReceiveWithTimeout ¶
type TypedSignal ¶
func (TypedSignal[T]) GetSignalChannel ¶
func (self TypedSignal[T]) GetSignalChannel(ctx workflow.Context) TypedReceiveChannel[T]
func (TypedSignal[T]) SignalChildWorkflow ¶
func (self TypedSignal[T]) SignalChildWorkflow( ctx workflow.Context, wf workflow.ChildWorkflowFuture, value T, ) workflow.Future
func (TypedSignal[T]) SignalClientWorkflow ¶
func (TypedSignal[T]) SignalExternalWorkflow ¶
type UpdateRecord ¶
type UpdateRecord[T Items] struct { // OldItems is a map of column name to value. OldItems T // NewItems is a map of column name to value. NewItems T // unchanged toast columns UnchangedToastColumns map[string]struct{} // Name of the source table SourceTableName string // Name of the destination table DestinationTableName string BaseRecord }
func (*UpdateRecord[T]) GetDestinationTableName ¶
func (r *UpdateRecord[T]) GetDestinationTableName() string
func (*UpdateRecord[T]) GetItems ¶
func (r *UpdateRecord[T]) GetItems() T
func (*UpdateRecord[T]) GetSourceTableName ¶
func (r *UpdateRecord[T]) GetSourceTableName() string
func (*UpdateRecord[T]) Kind ¶
func (*UpdateRecord[T]) Kind() string
func (*UpdateRecord[T]) PopulateCountMap ¶
func (r *UpdateRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)