blobcache

package
v0.0.0-...-443e884 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2025 License: MIT Imports: 58 Imported by: 3

Documentation

Index

Constants

View Source
const (
	SourceModeJuiceFS    string = "juicefs"
	SourceModeMountPoint string = "mountpoint"
)
View Source
const (
	BlobCacheHostPrefix string = "blobcache-host"
	BlobCacheVersion    string = "dev"
)

Variables

View Source
var (
	ErrHostNotFound            = errors.New("host not found")
	ErrUnableToReachHost       = errors.New("unable to reach host")
	ErrInvalidHostVersion      = errors.New("invalid host version")
	ErrContentNotFound         = errors.New("content not found")
	ErrClientNotFound          = errors.New("client not found")
	ErrCacheLockHeld           = errors.New("cache lock held")
	ErrUnableToPopulateContent = errors.New("unable to populate content from original source")
	ErrBlobFsMountFailure      = errors.New("failed to mount blobfs")
	ErrUnableToAcquireLock     = errors.New("unable to acquire lock")
)
View Source
var (
	ErrChannelClosed    = errors.New("redis: channel closed")
	ErrConnectionIssue  = errors.New("redis: connection issue")
	ErrUnknownRedisMode = errors.New("redis: unknown mode")
)
View Source
var (
	Logger *logger
)
View Source
var MetadataKeys = &metadataKeys{}

Functions

func DialWithTimeout

func DialWithTimeout(ctx context.Context, addr string) (net.Conn, error)

func GenerateFsID

func GenerateFsID(name string) string

Generates a directory ID based on parent ID and name.

func GetConfigParser

func GetConfigParser(format ConfigFormat) (koanf.Parser, error)

func GetLogger

func GetLogger() *logger

func GetPrivateIpAddr

func GetPrivateIpAddr() (string, error)

func GetPublicIpAddr

func GetPublicIpAddr() (string, error)

func InitLogger

func InitLogger(debugMode bool, prettyLogs bool)

func Mount

func Mount(ctx context.Context, opts BlobFsSystemOpts) (func() error, <-chan error, *fuse.Server, error)

func SHA1StringToUint64

func SHA1StringToUint64(hash string) (uint64, error)

SHA1StringToUint64 converts the first 8 bytes of a SHA-1 hash string to a uint64

func ToSlice

func ToSlice(v interface{}) []interface{}

Flattens a struct using its field tags so it can be used by HSet. Struct fields must have the redis tag on them otherwise they will be ignored.

func ToStruct

func ToStruct(m map[string]string, out interface{}) error

Copies the result of HGetAll to a provided struct. If a field cannot be parsed, we use Go's default value. Struct fields must have the redis tag on them otherwise they will be ignored.

func WithClientName

func WithClientName(name string) func(*redis.UniversalOptions)

Types

type BlobCacheClient

type BlobCacheClient struct {
	// contains filtered or unexported fields
}

func NewBlobCacheClient

func NewBlobCacheClient(ctx context.Context, cfg BlobCacheConfig) (*BlobCacheClient, error)

func (*BlobCacheClient) Cleanup

func (c *BlobCacheClient) Cleanup() error

func (*BlobCacheClient) GetContent

func (c *BlobCacheClient) GetContent(hash string, offset int64, length int64, opts struct {
	RoutingKey string
}) ([]byte, error)

func (*BlobCacheClient) GetContentStream

func (c *BlobCacheClient) GetContentStream(hash string, offset int64, length int64, opts struct {
	RoutingKey string
}) (chan []byte, error)

func (*BlobCacheClient) GetNearbyHosts

func (c *BlobCacheClient) GetNearbyHosts() ([]*BlobCacheHost, error)

func (*BlobCacheClient) GetState

func (c *BlobCacheClient) GetState() error

func (*BlobCacheClient) HostsAvailable

func (c *BlobCacheClient) HostsAvailable() bool

func (*BlobCacheClient) IsCachedNearby

func (c *BlobCacheClient) IsCachedNearby(hash string) (bool, error)

func (*BlobCacheClient) IsPathCachedNearby

func (c *BlobCacheClient) IsPathCachedNearby(ctx context.Context, path string) bool

func (*BlobCacheClient) StoreContent

func (c *BlobCacheClient) StoreContent(chunks chan []byte, hash string, opts struct {
	RoutingKey string
}) (string, error)

func (*BlobCacheClient) StoreContentFromFUSE

func (c *BlobCacheClient) StoreContentFromFUSE(source struct {
	Path string
}, opts struct {
	RoutingKey string
	Lock       bool
}) (string, error)

func (*BlobCacheClient) StoreContentFromS3

