Documentation
¶
Overview ¶
Package queueing implements "job" processing for bulk export requests
Job Processing is handled by RiverQueue and consists of three main components: 1. ProcessJob: Main job, ie bulk export requests. 2. PrepareJob: Handles logic dedicated to creating subjobs for ProcessJob. 3. CleanupJob: Handles cleaning up old/archived bulk export job files.
There are three workers for each step above; they are assigned a "kind" of work and do that work only.
When a request comes in, the PrepareWorker will divide the steps into multiple pieces to be worked, depending on the number of beneficiaries and resources requested. Each of those pieces will enqueue a new Job which will be picked up by a jobProcessWorker.
Jobs are written to the application database. Jobs contain set of keys, which are generated in step 2 and then made available for the consumer that made the request.
Index ¶
- func StartRiver(db *sql.DB, numWorkers int) *queue
- type CleanupJobWorker
- type Enqueuer
- type JobWorker
- type MockEnqueuer
- type MockEnqueuer_AddJob_Call
- func (_c *MockEnqueuer_AddJob_Call) Return(err error) *MockEnqueuer_AddJob_Call
- func (_c *MockEnqueuer_AddJob_Call) Run(run func(ctx context.Context, job worker_types.JobEnqueueArgs, priority int)) *MockEnqueuer_AddJob_Call
- func (_c *MockEnqueuer_AddJob_Call) RunAndReturn(...) *MockEnqueuer_AddJob_Call
- type MockEnqueuer_AddPrepareJob_Call
- func (_c *MockEnqueuer_AddPrepareJob_Call) Return(err error) *MockEnqueuer_AddPrepareJob_Call
- func (_c *MockEnqueuer_AddPrepareJob_Call) Run(run func(ctx context.Context, job worker_types.PrepareJobArgs)) *MockEnqueuer_AddPrepareJob_Call
- func (_c *MockEnqueuer_AddPrepareJob_Call) RunAndReturn(run func(ctx context.Context, job worker_types.PrepareJobArgs) error) *MockEnqueuer_AddPrepareJob_Call
- type MockEnqueuer_Expecter
- type Notifier
- type PrepareJobWorker
- type ValidateJobConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func StartRiver ¶
TODO: better dependency injection (db, worker, logger). Waiting for pgxv5 upgrade
Types ¶
type CleanupJobWorker ¶
type CleanupJobWorker struct { river.WorkerDefaults[worker_types.CleanupJobArgs] // contains filtered or unexported fields }
TODO: Consider moving functions like cleanupJob and archiveExpiring to receiver methods of CleanupJobWorker
func NewCleanupJobWorker ¶
func NewCleanupJobWorker(db *sql.DB) *CleanupJobWorker
func (*CleanupJobWorker) Work ¶
func (w *CleanupJobWorker) Work(ctx context.Context, rjob *river.Job[worker_types.CleanupJobArgs]) error
type Enqueuer ¶
type Enqueuer interface { AddJob(ctx context.Context, job worker_types.JobEnqueueArgs, priority int) error AddPrepareJob(ctx context.Context, job worker_types.PrepareJobArgs) error }
Enqueuer only handles inserting job entries into the appropriate table
type JobWorker ¶
type JobWorker struct { river.WorkerDefaults[worker_types.JobEnqueueArgs] // contains filtered or unexported fields }
func (*JobWorker) Work ¶
func (w *JobWorker) Work(ctx context.Context, rjob *river.Job[worker_types.JobEnqueueArgs]) error
type MockEnqueuer ¶
MockEnqueuer is an autogenerated mock type for the Enqueuer type
func NewMockEnqueuer ¶
func NewMockEnqueuer(t interface { mock.TestingT Cleanup(func()) }) *MockEnqueuer
NewMockEnqueuer creates a new instance of MockEnqueuer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.
func (*MockEnqueuer) AddJob ¶
func (_mock *MockEnqueuer) AddJob(ctx context.Context, job worker_types.JobEnqueueArgs, priority int) error
AddJob provides a mock function for the type MockEnqueuer
func (*MockEnqueuer) AddPrepareJob ¶
func (_mock *MockEnqueuer) AddPrepareJob(ctx context.Context, job worker_types.PrepareJobArgs) error
AddPrepareJob provides a mock function for the type MockEnqueuer
func (*MockEnqueuer) EXPECT ¶
func (_m *MockEnqueuer) EXPECT() *MockEnqueuer_Expecter
type MockEnqueuer_AddJob_Call ¶
MockEnqueuer_AddJob_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddJob'
func (*MockEnqueuer_AddJob_Call) Return ¶
func (_c *MockEnqueuer_AddJob_Call) Return(err error) *MockEnqueuer_AddJob_Call
func (*MockEnqueuer_AddJob_Call) Run ¶
func (_c *MockEnqueuer_AddJob_Call) Run(run func(ctx context.Context, job worker_types.JobEnqueueArgs, priority int)) *MockEnqueuer_AddJob_Call
func (*MockEnqueuer_AddJob_Call) RunAndReturn ¶
func (_c *MockEnqueuer_AddJob_Call) RunAndReturn(run func(ctx context.Context, job worker_types.JobEnqueueArgs, priority int) error) *MockEnqueuer_AddJob_Call
type MockEnqueuer_AddPrepareJob_Call ¶
MockEnqueuer_AddPrepareJob_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddPrepareJob'
func (*MockEnqueuer_AddPrepareJob_Call) Return ¶
func (_c *MockEnqueuer_AddPrepareJob_Call) Return(err error) *MockEnqueuer_AddPrepareJob_Call
func (*MockEnqueuer_AddPrepareJob_Call) Run ¶
func (_c *MockEnqueuer_AddPrepareJob_Call) Run(run func(ctx context.Context, job worker_types.PrepareJobArgs)) *MockEnqueuer_AddPrepareJob_Call
func (*MockEnqueuer_AddPrepareJob_Call) RunAndReturn ¶
func (_c *MockEnqueuer_AddPrepareJob_Call) RunAndReturn(run func(ctx context.Context, job worker_types.PrepareJobArgs) error) *MockEnqueuer_AddPrepareJob_Call
type MockEnqueuer_Expecter ¶
type MockEnqueuer_Expecter struct {
// contains filtered or unexported fields
}
func (*MockEnqueuer_Expecter) AddJob ¶
func (_e *MockEnqueuer_Expecter) AddJob(ctx interface{}, job interface{}, priority interface{}) *MockEnqueuer_AddJob_Call
AddJob is a helper method to define mock.On call
- ctx context.Context
- job worker_types.JobEnqueueArgs
- priority int
func (*MockEnqueuer_Expecter) AddPrepareJob ¶
func (_e *MockEnqueuer_Expecter) AddPrepareJob(ctx interface{}, job interface{}) *MockEnqueuer_AddPrepareJob_Call
AddPrepareJob is a helper method to define mock.On call
- ctx context.Context
- job worker_types.PrepareJobArgs
type PrepareJobWorker ¶
type PrepareJobWorker struct { river.WorkerDefaults[worker_types.PrepareJobArgs] // contains filtered or unexported fields }
PrepareJobWorker has two BFD clients because it depends on a configuration variable that is not available until Work() is called. There were other discussed methods of injecting the client and overwriting the the basepath but ruled out due to the risk and time constraints. Many of the Service's functionality is used solely in this PrepareJob functionality and should eventually be migrated when time allows.
func NewPrepareJobWorker ¶
func NewPrepareJobWorker(db *sql.DB) (*PrepareJobWorker, error)
func (*PrepareJobWorker) GetBundleLastUpdated ¶
func (p *PrepareJobWorker) GetBundleLastUpdated(basepath string, jobData worker_types.JobEnqueueArgs) (time.Time, error)
GetBundleLastUpdated requests a fake patient in order to acquire the bundle's lastUpdated metadata.
func (*PrepareJobWorker) Work ¶
func (w *PrepareJobWorker) Work(ctx context.Context, rjob *river.Job[worker_types.PrepareJobArgs]) error
type ValidateJobConfig ¶
type ValidateJobConfig struct { WorkerInstance worker.Worker Logger logrus.FieldLogger Repository repository.Repository JobID int64 QJobID int64 Args worker_types.JobEnqueueArgs ErrorCount int }