kvcache

package
v0.4.1
Warning

This package is not in the latest version of its module.

Published: Aug 19, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

README

KV Cache Package

This package provides ZMQ-based client functionality for synchronizing KV cache events between vLLM pods and the AIBrix cache system.

Overview

The kvcache package implements:

  • ZMQ client for subscribing to vLLM KV cache events
  • MessagePack encoding/decoding for efficient event serialization
  • Event type definitions for block storage operations
  • Metrics collection for monitoring

Components

ZMQ Client (zmq_client.go)

The ZMQ client connects to vLLM pods and subscribes to their KV cache event streams.

Key Features:

  • Automatic reconnection with exponential backoff
  • Event replay support for reliability
  • Sequence tracking to detect missed events
  • Configurable timeouts and buffer sizes
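
Sequence tracking can be sketched as follows. The README does not say how sequence numbers are carried on the wire. This sketch assumes that each published message has an int64 sequence number that increases by exactly one per message. seqTracker and observe are hypothetical names. The missed count is the kind of value that ZMQClientMetrics.IncrementMissedEvents(count int64) accepts.

```go
package main

import "fmt"

// seqTracker is a hypothetical helper (not part of the kvcache API) that
// remembers the highest sequence number seen from one publisher.
type seqTracker struct {
	last    int64
	started bool
}

// observe records seq and returns how many sequence numbers were skipped
// since the previous message. It assumes numbers increase by one per
// message; stale or duplicate numbers are ignored and report no gap.
func (t *seqTracker) observe(seq int64) int64 {
	if !t.started {
		t.started = true
		t.last = seq
		return 0
	}
	missed := int64(0)
	if seq > t.last+1 {
		missed = seq - t.last - 1
	}
	if seq > t.last {
		t.last = seq
	}
	return missed
}

func main() {
	var tr seqTracker
	for _, seq := range []int64{10, 11, 14, 15} {
		if missed := tr.observe(seq); missed > 0 {
			// A real client might request a replay here and report the gap
			// through its metrics.
			fmt.Printf("gap before %d: %d missed\n", seq, missed)
		}
	}
}
```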

Usage:

config := &ZMQClientConfig{
    PodKey:      "vllm-pod-1",
    PodIP:       "10.0.0.1",
    PubPort:     5557,
    RouterPort:  5558,
    PollTimeout: 100 * time.Millisecond,
}

handler := &MyEventHandler{} // your implementation of EventHandler
client := NewZMQClient(config, handler)

// Start the client; Start is an alias for Connect and returns an error
if err := client.Start(); err != nil {
    return err
}

// Stop when done
defer client.Stop()
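
The package does not provide MyEventHandler, which the usage above refers to. Any type with a HandleEvent(event KVEvent) error method satisfies EventHandler. Below is a minimal sketch of such a handler. It tracks which block hashes a pod currently holds.

To compile on its own, the sketch re-declares trimmed copies of the event types. The real structs also carry Type, ModelName and PodName, and stored events also carry TokenIDs and ParentBlockHash. MyEventHandler and its behavior are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed local mirrors of the package's event types so the sketch compiles
// on its own; the real structs carry more fields.
type EventType string

const (
	EventTypeBlockStored  EventType = "BLOCK_STORED"
	EventTypeBlockRemoved EventType = "BLOCK_REMOVED"
	EventTypeAllCleared   EventType = "ALL_BLOCKS_CLEARED"
)

type KVEvent interface {
	GetType() EventType
	GetTimestamp() time.Time
}

type BlockStoredEvent struct {
	Timestamp   time.Time
	BlockHashes []int64
}

type BlockRemovedEvent struct {
	Timestamp   time.Time
	BlockHashes []int64
}

type AllBlocksClearedEvent struct {
	Timestamp time.Time
}

func (e *BlockStoredEvent) GetType() EventType           { return EventTypeBlockStored }
func (e *BlockStoredEvent) GetTimestamp() time.Time      { return e.Timestamp }
func (e *BlockRemovedEvent) GetType() EventType          { return EventTypeBlockRemoved }
func (e *BlockRemovedEvent) GetTimestamp() time.Time     { return e.Timestamp }
func (e *AllBlocksClearedEvent) GetType() EventType      { return EventTypeAllCleared }
func (e *AllBlocksClearedEvent) GetTimestamp() time.Time { return e.Timestamp }

// MyEventHandler is a hypothetical EventHandler that keeps the set of block
// hashes a pod currently holds. It is not safe for concurrent use.
type MyEventHandler struct {
	blocks map[int64]struct{}
}

// HandleEvent applies one event to the block set, dispatching on its
// concrete type.
func (h *MyEventHandler) HandleEvent(event KVEvent) error {
	if h.blocks == nil {
		h.blocks = make(map[int64]struct{})
	}
	switch e := event.(type) {
	case *BlockStoredEvent:
		for _, hash := range e.BlockHashes {
			h.blocks[hash] = struct{}{}
		}
	case *BlockRemovedEvent:
		for _, hash := range e.BlockHashes {
			delete(h.blocks, hash)
		}
	case *AllBlocksClearedEvent:
		h.blocks = make(map[int64]struct{})
	default:
		return fmt.Errorf("unexpected event type %q", event.GetType())
	}
	return nil
}

func main() {
	h := &MyEventHandler{}
	_ = h.HandleEvent(&BlockStoredEvent{BlockHashes: []int64{1, 2, 3}})
	_ = h.HandleEvent(&BlockRemovedEvent{BlockHashes: []int64{2}})
	fmt.Println(len(h.blocks)) // 2
}
```

If the client can call HandleEvent from more than one goroutine, a real handler would need a mutex around the map. The README does not say whether that happens.
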

Event Types (event_types.go)

Defines the event types for KV cache operations:

  • BlockStoredEvent: Emitted when new KV cache blocks are stored
  • BlockRemovedEvent: Emitted when blocks are removed
  • AllBlocksClearedEvent: Emitted when all blocks are cleared

MessagePack Codec (msgpack_encoder.go, msgpack_decoder.go)

Provides efficient serialization for events:

Encoder:

data, err := EncodeEventBatch(batch) // batch is an *EventBatch

Decoder:

batch, err := DecodeEventBatch(data) // batch.Events holds the decoded KVEvent values
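
Because EventBatch.Events is a slice of the KVEvent interface, decoding has to map each record to its concrete struct. The package's actual decoder logic is not shown here. Below is a minimal sketch of one common approach: read the "type" field first, then decode the full record into the matching struct.

To stay runnable with the standard library alone, the sketch uses encoding/json as a stand-in for MessagePack. decodeEventBatch and the trimmed mirror types are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type EventType string

const (
	EventTypeBlockStored  EventType = "BLOCK_STORED"
	EventTypeBlockRemoved EventType = "BLOCK_REMOVED"
	EventTypeAllCleared   EventType = "ALL_BLOCKS_CLEARED"
)

// KVEvent is trimmed to GetType for this sketch.
type KVEvent interface{ GetType() EventType }

type BlockStoredEvent struct {
	Type        EventType `json:"type"`
	BlockHashes []int64   `json:"block_hashes"`
}

type BlockRemovedEvent struct {
	Type        EventType `json:"type"`
	BlockHashes []int64   `json:"block_hashes"`
}

type AllBlocksClearedEvent struct {
	Type EventType `json:"type"`
}

func (e *BlockStoredEvent) GetType() EventType      { return e.Type }
func (e *BlockRemovedEvent) GetType() EventType     { return e.Type }
func (e *AllBlocksClearedEvent) GetType() EventType { return e.Type }

type EventBatch struct{ Events []KVEvent }

// decodeEventBatch reads the discriminator of each record, allocates the
// matching concrete type, then decodes the whole record into it.
func decodeEventBatch(data []byte) (*EventBatch, error) {
	var raw struct {
		Events []json.RawMessage `json:"events"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return nil, fmt.Errorf("decode batch: %w", err)
	}
	batch := &EventBatch{Events: make([]KVEvent, 0, len(raw.Events))}
	for i, msg := range raw.Events {
		var head struct {
			Type EventType `json:"type"`
		}
		if err := json.Unmarshal(msg, &head); err != nil {
			return nil, fmt.Errorf("event %d: %w", i, err)
		}
		var ev KVEvent
		switch head.Type {
		case EventTypeBlockStored:
			ev = &BlockStoredEvent{}
		case EventTypeBlockRemoved:
			ev = &BlockRemovedEvent{}
		case EventTypeAllCleared:
			ev = &AllBlocksClearedEvent{}
		default:
			return nil, fmt.Errorf("event %d: unknown type %q", i, head.Type)
		}
		if err := json.Unmarshal(msg, ev); err != nil {
			return nil, fmt.Errorf("event %d: %w", i, err)
		}
		batch.Events = append(batch.Events, ev)
	}
	return batch, nil
}

func main() {
	data := []byte(`{"events":[{"type":"BLOCK_STORED","block_hashes":[7,8]},{"type":"ALL_BLOCKS_CLEARED"}]}`)
	batch, err := decodeEventBatch(data)
	if err != nil {
		panic(err)
	}
	for _, ev := range batch.Events {
		fmt.Println(ev.GetType())
	}
}
```

Rejecting unknown types, instead of skipping them, is a design choice in this sketch. A production decoder might log and skip them to tolerate newer publishers.
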

Metrics (metrics.go)

Prometheus metrics for monitoring:

  • Events received/processed
  • Connection status
  • Error counts
  • Processing latency

Configuration

Environment variables:

  • AIBRIX_ZMQ_POLL_TIMEOUT: Poll timeout (default: 100ms)
  • AIBRIX_ZMQ_REPLAY_TIMEOUT: Replay request timeout (default: 5s)
  • AIBRIX_ZMQ_RECONNECT_INTERVAL: Initial reconnect interval (default: 1s)

Dependencies

  • github.com/pebbe/zmq4: ZMQ Go bindings
  • github.com/shamaton/msgpack/v2: MessagePack serialization
  • System requirement: libzmq3 or higher

Testing

Run tests with ZMQ support:

go test -tags="zmq" ./pkg/cache/kvcache/

Example

See zmq_client_test.go for comprehensive examples including mock publishers for testing.

Documentation

Index

Constants

const (
	LabelPodKey    = "pod_key"
	LabelEventType = "event_type"
	LabelErrorType = "error_type"
)

Metric labels

const (
	// Default ZMQ ports
	DefaultPubPort    = 5557
	DefaultRouterPort = 5558

	// Timeouts and intervals
	DefaultPollTimeout       = 100 * time.Millisecond
	DefaultReplayTimeout     = 5 * time.Second
	DefaultReconnectInterval = 1 * time.Second
	MaxReconnectInterval     = 30 * time.Second
	ReconnectBackoffFactor   = 2.0

	// Buffer sizes
	EventChannelBufferSize = 1000
)

Constants for ZMQ client configuration

Variables

This section is empty.

Functions

func EncodeEventBatch

func EncodeEventBatch(batch *EventBatch) ([]byte, error)

EncodeEventBatch encodes an event batch to MessagePack format

func InitializeMetrics

func InitializeMetrics() error

InitializeMetrics initializes the KV cache metrics if they have not been initialized yet. It should be called when KV event sync is enabled.
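
Calling InitializeMetrics more than once is meant to be safe, since it only initializes the metrics if that has not happened yet. One common way to get that guarantee in Go is sync.Once, and the sketch below assumes that approach. initializeMetrics, metricsOnce and initCalls are hypothetical names.

With sync.Once, every caller receives the first result, and a failed registration is never retried.

```go
package main

import (
	"fmt"
	"sync"
)

var (
	metricsOnce    sync.Once
	metricsInitErr error
	initCalls      int // counts real initializations, for illustration only
)

// initializeMetrics is a hypothetical stand-in for InitializeMetrics: the
// registration body runs at most once, and every caller gets the same error.
func initializeMetrics() error {
	metricsOnce.Do(func() {
		initCalls++
		// A real implementation would create and register Prometheus
		// collectors here and store any registration error.
		metricsInitErr = nil
	})
	return metricsInitErr
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = initializeMetrics()
		}()
	}
	wg.Wait()
	fmt.Println(initCalls) // 1
}
```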

func ValidateConfig

func ValidateConfig(config *ZMQClientConfig) error

ValidateConfig validates the ZMQ client configuration

Types

type AllBlocksClearedEvent

type AllBlocksClearedEvent struct {
	Type      EventType `msgpack:"type"`
	Timestamp time.Time `msgpack:"timestamp"`
	ModelName string    `msgpack:"model_name"`
	PodName   string    `msgpack:"-"` // Set by subscriber
}

AllBlocksClearedEvent represents all blocks being cleared

func (*AllBlocksClearedEvent) GetTimestamp

func (e *AllBlocksClearedEvent) GetTimestamp() time.Time

GetTimestamp returns the event timestamp

func (*AllBlocksClearedEvent) GetType

func (e *AllBlocksClearedEvent) GetType() EventType

GetType returns the event type

type BlockRemovedEvent

type BlockRemovedEvent struct {
	Type        EventType `msgpack:"type"`
	Timestamp   time.Time `msgpack:"timestamp"`
	BlockHashes []int64   `msgpack:"block_hashes"`
	ModelName   string    `msgpack:"model_name"`
	PodName     string    `msgpack:"-"` // Set by subscriber
}

BlockRemovedEvent represents blocks being removed from KV cache

func (*BlockRemovedEvent) GetTimestamp

func (e *BlockRemovedEvent) GetTimestamp() time.Time

GetTimestamp returns the event timestamp

func (*BlockRemovedEvent) GetType

func (e *BlockRemovedEvent) GetType() EventType

GetType returns the event type

type BlockStoredEvent

type BlockStoredEvent struct {
	Type            EventType `msgpack:"type"`
	Timestamp       time.Time `msgpack:"timestamp"`
	BlockHashes     []int64   `msgpack:"block_hashes"`
	TokenIDs        [][]int32 `msgpack:"token_ids"`                   // One array per block
	ParentBlockHash *int64    `msgpack:"parent_block_hash,omitempty"` // Parent hash for chaining
	ModelName       string    `msgpack:"model_name"`
	PodName         string    `msgpack:"-"` // Set by subscriber
}

BlockStoredEvent represents blocks being stored in KV cache
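
The field comments say that TokenIDs holds one array per block and that ParentBlockHash is used for chaining. One reading is that BlockHashes lists consecutive blocks of a single sequence in order. Under that reading, each block's parent is the previous hash in the slice, and the first block's parent is ParentBlockHash, which is nil when the chain starts fresh.

Below is a minimal sketch under that assumption. parentLinks and the trimmed struct are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Trimmed mirror of BlockStoredEvent with only the fields used here.
type BlockStoredEvent struct {
	BlockHashes     []int64
	TokenIDs        [][]int32 // one slice per block
	ParentBlockHash *int64    // nil when the first block has no parent
}

// parentLinks is a hypothetical helper that returns, for each stored block,
// the hash of its parent, assuming BlockHashes lists consecutive blocks of
// one sequence in order. A root block maps to nil.
func parentLinks(e *BlockStoredEvent) (map[int64]*int64, error) {
	if len(e.TokenIDs) != len(e.BlockHashes) {
		return nil, errors.New("token_ids must hold one array per block")
	}
	links := make(map[int64]*int64, len(e.BlockHashes))
	parent := e.ParentBlockHash
	for i, h := range e.BlockHashes {
		links[h] = parent
		parent = &e.BlockHashes[i] // this block is the next block's parent
	}
	return links, nil
}

func main() {
	root := int64(100)
	ev := &BlockStoredEvent{
		BlockHashes:     []int64{101, 102},
		TokenIDs:        [][]int32{{1, 2}, {3, 4}},
		ParentBlockHash: &root,
	}
	links, err := parentLinks(ev)
	if err != nil {
		panic(err)
	}
	fmt.Println(*links[101], *links[102]) // 100 101
}
```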

func (*BlockStoredEvent) GetTimestamp

func (e *BlockStoredEvent) GetTimestamp() time.Time

GetTimestamp returns the event timestamp

func (*BlockStoredEvent) GetType

func (e *BlockStoredEvent) GetType() EventType

GetType returns the event type

type EventBatch

type EventBatch struct {
	Events []KVEvent `msgpack:"events"`
}

EventBatch represents a batch of events from vLLM

func DecodeEventBatch

func DecodeEventBatch(data []byte) (*EventBatch, error)

DecodeEventBatch decodes a MessagePack encoded event batch

type EventHandler

type EventHandler interface {
	HandleEvent(event KVEvent) error
}

EventHandler processes received KV events

type EventType

type EventType string

EventType represents the type of KV cache event

const (
	// EventTypeBlockStored indicates that blocks have been stored in the KV cache
	EventTypeBlockStored EventType = "BLOCK_STORED"

	// EventTypeBlockRemoved indicates that blocks have been removed from the KV cache
	EventTypeBlockRemoved EventType = "BLOCK_REMOVED"

	// EventTypeAllCleared indicates that all blocks have been cleared from the cache
	EventTypeAllCleared EventType = "ALL_BLOCKS_CLEARED"
)

type KVCacheMetrics

type KVCacheMetrics struct {
	// contains filtered or unexported fields
}

KVCacheMetrics holds all KV cache metrics

type KVEvent

type KVEvent interface {
	GetType() EventType
	GetTimestamp() time.Time
}

KVEvent is the base interface for all KV cache events

type ZMQClient

type ZMQClient struct {
	// contains filtered or unexported fields
}

ZMQClient is the stub implementation used when ZMQ support is not available.

func NewZMQClient

func NewZMQClient(config *ZMQClientConfig, handler EventHandler) *ZMQClient

NewZMQClient creates a stub ZMQ client

func (*ZMQClient) Close

func (c *ZMQClient) Close()

Close is a no-op

func (*ZMQClient) Connect

func (c *ZMQClient) Connect() error

Connect returns an error indicating ZMQ is not available

func (*ZMQClient) GetStats

func (c *ZMQClient) GetStats() (connected bool, msgsReceived uint64)

GetStats returns empty stats

func (*ZMQClient) Reconnect

func (c *ZMQClient) Reconnect() error

Reconnect returns an error

func (*ZMQClient) ReplayFrom

func (c *ZMQClient) ReplayFrom(timestamp int64) error

ReplayFrom returns an error

func (*ZMQClient) SetLogLevel

func (c *ZMQClient) SetLogLevel(level string)

SetLogLevel is a no-op

func (*ZMQClient) Start

func (c *ZMQClient) Start() error

Start is an alias for Connect

func (*ZMQClient) Stop

func (c *ZMQClient) Stop()

Stop closes the client

func (*ZMQClient) Subscribe

func (c *ZMQClient) Subscribe(handler EventHandler) error

Subscribe returns an error

func (*ZMQClient) WaitForConnection

func (c *ZMQClient) WaitForConnection(timeout time.Duration) error

WaitForConnection returns an error

type ZMQClientConfig

type ZMQClientConfig struct {
	PodKey         string
	PodIP          string
	ModelName      string
	PubPort        int
	RouterPort     int
	PollTimeout    time.Duration
	ReplayTimeout  time.Duration
	ReconnectDelay time.Duration
}

ZMQClientConfig contains configuration for the ZMQ client

func DefaultZMQClientConfig

func DefaultZMQClientConfig(podKey, podIP, modelName string) *ZMQClientConfig

DefaultZMQClientConfig returns a default configuration

type ZMQClientMetrics

type ZMQClientMetrics struct {
	// contains filtered or unexported fields
}

ZMQClientMetrics holds all metrics for the ZMQ client

func NewZMQClientMetrics

func NewZMQClientMetrics(podKey string) *ZMQClientMetrics

NewZMQClientMetrics creates a new metrics instance for a ZMQ client

func (*ZMQClientMetrics) Delete

func (m *ZMQClientMetrics) Delete()

Delete removes all metrics for this pod (useful for cleanup)

func (*ZMQClientMetrics) IncrementConnectionCount

func (m *ZMQClientMetrics) IncrementConnectionCount()

IncrementConnectionCount increments the connection counter

func (*ZMQClientMetrics) IncrementDisconnectionCount

func (m *ZMQClientMetrics) IncrementDisconnectionCount()

IncrementDisconnectionCount increments the disconnection counter

func (*ZMQClientMetrics) IncrementErrorCount

func (m *ZMQClientMetrics) IncrementErrorCount(errorType string)

IncrementErrorCount increments the error counter for a specific error type

func (*ZMQClientMetrics) IncrementEventCount

func (m *ZMQClientMetrics) IncrementEventCount(eventType string)

IncrementEventCount increments the event counter for a specific event type

func (*ZMQClientMetrics) IncrementMissedEvents

func (m *ZMQClientMetrics) IncrementMissedEvents(count int64)

IncrementMissedEvents increments the missed events counter

func (*ZMQClientMetrics) IncrementReconnectAttempts

func (m *ZMQClientMetrics) IncrementReconnectAttempts()

IncrementReconnectAttempts increments the reconnect attempts counter

func (*ZMQClientMetrics) IncrementReplayCount

func (m *ZMQClientMetrics) IncrementReplayCount()

IncrementReplayCount increments the replay request counter

func (*ZMQClientMetrics) IncrementReplayFailure

func (m *ZMQClientMetrics) IncrementReplayFailure()

IncrementReplayFailure increments the failed replay counter

func (*ZMQClientMetrics) IncrementReplaySuccess

func (m *ZMQClientMetrics) IncrementReplaySuccess()

IncrementReplaySuccess increments the successful replay counter

func (*ZMQClientMetrics) RecordEventProcessingLatency

func (m *ZMQClientMetrics) RecordEventProcessingLatency(duration time.Duration)

RecordEventProcessingLatency records the time taken to process an event

func (*ZMQClientMetrics) UpdateLastSequenceID

func (m *ZMQClientMetrics) UpdateLastSequenceID(seqID int64)

UpdateLastSequenceID updates the last processed sequence ID gauge
