Documentation
¶
Index ¶
- Constants
- Variables
- func DialWithTimeout(ctx context.Context, addr string) (net.Conn, error)
- func GenerateFsID(name string) string
- func GetConfigParser(format ConfigFormat) (koanf.Parser, error)
- func GetLogger() *logger
- func GetPrivateIpAddr() (string, error)
- func GetPublicIpAddr() (string, error)
- func InitLogger(debugMode bool, prettyLogs bool)
- func Mount(ctx context.Context, opts BlobFsSystemOpts) (func() error, <-chan error, *fuse.Server, error)
- func SHA1StringToUint64(hash string) (uint64, error)
- func ToSlice(v interface{}) []interface{}
- func ToStruct(m map[string]string, out interface{}) error
- func WithClientName(name string) func(*redis.UniversalOptions)
- type BlobCacheClient
- func (c *BlobCacheClient) Cleanup() error
- func (c *BlobCacheClient) GetContent(hash string, offset int64, length int64, opts struct{ ... }) ([]byte, error)
- func (c *BlobCacheClient) GetContentStream(hash string, offset int64, length int64, opts struct{ ... }) (chan []byte, error)
- func (c *BlobCacheClient) GetNearbyHosts() ([]*BlobCacheHost, error)
- func (c *BlobCacheClient) GetState() error
- func (c *BlobCacheClient) HostsAvailable() bool
- func (c *BlobCacheClient) IsCachedNearby(hash string) (bool, error)
- func (c *BlobCacheClient) IsPathCachedNearby(ctx context.Context, path string) bool
- func (c *BlobCacheClient) StoreContent(chunks chan []byte, hash string, opts struct{ ... }) (string, error)
- func (c *BlobCacheClient) StoreContentFromFUSE(source struct{ ... }, opts struct{ ... }) (string, error)
- func (c *BlobCacheClient) StoreContentFromS3(source struct{ ... }, opts struct{ ... }) (string, error)
- func (c *BlobCacheClient) WaitForHosts(timeout time.Duration) error
- type BlobCacheClientConfig
- type BlobCacheConfig
- type BlobCacheGlobalConfig
- type BlobCacheHost
- type BlobCacheMetadata
- func (m *BlobCacheMetadata) AddFsNodeChild(ctx context.Context, pid, id string) error
- func (m *BlobCacheMetadata) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
- func (m *BlobCacheMetadata) GetAvailableHosts(ctx context.Context, locality string, ...) ([]*BlobCacheHost, error)
- func (m *BlobCacheMetadata) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
- func (m *BlobCacheMetadata) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
- func (m *BlobCacheMetadata) GetHostIndex(ctx context.Context, locality string) ([]*BlobCacheHost, error)
- func (m *BlobCacheMetadata) RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (m *BlobCacheMetadata) RemoveClientLock(ctx context.Context, clientId, hash string) error
- func (m *BlobCacheMetadata) RemoveFsNode(ctx context.Context, id string) error
- func (m *BlobCacheMetadata) RemoveFsNodeChild(ctx context.Context, pid, id string) error
- func (m *BlobCacheMetadata) RemoveHostFromIndex(ctx context.Context, locality string, host *BlobCacheHost) error
- func (m *BlobCacheMetadata) RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (m *BlobCacheMetadata) SetClientLock(ctx context.Context, clientId, hash string) error
- func (m *BlobCacheMetadata) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
- func (m *BlobCacheMetadata) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
- func (m *BlobCacheMetadata) SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- type BlobCacheMetadataMode
- type BlobCacheServerConfig
- type BlobCacheServerMode
- type BlobFs
- type BlobFsConfig
- type BlobFsMetadata
- type BlobFsNode
- type BlobFsSystemOpts
- type CacheService
- func (cs *CacheService) AddFsNodeChild(ctx context.Context, req *proto.AddFsNodeChildRequest) (*proto.AddFsNodeChildResponse, error)
- func (cs *CacheService) AddHostToIndex(ctx context.Context, req *proto.AddHostToIndexRequest) (*proto.AddHostToIndexResponse, error)
- func (cs *CacheService) GetAvailableHosts(ctx context.Context, req *proto.GetAvailableHostsRequest) (*proto.GetAvailableHostsResponse, error)
- func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error)
- func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream proto.BlobCache_GetContentStreamServer) error
- func (cs *CacheService) GetFsNode(ctx context.Context, req *proto.GetFsNodeRequest) (*proto.GetFsNodeResponse, error)
- func (cs *CacheService) GetFsNodeChildren(ctx context.Context, req *proto.GetFsNodeChildrenRequest) (*proto.GetFsNodeChildrenResponse, error)
- func (cs *CacheService) GetRegionConfig(ctx context.Context, req *proto.GetRegionConfigRequest) (*proto.GetRegionConfigResponse, error)
- func (cs *CacheService) GetState(ctx context.Context, req *proto.GetStateRequest) (*proto.GetStateResponse, error)
- func (cs *CacheService) HasContent(ctx context.Context, req *proto.HasContentRequest) (*proto.HasContentResponse, error)
- func (cs *CacheService) HostKeepAlive()
- func (cs *CacheService) RefreshStoreFromContentLock(ctx context.Context, req *proto.RefreshStoreFromContentLockRequest) (*proto.RefreshStoreFromContentLockResponse, error)
- func (cs *CacheService) RemoveClientLock(ctx context.Context, req *proto.RemoveClientLockRequest) (*proto.RemoveClientLockResponse, error)
- func (cs *CacheService) RemoveFsNode(ctx context.Context, req *proto.RemoveFsNodeRequest) (*proto.RemoveFsNodeResponse, error)
- func (cs *CacheService) RemoveFsNodeChild(ctx context.Context, req *proto.RemoveFsNodeChildRequest) (*proto.RemoveFsNodeChildResponse, error)
- func (cs *CacheService) RemoveStoreFromContentLock(ctx context.Context, req *proto.RemoveStoreFromContentLockRequest) (*proto.RemoveStoreFromContentLockResponse, error)
- func (cs *CacheService) SetClientLock(ctx context.Context, req *proto.SetClientLockRequest) (*proto.SetClientLockResponse, error)
- func (cs *CacheService) SetFsNode(ctx context.Context, req *proto.SetFsNodeRequest) (*proto.SetFsNodeResponse, error)
- func (cs *CacheService) SetHostKeepAlive(ctx context.Context, req *proto.SetHostKeepAliveRequest) (*proto.SetHostKeepAliveResponse, error)
- func (cs *CacheService) SetStoreFromContentLock(ctx context.Context, req *proto.SetStoreFromContentLockRequest) (*proto.SetStoreFromContentLockResponse, error)
- func (cs *CacheService) StartServer(port uint) error
- func (cs *CacheService) StoreContent(stream proto.BlobCache_StoreContentServer) error
- func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.StoreContentFromSourceRequest) (*proto.StoreContentFromSourceResponse, error)
- func (cs *CacheService) StoreContentFromSourceWithLock(ctx context.Context, req *proto.StoreContentFromSourceRequest) (*proto.StoreContentFromSourceWithLockResponse, error)
- func (cs *CacheService) StoreContentInBlobFs(ctx context.Context, path string, hash string, size uint64) error
- type CacheServiceOpts
- type ClientOptions
- type ClientRequest
- type ClientRequestType
- type ConfigFormat
- type ConfigLoaderFunc
- type ConfigManager
- type ContentAddressableStorage
- func (cas *ContentAddressableStorage) Add(ctx context.Context, hash string, content []byte) error
- func (cas *ContentAddressableStorage) Cleanup()
- func (cas *ContentAddressableStorage) Exists(hash string) bool
- func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst []byte) (int64, error)
- type CoordinatorClient
- type CoordinatorClientLocal
- func (c *CoordinatorClientLocal) AddFsNodeChild(ctx context.Context, pid, id string) error
- func (c *CoordinatorClientLocal) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
- func (c *CoordinatorClientLocal) GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)
- func (c *CoordinatorClientLocal) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
- func (c *CoordinatorClientLocal) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
- func (c *CoordinatorClientLocal) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)
- func (c *CoordinatorClientLocal) RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (c *CoordinatorClientLocal) RemoveClientLock(ctx context.Context, hash string, host string) error
- func (c *CoordinatorClientLocal) RemoveFsNode(ctx context.Context, id string) error
- func (c *CoordinatorClientLocal) RemoveFsNodeChild(ctx context.Context, pid, id string) error
- func (c *CoordinatorClientLocal) RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (c *CoordinatorClientLocal) SetClientLock(ctx context.Context, hash string, host string) error
- func (c *CoordinatorClientLocal) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
- func (c *CoordinatorClientLocal) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
- func (c *CoordinatorClientLocal) SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- type CoordinatorClientRemote
- func (c *CoordinatorClientRemote) AddFsNodeChild(ctx context.Context, pid, id string) error
- func (c *CoordinatorClientRemote) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
- func (c *CoordinatorClientRemote) GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)
- func (c *CoordinatorClientRemote) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
- func (c *CoordinatorClientRemote) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
- func (c *CoordinatorClientRemote) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)
- func (c *CoordinatorClientRemote) RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (c *CoordinatorClientRemote) RemoveClientLock(ctx context.Context, hash string, hostId string) error
- func (c *CoordinatorClientRemote) RemoveFsNode(ctx context.Context, id string) error
- func (c *CoordinatorClientRemote) RemoveFsNodeChild(ctx context.Context, pid, id string) error
- func (c *CoordinatorClientRemote) RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (c *CoordinatorClientRemote) SetClientLock(ctx context.Context, hash string, hostId string) error
- func (c *CoordinatorClientRemote) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
- func (c *CoordinatorClientRemote) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
- func (c *CoordinatorClientRemote) SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- type DiscoveryClient
- type ErrNodeNotFound
- type FSNode
- func (n *FSNode) Create(ctx context.Context, name string, flags uint32, mode uint32, ...) (inode *fs.Inode, fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno)
- func (n *FSNode) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno
- func (n *FSNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*fs.Inode, syscall.Errno)
- func (n *FSNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse.EntryOut) (*fs.Inode, syscall.Errno)
- func (n *FSNode) OnAdd(ctx context.Context)
- func (n *FSNode) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno)
- func (n *FSNode) Opendir(ctx context.Context) syscall.Errno
- func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno)
- func (n *FSNode) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno)
- func (n *FSNode) Readlink(ctx context.Context) ([]byte, syscall.Errno)
- func (n *FSNode) Rename(ctx context.Context, oldName string, newParent fs.InodeEmbedder, ...) syscall.Errno
- func (n *FSNode) Rmdir(ctx context.Context, name string) syscall.Errno
- func (n *FSNode) Unlink(ctx context.Context, name string) syscall.Errno
- type FileSystem
- type FileSystemOpts
- type FileSystemStorage
- type HostMap
- func (hm *HostMap) Closest(timeout time.Duration) (*BlobCacheHost, error)
- func (hm *HostMap) ClosestWithCapacity(timeout time.Duration) (*BlobCacheHost, error)
- func (hm *HostMap) Get(hostId string) *BlobCacheHost
- func (hm *HostMap) GetAll() []*BlobCacheHost
- func (hm *HostMap) Members() mapset.Set[string]
- func (hm *HostMap) Remove(host *BlobCacheHost)
- func (hm *HostMap) Set(host *BlobCacheHost)
- type JuiceFSConfig
- type JuiceFsSource
- type MetadataConfig
- type MountPointConfig
- type MountPointSource
- type ParserFunc
- type RedisClient
- func (r *RedisClient) Keys(ctx context.Context, pattern string) ([]string, error)
- func (r *RedisClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (r *RedisClient) PSubscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error, func())
- func (r *RedisClient) Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd
- func (r *RedisClient) Scan(ctx context.Context, pattern string) ([]string, error)
- func (r *RedisClient) Subscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error)
- func (r *RedisClient) ToSlice(v interface{}) []interface{}
- func (r *RedisClient) ToStruct(m map[string]string, out interface{}) error
- type RedisConfig
- type RedisLock
- type RedisLockOption
- type RedisLockOptions
- type RedisMode
- type RegionConfig
- type RendezvousHasher
- type S3Client
- type S3SourceConfig
- type Source
- type SourceConfig
- type StorageLayer
- type ValkeyConfig
- type ValkeyExistingPrimary
Constants ¶
const ( SourceModeJuiceFS string = "juicefs" SourceModeMountPoint string = "mountpoint" )
const ( BlobCacheHostPrefix string = "blobcache-host" BlobCacheVersion string = "dev" )
Variables ¶
var ( ErrHostNotFound = errors.New("host not found") ErrUnableToReachHost = errors.New("unable to reach host") ErrInvalidHostVersion = errors.New("invalid host version") ErrContentNotFound = errors.New("content not found") ErrClientNotFound = errors.New("client not found") ErrCacheLockHeld = errors.New("cache lock held") ErrUnableToPopulateContent = errors.New("unable to populate content from original source") ErrBlobFsMountFailure = errors.New("failed to mount blobfs") ErrUnableToAcquireLock = errors.New("unable to acquire lock") )
var ( ErrChannelClosed = errors.New("redis: channel closed") ErrConnectionIssue = errors.New("redis: connection issue") ErrUnknownRedisMode = errors.New("redis: unknown mode") )
var (
Logger *logger
)
var MetadataKeys = &metadataKeys{}
Functions ¶
func GenerateFsID ¶
GenerateFsID generates a filesystem node ID from the given name. (The function takes only name; no separate parent ID is passed.)
func GetConfigParser ¶
func GetConfigParser(format ConfigFormat) (koanf.Parser, error)
func GetPrivateIpAddr ¶
func GetPublicIpAddr ¶
func InitLogger ¶
func SHA1StringToUint64 ¶
SHA1StringToUint64 converts the first 8 bytes of a SHA-1 hash string to a uint64
func ToSlice ¶
func ToSlice(v interface{}) []interface{}
ToSlice flattens a struct using its field tags so that it can be passed to HSet. Struct fields must carry the redis tag; fields without it are ignored.
func ToStruct ¶
ToStruct copies the result of HGetAll into the provided struct. If a field cannot be parsed, it is left at Go's zero value for its type. Struct fields must carry the redis tag; fields without it are ignored.
func WithClientName ¶
func WithClientName(name string) func(*redis.UniversalOptions)
Types ¶
type BlobCacheClient ¶
type BlobCacheClient struct {
// contains filtered or unexported fields
}
func NewBlobCacheClient ¶
func NewBlobCacheClient(ctx context.Context, cfg BlobCacheConfig) (*BlobCacheClient, error)
func (*BlobCacheClient) Cleanup ¶
func (c *BlobCacheClient) Cleanup() error
func (*BlobCacheClient) GetContent ¶
func (*BlobCacheClient) GetContentStream ¶
func (*BlobCacheClient) GetNearbyHosts ¶
func (c *BlobCacheClient) GetNearbyHosts() ([]*BlobCacheHost, error)
func (*BlobCacheClient) GetState ¶
func (c *BlobCacheClient) GetState() error
func (*BlobCacheClient) HostsAvailable ¶
func (c *BlobCacheClient) HostsAvailable() bool
func (*BlobCacheClient) IsCachedNearby ¶
func (c *BlobCacheClient) IsCachedNearby(hash string) (bool, error)
func (*BlobCacheClient) IsPathCachedNearby ¶
func (c *BlobCacheClient) IsPathCachedNearby(ctx context.Context, path string) bool
func (*BlobCacheClient) StoreContent ¶
func (*BlobCacheClient) StoreContentFromFUSE ¶
func (*BlobCacheClient) StoreContentFromS3 ¶
func (*BlobCacheClient) WaitForHosts ¶
func (c *BlobCacheClient) WaitForHosts(timeout time.Duration) error
type BlobCacheClientConfig ¶
type BlobCacheClientConfig struct { Token string `key:"token" json:"token"` MinRetryLengthBytes int64 `key:"minRetryLengthBytes" json:"min_retry_length_bytes"` MaxGetContentAttempts int `key:"maxGetContentAttempts" json:"max_get_content_attempts"` NTopHosts int `key:"nTopHosts" json:"n_top_hosts"` BlobFs BlobFsConfig `key:"blobfs" json:"blobfs"` }
type BlobCacheConfig ¶
type BlobCacheConfig struct { Server BlobCacheServerConfig `key:"server" json:"server"` Client BlobCacheClientConfig `key:"client" json:"client"` Global BlobCacheGlobalConfig `key:"global" json:"global"` }
type BlobCacheGlobalConfig ¶
type BlobCacheGlobalConfig struct { DefaultLocality string `key:"defaultLocality" json:"default_locality"` CoordinatorHost string `key:"coordinatorHost" json:"coordinator_host"` ServerPort uint `key:"serverPort" json:"server_port"` DiscoveryIntervalS int `key:"discoveryIntervalS" json:"discovery_interval_s"` RoundTripThresholdMilliseconds uint `key:"rttThresholdMilliseconds" json:"rtt_threshold_ms"` HostStorageCapacityThresholdPct float64 `key:"hostStorageCapacityThresholdPct" json:"host_storage_capacity_threshold_pct"` GRPCDialTimeoutS int `key:"grpcDialTimeoutS" json:"grpc_dial_timeout_s"` GRPCMessageSizeBytes int `key:"grpcMessageSizeBytes" json:"grpc_message_size_bytes"` DebugMode bool `key:"debugMode" json:"debug_mode"` PrettyLogs bool `key:"prettyLogs" json:"pretty_logs"` }
func (*BlobCacheGlobalConfig) GetLocality ¶
func (c *BlobCacheGlobalConfig) GetLocality() string
type BlobCacheHost ¶
type BlobCacheHost struct { RTT time.Duration `redis:"rtt" json:"rtt"` HostId string `redis:"host_id" json:"host_id"` Addr string `redis:"addr" json:"addr"` PrivateAddr string `redis:"private_addr" json:"private_addr"` CapacityUsagePct float64 `redis:"capacity_usage_pct" json:"capacity_usage_pct"` }
func (*BlobCacheHost) Bytes ¶
func (h *BlobCacheHost) Bytes() []byte
Bytes is needed for the rendezvous hasher.
func (*BlobCacheHost) ToProto ¶
func (h *BlobCacheHost) ToProto() *proto.BlobCacheHost
type BlobCacheMetadata ¶
type BlobCacheMetadata struct {
// contains filtered or unexported fields
}
func NewBlobCacheMetadata ¶
func NewBlobCacheMetadata(cfg MetadataConfig) (*BlobCacheMetadata, error)
func (*BlobCacheMetadata) AddFsNodeChild ¶
func (m *BlobCacheMetadata) AddFsNodeChild(ctx context.Context, pid, id string) error
func (*BlobCacheMetadata) AddHostToIndex ¶
func (m *BlobCacheMetadata) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
func (*BlobCacheMetadata) GetAvailableHosts ¶
func (m *BlobCacheMetadata) GetAvailableHosts(ctx context.Context, locality string, removeHostCallback func(host *BlobCacheHost)) ([]*BlobCacheHost, error)
func (*BlobCacheMetadata) GetFsNode ¶
func (m *BlobCacheMetadata) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
func (*BlobCacheMetadata) GetFsNodeChildren ¶
func (m *BlobCacheMetadata) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
func (*BlobCacheMetadata) GetHostIndex ¶
func (m *BlobCacheMetadata) GetHostIndex(ctx context.Context, locality string) ([]*BlobCacheHost, error)
func (*BlobCacheMetadata) RefreshStoreFromContentLock ¶
func (*BlobCacheMetadata) RemoveClientLock ¶
func (m *BlobCacheMetadata) RemoveClientLock(ctx context.Context, clientId, hash string) error
func (*BlobCacheMetadata) RemoveFsNode ¶
func (m *BlobCacheMetadata) RemoveFsNode(ctx context.Context, id string) error
func (*BlobCacheMetadata) RemoveFsNodeChild ¶
func (m *BlobCacheMetadata) RemoveFsNodeChild(ctx context.Context, pid, id string) error
func (*BlobCacheMetadata) RemoveHostFromIndex ¶
func (m *BlobCacheMetadata) RemoveHostFromIndex(ctx context.Context, locality string, host *BlobCacheHost) error
func (*BlobCacheMetadata) RemoveStoreFromContentLock ¶
func (*BlobCacheMetadata) SetClientLock ¶
func (m *BlobCacheMetadata) SetClientLock(ctx context.Context, clientId, hash string) error
func (*BlobCacheMetadata) SetFsNode ¶
func (m *BlobCacheMetadata) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
func (*BlobCacheMetadata) SetHostKeepAlive ¶
func (m *BlobCacheMetadata) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
func (*BlobCacheMetadata) SetStoreFromContentLock ¶
type BlobCacheMetadataMode ¶
type BlobCacheMetadataMode string
const ( BlobCacheMetadataModeDefault BlobCacheMetadataMode = "default" BlobCacheMetadataModeLocal BlobCacheMetadataMode = "local" )
type BlobCacheServerConfig ¶
type BlobCacheServerConfig struct { Mode BlobCacheServerMode `key:"mode" json:"mode"` DiskCacheDir string `key:"diskCacheDir" json:"disk_cache_dir"` DiskCacheMaxUsagePct float64 `key:"diskCacheMaxUsagePct" json:"disk_cache_max_usage_pct"` Token string `key:"token" json:"token"` PrettyLogs bool `key:"prettyLogs" json:"pretty_logs"` ObjectTtlS int `key:"objectTtlS" json:"object_ttl_s"` MaxCachePct int64 `key:"maxCachePct" json:"max_cache_pct"` PageSizeBytes int64 `key:"pageSizeBytes" json:"page_size_bytes"` Metadata MetadataConfig `key:"metadata" json:"metadata"` Sources []SourceConfig `key:"sources" json:"sources"` // Allows a coordinator to override a slave server's config for a specific locality/region Regions map[string]RegionConfig `key:"regions" json:"regions"` }
func BlobCacheServerConfigFromProto ¶
func BlobCacheServerConfigFromProto(protoConfig *proto.BlobCacheServerConfig) BlobCacheServerConfig
func (*BlobCacheServerConfig) ToProto ¶
func (c *BlobCacheServerConfig) ToProto() *proto.BlobCacheServerConfig
type BlobCacheServerMode ¶
type BlobCacheServerMode string
const ( BlobCacheServerModeCoordinator BlobCacheServerMode = "coordinator" BlobCacheServerModeSlave BlobCacheServerMode = "slave" )
type BlobFs ¶
type BlobFs struct { CoordinatorClient CoordinatorClient Client *BlobCacheClient Config BlobCacheClientConfig // contains filtered or unexported fields }
func NewFileSystem ¶
func NewFileSystem(ctx context.Context, opts BlobFsSystemOpts) (*BlobFs, error)
NewFileSystem initializes a new BlobFs with root metadata.
type BlobFsConfig ¶
type BlobFsConfig struct { Enabled bool `key:"enabled" json:"enabled"` MountPoint string `key:"mountPoint" json:"mount_point"` MaxBackgroundTasks int `key:"maxBackgroundTasks" json:"max_background_tasks"` MaxWriteKB int `key:"maxWriteKB" json:"max_write_kb"` MaxReadAheadKB int `key:"maxReadAheadKB" json:"max_read_ahead_kb"` DirectMount bool `key:"directMount" json:"direct_mount"` DirectIO bool `key:"directIO" json:"direct_io"` Options []string `key:"options" json:"options"` }
type BlobFsMetadata ¶
type BlobFsMetadata struct { PID string `redis:"pid" json:"pid"` ID string `redis:"id" json:"id"` Name string `redis:"name" json:"name"` Path string `redis:"path" json:"path"` Hash string `redis:"hash" json:"hash"` Ino uint64 `redis:"ino" json:"ino"` Size uint64 `redis:"size" json:"size"` Blocks uint64 `redis:"blocks" json:"blocks"` Atime uint64 `redis:"atime" json:"atime"` Mtime uint64 `redis:"mtime" json:"mtime"` Ctime uint64 `redis:"ctime" json:"ctime"` Atimensec uint32 `redis:"atimensec" json:"atimensec"` Mtimensec uint32 `redis:"mtimensec" json:"mtimensec"` Ctimensec uint32 `redis:"ctimensec" json:"ctimensec"` Mode uint32 `redis:"mode" json:"mode"` Nlink uint32 `redis:"nlink" json:"nlink"` Rdev uint32 `redis:"rdev" json:"rdev"` Blksize uint32 `redis:"blksize" json:"blksize"` Padding uint32 `redis:"padding" json:"padding"` Uid uint32 `redis:"uid" json:"uid"` Gid uint32 `redis:"gid" json:"gid"` Gen uint64 `redis:"gen" json:"gen"` }
func (*BlobFsMetadata) ToProto ¶
func (m *BlobFsMetadata) ToProto() *proto.BlobFsMetadata
type BlobFsNode ¶
type BlobFsSystemOpts ¶
type BlobFsSystemOpts struct { Verbose bool CoordinatorClient CoordinatorClient Config BlobCacheClientConfig Client *BlobCacheClient }
type CacheService ¶
type CacheService struct { proto.UnimplementedBlobCacheServer // contains filtered or unexported fields }
func NewCacheService ¶
func NewCacheService(ctx context.Context, cfg BlobCacheConfig, locality string) (*CacheService, error)
func (*CacheService) AddFsNodeChild ¶
func (cs *CacheService) AddFsNodeChild(ctx context.Context, req *proto.AddFsNodeChildRequest) (*proto.AddFsNodeChildResponse, error)
func (*CacheService) AddHostToIndex ¶
func (cs *CacheService) AddHostToIndex(ctx context.Context, req *proto.AddHostToIndexRequest) (*proto.AddHostToIndexResponse, error)
func (*CacheService) GetAvailableHosts ¶
func (cs *CacheService) GetAvailableHosts(ctx context.Context, req *proto.GetAvailableHostsRequest) (*proto.GetAvailableHostsResponse, error)
func (*CacheService) GetContent ¶
func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error)
func (*CacheService) GetContentStream ¶
func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream proto.BlobCache_GetContentStreamServer) error
func (*CacheService) GetFsNode ¶
func (cs *CacheService) GetFsNode(ctx context.Context, req *proto.GetFsNodeRequest) (*proto.GetFsNodeResponse, error)
func (*CacheService) GetFsNodeChildren ¶
func (cs *CacheService) GetFsNodeChildren(ctx context.Context, req *proto.GetFsNodeChildrenRequest) (*proto.GetFsNodeChildrenResponse, error)
func (*CacheService) GetRegionConfig ¶
func (cs *CacheService) GetRegionConfig(ctx context.Context, req *proto.GetRegionConfigRequest) (*proto.GetRegionConfigResponse, error)
func (*CacheService) GetState ¶
func (cs *CacheService) GetState(ctx context.Context, req *proto.GetStateRequest) (*proto.GetStateResponse, error)
func (*CacheService) HasContent ¶
func (cs *CacheService) HasContent(ctx context.Context, req *proto.HasContentRequest) (*proto.HasContentResponse, error)
func (*CacheService) HostKeepAlive ¶
func (cs *CacheService) HostKeepAlive()
func (*CacheService) RefreshStoreFromContentLock ¶
func (cs *CacheService) RefreshStoreFromContentLock(ctx context.Context, req *proto.RefreshStoreFromContentLockRequest) (*proto.RefreshStoreFromContentLockResponse, error)
func (*CacheService) RemoveClientLock ¶
func (cs *CacheService) RemoveClientLock(ctx context.Context, req *proto.RemoveClientLockRequest) (*proto.RemoveClientLockResponse, error)
func (*CacheService) RemoveFsNode ¶
func (cs *CacheService) RemoveFsNode(ctx context.Context, req *proto.RemoveFsNodeRequest) (*proto.RemoveFsNodeResponse, error)
func (*CacheService) RemoveFsNodeChild ¶
func (cs *CacheService) RemoveFsNodeChild(ctx context.Context, req *proto.RemoveFsNodeChildRequest) (*proto.RemoveFsNodeChildResponse, error)
func (*CacheService) RemoveStoreFromContentLock ¶
func (cs *CacheService) RemoveStoreFromContentLock(ctx context.Context, req *proto.RemoveStoreFromContentLockRequest) (*proto.RemoveStoreFromContentLockResponse, error)
func (*CacheService) SetClientLock ¶
func (cs *CacheService) SetClientLock(ctx context.Context, req *proto.SetClientLockRequest) (*proto.SetClientLockResponse, error)
func (*CacheService) SetFsNode ¶
func (cs *CacheService) SetFsNode(ctx context.Context, req *proto.SetFsNodeRequest) (*proto.SetFsNodeResponse, error)
func (*CacheService) SetHostKeepAlive ¶
func (cs *CacheService) SetHostKeepAlive(ctx context.Context, req *proto.SetHostKeepAliveRequest) (*proto.SetHostKeepAliveResponse, error)
func (*CacheService) SetStoreFromContentLock ¶
func (cs *CacheService) SetStoreFromContentLock(ctx context.Context, req *proto.SetStoreFromContentLockRequest) (*proto.SetStoreFromContentLockResponse, error)
func (*CacheService) StartServer ¶
func (cs *CacheService) StartServer(port uint) error
func (*CacheService) StoreContent ¶
func (cs *CacheService) StoreContent(stream proto.BlobCache_StoreContentServer) error
func (*CacheService) StoreContentFromSource ¶
func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.StoreContentFromSourceRequest) (*proto.StoreContentFromSourceResponse, error)
func (*CacheService) StoreContentFromSourceWithLock ¶
func (cs *CacheService) StoreContentFromSourceWithLock(ctx context.Context, req *proto.StoreContentFromSourceRequest) (*proto.StoreContentFromSourceWithLockResponse, error)
func (*CacheService) StoreContentInBlobFs ¶
type CacheServiceOpts ¶
type CacheServiceOpts struct {
Addr string
}
type ClientOptions ¶
type ClientOptions struct {
RoutingKey string
}
type ClientRequest ¶
type ClientRequest struct {
// contains filtered or unexported fields
}
type ClientRequestType ¶
type ClientRequestType int
const ( ClientRequestTypeStorage ClientRequestType = iota ClientRequestTypeRetrieval )
type ConfigFormat ¶
type ConfigFormat string
var ( JSONConfigFormat ConfigFormat = ".json" YAMLConfigFormat ConfigFormat = ".yaml" YMLConfigFormat ConfigFormat = ".yml" )
type ConfigLoaderFunc ¶
type ConfigLoaderFunc func(k *koanf.Koanf) error
ConfigLoaderFunc is a function type used to load configuration into a Koanf instance. It takes a Koanf pointer 'k' as a parameter and returns an error if the loading process encounters any issues.
type ConfigManager ¶
type ConfigManager[T any] struct { // contains filtered or unexported fields }
ConfigManager is a generic configuration manager that allows handling and manipulation of configuration data for various types. It includes a Koanf instance ('kf') for managing configuration settings.
func NewConfigManager ¶
func NewConfigManager[T any]() (*ConfigManager[T], error)
NewConfigManager creates a new instance of the ConfigManager[T] type for managing configuration of type 'T'. It initializes the ConfigManager with the specified 'T' type, loads a default configuration, and optionally loads a user configuration if the 'CONFIG_PATH' environment variable is provided. If debug mode is enabled, it prints the current configuration.
func (*ConfigManager[T]) GetConfig ¶
func (cm *ConfigManager[T]) GetConfig() T
GetConfig retrieves the current configuration of type 'T' from the ConfigManager. It unmarshals the configuration data and returns it. If any errors occur during unmarshaling, it logs a fatal error and exits the application.
func (*ConfigManager[T]) LoadConfig ¶
func (cm *ConfigManager[T]) LoadConfig(format ConfigFormat, provider koanf.Provider) error
LoadConfig loads configuration data from a given provider in the specified format into the ConfigManager. It obtains a parser for the format, and then loads the configuration data. If any errors occur during the loading process, they are returned as an error.
func (*ConfigManager[T]) Print ¶
func (cm *ConfigManager[T]) Print() string
Print returns a string representation of the current configuration state.
type ContentAddressableStorage ¶
type ContentAddressableStorage struct {
// contains filtered or unexported fields
}
func NewContentAddressableStorage ¶
func NewContentAddressableStorage(ctx context.Context, currentHost *BlobCacheHost, locality string, coordinator CoordinatorClient, config BlobCacheConfig) (*ContentAddressableStorage, error)
func (*ContentAddressableStorage) Cleanup ¶
func (cas *ContentAddressableStorage) Cleanup()
func (*ContentAddressableStorage) Exists ¶
func (cas *ContentAddressableStorage) Exists(hash string) bool
type CoordinatorClient ¶
type CoordinatorClient interface { AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error) SetClientLock(ctx context.Context, hash string, host string) error RemoveClientLock(ctx context.Context, hash string, host string) error SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error) RemoveFsNode(ctx context.Context, id string) error RemoveFsNodeChild(ctx context.Context, pid, id string) error GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error) AddFsNodeChild(ctx context.Context, pid, id string) error }
func NewCoordinatorClientLocal ¶
func NewCoordinatorClientLocal(globalConfig BlobCacheGlobalConfig, serverConfig BlobCacheServerConfig) (CoordinatorClient, error)
func NewCoordinatorClientRemote ¶
func NewCoordinatorClientRemote(cfg BlobCacheGlobalConfig, token string) (CoordinatorClient, error)
type CoordinatorClientLocal ¶
type CoordinatorClientLocal struct {
// contains filtered or unexported fields
}
func (*CoordinatorClientLocal) AddFsNodeChild ¶
func (c *CoordinatorClientLocal) AddFsNodeChild(ctx context.Context, pid, id string) error
func (*CoordinatorClientLocal) AddHostToIndex ¶
func (c *CoordinatorClientLocal) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
func (*CoordinatorClientLocal) GetAvailableHosts ¶
func (c *CoordinatorClientLocal) GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)
func (*CoordinatorClientLocal) GetFsNode ¶
func (c *CoordinatorClientLocal) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
func (*CoordinatorClientLocal) GetFsNodeChildren ¶
func (c *CoordinatorClientLocal) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
func (*CoordinatorClientLocal) GetRegionConfig ¶
func (c *CoordinatorClientLocal) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)
func (*CoordinatorClientLocal) RefreshStoreFromContentLock ¶
func (*CoordinatorClientLocal) RemoveClientLock ¶
func (*CoordinatorClientLocal) RemoveFsNode ¶
func (c *CoordinatorClientLocal) RemoveFsNode(ctx context.Context, id string) error
func (*CoordinatorClientLocal) RemoveFsNodeChild ¶
func (c *CoordinatorClientLocal) RemoveFsNodeChild(ctx context.Context, pid, id string) error
func (*CoordinatorClientLocal) RemoveStoreFromContentLock ¶
func (*CoordinatorClientLocal) SetClientLock ¶
func (*CoordinatorClientLocal) SetFsNode ¶
func (c *CoordinatorClientLocal) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
func (*CoordinatorClientLocal) SetHostKeepAlive ¶
func (c *CoordinatorClientLocal) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
func (*CoordinatorClientLocal) SetStoreFromContentLock ¶
type CoordinatorClientRemote ¶
type CoordinatorClientRemote struct {
// contains filtered or unexported fields
}
func (*CoordinatorClientRemote) AddFsNodeChild ¶
func (c *CoordinatorClientRemote) AddFsNodeChild(ctx context.Context, pid, id string) error
func (*CoordinatorClientRemote) AddHostToIndex ¶
func (c *CoordinatorClientRemote) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
func (*CoordinatorClientRemote) GetAvailableHosts ¶
func (c *CoordinatorClientRemote) GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)
func (*CoordinatorClientRemote) GetFsNode ¶
func (c *CoordinatorClientRemote) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
func (*CoordinatorClientRemote) GetFsNodeChildren ¶
func (c *CoordinatorClientRemote) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
func (*CoordinatorClientRemote) GetRegionConfig ¶
func (c *CoordinatorClientRemote) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)
func (*CoordinatorClientRemote) RefreshStoreFromContentLock ¶
func (*CoordinatorClientRemote) RemoveClientLock ¶
func (*CoordinatorClientRemote) RemoveFsNode ¶
func (c *CoordinatorClientRemote) RemoveFsNode(ctx context.Context, id string) error
func (*CoordinatorClientRemote) RemoveFsNodeChild ¶
func (c *CoordinatorClientRemote) RemoveFsNodeChild(ctx context.Context, pid, id string) error
func (*CoordinatorClientRemote) RemoveStoreFromContentLock ¶
func (*CoordinatorClientRemote) SetClientLock ¶
func (*CoordinatorClientRemote) SetFsNode ¶
func (c *CoordinatorClientRemote) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
func (*CoordinatorClientRemote) SetHostKeepAlive ¶
func (c *CoordinatorClientRemote) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
func (*CoordinatorClientRemote) SetStoreFromContentLock ¶
type DiscoveryClient ¶
type DiscoveryClient struct {
// contains filtered or unexported fields
}
func NewDiscoveryClient ¶
func NewDiscoveryClient(cfg BlobCacheGlobalConfig, hostMap *HostMap, coordinator CoordinatorClient, locality string) *DiscoveryClient
func (*DiscoveryClient) GetHostState ¶
func (d *DiscoveryClient) GetHostState(ctx context.Context, host *BlobCacheHost) (*BlobCacheHost, error)
GetHostState attempts to connect to the gRPC service and verifies its availability.
type ErrNodeNotFound ¶
type ErrNodeNotFound struct {
Id string
}
func (*ErrNodeNotFound) Error ¶
func (e *ErrNodeNotFound) Error() string
type FSNode ¶
type FileSystem ¶
type FileSystemOpts ¶
type FileSystemOpts struct { MountPoint string Verbose bool Metadata *BlobCacheMetadata }
BlobFS types
type FileSystemStorage ¶
type HostMap ¶
type HostMap struct {
// contains filtered or unexported fields
}
func NewHostMap ¶
func NewHostMap(cfg BlobCacheGlobalConfig, onHostAdded func(*BlobCacheHost) error) *HostMap
func (*HostMap) Closest ¶
func (hm *HostMap) Closest(timeout time.Duration) (*BlobCacheHost, error)
Closest finds the nearest host within a given timeout. If no hosts are found, it returns an error.
func (*HostMap) ClosestWithCapacity ¶
func (hm *HostMap) ClosestWithCapacity(timeout time.Duration) (*BlobCacheHost, error)
ClosestWithCapacity finds the nearest host with available storage capacity within a given timeout. If no hosts are found, it returns an error.
func (*HostMap) Get ¶
func (hm *HostMap) Get(hostId string) *BlobCacheHost
func (*HostMap) GetAll ¶
func (hm *HostMap) GetAll() []*BlobCacheHost
func (*HostMap) Remove ¶
func (hm *HostMap) Remove(host *BlobCacheHost)
func (*HostMap) Set ¶
func (hm *HostMap) Set(host *BlobCacheHost)
type JuiceFSConfig ¶
type JuiceFSConfig struct { RedisURI string `key:"redisURI" json:"redis_uri"` Bucket string `key:"bucket" json:"bucket"` AccessKey string `key:"accessKey" json:"access_key"` SecretKey string `key:"secretKey" json:"secret_key"` CacheSize int64 `key:"cacheSize" json:"cache_size"` BlockSize int64 `key:"blockSize" json:"block_size"` Prefetch int64 `key:"prefetch" json:"prefetch"` BufferSize int64 `key:"bufferSize" json:"buffer_size"` }
type JuiceFsSource ¶
type JuiceFsSource struct {
// contains filtered or unexported fields
}
func (*JuiceFsSource) Format ¶
func (s *JuiceFsSource) Format(fsName string) error
func (*JuiceFsSource) Mount ¶
func (s *JuiceFsSource) Mount(localPath string) error
func (*JuiceFsSource) Unmount ¶
func (s *JuiceFsSource) Unmount(localPath string) error
type MetadataConfig ¶
type MetadataConfig struct { Mode BlobCacheMetadataMode `key:"mode" json:"mode"` ValkeyConfig ValkeyConfig `key:"valkey" json:"valkey"` // Default config RedisAddr string `key:"redisAddr" json:"redis_addr"` RedisPasswd string `key:"redisPasswd" json:"redis_passwd"` RedisTLSEnabled bool `key:"redisTLSEnabled" json:"redis_tls_enabled"` RedisMode RedisMode `key:"redisMode" json:"redis_mode"` RedisMasterName string `key:"redisMasterName" json:"redis_master_name"` }
type MountPointConfig ¶
type MountPointConfig struct { BucketName string `key:"bucketName" json:"bucket_name"` AccessKey string `key:"accessKey" json:"access_key"` SecretKey string `key:"secretKey" json:"secret_key"` Region string `key:"region" json:"region"` EndpointURL string `key:"endpointUrl" json:"endpoint_url"` ForcePathStyle bool `key:"forcePathStyle" json:"force_path_style"` }
type MountPointSource ¶
type MountPointSource struct {
// contains filtered or unexported fields
}
func (*MountPointSource) Format ¶
func (s *MountPointSource) Format(fsName string) error
func (*MountPointSource) Mount ¶
func (s *MountPointSource) Mount(localPath string) error
func (*MountPointSource) Unmount ¶
func (s *MountPointSource) Unmount(localPath string) error
type ParserFunc ¶
type ParserFunc func() (koanf.Parser, error)
type RedisClient ¶
type RedisClient struct {
redis.UniversalClient
}
func NewRedisClient ¶
func NewRedisClient(config RedisConfig, options ...func(*redis.UniversalOptions)) (*RedisClient, error)
func (*RedisClient) Keys ¶
Gets all keys matching a pattern. Actually runs a SCAN, since KEYS locks up the database.
func (*RedisClient) PSubscribe ¶
func (r *RedisClient) PSubscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error, func())
func (*RedisClient) Publish ¶
func (r *RedisClient) Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd
func (*RedisClient) Subscribe ¶
func (r *RedisClient) Subscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error)
func (*RedisClient) ToSlice ¶
func (r *RedisClient) ToSlice(v interface{}) []interface{}
type RedisConfig ¶
type RedisConfig struct { Addrs []string `key:"addrs" json:"addrs"` Mode RedisMode `key:"mode" json:"mode"` ClientName string `key:"clientName" json:"client_name"` EnableTLS bool `key:"enableTLS" json:"enable_tls"` InsecureSkipVerify bool `key:"insecureSkipVerify" json:"insecure_skip_verify"` MinIdleConns int `key:"minIdleConns" json:"min_idle_conns"` MaxIdleConns int `key:"maxIdleConns" json:"max_idle_conns"` ConnMaxIdleTime time.Duration `key:"connMaxIdleTime" json:"conn_max_idle_time"` ConnMaxLifetime time.Duration `key:"connMaxLifetime" json:"conn_max_lifetime"` DialTimeout time.Duration `key:"dialTimeout" json:"dial_timeout"` ReadTimeout time.Duration `key:"readTimeout" json:"read_timeout"` WriteTimeout time.Duration `key:"writeTimeout" json:"write_timeout"` MaxRedirects int `key:"maxRedirects" json:"max_redirects"` MaxRetries int `key:"maxRetries" json:"max_retries"` PoolSize int `key:"poolSize" json:"pool_size"` Username string `key:"username" json:"username"` Password string `key:"password" json:"password"` RouteByLatency bool `key:"routeByLatency" json:"route_by_latency"` MasterName string `key:"masterName" json:"master_name"` SentinelPassword string `key:"sentinelPassword" json:"sentinel_password"` }
type RedisLock ¶
type RedisLock struct {
// contains filtered or unexported fields
}
func NewRedisLock ¶
func NewRedisLock(client *RedisClient, opts ...RedisLockOption) *RedisLock
type RedisLockOption ¶
type RedisLockOption func(*RedisLock)
type RedisLockOptions ¶
type RegionConfig ¶
type RegionConfig struct {
ServerConfig BlobCacheServerConfig `key:"server" json:"server"`
}
type RendezvousHasher ¶
type RendezvousHasher interface { Add(hosts ...*BlobCacheHost) Remove(host *BlobCacheHost) GetN(n int, key string) []*BlobCacheHost }
type S3Client ¶
type S3Client struct { Client *s3.Client Source S3SourceConfig }
func NewS3Client ¶
func NewS3Client(ctx context.Context, sourceConfig S3SourceConfig) (*S3Client, error)
func (*S3Client) BucketName ¶
func (*S3Client) DownloadIntoBuffer ¶
type S3SourceConfig ¶
type Source ¶
type Source interface { Mount(localPath string) error Format(fsName string) error Unmount(localPath string) error }
func NewJuiceFsSource ¶
func NewJuiceFsSource(config JuiceFSConfig) (Source, error)
func NewMountPointSource ¶
func NewMountPointSource(config MountPointConfig) (Source, error)
func NewSource ¶
func NewSource(config SourceConfig) (Source, error)
type SourceConfig ¶
type SourceConfig struct { Mode string `key:"mode" json:"mode"` FilesystemName string `key:"fsName" json:"filesystem_name"` FilesystemPath string `key:"fsPath" json:"filesystem_path"` JuiceFS JuiceFSConfig `key:"juicefs" json:"juicefs"` MountPoint MountPointConfig `key:"mountpoint" json:"mountpoint"` }
type StorageLayer ¶
type StorageLayer interface { }
type ValkeyConfig ¶
type ValkeyConfig struct { PrimaryName string `key:"primaryName" json:"primary_name"` Password string `key:"password" json:"password"` TLS bool `key:"tls" json:"tls"` Host string `key:"host" json:"host"` Port int `key:"port" json:"port"` ExistingPrimary ValkeyExistingPrimary `key:"existingPrimary" json:"existingPrimary"` }