Notes on the Implementation of go-RPC

Remote Procedure Call

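RPC is an inter-process communication protocol that lets a program call a procedure or function in another address space. The motivation behind RPC, and its core problem, is how to execute a function or method in another address space just as if it were a local call.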

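In network communication, RPC is essentially a protocol that constrains the shape of Requests and Responses. RPC frameworks today roughly lean one of two ways. Some focus on service governance and provide rich features suited to splitting and managing large services as microservices. Others focus on cross-language calls, such as gRPC. smallnest.gitbooks.io/go-rpc discusses many RPC frameworks, both Chinese and international, and summarizes the basic flow of an RPC call.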

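As I understand it, RPC describes point-to-point communication between a client and a server. An implementation has to provide three parts: the stub, the transport, and message parsing. Below I record how Go's standard-library RPC implements each of them.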

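  • The stub implements the protocol structure (Wire Protocol). It works together with serialization and deserialization to read messages and convert them
  • The transport (Transport) can be TCP or HTTP
  • Serialization/deserialization (Serialization) can be protobuf, JSON, and so on; go-RPC uses gob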
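To make the serialization part concrete, the sketch below gob-encodes an argument struct and decodes it back. This is roughly what each side's codec does per message. `Args` is a hypothetical stand-in for `schema.Args`. Note that net/rpc's real gob codec keeps one encoder and one decoder per connection instead of creating new ones per message.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Args is a hypothetical stand-in for schema.Args from the examples.
type Args struct {
	A, B int
}

// roundTrip gob-encodes v and decodes the bytes into a fresh Args,
// mimicking what the codecs on the two ends of the wire do.
func roundTrip(v Args) (Args, int, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return Args{}, 0, err
	}
	n := buf.Len() // bytes that would travel over the transport
	var out Args
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		return Args{}, 0, err
	}
	return out, n, nil
}

func main() {
	out, n, err := roundTrip(Args{A: 7, B: 8})
	if err != nil {
		panic(err)
	}
	fmt.Printf("decoded %+v from %d bytes\n", out, n)
}
```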

Go-RPC-Client

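In the client example from the official documentation, the client gets its result simply by calling Call. The crux of the RPC implementation is how the client stub turns this rpc call into a request sent to the server.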

func main() {
	client, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
	if err != nil {
		log.Fatal("dialing:", err)
	}
	args := &schema.Args{A: 7, B: 8}
	var reply int
	err = client.Call("Arith.Multiply", args, &reply)
	if err != nil {
		log.Fatal("arith error:", err)
	}
	log.Println("Arith:", args.A, "*", args.B, "=", reply)
}

DialHTTP

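For RPC over HTTP, rpc.DialHTTP simply sends a CONNECT request to the server using the default path /_goRPC_. If the server responds with the expected status and the connection is established, it creates an rpc Client to handle everything that follows.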

func DialHTTPPath(network, address, path string) (*Client, error) {
	var err error
	conn, err := net.Dial(network, address)
	...
	// Ask the HTTP server to hand this connection over to RPC.
	io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
	// Require a successful HTTP response before switching to the RPC protocol.
	resp, err := http.ReadResponse(bufio.NewReader(conn),
		&http.Request{Method: "CONNECT"})
	if err == nil && resp.Status == connected {
		return NewClient(conn), nil
	}
	...
	conn.Close()
	return nil, &net.OpError{Op: "dial-http", Net: network + " " + address, Err: err}
}
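The handshake can be reproduced without a real listener. This sketch runs both sides over `net.Pipe`. The client writes the CONNECT line and checks the status with `http.ReadResponse`, as DialHTTPPath does. A hypothetical `serverSide` parses the request with `http.ReadRequest` and replies the way ServeHTTP does after Hijack. The status text is assumed to match net/rpc's unexported `connected` constant.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net"
	"net/http"
)

// connectedStatus is assumed to match net/rpc's unexported `connected` constant.
const connectedStatus = "200 Connected to Go RPC"

// clientSide does what DialHTTPPath does after net.Dial: send the CONNECT
// line and insist on the expected status before speaking RPC.
func clientSide(conn net.Conn, path string) error {
	if _, err := io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n"); err != nil {
		return err
	}
	resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
	if err != nil {
		return err
	}
	if resp.Status != connectedStatus {
		return fmt.Errorf("unexpected HTTP response: %s", resp.Status)
	}
	return nil
}

// serverSide is a hypothetical stand-in for ServeHTTP + Hijack: parse the
// request line, then write the status line directly on the raw conn.
func serverSide(conn net.Conn) (method, path string, err error) {
	req, err := http.ReadRequest(bufio.NewReader(conn))
	if err != nil {
		return "", "", err
	}
	_, err = io.WriteString(conn, "HTTP/1.0 "+connectedStatus+"\n\n")
	return req.Method, req.URL.Path, err
}

// handshake runs both sides over net.Pipe. The pipe is synchronous and
// unbuffered, so the server side needs its own goroutine.
func handshake(path string) (method, gotPath string, err error) {
	cli, srv := net.Pipe()
	defer cli.Close()
	defer srv.Close()

	type result struct {
		method, path string
		err          error
	}
	done := make(chan result, 1)
	go func() {
		m, p, err := serverSide(srv)
		if err != nil {
			srv.Close() // unblock the client if the server side failed
		}
		done <- result{m, p, err}
	}()
	if err := clientSide(cli, path); err != nil {
		return "", "", err
	}
	r := <-done
	return r.method, r.path, r.err
}

func main() {
	method, path, err := handshake("/_goRPC_")
	fmt.Println(method, path, err)
}
```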

Go & Call

The go-RPC package uses gob encoding by default; this section skips the codec details. Creating an rpc Client starts an input goroutine that waits for responses.

type Client struct {
	codec ClientCodec

	reqMutex sync.Mutex // protects following
	request  Request

	mutex    sync.Mutex // protects following
	seq      uint64
	pending  map[uint64]*Call
	closing  bool // user has called Close
	shutdown bool // server has told us to stop
}
func NewClientWithCodec(codec ClientCodec) *Client {
	client := &Client{
		codec:   codec,
		pending: make(map[uint64]*Call),
	}
	go client.input()
	return client
}

Making a call amounts to wrapping the service method name, the args, and the reply in a Call struct. client.send then transmits it.

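type Call struct {
	ServiceMethod string      // name of the service and method to call
	Args          interface{} // argument to the function (*struct)
	Reply         interface{} // reply from the function (*struct)
	Error         error       // after completion, the error status
	Done          chan *Call  // receives *Call when Go is complete
}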

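The rpc package exposes Call to callers. Call invokes the asynchronous Go and then blocks on the returned Call's Done channel, which is how the two synchronize. Call is the package's default synchronous method, but it offers no timeout: if the server never replies, the caller blocks indefinitely.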

func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
	call := new(Call)
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Reply = reply
	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		// call.done() uses a non-blocking send, so an unbuffered
		// channel would silently lose results.
		if cap(done) == 0 {
			log.Panic("rpc: done channel is unbuffered")
		}
	}
	call.Done = done
	client.send(call)
	return call
}

send & input

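When I looked at the http client implementation earlier, its essence was connection reuse. http achieves this by reusing one DefaultTransport that maintains a connection pool. The rpc Client instead multiplexes many requests over a single connection, using the seq sequence number and the pending map to track each request on the client.

send takes locks because one rpc Client supports sending requests concurrently. reqMutex serializes writes to the shared codec and the reused client.request, while mutex protects seq and pending. The pending map stores the calls the client currently has in flight, keyed by seq. seq increases monotonically, so it effectively numbers the client's requests.

Finally, the codec serializes the request and writes it to the socket. If that returns an error, send removes the request's entry from pending, sets call.Error, and calls call.done() to tell the caller the call has completed.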

func (client *Client) send(call *Call) {
	client.reqMutex.Lock()
	defer client.reqMutex.Unlock()

	client.mutex.Lock()
	seq := client.seq
	client.seq++
	client.pending[seq] = call
	client.mutex.Unlock()

	// Encode and send the request.
	client.request.Seq = seq
	client.request.ServiceMethod = call.ServiceMethod
	err := client.codec.WriteRequest(&client.request, call.Args)
	if err != nil {
		client.mutex.Lock()
		call = client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()
		if call != nil {
			call.Error = err
			call.done()
		}
	}
}

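input reads every response on this RPC connection. For each response header, it removes the call with the matching seq from pending. It then decodes the response body into call.Reply, or records the server's error in call.Error. Finally, it notifies the caller with call.done().

An error while reading a response header means the connection was closed, hit EOF, or failed in a similar way. At that point every pending request on this client must be completed with call.done().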

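func (client *Client) input() {
	var err error
	var response Response
	for err == nil {
		response = Response{}
		err = client.codec.ReadResponseHeader(&response)
		if err != nil {
			break
		}
		seq := response.Seq
		client.mutex.Lock()
		call := client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()

		switch {
		case call == nil:
			// No pending call (e.g. WriteRequest failed and send already
			// removed it). Still consume the body to keep the stream in sync.
			err = client.codec.ReadResponseBody(nil)
		case response.Error != "":
			// Server-side error: hand it to the caller and discard the body.
			call.Error = ServerError(response.Error)
			err = client.codec.ReadResponseBody(nil)
			call.done()
		default:
			err = client.codec.ReadResponseBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()
		}
	}
	// Terminate pending calls.
	client.reqMutex.Lock()
	client.mutex.Lock()
	client.shutdown = true
	closing := client.closing
	if err == io.EOF {
		if closing {
			err = ErrShutdown // the user called Close
		} else {
			err = io.ErrUnexpectedEOF // the server went away
		}
	}
	for _, call := range client.pending {
		call.Error = err
		call.done()
	}
	client.mutex.Unlock()
	client.reqMutex.Unlock()
}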

Go-RPC-Server

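This is the example from go-rpc's official doc comments, and it is clearly RPC over HTTP. rpc.HandleHTTP registers rpc.DefaultServer directly on http's DefaultServeMux under the default rpcPath /_goRPC_, plus a debug page at /debug/rpc.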

func main() {
	arith := new(schema.Arith)
	rpc.Register(arith)
	rpc.HandleHTTP()
	e := http.ListenAndServe(":1234", nil)
	if e != nil {
		log.Fatal("listen error:", e)
	}
}
func (server *Server) HandleHTTP(rpcPath, debugPath string) {
	http.Handle(rpcPath, server)
	http.Handle(debugPath, debugHTTP{server})
}

ServeHTTP & ServeConn

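As the previous section showed, a client using DialHTTP first sets up the connection by sending a CONNECT request. Correspondingly, the RPC server accepts only CONNECT requests. It takes over the underlying conn with Hijack and then starts serving RPC on that TCP connection.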

func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != "CONNECT" {
		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
		w.WriteHeader(http.StatusMethodNotAllowed)
		io.WriteString(w, "405 must CONNECT\n")
		return
	}
	conn, _, err := w.(http.Hijacker).Hijack()
	...
	io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
	server.ServeConn(conn)
}

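ServeConn covers everything the RPC server does: reading requests, handling each one in its own goroutine, and writing responses. ServeConn wraps the conn in a gob codec and passes it to ServeCodec, whose loop is shown below.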

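func (server *Server) ServeCodec(codec ServerCodec) {
	sending := new(sync.Mutex) // serializes response writes on this codec
	for {
		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
		if err != nil {
			if !keepReading {
				break // connection-level error such as EOF: stop serving
			}
			// send a response if we actually managed to read a header.
			if req != nil {
				server.sendResponse(sending, req, invalidRequest, codec, err.Error())
				server.freeRequest(req)
			}
			continue
		}
		// Each request runs in its own goroutine, so a slow method
		// does not hold up later requests on the same connection.
		go service.call(server, sending, mtype, req, argv, replyv, codec)
	}
	codec.Close()
}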
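The `go service.call(...)` line is what allows one connection to serve overlapping calls. The sketch below checks this with the real net/rpc over `net.Pipe`. A hypothetical `Gate.Wait` blocks until it is released, yet `Gate.Ping` on the same connection still completes in the meantime. A server that handled requests serially would deadlock here.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// Gate is a hypothetical service: Wait blocks until release is closed,
// Ping returns immediately.
type Gate struct {
	release chan struct{}
}

func (g *Gate) Wait(in int, out *int) error {
	<-g.release
	*out = in
	return nil
}

func (g *Gate) Ping(in int, out *int) error {
	*out = in + 1
	return nil
}

// overlap issues Wait and then Ping on one connection; Ping can only
// return while Wait is blocked because each request gets a goroutine.
func overlap() (ping, wait int, err error) {
	g := &Gate{release: make(chan struct{})}
	srv := rpc.NewServer()
	if err = srv.Register(g); err != nil {
		return
	}
	cliConn, srvConn := net.Pipe()
	go srv.ServeConn(srvConn)
	client := rpc.NewClient(cliConn)
	defer client.Close()

	waitCall := client.Go("Gate.Wait", 1, &wait, nil)
	if err = client.Call("Gate.Ping", 41, &ping); err != nil {
		return
	}
	close(g.release) // let Wait finish only after Ping has returned
	<-waitCall.Done
	err = waitCall.Error
	return
}

func main() {
	ping, wait, err := overlap()
	fmt.Println(ping, wait, err)
}
```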

register & call

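On the server side, the job is to perform one remote call based on the service, the method, and the request arguments, and then return a response. go-RPC uses the type information provided by reflection for two tasks: registering a service, and finding the method to execute among the registered services.

In the Server data structure below, serviceMap holds all registered services. Each service keeps the service name, the receiver's type, and the receiver instance, which is usually a pointer value. Its method map holds the service's exported methods keyed by method name, along with their argument and reply types.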

type Server struct {
	mu         sync.RWMutex // protects the serviceMap
	serviceMap map[string]*service
	reqLock    sync.Mutex // protects freeReq
	freeReq    *Request
	respLock   sync.Mutex // protects freeResp
	freeResp   *Response
}

type service struct {
	name   string                 // name of service
	rcvr   reflect.Value          // receiver of methods for the service
	typ    reflect.Type           // type of the receiver
	method map[string]*methodType // registered methods
}

type methodType struct {
	sync.Mutex // protects counters
	method     reflect.Method
	ArgType    reflect.Type
	ReplyType  reflect.Type
	numCalls   uint
}

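Take the arith example. *schema.Arith is the type of the registered service instance, and s.rcvr is the value of that pointer, i.e. the instance's address. sname comes from calling reflect.Indirect on this pointer Value, which returns its Elem: the instance the pointer points to. Type() then gives its type, schema.Arith, and Name() gives the registered name Arith.

An open question I had: is the Elem returned by reflect.Indirect a copy or the pointed-to value itself, and can it be modified? For a pointer Value, Indirect returns v.Elem(), which refers to the pointed-to value itself rather than a copy. The result is addressable and settable (for exported fields), so writes through it are visible via the original pointer. For a non-pointer Value, Indirect returns v unchanged.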
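A quick check of these reflect.Indirect semantics, using a hypothetical `Arith` struct with one exported field in place of schema.Arith:

```go
package main

import (
	"fmt"
	"reflect"
)

// Arith is a hypothetical stand-in for schema.Arith with one exported field.
type Arith struct {
	Calls int
}

// indirectWrites mimics register: ValueOf a pointer, then Indirect to reach
// the struct. It writes a field through that Value and reports the result.
func indirectWrites() (name string, settable bool, calls int) {
	a := &Arith{}
	e := reflect.Indirect(reflect.ValueOf(a)) // same as .Elem() for a pointer
	e.FieldByName("Calls").SetInt(3)         // writes into *a, not into a copy
	return e.Type().Name(), e.CanSet(), a.Calls
}

// valueCopySettable applies Indirect to a non-pointer Value, which is
// returned unchanged; it wraps a copy of the struct and is not settable.
func valueCopySettable() bool {
	return reflect.Indirect(reflect.ValueOf(Arith{})).CanSet()
}

func main() {
	name, settable, calls := indirectWrites()
	fmt.Println(name, settable, calls) // Arith true 3
	fmt.Println(valueCopySettable())   // false
}
```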

func (server *Server) register(rcvr interface{}) error {
	server.mu.Lock()
	defer server.mu.Unlock()

	s := new(service)
	s.typ = reflect.TypeOf(rcvr)                    // type of receiver, e.g. *schema.Arith
	s.rcvr = reflect.ValueOf(rcvr)                  // receiver itself
	sname := reflect.Indirect(s.rcvr).Type().Name() // e.g. "Arith"
	s.name = sname
	// Install the methods
	s.method = suitableMethods(s.typ)
	if len(s.method) == 0 {
		return errors.New("rpc.Register: type " + sname + " has no exported methods of suitable type")
	}
	server.serviceMap[s.name] = s
	return nil
}

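During registration, every method of the type is checked. NumIn and In check the number and types of the parameters, and the reply type must be a pointer. The full source also checks that the argument types are exported, which the excerpt below omits. Finally, NumOut and Out check that there is exactly one return value and that its type is error.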

var typeOfError = reflect.TypeOf((*error)(nil)).Elem()

func suitableMethods(typ reflect.Type) map[string]*methodType {
	methods := make(map[string]*methodType)
	for m := 0; m < typ.NumMethod(); m++ {
		method := typ.Method(m)
		// method.Type ~ func(*schema.Arith, *schema.Args, *schema.Quotient) error
		// method.Name ~ Divide
		mtype := method.Type
		mname := method.Name

		// Method needs three ins: receiver, *args, *reply.
		if mtype.NumIn() != 3 {
			continue
		}
		// First arg need not be a pointer.
		argType := mtype.In(1)
		// Second arg must be a pointer.
		replyType := mtype.In(2)
		if replyType.Kind() != reflect.Ptr {
			continue
		}
		// Method needs one out.
		if mtype.NumOut() != 1 {
			continue
		}
		// The return type of the method must be error.
		if returnType := mtype.Out(0); returnType != typeOfError {
			continue
		}
		methods[mname] = &methodType{method: method,
			ArgType: argType, ReplyType: replyType}
	}
	return methods
}

read request & header

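When handling a connection, the server first reads the header. To reuse Request structs, go-RPC keeps spare ones on a free list, which is a singly linked list. What is a little confusing is that Request is really just a header carrying Seq and ServiceMethod. The ServiceMethod read from the request usually has the form Arith.Divide. It is split at the last dot into a service name and a method name, which are then looked up through server.serviceMap.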

type Request struct {
	ServiceMethod string   // format: "Service.Method"
	Seq           uint64   // sequence number chosen by client
	next          *Request // for free list in Server
}
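The free list behind getRequest/freeRequest is a mutex-guarded singly linked stack threaded through the `next` field. The hypothetical `freeList` below reproduces the pattern on a private copy of the struct. A released header is handed out again with its fields cleared.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a hypothetical copy of rpc.Request's shape.
type request struct {
	ServiceMethod string
	Seq           uint64
	next          *request
}

// freeList mimics Server.getRequest/freeRequest.
type freeList struct {
	mu   sync.Mutex
	head *request
}

func (f *freeList) get() *request {
	f.mu.Lock()
	req := f.head
	if req == nil {
		req = new(request)
	} else {
		f.head = req.next
		*req = request{} // clear stale fields before reuse
	}
	f.mu.Unlock()
	return req
}

func (f *freeList) put(req *request) {
	f.mu.Lock()
	req.next = f.head
	f.head = req
	f.mu.Unlock()
}

func main() {
	var f freeList
	a := f.get()
	a.ServiceMethod, a.Seq = "Arith.Divide", 7
	f.put(a)
	b := f.get()
	fmt.Println(a == b, b.ServiceMethod == "", b.Seq) // true true 0
}
```

The list never shrinks, so its size tracks the peak number of in-flight requests. sync.Pool is the standard-library alternative that lets the GC reclaim idle objects.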
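func (server *Server) readRequestHeader(codec ServerCodec) (service *service,
	mtype *methodType, req *Request, keepReading bool, err error) {
	req = server.getRequest()
	err = codec.ReadRequestHeader(req)
	...
	// The header was read, so later errors are per-request: keep reading.
	keepReading = true
	dot := strings.LastIndex(req.ServiceMethod, ".")
	if dot < 0 {
		err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
		return
	}
	serviceName := req.ServiceMethod[:dot]
	methodName := req.ServiceMethod[dot+1:]

	server.mu.RLock()
	service = server.serviceMap[serviceName]
	server.mu.RUnlock()
	if service == nil {
		err = errors.New("rpc: can't find service " + req.ServiceMethod)
		return
	}
	mtype = service.method[methodName]
	if mtype == nil {
		err = errors.New("rpc: can't find method " + req.ServiceMethod)
	}
	return
}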
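Splitting at the last dot rather than the first means everything before the final dot is treated as the service name. A hypothetical helper with the same logic:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitServiceMethod is a hypothetical helper doing what readRequestHeader
// does inline: split "Service.Method" at the last dot.
func splitServiceMethod(sm string) (service, method string, err error) {
	dot := strings.LastIndex(sm, ".")
	if dot < 0 {
		return "", "", errors.New("rpc: service/method request ill-formed: " + sm)
	}
	return sm[:dot], sm[dot+1:], nil
}

func main() {
	for _, sm := range []string{"Arith.Divide", "pkg.Arith.Divide", "Divide"} {
		s, m, err := splitServiceMethod(sm)
		fmt.Printf("%q -> %q %q %v\n", sm, s, m, err)
	}
}
```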

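Once the header identifies the method, the argument type is known too. reflect.New(typ) returns a Value holding a pointer to a new zero value of typ, so its type is PtrTo(typ); for Arith.Multiply that is *schema.Args. argv.Interface() returns that pointer as an interface{}, i.e. &{0 0}, and the codec decodes the request body into it. Likewise, replyv ends up as a pointer to a freshly created object that will hold the reply.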

func (server *Server) readRequest(codec ServerCodec) (service *service,
	mtype *methodType, req *Request, argv, replyv reflect.Value,
	keepReading bool, err error) {
	service, mtype, req, keepReading, err = server.readRequestHeader(codec)
	if err != nil {
		if !keepReading {
			return
		}
		// The header was fine but the lookup failed: discard the body.
		codec.ReadRequestBody(nil)
		return
	}
	// Decode the argument value.
	argIsValue := false // if true, need to indirect before calling.
	if mtype.ArgType.Kind() == reflect.Ptr {
		argv = reflect.New(mtype.ArgType.Elem())
	} else {
		argv = reflect.New(mtype.ArgType)
		argIsValue = true
	}
	// argv guaranteed to be a pointer now.
	if err = codec.ReadRequestBody(argv.Interface()); err != nil {
		return
	}
	if argIsValue {
		argv = argv.Elem()
	}
	replyv = reflect.New(mtype.ReplyType.Elem())
	return
}

call & response

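In reflect.Method, the Type field holds the method's type. The Func field holds the method itself as a function value whose first parameter is the receiver. The return value is converted back with Interface() and is either nil or an error.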

func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType,
	req *Request, argv, replyv reflect.Value, codec ServerCodec) {
	function := mtype.method.Func
	// Invoke the method, providing a new value for the reply.
	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
	// The return value for the method is an error.
	errInter := returnValues[0].Interface()
	errmsg := ""
	if errInter != nil {
		errmsg = errInter.(error).Error()
	}
	server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
	server.freeRequest(req)
}
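Here is the same call through Method.Func in a standalone form, with the receiver passed explicitly as the first argument. `Arith`, `Args`, and `invoke` are hypothetical. The error handling copies the errInter pattern from service.call.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// Args and Arith are hypothetical stand-ins for the schema types.
type Args struct{ A, B int }
type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

func (t *Arith) Divide(args *Args, reply *int) error {
	if args.B == 0 {
		return errors.New("divide by zero")
	}
	*reply = args.A / args.B
	return nil
}

// invoke looks up name on rcvr's type and calls it through Method.Func,
// passing the receiver explicitly as the first argument.
func invoke(rcvr interface{}, name string, args *Args) (int, string) {
	m, ok := reflect.TypeOf(rcvr).MethodByName(name)
	if !ok {
		return 0, "no such method " + name
	}
	replyv := reflect.New(m.Type.In(2).Elem()) // a fresh *int for the reply
	out := m.Func.Call([]reflect.Value{reflect.ValueOf(rcvr), reflect.ValueOf(args), replyv})
	errmsg := ""
	if errInter := out[0].Interface(); errInter != nil {
		errmsg = errInter.(error).Error()
	}
	return int(replyv.Elem().Int()), errmsg
}

func main() {
	fmt.Println(invoke(new(Arith), "Multiply", &Args{A: 7, B: 8}))
	fmt.Println(invoke(new(Arith), "Divide", &Args{A: 7, B: 0}))
}
```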

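When writing the response back, the server must echo the seq the client sent. Writing the response also requires holding the sending lock, because all of a connection's service.call goroutines share the same codec.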

type Response struct {
	ServiceMethod string    // echoes that of the Request
	Seq           uint64    // echoes that of the request
	Error         string    // error, if any.
	next          *Response // for free list in Server
}
func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
	resp := server.getResponse()
	// Encode the response header
	resp.ServiceMethod = req.ServiceMethod
	if errmsg != "" {
		resp.Error = errmsg
		reply = invalidRequest
	}
	resp.Seq = req.Seq // echo the client's seq so input can find the call
	// All service.call goroutines share this codec, so serialize writes.
	sending.Lock()
	err := codec.WriteResponse(resp, reply)
	if debugLog && err != nil {
		log.Println("rpc: writing response:", err)
	}
	sending.Unlock()
	server.freeResponse(resp)
}
