lrdb

package
v1.1.0 (not the latest version of this module)

Published: Aug 17, 2025 License: AGPL-3.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrBatchAlreadyClosed = errors.New("batch already closed")
)

Functions

func AddFlushFunction

func AddFlushFunction(f flushFunction)

func FlushCaches

func FlushCaches()

func IsPartitionTableRemembered

func IsPartitionTableRemembered(tableName string) bool

func NewConnectionPool

func NewConnectionPool(ctx context.Context, url string) (*pgxpool.Pool, error)

NewConnectionPool creates a new connection pool, using pgx v5, from the provided PostgreSQL connection string.
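
A minimal sketch of opening a pool and binding the generated queries to it; it is written as if it lived in the lrdb package itself, and the connection URL is a placeholder.

package lrdb

import (
	"context"
	"log"
)

// connectSketch opens a metadata pool and constructs Queries from it.
// The connection URL below is a placeholder.
func connectSketch(ctx context.Context) {
	pool, err := NewConnectionPool(ctx, "postgres://user:pass@localhost:5432/lrdb")
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer pool.Close()

	// *pgxpool.Pool satisfies DBTX, so it can back the generated queries directly.
	q := New(pool)
	_ = q
}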

func RememberPartitionTable

func RememberPartitionTable(tableName string)
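
The partition-cache helpers suggest a check-then-remember pattern around partition DDL. A sketch under that assumption, with createPartition as a hypothetical caller-supplied function:

package lrdb

// ensurePartitionSketch skips DDL for partition tables that have already been
// seen by this process, and records newly created ones. FlushCaches (or a
// function registered via AddFlushFunction) resets this state.
func ensurePartitionSketch(tableName string, createPartition func(string) error) error {
	if IsPartitionTableRemembered(tableName) {
		return nil
	}
	if err := createPartition(tableName); err != nil {
		return err
	}
	RememberPartitionTable(tableName)
	return nil
}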

Types

type ActionEnum

type ActionEnum string
const (
	ActionEnumCompact ActionEnum = "compact"
	ActionEnumRollup  ActionEnum = "rollup"
)

func (*ActionEnum) Scan

func (e *ActionEnum) Scan(src interface{}) error

type BatchDeleteMetricSegsBatchResults

type BatchDeleteMetricSegsBatchResults struct {
	// contains filtered or unexported fields
}

func (*BatchDeleteMetricSegsBatchResults) Close

func (b *BatchDeleteMetricSegsBatchResults) Close() error

func (*BatchDeleteMetricSegsBatchResults) Exec

func (b *BatchDeleteMetricSegsBatchResults) Exec(f func(int, error))

type BatchDeleteMetricSegsParams

type BatchDeleteMetricSegsParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	Dateint        int32     `json:"dateint"`
	FrequencyMs    int32     `json:"frequency_ms"`
	SegmentID      int64     `json:"segment_id"`
	InstanceNum    int16     `json:"instance_num"`
	TidPartition   int16     `json:"tid_partition"`
}

type BatchInsertMetricSegsBatchResults

type BatchInsertMetricSegsBatchResults struct {
	// contains filtered or unexported fields
}

func (*BatchInsertMetricSegsBatchResults) Close

func (b *BatchInsertMetricSegsBatchResults) Close() error

func (*BatchInsertMetricSegsBatchResults) Exec

func (b *BatchInsertMetricSegsBatchResults) Exec(f func(int, error))

type BatchInsertMetricSegsParams

type BatchInsertMetricSegsParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	Dateint        int32     `json:"dateint"`
	IngestDateint  int32     `json:"ingest_dateint"`
	FrequencyMs    int32     `json:"frequency_ms"`
	SegmentID      int64     `json:"segment_id"`
	InstanceNum    int16     `json:"instance_num"`
	TidPartition   int16     `json:"tid_partition"`
	StartTs        int64     `json:"start_ts"`
	EndTs          int64     `json:"end_ts"`
	RecordCount    int64     `json:"record_count"`
	FileSize       int64     `json:"file_size"`
	TidCount       int32     `json:"tid_count"`
	Published      bool      `json:"published"`
	CreatedBy      CreatedBy `json:"created_by"`
	Rolledup       bool      `json:"rolledup"`
}
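
A sketch of the sqlc batch pattern these types follow: queue all rows in one round trip, then collect per-row errors through Exec. It assumes BatchInsertMetricSegs is available on *Queries, as listed in the Querier interface.

package lrdb

import (
	"context"
	"fmt"
)

// batchInsertMetricSegsSketch sends all params in a single batch and returns
// the first per-row error, if any.
func batchInsertMetricSegsSketch(ctx context.Context, q *Queries, params []BatchInsertMetricSegsParams) error {
	results := q.BatchInsertMetricSegs(ctx, params)
	defer results.Close()

	var firstErr error
	results.Exec(func(i int, err error) {
		if err != nil && firstErr == nil {
			firstErr = fmt.Errorf("params[%d]: %w", i, err)
		}
	})
	return firstErr
}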

type BatchMarkMetricSegsRolledupBatchResults

type BatchMarkMetricSegsRolledupBatchResults struct {
	// contains filtered or unexported fields
}

func (*BatchMarkMetricSegsRolledupBatchResults) Close

func (b *BatchMarkMetricSegsRolledupBatchResults) Close() error

func (*BatchMarkMetricSegsRolledupBatchResults) Exec

func (b *BatchMarkMetricSegsRolledupBatchResults) Exec(f func(int, error))

type BatchMarkMetricSegsRolledupParams

type BatchMarkMetricSegsRolledupParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	Dateint        int32     `json:"dateint"`
	FrequencyMs    int32     `json:"frequency_ms"`
	SegmentID      int64     `json:"segment_id"`
	InstanceNum    int16     `json:"instance_num"`
	TidPartition   int16     `json:"tid_partition"`
}

type BatchUpsertExemplarLogsBatchResults

type BatchUpsertExemplarLogsBatchResults struct {
	// contains filtered or unexported fields
}

func (*BatchUpsertExemplarLogsBatchResults) Close

func (b *BatchUpsertExemplarLogsBatchResults) Close() error

func (*BatchUpsertExemplarLogsBatchResults) QueryRow

func (b *BatchUpsertExemplarLogsBatchResults) QueryRow(f func(int, bool, error))

type BatchUpsertExemplarLogsParams

type BatchUpsertExemplarLogsParams struct {
	OrganizationID      uuid.UUID      `json:"organization_id"`
	ServiceIdentifierID uuid.UUID      `json:"service_identifier_id"`
	Fingerprint         int64          `json:"fingerprint"`
	Attributes          map[string]any `json:"attributes"`
	Exemplar            map[string]any `json:"exemplar"`
	OldFingerprint      int64          `json:"old_fingerprint"`
}
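
A sketch of consuming the QueryRow callback, whose boolean reports whether each upsert created a new record:

package lrdb

import (
	"context"
	"log"
)

// upsertExemplarLogsSketch queues exemplar upserts and logs which rows were new.
func upsertExemplarLogsSketch(ctx context.Context, q *Queries, params []BatchUpsertExemplarLogsParams) {
	results := q.BatchUpsertExemplarLogs(ctx, params)
	defer results.Close()

	results.QueryRow(func(i int, isNew bool, err error) {
		if err != nil {
			log.Printf("exemplar upsert %d failed: %v", i, err)
			return
		}
		if isNew {
			log.Printf("exemplar upsert %d created a new record", i)
		}
	})
}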

type BatchUpsertExemplarMetricsBatchResults

type BatchUpsertExemplarMetricsBatchResults struct {
	// contains filtered or unexported fields
}

func (*BatchUpsertExemplarMetricsBatchResults) Close

func (b *BatchUpsertExemplarMetricsBatchResults) Close() error

func (*BatchUpsertExemplarMetricsBatchResults) QueryRow

func (b *BatchUpsertExemplarMetricsBatchResults) QueryRow(f func(int, bool, error))

type BatchUpsertExemplarMetricsParams

type BatchUpsertExemplarMetricsParams struct {
	OrganizationID      uuid.UUID      `json:"organization_id"`
	ServiceIdentifierID uuid.UUID      `json:"service_identifier_id"`
	MetricName          string         `json:"metric_name"`
	MetricType          string         `json:"metric_type"`
	Attributes          map[string]any `json:"attributes"`
	Exemplar            map[string]any `json:"exemplar"`
}

type BatchUpsertExemplarTracesBatchResults

type BatchUpsertExemplarTracesBatchResults struct {
	// contains filtered or unexported fields
}

func (*BatchUpsertExemplarTracesBatchResults) Close

func (b *BatchUpsertExemplarTracesBatchResults) Close() error

func (*BatchUpsertExemplarTracesBatchResults) QueryRow

func (b *BatchUpsertExemplarTracesBatchResults) QueryRow(f func(int, bool, error))

type BatchUpsertExemplarTracesParams

type BatchUpsertExemplarTracesParams struct {
	OrganizationID      uuid.UUID      `json:"organization_id"`
	ServiceIdentifierID uuid.UUID      `json:"service_identifier_id"`
	Fingerprint         int64          `json:"fingerprint"`
	Attributes          map[string]any `json:"attributes"`
	Exemplar            map[string]any `json:"exemplar"`
	SpanName            string         `json:"span_name"`
	SpanKind            int32          `json:"span_kind"`
}

type ClaimInqueueWorkParams

type ClaimInqueueWorkParams struct {
	ClaimedBy     int64  `json:"claimed_by"`
	TelemetryType string `json:"telemetry_type"`
}
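
A sketch of one claim/process/ack cycle over the inqueue, using only the methods documented below; the telemetry type value and the processItem function are hypothetical.

package lrdb

import (
	"context"
	"log"
)

// inqueueWorkerSketch claims one item of inqueue work, processes it, and
// either deletes it on success or releases it for retry.
func inqueueWorkerSketch(ctx context.Context, q *Queries, workerID int64, processItem func(Inqueue) error) {
	item, err := q.ClaimInqueueWork(ctx, ClaimInqueueWorkParams{
		ClaimedBy:     workerID,
		TelemetryType: "logs", // illustrative value
	})
	if err != nil {
		log.Printf("no inqueue work claimed: %v", err)
		return
	}

	if err := processItem(item); err != nil {
		// Release the claim so another worker (or a later retry) can pick it up.
		_ = q.ReleaseInqueueWork(ctx, ReleaseInqueueWorkParams{ID: item.ID, ClaimedBy: workerID})
		return
	}
	_ = q.DeleteInqueueWork(ctx, DeleteInqueueWorkParams{ID: item.ID, ClaimedBy: workerID})
}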

type CompactLogSegmentsParams

type CompactLogSegmentsParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	Dateint        int32     `json:"dateint"`
	IngestDateint  int32     `json:"ingest_dateint"`
	NewSegmentID   int64     `json:"new_segment_id"`
	InstanceNum    int16     `json:"instance_num"`
	NewRecordCount int64     `json:"new_record_count"`
	NewFileSize    int64     `json:"new_file_size"`
	NewStartTs     int64     `json:"new_start_ts"`
	NewEndTs       int64     `json:"new_end_ts"`
	CreatedBy      CreatedBy `json:"created_by"`
	OldSegmentIds  []int64   `json:"old_segment_ids"`
}

type CreatedBy

type CreatedBy int16
const (
	CreatedByUnknown CreatedBy = iota
	CreatedByIngest
	CreatedByCompact
	CreateByRollup
)

type DBTX

type DBTX interface {
	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
	QueryRow(context.Context, string, ...interface{}) pgx.Row
	SendBatch(context.Context, *pgx.Batch) pgx.BatchResults
}

type DeleteInqueueWorkParams

type DeleteInqueueWorkParams struct {
	ID        uuid.UUID `json:"id"`
	ClaimedBy int64     `json:"claimed_by"`
}

type ExemplarLog

type ExemplarLog struct {
	CreatedAt           time.Time      `json:"created_at"`
	UpdatedAt           time.Time      `json:"updated_at"`
	OrganizationID      uuid.UUID      `json:"organization_id"`
	ServiceIdentifierID uuid.UUID      `json:"service_identifier_id"`
	Attributes          map[string]any `json:"attributes"`
	Exemplar            map[string]any `json:"exemplar"`
	Fingerprint         int64          `json:"fingerprint"`
	RelatedFingerprints []int64        `json:"related_fingerprints"`
}

type ExemplarMetric

type ExemplarMetric struct {
	CreatedAt           time.Time      `json:"created_at"`
	UpdatedAt           time.Time      `json:"updated_at"`
	OrganizationID      uuid.UUID      `json:"organization_id"`
	ServiceIdentifierID uuid.UUID      `json:"service_identifier_id"`
	Attributes          map[string]any `json:"attributes"`
	Exemplar            map[string]any `json:"exemplar"`
	MetricName          string         `json:"metric_name"`
	MetricType          string         `json:"metric_type"`
}

type ExemplarTrace

type ExemplarTrace struct {
	CreatedAt           time.Time      `json:"created_at"`
	UpdatedAt           time.Time      `json:"updated_at"`
	OrganizationID      uuid.UUID      `json:"organization_id"`
	ServiceIdentifierID uuid.UUID      `json:"service_identifier_id"`
	Attributes          map[string]any `json:"attributes"`
	Exemplar            map[string]any `json:"exemplar"`
	Fingerprint         int64          `json:"fingerprint"`
	SpanName            string         `json:"span_name"`
	SpanKind            int32          `json:"span_kind"`
}

type GetExemplarLogsByFingerprintParams

type GetExemplarLogsByFingerprintParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	Fingerprint    int64     `json:"fingerprint"`
}

type GetExemplarLogsByServiceParams

type GetExemplarLogsByServiceParams struct {
	OrganizationID      uuid.UUID `json:"organization_id"`
	ServiceIdentifierID uuid.UUID `json:"service_identifier_id"`
}

type GetExemplarMetricsByServiceParams

type GetExemplarMetricsByServiceParams struct {
	OrganizationID      uuid.UUID `json:"organization_id"`
	ServiceIdentifierID uuid.UUID `json:"service_identifier_id"`
}

type GetExemplarTracesByFingerprintParams

type GetExemplarTracesByFingerprintParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	Fingerprint    int64     `json:"fingerprint"`
}

type GetExemplarTracesByServiceParams

type GetExemplarTracesByServiceParams struct {
	OrganizationID      uuid.UUID `json:"organization_id"`
	ServiceIdentifierID uuid.UUID `json:"service_identifier_id"`
}

type GetLogSegmentsForCompactionParams

type GetLogSegmentsForCompactionParams struct {
	OrganizationID  uuid.UUID `json:"organization_id"`
	Dateint         int32     `json:"dateint"`
	InstanceNum     int16     `json:"instance_num"`
	MaxFileSize     int64     `json:"max_file_size"`
	CursorCreatedAt time.Time `json:"cursor_created_at"`
	CursorSegmentID int64     `json:"cursor_segment_id"`
	Maxrows         int32     `json:"maxrows"`
}

type GetLogSegmentsForCompactionRow

type GetLogSegmentsForCompactionRow struct {
	SegmentID     int64     `json:"segment_id"`
	StartTs       int64     `json:"start_ts"`
	EndTs         int64     `json:"end_ts"`
	FileSize      int64     `json:"file_size"`
	RecordCount   int64     `json:"record_count"`
	IngestDateint int32     `json:"ingest_dateint"`
	CreatedAt     time.Time `json:"created_at"`
}

type GetMetricSegsForCompactionParams

type GetMetricSegsForCompactionParams struct {
	OrganizationID  uuid.UUID `json:"organization_id"`
	Dateint         int32     `json:"dateint"`
	FrequencyMs     int32     `json:"frequency_ms"`
	InstanceNum     int16     `json:"instance_num"`
	StartTs         int64     `json:"start_ts"`
	EndTs           int64     `json:"end_ts"`
	MaxFileSize     int64     `json:"max_file_size"`
	CursorCreatedAt time.Time `json:"cursor_created_at"`
	CursorSegmentID int64     `json:"cursor_segment_id"`
	Maxrows         int32     `json:"maxrows"`
}

type GetMetricSegsForRollupParams

type GetMetricSegsForRollupParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	Dateint        int32     `json:"dateint"`
	FrequencyMs    int32     `json:"frequency_ms"`
	InstanceNum    int16     `json:"instance_num"`
	StartTs        int64     `json:"start_ts"`
	EndTs          int64     `json:"end_ts"`
}

type GetSpanInfoByFingerprintParams

type GetSpanInfoByFingerprintParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	Fingerprint    int64     `json:"fingerprint"`
}

type GetSpanInfoByFingerprintRow

type GetSpanInfoByFingerprintRow struct {
	Exemplar map[string]any `json:"exemplar"`
	SpanName string         `json:"span_name"`
	SpanKind int32          `json:"span_kind"`
}

type Inqueue

type Inqueue struct {
	ID             uuid.UUID  `json:"id"`
	QueueTs        time.Time  `json:"queue_ts"`
	Priority       int32      `json:"priority"`
	OrganizationID uuid.UUID  `json:"organization_id"`
	CollectorName  string     `json:"collector_name"`
	InstanceNum    int16      `json:"instance_num"`
	Bucket         string     `json:"bucket"`
	ObjectID       string     `json:"object_id"`
	TelemetryType  string     `json:"telemetry_type"`
	Tries          int32      `json:"tries"`
	ClaimedBy      int64      `json:"claimed_by"`
	ClaimedAt      *time.Time `json:"claimed_at"`
}

type InqueueJournal

type InqueueJournal struct {
	ID             int64     `json:"id"`
	OrganizationID uuid.UUID `json:"organization_id"`
	Bucket         string    `json:"bucket"`
	ObjectID       string    `json:"object_id"`
	CreatedAt      time.Time `json:"created_at"`
	UpdatedAt      time.Time `json:"updated_at"`
}

type InqueueJournalDeleteParams

type InqueueJournalDeleteParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	Bucket         string    `json:"bucket"`
	ObjectID       string    `json:"object_id"`
}

type InqueueJournalUpsertParams

type InqueueJournalUpsertParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	Bucket         string    `json:"bucket"`
	ObjectID       string    `json:"object_id"`
}

type InsertLogSegmentParams

type InsertLogSegmentParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	Dateint        int32     `json:"dateint"`
	IngestDateint  int32     `json:"ingest_dateint"`
	SegmentID      int64     `json:"segment_id"`
	InstanceNum    int16     `json:"instance_num"`
	StartTs        int64     `json:"start_ts"`
	EndTs          int64     `json:"end_ts"`
	RecordCount    int64     `json:"record_count"`
	FileSize       int64     `json:"file_size"`
	CreatedBy      CreatedBy `json:"created_by"`
	Fingerprints   []int64   `json:"fingerprints"`
}

type InsertMetricSegmentParams

type InsertMetricSegmentParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	Dateint        int32     `json:"dateint"`
	IngestDateint  int32     `json:"ingest_dateint"`
	FrequencyMs    int32     `json:"frequency_ms"`
	SegmentID      int64     `json:"segment_id"`
	InstanceNum    int16     `json:"instance_num"`
	TidPartition   int16     `json:"tid_partition"`
	StartTs        int64     `json:"start_ts"`
	EndTs          int64     `json:"end_ts"`
	RecordCount    int64     `json:"record_count"`
	FileSize       int64     `json:"file_size"`
	TidCount       int32     `json:"tid_count"`
	CreatedBy      CreatedBy `json:"created_by"`
	Published      bool      `json:"published"`
}

type ListSegmentsForQueryParams

type ListSegmentsForQueryParams struct {
	Int8range      int64     `json:"int8range"`
	Int8range_2    int64     `json:"int8range_2"`
	Dateint        int32     `json:"dateint"`
	FrequencyMs    int32     `json:"frequency_ms"`
	OrganizationID uuid.UUID `json:"organization_id"`
}

type ListSegmentsForQueryRow

type ListSegmentsForQueryRow struct {
	InstanceNum int16 `json:"instance_num"`
	SegmentID   int64 `json:"segment_id"`
	StartTs     int64 `json:"start_ts"`
	EndTs       int64 `json:"end_ts"`
}

type LogSeg

type LogSeg struct {
	OrganizationID uuid.UUID                 `json:"organization_id"`
	Dateint        int32                     `json:"dateint"`
	SegmentID      int64                     `json:"segment_id"`
	InstanceNum    int16                     `json:"instance_num"`
	Fingerprints   []int64                   `json:"fingerprints"`
	RecordCount    int64                     `json:"record_count"`
	FileSize       int64                     `json:"file_size"`
	IngestDateint  int32                     `json:"ingest_dateint"`
	TsRange        pgtype.Range[pgtype.Int8] `json:"ts_range"`
	CreatedBy      CreatedBy                 `json:"created_by"`
	CreatedAt      time.Time                 `json:"created_at"`
}

type LogSegEstimatorParams

type LogSegEstimatorParams struct {
	DateintLow  int32 `json:"dateint_low"`
	DateintHigh int32 `json:"dateint_high"`
	MsLow       int64 `json:"ms_low"`
	MsHigh      int64 `json:"ms_high"`
}

type LogSegEstimatorRow

type LogSegEstimatorRow struct {
	OrganizationID   uuid.UUID `json:"organization_id"`
	InstanceNum      int16     `json:"instance_num"`
	EstimatedRecords int64     `json:"estimated_records"`
}

type LogSegmentUpserter

type LogSegmentUpserter interface {
	//InsertLogFingerprints(ctx context.Context, params InsertLogFingerprintsParams) error
	InsertLogSegment(ctx context.Context, params InsertLogSegmentParams) error
}

type MetricSeg

type MetricSeg struct {
	OrganizationID uuid.UUID                 `json:"organization_id"`
	Dateint        int32                     `json:"dateint"`
	FrequencyMs    int32                     `json:"frequency_ms"`
	SegmentID      int64                     `json:"segment_id"`
	InstanceNum    int16                     `json:"instance_num"`
	TidPartition   int16                     `json:"tid_partition"`
	TsRange        pgtype.Range[pgtype.Int8] `json:"ts_range"`
	RecordCount    int64                     `json:"record_count"`
	FileSize       int64                     `json:"file_size"`
	TidCount       int32                     `json:"tid_count"`
	IngestDateint  int32                     `json:"ingest_dateint"`
	Published      bool                      `json:"published"`
	Rolledup       bool                      `json:"rolledup"`
	CreatedAt      time.Time                 `json:"created_at"`
	CreatedBy      CreatedBy                 `json:"created_by"`
}

type MetricSegEstimatorParams

type MetricSegEstimatorParams struct {
	DateintLow  int32 `json:"dateint_low"`
	DateintHigh int32 `json:"dateint_high"`
	MsLow       int64 `json:"ms_low"`
	MsHigh      int64 `json:"ms_high"`
}

type MetricSegEstimatorRow

type MetricSegEstimatorRow struct {
	OrganizationID   uuid.UUID `json:"organization_id"`
	InstanceNum      int16     `json:"instance_num"`
	EstimatedRecords int64     `json:"estimated_records"`
}

type MetricSegmentInserter

type MetricSegmentInserter interface {
	InsertMetricSegment(ctx context.Context, params InsertMetricSegmentParams) error
}

type NullActionEnum

type NullActionEnum struct {
	ActionEnum ActionEnum `json:"action_enum"`
	Valid      bool       `json:"valid"` // Valid is true if ActionEnum is not NULL
}

func (*NullActionEnum) Scan

func (ns *NullActionEnum) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullActionEnum) Value

func (ns NullActionEnum) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type NullSignalEnum

type NullSignalEnum struct {
	SignalEnum SignalEnum `json:"signal_enum"`
	Valid      bool       `json:"valid"` // Valid is true if SignalEnum is not NULL
}

func (*NullSignalEnum) Scan

func (ns *NullSignalEnum) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullSignalEnum) Value

func (ns NullSignalEnum) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type ObjCleanup

type ObjCleanup struct {
	ID             uuid.UUID `json:"id"`
	DeleteAt       time.Time `json:"delete_at"`
	OrganizationID uuid.UUID `json:"organization_id"`
	InstanceNum    int16     `json:"instance_num"`
	BucketID       string    `json:"bucket_id"`
	ObjectID       string    `json:"object_id"`
	Tries          int32     `json:"tries"`
}

type ObjectCleanupAddParams

type ObjectCleanupAddParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	InstanceNum    int16     `json:"instance_num"`
	BucketID       string    `json:"bucket_id"`
	ObjectID       string    `json:"object_id"`
}

type ObjectCleanupGetRow

type ObjectCleanupGetRow struct {
	ID             uuid.UUID `json:"id"`
	OrganizationID uuid.UUID `json:"organization_id"`
	InstanceNum    int16     `json:"instance_num"`
	BucketID       string    `json:"bucket_id"`
	ObjectID       string    `json:"object_id"`
}
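
A sketch of a cleanup sweep built from the ObjectCleanup* queries; deleteObject stands in for whatever removes the object from storage.

package lrdb

import (
	"context"
)

// objectCleanupSketch drains a page of pending deletions, marking each row
// complete or failed. deleteObject is a hypothetical caller-supplied function.
func objectCleanupSketch(ctx context.Context, q *Queries, deleteObject func(bucket, object string) error) error {
	rows, err := q.ObjectCleanupGet(ctx, 100)
	if err != nil {
		return err
	}
	for _, row := range rows {
		if err := deleteObject(row.BucketID, row.ObjectID); err != nil {
			_ = q.ObjectCleanupFail(ctx, row.ID)
			continue
		}
		if err := q.ObjectCleanupComplete(ctx, row.ID); err != nil {
			return err
		}
	}
	return nil
}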

type PutInqueueWorkParams

type PutInqueueWorkParams struct {
	OrganizationID uuid.UUID `json:"organization_id"`
	CollectorName  string    `json:"collector_name"`
	InstanceNum    int16     `json:"instance_num"`
	Bucket         string    `json:"bucket"`
	ObjectID       string    `json:"object_id"`
	TelemetryType  string    `json:"telemetry_type"`
	Priority       int32     `json:"priority"`
}

type Querier

type Querier interface {
	BatchDeleteMetricSegs(ctx context.Context, arg []BatchDeleteMetricSegsParams) *BatchDeleteMetricSegsBatchResults
	BatchInsertMetricSegs(ctx context.Context, arg []BatchInsertMetricSegsParams) *BatchInsertMetricSegsBatchResults
	BatchMarkMetricSegsRolledup(ctx context.Context, arg []BatchMarkMetricSegsRolledupParams) *BatchMarkMetricSegsRolledupBatchResults
	// Upserts a log exemplar. Attributes, exemplar, and updated_at are always updated to the
	// provided values. If old_fingerprint is not 0, it is added to the record's list of related
	// fingerprints; the record itself remains keyed by fingerprint, so an existing record is
	// updated in place rather than re-keyed to the new fingerprint.
	// The return value is a boolean indicating whether the record is new.
	BatchUpsertExemplarLogs(ctx context.Context, arg []BatchUpsertExemplarLogsParams) *BatchUpsertExemplarLogsBatchResults
	BatchUpsertExemplarMetrics(ctx context.Context, arg []BatchUpsertExemplarMetricsParams) *BatchUpsertExemplarMetricsBatchResults
	BatchUpsertExemplarTraces(ctx context.Context, arg []BatchUpsertExemplarTracesParams) *BatchUpsertExemplarTracesBatchResults
	ClaimInqueueWork(ctx context.Context, arg ClaimInqueueWorkParams) (Inqueue, error)
	CleanupInqueueWork(ctx context.Context) error
	CompactLogSegments(ctx context.Context, arg CompactLogSegmentsParams) error
	DeleteInqueueWork(ctx context.Context, arg DeleteInqueueWorkParams) error
	GetExemplarLogsByFingerprint(ctx context.Context, arg GetExemplarLogsByFingerprintParams) (ExemplarLog, error)
	GetExemplarLogsByService(ctx context.Context, arg GetExemplarLogsByServiceParams) ([]ExemplarLog, error)
	GetExemplarLogsCreatedAfter(ctx context.Context, ts time.Time) ([]ExemplarLog, error)
	GetExemplarMetricsByService(ctx context.Context, arg GetExemplarMetricsByServiceParams) ([]ExemplarMetric, error)
	GetExemplarMetricsCreatedAfter(ctx context.Context, ts time.Time) ([]ExemplarMetric, error)
	GetExemplarTracesByFingerprint(ctx context.Context, arg GetExemplarTracesByFingerprintParams) (ExemplarTrace, error)
	GetExemplarTracesByService(ctx context.Context, arg GetExemplarTracesByServiceParams) ([]ExemplarTrace, error)
	GetExemplarTracesCreatedAfter(ctx context.Context, ts time.Time) ([]ExemplarTrace, error)
	GetLogSegmentsForCompaction(ctx context.Context, arg GetLogSegmentsForCompactionParams) ([]GetLogSegmentsForCompactionRow, error)
	GetMetricSegsForCompaction(ctx context.Context, arg GetMetricSegsForCompactionParams) ([]MetricSeg, error)
	GetMetricSegsForRollup(ctx context.Context, arg GetMetricSegsForRollupParams) ([]MetricSeg, error)
	GetSpanInfoByFingerprint(ctx context.Context, arg GetSpanInfoByFingerprintParams) (GetSpanInfoByFingerprintRow, error)
	InqueueJournalDelete(ctx context.Context, arg InqueueJournalDeleteParams) error
	InqueueJournalUpsert(ctx context.Context, arg InqueueJournalUpsertParams) (bool, error)
	InsertLogSegmentDirect(ctx context.Context, arg InsertLogSegmentParams) error
	InsertMetricSegmentDirect(ctx context.Context, arg InsertMetricSegmentParams) error
	ListSegmentsForQuery(ctx context.Context, arg ListSegmentsForQueryParams) ([]ListSegmentsForQueryRow, error)
	// Returns an estimate of the number of log segments, average bytes, average records,
	// and average bytes per record for log segments in the last hour per organization and instance.
	// This query is basically identical to the MetricSegEstimator, but for log segments.
	LogSegEstimator(ctx context.Context, arg LogSegEstimatorParams) ([]LogSegEstimatorRow, error)
	// Returns an estimate of the number of metric segments, average bytes, average records,
	// and average bytes per record for metric segments in the last hour per organization and instance.
	// This query is basically identical to the LogSegEstimator, but for metric segments.
	MetricSegEstimator(ctx context.Context, arg MetricSegEstimatorParams) ([]MetricSegEstimatorRow, error)
	ObjectCleanupAdd(ctx context.Context, arg ObjectCleanupAddParams) error
	ObjectCleanupComplete(ctx context.Context, id uuid.UUID) error
	ObjectCleanupFail(ctx context.Context, id uuid.UUID) error
	ObjectCleanupGet(ctx context.Context, maxrows int32) ([]ObjectCleanupGetRow, error)
	PutInqueueWork(ctx context.Context, arg PutInqueueWorkParams) error
	ReleaseInqueueWork(ctx context.Context, arg ReleaseInqueueWorkParams) error
	SignalLockCleanup(ctx context.Context) (int32, error)
	TouchInqueueWork(ctx context.Context, arg TouchInqueueWorkParams) error
	UpsertServiceIdentifier(ctx context.Context, arg UpsertServiceIdentifierParams) (UpsertServiceIdentifierRow, error)
	WorkQueueAddDirect(ctx context.Context, arg WorkQueueAddParams) error
	WorkQueueClaimDirect(ctx context.Context, arg WorkQueueClaimParams) (WorkQueueClaimRow, error)
	WorkQueueCleanupDirect(ctx context.Context) ([]WorkQueueCleanupRow, error)
	WorkQueueCompleteDirect(ctx context.Context, arg WorkQueueCompleteParams) error
	WorkQueueFailDirect(ctx context.Context, arg WorkQueueFailParams) error
	WorkQueueGC(ctx context.Context, arg WorkQueueGCParams) (int32, error)
	WorkQueueGlobalLock(ctx context.Context) error
	// Heart-beats the claimed work_queue rows for the given worker.
	WorkQueueHeartbeatDirect(ctx context.Context, arg WorkQueueHeartbeatParams) error
}

type QuerierFull

type QuerierFull interface {
	Querier
	ReplaceMetricSegs(ctx context.Context, args ReplaceMetricSegsParams) error
}

type Queries

type Queries struct {
	// contains filtered or unexported fields
}

func New

func New(db DBTX) *Queries

func (*Queries) BatchUpsertExemplarLogs

func (q *Queries) BatchUpsertExemplarLogs(ctx context.Context, arg []BatchUpsertExemplarLogsParams) *BatchUpsertExemplarLogsBatchResults

Upserts a log exemplar. Attributes, exemplar, and updated_at are always updated to the provided values. If old_fingerprint is not 0, it is added to the record's list of related fingerprints; the record itself remains keyed by fingerprint, so an existing record is updated in place rather than re-keyed to the new fingerprint. The return value is a boolean indicating whether the record is new.

func (*Queries) ClaimInqueueWork

func (q *Queries) ClaimInqueueWork(ctx context.Context, arg ClaimInqueueWorkParams) (Inqueue, error)

func (*Queries) CleanupInqueueWork

func (q *Queries) CleanupInqueueWork(ctx context.Context) error

func (*Queries) CompactLogSegments

func (q *Queries) CompactLogSegments(ctx context.Context, arg CompactLogSegmentsParams) error

func (*Queries) DeleteInqueueWork

func (q *Queries) DeleteInqueueWork(ctx context.Context, arg DeleteInqueueWorkParams) error

func (*Queries) GetExemplarLogsByFingerprint

func (q *Queries) GetExemplarLogsByFingerprint(ctx context.Context, arg GetExemplarLogsByFingerprintParams) (ExemplarLog, error)

func (*Queries) GetExemplarLogsByService

func (q *Queries) GetExemplarLogsByService(ctx context.Context, arg GetExemplarLogsByServiceParams) ([]ExemplarLog, error)

func (*Queries) GetExemplarLogsCreatedAfter

func (q *Queries) GetExemplarLogsCreatedAfter(ctx context.Context, ts time.Time) ([]ExemplarLog, error)

func (*Queries) GetExemplarMetricsByService

func (q *Queries) GetExemplarMetricsByService(ctx context.Context, arg GetExemplarMetricsByServiceParams) ([]ExemplarMetric, error)

func (*Queries) GetExemplarMetricsCreatedAfter

func (q *Queries) GetExemplarMetricsCreatedAfter(ctx context.Context, ts time.Time) ([]ExemplarMetric, error)

func (*Queries) GetExemplarTracesByFingerprint

func (q *Queries) GetExemplarTracesByFingerprint(ctx context.Context, arg GetExemplarTracesByFingerprintParams) (ExemplarTrace, error)

func (*Queries) GetExemplarTracesByService

func (q *Queries) GetExemplarTracesByService(ctx context.Context, arg GetExemplarTracesByServiceParams) ([]ExemplarTrace, error)

func (*Queries) GetExemplarTracesCreatedAfter

func (q *Queries) GetExemplarTracesCreatedAfter(ctx context.Context, ts time.Time) ([]ExemplarTrace, error)

func (*Queries) GetMetricSegsForCompaction

func (q *Queries) GetMetricSegsForCompaction(ctx context.Context, arg GetMetricSegsForCompactionParams) ([]MetricSeg, error)

func (*Queries) GetMetricSegsForRollup

func (q *Queries) GetMetricSegsForRollup(ctx context.Context, arg GetMetricSegsForRollupParams) ([]MetricSeg, error)

func (*Queries) GetSpanInfoByFingerprint

func (q *Queries) GetSpanInfoByFingerprint(ctx context.Context, arg GetSpanInfoByFingerprintParams) (GetSpanInfoByFingerprintRow, error)

func (*Queries) InqueueJournalDelete

func (q *Queries) InqueueJournalDelete(ctx context.Context, arg InqueueJournalDeleteParams) error

func (*Queries) InqueueJournalUpsert

func (q *Queries) InqueueJournalUpsert(ctx context.Context, arg InqueueJournalUpsertParams) (bool, error)

func (*Queries) InsertLogSegmentDirect

func (q *Queries) InsertLogSegmentDirect(ctx context.Context, arg InsertLogSegmentParams) error

func (*Queries) InsertMetricSegmentDirect

func (q *Queries) InsertMetricSegmentDirect(ctx context.Context, arg InsertMetricSegmentParams) error

func (*Queries) ListSegmentsForQuery

func (q *Queries) ListSegmentsForQuery(ctx context.Context, arg ListSegmentsForQueryParams) ([]ListSegmentsForQueryRow, error)

func (*Queries) LogSegEstimator

func (q *Queries) LogSegEstimator(ctx context.Context, arg LogSegEstimatorParams) ([]LogSegEstimatorRow, error)

Returns an estimate of the number of log segments, average bytes, average records, and average bytes per record for log segments in the last hour per organization and instance. This query is basically identical to the MetricSegEstimator, but for log segments.

func (*Queries) MetricSegEstimator

func (q *Queries) MetricSegEstimator(ctx context.Context, arg MetricSegEstimatorParams) ([]MetricSegEstimatorRow, error)

Returns an estimate of the number of metric segments, average bytes, average records, and average bytes per record for metric segments in the last hour per organization and instance. This query is basically identical to the LogSegEstimator, but for metric segments.

func (*Queries) ObjectCleanupAdd

func (q *Queries) ObjectCleanupAdd(ctx context.Context, arg ObjectCleanupAddParams) error

func (*Queries) ObjectCleanupComplete

func (q *Queries) ObjectCleanupComplete(ctx context.Context, id uuid.UUID) error

func (*Queries) ObjectCleanupFail

func (q *Queries) ObjectCleanupFail(ctx context.Context, id uuid.UUID) error

func (*Queries) ObjectCleanupGet

func (q *Queries) ObjectCleanupGet(ctx context.Context, maxrows int32) ([]ObjectCleanupGetRow, error)

func (*Queries) PutInqueueWork

func (q *Queries) PutInqueueWork(ctx context.Context, arg PutInqueueWorkParams) error

func (*Queries) ReleaseInqueueWork

func (q *Queries) ReleaseInqueueWork(ctx context.Context, arg ReleaseInqueueWorkParams) error

func (*Queries) SignalLockCleanup

func (q *Queries) SignalLockCleanup(ctx context.Context) (int32, error)

func (*Queries) TouchInqueueWork

func (q *Queries) TouchInqueueWork(ctx context.Context, arg TouchInqueueWorkParams) error

func (*Queries) UpsertServiceIdentifier

func (q *Queries) UpsertServiceIdentifier(ctx context.Context, arg UpsertServiceIdentifierParams) (UpsertServiceIdentifierRow, error)

func (*Queries) WithTx

func (q *Queries) WithTx(tx pgx.Tx) *Queries
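
A sketch of running several generated queries inside one transaction by rebinding them with WithTx; it assumes pgx v5's pgxpool.

package lrdb

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
)

// withTxSketch runs two inserts in one transaction by rebinding the
// generated queries to the transaction with WithTx.
func withTxSketch(ctx context.Context, pool *pgxpool.Pool, q *Queries,
	seg InsertMetricSegmentParams, logSeg InsertLogSegmentParams) error {

	tx, err := pool.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx) // no-op if the transaction was committed

	qtx := q.WithTx(tx)
	if err := qtx.InsertMetricSegmentDirect(ctx, seg); err != nil {
		return err
	}
	if err := qtx.InsertLogSegmentDirect(ctx, logSeg); err != nil {
		return err
	}
	return tx.Commit(ctx)
}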

func (*Queries) WorkQueueAddDirect

func (q *Queries) WorkQueueAddDirect(ctx context.Context, arg WorkQueueAddParams) error

func (*Queries) WorkQueueClaimDirect

func (q *Queries) WorkQueueClaimDirect(ctx context.Context, arg WorkQueueClaimParams) (WorkQueueClaimRow, error)

func (*Queries) WorkQueueCleanupDirect

func (q *Queries) WorkQueueCleanupDirect(ctx context.Context) ([]WorkQueueCleanupRow, error)

func (*Queries) WorkQueueCompleteDirect

func (q *Queries) WorkQueueCompleteDirect(ctx context.Context, arg WorkQueueCompleteParams) error

func (*Queries) WorkQueueFailDirect

func (q *Queries) WorkQueueFailDirect(ctx context.Context, arg WorkQueueFailParams) error

func (*Queries) WorkQueueGC

func (q *Queries) WorkQueueGC(ctx context.Context, arg WorkQueueGCParams) (int32, error)

func (*Queries) WorkQueueGlobalLock

func (q *Queries) WorkQueueGlobalLock(ctx context.Context) error

func (*Queries) WorkQueueHeartbeatDirect

func (q *Queries) WorkQueueHeartbeatDirect(ctx context.Context, arg WorkQueueHeartbeatParams) error

Heart-beats the claimed work_queue rows for the given worker.

type ReleaseInqueueWorkParams

type ReleaseInqueueWorkParams struct {
	ID        uuid.UUID `json:"id"`
	ClaimedBy int64     `json:"claimed_by"`
}

type ReplaceMetricSegsNew

type ReplaceMetricSegsNew struct {
	TidPartition int16
	SegmentID    int64
	StartTs      int64
	EndTs        int64
	RecordCount  int64
	FileSize     int64
	TidCount     int32
}

type ReplaceMetricSegsOld

type ReplaceMetricSegsOld struct {
	TidPartition int16
	SegmentID    int64
}

type ReplaceMetricSegsParams

type ReplaceMetricSegsParams struct {
	// OrganizationID is the ID of the organization to which the metric segments belong.
	OrganizationID uuid.UUID
	// Dateint is the date in YYYYMMDD format for which the metric segments are being replaced.
	Dateint int32
	// IngestDateint is the date in YYYYMMDD format when the segments were ingested.
	IngestDateint int32
	// InstanceNum is the collector instance number; segments from different instances are kept separate.
	InstanceNum int16
	// FrequencyMs is the frequency in milliseconds at which the metrics are collected.
	FrequencyMs int32
	// Published indicates whether the new segments are marked as published.
	Published bool
	// Rolledup indicates whether the new segments are marked as rolledup.
	Rolledup bool
	// OldRecords contains the segments to be deleted.
	OldRecords []ReplaceMetricSegsOld
	// NewRecords contains the segments to be inserted.
	NewRecords []ReplaceMetricSegsNew
	CreatedBy  CreatedBy
}

type ServiceIdentifier

type ServiceIdentifier struct {
	ID             uuid.UUID   `json:"id"`
	CreatedAt      time.Time   `json:"created_at"`
	UpdatedAt      time.Time   `json:"updated_at"`
	OrganizationID pgtype.UUID `json:"organization_id"`
	ServiceName    pgtype.Text `json:"service_name"`
	ClusterName    pgtype.Text `json:"cluster_name"`
	Namespace      pgtype.Text `json:"namespace"`
}

type Setting

type Setting struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

type SignalEnum

type SignalEnum string
const (
	SignalEnumLogs    SignalEnum = "logs"
	SignalEnumMetrics SignalEnum = "metrics"
	SignalEnumTraces  SignalEnum = "traces"
)

func (*SignalEnum) Scan

func (e *SignalEnum) Scan(src interface{}) error

type SignalLock

type SignalLock struct {
	ID             int64                            `json:"id"`
	WorkID         *int64                           `json:"work_id"`
	OrganizationID uuid.UUID                        `json:"organization_id"`
	InstanceNum    int16                            `json:"instance_num"`
	Dateint        int32                            `json:"dateint"`
	FrequencyMs    int32                            `json:"frequency_ms"`
	Signal         SignalEnum                       `json:"signal"`
	TsRange        pgtype.Range[pgtype.Timestamptz] `json:"ts_range"`
	ClaimedBy      int64                            `json:"claimed_by"`
	ClaimedAt      *time.Time                       `json:"claimed_at"`
	HeartbeatedAt  time.Time                        `json:"heartbeated_at"`
	SlotID         int32                            `json:"slot_id"`
}

type Store

type Store struct {
	*Queries
	// contains filtered or unexported fields
}

Store provides all functions needed to execute database queries and transactions.

func NewStore

func NewStore(connPool *pgxpool.Pool) *Store

NewStore creates a new Store backed by the given connection pool.
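
A minimal sketch of wiring the pool into a Store, which embeds *Queries and adds the transactional helpers such as ReplaceMetricSegs:

package lrdb

import (
	"context"
)

// newStoreSketch opens a pool and wraps it in a Store.
func newStoreSketch(ctx context.Context, url string) (*Store, error) {
	pool, err := NewConnectionPool(ctx, url)
	if err != nil {
		return nil, err
	}
	return NewStore(pool), nil
}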

func (*Store) InsertLogSegment

func (q *Store) InsertLogSegment(ctx context.Context, params InsertLogSegmentParams) error

func (*Store) InsertMetricSegment

func (q *Store) InsertMetricSegment(ctx context.Context, params InsertMetricSegmentParams) error

func (*Store) Pool

func (store *Store) Pool() *pgxpool.Pool

func (*Store) ReplaceMetricSegs

func (q *Store) ReplaceMetricSegs(ctx context.Context, args ReplaceMetricSegsParams) error

ReplaceMetricSegs replaces old metric segments with new ones for a given organization, date, and instance. The change is made atomically.
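
A sketch of a compaction-style replacement, swapping one new segment in for two old ones; all identifiers, sizes, and timestamps are placeholders, and the uuid dependency is assumed to be github.com/google/uuid.

package lrdb

import (
	"context"

	"github.com/google/uuid"
)

// replaceSegsSketch atomically replaces two old segments with one compacted
// segment. Every numeric value below is illustrative only.
func replaceSegsSketch(ctx context.Context, store *Store, orgID uuid.UUID) error {
	return store.ReplaceMetricSegs(ctx, ReplaceMetricSegsParams{
		OrganizationID: orgID,
		Dateint:        20250817,
		IngestDateint:  20250817,
		InstanceNum:    1,
		FrequencyMs:    10_000,
		Published:      true,
		Rolledup:       false,
		CreatedBy:      CreatedByCompact,
		OldRecords: []ReplaceMetricSegsOld{
			{TidPartition: 0, SegmentID: 101},
			{TidPartition: 0, SegmentID: 102},
		},
		NewRecords: []ReplaceMetricSegsNew{
			{TidPartition: 0, SegmentID: 201, StartTs: 1755388800000, EndTs: 1755392400000, RecordCount: 2000, FileSize: 8192, TidCount: 12},
		},
	})
}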

func (*Store) WorkQueueAdd

func (q *Store) WorkQueueAdd(ctx context.Context, params WorkQueueAddParams) error

func (*Store) WorkQueueClaim

func (q *Store) WorkQueueClaim(ctx context.Context, params WorkQueueClaimParams) (WorkQueueClaimRow, error)

func (*Store) WorkQueueCleanup

func (q *Store) WorkQueueCleanup(ctx context.Context) ([]WorkQueueCleanupRow, error)

func (*Store) WorkQueueComplete

func (q *Store) WorkQueueComplete(ctx context.Context, params WorkQueueCompleteParams) error

func (*Store) WorkQueueFail

func (q *Store) WorkQueueFail(ctx context.Context, params WorkQueueFailParams) error

func (*Store) WorkQueueHeartbeat

func (q *Store) WorkQueueHeartbeat(ctx context.Context, params WorkQueueHeartbeatParams) error

type TouchInqueueWorkParams

type TouchInqueueWorkParams struct {
	Ids       []uuid.UUID `json:"ids"`
	ClaimedBy int64       `json:"claimed_by"`
}

type UpsertServiceIdentifierParams

type UpsertServiceIdentifierParams struct {
	OrganizationID pgtype.UUID `json:"organization_id"`
	ServiceName    pgtype.Text `json:"service_name"`
	ClusterName    pgtype.Text `json:"cluster_name"`
	Namespace      pgtype.Text `json:"namespace"`
}
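
A sketch of building the pgtype-wrapped parameters; it assumes pgx v5's pgtype and github.com/google/uuid, and the service names are placeholders.

package lrdb

import (
	"context"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5/pgtype"
)

// upsertServiceSketch registers (or finds) a service identifier. The pgtype
// wrappers carry NULL-ability; Valid must be set for non-NULL values.
func upsertServiceSketch(ctx context.Context, q *Queries, orgID uuid.UUID) (UpsertServiceIdentifierRow, error) {
	return q.UpsertServiceIdentifier(ctx, UpsertServiceIdentifierParams{
		OrganizationID: pgtype.UUID{Bytes: orgID, Valid: true},
		ServiceName:    pgtype.Text{String: "checkout", Valid: true},
		ClusterName:    pgtype.Text{String: "prod-us-east-1", Valid: true},
		Namespace:      pgtype.Text{String: "default", Valid: true},
	})
}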

type UpsertServiceIdentifierRow

type UpsertServiceIdentifierRow struct {
	ID        uuid.UUID `json:"id"`
	CreatedAt time.Time `json:"created_at"`
}

type WorkQueue

type WorkQueue struct {
	ID             int64                            `json:"id"`
	Priority       int32                            `json:"priority"`
	RunnableAt     time.Time                        `json:"runnable_at"`
	OrganizationID uuid.UUID                        `json:"organization_id"`
	InstanceNum    int16                            `json:"instance_num"`
	Dateint        int32                            `json:"dateint"`
	FrequencyMs    int32                            `json:"frequency_ms"`
	Signal         SignalEnum                       `json:"signal"`
	Action         ActionEnum                       `json:"action"`
	NeedsRun       bool                             `json:"needs_run"`
	Tries          int32                            `json:"tries"`
	TsRange        pgtype.Range[pgtype.Timestamptz] `json:"ts_range"`
	ClaimedBy      int64                            `json:"claimed_by"`
	ClaimedAt      *time.Time                       `json:"claimed_at"`
	HeartbeatedAt  time.Time                        `json:"heartbeated_at"`
	SlotID         int32                            `json:"slot_id"`
}

type WorkQueueAddParams

type WorkQueueAddParams struct {
	OrgID      uuid.UUID                        `json:"org_id"`
	Instance   int16                            `json:"instance"`
	Dateint    int32                            `json:"dateint"`
	Frequency  int32                            `json:"frequency"`
	Signal     SignalEnum                       `json:"signal"`
	Action     ActionEnum                       `json:"action"`
	TsRange    pgtype.Range[pgtype.Timestamptz] `json:"ts_range"`
	RunnableAt time.Time                        `json:"runnable_at"`
	Priority   int32                            `json:"priority"`
	SlotID     int32                            `json:"slot_id"`
}

type WorkQueueClaimParams

type WorkQueueClaimParams struct {
	TargetFreqs []int32    `json:"target_freqs"`
	Signal      SignalEnum `json:"signal"`
	SlotID      int32      `json:"slot_id"`
	Action      ActionEnum `json:"action"`
	MinPriority int32      `json:"min_priority"`
	WorkerID    int64      `json:"worker_id"`
}

type WorkQueueClaimRow

type WorkQueueClaimRow struct {
	ID             int64                            `json:"id"`
	Priority       int32                            `json:"priority"`
	RunnableAt     time.Time                        `json:"runnable_at"`
	OrganizationID uuid.UUID                        `json:"organization_id"`
	InstanceNum    int16                            `json:"instance_num"`
	Dateint        int32                            `json:"dateint"`
	FrequencyMs    int32                            `json:"frequency_ms"`
	Signal         SignalEnum                       `json:"signal"`
	Action         ActionEnum                       `json:"action"`
	NeedsRun       bool                             `json:"needs_run"`
	Tries          int32                            `json:"tries"`
	TsRange        pgtype.Range[pgtype.Timestamptz] `json:"ts_range"`
	ClaimedBy      int64                            `json:"claimed_by"`
	ClaimedAt      *time.Time                       `json:"claimed_at"`
	HeartbeatedAt  time.Time                        `json:"heartbeated_at"`
	SlotID         int32                            `json:"slot_id"`
}

type WorkQueueCleanupRow

type WorkQueueCleanupRow struct {
	ID             int64                            `json:"id"`
	Priority       int32                            `json:"priority"`
	RunnableAt     time.Time                        `json:"runnable_at"`
	OrganizationID uuid.UUID                        `json:"organization_id"`
	InstanceNum    int16                            `json:"instance_num"`
	Dateint        int32                            `json:"dateint"`
	FrequencyMs    int32                            `json:"frequency_ms"`
	Signal         SignalEnum                       `json:"signal"`
	Action         ActionEnum                       `json:"action"`
	NeedsRun       bool                             `json:"needs_run"`
	Tries          int32                            `json:"tries"`
	TsRange        pgtype.Range[pgtype.Timestamptz] `json:"ts_range"`
	ClaimedBy      int64                            `json:"claimed_by"`
	ClaimedAt      *time.Time                       `json:"claimed_at"`
	HeartbeatedAt  time.Time                        `json:"heartbeated_at"`
	SlotID         int32                            `json:"slot_id"`
	LocksRemoved   int64                            `json:"locks_removed"`
}

type WorkQueueCompleteParams

type WorkQueueCompleteParams struct {
	WorkerID int64 `json:"worker_id"`
	ID       int64 `json:"id"`
}

type WorkQueueFailParams

type WorkQueueFailParams struct {
	ID       int64 `json:"id"`
	WorkerID int64 `json:"worker_id"`
}

type WorkQueueGCParams

type WorkQueueGCParams struct {
	Cutoff  time.Time `json:"cutoff"`
	Maxrows int32     `json:"maxrows"`
}

type WorkQueueHeartbeatParams

type WorkQueueHeartbeatParams struct {
	Ids      []int64 `json:"ids"`
	WorkerID int64   `json:"worker_id"`
}

type WorkQueueQuerier

type WorkQueueQuerier interface {
	WorkQueueAdd(ctx context.Context, params WorkQueueAddParams) error
	WorkQueueFail(ctx context.Context, params WorkQueueFailParams) error
	WorkQueueComplete(ctx context.Context, params WorkQueueCompleteParams) error
	WorkQueueHeartbeat(ctx context.Context, params WorkQueueHeartbeatParams) error
	WorkQueueCleanup(ctx context.Context) ([]WorkQueueCleanupRow, error)
	WorkQueueClaim(ctx context.Context, params WorkQueueClaimParams) (WorkQueueClaimRow, error)
}
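
A sketch of one claim/heartbeat/complete cycle against anything that implements WorkQueueQuerier (Store does); the signal, action, frequency, and slot values are placeholders, and runJob is a hypothetical worker function.

package lrdb

import (
	"context"
	"log"
)

// workQueueSketch claims one runnable item, heartbeats it once while a
// hypothetical runJob function processes it, and records the outcome.
func workQueueSketch(ctx context.Context, wq WorkQueueQuerier, workerID int64, runJob func(WorkQueueClaimRow) error) {
	row, err := wq.WorkQueueClaim(ctx, WorkQueueClaimParams{
		WorkerID:    workerID,
		Signal:      SignalEnumMetrics,
		Action:      ActionEnumCompact,
		TargetFreqs: []int32{10_000},
		MinPriority: 0,
		SlotID:      0,
	})
	if err != nil {
		log.Printf("nothing to claim: %v", err)
		return
	}

	// A real worker would heartbeat on a ticker; a single beat is shown here.
	if err := wq.WorkQueueHeartbeat(ctx, WorkQueueHeartbeatParams{Ids: []int64{row.ID}, WorkerID: workerID}); err != nil {
		log.Printf("heartbeat: %v", err)
	}

	if err := runJob(row); err != nil {
		_ = wq.WorkQueueFail(ctx, WorkQueueFailParams{ID: row.ID, WorkerID: workerID})
		return
	}
	_ = wq.WorkQueueComplete(ctx, WorkQueueCompleteParams{ID: row.ID, WorkerID: workerID})
}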
