pump

Version: v0.7.3
Published: Apr 2, 2025 License: BSD-3-Clause Imports: 6 Imported by: 0

README

pump: a minimalist framework for assembling data processing pipelines.

Package pump provides a minimalist framework for composing data processing pipelines. The pipelines are type-safe, impose little overhead, and can be composed either statically, or dynamically (for example, as a function of configuration). A running pipeline stops on and returns the first error encountered.

The package defines two generic types:

  • Data generator Gen[T]: a callback-based ("push") iterator that supplies a stream of data of any type T, and
  • Pipeline stage Stage[T,U]: a function that invokes input generator Gen[T], does whatever processing it is programmed to do, and feeds the supplied callback with data items of type U.

The package also provides a basic set of functions for composing pipeline stages and binding stages to generators, as well as support for pipelining and parallel execution.

For API details see documentation.

Concept

The library is built around two data types: generator Gen[T any] and stage Stage[T,U any]. Generator is a function that passes data items to its argument - a callback function func(T) error. It is defined as

type Gen[T any] func(func(T) error) error

This is very similar to iter.Seq type from Go v1.23, except that the callback function returns error instead of a boolean. Any implementation of the generator function should stop on the first error returned from the callback, or on any internal error encountered during iteration. Here is a (simplified) example of a constructor that creates a generator iterating over the given slice:

func fromSlice[T any](src []T) Gen[T] {
    return func(yield func(T) error) error {
        for _, item := range src {
            if err := yield(item); err != nil {
                return err
            }
        }

        return nil
    }
}

Note: the library provides its own FromSlice function implementation. Also, in practice generators are more likely to read data from more complex sources, such as files, sockets, database queries, etc.

The second type, Stage, is a function that is expected to invoke the given generator, process each data item of type T and possibly forward each result (of type U) to the given callback. The Stage type is defined as

type Stage[T, U any] func(Gen[T], func(U) error) error

Just to give a simple example, this is a stage that increments every integer from its generator:

func increment(src Gen[int], yield func(int) error) error {
    return src(func(x int) error {
        return yield(x + 1)
    })
}

Just as a note, the library provides a more succinct way of defining such a simple stage (see below).

The signature of the stage function is designed to allow for full control over when and how the source generator is invoked. For example, suppose we want to have a pipeline stage where processing of each input item involves database queries, and we also want to establish a database connection before the iteration, and close it afterwards. This can be achieved using the following stage function (for some already defined types T and U):

func process(src pump.Gen[T], yield func(U) error) error {
    conn, err := connectToDatabase()

    if err != nil {
        return err
    }

    defer conn.Close()

    return src(func(item T) error { // this actually invokes the source generator
        // produce a result of type U
        result, err := produceResult(item, conn)

        if err != nil {
            return err
        }

        // pass the result further down the pipeline
        return yield(result)
    })
}
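The same control over the iteration also lets a stage emit a different number of items than it receives. The sketch below is an assumption-laden illustration, not library code: batch is a hypothetical constructor of a stage that groups items into slices and flushes the incomplete last group after the source generator returns. Gen, Stage and fromSlice are redeclared locally to keep the snippet self-contained.

```go
package main

import "fmt"

type Gen[T any] func(func(T) error) error
type Stage[T, U any] func(Gen[T], func(U) error) error

// batch is a hypothetical stage constructor (not part of the library):
// the stage groups its input into slices of up to size items.
func batch[T any](size int) Stage[T, []T] {
	if size < 1 {
		size = 1
	}

	return func(src Gen[T], yield func([]T) error) error {
		buf := make([]T, 0, size)

		// run the source, yielding every full batch
		err := src(func(item T) error {
			if buf = append(buf, item); len(buf) < size {
				return nil
			}

			full := buf
			buf = make([]T, 0, size) // fresh buffer, the old one now belongs to the consumer

			return yield(full)
		})

		if err != nil {
			return err
		}

		// the iteration is over: flush the incomplete last batch, if any
		if len(buf) > 0 {
			return yield(buf)
		}

		return nil
	}
}

// fromSlice is the simplified slice generator from the text above.
func fromSlice[T any](src []T) Gen[T] {
	return func(yield func(T) error) error {
		for _, item := range src {
			if err := yield(item); err != nil {
				return err
			}
		}

		return nil
	}
}

func main() {
	err := batch[int](3)(fromSlice([]int{1, 2, 3, 4, 5, 6, 7}), func(b []int) error {
		fmt.Println(b) // prints [1 2 3], then [4 5 6], then [7]
		return nil
	})

	if err != nil {
		fmt.Println("error:", err)
	}
}
```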

The rest of the library is essentially about constructing and composing stages. Multiple stages can be composed into one using Chain* family of functions, for example:

pipe := Chain3(increment, times2, modulo5)

Here we want to calculate (2 * (x + 1)) % 5 for each integer x. The resulting pipe is a new stage function of type func(Gen[int], func(int) error) error, and it can be invoked like

gen := FromSlice([]int{ 1, 2, 3 }) // input data generator
err := pipe(gen, func(x int) error {
    _, e := fmt.Println(x)
    return e
})

if err != nil { ... }

Or, using a for-range loop:

gen := FromSlice([]int{ 1, 2, 3 }) // input data generator
it := Bind(gen, pipe).Iter()       // iterator

for x := range it.All {
    if _, err := fmt.Println(x); err != nil {
        return err
    }
}

if it.Err != nil {
    return it.Err
}

Side note: ranging over a function may offer a slightly more convenient syntax, but in practice it often results in more verbose error-handling code.
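To make the composition less abstract, here is one way binding and chaining could be expressed using nothing but the two type definitions. This is a sketch under the assumption that composition is plain function nesting; the lower-case bind and chain2 are local illustrations and not the library's Bind and Chain2, whose internals may differ.

```go
package main

import "fmt"

type Gen[T any] func(func(T) error) error
type Stage[T, U any] func(Gen[T], func(U) error) error

// bind (sketch) attaches a stage to a generator, producing a new generator.
func bind[T, U any](src Gen[T], stage Stage[T, U]) Gen[U] {
	return func(yield func(U) error) error {
		return stage(src, yield)
	}
}

// chain2 (sketch) composes two stages: the output of s1 becomes the source of s2.
func chain2[A, B, C any](s1 Stage[A, B], s2 Stage[B, C]) Stage[A, C] {
	return func(src Gen[A], yield func(C) error) error {
		return s2(bind(src, s1), yield)
	}
}

func increment(src Gen[int], yield func(int) error) error {
	return src(func(x int) error { return yield(x + 1) })
}

func times2(src Gen[int], yield func(int) error) error {
	return src(func(x int) error { return yield(x * 2) })
}

// fromSlice is the simplified slice generator from the text above.
func fromSlice[T any](src []T) Gen[T] {
	return func(yield func(T) error) error {
		for _, item := range src {
			if err := yield(item); err != nil {
				return err
			}
		}

		return nil
	}
}

func main() {
	pipe := chain2(increment, times2)

	err := pipe(fromSlice([]int{1, 2, 3}), func(x int) error {
		_, e := fmt.Println(x) // prints 4, 6, 8
		return e
	})

	if err != nil {
		fmt.Println("error:", err)
	}
}
```

Note that nothing runs until the composed stage is invoked: chain2 and bind only build closures.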

To assist with writing simple stage functions (like increment above) the library provides a number of constructors, for example:

increment := Map(func(x int) int { return x + 1 })
times2   := Map(func(x int) int { return x * 2 })
modulo5  := Map(func(x int) int { return x % 5 })

pipe := Chain3(increment, times2, modulo5)

Or, alternatively:

pipe := Chain3(
           Map(func(x int) int { return x + 1 }),
           Map(func(x int) int { return x * 2 }),
           Map(func(x int) int { return x % 5 }),
        )

In fact, a stage function can convert any input type T to any output type U, so the above pipeline can be modified to produce strings instead of integers:

pipe := Chain4(
           increment,
           times2,
           modulo5,
           Map(strconv.Itoa),
        )

Or the input data can be filtered to skip odd numbers:

pipe := Chain4(
           Filter(func(x int) bool { return x & 1 == 0 }),
           increment,
           times2,
           modulo5,
        )

To deal with parallelisation the library provides two helpers: Pipe and Parallel. Pipe runs all stages before it in a separate goroutine, for example:

pipe := Chain4(
           increment,
           Pipe,
           times2,
           modulo5,
        )

When this pipeline is invoked, its generator and increment stage will be running in a dedicated goroutine, while the rest will be executed in the current goroutine.

Parallel executes the given stage in the specified number of goroutines, in parallel. All stages before Parallel are also run in a dedicated goroutine. Example:

pipe := Chain3(
           increment,
           Parallel(5, times2),
           modulo5,
        )

Upon invocation of this pipeline, its generator and increment stage will be running in a dedicated goroutine, the times2 stage will be running in 5 goroutines in parallel, and the last stage will be in the calling goroutine.

The above pipeline can also be rearranged to run all stages in parallel:

pipe := Parallel(5, Chain3(
           increment,
           times2,
           modulo5,
        ))

Note: Parallel stage does not preserve the order of data items.

In general, pipelines can be assembled either statically (i.e., when pipe is literally a static variable), or dynamically, for example, as a function of configuration. Also, separation between processing stages and their composition often reduces the number of modifications we have to make to the code when requirements change.
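Dynamic assembly can look like the following sketch, where the list of stage names stands in for configuration. The registry, the build function and the local mapStage and chain2 helpers are hypothetical and exist only for this example; with the library itself one would presumably use Map and Chain2 in their place.

```go
package main

import "fmt"

type Gen[T any] func(func(T) error) error
type Stage[T, U any] func(Gen[T], func(U) error) error

// mapStage and chain2 are local sketches standing in for Map and Chain2.
func mapStage[T, U any](fn func(T) U) Stage[T, U] {
	return func(src Gen[T], yield func(U) error) error {
		return src(func(item T) error { return yield(fn(item)) })
	}
}

func chain2[A, B, C any](s1 Stage[A, B], s2 Stage[B, C]) Stage[A, C] {
	return func(src Gen[A], yield func(C) error) error {
		return s2(func(y func(B) error) error { return s1(src, y) }, yield)
	}
}

// registry maps configuration names to stages (hypothetical).
var registry = map[string]Stage[int, int]{
	"increment": mapStage(func(x int) int { return x + 1 }),
	"times2":    mapStage(func(x int) int { return x * 2 }),
	"modulo5":   mapStage(func(x int) int { return x % 5 }),
}

// build composes the named stages, in the given order.
func build(names []string) (Stage[int, int], error) {
	// start from a pass-through stage
	pipe := Stage[int, int](func(src Gen[int], yield func(int) error) error {
		return src(yield)
	})

	for _, name := range names {
		next, ok := registry[name]

		if !ok {
			return nil, fmt.Errorf("unknown stage %q", name)
		}

		pipe = chain2(pipe, next)
	}

	return pipe, nil
}

// fromSlice is the simplified slice generator from the text above.
func fromSlice[T any](src []T) Gen[T] {
	return func(yield func(T) error) error {
		for _, item := range src {
			if err := yield(item); err != nil {
				return err
			}
		}

		return nil
	}
}

func main() {
	// in a real program the names would come from a configuration file or flags
	pipe, err := build([]string{"increment", "times2", "modulo5"})

	if err != nil {
		fmt.Println("error:", err)
		return
	}

	err = pipe(fromSlice([]int{1, 2, 3}), func(x int) error {
		_, e := fmt.Println(x) // prints 4, 1, 3
		return e
	})

	if err != nil {
		fmt.Println("error:", err)
	}
}
```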

Benchmarks

All benchmarks below simply pump integers through stages with no processing at all, thus only measuring the overhead associated with running stages themselves. The first benchmark (the simplest pass-through stage) shows a very small overhead probably due to compiler optimisations, but that also highlights the fact that the iteration itself is generally quite efficient. Using for-range loop (second benchmark) gives a constant overhead of just a few nanoseconds per iteration. Benchmarks for Pipe and Parallel stages show higher overhead because of the Go channels used internally (one channel for Pipe stage, and two for Parallel).

▶ go test -bench .
goos: linux
goarch: amd64
pkg: github.com/maxim2266/pump
cpu: Intel(R) Core(TM) i5-8500T CPU @ 2.10GHz
BenchmarkSimple-6      	616850120	         2.195 ns/op
BenchmarkRangeFunc-6   	259452792	         4.608 ns/op
BenchmarkPipe-6        	 8047437	       168.5 ns/op
BenchmarkParallel-6    	 2440783	       489.2 ns/op

Project status

Tested on Linux Mint 22. Requires Go version 1.23 or higher.

Documentation

Example
// input data
data := []string{" 123 ", " 321 ", " ", "-42"}

// pipeline (may also be composed statically, or as a function of configuration)
pipe := Chain4(
	// trim whitespace
	Map(strings.TrimSpace),
	// allow only non-empty strings
	Filter(func(s string) bool { return len(s) > 0 }),
	// convert to integer
	MapE(strconv.Atoi),
	// run all the above in a separate goroutine
	Pipe,
)

// run the pipeline
err := pipe(FromSlice(data), func(x int) (e error) {
	// just print the value
	_, e = fmt.Println(x)
	return
})

if err != nil {
	log.Fatal(err)
}
Output:

123
321
-42

Variables

var ErrStop = errors.New("pipeline cancelled")

ErrStop signals an early exit from a range-over-function loop. It is not stored in It.Err, but in some (probably rare) situations a stage function may need to treat it as a special case.

Functions

func Pipe

func Pipe[T any](src Gen[T], yield func(T) error) error

Pipe is a stage function that runs its source in a separate goroutine.

Types

type Gen

type Gen[T any] func(func(T) error) error

Gen is a generic push iterator, or a generator. When invoked with a user-provided callback, it is expected to iterate over its data source, invoking the callback once per data item. It is also expected to stop on the first error, whether returned from the callback or encountered internally. It is up to the user to develop their own generators, because it is not possible to provide generic code for all possible data sources. One caveat: some generators can be run only once (for example, those sourcing data from a socket), so please structure your code accordingly.

func All added in v0.7.3

func All[T any](srcs ...Gen[T]) Gen[T]

All constructs a generator that invokes all the given generators one after another, in order.
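The described behaviour amounts to running the generators back to back with the same callback. A minimal sketch, assuming exactly that (concat is a local illustration, not the library's All):

```go
package main

import "fmt"

type Gen[T any] func(func(T) error) error

// concat (sketch) invokes the given generators one after another, in order,
// stopping at the first error.
func concat[T any](srcs ...Gen[T]) Gen[T] {
	return func(yield func(T) error) error {
		for _, src := range srcs {
			if err := src(yield); err != nil {
				return err
			}
		}

		return nil
	}
}

// fromSlice is a simplified slice generator used as test input.
func fromSlice[T any](src []T) Gen[T] {
	return func(yield func(T) error) error {
		for _, item := range src {
			if err := yield(item); err != nil {
				return err
			}
		}

		return nil
	}
}

func main() {
	gen := concat(fromSlice([]int{1, 2}), fromSlice([]int{3}))

	err := gen(func(x int) error {
		_, e := fmt.Println(x) // prints 1, 2, 3
		return e
	})

	if err != nil {
		fmt.Println("error:", err)
	}
}
```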

func Bind

func Bind[T, U any](src Gen[T], stage Stage[T, U]) Gen[U]

Bind takes an existing generator of some type T and returns a new generator of some type U that does T -> U conversion via the given stage function.

func FromSeq added in v0.7.0

func FromSeq[T any](src iter.Seq[T]) Gen[T]

FromSeq constructs a generator from the given iterator.

func FromSlice added in v0.7.0

func FromSlice[S ~[]T, T any](src S) Gen[T]

FromSlice constructs a generator that reads data from the given slice, in order. In Go v1.23 it saves a few nanoseconds per iteration when compared to FromSeq(slices.Values(src)).

func (Gen[T]) Iter added in v0.7.2

func (src Gen[T]) Iter() It[T]

Iter constructs a new iterator from the given generator.

type It added in v0.7.1

type It[T any] struct {
	Err error // error returned from the pipeline
	// contains filtered or unexported fields
}

It is an iterator over the given generator. Its main purpose is to provide a function to range over using a for loop. Since the release of Go v1.23 everybody does range-over-function, so me too. Given some type T and a generator "src" of type Gen[T], we can then do:

it := src.Iter()

for item := range it.All {
	// process item
}

if it.Err != nil { ... }

A generator like "src" is typically constructed as some input generator bound to a processing stage using Bind function.

func (*It[T]) All added in v0.7.1

func (it *It[T]) All(yield func(T) bool)

All is the function to range over using a for loop.

type Stage

type Stage[T, U any] func(Gen[T], func(U) error) error

Stage is a generic type (a function) representing a pipeline stage. For any given types T and U, the function takes a generator of type Gen[T] and a callback of type func(U) error. When invoked, it is expected to run the generator, do whatever processing it is programmed to do, also calling the callback function once per each data element produced. Stage function is expected to stop at and return the first error (if any) from either the callback, or from the iteration itself. The signature of the function is designed to allow for full control over when and how the source generator is invoked. For example, suppose we want to have a pipeline stage where processing of each input item involves database queries, and we also want to establish a database connection before the iteration, and close it afterwards. This can be achieved using the following stage function (for some already defined types T and U):

func process(src pump.Gen[T], yield func(U) error) error {
	conn, err := connectToDatabase()

	if err != nil {
		return err
	}

	defer conn.Close()

	return src(func(item T) error {	// this actually invokes the source generator
		// produce a result of type U
		result, err := produceResult(item, conn)

		if err != nil {
			return err
		}

		// pass the result further down the pipeline
		return yield(result)
	})
}

func Chain10

func Chain10[A, B, C, D, E, F, G, H, I, J, K any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K]) Stage[A, K]

Chain10 composes 10 pipeline stages into one.

func Chain11

func Chain11[A, B, C, D, E, F, G, H, I, J, K, L any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K], s11 Stage[K, L]) Stage[A, L]

Chain11 composes 11 pipeline stages into one.

func Chain12

func Chain12[A, B, C, D, E, F, G, H, I, J, K, L, M any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K], s11 Stage[K, L], s12 Stage[L, M]) Stage[A, M]

Chain12 composes 12 pipeline stages into one.

func Chain13

func Chain13[A, B, C, D, E, F, G, H, I, J, K, L, M, O any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K], s11 Stage[K, L], s12 Stage[L, M], s13 Stage[M, O]) Stage[A, O]

Chain13 composes 13 pipeline stages into one.

func Chain14

func Chain14[A, B, C, D, E, F, G, H, I, J, K, L, M, O, P any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K], s11 Stage[K, L], s12 Stage[L, M], s13 Stage[M, O], s14 Stage[O, P]) Stage[A, P]

Chain14 composes 14 pipeline stages into one.

func Chain15

func Chain15[A, B, C, D, E, F, G, H, I, J, K, L, M, O, P, Q any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K], s11 Stage[K, L], s12 Stage[L, M], s13 Stage[M, O], s14 Stage[O, P], s15 Stage[P, Q]) Stage[A, Q]

Chain15 composes 15 pipeline stages into one.

func Chain16

func Chain16[A, B, C, D, E, F, G, H, I, J, K, L, M, O, P, Q, R any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K], s11 Stage[K, L], s12 Stage[L, M], s13 Stage[M, O], s14 Stage[O, P], s15 Stage[P, Q], s16 Stage[Q, R]) Stage[A, R]

Chain16 composes 16 pipeline stages into one.

func Chain2

func Chain2[A, B, C any](s1 Stage[A, B], s2 Stage[B, C]) Stage[A, C]

Chain2 composes 2 pipeline stages into one.

func Chain3

func Chain3[A, B, C, D any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D]) Stage[A, D]

Chain3 composes 3 pipeline stages into one.

func Chain4

func Chain4[A, B, C, D, E any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E]) Stage[A, E]

Chain4 composes 4 pipeline stages into one.

func Chain5

func Chain5[A, B, C, D, E, F any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F]) Stage[A, F]

Chain5 composes 5 pipeline stages into one.

func Chain6

func Chain6[A, B, C, D, E, F, G any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G]) Stage[A, G]

Chain6 composes 6 pipeline stages into one.

func Chain7

func Chain7[A, B, C, D, E, F, G, H any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H]) Stage[A, H]

Chain7 composes 7 pipeline stages into one.

func Chain8

func Chain8[A, B, C, D, E, F, G, H, I any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I]) Stage[A, I]

Chain8 composes 8 pipeline stages into one.

func Chain9

func Chain9[A, B, C, D, E, F, G, H, I, J any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J]) Stage[A, J]

Chain9 composes 9 pipeline stages into one.

func Filter

func Filter[T any](pred func(T) bool) Stage[T, T]

Filter creates a stage function that filters input items according to the given predicate.

func Lazy added in v0.7.2

func Lazy[T, U any](create func(func(U) error) func(T) error) Stage[T, U]

Lazy creates a stage that is lazily initialised via the given constructor function. The constructor is called once upon every invocation of the stage before the iteration starts. It receives a "yield" continuation function as a parameter, and is expected to return a callback function for the source generator. This arrangement allows for some internal state to be used during the iteration, and the constructor can create a fresh instance of the state upon each invocation of the stage. Examples of such internal state are item counters, flags, caches, etc. Here is an example of a constructor of the function that deduplicates data items (of some type Item) by their IDs:

func dedup(yield func(*Item) error) func(*Item) error {
	// internal state: a set to detect duplicates
	seen := make(map[int]struct{})

	return func(item *Item) error {
		// check for duplicate
		if _, yes := seen[item.ID]; yes {
			log.Printf("skipped a duplicate of the item %d", item.ID)
			return nil
		}

		// mark as seen
		seen[item.ID] = struct{}{}

		// yield
		return yield(item)
	}
}

This function can later be added as a pipeline stage, for example:

pipe := Chain3(..., Lazy(dedup), ...)
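Going by the description above, a constructor like Lazy needs very little code: it calls the user's constructor on every invocation and hands the resulting callback to the source generator. The sketch below assumes just that (the local lazy and dedupInts are illustrations, not the library's code) and runs the same stage twice to show that each invocation gets fresh state.

```go
package main

import "fmt"

type Gen[T any] func(func(T) error) error
type Stage[T, U any] func(Gen[T], func(U) error) error

// lazy (sketch) calls create once per invocation of the stage,
// before the iteration starts.
func lazy[T, U any](create func(func(U) error) func(T) error) Stage[T, U] {
	return func(src Gen[T], yield func(U) error) error {
		return src(create(yield))
	}
}

// dedupInts drops the integers it has already seen.
func dedupInts(yield func(int) error) func(int) error {
	seen := make(map[int]struct{}) // internal state, created per invocation

	return func(x int) error {
		if _, dup := seen[x]; dup {
			return nil
		}

		seen[x] = struct{}{}

		return yield(x)
	}
}

// fromSlice is a simplified slice generator used as test input.
func fromSlice[T any](src []T) Gen[T] {
	return func(yield func(T) error) error {
		for _, item := range src {
			if err := yield(item); err != nil {
				return err
			}
		}

		return nil
	}
}

func main() {
	stage := lazy(dedupInts)

	// both runs print items 1, 2, 3: the "seen" set does not survive between runs
	for run := 1; run <= 2; run++ {
		err := stage(fromSlice([]int{1, 2, 1, 3, 2}), func(x int) error {
			_, e := fmt.Println("run", run, "item", x)
			return e
		})

		if err != nil {
			fmt.Println("error:", err)
		}
	}
}
```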

func Lazy1 added in v0.7.2

func Lazy1[A, T, U any](arg A, create func(A, func(U) error) func(T) error) Stage[T, U]

Lazy1 does the same as Lazy, but with one additional parameter passed over to the constructor.
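As a usage illustration, the extra parameter is handy for stages that need a setting as well as state. In the sketch below, lazy1 is a local stand-in assumed to behave like Lazy1, and skipFirst is a hypothetical constructor that drops the first n items (for example, a header line).

```go
package main

import "fmt"

type Gen[T any] func(func(T) error) error
type Stage[T, U any] func(Gen[T], func(U) error) error

// lazy1 (sketch) passes one extra argument to the constructor
// on every invocation of the stage.
func lazy1[A, T, U any](arg A, create func(A, func(U) error) func(T) error) Stage[T, U] {
	return func(src Gen[T], yield func(U) error) error {
		return src(create(arg, yield))
	}
}

// skipFirst is a hypothetical constructor: the callback drops the first n items.
func skipFirst(n int, yield func(string) error) func(string) error {
	return func(s string) error {
		if n > 0 {
			n-- // n is a private copy, so every invocation counts from scratch
			return nil
		}

		return yield(s)
	}
}

// fromSlice is a simplified slice generator used as test input.
func fromSlice[T any](src []T) Gen[T] {
	return func(yield func(T) error) error {
		for _, item := range src {
			if err := yield(item); err != nil {
				return err
			}
		}

		return nil
	}
}

func main() {
	stage := lazy1(1, skipFirst)

	err := stage(fromSlice([]string{"header", "a", "b"}), func(s string) error {
		_, e := fmt.Println(s) // prints a, then b
		return e
	})

	if err != nil {
		fmt.Println("error:", err)
	}
}
```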

func Lazy2 added in v0.7.2

func Lazy2[A, B, T, U any](arg1 A, arg2 B, create func(A, B, func(U) error) func(T) error) Stage[T, U]

Lazy2 does the same as Lazy, but with two additional parameters passed over to the constructor.

func Map

func Map[T, U any](fn func(T) U) Stage[T, U]

Map creates a stage function that converts each data element via the given function.

func MapE

func MapE[T, U any](fn func(T) (U, error)) Stage[T, U]

MapE creates a stage function that converts each data element via the given function, stopping on the first error encountered, if any.

func Parallel

func Parallel[T, U any](n int, stage Stage[T, U]) Stage[T, U]

Parallel constructs a stage function that invokes the given stage from n goroutines in parallel. The value of n has the upper bound of 100 * runtime.NumCPU(). Zero or negative value of n sets the number of goroutines to the result of calling runtime.NumCPU. This stage does not preserve the order of data items.

func ParallelCtx

func ParallelCtx[T, U any](ctx context.Context, n int, stage Stage[T, U]) Stage[T, U]

ParallelCtx constructs a stage function that invokes the given stage from n goroutines in parallel, under control of the given context. The value of n has the upper bound of 100 * runtime.NumCPU(). Zero or negative value of n sets the number of goroutines to the result of calling runtime.NumCPU. This stage does not preserve the order of data items.

func PipeCtx

func PipeCtx[T any](ctx context.Context) Stage[T, T]

PipeCtx creates a stage function that runs its source in a separate goroutine. The lifetime of the pipe is managed via the given context.
