Documentation
¶
Index ¶
- Constants
- func GetRedisClient() redis.UniversalClient
- func SetRedisClient(c redis.UniversalClient)
- type AdapterCache
- type AdapterLocker
- type AdapterQueue
- type ConsumerFunc
- type Messager
- type NSQOptions
- type Option
- func WithConsumerFunc(f ConsumerFunc) Option
- func WithGroupID(groupID string) Option
- func WithKafkaConfig(c *sarama.Config) Option
- func WithMessage(message Messager) Option
- func WithPartition(partition int) Option
- func WithStrategy(f sarama.BalanceStrategy) Option
- func WithTopic(topic string) Option
- type Options
- type RedisConnectOptions
- type TLS
Constants ¶
View Source
const (
PrefixKey = "__host"
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AdapterCache ¶
type AdapterCache interface { Name() string String() string Initialize(*gorm.DB) error RemoveFromTag(ctx context.Context, tag string) error Get(ctx context.Context, key string) (string, error) Set(ctx context.Context, key string, val interface{}, expire time.Duration) error Del(ctx context.Context, key string) error HashGet(ctx context.Context, hk, key string) (string, error) HashDel(ctx context.Context, hk, key string) error Increase(ctx context.Context, key string) error Decrease(ctx context.Context, key string) error Expire(ctx context.Context, key string, dur time.Duration) error }
type AdapterLocker ¶
type AdapterQueue ¶
type ConsumerFunc ¶
type Messager ¶
type Messager interface { SetID(string) SetStream(string) SetValues(map[string]interface{}) GetID() string GetStream() string GetValues() map[string]interface{} GetPrefix() string SetPrefix(string) SetErrorCount(count int) GetErrorCount() int SetContext(ctx context.Context) GetContext() context.Context }
type NSQOptions ¶
type NSQOptions struct { DialTimeout time.Duration `opt:"dial_timeout" default:"1s"` LookupdAddr string `opt:"-" json:"lookupdAddr" yaml:"lookupdAddr"` AdminAddr string `opt:"-" json:"adminAddr" yaml:"adminAddr"` // Deadlines for network reads and writes ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"` WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"` // Addresses is the local address to use when dialing an nsqd. Addresses []string `opt:"addresses" yaml:"addresses" json:"addresses"` // Duration between polling lookupd for new producers, and fractional jitter to add to // the lookupd pool loop. this helps evenly distribute requests even if multiple consumers // restart at the same time // // NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between // reconnection attempts LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"` LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"` // Maximum duration when REQueueing (for doubling of deferred requeue) MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"` DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"` // Maximum amount of time to backoff when processing fails 0 == no backoff MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"` // Unit of time for calculating consumer backoff BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"` // Maximum number of times this consumer will attempt to process a message before giving up MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"` // Duration to wait for a message from an nsqd when in a state where RDY // counts are re-distributed (e.g. max_in_flight < num_producers) LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"` // Duration to wait until redistributing RDY for an nsqd regardless of LowRdyIdleTimeout LowRdyTimeout time.Duration `opt:"low_rdy_timeout" min:"1s" max:"5m" default:"30s"` // Duration between redistributing max-in-flight to connections RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"` // Identifiers sent to nsqd representing this client // UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>") ClientID string `opt:"client_id"` // (defaults: short hostname) Hostname string `opt:"hostname"` UserAgent string `opt:"user_agent"` // Duration of time between heartbeats. This must be less than ReadTimeout HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"` // Integer percentage to sample the channel (requires nsqd 0.2.25+) SampleRate int32 `opt:"sample_rate" min:"0" max:"99"` Tls *TLS `yaml:"tls" json:"tls"` // Compression Settings Deflate bool `opt:"deflate"` DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"` Snappy bool `opt:"snappy"` // Size of the buffer (in bytes) used by nsqd for buffering writes to this connection OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"` // Timeout used by nsqd before flushing buffered writes (set to 0 to disable). // // WARNING: configuring clients with an extremely low // (< 25ms) output_buffer_timeout has a significant effect // on nsqd CPU usage (particularly with > 50 clients connected). OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"` // Maximum number of messages to allow in flight (concurrency knob) MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"` // The server-side message timeout for messages delivered to this client MsgTimeout time.Duration `opt:"msg_timeout" min:"0"` // secret for nsqd authentication (requires nsqd 0.2.29+) AuthSecret string `opt:"auth_secret"` }
func (NSQOptions) GetNSQOptions ¶
func (e NSQOptions) GetNSQOptions() (*nsq.Config, error)
type Option ¶
type Option func(*Options)
func WithConsumerFunc ¶
func WithConsumerFunc(f ConsumerFunc) Option
func WithGroupID ¶
func WithKafkaConfig ¶
func WithMessage ¶
func WithPartition ¶
func WithStrategy ¶
func WithStrategy(f sarama.BalanceStrategy) Option
type Options ¶
type Options struct { Topic string GroupID string F ConsumerFunc Message Messager Partition int PartitionAssignmentStrategy sarama.BalanceStrategy KafkaConfig *sarama.Config }
func DefaultOptions ¶
func DefaultOptions() *Options
func SetOptions ¶
type RedisConnectOptions ¶
type RedisConnectOptions struct { // Addr In order to be compatible with the previous configuration // Deprecated: Use Addrs instead. Addr string `yaml:"addr" json:"addr"` Addrs []string `yaml:"addrs"` ClientName string `yaml:"clientName"` DB int `yaml:"db"` Username string `yaml:"username"` Password string `yaml:"password"` SentinelUsername string `yaml:"sentinelUsername"` SentinelPassword string `yaml:"sentinelPassword"` MasterName string `yaml:"masterName"` Protocol int `yaml:"protocol"` MaxRetries int `yaml:"maxRetries"` MinRetryBackoff time.Duration `yaml:"minRetryBackoff"` MaxRetryBackoff time.Duration `yaml:"maxRetryBackoff"` DialTimeout time.Duration `yaml:"dialTimeout"` ReadTimeout time.Duration `yaml:"readTimeout"` WriteTimeout time.Duration `yaml:"writeTimeout"` ContextTimeout bool `yaml:"contextTimeoutEnabled"` PoolFIFO bool `yaml:"poolFIFO"` PoolSize int `yaml:"poolSize"` PoolTimeout time.Duration `yaml:"poolTimeout"` MinIdleConns int `yaml:"minIdleConns"` MaxIdleConns int `yaml:"maxIdleConns"` MaxActiveConns int `yaml:"maxActiveConns"` ConnMaxIdleTime time.Duration `yaml:"connMaxIdleTime"` ConnMaxLifetime time.Duration `yaml:"connMaxLifetime"` TLS *TLS `yaml:"tls" json:"tls"` MaxRedirects int `yaml:"maxRedirects"` ReadOnly bool `yaml:"readOnly"` RouteByLatency bool `yaml:"routeByLatency"` RouteRandomly bool `yaml:"routeRandomly"` DisableIdentity bool `yaml:"disableIdentity"` IdentitySuffix string `yaml:"identitySuffix"` UnstableResp3 bool `yaml:"unstableResp3"` }
func (*RedisConnectOptions) GetRedisOptions ¶
func (e *RedisConnectOptions) GetRedisOptions() (opt *redis.UniversalOptions, err error)
Click to show internal directories.
Click to hide internal directories.