Documentation
Overview
Package pump provides a minimalist framework for composing data processing pipelines. The pipelines are type-safe, impose little overhead, and can be composed either statically or dynamically (for example, as a function of configuration). A running pipeline stops on, and returns, the first error encountered.
The package defines two generic types:
- Data generator Gen[T]: a callback-based ("push") iterator that supplies a stream of data of any type T, and
- Pipeline stage Stage[T,U]: a function that invokes the input generator Gen[T], does whatever processing it is programmed to do, and feeds the supplied callback with data items of type U.
The package also provides a basic set of functions for composing pipeline stages and binding stages to generators, as well as support for pipelining and parallel execution.
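To make the two types concrete, here is a minimal self-contained sketch. It does not import pump. Instead it declares local Gen and Stage types with the shapes described above (an assumption drawn from this text, not copied from the package source), along with a hypothetical generator `fromSlice`, a hypothetical stage `upper`, and a helper `collect` that runs a stage and gathers its output.

```go
package main

import (
	"fmt"
	"strings"
)

// Gen is a local stand-in assumed to match pump.Gen: a push iterator that
// feeds each item to yield and stops on the first error.
type Gen[T any] func(yield func(T) error) error

// Stage is a local stand-in assumed to match pump.Stage.
type Stage[T, U any] func(src Gen[T], yield func(U) error) error

// fromSlice is a hypothetical generator over the elements of a slice.
func fromSlice[T any](items []T) Gen[T] {
	return func(yield func(T) error) error {
		for _, item := range items {
			if err := yield(item); err != nil {
				return err // stop on the first error from the callback
			}
		}
		return nil
	}
}

// upper is a hypothetical stage: it runs the source generator and passes
// every string on in upper case.
func upper(src Gen[string], yield func(string) error) error {
	return src(func(s string) error {
		return yield(strings.ToUpper(s))
	})
}

// collect runs stage over src and gathers everything it yields.
func collect[T, U any](stage Stage[T, U], src Gen[T]) ([]U, error) {
	var out []U
	err := stage(src, func(u U) error {
		out = append(out, u)
		return nil
	})
	return out, err
}

func main() {
	res, err := collect(Stage[string, string](upper), fromSlice([]string{"a", "b"}))
	fmt.Println(res, err) // [A B] <nil>
}
```

Note that nothing runs until the stage is invoked with a callback; the generator is driven from inside the stage.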
Example
```go
// input data
data := []string{" 123 ", " 321 ", " ", "-42"}

// pipeline (may also be composed statically, or as a function of configuration)
pipe := Chain4(
	// trim whitespace
	Map(strings.TrimSpace),
	// allow only non-empty strings
	Filter(func(s string) bool { return len(s) > 0 }),
	// convert to integer
	MapE(strconv.Atoi),
	// run all the above in a separate goroutine
	Pipe,
)

// run the pipeline
err := pipe(FromSlice(data), func(x int) (e error) {
	// just print the value
	_, e = fmt.Println(x)
	return
})

if err != nil {
	log.Fatal(err)
}
```

Output:

```
123
321
-42
```
Index
- Variables
- func Pipe[T any](src Gen[T], yield func(T) error) error
- type Gen
- type It
- type Stage
- func Chain10[A, B, C, D, E, F, G, H, I, J, K any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], ...) Stage[A, K]
- func Chain11[A, B, C, D, E, F, G, H, I, J, K, L any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], ...) Stage[A, L]
- func Chain12[A, B, C, D, E, F, G, H, I, J, K, L, M any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], ...) Stage[A, M]
- func Chain13[A, B, C, D, E, F, G, H, I, J, K, L, M, O any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], ...) Stage[A, O]
- func Chain14[A, B, C, D, E, F, G, H, I, J, K, L, M, O, P any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], ...) Stage[A, P]
- func Chain15[A, B, C, D, E, F, G, H, I, J, K, L, M, O, P, Q any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], ...) Stage[A, Q]
- func Chain16[A, B, C, D, E, F, G, H, I, J, K, L, M, O, P, Q, R any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], ...) Stage[A, R]
- func Chain2[A, B, C any](s1 Stage[A, B], s2 Stage[B, C]) Stage[A, C]
- func Chain3[A, B, C, D any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D]) Stage[A, D]
- func Chain4[A, B, C, D, E any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E]) Stage[A, E]
- func Chain5[A, B, C, D, E, F any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F]) Stage[A, F]
- func Chain6[A, B, C, D, E, F, G any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], ...) Stage[A, G]
- func Chain7[A, B, C, D, E, F, G, H any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], ...) Stage[A, H]
- func Chain8[A, B, C, D, E, F, G, H, I any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], ...) Stage[A, I]
- func Chain9[A, B, C, D, E, F, G, H, I, J any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], ...) Stage[A, J]
- func Filter[T any](pred func(T) bool) Stage[T, T]
- func Lazy[T, U any](create func(func(U) error) func(T) error) Stage[T, U]
- func Lazy1[A, T, U any](arg A, create func(A, func(U) error) func(T) error) Stage[T, U]
- func Lazy2[A, B, T, U any](arg1 A, arg2 B, create func(A, B, func(U) error) func(T) error) Stage[T, U]
- func Map[T, U any](fn func(T) U) Stage[T, U]
- func MapE[T, U any](fn func(T) (U, error)) Stage[T, U]
- func Parallel[T, U any](n int, stage Stage[T, U]) Stage[T, U]
- func ParallelCtx[T, U any](ctx context.Context, n int, stage Stage[T, U]) Stage[T, U]
- func PipeCtx[T any](ctx context.Context) Stage[T, T]
Variables
var ErrStop = errors.New("pipeline cancelled")
ErrStop signals an early exit from a range-over-function loop. It is not stored in It.Err, but in some (probably rare) situations a stage function may need to treat it as a special case.
Functions
Types
type Gen
Gen is a generic push iterator, or a generator. When invoked with a user-provided callback, it is expected to iterate over its data source, invoking the callback once for each data item. It is also expected to stop on the first error, whether returned from the callback or encountered internally. It is up to the user to develop their own generators, because it is not possible to provide generic code for all possible data sources. There is also one caveat: some generators can be run only once (for example, those sourcing data from a socket), so please structure your code accordingly.
func All (added in v0.7.3)
All constructs a generator that invokes all the given generators one after another, in order.
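The signature of All is not shown here, so the following is only a sketch of the described behaviour. It uses a hypothetical variadic `all` over a locally declared Gen type: each generator runs in turn with the same callback, and the first error stops the whole sequence.

```go
package main

import "fmt"

// Gen is a local stand-in assumed to match pump.Gen.
type Gen[T any] func(yield func(T) error) error

// fromSlice is a hypothetical generator over the elements of a slice.
func fromSlice[T any](items []T) Gen[T] {
	return func(yield func(T) error) error {
		for _, item := range items {
			if err := yield(item); err != nil {
				return err
			}
		}
		return nil
	}
}

// all is a hypothetical sketch of All: run the generators one after
// another, in order, stopping on the first error.
func all[T any](gens ...Gen[T]) Gen[T] {
	return func(yield func(T) error) error {
		for _, g := range gens {
			if err := g(yield); err != nil {
				return err
			}
		}
		return nil
	}
}

func main() {
	gen := all(fromSlice([]int{1, 2}), fromSlice([]int{3}))
	_ = gen(func(x int) error {
		fmt.Println(x) // 1, then 2, then 3
		return nil
	})
}
```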
func Bind
Bind takes an existing generator of some type T and returns a new generator of some type U that performs the T -> U conversion via the given stage function.
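A minimal sketch of the idea, with local Gen and Stage types and a hypothetical `bind` (the argument order of the real Bind is not shown here, so it is assumed). The new generator simply runs the stage over the source when it is invoked. The hypothetical stage `itoa` converts integers to strings.

```go
package main

import (
	"fmt"
	"strconv"
)

// Gen and Stage are local stand-ins assumed to match pump.Gen and pump.Stage.
type Gen[T any] func(yield func(T) error) error
type Stage[T, U any] func(src Gen[T], yield func(U) error) error

// fromSlice is a hypothetical generator over the elements of a slice.
func fromSlice[T any](items []T) Gen[T] {
	return func(yield func(T) error) error {
		for _, item := range items {
			if err := yield(item); err != nil {
				return err
			}
		}
		return nil
	}
}

// bind is a hypothetical sketch of Bind: nothing runs until the returned
// generator is invoked, at which point the stage drives the source.
func bind[T, U any](src Gen[T], stage Stage[T, U]) Gen[U] {
	return func(yield func(U) error) error {
		return stage(src, yield)
	}
}

// itoa is a hypothetical stage converting ints to decimal strings.
func itoa(src Gen[int], yield func(string) error) error {
	return src(func(x int) error { return yield(strconv.Itoa(x)) })
}

func main() {
	strs := bind(fromSlice([]int{7, 42}), Stage[int, string](itoa))
	_ = strs(func(s string) error {
		fmt.Printf("%q\n", s) // "7", then "42"
		return nil
	})
}
```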
type It (added in v0.7.1)

```go
type It[T any] struct {
	Err error // error returned from the pipeline
	// contains filtered or unexported fields
}
```
It is an iterator over the given generator. Its main purpose is to provide a function to range over using a for loop. Since the release of Go 1.23 everybody does range-over-function, so I do too. Given some type T and a generator "src" of type Gen[T], we can then do:
```go
it := src.Iter()

for item := range it.All {
	// process item
}

if it.Err != nil {
	...
}
```
A generator like "src" is typically constructed as some input generator bound to a processing stage using the Bind function.
type Stage
Stage is a generic type (a function) representing a pipeline stage. For any given types T and U, the function takes a generator of type Gen[T] and a callback of type func(U) error. When invoked, it is expected to run the generator, do whatever processing it is programmed to do, and call the callback once for each data element produced. A stage function is expected to stop at and return the first error (if any), either from the callback or from the iteration itself. The signature of the function is designed to allow full control over when and how the source generator is invoked. For example, suppose we want a pipeline stage where processing each input item involves database queries, and we also want to establish a database connection before the iteration and close it afterwards. This can be achieved using the following stage function (for some already defined types T and U):
```go
func process(src pump.Gen[T], yield func(U) error) error {
	conn, err := connectToDatabase()

	if err != nil {
		return err
	}

	defer conn.Close()

	return src(func(item T) error { // this actually invokes the source generator
		// produce a result of type U
		result, err := produceResult(item, conn)

		if err != nil {
			return err
		}

		// pass the result further down the pipeline
		return yield(result)
	})
}
```
func Chain10
func Chain10[A, B, C, D, E, F, G, H, I, J, K any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K]) Stage[A, K]
Chain10 composes 10 pipeline stages into one.
func Chain11
func Chain11[A, B, C, D, E, F, G, H, I, J, K, L any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K], s11 Stage[K, L]) Stage[A, L]
Chain11 composes 11 pipeline stages into one.
func Chain12
func Chain12[A, B, C, D, E, F, G, H, I, J, K, L, M any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K], s11 Stage[K, L], s12 Stage[L, M]) Stage[A, M]
Chain12 composes 12 pipeline stages into one.
func Chain13
func Chain13[A, B, C, D, E, F, G, H, I, J, K, L, M, O any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K], s11 Stage[K, L], s12 Stage[L, M], s13 Stage[M, O]) Stage[A, O]
Chain13 composes 13 pipeline stages into one.
func Chain14
func Chain14[A, B, C, D, E, F, G, H, I, J, K, L, M, O, P any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K], s11 Stage[K, L], s12 Stage[L, M], s13 Stage[M, O], s14 Stage[O, P]) Stage[A, P]
Chain14 composes 14 pipeline stages into one.
func Chain15
func Chain15[A, B, C, D, E, F, G, H, I, J, K, L, M, O, P, Q any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K], s11 Stage[K, L], s12 Stage[L, M], s13 Stage[M, O], s14 Stage[O, P], s15 Stage[P, Q]) Stage[A, Q]
Chain15 composes 15 pipeline stages into one.
func Chain16
func Chain16[A, B, C, D, E, F, G, H, I, J, K, L, M, O, P, Q, R any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J], s10 Stage[J, K], s11 Stage[K, L], s12 Stage[L, M], s13 Stage[M, O], s14 Stage[O, P], s15 Stage[P, Q], s16 Stage[Q, R]) Stage[A, R]
Chain16 composes 16 pipeline stages into one.
func Chain4
func Chain4[A, B, C, D, E any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E]) Stage[A, E]
Chain4 composes 4 pipeline stages into one.
func Chain5
func Chain5[A, B, C, D, E, F any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F]) Stage[A, F]
Chain5 composes 5 pipeline stages into one.
func Chain6
func Chain6[A, B, C, D, E, F, G any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G]) Stage[A, G]
Chain6 composes 6 pipeline stages into one.
func Chain7
func Chain7[A, B, C, D, E, F, G, H any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H]) Stage[A, H]
Chain7 composes 7 pipeline stages into one.
func Chain8
func Chain8[A, B, C, D, E, F, G, H, I any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I]) Stage[A, I]
Chain8 composes 8 pipeline stages into one.
func Chain9
func Chain9[A, B, C, D, E, F, G, H, I, J any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D], s4 Stage[D, E], s5 Stage[E, F], s6 Stage[F, G], s7 Stage[G, H], s8 Stage[H, I], s9 Stage[I, J]) Stage[A, J]
Chain9 composes 9 pipeline stages into one.
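The type parameters above show how each stage's output type becomes the next stage's input type. Below is a sketch of how such composition could work, using local Gen and Stage types and hypothetical `chain2`/`chain3` (not the package's code). The second stage consumes a generator that, when invoked, runs the first stage over the original source. Longer chains can be built pairwise in the same way.

```go
package main

import (
	"fmt"
	"strconv"
)

// Gen and Stage are local stand-ins assumed to match pump.Gen and pump.Stage.
type Gen[T any] func(yield func(T) error) error
type Stage[T, U any] func(src Gen[T], yield func(U) error) error

// fromSlice is a hypothetical generator over the elements of a slice.
func fromSlice[T any](items []T) Gen[T] {
	return func(yield func(T) error) error {
		for _, item := range items {
			if err := yield(item); err != nil {
				return err
			}
		}
		return nil
	}
}

// chain2 is a hypothetical two-stage composition: s2 reads from a generator
// that runs s1 over src.
func chain2[A, B, C any](s1 Stage[A, B], s2 Stage[B, C]) Stage[A, C] {
	return func(src Gen[A], yield func(C) error) error {
		mid := func(yb func(B) error) error { return s1(src, yb) }
		return s2(mid, yield)
	}
}

// chain3 composes pairwise on top of chain2.
func chain3[A, B, C, D any](s1 Stage[A, B], s2 Stage[B, C], s3 Stage[C, D]) Stage[A, D] {
	return chain2(chain2(s1, s2), s3)
}

// double and show are hypothetical example stages.
var double Stage[int, int] = func(src Gen[int], yield func(int) error) error {
	return src(func(x int) error { return yield(2 * x) })
}

var show Stage[int, string] = func(src Gen[int], yield func(string) error) error {
	return src(func(x int) error { return yield(strconv.Itoa(x)) })
}

func main() {
	pipe := chain3(double, double, show)
	_ = pipe(fromSlice([]int{1, 2}), func(s string) error {
		fmt.Println(s) // "4", then "8"
		return nil
	})
}
```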
func Filter
func Filter[T any](pred func(T) bool) Stage[T, T]
Filter creates a stage function that filters input items according to the given predicate.
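A sketch of a filtering stage with the same signature as listed for Filter, written against local Gen and Stage types. The hypothetical `filter` forwards an item only when the predicate holds and silently skips the rest. It is an illustration, not the package's implementation.

```go
package main

import "fmt"

// Gen and Stage are local stand-ins assumed to match pump.Gen and pump.Stage.
type Gen[T any] func(yield func(T) error) error
type Stage[T, U any] func(src Gen[T], yield func(U) error) error

// fromSlice is a hypothetical generator over the elements of a slice.
func fromSlice[T any](items []T) Gen[T] {
	return func(yield func(T) error) error {
		for _, item := range items {
			if err := yield(item); err != nil {
				return err
			}
		}
		return nil
	}
}

// filter is a hypothetical sketch of Filter.
func filter[T any](pred func(T) bool) Stage[T, T] {
	return func(src Gen[T], yield func(T) error) error {
		return src(func(item T) error {
			if pred(item) {
				return yield(item)
			}
			return nil // skip the item without stopping the pipeline
		})
	}
}

func main() {
	even := filter(func(x int) bool { return x%2 == 0 })
	_ = even(fromSlice([]int{1, 2, 3, 4}), func(x int) error {
		fmt.Println(x) // 2, then 4
		return nil
	})
}
```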
func Lazy (added in v0.7.2)
func Lazy[T, U any](create func(func(U) error) func(T) error) Stage[T, U]
Lazy creates a stage that is lazily initialised via the given constructor function. The constructor is called once on every invocation of the stage, before the iteration starts. It receives a "yield" continuation function as a parameter and is expected to return a callback function for the source generator. This arrangement allows internal state to be used during the iteration, and the constructor can create a fresh instance of that state on each invocation of the stage. Examples of such internal state are item counters, flags, caches, etc. Here is an example of a constructor for a function that deduplicates data items (of some type Item) by their IDs:
```go
func dedup(yield func(*Item) error) func(*Item) error {
	// internal state: a set to detect duplicates
	seen := make(map[int]struct{})

	return func(item *Item) error {
		// check for duplicate
		if _, yes := seen[item.ID]; yes {
			log.Printf("skipped a duplicate of the item %d", item.ID)
			return nil
		}

		// mark as seen
		seen[item.ID] = struct{}{}

		// yield
		return yield(item)
	}
}
```
This function can later be added as a pipeline stage, for example:
pipe := Chain3(..., Lazy(dedup), ...)
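To show why the constructor runs on every invocation, here is a sketch with local Gen and Stage types, a hypothetical `lazy` matching the listed signature of Lazy, and a hypothetical constructor `numbered` whose internal state is a counter. Running the same stage twice restarts the numbering, because each run gets a fresh callback and therefore a fresh counter.

```go
package main

import "fmt"

// Gen and Stage are local stand-ins assumed to match pump.Gen and pump.Stage.
type Gen[T any] func(yield func(T) error) error
type Stage[T, U any] func(src Gen[T], yield func(U) error) error

// fromSlice is a hypothetical generator over the elements of a slice.
func fromSlice[T any](items []T) Gen[T] {
	return func(yield func(T) error) error {
		for _, item := range items {
			if err := yield(item); err != nil {
				return err
			}
		}
		return nil
	}
}

// lazy is a hypothetical sketch of Lazy: create is called once per
// invocation of the stage, before the source starts producing items.
func lazy[T, U any](create func(func(U) error) func(T) error) Stage[T, U] {
	return func(src Gen[T], yield func(U) error) error {
		return src(create(yield))
	}
}

// numbered is a hypothetical constructor that prefixes items with a counter.
func numbered(yield func(string) error) func(string) error {
	n := 0 // internal state, fresh for every run of the stage
	return func(s string) error {
		n++
		return yield(fmt.Sprintf("%d:%s", n, s))
	}
}

func main() {
	stage := lazy(numbered)
	src := fromSlice([]string{"a", "b"})
	show := func(s string) error { fmt.Println(s); return nil }
	_ = stage(src, show) // 1:a, 2:b
	_ = stage(src, show) // numbering restarts: 1:a, 2:b
}
```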
func Lazy1 (added in v0.7.2)
func Lazy1[A, T, U any](arg A, create func(A, func(U) error) func(T) error) Stage[T, U]
Lazy1 does the same as Lazy, but with one additional parameter passed over to the constructor.
func Lazy2 (added in v0.7.2)
func Lazy2[A, B, T, U any](arg1 A, arg2 B, create func(A, B, func(U) error) func(T) error) Stage[T, U]
Lazy2 does the same as Lazy, but with two additional parameters passed over to the constructor.
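A sketch of how the extra parameters could be threaded through, assuming Lazy1 and Lazy2 behave as described. The hypothetical `lazy1` and `lazy2` (signatures copied from the listing above) capture their arguments in a closure and delegate to a hypothetical `lazy`. The constructors `atMost` and `numberFrom` are made-up examples of parameterised state.

```go
package main

import "fmt"

// Gen and Stage are local stand-ins assumed to match pump.Gen and pump.Stage.
type Gen[T any] func(yield func(T) error) error
type Stage[T, U any] func(src Gen[T], yield func(U) error) error

// fromSlice is a hypothetical generator over the elements of a slice.
func fromSlice[T any](items []T) Gen[T] {
	return func(yield func(T) error) error {
		for _, item := range items {
			if err := yield(item); err != nil {
				return err
			}
		}
		return nil
	}
}

func lazy[T, U any](create func(func(U) error) func(T) error) Stage[T, U] {
	return func(src Gen[T], yield func(U) error) error { return src(create(yield)) }
}

// lazy1 and lazy2 bind their extra arguments, then delegate to lazy.
func lazy1[A, T, U any](arg A, create func(A, func(U) error) func(T) error) Stage[T, U] {
	return lazy(func(yield func(U) error) func(T) error { return create(arg, yield) })
}

func lazy2[A, B, T, U any](arg1 A, arg2 B, create func(A, B, func(U) error) func(T) error) Stage[T, U] {
	return lazy(func(yield func(U) error) func(T) error { return create(arg1, arg2, yield) })
}

// atMost is a hypothetical constructor that forwards only the first limit items.
func atMost(limit int, yield func(string) error) func(string) error {
	seen := 0
	return func(s string) error {
		if seen >= limit {
			return nil // drop the rest
		}
		seen++
		return yield(s)
	}
}

// numberFrom is a hypothetical constructor that labels items with prefix and
// a counter starting at start.
func numberFrom(prefix string, start int, yield func(string) error) func(string) error {
	n := start
	return func(s string) error {
		label := fmt.Sprintf("%s%d=%s", prefix, n, s)
		n++
		return yield(label)
	}
}

func main() {
	src := fromSlice([]string{"a", "b", "c"})
	show := func(s string) error { fmt.Println(s); return nil }
	_ = lazy1(2, atMost)(src, show)           // a, b
	_ = lazy2("#", 10, numberFrom)(src, show) // #10=a, #11=b, #12=c
}
```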
func MapE
func MapE[T, U any](fn func(T) (U, error)) Stage[T, U]
MapE creates a stage function that converts each data element via the given function, stopping on the first error encountered, if any.
func Parallel
func Parallel[T, U any](n int, stage Stage[T, U]) Stage[T, U]
Parallel constructs a stage function that invokes the given stage from n goroutines in parallel. The value of n is capped at 100 * runtime.NumCPU(), and a zero or negative n sets the number of goroutines to runtime.NumCPU(). This stage does not preserve the order of data items.
func ParallelCtx
func ParallelCtx[T, U any](ctx context.Context, n int, stage Stage[T, U]) Stage[T, U]
ParallelCtx constructs a stage function that invokes the given stage from n goroutines in parallel, under control of the given context. The value of n is capped at 100 * runtime.NumCPU(), and a zero or negative n sets the number of goroutines to runtime.NumCPU(). This stage does not preserve the order of data items.