job

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2025 License: MIT Imports: 27 Imported by: 0

README

go-job

go-job is a flexible job runner and scheduler written in Go that allows you to embed configuration metadata directly into your job files. By extracting configuration options from comments across different file types (YAML, shell, JavaScript, SQL, etc.), go-job makes it easy to define job behavior alongside your scripts.

Features

  • Multi-format Metadata Extraction: Supports extracting configuration from:
    • YAML Front Matter: Using the standard --- markers.
    • Shell Scripts: Metadata specified in comment lines using #.
    • SQL Scripts: Metadata specified using -- comments.
    • JavaScript:
      • Single-line Comments: e.g. // config ...
      • Block Comments: e.g.
        /** config
         * schedule: "0 12 * * *"
         * timeout: 300s
         * retries: 3
         * debug: true
         * run_once: true
         * script_type: shell
         * transaction: true
         * env:
         *  APP_NAME: "test"
         *  API_KEY: "my-secret-key"
         * metadata:
         *  key: value
         */
        
  • Multiple Execution Engines:
    • Shell Engine: Execute shell scripts with environment variables and timeout control
    • JavaScript Engine: Run JavaScript code with Node.js-like environment (uses goja)
    • SQL Engine: Execute SQL scripts with transaction support
  • Source Providers: Flexible system for loading script content from different sources
  • Configurable Registry: Store and retrieve jobs with the in-memory registry
  • Task Scheduling: Support for cron-like scheduling expressions
  • Robust Timeout Handling: Configure timeouts at both the engine and job level
  • Metadata-driven Configuration: Extract job configuration directly from script file comments
  • Extensible Architecture: Easily add new script types or execution engines

Installation

go get github.com/goliatone/go-job

Usage

Basic Example
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/goliatone/go-job"
)

// main discovers every script under ./scripts, registers the resulting tasks
// in an in-memory registry, reports how many were registered, and finally
// runs the task whose ID is "my-task.js" if it exists.
func main() {
    // In-memory store for the discovered tasks.
    store := job.NewMemoryRegistry()

    // One engine per supported script type: shell, JavaScript, SQL.
    engines := []job.Engine{
        job.NewShellRunner(job.WithShellTimeout(time.Minute)),
        job.NewJSRunner(job.WithJSTimeout(time.Minute)),
        job.NewSQLRunner(job.WithSQLDatabase("postgres", "postgres://user:pass@localhost/db")),
    }

    // The creator pairs the script source with the engines able to run them.
    creator := job.NewTaskCreator(job.NewFileSystemSourceProvider("./scripts"), engines)

    // Discover the scripts and turn them into tasks.
    discovered, err := creator.CreateTasks(context.Background())
    if err != nil {
        fmt.Printf("Error creating tasks: %v\n", err)
        return
    }

    // Registration is best effort: a failing task is reported and skipped.
    for _, item := range discovered {
        addErr := store.Add(item)
        if addErr == nil {
            continue
        }
        fmt.Printf("Error adding task %s: %v\n", item.GetID(), addErr)
    }

    // Report everything that ended up in the registry.
    fmt.Printf("Registered %d tasks\n", len(store.List()))

    // Run a single task, looked up by its ID.
    const taskID = "my-task.js"
    selected, ok := store.Get(taskID)
    if !ok {
        return
    }
    if runErr := selected.GetHandler()(); runErr != nil {
        fmt.Printf("Error executing task %s: %v\n", taskID, runErr)
    }
}
Creating a Script with Metadata
JavaScript Example
// config
// schedule: "0 */5 * * * *"  # Run every 5 minutes
// timeout: 30s
// retries: 2
// run_once: false
// env:
//   API_KEY: "my-secret-key"
//   DEBUG: "true"

// The header above uses line comments on purpose: inside a /** ... */ block
// the "*/" in the cron expression would terminate the comment early.
console.log("Starting job execution");

// Access environment variables
console.log(`API Key: ${API_KEY}`);

// Make HTTP requests using fetch
fetch("https://api.example.com/data")
  .then(response => response.json())
  .then(data => {
    console.log("Received data:", data);
  })
  .catch(error => {
    console.error("Error fetching data:", error);
  });
Shell Script Example
#!/bin/bash
# config
# schedule: "0 0 * * *"  # Run daily at midnight
# timeout: 120s
# retries: 3
# env:
#   DB_HOST: localhost
#   DB_USER: admin

echo "Running backup script"
# DB_HOST and DB_USER are expected to come from the env section of the
# config header above. The dump is written to a date-stamped file
# (YYYYMMDD) under /backups.
pg_dump -h "$DB_HOST" -U "$DB_USER" my_database > /backups/backup-$(date +%Y%m%d).sql
SQL Script Example
-- config
-- schedule: "0 4 * * *"
-- timeout: 60s
-- transaction: true
-- metadata:
--   driver: postgres
--   dsn: postgres://user:password@localhost/mydb

-- This script will run in a transaction
-- Record the cleanup in the audit log first.
INSERT INTO audit_log (event_type, description)
VALUES ('DAILY_CLEANUP', 'Removing old records');

-- Remove temporary rows created more than 30 days ago.
DELETE FROM temporary_data
WHERE created_at < NOW() - INTERVAL '30 days';
Executing a Job Manually
package main

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/goliatone/go-job"
)

// main runs ./scripts/backup.sh once through the shell engine: it reads the
// file, splits it into configuration and script body with the YAML metadata
// parser, and executes the body with a one minute deadline. Any failure is
// printed and stops the program before the next step.
func main() {
    // Create a shell engine
    shellEngine := job.NewShellRunner(
        job.WithShellTimeout(time.Minute),
    )

    // Read script content
    content, err := os.ReadFile("./scripts/backup.sh")
    if err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    // Parse metadata and script content
    config, scriptContent, err := job.NewYAMLMetadataParser().Parse(content)
    if err != nil {
        fmt.Printf("Error parsing metadata: %v\n", err)
        // Stop here: config and scriptContent are not usable after a
        // failed parse, so the script must not be executed with them.
        return
    }

    // Create execution message
    msg := &job.ExecutionMessage{
        JobID:      "backup.sh",
        ScriptPath: "./scripts/backup.sh",
        Config:     config,
        Parameters: map[string]any{
            "script": scriptContent,
        },
    }

    // Execute the script
    ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    defer cancel()

    if err := shellEngine.Execute(ctx, msg); err != nil {
        fmt.Printf("Error executing script: %v\n", err)
        return
    }

    fmt.Println("Script executed successfully")
}

Configuration Options

Common Configuration Options
Option Description Default
schedule Cron expression for scheduling * * * * *
timeout Maximum execution time 1 minute
no_timeout Disable execution timeout false
retries Number of retry attempts 0
debug Enable debug mode false
run_once Run job only once false
script_type Override script type detection Auto-detected
env Environment variables for execution {}
metadata Additional metadata for engines {}
Engine-Specific Options
SQL Engine
Option Description
transaction Execute SQL in a transaction
driver SQL driver name (in metadata)
dsn Data source name (in metadata)
Shell Engine
Option Description
use_env Pass system environment variables (in metadata)

Architecture

go-job uses a modular architecture with several key components:

  • Engines: Responsible for executing specific script types
  • Registry: Stores and manages job definitions
  • MetadataParser: Extracts configuration from script content
  • SourceProvider: Loads script content from various sources
  • TaskCreator: Creates task instances from scripts
  • Task: Represents a job with its configuration and handler

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultTimeout is the timeout applied to tasks by default.
	DefaultTimeout  = time.Minute
	// DefaultSchedule is the cron expression listed in the README's option
	// table as the default for the schedule option.
	DefaultSchedule = "* * * * *"
)
View Source
// DefaultMatchPatterns lists the built-in metadata header styles, one entry
// per supported syntax: YAML front matter, JavaScript line comments,
// JavaScript block comments, shell comments and SQL comments.
//
// NOTE(review): several EndPattern values use a `(?!...)` negative
// lookahead, which Go's standard regexp package (RE2 syntax) does not
// support. Presumably these patterns are compiled with a different engine
// or handled specially — confirm.
var DefaultMatchPatterns = []MatchPattern{
	// YAML front matter delimited by a pair of --- lines.
	{
		Name:          "yaml",
		StartPattern:  `^---\s*$`,
		EndPattern:    `^---\s*$`,
		CommentPrefix: "",
	},
	// JavaScript header written with // line comments.
	{
		Name:          "javascript",
		StartPattern:  `^/{2,}\s*config`,
		EndPattern:    `^(?!/{2,})`,
		CommentPrefix: "//",
		IsBlock:       false,
	},
	// JavaScript header written as a /** config ... */ block comment.
	{
		Name:          "javascript_block",
		StartPattern:  `^/\*\*\s*config(.*)$`,
		EndPattern:    `^\*/`,
		CommentPrefix: "*",
		IsBlock:       true,
	},
	// Shell header written with # comments.
	{
		Name:          "shell",
		StartPattern:  `^#{1,}\s*config`,
		EndPattern:    `^(?!#{1,})`,
		CommentPrefix: "#",
		IsBlock:       false,
	},
	// SQL header written with -- comments.
	{
		Name:          "sql",
		StartPattern:  `^-{2,}\s*config`,
		EndPattern:    `^(?!-{2,})`,
		CommentPrefix: "--",
		IsBlock:       false,
	},
}
View Source
var LoggerEnabled = false

Functions

func NewMemoryRegistry

func NewMemoryRegistry() *memoryRegistry

func NewTaskCreator

func NewTaskCreator(provider SourceProvider, engines []Engine) *taskCreator

func NewYAMLMetadataParser

func NewYAMLMetadataParser(patterns ...MatchPattern) *yamlMetadataParser

Types

type BaseEngine

type BaseEngine struct {
	FileExtensions []string
	Timeout        time.Duration
	MetadataParser MetadataParser
	FS             fs.FS
	SourceProvider SourceProvider
	EngineType     string
	Self           Engine
	// contains filtered or unexported fields
}

func NewBaseEngine

func NewBaseEngine(self Engine, engingeType string, exts ...string) *BaseEngine

func (*BaseEngine) CanHandle

func (e *BaseEngine) CanHandle(path string) bool

CanHandle checks if this engine can process the given file based on its extension

func (*BaseEngine) GetExecutionContext

func (e *BaseEngine) GetExecutionContext(ctx context.Context) (context.Context, context.CancelFunc)

func (*BaseEngine) GetExecutionTimeout

func (e *BaseEngine) GetExecutionTimeout(ctx context.Context) time.Duration

func (*BaseEngine) GetScriptContent

func (e *BaseEngine) GetScriptContent(msg *ExecutionMessage) (string, error)

func (*BaseEngine) Name

func (e *BaseEngine) Name() string

Name returns the engine identifier

func (*BaseEngine) ParseJob

func (e *BaseEngine) ParseJob(path string, content []byte) (Task, error)

ParseJob extracts metadata and content from a job script file

type Config

// Config holds the job options that can be declared in a script's metadata
// header. The field descriptions below follow the README's option tables.
type Config struct {
	// Schedule is the cron expression for scheduling.
	Schedule    string            `yaml:"schedule" json:"schedule"`
	// Retries is the number of retry attempts.
	Retries     int               `yaml:"retries" json:"retries"`
	// Timeout is the maximum execution time.
	// NOTE(review): the yaml/json key here is "duration", while the README
	// examples and option table use "timeout" — confirm which key the
	// metadata parser actually reads.
	Timeout     time.Duration     `yaml:"duration" json:"duration"`
	// NoTimeout disables the execution timeout.
	NoTimeout   bool              `yaml:"no_timeout" json:"no_timeout"`
	// Debug enables debug mode.
	Debug       bool              `yaml:"debug" json:"debug"`
	// RunOnce runs the job only once.
	RunOnce     bool              `yaml:"run_once" json:"run_once"`
	// ScriptType overrides script type detection.
	ScriptType  string            `yaml:"script_type" json:"script_type"`
	// Transaction executes SQL in a transaction (SQL engine option).
	Transaction bool              `yaml:"transaction" json:"transaction"`
	// Metadata carries additional, engine-specific metadata (the README
	// lists driver, dsn and use_env).
	Metadata    map[string]any    `yaml:"metadata" json:"metadata"`
	// Env holds environment variables for execution.
	Env         map[string]string `yaml:"env" json:"env"`
}

handler options Deadline time.Time `json:"deadline"` MaxRetries int `json:"max_retries"` MaxRuns int `json:"max_runs"` RunOnce bool `json:"run_once"`

type Engine

type Engine interface {
	Name() string
	ParseJob(path string, content []byte) (Task, error)
	CanHandle(path string) bool
	Execute(ctx context.Context, msg *ExecutionMessage) error
}

type ExecutionMessage

type ExecutionMessage struct {
	command.BaseMessage
	JobID          string
	ScriptPath     string
	Config         Config
	Parameters     map[string]any
	OutputCallback func(stdout, stderr string)
}

ExecutionMessage represents a request to execute a job script

func (ExecutionMessage) Type

func (msg ExecutionMessage) Type() string

Type returns the message type for the command system

func (ExecutionMessage) Validate

func (msg ExecutionMessage) Validate() error

Validate ensures the message contains required fields

type FetchOptions

type FetchOptions struct {
	Method  string            `json:"method"`
	Headers map[string]string `json:"headers"`
	Body    any               `json:"body"`
	Timeout int               `json:"timeout"` //milliseconds
}

FetchOptions represents the options for the fetch function

type FetchResponse

type FetchResponse struct {
	Status     int                 `json:"status"`
	StatusText string              `json:"statusText"`
	Headers    map[string][]string `json:"headers"`
	URL        string              `json:"url"`
	Body       []byte              `json:"-"`
}

FetchResponse represents the response from a fetch call

type FileSystemSourceProvider

type FileSystemSourceProvider struct {
	// contains filtered or unexported fields
}

func NewFileSystemSourceProvider

func NewFileSystemSourceProvider(rootDir string, fss ...fs.FS) *FileSystemSourceProvider

func (*FileSystemSourceProvider) GetScript

func (p *FileSystemSourceProvider) GetScript(path string) ([]byte, error)

func (*FileSystemSourceProvider) ListScripts

func (p *FileSystemSourceProvider) ListScripts(ctx context.Context) ([]ScriptInfo, error)

type JSEngine

type JSEngine struct {
	*BaseEngine
	// contains filtered or unexported fields
}

func NewJSRunner

func NewJSRunner(opts ...JSOption) *JSEngine

func (*JSEngine) Execute

func (e *JSEngine) Execute(ctx context.Context, msg *ExecutionMessage) error

Execute runs a JavaScript file in a Node-like environment using goja_nodejs' eventloop.

type JSOption

type JSOption func(*JSEngine)

func WithJSExtension

func WithJSExtension(ext string) JSOption

func WithJSFS

func WithJSFS(dirfs fs.FS) JSOption

WithJSFS sets the filesystem used by the JS engine

func WithJSLogger added in v0.3.0

func WithJSLogger(logger Logger) JSOption

func WithJSMetadataParser

func WithJSMetadataParser(parser MetadataParser) JSOption

WithJSMetadataParser sets a custom metadata parser

func WithJSModuleLoader

func WithJSModuleLoader(loader func(path string) ([]byte, error)) JSOption

func WithJSPanicHandler

func WithJSPanicHandler(handler func(funcName string, fields ...map[string]any)) JSOption

func WithJSPathResolver

func WithJSPathResolver(resolver func(base, path string) string) JSOption

func WithJSTimeout

func WithJSTimeout(timeout time.Duration) JSOption

WithJSTimeout sets the default execution timeout

type Logger added in v0.3.0

type Logger interface {
	Debug(format string, args ...any)
	Info(format string, args ...any)
	Warn(format string, args ...any)
	Error(format string, args ...any)
}

type MatchPattern

// MatchPattern describes one style of metadata header: how it starts, how it
// ends, and which comment prefix decorates its lines.
type MatchPattern struct {
	// Name identifies the pattern (e.g. "yaml", "shell", "sql").
	Name          string
	// StartPattern is a regular expression matching the line that opens the
	// header.
	StartPattern  string
	// EndPattern is a regular expression matching the line that closes the
	// header.
	EndPattern    string
	// CommentPrefix is the comment marker of the header lines ("" for YAML
	// front matter); presumably it is stripped before parsing — confirm.
	CommentPrefix string
	IsBlock       bool // true for block comment styles (e.g. /** ... */)
}

type MetadataParser

type MetadataParser interface {
	Parse(content []byte) (Config, string, error)
}

type Option

type Option func(*Runner)

func WithErrorHandler

func WithErrorHandler(handler func(Task, error)) Option

func WithMetadataParser

func WithMetadataParser(parser MetadataParser) Option

func WithRegistry

func WithRegistry(registry Registry) Option

func WithTaskCreator

func WithTaskCreator(creator TaskCreator) Option

type Processor added in v0.2.0

type Processor interface {
	Process([]byte) ([]byte, error)
}

type Registry

type Registry interface {
	Add(job Task) error
	Get(id string) (Task, bool)
	List() []Task
}

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

func NewRunner

func NewRunner(opts ...Option) *Runner

func (*Runner) RegisteredTasks

func (r *Runner) RegisteredTasks() []Task

func (*Runner) Start

func (r *Runner) Start(ctx context.Context) error

func (*Runner) Stop

func (r *Runner) Stop(_ context.Context) error

type SQLEngine

type SQLEngine struct {
	*BaseEngine
	// contains filtered or unexported fields
}

func NewSQLRunner

func NewSQLRunner(opts ...SQLOption) *SQLEngine

func (*SQLEngine) Execute

func (e *SQLEngine) Execute(ctx context.Context, msg *ExecutionMessage) error

type SQLOption

type SQLOption func(*SQLEngine)

func WithSQLClient added in v0.2.0

func WithSQLClient(db *sql.DB) SQLOption

WithSQLClient sets the db client

func WithSQLDatabase

func WithSQLDatabase(driverName, dataSourceName string) SQLOption

WithSQLDatabase sets the database connection

func WithSQLExtension

func WithSQLExtension(ext string) SQLOption

WithSQLExtension adds file extensions that this engine can handle

func WithSQLFS

func WithSQLFS(dirfs fs.FS) SQLOption

WithSQLFS sets the filesystem used by the SQL engine

func WithSQLLogger added in v0.3.0

func WithSQLLogger(logger Logger) SQLOption

func WithSQLMetadataParser

func WithSQLMetadataParser(parser MetadataParser) SQLOption

WithSQLMetadataParser sets a custom metadata parser

func WithSQLTimeout

func WithSQLTimeout(timeout time.Duration) SQLOption

WithSQLTimeout sets the default execution timeout

type ScheduleQuotesProcessor added in v0.2.0

type ScheduleQuotesProcessor struct{}

ScheduleQuotesProcessor ensures that schedule values like @every are properly quoted so that the parser does not fail with an error

func (*ScheduleQuotesProcessor) Process added in v0.2.0

func (s *ScheduleQuotesProcessor) Process(data []byte) ([]byte, error)

type ScriptInfo

type ScriptInfo struct {
	ID      string
	Path    string
	Content []byte
}

type ShellEngine

type ShellEngine struct {
	*BaseEngine
	// contains filtered or unexported fields
}

func NewShellRunner

func NewShellRunner(opts ...ShellOption) *ShellEngine

func (*ShellEngine) Execute

func (e *ShellEngine) Execute(ctx context.Context, msg *ExecutionMessage) error

type ShellOption

type ShellOption func(*ShellEngine)

func WithShellEnvironment

func WithShellEnvironment(env []string) ShellOption

WithShellEnvironment sets additional environment variables

func WithShellExtension

func WithShellExtension(ext string) ShellOption

WithShellExtension adds file extensions that this engine can handle

func WithShellFS

func WithShellFS(dirfs fs.FS) ShellOption

WithShellFS sets the filesystem used by the shell engine

func WithShellLogger added in v0.3.0

func WithShellLogger(logger Logger) ShellOption

func WithShellMetadataParser

func WithShellMetadataParser(parser MetadataParser) ShellOption

WithShellMetadataParser sets a custom metadata parser

func WithShellShell

func WithShellShell(shell string, args ...string) ShellOption

WithShellShell sets the shell executable and arguments

func WithShellTimeout

func WithShellTimeout(timeout time.Duration) ShellOption

WithShellTimeout sets the default execution timeout

func WithShellWorkingDirectory

func WithShellWorkingDirectory(dir string) ShellOption

WithShellWorkingDirectory sets the working directory for script execution

type SourceProvider

type SourceProvider interface {
	GetScript(path string) (content []byte, err error)
	ListScripts(ctx context.Context) ([]ScriptInfo, error)
}

type Task

type Task interface {
	GetID() string
	GetHandler() func() error
	GetHandlerConfig() command.HandlerConfig
	GetConfig() Config
}

Task represents a schedulable job discovered from the filesystem

func NewBaseTask

func NewBaseTask(
	id, path, scriptType string,
	config Config,
	scriptContent string,
	engine Engine,
) Task

type TaskCreator

type TaskCreator interface {
	CreateTasks(ctx context.Context) ([]Task, error)
}

type TaskRunner

type TaskRunner interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	RegisteredTasks() []Task
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL