activities

package
v0.0.0-...-7f0a130 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2026 License: AGPL-3.0 Imports: 45 Imported by: 0

Documentation

Overview

internal methods for flowable.go

TODO: This file contains temporary validation code used during the migration from v1 to v2 schema delta approaches. Once the v2 approach is fully rolled out and v1 is removed, this entire file should be deleted. The validation ensures that v2 produces identical results to v1 during the transition period.

Related cleanup tasks when deleting this file:

  - Remove the applySchemaDeltasV1() function once v1 is deprecated.
  - Clean up any references to this validation logic.
  - Clean up `PEERDB_APPLY_SCHEMA_DELTA_TO_CATALOG` in dynconf.go.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunEveryIntervalUntilFinish

func RunEveryIntervalUntilFinish[T any](
	ctx context.Context,
	runFunc func() (finished bool, result T, err error),
	runInterval time.Duration,
	logMessage string,
	logInterval time.Duration,
	runBeforeFirstTick bool,
) (T, error)

Types

type CancelTableAdditionActivity

// CancelTableAdditionActivity is the receiver type for the methods listed
// below it on this page (CleanupCurrentParentMirror, GetFlowInfoFromCatalog,
// StartNewCDCFlow, UpdateCdcJobEntry, ...). It carries the same four fields,
// in the same order, as MaintenanceActivity.
type CancelTableAdditionActivity struct {
	// CatalogPool is a shared.CatalogPool handle.
	// NOTE(review): presumably the connection pool for the catalog database —
	// confirm in package shared.
	CatalogPool    shared.CatalogPool
	// Alerter is a pointer to an alerting.Alerter.
	Alerter        *alerting.Alerter
	// TemporalClient is a client.Client.
	// NOTE(review): presumably the Temporal SDK client, going by the field
	// name — confirm against the file's imports.
	TemporalClient client.Client
	// OtelManager is a pointer to an otel_metrics.OtelManager.
	OtelManager    *otel_metrics.OtelManager
}

func (*CancelTableAdditionActivity) CleanupCurrentParentMirror

func (a *CancelTableAdditionActivity) CleanupCurrentParentMirror(ctx context.Context, flowJobName string, workflowId string) error

func (*CancelTableAdditionActivity) CleanupIncompleteTablesInStats

func (a *CancelTableAdditionActivity) CleanupIncompleteTablesInStats(
	ctx context.Context,
	flowJobName string,
	completedTables []*protos.TableMapping,
) error

func (*CancelTableAdditionActivity) GetCompletedTablesFromQrepRuns

func (a *CancelTableAdditionActivity) GetCompletedTablesFromQrepRuns(
	ctx context.Context,
	flowJobName string,
	workflowId string,
) ([]string, error)

GetCompletedTablesFromQrepRuns gets the list of tables in the addition request whose snapshot has completed.

func (*CancelTableAdditionActivity) GetFlowInfoFromCatalog

func (a *CancelTableAdditionActivity) GetFlowInfoFromCatalog(
	ctx context.Context,
	flowJobName string,
) (*protos.GetFlowInfoToCancelFromCatalogOutput, error)

func (*CancelTableAdditionActivity) GetTableOIDsFromCatalog

func (a *CancelTableAdditionActivity) GetTableOIDsFromCatalog(
	ctx context.Context,
	flowJobName string,
	tableMappings []*protos.TableMapping,
) (map[uint32]string, error)

func (*CancelTableAdditionActivity) RemoveCancelledTablesFromPublicationIfApplicable

func (a *CancelTableAdditionActivity) RemoveCancelledTablesFromPublicationIfApplicable(
	ctx context.Context,
	flowJobName string,
	sourcePeerName string,
	publicationNameInConfig string,
	finalListOfTables []*protos.TableMapping,
) error

func (*CancelTableAdditionActivity) StartNewCDCFlow

func (a *CancelTableAdditionActivity) StartNewCDCFlow(
	ctx context.Context,
	flowConfig *protos.FlowConnectionConfigsCore,
	state *cdc_state.CDCFlowWorkflowState,
	workflowID string,
) error

func (*CancelTableAdditionActivity) UpdateCdcJobEntry

func (a *CancelTableAdditionActivity) UpdateCdcJobEntry(
	ctx context.Context,
	connectionConfigs *protos.FlowConnectionConfigsCore,
	workflowID string,
) error

func (*CancelTableAdditionActivity) WaitForNewRunningMirrorToBeInRunningState

func (a *CancelTableAdditionActivity) WaitForNewRunningMirrorToBeInRunningState(
	ctx context.Context,
	flowJobName string,
	workflowId string,
) error

type CheckMetadataTablesResult

// CheckMetadataTablesResult is the result type returned (as a pointer) by
// FlowableActivity.CheckMetadataTables.
type CheckMetadataTablesResult struct {
	// NeedsSetupMetadataTables is the single boolean outcome of the check.
	// NOTE(review): presumably true means SetupMetadataTables still has to be
	// run for the peer — confirm against the caller.
	NeedsSetupMetadataTables bool
}

type FlowableActivity

// FlowableActivity is the receiver type for the methods listed below it on
// this page (AddTablesToPublication, CheckConnection, GetQRepPartitions,
// SyncFlow, ...). It has the same four fields as CancelTableAdditionActivity
// and MaintenanceActivity, but with OtelManager declared before
// TemporalClient.
type FlowableActivity struct {
	// CatalogPool is a shared.CatalogPool handle.
	// NOTE(review): presumably the connection pool for the catalog database —
	// confirm in package shared.
	CatalogPool    shared.CatalogPool
	// Alerter is a pointer to an alerting.Alerter.
	Alerter        *alerting.Alerter
	// OtelManager is a pointer to an otel_metrics.OtelManager.
	OtelManager    *otel_metrics.OtelManager
	// TemporalClient is a client.Client.
	// NOTE(review): presumably the Temporal SDK client, going by the field
	// name — confirm against the file's imports.
	TemporalClient client.Client
}

func (*FlowableActivity) AddTablesToPublication

func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigsCore,
	additionalTableMappings []*protos.TableMapping,
) error

func (*FlowableActivity) Alert

func (a *FlowableActivity) Alert(
	ctx context.Context,
	alert *protos.AlertInput,
) error

func (*FlowableActivity) CheckConnection

func (a *FlowableActivity) CheckConnection(
	ctx context.Context,
	config *protos.SetupInput,
) error

func (*FlowableActivity) CheckMetadataTables

func (a *FlowableActivity) CheckMetadataTables(
	ctx context.Context,
	config *protos.SetupInput,
) (*CheckMetadataTablesResult, error)

func (*FlowableActivity) CleanupQRepFlow

func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error

func (*FlowableActivity) ConsolidateQRepPartitions

func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig,
	runUUID string,
) error

func (*FlowableActivity) CreateNormalizedTable

CreateNormalizedTable creates normalized tables in destination.

func (*FlowableActivity) CreateRawTable

CreateRawTable creates a raw table in the destination flowable.

func (*FlowableActivity) CreateTablesFromExisting

func (*FlowableActivity) DeleteMirrorStats

func (a *FlowableActivity) DeleteMirrorStats(ctx context.Context, flowName string) error

func (*FlowableActivity) DropFlowDestination

func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.DropFlowActivityInput) error

func (*FlowableActivity) DropFlowSource

func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropFlowActivityInput) error

func (*FlowableActivity) EnsurePullability

func (*FlowableActivity) GetFlowMetadata

NOTE: this activity is used on the path between CDCFlowWorkflow start and the signal handler for running state. If it's unable to progress for whatever reason, the upgrades will break and very unpleasant manual recovery will be needed. If you have to modify it, do it carefully and think through the edge cases.

func (*FlowableActivity) GetQRepPartitions

func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
	config *protos.QRepConfig,
	last *protos.QRepPartition,
	runUUID string,
) (*protos.QRepParitionResult, error)

GetQRepPartitions returns the partitions for a given QRepConfig.

func (*FlowableActivity) MigratePostgresTableOIDs

func (a *FlowableActivity) MigratePostgresTableOIDs(
	ctx context.Context,
	flowName string,
	oidToTableNameMapping map[uint32]string,
	tableMappings []*protos.TableMapping,
) error

MigratePostgresTableOIDs migrates the OIDs for source Postgres tables to the catalog's table_schema_mapping.

func (*FlowableActivity) PeerDBFullRefreshOverwriteMode

func (a *FlowableActivity) PeerDBFullRefreshOverwriteMode(ctx context.Context, env map[string]string) (bool, error)

func (*FlowableActivity) QRepHasNewRows

func (a *FlowableActivity) QRepHasNewRows(ctx context.Context,
	config *protos.QRepConfig, last *protos.QRepPartition,
) (bool, error)

func (*FlowableActivity) RecordMetricsAggregates

func (a *FlowableActivity) RecordMetricsAggregates(ctx context.Context) error

func (*FlowableActivity) RecordMetricsCritical

func (a *FlowableActivity) RecordMetricsCritical(ctx context.Context) error

func (*FlowableActivity) RecordSlotSizes

func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error

func (*FlowableActivity) RemoveFlowDetailsFromCatalog

func (a *FlowableActivity) RemoveFlowDetailsFromCatalog(
	ctx context.Context,
	req *model.RemoveFlowDetailsFromCatalogRequest,
) error

func (*FlowableActivity) RemoveTablesFromCatalog

func (a *FlowableActivity) RemoveTablesFromCatalog(
	ctx context.Context,
	cfg *protos.FlowConnectionConfigsCore,
	tablesToRemove []*protos.TableMapping,
) error

func (*FlowableActivity) RemoveTablesFromPublication

func (a *FlowableActivity) RemoveTablesFromPublication(
	ctx context.Context,
	cfg *protos.FlowConnectionConfigsCore,
	removedTablesMapping []*protos.TableMapping,
) error

func (*FlowableActivity) RemoveTablesFromRawTable

func (a *FlowableActivity) RemoveTablesFromRawTable(
	ctx context.Context,
	cfg *protos.FlowConnectionConfigsCore,
	tablesToRemove []*protos.TableMapping,
) error

func (*FlowableActivity) RenameTables

func (*FlowableActivity) ReplicateQRepPartitions

func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
	config *protos.QRepConfig,
	partitions *protos.QRepPartitionBatch,
	runUUID string,
) error

ReplicateQRepPartitions spawns multiple ReplicateQRepPartition invocations.

func (*FlowableActivity) ReplicateXminPartition

func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
	config *protos.QRepConfig,
	partition *protos.QRepPartition,
	runUUID string,
) (int64, error)

func (*FlowableActivity) ReportStatusMetric

func (a *FlowableActivity) ReportStatusMetric(ctx context.Context, status protos.FlowStatus) error

func (*FlowableActivity) ScheduledTasks

func (a *FlowableActivity) ScheduledTasks(ctx context.Context) error

func (*FlowableActivity) SendWALHeartbeat

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error

func (*FlowableActivity) SetupMetadataTables

func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *protos.SetupInput) error

func (*FlowableActivity) SetupQRepMetadataTables

func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error

SetupQRepMetadataTables sets up the metadata tables for QReplication.

func (*FlowableActivity) SetupTableSchema

func (a *FlowableActivity) SetupTableSchema(
	ctx context.Context,
	config *protos.SetupTableSchemaBatchInput,
) error

SetupTableSchema populates table_schema_mapping.

func (*FlowableActivity) SyncFlow

func (*FlowableActivity) UpdateCDCConfigInCatalogActivity

func (a *FlowableActivity) UpdateCDCConfigInCatalogActivity(ctx context.Context, cfg *protos.FlowConnectionConfigsCore) error

type MaintenanceActivity

// MaintenanceActivity is the receiver type for the methods listed below it
// on this page (EnableMaintenanceMode, DisableMaintenanceMode,
// PauseMirrorIfRunning, ResumeMirror, ...). It carries the same four fields,
// in the same order, as CancelTableAdditionActivity.
type MaintenanceActivity struct {
	// CatalogPool is a shared.CatalogPool handle.
	// NOTE(review): presumably the connection pool for the catalog database —
	// confirm in package shared.
	CatalogPool    shared.CatalogPool
	// Alerter is a pointer to an alerting.Alerter.
	Alerter        *alerting.Alerter
	// TemporalClient is a client.Client.
	// NOTE(review): presumably the Temporal SDK client, going by the field
	// name — confirm against the file's imports.
	TemporalClient client.Client
	// OtelManager is a pointer to an otel_metrics.OtelManager.
	OtelManager    *otel_metrics.OtelManager
}

func (*MaintenanceActivity) BackgroundAlerter

func (a *MaintenanceActivity) BackgroundAlerter(ctx context.Context) error

func (*MaintenanceActivity) BackupAllPreviouslyRunningFlows

func (a *MaintenanceActivity) BackupAllPreviouslyRunningFlows(ctx context.Context, mirrors *protos.MaintenanceMirrors) error

func (*MaintenanceActivity) CleanBackedUpFlows

func (a *MaintenanceActivity) CleanBackedUpFlows(ctx context.Context) error

func (*MaintenanceActivity) DisableMaintenanceMode

func (a *MaintenanceActivity) DisableMaintenanceMode(ctx context.Context) error

func (*MaintenanceActivity) EnableMaintenanceMode

func (a *MaintenanceActivity) EnableMaintenanceMode(ctx context.Context) error

func (*MaintenanceActivity) GetAllMirrors

func (*MaintenanceActivity) GetBackedUpFlows

func (a *MaintenanceActivity) GetBackedUpFlows(ctx context.Context) (*protos.MaintenanceMirrors, error)

func (*MaintenanceActivity) PauseMirrorIfRunning

func (a *MaintenanceActivity) PauseMirrorIfRunning(ctx context.Context, mirror *protos.MaintenanceMirror) (bool, error)

func (*MaintenanceActivity) ResumeMirror

func (a *MaintenanceActivity) ResumeMirror(ctx context.Context, mirror *protos.MaintenanceMirror) error

func (*MaintenanceActivity) WaitForRunningSnapshotsAndIntermediateStates

func (a *MaintenanceActivity) WaitForRunningSnapshotsAndIntermediateStates(
	ctx context.Context,
	skippedFlows map[string]struct{},
) (*protos.MaintenanceMirrors, error)

type PeerType

// PeerType is a string-backed type with two declared values, Source and
// Destination.
type PeerType string
// Declared PeerType values; the string form of each is its lowercase name.
const (
	Source      PeerType = "source"
	Destination PeerType = "destination"
)

type SlotSnapshotState

// SlotSnapshotState is opaque from this view: no exported fields are shown.
// It is the value type of the SnapshotActivity.SlotSnapshotStates map.
type SlotSnapshotState struct {
	// contains filtered or unexported fields
}

type SnapshotActivity

// SnapshotActivity is the receiver type for the methods listed below it on
// this page (CloseSlotKeepAlive, MaintainTx, SetupReplication,
// WaitForExportSnapshot, ...).
//
// The struct holds a sync.Mutex by value, so a SnapshotActivity must not be
// copied after first use; every listed method takes a pointer receiver.
type SnapshotActivity struct {
	// Alerter is a pointer to an alerting.Alerter.
	Alerter             *alerting.Alerter
	// CatalogPool is a shared.CatalogPool handle.
	// NOTE(review): presumably the connection pool for the catalog database —
	// confirm in package shared.
	CatalogPool         shared.CatalogPool
	// SlotSnapshotStates maps a string key to a SlotSnapshotState value.
	// NOTE(review): the key is presumably the flow job name (CloseSlotKeepAlive
	// takes flowJobName) — confirm in the implementation.
	SlotSnapshotStates  map[string]SlotSnapshotState
	// TxSnapshotStates maps a string key to a TxSnapshotState value.
	// NOTE(review): the key is presumably the session ID (WaitForExportSnapshot
	// takes sessionID and returns *TxSnapshotState) — confirm in the
	// implementation.
	TxSnapshotStates    map[string]TxSnapshotState
	// SnapshotStatesMutex is a plain (non-RW) mutex.
	// NOTE(review): presumably guards the two maps above — confirm every map
	// access in the implementation happens with it held.
	SnapshotStatesMutex sync.Mutex
}

func (*SnapshotActivity) CloseSlotKeepAlive

func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName string) error

CloseSlotKeepAlive closes the slot signal.

func (*SnapshotActivity) GetDefaultPartitionKeyForTables

func (*SnapshotActivity) GetPeerType

func (a *SnapshotActivity) GetPeerType(ctx context.Context, name string) (protos.DBType, error)

func (*SnapshotActivity) LoadTableSchema

func (a *SnapshotActivity) LoadTableSchema(
	ctx context.Context,
	flowName string,
	tableName string,
) (*protos.TableSchema, error)

func (*SnapshotActivity) MaintainTx

func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, flowName string, peer string, env map[string]string) error

func (*SnapshotActivity) SetupReplication

func (*SnapshotActivity) WaitForExportSnapshot

func (a *SnapshotActivity) WaitForExportSnapshot(ctx context.Context, sessionID string) (*TxSnapshotState, error)

type StreamCloser

// StreamCloser is satisfied by any type that has a Close(error) method.
type StreamCloser interface {
	// Close takes a single error argument and returns nothing.
	// NOTE(review): presumably a nil argument signals a clean close and a
	// non-nil argument is the failure being propagated — confirm against the
	// implementers.
	Close(error)
}

type TxSnapshotState

// TxSnapshotState carries a single snapshot name. It is the value type of the
// SnapshotActivity.TxSnapshotStates map and is returned (as a pointer) by
// SnapshotActivity.WaitForExportSnapshot.
type TxSnapshotState struct {
	// SnapshotName is the snapshot identifier.
	// NOTE(review): presumably the name of an exported database snapshot held
	// open by MaintainTx — confirm in the implementation.
	SnapshotName string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL