实现Watch的Server Push与Client Poll方式

ServerPush & ClientPull

前两天总结了一下HTTP2相关的发展过程,其中最重要的两点是:实现了服务端Push和多路复用。本篇结合目前见过的一些实现Push&Pull的实例,总结一下数据交互的实现方式:

  1. etcd v2里的长轮询 long polling的方式
  2. k8s-apiserver的stream的方式
  3. service center和config center的websocket的方式
  4. gRPC也就是http2的server push方式

long polling

由于http1.x没有服务端push的机制,为了watch服务端的数据变化,最简单的办法当然是客户端去pull:客户端每隔定长时间去服务端拉数据同步,无论有没有服务端有没有数据变化。但是必然存在通知不及时和大量无效的轮询的问题。long polling就是在这个polling的基础上的优化,当客户端发起long polling时,如果服务端没有相关数据,会hold住请求,直到服务端有数据要发或者超时才会返回。

client

etcdv2是个比较典型的long polling的例子。下面是客户端keysAPI的代码,它通过Watcher接口返回一个实现了Next方法的实例,客户端通过循环调用Next获得所有服务端事件。

1
Watcher(key string, opts *WatcherOptions) Watcher

Next方法里client只是发了标记为wait的请求,通过统一的transport发到服务端。nexWait是用来生成请求体的,请求体的方法为GET,只是params带了wait字段,让服务端识别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
for {
httpresp, body, err := hw.client.Do(ctx, &hw.nextWait)
if err != nil {
return nil, err
}
resp, err := unmarshalHTTPResponse(httpresp.StatusCode,
httpresp.Header, body)
if err != nil {
if err == ErrEmptyBody {
continue
}
return nil, err
}
hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
return resp, nil
}
}

server

github.com/etcd/etcdserver/api/v2http/client.go

对应etcdv2的服务端keysHandler的处理过程是:调用etcdServer的Do方法,根据v2apistore的Get返回event或者watcher。如果请求中有wait字段,那么会返回一个kvStrore的watcher。

1
2
3
4
5
6
7
8
9
10
11
func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
resp, err := h.server.Do(ctx, rr)
switch {
case resp.Watcher != nil:
ctx, cancel := context.WithTimeout(
context.Background(), defaultWatchTimeout)
defer cancel()
handleKeyWatch(ctx, w, resp, rr.Stream)
...
}
}

在处理watch请求时,通常都是使用context设置超时时间,但是这里defaultWatchTimeout设置的是maxInt64,所以watch的超时是客户端决定的,当超时发生close连接,server通过CloseNotifier得到通知并放弃处理。

CloseNotifier Flusher

服务端首先把header flush到连接上,以免客户端等待header超时。之后等待内部kvstore的chan上有事件准备好,并发送。stream这个参数在etcdv2这个场景下为false,也就是long pollling获得数据即可以返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func handleKeyWatch(ctx context.Context, w http.ResponseWriter,
resp etcdserver.Response, stream bool) {
wa := resp.Watcher
defer wa.Remove()
ech := wa.EventChan()

w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index))
w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term))
w.WriteHeader(http.StatusOK)
// Ensure headers are flushed early, in case of long polling
w.(http.Flusher).Flush()
for {
select {
case <-nch: // CloseNotifier, Client closed connection. Nothing to do.
return
case <-ctx.Done(): // Timed out.
return
case ev, ok := <-ech:
if !ok {
return
}
ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
if err := json.NewEncoder(w).Encode(ev); err != nil {
plog.Warningf("error writing event (%v)", err)
return
}
if !stream {
return
}
w.(http.Flusher).Flush()
}
}
}

streaming

stream是要在同一个连接上,分多个部分发送HTTP响应。一般HTTP的响应中发送的数据是整个发送,并且通过Content-Length消息头字段表示数据的长度。如果分多块传输,需要另外的编码方式,于是Chunked编码(分块传输编码)引入到了HTTP1.1协议中。它允许HTTP服务端动态生成内容,消息体由数量未定的块组成,并且以最后一个大小为0的块结束。

1
2
3
4
5
6
7
8
9
10
11
12
HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked
25
This is the data in the first chunk
1C
and this is the second one
3
con
8
sequence
0

server & serveWatch

k8s的服务端watch接口是通过etcd的watch接口实现的长连接方式。最终注册到go-restful的Watch路由,对应GET方法和ListResource这个handlerFunc。

k8s.io/apiserver/pkg/endpoints

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope,
forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
if opts.Watch || forceWatch {
if rw == nil {
return
}
timeout := time.Duration(0)
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
watcher, err := rw.Watch(ctx, &opts)
if err != nil {
scope.err(err, w, req)
return
}
serveWatch(watcher, scope, req, w, timeout)
return
}
}
}

其中watcher是内部storage通过etcd的watch接口封装的返回事件的chan。serveWatch就是在处理这个内部chan,并把chan上发生的事件通过chunk编码发给客户端。这个循环可能因为客户端close连接或超时而结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func serveWatch(watcher watch.Interface, scope RequestScope,
req *http.Request, w http.ResponseWriter, timeout time.Duration) {
// negotiate for the stream serializer ...
server := &WatchServer{
Watching: watcher,
Scope: scope,
UseTextFraming: useTextFraming,
MediaType: mediaType + ";stream=watch",
Framer: framer,
Encoder: encoder,
EmbeddedEncoder: embeddedEncoder,
Fixup: func(obj runtime.Object) {},
TimeoutFactory: &realTimeoutFactory{timeout},
}
server.ServeHTTP(w, req)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// if isWebSocketRequest
cn, ok1 := w.(http.CloseNotifier)
flusher, ok2 := w.(http.Flusher)
if !ok1 || !ok2 {
return
}

framer := s.Framer.NewFrameWriter(w)
e := streaming.NewEncoder(framer, s.Encoder)
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()

w.Header().Set("Content-Type", s.MediaType)
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()

buf := &bytes.Buffer{}
ch := s.Watching.ResultChan()
for {
select {
case <-cn.CloseNotify():
return
case <-timeoutCh:
return
case event, ok := <-ch:
if !ok {
return
}
obj := event.Object
s.EmbeddedEncoder.Encode(obj, buf)

unknown.Raw = buf.Bytes()
event.Object = &unknown
metav1.Convert_versioned_InternalEvent_to_versioned_Event(
metav1.InternalEvent(event),
&metav1.WatchEvent{}, nil)

e.Encode(outEvent)
if len(ch) == 0 {
flusher.Flush()
}
buf.Reset()
}
}
}

registry & storage

这一节本来跟stream没有关系,但它是对etcd的watch的封装所以还是记一下。

上面内部watcher是rest.StandardStorage接口,它是以下所有接口的组合。它的实现registry.Store提供了N个函数挂载点,对所有资源类型提供了统一的实现。比如每种资源都实现了NewFunc和KeyFunc,Store统一实现Creater接口实现对每种资源的创建,并最终调用storage包面向etcd的接口实现到后端数据库的持久化。

1
2
3
4
5
6
7
8
type StandardStorage interface {
Getter
Lister
CreaterUpdater
GracefulDeleter
CollectionDeleter
Watcher
}

比如store封装的watch接口最终到storage里面向etcd的watch接口。

1
2
3
4
5
6
7
8
9
10
11
12
func (e *Store) Watch(ctx context.Context, 
options *metainternalversion.ListOptions) (watch.Interface, error) {
predicate := e.PredicateFunc(labels.Everything(), fields.Everything())
return e.WatchPredicate(ctx, predicate, options.ResourceVersion)
}
func (e *Store) WatchPredicate(ctx context.Context,
p storage.SelectionPredicate, resourceVersion string)
(watch.Interface, error) {
...
w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, p)
return w, nil
}

etcdHelper这个包封装了etcdv2的接口,最终是通过循环处理Watcher.Next来实现内部事件的产生。这个过程还涉及到storage的watch cache。详细过程下一次写watch cache再写吧。

1
2
3
4
5
6
7
8
func (h *etcdHelper) WatchList(ctx context.Context, key string, 
resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
key = path.Join(h.pathPrefix, key)
w := newEtcdWatcher(true, h.quorum, exceptKey(key), pred,
h.codec, h.versioner, nil, h.transformer, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, resourceVersion)
return w, nil
}
1
2
3
4
5
6
7
8
9
func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, 
key string, resourceVersion uint64) {
watcher := client.Watcher(key, &opts)
w.ctx, w.cancel = context.WithCancel(ctx)
for {
resp, err := watcher.Next(w.ctx)
w.etcdIncoming <- resp
}
}

client

客户端通过Do获取到服务端的第一个Header响应。最后通过StreamWatcher封装好watch的ResultChan接口,它从连接上decoder反序列化数据由streamWatcher封装好返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (r *Request) Watch() (watch.Interface, error) {
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
req.Header = r.headers
client := r.client

resp, err := client.Do(req)
if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
return nil, fmt.Errorf("got status: %v", resp.StatusCode)
}
framer := r.serializers.Framer.NewFrameReader(resp.Body)
decoder := streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
return watch.NewStreamWatcher(
restclientwatch.NewDecoder(decoder, r.serializers.Decoder)), nil
}

这个接口最常见的地方是在reflectorlistWatch当中, 在watch循环中通常客户端会指定超时时间5分钟,好让服务端知道什么时候超时结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
for {
timeoutSeconds:=int64(minWatchTimeout.Seconds()*(rand.Float64()+1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
TimeoutSeconds: &timeoutSeconds,
}
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch err {
case io.EOF:// watch closed normally
case io.ErrUnexpectedEOF:
}
return nil
}
r.watchHandler(w, &resourceVersion, resyncerrc, stopCh)
}

web-socket

上面的两种方式,其实都非常浪费资源。长轮询必须不停连接,长连接必须保持HTTP连接始终打开。websocket就是另一种解决服务端push的方法。简单来说,它是建立在TCP协议之上的ws协议,它跟HTTP协议有良好的兼容性,数据格式比较轻量,可以发送文本,也可以二进制。

WebSocket复用了HTTP一部分握手过程。客户端通过HTTP请求与WebSocket服务端协商要求升级协议。协议升级完成后,后续的数据交换则遵照WebSocket的协议。以下是客户端发出的请求。

1
2
3
4
5
6
7
GET / HTTP/1.1
Host: localhost:8080
Origin: http://127.0.0.1:3000
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Version: 13
Sec-WebSocket-Key: w4v7O6xFTi36lq3RNcgctw==

服务端回应101表示切换协议。具体协议参考WebSocket Protocol

1
2
3
4
HTTP/1.1 101 Switching Protocols
Connection:Upgrade
Upgrade: websocket
Sec-WebSocket-Accept: Oy4NRAQ13jhfONC7bP8dTKb4PTU=

连接建立并协议升级后,双方的通信进入web-socket协议,它有以下特点:

  1. 是真正的全双工方式,可以互相主动请求。
  2. 在已经建立好的TCP连接中,交换数据不需要再发送和解析HTTP header。
  3. 可以利用协议头的sec-websocket-key来进行连接复用,不同的URL可以复用同一个连接。

upgrade

web-socket的服务端首先要完成协商协议升级的事情,且后续Handler的处理不用再经过httpServer的请求解析,然后路由的部分,仍然再已建立的httpconn上完成后续的信息交互。

1
2
3
4
5
6
7
8
func echo(w http.ResponseWriter, r *http.Request) {
c, err := websocket.Upgrader{}.Upgrade(w, r, nil)
defer c.Close()
for {
mt, message, err := c.ReadMessage()
err = c.WriteMessage(mt, message)
}
}

协议升级其实只是校验方法是否为GET,请求头是否有对应的升级标记。从http的Hijacker中获取原始netConn,并且回复server端的101协议升级信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, 
responseHeader http.Header) (*Conn, error) {
...

challengeKey := r.Header.Get("Sec-Websocket-Key")
h, ok := w.(http.Hijacker)
netConn, rw, err = h.Hijack()
br = rw.Reader

c := newConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize)
c.subprotocol = subprotocol

p := c.writeBuf[:0]
p = append(p, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: \
websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "...)
p = append(p, computeAcceptKey(challengeKey)...)
p = append(p, "\r\n"...)
if c.subprotocol != "" {
p = append(p, "Sec-Websocket-Protocol: "...)
p = append(p, c.subprotocol...)
p = append(p, "\r\n"...)
}

// Clear deadlines set by HTTP server.
netConn.SetDeadline(time.Time{})
if u.HandshakeTimeout > 0 {
netConn.SetWriteDeadline(time.Now().Add(u.HandshakeTimeout))
}

if _, err = netConn.Write(p); err != nil {
netConn.Close()
return nil, err
}
return c, nil
}

keepalive

websocket的连接无法感知对方是否关闭了连接,所以协议层设计了Ping-Pang来做keepalive。每隔7s秒发一次PingMessage,收到PongMessage就更新lastResponseTime,如果超过15s都没有PongMessage的回应,则断开websocket连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (dynHandler *DynamicConfigHandler) startDynamicConfigHandler() error {
if dynHandler != nil && dynHandler.wsDialer != nil {
dynHandler.wsConnection, _, err = dynHandler.wsDialer.Dial(url, nil)
keepAlive(dynHandler.wsConnection, 15*time.Second)
go func() error {
for {
messageType, message, err := dynHandler.wsConnection.ReadMessage()
if err != nil {
break
}
if messageType == websocket.TextMessage {
dynHandler.EventHandler.OnReceive(message)
}
}
return dynHandler.wsConnection.Close()
}()
}
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func keepAlive(c *websocket.Conn, timeout time.Duration) {
lastResponse := time.Now()
c.SetPongHandler(func(msg string) error {
lastResponse = time.Now()
return nil
})
go func() {
for {
err := c.WriteMessage(websocket.PingMessage, []byte("keepalive"))
if err != nil {
return
}
time.Sleep(timeout / 2)
if time.Now().Sub(lastResponse) > timeout {
c.Close()
return
}
}
}()
}

http2

http2的writer实现了Pusher接口,通过push可以把消息发给http2conn内部维护的wantStartPushCh,在conn的serve过程中分发这个msg开始startPush。最终是发了pushPromiseFrame,具体的过程下次写http2实现的时候再说吧。

1
2
3
4
5
6
7
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if push, ok := w.(http.Pusher); ok {
if err := pusher.Push("/app.js", nil); err != nil {
log.Printf("Failed to push: %v", err)
}
}
})

这里只解释一个问题:为什么http2没有改变语义、方法、状态码和URI、以及首部字段,它也没有使用分块传输编码,它如何实现push和多路复用,改进了传输性能。

应用层和传输层之间增加了一个二进制分帧层。

frame

HTTP2将要传输的信息分割成更小的消息和帧,并对它们采用二进制格式的编码。原来的HTTP header和HTTP body都以Frame Payload存在。每个Frame的FrameHeader描述了这个帧的长度,类型等信息。

  1. Length: Frame Payload 的长度, Frame Header 的长度是 9 字节(Length + Type + Flags + R + Stream Identifier = 72 bit)。
  2. Type: Frame Payload 存储的数据是属于 HTTP Header 还是 HTTP Body
  3. Flags: 共 8 位, 每位都起标记作用。每种不同的 Frame Type 都有不同的 Frame Flags。例如发送最后一个 DATA 类型的 Frame 时,就会将 Flags 最后一位设置 1(flags &= 0x01),表示 END_STREAM,说明这个 Frame 是流的最后一个数据包。
  4. Stream Identifier: 流 ID,当客户端和服务端建立 TCP 链接时,就会先发送一个 Stream ID = 0 的流,用来做些初始化工作。之后客户端和服务端从 1 开始发送请求/响应。
1
2
3
4
5
6
7
8
9
+-----------------------------------------------+
| Length (24) |
+---------------+---------------+---------------+
| Type (8) | Flags (8) |
+-+-------------+---------------+-------------------+
|R| Stream Identifier (31) |
+=+=================================================+
| Frame Payload (0...) ...
+---------------------------------------------------+

理解:HTTP1.x客户端解析读取响应是根据响应头的Content-Len读取body体,然后返回。为了让它流式读取,server端要在头里告诉client 现在要变更编码方式为chunked,之后进行分块传输,直到server端发了大小为0的数据。

HTTP2引入Frame了之后完全改变了原来的编解码方式,整个方式类似很多RPC协议。帧由二进制编码,帧头固定位置的字节描述body长度,就可以读取body体,直到flags遇到END_STREAM。这种方式天然支持服务端在stream上发送数据,不需要通知客户端做什么改变。

其次streamID这个是用来做多路复用的,跟许多RPC协议里的msgID是一个意思。

Reference