datatable

package
v0.0.0-...-1512001 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2025 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CalculatePeriod

func CalculatePeriod(year, month, day int) (monthPeriod, weekPeriod, dayPeriod int)

Calculate the month, week, and day period since the Unix epoch (1/1/1970)

func DoNotifyApiGateway

func DoNotifyApiGateway(fileKey, apiEndpoint, apiEndpointJson, notificationTemplate string,
	customFileKeys []string, errMsg string, envSettings map[string]any) error

func EvalThrotting

func EvalThrotting(submRc, submT1c int64) (bool, error)

func GetLastComponent

func GetLastComponent(path string) (result string)

func GetOutputTables

func GetOutputTables(dbpool *pgxpool.Pool, pipelineExecutionKey int) ([]string, error)

func GetSchemaProviderJsonFromPipelineKey

func GetSchemaProviderJsonFromPipelineKey(dbpool *pgxpool.Pool, peKey int) (string, string, error)

Utility function to get the SchemaProvider json and session_id using the pipeline execution key

func GetSchemaProviderJsonFromPipelineSession

func GetSchemaProviderJsonFromPipelineSession(dbpool *pgxpool.Pool, sessionId string) (string, error)

Utility function to get the SchemaProvider json using the pipeline execution session id

func InsertSourcePeriod

func InsertSourcePeriod(dbpool *pgxpool.Pool, year, month, day int) (int, error)

Insert into source_period and return the source_period.key. If the row already exists in the table, return the key of that row without inserting a new one.

func RegisterDomainTables

func RegisterDomainTables(dbpool *pgxpool.Pool, usingSshTunnel bool, pipelineExecutionKey int) error

Register Domain Table with input_registry

func RunUpdateDb

func RunUpdateDb(workspaceName string, serverArgs *[]string) (string, error)

Run update_db - function used by apiserver and server

func SplitFileKeyIntoComponents

func SplitFileKeyIntoComponents(keyMap map[string]interface{}, fileKey *string) map[string]interface{}

func UnitTestWorkspaceAction

func UnitTestWorkspaceAction(ctx *DataTableContext, dataTableAction *DataTableAction, token string)

Execute pipeline in unit test mode

Types

type Column

type Column struct {
	Table        string `json:"table"`
	Column       string `json:"column"`
	CalculatedAs string `json:"calculatedAs"`
}

type DataTableAction

type DataTableAction struct {
	Action            string            `json:"action"`
	WorkspaceName     string            `json:"workspaceName"`
	WorkspaceBranch   string            `json:"workspaceBranch"`
	FeatureBranch     string            `json:"featureBranch"`
	RawQuery          string            `json:"query"`
	RawQueryMap       map[string]string `json:"query_map"`
	Columns           []Column          `json:"columns"`
	FromClauses       []FromClause      `json:"fromClauses"`
	WhereClauses      []WhereClause     `json:"whereClauses"`
	WithClauses       []WithClause      `json:"withClauses"`
	DistinctOnClauses []string          `json:"distinctOnClauses"`
	SortColumn        string            `json:"sortColumn"`
	SortColumnTable   string            `json:"sortColumnTable"`
	SortAscending     bool              `json:"sortAscending"`
	Offset            int               `json:"offset"`
	Limit             int               `json:"limit"`
	// used for raw_query & raw_query_tool action only
	RequestColumnDef bool `json:"requestColumnDef"`
	// other non-query properties
	SkipThrottling bool                     `json:"skipThrottling"`
	Data           []map[string]interface{} `json:"data"`
}

SQL access builder. SkipThrottling indicates not to put the pipeline in the pending state.

type DataTableColumnDef

type DataTableColumnDef struct {
	Index     int    `json:"index"`
	Name      string `json:"name"`
	Label     string `json:"label"`
	Tooltips  string `json:"tooltips"`
	IsNumeric bool   `json:"isnumeric"`
}

DataTableColumnDef used when returning the column definition obtained from db catalog

func (*DataTableColumnDef) String

func (dc *DataTableColumnDef) String() string

type DataTableContext

type DataTableContext struct {
	Dbpool         *pgxpool.Pool
	DevMode        bool
	UsingSshTunnel bool
	UnitTestDir    *string
	AdminEmail     *string
}

Environment and settings needed

func NewDataTableContext

func NewDataTableContext(dbpool *pgxpool.Pool, devMode bool, usingSshTunnel bool,
	unitTestDir *string, adminEmail *string) *DataTableContext

func (*DataTableContext) AddWorkspaceFile

func (ctx *DataTableContext) AddWorkspaceFile(dataTableAction *DataTableAction, token string) (rb *[]byte, httpStatus int, err error)

AddWorkspaceFile

func (*DataTableContext) DeleteAllWorkspaceChanges

func (ctx *DataTableContext) DeleteAllWorkspaceChanges(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)

DeleteAllWorkspaceChanges -------------------------------------------------------------------------- Function to delete workspace file changes based on rows in workspace_changes Delete the workspace_changes row and the associated large object

func (*DataTableContext) DeleteWorkspaceChanges

func (ctx *DataTableContext) DeleteWorkspaceChanges(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)

DeleteWorkspaceChanges -------------------------------------------------------------------------- Function to delete workspace file changes based on rows in workspace_changes. Delete the workspace_changes row and the associated large object. Restore files from stash, except for .db and .tgz files.

func (*DataTableContext) DeleteWorkspaceFile

func (ctx *DataTableContext) DeleteWorkspaceFile(dataTableAction *DataTableAction, token string) (rb *[]byte, httpStatus int, err error)

DeleteWorkspaceFile

func (*DataTableContext) DoPreviewFileAction

func (ctx *DataTableContext) DoPreviewFileAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)

DoPreviewFileAction ------------------------------------------------------

func (*DataTableContext) DoReadAction

func (ctx *DataTableContext) DoReadAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)

DoReadAction ------------------------------------------------------

func (*DataTableContext) DoWorkspaceReadAction

func (ctx *DataTableContext) DoWorkspaceReadAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)

DoWorkspaceReadAction ------------------------------------------------------

func (*DataTableContext) DropTable

func (ctx *DataTableContext) DropTable(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)

DropTable ------------------------------------------------------ These are queries to load reference data for widget, e.g. dropdown list of items

func (*DataTableContext) ExecDataManagementStatement

func (ctx *DataTableContext) ExecDataManagementStatement(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)

func (*DataTableContext) ExecRawQuery

func (ctx *DataTableContext) ExecRawQuery(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)

ExecRawQuery ------------------------------------------------------ These are queries to load reference data for widget, e.g. dropdown list of items

func (*DataTableContext) ExecRawQueryMap

func (ctx *DataTableContext) ExecRawQueryMap(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)

ExecRawQueryMap ------------------------------------------------------ These are queries to load reference data for widget, e.g. dropdown list of items

func (*DataTableContext) GetTaskThrottlingInfo

func (ctx *DataTableContext) GetTaskThrottlingInfo(stateMachineName, taskStatus string) (int64, int64, error)

func (*DataTableContext) GetWorkspaceFileContent

func (ctx *DataTableContext) GetWorkspaceFileContent(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)

GetWorkspaceFileContent -------------------------------------------------------------------------- Function to get the workspace file content based on relative file name Read the file from the workspace on file system since it's already in sync with database

func (*DataTableContext) InsertPipelineExecutionStatus

func (ctx *DataTableContext) InsertPipelineExecutionStatus(dataTableAction *DataTableAction, irow int, results *map[string]any) (peKey int, httpStatus int, err error)

Insert into pipeline_execution_status and in loader_execution_status (the latter will be deprecated)

func (*DataTableContext) InsertRawRows

func (ctx *DataTableContext) InsertRawRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)

InsertRawRows ------------------------------------------------------ Insert row function using a raw text buffer containing csv/tsv rows. Delegates to InsertRows.

func (*DataTableContext) InsertRows

func (ctx *DataTableContext) InsertRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)

