ograph

package module
v0.8.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 3, 2025 License: MIT Imports: 17 Imported by: 0

README

                ________________                     ______  
                __  __ \_  ____/____________ ___________  /_ 
                _  / / /  / __ __  ___/  __ `/__  __ \_  __ \
                / /_/ // /_/ / _  /   / /_/ /__  /_/ /  / / /
                \____/ \____/  /_/    \__,_/ _  .___//_/ /_/ 
			                                 /_/             

OGraph: A simple way to build a pipeline with Go

languages os

中文 | English

OGraph 是一个用 Go 实现的图流程执行框架。

你可以通过构建 Pipeline(流水线),实现依赖元素按顺序依次执行、非依赖元素并发执行的调度。

此外,OGraph 还提供了重试、超时限制、执行追踪等丰富的开箱即用特性。

同类项目对比

OGraph 受另一个 C++ 项目 CGraph 启发,但 OGraph 并不等于 Go 版本的 CGraph。

功能对比

和 CGraph 一样,OGraph 也提供基本的构图和调度执行能力,但有以下几点关键不同:

  • 用 Go 实现,使用协程而非线程进行调度,更轻量灵活

  • 支持通过 Wrapper 来自定义循环、执行条件判断、错误处理等逻辑,并可以随意组合

  • 支持导出图结构,再在别处导入执行(符合限制的情况下)

  • 灵活的虚节点设置,用以简化依赖关系

性能对比

经过 Benchmark 测试,OGraph 性能优于 CGraph。

CGraph 性能测试参考

OGraph 性能测试参考

CGraph(基准) OGraph(本项目)
场景一(无连接32节点) 8204 ns/op 4308 ns/op(+90.4%)
场景二(串行连接32节点) 572 ns/op 281.7 ns/op(+103%)
场景三(简单DAG 6节点) 4042 ns/op 2762 ns/op(+46.3%)
场景四(8x8全连接) 13450 ns/op 8333 ns/op(+61.4%)

快速开始

第一步:声明一个 Node 接口实现
// Person is a minimal node type: it embeds ograph.BaseNode and supplies its
// own Run method.
type Person struct {
	ograph.BaseNode
}

// Run prints a one-line greeting that contains the node's name and always
// returns nil. Neither ctx nor state is read.
func (p *Person) Run(ctx context.Context, state ogcore.State) error {
	name := p.Name()
	fmt.Printf("Hello, i am %s.\n", name)
	return nil
}

上面代码中 Person 组合了 BaseNode,并覆写了 Node 接口方法 Run。

第二步:构建一个 Pipeline 并运行
// TestHello builds a two-element pipeline in which liSi relies on zhangSan,
// runs it, and reports any error returned by Run.
func TestHello(t *testing.T) {
	// Both elements use the same Person node type and differ only by name.
	zhangSan := ograph.NewElement("ZhangSan").UseNode(&Person{})
	liSi := ograph.NewElement("LiSi").UseNode(&Person{})

	pipeline := ograph.NewPipeline()
	pipeline.Register(zhangSan).Register(liSi, ograph.Rely(zhangSan))

	err := pipeline.Run(context.TODO(), nil)
	if err != nil {
		t.Error(err)
	}
}

上面代码在 pipeline 中注册了两个 Person 节点(zhangSan、liSi),并指定 liSi 依赖于 zhangSan。

输出结果

Hello, i am ZhangSan.
Hello, i am LiSi.

更多文档

请前往 https://symphony09.github.io/ograph-docs 查看更多文档!

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Branch returns an Op that chains the given elements one after another,
// starting from the element the Op is applied to:
//
//	Register(a, ograph.Branch(b, c, d)) => a->b->c->d
//
// A chained element whose name has no entry in the pipeline yet is registered
// first. The edge from the previous link is added only when the pipeline's
// entry for that name is the very same *Element; otherwise the edge is
// skipped, but the chain still advances to that element.
var Branch = func(elements ...*Element) Op {
	return func(pipeline *Pipeline, element *Element) {
		// tail is the most recent link of the chain; ranging over an empty
		// argument list simply does nothing.
		tail := element

		for _, cur := range elements {
			if pipeline.elements[cur.Name] == nil {
				pipeline.Register(cur)
			}

			if pipeline.elements[cur.Name] == cur {
				pipeline.graph.AddEdge(tail.Name, cur.Name)
			}

			tail = cur
		}
	}
}

Register(a, ograph.Branch(b, c, d)) => a->b->c->d

View Source
// ErrFactoryNotFound is a sentinel error whose message is "factory not found".
// Compare against it with errors.Is.
var ErrFactoryNotFound = errors.New("factory not found")
View Source
// ErrSingletonNotSet is a sentinel error whose message is "single node not set".
// Compare against it with errors.Is.
var ErrSingletonNotSet = errors.New("single node not set")
View Source
// Rely returns an Op that makes the element it is applied to depend on each
// of the given dependencies, by adding an edge dependency -> element.
//
// A dependency whose name has no entry in the pipeline yet is registered
// first. The edge is added only when the pipeline's entry for that name is
// the very same *Element; otherwise that dependency is skipped.
var Rely = func(dependencies ...*Element) Op {
	return func(pipeline *Pipeline, element *Element) {
		for i := range dependencies {
			upstream := dependencies[i]

			if pipeline.elements[upstream.Name] == nil {
				pipeline.Register(upstream)
			}

			// A different element already owns this name: add no edge.
			if pipeline.elements[upstream.Name] != upstream {
				continue
			}

			pipeline.graph.AddEdge(upstream.Name, element.Name)
		}
	}
}
View Source
// Then returns an Op that places each of the given elements directly after
// the element it is applied to, by adding an edge element -> next.
//
// A successor whose name has no entry in the pipeline yet is registered
// first. The edge is added only when the pipeline's entry for that name is
// the very same *Element; otherwise that successor is skipped.
var Then = func(nextElements ...*Element) Op {
	return func(pipeline *Pipeline, element *Element) {
		for i := range nextElements {
			downstream := nextElements[i]

			if pipeline.elements[downstream.Name] == nil {
				pipeline.Register(downstream)
			}

			// A different element already owns this name: add no edge.
			if pipeline.elements[downstream.Name] != downstream {
				continue
			}

			pipeline.graph.AddEdge(element.Name, downstream.Name)
		}
	}
}

Functions

func LoadPrivateState

func LoadPrivateState[SK ~string, SV any](state ogcore.State, key string) SV

func LoadState

func LoadState[SV any](state ogcore.State, key string) SV

func SavePrivateState

func SavePrivateState[SK ~string](state ogcore.State, key string, val any, overwrite bool)

func SaveState

func SaveState(state ogcore.State, key string, val any, overwrite bool)

func UpdatePrivateState

func UpdatePrivateState[SK ~string, SV any](state ogcore.State, key string, updateFunc func(oldVal SV) (val SV)) error

func UpdateState

func UpdateState[SV any](state ogcore.State, key string, updateFunc func(oldVal SV) (val SV)) error

Types

type BaseCluster

// BaseCluster embeds BaseNode and holds a collection of ogcore.Node values,
// both as a slice and as a map keyed by string.
type BaseCluster struct {
	BaseNode

	// Group is the slice of nodes held by the cluster.
	Group   []ogcore.Node
	// NodeMap indexes nodes by a string key.
	// NOTE(review): presumably the key is the node name — confirm.
	NodeMap map[string]ogcore.Node
}

func (*BaseCluster) Join

func (cluster *BaseCluster) Join(nodes []ogcore.Node)

func (BaseCluster) Run

func (cluster BaseCluster) Run(ctx context.Context, state ogcore.State) error

type BaseEventNode added in v0.7.0

// BaseEventNode embeds BaseNode together with a pointer to an
// eventd.EventBus instantiated with ogcore.State, so the bus's methods are
// promoted onto the node.
type BaseEventNode struct {
	BaseNode
	// The embedded bus pointer is nil in the zero value.
	// NOTE(review): presumably AttachBus is what sets it — confirm.
	*eventd.EventBus[ogcore.State]
}

func (*BaseEventNode) AttachBus added in v0.7.0

func (node *BaseEventNode) AttachBus(bus *eventd.EventBus[ogcore.State])

type BaseNode

// BaseNode is the type embedded by the other node types declared in this
// package (BaseCluster, BaseEventNode, BaseWrapper, FuncNode, Pipeline).
type BaseNode struct {
	// Action is an ogcore.Action value.
	// NOTE(review): how Run uses it is not visible here — confirm.
	Action ogcore.Action
	// contains filtered or unexported fields
}

func (*BaseNode) Name

func (node *BaseNode) Name() string

func (BaseNode) Run

func (node BaseNode) Run(ctx context.Context, state ogcore.State) error

func (*BaseNode) SetName

func (node *BaseNode) SetName(name string)

type BaseState

// BaseState embeds sync.RWMutex, so the lock methods are promoted onto it.
// Because it contains a mutex, a BaseState must not be copied after first
// use; work with the pointer returned by NewState.
type BaseState struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewState

func NewState() *BaseState

func (*BaseState) Get

func (state *BaseState) Get(key any) (any, bool)

func (*BaseState) Set

func (state *BaseState) Set(key any, val any)

func (*BaseState) Update

func (state *BaseState) Update(key any, updateFunc func(val any) any)

type BaseWrapper

// BaseWrapper embeds BaseNode together with an ogcore.Node interface value.
type BaseWrapper struct {
	BaseNode

	// The embedded Node is nil in the zero value.
	// NOTE(review): presumably this is the wrapped node set by Wrap — confirm.
	ogcore.Node
}

func (BaseWrapper) Run

func (wrapper BaseWrapper) Run(ctx context.Context, state ogcore.State) error

func (*BaseWrapper) Wrap

func (wrapper *BaseWrapper) Wrap(node ogcore.Node)

type Builder

// Builder holds a pointer to an ogcore.Factories value.
type Builder struct {
	// Factories is nil in the zero value.
	// NOTE(review): presumably RegisterFactory and RegisterPrototype
	// populate it — confirm.
	Factories *ogcore.Factories
}

func (*Builder) RegisterFactory

func (builder *Builder) RegisterFactory(name string, factory func() ogcore.Node) *Builder

func (*Builder) RegisterPrototype

func (builder *Builder) RegisterPrototype(name string, prototype ogcore.Cloneable) *Builder

type Element

// Element describes one entry of a Pipeline. Its struct tags control JSON
// encoding: fields tagged `json:"-"` are never encoded, and fields tagged
// omitempty are left out when they hold their empty value.
type Element struct {
	// Virtual flags the element as virtual.
	// NOTE(review): presumably toggled by SetVirtual / AsVirtual — confirm.
	Virtual     bool `json:"Virtual,omitempty"`
	// Name identifies the element: Branch, Rely and Then use it as the key
	// into the pipeline's element map and as the endpoint of graph edges.
	// It carries no tag, so it is always encoded.
	Name        string
	// FactoryName is a factory name.
	// NOTE(review): presumably set by UseFactory — confirm.
	FactoryName string         `json:"FactoryName,omitempty"`
	// Wrappers is a list of wrapper names.
	// NOTE(review): presumably appended to by Wrap — confirm.
	Wrappers    []string       `json:"Wrappers,omitempty"`
	// ParamsMap holds arbitrary values keyed by string.
	// NOTE(review): presumably filled by Params — confirm.
	ParamsMap   map[string]any `json:"ParamsMap,omitempty"`
	DefaultImpl string         `json:"DefaultImpl,omitempty"`
	// Priority is an integer priority (see GetPriority / SetPriority).
	Priority    int            `json:"Priority,omitempty"`

	// WrapperAlias maps strings to strings.
	// NOTE(review): presumably populated by WrapByAlias; key/value direction
	// is not visible here — confirm.
	WrapperAlias map[string]string `json:"WrapperAlias,omitempty"`

	// Singleton is a concrete node instance; it is excluded from JSON.
	Singleton ogcore.Node `json:"-"`

	// PrivateFactory is a function returning a node; it is excluded from JSON.
	PrivateFactory func() ogcore.Node `json:"-"`

	// SubElements holds nested elements.
	SubElements []*Element `json:"SubElements,omitempty"`
}

func NewElement

func NewElement(name string) *Element

func (*Element) Apply

func (e *Element) Apply(options ...ElementOption) *Element

func (*Element) AsVirtual

func (e *Element) AsVirtual() *Element

func (*Element) GetPriority added in v0.7.0

func (e *Element) GetPriority() int

func (*Element) Params

func (e *Element) Params(key string, val any) *Element

func (*Element) SetPriority added in v0.7.0

func (e *Element) SetPriority(priority int) *Element

func (*Element) SetVirtual

func (e *Element) SetVirtual(isVirtual bool) *Element

func (*Element) UseFactory

func (e *Element) UseFactory(name string, subElements ...*Element) *Element

func (*Element) UseFn

func (e *Element) UseFn(fn func() error) *Element

func (*Element) UseNode

func (e *Element) UseNode(node ogcore.Node) *Element

func (*Element) UsePrivateFactory

func (e *Element) UsePrivateFactory(factory func() ogcore.Node, subElements ...*Element) *Element

func (*Element) Wrap

func (e *Element) Wrap(wrappers ...string) *Element

func (*Element) WrapByAlias added in v0.8.2

func (e *Element) WrapByAlias(wrapper string, alias string) *Element

type ElementOption

// ElementOption is a function that receives an *Element. Element.Apply
// accepts a variadic list of ElementOptions.
type ElementOption func(e *Element)

type FuncNode

// FuncNode embeds BaseNode and carries a function with the same parameters
// and result as a node's Run method.
type FuncNode struct {
	BaseNode

	// RunFunc is the function carried by the node.
	// NOTE(review): presumably FuncNode.Run delegates to it — confirm.
	RunFunc func(ctx context.Context, state ogcore.State) error
}

func NewFuncNode

func NewFuncNode(runFunc func(ctx context.Context, state ogcore.State) error) *FuncNode

func (*FuncNode) Run

func (node *FuncNode) Run(ctx context.Context, state ogcore.State) error

type Op

// Op is a function that receives a pipeline and an element. Branch, Rely and
// Then each return an Op, and Pipeline.Register accepts a variadic list of
// them after the element.
type Op func(pipeline *Pipeline, element *Element)

type PGraph

// PGraph is an alias (not a distinct type) for internal.Graph instantiated
// with *Element.
type PGraph = internal.Graph[*Element]

type Pipeline

// Pipeline embeds BaseNode, Builder and *slog.Logger, so the methods of all
// three are promoted onto it, and exposes a handful of tuning fields.
type Pipeline struct {
	BaseNode
	Builder
	// The embedded logger pointer is nil in the zero value.
	*slog.Logger

	// NOTE(review): the semantics of the fields below are not visible here;
	// the descriptions restate only their names and types — confirm against
	// the implementation.

	// Interrupts is an iterator sequence of strings.
	Interrupts       iter.Seq[string]
	// ParallelismLimit is an integer limit.
	ParallelismLimit int
	// DisablePool is a boolean switch.
	DisablePool      bool
	// EnableMonitor is a boolean switch.
	EnableMonitor    bool
	// SlowThreshold is a duration threshold.
	SlowThreshold    time.Duration
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline() *Pipeline

func (*Pipeline) AsyncRun added in v0.7.0

func (pipeline *Pipeline) AsyncRun(ctx context.Context, state ogcore.State) (pause, continueRun func(), wait func() error)

func (*Pipeline) Check

func (pipeline *Pipeline) Check() error

func (*Pipeline) DumpDOT

func (pipeline *Pipeline) DumpDOT() ([]byte, error)

func (*Pipeline) DumpGraph

func (pipeline *Pipeline) DumpGraph() ([]byte, error)

func (*Pipeline) ForEachElem

func (pipeline *Pipeline) ForEachElem(op func(e *Element)) *Pipeline

func (*Pipeline) LoadGraph

func (pipeline *Pipeline) LoadGraph(data []byte) error

func (*Pipeline) Register

func (pipeline *Pipeline) Register(e *Element, ops ...Op) *Pipeline

func (*Pipeline) ResetPool

func (pipeline *Pipeline) ResetPool()

func (*Pipeline) Run

func (pipeline *Pipeline) Run(ctx context.Context, state ogcore.State) error

func (*Pipeline) SetPoolCache

func (pipeline *Pipeline) SetPoolCache(size int, warmup bool) error

func (*Pipeline) Subscribe added in v0.7.0

func (pipeline *Pipeline) Subscribe(callback eventd.CallBack[ogcore.State], ops ...eventd.Op) (cancel func(), err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL