scheduler

package
v0.0.0-...-675e061 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 19, 2025 License: AGPL-3.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
// Label keys/values and naming prefixes used for worker resources, plus the
// Prometheus port/scrape keys.
// NOTE(review): the "run.beam.cloud/..." and "prometheus.io/..." strings look
// like Kubernetes label/annotation keys — confirm where they are applied.
const (
	// Role label: the key, and the value that marks a worker.
	Beta9WorkerLabelKey         string = "run.beam.cloud/role"
	Beta9WorkerLabelValue       string = "worker"
	// Prefix for worker job names (exact name format is not visible here; verify at the call site).
	Beta9WorkerJobPrefix        string = "worker"
	// Keys under which the machine ID, worker ID, and worker pool name are recorded.
	Beta9MachineLabelIDKey      string = "run.beam.cloud/machine-id"
	Beta9WorkerLabelIDKey       string = "run.beam.cloud/worker-id"
	Beta9WorkerLabelPoolNameKey string = "run.beam.cloud/worker-pool-name"
	// Prometheus discovery keys: metrics port and scrape toggle.
	PrometheusPortKey           string = "prometheus.io/port"
	PrometheusScrapeKey         string = "prometheus.io/scrape"
)

Variables

View Source
// SchedulerConfig is the package-level scheduler configuration value. The only
// field initialized here is Version.
// NOTE(review): presumably this is the version reported by
// SchedulerService.GetVersion — confirm against that method's body.
var SchedulerConfig config = config{
	Version: "0.1.0",
}

Functions

func GenerateWorkerId

func GenerateWorkerId() string

func MonitorPoolHealth

func MonitorPoolHealth(opts PoolHealthMonitorOptions) error

func MonitorPoolSize

func MonitorPoolSize(wpc WorkerPoolController,
	workerPoolConfig *types.WorkerPoolConfig,
	workerRepo repository.WorkerRepository,
	workerPoolRepo repository.WorkerPoolRepository,
	providerRepo repository.ProviderRepository) error

func ParseCPU

func ParseCPU(cpu interface{}) (int64, error)

func ParseGPU

func ParseGPU(gpu interface{}) (uint, error)

func ParseGPUType

func ParseGPUType(gpu interface{}) (types.GpuType, error)

func ParseGpuCount

func ParseGpuCount(gpuCount interface{}) (int64, error)

func ParseMemory

func ParseMemory(memory interface{}) (int64, error)

Types

type ExternalWorkerPoolController

type ExternalWorkerPoolController struct {
	// contains filtered or unexported fields
}

func (*ExternalWorkerPoolController) AddWorker

func (wpc *ExternalWorkerPoolController) AddWorker(cpu int64, memory int64, gpuCount uint32) (*types.Worker, error)

func (*ExternalWorkerPoolController) AddWorkerToMachine

func (wpc *ExternalWorkerPoolController) AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)

func (*ExternalWorkerPoolController) Context

func (*ExternalWorkerPoolController) FreeCapacity

func (wpc *ExternalWorkerPoolController) FreeCapacity() (*WorkerPoolCapacity, error)

func (*ExternalWorkerPoolController) IsPreemptable

func (wpc *ExternalWorkerPoolController) IsPreemptable() bool

func (*ExternalWorkerPoolController) Mode

func (*ExternalWorkerPoolController) Name

func (*ExternalWorkerPoolController) RequiresPoolSelector

func (wpc *ExternalWorkerPoolController) RequiresPoolSelector() bool

func (*ExternalWorkerPoolController) State

type LocalKubernetesWorkerPoolController

type LocalKubernetesWorkerPoolController struct {
	// contains filtered or unexported fields
}

LocalKubernetesWorkerPoolController is a "local" Kubernetes worker pool controller: the pool it manages is local to the control plane, i.e. in-cluster.

func (*LocalKubernetesWorkerPoolController) AddWorker

func (wpc *LocalKubernetesWorkerPoolController) AddWorker(cpu int64, memory int64, gpuCount uint32) (*types.Worker, error)

func (*LocalKubernetesWorkerPoolController) AddWorkerToMachine

func (wpc *LocalKubernetesWorkerPoolController) AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)

func (*LocalKubernetesWorkerPoolController) Context

func (*LocalKubernetesWorkerPoolController) FreeCapacity

func (*LocalKubernetesWorkerPoolController) IsPreemptable

func (wpc *LocalKubernetesWorkerPoolController) IsPreemptable() bool

func (*LocalKubernetesWorkerPoolController) Mode

func (*LocalKubernetesWorkerPoolController) Name

func (*LocalKubernetesWorkerPoolController) RequiresPoolSelector

func (wpc *LocalKubernetesWorkerPoolController) RequiresPoolSelector() bool

func (*LocalKubernetesWorkerPoolController) State

type PoolHealthMonitor

type PoolHealthMonitor struct {
	// contains filtered or unexported fields
}

func NewPoolHealthMonitor

func NewPoolHealthMonitor(opts PoolHealthMonitorOptions) *PoolHealthMonitor

func (*PoolHealthMonitor) Start

func (p *PoolHealthMonitor) Start()

type PoolHealthMonitorOptions

// PoolHealthMonitorOptions bundles the dependencies accepted by
// NewPoolHealthMonitor and MonitorPoolHealth.
type PoolHealthMonitorOptions struct {
	// Controller is the worker pool controller handed to the monitor.
	Controller       WorkerPoolController
	// WorkerPoolConfig and WorkerConfig are held by value here (MonitorPoolSize,
	// by contrast, takes a *types.WorkerPoolConfig).
	WorkerPoolConfig types.WorkerPoolConfig
	WorkerConfig     types.WorkerConfig
	// Repositories made available to the monitor.
	WorkerRepo       repository.WorkerRepository
	WorkerPoolRepo   repository.WorkerPoolRepository
	ProviderRepo     repository.ProviderRepository
	ContainerRepo    repository.ContainerRepository
	EventRepo        repository.EventRepository
}

type RequestBacklog

type RequestBacklog struct {
	// contains filtered or unexported fields
}

func NewRequestBacklog

func NewRequestBacklog(rdb *common.RedisClient) *RequestBacklog

func (*RequestBacklog) Len

func (rb *RequestBacklog) Len() int64

Len returns the number of container requests currently in the sorted set.

func (*RequestBacklog) Pop

Pop pops the oldest container request off the sorted set.

func (*RequestBacklog) Push

func (rb *RequestBacklog) Push(request *types.ContainerRequest) error

Push adds a new container request to the sorted set.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(ctx context.Context, config types.AppConfig, redisClient *common.RedisClient, usageRepo repo.UsageMetricsRepository, backendRepo repo.BackendRepository, workspaceRepo repo.WorkspaceRepository, tailscale *network.Tailscale) (*Scheduler, error)

func (*Scheduler) Run

func (s *Scheduler) Run(request *types.ContainerRequest) error

func (*Scheduler) StartProcessingRequests

func (s *Scheduler) StartProcessingRequests()

func (*Scheduler) Stop

func (s *Scheduler) Stop(stopArgs *types.StopContainerArgs) error

type SchedulerService

// SchedulerService exposes a Scheduler through the pb Scheduler server
// interface. It is created with NewSchedulerService.
type SchedulerService struct {
	// Embedded so that server methods not defined on SchedulerService fall back
	// to the generated "unimplemented" stubs.
	pb.UnimplementedSchedulerServer
	// Scheduler is the underlying scheduler instance this service wraps.
	Scheduler *Scheduler
}

func NewSchedulerService

func NewSchedulerService(scheduler *Scheduler) (*SchedulerService, error)

func (*SchedulerService) GetVersion

func (wbs *SchedulerService) GetVersion(ctx context.Context, in *pb.VersionRequest) (*pb.VersionResponse, error)

GetVersion returns the scheduler version.

func (*SchedulerService) RunContainer

RunContainer runs a container.

type SchedulerUsageMetrics

// SchedulerUsageMetrics holds the usage-metrics repository used by its
// CounterIncContainerRequested and CounterIncContainerScheduled methods.
// It is constructed (by value) with NewSchedulerUsageMetrics.
type SchedulerUsageMetrics struct {
	// UsageRepo is the repository supplied to NewSchedulerUsageMetrics.
	UsageRepo repository.UsageMetricsRepository
}

func NewSchedulerUsageMetrics

func NewSchedulerUsageMetrics(usageMetricsRepo repository.UsageMetricsRepository) SchedulerUsageMetrics

func (*SchedulerUsageMetrics) CounterIncContainerRequested

func (sm *SchedulerUsageMetrics) CounterIncContainerRequested(request *types.ContainerRequest)

func (*SchedulerUsageMetrics) CounterIncContainerScheduled

func (sm *SchedulerUsageMetrics) CounterIncContainerScheduled(request *types.ContainerRequest)

type WorkerPool

// WorkerPool represents a pool of workers with a specific configuration and
// controller. WorkerPoolManager stores and returns values of this type.
type WorkerPool struct {
	// Name is the pool's name; WorkerPoolManager.GetPool looks pools up by name.
	Name       string
	// Config is the pool's configuration (types.WorkerPoolConfig, not the
	// WorkerPoolConfig type declared in this package).
	Config     types.WorkerPoolConfig
	// Controller is the controller associated with this pool.
	Controller WorkerPoolController
}

WorkerPool represents a pool of workers with specific configuration and controller.

type WorkerPoolCapacity

// WorkerPoolCapacity is the result type of WorkerPoolController.FreeCapacity.
// It reports free and pending amounts of CPU, memory, and GPU.
// NOTE(review): units are not visible here (CPU in millicores? memory in MiB?)
// — confirm against the controller implementations.
type WorkerPoolCapacity struct {
	// Free* fields: capacity reported as free.
	FreeCpu       int64
	FreeMemory    int64
	FreeGpu       uint
	// Pending* fields: capacity reported as pending.
	PendingCpu    int64
	PendingMemory int64
	PendingGpu    uint
}

type WorkerPoolConfig

// WorkerPoolConfig holds default CPU and memory request values for a worker.
// It is a different type from types.WorkerPoolConfig, which is what the other
// declarations in this package (WorkerPool, PoolHealthMonitorOptions, SetPool)
// use.
// NOTE(review): units of the two fields are not visible here — confirm.
type WorkerPoolConfig struct {
	DefaultWorkerCpuRequest    int64
	DefaultWorkerMemoryRequest int64
}

type WorkerPoolController

// WorkerPoolController is the interface returned by both
// NewExternalWorkerPoolController and NewLocalKubernetesWorkerPoolController.
type WorkerPoolController interface {
	// AddWorker requests a worker with the given cpu, memory, and gpuCount and
	// returns the resulting worker or an error.
	AddWorker(cpu int64, memory int64, gpuCount uint32) (*types.Worker, error)
	// AddWorkerToMachine is like AddWorker but additionally takes a gpuType and
	// the machineId of the target machine.
	AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)
	// Name returns the controller's name.
	Name() string
	// FreeCapacity reports the pool's free and pending capacity.
	FreeCapacity() (*WorkerPoolCapacity, error)
	// Context returns the controller's context.
	Context() context.Context
	// IsPreemptable reports whether the pool is preemptable.
	IsPreemptable() bool
	// State returns the pool's current state.
	State() (*types.WorkerPoolState, error)
	// RequiresPoolSelector reports whether the pool requires a pool selector.
	// NOTE(review): presumably requests must name this pool explicitly to be
	// scheduled on it — confirm against the scheduler's pool selection logic.
	RequiresPoolSelector() bool
	// Mode returns the pool's mode.
	Mode() types.PoolMode
}

func NewExternalWorkerPoolController

func NewExternalWorkerPoolController(opts WorkerPoolControllerOptions) (WorkerPoolController, error)

func NewLocalKubernetesWorkerPoolController

func NewLocalKubernetesWorkerPoolController(opts WorkerPoolControllerOptions) (WorkerPoolController, error)

type WorkerPoolControllerOptions

// WorkerPoolControllerOptions bundles the dependencies accepted by
// NewExternalWorkerPoolController and NewLocalKubernetesWorkerPoolController.
type WorkerPoolControllerOptions struct {
	// Name is the name given to the controller.
	// NOTE(review): presumably the worker pool name — confirm.
	Name           string
	// Context is passed in via the options struct rather than as a function
	// argument; WorkerPoolController exposes a Context() accessor.
	Context        context.Context
	// Config is the application-wide configuration.
	Config         types.AppConfig
	// Repositories made available to the controller.
	BackendRepo    repository.BackendRepository
	WorkerRepo     repository.WorkerRepository
	WorkerPoolRepo repository.WorkerPoolRepository
	ContainerRepo  repository.ContainerRepository
	// ProviderName is a pointer.
	// NOTE(review): presumably nil when no machine provider applies — confirm.
	ProviderName   *types.MachineProvider
	ProviderRepo   repository.ProviderRepository
	EventRepo      repository.EventRepository
	// Tailscale is the network.Tailscale handle; NewScheduler receives a value
	// of the same type.
	Tailscale      *network.Tailscale
}

type WorkerPoolManager

type WorkerPoolManager struct {
	// contains filtered or unexported fields
}

WorkerPoolManager manages a collection of WorkerPools using a thread-safe SafeMap. It provides additional functionality to filter and retrieve pools based on specific criteria, such as GPU type.

func NewWorkerPoolManager

func NewWorkerPoolManager(failoverEnabled bool) *WorkerPoolManager

NewWorkerPoolManager creates a new WorkerPoolManager.

func (*WorkerPoolManager) GetPool

func (m *WorkerPoolManager) GetPool(name string) (*WorkerPool, bool)

GetPool retrieves a WorkerPool by its name.

func (*WorkerPoolManager) GetPoolByFilters

func (m *WorkerPoolManager) GetPoolByFilters(filters poolFilters) []*WorkerPool

GetPoolByFilters retrieves all WorkerPools that match the specified filters. It returns a slice of WorkerPools that match all specified filters (GPU type and preemptibility), sorted by WorkerPoolConfig.Priority in descending order.

func (*WorkerPoolManager) GetPoolByGPU

func (m *WorkerPoolManager) GetPoolByGPU(gpuType string) (*WorkerPool, bool)

GetPoolByGPU retrieves a WorkerPool by its GPU type. It returns the first matching WorkerPool found; when failover is enabled, the returned pool must also be healthy.

func (*WorkerPoolManager) SetPool

func (m *WorkerPoolManager) SetPool(name string, config types.WorkerPoolConfig, controller WorkerPoolController)

SetPool adds or updates a WorkerPool in the manager.

type WorkerPoolSizer

type WorkerPoolSizer struct {
	// contains filtered or unexported fields
}

func NewWorkerPoolSizer

func NewWorkerPoolSizer(controller WorkerPoolController,
	workerPoolConfig *types.WorkerPoolConfig,
	workerRepo repository.WorkerRepository,
	workerPoolRepo repository.WorkerPoolRepository,
	providerRepo repository.ProviderRepository) (*WorkerPoolSizer, error)

func (*WorkerPoolSizer) Start

func (s *WorkerPoolSizer) Start()

type WorkerResourceCleaner

// WorkerResourceCleaner carries the state used by its Clean method: a pool
// name, worker configuration, a Kubernetes clientset, and event/worker
// repositories.
// NOTE(review): which resources Clean removes is not visible here — confirm
// against the method body.
type WorkerResourceCleaner struct {
	// PoolName names the worker pool this cleaner is associated with.
	PoolName   string
	Config     types.WorkerConfig
	// KubeClient is a concrete *kubernetes.Clientset (not an interface).
	KubeClient *kubernetes.Clientset
	EventRepo  repository.EventRepository
	WorkerRepo repository.WorkerRepository
}

func (*WorkerResourceCleaner) Clean

func (c *WorkerResourceCleaner) Clean(ctx context.Context)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL