scheduler

package
v1.7.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 2, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package scheduler provides a global spawn scheduler with paced pane/agent creation. It serializes and paces all pane + agent creation to prevent resource exhaustion and rate limit errors.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CalculateJitteredDelay

func CalculateJitteredDelay(base time.Duration, jitterFactor float64) time.Duration

CalculateJitteredDelay calculates a delay with jitter for a given base. Useful for external callers that need jittered timing.

func ExponentialBackoff

func ExponentialBackoff(attempt int, initial, max time.Duration, multiplier float64) time.Duration

ExponentialBackoff returns the delay for the nth retry attempt.

func FormatETA

func FormatETA(d time.Duration) string

FormatETA formats duration as human-readable ETA.

func SetGlobal

func SetGlobal(s *Scheduler)

SetGlobal sets the global scheduler instance.

Types

type AgentCapConfig

type AgentCapConfig struct {
	// MaxConcurrent is the maximum number of concurrent instances.
	MaxConcurrent int `json:"max_concurrent"`

	// RampUpEnabled enables gradual capacity increase over time.
	RampUpEnabled bool `json:"ramp_up_enabled"`

	// RampUpInitial is the starting cap when ramp-up is enabled.
	RampUpInitial int `json:"ramp_up_initial"`

	// RampUpStep is how much to increase the cap each interval.
	RampUpStep int `json:"ramp_up_step"`

	// RampUpInterval is how often to increase the cap.
	RampUpInterval time.Duration `json:"ramp_up_interval"`

	// CooldownOnFailure reduces the cap on failure.
	CooldownOnFailure bool `json:"cooldown_on_failure"`

	// CooldownReduction is how much to reduce on failure.
	CooldownReduction int `json:"cooldown_reduction"`

	// CooldownRecovery is how long before restoring cap after cooldown.
	CooldownRecovery time.Duration `json:"cooldown_recovery"`
}

AgentCapConfig configures concurrency caps for a specific agent type.

func CodexCapConfig

func CodexCapConfig() AgentCapConfig

CodexCapConfig returns Codex-specific conservative defaults.

func DefaultAgentCapConfig

func DefaultAgentCapConfig() AgentCapConfig

DefaultAgentCapConfig returns sensible defaults for agent caps.

type AgentCapStats

type AgentCapStats struct {
	Running    int  `json:"running"`     // Currently running instances
	CurrentCap int  `json:"current_cap"` // Current effective cap
	MaxCap     int  `json:"max_cap"`     // Configured max cap
	Waiting    int  `json:"waiting"`     // Waiting for capacity
	InRampUp   bool `json:"in_ramp_up"`  // Currently ramping up
	InCooldown bool `json:"in_cooldown"` // Currently in cooldown
}

AgentCapStats contains statistics for one agent type.

type AgentCaps

type AgentCaps struct {
	// contains filtered or unexported fields
}

AgentCaps manages per-agent concurrency caps with ramp-up and cooldown.

func NewAgentCaps

func NewAgentCaps(cfg AgentCapsConfig) *AgentCaps

NewAgentCaps creates a new agent caps manager.

func (*AgentCaps) Acquire

func (ac *AgentCaps) Acquire(ctx context.Context, agentType string) error

Acquire blocks until a slot is available or context is cancelled.

func (*AgentCaps) ForceRampUp

func (ac *AgentCaps) ForceRampUp(agentType string)

ForceRampUp immediately increases cap to max for an agent type.

func (*AgentCaps) GetAvailable

func (ac *AgentCaps) GetAvailable(agentType string) int

GetAvailable returns available slots for an agent type.

func (*AgentCaps) GetCurrentCap

func (ac *AgentCaps) GetCurrentCap(agentType string) int

GetCurrentCap returns the current effective cap for an agent type.

func (*AgentCaps) GetRunning

func (ac *AgentCaps) GetRunning(agentType string) int

GetRunning returns the current running count for an agent type.

func (*AgentCaps) RecordFailure

func (ac *AgentCaps) RecordFailure(agentType string)

RecordFailure records a failure for an agent type, potentially triggering cooldown.

func (*AgentCaps) RecordSuccess

func (ac *AgentCaps) RecordSuccess(agentType string)

RecordSuccess records a successful spawn for an agent type.

func (*AgentCaps) Release

func (ac *AgentCaps) Release(agentType string)

Release releases a slot for an agent type.

func (*AgentCaps) Reset

func (ac *AgentCaps) Reset()

Reset resets all cap states to initial values.

func (*AgentCaps) SetCap

func (ac *AgentCaps) SetCap(agentType string, cap int)

SetCap dynamically updates the cap for an agent type.

func (*AgentCaps) Stats

func (ac *AgentCaps) Stats() CapsStats

Stats returns cap statistics.

func (*AgentCaps) TryAcquire

func (ac *AgentCaps) TryAcquire(agentType string) bool

TryAcquire tries to acquire a slot without blocking. Returns true if acquired, false if at capacity.

type AgentCapsConfig

type AgentCapsConfig struct {
	// Default is the default cap config for unknown agent types.
	Default AgentCapConfig `json:"default"`

	// PerAgent contains per-agent-type overrides.
	PerAgent map[string]AgentCapConfig `json:"per_agent,omitempty"`

	// GlobalMax is the absolute maximum across all agents (0 = no limit).
	GlobalMax int `json:"global_max"`
}

AgentCapsConfig contains configuration for all agent type caps.

func DefaultAgentCapsConfig

func DefaultAgentCapsConfig() AgentCapsConfig

DefaultAgentCapsConfig returns sensible defaults for agent caps.

type AgentLimiterConfig

type AgentLimiterConfig struct {
	// Default is the default limiter config for unknown agent types.
	Default LimiterConfig `json:"default"`

	// PerAgent contains per-agent-type overrides.
	PerAgent map[string]LimiterConfig `json:"per_agent,omitempty"`
}

AgentLimiterConfig contains configuration for per-agent limiting.

func DefaultAgentLimiterConfig

func DefaultAgentLimiterConfig() AgentLimiterConfig

DefaultAgentLimiterConfig returns sensible defaults for agent rate limiting.

type BackoffConfig

type BackoffConfig struct {
	// InitialDelay is the starting delay for backoff.
	InitialDelay time.Duration `json:"initial_delay"`

	// MaxDelay is the maximum backoff delay.
	MaxDelay time.Duration `json:"max_delay"`

	// Multiplier is the factor by which delay increases.
	Multiplier float64 `json:"multiplier"`

	// JitterFactor is the random jitter as a fraction of delay (0.0-1.0).
	JitterFactor float64 `json:"jitter_factor"`

	// MaxRetries is the maximum number of retry attempts.
	MaxRetries int `json:"max_retries"`

	// PauseQueueOnBackoff pauses the scheduler queue during backoff.
	PauseQueueOnBackoff bool `json:"pause_queue_on_backoff"`

	// ConsecutiveFailuresThreshold triggers global backoff after N failures.
	ConsecutiveFailuresThreshold int `json:"consecutive_failures_threshold"`
}

BackoffConfig configures the exponential backoff behavior.

func DefaultBackoffConfig

func DefaultBackoffConfig() BackoffConfig

DefaultBackoffConfig returns sensible default configuration.

type BackoffController

type BackoffController struct {
	// contains filtered or unexported fields
}

BackoffController manages exponential backoff with jitter for resource errors.

func NewBackoffController

func NewBackoffController(cfg BackoffConfig) *BackoffController

NewBackoffController creates a new backoff controller.

func (*BackoffController) HandleError

func (bc *BackoffController) HandleError(job *SpawnJob, resErr *ResourceError) (bool, time.Duration)

HandleError processes a resource error and returns the backoff action. Returns (shouldRetry, delay) where delay is 0 if no backoff needed.

func (*BackoffController) IsInGlobalBackoff

func (bc *BackoffController) IsInGlobalBackoff() bool

IsInGlobalBackoff returns true if global backoff is active.

func (*BackoffController) RecordSuccess

func (bc *BackoffController) RecordSuccess()

RecordSuccess is the public method to record success.

func (*BackoffController) RemainingBackoff

func (bc *BackoffController) RemainingBackoff() time.Duration

RemainingBackoff returns remaining time in global backoff.

func (*BackoffController) Reset

func (bc *BackoffController) Reset()

Reset resets the backoff controller to initial state.

func (*BackoffController) SetHooks

func (bc *BackoffController) SetHooks(
	onStart func(time.Duration, ResourceErrorType),
	onEnd func(time.Duration),
	onExhausted func(*SpawnJob, int),
)

SetHooks sets event callbacks.

func (*BackoffController) SetScheduler

func (bc *BackoffController) SetScheduler(s *Scheduler)

SetScheduler sets the scheduler reference for queue pause/resume.

func (*BackoffController) Stats

func (bc *BackoffController) Stats() BackoffStats

Stats returns backoff statistics.

type BackoffStats

type BackoffStats struct {
	TotalBackoffs     int64             `json:"total_backoffs"`
	TotalRetries      int64             `json:"total_retries"`
	TotalExhausted    int64             `json:"total_exhausted"`
	MaxConsecutive    int               `json:"max_consecutive"`
	TotalBackoffTime  time.Duration     `json:"total_backoff_time"`
	LastBackoffReason ResourceErrorType `json:"last_backoff_reason,omitempty"`
	LastBackoffAt     time.Time         `json:"last_backoff_at,omitempty"`
}

BackoffStats contains backoff statistics.

type CapsStats

type CapsStats struct {
	// PerAgent contains per-agent statistics.
	PerAgent map[string]AgentCapStats `json:"per_agent"`

	// TotalRunning is the total running across all agents.
	TotalRunning int `json:"total_running"`

	// TotalWaiting is the total waiting across all agents.
	TotalWaiting int `json:"total_waiting"`
}

CapsStats contains cap statistics.

type Config

type Config struct {
	// MaxConcurrent is the maximum number of concurrent spawn operations.
	MaxConcurrent int `json:"max_concurrent"`

	// GlobalRateLimit is the global rate limiter configuration.
	GlobalRateLimit LimiterConfig `json:"global_rate_limit"`

	// AgentRateLimits is the per-agent rate limiter configuration.
	AgentRateLimits AgentLimiterConfig `json:"agent_rate_limits"`

	// AgentCaps is the per-agent concurrency caps configuration.
	AgentCaps AgentCapsConfig `json:"agent_caps"`

	// FairScheduler is the fair scheduler configuration.
	FairScheduler FairSchedulerConfig `json:"fair_scheduler"`

	// Backoff is the backoff configuration for resource errors.
	Backoff BackoffConfig `json:"backoff"`

	// MaxCompleted is the number of completed jobs to retain for status.
	MaxCompleted int `json:"max_completed"`

	// DefaultRetries is the default number of retries for failed jobs.
	DefaultRetries int `json:"default_retries"`

	// DefaultRetryDelay is the default delay between retries.
	DefaultRetryDelay time.Duration `json:"default_retry_delay"`

	// BackpressureThreshold is the queue size that triggers backpressure alerts.
	BackpressureThreshold int `json:"backpressure_threshold"`

	// Headroom is the pre-spawn resource headroom configuration.
	Headroom HeadroomConfig `json:"headroom"`
}

Config configures the scheduler.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns sensible default configuration.

type FairScheduler

type FairScheduler struct {
	// contains filtered or unexported fields
}

FairScheduler wraps JobQueue with fairness guarantees.

func NewFairScheduler

func NewFairScheduler(cfg FairSchedulerConfig) *FairScheduler

NewFairScheduler creates a new fair scheduler.

func (*FairScheduler) Enqueue

func (f *FairScheduler) Enqueue(job *SpawnJob)

Enqueue adds a job to the queue.

func (*FairScheduler) MarkComplete

func (f *FairScheduler) MarkComplete(job *SpawnJob)

MarkComplete marks a job as complete for fairness tracking.

func (*FairScheduler) Queue

func (f *FairScheduler) Queue() *JobQueue

Queue returns the underlying queue for direct access.

func (*FairScheduler) RunningCount

func (f *FairScheduler) RunningCount(sessionName string) int

RunningCount returns the number of running jobs for a session.

func (*FairScheduler) TryDequeue

func (f *FairScheduler) TryDequeue() *SpawnJob

TryDequeue returns the next job that can run without violating fairness. Returns nil if no eligible job is available.

type FairSchedulerConfig

type FairSchedulerConfig struct {
	MaxPerSession int `json:"max_per_session"`
	MaxPerBatch   int `json:"max_per_batch"`
}

FairSchedulerConfig configures the fair scheduler.

func DefaultFairSchedulerConfig

func DefaultFairSchedulerConfig() FairSchedulerConfig

DefaultFairSchedulerConfig returns sensible defaults.

type HeadroomConfig

type HeadroomConfig struct {
	// Enabled toggles headroom checking.
	Enabled bool `json:"enabled"`

	// Threshold is the usage percentage (0.0-1.0) above which spawns are blocked.
	// Default: 0.75 (75%)
	Threshold float64 `json:"threshold"`

	// WarnThreshold is the usage percentage (0.0-1.0) at which warnings are logged.
	// Default: 0.70 (70%)
	WarnThreshold float64 `json:"warn_threshold"`

	// RecheckInterval is how often to recheck resource headroom when blocked.
	// Default: 5s
	RecheckInterval time.Duration `json:"recheck_interval"`

	// MinHeadroom is the minimum number of free process slots required.
	// Default: 50
	MinHeadroom int `json:"min_headroom"`

	// CacheTimeout is how long to cache headroom measurements.
	// Default: 2s
	CacheTimeout time.Duration `json:"cache_timeout"`
}

HeadroomConfig configures pre-spawn resource headroom guardrails.

func DefaultHeadroomConfig

func DefaultHeadroomConfig() HeadroomConfig

DefaultHeadroomConfig returns sensible default configuration.

type HeadroomGuard

type HeadroomGuard struct {
	// contains filtered or unexported fields
}

HeadroomGuard checks system resource headroom before allowing spawns.

func NewHeadroomGuard

func NewHeadroomGuard(cfg HeadroomConfig) *HeadroomGuard

NewHeadroomGuard creates a new headroom guard.

func (*HeadroomGuard) BlockReason

func (h *HeadroomGuard) BlockReason() string

BlockReason returns the current block reason.

func (*HeadroomGuard) CheckHeadroom

func (h *HeadroomGuard) CheckHeadroom() (bool, string)

CheckHeadroom checks if there is sufficient resource headroom for spawning. Returns true if spawning is allowed, false otherwise.

func (*HeadroomGuard) IsBlocked

func (h *HeadroomGuard) IsBlocked() bool

IsBlocked returns whether spawns are currently blocked.

func (*HeadroomGuard) Remediation

func (h *HeadroomGuard) Remediation() string

Remediation returns guidance for resolving resource constraints.

func (*HeadroomGuard) SetCallbacks

func (h *HeadroomGuard) SetCallbacks(
	onBlocked func(reason string, limits *ResourceLimits, usage *ResourceUsage),
	onUnblocked func(),
	onWarning func(reason string, limits *ResourceLimits, usage *ResourceUsage),
)

SetCallbacks sets the callbacks for state changes.

func (*HeadroomGuard) Status

func (h *HeadroomGuard) Status() HeadroomStatus

Status returns the current headroom status.

func (*HeadroomGuard) Stop

func (h *HeadroomGuard) Stop()

Stop stops the headroom guard and cleans up resources.

type HeadroomStatus

type HeadroomStatus struct {
	// Available is the number of free process slots.
	Available int `json:"available"`

	// UsagePercent is the current usage as a percentage (0.0-1.0).
	UsagePercent float64 `json:"usage_percent"`

	// Blocked indicates if spawns are blocked.
	Blocked bool `json:"blocked"`

	// BlockReason is the reason for blocking (empty if not blocked).
	BlockReason string `json:"block_reason,omitempty"`

	// Limits contains the detected resource limits.
	Limits *ResourceLimits `json:"limits"`

	// Usage contains the current resource usage.
	Usage *ResourceUsage `json:"usage"`

	// LastCheck is when headroom was last checked.
	LastCheck time.Time `json:"last_check"`
}

HeadroomStatus represents the current headroom status.

type Hooks

type Hooks struct {
	// OnJobEnqueued is called when a job is added to the queue.
	OnJobEnqueued func(job *SpawnJob)

	// OnJobStarted is called when a job starts executing.
	OnJobStarted func(job *SpawnJob)

	// OnJobCompleted is called when a job completes successfully.
	OnJobCompleted func(job *SpawnJob)

	// OnJobFailed is called when a job fails.
	OnJobFailed func(job *SpawnJob, err error)

	// OnJobRetrying is called when a job is about to retry.
	OnJobRetrying func(job *SpawnJob, attempt int)

	// OnBackpressure is called when the queue is experiencing backpressure.
	OnBackpressure func(queueSize int, waitTime time.Duration)

	// OnGuardrailTriggered is called when a guardrail blocks a spawn.
	OnGuardrailTriggered func(job *SpawnJob, reason string)
}

Hooks contains callbacks for job lifecycle events.

func CreateProgressHooks

func CreateProgressHooks(broadcaster *ProgressBroadcaster, scheduler *Scheduler) Hooks

CreateProgressHooks creates scheduler hooks that broadcast progress events.

type JobPriority

type JobPriority int

JobPriority determines processing order within the queue.

const (
	PriorityUrgent JobPriority = 0 // Respawns, recovery
	PriorityHigh   JobPriority = 1 // User-initiated spawns
	PriorityNormal JobPriority = 2 // Regular batch spawns
	PriorityLow    JobPriority = 3 // Background/deferred spawns
)

type JobProgress

type JobProgress struct {
	// ID is the job ID.
	ID string `json:"id"`

	// Type is the job type.
	Type JobType `json:"type"`

	// Status is the current status.
	Status JobStatus `json:"status"`

	// SessionName is the target session.
	SessionName string `json:"session_name"`

	// AgentType is the agent type if applicable.
	AgentType string `json:"agent_type,omitempty"`

	// PaneIndex is the pane index if applicable.
	PaneIndex int `json:"pane_index,omitempty"`

	// Priority is the job priority.
	Priority JobPriority `json:"priority"`

	// QueuedFor is how long the job has been queued.
	QueuedFor time.Duration `json:"queued_for,omitempty"`

	// RunningFor is how long the job has been running.
	RunningFor time.Duration `json:"running_for,omitempty"`

	// Error is any error message.
	Error string `json:"error,omitempty"`

	// RetryCount is the current retry count.
	RetryCount int `json:"retry_count,omitempty"`

	// EstimatedETASeconds is estimated time until this job starts.
	EstimatedETASeconds float64 `json:"estimated_eta_seconds,omitempty"`
}

JobProgress represents a single job's progress for display.

type JobQueue

type JobQueue struct {
	// contains filtered or unexported fields
}

JobQueue is a priority queue for spawn jobs with fairness tracking.

func NewJobQueue

func NewJobQueue() *JobQueue

NewJobQueue creates a new job queue.

func (*JobQueue) CancelBatch

func (q *JobQueue) CancelBatch(batchID string) []*SpawnJob

CancelBatch cancels all jobs in a batch.

func (*JobQueue) CancelSession

func (q *JobQueue) CancelSession(sessionName string) []*SpawnJob

CancelSession cancels all jobs for a session.

func (*JobQueue) Clear

func (q *JobQueue) Clear() []*SpawnJob

Clear removes all jobs from the queue.

func (*JobQueue) CountByBatch

func (q *JobQueue) CountByBatch(batchID string) int

CountByBatch returns the number of jobs in a batch.

func (*JobQueue) CountBySession

func (q *JobQueue) CountBySession(sessionName string) int

CountBySession returns the number of jobs for a session.

func (*JobQueue) Dequeue

func (q *JobQueue) Dequeue() *SpawnJob

Dequeue removes and returns the highest priority job. Returns nil if the queue is empty.

func (*JobQueue) Enqueue

func (q *JobQueue) Enqueue(job *SpawnJob)

Enqueue adds a job to the queue.

func (*JobQueue) Get

func (q *JobQueue) Get(id string) *SpawnJob

Get returns a job by ID without removing it.

func (*JobQueue) IsEmpty

func (q *JobQueue) IsEmpty() bool

IsEmpty returns true if the queue is empty.

func (*JobQueue) Len

func (q *JobQueue) Len() int

Len returns the number of jobs in the queue.

func (*JobQueue) ListAll

func (q *JobQueue) ListAll() []*SpawnJob

ListAll returns all jobs in priority order.

func (*JobQueue) ListByBatch

func (q *JobQueue) ListByBatch(batchID string) []*SpawnJob

ListByBatch returns jobs for a specific batch.

func (*JobQueue) ListBySession

func (q *JobQueue) ListBySession(sessionName string) []*SpawnJob

ListBySession returns jobs for a specific session.

func (*JobQueue) Peek

func (q *JobQueue) Peek() *SpawnJob

Peek returns the highest priority job without removing it.

func (*JobQueue) Remove

func (q *JobQueue) Remove(id string) *SpawnJob

Remove removes a job by ID.

func (*JobQueue) Stats

func (q *JobQueue) Stats() QueueStats

Stats returns a copy of queue statistics.

type JobStatus

type JobStatus string

JobStatus represents the current state of a spawn job.

const (
	StatusPending   JobStatus = "pending"   // Waiting in queue
	StatusScheduled JobStatus = "scheduled" // Rate limiter approved, waiting for execution slot
	StatusRunning   JobStatus = "running"   // Currently executing
	StatusCompleted JobStatus = "completed" // Successfully finished
	StatusFailed    JobStatus = "failed"    // Failed with error
	StatusCancelled JobStatus = "cancelled" // Cancelled by user/system
	StatusRetrying  JobStatus = "retrying"  // Failed but will retry
)

type JobType

type JobType string

JobType represents the type of spawn operation.

const (
	JobTypeSession     JobType = "session"      // Create a new tmux session
	JobTypePaneSplit   JobType = "pane_split"   // Split an existing pane
	JobTypeAgentLaunch JobType = "agent_launch" // Launch an agent in a pane
)

type LimiterConfig

type LimiterConfig struct {
	// Rate is the number of tokens added per second.
	Rate float64 `json:"rate"`

	// Capacity is the maximum number of tokens (burst size).
	Capacity float64 `json:"capacity"`

	// BurstAllowed enables burst mode.
	BurstAllowed bool `json:"burst_allowed"`

	// MinInterval is the minimum time between operations.
	MinInterval time.Duration `json:"min_interval"`
}

LimiterConfig configures the rate limiter.

func DefaultLimiterConfig

func DefaultLimiterConfig() LimiterConfig

DefaultLimiterConfig returns sensible default configuration.

type LimiterStats

type LimiterStats struct {
	// TotalRequests is the total number of Wait calls.
	TotalRequests int64 `json:"total_requests"`

	// AllowedRequests is requests that were allowed immediately.
	AllowedRequests int64 `json:"allowed_requests"`

	// WaitedRequests is requests that had to wait.
	WaitedRequests int64 `json:"waited_requests"`

	// DeniedRequests is requests denied due to timeout/cancellation.
	DeniedRequests int64 `json:"denied_requests"`

	// TotalWaitTime is the cumulative wait time.
	TotalWaitTime time.Duration `json:"total_wait_time"`

	// MaxWaitTime is the longest wait time recorded.
	MaxWaitTime time.Duration `json:"max_wait_time"`

	// AvgWaitTime is the average wait time.
	AvgWaitTime time.Duration `json:"avg_wait_time"`

	// CurrentTokens is the current token count.
	CurrentTokens float64 `json:"current_tokens"`

	// Waiting is the number of requests currently waiting.
	Waiting int `json:"waiting"`
}

LimiterStats contains rate limiter statistics.

type PerAgentLimiter

type PerAgentLimiter struct {
	// contains filtered or unexported fields
}

PerAgentLimiter provides per-agent-type rate limiting.

func NewPerAgentLimiter

func NewPerAgentLimiter(cfg AgentLimiterConfig) *PerAgentLimiter

NewPerAgentLimiter creates a new per-agent rate limiter.

func (*PerAgentLimiter) AllStats

func (p *PerAgentLimiter) AllStats() map[string]LimiterStats

AllStats returns statistics for all agent limiters.

func (*PerAgentLimiter) GetLimiter

func (p *PerAgentLimiter) GetLimiter(agentType string) *RateLimiter

GetLimiter returns the rate limiter for an agent type.

func (*PerAgentLimiter) Wait

func (p *PerAgentLimiter) Wait(ctx context.Context, agentType string) error

Wait waits for a token from the agent-specific limiter.

type Progress

type Progress struct {
	// Timestamp is when this progress snapshot was taken.
	Timestamp time.Time `json:"timestamp"`

	// Status is the overall scheduler status.
	Status string `json:"status"` // running, paused, stopped

	// QueuedCount is the number of jobs waiting in queue.
	QueuedCount int `json:"queued_count"`

	// RunningCount is the number of jobs currently executing.
	RunningCount int `json:"running_count"`

	// CompletedCount is the number of completed jobs (session lifetime).
	CompletedCount int `json:"completed_count"`

	// FailedCount is the number of failed jobs.
	FailedCount int `json:"failed_count"`

	// EstimatedETASeconds is the estimated time until queue is empty.
	EstimatedETASeconds float64 `json:"estimated_eta_seconds,omitempty"`

	// RateLimitInfo contains current rate limiter state.
	RateLimitInfo RateLimitInfo `json:"rate_limit_info"`

	// Queued contains details of queued jobs.
	Queued []JobProgress `json:"queued,omitempty"`

	// Running contains details of running jobs.
	Running []JobProgress `json:"running,omitempty"`

	// RecentCompleted contains recently completed jobs.
	RecentCompleted []JobProgress `json:"recent_completed,omitempty"`

	// BySession contains progress grouped by session.
	BySession map[string]*SessionProgress `json:"by_session,omitempty"`
}

Progress represents the current spawn progress for TUI/robot output.

func (*Progress) JSON

func (p *Progress) JSON() ([]byte, error)

JSON returns the progress as JSON.

type ProgressBroadcaster

type ProgressBroadcaster struct {
	// contains filtered or unexported fields
}

ProgressBroadcaster broadcasts progress events to subscribers.

func NewProgressBroadcaster

func NewProgressBroadcaster() *ProgressBroadcaster

NewProgressBroadcaster creates a new broadcaster.

func (*ProgressBroadcaster) Broadcast

func (b *ProgressBroadcaster) Broadcast(event ProgressEvent)

Broadcast sends an event to all subscribers.

func (*ProgressBroadcaster) Subscribe

func (b *ProgressBroadcaster) Subscribe(sub ProgressSubscriber)

Subscribe adds a subscriber.

type ProgressEvent

type ProgressEvent struct {
	// Type is the event type.
	Type string `json:"type"` // job_enqueued, job_started, job_completed, job_failed, progress_update

	// JobID is the job ID if applicable.
	JobID string `json:"job_id,omitempty"`

	// SessionName is the session name if applicable.
	SessionName string `json:"session_name,omitempty"`

	// Progress is the current progress snapshot.
	Progress *Progress `json:"progress,omitempty"`

	// Message is a human-readable message.
	Message string `json:"message,omitempty"`

	// Timestamp is when this event occurred.
	Timestamp time.Time `json:"timestamp"`
}

ProgressEvent is emitted for progress updates.

type ProgressSubscriber

type ProgressSubscriber func(event ProgressEvent)

ProgressSubscriber receives progress events.

type QueueStats

type QueueStats struct {
	// TotalEnqueued is the total number of jobs ever enqueued.
	TotalEnqueued int64 `json:"total_enqueued"`

	// TotalDequeued is the total number of jobs ever dequeued.
	TotalDequeued int64 `json:"total_dequeued"`

	// CurrentSize is the current queue size.
	CurrentSize int `json:"current_size"`

	// MaxSize is the maximum queue size ever observed.
	MaxSize int `json:"max_size"`

	// ByPriority is the count per priority level.
	ByPriority map[JobPriority]int `json:"by_priority"`

	// ByType is the count per job type.
	ByType map[JobType]int `json:"by_type"`

	// AvgWaitTime is the average time jobs spend in the queue.
	AvgWaitTime time.Duration `json:"avg_wait_time"`

	// MaxWaitTime is the maximum time a job spent in the queue.
	MaxWaitTime time.Duration `json:"max_wait_time"`
	// contains filtered or unexported fields
}

QueueStats contains queue statistics.

type RateLimitInfo

type RateLimitInfo struct {
	// AvailableTokens is the current token count.
	AvailableTokens float64 `json:"available_tokens"`

	// Rate is tokens per second.
	Rate float64 `json:"rate"`

	// Capacity is the maximum tokens.
	Capacity float64 `json:"capacity"`

	// WaitingCount is requests waiting for tokens.
	WaitingCount int `json:"waiting_count"`

	// NextTokenInMs is milliseconds until next token.
	NextTokenInMs int64 `json:"next_token_in_ms"`
}

RateLimitInfo contains rate limiter state for display.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements a token bucket rate limiter for spawn operations. It controls the rate at which spawn jobs can be executed to prevent resource exhaustion and API rate limits.

func NewRateLimiter

func NewRateLimiter(cfg LimiterConfig) *RateLimiter

NewRateLimiter creates a new rate limiter with the given configuration.

func (*RateLimiter) AvailableTokens

func (r *RateLimiter) AvailableTokens() float64

AvailableTokens returns the current number of available tokens.

func (*RateLimiter) Reset

func (r *RateLimiter) Reset()

Reset resets the limiter to initial state.

func (*RateLimiter) SetCapacity

func (r *RateLimiter) SetCapacity(capacity float64)

SetCapacity updates the maximum bucket capacity.

func (*RateLimiter) SetMinInterval

func (r *RateLimiter) SetMinInterval(interval time.Duration)

SetMinInterval updates the minimum interval between operations.

func (*RateLimiter) SetRate

func (r *RateLimiter) SetRate(rate float64)

SetRate updates the token refill rate.

func (*RateLimiter) Stats

func (r *RateLimiter) Stats() LimiterStats

Stats returns a copy of the current statistics.

func (*RateLimiter) TimeUntilNextToken

func (r *RateLimiter) TimeUntilNextToken() time.Duration

TimeUntilNextToken returns the estimated time until a token is available.

func (*RateLimiter) TryAcquire

func (r *RateLimiter) TryAcquire() bool

TryAcquire tries to acquire a token without blocking. Returns true if a token was acquired, false otherwise.

func (*RateLimiter) Wait

func (r *RateLimiter) Wait(ctx context.Context) error

Wait blocks until a token is available or context is cancelled. Returns nil if a token was acquired, or an error if cancelled/timed out.

func (*RateLimiter) Waiting

func (r *RateLimiter) Waiting() int

Waiting returns the number of requests currently waiting.

type ResourceError

type ResourceError struct {
	Original   error
	Type       ResourceErrorType
	ExitCode   int
	StderrHint string
	Retryable  bool
	Timestamp  time.Time
}

ResourceError wraps an error with resource exhaustion classification.

func ClassifyError

func ClassifyError(err error, exitCode int, stderr string) *ResourceError

ClassifyError examines an error and classifies it as a resource exhaustion error.

func (*ResourceError) Error

func (e *ResourceError) Error() string

func (*ResourceError) Unwrap

func (e *ResourceError) Unwrap() error

type ResourceErrorType

type ResourceErrorType string

ResourceErrorType classifies the type of resource exhaustion error.

const (
	ResourceErrorNone      ResourceErrorType = ""
	ResourceErrorEAGAIN    ResourceErrorType = "EAGAIN"
	ResourceErrorENOMEM    ResourceErrorType = "ENOMEM"
	ResourceErrorENFILE    ResourceErrorType = "ENFILE"
	ResourceErrorEMFILE    ResourceErrorType = "EMFILE"
	ResourceErrorRateLimit ResourceErrorType = "RATE_LIMIT"
)

type ResourceLimits

type ResourceLimits struct {
	// UlimitNproc is the user's process limit from ulimit -u.
	UlimitNproc int `json:"ulimit_nproc"`

	// CgroupPidsMax is the cgroup v2 pids.max limit (0 if not available).
	CgroupPidsMax int `json:"cgroup_pids_max"`

	// SystemdTasksMax is the systemd TasksMax limit (0 if not available).
	SystemdTasksMax int `json:"systemd_tasks_max"`

	// KernelPidMax is the kernel's pid_max from /proc/sys/kernel/pid_max.
	KernelPidMax int `json:"kernel_pid_max"`

	// EffectiveLimit is min(all non-zero limits).
	EffectiveLimit int `json:"effective_limit"`

	// Source indicates which limit is the effective one.
	Source string `json:"source"`
}

ResourceLimits holds the effective resource limits from various sources.

type ResourceUsage

type ResourceUsage struct {
	// CgroupPidsCurrent is the cgroup v2 pids.current (0 if not available).
	CgroupPidsCurrent int `json:"cgroup_pids_current"`

	// ProcessCount is the number of processes owned by the current user.
	ProcessCount int `json:"process_count"`

	// EffectiveUsage is the best estimate of current usage.
	EffectiveUsage int `json:"effective_usage"`

	// Source indicates which usage metric is being used.
	Source string `json:"source"`
}

ResourceUsage holds the current resource usage.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is the global spawn scheduler that serializes and paces all pane and agent creation operations.

func Global

func Global() *Scheduler

Global returns the global scheduler instance, creating it if necessary.

func New

func New(cfg Config) *Scheduler

New creates a new scheduler with the given configuration.

func (*Scheduler) Cancel

func (s *Scheduler) Cancel(jobID string) bool

Cancel cancels a job by ID.

func (*Scheduler) CancelBatch

func (s *Scheduler) CancelBatch(batchID string) int

CancelBatch cancels all jobs in a batch.

func (*Scheduler) CancelSession

func (s *Scheduler) CancelSession(sessionName string) int

CancelSession cancels all jobs for a session.

func (*Scheduler) EstimateETA

func (s *Scheduler) EstimateETA(jobID string) (time.Duration, error)

EstimateETA estimates how long until a queued job will start, returned as a time.Duration.

func (*Scheduler) GetJob

func (s *Scheduler) GetJob(jobID string) *SpawnJob

GetJob returns a job by ID.

func (*Scheduler) GetProgress

func (s *Scheduler) GetProgress() *Progress

GetProgress returns the current spawn progress.

func (*Scheduler) GetQueuedJobs

func (s *Scheduler) GetQueuedJobs() []*SpawnJob

GetQueuedJobs returns all queued jobs.

func (*Scheduler) GetRecentCompleted

func (s *Scheduler) GetRecentCompleted(limit int) []*SpawnJob

GetRecentCompleted returns recently completed jobs.

func (*Scheduler) GetRunningJobs

func (s *Scheduler) GetRunningJobs() []*SpawnJob

GetRunningJobs returns all currently running jobs.

func (*Scheduler) IsPaused

func (s *Scheduler) IsPaused() bool

IsPaused returns true if the scheduler is paused.

func (*Scheduler) Pause

func (s *Scheduler) Pause()

Pause pauses job processing.

func (*Scheduler) Resume

func (s *Scheduler) Resume()

Resume resumes job processing.

func (*Scheduler) SetExecutor

func (s *Scheduler) SetExecutor(executor SpawnExecutor)

SetExecutor sets the function that executes spawn jobs.

func (*Scheduler) SetHooks

func (s *Scheduler) SetHooks(hooks Hooks)

SetHooks sets the lifecycle hooks.

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start starts the scheduler workers.

func (*Scheduler) Stats

func (s *Scheduler) Stats() Stats

Stats returns scheduler statistics.

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop gracefully stops the scheduler.

func (*Scheduler) Submit

func (s *Scheduler) Submit(job *SpawnJob) error

Submit submits a new spawn job to the scheduler.

func (*Scheduler) SubmitBatch

func (s *Scheduler) SubmitBatch(jobs []*SpawnJob) (string, error)

SubmitBatch submits multiple jobs as a batch.

type SessionProgress

type SessionProgress struct {
	// SessionName is the session name.
	SessionName string `json:"session_name"`

	// QueuedCount is queued jobs for this session.
	QueuedCount int `json:"queued_count"`

	// RunningCount is running jobs for this session.
	RunningCount int `json:"running_count"`

	// CompletedCount is completed jobs for this session.
	CompletedCount int `json:"completed_count"`

	// FailedCount is failed jobs for this session.
	FailedCount int `json:"failed_count"`

	// TotalPanes is the total panes being spawned.
	TotalPanes int `json:"total_panes"`

	// PanesReady is panes that are ready.
	PanesReady int `json:"panes_ready"`

	// ProgressPercent is the completion percentage.
	ProgressPercent float64 `json:"progress_percent"`
}

SessionProgress groups progress by session.

type SpawnExecutor

type SpawnExecutor func(ctx context.Context, job *SpawnJob) error

SpawnExecutor is a function that executes a spawn job.

type SpawnJob

type SpawnJob struct {
	// ID is a unique identifier for this job.
	ID string `json:"id"`

	// Type is the kind of spawn operation.
	Type JobType `json:"type"`

	// Priority determines processing order.
	Priority JobPriority `json:"priority"`

	// SessionName is the target session.
	SessionName string `json:"session_name"`

	// AgentType is the type of agent (cc, cod, gmi) if applicable.
	AgentType string `json:"agent_type,omitempty"`

	// PaneIndex is the target pane index if applicable.
	PaneIndex int `json:"pane_index,omitempty"`

	// Directory is the working directory for the spawn.
	Directory string `json:"directory,omitempty"`

	// Status is the current job status.
	Status JobStatus `json:"status"`

	// CreatedAt is when the job was created.
	CreatedAt time.Time `json:"created_at"`

	// ScheduledAt is when the job was approved by rate limiter.
	ScheduledAt time.Time `json:"scheduled_at,omitempty"`

	// StartedAt is when execution began.
	StartedAt time.Time `json:"started_at,omitempty"`

	// CompletedAt is when the job finished (success or failure).
	CompletedAt time.Time `json:"completed_at,omitempty"`

	// Error contains any error message if failed.
	Error string `json:"error,omitempty"`

	// RetryCount is the number of retry attempts.
	RetryCount int `json:"retry_count"`

	// MaxRetries is the maximum number of retries allowed.
	MaxRetries int `json:"max_retries"`

	// RetryDelay is the delay before next retry.
	RetryDelay time.Duration `json:"retry_delay,omitempty"`

	// Metadata contains additional context for the job.
	Metadata map[string]interface{} `json:"metadata,omitempty"`

	// Result contains the result of the spawn operation.
	Result *SpawnResult `json:"result,omitempty"`

	// BatchID groups related jobs for fairness tracking.
	BatchID string `json:"batch_id,omitempty"`

	// ParentJobID is the ID of the parent job if this is a sub-job.
	ParentJobID string `json:"parent_job_id,omitempty"`

	// Callback is called when the job completes (success or failure).
	Callback func(*SpawnJob) `json:"-"`
	// contains filtered or unexported fields
}

SpawnJob represents a single spawn operation in the queue.

func NewSpawnJob

func NewSpawnJob(id string, jobType JobType, sessionName string) *SpawnJob

NewSpawnJob creates a new spawn job with sensible defaults.

func (*SpawnJob) CanRetry

func (j *SpawnJob) CanRetry() bool

CanRetry returns true if the job can be retried.

func (*SpawnJob) Cancel

func (j *SpawnJob) Cancel()

Cancel cancels this job.

func (*SpawnJob) Clone

func (j *SpawnJob) Clone() *SpawnJob

Clone creates a copy of the job for reporting (without callbacks/context).

func (*SpawnJob) Context

func (j *SpawnJob) Context() context.Context

Context returns the job's context.

func (*SpawnJob) ExecutionDuration

func (j *SpawnJob) ExecutionDuration() time.Duration

ExecutionDuration returns how long the job took to execute.

func (*SpawnJob) GetStatus

func (j *SpawnJob) GetStatus() JobStatus

GetStatus returns the current status.

func (*SpawnJob) IncrementRetry

func (j *SpawnJob) IncrementRetry()

IncrementRetry increments the retry count and updates status.

func (*SpawnJob) IsCancelled

func (j *SpawnJob) IsCancelled() bool

IsCancelled returns true if the job was cancelled.

func (*SpawnJob) IsTerminal

func (j *SpawnJob) IsTerminal() bool

IsTerminal returns true if the job is in a terminal state.

func (*SpawnJob) QueueDuration

func (j *SpawnJob) QueueDuration() time.Duration

QueueDuration returns how long the job has been in the queue.

func (*SpawnJob) SetError

func (j *SpawnJob) SetError(err error)

SetError sets an error on the job.

func (*SpawnJob) SetStatus

func (j *SpawnJob) SetStatus(status JobStatus)

SetStatus updates the job status with proper timestamps.

func (*SpawnJob) TotalDuration

func (j *SpawnJob) TotalDuration() time.Duration

TotalDuration returns the total time from creation to completion.

type SpawnResult

type SpawnResult struct {
	// SessionName is the created/target session.
	SessionName string `json:"session_name"`

	// PaneID is the created pane ID.
	PaneID string `json:"pane_id,omitempty"`

	// PaneIndex is the index of the pane.
	PaneIndex int `json:"pane_index,omitempty"`

	// AgentType is the launched agent type.
	AgentType string `json:"agent_type,omitempty"`

	// Title is the pane title.
	Title string `json:"title,omitempty"`

	// Duration is how long the spawn took.
	Duration time.Duration `json:"duration"`
}

SpawnResult contains the result of a successful spawn operation.

type Stats

type Stats struct {
	// TotalSubmitted is jobs submitted to scheduler.
	TotalSubmitted int64 `json:"total_submitted"`

	// TotalCompleted is jobs that completed successfully.
	TotalCompleted int64 `json:"total_completed"`

	// TotalFailed is jobs that failed after all retries.
	TotalFailed int64 `json:"total_failed"`

	// TotalRetried is the total number of retry attempts.
	TotalRetried int64 `json:"total_retried"`

	// CurrentQueueSize is the current queue size.
	CurrentQueueSize int `json:"current_queue_size"`

	// CurrentRunning is the number of currently running jobs.
	CurrentRunning int `json:"current_running"`

	// AvgQueueTime is the average time jobs spend in queue.
	AvgQueueTime time.Duration `json:"avg_queue_time"`

	// AvgExecutionTime is the average job execution time.
	AvgExecutionTime time.Duration `json:"avg_execution_time"`

	// IsPaused indicates if the scheduler is paused.
	IsPaused bool `json:"is_paused"`

	// StartedAt is when the scheduler started.
	StartedAt time.Time `json:"started_at"`

	// Uptime is how long the scheduler has been running.
	Uptime time.Duration `json:"uptime"`

	// LimiterStats contains rate limiter statistics.
	LimiterStats LimiterStats `json:"limiter_stats"`

	// QueueStats contains queue statistics.
	QueueStats QueueStats `json:"queue_stats"`

	// BackoffStats contains backoff statistics.
	BackoffStats BackoffStats `json:"backoff_stats"`

	// CapsStats contains per-agent concurrency cap statistics.
	CapsStats CapsStats `json:"caps_stats"`

	// InGlobalBackoff indicates if global backoff is active.
	InGlobalBackoff bool `json:"in_global_backoff"`

	// RemainingBackoff is the remaining backoff duration if in global backoff.
	RemainingBackoff time.Duration `json:"remaining_backoff,omitempty"`

	// HeadroomStatus contains resource headroom status.
	HeadroomStatus HeadroomStatus `json:"headroom_status"`
}

Stats contains scheduler statistics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL