peerflow

package
v0.0.0-...-bd61e48 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2026 License: AGPL-3.0 Imports: 26 Imported by: 0

Documentation

Overview

This file corresponds to xmin based replication.

Index

Constants

View Source
const (
	SNAPSHOT_TYPE_UNKNOWN snapshotType = iota
	SNAPSHOT_TYPE_SLOT
	SNAPSHOT_TYPE_TX
)

Variables

View Source
var InitialLastPartition = &protos.QRepPartition{
	PartitionId: "not-applicable-partition",
	Range:       nil,
}

Functions

func DropFlowWorkflow

func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error

func GetChildWorkflowID

func GetChildWorkflowID(
	prefix string,
	peerFlowName string,
	uuid string,
) string

func GetFlowMetadataContext

func GetFlowMetadataContext(
	ctx workflow.Context,
	input *protos.FlowContextMetadataInput,
) (workflow.Context, error)

func GetPeerDBVersion

func GetPeerDBVersion(ctx workflow.Context) string

func GetSideEffect

func GetSideEffect[T any](ctx workflow.Context, f func(workflow.Context) T) T

func GetUUID

func GetUUID(ctx workflow.Context) string

func GlobalScheduleManagerWorkflow

func GlobalScheduleManagerWorkflow(ctx workflow.Context) error

func HeartbeatFlowWorkflow

func HeartbeatFlowWorkflow(ctx workflow.Context) error

HeartbeatFlowWorkflow sends WAL heartbeats

func QRepFlowWorkflow

func QRepFlowWorkflow(
	ctx workflow.Context,
	config *protos.QRepConfig,
	state *protos.QRepFlowState,
) (*protos.QRepFlowState, error)

func QRepPartitionWorkflow

func QRepPartitionWorkflow(
	ctx workflow.Context,
	config *protos.QRepConfig,
	partitions *protos.QRepPartitionBatch,
	runUUID string,
) error

QRepPartitionWorkflow replicate a partition batch

func QRepWaitForNewRowsWorkflow

func QRepWaitForNewRowsWorkflow(ctx workflow.Context, config *protos.QRepConfig, lastPartition *protos.QRepPartition) error

func RecordSlotSizeWorkflow

func RecordSlotSizeWorkflow(ctx workflow.Context) error

RecordSlotSizeWorkflow monitors replication slot size

func RegisterFlowWorkerWorkflows

func RegisterFlowWorkerWorkflows(w worker.WorkflowRegistry)

func RunEndMaintenanceWorkflow

func RunEndMaintenanceWorkflow(
	ctx context.Context,
	temporalClient client.Client,
	input *protos.EndMaintenanceFlowInput,
	taskQueueId shared.TaskQueueID,
) (client.WorkflowRun, error)

RunEndMaintenanceWorkflow is a helper function to start the EndMaintenanceWorkflow with sane defaults

func RunStartMaintenanceWorkflow

func RunStartMaintenanceWorkflow(
	ctx context.Context,
	temporalClient client.Client,
	input *protos.StartMaintenanceFlowInput,
	taskQueueId shared.TaskQueueID,
) (client.WorkflowRun, error)

RunStartMaintenanceWorkflow is a helper function to start the StartMaintenanceWorkflow with sane defaults

func ScheduledTasksWorkflow

func ScheduledTasksWorkflow(ctx workflow.Context) error

func SetupFlowWorkflow

func SetupFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigsCore) (*protos.SetupFlowOutput, error)

SetupFlowWorkflow is the workflow that sets up the flow.

func ShouldWorkflowContinueAsNew

func ShouldWorkflowContinueAsNew(ctx workflow.Context) bool

func SnapshotFlowWorkflow

func SnapshotFlowWorkflow(
	ctx workflow.Context,
	config *protos.FlowConnectionConfigsCore,
) error

func XminFlowWorkflow

func XminFlowWorkflow(
	ctx workflow.Context,
	config *protos.QRepConfig,
	state *protos.QRepFlowState,
) (*protos.QRepFlowState, error)

Types

type CDCFlowWorkflowResult

type CDCFlowWorkflowResult = cdc_state.CDCFlowWorkflowState

CDCFlowWorkflowResult is the result of the PeerFlowWorkflow.

type QRepFlowExecution

type QRepFlowExecution struct {
	// contains filtered or unexported fields
}

func (*QRepFlowExecution) SetupMetadataTables

func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error

SetupMetadataTables creates the metadata tables for query based replication.

type QRepPartitionFlowExecution

type QRepPartitionFlowExecution struct {
	// contains filtered or unexported fields
}

type SetupFlowExecution

type SetupFlowExecution struct {
	log.Logger
	// contains filtered or unexported fields
}

SetupFlow is the workflow that is responsible for ensuring all the setup is done for a sync flow to execute.

The setup flow is responsible for:

  1. Global: - ensure that we are able to connect to the source and destination peers
  2. Source Peer: - setup the metadata table on the source peer - initialize pullability on the source peer, as an example on postgres: - ensuring the required table exists on the source peer - creating the slot and publication on the source peer
  3. Destination Peer: - setup the metadata table on the destination peer - creating the raw table on the destination peer - creating the normalized table on the destination peer

func NewSetupFlowExecution

func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string) *SetupFlowExecution

NewSetupFlowExecution creates a new instance of SetupFlowExecution.

type SnapshotFlowExecution

type SnapshotFlowExecution struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL