KubeController: How the Informer Asynchronous Event-Handling Framework Is Implemented

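A couple of years ago I used the Kube informer framework to write a route-controller that forwards application routes between gorouter and a Kubernetes cluster, and a stack-controller that manages AOS stacks; at the time I wrote up some notes against the 1.5 and 1.7 code. Since 1.9 the informer has moved entirely into client-go, so this post walks through the framework's source again based on the 1.9 code. As an asynchronous event-handling framework, the informer does two things: it watches for events, and it dispatches them to handlers.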

informer

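  1. Watching is done by the Reflector inside the controller. The previous post covered how the reflector writes the events obtained from the listWatcher into a Store; the Store the informer uses here is a DeltaFIFO, which allows events to be dispatched strictly in the order they occurred.
  2. The events produced by the Reflector are ultimately consumed by the processor. As events are popped off the queue, the informer's local indexer cache is updated and the processor distributes each event to every listener.
  3. The processor's listeners are registered from outside through AddEventHandler; each handler supplies AddFunc, UpdateFunc and DeleteFunc. Internally a listener adds a buffer layer that holds pendingNotifications. The listener is what finally delivers events, which end up being handled by the registered handler.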

InformerFactory

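When the DeploymentController is initialized, it uses Lister() and Informer() from the PodInformer and registers its event handlers on the Informer through AddEventHandler. Every Informer a controller uses is obtained from the SharedInformerFactory by GroupVersionResource, and the informers are also started from the factory.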

func Run(c *config.CompletedConfig) error { 
sharedInformerFactory := informers.NewSharedInformerFactory(
verClient, ResyncPeriod)
// startController
// podInformer := sharedInformerFactory.Core().V1().Pods()
sharedInformerFactory.Start(stopCh) // Start takes a stop channel; stopCh is elided in this excerpt
select {}
}

Informer Register

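Informers are constructed through the interfaces provided by the client-go/tools/cache package, and each informer registers itself with the factory through its Informer() method. In practice, calling podInformer.Informer() is all it takes to register; this happens in startController, after which informerFactory.Start can launch every registered informer.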

func NewFilteredPodInformer(client kubernetes.Interface, namespace string,
resyncPeriod time.Duration, indexers cache.Indexers,
/*tweakListOptions*/) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().Pods(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Pods(namespace).Watch(options)
},
},
&core_v1.Pod{},
resyncPeriod,
indexers,
)
}

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()

informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = newFunc(f.client, f.defaultResync)
f.informers[informerType] = informer

return informer
}
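
The sharing behaviour of InformerFor can be illustrated with a small standalone sketch. It does not use client-go: `factory`, `informer`, `pod` and `deployment` are hypothetical stand-ins, and only the lookup by reflect.Type mirrors the excerpt above.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// informer is a hypothetical stand-in for cache.SharedIndexInformer.
type informer struct{ name string }

// factory caches one informer per object type, like InformerFor above.
type factory struct {
	mu        sync.Mutex
	informers map[reflect.Type]*informer
}

func newFactory() *factory {
	return &factory{informers: make(map[reflect.Type]*informer)}
}

// informerFor returns the cached informer for obj's type and only calls
// newFunc the first time that type is requested.
func (f *factory) informerFor(obj interface{}, newFunc func() *informer) *informer {
	f.mu.Lock()
	defer f.mu.Unlock()

	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := newFunc()
	f.informers[t] = inf
	return inf
}

// pod and deployment are placeholder resource types.
type pod struct{}
type deployment struct{}

func main() {
	f := newFactory()
	newPods := func() *informer { return &informer{name: "pods"} }
	newDeployments := func() *informer { return &informer{name: "deployments"} }

	a := f.informerFor(&pod{}, newPods) // e.g. requested by one controller
	b := f.informerFor(&pod{}, newPods) // requested again by another controller
	c := f.informerFor(&deployment{}, newDeployments)

	fmt.Println(a == b)           // same instance is returned for the same type
	fmt.Println(a == c)           // a different type gets its own informer
	fmt.Println(len(f.informers)) // one entry per type
}
```

Because the map is keyed by type, any number of controllers asking for pods end up with the same informer instance, which is why registration has to happen before Start is called.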

Informer Run

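Finally the informerFactory starts every informer registered with it; once started, an informer watches for events and dispatches them. cache.WaitForCacheSync polls each informer's HasSynced until all of them report true, that is, until the objects from the initial List have been popped and processed; the HasSynced result ultimately comes from the DeltaFIFO.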

type sharedInformerFactory struct {
client clientset.Interface
lock sync.Mutex
defaultResync time.Duration

informers map[reflect.Type]cache.SharedIndexInformer
startedInformers map[reflect.Type]bool
}

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()

for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}

SharedInformer

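As an asynchronous event-handling framework, the informer covers both watching for events and dispatching them. Its indexer member is a Store that caches the full set of objects; the Lister the informer exposes is served from this Store, so a Lister never reads etcd directly.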

type sharedIndexInformer struct {
indexer Indexer
controller Controller

processor *sharedProcessor
cacheMutationDetector CacheMutationDetector

listerWatcher ListerWatcher
objectType runtime.Object

resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock

started, stopped bool
startedLock sync.Mutex

blockDeltas sync.Mutex
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()

s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()

processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)

s.controller.Run(stopCh)
}

Controller

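The controller's Run is essentially a producer-consumer pattern: the reflector is the producer, and the controller's Process function, s.HandleDeltas, is the consumer. The controller's processLoop keeps calling Pop on the reflector's store to consume events, which are ultimately handled by sharedIndexInformer's HandleDeltas.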

func (c *controller) Run(stopCh <-chan struct{}) {
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
}

func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
// handler err
}
}

DeltaFIFO

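Within the informer framework, DeltaFIFO serves as the Reflector's Store: the results of list and watch are applied to it as Add/Update/Delete operations. The most important fields are items and queue. items caches nearly every event added to the FIFO, stored per object id as Deltas (a []Delta), while queue stores the ids of those objects and determines the FIFO processing order.

Unlike UndeltaStore: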

type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond

// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
items map[string]Deltas
queue []string

populated bool
initialPopulationCount int

keyFunc KeyFunc
knownObjects KeyListerGetter

closed bool
closedLock sync.Mutex
}

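Every add, update and delete on the Store goes through the function below. It enqueues the obj and stores it in the items cache; even a delete event goes into items and waits to be processed. When new content is added to the DeltaFIFO, f.cond.Broadcast wakes every goroutine blocked in f.cond.Wait so that each can try to reacquire the lock.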

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}

newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
_, exists := f.items[id]
if len(newDeltas) > 0 {
if !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else if exists {
delete(f.items, id)
}
return nil
}
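
To make the relationship between items and queue concrete, here is a minimal model. It assumes plain string keys and leaves out locking, dedupDeltas and the Sync special case; `miniFIFO`, `enqueue` and `pop` are illustrative names, not client-go APIs.

```go
package main

import "fmt"

// delta is a simplified Delta: an event type plus a string payload.
type delta struct {
	typ string
	obj string
}

// miniFIFO models the two core fields of DeltaFIFO (not thread-safe).
type miniFIFO struct {
	items map[string][]delta // key -> pending deltas, oldest first
	queue []string           // keys in arrival order, each present at most once
}

func newMiniFIFO() *miniFIFO {
	return &miniFIFO{items: make(map[string][]delta)}
}

// enqueue appends d to the key's deltas and queues the key only if it is
// not already waiting.
func (f *miniFIFO) enqueue(key string, d delta) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], d)
}

// pop removes the oldest key and returns every delta accumulated for it.
func (f *miniFIFO) pop() (string, []delta, bool) {
	if len(f.queue) == 0 {
		return "", nil, false
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	ds := f.items[key]
	delete(f.items, key)
	return key, ds, true
}

func main() {
	f := newMiniFIFO()
	f.enqueue("default/a", delta{"Added", "a-v1"})
	f.enqueue("default/b", delta{"Added", "b-v1"})
	f.enqueue("default/a", delta{"Updated", "a-v2"})
	f.enqueue("default/a", delta{"Deleted", "a-v2"})

	for {
		key, ds, ok := f.pop()
		if !ok {
			break
		}
		fmt.Println(key, ds)
	}
}
```

In this model the three events for default/a are delivered together and in order when its key reaches the head of the queue, while default/b keeps its own position behind it.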

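The producer for DeltaFIFO is the Reflector; the consumer is whoever calls DeltaFIFO.Pop(). While the queue is empty every caller of Pop blocks in cond.Wait, and a caller returns from cond.Wait only once it has actually reacquired the lock. It then takes the first id off queue, fetches all Deltas for that id from items, and calls the PopProcessFunc on them. An item whose processing fails may be put back on the queue.

If processing fails and newer deltas for the same id are already in the cache, addIfNotPresent does nothing and the failed deltas are discarded.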

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.IsClosed() {
return nil, FIFOClosedError
}

f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
item, ok := f.items[id]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}

return item, err
}
}
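
The Wait and Broadcast interplay in Pop can be tried out in isolation. The following is a minimal sketch using only the standard library; `blockingQueue` and its methods are hypothetical names, and the closed flag is guarded by the same mutex rather than by a separate closedLock as in DeltaFIFO.

```go
package main

import (
	"fmt"
	"sync"
)

// blockingQueue is a tiny FIFO whose pop blocks on a sync.Cond.
type blockingQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []string
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// push appends an item and wakes all waiting consumers.
func (q *blockingQueue) push(s string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, s)
	q.cond.Broadcast()
}

// shutdown marks the queue closed and wakes waiters so they can exit.
func (q *blockingQueue) shutdown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// pop blocks until an item is available; ok is false once the queue is
// closed and empty.
func (q *blockingQueue) pop() (item string, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		if q.closed {
			return "", false
		}
		// Wait releases q.mu while blocked and reacquires it before
		// returning, so the loop condition is always re-checked under the lock.
		q.cond.Wait()
	}
	item = q.items[0]
	q.items = q.items[1:]
	return item, true
}

func main() {
	q := newBlockingQueue()
	done := make(chan []string)

	go func() { // consumer, playing the role of processLoop
		var got []string
		for {
			s, ok := q.pop()
			if !ok {
				break
			}
			got = append(got, s)
		}
		done <- got
	}()

	q.push("default/a") // producer, playing the role of the Reflector
	q.push("default/b")
	q.shutdown()
	fmt.Println(<-done)
}
```

The for loop around Wait matters: a woken goroutine may find that another consumer has already taken the item, so it has to check the queue length again before proceeding.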

ResourceEventHandler

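Handlers added to an informer normally satisfy the interface below. ResourceEventHandlerFuncs in the cache package already implements this set of methods, so registering AddFunc, UpdateFunc and DeleteFunc is enough.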

type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
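
The adapter idea behind ResourceEventHandlerFuncs is a struct of optional callbacks that satisfies the interface. The sketch below reproduces that pattern with local types so that it runs without client-go; `eventHandler` and `handlerFuncs` are stand-in names for illustration.

```go
package main

import "fmt"

// eventHandler mirrors the shape of ResourceEventHandler.
type eventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

// handlerFuncs adapts optional callbacks to eventHandler.
// A nil callback means the event is ignored.
type handlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

func (h handlerFuncs) OnAdd(obj interface{}) {
	if h.AddFunc != nil {
		h.AddFunc(obj)
	}
}

func (h handlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if h.UpdateFunc != nil {
		h.UpdateFunc(oldObj, newObj)
	}
}

func (h handlerFuncs) OnDelete(obj interface{}) {
	if h.DeleteFunc != nil {
		h.DeleteFunc(obj)
	}
}

func main() {
	var added []string
	// Only AddFunc is set; updates and deletes are silently skipped.
	var h eventHandler = handlerFuncs{
		AddFunc: func(obj interface{}) { added = append(added, obj.(string)) },
	}

	h.OnAdd("default/web")
	h.OnUpdate("default/web", "default/web")
	h.OnDelete("default/web")
	fmt.Println(added)
}
```

This is why a controller can pass a struct literal with only the callbacks it cares about instead of defining a new type for each handler.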

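Calling AddEventHandler on an informer actually adds a listener to the informer's processor. This processorListener provides event dispatching through three basic methods: add, run and pop. Below is a simplified, trimmed version of the registration method.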

func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler, 
resyncPeriod time.Duration) {
s.startedLock.Lock()
defer s.startedLock.Unlock()

listener := newProcessListener(handler, resyncPeriod,
determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod),
s.clock.Now(), initialBufferSize)

s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

s.processor.addListener(listener)
}

sharedProcessor

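The processing function registered with the controller is HandleDeltas. Its main job is to distribute events through the processor; besides distributing, it also updates the informer's local store.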

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
s.indexer.Update(d.Object)
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
s.indexer.Add(d.Object)
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
s.indexer.Delete(d.Object)
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
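
The decision HandleDeltas makes for each delta (add or update, depending on whether the local cache already holds the object) can be shown with a plain map as the store. This is a simplified sketch: `applyDeltas` and the string-based `delta` type are hypothetical, and notifications are returned as strings instead of being sent to listeners.

```go
package main

import "fmt"

// delta is a simplified Delta carrying a key and a value.
type delta struct {
	typ string // "Sync", "Added", "Updated" or "Deleted"
	key string
	val string
}

// applyDeltas updates store in place and returns the notifications that
// would be distributed, in order.
func applyDeltas(store map[string]string, deltas []delta) []string {
	var notes []string
	for _, d := range deltas {
		switch d.typ {
		case "Sync", "Added", "Updated":
			// The notification kind depends on the cache, not on the delta type.
			if _, exists := store[d.key]; exists {
				notes = append(notes, "update "+d.key)
			} else {
				notes = append(notes, "add "+d.key)
			}
			store[d.key] = d.val
		case "Deleted":
			delete(store, d.key)
			notes = append(notes, "delete "+d.key)
		}
	}
	return notes
}

func main() {
	store := map[string]string{}
	notes := applyDeltas(store, []delta{
		{"Added", "default/a", "v1"},
		{"Sync", "default/a", "v1"}, // resync of a known object becomes an update
		{"Updated", "default/a", "v2"},
		{"Deleted", "default/a", "v2"},
	})
	for _, n := range notes {
		fmt.Println(n)
	}
	fmt.Println(len(store))
}
```

One consequence visible here is that a Sync delta for an object already in the cache reaches handlers as an update, so update callbacks should expect to be called with unchanged objects.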

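When events are consumed, they are distributed by the informer's processor. The handlers it distributes to are supplied from outside through AddEventHandler, which calls addListener on the processor. addListener only adds one more listener for the processor to manage; at distribution time the processor iterates over listeners and sends the event to each of them.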

type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
listener.add(obj)
}
}

The processor's run makes sure every listener is started and, on exit, that every listener's channel is closed.

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
// Tell .pop() to stop. .pop() will tell .run() to stop
close(listener.addCh)
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}

processorListener

pendingNotifications holds all events that have not been delivered yet. When the processor starts to distribute an event, it calls the listener's add method, which sends the event on addCh.

buffer.NewRingGrowing

type processorListener struct {
nextCh chan interface{}
addCh chan interface{}

handler ResourceEventHandler
pendingNotifications buffer.RingGrowing

requestedResyncPeriod time.Duration
resyncPeriod time.Duration

nextResync time.Time
resyncLock sync.Mutex
}

func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}

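The listener's pop goroutine keeps receiving events from addCh. It either hands an event to nextCh directly or, while an earlier event is still waiting to be sent, writes it to the local pendingNotifications; nextCh is then fed from pendingNotifications or from addCh. The run method finally consumes the events and calls the handler. run is wrapped in an exponential-backoff retry, and it is restarted if it exits.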

func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop

var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok {
nextCh = nil
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil {
notification = notificationToAdd
nextCh = p.nextCh
} else {
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}

func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
}
}
return true, nil
})
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
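
The pop loop relies on a Go idiom worth isolating: a send on a nil channel never becomes ready, so setting nextCh to nil switches that select case off. Below is a minimal sketch of the same idea with ints; `relay` is a hypothetical name, and a plain slice stands in for buffer.RingGrowing.

```go
package main

import "fmt"

// relay forwards values from in to out. The sender on in is never blocked
// by a slow receiver on out, because overflow is parked in pending.
// Like pop above, it returns as soon as in is closed.
func relay(in <-chan int, out chan<- int) {
	defer close(out)

	var pending []int   // values waiting behind head
	var next chan<- int // nil while there is nothing to send
	var head int        // value currently offered on next

	for {
		select {
		case next <- head:
			// head was delivered; promote the oldest pending value, if any.
			if len(pending) > 0 {
				head, pending = pending[0], pending[1:]
			} else {
				next = nil // disable the send case until new input arrives
			}
		case v, ok := <-in:
			if !ok {
				return
			}
			if next == nil {
				head, next = v, out // nothing in flight: offer v directly
			} else {
				pending = append(pending, v)
			}
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go relay(in, out)

	// All sends complete even though nobody is reading out yet.
	for i := 1; i <= 5; i++ {
		in <- i
	}
	for i := 0; i < 5; i++ {
		fmt.Println(<-out)
	}

	close(in)
	_, ok := <-out
	fmt.Println(ok) // out is closed once relay returns
}
```

The trade-off of this design is that the buffer is unbounded: a handler that is permanently slower than the event rate makes pending grow without limit, which is the price for never blocking distribute.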

NewController

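Kube-controller-manager is a collection of controllers built on a common asynchronous event-notification framework, around which all controllers work. The following uses the deploymentController to record how a controller is implemented.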

func startDeploymentController(ctx ControllerContext) (bool, error) {
go deployment.NewDeploymentController(
ctx.InformerFactory.Extensions().V1beta1().Deployments(),
ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),
).Run(int(ctx.Options.ConcurrentDeploymentSyncs), ctx.Stop)
return true, nil
}

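In the DeploymentController struct, a listerSynced function returning true means the corresponding informer has at least started working, that is, its cache has finished the initial sync. A Lister is the channel for reading data from the informer's cache, and rsControl provides a set of operations on ReplicaSets. The code below is an excerpt.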

type DeploymentController struct {
rsControl controller.RSControlInterface
client clientset.Interface
eventRecorder record.EventRecorder

syncHandler func(dKey string) error
enqueueDeployment func(deployment *extensions.Deployment)

dLister extensionslisters.DeploymentLister
dListerSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
}

func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer,
rsInformer extensionsinformers.ReplicaSetInformer,
podInformer coreinformers.PodInformer,
client clientset.Interface) *DeploymentController {
dc := &DeploymentController{
client: client,
queue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), "deployment"),
}
dc.rsControl = controller.RealRSControl{
KubeClient: client,
Recorder: dc.eventRecorder,
}
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addDeployment,
UpdateFunc: dc.updateDeployment,
DeleteFunc: dc.deleteDeployment,
})

dc.syncHandler = dc.syncDeployment
dc.enqueueDeployment = dc.enqueue
dc.dLister = dInformer.Lister()
dc.dListerSynced = dInformer.Informer().HasSynced
return dc
}

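Controller.Run first waits until all informers have synced, then starts N goroutines concurrently to process events. Each goroutine loops on one thing: taking a task from the queue and handing it to syncHandler.

When does the queue shut down?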

func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dc.queue.ShutDown()

if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced,
dc.rsListerSynced, dc.podListerSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(dc.worker, time.Second, stopCh)
}
<-stopCh
}

func (dc *DeploymentController) worker() {
for dc.processNextWorkItem() {
}
}

func (dc *DeploymentController) processNextWorkItem() bool {
key, quit := dc.queue.Get()
if quit {
return false
}
defer dc.queue.Done(key)
err := dc.syncHandler(key.(string))
dc.handleErr(err, key)
return true
}

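How do events get into the queue? Through the event handlers registered on the informer: for example, when a new deployment is created, the key of the deployment to process is eventually enqueued.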

func (dc *DeploymentController) addDeployment(obj interface{}) {
d := obj.(*extensions.Deployment)
dc.enqueueDeployment(d)
}

func (dc *DeploymentController) enqueue(deployment *extensions.Deployment) {
key, err := controller.KeyFunc(deployment)
// process error
dc.queue.Add(key)
}
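
What gets enqueued is a string key, not the object. As a rough illustration, assuming the usual namespace/name convention for keys, the sketch below builds and splits such keys; `keyOf` and `splitKey` are hypothetical helpers written for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// keyOf builds a "namespace/name" key; cluster-scoped objects, which have
// no namespace, use the bare name.
func keyOf(namespace, name string) string {
	if namespace == "" {
		return name
	}
	return namespace + "/" + name
}

// splitKey is the inverse of keyOf and rejects keys with extra separators.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	key := keyOf("default", "web")
	fmt.Println(key)

	// A sync handler would split the key and look the object up in the
	// lister's cache rather than carrying the object through the queue.
	ns, name, err := splitKey(key)
	fmt.Println(ns, name, err)
}
```

Queueing keys rather than objects means a burst of events for one deployment collapses into a single pending work item, and the handler always reads the latest state from the cache when it runs.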

Question

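  1. Why does kubelet use UndeltaStore while controllers use the informer?
  2. Does the "shared" in sharedInformer mean that the indexer cache is shared?
  3. What is the buffer used by the listener for?
  4. How does the controller's queue implement rate limiting?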

Reference

A look at the sync.Cond source code