Documentation
¶
Overview ¶
Package steps is a collection of functions that can be used to create and validate transformation steps, transformator options, aggregators, input and output adapters and more.
Example (StepsTransformer_AsCsv) ¶
type person struct { ID int `csv:"id"` Name string `csv:"name"` } res := Transform[person]([]person{ {ID: 1, Name: "John Doe"}, {ID: 2, Name: "Jane Doe"}, }). WithSteps(). AsCsv() fmt.Println(res)
Output: id,name 1,John Doe 2,Jane Doe
Example (StepsTransformer_AsIndexedRange) ¶
fmt.Println("see AsKeyValueRange")
Output: see AsKeyValueRange
Example (StepsTransformer_AsJson) ¶
type person struct { ID int `csv:"id"` Name string `csv:"name"` } res := Transform[person]([]person{ {ID: 1, Name: "John Doe"}, {ID: 2, Name: "Jane Doe"}, }). WithSteps(). AsJson() fmt.Println(res)
Output: [{"ID":1,"Name":"John Doe"},{"ID":2,"Name":"Jane Doe"}]
Example (StepsTransformer_AsKeyValueRange) ¶
res := "" for k, v := range Transform[string]([]string{"h", "e", "l", "l", "o"}). WithSteps(). AsKeyValueRange() { res += fmt.Sprintf("%d:%s ", k, v) } fmt.Println(res)
Output: 0:h 1:e 2:l 3:l 4:o
Example (StepsTransformer_AsMap) ¶
res := Transform[string]([]string{"h", "e", "l", "l", "o"}). WithSteps(). AsMap() fmt.Println(res)
Output: map[0:h 1:e 2:l 3:l 4:o]
Example (StepsTransformer_AsMultiMap) ¶
fmt.Println("see GroupBy")
Output: see GroupBy
Example (StepsTransformer_AsRange) ¶
res := []any{} for i := range Transform[int]([]int{1, 2, 3, 4, 5}). WithSteps(). AsRange() { res = append(res, i) } fmt.Println(res)
Output: [1 2 3 4 5]
Example (StepsTransformer_AsSlice) ¶
res := Transform[int]([]int{1, 2, 3, 4, 5}). WithSteps(). AsSlice() fmt.Println(res)
Output: [1 2 3 4 5]
Example (StepsTransformer_ToStreamingCsv) ¶
type person struct { ID int `csv:"id"` Name string `csv:"name"` } var buf bytes.Buffer Transform[person]([]person{ {ID: 1, Name: "John Doe"}, {ID: 2, Name: "Jane Doe"}, }). WithSteps(). ToStreamingCsv(&buf) fmt.Println(buf.String())
Output: id,name 1,John Doe 2,Jane Doe
Example (StepsTransformer_ToStreamingJson) ¶
type person struct { ID int `csv:"id"` Name string `csv:"name"` } var buf bytes.Buffer Transform[person]([]person{ {ID: 1, Name: "John Doe"}, {ID: 2, Name: "Jane Doe"}, }). WithSteps(). ToStreamingJson(&buf) fmt.Println(buf.String())
Output: {"ID":1,"Name":"John Doe"} {"ID":2,"Name":"Jane Doe"}
Index ¶
- Variables
- func File(filePath string) io.Reader
- func FromCsv[T any](reader io.Reader) func(TransformerOptions) []T
- func FromJson[T any](reader io.Reader) func(TransformerOptions) []T
- func FromStreamingCsv[T any](reader io.Reader, withoutHeaders bool) func(TransformerOptions) chan T
- func FromStreamingJson[T any](reader io.Reader) func(TransformerOptions) chan T
- func Transform[T any, IT inputType[T]](in IT, options ...func(*TransformerOptions)) input[T, IT]
- func TransformFn[T any, IT inputType[T]](in func(TransformerOptions) IT, options ...func(*TransformerOptions)) input[T, IT]
- func WithChanSize(size uint) func(*TransformerOptions)
- func WithContext(ctx context.Context) func(*TransformerOptions)
- func WithErrorHandler(handler func(error)) func(*TransformerOptions)
- func WithLogWriter(writer io.Writer) func(*TransformerOptions)
- func WithName(name string) func(*TransformerOptions)
- func WithPanicHandler(handler func(error)) func(*TransformerOptions)
- type ArgTypes
- type Args
- type ReducerFn
- type ReducerWrapper
- func Avg() ReducerWrapper
- func Fold[IN0 any](initValue IN0, reduceFn func(in1, in2 IN0) (IN0, error)) ReducerWrapper
- func GroupBy[IN0 any, OUT0 comparable, OUT1 any](fn func(in IN0) (OUT0, OUT1, error)) ReducerWrapper
- func Max[IN0 number]() ReducerWrapper
- func Min[IN0 number]() ReducerWrapper
- func Reduce[IN0 any](fn func(in1, in2 IN0) (IN0, error)) ReducerWrapper
- func Sum[IN0 number]() ReducerWrapper
- type SkipFirstArgValidation
- type StepFn
- type StepInput
- type StepOutput
- type StepWrapper
- func Do[IN0 any](fn func(in IN0) error) StepWrapper
- func Filter[IN0 any](fn func(in IN0) (bool, error)) StepWrapper
- func Log(prefix ...string) StepWrapper
- func Map[IN0, OUT0 any](fn func(in IN0) (OUT0, error)) StepWrapper
- func Merge() StepWrapper
- func Skip[IN0 any](count uint64) StepWrapper
- func SkipWhile[IN0 any](fn func(in IN0) (bool, error)) StepWrapper
- func Split[IN0 any, OUT0 ~uint8](fn func(in IN0) (OUT0, error)) StepWrapper
- func Take[IN0 any](count uint64) StepWrapper
- func TakeWhile[IN0 any](fn func(in IN0) (bool, error)) StepWrapper
- func WithBranches[IN0 any](stepsBranches ...StepsBranch) StepWrapper
- type StepsBranch
- type TransformerOptions
Examples ¶
- Package (StepsTransformer_AsCsv)
- Package (StepsTransformer_AsIndexedRange)
- Package (StepsTransformer_AsJson)
- Package (StepsTransformer_AsKeyValueRange)
- Package (StepsTransformer_AsMap)
- Package (StepsTransformer_AsMultiMap)
- Package (StepsTransformer_AsRange)
- Package (StepsTransformer_AsSlice)
- Package (StepsTransformer_ToStreamingCsv)
- Package (StepsTransformer_ToStreamingJson)
- Avg
- Do
- Filter
- Fold
- FromCsv
- FromJson
- FromStreamingCsv
- FromStreamingJson
- GroupBy
- Log
- Map
- Max
- Merge
- Min
- Reduce
- Skip
- SkipWhile
- Split
- Sum
- Take
- TakeWhile
- WithBranches
Constants ¶
This section is empty.
Variables ¶
var ( ErrStepValidationFailed = errors.New("step validation failed") // step validation returned an error ErrIncompatibleInArgType = errors.New("incompatible input argument type") // the outputs of the previous step don't match the inputs of the current step ErrInvalidAggregator = errors.New("invalid aggregator") // aggregator has no reducer or name defined ErrInvalidStep = errors.New("invalid step") // step has no step or name defined )
Functions ¶
func FromCsv ¶
func FromCsv[T any](reader io.Reader) func(TransformerOptions) []T
FromCsv translates a CSV into a slice input
Example ¶
type person struct { ID int `csv:"id"` Name string `csv:"name"` } reader := strings.NewReader(` id,name 1,John Doe 2,Jane Doe`) res := TransformFn[person](FromCsv[person](reader)). WithSteps( Map(func(in person) (string, error) { return in.Name, nil }), ). AsSlice() fmt.Println(res)
Output: [John Doe Jane Doe]
func FromJson ¶
func FromJson[T any](reader io.Reader) func(TransformerOptions) []T
FromJson translates a JSON into a slice input
Example ¶
type person struct { ID int `csv:"id"` Name string `csv:"name"` } reader := strings.NewReader(`[ {"id":1,"name":"John Doe"}, {"id":2,"name":"Jane Doe"} ]`) res := TransformFn[person](FromJson[person](reader)). WithSteps( Map(func(in person) (string, error) { return in.Name, nil }), ). AsSlice() fmt.Println(res)
Output: [John Doe Jane Doe]
func FromStreamingCsv ¶
func FromStreamingCsv[T any](reader io.Reader, withoutHeaders bool) func(TransformerOptions) chan T
FromStreamingCsv translates a CSV into a channel input
Example ¶
type person struct { ID int `csv:"id"` Name string `csv:"name"` } reader := strings.NewReader(`1,John Doe 2,Jane Doe`) res := TransformFn[person](FromStreamingCsv[person](reader, true)). WithSteps( Map(func(in person) (string, error) { return in.Name, nil }), ). AsSlice() fmt.Println(res)
Output: [John Doe Jane Doe]
func FromStreamingJson ¶
func FromStreamingJson[T any](reader io.Reader) func(TransformerOptions) chan T
FromStreamingJson translates a JSON into a channel input
Example ¶
type person struct { ID int `csv:"id"` Name string `csv:"name"` } reader := strings.NewReader(` {"id":1,"name":"John Doe"} {"id":2,"name":"Jane Doe"}`) res := TransformFn[person](FromStreamingJson[person](reader)). WithSteps( Map(func(in person) (string, error) { return in.Name, nil }), ). AsSlice() fmt.Println(res)
Output: [John Doe Jane Doe]
func Transform ¶
func Transform[T any, IT inputType[T]](in IT, options ...func(*TransformerOptions)) input[T, IT]
Transform creates a builder for the transforation chain. It takes the input data (slice or chan) and the transformer options.
func TransformFn ¶
func TransformFn[T any, IT inputType[T]](in func(TransformerOptions) IT, options ...func(*TransformerOptions)) input[T, IT]
TransformFn is an alternative for Transform where the input is a function. This could be used to implement new input sources like files or database connections.
func WithChanSize ¶
func WithChanSize(size uint) func(*TransformerOptions)
WithChanSize sets the channel size used by streaming inputs
func WithContext ¶
func WithContext(ctx context.Context) func(*TransformerOptions)
WithContext sets the context of the transformer (used to cancel operations)
func WithErrorHandler ¶
func WithErrorHandler(handler func(error)) func(*TransformerOptions)
WithErrorHandler sets the error handler or the transformer
func WithLogWriter ¶
func WithLogWriter(writer io.Writer) func(*TransformerOptions)
WithLogWriter sets the log writer
func WithName ¶
func WithName(name string) func(*TransformerOptions)
WithName adds a name to the transformer
func WithPanicHandler ¶
func WithPanicHandler(handler func(error)) func(*TransformerOptions)
WithPanicHandler sets the panic handler used by the transformer input
Types ¶
type ArgTypes ¶
ArgTypes are holding the reflect.Type of arguments. They are mostly used for the validation process.
type Args ¶
type Args [maxArgs]any
Args are the array of input or output arguments. Most of the time only the first argument is used but others could be used for multi-input or output steps.
type ReducerFn ¶
type ReducerFn StepFn
ReducerFn is an alias for StepFn used as a last step transformation (aggregator)
type ReducerWrapper ¶
type ReducerWrapper struct { Name string // name of the step ReducerFn ReducerFn // the aggregation step function Validate func(prevStepArgTypes ArgTypes) (ArgTypes, error) // validation of the aggregation step in the chain Reset func() // reset the aggregation state before processing }
ReducerWrapper is a container for an aggregation step
func Avg ¶
func Avg() ReducerWrapper
Avg returns the average of all float64 inputs
Example ¶
res := Transform[float64]([]float64{-1.1, -2.1, -3.1, -4.1, -5.1}). With(Aggregate( Avg(), )). AsSlice() fmt.Println(res)
Output: [-3.1]
func Fold ¶
func Fold[IN0 any](initValue IN0, reduceFn func(in1, in2 IN0) (IN0, error)) ReducerWrapper
Fold reduces a series of inputs into a single value using a custom initial value.
Example ¶
res := Transform[int]([]int{1, 2, 3, 4, 5}). With(Aggregate( Fold(10, func(in1, in2 int) (int, error) { return in1 + in2, nil }), )). AsSlice() fmt.Println(res)
Output: [25]
func GroupBy ¶
func GroupBy[IN0 any, OUT0 comparable, OUT1 any](fn func(in IN0) (OUT0, OUT1, error)) ReducerWrapper
GroupBy is an aggregator grouping inputs by comparable values. The grouped values are in a slice.
Example ¶
res := Transform[int]([]int{1, 2, 3, 4, 5}). With(Aggregate( GroupBy(func(in int) (uint8, int, error) { return uint8(in % 2), in, nil }), )). AsMultiMap() fmt.Println(res)
Output: map[0:[2 4] 1:[1 3 5]]
func Max ¶
func Max[IN0 number]() ReducerWrapper
Max returns the largest number input
Example ¶
res := Transform[float64]([]float64{-1.1, -3.3, -2.2, -5.5, -4.4}). With(Aggregate( Max[float64](), )). AsSlice() fmt.Println(res)
Output: [-1.1]
func Min ¶
func Min[IN0 number]() ReducerWrapper
Min returns the smallest number input
Example ¶
res := Transform[float64]([]float64{-1.1, -3.3, -2.2, -5.5, -4.4}). With(Aggregate( Min[float64](), )). AsSlice() fmt.Println(res)
Output: [-5.5]
func Reduce ¶
func Reduce[IN0 any](fn func(in1, in2 IN0) (IN0, error)) ReducerWrapper
Reduce reduces a series of inputs into a single value using a zero value as initial value.
Example ¶
res := Transform[int]([]int{1, 2, 3, 4, 5}). With(Aggregate( Reduce(func(in1, in2 int) (int, error) { return in1 + in2, nil }), )). AsSlice() fmt.Println(res)
Output: [15]
func Sum ¶
func Sum[IN0 number]() ReducerWrapper
Sum returns the total of all number inputs
Example ¶
res := Transform[int]([]int{1, 2, 3, 4, 5}). With(Aggregate( Sum[int](), )). AsSlice() fmt.Println(res)
Output: [15]
type SkipFirstArgValidation ¶
type SkipFirstArgValidation struct{}
SkipFirstArgValidation is used to tell the validator this step is the first in the chain, so it can't be validated against the previous step.
type StepFn ¶
type StepFn func(StepInput) StepOutput
StepFn defines a function used as a single step in a transformation chain
type StepInput ¶
type StepInput struct { Args Args // input arguments ArgsLen uint8 // length of input arguments (Args can hold zero values, so we need to know it's length) TransformerOptions TransformerOptions // transformer options to pass around in the chain }
StepInput holds the input arguments for a single step
type StepOutput ¶
type StepOutput struct { Error error // error result of the step Args Args // output arguments ArgsLen uint8 // length of output arguments (Args can hold zero values, so we need to know it's length) Skip bool // used to notify the processor that further transformation steps should be skipped }
StepOutput holds the output arguments for a single step
func (*StepOutput) Reset ¶ added in v0.1.1
func (so *StepOutput) Reset()
type StepWrapper ¶
type StepWrapper struct { Name string // name of the step StepFn StepFn // the transformation step function Validate func(prevStepArgTypes ArgTypes) (ArgTypes, error) // validation of the current step in the chain Reset func() // reset the step state before processing }
StepWrapper is a container for a single transformation step
func Do ¶
func Do[IN0 any](fn func(in IN0) error) StepWrapper
Do runs a function on each input item
Example ¶
total := 0 res := Transform[int]([]int{1, 2, 3, 4, 5}). WithSteps( Do(func(in int) error { total += in return nil }), ).AsSlice() fmt.Printf("res: %v\ntotal: %d", res, total)
Output: res: [1 2 3 4 5] total: 15
func Filter ¶
func Filter[IN0 any](fn func(in IN0) (bool, error)) StepWrapper
Filter skips inputs that do not pass the filter
Example ¶
res := Transform[int]([]int{1, 2, 3, 4, 5}). WithSteps( Filter(func(in int) (bool, error) { return in%2 == 1, nil }), ).AsSlice() fmt.Println(res)
Output: [1 3 5]
func Log ¶
func Log(prefix ...string) StepWrapper
Log logs debug informations between steps
Example ¶
buf := bytes.NewBufferString("") res := Transform[int]([]int{1, 2, 3}, WithLogWriter(buf)). WithSteps( Log("before filter"), Filter(func(i int) (bool, error) { return i%2 == 1, nil }), Log("after filter"), ).AsSlice() logs := strings.ReplaceAll(buf.String(), " \n", "\n") fmt.Printf("res: %v\nlogs: %s", res, logs)
Output: res: [1 3] logs: before filter arg0: 1 after filter arg0: 1 before filter arg0: 2 before filter arg0: 3 after filter arg0: 3
func Map ¶
func Map[IN0, OUT0 any](fn func(in IN0) (OUT0, error)) StepWrapper
Map transforms a single input into a single output
Example ¶
res := Transform[int]([]int{104, 101, 108, 108, 111}). WithSteps( Map(func(in int) (string, error) { return string(rune(in)), nil }), ).AsSlice() fmt.Println(res)
Output: [h e l l o]
func Merge ¶
func Merge() StepWrapper
Merge merges back the transformation branches
Example ¶
fmt.Println("see WithBranches")
Output: see WithBranches
func Skip ¶
func Skip[IN0 any](count uint64) StepWrapper
Skip is skipping the first N inputs
Example ¶
res := Transform[int]([]int{1, 2, 3, 4, 5}). WithSteps( Skip[int](3), ).AsSlice() fmt.Println(res)
Output: [4 5]
func SkipWhile ¶
func SkipWhile[IN0 any](fn func(in IN0) (bool, error)) StepWrapper
SkipWhile skips processing inputs while the filter returns true
Example ¶
res := Transform[int]([]int{1, 2, 3, 4, 5}). WithSteps( SkipWhile(func(in int) (bool, error) { return in <= 3, nil }), ).AsSlice() fmt.Println(res)
Output: [4 5]
func Split ¶
func Split[IN0 any, OUT0 ~uint8](fn func(in IN0) (OUT0, error)) StepWrapper
Split defines how to split inputs into transformation branches The function returns the number of the branch where the input will be sent
Example ¶
fmt.Println("see WithBranches")
Output: see WithBranches
func Take ¶
func Take[IN0 any](count uint64) StepWrapper
Take is processing the first N inputs
Example ¶
res := Transform[int]([]int{1, 2, 3, 4, 5}). WithSteps( Take[int](3), ).AsSlice() fmt.Println(res)
Output: [1 2 3]
func TakeWhile ¶
func TakeWhile[IN0 any](fn func(in IN0) (bool, error)) StepWrapper
TakeWhile processes inputs while the filter returns true
Example ¶
res := Transform[int]([]int{1, 2, 3, 4, 5}). WithSteps( TakeWhile(func(in int) (bool, error) { return in <= 3, nil }), ).AsSlice() fmt.Println(res)
Output: [1 2 3]
func WithBranches ¶
func WithBranches[IN0 any](stepsBranches ...StepsBranch) StepWrapper
WithBranches applies a set of steps to each branch. This is not parallel processing. The items keeps the order even if one branch could possibly process faster.
Example ¶
res := Transform[int]([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). WithSteps( Split(func(in int) (uint8, error) { return uint8(in % 2), nil }), WithBranches[int]( Steps( Map(func(in int) (string, error) { return strconv.Itoa(-in), nil }), ), Steps( Map(func(in int) (string, error) { return strconv.Itoa(in * 10), nil }), ), ), Merge(), ). AsSlice() fmt.Printf("%#v", res)
Output: []interface {}{"10", "-2", "30", "-4", "50", "-6", "70", "-8", "90", "-10"}
type StepsBranch ¶
type StepsBranch struct { Error error // error result of the sub-path StepWrappers []StepWrapper // the steps in the sub-path Steps []StepFn // the already validated steps AggregatorWrapper *ReducerWrapper // the aggregation step in the sub-path Aggregator ReducerFn // the already validated aggregator function }
StepsBranch represents a sub-path of a branching transformation chain
func Aggregate ¶
func Aggregate(fn ReducerWrapper) StepsBranch
Aggregate adds a reducer to the transformer
func Steps ¶
func Steps(s ...StepWrapper) StepsBranch
Steps creates the steps of a transformation chain.
func (StepsBranch) Aggregate ¶
func (s StepsBranch) Aggregate(fn ReducerWrapper) StepsBranch
Aggregate ads a reducer to the transformer with steps previously defined
func (*StepsBranch) Validate ¶
func (s *StepsBranch) Validate() error
Validate is runs the validation for the steps in the transformation chain. One of the main purpose of the validator is to reduce the possible runtime errors caused by reflection usage. The validator tries to check that the output of the steps are matching the inputs of the next steps. It could be triggered explicitly to validate the chain before running it, but it will also run automatically (if not ran before) when the chain is processing it's first item.