queueing

package
v0.0.0-...-ff2f52f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2025 License: CC0-1.0 Imports: 38 Imported by: 0

Documentation

Overview

Package queueing implements "job" processing for bulk export requests

Job Processing is handled by RiverQueue and consists of three main components: 1. ProcessJob: Main job, ie bulk export requests. 2. PrepareJob: Handles logic dedicated to creating subjobs for ProcessJob. 3. CleanupJob: Handles cleaning up old/archived bulk export job files.

There are three workers for each step above; they are assigned a "kind" of work and do that work only.

When a request comes in, the PrepareWorker will divide the steps into multiple pieces to be worked, depending on the number of beneficiaries and resources requested. Each of those pieces will enqueue a new Job which will be picked up by a jobProcessWorker.

Jobs are written to the application database. Jobs contain set of keys, which are generated in step 2 and then made available for the consumer that made the request.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func StartRiver

func StartRiver(db *sql.DB, numWorkers int) *queue

TODO: better dependency injection (db, worker, logger). Waiting for pgxv5 upgrade

Types

type CleanupJobWorker

type CleanupJobWorker struct {
	river.WorkerDefaults[worker_types.CleanupJobArgs]
	// contains filtered or unexported fields
}

TODO: Consider moving functions like cleanupJob and archiveExpiring to receiver methods of CleanupJobWorker

func NewCleanupJobWorker

func NewCleanupJobWorker(db *sql.DB) *CleanupJobWorker

func (*CleanupJobWorker) Work

type Enqueuer

type Enqueuer interface {
	AddJob(ctx context.Context, job worker_types.JobEnqueueArgs, priority int) error
	AddPrepareJob(ctx context.Context, job worker_types.PrepareJobArgs) error
}

Enqueuer only handles inserting job entries into the appropriate table

func NewEnqueuer

func NewEnqueuer(db *sql.DB, pool *pgxv5Pool.Pool) Enqueuer

Creates a river client for the Job queue. If the client does not call .Start(), then it is insert only We still need the workers and the types of workers to insert them

type JobWorker

type JobWorker struct {
	river.WorkerDefaults[worker_types.JobEnqueueArgs]
	// contains filtered or unexported fields
}

func (*JobWorker) Work

type MockEnqueuer

type MockEnqueuer struct {
	mock.Mock
}

MockEnqueuer is an autogenerated mock type for the Enqueuer type

func NewMockEnqueuer

func NewMockEnqueuer(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockEnqueuer

NewMockEnqueuer creates a new instance of MockEnqueuer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockEnqueuer) AddJob

func (_mock *MockEnqueuer) AddJob(ctx context.Context, job worker_types.JobEnqueueArgs, priority int) error

AddJob provides a mock function for the type MockEnqueuer

func (*MockEnqueuer) AddPrepareJob

func (_mock *MockEnqueuer) AddPrepareJob(ctx context.Context, job worker_types.PrepareJobArgs) error

AddPrepareJob provides a mock function for the type MockEnqueuer

func (*MockEnqueuer) EXPECT

func (_m *MockEnqueuer) EXPECT() *MockEnqueuer_Expecter

type MockEnqueuer_AddJob_Call

type MockEnqueuer_AddJob_Call struct {
	*mock.Call
}

MockEnqueuer_AddJob_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddJob'

func (*MockEnqueuer_AddJob_Call) Return

func (*MockEnqueuer_AddJob_Call) Run

func (*MockEnqueuer_AddJob_Call) RunAndReturn

func (_c *MockEnqueuer_AddJob_Call) RunAndReturn(run func(ctx context.Context, job worker_types.JobEnqueueArgs, priority int) error) *MockEnqueuer_AddJob_Call

type MockEnqueuer_AddPrepareJob_Call

type MockEnqueuer_AddPrepareJob_Call struct {
	*mock.Call
}

MockEnqueuer_AddPrepareJob_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddPrepareJob'

func (*MockEnqueuer_AddPrepareJob_Call) Return

func (*MockEnqueuer_AddPrepareJob_Call) Run

func (*MockEnqueuer_AddPrepareJob_Call) RunAndReturn

type MockEnqueuer_Expecter

type MockEnqueuer_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockEnqueuer_Expecter) AddJob

func (_e *MockEnqueuer_Expecter) AddJob(ctx interface{}, job interface{}, priority interface{}) *MockEnqueuer_AddJob_Call

AddJob is a helper method to define mock.On call

  • ctx context.Context
  • job worker_types.JobEnqueueArgs
  • priority int

func (*MockEnqueuer_Expecter) AddPrepareJob

func (_e *MockEnqueuer_Expecter) AddPrepareJob(ctx interface{}, job interface{}) *MockEnqueuer_AddPrepareJob_Call

AddPrepareJob is a helper method to define mock.On call

  • ctx context.Context
  • job worker_types.PrepareJobArgs

type Notifier

type Notifier interface {
	PostMessageContext(context.Context, string, ...slack.MsgOption) (string, string, error)
}

type PrepareJobWorker

type PrepareJobWorker struct {
	river.WorkerDefaults[worker_types.PrepareJobArgs]
	// contains filtered or unexported fields
}

PrepareJobWorker has two BFD clients because it depends on a configuration variable that is not available until Work() is called. There were other discussed methods of injecting the client and overwriting the the basepath but ruled out due to the risk and time constraints. Many of the Service's functionality is used solely in this PrepareJob functionality and should eventually be migrated when time allows.

func NewPrepareJobWorker

func NewPrepareJobWorker(db *sql.DB) (*PrepareJobWorker, error)

func (*PrepareJobWorker) GetBundleLastUpdated

func (p *PrepareJobWorker) GetBundleLastUpdated(basepath string, jobData worker_types.JobEnqueueArgs) (time.Time, error)

GetBundleLastUpdated requests a fake patient in order to acquire the bundle's lastUpdated metadata.

func (*PrepareJobWorker) Work

type ValidateJobConfig

type ValidateJobConfig struct {
	WorkerInstance worker.Worker
	Logger         logrus.FieldLogger
	Repository     repository.Repository
	JobID          int64
	QJobID         int64
	Args           worker_types.JobEnqueueArgs
	ErrorCount     int
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL