vmi_internal

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2025 License: MIT Imports: 31 Imported by: 0

Documentation

Overview

Compressor pool for sending metrics:

Index

Constants

View Source
const (
	COMPRESSOR_POOL_CONFIG_COMPRESSION_LEVEL_DEFAULT    = gzip.DefaultCompression
	COMPRESSOR_POOL_CONFIG_NUM_COMPRESSORS_DEFAULT      = -1
	COMPRESSOR_POOL_MAX_NUM_COMPRESSORS                 = 4
	COMPRESSOR_POOL_CONFIG_BUFFER_POOL_MAX_SIZE_DEFAULT = 64
	COMPRESSOR_POOL_CONFIG_METRICS_QUEUE_SIZE_DEFAULT   = 64
	COMPRESSOR_POOL_CONFIG_BATCH_TARGET_SIZE_DEFAULT    = "64k"
	COMPRESSOR_POOL_CONFIG_FLUSH_INTERVAL_DEFAULT       = 5 * time.Second
)
View Source
const (
	INITIAL_COMPRESSION_FACTOR         = 2.
	COMPRESSION_FACTOR_EXP_DECAY_ALPHA = 0.8
	// A compressed batch should be at least this size to be used for updating
	// the compression factor:
	COMPRESSED_BATCH_MIN_SIZE_FOR_CF = 128
)
View Source
const (
	COMPRESSOR_STATS_READ_COUNT = iota
	COMPRESSOR_STATS_READ_BYTE_COUNT
	COMPRESSOR_STATS_SEND_COUNT
	COMPRESSOR_STATS_SEND_BYTE_COUNT
	COMPRESSOR_STATS_TIMEOUT_FLUSH_COUNT
	COMPRESSOR_STATS_SEND_ERROR_COUNT
	COMPRESSOR_STATS_WRITE_ERROR_COUNT
	// Must be last:
	COMPRESSOR_STATS_UINT64_LEN
)

Compressor stats:

View Source
const (
	COMPRESSOR_STATS_COMPRESSION_FACTOR = iota
	// Must be last:
	COMPRESSOR_STATS_FLOAT64_LEN
)
View Source
const (
	VMI_CONFIG_SECTION_NAME = "vmi_config"
	GENERATORS_SECTION_NAME = "generators"

	VMI_CONFIG_USE_SHORT_HOSTNAME_DEFAULT = false
	VMI_CONFIG_SHUTDOWN_MAX_WAIT_DEFAULT  = 5 * time.Second
)
View Source
const (
	// Indexes into the per generator []int stats:
	METRICS_GENERATOR_INVOCATION_COUNT = iota
	METRICS_GENERATOR_METRICS_COUNT
	METRICS_GENERATOR_BYTE_COUNT
	// Must be last:
	METRICS_GENERATOR_NUM_STATS
)

Each metrics generator will maintain the following common stats:

View Source
const (
	// The order in the metrics cache:
	GO_NUM_GOROUTINE_METRIC_INDEX = iota
	GO_MEM_SYS_BYTES_METRIC_INDEX
	GO_MEM_HEAP_BYTES_METRIC_INDEX
	GO_MEM_HEAP_SYS_BYTES_METRIC_INDEX
	GO_MEM_MALLOCS_DELTA_METRIC_INDEX
	GO_MEM_FREE_DELTA_METRIC_INDEX
	GO_MEM_IN_USE_OBJECT_COUNT_METRIC_INDEX
	GO_MEM_NUM_GC_DELTA_METRIC_INDEX

	// Must be last:
	GO_INTERNAL_METRICS_NUM
)
View Source
const (
	// Endpoint default values:
	HTTP_ENDPOINT_URL_DEFAULT                      = "http://localhost:8428/api/v1/import/prometheus"
	HTTP_ENDPOINT_MARK_UNHEALTHY_THRESHOLD_DEFAULT = 1

	// Endpoint config pool default values:
	HTTP_ENDPOINT_POOL_CONFIG_SHUFFLE_DEFAULT                 = false
	HTTP_ENDPOINT_POOL_CONFIG_HEALTHY_ROTATE_INTERVAL_DEFAULT = 5 * time.Minute
	HTTP_ENDPOINT_POOL_CONFIG_ERROR_RESET_INTERVAL_DEFAULT    = 1 * time.Minute
	HTTP_ENDPOINT_POOL_CONFIG_HEALTH_CHECK_INTERVAL_DEFAULT   = 5 * time.Second
	HTTP_ENDPOINT_POOL_CONFIG_HEALTHY_MAX_WAIT_DEFAULT        = 10 * time.Second
	HTTP_ENDPOINT_POOL_CONFIG_SEND_BUFFER_TIMEOUT_DEFAULT     = 20 * time.Second
	HTTP_ENDPOINT_POOL_CONFIG_RATE_LIMIT_MBPS_DEFAULT         = ""
	// Endpoint config definitions, later they may be configurable:
	HTTP_ENDPOINT_POOL_HEALTHY_CHECK_MIN_INTERVAL    = 1 * time.Second
	HTTP_ENDPOINT_POOL_HEALTHY_POLL_INTERVAL         = 500 * time.Millisecond
	HTTP_ENDPOINT_POOL_HEALTH_CHECK_ERR_LOG_INTERVAL = 10 * time.Second

	// http.Transport config default values:
	//   Dialer config default values:
	HTTP_ENDPOINT_POOL_CONFIG_TCP_CONN_TIMEOUT_DEFAULT        = 2 * time.Second
	HTTP_ENDPOINT_POOL_CONFIG_TCP_KEEP_ALIVE_DEFAULT          = 15 * time.Second
	HTTP_ENDPOINT_POOL_CONFIG_MAX_IDLE_CONNS_DEFAULT          = 0 // No limit
	HTTP_ENDPOINT_POOL_CONFIG_MAX_IDLE_CONNS_PER_HOST_DEFAULT = 1
	HTTP_ENDPOINT_POOL_CONFIG_MAX_CONNS_PER_HOST_DEFAULT      = 0 // No limit
	HTTP_ENDPOINT_POOL_CONFIG_IDLE_CONN_TIMEOUT_DEFAULT       = 1 * time.Minute
	// http.Client config default values:
	HTTP_ENDPOINT_POOL_CONFIG_RESPONSE_TIMEOUT_DEFAULT = 5 * time.Second

	// Prefixes for the password field:
	HTTP_ENDPOINT_POOL_CONFIG_PASSWORD_FILE_PREFIX = "file:"
	HTTP_ENDPOINT_POOL_CONFIG_PASSWORD_ENV_PREFIX  = "env:"
	HTTP_ENDPOINT_POOL_CONFIG_PASSWORD_PASS_PREFIX = "pass:"
)
View Source
const (
	HTTP_ENDPOINT_STATS_SEND_BUFFER_COUNT = iota
	HTTP_ENDPOINT_STATS_SEND_BUFFER_BYTE_COUNT
	HTTP_ENDPOINT_STATS_SEND_BUFFER_ERROR_COUNT
	HTTP_ENDPOINT_STATS_HEALTH_CHECK_COUNT
	HTTP_ENDPOINT_STATS_HEALTH_CHECK_ERROR_COUNT
	// Must be last:
	HTTP_ENDPOINT_STATS_LEN
)

Endpoint stats:

View Source
const (
	HTTP_ENDPOINT_POOL_STATS_HEALTHY_ROTATE_COUNT = iota
	HTTP_ENDPOINT_POOL_STATS_NO_HEALTHY_EP_ERROR_COUNT
	// Must be last:
	HTTP_ENDPOINT_POOL_STATS_LEN
)

Endpoint pool stats:

View Source
const (
	INTERNAL_METRICS_CONFIG_INTERVAL_DEFAULT            = 5 * time.Second
	INTERNAL_METRICS_CONFIG_FULL_METRICS_FACTOR_DEFAULT = 12

	// This generator id:
	INTERNAL_METRICS_ID = "internal_metrics"
)

Generate internal metrics:

View Source
const (
	// The following labels are common to all metrics:
	INSTANCE_LABEL_NAME = "vmi_inst"
	HOSTNAME_LABEL_NAME = "hostname"

	// Deltas since previous internal metrics interval:
	COMPRESSOR_STATS_READ_DELTA_METRIC          = "vmi_compressor_read_delta"
	COMPRESSOR_STATS_READ_BYTE_DELTA_METRIC     = "vmi_compressor_read_byte_delta"
	COMPRESSOR_STATS_SEND_DELTA_METRIC          = "vmi_compressor_send_delta"
	COMPRESSOR_STATS_SEND_BYTE_DELTA_METRIC     = "vmi_compressor_send_byte_delta"
	COMPRESSOR_STATS_TIMEOUT_FLUSH_DELTA_METRIC = "vmi_compressor_tout_flush_delta"
	COMPRESSOR_STATS_SEND_ERROR_DELTA_METRIC    = "vmi_compressor_send_error_delta"
	COMPRESSOR_STATS_WRITE_ERROR_DELTA_METRIC   = "vmi_compressor_write_error_delta"
	COMPRESSOR_STATS_COMPRESSION_FACTOR_METRIC  = "vmi_compressor_compression_factor"

	COMPRESSOR_ID_LABEL_NAME = "compressor"

	// Invocation, metric and byte counts for the generator:
	METRICS_GENERATOR_INVOCATION_DELTA_METRIC = "vmi_metrics_gen_invocation_delta"
	METRICS_GENERATOR_METRICS_DELTA_METRIC    = "vmi_metrics_gen_metrics_delta"
	METRICS_GENERATOR_BYTE_DELTA_METRIC       = "vmi_metrics_gen_byte_delta"

	// Actual interval since the previous invocation. It should be closed to the
	// configured interval, but may be longer if the generator is busy. It could
	// be used to calculate the rates out of deltas
	METRICS_GENERATOR_DTIME_METRIC           = "vmi_metrics_gen_dtime_sec"
	METRICS_GENERATOR_DTIME_METRIC_PRECISION = 6

	METRICS_GENERATOR_ID_LABEL_NAME = "gen_id"

	GO_NUM_GOROUTINE_METRIC           = "vmi_go_num_goroutine"
	GO_MEM_SYS_BYTES_METRIC           = "vmi_go_mem_sys_bytes"
	GO_MEM_HEAP_BYTES_METRIC          = "vmi_go_mem_heap_bytes"
	GO_MEM_HEAP_SYS_BYTES_METRIC      = "vmi_go_mem_heap_sys_bytes"
	GO_MEM_IN_USE_OBJECT_COUNT_METRIC = "vmi_go_mem_in_use_object_count"

	// Deltas since previous internal metrics interval:
	GO_MEM_MALLOCS_DELTA_METRIC = "vmi_go_mem_malloc_delta"
	GO_MEM_FREE_DELTA_METRIC    = "vmi_go_mem_free_delta"
	GO_MEM_NUM_GC_DELTA_METRIC  = "vmi_go_mem_gc_delta"

	// Deltas since previous internal metrics interval:
	HTTP_ENDPOINT_STATS_SEND_BUFFER_DELTA_METRIC        = "vmi_http_ep_send_buffer_delta"
	HTTP_ENDPOINT_STATS_SEND_BUFFER_BYTE_DELTA_METRIC   = "vmi_http_ep_send_buffer_byte_delta"
	HTTP_ENDPOINT_STATS_SEND_BUFFER_ERROR_DELTA_METRIC  = "vmi_http_ep_send_buffer_error_delta"
	HTTP_ENDPOINT_STATS_HEALTH_CHECK_DELTA_METRIC       = "vmi_http_ep_healthcheck_delta"
	HTTP_ENDPOINT_STATS_HEALTH_CHECK_ERROR_DELTA_METRIC = "vmi_http_ep_healthcheck_error_delta"

	// Labels:
	HTTP_ENDPOINT_STATS_STATE_LABEL = "state"
	HTTP_ENDPOINT_URL_LABEL_NAME    = "url"

	// Deltas since previous internal metrics interval:
	HTTP_ENDPOINT_POOL_STATS_HEALTHY_ROTATE_DELTA_METRIC      = "vmi_http_ep_pool_healthy_rotate_delta"
	HTTP_ENDPOINT_POOL_STATS_NO_HEALTHY_EP_ERROR_DELTA_METRIC = "vmi_http_ep_pool_no_healthy_ep_error_delta"

	// Importer metric:
	VMI_UPTIME_METRIC = "vmi_uptime_sec" // heartbeat

	VMI_BUILD_INFO_METRIC   = "vmi_build_info"
	VMI_VERSION_LABEL_NAME  = "vmi_version"
	VMI_GIT_INFO_LABEL_NAME = "vmi_git_info"

	// OS metrics:
	OS_INFO_METRIC          = "vmi_os_info"
	OS_INFO_LABEL_PREFIX    = "os_info_" // prefix + OSInfoLabelKeys
	OS_RELEASE_METRIC       = "vmi_os_release"
	OS_RELEASE_LABEL_PREFIX = "os_rel_" // prefix + OSReleaseLabelKeys
	OS_UPTIME_METRIC        = "vmi_os_uptime_sec"

	UPTIME_METRIC_PRECISION = 6

	// %CPU over internal metrics interval:
	VMI_PROC_PCPU_METRIC = "vmi_proc_pcpu"

	TASK_STATS_SCHEDULED_DELTA_METRIC       = "vmi_task_scheduled_delta"
	TASK_STATS_DELAYED_DELTA_METRIC         = "vmi_task_delayed_delta"
	TASK_STATS_OVERRUN_DELTA_METRIC         = "vmi_task_overrun_delta"
	TASK_STATS_EXECUTED_DELTA_METRIC        = "vmi_task_executed_delta"
	TASK_STATS_NEXT_TS_HACK_DELTA_METRIC    = "vmi_task_next_ts_hack_delta"
	TASK_STATS_AVG_RUNTIME_METRIC           = "vmi_task_avg_runtime_sec"
	TASK_STATS_AVG_RUNTIME_METRIC_PRECISION = 6

	// Re-use generator ID label since they have the same value:
	TASK_STATS_TASK_ID_LABEL_NAME = METRICS_GENERATOR_ID_LABEL_NAME
)
View Source
const (
	CREDIT_NO_LIMIT    = 0
	CREDIT_EXACT_MATCH = 0
)
View Source
const (
	READ_FILE_BUF_POOL_MAX_SIZE_UNBOUND      = 0
	READ_FILE_BUF_POOL_MAX_READ_SIZE_UNBOUND = 0
)
View Source
const (
	CONFIG_FLAG_NAME = "config"
	INSTANCE_DEFAULT = "vmi"
)
View Source
const (
	SCHEDULER_CONFIG_NUM_WORKERS_DEFAULT = -1
	SCHEDULER_MAX_NUM_WORKERS            = 8
)
View Source
const (
	SCHEDULER_TASK_Q_LEN = 64
	SCHEDULER_TODO_Q_LEN = 64
	// All intervals will be rounded to be a multiple of scheduler's granularity:
	SCHEDULER_GRANULARITY = 20 * time.Millisecond
	// The minimum pause between 2 consecutive executions of the same task:
	SCHEDULER_TASK_MIN_EXECUTION_PAUSE = 2 * SCHEDULER_GRANULARITY
)
View Source
const (

	// How many times the task was scheduled:
	TASK_STATS_SCHEDULED_COUNT = iota

	// How many times the task was delayed because it was too close to its
	// previous execution:
	TASK_STATS_DELAYED_COUNT

	// How many times the task overran, i.e. its runtime >= interval:
	TASK_STATS_OVERRUN_COUNT

	// How many times the task was executed:
	TASK_STATS_EXECUTED_COUNT

	// How many times the next scheduling time was hacked to counter the wall
	// clock seemingly going backwards (see AddNewTask):
	TASK_STATS_NEXT_TS_HACK_COUNT

	// Total runtime of the task, in microseconds.
	TASK_STATS_TOTAL_RUNTIME

	// Must be last:
	TASK_STATS_UINT64_LEN
)
View Source
const (
	// The help usage message line wraparound default width:
	DEFAULT_FLAG_USAGE_WIDTH = 58
)
View Source
const (
	GENERATOR_RUNTIME_UNAVAILABLE = -1.
)

Variables

View Source
var (
	AvailableCPUCount = GetAvailableCPUCount()
	BootTime          = time.Now()
	Clktck            int64
	ClktckSec         float64
	OsInfo            = make(map[string]string)
	OsRelease         = make(map[string]string)
)
View Source
var (
	// The hostname, based on OS, config or command line arg.
	Hostname string

	// The instance should be primed w/ the desired default *before* invoking
	// the runner, most likely from an init() (e.g. set to "lsvmi" for Linux
	// Stats VictoriaMetrics importer) Its value may be modified via config and
	// command line args.
	Instance string = INSTANCE_DEFAULT

	// Build info, normally set via init() by the user of this package.
	Version string
	GitInfo string

	MetricsGenStats = NewMetricsGeneratorStatsContainer()
	MetricsQueue    BufferQueue
)
View Source
var ErrHttpEndpointPoolNoHealthyEP = errors.New("no healthy HTTP endpoint available")

Error codes:

View Source
var ErrReadFileBufPotentialTruncation = errors.New("potential truncation")

Reading a file may be limited by a max size; if the cap is reached then it is possible that the file was truncated (note that stat system will report size 0 for /proc files, so it cannot be used to determine the actual size). Should such a condition occur, it should be treated as an error to signal potential truncation to the caller.

