Documentation
¶
Index ¶
- Constants
- Variables
- func ExtractActorName(pid *actor.PID) string
- func ExtractActorNameFromPath(path string) string
- func GenManagerName(level Level) string
- func InitPatternLevelMap(list []struct{ ... })
- func RegFactories(factories ...)
- func RegFactory(pattern string, factory BehaivorFactory)
- func StopActor(actorName, pattern string) error
- type ActorInfo
- type ActorMetaCache
- type ActorRef
- func (*ActorRef) Descriptor() ([]byte, []int)deprecated
- func (x *ActorRef) GetActorName() string
- func (x *ActorRef) GetPattern() string
- func (x *ActorRef) GetServerId() string
- func (*ActorRef) ProtoMessage()
- func (x *ActorRef) ProtoReflect() protoreflect.Message
- func (actorRef *ActorRef) RequestFuture(msg any, timeout ...time.Duration) (any, error)
- func (x *ActorRef) Reset()
- func (actorRef *ActorRef) Send(msg any) error
- func (actorRef *ActorRef) Stop()
- func (x *ActorRef) String() string
- type ActorRefCache
- type ActorSupervision
- type ActorSystem
- func (af *ActorSystem) ActorSystem() *actor.ActorSystem
- func (f *ActorSystem) RequestFuture(pattern string, msg any, timeout time.Duration) (any, error)
- func (af *ActorSystem) Start() error
- func (af *ActorSystem) Stop(ctx context.Context) error
- func (af *ActorSystem) StopWithDefaultTimeout() error
- func (af *ActorSystem) StopWithTimeout(timeout time.Duration) error
- type ActorsCache
- type BehaivorFactory
- type Behavior
- type CallMessage
- type CallMessageResponse
- type CastMessage
- type CheckAliveMessage
- type ChildActor
- func (state *ChildActor) GetActorContext() actor.Context
- func (state *ChildActor) GetActorName() string
- func (state *ChildActor) GetContext() IContext
- func (state *ChildActor) GetMetaData() *Meta
- func (state *ChildActor) GetPattern() string
- func (state *ChildActor) GetServerId() string
- func (state *ChildActor) HandleInit(context actor.Context)
- func (state *ChildActor) HandleStopped(context actor.Context)
- func (state *ChildActor) HandleStopping(context actor.Context) error
- func (state *ChildActor) IsActive() bool
- func (state *ChildActor) Receive(context actor.Context)
- func (state *ChildActor) SetActorContext(context actor.Context)
- func (state *ChildActor) SetMetaData(meta *Meta)
- type ChildStartedNotification
- type Dispatcher
- func (*Dispatcher) Descriptor() ([]byte, []int)deprecated
- func (x *Dispatcher) GetNodeId() string
- func (x *Dispatcher) GetServerId() string
- func (x *Dispatcher) GetType() DispatcherType
- func (*Dispatcher) ProtoMessage()
- func (x *Dispatcher) ProtoReflect() protoreflect.Message
- func (x *Dispatcher) Reset()
- func (x *Dispatcher) String() string
- type DispatcherType
- func (DispatcherType) Descriptor() protoreflect.EnumDescriptor
- func (x DispatcherType) Enum() *DispatcherType
- func (DispatcherType) EnumDescriptor() ([]byte, []int)deprecated
- func (x DispatcherType) Number() protoreflect.EnumNumber
- func (x DispatcherType) String() string
- func (DispatcherType) Type() protoreflect.EnumType
- type ForwardMessage
- type ForwardMessageResponse
- type GracefulShutdownManager
- type IBaseContext
- type IContext
- type IService
- type ITimerContext
- type Item
- type Level
- type Meta
- func (*Meta) Descriptor() ([]byte, []int)deprecated
- func (x *Meta) GetActorName() string
- func (x *Meta) GetDispatcher() *Dispatcher
- func (x *Meta) GetPattern() string
- func (x *Meta) GetServerId() string
- func (*Meta) ProtoMessage()
- func (x *Meta) ProtoReflect() protoreflect.Message
- func (x *Meta) Reset()
- func (x *Meta) String() string
- type PoisonActorMessage
- type Process
- type Props
- type PropsOption
- type Queue
- func (q *Queue) Empty() bool
- func (q *Queue) Exists(key string) bool
- func (q *Queue) Free()
- func (q *Queue) Insert(actorName string, item *Item, priority int64) error
- func (q *Queue) Pop(key string) (*Item, bool)
- func (q *Queue) PopAndRangeWithKey(key string, iter func(name, pattern string, child, future *actor.PID) bool) (*Item, bool)
- func (q *Queue) PushFuture(actorName string, future *actor.PID)
- type RequestMessage
- type StartActorRequest
- type StartActorWait
- type StopAllRequest
- type StopAllResponse
- type Timer
- type TimerMessage
- type TimerMgr
Constants ¶
const ( StateNone = iota StateStopped )
const ( StartActorTimeout = 30 * time.Second StopActorTimeout = 30 * time.Second ManagerStartActorFutureTimeout = 30 * time.Second AliveCheckInterval = 30 * time.Second DefaultAliveTimeout = 30 * time.Minute )
const ( MessageTypeSend int8 = iota MessageTypeRequest MessageTypeForward )
const ( StateActorSupervisionNormal int32 = iota StateActorSupervisionStopping StateActorSupervisionStopped )
const ( ManagerName = "system-actor-supervision" ActorSystemStateRunning = 0 ActorSystemStateStopping = 1 ActorSystemStateStopped = 2 )
Variables ¶
var ( DispatcherType_name = map[int32]string{ 0: "DISPATCHER_TYPE_IN_WORLD", 1: "DISPATCHER_TYPE_IN_REGION", 2: "DISPATCHER_TYPE_RANDOM", } DispatcherType_value = map[string]int32{ "DISPATCHER_TYPE_IN_WORLD": 0, "DISPATCHER_TYPE_IN_REGION": 1, "DISPATCHER_TYPE_RANDOM": 2, } )
Enum value maps for DispatcherType.
var ( ErrActorNotFound = errors.New("actor not found") ErrActorStopped = errors.New("actor is stopped") ErrSupervisionStopped = errors.New("supervision stopped") )
var File_actor_ref_proto protoreflect.FileDescriptor
Functions ¶
func ExtractActorName ¶
ExtractActorName extracts the base actor name from a PID. For example: "system-actor-supervision-level-0/test-actor-2" -> "test-actor-2".
func ExtractActorNameFromPath ¶
ExtractActorNameFromPath extracts the base actor name from a path string. For example: "system-actor-supervision-level-0/test-actor-2" -> "test-actor-2".
func GenManagerName ¶
func InitPatternLevelMap ¶
func RegFactories ¶
func RegFactories(factories ...struct { pattern string factory BehaivorFactory })
RegFactories registers multiple factories at once
func RegFactory ¶
func RegFactory(pattern string, factory BehaivorFactory)
RegFactory registers a factory for a specific actor type
Types ¶
type ActorMetaCache ¶
type ActorMetaCache struct {
// contains filtered or unexported fields
}
func NewMetaCache ¶
func NewMetaCache(cli *redis.Client) *ActorMetaCache
func (*ActorMetaCache) Del ¶
func (c *ActorMetaCache) Del(key string)
func (*ActorMetaCache) Set ¶
func (c *ActorMetaCache) Set(key string, value *Meta)
type ActorRef ¶
type ActorRef struct { ActorName string `protobuf:"bytes,2,opt,name=ActorName,proto3" json:"ActorName,omitempty"` Pattern string `protobuf:"bytes,3,opt,name=Pattern,proto3" json:"Pattern,omitempty"` ServerId string `protobuf:"bytes,4,opt,name=ServerId,proto3" json:"ServerId,omitempty"` //初始逻辑服ID Props *Props `protobuf:"-"` // contains filtered or unexported fields }
ActorRef 包含 Actor不变的初始化信息
func NewActorRef ¶
func NewActorRef(props *Props, actorName, pattern string, ops ...PropsOption) *ActorRef
NewActorRef 创建一个新的ActorRef实例 ActorRef为Actor的代理。它主要的作用是支持向它所代表的Actor发送消息, 从而实现Actor之间的通信。通过ActorRef,可以避免直接访问或操作Actor的内部信息和状态。
func (*ActorRef) Descriptor
deprecated
func (*ActorRef) GetActorName ¶
func (*ActorRef) GetPattern ¶
func (*ActorRef) GetServerId ¶
func (*ActorRef) ProtoMessage ¶
func (*ActorRef) ProtoMessage()
func (*ActorRef) ProtoReflect ¶
func (x *ActorRef) ProtoReflect() protoreflect.Message
func (*ActorRef) RequestFuture ¶
RequestFuture 发送消息到Actor并等待消息回复。如果Actor正在停止,则尝试排队订阅新Actor事件,当新的Actor启动就绪后,重新发送消息。
type ActorRefCache ¶
type ActorRefCache struct {
// contains filtered or unexported fields
}
func NewActorRefCache ¶
func NewActorRefCache() *ActorRefCache
func (*ActorRefCache) Del ¶
func (c *ActorRefCache) Del(actorName string)
func (*ActorRefCache) Set ¶
func (c *ActorRefCache) Set(actorName string, value *ActorRef)
type ActorSupervision ¶
type ActorSupervision struct {
// contains filtered or unexported fields
}
ActorSupervision is responsible for managing actor lifecycle
func NewActorSupervision ¶
func NewActorSupervision(actorSystem *actor.ActorSystem, level Level) *ActorSupervision
NewActorSupervision creates a new instance of ActorSupervision.
func (*ActorSupervision) Receive ¶
func (m *ActorSupervision) Receive(context actor.Context)
Receive handles messages sent to the ActorSupervision.
type ActorSystem ¶
type ActorSystem struct {
// contains filtered or unexported fields
}
ActorSystem provides a simplified interface for managing actors
var ( System *ActorSystem ActorFacadeStopping atomic.Bool )
func NewActorFacade ¶
func NewActorFacade(actorSystem *actor.ActorSystem) *ActorSystem
NewActorFacade creates a new instance of ActorSystem.
func (*ActorSystem) ActorSystem ¶
func (af *ActorSystem) ActorSystem() *actor.ActorSystem
func (*ActorSystem) RequestFuture ¶
func (*ActorSystem) Start ¶
func (af *ActorSystem) Start() error
func (*ActorSystem) Stop ¶
func (af *ActorSystem) Stop(ctx context.Context) error
Stop 开始ActorSystem的优雅关闭流程 参数:
- ctx: 用于控制停止操作超时和取消的上下文
返回:
- 如果系统已在关闭或关闭成功则返回nil,如果发生错误则返回错误
说明:
- 此方法只有在系统处于运行状态时才会进行实际关闭
- 使用CAS操作确保只有一个goroutine能够触发系统关闭
func (*ActorSystem) StopWithDefaultTimeout ¶
func (af *ActorSystem) StopWithDefaultTimeout() error
StopWithDefaultTimeout is a convenience method that stops the actor system with a default timeout of 60 seconds
func (*ActorSystem) StopWithTimeout ¶
func (af *ActorSystem) StopWithTimeout(timeout time.Duration) error
StopWithTimeout is a convenience method that stops the actor system with a timeout. It creates a context with the specified timeout and calls Stop.
type ActorsCache ¶
type ActorsCache struct {
// contains filtered or unexported fields
}
func NewActorsCache ¶
func NewActorsCache() *ActorsCache
func (*ActorsCache) Delete ¶
func (c *ActorsCache) Delete(actorName string)
func (*ActorsCache) Exist ¶
func (c *ActorsCache) Exist(actorName string) bool
func (*ActorsCache) Set ¶
func (c *ActorsCache) Set(actorName string, p *Process)
type BehaivorFactory ¶
BehaivorFactory is a factory function type that accepts an actor ID.
func Dispatch ¶
func Dispatch(pattern string) BehaivorFactory
Dispatch returns a factory function for creating actors of the given pattern.
type Behavior ¶
type Behavior interface { HandleRequest(ctx IContext, msg any) (any, error) HandleSend(ctx IContext, msg any) HandleForward(ctx IContext, msg any) HandleInit(ctx IContext) error HandleStopping(ctx IContext) error HandleStopped(ctx IContext) error }
func CreateBehaviorWithID ¶
CreateBehaviorWithID creates an actor with a specific ID
type CallMessage ¶
type CallMessageResponse ¶
type CastMessage ¶
type CheckAliveMessage ¶
type CheckAliveMessage struct{}
type ChildActor ¶
ChildActor 表示由SupervisorActor管理的子Actor。实现了InitNotifiable接口,允许在初始化完成后发送通知。
func NewChildActor ¶
func NewChildActor(behavior Behavior, name, pattern string, meta *Meta, aliveTimeout time.Duration, initCB func(err error) error) *ChildActor
NewChildActor 创建一个新的子Actor
func (*ChildActor) GetActorContext ¶
func (state *ChildActor) GetActorContext() actor.Context
func (*ChildActor) GetActorName ¶
func (state *ChildActor) GetActorName() string
func (*ChildActor) GetContext ¶
func (state *ChildActor) GetContext() IContext
func (*ChildActor) GetMetaData ¶
func (state *ChildActor) GetMetaData() *Meta
func (*ChildActor) GetPattern ¶
func (state *ChildActor) GetPattern() string
func (*ChildActor) GetServerId ¶
func (state *ChildActor) GetServerId() string
func (*ChildActor) HandleInit ¶
func (state *ChildActor) HandleInit(context actor.Context)
HandleInit 在Actor启动时执行的初始化逻辑 返回nil表示成功,否则返回错误
func (*ChildActor) HandleStopped ¶
func (state *ChildActor) HandleStopped(context actor.Context)
func (*ChildActor) HandleStopping ¶
func (state *ChildActor) HandleStopping(context actor.Context) error
func (*ChildActor) Receive ¶
func (state *ChildActor) Receive(context actor.Context)
Receive 处理接收到的消息
func (*ChildActor) SetActorContext ¶
func (state *ChildActor) SetActorContext(context actor.Context)
func (*ChildActor) SetMetaData ¶
func (state *ChildActor) SetMetaData(meta *Meta)
type ChildStartedNotification ¶
ChildStartedNotification 子Actor启动完成并执行Behavior HandleInit后发送的通知
type Dispatcher ¶
type Dispatcher struct { Type DispatcherType `protobuf:"varint,1,opt,name=Type,proto3,enum=actor.DispatcherType" json:"Type,omitempty"` ServerId string `protobuf:"bytes,2,opt,name=ServerId,proto3" json:"ServerId,omitempty"` //最新的逻辑服ID NodeId string `protobuf:"bytes,3,opt,name=NodeId,proto3" json:"NodeId,omitempty"` //最新的节点ID // contains filtered or unexported fields }
func (*Dispatcher) Descriptor
deprecated
func (*Dispatcher) Descriptor() ([]byte, []int)
Deprecated: Use Dispatcher.ProtoReflect.Descriptor instead.
func (*Dispatcher) GetNodeId ¶
func (x *Dispatcher) GetNodeId() string
func (*Dispatcher) GetServerId ¶
func (x *Dispatcher) GetServerId() string
func (*Dispatcher) GetType ¶
func (x *Dispatcher) GetType() DispatcherType
func (*Dispatcher) ProtoMessage ¶
func (*Dispatcher) ProtoMessage()
func (*Dispatcher) ProtoReflect ¶
func (x *Dispatcher) ProtoReflect() protoreflect.Message
func (*Dispatcher) Reset ¶
func (x *Dispatcher) Reset()
func (*Dispatcher) String ¶
func (x *Dispatcher) String() string
type DispatcherType ¶
type DispatcherType int32
const ( DispatcherType_DISPATCHER_TYPE_IN_WORLD DispatcherType = 0 // 世界分组 DispatcherType_DISPATCHER_TYPE_IN_REGION DispatcherType = 1 // 区域分组 DispatcherType_DISPATCHER_TYPE_RANDOM DispatcherType = 2 // 随机分组 )
func (DispatcherType) Descriptor ¶
func (DispatcherType) Descriptor() protoreflect.EnumDescriptor
func (DispatcherType) Enum ¶
func (x DispatcherType) Enum() *DispatcherType
func (DispatcherType) EnumDescriptor
deprecated
func (DispatcherType) EnumDescriptor() ([]byte, []int)
Deprecated: Use DispatcherType.Descriptor instead.
func (DispatcherType) Number ¶
func (x DispatcherType) Number() protoreflect.EnumNumber
func (DispatcherType) String ¶
func (x DispatcherType) String() string
func (DispatcherType) Type ¶
func (DispatcherType) Type() protoreflect.EnumType
type ForwardMessage ¶
type ForwardMessageResponse ¶
type GracefulShutdownManager ¶
type GracefulShutdownManager struct {
// contains filtered or unexported fields
}
GracefulShutdownManager 负责管理优雅关闭过程,通过反复尝试特定操作直到连续成功达到阈值次数 用于确保系统组件可以安全、可靠地关闭,即使在分布式或并发环境中
func NewGracefulShutdownManager ¶
func NewGracefulShutdownManager(successThreshold int32, shutdownOperation func() bool) *GracefulShutdownManager
NewGracefulShutdownManager 创建一个新的优雅关闭管理器 参数:
- successThreshold: 确认操作完成所需的连续成功次数
- shutdownOperation: 实际执行关闭操作并返回是否成功的函数
func NewGracefulShutdownManagerWithMaxAttempts ¶
func NewGracefulShutdownManagerWithMaxAttempts(successThreshold int32, maxAttempts int32, shutdownOperation func() bool) *GracefulShutdownManager
NewGracefulShutdownManagerWithMaxAttempts 创建一个有最大尝试次数限制的优雅关闭管理器 参数:
- successThreshold: 确认操作完成所需的连续成功次数
- maxAttempts: 最大尝试次数,0表示无限重试
- shutdownOperation: 实际执行关闭操作并返回是否成功的函数
type IBaseContext ¶
type IContext ¶
type IContext interface { IBaseContext ITimerContext }
type ITimerContext ¶
type Item ¶
type Item struct { ActorName string Pattern string Future []*actor.PID Child *actor.PID Props *Props }
func (*Item) FuturesNum ¶
type Meta ¶
type Meta struct { ActorName string `protobuf:"bytes,1,opt,name=ActorName,proto3" json:"ActorName,omitempty"` Pattern string `protobuf:"bytes,2,opt,name=Pattern,proto3" json:"Pattern,omitempty"` ServerId string `protobuf:"bytes,3,opt,name=ServerId,proto3" json:"ServerId,omitempty"` //初始逻辑服ID Dispatcher *Dispatcher `protobuf:"bytes,4,opt,name=Dispatcher,proto3" json:"Dispatcher,omitempty"` // contains filtered or unexported fields }
func NewMeta ¶
func NewMeta(name, pattern, serverId string, dispatcher *Dispatcher) *Meta
func (*Meta) Descriptor
deprecated
func (*Meta) GetActorName ¶
func (*Meta) GetDispatcher ¶
func (x *Meta) GetDispatcher() *Dispatcher
func (*Meta) GetPattern ¶
func (*Meta) GetServerId ¶
func (*Meta) ProtoMessage ¶
func (*Meta) ProtoMessage()
func (*Meta) ProtoReflect ¶
func (x *Meta) ProtoReflect() protoreflect.Message
type PoisonActorMessage ¶
type Process ¶
type Process struct { State int8 ActorName string Pattern string Props *Props PID *actor.PID // contains filtered or unexported fields }
func GetOrStartActor ¶
GetOrStartActor 获取一个就绪的Actor对象
func NewActorProcess ¶
func (*Process) RequestFuture ¶
type Props ¶
type Props struct { InitHandler func() error Meta *Meta AliveTimeout time.Duration // contains filtered or unexported fields }
func (*Props) GetAliveTimeout ¶
func (*Props) GetInitHandler ¶
type PropsOption ¶
type PropsOption func(pp *Props)
func WithAliveTimeout ¶
func WithAliveTimeout(timeout time.Duration) PropsOption
WithAliveTimeout 配置Actor的活跃超时时间
参数:
- timeout: 活跃超时时间,如果Actor在此时间内没有收到任何消息或处理任何事件, 将被视为不活跃。系统可能会根据此配置停止或重启不活跃的Actor。
详细说明:
- 每个Actor都有一个最后活动时间戳,当收到消息或处理事件时会更新此时间戳
- 系统会定期检查Actor的活跃状态,如果发现Actor超过指定的timeout时间没有活动, 将触发清理机制,可能会停止或重启该Actor
- 这个机制有助于识别并清理"僵尸"Actor,避免资源泄漏
- 如果不设置此选项,将使用系统默认的超时时间(通常为30分钟)
使用场景:
- 限制长时间不活跃的Actor占用系统资源
- 对不同类型的Actor设置不同的活跃检测策略
- 确保关键Actor在长时间不活跃时能被重启
示例:
props := NewProps( WithAliveTimeout(30 * time.Minute), // 设置30分钟活跃超时 ) actorRef := NewActorRef(props, "my-actor", "my-pattern")
func WithInitHandler ¶
func WithInitHandler(handler func() error) PropsOption
func WithMeta ¶
func WithMeta(meta *Meta) PropsOption
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func NewPriorityQueue ¶
func NewPriorityQueue() *Queue
func (*Queue) PopAndRangeWithKey ¶
type RequestMessage ¶
type StartActorRequest ¶
type StartActorRequest struct { Pattern string ActorName string Timeout time.Duration Future *actor.PID Props *Props }
Message types for ActorManager
type StartActorWait ¶
type StartActorWait struct { }
type StopAllRequest ¶
type StopAllRequest struct { }
type StopAllResponse ¶
type StopAllResponse struct {
Complete bool
}
type Timer ¶
type Timer struct {
// contains filtered or unexported fields
}
Timer 表示一个定时器
func NewSystemTimer ¶
func (*Timer) GetDuration ¶
type TimerMessage ¶
type TimerMessage struct{}
type TimerMgr ¶
type TimerMgr struct {
// contains filtered or unexported fields
}
TimerMgr 使用最小堆管理Timer对象
1: 所有接口不允许并发操作。 2: 管理两种定时器。 Timer: 普通定时器,不支持自动续约,支持操作类型: Insert, Remove, Update。 SystemTimer: 系统定时器,支持自动续约,支持操作类型: Insert, Remove。
func (*TimerMgr) AddSystemTimer ¶
AddSystemTimer 添加一个系统定时器 SystemTimer:
1: 系统定时器会自动续约 2: 系统定时器不支持更新操作,如果需要更新,请使用RemoveTimer和AddSystemTimer
func (*TimerMgr) AddTimerOnce ¶
AddTimerOnce 添加一个一次性定时器 Timer:
1: 一次性定时器,支持插入/更新/删除操作 2: 一次性定时器不支持自动续约