Documentation ¶
Overview ¶
Package limits helps enforce request-scoped, multi-tenant limits with several kinds of Limiter:
- RateLimiter: for throttling usage
- ResourceLimiter/ResourcePoolLimiter: for allocating resources
- TimeLimiter: for enforcing timeouts
- BoundLimiter: for enforcing bounds
- QueueLimiter: for limited capacity queues
Every limit requires a default value. Additional features, such as OpenTelemetry metrics and dynamic updates, are available via the settings.Setting variants.
Limiter errors carry gRPC status codes: codes.ResourceExhausted and codes.DeadlineExceeded.
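For example, a minimal sketch in the style of the package examples below (mapping a denial to its gRPC code assumes the google.golang.org/grpc/status package, imported as status):
ctx := context.Background()
// A static, unscoped limiter: one event per second with a burst of 1.
rl := GlobalRateLimiter(rate.Every(time.Second), 1)
_ = rl.AllowErr(ctx) // consumes the burst
if err := rl.AllowErr(ctx); err != nil {
    fmt.Println(status.Code(err)) // codes.ResourceExhausted
}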
Index ¶
- Variables
- type BoundLimiter
- type ErrorBoundLimited
- type ErrorQueueFull
- type ErrorRateLimited
- type ErrorResourceLimited
- type ErrorTimeLimited
- type Factory
- func (f Factory) MakeRateLimiter(rate settings.Setting[config.Rate]) (RateLimiter, error)
- func (f Factory) MakeTimeLimiter(timeout settings.Setting[time.Duration]) (TimeLimiter, error)
- func (f Factory) NewRateLimiter(rate settings.Setting[config.Rate]) (RateLimiter, error) deprecated
- func (f Factory) NewTimeLimiter(timeout settings.Setting[time.Duration]) (TimeLimiter, error) deprecated
- type Limiter
- type MultiRateLimiter
- func (m MultiRateLimiter) Allow(ctx context.Context) bool
- func (m MultiRateLimiter) AllowErr(ctx context.Context) error
- func (m MultiRateLimiter) AllowN(ctx context.Context, t time.Time, n int) bool
- func (m MultiRateLimiter) AllowNErr(ctx context.Context, t time.Time, n int) error
- func (m MultiRateLimiter) Close() (err error)
- func (m MultiRateLimiter) Limit(ctx context.Context) (config.Rate, error)
- func (m MultiRateLimiter) Reserve(ctx context.Context) (Reservation, error)
- func (m MultiRateLimiter) ReserveN(ctx context.Context, t time.Time, n int) (Reservation, error)
- func (m MultiRateLimiter) Wait(ctx context.Context) error
- func (m MultiRateLimiter) WaitN(ctx context.Context, n int) (err error)
- type MultiResourcePoolLimiter
- func (m MultiResourcePoolLimiter[N]) Available(ctx context.Context) (N, error)
- func (m MultiResourcePoolLimiter[N]) Close() (errs error)
- func (m MultiResourcePoolLimiter[N]) Free(ctx context.Context, amount N) (errs error)
- func (m MultiResourcePoolLimiter[N]) Limit(ctx context.Context) (N, error)
- func (m MultiResourcePoolLimiter[N]) Use(ctx context.Context, amount N) error
- func (m MultiResourcePoolLimiter[N]) Wait(ctx context.Context, amount N) (func(), error)
- type Number
- type QueueLimiter
- type RateLimiter
- type Reservation
- type ResourceLimiter
- type ResourcePoolLimiter
- func GlobalResourcePoolLimiter[N Number](limit N) ResourcePoolLimiter[N]
- func MakeResourcePoolLimiter[N Number](f Factory, limit settings.Setting[N]) (ResourcePoolLimiter[N], error)
- func NewResourcePoolLimiter[N Number](f Factory, limit settings.Setting[N]) (ResourcePoolLimiter[N], error) deprecated
- func OrgResourcePoolLimiter[N Number](defaultLimit N) ResourcePoolLimiter[N]
- func OwnerResourcePoolLimiter[N Number](defaultLimit N) ResourcePoolLimiter[N]
- func UnlimitedResourcePoolLimiter[N Number]() ResourcePoolLimiter[N]
- func WorkflowResourcePoolLimiter[N Number](defaultLimit N) ResourcePoolLimiter[N]
- type TimeLimiter
Examples ¶
- MultiRateLimiter
- MultiResourcePoolLimiter
- QueueLimiter
Constants ¶
This section is empty.
Variables ¶
var ErrQueueEmpty = fmt.Errorf("queue is empty")
Functions ¶
This section is empty.
Types ¶
type BoundLimiter ¶ added in v0.9.1
type BoundLimiter[N Number] interface {
    Limiter[N]
    // Check returns ErrorBoundLimited if the value is above the limit.
    Check(context.Context, N) error
}
BoundLimiter is a limiter for simple bounds checks.
func MakeBoundLimiter ¶ added in v0.9.1
MakeBoundLimiter returns a BoundLimiter for the given bound, configured by the Factory. If Meter is set, the following metrics will be emitted:
- bound.*.limit - gauge
- bound.*.usage - histogram
- bound.*.denied - histogram
func NewBoundLimiter ¶ added in v0.9.1
func NewBoundLimiter[N Number](bound N) BoundLimiter[N]
NewBoundLimiter returns a BoundLimiter with the given bound.
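For instance, a minimal sketch of a static bounds check:
ctx := context.Background()
bl := NewBoundLimiter[int](10)
fmt.Println(bl.Check(ctx, 5) == nil)  // true: within the bound
fmt.Println(bl.Check(ctx, 15) == nil) // false: above the bound, an ErrorBoundLimited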
type ErrorBoundLimited ¶ added in v0.9.1
type ErrorBoundLimited[N Number] struct {
    Key string
    Scope settings.Scope
    Tenant string
    Limit, Amount N
}
func (ErrorBoundLimited[N]) Error ¶ added in v0.9.1
func (e ErrorBoundLimited[N]) Error() string
func (ErrorBoundLimited[N]) GRPCStatus ¶ added in v0.9.1
func (e ErrorBoundLimited[N]) GRPCStatus() *status.Status
func (ErrorBoundLimited[N]) Is ¶ added in v0.9.1
func (e ErrorBoundLimited[N]) Is(target error) bool
type ErrorQueueFull ¶ added in v0.9.1
func (ErrorQueueFull) Error ¶ added in v0.9.1
func (e ErrorQueueFull) Error() string
func (ErrorQueueFull) GRPCStatus ¶ added in v0.9.1
func (e ErrorQueueFull) GRPCStatus() *status.Status
func (ErrorQueueFull) Is ¶ added in v0.9.1
func (e ErrorQueueFull) Is(target error) bool
type ErrorRateLimited ¶
func (ErrorRateLimited) Error ¶
func (e ErrorRateLimited) Error() string
func (ErrorRateLimited) GRPCStatus ¶
func (e ErrorRateLimited) GRPCStatus() *status.Status
func (ErrorRateLimited) Is ¶
func (e ErrorRateLimited) Is(target error) bool
func (ErrorRateLimited) Unwrap ¶
func (e ErrorRateLimited) Unwrap() error
type ErrorResourceLimited ¶
type ErrorResourceLimited[N Number] struct {
    Key string
    Scope settings.Scope
    Tenant string
    Used, Limit, Amount N
}
func (ErrorResourceLimited[N]) Error ¶
func (e ErrorResourceLimited[N]) Error() string
func (ErrorResourceLimited[N]) GRPCStatus ¶
func (e ErrorResourceLimited[N]) GRPCStatus() *status.Status
func (ErrorResourceLimited[N]) Is ¶
func (e ErrorResourceLimited[N]) Is(target error) bool
type ErrorTimeLimited ¶
type ErrorTimeLimited struct {
Key string
Scope settings.Scope
Tenant string
Timeout time.Duration
}
func (ErrorTimeLimited) Error ¶
func (e ErrorTimeLimited) Error() string
func (ErrorTimeLimited) GRPCStatus ¶
func (e ErrorTimeLimited) GRPCStatus() *status.Status
func (ErrorTimeLimited) Is ¶
func (e ErrorTimeLimited) Is(target error) bool
type Factory ¶
type Factory struct {
// Settings is a source of dynamic limit and burst updates.
// [settings.Getter.GetScoped] will be polled for updates, unless Settings is also a settings.Registry, in which case
// the channel based [settings.Registry.SubscribeScoped] will be used instead.
Settings settings.Getter // optional
// Meter is an optional way to emit OpenTelemetry metrics.
Meter metric.Meter // optional
// Logger is used when parsing fails and a limit falls back to the default value.
Logger logger.Logger // optional
}
Factory holds optional configuration for constructing [Limiter]s.
func (Factory) MakeRateLimiter ¶ added in v0.9.1
func (f Factory) MakeRateLimiter(rate settings.Setting[config.Rate]) (RateLimiter, error)
MakeRateLimiter creates a RateLimiter for the given rate, configured by the Factory. If Meter is set, the following metrics will be emitted:
- rate.*.limit - float gauge
- rate.*.burst - int gauge
- rate.*.usage - int counter
- rate.*.denied - int histogram
func (Factory) MakeTimeLimiter ¶ added in v0.9.1
func (f Factory) MakeTimeLimiter(timeout settings.Setting[time.Duration]) (TimeLimiter, error)
MakeTimeLimiter returns a TimeLimiter for the given timeout, configured by the Factory. If Meter is set, the following metrics will be emitted:
- time.*.limit - float gauge
- time.*.runtime - float gauge
- time.*.success - int counter
- time.*.timeout - int counter
Note: Unit will be ignored. All TimeLimiters emit seconds as "s".
func (Factory) NewRateLimiter
deprecated
func (Factory) NewTimeLimiter
deprecated
type MultiRateLimiter ¶
type MultiRateLimiter []RateLimiter
MultiRateLimiter is a RateLimiter composed of other RateLimiters which are applied in order.
Example ¶
ctx := context.Background()
ctxA := contexts.WithCRE(ctx, contexts.CRE{Owner: "0xabcd"})
ctxB := contexts.WithCRE(ctx, contexts.CRE{Workflow: "ABCD"})
global := GlobalRateLimiter(rate.Every(time.Second), 4)
multiA := MultiRateLimiter{global, OwnerRateLimiter(rate.Every(time.Second), 4)}
multiB := MultiRateLimiter{global, WorkflowRateLimiter(rate.Every(time.Second), 4)}
// Try burst limit of 4 from A
var g errgroup.Group
for range 4 {
g.Go(func() error {
return multiA.AllowErr(ctxA)
})
}
if err := g.Wait(); err != nil {
fmt.Println("A:", err)
} else {
fmt.Println("A: success")
}
// Try burst limit of 4 from A & B at the same time
g = errgroup.Group{}
for range 4 {
g.Go(func() error {
return multiA.AllowErr(ctxA)
})
}
for range 4 {
g.Go(func() error {
return multiB.AllowErr(ctxB)
})
}
if err := g.Wait(); err != nil {
fmt.Println("A&B:", err)
} else {
fmt.Println("A&B: success")
}
Output:

A: success
A&B: rate limited
func (MultiRateLimiter) Close ¶
func (m MultiRateLimiter) Close() (err error)
func (MultiRateLimiter) Reserve ¶
func (m MultiRateLimiter) Reserve(ctx context.Context) (Reservation, error)
func (MultiRateLimiter) ReserveN ¶
func (m MultiRateLimiter) ReserveN(ctx context.Context, t time.Time, n int) (Reservation, error)
type MultiResourcePoolLimiter ¶
type MultiResourcePoolLimiter[N Number] []ResourcePoolLimiter[N]
MultiResourcePoolLimiter is a ResourcePoolLimiter backed by other limiters, which are each called in order.
Example ¶
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
ctx = contexts.WithCRE(ctx, contexts.CRE{Org: "orgID", Owner: "owner-id", Workflow: "workflowID"})
global := GlobalResourcePoolLimiter[int](100)
freeGlobal, err := global.Wait(ctx, 95)
if err != nil {
log.Fatal(err)
}
org := OrgResourcePoolLimiter[int](50)
freeOrg, err := org.Wait(ctx, 45)
if err != nil {
log.Fatal(err)
}
user := OwnerResourcePoolLimiter[int](20)
freeUser, err := user.Wait(ctx, 15)
if err != nil {
log.Fatal(err)
}
workflow := WorkflowResourcePoolLimiter[int](10)
freeWorkflow, err := workflow.Wait(ctx, 5)
if err != nil {
log.Fatal(err)
}
multi := MultiResourcePoolLimiter[int]{global, org, user, workflow}
tryWork := func() error {
err := multi.Use(ctx, 10)
if err != nil {
return err
}
return multi.Free(ctx, 10)
}
fmt.Println(tryWork())
freeGlobal()
fmt.Println(tryWork())
freeOrg()
fmt.Println(tryWork())
freeUser()
fmt.Println(tryWork())
freeWorkflow()
fmt.Println(tryWork())
free, err := multi.Wait(ctx, 10)
if err != nil {
log.Fatal(err)
}
free()
Output:

resource limited: cannot use 10, already using 95/100
resource limited for org[orgID]: cannot use 10, already using 45/50
resource limited for owner[owner-id]: cannot use 10, already using 15/20
resource limited for workflow[workflowID]: cannot use 10, already using 5/10
<nil>
func (MultiResourcePoolLimiter[N]) Available ¶ added in v0.9.1
func (m MultiResourcePoolLimiter[N]) Available(ctx context.Context) (N, error)
func (MultiResourcePoolLimiter[N]) Close ¶ added in v0.9.0
func (m MultiResourcePoolLimiter[N]) Close() (errs error)
func (MultiResourcePoolLimiter[N]) Free ¶ added in v0.9.0
func (m MultiResourcePoolLimiter[N]) Free(ctx context.Context, amount N) (errs error)
func (MultiResourcePoolLimiter[N]) Limit ¶ added in v0.9.1
func (m MultiResourcePoolLimiter[N]) Limit(ctx context.Context) (N, error)
type Number ¶
type Number interface {
constraints.Integer | constraints.Float
}
Number includes all integer and float types, although metrics will be emitted either as int64 or float64.
type QueueLimiter ¶ added in v0.9.1
type QueueLimiter[T any] interface {
    Limiter[int]
    // Len returns the current size of the queue.
    Len(context.Context) (int, error)
    // Put queues the value, or returns ErrorQueueFull.
    Put(context.Context, T) error
    // Get returns the next value if available, otherwise ErrQueueEmpty.
    Get(context.Context) (T, error)
    // Wait gets the next value, waiting up until context cancellation.
    Wait(context.Context) (T, error)
}
QueueLimiter is a limiter for queues.
Example ¶
ctx := context.Background()
ql := NewQueueLimiter[string](2)
if err := ql.Put(ctx, "foo"); err != nil {
log.Fatalf("Failed to put foo: %v", err)
}
fmt.Println("Queued foo")
// [foo]
if err := ql.Put(ctx, "bar"); err != nil {
log.Fatalf("Failed to put bar: %v", err)
}
fmt.Println("Queued bar")
// [foo, bar]
if err := ql.Put(ctx, "baz"); err == nil {
log.Fatalf("Put baz when queue should have been full")
}
fmt.Println("Queued too full for baz")
if v, err := ql.Get(ctx); err != nil {
log.Fatalf("Failed to get foo: %v", err)
} else if v != "foo" {
log.Fatalf("Got %s, but expected foo", v)
}
fmt.Println("Got foo")
// [bar]
if err := ql.Put(ctx, "baz"); err != nil {
log.Fatalf("Failed to put baz: %v", err)
}
fmt.Println("Queued baz")
// [bar baz]
if v, err := ql.Get(ctx); err != nil {
log.Fatalf("Failed to get bar: %v", err)
} else if v != "bar" {
log.Fatalf("Got %s, but expected bar", v)
}
fmt.Println("Got bar")
if v, err := ql.Get(ctx); err != nil {
log.Fatalf("Failed to get baz: %v", err)
} else if v != "baz" {
log.Fatalf("Got %s, but expected baz", v)
}
fmt.Println("Got baz")
// []
l, err := ql.Len(ctx)
if err != nil {
log.Fatalf("Failed to get length: %v", err)
}
fmt.Println("Queue length", l)
Output:

Queued foo
Queued bar
Queue too full for baz
Got foo
Queued baz
Got bar
Got baz
Queue length 0
func MakeQueueLimiter ¶ added in v0.9.1
MakeQueueLimiter returns a QueueLimiter for the given limit, configured by the Factory. If Meter is set, the following metrics will be emitted:
- queue.*.limit - int gauge
- queue.*.usage - int gauge
- queue.*.denied - int histogram
func NewQueueLimiter ¶ added in v0.9.1
func NewQueueLimiter[T any](capacity int) QueueLimiter[T]
NewQueueLimiter returns a simple static QueueLimiter.
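The example above covers Put, Get, and Len; a minimal sketch of the blocking Wait method, assuming a concurrent producer eventually calls Put:
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
ql := NewQueueLimiter[int](1)
go func() {
    _ = ql.Put(ctx, 42)
}()
v, err := ql.Wait(ctx) // blocks until a value is queued, or ctx is cancelled
if err != nil {
    log.Fatal(err)
}
fmt.Println(v) // 42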
type RateLimiter ¶
type RateLimiter interface {
Limiter[config.Rate]
// Allow reports whether an event may happen now.
Allow(ctx context.Context) bool
// AllowN reports whether n events may happen at time t.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise, use Reserve or Wait.
AllowN(ctx context.Context, t time.Time, n int) bool
// AllowErr is like Allow, but returns an error.
AllowErr(ctx context.Context) error
// AllowNErr is like AllowN, but returns an error.
AllowNErr(ctx context.Context, t time.Time, n int) error
// Reserve is shorthand for ReserveN(time.Now(), 1).
Reserve(ctx context.Context) (Reservation, error)
// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
// The Limiter takes this Reservation into account when allowing future events.
// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size.
// Usage example:
//
// r := lim.ReserveN(time.Now(), 1)
// if !r.OK() {
// // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
// return
// }
// time.Sleep(r.Delay())
// Act()
//
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
// If you need to respect a deadline or cancel the delay, use Wait instead.
// To drop or skip events exceeding rate limit, use Allow instead.
ReserveN(ctx context.Context, t time.Time, n int) (Reservation, error)
// Wait is shorthand for WaitN(ctx, 1).
Wait(ctx context.Context) (err error)
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
WaitN(ctx context.Context, n int) (err error)
}
A RateLimiter applies rate limits. Its methods are a subset of rate.Limiter, with context.Context-based scoping and some *Err variants. Methods that return errors return ErrorRateLimited when a limit is exceeded.
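A rough sketch contrasting the non-blocking and blocking calls, with scoping via contexts.WithCRE as in the MultiRateLimiter example above:
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: "0xabcd"})
rl := OwnerRateLimiter(rate.Every(time.Minute), 1)
if rl.Allow(ctx) {
    fmt.Println("allowed immediately") // consumes the burst of 1
}
// Wait would block until another event is permitted, but fails fast here because
// the expected wait (about a minute) exceeds the context deadline.
if err := rl.Wait(ctx); err != nil {
    fmt.Println("not allowed:", err)
}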
func GlobalRateLimiter ¶
func GlobalRateLimiter(limit rate.Limit, burst int) RateLimiter
GlobalRateLimiter returns an unscoped RateLimiter for the given limit and burst.
func OrgRateLimiter ¶
func OrgRateLimiter(limit rate.Limit, burst int) RateLimiter
OrgRateLimiter returns a RateLimiter scoped by org for the given limit and burst.
func OwnerRateLimiter ¶
func OwnerRateLimiter(limit rate.Limit, burst int) RateLimiter
OwnerRateLimiter returns a RateLimiter scoped by owner for the given limit and burst.
func UnlimitedRateLimiter ¶
func UnlimitedRateLimiter() RateLimiter
UnlimitedRateLimiter returns a RateLimiter without any limit. Every call is allowed, all reservations are accepted without delay, and no calls have to wait.
func WorkflowRateLimiter ¶
func WorkflowRateLimiter(limit rate.Limit, burst int) RateLimiter
WorkflowRateLimiter returns a RateLimiter scoped by workflow for the given limit and burst.
type Reservation ¶
type Reservation interface {
OK() bool
Delay() time.Duration
DelayFrom(time.Time) time.Duration
Cancel()
CancelAt(time.Time)
// Allow returns true if no Delay is required.
Allow() bool
// AllowErr is like Allow but includes a detailed error.
AllowErr() error
}
Reservation extends the exported interface of *rate.Reservation.
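A minimal sketch of the reserve-then-wait pattern described in the ReserveN doc comment above:
rl := GlobalRateLimiter(rate.Every(time.Second), 2)
r, err := rl.Reserve(context.Background())
if err != nil {
    log.Fatal(err)
}
if !r.OK() {
    log.Fatal("reservation can never be satisfied") // e.g. n exceeds the burst size
}
time.Sleep(r.Delay()) // sleep out any required delay, then act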
type ResourceLimiter ¶
type ResourceLimiter[N Number] interface {
    Limiter[N]
    // Use increases the resource count by amount, or returns an error if the limit is reached.
    // It does not block. Use a ResourcePoolLimiter for blocking semantics.
    Use(ctx context.Context, amount N) error
    // Free is the counterpart to Use and releases amount of resources from use.
    Free(ctx context.Context, amount N) error
    // Available returns the amount still available for use.
    Available(ctx context.Context) (N, error)
}
ResourceLimiter is a limiter for resources, where each interaction is typically single-action.
func UnlimitedResourceLimiter ¶
func UnlimitedResourceLimiter[N Number]() ResourceLimiter[N]
type ResourcePoolLimiter ¶
type ResourcePoolLimiter[N Number] interface {
    ResourceLimiter[N]
    // Wait is like Use, but blocks until resources are available, or the context has expired. The free func must be
    // called, and should be deferred immediately when possible. It effectively calls Free to release N resources.
    Wait(context.Context, N) (free func(), err error)
}
ResourcePoolLimiter is a limiter for a pool of resources with concurrent active use. It extends the ResourceLimiter API with a Wait method, which simplifies the typical two-step interaction by returning a free func() that releases the resources back to the pool.
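A minimal sketch of the Wait/free pattern:
ctx := context.Background()
pool := GlobalResourcePoolLimiter[int](10)
free, err := pool.Wait(ctx, 3) // blocks until 3 of the 10 units are available
if err != nil {
    log.Fatal(err)
}
defer free() // returns the 3 units to the pool
// ... do work while holding 3 units ...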
func GlobalResourcePoolLimiter ¶
func GlobalResourcePoolLimiter[N Number](limit N) ResourcePoolLimiter[N]
GlobalResourcePoolLimiter returns an unscoped ResourcePoolLimiter with default options. See MakeResourcePoolLimiter for dynamic limits, metering, and more.
func MakeResourcePoolLimiter ¶ added in v0.9.1
func MakeResourcePoolLimiter[N Number](f Factory, limit settings.Setting[N]) (ResourcePoolLimiter[N], error)
MakeResourcePoolLimiter returns a ResourcePoolLimiter for the given limit, configured by the Factory. If Meter is set, the following metrics will be emitted:
- resource.*.limit - gauge
- resource.*.usage - gauge
- resource.*.amount - histogram
- resource.*.denied - histogram
func NewResourcePoolLimiter
deprecated
func OrgResourcePoolLimiter ¶
func OrgResourcePoolLimiter[N Number](defaultLimit N) ResourcePoolLimiter[N]
OrgResourcePoolLimiter creates a new ResourcePoolLimiter scoped per organization.
func OwnerResourcePoolLimiter ¶
func OwnerResourcePoolLimiter[N Number](defaultLimit N) ResourcePoolLimiter[N]
OwnerResourcePoolLimiter creates a new ResourcePoolLimiter scoped per owner.
func UnlimitedResourcePoolLimiter ¶
func UnlimitedResourcePoolLimiter[N Number]() ResourcePoolLimiter[N]
func WorkflowResourcePoolLimiter ¶
func WorkflowResourcePoolLimiter[N Number](defaultLimit N) ResourcePoolLimiter[N]
WorkflowResourcePoolLimiter creates a new ResourcePoolLimiter scoped per workflow.
type TimeLimiter ¶
type TimeLimiter interface {
Limiter[time.Duration]
// WithTimeout is like context.WithTimeout, but automatically applies the timeout
// from this TimeLimiter, and returns a done func() that must be called to signal completion.
WithTimeout(context.Context) (ctx context.Context, done func(), err error)
}
TimeLimiter is a Limiter that enforces timeouts.
func NewTimeLimiter ¶
func NewTimeLimiter(timeout time.Duration) TimeLimiter
NewTimeLimiter returns a simple TimeLimiter with the given timeout.
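For instance, a minimal sketch of wrapping a unit of work in the configured timeout (the work itself is elided):
tl := NewTimeLimiter(500 * time.Millisecond)
ctx, done, err := tl.WithTimeout(context.Background())
if err != nil {
    log.Fatal(err)
}
defer done() // must be called to signal completion
// ... perform the time-limited work using ctx ...
if err := ctx.Err(); err != nil {
    fmt.Println("timed out:", err)
}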