life

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 8, 2025 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	EventTypeSecondTick = WorkerEventType(iota + 10_000)
	EventTypeMinuteTick
)

Built-in event types start from 10_000. Values below 10_000 are reserved for custom event types: v1.WorkerEventType

Variables

This section is empty.

Functions

func ErrorMessage

func ErrorMessage(err error) string

func ErrorMessagef

func ErrorMessagef(err error, format string, args ...any) string

func GetArg

func GetArg[T EventArgValue](arg *EventArg, key any) T

func IsAdminContext

func IsAdminContext(ctx context.Context) bool

func IsAdminID

func IsAdminID(id int64) bool

func IsAdminStatus

func IsAdminStatus(status intrav1.OnlineStatus) bool

func IsDevContext

func IsDevContext(ctx context.Context) bool

func IsDevStatus

func IsDevStatus(status intrav1.OnlineStatus) bool

func IsGateContext

func IsGateContext(ctx context.Context) bool

func IsGateStatus

func IsGateStatus(status intrav1.OnlineStatus) bool

func IsInnerContext

func IsInnerContext(ctx context.Context) bool

func IsInnerStatus

func IsInnerStatus(status intrav1.OnlineStatus) bool

func IsSvcContext

func IsSvcContext(ctx context.Context) bool

func IsSvcStatus

func IsSvcStatus(status intrav1.OnlineStatus) bool

func Message

func Message(format string, args ...any) string

func OnlineStatus

func OnlineStatus(status int64) intrav1.OnlineStatus

func PutEventArg

func PutEventArg(e *EventArg)

Types

type BroadcastMessage

type BroadcastMessage struct {
	// contains filtered or unexported fields
}

type Broadcastable

type Broadcastable interface {
	Broadcast(wctx Context, mod climod.ModuleID, seq int32, obj int64, body proto.Message) error
	Multicast(wctx Context, mod climod.ModuleID, seq int32, obj int64, body proto.Message, uids ...int64) error
	Consume() <-chan *BroadcastMessage
	ExecuteBroadcast(msg *BroadcastMessage) error
}

func NewBroadcaster

func NewBroadcaster(pusher *data.PushRepo) Broadcastable

type BuildTunnelResponseFunc

type BuildTunnelResponseFunc = func(mod int32, seq int32, obj int64, msg proto.Message) (xnet.TunnelMessage, error)

type BuiltinEventFuncs

type BuiltinEventFuncs struct {
	// contains filtered or unexported fields
}

func (*BuiltinEventFuncs) RegisterCustomEvent

func (tf *BuiltinEventFuncs) RegisterCustomEvent(e WorkerEventType, f eventFunc)

func (*BuiltinEventFuncs) RegisterMinuteTick

func (tf *BuiltinEventFuncs) RegisterMinuteTick(f func(ctx Context) error)

func (*BuiltinEventFuncs) RegisterOnCreatedEvent

func (tf *BuiltinEventFuncs) RegisterOnCreatedEvent(f func(ctx Context) error)

func (*BuiltinEventFuncs) RegisterOnLoadEvent

func (tf *BuiltinEventFuncs) RegisterOnLoadEvent(f func(ctx Context) error)

func (*BuiltinEventFuncs) RegisterSecondTick

func (tf *BuiltinEventFuncs) RegisterSecondTick(f func(ctx Context) error)

type ChangedModules

type ChangedModules struct {
	// contains filtered or unexported fields
}

func NewChangedModules

func NewChangedModules(cp int) *ChangedModules

func (*ChangedModules) Add

func (c *ChangedModules) Add(modules []ModuleKey)

func (*ChangedModules) Move

func (c *ChangedModules) Move() []ModuleKey

type Context

type Context interface {
	context.Context
	EventManageable
	Responsive

	Now() time.Time
	ClientIP() string

	UID() int64
	OID() int64
	SID() int64

	UnsafeObject() any
	Snapshot() VersionProto

	Changed(modules ...ModuleKey)
	ChangedImmediately(modules ...ModuleKey)
	ChangedModules() (modules []ModuleKey, immediately bool)
}

func NewContext

func NewContext(ctx context.Context, w *Worker) Context

type EventArg

type EventArg struct {
	// contains filtered or unexported fields
}

func GetEventArg

func GetEventArg() *EventArg

func (*EventArg) Reset

func (e *EventArg) Reset()

type EventArgValue

type EventArgValue interface {
	int64 | string | float64 | []int64 | []string | []float64
}

type EventFunc

type EventFunc func(wctx Context) (err error)

type EventManageable

type EventManageable interface {
	EmitEvent(t WorkerEventType, args ...WithArg) error
	EmitEventFunc(f EventFunc) error
	ConsumeEvent() <-chan EventFunc
}

type Manager

type Manager struct {
	xsync.Stoppable
	*BuiltinEventFuncs
	// contains filtered or unexported fields
}

func NewManager

func NewManager(logger log.Logger, rt routetable.ReNewalRouteTable, pusher *data.PushRepo, newContext newContextFunc, newPersister newPersisterFunc) (m *Manager, stopFunc func() error)

func (*Manager) Pusher

func (m *Manager) Pusher() *data.PushRepo

func (*Manager) Stop

func (m *Manager) Stop(ctx context.Context) (err error)

func (*Manager) Worker

func (m *Manager) Worker(ctx context.Context, oid int64, replier Responsive) (worker *Worker, err error)

type Module

type Module interface {
	ModuleCodec

	IsLifeModule()
}

type ModuleCodec

type ModuleCodec interface {
	EncodeServer() proto.Message
	DecodeServer(proto.Message) error
}

type ModuleKey

type ModuleKey string

type NewModuleFunc

type NewModuleFunc func() Module

type ObjectHolder

type ObjectHolder interface {
	ID() int64
	Version() int64
	UnsafeObject() any
	Snapshot() VersionProto
}

type Option

type Option func(*options)

func WithShardCount

func WithShardCount(shardCount uint64) Option

type PersistManager

type PersistManager struct {
	xsync.Stoppable
	// contains filtered or unexported fields
}

func (*PersistManager) Change

func (s *PersistManager) Change(ctx context.Context, modules []ModuleKey, immediately bool)

func (*PersistManager) ID

func (s *PersistManager) ID() int64

func (*PersistManager) Immediately

func (s *PersistManager) Immediately() chan struct{}

func (*PersistManager) Persist

func (s *PersistManager) Persist(ctx context.Context, proto VersionProto) error

func (*PersistManager) Persister

func (s *PersistManager) Persister() Persistent

func (*PersistManager) PrepareToPersist

func (s *PersistManager) PrepareToPersist(ctx context.Context) error

func (*PersistManager) SaveChan

func (s *PersistManager) SaveChan() chan VersionProto

func (*PersistManager) Stop

func (s *PersistManager) Stop(ctx context.Context) (err error)

type Persistent

type Persistent interface {
	ObjectHolder

	AllModuleKeys() []ModuleKey
	Lock(f func() error) error
	Refresh(ctx context.Context) (err error)
	PrepareToPersist(ctx context.Context, modules []ModuleKey) (VersionProto, error)
	Persist(ctx context.Context, id int64, proto VersionProto) (err error)
	IncVersion(ctx context.Context, id int64) (err error)
	OnStop(ctx context.Context, id int64, proto VersionProto) (err error)
}

type Responser

type Responser struct {
	// contains filtered or unexported fields
}

func NewResponser

func NewResponser(sendFunc SendFunc, buildPushMessageFunc BuildTunnelResponseFunc) *Responser

func (*Responser) ConsumeTunnelResponse

func (w *Responser) ConsumeTunnelResponse() <-chan xnet.TunnelMessage

func (*Responser) ExecuteSend

func (w *Responser) ExecuteSend(out xnet.TunnelMessage) error

func (*Responser) Push

func (w *Responser) Push(mod int32, seq int32, obj int64, sc proto.Message) error

func (*Responser) PushImmediately

func (w *Responser) PushImmediately(mod int32, seq int32, obj int64, sc proto.Message) error

func (*Responser) Reply

func (w *Responser) Reply(msg xnet.TunnelMessage)

func (*Responser) SCIndex

func (w *Responser) SCIndex() int32

func (*Responser) SendFunc

func (w *Responser) SendFunc() SendFunc

func (*Responser) SetSCIndexFunc

func (w *Responser) SetSCIndexFunc(scIndexFunc func() int32)

func (*Responser) UpdateSendFunc

func (w *Responser) UpdateSendFunc(sendFunc SendFunc)

type Responsive

type Responsive interface {
	Push(mod int32, seq int32, obj int64, sc proto.Message) error
	PushImmediately(mod int32, seq int32, obj int64, sc proto.Message) error
	Reply(msg xnet.TunnelMessage)
	ConsumeTunnelResponse() <-chan xnet.TunnelMessage
	ExecuteSend(msg xnet.TunnelMessage) error
	SendFunc() SendFunc
	UpdateSendFunc(sendFunc SendFunc)
	SCIndex() int32
}

type SendFunc

type SendFunc = func(msg xnet.TunnelMessage) error

type Tickers

type Tickers struct {
	*BuiltinEventFuncs
	// contains filtered or unexported fields
}

func (*Tickers) Stop

func (t *Tickers) Stop()

type Tuple

type Tuple struct {
	Key int64
	Val *Worker
}

Tuple is used by the Iter functions to wrap two variables (a key and its value) together over a channel.

type VersionProto

type VersionProto interface {
	proto.Message

	GetVersion() int64
}

type WithArg

type WithArg func(arg *EventArg)

func With

func With[T EventArgValue](key any, value T) WithArg

type Workable

type Workable interface {
	xsync.Stoppable
	EventManageable

	ID() int64
}

type Worker

type Worker struct {
	xsync.Stoppable
	Broadcastable
	Responsive
	*BuiltinEventFuncs
	*Tickers
	// contains filtered or unexported fields
}

func (*Worker) ClientIP

func (w *Worker) ClientIP() string

func (*Worker) ConsumeEvent

func (w *Worker) ConsumeEvent() <-chan EventFunc

func (*Worker) EmitEvent

func (w *Worker) EmitEvent(t WorkerEventType, args ...WithArg) error

func (*Worker) EmitEventFunc

func (w *Worker) EmitEventFunc(f EventFunc) error

func (*Worker) ExecuteEvent

func (w *Worker) ExecuteEvent(wctx Context, f EventFunc) error

func (*Worker) ID

func (w *Worker) ID() int64

func (*Worker) IsAdminID

func (w *Worker) IsAdminID() bool

func (*Worker) LogInfo

func (w *Worker) LogInfo() string

func (*Worker) Referer

func (w *Worker) Referer() string

func (*Worker) Status

func (w *Worker) Status() intrav1.OnlineStatus

func (*Worker) Stop

func (w *Worker) Stop(ctx context.Context) (err error)

func (*Worker) Unique

func (w *Worker) Unique() uint64

type WorkerBroadcaster

type WorkerBroadcaster struct {
	// contains filtered or unexported fields
}

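WorkerBroadcaster provides the Broadcast, Multicast, Consume, and ExecuteBroadcast methods declared by Broadcastable. TODO: document its behavior.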

func (*WorkerBroadcaster) Broadcast

func (w *WorkerBroadcaster) Broadcast(wctx Context, mod climod.ModuleID, seq int32, obj int64, body proto.Message) (err error)

func (*WorkerBroadcaster) Consume

func (w *WorkerBroadcaster) Consume() <-chan *BroadcastMessage

func (*WorkerBroadcaster) ExecuteBroadcast

func (w *WorkerBroadcaster) ExecuteBroadcast(msg *BroadcastMessage) error

func (*WorkerBroadcaster) Multicast

func (w *WorkerBroadcaster) Multicast(wctx Context, mod climod.ModuleID, seq int32, obj int64, body proto.Message, uids ...int64) (err error)

type WorkerEventType

type WorkerEventType int32

WorkerEventType is the worker event type. Custom event types should be less than 10_000.

type WorkerMap

type WorkerMap struct {
	// contains filtered or unexported fields
}

WorkerMap is a thread-safe map implementation that uses sharding to reduce lock contention. It divides the map into multiple shards, each protected by its own RWMutex. This allows for better concurrent access compared to a single map protected by a single lock.

func NewWorkerMap

func NewWorkerMap(opts ...Option) *WorkerMap

func (WorkerMap) Get

func (m WorkerMap) Get(key int64) *Worker

func (WorkerMap) Iter

func (m WorkerMap) Iter() <-chan Tuple

Iter returns a buffered iterator which can be used in a for range loop.

func (WorkerMap) Remove

func (m WorkerMap) Remove(key int64)

func (WorkerMap) Set

func (m WorkerMap) Set(key int64, value *Worker) (old *Worker)

func (WorkerMap) SimilarCount

func (m WorkerMap) SimilarCount() int

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL