limits

package
v0.9.5
Published: Sep 19, 2025 License: MIT Imports: 21 Imported by: 12

Documentation

Overview

Package limits helps enforce request-scoped, multi-tenant limits with three kinds of Limiter.

Every limit requires a default value. Additional features such as OpenTelemetry metrics and dynamic updates are available by using the settings.Setting variants.

Limiter errors map to the gRPC status codes codes.ResourceExhausted and codes.DeadlineExceeded.

Constants

This section is empty.

Variables

var ErrQueueEmpty = fmt.Errorf("queue is empty")

Functions

This section is empty.

Types

type BoundLimiter added in v0.9.1

type BoundLimiter[N Number] interface {
	Limiter[N]
	// Check returns ErrorBoundLimited if the value is above the limit.
	Check(context.Context, N) error
}

BoundLimiter is a limiter for simple bounds checks.

func MakeBoundLimiter added in v0.9.1

func MakeBoundLimiter[N Number](f Factory, bound settings.Setting[N]) (BoundLimiter[N], error)

MakeBoundLimiter returns a BoundLimiter for the given bound, configured by the Factory. If Meter is set, the following metrics will be emitted:

  • bound.*.limit - gauge
  • bound.*.usage - histogram
  • bound.*.denied - histogram

func NewBoundLimiter added in v0.9.1

func NewBoundLimiter[N Number](bound N) BoundLimiter[N]

NewBoundLimiter returns a BoundLimiter with the given bound.
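
To illustrate the documented contract of Check (an error when the value is above the limit), here is a minimal sketch that uses only the standard library. The names number, boundLimited, and checkBound are hypothetical stand-ins for this page, not part of the package, and the error text is invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// number is a hypothetical, narrower stand-in for the package's Number constraint.
type number interface {
	~int | ~int64 | ~float64
}

// boundLimited is a hypothetical stand-in that copies the Limit and Amount
// fields documented for ErrorBoundLimited.
type boundLimited[N number] struct {
	Limit, Amount N
}

func (e boundLimited[N]) Error() string {
	return fmt.Sprintf("bound limited: %v exceeds limit %v", e.Amount, e.Limit)
}

// checkBound rejects amounts strictly above the limit, as Check is documented to do.
func checkBound[N number](limit, amount N) error {
	if amount > limit {
		return boundLimited[N]{Limit: limit, Amount: amount}
	}
	return nil
}

func main() {
	fmt.Println(checkBound(10, 10)) // a value at the limit passes

	err := checkBound(10, 11)
	var bl boundLimited[int]
	if errors.As(err, &bl) {
		fmt.Println("denied:", bl.Amount, "over", bl.Limit)
	}
}
```

A real BoundLimiter also resolves the limit from the request context and must be closed; the sketch leaves both out.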

type ErrorBoundLimited added in v0.9.1

type ErrorBoundLimited[N Number] struct {
	Key string

	Scope  settings.Scope
	Tenant string

	Limit, Amount N
}

func (ErrorBoundLimited[N]) Error added in v0.9.1

func (e ErrorBoundLimited[N]) Error() string

func (ErrorBoundLimited[N]) GRPCStatus added in v0.9.1

func (e ErrorBoundLimited[N]) GRPCStatus() *status.Status

func (ErrorBoundLimited[N]) Is added in v0.9.1

func (e ErrorBoundLimited[N]) Is(target error) bool

type ErrorQueueFull added in v0.9.1

type ErrorQueueFull struct {
	Key string

	Scope  settings.Scope
	Tenant string

	Limit int
}

func (ErrorQueueFull) Error added in v0.9.1

func (e ErrorQueueFull) Error() string

func (ErrorQueueFull) GRPCStatus added in v0.9.1

func (e ErrorQueueFull) GRPCStatus() *status.Status

func (ErrorQueueFull) Is added in v0.9.1

func (e ErrorQueueFull) Is(target error) bool

type ErrorRateLimited

type ErrorRateLimited struct {
	Key string

	Scope  settings.Scope
	Tenant string

	N int

	Err error
}

func (ErrorRateLimited) Error

func (e ErrorRateLimited) Error() string

func (ErrorRateLimited) GRPCStatus

func (e ErrorRateLimited) GRPCStatus() *status.Status

func (ErrorRateLimited) Is

func (e ErrorRateLimited) Is(target error) bool

func (ErrorRateLimited) Unwrap

func (e ErrorRateLimited) Unwrap() error
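
Because ErrorRateLimited carries an Err field and an Unwrap method, callers can presumably inspect it with the standard errors helpers. The sketch below assumes a local stand-in type, rateLimited, plus hypothetical helpers sample and describe. It does not reproduce the package's Is or GRPCStatus behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// rateLimited is a hypothetical stand-in that copies the Key, N, and Err
// fields and the Unwrap method documented for ErrorRateLimited.
type rateLimited struct {
	Key string
	N   int
	Err error
}

func (e rateLimited) Error() string { return fmt.Sprintf("rate limited: %s", e.Key) }

func (e rateLimited) Unwrap() error { return e.Err }

// sample wraps a rateLimited error the way a request handler might.
func sample(cause error) error {
	return fmt.Errorf("handling request: %w", rateLimited{Key: "example", N: 1, Err: cause})
}

// describe inspects an error chain with errors.As and errors.Is.
func describe(err error) string {
	var rl rateLimited
	switch {
	case errors.As(err, &rl) && errors.Is(err, context.DeadlineExceeded):
		return "rate limited (deadline): " + rl.Key
	case errors.As(err, &rl):
		return "rate limited: " + rl.Key
	default:
		return "other"
	}
}

func main() {
	fmt.Println(describe(sample(context.DeadlineExceeded)))
	fmt.Println(describe(sample(nil)))
	fmt.Println(describe(errors.New("boom")))
}
```

errors.As finds the typed error through the fmt.Errorf wrapper, and errors.Is reaches the underlying cause through Unwrap.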

type ErrorResourceLimited

type ErrorResourceLimited[N Number] struct {
	Key string

	Scope  settings.Scope
	Tenant string

	Used, Limit, Amount N
}

func (ErrorResourceLimited[N]) Error

func (e ErrorResourceLimited[N]) Error() string

func (ErrorResourceLimited[N]) GRPCStatus

func (e ErrorResourceLimited[N]) GRPCStatus() *status.Status

func (ErrorResourceLimited[N]) Is

func (e ErrorResourceLimited[N]) Is(target error) bool

type ErrorTimeLimited

type ErrorTimeLimited struct {
	Key string

	Scope  settings.Scope
	Tenant string

	Timeout time.Duration
}

func (ErrorTimeLimited) Error

func (e ErrorTimeLimited) Error() string

func (ErrorTimeLimited) GRPCStatus

func (e ErrorTimeLimited) GRPCStatus() *status.Status

func (ErrorTimeLimited) Is

func (e ErrorTimeLimited) Is(target error) bool

type Factory

type Factory struct {
	// Settings is a source of dynamic limit and burst updates.
	// [settings.Getter.GetScoped] will be polled for updates, unless Settings is also a settings.Registry, in which case
	// the channel based [settings.Registry.SubscribeScoped] will be used instead.
	Settings settings.Getter // optional

	// Meter is an optional way to emit OpenTelemetry metrics.
	Meter metric.Meter // optional

	// Logger is used when parsing fails and a limit falls back to the default value.
	Logger logger.Logger // optional
}

Factory holds optional configuration for constructing Limiters.

func (Factory) MakeRateLimiter added in v0.9.1

func (f Factory) MakeRateLimiter(rate settings.Setting[config.Rate]) (RateLimiter, error)

MakeRateLimiter creates a RateLimiter for the given rate, configured by the Factory. If Meter is set, the following metrics will be emitted:

  • rate.*.limit - float gauge
  • rate.*.burst - int gauge
  • rate.*.usage - int counter
  • rate.*.denied - int histogram

func (Factory) MakeTimeLimiter added in v0.9.1

func (f Factory) MakeTimeLimiter(timeout settings.Setting[time.Duration]) (TimeLimiter, error)

MakeTimeLimiter returns a TimeLimiter for the given timeout, configured by the Factory. If Meter is set, the following metrics will be emitted:

  • time.*.limit - float gauge
  • time.*.runtime - float gauge
  • time.*.success - int counter
  • time.*.timeout - int counter

Note: Unit will be ignored. All TimeLimiters emit seconds as "s".

func (Factory) NewRateLimiter deprecated

func (f Factory) NewRateLimiter(rate settings.Setting[config.Rate]) (RateLimiter, error)

Deprecated: use MakeRateLimiter

func (Factory) NewTimeLimiter deprecated

func (f Factory) NewTimeLimiter(timeout settings.Setting[time.Duration]) (TimeLimiter, error)

Deprecated: use MakeTimeLimiter

type Limiter

type Limiter[N any] interface {
	io.Closer // Limiters spawn background goroutines and must be closed.
	// Limit returns the current limit.
	Limit(context.Context) (N, error)
}

type MultiRateLimiter

type MultiRateLimiter []RateLimiter

MultiRateLimiter is a RateLimiter composed of other RateLimiters which are applied in order.

Example
ctx := context.Background()
ctxA := contexts.WithCRE(ctx, contexts.CRE{Owner: "0xabcd"})
ctxB := contexts.WithCRE(ctx, contexts.CRE{Workflow: "ABCD"})

global := GlobalRateLimiter(rate.Every(time.Second), 4)
multiA := MultiRateLimiter{global, OwnerRateLimiter(rate.Every(time.Second), 4)}
multiB := MultiRateLimiter{global, WorkflowRateLimiter(rate.Every(time.Second), 4)}

// Try burst limit of 4 from A
var g errgroup.Group
for range 4 {
	g.Go(func() error {
		return multiA.AllowErr(ctxA)
	})
}
if err := g.Wait(); err != nil {
	fmt.Println("A:", err)
} else {
	fmt.Println("A: success")
}

// Try burst limit of 4 from A & B at the same time

g = errgroup.Group{}
for range 4 {
	g.Go(func() error {
		return multiA.AllowErr(ctxA)
	})
}
for range 4 {
	g.Go(func() error {
		return multiB.AllowErr(ctxB)
	})
}
if err := g.Wait(); err != nil {
	fmt.Println("A&B:", err)
} else {
	fmt.Println("A&B: success")
}
Output:

A: success
A&B: rate limited

func (MultiRateLimiter) Allow

func (m MultiRateLimiter) Allow(ctx context.Context) bool

func (MultiRateLimiter) AllowErr

func (m MultiRateLimiter) AllowErr(ctx context.Context) error

func (MultiRateLimiter) AllowN

func (m MultiRateLimiter) AllowN(ctx context.Context, t time.Time, n int) bool

func (MultiRateLimiter) AllowNErr

func (m MultiRateLimiter) AllowNErr(ctx context.Context, t time.Time, n int) error

func (MultiRateLimiter) Close

func (m MultiRateLimiter) Close() (err error)

func (MultiRateLimiter) Limit added in v0.9.1

func (m MultiRateLimiter) Limit(ctx context.Context) (config.Rate, error)

func (MultiRateLimiter) Reserve

func (m MultiRateLimiter) Reserve(ctx context.Context) (Reservation, error)

func (MultiRateLimiter) ReserveN

func (m MultiRateLimiter) ReserveN(ctx context.Context, t time.Time, n int) (Reservation, error)

func (MultiRateLimiter) Wait

func (m MultiRateLimiter) Wait(ctx context.Context) error

func (MultiRateLimiter) WaitN

func (m MultiRateLimiter) WaitN(ctx context.Context, n int) (err error)

type MultiResourcePoolLimiter

type MultiResourcePoolLimiter[N Number] []ResourcePoolLimiter[N]

MultiResourcePoolLimiter is a ResourcePoolLimiter backed by other limiters, which are each called in order.

Example
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
ctx = contexts.WithCRE(ctx, contexts.CRE{Org: "orgID", Owner: "owner-id", Workflow: "workflowID"})
global := GlobalResourcePoolLimiter[int](100)
freeGlobal, err := global.Wait(ctx, 95)
if err != nil {
	log.Fatal(err)
}
org := OrgResourcePoolLimiter[int](50)
freeOrg, err := org.Wait(ctx, 45)
if err != nil {
	log.Fatal(err)
}
user := OwnerResourcePoolLimiter[int](20)
freeUser, err := user.Wait(ctx, 15)
if err != nil {
	log.Fatal(err)
}
workflow := WorkflowResourcePoolLimiter[int](10)
freeWorkflow, err := workflow.Wait(ctx, 5)
if err != nil {
	log.Fatal(err)
}
multi := MultiResourcePoolLimiter[int]{global, org, user, workflow}
tryWork := func() error {
	err := multi.Use(ctx, 10)
	if err != nil {
		return err
	}
	return multi.Free(ctx, 10)
}

fmt.Println(tryWork())
freeGlobal()
fmt.Println(tryWork())
freeOrg()
fmt.Println(tryWork())
freeUser()
fmt.Println(tryWork())
freeWorkflow()
fmt.Println(tryWork())
free, err := multi.Wait(ctx, 10)
if err != nil {
	log.Fatal(err)
}
free()
Output:

resource limited: cannot use 10, already using 95/100
resource limited for org[orgID]: cannot use 10, already using 45/50
resource limited for owner[owner-id]: cannot use 10, already using 15/20
resource limited for workflow[workflowID]: cannot use 10, already using 5/10
<nil>

func (MultiResourcePoolLimiter[N]) Available added in v0.9.1

func (m MultiResourcePoolLimiter[N]) Available(ctx context.Context) (N, error)

func (MultiResourcePoolLimiter[N]) Close added in v0.9.0

func (m MultiResourcePoolLimiter[N]) Close() (errs error)

func (MultiResourcePoolLimiter[N]) Free added in v0.9.0

func (m MultiResourcePoolLimiter[N]) Free(ctx context.Context, amount N) (errs error)

func (MultiResourcePoolLimiter[N]) Limit added in v0.9.1

func (m MultiResourcePoolLimiter[N]) Limit(ctx context.Context) (N, error)

func (MultiResourcePoolLimiter[N]) Use

func (m MultiResourcePoolLimiter[N]) Use(ctx context.Context, amount N) error

func (MultiResourcePoolLimiter[N]) Wait

func (m MultiResourcePoolLimiter[N]) Wait(ctx context.Context, amount N) (func(), error)

type Number

type Number interface {
	constraints.Integer | constraints.Float
}

Number includes all integer and float types, although metrics will be emitted either as int64 or float64.

type QueueLimiter added in v0.9.1

type QueueLimiter[T any] interface {
	Limiter[int]
	// Len returns the current size of the queue.
	Len(context.Context) (int, error)
	// Put queues the value, or returns ErrorQueueFull.
	Put(context.Context, T) error
	// Get returns the next value if available, otherwise ErrQueueEmpty.
	Get(context.Context) (T, error)
	// Wait gets the next value, waiting up until context cancellation.
	Wait(context.Context) (T, error)
}

QueueLimiter is a limiter for queues.

Example
ctx := context.Background()
ql := NewQueueLimiter[string](2)

if err := ql.Put(ctx, "foo"); err != nil {
	log.Fatalf("Failed to put foo: %v", err)
}
fmt.Println("Queued foo")
// [foo]
if err := ql.Put(ctx, "bar"); err != nil {
	log.Fatalf("Failed to put bar: %v", err)
}
fmt.Println("Queued bar")
// [foo, bar]
if err := ql.Put(ctx, "baz"); err == nil {
	log.Fatalf("Put baz when queue should have been full")
}
fmt.Println("Queued too full for baz")

if v, err := ql.Get(ctx); err != nil {
	log.Fatalf("Failed to get foo: %v", err)
} else if v != "foo" {
	log.Fatalf("Got %s, but expected foo", v)
}
fmt.Println("Got foo")
// [bar]
if err := ql.Put(ctx, "baz"); err != nil {
	log.Fatalf("Failed to put baz: %v", err)
}
fmt.Println("Queued baz")
// [bar baz]

if v, err := ql.Get(ctx); err != nil {
	log.Fatalf("Failed to get bar: %v", err)
} else if v != "bar" {
	log.Fatalf("Got %s, but expected bar", v)
}
fmt.Println("Got bar")
if v, err := ql.Get(ctx); err != nil {
	log.Fatalf("Failed to get baz: %v", err)
} else if v != "baz" {
	log.Fatalf("Got %s, but expected baz", v)
}
fmt.Println("Got baz")
// []
l, err := ql.Len(ctx)
if err != nil {
	log.Fatalf("Failed to get length: %v", err)
}
fmt.Println("Queue length", l)
Output:

Queued foo
Queued bar
Queued too full for baz
Got foo
Queued baz
Got bar
Got baz
Queue length 0
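
The example above exercises Put, Get, and Len but not Wait. As a rough illustration of the documented semantics (Put fails when full, Get fails when empty, Wait blocks until a value arrives or the context ends), here is a standard-library sketch built on a buffered channel. The type chanQueue and the sentinels errFull and errEmpty are hypothetical; the package itself reports ErrorQueueFull and ErrQueueEmpty.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Hypothetical sentinels standing in for ErrorQueueFull and ErrQueueEmpty.
var (
	errFull  = errors.New("queue is full")
	errEmpty = errors.New("queue is empty")
)

// chanQueue is a hypothetical bounded queue backed by a buffered channel.
type chanQueue[T any] struct{ ch chan T }

func newChanQueue[T any](capacity int) *chanQueue[T] {
	return &chanQueue[T]{ch: make(chan T, capacity)}
}

// Put queues v without blocking, or reports a full queue.
func (q *chanQueue[T]) Put(v T) error {
	select {
	case q.ch <- v:
		return nil
	default:
		return errFull
	}
}

// Get returns the next value without blocking, or reports an empty queue.
func (q *chanQueue[T]) Get() (T, error) {
	select {
	case v := <-q.ch:
		return v, nil
	default:
		var zero T
		return zero, errEmpty
	}
}

// Wait blocks until a value is available or ctx is done.
func (q *chanQueue[T]) Wait(ctx context.Context) (T, error) {
	select {
	case v := <-q.ch:
		return v, nil
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

func main() {
	q := newChanQueue[string](1)
	go func() {
		time.Sleep(10 * time.Millisecond)
		_ = q.Put("foo")
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(q.Wait(ctx)) // blocks until the goroutine queues "foo"

	short, cancelShort := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancelShort()
	_, err := q.Wait(short) // nothing is queued, so the context expires
	fmt.Println(err)
}
```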

func MakeQueueLimiter added in v0.9.1

func MakeQueueLimiter[T any](f Factory, limit settings.Setting[int]) (QueueLimiter[T], error)

MakeQueueLimiter returns a QueueLimiter for the given limit, configured by the Factory. If Meter is set, the following metrics will be emitted:

  • queue.*.limit - int gauge
  • queue.*.usage - int gauge
  • queue.*.denied - int histogram

func NewQueueLimiter added in v0.9.1

func NewQueueLimiter[T any](capacity int) QueueLimiter[T]

NewQueueLimiter returns a simple static QueueLimiter.

type RateLimiter

type RateLimiter interface {
	Limiter[config.Rate]

	// Allow reports whether an event may happen now.
	Allow(ctx context.Context) bool
	// AllowN reports whether n events may happen at time t.
	// Use this method if you intend to drop / skip events that exceed the rate limit.
	// Otherwise, use Reserve or Wait.
	AllowN(ctx context.Context, t time.Time, n int) bool
	// AllowErr is like Allow, but returns an error.
	AllowErr(ctx context.Context) error
	// AllowNErr is like AllowN, but returns an error.
	AllowNErr(ctx context.Context, t time.Time, n int) error

	// Reserve is shorthand for ReserveN(ctx, time.Now(), 1).
	Reserve(ctx context.Context) (Reservation, error)
	// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
	// The Limiter takes this Reservation into account when allowing future events.
	// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size.
	// Usage example:
	//
	//	r, err := lim.ReserveN(ctx, time.Now(), 1)
	//	if err != nil {
	//	  return
	//	}
	//	if !r.OK() {
	//	  // Not allowed to act! Did you remember to set the burst to be > 0?
	//	  return
	//	}
	//	time.Sleep(r.Delay())
	//	Act()
	//
	// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
	// If you need to respect a deadline or cancel the delay, use Wait instead.
	// To drop or skip events exceeding rate limit, use Allow instead.
	ReserveN(ctx context.Context, t time.Time, n int) (Reservation, error)

	// Wait is shorthand for WaitN(ctx, 1).
	Wait(ctx context.Context) (err error)
	// WaitN blocks until lim permits n events to happen.
	// It returns an error if n exceeds the Limiter's burst size, the Context is
	// canceled, or the expected wait time exceeds the Context's Deadline.
	WaitN(ctx context.Context, n int) (err error)
}

A RateLimiter applies rate limits. These methods are a subset of rate.Limiter, with context.Context-based scoping and some *Err variants. Methods that return errors return ErrorRateLimited when limits are encountered.
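
For readers unfamiliar with the limit and burst parameters, rate.Limiter in golang.org/x/time/rate is a token bucket: it starts full at the burst size and refills at the limit rate. The sketch below is a simplified, single-goroutine token bucket written only to show how AllowN-style checks behave at explicit times. The names bucket, newBucket, allowN, and epoch are hypothetical, and this is not the package's or rate.Limiter's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// epoch is an arbitrary fixed start time used to keep the example deterministic.
var epoch = time.Unix(0, 0)

// bucket is a hypothetical token bucket; it is not safe for concurrent use.
type bucket struct {
	perSecond float64   // refill rate in tokens per second
	burst     float64   // maximum number of stored tokens
	tokens    float64   // tokens currently available
	last      time.Time // time of the last refill
}

// newBucket returns a bucket that starts full at time now.
func newBucket(perSecond float64, burst int, now time.Time) *bucket {
	return &bucket{perSecond: perSecond, burst: float64(burst), tokens: float64(burst), last: now}
}

// allowN refills the bucket up to time t, then takes n tokens if all are available.
func (b *bucket) allowN(t time.Time, n int) bool {
	if elapsed := t.Sub(b.last); elapsed > 0 {
		b.tokens += elapsed.Seconds() * b.perSecond
		if b.tokens > b.burst {
			b.tokens = b.burst
		}
		b.last = t
	}
	if float64(n) > b.tokens {
		return false
	}
	b.tokens -= float64(n)
	return true
}

func main() {
	b := newBucket(1, 4, epoch) // 1 event per second, burst of 4

	fmt.Println(b.allowN(epoch, 4))                    // true: the full burst is available
	fmt.Println(b.allowN(epoch, 1))                    // false: the bucket is empty
	fmt.Println(b.allowN(epoch.Add(2*time.Second), 2)) // true: two tokens have refilled
	fmt.Println(b.allowN(epoch.Add(2*time.Second), 1)) // false: empty again
}
```

This mirrors the MultiRateLimiter example, where a burst of 4 succeeds once but a second simultaneous burst against the same global limiter is denied.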

func GlobalRateLimiter

func GlobalRateLimiter(limit rate.Limit, burst int) RateLimiter

GlobalRateLimiter returns an unscoped RateLimiter for the given limit and burst.

func OrgRateLimiter

func OrgRateLimiter(limit rate.Limit, burst int) RateLimiter

OrgRateLimiter returns a RateLimiter scoped by org for the given limit and burst.

func OwnerRateLimiter

func OwnerRateLimiter(limit rate.Limit, burst int) RateLimiter

OwnerRateLimiter returns a RateLimiter scoped by owner for the given limit and burst.

func UnlimitedRateLimiter

func UnlimitedRateLimiter() RateLimiter

UnlimitedRateLimiter returns a RateLimiter without any limit. Every call is allowed, all reservations are accepted without delay, and no calls have to wait.

func WorkflowRateLimiter

func WorkflowRateLimiter(limit rate.Limit, burst int) RateLimiter

WorkflowRateLimiter returns a RateLimiter scoped by workflow for the given limit and burst.

type Reservation

type Reservation interface {
	OK() bool
	Delay() time.Duration
	DelayFrom(time.Time) time.Duration
	Cancel()
	CancelAt(time.Time)

	// Allow returns true if no Delay is required.
	Allow() bool
	// AllowErr is like Allow but includes a detailed error.
	AllowErr() error
}

Reservation extends the exported interface of *rate.Reservation.

type ResourceLimiter

type ResourceLimiter[N Number] interface {
	Limiter[N]
	// Use increases the resource count by amount, or returns an error if the limit is reached.
	// It does not block. Use a ResourcePoolLimiter for blocking semantics.
	Use(ctx context.Context, amount N) error
	// Free is the counterpart to Use and releases amount of resources from use.
	Free(ctx context.Context, amount N) error
	// Available returns the amount of resources currently available for use.
	Available(ctx context.Context) (N, error)
}

ResourceLimiter is a limiter for resources, where each interaction is typically single-action.
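
The non-blocking Use and Free pair can be pictured as a guarded counter. The following is a minimal standard-library sketch under that assumption. The type counter is hypothetical, it ignores context scoping and tenants, and its error text only imitates the format shown in the MultiResourcePoolLimiter example output.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical non-blocking resource counter.
type counter struct {
	mu    sync.Mutex
	used  int
	limit int
}

// Use claims amount, or fails immediately if that would exceed the limit.
func (c *counter) Use(amount int) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.used+amount > c.limit {
		return fmt.Errorf("resource limited: cannot use %d, already using %d/%d", amount, c.used, c.limit)
	}
	c.used += amount
	return nil
}

// Free releases amount previously claimed with Use.
func (c *counter) Free(amount int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.used -= amount
}

// Available reports how much can still be claimed.
func (c *counter) Available() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.limit - c.used
}

func main() {
	c := &counter{limit: 10}
	fmt.Println(c.Use(8)) // <nil>
	fmt.Println(c.Use(5)) // resource limited: cannot use 5, already using 8/10
	c.Free(8)
	fmt.Println(c.Use(5)) // <nil>
	fmt.Println(c.Available())
}
```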

func UnlimitedResourceLimiter

func UnlimitedResourceLimiter[N Number]() ResourceLimiter[N]

type ResourcePoolLimiter

type ResourcePoolLimiter[N Number] interface {
	ResourceLimiter[N]
	// Wait is like Use, but blocks until resources are available, or context has expired. The free func must be
	// called and should be deferred immediately when possible. It effectively calls Free to release N resources.
	Wait(context.Context, N) (free func(), err error)
}

ResourcePoolLimiter is a limiter for a pool of resources with concurrent active use. It extends the ResourceLimiter API with a Wait method, which simplifies the typical two-step interaction by returning a free func() that returns resources to the pool.
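
One way to picture the blocking Wait and its free func is a counter that wakes waiters whenever resources are released. This is a sketch under stated assumptions, using only the standard library. The names pool, newPool, release, and handoff are hypothetical, and the package's real limiter additionally handles scoping, metrics, and dynamic limits.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// pool is a hypothetical blocking resource pool.
type pool struct {
	mu      sync.Mutex
	used    int
	limit   int           // fixed after construction
	changed chan struct{} // closed and replaced whenever resources are freed
}

func newPool(limit int) *pool {
	return &pool{limit: limit, changed: make(chan struct{})}
}

// Wait blocks until amount fits under the limit or ctx is done.
// The returned free func releases the amount and is safe to call more than once.
func (p *pool) Wait(ctx context.Context, amount int) (func(), error) {
	if amount > p.limit {
		return nil, errors.New("amount exceeds limit")
	}
	for {
		p.mu.Lock()
		if p.used+amount <= p.limit {
			p.used += amount
			p.mu.Unlock()
			var once sync.Once
			return func() { once.Do(func() { p.release(amount) }) }, nil
		}
		changed := p.changed
		p.mu.Unlock()

		select {
		case <-changed: // something was freed, so check again
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

// release returns amount to the pool and wakes every waiter.
func (p *pool) release(amount int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.used -= amount
	close(p.changed)
	p.changed = make(chan struct{})
}

// handoff holds 8 of 10 units, frees them from another goroutine, and
// returns the result of a concurrent Wait for 5 units.
func handoff() error {
	p := newPool(10)
	free, err := p.Wait(context.Background(), 8)
	if err != nil {
		return err
	}
	go func() {
		time.Sleep(10 * time.Millisecond)
		free()
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	freeSecond, err := p.Wait(ctx, 5)
	if err != nil {
		return err
	}
	defer freeSecond()
	return nil
}

func main() {
	fmt.Println(handoff()) // <nil>: the waiter proceeds once the first holder frees
}
```

Deferring the free func right after a successful Wait, as the interface comment recommends, keeps the release paired with the acquisition on every return path.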

func GlobalResourcePoolLimiter

func GlobalResourcePoolLimiter[N Number](limit N) ResourcePoolLimiter[N]

GlobalResourcePoolLimiter returns an unscoped ResourcePoolLimiter with default options. See MakeResourcePoolLimiter for dynamic limits, metering, and more.

func MakeResourcePoolLimiter added in v0.9.1

func MakeResourcePoolLimiter[N Number](f Factory, limit settings.Setting[N]) (ResourcePoolLimiter[N], error)

MakeResourcePoolLimiter returns a ResourcePoolLimiter for the given limit, configured by the Factory. If Meter is set, the following metrics will be emitted:

  • resource.*.limit - gauge
  • resource.*.usage - gauge
  • resource.*.amount - histogram
  • resource.*.denied - histogram

func NewResourcePoolLimiter deprecated

func NewResourcePoolLimiter[N Number](f Factory, limit settings.Setting[N]) (ResourcePoolLimiter[N], error)

Deprecated: use MakeResourcePoolLimiter

func OrgResourcePoolLimiter

func OrgResourcePoolLimiter[N Number](defaultLimit N) ResourcePoolLimiter[N]

OrgResourcePoolLimiter creates a new ResourcePoolLimiter scoped per organization.

func OwnerResourcePoolLimiter

func OwnerResourcePoolLimiter[N Number](defaultLimit N) ResourcePoolLimiter[N]

OwnerResourcePoolLimiter creates a new ResourcePoolLimiter scoped per owner.

func UnlimitedResourcePoolLimiter

func UnlimitedResourcePoolLimiter[N Number]() ResourcePoolLimiter[N]

func WorkflowResourcePoolLimiter

func WorkflowResourcePoolLimiter[N Number](defaultLimit N) ResourcePoolLimiter[N]

WorkflowResourcePoolLimiter creates a new ResourcePoolLimiter scoped per workflow.

type TimeLimiter

type TimeLimiter interface {
	Limiter[time.Duration]
	// WithTimeout is like context.WithTimeout, but automatically applies the timeout
	// from this TimeLimiter, and returns a done func() that must be called to signal completion.
	WithTimeout(context.Context) (ctx context.Context, done func(), err error)
}

TimeLimiter is a Limiter that enforces timeouts.
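
The WithTimeout method is described as a variant of context.WithTimeout that supplies the timeout itself and returns a done func. A minimal sketch of that calling pattern follows, assuming a hypothetical stand-in type timeLimiter and helpers slowWork and runLimited. The stand-in returns the plain context error, whereas the package documents ErrorTimeLimited for this case.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// timeLimiter is a hypothetical stand-in with a fixed timeout.
type timeLimiter struct{ timeout time.Duration }

// WithTimeout derives a context that expires after the limiter's timeout.
// The done func must be called to release the context's resources.
func (l timeLimiter) WithTimeout(parent context.Context) (context.Context, func(), error) {
	ctx, cancel := context.WithTimeout(parent, l.timeout)
	return ctx, cancel, nil
}

// slowWork simulates an operation that takes d unless ctx ends first.
func slowWork(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runLimited applies the limiter's timeout to work lasting d.
func runLimited(l timeLimiter, d time.Duration) error {
	ctx, done, err := l.WithTimeout(context.Background())
	if err != nil {
		return err
	}
	defer done() // signal completion, as the interface comment requires
	return slowWork(ctx, d)
}

func main() {
	l := timeLimiter{timeout: 50 * time.Millisecond}
	fmt.Println(runLimited(l, time.Millisecond)) // <nil>
	fmt.Println(runLimited(l, time.Second))      // context deadline exceeded
}
```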

func NewTimeLimiter

func NewTimeLimiter(timeout time.Duration) TimeLimiter

NewTimeLimiter returns a simple TimeLimiter with the given timeout.
