记fastHTTP协程池的实现

golang的net/http包实现了处理连接时比较简单粗暴,相比之下有性能更好的http库,项目里用到过的fasthttp就是一个很好的实现,Go开发HTTP的另一个选择fasthttp中总结了它跟标准库实现的几点不同:

  • net/http 的实现是一个连接新建一个 goroutine,fasthttp 是利用一个 worker pool做了协程池,复用 goroutine,减轻 runtime 调度 goroutine 的压力
  • net/http 解析的请求数据很多放在http.Header或者http.Request.Form中,数据结构 map[string]stringmap[string][]string涉及不必要的 []byte 到 string 的转换,是可以规避的
  • net/http 解析 HTTP 请求每次生成新的 *http.Requesthttp.ResponseWriterfasthttp 解析 HTTP 数据到 *fasthttp.RequestCtx,然后使用 sync.Pool 复用结构实例,减少对象的数量
  • fasthttp 会延迟解析 HTTP 请求中的数据,尤其是 Body 部分。这样节省了很多不直接操作 Body 的情况的消耗

workerpool

net包和fasthttp最大的不同可能就是server在处理连接的时候使用了协程池。在并发量大的时候,goroutine数量巨大,runtime层的上下文切换成本对性能有影响。而fasthttp用协程池规避了这个问题,去年在做AOS的时候,项目中后期也引入了workerpool。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (s *Server) Serve(ln net.Listener) error {
// default concurrency set to 256*1024
maxWorkersCount := s.getConcurrency()
s.concurrencyCh = make(chan struct{}, maxWorkersCount)
wp := &workerPool{
WorkerFunc: s.serveConn,
MaxWorkersCount: maxWorkersCount,
LogAllErrors: s.LogAllErrors,
Logger: s.logger(),
}
wp.Start()
for {
if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
wp.Stop()
if err == io.EOF {
return nil
}
return err
}
if !wp.Serve(c) {
s.writeFastError(c, StatusServiceUnavailable,
"The connection cannot be served for Server.Concurrency limit exceeded")
c.Close()
time.Sleep(100 * time.Millisecond)
}
c = nil
}
}

workerpool的数据结构中WorkerFunc就是 s.serveConn,即每条net.conn的处理函数。workerChanPool是个对象池,MaxIdleWorkerDuration是worker的最大空闲时间,ready是可用的worker列表,也就是说所有goroutine worker是存放在一个数组里面的。

这个数组模拟一个类似栈的FILO队列,也就是说我们每次使用的worker都从队列的尾部开始取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type workerPool struct {
WorkerFunc func(c net.Conn) error
MaxWorkersCount int
LogAllErrors bool
MaxIdleWorkerDuration time.Duration
Logger Logger

lock sync.Mutex
workersCount int
mustStop bool
ready []*workerChan
stopCh chan struct{}
workerChanPool sync.Pool
}

type workerChan struct {
lastUseTime time.Time
ch chan net.Conn
}

start & stop

wp.start开启了一个goroutine,定时清理workerpool中未使用时间超过maxIdleWorkerDuration的goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (wp *workerPool) Start() {
wp.stopCh = make(chan struct{})
stopCh := wp.stopCh
go func() {
var scratch []*workerChan
for {
wp.clean(&scratch)
select {
case <-stopCh:
return
default:
time.Sleep(wp.getMaxIdleWorkerDuration())
}
}
}()
}

stop停止了ready里所有ch清空,并清空ready。资源清理时chan要置nil

1
2
3
4
5
6
7
8
9
10
11
12
13
func (wp *workerPool) Stop() {
close(wp.stopCh)
wp.stopCh = nil
wp.lock.Lock()
ready := wp.ready
for i, ch := range ready {
ch.ch <- nil
ready[i] = nil
}
wp.ready = ready[:0]
wp.mustStop = true
wp.lock.Unlock()
}

serve

实现中还涉及到如果wp已经stop,那worker退出后channel对象通过临时对象池管理等细节,这里就跳过了。总结 wp.serve到s.serveconn的过程大概如下

  1. 当ready这个可用worker列表中没有ch可用时,创建一个新ch绑定wp.Workfunc的goroutine。即新建了一个协程worker,这个协程从绑定的ch中获取待处理net.Conn。
  2. wp.Serve把accept的conn发到这个ch上,供绑定的协程worker处理。
  3. worker处理完后release这个绑定的ch到ready栈里。下一次有连接来时getCh优先从ready栈里找ch,也就是找worker。对ready的读取FILO,类似栈。
1
2
3
4
5
6
7
8
func (wp *workerPool) Serve(c net.Conn) bool {
ch := wp.getCh()
if ch == nil {
return false
}
ch.ch <- c
return true
}

getCh的实现可以理解为一个用来执行workFunc的goroutine都绑定了一个workerChan。把要处理的conn发到这个workerChan,这个goroutine就开始执行。没有要执行的conn则goroutine阻塞,直到下次workerChan有连接发来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (wp *workerPool) getCh() *workerChan {
var ch *workerChan
createWorker := false

wp.lock.Lock()
ready := wp.ready
n := len(ready) - 1
if n < 0 {
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
}
} else {
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
}
wp.lock.Unlock()

if ch == nil {
if !createWorker {
return nil
}
vch := wp.workerChanPool.Get()
if vch == nil {
vch = &workerChan{
ch: make(chan net.Conn, workerChanCap),
}
}
ch = vch.(*workerChan)
go func() {
wp.workerFunc(ch)
wp.workerChanPool.Put(vch)
}()
}
return ch
}

worker处理完一个连接后,将release这个连接到ready这个可用worker栈。即表示这时worker阻塞,可以交给它任务啦。同时处理完net.Conn后要置nil。正常情况下worker是不退出的,除非wp.Stop。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (wp *workerPool) workerFunc(ch *workerChan) {
var c net.Conn
var err error
for c = range ch.ch {
if c == nil {
break
}
if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
}
c = nil
if !wp.release(ch) {
break
}
}
wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
}

func (wp *workerPool) release(ch *workerChan) bool {
ch.lastUseTime = CoarseTimeNow()
wp.lock.Lock()
if wp.mustStop {
wp.lock.Unlock()
return false
}
wp.ready = append(wp.ready, ch)
wp.lock.Unlock()
return true
}

clean

最后看下start中开启的clean定时任务。之所以清理过程只从前遍历清理前面部分,是因为ready是FILO先进后出的,所以ready中越往后的空闲时间最短。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (wp *workerPool) clean(scratch *[]*workerChan) {
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
currentTime := time.Now()
wp.lock.Lock()
ready := wp.ready
n := len(ready)
i := 0
for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
i++
}
*scratch = append((*scratch)[:0], ready[:i]...)
if i > 0 {
m := copy(ready, ready[i:])
for i = m; i < n; i++ {
ready[i] = nil
}
wp.ready = ready[:m]
}
wp.lock.Unlock()
tmp := *scratch
for i, ch := range tmp {
ch.ch <- nil
tmp[i] = nil
}
}

Reference