Documentation
¶
Overview ¶
Package esync provides synchronization primitives and utilities similar to the stdlib's sync package to help with concurrent and parallel programming.
It is extensively documented: see the many examples in the documentation for the package and its types.
Summary: ¶
- Use Map for a type and concurrency-safe map.
- Use Cache to create a concurrent cache where the user can control invalidation.
- Use ForkJoin and ForkJoin2 to parallelize work across multiple goroutines.
- Adjust ForkMaxProcs to control the number of goroutines created by ForkJoin and ForkJoin2.
Iterators ¶
Copyright ¶
Much of the code and documentation here is copied from the sync package, Copyright 2016 The Go Authors. Use of that source code is governed by a BSD-style license that can be found in the gosrc.license file included with this package.
fork.go contains functions for parallel computation via forking and joining goroutines.
map.go has the Map type and its methods.
pool.go has the Pool and BytePool types and associated methods.
Index ¶
- Variables
- func ForkJoin[K, V any](ctx context.Context, f func(ctx context.Context, k K) V, keys ...K) []V
- func ForkJoin2[K, V, E any](ctx context.Context, f func(ctx context.Context, k K) (V, E), keys ...K) ([]V, []E)
- func ForkJoin2MaxWorkers[K, V, E any](ctx context.Context, f func(ctx context.Context, k K) (V, E), maxWorkers int, ...) ([]V, []E)
- func ForkJoinMaxWorkers[K, V any](ctx context.Context, f func(ctx context.Context, k K) V, maxWorkers int, ...) []V
- type BytePool
- type Cache
- func (c *Cache[K, V]) Check(key K) (val V, err error, stored time.Time, loaded bool)
- func (c *Cache[K, V]) Clear()
- func (c *Cache[K, V]) Delete(key K)
- func (c *Cache[K, V]) Entries() iter.Seq2[K, *Entry[V]]
- func (c *Cache[K, V]) GC(deadline time.Time)
- func (c *Cache[K, V]) Get(ctx context.Context, key K, ttl time.Duration) (v V, err error, loaded bool)
- func (c *Cache[K, V]) LiveEntries(deadline time.Time) iter.Seq2[K, *Entry[V]]
- func (c *Cache[K, V]) StartGC(ctx context.Context, timing <-chan time.Time, ttl time.Duration)
- func (c *Cache[K, V]) Warm(ctx context.Context, keys ...K)
- type Entry
- type Map
- func (m *Map[K, V]) Clear()
- func (m *Map[K, V]) CompareAndDelete(key K, value V) (deleted bool)
- func (m *Map[K, V]) CompareAndSwap(key K, old, new V) (swapped bool)
- func (m *Map[K, V]) Delete(key K)
- func (m *Map[K, V]) KeyValues() iter.Seq2[K, V]
- func (m *Map[K, V]) Keys() iter.Seq[K]
- func (m *Map[K, V]) Load(key K) (value V, ok bool)
- func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool)
- func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool)
- func (m *Map[K, V]) Range(f func(key K, value V) bool)
- func (m *Map[K, V]) Store(key K, value V)
- func (m *Map[K, V]) Swap(key K, value V) (prev V, loaded bool)
- func (m *Map[K, V]) Values() iter.Seq[V]
- type Pool
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ForkMaxProcs atomic.Int64
ForkMaxProcs is the maximum number of goroutines that can be run concurrently per call to a ForkJoinXXX function.
Defaults to runtime.GOMAXPROCS(0). Set this to an arbitrary high number to remove the limit. If you want to control concurrency more granularly, use the ForkJoinMaxWorkers and ForkJoin2MaxWorkers functions.
Functions ¶
func ForkJoin ¶
ForkJoin creates a goroutine for each key in keys, calling f with each key and returning the values in the same order as the keys. It uses up to MaxForkedParallelism goroutines.
See also:
- ForkJoinMaxWorkers for fine-grained control over the number of workers.
- ForkJoin2 and ForkJoinMaxWorkers for functions that return two values (i.e, a value and an error).
Example (Fib) ¶
package main import ( "context" "fmt" "gitlab.com/efronlicht/esync" "gitlab.com/efronlicht/esync/example" ) func main() { var fib func(context.Context, int) int fib = func(_ context.Context, n int) int { if n < 0 { return 0 } switch n { case 0, 1: return 1 default: return fib(context.TODO(), n-1) + fib(context.TODO(), n-2) } } esync.ForkJoin(context.TODO(), example.Fib, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9) for i := 0; i < 10; i++ { fmt.Printf("Fib(%d) = %d\n", i, example.Fib(context.TODO(), i)) } }
Output: Fib(0) = 1 Fib(1) = 1 Fib(2) = 2 Fib(3) = 3 Fib(4) = 5 Fib(5) = 8 Fib(6) = 13 Fib(7) = 21 Fib(8) = 34 Fib(9) = 55
func ForkJoin2 ¶
func ForkJoin2[K, V, E any](ctx context.Context, f func(ctx context.Context, k K) (V, E), keys ...K) ([]V, []E)
ForkJoin2 creates a goroutine for each key in keys, calling f with each key and returning the values in the same order as the provided keys. It uses up to MaxForkedParallelism goroutines.
See also:
- ForkJoin2MaxWorkers for fine-grained control over the number of workers.
- ForkJoin and ForkJoinMaxWorkers for functions that return a single value.
Example (GoDoc) ¶
package main import ( "context" "fmt" "gitlab.com/efronlicht/esync" "gitlab.com/efronlicht/esync/example" ) func main() { keys := []string{"os", "sync", "fmt", "fakeKey"} // spin up a AZgoroutine for each key to fetch the documentation. vals, errs := esync.ForkJoin2(context.TODO(), example.GoDoc, keys...) const format = "%-10v %-10v %-5v\n" fmt.Printf(format, "key", "len", "error") fmt.Printf(format, "===", "===", "=====") for i := range keys { fmt.Printf(format, keys[i], len(vals[i]), errs[i] != nil) } }
Output: key len error === === ===== os 4604 false sync 743 false fmt 16333 false fakeKey 0 true
func ForkJoin2MaxWorkers ¶
func ForkJoin2MaxWorkers[K, V, E any](ctx context.Context, f func(ctx context.Context, k K) (V, E), maxWorkers int, keys ...K) ([]V, []E)
ForkJoin2 creates a goroutine for each key in keys, calling f with each key and returning the values in the same order as the provided keys.c It uses up to maxWorkers goroutines.
See also:
- ForkJoin2 for a simpler interface.
- ForkJoinMaxWorkers for fine-grained control over the number of workers.
- ForkJoin and ForkJoinMaxWorkers for functions that return a single value.
- ForkJoin2MaxWorkers for functions that return two values (i.e, a value and an error).
func ForkJoinMaxWorkers ¶
func ForkJoinMaxWorkers[K, V any](ctx context.Context, f func(ctx context.Context, k K) V, maxWorkers int, keys ...K) []V
ForkJoinMaxWorkers is as ForkJoin, but you control the maximum number of workers, rather than inferring it from ForkMaxProcs. See also:
- ForkJoin for a simpler interface.
- ForkJoin2MaxWorkers for functions that return two values (i.e, a value and an error).
Types ¶
type BytePool ¶
type BytePool struct {
// contains filtered or unexported fields
}
BytePool is a specialized Pool for []byte, which comes up often enough to warrant special treatment.
func NewBytePool ¶
NewBytePool creates a new BytePool which allocates []byte slices starting at the given size.
type Cache ¶
type Cache[K comparable, V any] struct { // contains filtered or unexported fields }
Concurrent in-memory cache. Use this type to store the results of some expensive computation.
Basic Usage ¶
- NewCache creates a new cache.
- Cache.Get(ctx, key, ttl) will return a cached value if it exists and is not expired; otherwise, it will fetch a new value.
- Cache.Check(key) will return the value without fetching a new one.
Memory usage ¶
By default, this cache only grows and never shrinks. - Use [Cache.StartGC] to start a goroutine that will periodically call [Cache.GC] to delete old entries. - Use [Cache.Delete] to delete a single entry, or [Cache.Clear] to delete all entries.
Example (Check) ¶
package main import ( "context" "fmt" "math" "gitlab.com/efronlicht/esync" ) func main() { realsqrt := func(_ context.Context, x float64) (float64, error) { if x < 0 || math.IsNaN(x) { return 0, fmt.Errorf("expected x >= 0 and not NaN, got %v", x) } return math.Sqrt(x), nil } cache := esync.NewCache(realsqrt) checkAndPrint := func(k float64) { val, err, _, loaded := cache.Check(k) if err != nil { fmt.Printf("k=%v: error: %v: loaded: %v\n", k, err, loaded) return } else { fmt.Printf("k=%v: ok: %.3f: loaded: %v\n", k, val, loaded) } } checkAndPrint(2) // no value yet. cache.Warm(context.TODO(), 2) // warm the cache. checkAndPrint(2) // now we have the value. checkAndPrint(-1) // this will not fetch the value. cache.Warm(context.TODO(), -1) // warm the cache - this should give an error checkAndPrint(-1) // now we have the value. }
Output: k=2: ok: 0.000: loaded: false k=2: ok: 1.414: loaded: true k=-1: ok: 0.000: loaded: false k=-1: error: expected x >= 0 and not NaN, got -1: loaded: true
Example (Clear) ¶
package main import ( "context" "fmt" "time" "gitlab.com/efronlicht/esync" "gitlab.com/efronlicht/esync/example" ) func main() { c := esync.NewCache(example.GoDoc) _, _, _ = c.Get(context.TODO(), "os", 5*time.Second) // this will fetch the value. _, _, _ = c.Get(context.TODO(), "sync", 5*time.Second) printWarm := func(key string) { _, _, _, loaded := c.Check(key) fmt.Printf("cache warm for %s: %v\n", key, loaded) } printWarm("os") printWarm("sync") fmt.Println("clearing cache") c.Clear() printWarm("os") printWarm("sync") }
Output: cache warm for os: true cache warm for sync: true clearing cache cache warm for os: false cache warm for sync: false
Example (Delete) ¶
package main import ( "context" "fmt" "time" "gitlab.com/efronlicht/esync" "gitlab.com/efronlicht/esync/example" ) func main() { c := esync.NewCache(example.GoDoc) _, _, _ = c.Get(context.TODO(), "os", 5*time.Second) // this will fetch the value. _, _, loaded := c.Get(context.TODO(), "os", 5*time.Second) fmt.Printf("before delete: loaded=%v\n", loaded) // true c.Delete("os") _, _, loaded = c.Get(context.TODO(), "os", 5*time.Second) fmt.Printf("after delete: loaded=%v\n", loaded) // false }
Output: before delete: loaded=true after delete: loaded=false
Example (Entries) ¶
Iterate through all the entries in the cache.
package main import ( "context" "fmt" "time" "gitlab.com/efronlicht/esync" "gitlab.com/efronlicht/esync/example" ) func main() { c := esync.NewCache(example.GoDoc) c.Warm(context.TODO(), "os", "sync") t0 := time.Now() time.Sleep(50 * time.Millisecond) c.Warm(context.TODO(), "sync", "fmt") for k, entry := range c.Entries() { fmt.Printf("%s: expired: %v\n", k, entry.Stored.Before(t0)) } }
Output: sync: expired: false os: expired: true fmt: expired: false
Example (Get) ¶
Fetch and cache documentation from pkg.go.dev and cache it.
package main import ( "context" "fmt" "time" "gitlab.com/efronlicht/esync" "gitlab.com/efronlicht/esync/example" ) func main() { cache := esync.NewCache(example.GoDoc) printGet := func(key string, ttl time.Duration) { _, err, loaded := cache.Get(context.TODO(), key, ttl) fmt.Printf("key=%s, (err != nil) = %v, loaded=%v\n", key, err != nil, loaded) } printGet("sync", 5*time.Second) // the first call will fetch the documentation. printGet("sync", 5*time.Second) // the cache entry is still fresh, so this will return the documentation from the cache. time.Sleep(50 * time.Millisecond) // ask for a value fresher than the cache entry. printGet("sync", 25*time.Millisecond) // this will fetch the documentation again. // ask for a package we know can't be found. printGet("efronlicht/this-package-does-not-exist", 5*time.Second) // this will fail. // we also cache errors. printGet("efronlicht/this-package-does-not-exist", 5*time.Second) // this will return the error from the cache. }
Output: key=sync, (err != nil) = false, loaded=false key=sync, (err != nil) = false, loaded=true key=sync, (err != nil) = false, loaded=false key=efronlicht/this-package-does-not-exist, (err != nil) = true, loaded=false key=efronlicht/this-package-does-not-exist, (err != nil) = true, loaded=true
Example (LiveEntries) ¶
package main import ( "context" "fmt" "time" "gitlab.com/efronlicht/esync" "gitlab.com/efronlicht/esync/example" ) func main() { c := esync.NewCache(example.GoDoc) c.Warm(context.TODO(), "os", "sync") t0 := time.Now() time.Sleep(50 * time.Millisecond) c.Warm(context.TODO(), "sync", "fmt") for k := range c.LiveEntries(t0) { fmt.Printf("%s\n", k) } }
Output: fmt sync
Example (StartGC) ¶
package main import ( "context" "fmt" "time" "gitlab.com/efronlicht/esync" "gitlab.com/efronlicht/esync/example" ) func main() { cache := esync.NewCache(example.GoDoc) ticker := time.NewTicker(20 * time.Millisecond) ctx, cancel := context.WithCancel(context.Background()) const ttl = 100 * time.Millisecond go cache.StartGC(ctx, ticker.C, 100*time.Millisecond) check := func(key string) { _, _, _, loaded := cache.Check(key) if loaded { fmt.Println("hit") } else { fmt.Println("miss") } } check("os") // never fetched, so it should not be in the cache. cache.Warm(context.TODO(), "os") check("os") // os should be in the cache. time.Sleep(300 * time.Millisecond) check("os") // os should have been removed from the cache. cancel() // stop the gc. cache.Warm(context.TODO(), "os") // fetch the value again. check("os") // os should be in the cache. time.Sleep(300 * time.Millisecond) // check("os") // gc has stopped: os should still be in the cache }
Output: miss hit miss hit hit
func NewCache ¶
func NewCache[K comparable, V any](fetch func(context.Context, K) (V, error)) *Cache[K, V]
NewCache creates a new in-memory Cache that fetches values with the given function.
Example (FetchDocumentationViaHTTP) ¶
create a cache for documentation on go.pkg.dev.
package main import ( "context" "fmt" "io" "net/http" "strings" "gitlab.com/efronlicht/esync" ) func main() { cache := esync.NewCache(func(ctx context.Context, path string) (string, error) { r, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://pkg.go.dev/"+path, nil) if err != nil { return "", err } resp, err := http.DefaultClient.Do(r) if err != nil { return "", fmt.Errorf("GET %s: %w", r.URL, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return "", fmt.Errorf("GET %s: %s", r.URL, resp.Status) } var buf strings.Builder if n, err := io.Copy(&buf, resp.Body); err != nil { return "", fmt.Errorf("copying response: error after %d bytes: %w: partial body: %s", n, err, buf.String()) } return buf.String(), nil }) _ = cache }
Output:
func (*Cache[K, V]) Clear ¶
func (c *Cache[K, V]) Clear()
Clear deletes all the entries, resulting in an empty Cache.
See also: - Cache.Delete to delete a single entry. - Cache.GC to delete entries that were stored before a given deadline.
func (*Cache[K, V]) Delete ¶
func (c *Cache[K, V]) Delete(key K)
Delete the value for a key.
See also: - Cache.Clear to delete all the entries. - Cache.GC to delete entries that were stored before a given deadline. - Cache.StartGC to start a goroutine that periodically calls [GC].
func (*Cache[K, V]) Entries ¶
Iterate through all the entries in the cache that haven't been Cache.Delete. The entries are returned in an arbitrary order. This is not a consistent snapshot of the cache: entries may be added or removed concurrently.
func (*Cache[K, V]) GC ¶
GC() deletes all the entries that were stored before the given deadline as of the time GC checks that entry. If another goroutine stores a value after GC checks the entry, it may not ever be deleted.
func (*Cache[K, V]) Get ¶
func (c *Cache[K, V]) Get(ctx context.Context, key K, ttl time.Duration) (v V, err error, loaded bool)
Get the value for a key, fetching it if expired or missing. If the ttl <= 0, the value will be fetched regardless of its freshness. The returned bool is true if the value was loaded from the cache and false if it was freshly fetched.
func (*Cache[K, V]) LiveEntries ¶
LiveEntries returns an iterator over all the entries in the cache that were stored after the given deadline.
type Entry ¶
type Entry[V any] struct { Val V Err error Stored time.Time // contains filtered or unexported fields }
Entry stored in a cache, usually seen via iterating through Cache.Entries.
type Map ¶
type Map[K comparable, V any] struct { // contains filtered or unexported fields }
Map[K,V] is like a Go map[K]V but is safe for concurrent use by multiple goroutines without additional locking or coordination. Loads, stores, and deletes run in amortized constant time. That is, it is as the stdlib's sync.Map but for a single pair of key and value types.
The Map type is specialized. Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content.
The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate sync.Mutex or sync.RWMutex.
The zero Map is empty and ready for use. A Map must not be copied after first use.
In the terminology of the Go memory model, Map arranges that a write operation “synchronizes before” any read operation that observes the effect of the write, where read and write operations are defined as follows. Map.Load, Map.LoadAndDelete, Map.LoadOrStore, Map.Swap, Map.CompareAndSwap, and Map.CompareAndDelete are read operations; Map.Delete, Map.LoadAndDelete, Map.Store, and Map.Swap are write operations; Map.LoadOrStore is a write operation when it returns loaded set to false; Map.CompareAndSwap is a write operation when it returns swapped set to true; and Map.CompareAndDelete is a write operation when it returns deleted set to true.
func (*Map[K, V]) Clear ¶
func (m *Map[K, V]) Clear()
Clear deletes all the entries, resulting in an empty Map.
func (*Map[K, V]) CompareAndDelete ¶
CompareAndDelete deletes the entry for key if its value is equal to old, returning true if the entry was deleted.
func (*Map[K, V]) CompareAndSwap ¶
CompareAndSwap swaps the old and new values for key if the value stored in the map is equal to old. The old value must be of a comparable type.
func (*Map[K, V]) KeyValues ¶
KeyValues iterates over the key-value pairs in the map. KeyValues does not necessarily correspond to any consistent snapshot of the Map's contents: no key will be visited more than once, but if the value for any key is stored or deleted concurrently, KeyValues may reflect any mapping for that key from any point during the lifetime of the returned iterator.
func (*Map[K, V]) Keys ¶
Keys iterates over the key-value pairs in the map. Keys does not necessarily correspond to any consistent snapshot of the Map's contents: no key will be visited more than once, but if the value for any key is stored or deleted concurrently, Keys may reflect any mapping for that key from any point during the lifetime of the returned iterator.
func (*Map[K, V]) Load ¶
Load returns the value stored in the map for a key, or the zero value if no value is present. The ok result indicates whether value was found in the map.
func (*Map[K, V]) LoadAndDelete ¶
LoadAndDelete deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.
func (*Map[K, V]) LoadOrStore ¶
LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.
func (*Map[K, V]) Swap ¶
Swap swaps the value for a key and returns the previous value if any. The loaded result reports whether the key was present.
type Pool ¶
type Pool[T any] struct { // contains filtered or unexported fields }
A Pool is a set of temporary objects that may be individually saved and retrieved (aka, a type-safe wapper for sync.Pool).
Any item stored in the Pool may be removed automatically at any time without notification. If the Pool holds the only reference when this happens, the item might be deallocated.
A Pool is safe for use by multiple goroutines simultaneously.
Pool's purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists. An appropriate use of a Pool is to manage a group of temporary items silently shared among and potentially reused by concurrent independent clients of a package. Pool provides a way to amortize allocation overhead across many clients.
An example of good use of a Pool is in the fmt package, which maintains a dynamically-sized store of temporary output buffers. The store scales under load (when many goroutines are actively printing) and shrinks when quiescent.
On the other hand, a free list maintained as part of a short-lived object is not a suitable use for a Pool, since the overhead does not amortize well in that scenario. It is more efficient to have such objects implement their own free list.
A Pool must not be copied after first use.
In the terminology of the Go memory model, a call to Put(x) “synchronizes before” a call to Pool.Get returning that same value x. Similarly, a call to New returning x “synchronizes before” a call to Get returning that same value x.
Example ¶
You can use a pool to reduce memory allocations when you need temporary buffers for I/O operations or formatting.
package main import ( "bytes" "gitlab.com/efronlicht/esync" ) func main() { const kib = 1024 pool := esync.NewPool(func() *bytes.Buffer { return bytes.NewBuffer(make([]byte, 0, 16*kib)) }) // new buffers will start pre-allocated with 16 KiB of capacity. // Get a buffer from the pool. buf := pool.Get() defer pool.Put(buf) // return the buffer to the pool when done. /* do computation with buf */ }
Output:
func NewPool ¶
NewPool creates a new Pool[T] with construction function newT. NewT should return a new instance of T.
func (*Pool[T]) Get ¶
func (p *Pool[T]) Get() *T
Get selects an arbitrary item from the Pool, removes it from the Pool, and returns it to the caller. Get may choose to ignore the pool and treat it as empty. Callers should not assume any relation between values passed to Pool.Put and the values returned by Get. If Get would otherwise return nil and p.New is non-nil, Get returns the result of calling p.New.