func (c *BlobCacheClient) StoreContentFromS3(source struct {
	Path        string
	BucketName  string
	Region      string
	EndpointURL string
	AccessKey   string
	SecretKey   string
}, opts struct {
	RoutingKey string
	Lock       bool
}) (string, error)

func (*BlobCacheClient) WaitForHosts

func (c *BlobCacheClient) WaitForHosts(timeout time.Duration) error

type BlobCacheClientConfig

type BlobCacheClientConfig struct {
	Token                 string       `key:"token" json:"token"`
	MinRetryLengthBytes   int64        `key:"minRetryLengthBytes" json:"min_retry_length_bytes"`
	MaxGetContentAttempts int          `key:"maxGetContentAttempts" json:"max_get_content_attempts"`
	NTopHosts             int          `key:"nTopHosts" json:"n_top_hosts"`
	BlobFs                BlobFsConfig `key:"blobfs" json:"blobfs"`
}

type BlobCacheConfig

type BlobCacheConfig struct {
	Server BlobCacheServerConfig `key:"server" json:"server"`
	Client BlobCacheClientConfig `key:"client" json:"client"`
	Global BlobCacheGlobalConfig `key:"global" json:"global"`
}

type BlobCacheGlobalConfig

type BlobCacheGlobalConfig struct {
	DefaultLocality                 string  `key:"defaultLocality" json:"default_locality"`
	CoordinatorHost                 string  `key:"coordinatorHost" json:"coordinator_host"`
	ServerPort                      uint    `key:"serverPort" json:"server_port"`
	DiscoveryIntervalS              int     `key:"discoveryIntervalS" json:"discovery_interval_s"`
	RoundTripThresholdMilliseconds  uint    `key:"rttThresholdMilliseconds" json:"rtt_threshold_ms"`
	HostStorageCapacityThresholdPct float64 `key:"hostStorageCapacityThresholdPct" json:"host_storage_capacity_threshold_pct"`
	GRPCDialTimeoutS                int     `key:"grpcDialTimeoutS" json:"grpc_dial_timeout_s"`
	GRPCMessageSizeBytes            int     `key:"grpcMessageSizeBytes" json:"grpc_message_size_bytes"`
	DebugMode                       bool    `key:"debugMode" json:"debug_mode"`
	PrettyLogs                      bool    `key:"prettyLogs" json:"pretty_logs"`
}

func (*BlobCacheGlobalConfig) GetLocality

func (c *BlobCacheGlobalConfig) GetLocality() string

type BlobCacheHost

type BlobCacheHost struct {
	RTT              time.Duration `redis:"rtt" json:"rtt"`
	HostId           string        `redis:"host_id" json:"host_id"`
	Addr             string        `redis:"addr" json:"addr"`
	PrivateAddr      string        `redis:"private_addr" json:"private_addr"`
	CapacityUsagePct float64       `redis:"capacity_usage_pct" json:"capacity_usage_pct"`
}

func (*BlobCacheHost) Bytes

func (h *BlobCacheHost) Bytes() []byte

Bytes is needed for the rendezvous hasher

func (*BlobCacheHost) ToProto

func (h *BlobCacheHost) ToProto() *proto.BlobCacheHost

type BlobCacheMetadata

type BlobCacheMetadata struct {
	// contains filtered or unexported fields
}

func NewBlobCacheMetadata

func NewBlobCacheMetadata(cfg MetadataConfig) (*BlobCacheMetadata, error)

func (*BlobCacheMetadata) AddFsNodeChild

func (m *BlobCacheMetadata) AddFsNodeChild(ctx context.Context, pid, id string) error

func (*BlobCacheMetadata) AddHostToIndex

func (m *BlobCacheMetadata) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error

func (*BlobCacheMetadata) GetAvailableHosts

func (m *BlobCacheMetadata) GetAvailableHosts(ctx context.Context, locality string, removeHostCallback func(host *BlobCacheHost)) ([]*BlobCacheHost, error)

func (*BlobCacheMetadata) GetFsNode

func (m *BlobCacheMetadata) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)

func (*BlobCacheMetadata) GetFsNodeChildren

func (m *BlobCacheMetadata) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)

func (*BlobCacheMetadata) GetHostIndex

func (m *BlobCacheMetadata) GetHostIndex(ctx context.Context, locality string) ([]*BlobCacheHost, error)

func (*BlobCacheMetadata) RefreshStoreFromContentLock

func (m *BlobCacheMetadata) RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

func (*BlobCacheMetadata) RemoveClientLock

func (m *BlobCacheMetadata) RemoveClientLock(ctx context.Context, clientId, hash string) error

func (*BlobCacheMetadata) RemoveFsNode

func (m *BlobCacheMetadata) RemoveFsNode(ctx context.Context, id string) error

