Merging the Kubelet's Data Sources

The kubelet's job, in a nutshell, is to manage pod lifecycles in response to pod changes in the cluster. Downward, it drives pod start, update, and stop through the underlying runtime interfaces such as Docker; upward, it obtains pod information via list-watch to identify and dispatch work. This section summarizes how the kubelet merges its three pod data sources. The code references Kubernetes 1.7; these notes were written back in 2017 and recently cleaned up and moved to this blog.

The kubelet's pods come from three sources: manifest files, HTTP, and the apiserver. Each source sends events to the mux through a channel the mux hands out; the mux starts one goroutine per source to handle the events (i.e. pods) arriving from that source. The mux's handling consists of passing the pods received on each channel to podStorage to be merged. Merging compares them against the local pod cache, breaks the result down into different kinds of events, and sends those events onto podStorage's single updates channel. That channel is exposed through the Updates() method and consumed by kubelet.Run.

Channel data flow
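The flow can be modeled with a small, self-contained sketch in plain Go (made-up types, not the real kubelet code): each source writes into its own channel, a per-source goroutine forwards what arrives into a shared merger, and a single updates channel feeds the consumer.

// A simplified model of the kubelet source-merging pipeline.
package main

import (
    "fmt"
    "sync"
)

type update struct {
    source string
    pods   []string
}

type merger struct {
    mu      sync.Mutex // plays the role of updateLock
    updates chan update
}

// Merge would normally diff against a per-source cache; here it just
// forwards the change tagged with its source, holding the lock while sending.
func (m *merger) Merge(source string, pods []string) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.updates <- update{source: source, pods: pods}
}

// Channel returns a per-source channel and starts a listener goroutine,
// mirroring what Mux.Channel does.
func (m *merger) Channel(source string) chan<- []string {
    ch := make(chan []string)
    go func() {
        for pods := range ch {
            m.Merge(source, pods)
        }
    }()
    return ch
}

func main() {
    m := &merger{updates: make(chan update)}
    file := m.Channel("file")
    api := m.Channel("api")

    go func() { file <- []string{"static-pod"} }()
    go func() { api <- []string{"nginx", "redis"} }()

    // The consumer side corresponds to kubelet.Run(podCfg.Updates()).
    for i := 0; i < 2; i++ {
        u := <-m.updates
        fmt.Printf("source=%s pods=%v\n", u.source, u.pods)
    }
}

The real mux/podStorage pair adds per-source caching and diffing on top of this fan-in skeleton.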

Kubelet startup

The kubelet obtains pods through these three paths, and the Run method implemented by KubeletBootstrap dispatches and handles them according to the kind of event. The channel that connects the actual work to the pod sources is the chan returned by podCfg.Updates(). The rest of this post walks through how the list-watch results reach the kubelet's main loop.

func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig) {
    // start the kubelet
    go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)
}
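Run drains the channel returned by podCfg.Updates() in a loop and dispatches by operation type. A rough, self-contained model of that consumer side (simplified types, not the real kubetypes.PodUpdate or syncLoop):

// A simplified model of the consumer side of the updates channel.
package main

import "fmt"

type op string

const (
    opAdd    op = "ADD"
    opUpdate op = "UPDATE"
    opRemove op = "REMOVE"
)

type podUpdate struct {
    Op   op
    Pods []string
}

// run blocks on the channel and dispatches pods by operation type,
// roughly what kubelet's sync loop does with each PodUpdate.
func run(updates <-chan podUpdate) {
    for u := range updates {
        switch u.Op {
        case opAdd:
            fmt.Println("admit and start pods:", u.Pods)
        case opUpdate:
            fmt.Println("sync changed pods:", u.Pods)
        case opRemove:
            fmt.Println("kill pods:", u.Pods)
        }
    }
}

func main() {
    updates := make(chan podUpdate, 1)
    updates <- podUpdate{Op: opAdd, Pods: []string{"nginx"}}
    close(updates)
    run(updates)
}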

In makePodSourceConfig, PodConfig merges the three pod sources via cfg.Channel: manifests placed in a fixed directory, pods fetched over HTTP, and pod changes obtained by list-watching the apiserver.

func makePodSourceConfig(...) (*config.PodConfig, error) {
    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, ...)
    // define watching apiserver
    if kubeDeps.KubeClient != nil {
        config.NewSourceApiserver(kubeDeps.KubeClient, nodeName,
            cfg.Channel(kubetypes.ApiserverSource))
    }
    return cfg, nil
}

Implementation of PodConfig

podStorage

podStorage is the complete cache of pods. It holds the updates channel and is its only (send-only) writer, publishing the results of pod updates onto it. PodConfig's Updates() method exposes this channel read-only so callers can receive the pod updates; if nothing reads from it, sends block.

type PodConfig struct {
    pods *podStorage
    mux  *config.Mux

    // the channel of denormalized changes passed to listeners
    updates chan kubetypes.PodUpdate

    // contains the list of all configured sources
    sourcesLock sync.Mutex
    sources     sets.String
}

type podStorage struct {
    podLock sync.RWMutex
    // map of source name to pod uid to pod reference
    pods map[string]map[types.UID]*v1.Pod
    mode PodConfigNotificationMode
    // ensures that updates are delivered in strict order
    // on the updates channel
    updateLock sync.Mutex
    updates    chan<- kubetypes.PodUpdate
    ...
}
func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
    return c.updates
}

podStorage implements the Merge interface, providing Merge(source string, change interface{}). It recognizes change events coming from the different sources and merges the pods according to the operation type of the change. For pods from the apiserver source the operation is always SET. In that case the pods already in storage are treated as oldPods and compared against all valid newPods in the change: a pod not in oldPods becomes an addPod; a pod already in oldPods is classified as needing an update, a reconcile, or a graceful delete; and a pod present in oldPods but absent from newPods becomes a removePod. podLock is held while podStorage is modified.

updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
    filtered := filterInvalidPods(newPods, source, s.recorder)
    for _, ref := range filtered {
        // Annotate the pod with the source before any comparison.
        if ref.Annotations == nil {
            ref.Annotations = make(map[string]string)
        }
        ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
        if existing, found := oldPods[ref.UID]; found {
            pods[ref.UID] = existing
            needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
            if needUpdate {
                updatePods = append(updatePods, existing)
            } else if needReconcile {
                reconcilePods = append(reconcilePods, existing)
            } else if needGracefulDelete {
                deletePods = append(deletePods, existing)
            }
            continue
        }
        recordFirstSeenTime(ref)
        pods[ref.UID] = ref
        addPods = append(addPods, ref)
    }
}

After the pods from a source have been sorted into remove, add, update, and delete sets, the corresponding events are written one after another onto podStorage's updates channel, with updateLock held throughout.
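The SET-style merge described above is essentially a map diff. Here is a self-contained sketch of the idea (heavily simplified; the real checkAndUpdatePod additionally distinguishes reconcile and graceful delete):

// A simplified SET merge: diff the new full snapshot from a source
// against the old cache and classify pods into add/update/remove.
package main

import "fmt"

type pod struct {
    uid  string
    spec string
}

func diff(oldPods map[string]pod, newPods []pod) (adds, updates, removes []pod) {
    seen := map[string]bool{}
    for _, p := range newPods {
        seen[p.uid] = true
        existing, found := oldPods[p.uid]
        switch {
        case !found:
            adds = append(adds, p)
        case existing.spec != p.spec:
            updates = append(updates, p)
        }
        // identical specs would be reconcile / no-op territory in the real code
    }
    for uid, p := range oldPods {
        if !seen[uid] {
            removes = append(removes, p)
        }
    }
    return
}

func main() {
    old := map[string]pod{"a": {"a", "v1"}, "b": {"b", "v1"}}
    adds, updates, removes := diff(old, []pod{{"a", "v2"}, {"c", "v1"}})
    fmt.Println(adds, updates, removes) // adds: c, updates: a, removes: b
}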

mux

The mux is what actually performs the source merging. It maintains one channel per source and hands it out via Channel(source string). That method also starts a listen goroutine: as soon as data is written to the channel, the goroutine reads it and merges it through the mux's merger. In other words, podStorage's input arrives through the channels the mux exposes, which is what lets the different sources write into it.

func (m *Mux) Channel(source string) chan interface{} {
    if len(source) == 0 {
        panic("Channel given an empty name")
    }
    m.sourceLock.Lock()
    defer m.sourceLock.Unlock()
    channel, exists := m.sources[source]
    if exists {
        return channel
    }
    newChannel := make(chan interface{})
    m.sources[source] = newChannel
    go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
    return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
    for update := range listenChannel {
        m.merger.Merge(source, update)
    }
}

These channels are handed to callers through PodConfig's Channel method: each source writes its data into the mux via cfg.Channel(sourceName), and the mux's merger ultimately folds that data into podStorage.

func (c *PodConfig) Channel(source string) chan<- interface{} {
    c.sourcesLock.Lock()
    defer c.sourcesLock.Unlock()
    c.sources.Insert(source)
    return c.mux.Channel(source)
}

Implementation of the apiserver source

As noted above, each source gets a send-only channel to write into. The apiserver source implements list-watch on Pod objects with a reflector, whose implementation comes from the client-go/tools/cache package.

func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName,
    updates chan<- interface{}) {
    lw := cache.NewListWatchFromClient(c.Core().RESTClient(), "pods", metav1.NamespaceAll,
        fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
    newSourceApiserverFromLW(lw, updates)
}

func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
    send := func(objs []interface{}) {
        var pods []*v1.Pod
        for _, o := range objs {
            pods = append(pods, o.(*v1.Pod))
        }
        updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET,
            Source: kubetypes.ApiserverSource}
    }
    cache.NewReflector(lw, &v1.Pod{},
        cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
}

Reflector

The reflector's main members are the listerWatcher and the store supplied by the caller of NewReflector; the reflector simply writes whatever it lists and watches into that store. reflector.Run() starts a new goroutine to execute ListAndWatch().

type Reflector struct {
    name string
    // The type of object we expect to place in the store.
    expectedType reflect.Type
    // The destination to sync up with the watch source
    store Store
    // listerWatcher is used to perform lists and watches.
    listerWatcher ListerWatcher
    // period controls timing between one watch ending and
    // the beginning of the next one.
    period       time.Duration
    resyncPeriod time.Duration
    ShouldResync func() bool
    // clock allows tests to manipulate time
    clock clock.Clock
    // lastSyncResourceVersion is the resource version token last
    // observed when doing a sync with the underlying store
    // it is thread safe, but not synchronized with the underlying store
    lastSyncResourceVersion      string
    lastSyncResourceVersionMutex sync.RWMutex
}

func (r *Reflector) Run() {
    go wait.Until(func() {
        if err := r.ListAndWatch(wait.NeverStop); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, wait.NeverStop)
}

The initial list triggers one synchronization of the store: syncWith calls store.Replace and records the resourceVersion of that sync. A goroutine is then started to handle periodic resync. In the watch loop that follows, the watch options are set, including the resourceVersion and a timeout. The reflector sets the timeout to avoid hung watchers: any watcher that receives no events within that window is stopped. The interface returned by Watch offers two methods, one to stop the watch and one to receive watch events.

type Interface interface {
    // Stops watching. Will close the channel returned by ResultChan(). Releases
    // any resources used by the watch.
    Stop()

    // Returns a chan which will receive all the events. If an error occurs
    // or Stop() is called, this channel will be closed, in which case the
    // watch should be completely cleaned up.
    ResultChan() <-chan Event
}
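Outside the reflector, the watch.Interface contract can be exercised in isolation with the fake watcher shipped in k8s.io/apimachinery/pkg/watch; a small sketch, assuming current import paths:

// Consuming a watch.Interface the way watchHandler does, using the
// in-memory fake watcher so the example runs without an apiserver.
package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
)

func main() {
    w := watch.NewFake()
    go func() {
        // Emit one Added event, then close the result channel via Stop.
        w.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", ResourceVersion: "42"}})
        w.Stop()
    }()

    for event := range w.ResultChan() {
        pod := event.Object.(*corev1.Pod)
        fmt.Printf("type=%s pod=%s rv=%s\n", event.Type, pod.Name, pod.ResourceVersion)
    }
}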

Once the watch is established without error and a watch.Interface is obtained, the reflector calls watchHandler to process it. watchHandler simply reads events from ResultChan, applies them to the store, and updates the resourceVersion as it goes.

func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string,
    errc chan error, stopCh <-chan struct{}) error {
    start := r.clock.Now()
    eventCount := 0
    defer w.Stop()

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                break loop
            }
            if event.Type == watch.Error {
                return apierrs.FromObject(event.Object)
            }
            ...
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                ...
            case watch.Modified:
                err := r.store.Update(event.Object)
                ...
            case watch.Deleted:
                err := r.store.Delete(event.Object)
                ...
            default:
                ...
            }
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    watchDuration := r.clock.Now().Sub(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        // watch lasted less than a second and no items received
        return errors.New("very short watch")
    }
    ...
    return nil
}

UndeltaStore

The store the reflector ultimately writes to is, in the kubelet's case, an UndeltaStore: a wrapper around Store that adds a PushFunc. On every Add, Update, or Delete, the registered PushFunc is invoked with the complete Store.List() and pushes it out. Here the PushFunc is the function that pushes data onto the apiserver source's channel.

Why push the complete store.List() rather than only the delta?

type UndeltaStore struct {
    Store
    PushFunc func([]interface{})
}

func (u *UndeltaStore) Add(obj interface{}) error {
    if err := u.Store.Add(obj); err != nil {
        return err
    }
    u.PushFunc(u.Store.List())
    return nil
}
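A quick usage sketch (assuming current client-go import paths) makes this visible: every Add pushes the complete current list, not just the object that changed.

// Each Add triggers the push callback with the full list: first 1 object, then 2.
package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    push := func(objs []interface{}) {
        fmt.Println("pushed", len(objs), "objects")
    }
    store := cache.NewUndeltaStore(push, cache.MetaNamespaceKeyFunc)

    store.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "a"}})
    store.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "b"}})
}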

Store here is an interface with the usual add/delete/update/get methods. Its implementation wraps a thread-safe store: to callers it exposes an interface that operates on single objects, using a keyFunc to derive each object's storage key, while ThreadSafeStore exposes a key/value-based interface such as Add(key string, obj interface{}). ThreadSafeStore itself is implemented by threadSafeMap.

// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
    return &cache{
        cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
        keyFunc:      keyFunc,
    }
}

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}

    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}
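For reference, a minimal usage sketch of this Store with MetaNamespaceKeyFunc (again assuming current client-go import paths); the derived key is "namespace/name":

// Add a pod to a plain Store and look it up by its namespace/name key.
package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    store := cache.NewStore(cache.MetaNamespaceKeyFunc)
    store.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"}})

    obj, exists, err := store.GetByKey("default/nginx")
    fmt.Println(obj != nil, exists, err) // true true <nil>
}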

Questions

  1. Why does the pushFunc of the reflector's store push the entire store every time instead of only the delta?
  2. What happens if the updates chan blocks?
