Documentation
¶
Index ¶
- Constants
- Variables
- func DecodeFalconMessage(ptr unsafe.Pointer, iter *jsoniter.Iterator)
- func NewRateLimitPoster(_limiter *unsafe.Pointer, poster Poster) *rateLimitPoster
- func NewRetryBuffer(dataDir string, name string, size, batch int, max time.Duration, p Poster) *retryBuffer
- func NewRocksDBBuffer(dataDir string, name string, size uint64, p Poster) *rocksDBBuffer
- func NewSimplePoster(name string, writeEndpoint string, location string, timeout time.Duration, ...) *simplePoster
- func Register(name string, constructor RelayConstructor)
- type Config
- type ConfigServer
- type ConsumeProcessor
- type DatabaseConfig
- type DefaultProduceCallback
- type FalconMessage
- type FalconProtocol
- type HTTPConfig
- type HTTPOutputConfig
- type HealthChecker
- type HttpBackend
- type InfluxdbProtocol
- type KafkaBackend
- type KafkaCluster
- func (c *KafkaCluster) AsyncProduce(topic string, value []byte, key []byte, headers []sarama.RecordHeader, ...)
- func (c *KafkaCluster) Close() error
- func (c *KafkaCluster) Name() string
- func (c *KafkaCluster) Open() error
- func (c *KafkaCluster) Produce(context context.Context, topic string, value []byte, key []byte, ...) error
- type KafkaClusterConfig
- type KafkaConsumer
- type KafkaOutputConfig
- type MeasurementKey
- type Poster
- type ProduceCallback
- type Protocol
- type ProtocolFactory
- type Relay
- type RelayConstructor
- type ResponseData
- type Service
- type ServiceConfig
- type UDPConfig
- type UDPOutputConfig
Constants ¶
Variables ¶
View Source
var ErrBufferFull = errors.New("retry buffer full")
View Source
var (
OnShutdownWg sync.WaitGroup
)
Functions ¶
func NewRateLimitPoster ¶
func NewRetryBuffer ¶
func NewRocksDBBuffer ¶
func NewSimplePoster ¶
func Register ¶
func Register(name string, constructor RelayConstructor)
Types ¶
type Config ¶
type Config struct { HTTPRelays []HTTPConfig `toml:"http"` UDPRelays []UDPConfig `toml:"udp"` PprofPort int `toml:"pprof-port"` }
type ConfigServer ¶
type ConsumeProcessor ¶
type DatabaseConfig ¶
type DefaultProduceCallback ¶
type DefaultProduceCallback struct {
// contains filtered or unexported fields
}
func (*DefaultProduceCallback) Catch ¶
func (cb *DefaultProduceCallback) Catch(err *sarama.ProducerError)
func (*DefaultProduceCallback) Done ¶
func (cb *DefaultProduceCallback) Done(message *sarama.ProducerMessage)
type FalconMessage ¶
type FalconProtocol ¶
type FalconProtocol struct {
// contains filtered or unexported fields
}
func (*FalconProtocol) Decode ¶
func (p *FalconProtocol) Decode(msg *sarama.ConsumerMessage) (query, auth string, body []byte, err error)
type HTTPConfig ¶
type HTTPConfig struct { // Name identifies the HTTP relay Name string `toml:"name"` // Addr should be set to the desired listening host:port Addr string `toml:"bind-addr"` ConfigServer ConfigServer `toml:"config-server"` // Set certificate in order to handle HTTPS requests SSLCombinedPem string `toml:"ssl-combined-pem"` // Default retention policy to set for forwarded requests DefaultRetentionPolicy string `toml:"default-retention-policy"` // Outputs is a list of backed servers where writes will be forwarded Outputs []HTTPOutputConfig `toml:"output"` Databases []DatabaseConfig `toml:"database"` DataDir string `toml:"data-dir"` UseKafkaOutput bool `toml:"use-kafka-output"` UseKafkaInput bool `toml:"use-kafka-input"` KafkaGroup string `toml:"kafka-group"` SlowLogThreshold time.Duration `toml:"slow-log-threshold"` QueryMaxRange time.Duration `toml:"query-max-range"` MetricDb string `toml:"metric-db"` }
type HTTPOutputConfig ¶
type HTTPOutputConfig struct { // Name of the backend server Name string `toml:"name" json:"name"` // Location should be set to the URL of the backend server's write endpoint Location string `toml:"location" json:"location"` // WriteEndpoint only for write endpoint WriteEndpoint string `toml:"write-endpoint" json:"write-endpoint"` // QueryEndpoint only for query endpoint QueryEndpoint string `toml:"query-endpoint" json:"query-endpoint"` // HealthCheck HealthCheck string `toml:"health-check" json:"health-check"` // Timeout sets a per-backend timeout for write requests. (Default 10s) // The format used is the same seen in time.ParseDuration Timeout string `toml:"timeout" json:"timeout"` RocksDBBufferSizeMB int `toml:"rocksdb-buffer-size-mb" json:"rocksdb-buffer-size-mb"` // Buffer failed writes up to maximum count. (Default 0, retry/buffering disabled) BufferSizeMB int `toml:"buffer-size-mb" json:"buffer-size-mb"` // Maximum batch size in KB (Default 512) MaxBatchKB int `toml:"max-batch-kb" json:"max-batch-kb"` // Maximum delay between retry attempts. // The format used is the same seen in time.ParseDuration (Default 10s) MaxDelayInterval string `toml:"max-delay-interval" json:"max-delay-interval"` // Skip TLS verification in order to use self signed certificate. // WARNING: It's insecure. Use it only for developing and don't use in production. SkipTLSVerification bool `toml:"skip-tls-verification" json:"skip-tls-verification"` RateLimit float64 `toml:"rate-limit" json:"rate-limit"` RateBurst int `toml:"rate-burst" json:"rate-burst"` Global bool `toml:"global" json:"global"` }
type HealthChecker ¶
type HealthChecker struct {
// contains filtered or unexported fields
}
func NewHealthChecker ¶
func NewHealthChecker(backend *HttpBackend) *HealthChecker
func (*HealthChecker) Run ¶
func (c *HealthChecker) Run()
func (*HealthChecker) Stop ¶
func (c *HealthChecker) Stop()
type HttpBackend ¶
type HttpBackend struct { Poster // contains filtered or unexported fields }
func NewHTTPBackend ¶
func NewHTTPBackend(dataDir string, cfg *HTTPOutputConfig, registry metrics.Registry) (*HttpBackend, error)
func (*HttpBackend) Alive ¶
func (h *HttpBackend) Alive() bool
func (*HttpBackend) Host ¶
func (h *HttpBackend) Host() string
func (*HttpBackend) Name ¶
func (h *HttpBackend) Name() string
func (*HttpBackend) ReverseProxy ¶
func (h *HttpBackend) ReverseProxy() *httputil.ReverseProxy
func (*HttpBackend) UpdateConfig ¶
func (h *HttpBackend) UpdateConfig(cfg HTTPOutputConfig) error
type InfluxdbProtocol ¶
type InfluxdbProtocol struct{}
func (*InfluxdbProtocol) Decode ¶
func (*InfluxdbProtocol) Decode(msg *sarama.ConsumerMessage) (query, auth string, body []byte, err error)
type KafkaBackend ¶
type KafkaBackend struct {
// contains filtered or unexported fields
}
func NewKafkaBackend ¶
func NewKafkaBackend(cfg *KafkaOutputConfig, cluster *KafkaCluster) (*KafkaBackend, error)
func (*KafkaBackend) Catch ¶
func (b *KafkaBackend) Catch(err *sarama.ProducerError)
func (*KafkaBackend) Done ¶
func (b *KafkaBackend) Done(message *sarama.ProducerMessage)
func (*KafkaBackend) Name ¶
func (b *KafkaBackend) Name() string
func (*KafkaBackend) Post ¶
func (b *KafkaBackend) Post(buf []byte, query string, auth string) (*ResponseData, error)
type KafkaCluster ¶
type KafkaCluster struct {
// contains filtered or unexported fields
}
func NewKafkaCluster ¶
func NewKafkaCluster(cluster KafkaClusterConfig) (*KafkaCluster, error)
func (*KafkaCluster) AsyncProduce ¶
func (c *KafkaCluster) AsyncProduce(topic string, value []byte, key []byte, headers []sarama.RecordHeader, callback ProduceCallback)
func (*KafkaCluster) Close ¶
func (c *KafkaCluster) Close() error
func (*KafkaCluster) Name ¶
func (c *KafkaCluster) Name() string
func (*KafkaCluster) Open ¶
func (c *KafkaCluster) Open() error
type KafkaClusterConfig ¶
type KafkaClusterConfig struct { // Name kafka cluster name Name string `toml:"name" json:"name"` // Zookeeper zookeeper [host:port] Zookeeper []string `toml:"zookeeper" json:"zookeeper"` // BootstrapServer kafka server [host:port] BootstrapServer []string `toml:"bootstrap-server" json:"bootstrap-server"` // Version kafka version 1.1.0 Version string `toml:"version" json:"version"` }
type KafkaConsumer ¶
type KafkaConsumer struct { ConsumeProcessor // contains filtered or unexported fields }
func NewKafkaConsumer ¶
func NewKafkaConsumer(group string, cp ConsumeProcessor, config KafkaClusterConfig) (*KafkaConsumer, error)
func (*KafkaConsumer) AddTopic ¶
func (c *KafkaConsumer) AddTopic(o KafkaOutputConfig) error
func (*KafkaConsumer) Close ¶
func (c *KafkaConsumer) Close()
func (*KafkaConsumer) Name ¶
func (c *KafkaConsumer) Name() string
func (*KafkaConsumer) Open ¶
func (c *KafkaConsumer) Open() error
type KafkaOutputConfig ¶
type KafkaOutputConfig struct { // Name name Name string `toml:"name" json:"name"` // Cluster kafka cluster name Cluster string `toml:"cluster" json:"cluster"` // Topic name Topic string `toml:"topic" json:"topic"` // Protocol 默认为空时 influxdb, json, falcon 其他按需增加 Protocol map[string]interface{} `toml:"protocol" json:"protocol"` }
type MeasurementKey ¶
type MeasurementKey string
func NewMeasurementKey ¶
func NewMeasurementKey(name string, tvs ...string) MeasurementKey
func (MeasurementKey) Measurement ¶
func (m MeasurementKey) Measurement() string
func (MeasurementKey) Tags ¶
func (m MeasurementKey) Tags() map[string]string
type ProduceCallback ¶
type ProduceCallback interface { Catch(err *sarama.ProducerError) Done(message *sarama.ProducerMessage) }
type Protocol ¶
type Protocol interface {
Decode(msg *sarama.ConsumerMessage) (query, auth string, body []byte, err error)
}
func NewFalconProtocol ¶
func NewInfluxdbProtocol ¶
type ProtocolFactory ¶
type ResponseData ¶
func (*ResponseData) Write ¶
func (rd *ResponseData) Write(w http.ResponseWriter)
type ServiceConfig ¶
type ServiceConfig struct {
PprofPort int `mapstructure:"pprof-port"`
}
type UDPConfig ¶
type UDPConfig struct { // Name identifies the UDP relay Name string `toml:"name"` // Addr is where the UDP relay will listen for packets Addr string `toml:"bind-addr"` // Precision sets the precision of the timestamps (input and output) Precision string `toml:"precision"` // ReadBuffer sets the socket buffer for incoming connections ReadBuffer int `toml:"read-buffer"` // Outputs is a list of backend servers where writes will be forwarded Outputs []UDPOutputConfig `toml:"output"` }
type UDPOutputConfig ¶
Source Files
¶
Click to show internal directories.
Click to hide internal directories.