delegate

package
v0.0.0-...-1512001 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2025 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CoordinateWorkAndUpdateStatus

func CoordinateWorkAndUpdateStatus(ctx context.Context, dbpool *pgxpool.Pool, ca *CommandArguments) error

func GetOutputRecordCount

func GetOutputRecordCount(dbpool *pgxpool.Pool, sessionId string) (int64, int64)

Returns dbRecordCount (nbr of rows in pipeline_execution_details) and outputRecordCount (nbr of rows saved from server process)

func RegisterReport

func RegisterReport(dbpool *pgxpool.Pool, client, org, objectType, fileKey string,
	sourcePeriodKey, tableName, sourceType, sessionId, userEmail string) error

Types

type CommandArguments

type CommandArguments struct {
	WorkspaceName           string
	Client                  string
	Org                     string
	ObjectType              string
	Environment             string
	SessionId               string
	SourcePeriodKey         string
	ProcessName             string
	ReportName              string
	FileKey                 string
	OutputPath              string
	OriginalFileName        string
	ReportScriptPaths       []string
	CurrentReportDirectives *ReportDirectives
	BucketName              string
	RegionName              string
	SkipCompileWorkspace    bool
	FileKeyComponents       map[string]interface{}
}

func (*CommandArguments) DoCsvReport

func (ca *CommandArguments) DoCsvReport(dbpool *pgxpool.Pool, tempDir string, s3FileName *string, name string, sqlStmt *string) error

func (*CommandArguments) DoParquetReport

func (ca *CommandArguments) DoParquetReport(dbpool *pgxpool.Pool, tempDir string, s3FileName *string, name string, sqlStmt *string) error

func (*CommandArguments) DoReport

func (ca *CommandArguments) DoReport(dbpool *pgxpool.Pool, tempDir string, outputFileName *string, sqlStmt *string) (string, error)

The heavy lifting outputFileName is the name in the report sql file, this is mapped to a table name in ReportDirectives.ReportsAsTable

func (*CommandArguments) RunReports

func (ca *CommandArguments) RunReports(dbpool *pgxpool.Pool) (returnedErr error)

Main Functions --------------------------------------------------------------------------------------

func (*CommandArguments) RunSchemaProviderReportsCmds

func (ca *CommandArguments) RunSchemaProviderReportsCmds(ctx context.Context, dbpool *pgxpool.Pool,
	errCh chan<- error)

Run report commands specified by the schema provider of the main input source Note the errCh will be closed by this func either synchronously or async when the worker pool completes

type CommandWorker

type CommandWorker struct {
	// contains filtered or unexported fields
}

func NewCommandWorker

func NewCommandWorker(ctx context.Context, s3Client *s3.Client,
	done chan struct{}, errCh chan<- error) *CommandWorker

func (*CommandWorker) DoWork

func (ctx *CommandWorker) DoWork(workersTaskCh <-chan any)

type RegisterReportSpec

type RegisterReportSpec struct {
	TableName  string `json:"table_name"`
	Org        string `json:"org"`
	ObjectType string `json:"object_type"`
	SourceType string `json:"source_type"`
}

type ReportDirectives

type ReportDirectives struct {
	// InputPath is original fileKey, unless overriten in config file, used to emit sentinel file
	FilePathSubstitution []StringSubstitution         `json:"filePathSubstitution"`
	ReportScripts        []string                     `json:"reportScripts"`
	UpdateLookupTables   bool                         `json:"updateLookupTables"`
	EmitSentinelFile     *SentinelConfig              `json:"emitSentinelFile"`
	OutputS3Prefix       string                       `json:"outputS3Prefix"`
	InputPath            string                       `json:"inputPath"`
	OutputPath           string                       `json:"outputPath"`
	ReportProperties     map[string]ReportProperty    `json:"reportProperties"`
	StatementProperties  map[string]StatementProperty `json:"statementProperties"`
	RegisterReports      []RegisterReportSpec         `json:"registerReport"`
}

type ReportProperty

type ReportProperty struct {
	ReportOrScript  string            `json:"reportOrScript"`
	UpdatedFileKeys []string          `json:"updatedFileKeys"`
	RunWhen         []RunWhenCriteria `json:"runWhen"`
}

type RunWhenCriteria

type RunWhenCriteria struct {
	FileKeyComponent        string `json:"fileKeyComponent"`
	HasValue                string `json:"hasValue"`
	HasNotValue             string `json:"hasNotValue"`
	HasNonZeroOutputRecords bool   `json:"hasOutputRecordsOnly"`
}

type SentinelConfig

type SentinelConfig struct {
	FilePathSubstitution []StringSubstitution `json:"filePathSubstitution"`
}

type StatementProperty

type StatementProperty struct {
	Org          string            `json:"org"`
	ObjectType   string            `json:"object_type"`
	OutputFormat string            `json:"outputFormat"`
	RunWhen      []RunWhenCriteria `json:"runWhen"`
}

type StringSubstitution

type StringSubstitution struct {
	Replace string `json:"replace"`
	With    string `json:"with"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL