Documentation
¶
Index ¶
- Constants
- Variables
- func GenerateWorkerId() string
- func MonitorPoolHealth(opts PoolHealthMonitorOptions) error
- func MonitorPoolSize(wpc WorkerPoolController, workerPoolConfig *types.WorkerPoolConfig, ...) error
- func ParseCPU(cpu interface{}) (int64, error)
- func ParseGPU(gpu interface{}) (uint, error)
- func ParseGPUType(gpu interface{}) (types.GpuType, error)
- func ParseGpuCount(gpuCount interface{}) (int64, error)
- func ParseMemory(memory interface{}) (int64, error)
- type ExternalWorkerPoolController
- func (wpc *ExternalWorkerPoolController) AddWorker(cpu int64, memory int64, gpuCount uint32) (*types.Worker, error)
- func (wpc *ExternalWorkerPoolController) AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)
- func (wpc *ExternalWorkerPoolController) Context() context.Context
- func (wpc *ExternalWorkerPoolController) FreeCapacity() (*WorkerPoolCapacity, error)
- func (wpc *ExternalWorkerPoolController) IsPreemptable() bool
- func (wpc *ExternalWorkerPoolController) Mode() types.PoolMode
- func (wpc *ExternalWorkerPoolController) Name() string
- func (wpc *ExternalWorkerPoolController) RequiresPoolSelector() bool
- func (wpc *ExternalWorkerPoolController) State() (*types.WorkerPoolState, error)
- type LocalKubernetesWorkerPoolController
- func (wpc *LocalKubernetesWorkerPoolController) AddWorker(cpu int64, memory int64, gpuCount uint32) (*types.Worker, error)
- func (wpc *LocalKubernetesWorkerPoolController) AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)
- func (wpc *LocalKubernetesWorkerPoolController) Context() context.Context
- func (wpc *LocalKubernetesWorkerPoolController) FreeCapacity() (*WorkerPoolCapacity, error)
- func (wpc *LocalKubernetesWorkerPoolController) IsPreemptable() bool
- func (wpc *LocalKubernetesWorkerPoolController) Mode() types.PoolMode
- func (wpc *LocalKubernetesWorkerPoolController) Name() string
- func (wpc *LocalKubernetesWorkerPoolController) RequiresPoolSelector() bool
- func (wpc *LocalKubernetesWorkerPoolController) State() (*types.WorkerPoolState, error)
- type PoolHealthMonitor
- type PoolHealthMonitorOptions
- type RequestBacklog
- type Scheduler
- type SchedulerService
- type SchedulerUsageMetrics
- type WorkerPool
- type WorkerPoolCapacity
- type WorkerPoolConfig
- type WorkerPoolController
- type WorkerPoolControllerOptions
- type WorkerPoolManager
- func (m *WorkerPoolManager) GetPool(name string) (*WorkerPool, bool)
- func (m *WorkerPoolManager) GetPoolByFilters(filters poolFilters) []*WorkerPool
- func (m *WorkerPoolManager) GetPoolByGPU(gpuType string) (*WorkerPool, bool)
- func (m *WorkerPoolManager) SetPool(name string, config types.WorkerPoolConfig, controller WorkerPoolController)
- type WorkerPoolSizer
- type WorkerResourceCleaner
Constants ¶
const (
	Beta9WorkerLabelKey         string = "run.beam.cloud/role"
	Beta9WorkerLabelValue       string = "worker"
	Beta9WorkerJobPrefix        string = "worker"
	Beta9MachineLabelIDKey      string = "run.beam.cloud/machine-id"
	Beta9WorkerLabelIDKey       string = "run.beam.cloud/worker-id"
	Beta9WorkerLabelPoolNameKey string = "run.beam.cloud/worker-pool-name"
	PrometheusPortKey           string = "prometheus.io/port"
	PrometheusScrapeKey         string = "prometheus.io/scrape"
)
Variables ¶
var SchedulerConfig config = config{
Version: "0.1.0",
}
Functions ¶
func GenerateWorkerId ¶
func GenerateWorkerId() string
func MonitorPoolHealth ¶
func MonitorPoolHealth(opts PoolHealthMonitorOptions) error
func MonitorPoolSize ¶
func MonitorPoolSize(wpc WorkerPoolController, workerPoolConfig *types.WorkerPoolConfig, workerRepo repository.WorkerRepository, workerPoolRepo repository.WorkerPoolRepository, providerRepo repository.ProviderRepository) error
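func ParseGPUType ¶
func ParseGPUType(gpu interface{}) (types.GpuType, error)
func ParseGpuCount ¶
func ParseGpuCount(gpuCount interface{}) (int64, error)
func ParseMemory ¶
func ParseMemory(memory interface{}) (int64, error)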
Types ¶
type ExternalWorkerPoolController ¶
type ExternalWorkerPoolController struct {
// contains filtered or unexported fields
}
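func (*ExternalWorkerPoolController) AddWorkerToMachine ¶
func (wpc *ExternalWorkerPoolController) AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)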
func (*ExternalWorkerPoolController) Context ¶
func (wpc *ExternalWorkerPoolController) Context() context.Context
func (*ExternalWorkerPoolController) FreeCapacity ¶
func (wpc *ExternalWorkerPoolController) FreeCapacity() (*WorkerPoolCapacity, error)
func (*ExternalWorkerPoolController) IsPreemptable ¶
func (wpc *ExternalWorkerPoolController) IsPreemptable() bool
func (*ExternalWorkerPoolController) Mode ¶
func (wpc *ExternalWorkerPoolController) Mode() types.PoolMode
func (*ExternalWorkerPoolController) Name ¶
func (wpc *ExternalWorkerPoolController) Name() string
func (*ExternalWorkerPoolController) RequiresPoolSelector ¶
func (wpc *ExternalWorkerPoolController) RequiresPoolSelector() bool
func (*ExternalWorkerPoolController) State ¶
func (wpc *ExternalWorkerPoolController) State() (*types.WorkerPoolState, error)
type LocalKubernetesWorkerPoolController ¶
type LocalKubernetesWorkerPoolController struct {
// contains filtered or unexported fields
}
A "local" Kubernetes worker pool controller means the pool is local to the control plane, i.e. it runs in-cluster.
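func (*LocalKubernetesWorkerPoolController) AddWorkerToMachine ¶
func (wpc *LocalKubernetesWorkerPoolController) AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)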
func (*LocalKubernetesWorkerPoolController) Context ¶
func (wpc *LocalKubernetesWorkerPoolController) Context() context.Context
func (*LocalKubernetesWorkerPoolController) FreeCapacity ¶
func (wpc *LocalKubernetesWorkerPoolController) FreeCapacity() (*WorkerPoolCapacity, error)
func (*LocalKubernetesWorkerPoolController) IsPreemptable ¶
func (wpc *LocalKubernetesWorkerPoolController) IsPreemptable() bool
func (*LocalKubernetesWorkerPoolController) Mode ¶
func (wpc *LocalKubernetesWorkerPoolController) Mode() types.PoolMode
func (*LocalKubernetesWorkerPoolController) Name ¶
func (wpc *LocalKubernetesWorkerPoolController) Name() string
func (*LocalKubernetesWorkerPoolController) RequiresPoolSelector ¶
func (wpc *LocalKubernetesWorkerPoolController) RequiresPoolSelector() bool
func (*LocalKubernetesWorkerPoolController) State ¶
func (wpc *LocalKubernetesWorkerPoolController) State() (*types.WorkerPoolState, error)
type PoolHealthMonitor ¶
type PoolHealthMonitor struct {
// contains filtered or unexported fields
}
func NewPoolHealthMonitor ¶
func NewPoolHealthMonitor(opts PoolHealthMonitorOptions) *PoolHealthMonitor
func (*PoolHealthMonitor) Start ¶
func (p *PoolHealthMonitor) Start()
type PoolHealthMonitorOptions ¶
type PoolHealthMonitorOptions struct {
	Controller       WorkerPoolController
	WorkerPoolConfig types.WorkerPoolConfig
	WorkerConfig     types.WorkerConfig
	WorkerRepo       repository.WorkerRepository
	WorkerPoolRepo   repository.WorkerPoolRepository
	ProviderRepo     repository.ProviderRepository
	ContainerRepo    repository.ContainerRepository
	EventRepo        repository.EventRepository
}
type RequestBacklog ¶
type RequestBacklog struct {
// contains filtered or unexported fields
}
func NewRequestBacklog ¶
func NewRequestBacklog(rdb *common.RedisClient) *RequestBacklog
func (*RequestBacklog) Len ¶
func (rb *RequestBacklog) Len() int64
Len returns the length of the sorted set.
func (*RequestBacklog) Pop ¶
func (rb *RequestBacklog) Pop() (*types.ContainerRequest, error)
Pop pops the oldest container request from the sorted set.
func (*RequestBacklog) Push ¶
func (rb *RequestBacklog) Push(request *types.ContainerRequest) error
Push pushes a new container request into the sorted set.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler(ctx context.Context, config types.AppConfig, redisClient *common.RedisClient, usageRepo repo.UsageMetricsRepository, backendRepo repo.BackendRepository, workspaceRepo repo.WorkspaceRepository, tailscale *network.Tailscale) (*Scheduler, error)
func (*Scheduler) StartProcessingRequests ¶
func (s *Scheduler) StartProcessingRequests()
type SchedulerService ¶
type SchedulerService struct {
	pb.UnimplementedSchedulerServer
	Scheduler *Scheduler
}
func NewSchedulerService ¶
func NewSchedulerService(scheduler *Scheduler) (*SchedulerService, error)
func (*SchedulerService) GetVersion ¶
func (wbs *SchedulerService) GetVersion(ctx context.Context, in *pb.VersionRequest) (*pb.VersionResponse, error)
GetVersion returns the Scheduler version.
func (*SchedulerService) RunContainer ¶
func (wbs *SchedulerService) RunContainer(ctx context.Context, in *pb.RunContainerRequest) (*pb.RunContainerResponse, error)
RunContainer runs a container.
type SchedulerUsageMetrics ¶
type SchedulerUsageMetrics struct {
UsageRepo repository.UsageMetricsRepository
}
func NewSchedulerUsageMetrics ¶
func NewSchedulerUsageMetrics(usageMetricsRepo repository.UsageMetricsRepository) SchedulerUsageMetrics
func (*SchedulerUsageMetrics) CounterIncContainerRequested ¶
func (sm *SchedulerUsageMetrics) CounterIncContainerRequested(request *types.ContainerRequest)
func (*SchedulerUsageMetrics) CounterIncContainerScheduled ¶
func (sm *SchedulerUsageMetrics) CounterIncContainerScheduled(request *types.ContainerRequest)
type WorkerPool ¶
type WorkerPool struct {
	Name       string
	Config     types.WorkerPoolConfig
	Controller WorkerPoolController
}
WorkerPool represents a pool of workers with specific configuration and controller.
type WorkerPoolCapacity ¶
type WorkerPoolConfig ¶
type WorkerPoolController ¶
type WorkerPoolController interface {
	AddWorker(cpu int64, memory int64, gpuCount uint32) (*types.Worker, error)
	AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)
	Name() string
	FreeCapacity() (*WorkerPoolCapacity, error)
	Context() context.Context
	IsPreemptable() bool
	State() (*types.WorkerPoolState, error)
	RequiresPoolSelector() bool
	Mode() types.PoolMode
}
func NewExternalWorkerPoolController ¶
func NewExternalWorkerPoolController(opts WorkerPoolControllerOptions) (WorkerPoolController, error)
func NewLocalKubernetesWorkerPoolController ¶
func NewLocalKubernetesWorkerPoolController(opts WorkerPoolControllerOptions) (WorkerPoolController, error)
type WorkerPoolControllerOptions ¶
type WorkerPoolControllerOptions struct {
	Name           string
	Context        context.Context
	Config         types.AppConfig
	BackendRepo    repository.BackendRepository
	WorkerRepo     repository.WorkerRepository
	WorkerPoolRepo repository.WorkerPoolRepository
	ContainerRepo  repository.ContainerRepository
	ProviderName   *types.MachineProvider
	ProviderRepo   repository.ProviderRepository
	EventRepo      repository.EventRepository
	Tailscale      *network.Tailscale
}
type WorkerPoolManager ¶
type WorkerPoolManager struct {
// contains filtered or unexported fields
}
WorkerPoolManager manages a collection of WorkerPools using a thread-safe SafeMap. It provides additional functionality to filter and retrieve pools based on specific criteria, such as GPU type.
func NewWorkerPoolManager ¶
func NewWorkerPoolManager(failoverEnabled bool) *WorkerPoolManager
NewWorkerPoolManager creates a new WorkerPoolManager.
func (*WorkerPoolManager) GetPool ¶
func (m *WorkerPoolManager) GetPool(name string) (*WorkerPool, bool)
GetPool retrieves a WorkerPool by its name.
func (*WorkerPoolManager) GetPoolByFilters ¶
func (m *WorkerPoolManager) GetPoolByFilters(filters poolFilters) []*WorkerPool
GetPoolByFilters retrieves all WorkerPools that match the specified filters. It returns a slice of WorkerPools that match all specified filters (GPU type and preemptibility), sorted by WorkerPoolConfig.Priority in descending order.
func (*WorkerPoolManager) GetPoolByGPU ¶
func (m *WorkerPoolManager) GetPoolByGPU(gpuType string) (*WorkerPool, bool)
GetPoolByGPU retrieves a WorkerPool by its GPU type. It returns the first matching WorkerPool found; if failover is enabled, the returned pool must also be healthy.
func (*WorkerPoolManager) SetPool ¶
func (m *WorkerPoolManager) SetPool(name string, config types.WorkerPoolConfig, controller WorkerPoolController)
SetPool adds or updates a WorkerPool in the manager.
type WorkerPoolSizer ¶
type WorkerPoolSizer struct {
// contains filtered or unexported fields
}
func NewWorkerPoolSizer ¶
func NewWorkerPoolSizer(controller WorkerPoolController, workerPoolConfig *types.WorkerPoolConfig, workerRepo repository.WorkerRepository, workerPoolRepo repository.WorkerPoolRepository, providerRepo repository.ProviderRepository) (*WorkerPoolSizer, error)
func (*WorkerPoolSizer) Start ¶
func (s *WorkerPoolSizer) Start()
type WorkerResourceCleaner ¶
type WorkerResourceCleaner struct {
	PoolName   string
	Config     types.WorkerConfig
	KubeClient *kubernetes.Clientset
	EventRepo  repository.EventRepository
	WorkerRepo repository.WorkerRepository
}
func (*WorkerResourceCleaner) Clean ¶
func (c *WorkerResourceCleaner) Clean(ctx context.Context)