func (*BlobCacheMetadata) RemoveFsNodeChild

func (m *BlobCacheMetadata) RemoveFsNodeChild(ctx context.Context, pid, id string) error

func (*BlobCacheMetadata) RemoveHostFromIndex

func (m *BlobCacheMetadata) RemoveHostFromIndex(ctx context.Context, locality string, host *BlobCacheHost) error

func (*BlobCacheMetadata) RemoveStoreFromContentLock

func (m *BlobCacheMetadata) RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

func (*BlobCacheMetadata) SetClientLock

func (m *BlobCacheMetadata) SetClientLock(ctx context.Context, clientId, hash string) error

func (*BlobCacheMetadata) SetFsNode

func (m *BlobCacheMetadata) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error

func (*BlobCacheMetadata) SetHostKeepAlive

func (m *BlobCacheMetadata) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error

func (*BlobCacheMetadata) SetStoreFromContentLock

func (m *BlobCacheMetadata) SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

type BlobCacheMetadataMode

type BlobCacheMetadataMode string
const (
	BlobCacheMetadataModeDefault BlobCacheMetadataMode = "default"
	BlobCacheMetadataModeLocal   BlobCacheMetadataMode = "local"
)

type BlobCacheServerConfig

type BlobCacheServerConfig struct {
	Mode                 BlobCacheServerMode `key:"mode" json:"mode"`
	DiskCacheDir         string              `key:"diskCacheDir" json:"disk_cache_dir"`
	DiskCacheMaxUsagePct float64             `key:"diskCacheMaxUsagePct" json:"disk_cache_max_usage_pct"`
	Token                string              `key:"token" json:"token"`
	PrettyLogs           bool                `key:"prettyLogs" json:"pretty_logs"`
	ObjectTtlS           int                 `key:"objectTtlS" json:"object_ttl_s"`
	MaxCachePct          int64               `key:"maxCachePct" json:"max_cache_pct"`
	PageSizeBytes        int64               `key:"pageSizeBytes" json:"page_size_bytes"`
	Metadata             MetadataConfig      `key:"metadata" json:"metadata"`
	Sources              []SourceConfig      `key:"sources" json:"sources"`

	// Allows a coordinator to override a slave server's config for a specific locality/region
	Regions map[string]RegionConfig `key:"regions" json:"regions"`
}

func BlobCacheServerConfigFromProto

func BlobCacheServerConfigFromProto(protoConfig *proto.BlobCacheServerConfig) BlobCacheServerConfig

func (*BlobCacheServerConfig) ToProto

type BlobCacheServerMode

type BlobCacheServerMode string
const (
	BlobCacheServerModeCoordinator BlobCacheServerMode = "coordinator"
	BlobCacheServerModeSlave       BlobCacheServerMode = "slave"
)

type BlobFs

type BlobFs struct {
	CoordinatorClient CoordinatorClient
	Client            *BlobCacheClient
	Config            BlobCacheClientConfig
	// contains filtered or unexported fields
}

func NewFileSystem

func NewFileSystem(ctx context.Context, opts BlobFsSystemOpts) (*BlobFs, error)

NewFileSystem initializes a new BlobFs with root metadata.

func (*BlobFs) Root

func (bfs *BlobFs) Root() (fs.InodeEmbedder, error)

type BlobFsConfig

type BlobFsConfig struct {
	Enabled            bool     `key:"enabled" json:"enabled"`
	MountPoint         string   `key:"mountPoint" json:"mount_point"`
	MaxBackgroundTasks int      `key:"maxBackgroundTasks" json:"max_background_tasks"`
	MaxWriteKB         int      `key:"maxWriteKB" json:"max_write_kb"`
	MaxReadAheadKB     int      `key:"maxReadAheadKB" json:"max_read_ahead_kb"`
	DirectMount        bool     `key:"directMount" json:"direct_mount"`
	DirectIO           bool     `key:"directIO" json:"direct_io"`
	Options            []string `key:"options" json:"options"`
}

type BlobFsMetadata

type BlobFsMetadata struct {
	PID       string `redis:"pid" json:"pid"`
	ID        string `redis:"id" json:"id"`
	Name      string `redis:"name" json:"name"`
	Path      string `redis:"path" json:"path"`
	Hash      string `redis:"hash" json:"hash"`
	Ino       uint64 `redis:"ino" json:"ino"`
	Size      uint64 `redis:"size" json:"size"`
	Blocks    uint64 `redis:"blocks" json:"blocks"`
	Atime     uint64 `redis:"atime" json:"atime"`
	Mtime     uint64 `redis:"mtime" json:"mtime"`
	Ctime     uint64 `redis:"ctime" json:"ctime"`
	Atimensec uint32 `redis:"atimensec" json:"atimensec"`
	Mtimensec uint32 `redis:"mtimensec" json:"mtimensec"`
	Ctimensec uint32 `redis:"ctimensec" json:"ctimensec"`
	Mode      uint32 `redis:"mode" json:"mode"`
	Nlink     uint32 `redis:"nlink" json:"nlink"`
	Rdev      uint32 `redis:"rdev" json:"rdev"`
	Blksize   uint32 `redis:"blksize" json:"blksize"`
	Padding   uint32 `redis:"padding" json:"padding"`
	Uid       uint32 `redis:"uid" json:"uid"`
	Gid       uint32 `redis:"gid" json:"gid"`
	Gen       uint64 `redis:"gen" json:"gen"`
}

func (*BlobFsMetadata) ToProto

func (m *BlobFsMetadata) ToProto() *proto.BlobFsMetadata

type BlobFsNode

type BlobFsNode struct {
	Path     string
	ID       string
	PID      string
	Name     string
	Target   string
	Hash     string
	Attr     fuse.Attr
	Prefetch *bool
}

type BlobFsSystemOpts

type BlobFsSystemOpts struct {
	Verbose           bool
	CoordinatorClient CoordinatorClient
	Config            BlobCacheClientConfig
	Client            *BlobCacheClient
}

type CacheService

type CacheService struct {
	proto.UnimplementedBlobCacheServer
	// contains filtered or unexported fields
}

func NewCacheService

func NewCacheService(ctx context.Context, cfg BlobCacheConfig, locality string) (*CacheService, error)

func (*CacheService) AddFsNodeChild

func (*CacheService) AddHostToIndex

func (*CacheService) GetAvailableHosts

func (*CacheService) GetContent

func (*CacheService) GetContentStream

func (*CacheService) GetFsNode

func (*CacheService) GetFsNodeChildren

func (*CacheService) GetRegionConfig

func (*CacheService) GetState

func (*CacheService) HasContent

func (*CacheService) HostKeepAlive

func (cs *CacheService) HostKeepAlive()

func (*CacheService) RemoveClientLock

func (*CacheService) RemoveFsNode

func (*CacheService) RemoveFsNodeChild

func (*CacheService) SetClientLock

func (*CacheService) SetFsNode

func (*CacheService) SetHostKeepAlive

func (*CacheService) StartServer

func (cs *CacheService) StartServer(port uint) error

func (*CacheService) StoreContent

func (cs *CacheService) StoreContent(stream proto.BlobCache_StoreContentServer) error

func (*CacheService) StoreContentInBlobFs

func (cs *CacheService) StoreContentInBlobFs(ctx context.Context, path string, hash string, size uint64) error

type CacheServiceOpts

type CacheServiceOpts struct {
	Addr string
}

type ClientOptions

type ClientOptions struct {
	RoutingKey string
}

type ClientRequest

type ClientRequest struct {
	// contains filtered or unexported fields
}

type ClientRequestType

type ClientRequestType int
const (
	ClientRequestTypeStorage ClientRequestType = iota
	ClientRequestTypeRetrieval
)

type ConfigFormat

type ConfigFormat string
var (
	JSONConfigFormat ConfigFormat = ".json"
	YAMLConfigFormat ConfigFormat = ".yaml"
	YMLConfigFormat  ConfigFormat = ".yml"
)

type ConfigLoaderFunc

type ConfigLoaderFunc func(k *koanf.Koanf) error

ConfigLoaderFunc is a function type used to load configuration into a Koanf instance. It takes a Koanf pointer 'k' as a parameter and returns an error if the loading process encounters any issues.

type ConfigManager

type ConfigManager[T any] struct {
	// contains filtered or unexported fields
}

ConfigManager is a generic configuration manager that allows handling and manipulation of configuration data for various types. It includes a Koanf instance ('kf') for managing configuration settings.

func NewConfigManager

func NewConfigManager[T any]() (*ConfigManager[T], error)

NewConfigManager creates a new instance of the ConfigManager[T] type for managing configuration of type 'T'. It initializes the ConfigManager with the specified 'T' type, loads a default configuration, and optionally loads a user configuration if the 'CONFIG_PATH' environment variable is provided. If debug mode is enabled, it prints the current configuration.

func (*ConfigManager[T]) GetConfig

func (cm *ConfigManager[T]) GetConfig() T

GetConfig retrieves the current configuration of type 'T' from the ConfigManager. It unmarshals the configuration data and returns it. If any errors occur during unmarshaling, it logs a fatal error and exits the application.

func (*ConfigManager[T]) LoadConfig

func (cm *ConfigManager[T]) LoadConfig(format ConfigFormat, provider koanf.Provider) error