View Source
var HttpEndpointPoolRetryCodes = map[int]bool{}

The list of HTTP codes that should be retried:

View Source
var HttpEndpointPoolSuccessCodes = map[int]bool{
	http.StatusOK:        true,
	http.StatusNoContent: true,
}

The list of HTTP codes that denote success:

View Source
var OSInfoLabelKeys = []string{
	"name",
	"release",
	"version",
	"machine",
}

The following OsInfo keys will be used as labels in OS info metrics:

View Source
var OSReleaseLabelKeys = []string{
	"id",
	"name",
	"pretty_name",
	"version",
	"version_codename",
	"version_id",
}

The following OSRelease keys will be used as labels in OS info metrics:

Functions

func BuildHtmlBasicAuth

func BuildHtmlBasicAuth(username, password string) (string, error)

func CompliantTaskInterval

func CompliantTaskInterval(interval time.Duration) time.Duration

Ensure that a task interval is scheduler compliant:

func FormatFlagUsage

func FormatFlagUsage(usage string) string

func FormatFlagUsageWidth

func FormatFlagUsageWidth(usage string, width int) string

Format command flag usage for help message, by wrapping the lines around a given width. The original line breaks and prefixing white spaces are ignored. Example:

var flagArg = flag.String(

name,
value,
FormatFlagUsageWidth(`
This usage message will be reformatted to the given width, discarding
the current line breaks and line prefixing spaces.
`, 40),

)

func GetAvailableCPUCount

func GetAvailableCPUCount() int

For linux count available CPUs based on CPU affinity, w/ a fallback on runtime:

func GetCpuTime

func GetCpuTime(who int) (float64, error)

func GetInitialCycleNum

func GetInitialCycleNum(fullMetricsFactor int) int

func GetMyCpuTime

func GetMyCpuTime() (float64, error)

func GetOsBootTime

func GetOsBootTime() (time.Time, error)

func GetOsInfo

func GetOsInfo() (map[string]string, error)

func GetOsReleaseInfo

func GetOsReleaseInfo() (map[string]string, error)

func GetRootLogger

func GetRootLogger() *logrusx.CollectableLogger

Public access to the root logger, needed for testing:

func GetSysClktck

func GetSysClktck() (int64, error)

func LoadPasswordSpec

func LoadPasswordSpec(password string) (string, error)

func NewCompLogger

func NewCompLogger(compName string) *logrus.Entry

func ParseCreditRateSpec

func ParseCreditRateSpec(spec string) (int, time.Duration, error)

Parse rate limit Mbps string. Supported formats: FLOAT or FLOAT:INTERVAL, where INTERVAL should be in the format supported by time.ParseDuration(). FLOAT is equivalent w/ FLOAT:1s.

func RegisterTaskBuilder

func RegisterTaskBuilder(tb func(config any) ([]MetricsGeneratorTask, error))

func Run

func Run(genConfig any) int

func SetLogger

func SetLogger(logCfg *logrusx.LoggerConfig) error

Set the logger based on config:

func SplitWords

func SplitWords(s string) []string

Types

type BufferQueue

type BufferQueue interface {
	GetBuf() *bytes.Buffer
	ReturnBuf(b *bytes.Buffer)
	QueueBuf(b *bytes.Buffer)
	GetTargetSize() int
}

The generated metrics are written into *bytes.Buffer's which are then queued into the metrics queue for transmission.

type BytesReadSeekCloser

type BytesReadSeekCloser struct {
	// contains filtered or unexported fields
}

Convert bytes.Reader into ReadSeekRewindCloser such that it can be used as body for http.Request w/ retries:

func NewBytesReadSeekCloser

func NewBytesReadSeekCloser(b []byte) *BytesReadSeekCloser

func (*BytesReadSeekCloser) Close

func (brsc *BytesReadSeekCloser) Close() error

func (*BytesReadSeekCloser) Read

func (brsc *BytesReadSeekCloser) Read(p []byte) (int, error)

func (*BytesReadSeekCloser) Rewind

func (brsc *BytesReadSeekCloser) Rewind() error

Reuse, for HTTP retries:

func (*BytesReadSeekCloser) Seek

func (brsc *BytesReadSeekCloser) Seek(offset int64, whence int) (int64, error)

type CompressorPool

type CompressorPool struct {
	// contains filtered or unexported fields
}

func NewCompressorPool

func NewCompressorPool(poolCfg *CompressorPoolConfig) (*CompressorPool, error)

func (*CompressorPool) GetBuf

func (pool *CompressorPool) GetBuf() *bytes.Buffer

Satisfy BufferQueue interface:

func (*CompressorPool) GetTargetSize

func (pool *CompressorPool) GetTargetSize() int

func (*CompressorPool) QueueBuf

func (pool *CompressorPool) QueueBuf(b *bytes.Buffer)

func (*CompressorPool) ReturnBuf

func (pool *CompressorPool) ReturnBuf(buf *bytes.Buffer)

func (*CompressorPool) Shutdown

func (pool *CompressorPool) Shutdown()

func (*CompressorPool) SnapStats

func (*CompressorPool) Start

func (pool *CompressorPool) Start(sender Sender)

type CompressorPoolConfig

type CompressorPoolConfig struct {
	// The number of compressors. If set to -1 it will match the number of
	// available cores but not more than COMPRESSOR_POOL_MAX_NUM_COMPRESSORS:
	NumCompressors int `yaml:"num_compressors"`
	// Buffer pool size; buffers are pulled by metrics generators as needed and
	// they are returned after they are compressed. The pool max size controls
	// only how many idle buffers are being kept around, since they are created
	// as many as requested but they are discarded if they exceed the value
	// below. A value is too small leads to object churning and a value too
	// large may waste memory.
	BufferPoolMaxSize int `yaml:"buffer_pool_max_size"`
	// Metrics queue size, it should be deep enough to accommodate metrics up to
	// send_buffer_timeout:
	MetricsQueueSize int `yaml:"metrics_queue_size"`
	// Compression level: 0..9:
	CompressionLevel int `yaml:"compression_level"`
	// Batch target size; metrics will be read from the queue until the
	// compressed size is ~ to the value below. The value can have the usual `k`
	// or `m` suffixes for KiB or MiB accordingly.
	BatchTargetSize string `yaml:"batch_target_size"`
	// Flush interval. If batch_target_size is not reached before this interval
	// expires, the metrics compressed thus far are being sent anyway. Use 0 to
	// disable time flush.
	FlushInterval time.Duration `yaml:"flush_interval"`
}

func DefaultCompressorPoolConfig

func DefaultCompressorPoolConfig() *CompressorPoolConfig

type CompressorPoolInternalMetrics

type CompressorPoolInternalMetrics struct {
	// contains filtered or unexported fields
}

func NewCompressorPoolInternalMetrics

func NewCompressorPoolInternalMetrics(internalMetrics *InternalMetrics) *CompressorPoolInternalMetrics

type CompressorPoolState

type CompressorPoolState int
var (
	CompressorPoolStateCreated CompressorPoolState = 0
	CompressorPoolStateRunning CompressorPoolState = 1
	CompressorPoolStateStopped CompressorPoolState = 2
)

func (CompressorPoolState) String

func (state CompressorPoolState) String() string

type CompressorPoolStats

type CompressorPoolStats map[string]*CompressorStats

func NewCompressorPoolStats

func NewCompressorPoolStats(numCompressors int) CompressorPoolStats

type CompressorStats

type CompressorStats struct {
	Uint64Stats  []uint64
	Float64Stats []float64
}

func NewCompressorStats

func NewCompressorStats() *CompressorStats

type Credit

type Credit struct {
	// contains filtered or unexported fields
}

The actual implementation:

func NewCredit

func NewCredit(replenishValue, max int, replenishInt time.Duration) *Credit

func NewCreditFromSpec

func NewCreditFromSpec(spec string) (*Credit, error)

func (*Credit) GetCredit

func (c *Credit) GetCredit(desired, minAcceptable int) (got int)

func (*Credit) StopReplenish

func (c *Credit) StopReplenish()

func (*Credit) StopReplenishWait

func (c *Credit) StopReplenishWait()

func (*Credit) String

func (c *Credit) String() string

type CreditController

type CreditController interface {
	GetCredit(desired, minAcceptable int) int
}

Define an interface for testing:

type CreditReader

type CreditReader struct {
	// contains filtered or unexported fields
}

Credit based reader, limiting the rate of data read from a byte buffer and implementing the io.ReadSeekCloser interface, so it can be used in http.Request.Body. This is used to control the rate of import into VictoriaMetrics.

func NewCreditReader

func NewCreditReader(cc CreditController, minAcceptable int, b []byte) *CreditReader

func (*CreditReader) Close

func (cr *CreditReader) Close() error

Implement Close interface:

func (*CreditReader) Read

func (cr *CreditReader) Read(p []byte) (int, error)

Implement the Read interface:

func (*CreditReader) Reuse

func (cr *CreditReader) Reuse(minAcceptable int, b []byte)

Reuse w/ new data:

func (*CreditReader) Rewind

func (cr *CreditReader) Rewind() error

Reuse w/ the same data:

func (*CreditReader) Seek

func (cr *CreditReader) Seek(offset int64, whence int) (int64, error)

type GeneratorBase

type GeneratorBase struct {
	// Unique generator ID:
	Id string
	// Scheduling interval:
	Interval time.Duration
	// Full metrics factor (see "Partial V. Full Metrics" in README.md):
	FullMetricsFactor int
	// The current cycle# used in conjunction with the FullMetricsFactor:
	CycleNum int
	// The timestamp of the last metrics generation:
	LastTs time.Time
	// Cache for generator metrics:
	DtimeMetric []byte
	// Cache for timestamp suffix, common to all/a group of metrics:
	TsSuffixBuf *bytes.Buffer
	// Whether the structure was initialized (caches, etc) or not:
	Initialized bool
	// The following fields, if left to their default values (type's nil) will
	// be set during initialization with the usual values. They may be
	// pre-populated during tests after the generator was created and before
	// initialization.
	Instance     string
	Hostname     string
	TimeNowFunc  func() time.Time
	MetricsQueue BufferQueue
	TestMode     bool
}

func (*GeneratorBase) GenBaseInit

func (gb *GeneratorBase) GenBaseInit()

func (*GeneratorBase) GenBaseMetricsStart

func (gb *GeneratorBase) GenBaseMetricsStart(buf *bytes.Buffer, ts time.Time) (int, time.Time)

Start metrics generation; this should the 1st call in a metrics generation since it establishes the timestamp suffix. Call with the buffer for the metrics and the timestamp of the collection. Return the metric count and the last timestamp of the previous run. If the buffer is nil, then no metrics are generated, but the timestamp suffix is still updated.

func (*GeneratorBase) GetId

func (gb *GeneratorBase) GetId() string

Satisfy GeneratorTask I/F:

func (*GeneratorBase) GetInterval

func (gb *GeneratorBase) GetInterval() time.Duration

type GeneratorInternalMetrics

type GeneratorInternalMetrics struct {
	// contains filtered or unexported fields
}

func NewGeneratorInternalMetrics

func NewGeneratorInternalMetrics(internalMetrics *InternalMetrics) *GeneratorInternalMetrics

func (*GeneratorInternalMetrics) SnapStats

func (gim *GeneratorInternalMetrics) SnapStats()

type GoInternalMetrics

type GoInternalMetrics struct {
	// contains filtered or unexported fields
}

func NewGoInternalMetrics

func NewGoInternalMetrics(internalMetrics *InternalMetrics) *GoInternalMetrics

func (*GoInternalMetrics) SnapStats

func (gim *GoInternalMetrics) SnapStats()

type HttpClientDoer

type HttpClientDoer interface {
	Do(req *http.Request) (*http.Response, error)
	CloseIdleConnections()
}

Define a mockable interface to substitute http.Client.Do() for testing purposes:

type HttpEndpoint

type HttpEndpoint struct {

	// The parsed format for above, to be used for http calls:
	URL *url.URL
	// contains filtered or unexported fields
}

func NewHttpEndpoint

func NewHttpEndpoint(cfg *HttpEndpointConfig) (*HttpEndpoint, error)

type HttpEndpointConfig

type HttpEndpointConfig struct {
	URL                    string
	MarkUnhealthyThreshold int `yaml:"mark_unhealthy_threshold"`
}

func DefaultHttpEndpointConfig

func DefaultHttpEndpointConfig() *HttpEndpointConfig

type HttpEndpointDoublyLinkedList

type HttpEndpointDoublyLinkedList struct {
	// contains filtered or unexported fields
}

func (*HttpEndpointDoublyLinkedList) AddToHead

func (epDblLnkList *HttpEndpointDoublyLinkedList) AddToHead(ep *HttpEndpoint)

func (*HttpEndpointDoublyLinkedList) AddToTail

func (epDblLnkList *HttpEndpointDoublyLinkedList) AddToTail(ep *HttpEndpoint)

func (*HttpEndpointDoublyLinkedList) Insert

func (epDblLnkList *HttpEndpointDoublyLinkedList) Insert(ep, after *HttpEndpoint)

func (*HttpEndpointDoublyLinkedList) Remove

func (epDblLnkList *HttpEndpointDoublyLinkedList) Remove(ep *HttpEndpoint)

type HttpEndpointPool

type HttpEndpointPool struct {
	// contains filtered or unexported fields
}

func NewHttpEndpointPool

func NewHttpEndpointPool(poolCfg *HttpEndpointPoolConfig) (*HttpEndpointPool, error)

func (*HttpEndpointPool) GetCurrentHealthy

func (epPool *HttpEndpointPool) GetCurrentHealthy(maxWait time.Duration) *HttpEndpoint

Get the current healthy endpoint or nil if none available after max wait; if maxWait < 0 then the pool healthyMaxWait is used:

func (*HttpEndpointPool) HealthCheck

func (epPool *HttpEndpointPool) HealthCheck(ep *HttpEndpoint)

func (*HttpEndpointPool) MoveToHealthy

func (epPool *HttpEndpointPool) MoveToHealthy(ep *HttpEndpoint)

func (*HttpEndpointPool) ReportError

func (epPool *HttpEndpointPool) ReportError(ep *HttpEndpoint)

func (*HttpEndpointPool) SendBuffer

func (epPool *HttpEndpointPool) SendBuffer(b []byte, timeout time.Duration, gzipped bool) error

SendBuffer: the main reason for the pool is to send buffers w/ load balancing and retries. If timeout is < 0 then the pool's sendBufferTimeout is used:

func (*HttpEndpointPool) Shutdown

func (epPool *HttpEndpointPool) Shutdown()

Needed for testing or clean exit in general:

func (*HttpEndpointPool) SnapStats

type HttpEndpointPoolConfig

type HttpEndpointPoolConfig struct {
	Endpoints              []*HttpEndpointConfig `yaml:"endpoints"`
	Username               string                `yaml:"username"`
	Password               string                `yaml:"password"`
	MarkUnhealthyThreshold int                   `yaml:"mark_unhealthy_threshold"`
	Shuffle                bool                  `yaml:"shuffle"`
	HealthyRotateInterval  time.Duration         `yaml:"healthy_rotate_interval"`
	ErrorResetInterval     time.Duration         `yaml:"error_reset_interval"`
	HealthCheckInterval    time.Duration         `yaml:"health_check_interval"`
	HealthyMaxWait         time.Duration         `yaml:"healthy_max_wait"`
	SendBufferTimeout      time.Duration         `yaml:"send_buffer_timeout"`
	RateLimitMbps          string                `yaml:"rate_limit_mbps"`
	IgnoreTLSVerify        bool                  `yaml:"ignore_tls_verify"`
	TcpConnTimeout         time.Duration         `yaml:"tcp_conn_timeout"`
	TcpKeepAlive           time.Duration         `yaml:"tcp_keep_alive"`
	MaxIdleConns           int                   `yaml:"max_idle_conns"`
	MaxIdleConnsPerHost    int                   `yaml:"max_idle_conns_per_host"`
	MaxConnsPerHost        int                   `yaml:"max_conns_per_host"`
	IdleConnTimeout        time.Duration         `yaml:"idle_conn_timeout"`
	ResponseTimeout        time.Duration         `yaml:"response_timeout"`
}

func DefaultHttpEndpointPoolConfig

func DefaultHttpEndpointPoolConfig() *HttpEndpointPoolConfig

func (*HttpEndpointPoolConfig) OverrideEndpoints

func (poolCfg *HttpEndpointPoolConfig) OverrideEndpoints(urlList string)

Used for command line argument override:

type HttpEndpointPoolInternalMetrics

type HttpEndpointPoolInternalMetrics struct {
	// contains filtered or unexported fields
}

func NewHttpEndpointPoolInternalMetrics

func NewHttpEndpointPoolInternalMetrics(internalMetrics *InternalMetrics) *HttpEndpointPoolInternalMetrics

type HttpEndpointPoolStats

type HttpEndpointPoolStats struct {
	PoolStats HttpPoolStats
	// Endpoint stats are indexed by URL:
	EndpointStats map[string]HttpEndpointStats
}

func NewHttpEndpointPoolStats

func NewHttpEndpointPoolStats() *HttpEndpointPoolStats

type HttpEndpointStats

type HttpEndpointStats []uint64

type HttpPoolStats

type HttpPoolStats []uint64

type InternalMetrics

type InternalMetrics struct {
	GeneratorBase
	// contains filtered or unexported fields
}

func NewInternalMetrics

func NewInternalMetrics(internalMetricsCfg *InternalMetricsConfig) (*InternalMetrics, error)

func (*InternalMetrics) TaskAction

func (internalMetrics *InternalMetrics) TaskAction() bool

type InternalMetricsConfig

type InternalMetricsConfig struct {
	Interval          time.Duration `yaml:"interval"`
	FullMetricsFactor int           `yaml:"full_metrics_factor"`
}

func DefaultInternalMetricsConfig

func DefaultInternalMetricsConfig() *InternalMetricsConfig

type MetricsGeneratorStats

type MetricsGeneratorStats map[string][]uint64

type MetricsGeneratorStatsContainer

type MetricsGeneratorStatsContainer struct {
	// contains filtered or unexported fields
}

func NewMetricsGeneratorStatsContainer

func NewMetricsGeneratorStatsContainer() *MetricsGeneratorStatsContainer

func (*MetricsGeneratorStatsContainer) Clear

func (mgsc *MetricsGeneratorStatsContainer) Clear()

func (*MetricsGeneratorStatsContainer) Update

func (mgsc *MetricsGeneratorStatsContainer) Update(genId string, metricCount, byteCount uint64)

type MetricsGeneratorTask

type MetricsGeneratorTask interface {
	GetId() string
	GetInterval() time.Duration
	TaskActivity() bool
}

The metrics generator interface which allows it to be scheduled as a Task:

type ProcessInternalMetrics

type ProcessInternalMetrics struct {
	// contains filtered or unexported fields
}

func NewProcessInternalMetrics

func NewProcessInternalMetrics(internalMetrics *InternalMetrics) *ProcessInternalMetrics

func (*ProcessInternalMetrics) SnapStats

func (pim *ProcessInternalMetrics) SnapStats()

type ReadFileBufPool

type ReadFileBufPool struct {
	// contains filtered or unexported fields
}

func NewBufPool

func NewBufPool(maxPoolSize int) *ReadFileBufPool

func NewReadFileBufPool

func NewReadFileBufPool(maxPoolSize int, maxReadSize int64) *ReadFileBufPool

func (*ReadFileBufPool) GetBuf

func (p *ReadFileBufPool) GetBuf() *bytes.Buffer

func (*ReadFileBufPool) MaxPoolSize

func (p *ReadFileBufPool) MaxPoolSize() int

func (*ReadFileBufPool) MaxReadSize

func (p *ReadFileBufPool) MaxReadSize() int64

func (*ReadFileBufPool) ReadFile

func (p *ReadFileBufPool) ReadFile(path string) (*bytes.Buffer, error)

func (*ReadFileBufPool) ReturnBuf

func (p *ReadFileBufPool) ReturnBuf(b *bytes.Buffer)

type ReadSeekRewindCloser

type ReadSeekRewindCloser interface {
	io.ReadSeekCloser
	Rewind() error
}

Interface for a http.Request body w/ retries:

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(schedulerCfg *SchedulerConfig) (*Scheduler, error)

func (*Scheduler) AddNewTask

func (scheduler *Scheduler) AddNewTask(task *Task)

func (*Scheduler) Len

func (scheduler *Scheduler) Len() int

sort.Interface:

func (*Scheduler) Less

func (scheduler *Scheduler) Less(i, j int) bool

func (*Scheduler) Pop

func (scheduler *Scheduler) Pop() any

func (*Scheduler) Push

func (scheduler *Scheduler) Push(x any)

heap.Interface:

func (*Scheduler) Shutdown

func (scheduler *Scheduler) Shutdown()

func (*Scheduler) SnapStats

func (scheduler *Scheduler) SnapStats(to SchedulerStats) SchedulerStats

Snap current stats.

func (*Scheduler) Start

func (scheduler *Scheduler) Start()

func (*Scheduler) Swap

func (scheduler *Scheduler) Swap(i, j int)

type SchedulerConfig

type SchedulerConfig struct {
	// The number of workers. If set to -1 it will match the number of
	// available cores:
	NumWorkers int `yaml:"num_workers"`
}

func DefaultSchedulerConfig

func DefaultSchedulerConfig() *SchedulerConfig

type SchedulerInternalMetrics

type SchedulerInternalMetrics struct {
	// contains filtered or unexported fields
}

func NewSchedulerInternalMetrics

func NewSchedulerInternalMetrics(internalMetrics *InternalMetrics) *SchedulerInternalMetrics

type SchedulerState

type SchedulerState int
var (
	SchedulerStateCreated SchedulerState = 0
	SchedulerStateRunning SchedulerState = 1
	SchedulerStateStopped SchedulerState = 2
)

func (SchedulerState) String

func (state SchedulerState) String() string

type SchedulerStats

type SchedulerStats map[string]*TaskStats

type Sender

type Sender interface {
	SendBuffer(b []byte, timeout time.Duration, gzipped bool) error
}

The HTTP endpoint pool interface as seen by the compressor:

type StdoutMetricsQueue

type StdoutMetricsQueue struct {
	// contains filtered or unexported fields
}

func NewStdoutMetricsQueue

func NewStdoutMetricsQueue(poolCfg *CompressorPoolConfig) (*StdoutMetricsQueue, error)

func (*StdoutMetricsQueue) GetBuf

func (mq *StdoutMetricsQueue) GetBuf() *bytes.Buffer

func (*StdoutMetricsQueue) GetTargetSize

func (mq *StdoutMetricsQueue) GetTargetSize() int

func (*StdoutMetricsQueue) QueueBuf

func (mq *StdoutMetricsQueue) QueueBuf(buf *bytes.Buffer)

func (*StdoutMetricsQueue) ReturnBuf

func (mq *StdoutMetricsQueue) ReturnBuf(buf *bytes.Buffer)

func (*StdoutMetricsQueue) Shutdown

func (mq *StdoutMetricsQueue) Shutdown()

type Task

type Task struct {
	// contains filtered or unexported fields
}

func InternalMetricsTaskBuilder

func InternalMetricsTaskBuilder(vmiConfig *VmiConfig) (*Task, error)

Define and register the task builder:

func NewTask

func NewTask(id string, interval time.Duration, action func() bool) *Task

type TaskStats

type TaskStats struct {
	Uint64Stats []uint64
	Disabled    bool
}

func NewTaskStats

func NewTaskStats() *TaskStats

type VmiConfig

type VmiConfig struct {
	// The instance name, default "vmi". It may be overridden by --instance
	// command line arg.
	Instance string `yaml:"instance"`

	// Whether to use short hostname or not as the value for hostname label.
	// Typically the hostname is determined from the hostname system call and if
	// the flag below is in effect, it is stripped of domain part. However if
	// the hostname is overridden by --hostname command line arg, that value is
	// used as-is.
	UseShortHostname bool `yaml:"use_short_hostname"`

	// How long to wait for a graceful shutdown. A negative value signifies
	// indefinite wait and 0 stands for no wait at all (exit abruptly).
	ShutdownMaxWait time.Duration `yaml:"shutdown_max_wait"`

	// Specific components configuration.
	LoggerConfig           *logrusx.LoggerConfig   `yaml:"log_config"`
	CompressorPoolConfig   *CompressorPoolConfig   `yaml:"compressor_pool_config"`
	HttpEndpointPoolConfig *HttpEndpointPoolConfig `yaml:"http_endpoint_pool_config"`
	SchedulerConfig        *SchedulerConfig        `yaml:"scheduler_config"`

	// Internal metrics configuration.
	InternalMetricsConfig *InternalMetricsConfig `yaml:"internal_metrics_config"`
}

func DefaultVmiConfig

func DefaultVmiConfig() *VmiConfig

func LoadConfig

func LoadConfig(cfgFile string, genConfig any, buf []byte) (*VmiConfig, error)

LoadConfig loads the configuration from the specified YAML file (or buffer, for testing) as follows:

  • the vmi_config section is returned as a *VmiConfig structure
  • the generators section is loaded into the provided genConfig structure, which expected to have been primed with default values.

Additionally an error is returned if the configuration could not be loaded or parsed.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL