actor

package
v0.0.0-...-884197f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 14, 2025 License: Apache-2.0 Imports: 23 Imported by: 0

README

Orbit Actor 系统

概述

Orbit Actor 系统是基于 Proto.Actor 框架的强大实现,提供了一个简化的 Actor 生命周期管理接口。该系统专为高并发场景设计,内置了弹性恢复机制。

核心特性

  • Actor 监督机制:分层监督结构,实现强大的错误处理
  • 基于级别的 Actor 管理:为不同类型的 Actor 提供不同的优先级
  • 自动恢复:系统自动尝试重启失败的 Actor
  • 状态管理:高效跟踪 Actor 状态(启动中、停止中、已停止)

核心组件

Actor Facade

一个简化的 Actor 系统交互接口:

// ActorFacade 提供了管理 Actor 的简化接口
type ActorFacade struct {
    actorSystem *actor.ActorSystem
    supervisors []*actor.PID
}
Actor 监督

负责管理 Actor 生命周期:

// ActorSupervision 负责管理 Actor 生命周期
type ActorSupervision struct {
    state       atomic.Int32
    level       Level
    actorSystem *actor.ActorSystem
    starting    *Queue
    stopping    *Queue
}

可靠的 Actor 通信

GetOrStartActor 方法

GetOrStartActor 方法是我们 Actor 系统的关键组件,通过确保获取到随时可用的 Actor 来保证通信可靠性:

// GetOrStartActor 返回一个就绪可用的 Actor
func GetOrStartActor(actorName, pattern string) (*actor.PID, error)
可靠性特性
  • 保证就绪状态:该方法确保返回的 Actor 处于就绪状态,可以立即处理消息。
  • 自动重启机制:如果检测到 Actor 正在停止中,系统会自动将请求加入队列,并在 Actor 完全停止后重新启动它。
  • 停止状态处理:当 Actor 正在停止过程中,系统不会立即返回错误,而是将请求加入队列,并在 Actor 完全停止后自动重启它。
  • 消息投递保证:通过确保 Actor 就绪后再返回给调用者,系统防止了由于向已停止或停止中的 Actor 发送消息而导致的消息投递失败。
  • 透明恢复:调用者无需处理 Actor 生命周期管理 - 系统透明地处理 Actor 重启,无需显式干预。
  • 故障预防:帮助避免常见问题,如消息超时或由于 Actor 状态不适当而导致的消息丢失。

这种方法显著减少了以下错误情况:

  • 调用者向已停止的 Actor 发送消息
  • 由于向正在停止过程中的 Actor 发送消息而导致消息丢失
  • 由于 Actor 正在重启过程中而导致消息投递失败
  • 由于 Actor 无法处理消息而导致超时
  • 调用者遇到静默失败,无法检测到消息投递失败

使用 GetOrStartActor 的关键优势在于,客户端无需实现复杂的重试逻辑或处理 Actor 生命周期边缘情况 - 系统透明地处理这些问题,确保消息只发送给准备好接收它们的 Actor。

使用示例

// 向 Actor 发送消息(消息投递)
func SendMessage(data string) error {
    return Send("my-actor", "worker-pattern", &MyMessage{Data: data})
}

// 与 Actor 进行请求-响应模式通信
func RequestData(id string) (Response, error) {
    result, err := Request("data-actor", "data-pattern", &DataRequest{ID: id})
    if err != nil {
        return nil, err
    }
    return result.(Response), nil
}

线程安全

Actor 系统的所有组件都设计为线程安全的,允许多个 goroutine 并发访问而无需额外的同步机制。

Documentation

Index

Constants

View Source
const (
	StateNone = iota
	StateStopped
)
View Source
const (
	StartActorTimeout              = 30 * time.Second
	StopActorTimeout               = 30 * time.Second
	ManagerStartActorFutureTimeout = 30 * time.Second

	AliveCheckInterval  = 30 * time.Second
	DefaultAliveTimeout = 30 * time.Minute
)
View Source
const (
	MessageTypeSend int8 = iota
	MessageTypeRequest
	MessageTypeForward
)
View Source
const (
	StateActorSupervisionNormal int32 = iota
	StateActorSupervisionStopping
	StateActorSupervisionStopped
)
View Source
const (
	ManagerName = "system-actor-supervision"

	ActorSystemStateRunning  = 0
	ActorSystemStateStopping = 1
	ActorSystemStateStopped  = 2
)

Variables

View Source
var (
	DispatcherType_name = map[int32]string{
		0: "DISPATCHER_TYPE_IN_WORLD",
		1: "DISPATCHER_TYPE_IN_REGION",
		2: "DISPATCHER_TYPE_RANDOM",
	}
	DispatcherType_value = map[string]int32{
		"DISPATCHER_TYPE_IN_WORLD":  0,
		"DISPATCHER_TYPE_IN_REGION": 1,
		"DISPATCHER_TYPE_RANDOM":    2,
	}
)

Enum value maps for DispatcherType.

View Source
var (
	ErrActorNotFound      = errors.New("actor not found")
	ErrActorStopped       = errors.New("actor is stopped")
	ErrSupervisionStopped = errors.New("supervision stopped")
)
View Source
var File_actor_ref_proto protoreflect.FileDescriptor

Functions

func ExtractActorName

func ExtractActorName(pid *actor.PID) string

ExtractActorName extracts the base actor name from a PID For example: "system-actor-supervision-level-0/test-actor-2" -> "test-actor-2"

func ExtractActorNameFromPath

func ExtractActorNameFromPath(path string) string

ExtractActorNameFromPath extracts the base actor name from a path string For example: "system-actor-supervision-level-0/test-actor-2" -> "test-actor-2"

func GenManagerName

func GenManagerName(level Level) string

func InitPatternLevelMap

func InitPatternLevelMap(list []struct {
	Pattern string
	Level   Level
})

func RegFactories

func RegFactories(factories ...struct {
	pattern string
	factory BehaivorFactory
})

RegFactories registers multiple factories at once

func RegFactory

func RegFactory(pattern string, factory BehaivorFactory)

RegFactory registers a factory for a specific actor type

func StopActor

func StopActor(actorName, pattern string) error

StopActor stops the actor with the given ID

Types

type ActorInfo

type ActorInfo struct {
	ActorName string
	Pattern   string
}

type ActorMetaCache

type ActorMetaCache struct {
	// contains filtered or unexported fields
}

func NewMetaCache

func NewMetaCache(cli *redis.Client) *ActorMetaCache

func (*ActorMetaCache) Del

func (c *ActorMetaCache) Del(key string)

func (*ActorMetaCache) Get

func (c *ActorMetaCache) Get(key string) (*Meta, bool)

func (*ActorMetaCache) Load

func (c *ActorMetaCache) Load(actorName string) (*Meta, error)

func (*ActorMetaCache) Set

func (c *ActorMetaCache) Set(key string, value *Meta)

func (*ActorMetaCache) Store

func (c *ActorMetaCache) Store(actorName string, value *Meta) (*Meta, error)

type ActorRef

type ActorRef struct {
	ActorName string `protobuf:"bytes,2,opt,name=ActorName,proto3" json:"ActorName,omitempty"`
	Pattern   string `protobuf:"bytes,3,opt,name=Pattern,proto3" json:"Pattern,omitempty"`
	ServerId  string `protobuf:"bytes,4,opt,name=ServerId,proto3" json:"ServerId,omitempty"` //初始逻辑服ID

	Props *Props `protobuf:"-"`
	// contains filtered or unexported fields
}

ActorRef 包含 Actor不变的初始化信息

func NewActorRef

func NewActorRef(props *Props, actorName, pattern string, ops ...PropsOption) *ActorRef

NewActorRef 创建一个新的ActorRef实例 ActorRef为Actor的代理。它主要的作用是支持向它所代表的Actor发送消息, 从而实现Actor之间的通信。通过ActorRef,可以避免直接访问或操作Actor的内部信息和状态。

func (*ActorRef) Descriptor deprecated

func (*ActorRef) Descriptor() ([]byte, []int)

Deprecated: Use ActorRef.ProtoReflect.Descriptor instead.

func (*ActorRef) GetActorName

func (x *ActorRef) GetActorName() string

func (*ActorRef) GetPattern

func (x *ActorRef) GetPattern() string

func (*ActorRef) GetServerId

func (x *ActorRef) GetServerId() string

func (*ActorRef) ProtoMessage

func (*ActorRef) ProtoMessage()

func (*ActorRef) ProtoReflect

func (x *ActorRef) ProtoReflect() protoreflect.Message

func (*ActorRef) RequestFuture

func (actorRef *ActorRef) RequestFuture(msg any, timeout ...time.Duration) (any, error)

RequestFuture 发送消息到Actor并等待消息回复 如果Actor正在停止,则尝试排队订阅新Actor事件, 当新的Actor启动就绪后,重新发送消息

func (*ActorRef) Reset

func (x *ActorRef) Reset()

func (*ActorRef) Send

func (actorRef *ActorRef) Send(msg any) error

Send 发送消息到Actor 如果Actor正在停止,则尝试排队订阅新Actor事件, 当新的Actor启动就绪后,重新发送消息

func (*ActorRef) Stop

func (actorRef *ActorRef) Stop()

Stop 停止当前Actor 此方法向Actor系统发送停止信号,请求终止目标Actor的执行. 当有新消息发送到目标Actor,会将Actor重新激活。 调用此方法后,目标Actor将完成当前正在处理的消息,然后优雅地关闭 注意: 停止操作是异步的,方法调用后立即返回,不等待Actor实际停止

func (*ActorRef) String

func (x *ActorRef) String() string

type ActorRefCache

type ActorRefCache struct {
	// contains filtered or unexported fields
}

func NewActorRefCache

func NewActorRefCache() *ActorRefCache

func (*ActorRefCache) Del

func (c *ActorRefCache) Del(actorName string)

func (*ActorRefCache) Get

func (c *ActorRefCache) Get(actorName string) (*ActorRef, bool)

func (*ActorRefCache) Set

func (c *ActorRefCache) Set(actorName string, value *ActorRef)

type ActorSupervision

type ActorSupervision struct {
	// contains filtered or unexported fields
}

ActorSupervision is responsible for managing actor lifecycle

func NewActorSupervision

func NewActorSupervision(actorSystem *actor.ActorSystem, level Level) *ActorSupervision

NewActorSupervision creates a new instance of ActorManager

func (*ActorSupervision) Receive

func (m *ActorSupervision) Receive(context actor.Context)

Receive handles messages sent to the ActorManager

type ActorSystem

type ActorSystem struct {
	// contains filtered or unexported fields
}

ActorSystem provides a simplified interface for managing actors

var (
	System *ActorSystem

	ActorFacadeStopping atomic.Bool
)

func NewActorFacade

func NewActorFacade(actorSystem *actor.ActorSystem) *ActorSystem

NewActorFacade creates a new instance of ActorFacade

func (*ActorSystem) ActorSystem

func (af *ActorSystem) ActorSystem() *actor.ActorSystem

func (*ActorSystem) RequestFuture

func (f *ActorSystem) RequestFuture(pattern string, msg any, timeout time.Duration) (any, error)

func (*ActorSystem) Start

func (af *ActorSystem) Start() error

func (*ActorSystem) Stop

func (af *ActorSystem) Stop(ctx context.Context) error

Stop 开始ActorSystem的优雅关闭流程 参数:

  • ctx: 用于控制停止操作超时和取消的上下文

返回:

  • 如果系统已在关闭或关闭成功则返回nil,如果发生错误则返回错误

说明:

  • 此方法只有在系统处于运行状态时才会进行实际关闭
  • 使用CAS操作确保只有一个goroutine能够触发系统关闭

func (*ActorSystem) StopWithDefaultTimeout

func (af *ActorSystem) StopWithDefaultTimeout() error

StopWithDefaultTimeout is a convenience method that stops the actor system with a default timeout of 60 seconds

func (*ActorSystem) StopWithTimeout

func (af *ActorSystem) StopWithTimeout(timeout time.Duration) error

StopWithTimeout is a convenience method that stops the actor system with a timeout It creates a context with the specified timeout and calls Stop

type ActorsCache

type ActorsCache struct {
	// contains filtered or unexported fields
}

func NewActorsCache

func NewActorsCache() *ActorsCache

func (*ActorsCache) Delete

func (c *ActorsCache) Delete(actorName string)

func (*ActorsCache) Exist

func (c *ActorsCache) Exist(actorName string) bool

func (*ActorsCache) Get

func (c *ActorsCache) Get(actorName string) (*Process, bool)

func (*ActorsCache) Set

func (c *ActorsCache) Set(actorName string, p *Process)

type BehaivorFactory

type BehaivorFactory func(actorName string) Behavior

Factory function type that accepts an actor ID

func Dispatch

func Dispatch(pattern string) BehaivorFactory

Dispatch returns a factory function for creating actors with the given ID

type Behavior

type Behavior interface {
	HandleRequest(ctx IContext, msg any) (any, error)
	HandleSend(ctx IContext, msg any)
	HandleForward(ctx IContext, msg any)
	HandleInit(ctx IContext) error
	HandleStopping(ctx IContext) error
	HandleStopped(ctx IContext) error
}

func CreateBehaviorWithID

func CreateBehaviorWithID(pattern string, actorName string) Behavior

CreateBehaviorWithID creates an actor with a specific ID

type CallMessage

type CallMessage struct {
	ActorName string
	Message   any
}

type CallMessageResponse

type CallMessageResponse struct {
	ActorName string
	Message   any
	Error     error
}

type CastMessage

type CastMessage struct {
	ActorName string
	Message   any
}

type CheckAliveMessage

type CheckAliveMessage struct{}

type ChildActor

type ChildActor struct {
	Behavior
	*TimerMgr
	// contains filtered or unexported fields
}

ChildActor 表示由SupervisorActor管理的子Actor 实现了InitNotifiable接口,允许在初始化完成后发送通知

func NewChildActor

func NewChildActor(behavior Behavior, name, pattern string, meta *Meta, aliveTimeout time.Duration, initCB func(err error) error) *ChildActor

NewChildActor 创建一个新的子Actor

func (*ChildActor) GetActorContext

func (state *ChildActor) GetActorContext() actor.Context

func (*ChildActor) GetActorName

func (state *ChildActor) GetActorName() string

func (*ChildActor) GetContext

func (state *ChildActor) GetContext() IContext

func (*ChildActor) GetMetaData

func (state *ChildActor) GetMetaData() *Meta

func (*ChildActor) GetPattern

func (state *ChildActor) GetPattern() string

func (*ChildActor) GetServerId

func (state *ChildActor) GetServerId() string

func (*ChildActor) HandleInit

func (state *ChildActor) HandleInit(context actor.Context)

HandleInit 在Actor启动时执行的初始化逻辑 返回nil表示成功,否则返回错误

func (*ChildActor) HandleStopped

func (state *ChildActor) HandleStopped(context actor.Context)

func (*ChildActor) HandleStopping

func (state *ChildActor) HandleStopping(context actor.Context) error

func (*ChildActor) IsActive

func (state *ChildActor) IsActive() bool

IsActive 检查Actor是否活跃

func (*ChildActor) Receive

func (state *ChildActor) Receive(context actor.Context)

Receive 处理接收到的消息

func (*ChildActor) SetActorContext

func (state *ChildActor) SetActorContext(context actor.Context)

func (*ChildActor) SetMetaData

func (state *ChildActor) SetMetaData(meta *Meta)

type ChildStartedNotification

type ChildStartedNotification struct {
	ActorName string
	Error     error
}

ChildStartedNotification 子Actor启动完成并执行Behavior HandleInit后发送的通知

type Dispatcher

type Dispatcher struct {
	Type     DispatcherType `protobuf:"varint,1,opt,name=Type,proto3,enum=actor.DispatcherType" json:"Type,omitempty"`
	ServerId string         `protobuf:"bytes,2,opt,name=ServerId,proto3" json:"ServerId,omitempty"` //最新的逻辑服ID
	NodeId   string         `protobuf:"bytes,3,opt,name=NodeId,proto3" json:"NodeId,omitempty"`     //最新的节点ID
	// contains filtered or unexported fields
}

func (*Dispatcher) Descriptor deprecated

func (*Dispatcher) Descriptor() ([]byte, []int)

Deprecated: Use Dispatcher.ProtoReflect.Descriptor instead.

func (*Dispatcher) GetNodeId

func (x *Dispatcher) GetNodeId() string

func (*Dispatcher) GetServerId

func (x *Dispatcher) GetServerId() string

func (*Dispatcher) GetType

func (x *Dispatcher) GetType() DispatcherType

func (*Dispatcher) ProtoMessage

func (*Dispatcher) ProtoMessage()

func (*Dispatcher) ProtoReflect

func (x *Dispatcher) ProtoReflect() protoreflect.Message

func (*Dispatcher) Reset

func (x *Dispatcher) Reset()

func (*Dispatcher) String

func (x *Dispatcher) String() string

type DispatcherType

type DispatcherType int32
const (
	DispatcherType_DISPATCHER_TYPE_IN_WORLD  DispatcherType = 0 // 世界分组
	DispatcherType_DISPATCHER_TYPE_IN_REGION DispatcherType = 1 // 区域分组
	DispatcherType_DISPATCHER_TYPE_RANDOM    DispatcherType = 2 // 随机分组
)

func (DispatcherType) Descriptor

func (DispatcherType) Enum

func (x DispatcherType) Enum() *DispatcherType

func (DispatcherType) EnumDescriptor deprecated

func (DispatcherType) EnumDescriptor() ([]byte, []int)

Deprecated: Use DispatcherType.Descriptor instead.

func (DispatcherType) Number

func (DispatcherType) String

func (x DispatcherType) String() string

func (DispatcherType) Type

type ForwardMessage

type ForwardMessage struct {
	ActorName string
	Message   any
}

type ForwardMessageResponse

type ForwardMessageResponse struct {
	ActorName string
	Message   any
	Error     error
}

type GracefulShutdownManager

type GracefulShutdownManager struct {
	// contains filtered or unexported fields
}

GracefulShutdownManager 负责管理优雅关闭过程,通过反复尝试特定操作直到连续成功达到阈值次数 用于确保系统组件可以安全、可靠地关闭,即使在分布式或并发环境中

func NewGracefulShutdownManager

func NewGracefulShutdownManager(successThreshold int32, shutdownOperation func() bool) *GracefulShutdownManager

NewGracefulShutdownManager 创建一个新的优雅关闭管理器 参数:

  • successThreshold: 确认操作完成所需的连续成功次数
  • shutdownOperation: 实际执行关闭操作并返回是否成功的函数

func NewGracefulShutdownManagerWithMaxAttempts

func NewGracefulShutdownManagerWithMaxAttempts(successThreshold int32, maxAttempts int32, shutdownOperation func() bool) *GracefulShutdownManager

NewGracefulShutdownManagerWithMaxAttempts 创建一个有最大尝试次数限制的优雅关闭管理器 参数:

  • successThreshold: 确认操作完成所需的连续成功次数
  • maxAttempts: 最大尝试次数,0表示无限重试
  • shutdownOperation: 实际执行关闭操作并返回是否成功的函数

func (*GracefulShutdownManager) Shutdown

func (g *GracefulShutdownManager) Shutdown(ctx context.Context) error

Shutdown 开始并管理关闭过程 参数:

  • ctx: 用于控制关闭超时和取消的上下文

返回:

  • 如果关闭成功完成则返回nil,否则返回错误

注意:

  • 此方法会阻塞直到关闭完成或上下文被取消
  • 内部使用goroutine执行反复尝试的关闭操作,避免阻塞主线程

type IBaseContext

type IBaseContext interface {
	SetMetaData(meta *Meta)
	GetMetaData() *Meta
	GetActorName() string
	GetPattern() string
	GetActorContext() actor.Context
	SetActorContext(context actor.Context)
	GetServerId() string
}

type IContext

type IContext interface {
	IBaseContext
	ITimerContext
}

type IService

type IService interface {
	Start() error
	Stop(ctx context.Context) error
}

type ITimerContext

type ITimerContext interface {
	AddSystemTimer(key string, duration time.Duration, msg any) *Timer
	AddTimerOnce(key string, duration time.Duration, msg any) *Timer
	RemoveTimer(key string)
}

type Item

type Item struct {
	ActorName string
	Pattern   string
	Future    []*actor.PID
	Child     *actor.PID
	Props     *Props
}

func NewItem

func NewItem(actorName, pattern string, child *actor.PID, props *Props, future ...*actor.PID) *Item

func (*Item) AddFuture

func (i *Item) AddFuture(futures ...*actor.PID)

func (*Item) Futures

func (i *Item) Futures() []*actor.PID

func (*Item) FuturesNum

func (i *Item) FuturesNum() int

type Level

type Level int
const (
	// Level definitions for actors
	// 等级越高,停止的优先度越低
	LevelNormal Level = iota // Normal priority level
	LevelHigh                // High priority level

	LevelMaxLimit // Max limit level,无实际意义
)

func GetLevelByPattern

func GetLevelByPattern(pattern string) Level

type Meta

type Meta struct {
	ActorName  string      `protobuf:"bytes,1,opt,name=ActorName,proto3" json:"ActorName,omitempty"`
	Pattern    string      `protobuf:"bytes,2,opt,name=Pattern,proto3" json:"Pattern,omitempty"`
	ServerId   string      `protobuf:"bytes,3,opt,name=ServerId,proto3" json:"ServerId,omitempty"` //初始逻辑服ID
	Dispatcher *Dispatcher `protobuf:"bytes,4,opt,name=Dispatcher,proto3" json:"Dispatcher,omitempty"`
	// contains filtered or unexported fields
}

func NewMeta

func NewMeta(name, pattern, serverId string, dispatcher *Dispatcher) *Meta

func (*Meta) Descriptor deprecated

func (*Meta) Descriptor() ([]byte, []int)

Deprecated: Use Meta.ProtoReflect.Descriptor instead.

func (*Meta) GetActorName

func (x *Meta) GetActorName() string

func (*Meta) GetDispatcher

func (x *Meta) GetDispatcher() *Dispatcher

func (*Meta) GetPattern

func (x *Meta) GetPattern() string

func (*Meta) GetServerId

func (x *Meta) GetServerId() string

func (*Meta) ProtoMessage

func (*Meta) ProtoMessage()

func (*Meta) ProtoReflect

func (x *Meta) ProtoReflect() protoreflect.Message

func (*Meta) Reset

func (x *Meta) Reset()

func (*Meta) String

func (x *Meta) String() string

type PoisonActorMessage

type PoisonActorMessage struct {
	ActorName string
	Pattern   string
}

type Process

type Process struct {
	State     int8
	ActorName string
	Pattern   string
	Props     *Props
	PID       *actor.PID
	// contains filtered or unexported fields
}

func GetOrStartActor

func GetOrStartActor(actorName, pattern string, props *Props) (*Process, error)

GetOrStartActor 获取一个就绪的Actor对象

func NewActorProcess

func NewActorProcess(actorName, pattern string, child *actor.PID, props *Props) *Process

func (*Process) GetPID

func (p *Process) GetPID() *actor.PID

func (*Process) IsStopped

func (p *Process) IsStopped() bool

func (*Process) RequestFuture

func (p *Process) RequestFuture(msg any, timeout ...time.Duration) (any, error)

func (*Process) Send

func (p *Process) Send(msg any) error

func (*Process) Stop

func (p *Process) Stop()

type Props

type Props struct {
	InitHandler  func() error
	Meta         *Meta
	AliveTimeout time.Duration
	// contains filtered or unexported fields
}

func NewProps

func NewProps() *Props

func (*Props) GetAliveTimeout

func (pp *Props) GetAliveTimeout() time.Duration

func (*Props) GetInitHandler

func (pp *Props) GetInitHandler() func() error

func (*Props) GetKvs

func (pp *Props) GetKvs(iter func(k string, v any))

func (*Props) GetMeta

func (pp *Props) GetMeta() *Meta

type PropsOption

type PropsOption func(pp *Props)

func WithAliveTimeout

func WithAliveTimeout(timeout time.Duration) PropsOption

WithAliveTimeout 配置Actor的活跃超时时间

参数:

  • timeout: 活跃超时时间,如果Actor在此时间内没有收到任何消息或处理任何事件, 将被视为不活跃。系统可能会根据此配置停止或重启不活跃的Actor。

详细说明:

  1. 每个Actor都有一个最后活动时间戳,当收到消息或处理事件时会更新此时间戳
  2. 系统会定期检查Actor的活跃状态,如果发现Actor超过指定的timeout时间没有活动, 将触发清理机制,可能会停止或重启该Actor
  3. 这个机制有助于识别并清理"僵尸"Actor,避免资源泄漏
  4. 如果不设置此选项,将使用系统默认的超时时间(通常为30分钟)

使用场景:

  • 限制长时间不活跃的Actor占用系统资源
  • 对不同类型的Actor设置不同的活跃检测策略
  • 确保关键Actor在长时间不活跃时能被重启

示例:

props := NewProps(
  WithAliveTimeout(30 * time.Minute),  // 设置30分钟活跃超时
)
actorRef := NewActorRef(props, "my-actor", "my-pattern")

func WithInitHandler

func WithInitHandler(handler func() error) PropsOption

func WithMeta

func WithMeta(meta *Meta) PropsOption

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewPriorityQueue

func NewPriorityQueue() *Queue

func (*Queue) Empty

func (q *Queue) Empty() bool

func (*Queue) Exists

func (q *Queue) Exists(key string) bool

func (*Queue) Free

func (q *Queue) Free()

func (*Queue) Insert

func (q *Queue) Insert(actorName string, item *Item, priority int64) error

func (*Queue) Pop

func (q *Queue) Pop(key string) (*Item, bool)

func (*Queue) PopAndRangeWithKey

func (q *Queue) PopAndRangeWithKey(key string, iter func(name, pattern string, child, future *actor.PID) bool) (*Item, bool)

func (*Queue) PushFuture

func (q *Queue) PushFuture(actorName string, future *actor.PID)

type RequestMessage

type RequestMessage struct {
	MsgType int8
	Message any
}

type StartActorRequest

type StartActorRequest struct {
	Pattern   string
	ActorName string
	Timeout   time.Duration
	Future    *actor.PID
	Props     *Props
}

Message types for ActorManager

type StartActorWait

type StartActorWait struct {
}

type StopAllRequest

type StopAllRequest struct {
}

type StopAllResponse

type StopAllResponse struct {
	Complete bool
}

type Timer

type Timer struct {
	// contains filtered or unexported fields
}

Timer 表示一个定时器

func NewSystemTimer

func NewSystemTimer(key string, idx int64, duration time.Duration, msg any) *Timer

func NewTimer

func NewTimer(key string, idx int64, duration time.Duration, msg any) *Timer

func (*Timer) Equal

func (t *Timer) Equal(other *Timer) bool

func (*Timer) GetDuration

func (t *Timer) GetDuration() time.Duration

func (*Timer) GetKey

func (t *Timer) GetKey() string

func (*Timer) IsSystem

func (t *Timer) IsSystem() bool

type TimerMessage

type TimerMessage struct{}

type TimerMgr

type TimerMgr struct {
	// contains filtered or unexported fields
}

TimerMgr 使用最小堆管理Timer对象

1:所有接口不允许并发操作
2:管理两种定时器
	Timer: 普通定时器,不支持自动续约
		支持操作类型:Insert,Remove,Update
	SystemTimer: 系统定时器,支持自动续约
		支持操作类型:Insert,Remove
3: 所有接口不允许并发操作

func NewTimerMgr

func NewTimerMgr(callback func()) *TimerMgr

NewTimerMgr 创建一个新的TimerMgr

func (*TimerMgr) AddSystemTimer

func (t *TimerMgr) AddSystemTimer(key string, duration time.Duration, msg any) *Timer

AddSystemTimer 添加一个系统定时器 SystemTimer:

1: 系统定时器会自动续约
2: 系统定时器不支持更新操作,如果需要更新,请使用RemoveTimer和AddSystemTimer

func (*TimerMgr) AddTimerOnce

func (t *TimerMgr) AddTimerOnce(key string, duration time.Duration, msg any) *Timer

AddTimerOnce 添加一个一次性定时器 Timer:

1: 一次性定时器,支持插入/更新/删除操作
2: 一次性定时器不支持自动续约

func (*TimerMgr) Process

func (t *TimerMgr) Process(delegate func(msg any))

func (*TimerMgr) RemoveTimer

func (t *TimerMgr) RemoveTimer(key string)

RemoveTimer 从管理器中移除一个定时器

func (*TimerMgr) Stop

func (t *TimerMgr) Stop()

Stop 停止定时器管理器

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL