Documentation
¶
Overview ¶
Compressor pool for sending metrics:
Index ¶
- Constants
- Variables
- func BuildHtmlBasicAuth(username, password string) (string, error)
- func CompliantTaskInterval(interval time.Duration) time.Duration
- func FormatFlagUsage(usage string) string
- func FormatFlagUsageWidth(usage string, width int) string
- func GetAvailableCPUCount() int
- func GetCpuTime(who int) (float64, error)
- func GetInitialCycleNum(fullMetricsFactor int) int
- func GetMyCpuTime() (float64, error)
- func GetOsBootTime() (time.Time, error)
- func GetOsInfo() (map[string]string, error)
- func GetOsReleaseInfo() (map[string]string, error)
- func GetRootLogger() *logrusx.CollectableLogger
- func GetSysClktck() (int64, error)
- func LoadPasswordSpec(password string) (string, error)
- func NewCompLogger(compName string) *logrus.Entry
- func ParseCreditRateSpec(spec string) (int, time.Duration, error)
- func RegisterTaskBuilder(tb func(config any) ([]MetricsGeneratorTask, error))
- func Run(genConfig any) int
- func SetLogger(logCfg *logrusx.LoggerConfig) error
- func SplitWords(s string) []string
- type BufferQueue
- type BytesReadSeekCloser
- type CompressorPool
- func (pool *CompressorPool) GetBuf() *bytes.Buffer
- func (pool *CompressorPool) GetTargetSize() int
- func (pool *CompressorPool) QueueBuf(b *bytes.Buffer)
- func (pool *CompressorPool) ReturnBuf(buf *bytes.Buffer)
- func (pool *CompressorPool) Shutdown()
- func (pool *CompressorPool) SnapStats(to CompressorPoolStats) CompressorPoolStats
- func (pool *CompressorPool) Start(sender Sender)
- type CompressorPoolConfig
- type CompressorPoolInternalMetrics
- type CompressorPoolState
- type CompressorPoolStats
- type CompressorStats
- type Credit
- type CreditController
- type CreditReader
- type GeneratorBase
- type GeneratorInternalMetrics
- type GoInternalMetrics
- type HttpClientDoer
- type HttpEndpoint
- type HttpEndpointConfig
- type HttpEndpointDoublyLinkedList
- func (epDblLnkList *HttpEndpointDoublyLinkedList) AddToHead(ep *HttpEndpoint)
- func (epDblLnkList *HttpEndpointDoublyLinkedList) AddToTail(ep *HttpEndpoint)
- func (epDblLnkList *HttpEndpointDoublyLinkedList) Insert(ep, after *HttpEndpoint)
- func (epDblLnkList *HttpEndpointDoublyLinkedList) Remove(ep *HttpEndpoint)
- type HttpEndpointPool
- func (epPool *HttpEndpointPool) GetCurrentHealthy(maxWait time.Duration) *HttpEndpoint
- func (epPool *HttpEndpointPool) HealthCheck(ep *HttpEndpoint)
- func (epPool *HttpEndpointPool) MoveToHealthy(ep *HttpEndpoint)
- func (epPool *HttpEndpointPool) ReportError(ep *HttpEndpoint)
- func (epPool *HttpEndpointPool) SendBuffer(b []byte, timeout time.Duration, gzipped bool) error
- func (epPool *HttpEndpointPool) Shutdown()
- func (pool *HttpEndpointPool) SnapStats(to *HttpEndpointPoolStats) *HttpEndpointPoolStats
- type HttpEndpointPoolConfig
- type HttpEndpointPoolInternalMetrics
- type HttpEndpointPoolStats
- type HttpEndpointStats
- type HttpPoolStats
- type InternalMetrics
- type InternalMetricsConfig
- type MetricsGeneratorStats
- type MetricsGeneratorStatsContainer
- type MetricsGeneratorTask
- type ProcessInternalMetrics
- type ReadFileBufPool
- type ReadSeekRewindCloser
- type Scheduler
- func (scheduler *Scheduler) AddNewTask(task *Task)
- func (scheduler *Scheduler) Len() int
- func (scheduler *Scheduler) Less(i, j int) bool
- func (scheduler *Scheduler) Pop() any
- func (scheduler *Scheduler) Push(x any)
- func (scheduler *Scheduler) Shutdown()
- func (scheduler *Scheduler) SnapStats(to SchedulerStats) SchedulerStats
- func (scheduler *Scheduler) Start()
- func (scheduler *Scheduler) Swap(i, j int)
- type SchedulerConfig
- type SchedulerInternalMetrics
- type SchedulerState
- type SchedulerStats
- type Sender
- type StdoutMetricsQueue
- type Task
- type TaskStats
- type VmiConfig
Constants ¶
const ( COMPRESSOR_POOL_CONFIG_COMPRESSION_LEVEL_DEFAULT = gzip.DefaultCompression COMPRESSOR_POOL_CONFIG_NUM_COMPRESSORS_DEFAULT = -1 COMPRESSOR_POOL_MAX_NUM_COMPRESSORS = 4 COMPRESSOR_POOL_CONFIG_BUFFER_POOL_MAX_SIZE_DEFAULT = 64 COMPRESSOR_POOL_CONFIG_METRICS_QUEUE_SIZE_DEFAULT = 64 COMPRESSOR_POOL_CONFIG_BATCH_TARGET_SIZE_DEFAULT = "64k" COMPRESSOR_POOL_CONFIG_FLUSH_INTERVAL_DEFAULT = 5 * time.Second )
const ( INITIAL_COMPRESSION_FACTOR = 2. COMPRESSION_FACTOR_EXP_DECAY_ALPHA = 0.8 // A compressed batch should be at least this size to be used for updating // the compression factor: COMPRESSED_BATCH_MIN_SIZE_FOR_CF = 128 )
const ( COMPRESSOR_STATS_READ_COUNT = iota COMPRESSOR_STATS_READ_BYTE_COUNT COMPRESSOR_STATS_SEND_COUNT COMPRESSOR_STATS_SEND_BYTE_COUNT COMPRESSOR_STATS_TIMEOUT_FLUSH_COUNT COMPRESSOR_STATS_SEND_ERROR_COUNT COMPRESSOR_STATS_WRITE_ERROR_COUNT // Must be last: COMPRESSOR_STATS_UINT64_LEN )
Compressor stats:
const ( COMPRESSOR_STATS_COMPRESSION_FACTOR = iota // Must be last: COMPRESSOR_STATS_FLOAT64_LEN )
const ( VMI_CONFIG_SECTION_NAME = "vmi_config" GENERATORS_SECTION_NAME = "generators" VMI_CONFIG_USE_SHORT_HOSTNAME_DEFAULT = false VMI_CONFIG_SHUTDOWN_MAX_WAIT_DEFAULT = 5 * time.Second )
const ( // Indexes into the per generator []int stats: METRICS_GENERATOR_INVOCATION_COUNT = iota METRICS_GENERATOR_METRICS_COUNT METRICS_GENERATOR_BYTE_COUNT // Must be last: METRICS_GENERATOR_NUM_STATS )
Each metrics generator will maintain the following common stats:
const ( // The order in the metrics cache: GO_NUM_GOROUTINE_METRIC_INDEX = iota GO_MEM_SYS_BYTES_METRIC_INDEX GO_MEM_HEAP_BYTES_METRIC_INDEX GO_MEM_HEAP_SYS_BYTES_METRIC_INDEX GO_MEM_MALLOCS_DELTA_METRIC_INDEX GO_MEM_FREE_DELTA_METRIC_INDEX GO_MEM_IN_USE_OBJECT_COUNT_METRIC_INDEX GO_MEM_NUM_GC_DELTA_METRIC_INDEX // Must be last: GO_INTERNAL_METRICS_NUM )
const ( // Endpoint default values: HTTP_ENDPOINT_URL_DEFAULT = "http://localhost:8428/api/v1/import/prometheus" HTTP_ENDPOINT_MARK_UNHEALTHY_THRESHOLD_DEFAULT = 1 // Endpoint config pool default values: HTTP_ENDPOINT_POOL_CONFIG_SHUFFLE_DEFAULT = false HTTP_ENDPOINT_POOL_CONFIG_HEALTHY_ROTATE_INTERVAL_DEFAULT = 5 * time.Minute HTTP_ENDPOINT_POOL_CONFIG_ERROR_RESET_INTERVAL_DEFAULT = 1 * time.Minute HTTP_ENDPOINT_POOL_CONFIG_HEALTH_CHECK_INTERVAL_DEFAULT = 5 * time.Second HTTP_ENDPOINT_POOL_CONFIG_HEALTHY_MAX_WAIT_DEFAULT = 10 * time.Second HTTP_ENDPOINT_POOL_CONFIG_SEND_BUFFER_TIMEOUT_DEFAULT = 20 * time.Second HTTP_ENDPOINT_POOL_CONFIG_RATE_LIMIT_MBPS_DEFAULT = "" // Endpoint config definitions, later they may be configurable: HTTP_ENDPOINT_POOL_HEALTHY_CHECK_MIN_INTERVAL = 1 * time.Second HTTP_ENDPOINT_POOL_HEALTHY_POLL_INTERVAL = 500 * time.Millisecond HTTP_ENDPOINT_POOL_HEALTH_CHECK_ERR_LOG_INTERVAL = 10 * time.Second // http.Transport config default values: // Dialer config default values: HTTP_ENDPOINT_POOL_CONFIG_TCP_CONN_TIMEOUT_DEFAULT = 2 * time.Second HTTP_ENDPOINT_POOL_CONFIG_TCP_KEEP_ALIVE_DEFAULT = 15 * time.Second HTTP_ENDPOINT_POOL_CONFIG_MAX_IDLE_CONNS_DEFAULT = 0 // No limit HTTP_ENDPOINT_POOL_CONFIG_MAX_IDLE_CONNS_PER_HOST_DEFAULT = 1 HTTP_ENDPOINT_POOL_CONFIG_MAX_CONNS_PER_HOST_DEFAULT = 0 // No limit HTTP_ENDPOINT_POOL_CONFIG_IDLE_CONN_TIMEOUT_DEFAULT = 1 * time.Minute // http.Client config default values: HTTP_ENDPOINT_POOL_CONFIG_RESPONSE_TIMEOUT_DEFAULT = 5 * time.Second // Prefixes for the password field: HTTP_ENDPOINT_POOL_CONFIG_PASSWORD_FILE_PREFIX = "file:" HTTP_ENDPOINT_POOL_CONFIG_PASSWORD_ENV_PREFIX = "env:" HTTP_ENDPOINT_POOL_CONFIG_PASSWORD_PASS_PREFIX = "pass:" )
const ( HTTP_ENDPOINT_STATS_SEND_BUFFER_COUNT = iota HTTP_ENDPOINT_STATS_SEND_BUFFER_BYTE_COUNT HTTP_ENDPOINT_STATS_SEND_BUFFER_ERROR_COUNT HTTP_ENDPOINT_STATS_HEALTH_CHECK_COUNT HTTP_ENDPOINT_STATS_HEALTH_CHECK_ERROR_COUNT // Must be last: HTTP_ENDPOINT_STATS_LEN )
Endpoint stats:
const ( HTTP_ENDPOINT_POOL_STATS_HEALTHY_ROTATE_COUNT = iota HTTP_ENDPOINT_POOL_STATS_NO_HEALTHY_EP_ERROR_COUNT // Must be last: HTTP_ENDPOINT_POOL_STATS_LEN )
Endpoint pool stats:
const ( INTERNAL_METRICS_CONFIG_INTERVAL_DEFAULT = 5 * time.Second INTERNAL_METRICS_CONFIG_FULL_METRICS_FACTOR_DEFAULT = 12 // This generator id: INTERNAL_METRICS_ID = "internal_metrics" )
Generate internal metrics:
const ( // The following labels are common to all metrics: INSTANCE_LABEL_NAME = "vmi_inst" HOSTNAME_LABEL_NAME = "hostname" // Deltas since previous internal metrics interval: COMPRESSOR_STATS_READ_DELTA_METRIC = "vmi_compressor_read_delta" COMPRESSOR_STATS_READ_BYTE_DELTA_METRIC = "vmi_compressor_read_byte_delta" COMPRESSOR_STATS_SEND_DELTA_METRIC = "vmi_compressor_send_delta" COMPRESSOR_STATS_SEND_BYTE_DELTA_METRIC = "vmi_compressor_send_byte_delta" COMPRESSOR_STATS_TIMEOUT_FLUSH_DELTA_METRIC = "vmi_compressor_tout_flush_delta" COMPRESSOR_STATS_SEND_ERROR_DELTA_METRIC = "vmi_compressor_send_error_delta" COMPRESSOR_STATS_WRITE_ERROR_DELTA_METRIC = "vmi_compressor_write_error_delta" COMPRESSOR_STATS_COMPRESSION_FACTOR_METRIC = "vmi_compressor_compression_factor" COMPRESSOR_ID_LABEL_NAME = "compressor" // Invocation, metric and byte counts for the generator: METRICS_GENERATOR_INVOCATION_DELTA_METRIC = "vmi_metrics_gen_invocation_delta" METRICS_GENERATOR_METRICS_DELTA_METRIC = "vmi_metrics_gen_metrics_delta" METRICS_GENERATOR_BYTE_DELTA_METRIC = "vmi_metrics_gen_byte_delta" // Actual interval since the previous invocation. It should be closed to the // configured interval, but may be longer if the generator is busy. 
It could // be used to calculate the rates out of deltas METRICS_GENERATOR_DTIME_METRIC = "vmi_metrics_gen_dtime_sec" METRICS_GENERATOR_DTIME_METRIC_PRECISION = 6 METRICS_GENERATOR_ID_LABEL_NAME = "gen_id" GO_NUM_GOROUTINE_METRIC = "vmi_go_num_goroutine" GO_MEM_SYS_BYTES_METRIC = "vmi_go_mem_sys_bytes" GO_MEM_HEAP_BYTES_METRIC = "vmi_go_mem_heap_bytes" GO_MEM_HEAP_SYS_BYTES_METRIC = "vmi_go_mem_heap_sys_bytes" GO_MEM_IN_USE_OBJECT_COUNT_METRIC = "vmi_go_mem_in_use_object_count" // Deltas since previous internal metrics interval: GO_MEM_MALLOCS_DELTA_METRIC = "vmi_go_mem_malloc_delta" GO_MEM_FREE_DELTA_METRIC = "vmi_go_mem_free_delta" GO_MEM_NUM_GC_DELTA_METRIC = "vmi_go_mem_gc_delta" // Deltas since previous internal metrics interval: HTTP_ENDPOINT_STATS_SEND_BUFFER_DELTA_METRIC = "vmi_http_ep_send_buffer_delta" HTTP_ENDPOINT_STATS_SEND_BUFFER_BYTE_DELTA_METRIC = "vmi_http_ep_send_buffer_byte_delta" HTTP_ENDPOINT_STATS_SEND_BUFFER_ERROR_DELTA_METRIC = "vmi_http_ep_send_buffer_error_delta" HTTP_ENDPOINT_STATS_HEALTH_CHECK_DELTA_METRIC = "vmi_http_ep_healthcheck_delta" HTTP_ENDPOINT_STATS_HEALTH_CHECK_ERROR_DELTA_METRIC = "vmi_http_ep_healthcheck_error_delta" // Labels: HTTP_ENDPOINT_STATS_STATE_LABEL = "state" HTTP_ENDPOINT_URL_LABEL_NAME = "url" // Deltas since previous internal metrics interval: HTTP_ENDPOINT_POOL_STATS_HEALTHY_ROTATE_DELTA_METRIC = "vmi_http_ep_pool_healthy_rotate_delta" HTTP_ENDPOINT_POOL_STATS_NO_HEALTHY_EP_ERROR_DELTA_METRIC = "vmi_http_ep_pool_no_healthy_ep_error_delta" // Importer metric: VMI_UPTIME_METRIC = "vmi_uptime_sec" // heartbeat VMI_BUILD_INFO_METRIC = "vmi_build_info" VMI_VERSION_LABEL_NAME = "vmi_version" VMI_GIT_INFO_LABEL_NAME = "vmi_git_info" // OS metrics: OS_INFO_METRIC = "vmi_os_info" OS_INFO_LABEL_PREFIX = "os_info_" // prefix + OSInfoLabelKeys OS_RELEASE_METRIC = "vmi_os_release" OS_RELEASE_LABEL_PREFIX = "os_rel_" // prefix + OSReleaseLabelKeys OS_UPTIME_METRIC = "vmi_os_uptime_sec" UPTIME_METRIC_PRECISION = 6 // %CPU 
over internal metrics interval: VMI_PROC_PCPU_METRIC = "vmi_proc_pcpu" TASK_STATS_SCHEDULED_DELTA_METRIC = "vmi_task_scheduled_delta" TASK_STATS_DELAYED_DELTA_METRIC = "vmi_task_delayed_delta" TASK_STATS_OVERRUN_DELTA_METRIC = "vmi_task_overrun_delta" TASK_STATS_EXECUTED_DELTA_METRIC = "vmi_task_executed_delta" TASK_STATS_NEXT_TS_HACK_DELTA_METRIC = "vmi_task_next_ts_hack_delta" TASK_STATS_AVG_RUNTIME_METRIC = "vmi_task_avg_runtime_sec" TASK_STATS_AVG_RUNTIME_METRIC_PRECISION = 6 // Re-use generator ID label since they have the same value: TASK_STATS_TASK_ID_LABEL_NAME = METRICS_GENERATOR_ID_LABEL_NAME )
const ( CREDIT_NO_LIMIT = 0 CREDIT_EXACT_MATCH = 0 )
const ( READ_FILE_BUF_POOL_MAX_SIZE_UNBOUND = 0 READ_FILE_BUF_POOL_MAX_READ_SIZE_UNBOUND = 0 )
const ( CONFIG_FLAG_NAME = "config" INSTANCE_DEFAULT = "vmi" )
const ( SCHEDULER_CONFIG_NUM_WORKERS_DEFAULT = -1 SCHEDULER_MAX_NUM_WORKERS = 8 )
const ( SCHEDULER_TASK_Q_LEN = 64 SCHEDULER_TODO_Q_LEN = 64 // All intervals will be rounded to be a multiple of scheduler's granularity: SCHEDULER_GRANULARITY = 20 * time.Millisecond // The minimum pause between 2 consecutive executions of the same task: SCHEDULER_TASK_MIN_EXECUTION_PAUSE = 2 * SCHEDULER_GRANULARITY )
const ( // How many times the task was scheduled: TASK_STATS_SCHEDULED_COUNT = iota // How many times the task was delayed because it was too close to its // previous execution: TASK_STATS_DELAYED_COUNT // How many times the task overran, i.e. its runtime >= interval: TASK_STATS_OVERRUN_COUNT // How many times the task was executed: TASK_STATS_EXECUTED_COUNT // How many times the next scheduling time was hacked to counter the wall // clock seemingly going backwards (see AddNewTask): TASK_STATS_NEXT_TS_HACK_COUNT // Total runtime of the task, in microseconds. TASK_STATS_TOTAL_RUNTIME // Must be last: TASK_STATS_UINT64_LEN )
const (
// The help usage message line wraparound default width:
DEFAULT_FLAG_USAGE_WIDTH = 58
)
const (
GENERATOR_RUNTIME_UNAVAILABLE = -1.
)
Variables ¶
var ( AvailableCPUCount = GetAvailableCPUCount() BootTime = time.Now() Clktck int64 ClktckSec float64 OsInfo = make(map[string]string) OsRelease = make(map[string]string) )
var ( // The hostname, based on OS, config or command line arg. Hostname string // The instance should be primed w/ the desired default *before* invoking // the runner, most likely from an init() (e.g. set to "lsvmi" for Linux // Stats VictoriaMetrics importer) Its value may be modified via config and // command line args. Instance string = INSTANCE_DEFAULT // Build info, normally set via init() by the user of this package. Version string GitInfo string MetricsGenStats = NewMetricsGeneratorStatsContainer() MetricsQueue BufferQueue )
var ErrHttpEndpointPoolNoHealthyEP = errors.New("no healthy HTTP endpoint available")
Error codes:
var ErrReadFileBufPotentialTruncation = errors.New("potential truncation")
Reading a file may be limited by a max size; if the cap is reached then it is possible that the file was truncated (note that the stat system call reports size 0 for /proc files, so it cannot be used to determine the actual size). Should such a condition occur, it should be treated as an error to signal potential truncation to the caller.
var HttpEndpointPoolRetryCodes = map[int]bool{}
The list of HTTP codes that should be retried:
var HttpEndpointPoolSuccessCodes = map[int]bool{ http.StatusOK: true, http.StatusNoContent: true, }
The list of HTTP codes that denote success:
var MetricsGeneratorStatsMetricsNameMap = map[int]string{ METRICS_GENERATOR_INVOCATION_COUNT: METRICS_GENERATOR_INVOCATION_DELTA_METRIC, METRICS_GENERATOR_METRICS_COUNT: METRICS_GENERATOR_METRICS_DELTA_METRIC, METRICS_GENERATOR_BYTE_COUNT: METRICS_GENERATOR_BYTE_DELTA_METRIC, }
var OSInfoLabelKeys = []string{
"name",
"release",
"version",
"machine",
}
The following OsInfo keys will be used as labels in OS info metrics:
var OSReleaseLabelKeys = []string{
"id",
"name",
"pretty_name",
"version",
"version_codename",
"version_id",
}
The following OSRelease keys will be used as labels in OS release metrics:
var RootLogger = logrusx.NewCollectableLogger()
Functions ¶
func BuildHtmlBasicAuth ¶
func CompliantTaskInterval ¶
Ensure that a task interval is scheduler compliant:
func FormatFlagUsage ¶
func FormatFlagUsageWidth ¶
Format command flag usage for help message, by wrapping the lines around a given width. The original line breaks and prefixing white spaces are ignored. Example:
var flagArg = flag.String(
name, value, FormatFlagUsageWidth(` This usage message will be reformatted to the given width, discarding the current line breaks and line prefixing spaces. `, 40),
)
func GetAvailableCPUCount ¶
func GetAvailableCPUCount() int
On Linux, count the available CPUs based on CPU affinity, w/ a fallback on runtime:
func GetCpuTime ¶
func GetInitialCycleNum ¶
func GetMyCpuTime ¶
func GetOsBootTime ¶
func GetOsReleaseInfo ¶
func GetRootLogger ¶
func GetRootLogger() *logrusx.CollectableLogger
Public access to the root logger, needed for testing:
func GetSysClktck ¶
func LoadPasswordSpec ¶
func NewCompLogger ¶
func ParseCreditRateSpec ¶
Parse rate limit Mbps string. Supported formats: FLOAT or FLOAT:INTERVAL, where INTERVAL should be in the format supported by time.ParseDuration(). FLOAT is equivalent w/ FLOAT:1s.
func RegisterTaskBuilder ¶
func RegisterTaskBuilder(tb func(config any) ([]MetricsGeneratorTask, error))
func SplitWords ¶
Types ¶
type BufferQueue ¶
type BufferQueue interface { GetBuf() *bytes.Buffer ReturnBuf(b *bytes.Buffer) QueueBuf(b *bytes.Buffer) GetTargetSize() int }
The generated metrics are written into *bytes.Buffer's which are then queued into the metrics queue for transmission.
type BytesReadSeekCloser ¶
type BytesReadSeekCloser struct {
// contains filtered or unexported fields
}
Convert bytes.Reader into ReadSeekRewindCloser such that it can be used as body for http.Request w/ retries:
func NewBytesReadSeekCloser ¶
func NewBytesReadSeekCloser(b []byte) *BytesReadSeekCloser
func (*BytesReadSeekCloser) Close ¶
func (brsc *BytesReadSeekCloser) Close() error
func (*BytesReadSeekCloser) Rewind ¶
func (brsc *BytesReadSeekCloser) Rewind() error
Reuse, for HTTP retries:
type CompressorPool ¶
type CompressorPool struct {
// contains filtered or unexported fields
}
func NewCompressorPool ¶
func NewCompressorPool(poolCfg *CompressorPoolConfig) (*CompressorPool, error)
func (*CompressorPool) GetBuf ¶
func (pool *CompressorPool) GetBuf() *bytes.Buffer
Satisfy BufferQueue interface:
func (*CompressorPool) GetTargetSize ¶
func (pool *CompressorPool) GetTargetSize() int
func (*CompressorPool) QueueBuf ¶
func (pool *CompressorPool) QueueBuf(b *bytes.Buffer)
func (*CompressorPool) ReturnBuf ¶
func (pool *CompressorPool) ReturnBuf(buf *bytes.Buffer)
func (*CompressorPool) Shutdown ¶
func (pool *CompressorPool) Shutdown()
func (*CompressorPool) SnapStats ¶
func (pool *CompressorPool) SnapStats(to CompressorPoolStats) CompressorPoolStats
func (*CompressorPool) Start ¶
func (pool *CompressorPool) Start(sender Sender)
type CompressorPoolConfig ¶
type CompressorPoolConfig struct { // The number of compressors. If set to -1 it will match the number of // available cores but not more than COMPRESSOR_POOL_MAX_NUM_COMPRESSORS: NumCompressors int `yaml:"num_compressors"` // Buffer pool size; buffers are pulled by metrics generators as needed and // they are returned after they are compressed. The pool max size controls // only how many idle buffers are being kept around, since they are created // as many as requested but they are discarded if they exceed the value // below. A value is too small leads to object churning and a value too // large may waste memory. BufferPoolMaxSize int `yaml:"buffer_pool_max_size"` // Metrics queue size, it should be deep enough to accommodate metrics up to // send_buffer_timeout: MetricsQueueSize int `yaml:"metrics_queue_size"` // Compression level: 0..9: CompressionLevel int `yaml:"compression_level"` // Batch target size; metrics will be read from the queue until the // compressed size is ~ to the value below. The value can have the usual `k` // or `m` suffixes for KiB or MiB accordingly. BatchTargetSize string `yaml:"batch_target_size"` // Flush interval. If batch_target_size is not reached before this interval // expires, the metrics compressed thus far are being sent anyway. Use 0 to // disable time flush. FlushInterval time.Duration `yaml:"flush_interval"` }
func DefaultCompressorPoolConfig ¶
func DefaultCompressorPoolConfig() *CompressorPoolConfig
type CompressorPoolInternalMetrics ¶
type CompressorPoolInternalMetrics struct {
// contains filtered or unexported fields
}
func NewCompressorPoolInternalMetrics ¶
func NewCompressorPoolInternalMetrics(internalMetrics *InternalMetrics) *CompressorPoolInternalMetrics
type CompressorPoolState ¶
type CompressorPoolState int
var ( CompressorPoolStateCreated CompressorPoolState = 0 CompressorPoolStateRunning CompressorPoolState = 1 CompressorPoolStateStopped CompressorPoolState = 2 )
func (CompressorPoolState) String ¶
func (state CompressorPoolState) String() string
type CompressorPoolStats ¶
type CompressorPoolStats map[string]*CompressorStats
func NewCompressorPoolStats ¶
func NewCompressorPoolStats(numCompressors int) CompressorPoolStats
type CompressorStats ¶
func NewCompressorStats ¶
func NewCompressorStats() *CompressorStats
type Credit ¶
type Credit struct {
// contains filtered or unexported fields
}
The actual implementation:
func NewCreditFromSpec ¶
func (*Credit) StopReplenish ¶
func (c *Credit) StopReplenish()
func (*Credit) StopReplenishWait ¶
func (c *Credit) StopReplenishWait()
type CreditController ¶
Define an interface for testing:
type CreditReader ¶
type CreditReader struct {
// contains filtered or unexported fields
}
Credit based reader, limiting the rate of data read from a byte buffer and implementing the io.ReadSeekCloser interface, so it can be used in http.Request.Body. This is used to control the rate of import into VictoriaMetrics.
func NewCreditReader ¶
func NewCreditReader(cc CreditController, minAcceptable int, b []byte) *CreditReader
func (*CreditReader) Read ¶
func (cr *CreditReader) Read(p []byte) (int, error)
Implement the Read interface:
func (*CreditReader) Reuse ¶
func (cr *CreditReader) Reuse(minAcceptable int, b []byte)
Reuse w/ new data:
type GeneratorBase ¶
type GeneratorBase struct { // Unique generator ID: Id string // Scheduling interval: Interval time.Duration // Full metrics factor (see "Partial V. Full Metrics" in README.md): FullMetricsFactor int // The current cycle# used in conjunction with the FullMetricsFactor: CycleNum int // The timestamp of the last metrics generation: LastTs time.Time // Cache for generator metrics: DtimeMetric []byte // Cache for timestamp suffix, common to all/a group of metrics: TsSuffixBuf *bytes.Buffer // Whether the structure was initialized (caches, etc) or not: Initialized bool // The following fields, if left to their default values (type's nil) will // be set during initialization with the usual values. They may be // pre-populated during tests after the generator was created and before // initialization. Instance string Hostname string TimeNowFunc func() time.Time MetricsQueue BufferQueue TestMode bool }
func (*GeneratorBase) GenBaseInit ¶
func (gb *GeneratorBase) GenBaseInit()
func (*GeneratorBase) GenBaseMetricsStart ¶
Start metrics generation; this should be the 1st call in a metrics generation since it establishes the timestamp suffix. Call with the buffer for the metrics and the timestamp of the collection. Return the metric count and the last timestamp of the previous run. If the buffer is nil, then no metrics are generated, but the timestamp suffix is still updated.
func (*GeneratorBase) GetInterval ¶
func (gb *GeneratorBase) GetInterval() time.Duration
type GeneratorInternalMetrics ¶
type GeneratorInternalMetrics struct {
// contains filtered or unexported fields
}
func NewGeneratorInternalMetrics ¶
func NewGeneratorInternalMetrics(internalMetrics *InternalMetrics) *GeneratorInternalMetrics
func (*GeneratorInternalMetrics) SnapStats ¶
func (gim *GeneratorInternalMetrics) SnapStats()
type GoInternalMetrics ¶
type GoInternalMetrics struct {
// contains filtered or unexported fields
}
func NewGoInternalMetrics ¶
func NewGoInternalMetrics(internalMetrics *InternalMetrics) *GoInternalMetrics
func (*GoInternalMetrics) SnapStats ¶
func (gim *GoInternalMetrics) SnapStats()
type HttpClientDoer ¶
type HttpClientDoer interface { Do(req *http.Request) (*http.Response, error) CloseIdleConnections() }
Define a mockable interface to substitute http.Client.Do() for testing purposes:
type HttpEndpoint ¶
type HttpEndpoint struct { // The parsed format for above, to be used for http calls: URL *url.URL // contains filtered or unexported fields }
func NewHttpEndpoint ¶
func NewHttpEndpoint(cfg *HttpEndpointConfig) (*HttpEndpoint, error)
type HttpEndpointConfig ¶
type HttpEndpointConfig struct { URL string MarkUnhealthyThreshold int `yaml:"mark_unhealthy_threshold"` }
func DefaultHttpEndpointConfig ¶
func DefaultHttpEndpointConfig() *HttpEndpointConfig
type HttpEndpointDoublyLinkedList ¶
type HttpEndpointDoublyLinkedList struct {
// contains filtered or unexported fields
}
func (*HttpEndpointDoublyLinkedList) AddToHead ¶
func (epDblLnkList *HttpEndpointDoublyLinkedList) AddToHead(ep *HttpEndpoint)
func (*HttpEndpointDoublyLinkedList) AddToTail ¶
func (epDblLnkList *HttpEndpointDoublyLinkedList) AddToTail(ep *HttpEndpoint)
func (*HttpEndpointDoublyLinkedList) Insert ¶
func (epDblLnkList *HttpEndpointDoublyLinkedList) Insert(ep, after *HttpEndpoint)
func (*HttpEndpointDoublyLinkedList) Remove ¶
func (epDblLnkList *HttpEndpointDoublyLinkedList) Remove(ep *HttpEndpoint)
type HttpEndpointPool ¶
type HttpEndpointPool struct {
// contains filtered or unexported fields
}
func NewHttpEndpointPool ¶
func NewHttpEndpointPool(poolCfg *HttpEndpointPoolConfig) (*HttpEndpointPool, error)
func (*HttpEndpointPool) GetCurrentHealthy ¶
func (epPool *HttpEndpointPool) GetCurrentHealthy(maxWait time.Duration) *HttpEndpoint
Get the current healthy endpoint or nil if none available after max wait; if maxWait < 0 then the pool healthyMaxWait is used:
func (*HttpEndpointPool) HealthCheck ¶
func (epPool *HttpEndpointPool) HealthCheck(ep *HttpEndpoint)
func (*HttpEndpointPool) MoveToHealthy ¶
func (epPool *HttpEndpointPool) MoveToHealthy(ep *HttpEndpoint)
func (*HttpEndpointPool) ReportError ¶
func (epPool *HttpEndpointPool) ReportError(ep *HttpEndpoint)
func (*HttpEndpointPool) SendBuffer ¶
SendBuffer: the main reason for the pool is to send buffers w/ load balancing and retries. If timeout is < 0 then the pool's sendBufferTimeout is used:
func (*HttpEndpointPool) Shutdown ¶
func (epPool *HttpEndpointPool) Shutdown()
Needed for testing or clean exit in general:
func (*HttpEndpointPool) SnapStats ¶
func (pool *HttpEndpointPool) SnapStats(to *HttpEndpointPoolStats) *HttpEndpointPoolStats
type HttpEndpointPoolConfig ¶
type HttpEndpointPoolConfig struct { Endpoints []*HttpEndpointConfig `yaml:"endpoints"` Username string `yaml:"username"` Password string `yaml:"password"` MarkUnhealthyThreshold int `yaml:"mark_unhealthy_threshold"` Shuffle bool `yaml:"shuffle"` HealthyRotateInterval time.Duration `yaml:"healthy_rotate_interval"` ErrorResetInterval time.Duration `yaml:"error_reset_interval"` HealthCheckInterval time.Duration `yaml:"health_check_interval"` HealthyMaxWait time.Duration `yaml:"healthy_max_wait"` SendBufferTimeout time.Duration `yaml:"send_buffer_timeout"` RateLimitMbps string `yaml:"rate_limit_mbps"` IgnoreTLSVerify bool `yaml:"ignore_tls_verify"` TcpConnTimeout time.Duration `yaml:"tcp_conn_timeout"` TcpKeepAlive time.Duration `yaml:"tcp_keep_alive"` MaxIdleConns int `yaml:"max_idle_conns"` MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"` MaxConnsPerHost int `yaml:"max_conns_per_host"` IdleConnTimeout time.Duration `yaml:"idle_conn_timeout"` ResponseTimeout time.Duration `yaml:"response_timeout"` }
func DefaultHttpEndpointPoolConfig ¶
func DefaultHttpEndpointPoolConfig() *HttpEndpointPoolConfig
func (*HttpEndpointPoolConfig) OverrideEndpoints ¶
func (poolCfg *HttpEndpointPoolConfig) OverrideEndpoints(urlList string)
Used for command line argument override:
type HttpEndpointPoolInternalMetrics ¶
type HttpEndpointPoolInternalMetrics struct {
// contains filtered or unexported fields
}
func NewHttpEndpointPoolInternalMetrics ¶
func NewHttpEndpointPoolInternalMetrics(internalMetrics *InternalMetrics) *HttpEndpointPoolInternalMetrics
type HttpEndpointPoolStats ¶
type HttpEndpointPoolStats struct { PoolStats HttpPoolStats // Endpoint stats are indexed by URL: EndpointStats map[string]HttpEndpointStats }
func NewHttpEndpointPoolStats ¶
func NewHttpEndpointPoolStats() *HttpEndpointPoolStats
type HttpEndpointStats ¶
type HttpEndpointStats []uint64
type HttpPoolStats ¶
type HttpPoolStats []uint64
type InternalMetrics ¶
type InternalMetrics struct { GeneratorBase // contains filtered or unexported fields }
func NewInternalMetrics ¶
func NewInternalMetrics(internalMetricsCfg *InternalMetricsConfig) (*InternalMetrics, error)
func (*InternalMetrics) TaskAction ¶
func (internalMetrics *InternalMetrics) TaskAction() bool
type InternalMetricsConfig ¶
type InternalMetricsConfig struct { Interval time.Duration `yaml:"interval"` FullMetricsFactor int `yaml:"full_metrics_factor"` }
func DefaultInternalMetricsConfig ¶
func DefaultInternalMetricsConfig() *InternalMetricsConfig
type MetricsGeneratorStats ¶
type MetricsGeneratorStatsContainer ¶
type MetricsGeneratorStatsContainer struct {
// contains filtered or unexported fields
}
func NewMetricsGeneratorStatsContainer ¶
func NewMetricsGeneratorStatsContainer() *MetricsGeneratorStatsContainer
func (*MetricsGeneratorStatsContainer) Clear ¶
func (mgsc *MetricsGeneratorStatsContainer) Clear()
func (*MetricsGeneratorStatsContainer) Update ¶
func (mgsc *MetricsGeneratorStatsContainer) Update(genId string, metricCount, byteCount uint64)
type MetricsGeneratorTask ¶
type MetricsGeneratorTask interface { GetId() string GetInterval() time.Duration TaskActivity() bool }
The metrics generator interface which allows it to be scheduled as a Task:
type ProcessInternalMetrics ¶
type ProcessInternalMetrics struct {
// contains filtered or unexported fields
}
func NewProcessInternalMetrics ¶
func NewProcessInternalMetrics(internalMetrics *InternalMetrics) *ProcessInternalMetrics
func (*ProcessInternalMetrics) SnapStats ¶
func (pim *ProcessInternalMetrics) SnapStats()
type ReadFileBufPool ¶
type ReadFileBufPool struct {
// contains filtered or unexported fields
}
func NewBufPool ¶
func NewBufPool(maxPoolSize int) *ReadFileBufPool
func NewReadFileBufPool ¶
func NewReadFileBufPool(maxPoolSize int, maxReadSize int64) *ReadFileBufPool
func (*ReadFileBufPool) GetBuf ¶
func (p *ReadFileBufPool) GetBuf() *bytes.Buffer
func (*ReadFileBufPool) MaxPoolSize ¶
func (p *ReadFileBufPool) MaxPoolSize() int
func (*ReadFileBufPool) MaxReadSize ¶
func (p *ReadFileBufPool) MaxReadSize() int64
func (*ReadFileBufPool) ReadFile ¶
func (p *ReadFileBufPool) ReadFile(path string) (*bytes.Buffer, error)
func (*ReadFileBufPool) ReturnBuf ¶
func (p *ReadFileBufPool) ReturnBuf(b *bytes.Buffer)
type ReadSeekRewindCloser ¶
type ReadSeekRewindCloser interface { io.ReadSeekCloser Rewind() error }
Interface for a http.Request body w/ retries:
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler(schedulerCfg *SchedulerConfig) (*Scheduler, error)
func (*Scheduler) AddNewTask ¶
func (*Scheduler) SnapStats ¶
func (scheduler *Scheduler) SnapStats(to SchedulerStats) SchedulerStats
Snap current stats.
type SchedulerConfig ¶
type SchedulerConfig struct { // The number of workers. If set to -1 it will match the number of // available cores: NumWorkers int `yaml:"num_workers"` }
func DefaultSchedulerConfig ¶
func DefaultSchedulerConfig() *SchedulerConfig
type SchedulerInternalMetrics ¶
type SchedulerInternalMetrics struct {
// contains filtered or unexported fields
}
func NewSchedulerInternalMetrics ¶
func NewSchedulerInternalMetrics(internalMetrics *InternalMetrics) *SchedulerInternalMetrics
type SchedulerState ¶
type SchedulerState int
var ( SchedulerStateCreated SchedulerState = 0 SchedulerStateRunning SchedulerState = 1 SchedulerStateStopped SchedulerState = 2 )
func (SchedulerState) String ¶
func (state SchedulerState) String() string
type SchedulerStats ¶
type StdoutMetricsQueue ¶
type StdoutMetricsQueue struct {
// contains filtered or unexported fields
}
func NewStdoutMetricsQueue ¶
func NewStdoutMetricsQueue(poolCfg *CompressorPoolConfig) (*StdoutMetricsQueue, error)
func (*StdoutMetricsQueue) GetBuf ¶
func (mq *StdoutMetricsQueue) GetBuf() *bytes.Buffer
func (*StdoutMetricsQueue) GetTargetSize ¶
func (mq *StdoutMetricsQueue) GetTargetSize() int
func (*StdoutMetricsQueue) QueueBuf ¶
func (mq *StdoutMetricsQueue) QueueBuf(buf *bytes.Buffer)
func (*StdoutMetricsQueue) ReturnBuf ¶
func (mq *StdoutMetricsQueue) ReturnBuf(buf *bytes.Buffer)
func (*StdoutMetricsQueue) Shutdown ¶
func (mq *StdoutMetricsQueue) Shutdown()
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
func InternalMetricsTaskBuilder ¶
Define and register the task builder:
type TaskStats ¶
func NewTaskStats ¶
func NewTaskStats() *TaskStats
type VmiConfig ¶
type VmiConfig struct { // The instance name, default "vmi". It may be overridden by --instance // command line arg. Instance string `yaml:"instance"` // Whether to use short hostname or not as the value for hostname label. // Typically the hostname is determined from the hostname system call and if // the flag below is in effect, it is stripped of domain part. However if // the hostname is overridden by --hostname command line arg, that value is // used as-is. UseShortHostname bool `yaml:"use_short_hostname"` // How long to wait for a graceful shutdown. A negative value signifies // indefinite wait and 0 stands for no wait at all (exit abruptly). ShutdownMaxWait time.Duration `yaml:"shutdown_max_wait"` // Specific components configuration. LoggerConfig *logrusx.LoggerConfig `yaml:"log_config"` CompressorPoolConfig *CompressorPoolConfig `yaml:"compressor_pool_config"` HttpEndpointPoolConfig *HttpEndpointPoolConfig `yaml:"http_endpoint_pool_config"` SchedulerConfig *SchedulerConfig `yaml:"scheduler_config"` // Internal metrics configuration. InternalMetricsConfig *InternalMetricsConfig `yaml:"internal_metrics_config"` }
func DefaultVmiConfig ¶
func DefaultVmiConfig() *VmiConfig
func LoadConfig ¶
LoadConfig loads the configuration from the specified YAML file (or buffer, for testing) as follows:
- the vmi_config section is returned as a *VmiConfig structure
- the generators section is loaded into the provided genConfig structure, which is expected to have been primed with default values.
Additionally an error is returned if the configuration could not be loaded or parsed.
Source Files
¶
- available_cpus_linux.go
- clktck_unix.go
- cmdline_utils.go
- compressor_pool.go
- compressor_pool_internal_metrics.go
- config.go
- generator_base.go
- generator_internal_metrics.go
- go_internal_metrics.go
- http_endpoint_pool.go
- http_endpoint_pool_internal_metrics.go
- internal_metrics.go
- logger.go
- metrics_definitions.go
- os_boot_time_unix.go
- os_info.go
- os_info_unix.go
- os_release_linux.go
- process_internal_metrics.go
- process_unix.go
- rate_controller.go
- readfile_buf_pool.go
- runner.go
- scheduler.go
- scheduler_internal_metrics.go
- stdout_metrics_queue.go
- string_utils.go