Documentation
¶
Overview ¶
Package agent provides a framework for building LiveKit agents that can process audio, video, and data streams in real-time. Agents are server-side applications that join LiveKit rooms as participants and can interact with other participants.
The package supports different types of agents:
- Room agents: Join when a room is created
- Participant agents: Join when participants connect
- Publisher agents: Join when participants start publishing media
Key features:
- Automatic job assignment and load balancing
- Graceful shutdown and job recovery
- Resource monitoring and limits
- Connection resilience with automatic reconnection
- Extensible plugin system for custom functionality
Package agent provides a framework for building LiveKit agents.
Index ¶
- Constants
- Variables
- func GetSystemResourceLimits() (map[string]uint64, error)
- func LoggingEventHandler(event ParticipantEvent) error
- func TestLiveKitConnection() error
- type ActivityBasedRule
- type ActivityMetrics
- type ActivityType
- type AgentCapabilities
- type BackpressureController
- type BaseHandler
- func (h *BaseHandler) GetJobMetadata(job *livekit.Job) *JobMetadata
- func (h *BaseHandler) OnActiveSpeakersChanged(ctx context.Context, speakers []lksdk.Participant)
- func (h *BaseHandler) OnConnectionQualityChanged(ctx context.Context, participant *lksdk.RemoteParticipant, ...)
- func (h *BaseHandler) OnDataReceived(ctx context.Context, data []byte, participant *lksdk.RemoteParticipant, ...)
- func (h *BaseHandler) OnJobAssigned(ctx context.Context, jobCtx *JobContext) error
- func (h *BaseHandler) OnJobRequest(ctx context.Context, job *livekit.Job) (bool, *JobMetadata)
- func (h *BaseHandler) OnJobTerminated(ctx context.Context, jobID string)
- func (h *BaseHandler) OnParticipantJoined(ctx context.Context, participant *lksdk.RemoteParticipant)
- func (h *BaseHandler) OnParticipantLeft(ctx context.Context, participant *lksdk.RemoteParticipant)
- func (h *BaseHandler) OnParticipantMetadataChanged(ctx context.Context, participant *lksdk.RemoteParticipant, oldMetadata string)
- func (h *BaseHandler) OnParticipantSpeakingChanged(ctx context.Context, participant *lksdk.RemoteParticipant, speaking bool)
- func (h *BaseHandler) OnRoomConnected(ctx context.Context, room *lksdk.Room)
- func (h *BaseHandler) OnRoomDisconnected(ctx context.Context, room *lksdk.Room, reason string)
- func (h *BaseHandler) OnRoomMetadataChanged(ctx context.Context, oldMetadata, newMetadata string)
- func (h *BaseHandler) OnTrackMuted(ctx context.Context, publication lksdk.TrackPublication, ...)
- func (h *BaseHandler) OnTrackPublished(ctx context.Context, participant *lksdk.RemoteParticipant, ...)
- func (h *BaseHandler) OnTrackSubscribed(ctx context.Context, track *webrtc.TrackRemote, ...)
- func (h *BaseHandler) OnTrackUnmuted(ctx context.Context, publication lksdk.TrackPublication, ...)
- func (h *BaseHandler) OnTrackUnpublished(ctx context.Context, participant *lksdk.RemoteParticipant, ...)
- func (h *BaseHandler) OnTrackUnsubscribed(ctx context.Context, track *webrtc.TrackRemote, ...)
- type BatchEventProcessor
- type CPUMemoryLoadCalculator
- type CPUThrottler
- type ClockSkewDetector
- type ConnectionQualityMonitor
- func (cm *ConnectionQualityMonitor) GetAverageQuality(duration time.Duration) livekit.ConnectionQuality
- func (cm *ConnectionQualityMonitor) GetCurrentQuality() livekit.ConnectionQuality
- func (cm *ConnectionQualityMonitor) GetQualityHistory() []QualityMeasurement
- func (cm *ConnectionQualityMonitor) IsStable(duration time.Duration) bool
- func (cm *ConnectionQualityMonitor) SetQualityChangeCallback(callback func(oldQuality, newQuality livekit.ConnectionQuality))
- func (cm *ConnectionQualityMonitor) StartMonitoring(participant *lksdk.RemoteParticipant)
- func (cm *ConnectionQualityMonitor) StopMonitoring()
- func (cm *ConnectionQualityMonitor) UpdateQuality(quality livekit.ConnectionQuality)
- type CoordinatedParticipant
- type CoordinationAction
- type CoordinationEvent
- type CoordinationEventHandler
- type CoordinationRule
- type DeadlineContext
- type DeadlineManager
- type DefaultJobRecoveryHandler
- func (d *DefaultJobRecoveryHandler) OnJobRecovered(ctx context.Context, job *livekit.Job, room *lksdk.Room)
- func (d *DefaultJobRecoveryHandler) OnJobRecoveryAttempt(ctx context.Context, jobID string, jobState *JobState) bool
- func (d *DefaultJobRecoveryHandler) OnJobRecoveryFailed(ctx context.Context, jobID string, err error)
- type DefaultLoadCalculator
- type DefaultLogger
- type DefaultPriorityCalculator
- type DefaultShutdownHooks
- func (d DefaultShutdownHooks) NewConnectionDrainHook(drain func(context.Context) error) ShutdownHook
- func (d DefaultShutdownHooks) NewLogFlushHook(logger *zap.Logger) ShutdownHook
- func (d DefaultShutdownHooks) NewMetricsFlushHook(flush func() error) ShutdownHook
- func (d DefaultShutdownHooks) NewStateBackupHook(backup func() error) ShutdownHook
- type Error
- type EventFilter
- type EventHandler
- type EventProcessingMetrics
- type EventType
- type FileDescriptorTracker
- type GroupRule
- type JobAcceptInfo
- type JobAcceptMessage
- type JobCheckpoint
- type JobContext
- type JobHandler
- type JobMetadata
- type JobPriority
- type JobPriorityCalculator
- type JobQueue
- func (q *JobQueue) Clear()
- func (q *JobQueue) Close()
- func (q *JobQueue) Dequeue() (*JobQueueItem, bool)
- func (q *JobQueue) DequeueWithContext(ctx context.Context) (*JobQueueItem, error)
- func (q *JobQueue) Enqueue(job *livekit.Job, priority JobPriority, token, url string) error
- func (q *JobQueue) GetJobsByPriority(priority JobPriority) []*JobQueueItem
- func (q *JobQueue) Len() int
- func (q *JobQueue) Less(i, j int) bool
- func (q *JobQueue) Peek() (*JobQueueItem, bool)
- func (q *JobQueue) Pop() interface{}
- func (q *JobQueue) Push(x interface{})
- func (q *JobQueue) RemoveJob(jobID string) bool
- func (q *JobQueue) Size() int
- func (q *JobQueue) Swap(i, j int)
- type JobQueueItem
- type JobQueueOptions
- type JobRecoveryHandler
- type JobRecoveryManager
- type JobResumptionData
- type JobState
- type JobTimingManager
- type JobUtils
- func (jc *JobUtils) Context() context.Context
- func (jc *JobUtils) Done() <-chan struct{}
- func (jc *JobUtils) GetTargetParticipant() *lksdk.RemoteParticipant
- func (jc *JobUtils) PublishData(data []byte, reliable bool, destinationIdentities []string) error
- func (jc *JobUtils) Sleep(d time.Duration) error
- func (jc *JobUtils) WaitForParticipant(identity string, timeout time.Duration) (*lksdk.RemoteParticipant, error)
- type LoadBalancer
- type LoadBatcher
- type LoadCalculator
- type LoadMetrics
- type Logger
- type MediaBuffer
- type MediaBufferFactory
- type MediaData
- type MediaFormat
- type MediaMetricsCollector
- type MediaPipeline
- func (mp *MediaPipeline) AddStage(stage MediaPipelineStage)
- func (mp *MediaPipeline) GetOutputBuffer(trackID string) (*MediaBuffer, bool)
- func (mp *MediaPipeline) GetProcessingStats(trackID string) (*MediaProcessingStats, bool)
- func (mp *MediaPipeline) RegisterProcessor(processor MediaProcessor)
- func (mp *MediaPipeline) RemoveStage(stageName string)
- func (mp *MediaPipeline) StartProcessingTrack(track *webrtc.TrackRemote) error
- func (mp *MediaPipeline) StopProcessingTrack(trackID string)
- type MediaPipelineStage
- type MediaProcessingStats
- type MediaProcessor
- type MediaTrackPipeline
- type MediaType
- type MessageHandler
- type MessageTypeHandler
- type MessageTypeRegistry
- type MetadataPriorityCalculator
- type MetricsBatchProcessor
- type MultiParticipantCoordinator
- func (c *MultiParticipantCoordinator) AddCoordinationRule(rule CoordinationRule)
- func (c *MultiParticipantCoordinator) AddParticipantToGroup(identity, groupID string) error
- func (c *MultiParticipantCoordinator) CreateGroup(id, name string, metadata map[string]interface{}) (*ParticipantGroup, error)
- func (c *MultiParticipantCoordinator) GetActiveParticipants() []*CoordinatedParticipant
- func (c *MultiParticipantCoordinator) GetActivityMetrics() ActivityMetrics
- func (c *MultiParticipantCoordinator) GetGroupMembers(groupID string) []string
- func (c *MultiParticipantCoordinator) GetInteractionGraph() map[string]map[string]int
- func (c *MultiParticipantCoordinator) GetParticipantGroups(identity string) []*ParticipantGroup
- func (c *MultiParticipantCoordinator) GetParticipantInteractions(identity string) []ParticipantInteraction
- func (c *MultiParticipantCoordinator) RecordInteraction(from, to, interactionType string, data interface{})
- func (c *MultiParticipantCoordinator) RegisterEventHandler(eventType string, handler CoordinationEventHandler)
- func (c *MultiParticipantCoordinator) RegisterParticipant(identity string, participant *lksdk.RemoteParticipant)
- func (c *MultiParticipantCoordinator) RemoveParticipantFromGroup(identity, groupID string) error
- func (c *MultiParticipantCoordinator) Stop()
- func (c *MultiParticipantCoordinator) UnregisterParticipant(identity string)
- func (c *MultiParticipantCoordinator) UpdateParticipantActivity(identity string, activityType ActivityType)
- type NetworkHandler
- func (n *NetworkHandler) DetectNetworkPartition() bool
- func (n *NetworkHandler) GetDNSFailureCount() int32
- func (n *NetworkHandler) GetPartialWriteData() (int, []byte)
- func (n *NetworkHandler) HasPartialWrite() bool
- func (n *NetworkHandler) ResolveDNSWithRetry(ctx context.Context, host string) ([]string, error)
- func (n *NetworkHandler) SetNetworkPartition(detected bool)
- func (n *NetworkHandler) UpdateNetworkActivity()
- func (n *NetworkHandler) WriteMessageWithRetry(conn WebSocketConn, messageType int, data []byte) error
- type NetworkMonitor
- type PartialMessageBuffer
- type ParticipantActivity
- type ParticipantEvent
- type ParticipantEventProcessor
- func (p *ParticipantEventProcessor) AddBatchProcessor(processor BatchEventProcessor)
- func (p *ParticipantEventProcessor) AddFilter(filter EventFilter)
- func (p *ParticipantEventProcessor) GetEventHistory(limit int) []ParticipantEvent
- func (p *ParticipantEventProcessor) GetMetrics() map[string]interface{}
- func (p *ParticipantEventProcessor) ProcessPendingEvents()
- func (p *ParticipantEventProcessor) QueueEvent(event ParticipantEvent)
- func (p *ParticipantEventProcessor) RegisterHandler(eventType EventType, handler EventHandler)
- func (p *ParticipantEventProcessor) Stop()
- type ParticipantGroup
- type ParticipantInfo
- type ParticipantInteraction
- type ParticipantPermissionManager
- func (m *ParticipantPermissionManager) AddPolicy(policy PermissionPolicy)
- func (m *ParticipantPermissionManager) CanManagePermissions() bool
- func (m *ParticipantPermissionManager) CanSendDataTo(identity string) bool
- func (m *ParticipantPermissionManager) GetParticipantPermissions(identity string) *livekit.ParticipantPermission
- func (m *ParticipantPermissionManager) GetPermissionHistory(identity string) []PermissionChange
- func (m *ParticipantPermissionManager) RemoveParticipant(identity string)
- func (m *ParticipantPermissionManager) RequestPermissionChange(identity string, requested *livekit.ParticipantPermission) (bool, error)
- func (m *ParticipantPermissionManager) SetAgentCapabilities(caps *AgentCapabilities)
- func (m *ParticipantPermissionManager) SetCustomRestriction(identity, restriction string, enabled bool)
- func (m *ParticipantPermissionManager) SetDefaultPermissions(perms *livekit.ParticipantPermission)
- func (m *ParticipantPermissionManager) UpdateParticipantPermissions(identity string, perms *livekit.ParticipantPermission)
- func (m *ParticipantPermissionManager) ValidatePermissions(perms *livekit.ParticipantPermission) error
- type ParticipantPermissions
- type ParticipantTracker
- func (pt *ParticipantTracker) GetAllParticipants() []*lksdk.RemoteParticipant
- func (pt *ParticipantTracker) GetParticipant(identity string) (*lksdk.RemoteParticipant, error)
- func (pt *ParticipantTracker) GetParticipantInfo(identity string) (*ParticipantInfo, bool)
- func (pt *ParticipantTracker) RemoveParticipant(identity string)
- func (pt *ParticipantTracker) UpdateParticipantInfo(identity string, info *ParticipantInfo)
- type PassthroughStage
- type PermissionChange
- type PermissionPolicy
- type PredictiveLoadCalculator
- type ProcessorCapabilities
- type ProtocolHandler
- func (p *ProtocolHandler) GetProtocolMetrics() map[string]interface{}
- func (p *ProtocolHandler) GetUnknownMessageCount() int64
- func (p *ProtocolHandler) GetUnsupportedMessageTypes() map[string]int64
- func (p *ProtocolHandler) HandleUnknownMessage(msgType string, data []byte) error
- func (p *ProtocolHandler) IsVersionMismatchDetected() bool
- func (p *ProtocolHandler) SetServerVersion(version string)
- func (p *ProtocolHandler) SetStrictMode(strict bool)
- func (p *ProtocolHandler) ValidateProtocolMessage(msg *livekit.ServerMessage) error
- type ProtocolNegotiator
- type ProtocolUpgradeHandler
- type ProtocolValidator
- type ProximityRule
- type PublisherInfo
- type PublisherTrackSubscription
- type QualityAdaptationPolicy
- type QualityChange
- type QualityController
- func (qc *QualityController) ApplyDimensionSettings(track *webrtc.TrackRemote, width, height uint32) error
- func (qc *QualityController) ApplyFrameRateSettings(track *webrtc.TrackRemote, fps float64) error
- func (qc *QualityController) ApplyQualitySettings(track *webrtc.TrackRemote, quality livekit.VideoQuality) error
- func (qc *QualityController) CalculateOptimalQuality(connQuality livekit.ConnectionQuality, ...) livekit.VideoQuality
- func (qc *QualityController) EnableAdaptation(trackID string, enabled bool)
- func (qc *QualityController) GetQualityHistory(trackID string) []QualityChange
- func (qc *QualityController) GetTrackStats(trackID string) (*TrackQualityStats, bool)
- func (qc *QualityController) SetAdaptationPolicy(policy QualityAdaptationPolicy)
- func (qc *QualityController) SetUpdateInterval(interval time.Duration)
- func (qc *QualityController) StartMonitoring(track *webrtc.TrackRemote, subscription *PublisherTrackSubscription)
- func (qc *QualityController) StopMonitoring(track *webrtc.TrackRemote)
- type QualityMeasurement
- type RaceProtectionGuard
- type RaceProtector
- func (p *RaceProtector) CanAcceptJob(jobID string) (bool, string)
- func (p *RaceProtector) CleanupOldTerminations(maxAge time.Duration) int
- func (p *RaceProtector) CompleteTermination(jobID string, err error)
- func (p *RaceProtector) FlushPendingStatusUpdates() []StatusUpdate
- func (p *RaceProtector) GetMetrics() map[string]interface{}
- func (p *RaceProtector) IsDisconnecting() bool
- func (p *RaceProtector) IsReconnecting() bool
- func (p *RaceProtector) NewGuard(jobID string, operation string) *RaceProtectionGuard
- func (p *RaceProtector) QueueStatusUpdate(jobID string, status livekit.JobStatus, errorMsg string) bool
- func (p *RaceProtector) RecordTerminationRequest(jobID string) bool
- func (p *RaceProtector) SetDisconnecting(disconnecting bool)
- func (p *RaceProtector) SetOnJobDroppedCallback(cb func(jobID string, reason string))
- func (p *RaceProtector) SetOnStatusUpdateQueuedCallback(cb func(jobID string))
- func (p *RaceProtector) SetReconnecting(reconnecting bool)
- type RecoverableJob
- type Resource
- type ResourceFactory
- type ResourceGuard
- type ResourceHealthLevel
- type ResourceLimitGuard
- type ResourceLimiter
- func (rl *ResourceLimiter) GetMetrics() map[string]interface{}
- func (rl *ResourceLimiter) NewGuard(name string) *ResourceLimitGuard
- func (rl *ResourceLimiter) SetCPULimitCallback(cb func(usage float64))
- func (rl *ResourceLimiter) SetFDLimitCallback(cb func(usage, limit int))
- func (rl *ResourceLimiter) SetMemoryLimitCallback(cb func(usage, limit uint64))
- func (rl *ResourceLimiter) Start(ctx context.Context)
- type ResourceLimiterOptions
- type ResourceLimits
- type ResourceMonitor
- func (m *ResourceMonitor) AddDependency(from, to string)
- func (m *ResourceMonitor) GetMetrics() map[string]interface{}
- func (m *ResourceMonitor) GetResourceStatus() ResourceStatus
- func (m *ResourceMonitor) IsHealthy() bool
- func (m *ResourceMonitor) SetCircularDependencyCallback(cb func(deps []string))
- func (m *ResourceMonitor) SetLeakCallback(cb func(count int))
- func (m *ResourceMonitor) SetOOMCallback(cb func())
- func (m *ResourceMonitor) Start(ctx context.Context)
- func (m *ResourceMonitor) Stop()
- type ResourceMonitorOptions
- type ResourcePool
- func (pool *ResourcePool) Acquire(ctx context.Context) (Resource, error)
- func (pool *ResourcePool) Available() int
- func (pool *ResourcePool) Close() error
- func (pool *ResourcePool) InUse() int
- func (pool *ResourcePool) Release(resource Resource)
- func (pool *ResourcePool) Size() int
- func (pool *ResourcePool) Stats() map[string]int64
- type ResourcePoolOptions
- type ResourceStatus
- type ResourceThresholds
- type RetryableWriteMessage
- type RoleBasedPolicy
- type RoomCallbackProvider
- type ServerMessage
- func (m *ServerMessage) GetAssignment() *livekit.JobAssignment
- func (m *ServerMessage) GetAvailability() *livekit.AvailabilityRequest
- func (m *ServerMessage) GetPing() *livekit.Ping
- func (m *ServerMessage) GetRegister() *livekit.RegisterWorkerResponse
- func (m *ServerMessage) GetTermination() *livekit.JobTermination
- type ServerMessage_Assignment
- type ServerMessage_Availability
- type ServerMessage_Ping
- type ServerMessage_Register
- type ServerMessage_Termination
- type ShutdownHandler
- func (h *ShutdownHandler) AddHook(phase ShutdownPhase, hook ShutdownHook) error
- func (h *ShutdownHandler) ExecutePhase(ctx context.Context, phase ShutdownPhase) error
- func (h *ShutdownHandler) GetHooks(phase ShutdownPhase) []ShutdownHook
- func (h *ShutdownHandler) RemoveHook(phase ShutdownPhase, name string) bool
- type ShutdownHook
- type ShutdownHookBuilder
- func (b *ShutdownHookBuilder) Build() ShutdownHook
- func (b *ShutdownHookBuilder) WithHandler(handler func(context.Context) error) *ShutdownHookBuilder
- func (b *ShutdownHookBuilder) WithPriority(priority int) *ShutdownHookBuilder
- func (b *ShutdownHookBuilder) WithTimeout(timeout time.Duration) *ShutdownHookBuilder
- type ShutdownHookManager
- func (m *ShutdownHookManager) AddHook(phase ShutdownPhase, hook ShutdownHook) error
- func (m *ShutdownHookManager) ClearAllHooks()
- func (m *ShutdownHookManager) ClearHooks(phase ShutdownPhase)
- func (m *ShutdownHookManager) ExecutePhase(ctx context.Context, phase ShutdownPhase) error
- func (m *ShutdownHookManager) GetHookCount() int
- func (m *ShutdownHookManager) GetHooks(phase ShutdownPhase) []ShutdownHook
- func (m *ShutdownHookManager) RemoveHook(phase ShutdownPhase, name string) bool
- type ShutdownPhase
- type SimpleJobHandler
- type SimpleUniversalHandler
- func (h *SimpleUniversalHandler) GetJobMetadata(job *livekit.Job) *JobMetadata
- func (h *SimpleUniversalHandler) OnActiveSpeakersChanged(ctx context.Context, speakers []lksdk.Participant)
- func (h *SimpleUniversalHandler) OnConnectionQualityChanged(ctx context.Context, participant *lksdk.RemoteParticipant, ...)
- func (h *SimpleUniversalHandler) OnDataReceived(ctx context.Context, data []byte, participant *lksdk.RemoteParticipant, ...)
- func (h *SimpleUniversalHandler) OnJobAssigned(ctx context.Context, jobCtx *JobContext) error
- func (h *SimpleUniversalHandler) OnJobRequest(ctx context.Context, job *livekit.Job) (bool, *JobMetadata)
- func (h *SimpleUniversalHandler) OnJobTerminated(ctx context.Context, jobID string)
- func (h *SimpleUniversalHandler) OnParticipantJoined(ctx context.Context, participant *lksdk.RemoteParticipant)
- func (h *SimpleUniversalHandler) OnParticipantLeft(ctx context.Context, participant *lksdk.RemoteParticipant)
- func (h *SimpleUniversalHandler) OnParticipantMetadataChanged(ctx context.Context, participant *lksdk.RemoteParticipant, oldMetadata string)
- func (h *SimpleUniversalHandler) OnParticipantSpeakingChanged(ctx context.Context, participant *lksdk.RemoteParticipant, speaking bool)
- func (h *SimpleUniversalHandler) OnRoomConnected(ctx context.Context, room *lksdk.Room)
- func (h *SimpleUniversalHandler) OnRoomDisconnected(ctx context.Context, room *lksdk.Room, reason string)
- func (h *SimpleUniversalHandler) OnRoomMetadataChanged(ctx context.Context, oldMetadata, newMetadata string)
- func (h *SimpleUniversalHandler) OnTrackMuted(ctx context.Context, publication lksdk.TrackPublication, ...)
- func (h *SimpleUniversalHandler) OnTrackPublished(ctx context.Context, participant *lksdk.RemoteParticipant, ...)
- func (h *SimpleUniversalHandler) OnTrackSubscribed(ctx context.Context, track *webrtc.TrackRemote, ...)
- func (h *SimpleUniversalHandler) OnTrackUnmuted(ctx context.Context, publication lksdk.TrackPublication, ...)
- func (h *SimpleUniversalHandler) OnTrackUnpublished(ctx context.Context, participant *lksdk.RemoteParticipant, ...)
- func (h *SimpleUniversalHandler) OnTrackUnsubscribed(ctx context.Context, track *webrtc.TrackRemote, ...)
- type SizeBasedGroupRule
- type StatusUpdate
- type StatusUpdateRaceProtector
- type SyntheticAudioTrack
- type SyntheticVideoTrack
- type SystemMetricsCollector
- type TerminationState
- type TestPeerConnection
- type TestRoomManager
- func (trm *TestRoomManager) CleanupRooms()
- func (trm *TestRoomManager) CreateConnectedRooms(roomName string) (*lksdk.Room, *lksdk.Room, error)
- func (trm *TestRoomManager) CreateRoom(roomName string) (*lksdk.Room, error)
- func (trm *TestRoomManager) PublishSyntheticAudioTrack(room *lksdk.Room, trackID string) (*lksdk.LocalTrackPublication, error)
- func (trm *TestRoomManager) WaitForRemoteTrack(room *lksdk.Room, trackID string, timeout time.Duration) (*webrtc.TrackRemote, error)
- type ThrottleFilter
- type TimeBasedPolicy
- type TimingGuard
- type TimingManager
- func (tm *TimingManager) CheckBackpressure() bool
- func (tm *TimingManager) CheckDeadline(jobID string) (bool, time.Duration)
- func (tm *TimingManager) GetBackpressureDelay() time.Duration
- func (tm *TimingManager) GetDeadline(jobID string) (*DeadlineContext, bool)
- func (tm *TimingManager) GetMetrics() map[string]interface{}
- func (tm *TimingManager) NewGuard(jobID string, operation string) *TimingGuard
- func (tm *TimingManager) PropagateDeadline(ctx context.Context, jobID string) (context.Context, context.CancelFunc)
- func (tm *TimingManager) RecordEvent()
- func (tm *TimingManager) RemoveDeadline(jobID string)
- func (tm *TimingManager) ServerTimeNow() time.Time
- func (tm *TimingManager) SetDeadline(jobID string, deadline time.Time, propagatedFrom string)
- func (tm *TimingManager) UpdateServerTime(serverTime time.Time, receivedAt time.Time)
- type TimingManagerOptions
- type TrackQualityMonitor
- type TrackQualityStats
- type TrackSubscriptionFilter
- type TrackSubscriptionManager
- func (tm *TrackSubscriptionManager) AddFilter(filter TrackSubscriptionFilter)
- func (tm *TrackSubscriptionManager) ClearFilters()
- func (tm *TrackSubscriptionManager) GetSourcePriority(source livekit.TrackSource) int
- func (tm *TrackSubscriptionManager) SetAutoSubscribe(enabled bool)
- func (tm *TrackSubscriptionManager) SetSourcePriority(source livekit.TrackSource, priority int)
- func (tm *TrackSubscriptionManager) SetSubscribeAudio(enabled bool)
- func (tm *TrackSubscriptionManager) SetSubscribeVideo(enabled bool)
- func (tm *TrackSubscriptionManager) ShouldAutoSubscribe(publication *lksdk.RemoteTrackPublication) bool
- type TranscodingStage
- type UniversalEvent
- type UniversalEventHandler
- type UniversalEventProcessor
- type UniversalEventType
- type UniversalHandler
- type UniversalWorker
- func (w *UniversalWorker) AddCleanupHook(name string, handler func(context.Context) error) error
- func (w *UniversalWorker) AddPreStopHook(name string, handler func(context.Context) error) error
- func (w *UniversalWorker) AddShutdownHook(phase ShutdownPhase, hook ShutdownHook) error
- func (w *UniversalWorker) EnableTrack(trackSID string, enabled bool) error
- func (w *UniversalWorker) GetActiveJobs() map[string]interface{}
- func (w *UniversalWorker) GetAllParticipantInfo() map[string]*ParticipantInfo
- func (w *UniversalWorker) GetAllParticipants(jobID string) ([]*lksdk.RemoteParticipant, error)
- func (w *UniversalWorker) GetCoordinationManager() *MultiParticipantCoordinator
- func (w *UniversalWorker) GetCurrentLoad() float32
- func (w *UniversalWorker) GetEventProcessor() *UniversalEventProcessor
- func (w *UniversalWorker) GetJobCheckpoint(jobID string) *JobCheckpoint
- func (w *UniversalWorker) GetJobContext(jobID string) (*JobContext, bool)
- func (w *UniversalWorker) GetLogger() Logger
- func (w *UniversalWorker) GetMetrics() map[string]int64
- func (w *UniversalWorker) GetOptions() WorkerOptions
- func (w *UniversalWorker) GetParticipant(jobID string, identity string) (*lksdk.RemoteParticipant, error)
- func (w *UniversalWorker) GetParticipantInfo(identity string) (*ParticipantInfo, bool)
- func (w *UniversalWorker) GetPermissionManager() *ParticipantPermissionManager
- func (w *UniversalWorker) GetPublisherInfo() *lksdk.RemoteParticipant
- func (w *UniversalWorker) GetQueueStats() map[string]interface{}
- func (w *UniversalWorker) GetResourcePoolStats() map[string]interface{}
- func (w *UniversalWorker) GetServerURL() string
- func (w *UniversalWorker) GetShutdownHooks(phase ShutdownPhase) []ShutdownHook
- func (w *UniversalWorker) GetSubscribedTracks() map[string]*PublisherTrackSubscription
- func (w *UniversalWorker) GetTargetParticipant() *lksdk.RemoteParticipant
- func (w *UniversalWorker) GetTrackPublication(trackID string) (*lksdk.RemoteTrackPublication, error)
- func (w *UniversalWorker) GetTrackStats(trackID string) (map[string]interface{}, error)
- func (w *UniversalWorker) GetWorkerType() livekit.JobType
- func (w *UniversalWorker) Health() map[string]interface{}
- func (w *UniversalWorker) IsConnected() bool
- func (w *UniversalWorker) PublishTrack(jobID string, track webrtc.TrackLocal) (*lksdk.LocalTrackPublication, error)
- func (w *UniversalWorker) QueueJob(job *livekit.Job, token, url string) error
- func (w *UniversalWorker) RemoveShutdownHook(phase ShutdownPhase, name string) bool
- func (w *UniversalWorker) RequestPermissionChange(jobID string, identity string, permissions *livekit.ParticipantPermission) error
- func (w *UniversalWorker) SendDataToParticipant(jobID string, identity string, data []byte, reliable bool) error
- func (w *UniversalWorker) SetActiveJob(jobID string, job interface{})
- func (w *UniversalWorker) SetDebugMode(enabled bool)
- func (w *UniversalWorker) SetTrackDimensions(trackSID string, width, height uint32) error
- func (w *UniversalWorker) SetTrackFrameRate(trackSID string, fps float64) error
- func (w *UniversalWorker) SetTrackQuality(trackSID string, quality livekit.VideoQuality) error
- func (w *UniversalWorker) Start(ctx context.Context) error
- func (w *UniversalWorker) Stop() error
- func (w *UniversalWorker) StopWithTimeout(timeout time.Duration) error
- func (w *UniversalWorker) SubscribeToTrack(publication *lksdk.RemoteTrackPublication) error
- func (w *UniversalWorker) UnsubscribeFromTrack(publication *lksdk.RemoteTrackPublication) error
- func (w *UniversalWorker) UnsubscribeTrack(trackID string) error
- func (w *UniversalWorker) UpdateStatus(status WorkerStatus, load float32) error
- type VideoDimensions
- type VideoFormat
- type WebSocketConn
- type WebSocketState
- type WorkerInfo
- type WorkerInterface
- type WorkerMessage
- type WorkerMessage_Availability
- type WorkerMessage_JobAccept
- type WorkerMessage_Ping
- type WorkerMessage_Pong
- type WorkerMessage_Register
- type WorkerMessage_UpdateJob
- type WorkerMessage_UpdateWorker
- type WorkerOptions
- type WorkerResource
- type WorkerResourceFactory
- type WorkerState
- type WorkerStatus
Constants ¶
const ( CurrentProtocolVersion = "1.0.0" MinSupportedVersion = "0.9.0" MaxSupportedVersion = "2.0.0" )
ProtocolVersion represents the current protocol version
const (
// CurrentProtocol is the current protocol version
CurrentProtocol = 1
)
Variables ¶
var ( // ErrConnectionFailed indicates a failure to establish connection to LiveKit server. ErrConnectionFailed = &Error{Code: "CONNECTION_FAILED", Message: "failed to connect to LiveKit server"} // ErrAuthenticationError indicates invalid or missing authentication credentials. ErrAuthenticationError = &Error{Code: "AUTHENTICATION_ERROR", Message: "authentication failed"} // ErrRegistrationTimeout indicates the worker registration process timed out. ErrRegistrationTimeout = &Error{Code: "REGISTRATION_TIMEOUT", Message: "worker registration timed out"} // ErrProtocolError indicates a protocol-level error in communication. ErrProtocolError = &Error{Code: "PROTOCOL_ERROR", Message: "protocol error"} // ErrJobNotFound indicates the requested job does not exist. ErrJobNotFound = &Error{Code: "JOB_NOT_FOUND", Message: "job not found"} // ErrInvalidCredentials indicates the API key or secret is invalid. ErrInvalidCredentials = &Error{Code: "INVALID_CREDENTIALS", Message: "invalid API key or secret"} // ErrProtocolMismatch indicates incompatible protocol versions between agent and server. ErrProtocolMismatch = &Error{Code: "PROTOCOL_MISMATCH", Message: "protocol version not supported by server"} // ErrRegistrationRejected indicates the server rejected the worker registration. ErrRegistrationRejected = &Error{Code: "REGISTRATION_REJECTED", Message: "worker registration rejected by server"} // ErrTokenExpired indicates the authentication token has expired and needs renewal. ErrTokenExpired = &Error{Code: "TOKEN_EXPIRED", Message: "authentication token has expired"} // ErrRoomNotFound indicates the specified room does not exist. ErrRoomNotFound = &Error{Code: "ROOM_NOT_FOUND", Message: "room does not exist"} // ErrParticipantNotFound indicates the specified participant does not exist. ErrParticipantNotFound = &Error{Code: "PARTICIPANT_NOT_FOUND", Message: "participant does not exist"} // ErrTrackNotFound indicates the specified track does not exist. ErrTrackNotFound = &Error{Code: "TRACK_NOT_FOUND", Message: "track does not exist"} )
Common errors returned by the agent framework. Use errors.Is() to check for specific error types.
var ErrNotConnected = errors.New("not connected to server")
ErrNotConnected is returned when trying to send a message while disconnected
Functions ¶
func GetSystemResourceLimits ¶
GetSystemResourceLimits retrieves the current OS-level resource limits for the process. It returns limits for memory, file descriptors, and CPU time if available. This is useful for understanding the environment constraints and setting appropriate agent limits.
func LoggingEventHandler ¶
func LoggingEventHandler(event ParticipantEvent) error
LoggingEventHandler logs all events
func TestLiveKitConnection ¶
func TestLiveKitConnection() error
TestLiveKitConnection tests if we can connect to the local LiveKit server
Types ¶
type ActivityBasedRule ¶
type ActivityBasedRule struct {
// contains filtered or unexported fields
}
ActivityBasedRule triggers actions based on activity patterns
func NewActivityBasedRule ¶
func NewActivityBasedRule(threshold int, window time.Duration) *ActivityBasedRule
NewActivityBasedRule creates an activity-based rule
func (*ActivityBasedRule) Evaluate ¶
func (r *ActivityBasedRule) Evaluate(participants map[string]*CoordinatedParticipant) (bool, []CoordinationAction)
func (*ActivityBasedRule) GetName ¶
func (r *ActivityBasedRule) GetName() string
type ActivityMetrics ¶
type ActivityMetrics struct { TotalActivities int64 ActivitiesByType map[ActivityType]int64 ActiveParticipants int }
ActivityMetrics represents activity metrics
type ActivityType ¶
type ActivityType string
ActivityType represents the type of participant activity
const ( ActivityTypeJoined ActivityType = "joined" ActivityTypeLeft ActivityType = "left" ActivityTypeTrackPublished ActivityType = "track_published" ActivityTypeTrackUnpublished ActivityType = "track_unpublished" ActivityTypeDataReceived ActivityType = "data_received" ActivityTypeSpeaking ActivityType = "speaking" ActivityTypeMetadataChanged ActivityType = "metadata_changed" )
type AgentCapabilities ¶
type AgentCapabilities struct { CanManagePermissions bool CanSendData bool CanSubscribeToTracks bool CanPublishTracks bool CanKickParticipants bool CanMuteParticipants bool MaxDataMessageSize int AllowedDataRecipients []string // Empty means all }
AgentCapabilities defines what the agent is allowed to do
type BackpressureController ¶
type BackpressureController struct {
// contains filtered or unexported fields
}
BackpressureController manages backpressure for high-frequency operations
func NewBackpressureController ¶
func NewBackpressureController(window time.Duration, limit int) *BackpressureController
NewBackpressureController creates a new backpressure controller
func (*BackpressureController) GetCurrentRate ¶
func (b *BackpressureController) GetCurrentRate() float64
GetCurrentRate returns the current event rate per window
func (*BackpressureController) GetDelay ¶
func (b *BackpressureController) GetDelay() time.Duration
GetDelay returns the recommended backpressure delay
func (*BackpressureController) IsActive ¶
func (b *BackpressureController) IsActive() bool
IsActive returns true if backpressure is currently active
func (*BackpressureController) RecordEvent ¶
func (b *BackpressureController) RecordEvent()
RecordEvent records a new event
func (*BackpressureController) ShouldApplyBackpressure ¶
func (b *BackpressureController) ShouldApplyBackpressure() bool
ShouldApplyBackpressure returns true if backpressure should be applied
type BaseHandler ¶
type BaseHandler struct{}
BaseHandler provides default no-op implementations for all UniversalHandler methods. Embed this in your handler to only override the methods you need.
func (*BaseHandler) GetJobMetadata ¶
func (h *BaseHandler) GetJobMetadata(job *livekit.Job) *JobMetadata
func (*BaseHandler) OnActiveSpeakersChanged ¶
func (h *BaseHandler) OnActiveSpeakersChanged(ctx context.Context, speakers []lksdk.Participant)
Active speaker events
func (*BaseHandler) OnConnectionQualityChanged ¶
func (h *BaseHandler) OnConnectionQualityChanged(ctx context.Context, participant *lksdk.RemoteParticipant, quality livekit.ConnectionQuality)
Quality events
func (*BaseHandler) OnDataReceived ¶
func (h *BaseHandler) OnDataReceived(ctx context.Context, data []byte, participant *lksdk.RemoteParticipant, kind livekit.DataPacket_Kind)
Media events
func (*BaseHandler) OnJobAssigned ¶
func (h *BaseHandler) OnJobAssigned(ctx context.Context, jobCtx *JobContext) error
func (*BaseHandler) OnJobRequest ¶
func (h *BaseHandler) OnJobRequest(ctx context.Context, job *livekit.Job) (bool, *JobMetadata)
Core job lifecycle
func (*BaseHandler) OnJobTerminated ¶
func (h *BaseHandler) OnJobTerminated(ctx context.Context, jobID string)
func (*BaseHandler) OnParticipantJoined ¶
func (h *BaseHandler) OnParticipantJoined(ctx context.Context, participant *lksdk.RemoteParticipant)
Participant events
func (*BaseHandler) OnParticipantLeft ¶
func (h *BaseHandler) OnParticipantLeft(ctx context.Context, participant *lksdk.RemoteParticipant)
func (*BaseHandler) OnParticipantMetadataChanged ¶
func (h *BaseHandler) OnParticipantMetadataChanged(ctx context.Context, participant *lksdk.RemoteParticipant, oldMetadata string)
func (*BaseHandler) OnParticipantSpeakingChanged ¶
func (h *BaseHandler) OnParticipantSpeakingChanged(ctx context.Context, participant *lksdk.RemoteParticipant, speaking bool)
func (*BaseHandler) OnRoomConnected ¶
func (h *BaseHandler) OnRoomConnected(ctx context.Context, room *lksdk.Room)
Room events
func (*BaseHandler) OnRoomDisconnected ¶
func (*BaseHandler) OnRoomMetadataChanged ¶
func (h *BaseHandler) OnRoomMetadataChanged(ctx context.Context, oldMetadata, newMetadata string)
func (*BaseHandler) OnTrackMuted ¶
func (h *BaseHandler) OnTrackMuted(ctx context.Context, publication lksdk.TrackPublication, participant lksdk.Participant)
func (*BaseHandler) OnTrackPublished ¶
func (h *BaseHandler) OnTrackPublished(ctx context.Context, participant *lksdk.RemoteParticipant, publication *lksdk.RemoteTrackPublication)
Track events
func (*BaseHandler) OnTrackSubscribed ¶
func (h *BaseHandler) OnTrackSubscribed(ctx context.Context, track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, participant *lksdk.RemoteParticipant)
func (*BaseHandler) OnTrackUnmuted ¶
func (h *BaseHandler) OnTrackUnmuted(ctx context.Context, publication lksdk.TrackPublication, participant lksdk.Participant)
func (*BaseHandler) OnTrackUnpublished ¶
func (h *BaseHandler) OnTrackUnpublished(ctx context.Context, participant *lksdk.RemoteParticipant, publication *lksdk.RemoteTrackPublication)
func (*BaseHandler) OnTrackUnsubscribed ¶
func (h *BaseHandler) OnTrackUnsubscribed(ctx context.Context, track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, participant *lksdk.RemoteParticipant)
type BatchEventProcessor ¶
type BatchEventProcessor interface { // ShouldBatch determines if an event should be batched ShouldBatch(event ParticipantEvent) bool // ProcessBatch processes a batch of events ProcessBatch(events []ParticipantEvent) error // GetBatchSize returns the preferred batch size GetBatchSize() int // GetBatchTimeout returns the batch timeout GetBatchTimeout() time.Duration }
BatchEventProcessor processes events in batches
type CPUMemoryLoadCalculator ¶
type CPUMemoryLoadCalculator struct { // JobWeight is the weight given to job count (default: 0.4) JobWeight float32 // CPUWeight is the weight given to CPU usage (default: 0.3) CPUWeight float32 // MemoryWeight is the weight given to memory usage (default: 0.3) MemoryWeight float32 }
CPUMemoryLoadCalculator calculates load based on CPU and memory usage.
This calculator combines multiple metrics using weighted averages:
- Job count relative to capacity
- CPU usage percentage
- Memory usage percentage
The weights are configurable but should sum to 1.0 for normalized output. This calculator is ideal for workers that process resource-intensive jobs where system resources are the limiting factor.
func NewCPUMemoryLoadCalculator ¶
func NewCPUMemoryLoadCalculator() *CPUMemoryLoadCalculator
NewCPUMemoryLoadCalculator creates a calculator with default weights.
Default weights:
- Jobs: 40%
- CPU: 30%
- Memory: 30%
These weights provide a balanced view of worker load, considering both job count and system resource utilization.
func (*CPUMemoryLoadCalculator) Calculate ¶
func (c *CPUMemoryLoadCalculator) Calculate(metrics LoadMetrics) float32
Calculate implements LoadCalculator using weighted system metrics.
The calculation combines three components:
- Job load: ActiveJobs/MaxJobs (or heuristic if unlimited)
- CPU load: CPUPercent/100
- Memory load: MemoryPercent/100
Final load = (JobLoad * JobWeight) + (CPULoad * CPUWeight) + (MemoryLoad * MemoryWeight)
The result is clamped between 0.0 and 1.0.
type CPUThrottler ¶
type CPUThrottler struct {
// contains filtered or unexported fields
}
CPUThrottler implements CPU usage throttling by applying sleep delays and temporarily reducing GOMAXPROCS when CPU usage exceeds quotas. This helps prevent agents from consuming excessive CPU resources in shared environments.
func NewCPUThrottler ¶
func NewCPUThrottler(quotaPercent int) *CPUThrottler
NewCPUThrottler creates a new CPU throttler with the specified quota percentage. The quota represents the maximum allowed CPU usage as a percentage (100 = 1 CPU core).
func (*CPUThrottler) Throttle ¶
func (t *CPUThrottler) Throttle(currentUsagePercent float64)
Throttle applies CPU throttling proportional to the excess usage above the quota. It uses sleep delays and temporarily reduces GOMAXPROCS for severe overages. The throttling is applied immediately in the calling goroutine.
type ClockSkewDetector ¶
type ClockSkewDetector struct {
// contains filtered or unexported fields
}
ClockSkewDetector provides standalone clock skew detection
func NewClockSkewDetector ¶
func NewClockSkewDetector(maxSamples int) *ClockSkewDetector
NewClockSkewDetector creates a new clock skew detector
func (*ClockSkewDetector) AddSample ¶
func (d *ClockSkewDetector) AddSample(localTime, remoteTime time.Time) time.Duration
AddSample adds a clock difference sample
func (*ClockSkewDetector) GetAverageSkew ¶
func (d *ClockSkewDetector) GetAverageSkew() time.Duration
GetAverageSkew returns the average clock skew
type ConnectionQualityMonitor ¶
type ConnectionQualityMonitor struct {
// contains filtered or unexported fields
}
ConnectionQualityMonitor tracks and analyzes the connection quality of a participant over time. It maintains a history of quality measurements and provides methods to analyze connection stability, calculate average quality over time periods, and trigger callbacks on quality changes. This is useful for adaptive behaviors like adjusting subscription quality or providing user feedback about connection issues.
func NewConnectionQualityMonitor ¶
func NewConnectionQualityMonitor() *ConnectionQualityMonitor
NewConnectionQualityMonitor creates a new connection quality monitor with default settings. The monitor starts with GOOD quality assumption and maintains a history of the last 10 measurements. No participant is initially monitored until StartMonitoring is called.
func (*ConnectionQualityMonitor) GetAverageQuality ¶
func (cm *ConnectionQualityMonitor) GetAverageQuality(duration time.Duration) livekit.ConnectionQuality
GetAverageQuality calculates the average connection quality over the specified time period. Only measurements within the duration from now are considered. If no measurements exist within the time period, returns the current quality. The average is calculated by treating quality enum values as integers and averaging them.
func (*ConnectionQualityMonitor) GetCurrentQuality ¶
func (cm *ConnectionQualityMonitor) GetCurrentQuality() livekit.ConnectionQuality
GetCurrentQuality returns the most recent connection quality measurement. This is thread-safe and returns the quality as of the last UpdateQuality call.
func (*ConnectionQualityMonitor) GetQualityHistory ¶
func (cm *ConnectionQualityMonitor) GetQualityHistory() []QualityMeasurement
GetQualityHistory returns a copy of all quality measurements in the history. The returned slice is independent of the internal history and can be safely modified. Measurements are ordered chronologically (oldest first).
func (*ConnectionQualityMonitor) IsStable ¶
func (cm *ConnectionQualityMonitor) IsStable(duration time.Duration) bool
IsStable returns true if connection quality has remained constant within the specified time period. This is useful for determining if a connection has stabilized before making subscription or quality adjustments. Returns true if fewer than 2 measurements exist, or if all measurements within the time period have the same quality value.
func (*ConnectionQualityMonitor) SetQualityChangeCallback ¶
func (cm *ConnectionQualityMonitor) SetQualityChangeCallback(callback func(oldQuality, newQuality livekit.ConnectionQuality))
SetQualityChangeCallback sets a callback function to be invoked whenever connection quality changes. The callback receives the old and new quality values. It is called asynchronously to avoid blocking the quality update process. Pass nil to remove the callback.
func (*ConnectionQualityMonitor) StartMonitoring ¶
func (cm *ConnectionQualityMonitor) StartMonitoring(participant *lksdk.RemoteParticipant)
StartMonitoring begins monitoring the connection quality for the specified participant. This resets any previous monitoring state and starts fresh quality tracking. Call UpdateQuality to provide quality measurements after starting monitoring.
func (*ConnectionQualityMonitor) StopMonitoring ¶
func (cm *ConnectionQualityMonitor) StopMonitoring()
StopMonitoring stops monitoring connection quality for the current participant. This clears the participant reference but preserves the quality history. The monitor can be reused by calling StartMonitoring with a new participant.
func (*ConnectionQualityMonitor) UpdateQuality ¶
func (cm *ConnectionQualityMonitor) UpdateQuality(quality livekit.ConnectionQuality)
UpdateQuality updates the current connection quality measurement and adds it to the history. If a quality change callback is registered and the quality has changed, the callback will be invoked asynchronously. The history is automatically trimmed to maintain the configured history size limit.
type CoordinatedParticipant ¶
type CoordinatedParticipant struct { Identity string Participant *lksdk.RemoteParticipant JoinedAt time.Time LastActivity time.Time ActivityCount int Groups []string State map[string]interface{} ActivityHistory []ParticipantActivity }
CoordinatedParticipant represents a participant in the coordination system
type CoordinationAction ¶
CoordinationAction represents an action to take based on coordination rules
type CoordinationEvent ¶
type CoordinationEvent struct { Type string Timestamp time.Time Participants []string Data map[string]interface{} }
CoordinationEvent represents a coordination event
type CoordinationEventHandler ¶
type CoordinationEventHandler func(event CoordinationEvent)
CoordinationEventHandler handles coordination events
type CoordinationRule ¶
type CoordinationRule interface { // Evaluate evaluates if the rule applies to the given participants Evaluate(participants map[string]*CoordinatedParticipant) (bool, []CoordinationAction) // GetName returns the rule name GetName() string }
CoordinationRule defines rules for participant coordination
type DeadlineContext ¶
type DeadlineContext struct { JobID string OriginalDeadline time.Time AdjustedDeadline time.Time PropagatedFrom string CreatedAt time.Time }
DeadlineContext tracks deadline information for a job
type DeadlineManager ¶
type DeadlineManager struct {
// contains filtered or unexported fields
}
DeadlineManager provides high-level deadline management
func NewDeadlineManager ¶
func NewDeadlineManager(tm *TimingManager, logger *zap.Logger) *DeadlineManager
NewDeadlineManager creates a new deadline manager
func (*DeadlineManager) CreateContextWithDeadline ¶
func (dm *DeadlineManager) CreateContextWithDeadline(ctx context.Context, jobID string) (context.Context, context.CancelFunc)
CreateContextWithDeadline creates a context with the job's deadline
func (*DeadlineManager) SetJobDeadline ¶
func (dm *DeadlineManager) SetJobDeadline(job *livekit.Job)
SetJobDeadline sets a deadline for a job from a Job message
type DefaultJobRecoveryHandler ¶
type DefaultJobRecoveryHandler struct{}
DefaultJobRecoveryHandler provides a default recovery implementation.
The default behavior:
- Attempts recovery only for jobs with JS_RUNNING status
- No special handling on successful recovery
- No special handling on failed recovery
This handler is used when no custom handler is provided.
func (*DefaultJobRecoveryHandler) OnJobRecovered ¶
func (d *DefaultJobRecoveryHandler) OnJobRecovered(ctx context.Context, job *livekit.Job, room *lksdk.Room)
OnJobRecovered implements JobRecoveryHandler. Default implementation does nothing.
func (*DefaultJobRecoveryHandler) OnJobRecoveryAttempt ¶
func (d *DefaultJobRecoveryHandler) OnJobRecoveryAttempt(ctx context.Context, jobID string, jobState *JobState) bool
OnJobRecoveryAttempt implements JobRecoveryHandler. By default, only attempts recovery for jobs with JS_RUNNING status.
func (*DefaultJobRecoveryHandler) OnJobRecoveryFailed ¶
func (d *DefaultJobRecoveryHandler) OnJobRecoveryFailed(ctx context.Context, jobID string, err error)
OnJobRecoveryFailed implements JobRecoveryHandler. Default implementation does nothing.
type DefaultLoadCalculator ¶
type DefaultLoadCalculator struct{}
DefaultLoadCalculator implements the default load calculation strategy.
This calculator uses a simple approach based solely on job count:
- With MaxJobs set: load = ActiveJobs / MaxJobs
- Without MaxJobs: load = ActiveJobs * 0.1 (capped at 1.0)
This is suitable for workers where job count is the primary constraint and system resources are not a concern.
func (*DefaultLoadCalculator) Calculate ¶
func (d *DefaultLoadCalculator) Calculate(metrics LoadMetrics) float32
Calculate implements LoadCalculator using job count ratio.
The calculation is straightforward:
- If MaxJobs > 0: Returns ActiveJobs/MaxJobs
- If MaxJobs = 0: Returns ActiveJobs * 0.1, capped at 1.0
This assumes each job contributes equally to load and ignores system resource usage.
type DefaultLogger ¶
type DefaultLogger struct{}
DefaultLogger is a simple logger implementation
func (*DefaultLogger) Debug ¶
func (l *DefaultLogger) Debug(msg string, fields ...interface{})
Debug logs a debug message
func (*DefaultLogger) Error ¶
func (l *DefaultLogger) Error(msg string, fields ...interface{})
Error logs an error message
func (*DefaultLogger) Info ¶
func (l *DefaultLogger) Info(msg string, fields ...interface{})
Info logs an info message
func (*DefaultLogger) Warn ¶
func (l *DefaultLogger) Warn(msg string, fields ...interface{})
Warn logs a warning message
type DefaultPriorityCalculator ¶
type DefaultPriorityCalculator struct{}
DefaultPriorityCalculator provides a default priority calculation strategy.
Priority is determined by:
- Job metadata containing priority hints ("urgent", "high", "low")
- Job type (room jobs get higher priority by default)
- Falls back to normal priority if no hints are found
func (*DefaultPriorityCalculator) CalculatePriority ¶
func (d *DefaultPriorityCalculator) CalculatePriority(job *livekit.Job) JobPriority
CalculatePriority determines priority based on job metadata and type.
Priority assignment:
- Metadata "urgent" or "high-priority" → JobPriorityUrgent
- Metadata "high" → JobPriorityHigh
- Metadata "low" → JobPriorityLow
- JobType JT_ROOM → JobPriorityHigh (if no metadata)
- All others → JobPriorityNormal
type DefaultShutdownHooks ¶
type DefaultShutdownHooks struct{}
DefaultShutdownHooks provides factory methods for creating commonly needed shutdown hooks. These hooks handle standard cleanup tasks like flushing metrics, logs, draining connections, and backing up state. Each factory method returns a pre-configured ShutdownHook with appropriate priorities and timeouts.
func (DefaultShutdownHooks) NewConnectionDrainHook ¶
func (d DefaultShutdownHooks) NewConnectionDrainHook(drain func(context.Context) error) ShutdownHook
NewConnectionDrainHook creates a shutdown hook that drains active connections gracefully. This hook has priority 50 and a 10-second timeout. It's typically used early in shutdown to allow clients to finish ongoing operations before forced termination.
func (DefaultShutdownHooks) NewLogFlushHook ¶
func (d DefaultShutdownHooks) NewLogFlushHook(logger *zap.Logger) ShutdownHook
NewLogFlushHook creates a shutdown hook that flushes the logger's buffer before termination. This hook has priority 200 and a 2-second timeout. It ensures all pending log messages are written to their destinations before the worker shuts down.
func (DefaultShutdownHooks) NewMetricsFlushHook ¶
func (d DefaultShutdownHooks) NewMetricsFlushHook(flush func() error) ShutdownHook
NewMetricsFlushHook creates a shutdown hook that flushes metrics before termination. This hook has priority 100 and a 3-second timeout. It's typically used in the final shutdown phase to ensure metrics are sent to external systems.
func (DefaultShutdownHooks) NewStateBackupHook ¶
func (d DefaultShutdownHooks) NewStateBackupHook(backup func() error) ShutdownHook
NewStateBackupHook creates a shutdown hook that backs up application state before termination. This hook has priority 75 and a 5-second timeout. It's useful for persisting important state information that can be recovered when the worker restarts.
type Error ¶
type Error struct { // Code is a stable identifier for the error type. Code string // Message provides human-readable error details. Message string }
Error represents a typed error with a code and message. Error codes are stable and can be used for programmatic error handling.
type EventFilter ¶
type EventFilter func(event ParticipantEvent) bool
EventFilter filters events before processing
type EventHandler ¶
type EventHandler func(event ParticipantEvent) error
EventHandler handles individual events
type EventProcessingMetrics ¶
type EventProcessingMetrics struct {
// contains filtered or unexported fields
}
EventProcessingMetrics tracks event processing metrics
type EventType ¶
type EventType string
EventType represents the type of participant event
const ( EventTypeParticipantJoined EventType = "participant_joined" EventTypeParticipantLeft EventType = "participant_left" EventTypeTrackPublished EventType = "track_published" EventTypeTrackUnpublished EventType = "track_unpublished" EventTypeMetadataChanged EventType = "metadata_changed" EventTypeNameChanged EventType = "name_changed" EventTypePermissionsChanged EventType = "permissions_changed" EventTypeSpeakingChanged EventType = "speaking_changed" EventTypeDataReceived EventType = "data_received" EventTypeConnectionQuality EventType = "connection_quality" )
type FileDescriptorTracker ¶
type FileDescriptorTracker struct { }
FileDescriptorTracker monitors the number of open file descriptors for the current process. It can count file descriptors and categorize them by type (files, sockets, pipes, etc.) to help diagnose file descriptor leaks and resource usage patterns.
func NewFileDescriptorTracker ¶
func NewFileDescriptorTracker() *FileDescriptorTracker
NewFileDescriptorTracker creates a new file descriptor tracker. The tracker works by examining the /proc/self/fd directory on Unix systems.
func (*FileDescriptorTracker) GetCurrentCount ¶
func (t *FileDescriptorTracker) GetCurrentCount() int
GetCurrentCount returns the current number of open file descriptors for the process. On Unix systems, this counts entries in /proc/self/fd. If that fails, it returns a default estimate rather than attempting syscalls that might interfere with runtime.
func (*FileDescriptorTracker) LogOpenFiles ¶
func (t *FileDescriptorTracker) LogOpenFiles(logger *zap.Logger)
LogOpenFiles logs detailed information about currently open file descriptors, categorized by type (files, sockets, pipes, etc.). This is useful for debugging file descriptor leaks and understanding resource usage patterns.
type GroupRule ¶
type GroupRule interface { // CanJoinGroup checks if a participant can join the group CanJoinGroup(participant *CoordinatedParticipant, group *ParticipantGroup) bool // OnGroupChange is called when group membership changes OnGroupChange(group *ParticipantGroup, added, removed []string) }
GroupRule defines rules for participant groups
type JobAcceptInfo ¶
type JobAcceptInfo struct { Identity string Name string Metadata string Attributes map[string]string SupportsResume bool }
JobAcceptInfo contains info for accepting a job
type JobAcceptMessage ¶
type JobAcceptMessage struct { JobId string Accept *JobAcceptInfo }
JobAcceptMessage wraps job acceptance
type JobCheckpoint ¶
type JobCheckpoint struct {
// contains filtered or unexported fields
}
JobCheckpoint allows handlers to save job progress.
Checkpoints enable jobs to save their state periodically, making it possible to resume from a known point after interruptions. The checkpoint data is preserved during recovery attempts.
Example usage:
checkpoint := NewJobCheckpoint(jobID) checkpoint.Save("processed_count", 150) checkpoint.Save("last_timestamp", time.Now()) // Later, after recovery if count, ok := checkpoint.Load("processed_count"); ok { processedCount := count.(int) // Resume from this count }
func NewJobCheckpoint ¶
func NewJobCheckpoint(jobID string) *JobCheckpoint
NewJobCheckpoint creates a new checkpoint for a job.
The checkpoint starts empty and can be populated with Save calls.
func (*JobCheckpoint) Clear ¶
func (c *JobCheckpoint) Clear()
Clear removes all checkpoint data.
Use this to reset the checkpoint to an empty state. The method is thread-safe.
func (*JobCheckpoint) GetAll ¶
func (c *JobCheckpoint) GetAll() map[string]interface{}
GetAll returns all checkpoint data.
Returns a copy of the checkpoint data map. Modifications to the returned map won't affect the checkpoint's internal state.
This is useful for serialization or debugging. The method is thread-safe.
func (*JobCheckpoint) Load ¶
func (c *JobCheckpoint) Load(key string) (interface{}, bool)
Load retrieves a checkpoint value.
Returns the value and true if the key exists, or nil and false if not found. The caller should type assert the returned value to the expected type.
The method is thread-safe.
func (*JobCheckpoint) Save ¶
func (c *JobCheckpoint) Save(key string, value interface{})
Save stores a checkpoint value.
The value can be any serializable type. Common types include:
- Basic types: int, string, bool, float64
- Time values: time.Time
- Slices and maps of basic types
The method is thread-safe.
type JobContext ¶
type JobContext struct { Job *livekit.Job Room *lksdk.Room Cancel context.CancelFunc StartedAt time.Time // Job-specific context TargetParticipant *lksdk.RemoteParticipant // For PARTICIPANT jobs PublisherInfo *PublisherInfo // For PUBLISHER jobs CustomData map[string]interface{} // For extension data }
JobContext contains all context for an active job
type JobHandler ¶
type JobHandler interface { // OnJobRequest is called when a job is offered to the agent. // The agent should inspect the job details and decide whether to accept it. // If accepted, the agent should return metadata specifying how it will join the room. // // This method should return quickly as the server waits for the response. // Heavy initialization should be deferred to OnJobAssigned. // // Parameters: // - ctx: Context for the request, may have a deadline // - job: The job being offered, contains room and participant information // // Returns: // - accept: true to accept the job, false to decline // - metadata: Agent participant metadata if accepting (can be nil) OnJobRequest(ctx context.Context, job *livekit.Job) (accept bool, metadata *JobMetadata) // OnJobAssigned is called when a job has been assigned to this agent. // The agent is already connected to the room as a participant when this is called. // This is where the main agent logic should be implemented. // // The method should block until the agent's work is complete or the context is cancelled. // Returning an error will mark the job as failed. // // Parameters: // - ctx: Context for the job, cancelled when the job should terminate // - job: The assigned job with full details // - room: Connected room client for interacting with the room // // Returns: // - error: nil on success, error to mark job as failed OnJobAssigned(ctx context.Context, job *livekit.Job, room *lksdk.Room) error // OnJobTerminated is called when a job is terminated. // This can happen due to: // - Agent disconnection // - Job dispatch removal by the server // - Server shutdown // - Explicit job termination request // // Use this method to clean up any resources associated with the job. // The room connection is already closed when this is called. // // Parameters: // - ctx: Context for cleanup operations // - jobID: ID of the terminated job OnJobTerminated(ctx context.Context, jobID string) }
JobHandler is the interface that agents must implement to handle jobs. A job represents a task assigned to an agent, typically involving joining a LiveKit room and performing some processing on the media streams or interacting with participants.
Implementations should be thread-safe as methods may be called concurrently.
type JobMetadata ¶
type JobMetadata struct { // ParticipantIdentity is the unique identity the agent will use when joining the room. // If empty, a default identity will be generated. ParticipantIdentity string // ParticipantName is the display name for the agent participant. // This is what other participants will see as the agent's name. ParticipantName string // ParticipantMetadata is optional metadata attached to the participant. // This can be any string data (often JSON) that other participants can read. ParticipantMetadata string // ParticipantAttributes are key-value pairs attached to the participant. // These are synchronized to all participants and can be updated during the session. ParticipantAttributes map[string]string // SupportsResume indicates if the agent can resume a previously started job. // If true, the agent should be able to recover its state if reconnected to the same job. SupportsResume bool }
JobMetadata contains agent-specific metadata for a job. When an agent accepts a job, it provides this metadata to specify how it will appear as a participant in the room.
type JobPriority ¶
type JobPriority int
JobPriority represents the priority level of a job.
Jobs are processed based on their priority, with higher priority jobs being dequeued before lower priority ones. Within the same priority level, jobs are processed in FIFO order based on their enqueue time.
const ( // JobPriorityLow indicates a low priority job that can be processed after other jobs. // Suitable for background tasks or non-time-sensitive operations. JobPriorityLow JobPriority = 0 // JobPriorityNormal indicates a standard priority job. // This is the default priority for most jobs. JobPriorityNormal JobPriority = 1 // JobPriorityHigh indicates a high priority job that should be processed soon. // Suitable for user-initiated actions or time-sensitive operations. JobPriorityHigh JobPriority = 2 // JobPriorityUrgent indicates an urgent job that requires immediate processing. // Should be used sparingly for critical operations. JobPriorityUrgent JobPriority = 3 )
type JobPriorityCalculator ¶
type JobPriorityCalculator interface { // CalculatePriority examines a job and returns its priority level. // The implementation should be deterministic for consistent behavior. CalculatePriority(job *livekit.Job) JobPriority }
JobPriorityCalculator determines job priority based on job metadata.
Implementations can inspect job properties, metadata, or external factors to assign appropriate priority levels. This allows for flexible priority assignment strategies without modifying the queue implementation.
type JobQueue ¶
type JobQueue struct {
// contains filtered or unexported fields
}
JobQueue manages pending jobs with priority-based ordering.
The queue uses a priority heap to ensure high-priority jobs are processed before lower-priority ones. Within the same priority level, jobs are processed in FIFO order. The queue is thread-safe and supports blocking dequeue operations.
Example usage:
queue := NewJobQueue(JobQueueOptions{MaxSize: 100}) // Add a job err := queue.Enqueue(job, JobPriorityHigh, token, url) // Get the next job (blocking) item, err := queue.DequeueWithContext(ctx) if err == nil { // Process item.Job }
func NewJobQueue ¶
func NewJobQueue(opts JobQueueOptions) *JobQueue
NewJobQueue creates a new priority-based job queue.
The queue is initialized empty and ready to accept jobs. If MaxSize is specified in options, the queue will enforce that limit.
func (*JobQueue) Clear ¶
func (q *JobQueue) Clear()
Clear removes all jobs from the queue.
After this call, the queue will be empty but still usable. The method is thread-safe.
func (*JobQueue) Close ¶
func (q *JobQueue) Close()
Close closes the queue and prevents new items from being added.
Any blocked DequeueWithContext calls will return with an error. After closing, the queue cannot be reopened.
The method is thread-safe.
func (*JobQueue) Dequeue ¶
func (q *JobQueue) Dequeue() (*JobQueueItem, bool)
Dequeue removes and returns the highest priority job.
Returns the job item and true if a job was available, or nil and false if the queue is empty. This method does not block.
The method is thread-safe.
func (*JobQueue) DequeueWithContext ¶
func (q *JobQueue) DequeueWithContext(ctx context.Context) (*JobQueueItem, error)
DequeueWithContext waits for a job or until context is cancelled.
This method blocks until one of the following occurs:
- A job becomes available in the queue
- The context is cancelled
- The queue is closed
Returns the dequeued job item on success, or an error if the context was cancelled or the queue was closed.
Example:
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() item, err := queue.DequeueWithContext(ctx) if err != nil { // Handle timeout or cancellation }
func (*JobQueue) Enqueue ¶
Enqueue adds a job to the queue with the specified priority.
Jobs are ordered first by priority (higher priority first) and then by enqueue time (older first) within the same priority level.
Returns an error if:
- The queue is closed
- The queue is full (when MaxSize > 0)
The method is thread-safe and will signal any waiting DequeueWithContext calls.
func (*JobQueue) GetJobsByPriority ¶
func (q *JobQueue) GetJobsByPriority(priority JobPriority) []*JobQueueItem
GetJobsByPriority returns all jobs of a specific priority.
The returned jobs remain in the queue. The slice contains references to the actual queue items, so modifications to the items will affect the queue.
The method is thread-safe but the returned slice is a snapshot and may become stale if the queue is modified.
func (*JobQueue) Peek ¶
func (q *JobQueue) Peek() (*JobQueueItem, bool)
Peek returns the highest priority job without removing it.
Returns the job item and true if the queue has jobs, or nil and false if the queue is empty. The job remains in the queue after this call.
The method is thread-safe.
func (*JobQueue) RemoveJob ¶
RemoveJob removes a specific job from the queue by ID.
Returns true if the job was found and removed, false otherwise. This operation maintains the heap property of the queue.
The method is thread-safe.
type JobQueueItem ¶
type JobQueueItem struct { // Job is the LiveKit job to be processed Job *livekit.Job // Priority determines the processing order of this job Priority JobPriority // EnqueueTime records when the job was added to the queue EnqueueTime time.Time // Token is the authentication token for this job Token string // URL is the server URL for this job URL string // contains filtered or unexported fields }
JobQueueItem represents a job in the priority queue.
Each item contains the job itself along with metadata used for queue management and processing. Items are ordered by priority and enqueue time.
type JobQueueOptions ¶
type JobQueueOptions struct { // MaxSize is the maximum number of jobs allowed in the queue. // Set to 0 for unlimited queue size. // If the queue is full, Enqueue will return an error. MaxSize int }
JobQueueOptions configures the job queue.
type JobRecoveryHandler ¶
type JobRecoveryHandler interface { // OnJobRecoveryAttempt is called when trying to recover a job after reconnection. // // This method allows the handler to decide whether to attempt recovery // based on the job state. Return true to proceed with recovery, false // to abandon the job. // // Common decision factors: // - Job status (only recover running jobs) // - Time since disconnection // - Job type or priority // - Available resources OnJobRecoveryAttempt(ctx context.Context, jobID string, jobState *JobState) bool // OnJobRecovered is called when a job has been successfully recovered. // // The job is now active again with a valid room connection. The handler // can use this callback to restore application state, resume processing, // or notify other components. // // The provided room is already connected and ready for use. OnJobRecovered(ctx context.Context, job *livekit.Job, room *lksdk.Room) // OnJobRecoveryFailed is called when job recovery fails. // // Recovery can fail for various reasons: // - Room no longer exists // - Token expired // - Network issues // - Resource constraints // // The handler can use this to clean up state, log errors, or notify // monitoring systems. OnJobRecoveryFailed(ctx context.Context, jobID string, err error) }
JobRecoveryHandler provides hooks for job recovery during reconnection.
Implement this interface to customize how jobs are recovered after a worker loses and regains connection to the server. This allows for application-specific recovery logic and state restoration.
The recovery process:
- Worker loses connection
- Jobs are saved with their state
- Worker reconnects
- OnJobRecoveryAttempt is called for each saved job
- If approved, recovery is attempted
- OnJobRecovered or OnJobRecoveryFailed is called based on result
type JobRecoveryManager ¶
type JobRecoveryManager struct {
// contains filtered or unexported fields
}
JobRecoveryManager manages job recovery during reconnection.
The manager tracks active jobs and attempts to restore them after network disruptions. It works with JobRecoveryHandler to provide customizable recovery behavior.
Key features:
- Automatic job state preservation
- Configurable recovery timeout
- Retry logic with attempt limits
- Concurrent recovery attempts
- Room reconnection support
func NewJobRecoveryManager ¶
func NewJobRecoveryManager(worker WorkerInterface, handler JobRecoveryHandler) *JobRecoveryManager
NewJobRecoveryManager creates a new job recovery manager.
If handler is nil, DefaultJobRecoveryHandler will be used. The manager uses a 30-second timeout for recovery attempts by default.
func (*JobRecoveryManager) AttemptJobRecovery ¶
func (m *JobRecoveryManager) AttemptJobRecovery(ctx context.Context) map[string]error
AttemptJobRecovery tries to recover all pending jobs after reconnection.
This method should be called after the worker successfully reconnects to the server. It will:
- Check each saved job with the recovery handler
- Skip jobs that are too old or declined by the handler
- Attempt to recover approved jobs concurrently
- Update job states based on recovery results
Returns a map of jobID to error for all recovery attempts. A nil error indicates successful recovery.
func (*JobRecoveryManager) GetRecoverableJobs ¶
func (m *JobRecoveryManager) GetRecoverableJobs() map[string]*RecoverableJob
GetRecoverableJobs returns all jobs pending recovery.
This method returns a copy of the pending recovery map, so modifications to the returned map won't affect the internal state.
Use this to inspect which jobs are queued for recovery, their states, and recovery attempt counts.
func (*JobRecoveryManager) SaveJobForRecovery ¶
func (m *JobRecoveryManager) SaveJobForRecovery(jobID string, job *livekit.Job, token string)
SaveJobForRecovery saves job information for potential recovery.
Call this method when a job starts to enable recovery if the worker disconnects. The job data, token, and current state are preserved.
The saved job will be available for recovery until:
- Recovery succeeds
- Recovery timeout expires
- Maximum retry attempts are exceeded
- The job is explicitly removed
type JobResumptionData ¶
type JobResumptionData struct { // JobID uniquely identifies the job JobID string `json:"job_id"` // WorkerID identifies the worker that was handling the job WorkerID string `json:"worker_id"` // LastStatus is the job's status at the time of suspension LastStatus livekit.JobStatus `json:"last_status"` // LastUpdate is when the job state was last updated LastUpdate time.Time `json:"last_update"` // CheckpointData contains application-specific state for resumption CheckpointData map[string]interface{} `json:"checkpoint_data,omitempty"` }
JobResumptionData contains data needed to resume a job.
This structure can be serialized and used to communicate job state between workers or persist state across restarts. It includes both system-level information and application-specific checkpoint data.
type JobState ¶
type JobState struct { // JobID is the unique job identifier. JobID string // Status is the current job status. Status livekit.JobStatus // StartedAt is when the job was started. StartedAt time.Time // RoomName is the name of the room associated with the job. RoomName string }
JobState holds persistent job state for recovery. This information allows jobs to be resumed after temporary disconnections.
type JobTimingManager ¶
type JobTimingManager struct {
// contains filtered or unexported fields
}
JobTimingManager manages job execution timing and deadlines
func NewJobTimingManager ¶
func NewJobTimingManager() *JobTimingManager
NewJobTimingManager creates a new job timing manager
type JobUtils ¶
type JobUtils struct { Job *livekit.Job Room *lksdk.Room // contains filtered or unexported fields }
JobUtils provides a convenient wrapper around job execution context with utility methods for common operations like waiting for participants, publishing data, and managing job lifecycle. This simplifies job handler implementations.
func NewJobUtils ¶
NewJobUtils creates a new job utils wrapper with the provided context, job, and room. This is typically called at the beginning of a job handler to create a convenient interface for job operations.
func (*JobUtils) Done ¶
func (jc *JobUtils) Done() <-chan struct{}
Done returns a channel that's closed when the job should stop execution. This is equivalent to jc.Context().Done().
func (*JobUtils) GetTargetParticipant ¶
func (jc *JobUtils) GetTargetParticipant() *lksdk.RemoteParticipant
GetTargetParticipant returns the target participant for publisher/participant jobs. Returns nil if the job doesn't have a target participant or if the participant is not currently in the room.
func (*JobUtils) PublishData ¶
PublishData publishes data to the room with optional reliability and targeting. If destinationIdentities is provided, the data is sent only to those participants. Otherwise, it's sent to all participants in the room.
func (*JobUtils) Sleep ¶
Sleep pauses execution for the specified duration or until the job is cancelled. Returns an error if the job context is cancelled during the sleep.
func (*JobUtils) WaitForParticipant ¶
func (jc *JobUtils) WaitForParticipant(identity string, timeout time.Duration) (*lksdk.RemoteParticipant, error)
WaitForParticipant waits for a participant with the specified identity to join the room. It polls every 100ms until the participant joins or the timeout is reached. Returns an error if the timeout is exceeded or the job context is cancelled.
type LoadBalancer ¶
type LoadBalancer struct {
// contains filtered or unexported fields
}
LoadBalancer helps distribute work across multiple workers by tracking their load and availability. It can be used to implement custom load balancing strategies for job distribution in multi-worker deployments.
func NewLoadBalancer ¶
func NewLoadBalancer() *LoadBalancer
NewLoadBalancer creates a new load balancer with an empty worker registry.
func (*LoadBalancer) GetLeastLoadedWorker ¶
func (lb *LoadBalancer) GetLeastLoadedWorker() *WorkerInfo
GetLeastLoadedWorker returns the worker with the lowest load that is still accepting jobs. Workers that haven't updated in the last 30 seconds or are at capacity are ignored. Returns nil if no suitable worker is available.
func (*LoadBalancer) GetWorkerCount ¶
func (lb *LoadBalancer) GetWorkerCount() int
GetWorkerCount returns the number of active workers (those that have reported status within the last 30 seconds).
func (*LoadBalancer) RemoveWorker ¶
func (lb *LoadBalancer) RemoveWorker(id string)
RemoveWorker removes a worker from the load balancer tracking. This should be called when a worker shuts down or becomes unavailable.
func (*LoadBalancer) UpdateWorker ¶
func (lb *LoadBalancer) UpdateWorker(id string, load float32, jobCount, maxJobs int)
UpdateWorker updates or adds information about a worker. This should be called periodically by workers to report their current status to the load balancer.
type LoadBatcher ¶
type LoadBatcher struct {
// contains filtered or unexported fields
}
LoadBatcher manages batched status updates to reduce network overhead.
Instead of sending every load update immediately, the batcher collects updates and sends them at regular intervals. This reduces:
- Network traffic to the server
- Server processing overhead
- WebSocket message frequency
Only the most recent update within each interval is sent.
func NewLoadBatcher ¶
func NewLoadBatcher(worker WorkerInterface, interval time.Duration) *LoadBatcher
NewLoadBatcher creates a new load update batcher.
Parameters:
- worker: The worker whose status to update
- interval: Batching interval (default: 5 seconds if <= 0)
The batcher will hold updates for the specified interval before sending them to the server.
func (*LoadBatcher) Start ¶
func (b *LoadBatcher) Start()
Start begins the batching process.
Currently a no-op as batching is triggered by Update calls. Included for API consistency with other components.
func (*LoadBatcher) Stop ¶
func (b *LoadBatcher) Stop()
Stop stops the batcher and sends any pending update.
This ensures the final status is sent to the server before shutdown. After Stop is called, further Update calls will be ignored.
func (*LoadBatcher) Update ¶
func (b *LoadBatcher) Update(status WorkerStatus, load float32)
Update queues a status update for batched sending.
Multiple updates within the batching interval will be coalesced, with only the most recent values being sent. This method is non-blocking and thread-safe.
Updates after Stop() are ignored.
type LoadCalculator ¶
type LoadCalculator interface { // Calculate returns a load value between 0.0 and 1.0. // // The calculation can consider any combination of metrics: // - Number of active jobs vs capacity // - CPU and memory usage // - Job processing duration // - Historical trends // // Return values: // - 0.0: Worker is idle // - 0.0-0.5: Light load // - 0.5-0.8: Moderate load // - 0.8-1.0: Heavy load // - 1.0: Worker at full capacity Calculate(metrics LoadMetrics) float32 }
LoadCalculator provides custom load calculation for workers.
Implementations determine how "loaded" a worker is based on various metrics. The load value is used for:
- Job distribution decisions
- Auto-scaling triggers
- Health monitoring
- Status reporting to the server
Load values must be normalized between 0.0 (no load) and 1.0 (full load).
type LoadMetrics ¶
type LoadMetrics struct { // ActiveJobs is the current number of jobs being processed ActiveJobs int // MaxJobs is the maximum number of concurrent jobs allowed (0 = unlimited) MaxJobs int // JobDuration maps job IDs to their processing duration. // This helps identify long-running jobs that might indicate higher load. JobDuration map[string]time.Duration // CPUPercent is the current CPU usage percentage (0-100) CPUPercent float64 // MemoryPercent is the current memory usage percentage (0-100) MemoryPercent float64 // MemoryUsedMB is the actual memory used in megabytes MemoryUsedMB uint64 // MemoryTotalMB is the total available memory in megabytes MemoryTotalMB uint64 // RecentLoads contains recent load calculations for trend analysis. // Newer values should be appended to the end. RecentLoads []float32 }
LoadMetrics contains metrics for load calculation.
This structure provides various measurements that LoadCalculator implementations can use to determine worker load. Not all fields need to be populated - calculators should handle missing data gracefully.
type Logger ¶
type Logger interface { // Debug logs a debug-level message with optional fields. Debug(msg string, fields ...interface{}) // Info logs an info-level message with optional fields. Info(msg string, fields ...interface{}) // Warn logs a warning-level message with optional fields. Warn(msg string, fields ...interface{}) // Error logs an error-level message with optional fields. Error(msg string, fields ...interface{}) }
Logger interface for pluggable logging. Implement this interface to integrate with your application's logging system. The fields parameter accepts key-value pairs for structured logging.
func NewDefaultLogger ¶
func NewDefaultLogger() Logger
NewDefaultLogger creates a new default logger
type MediaBuffer ¶
type MediaBuffer struct {
// contains filtered or unexported fields
}
MediaBuffer provides buffering for media data.
Buffers are used to queue media between pipeline stages and to store output for consumption. They provide overflow protection by dropping old data when full (if configured).
The buffer is thread-safe for concurrent access.
func (*MediaBuffer) Clear ¶
func (mb *MediaBuffer) Clear()
Clear removes all items from the buffer.
The buffer remains usable after clearing. The method is thread-safe.
func (*MediaBuffer) Dequeue ¶
func (mb *MediaBuffer) Dequeue() *MediaData
Dequeue removes and returns data from the buffer.
Returns the oldest item in the buffer, or nil if empty. The method is thread-safe.
func (*MediaBuffer) Enqueue ¶
func (mb *MediaBuffer) Enqueue(data MediaData) bool
Enqueue adds data to the buffer.
If the buffer is full:
- With dropOldest=true: Oldest item is removed to make space
- With dropOldest=false: Returns false without adding
Returns true if the data was successfully added. The method is thread-safe.
func (*MediaBuffer) Size ¶
func (mb *MediaBuffer) Size() int
Size returns the current number of items in the buffer.
The method is thread-safe.
type MediaBufferFactory ¶
type MediaBufferFactory struct {
// contains filtered or unexported fields
}
MediaBufferFactory creates media buffers with consistent settings.
The factory ensures all buffers in a pipeline have appropriate size limits and overflow behavior.
func NewMediaBufferFactory ¶
func NewMediaBufferFactory(defaultSize, maxSize int) *MediaBufferFactory
NewMediaBufferFactory creates a new buffer factory.
Parameters:
- defaultSize: Initial capacity for buffers (pre-allocated slots)
- maxSize: Maximum number of items a buffer can hold
The factory ensures consistent buffer configuration across the pipeline.
func (*MediaBufferFactory) CreateBuffer ¶
func (mbf *MediaBufferFactory) CreateBuffer() *MediaBuffer
CreateBuffer creates a new media buffer.
The buffer is configured with:
- Pre-allocated capacity based on defaultSize
- Maximum size limit from factory settings
- Drop-oldest behavior when full
All buffers created by the same factory have consistent settings.
type MediaData ¶
type MediaData struct { // Type indicates whether this is audio or video data Type MediaType // TrackID identifies which track this data belongs to TrackID string // Timestamp is when this data was captured Timestamp time.Time // Data contains the raw media payload Data []byte // Format describes the media format Format MediaFormat // Metadata contains additional information that stages can use Metadata map[string]interface{} }
MediaData represents media data flowing through the pipeline.
This is the fundamental data structure that moves between pipeline stages. It contains the media payload along with format information and metadata.
type MediaFormat ¶
type MediaFormat struct { // SampleRate is the number of audio samples per second (Hz) SampleRate uint32 // Channels is the number of audio channels (1=mono, 2=stereo, etc.) Channels uint8 // BitDepth is the number of bits per audio sample BitDepth uint8 // Width is the video frame width in pixels Width uint32 // Height is the video frame height in pixels Height uint32 // FrameRate is the video frames per second FrameRate float64 // PixelFormat describes how pixels are encoded PixelFormat VideoFormat }
MediaFormat contains format information for media data.
This structure describes the technical properties of media, allowing stages and processors to understand and validate the data they're working with.
type MediaMetricsCollector ¶
type MediaMetricsCollector struct {
// contains filtered or unexported fields
}
MediaMetricsCollector collects metrics from the media pipeline.
The collector aggregates statistics from all tracks being processed, providing a centralized view of pipeline performance.
func NewMediaMetricsCollector ¶
func NewMediaMetricsCollector() *MediaMetricsCollector
NewMediaMetricsCollector creates a new metrics collector.
The collector starts with an empty metrics map and is ready to record statistics for any number of tracks.
func (*MediaMetricsCollector) GetAllMetrics ¶
func (mmc *MediaMetricsCollector) GetAllMetrics() map[string]*MediaProcessingStats
GetAllMetrics returns metrics for all tracks.
Returns a map of track IDs to their statistics. The map and all statistics are copies, so modifications won't affect the collector's internal state.
Use this to get a complete view of pipeline performance across all tracks.
func (*MediaMetricsCollector) GetMetrics ¶
func (mmc *MediaMetricsCollector) GetMetrics(trackID string) (*MediaProcessingStats, bool)
GetMetrics returns metrics for a specific track.
Returns a copy of the statistics and true if the track exists, or nil and false if no metrics have been recorded for this track.
The returned statistics are a snapshot and won't update.
func (*MediaMetricsCollector) RecordProcessing ¶
func (mmc *MediaMetricsCollector) RecordProcessing(trackID string, success bool, processingTime time.Duration)
RecordProcessing records processing metrics for a track.
Parameters:
- trackID: Unique identifier for the track
- success: Whether processing succeeded or failed
- processingTime: How long the processing took
If this is the first recording for a track, statistics are automatically initialized.
type MediaPipeline ¶
type MediaPipeline struct {
// contains filtered or unexported fields
}
MediaPipeline represents a processing pipeline for media tracks.
The pipeline provides a flexible framework for processing audio and video data through a series of configurable stages. Each stage can perform operations like transcoding, filtering, enhancement, or analysis.
Key features:
- Multi-stage processing with priority ordering
- Per-track pipeline instances
- Built-in buffering and metrics collection
- Support for custom processors
- Concurrent processing of multiple tracks
Example usage:
pipeline := NewMediaPipeline() pipeline.AddStage(NewTranscodingStage("transcode", 10, targetFormat)) pipeline.AddStage(NewFilteringStage("denoise", 20)) err := pipeline.StartProcessingTrack(remoteTrack) if err == nil { // Get processed output buffer, _ := pipeline.GetOutputBuffer(remoteTrack.ID()) }
func NewMediaPipeline ¶
func NewMediaPipeline() *MediaPipeline
NewMediaPipeline creates a new media processing pipeline.
The pipeline is initialized with:
- Empty stage list (add stages with AddStage)
- Default buffer factory (100 initial size, 1000 max)
- New metrics collector
The pipeline is ready to use immediately after creation.
func (*MediaPipeline) AddStage ¶
func (mp *MediaPipeline) AddStage(stage MediaPipelineStage)
AddStage adds a processing stage to the pipeline.
Stages are automatically sorted by priority after addition. Lower priority values execute first. Multiple stages can have the same priority and will execute in the order they were added.
The stage will be applied to all tracks processed by this pipeline.
func (*MediaPipeline) GetOutputBuffer ¶
func (mp *MediaPipeline) GetOutputBuffer(trackID string) (*MediaBuffer, bool)
GetOutputBuffer returns the output buffer for a track.
The output buffer contains processed media data ready for consumption. Returns the buffer and true if the track exists, or nil and false otherwise.
The returned buffer reference remains valid until StopProcessingTrack is called. Callers can dequeue data from this buffer to get processed media.
func (*MediaPipeline) GetProcessingStats ¶
func (mp *MediaPipeline) GetProcessingStats(trackID string) (*MediaProcessingStats, bool)
GetProcessingStats returns processing statistics for a track.
Returns a copy of the current statistics and true if the track exists, or nil and false if the track is not being processed.
The returned statistics are a snapshot and won't update.
func (*MediaPipeline) RegisterProcessor ¶
func (mp *MediaPipeline) RegisterProcessor(processor MediaProcessor)
RegisterProcessor registers a media processor with the pipeline.
Processors can be used by pipeline stages to perform specific media transformations. Each processor must have a unique name. Registering a processor with an existing name will replace it.
func (*MediaPipeline) RemoveStage ¶
func (mp *MediaPipeline) RemoveStage(stageName string)
RemoveStage removes a processing stage from the pipeline.
The stage is identified by its name. If multiple stages have the same name, all of them will be removed. This operation does not affect currently processing media.
func (*MediaPipeline) StartProcessingTrack ¶
func (mp *MediaPipeline) StartProcessingTrack(track *webrtc.TrackRemote) error
StartProcessingTrack starts processing a media track.
This creates a dedicated pipeline instance for the track with:
- Input and output buffers
- Processing goroutine
- Media receiver
- Statistics tracking
Each track can only be processed once. Attempting to process the same track again will return an error.
The track will be processed until StopProcessingTrack is called or the pipeline is destroyed.
func (*MediaPipeline) StopProcessingTrack ¶
func (mp *MediaPipeline) StopProcessingTrack(trackID string)
StopProcessingTrack stops processing a media track.
This gracefully shuts down the track's pipeline by:
- Cancelling the processing context
- Waiting for the processing goroutine to finish
- Cleaning up resources
If the track is not being processed, this method does nothing. The method blocks until processing has fully stopped.
type MediaPipelineStage ¶
type MediaPipelineStage interface { // GetName returns the unique name of this stage. // Used for identification and logging. GetName() string // Process transforms media data through this stage. // The stage should return modified data or an error if processing fails. // Stages can modify data in-place or create new data. Process(ctx context.Context, input MediaData) (MediaData, error) // CanProcess checks if this stage can process the given media type. // Return false to skip this stage for specific media types. CanProcess(mediaType MediaType) bool // GetPriority returns the stage priority. // Lower numbers run first. Stages with the same priority run in // the order they were added. GetPriority() int }
MediaPipelineStage represents a stage in the media processing pipeline.
Stages are the building blocks of the pipeline. Each stage performs a specific operation on media data and can be chained together to create complex processing workflows.
Stages are ordered by priority, with lower priority values executing first. Each stage can choose whether to process specific media types.
type MediaProcessingStats ¶
type MediaProcessingStats struct { // FramesProcessed is the total number of successfully processed frames FramesProcessed uint64 // FramesDropped is the number of frames that were dropped FramesDropped uint64 // ProcessingTimeMs is the average processing time in milliseconds ProcessingTimeMs float64 // LastProcessedAt is when the last frame was processed LastProcessedAt time.Time // Errors is the number of processing errors encountered Errors uint64 // contains filtered or unexported fields }
MediaProcessingStats tracks processing statistics.
These metrics help monitor pipeline performance and identify bottlenecks or issues. Stats are updated in real-time as media flows through the pipeline.
type MediaProcessor ¶
type MediaProcessor interface { // ProcessAudio processes audio samples. // // Parameters: // - samples: Raw audio data // - sampleRate: Sample rate in Hz (e.g., 48000) // - channels: Number of audio channels (e.g., 1 for mono, 2 for stereo) // // Returns processed audio data or an error. ProcessAudio(ctx context.Context, samples []byte, sampleRate uint32, channels uint8) ([]byte, error) // ProcessVideo processes video frames. // // Parameters: // - frame: Raw video frame data // - width: Frame width in pixels // - height: Frame height in pixels // - format: Pixel format of the frame // // Returns processed video data or an error. ProcessVideo(ctx context.Context, frame []byte, width, height uint32, format VideoFormat) ([]byte, error) // GetName returns the unique name of this processor. // Used for registration and identification. GetName() string // GetCapabilities returns what this processor can handle. // Used to determine if a processor is suitable for specific media. GetCapabilities() ProcessorCapabilities }
MediaProcessor defines the interface for media processing components.
Processors are specialized components that can transform media data. They can be registered with the pipeline and used by stages to perform specific operations like encoding, decoding, filtering, or effects.
Implementations should be thread-safe as they may be called from multiple goroutines concurrently.
type MediaTrackPipeline ¶
type MediaTrackPipeline struct { // TrackID uniquely identifies the track being processed TrackID string // Track is the WebRTC remote track being processed Track *webrtc.TrackRemote // Pipeline is a reference to the parent pipeline Pipeline *MediaPipeline // InputBuffer queues incoming media data for processing InputBuffer *MediaBuffer // OutputBuffer stores processed media data OutputBuffer *MediaBuffer // ProcessingStats tracks performance metrics ProcessingStats *MediaProcessingStats // Active indicates if the pipeline is currently processing Active bool // contains filtered or unexported fields }
MediaTrackPipeline manages the pipeline for a single track.
Each track gets its own pipeline instance with dedicated buffers and processing goroutine. This ensures isolation between tracks and allows for parallel processing.
type MessageHandler ¶
type MessageHandler struct {
// contains filtered or unexported fields
}
MessageHandler manages custom message handlers
func NewMessageHandler ¶
func NewMessageHandler(logger Logger) *MessageHandler
NewMessageHandler creates a new message handler
func (*MessageHandler) HandleMessage ¶
func (h *MessageHandler) HandleMessage(msg *ServerMessage) error
HandleMessage handles a server message
func (*MessageHandler) RegisterHandler ¶
func (h *MessageHandler) RegisterHandler(messageType string, handler func(*ServerMessage) error)
RegisterHandler registers a handler for a message type
type MessageTypeHandler ¶
type MessageTypeHandler interface { HandleMessage(ctx context.Context, data []byte) error GetMessageType() string }
MessageTypeHandler handles specific message type processing
type MessageTypeRegistry ¶
type MessageTypeRegistry struct {
// contains filtered or unexported fields
}
MessageTypeRegistry manages custom message type handlers
func NewMessageTypeRegistry ¶
func NewMessageTypeRegistry(logger *zap.Logger) *MessageTypeRegistry
NewMessageTypeRegistry creates a new message type registry
func (*MessageTypeRegistry) HandleMessage ¶
HandleMessage processes a message with the appropriate handler
func (*MessageTypeRegistry) HasHandler ¶
func (r *MessageTypeRegistry) HasHandler(msgType string) bool
HasHandler checks if a handler exists for the message type
func (*MessageTypeRegistry) RegisterHandler ¶
func (r *MessageTypeRegistry) RegisterHandler(handler MessageTypeHandler)
RegisterHandler registers a handler for a specific message type
type MetadataPriorityCalculator ¶
type MetadataPriorityCalculator struct { // PriorityField is the field name in metadata to check for priority value. // For example, if PriorityField is "priority", the calculator looks for // metadata like "priority:high" or "priority:urgent". PriorityField string }
MetadataPriorityCalculator calculates priority from structured job metadata.
This calculator looks for a specific field in the job metadata to determine priority. The metadata is expected to be in a simple key:value format. For production use, consider parsing JSON metadata instead.
Example:
calc := &MetadataPriorityCalculator{PriorityField: "priority"} // Job with metadata "priority:urgent" → JobPriorityUrgent // Job with metadata "priority:high" → JobPriorityHigh
func (*MetadataPriorityCalculator) CalculatePriority ¶
func (m *MetadataPriorityCalculator) CalculatePriority(job *livekit.Job) JobPriority
CalculatePriority extracts priority from job metadata field.
Looks for metadata in format "{PriorityField}:{value}" where value can be:
- "urgent" → JobPriorityUrgent
- "high" → JobPriorityHigh
- "low" → JobPriorityLow
- anything else → JobPriorityNormal
Note: This is a simplified implementation. For production use, consider parsing structured JSON metadata.
type MetricsBatchProcessor ¶
type MetricsBatchProcessor struct {
// contains filtered or unexported fields
}
MetricsBatchProcessor batches events for metrics processing
func NewMetricsBatchProcessor ¶
func NewMetricsBatchProcessor(sink func(events []ParticipantEvent)) *MetricsBatchProcessor
NewMetricsBatchProcessor creates a metrics batch processor
func (*MetricsBatchProcessor) GetBatchSize ¶
func (p *MetricsBatchProcessor) GetBatchSize() int
func (*MetricsBatchProcessor) GetBatchTimeout ¶
func (p *MetricsBatchProcessor) GetBatchTimeout() time.Duration
func (*MetricsBatchProcessor) ProcessBatch ¶
func (p *MetricsBatchProcessor) ProcessBatch(events []ParticipantEvent) error
func (*MetricsBatchProcessor) ShouldBatch ¶
func (p *MetricsBatchProcessor) ShouldBatch(event ParticipantEvent) bool
type MultiParticipantCoordinator ¶
type MultiParticipantCoordinator struct {
// contains filtered or unexported fields
}
MultiParticipantCoordinator coordinates activities across multiple participants
func NewMultiParticipantCoordinator ¶
func NewMultiParticipantCoordinator() *MultiParticipantCoordinator
NewMultiParticipantCoordinator creates a new coordinator
func (*MultiParticipantCoordinator) AddCoordinationRule ¶
func (c *MultiParticipantCoordinator) AddCoordinationRule(rule CoordinationRule)
AddCoordinationRule adds a coordination rule
func (*MultiParticipantCoordinator) AddParticipantToGroup ¶
func (c *MultiParticipantCoordinator) AddParticipantToGroup(identity, groupID string) error
AddParticipantToGroup adds a participant to a group
func (*MultiParticipantCoordinator) CreateGroup ¶
func (c *MultiParticipantCoordinator) CreateGroup(id, name string, metadata map[string]interface{}) (*ParticipantGroup, error)
CreateGroup creates a new participant group
func (*MultiParticipantCoordinator) GetActiveParticipants ¶
func (c *MultiParticipantCoordinator) GetActiveParticipants() []*CoordinatedParticipant
GetActiveParticipants returns participants active within the threshold
func (*MultiParticipantCoordinator) GetActivityMetrics ¶
func (c *MultiParticipantCoordinator) GetActivityMetrics() ActivityMetrics
GetActivityMetrics returns metrics about participant activities
func (*MultiParticipantCoordinator) GetGroupMembers ¶
func (c *MultiParticipantCoordinator) GetGroupMembers(groupID string) []string
GetGroupMembers returns the members of a group
func (*MultiParticipantCoordinator) GetInteractionGraph ¶
func (c *MultiParticipantCoordinator) GetInteractionGraph() map[string]map[string]int
GetInteractionGraph returns interaction data between participants
func (*MultiParticipantCoordinator) GetParticipantGroups ¶
func (c *MultiParticipantCoordinator) GetParticipantGroups(identity string) []*ParticipantGroup
GetParticipantGroups returns groups for a participant
func (*MultiParticipantCoordinator) GetParticipantInteractions ¶
func (c *MultiParticipantCoordinator) GetParticipantInteractions(identity string) []ParticipantInteraction
GetParticipantInteractions returns interactions for a specific participant
func (*MultiParticipantCoordinator) RecordInteraction ¶
func (c *MultiParticipantCoordinator) RecordInteraction(from, to, interactionType string, data interface{})
RecordInteraction records an interaction between participants
func (*MultiParticipantCoordinator) RegisterEventHandler ¶
func (c *MultiParticipantCoordinator) RegisterEventHandler(eventType string, handler CoordinationEventHandler)
RegisterEventHandler registers an event handler
func (*MultiParticipantCoordinator) RegisterParticipant ¶
func (c *MultiParticipantCoordinator) RegisterParticipant(identity string, participant *lksdk.RemoteParticipant)
RegisterParticipant registers a participant with the coordinator
func (*MultiParticipantCoordinator) RemoveParticipantFromGroup ¶
func (c *MultiParticipantCoordinator) RemoveParticipantFromGroup(identity, groupID string) error
RemoveParticipantFromGroup removes a participant from a group
func (*MultiParticipantCoordinator) Stop ¶
func (c *MultiParticipantCoordinator) Stop()
Stop stops the coordinator
func (*MultiParticipantCoordinator) UnregisterParticipant ¶
func (c *MultiParticipantCoordinator) UnregisterParticipant(identity string)
UnregisterParticipant removes a participant from coordination
func (*MultiParticipantCoordinator) UpdateParticipantActivity ¶
func (c *MultiParticipantCoordinator) UpdateParticipantActivity(identity string, activityType ActivityType)
UpdateParticipantActivity updates activity for a participant
type NetworkHandler ¶
type NetworkHandler struct {
// contains filtered or unexported fields
}
NetworkHandler provides enhanced network error handling
func NewNetworkHandler ¶
func NewNetworkHandler() *NetworkHandler
NewNetworkHandler creates a new network handler
func (*NetworkHandler) DetectNetworkPartition ¶
func (n *NetworkHandler) DetectNetworkPartition() bool
DetectNetworkPartition checks for network partition based on activity
func (*NetworkHandler) GetDNSFailureCount ¶
func (n *NetworkHandler) GetDNSFailureCount() int32
GetDNSFailureCount returns the current DNS failure count
func (*NetworkHandler) GetPartialWriteData ¶
func (n *NetworkHandler) GetPartialWriteData() (int, []byte)
GetPartialWriteData returns the partial write data if any
func (*NetworkHandler) HasPartialWrite ¶
func (n *NetworkHandler) HasPartialWrite() bool
HasPartialWrite returns true if there's a partial write pending
func (*NetworkHandler) ResolveDNSWithRetry ¶
ResolveDNSWithRetry performs DNS resolution with retry logic
func (*NetworkHandler) SetNetworkPartition ¶
func (n *NetworkHandler) SetNetworkPartition(detected bool)
SetNetworkPartition sets the network partition state (for testing)
func (*NetworkHandler) UpdateNetworkActivity ¶
func (n *NetworkHandler) UpdateNetworkActivity()
UpdateNetworkActivity updates the last network activity time
func (*NetworkHandler) WriteMessageWithRetry ¶
func (n *NetworkHandler) WriteMessageWithRetry(conn WebSocketConn, messageType int, data []byte) error
WriteMessageWithRetry handles partial writes and retries
type NetworkMonitor ¶
type NetworkMonitor struct {
// contains filtered or unexported fields
}
NetworkMonitor monitors network health
func NewNetworkMonitor ¶
func NewNetworkMonitor(handler *NetworkHandler, checkInterval time.Duration) *NetworkMonitor
NewNetworkMonitor creates a new network monitor
func (*NetworkMonitor) Start ¶
func (m *NetworkMonitor) Start(partitionCallback func())
Start begins monitoring network health
type PartialMessageBuffer ¶
type PartialMessageBuffer struct {
// contains filtered or unexported fields
}
PartialMessageBuffer handles partial WebSocket messages during reconnection.
When a WebSocket connection is interrupted, messages may be partially received. This buffer helps preserve partial data and attempt to reconstruct complete messages after reconnection.
The buffer includes size limits and staleness detection to prevent memory issues from incomplete messages.
func NewPartialMessageBuffer ¶
func NewPartialMessageBuffer(maxSize int) *PartialMessageBuffer
NewPartialMessageBuffer creates a new buffer for partial messages.
maxSize limits the buffer size to prevent memory issues. If maxSize <= 0, defaults to 1MB.
func (*PartialMessageBuffer) Append ¶
func (b *PartialMessageBuffer) Append(messageType int, data []byte) error
Append adds data to the partial message buffer.
If the message type changes from the previous append, the buffer is reset. Returns an error if adding the data would exceed maxSize.
The method is thread-safe.
func (*PartialMessageBuffer) Clear ¶
func (b *PartialMessageBuffer) Clear()
Clear resets the buffer.
Use this to discard incomplete data, such as when giving up on message reconstruction or starting fresh after reconnection.
The method is thread-safe.
func (*PartialMessageBuffer) GetComplete ¶
func (b *PartialMessageBuffer) GetComplete() (int, []byte, bool)
GetComplete returns the complete message if available.
For text messages, attempts to parse as JSON to verify completeness. Binary messages cannot be reliably validated without framing information.
Returns:
- messageType: The WebSocket message type
- data: The complete message data (nil if incomplete)
- complete: true if a complete message is available
If a complete message is returned, the buffer is cleared. The method is thread-safe.
func (*PartialMessageBuffer) IsStale ¶
func (b *PartialMessageBuffer) IsStale(timeout time.Duration) bool
IsStale checks if the buffer has been idle too long.
Returns true if no data has been appended for longer than the specified timeout. Stale buffers likely contain incomplete messages that will never be completed.
The method is thread-safe.
type ParticipantActivity ¶
type ParticipantActivity struct { Type ActivityType Timestamp time.Time Details map[string]interface{} }
ParticipantActivity represents an activity by a participant
type ParticipantEvent ¶
type ParticipantEvent struct { ID string Type EventType Participant *lksdk.RemoteParticipant Track *lksdk.RemoteTrackPublication OldValue string NewValue string Data []byte DataKind livekit.DataPacket_Kind Timestamp time.Time Metadata map[string]interface{} }
ParticipantEvent represents an event related to a participant
type ParticipantEventProcessor ¶
type ParticipantEventProcessor struct {
// contains filtered or unexported fields
}
ParticipantEventProcessor processes participant events
func NewParticipantEventProcessor ¶
func NewParticipantEventProcessor() *ParticipantEventProcessor
NewParticipantEventProcessor creates a new event processor
func (*ParticipantEventProcessor) AddBatchProcessor ¶
func (p *ParticipantEventProcessor) AddBatchProcessor(processor BatchEventProcessor)
AddBatchProcessor adds a batch processor
func (*ParticipantEventProcessor) AddFilter ¶
func (p *ParticipantEventProcessor) AddFilter(filter EventFilter)
AddFilter adds an event filter
func (*ParticipantEventProcessor) GetEventHistory ¶
func (p *ParticipantEventProcessor) GetEventHistory(limit int) []ParticipantEvent
GetEventHistory returns recent event history
func (*ParticipantEventProcessor) GetMetrics ¶
func (p *ParticipantEventProcessor) GetMetrics() map[string]interface{}
GetMetrics returns processing metrics
func (*ParticipantEventProcessor) ProcessPendingEvents ¶
func (p *ParticipantEventProcessor) ProcessPendingEvents()
ProcessPendingEvents processes any pending events synchronously
func (*ParticipantEventProcessor) QueueEvent ¶
func (p *ParticipantEventProcessor) QueueEvent(event ParticipantEvent)
QueueEvent queues an event for processing
func (*ParticipantEventProcessor) RegisterHandler ¶
func (p *ParticipantEventProcessor) RegisterHandler(eventType EventType, handler EventHandler)
RegisterHandler registers an event handler
func (*ParticipantEventProcessor) Stop ¶
func (p *ParticipantEventProcessor) Stop()
Stop stops the event processor
type ParticipantGroup ¶
type ParticipantGroup struct { ID string Name string Participants map[string]bool Created time.Time Metadata map[string]interface{} Rules []GroupRule }
ParticipantGroup represents a group of participants
type ParticipantInfo ¶
type ParticipantInfo struct { Identity string Name string Metadata string JoinedAt time.Time LastActivity time.Time Permissions *livekit.ParticipantPermission Attributes map[string]string IsSpeaking bool AudioLevel float32 ConnectionQuality livekit.ConnectionQuality Participant *lksdk.RemoteParticipant }
ParticipantInfo stores information about a participant
type ParticipantInteraction ¶
type ParticipantInteraction struct { From string To string Type string Timestamp time.Time Bidirectional bool Data interface{} }
ParticipantInteraction represents an interaction between participants
type ParticipantPermissionManager ¶
type ParticipantPermissionManager struct {
// contains filtered or unexported fields
}
ParticipantPermissionManager manages permissions for participants
func NewParticipantPermissionManager ¶
func NewParticipantPermissionManager() *ParticipantPermissionManager
NewParticipantPermissionManager creates a new permission manager
func (*ParticipantPermissionManager) AddPolicy ¶
func (m *ParticipantPermissionManager) AddPolicy(policy PermissionPolicy)
AddPolicy adds a permission policy
func (*ParticipantPermissionManager) CanManagePermissions ¶
func (m *ParticipantPermissionManager) CanManagePermissions() bool
CanManagePermissions checks if the agent can manage permissions
func (*ParticipantPermissionManager) CanSendDataTo ¶
func (m *ParticipantPermissionManager) CanSendDataTo(identity string) bool
CanSendDataTo checks if the agent can send data to a participant
func (*ParticipantPermissionManager) GetParticipantPermissions ¶
func (m *ParticipantPermissionManager) GetParticipantPermissions(identity string) *livekit.ParticipantPermission
GetParticipantPermissions returns current permissions for a participant
func (*ParticipantPermissionManager) GetPermissionHistory ¶
func (m *ParticipantPermissionManager) GetPermissionHistory(identity string) []PermissionChange
GetPermissionHistory returns permission change history for a participant
func (*ParticipantPermissionManager) RemoveParticipant ¶
func (m *ParticipantPermissionManager) RemoveParticipant(identity string)
RemoveParticipant removes a participant from tracking
func (*ParticipantPermissionManager) RequestPermissionChange ¶
func (m *ParticipantPermissionManager) RequestPermissionChange(identity string, requested *livekit.ParticipantPermission) (bool, error)
RequestPermissionChange processes a permission change request
func (*ParticipantPermissionManager) SetAgentCapabilities ¶
func (m *ParticipantPermissionManager) SetAgentCapabilities(caps *AgentCapabilities)
SetAgentCapabilities sets what the agent is allowed to do
func (*ParticipantPermissionManager) SetCustomRestriction ¶
func (m *ParticipantPermissionManager) SetCustomRestriction(identity, restriction string, enabled bool)
SetCustomRestriction sets a custom restriction for a participant
func (*ParticipantPermissionManager) SetDefaultPermissions ¶
func (m *ParticipantPermissionManager) SetDefaultPermissions(perms *livekit.ParticipantPermission)
SetDefaultPermissions sets default permissions for new participants
func (*ParticipantPermissionManager) UpdateParticipantPermissions ¶
func (m *ParticipantPermissionManager) UpdateParticipantPermissions(identity string, perms *livekit.ParticipantPermission)
UpdateParticipantPermissions updates permissions for a participant
func (*ParticipantPermissionManager) ValidatePermissions ¶
func (m *ParticipantPermissionManager) ValidatePermissions(perms *livekit.ParticipantPermission) error
ValidatePermissions validates a permission set
type ParticipantPermissions ¶
type ParticipantPermissions struct { Identity string Current *livekit.ParticipantPermission Requested *livekit.ParticipantPermission LastUpdated time.Time LastRequested time.Time ChangeHistory []PermissionChange CustomRestrictions map[string]bool }
ParticipantPermissions tracks permissions for a participant
type ParticipantTracker ¶
type ParticipantTracker struct {
// contains filtered or unexported fields
}
ParticipantTracker manages participant state for a room
func NewParticipantTracker ¶
func NewParticipantTracker(room *lksdk.Room) *ParticipantTracker
NewParticipantTracker creates a new participant tracker
func (*ParticipantTracker) GetAllParticipants ¶
func (pt *ParticipantTracker) GetAllParticipants() []*lksdk.RemoteParticipant
GetAllParticipants returns all participants in the room
func (*ParticipantTracker) GetParticipant ¶
func (pt *ParticipantTracker) GetParticipant(identity string) (*lksdk.RemoteParticipant, error)
GetParticipant returns a participant by identity
func (*ParticipantTracker) GetParticipantInfo ¶
func (pt *ParticipantTracker) GetParticipantInfo(identity string) (*ParticipantInfo, bool)
GetParticipantInfo returns tracked info for a participant
func (*ParticipantTracker) RemoveParticipant ¶
func (pt *ParticipantTracker) RemoveParticipant(identity string)
RemoveParticipant removes a participant from tracking
func (*ParticipantTracker) UpdateParticipantInfo ¶
func (pt *ParticipantTracker) UpdateParticipantInfo(identity string, info *ParticipantInfo)
UpdateParticipantInfo updates tracked info for a participant
type PassthroughStage ¶
type PassthroughStage struct {
// contains filtered or unexported fields
}
PassthroughStage is a simple stage that passes data through unchanged.
This stage is useful for:
- Testing pipeline functionality
- Placeholder stages during development
- Monitoring points without data modification
The stage processes all media types and returns data unmodified.
func NewPassthroughStage ¶
func NewPassthroughStage(name string, priority int) *PassthroughStage
NewPassthroughStage creates a new passthrough stage.
Parameters:
- name: Unique identifier for this stage
- priority: Execution order (lower runs first)
func (*PassthroughStage) CanProcess ¶
func (ps *PassthroughStage) CanProcess(mediaType MediaType) bool
CanProcess implements MediaPipelineStage. Always returns true.
func (*PassthroughStage) GetName ¶
func (ps *PassthroughStage) GetName() string
GetName implements MediaPipelineStage.
func (*PassthroughStage) GetPriority ¶
func (ps *PassthroughStage) GetPriority() int
GetPriority implements MediaPipelineStage.
type PermissionChange ¶
type PermissionChange struct { Timestamp time.Time From *livekit.ParticipantPermission To *livekit.ParticipantPermission Reason string Approved bool }
PermissionChange represents a permission change event
type PermissionPolicy ¶
type PermissionPolicy interface { // EvaluatePermissionRequest evaluates if a permission change should be allowed EvaluatePermissionRequest(identity string, current, requested *livekit.ParticipantPermission) (bool, string) // GetDefaultPermissions returns default permissions for a participant GetDefaultPermissions(identity string) *livekit.ParticipantPermission }
PermissionPolicy defines a policy for permission management
type PredictiveLoadCalculator ¶
type PredictiveLoadCalculator struct {
// contains filtered or unexported fields
}
PredictiveLoadCalculator uses historical data to predict future load.
This calculator wraps another LoadCalculator and adds trend analysis to predict future load based on recent history. It's useful for:
- Anticipating load increases before they fully materialize
- Smoothing out temporary spikes or dips
- Making proactive scaling decisions
The prediction is conservative (20% of trend) to avoid overreaction.
func NewPredictiveLoadCalculator ¶
func NewPredictiveLoadCalculator(base LoadCalculator, windowSize int) *PredictiveLoadCalculator
NewPredictiveLoadCalculator creates a calculator with trend prediction.
Parameters:
- base: The underlying calculator for current load calculation
- windowSize: Number of historical snapshots to keep (default: 10)
The calculator maintains a sliding window of load history and uses linear regression to detect trends.
func (*PredictiveLoadCalculator) Calculate ¶
func (p *PredictiveLoadCalculator) Calculate(metrics LoadMetrics) float32
Calculate implements LoadCalculator with trend prediction.
The calculation process:
- Calculate current load using the base calculator
- Add the snapshot to history
- If enough history exists (3+ points), calculate trend
- Apply conservative prediction: current + (trend * 0.2)
- Clamp result between 0.0 and 1.0
Returns the base calculation if insufficient history exists.
type ProcessorCapabilities ¶
type ProcessorCapabilities struct { // SupportedMediaTypes lists which media types this processor handles SupportedMediaTypes []MediaType // SupportedFormats lists specific formats the processor supports SupportedFormats []MediaFormat // MaxConcurrency is the maximum number of concurrent operations (0 = unlimited) MaxConcurrency int // RequiresGPU indicates if this processor needs GPU acceleration RequiresGPU bool }
ProcessorCapabilities describes what a processor can handle.
This information helps the pipeline determine if a processor is suitable for specific media and how to manage concurrency.
type ProtocolHandler ¶
type ProtocolHandler struct {
// contains filtered or unexported fields
}
ProtocolHandler manages protocol-level operations and compatibility
func NewProtocolHandler ¶
func NewProtocolHandler(logger *zap.Logger) *ProtocolHandler
NewProtocolHandler creates a new protocol handler
func (*ProtocolHandler) GetProtocolMetrics ¶
func (p *ProtocolHandler) GetProtocolMetrics() map[string]interface{}
GetProtocolMetrics returns protocol-related metrics
func (*ProtocolHandler) GetUnknownMessageCount ¶
func (p *ProtocolHandler) GetUnknownMessageCount() int64
GetUnknownMessageCount returns the count of unknown messages received
func (*ProtocolHandler) GetUnsupportedMessageTypes ¶
func (p *ProtocolHandler) GetUnsupportedMessageTypes() map[string]int64
GetUnsupportedMessageTypes returns a copy of unsupported message types and their counts
func (*ProtocolHandler) HandleUnknownMessage ¶
func (p *ProtocolHandler) HandleUnknownMessage(msgType string, data []byte) error
HandleUnknownMessage processes unknown message types
func (*ProtocolHandler) IsVersionMismatchDetected ¶
func (p *ProtocolHandler) IsVersionMismatchDetected() bool
IsVersionMismatchDetected returns true if version mismatch was detected
func (*ProtocolHandler) SetServerVersion ¶
func (p *ProtocolHandler) SetServerVersion(version string)
SetServerVersion sets the server version from registration response
func (*ProtocolHandler) SetStrictMode ¶
func (p *ProtocolHandler) SetStrictMode(strict bool)
SetStrictMode enables or disables strict protocol mode
func (*ProtocolHandler) ValidateProtocolMessage ¶
func (p *ProtocolHandler) ValidateProtocolMessage(msg *livekit.ServerMessage) error
ValidateProtocolMessage performs protocol-level validation
type ProtocolNegotiator ¶
type ProtocolNegotiator struct {
// contains filtered or unexported fields
}
ProtocolNegotiator handles protocol version negotiation
func NewProtocolNegotiator ¶
func NewProtocolNegotiator() *ProtocolNegotiator
NewProtocolNegotiator creates a new protocol negotiator
func (*ProtocolNegotiator) GetFeatures ¶
func (n *ProtocolNegotiator) GetFeatures(version string) []string
GetFeatures returns features available for a specific version
func (*ProtocolNegotiator) NegotiateVersion ¶
func (n *ProtocolNegotiator) NegotiateVersion(serverVersions []string) (string, error)
NegotiateVersion selects the best compatible version
type ProtocolUpgradeHandler ¶
type ProtocolUpgradeHandler struct {
// contains filtered or unexported fields
}
ProtocolUpgradeHandler manages protocol upgrades
func (*ProtocolUpgradeHandler) PerformUpgrade ¶
func (h *ProtocolUpgradeHandler) PerformUpgrade(timeout time.Duration) error
PerformUpgrade executes a protocol upgrade with rollback capability
type ProtocolValidator ¶
type ProtocolValidator struct {
// contains filtered or unexported fields
}
ProtocolValidator validates protocol messages
func NewProtocolValidator ¶
func NewProtocolValidator(strictMode bool) *ProtocolValidator
NewProtocolValidator creates a new protocol validator
func (*ProtocolValidator) ValidateServerMessage ¶
func (v *ProtocolValidator) ValidateServerMessage(msg *ServerMessage) error
ValidateServerMessage validates a server message
type ProximityRule ¶
type ProximityRule struct {
// contains filtered or unexported fields
}
ProximityRule groups participants based on interaction frequency
func NewProximityRule ¶
func NewProximityRule(threshold int, window time.Duration) *ProximityRule
NewProximityRule creates a proximity-based grouping rule
func (*ProximityRule) Evaluate ¶
func (r *ProximityRule) Evaluate(participants map[string]*CoordinatedParticipant) (bool, []CoordinationAction)
func (*ProximityRule) GetName ¶
func (r *ProximityRule) GetName() string
type PublisherInfo ¶
type PublisherInfo struct { Identity string Name string Metadata string Permissions *livekit.ParticipantPermission }
PublisherInfo contains information about a publisher
type PublisherTrackSubscription ¶
type PublisherTrackSubscription struct { TrackID string ParticipantID string TrackInfo *livekit.TrackInfo Publication *lksdk.RemoteTrackPublication Track interface{} // Generic track interface SubscribedAt time.Time Quality livekit.VideoQuality PreferredQuality livekit.VideoQuality CurrentQuality livekit.VideoQuality LastQualityChange time.Time Dimensions *VideoDimensions FrameRate float32 Enabled bool }
PublisherTrackSubscription represents a subscription to a publisher's track
type QualityAdaptationPolicy ¶
type QualityAdaptationPolicy struct { // LossThresholdUp is the packet loss percentage that triggers quality decrease LossThresholdUp float64 // LossThresholdDown is the packet loss percentage threshold to allow quality increase LossThresholdDown float64 // BitrateThresholdUp is the bitrate utilization percentage to allow quality increase BitrateThresholdUp float64 // BitrateThresholdDown is the bitrate utilization percentage that triggers quality decrease BitrateThresholdDown float64 // RTTThresholdHigh is the RTT in milliseconds that triggers quality decrease RTTThresholdHigh float64 // RTTThresholdLow is the RTT in milliseconds threshold to allow quality increase RTTThresholdLow float64 // StableWindowUp is the time to wait before increasing quality StableWindowUp time.Duration // StableWindowDown is the time to wait before decreasing quality StableWindowDown time.Duration // MinTimeBetweenChanges is the minimum time between quality changes MinTimeBetweenChanges time.Duration // PreferTemporalScaling prefers reducing frame rate over resolution PreferTemporalScaling bool // AllowDynamicFPS allows frame rate adjustments AllowDynamicFPS bool // MaxQuality is the maximum allowed quality level MaxQuality livekit.VideoQuality // MinQuality is the minimum allowed quality level MinQuality livekit.VideoQuality }
QualityAdaptationPolicy defines how quality should be adapted.
This policy controls the behavior of automatic quality adaptation, including thresholds for making changes and timing constraints. Fine-tuning these parameters allows optimization for different network conditions and use cases.
func DefaultQualityAdaptationPolicy ¶
func DefaultQualityAdaptationPolicy() QualityAdaptationPolicy
DefaultQualityAdaptationPolicy returns a default quality adaptation policy.
The default policy provides balanced settings suitable for most use cases:
- 2% packet loss triggers quality decrease
- <1% packet loss allows quality increase
- 80% bitrate usage allows increase
- 95% bitrate usage triggers decrease
- 200ms RTT triggers decrease
- <100ms RTT allows increase
- 10 second stability window for increases
- 2 second window for decreases
These defaults prioritize stability while remaining responsive to degraded network conditions.
type QualityChange ¶
type QualityChange struct { // FromQuality is the quality level before the change FromQuality livekit.VideoQuality // ToQuality is the quality level after the change ToQuality livekit.VideoQuality // Reason describes why the quality was changed Reason string // Timestamp is when the change occurred Timestamp time.Time }
QualityChange represents a quality level change event.
This records when and why quality was changed, providing an audit trail for adaptation decisions.
type QualityController ¶
type QualityController struct {
// contains filtered or unexported fields
}
QualityController manages video quality adaptation for subscribed tracks.
The controller monitors network conditions and track statistics to automatically adjust video quality for optimal viewing experience. It balances quality against available bandwidth and network conditions.
Key features:
- Automatic quality adaptation based on network conditions
- Per-track monitoring and adjustment
- Configurable adaptation policies
- Historical tracking of quality changes
- Support for manual quality control
Example usage:
controller := NewQualityController() controller.SetAdaptationPolicy(customPolicy) controller.StartMonitoring(track, subscription) // Quality will be automatically adjusted based on conditions // Or manually control: controller.ApplyQualitySettings(track, livekit.VideoQuality_HIGH)
func NewQualityController ¶
func NewQualityController() *QualityController
NewQualityController creates a new quality controller.
The controller is initialized with:
- Default adaptation policy
- 1 second update interval
- Empty monitor map
The monitoring loop starts automatically when the first track is added.
func (*QualityController) ApplyDimensionSettings ¶
func (qc *QualityController) ApplyDimensionSettings(track *webrtc.TrackRemote, width, height uint32) error
ApplyDimensionSettings applies dimension preferences to a track.
This method sets the preferred video dimensions (resolution) for a subscribed track. This allows fine-grained control over bandwidth usage by requesting specific resolutions.
Note: Currently logs the intent. Full implementation requires server support for dynamic dimension adjustment.
func (*QualityController) ApplyFrameRateSettings ¶
func (qc *QualityController) ApplyFrameRateSettings(track *webrtc.TrackRemote, fps float64) error
ApplyFrameRateSettings applies frame rate preferences to a track.
This method sets the preferred frame rate for a subscribed track. Lowering frame rate can significantly reduce bandwidth usage while maintaining resolution, useful for screen share or presentation content.
Note: Currently logs the intent. Full implementation requires server support for dynamic frame rate adjustment.
func (*QualityController) ApplyQualitySettings ¶
func (qc *QualityController) ApplyQualitySettings(track *webrtc.TrackRemote, quality livekit.VideoQuality) error
ApplyQualitySettings applies quality settings to a track.
This method sets the preferred quality level for a subscribed track. In a full implementation, this would communicate with the LiveKit server to adjust the quality of the incoming stream.
Note: Currently logs the intent. Full implementation requires server support for dynamic quality adjustment.
func (*QualityController) CalculateOptimalQuality ¶
func (qc *QualityController) CalculateOptimalQuality(connQuality livekit.ConnectionQuality, subscription *PublisherTrackSubscription) livekit.VideoQuality
CalculateOptimalQuality calculates optimal quality based on connection quality.
This provides a simple mapping from connection quality indicators to video quality levels, respecting the configured max/min quality limits.
Connection quality mapping:
- EXCELLENT: Maximum allowed quality
- GOOD: One level below maximum (or maximum if max is MEDIUM or lower)
- POOR: Minimum allowed quality
- Unknown: MEDIUM quality as safe default
This method is useful for setting initial quality or when manual quality selection is needed based on connection assessment.
func (*QualityController) EnableAdaptation ¶
func (qc *QualityController) EnableAdaptation(trackID string, enabled bool)
EnableAdaptation enables or disables quality adaptation for a track.
When disabled, the track's quality remains fixed at its current level regardless of network conditions. This is useful when you want manual control over quality or need consistent quality for specific use cases.
Adaptation can be re-enabled at any time to resume automatic adjustment.
func (*QualityController) GetQualityHistory ¶
func (qc *QualityController) GetQualityHistory(trackID string) []QualityChange
GetQualityHistory returns the quality change history for a track.
Returns a copy of all quality changes for the specified track, ordered chronologically. Returns nil if the track is not found.
The history provides an audit trail showing:
- When quality changed
- What the quality changed from and to
- Why the change was made
This is useful for debugging adaptation behavior and understanding quality patterns over time.
func (*QualityController) GetTrackStats ¶
func (qc *QualityController) GetTrackStats(trackID string) (*TrackQualityStats, bool)
GetTrackStats returns current quality statistics for a track.
Returns a copy of the track's current statistics and true if the track is being monitored, or nil and false if not found.
The returned statistics provide insight into:
- Current quality level
- Network metrics (packet loss, bitrate, RTT)
- Video metrics (resolution, frame rate)
- Quality issues (freezes, pauses)
func (*QualityController) SetAdaptationPolicy ¶
func (qc *QualityController) SetAdaptationPolicy(policy QualityAdaptationPolicy)
SetAdaptationPolicy sets a custom adaptation policy.
This allows fine-tuning the quality adaptation behavior for specific use cases or network conditions. The new policy takes effect immediately for all monitored tracks.
func (*QualityController) SetUpdateInterval ¶
func (qc *QualityController) SetUpdateInterval(interval time.Duration)
SetUpdateInterval sets how often quality is evaluated.
The interval controls the frequency of:
- Statistics collection
- Quality evaluation
- Adaptation decisions
Shorter intervals provide more responsive adaptation but increase processing overhead. Longer intervals reduce overhead but may miss short-term network issues.
Default: 1 second. Recommended range: 500ms - 5s.
func (*QualityController) StartMonitoring ¶
func (qc *QualityController) StartMonitoring(track *webrtc.TrackRemote, subscription *PublisherTrackSubscription)
StartMonitoring starts monitoring a track's quality.
Once monitoring begins, the controller will:
- Collect statistics at regular intervals
- Evaluate network conditions
- Automatically adjust quality based on the adaptation policy
- Record quality changes in history
If the track is already being monitored, this method does nothing. The monitoring loop starts automatically with the first track.
func (*QualityController) StopMonitoring ¶
func (qc *QualityController) StopMonitoring(track *webrtc.TrackRemote)
StopMonitoring stops monitoring a track's quality.
This removes the track from monitoring and cleans up associated resources. If this was the last monitored track, the monitoring loop is also stopped.
Quality history and statistics for the track are discarded.
type QualityMeasurement ¶
type QualityMeasurement struct { Quality livekit.ConnectionQuality Timestamp time.Time }
QualityMeasurement represents a connection quality measurement taken at a specific timestamp. This structure is used to maintain a history of quality changes over time, enabling analysis of connection patterns and stability calculations.
type RaceProtectionGuard ¶
type RaceProtectionGuard struct {
// contains filtered or unexported fields
}
RaceProtectionGuard provides automatic race protection for operations
type RaceProtector ¶
type RaceProtector struct {
// contains filtered or unexported fields
}
RaceProtector provides protection against race conditions during worker lifecycle
func NewRaceProtector ¶
func NewRaceProtector(logger *zap.Logger) *RaceProtector
NewRaceProtector creates a new race condition protector
func (*RaceProtector) CanAcceptJob ¶
func (p *RaceProtector) CanAcceptJob(jobID string) (bool, string)
CanAcceptJob checks if a job can be accepted given current state
func (*RaceProtector) CleanupOldTerminations ¶
func (p *RaceProtector) CleanupOldTerminations(maxAge time.Duration) int
CleanupOldTerminations removes old termination records
func (*RaceProtector) CompleteTermination ¶
func (p *RaceProtector) CompleteTermination(jobID string, err error)
CompleteTermination marks a termination as complete
func (*RaceProtector) FlushPendingStatusUpdates ¶
func (p *RaceProtector) FlushPendingStatusUpdates() []StatusUpdate
FlushPendingStatusUpdates returns all pending status updates and clears the queue
func (*RaceProtector) GetMetrics ¶
func (p *RaceProtector) GetMetrics() map[string]interface{}
GetMetrics returns race protection metrics
func (*RaceProtector) IsDisconnecting ¶
func (p *RaceProtector) IsDisconnecting() bool
IsDisconnecting returns true if the worker is disconnecting
func (*RaceProtector) IsReconnecting ¶
func (p *RaceProtector) IsReconnecting() bool
IsReconnecting returns true if the worker is reconnecting
func (*RaceProtector) NewGuard ¶
func (p *RaceProtector) NewGuard(jobID string, operation string) *RaceProtectionGuard
NewRaceProtectionGuard creates a new guard for an operation
func (*RaceProtector) QueueStatusUpdate ¶
func (p *RaceProtector) QueueStatusUpdate(jobID string, status livekit.JobStatus, errorMsg string) bool
QueueStatusUpdate queues a status update during reconnection
func (*RaceProtector) RecordTerminationRequest ¶
func (p *RaceProtector) RecordTerminationRequest(jobID string) bool
RecordTerminationRequest records a termination request and handles concurrent requests
func (*RaceProtector) SetDisconnecting ¶
func (p *RaceProtector) SetDisconnecting(disconnecting bool)
SetDisconnecting marks the worker as disconnecting
func (*RaceProtector) SetOnJobDroppedCallback ¶
func (p *RaceProtector) SetOnJobDroppedCallback(cb func(jobID string, reason string))
SetOnJobDroppedCallback sets the callback for dropped jobs
func (*RaceProtector) SetOnStatusUpdateQueuedCallback ¶
func (p *RaceProtector) SetOnStatusUpdateQueuedCallback(cb func(jobID string))
SetOnStatusUpdateQueuedCallback sets the callback for queued status updates
func (*RaceProtector) SetReconnecting ¶
func (p *RaceProtector) SetReconnecting(reconnecting bool)
SetReconnecting marks the worker as reconnecting
type RecoverableJob ¶
type RecoverableJob struct { // JobID is the unique identifier of the job JobID string // JobState contains the runtime state of the job JobState *JobState // JobData is the original job assignment from the server JobData *livekit.Job // RoomToken is the authentication token for the room (if available) RoomToken string // LastUpdate tracks when this job was last active LastUpdate time.Time // RecoveryAttempts counts how many times recovery has been tried RecoveryAttempts int }
RecoverableJob contains all information needed to recover a job.
This structure preserves job state during disconnections and provides everything needed to restore the job when the worker reconnects.
type Resource ¶
type Resource interface { // IsHealthy checks if the resource is still usable IsHealthy() bool // Close cleans up the resource Close() error // Reset prepares the resource for reuse Reset() error }
Resource represents a pooled resource that can be acquired and released
type ResourceFactory ¶
type ResourceFactory interface { Create(ctx context.Context) (Resource, error) Validate(resource Resource) error }
ResourceFactory creates new resources for the pool
type ResourceGuard ¶
type ResourceGuard struct {
// contains filtered or unexported fields
}
ResourceGuard provides automatic resource protection
func NewResourceGuard ¶
func NewResourceGuard(monitor *ResourceMonitor) *ResourceGuard
NewResourceGuard creates a new resource guard
func (*ResourceGuard) ExecuteWithProtection ¶
func (g *ResourceGuard) ExecuteWithProtection(fn func() error) error
ExecuteWithProtection executes a function with resource protection
type ResourceHealthLevel ¶
type ResourceHealthLevel int
ResourceHealthLevel represents resource health status
const ( ResourceHealthGood ResourceHealthLevel = iota ResourceHealthWarning ResourceHealthCritical )
func (ResourceHealthLevel) String ¶
func (r ResourceHealthLevel) String() string
type ResourceLimitGuard ¶
type ResourceLimitGuard struct {
// contains filtered or unexported fields
}
ResourceLimitGuard provides automatic resource limit checking before executing operations. It can be used to prevent resource-intensive operations from running when the system is already near its limits, helping maintain stability and prevent resource exhaustion.
func (*ResourceLimitGuard) Execute ¶
func (g *ResourceLimitGuard) Execute(ctx context.Context, fn func() error) error
Execute runs the provided function only if resource usage is below safety thresholds. It checks memory and file descriptor usage before execution and returns an error if either resource is above 90% of its limit. This prevents operations from running when the system is already under resource pressure.
type ResourceLimiter ¶
type ResourceLimiter struct {
// contains filtered or unexported fields
}
ResourceLimiter enforces hard limits on system resources to prevent agents from consuming excessive memory, CPU, or file descriptors. It monitors resource usage periodically and can take corrective actions like forcing garbage collection, CPU throttling, or logging warnings when limits are exceeded. This helps ensure stable operation in production environments.
func NewResourceLimiter ¶
func NewResourceLimiter(logger *zap.Logger, opts ResourceLimiterOptions) *ResourceLimiter
NewResourceLimiter creates a new resource limiter with the specified options. Default values are applied for any unspecified limits:
- Memory: 1GB
- CPU: 200% (2 cores)
- File descriptors: 1024
- Check interval: 1 second
func (*ResourceLimiter) GetMetrics ¶
func (rl *ResourceLimiter) GetMetrics() map[string]interface{}
GetMetrics returns a snapshot of current resource usage and violation counts. This is useful for monitoring, alerting, and debugging resource-related issues. The returned map contains current usage, limits, and violation counts for all tracked resources.
func (*ResourceLimiter) NewGuard ¶
func (rl *ResourceLimiter) NewGuard(name string) *ResourceLimitGuard
NewGuard creates a new ResourceLimitGuard for protecting a named operation. The guard will check resource usage before allowing operations to proceed. The name is used for error messages and logging.
func (*ResourceLimiter) SetCPULimitCallback ¶
func (rl *ResourceLimiter) SetCPULimitCallback(cb func(usage float64))
SetCPULimitCallback sets a callback function to be invoked when CPU limits are exceeded. The callback receives the current CPU usage as a percentage. This can be used for adaptive behaviors like reducing job concurrency or triggering alerts.
func (*ResourceLimiter) SetFDLimitCallback ¶
func (rl *ResourceLimiter) SetFDLimitCallback(cb func(usage, limit int))
SetFDLimitCallback sets a callback function to be invoked when file descriptor limits are exceeded. The callback receives the current usage and limit counts. This can be used to trigger connection cleanup or implement connection pooling strategies.
func (*ResourceLimiter) SetMemoryLimitCallback ¶
func (rl *ResourceLimiter) SetMemoryLimitCallback(cb func(usage, limit uint64))
SetMemoryLimitCallback sets a callback function to be invoked when memory limits are exceeded. The callback receives the current memory usage and limit in bytes. This can be used to implement custom behaviors like alerting or adaptive resource management.
func (*ResourceLimiter) Start ¶
func (rl *ResourceLimiter) Start(ctx context.Context)
Start begins periodic resource monitoring and enforcement in a background goroutine. The monitoring continues until the provided context is cancelled. This should be called once when the agent starts up.
type ResourceLimiterOptions ¶
type ResourceLimiterOptions struct { MemoryLimitMB int // Memory limit in MB CPUQuotaPercent int // CPU quota as percentage (100 = 1 core) MaxFileDescriptors int // Max file descriptors CheckInterval time.Duration // How often to check limits EnforceHardLimits bool // If true, will force GC and throttle }
ResourceLimiterOptions configures the behavior and limits of a ResourceLimiter. All limits are optional and will use sensible defaults if not specified. Setting EnforceHardLimits to true enables active enforcement like garbage collection and CPU throttling, while false only logs violations.
type ResourceLimits ¶
ResourceLimits defines resource limits for the worker
type ResourceMonitor ¶
type ResourceMonitor struct {
// contains filtered or unexported fields
}
ResourceMonitor monitors system resources and detects issues
func NewResourceMonitor ¶
func NewResourceMonitor(logger *zap.Logger, opts ResourceMonitorOptions) *ResourceMonitor
NewResourceMonitor creates a new resource monitor
func (*ResourceMonitor) AddDependency ¶
func (m *ResourceMonitor) AddDependency(from, to string)
AddDependency adds a dependency relationship for circular detection
func (*ResourceMonitor) GetMetrics ¶
func (m *ResourceMonitor) GetMetrics() map[string]interface{}
GetMetrics returns current resource metrics
func (*ResourceMonitor) GetResourceStatus ¶
func (m *ResourceMonitor) GetResourceStatus() ResourceStatus
GetResourceStatus returns detailed resource status
func (*ResourceMonitor) IsHealthy ¶
func (m *ResourceMonitor) IsHealthy() bool
IsHealthy returns true if no resource issues are detected
func (*ResourceMonitor) SetCircularDependencyCallback ¶
func (m *ResourceMonitor) SetCircularDependencyCallback(cb func(deps []string))
SetCircularDependencyCallback sets the callback for circular dependency detection
func (*ResourceMonitor) SetLeakCallback ¶
func (m *ResourceMonitor) SetLeakCallback(cb func(count int))
SetLeakCallback sets the callback for goroutine leak detection
func (*ResourceMonitor) SetOOMCallback ¶
func (m *ResourceMonitor) SetOOMCallback(cb func())
SetOOMCallback sets the callback for OOM detection
func (*ResourceMonitor) Start ¶
func (m *ResourceMonitor) Start(ctx context.Context)
Start begins resource monitoring
type ResourceMonitorOptions ¶
type ResourceMonitorOptions struct { CheckInterval time.Duration MemoryLimitMB int // Memory limit in MB (0 = 80% of system memory) GoroutineLimit int // Max goroutines (0 = 10000) GoroutineLeakThreshold int // Consecutive increases to detect leak (0 = 5) }
ResourceMonitorOptions configures the resource monitor
type ResourcePool ¶
type ResourcePool struct {
// contains filtered or unexported fields
}
ResourcePool manages a pool of reusable resources
func NewResourcePool ¶
func NewResourcePool(factory ResourceFactory, opts ResourcePoolOptions) (*ResourcePool, error)
NewResourcePool creates a new resource pool
func (*ResourcePool) Acquire ¶
func (pool *ResourcePool) Acquire(ctx context.Context) (Resource, error)
Acquire gets a resource from the pool or creates a new one
func (*ResourcePool) Available ¶
func (pool *ResourcePool) Available() int
Available returns the number of available resources
func (*ResourcePool) Close ¶
func (pool *ResourcePool) Close() error
Close closes the pool and all resources
func (*ResourcePool) InUse ¶
func (pool *ResourcePool) InUse() int
InUse returns the number of resources currently in use
func (*ResourcePool) Release ¶
func (pool *ResourcePool) Release(resource Resource)
Release returns a resource to the pool
func (*ResourcePool) Size ¶
func (pool *ResourcePool) Size() int
Size returns the current number of resources (available + in use)
func (*ResourcePool) Stats ¶
func (pool *ResourcePool) Stats() map[string]int64
Stats returns pool statistics
type ResourcePoolOptions ¶
type ResourcePoolOptions struct { MinSize int // Minimum number of resources to maintain MaxSize int // Maximum number of resources in pool MaxIdleTime time.Duration // Maximum time a resource can be idle }
ResourcePoolOptions configures the resource pool
type ResourceStatus ¶
type ResourceStatus struct { MemoryUsageMB uint64 MemoryLimitMB uint64 MemoryPercent float64 GoroutineCount int GoroutineLimit int OOMDetected bool LeakDetected bool CircularDepDetected bool HealthLevel ResourceHealthLevel Timestamp time.Time }
ResourceStatus represents current resource status
type ResourceThresholds ¶
type ResourceThresholds struct { MemoryWarningPercent float64 // Warn at this % of limit (default 80%) MemoryCriticalPercent float64 // Critical at this % of limit (default 90%) GoroutineWarning int // Warn at this count (default 5000) GoroutineCritical int // Critical at this count (default 8000) }
ResourceThresholds defines resource usage thresholds
type RetryableWriteMessage ¶
RetryableWriteMessage wraps a WebSocket message for retry
func (*RetryableWriteMessage) CanRetry ¶
func (r *RetryableWriteMessage) CanRetry() bool
CanRetry returns true if the message can be retried
func (*RetryableWriteMessage) IncrementRetries ¶
func (r *RetryableWriteMessage) IncrementRetries()
IncrementRetries increments the retry count
type RoleBasedPolicy ¶
type RoleBasedPolicy struct {
// contains filtered or unexported fields
}
RoleBasedPolicy implements role-based permission management
func NewRoleBasedPolicy ¶
func NewRoleBasedPolicy() *RoleBasedPolicy
NewRoleBasedPolicy creates a new role-based policy
func (*RoleBasedPolicy) EvaluatePermissionRequest ¶
func (p *RoleBasedPolicy) EvaluatePermissionRequest(identity string, current, requested *livekit.ParticipantPermission) (bool, string)
func (*RoleBasedPolicy) GetDefaultPermissions ¶
func (p *RoleBasedPolicy) GetDefaultPermissions(identity string) *livekit.ParticipantPermission
type RoomCallbackProvider ¶
type RoomCallbackProvider interface { // GetRoomCallbacks returns callbacks to be used when connecting to the room // These will be merged with the Worker's default callbacks GetRoomCallbacks() *lksdk.RoomCallback }
RoomCallbackProvider is an optional interface that handlers can implement to provide custom room callbacks that will be merged with the Worker's callbacks
type ServerMessage ¶
type ServerMessage struct {
Message interface{} // Actual protobuf message
}
ServerMessage represents a message received from server
func (*ServerMessage) GetAssignment ¶
func (m *ServerMessage) GetAssignment() *livekit.JobAssignment
GetAssignment returns the job assignment if this is an assignment message
func (*ServerMessage) GetAvailability ¶
func (m *ServerMessage) GetAvailability() *livekit.AvailabilityRequest
GetAvailability returns the availability request if this is an availability message
func (*ServerMessage) GetPing ¶
func (m *ServerMessage) GetPing() *livekit.Ping
GetPing returns the ping if this is a ping message
func (*ServerMessage) GetRegister ¶
func (m *ServerMessage) GetRegister() *livekit.RegisterWorkerResponse
GetRegister returns the register response if this is a register message
func (*ServerMessage) GetTermination ¶
func (m *ServerMessage) GetTermination() *livekit.JobTermination
GetTermination returns the job termination if this is a termination message
type ServerMessage_Assignment ¶
type ServerMessage_Assignment struct {
Assignment *livekit.JobAssignment
}
type ServerMessage_Availability ¶
type ServerMessage_Availability struct {
Availability *livekit.AvailabilityRequest
}
type ServerMessage_Ping ¶
type ServerMessage_Register ¶
type ServerMessage_Register struct {
Register *livekit.RegisterWorkerResponse
}
Specific message types for ServerMessage
type ServerMessage_Termination ¶
type ServerMessage_Termination struct {
Termination *livekit.JobTermination
}
type ShutdownHandler ¶
type ShutdownHandler struct {
// contains filtered or unexported fields
}
ShutdownHandler manages graceful shutdown
func NewShutdownHandler ¶
func NewShutdownHandler(logger Logger) *ShutdownHandler
NewShutdownHandler creates a new shutdown handler
func (*ShutdownHandler) AddHook ¶
func (h *ShutdownHandler) AddHook(phase ShutdownPhase, hook ShutdownHook) error
AddHook adds a shutdown hook
func (*ShutdownHandler) ExecutePhase ¶
func (h *ShutdownHandler) ExecutePhase(ctx context.Context, phase ShutdownPhase) error
ExecutePhase executes shutdown hooks for a specific phase
func (*ShutdownHandler) GetHooks ¶
func (h *ShutdownHandler) GetHooks(phase ShutdownPhase) []ShutdownHook
GetHooks returns hooks for a phase
func (*ShutdownHandler) RemoveHook ¶
func (h *ShutdownHandler) RemoveHook(phase ShutdownPhase, name string) bool
RemoveHook removes a shutdown hook
type ShutdownHook ¶
type ShutdownHook struct { Name string Priority int // Lower numbers run first Timeout time.Duration Handler func(context.Context) error }
ShutdownHook represents a single shutdown hook that will be executed during worker shutdown. Hooks are organized by phase and priority, allowing for ordered cleanup operations. Each hook has a timeout to prevent hanging during shutdown.
type ShutdownHookBuilder ¶
type ShutdownHookBuilder struct {
// contains filtered or unexported fields
}
ShutdownHookBuilder provides a fluent API for constructing custom shutdown hooks. It allows for readable, chainable configuration of hook properties like priority, timeout, and handler function. Use NewShutdownHookBuilder to create a new builder.
func NewShutdownHookBuilder ¶
func NewShutdownHookBuilder(name string) *ShutdownHookBuilder
NewShutdownHookBuilder creates a new shutdown hook builder with default values. The hook is initialized with priority 100 and 5-second timeout. Use the fluent methods to customize these values and set the handler function.
func (*ShutdownHookBuilder) Build ¶
func (b *ShutdownHookBuilder) Build() ShutdownHook
Build returns the fully constructed ShutdownHook with all configured properties. This is the final step in the builder chain.
func (*ShutdownHookBuilder) WithHandler ¶
func (b *ShutdownHookBuilder) WithHandler(handler func(context.Context) error) *ShutdownHookBuilder
WithHandler sets the function to be executed when the hook runs. The handler receives a context that will be cancelled if the timeout is exceeded. Returns the builder for method chaining.
func (*ShutdownHookBuilder) WithPriority ¶
func (b *ShutdownHookBuilder) WithPriority(priority int) *ShutdownHookBuilder
WithPriority sets the hook priority (lower numbers execute first). Returns the builder for method chaining.
func (*ShutdownHookBuilder) WithTimeout ¶
func (b *ShutdownHookBuilder) WithTimeout(timeout time.Duration) *ShutdownHookBuilder
WithTimeout sets the maximum execution time for the hook. Returns the builder for method chaining.
type ShutdownHookManager ¶
type ShutdownHookManager struct {
// contains filtered or unexported fields
}
ShutdownHookManager manages registration and execution of shutdown hooks across all phases. It ensures hooks are executed in the correct order (by phase and priority) with proper timeout handling and error recovery. The manager is thread-safe and supports dynamic hook registration and removal.
func NewShutdownHookManager ¶
func NewShutdownHookManager(logger *zap.Logger) *ShutdownHookManager
NewShutdownHookManager creates a new shutdown hook manager with initialized phases. All shutdown phases are pre-configured with empty hook lists. The provided logger is used for debugging hook execution and reporting errors.
func (*ShutdownHookManager) AddHook ¶
func (m *ShutdownHookManager) AddHook(phase ShutdownPhase, hook ShutdownHook) error
AddHook registers a shutdown hook for execution in the specified phase. Hooks are automatically sorted by priority (lower numbers execute first). If no timeout is specified on the hook, a default 5-second timeout is applied. Returns an error if the phase is invalid.
func (*ShutdownHookManager) ClearAllHooks ¶
func (m *ShutdownHookManager) ClearAllHooks()
ClearAllHooks removes all registered hooks from all phases. This completely resets the hook manager to its initial empty state.
func (*ShutdownHookManager) ClearHooks ¶
func (m *ShutdownHookManager) ClearHooks(phase ShutdownPhase)
ClearHooks removes all hooks registered for the specified phase. This is useful for cleanup or resetting hook registration for a phase.
func (*ShutdownHookManager) ExecutePhase ¶
func (m *ShutdownHookManager) ExecutePhase(ctx context.Context, phase ShutdownPhase) error
ExecutePhase executes all registered hooks for the specified shutdown phase in priority order. Hooks are executed sequentially (not in parallel) to maintain ordering guarantees. If a hook fails or times out, execution continues with remaining hooks, but the first error is returned. Each hook runs with its individual timeout context.
func (*ShutdownHookManager) GetHookCount ¶
func (m *ShutdownHookManager) GetHookCount() int
GetHookCount returns the total number of registered hooks across all phases. This is useful for monitoring and debugging hook registration.
func (*ShutdownHookManager) GetHooks ¶
func (m *ShutdownHookManager) GetHooks(phase ShutdownPhase) []ShutdownHook
GetHooks returns a copy of all hooks registered for the specified phase. The returned slice is independent and can be safely modified without affecting the internal hook registry. Hooks are returned in priority order.
func (*ShutdownHookManager) RemoveHook ¶
func (m *ShutdownHookManager) RemoveHook(phase ShutdownPhase, name string) bool
RemoveHook removes a specific shutdown hook identified by phase and name. Returns true if the hook was found and removed, false if no matching hook exists. This is useful for dynamic hook management or cleanup of temporary hooks.
type ShutdownPhase ¶
type ShutdownPhase string
ShutdownPhase represents different phases of the shutdown process, allowing hooks to be executed at appropriate times during worker termination. Each phase serves a specific purpose in the shutdown sequence.
const ( // ShutdownPhasePreStop is executed before any jobs are terminated. // Use this phase for preparation tasks like notifying external systems. ShutdownPhasePreStop ShutdownPhase = "pre_stop" // ShutdownPhaseStopJobs is executed during job termination. // Use this phase for job-specific cleanup that must happen during shutdown. ShutdownPhaseStopJobs ShutdownPhase = "stop_jobs" // ShutdownPhasePostJobs is executed after all jobs have completed or been terminated. // Use this phase for cleanup that depends on jobs being stopped. ShutdownPhasePostJobs ShutdownPhase = "post_jobs" // ShutdownPhaseCleanup is executed during general resource cleanup. // Use this phase for releasing resources like database connections. ShutdownPhaseCleanup ShutdownPhase = "cleanup" // ShutdownPhaseFinal is executed as the last step before worker termination. // Use this phase for final tasks like flushing logs or metrics. ShutdownPhaseFinal ShutdownPhase = "final" )
type SimpleJobHandler ¶
type SimpleJobHandler struct { // OnJob is called when a job is assigned OnJob func(ctx context.Context, job *livekit.Job, room *lksdk.Room) error // Metadata provides participant information for the agent Metadata func(job *livekit.Job) *JobMetadata // OnTerminated is called when a job is terminated (optional) OnTerminated func(jobID string) }
SimpleJobHandler provides a simplified interface for handling jobs without needing to implement all JobHandler methods. This is useful for quick prototyping or simple agents that don't need the full complexity of the complete JobHandler interface.
func (*SimpleJobHandler) OnJobAssigned ¶
func (h *SimpleJobHandler) OnJobAssigned(ctx context.Context, job *livekit.Job, room *lksdk.Room) error
OnJobAssigned implements the JobHandler interface by delegating to the OnJob function if provided. Returns nil if no OnJob function is set.
func (*SimpleJobHandler) OnJobRequest ¶
func (h *SimpleJobHandler) OnJobRequest(ctx context.Context, job *livekit.Job) (bool, *JobMetadata)
OnJobRequest implements the JobHandler interface by providing automatic metadata generation based on job type. If a custom Metadata function is provided, it will be used instead. Always returns true (accepts all jobs).
func (*SimpleJobHandler) OnJobTerminated ¶
func (h *SimpleJobHandler) OnJobTerminated(ctx context.Context, jobID string)
OnJobTerminated implements the JobHandler interface by delegating to the OnTerminated function if provided. Does nothing if no OnTerminated function is set.
type SimpleUniversalHandler ¶
type SimpleUniversalHandler struct { BaseHandler // Optional callbacks - set only the ones you need JobRequestFunc func(ctx context.Context, job *livekit.Job) (bool, *JobMetadata) JobAssignedFunc func(ctx context.Context, jobCtx *JobContext) error JobTerminatedFunc func(ctx context.Context, jobID string) RoomConnectedFunc func(ctx context.Context, room *lksdk.Room) RoomDisconnectedFunc func(ctx context.Context, room *lksdk.Room, reason string) RoomMetadataChangedFunc func(ctx context.Context, oldMetadata, newMetadata string) ParticipantJoinedFunc func(ctx context.Context, participant *lksdk.RemoteParticipant) ParticipantLeftFunc func(ctx context.Context, participant *lksdk.RemoteParticipant) ParticipantMetadataChangedFunc func(ctx context.Context, participant *lksdk.RemoteParticipant, oldMetadata string) ParticipantSpeakingChangedFunc func(ctx context.Context, participant *lksdk.RemoteParticipant, speaking bool) TrackPublishedFunc func(ctx context.Context, participant *lksdk.RemoteParticipant, publication *lksdk.RemoteTrackPublication) TrackUnpublishedFunc func(ctx context.Context, participant *lksdk.RemoteParticipant, publication *lksdk.RemoteTrackPublication) TrackSubscribedFunc func(ctx context.Context, track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, participant *lksdk.RemoteParticipant) TrackUnsubscribedFunc func(ctx context.Context, track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, participant *lksdk.RemoteParticipant) TrackMutedFunc func(ctx context.Context, publication lksdk.TrackPublication, participant lksdk.Participant) TrackUnmutedFunc func(ctx context.Context, publication lksdk.TrackPublication, participant lksdk.Participant) DataReceivedFunc func(ctx context.Context, data []byte, participant *lksdk.RemoteParticipant, kind livekit.DataPacket_Kind) ConnectionQualityChangedFunc func(ctx context.Context, participant *lksdk.RemoteParticipant, quality livekit.ConnectionQuality) ActiveSpeakersChangedFunc func(ctx context.Context, speakers []lksdk.Participant) }
SimpleUniversalHandler is a convenience handler that uses function fields for callbacks. This allows for quick prototyping without creating a full handler type.
func (*SimpleUniversalHandler) GetJobMetadata ¶
func (h *SimpleUniversalHandler) GetJobMetadata(job *livekit.Job) *JobMetadata
GetJobMetadata returns metadata for a job (used when job is assigned)
func (*SimpleUniversalHandler) OnActiveSpeakersChanged ¶
func (h *SimpleUniversalHandler) OnActiveSpeakersChanged(ctx context.Context, speakers []lksdk.Participant)
func (*SimpleUniversalHandler) OnConnectionQualityChanged ¶
func (h *SimpleUniversalHandler) OnConnectionQualityChanged(ctx context.Context, participant *lksdk.RemoteParticipant, quality livekit.ConnectionQuality)
func (*SimpleUniversalHandler) OnDataReceived ¶
func (h *SimpleUniversalHandler) OnDataReceived(ctx context.Context, data []byte, participant *lksdk.RemoteParticipant, kind livekit.DataPacket_Kind)
func (*SimpleUniversalHandler) OnJobAssigned ¶
func (h *SimpleUniversalHandler) OnJobAssigned(ctx context.Context, jobCtx *JobContext) error
func (*SimpleUniversalHandler) OnJobRequest ¶
func (h *SimpleUniversalHandler) OnJobRequest(ctx context.Context, job *livekit.Job) (bool, *JobMetadata)
func (*SimpleUniversalHandler) OnJobTerminated ¶
func (h *SimpleUniversalHandler) OnJobTerminated(ctx context.Context, jobID string)
func (*SimpleUniversalHandler) OnParticipantJoined ¶
func (h *SimpleUniversalHandler) OnParticipantJoined(ctx context.Context, participant *lksdk.RemoteParticipant)
func (*SimpleUniversalHandler) OnParticipantLeft ¶
func (h *SimpleUniversalHandler) OnParticipantLeft(ctx context.Context, participant *lksdk.RemoteParticipant)
func (*SimpleUniversalHandler) OnParticipantMetadataChanged ¶
func (h *SimpleUniversalHandler) OnParticipantMetadataChanged(ctx context.Context, participant *lksdk.RemoteParticipant, oldMetadata string)
func (*SimpleUniversalHandler) OnParticipantSpeakingChanged ¶
func (h *SimpleUniversalHandler) OnParticipantSpeakingChanged(ctx context.Context, participant *lksdk.RemoteParticipant, speaking bool)
func (*SimpleUniversalHandler) OnRoomConnected ¶
func (h *SimpleUniversalHandler) OnRoomConnected(ctx context.Context, room *lksdk.Room)
func (*SimpleUniversalHandler) OnRoomDisconnected ¶
func (*SimpleUniversalHandler) OnRoomMetadataChanged ¶
func (h *SimpleUniversalHandler) OnRoomMetadataChanged(ctx context.Context, oldMetadata, newMetadata string)
func (*SimpleUniversalHandler) OnTrackMuted ¶
func (h *SimpleUniversalHandler) OnTrackMuted(ctx context.Context, publication lksdk.TrackPublication, participant lksdk.Participant)
func (*SimpleUniversalHandler) OnTrackPublished ¶
func (h *SimpleUniversalHandler) OnTrackPublished(ctx context.Context, participant *lksdk.RemoteParticipant, publication *lksdk.RemoteTrackPublication)
func (*SimpleUniversalHandler) OnTrackSubscribed ¶
func (h *SimpleUniversalHandler) OnTrackSubscribed(ctx context.Context, track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, participant *lksdk.RemoteParticipant)
func (*SimpleUniversalHandler) OnTrackUnmuted ¶
func (h *SimpleUniversalHandler) OnTrackUnmuted(ctx context.Context, publication lksdk.TrackPublication, participant lksdk.Participant)
func (*SimpleUniversalHandler) OnTrackUnpublished ¶
func (h *SimpleUniversalHandler) OnTrackUnpublished(ctx context.Context, participant *lksdk.RemoteParticipant, publication *lksdk.RemoteTrackPublication)
func (*SimpleUniversalHandler) OnTrackUnsubscribed ¶
func (h *SimpleUniversalHandler) OnTrackUnsubscribed(ctx context.Context, track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, participant *lksdk.RemoteParticipant)
type SizeBasedGroupRule ¶
type SizeBasedGroupRule struct {
// contains filtered or unexported fields
}
SizeBasedGroupRule limits group sizes
func NewSizeBasedGroupRule ¶
func NewSizeBasedGroupRule(maxSize int) *SizeBasedGroupRule
NewSizeBasedGroupRule creates a size-based group rule
func (*SizeBasedGroupRule) CanJoinGroup ¶
func (r *SizeBasedGroupRule) CanJoinGroup(participant *CoordinatedParticipant, group *ParticipantGroup) bool
func (*SizeBasedGroupRule) OnGroupChange ¶
func (r *SizeBasedGroupRule) OnGroupChange(group *ParticipantGroup, added, removed []string)
type StatusUpdate ¶
type StatusUpdate struct { JobID string Status livekit.JobStatus Error string Timestamp time.Time RetryCount int }
StatusUpdate represents a pending status update
type StatusUpdateRaceProtector ¶
type StatusUpdateRaceProtector struct {
// contains filtered or unexported fields
}
StatusUpdateRaceProtector prevents race conditions during status updates
func NewStatusUpdateRaceProtector ¶
func NewStatusUpdateRaceProtector() *StatusUpdateRaceProtector
NewStatusUpdateRaceProtector creates a new race protector
func (*StatusUpdateRaceProtector) ProcessQueuedUpdates ¶
func (p *StatusUpdateRaceProtector) ProcessQueuedUpdates(handler func(jobID string, status livekit.JobStatus, error string))
ProcessQueuedUpdates processes all queued updates
func (*StatusUpdateRaceProtector) QueueStatusUpdate ¶
func (p *StatusUpdateRaceProtector) QueueStatusUpdate(jobID string, status livekit.JobStatus, error string) bool
QueueStatusUpdate queues a status update if reconnecting
type SyntheticAudioTrack ¶
type SyntheticAudioTrack struct { *webrtc.TrackLocalStaticRTP // contains filtered or unexported fields }
SyntheticAudioTrack creates a synthetic audio track for testing
func NewSyntheticAudioTrack ¶
func NewSyntheticAudioTrack(id string, sampleRate uint32, channels uint16) (*SyntheticAudioTrack, error)
NewSyntheticAudioTrack creates a new synthetic audio track that generates a sine wave
func (*SyntheticAudioTrack) StartGenerating ¶
func (s *SyntheticAudioTrack) StartGenerating(frequency float64)
StartGenerating starts generating synthetic audio data
func (*SyntheticAudioTrack) StopGenerating ¶
func (s *SyntheticAudioTrack) StopGenerating()
StopGenerating stops generating audio data
type SyntheticVideoTrack ¶
type SyntheticVideoTrack struct { *webrtc.TrackLocalStaticSample // contains filtered or unexported fields }
SyntheticVideoTrack creates a synthetic video track for testing
func NewSyntheticVideoTrack ¶
func NewSyntheticVideoTrack(id string, width, height int, frameRate float64) (*SyntheticVideoTrack, error)
NewSyntheticVideoTrack creates a new synthetic video track
func (*SyntheticVideoTrack) StartGenerating ¶
func (s *SyntheticVideoTrack) StartGenerating()
StartGenerating starts generating synthetic video frames
func (*SyntheticVideoTrack) StopGenerating ¶
func (s *SyntheticVideoTrack) StopGenerating()
StopGenerating stops generating video frames
type SystemMetricsCollector ¶
type SystemMetricsCollector struct {
// contains filtered or unexported fields
}
SystemMetricsCollector collects system metrics for load calculation.
This collector gathers CPU and memory usage information at regular intervals. The metrics can be used by load calculators to make resource-aware decisions.
Note: CPU calculation uses a heuristic based on goroutine count rather than actual CPU usage, as accurate CPU measurement requires platform-specific code.
func NewSystemMetricsCollector ¶
func NewSystemMetricsCollector() *SystemMetricsCollector
NewSystemMetricsCollector creates a new system metrics collector.
The collector starts a background goroutine that updates metrics every second. Metrics include:
- Memory usage (allocated and system memory)
- CPU usage estimate (based on goroutine count)
The collector runs until the process exits.
func (*SystemMetricsCollector) GetMetrics ¶
func (s *SystemMetricsCollector) GetMetrics() (cpuPercent, memoryPercent float64, memoryUsedMB, memoryTotalMB uint64)
GetMetrics returns the current system metrics.
Returns:
- cpuPercent: Estimated CPU usage (0-100)
- memoryPercent: Memory usage percentage (0-100)
- memoryUsedMB: Allocated memory in megabytes
- memoryTotalMB: Total system memory in megabytes
The values are snapshots from the most recent collection cycle. The method is thread-safe.
func (*SystemMetricsCollector) Stop ¶
func (s *SystemMetricsCollector) Stop()
Stop stops the metrics collector
type TerminationState ¶
type TerminationState struct { JobID string RequestedAt time.Time RequestCount int CompletedAt *time.Time LastError error }
TerminationState tracks termination request state
type TestPeerConnection ¶
type TestPeerConnection struct { Local *webrtc.PeerConnection Remote *webrtc.PeerConnection Tracks map[string]*webrtc.TrackRemote // contains filtered or unexported fields }
TestPeerConnection creates a pair of connected peer connections for testing
func NewTestPeerConnection ¶
func NewTestPeerConnection() (*TestPeerConnection, error)
NewTestPeerConnection creates a new test peer connection pair
func (*TestPeerConnection) AddTrack ¶
func (tc *TestPeerConnection) AddTrack(track webrtc.TrackLocal) (*webrtc.TrackRemote, error)
AddTrack adds a local track and returns the corresponding remote track
func (*TestPeerConnection) Close ¶
func (tc *TestPeerConnection) Close() error
Close closes both peer connections
type TestRoomManager ¶
type TestRoomManager struct { URL string APIKey string APISecret string // contains filtered or unexported fields }
TestRoomManager manages LiveKit rooms for testing
func NewTestRoomManager ¶
func NewTestRoomManager() *TestRoomManager
NewTestRoomManager creates a new test room manager for local LiveKit server
func (*TestRoomManager) CleanupRooms ¶
func (trm *TestRoomManager) CleanupRooms()
CleanupRooms disconnects all rooms
func (*TestRoomManager) CreateConnectedRooms ¶
CreateConnectedRooms creates two connected rooms with participants
func (*TestRoomManager) CreateRoom ¶
func (trm *TestRoomManager) CreateRoom(roomName string) (*lksdk.Room, error)
CreateRoom creates a new test room
func (*TestRoomManager) PublishSyntheticAudioTrack ¶
func (trm *TestRoomManager) PublishSyntheticAudioTrack(room *lksdk.Room, trackID string) (*lksdk.LocalTrackPublication, error)
PublishSyntheticAudioTrack publishes a synthetic audio track to a room
func (*TestRoomManager) WaitForRemoteTrack ¶
func (trm *TestRoomManager) WaitForRemoteTrack(room *lksdk.Room, trackID string, timeout time.Duration) (*webrtc.TrackRemote, error)
WaitForRemoteTrack waits for a remote track to appear
type ThrottleFilter ¶
type ThrottleFilter struct {
// contains filtered or unexported fields
}
ThrottleFilter throttles events from specific participants
func NewThrottleFilter ¶
func NewThrottleFilter(maxPerMinute int) *ThrottleFilter
NewThrottleFilter creates a new throttle filter
func (*ThrottleFilter) Filter ¶
func (f *ThrottleFilter) Filter(event ParticipantEvent) bool
Filter implements EventFilter
type TimeBasedPolicy ¶
type TimeBasedPolicy struct {
// contains filtered or unexported fields
}
TimeBasedPolicy restricts permissions based on time
func NewTimeBasedPolicy ¶
func NewTimeBasedPolicy(startHour, endHour int) *TimeBasedPolicy
NewTimeBasedPolicy creates a time-based policy
func (*TimeBasedPolicy) EvaluatePermissionRequest ¶
func (p *TimeBasedPolicy) EvaluatePermissionRequest(identity string, current, requested *livekit.ParticipantPermission) (bool, string)
func (*TimeBasedPolicy) GetDefaultPermissions ¶
func (p *TimeBasedPolicy) GetDefaultPermissions(identity string) *livekit.ParticipantPermission
type TimingGuard ¶
type TimingGuard struct {
// contains filtered or unexported fields
}
TimingGuard provides timing protection for operations
type TimingManager ¶
type TimingManager struct {
// contains filtered or unexported fields
}
TimingManager handles clock skew, deadline propagation, and backpressure
func NewTimingManager ¶
func NewTimingManager(logger *zap.Logger, opts TimingManagerOptions) *TimingManager
NewTimingManager creates a new timing manager
func (*TimingManager) CheckBackpressure ¶
func (tm *TimingManager) CheckBackpressure() bool
CheckBackpressure checks if backpressure should be applied
func (*TimingManager) CheckDeadline ¶
func (tm *TimingManager) CheckDeadline(jobID string) (bool, time.Duration)
CheckDeadline checks if a job deadline has been exceeded
func (*TimingManager) GetBackpressureDelay ¶
func (tm *TimingManager) GetBackpressureDelay() time.Duration
GetBackpressureDelay returns the recommended delay for backpressure
func (*TimingManager) GetDeadline ¶
func (tm *TimingManager) GetDeadline(jobID string) (*DeadlineContext, bool)
GetDeadline returns the deadline context for a job
func (*TimingManager) GetMetrics ¶
func (tm *TimingManager) GetMetrics() map[string]interface{}
GetMetrics returns timing-related metrics
func (*TimingManager) NewGuard ¶
func (tm *TimingManager) NewGuard(jobID string, operation string) *TimingGuard
NewTimingGuard creates a guard for timing-sensitive operations
func (*TimingManager) PropagateDeadline ¶
func (tm *TimingManager) PropagateDeadline(ctx context.Context, jobID string) (context.Context, context.CancelFunc)
PropagateDeadline creates a context with deadline from a job
func (*TimingManager) RecordEvent ¶
func (tm *TimingManager) RecordEvent()
RecordEvent records an event for backpressure calculation
func (*TimingManager) RemoveDeadline ¶
func (tm *TimingManager) RemoveDeadline(jobID string)
RemoveDeadline removes a deadline for a completed job
func (*TimingManager) ServerTimeNow ¶
func (tm *TimingManager) ServerTimeNow() time.Time
ServerTimeNow returns the current time adjusted for server clock skew
func (*TimingManager) SetDeadline ¶
func (tm *TimingManager) SetDeadline(jobID string, deadline time.Time, propagatedFrom string)
SetDeadline sets a deadline for a job with propagation support
func (*TimingManager) UpdateServerTime ¶
func (tm *TimingManager) UpdateServerTime(serverTime time.Time, receivedAt time.Time)
UpdateServerTime updates the server time offset based on a timestamp from the server
type TimingManagerOptions ¶
type TimingManagerOptions struct { MaxSkewSamples int // Max samples for skew calculation (default: 10) SkewThreshold time.Duration // Threshold to trigger skew correction (default: 1s) BackpressureWindow time.Duration // Window for rate calculation (default: 1s) BackpressureLimit int // Max events per window (default: 100) }
TimingManagerOptions configures the timing manager
type TrackQualityMonitor ¶
type TrackQualityMonitor struct { // Track is the WebRTC track being monitored Track *webrtc.TrackRemote // Subscription contains the track subscription details Subscription *PublisherTrackSubscription // Stats contains current quality statistics Stats *TrackQualityStats // LastStatsUpdate is when statistics were last updated LastStatsUpdate time.Time // QualityHistory tracks all quality changes for this track QualityHistory []QualityChange // AdaptationEnabled controls whether automatic adaptation is active AdaptationEnabled bool }
TrackQualityMonitor monitors quality metrics for a single track.
Each monitored track has its own monitor instance that tracks statistics, quality history, and manages adaptation decisions.
type TrackQualityStats ¶
type TrackQualityStats struct { // CurrentQuality is the current video quality level CurrentQuality livekit.VideoQuality // PacketsReceived is the total number of packets received PacketsReceived uint64 // PacketsLost is the total number of packets lost PacketsLost uint32 // Bitrate is the current bitrate in bits per second Bitrate uint64 // FrameRate is the current frame rate FrameRate float64 // FrameWidth is the current frame width in pixels FrameWidth uint32 // FrameHeight is the current frame height in pixels FrameHeight uint32 // Jitter is the packet jitter in milliseconds Jitter float64 // RTT is the round-trip time in milliseconds RTT float64 // LastKeyFrame is when the last key frame was received LastKeyFrame time.Time // FreezeCount is the number of video freezes detected FreezeCount uint32 // PauseCount is the number of video pauses detected PauseCount uint32 // TotalFreezeTime is the cumulative freeze duration TotalFreezeTime time.Duration // TotalPauseTime is the cumulative pause duration TotalPauseTime time.Duration }
TrackQualityStats contains quality-related statistics.
These metrics are used to make quality adaptation decisions and provide visibility into track performance.
type TrackSubscriptionFilter ¶
type TrackSubscriptionFilter func(publication *lksdk.RemoteTrackPublication) bool
TrackSubscriptionFilter is a function type for custom filtering of tracks before subscription. It receives a RemoteTrackPublication and returns true if the track should be subscribed to. Filters are applied in order and all must return true for a track to be subscribed.
Example filters:
- Subscribe only to camera tracks: func(pub) bool { return pub.Source() == livekit.TrackSource_CAMERA }
- Skip muted tracks: func(pub) bool { return !pub.IsMuted() }
- Subscribe to specific participants: func(pub) bool { return pub.Participant().Identity() == "user123" }
type TrackSubscriptionManager ¶
type TrackSubscriptionManager struct {
// contains filtered or unexported fields
}
TrackSubscriptionManager manages automatic track subscriptions for LiveKit agents. It provides intelligent subscription management with customizable filters, source priorities, and support for different track types (audio/video). This enables agents to efficiently subscribe only to tracks they need for processing, optimizing bandwidth and performance.
func NewTrackSubscriptionManager ¶
func NewTrackSubscriptionManager() *TrackSubscriptionManager
NewTrackSubscriptionManager creates a new subscription manager with default settings. By default, it enables auto-subscription for both audio and video tracks, with source priorities favoring camera/microphone tracks over screen shares. The default configuration is suitable for most agent use cases.
Default source priorities:
- Camera/Microphone: 100
- Screen Share: 90
- Unknown sources: 50
func (*TrackSubscriptionManager) AddFilter ¶
func (tm *TrackSubscriptionManager) AddFilter(filter TrackSubscriptionFilter)
AddFilter adds a custom filter function for track subscription decisions. Filters are applied in the order they were added, and ALL filters must return true for a track to be subscribed. This allows for complex subscription logic like participant-specific filtering, quality-based decisions, or content analysis.
func (*TrackSubscriptionManager) ClearFilters ¶
func (tm *TrackSubscriptionManager) ClearFilters()
ClearFilters removes all custom filters, returning to default subscription behavior. This is useful for resetting the subscription manager to a clean state.
func (*TrackSubscriptionManager) GetSourcePriority ¶
func (tm *TrackSubscriptionManager) GetSourcePriority(source livekit.TrackSource) int
GetSourcePriority returns the priority value for a specific track source. Higher values indicate higher priority. Returns 0 if no priority has been set for the source.
func (*TrackSubscriptionManager) SetAutoSubscribe ¶
func (tm *TrackSubscriptionManager) SetAutoSubscribe(enabled bool)
SetAutoSubscribe enables or disables automatic subscription for all tracks. When enabled, tracks that pass all filters and type checks will be automatically subscribed. When disabled, all automatic subscription is stopped.
func (*TrackSubscriptionManager) SetSourcePriority ¶
func (tm *TrackSubscriptionManager) SetSourcePriority(source livekit.TrackSource, priority int)
SetSourcePriority sets the priority for a specific track source (higher values = more important). This can be used to prioritize certain types of tracks when bandwidth is limited. For example, setting camera tracks to priority 100 and screen shares to 90 will prefer camera tracks over screen shares during subscription decisions.
func (*TrackSubscriptionManager) SetSubscribeAudio ¶
func (tm *TrackSubscriptionManager) SetSubscribeAudio(enabled bool)
SetSubscribeAudio enables or disables automatic subscription to audio tracks. When disabled, audio tracks will be ignored during auto-subscription even if other conditions are met.
func (*TrackSubscriptionManager) SetSubscribeVideo ¶
func (tm *TrackSubscriptionManager) SetSubscribeVideo(enabled bool)
SetSubscribeVideo enables or disables automatic subscription to video tracks. When disabled, video tracks will be ignored during auto-subscription even if other conditions are met.
func (*TrackSubscriptionManager) ShouldAutoSubscribe ¶
func (tm *TrackSubscriptionManager) ShouldAutoSubscribe(publication *lksdk.RemoteTrackPublication) bool
ShouldAutoSubscribe determines if a track should be automatically subscribed based on the current configuration, filters, and track properties. This method evaluates: 1. Whether auto-subscription is enabled 2. Track type (audio/video) preferences 3. All custom filters in order Returns true if the track should be subscribed, false otherwise.
type TranscodingStage ¶
type TranscodingStage struct {
// contains filtered or unexported fields
}
TranscodingStage handles media transcoding between formats.
This is an example stage showing how to implement format conversion. In a real implementation, this would use actual transcoding libraries to convert between audio/video formats.
The stage demonstrates:
- Format transformation
- Metadata updates
- Stage configuration
func NewTranscodingStage ¶
func NewTranscodingStage(name string, priority int, targetFormat MediaFormat) *TranscodingStage
NewTranscodingStage creates a new transcoding stage.
Parameters:
- name: Unique identifier for this stage
- priority: Execution order (lower runs first)
- targetFormat: The format to transcode media into
Note: This is a demonstration stage. Actual transcoding would require integration with media processing libraries.
func (*TranscodingStage) CanProcess ¶
func (ts *TranscodingStage) CanProcess(mediaType MediaType) bool
CanProcess implements MediaPipelineStage. Processes all media types.
func (*TranscodingStage) GetName ¶
func (ts *TranscodingStage) GetName() string
GetName implements MediaPipelineStage.
func (*TranscodingStage) GetPriority ¶
func (ts *TranscodingStage) GetPriority() int
GetPriority implements MediaPipelineStage.
type UniversalEvent ¶
type UniversalEvent struct { Type UniversalEventType Timestamp time.Time Data interface{} }
UniversalEvent represents a generic event in the universal worker system
type UniversalEventHandler ¶
type UniversalEventHandler func(event UniversalEvent) error
UniversalEventHandler is a function that handles universal events
type UniversalEventProcessor ¶
type UniversalEventProcessor struct {
// contains filtered or unexported fields
}
UniversalEventProcessor handles event processing for the universal worker
func NewUniversalEventProcessor ¶
func NewUniversalEventProcessor() *UniversalEventProcessor
NewUniversalEventProcessor creates a new event processor
func (*UniversalEventProcessor) QueueEvent ¶
func (ep *UniversalEventProcessor) QueueEvent(event UniversalEvent)
QueueEvent queues an event for processing
func (*UniversalEventProcessor) RegisterHandler ¶
func (ep *UniversalEventProcessor) RegisterHandler(eventType UniversalEventType, handler UniversalEventHandler)
RegisterHandler registers a handler for an event type
func (*UniversalEventProcessor) Stop ¶
func (ep *UniversalEventProcessor) Stop()
Stop stops the event processor
type UniversalEventType ¶
type UniversalEventType string
UniversalEventType represents different types of events for universal worker
const ( UniversalEventTypeParticipantJoined UniversalEventType = "participant_joined" UniversalEventTypeParticipantLeft UniversalEventType = "participant_left" UniversalEventTypeTrackPublished UniversalEventType = "track_published" UniversalEventTypeTrackUnpublished UniversalEventType = "track_unpublished" UniversalEventTypeMetadataChanged UniversalEventType = "metadata_changed" UniversalEventTypeConnectionQualityChanged UniversalEventType = "connection_quality_changed" )
type UniversalHandler ¶
type UniversalHandler interface { // Core job lifecycle OnJobRequest(ctx context.Context, job *livekit.Job) (accept bool, metadata *JobMetadata) OnJobAssigned(ctx context.Context, jobCtx *JobContext) error OnJobTerminated(ctx context.Context, jobID string) // Helper method to get job metadata (called internally) GetJobMetadata(job *livekit.Job) *JobMetadata // Room events (all job types) OnRoomConnected(ctx context.Context, room *lksdk.Room) OnRoomDisconnected(ctx context.Context, room *lksdk.Room, reason string) OnRoomMetadataChanged(ctx context.Context, oldMetadata, newMetadata string) // Participant events (all job types) OnParticipantJoined(ctx context.Context, participant *lksdk.RemoteParticipant) OnParticipantLeft(ctx context.Context, participant *lksdk.RemoteParticipant) OnParticipantMetadataChanged(ctx context.Context, participant *lksdk.RemoteParticipant, oldMetadata string) OnParticipantSpeakingChanged(ctx context.Context, participant *lksdk.RemoteParticipant, speaking bool) // Track events (all job types) OnTrackPublished(ctx context.Context, participant *lksdk.RemoteParticipant, publication *lksdk.RemoteTrackPublication) OnTrackUnpublished(ctx context.Context, participant *lksdk.RemoteParticipant, publication *lksdk.RemoteTrackPublication) OnTrackSubscribed(ctx context.Context, track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, participant *lksdk.RemoteParticipant) OnTrackUnsubscribed(ctx context.Context, track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, participant *lksdk.RemoteParticipant) OnTrackMuted(ctx context.Context, publication lksdk.TrackPublication, participant lksdk.Participant) OnTrackUnmuted(ctx context.Context, publication lksdk.TrackPublication, participant lksdk.Participant) // Media events OnDataReceived(ctx context.Context, data []byte, participant *lksdk.RemoteParticipant, kind livekit.DataPacket_Kind) // Quality events OnConnectionQualityChanged(ctx context.Context, participant *lksdk.RemoteParticipant, quality livekit.ConnectionQuality) // Active speaker events OnActiveSpeakersChanged(ctx context.Context, speakers []lksdk.Participant) }
UniversalHandler is a unified interface that combines all handler capabilities. Implementations can choose which callbacks to implement based on their needs. All callbacks are optional - default no-op implementations are provided via BaseHandler.
type UniversalWorker ¶
type UniversalWorker struct {
// contains filtered or unexported fields
}
UniversalWorker is a unified agent worker that can handle all job types and provides all capabilities in a single, flexible interface.
This is the recommended way to build agents. The older Worker, ParticipantAgent, and PublisherAgent classes are deprecated.
Key features:
- Handles all job types (ROOM, PARTICIPANT, PUBLISHER)
- Media handling capabilities available to all workers
- Participant management and tracking
- Automatic event routing based on job type
- Clean, simple API with optional features
func NewUniversalWorker ¶
func NewUniversalWorker(serverURL, apiKey, apiSecret string, handler UniversalHandler, opts WorkerOptions) *UniversalWorker
NewUniversalWorker creates a worker that can handle any job type
func (*UniversalWorker) AddCleanupHook ¶
AddCleanupHook convenience method for cleanup hooks
func (*UniversalWorker) AddPreStopHook ¶
AddPreStopHook convenience method for pre-stop hooks
func (*UniversalWorker) AddShutdownHook ¶
func (w *UniversalWorker) AddShutdownHook(phase ShutdownPhase, hook ShutdownHook) error
AddShutdownHook adds custom cleanup logic during shutdown
func (*UniversalWorker) EnableTrack ¶
func (w *UniversalWorker) EnableTrack(trackSID string, enabled bool) error
EnableTrack enables/disables a track without unsubscribing
func (*UniversalWorker) GetActiveJobs ¶
func (w *UniversalWorker) GetActiveJobs() map[string]interface{}
GetActiveJobs returns active jobs (for recovery)
func (*UniversalWorker) GetAllParticipantInfo ¶
func (w *UniversalWorker) GetAllParticipantInfo() map[string]*ParticipantInfo
GetAllParticipantInfo returns information about all participants
func (*UniversalWorker) GetAllParticipants ¶
func (w *UniversalWorker) GetAllParticipants(jobID string) ([]*lksdk.RemoteParticipant, error)
func (*UniversalWorker) GetCoordinationManager ¶
func (w *UniversalWorker) GetCoordinationManager() *MultiParticipantCoordinator
GetCoordinationManager returns the coordination manager
func (*UniversalWorker) GetCurrentLoad ¶
func (w *UniversalWorker) GetCurrentLoad() float32
GetCurrentLoad returns the current load of the worker
func (*UniversalWorker) GetEventProcessor ¶
func (w *UniversalWorker) GetEventProcessor() *UniversalEventProcessor
GetEventProcessor returns the event processor
func (*UniversalWorker) GetJobCheckpoint ¶
func (w *UniversalWorker) GetJobCheckpoint(jobID string) *JobCheckpoint
GetJobCheckpoint returns checkpoint for a specific job
func (*UniversalWorker) GetJobContext ¶
func (w *UniversalWorker) GetJobContext(jobID string) (*JobContext, bool)
Job context access
func (*UniversalWorker) GetLogger ¶
func (w *UniversalWorker) GetLogger() Logger
GetLogger returns the logger instance
func (*UniversalWorker) GetMetrics ¶
func (w *UniversalWorker) GetMetrics() map[string]int64
GetMetrics returns operational metrics
func (*UniversalWorker) GetOptions ¶
func (w *UniversalWorker) GetOptions() WorkerOptions
GetOptions returns the worker options
func (*UniversalWorker) GetParticipant ¶
func (w *UniversalWorker) GetParticipant(jobID string, identity string) (*lksdk.RemoteParticipant, error)
Participant management
func (*UniversalWorker) GetParticipantInfo ¶
func (w *UniversalWorker) GetParticipantInfo(identity string) (*ParticipantInfo, bool)
GetParticipantInfo returns information about a specific participant
func (*UniversalWorker) GetPermissionManager ¶
func (w *UniversalWorker) GetPermissionManager() *ParticipantPermissionManager
GetPermissionManager returns the permission manager
func (*UniversalWorker) GetPublisherInfo ¶
func (w *UniversalWorker) GetPublisherInfo() *lksdk.RemoteParticipant
GetPublisherInfo returns the currently tracked publisher participant
func (*UniversalWorker) GetQueueStats ¶
func (w *UniversalWorker) GetQueueStats() map[string]interface{}
GetQueueStats returns job queue statistics
func (*UniversalWorker) GetResourcePoolStats ¶
func (w *UniversalWorker) GetResourcePoolStats() map[string]interface{}
GetResourcePoolStats returns resource pool statistics
func (*UniversalWorker) GetServerURL ¶
func (w *UniversalWorker) GetServerURL() string
GetServerURL returns the server URL
func (*UniversalWorker) GetShutdownHooks ¶
func (w *UniversalWorker) GetShutdownHooks(phase ShutdownPhase) []ShutdownHook
GetShutdownHooks returns all hooks for a phase
func (*UniversalWorker) GetSubscribedTracks ¶
func (w *UniversalWorker) GetSubscribedTracks() map[string]*PublisherTrackSubscription
GetSubscribedTracks returns all currently subscribed tracks
func (*UniversalWorker) GetTargetParticipant ¶
func (w *UniversalWorker) GetTargetParticipant() *lksdk.RemoteParticipant
GetTargetParticipant returns the target participant being monitored
func (*UniversalWorker) GetTrackPublication ¶
func (w *UniversalWorker) GetTrackPublication(trackID string) (*lksdk.RemoteTrackPublication, error)
GetTrackPublication gets a track publication by track ID
func (*UniversalWorker) GetTrackStats ¶
func (w *UniversalWorker) GetTrackStats(trackID string) (map[string]interface{}, error)
GetTrackStats gets statistics for a track
func (*UniversalWorker) GetWorkerType ¶
func (w *UniversalWorker) GetWorkerType() livekit.JobType
GetWorkerType returns the type of jobs this worker handles
func (*UniversalWorker) Health ¶
func (w *UniversalWorker) Health() map[string]interface{}
Health returns comprehensive health status report
func (*UniversalWorker) IsConnected ¶
func (w *UniversalWorker) IsConnected() bool
IsConnected returns whether the worker is connected to the server
func (*UniversalWorker) PublishTrack ¶
func (w *UniversalWorker) PublishTrack(jobID string, track webrtc.TrackLocal) (*lksdk.LocalTrackPublication, error)
Media capabilities
func (*UniversalWorker) QueueJob ¶
func (w *UniversalWorker) QueueJob(job *livekit.Job, token, url string) error
QueueJob adds a job to the priority queue when at capacity
func (*UniversalWorker) RemoveShutdownHook ¶
func (w *UniversalWorker) RemoveShutdownHook(phase ShutdownPhase, name string) bool
RemoveShutdownHook removes a shutdown hook
func (*UniversalWorker) RequestPermissionChange ¶
func (w *UniversalWorker) RequestPermissionChange(jobID string, identity string, permissions *livekit.ParticipantPermission) error
RequestPermissionChange requests permission changes for a participant
func (*UniversalWorker) SendDataToParticipant ¶
func (*UniversalWorker) SetActiveJob ¶
func (w *UniversalWorker) SetActiveJob(jobID string, job interface{})
SetActiveJob sets an active job (for recovery)
func (*UniversalWorker) SetDebugMode ¶
func (w *UniversalWorker) SetDebugMode(enabled bool)
SetDebugMode enables or disables debug mode
func (*UniversalWorker) SetTrackDimensions ¶
func (w *UniversalWorker) SetTrackDimensions(trackSID string, width, height uint32) error
SetTrackDimensions sets preferred video dimensions
func (*UniversalWorker) SetTrackFrameRate ¶
func (w *UniversalWorker) SetTrackFrameRate(trackSID string, fps float64) error
SetTrackFrameRate sets preferred frame rate for video tracks
func (*UniversalWorker) SetTrackQuality ¶
func (w *UniversalWorker) SetTrackQuality(trackSID string, quality livekit.VideoQuality) error
SetTrackQuality sets preferred quality for video tracks
func (*UniversalWorker) Start ¶
func (w *UniversalWorker) Start(ctx context.Context) error
Start begins the worker's operation
func (*UniversalWorker) Stop ¶
func (w *UniversalWorker) Stop() error
Stop gracefully shuts down the worker
func (*UniversalWorker) StopWithTimeout ¶
func (w *UniversalWorker) StopWithTimeout(timeout time.Duration) error
StopWithTimeout gracefully shuts down the worker with a custom timeout
func (*UniversalWorker) SubscribeToTrack ¶
func (w *UniversalWorker) SubscribeToTrack(publication *lksdk.RemoteTrackPublication) error
SubscribeToTrack manually subscribes to a specific track
func (*UniversalWorker) UnsubscribeFromTrack ¶
func (w *UniversalWorker) UnsubscribeFromTrack(publication *lksdk.RemoteTrackPublication) error
UnsubscribeFromTrack manually unsubscribes from a track
func (*UniversalWorker) UnsubscribeTrack ¶
func (w *UniversalWorker) UnsubscribeTrack(trackID string) error
UnsubscribeTrack unsubscribes from a track by track ID
func (*UniversalWorker) UpdateStatus ¶
func (w *UniversalWorker) UpdateStatus(status WorkerStatus, load float32) error
UpdateStatus updates the worker's status and load
type VideoDimensions ¶
VideoDimensions represents video dimensions
type VideoFormat ¶
type VideoFormat int
VideoFormat represents video pixel formats.
These formats describe how pixel data is organized in memory. Different formats have different memory layouts and color spaces.
const ( // VideoFormatI420 is YUV 4:2:0 planar format (most common for video) VideoFormatI420 VideoFormat = iota // VideoFormatNV12 is YUV 4:2:0 semi-planar format VideoFormatNV12 // VideoFormatRGBA is RGB with alpha channel (4 bytes per pixel) VideoFormatRGBA // VideoFormatBGRA is BGR with alpha channel (4 bytes per pixel) VideoFormatBGRA )
type WebSocketConn ¶
type WebSocketConn interface { WriteMessage(messageType int, data []byte) error SetWriteDeadline(t time.Time) error }
WebSocketConn interface for WebSocket operations
type WebSocketState ¶
type WebSocketState int
WebSocketState represents the current state of the WebSocket connection
const ( WebSocketStateDisconnected WebSocketState = iota WebSocketStateConnecting WebSocketStateConnected WebSocketStateReconnecting )
type WorkerInfo ¶
WorkerInfo tracks information about a worker including its current load, job count, and capacity. This information is used by the LoadBalancer to make distribution decisions.
type WorkerInterface ¶
type WorkerInterface interface { // UpdateStatus updates the worker's status and load UpdateStatus(status WorkerStatus, load float32) error // GetServerURL returns the server URL GetServerURL() string // GetLogger returns the logger instance GetLogger() Logger // GetActiveJobs returns active jobs (for recovery) GetActiveJobs() map[string]interface{} // SetActiveJob sets an active job (for recovery) SetActiveJob(jobID string, job interface{}) // contains filtered or unexported methods }
WorkerInterface defines the common interface for worker implementations. This allows shared components like JobRecoveryManager and LoadBatcher to work with both Worker and UniversalWorker.
type WorkerMessage ¶
type WorkerMessage struct {
Message interface{} // Actual protobuf message
}
WorkerMessage represents a message sent from worker to server
type WorkerMessage_Availability ¶
type WorkerMessage_Availability struct {
Availability *livekit.AvailabilityResponse
}
type WorkerMessage_JobAccept ¶
type WorkerMessage_JobAccept struct {
JobAccept *JobAcceptMessage
}
type WorkerMessage_Ping ¶
type WorkerMessage_Pong ¶
type WorkerMessage_Register ¶
type WorkerMessage_Register struct {
Register *livekit.RegisterWorkerRequest
}
Specific message types for WorkerMessage
type WorkerMessage_UpdateJob ¶
type WorkerMessage_UpdateJob struct {
UpdateJob *livekit.UpdateJobStatus
}
type WorkerMessage_UpdateWorker ¶
type WorkerMessage_UpdateWorker struct {
UpdateWorker *livekit.UpdateWorkerStatus
}
type WorkerOptions ¶
type WorkerOptions struct { // AgentName identifies this agent type. // Jobs can be dispatched to specific agent names. // If empty, the agent will receive jobs for any agent name. AgentName string // Version is the agent version string. // This is reported to the server for debugging and compatibility checks. Version string // Namespace provides multi-tenant isolation. // Agents in different namespaces cannot see each other's jobs. // If empty, uses the default namespace. Namespace string // JobType specifies which type of jobs this agent handles. // Common types include: // - JT_ROOM: Agent joins when a room is created // - JT_PARTICIPANT: Agent joins when participants connect // - JT_PUBLISHER: Agent joins when participants publish media JobType livekit.JobType // Permissions the agent will have when joining rooms. // These permissions determine what the agent can do: // - CanPublish: Publish audio/video tracks // - CanSubscribe: Subscribe to other participants' tracks // - CanPublishData: Send data messages // - Hidden: Hide from other participants // If nil, default permissions are used. Permissions *livekit.ParticipantPermission // MaxJobs is the maximum number of concurrent jobs this worker will handle. // Once this limit is reached, the worker reports as "full" and won't receive new jobs. // Set to 0 for unlimited jobs (limited only by system resources). MaxJobs int // Logger for debug output. // If nil, a default logger is used. // Implement the Logger interface for custom logging integration. Logger Logger // PingInterval for keepalive messages to the server. // Regular pings ensure the connection stays active and detect disconnections quickly. // Default: 10s PingInterval time.Duration // PingTimeout for keepalive responses. // If a ping response isn't received within this duration, the connection is considered lost. // Default: 2s PingTimeout time.Duration // LoadCalculator for custom load calculation. // Implement this to define custom metrics for job assignment decisions. // If nil and EnableCPUMemoryLoad is true, a default calculator is used. LoadCalculator LoadCalculator // EnableCPUMemoryLoad enables CPU/memory-based load calculation. // When true, the worker reports system resource usage to help with job distribution. EnableCPUMemoryLoad bool // EnableLoadPrediction enables predictive load calculation. // Uses historical data to predict future load and make better job assignment decisions. EnableLoadPrediction bool // StatusUpdateBatchInterval batches status updates to reduce server load. // Multiple status updates within this interval are combined into a single message. // Set to 0 to disable batching (send updates immediately). StatusUpdateBatchInterval time.Duration // JobRecoveryHandler for custom job recovery logic. // Called when reconnecting to determine which jobs should be resumed. // If nil, all jobs are recovered by default when EnableJobRecovery is true. JobRecoveryHandler JobRecoveryHandler // EnableJobRecovery enables job recovery after reconnection. // When true, the worker attempts to resume jobs after temporary disconnections. EnableJobRecovery bool // PartialMessageBufferSize max size for partial WebSocket message buffering. // Prevents memory exhaustion from malformed or malicious messages. // Default: 1MB PartialMessageBufferSize int // EnableJobQueue enables job queueing with priority handling. // Queued jobs are processed in priority order when capacity becomes available. EnableJobQueue bool // JobQueueSize maximum number of jobs in queue. // When the queue is full, new jobs are rejected. // Set to 0 for unlimited queue size. JobQueueSize int // JobPriorityCalculator custom priority calculator. // Implement this to define custom job prioritization logic. // If nil, jobs are processed in FIFO order. JobPriorityCalculator JobPriorityCalculator // EnableResourcePool enables resource pooling for efficient resource reuse. // Useful for expensive resources like ML models or media processors. EnableResourcePool bool // ResourcePoolMinSize minimum number of resources to maintain in the pool. // Resources are pre-allocated to this level for quick job startup. ResourcePoolMinSize int // ResourcePoolMaxSize maximum number of resources in pool. // Limits memory usage from idle resources. ResourcePoolMaxSize int // ResourceFactory custom resource factory. // Implement this to define how resources are created and destroyed. // If nil, resource pooling is disabled even if EnableResourcePool is true. ResourceFactory ResourceFactory // StrictProtocolMode rejects unknown message types if true. // Enable for better security and protocol compliance. // Disable for forward compatibility with newer server versions. StrictProtocolMode bool // CustomMessageHandlers for handling custom message types. // Allows extending the protocol with application-specific messages. CustomMessageHandlers []MessageTypeHandler // EnableResourceMonitoring enables out-of-memory and leak detection. // The worker monitors resource usage and can take corrective action. EnableResourceMonitoring bool // MemoryLimitMB sets memory limit for OOM detection. // When exceeded, the worker may reject new jobs or trigger garbage collection. // Set to 0 to use 80% of system memory. MemoryLimitMB int // GoroutineLimit sets max goroutines before leak detection. // Helps detect goroutine leaks that could exhaust system resources. // Set to 0 to use default limit of 10000. GoroutineLimit int // EnableResourceLimits enables hard resource limit enforcement. // Uses OS-level limits to prevent resource exhaustion. // May require elevated privileges on some systems. EnableResourceLimits bool // HardMemoryLimitMB sets hard memory limit in MB. // Process is killed if this limit is exceeded. // Set to 0 to use default of 1024MB. HardMemoryLimitMB int // CPUQuotaPercent sets CPU quota as percentage. // 100 = 1 CPU core, 200 = 2 CPU cores, etc. // Set to 0 to use default of 200 (2 cores). CPUQuotaPercent int // MaxFileDescriptors sets max file descriptors. // Prevents "too many open files" errors. // Set to 0 to use default of 1024. MaxFileDescriptors int }
WorkerOptions configures the agent worker behavior and capabilities. These options control how the worker connects to LiveKit, handles jobs, manages resources, and reports its status.
type WorkerResource ¶
type WorkerResource struct { ID string CreatedAt time.Time LastUsedAt time.Time UseCount int64 // contains filtered or unexported fields }
WorkerResource represents a pooled worker resource
func (*WorkerResource) Close ¶
func (w *WorkerResource) Close() error
func (*WorkerResource) IsHealthy ¶
func (w *WorkerResource) IsHealthy() bool
func (*WorkerResource) Reset ¶
func (w *WorkerResource) Reset() error
type WorkerResourceFactory ¶
type WorkerResourceFactory struct {
// contains filtered or unexported fields
}
WorkerResourceFactory creates worker resources
func (*WorkerResourceFactory) Create ¶
func (f *WorkerResourceFactory) Create(ctx context.Context) (Resource, error)
func (*WorkerResourceFactory) Validate ¶
func (f *WorkerResourceFactory) Validate(resource Resource) error
type WorkerState ¶
type WorkerState struct { // WorkerID is the unique identifier assigned by the server. WorkerID string // ActiveJobs maps job IDs to their current state. ActiveJobs map[string]*JobState // LastStatus is the last reported worker status. LastStatus WorkerStatus // LastLoad is the last reported load value (0.0 to 1.0). LastLoad float32 }
WorkerState holds persistent state for reconnection recovery. This state is preserved across disconnections to enable seamless recovery.
type WorkerStatus ¶
type WorkerStatus int
WorkerStatus represents the current state of the worker. Used by the server to determine job assignment.
const ( // WorkerStatusAvailable indicates the worker can accept new jobs. WorkerStatusAvailable WorkerStatus = iota // WorkerStatusFull indicates the worker is at capacity. // No new jobs will be assigned until capacity is available. WorkerStatusFull )
func (WorkerStatus) String ¶
func (ws WorkerStatus) String() string
String returns the string representation of WorkerStatus. Returns "available", "full", or "unknown".
Source Files
¶
- job_queue.go
- job_recovery.go
- livekit_test_utils.go
- load_calculator.go
- media_pipeline.go
- multi_participant_coordinator.go
- network_handler.go
- participant_event_processor.go
- participant_permission_manager.go
- protocol_handler.go
- quality_controller.go
- race_protection.go
- resource_limiter.go
- resource_monitor.go
- resource_pool.go
- shutdown_hooks.go
- test_utils.go
- timing_manager.go
- track_subscription_manager.go
- types.go
- universal_handler.go
- universal_helpers.go
- universal_types.go
- universal_worker.go
- universal_worker_websocket.go
- utils.go
- worker_interface.go