Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BlockingQueue ¶
type BlockingQueue[T any] interface { // Enqueue 将元素放入队列。如果在 ctx 超时之前,队列有空闲位置,那么元素会被放入队列; // 否则返回 error。 // 在超时或者调用者主动 cancel 的情况下,所有的实现都必须返回 ctx。 // 调用者可以通过检查 error 是否为 context.DeadlineExceeded // 或者 context.Canceled 来判断入队失败的原因 // 注意,调用者必须使用 errors.Is 来判断,而不能直接使用 == Enqueue(ctx context.Context, t T) error // Dequeue 从队首获得一个元素 // 如果在 ctx 超时之前,队列中有元素,那么会返回队首的元素,否则返回 error。 // 在超时或者调用者主动 cancel 的情况下,所有的实现都必须返回 ctx。 // 调用者可以通过检查 error 是否为 context.DeadlineExceeded // 或者 context.Canceled 来判断入队失败的原因 // 注意,调用者必须使用 errors.Is 来判断,而不能直接使用 == Dequeue(ctx context.Context) (T, error) }
BlockingQueue 阻塞队列 参考 Queue 普通队列 一个阻塞队列是否遵循 FIFO 取决于具体实现
type ConcurrentArrayBlockingQueue ¶
type ConcurrentArrayBlockingQueue[T any] struct { // contains filtered or unexported fields }
ConcurrentArrayBlockingQueue 有界并发阻塞队列
func NewConcurrentArrayBlockingQueue ¶
func NewConcurrentArrayBlockingQueue[T any](capacity int) *ConcurrentArrayBlockingQueue[T]
NewConcurrentArrayBlockingQueue 创建一个有界阻塞队列 容量会在最开始的时候就初始化好 capacity 必须为正数
Example ¶
q := NewConcurrentArrayBlockingQueue[int](10) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() _ = q.Enqueue(ctx, 22) val, err := q.Dequeue(ctx) // 这是例子,实际中你不需要写得那么复杂 switch err { case context.Canceled: // 有人主动取消了,即调用了 cancel 方法。在这个例子里不会出现这个情况 case context.DeadlineExceeded: // 超时了 case nil: fmt.Println(val) default: // 其它乱七八糟的 }
Output: 22
func (*ConcurrentArrayBlockingQueue[T]) AsSlice ¶
func (c *ConcurrentArrayBlockingQueue[T]) AsSlice() []T
func (*ConcurrentArrayBlockingQueue[T]) Dequeue ¶
func (c *ConcurrentArrayBlockingQueue[T]) Dequeue(ctx context.Context) (T, error)
Dequeue 出队 通过sema来控制容量、超时、阻塞问题
func (*ConcurrentArrayBlockingQueue[T]) Enqueue ¶
func (c *ConcurrentArrayBlockingQueue[T]) Enqueue(ctx context.Context, t T) error
Enqueue 入队 通过sema来控制容量、超时、阻塞问题
func (*ConcurrentArrayBlockingQueue[T]) Len ¶
func (c *ConcurrentArrayBlockingQueue[T]) Len() int
type ConcurrentLinkedBlockingQueue ¶
type ConcurrentLinkedBlockingQueue[T any] struct { // contains filtered or unexported fields }
ConcurrentLinkedBlockingQueue 基于链表的并发阻塞队列 如果 maxSize 是正数。那么就是有界并发阻塞队列 如果不是,就是无界并发阻塞队列, 在这种情况下,入队永远能够成功
func NewConcurrentLinkedBlockingQueue ¶
func NewConcurrentLinkedBlockingQueue[T any](capacity int) *ConcurrentLinkedBlockingQueue[T]
NewConcurrentLinkedBlockingQueue 创建链式阻塞队列 capacity <= 0 时,为无界队列
Example ¶
// 创建一个容量为 10 的有界并发阻塞队列,如果传入 0 或者负数,那么创建的是无界并发阻塞队列 q := NewConcurrentLinkedBlockingQueue[int](10) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() _ = q.Enqueue(ctx, 22) val, err := q.Dequeue(ctx) // 这是例子,实际中你不需要写得那么复杂 switch err { case context.Canceled: // 有人主动取消了,即调用了 cancel 方法。在这个例子里不会出现这个情况 case context.DeadlineExceeded: // 超时了 case nil: fmt.Println(val) default: // 其它乱七八糟的 }
Output: 22
func (*ConcurrentLinkedBlockingQueue[T]) AsSlice ¶
func (c *ConcurrentLinkedBlockingQueue[T]) AsSlice() []T
func (*ConcurrentLinkedBlockingQueue[T]) Dequeue ¶
func (c *ConcurrentLinkedBlockingQueue[T]) Dequeue(ctx context.Context) (T, error)
Dequeue 出队 注意:目前我们已经通过broadcast实现了超时控制
func (*ConcurrentLinkedBlockingQueue[T]) Enqueue ¶
func (c *ConcurrentLinkedBlockingQueue[T]) Enqueue(ctx context.Context, t T) error
Enqueue 入队 注意:目前我们已经通过broadcast实现了超时控制
func (*ConcurrentLinkedBlockingQueue[T]) Len ¶
func (c *ConcurrentLinkedBlockingQueue[T]) Len() int
type ConcurrentLinkedQueue ¶
type ConcurrentLinkedQueue[T any] struct { // contains filtered or unexported fields }
ConcurrentLinkedQueue 无界并发安全队列
func NewConcurrentLinkedQueue ¶
func NewConcurrentLinkedQueue[T any]() *ConcurrentLinkedQueue[T]
Example ¶
q := NewConcurrentLinkedQueue[int]() _ = q.Enqueue(10) val, err := q.Dequeue() if err != nil { // 一般意味着队列为空 fmt.Println(err) } fmt.Println(val)
Output: 10
func (*ConcurrentLinkedQueue[T]) Dequeue ¶
func (c *ConcurrentLinkedQueue[T]) Dequeue() (T, error)
func (*ConcurrentLinkedQueue[T]) Enqueue ¶
func (c *ConcurrentLinkedQueue[T]) Enqueue(t T) error
type ConcurrentPriorityQueue ¶
type ConcurrentPriorityQueue[T any] struct { // contains filtered or unexported fields }
func NewConcurrentPriorityQueue ¶
func NewConcurrentPriorityQueue[T any](capacity int, compare ekit.Comparator[T]) *ConcurrentPriorityQueue[T]
NewConcurrentPriorityQueue 创建优先队列 capacity <= 0 时,为无界队列
Example ¶
q := NewConcurrentPriorityQueue[int](10, ekit.ComparatorRealNumber[int]) _ = q.Enqueue(3) _ = q.Enqueue(2) _ = q.Enqueue(1) var vals []int val, _ := q.Dequeue() vals = append(vals, val) val, _ = q.Dequeue() vals = append(vals, val) val, _ = q.Dequeue() vals = append(vals, val) fmt.Println(vals)
Output: [1 2 3]
func (*ConcurrentPriorityQueue[T]) Cap ¶
func (c *ConcurrentPriorityQueue[T]) Cap() int
Cap 无界队列返回0,有界队列返回创建队列时设置的值
func (*ConcurrentPriorityQueue[T]) Dequeue ¶
func (c *ConcurrentPriorityQueue[T]) Dequeue() (T, error)
func (*ConcurrentPriorityQueue[T]) Enqueue ¶
func (c *ConcurrentPriorityQueue[T]) Enqueue(t T) error
func (*ConcurrentPriorityQueue[T]) Len ¶
func (c *ConcurrentPriorityQueue[T]) Len() int
func (*ConcurrentPriorityQueue[T]) Peek ¶
func (c *ConcurrentPriorityQueue[T]) Peek() (T, error)
type DelayQueue ¶
type DelayQueue[T Delayable] struct { // contains filtered or unexported fields }
DelayQueue 延时队列 每次出队的元素必然都是已经到期的元素,即 Delay() 返回的值小于等于 0 延时队列本身对时间的精确度并不是很高,其时间精确度主要取决于 time.Timer 所以如果你需要极度精确的延时队列,那么这个结构并不太适合你。 但是如果你能够容忍至多在毫秒级的误差,那么这个结构还是可以使用的
func NewDelayQueue ¶
func NewDelayQueue[T Delayable](c int) *DelayQueue[T]
Example ¶
q := NewDelayQueue[delayElem](10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() now := time.Now() _ = q.Enqueue(ctx, delayElem{ // 3 秒后过期 deadline: now.Add(time.Second * 3), val: 3, }) _ = q.Enqueue(ctx, delayElem{ // 2 秒后过期 deadline: now.Add(time.Second * 2), val: 2, }) _ = q.Enqueue(ctx, delayElem{ // 1 秒后过期 deadline: now.Add(time.Second * 1), val: 1, }) var vals []int val, _ := q.Dequeue(ctx) vals = append(vals, val.val) val, _ = q.Dequeue(ctx) vals = append(vals, val.val) val, _ = q.Dequeue(ctx) vals = append(vals, val.val) fmt.Println(vals) duration := time.Since(now) if duration > time.Second*3 { fmt.Println("delay!") }
Output: [1 2 3] delay!