上篇 K8s 源码分析 Deployment 和ReplicaSet 的创建流程 最后replicaSet通过client-go调用pod 创建的api。

创建流程

初始化

pod的调度主要有scheduler负责,创建scheduler源码路径:kubernetes/pkg/scheduler/scheduler.go 上代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// New returns a Scheduler
func New(client clientset.Interface,
	informerFactory informers.SharedInformerFactory,
	podInformer coreinformers.PodInformer,
	recorderFactory profile.RecorderFactory,
	stopCh <-chan struct{},
	opts ...Option) (*Scheduler, error) {
    
    // 初始化cache,30秒后过期,cache中的数据被清除
    schedulerCache := internalcache.New(30*time.Second, stopEverything)

    var sched *Scheduler
    source := options.schedulerAlgorithmSource // 根据调度算法不同,初始化scheduler
    switch {
	case source.Provider != nil:
		// Create the config from a named algorithm provider.
		sc, err := configurator.createFromProvider(*source.Provider)  // 一般都是默认调度算法

    // ...

    // 给scheduler添加 informer handler, controller 发出的pod变更请求,由这里的handler处理
    addAllEventHandlers(sched, informerFactory, podInformer)
    return sched, nil
}

createFromProvider 负责scheduler真正的初始化操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (c *Configurator) create() (*Scheduler, error) {
    // ...
    // Profiles are required to have equivalent queue sort plugins.
	lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()

    // informer的pod handler 将pod 入队 podQueue, 供scheduleOne(ctx context.Context)消费
	podQueue := internalqueue.NewSchedulingQueue(
		lessFn,
		internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
		internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
	)

    // ...
    return &Scheduler{
		SchedulerCache:  c.schedulerCache,
		Algorithm:       algo,
		Profiles:        profiles,
		NextPod:         internalqueue.MakeNextPodFunc(podQueue),
		Error:           MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache),
		StopEverything:  c.StopEverything,
		VolumeBinder:    c.volumeBinder,
		SchedulingQueue: podQueue, // 将 podQueue 赋值给 SchedulingQueue
	}, nil
}


// 负责总queue中获取一个pod
// MakeNextPodFunc returns a function to retrieve the next pod from a given
// scheduling queue
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.PodInfo {
	return func() *framework.PodInfo {
		podInfo, err := queue.Pop()
		if err == nil {
			klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name)
			return podInfo
		}
		klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
		return nil
	}
}

addAllEventHandlers 初始化的handler比较多,主要包括:pod、node、PersistentVolumes、PersistentVolumeClaims、service、StorageClasses 因本文只关心pod部分,所以以下代码只展示pod 相关handler。pod的handler分为scheduled和unscheduled.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// 要先贴出来这个确定pod命运的函数
func assignedPod(pod *v1.Pod) bool {
	return len(pod.Spec.NodeName) != 0
}

// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool {
	return profiles.HandlesSchedulerName(pod.Spec.SchedulerName)
}

func addAllEventHandlers(
    sched *Scheduler,
	informerFactory informers.SharedInformerFactory,
	podInformer coreinformers.PodInformer,
) {
// scheduled pod cache
	podInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch t := obj.(type) {
				case *v1.Pod:
					return assignedPod(t)  // 如果pod已经分配了NodeName 那么这个pod就是scheduled
				case cache.DeletedFinalStateUnknown:
					if pod, ok := t.Obj.(*v1.Pod); ok {
						return assignedPod(pod)
					}
					utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
					return false
				default:
					utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    sched.addPodToCache,
				UpdateFunc: sched.updatePodInCache,
				DeleteFunc: sched.deletePodFromCache,
			},
		},
	)
	// unscheduled pod queue
	podInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch t := obj.(type) {
				case *v1.Pod:
                    // 如果pod没有分配 NodeName,并且已经分配了shceduler算法 那么这个pod就是unscheduled
					return !assignedPod(t) && responsibleForPod(t, sched.Profiles) 
				case cache.DeletedFinalStateUnknown:
					if pod, ok := t.Obj.(*v1.Pod); ok {
						return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
					}
					utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
					return false
				default:
					utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    sched.addPodToSchedulingQueue,
				UpdateFunc: sched.updatePodInSchedulingQueue,
				DeleteFunc: sched.deletePodFromSchedulingQueue,
			},
		},
	)
}

Pod Informer EventHandler的实现, 就是将pod加入到podQueue等待消费

1
2
3
4
5
6
7
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
	pod := obj.(*v1.Pod)
	klog.V(3).Infof("add event for unscheduled pod %s/%s", pod.Namespace, pod.Name)
	if err := sched.SchedulingQueue.Add(pod); err != nil {
		utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
	}
}

Run Scheduler逻辑执行

初始化完成,就开始了熟悉的Run方法,和deployment、replicaSet的流程一致。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (sched *Scheduler) Run(ctx context.Context) {
    wait.UntilWithContext(ctx, sched.scheduleOne, 0)
}

// 顺序的消费podQueue中的pod
func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // ...

    // 调度算法的具体实现
    // scheduleResult 将返回建议调度的node SuggestedHost
    scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)

    //...

    // 将pod bind到具体的pod上 
    err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
}

bind 的具体流程分析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (sched *Scheduler) bind(ctx context.Context, prof *profile.Profile, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
    // ... 
    bindStatus := prof.RunBindPlugins(ctx, state, assumed, targetNode)
	if bindStatus.IsSuccess() {
		return nil
	}
}

// 遍历并执行bind插件
func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
    for _, bp := range f.bindPlugins {
		status = f.runBindPlugin(ctx, bp, state, pod, nodeName)
		if status != nil && status.Code() == Skip {
			continue
		}
		if !status.IsSuccess() {
			msg := fmt.Sprintf("plugin %q failed to bind pod \"%v/%v\": %v", bp.Name(), pod.Namespace, pod.Name, status.Message())
			klog.Error(msg)
			return NewStatus(Error, msg)
		}
		return status
	}
}

func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
    // 调用bind plugin interface中的Bind方法
    status := bp.Bind(ctx, state, pod, nodeName)

}

// 最终在DefaultBinder实现的方法中,找到了创建pod的具体实现
// Bind binds pods to nodes using the k8s client.
func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
	klog.V(3).Infof("Attempting to bind %v/%v to %v", p.Namespace, p.Name, nodeName)
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(context.TODO(), binding, metav1.CreateOptions{})
	if err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}
	return nil
}

以上通过bind 调用k8s的client将请求发送给api-server,api-server收到请求后,先落库etcd,然后发送给kubelet。

pod的容忍和驱逐

那么为什么有的pod会被返回创建销毁,什么情况下被销毁?
销毁的原因之一就是pod的toleration策略。
k8s中具体实现此功能的struct为:NoExecuteTaintManager
负责监听Taint/Toleration 的改变,并删除Pods
源码路径:kubernetes/pkg/controller/nodelifecycle/scheduler/taint_manager.go

初始化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager {
    // 事件源
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
    // ...

    tm := &NoExecuteTaintManager{
		client:                c,
		recorder:              recorder,
		getPod:                getPod,
		getNode:               getNode,
		getPodsAssignedToNode: getPodsAssignedToNode,
		taintedNodes:          make(map[string][]v1.Taint),

		nodeUpdateQueue: workqueue.NewNamed("noexec_taint_node"),

        // 定义podUpdateQueue,用于接受pod informer发来的pod事件
		podUpdateQueue:  workqueue.NewNamed("noexec_taint_pod"),
	}

    // 这个是TaintManager的核心,deletePodHandler 实际执行pod的删除操作,所以放在taintEvictionQueue表示,这个pod真正的要被干掉了
	tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))
}

const (
    retries              = 5  // 最大重试5次
)
// 真正执行删除pod的函数
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
	return func(args *WorkArgs) error {
		ns := args.NamespacedName.Namespace
		name := args.NamespacedName.Name

        // 这个日志就是我们在看scheduler服务日志时,能看到的信息
        // {"log":"I0313 09:34:32.143424       1 taint_manager.go:106] NoExecuteTaintManager is deleting Pod: gvs/fengniaohezi-1673511852282-deployment-nginx-5d9cbfddd9-r84l8\n","stream":"stderr","time":"2023-03-13T09:34:32.143580888Z"}
		klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
		if emitEventFunc != nil {
			emitEventFunc(args.NamespacedName)
		}
		var err error
		for i := 0; i < retries; i++ {
			err = c.CoreV1().Pods(ns).Delete(context.TODO(), name, metav1.DeleteOptions{})
			if err == nil {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
		return err
	}
}

Run 执行taint逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}){
    // 更新node的状态,处理node相关逻辑 按下不表
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				break
			}
			nodeUpdate := item.(nodeUpdateItem)
			hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.nodeUpdateQueue.Done(item)
				return
			case tc.nodeUpdateChannels[hash] <- nodeUpdate:
				// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
			}
		}
	}(stopCh)


    // 处理pod逻辑, podUpdateQueue中的数据从哪里来,之后分析,这里暂且认为podUpdateQueue中已经有数据
    go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				break
			}
			podUpdate := item.(podUpdateItem)
			hash := hash(podUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.podUpdateQueue.Done(item)
				return
			case tc.podUpdateChannels[hash] <- podUpdate:
				// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
			}
		}
	}(stopCh)


    // 根据配置的worker数量,启动多个worker
    for i := 0; i < UpdateWorkerSize; i++ {
		go tc.worker(i, wg.Done, stopCh)
	}
}

woker负责同时消费 nodeUpdateChannels 和 podUpdateChannels ,因为node变更,会影响pod的分布:驱逐、分配、新建等等 所以需要先将 nodeUpdateChannels 中的数据消费完成后,才会处理 podUpdateChannels 的数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
	defer done()
	for {
		select {
		case <-stopCh:
			return
		case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
			tc.handleNodeUpdate(nodeUpdate)
			tc.nodeUpdateQueue.Done(nodeUpdate)
		case podUpdate := <-tc.podUpdateChannels[worker]:
		priority:
			for { // 直到 nodeUpdateChannels 处理完成
				select {
				case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
					tc.handleNodeUpdate(nodeUpdate)
					tc.nodeUpdateQueue.Done(nodeUpdate)
				default:
					break priority
				}
			}
			tc.handlePodUpdate(podUpdate)
			tc.podUpdateQueue.Done(podUpdate)
		}
	}
}

// 处理pod逻辑
// getPod 实现:
podGetter := func(name, namespace string) (*v1.Pod, error) { return nc.podLister.Pods(namespace).Get(name) }

func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
    pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)


    taints, ok := func() ([]v1.Taint, bool) {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()

        // pod中的nodeName是否在taintedNodes中,如果在开始干掉pod
        // node状态变更,会维护 taintedNodes
		taints, ok := tc.taintedNodes[nodeName]  
		return taints, ok
	}()

    tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}

func (tc *NoExecuteTaintManager) processPodOnNode(
	podNamespacedName types.NamespacedName,
	nodeName string,
	tolerations []v1.Toleration,
	taints []v1.Taint,
	now time.Time,
) {
    if len(taints) == 0 {  //所以如果node不在污点里,这里直接返回了
		tc.cancelWorkWithEvent(podNamespacedName)
	}

    // ... 
    // 判断驱逐时间
    // 参考上篇中的 defaultUnreachableTolerationSeconds = 300 秒
    minTolerationTime := getMinTolerationTime(usedTolerations)
    startTime = scheduledEviction.CreatedAt
	if startTime.Add(minTolerationTime).Before(triggerTime) {
			return
	}

    // 需要驱逐,加入队列
    tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}

AddWork

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// AddWork adds a work to the WorkerQueue which will be executed not earlier than `fireAt`.
func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time) {
	key := args.KeyFromWorkArgs()
	klog.V(4).Infof("Adding TimedWorkerQueue item %v at %v to be fired at %v", key, createdAt, fireAt)

	q.Lock()
	defer q.Unlock()
	if _, exists := q.workers[key]; exists {
		klog.Warningf("Trying to add already existing work for %+v. Skipping.", args)
		return
	}

    // CreateWorker直接调用了 q.getWrappedWorkerFunc(key)
    // 其中的 workFunc 就是在初始化时,赋值的函数:deletePodHandler()
	worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key))
	q.workers[key] = worker
}

podUpdateQueue Add在哪里

到这里还要明确一个问题,podUpdateQueue中的数据是谁set进去的?
controller在初始化时,会初始化一个NodeLifecycleController,new NodeLifecycleController 之后,也会调用NodeLifecycleController的Run方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func NewNodeLifecycleController(){
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*v1.Pod)
			nc.podUpdated(nil, pod)
			if nc.taintManager != nil { 
				nc.taintManager.PodUpdated(nil, pod) // 调用NoExecuteTaintManager 中的 PodUpdated
			}
		},
		UpdateFunc: func(prev, obj interface{}) {
			prevPod := prev.(*v1.Pod)
			newPod := obj.(*v1.Pod)
			nc.podUpdated(prevPod, newPod)
			if nc.taintManager != nil {
				nc.taintManager.PodUpdated(prevPod, newPod)
			}
		}
        // ...
    }

    // 初始化 NoExecuteTaintManager
    nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode)
}

PodUpdated 负责将pod 加入到 podUpdateQueue 中

1
2
3
4
5
6
7
8
9
func (tc *NoExecuteTaintManager) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) {

    // 如果podtoleration没有变化,则忽略这个pod,不会被驱逐
    if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName {
		return
	}

    tc.podUpdateQueue.Add(updateItem)
}