esync

v0.1.0
Warning

This package is not in the latest version of its module.

Published: Nov 19, 2024 License: BSD-3-Clause Imports: 10 Imported by: 0

Documentation

Overview

Package esync provides synchronization primitives and utilities similar to the stdlib's sync package to help with concurrent and parallel programming.

It is extensively documented: see the many examples in the documentation for the package and its types.

Summary:

  • Use Map for a type-safe and concurrency-safe map.
  • Use Cache to create a concurrent cache where the user can control invalidation.
  • Use ForkJoin and ForkJoin2 to parallelize work across multiple goroutines.
  • Adjust ForkMaxProcs to control the number of goroutines created by ForkJoin and ForkJoin2.

Iterators

Much of the code and documentation here is copied from the sync package, Copyright 2016 The Go Authors. Use of that source code is governed by a BSD-style license that can be found in the gosrc.license file included with this package.

fork.go contains functions for parallel computation via forking and joining goroutines.

map.go has the Map type and its methods.

pool.go has the Pool and BytePool types and associated methods.

Constants

This section is empty.

Variables

var ForkMaxProcs atomic.Int64

ForkMaxProcs is the maximum number of goroutines that can run concurrently per call to ForkJoin or ForkJoin2.

Defaults to runtime.GOMAXPROCS(0). Set it to an arbitrarily high number to remove the limit. To control concurrency per call instead, use the ForkJoinMaxWorkers and ForkJoin2MaxWorkers functions.
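To make the default and the override concrete, here is a minimal stdlib-only sketch of how a package-level atomic.Int64 can default to runtime.GOMAXPROCS(0) and cap the workers of a fork-join call. The names forkMaxProcs and effectiveWorkers are hypothetical stand-ins, not esync's code; with the real package you would simply call esync.ForkMaxProcs.Store(n).

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// forkMaxProcs is a hypothetical stand-in for esync.ForkMaxProcs.
var forkMaxProcs atomic.Int64

func init() {
	// Default to the number of OS threads that may execute Go code simultaneously.
	forkMaxProcs.Store(int64(runtime.GOMAXPROCS(0)))
}

// effectiveWorkers returns how many goroutines a fork-join call over n keys
// would run at once: never more than the limit, never more than the keys.
func effectiveWorkers(n int) int {
	limit := int(forkMaxProcs.Load())
	if limit < 1 {
		limit = 1 // this sketch treats a non-positive limit as 1
	}
	if n < limit {
		return n
	}
	return limit
}

func main() {
	fmt.Println(effectiveWorkers(1_000_000) == runtime.GOMAXPROCS(0)) // true
	forkMaxProcs.Store(4)                                  // tighten the limit for every later call
	fmt.Println(effectiveWorkers(10), effectiveWorkers(2)) // 4 2
}
```

Because the limit is read atomically at the start of each call, changing it affects only calls that begin afterwards. Clamping a non-positive value to 1 is a choice of this sketch.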

Functions

func ForkJoin

func ForkJoin[K, V any](ctx context.Context, f func(ctx context.Context, k K) V, keys ...K) []V

ForkJoin creates a goroutine for each key in keys, calling f with each key and returning the values in the same order as the keys. At most ForkMaxProcs goroutines run concurrently.

See also:

Example (Fib)
package main

import (
	"context"
	"fmt"

	"gitlab.com/efronlicht/esync"
	"gitlab.com/efronlicht/esync/example"
)

func main() {
	keys := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
	// Compute each Fibonacci number on its own goroutine.
	// ForkJoin returns the results in the same order as keys.
	// (example.Fib uses the convention Fib(0) = Fib(1) = 1.)
	fibs := esync.ForkJoin(context.TODO(), example.Fib, keys...)
	for i, v := range fibs {
		fmt.Printf("Fib(%d) = %d\n", keys[i], v)
	}
}
Output:

Fib(0) = 1
Fib(1) = 1
Fib(2) = 2
Fib(3) = 3
Fib(4) = 5
Fib(5) = 8
Fib(6) = 13
Fib(7) = 21
Fib(8) = 34
Fib(9) = 55

func ForkJoin2

func ForkJoin2[K, V, E any](ctx context.Context, f func(ctx context.Context, k K) (V, E), keys ...K) ([]V, []E)

ForkJoin2 creates a goroutine for each key in keys, calling f with each key and returning the values in the same order as the provided keys. At most ForkMaxProcs goroutines run concurrently.

See also:

Example (GoDoc)
package main

import (
	"context"
	"fmt"

	"gitlab.com/efronlicht/esync"
	"gitlab.com/efronlicht/esync/example"
)

func main() {
	keys := []string{"os", "sync", "fmt", "fakeKey"}
	// spin up a goroutine for each key to fetch the documentation.
	vals, errs := esync.ForkJoin2(context.TODO(), example.GoDoc, keys...)
	const format = "%-10v %-10v %-5v\n"
	fmt.Printf(format, "key", "len", "error")
	fmt.Printf(format, "===", "===", "=====")
	for i := range keys {
		fmt.Printf(format, keys[i], len(vals[i]), errs[i] != nil)
	}
}
Output:

key        len        error
===        ===        =====
os         4604       false
sync       743        false
fmt        16333      false
fakeKey    0          true

func ForkJoin2MaxWorkers

func ForkJoin2MaxWorkers[K, V, E any](ctx context.Context, f func(ctx context.Context, k K) (V, E), maxWorkers int, keys ...K) ([]V, []E)

ForkJoin2MaxWorkers creates a goroutine for each key in keys, calling f with each key and returning the values in the same order as the provided keys. It uses up to maxWorkers goroutines, rather than inferring the limit from ForkMaxProcs.

See also:

func ForkJoinMaxWorkers

func ForkJoinMaxWorkers[K, V any](ctx context.Context, f func(ctx context.Context, k K) V, maxWorkers int, keys ...K) []V

ForkJoinMaxWorkers is like ForkJoin, but you control the maximum number of workers with maxWorkers rather than inferring it from ForkMaxProcs. See also:

Types

type BytePool

type BytePool struct {
	// contains filtered or unexported fields
}

BytePool is a specialized Pool for []byte, which comes up often enough to warrant special treatment.

func NewBytePool

func NewBytePool(size int) *BytePool

NewBytePool creates a new BytePool which allocates []byte slices starting at the given size.

func (*BytePool) Get

func (bp *BytePool) Get() []byte

Get returns a byte slice, possibly reusing a previously allocated slice. The slice always has length 0 and an unspecified capacity, so the caller should append to it rather than index into it. Return the slice to the pool with BytePool.Put when done.

func (*BytePool) Put

func (bp *BytePool) Put(b []byte)

Put returns a byte slice to the pool, making it available for reuse.
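A minimal sketch of a byte-slice pool on top of sync.Pool, assuming behavior like the description above (zero-length slices of unspecified capacity). bytePool and newBytePool are hypothetical names; the real BytePool may be implemented differently.

```go
package main

import (
	"fmt"
	"sync"
)

// bytePool is a hypothetical sketch in the spirit of esync.BytePool.
// sync.Pool stores values as any; this sketch stores *[]byte.
type bytePool struct {
	p sync.Pool
}

func newBytePool(size int) *bytePool {
	bp := &bytePool{}
	bp.p.New = func() any {
		b := make([]byte, 0, size) // fresh slices start with capacity size
		return &b
	}
	return bp
}

// Get returns a zero-length slice; its capacity depends on what was Put earlier.
func (bp *bytePool) Get() []byte {
	return (*bp.p.Get().(*[]byte))[:0]
}

// Put makes b available for reuse; the caller must not use b afterwards.
func (bp *bytePool) Put(b []byte) {
	b = b[:0]
	bp.p.Put(&b)
}

func main() {
	pool := newBytePool(64)
	buf := pool.Get()
	buf = append(buf, "hello, "...)
	buf = append(buf, "world"...)
	fmt.Println(string(buf), len(pool.Get()) == 0) // hello, world true
	pool.Put(buf)
}
```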

type Cache

type Cache[K comparable, V any] struct {
	// contains filtered or unexported fields
}

Cache is a concurrent in-memory cache. Use it to store the results of expensive computations.

Basic Usage

  • NewCache creates a new cache.
  • Cache.Get(ctx, key, ttl) will return a cached value if it exists and is not expired; otherwise, it will fetch a new value.
  • Cache.Check(key) will return the value without fetching a new one.

Memory usage

By default, this cache only grows and never shrinks.

  • Run Cache.StartGC in its own goroutine to periodically call Cache.GC and delete old entries.
  • Use Cache.Delete to delete a single entry, or Cache.Clear to delete all entries.
Example (Check)
package main

import (
	"context"
	"fmt"
	"math"

	"gitlab.com/efronlicht/esync"
)

func main() {
	realsqrt := func(_ context.Context, x float64) (float64, error) {
		if x < 0 || math.IsNaN(x) {
			return 0, fmt.Errorf("expected x >= 0 and not NaN, got %v", x)
		}
		return math.Sqrt(x), nil
	}

	cache := esync.NewCache(realsqrt)
	checkAndPrint := func(k float64) {
		val, err, _, loaded := cache.Check(k)
		if err != nil {
			fmt.Printf("k=%v: error: %v: loaded: %v\n", k, err, loaded)
			return
		}
		fmt.Printf("k=%v: ok: %.3f: loaded: %v\n", k, val, loaded)
	}
	checkAndPrint(2)              // no value yet.
	cache.Warm(context.TODO(), 2) // warm the cache.
	checkAndPrint(2)              // now we have the value.

	checkAndPrint(-1)              // Check never fetches, so there is no value yet.
	cache.Warm(context.TODO(), -1) // warm the cache: realsqrt fails for -1.
	checkAndPrint(-1)              // the error is cached like any other result.
}
Output:

k=2: ok: 0.000: loaded: false
k=2: ok: 1.414: loaded: true
k=-1: ok: 0.000: loaded: false
k=-1: error: expected x >= 0 and not NaN, got -1: loaded: true
Example (Clear)
package main

import (
	"context"
	"fmt"
	"time"

	"gitlab.com/efronlicht/esync"
	"gitlab.com/efronlicht/esync/example"
)

func main() {
	c := esync.NewCache(example.GoDoc)
	_, _, _ = c.Get(context.TODO(), "os", 5*time.Second) // this will fetch the value.
	_, _, _ = c.Get(context.TODO(), "sync", 5*time.Second)
	printWarm := func(key string) {
		_, _, _, loaded := c.Check(key)
		fmt.Printf("cache warm for %s: %v\n", key, loaded)
	}
	printWarm("os")
	printWarm("sync")
	fmt.Println("clearing cache")
	c.Clear()
	printWarm("os")
	printWarm("sync")

}
Output:

cache warm for os: true
cache warm for sync: true
clearing cache
cache warm for os: false
cache warm for sync: false
Example (Delete)
package main

import (
	"context"
	"fmt"
	"time"

	"gitlab.com/efronlicht/esync"
	"gitlab.com/efronlicht/esync/example"
)

func main() {
	c := esync.NewCache(example.GoDoc)
	_, _, _ = c.Get(context.TODO(), "os", 5*time.Second) // this will fetch the value.
	_, _, loaded := c.Get(context.TODO(), "os", 5*time.Second)
	fmt.Printf("before delete: loaded=%v\n", loaded) // true
	c.Delete("os")
	_, _, loaded = c.Get(context.TODO(), "os", 5*time.Second)
	fmt.Printf("after delete: loaded=%v\n", loaded) // false
}
Output:

before delete: loaded=true
after delete: loaded=false
Example (Entries)

Iterate through all the entries in the cache.

package main

import (
	"context"
	"fmt"
	"time"

	"gitlab.com/efronlicht/esync"
	"gitlab.com/efronlicht/esync/example"
)

func main() {
	c := esync.NewCache(example.GoDoc)
	c.Warm(context.TODO(), "os", "sync")
	t0 := time.Now()
	time.Sleep(50 * time.Millisecond)
	c.Warm(context.TODO(), "sync", "fmt")
	for k, entry := range c.Entries() {
		fmt.Printf("%s: expired: %v\n", k, entry.Stored.Before(t0))
	}
}
Unordered output:

sync: expired: false
os: expired: true
fmt: expired: false
Example (Get)

Fetch documentation from pkg.go.dev and cache it.

package main

import (
	"context"
	"fmt"
	"time"

	"gitlab.com/efronlicht/esync"
	"gitlab.com/efronlicht/esync/example"
)

func main() {
	cache := esync.NewCache(example.GoDoc)

	printGet := func(key string, ttl time.Duration) {
		_, err, loaded := cache.Get(context.TODO(), key, ttl)
		fmt.Printf("key=%s, (err != nil) = %v, loaded=%v\n", key, err != nil, loaded)
	}

	printGet("sync", 5*time.Second) // the first call will fetch the documentation.
	printGet("sync", 5*time.Second) // the cache entry is still fresh, so this will return the documentation from the cache.
	time.Sleep(50 * time.Millisecond)
	// ask for a value fresher than the cache entry.
	printGet("sync", 25*time.Millisecond) // this will fetch the documentation again.

	// ask for a package we know can't be found.
	printGet("efronlicht/this-package-does-not-exist", 5*time.Second) // this will fail.

	// we also cache errors.
	printGet("efronlicht/this-package-does-not-exist", 5*time.Second) // this will return the error from the cache.
}
Output:

key=sync, (err != nil) = false, loaded=false
key=sync, (err != nil) = false, loaded=true
key=sync, (err != nil) = false, loaded=false
key=efronlicht/this-package-does-not-exist, (err != nil) = true, loaded=false
key=efronlicht/this-package-does-not-exist, (err != nil) = true, loaded=true
Example (LiveEntries)
package main

import (
	"context"
	"fmt"
	"time"

	"gitlab.com/efronlicht/esync"
	"gitlab.com/efronlicht/esync/example"
)

func main() {
	c := esync.NewCache(example.GoDoc)
	c.Warm(context.TODO(), "os", "sync")
	t0 := time.Now()
	time.Sleep(50 * time.Millisecond)
	c.Warm(context.TODO(), "sync", "fmt")
	for k := range c.LiveEntries(t0) {
		fmt.Printf("%s\n", k)
	}
}
Unordered output:

fmt
sync
Example (StartGC)
package main

import (
	"context"
	"fmt"
	"time"

	"gitlab.com/efronlicht/esync"
	"gitlab.com/efronlicht/esync/example"
)

func main() {
	cache := esync.NewCache(example.GoDoc)
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()
	ctx, cancel := context.WithCancel(context.Background())
	const ttl = 100 * time.Millisecond
	go cache.StartGC(ctx, ticker.C, ttl) // run the gc on every tick of ticker.
	check := func(key string) {
		_, _, _, loaded := cache.Check(key)
		if loaded {
			fmt.Println("hit")
		} else {
			fmt.Println("miss")
		}
	}
	check("os") // never fetched, so it should not be in the cache.
	cache.Warm(context.TODO(), "os")
	check("os") // os should be in the cache.
	time.Sleep(300 * time.Millisecond)
	check("os")                        // os should have been removed from the cache.
	cancel()                           // stop the gc.
	cache.Warm(context.TODO(), "os")   // fetch the value again.
	check("os")                        // os should be in the cache.
	time.Sleep(300 * time.Millisecond) // wait well past the ttl.
	check("os")                        // the gc has stopped, so os should still be in the cache.
}
Output:

miss
hit
miss
hit
hit

func NewCache

func NewCache[K comparable, V any](fetch func(context.Context, K) (V, error)) *Cache[K, V]

NewCache creates a new in-memory Cache that fetches values with the given function.

Example (FetchDocumentationViaHTTP)

Create a cache for documentation on pkg.go.dev.

package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"

	"gitlab.com/efronlicht/esync"
)

func main() {
	cache := esync.NewCache(func(ctx context.Context, path string) (string, error) {
		r, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://pkg.go.dev/"+path, nil)
		if err != nil {
			return "", err
		}
		resp, err := http.DefaultClient.Do(r)
		if err != nil {
			return "", fmt.Errorf("GET %s: %w", r.URL, err)
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			return "", fmt.Errorf("GET %s: %s", r.URL, resp.Status)
		}
		var buf strings.Builder
		if n, err := io.Copy(&buf, resp.Body); err != nil {
			return "", fmt.Errorf("copying response: error after %d bytes: %w: partial body: %s", n, err, buf.String())
		}
		return buf.String(), nil
	})
	_ = cache
}
Output:

func (*Cache[K, V]) Check

func (c *Cache[K, V]) Check(key K) (val V, err error, stored time.Time, loaded bool)

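Check returns the cached value, error, and storage time for key without fetching a new value. loaded reports whether an entry for key was present.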

func (*Cache[K, V]) Clear

func (c *Cache[K, V]) Clear()

Clear deletes all the entries, resulting in an empty Cache.

See also:

  • Cache.Delete to delete a single entry.
  • Cache.GC to delete entries that were stored before a given deadline.

func (*Cache[K, V]) Delete

func (c *Cache[K, V]) Delete(key K)

Delete deletes the value for a key.

See also:

  • Cache.Clear to delete all the entries.
  • Cache.GC to delete entries that were stored before a given deadline.
  • Cache.StartGC to periodically call Cache.GC.

func (*Cache[K, V]) Entries

func (c *Cache[K, V]) Entries() iter.Seq2[K, *Entry[V]]

Entries iterates over all the entries in the cache that have not been deleted. The entries are returned in an arbitrary order. This is not a consistent snapshot of the cache: entries may be added or removed concurrently.

func (*Cache[K, V]) GC

func (c *Cache[K, V]) GC(deadline time.Time)

GC deletes every entry that was stored before the given deadline, as of the time GC checks that entry. If another goroutine stores a new value for a key after GC has checked it, that value is not deleted by this call.

func (*Cache[K, V]) Get

func (c *Cache[K, V]) Get(ctx context.Context, key K, ttl time.Duration) (v V, err error, loaded bool)

Get returns the value for key, fetching it if it is expired or missing. If ttl <= 0, the value is fetched regardless of its freshness. loaded is true if the value came from the cache and false if it was freshly fetched.

func (*Cache[K, V]) LiveEntries

func (c *Cache[K, V]) LiveEntries(deadline time.Time) iter.Seq2[K, *Entry[V]]

LiveEntries returns an iterator over all the entries in the cache that were stored after the given deadline.

func (*Cache[K, V]) StartGC

func (c *Cache[K, V]) StartGC(ctx context.Context, timing <-chan time.Time, ttl time.Duration)

StartGC calls Cache.GC each time a value arrives on timing, deleting entries stored more than ttl earlier. It returns when ctx is canceled. Because it blocks until then, you should almost always call it with the go keyword to run it in its own goroutine.

func (*Cache[K, V]) Warm

func (c *Cache[K, V]) Warm(ctx context.Context, keys ...K)

Warm fetches the given keys concurrently and caches the results, regardless of their freshness.

type Entry

type Entry[V any] struct {
	Val    V
	Err    error
	Stored time.Time
	// contains filtered or unexported fields
}

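Entry is a value stored in a Cache, together with the error returned when it was fetched and the time it was stored. Entries are usually seen by iterating through Cache.Entries.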

type Map

type Map[K comparable, V any] struct {
	// contains filtered or unexported fields
}

Map[K, V] is like a Go map[K]V but is safe for concurrent use by multiple goroutines without additional locking or coordination. Loads, stores, and deletes run in amortized constant time. In other words, it is like the stdlib's sync.Map, but specialized to a single key type and value type.
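A minimal sketch of what "sync.Map for a single pair of key and value types" means in practice: a generic wrapper that converts to and from any at the boundary. typedMap is a hypothetical name, not esync.Map's code.

```go
package main

import (
	"fmt"
	"sync"
)

// typedMap is a hypothetical type-safe wrapper around sync.Map.
// Like sync.Map, its zero value is empty and ready for use.
type typedMap[K comparable, V any] struct {
	m sync.Map
}

func (t *typedMap[K, V]) Store(k K, v V) { t.m.Store(k, v) }

// Load returns the zero V and false when k is absent. Note: if V is an
// interface type and a nil V was stored, v.(V) would panic; a real
// implementation has to handle that case.
func (t *typedMap[K, V]) Load(k K) (V, bool) {
	v, ok := t.m.Load(k)
	if !ok {
		var zero V
		return zero, false
	}
	return v.(V), true
}

func (t *typedMap[K, V]) LoadOrStore(k K, v V) (V, bool) {
	actual, loaded := t.m.LoadOrStore(k, v)
	return actual.(V), loaded
}

func main() {
	var m typedMap[string, int]
	m.Store("a", 1)
	v, ok := m.Load("a")
	fmt.Println(v, ok) // 1 true
	actual, loaded := m.LoadOrStore("a", 2)
	fmt.Println(actual, loaded) // 1 true
	_, ok = m.Load("missing")
	fmt.Println(ok) // false
}
```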

The Map type is specialized. Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content.

The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate sync.Mutex or sync.RWMutex.

The zero Map is empty and ready for use. A Map must not be copied after first use.

In the terminology of the Go memory model, Map arranges that a write operation “synchronizes before” any read operation that observes the effect of the write, where read and write operations are defined as follows. Map.Load, Map.LoadAndDelete, Map.LoadOrStore, Map.Swap, Map.CompareAndSwap, and Map.CompareAndDelete are read operations; Map.Delete, Map.LoadAndDelete, Map.Store, and Map.Swap are write operations; Map.LoadOrStore is a write operation when it returns loaded set to false; Map.CompareAndSwap is a write operation when it returns swapped set to true; and Map.CompareAndDelete is a write operation when it returns deleted set to true.

func (*Map[K, V]) Clear

func (m *Map[K, V]) Clear()

Clear deletes all the entries, resulting in an empty Map.

func (*Map[K, V]) CompareAndDelete

func (m *Map[K, V]) CompareAndDelete(key K, value V) (deleted bool)

CompareAndDelete deletes the entry for key if its value is equal to value, returning true if the entry was deleted.

func (*Map[K, V]) CompareAndSwap

func (m *Map[K, V]) CompareAndSwap(key K, old, new V) (swapped bool)

CompareAndSwap swaps the old and new values for key if the value stored in the map is equal to old. The old value must be of a comparable type.

func (*Map[K, V]) Delete

func (m *Map[K, V]) Delete(key K)

Delete deletes the value for a key.

func (*Map[K, V]) KeyValues

func (m *Map[K, V]) KeyValues() iter.Seq2[K, V]

KeyValues iterates over the key-value pairs in the map. KeyValues does not necessarily correspond to any consistent snapshot of the Map's contents: no key will be visited more than once, but if the value for any key is stored or deleted concurrently, KeyValues may reflect any mapping for that key from any point during the lifetime of the returned iterator.

func (*Map[K, V]) Keys

func (m *Map[K, V]) Keys() iter.Seq[K]

Keys iterates over the keys in the map. Keys does not necessarily correspond to any consistent snapshot of the Map's contents: no key will be visited more than once, but keys that are stored or deleted concurrently may or may not be visited.

func (*Map[K, V]) Load

func (m *Map[K, V]) Load(key K) (value V, ok bool)

Load returns the value stored in the map for a key, or the zero value if no value is present. The ok result indicates whether value was found in the map.

func (*Map[K, V]) LoadAndDelete

func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool)

LoadAndDelete deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.

func (*Map[K, V]) LoadOrStore

func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool)

LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.

func (*Map[K, V]) Range

func (m *Map[K, V]) Range(f func(key K, value V) bool)

func (*Map[K, V]) Store

func (m *Map[K, V]) Store(key K, value V)

Store sets the value for a key.

func (*Map[K, V]) Swap

func (m *Map[K, V]) Swap(key K, value V) (prev V, loaded bool)

Swap swaps the value for a key and returns the previous value if any. The loaded result reports whether the key was present.

func (*Map[K, V]) Values

func (m *Map[K, V]) Values() iter.Seq[V]

Values iterates over the values in the map. Values does not necessarily correspond to any consistent snapshot of the Map's contents: if the value for any key is stored or deleted concurrently, Values may reflect any mapping for that key from any point during the lifetime of the returned iterator.

type Pool

type Pool[T any] struct {
	// contains filtered or unexported fields
}

A Pool is a set of temporary objects that may be individually saved and retrieved (that is, a type-safe wrapper for sync.Pool).

Any item stored in the Pool may be removed automatically at any time without notification. If the Pool holds the only reference when this happens, the item might be deallocated.

A Pool is safe for use by multiple goroutines simultaneously.

Pool's purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists. An appropriate use of a Pool is to manage a group of temporary items silently shared among and potentially reused by concurrent independent clients of a package. Pool provides a way to amortize allocation overhead across many clients.

An example of good use of a Pool is in the fmt package, which maintains a dynamically-sized store of temporary output buffers. The store scales under load (when many goroutines are actively printing) and shrinks when quiescent.

On the other hand, a free list maintained as part of a short-lived object is not a suitable use for a Pool, since the overhead does not amortize well in that scenario. It is more efficient to have such objects implement their own free list.

A Pool must not be copied after first use.

In the terminology of the Go memory model, a call to Put(x) “synchronizes before” a call to Pool.Get returning that same value x. Similarly, a call to New returning x “synchronizes before” a call to Get returning that same value x.

Example

You can use a pool to reduce memory allocations when you need temporary buffers for I/O operations or formatting.

package main

import (
	"bytes"

	"gitlab.com/efronlicht/esync"
)

func main() {
	const kib = 1024
	pool := esync.NewPool(func() *bytes.Buffer { return bytes.NewBuffer(make([]byte, 0, 16*kib)) }) // new buffers will start pre-allocated with 16 KiB of capacity.

	// Get a buffer from the pool.
	buf := pool.Get()
	buf.Reset()         // a reused buffer may still hold data from its previous user.
	defer pool.Put(buf) // return the buffer to the pool when done.
	/* do computation with buf */
}
Output:

func NewPool

func NewPool[T any](newT func() *T) *Pool[T]

NewPool creates a new Pool[T] with construction function newT, which should return a pointer to a new instance of T.
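A minimal sketch of a generic wrapper over sync.Pool in the spirit of NewPool: the construction function becomes sync.Pool.New, and Get asserts the result back to *T. pool and newPool are hypothetical names; esync.Pool may differ.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// pool is a hypothetical type-safe wrapper over sync.Pool.
type pool[T any] struct{ p sync.Pool }

func newPool[T any](newT func() *T) *pool[T] {
	pl := &pool[T]{}
	pl.p.New = func() any { return newT() } // called when the pool is empty
	return pl
}

func (pl *pool[T]) Get() *T { return pl.p.Get().(*T) }

func (pl *pool[T]) Put(x *T) { pl.p.Put(x) }

func main() {
	bufs := newPool(func() *bytes.Buffer { return new(bytes.Buffer) })
	b := bufs.Get()
	b.Reset() // a reused buffer may still hold old contents
	b.WriteString("hi")
	fmt.Println(b.String()) // hi
	bufs.Put(b)
}
```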

func (*Pool[T]) Get

func (p *Pool[T]) Get() *T

Get selects an arbitrary item from the Pool, removes it from the Pool, and returns it to the caller. Get may choose to ignore the pool and treat it as empty. Callers should not assume any relation between values passed to Pool.Put and the values returned by Get. If Get would otherwise return nil, it returns the result of calling newT, the function passed to NewPool.

func (*Pool[T]) Put

func (p *Pool[T]) Put(x *T)

Put adds x to the pool. Make sure x is not used after calling Put.

Directories

Path      Synopsis
example   Package example contains utility functions for the examples in gitlab.com/efronlicht/esync's documentation.
