Overview
Package db implements the policy layout of databases, tables, and indices as a virtual filesystem tree.
Index
- Constants
- Variables
- func DefinitionPath(db, table string) string
- func IndexPath(db, table string) string
- func List(s fs.FS) ([]string, error)
- func ListComponent(s fs.FS, pattern string, part int) ([]string, error)
- func ListTables(s fs.FS, db string) ([]string, error)
- func OpenIndex(s fs.FS, db, table string, key *blockfmt.Key) (*blockfmt.Index, error)
- func OpenPartialIndex(s fs.FS, db, table string, key *blockfmt.Key) (*blockfmt.Index, error)
- func TablePrefix(db, table string) string
- func Tables(s fs.FS, db string) ([]string, error)
- func WriteDefinition(dst OutputFS, db, table string, s *Definition) error
- type ClientFS
- type Config
- type ContextFS
- type Definition
- type DirFS
- type GCConfig
- type Input
- type InputFS
- type OutputFS
- type Partition
- type Queue
- type QueueItem
- type QueueRunner
- type QueueStatus
- type RemoveFS
- type RetentionPolicy
- type S3FS
- type S3Resolver
- type Tenant
- type TenantConfig
- type TenantConfigurable
Constants
const (
	// DefaultMinimumAge is the default minimum
	// age of packed-* files to be deleted.
	DefaultMinimumAge = 15 * time.Minute
	// DefaultInputMinimumAge is the default
	// minimum age of inputs-* files to be deleted.
	DefaultInputMinimumAge = 30 * time.Second
)
const (
	// StatusOK indicates that a QueueItem
	// was processed completely.
	StatusOK = iota
	// StatusTryAgain indicates that a QueueItem
	// was not processed, and it should be re-tried
	// shortly.
	StatusTryAgain
	// StatusWriteError indicates that a QueueItem
	// was not processed, and it should be re-tried
	// after a delay.
	StatusWriteError
)
const DefaultAlgo = "zion+iguana_v0/specialized"
DefaultAlgo is the default compression algorithm for compressing data blocks.
const (
	// DefaultBatchSize is the default queue
	// batching size that is used when none is set.
	DefaultBatchSize = 100 * mega
)
const DefaultMaxInlineBytes = 100 * giga
DefaultMaxInlineBytes is the default number of decompressed bytes that we reference in blockfmt.Index.Inline before flushing references to blockfmt.Index.Indirect.
const DefaultMinMerge = 50 * 1024 * 1024
DefaultMinMerge is the default minimum merge size.
Rationale: it looks like S3 server-side copy will run at about 250MB/s, so 50MB done as a synchronous server-side copy will introduce 200ms of ingest latency over and above the baseline. That seems like a reasonable maximum overhead. (Keep in mind this is 50MB compressed, so potentially a few hundred MB decompressed.)
const DefaultRangeMultiple = 100
DefaultRangeMultiple is the default multiple of the chunk alignment at which we write out metadata.
const DefaultTargetMergeSize = 1 * giga
DefaultTargetMergeSize is the default target size for compacted packfiles.
const MaxIndexSize = 15 * 1024 * 1024
MaxIndexSize is the maximum size of an index object. (The purpose of an index size cap is to prevent us from reading arbitrarily-sized index objects before we have authenticated the objects.)
Variables
var ErrBuildAgain = errors.New("partial db update")
ErrBuildAgain is returned by db.Config.Sync when only some of the input objects were successfully ingested.
Functions
func DefinitionPath
func DefinitionPath(db, table string) string
DefinitionPath returns the path at which the definition file for the given db and table would live relative to the root of the FS.
func IndexPath
func IndexPath(db, table string) string
IndexPath returns the path at which the index for the given db and table would live relative to the root of the FS.
func ListComponent
func ListComponent(s fs.FS, pattern string, part int) ([]string, error)
ListComponent performs a glob match on s for the given pattern and then yields a deduplicated list of path components corresponding to the given 0-indexed part number.
For example, part 1 of "/foo/*/baz" would yield all of the components that matched "*".
func ListTables
func ListTables(s fs.FS, db string) ([]string, error)
ListTables lists the names of all tables in the given database. The database name must not be empty.
The presence of a table in the returned list does not guarantee that the table still exists. For example, it may have been deleted between the call to ListTables and a subsequent call to OpenIndex.
func OpenIndex
func OpenIndex(s fs.FS, db, table string, key *blockfmt.Key) (*blockfmt.Index, error)
OpenIndex opens the index for the given database and table. The key must be the same key that was used to sign the index when it was first written.
func OpenPartialIndex
func OpenPartialIndex(s fs.FS, db, table string, key *blockfmt.Key) (*blockfmt.Index, error)
OpenPartialIndex is equivalent to OpenIndex, but skips decoding Index.Inputs. The returned index is suitable for queries, but not for synchronizing tables.
func TablePrefix
func TablePrefix(db, table string) string
TablePrefix returns the prefix at which the table files live relative to the root of the FS.
func WriteDefinition
func WriteDefinition(dst OutputFS, db, table string, s *Definition) error
WriteDefinition writes the definition s for the given database and table to dst.
Types
type ClientFS
ClientFS is a client for a DirFS. This is meant to be used for testing purposes only.
func DecodeClientFS
DecodeClientFS produces a ClientFS from the datum encoded by DirFS.Encode.
type Config
type Config struct {
	// Algo is the compression algorithm
	// used for producing output data blocks.
	// (See [blockfmt.CompressorByName].)
	// If Algo is the empty string, Config
	// uses DefaultAlgo instead.
	Algo string
	// Align is the alignment of new
	// blocks to be produced in objects
	// inserted into the index.
	Align int
	// RangeMultiple is the multiple of Align
	// at which we write out metadata.
	// If RangeMultiple is zero, it defaults to
	// DefaultRangeMultiple.
	RangeMultiple int
	// MinMergeSize is the base merge
	// size of objects. If MinMergeSize is zero,
	// then DefaultMinMerge is used.
	MinMergeSize int
	// TargetMergeSize is the target size of
	// files that are compacted into larger packfiles.
	// If TargetMergeSize is zero, then DefaultTargetMergeSize is used.
	TargetMergeSize int
	// MinInputBytesPerCPU, if non-zero, determines the minimum
	// number of input bytes necessary to cause the conversion
	// process to decide to use an additional CPU core.
	// For example, if MinInputBytesPerCPU is 512kB, then 3MB of input
	// data would use 6 CPU cores (provided GOMAXPROCS is at least this high).
	// See blockfmt.MinInputBytesPerCPU.
	MinInputBytesPerCPU int64
	// Force forces a full index rebuild
	// even when the input appears to be up-to-date.
	Force bool
	// Fallback determines the format for
	// objects when the object format is not
	// obvious from the file extension.
	Fallback func(name string) blockfmt.RowFormat
	// MaxScanObjects is the maximum number
	// of objects to be committed in a single Scan operation.
	// If MaxScanObjects is less than or equal to zero,
	// it is ignored and no limit is applied.
	MaxScanObjects int
	// MaxScanBytes is the maximum number
	// of bytes to ingest in a single Scan or Sync operation
	// (not including merging).
	// If MaxScanBytes is less than or equal to zero,
	// it is ignored and no limit is applied.
	MaxScanBytes int64
	// MaxScanTime is the maximum amount of time
	// to spend listing objects before deciding
	// to bail out of a scan.
	MaxScanTime time.Duration
	// NewIndexScan, if true, enables scanning
	// for newly-created index objects.
	NewIndexScan bool
	// MaxInlineBytes is the maximum number
	// of (decompressed) data bytes for which
	// we should store references directly in
	// blockfmt.Index.Inline.
	// If this value is zero, then DefaultMaxInlineBytes
	// is used instead.
	MaxInlineBytes int64
	// TargetRefSize is the target size for stored
	// indirect references. If this value is zero,
	// a reasonable default is used.
	TargetRefSize int64
	// GCMaxDelay is the longest amount of time that
	// a gc cycle will spend blocking a batch insert operation.
	GCMaxDelay time.Duration
	// GCMinimumAge is the minimum time that
	// a packed file should be left around after
	// it has been dereferenced.
	// See blockfmt.Index.ToDelete.Expiry for
	// how this value is used.
	GCMinimumAge time.Duration
	// InputMinimumAge is the minimum time
	// that an input file leaf should be left
	// around after it is no longer referenced.
	// See blockfmt.Index.ToDelete.Expiry.
	InputMinimumAge time.Duration
	// Logf, if non-nil, will be where
	// the builder will log build actions
	// as it is executing. Logf must be
	// safe to call from multiple goroutines
	// simultaneously.
	Logf func(f string, args ...interface{})
	Verbose bool
}
Config is a set of configuration items for synchronizing an Index to match a specification from a Definition.
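Several Config fields fall back to package defaults when left at their zero value, and MinInputBytesPerCPU scales parallelism with input size. A minimal sketch of both rules using a trimmed local mirror of Config; the config type, the resolve and cpusFor helpers, and the values chosen for mega and giga (which are unexported in package db) are assumptions, and the floor-then-clamp rounding in cpusFor is a guess rather than documented behavior.

```go
package main

import "fmt"

// Assumed values; package db's own mega/giga constants are not shown.
const (
	mega = 1 << 20
	giga = 1 << 30
)

// Local mirrors of the documented defaults.
const (
	defaultAlgo            = "zion+iguana_v0/specialized"
	defaultRangeMultiple   = 100
	defaultMinMerge        = 50 * 1024 * 1024
	defaultTargetMergeSize = 1 * giga
)

// config mirrors a subset of db.Config.
type config struct {
	Algo            string
	RangeMultiple   int
	MinMergeSize    int
	TargetMergeSize int
}

// resolve returns a copy of c with zero-valued fields replaced by defaults.
func resolve(c config) config {
	if c.Algo == "" {
		c.Algo = defaultAlgo
	}
	if c.RangeMultiple == 0 {
		c.RangeMultiple = defaultRangeMultiple
	}
	if c.MinMergeSize == 0 {
		c.MinMergeSize = defaultMinMerge
	}
	if c.TargetMergeSize == 0 {
		c.TargetMergeSize = defaultTargetMergeSize
	}
	return c
}

// cpusFor uses one core per minPerCPU input bytes, clamped to [1, maxProcs].
func cpusFor(inputBytes, minPerCPU int64, maxProcs int) int {
	if minPerCPU <= 0 {
		return maxProcs
	}
	n := int(inputBytes / minPerCPU)
	if n < 1 {
		n = 1
	}
	if n > maxProcs {
		n = maxProcs
	}
	return n
}

func main() {
	c := resolve(config{Algo: "zstd"}) // an explicit value is kept
	fmt.Println(c.Algo, c.RangeMultiple)      // zstd 100
	fmt.Println(cpusFor(3*mega, 512*1024, 8)) // 6
}
```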
func (*Config) Format
Format picks the row format for an object based on an explicit format hint and the object name. The following are tried, in order:
- If 'chosen' is the name of a known format, then that format is returned.
- If 'name' has a suffix that indicates a known format, then that format is returned.
- If c.Fallback is non-nil, then Fallback(name) is returned.
Otherwise, Format returns nil.
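The lookup order above can be sketched as a chain of checks. A minimal sketch in which rowFormat is a plain string and knownSuffixes is a small hypothetical table; the real method returns a blockfmt.RowFormat, and its set of recognized names and suffixes is not listed on this page. The empty string here plays the role of Format returning nil.

```go
package main

import (
	"fmt"
	"strings"
)

// rowFormat stands in for blockfmt.RowFormat.
type rowFormat string

// knownSuffixes is a hypothetical table, longest suffix first so that
// "x.json.gz" is not mistaken for plain JSON.
var knownSuffixes = []string{"json.gz", "json", "csv"}

func isKnown(s string) bool {
	for _, k := range knownSuffixes {
		if k == s {
			return true
		}
	}
	return false
}

// pickFormat tries, in order: the explicit hint, the name's suffix, the fallback.
func pickFormat(chosen, name string, fallback func(string) rowFormat) rowFormat {
	if isKnown(chosen) {
		return rowFormat(chosen)
	}
	for _, suffix := range knownSuffixes {
		if strings.HasSuffix(name, "."+suffix) {
			return rowFormat(suffix)
		}
	}
	if fallback != nil {
		return fallback(name)
	}
	return "" // no format found
}

func main() {
	fmt.Println(pickFormat("csv", "data.json", nil))   // csv (explicit hint wins)
	fmt.Println(pickFormat("", "logs/a.json.gz", nil)) // json.gz
	fmt.Println(pickFormat("", "blob.bin", func(string) rowFormat { return "json" })) // json
	fmt.Printf("%q\n", pickFormat("", "blob.bin", nil)) // ""
}
```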
func (*Config) Scan
Scan performs an incremental append operation on a table by listing input objects and adding them to the index. Scan returns the number of objects added to the table or an error. If Scan returns (0, nil), then scanning has already completed and no further calls to Scan are necessary to build the table.
Semantically, Scan performs a list operation and a call to c.Append on the listed items, taking care to list incrementally from the last call to Append.
func (*Config) SetFeatures
SetFeatures updates the Config to take into account a list of feature strings. Unknown feature strings are silently ignored.
See also Definition.Features.
type ContextFS
type ContextFS interface {
	fs.FS
	// WithContext returns a copy of the file
	// system configured with the given context.
	WithContext(ctx context.Context) fs.FS
}
ContextFS can be implemented by a file system which allows the file system to be configured with a context which will be applied to all file system operations.
type Definition
type Definition struct {
	// Inputs is the list of inputs that comprise the table.
	Inputs []Input `json:"input,omitempty"`
	// Partitions specifies synthetic fields that
	// are generated from components of the input
	// URI and used to partition table data.
	Partitions []Partition `json:"partitions,omitempty"`
	// Retention is the expiration policy for data.
	// Data older than the expiration window will
	// be periodically purged from the backing
	// store during table updates.
	Retention *RetentionPolicy `json:"retention_policy,omitempty"`
	// Features is a list of feature flags that
	// can be used to turn on features for beta-testing.
	Features []string `json:"beta_features,omitempty"`
	// SkipBackfill, if true, will cause this table
	// to skip scanning the source bucket(s) for matching
	// objects when the first objects are inserted into the table.
	SkipBackfill bool `json:"skip_backfill,omitempty"`
}
Definition describes the set of input files that belong to a table.
func DecodeDefinition
func DecodeDefinition(src io.Reader) (*Definition, error)
DecodeDefinition decodes a JSON-encoded definition from src.
See also: OpenDefinition
func OpenDefinition
func OpenDefinition(s fs.FS, db, table string) (*Definition, error)
OpenDefinition opens a definition for the given database and table.
OpenDefinition calls DecodeDefinition on definition.json in the appropriate path for the given db and table.
func (*Definition) Equals
func (d *Definition) Equals(x *Definition) bool
Equals returns whether or not the table definitions are equivalent.
func (*Definition) Hash
func (d *Definition) Hash() []byte
Hash returns a hash of the table definition that can be used to detect changes.
type DirFS
DirFS is an InputFS implementation that can be used for local testing. It includes a local HTTP server bound to the loopback interface that will serve the directory contents.
func (*DirFS) Encode
Encode writes the URL for the server to dst. This can be used by DecodeClientFS to access the DirFS remotely.
type GCConfig
type GCConfig struct {
	// MinimumAge, if non-zero, specifies
	// the minimum age for any objects removed
	// during a garbage-collection pass.
	// Note that objects are only candidates
	// for garbage collection if they are older
	// than the current index *and* not pointed to
	// by the current index, so the MinimumAge requirement
	// is only necessary if it is possible for GC and ingest
	// to run simultaneously. In that case, MinimumAge should be
	// set to some duration longer than any possible ingest cycle.
	MinimumAge time.Duration
	InputMinimumAge time.Duration
	// MaxDelay is the maximum amount of time
	// that a GC will spend blocking batch inserts.
	// If MaxDelay is less than or equal to zero,
	// then the amount of time spent GC'ing is unlimited.
	MaxDelay time.Duration
	// Logf, if non-nil, is a callback used for logging
	// detailed information regarding GC decisions.
	Logf func(f string, args ...interface{})
	// Precise determines if GC is performed
	// by only deleting objects that have been
	// explicitly marked for deletion.
	Precise bool
}
GCConfig is a configuration for garbage collection.
type Input
type Input struct {
	// Pattern is the glob pattern that
	// specifies which files are fed into
	// the table. Patterns should be URIs
	// where the URI scheme (i.e. s3://, file://, etc.)
	// indicates where the data ought to come from.
	Pattern string `json:"pattern"`
	// Format is the format of the files in pattern.
	// If Format is the empty string, then the format
	// will be inferred from the file extension.
	Format string `json:"format,omitempty"`
	// Hints, if non-nil, holds the hints associated
	// with the input data. The hints may perform
	// type-based coercion of certain paths, and may additionally
	// eliminate some of the data as it is parsed.
	// Hints data is format-specific.
	Hints json.RawMessage `json:"hints,omitempty"`
}
Input is one input pattern belonging to a Definition.
type Partition
type Partition struct {
	// Field is the name of the partition field. If
	// this field conflicts with a field in the
	// input data, the partition field will
	// override it.
	Field string `json:"field"`
	// Type is the type of the partition field.
	// If this is "", this defaults to "string".
	Type string `json:"type,omitempty"`
	// Value is a template string that is used to
	// produce the value for the partition field.
	// The template may reference parts of the
	// input URI specified in the input pattern.
	// If this is "", the field name is used to
	// determine the input URI part that will be
	// used to determine the value.
	Value string `json:"value,omitempty"`
}
A Partition defines a synthetic field that is generated from parts of an input URI and used to partition table data.
type Queue
type Queue interface {
	// Close should be called when the
	// runner has finished processing items
	// from the queue (usually due to receiving
	// an external signal to stop processing events).
	// Close should only be called after all events
	// returned by Next have been processed via calls
	// to Finalize.
	io.Closer
	// Next should return the next item
	// in the queue. If the provided pause
	// duration is non-negative, then Next
	// should block for up to the provided duration
	// to produce a new value. If pause is negative,
	// then Next should block until it can return
	// a non-nil QueueItem value or an EOF error.
	// Next should return (nil, io.EOF) if the queue
	// has been closed and no further processing should
	// be performed.
	//
	// As an optimization, the returned QueueItem
	// can implement fs.File, which will obviate
	// the need for the caller to perform additional
	// I/O to produce a file handle associated with
	// the QueueItem.
	Next(pause time.Duration) (QueueItem, error)
	// Finalize is called to return the final status
	// of a QueueItem that was previously returned by
	// Next. If status is anything other than
	// StatusOK, then the Queue should arrange for the
	// item to be returned by a future call to Next.
	//
	// Finalize may panic if Queue.Close has been called.
	Finalize(item QueueItem, status QueueStatus)
}
type QueueItem
type QueueItem interface {
	// Path should return the full path
	// of the file, including the fs prefix.
	Path() string
	// ETag should return the ETag of
	// the file.
	ETag() string
	// Size should return the size of
	// the file at Path in bytes.
	Size() int64
	// EventTime should return the time
	// at which the queue item was inserted
	// into the queue. EventTime is used to
	// compute statistics about total queue delays.
	EventTime() time.Time
}
QueueItem represents an item in a notification queue.
type QueueRunner
type QueueRunner struct {
	Owner Tenant
	// Conf is the configuration
	// used for building tables.
	Conf Config
	// Logf is used to log errors encountered
	// while processing entries from a queue.
	// Logf may be nil.
	Logf func(f string, args ...interface{})
	// Open is a hook that can be used to override
	// how queue items are opened.
	// The default behavior is to use ifs.Open(item.Path()).
	Open func(ifs InputFS, item QueueItem) (fs.File, error)
	// TableRefresh is the interval at which
	// table definitions are refreshed.
	// If TableRefresh is less than or equal
	// to zero, then tables are refreshed every minute.
	TableRefresh time.Duration
	// BatchSize is the maximum number of bytes
	// that the QueueRunner will attempt to read
	// from a Queue in Run before committing the
	// returned items. Batches may be smaller than
	// BatchSize due to the expiration of BatchInterval
	// or due to receiving an error from the queue
	// after batching a non-zero number of items.
	// (The size of a batch is computed by summing
	// the QueueItem.Size values from each QueueItem
	// returned from Next.)
	//
	// If BatchSize is less than or equal to zero,
	// then DefaultBatchSize is used instead.
	//
	// See also: BatchInterval
	BatchSize int64
	// BatchInterval is the maximum amount of
	// time the queue should wait for successive
	// calls to Queue.Next to accumulate BatchSize
	// bytes of items.
	//
	// See also: BatchSize
	BatchInterval time.Duration
	// IOErrDelay determines how long queue processing
	// will pause if it encounters an I/O error from
	// the backing filesystem.
	IOErrDelay time.Duration
}
QueueRunner encapsulates the state required to process a single queue.
func (*QueueRunner) Run
func (q *QueueRunner) Run(in Queue) error
Run processes entries from in until in.Next returns io.EOF, at which point it calls in.Close.
type QueueStatus
type QueueStatus int32
QueueStatus indicates the processing status of a QueueItem.
func (QueueStatus) Merge
func (s QueueStatus) Merge(other QueueStatus) QueueStatus
Merge returns the more severe of the two statuses s and other.
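If severity follows the declaration order of the constants (StatusOK < StatusTryAgain < StatusWriteError), Merge reduces to taking the maximum, which makes it easy to fold per-item statuses into one batch status. A minimal sketch under that assumption, using local mirrors of the constants; the ordering is inferred from the iota declarations, not stated by the documentation.

```go
package main

import "fmt"

type queueStatus int32

// Local mirrors of StatusOK, StatusTryAgain, StatusWriteError.
const (
	statusOK queueStatus = iota
	statusTryAgain
	statusWriteError
)

// merge returns the more severe of s and other.
func (s queueStatus) merge(other queueStatus) queueStatus {
	if other > s {
		return other
	}
	return s
}

func main() {
	// Fold per-item statuses into one batch status.
	batch := statusOK
	for _, st := range []queueStatus{statusOK, statusTryAgain, statusOK} {
		batch = batch.merge(st)
	}
	fmt.Println(batch == statusTryAgain) // true
}
```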
type RetentionPolicy
type RetentionPolicy struct {
	// Field is the path expression for the field
	// used to determine the age of a record for
	// the purpose of the data retention policy.
	//
	// Currently only timestamp fields are
	// supported.
	Field string `json:"field,omitempty"`
	// ValidFor is the validity window relative to
	// now.
	//
	// This is a string with a format like
	// "<n>y<n>m<n>d" where "<n>" is a number and
	// any component can be omitted.
	//
	// For example: "6m", "1000d", "1y6m15d"
	ValidFor date.Duration `json:"valid_for"`
}
RetentionPolicy describes a policy for retaining data.
For a given field and validity window, the retention policy only retains data that satisfies the relation
field >= (now - valid_for)
type S3FS
S3FS is an FS implementation that is backed by an S3 bucket.
func DecodeS3FS
DecodeS3FS decodes the output of (*S3FS).Encode.
type S3Resolver
type S3Resolver struct {
	// DeriveKey is the callback used to
	// derive a key for a particular bucket.
	DeriveKey func(bucket string) (*aws.SigningKey, error)
	// Client, if non-nil, sets the default
	// client used by returned s3.BucketFS objects.
	Client *http.Client
	Ctx context.Context
}
S3Resolver is a resolver that expects only s3:// schemes.
type Tenant
type Tenant interface {
	// ID should return the unique ID of the tenant.
	ID() string
	// Key should return the key used
	// for verifying the integrity of database objects.
	Key() *blockfmt.Key
	// Root should return the root of the
	// storage for database objects.
	// The returned FS should implement
	// UploadFS if it supports writing.
	Root() (InputFS, error)
	// Split should trim the prefix off of pattern
	// that specifies the source filesystem and return
	// the result as an InputFS and the trailing glob
	// pattern that can be applied to the input to yield
	// the results.
	Split(pattern string) (InputFS, string, error)
}
Tenant is the set of information necessary to perform queries on behalf of a tenant.
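Split separates the part of a pattern that selects a filesystem from the glob applied within it. A minimal sketch of just the string handling for scheme://bucket/glob patterns, assuming the bucket is what selects the filesystem; splitPattern is hypothetical and returns names rather than an InputFS.

```go
package main

import (
	"fmt"
	"strings"
)

// splitPattern splits "scheme://bucket/glob" into its scheme, bucket,
// and the trailing glob pattern relative to the bucket root.
func splitPattern(pattern string) (scheme, bucket, glob string, err error) {
	scheme, rest, ok := strings.Cut(pattern, "://")
	if !ok || scheme == "" {
		return "", "", "", fmt.Errorf("pattern %q has no URI scheme", pattern)
	}
	bucket, glob, ok = strings.Cut(rest, "/")
	if !ok || bucket == "" || glob == "" {
		return "", "", "", fmt.Errorf("pattern %q lacks a bucket or path", pattern)
	}
	return scheme, bucket, glob, nil
}

func main() {
	s, b, g, err := splitPattern("s3://example-bucket/logs/*/*.json")
	fmt.Println(s, b, g, err) // s3 example-bucket logs/*/*.json <nil>
}
```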
func NewLocalTenant
NewLocalTenant creates a tenant that uses the given FS as its backend.
func NewLocalTenantFromPath
NewLocalTenantFromPath creates a tenant that uses a DirFS rooted at the given path as its backend.
type TenantConfig
type TenantConfig struct {
	// MaxScanBytes is the maximum number of bytes
	// allowed to be scanned for each query. If
	// this is 0, there is no limit.
	MaxScanBytes uint64
}
TenantConfig holds configuration for each tenant.
type TenantConfigurable
type TenantConfigurable interface {
	Tenant
	// Config returns the configuration options
	// for this tenant. This may return nil to
	// indicate all defaults should be used.
	Config() *TenantConfig
}
TenantConfigurable is a tenant that may provide preferred configuration.
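Because TenantConfigurable embeds Tenant, a caller holding a Tenant can discover per-tenant settings with a type assertion and fall back to defaults when the assertion fails or Config returns nil. A minimal sketch with trimmed local interfaces (only ID is kept from Tenant); scanLimit and the two tenant types are hypothetical.

```go
package main

import "fmt"

type tenantConfig struct{ MaxScanBytes uint64 }

// tenant and tenantConfigurable are trimmed mirrors of Tenant and TenantConfigurable.
type tenant interface{ ID() string }

type tenantConfigurable interface {
	tenant
	Config() *tenantConfig
}

// scanLimit returns the per-query scan limit for t; 0 means no limit.
func scanLimit(t tenant) uint64 {
	if tc, ok := t.(tenantConfigurable); ok {
		if cfg := tc.Config(); cfg != nil {
			return cfg.MaxScanBytes
		}
	}
	return 0 // default: unlimited
}

type plainTenant struct{}

func (plainTenant) ID() string { return "plain" }

type limitedTenant struct{ cfg *tenantConfig }

func (limitedTenant) ID() string              { return "limited" }
func (l limitedTenant) Config() *tenantConfig { return l.cfg }

func main() {
	fmt.Println(scanLimit(plainTenant{}))                                             // 0
	fmt.Println(scanLimit(limitedTenant{cfg: &tenantConfig{MaxScanBytes: 1 << 30}})) // 1073741824
	fmt.Println(scanLimit(limitedTenant{}))                                           // 0 (nil Config)
}
```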