gRPC's HTTP2 Implementation

Since my recent work involves building things around a gRPC proxy, and after wearing myself out at the CAFA graduation exhibition over the Dragon Boat Festival holiday, I decided to spend the last day of the break at home reading gRPC's transport implementation, which is also a chance to understand the HTTP2 protocol better. I've previously written notes on gRPC's load-balancing interface and interceptors, so the pile of topics I've promised myself to dig into keeps growing.

grpc server

I've skimmed the implementations of grpc and go-rpc before. In short, servers written in Go all work the same way: accept a conn, then spawn a dedicated goroutine to handle that connection (a minimal sketch of this loop follows the list below). The handling roughly breaks down into a few fixed parts:

  1. transport: wraps the transport layer per the protocol and owns reads and writes on the conn.
  2. codec: encodes and decodes according to the protocol and the registered schema.
  3. registerService: looks up the registered service and method, handles the request locally, and returns the response.
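
For reference, here is a minimal sketch of that accept-then-goroutine pattern. The handleConn function and the listener address are placeholders for illustration, not gRPC's actual code:

package main

import (
    "log"
    "net"
)

// handleConn is a placeholder for the per-connection work: in gRPC's case,
// wrapping the conn in an HTTP/2 transport and serving its streams.
func handleConn(conn net.Conn) {
    defer conn.Close()
    // ... read frames, decode, dispatch to registered handlers ...
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatal(err)
    }
    for {
        conn, err := lis.Accept()
        if err != nil {
            continue
        }
        go handleConn(conn) // one goroutine per connection
    }
}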

This post walks through the HTTP2 implementation inside grpc starting from the transport layer. What I mainly want to understand is how streaming RPCs are implemented, and what fundamentally separates them from long polling and chunked transfer.

func (s *Server) handleRawConn(rawConn net.Conn) {
    rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
    conn, authInfo, err := s.useTransportAuthenticator(rawConn)
    ...
    var serve func()
    c := conn.(io.Closer)
    st := s.newHTTP2Transport(conn, authInfo)
    if st == nil {
        return
    }
    c = st
    serve = func() { s.serveStreams(st) }

    rawConn.SetDeadline(time.Time{})
    if !s.addConn(c) {
        return
    }
    go func() {
        serve()
        s.removeConn(c)
    }()
}

gRPC implements the ServerTransport interface below with http2Server. HandleStreams processes the streams on a connection, Drain tells the client that the connection no longer accepts new RPCs, and the Write-related methods write onto a given stream.

type ServerTransport interface {
    HandleStreams(func(*Stream), func(context.Context, string) context.Context)
    WriteHeader(s *Stream, md metadata.MD) error
    Write(s *Stream, hdr []byte, data []byte, opts *Options) error
    WriteStatus(s *Stream, st *status.Status) error
    Close() error
    RemoteAddr() net.Addr
    Drain()
    IncrMsgSent()
    IncrMsgRecv()
}

HTTP2 transport

Wireshark supports HTTP2 out of the box; just pick HTTP2 as the protocol for the port under Analyze → Decode As. After the TCP three-way handshake the server proactively sends a SETTINGS frame, and the client replies with the Magic plus its own SETTINGS. Communication then proceeds via HEADERS and DATA frames, possibly interleaved with WINDOW_UPDATE and PING along the way.

[Figure: Wireshark capture of a gRPC HTTP2 exchange]

newHTTP2Server

This is essentially the HTTP2 handshake, and it is visible when a new transport is created. The SETTINGS frame the server sends carries settings such as the maximum number of concurrent streams. The client first responds with MAGIC + SETTINGS, where MAGIC is the PRI preface declaring support for HTTP/2.0.

The client connection preface starts with a sequence of 24 octets,
which in hex notation are:

0x505249202a20485454502f322e300d0a0d0a534d0d0a0d0a

(the string PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n). This sequence is
followed by a SETTINGS [SETTINGS] frame (Section 6.5).

func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
    framer := newFramer(conn, writeBufSize, readBufSize)
    // Send initial settings as connection preface to client.
    var isettings []http2.Setting
    isettings = append(isettings, http2.Setting{
        ID:  http2.SettingMaxConcurrentStreams,
        Val: maxStreams,
    })

    if err := framer.fr.WriteSettings(isettings...); err != nil {
        return nil, connectionErrorf(false, err, "transport: %v", err)
    }
    // Adjust the connection flow control window if needed.
    if delta := uint32(icwz - defaultWindowSize); delta > 0 {
        if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
            return nil, connectionErrorf(false, err, "transport: %v", err)
        }
    }

    ctx, cancel := context.WithCancel(context.Background())
    t := &http2Server{...}
    t.controlBuf = newControlBuffer(t.ctxDone)
    t.framer.writer.Flush()

    // Check the validity of client preface.
    preface := make([]byte, len(clientPreface))
    io.ReadFull(t.conn, preface)
    if !bytes.Equal(preface, clientPreface) {
        // transport: http2Server.HandleStreams received bogus greeting
    }

    frame, err := t.framer.fr.ReadFrame()
    if err == io.EOF || err == io.ErrUnexpectedEOF {
        return nil, err
    }
    atomic.StoreUint32(&t.activity, 1)
    sf, ok := frame.(*http2.SettingsFrame)
    if !ok {
        // transport: http2Server.HandleStreams saw invalid preface type
    }
    t.handleSettings(sf)

    go func() {
        t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
        t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
        if err := t.loopy.run(); err != nil {
            errorf("transport: loopyWriter.run returning. Err: %v", err)
        }
        t.conn.Close()
        close(t.writerDone)
    }()
    go t.keepalive()
    return t, nil
}

The loopyWriter created here writes out the control information queued in controlBuf; whenever a SETTINGS or WINDOW_UPDATE frame needs to go out, it is usually enqueued via controlBuf.put.
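
As an illustration of this producer/consumer pattern, here is a minimal sketch under my own names (controlItem, windowUpdate, and the channel-based buffer are illustrative, not grpc's actual controlBuffer, which uses a lock-protected list and flow-control-aware scheduling):

package main

import "fmt"

// controlItem stands in for the control frames (settings, window updates,
// goaway, ...) that producers enqueue.
type controlItem interface{ name() string }

type windowUpdate struct{ streamID, increment uint32 }

func (windowUpdate) name() string { return "WINDOW_UPDATE" }

// controlBuffer decouples the many frame producers from the single
// writer goroutine, the way grpc's controlBuf feeds its loopyWriter.
type controlBuffer struct{ ch chan controlItem }

func (c *controlBuffer) put(it controlItem) { c.ch <- it }

// run plays the role of loopyWriter.run: drain items, write frames in order.
func (c *controlBuffer) run() {
    for it := range c.ch {
        fmt.Println("writing frame:", it.name()) // framer.WriteXxx in reality
    }
}

func main() {
    cb := &controlBuffer{ch: make(chan controlItem, 16)}
    done := make(chan struct{})
    go func() { cb.run(); close(done) }()
    cb.put(windowUpdate{streamID: 0, increment: 1 << 20})
    close(cb.ch) // shutdown: let the writer drain remaining items and exit
    <-done
}

The point of the indirection is that any goroutine can request a frame without touching the framer, while a single loop owns all writes to the conn.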

keepalive closes conns that have stayed idle beyond the maximum idle time or exceeded the maximum connection age.
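
A hedged sketch of the idle-eviction half of that idea (grpc's actual keepalive also handles MaxConnectionAge and ping-based liveness, which this omits; idleWatcher is my own illustrative name):

package main

import (
    "net"
    "sync/atomic"
    "time"
)

// idleWatcher closes a conn once no activity has been seen for maxIdle,
// a simplified stand-in for http2Server.keepalive's MaxConnectionIdle check.
type idleWatcher struct {
    conn       net.Conn
    maxIdle    time.Duration
    lastActive atomic.Int64 // unix nanos of the last read/write
}

// touch is what the read/write paths would call on every frame.
func (w *idleWatcher) touch() { w.lastActive.Store(time.Now().UnixNano()) }

func (w *idleWatcher) run() {
    ticker := time.NewTicker(w.maxIdle / 2)
    defer ticker.Stop()
    for range ticker.C {
        if time.Since(time.Unix(0, w.lastActive.Load())) > w.maxIdle {
            w.conn.Close() // evict the idle connection
            return
        }
    }
}

func main() {
    c, _ := net.Pipe()
    w := &idleWatcher{conn: c, maxIdle: 100 * time.Millisecond}
    w.touch()
    go w.run()
    time.Sleep(300 * time.Millisecond) // no further activity, so the conn is closed
}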

handleStreams

All handling of a conn is wrapped in the transport's HandleStreams method, which reads frames from the framer and dispatches on frame type. A round of communication usually begins with SETTINGS + HEADERS + DATA.

func (t *http2Server) HandleStreams(handle func(*Stream),
    traceCtx func(context.Context, string) context.Context) {
    defer close(t.readerDone)
    for {
        frame, err := t.framer.fr.ReadFrame()
        atomic.StoreUint32(&t.activity, 1)
        ...
        switch frame := frame.(type) {
        case *http2.MetaHeadersFrame:
            if t.operateHeaders(frame, handle, traceCtx) {
                t.Close()
                break
            }
        case *http2.DataFrame:
            t.handleData(frame)
        case *http2.RSTStreamFrame:
            t.handleRSTStream(frame)
        case *http2.SettingsFrame:
            t.handleSettings(frame)
        case *http2.PingFrame:
            t.handlePing(frame)
        case *http2.WindowUpdateFrame:
            t.handleWindowUpdate(frame)
        case *http2.GoAwayFrame:
        default:
        }
    }
}

Every stream is created and processed starting from a HEADERS frame: the fields carried in the header are fed through decodeState and a new Stream is built. Below are the fields such a header may carry.

[Figure: header fields carried on a gRPC HEADERS frame]

Streams are distinguished by streamID; each contains a recvBuffer plus a bundle of configuration covering the codec, method, serialization format, flow-control parameters, and so on. The stream is finally handed to the handle callback, which is where the request actually gets read, the local call is made, and the response is written back.

func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame,
    handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
    streamID := frame.Header().StreamID
    var state decodeState
    for _, hf := range frame.Fields {
        if err := state.processHeaderField(hf); err != nil {
        }
    }
    buf := newRecvBuffer()
    s := &Stream{
        id:             streamID,
        st:             t,
        buf:            buf,
        fc:             &inFlow{limit: uint32(t.initialWindowSize)},
        recvCompress:   state.encoding,
        method:         state.method,
        contentSubtype: state.contentSubtype,
    }
    t.maxStreamID = streamID
    t.activeStreams[streamID] = s
    ...
    handle(s)
    return
}

The method read from the stream is of the form helloworld.Greeter/SayHello: before the slash is the service name, after it the method name. The service registered on the Server via RegisterService yields the corresponding method, which is then dispatched depending on whether it is Unary or Stream.

func (s *Server) handleStream(t transport.ServerTransport,
    stream *transport.Stream, trInfo *traceInfo) {
    sm := stream.Method()
    pos := strings.LastIndex(sm, "/")
    service := sm[:pos]
    method := sm[pos+1:]
    srv, ok := s.m[service]

    // Unary RPC or Streaming RPC?
    if md, ok := srv.md[method]; ok {
        s.processUnaryRPC(t, stream, srv, md, trInfo)
        return
    }
    if sd, ok := srv.sd[method]; ok {
        s.processStreamingRPC(t, stream, srv, sd, trInfo)
        return
    }
}

processUnaryRPC

gRPC's protocol documentation explains the request and response formats. In short, a request consists of a HEADERS frame, from which the HTTP header fields are read, followed by consecutive DATA frames carrying the request message, the last of which signals end of stream. The length-prefixed messages carried in those DATA frames are laid out as follows (a small encoding sketch follows the grammar):

Request → Request-Headers *Length-Prefixed-Message EOS

  • Length-Prefixed-Message → Compressed-Flag Message-Length Message
  • Compressed-Flag → 0 / 1 # encoded as 1 byte unsigned integer
  • Message-Length → {length of Message} # encoded as 4 byte unsigned integer
  • Message → *{binary octet}
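
To make the framing concrete, here is a minimal sketch that encodes a message with this 5-byte prefix. It follows the grammar above; frameMessage is my own helper, not grpc's internal encoder:

package main

import (
    "encoding/binary"
    "fmt"
)

// frameMessage prepends the gRPC length prefix:
// 1 byte Compressed-Flag + 4 bytes big-endian Message-Length.
func frameMessage(msg []byte, compressed bool) []byte {
    buf := make([]byte, 5+len(msg))
    if compressed {
        buf[0] = 1
    }
    binary.BigEndian.PutUint32(buf[1:5], uint32(len(msg)))
    copy(buf[5:], msg)
    return buf
}

func main() {
    framed := frameMessage([]byte("hello"), false)
    fmt.Printf("% x\n", framed) // 00 00 00 00 05 68 65 6c 6c 6f
}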

Looking at the code, grpc processes this Length-Prefixed-Message as a continuation of the HEADERS frame handling rather than in a separate DATA-frame handler, so in processUnaryRPC below it is recvMsg that reads the prefix and the message straight off the stream.

func (s *Server) processUnaryRPC(t transport.ServerTransport,
    stream *transport.Stream, srv *service, md *MethodDesc,
    trInfo *traceInfo) (err error) {

    p := &parser{r: stream}
    pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)

    df := func(v interface{}) error {
        s.getCodec(stream.ContentSubtype()).Unmarshal(req, v)
        return nil
    }
    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
    reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
    ...
    opts := &transport.Options{
        Last:  true,
        Delay: false,
    }
    if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
        return err
    }
    return t.WriteStatus(stream, status.New(codes.OK, ""))
}

Matching the protocol, the parser's 5-byte header corresponds to the Compressed-Flag and Message-Length, and in unary mode the request carries exactly one Message. Once read, it is deserialized according to the codec and the registered method's input type, the local method is invoked for a reply, and sendResponse writes the response back. When the end-of-stream DATA frame arrives, EOF is written into the stream and the server's activeStreams bookkeeping is updated.

type parser struct {
    r      io.Reader
    header [5]byte
}

func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
    if _, err := p.r.Read(p.header[:]); err != nil {
        return 0, nil, err
    }

    pf = payloadFormat(p.header[0])
    length := binary.BigEndian.Uint32(p.header[1:])
    if length == 0 {
        return pf, nil, nil
    }

    msg = make([]byte, int(length))
    if _, err := p.r.Read(msg); err != nil {
        return 0, nil, err
    }
    return pf, msg, nil
}

processStreamingRPC

gRPC supports streaming requests and responses; sd is the StreamDesc read from the registered service.

func (s *Server) processStreamingRPC(t transport.ServerTransport,
    stream *transport.Stream, srv *service, sd *StreamDesc,
    trInfo *traceInfo) (err error) {
    ...
    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
    ss := &serverStream{
        ctx:                   ctx,
        t:                     t,
        s:                     stream,
        p:                     &parser{r: stream},
        codec:                 s.getCodec(stream.ContentSubtype()),
        maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
        maxSendMessageSize:    s.opts.maxSendMessageSize,
        trInfo:                trInfo,
        statsHandler:          sh,
    }
    appErr = sd.Handler(server, ss)
    ...
    return t.WriteStatus(ss.s, status.New(codes.OK, ""))
}

The function registered in a StreamDesc has the StreamHandler signature; before each sd.Handler runs, processStreamingRPC creates a fresh serverStream that implements the protocol's basic SendMsg and RecvMsg methods.

type StreamHandler func(srv interface{}, stream ServerStream) error

// StreamDesc represents a streaming RPC service's method specification.
type StreamDesc struct {
    StreamName string
    Handler    StreamHandler

    // At least one of these is true.
    ServerStreams bool
    ClientStreams bool
}

SendMsg and RecvMsg are implemented the same way as in processUnaryRPC. It is called a stream because the server-side implementation is usually the loop pattern below, which ends up registered in a StreamDesc as _User_Chat_Handler.

func (svr *UserService) Chat(stream pb.User_ChatServer) error {
    log.Printf("Begin Chat")
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        msg := in.GetTalk()
        stream.Send(&pb.UserMessage{
            Talk: "You Talk: " + msg,
        })
    }
}

This example is adapted from another blog; I haven't found the original source yet and will add it later.

func _User_Chat_Handler(srv interface{}, stream grpc.ServerStream) error {
    return srv.(UserServer).Chat(&userChatServer{stream})
}

type User_ChatServer interface {
    Send(*UserMessage) error
    Recv() (*UserMessage, error)
    grpc.ServerStream
}

type userChatServer struct {
    grpc.ServerStream
}

func (x *userChatServer) Send(m *UserMessage) error {
    return x.ServerStream.SendMsg(m)
}

func (x *userChatServer) Recv() (*UserMessage, error) {
    m := new(UserMessage)
    if err := x.ServerStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}
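
For completeness, the client side of the same bidirectional stream typically looks like the hedged sketch below. It assumes the generated pb.NewUserClient from the same proto; the import path and server address are placeholders, and error handling is abbreviated:

package main

import (
    "context"
    "log"

    "google.golang.org/grpc"
    pb "example.com/user/pb" // hypothetical path for the generated package
)

func main() {
    conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    client := pb.NewUserClient(conn)

    stream, err := client.Chat(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    stream.Send(&pb.UserMessage{Talk: "hi"})
    reply, _ := stream.Recv() // expect "You Talk: hi"
    log.Println(reply.GetTalk())
    stream.CloseSend() // signals io.EOF to the server-side Recv loop
}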

register service

On the server side, the generated pb code defines the ServiceDesc and the MethodDescs it contains. Externally, any concrete instance satisfying the GreeterServer interface can be registered into the grpc.Server through RegisterGreeterServer.

type GreeterServer interface {
    SayHello(context.Context, *HelloRequest) (*HelloReply, error)
}

func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
    s.RegisterService(&_Greeter_serviceDesc, srv)
}

The Handler registered in a MethodDesc must match a specific func signature. During processUnaryRPC the Handler is located by service name and method name and _Greeter_SayHello_Handler is invoked: srv is the instance passed to register, dec is the hook injected at this point to do decompression and deserialization, and interceptor is the interceptor; see my earlier post on gRPC interceptors and chains.

var _Greeter_serviceDesc = grpc.ServiceDesc{
    ServiceName: "helloworld.Greeter",
    HandlerType: (*GreeterServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "SayHello",
            Handler:    _Greeter_SayHello_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "helloworld.proto",
}

func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context,
    dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(HelloRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(GreeterServer).SayHello(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/helloworld.Greeter/SayHello",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
    }
    return interceptor(ctx, in, info, handler)
}

Both registration and lookup in grpc are very direct, and the pb-related pieces are generated automatically; there is no need, as in go-rpc, to use reflection to store and fetch information such as each function's argument types.

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    ...
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc),
        sd:     make(map[string]*StreamDesc),
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        d := &sd.Streams[i]
        srv.sd[d.StreamName] = d
    }
    s.m[sd.ServiceName] = srv
}

codec

gRPC's encoding package provides proto and gzip, both shipped as plugins. In the usual case this comes down to a plain proto.Unmarshal (see the codec-registration sketch after the code).

func (codec) Unmarshal(data []byte, v interface{}) error {
    protoMsg := v.(proto.Message)
    protoMsg.Reset()

    if pu, ok := protoMsg.(proto.Unmarshaler); ok {
        // object can unmarshal itself, no need for buffer
        return pu.Unmarshal(data)
    }

    cb := protoBufferPool.Get().(*cachedProtoBuffer)
    cb.SetBuf(data)
    err := cb.Unmarshal(protoMsg)
    cb.SetBuf(nil)
    protoBufferPool.Put(cb)
    return err
}
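
The plugin mechanism means a custom codec only needs to implement the encoding.Codec interface and register itself in init. A hedged sketch; jsonCodec here is my own illustrative example, not something grpc ships:

package jsoncodec

import (
    "encoding/json"

    "google.golang.org/grpc/encoding"
)

// jsonCodec is an illustrative codec that serializes messages as JSON.
type jsonCodec struct{}

func (jsonCodec) Marshal(v interface{}) ([]byte, error) { return json.Marshal(v) }

func (jsonCodec) Unmarshal(data []byte, v interface{}) error { return json.Unmarshal(data, v) }

// Name is matched against the content-subtype, e.g. "application/grpc+json".
func (jsonCodec) Name() string { return "json" }

func init() {
    // After registration, s.getCodec(stream.ContentSubtype()) can find it.
    encoding.RegisterCodec(jsonCodec{})
}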

summary

This post mainly covered the HTTP2 protocol as realized by the transport, along with understanding streams and how services are registered on the grpc server side. Next up: the grpc client side and the proto-related pieces.