steps

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 31, 2025 License: MIT Imports: 14 Imported by: 0

README

go-steps

go-steps is an experimental data transformation library with one by one processing.

This means that the processing steps are executed sequentially for each input items, and the processing steps are ignored once any step defines this, so the transformation jumps to the processing of the next item.

iter := Transform[int]([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}).
	With(Steps(
		Take[int](7), // 1, 2, 3, 4, 5, 6, 7
		Filter(func(i int) (bool, error) {
			return i%2 == 0, nil
		}), // 2, 4, 6
		Map(func(i int) (string, error) {
			return strconv.Itoa(i * i), nil
		}), // "4", "16", "36"
	)).
	AsRange()

for i := range iter {
	// process
}

As you can see a transformation chain consist of 3 main parts:

  • input: passed into Transform
  • steps: this defines the transformation steps for each input items (With(Steps(...)))
  • output: the result of the transformation (AsRange)

Input can be a slice or a channel, enabling processing of streaming data.

inputCh := make(chan int, 3)
iter := Transform[int](inputCh).
	With(Steps(
		Take[int](7), // 1, 2, 3, 4, 5, 6, 7
		Filter(func(i int) (bool, error) {
			return i%2 == 0, nil
		}), // 2, 4, 6
		Map(func(i int) (string, error) {
			return strconv.Itoa(i * i), nil
		}), // "4", "16", "36"
	)).
	AsRange()

go func() {
	for _, i := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
		inputCh <- i
	}
	close(inputCh)
}()

for i := range iter {
	// process
}

Besides the input values you can also set some options for the transformer.

buf := bytes.NewBufferString("")

res := Transform[int]([]int{1, 2, 3}, WithLogWriter(buf)).
	WithSteps(
		Log("before filter"),
		Filter(func(i int) (bool, error) {
			return i%2 == 1, nil
		}),
		Log("after filter"),
	).AsSlice()

fmt.Println(res) // [1 3]
fmt.Prtinln(buf.String())
/*
before filter 	arg0: 1 
after filter 	arg0: 1 
before filter 	arg0: 2 
before filter 	arg0: 3 
after filter 	arg0: 3 
*/
	

Steps are using types to help the developer always see the types of input and output values.

Some type information could be lost during the processing, and because of this a validation process is running for the first time, to help fail fast the processing.

Since the processing steps could be detached from the transformer, this validation could be ran separate, before starting the processing.

steps := Steps(
	Filter(func(i int) (bool, error) {
		return i%2 == 1, nil
	}),
	Map(func(in string) (int, error) {
		return strconv.Atoi(in)
	}),
)

if err := steps.Validate(); err != nil {
	panic(err) //step validation failed [Map:2]: incompatible input argument type [int!=string:1]  
}

_ := Transform[int]([]int{1, 2, 3}).
	With(steps).
	AsSlice()

Errors of the steps are handled inside the processing, but they are not instantly returned, to not make the output more verbose. By default errors are logged, but the error handler is configurable.

When error occurs the processing stops, and the developer can decide how to propagate the error.

var transformErr error
errHandler := func(err error) {
	transformErr = err
}

iter := Transform[int]([]int{10, 0, -10}, WithErrorHandler(errHandler)).
	WithSteps(
		Map(func(i int) (int, error) {
			if i == 0 {
				return 0, fmt.Errorf("division by zero: input=%d", i)
			}
			return i / 10, nil
		}),
	).AsRange()

res := []int{}
for i := range iter {
	res = append(res, i.(int))
}
fmt.Println(res) // [1]
	
if transformErr != nil {
	fmt.Println(transformErr.Error()) //division by zero: input=0
}

Steps can also have an extra final step used to aggregate the results.

There are no more processing steps allowed beyond this point.

type (
	person struct {
		name   string
		age    int
		isMale bool
	}
	ageRange uint8
)

const (
	young ageRange = iota
	mature
	old
)

persons := []person{
	{"John", 30, true},
	{"Jane", 25, false},
	{"Bob", 75, true},
	{"Alice", 28, false},
	{"Charlie", 17, true},
	{"Frank", 81, true},
	{"Bill", 45, true},
}

res := Transform[person](persons).
	With(Steps(
		Filter(func(p person) (bool, error) {
			return p.isMale, nil
		}),
	).Aggregate(
		GroupBy(func(p person) (ageRange, string, error) {
			switch {
			case p.age <= 18:
				return young, p.name, nil
			case p.age <= 65:
				return mature, p.name, nil
			default:
				return old, p.name, nil
			}
		}),
	),
	).
	AsMultiMap()

fmt.Println(res) //map[0:[Charlie] 1:[John Bill] 2:[Bob Frank]]

Output is the result of the transformation. It can return an iterator (AsRange) where the resulting items are passed one by one to the range keyword, but they could also be collected and returned as a single result (AsSlice or AsMap)

inputCh := make(chan int, 3)
go func() {
	for _, i := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
		inputCh <- i
	}
	close(inputCh)
}()

res := Transform[int](inputCh).
	With(Steps(
		Take[int](7), // 1, 2, 3, 4, 5, 6, 7
		Filter(func(i int) (bool, error) {
			return i%2 == 0, nil
		}), // 2, 4, 6
		Map(func(i int) (string, error) {
			return strconv.Itoa(i * i), nil
		}), // "4", "16", "36"
	)).
	AsMap()

fmt.Println(res) //map[1:4 3:16 5:36]

Custom inputs and steps could be also defined:

func multiplyBy[IN0 ~int](multiplier IN0) StepWrapper {
	return StepWrapper{
		Name: "multiplyBy",
		StepFn: func(in StepInput) StepOutput {
			return StepOutput{
				Args:    Args{in.Args[0].(IN0) * multiplier},
				ArgsLen: 1,
				Error:   nil,
				Skip:    false,
			}
		},
		Validate: func(prevStepOut ArgTypes) (ArgTypes, error) {
			return ArgTypes{reflect.TypeFor[IN0]()}, nil
		},
	}
}

func primeInput(opts TransformerOptions) chan int {
	inputCh := make(chan int, 3)
	go func() {
		for _, i := range []int{2, 3, 5, 7, 11, 13, 17, 19, 23, 29} {
			inputCh <- i
		}
		close(inputCh)
	}()
	return inputCh
}


func main() {
    res := TransformFn[int](primeInput).
		WithSteps(multiplyBy(3)).
		AsSlice()
	fmt.Println(res) //[6 9 15 21 33 39 51 57 69 87]
}

It is also possible to split up a transformation chain into branches, but they must be merged back before returning the output.

res := Transform[int]([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}).
	WithSteps(
		Split(func(in int) (uint8, error) {
			return uint8(in % 2), nil
		}),
		WithBranches[int](
			Steps(
				Map(func(in int) (string, error) {
					return strconv.Itoa(-in), nil
				}),
			),
			Steps(
    			Map(func(in int) (string, error) {
					return strconv.Itoa(in * 10), nil
				}),
			),
		),
		Merge(),
	).
	AsSlice()

fmt.Printf("%#v", res) //[]interface {}{"10", "-2", "30", "-4", "50", "-6", "70", "-8", "90", "-10"}

The branch steps are also validated, and could be explicitly validated as well.

go-steps supports CSV (using https://github.com/jszwec/csvutil) and JSON input and output.

type salary struct {
	ID         string `csv:"ID" json:"id"`
	Name       string `csv:"Name" json:"name"`
	Age        int    `csv:"Age" json:"age"`
	Department string `csv:"Department" json:"department"`
	Salary     int    `csv:"Salary" json:"salary"`
	City       string `csv:"City" json:"city"`
}

res := TransformFn[salary](FromStreamingCsv[salary](File("testdata/salaries.csv"), false)).
	WithSteps(
		Filter(func(s salary) (bool, error) {
			return s.City == "New York" && s.Department == "Engineering", nil
		}),
		Take[salary](2),
	).
	AsJson()

fmt.Println(res) //[{"id":1,"name":"John Doe","age":25,"department":"Engineering","salary":60000,"city":"New York"},{"lary":78000,"city":"New York"}]

See the samples for more details

For documentation open the godoc with m=all option:

godoc -http=:6060
# http://localhost:6060/pkg/github.com/domahidizoltan/go-steps?m=all

Benchmarks

Since the library is using a lot more steps and reflection, it's obvious that it is slower than the native Go version.

As more complex the transformation chain gets, the difference gets bigger.

Here are some benchmarks for reference:

  • SimpleStep: using a simple step on a slice
  • MultipleSteps: using multiple three steps and an aggregation on a slice
  • CsvToJsonSteps: reading a CSV file and converting it to JSON while doing some minimal transformation on the data
goos: linux
goarch: amd64
cpu: AMD Ryzen 7 7840HS with Radeon 780M Graphics
                  │ tmp/native.txt │               tmp/lo.txt               │           tmp/transformer.txt           │
                  │     sec/op     │    sec/op      vs base                 │    sec/op      vs base                  │
SimpleStep-16          6.544n ± 2%   44.820n ± 19%  +584.90% (p=0.000 n=10)   534.750n ± 3%  +8071.61% (p=0.000 n=10)
MultipleSteps-16       21.79n ± 0%    88.43n ±  1%  +305.83% (p=0.000 n=10)   1392.00n ± 1%  +6288.25% (p=0.000 n=10)
CsvToJsonSteps-16      522.6n ± 3%   2891.0n ± 20%  +453.20% (p=0.000 n=10)   13404.5n ± 1%  +2464.96% (p=0.000 n=10)
geomean                42.08n         225.4n        +435.73%                    2.153µ       +5015.92%

                  │ tmp/native.txt │              tmp/lo.txt               │          tmp/transformer.txt           │
                  │      B/op      │    B/op      vs base                  │     B/op      vs base                  │
SimpleStep-16          0.00 ± 0%      80.00 ± 0%          ? (p=0.000 n=10)    104.00 ± 0%          ? (p=0.000 n=10)
MultipleSteps-16        0.0 ± 0%      128.0 ± 0%          ? (p=0.000 n=10)     464.0 ± 0%          ? (p=0.000 n=10)
CsvToJsonSteps-16     440.0 ± 0%     8639.0 ± 0%  +1863.41% (p=0.000 n=10)   16856.0 ± 0%  +3730.91% (p=0.000 n=10)
geomean                          ¹    445.6       ?                            933.5       ?
¹ summaries must be >0 to compute geomean

                  │ tmp/native.txt │             tmp/lo.txt             │          tmp/transformer.txt           │
                  │   allocs/op    │ allocs/op   vs base                │  allocs/op    vs base                  │
SimpleStep-16         0.000 ± 0%     1.000 ± 0%        ? (p=0.000 n=10)     3.000 ± 0%          ? (p=0.000 n=10)
MultipleSteps-16      0.000 ± 0%     2.000 ± 0%        ? (p=0.000 n=10)    25.000 ± 0%          ? (p=0.000 n=10)
CsvToJsonSteps-16     3.000 ± 0%     4.000 ± 0%  +33.33% (p=0.000 n=10)   211.000 ± 0%  +6933.33% (p=0.000 n=10)
geomean                          ¹   2.000       ?                          25.11       ?
¹ summaries must be >0 to compute geomean

See the benchmarks for more details

Documentation

Overview

Package steps is a collection of functions that can be used to create and validate transformation steps, transformator options, aggregators, input and output adapters and more.

Example (StepsTransformer_AsCsv)
type person struct {
	ID   int    `csv:"id"`
	Name string `csv:"name"`
}

res := Transform[person]([]person{
	{ID: 1, Name: "John Doe"},
	{ID: 2, Name: "Jane Doe"},
}).
	WithSteps().
	AsCsv()

fmt.Println(res)
Output:

id,name
1,John Doe
2,Jane Doe
Example (StepsTransformer_AsIndexedRange)
fmt.Println("see AsKeyValueRange")
Output:

see AsKeyValueRange
Example (StepsTransformer_AsJson)
type person struct {
	ID   int    `csv:"id"`
	Name string `csv:"name"`
}

res := Transform[person]([]person{
	{ID: 1, Name: "John Doe"},
	{ID: 2, Name: "Jane Doe"},
}).
	WithSteps().
	AsJson()

fmt.Println(res)
Output:

[{"ID":1,"Name":"John Doe"},{"ID":2,"Name":"Jane Doe"}]
Example (StepsTransformer_AsKeyValueRange)
res := ""
for k, v := range Transform[string]([]string{"h", "e", "l", "l", "o"}).
	WithSteps().
	AsKeyValueRange() {
	res += fmt.Sprintf("%d:%s ", k, v)
}

fmt.Println(res)
Output:

0:h 1:e 2:l 3:l 4:o
Example (StepsTransformer_AsMap)
res := Transform[string]([]string{"h", "e", "l", "l", "o"}).
	WithSteps().
	AsMap()

fmt.Println(res)
Output:

map[0:h 1:e 2:l 3:l 4:o]
Example (StepsTransformer_AsMultiMap)
fmt.Println("see GroupBy")
Output:

see GroupBy
Example (StepsTransformer_AsRange)
res := []any{}
for i := range Transform[int]([]int{1, 2, 3, 4, 5}).
	WithSteps().
	AsRange() {
	res = append(res, i)
}

fmt.Println(res)
Output:

[1 2 3 4 5]
Example (StepsTransformer_AsSlice)
res := Transform[int]([]int{1, 2, 3, 4, 5}).
	WithSteps().
	AsSlice()

fmt.Println(res)
Output:

[1 2 3 4 5]
Example (StepsTransformer_ToStreamingCsv)
type person struct {
	ID   int    `csv:"id"`
	Name string `csv:"name"`
}
var buf bytes.Buffer

Transform[person]([]person{
	{ID: 1, Name: "John Doe"},
	{ID: 2, Name: "Jane Doe"},
}).
	WithSteps().
	ToStreamingCsv(&buf)

fmt.Println(buf.String())
Output:

id,name
1,John Doe
2,Jane Doe
Example (StepsTransformer_ToStreamingJson)
type person struct {
	ID   int    `csv:"id"`
	Name string `csv:"name"`
}
var buf bytes.Buffer

Transform[person]([]person{
	{ID: 1, Name: "John Doe"},
	{ID: 2, Name: "Jane Doe"},
}).
	WithSteps().
	ToStreamingJson(&buf)

fmt.Println(buf.String())
Output:

{"ID":1,"Name":"John Doe"}
{"ID":2,"Name":"Jane Doe"}

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	ErrStepValidationFailed  = errors.New("step validation failed")           // step validation returned an error
	ErrIncompatibleInArgType = errors.New("incompatible input argument type") // the outputs of the previous step don't match the inputs of the current step
	ErrInvalidAggregator     = errors.New("invalid aggregator")               // aggregator has no reducer or name defined
	ErrInvalidStep           = errors.New("invalid step")                     // step has no step or name defined
)

Functions

func File

func File(filePath string) io.Reader

File is a helper function to define the file input for CSV or JSON inputs

func FromCsv

func FromCsv[T any](reader io.Reader) func(TransformerOptions) []T

FromCsv translates a CSV into a slice input

Example
type person struct {
	ID   int    `csv:"id"`
	Name string `csv:"name"`
}
reader := strings.NewReader(`
		id,name
		1,John Doe
		2,Jane Doe`)

res := TransformFn[person](FromCsv[person](reader)).
	WithSteps(
		Map(func(in person) (string, error) {
			return in.Name, nil
		}),
	).
	AsSlice()

fmt.Println(res)
Output:

[John Doe Jane Doe]

func FromJson

func FromJson[T any](reader io.Reader) func(TransformerOptions) []T

FromJson translates a JSON into a slice input

Example
type person struct {
	ID   int    `csv:"id"`
	Name string `csv:"name"`
}
reader := strings.NewReader(`[
		{"id":1,"name":"John Doe"},
		{"id":2,"name":"Jane Doe"}
		]`)

res := TransformFn[person](FromJson[person](reader)).
	WithSteps(
		Map(func(in person) (string, error) {
			return in.Name, nil
		}),
	).
	AsSlice()

fmt.Println(res)
Output:

[John Doe Jane Doe]

func FromStreamingCsv

func FromStreamingCsv[T any](reader io.Reader, withoutHeaders bool) func(TransformerOptions) chan T

FromStreamingCsv translates a CSV into a channel input

Example
type person struct {
	ID   int    `csv:"id"`
	Name string `csv:"name"`
}
reader := strings.NewReader(`1,John Doe
2,Jane Doe`)

res := TransformFn[person](FromStreamingCsv[person](reader, true)).
	WithSteps(
		Map(func(in person) (string, error) {
			return in.Name, nil
		}),
	).
	AsSlice()

fmt.Println(res)
Output:

[John Doe Jane Doe]

func FromStreamingJson

func FromStreamingJson[T any](reader io.Reader) func(TransformerOptions) chan T

FromStreamingJson translates a JSON into a channel input

Example
type person struct {
	ID   int    `csv:"id"`
	Name string `csv:"name"`
}
reader := strings.NewReader(`
		{"id":1,"name":"John Doe"}
		{"id":2,"name":"Jane Doe"}`)

res := TransformFn[person](FromStreamingJson[person](reader)).
	WithSteps(
		Map(func(in person) (string, error) {
			return in.Name, nil
		}),
	).
	AsSlice()

fmt.Println(res)
Output:

[John Doe Jane Doe]

func Transform

func Transform[T any, IT inputType[T]](in IT, options ...func(*TransformerOptions)) input[T, IT]

Transform creates a builder for the transforation chain. It takes the input data (slice or chan) and the transformer options.

func TransformFn

func TransformFn[T any, IT inputType[T]](in func(TransformerOptions) IT, options ...func(*TransformerOptions)) input[T, IT]

TransformFn is an alternative for Transform where the input is a function. This could be used to implement new input sources like files or database connections.

func WithChanSize

func WithChanSize(size uint) func(*TransformerOptions)

WithChanSize sets the channel size used by streaming inputs

func WithContext

func WithContext(ctx context.Context) func(*TransformerOptions)

WithContext sets the context of the transformer (used to cancel operations)

func WithErrorHandler

func WithErrorHandler(handler func(error)) func(*TransformerOptions)

WithErrorHandler sets the error handler or the transformer

func WithLogWriter

func WithLogWriter(writer io.Writer) func(*TransformerOptions)

WithLogWriter sets the log writer

func WithName

func WithName(name string) func(*TransformerOptions)

WithName adds a name to the transformer

func WithPanicHandler

func WithPanicHandler(handler func(error)) func(*TransformerOptions)

WithPanicHandler sets the panic handler used by the transformer input

Types

type ArgTypes

type ArgTypes [maxArgs]reflect.Type

ArgTypes are holding the reflect.Type of arguments. They are mostly used for the validation process.

type Args

type Args [maxArgs]any

Args are the array of input or output arguments. Most of the time only the first argument is used but others could be used for multi-input or output steps.

type ReducerFn

type ReducerFn StepFn

ReducerFn is an alias for StepFn used as a last step transformation (aggregator)

type ReducerWrapper

type ReducerWrapper struct {
	Name      string                                            // name of the step
	ReducerFn ReducerFn                                         // the aggregation step function
	Validate  func(prevStepArgTypes ArgTypes) (ArgTypes, error) // validation of the aggregation step in the chain
	Reset     func()                                            // reset the aggregation state before processing
}

ReducerWrapper is a container for an aggregation step

func Avg

func Avg() ReducerWrapper

Avg returns the average of all float64 inputs

Example
res := Transform[float64]([]float64{-1.1, -2.1, -3.1, -4.1, -5.1}).
	With(Aggregate(
		Avg(),
	)).
	AsSlice()

fmt.Println(res)
Output:

[-3.1]

func Fold

func Fold[IN0 any](initValue IN0, reduceFn func(in1, in2 IN0) (IN0, error)) ReducerWrapper

Fold reduces a series of inputs into a single value using a custom initial value.

Example
res := Transform[int]([]int{1, 2, 3, 4, 5}).
	With(Aggregate(
		Fold(10, func(in1, in2 int) (int, error) {
			return in1 + in2, nil
		}),
	)).
	AsSlice()

fmt.Println(res)
Output:

[25]

func GroupBy

func GroupBy[IN0 any, OUT0 comparable, OUT1 any](fn func(in IN0) (OUT0, OUT1, error)) ReducerWrapper

GroupBy is an aggregator grouping inputs by comparable values. The grouped values are in a slice.

Example
res := Transform[int]([]int{1, 2, 3, 4, 5}).
	With(Aggregate(
		GroupBy(func(in int) (uint8, int, error) {
			return uint8(in % 2), in, nil
		}),
	)).
	AsMultiMap()

fmt.Println(res)
Output:

map[0:[2 4] 1:[1 3 5]]

func Max

func Max[IN0 number]() ReducerWrapper

Max returns the largest number input

Example
res := Transform[float64]([]float64{-1.1, -3.3, -2.2, -5.5, -4.4}).
	With(Aggregate(
		Max[float64](),
	)).
	AsSlice()

fmt.Println(res)
Output:

[-1.1]

func Min

func Min[IN0 number]() ReducerWrapper

Min returns the smallest number input

Example
res := Transform[float64]([]float64{-1.1, -3.3, -2.2, -5.5, -4.4}).
	With(Aggregate(
		Min[float64](),
	)).
	AsSlice()

fmt.Println(res)
Output:

[-5.5]

func Reduce

func Reduce[IN0 any](fn func(in1, in2 IN0) (IN0, error)) ReducerWrapper

Reduce reduces a series of inputs into a single value using a zero value as initial value.

Example
res := Transform[int]([]int{1, 2, 3, 4, 5}).
	With(Aggregate(
		Reduce(func(in1, in2 int) (int, error) {
			return in1 + in2, nil
		}),
	)).
	AsSlice()

fmt.Println(res)
Output:

[15]

func Sum

func Sum[IN0 number]() ReducerWrapper

Sum returns the total of all number inputs

Example
res := Transform[int]([]int{1, 2, 3, 4, 5}).
	With(Aggregate(
		Sum[int](),
	)).
	AsSlice()

fmt.Println(res)
Output:

[15]

type SkipFirstArgValidation

type SkipFirstArgValidation struct{}

SkipFirstArgValidation is used to tell the validator this step is the first in the chain, so it can't be validated against the previous step.

type StepFn

type StepFn func(StepInput) StepOutput

StepFn defines a function used as a single step in a transformation chain

type StepInput

type StepInput struct {
	Args               Args               // input arguments
	ArgsLen            uint8              // length of input arguments (Args can hold zero values, so we need to know it's length)
	TransformerOptions TransformerOptions // transformer options to pass around in the chain
}

StepInput holds the input arguments for a single step

type StepOutput

type StepOutput struct {
	Error   error // error result of the step
	Args    Args  // output arguments
	ArgsLen uint8 // length of output arguments (Args can hold zero values, so we need to know it's length)
	Skip    bool  // used to notify the processor that further transformation steps should be skipped
}

StepOutput holds the output arguments for a single step

func (*StepOutput) Reset added in v0.1.1

func (so *StepOutput) Reset()

type StepWrapper

type StepWrapper struct {
	Name     string                                            // name of the step
	StepFn   StepFn                                            // the transformation step function
	Validate func(prevStepArgTypes ArgTypes) (ArgTypes, error) // validation of the current step in the chain
	Reset    func()                                            // reset the step state before processing
}

StepWrapper is a container for a single transformation step

func Do

func Do[IN0 any](fn func(in IN0) error) StepWrapper

Do runs a function on each input item

Example
total := 0
res := Transform[int]([]int{1, 2, 3, 4, 5}).
	WithSteps(
		Do(func(in int) error {
			total += in
			return nil
		}),
	).AsSlice()

fmt.Printf("res: %v\ntotal: %d", res, total)
Output:

res: [1 2 3 4 5]
total: 15

func Filter

func Filter[IN0 any](fn func(in IN0) (bool, error)) StepWrapper

Filter skips inputs that do not pass the filter

Example
res := Transform[int]([]int{1, 2, 3, 4, 5}).
	WithSteps(
		Filter(func(in int) (bool, error) {
			return in%2 == 1, nil
		}),
	).AsSlice()

fmt.Println(res)
Output:

[1 3 5]

func Log

func Log(prefix ...string) StepWrapper

Log logs debug informations between steps

Example
buf := bytes.NewBufferString("")

res := Transform[int]([]int{1, 2, 3}, WithLogWriter(buf)).
	WithSteps(
		Log("before filter"),
		Filter(func(i int) (bool, error) {
			return i%2 == 1, nil
		}),
		Log("after filter"),
	).AsSlice()

logs := strings.ReplaceAll(buf.String(), " \n", "\n")
fmt.Printf("res: %v\nlogs: %s", res, logs)
Output:

res: [1 3]
logs: before filter 	arg0: 1
after filter 	arg0: 1
before filter 	arg0: 2
before filter 	arg0: 3
after filter 	arg0: 3

func Map

func Map[IN0, OUT0 any](fn func(in IN0) (OUT0, error)) StepWrapper

Map transforms a single input into a single output

Example
res := Transform[int]([]int{104, 101, 108, 108, 111}).
	WithSteps(
		Map(func(in int) (string, error) {
			return string(rune(in)), nil
		}),
	).AsSlice()

