Documentation
¶
Index ¶
- func SetStreamChannelCacheSize(size int)
- type Count
- type Item
- type Stream
- func NewStreamBy[T any](source <-chan Item[T]) Stream[T]
- func NewStreamFrom[T any](generator func() Stream[T]) Stream[T]
- func NewStreamFromFile[T any](deal func(line string, err error) (Item[T], bool), filename string) (Stream[T], error)
- func NewStreamFromMap[T, R any](mapFunc func(item Item[T]) Item[R], streams ...Stream[T]) Stream[R]
- func NewStreamOf[T any](items ...Item[T]) Stream[T]
- func NewStreamWith[T any](streams ...Stream[T]) Stream[T]
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetStreamChannelCacheSize ¶
func SetStreamChannelCacheSize(size int)
SetStreamChannelCacheSize 设置数据流通道的缓存大小
Types ¶
type Item ¶
type Item[T any] interface { // WithValue 设置元素值 WithValue(value T) Item[T] // WithGroup 设置元素组 WithGroup(group string) Item[T] // WithIndex 设置元素索引 WithIndex(index uint64) Item[T] // GetValue 获取元素值 GetValue() T // GetGroup 获取元素组 GetGroup() string // GetIndex 获取元素索引 GetIndex() uint64 }
Item 定义数据流元素的基础接口
type Stream ¶
type Stream[T any] interface { // Map 处理(异步) Map(mapFunc func(item Item[T]) T) Stream[T] // Group 分组(异步) Group(groupFunc func(item Item[T]) string) Stream[T] // Distinct 去重(异步) Distinct(distinctFunc func(item Item[T]) any) Stream[T] // Filter 过滤(异步) Filter(filterFunc func(item Item[T]) bool) Stream[T] // Concat 聚合(异步) Concat(streams ...Stream[T]) Stream[T] // Count 统计(异步) Count(interval time.Duration, countChan chan<- *Count) Stream[T] // Head 收前 num 个元素(异步) Head(num int) Stream[T] // Tail 收后 num 个元素(异步) Tail(num int) Stream[T] // SampleByStepSize 根据步长采样(异步) SampleByStepSize(size int64) Stream[T] // SampleByTime 根据时间采样(异步) SampleByTime(interval time.Duration) Stream[T] // Consume 一次性消费(异步) Consume(consumeFn func(item Item[T])) // Drain 丢弃数据(同步) Drain(doBeforeFns ...func(item Item[T])) // Receive 读取元素(同步) Receive() (Item[T], bool) // Channel 获取数据流管道 Channel() <-chan Item[T] }
Stream 定义数据流的接口
func NewStreamBy ¶
NewStreamBy 通过指定通道创建一个新的数据流
func NewStreamFrom ¶
NewStreamFrom 通过生成式函数创建一个新的数据流
func NewStreamFromFile ¶
func NewStreamFromFile[T any](deal func(line string, err error) (Item[T], bool), filename string) (Stream[T], error)
NewStreamFromFile 将指定文件处理后,生成一个新的数据流
func NewStreamFromMap ¶
NewStreamFromMap 将指定数据流处理后,输出到一个新的数据流
func NewStreamWith ¶
NewStreamWith 通过聚合多个流创建一个新的数据流
Click to show internal directories.
Click to hide internal directories.