Documentation
¶
Overview ¶
internal methods for flowable.go
TODO: This file contains temporary validation code used during the migration from v1 to v2 schema delta approaches. Once the v2 approach is fully rolled out and v1 is removed, this entire file should be deleted. The validation ensures that v2 produces identical results to v1 during the transition period.
Related cleanup tasks when deleting this file: - Remove applySchemaDeltasV1() function once v1 is deprecated - Clean up any references to this validation logic - Clean up `PEERDB_APPLY_SCHEMA_DELTA_TO_CATALOG` in dynconf.go
Index ¶
- func RunEveryIntervalUntilFinish[T any](ctx context.Context, runFunc func() (finished bool, result T, err error), ...) (T, error)
- type CancelTableAdditionActivity
- func (a *CancelTableAdditionActivity) CleanupCurrentParentMirror(ctx context.Context, flowJobName string, workflowId string) error
- func (a *CancelTableAdditionActivity) CleanupIncompleteTablesInStats(ctx context.Context, flowJobName string, ...) error
- func (a *CancelTableAdditionActivity) GetCompletedTablesFromQrepRuns(ctx context.Context, flowJobName string, workflowId string) ([]string, error)
- func (a *CancelTableAdditionActivity) GetFlowInfoFromCatalog(ctx context.Context, flowJobName string) (*protos.GetFlowInfoToCancelFromCatalogOutput, error)
- func (a *CancelTableAdditionActivity) GetTableOIDsFromCatalog(ctx context.Context, flowJobName string, tableMappings []*protos.TableMapping) (map[uint32]string, error)
- func (a *CancelTableAdditionActivity) RemoveCancelledTablesFromPublicationIfApplicable(ctx context.Context, flowJobName string, sourcePeerName string, ...) error
- func (a *CancelTableAdditionActivity) StartNewCDCFlow(ctx context.Context, flowConfig *protos.FlowConnectionConfigsCore, ...) error
- func (a *CancelTableAdditionActivity) UpdateCdcJobEntry(ctx context.Context, connectionConfigs *protos.FlowConnectionConfigsCore, ...) error
- func (a *CancelTableAdditionActivity) WaitForNewRunningMirrorToBeInRunningState(ctx context.Context, flowJobName string, workflowId string) error
- type CheckMetadataTablesResult
- type FlowableActivity
- func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigsCore, ...) error
- func (a *FlowableActivity) Alert(ctx context.Context, alert *protos.AlertInput) error
- func (a *FlowableActivity) CheckConnection(ctx context.Context, config *protos.SetupInput) error
- func (a *FlowableActivity) CheckMetadataTables(ctx context.Context, config *protos.SetupInput) (*CheckMetadataTablesResult, error)
- func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error
- func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig, runUUID string) error
- func (a *FlowableActivity) CreateNormalizedTable(ctx context.Context, config *protos.SetupNormalizedTableBatchInput) (*protos.SetupNormalizedTableBatchOutput, error)
- func (a *FlowableActivity) CreateRawTable(ctx context.Context, config *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error)
- func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) (*protos.CreateTablesFromExistingOutput, error)
- func (a *FlowableActivity) DeleteMirrorStats(ctx context.Context, flowName string) error
- func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.DropFlowActivityInput) error
- func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropFlowActivityInput) error
- func (a *FlowableActivity) EnsurePullability(ctx context.Context, config *protos.EnsurePullabilityBatchInput) (*protos.EnsurePullabilityBatchOutput, error)
- func (a *FlowableActivity) GetFlowMetadata(ctx context.Context, input *protos.FlowContextMetadataInput) (*protos.FlowContextMetadata, error)
- func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition, ...) (*protos.QRepParitionResult, error)
- func (a *FlowableActivity) MigratePostgresTableOIDs(ctx context.Context, flowName string, oidToTableNameMapping map[uint32]string, ...) error
- func (a *FlowableActivity) PeerDBFullRefreshOverwriteMode(ctx context.Context, env map[string]string) (bool, error)
- func (a *FlowableActivity) QRepHasNewRows(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition) (bool, error)
- func (a *FlowableActivity) RecordMetricsAggregates(ctx context.Context) error
- func (a *FlowableActivity) RecordMetricsCritical(ctx context.Context) error
- func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error
- func (a *FlowableActivity) RemoveFlowDetailsFromCatalog(ctx context.Context, req *model.RemoveFlowDetailsFromCatalogRequest) error
- func (a *FlowableActivity) RemoveTablesFromCatalog(ctx context.Context, cfg *protos.FlowConnectionConfigsCore, ...) error
- func (a *FlowableActivity) RemoveTablesFromPublication(ctx context.Context, cfg *protos.FlowConnectionConfigsCore, ...) error
- func (a *FlowableActivity) RemoveTablesFromRawTable(ctx context.Context, cfg *protos.FlowConnectionConfigsCore, ...) error
- func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (*protos.RenameTablesOutput, error)
- func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, config *protos.QRepConfig, ...) error
- func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, config *protos.QRepConfig, ...) (int64, error)
- func (a *FlowableActivity) ReportStatusMetric(ctx context.Context, status protos.FlowStatus) error
- func (a *FlowableActivity) ScheduledTasks(ctx context.Context) error
- func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error
- func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *protos.SetupInput) error
- func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error
- func (a *FlowableActivity) SetupTableSchema(ctx context.Context, config *protos.SetupTableSchemaBatchInput) error
- func (a *FlowableActivity) SyncFlow(ctx context.Context, config *protos.FlowConnectionConfigsCore, ...) error
- func (a *FlowableActivity) UpdateCDCConfigInCatalogActivity(ctx context.Context, cfg *protos.FlowConnectionConfigsCore) error
- type MaintenanceActivity
- func (a *MaintenanceActivity) BackgroundAlerter(ctx context.Context) error
- func (a *MaintenanceActivity) BackupAllPreviouslyRunningFlows(ctx context.Context, mirrors *protos.MaintenanceMirrors) error
- func (a *MaintenanceActivity) CleanBackedUpFlows(ctx context.Context) error
- func (a *MaintenanceActivity) DisableMaintenanceMode(ctx context.Context) error
- func (a *MaintenanceActivity) EnableMaintenanceMode(ctx context.Context) error
- func (a *MaintenanceActivity) GetAllMirrors(ctx context.Context) (*protos.MaintenanceMirrors, error)
- func (a *MaintenanceActivity) GetBackedUpFlows(ctx context.Context) (*protos.MaintenanceMirrors, error)
- func (a *MaintenanceActivity) PauseMirrorIfRunning(ctx context.Context, mirror *protos.MaintenanceMirror) (bool, error)
- func (a *MaintenanceActivity) ResumeMirror(ctx context.Context, mirror *protos.MaintenanceMirror) error
- func (a *MaintenanceActivity) WaitForRunningSnapshotsAndIntermediateStates(ctx context.Context, skippedFlows map[string]struct{}) (*protos.MaintenanceMirrors, error)
- type PeerType
- type SlotSnapshotState
- type SnapshotActivity
- func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName string) error
- func (a *SnapshotActivity) GetDefaultPartitionKeyForTables(ctx context.Context, input *protos.FlowConnectionConfigsCore) (*protos.GetDefaultPartitionKeyForTablesOutput, error)
- func (a *SnapshotActivity) GetPeerType(ctx context.Context, name string) (protos.DBType, error)
- func (a *SnapshotActivity) LoadTableSchema(ctx context.Context, flowName string, tableName string) (*protos.TableSchema, error)
- func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, flowName string, peer string, ...) error
- func (a *SnapshotActivity) SetupReplication(ctx context.Context, config *protos.SetupReplicationInput) (*protos.SetupReplicationOutput, error)
- func (a *SnapshotActivity) WaitForExportSnapshot(ctx context.Context, sessionID string) (*TxSnapshotState, error)
- type StreamCloser
- type TxSnapshotState
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type CancelTableAdditionActivity ¶
type CancelTableAdditionActivity struct {
CatalogPool shared.CatalogPool
Alerter *alerting.Alerter
TemporalClient client.Client
OtelManager *otel_metrics.OtelManager
}
func (*CancelTableAdditionActivity) CleanupCurrentParentMirror ¶
func (*CancelTableAdditionActivity) CleanupIncompleteTablesInStats ¶
func (a *CancelTableAdditionActivity) CleanupIncompleteTablesInStats( ctx context.Context, flowJobName string, completedTables []*protos.TableMapping, ) error
func (*CancelTableAdditionActivity) GetCompletedTablesFromQrepRuns ¶
func (a *CancelTableAdditionActivity) GetCompletedTablesFromQrepRuns( ctx context.Context, flowJobName string, workflowId string, ) ([]string, error)
GetCompletedTablesFromQrepRuns gets the list of tables in the addition request whose snapshot has completed
func (*CancelTableAdditionActivity) GetFlowInfoFromCatalog ¶
func (a *CancelTableAdditionActivity) GetFlowInfoFromCatalog( ctx context.Context, flowJobName string, ) (*protos.GetFlowInfoToCancelFromCatalogOutput, error)
func (*CancelTableAdditionActivity) GetTableOIDsFromCatalog ¶
func (a *CancelTableAdditionActivity) GetTableOIDsFromCatalog( ctx context.Context, flowJobName string, tableMappings []*protos.TableMapping, ) (map[uint32]string, error)
func (*CancelTableAdditionActivity) RemoveCancelledTablesFromPublicationIfApplicable ¶
func (a *CancelTableAdditionActivity) RemoveCancelledTablesFromPublicationIfApplicable( ctx context.Context, flowJobName string, sourcePeerName string, publicationNameInConfig string, finalListOfTables []*protos.TableMapping, ) error
func (*CancelTableAdditionActivity) StartNewCDCFlow ¶
func (a *CancelTableAdditionActivity) StartNewCDCFlow( ctx context.Context, flowConfig *protos.FlowConnectionConfigsCore, state *cdc_state.CDCFlowWorkflowState, workflowID string, ) error
func (*CancelTableAdditionActivity) UpdateCdcJobEntry ¶
func (a *CancelTableAdditionActivity) UpdateCdcJobEntry( ctx context.Context, connectionConfigs *protos.FlowConnectionConfigsCore, workflowID string, ) error
func (*CancelTableAdditionActivity) WaitForNewRunningMirrorToBeInRunningState ¶
type CheckMetadataTablesResult ¶
type CheckMetadataTablesResult struct {
NeedsSetupMetadataTables bool
}
type FlowableActivity ¶
type FlowableActivity struct {
CatalogPool shared.CatalogPool
Alerter *alerting.Alerter
OtelManager *otel_metrics.OtelManager
TemporalClient client.Client
}
func (*FlowableActivity) AddTablesToPublication ¶
func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigsCore, additionalTableMappings []*protos.TableMapping, ) error
func (*FlowableActivity) Alert ¶
func (a *FlowableActivity) Alert( ctx context.Context, alert *protos.AlertInput, ) error
func (*FlowableActivity) CheckConnection ¶
func (a *FlowableActivity) CheckConnection( ctx context.Context, config *protos.SetupInput, ) error
func (*FlowableActivity) CheckMetadataTables ¶
func (a *FlowableActivity) CheckMetadataTables( ctx context.Context, config *protos.SetupInput, ) (*CheckMetadataTablesResult, error)
func (*FlowableActivity) CleanupQRepFlow ¶
func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error
func (*FlowableActivity) ConsolidateQRepPartitions ¶
func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig, runUUID string, ) error
func (*FlowableActivity) CreateNormalizedTable ¶
func (a *FlowableActivity) CreateNormalizedTable( ctx context.Context, config *protos.SetupNormalizedTableBatchInput, ) (*protos.SetupNormalizedTableBatchOutput, error)
CreateNormalizedTable creates normalized tables in destination.
func (*FlowableActivity) CreateRawTable ¶
func (a *FlowableActivity) CreateRawTable( ctx context.Context, config *protos.CreateRawTableInput, ) (*protos.CreateRawTableOutput, error)
CreateRawTable creates a raw table in the destination flowable.
func (*FlowableActivity) CreateTablesFromExisting ¶
func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) ( *protos.CreateTablesFromExistingOutput, error, )
func (*FlowableActivity) DeleteMirrorStats ¶
func (a *FlowableActivity) DeleteMirrorStats(ctx context.Context, flowName string) error
func (*FlowableActivity) DropFlowDestination ¶
func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.DropFlowActivityInput) error
func (*FlowableActivity) DropFlowSource ¶
func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropFlowActivityInput) error
func (*FlowableActivity) EnsurePullability ¶
func (a *FlowableActivity) EnsurePullability( ctx context.Context, config *protos.EnsurePullabilityBatchInput, ) (*protos.EnsurePullabilityBatchOutput, error)
func (*FlowableActivity) GetFlowMetadata ¶
func (a *FlowableActivity) GetFlowMetadata( ctx context.Context, input *protos.FlowContextMetadataInput, ) (*protos.FlowContextMetadata, error)
NOTE: this activity is used on the path between CDCFlowWorkflow start and the signal handler for running state. If it's unable to progress for whatever reason, the upgrades will break and very unpleasant manual recovery will be needed. If you have to modify it, do it carefully and think through the edge cases.
func (*FlowableActivity) GetQRepPartitions ¶
func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition, runUUID string, ) (*protos.QRepParitionResult, error)
GetQRepPartitions returns the partitions for a given QRepConfig.
func (*FlowableActivity) MigratePostgresTableOIDs ¶
func (a *FlowableActivity) MigratePostgresTableOIDs( ctx context.Context, flowName string, oidToTableNameMapping map[uint32]string, tableMappings []*protos.TableMapping, ) error
*
- MigratePostgresTableOIDs migrates the OIDs for source Postgres tables to the catalog's table_schema_mapping
func (*FlowableActivity) PeerDBFullRefreshOverwriteMode ¶
func (*FlowableActivity) QRepHasNewRows ¶
func (a *FlowableActivity) QRepHasNewRows(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition, ) (bool, error)
func (*FlowableActivity) RecordMetricsAggregates ¶
func (a *FlowableActivity) RecordMetricsAggregates(ctx context.Context) error
func (*FlowableActivity) RecordMetricsCritical ¶
func (a *FlowableActivity) RecordMetricsCritical(ctx context.Context) error
func (*FlowableActivity) RecordSlotSizes ¶
func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error
func (*FlowableActivity) RemoveFlowDetailsFromCatalog ¶
func (a *FlowableActivity) RemoveFlowDetailsFromCatalog( ctx context.Context, req *model.RemoveFlowDetailsFromCatalogRequest, ) error
func (*FlowableActivity) RemoveTablesFromCatalog ¶
func (a *FlowableActivity) RemoveTablesFromCatalog( ctx context.Context, cfg *protos.FlowConnectionConfigsCore, tablesToRemove []*protos.TableMapping, ) error
func (*FlowableActivity) RemoveTablesFromPublication ¶
func (a *FlowableActivity) RemoveTablesFromPublication( ctx context.Context, cfg *protos.FlowConnectionConfigsCore, removedTablesMapping []*protos.TableMapping, ) error
func (*FlowableActivity) RemoveTablesFromRawTable ¶
func (a *FlowableActivity) RemoveTablesFromRawTable( ctx context.Context, cfg *protos.FlowConnectionConfigsCore, tablesToRemove []*protos.TableMapping, ) error
func (*FlowableActivity) RenameTables ¶
func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (*protos.RenameTablesOutput, error)
func (*FlowableActivity) ReplicateQRepPartitions ¶
func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, config *protos.QRepConfig, partitions *protos.QRepPartitionBatch, runUUID string, ) error
ReplicateQRepPartitions spawns multiple ReplicateQRepPartition
func (*FlowableActivity) ReplicateXminPartition ¶
func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition, runUUID string, ) (int64, error)
func (*FlowableActivity) ReportStatusMetric ¶
func (a *FlowableActivity) ReportStatusMetric(ctx context.Context, status protos.FlowStatus) error
func (*FlowableActivity) ScheduledTasks ¶
func (a *FlowableActivity) ScheduledTasks(ctx context.Context) error
func (*FlowableActivity) SendWALHeartbeat ¶
func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error
func (*FlowableActivity) SetupMetadataTables ¶
func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *protos.SetupInput) error
func (*FlowableActivity) SetupQRepMetadataTables ¶
func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error
SetupQRepMetadataTables sets up the metadata tables for QReplication.
func (*FlowableActivity) SetupTableSchema ¶
func (a *FlowableActivity) SetupTableSchema( ctx context.Context, config *protos.SetupTableSchemaBatchInput, ) error
SetupTableSchema populates table_schema_mapping
func (*FlowableActivity) SyncFlow ¶
func (a *FlowableActivity) SyncFlow( ctx context.Context, config *protos.FlowConnectionConfigsCore, options *protos.SyncFlowOptions, ) error
func (*FlowableActivity) UpdateCDCConfigInCatalogActivity ¶
func (a *FlowableActivity) UpdateCDCConfigInCatalogActivity(ctx context.Context, cfg *protos.FlowConnectionConfigsCore) error
type MaintenanceActivity ¶
type MaintenanceActivity struct {
CatalogPool shared.CatalogPool
Alerter *alerting.Alerter
TemporalClient client.Client
OtelManager *otel_metrics.OtelManager
}
func (*MaintenanceActivity) BackgroundAlerter ¶
func (a *MaintenanceActivity) BackgroundAlerter(ctx context.Context) error
func (*MaintenanceActivity) BackupAllPreviouslyRunningFlows ¶
func (a *MaintenanceActivity) BackupAllPreviouslyRunningFlows(ctx context.Context, mirrors *protos.MaintenanceMirrors) error
func (*MaintenanceActivity) CleanBackedUpFlows ¶
func (a *MaintenanceActivity) CleanBackedUpFlows(ctx context.Context) error
func (*MaintenanceActivity) DisableMaintenanceMode ¶
func (a *MaintenanceActivity) DisableMaintenanceMode(ctx context.Context) error
func (*MaintenanceActivity) EnableMaintenanceMode ¶
func (a *MaintenanceActivity) EnableMaintenanceMode(ctx context.Context) error
func (*MaintenanceActivity) GetAllMirrors ¶
func (a *MaintenanceActivity) GetAllMirrors(ctx context.Context) (*protos.MaintenanceMirrors, error)
func (*MaintenanceActivity) GetBackedUpFlows ¶
func (a *MaintenanceActivity) GetBackedUpFlows(ctx context.Context) (*protos.MaintenanceMirrors, error)
func (*MaintenanceActivity) PauseMirrorIfRunning ¶
func (a *MaintenanceActivity) PauseMirrorIfRunning(ctx context.Context, mirror *protos.MaintenanceMirror) (bool, error)
func (*MaintenanceActivity) ResumeMirror ¶
func (a *MaintenanceActivity) ResumeMirror(ctx context.Context, mirror *protos.MaintenanceMirror) error
func (*MaintenanceActivity) WaitForRunningSnapshotsAndIntermediateStates ¶
func (a *MaintenanceActivity) WaitForRunningSnapshotsAndIntermediateStates( ctx context.Context, skippedFlows map[string]struct{}, ) (*protos.MaintenanceMirrors, error)
type SlotSnapshotState ¶
type SlotSnapshotState struct {
// contains filtered or unexported fields
}
type SnapshotActivity ¶
type SnapshotActivity struct {
Alerter *alerting.Alerter
CatalogPool shared.CatalogPool
SlotSnapshotStates map[string]SlotSnapshotState
TxSnapshotStates map[string]TxSnapshotState
SnapshotStatesMutex sync.Mutex
}
func (*SnapshotActivity) CloseSlotKeepAlive ¶
func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName string) error
closes the slot signal
func (*SnapshotActivity) GetDefaultPartitionKeyForTables ¶
func (a *SnapshotActivity) GetDefaultPartitionKeyForTables( ctx context.Context, input *protos.FlowConnectionConfigsCore, ) (*protos.GetDefaultPartitionKeyForTablesOutput, error)
func (*SnapshotActivity) GetPeerType ¶
func (*SnapshotActivity) LoadTableSchema ¶
func (a *SnapshotActivity) LoadTableSchema( ctx context.Context, flowName string, tableName string, ) (*protos.TableSchema, error)
func (*SnapshotActivity) MaintainTx ¶
func (*SnapshotActivity) SetupReplication ¶
func (a *SnapshotActivity) SetupReplication( ctx context.Context, config *protos.SetupReplicationInput, ) (*protos.SetupReplicationOutput, error)
func (*SnapshotActivity) WaitForExportSnapshot ¶
func (a *SnapshotActivity) WaitForExportSnapshot(ctx context.Context, sessionID string) (*TxSnapshotState, error)
type StreamCloser ¶
type StreamCloser interface {
Close(error)
}
type TxSnapshotState ¶
type TxSnapshotState struct {
SnapshotName string
}