fmt.Println(res)
Output:

[h e l l o]

func Merge

func Merge() StepWrapper

Merge merges back the transformation branches

Example
fmt.Println("see WithBranches")
Output:

see WithBranches

func Skip

func Skip[IN0 any](count uint64) StepWrapper

Skip is skipping the first N inputs

Example
res := Transform[int]([]int{1, 2, 3, 4, 5}).
	WithSteps(
		Skip[int](3),
	).AsSlice()

fmt.Println(res)
Output:

[4 5]

func SkipWhile

func SkipWhile[IN0 any](fn func(in IN0) (bool, error)) StepWrapper

SkipWhile skips processing inputs while the filter returns true

Example
res := Transform[int]([]int{1, 2, 3, 4, 5}).
	WithSteps(
		SkipWhile(func(in int) (bool, error) {
			return in <= 3, nil
		}),
	).AsSlice()

fmt.Println(res)
Output:

[4 5]

func Split

func Split[IN0 any, OUT0 ~uint8](fn func(in IN0) (OUT0, error)) StepWrapper

Split defines how to split inputs into transformation branches The function returns the number of the branch where the input will be sent

Example
fmt.Println("see WithBranches")
Output:

see WithBranches

func Take

func Take[IN0 any](count uint64) StepWrapper

Take is processing the first N inputs

Example
res := Transform[int]([]int{1, 2, 3, 4, 5}).
	WithSteps(
		Take[int](3),
	).AsSlice()

fmt.Println(res)
Output:

[1 2 3]

func TakeWhile

func TakeWhile[IN0 any](fn func(in IN0) (bool, error)) StepWrapper

TakeWhile processes inputs while the filter returns true

Example
res := Transform[int]([]int{1, 2, 3, 4, 5}).
	WithSteps(
		TakeWhile(func(in int) (bool, error) {
			return in <= 3, nil
		}),
	).AsSlice()

fmt.Println(res)
Output:

[1 2 3]

func WithBranches

func WithBranches[IN0 any](stepsBranches ...StepsBranch) StepWrapper

WithBranches applies a set of steps to each branch. This is not parallel processing. The items keeps the order even if one branch could possibly process faster.

Example
res := Transform[int]([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}).
	WithSteps(
		Split(func(in int) (uint8, error) {
			return uint8(in % 2), nil
		}),
		WithBranches[int](
			Steps(
				Map(func(in int) (string, error) {
					return strconv.Itoa(-in), nil
				}),
			),
			Steps(
				Map(func(in int) (string, error) {
					return strconv.Itoa(in * 10), nil
				}),
			),
		),
		Merge(),
	).
	AsSlice()

fmt.Printf("%#v", res)
Output:

[]interface {}{"10", "-2", "30", "-4", "50", "-6", "70", "-8", "90", "-10"}

type StepsBranch

type StepsBranch struct {
	Error             error           // error result of the sub-path
	StepWrappers      []StepWrapper   // the steps in the sub-path
	Steps             []StepFn        // the already validated steps
	AggregatorWrapper *ReducerWrapper // the aggregation step in the sub-path
	Aggregator        ReducerFn       // the already validated aggregator function
}

StepsBranch represents a sub-path of a branching transformation chain

func Aggregate

func Aggregate(fn ReducerWrapper) StepsBranch

Aggregate adds a reducer to the transformer

func Steps

func Steps(s ...StepWrapper) StepsBranch

Steps creates the steps of a transformation chain.

func (StepsBranch) Aggregate

func (s StepsBranch) Aggregate(fn ReducerWrapper) StepsBranch

Aggregate ads a reducer to the transformer with steps previously defined

func (*StepsBranch) Validate

func (s *StepsBranch) Validate() error

Validate is runs the validation for the steps in the transformation chain. One of the main purpose of the validator is to reduce the possible runtime errors caused by reflection usage. The validator tries to check that the output of the steps are matching the inputs of the next steps. It could be triggered explicitly to validate the chain before running it, but it will also run automatically (if not ran before) when the chain is processing it's first item.

type TransformerOptions

type TransformerOptions struct {
	Name         string
	LogWriter    io.Writer
	ErrorHandler func(error)
	PanicHandler func(error)
	Ctx          context.Context
	ChanSize     uint
}

TransformerOptions holds the options for the transformer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL