前两年根据Kube-Infomer的框架,写过route-controller实现gorouter和kubenretes集群中应用路由的转发,还写过一个stack-controller实现AOS的堆栈管理,当时对照1.5和1.7版本的代码做了些总结。1.9之后informer已经完全迁移到client-go了,根据1.9的代码整理一遍框架的源码实现。informer作为异步事件处理框架,完成了事件监听和分发处理两个过程。
- 监听通过controller中的Reflector实现,上一节记录过reflector如何将listWatcher得到的事件写到Store里,这里informer使用的Store是DeltaFIFO,它支持实现完全按事件发生顺序的分发处理。
- 由Reflector生产的事件最终由processor消费。processor通过Pop队列里的事件,更新informer本地的indexer缓存,同时将事件distribute给所有的listerner。
- processor的listener由外部通过AddEventHandler注册,每个listener提供AddFunc UpdateFunc DeleteFunc方法。listener内部的实现加了一层缓存,用于存放pendingNotification。listerner最终实现了事件的分发,事件最终被注册的handler处理。
InfomerFactory
在DeploymentController初始化时,使用了PodInformer中的Lister()和Informer(),并通过AddEventHandler给Informer注册了事件分发处理的方式。Controller中使用的所有Informer都是从SharedInformerFactory中根据GroupVersionResource得到,同时informer的启动也是从这里开始start。
1 | func Run(c *config.CompletedConfig) error { |
Informer Register
informer的初始化通过client-go/tools/cache包提供的接口完成。而每个informer都通过其Informer接口实现向factory的注册。实际上一旦调用podInformer.Informer()就完成了注册,这是在startController中完成的,这之后就可以通过informerFactory.Start启动所有informer了。
1 | func NewFilteredPodInformer(client kubernetes.Interface, namespace string, |
1 | func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { |
Informer Run
最终informerFactory将注册到工厂的所有informer都启动,而informer启动后的工作就是事件监听和分发。cache.WaitCacheSync
遍历所有informer看是否所有的informer都收到过事件,最终HasSynced
的判断来自DeltaFIFO。
1 | type sharedInformerFactory struct { |
SharedInformer
informer作为异步事件处理框架,完成了事件监听和分发处理两个过程。成员中indexer是一个保存全量数据的缓存Store,informer对外提供的Lister通过Store完成,即Lister并没有直接操作etcd。
1 | type sharedIndexInformer struct { |
1 | func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { |
Controller
controller的Run主要是个生产者消费者模式,reflector是生产者,而controller的Process函数s.HandleDeltas
是事件的消费者。在controller的processLoop中不断地调用reflector的store的Pop消费事件,事件最终由sharedIndexInformer的HandleDeltas处理。
1 | func (c *controller) Run(stopCh <-chan struct{}) { |
DeltaFIFO
在informer框架中DeltaFIFO作为Reflector的Store,根据list watch结果对Store进行Add/Update/Delete等操作。数据结构中最重要的是items和queue,其中items缓存了几乎所有add到FIFO中的事件,它以[]Delta的形式存储,而queue则是存储这些事件的id作为FIFO处理的先后顺序。
跟UndeltaStore不同的是
1 | type DeltaFIFO struct { |
所有对Store的增删改都会经过下面的函数,它负责将这个obj入队,并存储到items缓存中,即使是删除事件也进入items事件中等到处理。当DeltaFIFO有新的内容加入后通过调用f.cond.BroadCast通知所有在f.cond.Wait中的goroutine可以去尝试Lock。
1 | func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { |
DeltaFIFO的生产者是Reflector,而消费者就是调用DeltaFIFO.Pop()的客户端。Pop方法的所有客户端都cond.Wait,但是只有当在cond.Wait中能真的获取到Lock才能从cond.Wait中返回。返回后取出queue的第一个id,在items获取该id对应的所有Delta事件,调用PopProcessFunc去处理。处理失败的item有可能再次加入队列。
处理失败后如果后续已经有deltas在缓存里,这些item就舍弃了。
1 | func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { |
ResourceEventHandler
通常往informer里添加的处理函数都满足以下接口,cache包里的ResourceEventHandlerFuncs刚好实现了这一组方法,因此只要注册AddFunc UpdatdFunc以及DeleteFunc即可。
1 | type ResourceEventHandler interface { |
在informer里AddEventHandler时,实际是向informer的processor里添加了listener,这个processorListener通过add run pop三个基本方法对外提供事件分发处理的功能。下面是简化删减版的注册方法。
1 | func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler, |
sharedProcessor
注册到controller里的处理函数是HandleDeltas,它主要通过processor分发事件。除了分发事件以外,处理函数会同时更新informer本地的store。
1 | func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { |
在消费事件时,通过informer的processor进行distrubute。processor进行分发的处理函数由外部通过AddEventHandler
,向processor里addListener。其中addListener只是增加一个processor管理的listener,并在分发时遍历listeners,将事件发给所有的listener。
1 | type sharedProcessor struct { |
processor的run保证所有listener都开始运行,并保证退出时所有listener的chan都关闭
1 | func (p *sharedProcessor) run(stopCh <-chan struct{}) { |
processorListener
pendingNotifications装了所有还没分发的事件。其中处理事件processor开始distribute时,会调用listener的add方法,将事件发到addCh上。
buffer.NewRingGrowing
1 | type processorListener struct { |
listener的pop goroutine不断地从addCh中获取事件,写到本地的pendingNotification或写给nextCh,而nextCh从本地pendingNotification或addCh获取事件。最后由run方法消费事件和分发事件。run方法支持指数重试,退出也会重新开始。
1 | func (p *processorListener) pop() { |
1 | func (p *processorListener) run() { |
NewController
Kube-controller-manager是一个controller的集合,它实现了异步事件通知的一个通用框架,所有controller都以它为中心工作。以deploymentController为例记录下controller的实现逻辑。
1 | func startDeploymentController(ctx ControllerContext) (bool, error) { |
DeploymentController的数据结构里listerSynced返回true表示这个lister已经至少开始工作了,Lister则是从informers的缓存中get数据的通道,rsControl提供一组操作RS的接口。节选一部分代码内容。
1 | type DeploymentController struct { |
Controller.Run的过程首先等待所有informer都工作之后,开始并发的起N个goroutine来处理事件分发。这个goroutine循环处理一个事情,就是从queue里拿任务交给syncHandler处理。
queue何时退出
1 | func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { |
queue当中的事件是如何进队的呢,通过注册到informer上的eventHandler处理的,比如当有新的deployment创建时,最终将要处理的deployment的key进队。
1 | func (dc *DeploymentController) addDeployment(obj interface{}) { |
Question
- 为什么kubelet使用的UndeltaStore 而controller使用informer
- sharedInformer的shared体现在缓存indexer是公用的吗
- listerner使用的buffer作用
- controller中的queue如何做到限流