floxy

package module
v1.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 7, 2025 License: Apache-2.0 Imports: 30 Imported by: 0

README

floxy

Go Reference Go Report Card Coverage Status

A Go library for creating and executing workflows with a custom DSL. Implements the Saga pattern with orchestrator approach, providing transaction management and compensation capabilities.

floxy means "flow" + "flux" + "tiny".

Table of Contents

Features

  • Workflow DSL: Declarative workflow definition using Builder pattern
  • Saga Pattern: Orchestrator-based saga implementation with compensation
  • Workflows Versioning: Safe changing flows using versions
  • Transaction Management: Built-in transaction support with rollback capabilities
  • Parallel Execution: Fork/Join patterns for concurrent workflow steps with dynamic wait-for detection
  • Error Handling: Automatic retry mechanisms and failure compensation
  • SavePoints: Rollback to specific points in workflow execution
  • Conditional branching with Condition steps. Smart rollback for parallel flows with condition steps
  • Human-in-the-loop: Interactive workflow steps that pause execution for human decisions
  • Cancel\Abort: Possibility to cancel workflow with rollback to the root step and immediate abort workflow
  • Dead Letter Queue (DLQ): Two modes for error handling - Classic Saga with rollback/compensation or DLQ Mode with paused workflow and manual recovery
  • Distributed Mode: Microservices can register only their handlers; steps without local handlers are returned to queue for other services to process
  • Priority Aging: Prevents queue starvation by gradually increasing step priority as waiting time increases
  • PostgreSQL Storage: Persistent workflow state and event logging
  • SQLite Storage: In-memory and persistent storage (unstable)
  • HTTP API: API for starting, pausing, resuming, and canceling workflows
  • Integration Tests: Fully integrated integration tests using testcontainers
  • Migrations: Embedded database migrations with go:embed

PlantUML diagrams of compensations flow: DIAGRAMS

Engine specification: ENGINE

Ecosystem

  • Web UI for visualizing and managing workflows: Web UI
  • GoLand Plugin: plugin
  • VS Code Extension: extension

Why Floxy?

Floxy is a lightweight, embeddable workflow engine for Go developers. It was born from the idea that not every system needs a full-blown workflow platform like Cadence or Temporal.

1. Lightweight, Not Heavyweight

Most workflow engines require you to deploy multiple services, brokers, and databases just to run a single flow. Floxy is different — it’s a Go library. You import it, initialize an engine, and define your workflow directly in Go code.

No clusters. No queues.

wf, _ := floxy.NewBuilder("order", 1).
    Step("reserve_stock", "stock.Reserve").
    Then("charge_payment", "payment.Charge").
    OnFailure("refund", "payment.Refund").
    Build()

That’s it — a complete Saga with compensation.

2. Pragmatic by Design

Floxy doesn’t try to solve every problem in distributed systems. It focuses on clear, deterministic workflow execution with the tools Go developers already use:

  • PostgreSQL as durable storage
  • Go’s standard net/http for API
  • Structured retries, compensation, and rollback

You don’t need to learn Cadence’s terminology. Everything is plain Go — just like your codebase.

3. Embedded

You can embed Floxy inside any Go service.

Floxy is for developers who love Go, simplicity, and control. No orchestration clusters. No external DSLs. Just workflows — defined in Go, executed anywhere.

Quick Start

package main

import (
    "context"
    "encoding/json"
    "log"
    
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/rom8726/floxy"
)

func main() {
    ctx := context.Background()
    
    // Connect to PostgreSQL
    pool, err := pgxpool.New(ctx, "postgres://floxy:password@localhost:5432/floxy?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer pool.Close()
    
    // Run database migrations
    if err := floxy.RunMigrations(ctx, pool); err != nil {
        log.Fatal(err)
    }
    
    // Create engine
    engine := floxy.NewEngine(pool)
	defer engine.Shutdown()
    
    // Register step handlers
    engine.RegisterHandler(&PaymentHandler{})
    engine.RegisterHandler(&InventoryHandler{})
    engine.RegisterHandler(&ShippingHandler{})
    engine.RegisterHandler(&CompensationHandler{})
    
    // Define workflow using Builder DSL
    workflow, err := floxy.NewBuilder("order-processing", 1, floxy.WithDLQEnabled(true)).
        Step("process-payment", "payment", floxy.WithStepMaxRetries(3)).
        OnFailure("refund-payment", "compensation").
        SavePoint("payment-checkpoint").
        Then("reserve-inventory", "inventory", floxy.WithStepMaxRetries(2)).
        OnFailure("release-inventory", "compensation").
        Then("ship-order", "shipping").
        OnFailure("cancel-shipment", "compensation").
        Build()
    if err != nil {
        log.Fatal(err)
    }
    
    // Register and start workflow
    if err := engine.RegisterWorkflow(ctx, workflow); err != nil {
        log.Fatal(err)
    }
    
    order := map[string]any{
        "user_id": "user123",
        "amount":  100.0,
        "items":   []string{"item1", "item2"},
    }
    input, _ := json.Marshal(order)
    
    instanceID, err := engine.Start(ctx, "order-processing-v1", input)
    if err != nil {
        log.Fatal(err)
    }
    
    log.Printf("Workflow started: %d", instanceID)
    
    // Process workflow steps
    for {
        empty, err := engine.ExecuteNext(ctx, "worker1")
        if err != nil {
            log.Printf("ExecuteNext error: %v", err)
        }
        if empty {
            break
        }
    }
    
    // Check final status
    status, err := engine.GetStatus(ctx, instanceID)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Workflow status: %s", status)
}

// Step handlers
type PaymentHandler struct{}
func (h *PaymentHandler) Name() string { return "payment" }
func (h *PaymentHandler) Execute(ctx context.Context, stepCtx floxy.StepContext, input json.RawMessage) (json.RawMessage, error) {
    // Process payment logic
    return json.Marshal(map[string]any{"status": "paid"})
}

type InventoryHandler struct{}
func (h *InventoryHandler) Name() string { return "inventory" }
func (h *InventoryHandler) Execute(ctx context.Context, stepCtx floxy.StepContext, input json.RawMessage) (json.RawMessage, error) {
    // Reserve inventory logic
    return json.Marshal(map[string]any{"status": "reserved"})
}

type ShippingHandler struct{}
func (h *ShippingHandler) Name() string { return "shipping" }
func (h *ShippingHandler) Execute(ctx context.Context, stepCtx floxy.StepContext, input json.RawMessage) (json.RawMessage, error) {
    // Ship order logic
    return json.Marshal(map[string]any{"status": "shipped"})
}

type CompensationHandler struct{}
func (h *CompensationHandler) Name() string { return "compensation" }
func (h *CompensationHandler) Execute(ctx context.Context, stepCtx floxy.StepContext, input json.RawMessage) (json.RawMessage, error) {
    // Compensation logic
    return json.Marshal(map[string]any{"status": "compensated"})
}

Examples

See the examples/ directory for complete workflow examples:

  • Hello World: Basic single-step workflow
  • E-commerce: Order processing with compensation flows
  • Data Pipeline: Parallel data processing with Fork/Join patterns
  • Microservices: Complex service orchestration with multiple branches
  • SavePoint Demo: Demonstrates SavePoint functionality and rollback
  • Rollback Demo: Shows full rollback mechanism with OnFailure handlers
  • Human-in-the-loop: Interactive workflows with human decision points

Run examples:

# Start PostgreSQL (required for examples)
make dev-up

# Run all examples
cd examples/hello_world && go run main.go
cd examples/ecommerce && go run main.go
cd examples/data_pipeline && go run main.go
cd examples/microservices && go run main.go
cd examples/savepoint_demo && go run main.go
cd examples/rollback_demo && go run main.go
cd examples/human_in_the_loop__approved && go run main.go
cd examples/human_in_the_loop__rejected && go run main.go

Integration Tests

The library includes comprehensive integration tests using testcontainers:

# Run all tests
go test ./...

# Run only integration tests
go test -v -run TestIntegration

# Run specific integration test
go test -v -run TestIntegration_DataPipeline

Integration tests cover:

  • Data Pipeline: Parallel data processing with multiple sources
  • E-commerce: Order processing with success/failure scenarios
  • Microservices: Complex orchestration with multiple service calls
  • SavePoint Demo: SavePoint functionality with conditional failures
  • Rollback Demo: Full rollback mechanism testing
  • Human-in-the-loop: Make decisions (confirm/reject)

Database Migrations

The library includes embedded database migrations using go:embed. Migrations are automatically applied when using floxy.RunMigrations():

// Run migrations
if err := floxy.RunMigrations(ctx, pool); err != nil {
    log.Fatal(err)
}

Available migrations:

  • 001_initial.up.sql: Initial schema creation
  • 002_add_savepoint_and_rollback.up.sql: SavePoint and rollback support
  • 003_add_compensation_retry_count.up.sql: compensation step status and compensation_retry_count added
  • 004_add_compensation_to_views.up.sql: active_workflows view updated
  • 005_add_idempotency_key_to_steps.up.sql: Idempotency Key added to step table
  • 006_add_human_in_the_loop_step.up.sql: Human-in-the-loop step support and decision tracking
  • 007_add_workflow_cancel_requests_table.up.sql: Cancel requests table
  • 009_add_dead_letter_queue.up.sql: Dead Letter Queue for failed steps

Dead Letter Queue (DLQ)

Overview

Floxy supports two different error handling modes:

  1. Classic Saga Mode (default): When a step fails, the engine performs rollback to the last SavePoint and executes compensation handlers
  2. DLQ Mode: Rollback is disabled, the workflow pauses in dlq state, and failed steps are stored in DLQ for manual investigation
DLQ Mode Behavior

When DLQ is enabled for a workflow:

  • No Rollback: Compensation handlers are not executed on failure
  • Workflow Paused: Instance status → dlq (not terminal, can be resumed)
  • Active Steps Frozen: All running steps are set to paused status
  • Queue Cleared: Instance queue is cleared to prevent further progress
  • Manual Recovery: After fixing issues, use RequeueFromDLQ to resume
Enabling DLQ Mode

Enable DLQ for a workflow during definition:

workflow, err := floxy.NewBuilder("payment-processing", 1, floxy.WithDLQEnabled(true)).
    Step("validate-payment", "payment-validator", floxy.WithStepMaxRetries(2)).
    Then("process-payment", "payment-processor", floxy.WithStepMaxRetries(3)).
    Then("notify-user", "notification-service", floxy.WithStepMaxRetries(1)).
    Build()
Fork/Join with DLQ

When using Fork/Join with DLQ enabled:

  • Parallel Branches: Other branches continue to completion before workflow pauses
  • Join Step: Created as paused when all dependencies are met but the instance is in dlq state
  • Requeue Behavior: After requeuing the failed step, join transitions from pausedpending for automatic continuation
RequeueFromDLQ

The RequeueFromDLQ method restores a failed step from DLQ and resumes workflow execution:

// Requeue with original input
err := engine.RequeueFromDLQ(ctx, dlqID, nil)

// Requeue with modified input
newInput := json.RawMessage(`{"status": "fixed", "data": "corrected"}`)
err := engine.RequeueFromDLQ(ctx, dlqID, &newInput)

What happens during requeue:

  1. Instance status: dlqrunning
  2. Failed step: pausedpending (retry counters reset)
  3. Input updated if newInput provided
  4. Step enqueued for execution
  5. Join steps: pausedpending
  6. DLQ record deleted
Use Cases for DLQ Mode
  • Manual Data Review: Steps that require human inspection before retry
  • External Service Outages: When downstream services are temporarily unavailable
  • Data Quality Issues: Malformed data requiring manual correction
  • Complex Debugging: When failures need detailed investigation
  • Business Approval Workflows: Where failures should pause for review rather than auto-rollback
Example: Payment Processing with DLQ
// Define workflow with DLQ enabled
workflow, err := floxy.NewBuilder("payment-processing", 1, floxy.WithDLQEnabled(true)).
    Step("validate-payment", "payment-validator", floxy.WithStepMaxRetries(2)).
    Then("process-payment", "payment-processor", floxy.WithStepMaxRetries(3)).
    Then("notify-user", "notification-service", floxy.WithStepMaxRetries(1)).
    Build()

// Register and start workflow
err = engine.RegisterWorkflow(ctx, workflow)
instanceID, err := engine.Start(ctx, "payment-processing-v1", input)

// Process workflow - if step fails, workflow goes to dlq state
for {
    empty, err := engine.ExecuteNext(ctx, "worker1")
    if empty || err != nil {
        break
    }
}

// Later: investigate failure in DLQ, fix issue, then requeue
newInput := json.RawMessage(`{"payment_id": "corrected-id", "amount": 100.0}`)
err = engine.RequeueFromDLQ(ctx, dlqID, &newInput)

// Workflow resumes from where it paused

Known Issues

Condition Steps in Forked Branches

When using Condition steps within Fork branches, the JoinStep step may not wait for all dynamically created steps (like else branches) to complete before considering the workflow finished. This can lead to premature workflow completion.

Example of problematic case:

Fork("parallel_branch", func(branch1 *floxy.Builder) {
    branch1.Step("branch1_step1", "handler").
        Condition("branch1_condition", "{{ gt .count 5 }}", func(elseBranch *floxy.Builder) {
            elseBranch.Step("branch1_else", "handler") // This step might not be waited for
        }).
        Then("branch1_next", "handler")
}, func(branch2 *floxy.Builder) {
    branch2.Step("branch2_step1", "handler").
        Condition("branch2_condition", "{{ lt .count 3 }}", func(elseBranch *floxy.Builder) {
            elseBranch.Step("branch2_else", "handler") // This step might not be waited for
        }).
        Then("branch2_next", "handler")
}).
JoinStep("join", []string{"branch1_step1", "branch2_step1"}, floxy.JoinStrategyAll)

SOLVED: Avoid using JoinStep with Condition, use Join instead that dynamically creates waitFor list (virtual steps conception used).

See examples/condition/main.go for a demonstration of this issue.

Installation

go get github.com/rom8726/floxy

Dependencies

  • PostgreSQL database
  • Go 1.24+

Documentation

Index

Constants

View Source
const (
	// Event types
	EventWorkflowStarted           = "workflow_started"
	EventWorkflowCompleted         = "workflow_completed"
	EventWorkflowCancelled         = "workflow_cancelled"
	EventWorkflowAborted           = "workflow_aborted"
	EventStepStarted               = "step_started"
	EventStepCompleted             = "step_completed"
	EventStepRetry                 = "step_retry"
	EventStepFailed                = "step_failed"
	EventForkStarted               = "fork_started"
	EventJoinStateCreated          = "join_state_created"
	EventJoinCheck                 = "join_check"
	EventJoinCompleted             = "join_completed"
	EventJoinUpdated               = "join_updated"
	EventJoinReady                 = "join_ready"
	EventConditionCheck            = "condition_check"
	EventCancellationStarted       = "cancellation_started"
	EventAbortStarted              = "abort_started"
	EventDLQRequeued               = "dlq_requeued"
	EventStepSkippedMissingHandler = "step_skipped_missing_handler"

	// Event data keys
	KeyWorkflowID    = "workflow_id"
	KeyStepName      = "step_name"
	KeyStepType      = "step_type"
	KeyParallelSteps = "parallel_steps"
	KeyJoinStep      = "join_step"
	KeyWaitingFor    = "waiting_for"
	KeyStrategy      = "strategy"
	KeyCompleted     = "completed"
	KeyFailed        = "failed"
	KeyIsReady       = "is_ready"
	KeyOutputs       = "outputs"
	KeyStatus        = "status"
	KeyRetryCount    = "retry_count"
	KeyError         = "error"
	KeyCompletedStep = "completed_step"
	KeySuccess       = "success"
	KeyReason        = "reason"
	KeyResult        = "result"
	KeyDecision      = "decision"
	KeyDecidedBy     = "decided_by"
	KeyMessage       = "message"
	KeyRequestedBy   = "requested_by"
	KeyCancelType    = "cancel_type"
)
View Source
const (
	// MinAgingRate is the minimum allowed value for aging rate (0.0 = no aging)
	MinAgingRate = 0.0
	// MaxAgingRate is the maximum allowed value for aging rate to prevent excessive priority boosts
	MaxAgingRate = 100.0
)

Variables

View Source
var (
	ErrEntityNotFound = errors.New("entity not found")
)

Functions

func CalculateRetryDelay added in v1.9.0

func CalculateRetryDelay(strategy RetryStrategy, baseDelay time.Duration, retryAttempt int) time.Duration

func RunMigrations

func RunMigrations(ctx context.Context, pool *pgxpool.Pool) error

RunMigrations executes all migration files in order

func RunSQLiteMigrations added in v1.13.0

func RunSQLiteMigrations(ctx context.Context, db *sql.DB) error

RunSQLiteMigrations executes embedded SQLite migrations in lexical order within a single transaction.

Types

type ActiveWorkflow added in v1.0.2

type ActiveWorkflow struct {
	InstanceID        int64          `json:"instance_id"`
	WorkflowID        string         `json:"workflow_id"`
	Status            WorkflowStatus `json:"status"`
	CreatedAt         time.Time      `json:"created_at"`
	Duration          time.Duration  `json:"duration"`
	TotalSteps        int            `json:"total_steps"`
	CompletedSteps    int            `json:"completed_steps"`
	FailedSteps       int            `json:"failed_steps"`
	RunningSteps      int            `json:"running_steps"`
	CompensationSteps int            `json:"compensation_steps"`
	RolledBackSteps   int            `json:"rolled_back_steps"`
}

type ActiveWorkflowInstance added in v1.3.0

type ActiveWorkflowInstance struct {
	ID              int64     `json:"id"`
	WorkflowID      string    `json:"workflow_id"`
	WorkflowName    string    `json:"workflow_name"`
	Status          string    `json:"status"`
	StartedAt       time.Time `json:"started_at"`
	UpdatedAt       time.Time `json:"updated_at"`
	CurrentStep     string    `json:"current_step"`
	TotalSteps      int       `json:"total_steps"`
	CompletedSteps  int       `json:"completed_steps"`
	RolledBackSteps int       `json:"rolled_back_steps"`
}

type BasePlugin added in v1.8.0

type BasePlugin struct {
	// contains filtered or unexported fields
}

BasePlugin provides default no-op implementations

func NewBasePlugin added in v1.8.0

func NewBasePlugin(name string, priority Priority) BasePlugin

func (BasePlugin) Name added in v1.8.0

func (p BasePlugin) Name() string

func (BasePlugin) OnRollbackStepChain added in v1.10.0

func (p BasePlugin) OnRollbackStepChain(context.Context, int64, string, int) error

func (BasePlugin) OnStepComplete added in v1.8.0

func (BasePlugin) OnStepFailed added in v1.8.0

func (BasePlugin) OnStepStart added in v1.8.0

func (BasePlugin) OnWorkflowComplete added in v1.8.0

func (p BasePlugin) OnWorkflowComplete(context.Context, *WorkflowInstance) error

func (BasePlugin) OnWorkflowFailed added in v1.8.0

func (p BasePlugin) OnWorkflowFailed(context.Context, *WorkflowInstance) error

func (BasePlugin) OnWorkflowStart added in v1.8.0

func (p BasePlugin) OnWorkflowStart(context.Context, *WorkflowInstance) error

func (BasePlugin) Priority added in v1.8.0

func (p BasePlugin) Priority() Priority

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

func NewBuilder

func NewBuilder(name string, version int, opts ...BuilderOption) *Builder

func (*Builder) Build

func (builder *Builder) Build() (*WorkflowDefinition, error)

func (*Builder) Condition added in v1.2.0

func (builder *Builder) Condition(name, expr string, elseBranch func(elseBranchBuilder *Builder)) *Builder

func (*Builder) Fork

func (builder *Builder) Fork(name string, branches ...func(branch *Builder)) *Builder

func (*Builder) ForkJoin

func (builder *Builder) ForkJoin(
	forkName string,
	branches []func(branch *Builder),
	joinName string,
	joinStrategy JoinStrategy,
) *Builder

func (*Builder) Join added in v1.10.0

func (builder *Builder) Join(name string, strategy JoinStrategy) *Builder

func (*Builder) JoinStep

func (builder *Builder) JoinStep(name string, waitFor []string, strategy JoinStrategy) *Builder

JoinStep should be used without Condition. Use Join instead, which automatically handles dynamic step detection in fork branches. JoinStep with an explicit waitFor list does not account for dynamically created steps (e.g., Condition else branches).

func (*Builder) OnFailure

func (builder *Builder) OnFailure(name, handler string, opts ...StepOption) *Builder

func (*Builder) Parallel

func (builder *Builder) Parallel(name string, tasks ...*StepDefinition) *Builder

func (*Builder) SavePoint

func (builder *Builder) SavePoint(name string) *Builder

func (*Builder) Step

func (builder *Builder) Step(name, handler string, opts ...StepOption) *Builder

func (*Builder) Then

func (builder *Builder) Then(name, handler string, opts ...StepOption) *Builder

func (*Builder) WaitHumanConfirm added in v1.5.0

func (builder *Builder) WaitHumanConfirm(name string, opts ...StepOption) *Builder

type BuilderOption

type BuilderOption func(builder *Builder)

func WithBuilderMaxRetries

func WithBuilderMaxRetries(maxRetries int) BuilderOption

func WithDLQEnabled added in v1.9.0

func WithDLQEnabled(enabled bool) BuilderOption

WithDLQEnabled enables or disables Dead Letter Queue mode for the workflow. When enabled, failed steps will be sent to DLQ and the engine will skip rollback/compensation.

type CancelType added in v1.6.1

type CancelType string
const (
	CancelTypeCancel CancelType = "cancel"
	CancelTypeAbort  CancelType = "abort"
)

type CleanupService added in v1.0.2

type CleanupService struct {
	// contains filtered or unexported fields
}

func NewCleanupService added in v1.0.2

func NewCleanupService(pool *pgxpool.Pool) *CleanupService

func (*CleanupService) CleanupOldWorkflows added in v1.0.2

func (c *CleanupService) CleanupOldWorkflows(ctx context.Context, olderThan time.Duration) (int64, error)

type DeadLetterRecord added in v1.9.0

type DeadLetterRecord struct {
	ID         int64           `json:"id"`
	InstanceID int64           `json:"instance_id"`
	WorkflowID string          `json:"workflow_id"`
	StepID     int64           `json:"step_id"`
	StepName   string          `json:"step_name"`
	StepType   string          `json:"step_type"`
	Input      json.RawMessage `json:"input"`
	Error      *string         `json:"error"`
	Reason     string          `json:"reason"`
	CreatedAt  time.Time       `json:"created_at"`
}

DeadLetterRecord represents a record stored in the Dead Letter Queue used to resume failed steps later.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine(pool *pgxpool.Pool, opts ...EngineOption) *Engine

func (*Engine) AbortWorkflow added in v1.6.0

func (engine *Engine) AbortWorkflow(ctx context.Context, instanceID int64, requestedBy, reason string) error

func (*Engine) CancelWorkflow added in v1.6.0

func (engine *Engine) CancelWorkflow(ctx context.Context, instanceID int64, requestedBy, reason string) error

func (*Engine) ExecuteNext

func (engine *Engine) ExecuteNext(ctx context.Context, workerID string) (empty bool, err error)

func (*Engine) GetStatus

func (engine *Engine) GetStatus(ctx context.Context, instanceID int64) (WorkflowStatus, error)

func (*Engine) GetSteps

func (engine *Engine) GetSteps(ctx context.Context, instanceID int64) ([]WorkflowStep, error)

func (*Engine) HumanDecisionWaitingEvents added in v1.8.0

func (engine *Engine) HumanDecisionWaitingEvents() <-chan HumanDecisionWaitingEvent

func (*Engine) MakeHumanDecision added in v1.5.0

func (engine *Engine) MakeHumanDecision(
	ctx context.Context,
	stepID int64,
	decidedBy string,
	decision HumanDecision,
	comment *string,
) error

func (*Engine) RegisterHandler

func (engine *Engine) RegisterHandler(handler StepHandler)

func (*Engine) RegisterPlugin added in v1.8.0

func (engine *Engine) RegisterPlugin(plugin Plugin)

func (*Engine) RegisterWorkflow

func (engine *Engine) RegisterWorkflow(ctx context.Context, def *WorkflowDefinition) error

func (*Engine) RequeueFromDLQ added in v1.9.0

func (engine *Engine) RequeueFromDLQ(ctx context.Context, dlqID int64, newInput *json.RawMessage) error

RequeueFromDLQ extracts a record from the DLQ and re-enqueues its step. If newInput is non-nil, it will be used as the step input before enqueueing.

func (*Engine) Shutdown added in v1.6.1

func (engine *Engine) Shutdown()

func (*Engine) Start

func (engine *Engine) Start(ctx context.Context, workflowID string, input json.RawMessage) (int64, error)

type EngineOption added in v1.6.1

type EngineOption func(engine *Engine)

func WithEngineCancelInterval added in v1.6.1

func WithEngineCancelInterval(interval time.Duration) EngineOption

func WithEnginePluginManager added in v1.8.0

func WithEnginePluginManager(pluginManager *PluginManager) EngineOption

func WithEngineStore added in v1.6.3

func WithEngineStore(store Store) EngineOption

func WithEngineTxManager added in v1.6.1

func WithEngineTxManager(txManager TxManager) EngineOption

func WithMissingHandlerCooldown added in v1.11.0

func WithMissingHandlerCooldown(d time.Duration) EngineOption

WithMissingHandlerCooldown Distributed missing-handler behavior options

func WithMissingHandlerJitterPct added in v1.11.0

func WithMissingHandlerJitterPct(pct float64) EngineOption

WithMissingHandlerJitterPct Percent in [0,1], e.g. 0.2 = +/-20% jitter

func WithMissingHandlerLogThrottle added in v1.11.0

func WithMissingHandlerLogThrottle(d time.Duration) EngineOption

func WithQueueAgingEnabled added in v1.11.2

func WithQueueAgingEnabled(enabled bool) EngineOption

WithQueueAgingEnabled toggles SQL-side priority aging in dequeue ordering.

func WithQueueAgingRate added in v1.11.2

func WithQueueAgingRate(rate float64) EngineOption

WithQueueAgingRate sets the points-per-second rate for priority aging (e.g., 0.5). Effective priority is min(100, priority + floor(wait_seconds * rate)).

type GraphDefinition

type GraphDefinition struct {
	Steps      map[string]*StepDefinition `json:"steps"`
	Start      string                     `json:"start"`
	DLQEnabled bool                       `json:"dlq_enabled"`
}

type HumanDecision added in v1.5.0

type HumanDecision string
const (
	HumanDecisionConfirmed HumanDecision = "confirmed"
	HumanDecisionRejected  HumanDecision = "rejected"
)

type HumanDecisionRecord added in v1.5.0

type HumanDecisionRecord struct {
	ID         int64         `json:"id"`
	InstanceID int64         `json:"instance_id"`
	StepID     int64         `json:"step_id"`
	DecidedBy  string        `json:"decided_by"`
	Decision   HumanDecision `json:"decision"`
	Comment    *string       `json:"comment"`
	DecidedAt  time.Time     `json:"decided_at"`
	CreatedAt  time.Time     `json:"created_at"`
}

type HumanDecisionWaitingEvent added in v1.8.0

type HumanDecisionWaitingEvent struct {
	InstanceID int64           `json:"instance_id"`
	OutputData json.RawMessage `json:"output_data"`
}

type IEngine added in v1.5.0

type IEngine interface {
	MakeHumanDecision(
		ctx context.Context,
		stepID int64,
		decidedBy string,
		decision HumanDecision,
		comment *string,
	) error
	CancelWorkflow(
		ctx context.Context,
		instanceID int64,
		requestedBy string,
		reason string,
	) error
	AbortWorkflow(
		ctx context.Context,
		instanceID int64,
		requestedBy string,
		reason string,
	) error
	// RequeueFromDLQ extracts a record from DLQ and enqueues the step again.
	// If newInput is provided, it will override step input before enqueueing.
	RequeueFromDLQ(
		ctx context.Context,
		dlqID int64,
		newInput *json.RawMessage,
	) error
}

type JSONHandler added in v1.0.2

type JSONHandler struct {
	// contains filtered or unexported fields
}

func NewJSONHandler added in v1.0.2

func NewJSONHandler(
	name string,
	fn func(ctx context.Context, stepCtx StepContext, data map[string]any) (map[string]any, error),
) *JSONHandler

func (*JSONHandler) Execute added in v1.0.2

func (h *JSONHandler) Execute(
	ctx context.Context,
	stepCtx StepContext,
	input json.RawMessage,
) (json.RawMessage, error)

func (*JSONHandler) Name added in v1.0.2

func (h *JSONHandler) Name() string

type JoinState

type JoinState struct {
	InstanceID   int64        `json:"instance_id"`
	JoinStepName string       `json:"join_step_name"`
	WaitingFor   []string     `json:"waiting_for"`
	Completed    []string     `json:"completed"`
	Failed       []string     `json:"failed"`
	JoinStrategy JoinStrategy `json:"join_strategy"`
	IsReady      bool         `json:"is_ready"`
	CreatedAt    time.Time    `json:"created_at"`
	UpdatedAt    time.Time    `json:"updated_at"`
}

type JoinStrategy

type JoinStrategy string
const (
	JoinStrategyAll JoinStrategy = "all"
	JoinStrategyAny JoinStrategy = "any"
)

type MemoryStore added in v1.12.0

type MemoryStore struct {
	// contains filtered or unexported fields
}

func NewMemoryStore added in v1.12.0

func NewMemoryStore() *MemoryStore

func (*MemoryStore) AddToJoinWaitFor added in v1.12.0

func (s *MemoryStore) AddToJoinWaitFor(ctx context.Context, instanceID int64, joinStepName, stepToAdd string) error

func (*MemoryStore) CleanupOldWorkflows added in v1.12.0

func (s *MemoryStore) CleanupOldWorkflows(ctx context.Context, daysToKeep int) (int64, error)

func (*MemoryStore) CreateCancelRequest added in v1.12.0

func (s *MemoryStore) CreateCancelRequest(ctx context.Context, req *WorkflowCancelRequest) error

func (*MemoryStore) CreateDeadLetterRecord added in v1.12.0

func (s *MemoryStore) CreateDeadLetterRecord(ctx context.Context, rec *DeadLetterRecord) error

func (*MemoryStore) CreateHumanDecision added in v1.12.0

func (s *MemoryStore) CreateHumanDecision(ctx context.Context, decision *HumanDecisionRecord) error

func (*MemoryStore) CreateInstance added in v1.12.0

func (s *MemoryStore) CreateInstance(
	ctx context.Context,
	workflowID string,
	input json.RawMessage,
) (*WorkflowInstance, error)

func (*MemoryStore) CreateJoinState added in v1.12.0

func (s *MemoryStore) CreateJoinState(
	ctx context.Context,
	instanceID int64,
	joinStepName string,
	waitingFor []string,
	strategy JoinStrategy,
) error

func (*MemoryStore) CreateStep added in v1.12.0

func (s *MemoryStore) CreateStep(ctx context.Context, step *WorkflowStep) error

func (*MemoryStore) DeleteCancelRequest added in v1.12.0

func (s *MemoryStore) DeleteCancelRequest(ctx context.Context, instanceID int64) error

func (*MemoryStore) DequeueStep added in v1.12.0

func (s *MemoryStore) DequeueStep(ctx context.Context, workerID string) (*QueueItem, error)

func (*MemoryStore) EnqueueStep added in v1.12.0

func (s *MemoryStore) EnqueueStep(
	ctx context.Context,
	instanceID int64,
	stepID *int64,
	priority Priority,
	delay time.Duration,
) error

func (*MemoryStore) GetActiveInstances added in v1.12.0

func (s *MemoryStore) GetActiveInstances(ctx context.Context) ([]ActiveWorkflowInstance, error)

func (*MemoryStore) GetActiveStepsForUpdate added in v1.12.0

func (s *MemoryStore) GetActiveStepsForUpdate(ctx context.Context, instanceID int64) ([]WorkflowStep, error)

func (*MemoryStore) GetAllWorkflowInstances added in v1.12.0

func (s *MemoryStore) GetAllWorkflowInstances(ctx context.Context) ([]WorkflowInstance, error)

func (*MemoryStore) GetCancelRequest added in v1.12.0

func (s *MemoryStore) GetCancelRequest(ctx context.Context, instanceID int64) (*WorkflowCancelRequest, error)

func (*MemoryStore) GetDeadLetterByID added in v1.12.0

func (s *MemoryStore) GetDeadLetterByID(ctx context.Context, id int64) (*DeadLetterRecord, error)

func (*MemoryStore) GetHumanDecision added in v1.12.0

func (s *MemoryStore) GetHumanDecision(ctx context.Context, stepID int64) (*HumanDecisionRecord, error)

func (*MemoryStore) GetHumanDecisionStepByInstanceID added in v1.12.0

func (s *MemoryStore) GetHumanDecisionStepByInstanceID(ctx context.Context, instanceID int64) (*WorkflowStep, error)

func (*MemoryStore) GetInstance added in v1.12.0

func (s *MemoryStore) GetInstance(ctx context.Context, instanceID int64) (*WorkflowInstance, error)

func (*MemoryStore) GetJoinState added in v1.12.0

func (s *MemoryStore) GetJoinState(ctx context.Context, instanceID int64, joinStepName string) (*JoinState, error)

func (*MemoryStore) GetStepByID added in v1.12.0

func (s *MemoryStore) GetStepByID(ctx context.Context, stepID int64) (*WorkflowStep, error)

func (*MemoryStore) GetStepsByInstance added in v1.12.0

func (s *MemoryStore) GetStepsByInstance(ctx context.Context, instanceID int64) ([]WorkflowStep, error)

func (*MemoryStore) GetSummaryStats added in v1.12.0

func (s *MemoryStore) GetSummaryStats(ctx context.Context) (*SummaryStats, error)

func (*MemoryStore) GetWorkflowDefinition added in v1.12.0

func (s *MemoryStore) GetWorkflowDefinition(ctx context.Context, id string) (*WorkflowDefinition, error)

func (*MemoryStore) GetWorkflowDefinitions added in v1.12.0

func (s *MemoryStore) GetWorkflowDefinitions(ctx context.Context) ([]WorkflowDefinition, error)

func (*MemoryStore) GetWorkflowEvents added in v1.12.0

func (s *MemoryStore) GetWorkflowEvents(ctx context.Context, instanceID int64) ([]WorkflowEvent, error)

func (*MemoryStore) GetWorkflowInstances added in v1.12.0

func (s *MemoryStore) GetWorkflowInstances(ctx context.Context, workflowID string) ([]WorkflowInstance, error)

func (*MemoryStore) GetWorkflowStats added in v1.12.0

func (s *MemoryStore) GetWorkflowStats(ctx context.Context) ([]WorkflowStats, error)

func (*MemoryStore) GetWorkflowSteps added in v1.12.0

func (s *MemoryStore) GetWorkflowSteps(ctx context.Context, instanceID int64) ([]WorkflowStep, error)

func (*MemoryStore) ListDeadLetters added in v1.12.0

func (s *MemoryStore) ListDeadLetters(ctx context.Context, offset int, limit int) ([]DeadLetterRecord, int64, error)

func (*MemoryStore) LogEvent added in v1.12.0

func (s *MemoryStore) LogEvent(
	ctx context.Context,
	instanceID int64,
	stepID *int64,
	eventType string,
	payload any,
) error

func (*MemoryStore) PauseActiveStepsAndClearQueue added in v1.12.0

func (s *MemoryStore) PauseActiveStepsAndClearQueue(ctx context.Context, instanceID int64) error

func (*MemoryStore) ReleaseQueueItem added in v1.12.0

func (s *MemoryStore) ReleaseQueueItem(ctx context.Context, queueID int64) error

func (*MemoryStore) RemoveFromQueue added in v1.12.0

func (s *MemoryStore) RemoveFromQueue(ctx context.Context, queueID int64) error

func (*MemoryStore) ReplaceInJoinWaitFor added in v1.12.0

func (s *MemoryStore) ReplaceInJoinWaitFor(ctx context.Context, instanceID int64, joinStepName, virtualStep, realStep string) error

func (*MemoryStore) RequeueDeadLetter added in v1.12.0

func (s *MemoryStore) RequeueDeadLetter(
	ctx context.Context,
	dlqID int64,
	newInput *json.RawMessage,
) error

func (*MemoryStore) RescheduleAndReleaseQueueItem added in v1.12.0

func (s *MemoryStore) RescheduleAndReleaseQueueItem(ctx context.Context, queueID int64, delay time.Duration) error

func (*MemoryStore) SaveWorkflowDefinition added in v1.12.0

func (s *MemoryStore) SaveWorkflowDefinition(ctx context.Context, def *WorkflowDefinition) error

func (*MemoryStore) SetAgingEnabled added in v1.12.0

func (s *MemoryStore) SetAgingEnabled(enabled bool)

func (*MemoryStore) SetAgingRate added in v1.12.0

func (s *MemoryStore) SetAgingRate(rate float64)

func (*MemoryStore) UpdateInstanceStatus added in v1.12.0

func (s *MemoryStore) UpdateInstanceStatus(
	ctx context.Context,
	instanceID int64,
	status WorkflowStatus,
	output json.RawMessage,
	errMsg *string,
) error

func (*MemoryStore) UpdateJoinState added in v1.12.0

func (s *MemoryStore) UpdateJoinState(
	ctx context.Context,
	instanceID int64,
	joinStepName, completedStep string,
	success bool,
) (bool, error)

func (*MemoryStore) UpdateStep added in v1.12.0

func (s *MemoryStore) UpdateStep(
	ctx context.Context,
	stepID int64,
	status StepStatus,
	output json.RawMessage,
	errMsg *string,
) error

func (*MemoryStore) UpdateStepCompensationRetry added in v1.12.0

func (s *MemoryStore) UpdateStepCompensationRetry(
	ctx context.Context,
	stepID int64,
	retryCount int,
	status StepStatus,
) error

func (*MemoryStore) UpdateStepStatus added in v1.12.0

func (s *MemoryStore) UpdateStepStatus(ctx context.Context, stepID int64, status StepStatus) error

type MemoryTxManager added in v1.12.0

type MemoryTxManager struct{}

func NewMemoryTxManager added in v1.12.0

func NewMemoryTxManager() *MemoryTxManager

func (*MemoryTxManager) ReadCommitted added in v1.12.0

func (m *MemoryTxManager) ReadCommitted(ctx context.Context, fn func(ctx context.Context) error) error

func (*MemoryTxManager) RepeatableRead added in v1.12.0

func (m *MemoryTxManager) RepeatableRead(ctx context.Context, fn func(ctx context.Context) error) error

type MockIEngine added in v1.5.0

type MockIEngine struct {
	mock.Mock
}

MockIEngine is an autogenerated mock type for the IEngine type

func NewMockIEngine added in v1.5.0

func NewMockIEngine(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockIEngine

NewMockIEngine creates a new instance of MockIEngine. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockIEngine) AbortWorkflow added in v1.6.0

func (_mock *MockIEngine) AbortWorkflow(ctx context.Context, instanceID int64, requestedBy string, reason string) error

AbortWorkflow provides a mock function for the type MockIEngine

func (*MockIEngine) CancelWorkflow added in v1.6.0

func (_mock *MockIEngine) CancelWorkflow(ctx context.Context, instanceID int64, requestedBy string, reason string) error

CancelWorkflow provides a mock function for the type MockIEngine

func (*MockIEngine) EXPECT added in v1.5.0

func (_m *MockIEngine) EXPECT() *MockIEngine_Expecter

func (*MockIEngine) MakeHumanDecision added in v1.5.0

func (_mock *MockIEngine) MakeHumanDecision(ctx context.Context, stepID int64, decidedBy string, decision HumanDecision, comment *string) error

MakeHumanDecision provides a mock function for the type MockIEngine

func (*MockIEngine) RequeueFromDLQ added in v1.9.0

func (_mock *MockIEngine) RequeueFromDLQ(ctx context.Context, dlqID int64, newInput *json.RawMessage) error

RequeueFromDLQ provides a mock function for the type MockIEngine

type MockIEngine_AbortWorkflow_Call added in v1.6.0

type MockIEngine_AbortWorkflow_Call struct {
	*mock.Call
}

MockIEngine_AbortWorkflow_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AbortWorkflow'

func (*MockIEngine_AbortWorkflow_Call) Return added in v1.6.0

func (*MockIEngine_AbortWorkflow_Call) Run added in v1.6.0

func (_c *MockIEngine_AbortWorkflow_Call) Run(run func(ctx context.Context, instanceID int64, requestedBy string, reason string)) *MockIEngine_AbortWorkflow_Call

func (*MockIEngine_AbortWorkflow_Call) RunAndReturn added in v1.6.0

func (_c *MockIEngine_AbortWorkflow_Call) RunAndReturn(run func(ctx context.Context, instanceID int64, requestedBy string, reason string) error) *MockIEngine_AbortWorkflow_Call

type MockIEngine_CancelWorkflow_Call added in v1.6.0

type MockIEngine_CancelWorkflow_Call struct {
	*mock.Call
}

MockIEngine_CancelWorkflow_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CancelWorkflow'

func (*MockIEngine_CancelWorkflow_Call) Return added in v1.6.0

func (*MockIEngine_CancelWorkflow_Call) Run added in v1.6.0

func (_c *MockIEngine_CancelWorkflow_Call) Run(run func(ctx context.Context, instanceID int64, requestedBy string, reason string)) *MockIEngine_CancelWorkflow_Call

func (*MockIEngine_CancelWorkflow_Call) RunAndReturn added in v1.6.0

func (_c *MockIEngine_CancelWorkflow_Call) RunAndReturn(run func(ctx context.Context, instanceID int64, requestedBy string, reason string) error) *MockIEngine_CancelWorkflow_Call

type MockIEngine_Expecter added in v1.5.0

type MockIEngine_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockIEngine_Expecter) AbortWorkflow added in v1.6.0

func (_e *MockIEngine_Expecter) AbortWorkflow(ctx interface{}, instanceID interface{}, requestedBy interface{}, reason interface{}) *MockIEngine_AbortWorkflow_Call

AbortWorkflow is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64
  • requestedBy string
  • reason string

func (*MockIEngine_Expecter) CancelWorkflow added in v1.6.0

func (_e *MockIEngine_Expecter) CancelWorkflow(ctx interface{}, instanceID interface{}, requestedBy interface{}, reason interface{}) *MockIEngine_CancelWorkflow_Call

CancelWorkflow is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64
  • requestedBy string
  • reason string

func (*MockIEngine_Expecter) MakeHumanDecision added in v1.5.0

func (_e *MockIEngine_Expecter) MakeHumanDecision(ctx interface{}, stepID interface{}, decidedBy interface{}, decision interface{}, comment interface{}) *MockIEngine_MakeHumanDecision_Call

MakeHumanDecision is a helper method to define mock.On call

  • ctx context.Context
  • stepID int64
  • decidedBy string
  • decision HumanDecision
  • comment *string

func (*MockIEngine_Expecter) RequeueFromDLQ added in v1.9.0

func (_e *MockIEngine_Expecter) RequeueFromDLQ(ctx interface{}, dlqID interface{}, newInput interface{}) *MockIEngine_RequeueFromDLQ_Call

RequeueFromDLQ is a helper method to define mock.On call

  • ctx context.Context
  • dlqID int64
  • newInput *json.RawMessage

type MockIEngine_MakeHumanDecision_Call added in v1.5.0

type MockIEngine_MakeHumanDecision_Call struct {
	*mock.Call
}

MockIEngine_MakeHumanDecision_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MakeHumanDecision'

func (*MockIEngine_MakeHumanDecision_Call) Return added in v1.5.0

func (*MockIEngine_MakeHumanDecision_Call) Run added in v1.5.0

func (_c *MockIEngine_MakeHumanDecision_Call) Run(run func(ctx context.Context, stepID int64, decidedBy string, decision HumanDecision, comment *string)) *MockIEngine_MakeHumanDecision_Call

func (*MockIEngine_MakeHumanDecision_Call) RunAndReturn added in v1.5.0

func (_c *MockIEngine_MakeHumanDecision_Call) RunAndReturn(run func(ctx context.Context, stepID int64, decidedBy string, decision HumanDecision, comment *string) error) *MockIEngine_MakeHumanDecision_Call

type MockIEngine_RequeueFromDLQ_Call added in v1.9.0

type MockIEngine_RequeueFromDLQ_Call struct {
	*mock.Call
}

MockIEngine_RequeueFromDLQ_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RequeueFromDLQ'

func (*MockIEngine_RequeueFromDLQ_Call) Return added in v1.9.0

func (*MockIEngine_RequeueFromDLQ_Call) Run added in v1.9.0

func (*MockIEngine_RequeueFromDLQ_Call) RunAndReturn added in v1.9.0

func (_c *MockIEngine_RequeueFromDLQ_Call) RunAndReturn(run func(ctx context.Context, dlqID int64, newInput *json.RawMessage) error) *MockIEngine_RequeueFromDLQ_Call

type MockMonitor added in v1.5.0

type MockMonitor struct {
	mock.Mock
}

MockMonitor is an autogenerated mock type for the Monitor type

func NewMockMonitor added in v1.5.0

func NewMockMonitor(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockMonitor

NewMockMonitor creates a new instance of MockMonitor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockMonitor) EXPECT added in v1.5.0

func (_m *MockMonitor) EXPECT() *MockMonitor_Expecter

func (*MockMonitor) GetWorkflowStats added in v1.5.0

func (_mock *MockMonitor) GetWorkflowStats(ctx context.Context) ([]WorkflowStats, error)

GetWorkflowStats provides a mock function for the type MockMonitor

type MockMonitor_Expecter added in v1.5.0

type MockMonitor_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockMonitor_Expecter) GetWorkflowStats added in v1.5.0

func (_e *MockMonitor_Expecter) GetWorkflowStats(ctx interface{}) *MockMonitor_GetWorkflowStats_Call

GetWorkflowStats is a helper method to define mock.On call

  • ctx context.Context

type MockMonitor_GetWorkflowStats_Call added in v1.5.0

type MockMonitor_GetWorkflowStats_Call struct {
	*mock.Call
}

MockMonitor_GetWorkflowStats_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWorkflowStats'

func (*MockMonitor_GetWorkflowStats_Call) Return added in v1.5.0

func (*MockMonitor_GetWorkflowStats_Call) Run added in v1.5.0

func (*MockMonitor_GetWorkflowStats_Call) RunAndReturn added in v1.5.0

type MockPlugin added in v1.8.0

type MockPlugin struct {
	mock.Mock
}

MockPlugin is an autogenerated mock type for the Plugin type

func NewMockPlugin added in v1.8.0

func NewMockPlugin(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockPlugin

NewMockPlugin creates a new instance of MockPlugin. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockPlugin) EXPECT added in v1.8.0

func (_m *MockPlugin) EXPECT() *MockPlugin_Expecter

func (*MockPlugin) Name added in v1.8.0

func (_mock *MockPlugin) Name() string

Name provides a mock function for the type MockPlugin

func (*MockPlugin) OnRollbackStepChain added in v1.10.0

func (_mock *MockPlugin) OnRollbackStepChain(ctx context.Context, instanceID int64, stepName string, depth int) error

OnRollbackStepChain provides a mock function for the type MockPlugin

func (*MockPlugin) OnStepComplete added in v1.8.0

func (_mock *MockPlugin) OnStepComplete(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error

OnStepComplete provides a mock function for the type MockPlugin

func (*MockPlugin) OnStepFailed added in v1.8.0

func (_mock *MockPlugin) OnStepFailed(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep, err error) error

OnStepFailed provides a mock function for the type MockPlugin

func (*MockPlugin) OnStepStart added in v1.8.0

func (_mock *MockPlugin) OnStepStart(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error

OnStepStart provides a mock function for the type MockPlugin

func (*MockPlugin) OnWorkflowComplete added in v1.8.0

func (_mock *MockPlugin) OnWorkflowComplete(ctx context.Context, instance *WorkflowInstance) error

OnWorkflowComplete provides a mock function for the type MockPlugin

func (*MockPlugin) OnWorkflowFailed added in v1.8.0

func (_mock *MockPlugin) OnWorkflowFailed(ctx context.Context, instance *WorkflowInstance) error

OnWorkflowFailed provides a mock function for the type MockPlugin

func (*MockPlugin) OnWorkflowStart added in v1.8.0

func (_mock *MockPlugin) OnWorkflowStart(ctx context.Context, instance *WorkflowInstance) error

OnWorkflowStart provides a mock function for the type MockPlugin

func (*MockPlugin) Priority added in v1.8.0

func (_mock *MockPlugin) Priority() Priority

Priority provides a mock function for the type MockPlugin

type MockPlugin_Expecter added in v1.8.0

type MockPlugin_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockPlugin_Expecter) Name added in v1.8.0

Name is a helper method to define mock.On call

func (*MockPlugin_Expecter) OnRollbackStepChain added in v1.10.0

func (_e *MockPlugin_Expecter) OnRollbackStepChain(ctx interface{}, instanceID interface{}, stepName interface{}, depth interface{}) *MockPlugin_OnRollbackStepChain_Call

OnRollbackStepChain is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64
  • stepName string
  • depth int

func (*MockPlugin_Expecter) OnStepComplete added in v1.8.0

func (_e *MockPlugin_Expecter) OnStepComplete(ctx interface{}, instance interface{}, step interface{}) *MockPlugin_OnStepComplete_Call

OnStepComplete is a helper method to define mock.On call

  • ctx context.Context
  • instance *WorkflowInstance
  • step *WorkflowStep

func (*MockPlugin_Expecter) OnStepFailed added in v1.8.0

func (_e *MockPlugin_Expecter) OnStepFailed(ctx interface{}, instance interface{}, step interface{}, err interface{}) *MockPlugin_OnStepFailed_Call

OnStepFailed is a helper method to define mock.On call

  • ctx context.Context
  • instance *WorkflowInstance
  • step *WorkflowStep
  • err error

func (*MockPlugin_Expecter) OnStepStart added in v1.8.0

func (_e *MockPlugin_Expecter) OnStepStart(ctx interface{}, instance interface{}, step interface{}) *MockPlugin_OnStepStart_Call

OnStepStart is a helper method to define mock.On call

  • ctx context.Context
  • instance *WorkflowInstance
  • step *WorkflowStep

func (*MockPlugin_Expecter) OnWorkflowComplete added in v1.8.0

func (_e *MockPlugin_Expecter) OnWorkflowComplete(ctx interface{}, instance interface{}) *MockPlugin_OnWorkflowComplete_Call

OnWorkflowComplete is a helper method to define mock.On call

  • ctx context.Context
  • instance *WorkflowInstance

func (*MockPlugin_Expecter) OnWorkflowFailed added in v1.8.0

func (_e *MockPlugin_Expecter) OnWorkflowFailed(ctx interface{}, instance interface{}) *MockPlugin_OnWorkflowFailed_Call

OnWorkflowFailed is a helper method to define mock.On call

  • ctx context.Context
  • instance *WorkflowInstance

func (*MockPlugin_Expecter) OnWorkflowStart added in v1.8.0

func (_e *MockPlugin_Expecter) OnWorkflowStart(ctx interface{}, instance interface{}) *MockPlugin_OnWorkflowStart_Call

OnWorkflowStart is a helper method to define mock.On call

  • ctx context.Context
  • instance *WorkflowInstance

func (*MockPlugin_Expecter) Priority added in v1.8.0

Priority is a helper method to define mock.On call

type MockPlugin_Name_Call added in v1.8.0

type MockPlugin_Name_Call struct {
	*mock.Call
}

MockPlugin_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name'

func (*MockPlugin_Name_Call) Return added in v1.8.0

func (*MockPlugin_Name_Call) Run added in v1.8.0

func (_c *MockPlugin_Name_Call) Run(run func()) *MockPlugin_Name_Call

func (*MockPlugin_Name_Call) RunAndReturn added in v1.8.0

func (_c *MockPlugin_Name_Call) RunAndReturn(run func() string) *MockPlugin_Name_Call

type MockPlugin_OnRollbackStepChain_Call added in v1.10.0

type MockPlugin_OnRollbackStepChain_Call struct {
	*mock.Call
}

MockPlugin_OnRollbackStepChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnRollbackStepChain'

func (*MockPlugin_OnRollbackStepChain_Call) Return added in v1.10.0

func (*MockPlugin_OnRollbackStepChain_Call) Run added in v1.10.0

func (_c *MockPlugin_OnRollbackStepChain_Call) Run(run func(ctx context.Context, instanceID int64, stepName string, depth int)) *MockPlugin_OnRollbackStepChain_Call

func (*MockPlugin_OnRollbackStepChain_Call) RunAndReturn added in v1.10.0

func (_c *MockPlugin_OnRollbackStepChain_Call) RunAndReturn(run func(ctx context.Context, instanceID int64, stepName string, depth int) error) *MockPlugin_OnRollbackStepChain_Call

type MockPlugin_OnStepComplete_Call added in v1.8.0

type MockPlugin_OnStepComplete_Call struct {
	*mock.Call
}

MockPlugin_OnStepComplete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnStepComplete'

func (*MockPlugin_OnStepComplete_Call) Return added in v1.8.0

func (*MockPlugin_OnStepComplete_Call) Run added in v1.8.0

func (*MockPlugin_OnStepComplete_Call) RunAndReturn added in v1.8.0

type MockPlugin_OnStepFailed_Call added in v1.8.0

type MockPlugin_OnStepFailed_Call struct {
	*mock.Call
}

MockPlugin_OnStepFailed_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnStepFailed'

func (*MockPlugin_OnStepFailed_Call) Return added in v1.8.0

func (*MockPlugin_OnStepFailed_Call) Run added in v1.8.0

func (*MockPlugin_OnStepFailed_Call) RunAndReturn added in v1.8.0

func (_c *MockPlugin_OnStepFailed_Call) RunAndReturn(run func(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep, err error) error) *MockPlugin_OnStepFailed_Call

type MockPlugin_OnStepStart_Call added in v1.8.0

type MockPlugin_OnStepStart_Call struct {
	*mock.Call
}

MockPlugin_OnStepStart_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnStepStart'

func (*MockPlugin_OnStepStart_Call) Return added in v1.8.0

func (*MockPlugin_OnStepStart_Call) Run added in v1.8.0

func (*MockPlugin_OnStepStart_Call) RunAndReturn added in v1.8.0

type MockPlugin_OnWorkflowComplete_Call added in v1.8.0

type MockPlugin_OnWorkflowComplete_Call struct {
	*mock.Call
}

MockPlugin_OnWorkflowComplete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnWorkflowComplete'

func (*MockPlugin_OnWorkflowComplete_Call) Return added in v1.8.0

func (*MockPlugin_OnWorkflowComplete_Call) Run added in v1.8.0

func (*MockPlugin_OnWorkflowComplete_Call) RunAndReturn added in v1.8.0

type MockPlugin_OnWorkflowFailed_Call added in v1.8.0

type MockPlugin_OnWorkflowFailed_Call struct {
	*mock.Call
}

MockPlugin_OnWorkflowFailed_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnWorkflowFailed'

func (*MockPlugin_OnWorkflowFailed_Call) Return added in v1.8.0

func (*MockPlugin_OnWorkflowFailed_Call) Run added in v1.8.0

func (*MockPlugin_OnWorkflowFailed_Call) RunAndReturn added in v1.8.0

type MockPlugin_OnWorkflowStart_Call added in v1.8.0

type MockPlugin_OnWorkflowStart_Call struct {
	*mock.Call
}

MockPlugin_OnWorkflowStart_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnWorkflowStart'

func (*MockPlugin_OnWorkflowStart_Call) Return added in v1.8.0

func (*MockPlugin_OnWorkflowStart_Call) Run added in v1.8.0

func (*MockPlugin_OnWorkflowStart_Call) RunAndReturn added in v1.8.0

type MockPlugin_Priority_Call added in v1.8.0

type MockPlugin_Priority_Call struct {
	*mock.Call
}

MockPlugin_Priority_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Priority'

func (*MockPlugin_Priority_Call) Return added in v1.8.0

func (*MockPlugin_Priority_Call) Run added in v1.8.0

func (*MockPlugin_Priority_Call) RunAndReturn added in v1.8.0

func (_c *MockPlugin_Priority_Call) RunAndReturn(run func() Priority) *MockPlugin_Priority_Call

type MockStepContext added in v1.5.0

type MockStepContext struct {
	mock.Mock
}

MockStepContext is an autogenerated mock type for the StepContext type

func NewMockStepContext added in v1.5.0

func NewMockStepContext(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockStepContext

NewMockStepContext creates a new instance of MockStepContext. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockStepContext) CloneData added in v1.5.0

func (_mock *MockStepContext) CloneData() map[string]any

CloneData provides a mock function for the type MockStepContext

func (*MockStepContext) EXPECT added in v1.5.0

func (*MockStepContext) GetVariable added in v1.5.0

func (_mock *MockStepContext) GetVariable(key string) (any, bool)

GetVariable provides a mock function for the type MockStepContext

func (*MockStepContext) GetVariableAsString added in v1.5.0

func (_mock *MockStepContext) GetVariableAsString(key string) (string, bool)

GetVariableAsString provides a mock function for the type MockStepContext

func (*MockStepContext) IdempotencyKey added in v1.5.0

func (_mock *MockStepContext) IdempotencyKey() string

IdempotencyKey provides a mock function for the type MockStepContext

func (*MockStepContext) InstanceID added in v1.5.0

func (_mock *MockStepContext) InstanceID() int64

InstanceID provides a mock function for the type MockStepContext

func (*MockStepContext) RetryCount added in v1.5.0

func (_mock *MockStepContext) RetryCount() int

RetryCount provides a mock function for the type MockStepContext

func (*MockStepContext) StepName added in v1.5.0

func (_mock *MockStepContext) StepName() string

StepName provides a mock function for the type MockStepContext

type MockStepContext_CloneData_Call added in v1.5.0

type MockStepContext_CloneData_Call struct {
	*mock.Call
}

MockStepContext_CloneData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CloneData'

func (*MockStepContext_CloneData_Call) Return added in v1.5.0

func (*MockStepContext_CloneData_Call) Run added in v1.5.0

func (*MockStepContext_CloneData_Call) RunAndReturn added in v1.5.0

func (_c *MockStepContext_CloneData_Call) RunAndReturn(run func() map[string]any) *MockStepContext_CloneData_Call

type MockStepContext_Expecter added in v1.5.0

type MockStepContext_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockStepContext_Expecter) CloneData added in v1.5.0

CloneData is a helper method to define mock.On call

func (*MockStepContext_Expecter) GetVariable added in v1.5.0

func (_e *MockStepContext_Expecter) GetVariable(key interface{}) *MockStepContext_GetVariable_Call

GetVariable is a helper method to define mock.On call

  • key string

func (*MockStepContext_Expecter) GetVariableAsString added in v1.5.0

func (_e *MockStepContext_Expecter) GetVariableAsString(key interface{}) *MockStepContext_GetVariableAsString_Call

GetVariableAsString is a helper method to define mock.On call

  • key string

func (*MockStepContext_Expecter) IdempotencyKey added in v1.5.0

IdempotencyKey is a helper method to define mock.On call

func (*MockStepContext_Expecter) InstanceID added in v1.5.0

InstanceID is a helper method to define mock.On call

func (*MockStepContext_Expecter) RetryCount added in v1.5.0

RetryCount is a helper method to define mock.On call

func (*MockStepContext_Expecter) StepName added in v1.5.0

StepName is a helper method to define mock.On call

type MockStepContext_GetVariableAsString_Call added in v1.5.0

type MockStepContext_GetVariableAsString_Call struct {
	*mock.Call
}

MockStepContext_GetVariableAsString_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetVariableAsString'

func (*MockStepContext_GetVariableAsString_Call) Return added in v1.5.0

func (*MockStepContext_GetVariableAsString_Call) Run added in v1.5.0

func (*MockStepContext_GetVariableAsString_Call) RunAndReturn added in v1.5.0

type MockStepContext_GetVariable_Call added in v1.5.0

type MockStepContext_GetVariable_Call struct {
	*mock.Call
}

MockStepContext_GetVariable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetVariable'

func (*MockStepContext_GetVariable_Call) Return added in v1.5.0

func (*MockStepContext_GetVariable_Call) Run added in v1.5.0

func (*MockStepContext_GetVariable_Call) RunAndReturn added in v1.5.0

type MockStepContext_IdempotencyKey_Call added in v1.5.0

type MockStepContext_IdempotencyKey_Call struct {
	*mock.Call
}

MockStepContext_IdempotencyKey_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IdempotencyKey'

func (*MockStepContext_IdempotencyKey_Call) Return added in v1.5.0

func (*MockStepContext_IdempotencyKey_Call) Run added in v1.5.0

func (*MockStepContext_IdempotencyKey_Call) RunAndReturn added in v1.5.0

type MockStepContext_InstanceID_Call added in v1.5.0

type MockStepContext_InstanceID_Call struct {
	*mock.Call
}

MockStepContext_InstanceID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InstanceID'

func (*MockStepContext_InstanceID_Call) Return added in v1.5.0

func (*MockStepContext_InstanceID_Call) Run added in v1.5.0

func (*MockStepContext_InstanceID_Call) RunAndReturn added in v1.5.0

type MockStepContext_RetryCount_Call added in v1.5.0

type MockStepContext_RetryCount_Call struct {
	*mock.Call
}

MockStepContext_RetryCount_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetryCount'

func (*MockStepContext_RetryCount_Call) Return added in v1.5.0

func (*MockStepContext_RetryCount_Call) Run added in v1.5.0

func (*MockStepContext_RetryCount_Call) RunAndReturn added in v1.5.0

type MockStepContext_StepName_Call added in v1.5.0

type MockStepContext_StepName_Call struct {
	*mock.Call
}

MockStepContext_StepName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StepName'

func (*MockStepContext_StepName_Call) Return added in v1.5.0

func (*MockStepContext_StepName_Call) Run added in v1.5.0

func (*MockStepContext_StepName_Call) RunAndReturn added in v1.5.0

type MockStepHandler added in v1.5.0

type MockStepHandler struct {
	mock.Mock
}

MockStepHandler is an autogenerated mock type for the StepHandler type

func NewMockStepHandler added in v1.5.0

func NewMockStepHandler(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockStepHandler

NewMockStepHandler creates a new instance of MockStepHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockStepHandler) EXPECT added in v1.5.0

func (*MockStepHandler) Execute added in v1.5.0

func (_mock *MockStepHandler) Execute(ctx context.Context, stepCtx StepContext, input json.RawMessage) (json.RawMessage, error)

Execute provides a mock function for the type MockStepHandler

func (*MockStepHandler) Name added in v1.5.0

func (_mock *MockStepHandler) Name() string

Name provides a mock function for the type MockStepHandler

type MockStepHandler_Execute_Call added in v1.5.0

type MockStepHandler_Execute_Call struct {
	*mock.Call
}

MockStepHandler_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute'

func (*MockStepHandler_Execute_Call) Return added in v1.5.0

func (*MockStepHandler_Execute_Call) Run added in v1.5.0

func (*MockStepHandler_Execute_Call) RunAndReturn added in v1.5.0

type MockStepHandler_Expecter added in v1.5.0

type MockStepHandler_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockStepHandler_Expecter) Execute added in v1.5.0

func (_e *MockStepHandler_Expecter) Execute(ctx interface{}, stepCtx interface{}, input interface{}) *MockStepHandler_Execute_Call

Execute is a helper method to define mock.On call

  • ctx context.Context
  • stepCtx StepContext
  • input json.RawMessage

func (*MockStepHandler_Expecter) Name added in v1.5.0

Name is a helper method to define mock.On call

type MockStepHandler_Name_Call added in v1.5.0

type MockStepHandler_Name_Call struct {
	*mock.Call
}

MockStepHandler_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name'

func (*MockStepHandler_Name_Call) Return added in v1.5.0

func (*MockStepHandler_Name_Call) Run added in v1.5.0

func (*MockStepHandler_Name_Call) RunAndReturn added in v1.5.0

func (_c *MockStepHandler_Name_Call) RunAndReturn(run func() string) *MockStepHandler_Name_Call

type MockStore added in v1.5.0

type MockStore struct {
	mock.Mock
}

MockStore is an autogenerated mock type for the Store type

func NewMockStore added in v1.5.0

func NewMockStore(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockStore

NewMockStore creates a new instance of MockStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockStore) AddToJoinWaitFor added in v1.10.0

func (_mock *MockStore) AddToJoinWaitFor(ctx context.Context, instanceID int64, joinStepName string, stepToAdd string) error

AddToJoinWaitFor provides a mock function for the type MockStore

func (*MockStore) CleanupOldWorkflows added in v1.7.0

func (_mock *MockStore) CleanupOldWorkflows(ctx context.Context, daysToKeep int) (int64, error)

CleanupOldWorkflows provides a mock function for the type MockStore

func (*MockStore) CreateCancelRequest added in v1.6.1

func (_mock *MockStore) CreateCancelRequest(ctx context.Context, req *WorkflowCancelRequest) error

CreateCancelRequest provides a mock function for the type MockStore

func (*MockStore) CreateDeadLetterRecord added in v1.9.0

func (_mock *MockStore) CreateDeadLetterRecord(ctx context.Context, rec *DeadLetterRecord) error

CreateDeadLetterRecord provides a mock function for the type MockStore

func (*MockStore) CreateHumanDecision added in v1.5.0

func (_mock *MockStore) CreateHumanDecision(ctx context.Context, decision *HumanDecisionRecord) error

CreateHumanDecision provides a mock function for the type MockStore

func (*MockStore) CreateInstance added in v1.5.0

func (_mock *MockStore) CreateInstance(ctx context.Context, workflowID string, input json.RawMessage) (*WorkflowInstance, error)

CreateInstance provides a mock function for the type MockStore

func (*MockStore) CreateJoinState added in v1.5.0

func (_mock *MockStore) CreateJoinState(ctx context.Context, instanceID int64, joinStepName string, waitingFor []string, strategy JoinStrategy) error

CreateJoinState provides a mock function for the type MockStore

func (*MockStore) CreateStep added in v1.5.0

func (_mock *MockStore) CreateStep(ctx context.Context, step *WorkflowStep) error

CreateStep provides a mock function for the type MockStore

func (*MockStore) DeleteCancelRequest added in v1.6.1

func (_mock *MockStore) DeleteCancelRequest(ctx context.Context, instanceID int64) error

DeleteCancelRequest provides a mock function for the type MockStore

func (*MockStore) DequeueStep added in v1.5.0

func (_mock *MockStore) DequeueStep(ctx context.Context, workerID string) (*QueueItem, error)

DequeueStep provides a mock function for the type MockStore

func (*MockStore) EXPECT added in v1.5.0

func (_m *MockStore) EXPECT() *MockStore_Expecter

func (*MockStore) EnqueueStep added in v1.5.0

func (_mock *MockStore) EnqueueStep(ctx context.Context, instanceID int64, stepID *int64, priority Priority, delay time.Duration) error

EnqueueStep provides a mock function for the type MockStore

func (*MockStore) GetActiveInstances added in v1.5.0

func (_mock *MockStore) GetActiveInstances(ctx context.Context) ([]ActiveWorkflowInstance, error)

GetActiveInstances provides a mock function for the type MockStore

func (*MockStore) GetActiveStepsForUpdate added in v1.8.0

func (_mock *MockStore) GetActiveStepsForUpdate(ctx context.Context, instanceID int64) ([]WorkflowStep, error)

GetActiveStepsForUpdate provides a mock function for the type MockStore

func (*MockStore) GetAllWorkflowInstances added in v1.5.0

func (_mock *MockStore) GetAllWorkflowInstances(ctx context.Context) ([]WorkflowInstance, error)

GetAllWorkflowInstances provides a mock function for the type MockStore

func (*MockStore) GetCancelRequest added in v1.6.1

func (_mock *MockStore) GetCancelRequest(ctx context.Context, instanceID int64) (*WorkflowCancelRequest, error)

GetCancelRequest provides a mock function for the type MockStore

func (*MockStore) GetDeadLetterByID added in v1.9.0

func (_mock *MockStore) GetDeadLetterByID(ctx context.Context, id int64) (*DeadLetterRecord, error)

GetDeadLetterByID provides a mock function for the type MockStore

func (*MockStore) GetHumanDecision added in v1.5.0

func (_mock *MockStore) GetHumanDecision(ctx context.Context, stepID int64) (*HumanDecisionRecord, error)

GetHumanDecision provides a mock function for the type MockStore

func (*MockStore) GetHumanDecisionStepByInstanceID added in v1.5.0

func (_mock *MockStore) GetHumanDecisionStepByInstanceID(ctx context.Context, instanceID int64) (*WorkflowStep, error)

GetHumanDecisionStepByInstanceID provides a mock function for the type MockStore

func (*MockStore) GetInstance added in v1.5.0

func (_mock *MockStore) GetInstance(ctx context.Context, instanceID int64) (*WorkflowInstance, error)

GetInstance provides a mock function for the type MockStore

func (*MockStore) GetJoinState added in v1.5.0

func (_mock *MockStore) GetJoinState(ctx context.Context, instanceID int64, joinStepName string) (*JoinState, error)

GetJoinState provides a mock function for the type MockStore

func (*MockStore) GetStepByID added in v1.5.0

func (_mock *MockStore) GetStepByID(ctx context.Context, stepID int64) (*WorkflowStep, error)

GetStepByID provides a mock function for the type MockStore

func (*MockStore) GetStepsByInstance added in v1.5.0

func (_mock *MockStore) GetStepsByInstance(ctx context.Context, instanceID int64) ([]WorkflowStep, error)

GetStepsByInstance provides a mock function for the type MockStore

func (*MockStore) GetSummaryStats added in v1.5.0

func (_mock *MockStore) GetSummaryStats(ctx context.Context) (*SummaryStats, error)

GetSummaryStats provides a mock function for the type MockStore

func (*MockStore) GetWorkflowDefinition added in v1.5.0

func (_mock *MockStore) GetWorkflowDefinition(ctx context.Context, id string) (*WorkflowDefinition, error)

GetWorkflowDefinition provides a mock function for the type MockStore

func (*MockStore) GetWorkflowDefinitions added in v1.5.0

func (_mock *MockStore) GetWorkflowDefinitions(ctx context.Context) ([]WorkflowDefinition, error)

GetWorkflowDefinitions provides a mock function for the type MockStore

func (*MockStore) GetWorkflowEvents added in v1.5.0

func (_mock *MockStore) GetWorkflowEvents(ctx context.Context, instanceID int64) ([]WorkflowEvent, error)

GetWorkflowEvents provides a mock function for the type MockStore

func (*MockStore) GetWorkflowInstances added in v1.5.0

func (_mock *MockStore) GetWorkflowInstances(ctx context.Context, workflowID string) ([]WorkflowInstance, error)

GetWorkflowInstances provides a mock function for the type MockStore

func (*MockStore) GetWorkflowStats added in v1.5.0

func (_mock *MockStore) GetWorkflowStats(ctx context.Context) ([]WorkflowStats, error)

GetWorkflowStats provides a mock function for the type MockStore

func (*MockStore) GetWorkflowSteps added in v1.5.0

func (_mock *MockStore) GetWorkflowSteps(ctx context.Context, instanceID int64) ([]WorkflowStep, error)

GetWorkflowSteps provides a mock function for the type MockStore

func (*MockStore) ListDeadLetters added in v1.9.0

func (_mock *MockStore) ListDeadLetters(ctx context.Context, offset int, limit int) ([]DeadLetterRecord, int64, error)

ListDeadLetters provides a mock function for the type MockStore

func (*MockStore) LogEvent added in v1.5.0

func (_mock *MockStore) LogEvent(ctx context.Context, instanceID int64, stepID *int64, eventType string, payload any) error

LogEvent provides a mock function for the type MockStore

func (*MockStore) PauseActiveStepsAndClearQueue added in v1.9.0

func (_mock *MockStore) PauseActiveStepsAndClearQueue(ctx context.Context, instanceID int64) error

PauseActiveStepsAndClearQueue provides a mock function for the type MockStore

func (*MockStore) ReleaseQueueItem added in v1.11.0

func (_mock *MockStore) ReleaseQueueItem(ctx context.Context, queueID int64) error

ReleaseQueueItem provides a mock function for the type MockStore

func (*MockStore) RemoveFromQueue added in v1.5.0

func (_mock *MockStore) RemoveFromQueue(ctx context.Context, queueID int64) error

RemoveFromQueue provides a mock function for the type MockStore

func (*MockStore) ReplaceInJoinWaitFor added in v1.10.0

func (_mock *MockStore) ReplaceInJoinWaitFor(ctx context.Context, instanceID int64, joinStepName string, virtualStep string, realStep string) error

ReplaceInJoinWaitFor provides a mock function for the type MockStore

func (*MockStore) RequeueDeadLetter added in v1.9.0

func (_mock *MockStore) RequeueDeadLetter(ctx context.Context, dlqID int64, newInput *json.RawMessage) error

RequeueDeadLetter provides a mock function for the type MockStore

func (*MockStore) RescheduleAndReleaseQueueItem added in v1.11.0

func (_mock *MockStore) RescheduleAndReleaseQueueItem(ctx context.Context, queueID int64, delay time.Duration) error

RescheduleAndReleaseQueueItem provides a mock function for the type MockStore

func (*MockStore) SaveWorkflowDefinition added in v1.5.0

func (_mock *MockStore) SaveWorkflowDefinition(ctx context.Context, def *WorkflowDefinition) error

SaveWorkflowDefinition provides a mock function for the type MockStore

func (*MockStore) SetAgingEnabled added in v1.11.2

func (_mock *MockStore) SetAgingEnabled(enabled bool)

SetAgingEnabled provides a mock function for the type MockStore

func (*MockStore) SetAgingRate added in v1.11.2

func (_mock *MockStore) SetAgingRate(rate float64)

SetAgingRate provides a mock function for the type MockStore

func (*MockStore) UpdateInstanceStatus added in v1.5.0

func (_mock *MockStore) UpdateInstanceStatus(ctx context.Context, instanceID int64, status WorkflowStatus, output json.RawMessage, errMsg *string) error

UpdateInstanceStatus provides a mock function for the type MockStore

func (*MockStore) UpdateJoinState added in v1.5.0

func (_mock *MockStore) UpdateJoinState(ctx context.Context, instanceID int64, joinStepName string, completedStep string, success bool) (bool, error)

UpdateJoinState provides a mock function for the type MockStore

func (*MockStore) UpdateStep added in v1.5.0

func (_mock *MockStore) UpdateStep(ctx context.Context, stepID int64, status StepStatus, output json.RawMessage, errMsg *string) error

UpdateStep provides a mock function for the type MockStore

func (*MockStore) UpdateStepCompensationRetry added in v1.5.0

func (_mock *MockStore) UpdateStepCompensationRetry(ctx context.Context, stepID int64, retryCount int, status StepStatus) error

UpdateStepCompensationRetry provides a mock function for the type MockStore

func (*MockStore) UpdateStepStatus added in v1.5.0

func (_mock *MockStore) UpdateStepStatus(ctx context.Context, stepID int64, status StepStatus) error

UpdateStepStatus provides a mock function for the type MockStore

type MockStore_AddToJoinWaitFor_Call added in v1.10.0

type MockStore_AddToJoinWaitFor_Call struct {
	*mock.Call
}

MockStore_AddToJoinWaitFor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddToJoinWaitFor'

func (*MockStore_AddToJoinWaitFor_Call) Return added in v1.10.0

func (*MockStore_AddToJoinWaitFor_Call) Run added in v1.10.0

func (_c *MockStore_AddToJoinWaitFor_Call) Run(run func(ctx context.Context, instanceID int64, joinStepName string, stepToAdd string)) *MockStore_AddToJoinWaitFor_Call

func (*MockStore_AddToJoinWaitFor_Call) RunAndReturn added in v1.10.0

func (_c *MockStore_AddToJoinWaitFor_Call) RunAndReturn(run func(ctx context.Context, instanceID int64, joinStepName string, stepToAdd string) error) *MockStore_AddToJoinWaitFor_Call

type MockStore_CleanupOldWorkflows_Call added in v1.7.0

type MockStore_CleanupOldWorkflows_Call struct {
	*mock.Call
}

MockStore_CleanupOldWorkflows_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CleanupOldWorkflows'

func (*MockStore_CleanupOldWorkflows_Call) Return added in v1.7.0

func (*MockStore_CleanupOldWorkflows_Call) Run added in v1.7.0

func (*MockStore_CleanupOldWorkflows_Call) RunAndReturn added in v1.7.0

type MockStore_CreateCancelRequest_Call added in v1.6.1

type MockStore_CreateCancelRequest_Call struct {
	*mock.Call
}

MockStore_CreateCancelRequest_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateCancelRequest'

func (*MockStore_CreateCancelRequest_Call) Return added in v1.6.1

func (*MockStore_CreateCancelRequest_Call) Run added in v1.6.1

func (*MockStore_CreateCancelRequest_Call) RunAndReturn added in v1.6.1

type MockStore_CreateDeadLetterRecord_Call added in v1.9.0

type MockStore_CreateDeadLetterRecord_Call struct {
	*mock.Call
}

MockStore_CreateDeadLetterRecord_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateDeadLetterRecord'

func (*MockStore_CreateDeadLetterRecord_Call) Return added in v1.9.0

func (*MockStore_CreateDeadLetterRecord_Call) Run added in v1.9.0

func (*MockStore_CreateDeadLetterRecord_Call) RunAndReturn added in v1.9.0

type MockStore_CreateHumanDecision_Call added in v1.5.0

type MockStore_CreateHumanDecision_Call struct {
	*mock.Call
}

MockStore_CreateHumanDecision_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateHumanDecision'

func (*MockStore_CreateHumanDecision_Call) Return added in v1.5.0

func (*MockStore_CreateHumanDecision_Call) Run added in v1.5.0

func (*MockStore_CreateHumanDecision_Call) RunAndReturn added in v1.5.0

type MockStore_CreateInstance_Call added in v1.5.0

type MockStore_CreateInstance_Call struct {
	*mock.Call
}

MockStore_CreateInstance_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateInstance'

func (*MockStore_CreateInstance_Call) Return added in v1.5.0

func (*MockStore_CreateInstance_Call) Run added in v1.5.0

func (*MockStore_CreateInstance_Call) RunAndReturn added in v1.5.0

type MockStore_CreateJoinState_Call added in v1.5.0

type MockStore_CreateJoinState_Call struct {
	*mock.Call
}

MockStore_CreateJoinState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateJoinState'

func (*MockStore_CreateJoinState_Call) Return added in v1.5.0

func (*MockStore_CreateJoinState_Call) Run added in v1.5.0

func (_c *MockStore_CreateJoinState_Call) Run(run func(ctx context.Context, instanceID int64, joinStepName string, waitingFor []string, strategy JoinStrategy)) *MockStore_CreateJoinState_Call

func (*MockStore_CreateJoinState_Call) RunAndReturn added in v1.5.0

func (_c *MockStore_CreateJoinState_Call) RunAndReturn(run func(ctx context.Context, instanceID int64, joinStepName string, waitingFor []string, strategy JoinStrategy) error) *MockStore_CreateJoinState_Call

type MockStore_CreateStep_Call added in v1.5.0

type MockStore_CreateStep_Call struct {
	*mock.Call
}

MockStore_CreateStep_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateStep'

func (*MockStore_CreateStep_Call) Return added in v1.5.0

func (*MockStore_CreateStep_Call) Run added in v1.5.0

func (*MockStore_CreateStep_Call) RunAndReturn added in v1.5.0

func (_c *MockStore_CreateStep_Call) RunAndReturn(run func(ctx context.Context, step *WorkflowStep) error) *MockStore_CreateStep_Call

type MockStore_DeleteCancelRequest_Call added in v1.6.1

type MockStore_DeleteCancelRequest_Call struct {
	*mock.Call
}

MockStore_DeleteCancelRequest_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteCancelRequest'

func (*MockStore_DeleteCancelRequest_Call) Return added in v1.6.1

func (*MockStore_DeleteCancelRequest_Call) Run added in v1.6.1

func (*MockStore_DeleteCancelRequest_Call) RunAndReturn added in v1.6.1

type MockStore_DequeueStep_Call added in v1.5.0

type MockStore_DequeueStep_Call struct {
	*mock.Call
}

MockStore_DequeueStep_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DequeueStep'

func (*MockStore_DequeueStep_Call) Return added in v1.5.0

func (*MockStore_DequeueStep_Call) Run added in v1.5.0

func (*MockStore_DequeueStep_Call) RunAndReturn added in v1.5.0

func (_c *MockStore_DequeueStep_Call) RunAndReturn(run func(ctx context.Context, workerID string) (*QueueItem, error)) *MockStore_DequeueStep_Call

type MockStore_EnqueueStep_Call added in v1.5.0

type MockStore_EnqueueStep_Call struct {
	*mock.Call
}

MockStore_EnqueueStep_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EnqueueStep'

func (*MockStore_EnqueueStep_Call) Return added in v1.5.0

func (*MockStore_EnqueueStep_Call) Run added in v1.5.0

func (_c *MockStore_EnqueueStep_Call) Run(run func(ctx context.Context, instanceID int64, stepID *int64, priority Priority, delay time.Duration)) *MockStore_EnqueueStep_Call

func (*MockStore_EnqueueStep_Call) RunAndReturn added in v1.5.0

func (_c *MockStore_EnqueueStep_Call) RunAndReturn(run func(ctx context.Context, instanceID int64, stepID *int64, priority Priority, delay time.Duration) error) *MockStore_EnqueueStep_Call

type MockStore_Expecter added in v1.5.0

type MockStore_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockStore_Expecter) AddToJoinWaitFor added in v1.10.0

func (_e *MockStore_Expecter) AddToJoinWaitFor(ctx interface{}, instanceID interface{}, joinStepName interface{}, stepToAdd interface{}) *MockStore_AddToJoinWaitFor_Call

AddToJoinWaitFor is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64
  • joinStepName string
  • stepToAdd string

func (*MockStore_Expecter) CleanupOldWorkflows added in v1.7.0

func (_e *MockStore_Expecter) CleanupOldWorkflows(ctx interface{}, daysToKeep interface{}) *MockStore_CleanupOldWorkflows_Call

CleanupOldWorkflows is a helper method to define mock.On call

  • ctx context.Context
  • daysToKeep int

func (*MockStore_Expecter) CreateCancelRequest added in v1.6.1

func (_e *MockStore_Expecter) CreateCancelRequest(ctx interface{}, req interface{}) *MockStore_CreateCancelRequest_Call

CreateCancelRequest is a helper method to define mock.On call

  • ctx context.Context
  • req *WorkflowCancelRequest

func (*MockStore_Expecter) CreateDeadLetterRecord added in v1.9.0

func (_e *MockStore_Expecter) CreateDeadLetterRecord(ctx interface{}, rec interface{}) *MockStore_CreateDeadLetterRecord_Call

CreateDeadLetterRecord is a helper method to define mock.On call

  • ctx context.Context
  • rec *DeadLetterRecord

func (*MockStore_Expecter) CreateHumanDecision added in v1.5.0

func (_e *MockStore_Expecter) CreateHumanDecision(ctx interface{}, decision interface{}) *MockStore_CreateHumanDecision_Call

CreateHumanDecision is a helper method to define mock.On call

  • ctx context.Context
  • decision *HumanDecisionRecord

func (*MockStore_Expecter) CreateInstance added in v1.5.0

func (_e *MockStore_Expecter) CreateInstance(ctx interface{}, workflowID interface{}, input interface{}) *MockStore_CreateInstance_Call

CreateInstance is a helper method to define mock.On call

  • ctx context.Context
  • workflowID string
  • input json.RawMessage

func (*MockStore_Expecter) CreateJoinState added in v1.5.0

func (_e *MockStore_Expecter) CreateJoinState(ctx interface{}, instanceID interface{}, joinStepName interface{}, waitingFor interface{}, strategy interface{}) *MockStore_CreateJoinState_Call

CreateJoinState is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64
  • joinStepName string
  • waitingFor []string
  • strategy JoinStrategy

func (*MockStore_Expecter) CreateStep added in v1.5.0

func (_e *MockStore_Expecter) CreateStep(ctx interface{}, step interface{}) *MockStore_CreateStep_Call

CreateStep is a helper method to define mock.On call

  • ctx context.Context
  • step *WorkflowStep

func (*MockStore_Expecter) DeleteCancelRequest added in v1.6.1

func (_e *MockStore_Expecter) DeleteCancelRequest(ctx interface{}, instanceID interface{}) *MockStore_DeleteCancelRequest_Call

DeleteCancelRequest is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64

func (*MockStore_Expecter) DequeueStep added in v1.5.0

func (_e *MockStore_Expecter) DequeueStep(ctx interface{}, workerID interface{}) *MockStore_DequeueStep_Call

DequeueStep is a helper method to define mock.On call

  • ctx context.Context
  • workerID string

func (*MockStore_Expecter) EnqueueStep added in v1.5.0

func (_e *MockStore_Expecter) EnqueueStep(ctx interface{}, instanceID interface{}, stepID interface{}, priority interface{}, delay interface{}) *MockStore_EnqueueStep_Call

EnqueueStep is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64
  • stepID *int64
  • priority Priority
  • delay time.Duration

func (*MockStore_Expecter) GetActiveInstances added in v1.5.0

func (_e *MockStore_Expecter) GetActiveInstances(ctx interface{}) *MockStore_GetActiveInstances_Call

GetActiveInstances is a helper method to define mock.On call

  • ctx context.Context

func (*MockStore_Expecter) GetActiveStepsForUpdate added in v1.8.0

func (_e *MockStore_Expecter) GetActiveStepsForUpdate(ctx interface{}, instanceID interface{}) *MockStore_GetActiveStepsForUpdate_Call

GetActiveStepsForUpdate is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64

func (*MockStore_Expecter) GetAllWorkflowInstances added in v1.5.0

func (_e *MockStore_Expecter) GetAllWorkflowInstances(ctx interface{}) *MockStore_GetAllWorkflowInstances_Call

GetAllWorkflowInstances is a helper method to define mock.On call

  • ctx context.Context

func (*MockStore_Expecter) GetCancelRequest added in v1.6.1

func (_e *MockStore_Expecter) GetCancelRequest(ctx interface{}, instanceID interface{}) *MockStore_GetCancelRequest_Call

GetCancelRequest is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64

func (*MockStore_Expecter) GetDeadLetterByID added in v1.9.0

func (_e *MockStore_Expecter) GetDeadLetterByID(ctx interface{}, id interface{}) *MockStore_GetDeadLetterByID_Call

GetDeadLetterByID is a helper method to define mock.On call

  • ctx context.Context
  • id int64

func (*MockStore_Expecter) GetHumanDecision added in v1.5.0

func (_e *MockStore_Expecter) GetHumanDecision(ctx interface{}, stepID interface{}) *MockStore_GetHumanDecision_Call

GetHumanDecision is a helper method to define mock.On call

  • ctx context.Context
  • stepID int64

func (*MockStore_Expecter) GetHumanDecisionStepByInstanceID added in v1.5.0

func (_e *MockStore_Expecter) GetHumanDecisionStepByInstanceID(ctx interface{}, instanceID interface{}) *MockStore_GetHumanDecisionStepByInstanceID_Call

GetHumanDecisionStepByInstanceID is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64

func (*MockStore_Expecter) GetInstance added in v1.5.0

func (_e *MockStore_Expecter) GetInstance(ctx interface{}, instanceID interface{}) *MockStore_GetInstance_Call

GetInstance is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64

func (*MockStore_Expecter) GetJoinState added in v1.5.0

func (_e *MockStore_Expecter) GetJoinState(ctx interface{}, instanceID interface{}, joinStepName interface{}) *MockStore_GetJoinState_Call

GetJoinState is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64
  • joinStepName string

func (*MockStore_Expecter) GetStepByID added in v1.5.0

func (_e *MockStore_Expecter) GetStepByID(ctx interface{}, stepID interface{}) *MockStore_GetStepByID_Call

GetStepByID is a helper method to define mock.On call

  • ctx context.Context
  • stepID int64

func (*MockStore_Expecter) GetStepsByInstance added in v1.5.0

func (_e *MockStore_Expecter) GetStepsByInstance(ctx interface{}, instanceID interface{}) *MockStore_GetStepsByInstance_Call

GetStepsByInstance is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64

func (*MockStore_Expecter) GetSummaryStats added in v1.5.0

func (_e *MockStore_Expecter) GetSummaryStats(ctx interface{}) *MockStore_GetSummaryStats_Call

GetSummaryStats is a helper method to define mock.On call

  • ctx context.Context

func (*MockStore_Expecter) GetWorkflowDefinition added in v1.5.0

func (_e *MockStore_Expecter) GetWorkflowDefinition(ctx interface{}, id interface{}) *MockStore_GetWorkflowDefinition_Call

GetWorkflowDefinition is a helper method to define mock.On call

  • ctx context.Context
  • id string

func (*MockStore_Expecter) GetWorkflowDefinitions added in v1.5.0

func (_e *MockStore_Expecter) GetWorkflowDefinitions(ctx interface{}) *MockStore_GetWorkflowDefinitions_Call

GetWorkflowDefinitions is a helper method to define mock.On call

  • ctx context.Context

func (*MockStore_Expecter) GetWorkflowEvents added in v1.5.0

func (_e *MockStore_Expecter) GetWorkflowEvents(ctx interface{}, instanceID interface{}) *MockStore_GetWorkflowEvents_Call

GetWorkflowEvents is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64

func (*MockStore_Expecter) GetWorkflowInstances added in v1.5.0

func (_e *MockStore_Expecter) GetWorkflowInstances(ctx interface{}, workflowID interface{}) *MockStore_GetWorkflowInstances_Call

GetWorkflowInstances is a helper method to define mock.On call

  • ctx context.Context
  • workflowID string

func (*MockStore_Expecter) GetWorkflowStats added in v1.5.0

func (_e *MockStore_Expecter) GetWorkflowStats(ctx interface{}) *MockStore_GetWorkflowStats_Call

GetWorkflowStats is a helper method to define mock.On call

  • ctx context.Context

func (*MockStore_Expecter) GetWorkflowSteps added in v1.5.0

func (_e *MockStore_Expecter) GetWorkflowSteps(ctx interface{}, instanceID interface{}) *MockStore_GetWorkflowSteps_Call

GetWorkflowSteps is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64

func (*MockStore_Expecter) ListDeadLetters added in v1.9.0

func (_e *MockStore_Expecter) ListDeadLetters(ctx interface{}, offset interface{}, limit interface{}) *MockStore_ListDeadLetters_Call

ListDeadLetters is a helper method to define mock.On call

  • ctx context.Context
  • offset int
  • limit int

func (*MockStore_Expecter) LogEvent added in v1.5.0

func (_e *MockStore_Expecter) LogEvent(ctx interface{}, instanceID interface{}, stepID interface{}, eventType interface{}, payload interface{}) *MockStore_LogEvent_Call

LogEvent is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64
  • stepID *int64
  • eventType string
  • payload any

func (*MockStore_Expecter) PauseActiveStepsAndClearQueue added in v1.9.0

func (_e *MockStore_Expecter) PauseActiveStepsAndClearQueue(ctx interface{}, instanceID interface{}) *MockStore_PauseActiveStepsAndClearQueue_Call

PauseActiveStepsAndClearQueue is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64

func (*MockStore_Expecter) ReleaseQueueItem added in v1.11.0

func (_e *MockStore_Expecter) ReleaseQueueItem(ctx interface{}, queueID interface{}) *MockStore_ReleaseQueueItem_Call

ReleaseQueueItem is a helper method to define mock.On call

  • ctx context.Context
  • queueID int64

func (*MockStore_Expecter) RemoveFromQueue added in v1.5.0

func (_e *MockStore_Expecter) RemoveFromQueue(ctx interface{}, queueID interface{}) *MockStore_RemoveFromQueue_Call

RemoveFromQueue is a helper method to define mock.On call

  • ctx context.Context
  • queueID int64

func (*MockStore_Expecter) ReplaceInJoinWaitFor added in v1.10.0

func (_e *MockStore_Expecter) ReplaceInJoinWaitFor(ctx interface{}, instanceID interface{}, joinStepName interface{}, virtualStep interface{}, realStep interface{}) *MockStore_ReplaceInJoinWaitFor_Call

ReplaceInJoinWaitFor is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64
  • joinStepName string
  • virtualStep string
  • realStep string

func (*MockStore_Expecter) RequeueDeadLetter added in v1.9.0

func (_e *MockStore_Expecter) RequeueDeadLetter(ctx interface{}, dlqID interface{}, newInput interface{}) *MockStore_RequeueDeadLetter_Call

RequeueDeadLetter is a helper method to define mock.On call

  • ctx context.Context
  • dlqID int64
  • newInput *json.RawMessage

func (*MockStore_Expecter) RescheduleAndReleaseQueueItem added in v1.11.0

func (_e *MockStore_Expecter) RescheduleAndReleaseQueueItem(ctx interface{}, queueID interface{}, delay interface{}) *MockStore_RescheduleAndReleaseQueueItem_Call

RescheduleAndReleaseQueueItem is a helper method to define mock.On call

  • ctx context.Context
  • queueID int64
  • delay time.Duration

func (*MockStore_Expecter) SaveWorkflowDefinition added in v1.5.0

func (_e *MockStore_Expecter) SaveWorkflowDefinition(ctx interface{}, def interface{}) *MockStore_SaveWorkflowDefinition_Call

SaveWorkflowDefinition is a helper method to define mock.On call

  • ctx context.Context
  • def *WorkflowDefinition

func (*MockStore_Expecter) SetAgingEnabled added in v1.11.2

func (_e *MockStore_Expecter) SetAgingEnabled(enabled interface{}) *MockStore_SetAgingEnabled_Call

SetAgingEnabled is a helper method to define mock.On call

  • enabled bool

func (*MockStore_Expecter) SetAgingRate added in v1.11.2

func (_e *MockStore_Expecter) SetAgingRate(rate interface{}) *MockStore_SetAgingRate_Call

SetAgingRate is a helper method to define mock.On call

  • rate float64

func (*MockStore_Expecter) UpdateInstanceStatus added in v1.5.0

func (_e *MockStore_Expecter) UpdateInstanceStatus(ctx interface{}, instanceID interface{}, status interface{}, output interface{}, errMsg interface{}) *MockStore_UpdateInstanceStatus_Call

UpdateInstanceStatus is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64
  • status WorkflowStatus
  • output json.RawMessage
  • errMsg *string

func (*MockStore_Expecter) UpdateJoinState added in v1.5.0

func (_e *MockStore_Expecter) UpdateJoinState(ctx interface{}, instanceID interface{}, joinStepName interface{}, completedStep interface{}, success interface{}) *MockStore_UpdateJoinState_Call

UpdateJoinState is a helper method to define mock.On call

  • ctx context.Context
  • instanceID int64
  • joinStepName string
  • completedStep string
  • success bool

func (*MockStore_Expecter) UpdateStep added in v1.5.0

func (_e *MockStore_Expecter) UpdateStep(ctx interface{}, stepID interface{}, status interface{}, output interface{}, errMsg interface{}) *MockStore_UpdateStep_Call

UpdateStep is a helper method to define mock.On call

  • ctx context.Context
  • stepID int64
  • status StepStatus
  • output json.RawMessage
  • errMsg *string

func (*MockStore_Expecter) UpdateStepCompensationRetry added in v1.5.0

func (_e *MockStore_Expecter) UpdateStepCompensationRetry(ctx interface{}, stepID interface{}, retryCount interface{}, status interface{}) *MockStore_UpdateStepCompensationRetry_Call

UpdateStepCompensationRetry is a helper method to define mock.On call

  • ctx context.Context
  • stepID int64
  • retryCount int
  • status StepStatus

func (*MockStore_Expecter) UpdateStepStatus added in v1.5.0

func (_e *MockStore_Expecter) UpdateStepStatus(ctx interface{}, stepID interface{}, status interface{}) *MockStore_UpdateStepStatus_Call

UpdateStepStatus is a helper method to define mock.On call

  • ctx context.Context
  • stepID int64
  • status StepStatus

type MockStore_GetActiveInstances_Call added in v1.5.0

type MockStore_GetActiveInstances_Call struct {
	*mock.Call
}

MockStore_GetActiveInstances_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetActiveInstances'

func (*MockStore_GetActiveInstances_Call) Return added in v1.5.0

func (*MockStore_GetActiveInstances_Call) Run added in v1.5.0

func (*MockStore_GetActiveInstances_Call) RunAndReturn added in v1.5.0

type MockStore_GetActiveStepsForUpdate_Call added in v1.8.0

type MockStore_GetActiveStepsForUpdate_Call struct {
	*mock.Call
}

MockStore_GetActiveStepsForUpdate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetActiveStepsForUpdate'

func (*MockStore_GetActiveStepsForUpdate_Call) Return added in v1.8.0

func (*MockStore_GetActiveStepsForUpdate_Call) Run added in v1.8.0

func (*MockStore_GetActiveStepsForUpdate_Call) RunAndReturn added in v1.8.0

type MockStore_GetAllWorkflowInstances_Call added in v1.5.0

type MockStore_GetAllWorkflowInstances_Call struct {
	*mock.Call
}

MockStore_GetAllWorkflowInstances_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetAllWorkflowInstances'

func (*MockStore_GetAllWorkflowInstances_Call) Return added in v1.5.0

func (*MockStore_GetAllWorkflowInstances_Call) Run added in v1.5.0

func (*MockStore_GetAllWorkflowInstances_Call) RunAndReturn added in v1.5.0

type MockStore_GetCancelRequest_Call added in v1.6.1

type MockStore_GetCancelRequest_Call struct {
	*mock.Call
}

MockStore_GetCancelRequest_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCancelRequest'

func (*MockStore_GetCancelRequest_Call) Return added in v1.6.1

func (*MockStore_GetCancelRequest_Call) Run added in v1.6.1

func (*MockStore_GetCancelRequest_Call) RunAndReturn added in v1.6.1

type MockStore_GetDeadLetterByID_Call added in v1.9.0

type MockStore_GetDeadLetterByID_Call struct {
	*mock.Call
}

MockStore_GetDeadLetterByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDeadLetterByID'

func (*MockStore_GetDeadLetterByID_Call) Return added in v1.9.0

func (*MockStore_GetDeadLetterByID_Call) Run added in v1.9.0

func (*MockStore_GetDeadLetterByID_Call) RunAndReturn added in v1.9.0

type MockStore_GetHumanDecisionStepByInstanceID_Call added in v1.5.0

type MockStore_GetHumanDecisionStepByInstanceID_Call struct {
	*mock.Call
}

MockStore_GetHumanDecisionStepByInstanceID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetHumanDecisionStepByInstanceID'

func (*MockStore_GetHumanDecisionStepByInstanceID_Call) Return added in v1.5.0

func (*MockStore_GetHumanDecisionStepByInstanceID_Call) Run added in v1.5.0

func (*MockStore_GetHumanDecisionStepByInstanceID_Call) RunAndReturn added in v1.5.0

type MockStore_GetHumanDecision_Call added in v1.5.0

type MockStore_GetHumanDecision_Call struct {
	*mock.Call
}

MockStore_GetHumanDecision_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetHumanDecision'

func (*MockStore_GetHumanDecision_Call) Return added in v1.5.0

func (*MockStore_GetHumanDecision_Call) Run added in v1.5.0

func (*MockStore_GetHumanDecision_Call) RunAndReturn added in v1.5.0

type MockStore_GetInstance_Call added in v1.5.0

type MockStore_GetInstance_Call struct {
	*mock.Call
}

MockStore_GetInstance_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetInstance'

func (*MockStore_GetInstance_Call) Return added in v1.5.0

func (*MockStore_GetInstance_Call) Run added in v1.5.0

func (_c *MockStore_GetInstance_Call) Run(run func(ctx context.Context, instanceID int64)) *MockStore_GetInstance_Call

func (*MockStore_GetInstance_Call) RunAndReturn added in v1.5.0

func (_c *MockStore_GetInstance_Call) RunAndReturn(run func(ctx context.Context, instanceID int64) (*WorkflowInstance, error)) *MockStore_GetInstance_Call

type MockStore_GetJoinState_Call added in v1.5.0

type MockStore_GetJoinState_Call struct {
	*mock.Call
}

MockStore_GetJoinState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetJoinState'

func (*MockStore_GetJoinState_Call) Return added in v1.5.0

func (*MockStore_GetJoinState_Call) Run added in v1.5.0

func (_c *MockStore_GetJoinState_Call) Run(run func(ctx context.Context, instanceID int64, joinStepName string)) *MockStore_GetJoinState_Call

func (*MockStore_GetJoinState_Call) RunAndReturn added in v1.5.0

func (_c *MockStore_GetJoinState_Call) RunAndReturn(run func(ctx context.Context, instanceID int64, joinStepName string) (*JoinState, error)) *MockStore_GetJoinState_Call

type MockStore_GetStepByID_Call added in v1.5.0

type MockStore_GetStepByID_Call struct {
	*mock.Call
}

MockStore_GetStepByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStepByID'

func (*MockStore_GetStepByID_Call) Return added in v1.5.0

func (*MockStore_GetStepByID_Call) Run added in v1.5.0

func (*MockStore_GetStepByID_Call) RunAndReturn added in v1.5.0

func (_c *MockStore_GetStepByID_Call) RunAndReturn(run func(ctx context.Context, stepID int64) (*WorkflowStep, error)) *MockStore_GetStepByID_Call

type MockStore_GetStepsByInstance_Call added in v1.5.0

type MockStore_GetStepsByInstance_Call struct {
	*mock.Call
}

MockStore_GetStepsByInstance_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStepsByInstance'

func (*MockStore_GetStepsByInstance_Call) Return added in v1.5.0

func (*MockStore_GetStepsByInstance_Call) Run added in v1.5.0

func (*MockStore_GetStepsByInstance_Call) RunAndReturn added in v1.5.0

type MockStore_GetSummaryStats_Call added in v1.5.0

type MockStore_GetSummaryStats_Call struct {
	*mock.Call
}

MockStore_GetSummaryStats_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSummaryStats'

func (*MockStore_GetSummaryStats_Call) Return added in v1.5.0

func (*MockStore_GetSummaryStats_Call) Run added in v1.5.0

func (*MockStore_GetSummaryStats_Call) RunAndReturn added in v1.5.0

type MockStore_GetWorkflowDefinition_Call added in v1.5.0

type MockStore_GetWorkflowDefinition_Call struct {
	*mock.Call
}

MockStore_GetWorkflowDefinition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWorkflowDefinition'

func (*MockStore_GetWorkflowDefinition_Call) Return added in v1.5.0

func (*MockStore_GetWorkflowDefinition_Call) Run added in v1.5.0

func (*MockStore_GetWorkflowDefinition_Call) RunAndReturn added in v1.5.0

type MockStore_GetWorkflowDefinitions_Call added in v1.5.0

type MockStore_GetWorkflowDefinitions_Call struct {
	*mock.Call
}

MockStore_GetWorkflowDefinitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWorkflowDefinitions'

func (*MockStore_GetWorkflowDefinitions_Call) Return added in v1.5.0

func (*MockStore_GetWorkflowDefinitions_Call) Run added in v1.5.0

func (*MockStore_GetWorkflowDefinitions_Call) RunAndReturn added in v1.5.0

type MockStore_GetWorkflowEvents_Call added in v1.5.0

type MockStore_GetWorkflowEvents_Call struct {
	*mock.Call
}

MockStore_GetWorkflowEvents_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWorkflowEvents'

func (*MockStore_GetWorkflowEvents_Call) Return added in v1.5.0

func (*MockStore_GetWorkflowEvents_Call) Run added in v1.5.0

func (*MockStore_GetWorkflowEvents_Call) RunAndReturn added in v1.5.0

type MockStore_GetWorkflowInstances_Call added in v1.5.0

type MockStore_GetWorkflowInstances_Call struct {
	*mock.Call
}

MockStore_GetWorkflowInstances_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWorkflowInstances'

func (*MockStore_GetWorkflowInstances_Call) Return added in v1.5.0

func (*MockStore_GetWorkflowInstances_Call) Run added in v1.5.0

func (*MockStore_GetWorkflowInstances_Call) RunAndReturn added in v1.5.0

type MockStore_GetWorkflowStats_Call added in v1.5.0

type MockStore_GetWorkflowStats_Call struct {
	*mock.Call
}

MockStore_GetWorkflowStats_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWorkflowStats'

func (*MockStore_GetWorkflowStats_Call) Return added in v1.5.0

func (*MockStore_GetWorkflowStats_Call) Run added in v1.5.0

func (*MockStore_GetWorkflowStats_Call) RunAndReturn added in v1.5.0

type MockStore_GetWorkflowSteps_Call added in v1.5.0

type MockStore_GetWorkflowSteps_Call struct {
	*mock.Call
}

MockStore_GetWorkflowSteps_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWorkflowSteps'

func (*MockStore_GetWorkflowSteps_Call) Return added in v1.5.0

func (*MockStore_GetWorkflowSteps_Call) Run added in v1.5.0

func (*MockStore_GetWorkflowSteps_Call) RunAndReturn added in v1.5.0

type MockStore_ListDeadLetters_Call added in v1.9.0

type MockStore_ListDeadLetters_Call struct {
	*mock.Call
}

MockStore_ListDeadLetters_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListDeadLetters'

func (*MockStore_ListDeadLetters_Call) Return added in v1.9.0

func (*MockStore_ListDeadLetters_Call) Run added in v1.9.0

func (*MockStore_ListDeadLetters_Call) RunAndReturn added in v1.9.0

func (_c *MockStore_ListDeadLetters_Call) RunAndReturn(run func(ctx context.Context, offset int, limit int) ([]DeadLetterRecord, int64, error)) *MockStore_ListDeadLetters_Call

type MockStore_LogEvent_Call added in v1.5.0

type MockStore_LogEvent_Call struct {
	*mock.Call
}

MockStore_LogEvent_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LogEvent'

func (*MockStore_LogEvent_Call) Return added in v1.5.0

func (*MockStore_LogEvent_Call) Run added in v1.5.0

func (_c *MockStore_LogEvent_Call) Run(run func(ctx context.Context, instanceID int64, stepID *int64, eventType string, payload any)) *MockStore_LogEvent_Call

func (*MockStore_LogEvent_Call) RunAndReturn added in v1.5.0

func (_c *MockStore_LogEvent_Call) RunAndReturn(run func(ctx context.Context, instanceID int64, stepID *int64, eventType string, payload any) error) *MockStore_LogEvent_Call

type MockStore_PauseActiveStepsAndClearQueue_Call added in v1.9.0

type MockStore_PauseActiveStepsAndClearQueue_Call struct {
	*mock.Call
}

MockStore_PauseActiveStepsAndClearQueue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PauseActiveStepsAndClearQueue'

func (*MockStore_PauseActiveStepsAndClearQueue_Call) Return added in v1.9.0

func (*MockStore_PauseActiveStepsAndClearQueue_Call) Run added in v1.9.0

func (*MockStore_PauseActiveStepsAndClearQueue_Call) RunAndReturn added in v1.9.0

type MockStore_ReleaseQueueItem_Call added in v1.11.0

type MockStore_ReleaseQueueItem_Call struct {
	*mock.Call
}

MockStore_ReleaseQueueItem_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReleaseQueueItem'

func (*MockStore_ReleaseQueueItem_Call) Return added in v1.11.0

func (*MockStore_ReleaseQueueItem_Call) Run added in v1.11.0

func (*MockStore_ReleaseQueueItem_Call) RunAndReturn added in v1.11.0

type MockStore_RemoveFromQueue_Call added in v1.5.0

type MockStore_RemoveFromQueue_Call struct {
	*mock.Call
}

MockStore_RemoveFromQueue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveFromQueue'

func (*MockStore_RemoveFromQueue_Call) Return added in v1.5.0

func (*MockStore_RemoveFromQueue_Call) Run added in v1.5.0

func (*MockStore_RemoveFromQueue_Call) RunAndReturn added in v1.5.0

func (_c *MockStore_RemoveFromQueue_Call) RunAndReturn(run func(ctx context.Context, queueID int64) error) *MockStore_RemoveFromQueue_Call

type MockStore_ReplaceInJoinWaitFor_Call added in v1.10.0

type MockStore_ReplaceInJoinWaitFor_Call struct {
	*mock.Call
}

MockStore_ReplaceInJoinWaitFor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReplaceInJoinWaitFor'

func (*MockStore_ReplaceInJoinWaitFor_Call) Return added in v1.10.0

func (*MockStore_ReplaceInJoinWaitFor_Call) Run added in v1.10.0

func (_c *MockStore_ReplaceInJoinWaitFor_Call) Run(run func(ctx context.Context, instanceID int64, joinStepName string, virtualStep string, realStep string)) *MockStore_ReplaceInJoinWaitFor_Call

func (*MockStore_ReplaceInJoinWaitFor_Call) RunAndReturn added in v1.10.0

func (_c *MockStore_ReplaceInJoinWaitFor_Call) RunAndReturn(run func(ctx context.Context, instanceID int64, joinStepName string, virtualStep string, realStep string) error) *MockStore_ReplaceInJoinWaitFor_Call

type MockStore_RequeueDeadLetter_Call added in v1.9.0

type MockStore_RequeueDeadLetter_Call struct {
	*mock.Call
}

MockStore_RequeueDeadLetter_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RequeueDeadLetter'

func (*MockStore_RequeueDeadLetter_Call) Return added in v1.9.0

func (*MockStore_RequeueDeadLetter_Call) Run added in v1.9.0

func (*MockStore_RequeueDeadLetter_Call) RunAndReturn added in v1.9.0

type MockStore_RescheduleAndReleaseQueueItem_Call added in v1.11.0

type MockStore_RescheduleAndReleaseQueueItem_Call struct {
	*mock.Call
}

MockStore_RescheduleAndReleaseQueueItem_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RescheduleAndReleaseQueueItem'

func (*MockStore_RescheduleAndReleaseQueueItem_Call) Return added in v1.11.0

func (*MockStore_RescheduleAndReleaseQueueItem_Call) Run added in v1.11.0

func (*MockStore_RescheduleAndReleaseQueueItem_Call) RunAndReturn added in v1.11.0

type MockStore_SaveWorkflowDefinition_Call added in v1.5.0

type MockStore_SaveWorkflowDefinition_Call struct {
	*mock.Call
}

MockStore_SaveWorkflowDefinition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveWorkflowDefinition'

func (*MockStore_SaveWorkflowDefinition_Call) Return added in v1.5.0

func (*MockStore_SaveWorkflowDefinition_Call) Run added in v1.5.0

func (*MockStore_SaveWorkflowDefinition_Call) RunAndReturn added in v1.5.0

type MockStore_SetAgingEnabled_Call added in v1.11.2

type MockStore_SetAgingEnabled_Call struct {
	*mock.Call
}

MockStore_SetAgingEnabled_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetAgingEnabled'

func (*MockStore_SetAgingEnabled_Call) Return added in v1.11.2

func (*MockStore_SetAgingEnabled_Call) Run added in v1.11.2

func (*MockStore_SetAgingEnabled_Call) RunAndReturn added in v1.11.2

func (_c *MockStore_SetAgingEnabled_Call) RunAndReturn(run func(enabled bool)) *MockStore_SetAgingEnabled_Call

type MockStore_SetAgingRate_Call added in v1.11.2

type MockStore_SetAgingRate_Call struct {
	*mock.Call
}

MockStore_SetAgingRate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetAgingRate'

func (*MockStore_SetAgingRate_Call) Return added in v1.11.2

func (*MockStore_SetAgingRate_Call) Run added in v1.11.2

func (*MockStore_SetAgingRate_Call) RunAndReturn added in v1.11.2

func (_c *MockStore_SetAgingRate_Call) RunAndReturn(run func(rate float64)) *MockStore_SetAgingRate_Call

type MockStore_UpdateInstanceStatus_Call added in v1.5.0

type MockStore_UpdateInstanceStatus_Call struct {
	*mock.Call
}

MockStore_UpdateInstanceStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateInstanceStatus'

func (*MockStore_UpdateInstanceStatus_Call) Return added in v1.5.0

func (*MockStore_UpdateInstanceStatus_Call) Run added in v1.5.0

func (*MockStore_UpdateInstanceStatus_Call) RunAndReturn added in v1.5.0

func (_c *MockStore_UpdateInstanceStatus_Call) RunAndReturn(run func(ctx context.Context, instanceID int64, status WorkflowStatus, output json.RawMessage, errMsg *string) error) *MockStore_UpdateInstanceStatus_Call

type MockStore_UpdateJoinState_Call added in v1.5.0

type MockStore_UpdateJoinState_Call struct {
	*mock.Call
}

MockStore_UpdateJoinState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateJoinState'

func (*MockStore_UpdateJoinState_Call) Return added in v1.5.0

func (*MockStore_UpdateJoinState_Call) Run added in v1.5.0

func (_c *MockStore_UpdateJoinState_Call) Run(run func(ctx context.Context, instanceID int64, joinStepName string, completedStep string, success bool)) *MockStore_UpdateJoinState_Call

func (*MockStore_UpdateJoinState_Call) RunAndReturn added in v1.5.0

func (_c *MockStore_UpdateJoinState_Call) RunAndReturn(run func(ctx context.Context, instanceID int64, joinStepName string, completedStep string, success bool) (bool, error)) *MockStore_UpdateJoinState_Call

type MockStore_UpdateStepCompensationRetry_Call added in v1.5.0

type MockStore_UpdateStepCompensationRetry_Call struct {
	*mock.Call
}

MockStore_UpdateStepCompensationRetry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateStepCompensationRetry'

func (*MockStore_UpdateStepCompensationRetry_Call) Return added in v1.5.0

func (*MockStore_UpdateStepCompensationRetry_Call) Run added in v1.5.0

func (*MockStore_UpdateStepCompensationRetry_Call) RunAndReturn added in v1.5.0

type MockStore_UpdateStepStatus_Call added in v1.5.0

type MockStore_UpdateStepStatus_Call struct {
	*mock.Call
}

MockStore_UpdateStepStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateStepStatus'

func (*MockStore_UpdateStepStatus_Call) Return added in v1.5.0

func (*MockStore_UpdateStepStatus_Call) Run added in v1.5.0

func (*MockStore_UpdateStepStatus_Call) RunAndReturn added in v1.5.0

func (_c *MockStore_UpdateStepStatus_Call) RunAndReturn(run func(ctx context.Context, stepID int64, status StepStatus) error) *MockStore_UpdateStepStatus_Call

type MockStore_UpdateStep_Call added in v1.5.0

type MockStore_UpdateStep_Call struct {
	*mock.Call
}

MockStore_UpdateStep_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateStep'

func (*MockStore_UpdateStep_Call) Return added in v1.5.0

func (*MockStore_UpdateStep_Call) Run added in v1.5.0

func (_c *MockStore_UpdateStep_Call) Run(run func(ctx context.Context, stepID int64, status StepStatus, output json.RawMessage, errMsg *string)) *MockStore_UpdateStep_Call

func (*MockStore_UpdateStep_Call) RunAndReturn added in v1.5.0

func (_c *MockStore_UpdateStep_Call) RunAndReturn(run func(ctx context.Context, stepID int64, status StepStatus, output json.RawMessage, errMsg *string) error) *MockStore_UpdateStep_Call

type MockTx added in v1.5.0

type MockTx struct {
	mock.Mock
}

MockTx is an autogenerated mock type for the Tx type

func NewMockTx added in v1.5.0

func NewMockTx(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockTx

NewMockTx creates a new instance of MockTx. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockTx) EXPECT added in v1.5.0

func (_m *MockTx) EXPECT() *MockTx_Expecter

func (*MockTx) Exec added in v1.5.0

func (_mock *MockTx) Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)

Exec provides a mock function for the type MockTx

func (*MockTx) Query added in v1.5.0

func (_mock *MockTx) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)

Query provides a mock function for the type MockTx

func (*MockTx) QueryRow added in v1.5.0

func (_mock *MockTx) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row

QueryRow provides a mock function for the type MockTx

func (*MockTx) SendBatch added in v1.5.0

func (_mock *MockTx) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults

SendBatch provides a mock function for the type MockTx

type MockTxManager added in v1.5.0

type MockTxManager struct {
	mock.Mock
}

MockTxManager is an autogenerated mock type for the TxManager type

func NewMockTxManager added in v1.5.0

func NewMockTxManager(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockTxManager

NewMockTxManager creates a new instance of MockTxManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockTxManager) EXPECT added in v1.5.0

func (_m *MockTxManager) EXPECT() *MockTxManager_Expecter

func (*MockTxManager) ReadCommitted added in v1.5.0

func (_mock *MockTxManager) ReadCommitted(ctx context.Context, fn func(ctx context.Context) error) error

ReadCommitted provides a mock function for the type MockTxManager

func (*MockTxManager) RepeatableRead added in v1.5.0

func (_mock *MockTxManager) RepeatableRead(ctx context.Context, fn func(ctx context.Context) error) error

RepeatableRead provides a mock function for the type MockTxManager

type MockTxManager_Expecter added in v1.5.0

type MockTxManager_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockTxManager_Expecter) ReadCommitted added in v1.5.0

func (_e *MockTxManager_Expecter) ReadCommitted(ctx interface{}, fn interface{}) *MockTxManager_ReadCommitted_Call

ReadCommitted is a helper method to define mock.On call

  • ctx context.Context
  • fn func(ctx context.Context) error

func (*MockTxManager_Expecter) RepeatableRead added in v1.5.0

func (_e *MockTxManager_Expecter) RepeatableRead(ctx interface{}, fn interface{}) *MockTxManager_RepeatableRead_Call

RepeatableRead is a helper method to define mock.On call

  • ctx context.Context
  • fn func(ctx context.Context) error

type MockTxManager_ReadCommitted_Call added in v1.5.0

type MockTxManager_ReadCommitted_Call struct {
	*mock.Call
}

MockTxManager_ReadCommitted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReadCommitted'

func (*MockTxManager_ReadCommitted_Call) Return added in v1.5.0

func (*MockTxManager_ReadCommitted_Call) Run added in v1.5.0

func (*MockTxManager_ReadCommitted_Call) RunAndReturn added in v1.5.0

type MockTxManager_RepeatableRead_Call added in v1.5.0

type MockTxManager_RepeatableRead_Call struct {
	*mock.Call
}

MockTxManager_RepeatableRead_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RepeatableRead'

func (*MockTxManager_RepeatableRead_Call) Return added in v1.5.0

func (*MockTxManager_RepeatableRead_Call) Run added in v1.5.0

func (*MockTxManager_RepeatableRead_Call) RunAndReturn added in v1.5.0

type MockTx_Exec_Call added in v1.5.0

type MockTx_Exec_Call struct {
	*mock.Call
}

MockTx_Exec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Exec'

func (*MockTx_Exec_Call) Return added in v1.5.0

func (_c *MockTx_Exec_Call) Return(commandTag pgconn.CommandTag, err error) *MockTx_Exec_Call

func (*MockTx_Exec_Call) Run added in v1.5.0

func (_c *MockTx_Exec_Call) Run(run func(ctx context.Context, sql string, args ...any)) *MockTx_Exec_Call

func (*MockTx_Exec_Call) RunAndReturn added in v1.5.0

func (_c *MockTx_Exec_Call) RunAndReturn(run func(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)) *MockTx_Exec_Call

type MockTx_Expecter added in v1.5.0

type MockTx_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockTx_Expecter) Exec added in v1.5.0

func (_e *MockTx_Expecter) Exec(ctx interface{}, sql interface{}, args ...interface{}) *MockTx_Exec_Call

Exec is a helper method to define mock.On call

  • ctx context.Context
  • sql string
  • args ...any

func (*MockTx_Expecter) Query added in v1.5.0

func (_e *MockTx_Expecter) Query(ctx interface{}, sql interface{}, args ...interface{}) *MockTx_Query_Call

Query is a helper method to define mock.On call

  • ctx context.Context
  • sql string
  • args ...any

func (*MockTx_Expecter) QueryRow added in v1.5.0

func (_e *MockTx_Expecter) QueryRow(ctx interface{}, sql interface{}, args ...interface{}) *MockTx_QueryRow_Call

QueryRow is a helper method to define mock.On call

  • ctx context.Context
  • sql string
  • args ...any

func (*MockTx_Expecter) SendBatch added in v1.5.0

func (_e *MockTx_Expecter) SendBatch(ctx interface{}, b interface{}) *MockTx_SendBatch_Call

SendBatch is a helper method to define mock.On call

  • ctx context.Context
  • b *pgx.Batch

type MockTx_QueryRow_Call added in v1.5.0

type MockTx_QueryRow_Call struct {
	*mock.Call
}

MockTx_QueryRow_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryRow'

func (*MockTx_QueryRow_Call) Return added in v1.5.0

func (_c *MockTx_QueryRow_Call) Return(row pgx.Row) *MockTx_QueryRow_Call

func (*MockTx_QueryRow_Call) Run added in v1.5.0

func (_c *MockTx_QueryRow_Call) Run(run func(ctx context.Context, sql string, args ...any)) *MockTx_QueryRow_Call

func (*MockTx_QueryRow_Call) RunAndReturn added in v1.5.0

func (_c *MockTx_QueryRow_Call) RunAndReturn(run func(ctx context.Context, sql string, args ...any) pgx.Row) *MockTx_QueryRow_Call

type MockTx_Query_Call added in v1.5.0

type MockTx_Query_Call struct {
	*mock.Call
}

MockTx_Query_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Query'

func (*MockTx_Query_Call) Return added in v1.5.0

func (_c *MockTx_Query_Call) Return(rows pgx.Rows, err error) *MockTx_Query_Call

func (*MockTx_Query_Call) Run added in v1.5.0

func (_c *MockTx_Query_Call) Run(run func(ctx context.Context, sql string, args ...any)) *MockTx_Query_Call

func (*MockTx_Query_Call) RunAndReturn added in v1.5.0

func (_c *MockTx_Query_Call) RunAndReturn(run func(ctx context.Context, sql string, args ...any) (pgx.Rows, error)) *MockTx_Query_Call

type MockTx_SendBatch_Call added in v1.5.0

type MockTx_SendBatch_Call struct {
	*mock.Call
}

MockTx_SendBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendBatch'

func (*MockTx_SendBatch_Call) Return added in v1.5.0

func (_c *MockTx_SendBatch_Call) Return(batchResults pgx.BatchResults) *MockTx_SendBatch_Call

func (*MockTx_SendBatch_Call) Run added in v1.5.0

func (_c *MockTx_SendBatch_Call) Run(run func(ctx context.Context, b *pgx.Batch)) *MockTx_SendBatch_Call

func (*MockTx_SendBatch_Call) RunAndReturn added in v1.5.0

func (_c *MockTx_SendBatch_Call) RunAndReturn(run func(ctx context.Context, b *pgx.Batch) pgx.BatchResults) *MockTx_SendBatch_Call

type Monitor added in v1.0.2

type Monitor interface {
	GetWorkflowStats(ctx context.Context) ([]WorkflowStats, error)
}

type MonitorImpl added in v1.3.0

type MonitorImpl struct {
	// contains filtered or unexported fields
}

func NewMonitor added in v1.0.2

func NewMonitor(pool *pgxpool.Pool) *MonitorImpl

func (*MonitorImpl) GetActiveWorkflows added in v1.3.0

func (m *MonitorImpl) GetActiveWorkflows(ctx context.Context) ([]ActiveWorkflow, error)

func (*MonitorImpl) GetQueueLength added in v1.3.0

func (m *MonitorImpl) GetQueueLength(ctx context.Context) (int, error)

func (*MonitorImpl) GetWorkflowStats added in v1.3.0

func (m *MonitorImpl) GetWorkflowStats(ctx context.Context) ([]WorkflowStats, error)

type Plugin added in v1.8.0

type Plugin interface {
	// Name returns unique plugin identifier
	Name() string

	// Priority determines execution order (higher = earlier)
	Priority() Priority

	// Lifecycle hooks
	OnWorkflowStart(ctx context.Context, instance *WorkflowInstance) error
	OnWorkflowComplete(ctx context.Context, instance *WorkflowInstance) error
	OnWorkflowFailed(ctx context.Context, instance *WorkflowInstance) error
	OnStepStart(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error
	OnStepComplete(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error
	OnStepFailed(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep, err error) error
	OnRollbackStepChain(ctx context.Context, instanceID int64, stepName string, depth int) error
}

Plugin represents a lifecycle hook system for workflows

type PluginManager added in v1.8.0

type PluginManager struct {
	// contains filtered or unexported fields
}

PluginManager manages plugin lifecycle

func NewPluginManager added in v1.8.0

func NewPluginManager() *PluginManager

func (*PluginManager) ExecuteRollbackStepChain added in v1.10.0

func (pm *PluginManager) ExecuteRollbackStepChain(ctx context.Context, instanceID int64, stepName string, depth int) error

func (*PluginManager) ExecuteStepComplete added in v1.8.0

func (pm *PluginManager) ExecuteStepComplete(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error

func (*PluginManager) ExecuteStepFailed added in v1.8.0

func (pm *PluginManager) ExecuteStepFailed(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep, err error) error

func (*PluginManager) ExecuteStepStart added in v1.8.0

func (pm *PluginManager) ExecuteStepStart(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error

func (*PluginManager) ExecuteWorkflowComplete added in v1.8.0

func (pm *PluginManager) ExecuteWorkflowComplete(ctx context.Context, instance *WorkflowInstance) error

func (*PluginManager) ExecuteWorkflowFailed added in v1.8.0

func (pm *PluginManager) ExecuteWorkflowFailed(ctx context.Context, instance *WorkflowInstance) error

func (*PluginManager) ExecuteWorkflowStart added in v1.8.0

func (pm *PluginManager) ExecuteWorkflowStart(ctx context.Context, instance *WorkflowInstance) error

func (*PluginManager) Register added in v1.8.0

func (pm *PluginManager) Register(plugin Plugin)

type Priority added in v1.11.2

type Priority int
const (
	PriorityLow    Priority = 0
	PriorityLower  Priority = 25
	PriorityNormal Priority = 50
	PriorityHigher Priority = 75
	PriorityHigh   Priority = 100
)

type QueueItem

type QueueItem struct {
	ID          int64      `json:"id"`
	InstanceID  int64      `json:"instance_id"`
	StepID      *int64     `json:"step_id"`
	ScheduledAt time.Time  `json:"scheduled_at"`
	AttemptedAt *time.Time `json:"attempted_at"`
	AttemptedBy *string    `json:"attempted_by"`
	Priority    int        `json:"priority"`
}

type RetryStrategy added in v1.9.0

type RetryStrategy uint8
const (
	RetryStrategyFixed       RetryStrategy = iota // Fixed delay between retries
	RetryStrategyExponential                      // Exponential backoff: delay = base * 2^attempt
	RetryStrategyLinear                           // Linear backoff: delay = base * attempt
)

type SQLiteStore added in v1.13.0

type SQLiteStore struct {
	// contains filtered or unexported fields
}

SQLiteStore provides a lightweight Store backed by SQLite. It implements only the subset of capabilities required by the SQLite tests. Non‑essential methods return a not-implemented error.

func NewSQLiteInMemoryStore added in v1.13.0

func NewSQLiteInMemoryStore() (*SQLiteStore, error)

NewSQLiteInMemoryStore creates an in-memory SQLite database and initializes schema. This is useful for testing. For production use, prefer NewSQLiteStore with a file path.

func NewSQLiteStore added in v1.13.0

func NewSQLiteStore(filepath string) (*SQLiteStore, error)

NewSQLiteStore creates a persistent SQLite database stored in a file and initializes schema. The filepath parameter specifies the path to the SQLite database file. If the file doesn't exist, it will be created automatically.

func (*SQLiteStore) AddToJoinWaitFor added in v1.13.0

func (s *SQLiteStore) AddToJoinWaitFor(ctx context.Context, instanceID int64, joinStepName, stepToAdd string) error

func (*SQLiteStore) CleanupOldWorkflows added in v1.13.0

func (s *SQLiteStore) CleanupOldWorkflows(ctx context.Context, daysToKeep int) (int64, error)

func (*SQLiteStore) Close added in v1.13.0

func (s *SQLiteStore) Close() error

Close closes the database connection. It's recommended to call this method when the store is no longer needed, especially for persistent SQLite stores.

func (*SQLiteStore) CreateCancelRequest added in v1.13.0

func (s *SQLiteStore) CreateCancelRequest(ctx context.Context, req *WorkflowCancelRequest) error

func (*SQLiteStore) CreateDeadLetterRecord added in v1.13.0

func (s *SQLiteStore) CreateDeadLetterRecord(ctx context.Context, rec *DeadLetterRecord) error

func (*SQLiteStore) CreateHumanDecision added in v1.13.0

func (s *SQLiteStore) CreateHumanDecision(ctx context.Context, decision *HumanDecisionRecord) error

func (*SQLiteStore) CreateInstance added in v1.13.0

func (s *SQLiteStore) CreateInstance(ctx context.Context, workflowID string, input json.RawMessage) (*WorkflowInstance, error)

Instances

func (*SQLiteStore) CreateJoinState added in v1.13.0

func (s *SQLiteStore) CreateJoinState(ctx context.Context, instanceID int64, joinStepName string, waitingFor []string, strategy JoinStrategy) error

Joins and other auxiliary features — minimal or not implemented for now

func (*SQLiteStore) CreateStep added in v1.13.0

func (s *SQLiteStore) CreateStep(ctx context.Context, step *WorkflowStep) error

Steps

func (*SQLiteStore) DeleteCancelRequest added in v1.13.0

func (s *SQLiteStore) DeleteCancelRequest(ctx context.Context, instanceID int64) error

func (*SQLiteStore) DequeueStep added in v1.13.0

func (s *SQLiteStore) DequeueStep(ctx context.Context, workerID string) (*QueueItem, error)

func (*SQLiteStore) EnqueueStep added in v1.13.0

func (s *SQLiteStore) EnqueueStep(ctx context.Context, instanceID int64, stepID *int64, priority Priority, delay time.Duration) error

Queue

func (*SQLiteStore) GetActiveInstances added in v1.13.0

func (s *SQLiteStore) GetActiveInstances(ctx context.Context) ([]ActiveWorkflowInstance, error)

func (*SQLiteStore) GetActiveStepsForUpdate added in v1.13.0

func (s *SQLiteStore) GetActiveStepsForUpdate(ctx context.Context, instanceID int64) ([]WorkflowStep, error)

func (*SQLiteStore) GetAllWorkflowInstances added in v1.13.0

func (s *SQLiteStore) GetAllWorkflowInstances(ctx context.Context) ([]WorkflowInstance, error)

func (*SQLiteStore) GetCancelRequest added in v1.13.0

func (s *SQLiteStore) GetCancelRequest(ctx context.Context, instanceID int64) (*WorkflowCancelRequest, error)

func (*SQLiteStore) GetDeadLetterByID added in v1.13.0

func (s *SQLiteStore) GetDeadLetterByID(ctx context.Context, id int64) (*DeadLetterRecord, error)

func (*SQLiteStore) GetHumanDecision added in v1.13.0

func (s *SQLiteStore) GetHumanDecision(ctx context.Context, stepID int64) (*HumanDecisionRecord, error)

func (*SQLiteStore) GetHumanDecisionStepByInstanceID added in v1.13.0

func (s *SQLiteStore) GetHumanDecisionStepByInstanceID(ctx context.Context, instanceID int64) (*WorkflowStep, error)

func (*SQLiteStore) GetInstance added in v1.13.0

func (s *SQLiteStore) GetInstance(ctx context.Context, instanceID int64) (*WorkflowInstance, error)

func (*SQLiteStore) GetJoinState added in v1.13.0

func (s *SQLiteStore) GetJoinState(ctx context.Context, instanceID int64, joinStepName string) (*JoinState, error)

func (*SQLiteStore) GetStepByID added in v1.13.0

func (s *SQLiteStore) GetStepByID(ctx context.Context, stepID int64) (*WorkflowStep, error)

func (*SQLiteStore) GetStepsByInstance added in v1.13.0

func (s *SQLiteStore) GetStepsByInstance(ctx context.Context, instanceID int64) ([]WorkflowStep, error)

func (*SQLiteStore) GetSummaryStats added in v1.13.0

func (s *SQLiteStore) GetSummaryStats(ctx context.Context) (*SummaryStats, error)

func (*SQLiteStore) GetWorkflowDefinition added in v1.13.0

func (s *SQLiteStore) GetWorkflowDefinition(ctx context.Context, id string) (*WorkflowDefinition, error)

func (*SQLiteStore) GetWorkflowDefinitions added in v1.13.0

func (s *SQLiteStore) GetWorkflowDefinitions(ctx context.Context) ([]WorkflowDefinition, error)

func (*SQLiteStore) GetWorkflowEvents added in v1.13.0

func (s *SQLiteStore) GetWorkflowEvents(ctx context.Context, instanceID int64) ([]WorkflowEvent, error)

func (*SQLiteStore) GetWorkflowInstances added in v1.13.0

func (s *SQLiteStore) GetWorkflowInstances(ctx context.Context, workflowID string) ([]WorkflowInstance, error)

func (*SQLiteStore) GetWorkflowStats added in v1.13.0

func (s *SQLiteStore) GetWorkflowStats(ctx context.Context) ([]WorkflowStats, error)

Minimal implementation to satisfy interface; not used in SQLite tests

func (*SQLiteStore) GetWorkflowSteps added in v1.13.0

func (s *SQLiteStore) GetWorkflowSteps(ctx context.Context, instanceID int64) ([]WorkflowStep, error)

func (*SQLiteStore) ListDeadLetters added in v1.13.0

func (s *SQLiteStore) ListDeadLetters(ctx context.Context, offset int, limit int) ([]DeadLetterRecord, int64, error)

func (*SQLiteStore) LogEvent added in v1.13.0

func (s *SQLiteStore) LogEvent(ctx context.Context, instanceID int64, stepID *int64, eventType string, payload any) error

Events

func (*SQLiteStore) PauseActiveStepsAndClearQueue added in v1.13.0

func (s *SQLiteStore) PauseActiveStepsAndClearQueue(ctx context.Context, instanceID int64) error

func (*SQLiteStore) ReleaseQueueItem added in v1.13.0

func (s *SQLiteStore) ReleaseQueueItem(ctx context.Context, queueID int64) error

func (*SQLiteStore) RemoveFromQueue added in v1.13.0

func (s *SQLiteStore) RemoveFromQueue(ctx context.Context, queueID int64) error

func (*SQLiteStore) ReplaceInJoinWaitFor added in v1.13.0

func (s *SQLiteStore) ReplaceInJoinWaitFor(ctx context.Context, instanceID int64, joinStepName, virtualStep, realStep string) error

func (*SQLiteStore) RequeueDeadLetter added in v1.13.0

func (s *SQLiteStore) RequeueDeadLetter(ctx context.Context, dlqID int64, newInput *json.RawMessage) error

func (*SQLiteStore) RescheduleAndReleaseQueueItem added in v1.13.0

func (s *SQLiteStore) RescheduleAndReleaseQueueItem(ctx context.Context, queueID int64, delay time.Duration) error

func (*SQLiteStore) SaveWorkflowDefinition added in v1.13.0

func (s *SQLiteStore) SaveWorkflowDefinition(ctx context.Context, def *WorkflowDefinition) error

Definitions

func (*SQLiteStore) SetAgingEnabled added in v1.13.0

func (s *SQLiteStore) SetAgingEnabled(enabled bool)

func (*SQLiteStore) SetAgingRate added in v1.13.0

func (s *SQLiteStore) SetAgingRate(rate float64)

SetAgingRate sets the aging rate for queue priority adjustment. The rate is clamped to [MinAgingRate, MaxAgingRate] to prevent SQL injection and ensure valid SQL expressions. NaN and Inf values are clamped to 0.0.

func (*SQLiteStore) UpdateInstanceStatus added in v1.13.0

func (s *SQLiteStore) UpdateInstanceStatus(ctx context.Context, instanceID int64, status WorkflowStatus, output json.RawMessage, errMsg *string) error

func (*SQLiteStore) UpdateJoinState added in v1.13.0

func (s *SQLiteStore) UpdateJoinState(ctx context.Context, instanceID int64, joinStepName, completedStep string, success bool) (bool, error)

func (*SQLiteStore) UpdateStep added in v1.13.0

func (s *SQLiteStore) UpdateStep(ctx context.Context, stepID int64, status StepStatus, output json.RawMessage, errMsg *string) error

func (*SQLiteStore) UpdateStepCompensationRetry added in v1.13.0

func (s *SQLiteStore) UpdateStepCompensationRetry(ctx context.Context, stepID int64, retryCount int, status StepStatus) error

func (*SQLiteStore) UpdateStepStatus added in v1.13.0

func (s *SQLiteStore) UpdateStepStatus(ctx context.Context, stepID int64, status StepStatus) error

type StepContext

type StepContext interface {
	InstanceID() int64
	StepName() string
	IdempotencyKey() string
	RetryCount() int
	CloneData() map[string]any
	GetVariable(key string) (any, bool)
	GetVariableAsString(key string) (string, bool)
}

type StepDefinition

type StepDefinition struct {
	Name          string         `json:"name"`
	Type          StepType       `json:"type"`
	Handler       string         `json:"handler"`
	MaxRetries    int            `json:"max_retries"`
	Next          []string       `json:"next"`
	Prev          string         `json:"prev"`           // previous step in the chain
	Else          string         `json:"else,omitempty"` // alternative step for condition steps for false branch
	OnFailure     string         `json:"on_failure"`     // compensation step
	Condition     string         `json:"condition"`      // for conditional transitions
	Parallel      []string       `json:"parallel"`       // for parallel steps (fork)
	WaitFor       []string       `json:"wait_for"`       // for join, we are waiting for these steps to be completed
	JoinStrategy  JoinStrategy   `json:"join_strategy"`  // "all" (default) or "any"
	Metadata      map[string]any `json:"metadata"`
	NoIdempotent  bool           `json:"no_idempotent"`
	Delay         time.Duration  `json:"delay"`
	RetryDelay    time.Duration  `json:"retry_delay"`
	RetryStrategy RetryStrategy  `json:"retry_strategy"` // Strategy for retry delays: fixed, exponential, linear
	Timeout       time.Duration  `json:"timeout"`
}

func NewTask

func NewTask(name, handler string, opts ...StepOption) *StepDefinition

type StepHandler

type StepHandler interface {
	Execute(ctx context.Context, stepCtx StepContext, input json.RawMessage) (json.RawMessage, error)
	Name() string
}

type StepOption

type StepOption func(step *StepDefinition)

func WithStepDelay added in v1.6.0

func WithStepDelay(delay time.Duration) StepOption

func WithStepMaxRetries

func WithStepMaxRetries(maxRetries int) StepOption

func WithStepMetadata

func WithStepMetadata(metadata map[string]any) StepOption

func WithStepNoIdempotent

func WithStepNoIdempotent() StepOption

func WithStepRetryDelay added in v1.9.0

func WithStepRetryDelay(retryDelay time.Duration) StepOption

func WithStepRetryStrategy added in v1.9.0

func WithStepRetryStrategy(strategy RetryStrategy) StepOption

func WithStepTimeout added in v1.6.0

func WithStepTimeout(timeout time.Duration) StepOption

type StepStatus

type StepStatus string
const (
	StepStatusPending         StepStatus = "pending"
	StepStatusRunning         StepStatus = "running"
	StepStatusCompleted       StepStatus = "completed"
	StepStatusFailed          StepStatus = "failed"
	StepStatusSkipped         StepStatus = "skipped"
	StepStatusCompensation    StepStatus = "compensation"
	StepStatusRolledBack      StepStatus = "rolled_back"
	StepStatusWaitingDecision StepStatus = "waiting_decision"
	StepStatusConfirmed       StepStatus = "confirmed"
	StepStatusRejected        StepStatus = "rejected"
	StepStatusPaused          StepStatus = "paused"
)

type StepType

type StepType string
const (
	StepTypeTask      StepType = "task"
	StepTypeParallel  StepType = "parallel"
	StepTypeCondition StepType = "condition"
	StepTypeFork      StepType = "fork"
	StepTypeJoin      StepType = "join"
	StepTypeSavePoint StepType = "save_point"
	StepTypeHuman     StepType = "human"
)

type Store

type Store interface {
	SetAgingEnabled(enabled bool)
	SetAgingRate(rate float64)
	SaveWorkflowDefinition(ctx context.Context, def *WorkflowDefinition) error
	GetWorkflowDefinition(ctx context.Context, id string) (*WorkflowDefinition, error)
	CreateInstance(
		ctx context.Context,
		workflowID string,
		input json.RawMessage,
	) (*WorkflowInstance, error)
	UpdateInstanceStatus(
		ctx context.Context,
		instanceID int64,
		status WorkflowStatus,
		output json.RawMessage,
		errMsg *string,
	) error
	GetInstance(ctx context.Context, instanceID int64) (*WorkflowInstance, error)
	CreateStep(ctx context.Context, step *WorkflowStep) error
	UpdateStep(
		ctx context.Context,
		stepID int64,
		status StepStatus,
		output json.RawMessage,
		errMsg *string,
	) error
	GetStepsByInstance(ctx context.Context, instanceID int64) ([]WorkflowStep, error)
	EnqueueStep(
		ctx context.Context,
		instanceID int64,
		stepID *int64,
		priority Priority,
		delay time.Duration,
	) error
	UpdateStepCompensationRetry(
		ctx context.Context,
		stepID int64,
		retryCount int,
		status StepStatus,
	) error
	DequeueStep(ctx context.Context, workerID string) (*QueueItem, error)
	RemoveFromQueue(ctx context.Context, queueID int64) error
	ReleaseQueueItem(ctx context.Context, queueID int64) error
	RescheduleAndReleaseQueueItem(ctx context.Context, queueID int64, delay time.Duration) error
	LogEvent(
		ctx context.Context,
		instanceID int64,
		stepID *int64,
		eventType string,
		payload any,
	) error
	CreateJoinState(
		ctx context.Context,
		instanceID int64,
		joinStepName string,
		waitingFor []string,
		strategy JoinStrategy,
	) error
	UpdateJoinState(
		ctx context.Context,
		instanceID int64,
		joinStepName, completedStep string,
		success bool,
	) (bool, error)
	GetJoinState(ctx context.Context, instanceID int64, joinStepName string) (*JoinState, error)
	AddToJoinWaitFor(ctx context.Context, instanceID int64, joinStepName, stepToAdd string) error
	ReplaceInJoinWaitFor(ctx context.Context, instanceID int64, joinStepName, virtualStep, realStep string) error
	GetSummaryStats(ctx context.Context) (*SummaryStats, error)
	GetActiveInstances(ctx context.Context) ([]ActiveWorkflowInstance, error)
	GetWorkflowDefinitions(ctx context.Context) ([]WorkflowDefinition, error)
	GetWorkflowInstances(ctx context.Context, workflowID string) ([]WorkflowInstance, error)
	GetAllWorkflowInstances(ctx context.Context) ([]WorkflowInstance, error)
	GetWorkflowSteps(ctx context.Context, instanceID int64) ([]WorkflowStep, error)
	GetWorkflowEvents(ctx context.Context, instanceID int64) ([]WorkflowEvent, error)
	GetWorkflowStats(ctx context.Context) ([]WorkflowStats, error)
	GetActiveStepsForUpdate(ctx context.Context, instanceID int64) ([]WorkflowStep, error)
	CreateCancelRequest(ctx context.Context, req *WorkflowCancelRequest) error
	GetCancelRequest(ctx context.Context, instanceID int64) (*WorkflowCancelRequest, error)
	DeleteCancelRequest(ctx context.Context, instanceID int64) error

	// Human decision methods
	CreateHumanDecision(ctx context.Context, decision *HumanDecisionRecord) error
	GetHumanDecision(ctx context.Context, stepID int64) (*HumanDecisionRecord, error)
	UpdateStepStatus(ctx context.Context, stepID int64, status StepStatus) error
	GetStepByID(ctx context.Context, stepID int64) (*WorkflowStep, error)
	GetHumanDecisionStepByInstanceID(ctx context.Context, instanceID int64) (*WorkflowStep, error)

	// DLQ methods
	CreateDeadLetterRecord(ctx context.Context, rec *DeadLetterRecord) error
	RequeueDeadLetter(
		ctx context.Context,
		dlqID int64,
		newInput *json.RawMessage,
	) error
	ListDeadLetters(ctx context.Context, offset int, limit int) ([]DeadLetterRecord, int64, error)
	GetDeadLetterByID(ctx context.Context, id int64) (*DeadLetterRecord, error)
	PauseActiveStepsAndClearQueue(ctx context.Context, instanceID int64) error

	// Cleanup methods
	CleanupOldWorkflows(ctx context.Context, daysToKeep int) (int64, error)
}

type StoreImpl

type StoreImpl struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(pool *pgxpool.Pool) *StoreImpl

func (*StoreImpl) AddToJoinWaitFor added in v1.10.0

func (store *StoreImpl) AddToJoinWaitFor(
	ctx context.Context,
	instanceID int64,
	joinStepName, stepToAdd string,
) error

func (*StoreImpl) CleanupOldWorkflows added in v1.7.0

func (store *StoreImpl) CleanupOldWorkflows(ctx context.Context, daysToKeep int) (int64, error)

func (*StoreImpl) CreateCancelRequest added in v1.6.1

func (store *StoreImpl) CreateCancelRequest(ctx context.Context, req *WorkflowCancelRequest) error

func (*StoreImpl) CreateDeadLetterRecord added in v1.9.0

func (store *StoreImpl) CreateDeadLetterRecord(
	ctx context.Context,
	rec *DeadLetterRecord,
) error

func (*StoreImpl) CreateHumanDecision added in v1.5.0

func (store *StoreImpl) CreateHumanDecision(ctx context.Context, decision *HumanDecisionRecord) error

func (*StoreImpl) CreateInstance

func (store *StoreImpl) CreateInstance(
	ctx context.Context,
	workflowID string,
	input json.RawMessage,
) (*WorkflowInstance, error)

func (*StoreImpl) CreateJoinState

func (store *StoreImpl) CreateJoinState(
	ctx context.Context,
	instanceID int64,
	joinStepName string,
	waitingFor []string,
	strategy JoinStrategy,
) error

func (*StoreImpl) CreateStep

func (store *StoreImpl) CreateStep(ctx context.Context, step *WorkflowStep) error

func (*StoreImpl) DeleteCancelRequest added in v1.6.1

func (store *StoreImpl) DeleteCancelRequest(ctx context.Context, instanceID int64) error

func (*StoreImpl) DequeueStep

func (store *StoreImpl) DequeueStep(ctx context.Context, workerID string) (*QueueItem, error)

func (*StoreImpl) EnqueueStep

func (store *StoreImpl) EnqueueStep(
	ctx context.Context,
	instanceID int64,
	stepID *int64,
	priority Priority,
	delay time.Duration,
) error

func (*StoreImpl) GetActiveInstances added in v1.3.0

func (store *StoreImpl) GetActiveInstances(ctx context.Context) ([]ActiveWorkflowInstance, error)

func (*StoreImpl) GetActiveStepsForUpdate added in v1.8.0

func (store *StoreImpl) GetActiveStepsForUpdate(ctx context.Context, instanceID int64) ([]WorkflowStep, error)

func (*StoreImpl) GetAllWorkflowInstances added in v1.3.0

func (store *StoreImpl) GetAllWorkflowInstances(ctx context.Context) ([]WorkflowInstance, error)

func (*StoreImpl) GetCancelRequest added in v1.6.1

func (store *StoreImpl) GetCancelRequest(ctx context.Context, instanceID int64) (*WorkflowCancelRequest, error)

func (*StoreImpl) GetDeadLetterByID added in v1.9.0

func (store *StoreImpl) GetDeadLetterByID(ctx context.Context, id int64) (*DeadLetterRecord, error)

func (*StoreImpl) GetHumanDecision added in v1.5.0

func (store *StoreImpl) GetHumanDecision(ctx context.Context, stepID int64) (*HumanDecisionRecord, error)

func (*StoreImpl) GetHumanDecisionStepByInstanceID added in v1.5.0

func (store *StoreImpl) GetHumanDecisionStepByInstanceID(ctx context.Context, instanceID int64) (*WorkflowStep, error)

func (*StoreImpl) GetInstance

func (store *StoreImpl) GetInstance(ctx context.Context, instanceID int64) (*WorkflowInstance, error)

func (*StoreImpl) GetJoinState

func (store *StoreImpl) GetJoinState(ctx context.Context, instanceID int64, joinStepName string) (*JoinState, error)

func (*StoreImpl) GetStepByID added in v1.5.0

func (store *StoreImpl) GetStepByID(ctx context.Context, stepID int64) (*WorkflowStep, error)

func (*StoreImpl) GetStepsByInstance

func (store *StoreImpl) GetStepsByInstance(ctx context.Context, instanceID int64) ([]WorkflowStep, error)

func (*StoreImpl) GetSummaryStats added in v1.3.0

func (store *StoreImpl) GetSummaryStats(ctx context.Context) (*SummaryStats, error)

func (*StoreImpl) GetWorkflowDefinition

func (store *StoreImpl) GetWorkflowDefinition(ctx context.Context, id string) (*WorkflowDefinition, error)

func (*StoreImpl) GetWorkflowDefinitions added in v1.3.0

func (store *StoreImpl) GetWorkflowDefinitions(ctx context.Context) ([]WorkflowDefinition, error)

func (*StoreImpl) GetWorkflowEvents added in v1.3.0

func (store *StoreImpl) GetWorkflowEvents(ctx context.Context, instanceID int64) ([]WorkflowEvent, error)

func (*StoreImpl) GetWorkflowInstances added in v1.3.0

func (store *StoreImpl) GetWorkflowInstances(ctx context.Context, workflowID string) ([]WorkflowInstance, error)

func (*StoreImpl) GetWorkflowStats added in v1.4.0

func (store *StoreImpl) GetWorkflowStats(ctx context.Context) ([]WorkflowStats, error)

func (*StoreImpl) GetWorkflowSteps added in v1.3.0

func (store *StoreImpl) GetWorkflowSteps(ctx context.Context, instanceID int64) ([]WorkflowStep, error)

func (*StoreImpl) ListDeadLetters added in v1.9.0

func (store *StoreImpl) ListDeadLetters(ctx context.Context, offset int, limit int) ([]DeadLetterRecord, int64, error)

func (*StoreImpl) LogEvent

func (store *StoreImpl) LogEvent(
	ctx context.Context,
	instanceID int64,
	stepID *int64,
	eventType string,
	payload any,
) error

func (*StoreImpl) PauseActiveStepsAndClearQueue added in v1.9.0

func (store *StoreImpl) PauseActiveStepsAndClearQueue(ctx context.Context, instanceID int64) error

func (*StoreImpl) ReleaseQueueItem added in v1.11.0

func (store *StoreImpl) ReleaseQueueItem(ctx context.Context, queueID int64) error

func (*StoreImpl) RemoveFromQueue

func (store *StoreImpl) RemoveFromQueue(ctx context.Context, queueID int64) error

func (*StoreImpl) ReplaceInJoinWaitFor added in v1.10.0

func (store *StoreImpl) ReplaceInJoinWaitFor(
	ctx context.Context,
	instanceID int64,
	joinStepName, virtualStep, realStep string,
) error

func (*StoreImpl) RequeueDeadLetter added in v1.9.0

func (store *StoreImpl) RequeueDeadLetter(
	ctx context.Context,
	dlqID int64,
	newInput *json.RawMessage,
) error

func (*StoreImpl) RescheduleAndReleaseQueueItem added in v1.11.0

func (store *StoreImpl) RescheduleAndReleaseQueueItem(ctx context.Context, queueID int64, delay time.Duration) error

func (*StoreImpl) SaveWorkflowDefinition

func (store *StoreImpl) SaveWorkflowDefinition(ctx context.Context, def *WorkflowDefinition) error

func (*StoreImpl) SetAgingEnabled added in v1.11.2

func (store *StoreImpl) SetAgingEnabled(enabled bool)

func (*StoreImpl) SetAgingRate added in v1.11.2

func (store *StoreImpl) SetAgingRate(rate float64)

func (*StoreImpl) UpdateInstanceStatus

func (store *StoreImpl) UpdateInstanceStatus(
	ctx context.Context,
	instanceID int64,
	status WorkflowStatus,
	output json.RawMessage,
	errMsg *string,
) error

func (*StoreImpl) UpdateJoinState

func (store *StoreImpl) UpdateJoinState(
	ctx context.Context,
	instanceID int64,
	joinStepName, completedStep string,
	success bool,
) (bool, error)

func (*StoreImpl) UpdateStep

func (store *StoreImpl) UpdateStep(
	ctx context.Context,
	stepID int64,
	status StepStatus,
	output json.RawMessage,
	errMsg *string,
) error

func (*StoreImpl) UpdateStepCompensationRetry added in v1.0.4

func (store *StoreImpl) UpdateStepCompensationRetry(
	ctx context.Context,
	stepID int64,
	retryCount int,
	status StepStatus,
) error

func (*StoreImpl) UpdateStepStatus added in v1.5.0

func (store *StoreImpl) UpdateStepStatus(ctx context.Context, stepID int64, status StepStatus) error

type SummaryStats added in v1.3.0

type SummaryStats struct {
	TotalWorkflows     uint `json:"total_workflows"`
	CompletedWorkflows uint `json:"completed_workflows"`
	FailedWorkflows    uint `json:"failed_workflows"`
	RunningWorkflows   uint `json:"running_workflows"`
	PendingWorkflows   uint `json:"pending_workflows"`
	ActiveWorkflows    uint `json:"active_workflows"`
}

type Tx

type Tx interface {
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
	Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
	SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults
}

func TxFromContext

func TxFromContext(ctx context.Context) Tx

type TxManager

type TxManager interface {
	ReadCommitted(ctx context.Context, fn func(ctx context.Context) error) error
	RepeatableRead(ctx context.Context, fn func(ctx context.Context) error) error
}

type TxManagerImpl

type TxManagerImpl struct {
	// contains filtered or unexported fields
}

func NewTxManager

func NewTxManager(pool *pgxpool.Pool) *TxManagerImpl

func (*TxManagerImpl) ReadCommitted

func (m *TxManagerImpl) ReadCommitted(ctx context.Context, fn func(ctx context.Context) error) error

func (*TxManagerImpl) RepeatableRead

func (m *TxManagerImpl) RepeatableRead(ctx context.Context, fn func(ctx context.Context) error) error

type TypedHandler added in v1.9.0

type TypedHandler[IN, OUT any] struct {
	// contains filtered or unexported fields
}

=== TypedHandler ===

func NewTypedHandler added in v1.9.0

func NewTypedHandler[I, O any](
	name string,
	fn func(ctx context.Context, stepCtx StepContext, input I) (O, error),
) *TypedHandler[I, O]

func (*TypedHandler[IN, OUT]) Execute added in v1.9.0

func (h *TypedHandler[IN, OUT]) Execute(
	ctx context.Context,
	stepCtx StepContext,
	input json.RawMessage,
) (json.RawMessage, error)

func (*TypedHandler[IN, OUT]) Name added in v1.9.0

func (h *TypedHandler[IN, OUT]) Name() string

type Visualizer added in v1.0.2

type Visualizer struct{}

func NewVisualizer added in v1.0.2

func NewVisualizer() *Visualizer

func (*Visualizer) RenderGraph added in v1.0.2

func (v *Visualizer) RenderGraph(def *WorkflowDefinition) string

func (*Visualizer) RenderInstanceStatus added in v1.6.0

func (v *Visualizer) RenderInstanceStatus(instance *WorkflowInstance, steps []WorkflowStep) string

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(engine *Engine, interval time.Duration) *Worker

func (*Worker) Start

func (w *Worker) Start(ctx context.Context)

func (*Worker) Stop

func (w *Worker) Stop()

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

func NewWorkerPool

func NewWorkerPool(engine *Engine, size int, interval time.Duration) *WorkerPool

func (*WorkerPool) Size

func (p *WorkerPool) Size() int

func (*WorkerPool) Start

func (p *WorkerPool) Start(ctx context.Context)

func (*WorkerPool) Stop

func (p *WorkerPool) Stop()

type WorkflowCancelRequest added in v1.6.1

type WorkflowCancelRequest struct {
	ID          int64      `json:"id"`
	InstanceID  int64      `json:"instance_id"`
	RequestedBy string     `json:"requested_by"`
	CancelType  CancelType `json:"cancel_type"`
	Reason      *string    `json:"reason"`
	CreatedAt   time.Time  `json:"created_at"`
}

type WorkflowDefinition

type WorkflowDefinition struct {
	ID         string          `json:"id"`
	Name       string          `json:"name"`
	Version    int             `json:"version"`
	Definition GraphDefinition `json:"definition"`
	CreatedAt  time.Time       `json:"created_at"`
}

type WorkflowEvent

type WorkflowEvent struct {
	ID         int64           `json:"id"`
	InstanceID int64           `json:"instance_id"`
	StepID     *int64          `json:"step_id"`
	EventType  string          `json:"event_type"`
	Payload    json.RawMessage `json:"payload"`
	CreatedAt  time.Time       `json:"created_at"`
}

type WorkflowInstance

type WorkflowInstance struct {
	ID          int64           `json:"id"`
	WorkflowID  string          `json:"workflow_id"`
	Status      WorkflowStatus  `json:"status"`
	Input       json.RawMessage `json:"input"`
	Output      json.RawMessage `json:"output"`
	Error       *string         `json:"error"`
	StartedAt   *time.Time      `json:"started_at"`
	CompletedAt *time.Time      `json:"completed_at"`
	CreatedAt   time.Time       `json:"created_at"`
	UpdatedAt   time.Time       `json:"updated_at"`
}

type WorkflowStats added in v1.0.2

type WorkflowStats struct {
	WorkflowName       string        `json:"workflow_name"`
	Version            int           `json:"version"`
	TotalInstances     int           `json:"total_instances"`
	CompletedInstances int           `json:"completed_instances"`
	FailedInstances    int           `json:"failed_instances"`
	RunningInstances   int           `json:"running_instances"`
	AverageDuration    time.Duration `json:"average_duration"`
}

type WorkflowStatus

type WorkflowStatus string
const (
	StatusPending     WorkflowStatus = "pending"
	StatusRunning     WorkflowStatus = "running"
	StatusCompleted   WorkflowStatus = "completed"
	StatusFailed      WorkflowStatus = "failed"
	StatusRollingBack WorkflowStatus = "rolling_back"
	StatusCancelling  WorkflowStatus = "cancelling"
	StatusCancelled   WorkflowStatus = "cancelled"
	StatusAborted     WorkflowStatus = "aborted"
	StatusDLQ         WorkflowStatus = "dlq"
)

type WorkflowStep

type WorkflowStep struct {
	ID                     int64           `json:"id"`
	InstanceID             int64           `json:"instance_id"`
	StepName               string          `json:"step_name"`
	StepType               StepType        `json:"step_type"`
	Status                 StepStatus      `json:"status"`
	Input                  json.RawMessage `json:"input"`
	Output                 json.RawMessage `json:"output"`
	Error                  *string         `json:"error"`
	RetryCount             int             `json:"retry_count"`
	MaxRetries             int             `json:"max_retries"`
	CompensationRetryCount int             `json:"compensation_retry_count"`
	IdempotencyKey         string          `json:"idempotency_key"`
	StartedAt              *time.Time      `json:"started_at"`
	CompletedAt            *time.Time      `json:"completed_at"`
	CreatedAt              time.Time       `json:"created_at"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL