local

package
v0.0.0-...-ecd600c
Published: Aug 22, 2025 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Constants

const (
	// BlockDeviceBackedReferenceLocationRecordSize is the size of a
	// single serialized ReferenceLocationRecord in bytes. In
	// serialized form, a ReferenceLocationRecord contains the following
	// fields:
	//
	// - Epoch ID                     4 bytes
	// - SHA256_V1 flat reference    35 bytes
	// - Hash table probing attempt   1 byte
	// - Object location              8 bytes
	// - Record checksum              8 bytes
	//                        Total: 56 bytes
	BlockDeviceBackedReferenceLocationRecordSize = 4 + object.SHA256V1FlatReferenceSizeBytes + 1 + 8 + 8
)
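The field layout listed in the comment above can be illustrated with a small round-trip sketch. Only the field order and sizes come from the comment; the `record` type, the little-endian byte order, and the FNV-1a checksum are assumptions made for illustration and are not claimed to match what the package actually writes to disk.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

const (
	// flatReferenceSizeBytes mirrors the 35 bytes listed in the comment above.
	flatReferenceSizeBytes = 35
	recordSizeBytes        = 4 + flatReferenceSizeBytes + 1 + 8 + 8
)

// record is a hypothetical in-memory form of a reference-location record.
type record struct {
	EpochID   uint32
	Reference [flatReferenceSizeBytes]byte
	Attempt   uint8
	Location  uint64
}

// checksum computes FNV-1a over the payload. The real checksum algorithm
// is not documented here; FNV-1a is only a stand-in.
func checksum(payload []byte) uint64 {
	h := fnv.New64a()
	h.Write(payload)
	return h.Sum64()
}

// marshalRecord lays the fields out in the documented order.
// Little-endian encoding is an assumption.
func marshalRecord(r record) [recordSizeBytes]byte {
	var b [recordSizeBytes]byte
	binary.LittleEndian.PutUint32(b[0:4], r.EpochID)
	copy(b[4:39], r.Reference[:])
	b[39] = r.Attempt
	binary.LittleEndian.PutUint64(b[40:48], r.Location)
	binary.LittleEndian.PutUint64(b[48:56], checksum(b[:48]))
	return b
}

// unmarshalRecord returns false if the stored checksum does not match.
func unmarshalRecord(b [recordSizeBytes]byte) (record, bool) {
	if binary.LittleEndian.Uint64(b[48:56]) != checksum(b[:48]) {
		return record{}, false
	}
	var r record
	r.EpochID = binary.LittleEndian.Uint32(b[0:4])
	copy(r.Reference[:], b[4:39])
	r.Attempt = b[39]
	r.Location = binary.LittleEndian.Uint64(b[40:48])
	return r, true
}

func main() {
	b := marshalRecord(record{EpochID: 7, Attempt: 2, Location: 4096})
	r, ok := unmarshalRecord(b)
	fmt.Println(len(b), r.EpochID, r.Attempt, r.Location, ok)
}
```

A fixed record size is what makes it possible to address records by index, and the trailing checksum lets a reader reject records that were only partially written.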

Variables

This section is empty.

Functions

func NewStore

func NewStore(
	lock *sync.RWMutex,
	referenceLocationMap lossymap.Map[object.FlatReference, uint64, EpochIDResolver],
	locationBlobMap LocationBlobMap,
	epochList EpochList,
) object.Store[object.FlatReference, struct{}]

NewStore creates an object store that uses locally connected disks as its backing store.

func NewStoreFromConfiguration

func NewStoreFromConfiguration(terminationGroup program.Group, configuration *configuration_pb.StoreConfiguration) (object.Store[object.FlatReference, struct{}], error)

NewStoreFromConfiguration creates a new local object store that uses the block devices and parameters specified in a Protobuf configuration message.

Types

type DataSyncer

type DataSyncer func() error

DataSyncer is a callback that PeriodicSyncer.ProcessLocationsChanged() invokes to request that the contents of objects are synchronized to disk.

Synchronizing these is a requirement to ensure that the ReferenceLocationMap does not reference objects that are only partially written.

type EpochIDResolver

type EpochIDResolver interface {
	GetEpochStateForEpochID(epochID uint32) (EpochState, bool)
	GetCurrentEpochState() (EpochState, uint32)
}

EpochIDResolver is used by implementations of ReferenceLocationRecordArray to determine whether any entries it stores still point to valid data in the location-blob map.

type EpochList

type EpochList interface {
	EpochIDResolver

	// Indicate that a write of an object to the location-blob map
	// ending at a given location has completed, and that a
	// reference-location map entry is about to be written.
	FinalizeWriteUpToLocation(location uint64) error

	// Indicate that an object ending at a given location was read
	// from the location-blob map, but that its contents were
	// invalid. This means that either data corruption has occurred,
	// or that the object got overwritten while being read.
	DiscardUpToLocation(location uint64)
}

EpochList is used by the local object store to manage the creation and lifetime of epochs. Epochs are assigned to reference-location map entries and act as a way of ensuring consistency between the reference-location map and location-blob map, even if unclean shutdowns are performed.

func NewVolatileEpochList

func NewVolatileEpochList(maximumLocationSpan uint64, randomNumberGenerator random.SingleThreadedGenerator) EpochList

NewVolatileEpochList creates a simple EpochList that does not support any persistence. This is sufficient for setups where losing data upon restart is acceptable (e.g., worker-level caches).

type EpochState

type EpochState struct {
	HashSeed        uint64
	MinimumLocation uint64
	MaximumLocation uint64
}

EpochState describes, for a given epoch, what the range of valid locations is. This allows implementations of ReferenceLocationRecordArray to suppress and overwrite entries that are no longer valid.

EpochState also contains a hash seed that may be used by ReferenceLocationRecordArray when computing hashes of entries. This is needed to ensure that entries belonging to previous unclean shutdowns are suppressed.

func (*EpochState) IsValidLocation

func (s *EpochState) IsValidLocation(location uint64, sizeBytes int) bool

IsValidLocation returns true if a given location and object size are valid within the current epoch. This is used to ensure that entries in the ReferenceLocationRecordArray are suppressed if they refer to locations on the block device that have in the meantime been overwritten.
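A range check of this kind could look like the sketch below. It assumes that a location is the starting offset of an object and that valid data occupies the half-open range from MinimumLocation to MaximumLocation; the package may define locations differently, so the `epochState` type and its method are illustrative only.

```go
package main

import "fmt"

// epochState is a hypothetical copy of the two location fields of
// EpochState. The hash seed is left out because it plays no role here.
type epochState struct {
	MinimumLocation uint64
	MaximumLocation uint64
}

// isValidLocation reports whether an object of sizeBytes starting at
// location lies entirely within [MinimumLocation, MaximumLocation).
// Treating location as a starting offset is an assumption.
func (s epochState) isValidLocation(location uint64, sizeBytes int) bool {
	if sizeBytes < 0 || location < s.MinimumLocation || location > s.MaximumLocation {
		return false
	}
	// Compare against the remaining space to avoid integer overflow.
	return uint64(sizeBytes) <= s.MaximumLocation-location
}

func main() {
	s := epochState{MinimumLocation: 100, MaximumLocation: 200}
	fmt.Println(s.isValidLocation(100, 50)) // fits inside the range
	fmt.Println(s.isValidLocation(180, 50)) // runs past MaximumLocation
	fmt.Println(s.isValidLocation(50, 10))  // starts before MinimumLocation
}
```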

type LocationBlobMap

type LocationBlobMap interface {
	Get(location uint64, sizeBytes int) ([]byte, error)
	Put([]byte) (uint64, error)
}

LocationBlobMap is a map of object location to its contents. Every time an object is stored, it is assigned a location. Locations are expected to increase monotonically. As storage space is only finite, implementations of LocationBlobMap are expected to behave like ring buffers. Once space is exhausted, objects with the lowest location values are expected to be overwritten.
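The ring-buffer behavior described above can be shown with a toy in-memory map. The `ringBlobMap` type is hypothetical and is not the package's in-memory implementation; it assumes that a location is the running byte offset at which an object starts, and it detects overwritten objects by comparing against the total number of bytes written.

```go
package main

import (
	"errors"
	"fmt"
)

// ringBlobMap is a toy location-blob map backed by a fixed-size buffer.
type ringBlobMap struct {
	data []byte
	next uint64 // location that the next Put will be assigned
}

func newRingBlobMap(sizeBytes int) *ringBlobMap {
	return &ringBlobMap{data: make([]byte, sizeBytes)}
}

// Put stores a blob at the next location, wrapping around the buffer and
// overwriting the oldest data once space is exhausted.
func (m *ringBlobMap) Put(blob []byte) (uint64, error) {
	if len(blob) > len(m.data) {
		return 0, errors.New("blob is larger than the buffer")
	}
	location := m.next
	for i, b := range blob {
		m.data[(location+uint64(i))%uint64(len(m.data))] = b
	}
	m.next += uint64(len(blob))
	return location, nil
}

// Get returns the blob at a location, or an error if it was never
// written or has since been overwritten.
func (m *ringBlobMap) Get(location uint64, sizeBytes int) ([]byte, error) {
	end := location + uint64(sizeBytes)
	if sizeBytes < 0 || end > m.next || m.next-location > uint64(len(m.data)) {
		return nil, errors.New("location not present or overwritten")
	}
	out := make([]byte, sizeBytes)
	for i := range out {
		out[i] = m.data[(location+uint64(i))%uint64(len(m.data))]
	}
	return out, nil
}

func main() {
	m := newRingBlobMap(8)
	first, _ := m.Put([]byte("hello"))
	second, _ := m.Put([]byte("world")) // wraps and overwrites part of "hello"

	blob, err := m.Get(second, 5)
	fmt.Println(string(blob), err)

	_, err = m.Get(first, 5)
	fmt.Println(err)
}
```

Because locations only ever increase, a stale location can be recognized without keeping per-object bookkeeping, which is the property that EpochState's location range relies on.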

func NewBlockDeviceBackedLocationBlobMap

func NewBlockDeviceBackedLocationBlobMap(blockDevice blockdevice.BlockDevice, sectorSizeBytes int, sectorCount int64, initialLocation uint64) LocationBlobMap

NewBlockDeviceBackedLocationBlobMap creates a location-blob map that is backed by a block device. This implementation is the one that is most suitable for production workloads.

func NewInMemoryLocationBlobMap

func NewInMemoryLocationBlobMap(sizeBytes int) LocationBlobMap

NewInMemoryLocationBlobMap creates a location-blob map that is backed by a simple fixed-size byte slice that resides in memory. This is unlikely to be useful for most production-worthy setups, as they need to store more data than fits in memory.

type PeriodicSyncer

type PeriodicSyncer struct {
	// contains filtered or unexported fields
}

PeriodicSyncer can be used to monitor PersistentEpochList for allocations for storing objects. When such events occur, the state of the PersistentEpochList is extracted and written to disk. This allows its contents to be recovered after a restart.

func NewPeriodicSyncer

func NewPeriodicSyncer(
	source PersistentStateSource,
	sourceLock *sync.RWMutex,
	store PersistentStateStore,
	clock clock.Clock,
	errorLogger util.ErrorLogger,
	errorRetryInterval,
	minimumEpochInterval time.Duration,
	referenceLocationMapHashInitialization uint64,
	dataSyncer DataSyncer,
) *PeriodicSyncer

NewPeriodicSyncer creates a new PeriodicSyncer according to the arguments provided.

func (*PeriodicSyncer) ProcessLocationsChanged

func (ps *PeriodicSyncer) ProcessLocationsChanged(ctx context.Context) bool

ProcessLocationsChanged waits for allocations of locations for storing object contents to be made against a PersistentEpochList. It causes data on the underlying block device to be synchronized after a certain amount of time, followed by updating the persistent state stored on disk.

This function must generally be called in a loop in a separate goroutine, so that the persistent state is updated continuously. The return value of this method denotes whether the caller must continue to call this method. When false, it indicates the provided context was cancelled, due to a shutdown being requested.

type PersistentEpochList

type PersistentEpochList struct {
	// contains filtered or unexported fields
}

func NewPersistentEpochList

func NewPersistentEpochList(
	maximumLocationSpan uint64,
	randomNumberGenerator random.SingleThreadedGenerator,
	minimumEpochID uint32,
	minimumLocation uint64,
	epochs []*pb.EpochState,
) *PersistentEpochList

func (*PersistentEpochList) DiscardUpToLocation

func (el *PersistentEpochList) DiscardUpToLocation(location uint64)

func (*PersistentEpochList) FinalizeWriteUpToLocation

func (el *PersistentEpochList) FinalizeWriteUpToLocation(location uint64) error

func (*PersistentEpochList) GetCurrentEpochState

func (el *PersistentEpochList) GetCurrentEpochState() (EpochState, uint32)

func (*PersistentEpochList) GetEpochStateForEpochID

func (el *PersistentEpochList) GetEpochStateForEpochID(epochID uint32) (EpochState, bool)

func (*PersistentEpochList) GetLocationsChangedWakeup

func (el *PersistentEpochList) GetLocationsChangedWakeup() <-chan struct{}

GetLocationsChangedWakeup returns a channel that triggers when there was data stored in the location-blob map since the last persistent state was written to disk.

func (*PersistentEpochList) GetPersistentState

func (el *PersistentEpochList) GetPersistentState() (uint32, uint64, []*pb.EpochState)

GetPersistentState returns information that needs to be persisted to disk to be able to restore the layout of the EpochList after a restart.

func (*PersistentEpochList) NotifySyncCompleted

func (el *PersistentEpochList) NotifySyncCompleted()

NotifySyncCompleted needs to be called right after the data on the storage medium underneath the LocationBlobMap is synchronized. This causes the next call to GetPersistentState() to return information on the newly synchronized data.

func (*PersistentEpochList) NotifySyncStarting

func (el *PersistentEpochList) NotifySyncStarting(isFinalSync bool)

NotifySyncStarting needs to be called right before the data on the storage medium underneath the LocationBlobMap is synchronized. This causes the epoch ID to be increased when the next blob is stored.

type PersistentStateSource

type PersistentStateSource interface {
	// GetLocationsChangedWakeup returns a channel that triggers if
	// allocations for storing object contents have been made
	// against an EpochList, or if data is being discarded. This can
	// be used by PeriodicSyncer to synchronize data to storage.
	// PeriodicSyncer may apply a short delay before actually
	// synchronizing data to perform some batching.
	//
	// This function must be called while holding a read lock on the
	// EpochList.
	GetLocationsChangedWakeup() <-chan struct{}

	// NotifySyncStarting instructs the EpochList that
	// PeriodicSyncer is about to synchronize data to storage.
	// Successive writes to the EpochList should use a new epoch ID,
	// as there is no guarantee their data is synchronized as part
	// of the current epoch.
	//
	// This function must be called while holding a write lock on
	// the EpochList.
	NotifySyncStarting(isFinalSync bool)

	// NotifySyncCompleted instructs the EpochList that the
	// synchronization performed after the last call to
	// NotifySyncStarting was successful.
	//
	// Future calls to GetPersistentState may now return information
	// about epochs that were created before the previous
	// NotifySyncStarting call.
	//
	// Calling this function may cause the next channel returned by
	// GetLocationsChangedWakeup to block once again.
	//
	// This function must be called while holding a write lock on
	// the EpochList.
	NotifySyncCompleted()

	// GetPersistentState returns information about all epochs that
	// are managed by the EpochList and have been synchronized to
	// storage successfully.
	//
	// This function must be called while holding a read lock on the
	// EpochList.
	GetPersistentState() (minimumEpochID uint32, minimumLocation uint64, epochs []*pb.EpochState)
}

PersistentStateSource is used by PeriodicSyncer to determine whether the persistent state file needs to be updated, and if so, which contents it needs to hold.

type PersistentStateStore

type PersistentStateStore interface {
	ReadPersistentState() (*pb.PersistentState, error)
	WritePersistentState(persistentState *pb.PersistentState) error
}

PersistentStateStore is used by PeriodicSyncer to write PersistentEpochList's state to disk. This state can be reloaded on startup to make it possible to access data that was written in the past.

func NewDirectoryBackedPersistentStateStore

func NewDirectoryBackedPersistentStateStore(directory filesystem.Directory) PersistentStateStore

NewDirectoryBackedPersistentStateStore creates a PersistentStateStore that writes PersistentState Protobuf messages to a file named "state" stored inside a filesystem.Directory.

type ReferenceLocationRecord

type ReferenceLocationRecord = lossymap.Record[object.FlatReference, uint64]

ReferenceLocationRecord is a type alias for the records stored in the reference-location map, which this storage backend implements as a lossy map. The alias is provided for readability.

type ReferenceLocationRecordArray

type ReferenceLocationRecordArray = lossymap.RecordArray[object.FlatReference, uint64, EpochIDResolver]

ReferenceLocationRecordArray is a type alias for the record array that backs the reference-location map, which this storage backend implements as a lossy map. The alias is provided for readability.

func NewBlockDeviceBackedReferenceLocationRecordArray

func NewBlockDeviceBackedReferenceLocationRecordArray(device blockdevice.BlockDevice) ReferenceLocationRecordArray

NewBlockDeviceBackedReferenceLocationRecordArray creates a persistent ReferenceLocationRecordArray. It works by using a block device as an array-like structure, writing serialized ReferenceLocationRecords next to each other.
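Treating a device as an array of fixed-size records amounts to computing a byte offset from a record index. The sketch below uses the 56-byte record size from BlockDeviceBackedReferenceLocationRecordSize and the standard io.ReaderAt and io.WriterAt interfaces in place of blockdevice.BlockDevice; `memDevice`, `putRecord`, and `getRecord` are hypothetical names.

```go
package main

import (
	"fmt"
	"io"
)

const recordSizeBytes = 56

// memDevice is an in-memory stand-in for a block device.
type memDevice struct{ data []byte }

func (d *memDevice) ReadAt(p []byte, off int64) (int, error) {
	if off < 0 || off+int64(len(p)) > int64(len(d.data)) {
		return 0, io.ErrUnexpectedEOF
	}
	return copy(p, d.data[off:]), nil
}

func (d *memDevice) WriteAt(p []byte, off int64) (int, error) {
	if off < 0 || off+int64(len(p)) > int64(len(d.data)) {
		return 0, io.ErrShortWrite
	}
	return copy(d.data[off:], p), nil
}

// putRecord writes a serialized record at the slot for the given index.
func putRecord(w io.WriterAt, index int, rec [recordSizeBytes]byte) error {
	_, err := w.WriteAt(rec[:], int64(index)*recordSizeBytes)
	return err
}

// getRecord reads the serialized record stored at the given index.
func getRecord(r io.ReaderAt, index int) ([recordSizeBytes]byte, error) {
	var rec [recordSizeBytes]byte
	_, err := r.ReadAt(rec[:], int64(index)*recordSizeBytes)
	return rec, err
}

func main() {
	// A device with room for exactly four records.
	device := &memDevice{data: make([]byte, 4*recordSizeBytes)}

	var rec [recordSizeBytes]byte
	rec[0] = 0xAB
	if err := putRecord(device, 2, rec); err != nil {
		fmt.Println("write failed:", err)
		return
	}

	got, err := getRecord(device, 2)
	fmt.Println(got[0] == 0xAB, err)

	// Index 4 lies past the end of the device.
	fmt.Println(putRecord(device, 4, rec) != nil)
}
```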

func NewInMemoryReferenceLocationRecordArray

func NewInMemoryReferenceLocationRecordArray(size int) ReferenceLocationRecordArray

NewInMemoryReferenceLocationRecordArray creates a ReferenceLocationRecordArray that stores its data in memory. This is sufficient for setups where persistence across restarts is not needed, either because the data store is only used for local caching (e.g., on workers), or because storage nodes use mirroring and the loss of a single replica is tolerated.

type ReferenceLocationRecordKey

type ReferenceLocationRecordKey = lossymap.RecordKey[object.FlatReference]

ReferenceLocationRecordKey is a type alias for the keys of records in the reference-location map, which this storage backend implements as a lossy map. The alias is provided for readability.
