Documentation
¶
Index ¶
- Constants
- func EncodeEventBatch(batch *EventBatch) ([]byte, error)
- func InitializeMetrics() error
- func ValidateConfig(config *ZMQClientConfig) error
- type AllBlocksClearedEvent
- type BlockRemovedEvent
- type BlockStoredEvent
- type EventBatch
- type EventHandler
- type EventType
- type KVCacheMetrics
- type KVEvent
- type ZMQClient
- func (c *ZMQClient) Close()
- func (c *ZMQClient) Connect() error
- func (c *ZMQClient) GetStats() (connected bool, msgsReceived uint64)
- func (c *ZMQClient) Reconnect() error
- func (c *ZMQClient) ReplayFrom(timestamp int64) error
- func (c *ZMQClient) SetLogLevel(level string)
- func (c *ZMQClient) Start() error
- func (c *ZMQClient) Stop()
- func (c *ZMQClient) Subscribe(handler EventHandler) error
- func (c *ZMQClient) WaitForConnection(timeout time.Duration) error
- type ZMQClientConfig
- type ZMQClientMetrics
- func (m *ZMQClientMetrics) Delete()
- func (m *ZMQClientMetrics) IncrementConnectionCount()
- func (m *ZMQClientMetrics) IncrementDisconnectionCount()
- func (m *ZMQClientMetrics) IncrementErrorCount(errorType string)
- func (m *ZMQClientMetrics) IncrementEventCount(eventType string)
- func (m *ZMQClientMetrics) IncrementMissedEvents(count int64)
- func (m *ZMQClientMetrics) IncrementReconnectAttempts()
- func (m *ZMQClientMetrics) IncrementReplayCount()
- func (m *ZMQClientMetrics) IncrementReplayFailure()
- func (m *ZMQClientMetrics) IncrementReplaySuccess()
- func (m *ZMQClientMetrics) RecordEventProcessingLatency(duration time.Duration)
- func (m *ZMQClientMetrics) UpdateLastSequenceID(seqID int64)
Constants ¶
const (
	LabelPodKey    = "pod_key"
	LabelEventType = "event_type"
	LabelErrorType = "error_type"
)
Metric labels
const (
	// Default ZMQ ports
	DefaultPubPort    = 5557
	DefaultRouterPort = 5558

	// Timeouts and intervals
	DefaultPollTimeout       = 100 * time.Millisecond
	DefaultReplayTimeout     = 5 * time.Second
	DefaultReconnectInterval = 1 * time.Second
	MaxReconnectInterval     = 30 * time.Second
	ReconnectBackoffFactor   = 2.0

	// Buffer sizes
	EventChannelBufferSize = 1000
)
Constants for ZMQ client configuration
Variables ¶
This section is empty.
Functions ¶
func EncodeEventBatch ¶
func EncodeEventBatch(batch *EventBatch) ([]byte, error)
EncodeEventBatch encodes an event batch to MessagePack format
func InitializeMetrics ¶
func InitializeMetrics() error
InitializeMetrics initializes KV cache metrics if not already done. This should be called when KV event sync is enabled.
func ValidateConfig ¶
func ValidateConfig(config *ZMQClientConfig) error
ValidateConfig validates the ZMQ client configuration
Types ¶
type AllBlocksClearedEvent ¶
type AllBlocksClearedEvent struct {
Type EventType `msgpack:"type"`
Timestamp time.Time `msgpack:"timestamp"`
ModelName string `msgpack:"model_name"`
PodName string `msgpack:"-"` // Set by subscriber
}
AllBlocksClearedEvent represents all blocks being cleared
func (*AllBlocksClearedEvent) GetTimestamp ¶
func (e *AllBlocksClearedEvent) GetTimestamp() time.Time
GetTimestamp returns the event timestamp
func (*AllBlocksClearedEvent) GetType ¶
func (e *AllBlocksClearedEvent) GetType() EventType
GetType returns the event type
type BlockRemovedEvent ¶
type BlockRemovedEvent struct {
Type EventType `msgpack:"type"`
Timestamp time.Time `msgpack:"timestamp"`
BlockHashes []int64 `msgpack:"block_hashes"`
ModelName string `msgpack:"model_name"`
PodName string `msgpack:"-"` // Set by subscriber
}
BlockRemovedEvent represents blocks being removed from KV cache
func (*BlockRemovedEvent) GetTimestamp ¶
func (e *BlockRemovedEvent) GetTimestamp() time.Time
GetTimestamp returns the event timestamp
func (*BlockRemovedEvent) GetType ¶
func (e *BlockRemovedEvent) GetType() EventType
GetType returns the event type
type BlockStoredEvent ¶
type BlockStoredEvent struct {
Type EventType `msgpack:"type"`
Timestamp time.Time `msgpack:"timestamp"`
BlockHashes []int64 `msgpack:"block_hashes"`
TokenIDs [][]int32 `msgpack:"token_ids"` // One array per block
ParentBlockHash *int64 `msgpack:"parent_block_hash,omitempty"` // Parent hash for chaining
ModelName string `msgpack:"model_name"`
PodName string `msgpack:"-"` // Set by subscriber
}
BlockStoredEvent represents blocks being stored in KV cache
func (*BlockStoredEvent) GetTimestamp ¶
func (e *BlockStoredEvent) GetTimestamp() time.Time
GetTimestamp returns the event timestamp
func (*BlockStoredEvent) GetType ¶
func (e *BlockStoredEvent) GetType() EventType
GetType returns the event type
type EventBatch ¶
type EventBatch struct {
Events []KVEvent `msgpack:"events"`
}
EventBatch represents a batch of events from vLLM
func DecodeEventBatch ¶
func DecodeEventBatch(data []byte) (*EventBatch, error)
DecodeEventBatch decodes a MessagePack-encoded event batch
type EventHandler ¶
EventHandler processes received KV events
type EventType ¶
type EventType string
EventType represents the type of KV cache event
const (
	// EventTypeBlockStored indicates that blocks have been stored in the KV cache
	EventTypeBlockStored EventType = "BLOCK_STORED"
	// EventTypeBlockRemoved indicates that blocks have been removed from the KV cache
	EventTypeBlockRemoved EventType = "BLOCK_REMOVED"
	// EventTypeAllCleared indicates that all blocks have been cleared from the cache
	EventTypeAllCleared EventType = "ALL_BLOCKS_CLEARED"
)
type KVCacheMetrics ¶
type KVCacheMetrics struct {
// contains filtered or unexported fields
}
KVCacheMetrics holds all KV cache metrics
type ZMQClient ¶
type ZMQClient struct {
// contains filtered or unexported fields
}
ZMQClient is a stub implementation used when ZMQ is not available
func NewZMQClient ¶
func NewZMQClient(config *ZMQClientConfig, handler EventHandler) *ZMQClient
NewZMQClient creates a stub ZMQ client
func (*ZMQClient) ReplayFrom ¶
ReplayFrom returns an error
func (*ZMQClient) SetLogLevel ¶
SetLogLevel is a no-op
func (*ZMQClient) Subscribe ¶
func (c *ZMQClient) Subscribe(handler EventHandler) error
Subscribe returns an error
type ZMQClientConfig ¶
type ZMQClientConfig struct {
PodKey string
PodIP string
ModelName string
PubPort int
RouterPort int
PollTimeout time.Duration
ReplayTimeout time.Duration
ReconnectDelay time.Duration
}
ZMQClientConfig contains configuration for the ZMQ client
func DefaultZMQClientConfig ¶
func DefaultZMQClientConfig(podKey, podIP, modelName string) *ZMQClientConfig
DefaultZMQClientConfig returns a default configuration
type ZMQClientMetrics ¶
type ZMQClientMetrics struct {
// contains filtered or unexported fields
}
ZMQClientMetrics holds all metrics for the ZMQ client
func NewZMQClientMetrics ¶
func NewZMQClientMetrics(podKey string) *ZMQClientMetrics
NewZMQClientMetrics creates a new metrics instance for a ZMQ client
func (*ZMQClientMetrics) Delete ¶
func (m *ZMQClientMetrics) Delete()
Delete removes all metrics for this pod (useful for cleanup)
func (*ZMQClientMetrics) IncrementConnectionCount ¶
func (m *ZMQClientMetrics) IncrementConnectionCount()
IncrementConnectionCount increments the connection counter
func (*ZMQClientMetrics) IncrementDisconnectionCount ¶
func (m *ZMQClientMetrics) IncrementDisconnectionCount()
IncrementDisconnectionCount increments the disconnection counter
func (*ZMQClientMetrics) IncrementErrorCount ¶
func (m *ZMQClientMetrics) IncrementErrorCount(errorType string)
IncrementErrorCount increments the error counter for a specific error type
func (*ZMQClientMetrics) IncrementEventCount ¶
func (m *ZMQClientMetrics) IncrementEventCount(eventType string)
IncrementEventCount increments the event counter for a specific event type
func (*ZMQClientMetrics) IncrementMissedEvents ¶
func (m *ZMQClientMetrics) IncrementMissedEvents(count int64)
IncrementMissedEvents increments the missed events counter
func (*ZMQClientMetrics) IncrementReconnectAttempts ¶
func (m *ZMQClientMetrics) IncrementReconnectAttempts()
IncrementReconnectAttempts increments the reconnect attempts counter
func (*ZMQClientMetrics) IncrementReplayCount ¶
func (m *ZMQClientMetrics) IncrementReplayCount()
IncrementReplayCount increments the replay request counter
func (*ZMQClientMetrics) IncrementReplayFailure ¶
func (m *ZMQClientMetrics) IncrementReplayFailure()
IncrementReplayFailure increments the failed replay counter
func (*ZMQClientMetrics) IncrementReplaySuccess ¶
func (m *ZMQClientMetrics) IncrementReplaySuccess()
IncrementReplaySuccess increments the successful replay counter
func (*ZMQClientMetrics) RecordEventProcessingLatency ¶
func (m *ZMQClientMetrics) RecordEventProcessingLatency(duration time.Duration)
RecordEventProcessingLatency records the time taken to process an event
func (*ZMQClientMetrics) UpdateLastSequenceID ¶
func (m *ZMQClientMetrics) UpdateLastSequenceID(seqID int64)
UpdateLastSequenceID updates the last processed sequence ID gauge