server

package
v0.0.0-...-4ff9e67 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2014 License: MIT Imports: 34 Imported by: 0

Documentation

Overview

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Index

Constants

View Source
const KILOBYTE = 1024
View Source
const MAX_REQUEST_SIZE = MEGABYTE * 2
View Source
const MEGABYTE = 1024 * KILOBYTE

Variables

View Source
var (
	V311_MAGIC   = []byte("MQTT")
	V311_VERSION = uint8(4)
	V3_MAGIC     = []byte("MQIsdp")
	V3_VERSION   = uint8(3)
)

Functions

This section is empty.

Types

type Application

type Application struct {
	Engine  *Momonga
	Servers []Server
	// contains filtered or unexported fields
}

func NewApplication

func NewApplication(configPath string) *Application

func (*Application) Loop

func (self *Application) Loop()

func (*Application) RegisterServer

func (self *Application) RegisterServer(svr Server)

func (*Application) Start

func (self *Application) Start()

func (*Application) Stop

func (self *Application) Stop()

type DisconnectError

type DisconnectError struct {
}

func (*DisconnectError) Error

func (e *DisconnectError) Error() string

type DummyPlug

type DummyPlug struct {
	Identity string
	Switch   chan bool

	Running bool
	// contains filtered or unexported fields
}

func NewDummyPlug

func NewDummyPlug(engine *Momonga) *DummyPlug

func (*DummyPlug) AppendSubscribedTopic

func (self *DummyPlug) AppendSubscribedTopic(string, *SubscribeSet)

func (*DummyPlug) Close

func (self *DummyPlug) Close() error

func (*DummyPlug) DisableClearSession

func (self *DummyPlug) DisableClearSession()

func (*DummyPlug) GetGuid

func (self *DummyPlug) GetGuid() util.Guid

func (*DummyPlug) GetId

func (self *DummyPlug) GetId() string

func (*DummyPlug) GetOutGoingTable

func (self *DummyPlug) GetOutGoingTable() *util.MessageTable

func (*DummyPlug) GetRealId

func (self *DummyPlug) GetRealId() string

func (*DummyPlug) GetState

func (self *DummyPlug) GetState() State

func (*DummyPlug) GetSubscribedTopics

func (self *DummyPlug) GetSubscribedTopics() map[string]*SubscribeSet

func (*DummyPlug) GetWillMessage

func (self *DummyPlug) GetWillMessage() *mqtt.WillMessage

func (*DummyPlug) HasWillMessage

func (self *DummyPlug) HasWillMessage() bool

func (*DummyPlug) IsAlived

func (self *DummyPlug) IsAlived() bool

func (*DummyPlug) ReadMessage

func (self *DummyPlug) ReadMessage() (mqtt.Message, error)

func (*DummyPlug) RemoveSubscribedTopic

func (self *DummyPlug) RemoveSubscribedTopic(string)

func (*DummyPlug) ResetState

func (self *DummyPlug) ResetState()

func (*DummyPlug) Run

func (self *DummyPlug) Run()

func (*DummyPlug) SetGuid

func (self *DummyPlug) SetGuid(id util.Guid)

func (*DummyPlug) SetId

func (self *DummyPlug) SetId(id string)

func (*DummyPlug) SetKeepaliveInterval

func (self *DummyPlug) SetKeepaliveInterval(int)

func (*DummyPlug) SetState

func (self *DummyPlug) SetState(State)

func (*DummyPlug) SetWillMessage

func (self *DummyPlug) SetWillMessage(mqtt.WillMessage)

func (*DummyPlug) ShouldClearSession

func (self *DummyPlug) ShouldClearSession() bool

func (*DummyPlug) Stop

func (self *DummyPlug) Stop()

func (*DummyPlug) WriteMessageQueue

func (self *DummyPlug) WriteMessageQueue(request mqtt.Message)

func (*DummyPlug) WriteMessageQueue2

func (self *DummyPlug) WriteMessageQueue2(msg []byte)

type Handler

type Handler struct {
	Engine     *Momonga
	Connection Connection
}

Handler dispatches messages which sent by client. this struct will be use client library soon.

とかいいつつ、ackとかはhandlerで返してねーとか立ち位置分かりづらい Engine側でMQTTの基本機能を全部やれればいいんだけど、そうすると client library別にしないと無理なんだよなー。 目指すところとしては、基本部分はデフォルトのHandlerで動くから それで動かないところだけうわがいてね!って所。 Handler自体は受け渡ししかやらんのでlockしなくて大丈夫なはず

func NewHandler

func NewHandler(conn Connection, engine *Momonga) *Handler

func (*Handler) Close

func (self *Handler) Close()

func (*Handler) Disconnect

func (self *Handler) Disconnect()

func (*Handler) HandshakeInternal

func (self *Handler) HandshakeInternal(p *codec.ConnectMessage)

func (*Handler) Parsed

func (self *Handler) Parsed()

func (*Handler) Pingreq

func (self *Handler) Pingreq()

func (*Handler) Puback

func (self *Handler) Puback(messageId uint16)

func (*Handler) Pubcomp

func (self *Handler) Pubcomp(messageId uint16)

func (*Handler) Publish

func (self *Handler) Publish(p *codec.PublishMessage)

func (*Handler) Pubrec

func (self *Handler) Pubrec(messageId uint16)

func (*Handler) Pubrel

func (self *Handler) Pubrel(messageId uint16)

func (*Handler) Subscribe

func (self *Handler) Subscribe(p *codec.SubscribeMessage)

func (*Handler) Unsubscribe

func (self *Handler) Unsubscribe(messageId uint16, granted int, payloads []codec.SubscribePayload)

type HttpListener

type HttpListener struct {
	net.Listener
	WebSocketMount string
	// contains filtered or unexported fields
}

func NewHttpListener

func NewHttpListener(listener net.Listener) *HttpListener

func (*HttpListener) Accept

func (self *HttpListener) Accept() (c net.Conn, err error)

func (*HttpListener) Addr

func (self *HttpListener) Addr() net.Addr

func (*HttpListener) Close

func (self *HttpListener) Close() error

func (*HttpListener) File

func (self *HttpListener) File() (f *os.File, err error)

type HttpServer

type HttpServer struct {
	http.Server
	Engine  *Momonga
	Address string
	// contains filtered or unexported fields
}

func NewHttpServer

func NewHttpServer(engine *Momonga, config *configuration.Config, inherit bool) *HttpServer

func (*HttpServer) Graceful

func (self *HttpServer) Graceful()

func (*HttpServer) ListenAndServe

func (self *HttpServer) ListenAndServe() error

func (*HttpServer) Listener

func (self *HttpServer) Listener() Listener

func (*HttpServer) Serve

func (self *HttpServer) Serve(l net.Listener) error

func (*HttpServer) Stop

func (self *HttpServer) Stop()

type Listener

type Listener interface {
	Accept() (c net.Conn, err error)

	Close() error

	Addr() net.Addr

	File() (f *os.File, err error)
}

type MmuxConnection

type MmuxConnection struct {
	// Primary
	PrimaryConnection Connection
	OfflineQueue      []mqtt.Message
	Connections       map[string]Connection
	MaxOfflineQueue   int
	Identifier        string
	CleanSession      bool
	OutGoingTable     *util.MessageTable
	SubscribeMap      map[string]bool
	Created           time.Time
	Hash              uint32
	Mutex             sync.RWMutex
	SubscribedTopics  map[string]*SubscribeSet
	// contains filtered or unexported fields
}

MQTT Multiplexer Connection

TODO: 途中で死んだとき用のやつを追加する.もうちょい素敵な実装にしたい TODO: goroutine safeにする

Multiplexer、というかなんだろ。Engineとの仲介でおいとくやつ。 下のコネクションとかは純粋に接続周りだけにしておきたいんだけどなー

func NewMmuxConnection

func NewMmuxConnection() *MmuxConnection

func (*MmuxConnection) AppendSubscribedTopic

func (self *MmuxConnection) AppendSubscribedTopic(topic string, set *SubscribeSet)

func (*MmuxConnection) Attach

func (self *MmuxConnection) Attach(conn Connection)

func (*MmuxConnection) Close

func (self *MmuxConnection) Close() error

func (*MmuxConnection) Detach

func (self *MmuxConnection) Detach(conn Connection)

func (*MmuxConnection) DisableClearSession

func (self *MmuxConnection) DisableClearSession()

func (*MmuxConnection) GetGuid

func (self *MmuxConnection) GetGuid() util.Guid

func (*MmuxConnection) GetHash

func (self *MmuxConnection) GetHash() uint32

func (*MmuxConnection) GetId

func (self *MmuxConnection) GetId() string

func (*MmuxConnection) GetOutGoingTable

func (self *MmuxConnection) GetOutGoingTable() *util.MessageTable

func (*MmuxConnection) GetRealId

func (self *MmuxConnection) GetRealId() string

func (*MmuxConnection) GetState

func (self *MmuxConnection) GetState() State

func (*MmuxConnection) GetSubscribedTopics

func (self *MmuxConnection) GetSubscribedTopics() map[string]*SubscribeSet

func (*MmuxConnection) GetWillMessage

func (self *MmuxConnection) GetWillMessage() *mqtt.WillMessage

func (*MmuxConnection) HasWillMessage

func (self *MmuxConnection) HasWillMessage() bool

func (*MmuxConnection) IsAlived

func (self *MmuxConnection) IsAlived() bool

func (*MmuxConnection) IsSubscribed

func (self *MmuxConnection) IsSubscribed(topic string) bool

func (*MmuxConnection) ReadMessage

func (self *MmuxConnection) ReadMessage() (mqtt.Message, error)

func (*MmuxConnection) RemoveSubscribedTopic

func (self *MmuxConnection) RemoveSubscribedTopic(topic string)

func (*MmuxConnection) ResetState

func (self *MmuxConnection) ResetState()

func (*MmuxConnection) SetGuid

func (self *MmuxConnection) SetGuid(id util.Guid)

func (*MmuxConnection) SetId

func (self *MmuxConnection) SetId(id string)

func (*MmuxConnection) SetKeepaliveInterval

func (self *MmuxConnection) SetKeepaliveInterval(interval int)

func (*MmuxConnection) SetState

func (self *MmuxConnection) SetState(state State)

func (*MmuxConnection) SetWillMessage

func (self *MmuxConnection) SetWillMessage(msg mqtt.WillMessage)

func (*MmuxConnection) ShouldClearSession

func (self *MmuxConnection) ShouldClearSession() bool

func (*MmuxConnection) WriteMessageQueue

func (self *MmuxConnection) WriteMessageQueue(request mqtt.Message)

type Momonga

type Momonga struct {
	OutGoingTable *util.MessageTable
	InflightTable map[string]*util.MessageTable
	TopicMatcher  TopicMatcher
	// TODO: improve this.
	Connections  map[string]*MmuxConnection
	RetryMap     map[string][]*Retryable
	ErrorChannel chan *Retryable
	System       System
	EnableSys    bool
	Started      time.Time
	DataStore    datastore.Datastore
	LockPool     map[uint32]*sync.RWMutex
	// contains filtered or unexported fields
}

goroutine (2)

RunMaintenanceThread
Run

func NewMomonga

func NewMomonga(config *configuration.Config) *Momonga

QoS 1, 2 are available. but really suck implementation. reconsider qos design later.

func (*Momonga) CleanSubscription

func (self *Momonga) CleanSubscription(conn Connection)

func (*Momonga) Config

func (self *Momonga) Config() *configuration.Config

func (*Momonga) DisableSys

func (self *Momonga) DisableSys()

func (*Momonga) Doom

func (self *Momonga) Doom()

func (*Momonga) GetConnectionByClientId

func (self *Momonga) GetConnectionByClientId(clientId string) (*MmuxConnection, error)

func (*Momonga) HandleConnection

func (self *Momonga) HandleConnection(conn Connection)

func (*Momonga) Handshake

func (self *Momonga) Handshake(p *codec.ConnectMessage, conn *MyConnection) *MmuxConnection

func (*Momonga) RemoveConnectionByClientId

func (self *Momonga) RemoveConnectionByClientId(clientId string)

func (*Momonga) RetainMatch

func (self *Momonga) RetainMatch(topic string) []*codec.PublishMessage

TODO: wanna implement trie. but regexp works well. retain should persist their data. though, how do we fetch it efficiently...

func (*Momonga) Run

func (self *Momonga) Run()

func (*Momonga) RunMaintenanceThread

func (self *Momonga) RunMaintenanceThread()

below methods are intend to maintain engine itself (remove needless connection, dispatch queue).

func (*Momonga) SendMessage

func (self *Momonga) SendMessage(topic string, message []byte, qos int)

func (*Momonga) SendPublishMessage

func (self *Momonga) SendPublishMessage(msg *codec.PublishMessage)

func (*Momonga) SendWillMessage

func (self *Momonga) SendWillMessage(conn Connection)

func (*Momonga) SetConnectionByClientId

func (self *Momonga) SetConnectionByClientId(clientId string, conn *MmuxConnection)

func (*Momonga) Subscribe

func (self *Momonga) Subscribe(p *codec.SubscribeMessage, conn Connection)

func (*Momonga) Terminate

func (self *Momonga) Terminate()

func (*Momonga) Unsubscribe

func (self *Momonga) Unsubscribe(messageId uint16, granted int, payloads []codec.SubscribePayload, conn Connection)

func (*Momonga) Work

func (self *Momonga) Work()

type MyBroker

type MyBroker struct {
	Clients            MyClients
	Messages           MyMessages
	Load               MyLoad
	SubscriptionsCount *expvar.Int
	Uptime             *expvar.Int
}

type MyClients

type MyClients struct {
	Connected    *expvar.Int
	Total        *expvar.Int
	Maximum      *expvar.Int
	Disconnected *expvar.Int
}

type MyConn

type MyConn struct {
	net.Conn
	// contains filtered or unexported fields
}

func (*MyConn) Close

func (self *MyConn) Close() error

type MyHttpServer

type MyHttpServer struct {
	Engine         *Momonga
	WebSocketMount string
}

func (*MyHttpServer) ServeHTTP

func (self *MyHttpServer) ServeHTTP(w http.ResponseWriter, req *http.Request)

type MyListener

type MyListener struct {
	net.Listener
	// contains filtered or unexported fields
}

func (*MyListener) Accept

func (self *MyListener) Accept() (net.Conn, error)

func (*MyListener) File

func (self *MyListener) File() (f *os.File, err error)

type MyLoad

type MyLoad struct {
	BytesSend     *expvar.Int
	BytesReceived *expvar.Int
}

type MyMessages

type MyMessages struct {
	Received       *expvar.Int
	Sent           *expvar.Int
	Stored         *expvar.Int
	PublishDropped *expvar.Int
	RetainedCount  *expvar.Int
}

type MyMetrics

type MyMetrics struct {
	System MySystem

	NumGoroutine      *expvar.Int
	NumCgoCall        *expvar.Int
	Uptime            *expvar.Int
	MemFree           *expvar.Int
	MemUsed           *expvar.Int
	MemActualFree     *expvar.Int
	MemActualUsed     *expvar.Int
	MemTotal          *expvar.Int
	LoadOne           *expvar.Float
	LoadFive          *expvar.Float
	LoadFifteen       *expvar.Float
	CpuUser           *expvar.Float
	CpuNice           *expvar.Float
	CpuSys            *expvar.Float
	CpuIdle           *expvar.Float
	CpuWait           *expvar.Float
	CpuIrq            *expvar.Float
	CpuSoftIrq        *expvar.Float
	CpuStolen         *expvar.Float
	CpuTotal          *expvar.Float
	MessageSentPerSec *myexpvar.DiffInt
	ConnectPerSec     *myexpvar.DiffInt
	GoroutinePerConn  *expvar.Float
}
var Metrics *MyMetrics = &MyMetrics{
	System: MySystem{
		Broker: MyBroker{
			Clients: MyClients{
				Connected:    expvar.NewInt("sys.broker.clients.connected"),
				Total:        expvar.NewInt("sys.broker.clients.total"),
				Maximum:      expvar.NewInt("sys.broker.clients.maximum"),
				Disconnected: expvar.NewInt("sys.broker.clients.disconnected"),
			},
			Uptime: expvar.NewInt("sys.broker.uptime"),
			Messages: MyMessages{
				Received:       expvar.NewInt("sys.broker.messages.received"),
				Sent:           expvar.NewInt("sys.broker.messages.sent"),
				Stored:         expvar.NewInt("sys.broker.messages.stored"),
				PublishDropped: expvar.NewInt("sys.broker.messages.publish.dropped"),
				RetainedCount:  expvar.NewInt("sys.broker.messages.retained.count"),
			},
			Load: MyLoad{
				BytesSend:     expvar.NewInt("sys.broker.load.bytes_send"),
				BytesReceived: expvar.NewInt("sys.broker.load.bytes_received"),
			},
			SubscriptionsCount: expvar.NewInt("sys.broker.subscriptions.count"),
		},
	},

	NumGoroutine:  expvar.NewInt("numgoroutine"),
	NumCgoCall:    expvar.NewInt("numcgocall"),
	Uptime:        expvar.NewInt("uptime"),
	MemFree:       expvar.NewInt("memfree"),
	MemUsed:       expvar.NewInt("memused"),
	MemActualFree: expvar.NewInt("memactualfree"),
	MemActualUsed: expvar.NewInt("memactualused"),
	MemTotal:      expvar.NewInt("memtotal"),
	LoadOne:       expvar.NewFloat("loadone"),
	LoadFive:      expvar.NewFloat("loadfive"),
	LoadFifteen:   expvar.NewFloat("loadfifteen"),
	CpuUser:       expvar.NewFloat("cpuuser"),
	CpuNice:       expvar.NewFloat("cpunice"),
	CpuSys:        expvar.NewFloat("cpusys"),
	CpuIdle:       expvar.NewFloat("cpuidle"),
	CpuWait:       expvar.NewFloat("cpuwait"),
	CpuIrq:        expvar.NewFloat("cpuirq"),
	CpuSoftIrq:    expvar.NewFloat("cpusoftirq"),
	CpuStolen:     expvar.NewFloat("cpustolen"),
	CpuTotal:      expvar.NewFloat("cputotal"),

	MessageSentPerSec: myexpvar.NewDiffInt("msg_sent_per_sec"),
	ConnectPerSec:     myexpvar.NewDiffInt("connect_per_sec"),
	GoroutinePerConn:  expvar.NewFloat("goroutine_per_conn"),
}

TODO: should not use expvar as we can't hold multiple MyMetrics metrics.

type MySystem

type MySystem struct {
	Broker MyBroker
}

type Retryable

type Retryable struct {
	Id      string
	Payload interface{}
}

TODO: haven't used this yet.

type Server

type Server interface {
	ListenAndServe() error
	//ListenAndServeTLS(certFile, keyFile string) error
	Serve(l net.Listener) error

	Graceful()

	Stop()

	Listener() Listener
}

type System

type System struct {
	Broker SystemBroker
}

NOTE: $SYS structures

type SystemBroker

type SystemBroker struct {
	Load          SystemBrokerLoad
	Clients       SystemBrokerClients
	Messages      SystemBrokerMessages
	Subscriptions SystemBrokerSubscriptions
	Broker        SystemBrokerBroker
}

type SystemBrokerBroker

type SystemBrokerBroker struct {
	Time    int
	Uptime  int
	Version string
}

type SystemBrokerClients

type SystemBrokerClients struct {
	Connected    int
	Disconnected int
	Maximum      int
	Total        int
}

type SystemBrokerLoad

type SystemBrokerLoad struct {
	Bytes SystemBrokerLoadBytes
}

type SystemBrokerLoadBytes

type SystemBrokerLoadBytes struct {
	Received int
	Sent     int
}

type SystemBrokerMessages

type SystemBrokerMessages struct {
	Inflight int
	Received int
	Sent     int
	Stored   int
	Publish  SystemBrokerMessagesPublish
	Retained SystemBrokerMessagesRetained
}

type SystemBrokerMessagesPublish

type SystemBrokerMessagesPublish struct {
	Sent  int
	Count int
}

type SystemBrokerMessagesRetained

type SystemBrokerMessagesRetained struct {
	Count int
}

type SystemBrokerSubscriptions

type SystemBrokerSubscriptions struct {
	Count int
}

type TcpServer

type TcpServer struct {
	ListenAddress string
	Engine        *Momonga
	// contains filtered or unexported fields
}

func NewTcpServer

func NewTcpServer(engine *Momonga, config *configuration.Config, inherit bool) *TcpServer

func (*TcpServer) Graceful

func (self *TcpServer) Graceful()

func (*TcpServer) ListenAndServe

func (self *TcpServer) ListenAndServe() error

func (*TcpServer) Listener

func (self *TcpServer) Listener() Listener

func (*TcpServer) Serve

func (self *TcpServer) Serve(l net.Listener) error

func (*TcpServer) Stop

func (self *TcpServer) Stop()

type TopicMatcher

type TopicMatcher interface {
	// TODO: should force []*SubscribeSet
	Match(Topic string) []interface{}
	Add(Topic string, Value interface{})
	Remove(Topic string, val interface{})
	Dump(writer io.Writer)
}

type UnixServer

type UnixServer struct {
	Engine  *Momonga
	Address string
	// contains filtered or unexported fields
}

func NewUnixServer

func NewUnixServer(engine *Momonga, config *configuration.Config, inherit bool) *UnixServer

func (*UnixServer) Graceful

func (self *UnixServer) Graceful()

func (*UnixServer) ListenAndServe

func (self *UnixServer) ListenAndServe() error

func (*UnixServer) Listener

func (self *UnixServer) Listener() Listener

func (*UnixServer) Serve

func (self *UnixServer) Serve(l net.Listener) error

func (*UnixServer) Stop

func (self *UnixServer) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL