原本写博客只是为了方便自己记忆,后来发现贴代码和写注释并没有帮助自己理清思路。今天看了Hystrix的实现,想试试用更简单的方式能不能说清楚原理和实现要点。
后记:说不贴代码结果还是贴了好多,好像尝试失败了…最终还是觉得代码比文字更容易理解…
Hystrix-go
Hystrix项目本来是Netflix开源的Java项目,主要功能是隔离熔断,hystrix-go是这个隔离熔断库的go版本。其中隔离是异常检测机制,检测的异常包括请求超时、流量过大。熔断是在隔离的基础上统计错误率,当错误达到某种程度就不再发送后续请求,这也是为了杜绝级联错误。另外可以定义熔断发生时的fallback处理方法。从Hystrix-go的配置项来总结其实现的功能包括:
- isolation: 隔离检测 支持检测请求超时,支持检测并发请求过大两种情况
- circuitbreak: 熔断开启的阈值 可设置 10s内请求次数,或10s内的错误率
- fallback: 可以设置是否开启熔断发生后的fallback处理
1 | type Settings struct { |
以下两节主要解释以下两个问题
- 如何检测请求超时,如何控制并发请求数
- 如何统计错误请求数和总请求数
asynchronous
hystrix对外提供Go和Do方法,分别是异步和同步方法。Go开启goroutine执行请求,首先判断是否熔断,再判断是否超过最大请求数,最后开启另一goroutine来统计时间。
- 如何与调用方同步 返回err chan用于调用方同步,
errChan
只有errorWithFallback
会写入错误 - 如何检测并发请求数
executorPool
的tickets是用来控制并发请求数的,简单说就是pool中有max conn reqs这么多个缓冲的chan有ticket,每个请求去读取chan,若请求数达到max则chan读取完毕,下次读取将阻塞。select-case
将不发生阻塞,直接从default返回错误 - 如何检测超时
finished
chan用来同步req goroutine和timer goroutine,后者用于检测请求超时事件 - 如何统计请求执行结果判断是否熔断 每次请求都有events产生,events[0]只有可能是
success
或timeout
,events[1]是fallback的执行结果,最终events会经过circuit.ReportEvent
返回到circuit进行统计,每次请求前先判断circuit是否AllowRequest
。
1 | func Go(name string, run runFunc, fallback fallbackFunc) chan error { |
synchronous
Do方法提供了一种同步调用Go的方式,当然用户可以自己根据err chan做同步。done表示用户的请求正确执行完成,或者表示fallback执行结束。
1 | func Do(name string, run runFunc, fallback fallbackFunc) error { |
circuit-breaker
上一节讲了circuit 是统计请求执行结果判断是否熔断的主要数据结构,同时用于检测并发请求数的pool也在其中。circuit的实现总结来说,就是要实现metrics来统计请求数,错误及时延等metric信息,并根据这些信息判断是否熔断。
进程中可能要请求不同的后端,hystrix要对不同的请求做统计和熔断控制,所以hystrix内部维护了circuitbreakerMap,其中key就是调用Go方法的name参数。
1 | type CircuitBreaker struct { |
executorPool
executorPool通过Tickets控制最大并发数。创建缓冲为max concurrent reqs的chan,并发空结构给它,只有从tickts中读到ticket才能发送请求。一旦tickets被读取完,新一次的读取将阻塞。
1 | type executorPool struct { |
Return是提供给请求调用方在执行完请求后回收ticket的方法。这里的executorPool维护的poolMetrics
主要统计已执行的请求数和最大还支持的并发数这两个metrics。他们通过metrics.Updates
在每次请求结束时传递结果。
1 | func (p *executorPool) Return(ticket *struct{}) { |
metricsExchange
hystrix包支持两种熔断标记:10s内请求次数,或10s内的错误率。下面IsOpen
中判断错误率的函数,其中reqs也是统计10内总请求数的方法。
1 | type metricExchange struct { |
如何判断熔断是否开启 通过metricCollectors收集请求数和错误数等信息,判断时从中读取
1 | func (circuit *CircuitBreaker) AllowRequest() bool { |
1 | func (m *metricExchange) ErrorPercent(now time.Time) int { |
如何退出熔断 进入熔断后,需要allowSingleTest来开启试探,若请求成功则可退出熔断。即当熔断开启时间或上一次singleTest时间与当前时间差大于sleepwindow,则可以开启一次试探。
1 | func (circuit *CircuitBreaker) allowSingleTest() bool { |
如何更新metrics 通过Updates chan传递每次请求的events结果
1 | func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, |
metrics开启goroutine接受updates的消息,在IncrementMetrics
中根据event 更新总请求数,各种失败原因导致的错误次数,以及latency。
1 | func (m *metricExchange) Monitor() { |
collector & rolling
所有错误数等统计,其metrics中的数据结构为rolling.Number。看了rolling包感觉像个时间序列数据库,Buckets的Key是Unix时间戳,整个Number只保存10s内的数据。
1 | type Number struct { |
每次请求结束后调用的Increment方法最终会在这个时间序列数据map中添加当前时间对应的bucket值,同时删掉10s之前的。sum统计的是10s的总数目,sum+=的原因是Increment是以增量进入map的,即每个时间戳对应的是增量。
1 | func (r *Number) Increment(i float64) { |
1 | func (r *Number) Sum(now time.Time) float64 { |