storage

package
v0.5.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2025 License: MIT Imports: 12 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// PrefixKey is the string constant "__host".
	// NOTE(review): its consumers are not visible here; presumably it names a
	// key or key prefix used by the storage adapters — confirm against callers.
	PrefixKey = "__host"
)

Variables

This section is empty.

Functions

func GetRedisClient

func GetRedisClient() redis.UniversalClient

GetRedisClient returns the Redis client.

func SetRedisClient

func SetRedisClient(c redis.UniversalClient)

SetRedisClient sets the Redis client.

Types

type AdapterCache

// AdapterCache is the method set a cache adapter must provide. Apart from
// Name, String and Initialize, every method takes a context.Context as its
// first argument and reports failure through a returned error.
//
// NOTE(review): no implementation is visible here, so the per-method notes
// describe the signatures; behaviour inferred from method names is marked
// "presumably" and should be confirmed against the concrete adapters.
type AdapterCache interface {
	// Name returns a string; presumably the adapter's name.
	Name() string
	// String returns a string form of the adapter. The signature matches
	// fmt.Stringer.
	String() string
	// Initialize receives a *gorm.DB and returns an error.
	// NOTE(review): what is set up with the database handle is not shown here.
	Initialize(*gorm.DB) error
	// RemoveFromTag takes a tag; presumably it removes the entries associated
	// with that tag — confirm.
	RemoveFromTag(ctx context.Context, tag string) error
	// Get returns a string and an error for key.
	// NOTE(review): the result for a missing key is not visible from here.
	Get(ctx context.Context, key string) (string, error)
	// Set takes key, a value of any type, and an expire duration.
	// NOTE(review): the meaning of a zero expire is not visible from here.
	Set(ctx context.Context, key string, val interface{}, expire time.Duration) error
	// Del takes key; presumably it deletes the entry for key.
	Del(ctx context.Context, key string) error
	// HashGet returns a string and an error for the pair (hk, key);
	// presumably hk identifies a hash and key a field within it — confirm.
	HashGet(ctx context.Context, hk, key string) (string, error)
	// HashDel takes the same (hk, key) pair as HashGet and returns an error.
	HashDel(ctx context.Context, hk, key string) error
	// Increase takes key and returns only an error; no updated value is
	// returned. Presumably it increments a counter — step size not shown.
	Increase(ctx context.Context, key string) error
	// Decrease takes key and returns only an error; no updated value is
	// returned. Presumably it decrements a counter — step size not shown.
	Decrease(ctx context.Context, key string) error
	// Expire takes key and a duration dur; presumably it sets the entry's
	// time-to-live — confirm.
	Expire(ctx context.Context, key string, dur time.Duration) error
}

type AdapterLocker

// AdapterLocker is the method set a lock adapter must provide. Its Lock
// method is expressed in terms of the redislock package's Options and Lock
// types.
type AdapterLocker interface {
	// String returns a string form of the adapter. The signature matches
	// fmt.Stringer.
	String() string
	// Lock takes a key, a ttl and a *redislock.Options, and returns a
	// *redislock.Lock together with an error.
	// NOTE(review): whether a nil options pointer is accepted, and which
	// error signals "lock not obtained", is not visible from here.
	Lock(ctx context.Context, key string, ttl time.Duration, options *redislock.Options) (*redislock.Lock, error)
}

type AdapterQueue

// AdapterQueue is the method set a queue adapter must provide. Append and
// Register are configured through variadic Option values, each of which is
// a func(*Options).
type AdapterQueue interface {
	// String returns a string form of the adapter. The signature matches
	// fmt.Stringer.
	String() string
	// Append takes Option values and returns an error; presumably it
	// publishes the configured message — confirm.
	Append(opts ...Option) error
	// Register takes Option values and returns nothing; presumably it
	// registers the configured consumer — confirm.
	Register(opts ...Option)
	// Run takes a context.Context and returns nothing.
	// NOTE(review): whether Run blocks until the context is done is not
	// visible from here.
	Run(context.Context)
	// Shutdown takes no arguments and returns nothing; presumably it stops
	// the queue — confirm.
	Shutdown()
}

type ConsumerFunc

// ConsumerFunc is a callback that receives a Messager and returns an error.
// NOTE(review): how a non-nil error is handled by the queue adapters (retry,
// requeue, drop) is not visible from here.
type ConsumerFunc func(Messager) error

type Messager

// Messager is the message abstraction passed to a ConsumerFunc. It exposes
// setter/getter pairs for six properties: ID, stream, values, prefix, error
// count and context.
type Messager interface {
	// SetID stores the message ID given as a string.
	SetID(string)
	// SetStream stores the stream name given as a string.
	SetStream(string)
	// SetValues stores the message values as a string-keyed map.
	SetValues(map[string]interface{})
	// GetID returns the message ID as a string.
	GetID() string
	// GetStream returns the stream name as a string.
	GetStream() string
	// GetValues returns the message values as a string-keyed map.
	GetValues() map[string]interface{}
	// GetPrefix returns the prefix as a string.
	GetPrefix() string
	// SetPrefix stores the prefix given as a string.
	SetPrefix(string)
	// SetErrorCount stores count as the error count.
	SetErrorCount(count int)
	// GetErrorCount returns the error count as an int.
	GetErrorCount() int
	// SetContext stores ctx on the message.
	SetContext(ctx context.Context)
	// GetContext returns a context.Context for the message.
	GetContext() context.Context
}

type NSQOptions

// NSQOptions holds NSQ client settings. Most fields carry an `opt` struct tag
// naming an option, optionally with `min`, `max` and `default` tags.
//
// NOTE(review): the code that reads these tags is not visible here;
// presumably GetNSQOptions applies them when building the *nsq.Config it
// returns — confirm.
type NSQOptions struct {
	// DialTimeout is a duration with a tagged default of 1s; presumably the
	// timeout for establishing a connection — confirm.
	DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`

	// LookupdAddr is tagged opt:"-" and is read from the lookupdAddr
	// JSON/YAML key; presumably the nsqlookupd address — confirm.
	LookupdAddr string `opt:"-" json:"lookupdAddr" yaml:"lookupdAddr"`

	// AdminAddr is tagged opt:"-" and is read from the adminAddr JSON/YAML
	// key; presumably the nsqadmin address — confirm.
	AdminAddr string `opt:"-" json:"adminAddr" yaml:"adminAddr"`

	// Deadlines for network reads and writes
	ReadTimeout  time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
	WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`

	// Addresses is the local address to use when dialing an nsqd.
	// NOTE(review): the field is a []string named Addresses, which does not
	// match "the local address" (singular); confirm whether these are in fact
	// the nsqd addresses to connect to and correct this comment accordingly.
	Addresses []string `opt:"addresses" yaml:"addresses" json:"addresses"`

	// Duration between polling lookupd for new producers, and fractional jitter to add to
	// the lookupd poll loop. This helps evenly distribute requests even if multiple consumers
	// restart at the same time
	//
	// NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
	// reconnection attempts
	LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
	LookupdPollJitter   float64       `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`

	// Maximum duration when REQueueing (for doubling of deferred requeue)
	MaxRequeueDelay     time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
	DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`

	// Maximum amount of time to back off when processing fails (0 == no backoff)
	MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
	// Unit of time for calculating consumer backoff
	BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`

	// Maximum number of times this consumer will attempt to process a message before giving up
	MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`

	// Duration to wait for a message from an nsqd when in a state where RDY
	// counts are re-distributed (e.g. max_in_flight < num_producers)
	LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`
	// Duration to wait until redistributing RDY for an nsqd regardless of LowRdyIdleTimeout
	LowRdyTimeout time.Duration `opt:"low_rdy_timeout" min:"1s" max:"5m" default:"30s"`
	// Duration between redistributing max-in-flight to connections
	RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`

	// Identifiers sent to nsqd representing this client
	// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
	ClientID  string `opt:"client_id"` // (defaults: short hostname)
	Hostname  string `opt:"hostname"`
	UserAgent string `opt:"user_agent"`

	// Duration of time between heartbeats. This must be less than ReadTimeout
	// (the tagged defaults are 30s and 60s respectively).
	HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
	// Integer percentage to sample the channel (requires nsqd 0.2.25+)
	SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`

	// Tls optionally points at a TLS value (cert, key and ca strings). It has
	// no `opt` tag and is read from the tls JSON/YAML key.
	Tls *TLS `yaml:"tls" json:"tls"`

	// Compression Settings
	Deflate      bool `opt:"deflate"`
	DeflateLevel int  `opt:"deflate_level" min:"1" max:"9" default:"6"`
	Snappy       bool `opt:"snappy"`

	// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
	OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
	// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
	//
	// WARNING: configuring clients with an extremely low
	// (< 25ms) output_buffer_timeout has a significant effect
	// on nsqd CPU usage (particularly with > 50 clients connected).
	OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`

	// Maximum number of messages to allow in flight (concurrency knob)
	MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`

	// The server-side message timeout for messages delivered to this client
	MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`

	// Secret for nsqd authentication (requires nsqd 0.2.29+)
	AuthSecret string `opt:"auth_secret"`
}

func (NSQOptions) GetNSQOptions

func (e NSQOptions) GetNSQOptions() (*nsq.Config, error)

type Option

// Option is a function that receives a *Options. The package's With*
// functions return Option values, and SetOptions, AdapterQueue.Append and
// AdapterQueue.Register accept them variadically.
type Option func(*Options)

func WithConsumerFunc

func WithConsumerFunc(f ConsumerFunc) Option

func WithGroupID

func WithGroupID(groupID string) Option

func WithKafkaConfig

func WithKafkaConfig(c *sarama.Config) Option

func WithMessage

func WithMessage(message Messager) Option

func WithPartition

func WithPartition(partition int) Option

func WithStrategy

func WithStrategy(f sarama.BalanceStrategy) Option

func WithTopic

func WithTopic(topic string) Option

type Options

// Options is the settings struct that Option functions receive a pointer to.
//
// Each field has a With* function whose parameter matches it by type:
// Topic (WithTopic), GroupID (WithGroupID), F (WithConsumerFunc), Message
// (WithMessage), Partition (WithPartition), PartitionAssignmentStrategy
// (WithStrategy) and KafkaConfig (WithKafkaConfig).
//
// NOTE(review): the With* bodies are not visible here, so the field each one
// sets is inferred from the matching names and types — confirm. Which fields
// each queue adapter actually reads is likewise not shown.
type Options struct {
	Topic                       string
	GroupID                     string
	F                           ConsumerFunc
	Message                     Messager
	Partition                   int
	PartitionAssignmentStrategy sarama.BalanceStrategy
	KafkaConfig                 *sarama.Config
}

func DefaultOptions

func DefaultOptions() *Options

func SetOptions

func SetOptions(opts ...Option) *Options

type RedisConnectOptions

// RedisConnectOptions holds Redis connection settings read from YAML. Its
// GetRedisOptions method returns a *redis.UniversalOptions.
//
// NOTE(review): how each field maps onto redis.UniversalOptions is not
// visible here; presumably fields map to the same-named options — confirm.
//
// NOTE(review): only Addr and TLS carry a json tag; every other field has a
// yaml tag only, so encoding/json would use the Go field names for them —
// confirm this is intentional.
type RedisConnectOptions struct {
	// Addr is a single address, kept for compatibility with the previous
	// configuration.
	//
	// Deprecated: Use Addrs instead.
	Addr             string        `yaml:"addr" json:"addr"`
	Addrs            []string      `yaml:"addrs"`
	ClientName       string        `yaml:"clientName"`
	DB               int           `yaml:"db"`
	Username         string        `yaml:"username"`
	Password         string        `yaml:"password"`
	SentinelUsername string        `yaml:"sentinelUsername"`
	SentinelPassword string        `yaml:"sentinelPassword"`
	MasterName       string        `yaml:"masterName"`
	Protocol         int           `yaml:"protocol"`
	MaxRetries       int           `yaml:"maxRetries"`
	MinRetryBackoff  time.Duration `yaml:"minRetryBackoff"`
	MaxRetryBackoff  time.Duration `yaml:"maxRetryBackoff"`
	DialTimeout      time.Duration `yaml:"dialTimeout"`
	ReadTimeout      time.Duration `yaml:"readTimeout"`
	WriteTimeout     time.Duration `yaml:"writeTimeout"`
	ContextTimeout   bool          `yaml:"contextTimeoutEnabled"`
	PoolFIFO         bool          `yaml:"poolFIFO"`
	PoolSize         int           `yaml:"poolSize"`
	PoolTimeout      time.Duration `yaml:"poolTimeout"`
	MinIdleConns     int           `yaml:"minIdleConns"`
	MaxIdleConns     int           `yaml:"maxIdleConns"`
	MaxActiveConns   int           `yaml:"maxActiveConns"`
	ConnMaxIdleTime  time.Duration `yaml:"connMaxIdleTime"`
	ConnMaxLifetime  time.Duration `yaml:"connMaxLifetime"`
	TLS              *TLS          `yaml:"tls" json:"tls"`
	MaxRedirects     int           `yaml:"maxRedirects"`
	ReadOnly         bool          `yaml:"readOnly"`
	RouteByLatency   bool          `yaml:"routeByLatency"`
	RouteRandomly    bool          `yaml:"routeRandomly"`
	DisableIdentity  bool          `yaml:"disableIdentity"`
	IdentitySuffix   string        `yaml:"identitySuffix"`
	UnstableResp3    bool          `yaml:"unstableResp3"`
}

func (*RedisConnectOptions) GetRedisOptions

func (e *RedisConnectOptions) GetRedisOptions() (opt *redis.UniversalOptions, err error)

type TLS

// TLS groups three string settings — Cert, Key and Ca — read from the cert,
// key and ca YAML/JSON keys. NSQOptions.Tls and RedisConnectOptions.TLS both
// point at a value of this type.
//
// NOTE(review): whether the strings are file paths or inline PEM content is
// not visible from here — confirm against the code that loads them.
type TLS struct {
	Cert string `yaml:"cert" json:"cert"`
	Key  string `yaml:"key" json:"key"`
	Ca   string `yaml:"ca" json:"ca"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL