manager

package
v0.1.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level error values created with errors.New. Callers can match
// them with errors.Is. The exact call sites that return each value are not
// shown here; the notes below are inferred from the messages — confirm
// against the implementation.
var (
	// ErrWorkerNotFound has the message "no such worker"; presumably
	// returned when a worker lookup fails.
	ErrWorkerNotFound     = errors.New("no such worker")
	// ErrWorkloadNotFound has the message "no such workload"; presumably
	// returned when a workload lookup fails.
	ErrWorkloadNotFound   = errors.New("no such workload")
	// ErrMissingAssociation has the message "missing association";
	// presumably returned when a workload/worker association is expected
	// but absent.
	ErrMissingAssociation = errors.New("missing association")
)
View Source
var (
	// ErrInvalidEvent has the message "invalid event type". It is
	// presumably returned when an EventType value cannot be decoded
	// (e.g. by EventType.UnmarshalJSON) — confirm against the implementation.
	ErrInvalidEvent = errors.New("invalid event type")
)
View Source
// ErrWorkerExists has the message "worker already exists". It is presumably
// returned when adding a worker whose ID is already registered — confirm
// against the implementation.
var ErrWorkerExists = errors.New("worker already exists")
View Source
// ErrWorkloadExists has the message "workload already exists". It is
// presumably returned when adding a workload whose ID is already registered —
// confirm against the implementation.
var ErrWorkloadExists = errors.New("workload already exists")

Functions

This section is empty.

Types

type Event

// Event is a JSON-serializable notification value. Values are produced by the
// New*Event constructors in this package and are the argument type of
// Signals.Event.
type Event struct {
	// Type identifies the kind of event; encoded under the JSON key "type".
	Type       EventType `json:"type"`
	// ManagerID is encoded under the JSON key "managerId". Presumably the ID
	// of the manager that emitted the event — confirm.
	ManagerID  string    `json:"managerId"`
	// ResourceID is encoded under the JSON key "resourceId". Presumably the
	// ID of the worker or workload the event is about — confirm.
	ResourceID string    `json:"resourceId"`
	// WorkerID is encoded under the JSON key "workerId" and is omitted from
	// the JSON output when empty.
	WorkerID   string    `json:"workerId,omitempty"`
}

func NewWorkerAddedEvent

func NewWorkerAddedEvent(managerId string, worker Worker) Event

func NewWorkerDeletedEvent

func NewWorkerDeletedEvent(managerId string, worker Worker) Event

func NewWorkloadAddedEvent

func NewWorkloadAddedEvent(managerId string, workload Workload) Event

func NewWorkloadDeletedEvent

func NewWorkloadDeletedEvent(managerId string, workload Workload) Event

func NewWorkloadDistributedErrorEvent

func NewWorkloadDistributedErrorEvent(managerId, workerId string, workload Workload) Event

func NewWorkloadDistributedEvent

func NewWorkloadDistributedEvent(managerId, workerId string, workload Workload) Event

type EventType

// EventType enumerates the kinds of Event. The constants below are numbered
// consecutively from 0 via iota, so reordering or inserting entries changes
// their integer values.
type EventType int
const (
	// Worker lifecycle events (values 0 and 1).
	EventWorkerAdded EventType = iota
	EventWorkerDeleted

	// Workload lifecycle events (values 2 and 3).
	EventWorkloadAdded
	EventWorkloadDeleted

	// Workload distribution outcomes (values 4 and 5).
	EventWorkloadDistributed
	EventWorkloadDistributedError
)

func (EventType) MarshalJSON

func (e EventType) MarshalJSON() ([]byte, error)

func (EventType) String

func (e EventType) String() string

func (*EventType) UnmarshalJSON

func (e *EventType) UnmarshalJSON(val []byte) error

type Manager

// Manager is an opaque type; all of its fields are unexported. Instances are
// obtained from New, which accepts Option values that operate on *Manager.
type Manager struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, opts ...Option) (*Manager, error)

func (*Manager) AddWorker

func (m *Manager) AddWorker(ctx context.Context, w Worker) error

func (*Manager) AddWorkload

func (m *Manager) AddWorkload(ctx context.Context, wl Workload) error

func (*Manager) Assign added in v0.1.4

func (m *Manager) Assign(w Worker, wl Workload)

Assign assigns a workload to a worker directly, bypassing the distribution steps for that workload. It is commonly used to backfill state when a manager is started after its workers are already running.

func (*Manager) Associate added in v0.1.11

func (m *Manager) Associate(ctx context.Context, wl Workload, w Worker) error

func (*Manager) DeleteWorker

func (m *Manager) DeleteWorker(ctx context.Context, w Worker) error

func (*Manager) DeleteWorkload

func (m *Manager) DeleteWorkload(ctx context.Context, wl Workload) error

func (*Manager) GetAssociation added in v0.1.11

func (m *Manager) GetAssociation(ctx context.Context, wl Workload) (Worker, error)

func (*Manager) GetAssosiactions added in v0.1.11

func (m *Manager) GetAssosiactions(ctx context.Context, w Worker) ([]Workload, error)

func (*Manager) GetWorker added in v0.1.9

func (m *Manager) GetWorker(ctx context.Context, id string) (Worker, error)

func (*Manager) GetWorkload added in v0.1.9

func (m *Manager) GetWorkload(ctx context.Context, id string) (Workload, error)

func (*Manager) Stop

func (m *Manager) Stop() error

func (*Manager) Workers

func (m *Manager) Workers(ctx context.Context) ([]Worker, error)

func (*Manager) Workloads

func (m *Manager) Workloads(ctx context.Context) ([]Workload, error)

type MemoryStore added in v0.1.11

// MemoryStore is an opaque type created with NewMemoryStore. The method set
// of *MemoryStore listed on this page matches every method of StateStorage,
// so a *MemoryStore can be passed to WithStateStorage. Its methods ignore the
// context argument (the parameter is named _).
type MemoryStore struct {
	// contains filtered or unexported fields
}

func NewMemoryStore added in v0.1.11

func NewMemoryStore() *MemoryStore

func (*MemoryStore) AddWorker added in v0.1.11

func (s *MemoryStore) AddWorker(_ context.Context, w Worker) error

func (*MemoryStore) AddWorkload added in v0.1.11

func (s *MemoryStore) AddWorkload(_ context.Context, wl Workload) error

func (*MemoryStore) Associate added in v0.1.11

func (s *MemoryStore) Associate(_ context.Context, wl Workload, w Worker) error

func (*MemoryStore) DeleteWorker added in v0.1.11

func (s *MemoryStore) DeleteWorker(_ context.Context, w Worker) error

func (*MemoryStore) DeleteWorkload added in v0.1.11

func (s *MemoryStore) DeleteWorkload(_ context.Context, wl Workload) error

func (*MemoryStore) Disassociate added in v0.1.11

func (s *MemoryStore) Disassociate(_ context.Context, wl Workload, w Worker) error

func (*MemoryStore) GetAllWorkers added in v0.1.11

func (s *MemoryStore) GetAllWorkers(_ context.Context) ([]Worker, error)

func (*MemoryStore) GetAllWorkloads added in v0.1.11

func (s *MemoryStore) GetAllWorkloads(_ context.Context) ([]Workload, error)

func (*MemoryStore) GetAssociation added in v0.1.11

func (s *MemoryStore) GetAssociation(_ context.Context, wl Workload) (Worker, error)

func (*MemoryStore) GetAssociations added in v0.1.11

func (s *MemoryStore) GetAssociations(_ context.Context, w Worker) ([]Workload, error)

func (*MemoryStore) GetWorker added in v0.1.11

func (s *MemoryStore) GetWorker(_ context.Context, id string) (Worker, error)

func (*MemoryStore) GetWorkload added in v0.1.11

func (s *MemoryStore) GetWorkload(_ context.Context, id string) (Workload, error)

func (*MemoryStore) Lock added in v0.1.11

func (s *MemoryStore) Lock()

func (*MemoryStore) Unlock added in v0.1.11

func (s *MemoryStore) Unlock()

func (*MemoryStore) UpdateWorkload added in v0.1.11

func (s *MemoryStore) UpdateWorkload(_ context.Context, wl Workload) error

type Option

// Option is a function that receives a *Manager. Options are passed to New
// as variadic arguments and are produced by the With* functions in this
// package.
type Option func(*Manager)

func WithCleanupInterval

func WithCleanupInterval(t time.Duration) Option

func WithCleanupMaxTime

func WithCleanupMaxTime(t time.Duration) Option

func WithDistributorInterval

func WithDistributorInterval(t time.Duration) Option

func WithManagerID

func WithManagerID(id string) Option

WithManagerID sets an explicit Manager ID. Default: a UUID.

func WithMaxDelta added in v0.1.18

func WithMaxDelta(d int) Option

func WithRebalanceInterval

func WithRebalanceInterval(t time.Duration) Option

func WithSignaller added in v0.1.11

func WithSignaller(signaller Signals) Option

func WithStateStorage added in v0.1.11

func WithStateStorage(state StateStorage) Option

type Signals added in v0.1.11

// Signals is the sink interface accepted by WithSignaller. *SlogSignaller
// provides both methods.
type Signals interface {
	// Event receives an Event value.
	Event(Event)
	// Error receives an error value.
	Error(error)
}

type Slog added in v0.1.11

// Slog is the minimal logger interface accepted by NewSlogSignaller: a
// message string followed by variadic arguments. The method shapes match
// Info and Error on log/slog's *Logger.
type Slog interface {
	// Info takes a message and variadic arguments.
	Info(string, ...any)
	// Error takes a message and variadic arguments.
	Error(string, ...any)
}

type SlogSignaller added in v0.1.11

// SlogSignaller is an opaque type created by NewSlogSignaller from a Slog
// value. *SlogSignaller has Event(Event) and Error(error) methods, which is
// the method set required by Signals.
type SlogSignaller struct {
	// contains filtered or unexported fields
}

func NewSlogSignaller added in v0.1.11

func NewSlogSignaller(instance Slog) *SlogSignaller

func (*SlogSignaller) Error added in v0.1.11

func (s *SlogSignaller) Error(err error)

func (*SlogSignaller) Event added in v0.1.11

func (s *SlogSignaller) Event(e Event)

type StateStorage added in v0.1.11

// StateStorage is the storage interface accepted by WithStateStorage. It
// covers workers, workloads, and the associations between them. MemoryStore
// provides an implementation in this package.
type StateStorage interface {
	// Lock and Unlock take no arguments and return nothing. Presumably they
	// guard multi-step operations on the store — confirm against the
	// implementation.
	Lock()
	Unlock()

	// Worker records: list all, fetch one by string key (presumably the
	// worker ID), add, and delete.
	GetAllWorkers(context.Context) ([]Worker, error)
	GetWorker(context.Context, string) (Worker, error)
	AddWorker(context.Context, Worker) error
	DeleteWorker(context.Context, Worker) error

	// Workload records: list all, fetch one by string key (presumably the
	// workload ID), add, update, and delete.
	GetAllWorkloads(context.Context) ([]Workload, error)
	GetWorkload(context.Context, string) (Workload, error)
	AddWorkload(context.Context, Workload) error
	UpdateWorkload(context.Context, Workload) error
	DeleteWorkload(context.Context, Workload) error

	// Associations: GetAssociation returns a single Worker for a Workload;
	// GetAssociations returns the Workloads for a Worker. Associate and
	// Disassociate take the (Workload, Worker) pair.
	GetAssociation(context.Context, Workload) (Worker, error)
	GetAssociations(context.Context, Worker) ([]Workload, error)
	Associate(context.Context, Workload, Worker) error
	Disassociate(context.Context, Workload, Worker) error
}

type Status added in v0.1.11

// Status is the value type returned by Workload.GetStatus and accepted by
// Workload.SetStatus. The constants below are numbered consecutively from 0
// via iota, so StatusInit is the zero value of Status. The meanings noted
// below are inferred from the names — confirm against the implementation.
type Status uint8
const (
	// StatusInit is 0, the zero value.
	StatusInit Status = iota
	// StatusDistributing is 1; presumably a workload being handed to a worker.
	StatusDistributing
	// StatusRunning is 2.
	StatusRunning
	// StatusDown is 3.
	StatusDown
	// StatusErr is 4.
	StatusErr
)

func (Status) MarshalJSON added in v0.1.11

func (s Status) MarshalJSON() ([]byte, error)

func (Status) String added in v0.1.11

func (s Status) String() string

func (*Status) UnmarshalJSON added in v0.1.11

func (s *Status) UnmarshalJSON(val []byte) error

type Worker

// Worker is the interface a caller implements for anything that can be
// registered with a Manager (see Manager.AddWorker).
type Worker interface {
	// GetID returns the worker's identifier as a string.
	GetID() string
	// Unload takes a Workload and reports failure through the returned
	// error. Presumably asks the worker to stop handling it — confirm.
	Unload(Workload) error
	// Load takes a Workload and reports failure through the returned error.
	// Presumably asks the worker to start handling it — confirm.
	Load(Workload) error
}

type Workload

// Workload is the interface a caller implements for anything that can be
// registered with a Manager (see Manager.AddWorkload).
type Workload interface {
	// GetID returns the workload's identifier as a string.
	GetID() string
	// GetStatus returns the workload's current Status.
	GetStatus() Status
	// SetStatus takes the Status to store on the workload.
	SetStatus(Status)
	// LastStatusChange returns a time.Time; presumably the moment the status
	// was last changed — confirm against implementations.
	LastStatusChange() time.Time
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL