stream

package
v0.0.0-...-e5e3ad5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 25, 2025 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetStreamChannelCacheSize

func SetStreamChannelCacheSize(size int)

SetStreamChannelCacheSize 设置数据流通道的缓存大小

Types

type Count

type Count struct {
	Count int64     `json:"count" xml:"count"`
	Start time.Time `json:"start" xml:"start"`
	End   time.Time `json:"end" xml:"end"`
}

type Item

type Item[T any] interface {
	// WithValue 设置元素值
	WithValue(value T) Item[T]
	// WithGroup 设置元素组
	WithGroup(group string) Item[T]
	// WithIndex 设置元素索引
	WithIndex(index uint64) Item[T]

	// GetValue 获取元素值
	GetValue() T
	// GetGroup 获取元素组
	GetGroup() string
	// GetIndex 获取元素索引
	GetIndex() uint64
}

Item 定义数据流元素的基础接口

func NewItem

func NewItem[T any](value T) Item[T]

NewItem 新建一个基础数据流元素

type Stream

type Stream[T any] interface {
	// Map 处理(异步)
	Map(mapFunc func(item Item[T]) T) Stream[T]
	// Group 分组(异步)
	Group(groupFunc func(item Item[T]) string) Stream[T]
	// Distinct 去重(异步)
	Distinct(distinctFunc func(item Item[T]) any) Stream[T]
	// Filter 过滤(异步)
	Filter(filterFunc func(item Item[T]) bool) Stream[T]
	// Concat 聚合(异步)
	Concat(streams ...Stream[T]) Stream[T]
	// Count 统计(异步)
	Count(interval time.Duration, countChan chan<- *Count) Stream[T]
	// Head 收前 num 个元素(异步)
	Head(num int) Stream[T]
	// Tail 收后 num 个元素(异步)
	Tail(num int) Stream[T]
	// SampleByStepSize 根据步长采样(异步)
	SampleByStepSize(size int64) Stream[T]
	// SampleByTime 根据时间采样(异步)
	SampleByTime(interval time.Duration) Stream[T]

	// Consume 一次性消费(异步)
	Consume(consumeFn func(item Item[T]))
	// Drain 丢弃数据(同步)
	Drain(doBeforeFns ...func(item Item[T]))
	// Receive 读取元素(同步)
	Receive() (Item[T], bool)
	// Channel 获取数据流管道
	Channel() <-chan Item[T]
}

Stream 定义数据流的接口

func NewStreamBy

func NewStreamBy[T any](source <-chan Item[T]) Stream[T]

NewStreamBy 通过指定通道创建一个新的数据流

func NewStreamFrom

func NewStreamFrom[T any](generator func() Stream[T]) Stream[T]

NewStreamFrom 通过生成式函数创建一个新的数据流

func NewStreamFromFile

func NewStreamFromFile[T any](deal func(line string, err error) (Item[T], bool), filename string) (Stream[T], error)

NewStreamFromFile 将指定文件处理后,生成一个新的数据流

func NewStreamFromMap

func NewStreamFromMap[T, R any](mapFunc func(item Item[T]) Item[R], streams ...Stream[T]) Stream[R]

NewStreamFromMap 将指定数据流处理后,输出到一个新的数据流

func NewStreamOf

func NewStreamOf[T any](items ...Item[T]) Stream[T]

NewStreamOf 通过指定数据创建一个新的数据流

func NewStreamWith

func NewStreamWith[T any](streams ...Stream[T]) Stream[T]

NewStreamWith 通过聚合多个流创建一个新的数据流

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL