Documentation
¶
Index ¶
- Variables
- func DeleteReplica(conf *Conf, db DB, pk ReplicaPK) (err error)
- func DeleteWorker(conf *Conf, db DB, pk WorkerPK) (err error)
- func FmtPoolQueueName(conf *Conf, poolID string) string
- func FmtPoolQueueURL(conf *Conf, poolID string) string
- func FmtReplicaID(datasetID, workerID string) string
- func FmtWorkerQueueName(conf *Conf, poolID, workerID string) string
- func FmtWorkerQueueURL(conf *Conf, poolID, workerID string) string
- func HandleGateway(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)
- func HandleRelease(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)
- func HandleSchedule(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)
- func Mux(conf *Conf, svc *Services) http.Handler
- func PutNewAlloc(conf *Conf, db DB, alloc *Alloc) (err error)
- func PutNewPool(conf *Conf, db DB, pool *Pool) (err error)
- func PutNewWorker(conf *Conf, db DB, worker *Worker) (err error)
- func PutReplica(conf *Conf, db DB, replica *Replica) (err error)
- func ReceiveEvals(conf *Conf, svc *Services, pool *Pool) (err error)
- func UpdateAllocTTL(conf *Conf, db DB, ttl int64, apk AllocPK) (err error)
- func UpdatePoolTTL(conf *Conf, db DB, ttl int64, pk PoolPK) (err error)
- func UpdateWorkerTTL(conf *Conf, db DB, ttl int64, pk WorkerPK) (err error)
- type Alloc
- type AllocPK
- type Conf
- type DB
- type Eval
- type GatewayRequest
- type GatewayResponse
- type Handler
- type Pool
- type PoolPK
- type Replica
- type ReplicaPK
- type Services
- type Worker
- type WorkerPK
Constants ¶
This section is empty.
Variables ¶
var ( //ErrAllocExists means a alloc exists while it was expected not to ErrAllocExists = errors.New("alloc already exists") //ErrAllocNotExists means a alloc was not found while expecting it to exist ErrAllocNotExists = errors.New("alloc doesn't exist") )
var ( //ErrPoolExists means a pool exists while it was expected not to ErrPoolExists = errors.New("pool already exists") //ErrPoolNotExists means a pool was not found while expecting it to exist ErrPoolNotExists = errors.New("pool doesn't exist") )
var ( //ErrWorkerExists means a worker exists while it was expected not to ErrWorkerExists = errors.New("worker already exists") //ErrWorkerNotExists means a worker was not found while expecting it to exist ErrWorkerNotExists = errors.New("worker doesn't exist") )
var Handlers = map[*regexp.Regexp]Handler{ regexp.MustCompile(`-schedule$`): HandleSchedule, regexp.MustCompile(`-release$`): HandleRelease, regexp.MustCompile(`-gateway$`): HandleGateway, }
Handlers map arn suffixes to actual event handlers
Functions ¶
func DeleteReplica ¶
DeleteReplica deletes a replica by pk
func DeleteWorker ¶
DeleteWorker deletes a worker by pk
func FmtPoolQueueName ¶
FmtPoolQueueName will format a sqs queue name consistently
func FmtPoolQueueURL ¶
FmtPoolQueueURL is able to "predict" an sqs queue url from configurations
func FmtReplicaID ¶
FmtReplicaID formats the combined pool and worker id of a replica
func FmtWorkerQueueName ¶
FmtWorkerQueueName will format a sqs queue name consistently
func FmtWorkerQueueURL ¶
FmtWorkerQueueURL is able to "predict" an sqs queue url from configurations
func HandleGateway ¶
func HandleGateway(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)
HandleGateway takes invocations from the API Gateway and handles them as HTTP requests to return HTTP responses based on restful principles
func HandleRelease ¶
func HandleRelease(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)
HandleRelease is a Lambda handler that periodically queries a pool's expired allocations, replicas and workers
func HandleSchedule ¶
func HandleSchedule(conf *Conf, svc *Services, ev json.RawMessage) (res interface{}, err error)
HandleSchedule is a Lambda handler that periodically reads from the scheduling queue and queries the workers table for available capacity. If the capacity can be claimed an allocation is created.
func PutNewAlloc ¶
PutNewAlloc will put an alloc with the condition the pk doesn't exist yet
func PutNewPool ¶
PutNewPool will put an pool with the condition the pk doesn't exist yet
func PutNewWorker ¶
PutNewWorker will put an worker with the condition the pk doesn't exist yet
func PutReplica ¶
PutReplica will put an replica with the condition the pk doesn't exist yet
func ReceiveEvals ¶
ReceiveEvals will long poll for scheduling messages on the scheduling queue of the pool
func UpdateAllocTTL ¶
UpdateAllocTTL under the condition that it exists
func UpdatePoolTTL ¶
UpdatePoolTTL under the condition that it exists
Types ¶
type Alloc ¶
type Alloc struct {
AllocPK
TTL int64 `dynamodbav:"ttl"`
WorkerID string `dynamodbav:"wrk"`
Eval *Eval `dynamodbav:"eval"`
}
Alloc represents a planned execution
type Conf ¶
type Conf struct {
Deployment string `envconfig:"DEPLOYMENT"`
AWSAccountID string `envconfig:"AWS_ACCOUNT_ID"`
AWSAccessKeyID string `envconfig:"AWS_ACCESS_KEY_ID"`
AWSSecretAccessKey string `envconfig:"AWS_SECRET_ACCESS_KEY"`
AWSRegion string `envconfig:"AWS_REGION"`
StripBaseMappings int `envconfig:"STRIP_BASE_MAPPINGS"`
PoolTTL int64 `envconfig:"POOL_TTL"`
WorkerTTL int64 `envconfig:"WORKER_TTL"`
ReplicaTTL int64 `envconfig:"REPLICA_TTL"`
AllocTTL int64 `envconfig:"ALLOC_TTL"`
MaxRetry int `envconfig:"MAX_RETRY"`
ScheduleDLQueueURL string `envconfig:"SCHEDULE_DLQUEUE_URL"`
PoolsTableName string `envconfig:"TABLE_NAME_POOLS"`
ReplicasTableName string `envconfig:"TABLE_NAME_REPLICAS"`
ReplicasTTLIdxName string `envconfig:"TABLE_IDX_REPLICAS_TTL"`
WorkersTTLIdxName string `envconfig:"TABLE_IDX_WORKERS_TTL"`
WorkersTableName string `envconfig:"TABLE_NAME_WORKERS"`
WorkersCapIdxName string `envconfig:"TABLE_IDX_WORKERS_CAP"`
AllocsTableName string `envconfig:"TABLE_NAME_ALLOCS"`
AllocsTTLIdxName string `envconfig:"TABLE_IDX_ALLOCS_TTL"`
}
Conf holds our configuration taken from the environment
type Eval ¶
type Eval struct {
Dataset string `dynamodbav:"set"` //certain dataset must be available
Size int `dynamodbav:"size"` //certain capacity must be available
Retry int `dynamodbav:"try"`
}
Eval is a scheduling evaluation
type GatewayRequest ¶
type GatewayRequest struct {
HTTPMethod string
Headers map[string]string
Resource string
PathParameters map[string]string
Path string
QueryStringParameters map[string]string
Body string
IsBase64Encoded bool
StageVariables map[string]string
}
GatewayRequest represents an Amazon API Gateway Proxy Event.
type GatewayResponse ¶
type GatewayResponse struct {
StatusCode int `json:"statusCode"`
Body string `json:"body"`
Headers map[string]string `json:"headers"`
}
GatewayResponse is returned to the API Gateway
type Handler ¶
type Handler func(conf *Conf, svc *Services, ev json.RawMessage) (interface{}, error)
Handler describes a Lambda handler that matches a specific suffic
type Pool ¶
Pool represents capacity provided by pools
func GetActivePool ¶
GetActivePool will get a pool by its pk but errors if it's disbanded
type PoolPK ¶
type PoolPK struct {
PoolID string `dynamodbav:"pool"`
}
PoolPK describes the pool's primary key in the base table
type Services ¶
type Services struct {
SQS sqsiface.SQSAPI //message queues
DB dynamodbiface.DynamoDBAPI //dynamodb nosql database
Logs *zap.Logger //logging service
}
Services hold our backend services