pipe

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 8, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Chain

type Chain[T models.TokenConstraint] struct {
	// contains filtered or unexported fields
}

The Chain contains a routine to process data. Chains will be running in parallel. Each routine is built from a Reader and/or Processor and/or Writer.

func NewReaderChain

func NewReaderChain[T models.TokenConstraint](r Reader[T], p Processor[T]) (*Chain[T], chan T)

NewReaderChain returns a new Chain with a Reader and a Processor, and communication channel for backup operation.

func NewWriterChain

func NewWriterChain[T models.TokenConstraint](w Writer[T], limiter *bandwidth.Limiter) (*Chain[T], chan T)

NewWriterChain returns a new Chain with a Writer, and communication channel for backup operation.

func (*Chain[T]) Run

func (c *Chain[T]) Run(ctx context.Context) error

Run execute the chain.

type Fanout

type Fanout[T models.TokenConstraint] struct {
	Inputs  []chan T
	Outputs []chan T
	// contains filtered or unexported fields
}

Fanout routes messages between chain pools. FanoutStrategy controls the distribution of messages to output channels.

func NewFanout

func NewFanout[T models.TokenConstraint](
	inputs []chan T,
	outputs []chan T,
	strategy FanoutStrategy,
) (*Fanout[T], error)

NewFanout returns a new Fanout.

func (*Fanout[T]) Close

func (f *Fanout[T]) Close()

Close closes all output channels.

func (*Fanout[T]) GetMetrics

func (f *Fanout[T]) GetMetrics() (in, out int)

GetMetrics returns the accumulated length for input and output channels.

func (*Fanout[T]) Run

func (f *Fanout[T]) Run(ctx context.Context)

Run starts routing messages in separate goroutines based on the defined fanout strategy.

type FanoutStrategy

type FanoutStrategy int

FanoutStrategy represents a pipeline routing strategy.

const (
	// Fixed strategy routes incoming tokens to output channels, establishing a
	// dedicated one-to-one mapping between input channels and output channels.
	// All tokens read from a specific input channel are routed to its pre-assigned
	// output channel. The number of output channels must equal the number of input
	// channels being processed.
	Fixed FanoutStrategy = iota
	// RoundRobin distributes incoming tokens between available output channels in a
	// fair, rotating manner.
	RoundRobin
	// Split routes incoming tokens to output channels using a custom routing function.
	// The routing function determines the destination channel for each token based on
	// its partition id.
	Split
)

type Pipe

type Pipe[T models.TokenConstraint] struct {
	// contains filtered or unexported fields
}

Pipe is running and managing everything.

func NewPipe

func NewPipe[T models.TokenConstraint](
	pc ProcessorCreator[T],
	readers []Reader[T],
	writers []Writer[T],
	limiter *bandwidth.Limiter,
	strategy FanoutStrategy,
) (*Pipe[T], error)

NewPipe creates cew backup pipeline.

func (*Pipe[T]) Close added in v0.5.0

func (p *Pipe[T]) Close()

Close clean memory for GC.

func (*Pipe[T]) GetMetrics

func (p *Pipe[T]) GetMetrics() (in, out int)

GetMetrics returns the accumulated length for input and output channels.

func (*Pipe[T]) Run

func (p *Pipe[T]) Run(ctx context.Context) error

Run start pipe with readers, writers and fanout.

type Pool

type Pool[T models.TokenConstraint] struct {
	Chains []*Chain[T]
	// Outputs and Inputs are mutually exclusive.
	Inputs  []chan T
	Outputs []chan T
}

Pool is a pool of chains. All chains in a pool are running in parallel. Pools are communicating via fanout.

func NewReaderPool

func NewReaderPool[T models.TokenConstraint](readers []Reader[T], pc ProcessorCreator[T]) *Pool[T]

NewReaderPool returns a new pool of Reader and Processor chains for backup operations, with the specified parallelism.

func NewWriterPool

func NewWriterPool[T models.TokenConstraint](writers []Writer[T], limiter *bandwidth.Limiter) *Pool[T]

NewWriterPool creates a new pool of Writer chains for backup operations, with the specified parallelism and bandwidth.

func (*Pool[T]) Close added in v0.5.0

func (p *Pool[T]) Close()

Close closing channels and cleaning links.

func (*Pool[T]) Run

func (p *Pool[T]) Run(ctx context.Context) error

Run runs all chains in the pool.

type Processor

type Processor[T models.TokenConstraint] interface {
	Process(T) (T, error)
}

Processor describes data processors.

type ProcessorCreator

type ProcessorCreator[T models.TokenConstraint] func() Processor[T]

ProcessorCreator is a function type that defines a creator for a Processor.

type Reader

type Reader[T models.TokenConstraint] interface {
	Read(ctx context.Context) (T, error)
	Close()
}

Reader describes data readers. To exit worker, the Reader must return io.EOF.

type Writer

type Writer[T models.TokenConstraint] interface {
	Write(context.Context, T) (n int, err error)
	Close() (err error)
}

Writer describes data writers.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL