Documentation
¶
Index ¶
- Variables
- func LoadPrivateState[SK ~string, SV any](state ogcore.State, key string) SV
- func LoadState[SV any](state ogcore.State, key string) SV
- func SavePrivateState[SK ~string](state ogcore.State, key string, val any, overwrite bool)
- func SaveState(state ogcore.State, key string, val any, overwrite bool)
- func UpdatePrivateState[SK ~string, SV any](state ogcore.State, key string, updateFunc func(oldVal SV) (val SV)) error
- func UpdateState[SV any](state ogcore.State, key string, updateFunc func(oldVal SV) (val SV)) error
- type BaseCluster
- type BaseEventNode
- type BaseNode
- type BaseState
- type BaseWrapper
- type Builder
- type Element
- func (e *Element) Apply(options ...ElementOption) *Element
- func (e *Element) AsVirtual() *Element
- func (e *Element) GetPriority() int
- func (e *Element) Params(key string, val any) *Element
- func (e *Element) SetPriority(priority int) *Element
- func (e *Element) SetVirtual(isVirtual bool) *Element
- func (e *Element) UseFactory(name string, subElements ...*Element) *Element
- func (e *Element) UseFn(fn func() error) *Element
- func (e *Element) UseNode(node ogcore.Node) *Element
- func (e *Element) UsePrivateFactory(factory func() ogcore.Node, subElements ...*Element) *Element
- func (e *Element) Wrap(wrappers ...string) *Element
- func (e *Element) WrapByAlias(wrapper string, alias string) *Element
- type ElementOption
- type FuncNode
- type Op
- type PGraph
- type Pipeline
- func (pipeline *Pipeline) AsyncRun(ctx context.Context, state ogcore.State) (pause, continueRun func(), wait func() error)
- func (pipeline *Pipeline) Check() error
- func (pipeline *Pipeline) DumpDOT() ([]byte, error)
- func (pipeline *Pipeline) DumpGraph() ([]byte, error)
- func (pipeline *Pipeline) ForEachElem(op func(e *Element)) *Pipeline
- func (pipeline *Pipeline) LoadGraph(data []byte) error
- func (pipeline *Pipeline) Register(e *Element, ops ...Op) *Pipeline
- func (pipeline *Pipeline) ResetPool()
- func (pipeline *Pipeline) Run(ctx context.Context, state ogcore.State) error
- func (pipeline *Pipeline) SetPoolCache(size int, warmup bool) error
- func (pipeline *Pipeline) Subscribe(callback eventd.CallBack[ogcore.State], ops ...eventd.Op) (cancel func(), err error)
Constants ¶
This section is empty.
Variables ¶
View Source
var Branch = func(elements ...*Element) Op { return func(pipeline *Pipeline, element *Element) { if len(elements) == 0 { return } var prev, next *Element prev = element for i := range elements { next = elements[i] if pipeline.elements[next.Name] == nil { pipeline.Register(next) } if pipeline.elements[next.Name] == next { pipeline.graph.AddEdge(prev.Name, next.Name) } prev = next } } }
Branch chains the given elements, in order, after the element being registered. For example: Register(a, ograph.Branch(b, c, d)) => a->b->c->d
View Source
// ErrFactoryNotFound is the sentinel error with the message "factory not found".
// Callers should match it with errors.Is rather than comparing message text.
var ErrFactoryNotFound = errors.New("factory not found")
View Source
// ErrSingletonNotSet is the sentinel error with the message "single node not set".
// Callers should match it with errors.Is rather than comparing message text.
var ErrSingletonNotSet = errors.New("single node not set")
View Source
var Rely = func(dependencies ...*Element) Op { return func(pipeline *Pipeline, element *Element) { for _, dep := range dependencies { if pipeline.elements[dep.Name] == nil { pipeline.Register(dep) } if pipeline.elements[dep.Name] == dep { pipeline.graph.AddEdge(dep.Name, element.Name) } } } }
View Source
var Then = func(nextElements ...*Element) Op { return func(pipeline *Pipeline, element *Element) { for _, next := range nextElements { if pipeline.elements[next.Name] == nil { pipeline.Register(next) } if pipeline.elements[next.Name] == next { pipeline.graph.AddEdge(element.Name, next.Name) } } } }
Functions ¶
func LoadPrivateState ¶
func SavePrivateState ¶
func UpdatePrivateState ¶
Types ¶
type BaseCluster ¶
func (*BaseCluster) Join ¶
func (cluster *BaseCluster) Join(nodes []ogcore.Node)
type BaseEventNode ¶ added in v0.7.0
type BaseWrapper ¶
func (*BaseWrapper) Wrap ¶
func (wrapper *BaseWrapper) Wrap(node ogcore.Node)
type Builder ¶
func (*Builder) RegisterFactory ¶
type Element ¶
type Element struct { Virtual bool `json:"Virtual,omitempty"` Name string FactoryName string `json:"FactoryName,omitempty"` Wrappers []string `json:"Wrappers,omitempty"` ParamsMap map[string]any `json:"ParamsMap,omitempty"` DefaultImpl string `json:"DefaultImpl,omitempty"` Priority int `json:"Priority,omitempty"` WrapperAlias map[string]string `json:"WrapperAlias,omitempty"` Singleton ogcore.Node `json:"-"` PrivateFactory func() ogcore.Node `json:"-"` SubElements []*Element `json:"SubElements,omitempty"` }
func NewElement ¶
func (*Element) Apply ¶
func (e *Element) Apply(options ...ElementOption) *Element
func (*Element) GetPriority ¶ added in v0.7.0
func (*Element) SetPriority ¶ added in v0.7.0
func (*Element) SetVirtual ¶
func (*Element) UseFactory ¶
func (*Element) UsePrivateFactory ¶
type ElementOption ¶
// ElementOption is a function that receives an *Element. Element.Apply accepts
// a variadic list of ElementOption values.
type ElementOption func(e *Element)
type FuncNode ¶
func NewFuncNode ¶
type Pipeline ¶
type Pipeline struct { BaseNode Builder *slog.Logger Interrupts iter.Seq[string] ParallelismLimit int DisablePool bool EnableMonitor bool SlowThreshold time.Duration // contains filtered or unexported fields }
func NewPipeline ¶
func NewPipeline() *Pipeline
func (*Pipeline) ForEachElem ¶
Source Files
¶
Click to show internal directories.
Click to hide internal directories.