以 nodeInformer 为例,分析通过 AddEventHandler 注册 SharedIndexInformer 的 handler 之后发生了什么:为什么能通过 Get 方法从 cache 中获取到最新的数据;新的事件到来时,我们注册的 handler 在什么时候被调用。其他 informer 的流程基本一样。

NodeLister interface 的定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// NodeLister helps list and get Nodes from the indexer.
type NodeLister interface {
	// List lists all Nodes in the indexer.
	// Objects returned here must be treated as read-only.
	List(selector labels.Selector) (ret []*v1.Node, err error)
	// Get retrieves the Node from the index for a given name.
	// Objects returned here must be treated as read-only.
	Get(name string) (*v1.Node, error)
	NodeListerExpansion
}

初始化

k8s client 和 nodeLister 的实例化过程如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19

连接到 k8s api 并调用 NewSharedInformerFactory 方法,初始化一个 k8sSharedInformerFactory
// kubeedge/cloud/pkg/common/informers/informers.go
// GetInformersManager (abridged excerpt) shows only the field initialisation
// that builds the shared informer factory from the kube client.
func GetInformersManager() Manager {
	k8sSharedInformerFactory:     k8sinformer.NewSharedInformerFactory(client.GetKubeClient(), 0) // resyncPeriod is set to 0 here
}

// GetK8sInformerFactory returns the unexported k8sSharedInformerFactory field.
func (ifs *informers) GetK8sInformerFactory() k8sinformer.SharedInformerFactory {
	return ifs.k8sSharedInformerFactory
}


// kubeedge/cloud/pkg/edgecontroller/controller/upstream.go
// NewUpstreamController (abridged excerpt) initializes nodeLister from the
// factory's Node informer.
func NewUpstreamController(config *v1alpha1.EdgeController, factory k8sinformer.SharedInformerFactory) (*UpstreamController, error)
{
	uc.nodeLister = factory.Core().V1().Nodes().Lister()
}

如何使用

使用 nodeLister 的 Get 方法,可以从 informer 中获取 *v1.Node 对象,参考如下代码:

1
2
3
// kubeClientGet (abridged excerpt) looks up the object called name through
// the controller's nodeLister.
func kubeClientGet(uc *UpstreamController, namespace string, name string, queryType string, msg model.Message) (metaV1.Object, error){
	obj, err = uc.nodeLister.Get(name)
}

这样就可以通过nodeLister获取到指定name的 v1.Node的对象指针。

窥探

需要思考的问题是,这里的Get是如何获取到对象的。
通过跟踪Get方法代码,很容易找得到:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// nodeLister wraps the cache.Indexer that its Get method reads from.
type nodeLister struct {
	indexer cache.Indexer
}

// Get looks up the Node stored under name in the indexer.
// It returns the indexer's error unchanged, a NotFound error for the "node"
// resource when no entry exists, and otherwise the stored object asserted to *v1.Node.
func (s *nodeLister) Get(name string) (*v1.Node, error) {
	item, found, err := s.indexer.GetByKey(name)
	switch {
	case err != nil:
		return nil, err
	case !found:
		return nil, errors.NewNotFound(v1.Resource("node"), name)
	}
	return item.(*v1.Node), nil
}

indexer 是在上面的 factory.Core().V1().Nodes().Lister()初始化的, 这里调用了两个方法:Nodes()和Lister() Nodes()是初始化了一个默认的NodeInformer
Lister() 则紧接着调用了NodeInformer的Lister方法,具体代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// nodeInformer holds the shared informer factory and the optional
// list-options tweak used when building the Node informer.
type nodeInformer struct {
	factory          internalinterfaces.SharedInformerFactory
	tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// Nodes builds a NodeInformer that carries this version's factory and
// list-options tweak.
func (v *version) Nodes() NodeInformer {
	informer := &nodeInformer{
		factory:          v.factory,
		tweakListOptions: v.tweakListOptions,
	}
	return informer
}

// NewNodeLister returns a NodeLister whose lookups are served by indexer.
func NewNodeLister(indexer cache.Indexer) NodeLister {
	lister := nodeLister{indexer: indexer}
	return &lister
}

// defaultInformer builds the Node informer via NewFilteredNodeInformer, registering
// the namespace index and forwarding this informer's tweakListOptions.
func (f *nodeInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	namespaceIndexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
	return NewFilteredNodeInformer(client, resyncPeriod, namespaceIndexers, f.tweakListOptions)
}

// Informer returns the shared Node informer obtained from the factory.
func (f *nodeInformer) Informer() cache.SharedIndexInformer {
	// InformerFor calls defaultInformer to create the informer and also stores
	// the new informer in the factory's informers map. A later Start() call
	// walks every informer in that map and launches each one with Run().
	sharedInformer := f.factory.InformerFor(&corev1.Node{}, f.defaultInformer)
	return sharedInformer
}

// Lister returns a NodeLister backed by the indexer of the shared Node informer.
func (f *nodeInformer) Lister() v1.NodeLister {
	indexer := f.Informer().GetIndexer()
	return v1.NewNodeLister(indexer)
}

创建Informer

根据nodeInformer结构体定义,factory被赋值为 sharedInformerFactory (k8s.io/client-go/informers/factory.go), 所以通过上面函数的调用,indexer的值为f.Informer().GetIndexer(), 也就是sharedInformerFactory.InformerFor(obj runtime.Object, newFunc NewInformerFunc)
这里的newFunc,就是NewFilteredNodeInformer(),indexer 的最终实例化, 因为调用关系比较复杂,需要谨记主线思路,找到indexer为什么能通过Get(nodename) 获取到对应的node对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// NewFilteredNodeInformer is where the informer (and therefore its indexer) is
// actually created. It wires the Node List and Watch calls of client into a
// cache.ListWatch and passes it, together with the example object, resync
// period and indexers, to cache.NewSharedIndexInformer.
func NewFilteredNodeInformer(client kubernetes.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	// tweak applies the optional caller-supplied mutation to the options.
	tweak := func(options *metav1.ListOptions) {
		if tweakListOptions != nil {
			tweakListOptions(options)
		}
	}

	// The well-known list/watch callbacks.
	listNodes := func(options metav1.ListOptions) (runtime.Object, error) {
		tweak(&options)
		return client.CoreV1().Nodes().List(context.TODO(), options)
	}
	watchNodes := func(options metav1.ListOptions) (watch.Interface, error) {
		tweak(&options)
		return client.CoreV1().Nodes().Watch(context.TODO(), options)
	}

	listWatch := &cache.ListWatch{
		ListFunc:  listNodes,
		WatchFunc: watchNodes,
	}
	return cache.NewSharedIndexInformer(listWatch, &corev1.Node{}, resyncPeriod, indexers)
}

这里要留意一下 WatchFunc,通过 client-go 的 informer 框架转一圈,最后我们还会回到这里来。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// k8s.io/client-go/tools/cache/shared_informer.go
// NewSharedIndexInformer assembles a sharedIndexInformer around the given
// ListerWatcher, example object, resync period and indexers.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	return &sharedIndexInformer{
		processor: &sharedProcessor{clock: realClock},
		// The indexer is created here; this is where data is read and written
		// (see k8s.io/client-go/tools/cache/store.go).
		indexer:       NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher: lw, // the ListerWatcher supplied by the caller
		objectType:    exampleObject,
		// With the resyncPeriod of 0 passed in by the factory above, both
		// resync fields below are 0.
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
		clock:                           realClock,
	}
}

这里最终初始化了一个 sharedIndexInformer,这个结构体的方法中需要实现SharedIndexInformer interface。

到这里informer内部的一系列的初始化,基本结束了。

所以nodeLister的Get()中,s.indexer.GetByKey(name)的调用就比较清晰了,就是调用的是:

1
2
3
4
5
// k8s.io/client-go/tools/cache/store.go
// GetByKey returns whatever cacheStorage holds under key and whether an entry
// was present; the error result is always nil.
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
	stored, found := c.cacheStorage.Get(key)
	return stored, found, nil
}

Add方法数据写入到Informer

剩下的最终的问题,是数据怎么、何时写进去的。 怎么写进去的调用的是:

1
2
3
4
5
6
7
8
9
// Add computes the key of obj with keyFunc and stores obj under that key in
// cacheStorage. A key computation failure is returned wrapped in a KeyError.
func (c *cache) Add(obj interface{}) error {
	storeKey, keyErr := c.keyFunc(obj)
	if keyErr != nil {
		return KeyError{obj, keyErr}
	}

	c.cacheStorage.Add(storeKey, obj)
	return nil
}

