Documentation
¶
Overview ¶
Package scheduler provides a global spawn scheduler with paced pane/agent creation. It serializes and paces all pane + agent creation to prevent resource exhaustion and rate limit errors.
Index ¶
- func CalculateJitteredDelay(base time.Duration, jitterFactor float64) time.Duration
- func ExponentialBackoff(attempt int, initial, max time.Duration, multiplier float64) time.Duration
- func FormatETA(d time.Duration) string
- func SetGlobal(s *Scheduler)
- type AgentCapConfig
- type AgentCapStats
- type AgentCaps
- func (ac *AgentCaps) Acquire(ctx context.Context, agentType string) error
- func (ac *AgentCaps) ForceRampUp(agentType string)
- func (ac *AgentCaps) GetAvailable(agentType string) int
- func (ac *AgentCaps) GetCurrentCap(agentType string) int
- func (ac *AgentCaps) GetRunning(agentType string) int
- func (ac *AgentCaps) RecordFailure(agentType string)
- func (ac *AgentCaps) RecordSuccess(agentType string)
- func (ac *AgentCaps) Release(agentType string)
- func (ac *AgentCaps) Reset()
- func (ac *AgentCaps) SetCap(agentType string, cap int)
- func (ac *AgentCaps) Stats() CapsStats
- func (ac *AgentCaps) TryAcquire(agentType string) bool
- type AgentCapsConfig
- type AgentLimiterConfig
- type BackoffConfig
- type BackoffController
- func (bc *BackoffController) HandleError(job *SpawnJob, resErr *ResourceError) (bool, time.Duration)
- func (bc *BackoffController) IsInGlobalBackoff() bool
- func (bc *BackoffController) RecordSuccess()
- func (bc *BackoffController) RemainingBackoff() time.Duration
- func (bc *BackoffController) Reset()
- func (bc *BackoffController) SetHooks(onStart func(time.Duration, ResourceErrorType), onEnd func(time.Duration), ...)
- func (bc *BackoffController) SetScheduler(s *Scheduler)
- func (bc *BackoffController) Stats() BackoffStats
- type BackoffStats
- type CapsStats
- type Config
- type FairScheduler
- type FairSchedulerConfig
- type HeadroomConfig
- type HeadroomGuard
- func (h *HeadroomGuard) BlockReason() string
- func (h *HeadroomGuard) CheckHeadroom() (bool, string)
- func (h *HeadroomGuard) IsBlocked() bool
- func (h *HeadroomGuard) Remediation() string
- func (h *HeadroomGuard) SetCallbacks(onBlocked func(reason string, limits *ResourceLimits, usage *ResourceUsage), ...)
- func (h *HeadroomGuard) Status() HeadroomStatus
- func (h *HeadroomGuard) Stop()
- type HeadroomStatus
- type Hooks
- type JobPriority
- type JobProgress
- type JobQueue
- func (q *JobQueue) CancelBatch(batchID string) []*SpawnJob
- func (q *JobQueue) CancelSession(sessionName string) []*SpawnJob
- func (q *JobQueue) Clear() []*SpawnJob
- func (q *JobQueue) CountByBatch(batchID string) int
- func (q *JobQueue) CountBySession(sessionName string) int
- func (q *JobQueue) Dequeue() *SpawnJob
- func (q *JobQueue) Enqueue(job *SpawnJob)
- func (q *JobQueue) Get(id string) *SpawnJob
- func (q *JobQueue) IsEmpty() bool
- func (q *JobQueue) Len() int
- func (q *JobQueue) ListAll() []*SpawnJob
- func (q *JobQueue) ListByBatch(batchID string) []*SpawnJob
- func (q *JobQueue) ListBySession(sessionName string) []*SpawnJob
- func (q *JobQueue) Peek() *SpawnJob
- func (q *JobQueue) Remove(id string) *SpawnJob
- func (q *JobQueue) Stats() QueueStats
- type JobStatus
- type JobType
- type LimiterConfig
- type LimiterStats
- type PerAgentLimiter
- type Progress
- type ProgressBroadcaster
- type ProgressEvent
- type ProgressSubscriber
- type QueueStats
- type RateLimitInfo
- type RateLimiter
- func (r *RateLimiter) AvailableTokens() float64
- func (r *RateLimiter) Reset()
- func (r *RateLimiter) SetCapacity(capacity float64)
- func (r *RateLimiter) SetMinInterval(interval time.Duration)
- func (r *RateLimiter) SetRate(rate float64)
- func (r *RateLimiter) Stats() LimiterStats
- func (r *RateLimiter) TimeUntilNextToken() time.Duration
- func (r *RateLimiter) TryAcquire() bool
- func (r *RateLimiter) Wait(ctx context.Context) error
- func (r *RateLimiter) Waiting() int
- type ResourceError
- type ResourceErrorType
- type ResourceLimits
- type ResourceUsage
- type Scheduler
- func (s *Scheduler) Cancel(jobID string) bool
- func (s *Scheduler) CancelBatch(batchID string) int
- func (s *Scheduler) CancelSession(sessionName string) int
- func (s *Scheduler) EstimateETA(jobID string) (time.Duration, error)
- func (s *Scheduler) GetJob(jobID string) *SpawnJob
- func (s *Scheduler) GetProgress() *Progress
- func (s *Scheduler) GetQueuedJobs() []*SpawnJob
- func (s *Scheduler) GetRecentCompleted(limit int) []*SpawnJob
- func (s *Scheduler) GetRunningJobs() []*SpawnJob
- func (s *Scheduler) IsPaused() bool
- func (s *Scheduler) Pause()
- func (s *Scheduler) Resume()
- func (s *Scheduler) SetExecutor(executor SpawnExecutor)
- func (s *Scheduler) SetHooks(hooks Hooks)
- func (s *Scheduler) Start() error
- func (s *Scheduler) Stats() Stats
- func (s *Scheduler) Stop()
- func (s *Scheduler) Submit(job *SpawnJob) error
- func (s *Scheduler) SubmitBatch(jobs []*SpawnJob) (string, error)
- type SessionProgress
- type SpawnExecutor
- type SpawnJob
- func (j *SpawnJob) CanRetry() bool
- func (j *SpawnJob) Cancel()
- func (j *SpawnJob) Clone() *SpawnJob
- func (j *SpawnJob) Context() context.Context
- func (j *SpawnJob) ExecutionDuration() time.Duration
- func (j *SpawnJob) GetStatus() JobStatus
- func (j *SpawnJob) IncrementRetry()
- func (j *SpawnJob) IsCancelled() bool
- func (j *SpawnJob) IsTerminal() bool
- func (j *SpawnJob) QueueDuration() time.Duration
- func (j *SpawnJob) SetError(err error)
- func (j *SpawnJob) SetStatus(status JobStatus)
- func (j *SpawnJob) TotalDuration() time.Duration
- type SpawnResult
- type Stats
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CalculateJitteredDelay ¶
CalculateJitteredDelay calculates a delay with jitter for a given base. Useful for external callers that need jittered timing.
func ExponentialBackoff ¶
ExponentialBackoff returns the delay for the nth retry attempt.
Types ¶
type AgentCapConfig ¶
type AgentCapConfig struct {
// MaxConcurrent is the maximum number of concurrent instances.
MaxConcurrent int `json:"max_concurrent"`
// RampUpEnabled enables gradual capacity increase over time.
RampUpEnabled bool `json:"ramp_up_enabled"`
// RampUpInitial is the starting cap when ramp-up is enabled.
RampUpInitial int `json:"ramp_up_initial"`
// RampUpStep is how much to increase the cap each interval.
RampUpStep int `json:"ramp_up_step"`
// RampUpInterval is how often to increase the cap.
RampUpInterval time.Duration `json:"ramp_up_interval"`
// CooldownOnFailure reduces the cap on failure.
CooldownOnFailure bool `json:"cooldown_on_failure"`
// CooldownReduction is how much to reduce on failure.
CooldownReduction int `json:"cooldown_reduction"`
// CooldownRecovery is how long before restoring cap after cooldown.
CooldownRecovery time.Duration `json:"cooldown_recovery"`
}
AgentCapConfig configures concurrency caps for a specific agent type.
func CodexCapConfig ¶
func CodexCapConfig() AgentCapConfig
CodexCapConfig returns Codex-specific conservative defaults.
func DefaultAgentCapConfig ¶
func DefaultAgentCapConfig() AgentCapConfig
DefaultAgentCapConfig returns sensible defaults for agent caps.
type AgentCapStats ¶
type AgentCapStats struct {
Running int `json:"running"` // Currently running instances
CurrentCap int `json:"current_cap"` // Current effective cap
MaxCap int `json:"max_cap"` // Configured max cap
Waiting int `json:"waiting"` // Waiting for capacity
InRampUp bool `json:"in_ramp_up"` // Currently ramping up
InCooldown bool `json:"in_cooldown"` // Currently in cooldown
}
AgentCapStats contains statistics for one agent type.
type AgentCaps ¶
type AgentCaps struct {
// contains filtered or unexported fields
}
AgentCaps manages per-agent concurrency caps with ramp-up and cooldown.
func NewAgentCaps ¶
func NewAgentCaps(cfg AgentCapsConfig) *AgentCaps
NewAgentCaps creates a new agent caps manager.
func (*AgentCaps) ForceRampUp ¶
ForceRampUp immediately increases the cap to the configured maximum for an agent type.
func (*AgentCaps) GetAvailable ¶
GetAvailable returns available slots for an agent type.
func (*AgentCaps) GetCurrentCap ¶
GetCurrentCap returns the current effective cap for an agent type.
func (*AgentCaps) GetRunning ¶
GetRunning returns the current running count for an agent type.
func (*AgentCaps) RecordFailure ¶
RecordFailure records a failure for an agent type, potentially triggering cooldown.
func (*AgentCaps) RecordSuccess ¶
RecordSuccess records a successful spawn for an agent type.
func (*AgentCaps) Reset ¶
func (ac *AgentCaps) Reset()
Reset resets all cap states to initial values.
func (*AgentCaps) TryAcquire ¶
TryAcquire tries to acquire a slot without blocking. Returns true if acquired, false if at capacity.
type AgentCapsConfig ¶
type AgentCapsConfig struct {
// Default is the default cap config for unknown agent types.
Default AgentCapConfig `json:"default"`
// PerAgent contains per-agent-type overrides.
PerAgent map[string]AgentCapConfig `json:"per_agent,omitempty"`
// GlobalMax is the absolute maximum across all agents (0 = no limit).
GlobalMax int `json:"global_max"`
}
AgentCapsConfig contains configuration for all agent type caps.
func DefaultAgentCapsConfig ¶
func DefaultAgentCapsConfig() AgentCapsConfig
DefaultAgentCapsConfig returns sensible defaults for agent caps.
type AgentLimiterConfig ¶
type AgentLimiterConfig struct {
// Default is the default limiter config for unknown agent types.
Default LimiterConfig `json:"default"`
// PerAgent contains per-agent-type overrides.
PerAgent map[string]LimiterConfig `json:"per_agent,omitempty"`
}
AgentLimiterConfig contains configuration for per-agent limiting.
func DefaultAgentLimiterConfig ¶
func DefaultAgentLimiterConfig() AgentLimiterConfig
DefaultAgentLimiterConfig returns sensible defaults for agent rate limiting.
type BackoffConfig ¶
type BackoffConfig struct {
// InitialDelay is the starting delay for backoff.
InitialDelay time.Duration `json:"initial_delay"`
// MaxDelay is the maximum backoff delay.
MaxDelay time.Duration `json:"max_delay"`
// Multiplier is the factor by which delay increases.
Multiplier float64 `json:"multiplier"`
// JitterFactor is the random jitter as a fraction of delay (0.0-1.0).
JitterFactor float64 `json:"jitter_factor"`
// MaxRetries is the maximum number of retry attempts.
MaxRetries int `json:"max_retries"`
// PauseQueueOnBackoff pauses the scheduler queue during backoff.
PauseQueueOnBackoff bool `json:"pause_queue_on_backoff"`
// ConsecutiveFailuresThreshold triggers global backoff after N failures.
ConsecutiveFailuresThreshold int `json:"consecutive_failures_threshold"`
}
BackoffConfig configures the exponential backoff behavior.
func DefaultBackoffConfig ¶
func DefaultBackoffConfig() BackoffConfig
DefaultBackoffConfig returns sensible default configuration.
type BackoffController ¶
type BackoffController struct {
// contains filtered or unexported fields
}
BackoffController manages exponential backoff with jitter for resource errors.
func NewBackoffController ¶
func NewBackoffController(cfg BackoffConfig) *BackoffController
NewBackoffController creates a new backoff controller.
func (*BackoffController) HandleError ¶
func (bc *BackoffController) HandleError(job *SpawnJob, resErr *ResourceError) (bool, time.Duration)
HandleError processes a resource error and returns the backoff action. Returns (shouldRetry, delay) where delay is 0 if no backoff needed.
func (*BackoffController) IsInGlobalBackoff ¶
func (bc *BackoffController) IsInGlobalBackoff() bool
IsInGlobalBackoff returns true if global backoff is active.
func (*BackoffController) RecordSuccess ¶
func (bc *BackoffController) RecordSuccess()
RecordSuccess records a successful operation with the backoff controller.
func (*BackoffController) RemainingBackoff ¶
func (bc *BackoffController) RemainingBackoff() time.Duration
RemainingBackoff returns remaining time in global backoff.
func (*BackoffController) Reset ¶
func (bc *BackoffController) Reset()
Reset resets the backoff controller to initial state.
func (*BackoffController) SetHooks ¶
func (bc *BackoffController) SetHooks(
	onStart func(time.Duration, ResourceErrorType),
	onEnd func(time.Duration),
	onExhausted func(*SpawnJob, int),
)
SetHooks sets event callbacks.
func (*BackoffController) SetScheduler ¶
func (bc *BackoffController) SetScheduler(s *Scheduler)
SetScheduler sets the scheduler reference for queue pause/resume.
func (*BackoffController) Stats ¶
func (bc *BackoffController) Stats() BackoffStats
Stats returns backoff statistics.
type BackoffStats ¶
type BackoffStats struct {
TotalBackoffs int64 `json:"total_backoffs"`
TotalRetries int64 `json:"total_retries"`
TotalExhausted int64 `json:"total_exhausted"`
MaxConsecutive int `json:"max_consecutive"`
TotalBackoffTime time.Duration `json:"total_backoff_time"`
LastBackoffReason ResourceErrorType `json:"last_backoff_reason,omitempty"`
LastBackoffAt time.Time `json:"last_backoff_at,omitempty"`
}
BackoffStats contains backoff statistics.
type CapsStats ¶
type CapsStats struct {
// PerAgent contains per-agent statistics.
PerAgent map[string]AgentCapStats `json:"per_agent"`
// TotalRunning is the total running across all agents.
TotalRunning int `json:"total_running"`
// TotalWaiting is the total waiting across all agents.
TotalWaiting int `json:"total_waiting"`
}
CapsStats contains cap statistics.
type Config ¶
type Config struct {
// MaxConcurrent is the maximum number of concurrent spawn operations.
MaxConcurrent int `json:"max_concurrent"`
// GlobalRateLimit is the global rate limiter configuration.
GlobalRateLimit LimiterConfig `json:"global_rate_limit"`
// AgentRateLimits is the per-agent rate limiter configuration.
AgentRateLimits AgentLimiterConfig `json:"agent_rate_limits"`
// AgentCaps is the per-agent concurrency caps configuration.
AgentCaps AgentCapsConfig `json:"agent_caps"`
// FairScheduler is the fair scheduler configuration.
FairScheduler FairSchedulerConfig `json:"fair_scheduler"`
// Backoff is the backoff configuration for resource errors.
Backoff BackoffConfig `json:"backoff"`
// MaxCompleted is the number of completed jobs to retain for status.
MaxCompleted int `json:"max_completed"`
// DefaultRetries is the default number of retries for failed jobs.
DefaultRetries int `json:"default_retries"`
// DefaultRetryDelay is the default delay between retries.
DefaultRetryDelay time.Duration `json:"default_retry_delay"`
// BackpressureThreshold is the queue size that triggers backpressure alerts.
BackpressureThreshold int `json:"backpressure_threshold"`
// Headroom is the pre-spawn resource headroom configuration.
Headroom HeadroomConfig `json:"headroom"`
}
Config configures the scheduler.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns sensible default configuration.
type FairScheduler ¶
type FairScheduler struct {
// contains filtered or unexported fields
}
FairScheduler wraps JobQueue with fairness guarantees.
func NewFairScheduler ¶
func NewFairScheduler(cfg FairSchedulerConfig) *FairScheduler
NewFairScheduler creates a new fair scheduler.
func (*FairScheduler) Enqueue ¶
func (f *FairScheduler) Enqueue(job *SpawnJob)
Enqueue adds a job to the queue.
func (*FairScheduler) MarkComplete ¶
func (f *FairScheduler) MarkComplete(job *SpawnJob)
MarkComplete marks a job as complete for fairness tracking.
func (*FairScheduler) Queue ¶
func (f *FairScheduler) Queue() *JobQueue
Queue returns the underlying queue for direct access.
func (*FairScheduler) RunningCount ¶
func (f *FairScheduler) RunningCount(sessionName string) int
RunningCount returns the number of running jobs for a session.
func (*FairScheduler) TryDequeue ¶
func (f *FairScheduler) TryDequeue() *SpawnJob
TryDequeue returns the next job that can run without violating fairness. Returns nil if no eligible job is available.
type FairSchedulerConfig ¶
type FairSchedulerConfig struct {
MaxPerSession int `json:"max_per_session"`
MaxPerBatch int `json:"max_per_batch"`
}
FairSchedulerConfig configures the fair scheduler.
func DefaultFairSchedulerConfig ¶
func DefaultFairSchedulerConfig() FairSchedulerConfig
DefaultFairSchedulerConfig returns sensible defaults.
type HeadroomConfig ¶
type HeadroomConfig struct {
// Enabled toggles headroom checking.
Enabled bool `json:"enabled"`
// Threshold is the usage percentage (0.0-1.0) above which spawns are blocked.
// Default: 0.75 (75%)
Threshold float64 `json:"threshold"`
// WarnThreshold is the usage percentage (0.0-1.0) at which warnings are logged.
// Default: 0.70 (70%)
WarnThreshold float64 `json:"warn_threshold"`
// RecheckInterval is how often to recheck resource headroom when blocked.
// Default: 5s
RecheckInterval time.Duration `json:"recheck_interval"`
// MinHeadroom is the minimum number of free process slots required.
// Default: 50
MinHeadroom int `json:"min_headroom"`
// CacheTimeout is how long to cache headroom measurements.
// Default: 2s
CacheTimeout time.Duration `json:"cache_timeout"`
}
HeadroomConfig configures pre-spawn resource headroom guardrails.
func DefaultHeadroomConfig ¶
func DefaultHeadroomConfig() HeadroomConfig
DefaultHeadroomConfig returns sensible default configuration.
type HeadroomGuard ¶
type HeadroomGuard struct {
// contains filtered or unexported fields
}
HeadroomGuard checks system resource headroom before allowing spawns.
func NewHeadroomGuard ¶
func NewHeadroomGuard(cfg HeadroomConfig) *HeadroomGuard
NewHeadroomGuard creates a new headroom guard.
func (*HeadroomGuard) BlockReason ¶
func (h *HeadroomGuard) BlockReason() string
BlockReason returns the current block reason.
func (*HeadroomGuard) CheckHeadroom ¶
func (h *HeadroomGuard) CheckHeadroom() (bool, string)
CheckHeadroom checks if there is sufficient resource headroom for spawning. Returns true if spawning is allowed, false otherwise.
func (*HeadroomGuard) IsBlocked ¶
func (h *HeadroomGuard) IsBlocked() bool
IsBlocked returns whether spawns are currently blocked.
func (*HeadroomGuard) Remediation ¶
func (h *HeadroomGuard) Remediation() string
Remediation returns guidance for resolving resource constraints.
func (*HeadroomGuard) SetCallbacks ¶
func (h *HeadroomGuard) SetCallbacks(
	onBlocked func(reason string, limits *ResourceLimits, usage *ResourceUsage),
	onUnblocked func(),
	onWarning func(reason string, limits *ResourceLimits, usage *ResourceUsage),
)
SetCallbacks sets the callbacks for state changes.
func (*HeadroomGuard) Status ¶
func (h *HeadroomGuard) Status() HeadroomStatus
Status returns the current headroom status.
func (*HeadroomGuard) Stop ¶
func (h *HeadroomGuard) Stop()
Stop stops the headroom guard and cleans up resources.
type HeadroomStatus ¶
type HeadroomStatus struct {
// Available is the number of free process slots.
Available int `json:"available"`
// UsagePercent is the current usage as a percentage (0.0-1.0).
UsagePercent float64 `json:"usage_percent"`
// Blocked indicates if spawns are blocked.
Blocked bool `json:"blocked"`
// BlockReason is the reason for blocking (empty if not blocked).
BlockReason string `json:"block_reason,omitempty"`
// Limits contains the detected resource limits.
Limits *ResourceLimits `json:"limits"`
// Usage contains the current resource usage.
Usage *ResourceUsage `json:"usage"`
// LastCheck is when headroom was last checked.
LastCheck time.Time `json:"last_check"`
}
HeadroomStatus represents the current headroom status.
type Hooks ¶
type Hooks struct {
// OnJobEnqueued is called when a job is added to the queue.
OnJobEnqueued func(job *SpawnJob)
// OnJobStarted is called when a job starts executing.
OnJobStarted func(job *SpawnJob)
// OnJobCompleted is called when a job completes successfully.
OnJobCompleted func(job *SpawnJob)
// OnJobFailed is called when a job fails.
OnJobFailed func(job *SpawnJob, err error)
// OnJobRetrying is called when a job is about to retry.
OnJobRetrying func(job *SpawnJob, attempt int)
// OnBackpressure is called when the queue is experiencing backpressure.
OnBackpressure func(queueSize int, waitTime time.Duration)
// OnGuardrailTriggered is called when a guardrail blocks a spawn.
OnGuardrailTriggered func(job *SpawnJob, reason string)
}
Hooks contains callbacks for job lifecycle events.
func CreateProgressHooks ¶
func CreateProgressHooks(broadcaster *ProgressBroadcaster, scheduler *Scheduler) Hooks
CreateProgressHooks creates scheduler hooks that broadcast progress events.
type JobPriority ¶
type JobPriority int
JobPriority determines processing order within the queue.
const (
	PriorityUrgent JobPriority = 0 // Respawns, recovery
	PriorityHigh   JobPriority = 1 // User-initiated spawns
	PriorityNormal JobPriority = 2 // Regular batch spawns
	PriorityLow    JobPriority = 3 // Background/deferred spawns
)
type JobProgress ¶
type JobProgress struct {
// ID is the job ID.
ID string `json:"id"`
// Type is the job type.
Type JobType `json:"type"`
// Status is the current status.
Status JobStatus `json:"status"`
// SessionName is the target session.
SessionName string `json:"session_name"`
// AgentType is the agent type if applicable.
AgentType string `json:"agent_type,omitempty"`
// PaneIndex is the pane index if applicable.
PaneIndex int `json:"pane_index,omitempty"`
// Priority is the job priority.
Priority JobPriority `json:"priority"`
// QueuedFor is how long the job has been queued.
QueuedFor time.Duration `json:"queued_for,omitempty"`
// RunningFor is how long the job has been running.
RunningFor time.Duration `json:"running_for,omitempty"`
// Error is any error message.
Error string `json:"error,omitempty"`
// RetryCount is the current retry count.
RetryCount int `json:"retry_count,omitempty"`
// EstimatedETASeconds is estimated time until this job starts.
EstimatedETASeconds float64 `json:"estimated_eta_seconds,omitempty"`
}
JobProgress represents a single job's progress for display.
type JobQueue ¶
type JobQueue struct {
// contains filtered or unexported fields
}
JobQueue is a priority queue for spawn jobs with fairness tracking.
func (*JobQueue) CancelBatch ¶
CancelBatch cancels all jobs in a batch.
func (*JobQueue) CancelSession ¶
CancelSession cancels all jobs for a session.
func (*JobQueue) CountByBatch ¶
CountByBatch returns the number of jobs in a batch.
func (*JobQueue) CountBySession ¶
CountBySession returns the number of jobs for a session.
func (*JobQueue) Dequeue ¶
Dequeue removes and returns the highest priority job. Returns nil if the queue is empty.
func (*JobQueue) ListByBatch ¶
ListByBatch returns jobs for a specific batch.
func (*JobQueue) ListBySession ¶
ListBySession returns jobs for a specific session.
func (*JobQueue) Stats ¶
func (q *JobQueue) Stats() QueueStats
Stats returns a copy of queue statistics.
type JobStatus ¶
type JobStatus string
JobStatus represents the current state of a spawn job.
const (
	StatusPending   JobStatus = "pending"   // Waiting in queue
	StatusScheduled JobStatus = "scheduled" // Rate limiter approved, waiting for execution slot
	StatusRunning   JobStatus = "running"   // Currently executing
	StatusCompleted JobStatus = "completed" // Successfully finished
	StatusFailed    JobStatus = "failed"    // Failed with error
	StatusCancelled JobStatus = "cancelled" // Cancelled by user/system
	StatusRetrying  JobStatus = "retrying"  // Failed but will retry
)
type LimiterConfig ¶
type LimiterConfig struct {
// Rate is the number of tokens added per second.
Rate float64 `json:"rate"`
// Capacity is the maximum number of tokens (burst size).
Capacity float64 `json:"capacity"`
// BurstAllowed enables burst mode.
BurstAllowed bool `json:"burst_allowed"`
// MinInterval is the minimum time between operations.
MinInterval time.Duration `json:"min_interval"`
}
LimiterConfig configures the rate limiter.
func DefaultLimiterConfig ¶
func DefaultLimiterConfig() LimiterConfig
DefaultLimiterConfig returns sensible default configuration.
type LimiterStats ¶
type LimiterStats struct {
// TotalRequests is the total number of Wait calls.
TotalRequests int64 `json:"total_requests"`
// AllowedRequests is requests that were allowed immediately.
AllowedRequests int64 `json:"allowed_requests"`
// WaitedRequests is requests that had to wait.
WaitedRequests int64 `json:"waited_requests"`
// DeniedRequests is requests denied due to timeout/cancellation.
DeniedRequests int64 `json:"denied_requests"`
// TotalWaitTime is the cumulative wait time.
TotalWaitTime time.Duration `json:"total_wait_time"`
// MaxWaitTime is the longest wait time recorded.
MaxWaitTime time.Duration `json:"max_wait_time"`
// AvgWaitTime is the average wait time.
AvgWaitTime time.Duration `json:"avg_wait_time"`
// CurrentTokens is the current token count.
CurrentTokens float64 `json:"current_tokens"`
// Waiting is the number of requests currently waiting.
Waiting int `json:"waiting"`
}
LimiterStats contains rate limiter statistics.
type PerAgentLimiter ¶
type PerAgentLimiter struct {
// contains filtered or unexported fields
}
PerAgentLimiter provides per-agent-type rate limiting.
func NewPerAgentLimiter ¶
func NewPerAgentLimiter(cfg AgentLimiterConfig) *PerAgentLimiter
NewPerAgentLimiter creates a new per-agent rate limiter.
func (*PerAgentLimiter) AllStats ¶
func (p *PerAgentLimiter) AllStats() map[string]LimiterStats
AllStats returns statistics for all agent limiters.
func (*PerAgentLimiter) GetLimiter ¶
func (p *PerAgentLimiter) GetLimiter(agentType string) *RateLimiter
GetLimiter returns the rate limiter for an agent type.
type Progress ¶
type Progress struct {
// Timestamp is when this progress snapshot was taken.
Timestamp time.Time `json:"timestamp"`
// Status is the overall scheduler status.
Status string `json:"status"` // running, paused, stopped
// QueuedCount is the number of jobs waiting in queue.
QueuedCount int `json:"queued_count"`
// RunningCount is the number of jobs currently executing.
RunningCount int `json:"running_count"`
// CompletedCount is the number of completed jobs (session lifetime).
CompletedCount int `json:"completed_count"`
// FailedCount is the number of failed jobs.
FailedCount int `json:"failed_count"`
// EstimatedETASeconds is the estimated time until queue is empty.
EstimatedETASeconds float64 `json:"estimated_eta_seconds,omitempty"`
// RateLimitInfo contains current rate limiter state.
RateLimitInfo RateLimitInfo `json:"rate_limit_info"`
// Queued contains details of queued jobs.
Queued []JobProgress `json:"queued,omitempty"`
// Running contains details of running jobs.
Running []JobProgress `json:"running,omitempty"`
// RecentCompleted contains recently completed jobs.
RecentCompleted []JobProgress `json:"recent_completed,omitempty"`
// BySession contains progress grouped by session.
BySession map[string]*SessionProgress `json:"by_session,omitempty"`
}
Progress represents the current spawn progress for TUI/robot output.
type ProgressBroadcaster ¶
type ProgressBroadcaster struct {
// contains filtered or unexported fields
}
ProgressBroadcaster broadcasts progress events to subscribers.
func NewProgressBroadcaster ¶
func NewProgressBroadcaster() *ProgressBroadcaster
NewProgressBroadcaster creates a new broadcaster.
func (*ProgressBroadcaster) Broadcast ¶
func (b *ProgressBroadcaster) Broadcast(event ProgressEvent)
Broadcast sends an event to all subscribers.
func (*ProgressBroadcaster) Subscribe ¶
func (b *ProgressBroadcaster) Subscribe(sub ProgressSubscriber)
Subscribe adds a subscriber.
type ProgressEvent ¶
type ProgressEvent struct {
// Type is the event type.
Type string `json:"type"` // job_enqueued, job_started, job_completed, job_failed, progress_update
// JobID is the job ID if applicable.
JobID string `json:"job_id,omitempty"`
// SessionName is the session name if applicable.
SessionName string `json:"session_name,omitempty"`
// Progress is the current progress snapshot.
Progress *Progress `json:"progress,omitempty"`
// Message is a human-readable message.
Message string `json:"message,omitempty"`
// Timestamp is when this event occurred.
Timestamp time.Time `json:"timestamp"`
}
ProgressEvent is emitted for progress updates.
type ProgressSubscriber ¶
type ProgressSubscriber func(event ProgressEvent)
ProgressSubscriber receives progress events.
type QueueStats ¶
type QueueStats struct {
// TotalEnqueued is the total number of jobs ever enqueued.
TotalEnqueued int64 `json:"total_enqueued"`
// TotalDequeued is the total number of jobs ever dequeued.
TotalDequeued int64 `json:"total_dequeued"`
// CurrentSize is the current queue size.
CurrentSize int `json:"current_size"`
// MaxSize is the maximum queue size ever observed.
MaxSize int `json:"max_size"`
// ByPriority is the count per priority level.
ByPriority map[JobPriority]int `json:"by_priority"`
// ByType is the count per job type.
ByType map[JobType]int `json:"by_type"`
// AvgWaitTime is the average time jobs spend in the queue.
AvgWaitTime time.Duration `json:"avg_wait_time"`
// MaxWaitTime is the maximum time a job spent in the queue.
MaxWaitTime time.Duration `json:"max_wait_time"`
// contains filtered or unexported fields
}
QueueStats contains queue statistics.
type RateLimitInfo ¶
type RateLimitInfo struct {
// AvailableTokens is the current token count.
AvailableTokens float64 `json:"available_tokens"`
// Rate is tokens per second.
Rate float64 `json:"rate"`
// Capacity is the maximum tokens.
Capacity float64 `json:"capacity"`
// WaitingCount is requests waiting for tokens.
WaitingCount int `json:"waiting_count"`
// NextTokenInMs is milliseconds until next token.
NextTokenInMs int64 `json:"next_token_in_ms"`
}
RateLimitInfo contains rate limiter state for display.
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements a token bucket rate limiter for spawn operations. It controls the rate at which spawn jobs can be executed to prevent resource exhaustion and API rate limits.
func NewRateLimiter ¶
func NewRateLimiter(cfg LimiterConfig) *RateLimiter
NewRateLimiter creates a new rate limiter with the given configuration.
func (*RateLimiter) AvailableTokens ¶
func (r *RateLimiter) AvailableTokens() float64
AvailableTokens returns the current number of available tokens.
func (*RateLimiter) Reset ¶
func (r *RateLimiter) Reset()
Reset resets the limiter to initial state.
func (*RateLimiter) SetCapacity ¶
func (r *RateLimiter) SetCapacity(capacity float64)
SetCapacity updates the maximum bucket capacity.
func (*RateLimiter) SetMinInterval ¶
func (r *RateLimiter) SetMinInterval(interval time.Duration)
SetMinInterval updates the minimum interval between operations.
func (*RateLimiter) SetRate ¶
func (r *RateLimiter) SetRate(rate float64)
SetRate updates the token refill rate.
func (*RateLimiter) Stats ¶
func (r *RateLimiter) Stats() LimiterStats
Stats returns a copy of the current statistics.
func (*RateLimiter) TimeUntilNextToken ¶
func (r *RateLimiter) TimeUntilNextToken() time.Duration
TimeUntilNextToken returns the estimated time until a token is available.
func (*RateLimiter) TryAcquire ¶
func (r *RateLimiter) TryAcquire() bool
TryAcquire tries to acquire a token without blocking. Returns true if a token was acquired, false otherwise.
func (*RateLimiter) Wait ¶
func (r *RateLimiter) Wait(ctx context.Context) error
Wait blocks until a token is available or context is cancelled. Returns nil if a token was acquired, or an error if cancelled/timed out.
func (*RateLimiter) Waiting ¶
func (r *RateLimiter) Waiting() int
Waiting returns the number of requests currently waiting.
type ResourceError ¶
type ResourceError struct {
Original error
Type ResourceErrorType
ExitCode int
StderrHint string
Retryable bool
Timestamp time.Time
}
ResourceError wraps an error with resource exhaustion classification.
func ClassifyError ¶
func ClassifyError(err error, exitCode int, stderr string) *ResourceError
ClassifyError examines an error and classifies it as a resource exhaustion error.
func (*ResourceError) Error ¶
func (e *ResourceError) Error() string
func (*ResourceError) Unwrap ¶
func (e *ResourceError) Unwrap() error
type ResourceErrorType ¶
type ResourceErrorType string
ResourceErrorType classifies the type of resource exhaustion error.
const (
	ResourceErrorNone      ResourceErrorType = ""
	ResourceErrorEAGAIN    ResourceErrorType = "EAGAIN"
	ResourceErrorENOMEM    ResourceErrorType = "ENOMEM"
	ResourceErrorENFILE    ResourceErrorType = "ENFILE"
	ResourceErrorEMFILE    ResourceErrorType = "EMFILE"
	ResourceErrorRateLimit ResourceErrorType = "RATE_LIMIT"
)
type ResourceLimits ¶
type ResourceLimits struct {
// UlimitNproc is the user's process limit from ulimit -u.
UlimitNproc int `json:"ulimit_nproc"`
// CgroupPidsMax is the cgroup v2 pids.max limit (0 if not available).
CgroupPidsMax int `json:"cgroup_pids_max"`
// SystemdTasksMax is the systemd TasksMax limit (0 if not available).
SystemdTasksMax int `json:"systemd_tasks_max"`
// KernelPidMax is the kernel's pid_max from /proc/sys/kernel/pid_max.
KernelPidMax int `json:"kernel_pid_max"`
// EffectiveLimit is min(all non-zero limits).
EffectiveLimit int `json:"effective_limit"`
// Source indicates which limit is the effective one.
Source string `json:"source"`
}
ResourceLimits holds the effective resource limits from various sources.
type ResourceUsage ¶
type ResourceUsage struct {
// CgroupPidsCurrent is the cgroup v2 pids.current (0 if not available).
CgroupPidsCurrent int `json:"cgroup_pids_current"`
// ProcessCount is the number of processes owned by the current user.
ProcessCount int `json:"process_count"`
// EffectiveUsage is the best estimate of current usage.
EffectiveUsage int `json:"effective_usage"`
// Source indicates which usage metric is being used.
Source string `json:"source"`
}
ResourceUsage holds the current resource usage.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler is the global spawn scheduler that serializes and paces all pane and agent creation operations.
func Global ¶
func Global() *Scheduler
Global returns the global scheduler instance, creating it if necessary.
func (*Scheduler) CancelBatch ¶
CancelBatch cancels all jobs in a batch.
func (*Scheduler) CancelSession ¶
CancelSession cancels all jobs for a session.
func (*Scheduler) EstimateETA ¶
EstimateETA estimates when a queued job will start.
func (*Scheduler) GetProgress ¶
GetProgress returns the current spawn progress.
func (*Scheduler) GetQueuedJobs ¶
GetQueuedJobs returns all queued jobs.
func (*Scheduler) GetRecentCompleted ¶
GetRecentCompleted returns recently completed jobs.
func (*Scheduler) GetRunningJobs ¶
GetRunningJobs returns all currently running jobs.
func (*Scheduler) SetExecutor ¶
func (s *Scheduler) SetExecutor(executor SpawnExecutor)
SetExecutor sets the function that executes spawn jobs.
type SessionProgress ¶
type SessionProgress struct {
// SessionName is the session name.
SessionName string `json:"session_name"`
// QueuedCount is queued jobs for this session.
QueuedCount int `json:"queued_count"`
// RunningCount is running jobs for this session.
RunningCount int `json:"running_count"`
// CompletedCount is completed jobs for this session.
CompletedCount int `json:"completed_count"`
// FailedCount is failed jobs for this session.
FailedCount int `json:"failed_count"`
// TotalPanes is the total panes being spawned.
TotalPanes int `json:"total_panes"`
// PanesReady is panes that are ready.
PanesReady int `json:"panes_ready"`
// ProgressPercent is the completion percentage.
ProgressPercent float64 `json:"progress_percent"`
}
SessionProgress groups progress by session.
type SpawnExecutor ¶
SpawnExecutor is a function that executes a spawn job.
type SpawnJob ¶
type SpawnJob struct {
// ID is a unique identifier for this job.
ID string `json:"id"`
// Type is the kind of spawn operation.
Type JobType `json:"type"`
// Priority determines processing order.
Priority JobPriority `json:"priority"`
// SessionName is the target session.
SessionName string `json:"session_name"`
// AgentType is the type of agent (cc, cod, gmi) if applicable.
AgentType string `json:"agent_type,omitempty"`
// PaneIndex is the target pane index if applicable.
PaneIndex int `json:"pane_index,omitempty"`
// Directory is the working directory for the spawn.
Directory string `json:"directory,omitempty"`
// Status is the current job status.
Status JobStatus `json:"status"`
// CreatedAt is when the job was created.
CreatedAt time.Time `json:"created_at"`
// ScheduledAt is when the job was approved by rate limiter.
ScheduledAt time.Time `json:"scheduled_at,omitempty"`
// StartedAt is when execution began.
StartedAt time.Time `json:"started_at,omitempty"`
// CompletedAt is when the job finished (success or failure).
CompletedAt time.Time `json:"completed_at,omitempty"`
// Error contains any error message if failed.
Error string `json:"error,omitempty"`
// RetryCount is the number of retry attempts.
RetryCount int `json:"retry_count"`
// MaxRetries is the maximum number of retries allowed.
MaxRetries int `json:"max_retries"`
// RetryDelay is the delay before next retry.
RetryDelay time.Duration `json:"retry_delay,omitempty"`
// Metadata contains additional context for the job.
Metadata map[string]interface{} `json:"metadata,omitempty"`
// Result contains the result of the spawn operation.
Result *SpawnResult `json:"result,omitempty"`
// BatchID groups related jobs for fairness tracking.
BatchID string `json:"batch_id,omitempty"`
// ParentJobID is the ID of the parent job if this is a sub-job.
ParentJobID string `json:"parent_job_id,omitempty"`
// Callback is called when the job completes (success or failure).
Callback func(*SpawnJob) `json:"-"`
// contains filtered or unexported fields
}
SpawnJob represents a single spawn operation in the queue.
func NewSpawnJob ¶
NewSpawnJob creates a new spawn job with sensible defaults.
func (*SpawnJob) ExecutionDuration ¶
ExecutionDuration returns how long the job took to execute.
func (*SpawnJob) IncrementRetry ¶
func (j *SpawnJob) IncrementRetry()
IncrementRetry increments the retry count and updates status.
func (*SpawnJob) IsCancelled ¶
IsCancelled returns true if the job was cancelled.
func (*SpawnJob) IsTerminal ¶
IsTerminal returns true if the job is in a terminal state.
func (*SpawnJob) QueueDuration ¶
QueueDuration returns how long the job has been in the queue.
func (*SpawnJob) TotalDuration ¶
TotalDuration returns the total time from creation to completion.
type SpawnResult ¶
type SpawnResult struct {
// SessionName is the created/target session.
SessionName string `json:"session_name"`
// PaneID is the created pane ID.
PaneID string `json:"pane_id,omitempty"`
// PaneIndex is the index of the pane.
PaneIndex int `json:"pane_index,omitempty"`
// AgentType is the launched agent type.
AgentType string `json:"agent_type,omitempty"`
// Title is the pane title.
Title string `json:"title,omitempty"`
// Duration is how long the spawn took.
Duration time.Duration `json:"duration"`
}
SpawnResult contains the result of a successful spawn operation.
type Stats ¶
type Stats struct {
// TotalSubmitted is jobs submitted to scheduler.
TotalSubmitted int64 `json:"total_submitted"`
// TotalCompleted is jobs that completed successfully.
TotalCompleted int64 `json:"total_completed"`
// TotalFailed is jobs that failed after all retries.
TotalFailed int64 `json:"total_failed"`
// TotalRetried is the total number of retry attempts.
TotalRetried int64 `json:"total_retried"`
// CurrentQueueSize is the current queue size.
CurrentQueueSize int `json:"current_queue_size"`
// CurrentRunning is the number of currently running jobs.
CurrentRunning int `json:"current_running"`
// AvgQueueTime is the average time jobs spend in queue.
AvgQueueTime time.Duration `json:"avg_queue_time"`
// AvgExecutionTime is the average job execution time.
AvgExecutionTime time.Duration `json:"avg_execution_time"`
// IsPaused indicates if the scheduler is paused.
IsPaused bool `json:"is_paused"`
// StartedAt is when the scheduler started.
StartedAt time.Time `json:"started_at"`
// Uptime is how long the scheduler has been running.
Uptime time.Duration `json:"uptime"`
// LimiterStats contains rate limiter statistics.
LimiterStats LimiterStats `json:"limiter_stats"`
// QueueStats contains queue statistics.
QueueStats QueueStats `json:"queue_stats"`
// BackoffStats contains backoff statistics.
BackoffStats BackoffStats `json:"backoff_stats"`
// CapsStats contains per-agent concurrency cap statistics.
CapsStats CapsStats `json:"caps_stats"`
// InGlobalBackoff indicates if global backoff is active.
InGlobalBackoff bool `json:"in_global_backoff"`
// RemainingBackoff is the remaining backoff duration if in global backoff.
RemainingBackoff time.Duration `json:"remaining_backoff,omitempty"`
// HeadroomStatus contains resource headroom status.
HeadroomStatus HeadroomStatus `json:"headroom_status"`
}
Stats contains scheduler statistics.