relay

package
v0.0.0-...-1658f41
Published: Jun 9, 2019 License: MIT Imports: 33 Imported by: 0

Documentation

Index

Constants

const (
	DefaultHTTPTimeout      = 10 * time.Second
	DefaultMaxDelayInterval = 10 * time.Second
	DefaultBatchSizeKB      = 512
	KB                      = 1024
	MB                      = 1024 * KB
)

Variables

var ErrBufferFull = errors.New("retry buffer full")
var (
	OnShutdownWg sync.WaitGroup
)

Functions

func DecodeFalconMessage

func DecodeFalconMessage(ptr unsafe.Pointer, iter *jsoniter.Iterator)

func NewRateLimitPoster

func NewRateLimitPoster(_limiter *unsafe.Pointer, poster Poster) *rateLimitPoster

func NewRetryBuffer

func NewRetryBuffer(dataDir string, name string, size, batch int, max time.Duration, p Poster) *retryBuffer

func NewRocksDBBuffer

func NewRocksDBBuffer(dataDir string, name string, size uint64, p Poster) *rocksDBBuffer

func NewSimplePoster

func NewSimplePoster(name string, writeEndpoint string, location string, timeout time.Duration, skipTLSVerification bool, registry metrics.Registry) *simplePoster
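
A minimal sketch of chaining the two constructors above: NewSimplePoster builds a direct HTTP poster for one backend and NewRetryBuffer wraps it so that failed writes are buffered and retried. The metrics import is assumed to be github.com/rcrowley/go-metrics, the size and batch arguments are assumed to be byte counts, and returning the result as a Poster assumes the unexported retryBuffer type satisfies that interface; the concrete values are placeholders.

package relay

import (
	metrics "github.com/rcrowley/go-metrics"
)

func newBufferedBackendSketch(dataDir string) Poster {
	// Direct poster for the backend's write endpoint.
	p := NewSimplePoster(
		"influxdb-1",            // backend name
		"/write",                // write endpoint (assumed to be a path)
		"http://127.0.0.1:8086", // backend location (assumed to be a base URL)
		DefaultHTTPTimeout,      // per-request timeout, 10s
		false,                   // keep TLS verification enabled
		metrics.NewRegistry(),
	)

	// Buffer failed writes (up to 64 MB here) and retry them in batches of at
	// most DefaultBatchSizeKB, waiting no longer than DefaultMaxDelayInterval
	// between attempts.
	return NewRetryBuffer(dataDir, "influxdb-1", 64*MB, DefaultBatchSizeKB*KB, DefaultMaxDelayInterval, p)
}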

func Register

func Register(name string, constructor RelayConstructor)

Types

type Config

type Config struct {
	HTTPRelays []HTTPConfig `toml:"http"`
	UDPRelays  []UDPConfig  `toml:"udp"`
	PprofPort  int          `toml:"pprof-port"`
}

type ConfigServer

type ConfigServer struct {
	Name           string `toml:"name"`
	BindClient     string `toml:"bind-client"`
	BindPeer       string `toml:"bind-peer"`
	Dir            string `toml:"dir"`
	InitialCluster string `toml:"initial-cluster"`
	ClusterState   string `toml:"cluster-state"`
}

type ConsumeProcessor

type ConsumeProcessor interface {
	Process(msg *sarama.ConsumerMessage, protocol Protocol) error
	ShouldRetry(err error, fail int) bool
}
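
A minimal sketch of a ConsumeProcessor that decodes each Kafka message with the supplied Protocol and forwards the decoded body to a Poster. The retry policy shown (give up after three failures, or immediately once the retry buffer is full) is an illustrative choice, not the package's documented behaviour.

package relay

import "github.com/Shopify/sarama"

type postingProcessor struct {
	out Poster // downstream sink, e.g. an HttpBackend
}

func (pp *postingProcessor) Process(msg *sarama.ConsumerMessage, protocol Protocol) error {
	query, auth, body, err := protocol.Decode(msg)
	if err != nil {
		return err
	}
	_, err = pp.out.Post(body, query, auth)
	return err
}

func (pp *postingProcessor) ShouldRetry(err error, fail int) bool {
	if err == ErrBufferFull {
		return false // dropping here is an illustrative choice
	}
	return fail < 3
}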

type DatabaseConfig

type DatabaseConfig struct {
	Name    string   `toml:"name" json:"name"`
	Owner   string   `toml:"owner" json:"owner"`
	Outputs []string `toml:"output" json:"output"`
}

type DefaultProduceCallback

type DefaultProduceCallback struct {
	// contains filtered or unexported fields
}

func (*DefaultProduceCallback) Catch

func (cb *DefaultProduceCallback) Catch(err *sarama.ProducerError)

func (*DefaultProduceCallback) Done

func (cb *DefaultProduceCallback) Done(message *sarama.ProducerMessage)

type FalconMessage

type FalconMessage struct {
	Step      int64             `json:"step"`
	Value     float64           `json:"value"`
	Tags      map[string]string `json:"tags"`
	Metric    string            `json:"metric"`
	Timestamp int64             `json:"timestamp"`
	OtherTags map[string]string `json:"other_tags"`
}
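
A minimal sketch of plugging DecodeFalconMessage into jsoniter as the custom decoder for FalconMessage; the function matches jsoniter's DecoderFunc signature, and the "relay.FalconMessage" type key is an assumption based on this package's name.

package relay

import (
	jsoniter "github.com/json-iterator/go"
)

func init() {
	// Route every decode of a relay.FalconMessage through DecodeFalconMessage.
	jsoniter.RegisterTypeDecoderFunc("relay.FalconMessage", DecodeFalconMessage)
}

// parseFalcon decodes a single Falcon-style JSON payload.
func parseFalcon(data []byte) (FalconMessage, error) {
	var m FalconMessage
	err := jsoniter.Unmarshal(data, &m)
	return m, err
}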

type FalconProtocol

type FalconProtocol struct {
	// contains filtered or unexported fields
}

func (*FalconProtocol) Decode

func (p *FalconProtocol) Decode(msg *sarama.ConsumerMessage) (query, auth string, body []byte, err error)

type HTTPConfig

type HTTPConfig struct {
	// Name identifies the HTTP relay
	Name string `toml:"name"`

	// Addr should be set to the desired listening host:port
	Addr string `toml:"bind-addr"`

	ConfigServer ConfigServer `toml:"config-server"`

	// Set certificate in order to handle HTTPS requests
	SSLCombinedPem string `toml:"ssl-combined-pem"`
	// Default retention policy to set for forwarded requests
	DefaultRetentionPolicy string `toml:"default-retention-policy"`

	// Outputs is a list of backend servers where writes will be forwarded
	Outputs []HTTPOutputConfig `toml:"output"`

	Databases []DatabaseConfig `toml:"database"`

	DataDir string `toml:"data-dir"`

	UseKafkaOutput   bool          `toml:"use-kafka-output"`
	UseKafkaInput    bool          `toml:"use-kafka-input"`
	KafkaGroup       string        `toml:"kafka-group"`
	SlowLogThreshold time.Duration `toml:"slow-log-threshold"`
	QueryMaxRange    time.Duration `toml:"query-max-range"`
	MetricDb         string        `toml:"metric-db"`
}

type HTTPOutputConfig

type HTTPOutputConfig struct {
	// Name of the backend server
	Name string `toml:"name" json:"name"`

	// Location should be set to the URL of the backend server's write endpoint
	Location string `toml:"location" json:"location"`

	// WriteEndpoint is the path used for write requests
	WriteEndpoint string `toml:"write-endpoint" json:"write-endpoint"`

	// QueryEndpoint is the path used for query requests
	QueryEndpoint string `toml:"query-endpoint" json:"query-endpoint"`

	// HealthCheck is the path used to check whether the backend is alive
	HealthCheck string `toml:"health-check" json:"health-check"`

	// Timeout sets a per-backend timeout for write requests. (Default 10s)
	// The format is the same as accepted by time.ParseDuration
	Timeout string `toml:"timeout" json:"timeout"`

	RocksDBBufferSizeMB int `toml:"rocksdb-buffer-size-mb" json:"rocksdb-buffer-size-mb"`

	// Buffer failed writes up to this many megabytes. (Default 0: retry buffering disabled)
	BufferSizeMB int `toml:"buffer-size-mb" json:"buffer-size-mb"`

	// Maximum batch size in KB (Default 512)
	MaxBatchKB int `toml:"max-batch-kb" json:"max-batch-kb"`

	// Maximum delay between retry attempts. (Default 10s)
	// The format is the same as accepted by time.ParseDuration
	MaxDelayInterval string `toml:"max-delay-interval" json:"max-delay-interval"`

	// Skip TLS verification in order to use a self-signed certificate.
	// WARNING: this is insecure; use it only for development, never in production.
	SkipTLSVerification bool `toml:"skip-tls-verification" json:"skip-tls-verification"`

	RateLimit float64 `toml:"rate-limit" json:"rate-limit"`

	RateBurst int `toml:"rate-burst" json:"rate-burst"`

	Global bool `toml:"global" json:"global"`
}
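
A minimal sketch of the configuration shape expressed as Go literals instead of TOML; every field used below maps to a toml tag documented above, and the concrete values are placeholders.

package relay

import "time"

func exampleConfig() Config {
	return Config{
		PprofPort: 6060,
		HTTPRelays: []HTTPConfig{{
			Name:                   "example-http",
			Addr:                   "0.0.0.0:9096",
			DefaultRetentionPolicy: "autogen",
			DataDir:                "/var/lib/relay",
			SlowLogThreshold:       2 * time.Second,
			Outputs: []HTTPOutputConfig{{
				Name:          "influxdb-1",
				Location:      "http://127.0.0.1:8086",
				WriteEndpoint: "/write",
				QueryEndpoint: "/query",
				Timeout:       "10s",
				MaxBatchKB:    DefaultBatchSizeKB,
			}},
			Databases: []DatabaseConfig{{
				Name:    "telegraf",
				Owner:   "ops",
				Outputs: []string{"influxdb-1"},
			}},
		}},
	}
}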

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

func NewHealthChecker

func NewHealthChecker(backend *HttpBackend) *HealthChecker

func (*HealthChecker) Run

func (c *HealthChecker) Run()

func (*HealthChecker) Stop

func (c *HealthChecker) Stop()

type HttpBackend

type HttpBackend struct {
	Poster
	// contains filtered or unexported fields
}

func NewHTTPBackend

func NewHTTPBackend(dataDir string, cfg *HTTPOutputConfig, registry metrics.Registry) (*HttpBackend, error)

func (*HttpBackend) Alive

func (h *HttpBackend) Alive() bool

func (*HttpBackend) Host

func (h *HttpBackend) Host() string

func (*HttpBackend) Name

func (h *HttpBackend) Name() string

func (*HttpBackend) ReverseProxy

func (h *HttpBackend) ReverseProxy() *httputil.ReverseProxy

func (*HttpBackend) UpdateConfig

func (h *HttpBackend) UpdateConfig(cfg HTTPOutputConfig) error
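
A minimal sketch of building an HttpBackend, keeping its liveness checked in the background, and proxying requests through it only while it is alive. The metrics import is assumed to be github.com/rcrowley/go-metrics, running the checker in a goroutine assumes Run blocks, and the config values are placeholders.

package relay

import (
	"net/http"

	metrics "github.com/rcrowley/go-metrics"
)

func backendProxySketch(dataDir string) (http.Handler, error) {
	backend, err := NewHTTPBackend(dataDir, &HTTPOutputConfig{
		Name:          "influxdb-1",
		Location:      "http://127.0.0.1:8086",
		WriteEndpoint: "/write",
		QueryEndpoint: "/query",
		HealthCheck:   "/ping",
		Timeout:       "10s",
	}, metrics.NewRegistry())
	if err != nil {
		return nil, err
	}

	// Keep Alive() up to date in the background; Stop() would end the loop.
	checker := NewHealthChecker(backend)
	go checker.Run()

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !backend.Alive() {
			http.Error(w, "backend down", http.StatusServiceUnavailable)
			return
		}
		backend.ReverseProxy().ServeHTTP(w, r)
	}), nil
}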

type InfluxdbProtocol

type InfluxdbProtocol struct{}

func (*InfluxdbProtocol) Decode

func (*InfluxdbProtocol) Decode(msg *sarama.ConsumerMessage) (query, auth string, body []byte, err error)

type KafkaBackend

type KafkaBackend struct {
	// contains filtered or unexported fields
}

func NewKafkaBackend

func NewKafkaBackend(cfg *KafkaOutputConfig, cluster *KafkaCluster) (*KafkaBackend, error)

func (*KafkaBackend) Catch

func (b *KafkaBackend) Catch(err *sarama.ProducerError)

func (*KafkaBackend) Done

func (b *KafkaBackend) Done(message *sarama.ProducerMessage)

func (*KafkaBackend) Name

func (b *KafkaBackend) Name() string

func (*KafkaBackend) Post

func (b *KafkaBackend) Post(buf []byte, query string, auth string) (*ResponseData, error)

type KafkaCluster

type KafkaCluster struct {
	// contains filtered or unexported fields
}

func NewKafkaCluster

func NewKafkaCluster(cluster KafkaClusterConfig) (*KafkaCluster, error)

func (*KafkaCluster) AsyncProduce

func (c *KafkaCluster) AsyncProduce(topic string, value []byte, key []byte, headers []sarama.RecordHeader, callback ProduceCallback)

func (*KafkaCluster) Close

func (c *KafkaCluster) Close() error

func (*KafkaCluster) Name

func (c *KafkaCluster) Name() string

func (*KafkaCluster) Open

func (c *KafkaCluster) Open() error

func (*KafkaCluster) Produce

func (c *KafkaCluster) Produce(context context.Context, topic string, value []byte, key []byte, headers []sarama.RecordHeader) error

type KafkaClusterConfig

type KafkaClusterConfig struct {
	// Name is the Kafka cluster name
	Name string `toml:"name" json:"name"`

	// Zookeeper is the list of ZooKeeper addresses (host:port)
	Zookeeper []string `toml:"zookeeper" json:"zookeeper"`

	// BootstrapServer is the list of Kafka broker addresses (host:port)
	BootstrapServer []string `toml:"bootstrap-server" json:"bootstrap-server"`

	// Version is the Kafka protocol version, e.g. 1.1.0
	Version string `toml:"version" json:"version"`
}
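
A minimal sketch of opening a KafkaCluster from a KafkaClusterConfig and producing one message synchronously; the broker addresses, topic and record-header key are placeholders.

package relay

import (
	"context"

	"github.com/Shopify/sarama"
)

func produceOnceSketch(ctx context.Context, body []byte) error {
	cluster, err := NewKafkaCluster(KafkaClusterConfig{
		Name:            "metrics",
		BootstrapServer: []string{"kafka-1:9092", "kafka-2:9092"},
		Version:         "1.1.0",
	})
	if err != nil {
		return err
	}
	if err := cluster.Open(); err != nil {
		return err
	}
	defer cluster.Close()

	headers := []sarama.RecordHeader{{
		Key:   []byte("db"), // hypothetical header; key names are not documented here
		Value: []byte("telegraf"),
	}}
	return cluster.Produce(ctx, "relay-writes", body, nil, headers)
}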

type KafkaConsumer

type KafkaConsumer struct {
	ConsumeProcessor
	// contains filtered or unexported fields
}

func NewKafkaConsumer

func NewKafkaConsumer(group string, cp ConsumeProcessor, config KafkaClusterConfig) (*KafkaConsumer, error)

func (*KafkaConsumer) AddTopic

func (c *KafkaConsumer) AddTopic(o KafkaOutputConfig) error

func (*KafkaConsumer) Close

func (c *KafkaConsumer) Close()

func (*KafkaConsumer) Name

func (c *KafkaConsumer) Name() string

func (*KafkaConsumer) Open

func (c *KafkaConsumer) Open() error

type KafkaOutputConfig

type KafkaOutputConfig struct {
	// Name identifies the Kafka output
	Name string `toml:"name" json:"name"`

	// Cluster is the name of the Kafka cluster to write to
	Cluster string `toml:"cluster" json:"cluster"`

	// Topic is the Kafka topic name
	Topic string `toml:"topic" json:"topic"`

	// Protocol selects the message format: influxdb when empty (the default),
	// or json, falcon; others can be added as needed
	Protocol map[string]interface{} `toml:"protocol" json:"protocol"`
}

type MeasurementKey

type MeasurementKey string

func NewMeasurementKey

func NewMeasurementKey(name string, tvs ...string) MeasurementKey

func (MeasurementKey) Measurement

func (m MeasurementKey) Measurement() string

func (MeasurementKey) Tags

func (m MeasurementKey) Tags() map[string]string
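
A minimal sketch of MeasurementKey; the assumption (suggested by the variadic tvs parameter and the Tags accessor) is that tvs alternates tag keys and values.

package relay

import "fmt"

func measurementKeySketch() {
	key := NewMeasurementKey("cpu", "host", "web-01", "region", "us-east")
	fmt.Println(key.Measurement()) // cpu
	fmt.Println(key.Tags())        // map[host:web-01 region:us-east] (assumed pairing)
}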

type Poster

type Poster interface {
	Post([]byte, string, string) (*ResponseData, error)
}
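
A minimal sketch of a custom Poster: a dry-run implementation that accepts every write without forwarding it, which can be useful for shadow testing. The 204 status mirrors what InfluxDB returns for successful writes.

package relay

import "net/http"

type dryRunPoster struct{}

func (dryRunPoster) Post(body []byte, query string, auth string) (*ResponseData, error) {
	// Pretend the write succeeded; a real Poster would forward body, query and
	// auth to a backend, as NewSimplePoster does.
	return &ResponseData{
		ContentType: "application/json",
		StatusCode:  http.StatusNoContent,
	}, nil
}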

type ProduceCallback

type ProduceCallback interface {
	Catch(err *sarama.ProducerError)
	Done(message *sarama.ProducerMessage)
}

type Protocol

type Protocol interface {
	Decode(msg *sarama.ConsumerMessage) (query, auth string, body []byte, err error)
}

func NewFalconProtocol

func NewFalconProtocol(cfgMap map[string]interface{}) (Protocol, error)

func NewInfluxdbProtocol

func NewInfluxdbProtocol(_ map[string]interface{}) (Protocol, error)

type ProtocolFactory

type ProtocolFactory = func(map[string]interface{}) (Protocol, error)
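
A minimal sketch of a custom Protocol plus a matching ProtocolFactory: it treats the Kafka message value as the request body and reads query and auth from record headers. The header key names are hypothetical, not documented by this package.

package relay

import "github.com/Shopify/sarama"

type headerProtocol struct{}

func (headerProtocol) Decode(msg *sarama.ConsumerMessage) (query, auth string, body []byte, err error) {
	for _, h := range msg.Headers {
		switch string(h.Key) {
		case "query":
			query = string(h.Value)
		case "auth":
			auth = string(h.Value)
		}
	}
	return query, auth, msg.Value, nil
}

// NewHeaderProtocol matches the ProtocolFactory signature, like
// NewFalconProtocol and NewInfluxdbProtocol above.
func NewHeaderProtocol(_ map[string]interface{}) (Protocol, error) {
	return headerProtocol{}, nil
}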

type Relay

type Relay interface {
	Name() string
	Run() error
	Stop() error
}

type RelayConstructor

type RelayConstructor = func(*viper.Viper) (Relay, error)
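
A minimal sketch of registering a custom Relay: the constructor matches RelayConstructor and is registered under a name that Register presumably uses for lookup when a Service is built. The no-op relay body is purely illustrative.

package relay

import "github.com/spf13/viper"

type noopRelay struct{ name string }

func (r *noopRelay) Name() string { return r.name }
func (r *noopRelay) Run() error   { return nil }
func (r *noopRelay) Stop() error  { return nil }

func newNoopRelay(v *viper.Viper) (Relay, error) {
	return &noopRelay{name: v.GetString("name")}, nil
}

func init() {
	// Register the constructor under the "noop" relay kind.
	Register("noop", newNoopRelay)
}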

type ResponseData

type ResponseData struct {
	ContentType     string
	ContentEncoding string
	StatusCode      int
	Body            []byte
}

func (*ResponseData) Write

func (rd *ResponseData) Write(w http.ResponseWriter)
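
A minimal sketch of relaying a backend response to an HTTP client via ResponseData.Write; the Poster here is any implementation of the interface above, and the assumption is that Write copies the status, headers and body to the ResponseWriter.

package relay

import (
	"io/ioutil"
	"net/http"
)

func writeHandlerSketch(p Poster) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		rd, err := p.Post(body, r.URL.RawQuery, r.Header.Get("Authorization"))
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadGateway)
			return
		}
		rd.Write(w) // forwards the backend response to the client (assumed behaviour)
	}
}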

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(config *viper.Viper) (*Service, error)
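
A minimal sketch, written as in-package code because the module's import path is elided on this page, of building and running a Service from a viper-loaded configuration file; the file name is a placeholder.

package relay

import (
	"log"

	"github.com/spf13/viper"
)

func runServiceSketch() {
	v := viper.New()
	v.SetConfigFile("relay.toml")
	if err := v.ReadInConfig(); err != nil {
		log.Fatal(err)
	}

	svc, err := NewService(v)
	if err != nil {
		log.Fatal(err)
	}
	svc.Run() // Run presumably blocks until Stop is called
}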

func (*Service) Run

func (s *Service) Run()

func (*Service) Stop

func (s *Service) Stop()

type ServiceConfig

type ServiceConfig struct {
	PprofPort int `mapstructure:"pprof-port"`
}

type UDPConfig

type UDPConfig struct {
	// Name identifies the UDP relay
	Name string `toml:"name"`

	// Addr is where the UDP relay will listen for packets
	Addr string `toml:"bind-addr"`

	// Precision sets the precision of the timestamps (input and output)
	Precision string `toml:"precision"`

	// ReadBuffer sets the socket buffer for incoming connections
	ReadBuffer int `toml:"read-buffer"`

	// Outputs is a list of backend servers where writes will be forwarded
	Outputs []UDPOutputConfig `toml:"output"`
}

type UDPOutputConfig

type UDPOutputConfig struct {
	// Name identifies the UDP backend
	Name string `toml:"name"`

	// Location should be set to the host:port of the backend server
	Location string `toml:"location"`

	// MTU sets the maximum output payload size; the default is 1024
	MTU int `toml:"mtu"`
}
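
A minimal sketch of a UDP relay configuration expressed as Go literals; the names and addresses are placeholders.

package relay

func exampleUDPConfig() UDPConfig {
	return UDPConfig{
		Name:       "example-udp",
		Addr:       "0.0.0.0:9097",
		Precision:  "ns",
		ReadBuffer: 4 * MB,
		Outputs: []UDPOutputConfig{{
			Name:     "influxdb-udp-1",
			Location: "127.0.0.1:8089",
			MTU:      1024,
		}},
	}
}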