LoadConfig loads configuration data from a given provider in the specified format into the ConfigManager. It obtains a parser for the format, and then loads the configuration data. If any errors occur during the loading process, they are returned as an error.

func (*ConfigManager[T]) Print

func (cm *ConfigManager[T]) Print() string

Print returns a string representation of the current configuration state.

type ContentAddressableStorage

type ContentAddressableStorage struct {
	// contains filtered or unexported fields
}

func NewContentAddressableStorage

func NewContentAddressableStorage(ctx context.Context, currentHost *BlobCacheHost, locality string, coordinator CoordinatorClient, config BlobCacheConfig) (*ContentAddressableStorage, error)

func (*ContentAddressableStorage) Add

func (cas *ContentAddressableStorage) Add(ctx context.Context, hash string, content []byte) error

func (*ContentAddressableStorage) Cleanup

func (cas *ContentAddressableStorage) Cleanup()

func (*ContentAddressableStorage) Exists

func (cas *ContentAddressableStorage) Exists(hash string) bool

func (*ContentAddressableStorage) Get

func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst []byte) (int64, error)

type CoordinatorClient

type CoordinatorClient interface {
	AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
	SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
	GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)
	GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)
	SetClientLock(ctx context.Context, hash string, host string) error
	RemoveClientLock(ctx context.Context, hash string, host string) error
	SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
	RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
	RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
	SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
	GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
	RemoveFsNode(ctx context.Context, id string) error
	RemoveFsNodeChild(ctx context.Context, pid, id string) error
	GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
	AddFsNodeChild(ctx context.Context, pid, id string) error
}

func NewCoordinatorClientLocal

func NewCoordinatorClientLocal(globalConfig BlobCacheGlobalConfig, serverConfig BlobCacheServerConfig) (CoordinatorClient, error)

func NewCoordinatorClientRemote

func NewCoordinatorClientRemote(cfg BlobCacheGlobalConfig, token string) (CoordinatorClient, error)

type CoordinatorClientLocal

type CoordinatorClientLocal struct {
	// contains filtered or unexported fields
}

func (*CoordinatorClientLocal) AddFsNodeChild

func (c *CoordinatorClientLocal) AddFsNodeChild(ctx context.Context, pid, id string) error

func (*CoordinatorClientLocal) AddHostToIndex

func (c *CoordinatorClientLocal) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error

func (*CoordinatorClientLocal) GetAvailableHosts

func (c *CoordinatorClientLocal) GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)

func (*CoordinatorClientLocal) GetFsNode

func (*CoordinatorClientLocal) GetFsNodeChildren

func (c *CoordinatorClientLocal) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)

func (*CoordinatorClientLocal) GetRegionConfig

func (c *CoordinatorClientLocal) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)

func (*CoordinatorClientLocal) RefreshStoreFromContentLock

func (c *CoordinatorClientLocal) RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

func (*CoordinatorClientLocal) RemoveClientLock

func (c *CoordinatorClientLocal) RemoveClientLock(ctx context.Context, hash string, host string) error

func (*CoordinatorClientLocal) RemoveFsNode

func (c *CoordinatorClientLocal) RemoveFsNode(ctx context.Context, id string) error

func (*CoordinatorClientLocal) RemoveFsNodeChild

func (c *CoordinatorClientLocal) RemoveFsNodeChild(ctx context.Context, pid, id string) error

func (*CoordinatorClientLocal) RemoveStoreFromContentLock

func (c *CoordinatorClientLocal) RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

func (*CoordinatorClientLocal) SetClientLock

func (c *CoordinatorClientLocal) SetClientLock(ctx context.Context, hash string, host string) error

func (*CoordinatorClientLocal) SetFsNode

func (c *CoordinatorClientLocal) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error

func (*CoordinatorClientLocal) SetHostKeepAlive

func (c *CoordinatorClientLocal) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error

func (*CoordinatorClientLocal) SetStoreFromContentLock

func (c *CoordinatorClientLocal) SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

type CoordinatorClientRemote

type CoordinatorClientRemote struct {
	// contains filtered or unexported fields
}

func (*CoordinatorClientRemote) AddFsNodeChild

func (c *CoordinatorClientRemote) AddFsNodeChild(ctx context.Context, pid, id string) error

func (*CoordinatorClientRemote) AddHostToIndex

func (c *CoordinatorClientRemote) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error

func (*CoordinatorClientRemote) GetAvailableHosts

func (c *CoordinatorClientRemote) GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)

func (*CoordinatorClientRemote) GetFsNode

func (*CoordinatorClientRemote) GetFsNodeChildren

func (c *CoordinatorClientRemote) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)

func (*CoordinatorClientRemote) GetRegionConfig

func (c *CoordinatorClientRemote) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)

func (*CoordinatorClientRemote) RefreshStoreFromContentLock

func (c *CoordinatorClientRemote) RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

func (*CoordinatorClientRemote) RemoveClientLock

func (c *CoordinatorClientRemote) RemoveClientLock(ctx context.Context, hash string, hostId string) error

func (*CoordinatorClientRemote) RemoveFsNode

func (c *CoordinatorClientRemote) RemoveFsNode(ctx context.Context, id string) error

func (*CoordinatorClientRemote) RemoveFsNodeChild

func (c *CoordinatorClientRemote) RemoveFsNodeChild(ctx context.Context, pid, id string) error

func (*CoordinatorClientRemote) RemoveStoreFromContentLock

func (c *CoordinatorClientRemote) RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

func (*CoordinatorClientRemote) SetClientLock

func (c *CoordinatorClientRemote) SetClientLock(ctx context.Context, hash string, hostId string) error

func (*CoordinatorClientRemote) SetFsNode

func (c *CoordinatorClientRemote) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error

func (*CoordinatorClientRemote) SetHostKeepAlive

func (c *CoordinatorClientRemote) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error

func (*CoordinatorClientRemote) SetStoreFromContentLock

func (c *CoordinatorClientRemote) SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

type DiscoveryClient

type DiscoveryClient struct {
	// contains filtered or unexported fields
}

func NewDiscoveryClient

func NewDiscoveryClient(cfg BlobCacheGlobalConfig, hostMap *HostMap, coordinator CoordinatorClient, locality string) *DiscoveryClient

func (*DiscoveryClient) GetHostState

func (d *DiscoveryClient) GetHostState(ctx context.Context, host *BlobCacheHost) (*BlobCacheHost, error)

GetHostState attempts to connect to the gRPC service and verifies its availability

func (*DiscoveryClient) Start

func (d *DiscoveryClient) Start(ctx context.Context) error

Used by blobcache servers to discover their closest peers

type ErrNodeNotFound

type ErrNodeNotFound struct {
	Id string
}

func (*ErrNodeNotFound) Error

func (e *ErrNodeNotFound) Error() string

type FSNode

type FSNode struct {
	fs.Inode
	// contains filtered or unexported fields
}

func (*FSNode) Create

func (n *FSNode) Create(ctx context.Context, name string, flags uint32, mode uint32, out *fuse.EntryOut) (inode *fs.Inode, fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno)

func (*FSNode) Getattr

func (n *FSNode) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno

func (*FSNode) Lookup

func (n *FSNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*fs.Inode, syscall.Errno)

func (*FSNode) Mkdir

func (n *FSNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse.EntryOut) (*fs.Inode, syscall.Errno)

func (*FSNode) OnAdd

func (n *FSNode) OnAdd(ctx context.Context)

func (*FSNode) Open

func (n *FSNode) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno)

func (*FSNode) Opendir

func (n *FSNode) Opendir(ctx context.Context) syscall.Errno

func (*FSNode) Read

func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno)

func (*FSNode) Readdir

func (n *FSNode) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno)
func (n *FSNode) Readlink(ctx context.Context) ([]byte, syscall.Errno)

func (*FSNode) Rename

func (n *FSNode) Rename(ctx context.Context, oldName string, newParent fs.InodeEmbedder, newName string, flags uint32) syscall.Errno

func (*FSNode) Rmdir

func (n *FSNode) Rmdir(ctx context.Context, name string) syscall.Errno
func (n *FSNode) Unlink(ctx context.Context, name string) syscall.Errno

type FileSystem

type FileSystem interface {
	Mount(opts FileSystemOpts) (func() error, <-chan error, error)
	Unmount() error
	Format() error
}

type FileSystemOpts

type FileSystemOpts struct {
	MountPoint string
	Verbose    bool
	Metadata   *BlobCacheMetadata
}

BlobFS types

type FileSystemStorage

type FileSystemStorage interface {
	Metadata()
	Get(string)
	ReadFile(interface{}, []byte, int64)
}

type HostMap

type HostMap struct {
	// contains filtered or unexported fields
}

func NewHostMap

func NewHostMap(cfg BlobCacheGlobalConfig, onHostAdded func(*BlobCacheHost) error) *HostMap

func (*HostMap) Closest

func (hm *HostMap) Closest(timeout time.Duration) (*BlobCacheHost, error)

Closest finds the nearest host within a given timeout If no hosts are found, it will error out

func (*HostMap) ClosestWithCapacity

func (hm *HostMap) ClosestWithCapacity(timeout time.Duration) (*BlobCacheHost, error)

ClosestWithCapacity finds the nearest host with available storage capacity within a given timeout If no hosts are found, it will error out

func (*HostMap) Get

func (hm *HostMap) Get(hostId string) *BlobCacheHost

func (*HostMap) GetAll

func (hm *HostMap) GetAll() []*BlobCacheHost

func (*HostMap) Members

func (hm *HostMap) Members() mapset.Set[string]

func (*HostMap) Remove

func (hm *HostMap) Remove(host *BlobCacheHost)

func (*HostMap) Set

func (hm *HostMap) Set(host *BlobCacheHost)

type JuiceFSConfig

type JuiceFSConfig struct {
	RedisURI   string `key:"redisURI" json:"redis_uri"`
	Bucket     string `key:"bucket" json:"bucket"`
	AccessKey  string `key:"accessKey" json:"access_key"`
	SecretKey  string `key:"secretKey" json:"secret_key"`
	CacheSize  int64  `key:"cacheSize" json:"cache_size"`
	BlockSize  int64  `key:"blockSize" json:"block_size"`
	Prefetch   int64  `key:"prefetch" json:"prefetch"`
	BufferSize int64  `key:"bufferSize" json:"buffer_size"`
}

type JuiceFsSource

type JuiceFsSource struct {
	// contains filtered or unexported fields
}

func (*JuiceFsSource) Format

func (s *JuiceFsSource) Format(fsName string) error

func (*JuiceFsSource) Mount

func (s *JuiceFsSource) Mount(localPath string) error

func (*JuiceFsSource) Unmount

func (s *JuiceFsSource) Unmount(localPath string) error

type MetadataConfig

type MetadataConfig struct {
	Mode         BlobCacheMetadataMode `key:"mode" json:"mode"`
	ValkeyConfig ValkeyConfig          `key:"valkey" json:"valkey"`

	// Default config
	RedisAddr       string    `key:"redisAddr" json:"redis_addr"`
	RedisPasswd     string    `key:"redisPasswd" json:"redis_passwd"`
	RedisTLSEnabled bool      `key:"redisTLSEnabled" json:"redis_tls_enabled"`
	RedisMode       RedisMode `key:"redisMode" json:"redis_mode"`
	RedisMasterName string    `key:"redisMasterName" json:"redis_master_name"`
}

type MountPointConfig

type MountPointConfig struct {
	BucketName     string `key:"bucketName" json:"bucket_name"`
	AccessKey      string `key:"accessKey" json:"access_key"`
	SecretKey      string `key:"secretKey" json:"secret_key"`
	Region         string `key:"region" json:"region"`
	EndpointURL    string `key:"endpointUrl" json:"endpoint_url"`
	ForcePathStyle bool   `key:"forcePathStyle" json:"force_path_style"`
}

type MountPointSource

type MountPointSource struct {
	// contains filtered or unexported fields
}

func (*MountPointSource) Format

func (s *MountPointSource) Format(fsName string) error

func (*MountPointSource) Mount

func (s *MountPointSource) Mount(localPath string) error

func (*MountPointSource) Unmount

func (s *MountPointSource) Unmount(localPath string) error

type ParserFunc

type ParserFunc func() (koanf.Parser, error)

type RedisClient

type RedisClient struct {
	redis.UniversalClient
}

func NewRedisClient

func NewRedisClient(config RedisConfig, options ...func(*redis.UniversalOptions)) (*RedisClient, error)

func (*RedisClient) Keys

func (r *RedisClient) Keys(ctx context.Context, pattern string) ([]string, error)

Gets all keys using a pattern Actually runs a scan since keys locks up the database.

func (*RedisClient) LRange

func (r *RedisClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)

func (*RedisClient) PSubscribe

func (r *RedisClient) PSubscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error, func())

func (*RedisClient) Publish

func (r *RedisClient) Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd

func (*RedisClient) Scan

func (r *RedisClient) Scan(ctx context.Context, pattern string) ([]string, error)

func (*RedisClient) Subscribe

func (r *RedisClient) Subscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error)

func (*RedisClient) ToSlice

func (r *RedisClient) ToSlice(v interface{}) []interface{}

func (*RedisClient) ToStruct

func (r *RedisClient) ToStruct(m map[string]string, out interface{}) error

type RedisConfig

type RedisConfig struct {
	Addrs              []string      `key:"addrs" json:"addrs"`
	Mode               RedisMode     `key:"mode" json:"mode"`
	ClientName         string        `key:"clientName" json:"client_name"`
	EnableTLS          bool          `key:"enableTLS" json:"enable_tls"`
	InsecureSkipVerify bool          `key:"insecureSkipVerify" json:"insecure_skip_verify"`
	MinIdleConns       int           `key:"minIdleConns" json:"min_idle_conns"`
	MaxIdleConns       int           `key:"maxIdleConns" json:"max_idle_conns"`
	ConnMaxIdleTime    time.Duration `key:"connMaxIdleTime" json:"conn_max_idle_time"`
	ConnMaxLifetime    time.Duration `key:"connMaxLifetime" json:"conn_max_lifetime"`
	DialTimeout        time.Duration `key:"dialTimeout" json:"dial_timeout"`
	ReadTimeout        time.Duration `key:"readTimeout" json:"read_timeout"`
	WriteTimeout       time.Duration `key:"writeTimeout" json:"write_timeout"`
	MaxRedirects       int           `key:"maxRedirects" json:"max_redirects"`
	MaxRetries         int           `key:"maxRetries" json:"max_retries"`
	PoolSize           int           `key:"poolSize" json:"pool_size"`
	Username           string        `key:"username" json:"username"`
	Password           string        `key:"password" json:"password"`
	RouteByLatency     bool          `key:"routeByLatency" json:"route_by_latency"`
	MasterName         string        `key:"masterName" json:"master_name"`
	SentinelPassword   string        `key:"sentinelPassword" json:"sentinel_password"`
}

type RedisLock

type RedisLock struct {
	// contains filtered or unexported fields
}

func NewRedisLock

func NewRedisLock(client *RedisClient, opts ...RedisLockOption) *RedisLock

func (*RedisLock) Acquire

func (l *RedisLock) Acquire(ctx context.Context, key string, opts RedisLockOptions) error

func (*RedisLock) Refresh

func (l *RedisLock) Refresh(key string, opts RedisLockOptions) error

func (*RedisLock) Release

func (l *RedisLock) Release(key string) error

type RedisLockOption

type RedisLockOption func(*RedisLock)

type RedisLockOptions

type RedisLockOptions struct {
	TtlS    int
	Retries int
}

type RedisMode

type RedisMode string
var (
	RedisModeSingle   RedisMode = "single"
	RedisModeCluster  RedisMode = "cluster"
	RedisModeSentinel RedisMode = "sentinel"
)

type RegionConfig

type RegionConfig struct {
	ServerConfig BlobCacheServerConfig `key:"server" json:"server"`
}

type RendezvousHasher

type RendezvousHasher interface {
	Add(hosts ...*BlobCacheHost)
	Remove(host *BlobCacheHost)
	GetN(n int, key string) []*BlobCacheHost
}

type S3Client

type S3Client struct {
	Client *s3.Client
	Source S3SourceConfig
}

func NewS3Client

func NewS3Client(ctx context.Context, sourceConfig S3SourceConfig) (*S3Client, error)

func (*S3Client) BucketName

func (c *S3Client) BucketName() string

func (*S3Client) DownloadIntoBuffer

func (c *S3Client) DownloadIntoBuffer(ctx context.Context, key string, buffer *bytes.Buffer) error

func (*S3Client) GetClient

func (c *S3Client) GetClient() *s3.Client

func (*S3Client) Head

func (c *S3Client) Head(ctx context.Context, key string) (bool, *s3.HeadObjectOutput, error)

type S3SourceConfig

type S3SourceConfig struct {
	BucketName  string
	Region      string
	EndpointURL string
	AccessKey   string
	SecretKey   string
}

type Source

type Source interface {
	Mount(localPath string) error
	Format(fsName string) error
	Unmount(localPath string) error
}

func NewJuiceFsSource

func NewJuiceFsSource(config JuiceFSConfig) (Source, error)

func NewMountPointSource

func NewMountPointSource(config MountPointConfig) (Source, error)

func NewSource

func NewSource(config SourceConfig) (Source, error)

type SourceConfig

type SourceConfig struct {
	Mode           string           `key:"mode" json:"mode"`
	FilesystemName string           `key:"fsName" json:"filesystem_name"`
	FilesystemPath string           `key:"fsPath" json:"filesystem_path"`
	JuiceFS        JuiceFSConfig    `key:"juicefs" json:"juicefs"`
	MountPoint     MountPointConfig `key:"mountpoint" json:"mountpoint"`
}

type StorageLayer

type StorageLayer interface {
}

type ValkeyConfig

type ValkeyConfig struct {
	PrimaryName     string                `key:"primaryName" json:"primary_name"`
	Password        string                `key:"password" json:"password"`
	TLS             bool                  `key:"tls" json:"tls"`
	Host            string                `key:"host" json:"host"`
	Port            int                   `key:"port" json:"port"`
	ExistingPrimary ValkeyExistingPrimary `key:"existingPrimary" json:"existingPrimary"`
}

type ValkeyExistingPrimary

type ValkeyExistingPrimary struct {
	Host string `key:"host" json:"host"`
	Port int    `key:"port" json:"port"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL