worker

package
v0.0.0-...-628eae6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2020 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package worker contains tools to interact with the runs store, and process runs. A worker consumes runs, starts their jobs on Kubernetes, monitors them and updates their status.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewRedisClient

func NewRedisClient() redis.UniversalClient

Types

type CloudProvider

type CloudProvider interface {
	// Runs the job on the cloud provider.
	// Blocks until the job completes.
	RunJob(job Job) error
}

type Event

type Event struct {
	Type    string
	Title   string
	Message string
}

type EventStore

type EventStore interface {
	// Creates a new event in the store,
	// ready to be consumed.
	CreateEvent(event Event) error
}

type Info

type Info struct {
	Name         string
	Queue        string
	ProcessQueue string
}

func NewInfo

func NewInfo() Info

type Job

type Job struct {
	Name  string
	Image string
	Run   string
}

type JobDependency

type JobDependency struct {
	JobID         string
	ExpectFailure bool
}

type K8SCloudProvider

type K8SCloudProvider struct {
	// contains filtered or unexported fields
}

func NewK8SCloudProvider

func NewK8SCloudProvider() K8SCloudProvider

If the Kubernetes client can not be created, this function panics.

func (K8SCloudProvider) RunJob

func (cp K8SCloudProvider) RunJob(job Job) error

type Recycler

type Recycler interface {
	// Start synchronizing with the recycler.
	// As the synchronization is a loop, it must be called in a goroutine.
	StartSync()
}

func NewRecycler

func NewRecycler(info Info) Recycler

type RedisEventStore

type RedisEventStore struct {
	// contains filtered or unexported fields
}

func NewRedisEventStore

func NewRedisEventStore() RedisEventStore

func (RedisEventStore) CreateEvent

func (es RedisEventStore) CreateEvent(event Event) error

type RedisRecycler

type RedisRecycler struct {
	// contains filtered or unexported fields
}

func (RedisRecycler) StartSync

func (r RedisRecycler) StartSync()

This function registers the worker with the recycler, and frequently sends keepalives. It should be called in a goroutine. In case of error, it keeps retrying.

type RedisRunStore

type RedisRunStore struct {
	// contains filtered or unexported fields
}

func NewRedisRunStore

func NewRedisRunStore(info Info) RedisRunStore

func (RedisRunStore) Close

func (rs RedisRunStore) Close(runKey string) error

func (RedisRunStore) GetJob

func (rs RedisRunStore) GetJob(jobKey string) (Job, error)

func (RedisRunStore) GetJobDependencies

func (rs RedisRunStore) GetJobDependencies(jobKey string) ([]JobDependency, error)

func (RedisRunStore) GetJobs

func (rs RedisRunStore) GetJobs(runKey string) ([]string, error)

func (RedisRunStore) NextRun

func (rs RedisRunStore) NextRun() (string, error)

func (RedisRunStore) SetJobStatus

func (rs RedisRunStore) SetJobStatus(jobKey, status string) error

func (RedisRunStore) SetRunStatus

func (rs RedisRunStore) SetRunStatus(runKey, status string) error

type RunStore

type RunStore interface {
	// Actively listens to new runs, and when a new run is available,
	// returns an arbitrary string identifier referencing it.
	// This identifier is used to refer to the run in store operations.
	// Runs returned by this function must be closed when no longer used.
	NextRun() (string, error)

	// Persists the run status in the store.
	// Status can be:
	// - PENDING
	// - RUNNING
	// - SUCCESSFUL
	// - FAILED
	// - CANCELED
	SetRunStatus(runID, status string) error

	// Returns a list of arbitrary string identifiers referencing all
	// jobs contained in the run.
	// A job identifier must be globally unique, meaning that "job1" from "run1"
	// and "job1" from "run2" must have different identifiers.
	GetJobs(runID string) ([]string, error)

	// Returns the Job structure corresponding to the identifier.
	GetJob(jobID string) (Job, error)

	// Persists the job status in the store.
	// Status can be:
	// - PENDING
	// - SKIPPED
	// - RUNNING
	// - SUCCESSFUL
	// - FAILED
	SetJobStatus(jobID, status string) error

	// Returns a list of arbitrary string identifiers referencing all
	// dependencies for the job.
	// A dependency identifier must be globally unique.
	GetJobDependencies(jobID string) ([]JobDependency, error)

	// Closes the run corresponding to the identifier.
	// Post-run operations are done in this function.
	Close(runID string) error
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func New

func New() Worker

func (Worker) ProcessNextRun

func (w Worker) ProcessNextRun(wg *sync.WaitGroup) error

ProcessNextRun is a blocking function, listening for a new run, and processing it in a goroutine.

func (Worker) Start

func (w Worker) Start()

Start launches the worker loop. It runs indefinitely. Upon starting, it synchronizes with the recycler.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL