network

package
v0.0.0-...-9f791b1
Warning

This package is not in the latest version of its module.

Published: Jun 22, 2022 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

const (
	// ONLINE is the network state "online".
	ONLINE = "online"
	// UNAVAILABLE is the network state "unavailable".
	UNAVAILABLE = "unavailable"
	// OFFLINE is the network state "offline".
	OFFLINE = "offline"
	// UNKNOWN is the network state for an unknown or any other status.
	UNKNOWN = "unknown"
	// ROOMPREFIX is the room name prefix.
	ROOMPREFIX = "photon"
	// ROOMSEP is the separator ("_") between the parts of a room name.
	ROOMSEP = "_"
	// PATHPREFIX0 is the path prefix of the latest matrix client API version.
	PATHPREFIX0 = "/_matrix/client/r0"
	// LOGINTYPE is the login type used.
	LOGINTYPE = "m.login.password"
	// CHATPRESET is the chat preset type (public).
	CHATPRESET = "public_chat"
	// EventAddressRoom is a user-defined event type.
	EventAddressRoom = "network.photon.rooms"
)
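
PATHPREFIX0 and LOGINTYPE line up with the Matrix client-server r0 API, so the following is a minimal sketch of how a password login request could be assembled from them. The homeserver URL, user name and password are hypothetical, the `user` and `password` field names follow the r0 login endpoint as commonly documented, and nothing here is taken from this package's own implementation. The request is only built, not sent.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Values copied from the constants above.
const (
	pathPrefix0 = "/_matrix/client/r0"
	loginType   = "m.login.password"
)

// loginRequest holds the basic fields of a password login body (assumed shape).
type loginRequest struct {
	Type     string `json:"type"`
	User     string `json:"user"`
	Password string `json:"password"`
}

// buildLogin returns the URL and JSON body for a password login.
// homeserver, user and password are hypothetical inputs.
func buildLogin(homeserver, user, password string) (string, []byte, error) {
	body, err := json.Marshal(loginRequest{Type: loginType, User: user, Password: password})
	if err != nil {
		return "", nil, err
	}
	return homeserver + pathPrefix0 + "/login", body, nil
}

func main() {
	url, body, err := buildLogin("https://matrix.example.org", "alice", "secret")
	if err != nil {
		panic(err)
	}
	fmt.Println(url)
	fmt.Println(string(body))
}
```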
const DefaultTemporaryPeerTimeout = time.Second * 30

DefaultTemporaryPeerTimeout is how long a peer may go without sending a new message before it is removed.

Variables

var (
	// ValidUserIDRegex is the format of a valid user ID.
	ValidUserIDRegex = regexp.MustCompile(`^@(0x[0-9a-f]{40})(?:\.[0-9a-f]{8})?(?::.+)?$`) //(`^[0-9a-z_\-./]+$`)
	// NETWORKNAME is the name of the network in use.
	NETWORKNAME = params.NETWORKNAME
	// ALIASFRAGMENT is the terminal part of an alias.
	ALIASFRAGMENT = params.AliasFragment
	// DISCOVERYROOMSERVER is the discovery room server name.
	DISCOVERYROOMSERVER = params.DiscoveryServer
)
var DeviceTypeMeshBox = xmpptransport.TypeMeshBox

DeviceTypeMeshBox is the device type of a Photon node running on a meshbox.

var DeviceTypeMobile = xmpptransport.TypeMobile

DeviceTypeMobile is the device type of a Photon node running on a mobile phone.

var DeviceTypeOther = xmpptransport.TypeOtherDevice

DeviceTypeOther is the device type to use when the type is unknown and the device is not a mobile phone.

Functions

func New

func New(sample interface{}) interface{}

New creates a new object from sample.

func SubscribeNeighbor

func SubscribeNeighbor(p *PhotonProtocol, addr common.Address) error

SubscribeNeighbor subscribes to a neighbor's online and offline status.

Types

type ChannelStatusGetter

type ChannelStatusGetter interface {
	GetChannelStatus(channelIdentifier common.Hash) (int, int64)
}

ChannelStatusGetter gets the status of a channel, so that the sender can remove messages based on the channel status.

For example:
	A sends B a mediated transfer, but B is offline.
	When B comes back online, the transfer is no longer valid, so A will never receive an ack.
	If A removes this message, the channel cannot be used any more.
	But if A does not remove it, then when A settles/withdraws/reopens the channel with B, this message makes the new channel unusable too.
	So A needs to remove the message when the channel status changes.
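
The scenario above can be sketched as a pruning step over queued messages. Everything except the interface shape is an assumption: `hash` stands in for `common.Hash`, `stateOpened` is a hypothetical status code, `pendingMsg` is a made-up queue entry, and the meaning of the second return value is not documented here, so the sketch ignores it.

```go
package main

import "fmt"

// hash stands in for common.Hash to keep the sketch stdlib-only.
type hash [32]byte

// channelStatusGetter has the same shape as ChannelStatusGetter above.
type channelStatusGetter interface {
	GetChannelStatus(channelIdentifier hash) (int, int64)
}

// stateOpened is a hypothetical status code meaning "channel is usable".
const stateOpened = 1

// pendingMsg is a hypothetical queued message tied to one channel.
type pendingMsg struct {
	channel hash
	payload string
}

// prune keeps only the messages whose channel is still reported as opened.
func prune(queue []pendingMsg, g channelStatusGetter) []pendingMsg {
	kept := queue[:0:0] // zero-capacity slice, so append never overwrites queue
	for _, m := range queue {
		if status, _ := g.GetChannelStatus(m.channel); status == stateOpened {
			kept = append(kept, m)
		}
	}
	return kept
}

// fakeGetter maps channel identifiers to status codes for the demo.
type fakeGetter map[hash]int

func (f fakeGetter) GetChannelStatus(id hash) (int, int64) { return f[id], 0 }

func main() {
	open, settled := hash{1}, hash{2}
	g := fakeGetter{open: stateOpened, settled: 3}
	queue := []pendingMsg{{open, "transfer-1"}, {settled, "transfer-2"}}
	for _, m := range prune(queue, g) {
		fmt.Println(m.payload)
	}
}
```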

type MatrixMixTransport

type MatrixMixTransport struct {
	// contains filtered or unexported fields
}

MatrixMixTransport is a wrapper for two Transporters (UDP and Matrix): if the node can be reached by UDP, UDP is used; otherwise Matrix is tried.

func NewMatrixMixTransporter

func NewMatrixMixTransporter(name, host string, port int, key *ecdsa.PrivateKey, protocol ProtocolReceiver, policy Policier, deviceType string) (t *MatrixMixTransport, err error)

NewMatrixMixTransporter creates a MatrixMixTransport and performs discovery.

func (*MatrixMixTransport) GetNotify

func (t *MatrixMixTransport) GetNotify() (notify <-chan netshare.Status, err error)

GetNotify returns a channel that delivers notifications of connection status changes.

func (*MatrixMixTransport) NodeStatus

func (t *MatrixMixTransport) NodeStatus(addr common.Address) (deviceType string, isOnline bool)

NodeStatus returns the node's device type and whether it is online right now.

func (*MatrixMixTransport) RegisterProtocol

func (t *MatrixMixTransport) RegisterProtocol(protcol ProtocolReceiver)

RegisterProtocol registers the receiver for both transporters.

func (*MatrixMixTransport) RegisterWakeUpChan

func (t *MatrixMixTransport) RegisterWakeUpChan(addr common.Address, c chan int)

RegisterWakeUpChan :

func (*MatrixMixTransport) Send

func (t *MatrixMixTransport) Send(receiver common.Address, data []byte) error

Send sends a message. It prefers the LAN; only when the LAN route does not work does it fall back to matrix.
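
The LAN-first rule can be sketched with two interchangeable senders. This is not the package's code: the `sender` interface is a reduced, hypothetical version of Transporter that uses plain strings for addresses, and choosing the route by asking the LAN transport whether the receiver is online is an assumed selection rule.

```go
package main

import (
	"errors"
	"fmt"
)

// sender is a reduced, hypothetical version of the Transporter interface.
type sender interface {
	Send(receiver string, data []byte) error
	NodeStatus(receiver string) (deviceType string, isOnline bool)
}

// mix prefers lan and falls back to fallback.
type mix struct {
	lan, fallback sender
}

// Send uses the LAN transport when it reports the receiver as online,
// and the fallback transport otherwise.
func (m mix) Send(receiver string, data []byte) error {
	if _, online := m.lan.NodeStatus(receiver); online {
		return m.lan.Send(receiver, data)
	}
	return m.fallback.Send(receiver, data)
}

// fake records sends and knows a fixed set of reachable receivers.
type fake struct {
	name      string
	reachable map[string]bool
	log       *[]string
}

func (f fake) NodeStatus(r string) (string, bool) { return "other", f.reachable[r] }

func (f fake) Send(r string, data []byte) error {
	if !f.reachable[r] {
		return errors.New(f.name + ": unreachable " + r)
	}
	*f.log = append(*f.log, f.name+"->"+r)
	return nil
}

func main() {
	var log []string
	m := mix{
		lan:      fake{"udp", map[string]bool{"near": true}, &log},
		fallback: fake{"matrix", map[string]bool{"near": true, "far": true}, &log},
	}
	fmt.Println(m.Send("near", []byte("hi")), m.Send("far", []byte("hi")))
	fmt.Println(log) // the LAN is used for "near", the fallback for "far"
}
```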

func (*MatrixMixTransport) SetMatrixDB

func (t *MatrixMixTransport) SetMatrixDB(db xmpptransport.XMPPDb) error

SetMatrixDB gets the status change notifications of partner nodes.

func (*MatrixMixTransport) Start

func (t *MatrixMixTransport) Start()

Start starts both transporters.

func (*MatrixMixTransport) Stop

func (t *MatrixMixTransport) Stop()

Stop stops both transporters.

func (*MatrixMixTransport) StopAccepting

func (t *MatrixMixTransport) StopAccepting()

StopAccepting stops receiving on both transporters.

func (*MatrixMixTransport) UnRegisterWakeUpChan

func (t *MatrixMixTransport) UnRegisterWakeUpChan(addr common.Address)

UnRegisterWakeUpChan :

type MatrixPeer

type MatrixPeer struct {
	// contains filtered or unexported fields
}

MatrixPeer is a photon node on the matrix server.

func NewMatrixPeer

func NewMatrixPeer(address common.Address, hasChannel bool, removeChan chan<- common.Address) *MatrixPeer

NewMatrixPeer creates a matrix user.

type MatrixTransport

type MatrixTransport struct {
	NodeAddress common.Address

	Peers map[common.Address]*MatrixPeer

	UserID         string //the current user's ID(@kitty:thisserver)
	NodeDeviceType string
	// contains filtered or unexported fields
}

MatrixTransport represents an instance of a matrix transport.

func NewMatrixTransport

func NewMatrixTransport(logname string, key *ecdsa.PrivateKey, devicetype string, servers map[string]string) *MatrixTransport

NewMatrixTransport initializes a matrix transport.

func (*MatrixTransport) HandleMessage

func (m *MatrixTransport) HandleMessage(from common.Address, data []byte)

HandleMessage registers the interface for calling receive(func).

func (*MatrixTransport) NodeStatus

func (m *MatrixTransport) NodeStatus(addr common.Address) (deviceType string, isOnline bool)

NodeStatus gets the node's network state. When checking the node itself, the reported status is not always true; instead it switches according to the server handshake signal.

func (*MatrixTransport) RegisterProtocol

func (m *MatrixTransport) RegisterProtocol(protcol ProtocolReceiver)

RegisterProtocol registers the interface for calling RegisterProtocol(func).

func (*MatrixTransport) RegisterWakeUpChan

func (m *MatrixTransport) RegisterWakeUpChan(addr common.Address, c chan int)

RegisterWakeUpChan :

func (*MatrixTransport) Send

func (m *MatrixTransport) Send(receiverAddr common.Address, data []byte) error

Send sends a message.

func (*MatrixTransport) Start

func (m *MatrixTransport) Start()

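Start starts matrix. In the background it keeps retrying login, registration, and initialization of the related information. If the network connection is normal, Start returns only after login and initialization have completed. If the network connection is abnormal, it returns immediately and keeps retrying in the background.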

func (*MatrixTransport) Stop

func (m *MatrixTransport) Stop()

Stop stops the transport. Open question: does Stop need to destroy matrix resources?

func (*MatrixTransport) StopAccepting

func (m *MatrixTransport) StopAccepting()

StopAccepting stops receiving messages and waits.

func (*MatrixTransport) UnRegisterWakeUpChan

func (m *MatrixTransport) UnRegisterWakeUpChan(addr common.Address)

UnRegisterWakeUpChan :

type MessageToPhoton

type MessageToPhoton struct {
	Msg      encoding.SignedMessager
	EchoHash common.Hash
}

MessageToPhoton is a message and its echo hash.

type MixTransport

type MixTransport struct {
	// contains filtered or unexported fields
}

MixTransport is a wrapper for two Transporters (UDP and XMPP): if the node can be reached by UDP, UDP is used; otherwise XMPP is tried.

func MakeTestMixTransport

func MakeTestMixTransport(name string, key *ecdsa.PrivateKey) *MixTransport

MakeTestMixTransport creates a test mix transport.

func NewMixTranspoter

func NewMixTranspoter(name, xmppServer, host string, port int, key *ecdsa.PrivateKey, protocol ProtocolReceiver, policy Policier, deviceType string) (t *MixTransport, err error)

NewMixTranspoter creates a MixTransport and performs discovery.

func (*MixTransport) GetNotify

func (t *MixTransport) GetNotify() (notify <-chan netshare.Status, err error)

GetNotify returns a channel that delivers notifications of connection status changes.

func (*MixTransport) NodeStatus

func (t *MixTransport) NodeStatus(addr common.Address) (deviceType string, isOnline bool)

NodeStatus returns the node's device type and whether it is online right now.

func (*MixTransport) Reconnect

func (t *MixTransport) Reconnect()

Reconnect :

func (*MixTransport) RegisterProtocol

func (t *MixTransport) RegisterProtocol(protcol ProtocolReceiver)

RegisterProtocol registers the receiver for both transporters.

func (*MixTransport) Send

func (t *MixTransport) Send(receiver common.Address, data []byte) error

Send sends out messages. It prefers the LAN; only if the local network does not work does it choose xmpp.

func (*MixTransport) Start

func (t *MixTransport) Start()

Start starts both transporters.

func (*MixTransport) Stop

func (t *MixTransport) Stop()

Stop stops both transporters.

func (*MixTransport) StopAccepting

func (t *MixTransport) StopAccepting()

StopAccepting stops receiving on both transporters.

func (*MixTransport) SubscribeNeighbor

func (t *MixTransport) SubscribeNeighbor(db xmpptransport.XMPPDb) error

SubscribeNeighbor gets the status change notifications of partner nodes.

type NodeInfo

type NodeInfo struct {
	Address    string `json:"address"`
	IPPort     string `json:"ip_port"`
	DeviceType string `json:"device_type"` // must be mobile?
}

NodeInfo is the node information obtained from the user.
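
Given the JSON tags on NodeInfo, a node list supplied by a user could be decoded as in this sketch. The struct is copied from above; `parseNodes` and the sample values (address, IP and port, device type string) are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// NodeInfo is copied from the type above.
type NodeInfo struct {
	Address    string `json:"address"`
	IPPort     string `json:"ip_port"`
	DeviceType string `json:"device_type"`
}

// parseNodes decodes a JSON array of node descriptions.
func parseNodes(raw []byte) ([]*NodeInfo, error) {
	var nodes []*NodeInfo
	if err := json.Unmarshal(raw, &nodes); err != nil {
		return nil, err
	}
	return nodes, nil
}

func main() {
	// Made-up values for illustration.
	raw := []byte(`[{"address":"0x0000000000000000000000000000000000000001","ip_port":"192.168.0.10:40001","device_type":"mobile"}]`)
	nodes, err := parseNodes(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(nodes[0].Address, nodes[0].IPPort, nodes[0].DeviceType)
}
```

A slice of this shape matches the `[]*NodeInfo` parameter of UpdateMeshNetworkNodes.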

type PhotonProtocol

type PhotonProtocol struct {
	Transport Transporter

	SentHashesToChannel map[common.Hash]*SentMessageState

	/*
		messages from other nodes
	*/
	ReceivedMessageChan chan *MessageToPhoton
	/*
		this is a synchronous chan for reading the message-processing result from photon
	*/
	ReceivedMessageResultChan chan error

	ChannelStatusGetter ChannelStatusGetter
	// contains filtered or unexported fields
}

PhotonProtocol is a UDP protocol; every message needs an ack to confirm that it was sent successfully.

func MakeTestDiscardExpiredTransferPhotonProtocol

func MakeTestDiscardExpiredTransferPhotonProtocol(name string) *PhotonProtocol

MakeTestDiscardExpiredTransferPhotonProtocol is for tests only.

func MakeTestPhotonProtocol

func MakeTestPhotonProtocol(name string) *PhotonProtocol

MakeTestPhotonProtocol is for tests only.

func NewPhotonProtocol

func NewPhotonProtocol(transport Transporter, privKey *ecdsa.PrivateKey, channelStatusGetter ChannelStatusGetter) *PhotonProtocol

NewPhotonProtocol creates a PhotonProtocol.

func (*PhotonProtocol) CreateAck

func (p *PhotonProtocol) CreateAck(echohash common.Hash) *encoding.Ack

CreateAck creates an ack message.

func (*PhotonProtocol) GetNetworkStatus

func (p *PhotonProtocol) GetNetworkStatus(addr common.Address) (deviceType string, isOnline bool)

GetNetworkStatus returns the network status of the node at `addr`.

func (*PhotonProtocol) SendAndWait

func (p *PhotonProtocol) SendAndWait(receiver common.Address, msg encoding.Messager, timeout time.Duration) error

SendAndWait sends this packet and waits for the ack until the timeout expires.

func (*PhotonProtocol) SendAsync

func (p *PhotonProtocol) SendAsync(receiver common.Address, msg encoding.Messager) *utils.AsyncResult

SendAsync sends a message asynchronously; the result is reported through `AsyncResult`.

func (*PhotonProtocol) SendPing

func (p *PhotonProtocol) SendPing(receiver common.Address) error

SendPing implements PingSender.

func (*PhotonProtocol) SetReceivedMessageSaver

func (p *PhotonProtocol) SetReceivedMessageSaver(saver ReceivedMessageSaver)

SetReceivedMessageSaver sets the db saver.

func (*PhotonProtocol) Start

func (p *PhotonProtocol) Start(receive bool)

Start starts the photon protocol.

func (*PhotonProtocol) StartReceive

func (p *PhotonProtocol) StartReceive()

StartReceive starts the event loop if it has not been started yet; otherwise it crashes.

func (*PhotonProtocol) StopAndWait

func (p *PhotonProtocol) StopAndWait()

StopAndWait stops and waits for cleanup.

func (*PhotonProtocol) UpdateMeshNetworkNodes

func (p *PhotonProtocol) UpdateMeshNetworkNodes(nodes []*NodeInfo) error

UpdateMeshNetworkNodes updates the nodes in this intranet.

type PingSender

type PingSender interface {
	//SendPing sends a ping to receiver and does not block
	SendPing(receiver common.Address) error
}

PingSender performs the task of sending pings.

type Policier

type Policier interface {
	//Consume consumes tokens.
	//Args:
	//tokens (float64): number of transport tokens to consume
	//Returns:
	//wait time (time.Duration): waiting time for the consumer
	Consume(tokens float64) time.Duration
}

Policier controls the sending speed of a transporter.

type ProtocolReceiver

type ProtocolReceiver interface {
	// contains filtered or unexported methods
}

ProtocolReceiver receives messages.

type ReceivedMessageSaver

type ReceivedMessageSaver interface {
	//GetAck returns nil if not found; call this before the message is sent
	GetAck(echohash common.Hash) []byte
	//SaveAck marks that the ack has been sent
	SaveAck(echohash common.Hash, msg encoding.Messager, ack []byte)
}

ReceivedMessageSaver is designed for ignoring duplicated messages.
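
Duplicate suppression with a saved ack can be sketched as follows. The in-memory `memSaver` mirrors the two operations of ReceivedMessageSaver but drops the message argument for brevity; `hash` stands in for `common.Hash`, and using SHA-256 of the payload as the echo hash is an assumption made only for this example.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// hash stands in for common.Hash.
type hash [32]byte

// memSaver is a hypothetical in-memory ack store.
type memSaver struct{ acks map[hash][]byte }

func (s *memSaver) GetAck(echoHash hash) []byte       { return s.acks[echoHash] }
func (s *memSaver) SaveAck(echoHash hash, ack []byte) { s.acks[echoHash] = ack }

// receiver processes each distinct message once and replays the stored ack
// for duplicates.
type receiver struct {
	saver     *memSaver
	processed int
}

// handle returns the ack for data and whether data was a duplicate.
func (r *receiver) handle(data []byte) (ack []byte, duplicate bool) {
	echo := hash(sha256.Sum256(data)) // assumed echo-hash scheme
	if saved := r.saver.GetAck(echo); saved != nil {
		return saved, true // already handled: resend the same ack
	}
	r.processed++ // first delivery: do the real work once
	ack = append([]byte("ack:"), echo[:4]...)
	r.saver.SaveAck(echo, ack)
	return ack, false
}

func main() {
	r := &receiver{saver: &memSaver{acks: map[hash][]byte{}}}
	_, dup1 := r.handle([]byte("transfer"))
	_, dup2 := r.handle([]byte("transfer")) // retransmission of the same bytes
	fmt.Println(dup1, dup2, r.processed)
}
```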

type SafeUDPConnection

type SafeUDPConnection struct {
	*net.UDPConn
}

SafeUDPConnection is a UDP connection with a lock.

func NewSafeUDPConnection

func NewSafeUDPConnection(protocol string, laddr *net.UDPAddr) (*SafeUDPConnection, error)

NewSafeUDPConnection creates a UDP connection.

func (*SafeUDPConnection) WriteTo

func (su *SafeUDPConnection) WriteTo(b []byte, addr net.Addr) (n int, err error)

WriteTo writes b to addr. Only WriteTo needs lock protection.

type SentMessageState

type SentMessageState struct {
	AsyncResult     *utils.AsyncResult
	AckChannel      chan error
	ReceiverAddress common.Address
	Success         bool

	Message  encoding.Messager //message to send
	EchoHash common.Hash       //message echo hash
	Data     []byte            //packed message
}

SentMessageState is the state of a message that is being sent.

type TokenBucket

type TokenBucket struct {
	Capacity float64
	FillRate float64
	Tokens   float64

	Timestamp time.Time
	// contains filtered or unexported fields
}

TokenBucket is an implementation of the token bucket throttling algorithm.

func NewTokenBucket

func NewTokenBucket(capacity, fillRate float64, timeFunc ...timeFunc) *TokenBucket

NewTokenBucket creates a TokenBucket.

func (*TokenBucket) Consume

func (tb *TokenBucket) Consume(tokens float64) time.Duration

Consume calculates the wait time.

type Transporter

type Transporter interface {
	//Send a message to receiver
	Send(receiver common.Address, data []byte) error
	//Start gets ready to send and receive
	Start()
	//Stop send and receive
	Stop()
	//StopAccepting stops receiving
	StopAccepting()
	//RegisterProtocol a receiver
	RegisterProtocol(protcol ProtocolReceiver)
	//NodeStatus get node's status and is online right now
	NodeStatus(addr common.Address) (deviceType string, isOnline bool)
}

Transporter denotes a communication transport used by the protocol.

type UDPTransport

type UDPTransport struct {
	UAddr *net.UDPAddr
	// contains filtered or unexported fields
}

UDPTransport represents a UDP server. Open question: how should listen errors be handled? Listening needs to stop when the app switches to the background and restart when it switches to the foreground.

func MakeTestUDPTransport

func MakeTestUDPTransport(name string, port int) *UDPTransport

MakeTestUDPTransport is for tests only.

func NewUDPTransport

func NewUDPTransport(name, host string, port int, protocol ProtocolReceiver, policy Policier) (t *UDPTransport, err error)

NewUDPTransport creates a UDPTransport. name must be a complete address.

func (*UDPTransport) HandlePeerFound

func (ut *UDPTransport) HandlePeerFound(id string, addr *net.UDPAddr)

HandlePeerFound handles a peer-found notification from mdns.

func (*UDPTransport) NodeStatus

func (ut *UDPTransport) NodeStatus(addr common.Address) (deviceType string, isOnline bool)

NodeStatus always marks the node as offline.

func (*UDPTransport) Receive

func (ut *UDPTransport) Receive(data []byte) error

Receive receives a message.

func (*UDPTransport) RegisterProtocol

func (ut *UDPTransport) RegisterProtocol(proto ProtocolReceiver)

RegisterProtocol registers the receiver.

func (*UDPTransport) Send

func (ut *UDPTransport) Send(receiver common.Address, data []byte) error

Send sends `data` to `receiver`. Args:

receiver (common.Address): The address of the node to send to.
data ([]byte): The bytes that are going to be sent through the wire.

func (*UDPTransport) Start

func (ut *UDPTransport) Start()

Start starts UDP listening.

func (*UDPTransport) Stop

func (ut *UDPTransport) Stop()

Stop stops the UDP connection.

func (*UDPTransport) StopAccepting

func (ut *UDPTransport) StopAccepting()

StopAccepting stops receiving.

type XMPPTransport

type XMPPTransport struct {
	NodeAddress common.Address
	// contains filtered or unexported fields
}

XMPPTransport uses XMPP to communicate with other Photon nodes.

func MakeTestXMPPTransport

func MakeTestXMPPTransport(name string, key *ecdsa.PrivateKey) *XMPPTransport

MakeTestXMPPTransport creates a test xmpp transport.

func NewXMPPTransport

func NewXMPPTransport(name, ServerURL string, key *ecdsa.PrivateKey, deviceType string) (x *XMPPTransport)

NewXMPPTransport creates an xmpp transporter. If it does not succeed, for example because it cannot connect to the xmpp server, it keeps trying in the background.

func (*XMPPTransport) DataHandler

func (x *XMPPTransport) DataHandler(from common.Address, data []byte)

DataHandler is the callback of the xmpp connection.

func (*XMPPTransport) GetPassWord

func (x *XMPPTransport) GetPassWord() string

GetPassWord returns the current login password.

func (*XMPPTransport) NodeStatus

func (x *XMPPTransport) NodeStatus(addr common.Address) (deviceType string, isOnline bool)

NodeStatus returns the node's device type and whether it is online right now.

func (*XMPPTransport) RegisterProtocol

func (x *XMPPTransport) RegisterProtocol(protcol ProtocolReceiver)

RegisterProtocol registers a receiver.

func (*XMPPTransport) Send

func (x *XMPPTransport) Send(receiver common.Address, data []byte) error

Send sends a message.

func (*XMPPTransport) Start

func (x *XMPPTransport) Start()

Start gets ready to send and receive.

func (*XMPPTransport) Stop

func (x *XMPPTransport) Stop()

Stop stops sending and receiving.

func (*XMPPTransport) StopAccepting

func (x *XMPPTransport) StopAccepting()

StopAccepting stops receiving
