Documentation ¶
Overview ¶
Copyright 2024 The Aibrix Team.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func InitWithAsyncPods(st *Store, pods []*v1.Pod, model string) <-chan *Store
- func IsError(err error, errCategory error) bool
- func ModelGPUProfileKey(modelName string, deploymentName string) string
- type Cache
- type CacheError
- type CachedLoadProvider
- type CappedLoadProvider
- type ColdPredictionStrategy
- type Error
- type InitOptions
- type KVEventManager
- type LoadProvider
- type MetricCache
- type MetricNotFoundError
- type MissingProfileError
- type Model
- type ModelCache
- type ModelGPUProfile
- func (pf *ModelGPUProfile) GetSignature(features ...float64) ([]int, error)
- func (pf *ModelGPUProfile) LatencySeconds(signature ...int) (float64, error)
- func (pf *ModelGPUProfile) TPOTSeconds(signature ...int) (float64, error)
- func (pf *ModelGPUProfile) TTFTSeconds(signature ...int) (float64, error)
- func (pf *ModelGPUProfile) ThroughputRPS(signature ...int) (float64, error)
- func (pf *ModelGPUProfile) Unmarshal(data []byte) error
- type ModelRouterProviderFunc
- type ModelSLOs
- type PendingLoadProvider
- type Pod
- type PodCache
- type ProfileCache
- type RequestTrace
- func (t *RequestTrace) AddRequest(requestID string, key string) (int64, bool)
- func (t *RequestTrace) AddRequestTrace(requestID string, inputTokens, outputTokens int64, key string) (string, bool)
- func (t *RequestTrace) DoneRequest(requestID string, term int64)
- func (t *RequestTrace) DoneRequestTrace(requestID string, inputTokens, outputTokens int64, key string, term int64) (string, bool)
- func (t *RequestTrace) Lock()
- func (t *RequestTrace) Recycle()
- func (t *RequestTrace) RecycleLocked()
- func (t *RequestTrace) ToMap(total_pending, queueing int) map[string]int
- func (t *RequestTrace) ToMapLocked(total_pending, queueing int) map[string]int
- func (t *RequestTrace) Unlock()
- type RequestTraceMetaKey
- type RequestTracker
- type SimpleOutputPredictor
- type Store
- func InitForTest() *Store
- func InitWithModelRouterProvider(st *Store, modelRouterProvider ModelRouterProviderFunc) *Store
- func InitWithOptions(config *rest.Config, stopCh <-chan struct{}, opts InitOptions) *Store
- func InitWithPods(st *Store, pods []*v1.Pod, model string) *Store
- func InitWithPodsMetrics(st *Store, podMetrics map[string]map[string]metrics.MetricValue) *Store
- func InitWithProfileCache(st *Store) *Store
- func InitWithRequestTrace(st *Store) *Store
- func New(redisClient *redis.Client, prometheusApi prometheusv1.API, ...) *Store
- func NewForTest() *Store
- func NewWithPodsForTest(pods []*v1.Pod, model string) *Store
- func NewWithPodsMetricsForTest(pods []*v1.Pod, model string, ...) *Store
- func (c *Store) AddRequestCount(ctx *types.RoutingContext, requestID string, modelName string) (traceTerm int64)
- func (c *Store) AddSubscriber(subscriber metrics.MetricSubscriber)
- func (s *Store) Close()
- func (c *Store) DoneRequestCount(ctx *types.RoutingContext, requestID string, modelName string, traceTerm int64)
- func (c *Store) DoneRequestTrace(ctx *types.RoutingContext, requestID string, modelName string, ...)
- func (c *Store) DumpRequestTrace(modelName string) map[string]int
- func (c *Store) GetMetricValueByPod(podName, podNamespace, metricName string) (metrics.MetricValue, error)
- func (c *Store) GetMetricValueByPodModel(podName, podNamespace, modelName string, metricName string) (metrics.MetricValue, error)
- func (c *Store) GetModelProfileByDeploymentName(deploymentName string, modelName string) (*ModelGPUProfile, error)
- func (c *Store) GetModelProfileByPod(pod *v1.Pod, modelName string) (*ModelGPUProfile, error)
- func (c *Store) GetOutputPredictor(modelName string) (types.OutputPredictor, error)
- func (c *Store) GetPod(podName, podNamespace string) (*v1.Pod, error)
- func (c *Store) GetRouter(ctx *types.RoutingContext) (types.Router, error)
- func (s *Store) GetSyncPrefixIndexer() *syncindexer.SyncPrefixHashTable
- func (c *Store) HasModel(modelName string) bool
- func (c *Store) ListModels() []string
- func (c *Store) ListModelsByPod(podName, podNamespace string) ([]string, error)
- func (c *Store) ListPods() []*v1.Pod
- func (c *Store) ListPodsByModel(modelName string) (types.PodList, error)
- func (c *Store) UpdateModelProfile(key string, profile *ModelGPUProfile, force bool)
Constants ¶
const (
MovingInterval = 10 * time.Second
MaxOutputLen = 4096 // TODO: override this value if profile is provided.
DefaultColdPrediction = OptimisticColdPrediction
)
const (
// The version of request trace, version history:
// v1: No meta, default
// v2: Added metadata including version (meta_v), bucket precision (meta_precision), and interval (meta_interval_sec) to notify the client of the trace interval.
// v3: Added the number of total requests (meta_total_reqs) and pending requests (meta_pending_reqs) for uncompleted requests.
// v4: Added the number of queueing requests (meta_queue_reqs) for unrouted requests.
RequestTraceVersion = 4
// Trace write interval
RequestTraceWriteInterval = 10 * time.Second
// Max tolerable delay for writing at each tick.
// For example, with RequestTraceWriteInterval = 10s and MaxRequestTraceIntervalOffset = 500ms, the trace should be written before X:00.5s, X:10.5s, ..., X:50.5s.
MaxRequestTraceIntervalOffset = 500 * time.Millisecond
// The precision of buckets in the trace. 0.1 means requests are split into buckets of width 0.1 on the log2(tokens) scale.
RequestTracePrecision = 0.1
)
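As a rough illustration of RequestTracePrecision, the sketch below maps a token count to a bucket on the log2 scale, where each bucket is 0.1 wide. The helper `traceBucket` is hypothetical, and rounding to the nearest bucket (rather than flooring) is an assumption, not necessarily what the package does.

```go
package main

import (
	"fmt"
	"math"
)

// requestTracePrecision mirrors RequestTracePrecision.
const requestTracePrecision = 0.1

// traceBucket is a hypothetical helper: it maps a token count to a bucket
// index on the log2 scale, where each bucket is `precision` wide.
// Rounding to the nearest bucket is an assumption.
func traceBucket(tokens int64, precision float64) int64 {
	if tokens <= 0 {
		return 0 // assumption: non-positive counts share bucket 0
	}
	return int64(math.Round(math.Log2(float64(tokens)) / precision))
}

func main() {
	// log2(1024) = 10, which is bucket 100 at precision 0.1.
	fmt.Println(traceBucket(1024, requestTracePrecision))
	// log2(1000) is about 9.97, so it rounds into the same bucket.
	fmt.Println(traceBucket(1000, requestTracePrecision))
}
```

Working on the log2 scale keeps the number of buckets small while still separating short and long requests by relative size.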
const (
// When the engine's HTTP proxy is separated from the engine itself,
// the request port and metrics port may differ, so a dedicated metrics port is required.
MetricPortLabel = constants.ModelLabelMetricPort
)
const ModelGPUNameTemplate = "aibrix:profile_%s_%s"
const SignatureTolerance = 0.5
Assuming indexes are equally spaced, SignatureTolerance controls how a value lying between two indexes is aligned to one of them. A value of 0.5 indicates neutral preference; 0.25 moves the threshold closer to the lower index and therefore favors the higher index.
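A minimal sketch of the alignment SignatureTolerance describes, assuming sorted, equally spaced index values and that a value snaps to the higher index once its fractional position inside the gap exceeds the tolerance. `alignIndex` is hypothetical; GetSignature may treat exact boundaries and out-of-range values differently (here they are clamped).

```go
package main

import (
	"fmt"
	"sort"
)

// signatureTolerance mirrors SignatureTolerance.
const signatureTolerance = 0.5

// alignIndex is a hypothetical helper: it returns the position in the sorted
// slice indexes that v aligns to. A value between indexes[i] and indexes[i+1]
// moves to i+1 when its fractional position in that gap exceeds tolerance,
// otherwise it stays at i. Values outside the range are clamped.
func alignIndex(indexes []float64, v, tolerance float64) int {
	n := len(indexes)
	if n == 0 {
		return -1
	}
	if v <= indexes[0] {
		return 0
	}
	if v >= indexes[n-1] {
		return n - 1
	}
	// hi is the first index strictly greater than v, so indexes[hi-1] <= v < indexes[hi].
	hi := sort.Search(n, func(i int) bool { return indexes[i] > v })
	lo := hi - 1
	frac := (v - indexes[lo]) / (indexes[hi] - indexes[lo])
	if frac > tolerance {
		return hi
	}
	return lo
}

func main() {
	idx := []float64{64, 128, 192, 256}
	fmt.Println(alignIndex(idx, 150, signatureTolerance)) // about 34% into [128, 192)
	fmt.Println(alignIndex(idx, 150, 0.25))               // lower threshold
}
```

With the neutral 0.5, a value about a third of the way into a gap stays at the lower index (1); lowering the tolerance to 0.25 moves it to the higher index (2).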
Variables ¶
var (
ErrorTypeMetricNotFound = &CacheError{error: errors.New("metric not found")}
ErrorMissingProfile = &CacheError{error: errors.New("missing profile")}
)
var (
ErrorNotSupport = fmt.Errorf("not support")
ErrorSLOFailureRequest = fmt.Errorf("slo failure request")
ErrorLoadCapacityReached = fmt.Errorf("load capacity reached")
)
var (
ErrProfileNoThroughput = fmt.Errorf("profile has no throughput data")
ErrProfileNoE2E = fmt.Errorf("profile has no E2E latency data")
ErrProfileNoTTFT = fmt.Errorf("profile has no TTFT data")
ErrProfileNoTPOT = fmt.Errorf("profile has no TPOT data")
)
var NewRequestTrace = newRequestTraceGen(nil)
Functions ¶
func InitWithAsyncPods ¶ added in v0.4.0
func InitWithAsyncPods(st *Store, pods []*v1.Pod, model string) <-chan *Store
InitWithAsyncPods initializes the cache store with pods asynchronously, simulating the timeline of how the store initializes.
func ModelGPUProfileKey ¶ added in v0.4.0
func ModelGPUProfileKey(modelName string, deploymentName string) string
Types ¶
type Cache ¶
type Cache interface {
PodCache
ModelCache
MetricCache
RequestTracker
ProfileCache
types.OutputPredictorProvider
types.RouterProvider
}
Cache is the root interface that aggregates all caching functionality.
type CacheError ¶ added in v0.4.0
type CacheError struct {
// contains filtered or unexported fields
}
func (*CacheError) ErrorType ¶ added in v0.4.0
func (e *CacheError) ErrorType() error
type CachedLoadProvider ¶ added in v0.4.0
type CachedLoadProvider struct {
// contains filtered or unexported fields
}
CachedLoadProvider reads metrics from the cache
func NewCachedLoadProvider ¶ added in v0.4.0
func NewCachedLoadProvider(metricName string) (*CachedLoadProvider, error)
func (*CachedLoadProvider) Cache ¶ added in v0.4.0
func (p *CachedLoadProvider) Cache() Cache
func (*CachedLoadProvider) GetConsumption ¶ added in v0.4.0
func (p *CachedLoadProvider) GetConsumption(ctx *types.RoutingContext, pod *v1.Pod) (float64, error)
func (*CachedLoadProvider) GetUtilization ¶ added in v0.4.0
func (p *CachedLoadProvider) GetUtilization(ctx *types.RoutingContext, pod *v1.Pod) (float64, error)
type CappedLoadProvider ¶ added in v0.4.0
type CappedLoadProvider interface {
LoadProvider
// Cap returns the capacity of the pod in terms of the metrics
Cap() float64
}
CappedLoadProvider provides an abstraction to get the capacity of specified metrics.
type ColdPredictionStrategy ¶ added in v0.4.0
type ColdPredictionStrategy int
ColdPredictionStrategy defines the strategy when there is no history for the predictor.
const (
// OptimisticColdPrediction predicts an output length of 1, the minimum, to be profile-friendly (most profiles give the best results when the output length is minimal).
OptimisticColdPrediction ColdPredictionStrategy = iota
// RandomColdPredition randomly predicts the output between 0 and maxOutputTokens.
RandomColdPredition
// InputColdPrediction predicts the output to be the same as the input.
InputColdPrediction
// PessimiticColdPrediction predicts the output to be maxOutputTokens, the maximum.
PessimiticColdPrediction
)
type Error ¶ added in v0.4.0
type Error interface {
// ErrorType returns the type of the error.
ErrorType() error
}
Error supports error-type detection and structured error information.
type InitOptions ¶ added in v0.4.0
type InitOptions struct {
// EnableKVSync configures whether to start the ZMQ KV event sync
EnableKVSync bool
// RedisClient is required for KVSync and other features. Can be nil.
RedisClient *redis.Client
// ModelRouterProvider is needed only by the gateway. Can be nil.
ModelRouterProvider ModelRouterProviderFunc
}
InitOptions configures the cache initialization behavior
type KVEventManager ¶ added in v0.4.0
type KVEventManager struct {
// contains filtered or unexported fields
}
KVEventManager is a stub implementation used when ZMQ is not available.
func NewKVEventManager ¶ added in v0.4.0
func NewKVEventManager(store *Store) *KVEventManager
NewKVEventManager creates a stub KV event manager
func (*KVEventManager) OnPodAdd ¶ added in v0.4.0
func (m *KVEventManager) OnPodAdd(pod *v1.Pod)
OnPodAdd is a no-op
func (*KVEventManager) OnPodDelete ¶ added in v0.4.0
func (m *KVEventManager) OnPodDelete(pod *v1.Pod)
OnPodDelete is a no-op
func (*KVEventManager) OnPodUpdate ¶ added in v0.4.0
func (m *KVEventManager) OnPodUpdate(oldPod, newPod *v1.Pod)
OnPodUpdate is a no-op
func (*KVEventManager) Start ¶ added in v0.4.0
func (m *KVEventManager) Start() error
Start is a no-op
type LoadProvider ¶ added in v0.4.0
type LoadProvider interface {
// GetUtilization reads utilization of the pod in terms of the metrics
GetUtilization(ctx *types.RoutingContext, pod *v1.Pod) (float64, error)
// GetConsumption reads the load consumption of the request on the specified pod.
// If ErrorSLOFailureRequest is returned, the request is considered unable to be served within server capacity,
// so its SLO is expected to be violated and the request can be rejected early.
GetConsumption(ctx *types.RoutingContext, pod *v1.Pod) (float64, error)
}
LoadProvider provides an abstraction for reading utilization in terms of the specified metrics.
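As an illustration of how GetUtilization, GetConsumption and Cap could be combined, the sketch below picks the pod with the lowest projected load that stays within capacity and skips pods that report an SLO failure. The simplified interface (pods as names, no routing context), the toy `staticLoad` provider and the `pickPod` policy are all hypothetical and are not the package's router.

```go
package main

import (
	"errors"
	"fmt"
)

// errSLOFailureRequest stands in for ErrorSLOFailureRequest.
var errSLOFailureRequest = errors.New("slo failure request")

// cappedLoad is a simplified mirror of CappedLoadProvider; the real methods
// also take *types.RoutingContext and *v1.Pod, here a pod is just a name.
type cappedLoad interface {
	GetUtilization(pod string) (float64, error)
	GetConsumption(pod string) (float64, error)
	Cap() float64
}

// staticLoad is a toy provider backed by fixed numbers.
type staticLoad struct {
	util     map[string]float64
	cost     map[string]float64 // a missing entry means the request would miss its SLO
	capacity float64
}

func (s staticLoad) GetUtilization(pod string) (float64, error) { return s.util[pod], nil }

func (s staticLoad) GetConsumption(pod string) (float64, error) {
	c, ok := s.cost[pod]
	if !ok {
		return 0, errSLOFailureRequest
	}
	return c, nil
}

func (s staticLoad) Cap() float64 { return s.capacity }

// pickPod picks the pod whose projected load (utilization plus this request's
// consumption) is lowest while staying within Cap(). Pods that return an
// error, including errSLOFailureRequest, are skipped.
func pickPod(p cappedLoad, pods []string) (string, error) {
	best, bestLoad := "", 0.0
	for _, pod := range pods {
		util, err := p.GetUtilization(pod)
		if err != nil {
			continue
		}
		cost, err := p.GetConsumption(pod)
		if err != nil {
			continue // errSLOFailureRequest: reject this pod early
		}
		if load := util + cost; load <= p.Cap() && (best == "" || load < bestLoad) {
			best, bestLoad = pod, load
		}
	}
	if best == "" {
		return "", errors.New("load capacity reached")
	}
	return best, nil
}

func main() {
	p := staticLoad{
		util:     map[string]float64{"pod-a": 0.7, "pod-b": 0.4, "pod-c": 0.1},
		cost:     map[string]float64{"pod-a": 0.2, "pod-b": 0.2},
		capacity: 1.0,
	}
	fmt.Println(pickPod(p, []string{"pod-a", "pod-b", "pod-c"}))
}
```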
type MetricCache ¶
type MetricCache interface {
// GetMetricValueByPod gets metric value for a pod
// Parameters:
// podName: Name of the pod
// podNamespace: Namespace of the pod
// metricName: Name of the metric
// Returns:
// metrics.MetricValue: Retrieved metric value
// error: Error information if operation fails
GetMetricValueByPod(podName, podNamespace, metricName string) (metrics.MetricValue, error)
// GetMetricValueByPodModel gets metric value for pod-model pair
// Parameters:
// podName: Name of the pod
// podNamespace: Namespace of the pod
// modelName: Name of the model
// metricName: Name of the metric
// Returns:
// metrics.MetricValue: Retrieved metric value
// error: Error information if operation fails
GetMetricValueByPodModel(podName, podNamespace, modelName string, metricName string) (metrics.MetricValue, error)
// AddSubscriber adds a metric subscriber
// Parameters:
// subscriber: Metric subscriber implementation
AddSubscriber(subscriber metrics.MetricSubscriber)
}
MetricCache defines operations for metric data caching
type MetricNotFoundError ¶ added in v0.4.0
type MetricNotFoundError struct {
*CacheError
PodName string
MetricName string
}
type MissingProfileError ¶ added in v0.4.0
type MissingProfileError struct {
*CacheError
ProfileKey string
}
type Model ¶
type Model struct {
// Pods is a CustomizedRegistry that stores *v1.Pod objects.
// The internal map uses `namespace/name` as the key and `*v1.Pod` as the value.
// This allows efficient lookups and caching of Pod objects by their unique identifier.
// By returning a utils.PodArray from Array(), the registry supports efficient filtering by
// GPU (by deployment-like names, see utils.DeploymentNameFromPod() in utils/pod.go).
Pods *utils.CustomizedRegistry[*v1.Pod, *utils.PodArray]
// OutputPredictor predicts the number of output tokens per request by history requests.
// The prediction is essential to SLO-aware features.
OutputPredictor types.OutputPredictor
// QueueRouter maintains a local request queue, enabling flexible request reordering.
QueueRouter types.QueueRouter
// contains filtered or unexported fields
}
type ModelCache ¶
type ModelCache interface {
// HasModel checks existence of a model
// Parameters:
// modelName: Name of the model
// Returns:
// bool: True if model exists, false otherwise
HasModel(modelName string) bool
// ListModels gets all model names
// Returns:
// []string: List of model names
ListModels() []string
// ListModelsByPod gets models associated with a pod
// Parameters:
// podName: Name of the pod
// podNamespace: Namespace of the pod
// Returns:
// []string: List of model names
// error: Error information if operation fails
ListModelsByPod(podName, podNamespace string) ([]string, error)
}
ModelCache defines operations for model information caching
type ModelGPUProfile ¶ added in v0.4.0
type ModelGPUProfile struct {
Deployment string `json:"gpu"` // k8s deployment that specified model and GPU information.
Cost float64 `json:"cost"` // Dollar cost of the unit time GPU computing.
Tputs [][]float64 `json:"tputs"` // Max RPS per corresponding index.
Indexes [][]float64 `json:"indexes"` // [output tokens, input tokens]
Created float64 `json:"created"` // Profile generation timestamp in Unix format.
E2E [][]float64 `json:"e2e"` // Mean E2E latency per corresponding RPS.
TTFT [][]float64 `json:"ttft"` // Mean TTFT per corresponding RPS.
TPOT [][]float64 `json:"tpot"` // Mean TPOT.
SLOs ModelSLOs `json:"slos"` // SLOs used for specified model and GPU.
}
func (*ModelGPUProfile) GetSignature ¶ added in v0.4.0
func (pf *ModelGPUProfile) GetSignature(features ...float64) ([]int, error)
func (*ModelGPUProfile) LatencySeconds ¶ added in v0.4.0
func (pf *ModelGPUProfile) LatencySeconds(signature ...int) (float64, error)
func (*ModelGPUProfile) TPOTSeconds ¶ added in v0.4.0
func (pf *ModelGPUProfile) TPOTSeconds(signature ...int) (float64, error)
func (*ModelGPUProfile) TTFTSeconds ¶ added in v0.4.0
func (pf *ModelGPUProfile) TTFTSeconds(signature ...int) (float64, error)
func (*ModelGPUProfile) ThroughputRPS ¶ added in v0.4.0
func (pf *ModelGPUProfile) ThroughputRPS(signature ...int) (float64, error)
func (*ModelGPUProfile) Unmarshal ¶ added in v0.4.0
func (pf *ModelGPUProfile) Unmarshal(data []byte) error
type ModelRouterProviderFunc ¶ added in v0.4.0
type ModelRouterProviderFunc func(modelName string) (types.QueueRouter, error)
ModelRouterProviderFunc defines the function that provides a per-model router.
type ModelSLOs ¶ added in v0.4.0
type ModelSLOs struct {
Percentile int `json:"percentile"` // Percentile applied to SLO metric(s).
TPUT float64 `json:"tput"` // Request Throughput: RPS
TT float64 `json:"tt"` // Token Throughput
E2E float64 `json:"e2e"` // End-to-end latency
TTFT float64 `json:"ttft"` // Time to first token
TPAT float64 `json:"tpat"` // Time per all tokens (suggests normalized E2E latency for different workloads)
TPOT float64 `json:"tpot"` // Time per output token
}
type PendingLoadProvider ¶ added in v0.4.0
type PendingLoadProvider struct {
*CachedLoadProvider
}
PendingLoadProvider estimates server utilization in terms of pending load: PendingLoad = 1 / PendingRequests = 1 / (Throughput * Latency), where PendingRequests = Throughput * Latency follows Little's Law, and Throughput (RPS) and Latency come from the loaded profile for each feature (input tokens, output tokens).
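The arithmetic above can be worked through in a few lines. `pendingLoad` is a hypothetical helper; in the package, throughput and latency would presumably come from profile lookups such as ThroughputRPS and LatencySeconds, while here they are plain inputs.

```go
package main

import (
	"errors"
	"fmt"
)

// pendingLoad applies Little's Law: PendingRequests = Throughput * Latency,
// and each request contributes PendingLoad = 1 / PendingRequests.
func pendingLoad(throughputRPS, latencySeconds float64) (float64, error) {
	pending := throughputRPS * latencySeconds
	if pending <= 0 {
		return 0, errors.New("profile yields no pending capacity")
	}
	return 1 / pending, nil
}

func main() {
	// 8 RPS sustained at 2.5 s mean latency keeps 20 requests in flight,
	// so in this reading each request contributes 1/20 = 0.05 of the load.
	load, err := pendingLoad(8, 2.5)
	fmt.Println(load, err)
}
```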
func NewPendingLoadProvider ¶ added in v0.4.0
func NewPendingLoadProvider() (*PendingLoadProvider, error)
func (*PendingLoadProvider) Cap ¶ added in v0.4.0
func (p *PendingLoadProvider) Cap() float64
func (*PendingLoadProvider) GetConsumption ¶ added in v0.4.0
func (p *PendingLoadProvider) GetConsumption(ctx *types.RoutingContext, pod *v1.Pod) (float64, error)
func (*PendingLoadProvider) GetUtilization ¶ added in v0.4.0
func (p *PendingLoadProvider) GetUtilization(ctx *types.RoutingContext, pod *v1.Pod) (float64, error)
type Pod ¶
type Pod struct {
*v1.Pod
Models *utils.Registry[string] // Model/adapter names that the pod is running
Metrics utils.SyncMap[string, metrics.MetricValue] // Pod metrics (metric_name -> value)
ModelMetrics utils.SyncMap[string, metrics.MetricValue] // Pod-model metrics (model_name/metric_name -> value)
// contains filtered or unexported fields
}
func (*Pod) CanLogPodTrace ¶ added in v0.4.0
type PodCache ¶
type PodCache interface {
// GetPod retrieves a Pod object by name
// Parameters:
// podName: Name of the pod
// podNamespace: Namespace of the pod
// Returns:
// *v1.Pod: Found pod object
// error: Error information if operation fails
GetPod(podName, podNamespace string) (*v1.Pod, error)
// ListPodsByModel gets pods associated with a model
// Parameters:
// modelName: Name of the model
// Returns:
// types.PodList: Pods associated with the model
// error: Error information if operation fails
ListPodsByModel(modelName string) (types.PodList, error)
}
PodCache defines operations for pod information caching
type ProfileCache ¶ added in v0.4.0
type ProfileCache interface {
// GetModelProfileByPod gets model profile for a pod
// Parameters:
// pod: Pod object
// modelName: Name of the model
GetModelProfileByPod(pod *v1.Pod, modelName string) (*ModelGPUProfile, error)
// GetModelProfileByDeploymentName gets model profile for a deployment
// Parameters:
// deploymentName: Name of the deployment
// modelName: Name of the model
GetModelProfileByDeploymentName(deploymentName string, modelName string) (*ModelGPUProfile, error)
}
ProfileCache defines operations for model profiles
type RequestTrace ¶
type RequestTrace struct {
// contains filtered or unexported fields
}
func (*RequestTrace) AddRequest ¶
func (t *RequestTrace) AddRequest(requestID string, key string) (int64, bool)
AddRequest increments the request count and returns the trace term. key is currently ignored.
func (*RequestTrace) AddRequestTrace ¶
func (t *RequestTrace) AddRequestTrace(requestID string, inputTokens, outputTokens int64, key string) (string, bool)
AddRequestTrace adds a request trace profile. key must be provided and is not validated.
func (*RequestTrace) DoneRequest ¶
func (t *RequestTrace) DoneRequest(requestID string, term int64)
DoneRequest decrements the request count after verifying the term; retrying is futile.
func (*RequestTrace) DoneRequestTrace ¶
func (t *RequestTrace) DoneRequestTrace(requestID string, inputTokens, outputTokens int64, key string, term int64) (string, bool)
DoneRequestTrace decrements the request count and adds a request trace profile.
func (*RequestTrace) Lock ¶
func (t *RequestTrace) Lock()
func (*RequestTrace) Recycle ¶
func (t *RequestTrace) Recycle()
func (*RequestTrace) RecycleLocked ¶
func (t *RequestTrace) RecycleLocked()
func (*RequestTrace) ToMap ¶
func (t *RequestTrace) ToMap(total_pending, queueing int) map[string]int
func (*RequestTrace) ToMapLocked ¶
func (t *RequestTrace) ToMapLocked(total_pending, queueing int) map[string]int
func (*RequestTrace) Unlock ¶
func (t *RequestTrace) Unlock()
type RequestTraceMetaKey ¶
type RequestTraceMetaKey int
const (
MetaKeyVersionKey RequestTraceMetaKey = iota
MetaKeyIntervalInSeconds
MetaKeyTracePrecision
MetaKeyTotalRequests
MetaKeyPendingRequests
MetaKeyQueueingRequests
RequestTraceNumMetaKeys // Guardian for the number of RequestTraceMetaKey. This is not an actual meta key.
)
func (RequestTraceMetaKey) ToString ¶
func (key RequestTraceMetaKey) ToString() string
type RequestTracker ¶
type RequestTracker interface {
// AddRequestCount tracks the start of a request after routing.
// To support realtime statistics update and access, AddRequestCount can be called multiple times for a request.
// As a result, implementations should ensure thread-safe access to the counter and idempotency.
//
// Parameters:
// ctx: Routing context
// requestID: Unique request identifier
// modelName: Name of the model
// Returns:
// int64: Trace term identifier
AddRequestCount(ctx *types.RoutingContext, requestID string, modelName string) (traceTerm int64)
// DoneRequestCount tracks the completion of a request without usage information like inputTokens and outputTokens.
// Only one DoneRequestXXX should be called for a request. Idempotency is not required.
//
// Parameters:
// ctx: Routing context
// requestID: Unique request identifier
// modelName: Name of the model
// traceTerm: Trace term identifier
DoneRequestCount(ctx *types.RoutingContext, requestID string, modelName string, traceTerm int64)
// DoneRequestTrace tracks the completion of a request with usage information like inputTokens and outputTokens.
// Only one DoneRequestXXX should be called for a request. Idempotency is not required.
//
// Parameters:
// ctx: Routing context
// requestID: Unique request identifier
// modelName: Name of the model
// inputTokens: Number of input tokens
// outputTokens: Number of output tokens
// traceTerm: Trace term identifier
DoneRequestTrace(ctx *types.RoutingContext, requestID string, modelName string, inputTokens, outputTokens, traceTerm int64)
}
RequestTracker defines operations for tracking workload statistics.
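The request lifecycle in RequestTracker (possibly repeated AddRequestCount calls, then exactly one DoneRequestXXX carrying the trace term) can be sketched with a small in-memory tracker. `memTracker`, its `Recycle` and `Pending` methods, and the rule that a stale term is ignored are hypothetical, modeled loosely on the "term verification" described for RequestTrace.DoneRequest; the real Store keeps its counters per model and per trace window.

```go
package main

import (
	"fmt"
	"sync"
)

// memTracker is a hypothetical in-memory tracker: AddRequestCount may be
// called repeatedly for one request but counts it once, and a single
// DoneRequestCount releases it. A done call carrying a stale term (from
// before Recycle reset the counters) is ignored.
type memTracker struct {
	mu      sync.Mutex
	term    int64
	pending map[string]bool // request IDs counted in the current term
}

func newMemTracker() *memTracker {
	return &memTracker{pending: make(map[string]bool)}
}

// AddRequestCount is idempotent per request ID and returns the current term.
func (t *memTracker) AddRequestCount(requestID string) int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.pending[requestID] = true
	return t.term
}

// DoneRequestCount releases a request only if its term is still current.
func (t *memTracker) DoneRequestCount(requestID string, term int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if term != t.term {
		return // counters were reset since this request was added
	}
	delete(t.pending, requestID)
}

// Recycle starts a new term and clears the counters.
func (t *memTracker) Recycle() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.term++
	t.pending = make(map[string]bool)
}

// Pending returns the number of requests still in flight in this term.
func (t *memTracker) Pending() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.pending)
}

func main() {
	tr := newMemTracker()
	term := tr.AddRequestCount("req-1")
	tr.AddRequestCount("req-1") // repeated call for the same request
	tr.AddRequestCount("req-2")
	tr.DoneRequestCount("req-1", term)
	fmt.Println(tr.Pending())
}
```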
type SimpleOutputPredictor ¶ added in v0.4.0
type SimpleOutputPredictor struct {
// contains filtered or unexported fields
}
SimpleOutputPredictor maintains a moving histogram of the output tokens of completed requests for each input-token bucket, and uses weighted random selection to predict the output tokens of a specific request. Usage:
1. Call NewSimpleOutputPredictor() with the maximum input and output token estimates and the window size.
2. Call AddTrace() to record observed output tokens. Output tokens are categorized into the input-token bucket round(log2(input tokens)).
3. Call Predict() to get a prediction of the number of output tokens for a given number of input tokens.
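The approach above can be sketched as a per-bucket histogram with frequency-weighted sampling. `toyOutputPredictor` is hypothetical and deliberately simplified: it omits the moving time window and the max input/output bounds of the real SimpleOutputPredictor, and returning 1 for an empty bucket mirrors OptimisticColdPrediction by assumption.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
)

// toyOutputPredictor histograms output-token counts per input bucket
// round(log2(inputTokens)) and predicts by frequency-weighted sampling.
type toyOutputPredictor struct {
	hist map[int]map[int]int32 // input bucket -> output tokens -> count
	rng  *rand.Rand
}

func newToyOutputPredictor(seed int64) *toyOutputPredictor {
	return &toyOutputPredictor{hist: make(map[int]map[int]int32), rng: rand.New(rand.NewSource(seed))}
}

// inputBucket returns round(log2(inputTokens)), treating counts below 1 as 1.
func inputBucket(inputTokens int) int {
	if inputTokens < 1 {
		inputTokens = 1
	}
	return int(math.Round(math.Log2(float64(inputTokens))))
}

// AddTrace records cnt observations of outputTokens for the input's bucket.
func (p *toyOutputPredictor) AddTrace(inputTokens, outputTokens int, cnt int32) {
	b := inputBucket(inputTokens)
	if p.hist[b] == nil {
		p.hist[b] = make(map[int]int32)
	}
	p.hist[b][outputTokens] += cnt
}

// Predict samples an output length with probability proportional to its
// count in the bucket; it returns 1 when the bucket has no history.
func (p *toyOutputPredictor) Predict(inputTokens int) int {
	buckets := p.hist[inputBucket(inputTokens)]
	var total int32
	for _, c := range buckets {
		total += c
	}
	if total == 0 {
		return 1
	}
	r := p.rng.Int31n(total)
	for out, c := range buckets { // map order varies, but each entry still covers c of total
		if r < c {
			return out
		}
		r -= c
	}
	return 1 // unreachable when counts are consistent
}

func main() {
	p := newToyOutputPredictor(1)
	p.AddTrace(500, 120, 3)
	p.AddTrace(500, 400, 1)
	fmt.Println(p.Predict(512)) // 120 about 75% of the time, otherwise 400
}
```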
func NewSimpleOutputPredictor ¶ added in v0.4.0
func NewSimpleOutputPredictor(maxInputTokens, maxOutputTokens int, window time.Duration) *SimpleOutputPredictor
func (*SimpleOutputPredictor) AddTrace ¶ added in v0.4.0
func (p *SimpleOutputPredictor) AddTrace(inputTokens, outputTokens int, cnt int32)
func (*SimpleOutputPredictor) AddTraceWithTimestamp ¶ added in v0.4.0
func (p *SimpleOutputPredictor) AddTraceWithTimestamp(inputTokens, outputTokens int, cnt int32, ts time.Time)
func (*SimpleOutputPredictor) Predict ¶ added in v0.4.0
func (p *SimpleOutputPredictor) Predict(inputTokens int) int
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store contains core data structures and components of the caching system
func InitForTest ¶
func InitForTest() *Store
InitForTest initializes the global store object for testing.
func InitWithModelRouterProvider ¶ added in v0.4.0
func InitWithModelRouterProvider(st *Store, modelRouterProvider ModelRouterProviderFunc) *Store
InitWithModelRouterProvider initializes the cache store with a model router provider for testing purposes; it can be called repeatedly to reset. Call this function before InitWithPods for the expected behavior.
func InitWithOptions ¶ added in v0.4.0
func InitWithOptions(config *rest.Config, stopCh <-chan struct{}, opts InitOptions) *Store
InitWithOptions initializes the cache store with configurable behavior.
Parameters:
config: Kubernetes configuration
stopCh: Stop signal channel
opts: Configuration options for initialization
Returns:
*Store: Pointer to initialized store instance
func InitWithPods ¶ added in v0.4.0
func InitWithPods(st *Store, pods []*v1.Pod, model string) *Store
InitWithPods initializes the cache store with pods for testing purposes; it can be called repeatedly to reset.
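func InitWithPodsMetrics ¶ added in v0.4.0
func InitWithPodsMetrics(st *Store, podMetrics map[string]map[string]metrics.MetricValue) *Store
InitWithPodsMetrics initializes the cache store with pod metrics for testing purposes; it can be called repeatedly to reset.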
func InitWithProfileCache ¶ added in v0.4.0
func InitWithProfileCache(st *Store) *Store
InitWithProfileCache initializes the cache store with a profile cache.
func InitWithRequestTrace ¶ added in v0.4.0
func InitWithRequestTrace(st *Store) *Store
InitWithRequestTrace initializes the cache store with request tracing.
func New ¶
func New(redisClient *redis.Client, prometheusApi prometheusv1.API, modelRouterProvider ModelRouterProviderFunc) *Store
New creates a new cache store instance.
Parameters:
redisClient: Redis client instance
prometheusApi: Prometheus API client
modelRouterProvider: Function that provides the per-model router
Returns:
*Store: Initialized cache store instance
func NewForTest ¶ added in v0.4.0
func NewForTest() *Store
NewForTest initializes the cache store for testing purposes; it can be called repeatedly to reset.
func NewWithPodsForTest ¶ added in v0.4.0
func NewWithPodsForTest(pods []*v1.Pod, model string) *Store
func NewWithPodsMetricsForTest ¶ added in v0.4.0
func (*Store) AddRequestCount ¶
func (c *Store) AddRequestCount(ctx *types.RoutingContext, requestID string, modelName string) (traceTerm int64)
AddRequestCount tracks new request initiation. If ctx is provided, AddRequestCount can be called multiple times for the same request.
Parameters:
ctx: Routing context
requestID: Unique request identifier
modelName: Model handling the request
Returns:
int64: Trace term identifier
func (*Store) AddSubscriber ¶
func (c *Store) AddSubscriber(subscriber metrics.MetricSubscriber)
AddSubscriber registers a new metric subscriber.
Parameters:
subscriber: Metric subscriber implementation
func (*Store) Close ¶ added in v0.4.0
func (s *Store) Close()
Close gracefully shuts down the cache store
func (*Store) DoneRequestCount ¶
func (c *Store) DoneRequestCount(ctx *types.RoutingContext, requestID string, modelName string, traceTerm int64)
DoneRequestCount completes request tracking.
Parameters:
ctx: Routing context
requestID: Unique request identifier
modelName: Model handling the request
traceTerm: Trace term identifier
func (*Store) DoneRequestTrace ¶
func (c *Store) DoneRequestTrace(ctx *types.RoutingContext, requestID string, modelName string, inputTokens, outputTokens, traceTerm int64)
DoneRequestTrace completes request tracing.
Parameters:
ctx: Routing context
requestID: Unique request identifier
modelName: Model handling the request
inputTokens: Input tokens count
outputTokens: Output tokens count
traceTerm: Trace term identifier
func (*Store) DumpRequestTrace ¶ added in v0.4.0
func (c *Store) DumpRequestTrace(modelName string) map[string]int
func (*Store) GetMetricValueByPod ¶
func (c *Store) GetMetricValueByPod(podName, podNamespace, metricName string) (metrics.MetricValue, error)
GetMetricValueByPod retrieves the metric value for a Pod.
Parameters:
podName: Name of the Pod
podNamespace: Namespace of the Pod
metricName: Name of the metric
Returns:
metrics.MetricValue: The metric value
error: Error if Pod or metric doesn't exist
func (*Store) GetMetricValueByPodModel ¶
func (c *Store) GetMetricValueByPodModel(podName, podNamespace, modelName string, metricName string) (metrics.MetricValue, error)
GetMetricValueByPodModel retrieves the metric value for a Pod-Model combination.
Parameters:
podName: Name of the Pod
podNamespace: Namespace of the Pod
modelName: Name of the model
metricName: Name of the metric
Returns:
metrics.MetricValue: The metric value
error: Error if Pod, model or metric doesn't exist
func (*Store) GetModelProfileByDeploymentName ¶ added in v0.4.0
func (c *Store) GetModelProfileByDeploymentName(deploymentName string, modelName string) (*ModelGPUProfile, error)
func (*Store) GetModelProfileByPod ¶ added in v0.4.0
func (c *Store) GetModelProfileByPod(pod *v1.Pod, modelName string) (*ModelGPUProfile, error)
func (*Store) GetOutputPredictor ¶ added in v0.4.0
func (c *Store) GetOutputPredictor(modelName string) (types.OutputPredictor, error)
func (*Store) GetPod ¶
func (c *Store) GetPod(podName, podNamespace string) (*v1.Pod, error)
GetPod retrieves a Pod object by name from the cache.
Parameters:
podName: Name of the pod to retrieve
podNamespace: Namespace of the pod to retrieve
Returns:
*v1.Pod: The found Pod object
error: Error if pod doesn't exist
func (*Store) GetSyncPrefixIndexer ¶ added in v0.4.0
func (s *Store) GetSyncPrefixIndexer() *syncindexer.SyncPrefixHashTable
GetSyncPrefixIndexer returns the sync prefix hash indexer
func (*Store) HasModel ¶
func (c *Store) HasModel(modelName string) bool
HasModel checks if a model exists in the cache.
Parameters:
modelName: Name of the model to check
Returns:
bool: True if model exists
func (*Store) ListModels ¶
func (c *Store) ListModels() []string
ListModels returns all cached model names.
Returns:
[]string: Slice of model names
func (*Store) ListModelsByPod ¶
func (c *Store) ListModelsByPod(podName, podNamespace string) ([]string, error)
ListModelsByPod gets models associated with a specific Pod.
Parameters:
podName: Name of the Pod to query
podNamespace: Namespace of the Pod to query
Returns:
[]string: Slice of model names
error: Error if Pod doesn't exist
func (*Store) ListPods ¶
func (c *Store) ListPods() []*v1.Pod
ListPods returns all cached Pod objects. It is intended for debugging and is less efficient, so avoid calling it directly.
Returns:
[]*v1.Pod: Slice of Pod objects
func (*Store) ListPodsByModel ¶
func (c *Store) ListPodsByModel(modelName string) (types.PodList, error)
ListPodsByModel gets Pods associated with a specific model.
Parameters:
modelName: Name of the model to query
Returns:
types.PodList: PodArray wrapper for a slice of Pod objects
error: Error if model doesn't exist
func (*Store) UpdateModelProfile ¶ added in v0.4.0
func (c *Store) UpdateModelProfile(key string, profile *ModelGPUProfile, force bool)