Taking nodeInformer as an example, this article analyzes what happens after we register a handler on a SharedIndexInformer via AddEventHandler: why the Get method can return the latest data from the cache, and when the handler we registered is invoked once a new event arrives. Other informers follow essentially the same flow.
Definition of the NodeLister interface
// node list interface
type NodeLister interface {
	// List lists all Nodes in the indexer.
	// Objects returned here must be treated as read-only.
	List(selector labels.Selector) (ret []*v1.Node, err error)
	// Get retrieves the Node from the index for a given name.
	// Objects returned here must be treated as read-only.
	Get(name string) (*v1.Node, error)
	NodeListerExpansion
}
Initialization
The k8s client and the nodeLister are instantiated as follows:
Connect to the k8s API and call the shared informer factory constructor to initialize a k8sSharedInformerFactory:

// kubeedge/cloud/pkg/common/informers/informers.go
func GetInformersManager() Manager {
	// ...
	k8sSharedInformerFactory: k8sinformer.NewSharedInformerFactory(client.GetKubeClient(), 0), // resyncPeriod is set to 0 here
	// ...
}

// Accessor for the private k8sSharedInformerFactory
func (ifs *informers) GetK8sInformerFactory() k8sinformer.SharedInformerFactory {
	return ifs.k8sSharedInformerFactory
}

// kubeedge/cloud/pkg/edgecontroller/controller/upstream.go
// Initialize the nodeLister
func NewUpstreamController(config *v1alpha1.EdgeController, factory k8sinformer.SharedInformerFactory) (*UpstreamController, error) {
	// ...
	uc.nodeLister = factory.Core().V1().Nodes().Lister()
	// ...
}
Usage
Use the nodeLister's Get to fetch a *v1.Node object from the informer cache, for example:
func kubeClientGet(uc *UpstreamController, namespace string, name string, queryType string, msg model.Message) (metaV1.Object, error) {
	// ...
	obj, err = uc.nodeLister.Get(name)
	// ...
}
With this, the nodeLister returns a pointer to the v1.Node object with the given name.
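For comparison with the question raised at the beginning, registering a handler on the same node informer is equally simple. Below is a minimal sketch, assuming the factory variable from the initialization above; the handler bodies and the klog/corev1 usage are illustrative, not KubeEdge's actual code:

nodeInformer := factory.Core().V1().Nodes().Informer()
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		node := obj.(*corev1.Node)
		klog.Infof("node added: %s", node.Name)
	},
	UpdateFunc: func(oldObj, newObj interface{}) {
		klog.Infof("node updated: %s", newObj.(*corev1.Node).Name)
	},
	DeleteFunc: func(obj interface{}) {
		// obj may be a cache.DeletedFinalStateUnknown tombstone rather than a *corev1.Node
		if node, ok := obj.(*corev1.Node); ok {
			klog.Infof("node deleted: %s", node.Name)
		}
	},
})

The rest of this article traces how these callbacks end up being called and why the lister's Get stays up to date.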
Under the hood
The question to think about is how this Get actually retrieves the object. Tracing the Get method, it is easy to find:
type nodeLister struct {
	indexer cache.Indexer
}

// Get retrieves the Node from the index for a given name.
func (s *nodeLister) Get(name string) (*v1.Node, error) {
	obj, exists, err := s.indexer.GetByKey(name)
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, errors.NewNotFound(v1.Resource("node"), name)
	}
	return obj.(*v1.Node), nil
}
The indexer is initialized by the factory.Core().V1().Nodes().Lister() call above, which involves two methods: Nodes() and Lister(). Nodes() constructs a default NodeInformer, and Lister() then calls the NodeInformer's Lister method. The code is as follows:
// nodeInformer struct definition
type nodeInformer struct {
	factory          internalinterfaces.SharedInformerFactory
	tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// Nodes returns a NodeInformer.
func (v *version) Nodes() NodeInformer {
	return &nodeInformer{factory: v.factory, tweakListOptions: v.tweakListOptions}
}

func NewNodeLister(indexer cache.Indexer) NodeLister {
	return &nodeLister{indexer: indexer}
}

func (f *nodeInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredNodeInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *nodeInformer) Informer() cache.SharedIndexInformer {
	// InformerFor creates the informer via defaultInformer and stores it in the factory's informers map.
	// A later call to Start() iterates over all registered informers and starts each one with Run().
	return f.factory.InformerFor(&corev1.Node{}, f.defaultInformer)
}

func (f *nodeInformer) Lister() v1.NodeLister {
	return v1.NewNodeLister(f.Informer().GetIndexer())
}
According to the nodeInformer struct definition, factory is the sharedInformerFactory (k8s.io/client-go/informers/factory.go). So through the calls above, the indexer is f.Informer().GetIndexer(), where f.Informer() boils down to sharedInformerFactory.InformerFor(obj runtime.Object, newFunc NewInformerFunc), and the newFunc passed in is NewFilteredNodeInformer(), which is where the indexer is finally instantiated (a sketch of InformerFor is shown below). The call chain is fairly convoluted, so keep the main question in mind: why can this indexer return the corresponding node object from Get(nodename)?
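For reference, InformerFor in client-go looks roughly like the following (abridged sketch; details may differ slightly between client-go versions). It caches one shared informer per object type and only calls newFunc the first time:

// k8s.io/client-go/informers/factory.go (abridged)
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	if informer, exists := f.informers[informerType]; exists {
		return informer // reuse the shared informer created earlier for this type
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	// newFunc is the defaultInformer above, which in turn calls NewFilteredNodeInformer
	informer := newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer
	return informer
}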
// The function that actually creates the indexer
func NewFilteredNodeInformer(client kubernetes.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{ // the famous list/watch callbacks are defined here
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Nodes().List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Nodes().Watch(context.TODO(), options)
			},
		},
		&corev1.Node{},
		resyncPeriod,
		indexers,
	)
}
Keep an eye on WatchFunc here; after a full loop through the client-go informer machinery we will come back to it.
// k8s.io/client-go/tools/cache/shared_informer.go
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor: &sharedProcessor{clock: realClock},
		// Initialize the indexer; this is where the cached data is read and written (k8s.io/client-go/tools/cache/store.go)
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw, // assign the ListerWatcher
		objectType:                      exampleObject,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod, // with the resyncPeriod from the initialization above, these are both 0
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}
This finally constructs a sharedIndexInformer, whose methods implement the SharedIndexInformer interface. At this point the chain of initializations inside the informer is basically complete. The s.indexer.GetByKey(name) call inside nodeLister's Get() is now clear; it ends up in:
// k8s.io/client-go/tools/cache/store.go
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
	item, exists = c.cacheStorage.Get(key)
	return item, exists, nil
}
The remaining questions are how and when the data gets written into this cache. How it is written is the following call:
// Add inserts an item into the cache.
func (c *cache) Add(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Add(key, obj)
	return nil
}
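The keyFunc here is the DeletionHandlingMetaNamespaceKeyFunc passed to NewIndexer above, which produces "namespace/name" keys (just "name" for cluster-scoped objects such as Node). As a standalone illustration of how the store behaves, here is a minimal sketch that uses the same cache package directly; the node object and key are illustrative, not taken from KubeEdge:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Same kind of store the sharedIndexInformer uses, with the namespace/name key function.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})

	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "edge-node-1"}}
	_ = indexer.Add(node) // keyFunc(node) == "edge-node-1", since Node is cluster-scoped

	obj, exists, _ := indexer.GetByKey("edge-node-1")
	fmt.Println(exists, obj.(*corev1.Node).Name) // true edge-node-1
}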
When it is written requires another pass through the informer machinery. Taking KubeEdge as an example: when cloudcore starts, Run() first calls informers.GetInformersManager() to finish the informer initialization, including the k8sSharedInformerFactory mentioned at the beginning; later, Run() calls gis.Start(ctx.Done()):
// ...
ifs.k8sSharedInformerFactory.Start(stopCh) // calls Start on the SharedInformerFactory
// ...

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for informerType, informer := range f.informers { // iterate over the informers map
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh) // call the informer's Run(); the informer starts working
			f.startedInformers[informerType] = true
		}
	}
}
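Outside of KubeEdge, the usual pattern with a shared informer factory is to create listers and register handlers first, then Start the factory and wait for the initial List to land in the indexer before using the lister. A minimal sketch, assuming a clientset has already been built (variable names are illustrative):

factory := informers.NewSharedInformerFactory(clientset, 0)
nodeLister := factory.Core().V1().Nodes().Lister()

stopCh := make(chan struct{})
defer close(stopCh)

factory.Start(stopCh) // starts every informer created so far, each in its own goroutine

// Block until the initial List has been written into the indexer;
// before this point nodeLister.Get may return NotFound even for existing nodes.
if !cache.WaitForCacheSync(stopCh, factory.Core().V1().Nodes().Informer().HasSynced) {
	klog.Fatal("failed to sync node informer cache")
}

node, err := nodeLister.Get("edge-node-1")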
// Call chain
sharedIndexInformer.Run()
  controller.Run()
    Reflector.Run()                                // watch-related logic
    wait.Until(c.processLoop, time.Second, stopCh) // periodic processLoop
sharedIndexInformer.Run() starts by building a Config that carries the ListWatch, the DeltaFIFO, and HandleDeltas, which drives the interaction with user handlers. A controller is then created from this Config.
// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// Initialize the DeltaFIFO queue; its key function (opts.KeyFunction) defaults to MetaNamespaceKeyFunc
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer, // the indexer is passed in so that Replace() can tell which items have disappeared
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{ // the Config is what the controller drives, so the queue and the listerWatcher are both passed along here
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process:           s.HandleDeltas, // syncs data from the DeltaFIFO items map into the indexer via the indexer's Add/Update/Delete
		WatchErrorHandler: s.watchErrorHandler,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		// The new cfg is driven by a controller; the list/watch is started inside the controller, so there is one more layer
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh) // enter the controller's Run method
}
controller.Run performs the actual watch, and also runs processLoop periodically, which moves the controller's queued items into the indexer (where Get reads them) and invokes the handlers registered by the user.
// k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	// This is the Reflector that appears in every diagram of the shared-informer machinery
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue, // watched items end up here
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group

	wg.StartWithChannel(stopCh, r.Run)             // finally, the watch starts
	wait.Until(c.processLoop, time.Second, stopCh) // run processLoop once a second
	wg.Wait()
}

// A part of NewNamedReflector worth borrowing is the BackoffManager, i.e. retry with exponential backoff.
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	realClock := &clock.RealClock{}
	r := &Reflector{
		name:          name,
		listerWatcher: lw,
		store:         store,
		// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
		// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
		// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
		backoffManager:         wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
		initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
		resyncPeriod:           resyncPeriod,
		clock:                  realClock,
		watchErrorHandler:      WatchErrorHandler(DefaultWatchErrorHandler),
	}
	r.setExpectedType(expectedType)
	return r
}
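The same backoff helpers from k8s.io/apimachinery can be used directly in your own code. A minimal sketch; the doWork function is illustrative, and the exact package path of the clock type varies between client-go versions:

import (
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/apimachinery/pkg/util/wait"
)

func runWithBackoff(doWork func() error, stopCh <-chan struct{}) {
	// initial 800ms, capped at 30s, reset after 2min of success, factor 2.0, jitter 1.0
	bm := wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, &clock.RealClock{})
	wait.BackoffUntil(func() {
		if err := doWork(); err != nil {
			// on error simply return; BackoffUntil waits for the next (longer) interval before retrying
			return
		}
	}, bm, true, stopCh)
}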
At this point it helps to keep the usual shared-informer architecture diagram in mind.
Starting ListAndWatch
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil { // ListAndWatch is retried with the backoff defined in NewNamedReflector above
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
Looking at the ListAndWatch flow in detail: on the first run it lists all items, and afterwards it syncs incrementally based on the resource version.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	// ... the function is long; we mainly care about the watch part.
	// After the full list has been synced, what remains is an endless for{} loop that
	// receives new items from the channel returned by Watch().
	w, err := r.listerWatcher.Watch(options)
	r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh) // handle the channel returned by Watch
}
watchHandler processes the items returned by Watch:
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	for {
		select {
		// ...
		case event, ok := <-w.ResultChan(): // receive data from the channel and save it to the store
			// ...
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object) // calls DeltaFIFO's Add()
				// ...
			}
			// ...
		}
	}
}

// k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock() // take the lock
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)       // returns f.keyFunc(obj), i.e. MetaNamespaceKeyFunc(obj)
	f.queue = append(f.queue, id) // append the key
	f.items[id] = newDeltas       // finally add the object's deltas to the items map
	f.cond.Broadcast()            // wake up Pop
}
At this point the handling of watch data is done: the data now lives in the DeltaFIFO's items map.
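For reference, each value in that map is a Deltas slice, i.e. a per-key list of (DeltaType, Object) pairs accumulated until Pop hands them to the Process function. Abridged from delta_fifo.go:

// k8s.io/client-go/tools/cache/delta_fifo.go (abridged)
type DeltaType string

const (
	Added    DeltaType = "Added"
	Updated  DeltaType = "Updated"
	Deleted  DeltaType = "Deleted"
	Replaced DeltaType = "Replaced"
	Sync     DeltaType = "Sync"
)

// Delta records one change to an object; Deltas collects the pending changes for a single key.
type Delta struct {
	Type   DeltaType
	Object interface{}
}
type Deltas []Delta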
Syncing data from the DeltaFIFO to the indexer
With the watch path in the controller covered, the other part of controller.Run is processLoop(), which uses the Process function set up in the Config, i.e. sharedIndexInformer.HandleDeltas:
// k8s.io/client-go/tools/cache/controller.go
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		// ...
	}
}
Queue.Pop is DeltaFIFO's Pop():
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	// Take the lock. Pop processes items serially, so if the downstream process function is slow,
	// the lock in Add above has to wait.
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			f.cond.Wait() // if the queue is empty, wait for the cond signal
		}
		id := f.queue[0]        // dequeue the key
		item, ok := f.items[id] // fetch the item
		delete(f.items, id)     // remove the item from the map
		err := process(item)    // process the item, i.e. sync it to the indexer via sharedIndexInformer.HandleDeltas
	}
}
The controller's Process: HandleDeltas
sharedIndexInformer.HandleDeltas switches on the delta's Type. For adds/updates/syncs it first looks the object up in the indexer to decide between Add and Update; for deletes it removes the object directly. In every case it also distributes a notification to the registered handlers:
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				s.indexer.Update(d.Object)
				// isSync is computed from d.Type and the resync state (elided here)
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				s.indexer.Add(d.Object)
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			s.indexer.Delete(d.Object)
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
}

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	// ...
	listener.add(obj)
	// ...
}

// The following is the delivery path for the informer handlers
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

func (p *processorListener) pop() {
	for {
		select {
		case notificationToAdd, ok := <-p.addCh:
			nextCh = p.nextCh
			// ... (buffers notifications and forwards them on nextCh)
		}
	}
}

func (p *processorListener) run() {
	for next := range p.nextCh {
		switch notification := next.(type) {
		case updateNotification:
			p.handler.OnUpdate(notification.oldObj, notification.newObj)
		case addNotification:
			p.handler.OnAdd(notification.newObj)
		case deleteNotification:
			p.handler.OnDelete(notification.oldObj)
		default:
			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
		}
	}
}
Custom handlers
Taking KubeEdge's handler as an example:
// OnAdd handle Add event
func (c *CommonResourceEventHandler) OnAdd(obj interface{}) {
	c.obj2Event(watch.Added, obj)
}

func (c *CommonResourceEventHandler) obj2Event(t watch.EventType, obj interface{}) {
	eventObj, ok := obj.(runtime.Object)
	if !ok {
		klog.Warningf("unknown type: %T, ignore", obj)
		return
	}
	// All obj from client has been removed the information of apiversion/kind called MetaType,
	// it is fatal to decode the obj as unstructured.Unstructure or unstructured.UnstructureList at edge.
	err := util.SetMetaType(eventObj)
	if err != nil {
		klog.Warningf("failed to set meta type :%v", err)
	}
	c.events <- watch.Event{Type: t, Object: eventObj} // send to the events channel to be consumed
}

type NodesManager struct {
	events chan watch.Event
}

func (dc *DownstreamController) syncEdgeNodes() {
	for {
		select {
		case e := <-dc.nodeManager.Events():
			// ...
		}
	}
}
So the handler callbacks registered with an informer must process their business logic quickly and must never block; otherwise the Pop function stays locked.
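A common way to keep handlers fast is to have them only compute a key and enqueue it into a rate-limited workqueue, doing the real work in a separate worker goroutine. A minimal sketch of that pattern; the processNode function is illustrative:

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		// Only compute the key and enqueue it; never block or do heavy work here.
		if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
			queue.Add(key)
		}
	},
})

// Worker goroutine: the slow work happens here, outside the informer's delivery path.
go func() {
	for {
		key, shutdown := queue.Get()
		if shutdown {
			return
		}
		if err := processNode(key.(string)); err != nil {
			queue.AddRateLimited(key) // retry later with backoff
		} else {
			queue.Forget(key)
		}
		queue.Done(key)
	}
}()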
Inside Watch
// Watch a set of apiserver resources
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
	return lw.WatchFunc(options)
}
This calls the WatchFunc defined in NewFilteredNodeInformer, i.e. client.CoreV1().Nodes().Watch(context.TODO(), options). Underneath, that is:
return c.client.Get().
	Resource("nodes").
	VersionedParams(&opts, scheme.ParameterCodec).
	Timeout(timeout).
	Watch(ctx)
// Watch call flow
// k8s.io/client-go/rest/request.go
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
	// ...
	req, err := r.newHTTPRequest(ctx)
	return r.newStreamWatcher(resp)
	// ...
}

func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
	// ...
	return watch.NewStreamWatcher(
		restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
		// use 500 to indicate that the cause of the error is unknown - other error codes
		// are more specific to HTTP interactions, and set a reason
		errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
	), nil
}

// k8s.io/apimachinery/pkg/watch/streamwatcher.go
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
	go sw.receive()
}

func (sw *StreamWatcher) receive() {
	for {
		action, obj, err := sw.source.Decode()
		select {
		case sw.result <- Event{ // write the decoded data into the result channel
			Type:   action,
			Object: obj,
		}:
		}
	}
}

// Accessor for the result channel
func (sw *StreamWatcher) ResultChan() <-chan Event {
	return sw.result
}
This ResultChan() is exactly the <-w.ResultChan() that Reflector.watchHandler reads from, which closes the loop.
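To make the loop concrete end to end, the same watch can also be consumed directly with client-go, without the informer machinery; this is essentially what the Reflector does for us, minus the DeltaFIFO and indexer bookkeeping. A minimal sketch, with the clientset construction elided:

w, err := clientset.CoreV1().Nodes().Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
	klog.Fatalf("watch nodes: %v", err)
}
defer w.Stop()

for event := range w.ResultChan() {
	node, ok := event.Object.(*corev1.Node)
	if !ok {
		continue
	}
	// event.Type is watch.Added / watch.Modified / watch.Deleted, etc.
	klog.Infof("%s node %s", event.Type, node.Name)
}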