何时写进去的,又需要绕一圈 informer 机制:以 kubeedge 为例,在 cloudcore 启动时的 Run() 中,首先由 informers.GetInformersManager() 完成 informer 的初始化,文章开头提到的 k8sSharedInformerFactory 也在其中,
之后Run()中会调用gis.Start(ctx.Done()):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// ...
ifs.k8sSharedInformerFactory.Start(stopCh) //调用SharedInformerFactory中的Start方法
// ...

// Start launches, under the factory lock, every registered informer that has
// not been started yet and records it as started.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	// Walk the informers map.
	for informerType, informer := range f.informers {
		if f.startedInformers[informerType] {
			continue
		}
		// Call the informer's Run(); from here on the informer is working.
		go informer.Run(stopCh)
		f.startedInformers[informerType] = true
	}
}

shared_informer 工作过程分析

1
2
3
4
5
// 调用关系
sharedIndexInformer.Run()
    controller.Run()
        Reflector.Run()  // watch 相关逻辑
        wait.Until(c.processLoop, time.Second, stopCh)

启动Informer

从 sharedIndexInformer 的 Run 开始,里面会初始化一个 config,config 中带着 listWatch 和 fifo,以及驱动用户 handler 交互的 HandleDeltas。通过 config,初始化一个 controller。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// k8s.io/client-go/tools/cache/shared_informer.go
// Run builds the DeltaFIFO and the controller Config for this informer, starts
// the cache mutation detector and the processor, and then calls the
// controller's Run with stopCh.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// Create the FIFO queue; internally opts.KeyFunction is set to MetaNamespaceKeyFunc.
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ 
		KnownObjects:          s.indexer, // the indexer is passed in only so that Replace() can tell which items are missing
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{ // Config carries what the watch needs, so the FIFO (wrapping the indexer) and the ListerWatcher are both passed in here
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process:           s.HandleDeltas, // syncs data from the DeltaFIFO items map into the indexer by calling the indexer's Add method
		WatchErrorHandler: s.watchErrorHandler,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		// The new cfg is driven by a controller; the list/watch is started
		// inside the controller, so there is one more layer below this one.
		s.controller = New(cfg) 
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)  // enter the controller's Run method
}

controller 的 Run 执行真正的 watch,同时启动 processLoop(wait.Until 会在 processLoop 返回后每隔一秒重新拉起它),负责将 DeltaFIFO items 中的数据同步到 indexer 中,供 Get 使用;同时调用用户注册的 handler。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// k8s.io/client-go/tools/cache/controller.go
// Run closes the queue once stopCh is closed, builds a Reflector from the
// config, starts r.Run through wg.StartWithChannel and then runs processLoop
// via wait.Until. It waits on wg before returning.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()

	// This is the Reflector that appears in the many Shared-Informer diagrams found online.
	r := NewReflector(  
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue, // watched items are ultimately stored here
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group

	wg.StartWithChannel(stopCh, r.Run) // the watch finally starts here

	wait.Until(c.processLoop, time.Second, stopCh) // processLoop is itself an endless Pop loop; wait.Until re-invokes it with a one-second period if it returns
	wg.Wait()
}

// NewNamedReflector same as NewReflector, but with a specified name for logging.
// The part of this constructor most worth borrowing is the BackoffManager,
// i.e. retrying with exponential backoff.
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	clk := &clock.RealClock{}
	reflector := &Reflector{
		name:          name,
		listerWatcher: lw,
		store:         store,
		// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
		// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
		// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
		backoffManager:         wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, clk),
		initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, clk),
		resyncPeriod:           resyncPeriod,
		clock:                  clk,
		watchErrorHandler:      WatchErrorHandler(DefaultWatchErrorHandler),
	}
	reflector.setExpectedType(expectedType)
	return reflector
}

分析到这里可以贴一张图了: informer

开始ListAndWatch

1
2
3
4
5
6
7
8
9
// Run logs the start of the reflector, then repeatedly calls ListAndWatch via
// wait.BackoffUntil using r.backoffManager (configured in NewNamedReflector)
// until stopCh is closed, and finally logs that the reflector is stopping.
// Any error returned by ListAndWatch is passed to r.watchErrorHandler.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)

	listAndWatch := func() {
		err := r.ListAndWatch(stopCh)
		if err == nil {
			return
		}
		r.watchErrorHandler(r, err)
	}
	wait.BackoffUntil(listAndWatch, r.backoffManager, true, stopCh)

	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

具体分析ListAndWatch执行流程,一开始启动时,ListAndWatch会获取所有的item,之后根据资源的版本,增量的同步。

1
2
3
4
5
6
// ListAndWatch (abridged excerpt) shows only the watch part of the function.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    //... the function is long; what matters here is the watch
    // after the full sync finishes, what remains is an endless for{} loop that receives new items from the channel returned by Watch()
    w, err := r.listerWatcher.Watch(options)
    r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh) // handle the channel returned by Watch
}

watchHandler 处理Watch返回的item

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// watchHandler (abridged excerpt) receives events from w.ResultChan() and, for
// watch.Added events, adds the event object to r.store.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    for {
		select {
// ...
        case event, ok := <-w.ResultChan(): // receive from the channel, then save into the store
        // ...
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object) // calls DeltaFIFO's Add() 
            // ...
            }
// ..
        }
    }
}


// k8s.io/client-go/tools/cache/delta_fifo.go
// Add marks the FIFO as populated and queues an Added action for obj while
// holding f.lock.
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock() // take the lock
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}

// queueActionLocked (abridged excerpt) appends the object's key to f.queue,
// stores its deltas in f.items and broadcasts on f.cond.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj) // return f.keyFunc(obj), i.e. a call to MetaNamespaceKeyFunc(obj)
    f.queue = append(f.queue, id) // append key
    f.items[id] = newDeltas // finally store the object's deltas in items
    f.cond.Broadcast() // wake up Pop
}

到此 watch中的数据处理结束,数据存到了 DeltaFIFO的map items中。

数据从DeltaFIFO 同步到indexer

controller中的 watch流程分析完,另外controller 的Run中还有processLoop(),
需要用到Config中初始化的 Process = sharedIndexInformer.HandleDeltas.

1
2
3
4
5
6
7
// k8s.io/client-go/tools/cache/controller.go
// processLoop (abridged excerpt) pops items from the queue in an endless loop,
// handing each one to c.config.Process.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		// ...
	}
}

Queue.Pop 就是DeltaFIFO中的Pop()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Pop (abridged excerpt) waits until the queue is non-empty, takes the item for
// the key at the head of the queue out of f.items and passes it to process.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock() // take the lock; Pop handles items serially, so when the downstream process call is slow, Add (which takes the same lock) has to wait
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            f.cond.Wait() // the queue is empty: wait for the cond signal
        }

        id := f.queue[0] // dequeue
        item, ok := f.items[id] // fetch the item
        delete(f.items, id) // remove the item from the map

        err := process(item) // hand over to process, i.e. sharedIndexInformer.HandleDeltas, which syncs into the indexer
    }
}

controller 的Process HandleDeltas

进入 sharedIndexInformer.HandleDeltas,根据 obj 的 Type 判断动作:先尝试从 indexer 中获取 obj,判断究竟是需要更新还是添加;如果是删除,则直接执行。同时,还要回调 informer 上注册的自定义 handler。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// HandleDeltas (abridged excerpt) walks the Deltas while holding s.blockDeltas:
// Sync/Replaced/Added/Updated deltas are written to the indexer, Deleted deltas
// are removed from it, and a notification is distributed through s.processor.
// NOTE(review): old and isSync are not defined in this excerpt, and Add and
// Update appear back to back; presumably the full source chooses between them.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Replaced, Added, Updated:
             s.indexer.Add(d.Object)
             s.indexer.Update(d.Object)
             s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
        case Deleted:
            s.indexer.Delete(d.Object)
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
}


// distribute (abridged excerpt) passes obj to a listener through listener.add.
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    // ...
        listener.add(obj)
    // ..
}

// What follows is the processing flow of the informer handlers.
// add sends notification on p.addCh.
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

// pop (abridged excerpt) receives notifications from p.addCh.
// NOTE(review): the enclosing select and the hand-off to p.nextCh are elided here.
func (p *processorListener) pop() {
    for {
        case notificationToAdd, ok := <-p.addCh:
            nextCh = p.nextCh
    }
}

// run drains p.nextCh and dispatches each notification to the matching handler
// callback: OnUpdate, OnAdd or OnDelete. A notification of any other type is
// reported through utilruntime.HandleError.
func (p *processorListener) run() {
	for next := range p.nextCh {
		switch n := next.(type) {
		case updateNotification:
			p.handler.OnUpdate(n.oldObj, n.newObj)
		case addNotification:
			p.handler.OnAdd(n.newObj)
		case deleteNotification:
			p.handler.OnDelete(n.oldObj)
		default:
			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
		}
	}
}

自定义handler

以kubeedge的handler为例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// OnAdd handles an Add event by forwarding obj to obj2Event as watch.Added.
func (c *CommonResourceEventHandler) OnAdd(obj interface{}) {
	c.obj2Event(watch.Added, obj)
}

// obj2Event wraps obj in a watch.Event of type t and sends it on c.events.
// Objects that are not a runtime.Object are logged and dropped. A failure to
// set the meta type is logged, but the event is still sent.
func (c *CommonResourceEventHandler) obj2Event(t watch.EventType, obj interface{}) {
	runtimeObj, isRuntimeObj := obj.(runtime.Object)
	if !isRuntimeObj {
		klog.Warningf("unknown type: %T, ignore", obj)
		return
	}

	// All obj from client has been removed the information of apiversion/kind called MetaType,
	// it is fatal to decode the obj as unstructured.Unstructure or unstructured.UnstructureList at edge.
	if err := util.SetMetaType(runtimeObj); err != nil {
		klog.Warningf("failed to set meta type :%v", err)
	}

	// Hand the event to the events channel for consumption.
	c.events <- watch.Event{Type: t, Object: runtimeObj}
}

// NodesManager holds a channel of watch.Event values.
type NodesManager struct {
	events chan watch.Event
}


// syncEdgeNodes (abridged excerpt) loops forever receiving events from
// dc.nodeManager.Events().
func (dc *DownstreamController) syncEdgeNodes() {
    for {
		select {
            case e := <-dc.nodeManager.Events():
        }
}

所以 informer 的 handler 回调函数必须快速处理业务逻辑,不能出现阻塞,否则Pop函数会一直被锁住。

Watch内部流程

1
2
3
4
// Watch a set of apiserver resources by delegating to lw.WatchFunc with the
// given options.
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
	return lw.WatchFunc(options)
}

调用 NewFilteredNodeInformer 中定义的 WatchFunc,这里调用的是: client.CoreV1().Nodes().Watch(context.TODO(), options)

底层调用的是:

1
2
3
4
5
return c.client.Get().
		Resource("nodes").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Watch(ctx)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// watch call flow
// k8s.io/client-go/rest/request.go
// Watch (abridged excerpt) builds the HTTP request and wraps the response in a
// stream watcher.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
    // ...

    req, err := r.newHTTPRequest(ctx)
    return r.newStreamWatcher(resp)

    // ..
}

// newStreamWatcher (abridged excerpt) returns a watch.StreamWatcher built from
// a decoder and a client error reporter.
func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
    // ...

    return watch.NewStreamWatcher(
		restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
		// use 500 to indicate that the cause of the error is unknown - other error codes
		// are more specific to HTTP interactions, and set a reason
		errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
	), nil
}

// k8s.io/apimachinery/pkg/watch/streamwatcher.go
// NewStreamWatcher (abridged excerpt) starts sw.receive in its own goroutine.
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
    go sw.receive()
}

// receive (abridged excerpt) decodes events from sw.source in a loop and sends
// each one on sw.result.
func (sw *StreamWatcher) receive() {
    for {
		action, obj, err := sw.source.Decode()
        case sw.result <- Event{ // write the decoded data into the result channel
			Type:   action,
			Object: obj,
		}:
    }
}

// ResultChan returns the result channel as a receive-only channel.
func (sw *StreamWatcher) ResultChan() <-chan Event {
	return sw.result
}

ResultChan() 对应的就是 Reflector.watchHandler中的<-w.ResultChan()。