nsqd

package
v0.0.0-...-89065a6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2025 License: MIT Imports: 50 Imported by: 0

README

nsqd

nsqd is the daemon that receives, queues, and delivers messages to clients.

Read the docs

Documentation

Index

Constants

View Source
// Log level constants, each aliased to the lg package constant of the
// matching name.
const (
	LOG_DEBUG = lg.DEBUG
	LOG_INFO  = lg.INFO
	LOG_WARN  = lg.WARN
	LOG_ERROR = lg.ERROR
	LOG_FATAL = lg.FATAL
)
View Source
// TLS requirement levels, numbered 0, 1, 2 in declaration order via iota.
// NOTE(review): presumably the accepted values of Options.TLSRequired (an
// int field) — confirm against the option handling code.
const (
	TLSNotRequired = iota
	TLSRequiredExceptHTTP
	TLSRequired
)
View Source
const (
	// MsgIDLength is the array length, in bytes, of the MessageID type.
	MsgIDLength = 16
)

Variables

View Source
// AllExperiments lists the known Experiment values; at present it holds
// only TopologyAwareConsumption.
var AllExperiments = []Experiment{
	TopologyAwareConsumption,
}
View Source
// ErrIDBackwards is the sentinel error carrying the message "ID went backward".
// NOTE(review): presumably produced by the GUID factory — confirm.
var ErrIDBackwards = errors.New("ID went backward")
View Source
// ErrSequenceExpired is the sentinel error carrying the message "sequence expired".
// NOTE(review): presumably produced by the GUID factory — confirm.
var ErrSequenceExpired = errors.New("sequence expired")
View Source
// ErrTimeBackwards is the sentinel error carrying the message
// "time has gone backwards".
// NOTE(review): presumably produced by the GUID factory — confirm.
var ErrTimeBackwards = errors.New("time has gone backwards")

Functions

func NewGUIDFactory

func NewGUIDFactory(nodeID int64) *guidFactory

Types

type BackendQueue

// BackendQueue represents the behavior for the secondary message storage
// system. Messages are handled as raw byte slices: Put accepts one and
// ReadChan yields them.
type BackendQueue interface {
	Put([]byte) error
	ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel
	Close() error
	Delete() error
	Depth() int64
	Empty() error
}

BackendQueue represents the behavior for the secondary message storage system

type Channel

// Channel represents the concrete type for an NSQ channel. Instances are
// created with NewChannel.
type Channel struct {
	// Embedded, so the RWMutex methods (Lock, RLock, ...) are promoted
	// onto Channel itself.
	sync.RWMutex
	// contains filtered or unexported fields
}

Channel represents the concrete type for an NSQ channel (and also implements the Queue interface).

There can be multiple channels per topic, each with their own unique set of subscribers (clients).

Channels maintain all client and message metadata, orchestrating in-flight messages, timeouts, requeuing, etc.

func NewChannel

func NewChannel(topicName string, channelName string, nsqd *NSQD,
	deleteCallback func(*Channel)) *Channel

NewChannel creates a new instance of the Channel type and returns a pointer

func (*Channel) AddClient

func (c *Channel) AddClient(clientID int64, client Consumer) error

AddClient adds a client to the Channel's client list

func (*Channel) Close

func (c *Channel) Close() error

Close cleanly closes the Channel

func (*Channel) Delete

func (c *Channel) Delete() error

Delete empties the channel and closes it.

func (*Channel) Depth

func (c *Channel) Depth() int64

func (*Channel) Empty

func (c *Channel) Empty() error

func (*Channel) Exiting

func (c *Channel) Exiting() bool

Exiting returns a boolean indicating if this channel is closed/exiting

func (*Channel) FinishMessage

func (c *Channel) FinishMessage(clientID int64, id MessageID) error

FinishMessage successfully discards an in-flight message

func (*Channel) IsPaused

func (c *Channel) IsPaused() bool

func (*Channel) Pause

func (c *Channel) Pause() error

func (*Channel) PutMessage

func (c *Channel) PutMessage(m *Message) error

PutMessage writes a Message to the queue

func (*Channel) PutMessageDeferred

func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration)

func (*Channel) RemoveClient

func (c *Channel) RemoveClient(clientID int64)

RemoveClient removes a client from the Channel's client list

func (*Channel) RequeueMessage

func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error

RequeueMessage requeues a message based on the `timeout` duration, i.e.:

`timeout` == 0 - requeue a message immediately

`timeout` > 0 - asynchronously wait for the specified timeout and requeue a message (aka "deferred requeue")

func (*Channel) StartDeferredTimeout

func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error

func (*Channel) StartInFlightTimeout

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error

func (*Channel) TouchMessage

func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error

TouchMessage resets the timeout for an in-flight message

func (*Channel) UnPause

func (c *Channel) UnPause() error

type ChannelMetadata

// ChannelMetadata is the collection of persistent information about a
// channel. It is the element type of TopicMetadata.Channels and is
// serialized to JSON under the keys "name" and "paused".
type ChannelMetadata struct {
	Name   string `json:"name"`
	Paused bool   `json:"paused"`
}

ChannelMetadata is the collection of persistent information about a channel.

type ChannelStats

// ChannelStats is a JSON-serializable set of statistics for one channel.
// Values of this type are returned by NewChannelStats and appear as the
// elements of TopicStats.Channels.
type ChannelStats struct {
	ChannelName         string        `json:"channel_name"`
	Depth               int64         `json:"depth"`
	BackendDepth        int64         `json:"backend_depth"`
	InFlightCount       int           `json:"in_flight_count"`
	DeferredCount       int           `json:"deferred_count"`
	MessageCount        uint64        `json:"message_count"`
	// The three topology counters below are tagged omitempty, so a zero
	// value is left out of the encoded JSON.
	ZoneLocalMsgCount   uint64        `json:"zone_local_msg_count,omitempty"`
	RegionLocalMsgCount uint64        `json:"region_local_msg_count,omitempty"`
	GlobalMsgCount      uint64        `json:"global_msg_count,omitempty"`
	RequeueCount        uint64        `json:"requeue_count"`
	TimeoutCount        uint64        `json:"timeout_count"`
	ClientCount         int           `json:"client_count"`
	Clients             []ClientStats `json:"clients"`
	Paused              bool          `json:"paused"`

	// Pointer field without omitempty: a nil value is encoded as JSON null.
	E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
}

func NewChannelStats

func NewChannelStats(c *Channel, clients []ClientStats, clientCount int) ChannelStats

type Channels

// Channels is a slice of *Channel. It declares Len and Swap but no Less,
// so on its own it does not satisfy sort.Interface; ChannelsByName embeds
// it and adds Less.
type Channels []*Channel

func (Channels) Len

func (c Channels) Len() int

func (Channels) Swap

func (c Channels) Swap(i, j int)

type ChannelsByName

// ChannelsByName embeds Channels (inheriting Len and Swap) and adds a Less
// method, giving it the full sort.Interface method set.
// NOTE(review): the name suggests Less orders by channel name — confirm.
type ChannelsByName struct {
	Channels
}

func (ChannelsByName) Less

func (c ChannelsByName) Less(i, j int) bool

type Client

// Client is the interface for a connected client that can report a type
// code and produce its ClientStats.
// NOTE(review): the meaning of Type's int result and of the string passed
// to Stats is not visible here — confirm against the implementations.
type Client interface {
	Type() int
	Stats(string) ClientStats
}

type ClientStats

// ClientStats is the interface for per-client statistics values. Its only
// method is String() string, the same method set as fmt.Stringer.
// ClientV2Stats satisfies it.
type ClientStats interface {
	String() string
}

type ClientV2Stats

// ClientV2Stats holds JSON-serializable statistics and connection details
// for a single client. Its value-receiver String method makes it satisfy
// the ClientStats interface.
type ClientV2Stats struct {
	ClientID            string `json:"client_id"`
	Hostname            string `json:"hostname"`
	Version             string `json:"version"`
	RemoteAddress       string `json:"remote_address"`
	State               int32  `json:"state"`
	ReadyCount          int64  `json:"ready_count"`
	InFlightCount       int64  `json:"in_flight_count"`
	MessageCount        uint64 `json:"message_count"`
	// The three topology counters below are tagged omitempty, so a zero
	// value is left out of the encoded JSON.
	ZoneLocalMsgCount   uint64 `json:"zone_local_msg_count,omitempty"`
	RegionLocalMsgCount uint64 `json:"region_local_msg_count,omitempty"`
	GlobalMsgCount      uint64 `json:"global_msg_count,omitempty"`
	FinishCount         uint64 `json:"finish_count"`
	RequeueCount        uint64 `json:"requeue_count"`
	// Note the JSON key is "connect_ts", not "connect_time".
	// NOTE(review): the timestamp unit is not visible here — confirm.
	ConnectTime         int64  `json:"connect_ts"`
	SampleRate          int32  `json:"sample_rate"`
	Deflate             bool   `json:"deflate"`
	Snappy              bool   `json:"snappy"`
	UserAgent           string `json:"user_agent"`
	// The auth fields are omitted from the JSON when false/empty.
	Authed              bool   `json:"authed,omitempty"`
	AuthIdentity        string `json:"auth_identity,omitempty"`
	AuthIdentityURL     string `json:"auth_identity_url,omitempty"`
	TopologyZone        string `json:"topology_zone"`
	TopologyRegion      string `json:"topology_region"`

	// Per-topic publish counts; omitted from the JSON when empty.
	PubCounts []PubCount `json:"pub_counts,omitempty"`

	// TLS connection details.
	TLS                           bool   `json:"tls"`
	CipherSuite                   string `json:"tls_cipher_suite"`
	TLSVersion                    string `json:"tls_version"`
	TLSNegotiatedProtocol         string `json:"tls_negotiated_protocol"`
	TLSNegotiatedProtocolIsMutual bool   `json:"tls_negotiated_protocol_is_mutual"`
}

func (ClientV2Stats) String

func (s ClientV2Stats) String() string

type Consumer

// Consumer is the interface a client must implement to be registered on a
// channel; it is the parameter type accepted by Channel.AddClient.
// NOTE(review): when the channel invokes each callback (for example
// TimedOutMessage or Empty) is not visible here — confirm in Channel.
type Consumer interface {
	UnPause()
	Pause()
	Close() error
	TimedOutMessage()
	Stats(string) ClientStats
	Empty()
}

type Experiment

// Experiment is the string-based name of an experimental feature. It is the
// element type of AllExperiments and the argument type of
// Options.HasExperiment.
type Experiment string
const (
	// TopologyAwareConsumption is the experiment named
	// "topology-aware-consumption".
	TopologyAwareConsumption Experiment = "topology-aware-consumption"
)

type Logger

// Logger is a defined type whose underlying type is lg.Logger; it is the
// type of the Options.Logger field.
type Logger lg.Logger

type Message

// Message is a single message: an ID, the body bytes, a timestamp and an
// attempt counter. Instances are created with NewMessage, and the WriteTo
// method has the same signature as io.WriterTo.
// NOTE(review): the unit of Timestamp is not visible here — confirm.
type Message struct {
	ID        MessageID
	Body      []byte
	Timestamp int64
	Attempts  uint16
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(id MessageID, body []byte) *Message

func (*Message) WriteTo

func (m *Message) WriteTo(w io.Writer) (int64, error)

type MessageID

// MessageID is a fixed-size message identifier: an array of MsgIDLength
// (16) bytes. Being an array, it is copied by value and is comparable.
type MessageID [MsgIDLength]byte

type Metadata

// Metadata is the collection of persistent information about the current
// NSQD: a list of topic metadata plus a version string. NSQD.GetMetadata
// returns a pointer to a value of this type.
type Metadata struct {
	Topics  []TopicMetadata `json:"topics"`
	Version string          `json:"version"`
}

Metadata is the collection of persistent information about the current NSQD.

type NSQD

// NSQD is the daemon type of this package. Instances are created with New
// from an *Options value.
type NSQD struct {
	// Embedded, so the RWMutex methods (Lock, RLock, ...) are promoted
	// onto NSQD itself.
	sync.RWMutex
	// contains filtered or unexported fields
}

func New

func New(opts *Options) (*NSQD, error)

func (*NSQD) Context

func (n *NSQD) Context() context.Context

Context returns a context that will be canceled when nsqd initiates the shutdown

func (*NSQD) DeleteExistingTopic

func (n *NSQD) DeleteExistingTopic(topicName string) error

DeleteExistingTopic removes a topic only if it exists

func (*NSQD) Exit

func (n *NSQD) Exit()

func (*NSQD) GetError

func (n *NSQD) GetError() error

func (*NSQD) GetExistingTopic

func (n *NSQD) GetExistingTopic(topicName string) (*Topic, error)

GetExistingTopic gets a topic only if it exists

func (*NSQD) GetHealth

func (n *NSQD) GetHealth() string

func (*NSQD) GetMetadata

func (n *NSQD) GetMetadata(ephemeral bool) *Metadata

GetMetadata retrieves the current topic and channel set of the NSQ daemon. If the ephemeral flag is set, ephemeral topics are also returned even though these are not saved to disk.

func (*NSQD) GetStartTime

func (n *NSQD) GetStartTime() time.Time

func (*NSQD) GetStats

func (n *NSQD) GetStats(topic string, channel string, includeClients bool) Stats

func (*NSQD) GetTopic

func (n *NSQD) GetTopic(topicName string) *Topic

GetTopic performs a thread safe operation to return a pointer to a Topic object (potentially new)

func (*NSQD) IsAuthEnabled

func (n *NSQD) IsAuthEnabled() bool

func (*NSQD) IsHealthy

func (n *NSQD) IsHealthy() bool

func (*NSQD) LoadMetadata

func (n *NSQD) LoadMetadata() error

func (*NSQD) Main

func (n *NSQD) Main() error

func (*NSQD) Notify

func (n *NSQD) Notify(v interface{}, persist bool)

func (*NSQD) PersistMetadata

func (n *NSQD) PersistMetadata() error

func (*NSQD) RealHTTPAddr

func (n *NSQD) RealHTTPAddr() net.Addr

func (*NSQD) RealHTTPSAddr

func (n *NSQD) RealHTTPSAddr() *net.TCPAddr

func (*NSQD) RealTCPAddr

func (n *NSQD) RealTCPAddr() net.Addr

func (*NSQD) SetHealth

func (n *NSQD) SetHealth(err error)

type Options

// Options holds the nsqd configuration passed to New. NewOptions returns a
// pointer to an Options value.
//
// NOTE(review): the `flag` and `cfg` struct tags presumably give the
// command-line flag name and the config-file key respectively — confirm
// against the code that resolves options. Fields with no tag (Logger,
// QueueScanInterval, QueueScanRefreshInterval, QueueScanDirtyPercent,
// ClientTimeout) have no flag name declared here.
type Options struct {
	// basic options
	ID        int64       `flag:"node-id" cfg:"id"`
	LogLevel  lg.LogLevel `flag:"log-level"`
	LogPrefix string      `flag:"log-prefix"`
	Logger    Logger

	TCPAddress               string        `flag:"tcp-address"`
	HTTPAddress              string        `flag:"http-address"`
	HTTPSAddress             string        `flag:"https-address"`
	BroadcastAddress         string        `flag:"broadcast-address"`
	BroadcastTCPPort         int           `flag:"broadcast-tcp-port"`
	BroadcastHTTPPort        int           `flag:"broadcast-http-port"`
	NSQLookupdTCPAddresses   []string      `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"`
	AuthHTTPAddresses        []string      `flag:"auth-http-address" cfg:"auth_http_addresses"`
	AuthHTTPRequestMethod    string        `flag:"auth-http-request-method" cfg:"auth_http_request_method"`
	HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout" cfg:"http_client_connect_timeout"`
	HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout" cfg:"http_client_request_timeout"`
	TopologyRegion           string        `flag:"topology-region"`
	TopologyZone             string        `flag:"topology-zone"`

	// diskqueue options
	DataPath        string        `flag:"data-path"`
	MemQueueSize    int64         `flag:"mem-queue-size"`
	MaxBytesPerFile int64         `flag:"max-bytes-per-file"`
	SyncEvery       int64         `flag:"sync-every"`
	SyncTimeout     time.Duration `flag:"sync-timeout"`

	// queue scan options; only the selection count and worker pool max
	// carry a flag tag
	QueueScanInterval        time.Duration
	QueueScanRefreshInterval time.Duration
	QueueScanSelectionCount  int `flag:"queue-scan-selection-count"`
	QueueScanWorkerPoolMax   int `flag:"queue-scan-worker-pool-max"`
	QueueScanDirtyPercent    float64

	// msg and command options
	MsgTimeout    time.Duration `flag:"msg-timeout"`
	MaxMsgTimeout time.Duration `flag:"max-msg-timeout"`
	MaxMsgSize    int64         `flag:"max-msg-size"`
	MaxBodySize   int64         `flag:"max-body-size"`
	MaxReqTimeout time.Duration `flag:"max-req-timeout"`
	ClientTimeout time.Duration

	// client overridable configuration options
	MaxHeartbeatInterval   time.Duration `flag:"max-heartbeat-interval"`
	MaxRdyCount            int64         `flag:"max-rdy-count"`
	MaxOutputBufferSize    int64         `flag:"max-output-buffer-size"`
	MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"`
	MinOutputBufferTimeout time.Duration `flag:"min-output-buffer-timeout"`
	OutputBufferTimeout    time.Duration `flag:"output-buffer-timeout"`
	MaxChannelConsumers    int           `flag:"max-channel-consumers"`

	// statsd integration
	StatsdAddress          string        `flag:"statsd-address"`
	StatsdPrefix           string        `flag:"statsd-prefix"`
	StatsdInterval         time.Duration `flag:"statsd-interval"`
	StatsdMemStats         bool          `flag:"statsd-mem-stats"`
	StatsdUDPPacketSize    int           `flag:"statsd-udp-packet-size"`
	StatsdExcludeEphemeral bool          `flag:"statsd-exclude-ephemeral"`

	// e2e message latency
	E2EProcessingLatencyWindowTime  time.Duration `flag:"e2e-processing-latency-window-time"`
	E2EProcessingLatencyPercentiles []float64     `flag:"e2e-processing-latency-percentile" cfg:"e2e_processing_latency_percentiles"`

	// TLS config
	TLSCert             string `flag:"tls-cert"`
	TLSKey              string `flag:"tls-key"`
	TLSClientAuthPolicy string `flag:"tls-client-auth-policy"`
	TLSRootCAFile       string `flag:"tls-root-ca-file"`
	TLSRequired         int    `flag:"tls-required"`
	TLSMinVersion       uint16 `flag:"tls-min-version"`

	// compression
	DeflateEnabled  bool `flag:"deflate"`
	MaxDeflateLevel int  `flag:"max-deflate-level"`
	SnappyEnabled   bool `flag:"snappy"`

	// experimental features; held as plain strings here, while
	// HasExperiment takes an Experiment value
	Experiments []string `flag:"enable-experiment" cfg:"enable_experiment"`
}

func NewOptions

func NewOptions() *Options

func (Options) HasExperiment

func (o Options) HasExperiment(e Experiment) bool

type PubCount

// PubCount pairs a topic name with a count, serialized to JSON under the
// keys "topic" and "count". It is the element type of
// ClientV2Stats.PubCounts.
type PubCount struct {
	Topic string `json:"topic"`
	Count uint64 `json:"count"`
}

type Stats

// Stats is the value returned by NSQD.GetStats: a list of per-topic
// statistics and a list of producer client statistics. The fields carry no
// JSON tags.
type Stats struct {
	Topics    []TopicStats
	Producers []ClientStats
}

type Topic

// Topic is the concrete type for a topic. Instances are created with
// NewTopic, and channels belonging to a topic are obtained through its
// GetChannel and GetExistingChannel methods.
type Topic struct {
	// Embedded, so the RWMutex methods (Lock, RLock, ...) are promoted
	// onto Topic itself.
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic

Topic constructor

func (*Topic) AggregateChannelE2eProcessingLatency

func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile

func (*Topic) Close

func (t *Topic) Close() error

Close persists all outstanding topic data and closes all its channels

func (*Topic) Delete

func (t *Topic) Delete() error

Delete empties the topic and all its channels and closes it.

func (*Topic) DeleteExistingChannel

func (t *Topic) DeleteExistingChannel(channelName string) error

DeleteExistingChannel removes a channel from the topic only if it exists

func (*Topic) Depth

func (t *Topic) Depth() int64

func (*Topic) Empty

func (t *Topic) Empty() error

func (*Topic) Exiting

func (t *Topic) Exiting() bool

Exiting returns a boolean indicating if this topic is closed/exiting

func (*Topic) GenerateID

func (t *Topic) GenerateID() MessageID

func (*Topic) GetChannel

func (t *Topic) GetChannel(channelName string) *Channel

GetChannel performs a thread safe operation to return a pointer to a Channel object (potentially new) for the given Topic

func (*Topic) GetExistingChannel

func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)

func (*Topic) IsPaused

func (t *Topic) IsPaused() bool

func (*Topic) Pause

func (t *Topic) Pause() error

func (*Topic) PutMessage

func (t *Topic) PutMessage(m *Message) error

PutMessage writes a Message to the queue

func (*Topic) PutMessages

func (t *Topic) PutMessages(msgs []*Message) error

PutMessages writes multiple Messages to the queue

func (*Topic) Start

func (t *Topic) Start()

func (*Topic) UnPause

func (t *Topic) UnPause() error

type TopicMetadata

// TopicMetadata is the collection of persistent information about a topic,
// including the metadata of its channels. It is the element type of
// Metadata.Topics.
type TopicMetadata struct {
	Name     string            `json:"name"`
	Paused   bool              `json:"paused"`
	Channels []ChannelMetadata `json:"channels"`
}

TopicMetadata is the collection of persistent information about a topic.

type TopicStats

// TopicStats is a JSON-serializable set of statistics for one topic,
// including the statistics of its channels. Values of this type are
// returned by NewTopicStats and appear as the elements of Stats.Topics.
type TopicStats struct {
	TopicName    string         `json:"topic_name"`
	Channels     []ChannelStats `json:"channels"`
	Depth        int64          `json:"depth"`
	BackendDepth int64          `json:"backend_depth"`
	MessageCount uint64         `json:"message_count"`
	MessageBytes uint64         `json:"message_bytes"`
	Paused       bool           `json:"paused"`

	// Pointer field without omitempty: a nil value is encoded as JSON null.
	E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
}

func NewTopicStats

func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats

type Topics

// Topics is a slice of *Topic. It declares Len and Swap but no Less, so on
// its own it does not satisfy sort.Interface; TopicsByName embeds it and
// adds Less.
type Topics []*Topic

func (Topics) Len

func (t Topics) Len() int

func (Topics) Swap

func (t Topics) Swap(i, j int)

type TopicsByName

// TopicsByName embeds Topics (inheriting Len and Swap) and adds a Less
// method, giving it the full sort.Interface method set.
// NOTE(review): the name suggests Less orders by topic name — confirm.
type TopicsByName struct {
	Topics
}

func (TopicsByName) Less

func (t TopicsByName) Less(i, j int) bool

type Uint64Slice

// Uint64Slice is a []uint64 that declares Len, Less and Swap, the method
// set required by sort.Interface.
// NOTE(review): Less presumably gives ascending order — confirm.
type Uint64Slice []uint64

func (Uint64Slice) Len

func (s Uint64Slice) Len() int

func (Uint64Slice) Less

func (s Uint64Slice) Less(i, j int) bool

func (Uint64Slice) Swap

func (s Uint64Slice) Swap(i, j int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL