Documentation
¶
Overview ¶
Package nsq is the official Go package for NSQ (http://nsq.io/).
It provides high-level Consumer and Producer types as well as low-level functions to communicate over the NSQ protocol.
Index ¶
- Constants
- Variables
- func FindString(src []string, f string) int
- func IsFailedOnNotLeader(err error) bool
- func IsFailedOnNotLeaderBytes(err []byte) bool
- func IsFailedOnNotWritable(err error) bool
- func IsFailedOnNotWritableBytes(err []byte) bool
- func IsTopicNotExist(err error) bool
- func IsTopicNotExistBytes(err []byte) bool
- func IsValidChannelName(name string) bool
- func IsValidTopicName(name string) bool
- func ReadResponse(r io.Reader) ([]byte, error)
- func ReadUnpackedResponse(r io.Reader) (int32, []byte, error)
- func UnpackResponse(response []byte) (int32, []byte, error)
- type AddrPartInfo
- type AuthResponse
- type BackoffStrategy
- type CmdFuncT
- type Command
- func Auth(secret string) (*Command, error)
- func CreateTopic(topic string, partition int) *Command
- func CreateTopicWithExt(topic string, partition int) *Command
- func Finish(id MessageID) *Command
- func Identify(js map[string]interface{}) (*Command, error)
- func MultiPublish(topic string, bodies [][]byte) (*Command, error)
- func MultiPublishTrace(topic string, part string, traceIDList []uint64, bodies [][]byte) (*Command, error)
- func MultiPublishV2(topic string, bodies []*bytes.Buffer) (*Command, error)
- func MultiPublishWithJsonExt(topic string, part string, extList []*MsgExt, bodies [][]byte) (*Command, error)
- func MultiPublishWithPart(topic string, part string, bodies [][]byte) (*Command, error)
- func MultiPublishWithPartV2(topic string, part string, bodies []*bytes.Buffer) (*Command, error)
- func Nop() *Command
- func Ping() *Command
- func Publish(topic string, body []byte) *Command
- func PublishTrace(topic string, part string, traceID uint64, body []byte) (*Command, error)
- func PublishWithJsonExt(topic string, part string, body []byte, jsonExt []byte) (*Command, error)
- func PublishWithPart(topic string, part string, body []byte) *Command
- func Ready(count int) *Command
- func Register(topic string, partition string, channel string) *Command
- func Requeue(id MessageID, delay time.Duration) *Command
- func StartClose() *Command
- func Subscribe(topic string, channel string) *Command
- func SubscribeAdvanced(topic string, channel string, part string, consumeStart ConsumeOffset) *Command
- func SubscribeAndTrace(topic string, channel string) *Command
- func SubscribeOrdered(topic string, channel string, part string) *Command
- func SubscribeWithPart(topic string, channel string, part string) *Command
- func SubscribeWithPartAndTrace(topic string, channel string, part string) *Command
- func Touch(id MessageID) *Command
- func UnRegister(topic string, partition string, channel string) *Command
- type Config
- type ConfigFlag
- type Conn
- func (c *Conn) Close() error
- func (c *Conn) Connect() (*IdentifyResponse, error)
- func (c *Conn) Flush() error
- func (c *Conn) GetConnUID() string
- func (c *Conn) IsClosing() bool
- func (c *Conn) LastMessageTime() time.Time
- func (c *Conn) LastRDY() int64
- func (c *Conn) MaxRDY() int64
- func (c *Conn) RDY() int64
- func (c *Conn) Read(p []byte) (int, error)
- func (c *Conn) RemoteAddr() net.Addr
- func (c *Conn) SetLogger(l logger, lvl LogLevel, format string)
- func (c *Conn) SetRDY(rdy int64)
- func (c *Conn) String() string
- func (c *Conn) Write(p []byte) (int, error)
- func (c *Conn) WriteCommand(cmd *Command) error
- type ConnDelegate
- type ConsumeOffset
- type Consumer
- func (r *Consumer) AddConcurrentHandlerFuncs(handler HandlerFunc, failHandler FailHandlerFunc, concurrency int)
- func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int)
- func (r *Consumer) AddEtcdServiceAddr(address []string, cluster string, key string)
- func (r *Consumer) AddHandler(handler Handler)
- func (r *Consumer) ChangeMaxInFlight(maxInFlight int)
- func (r *Consumer) ConnectToNSQD(addr string, part int) error
- func (r *Consumer) ConnectToNSQDs(addresses []AddrPartInfo) error
- func (r *Consumer) ConnectToNSQLookupd(addr string) error
- func (r *Consumer) ConnectToNSQLookupds(addresses []string) error
- func (r *Consumer) ConnectToSeeds() error
- func (r *Consumer) DisconnectFromNSQD(addr string, part string) error
- func (r *Consumer) DisconnectFromNSQLookupd(addr string) error
- func (r *Consumer) IsConsumeExt() bool
- func (r *Consumer) IsStarved() bool
- func (r *Consumer) SetBehaviorDelegate(cb interface{})
- func (r *Consumer) SetConsumeExt(topicExt bool) bool
- func (r *Consumer) SetConsumeOffset(partition int, offset ConsumeOffset) error
- func (r *Consumer) SetLogger(l logger, lvl LogLevel)
- func (r *Consumer) Stats() *ConsumerStats
- func (r *Consumer) Stop()
- type ConsumerStats
- type DiscoveryFilter
- type ErrIdentify
- type ErrProtocol
- type ExponentialStrategy
- type FailHandlerFunc
- type FailedMessageLogger
- type FullJitterStrategy
- type FullMessageID
- type Handler
- type HandlerFunc
- type IdentifyResponse
- type LogLevel
- type Message
- func (m *Message) DisableAutoResponse()
- func (m *Message) Finish()
- func (m *Message) GetFullMsgID() FullMessageID
- func (m *Message) GetJsonExt() (*MsgExt, error)
- func (m *Message) GetTraceID() uint64
- func (m *Message) HasResponded() bool
- func (m *Message) IsAutoResponseDisabled() bool
- func (m *Message) Requeue(delay time.Duration)
- func (m *Message) RequeueWithoutBackoff(delay time.Duration)
- func (m *Message) Touch()
- func (m *Message) WriteTo(w io.Writer) (int64, error)
- type MessageDelegate
- type MessageID
- type MsgExt
- type NewMessageID
- type NsqLookupdNodeInfo
- type Producer
- func (w *Producer) MultiPublish(topic string, body [][]byte) error
- func (w *Producer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *ProducerTransaction, ...) error
- func (w *Producer) Ping() error
- func (w *Producer) Publish(topic string, body []byte) error
- func (w *Producer) PublishAsync(topic string, body []byte, doneChan chan *ProducerTransaction, ...) error
- func (w *Producer) PublishWithPartitionIdAsync(topic string, partition string, body []byte, ext *MsgExt, ...) error
- func (w *Producer) SetLogger(l logger, lvl LogLevel)
- func (w *Producer) Stop()
- func (w *Producer) String() string
- type ProducerTransaction
- type PubStrategyType
- type RemoveProducerInfo
- type TopicPartProducerInfo
- type TopicProducerMgr
- func (self *TopicProducerMgr) AddLookupdNodes(addresses []string)
- func (self *TopicProducerMgr) ConnectToNSQLookupd(addr string) error
- func (self *TopicProducerMgr) ConnectToSeeds() error
- func (self *TopicProducerMgr) MultiPublish(topic string, body [][]byte) error
- func (self *TopicProducerMgr) MultiPublishAndTrace(topic string, traceIDList []uint64, body [][]byte) (NewMessageID, uint64, uint32, error)
- func (self *TopicProducerMgr) MultiPublishAsync(topic string, body [][]byte, doneChan chan *ProducerTransaction, ...) error
- func (self *TopicProducerMgr) MultiPublishV2(topic string, body []*bytes.Buffer) error
- func (self *TopicProducerMgr) MultiPublishWithJsonExt(topic string, body [][]byte, extList []*MsgExt) error
- func (self *TopicProducerMgr) Publish(topic string, body []byte) error
- func (self *TopicProducerMgr) PublishAndRetryBackground(topic string, body []byte) error
- func (self *TopicProducerMgr) PublishAndTrace(topic string, traceID uint64, body []byte) (NewMessageID, uint64, uint32, error)
- func (self *TopicProducerMgr) PublishAndTraceWithPartitionId(topic string, partition int, traceID uint64, body []byte) (NewMessageID, uint64, uint32, error)
- func (self *TopicProducerMgr) PublishAsync(topic string, body []byte, doneChan chan *ProducerTransaction, ...) error
- func (self *TopicProducerMgr) PublishAsyncWithJsonExt(topic string, body []byte, ext *MsgExt, doneChan chan *ProducerTransaction, ...) error
- func (self *TopicProducerMgr) PublishAsyncWithPart(topic string, part int, body []byte, ext *MsgExt, ...) error
- func (self *TopicProducerMgr) PublishOrdered(topic string, partitionKey []byte, body []byte) (NewMessageID, uint64, uint32, error)
- func (self *TopicProducerMgr) PublishOrderedWithJsonExt(topic string, partitionKey []byte, body []byte, ext *MsgExt) (NewMessageID, uint64, uint32, error)
- func (self *TopicProducerMgr) PublishWithExtAndRetryBackground(topic string, body []byte, ext *MsgExt) error
- func (self *TopicProducerMgr) PublishWithJsonExt(topic string, body []byte, ext *MsgExt) (NewMessageID, uint64, uint32, error)
- func (self *TopicProducerMgr) PublishWithJsonExtAndPartitionId(topic string, partition int, body []byte, ext *MsgExt) (NewMessageID, uint64, uint32, error)
- func (self *TopicProducerMgr) PublishWithPartitionId(topic string, partition int, body []byte) error
- func (self *TopicProducerMgr) SetEtcdConf(servers []string, cluster string, lookupPath string)
- func (self *TopicProducerMgr) SetLogger(l logger, lvl LogLevel)
- func (self *TopicProducerMgr) SetLoggerLevel(lvl LogLevel)
- func (self *TopicProducerMgr) Stop()
- func (self *TopicProducerMgr) TriggerCheckForError(err error, delay time.Duration)
Examples ¶
Constants ¶
const ( MAX_PARTITION_NUM = 1024 MIN_RETRY_SLEEP = time.Millisecond * 8 )
const ( FrameTypeResponse int32 = 0 FrameTypeError int32 = 1 FrameTypeMessage int32 = 2 )
frame types
const ( StateInit = iota StateDisconnected StateConnected StateSubscribed // StateClosing means CLOSE has started... // (responses are ok, but no new messages will be sent) StateClosing )
states
const MsgIDLength = 16
The number of bytes for a Message.ID
const (
// for old nsqd it may return the special pid for compatible
OLD_VERSION_PID = -11
)
const VERSION = "1.3.1 HA"
VERSION
Variables ¶
var ( FailedOnNotLeader = "E_FAILED_ON_NOT_LEADER" FailedOnNotLeaderBytes = []byte("E_FAILED_ON_NOT_LEADER") E_TOPIC_NOT_EXIST = "E_TOPIC_NOT_EXIST" E_TOPIC_NOT_EXIST_BYTES = []byte("E_TOPIC_NOT_EXIST") FailedOnNotWritable = "E_FAILED_ON_NOT_WRITABLE" FailedOnNotWritableBytes = []byte("E_FAILED_ON_NOT_WRITABLE") )
var ( ErrTopicNotSet = errors.New("topic is not set as producer") ErrNoProducer = errors.New("topic producer not found") ErrRetryBackground = errors.New("retrying in background") )
var ErrAlreadyConnected = errors.New("already connected")
ErrAlreadyConnected is returned from ConnectToNSQD when already connected
var ErrClosing = errors.New("closing")
ErrClosing is returned when a connection is closing
var ErrNotConnected = errors.New("not connected")
ErrNotConnected is returned when a publish command is made against a Producer that is not connected
var ErrOverMaxInFlight = errors.New("over configure max-inflight")
ErrOverMaxInFlight is returned from Consumer if over max-in-flight
var ErrStopped = errors.New("stopped")
ErrStopped is returned when a publish command is made against a Producer that has been stopped
var JSONHeaderExtVer = uint8(4)
JSONHeaderExtVer is the ext version for messages that have a JSON header ext
var MagicV1 = []byte(" V1")
MagicV1 is the initial identifier sent when connecting for V1 clients
var MagicV2 = []byte(" V2")
MagicV2 is the initial identifier sent when connecting for V2 clients
var NoExtVer = uint8(0)
Ext versions: NoExtVer is the ext version for messages that have no ext
var OffsetSpecialType = "special"
var OffsetTimestampType = "timestamp"
var offsetCountType = "count"
var OffsetVirtualQueueType = "virtual_queue"
Functions ¶
func FindString ¶ added in v1.3.1
func IsFailedOnNotLeader ¶ added in v1.3.1
func IsFailedOnNotLeaderBytes ¶ added in v1.3.1
func IsFailedOnNotWritable ¶ added in v1.3.1
func IsFailedOnNotWritableBytes ¶ added in v1.3.1
func IsTopicNotExist ¶ added in v1.3.1
func IsTopicNotExistBytes ¶ added in v1.3.1
func IsValidChannelName ¶
IsValidChannelName checks a channel name for correctness
func IsValidTopicName ¶
IsValidTopicName checks a topic name for correctness
func ReadResponse ¶
ReadResponse is a client-side utility function to read from the supplied Reader according to the NSQ protocol spec:
[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
    size       data
func ReadUnpackedResponse ¶
ReadUnpackedResponse reads and parses data from the underlying TCP connection according to the NSQ TCP protocol spec and returns the frameType, data or error
func UnpackResponse ¶
UnpackResponse is a client-side utility function that unpacks serialized data according to NSQ protocol spec:
[x][x][x][x][x][x][x][x]...
|  (int32) || (binary)
|  4-byte  || N-byte
------------------------...
  frame ID     data
Returns a triple of: frame type, data ([]byte), error
Types ¶
type AddrPartInfo ¶ added in v1.3.1
type AddrPartInfo struct {
// contains filtered or unexported fields
}
type AuthResponse ¶
type AuthResponse struct { Identity string `json:"identity"` IdentityUrl string `json:"identity_url"` PermissionCount int64 `json:"permission_count"` }
AuthResponse represents the metadata returned from an AUTH command to nsqd
type BackoffStrategy ¶ added in v1.0.4
BackoffStrategy defines a strategy for calculating the duration of time a consumer should backoff for a given attempt
type Command ¶
Command represents a command from a client to an NSQ daemon
func Auth ¶
Auth sends credentials for authentication
After `Identify`, this is usually the first message sent, if auth is used.
func CreateTopic ¶ added in v1.3.1
CreateTopic creates a new Command to create the given topic with the given partition
func CreateTopicWithExt ¶ added in v1.3.1
func Finish ¶
Finish creates a new Command to indicate that a given message (by id) has been processed successfully
func Identify ¶
Identify creates a new Command to provide information about the client. After connecting, it is generally the first message sent.
The supplied map is marshaled into JSON to provide some flexibility for this command to evolve over time.
See http://nsq.io/clients/tcp_protocol_spec.html#identify for information on the supported options
func MultiPublish ¶
MultiPublish creates a new Command to write more than one message to a given topic (useful for high-throughput situations to avoid roundtrips and saturate the pipe)
func MultiPublishTrace ¶ added in v1.3.1
func MultiPublishTrace(topic string, part string, traceIDList []uint64, bodies [][]byte) (*Command, error)
MultiPublishTrace creates a new Command to write more than one message, along with a list of trace IDs, to a given topic and partition (useful for high-throughput situations to avoid roundtrips and saturate the pipe)
func MultiPublishV2 ¶ added in v1.3.1
func MultiPublishWithJsonExt ¶ added in v1.3.1
func MultiPublishWithPart ¶ added in v1.3.1
MultiPublishWithPart creates a new Command to write more than one message to a given topic and partition (useful for high-throughput situations to avoid roundtrips and saturate the pipe)
func MultiPublishWithPartV2 ¶ added in v1.3.1
func Nop ¶
func Nop() *Command
Nop creates a new Command that has no effect server side. Commonly used to respond to heartbeats
func Ping ¶
func Ping() *Command
Ping creates a new Command to keep-alive the state of all the announced topic/channels for a given client
func PublishTrace ¶ added in v1.3.1
func PublishWithJsonExt ¶ added in v1.3.1
func PublishWithPart ¶ added in v1.3.1
PublishWithPart creates a new Command to write a message to a given topic and partition
func Ready ¶
Ready creates a new Command to specify the number of messages a client is willing to receive
func Requeue ¶
Requeue creates a new Command to indicate that a given message (by id) should be requeued after the given delay NOTE: a delay of 0 indicates immediate requeue
func StartClose ¶
func StartClose() *Command
StartClose creates a new Command to indicate that the client would like to start a close cycle. nsqd will no longer send messages to a client in this state and the client is expected to finish pending messages and close the connection
func SubscribeAdvanced ¶ added in v1.3.1
func SubscribeAdvanced(topic string, channel string, part string, consumeStart ConsumeOffset) *Command
func SubscribeAndTrace ¶ added in v1.3.1
func SubscribeOrdered ¶ added in v1.3.1
func SubscribeWithPart ¶ added in v1.3.1
func SubscribeWithPartAndTrace ¶ added in v1.3.1
func UnRegister ¶
UnRegister creates a new Command to remove a topic/channel for the connected nsqd
type Config ¶
type Config struct { DialTimeout time.Duration `opt:"dial_timeout" default:"5s"` // Deadlines for network reads and writes ReadTimeout time.Duration `opt:"read_timeout" max:"5m" default:"60s"` WriteTimeout time.Duration `opt:"write_timeout" max:"5m" default:"1s"` PubTimeout time.Duration `opt:"pub_timeout" max:"1m" default:"10s"` // LocalAddr is the local address to use when dialing an nsqd. // If empty, a local address is automatically chosen. LocalAddr net.Addr `opt:"local_addr"` // Duration between polling lookupd for new producers, and fractional jitter to add to // the lookupd poll loop. This helps evenly distribute requests even if multiple consumers // restart at the same time // // NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between // reconnection attempts LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"1s" max:"5m" default:"60s"` LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"` // Maximum duration when REQueueing (for doubling of deferred requeue) MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"600m" default:"15m"` DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"` // Backoff strategy, defaults to exponential backoff. Overwrite this to define alternative backoff algorithms. 
BackoffStrategy BackoffStrategy `opt:"backoff_strategy" default:"exponential"` // Maximum amount of time to backoff when processing fails 0 == no backoff MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"` // Unit of time for calculating consumer backoff BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"` // Maximum number of times this consumer will attempt to process a message before giving up MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"15"` // Duration to wait for a message from a producer when in a state where RDY // counts are re-distributed (ie. max_in_flight < num_producers) LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"` // Duration between redistributing max-in-flight to connections RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"` // Identifiers sent to nsqd representing this client // UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>") ClientID string `opt:"client_id"` // (defaults: short hostname) Hostname string `opt:"hostname"` UserAgent string `opt:"user_agent"` // Duration of time between heartbeats. 
This must be less than ReadTimeout HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"` // Integer percentage to sample the channel (requires nsqd 0.2.25+) SampleRate int32 `opt:"sample_rate" min:"0" max:"99"` // To set TLS config, use the following options: // // tls_v1 - Bool enable TLS negotiation // tls_root_ca_file - String path to file containing root CA // tls_insecure_skip_verify - Bool indicates whether this client should verify server certificates // tls_cert - String path to file containing public key for certificate // tls_key - String path to file containing private key for certificate // tls_min_version - String indicating the minimum version of tls acceptable ('ssl3.0', 'tls1.0', 'tls1.1', 'tls1.2') // TlsV1 bool `opt:"tls_v1"` TlsConfig *tls.Config `opt:"tls_config"` // Compression Settings Deflate bool `opt:"deflate"` DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"` Snappy bool `opt:"snappy"` // Size of the buffer (in bytes) used by nsqd for buffering writes to this connection OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"` // Timeout used by nsqd before flushing buffered writes (set to 0 to disable). // // WARNING: configuring clients with an extremely low // (< 25ms) output_buffer_timeout has a significant effect // on nsqd CPU usage (particularly with > 50 clients connected). 
OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"` // Maximum number of messages to allow in flight (concurrency knob) MaxInFlight int `opt:"max_in_flight" min:"0" default:"20"` // The server-side message timeout for messages delivered to this client MsgTimeout time.Duration `opt:"msg_timeout" min:"0"` // secret for nsqd authentication (requires nsqd 0.2.29+) AuthSecret string `opt:"auth_secret"` EnableTrace bool `opt:"enable_trace"` EnableOrdered bool `opt:"enable_ordered"` Hasher hash.Hash32 PubStrategy PubStrategyType // pub will retry max retry times and each retry will wait pub timeout PubMaxRetry int `opt:"pub_max_retry" min:"1" max:"15" default:"3"` PubMaxBackgroundRetry int `opt:"pub_max_background_retry" min:"1" max:"100" default:"15"` PubBackgroundBuffer int `opt:"pub_background_buffer" min:"10" max:"10000" default:"1000"` ProducerPoolSize int `opt:"producer_pool_size" min:"1" max:"100" default:"2"` EnableMultiplexing bool DesiredTag string `opt:"desired_tag"` // seeds lookupd should be domain format (nsq.xxx.xxx:4160) // and will not be removed if connection refused // So it will always retry if not connected LookupdSeeds []string `opt:"lookupd_seeds"` // contains filtered or unexported fields }
Config is a struct of NSQ options
The only valid way to create a Config is via NewConfig; using a struct literal will panic. After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no longer mutable (they are copied).
Use Set(option string, value interface{}) as an alternate way to set parameters
func NewConfig ¶
func NewConfig() *Config
NewConfig returns a new default nsq configuration.
This must be used to initialize Config structs. Values can be set directly, or through Config.Set()
func (*Config) Set ¶
Set takes an option as a string and a value as an interface and attempts to set the appropriate configuration option.
It attempts to coerce the value into the right format depending on the named option and the underlying type of the value passed in.
Calls to Set() that take a time.Duration as an argument can be input as:
    "1000ms" (a string parsed by time.ParseDuration())
    1000 (an integer interpreted as milliseconds)
    1000*time.Millisecond (a literal time.Duration value)
Calls to Set() that take bool can be input as:
    "true" (a string parsed by strconv.ParseBool())
    true (a boolean)
    1 (an int where 1 == true and 0 == false)
It returns an error for an invalid option or value.
type ConfigFlag ¶ added in v1.0.5
type ConfigFlag struct {
*Config
}
ConfigFlag wraps a Config and implements the flag.Value interface
Example ¶
cfg := nsq.NewConfig() flagSet := flag.NewFlagSet("", flag.ExitOnError) flagSet.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to pass through to nsq.Consumer (may be given multiple times)") flagSet.PrintDefaults() err := flagSet.Parse([]string{ "--consumer-opt=heartbeat_interval,1s", "--consumer-opt=max_attempts,10", }) if err != nil { panic(err.Error()) } println("HeartbeatInterval", cfg.HeartbeatInterval) println("MaxAttempts", cfg.MaxAttempts)
Output:
func (*ConfigFlag) Set ¶ added in v1.0.5
func (c *ConfigFlag) Set(opt string) (err error)
Set takes a comma separated value and follows the rules in Config.Set using the first field as the option key, and the second (if present) as the value
func (*ConfigFlag) String ¶ added in v1.0.5
func (c *ConfigFlag) String() string
String implements the flag.Value interface
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn represents a connection to nsqd
Conn exposes a set of callbacks for the various events that occur on a connection
func NewConn ¶
func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn
NewConn returns a new Conn instance
func (*Conn) Connect ¶
func (c *Conn) Connect() (*IdentifyResponse, error)
Connect dials and bootstraps the nsqd connection (including IDENTIFY) and returns the IdentifyResponse
func (*Conn) GetConnUID ¶ added in v1.3.1
func (*Conn) IsClosing ¶
IsClosing indicates whether or not the connection is currently in the process of gracefully closing
func (*Conn) LastMessageTime ¶
LastMessageTime returns a time.Time representing the time at which the last message was received
func (*Conn) MaxRDY ¶
MaxRDY returns the nsqd negotiated maximum RDY count that it will accept for this connection
func (*Conn) RemoteAddr ¶
RemoteAddr returns the configured destination nsqd address
func (*Conn) SetLogger ¶
SetLogger assigns the logger to use as well as a level.
The format parameter is expected to be a printf compatible string with a single %s argument. This is useful if you want to provide additional context to the log messages that the connection will print, the default is '(%s)'.
The logger parameter is an interface that requires the following method to be implemented (such as the stdlib log.Logger):
Output(calldepth int, s string)
func (*Conn) WriteCommand ¶
WriteCommand is a goroutine safe method to write a Command to this connection, and flush.
type ConnDelegate ¶
type ConnDelegate interface { // OnResponse is called when the connection // receives a FrameTypeResponse from nsqd OnResponse(*Conn, []byte) // OnError is called when the connection // receives a FrameTypeError from nsqd OnError(*Conn, []byte) // OnMessage is called when the connection // receives a FrameTypeMessage from nsqd OnMessage(*Conn, *Message) // OnMessageFinished is called when the connection // handles a FIN command from a message handler OnMessageFinished(*Conn, *Message) // OnMessageRequeued is called when the connection // handles a REQ command from a message handler OnMessageRequeued(*Conn, *Message) // OnBackoff is called when the connection triggers a backoff state OnBackoff(*Conn) // OnContinue is called when the connection finishes a message without adjusting backoff state OnContinue(*Conn) // OnResume is called when the connection triggers a resume state OnResume(*Conn) // OnIOError is called when the connection experiences // a low-level TCP transport error OnIOError(*Conn, error) // OnHeartbeat is called when the connection // receives a heartbeat from nsqd OnHeartbeat(*Conn) // OnClose is called when the connection // closes, after all cleanup OnClose(*Conn) }
ConnDelegate is an interface of methods that are used as callbacks in Conn
type ConsumeOffset ¶ added in v1.3.1
func (*ConsumeOffset) SetTime ¶ added in v1.3.1
func (self *ConsumeOffset) SetTime(sec int64)
SetTime sets the offset so that the subscription starts from the given time, expressed in seconds since the Unix epoch
func (*ConsumeOffset) SetToEnd ¶ added in v1.3.1
func (self *ConsumeOffset) SetToEnd()
func (*ConsumeOffset) SetVirtualQueueOffset ¶ added in v1.3.1
func (self *ConsumeOffset) SetVirtualQueueOffset(offset int64)
func (*ConsumeOffset) ToString ¶ added in v1.3.1
func (self *ConsumeOffset) ToString() string
type Consumer ¶
type Consumer struct { // read from this channel to block until consumer is cleanly stopped StopChan chan int // contains filtered or unexported fields }
Consumer is a high-level type to consume from NSQ.
A Consumer instance is supplied a Handler that will be executed concurrently via goroutines to handle processing the stream of messages consumed from the specified topic/channel. See: Handler/HandlerFunc for details on implementing the interface to create handlers.
If configured, it will poll nsqlookupd instances and handle connection (and reconnection) to any discovered nsqds.
func NewConsumer ¶
NewConsumer creates a new instance of Consumer for the specified topic/channel
The only valid way to create a Config is via NewConfig; using a struct literal will panic. After Config is passed into NewConsumer the values are no longer mutable (they are copied).
func NewPartitionConsumer ¶ added in v1.3.1
func (*Consumer) AddConcurrentHandlerFuncs ¶ added in v1.3.1
func (r *Consumer) AddConcurrentHandlerFuncs(handler HandlerFunc, failHandler FailHandlerFunc, concurrency int)
func (*Consumer) AddConcurrentHandlers ¶
AddConcurrentHandlers sets the Handler for messages received by this Consumer. It takes a second argument which indicates the number of goroutines to spawn for message handling.
This panics if called after connecting to NSQD or NSQ Lookupd
(see Handler or HandlerFunc for details on implementing this interface)
func (*Consumer) AddEtcdServiceAddr ¶ added in v1.3.1
func (*Consumer) AddHandler ¶
AddHandler sets the Handler for messages received by this Consumer. This can be called multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.
This panics if called after connecting to NSQD or NSQ Lookupd
(see Handler or HandlerFunc for details on implementing this interface)
func (*Consumer) ChangeMaxInFlight ¶
ChangeMaxInFlight sets a new maximum number of messages this consumer instance will allow in-flight, and updates all existing connections as appropriate.
For example, ChangeMaxInFlight(0) would pause message flow.
If already connected, it updates the reader RDY state for each connection.
func (*Consumer) ConnectToNSQD ¶
ConnectToNSQD takes a nsqd address to connect directly to.
It is recommended to use ConnectToNSQLookupd so that topics are discovered automatically. This method is useful when you want to connect to a single, local, instance.
func (*Consumer) ConnectToNSQDs ¶
func (r *Consumer) ConnectToNSQDs(addresses []AddrPartInfo) error
ConnectToNSQDs takes multiple nsqd addresses to connect directly to.
It is recommended to use ConnectToNSQLookupd so that topics are discovered automatically. This method is useful when you want to connect to local instances.
func (*Consumer) ConnectToNSQLookupd ¶
ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.
If it is the first to be added, it initiates an HTTP request to discover nsqd producers for the configured topic.
A goroutine is spawned to handle continual polling.
func (*Consumer) ConnectToNSQLookupds ¶
ConnectToNSQLookupds adds multiple nsqlookupd addresses to the list for this Consumer instance.
If adding the first address it initiates an HTTP request to discover nsqd producers for the configured topic.
A goroutine is spawned to handle continual polling.
func (*Consumer) ConnectToSeeds ¶ added in v1.3.1
func (*Consumer) DisconnectFromNSQD ¶
DisconnectFromNSQD closes the connection to and removes the specified `nsqd` address from the list
func (*Consumer) DisconnectFromNSQLookupd ¶
DisconnectFromNSQLookupd removes the specified `nsqlookupd` address from the list used for periodic discovery.
func (*Consumer) IsConsumeExt ¶ added in v1.3.1
func (*Consumer) IsStarved ¶
IsStarved indicates whether any connections for this consumer are blocked on processing before being able to receive more messages (ie. RDY count of 0 and not exiting)
func (*Consumer) SetBehaviorDelegate ¶
func (r *Consumer) SetBehaviorDelegate(cb interface{})
SetBehaviorDelegate takes a type implementing one or more of the following interfaces that modify the behavior of the `Consumer`:
DiscoveryFilter
func (*Consumer) SetConsumeExt ¶ added in v1.3.1
func (*Consumer) SetConsumeOffset ¶ added in v1.3.1
func (r *Consumer) SetConsumeOffset(partition int, offset ConsumeOffset) error
func (*Consumer) SetLogger ¶
SetLogger assigns the logger to use as well as a level
The logger parameter is an interface that requires the following method to be implemented (such as the stdlib log.Logger):
Output(calldepth int, s string)
func (*Consumer) Stats ¶ added in v1.0.2
func (r *Consumer) Stats() *ConsumerStats
Stats retrieves the current connection and message statistics for a Consumer
type ConsumerStats ¶ added in v1.0.2
type ConsumerStats struct { MessagesReceived uint64 MessagesFinished uint64 MessagesRequeued uint64 Connections int }
ConsumerStats represents a snapshot of the state of a Consumer's connections and the messages it has seen
type DiscoveryFilter ¶
DiscoveryFilter is an interface accepted by `SetBehaviorDelegate()` for filtering the nsqds returned from discovery via nsqlookupd
type ErrIdentify ¶
type ErrIdentify struct {
Reason string
}
ErrIdentify is returned from Conn as part of the IDENTIFY handshake
type ErrProtocol ¶
type ErrProtocol struct {
Reason string
}
ErrProtocol is returned from Producer when encountering an NSQ protocol level error
type ExponentialStrategy ¶ added in v1.0.4
type ExponentialStrategy struct {
// contains filtered or unexported fields
}
ExponentialStrategy implements an exponential backoff strategy (default)
type FailHandlerFunc ¶ added in v1.3.1
type FailHandlerFunc func(message *Message)
type FailedMessageLogger ¶
type FailedMessageLogger interface {
LogFailedMessage(message *Message)
}
FailedMessageLogger is an interface that can be implemented by handlers that wish to receive a callback when a message is deemed "failed" (i.e. the number of attempts exceeded the Consumer specified MaxAttemptCount)
type FullJitterStrategy ¶ added in v1.0.4
type FullJitterStrategy struct {
// contains filtered or unexported fields
}
FullJitterStrategy implements http://www.awsarchitectureblog.com/2015/03/backoff.html
type FullMessageID ¶ added in v1.3.1
type FullMessageID [MsgIDLength]byte
type Handler ¶
Handler is the message processing interface for Consumer
Implement this interface for handlers that return whether or not message processing completed successfully.
When the return value is nil Consumer will automatically handle FINishing.
When the returned value is non-nil Consumer will automatically handle REQueing.
type HandlerFunc ¶
HandlerFunc is a convenience type to avoid having to declare a struct to implement the Handler interface, it can be used like this:
consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error {
	// handle the message
}))
func (HandlerFunc) HandleMessage ¶
func (h HandlerFunc) HandleMessage(m *Message) error
HandleMessage implements the Handler interface
type IdentifyResponse ¶
type IdentifyResponse struct { MaxRdyCount int64 `json:"max_rdy_count"` TLSv1 bool `json:"tls_v1"` Deflate bool `json:"deflate"` Snappy bool `json:"snappy"` AuthRequired bool `json:"auth_required"` }
IdentifyResponse represents the metadata returned from an IDENTIFY command to nsqd
type Message ¶
type Message struct { ID MessageID Body []byte Timestamp int64 Attempts uint16 NSQDAddress string Partition string Delegate MessageDelegate Offset uint64 RawSize uint32 ExtVer uint8 ExtBytes []byte // contains filtered or unexported fields }
Message is the fundamental data type containing the id, body, and metadata
func DecodeMessage ¶
DecodeMessage deserializes data (as []byte) and creates a new Message
func DecodeMessageWithExt ¶ added in v1.3.1
DecodeMessageWithExt deserializes data (as []byte) and creates a new Message
func NewMessage ¶
NewMessage creates a Message, initializes some metadata, and returns a pointer
func (*Message) DisableAutoResponse ¶
func (m *Message) DisableAutoResponse()
DisableAutoResponse disables the automatic response that would normally be sent when a handler.HandleMessage returns (FIN/REQ based on the error value returned).
This is useful if you want to batch, buffer, or asynchronously respond to messages.
func (*Message) Finish ¶
func (m *Message) Finish()
Finish sends a FIN command to the nsqd which sent this message
func (*Message) GetFullMsgID ¶ added in v1.3.1
func (m *Message) GetFullMsgID() FullMessageID
func (*Message) GetJsonExt ¶ added in v1.3.1
func (*Message) GetTraceID ¶ added in v1.3.1
func (*Message) HasResponded ¶
HasResponded indicates whether or not this message has been responded to
func (*Message) IsAutoResponseDisabled ¶
IsAutoResponseDisabled indicates whether or not this message will be responded to automatically
func (*Message) Requeue ¶
Requeue sends a REQ command to the nsqd which sent this message, using the supplied delay.
A delay of -1 will automatically calculate based on the number of attempts and the configured default_requeue_delay
func (*Message) RequeueWithoutBackoff ¶
RequeueWithoutBackoff sends a REQ command to the nsqd which sent this message, using the supplied delay.
Notably, using this method to respond does not trigger a backoff event on the configured Delegate.
type MessageDelegate ¶
type MessageDelegate interface { // OnFinish is called when the Finish() method // is triggered on the Message OnFinish(*Message) // OnRequeue is called when the Requeue() method // is triggered on the Message OnRequeue(m *Message, delay time.Duration, backoff bool) // OnTouch is called when the Touch() method // is triggered on the Message OnTouch(*Message) }
MessageDelegate is an interface of methods that are used as callbacks in Message
type MessageID ¶
type MessageID [MsgIDLength]byte
MessageID is the binary bytes message ID
func GetCompatibleMsgIDFromNew ¶ added in v1.3.1
func GetCompatibleMsgIDFromNew(id NewMessageID, traceID uint64) MessageID
type NewMessageID ¶ added in v1.3.1
type NewMessageID uint64
func GetNewMessageID ¶ added in v1.3.1
func GetNewMessageID(old []byte) NewMessageID
type NsqLookupdNodeInfo ¶ added in v1.3.1
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is a high-level type to publish to NSQ.
A Producer instance is 1:1 with a destination `nsqd` and will lazily connect to that instance (and re-connect) when Publish commands are executed.
func NewProducer ¶
NewProducer returns an instance of Producer for the specified address
The only valid way to create a Config is via NewConfig; using a struct literal will panic. After Config is passed into NewProducer the values are no longer mutable (they are copied).
func (*Producer) MultiPublish ¶
MultiPublish synchronously publishes a slice of message bodies to the specified topic, returning an error if publish failed
func (*Producer) MultiPublishAsync ¶
func (w *Producer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *ProducerTransaction, args ...interface{}) error
MultiPublishAsync publishes a slice of message bodies to the specified topic but does not wait for the response from `nsqd`.
When the Producer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `ProducerTransaction` instance with the supplied variadic arguments and the response error if present
func (*Producer) Ping ¶ added in v1.0.4
Ping causes the Producer to connect to its configured nsqd (if not already connected) and send a `Nop` command, returning any error that might occur.
This method can be used to verify that a newly-created Producer instance is configured correctly, rather than relying on the lazy "connect on Publish" behavior of a Producer.
func (*Producer) Publish ¶
Publish synchronously publishes a message body to the specified topic, returning an error if publish failed
func (*Producer) PublishAsync ¶
func (w *Producer) PublishAsync(topic string, body []byte, doneChan chan *ProducerTransaction, args ...interface{}) error
PublishAsync publishes a message body to the specified topic but does not wait for the response from `nsqd`.
When the Producer eventually receives the response from `nsqd`, the supplied `doneChan` (if specified) will receive a `ProducerTransaction` instance with the supplied variadic arguments and the response error if present
func (*Producer) PublishWithPartitionIdAsync ¶ added in v1.3.1
func (*Producer) SetLogger ¶
SetLogger assigns the logger to use as well as a level
The logger parameter is an interface that requires the following method to be implemented (such as the stdlib log.Logger):
Output(calldepth int, s string)
type ProducerTransaction ¶
type ProducerTransaction struct { Error error // the error (or nil) of the publish command Args []interface{} // the slice of variadic arguments passed to PublishAsync or MultiPublishAsync ResponseData []byte // contains filtered or unexported fields }
ProducerTransaction is returned by the async publish methods to retrieve metadata about the command after the response is received.
type PubStrategyType ¶ added in v1.3.1
type PubStrategyType int
the strategy for how messages are published to different partitions
const ( PubRR PubStrategyType = iota // not yet supported PubDynamicLoad )
type RemoveProducerInfo ¶ added in v1.3.1
type RemoveProducerInfo struct {
// contains filtered or unexported fields
}
type TopicPartProducerInfo ¶ added in v1.3.1
type TopicPartProducerInfo struct {
// contains filtered or unexported fields
}
func NewTopicPartProducerInfo ¶ added in v1.3.1
func NewTopicPartProducerInfo(meta metaInfo, isMetaValid bool) *TopicPartProducerInfo
type TopicProducerMgr ¶ added in v1.3.1
type TopicProducerMgr struct {
// contains filtered or unexported fields
}
func NewTopicProducerMgr ¶ added in v1.3.1
func NewTopicProducerMgr(topics []string, conf *Config) (*TopicProducerMgr, error)
use part=-1 to handle all partitions of topic
func (*TopicProducerMgr) AddLookupdNodes ¶ added in v1.3.1
func (self *TopicProducerMgr) AddLookupdNodes(addresses []string)
func (*TopicProducerMgr) ConnectToNSQLookupd ¶ added in v1.3.1
func (self *TopicProducerMgr) ConnectToNSQLookupd(addr string) error
func (*TopicProducerMgr) ConnectToSeeds ¶ added in v1.3.1
func (self *TopicProducerMgr) ConnectToSeeds() error
func (*TopicProducerMgr) MultiPublish ¶ added in v1.3.1
func (self *TopicProducerMgr) MultiPublish(topic string, body [][]byte) error
func (*TopicProducerMgr) MultiPublishAndTrace ¶ added in v1.3.1
func (self *TopicProducerMgr) MultiPublishAndTrace(topic string, traceIDList []uint64, body [][]byte) (NewMessageID, uint64, uint32, error)
func (*TopicProducerMgr) MultiPublishAsync ¶ added in v1.3.1
func (self *TopicProducerMgr) MultiPublishAsync(topic string, body [][]byte, doneChan chan *ProducerTransaction, args ...interface{}) error
func (*TopicProducerMgr) MultiPublishV2 ¶ added in v1.3.1
func (self *TopicProducerMgr) MultiPublishV2(topic string, body []*bytes.Buffer) error
func (*TopicProducerMgr) MultiPublishWithJsonExt ¶ added in v1.3.1
func (self *TopicProducerMgr) MultiPublishWithJsonExt(topic string, body [][]byte, extList []*MsgExt) error
func (*TopicProducerMgr) Publish ¶ added in v1.3.1
func (self *TopicProducerMgr) Publish(topic string, body []byte) error
func (*TopicProducerMgr) PublishAndRetryBackground ¶ added in v1.3.1
func (self *TopicProducerMgr) PublishAndRetryBackground(topic string, body []byte) error
Note: background retry is not reliable. A nil error means the message was published to the server. ErrRetryBackground means the first attempt failed and the publish will be retried in the background, where it may either fail or succeed. Any other error means the publish failed for another reason and will not be retried.
func (*TopicProducerMgr) PublishAndTrace ¶ added in v1.3.1
func (self *TopicProducerMgr) PublishAndTrace(topic string, traceID uint64, body []byte) (NewMessageID, uint64, uint32, error)
pub with trace will return the message id and the message offset and size in the queue
func (*TopicProducerMgr) PublishAndTraceWithPartitionId ¶ added in v1.3.1
func (self *TopicProducerMgr) PublishAndTraceWithPartitionId(topic string, partition int, traceID uint64, body []byte) (NewMessageID, uint64, uint32, error)
pub with trace will return the message id and the message offset and size in the queue
func (*TopicProducerMgr) PublishAsync ¶ added in v1.3.1
func (self *TopicProducerMgr) PublishAsync(topic string, body []byte, doneChan chan *ProducerTransaction, args ...interface{}) error
func (*TopicProducerMgr) PublishAsyncWithJsonExt ¶ added in v1.3.1
func (self *TopicProducerMgr) PublishAsyncWithJsonExt(topic string, body []byte, ext *MsgExt, doneChan chan *ProducerTransaction, args ...interface{}) error
func (*TopicProducerMgr) PublishAsyncWithPart ¶ added in v1.3.1
func (self *TopicProducerMgr) PublishAsyncWithPart(topic string, part int, body []byte, ext *MsgExt, doneChan chan *ProducerTransaction, args ...interface{}) error
func (*TopicProducerMgr) PublishOrdered ¶ added in v1.3.1
func (self *TopicProducerMgr) PublishOrdered(topic string, partitionKey []byte, body []byte) (NewMessageID, uint64, uint32, error)
func (*TopicProducerMgr) PublishOrderedWithJsonExt ¶ added in v1.3.1
func (self *TopicProducerMgr) PublishOrderedWithJsonExt(topic string, partitionKey []byte, body []byte, ext *MsgExt) (NewMessageID, uint64, uint32, error)
func (*TopicProducerMgr) PublishWithExtAndRetryBackground ¶ added in v1.3.1
func (self *TopicProducerMgr) PublishWithExtAndRetryBackground(topic string, body []byte, ext *MsgExt) error
func (*TopicProducerMgr) PublishWithJsonExt ¶ added in v1.3.1
func (self *TopicProducerMgr) PublishWithJsonExt(topic string, body []byte, ext *MsgExt) (NewMessageID, uint64, uint32, error)
func (*TopicProducerMgr) PublishWithJsonExtAndPartitionId ¶ added in v1.3.1
func (self *TopicProducerMgr) PublishWithJsonExtAndPartitionId(topic string, partition int, body []byte, ext *MsgExt) (NewMessageID, uint64, uint32, error)
func (*TopicProducerMgr) PublishWithPartitionId ¶ added in v1.3.1
func (self *TopicProducerMgr) PublishWithPartitionId(topic string, partition int, body []byte) error
func (*TopicProducerMgr) SetEtcdConf ¶ added in v1.3.1
func (self *TopicProducerMgr) SetEtcdConf(servers []string, cluster string, lookupPath string)
func (*TopicProducerMgr) SetLogger ¶ added in v1.3.1
func (self *TopicProducerMgr) SetLogger(l logger, lvl LogLevel)
func (*TopicProducerMgr) SetLoggerLevel ¶ added in v1.3.1
func (self *TopicProducerMgr) SetLoggerLevel(lvl LogLevel)
func (*TopicProducerMgr) Stop ¶ added in v1.3.1
func (self *TopicProducerMgr) Stop()
func (*TopicProducerMgr) TriggerCheckForError ¶ added in v1.3.1
func (self *TopicProducerMgr) TriggerCheckForError(err error, delay time.Duration)
For async operations, the async error should be checked by the application if the async operation has an error.