ServerPush & ClientPull
前两天总结了一下HTTP2相关的发展过程,其中最重要的两点是:实现了服务端Push和多路复用。本篇结合目前见过的一些实现Push&Pull的实例,总结一下数据交互的实现方式:
- etcd v2里的长轮询 long polling的方式
- k8s-apiserver的stream的方式
- service center和config center的websocket的方式
- gRPC也就是http2的server push方式
long polling
由于http1.x没有服务端push的机制,为了watch服务端的数据变化,最简单的办法当然是客户端去pull:客户端每隔定长时间去服务端拉数据同步,无论有没有服务端有没有数据变化。但是必然存在通知不及时和大量无效的轮询的问题。long polling就是在这个polling的基础上的优化,当客户端发起long polling时,如果服务端没有相关数据,会hold住请求,直到服务端有数据要发或者超时才会返回。
client
etcdv2是个比较典型的long polling的例子。下面是客户端keysAPI的代码,它通过Watcher接口返回一个实现了Next方法的实例,客户端通过循环调用Next获得所有服务端事件。
1 | Watcher(key string, opts *WatcherOptions) Watcher |
Next方法里client只是发了标记为wait的请求,通过统一的transport发到服务端。nexWait是用来生成请求体的,请求体的方法为GET,只是params带了wait字段,让服务端识别。
1 | func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) { |
server
github.com/etcd/etcdserver/api/v2http/client.go
对应etcdv2的服务端keysHandler
的处理过程是:调用etcdServer的Do方法,根据v2apistore的Get返回event或者watcher。如果请求中有wait字段,那么会返回一个kvStrore的watcher。
1 | func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
在处理watch请求时,通常都是使用context设置超时时间,但是这里defaultWatchTimeout设置的是maxInt64,所以watch的超时是客户端决定的,当超时发生close连接,server通过CloseNotifier
得到通知并放弃处理。
CloseNotifier Flusher
服务端首先把header flush到连接上,以免客户端等待header超时。之后等待内部kvstore的chan上有事件准备好,并发送。stream这个参数在etcdv2这个场景下为false,也就是long pollling获得数据即可以返回。
1 | func handleKeyWatch(ctx context.Context, w http.ResponseWriter, |
streaming
stream是要在同一个连接上,分多个部分发送HTTP响应。一般HTTP的响应中发送的数据是整个发送,并且通过Content-Length消息头字段表示数据的长度。如果分多块传输,需要另外的编码方式,于是Chunked
编码(分块传输编码)引入到了HTTP1.1协议中。它允许HTTP服务端动态生成内容,消息体由数量未定的块组成,并且以最后一个大小为0的块结束。
1 | HTTP/1.1 200 OK |
server & serveWatch
k8s的服务端watch接口是通过etcd的watch接口实现的长连接方式。最终注册到go-restful的Watch路由,对应GET方法和ListResource这个handlerFunc。
k8s.io/apiserver/pkg/endpoints
1 | func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, |
其中watcher是内部storage通过etcd的watch接口封装的返回事件的chan。serveWatch就是在处理这个内部chan,并把chan上发生的事件通过chunk编码发给客户端。这个循环可能因为客户端close连接或超时而结束。
1 | func serveWatch(watcher watch.Interface, scope RequestScope, |
1 | func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { |
registry & storage
这一节本来跟stream没有关系,但它是对etcd的watch的封装所以还是记一下。
上面内部watcher是rest.StandardStorage
接口,它是以下所有接口的组合。它的实现registry.Store
提供了N个函数挂载点,对所有资源类型提供了统一的实现。比如每种资源都实现了NewFunc和KeyFunc,Store统一实现Creater接口实现对每种资源的创建,并最终调用storage包面向etcd的接口实现到后端数据库的持久化。
1 | type StandardStorage interface { |
比如store封装的watch接口最终到storage里面向etcd的watch接口。
1 | func (e *Store) Watch(ctx context.Context, |
etcdHelper这个包封装了etcdv2的接口,最终是通过循环处理Watcher.Next来实现内部事件的产生。这个过程还涉及到storage的watch cache。详细过程下一次写watch cache再写吧。
1 | func (h *etcdHelper) WatchList(ctx context.Context, key string, |
1 | func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, |
client
客户端通过Do获取到服务端的第一个Header响应。最后通过StreamWatcher封装好watch的ResultChan接口,它从连接上decoder反序列化数据由streamWatcher封装好返回。
1 | func (r *Request) Watch() (watch.Interface, error) { |
这个接口最常见的地方是在reflector
的listWatch
当中, 在watch循环中通常客户端会指定超时时间5分钟
,好让服务端知道什么时候超时结束。
1 | for { |
web-socket
上面的两种方式,其实都非常浪费资源。长轮询必须不停连接,长连接必须保持HTTP连接始终打开。websocket就是另一种解决服务端push的方法。简单来说,它是建立在TCP协议之上的ws协议,它跟HTTP协议有良好的兼容性,数据格式比较轻量,可以发送文本,也可以二进制。
WebSocket复用了HTTP一部分握手过程。客户端通过HTTP请求与WebSocket服务端协商要求升级协议。协议升级完成后,后续的数据交换则遵照WebSocket的协议。以下是客户端发出的请求。
1 | GET / HTTP/1.1 |
服务端回应101表示切换协议。具体协议参考WebSocket Protocol
1 | HTTP/1.1 101 Switching Protocols |
连接建立并协议升级后,双方的通信进入web-socket协议,它有以下特点:
- 是真正的全双工方式,可以互相主动请求。
- 在已经建立好的TCP连接中,交换数据不需要再发送和解析HTTP header。
- 可以利用协议头的sec-websocket-key来进行连接复用,不同的URL可以复用同一个连接。
upgrade
web-socket的服务端首先要完成协商协议升级的事情,且后续Handler的处理不用再经过httpServer的请求解析,然后路由的部分,仍然再已建立的httpconn上完成后续的信息交互。
1 | func echo(w http.ResponseWriter, r *http.Request) { |
协议升级其实只是校验方法是否为GET,请求头是否有对应的升级标记。从http的Hijacker中获取原始netConn,并且回复server端的101协议升级信息。
1 | func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, |
keepalive
websocket的连接无法感知对方是否关闭了连接,所以协议层设计了Ping-Pang来做keepalive。每隔7s秒发一次PingMessage,收到PongMessage就更新lastResponseTime,如果超过15s都没有PongMessage的回应,则断开websocket连接。
1 | func (dynHandler *DynamicConfigHandler) startDynamicConfigHandler() error { |
1 | func keepAlive(c *websocket.Conn, timeout time.Duration) { |
http2
http2的writer实现了Pusher接口,通过push可以把消息发给http2conn内部维护的wantStartPushCh
,在conn的serve过程中分发这个msg开始startPush。最终是发了pushPromiseFrame,具体的过程下次写http2实现的时候再说吧。
1 | http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { |
这里只解释一个问题:为什么http2没有改变语义、方法、状态码和URI、以及首部字段,它也没有使用分块传输编码,它如何实现push和多路复用,改进了传输性能。
应用层和传输层之间增加了一个二进制分帧层。
HTTP2将要传输的信息分割成更小的消息和帧,并对它们采用二进制格式的编码。原来的HTTP header和HTTP body都以Frame Payload存在。每个Frame的FrameHeader描述了这个帧的长度,类型等信息。
- Length: Frame Payload 的长度, Frame Header 的长度是 9 字节(Length + Type + Flags + R + Stream Identifier = 72 bit)。
- Type: Frame Payload 存储的数据是属于 HTTP Header 还是 HTTP Body
- Flags: 共 8 位, 每位都起标记作用。每种不同的 Frame Type 都有不同的 Frame Flags。例如发送最后一个 DATA 类型的 Frame 时,就会将 Flags 最后一位设置 1(
flags &= 0x01
),表示 END_STREAM,说明这个 Frame 是流的最后一个数据包。 - Stream Identifier: 流 ID,当客户端和服务端建立 TCP 链接时,就会先发送一个 Stream ID = 0 的流,用来做些初始化工作。之后客户端和服务端从 1 开始发送请求/响应。
1 | +-----------------------------------------------+ |
理解:HTTP1.x客户端解析读取响应是根据响应头的Content-Len读取body体,然后返回。为了让它流式读取,server端要在头里告诉client 现在要变更编码方式为chunked,之后进行分块传输,直到server端发了大小为0的数据。
HTTP2引入Frame了之后完全改变了原来的编解码方式,整个方式类似很多RPC协议。帧由二进制编码,帧头固定位置的字节描述body长度,就可以读取body体,直到flags遇到END_STREAM。这种方式天然支持服务端在stream上发送数据,不需要通知客户端做什么改变。
其次streamID这个是用来做多路复用的,跟许多RPC协议里的msgID是一个意思。