journal

v0.0.7
Published: Mar 21, 2025 | License: BSD-2-Clause | Imports: 20 | Imported by: 0

README

journal

WAL-like, append-only, multi-file journals in Go for database WALs, logs, and similar use cases. Supports atomic commits, timestamps, compression, and encryption.

Documentation

Overview

Package journal implements WAL-like append-only journals. A journal is split into segments; the last segment is the one being written to.

Intended use cases:

  • Database WAL files.
  • Log files of various kinds.
  • Archival of historical database records.

Features:

  • Suitable for a large number of very short records. Per-record overhead can be as low as 2 bytes.

  • Suitable for very large records, too. (In the future, it will be possible to write records in chunks.)

  • Fault-resistant.

  • Self-healing. Verifies the checksums and truncates corrupted data when opening the journal.

  • Performant.

  • Automatically rotates the files when they reach a certain size.

TODO:

  • Trigger rotation based on time (say, each day gets a new segment). That is, limit how old an in-progress segment can get.

  • Allow rotating a file without writing a new record. (Otherwise rarely-used journals will never get archived.)

  • Give the work-in-progress file a prefixed name (W*).

  • Auto-commit every N seconds, after K bytes, after M records.

  • Option for millisecond timestamp precision?

  • Reading API. (Search based on time and record ordinals.)

File format

Segment files:

  • file = segmentHeader item*
  • segmentHeader = (see struct)
  • item = record | commit
  • record = (size << 1):uvarint timestampDelta:uvarint bytes*
  • commit = checksum_with_bit_0_set:64

We always set bit 0 of commit checksums, and we use size*2 when encoding records; so bit 0 of the first byte of an item indicates whether it's a record or a commit.
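
The tagging rule above can be illustrated with a minimal sketch. It is not the package's actual code: the helper names (appendRecordHeader, appendCommit, isCommit) are hypothetical, and little-endian byte order for the commit checksum is an assumption, chosen so that bit 0 of the checksum lands in the first byte.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendRecordHeader appends a record header: size*2 as a uvarint
// (so bit 0 of the first byte is always 0), then the timestamp delta.
// Hypothetical helper, not part of the package API.
func appendRecordHeader(buf []byte, size, tsDelta uint64) []byte {
	buf = binary.AppendUvarint(buf, size<<1)
	return binary.AppendUvarint(buf, tsDelta)
}

// appendCommit appends a 64-bit checksum with bit 0 forced on.
// Little-endian order is an assumption of this sketch.
func appendCommit(buf []byte, checksum uint64) []byte {
	return binary.LittleEndian.AppendUint64(buf, checksum|1)
}

// isCommit reports whether the item starting at b[0] is a commit,
// by looking only at bit 0 of its first byte.
func isCommit(b []byte) bool {
	return b[0]&1 == 1
}

func main() {
	rec := appendRecordHeader(nil, 5, 3)
	rec = append(rec, "hello"...)
	com := appendCommit(nil, 0xDEADBEEF00)

	fmt.Println(isCommit(rec), isCommit(com))
}
```

The check works for any record size because a uvarint stores the low 7 bits of the value in its first byte, so the cleared bit 0 of size*2 is always visible there.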

Timestamps are 32-bit Unix times with 1-second precision. The rationale is that the primary use of timestamps is searching logs by time, which does not require higher precision. In high-frequency logs with 1-second precision, timestamp deltas typically fit in 1 byte.
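
As a rough illustration of the size claim, the sketch below measures how many bytes a uvarint-encoded delta occupies. The helper names (uvarintLen, recordOverhead) are hypothetical, and it assumes the delta is a plain number of seconds encoded with the standard Go uvarint encoding.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// uvarintLen returns the number of bytes v occupies as a uvarint.
func uvarintLen(v uint64) int {
	return len(binary.AppendUvarint(nil, v))
}

// recordOverhead returns the header size of a record under the
// grammar above: uvarint(size<<1) followed by uvarint(tsDelta).
// Hypothetical helper, not part of the package API.
func recordOverhead(size, tsDelta uint64) int {
	return uvarintLen(size<<1) + uvarintLen(tsDelta)
}

func main() {
	for _, delta := range []uint64{0, 1, 127, 128, 3600} {
		fmt.Printf("delta=%ds: %d byte(s)\n", delta, uvarintLen(delta))
	}
	fmt.Println("overhead for a 40-byte record, delta 0:", recordOverhead(40, 0))
}
```

Under these assumptions, any delta below 128 seconds takes one byte, and a record shorter than 64 bytes with such a delta has a 2-byte header, which matches the minimum per-record overhead quoted in the feature list.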

Constants

const DefaultMaxFileSize = 10 * 1024 * 1024

Variables

var (
	ErrIncompatible       = fmt.Errorf("incompatible journal")
	ErrUnsupportedVersion = fmt.Errorf("unsupported journal version")
)
var ErrInternal = errors.New("journal: internal error")
var ErrInvalidTimestamp = errors.New("invalid timestamp")
var ErrMissingSealKey = errors.New("missing seal key")

Functions

func ParseTime added in v0.0.3

func ParseTime(s string) (uint64, error)

func TimeToStr added in v0.0.6

func TimeToStr(t time.Time) string

func ToTime

func ToTime(ts uint64) time.Time

func ToTimestamp

func ToTimestamp(t time.Time) uint64

Types

type AutocommitOptions added in v0.0.2

type AutocommitOptions struct {
	Interval time.Duration
}

type AutorotateOptions

type AutorotateOptions struct {
	Interval time.Duration
}

type Cursor

type Cursor struct {
	Record
	// contains filtered or unexported fields
}

func (*Cursor) Close

func (c *Cursor) Close()

func (*Cursor) Err

func (c *Cursor) Err() error

func (*Cursor) Next

func (c *Cursor) Next() bool

type Filter

type Filter struct {
	MinRecordID  uint64
	MinTimestamp uint64
	MaxRecordID  uint64
	MaxTimestamp uint64
}

type Journal

type Journal struct {
	// contains filtered or unexported fields
}

func New

func New(dir string, o Options) *Journal

func (*Journal) Autocommit added in v0.0.2

func (j *Journal) Autocommit(now uint64) (bool, error)

func (*Journal) Autorotate

func (j *Journal) Autorotate(now uint64) (bool, error)

func (*Journal) CanSeal added in v0.0.6

func (j *Journal) CanSeal() bool

func (*Journal) Commit

func (j *Journal) Commit() error

func (*Journal) FindSegments

func (j *Journal) FindSegments(filter Filter) ([]Segment, error)

func (*Journal) FinishWriting

func (j *Journal) FinishWriting() error

func (*Journal) Initialize

func (j *Journal) Initialize() error

func (*Journal) Now

func (j *Journal) Now() uint64

func (*Journal) QuickSummary added in v0.0.3

func (j *Journal) QuickSummary() (Summary, error)

QuickSummary returns general information about the journal without reading the journal data. If the journal has not been opened yet, information about the last committed record will not be available.

func (*Journal) Read

func (j *Journal) Read(filter Filter) *Cursor

func (*Journal) Records

func (j *Journal) Records(filter Filter, fail func(error)) iter.Seq[Record]

func (*Journal) Rotate

func (j *Journal) Rotate() error

func (*Journal) Seal added in v0.0.6

func (j *Journal) Seal(ctx context.Context) (Segment, error)

func (*Journal) SealAndTrimAll added in v0.0.6

func (j *Journal) SealAndTrimAll(ctx context.Context) (int, error)

func (*Journal) SealAndTrimOnce added in v0.0.6

func (j *Journal) SealAndTrimOnce(ctx context.Context) (int, error)

func (*Journal) StartWriting

func (j *Journal) StartWriting()

func (*Journal) String

func (j *Journal) String() string

func (*Journal) Summary added in v0.0.3

func (j *Journal) Summary() (Summary, error)

Summary returns general information about the journal, including the last committed record. If necessary, it opens, verifies, and repairs the journal in the process.

func (*Journal) Trim added in v0.0.6

func (j *Journal) Trim() (Segment, error)

func (*Journal) WriteRecord

func (j *Journal) WriteRecord(timestamp uint64, data []byte) error

type Meta added in v0.0.3

type Meta struct {
	ID        uint64
	Timestamp uint64
}

func (Meta) IsNonZero added in v0.0.5

func (meta Meta) IsNonZero() bool

func (Meta) IsZero added in v0.0.3

func (meta Meta) IsZero() bool

func (Meta) Time added in v0.0.3

func (meta Meta) Time() time.Time

type Options

type Options struct {
	FileName         string // e.g. "mydb-*.bin"
	MaxFileSize      int64  // new segment after this size
	DebugName        string
	Now              func() time.Time
	JournalInvariant [32]byte
	SegmentInvariant [32]byte
	Autorotate       AutorotateOptions
	Autocommit       AutocommitOptions

	Context context.Context
	Logger  *slog.Logger
	Verbose bool

	OnChange func()

	SealKeys []*sealer.Key
	SealOpts sealer.SealOptions
}

type Record

type Record struct {
	ID        uint64
	Timestamp uint64
	Data      []byte
}

func (*Record) Time

func (rec *Record) Time() time.Time

type Segment

type Segment struct {
	// contains filtered or unexported fields
}

func (Segment) FirstRecord added in v0.0.3

func (seg Segment) FirstRecord() Meta

func (Segment) IsNonZero added in v0.0.5

func (seg Segment) IsNonZero() bool

func (Segment) IsZero

func (seg Segment) IsZero() bool

func (Segment) RecordNumber added in v0.0.3

func (seg Segment) RecordNumber() uint64

func (Segment) SegmentNumber added in v0.0.3

func (seg Segment) SegmentNumber() uint64

func (Segment) String

func (seg Segment) String() string

func (Segment) Time added in v0.0.3

func (seg Segment) Time() time.Time

func (Segment) Timestamp added in v0.0.3

func (seg Segment) Timestamp() uint64

type Set

type Set struct {
	// contains filtered or unexported fields
}

func NewSet

func NewSet(opt SetOptions) *Set

func (*Set) Add

func (set *Set) Add(j *Journal)

func (*Set) Autocommit added in v0.0.6

func (set *Set) Autocommit(ctx context.Context) int

func (*Set) Autoseal added in v0.0.6

func (set *Set) Autoseal(ctx context.Context) int

func (*Set) Journals

func (set *Set) Journals() []*Journal

func (*Set) Process

func (set *Set) Process(ctx context.Context) int

func (*Set) Remove

func (set *Set) Remove(j *Journal)

func (*Set) StartBackground

func (set *Set) StartBackground(ctx context.Context) *SetRunner

type SetOptions

type SetOptions struct {
	Now             func() time.Time
	Logger          *slog.Logger
	AutosealEnabled bool
	AutosealDelay   time.Duration
}

type SetRunner

type SetRunner struct {
	// contains filtered or unexported fields
}

func (*SetRunner) Close

func (runner *SetRunner) Close()

type Status

type Status uint8
const (
	Invalid Status = iota
	Draft
	Finalized
	Sealed
)

func (Status) CanSeal added in v0.0.6

func (s Status) CanSeal() bool

func (Status) IsDraft

func (s Status) IsDraft() bool

func (Status) IsSealed

func (s Status) IsSealed() bool

type Summary added in v0.0.3

type Summary struct {
	FirstSealedSegment   Segment
	LastSealedSegment    Segment
	FirstUnsealedSegment Segment
	LastUnsealedSegment  Segment
	SegmentCount         int
	LastCommitted        Meta
	LastUncommitted      Meta
}

func (*Summary) FirstRecord added in v0.0.3

func (s *Summary) FirstRecord() Meta

func (*Summary) UncommittedCount added in v0.0.5

func (s *Summary) UncommittedCount() int
