Documentation ¶
Overview ¶
Project `amqprpc` provides access to the exported methods of an object across a RabbitMQ connection. It is meant as a near drop-in replacement for the TCP-based built-in `net/rpc` package, so most of that package's characteristics and limitations are kept intact. The major difference stems from using message queues for transport: one well-defined queue for requests and a temporary queue, established internally by the client, for processing responses. The implementation is intended to follow, to a certain extent, the RabbitMQ tutorial at http://www.rabbitmq.com/tutorials/tutorial-six-go.html
A simple use case, adapted from "net/rpc", looks as follows with queue-based RPC:
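The request queue and temporary reply queue described above can be pictured without a broker. The following is a minimal in-memory sketch, not the package's implementation: Go channels stand in for the two queues, and a correlation ID (the mechanism used in the RabbitMQ tutorial) matches each response to its pending call. Every type and function name in it is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// message stands in for an AMQP delivery: a body plus the correlation ID
// and reply queue that would travel in the message properties.
type message struct {
	CorrelationID string
	ReplyTo       chan message // stands in for the temporary reply queue
	Body          int
}

// serve consumes the request queue and publishes each result to the reply
// queue named in the request, echoing the correlation ID.
func serve(requests <-chan message) {
	for req := range requests {
		req.ReplyTo <- message{CorrelationID: req.CorrelationID, Body: req.Body * 2}
	}
}

// client keeps a table of pending calls keyed by correlation ID.
type client struct {
	mu      sync.Mutex
	seq     int
	pending map[string]chan int
	replies chan message
	reqs    chan<- message
}

func newClient(reqs chan<- message) *client {
	c := &client{
		pending: make(map[string]chan int),
		replies: make(chan message, 16),
		reqs:    reqs,
	}
	go c.dispatch()
	return c
}

// dispatch routes every response to the call that is waiting for it.
func (c *client) dispatch() {
	for resp := range c.replies {
		c.mu.Lock()
		done, ok := c.pending[resp.CorrelationID]
		delete(c.pending, resp.CorrelationID)
		c.mu.Unlock()
		if ok {
			done <- resp.Body
		}
	}
}

// call publishes a request and blocks until the matching response arrives.
func (c *client) call(arg int) int {
	c.mu.Lock()
	c.seq++
	id := fmt.Sprintf("call-%d", c.seq)
	done := make(chan int, 1)
	c.pending[id] = done
	c.mu.Unlock()

	c.reqs <- message{CorrelationID: id, ReplyTo: c.replies, Body: arg}
	return <-done
}

func main() {
	requests := make(chan message, 16)
	go serve(requests)
	c := newClient(requests)
	fmt.Println(c.call(21))
}
```

The pending table is what lets several calls share one reply queue: responses may arrive in any order and are still delivered to the right caller.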
A server wishes to export an object of type Arith:
    package X

    import "errors"

    type Args struct {
        A, B int
    }

    type Quotient struct {
        Quo, Rem int
    }

    type Arith int

    func (t *Arith) Multiply(args *Args, reply *int) error {
        *reply = args.A * args.B
        return nil
    }

    func (t *Arith) Divide(args *Args, quo *Quotient) error {
        if args.B == 0 {
            return errors.New("divide by zero")
        }
        quo.Quo = args.A / args.B
        quo.Rem = args.A % args.B
        return nil
    }

The server then starts the AMQP service:

    package X

    const (
        RPCQueueName = "rpc_queue"
        AMQPURI      = "amqp://guest:guest@127.0.0.1:5672//"
    )

    arith := new(Arith)

    server, err := amqprpc.NewServer(AMQPURI)
    if err != nil {
        panic(err)
    }
    defer server.Close()

    server.Register(RPCQueueName, arith)

    err = server.Listen()
    if err != nil {
        panic(err)
    }
At this point, clients can see a service "Arith" with methods "Arith.Multiply" and "Arith.Divide". To invoke one, a client first dials the server:
    package X

    client, err := amqprpc.NewClient(AMQPURI)
    if err != nil {
        panic(err)
    }
    defer client.Close()
Then it can make a remote call:
    // Synchronous call
    args := &X.Args{7, 8}
    var reply int
    err = client.Call(RPCQueueName, "Arith.Multiply", args, &reply)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
or
    // Asynchronous call
    quotient := new(X.Quotient)
    divCall := client.Go(RPCQueueName, "Arith.Divide", args, quotient, nil)
    replyCall := <-divCall.Done // will be equal to divCall
    // check errors, print, etc.
Index ¶
- type Call
- type Client
- type ClientCodec
- type ClientConfig
- type GobClientCodec
- type GobServerCodec
- type Server
- func (server *Server) Close()
- func (server *Server) ConnError() error
- func (server *Server) Listen() error
- func (s *Server) Register(key string, rcvr interface{}) error
- func (s *Server) RegisterConfig(qn, name string, rcvr interface{}, conf *ServiceConfig) error
- func (s *Server) RegisterName(key, name string, rcvr interface{}) error
- type ServerCodec
- type ServerConfig
- type ServiceConfig
Types ¶
type Call ¶
    type Call struct {
        ServiceMethod string      // The name of the service and method to call.
        Args          interface{} // The argument to the function (*struct).
        Reply         interface{} // The reply from the function (*struct).
        Error         error       // After completion, the error status.
        Done          chan *Call  // Strobes when call is complete
        // contains filtered or unexported fields
    }
Call represents an active RPC.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is an RPC client.
func NewClient ¶
NewClient returns a new Client to handle requests to the set of services at the broker reached through the AMQP URI url.
func NewClientConfig ¶
func NewClientConfig(url string, config *ClientConfig) (*Client, error)
NewClientConfig returns a new Client to handle requests to the set of services at the broker reached through the AMQP URI url, customised with the parameters in config.
func (*Client) Call ¶
Call invokes the named function, waits for it to complete, and returns its error status.
func (*Client) Close ¶
func (client *Client) Close()
Close sends a close message to the underlying connection and empties the internal pool of pending messages.
func (*Client) Go ¶
func (client *Client) Go(key, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash.
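The done-channel contract described above is the same one `net/rpc` documents for its own `Client.Go`. Because running `amqprpc` needs a broker, the sketch below uses the standard `net/rpc` client over an in-memory `net.Pipe` as a stand-in (note that `net/rpc`'s `Go` takes no queue key). It shows several asynchronous calls sharing one buffered done channel. The helper `multiplyAsync` is a hypothetical name.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

type Args struct{ A, B int }

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// multiplyAsync issues count asynchronous calls (i * 2 for each i) that
// share one buffered done channel and returns the sum of the replies.
func multiplyAsync(count int) (int, error) {
	srv := rpc.NewServer()
	if err := srv.Register(new(Arith)); err != nil {
		return 0, err
	}
	// net.Pipe gives an in-memory connection, so no network is needed.
	cliConn, srvConn := net.Pipe()
	go srv.ServeConn(srvConn)
	client := rpc.NewClient(cliConn)
	defer client.Close()

	// The channel has room for every outstanding call, so completing a
	// call never blocks the client's internal reader.
	done := make(chan *rpc.Call, count)
	replies := make([]int, count)
	for i := 0; i < count; i++ {
		client.Go("Arith.Multiply", &Args{A: i, B: 2}, &replies[i], done)
	}

	sum := 0
	for i := 0; i < count; i++ {
		call := <-done // the same *rpc.Call value that Go returned
		if call.Error != nil {
			return 0, call.Error
		}
		sum += *call.Reply.(*int)
	}
	return sum, nil
}

func main() {
	sum, err := multiplyAsync(4)
	fmt.Println(sum, err)
}
```

Sharing one channel lets the caller collect completions in arrival order. Passing nil instead gives each call its own channel, as in the overview example.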
type ClientCodec ¶
    type ClientCodec interface {
        // ClientCodec returns a one-time codec for serializing requests
        ClientCodec(*bytes.Buffer) rpc.ClientCodec
        // ContentType is MIME content type for messages encoded with the codec
        ContentType() string
    }
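As an illustration of what an implementation of this interface might look like, here is a minimal gob-based sketch. It is not the package's `GobClientCodec`: the names `gobBufferCodec` and `gobFactory`, the MIME string, and the assumption that requests are written to and responses read from the same buffer are all hypothetical. Only `net/rpc` and `encoding/gob` calls from the standard library are used.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"net/rpc"
)

type Args struct{ A, B int }

// gobBufferCodec is a hypothetical rpc.ClientCodec bound to a single buffer.
type gobBufferCodec struct {
	enc *gob.Encoder
	dec *gob.Decoder
}

// WriteRequest gob-encodes the header followed by the arguments.
func (c *gobBufferCodec) WriteRequest(r *rpc.Request, body interface{}) error {
	if err := c.enc.Encode(r); err != nil {
		return err
	}
	return c.enc.Encode(body)
}

func (c *gobBufferCodec) ReadResponseHeader(r *rpc.Response) error { return c.dec.Decode(r) }
func (c *gobBufferCodec) ReadResponseBody(body interface{}) error  { return c.dec.Decode(body) }
func (c *gobBufferCodec) Close() error                              { return nil }

// gobFactory has the two methods of the ClientCodec interface shown above.
type gobFactory struct{}

func (gobFactory) ClientCodec(buf *bytes.Buffer) rpc.ClientCodec {
	return &gobBufferCodec{enc: gob.NewEncoder(buf), dec: gob.NewDecoder(buf)}
}

// ContentType returns an assumed MIME type; the package's value may differ.
func (gobFactory) ContentType() string { return "application/x-gob" }

// encodeAndInspect writes one request through the codec, then decodes the
// buffer again to show what would travel as the message body.
func encodeAndInspect(method string, args Args) (string, Args, error) {
	var buf bytes.Buffer
	codec := gobFactory{}.ClientCodec(&buf)
	req := &rpc.Request{ServiceMethod: method, Seq: 1}
	if err := codec.WriteRequest(req, &args); err != nil {
		return "", Args{}, err
	}
	dec := gob.NewDecoder(&buf)
	var hdr rpc.Request
	var got Args
	if err := dec.Decode(&hdr); err != nil {
		return "", Args{}, err
	}
	if err := dec.Decode(&got); err != nil {
		return "", Args{}, err
	}
	return hdr.ServiceMethod, got, nil
}

func main() {
	method, args, err := encodeAndInspect("Arith.Multiply", Args{A: 7, B: 8})
	fmt.Println(method, args, err)
}
```

A codec for another format (JSON, for instance) would keep the same shape and swap the encoder, decoder, and content type.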
type ClientConfig ¶
    type ClientConfig struct {
        // PoolSize is number of threads sending requests to the server
        // and processing responses
        PoolSize uint
        // TLS is SSL connection configuration
        TLS *tls.Config
        // Codec is an encoder/decoder used for serialization of requests
        Codec ClientCodec
    }
type GobClientCodec ¶
type GobClientCodec struct {
// contains filtered or unexported fields
}
func (GobClientCodec) ClientCodec ¶
func (c GobClientCodec) ClientCodec(buf *bytes.Buffer) rpc.ClientCodec
func (GobClientCodec) ContentType ¶
func (c GobClientCodec) ContentType() string
type GobServerCodec ¶
type GobServerCodec struct {
// contains filtered or unexported fields
}
func (GobServerCodec) ContentType ¶
func (c GobServerCodec) ContentType() string
func (GobServerCodec) ServerCodec ¶
func (c GobServerCodec) ServerCodec(in *bytes.Buffer, out *bytes.Buffer) rpc.ServerCodec
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is an AMQP-RPC server wrapper.
func NewServerConfig ¶
func NewServerConfig(url string, config *ServerConfig) (*Server, error)
NewServerConfig returns a new Server customised by the ServerConfig config: the maximum number of attempts to reconnect to the broker after a failure (MaxAttempts), the delay between connection attempts (ConnDelay), and the TLS connection configuration.
func (*Server) Close ¶
func (server *Server) Close()
Close sends a request to interrupt the underlying connection.
func (*Server) ConnError ¶
ConnError returns the last error reported by the underlying connection attempt.
func (*Server) Register ¶
Register is a queue key oriented wrapper for net/rpc.Server.Register that configures the set of methods of the receiver value that satisfy the following conditions:
- exported method of exported type
- two arguments, both of exported type
- the second argument is a pointer
- one return value, of type error
It returns an error if the receiver is not an exported type or has no suitable methods. It also logs the error using package log. The client accesses each method using a string of the form "Type.Method", where Type is the receiver's concrete type.
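Since Register is described as a wrapper for `net/rpc.Server.Register`, the method rules can be tried against the standard library directly. The sketch below uses `net/rpc` only, so no queue key is involved. The type `Broken` and the helper `tryRegister` are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/rpc"
)

type Args struct{ A, B int }

// Arith has one method with the required shape: two arguments, the second
// a pointer, and a single error return value.
type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// Broken has no suitable method: the second argument of Add is not a
// pointer, and Add does not return an error.
type Broken int

func (t *Broken) Add(args *Args, reply int) int { return args.A + args.B }

// tryRegister reports whether a fresh net/rpc server accepts rcvr.
func tryRegister(rcvr interface{}) error {
	return rpc.NewServer().Register(rcvr)
}

func main() {
	log.SetOutput(io.Discard) // net/rpc also logs registration failures

	fmt.Println("Arith registered:", tryRegister(new(Arith)) == nil)
	fmt.Println("Broken rejected:", tryRegister(new(Broken)) != nil)
}
```

A type with a mix of suitable and unsuitable methods still registers. Only the suitable ones become callable.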
func (*Server) RegisterConfig ¶
func (s *Server) RegisterConfig(qn, name string, rcvr interface{}, conf *ServiceConfig) error
func (*Server) RegisterName ¶
RegisterName is like Register but uses the provided name for the type instead of the receiver's concrete type.
type ServerCodec ¶
    type ServerCodec interface {
        // ServerCodec returns a one-time codec used in rpc.Server.ServeRequest
        // call with input byte buffer (invoked with bytes.NewBuffer(d.Body))
        // and a transient output byte buffer for writing a response back to
        // the message broker
        ServerCodec(*bytes.Buffer, *bytes.Buffer) rpc.ServerCodec
        // ContentType is MIME content type for messages encoded with the codec
        ContentType() string
    }
ServerCodec is a synchronous codec used to decode amqp.Delivery Body using rpc.ServerCodec
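The buffer-in, buffer-out flow described in the interface comments can be sketched with the standard library alone. This is an illustration under assumptions, not the package's `GobServerCodec`: `bufferServerCodec`, `handle`, and `multiplyViaBuffers` are hypothetical names, and a plain byte slice stands in for the delivery body. `rpc.Server.ServeRequest` serves exactly one request synchronously, which is why a one-time codec suffices.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"net/rpc"
)

type Args struct{ A, B int }

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// bufferServerCodec is a hypothetical one-shot rpc.ServerCodec: it decodes
// one request from the input buffer and encodes one response to the output.
type bufferServerCodec struct {
	dec *gob.Decoder
	enc *gob.Encoder
}

func (c *bufferServerCodec) ReadRequestHeader(r *rpc.Request) error { return c.dec.Decode(r) }
func (c *bufferServerCodec) ReadRequestBody(body interface{}) error { return c.dec.Decode(body) }
func (c *bufferServerCodec) WriteResponse(r *rpc.Response, body interface{}) error {
	if err := c.enc.Encode(r); err != nil {
		return err
	}
	return c.enc.Encode(body)
}
func (c *bufferServerCodec) Close() error { return nil }

// handle plays the role of a delivery handler: requestBody stands in for
// the body of an incoming message, the returned bytes for the reply body.
func handle(srv *rpc.Server, requestBody []byte) ([]byte, error) {
	var out bytes.Buffer
	codec := &bufferServerCodec{
		dec: gob.NewDecoder(bytes.NewBuffer(requestBody)),
		enc: gob.NewEncoder(&out),
	}
	if err := srv.ServeRequest(codec); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}

// multiplyViaBuffers encodes a request, serves it, and decodes the reply.
func multiplyViaBuffers(a, b int) (int, error) {
	srv := rpc.NewServer()
	if err := srv.Register(new(Arith)); err != nil {
		return 0, err
	}
	var req bytes.Buffer
	enc := gob.NewEncoder(&req)
	if err := enc.Encode(&rpc.Request{ServiceMethod: "Arith.Multiply", Seq: 1}); err != nil {
		return 0, err
	}
	if err := enc.Encode(&Args{A: a, B: b}); err != nil {
		return 0, err
	}
	respBody, err := handle(srv, req.Bytes())
	if err != nil {
		return 0, err
	}
	dec := gob.NewDecoder(bytes.NewReader(respBody))
	var hdr rpc.Response
	if err := dec.Decode(&hdr); err != nil {
		return 0, err
	}
	if hdr.Error != "" {
		return 0, fmt.Errorf("remote error: %s", hdr.Error)
	}
	var product int
	if err := dec.Decode(&product); err != nil {
		return 0, err
	}
	return product, nil
}

func main() {
	product, err := multiplyViaBuffers(7, 8)
	fmt.Println(product, err)
}
```

Keeping the output in a transient buffer means the reply can be published as a single message once the call has completed.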