因为最近工作上要做跟gRPC代理相关的东西,在端午假期出门看央美毕业展累得半死之后,还是决定最后一天宅家里看看gRPC的transport的实现,顺便能更了解HTTP2协议。之前写过gRPC负载均衡接口和拦截器相关的笔记,感觉挖的坑越来越多啦。
grpc server
之前就大致看过grpc和go-rpc的实现,总结来说go写的服务端都是一个意思,都是accept conn之后开单独的goroutine来处理这条连接。处理的过程也大概分成固定的几部分:
transport
:按照协议封装Transport层,负责conn上的读写。codeC
:根据协议和注册的schema编解码。registerService
:根据注册的service和method,本地处理请求并返回响应。
本篇打算从transport开始记录下grpc里HTTP2协议的实现,最主要是想了解steaming形式的流请求是怎么做的,跟long polling、分块传输有什么本质区别。
1 | func (s *Server) handleRawConn(rawConn net.Conn) { |
grpc用http2server实现了这个接口,并实现了以下方法。handleStreams提供了处理stream的方法,drain告诉client这个连接上拒绝接收新的RPC请求,write相关的方法允许在这个stream上写东西。
1 | type ServerTransport interface { |
HTTP2 transport
WireShark默认支持HTTP2,只要在Analysize的decode as里选择这个端口对应的协议为HTTP2就可以了。在TCP三次握手之后server端主动发送了SETTINGS帧,client端也会回Mgic和SETTINGS。之后发送HEADERS和DATA进行通信,过程中可能伴随WINDOW_UPDATE和PING。
newHTTP2Server
这个类似于HTTP2协议的握手,在新建transport时可以看到这个过程。服务端发送的SETTINGS包含一些设置,比如最大并发stream数目。客户端先回应的MAGIC+SETTINGS,其中MAGIC就是PRI表明支持HTTP2.0协议。
The client connection preface starts with a sequence of 24 octets,
which in hex notation are:
0x505249202a20485454502f322e300d0a0d0a534d0d0a0d0a
(the string PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n). This sequence is
followed by a SETTINGS [SETTINGS] frame (Section 6.5).
1 | func newHTTP2Server(conn net.Conn, config *ServerConfig) |
这个
newLoopyWriter
是用来写controlBuf中的控制信息的,在需要发SETTINGS帧或者WindowUpdate帧时通常把信息写入controlBuf.put中。
keepalive
用来关闭超过最大空闲连接数或者最大空闲时间的conn。
handleStreams
对conn的处理都封装在了transport的HandleStreams方法中。从frame中读取帧并按照帧类型处理。一次通信过程通常以SETINGS+HEADERS+DATA开始。
1 | func (t *http2Server) HandleStreams(handle func(*Stream), |
每个stream都是从收到HEADER帧开始创建和处理的,根据header里带的fields去处理decodeState,新建Stream,下面就是这个header可能带上的fields。
stream以streamID为区分,包含recvBuffer以及涉及编解码,方法和序列化方式,流控参数等一系列配置。最后交给handle这个回调方法去处理,在这里面真正开始读请求,进行本地调用和回写响应。
1 | func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, |
从stream里读出的method就是helloworld.Greeter/SayHello
,前者是服务名,后者是方法名。根据在Server上通过RegisterService注册的服务可以获得对应的method,再根据是Unary还是Stream类型分别处理。
1 | func (s *Server) handleStream(t transport.ServerTransport, |
processUnaryRPC
grpc的协议文档里解释了request和response格式。总结来说就是一个请求分为HEADER帧,可以从帧里读出HTTP头域的信息,其次是连续的DATA帧,用来发请求message,最后有个DATA帧代表end of stream。其中中间用来发message的连续帧的格式是:
Request → Request-Headers *Length-Prefixed-Message EOS
- Length-Prefixed-Message → Compressed-Flag Message-Length Message
- Compressed-Flag → 0 / 1 # encoded as 1 byte unsigned integer
- Message-Length → {length of Message} # encoded as 4 byte unsigned integer
- Message → *{binary octet}
对应代码来看grpc在处理上直接把这个Length-prefixed-message作为Header帧的后续进行处理,并没有放在单独的dataframe的处理函数中。所以在下面processUnaryRPC的处理过程中recvMsg
1 | func (s *Server) processUnaryRPC(t transport.ServerTransport, |
对应协议,每个parser的头对应Compressed-Flag和Message-Length,在UnaryRPC模式下请求Message只有一个。读出后根据不同的codec和注册的方法入参进行反序列化,调用本地方法获得响应。最后通过sendresponse发送响应。收到EOS的data帧之后给stream写入EOF,并处理server端activeStream相关记录表。
1 | type parser struct { |
processStreamingRPC
grpc支持stream流式的请求和响应,sd是从注册的服务中读取的StreamDesc。
1 | func (s *Server) processStreamingRPC(t transport.ServerTransport, |
这个StreamDesc注册的函数格式是StreamHandler,每个sd.Handler
执行之前由processStreamingRPC生成新的serverStream
来实现协议基本的SendMsg和RecvMsg方法。
1 | type StreamHandler func(srv interface{}, stream ServerStream) error |
SendMsg和RecvMsg的实现跟processUnaryRPC里的一致。之所以是称为stream是指在server端的实现上通常是下面这种循环模式。最终以_User_Chat_Handler注册到StreamDesc里。
1 | func (svr *UserService) Chat(stream pb.User_ChatServer) error { |
这个示例转自某个博客,暂时没找到原始出处,之后补上。
1 | func _User_Chat_Handler(srv interface{}, stream grpc.ServerStream) error { |
register service
在server端的实现里通常pb会定义这个ServiceDesc以及包含的MethodDesc。外部可以自定义满足GreeterServer接口的具体实例通过RegisterGreeterServer注册到grpcServer里。
1 | type GreeterServer interface { |
注册的这个MethodDesc的Handler是满足特定func入参格式的handler。在processUnaryRPC处理函数中会根据服务名和方法名找到Handler,并调用_Greeter_SayHello_Handler
。其中srv就是register的那个实例,dec是加入在这里做compress和反序列化的函数,interceptor是拦截器可参考我之前博客gRPC拦截器interceptor与chain。
1 | var _Greeter_serviceDesc = grpc.ServiceDesc{ |
grpc注册和读取的方式都很直接,pb的相关东西也是自动生成的。并不需要像go-rpc那样通过反射来存取相关函数的入参等信息。
1 | func (s *Server) register(sd *ServiceDesc, ss interface{}) { |
codec
grpc的encoding包提供了proto和gzip两种方式,以插件的形式存在。一般都可以直接proto.Unmarshal。
1 | func (codec) Unmarshal(data []byte, v interface{}) error { |
summary
主要写了HTTP2协议结合transport的实现,理解stream和grpc server端服务注册相关的内容。下一节写grpc client端和proto相关的内容吧。