goworker2

package module
v0.0.1 Latest
Published: Jun 25, 2025 License: MIT Imports: 0 Imported by: 0

README

goworker2

goworker2 is a Go-based background job processing library with pluggable components. It provides a clean, modular architecture supporting multiple queue backends, serializers, and statistics providers.

Originally inspired by Resque-compatible job processing, goworker2 has evolved into a flexible framework that can work with Redis, RabbitMQ, and custom backends.

Note: This is a complete rewrite and modernization of the original goworker library by Benjamin Manns, designed as a new project rather than a backwards-compatible upgrade. We're grateful for the inspiration and foundation provided by the original work.

Features

  • Multiple Queue Backends: Redis, RabbitMQ, or bring your own
  • Pluggable Serializers: JSON, Resque, Sneakers/ActiveJob, or custom formats
  • Statistics Providers: Resque-compatible, NoOp, or custom monitoring
  • Pre-configured Engines: Ready-to-use setups for common scenarios
  • Graceful Shutdown: Proper signal handling and worker cleanup
  • Concurrent Processing: Configurable worker pools with job distribution
  • Health Monitoring: Built-in health checks and statistics

Quick Start

Using Pre-configured Engines

The easiest way to get started is with pre-configured engines:

Redis with Resque Compatibility
package main

import (
	"context"
	"log"
	
	"github.com/BranchIntl/goworker2/engines"
)

func emailJob(queue string, args ...interface{}) error {
	// Process email job
	return nil
}

func main() {
	engine := engines.NewResqueEngine(engines.DefaultResqueOptions())
	engine.Register("EmailJob", emailJob)
	
	if err := engine.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

RabbitMQ with ActiveJob Compatibility
package main

import (
	"context"
	"log"
	
	"github.com/BranchIntl/goworker2/engines"
)

func imageProcessor(queue string, args ...interface{}) error {
	// Process image
	return nil
}

func main() {
	engine := engines.NewSneakersEngine(engines.DefaultSneakersOptions())
	engine.Register("ImageProcessor", imageProcessor)
	
	if err := engine.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}
Custom Configuration

For more control, you can configure components manually:

package main

import (
	"context"
	"log"
	"time"

	"github.com/BranchIntl/goworker2/brokers/redis"
	"github.com/BranchIntl/goworker2/core"
	"github.com/BranchIntl/goworker2/registry"
	// Both of these packages are named "resque", so each needs its own alias.
	resqueserializer "github.com/BranchIntl/goworker2/serializers/resque"
	resquestats "github.com/BranchIntl/goworker2/statistics/resque"
)

func main() {
	// Create components
	broker := redis.NewBroker(redis.DefaultOptions(), resqueserializer.NewSerializer())
	stats := resquestats.NewStatistics(resquestats.DefaultOptions())
	reg := registry.NewRegistry() // named reg so it does not shadow the registry package

	// Create engine with custom options
	engine := core.NewEngine(
		broker,
		stats,
		reg,
		resqueserializer.NewSerializer(),
		core.WithConcurrency(10),
		core.WithQueues([]string{"critical", "default"}),
		core.WithPollInterval(5*time.Second),
	)

	// Register workers
	reg.Register("MyJob", func(queue string, args ...interface{}) error {
		// Handle job
		return nil
	})
	
	// Start processing
	if err := engine.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Installation

go get github.com/BranchIntl/goworker2

Acknowledgments

This project was inspired by and builds upon the concepts from the original goworker library by Benjamin Manns. While this is a complete rewrite with different architecture and capabilities, we acknowledge and appreciate the foundational work that made this project possible.

Architecture

goworker2 uses a modular architecture with dependency injection:

┌─────────────────┐
│     Engine      │  ← Orchestrates components
├─────────────────┤
│   Broker        │  ← Queue backend (Redis/RabbitMQ)
│   Statistics    │  ← Metrics and monitoring
│   Registry      │  ← Worker function registry
│   Serializer    │  ← Job serialization format
│   WorkerPool    │  ← Manages concurrent workers
│   Poller        │  ← Polls queues for jobs
└─────────────────┘

Components
  • Broker: Handles queue operations (enqueue, dequeue, ack/nack)
  • Statistics: Records metrics and worker information
  • Registry: Maps job classes to worker functions
  • Serializer: Converts jobs to/from bytes
  • Engine: Orchestrates all components and handles lifecycle
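
To make the Registry role concrete, here is a minimal sketch of a map from job class to worker function. The `Registry` type, `NewRegistry`, and `Dispatch` below are hypothetical stand-ins written for illustration; they are not the library's implementation, and only the worker signature is taken from this README.

```go
package main

import (
	"fmt"
	"sync"
)

// WorkerFunc matches the worker signature documented in this README.
type WorkerFunc func(queue string, args ...interface{}) error

// Registry is a hypothetical, minimal stand-in for a job registry:
// a concurrency-safe map from job class name to worker function.
type Registry struct {
	mu      sync.RWMutex
	workers map[string]WorkerFunc
}

// NewRegistry returns an empty registry.
func NewRegistry() *Registry {
	return &Registry{workers: make(map[string]WorkerFunc)}
}

// Register associates a job class with a worker, replacing any earlier entry.
func (r *Registry) Register(class string, fn WorkerFunc) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.workers[class] = fn
}

// Dispatch looks up the worker for class and runs it with the given arguments.
func (r *Registry) Dispatch(class, queue string, args ...interface{}) error {
	r.mu.RLock()
	fn, ok := r.workers[class]
	r.mu.RUnlock()
	if !ok {
		return fmt.Errorf("no worker registered for class %q", class)
	}
	return fn(queue, args...)
}

func main() {
	reg := NewRegistry()
	reg.Register("EmailJob", func(queue string, args ...interface{}) error {
		fmt.Printf("queue=%s args=%v\n", queue, args)
		return nil
	})

	if err := reg.Dispatch("EmailJob", "default", "user@example.com"); err != nil {
		fmt.Println("error:", err)
	}
	// An unregistered class produces an error instead of a panic.
	if err := reg.Dispatch("UnknownJob", "default"); err != nil {
		fmt.Println("error:", err)
	}
}
```

Keeping the lookup behind a read lock lets many workers dispatch concurrently while registration stays safe.
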
Pre-configured Engines
  • ResqueEngine: Redis + Resque serializer + Resque statistics (Ruby Resque compatibility)
  • SneakersEngine: RabbitMQ + Sneakers/ActiveJob serializer + NoOp statistics (Rails ActiveJob compatibility)

See the engines/ directory for detailed engine documentation.

Configuration

Engine Options
engine := core.NewEngine(
	broker, stats, registry, serializer,
	core.WithConcurrency(25),                    // Number of workers
	core.WithQueues([]string{"high", "low"}),    // Queue names
	core.WithPollInterval(5*time.Second),        // Polling frequency
	core.WithShutdownTimeout(30*time.Second),    // Graceful shutdown timeout
	core.WithJobBufferSize(100),                 // Job channel buffer
	core.WithExitOnComplete(false),              // Exit when queues empty
)
Broker Options
Redis
options := redis.DefaultOptions()
options.URI = "redis://localhost:6379/"
options.Namespace = "jobs:"
options.MaxConnections = 10
RabbitMQ
options := rabbitmq.DefaultOptions()
options.URI = "amqp://guest:guest@localhost:5672/"
options.Exchange = "jobs"
options.PrefetchCount = 1

Worker Functions

Worker functions must match this signature:

func(queue string, args ...interface{}) error
Type Assertions

Use Go type assertions to handle job arguments:

func processUser(queue string, args ...interface{}) error {
	if len(args) != 2 {
		return fmt.Errorf("expected 2 arguments, got %d", len(args))
	}
	
	userID, ok := args[0].(float64)  // JSON numbers are float64
	if !ok {
		return fmt.Errorf("invalid user ID type")
	}
	
	action, ok := args[1].(string)
	if !ok {
		return fmt.Errorf("invalid action type")
	}
	
	// Process user (processUserAction is a function defined elsewhere in your application)
	return processUserAction(int(userID), action)
}
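
The checks above tend to repeat in every worker, so small helpers can centralize them. The sketch below is one possible approach using only the standard library; `intArg` and `stringArg` are hypothetical helper names, not part of goworker2.

```go
package main

import (
	"fmt"
	"math"
)

// intArg returns args[i] as an int. JSON-decoded numbers arrive as float64,
// so values that are not whole numbers are rejected.
func intArg(args []interface{}, i int) (int, error) {
	if i < 0 || i >= len(args) {
		return 0, fmt.Errorf("argument %d missing (got %d arguments)", i, len(args))
	}
	f, ok := args[i].(float64)
	if !ok {
		return 0, fmt.Errorf("argument %d: expected number, got %T", i, args[i])
	}
	if f != math.Trunc(f) || math.IsInf(f, 0) {
		return 0, fmt.Errorf("argument %d: %v is not a whole number", i, f)
	}
	return int(f), nil
}

// stringArg returns args[i] as a string.
func stringArg(args []interface{}, i int) (string, error) {
	if i < 0 || i >= len(args) {
		return "", fmt.Errorf("argument %d missing (got %d arguments)", i, len(args))
	}
	s, ok := args[i].(string)
	if !ok {
		return "", fmt.Errorf("argument %d: expected string, got %T", i, args[i])
	}
	return s, nil
}

func main() {
	// Arguments shaped the way encoding/json decodes them into interface{}.
	args := []interface{}{float64(42), "deactivate"}

	userID, err := intArg(args, 0)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	action, err := stringArg(args, 1)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(userID, action)
}
```
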

Signal Handling

goworker2 handles these signals automatically:

  • SIGINT/SIGTERM: Graceful shutdown
  • Custom signals: Handle these yourself with manual control (Start/Stop); see the examples/ directory for more advanced setups

// Automatic signal handling
engine.Run(ctx)  // Blocks until SIGINT/SIGTERM

// Manual control
engine.Start(ctx)
// ... custom signal handling ...
engine.Stop()
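
One way to fill in the "custom signal handling" step is the standard library's `signal.NotifyContext`. The sketch below assumes `Start` and `Stop` have the shapes declared in the hypothetical `lifecycle` interface; the real engine's signatures may differ, and `fakeEngine` exists only so the example runs on its own.

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
	"time"
)

// lifecycle is a hypothetical interface for this sketch; the real engine's
// Start and Stop signatures may differ.
type lifecycle interface {
	Start(ctx context.Context) error
	Stop() error
}

// runUntilDone starts e, waits for ctx to be cancelled, then stops e.
func runUntilDone(ctx context.Context, e lifecycle) error {
	if err := e.Start(ctx); err != nil {
		return fmt.Errorf("start: %w", err)
	}
	<-ctx.Done()
	return e.Stop()
}

// fakeEngine stands in for a real engine so the sketch is self-contained.
type fakeEngine struct{ started, stopped bool }

func (f *fakeEngine) Start(context.Context) error { f.started = true; return nil }
func (f *fakeEngine) Stop() error                 { f.stopped = true; return nil }

// demo wires the pieces together and reports whether the fake engine was
// started and stopped.
func demo() (started, stopped bool) {
	// Cancel on SIGINT, SIGTERM, or SIGHUP (the "custom" signal here).
	ctx, stop := signal.NotifyContext(context.Background(),
		syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
	defer stop()

	// Demo only: also cancel after a short timeout so the example terminates.
	ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel()

	eng := &fakeEngine{}
	if err := runUntilDone(ctx, eng); err != nil {
		fmt.Println("error:", err)
	}
	return eng.started, eng.stopped
}

func main() {
	started, stopped := demo()
	fmt.Println("started:", started, "stopped:", stopped)
}
```

In a real program the timeout would be dropped, so the process keeps running until one of the listed signals arrives.
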

Testing

For testing, use mocks or lightweight alternatives like miniredis for Redis, or run actual brokers in Docker containers for integration tests.

Examples

Complete working examples are available in the examples/ directory covering both pre-configured engines and manual component setup.

Monitoring and Health

Health Checks
health := engine.Health()
fmt.Printf("Healthy: %v\n", health.Healthy)
fmt.Printf("Active Workers: %d\n", health.ActiveWorkers)
for queue, count := range health.QueuedJobs {
	fmt.Printf("Queue %s: %d jobs\n", queue, count)
}
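
A health snapshot like the one above could also back an HTTP probe. The following is a sketch under assumptions: the `Health` struct only mirrors the three fields printed above (their exact types in the library are not confirmed here), and `statusFor`, `healthHandler`, and the `/healthz` path are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Health mirrors the fields printed above; the real type returned by
// engine.Health() may have more fields or different types (assumption).
type Health struct {
	Healthy       bool           `json:"healthy"`
	ActiveWorkers int            `json:"active_workers"`
	QueuedJobs    map[string]int `json:"queued_jobs"`
}

// statusFor maps a health snapshot to an HTTP status code.
func statusFor(h Health) int {
	if h.Healthy {
		return http.StatusOK
	}
	return http.StatusServiceUnavailable
}

// healthHandler serves whatever snapshot `get` returns as JSON.
func healthHandler(get func() Health) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		h := get()
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(statusFor(h))
		_ = json.NewEncoder(w).Encode(h)
	}
}

func main() {
	// In a real program `get` would call engine.Health() and copy its fields.
	get := func() Health {
		return Health{Healthy: true, ActiveWorkers: 3, QueuedJobs: map[string]int{"default": 7}}
	}

	// Exercise the handler in-process instead of binding a port.
	rec := httptest.NewRecorder()
	healthHandler(get)(rec, httptest.NewRequest(http.MethodGet, "/healthz", nil))
	fmt.Println(rec.Code, rec.Body.String())
}
```

Returning 503 for an unhealthy snapshot lets load balancers and orchestrators act on the probe without parsing the body.
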
Statistics
stats, err := engine.GetStats().GetGlobalStats(ctx)
if err == nil {
	fmt.Printf("Total Processed: %d\n", stats.TotalProcessed)
	fmt.Printf("Total Failed: %d\n", stats.TotalFailed)
}

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -am 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Overview

Package goworker2 provides a Go-based background job processing library with pluggable components and modular architecture.

Originally inspired by Resque-compatible job processing, goworker2 has evolved into a flexible framework supporting multiple queue backends (Redis, RabbitMQ), serializers (JSON, Resque, Sneakers/ActiveJob), and statistics providers.

Architecture

goworker2 uses dependency injection with these core components:

  • Broker: Handles queue operations (Redis, RabbitMQ)
  • Statistics: Records metrics and monitoring data
  • Registry: Maps job classes to worker functions
  • Serializer: Converts jobs to/from bytes
  • Engine: Orchestrates all components and handles lifecycle
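
To illustrate what "converts jobs to/from bytes" can look like, the sketch below round-trips the JSON shape Resque uses for a job: an object with a "class" name and an "args" array. The `payload` type and the `encode` and `decode` functions are hypothetical and use only `encoding/json`; the library's own serializers may handle additional fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload follows the JSON shape Resque stores for each job: a class name
// plus a positional argument list.
type payload struct {
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
}

// encode turns a class name and arguments into JSON bytes.
func encode(class string, args ...interface{}) ([]byte, error) {
	if args == nil {
		args = []interface{}{} // emit [] rather than null
	}
	return json.Marshal(payload{Class: class, Args: args})
}

// decode parses JSON bytes back into a class name and argument list.
func decode(data []byte) (string, []interface{}, error) {
	var p payload
	if err := json.Unmarshal(data, &p); err != nil {
		return "", nil, fmt.Errorf("decode job: %w", err)
	}
	if p.Class == "" {
		return "", nil, fmt.Errorf("decode job: missing class")
	}
	return p.Class, p.Args, nil
}

func main() {
	data, err := encode("EmailJob", 42, "welcome")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(data))

	class, args, err := decode(data)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// The integer 42 comes back as float64 after the JSON round trip.
	fmt.Printf("%s %T %v\n", class, args[0], args[0])
}
```

The round trip is also why worker functions see numeric arguments as float64.
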

Quick Start with Pre-configured Engines

For Resque compatibility with Redis:

import (
	"context"
	"log"

	"github.com/BranchIntl/goworker2/engines"
)

func emailJob(queue string, args ...interface{}) error {
	// Process email job
	return nil
}

func main() {
	engine := engines.NewResqueEngine(engines.DefaultResqueOptions())
	engine.Register("EmailJob", emailJob)
	if err := engine.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

For ActiveJob compatibility with RabbitMQ:

import (
	"context"
	"log"

	"github.com/BranchIntl/goworker2/engines"
)

func imageProcessor(queue string, args ...interface{}) error {
	// Process image
	return nil
}

func main() {
	engine := engines.NewSneakersEngine(engines.DefaultSneakersOptions())
	engine.Register("ImageProcessor", imageProcessor)
	if err := engine.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Custom Configuration

For complete control over components:

import (
	"context"
	"log"

	"github.com/BranchIntl/goworker2/brokers/redis"
	"github.com/BranchIntl/goworker2/core"
	"github.com/BranchIntl/goworker2/registry"
	// Both of these packages are named "resque", so each needs its own alias.
	resqueserializer "github.com/BranchIntl/goworker2/serializers/resque"
	resquestats "github.com/BranchIntl/goworker2/statistics/resque"
)

func main() {
	// Create components
	broker := redis.NewBroker(redis.DefaultOptions(), resqueserializer.NewSerializer())
	stats := resquestats.NewStatistics(resquestats.DefaultOptions())
	reg := registry.NewRegistry()
	serializer := resqueserializer.NewSerializer()

	// Create engine
	engine := core.NewEngine(
		broker,     // implements core.Broker
		stats,      // implements core.Statistics
		reg,        // implements core.Registry
		serializer, // implements core.Serializer
		core.WithConcurrency(10),
		core.WithQueues([]string{"critical", "default"}),
	)

	// Register workers (sendEmail is a worker function defined elsewhere)
	reg.Register("EmailJob", sendEmail)

	// Start processing
	if err := engine.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Worker Functions

Worker functions must match this signature:

func(queue string, args ...interface{}) error

Use type assertions to handle arguments:

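func processUser(queue string, args ...interface{}) error {
	// Guard against an empty argument list before indexing into it
	if len(args) < 1 {
		return fmt.Errorf("expected at least 1 argument, got %d", len(args))
	}
	userID, ok := args[0].(float64) // JSON numbers are float64
	if !ok {
		return fmt.Errorf("invalid user ID")
	}
	_ = userID // Process user...
	return nil
}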

Signal Handling

The engine.Run() method automatically handles SIGINT and SIGTERM for graceful shutdown. For manual control:

engine.Start(ctx)
// Custom signal handling...
engine.Stop()

Testing

For testing, use mocks or lightweight alternatives like miniredis for Redis, or run actual brokers in Docker containers for integration tests.

Available Engines

ResqueEngine: Redis + Resque serializer + Resque statistics

  • Compatible with Ruby Resque
  • Uses Redis for queuing and statistics

SneakersEngine: RabbitMQ + Sneakers serializer + NoOp statistics

  • Compatible with Rails ActiveJob/Sneakers
  • Uses RabbitMQ for queuing

Health Monitoring

health := engine.Health()
if health.Healthy {
	fmt.Printf("Active workers: %d\n", health.ActiveWorkers)
	for queue, count := range health.QueuedJobs {
		fmt.Printf("Queue %s: %d jobs\n", queue, count)
	}
}

Directories

Path          Synopsis
brokers
engines       Package engines provides pre-configured engine setups for common background job processing scenarios.
errors        Package errors provides error types and utilities for the goworker library.
examples
internal
serializers
statistics
