model

package
v0.0.0-...-bd61e48 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2026 License: AGPL-3.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{
	Name: "cdc-dynamic-properties",
}
View Source
var FlowSignal = TypedSignal[CDCFlowSignal]{
	Name: "peer-flow-signal",
}
View Source
var FlowSignalStateChange = TypedSignal[*protos.FlowStateChangeRequest]{
	Name: "flow-state-change-signal",
}
View Source
var StartMaintenanceSignal = TypedSignal[*protos.StartMaintenanceSignal]{
	Name: "start-maintenance-signal",
}

Functions

func ConstructColumnNameAvroFieldMap

func ConstructColumnNameAvroFieldMap(fields []types.QField) map[string]string

func ItemsToJSON

func ItemsToJSON(items Items) (string, error)

func SleepFuture

func SleepFuture(ctx workflow.Context, d time.Duration) workflow.Future

Types

type BaseRecord

type BaseRecord struct {
	// CheckpointID is the ID of the record.
	CheckpointID int64 `json:"checkpointId"`
	// BeginMessage.CommitTime.UnixNano(), 16 bytes smaller than time.Time
	CommitTimeNano int64 `json:"commitTimeNano"`
	// TransactionID is the `XID` corresponding to the transaction that committed this record.
	TransactionID uint64 `json:"transactionId"`
}

func (*BaseRecord) GetCheckpointID

func (r *BaseRecord) GetCheckpointID() int64

func (*BaseRecord) GetCommitTime

func (r *BaseRecord) GetCommitTime() time.Time

func (*BaseRecord) GetTransactionID

func (r *BaseRecord) GetTransactionID() uint64

type CDCFlowSignal

type CDCFlowSignal int32
const (
	NoopSignal CDCFlowSignal = iota

	PauseSignal
	TerminateSignal
	ResyncSignal
)

func FlowSignalHandler

func FlowSignalHandler(activeSignal CDCFlowSignal,
	v CDCFlowSignal, logger log.Logger,
) CDCFlowSignal

type CDCStream

type CDCStream[T Items] struct {

	// Schema changes from slot
	SchemaDeltas []*protos.TableSchemaDelta
	// contains filtered or unexported fields
}

func NewCDCStream

func NewCDCStream[T Items](channelBuffer int) *CDCStream[T]

func (*CDCStream[T]) AddRecord

func (r *CDCStream[T]) AddRecord(ctx context.Context, record Record[T]) error

func (*CDCStream[T]) AddSchemaDelta

func (r *CDCStream[T]) AddSchemaDelta(
	tableNameMapping map[string]NameAndExclude,
	delta *protos.TableSchemaDelta,
)

func (*CDCStream[T]) ChannelLen

func (r *CDCStream[T]) ChannelLen() int

func (*CDCStream[T]) Close

func (r *CDCStream[T]) Close()

func (*CDCStream[T]) GetLastCheckpoint

func (r *CDCStream[T]) GetLastCheckpoint() CdcCheckpoint

func (*CDCStream[T]) GetRecords

func (r *CDCStream[T]) GetRecords() <-chan Record[T]

func (*CDCStream[T]) NeedsNormalize

func (r *CDCStream[T]) NeedsNormalize() bool

func (*CDCStream[T]) SignalAsEmpty

func (r *CDCStream[T]) SignalAsEmpty()

func (*CDCStream[T]) SignalAsNotEmpty

func (r *CDCStream[T]) SignalAsNotEmpty()

func (*CDCStream[T]) UpdateLatestCheckpointID

func (r *CDCStream[T]) UpdateLatestCheckpointID(val int64)

func (*CDCStream[T]) UpdateLatestCheckpointText

func (r *CDCStream[T]) UpdateLatestCheckpointText(val string)

func (*CDCStream[T]) WaitAndCheckEmpty

func (r *CDCStream[T]) WaitAndCheckEmpty() bool

type CdcCheckpoint

type CdcCheckpoint struct {
	Text string
	ID   int64
}

type CdcColumnNumericTruncator

type CdcColumnNumericTruncator struct {
	Stat *qvalue.NumericStat
}

type CdcTableNumericTruncator

type CdcTableNumericTruncator struct {
	TruncatorsByColumn map[string]CdcColumnNumericTruncator
	DestinationTable   string
}

func NewCdcTableNumericTruncator

func NewCdcTableNumericTruncator(
	destinationTable string, columnSettings []*protos.ColumnSetting, typesToSkip map[string]struct{},
) CdcTableNumericTruncator

func (CdcTableNumericTruncator) CollectWarnings

func (ts CdcTableNumericTruncator) CollectWarnings(warnings *shared.QRepWarnings)

func (CdcTableNumericTruncator) Get

func (ts CdcTableNumericTruncator) Get(destinationColumn string) CdcColumnNumericTruncator

type DeleteRecord

type DeleteRecord[T Items] struct {
	// Items is a map of column name to value.
	Items T
	// unchanged toast columns, filled from latest UpdateRecord
	UnchangedToastColumns map[string]struct{}
	// Name of the source table
	SourceTableName string
	// Name of the destination table
	DestinationTableName string
	BaseRecord
}

func (*DeleteRecord[T]) GetDestinationTableName

func (r *DeleteRecord[T]) GetDestinationTableName() string

func (*DeleteRecord[T]) GetItems

func (r *DeleteRecord[T]) GetItems() T

func (*DeleteRecord[T]) GetSourceTableName

func (r *DeleteRecord[T]) GetSourceTableName() string

func (*DeleteRecord[T]) Kind

func (*DeleteRecord[T]) Kind() string

func (*DeleteRecord[T]) PopulateCountMap

func (r *DeleteRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)

type HeaderProvider

type HeaderProvider interface {
	GetHeaders(ctx context.Context) (http.Header, error)
}

HeaderProvider provides HTTP headers for authenticating object requests.

type InsertRecord

type InsertRecord[T Items] struct {
	// Items is a map of column name to value.
	Items T
	// Name of the source table
	SourceTableName string
	// Name of the destination table
	DestinationTableName string
	// CommitID is the ID of the commit corresponding to this record.
	CommitID int64
	BaseRecord
}

func (*InsertRecord[T]) GetDestinationTableName

func (r *InsertRecord[T]) GetDestinationTableName() string

func (*InsertRecord[T]) GetItems

func (r *InsertRecord[T]) GetItems() T

func (*InsertRecord[T]) GetSourceTableName

func (r *InsertRecord[T]) GetSourceTableName() string

func (*InsertRecord[T]) Kind

func (*InsertRecord[T]) Kind() string

func (*InsertRecord[T]) PopulateCountMap

func (r *InsertRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)

type Items

type Items interface {
	json.Marshaler
	UpdateIfNotExists(Items) []string
	UpdateWithBaseRecord(BaseRecord)
	GetBytesByColName(string) ([]byte, error)
	ToJSONWithOptions(ToJSONOptions) (string, error)
	DeleteColName(string)
}

type MessageRecord

type MessageRecord[T Items] struct {
	Prefix  string
	Content string
	BaseRecord
}

func (*MessageRecord[T]) GetDestinationTableName

func (r *MessageRecord[T]) GetDestinationTableName() string

func (*MessageRecord[T]) GetItems

func (r *MessageRecord[T]) GetItems() T

func (*MessageRecord[T]) GetSourceTableName

func (r *MessageRecord[T]) GetSourceTableName() string

func (*MessageRecord[T]) Kind

func (*MessageRecord[T]) Kind() string

func (*MessageRecord[T]) PopulateCountMap

func (r *MessageRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)

type NameAndExclude

type NameAndExclude struct {
	Exclude map[string]struct{}
	Name    string
}

func NewNameAndExclude

func NewNameAndExclude(name string, exclude []string) NameAndExclude

type NormalizeRecordsRequest

type NormalizeRecordsRequest struct {
	Env                    map[string]string
	TableNameSchemaMapping map[string]*protos.TableSchema
	FlowJobName            string
	SoftDeleteColName      string
	SyncedAtColName        string
	TableMappings          []*protos.TableMapping
	SyncBatchID            int64
	Version                uint32
}

type NormalizeResponse

type NormalizeResponse struct {
	StartBatchID int64
	EndBatchID   int64
}

type NullMismatchTracker

type NullMismatchTracker struct {
	// contains filtered or unexported fields
}

NullMismatchTracker detects null values in columns that would be non-nullable under strict mode.

func NewNullMismatchTracker

func NewNullMismatchTracker(
	schemaDebug *types.NullableSchemaDebug,
) *NullMismatchTracker

func (*NullMismatchTracker) LogIfMismatch

func (t *NullMismatchTracker) LogIfMismatch(ctx context.Context, logger log.Logger)

func (*NullMismatchTracker) RecordNull

func (t *NullMismatchTracker) RecordNull(fieldIdx int)

type Object

type Object struct {
	URL  string
	Size int64
}

type PgItems

type PgItems struct {
	ColToVal map[string][]byte
}

Fields are exported because encoding/gob cannot encode unexported fields.

func NewPgItems

func NewPgItems(capacity int) PgItems

func (PgItems) AddColumn

func (r PgItems) AddColumn(col string, val []byte)

func (PgItems) DeleteColName

func (r PgItems) DeleteColName(colName string)

func (PgItems) GetBytesByColName

func (r PgItems) GetBytesByColName(colName string) ([]byte, error)

func (PgItems) GetColumnValue

func (r PgItems) GetColumnValue(col string) []byte

func (PgItems) Len

func (r PgItems) Len() int

func (PgItems) MarshalJSON

func (r PgItems) MarshalJSON() ([]byte, error)

func (PgItems) ToJSON

func (r PgItems) ToJSON() (string, error)

func (PgItems) ToJSONWithOptions

func (r PgItems) ToJSONWithOptions(options ToJSONOptions) (string, error)

func (PgItems) UpdateIfNotExists

func (r PgItems) UpdateIfNotExists(input_ Items) []string

UpdateIfNotExists takes an Items value as input and copies into the current items the values for any columns that are present in the input but absent from the current items. It returns the slice of column names that were updated.

func (PgItems) UpdateWithBaseRecord

func (r PgItems) UpdateWithBaseRecord(baseRecord BaseRecord)

type PullRecordsRequest

type PullRecordsRequest[T Items] struct {
	// record batch for pushing changes into
	RecordStream *CDCStream[T]
	// ConsumedOffset can be reported as committed to reduce slot size
	ConsumedOffset *atomic.Int64
	// FlowJobName is the name of the flow job.
	FlowJobName string
	// relId to name Mapping
	SrcTableIDNameMapping map[uint32]string
	// source to destination table name mapping
	TableNameMapping map[string]NameAndExclude
	// tablename to schema mapping
	TableNameSchemaMapping map[string]*protos.TableSchema
	// overrides dynamic configuration
	Env map[string]string
	// override publication name
	OverridePublicationName string
	// override replication slot name
	OverrideReplicationSlotName string
	// LastOffset is the latest LSN that was synced.
	LastOffset CdcCheckpoint
	// MaxBatchSize is the max number of records to fetch.
	MaxBatchSize uint32
	// peerdb versioning to prevent breaking changes
	InternalVersion uint32
	// IdleTimeout is the timeout to wait for new records.
	IdleTimeout time.Duration
}

type QObjectStream

type QObjectStream struct {
	Objects chan *Object
	// contains filtered or unexported fields
}

QObjectStream is a stream of HTTP objects represented with URLs that are directly consumable by any HTTP client when combined with authentication headers from the HeaderProvider.

func NewQObjectStream

func NewQObjectStream(buffer int) *QObjectStream

func (*QObjectStream) Close

func (s *QObjectStream) Close(err error)

func (*QObjectStream) Err

func (s *QObjectStream) Err() error

func (*QObjectStream) Format

func (s *QObjectStream) Format() (QObjectStreamFormat, error)

func (*QObjectStream) HeaderProvider

func (s *QObjectStream) HeaderProvider() (HeaderProvider, error)

func (*QObjectStream) Schema

func (s *QObjectStream) Schema() (types.QRecordSchema, error)

func (*QObjectStream) SetFormat

func (s *QObjectStream) SetFormat(format QObjectStreamFormat)

func (*QObjectStream) SetHeaderProvider

func (s *QObjectStream) SetHeaderProvider(provider HeaderProvider)

func (*QObjectStream) SetSchema

func (s *QObjectStream) SetSchema(schema types.QRecordSchema)

type QObjectStreamFormat

type QObjectStreamFormat string
const (
	// QObjectStreamBigQueryExportParquetFormat is the Parquet format used by BigQuery export.
	// More details: https://docs.cloud.google.com/bigquery/docs/exporting-data#parquet_export_details
	QObjectStreamBigQueryExportParquetFormat QObjectStreamFormat = "parquet"
)

type QRecordAvroChunkSizeTracker

type QRecordAvroChunkSizeTracker struct {
	TrackUncompressed bool
	Bytes             atomic.Int64
}

type QRecordAvroConverter

type QRecordAvroConverter struct {
	Schema                   *QRecordAvroSchemaDefinition
	ColNames                 []string
	TargetDWH                protos.DBType
	UnboundedNumericAsString bool
	NullMismatchTracker      *NullMismatchTracker
	// contains filtered or unexported fields
}

func NewQRecordAvroConverter

func NewQRecordAvroConverter(
	ctx context.Context,
	env map[string]string,
	schema *QRecordAvroSchemaDefinition,
	targetDWH protos.DBType,
	colNames []string,
	logger log.Logger,
) (*QRecordAvroConverter, error)

func (*QRecordAvroConverter) Convert

func (qac *QRecordAvroConverter) Convert(
	ctx context.Context,
	env map[string]string,
	qrecord []types.QValue,
	typeConversions map[string]types.TypeConversion,
	numericTruncator SnapshotTableNumericTruncator,
	format internal.BinaryFormat,
	calcSize bool,
) (map[string]any, int64, error)

type QRecordAvroField

type QRecordAvroField struct {
	Type any    `json:"type"`
	Name string `json:"name"`
}

type QRecordAvroSchema

type QRecordAvroSchema struct {
	Type   string             `json:"type"`
	Name   string             `json:"name"`
	Fields []QRecordAvroField `json:"fields"`
}

type QRecordAvroSchemaDefinition

type QRecordAvroSchemaDefinition struct {
	Schema *avro.RecordSchema
	Fields []types.QField
}

func GetAvroSchemaDefinition

func GetAvroSchemaDefinition(
	ctx context.Context,
	env map[string]string,
	dstTableName string,
	qRecordSchema types.QRecordSchema,
	targetDWH protos.DBType,
	avroNameMap map[string]string,
) (*QRecordAvroSchemaDefinition, error)

type QRecordBatch

type QRecordBatch struct {
	Schema  types.QRecordSchema
	Records [][]types.QValue
}

QRecordBatch holds a batch of []QValue slices.

func (*QRecordBatch) FeedToQRecordStream

func (q *QRecordBatch) FeedToQRecordStream(stream *QRecordStream)

func (*QRecordBatch) ToQRecordStream

func (q *QRecordBatch) ToQRecordStream(buffer int) *QRecordStream

type QRecordCopyFromSource

type QRecordCopyFromSource struct {
	// contains filtered or unexported fields
}

func NewQRecordCopyFromSource

func NewQRecordCopyFromSource(
	stream *QRecordStream,
) *QRecordCopyFromSource

func (*QRecordCopyFromSource) Err

func (src *QRecordCopyFromSource) Err() error

func (*QRecordCopyFromSource) Next

func (src *QRecordCopyFromSource) Next() bool

func (*QRecordCopyFromSource) Values

func (src *QRecordCopyFromSource) Values() ([]any, error)

type QRecordStream

type QRecordStream struct {
	Records chan []types.QValue
	// contains filtered or unexported fields
}

func NewQRecordStream

func NewQRecordStream(buffer int) *QRecordStream

func (*QRecordStream) Close

func (s *QRecordStream) Close(err error)

Close sets the error and closes the stream. Only the first error passed is tracked; calling Close repeatedly with errors does not panic. Calling Close(nil) after Close(err) is safe, but calling Close again after Close(nil) panics — this guarantee is exactly enough to safely `defer stream.Close(nil)`.

func (*QRecordStream) Err

func (s *QRecordStream) Err() error

func (*QRecordStream) IsSchemaSet

func (s *QRecordStream) IsSchemaSet() bool

func (*QRecordStream) Schema

func (s *QRecordStream) Schema() (types.QRecordSchema, error)

func (*QRecordStream) SchemaChan

func (s *QRecordStream) SchemaChan() <-chan struct{}

func (*QRecordStream) SchemaDebug

func (s *QRecordStream) SchemaDebug() *types.NullableSchemaDebug

func (*QRecordStream) SetSchema

func (s *QRecordStream) SetSchema(schema types.QRecordSchema)

func (*QRecordStream) SetSchemaDebug

func (s *QRecordStream) SetSchemaDebug(debug *types.NullableSchemaDebug)

type Record

type Record[T Items] interface {
	Kind() string
	GetCheckpointID() int64
	GetCommitTime() time.Time
	GetTransactionID() uint64
	GetDestinationTableName() string
	GetSourceTableName() string
	// get columns and values for the record
	GetItems() T
	PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)
}

type RecordItems

type RecordItems struct {
	ColToVal               map[string]types.QValue
	TruncateThresholdBytes int
}

Fields are exported because encoding/gob cannot encode unexported fields.

func NewMongoRecordItems

func NewMongoRecordItems(capacity int) RecordItems

func NewRecordItems

func NewRecordItems(capacity int) RecordItems

func (RecordItems) AddColumn

func (r RecordItems) AddColumn(col string, val types.QValue)

func (RecordItems) DeleteColName

func (r RecordItems) DeleteColName(colName string)

func (RecordItems) GetBytesByColName

func (r RecordItems) GetBytesByColName(colName string) ([]byte, error)

func (RecordItems) GetColumnValue

func (r RecordItems) GetColumnValue(col string) types.QValue

func (RecordItems) GetValueByColName

func (r RecordItems) GetValueByColName(colName string) (types.QValue, error)

func (RecordItems) Len

func (r RecordItems) Len() int

func (RecordItems) MarshalJSON

func (r RecordItems) MarshalJSON() ([]byte, error)

func (RecordItems) MarshalJSONWithOptions

func (r RecordItems) MarshalJSONWithOptions(opts ToJSONOptions) ([]byte, error)

func (RecordItems) ToJSONWithOptions

func (r RecordItems) ToJSONWithOptions(options ToJSONOptions) (string, error)

func (RecordItems) UpdateIfNotExists

func (r RecordItems) UpdateIfNotExists(input_ Items) []string

UpdateIfNotExists takes a RecordItems value as input and copies into the current items the values for any columns that are present in the input but absent from the current items. It returns the slice of column names that were updated.

func (RecordItems) UpdateWithBaseRecord

func (r RecordItems) UpdateWithBaseRecord(baseRecord BaseRecord)

type RecordTypeCounts

type RecordTypeCounts struct {
	InsertCount atomic.Int32
	UpdateCount atomic.Int32
	DeleteCount atomic.Int32
}

type RecordsToStreamRequest

type RecordsToStreamRequest[T Items] struct {
	TableMapping             map[string]*RecordTypeCounts
	BatchID                  int64
	UnboundedNumericAsString bool
	TargetDWH                protos.DBType
	// contains filtered or unexported fields
}

func NewRecordsToStreamRequest

func NewRecordsToStreamRequest[T Items](
	records <-chan Record[T],
	tableMapping map[string]*RecordTypeCounts,
	batchID int64,
	unboundedNumericAsString bool,
	targetDWH protos.DBType,
) *RecordsToStreamRequest[T]

func (*RecordsToStreamRequest[T]) GetRecords

func (r *RecordsToStreamRequest[T]) GetRecords() <-chan Record[T]

type RelationMessageMapping

type RelationMessageMapping map[uint32]*pglogrepl.RelationMessage

type RelationRecord

type RelationRecord[T Items] struct {
	TableSchemaDelta *protos.TableSchemaDelta `json:"tableSchemaDelta"`
	BaseRecord
}

RelationRecord passes the schema delta back through the pipeline as a regular record, which avoids a heavier refactoring of the CDC machinery.

func (*RelationRecord[T]) GetDestinationTableName

func (r *RelationRecord[T]) GetDestinationTableName() string

func (*RelationRecord[T]) GetItems

func (r *RelationRecord[T]) GetItems() T

func (*RelationRecord[T]) GetSourceTableName

func (r *RelationRecord[T]) GetSourceTableName() string

func (*RelationRecord[T]) Kind

func (*RelationRecord[T]) Kind() string

func (*RelationRecord[T]) PopulateCountMap

func (r *RelationRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)

type RemoveFlowDetailsFromCatalogRequest

type RemoveFlowDetailsFromCatalogRequest struct {
	FlowName string
	Resync   bool
}

type SetupReplicationResult

type SetupReplicationResult struct {
	Conn             interface{ Close(context.Context) error }
	SlotName         string
	SnapshotName     string
	SupportsTIDScans bool
}

type SnapshotTableNumericTruncator

type SnapshotTableNumericTruncator []qvalue.NumericStat

func NewSnapshotTableNumericTruncator

func NewSnapshotTableNumericTruncator(destinationTable string, fields []types.QField) SnapshotTableNumericTruncator

func (SnapshotTableNumericTruncator) Get

func (SnapshotTableNumericTruncator) Warnings

type StreamNumericTruncator

type StreamNumericTruncator map[string]CdcTableNumericTruncator

func NewStreamNumericTruncator

func NewStreamNumericTruncator(tableMappings []*protos.TableMapping, typesToSkip map[string]struct{}) StreamNumericTruncator

func (StreamNumericTruncator) Get

func (ss StreamNumericTruncator) Get(destinationTable string) CdcTableNumericTruncator

func (StreamNumericTruncator) Warnings

type SyncCompositeResponse

type SyncCompositeResponse struct {
	SyncResponse   *SyncResponse
	NeedsNormalize bool
}

type SyncRecordsRequest

type SyncRecordsRequest[T Items] struct {
	Records *CDCStream[T]
	// ConsumedOffset allows destination to confirm lsn for slot
	ConsumedOffset *atomic.Int64
	// FlowJobName is the name of the flow job.
	FlowJobName string
	// destination table name -> schema mapping
	TableNameSchemaMapping map[string]*protos.TableSchema
	Env                    map[string]string
	// Staging path for AVRO files in CDC
	StagingPath string
	// Lua script
	Script string
	// source:destination mappings
	TableMappings []*protos.TableMapping
	SyncBatchID   int64
	Version       uint32
}

type SyncResponse

type SyncResponse struct {
	// TableNameRowsMapping tells how many records need to be synced to each destination table.
	TableNameRowsMapping map[string]*RecordTypeCounts
	// to be carried to parent workflow
	TableSchemaDeltas []*protos.TableSchemaDelta
	// LastSyncedCheckpoint is the last state (eg LSN, GTID) that was synced.
	LastSyncedCheckpoint CdcCheckpoint
	// NumRecordsSynced is the number of records that were synced.
	NumRecordsSynced   int64
	CurrentSyncBatchID int64
	Warnings           shared.QRepWarnings
}

type TableWithPkey

type TableWithPkey struct {
	TableName string
	// SHA256 hash of the primary key columns
	PkeyColVal [32]byte
}

func RecToTablePKey

func RecToTablePKey[T Items](
	tableNameSchemaMapping map[string]*protos.TableSchema,
	rec Record[T],
) (TableWithPkey, error)

type ToJSONOptions

type ToJSONOptions struct {
	UnnestColumns map[string]struct{}
	HStoreAsJSON  bool
}

func NewToJSONOptions

func NewToJSONOptions(unnestCols []string, hstoreAsJSON bool) ToJSONOptions

type TypedReceiveChannel

type TypedReceiveChannel[T any] struct {
	Chan workflow.ReceiveChannel
}

func (TypedReceiveChannel[T]) AddToSelector

func (self TypedReceiveChannel[T]) AddToSelector(selector workflow.Selector, f func(T, bool)) workflow.Selector

func (TypedReceiveChannel[T]) Receive

func (self TypedReceiveChannel[T]) Receive(ctx workflow.Context) (T, bool)

func (TypedReceiveChannel[T]) ReceiveAsync

func (self TypedReceiveChannel[T]) ReceiveAsync() (T, bool)

func (TypedReceiveChannel[T]) ReceiveAsyncWithMoreFlag

func (self TypedReceiveChannel[T]) ReceiveAsyncWithMoreFlag() (T, bool, bool)

func (TypedReceiveChannel[T]) ReceiveWithTimeout

func (self TypedReceiveChannel[T]) ReceiveWithTimeout(ctx workflow.Context, timeout time.Duration) (T, bool, bool)

type TypedSignal

type TypedSignal[T any] struct {
	Name string
}

func (TypedSignal[T]) GetSignalChannel

func (self TypedSignal[T]) GetSignalChannel(ctx workflow.Context) TypedReceiveChannel[T]

func (TypedSignal[T]) SignalChildWorkflow

func (self TypedSignal[T]) SignalChildWorkflow(
	ctx workflow.Context,
	wf workflow.ChildWorkflowFuture,
	value T,
) workflow.Future

func (TypedSignal[T]) SignalClientWorkflow

func (self TypedSignal[T]) SignalClientWorkflow(
	ctx context.Context,
	c client.Client,
	workflowID string,
	runID string,
	value T,
) error

func (TypedSignal[T]) SignalExternalWorkflow

func (self TypedSignal[T]) SignalExternalWorkflow(
	ctx workflow.Context,
	workflowID string,
	runID string,
	value T,
) workflow.Future

type UpdateRecord

type UpdateRecord[T Items] struct {
	// OldItems is a map of column name to value.
	OldItems T
	// NewItems is a map of column name to value.
	NewItems T
	// unchanged toast columns
	UnchangedToastColumns map[string]struct{}
	// Name of the source table
	SourceTableName string
	// Name of the destination table
	DestinationTableName string
	BaseRecord
}

func (*UpdateRecord[T]) GetDestinationTableName

func (r *UpdateRecord[T]) GetDestinationTableName() string

func (*UpdateRecord[T]) GetItems

func (r *UpdateRecord[T]) GetItems() T

func (*UpdateRecord[T]) GetSourceTableName

func (r *UpdateRecord[T]) GetSourceTableName() string

func (*UpdateRecord[T]) Kind

func (*UpdateRecord[T]) Kind() string

func (*UpdateRecord[T]) PopulateCountMap

func (r *UpdateRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL