Hystrix-go实现

原本写博客只是为了方便自己记忆,后来发现贴代码和写注释并没有帮助自己理清思路。今天看了Hystrix的实现,想试试用更简单的方式能不能说清楚原理和实现要点。

后记:说不贴代码结果还是贴了好多,好像尝试失败了…最终还是觉得代码比文字更容易理解…

Hystrix-go

Hystrix项目本来是Netflix开源的Java项目,主要功能是隔离熔断,hystrix-go是这个隔离熔断库的go版本。其中隔离是异常检测机制,检测的异常包括请求超时、流量过大。熔断是在隔离的基础上统计错误率,当错误达到某种程度就不再发送后续请求,这也是为了杜绝级联错误。另外可以定义熔断发生时的fallback处理方法。从Hystrix-go的配置项来总结其实现的功能包括:

  • isolation: 隔离检测 支持检测请求超时,支持检测并发请求过大两种情况
  • circuitbreak: 熔断开启的阈值 可设置 10s内请求次数,或10s内的错误率
  • fallback: 可以设置是否开启熔断发生后的fallback处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Settings struct {
// isolation
Timeout time.Duration
TimeoutEnabled bool
MaxConcurrentRequests int
// circuit break
CircuitBreakerEnabled bool
RequestVolumeThreshold uint64
SleepWindow time.Duration
ErrorPercentThreshold int
// fallback
ForceFallback bool
ForceOpen bool
ForceClose bool
}

以下两节主要解释以下两个问题

  1. 如何检测请求超时,如何控制并发请求数
  2. 如何统计错误请求数和总请求数

asynchronous

hystrix对外提供Go和Do方法,分别是异步和同步方法。Go开启goroutine执行请求,首先判断是否熔断,再判断是否超过最大请求数,最后开启另一goroutine来统计时间。

  • 如何与调用方同步 返回err chan用于调用方同步,errChan只有errorWithFallback 会写入错误
  • 如何检测并发请求数 executorPool的tickets是用来控制并发请求数的,简单说就是pool中有max conn reqs这么多个缓冲的chan有ticket,每个请求去读取chan,若请求数达到max则chan读取完毕,下次读取将阻塞。select-case将不发生阻塞,直接从default返回错误
  • 如何检测超时 finished chan用来同步req goroutine和timer goroutine,后者用于检测请求超时事件
  • 如何统计请求执行结果判断是否熔断 每次请求都有events产生,events[0]只有可能是successtimeout,events[1]是fallback的执行结果,最终events会经过circuit.ReportEvent 返回到circuit进行统计,每次请求前先判断circuit是否AllowRequest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func Go(name string, run runFunc, fallback fallbackFunc) chan error {
...
circuit, _, _ = GetCircuit(name)
go func() {
defer func() {
cmd.finished <- true
}()
if !cmd.circuit.AllowRequest() {
cmd.errorWithFallback(ErrCircuitOpen)
return
}
cmd.Lock()
select {
case cmd.ticket = <-circuit.executorPool.Tickets:
cmd.Unlock()
default:
cmd.Unlock()
cmd.errorWithFallback(ErrMaxConcurrency)
return
}
runStart := time.Now()
runErr := run()
if !cmd.isTimedOut() {
cmd.runDuration = time.Since(runStart)
if runErr != nil {
cmd.errorWithFallback(runErr)
return
}
cmd.reportEvent("success")
}
}()

go func() {
defer func() {
cmd.Lock()
cmd.circuit.executorPool.Return(cmd.ticket)
cmd.Unlock()
err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
}()
if getSettings(name).TimeoutEnabled {
timer := time.NewTimer(getSettings(name).Timeout)
defer timer.Stop()
select {
case <-cmd.finished:
case <-timer.C:
cmd.errorWithFallback(ErrTimeout)
return
}
} else {
<-cmd.finished
}
}()
return cmd.errChan
}

synchronous

Do方法提供了一种同步调用Go的方式,当然用户可以自己根据err chan做同步。done表示用户的请求正确执行完成,或者表示fallback执行结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func Do(name string, run runFunc, fallback fallbackFunc) error {
done := make(chan struct{}, 1)
r := func() error {
err := run()
if err != nil {
return err
}
done <- struct{}{}
return nil
}
f := func(e error) error {
err := fallback(e)
if err != nil {
return err
}
done <- struct{}{}
return nil
}
var errChan chan error
errChan = Go(name, r, f)
select {
case <-done:
return nil
case err := <-errChan:
return err
}
}

circuit-breaker

上一节讲了circuit 是统计请求执行结果判断是否熔断的主要数据结构,同时用于检测并发请求数的pool也在其中。circuit的实现总结来说,就是要实现metrics来统计请求数,错误及时延等metric信息,并根据这些信息判断是否熔断。

进程中可能要请求不同的后端,hystrix要对不同的请求做统计和熔断控制,所以hystrix内部维护了circuitbreakerMap,其中key就是调用Go方法的name参数。

1
2
3
4
5
6
7
8
9
10
11
12
type CircuitBreaker struct {
Name string
open bool
enabled bool
forceOpen bool
forceClosed bool
mutex *sync.RWMutex
openedOrLastTestedTime int64

executorPool *executorPool
metrics *metricExchange
}

executorPool

executorPool通过Tickets控制最大并发数。创建缓冲为max concurrent reqs的chan,并发空结构给它,只有从tickts中读到ticket才能发送请求。一旦tickets被读取完,新一次的读取将阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type executorPool struct {
Name string
Metrics *poolMetrics
Max int
Tickets chan *struct{}
}

func newExecutorPool(name string) *executorPool {
p := &executorPool{}
p.Name = name
p.Metrics = newPoolMetrics(name)
p.Max = getSettings(name).MaxConcurrentRequests
p.Tickets = make(chan *struct{}, p.Max)
for i := 0; i < p.Max; i++ {
p.Tickets <- &struct{}{}
}
return p
}

Return是提供给请求调用方在执行完请求后回收ticket的方法。这里的executorPool维护的poolMetrics主要统计已执行的请求数和最大还支持的并发数这两个metrics。他们通过metrics.Updates在每次请求结束时传递结果。

1
2
3
4
5
6
7
8
9
func (p *executorPool) Return(ticket *struct{}) {
p.Metrics.Updates <- poolMetricsUpdate{
activeCount: p.ActiveCount(),
}
p.Tickets <- ticket
}
func (p *executorPool) ActiveCount() int {
return p.Max - len(p.Tickets)
}

metricsExchange

hystrix包支持两种熔断标记:10s内请求次数,或10s内的错误率。下面IsOpen中判断错误率的函数,其中reqs也是统计10内总请求数的方法。

1
2
3
4
5
6
7
type metricExchange struct {
Name string
Updates chan *commandExecution
Mutex *sync.RWMutex

metricCollectors []metricCollector.MetricCollector
}

如何判断熔断是否开启 通过metricCollectors收集请求数和错误数等信息,判断时从中读取

1
2
3
func (circuit *CircuitBreaker) AllowRequest() bool {
return !circuit.IsOpen() || circuit.allowSingleTest()
}
1
2
3
4
5
6
7
8
9
10
11
func (m *metricExchange) ErrorPercent(now time.Time) int {
m.Mutex.RLock()
defer m.Mutex.RUnlock()
var errPct float64
reqs := m.DefaultCollector().NumRequests().Sum(now)
errs := m.DefaultCollector().Errors().Sum(now)
if reqs > 0 {
errPct = (float64(errs) / float64(reqs)) * 100
}
return int(errPct + 0.5)
}

如何退出熔断 进入熔断后,需要allowSingleTest来开启试探,若请求成功则可退出熔断。即当熔断开启时间或上一次singleTest时间与当前时间差大于sleepwindow,则可以开启一次试探。

1
2
3
4
5
6
7
8
9
10
11
func (circuit *CircuitBreaker) allowSingleTest() bool {
circuit.mutex.RLock()
defer circuit.mutex.RUnlock()

now := time.Now().UnixNano()
if circuit.open &&
now > openedOrLastTestedTime+SleepWindow.Nanoseconds() {
return swapped(openedOrLastTestedTime, now)
}
return false
}

如何更新metrics 通过Updates chan传递每次请求的events结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, 
start time.Time, runDuration time.Duration) error {
if eventTypes[0] == "success" && circuit.open {
circuit.setClose()
}
select {
case circuit.metrics.Updates <- &commandExecution{
Types: eventTypes,
Start: start,
RunDuration: runDuration,
}:
default:
return CircuitError{...}
}
return nil
}

metrics开启goroutine接受updates的消息,在IncrementMetrics中根据event 更新总请求数,各种失败原因导致的错误次数,以及latency。

1
2
3
4
5
6
7
8
9
10
11
12
13
func (m *metricExchange) Monitor() {
for update := range m.Updates {
m.Mutex.RLock()
totalDuration := time.Since(update.Start)
wg := &sync.WaitGroup{}
for _, collector := range m.metricCollectors {
wg.Add(1)
go m.IncrementMetrics(wg, collector, update, totalDuration)
}
wg.Wait()
m.Mutex.RUnlock()
}
}

collector & rolling

所有错误数等统计,其metrics中的数据结构为rolling.Number。看了rolling包感觉像个时间序列数据库,Buckets的Key是Unix时间戳,整个Number只保存10s内的数据。

1
2
3
4
5
6
7
8
type Number struct {
Buckets map[int64]*numberBucket
Mutex *sync.RWMutex
}

type numberBucket struct {
Value float64
}

每次请求结束后调用的Increment方法最终会在这个时间序列数据map中添加当前时间对应的bucket值,同时删掉10s之前的。sum统计的是10s的总数目,sum+=的原因是Increment是以增量进入map的,即每个时间戳对应的是增量。

1
2
3
4
5
6
7
func (r *Number) Increment(i float64) {
r.Mutex.Lock()
defer r.Mutex.Unlock()
b := r.getCurrentBucket()
b.Value += i
r.removeOldBuckets()
}
1
2
3
4
5
6
7
8
9
10
11
func (r *Number) Sum(now time.Time) float64 {
sum := float64(0)
r.Mutex.RLock()
defer r.Mutex.RUnlock()
for timestamp, bucket := range r.Buckets {
if timestamp >= now.Unix()-10 {
sum += bucket.Value
}
}
return sum
}