Documentation
¶
Index ¶
- Constants
- func GetWorkerInfoFromHostPID(hostPID uint32) (*framework.ProcessMappingInfo, error)
- type APIClient
- func (a *APIClient) CreateOrUpdateGPU(gpuNodeName string, gpuID string, ...) error
- func (a *APIClient) DeleteGPU(uuid string) error
- func (a *APIClient) GetGPU(uuid string) (*tfv1.GPU, error)
- func (a *APIClient) UpdateGPUNodeStatus(nodeName string, nodeInfo *api.NodeInfo) error
- func (a *APIClient) UpdateGPUStatus(gpu *tfv1.GPU) error
- type DevicePlugin
- func (dp *DevicePlugin) Allocate(ctx context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error)
- func (dp *DevicePlugin) GetDevicePluginOptions(ctx context.Context, req *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error)
- func (dp *DevicePlugin) GetPreferredAllocation(ctx context.Context, req *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error)
- func (dp *DevicePlugin) ListAndWatch(req *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error
- func (dp *DevicePlugin) PreStartContainer(ctx context.Context, req *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error)
- func (dp *DevicePlugin) Start() error
- func (dp *DevicePlugin) Stop() error
- type GPUInfo
- type KubeletBackend
- func (b *KubeletBackend) GetDeviceChangeHandler() framework.DeviceChangeHandler
- func (b *KubeletBackend) GetProcessMappingInfo(hostPID uint32) (*framework.ProcessMappingInfo, error)
- func (b *KubeletBackend) ListWorkers() []*api.WorkerInfo
- func (b *KubeletBackend) RegisterWorkerUpdateHandler(handler framework.WorkerChangeHandler) error
- func (b *KubeletBackend) Start() error
- func (b *KubeletBackend) StartWorker(worker *api.WorkerInfo) error
- func (b *KubeletBackend) Stop() error
- func (b *KubeletBackend) StopWorker(workerUID string) error
- type PodCacheManager
- func (kc *PodCacheManager) GetAllPods() map[string]*corev1.Pod
- func (kc *PodCacheManager) GetPodByUID(podUID string) *corev1.Pod
- func (kc *PodCacheManager) GetWorkerInfoForAllocationByIndex(podIndex int) (*api.WorkerInfo, error)
- func (kc *PodCacheManager) RegisterWorkerInfoSubscriber(name string, subscriber chan<- *api.WorkerInfo)
- func (kc *PodCacheManager) Start() error
- func (kc *PodCacheManager) Stop()
- func (kc *PodCacheManager) UnregisterWorkerInfoSubscriber(name string)
Constants ¶
const (
	// DevicePluginPath is the path where device plugins should register
	DevicePluginPath = "/var/lib/kubelet/device-plugins"
	// KubeletSocket is the kubelet registration socket
	KubeletSocket = "kubelet.sock"
	// DevicePluginEndpoint is the endpoint name for this device plugin
	DevicePluginEndpoint = "tensor-fusion-index-%d.sock"
)
Variables ¶
This section is empty.
Functions ¶
func GetWorkerInfoFromHostPID ¶
func GetWorkerInfoFromHostPID(hostPID uint32) (*framework.ProcessMappingInfo, error)
GetWorkerInfoFromHostPID extracts worker information from a process's environment by reading /proc/{hostPID}/environ and /proc/{hostPID}/status. It returns a ProcessMappingInfo containing the Namespace, PodName, and ContainerName used for worker lookup.
Types ¶
type APIClient ¶
type APIClient struct {
// contains filtered or unexported fields
}
APIClient provides CRUD operations for GPU resources
func NewAPIClient ¶
NewAPIClient creates a new API client instance using an existing client.
func NewAPIClientFromConfig ¶
NewAPIClientFromConfig creates a new API client instance from a rest.Config
func (*APIClient) CreateOrUpdateGPU ¶
func (a *APIClient) CreateOrUpdateGPU(
	gpuNodeName string,
	gpuID string,
	mutateFn func(gpuNode *tfv1.GPUNode, gpu *tfv1.GPU) error,
) error
CreateOrUpdateGPU creates or updates a GPU resource with metadata and status
func (*APIClient) UpdateGPUNodeStatus ¶
func (a *APIClient) UpdateGPUNodeStatus(nodeName string, nodeInfo *api.NodeInfo) error
UpdateGPUNodeStatus updates the status of a GPUNode resource.
type DevicePlugin ¶
type DevicePlugin struct {
pluginapi.UnimplementedDevicePluginServer
// contains filtered or unexported fields
}
DevicePlugin implements the Kubernetes device plugin interface
func NewDevicePlugins ¶
func NewDevicePlugins(ctx context.Context, deviceController framework.DeviceController, allocationController framework.WorkerAllocationController, kubeletClient *PodCacheManager) []*DevicePlugin
NewDevicePlugins creates and returns a slice of new device plugin instances.
func (*DevicePlugin) Allocate ¶
func (dp *DevicePlugin) Allocate(ctx context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error)
Allocate handles device allocation requests from kubelet
func (*DevicePlugin) GetDevicePluginOptions ¶
func (dp *DevicePlugin) GetDevicePluginOptions(ctx context.Context, req *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error)
GetDevicePluginOptions returns options for the device plugin
func (*DevicePlugin) GetPreferredAllocation ¶
func (dp *DevicePlugin) GetPreferredAllocation(ctx context.Context, req *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error)
GetPreferredAllocation returns preferred device allocation (optional)
func (*DevicePlugin) ListAndWatch ¶
func (dp *DevicePlugin) ListAndWatch(req *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error
ListAndWatch streams device list and health updates
func (*DevicePlugin) PreStartContainer ¶
func (dp *DevicePlugin) PreStartContainer(ctx context.Context, req *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error)
PreStartContainer is called before container start (optional)
func (*DevicePlugin) Start ¶
func (dp *DevicePlugin) Start() error
Start starts the device plugin gRPC server and registers with kubelet
type GPUInfo ¶
type GPUInfo struct {
UUID string
DeviceName string
VRAMBytes uint64
TFlops resource.Quantity
Index int32
NUMANodeID int32
NodeName string
Vendor string
IsolationMode tfv1.IsolationModeType
}
GPUInfo contains information needed to create or update a GPU
type KubeletBackend ¶
type KubeletBackend struct {
// contains filtered or unexported fields
}
func NewKubeletBackend ¶
func NewKubeletBackend(ctx context.Context, deviceController framework.DeviceController, allocationController framework.WorkerAllocationController, restConfig *rest.Config) (*KubeletBackend, error)
func (*KubeletBackend) GetDeviceChangeHandler ¶
func (b *KubeletBackend) GetDeviceChangeHandler() framework.DeviceChangeHandler
func (*KubeletBackend) GetProcessMappingInfo ¶
func (b *KubeletBackend) GetProcessMappingInfo(hostPID uint32) (*framework.ProcessMappingInfo, error)
func (*KubeletBackend) ListWorkers ¶
func (b *KubeletBackend) ListWorkers() []*api.WorkerInfo
func (*KubeletBackend) RegisterWorkerUpdateHandler ¶
func (b *KubeletBackend) RegisterWorkerUpdateHandler(handler framework.WorkerChangeHandler) error
RegisterWorkerUpdateHandler registers a handler for worker updates
func (*KubeletBackend) Start ¶
func (b *KubeletBackend) Start() error
func (*KubeletBackend) StartWorker ¶
func (b *KubeletBackend) StartWorker(worker *api.WorkerInfo) error
func (*KubeletBackend) Stop ¶
func (b *KubeletBackend) Stop() error
func (*KubeletBackend) StopWorker ¶
func (b *KubeletBackend) StopWorker(workerUID string) error
type PodCacheManager ¶
type PodCacheManager struct {
// contains filtered or unexported fields
}
PodCacheManager manages pod watching and worker information extraction
func NewPodCacheManager ¶
func NewPodCacheManager(ctx context.Context, restConfig *rest.Config, nodeName string) (*PodCacheManager, error)
NewPodCacheManager creates a new pod cache manager
func (*PodCacheManager) GetAllPods ¶
func (kc *PodCacheManager) GetAllPods() map[string]*corev1.Pod
GetAllPods returns all pods currently in the cache
func (*PodCacheManager) GetPodByUID ¶
func (kc *PodCacheManager) GetPodByUID(podUID string) *corev1.Pod
GetPodByUID retrieves a pod from the cache by its UID
func (*PodCacheManager) GetWorkerInfoForAllocationByIndex ¶
func (kc *PodCacheManager) GetWorkerInfoForAllocationByIndex(podIndex int) (*api.WorkerInfo, error)
GetWorkerInfoForAllocationByIndex finds a pod by its index annotation and extracts its worker info. It implements a Pub/Sub pattern in which callers subscribe to worker info changes for a specific pod index. If the worker info is already available, it returns immediately; otherwise, it waits up to 10 minutes for the worker info to become available.
func (*PodCacheManager) RegisterWorkerInfoSubscriber ¶
func (kc *PodCacheManager) RegisterWorkerInfoSubscriber(name string, subscriber chan<- *api.WorkerInfo)
func (*PodCacheManager) Start ¶
func (kc *PodCacheManager) Start() error
Start starts watching pods on this node
func (*PodCacheManager) UnregisterWorkerInfoSubscriber ¶
func (kc *PodCacheManager) UnregisterWorkerInfoSubscriber(name string)