InsertRows ------------------------------------------------------ Main insert row function with pre processing hooks for validating/authorizing the request Main insert row function with post processing hooks for starting pipelines Inserting rows using pre-defined sql statements, keyed by table name provided in dataTableAction

func (*DataTableContext) LoadAllFiles

func (ctx *DataTableContext) LoadAllFiles(registerFileKeyAction *RegisterFileKeyAction, token string) (*map[string]interface{}, int, error)

Load All Files for client/org/object_type from a given day_period

func (*DataTableContext) PutSchemaEventToS3

func (ctx *DataTableContext) PutSchemaEventToS3(action *RegisterFileKeyAction, token string) (*map[string]interface{}, int, error)

Submit Schema Event to S3 (which will call RegisterFileKeys as a side effect)

func (*DataTableContext) RegisterFileKeys

func (ctx *DataTableContext) RegisterFileKeys(registerFileKeyAction *RegisterFileKeyAction, token string) (*map[string]interface{}, int, error)

Register file_key with file_key_staging table

func (*DataTableContext) SaveWorkspaceClientConfig

func (ctx *DataTableContext) SaveWorkspaceClientConfig(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)

SaveWorkspaceClientConfig -------------------------------------------------------------------------- Function to save the workspace file content in local workspace file system and in database

func (*DataTableContext) SaveWorkspaceFileContent

func (ctx *DataTableContext) SaveWorkspaceFileContent(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)

SaveWorkspaceFileContent -------------------------------------------------------------------------- Function to save the workspace file content in local workspace file system and in database

func (*DataTableContext) StartPendingTasks

func (ctx *DataTableContext) StartPendingTasks(stateMachineName string) (err error)

func (*DataTableContext) StartPipelinesForInputRegistryV2

func (ctx *DataTableContext) StartPipelinesForInputRegistryV2(inputRegistryKey, sourcePeriodKey int,
	inputSessionId, client, objectType, fileKey, token string) error

Start process based on matching criteria:

  • find pipelines that are ready to start with the given input_registry key.
  • Pipeline must have automated flag on

Note: the argument inputSessionId is the inputRegistryKey sessionId

func (*DataTableContext) SyncFileKeys

func (ctx *DataTableContext) SyncFileKeys(registerFileKeyAction *RegisterFileKeyAction, token string) (*map[string]interface{}, int, error)

SyncFileKeys ------------------------------------------------------ 12/17/2023: Replacing all keys in file_key_staging to be able to reset keys from source_config that are Part File sources

func (*DataTableContext) VerifyUserPermission

func (ctx *DataTableContext) VerifyUserPermission(sqlStmt *SqlInsertDefinition, token string) (*user.User, error)

Check that the user has the required permission to execute the action

func (*DataTableContext) WorkspaceInsertRows

func (ctx *DataTableContext) WorkspaceInsertRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)

WorkspaceInsertRows ------------------------------------------------------ Main insert row function with pre processing hooks for validating/authorizing the request Main insert row function with post processing hooks to perform work async Inserting rows using pre-defined sql statements, keyed by table name provided in dataTableAction

func (*DataTableContext) WorkspaceQueryStructure

func (ctx *DataTableContext) WorkspaceQueryStructure(dataTableAction *DataTableAction, token string) (results *[]byte, httpStatus int, err error)

WorkspaceQueryStructure ------------------------------------------------------ Function to query the workspace structure, it returns a hierarchical structure modeled based on ui MenuEntry class. It uses a virtual table name to indicate the level of granularity of the structure dataTableAction.FromClauses[0].Table:

case "workspace_file_structure": structure based on files of the workspace
case "workspace_object_structure": structure based on object (rule, lookup, class, etc) of the workspace

Initial implementation use workspace_file_structure NOTE: routePath must correspond to the parametrized url (needed by ui MenuEntry) NOTE: routeParam contains the routePath parameters (needed by ui MenuEntry) Input dataTableAction.Data:

[
	{
		"key": "123",
		"workspace_name": "jets_ws",
		"user_email": "email here"
	}
]

Output results:

			{
				"key": "123",
				"workspace_name": "jets_ws",
			  "result_type": "workspace_file_structure",
				"result_data": [
					{
						"key": "a1",
           "type": "dir",
						"label": "Jet Rules",
						"route_path": "/workspace/:workspace_name/jetRules",
						"route_params": {
								"workspace_name": "jets_ws",
						},
						"children": [
							{
								"key": "a1.1",
               "type": "dir",
								"label": "folder name",
								"children": [
									{
										"key": "a1.1.1",
                   "type": "file",
										"label": "mapping_rules.jr",
										"route_path": "/workspace/:workspace_name/wsFile/:file_name",
										"route_params": {
											"workspace_name": "jets_ws",
											"file_name": "jet_rules%03mapping_rules.jr",
										}
							 	  }
								]
							}
						]
					}
				]
			}

type FromClause

type FromClause struct {
	Schema  string `json:"schema"`
	Table   string `json:"table"`
	AsTable string `json:"asTable"`
}

type PendingTask

type PendingTask struct {
	Key                  int64
	MainInputRegistryKey sql.NullInt64
	MainInputFileKey     sql.NullString
	Client               string
	ProcessName          string
	SessionId            string
	Status               string
	UserEmail            string
	FileSize             sql.NullInt64
}

type RegisterFileKeyAction

type RegisterFileKeyAction struct {
	Action          string                   `json:"action"`
	Data            []map[string]interface{} `json:"data"`
	NoAutomatedLoad bool                     `json:"noAutomatedLoad"`
	IsSchemaEvent   bool                     `json:"isSchemaEvent"`
}

type SourcePeriod

type SourcePeriod struct {
	Key         int `json:"key"`
	Year        int `json:"year"`
	Month       int `json:"month"`
	Day         int `json:"day"`
	MonthPeriod int `json:"month_period"`
	WeekPeriod  int `json:"week_period"`
	DayPeriod   int `json:"day_period"`
}

* TODO refactor to use SourcePeriod entity

func LoadSourcePeriod

func LoadSourcePeriod(dbpool *pgxpool.Pool, key int) (sp SourcePeriod, err error)

Load source period info from database by key

type SqlInsertDefinition

type SqlInsertDefinition struct {
	Stmt       string
	ColumnKeys []string
	AdminOnly  bool
	Capability string
}

Simple definition of sql statement for insert

type StatusUpdate

type StatusUpdate struct {
	CpipesMode            bool
	CpipesEnv             map[string]any
	UsingSshTunnel        bool
	Dbpool                *pgxpool.Pool
	PeKey                 int
	Status                string
	FileKey               string
	FailureDetails        string
	DoNotNotifyApiGateway bool
}

Status Update command line arguments. When used as a delegate from apiserver, Dbpool is non-nil, and then the connection properties (AwsDsnSecret, DbPoolSize, UsingSshTunnel, AwsRegion) are not needed.

func (*StatusUpdate) CoordinateWork

func (ca *StatusUpdate) CoordinateWork() error

func (*StatusUpdate) ValidateArguments

func (ca *StatusUpdate) ValidateArguments() []string

Package Main Functions --------------------------------------------------------------------------------------

type ThrottlingSpec

type ThrottlingSpec struct {
	MaxConcurrentPipelines int `json:"max_concurrent"`
	MaxPipeline            int `json:"max_for_size"`
	Size                   int `json:"size"`
}

Size in GiB

func (ThrottlingSpec) String

func (t ThrottlingSpec) String() string

type WhereClause

type WhereClause struct {
	Table    string   `json:"table"`
	Column   string   `json:"column"`
	Values   []string `json:"values"`
	JoinWith string   `json:"joinWith"`
	Like     string   `json:"like"`
	Ge       string   `json:"ge"`
	Le       string   `json:"le"`
	// Adding a simple or clause
	OrWith *WhereClause `json:"orWith"`
}

type WithClause

type WithClause struct {
	Name string `json:"name"`
	Stmt string `json:"stmt"`
}

Directories

Path Synopsis
test

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL