cache

Version: v0.4.1
Warning

This package is not in the latest version of its module.

Published: Aug 19, 2025 License: Apache-2.0 Imports: 32 Imported by: 0

README

Cache Package

The cache package provides centralized caching and routing capabilities for the AIBrix LLM inference system, including pod management, model routing, and KV cache event synchronization.

Overview

This package implements:

  • Pod and model metadata caching
  • Request routing and load tracking
  • KV cache event synchronization
  • GPU profile management
  • Output prediction for performance optimization

Core Components

Cache Store (cache.go)

The central cache that maintains:

  • Pod metadata and status
  • Model-to-pod mappings
  • Request tracking and metrics
  • GPU profiles and capabilities

KV Event Management

Event Manager (kv_event_manager.go)

Manages KV cache event synchronization between vLLM pods and the routing system:

// The event manager is created automatically when the cache is initialized
// It subscribes to eligible pods and processes their KV cache events

// Configuration via environment variables:
// AIBRIX_KV_EVENT_SYNC_ENABLED=true
// AIBRIX_USE_REMOTE_TOKENIZER=true

Features:

  • Automatic pod discovery and subscription
  • Lifecycle management (pod add/update/delete)
  • Integration with the prefix cache indexer
  • Support for multi-model deployments

Event Handler (kv_event_handler.go)

Processes incoming KV cache events:

  • Routes events to the appropriate indexer
  • Handles block stored/removed events
  • Maintains consistency across pod restarts

Request Tracking

Request Trace (trace.go)

Tracks individual requests through the system:

  • Request timing and latency
  • Token counts and model information
  • GPU utilization metrics

Output Predictor (output_predictor.go)

Predicts output token counts based on historical data:

  • Maintains a sliding window of recent requests
  • Groups requests into input-token buckets
  • Provides weighted predictions for capacity planning

Informers (informers.go)

Kubernetes informers for watching:

  • Pod lifecycle events
  • ModelAdapter resources
  • Configuration updates

KV Cache Event Flow

  1. Pod Discovery: Cache watches for pods with KV event labels
  2. Subscription: Event manager creates ZMQ client for eligible pods
  3. Event Processing: Incoming events update the prefix cache indexer
  4. Query Routing: Router queries indexer for prefix matches
  5. Request Dispatch: Requests routed to pods with matching prefixes
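Steps 3 to 5 of the flow above can be illustrated with a minimal in-memory sketch. Everything here is hypothetical: `prefixIndexer`, its methods, and the representation of a prompt as an ordered chain of `uint64` block hashes are assumptions for illustration, not the package's indexer or the vLLM event format.

```go
package main

import "sort"

// prefixIndexer is a hypothetical stand-in for the prefix cache indexer:
// it maps each KV block hash to the set of pods currently holding it.
type prefixIndexer struct {
	blocks map[uint64]map[string]struct{}
}

func newPrefixIndexer() *prefixIndexer {
	return &prefixIndexer{blocks: make(map[uint64]map[string]struct{})}
}

// onBlockStored applies a "block stored" event from pod.
func (ix *prefixIndexer) onBlockStored(pod string, hashes []uint64) {
	for _, h := range hashes {
		if ix.blocks[h] == nil {
			ix.blocks[h] = make(map[string]struct{})
		}
		ix.blocks[h][pod] = struct{}{}
	}
}

// onBlockRemoved applies a "block removed" event from pod.
func (ix *prefixIndexer) onBlockRemoved(pod string, hashes []uint64) {
	for _, h := range hashes {
		delete(ix.blocks[h], pod) // no-op if the hash is unknown
		if len(ix.blocks[h]) == 0 {
			delete(ix.blocks, h)
		}
	}
}

// matchPods reports, for each pod, how many leading blocks of the
// request's hash chain it holds; a pod stops extending at its first miss.
func (ix *prefixIndexer) matchPods(hashes []uint64) map[string]int {
	matches := make(map[string]int)
	for i, h := range hashes {
		extended := false
		for pod := range ix.blocks[h] {
			if matches[pod] == i { // pod held every earlier block
				matches[pod] = i + 1
				extended = true
			}
		}
		if !extended {
			break
		}
	}
	return matches
}

// bestPod picks the pod with the longest prefix match, breaking ties by
// name so the result is deterministic. It returns "" when nothing matches.
func (ix *prefixIndexer) bestPod(hashes []uint64) string {
	matches := ix.matchPods(hashes)
	pods := make([]string, 0, len(matches))
	for p := range matches {
		pods = append(pods, p)
	}
	sort.Strings(pods)
	best, bestLen := "", 0
	for _, p := range pods {
		if matches[p] > bestLen {
			best, bestLen = p, matches[p]
		}
	}
	return best
}
```

A production indexer also has to apply events from many pods concurrently and drop a pod's state when it is deleted; this sketch leaves both out.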

Configuration

Environment Variables

Core Cache:

  • AIBRIX_POD_DEPLOYMENT_LABEL: Label for deployment identification
  • AIBRIX_POD_RAYCLUSTERFLEET_LABEL: Label for fleet identification

KV Event Sync:

  • AIBRIX_KV_EVENT_SYNC_ENABLED: Enable KV event synchronization
  • AIBRIX_USE_REMOTE_TOKENIZER: Enable remote tokenizer (required for KV sync)

Performance:

  • AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS: Metric refresh interval
  • AIBRIX_MODEL_GPU_PROFILE_CACHING_FLAG: Enable GPU profile caching

Usage Example

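// Enable KV event sync; the remote tokenizer is required for it
os.Setenv("AIBRIX_KV_EVENT_SYNC_ENABLED", "true")
os.Setenv("AIBRIX_USE_REMOTE_TOKENIZER", "true")

// Initialize the store. Pods are discovered through the Kubernetes
// informers, and KV sync starts automatically for eligible pods.
store := cache.InitWithOptions(config, stopCh, cache.InitOptions{
	EnableKVSync: true,
	RedisClient:  redisClient,
})

// Query pods for a model
pods, err := store.ListPodsByModel("llama-2-7b")

// Track a request: record its start after routing ...
term := store.AddRequestCount(routingCtx, requestID, model)

// ... and report usage once the response completes. Do not defer this
// call: deferred arguments such as outputTokens are evaluated immediately.
store.DoneRequestTrace(routingCtx, requestID, model, inputTokens, outputTokens, term)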

Integration Points

With Routing Algorithms

The cache provides data for routing decisions:

  • Pod availability and load
  • Prefix cache hit rates
  • GPU utilization metrics

With Gateway Plugins

Gateway plugins query the cache for:

  • Available pods per model
  • Current request counts
  • Performance predictions

Monitoring

Prometheus metrics are exposed for:

  • Cache hit/miss rates
  • Request queue depths
  • KV event processing rates
  • Pod synchronization status

Testing

Run tests:

# All cache tests
go test ./pkg/cache/

# KV event specific tests  
go test -v ./pkg/cache/ -run ".*KV.*"

# With ZMQ support
go test -tags="zmq" ./pkg/cache/

Thread Safety

All cache operations are thread-safe through:

  • Read-write mutexes for data structures
  • Atomic operations for counters
  • Channel-based event processing

Documentation

Overview

Copyright 2024 The Aibrix Team.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

const (
	MovingInterval        = 10 * time.Second
	MaxOutputLen          = 4096 // TODO: override this value if profile is provided.
	DefaultColdPrediction = OptimisticColdPrediction
)
const (
	// The version of request trace, version history:
	// v1: No meta, default
	// v2: Added metadata including version (meta_v), bucket precision (meta_precision), and interval (meta_interval_sec) to notify clients of the trace interval.
	// v3: Added the number of total requests(meta_total_reqs) and pending requests(meta_pending_reqs) for uncompleted requests.
	// v4: Added the number of queueing requests (meta_queue_reqs) for unrouted requests.
	RequestTraceVersion = 4
	// Trace write interval
	RequestTraceWriteInterval = 10 * time.Second
	// Max tolerable write delay to write ticks.
	// For example for RequestTraceWriteInterval = 10s and MaxRequestTraceIntervalOffset = 500ms, the trace should be written before X:00.5s, X:10.5s, .., X:50.5s.
	MaxRequestTraceIntervalOffset = 500 * time.Millisecond
	// The precision of buckets in trace. 0.1 means requests will be split into buckets of .1 according to log2(tokens)
	RequestTracePrecision = 0.1
)
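To make RequestTracePrecision concrete: with a precision of 0.1, a request with 1024 tokens has log2(tokens) = 10 and lands in bucket 10 / 0.1 = 100. The sketch below assumes the bucket index is rounded to the nearest integer; `traceBucket` and `bucketCenter` are hypothetical helpers, not the package's implementation.

```go
package main

import "math"

// traceBucket maps a token count to a bucket index on a log2 scale, so with
// precision 0.1 each bucket spans 0.1 in log2(tokens). Rounding to the
// nearest bucket is an assumption; the package may floor instead.
func traceBucket(tokens int64, precision float64) int {
	if tokens < 1 {
		tokens = 1 // log2 is undefined at 0; clamp to bucket 0
	}
	return int(math.Round(math.Log2(float64(tokens)) / precision))
}

// bucketCenter returns the representative token count at the center of a
// bucket under the rounding above: 2^(bucket*precision).
func bucketCenter(bucket int, precision float64) float64 {
	return math.Exp2(float64(bucket) * precision)
}
```

Because the scale is logarithmic, 1000 and 1024 tokens share a bucket while 2048 tokens lands ten buckets higher.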
const (
	// When the engine's HTTP proxy is separated from the engine itself,
	// the request port and metrics port may differ, so a dedicated metrics port is required.
	MetricPortLabel = constants.ModelLabelMetricPort
)
const ModelGPUNameTemplate = "aibrix:profile_%s_%s"
const SignatureTolerance = 0.5

Assuming the indexes are equally spaced, SignatureTolerance controls how a value lying between two indexes is aligned to one of them. A value of 0.5 indicates a neutral preference; 0.25 moves the threshold closer to the lower index and therefore favors the higher index.
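A minimal sketch of the alignment rule, using a hypothetical `alignIndex` helper rather than the package's `GetSignature`. With indexes 0, 10, 20 and a value of 4, the value sits 40% of the way to the next index: a tolerance of 0.5 keeps it at the lower index, while 0.25 moves it to the higher one. Sending a value that sits exactly at the threshold to the lower index is an assumption.

```go
package main

import "sort"

// alignIndex returns the position a value aligns to among ascending,
// evenly spaced indexes. A value between two indexes snaps to the higher
// one when its fractional distance past the lower index exceeds tolerance.
// Values outside the range clamp to the ends; it returns -1 for no indexes.
func alignIndex(indexes []float64, value, tolerance float64) int {
	n := len(indexes)
	switch {
	case n == 0:
		return -1
	case value <= indexes[0]:
		return 0
	case value >= indexes[n-1]:
		return n - 1
	}
	// hi is the first position whose index is >= value; the clamps above
	// guarantee 1 <= hi <= n-1.
	hi := sort.SearchFloat64s(indexes, value)
	if indexes[hi] == value {
		return hi
	}
	lo := hi - 1
	frac := (value - indexes[lo]) / (indexes[hi] - indexes[lo])
	if frac > tolerance {
		return hi
	}
	return lo
}
```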

Variables

var (
	ErrorTypeMetricNotFound = &CacheError{error: errors.New("metric not found")}
	ErrorMissingProfile     = &CacheError{error: errors.New("missing profile")}
)
var (
	ErrorNotSupport          = fmt.Errorf("not support")
	ErrorSLOFailureRequest   = fmt.Errorf("slo failure request")
	ErrorLoadCapacityReached = fmt.Errorf("load capacity reached")
)
var (
	ErrProfileNoThroughput = fmt.Errorf("profile has no throughput data")
	ErrProfileNoE2E        = fmt.Errorf("profile has no E2E latency data")
	ErrProfileNoTTFT       = fmt.Errorf("profile has no TTFT data")
	ErrProfileNoTPOT       = fmt.Errorf("profile has no TPOT data")
)
var NewRequestTrace = newRequestTraceGen(nil)

Functions

func InitWithAsyncPods added in v0.4.0

func InitWithAsyncPods(st *Store, pods []*v1.Pod, model string) <-chan *Store

InitWithAsyncPods initializes the cache store with pods that are added asynchronously, simulating the timeline of the store's initialization.

func IsError added in v0.4.0

func IsError(err error, errCategory error) bool

func ModelGPUProfileKey added in v0.4.0

func ModelGPUProfileKey(modelName string, deploymentName string) string
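ModelGPUProfileKey presumably fills ModelGPUNameTemplate with its two arguments. The sketch below assumes the model name fills the first placeholder and the deployment name the second, following the parameter order; `profileKey` is a hypothetical stand-in, not the package's function.

```go
package main

import "fmt"

// modelGPUNameTemplate copies the documented ModelGPUNameTemplate constant.
const modelGPUNameTemplate = "aibrix:profile_%s_%s"

// profileKey assumes the argument order (modelName, deploymentName)
// maps directly onto the template's two placeholders.
func profileKey(modelName, deploymentName string) string {
	return fmt.Sprintf(modelGPUNameTemplate, modelName, deploymentName)
}
```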

Types

type Cache

Cache is the root interface aggregating caching functionalities

func Get

func Get() (Cache, error)

Get retrieves the cache instance.

Returns:

Cache: Cache interface instance
error: Returns error if cache is not initialized

type CacheError added in v0.4.0

type CacheError struct {
	// contains filtered or unexported fields
}

func (*CacheError) ErrorType added in v0.4.0

func (e *CacheError) ErrorType() error

type CachedLoadProvider added in v0.4.0

type CachedLoadProvider struct {
	// contains filtered or unexported fields
}

CachedLoadProvider reads metrics from the cache

func NewCachedLoadProvider added in v0.4.0

func NewCachedLoadProvider(metricName string) (*CachedLoadProvider, error)

func (*CachedLoadProvider) Cache added in v0.4.0

func (p *CachedLoadProvider) Cache() Cache

func (*CachedLoadProvider) GetConsumption added in v0.4.0

func (p *CachedLoadProvider) GetConsumption(ctx *types.RoutingContext, pod *v1.Pod) (float64, error)

func (*CachedLoadProvider) GetUtilization added in v0.4.0

func (p *CachedLoadProvider) GetUtilization(ctx *types.RoutingContext, pod *v1.Pod) (float64, error)

type CappedLoadProvider added in v0.4.0

type CappedLoadProvider interface {
	LoadProvider

	// Cap returns the capacity of the pod in terms of the metrics
	Cap() float64
}

CappedLoadProvider provides an abstraction to get the capacity of specified metrics.

type ColdPredictionStrategy added in v0.4.0

type ColdPredictionStrategy int

ColdPredictionStrategy defines the strategy when there is no history for the predictor.

const (
	// OptimisticColdPrediction predicts an output of 1 token, the minimum, to be profile-friendly (most profiles give the best results when the output length is minimal).
	OptimisticColdPrediction ColdPredictionStrategy = iota
	// RandomColdPredition randomly predicts the output between 0 and maxOutputTokens.
	RandomColdPredition
	// InputColdPrediction predicts the output to be the same as the input.
	InputColdPrediction
	// PessimiticColdPrediction predicts the output to be maximum maxOutputTokens.
	PessimiticColdPrediction
)

type Error added in v0.4.0

type Error interface {
	// ErrorType returns the type of the error.
	ErrorType() error
}

Error supports error type detection and structured error information.

type InitOptions added in v0.4.0

type InitOptions struct {
	// EnableKVSync configures whether to start the ZMQ KV event sync
	EnableKVSync bool

	// RedisClient is required for KV sync and some other features. Can be nil when those features are not used.
	RedisClient *redis.Client

	// ModelRouterProvider is needed only by the gateway. Can be nil.
	ModelRouterProvider ModelRouterProviderFunc
}

InitOptions configures the cache initialization behavior

type KVEventManager added in v0.4.0

type KVEventManager struct {
	// contains filtered or unexported fields
}

KVEventManager is a stub implementation used when ZMQ is not available.

func NewKVEventManager added in v0.4.0

func NewKVEventManager(store *Store) *KVEventManager

NewKVEventManager creates a stub KV event manager

func (*KVEventManager) OnPodAdd added in v0.4.0

func (m *KVEventManager) OnPodAdd(pod *v1.Pod)

OnPodAdd is a no-op

func (*KVEventManager) OnPodDelete added in v0.4.0

func (m *KVEventManager) OnPodDelete(pod *v1.Pod)

OnPodDelete is a no-op

func (*KVEventManager) OnPodUpdate added in v0.4.0

func (m *KVEventManager) OnPodUpdate(oldPod, newPod *v1.Pod)

OnPodUpdate is a no-op

func (*KVEventManager) Start added in v0.4.0

func (m *KVEventManager) Start() error

Start is a no-op

func (*KVEventManager) Stop added in v0.4.0

func (m *KVEventManager) Stop()

Stop is a no-op

type LoadProvider added in v0.4.0

type LoadProvider interface {
	// GetUtilization reads utilization of the pod in terms of the metrics
	GetUtilization(ctx *types.RoutingContext, pod *v1.Pod) (float64, error)

	// GetConsumption reads the load consumption of the request on the specified pod.
	// If ErrorSLOFailureRequest is returned, the request is considered unable to fit within the server capacity,
	// so its SLO is expected to be violated and the request can be rejected early.
	GetConsumption(ctx *types.RoutingContext, pod *v1.Pod) (float64, error)
}

LoadProvider provides an abstraction to get the utilization in terms of specified metrics.
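The sketch below shows how a router might combine a capped load provider with the SLO early-rejection rule. The interface is simplified and hypothetical: pods are identified by name instead of `*types.RoutingContext` and `*v1.Pod`, admission compares utilization plus consumption against `Cap()`, and a pod reporting the SLO error is skipped rather than rejecting the whole request. Treat all three choices as assumptions.

```go
package main

import (
	"errors"
	"math"
)

// Local stand-ins for the documented sentinel errors.
var (
	errSLOFailureRequest   = errors.New("slo failure request")
	errLoadCapacityReached = errors.New("load capacity reached")
)

// cappedProvider is a simplified, hypothetical CappedLoadProvider.
type cappedProvider interface {
	GetUtilization(pod string) (float64, error)
	GetConsumption(pod string) (float64, error)
	Cap() float64
}

// pickPod chooses the pod with the lowest projected load (utilization plus
// this request's consumption) that still fits within Cap().
func pickPod(p cappedProvider, pods []string) (string, error) {
	best, bestLoad := "", math.Inf(1)
	for _, pod := range pods {
		util, err := p.GetUtilization(pod)
		if err != nil {
			continue
		}
		cost, err := p.GetConsumption(pod)
		if err != nil {
			// Covers errSLOFailureRequest: the SLO would be violated here.
			continue
		}
		if load := util + cost; load <= p.Cap() && load < bestLoad {
			best, bestLoad = pod, load
		}
	}
	if best == "" {
		return "", errLoadCapacityReached
	}
	return best, nil
}

// staticProvider serves fixed data so pickPod can be tried out.
type staticProvider struct {
	util, cost map[string]float64
	sloFail    map[string]bool
	capacity   float64
}

func (s staticProvider) GetUtilization(pod string) (float64, error) { return s.util[pod], nil }

func (s staticProvider) GetConsumption(pod string) (float64, error) {
	if s.sloFail[pod] {
		return 0, errSLOFailureRequest
	}
	return s.cost[pod], nil
}

func (s staticProvider) Cap() float64 { return s.capacity }
```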

type MetricCache

type MetricCache interface {
	// GetMetricValueByPod gets metric value for a pod
	// Parameters:
	//   podName: Name of the pod
	//   podNamespace: Namespace of the pod
	//   metricName: Name of the metric
	// Returns:
	//   metrics.MetricValue: Retrieved metric value
	//   error: Error information if operation fails
	GetMetricValueByPod(podName, podNamespace, metricName string) (metrics.MetricValue, error)

	// GetMetricValueByPodModel gets metric value for pod-model pair
	// Parameters:
	//   podName: Name of the pod
	//   podNamespace: Namespace of the pod
	//   modelName: Name of the model
	//   metricName: Name of the metric
	// Returns:
	//   metrics.MetricValue: Retrieved metric value
	//   error: Error information if operation fails
	GetMetricValueByPodModel(podName, podNamespace, modelName string, metricName string) (metrics.MetricValue, error)

	// AddSubscriber adds a metric subscriber
	// Parameters:
	//   subscriber: Metric subscriber implementation
	AddSubscriber(subscriber metrics.MetricSubscriber)
}

MetricCache defines operations for metric data caching

type MetricNotFoundError added in v0.4.0

type MetricNotFoundError struct {
	*CacheError
	PodName    string
	MetricName string
}

type MissingProfileError added in v0.4.0

type MissingProfileError struct {
	*CacheError
	ProfileKey string
}

type Model

type Model struct {
	// Pods is a CustomizedRegistry that stores *v1.Pod objects.
	// The internal map uses `namespace/name` as the key and `*v1.Pod` as the value.
	// This allows efficient lookups and caching of Pod objects by their unique identifier.
	// By outputting utils.PodArray on Array() call, the PodArray supports efficient filtering by
	// GPU (by deployment-like names, see utils.DeploymentNameFromPod() in utils/pod.go).
	Pods *utils.CustomizedRegistry[*v1.Pod, *utils.PodArray]
	// OutputPredictor predicts the number of output tokens per request by history requests.
	// The prediction is essential to SLO-aware features.
	OutputPredictor types.OutputPredictor
	// QueueRouter maintains a local request queue, enabling flexible request reordering.
	QueueRouter types.QueueRouter
	// contains filtered or unexported fields
}

type ModelCache

type ModelCache interface {
	// HasModel checks existence of a model
	// Parameters:
	//   modelName: Name of the model
	// Returns:
	//   bool: True if model exists, false otherwise
	HasModel(modelName string) bool

	// ListModels gets all model names
	// Returns:
	//   []string: List of model names
	ListModels() []string

	// ListModelsByPod gets models associated with a pod
	// Parameters:
	//   podName: Name of the pod
	//   podNamespace: Namespace of the pod
	// Returns:
	//   []string: List of model names
	//   error: Error information if operation fails
	ListModelsByPod(podName, podNamespace string) ([]string, error)
}

ModelCache defines operations for model information caching

type ModelGPUProfile added in v0.4.0

type ModelGPUProfile struct {
	Deployment string      `json:"gpu"`     // k8s deployment that specified model and GPU information.
	Cost       float64     `json:"cost"`    // Dollar cost of the unit time GPU computing.
	Tputs      [][]float64 `json:"tputs"`   // Max RPS per corresponding index.
	Indexes    [][]float64 `json:"indexes"` // [output tokens, input tokens]
	Created    float64     `json:"created"` // Profile generation timestamp in Unix format.
	E2E        [][]float64 `json:"e2e"`     // Mean E2E latency per corresponding RPS.
	TTFT       [][]float64 `json:"ttft"`    // Mean TTFT per corresponding RPS.
	TPOT       [][]float64 `json:"tpot"`    // Mean TPOT.
	SLOs       ModelSLOs   `json:"slos"`    // SLOs used for specified model and GPU.
}

func (*ModelGPUProfile) GetSignature added in v0.4.0

func (pf *ModelGPUProfile) GetSignature(features ...float64) ([]int, error)

func (*ModelGPUProfile) LatencySeconds added in v0.4.0

func (pf *ModelGPUProfile) LatencySeconds(signature ...int) (float64, error)

func (*ModelGPUProfile) TPOTSeconds added in v0.4.0

func (pf *ModelGPUProfile) TPOTSeconds(signature ...int) (float64, error)

func (*ModelGPUProfile) TTFTSeconds added in v0.4.0

func (pf *ModelGPUProfile) TTFTSeconds(signature ...int) (float64, error)

func (*ModelGPUProfile) ThroughputRPS added in v0.4.0

func (pf *ModelGPUProfile) ThroughputRPS(signature ...int) (float64, error)

func (*ModelGPUProfile) Unmarshal added in v0.4.0

func (pf *ModelGPUProfile) Unmarshal(data []byte) error

type ModelRouterProviderFunc added in v0.4.0

type ModelRouterProviderFunc func(modelName string) (types.QueueRouter, error)

ModelRouterProviderFunc defines the function that provides a per-model router.

type ModelSLOs added in v0.4.0

type ModelSLOs struct {
	Percentile int     `json:"percentile"` // Percentile applied to SLO metric(s).
	TPUT       float64 `json:"tput"`       // Request Throughput: RPS
	TT         float64 `json:"tt"`         // Token Throughput
	E2E        float64 `json:"e2e"`        // End-to-end latency
	TTFT       float64 `json:"ttft"`       // Time to first token
	TPAT       float64 `json:"tpat"`       // Time per all tokens (suggests normalized E2E latency for different workloads)
	TPOT       float64 `json:"tpot"`       // Time per output tokens
}

type PendingLoadProvider added in v0.4.0

type PendingLoadProvider struct {
	*CachedLoadProvider
}

PendingLoadProvider estimates server utilization in terms of pending load: PendingLoad = 1 / PendingRequests = 1 / (Throughput * Latency), where PendingRequests = Throughput * Latency follows from Little's Law, and Throughput (RPS) and Latency are read from the loaded profile for each feature (input tokens, output tokens).
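As a worked example: a profile entry with a throughput of 4 RPS and a latency of 2.5 s implies 4 × 2.5 = 10 requests in flight, so each request accounts for a pending load of 1/10 = 0.1, and ten such requests sum to 1.0. The hypothetical `pendingLoad` helper below computes the same quantity; returning an error for missing data is an assumption loosely modeled on the ErrProfileNo* variables.

```go
package main

import "errors"

// pendingLoad applies Little's Law to one profile entry: a pod sustaining
// throughputRPS with latencySeconds per request holds
// throughputRPS*latencySeconds requests in flight, so one request
// accounts for 1/(throughputRPS*latencySeconds) of the load.
func pendingLoad(throughputRPS, latencySeconds float64) (float64, error) {
	pending := throughputRPS * latencySeconds
	if pending <= 0 {
		return 0, errors.New("profile entry has no usable throughput or latency")
	}
	return 1 / pending, nil
}
```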

func NewPendingLoadProvider added in v0.4.0

func NewPendingLoadProvider() (*PendingLoadProvider, error)

func (*PendingLoadProvider) Cap added in v0.4.0

func (p *PendingLoadProvider) Cap() float64

func (*PendingLoadProvider) GetConsumption added in v0.4.0

func (p *PendingLoadProvider) GetConsumption(ctx *types.RoutingContext, pod *v1.Pod) (float64, error)

func (*PendingLoadProvider) GetUtilization added in v0.4.0

func (p *PendingLoadProvider) GetUtilization(ctx *types.RoutingContext, pod *v1.Pod) (float64, error)

type Pod

type Pod struct {
	*v1.Pod
	Models       *utils.Registry[string]                    // Model/adapter names that the pod is running
	Metrics      utils.SyncMap[string, metrics.MetricValue] // Pod metrics (metric_name -> value)
	ModelMetrics utils.SyncMap[string, metrics.MetricValue] // Pod-model metrics (model_name/metric_name -> value)
	// contains filtered or unexported fields
}

func (*Pod) CanLogPodTrace added in v0.4.0

func (pod *Pod) CanLogPodTrace(level klog.Level) bool

type PodCache

type PodCache interface {
	// GetPod retrieves a Pod object by name
	// Parameters:
	//   podName: Name of the pod
	//   podNamespace: Namespace of the pod
	// Returns:
	//   *v1.Pod: Found pod object
	//   error: Error information if operation fails
	GetPod(podName, podNamespace string) (*v1.Pod, error)

	// ListPodsByModel gets pods associated with a model
	// Parameters:
	//   modelName: Name of the model
	// Returns:
	//   types.PodList: Pod objects matching the criteria
	//   error: Error information if operation fails
	ListPodsByModel(modelName string) (types.PodList, error)
}

PodCache defines operations for pod information caching

type ProfileCache added in v0.4.0

type ProfileCache interface {
	// GetModelProfileByPod gets model profile for a pod
	// Parameters:
	//   pod: Pod object
	//   modelName: Name of the model
	GetModelProfileByPod(pod *v1.Pod, modelName string) (*ModelGPUProfile, error)

	// GetModelProfileByDeploymentName gets model profile for a deployment
	// Parameters:
	//   deploymentName: Name of the deployment
	//   modelName: Name of the model
	GetModelProfileByDeploymentName(deploymentName string, modelName string) (*ModelGPUProfile, error)
}

ProfileCache defines operations for model profiles

type RequestTrace

type RequestTrace struct {
	// contains filtered or unexported fields
}

func (*RequestTrace) AddRequest

func (t *RequestTrace) AddRequest(requestID string, key string) (int64, bool)

AddRequest increments the request count and returns the trace term; key is currently ignored.

func (*RequestTrace) AddRequestTrace

func (t *RequestTrace) AddRequestTrace(requestID string, inputTokens, outputTokens int64, key string) (string, bool)

AddRequestTrace adds a request trace profile. key must be provided and is not validated.

func (*RequestTrace) DoneRequest

func (t *RequestTrace) DoneRequest(requestID string, term int64)

DoneRequest decrements the request count after verifying the term; retrying is futile.

func (*RequestTrace) DoneRequestTrace

func (t *RequestTrace) DoneRequestTrace(requestID string, inputTokens, outputTokens int64, key string, term int64) (string, bool)

DoneRequestTrace decrements the request count and adds a request trace profile.
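Term verification can be illustrated with a hypothetical `termCounter`, not the package's RequestTrace: each recycle starts a new term, and a completion decrements the count only while its term is still current. In this model, retrying a stale completion is futile because the counter it belonged to has already been reset. Whether RequestTrace resets its counts exactly this way is an assumption.

```go
package main

import "sync"

// termCounter counts pending requests within the current term.
type termCounter struct {
	mu      sync.Mutex
	term    int64
	pending int64
}

// add records a new request and returns the term it belongs to.
func (c *termCounter) add() int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pending++
	return c.term
}

// done decrements the count only if term is still current; stale terms
// are rejected so they cannot skew the new term's count.
func (c *termCounter) done(term int64) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if term != c.term {
		return false
	}
	c.pending--
	return true
}

// recycle starts a new term with a fresh count.
func (c *termCounter) recycle() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.term++
	c.pending = 0
}
```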

func (*RequestTrace) Lock

func (t *RequestTrace) Lock()

func (*RequestTrace) Recycle

func (t *RequestTrace) Recycle()

func (*RequestTrace) RecycleLocked

func (t *RequestTrace) RecycleLocked()

func (*RequestTrace) ToMap

func (t *RequestTrace) ToMap(total_pending, queueing int) map[string]int

func (*RequestTrace) ToMapLocked

func (t *RequestTrace) ToMapLocked(total_pending, queueing int) map[string]int

func (*RequestTrace) Unlock

func (t *RequestTrace) Unlock()

type RequestTraceMetaKey

type RequestTraceMetaKey int
const (
	MetaKeyVersionKey RequestTraceMetaKey = iota
	MetaKeyIntervalInSeconds
	MetaKeyTracePrecision
	MetaKeyTotalRequests
	MetaKeyPendingRequests
	MetaKeyQueueingRequests
	RequestTraceNumMetaKeys // Sentinel for the number of RequestTraceMetaKey values; this is not an actual meta key.
)
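A sketch of how the sentinel can size a lookup table for ToString, assuming ToString returns the field names listed in the RequestTraceVersion history (meta_v, meta_interval_sec, and so on). The lowercase names are hypothetical stand-ins for the exported constants.

```go
package main

type metaKey int

const (
	metaVersion metaKey = iota
	metaIntervalInSeconds
	metaTracePrecision
	metaTotalRequests
	metaPendingRequests
	metaQueueingRequests
	numMetaKeys // sentinel: the number of keys, not a key itself
)

// metaKeyNames is sized by the sentinel, so it has one slot per key.
var metaKeyNames = [numMetaKeys]string{
	metaVersion:           "meta_v",
	metaIntervalInSeconds: "meta_interval_sec",
	metaTracePrecision:    "meta_precision",
	metaTotalRequests:     "meta_total_reqs",
	metaPendingRequests:   "meta_pending_reqs",
	metaQueueingRequests:  "meta_queue_reqs",
}

// toString returns the trace field name for k, or "unknown" if k is
// out of range.
func (k metaKey) toString() string {
	if k < 0 || k >= numMetaKeys {
		return "unknown"
	}
	return metaKeyNames[k]
}
```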

func (RequestTraceMetaKey) ToString

func (key RequestTraceMetaKey) ToString() string

type RequestTracker

type RequestTracker interface {
	// AddRequestCount tracks the start of a request after routing.
	// To support realtime statistics update and access, AddRequestCount can be called multiple times for a request.
	// As a result, implementations should ensure thread-safe access to the counter and idempotency.
	//
	// Parameters:
	//   ctx: Routing context
	//   requestID: Unique request identifier
	//   modelName: Name of the model
	// Returns:
	//   int64: Trace term identifier
	AddRequestCount(ctx *types.RoutingContext, requestID string, modelName string) (traceTerm int64)

	// DoneRequestCount tracks the completion of a request without usage information like inputTokens and outputTokens.
	// Only one DoneRequestXXX should be called for a request. Idempotency is not required.
	//
	// Parameters:
	//   ctx: Routing context
	//   requestID: Unique request identifier
	//   modelName: Name of the model
	//   traceTerm: Trace term identifier
	DoneRequestCount(ctx *types.RoutingContext, requestID string, modelName string, traceTerm int64)

	// DoneRequestTrace tracks the completion of a request with usage information like inputTokens and outputTokens.
	// Only one DoneRequestXXX should be called for a request. Idempotency is not required.
	//
	// Parameters:
	//   ctx: Routing context
	//   requestID: Unique request identifier
	//   modelName: Name of the model
	//   inputTokens: Number of input tokens
	//   outputTokens: Number of output tokens
	//   traceTerm: Trace term identifier
	DoneRequestTrace(ctx *types.RoutingContext, requestID string, modelName string, inputTokens, outputTokens, traceTerm int64)
}

RequestTracker defines operations for tracking workload statistics.

type SimpleOutputPredictor added in v0.4.0

type SimpleOutputPredictor struct {
	// contains filtered or unexported fields
}

SimpleOutputPredictor collects a moving histogram of the output tokens of completed requests for each input-token bucket, and uses weighted random selection to predict the output tokens of a specific request. Usage:

  1. Call NewSimpleOutputPredictor() with the maximum input and output token estimates and the window size.
  2. Call AddTrace() to collect observed output tokens. Output tokens are categorized into the input-token bucket round(log2(input tokens)).
  3. Call Predict() to get a predicted number of output tokens for a given number of input tokens.
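A simplified sketch of the three usage steps, assuming each input-token bucket keeps a histogram keyed by output length. `outputPredictor` is hypothetical and, unlike SimpleOutputPredictor, has no time window, so traces never expire; falling back to 1 with no history mirrors OptimisticColdPrediction.

```go
package main

import (
	"math"
	"math/rand"
	"sort"
)

// outputPredictor keeps, per input bucket, counts of observed output lengths.
type outputPredictor struct {
	hist map[int]map[int]int32 // input bucket -> output tokens -> count
	rng  *rand.Rand
}

func newOutputPredictor(seed int64) *outputPredictor {
	return &outputPredictor{
		hist: make(map[int]map[int]int32),
		rng:  rand.New(rand.NewSource(seed)),
	}
}

// inputBucket categorizes input tokens by round(log2(inputTokens)).
func inputBucket(inputTokens int) int {
	if inputTokens < 1 {
		inputTokens = 1
	}
	return int(math.Round(math.Log2(float64(inputTokens))))
}

// addTrace records cnt requests that produced outputTokens.
func (p *outputPredictor) addTrace(inputTokens, outputTokens int, cnt int32) {
	b := inputBucket(inputTokens)
	if p.hist[b] == nil {
		p.hist[b] = make(map[int]int32)
	}
	p.hist[b][outputTokens] += cnt
}

// predict draws an output length with probability proportional to how
// often it was seen in the request's input bucket.
func (p *outputPredictor) predict(inputTokens int) int {
	h := p.hist[inputBucket(inputTokens)]
	var total int64
	outputs := make([]int, 0, len(h))
	for out, c := range h {
		outputs = append(outputs, out)
		total += int64(c)
	}
	if total <= 0 {
		return 1 // no history: optimistic cold prediction
	}
	sort.Ints(outputs) // fixed order keeps draws reproducible for a seed
	r := p.rng.Int63n(total)
	for _, out := range outputs {
		r -= int64(h[out])
		if r < 0 {
			return out
		}
	}
	return outputs[len(outputs)-1] // unreachable when counts are positive
}
```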

func NewSimpleOutputPredictor added in v0.4.0

func NewSimpleOutputPredictor(maxInputTokens, maxOutputTokens int, window time.Duration) *SimpleOutputPredictor

func (*SimpleOutputPredictor) AddTrace added in v0.4.0

func (p *SimpleOutputPredictor) AddTrace(inputTokens, outputTokens int, cnt int32)

func (*SimpleOutputPredictor) AddTraceWithTimestamp added in v0.4.0

func (p *SimpleOutputPredictor) AddTraceWithTimestamp(inputTokens, outputTokens int, cnt int32, ts time.Time)

func (*SimpleOutputPredictor) Predict added in v0.4.0

func (p *SimpleOutputPredictor) Predict(inputTokens int) int

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store contains core data structures and components of the caching system

func InitForTest

func InitForTest() *Store

InitForTest initializes the global store object for testing.

func InitWithModelRouterProvider added in v0.4.0

func InitWithModelRouterProvider(st *Store, modelRouterProvider ModelRouterProviderFunc) *Store

InitWithModelRouterProvider initializes the cache store with a model router provider for testing purposes; it can be called repeatedly to reset. Call this function before InitWithPods for the expected behavior.

func InitWithOptions added in v0.4.0

func InitWithOptions(config *rest.Config, stopCh <-chan struct{}, opts InitOptions) *Store

InitWithOptions initializes the cache store with configurable behavior.

Parameters:

config: Kubernetes configuration
stopCh: Stop signal channel
opts: Configuration options for initialization

Returns:

*Store: Pointer to initialized store instance

func InitWithPods added in v0.4.0

func InitWithPods(st *Store, pods []*v1.Pod, model string) *Store

InitWithPods initializes the cache store with pods for testing purposes; it can be called repeatedly to reset.

func InitWithPodsMetrics added in v0.4.0

func InitWithPodsMetrics(st *Store, podMetrics map[string]map[string]metrics.MetricValue) *Store

InitWithPodsMetrics initializes the cache store with pod metrics for testing purposes; it can be called repeatedly to reset.

func InitWithProfileCache added in v0.4.0

func InitWithProfileCache(st *Store) *Store

InitWithProfileCache initializes the cache store with a profile cache.

func InitWithRequestTrace added in v0.4.0

func InitWithRequestTrace(st *Store) *Store

InitWithRequestTrace initializes the cache store with request trace.

func New

func New(redisClient *redis.Client, prometheusApi prometheusv1.API, modelRouterProvider ModelRouterProviderFunc) *Store

New creates a new cache store instance.

Parameters:

redisClient: Redis client instance
prometheusApi: Prometheus API client
modelRouterProvider: Model router provider function

Returns:

*Store: Initialized cache store instance

func NewForTest added in v0.4.0

func NewForTest() *Store

NewForTest initializes the cache store for testing purposes. It can be called repeatedly to reset the store.

func NewWithPodsForTest added in v0.4.0

func NewWithPodsForTest(pods []*v1.Pod, model string) *Store

func NewWithPodsMetricsForTest added in v0.4.0

func NewWithPodsMetricsForTest(pods []*v1.Pod, model string, podMetrics map[string]map[string]metrics.MetricValue) *Store

func (*Store) AddRequestCount

func (c *Store) AddRequestCount(ctx *types.RoutingContext, requestID string, modelName string) (traceTerm int64)

AddRequestCount tracks new request initiation. If ctx is provided, AddRequestCount can be called multiple times for the same request.

Parameters:

ctx: Routing context
requestID: Unique request identifier
modelName: Model handling the request

Returns:

int64: Trace term identifier

func (*Store) AddSubscriber

func (c *Store) AddSubscriber(subscriber metrics.MetricSubscriber)

AddSubscriber registers a new metric subscriber.

Parameters:

subscriber: Metric subscriber implementation
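The methods of metrics.MetricSubscriber are not listed on this page, so the sketch below uses a hypothetical metricSubscriber interface with a single OnUpdate method to show a typical register-and-fan-out pattern behind a method like AddSubscriber. It is an illustration under that assumption, not the package's interface.

```go
package main

import "sync"

// metricSubscriber is a hypothetical stand-in for metrics.MetricSubscriber.
type metricSubscriber interface {
	OnUpdate(podKey, metricName string, value float64)
}

// subscriberRegistry stores subscribers and fans updates out to them.
type subscriberRegistry struct {
	mu   sync.RWMutex
	subs []metricSubscriber
}

// AddSubscriber registers s to receive future updates.
func (r *subscriberRegistry) AddSubscriber(s metricSubscriber) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.subs = append(r.subs, s)
}

// publish delivers one update to every registered subscriber. It copies the
// slice under the read lock so callbacks run without holding the lock.
func (r *subscriberRegistry) publish(podKey, metricName string, value float64) {
	r.mu.RLock()
	subs := append([]metricSubscriber(nil), r.subs...)
	r.mu.RUnlock()
	for _, s := range subs {
		s.OnUpdate(podKey, metricName, value)
	}
}

// recordingSubscriber remembers the last value seen for each pod/metric pair.
type recordingSubscriber struct {
	last map[string]float64
}

func (rs *recordingSubscriber) OnUpdate(podKey, metricName string, value float64) {
	if rs.last == nil {
		rs.last = make(map[string]float64)
	}
	rs.last[podKey+"|"+metricName] = value
}
```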

func (*Store) Close added in v0.4.0

func (s *Store) Close()

Close gracefully shuts down the cache store.

func (*Store) DoneRequestCount

func (c *Store) DoneRequestCount(ctx *types.RoutingContext, requestID string, modelName string, traceTerm int64)

DoneRequestCount completes request tracking.

Parameters:

ctx: Routing context
requestID: Unique request identifier
modelName: Model handling the request
traceTerm: Trace term identifier
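AddRequestCount returns a traceTerm that DoneRequestCount takes back, which suggests the two calls are meant to be paired per request. The sketch below shows that pairing with defer against a hypothetical, simplified requestCounter interface (the *types.RoutingContext argument is dropped). fakeCounter and its in-flight bookkeeping are assumptions for illustration, not the Store's behavior.

```go
package main

import "sync"

// requestCounter is a hypothetical, simplified mirror of Store.AddRequestCount
// and Store.DoneRequestCount; the *types.RoutingContext argument is omitted.
type requestCounter interface {
	AddRequestCount(requestID, modelName string) (traceTerm int64)
	DoneRequestCount(requestID, modelName string, traceTerm int64)
}

// fakeCounter tracks in-flight requests per model and records the last term
// passed to DoneRequestCount. Its fixed term is an assumption for the example.
type fakeCounter struct {
	mu           sync.Mutex
	term         int64
	inFlight     map[string]int
	lastDoneTerm int64
}

func newFakeCounter(term int64) *fakeCounter {
	return &fakeCounter{term: term, inFlight: make(map[string]int)}
}

func (f *fakeCounter) AddRequestCount(requestID, modelName string) int64 {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.inFlight[modelName]++
	return f.term
}

func (f *fakeCounter) DoneRequestCount(requestID, modelName string, traceTerm int64) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.inFlight[modelName]--
	f.lastDoneTerm = traceTerm
}

// handle pairs the two calls: the term from AddRequestCount is handed back to
// DoneRequestCount, and defer runs the Done call even if work panics.
func handle(c requestCounter, requestID, modelName string, work func()) {
	term := c.AddRequestCount(requestID, modelName)
	defer c.DoneRequestCount(requestID, modelName, term)
	work()
}
```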

func (*Store) DoneRequestTrace

func (c *Store) DoneRequestTrace(ctx *types.RoutingContext, requestID string, modelName string, inputTokens, outputTokens, traceTerm int64)

DoneRequestTrace completes request tracing.

Parameters:

ctx: Routing context
requestID: Unique request identifier
modelName: Model handling the request
inputTokens: Input tokens count
outputTokens: Output tokens count
traceTerm: Trace term identifier

func (*Store) DumpRequestTrace added in v0.4.0

func (c *Store) DumpRequestTrace(modelName string) map[string]int

func (*Store) GetMetricValueByPod

func (c *Store) GetMetricValueByPod(podName, podNamespace, metricName string) (metrics.MetricValue, error)

GetMetricValueByPod retrieves the metric value for a Pod.

Parameters:

podName: Name of the Pod
podNamespace: Namespace of the Pod
metricName: Name of the metric

Returns:

metrics.MetricValue: The metric value
error: Error if Pod or metric doesn't exist

func (*Store) GetMetricValueByPodModel

func (c *Store) GetMetricValueByPodModel(podName, podNamespace, modelName string, metricName string) (metrics.MetricValue, error)

GetMetricValueByPodModel retrieves the metric value for a Pod-Model combination.

Parameters:

podName: Name of the Pod
podNamespace: Namespace of the Pod
modelName: Name of the model
metricName: Name of the metric

Returns:

metrics.MetricValue: The metric value
error: Error if Pod, model or metric doesn't exist

func (*Store) GetModelProfileByDeploymentName added in v0.4.0

func (c *Store) GetModelProfileByDeploymentName(deploymentName string, modelName string) (*ModelGPUProfile, error)

func (*Store) GetModelProfileByPod added in v0.4.0

func (c *Store) GetModelProfileByPod(pod *v1.Pod, modelName string) (*ModelGPUProfile, error)

func (*Store) GetOutputPredictor added in v0.4.0

func (c *Store) GetOutputPredictor(modelName string) (types.OutputPredictor, error)

func (*Store) GetPod

func (c *Store) GetPod(podName, podNamespace string) (*v1.Pod, error)

GetPod retrieves a Pod object by name and namespace from the cache.

Parameters:

podName: Name of the pod to retrieve
podNamespace: Namespace of the pod to retrieve

Returns:

*v1.Pod: The found Pod object
error: Error if pod doesn't exist

func (*Store) GetRouter added in v0.4.0

func (c *Store) GetRouter(ctx *types.RoutingContext) (types.Router, error)

func (*Store) GetSyncPrefixIndexer added in v0.4.0

func (s *Store) GetSyncPrefixIndexer() *syncindexer.SyncPrefixHashTable

GetSyncPrefixIndexer returns the sync prefix hash indexer.

func (*Store) HasModel

func (c *Store) HasModel(modelName string) bool

HasModel checks if a model exists in the cache.

Parameters:

modelName: Name of the model to check

Returns:

bool: True if model exists

func (*Store) ListModels

func (c *Store) ListModels() []string

ListModels returns all cached model names.

Returns:

[]string: Slice of model names

func (*Store) ListModelsByPod

func (c *Store) ListModelsByPod(podName, podNamespace string) ([]string, error)

ListModelsByPod gets models associated with a specific Pod.

Parameters:

podName: Name of the Pod to query
podNamespace: Namespace of the Pod to query

Returns:

[]string: Slice of model names
error: Error if Pod doesn't exist

func (*Store) ListPods

func (c *Store) ListPods() []*v1.Pod

ListPods returns all cached Pod objects. It is intended for debugging purposes and is less efficient, so avoid calling it directly.

Returns:

[]*v1.Pod: Slice of Pod objects

func (*Store) ListPodsByModel

func (c *Store) ListPodsByModel(modelName string) (types.PodList, error)

ListPodsByModel gets Pods associated with a specific model.

Parameters:

modelName: Name of the model to query

Returns:

types.PodList: PodArray wrapper for a slice of Pod objects
error: Error if model doesn't exist
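HasModel, ListModelsByPod, and ListPodsByModel can all be answered from a bidirectional index between models and pods. The sketch below is a hypothetical illustration of such an index (modelPodIndex, string pod keys, add, and removePod are assumptions), returning sorted string slices rather than types.PodList.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// modelPodIndex is a hypothetical bidirectional index between model names
// and pod keys ("namespace/name" strings, an assumption).
type modelPodIndex struct {
	mu          sync.RWMutex
	podsByModel map[string]map[string]struct{}
	modelsByPod map[string]map[string]struct{}
}

func newModelPodIndex() *modelPodIndex {
	return &modelPodIndex{
		podsByModel: make(map[string]map[string]struct{}),
		modelsByPod: make(map[string]map[string]struct{}),
	}
}

// add records that pod serves model, updating both directions.
func (ix *modelPodIndex) add(pod, model string) {
	ix.mu.Lock()
	defer ix.mu.Unlock()
	if ix.podsByModel[model] == nil {
		ix.podsByModel[model] = make(map[string]struct{})
	}
	ix.podsByModel[model][pod] = struct{}{}
	if ix.modelsByPod[pod] == nil {
		ix.modelsByPod[pod] = make(map[string]struct{})
	}
	ix.modelsByPod[pod][model] = struct{}{}
}

// removePod drops a pod and any model left without pods.
func (ix *modelPodIndex) removePod(pod string) {
	ix.mu.Lock()
	defer ix.mu.Unlock()
	for model := range ix.modelsByPod[pod] {
		delete(ix.podsByModel[model], pod)
		if len(ix.podsByModel[model]) == 0 {
			delete(ix.podsByModel, model)
		}
	}
	delete(ix.modelsByPod, pod)
}

func (ix *modelPodIndex) HasModel(model string) bool {
	ix.mu.RLock()
	defer ix.mu.RUnlock()
	_, ok := ix.podsByModel[model]
	return ok
}

// sortedKeys returns a set's members in deterministic order.
func sortedKeys(set map[string]struct{}) []string {
	out := make([]string, 0, len(set))
	for k := range set {
		out = append(out, k)
	}
	sort.Strings(out)
	return out
}

func (ix *modelPodIndex) ListPodsByModel(model string) ([]string, error) {
	ix.mu.RLock()
	defer ix.mu.RUnlock()
	pods, ok := ix.podsByModel[model]
	if !ok {
		return nil, fmt.Errorf("model %q does not exist", model)
	}
	return sortedKeys(pods), nil
}

func (ix *modelPodIndex) ListModelsByPod(pod string) ([]string, error) {
	ix.mu.RLock()
	defer ix.mu.RUnlock()
	models, ok := ix.modelsByPod[pod]
	if !ok {
		return nil, fmt.Errorf("pod %q does not exist", pod)
	}
	return sortedKeys(models), nil
}
```

Keeping both directions under one lock lets a pod deletion remove the pod from every model's set and drop models left with no pods, so HasModel stays consistent with ListPodsByModel.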

func (*Store) UpdateModelProfile added in v0.4.0

func (c *Store) UpdateModelProfile(key string, profile *ModelGPUProfile, force bool)
