Documentation
¶
Overview ¶
This file corresponds to xmin based replication.
Index ¶
- Constants
- Variables
- func CancelTableAdditionFlow(ctx workflow.Context, input *protos.CancelTableAdditionInput) (*protos.CancelTableAdditionOutput, error)
- func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error
- func EndMaintenanceWorkflow(ctx workflow.Context, input *protos.EndMaintenanceFlowInput) (*protos.EndMaintenanceFlowOutput, error)
- func GetChildWorkflowID(prefix string, peerFlowName string, uuid string) string
- func GetFlowMetadataContext(ctx workflow.Context, input *protos.FlowContextMetadataInput) (workflow.Context, error)
- func GetPeerDBVersion(ctx workflow.Context) string
- func GetSideEffect[T any](ctx workflow.Context, f func(workflow.Context) T) T
- func GetUUID(ctx workflow.Context) string
- func GlobalScheduleManagerWorkflow(ctx workflow.Context) error
- func HeartbeatFlowWorkflow(ctx workflow.Context) error
- func QRepFlowWorkflow(ctx workflow.Context, config *protos.QRepConfig, state *protos.QRepFlowState) (*protos.QRepFlowState, error)
- func QRepPartitionWorkflow(ctx workflow.Context, config *protos.QRepConfig, ...) error
- func QRepWaitForNewRowsWorkflow(ctx workflow.Context, config *protos.QRepConfig, ...) error
- func RecordSlotSizeWorkflow(ctx workflow.Context) error
- func RegisterFlowWorkerWorkflows(w worker.WorkflowRegistry)
- func RunEndMaintenanceWorkflow(ctx context.Context, temporalClient client.Client, ...) (client.WorkflowRun, error)
- func RunStartMaintenanceWorkflow(ctx context.Context, temporalClient client.Client, ...) (client.WorkflowRun, error)
- func ScheduledTasksWorkflow(ctx workflow.Context) error
- func SetupFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigsCore) (*protos.SetupFlowOutput, error)
- func ShouldWorkflowContinueAsNew(ctx workflow.Context) bool
- func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigsCore) error
- func StartMaintenanceWorkflow(ctx workflow.Context, input *protos.StartMaintenanceFlowInput) (*protos.StartMaintenanceFlowOutput, error)
- func XminFlowWorkflow(ctx workflow.Context, config *protos.QRepConfig, state *protos.QRepFlowState) (*protos.QRepFlowState, error)
- type CDCFlowWorkflowResult
- type QRepFlowExecution
- type QRepPartitionFlowExecution
- type SetupFlowExecution
- type SnapshotFlowExecution
Constants ¶
const ( SNAPSHOT_TYPE_UNKNOWN snapshotType = iota SNAPSHOT_TYPE_SLOT SNAPSHOT_TYPE_TX )
Variables ¶
var InitialLastPartition = &protos.QRepPartition{ PartitionId: "not-applicable-partition", Range: nil, }
Functions ¶
func CancelTableAdditionFlow ¶
func CancelTableAdditionFlow(ctx workflow.Context, input *protos.CancelTableAdditionInput) (*protos.CancelTableAdditionOutput, error)
func DropFlowWorkflow ¶
func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error
func EndMaintenanceWorkflow ¶
func EndMaintenanceWorkflow(ctx workflow.Context, input *protos.EndMaintenanceFlowInput) (*protos.EndMaintenanceFlowOutput, error)
func GetChildWorkflowID ¶
func GetFlowMetadataContext ¶
func GetPeerDBVersion ¶
func HeartbeatFlowWorkflow ¶
HeartbeatFlowWorkflow sends WAL heartbeats
func QRepFlowWorkflow ¶
func QRepFlowWorkflow( ctx workflow.Context, config *protos.QRepConfig, state *protos.QRepFlowState, ) (*protos.QRepFlowState, error)
func QRepPartitionWorkflow ¶
func QRepPartitionWorkflow( ctx workflow.Context, config *protos.QRepConfig, partitions *protos.QRepPartitionBatch, runUUID string, ) error
QRepPartitionWorkflow replicate a partition batch
func QRepWaitForNewRowsWorkflow ¶
func QRepWaitForNewRowsWorkflow(ctx workflow.Context, config *protos.QRepConfig, lastPartition *protos.QRepPartition) error
func RecordSlotSizeWorkflow ¶
RecordSlotSizeWorkflow monitors replication slot size
func RegisterFlowWorkerWorkflows ¶
func RegisterFlowWorkerWorkflows(w worker.WorkflowRegistry)
func RunEndMaintenanceWorkflow ¶
func RunEndMaintenanceWorkflow( ctx context.Context, temporalClient client.Client, input *protos.EndMaintenanceFlowInput, taskQueueId shared.TaskQueueID, ) (client.WorkflowRun, error)
RunEndMaintenanceWorkflow is a helper function to start the EndMaintenanceWorkflow with sane defaults
func RunStartMaintenanceWorkflow ¶
func RunStartMaintenanceWorkflow( ctx context.Context, temporalClient client.Client, input *protos.StartMaintenanceFlowInput, taskQueueId shared.TaskQueueID, ) (client.WorkflowRun, error)
RunStartMaintenanceWorkflow is a helper function to start the StartMaintenanceWorkflow with sane defaults
func ScheduledTasksWorkflow ¶
func SetupFlowWorkflow ¶
func SetupFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigsCore) (*protos.SetupFlowOutput, error)
SetupFlowWorkflow is the workflow that sets up the flow.
func SnapshotFlowWorkflow ¶
func SnapshotFlowWorkflow( ctx workflow.Context, config *protos.FlowConnectionConfigsCore, ) error
func StartMaintenanceWorkflow ¶
func StartMaintenanceWorkflow(ctx workflow.Context, input *protos.StartMaintenanceFlowInput) (*protos.StartMaintenanceFlowOutput, error)
func XminFlowWorkflow ¶
func XminFlowWorkflow( ctx workflow.Context, config *protos.QRepConfig, state *protos.QRepFlowState, ) (*protos.QRepFlowState, error)
Types ¶
type CDCFlowWorkflowResult ¶
type CDCFlowWorkflowResult = cdc_state.CDCFlowWorkflowState
CDCFlowWorkflowResult is the result of the PeerFlowWorkflow.
func CDCFlowWorkflow ¶
func CDCFlowWorkflow( ctx workflow.Context, cfg *protos.FlowConnectionConfigsCore, state *cdc_state.CDCFlowWorkflowState, ) (*CDCFlowWorkflowResult, error)
type QRepFlowExecution ¶
type QRepFlowExecution struct {
// contains filtered or unexported fields
}
func (*QRepFlowExecution) SetupMetadataTables ¶
func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error
SetupMetadataTables creates the metadata tables for query based replication.
type QRepPartitionFlowExecution ¶
type QRepPartitionFlowExecution struct {
// contains filtered or unexported fields
}
type SetupFlowExecution ¶
SetupFlow is the workflow that is responsible for ensuring all the setup is done for a sync flow to execute.
The setup flow is responsible for:
- Global: - ensure that we are able to connect to the source and destination peers
- Source Peer: - setup the metadata table on the source peer - initialize pullability on the source peer, as an example on postgres: - ensuring the required table exists on the source peer - creating the slot and publication on the source peer
- Destination Peer: - setup the metadata table on the destination peer - creating the raw table on the destination peer - creating the normalized table on the destination peer
func NewSetupFlowExecution ¶
func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string) *SetupFlowExecution
NewSetupFlowExecution creates a new instance of SetupFlowExecution.
type SnapshotFlowExecution ¶
type SnapshotFlowExecution struct {
// contains filtered or unexported fields
}