Documentation ¶
Overview ¶
Package functional provides highly performant functional primitives for Go. It supports streaming and parallel execution of stages of a processing pipeline.
Index ¶
- Variables
- func Reduce[T, A any](s *Stage[T], initial A, r ReduceFunc[T, A], opts ...StageOption) A
- func SliceFromIterator[T any](a []T, t T) ([]T, error)
- type ErrorContext
- type ErrorHandler
- type FilterFunc
- type Iterator
- type MapFunc
- type ReduceFunc
- type Size
- type Stage
- func Filter[T any](s *Stage[T], f FilterFunc[T], opts ...StageOption) *Stage[T]
- func Map[T, M any](s *Stage[T], m MapFunc[T, M], opts ...StageOption) *Stage[M]
- func NewChannelStage[T any](ch chan T, opts ...StageOption) *Stage[T]
- func NewScannerStage(s scanner.Scanner, opts ...StageOption) *Stage[string]
- func NewSliceStage[T any](s []T, opts ...StageOption) *Stage[T]
- func NewStage[T any](i Iterator[T], opts ...StageOption) *Stage[T]
- type StageOption
- func InheritOptions(inherit bool) StageOption
- func Parallelism(max uint) StageOption
- func PreserveOrder(preserve bool) StageOption
- func ProcessingType(t StageType) StageOption
- func SizeHint(hint uint) StageOption
- func WithContext(ctx context.Context) StageOption
- func WithErrorHandler(handler ErrorHandler) StageOption
- func WithTraceFunc(f TraceFunc) StageOption
- func WithTracing(enable bool) StageOption
- type StageType
- type TraceFunc
Constants ¶
This section is empty.
Variables ¶
var DefaultSizeHint uint = 100
DefaultSizeHint is used by batch processing functions for initial allocations when the underlying iterator cannot provide size information and a stage-specific size hint has not been provided.
var DefaultTracer = func(format string, v ...any) { fmt.Fprintf(os.Stderr, "<TRACE> "+format+"\n", v...) }
DefaultTracer is the global default trace function. It prints messages to stderr. DefaultTracer can be replaced by another tracing function to affect all stages.
Functions ¶
func Reduce ¶
func Reduce[T, A any](s *Stage[T], initial A, r ReduceFunc[T, A], opts ...StageOption) A
Reduce is the non-OO version of Stage.Reduce(). It must be used when the accumulator of the reduce function is of a different type from the input elements (due to limitations of Go generics).
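The limitation in question is that a Go method cannot declare type parameters of its own; it can only use those of its receiver. The following is a minimal sketch of that constraint, not the package's implementation: the stage type, reduceTo and reduce are hypothetical local stand-ins, and the reduce functions omit the error return for brevity.

```go
package main

import "fmt"

// stage is a hypothetical stand-in for a pipeline stage. It is not the
// package's Stage type.
type stage[T any] struct{ items []T }

// reduceTo folds elements of type T into an accumulator of a different type A.
// It has to be a package-level function because a method on stage[T] cannot
// introduce the additional type parameter A.
func reduceTo[T, A any](s *stage[T], initial A, r func(acc A, item T) A) A {
	acc := initial
	for _, item := range s.items {
		acc = r(acc, item)
	}
	return acc
}

// reduce is the method form. It only works when the accumulator has the same
// type T as the elements.
func (s *stage[T]) reduce(initial T, r func(acc, item T) T) T {
	return reduceTo(s, initial, r)
}

func main() {
	words := &stage[string]{items: []string{"map", "filter", "reduce"}}

	// Same type in and out: the method form is sufficient.
	joined := words.reduce("", func(acc, w string) string { return acc + w })

	// String elements with an int accumulator: only the function form fits.
	total := reduceTo(words, 0, func(acc int, w string) int { return acc + len(w) })

	fmt.Println(joined, total) // mapfilterreduce 15
}
```

The same reasoning applies to Map when the output element type differs from the input type.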
func SliceFromIterator ¶
func SliceFromIterator[T any](a []T, t T) ([]T, error)
SliceFromIterator is a convenience reduction function that returns a slice of the elements from the iterator of the pipeline stage.
Types ¶
type ErrorContext ¶
type ErrorContext int
ErrorContext provides error handler callbacks with a hint about where in processing the error occurred.
const (
	// ErrorContextIterator hints that the error occurred reading an iterator
	ErrorContextItertator ErrorContext = iota
	// ErrorContextFilterFunction means the error occurred in a filter func
	ErrorContextFilterFunction
	// ErrorContextMapFunction means the error occurred in a map func
	ErrorContextMapFunction
	// ErrorContextReduceFunction means the error occurred in a reduce func
	ErrorContextReduceFunction
	// We don't know in which phase of processing the error occurred when
	// the hint is ErrorContextOther
	ErrorContextOther
)
type ErrorHandler ¶
type ErrorHandler func(where ErrorContext, err error) bool
Functions complying with the ErrorHandler prototype can be used to process errors that occur during the pipeline processing functions. The default handler ignores the error. A custom handler can be provided using the WithErrorHandler option.
Parameters:
- where describes the context in which the error occurred
- err is the error to be handled
The function should return true if processing should continue regardless, or false to stop processing.
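As an illustration, a handler can record each error and stop the pipeline once a threshold is exceeded. This is a sketch only: ErrorContext and ErrorHandler are local copies of the declarations above so the example compiles on its own, while errorCollector, its limit field and errDemo are hypothetical names. The mutex assumes the handler may be called from several goroutines when a stage runs concurrently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local copies of the documented declarations, so the sketch is self-contained.
type ErrorContext int
type ErrorHandler func(where ErrorContext, err error) bool

// errDemo is a hypothetical error used only for this demonstration.
var errDemo = errors.New("demo failure")

// errorCollector is a hypothetical helper that stashes every error it sees and
// asks the pipeline to stop once more than limit errors have been recorded.
type errorCollector struct {
	mu    sync.Mutex // assumption: handlers may run concurrently
	limit int
	errs  []error
}

// handle has the ErrorHandler signature, so c.handle can be used as a handler.
func (c *errorCollector) handle(where ErrorContext, err error) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.errs = append(c.errs, fmt.Errorf("context %d: %w", where, err))
	return len(c.errs) <= c.limit // true: continue, false: stop
}

func main() {
	c := &errorCollector{limit: 1}
	var h ErrorHandler = c.handle

	fmt.Println(h(0, errDemo)) // true: one error is tolerated
	fmt.Println(h(0, errDemo)) // false: the limit has been exceeded
	fmt.Println(len(c.errs))   // 2
}
```

After the pipeline returns, the caller can inspect the collected errors to decide whether the result is usable.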
type FilterFunc ¶
FilterFunc is a generic function type that takes a single element and returns true if it is to be included or false if the element is to be excluded from the result set.
If an error is returned, it is passed to the stage's error handler function which may elect to continue or abort processing.
Example:
func findEvenInts(i int) (bool, error) { return i%2 == 0, nil }
type Iterator ¶
type Iterator[T any] interface {
	// Next traverses the iterator to the next element.
	// Returns true if the iterator advanced, or false if there are no more
	// elements or if an error occurred (see Error() below)
	Next(ctx context.Context) bool
	// Get returns the current value referred to by the iterator
	Get() T
	// Error returns a non-nil value if an error occurred processing Next()
	Error() error
}
Iterator is a generic interface for one-directional traversal through a collection or stream of items.
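A custom data source only needs these three methods. The sketch below shows one possible slice-backed implementation and the usual Next/Get/Error consumption loop. The Iterator interface is copied from the declaration above; sliceIter, collect and drainSlice are hypothetical names, and the package's own slice iterator may behave differently (for example in how it treats a cancelled context).

```go
package main

import (
	"context"
	"fmt"
)

// Iterator is copied from the documented declaration so the sketch compiles
// on its own.
type Iterator[T any] interface {
	Next(ctx context.Context) bool
	Get() T
	Error() error
}

// sliceIter is a hypothetical slice-backed iterator.
type sliceIter[T any] struct {
	items []T
	pos   int // number of elements visited so far
	err   error
}

// Next advances to the next element. It reports false when the slice is
// exhausted or when the context has been cancelled (recorded as the error).
func (it *sliceIter[T]) Next(ctx context.Context) bool {
	if err := ctx.Err(); err != nil {
		it.err = err
		return false
	}
	if it.pos >= len(it.items) {
		return false
	}
	it.pos++
	return true
}

// Get returns the current element. It must only be called after Next has
// returned true.
func (it *sliceIter[T]) Get() T { return it.items[it.pos-1] }

// Error returns the error that stopped iteration, if any.
func (it *sliceIter[T]) Error() error { return it.err }

// collect drains an iterator into a slice using the Next/Get/Error protocol.
func collect[T any](ctx context.Context, it Iterator[T]) ([]T, error) {
	var out []T
	for it.Next(ctx) {
		out = append(out, it.Get())
	}
	return out, it.Error()
}

// drainSlice wraps a slice in a sliceIter and collects it again.
func drainSlice[T any](items []T) ([]T, error) {
	var it Iterator[T] = &sliceIter[T]{items: items}
	return collect(context.Background(), it)
}

func main() {
	got, err := drainSlice([]int{1, 2, 3})
	fmt.Println(got, err) // [1 2 3] <nil>
}
```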
type MapFunc ¶
MapFunc is a generic function that takes a single element and returns a single transformed element.
Example:
func ipAddress(host string) ([]net.IP, error) { return net.LookupIP(host) }
type ReduceFunc ¶
ReduceFunc is a generic function that takes an element value of type T and an accumulator value of type A and returns a new accumulator value.
Example:
func add(a, i int) (int, error) { return a + i, nil }
type Size ¶
Size is an interface that can be implemented by an iterator that knows the number of elements in the collection when it is initialized.
type Stage ¶
type Stage[T any] struct { // contains filtered or unexported fields }
Stage represents one processing phase of a larger pipeline. The processing methods of a stage read input elements using the underlying Iterator and return a new Stage ready to read elements from the previous stage using a new iterator.
func Filter ¶
func Filter[T any](s *Stage[T], f FilterFunc[T], opts ...StageOption) *Stage[T]
Filter is the non-OO version of Stage.Filter().
func Map ¶
func Map[T, M any](s *Stage[T], m MapFunc[T, M], opts ...StageOption) *Stage[M]
Map is the non-OO version of Stage.Map(). It must be used when the map function returns items of a different type from the input elements, due to limitations of Go generics.
func NewChannelStage ¶
func NewChannelStage[T any](ch chan T, opts ...StageOption) *Stage[T]
NewChannelStage instantiates a pipeline stage using a channel iterator backed by the provided channel.
func NewScannerStage ¶
func NewScannerStage(s scanner.Scanner, opts ...StageOption) *Stage[string]
NewScannerStage instantiates a pipeline stage using a scanner iterator backed by the provided scanner.
func NewSliceStage ¶
func NewSliceStage[T any](s []T, opts ...StageOption) *Stage[T]
NewSliceStage instantiates a pipeline stage using a slice iterator backed by the provided slice.
func NewStage ¶
func NewStage[T any](i Iterator[T], opts ...StageOption) *Stage[T]
NewStage instantiates a pipeline stage from an Iterator and an optional set of processing options.
func (*Stage[T]) Filter ¶
func (s *Stage[T]) Filter(f FilterFunc[T], opts ...StageOption) *Stage[T]
Filter processes this stage's input elements by calling f for each element and returns a new stage that will process all the elements where f(e) is true.
If this stage is configured to process in batch, Filter returns after all the input elements have been processed; those elements are passed to the next stage as a slice.
If this stage is configured to stream, Filter returns immediately after launching a goroutine to process the elements in the background. The next stage reads from a channel to which the processing goroutine writes its results as they are produced.
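The difference between the two modes can be sketched with plain slices and channels. This is an illustration of the pattern described above, not the package's code: batchFilter and streamFilter are hypothetical functions, and the predicate omits the error return for brevity.

```go
package main

import "fmt"

// batchFilter is a hypothetical batch-mode filter: it returns only after every
// input element has been examined, handing the survivors on as a slice.
func batchFilter[T any](in []T, keep func(T) bool) []T {
	out := make([]T, 0, len(in))
	for _, v := range in {
		if keep(v) {
			out = append(out, v)
		}
	}
	return out
}

// streamFilter is a hypothetical streaming-mode filter: it returns immediately
// and a background goroutine writes surviving elements to the returned channel
// as they are processed.
func streamFilter[T any](in <-chan T, keep func(T) bool) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out) // signals the consumer that the stream has ended
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

func main() {
	isEven := func(i int) bool { return i%2 == 0 }

	fmt.Println(batchFilter([]int{1, 2, 3, 4}, isEven)) // [2 4]

	src := make(chan int)
	go func() {
		defer close(src)
		for i := 1; i <= 4; i++ {
			src <- i
		}
	}()
	var in <-chan int = src
	for v := range streamFilter(in, isEven) {
		fmt.Println(v) // 2, then 4
	}
}
```

In the streaming form the consumer can start work on the first surviving element while later elements are still being filtered.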
func (*Stage[T]) Iterator ¶
Iterator returns the underlying iterator for a stage. It is most useful as a mechanism for retrieving the result from the last stage of a pipeline by the caller of the pipeline.
func (*Stage[T]) Map ¶
func (s *Stage[T]) Map(m MapFunc[T, T], opts ...StageOption) *Stage[T]
Map processes the stage's input elements by calling m for each element, returning a new stage containing the same number of elements, mapped to new values of the same type.
If the map function returns values of a different type to the input values, the non-OO version of Map() must be used instead.
If the stage is configured to process in batch, Map returns after all the input elements have been processed; those elements are passed to the next stage as a slice.
If the stage is configured to stream, Map returns immediately after launching goroutines to process the elements in the background. The returned stage reads from a channel to which the processing goroutines write their results as they are produced.
func (*Stage[T]) Reduce ¶
func (s *Stage[T]) Reduce(initial T, r ReduceFunc[T, T], opts ...StageOption) T
Reduce processes the stage's input elements to a single element of the same type, by calling r for every element and passing an accumulator value that each invocation of r can update by returning a value.
If the Reduce function returns a value of a different type to the input values, the non-OO version of Reduce() must be used instead.
Reduce always runs sequentially in a batch mode.
type StageOption ¶
type StageOption func(g *stageOptions)
StageOptions provide a mechanism to customize how the processing functions of a stage operate.
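StageOption follows Go's functional-options pattern: each option is a function that mutates an unexported settings struct. The sketch below shows how such options are typically defined and applied. The fields of stageOptions and the names withParallelism, withPreserveOrder and applyOptions are assumptions made for illustration; the package's real struct is unexported and its contents are not documented here.

```go
package main

import "fmt"

// stageOptions mirrors the unexported settings struct. Its fields here are
// hypothetical.
type stageOptions struct {
	parallelism   uint
	preserveOrder bool
}

// StageOption is copied from the documented declaration.
type StageOption func(g *stageOptions)

// withParallelism is a hypothetical option constructor.
func withParallelism(max uint) StageOption {
	return func(g *stageOptions) { g.parallelism = max }
}

// withPreserveOrder is a hypothetical option constructor.
func withPreserveOrder(preserve bool) StageOption {
	return func(g *stageOptions) { g.preserveOrder = preserve }
}

// applyOptions starts from defaults (serial, order not maintained) and applies
// the options in order, so later options override earlier ones.
func applyOptions(opts ...StageOption) stageOptions {
	g := stageOptions{parallelism: 1}
	for _, opt := range opts {
		opt(&g)
	}
	return g
}

func main() {
	fmt.Printf("%+v\n", applyOptions())
	// {parallelism:1 preserveOrder:false}
	fmt.Printf("%+v\n", applyOptions(withParallelism(4), withPreserveOrder(true)))
	// {parallelism:4 preserveOrder:true}
}
```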
func InheritOptions ¶
func InheritOptions(inherit bool) StageOption
InheritOptions causes this stage's options to be inherited by the next stage. The next stage can override these inherited options. Further inheritance can be disabled by passing this option with a false value.
The default is no inheritance.
func Parallelism ¶
func Parallelism(max uint) StageOption
The Parallelism option defines the maximum concurrency of the stage.
If not specified, the default is to process elements serially.
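A concurrency cap of this kind is commonly implemented with a fixed pool of worker goroutines. The following sketch shows that technique under stated assumptions: processLimited is a hypothetical function, a max of zero is treated as serial, and results are returned in completion order rather than input order. It does not claim to reflect how the package schedules work internally.

```go
package main

import (
	"fmt"
	"sync"
)

// processLimited is a hypothetical helper that applies f to every element
// using at most max goroutines. Results arrive in completion order, so the
// output order is not guaranteed to match the input order.
func processLimited[T, M any](in []T, max uint, f func(T) M) []M {
	if max == 0 {
		max = 1 // assumption: treat zero as serial processing
	}
	jobs := make(chan T)
	results := make(chan M)

	// Start exactly max workers; this is what bounds the concurrency.
	var wg sync.WaitGroup
	for w := uint(0); w < max; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range jobs {
				results <- f(v)
			}
		}()
	}

	// Feed the workers, then close results once they have all finished.
	go func() {
		for _, v := range in {
			jobs <- v
		}
		close(jobs)
		wg.Wait()
		close(results)
	}()

	out := make([]M, 0, len(in))
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	squares := processLimited([]int{1, 2, 3, 4, 5}, 2, func(i int) int { return i * i })
	sum := 0
	for _, s := range squares {
		sum += s
	}
	fmt.Println(len(squares), sum) // 5 55
}
```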
func PreserveOrder ¶
func PreserveOrder(preserve bool) StageOption
PreserveOrder causes concurrent batch stages to retain the order of processed elements. This is always the case with serial stages and is not possible for concurrent streaming stages. Maintaining the order of elements for concurrent batch stages incurs a performance penalty.
The default is to not maintain order.
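One way a concurrent batch stage can keep its output in input order is to have each goroutine write its result into the slot matching the element's input index. This is a sketch of that general technique, with orderedMap as a hypothetical name; the package may use a different mechanism, and the cost it describes is not modelled here.

```go
package main

import (
	"fmt"
	"sync"
)

// orderedMap is a hypothetical helper that processes every element in its own
// goroutine yet returns results in input order, because result i is always
// stored at index i regardless of which goroutine finishes first.
func orderedMap[T, M any](in []T, f func(T) M) []M {
	out := make([]M, len(in))
	var wg sync.WaitGroup
	for i, v := range in {
		wg.Add(1)
		go func(i int, v T) {
			defer wg.Done()
			out[i] = f(v) // each goroutine owns exactly one slot, so no lock is needed
		}(i, v)
	}
	wg.Wait() // batch semantics: return only when every element is done
	return out
}

func main() {
	lengths := orderedMap([]string{"a", "bb", "ccc"}, func(s string) int { return len(s) })
	fmt.Println(lengths) // [1 2 3]
}
```

This approach needs the complete result set before anything is handed on, which is why it suits batch stages: a streaming stage emits each result as soon as it is ready.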
func ProcessingType ¶
func ProcessingType(t StageType) StageOption
The ProcessingType option configures whether the stage operates in batch or streaming mode. If not specified, stages default to processing in batch mode.
func SizeHint ¶
func SizeHint(hint uint) StageOption
The SizeHint option provides the stage processor functions with a guideline regarding the number of elements there are to process. This is primarily used with iterators that cannot provide the information themselves.
If not specified and the iterator cannot provide the information, the default value DefaultSizeHint is used.
func WithContext ¶
func WithContext(ctx context.Context) StageOption
WithContext attaches the provided context to the stage.
func WithErrorHandler ¶
func WithErrorHandler(handler ErrorHandler) StageOption
WithErrorHandler installs a custom error handler which will be called from the processing functions when the filter/map/reduce function or an iterator emits an error.
The handler should return true to continue processing or false to abort.
The handler can stash the error for use in the pipeline's caller.
func WithTraceFunc ¶
func WithTraceFunc(f TraceFunc) StageOption
WithTraceFunc sets the trace function for the stage. Use WithTracing to enable/disable tracing.
func WithTracing ¶
func WithTracing(enable bool) StageOption
WithTracing enables tracing for the stage. If a custom trace function has not been set using WithTraceFunc, trace messages are printed to stderr.
type StageType ¶
type StageType int
StageType describes the behaviour of a pipeline stage.
const (
	// Batch stages collect the results of processing all of the
	// input items before passing control to the next stage
	BatchStage StageType = iota
	// Streaming stages pass the results of processed input items to the
	// next pipeline stage as a stream while processing other elements continues.
	StreamingStage
)
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
examples | |
iter | |
channel | Package channel implements an iterator that reads a data stream from the supplied channel. |
scanner | Package scanner implements a stream tokenizer iterator. |
slice | Package slice implements an iterator that traverses uni-directionally over a generic slice of elements. |