Documentation ¶
Index ¶
- Constants
- Variables
- func CalcNextRunTime(j Job) (time.Time, error)
- func FuncMapReadable() []map[string]string
- func GetNextRunTimeMax() (time.Time, error)
- func JobMarshal(j Job) ([]byte, error)
- func JobToPbJobPtr(j Job) (*pb.Job, error)
- func JobsToPbJobsPtr(js []Job) ([]*pb.Job, error)
- func QueueToPbQueuePtr(q map[string]any) (*pb.Queue, error)
- func QueuesToPbQueuesPtr(qs []map[string]any) ([]*pb.Queue, error)
- func RecordToPbRecordPtr(r Record) (*pb.Record, error)
- func RecordsToPbRecordsPtr(rs []Record) ([]*pb.Record, error)
- func RegisterFuncs(fps ...FuncPkg)
- type Backend
- type Broker
- type CallbackPkg
- type ClusterNode
- func (cn *ClusterNode) GetEndpointMain() string
- func (cn *ClusterNode) HANodeMap() TypeNodeMap
- func (cn *ClusterNode) IsMainNode() bool
- func (cn *ClusterNode) MainNode() map[string]any
- func (cn *ClusterNode) NodeMapCopy() TypeNodeMap
- func (cn *ClusterNode) NodeMapToPbNodesPtr() map[string]*pb.Node
- func (cn *ClusterNode) RPCHeartbeat(args *Node, reply *Node)
- func (cn *ClusterNode) RPCRegister(args *Node, reply *Node)
- func (cn *ClusterNode) RegisterNodeRemote(ctx context.Context) error
- func (cn *ClusterNode) SetEndpointMain(endpoint string)
- type EventPkg
- type FuncPkg
- type FuncUnregisteredError
- type HeartbeatArgs
- type HeartbeatReply
- type Job
- type JobNotFoundError
- type JobSlice
- type JobTimeoutError
- type Listener
- type Node
- type Queue
- type QueuePkg
- type Raft
- type Record
- type RecordSlice
- type Recorder
- func (r *Recorder) Clear() error
- func (r *Recorder) DeleteAllRecords() error
- func (r *Recorder) DeleteRecords(jId string) error
- func (r *Recorder) GetAllRecords(page, pageSize int) ([]Record, int64, error)
- func (r *Recorder) GetRecords(jId string, page, pageSize int) ([]Record, int64, error)
- func (r *Recorder) RecordMetadata(j Job) (id uint64, err error)
- func (r *Recorder) RecordResult(id uint64, status string, result string) error
- type Role
- type Scheduler
- func (s *Scheduler) AddJob(j Job) (Job, error)
- func (s *Scheduler) DeleteAllJobs() error
- func (s *Scheduler) DeleteJob(id string) error
- func (s *Scheduler) GetAllJobs() ([]Job, error)
- func (s *Scheduler) GetJob(id string) (Job, error)
- func (s *Scheduler) HasBroker() bool
- func (s *Scheduler) HasListener() bool
- func (s *Scheduler) HasRecorder() bool
- func (s *Scheduler) Info() map[string]any
- func (s *Scheduler) IsClusterMode() bool
- func (s *Scheduler) IsRunning() bool
- func (s *Scheduler) PauseJob(id string) (Job, error)
- func (s *Scheduler) ResumeJob(id string) (Job, error)
- func (s *Scheduler) RunJob(j Job) error
- func (s *Scheduler) ScheduleJob(j Job) error
- func (s *Scheduler) SetBroker(ctx context.Context, brk *Broker) error
- func (s *Scheduler) SetClusterNode(ctx context.Context, cn *ClusterNode) error
- func (s *Scheduler) SetListener(lis *Listener) error
- func (s *Scheduler) SetRecorder(rec *Recorder) error
- func (s *Scheduler) SetStore(sto Store) error
- func (s *Scheduler) Start()
- func (s *Scheduler) Stop()
- func (s *Scheduler) UpdateJob(j Job) (Job, error)
- type Store
- type TypeNodeMap
- type VoteArgs
- type VoteReply
Constants ¶
const (
	JOB_TYPE_DATETIME = "datetime"
	JOB_TYPE_INTERVAL = "interval"
	JOB_TYPE_CRON     = "cron"
)
Constants indicating a job's type.
const (
	JOB_STATUS_RUNNING = "running"
	JOB_STATUS_PAUSED  = "paused"
)
Constants indicating a job's status.
const (
	EVENT_SCHEDULER_STARTED event = 1 << iota
	EVENT_SCHEDULER_STOPPED
	EVENT_JOB_ADDED
	EVENT_JOB_UPDATED
	EVENT_JOB_DELETED
	EVENT_ALL_JOBS_DELETED
	EVENT_JOB_PAUSED
	EVENT_JOB_RESUMED
	EVENT_JOB_EXECUTED
	EVENT_JOB_ERROR
	EVENT_JOB_TIMEOUT
	EVENT_JOB_MAX_INSTANCES

	EVENT_ALL event = EVENT_SCHEDULER_STARTED | EVENT_SCHEDULER_STOPPED |
		EVENT_JOB_ADDED | EVENT_JOB_UPDATED | EVENT_JOB_DELETED |
		EVENT_ALL_JOBS_DELETED | EVENT_JOB_PAUSED | EVENT_JOB_RESUMED |
		EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_TIMEOUT |
		EVENT_JOB_MAX_INSTANCES
)
Constants indicating scheduler and job events.
const (
	RECORD_STATUS_RUNNING   = "running"
	RECORD_STATUS_COMPLETED = "completed"
	RECORD_STATUS_ERROR     = "error"
	RECORD_STATUS_TIMEOUT   = "timeout"
)
Constants indicating the status of a job record.
const Version = "0.12.0"
Variables ¶
var FuncMap = make(map[string]FuncPkg)
Maps the actual path of a function to the corresponding function. Since Go can't serialize functions, they need to be registered with `RegisterFuncs` before use.
var GetBroker = (*Scheduler).getBroker
var GetClusterNode = (*Scheduler).getClusterNode
var GetListener = (*Scheduler).getListener
var GetRecorder = (*Scheduler).getRecorder
var GetStore = (*Scheduler).getStore
Functions ¶
func CalcNextRunTime ¶
Calculates the next run time. Different job types are calculated in different ways; when the job is paused, it returns `9999-09-09 09:09:09`.
func FuncMapReadable ¶
func GetNextRunTimeMax ¶
func JobMarshal ¶ added in v0.8.3
Serializes a Job and converts it to bytes.
func QueueToPbQueuePtr ¶ added in v0.10.0
Used for gRPC Protobuf conversion.
func QueuesToPbQueuesPtr ¶ added in v0.10.0
Used for gRPC Protobuf conversion.
func RecordToPbRecordPtr ¶ added in v0.8.0
Used for gRPC Protobuf conversion.
func RecordsToPbRecordsPtr ¶ added in v0.8.0
Used for gRPC Protobuf conversion.
func RegisterFuncs ¶
func RegisterFuncs(fps ...FuncPkg)
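For orientation, a registration sketch is shown below. The `Func` field of `FuncPkg` is an assumption (the struct's fields are not expanded on this page), and `printMsg` is a hypothetical job function.

```go
package main

import (
	"context"
	"fmt"

	"github.com/agscheduler/agscheduler"
)

// printMsg is a hypothetical job function matching the documented
// `func(context.Context, Job) (result string)` signature.
func printMsg(ctx context.Context, j agscheduler.Job) string {
	fmt.Println("job:", j.Name, "msg:", j.Args["msg"])
	return "ok"
}

func main() {
	// Register the function so it can later be resolved from `FuncName`
	// (e.g. "main.printMsg") when a job is loaded from a store or
	// submitted via gRPC/HTTP.
	// Assumption: FuncPkg exposes a `Func` field.
	agscheduler.RegisterFuncs(agscheduler.FuncPkg{Func: printMsg})
}
```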
Types ¶
type Backend ¶ added in v0.8.0
type Backend interface {
// Backend name.
Name() string
// Initialization functions for each backend,
// called when the scheduler run `SetBackend`.
Init() error
// Record the metadata of the job to this backend.
RecordMetadata(r Record) error
// Record the result of the job run to this backend.
RecordResult(id uint64, status string, result string) error
// Get records by job id from this backend.
// @return records, total, error.
GetRecords(jId string, page, pageSize int) ([]Record, int64, error)
// Get all records from this backend.
// @return records, total, error.
GetAllRecords(page, pageSize int) ([]Record, int64, error)
// Delete records by job id from this backend.
DeleteRecords(jId string) error
// Delete all records from this backend.
DeleteAllRecords() error
// Clear all resources bound to this backend.
Clear() error
}
Defines the interface that each backend must implement.
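As an illustration of the contract only, a minimal in-memory Backend might look like the sketch below; the `memoryBackend` type and `paginate` helper are invented for the example and are not part of this package.

```go
package examplebackend

import "github.com/agscheduler/agscheduler"

// memoryBackend is a hypothetical, illustration-only Backend that keeps
// records in a slice. It is not part of this package.
type memoryBackend struct {
	records []agscheduler.Record
}

func (b *memoryBackend) Name() string { return "ExampleMemory" }

func (b *memoryBackend) Init() error { return nil }

// RecordMetadata stores the initial record of a job run.
func (b *memoryBackend) RecordMetadata(r agscheduler.Record) error {
	b.records = append(b.records, r)
	return nil
}

// RecordResult fills in the status and result once the run finishes.
func (b *memoryBackend) RecordResult(id uint64, status string, result string) error {
	for i := range b.records {
		if b.records[i].Id == id {
			b.records[i].Status = status
			b.records[i].Result = result
			break
		}
	}
	return nil
}

func (b *memoryBackend) GetRecords(jId string, page, pageSize int) ([]agscheduler.Record, int64, error) {
	var rs []agscheduler.Record
	for _, r := range b.records {
		if r.JobId == jId {
			rs = append(rs, r)
		}
	}
	return paginate(rs, page, pageSize), int64(len(rs)), nil
}

func (b *memoryBackend) GetAllRecords(page, pageSize int) ([]agscheduler.Record, int64, error) {
	return paginate(b.records, page, pageSize), int64(len(b.records)), nil
}

func (b *memoryBackend) DeleteRecords(jId string) error {
	kept := b.records[:0]
	for _, r := range b.records {
		if r.JobId != jId {
			kept = append(kept, r)
		}
	}
	b.records = kept
	return nil
}

func (b *memoryBackend) DeleteAllRecords() error { b.records = nil; return nil }

func (b *memoryBackend) Clear() error { return b.DeleteAllRecords() }

// paginate is a helper returning the requested page (1-based).
func paginate(rs []agscheduler.Record, page, pageSize int) []agscheduler.Record {
	start := (page - 1) * pageSize
	if start < 0 || start >= len(rs) {
		return nil
	}
	end := start + pageSize
	if end > len(rs) {
		end = len(rs)
	}
	return rs[start:end]
}
```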
type Broker ¶ added in v0.7.0
type Broker struct {
// Job queues.
// def: map[<queue>]QueuePkg
Queues map[string]QueuePkg
// contains filtered or unexported fields
}
When using a Broker, jobs are scheduled through queues rather than directly via API calls.
type CallbackPkg ¶ added in v0.11.0
type CallbackPkg struct {
Callback func(ep EventPkg)
Event event
}
type ClusterNode ¶
type ClusterNode struct {
// Main node RPC listening address.
// If this node is the main node, `EndpointMain` is the same as `Endpoint`.
// Default: `127.0.0.1:36380`
EndpointMain string
// The unique identifier of this node.
// RPC listening address.
// Used to expose the cluster's internal API.
// Default: `127.0.0.1:36380`
Endpoint string
// gRPC listening address.
// Used to expose the external API.
// Default: `127.0.0.1:36360`
EndpointGRPC string
// HTTP listening address.
// Used to expose the external API.
// Default: `127.0.0.1:36370`
EndpointHTTP string
// Useful when a job specifies a queue.
// A queue can correspond to multiple nodes.
// Default: `default`
Queue string
// Node mode, for Scheduler high availability.
// If the value is `HA`, the node will join the raft group.
// Default: "", Options: `HA`
Mode string
// Bind to each other and the Scheduler.
Scheduler *Scheduler
// For Scheduler high availability.
// Bind to each other and the Raft.
Raft *Raft
// Used to mark the status of Cluster Scheduler operation.
SchedulerCanStart bool
// contains filtered or unexported fields
}
Each node provides `Cluster RPC`, `gRPC`, and `HTTP` services, but only the main node starts the scheduler; the other worker nodes register with the main node and then run jobs dispatched from it via the RPC `RunJob` API.
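A minimal configuration sketch for a worker node, using only the fields documented above; the non-default ports are chosen so it can share a host with a main node running on the documented defaults.

```go
package main

import (
	"context"

	"github.com/agscheduler/agscheduler"
)

func main() {
	cn := &agscheduler.ClusterNode{
		EndpointMain: "127.0.0.1:36380", // main node's Cluster RPC address
		Endpoint:     "127.0.0.1:36381", // this node's Cluster RPC address
		EndpointGRPC: "127.0.0.1:36361",
		EndpointHTTP: "127.0.0.1:36371",
		Queue:        "default",
	}

	s := &agscheduler.Scheduler{}
	// Bind scheduler and cluster node to each other
	// (store setup and error handling omitted for brevity).
	_ = s.SetClusterNode(context.TODO(), cn)
}
```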
func (*ClusterNode) GetEndpointMain ¶
func (cn *ClusterNode) GetEndpointMain() string
func (*ClusterNode) HANodeMap ¶
func (cn *ClusterNode) HANodeMap() TypeNodeMap
func (*ClusterNode) IsMainNode ¶
func (cn *ClusterNode) IsMainNode() bool
func (*ClusterNode) MainNode ¶
func (cn *ClusterNode) MainNode() map[string]any
func (*ClusterNode) NodeMapCopy ¶
func (cn *ClusterNode) NodeMapCopy() TypeNodeMap
func (*ClusterNode) NodeMapToPbNodesPtr ¶
func (cn *ClusterNode) NodeMapToPbNodesPtr() map[string]*pb.Node
Used for gRPC Protobuf conversion.
func (*ClusterNode) RPCHeartbeat ¶
func (cn *ClusterNode) RPCHeartbeat(args *Node, reply *Node)
RPC API
func (*ClusterNode) RPCRegister ¶
func (cn *ClusterNode) RPCRegister(args *Node, reply *Node)
RPC API
func (*ClusterNode) RegisterNodeRemote ¶
func (cn *ClusterNode) RegisterNodeRemote(ctx context.Context) error
Used by worker nodes.
After initialization, the node needs to register with the main node and synchronize cluster node information.
func (*ClusterNode) SetEndpointMain ¶
func (cn *ClusterNode) SetEndpointMain(endpoint string)
type FuncUnregisteredError ¶
type FuncUnregisteredError string
func (FuncUnregisteredError) Error ¶
func (e FuncUnregisteredError) Error() string
type HeartbeatArgs ¶
type HeartbeatReply ¶
type HeartbeatReply struct {
Term int
}
type Job ¶
type Job struct {
// The unique identifier of this job, automatically generated.
// It should not be set manually.
Id string `json:"id"`
// User defined.
Name string `json:"name"`
// Optional: `JOB_TYPE_DATETIME` | `JOB_TYPE_INTERVAL` | `JOB_TYPE_CRON`
Type string `json:"type"`
// It can be used when Type is `JOB_TYPE_DATETIME`.
// e.g. `2023-09-22 07:30:08`
StartAt string `json:"start_at"`
// This field is currently unused.
EndAt string `json:"end_at"`
// It can be used when Type is `JOB_TYPE_INTERVAL`.
// e.g. `2s`
Interval string `json:"interval"`
// It can be used when Type is `JOB_TYPE_CRON`.
// See `https://en.wikipedia.org/wiki/Cron`.
// e.g. `*/1 * * * *`
CronExpr string `json:"cron_expr"`
// Refer to `time.LoadLocation`.
// See `https://en.wikipedia.org/wiki/List_of_tz_database_time_zones`
// Default: `UTC`
Timezone string `json:"timezone"`
// The function the job actually runs;
// it must be registered via `RegisterFuncs` before use.
// Since it cannot be serialized for storage,
// use `FuncName` instead when calling via gRPC or HTTP.
Func func(context.Context, Job) (result string) `json:"-"`
// The actual path of `Func`.
// This field has a higher priority than `Func`.
// e.g. `main.xxxFunc`
// `github.com/agscheduler/agscheduler/examples.PrintMsg`
FuncName string `json:"func_name"`
// Arguments for `Func`.
Args map[string]any `json:"args"`
// The running timeout of `Func`.
// Default: `1h`
Timeout string `json:"timeout"`
// Used in cluster mode. If empty, a node is randomly picked to run `Func`;
// but when a broker exists, if empty, a queue is randomly picked instead.
Queues []string `json:"queues"`
// Maximum number of concurrent instances for this job.
// Minimum value is 1, cannot be set to 0 or negative.
// Default: 1
// Note: In protobuf, values ≤ 0 will be treated as 1.
MaxInstances int `json:"max_instances"`
// Automatic update, not manual setting.
LastRunTime time.Time `json:"last_run_time"`
// Automatic update, not manual setting.
// When the job is paused, this field is set to `9999-09-09 09:09:09`.
NextRunTime time.Time `json:"next_run_time"`
// Optional: `JOB_STATUS_RUNNING` | `JOB_STATUS_PAUSED`
// It should not be set manually.
Status string `json:"status"`
}
Carries the information of a scheduled job.
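A fragment sketching an interval job with the fields documented above; `printMsg` is a hypothetical function assumed to be registered via `RegisterFuncs`.

```go
// printMsg is assumed to have been registered via RegisterFuncs.
job := agscheduler.Job{
	Name:     "print-msg",
	Type:     agscheduler.JOB_TYPE_INTERVAL,
	Interval: "2s",
	Timezone: "UTC",
	Func:     printMsg,
	Args:     map[string]any{"msg": "hello"},
	Timeout:  "10s",
}
// Id, Status, LastRunTime and NextRunTime are managed by the scheduler
// and should not be set manually.
```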
func JobUnmarshal ¶ added in v0.8.3
Deserializes bytes and converts them to a Job.
func (*Job) LastRunTimeWithTimezone ¶
func (*Job) NextRunTimeWithTimezone ¶
type JobNotFoundError ¶
type JobNotFoundError string
func (JobNotFoundError) Error ¶
func (e JobNotFoundError) Error() string
type JobTimeoutError ¶
func (*JobTimeoutError) Error ¶
func (e *JobTimeoutError) Error() string
type Queue ¶ added in v0.7.0
type Queue interface {
// Queue name.
Name() string
// Initialization functions for each queue,
// called when the scheduler run `SetBroker`.
Init(ctx context.Context) error
// Push job to this queue.
PushJob(bJ []byte) error
// Pull job from this queue.
PullJob() <-chan []byte
// Count the number of jobs in this queue.
// @return -1, nil if the queue does not support this feature, or an error on failure.
CountJobs() (int, error)
// Clear all resources bound to this queue.
Clear() error
}
Defines the interface that each queue must implement.
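Purely as an illustration of the contract, a channel-backed in-memory queue could satisfy this interface as sketched below; the `chanQueue` type is invented for the example and is not part of this package.

```go
package examplequeue

import "context"

// chanQueue is a hypothetical, illustration-only Queue backed by a
// buffered channel.
type chanQueue struct {
	jobs chan []byte
}

func (q *chanQueue) Name() string { return "ExampleChan" }

func (q *chanQueue) Init(ctx context.Context) error {
	q.jobs = make(chan []byte, 1024)
	return nil
}

// PushJob enqueues a serialized job (e.g. the output of JobMarshal).
func (q *chanQueue) PushJob(bJ []byte) error {
	q.jobs <- bJ
	return nil
}

// PullJob exposes the channel that workers consume jobs from.
func (q *chanQueue) PullJob() <-chan []byte { return q.jobs }

// CountJobs reports the number of currently buffered jobs.
func (q *chanQueue) CountJobs() (int, error) { return len(q.jobs), nil }

// Clear drains any buffered jobs.
func (q *chanQueue) Clear() error {
	for len(q.jobs) > 0 {
		<-q.jobs
	}
	return nil
}
```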
type Raft ¶
type Raft struct {
// contains filtered or unexported fields
}
func (*Raft) RPCHeartbeat ¶
func (rf *Raft) RPCHeartbeat(args HeartbeatArgs, reply *HeartbeatReply) error
type Record ¶ added in v0.8.0
type Record struct {
// Unique Id
Id uint64 `json:"id"`
// Job id
JobId string `json:"job_id"`
// Job name
JobName string `json:"job_name"`
// Optional: `RECORD_STATUS_RUNNING` | `RECORD_STATUS_COMPLETED` | `RECORD_STATUS_ERROR` | `RECORD_STATUS_TIMEOUT`
Status string `json:"status"`
// The result of the job run
Result string `json:"result"`
// Start time
StartAt time.Time `json:"start_at"`
// End time
EndAt time.Time `json:"end_at"`
}
Carries the information of a job run.
func PbRecordPtrToRecord ¶ added in v0.8.0
Used for gRPC Protobuf conversion.
func PbRecordsPtrToRecords ¶ added in v0.8.0
Used for gRPC Protobuf conversion.
type RecordSlice ¶ added in v0.8.0
type RecordSlice []Record
Implements `sort.Interface`, sorted by `StartAt` in descending order.
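Because `RecordSlice` implements `sort.Interface`, records can be ordered with the standard `sort` package:

```go
// records is a []agscheduler.Record obtained elsewhere, e.g. from a Recorder.
sort.Sort(agscheduler.RecordSlice(records))
// records is now ordered by StartAt, newest first.
```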
func (RecordSlice) Len ¶ added in v0.8.0
func (rs RecordSlice) Len() int
func (RecordSlice) Less ¶ added in v0.8.0
func (rs RecordSlice) Less(i, j int) bool
func (RecordSlice) Swap ¶ added in v0.8.0
func (rs RecordSlice) Swap(i, j int)
type Recorder ¶ added in v0.8.0
type Recorder struct {
// Record store
// It should not be used directly.
Backend Backend
// contains filtered or unexported fields
}
When using a Recorder, the results of the job runs will be recorded to the specified backend.
func (*Recorder) DeleteAllRecords ¶ added in v0.8.0
func (*Recorder) DeleteRecords ¶ added in v0.8.0
func (*Recorder) GetAllRecords ¶ added in v0.8.0
func (*Recorder) GetRecords ¶ added in v0.8.0
func (*Recorder) RecordMetadata ¶ added in v0.8.0
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
In standalone mode, the scheduler only needs to run jobs on a regular basis. In cluster mode, it is also responsible for allocating jobs to cluster nodes.
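A minimal standalone wiring sketch. `myStore` stands for any value implementing the `Store` interface documented below, and `job` is a `Job` value as sketched above; error handling is abbreviated.

```go
s := &agscheduler.Scheduler{}

// myStore is any value implementing the Store interface documented below;
// ready-made stores are listed under stores/... in the Directories section.
if err := s.SetStore(myStore); err != nil {
	log.Fatal(err)
}

// job is a Job value as sketched above.
if _, err := s.AddJob(job); err != nil {
	log.Fatal(err)
}

s.Start() // also triggered implicitly by AddJob
defer s.Stop()
```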
func (*Scheduler) DeleteAllJobs ¶
func (*Scheduler) GetAllJobs ¶
func (*Scheduler) HasListener ¶ added in v0.11.0
func (*Scheduler) HasRecorder ¶ added in v0.8.0
func (*Scheduler) IsClusterMode ¶
func (*Scheduler) ScheduleJob ¶
Select a worker node or queue.
func (*Scheduler) SetClusterNode ¶
func (s *Scheduler) SetClusterNode(ctx context.Context, cn *ClusterNode) error
Bind the cluster node
func (*Scheduler) SetListener ¶ added in v0.11.0
Bind the listener
func (*Scheduler) SetRecorder ¶ added in v0.8.0
Bind the recorder
func (*Scheduler) Start ¶
func (s *Scheduler) Start()
In addition to being called manually, it is also called after `AddJob`.
type Store ¶
type Store interface {
// Store name.
Name() string
// Initialization functions for each store,
// called when the scheduler run `SetStore`.
Init() error
// Add job to this store.
AddJob(j Job) error
// Get the job from this store.
// @return `JobNotFoundError` if the job does not exist.
GetJob(id string) (Job, error)
// Get all jobs from this store.
GetAllJobs() ([]Job, error)
// Update job in store with a newer version.
UpdateJob(j Job) error
// Delete the job from this store.
DeleteJob(id string) error
// Delete all jobs from this store.
DeleteAllJobs() error
// Get the earliest next run time of all the jobs stored in this store,
// or `time.Time{}` if there are no jobs.
// Used to set the wakeup interval for the scheduler.
GetNextRunTime() (time.Time, error)
// Clear all resources bound to this store.
Clear() error
}
Defines the interface that each store must implement.
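As with the other interfaces, an illustration-only in-memory Store might look like the following; the `mapStore` type is invented for the example (ready-made stores are referenced under the stores/... entries in the Directories section).

```go
package examplestore

import (
	"time"

	"github.com/agscheduler/agscheduler"
)

// mapStore is a hypothetical, illustration-only Store keeping jobs in a
// map keyed by job id.
type mapStore struct {
	jobs map[string]agscheduler.Job
}

func (s *mapStore) Name() string { return "ExampleMap" }

func (s *mapStore) Init() error {
	s.jobs = make(map[string]agscheduler.Job)
	return nil
}

func (s *mapStore) AddJob(j agscheduler.Job) error {
	s.jobs[j.Id] = j
	return nil
}

// GetJob returns JobNotFoundError when the id is unknown, as the
// interface comment requires.
func (s *mapStore) GetJob(id string) (agscheduler.Job, error) {
	j, ok := s.jobs[id]
	if !ok {
		return agscheduler.Job{}, agscheduler.JobNotFoundError(id)
	}
	return j, nil
}

func (s *mapStore) GetAllJobs() ([]agscheduler.Job, error) {
	js := make([]agscheduler.Job, 0, len(s.jobs))
	for _, j := range s.jobs {
		js = append(js, j)
	}
	return js, nil
}

func (s *mapStore) UpdateJob(j agscheduler.Job) error {
	s.jobs[j.Id] = j
	return nil
}

func (s *mapStore) DeleteJob(id string) error {
	delete(s.jobs, id)
	return nil
}

func (s *mapStore) DeleteAllJobs() error {
	s.jobs = make(map[string]agscheduler.Job)
	return nil
}

// GetNextRunTime returns the earliest NextRunTime, or the zero time
// when the store is empty, so the scheduler can set its wakeup interval.
func (s *mapStore) GetNextRunTime() (time.Time, error) {
	var earliest time.Time
	for _, j := range s.jobs {
		if earliest.IsZero() || j.NextRunTime.Before(earliest) {
			earliest = j.NextRunTime
		}
	}
	return earliest, nil
}

func (s *mapStore) Clear() error { return s.DeleteAllJobs() }
```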
type TypeNodeMap ¶
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| backends/gorm | command |
| backends/memory | command |
| backends/mongodb | command |
| cluster/cluster_node | command |
| event | command |
| grpc | command |
| grpc/grpc_client | command |
| grpc/grpc_server | command |
| http | command |
| http/http_client | command |
| http/http_server | command |
| queues/kafka | command |
| queues/memory | command |
| queues/mqtt | command |
| queues/nsq | command |
| queues/rabbitmq | command |
| queues/redis | command |
| stores/elasticsearch | command |
| stores/etcd | command |
| stores/gorm | command |
| stores/memory | command |
| stores/mongodb | command |
| stores/redis | command |