This series analyzes the following questions about Deployments in k8s:

  • Why Pods get created
  • How they get scheduled
  • Why they get destroyed

The source code analysis is based on Kubernetes 1.18.8.

Prerequisites

Components

k8s is made up of several components; the main ones are kube-apiserver, etcd, kube-scheduler, kube-controller-manager, kubelet, and kube-proxy.
Given the goal of this article, only parts of the behavior of kube-scheduler and kube-controller-manager are analyzed in depth, with kube-apiserver covered in passing.

Component responsibilities

The official documentation already explains this well; it is quoted here so the later analysis reads more smoothly. kube-scheduler: Control plane component that watches for newly created Pods with no assigned node, and selects a node for them to run on. In short, it is responsible for scheduling Pods onto nodes.

kube-controller-manager:

  • Node controller: Responsible for noticing and responding when nodes go down. It tracks and updates node status.
  • EndpointSlice controller: Populates EndpointSlice objects (to provide a link between Services and Pods).
  • Job controller: Watches for Job objects that represent one-off tasks, then creates Pods to run those tasks to completion. Broadly, these controllers exist to keep Pods in the desired state and running in the desired numbers.

Informer

Kubernetes controllers do not access the API Server directly; instead they go through Informers, and all controller operations interact with the Informer. The Informer uses a ListAndWatch mechanism: it watches the Kubernetes API Server for the resources of interest, which can be built-in Kubernetes resource objects or CRDs.
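
To make the pattern concrete, here is a minimal client-go sketch (not taken from the kubernetes source; the resync period and printouts are arbitrary choices) that Lists and Watches Deployments through a shared informer and reacts to events via handlers. This is exactly the shape the controllers analyzed below follow:

package main

import (
	"fmt"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// The factory performs the List+Watch against the API server and keeps a local cache.
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	deployInformer := factory.Apps().V1().Deployments().Informer()

	// Controllers never poll the API server directly; they react to these callbacks.
	deployInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			d := obj.(*appsv1.Deployment)
			fmt.Println("deployment added:", d.Namespace, d.Name)
		},
		UpdateFunc: func(old, cur interface{}) {
			d := cur.(*appsv1.Deployment)
			fmt.Println("deployment updated:", d.Namespace, d.Name)
		},
		DeleteFunc: func(obj interface{}) {
			fmt.Println("deployment deleted")
		},
	})

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)                                      // starts the ListAndWatch goroutines
	cache.WaitForCacheSync(stopCh, deployInformer.HasSynced)   // wait for the initial List to finish
	<-stopCh
}

The controllers inside kube-controller-manager are built on the same three pieces: a shared informer factory, event handlers, and a cache sync before doing any work.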

Source code analysis

Initialization

Only the parts of each function that this analysis focuses on are quoted below.

Creating the DeploymentController and its initialization:

// kubernetes/pkg/controller/deployment/deployment_controller.go
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    // ...
    dc := &DeploymentController{
        client:        client, // the k8s client; the controller talks to the other components through client-go and its informer machinery
        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),

        // the rate-limiting work queue; every key handed over by the informers ends up here
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
    }

    // the function that actually consumes and processes deployments
    dc.syncHandler = dc.syncDeployment

    // the function used to enqueue deployment keys
    dc.enqueueDeployment = dc.enqueue

    // register the EventHandlers for deployments and replicaSets
    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
        DeleteFunc: dc.deleteDeployment,
    })
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
    })

    // ...
}

enqueue is the enqueueing function; its argument is a Deployment object. Every deployment/replicaSet EventHandler, after receiving an event, ultimately calls it to enqueue the resource's key:

func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
    // the resulting key has the form namespace/deploymentName (e.g. gvs/fengniaohezi-1673511852282-deployment-nginx)
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
		return
	}

	dc.queue.Add(key)
}
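
controller.KeyFunc here is essentially client-go's MetaNamespaceKeyFunc (with deleted-object handling layered on top). A tiny standalone sketch of the key format, using made-up names:

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	d := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Namespace: "gvs", Name: "nginx"},
	}

	// Same key format the controller enqueues: "namespace/name".
	key, err := cache.MetaNamespaceKeyFunc(d)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // gvs/nginx

	// And what syncDeployment later does with the key:
	ns, name, _ := cache.SplitMetaNamespaceKey(key)
	fmt.Println(ns, name) // gvs nginx
}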

Let's pick two of the EventHandlers and take a look:

func (dc *DeploymentController) addDeployment(obj interface{}) {
	d := obj.(*apps.Deployment)
	klog.V(4).Infof("Adding deployment %s", d.Name)
	dc.enqueueDeployment(d) // calls enqueue to add the key to the queue
}

func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
	oldD := old.(*apps.Deployment)
	curD := cur.(*apps.Deployment)
	klog.V(4).Infof("Updating deployment %s", oldD.Name)
	dc.enqueueDeployment(curD) // calls enqueue to add the key to the queue
}

So whenever the DeploymentController receives a deployment event through its informers, after the necessary checks the event ultimately ends up as a key in dc.queue, where the subsequent event loop picks it up.
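
The same handler-to-queue flow, reduced to a self-contained sketch (the deployment object and all names are made up; only the cache/workqueue calls mirror the controller):

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// queue plays the role of dc.queue: a rate-limited queue of "namespace/name" keys.
var queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")

// enqueue plays the role of dc.enqueue: reduce whatever object arrived to its key and add it.
func enqueue(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		return
	}
	queue.Add(key)
}

func main() {
	// All three handlers funnel into enqueue, just like addDeployment/updateDeployment above.
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    enqueue,
		UpdateFunc: func(old, cur interface{}) { enqueue(cur) },
		DeleteFunc: enqueue,
	}

	// Simulate what the informer does when it sees a new deployment.
	d := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Namespace: "demo", Name: "nginx"}}
	handlers.AddFunc(d)

	key, _ := queue.Get()
	fmt.Println("dequeued:", key) // dequeued: demo/nginx
	queue.Done(key)
}

Everything interesting therefore happens on the consuming side of the queue, which is what Run sets up next.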

Startup

Once the DeploymentController has been created, its Run method is called to start the controller:

// kubernetes/cmd/kube-controller-manager/app/apps.go
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
    dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
    // ... (error handling elided)

    // start the DeploymentController
    go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
    // ...
}

The part of the Run method we care about:

func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
    // ...
    for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

     // ...
}

Run starts as many goroutines as the configured workers value. Each one runs dc.worker via wait.Until: whenever dc.worker returns, it is restarted after a one-second pause, until stopCh is closed.

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker() {
	for dc.processNextWorkItem() {
	}
}

func (dc *DeploymentController) processNextWorkItem() bool {
    key, quit := dc.queue.Get() // take one key off the queue
    if quit {
        return false
    }
    defer dc.queue.Done(key)

    // process the deployment; from the constructor above we know syncHandler is
    // func (dc *DeploymentController) syncDeployment(key string) error
    err := dc.syncHandler(key.(string))
    dc.handleErr(err, key)

    return true
}

The comment on worker says it well: dequeue a resource key, process it, and mark it done; the same key is never handled by two workers at once.
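
The queue semantics that give this guarantee — Get blocks, Done releases the key, AddRateLimited/Forget drive retries with backoff — can be seen in a standalone sketch (the worker count, key, and handler are made up; only the workqueue/wait calls mirror the controller):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

var queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")

// syncHandler stands in for syncDeployment: do the real work for one key.
func syncHandler(key string) error {
	fmt.Println("syncing", key)
	return nil
}

func processNextWorkItem() bool {
	key, quit := queue.Get() // blocks until an item is available or the queue is shut down
	if quit {
		return false
	}
	// Done must be called so the key can be processed again later; while the key is
	// "in flight", the queue will not hand it to any other worker.
	defer queue.Done(key)

	if err := syncHandler(key.(string)); err != nil {
		queue.AddRateLimited(key) // retry later, with backoff
		return true
	}
	queue.Forget(key) // clear the rate limiter's failure history for this key
	return true
}

func worker() {
	for processNextWorkItem() {
	}
}

func main() {
	stopCh := make(chan struct{})
	for i := 0; i < 2; i++ {
		go wait.Until(worker, time.Second, stopCh) // same shape as dc.Run
	}

	queue.Add("demo/nginx")
	time.Sleep(time.Second) // give a worker time to pick it up
	queue.ShutDown()
	close(stopCh)
}

Back in the deployment controller, syncHandler points at syncDeployment, which is where the per-deployment work happens: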

func (dc *DeploymentController) syncDeployment(key string) error {
    // fetch the deployment from the informer's lister
    deployment, err := dc.dLister.Deployments(namespace).Get(name)
    // ...
    d := deployment.DeepCopy()
    // ...
    // branch on the deployment's update strategy; RecreateDeploymentStrategyType == "Recreate"
    switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(d, rsList, podMap) // Recreate strategy: scale down old replicaSets, create/scale up the new one
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(d, rsList)
	}
}
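
For reference, the field this switch inspects is .spec.strategy.type on the Deployment object; a small made-up example:

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	d := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Namespace: "demo", Name: "nginx"},
		Spec: appsv1.DeploymentSpec{
			// If .spec.strategy is omitted in the manifest, the apps/v1 defaulter fills in RollingUpdate.
			Strategy: appsv1.DeploymentStrategy{Type: appsv1.RecreateDeploymentStrategyType},
		},
	}

	// The same branch syncDeployment takes:
	switch d.Spec.Strategy.Type {
	case appsv1.RecreateDeploymentStrategyType:
		fmt.Println("rolloutRecreate path")
	case appsv1.RollingUpdateDeploymentStrategyType:
		fmt.Println("rolloutRolling path")
	}
}

The rest of this walkthrough follows the Recreate branch (rolloutRecreate).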

Next, the replicaSet-related logic (still inside the deployment controller):

// kubernetes/pkg/controller/deployment/recreate.go
func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
    // creates the new replicaSet if it does not exist yet:
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)

    // scale up new replica set.
	if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
		return err
	}
}

// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate".
func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
	return scaled, err
}

func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment) (bool, *apps.ReplicaSet, error) {

    // decide whether this is a scale up or a scale down
    if *(rs.Spec.Replicas) < newScale {
		scalingOperation = "up"
	} else {
		scalingOperation = "down"
	}

    // scale the replicaSet and record a scaling event
    scaled, newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation)
}

func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {
    // call client-go to update the replicaSet object; the ReplicaSet controller sees this update through its own informer
   rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
}

At this point the deployment controller's part in the creation is done, and the story continues in the ReplicaSet controller.

ReplicaSets

The replicaSet controller lives at kubernetes/pkg/controller/replicaset/replica_set.go and goes through a similar series of initialization steps:

func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
    rsc := &ReplicaSetController{
		GroupVersionKind: gvk,
		kubeClient:       kubeClient, // the client-go client
		podControl:       podControl,
		burstReplicas:    burstReplicas,
		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName), // the rate-limiting work queue
	}

    // the event-driven entry points: replicaSet events are received here and, as with deployments, ultimately enqueued into queue
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    rsc.addRS,
		UpdateFunc: rsc.updateRS,
		DeleteFunc: rsc.deleteRS,
	})

    // the function that actually processes replicaSets
    rsc.syncHandler = rsc.syncReplicaSet
    // ...
    return rsc
}

Since the overall structure mirrors the deployment controller, a quick pass over the code:

// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    go wait.Until(rsc.worker, time.Second, stopCh)
}

func (rsc *ReplicaSetController) worker() {
	for rsc.processNextWorkItem() {
	}
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
    err := rsc.syncHandler(key.(string))
}

The core of it is syncReplicaSet, where the real processing starts:

func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    // as before, fetch the resource for the key from the lister
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)

    // reconcile the number of pods to match what the rs asks for; the interesting part starts here
    manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}

manageReplicas checks the pods it is given against the replicaSet's spec and adjusts the number of replicas accordingly:

func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    // diff between the pods that exist and the pods the rs wants
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    if diff < 0 { // too few pods: create more
        diff *= -1
        // slowStartBatch creates the pods in batches and bails out on the first error
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
        })
    } else if diff > 0 { // too many pods: delete some
        err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs) // delete a pod
    }
}
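
slowStartBatch is worth a closer look: it creates pods in exponentially growing batches (1, 2, 4, ...) and stops at the first error, so a misconfigured replicaSet fails fast instead of hammering the API server with thousands of doomed requests. A simplified, self-contained sketch of the idea (not the upstream implementation):

package main

import (
	"fmt"
	"sync"
)

// slowStart runs fn `count` times in exponentially growing batches,
// stopping as soon as any batch reports an error.
func slowStart(count, initialBatchSize int, fn func() error) (int, error) {
	successes := 0
	remaining := count
	batch := initialBatchSize
	if batch > remaining {
		batch = remaining
	}
	for batch > 0 {
		errCh := make(chan error, batch)
		var wg sync.WaitGroup
		wg.Add(batch)
		for i := 0; i < batch; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		successes += batch - len(errCh)
		if len(errCh) > 0 {
			return successes, <-errCh // bail out instead of retrying the rest
		}
		remaining -= batch
		batch *= 2
		if batch > remaining {
			batch = remaining
		}
	}
	return successes, nil
}

func main() {
	n, err := slowStart(10, 1, func() error {
		fmt.Println("create pod")
		return nil
	})
	fmt.Println(n, err) // 10 <nil>
}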

CreatePodsWithControllerRef is a method of the PodControlInterface interface; the default implementation (RealPodControl) lives in kubernetes/pkg/controller/controller_utils.go:

func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
	return r.createPods("", namespace, template, controllerObject, controllerRef)
}

func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
    // the core step: use client-go to create the Pod object via the API server; the scheduler and kubelet then see it through their own informers
    newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
}
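
Stripped of error handling and the template helper, what createPods does amounts to the following sketch (the names and the fake clientset are for illustration only; the metav1.NewControllerRef and Create calls are the parts that mirror the real code):

package main

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// createPodForRS: stamp a pod out of the replicaSet's template, attach an owner
// reference pointing back at the replicaSet, and POST it to the API server.
func createPodForRS(ctx context.Context, cs kubernetes.Interface, rs *appsv1.ReplicaSet) (*corev1.Pod, error) {
	controllerRef := metav1.NewControllerRef(rs, appsv1.SchemeGroupVersion.WithKind("ReplicaSet"))

	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName:    rs.Name + "-",
			Namespace:       rs.Namespace,
			Labels:          rs.Spec.Template.Labels,
			OwnerReferences: []metav1.OwnerReference{*controllerRef},
		},
		Spec: *rs.Spec.Template.Spec.DeepCopy(),
	}

	// The same client-go call the controller makes.
	return cs.CoreV1().Pods(rs.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}

func main() {
	// A fake clientset so the sketch runs without a cluster.
	cs := fake.NewSimpleClientset()
	rs := &appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{Namespace: "demo", Name: "nginx-6d4cf56db6", UID: "1234"},
		Spec: appsv1.ReplicaSetSpec{
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "nginx"}},
				Spec:       corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
			},
		},
	}

	pod, err := createPodForRS(context.TODO(), cs, rs)
	fmt.Println(pod.GenerateName, err) // nginx-6d4cf56db6- <nil>
}

The OwnerReference is what later lets the ReplicaSet controller recognize these pods as its own, and what lets garbage collection cascade deletes.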

That is essentially the whole pod creation flow. One loose end remains: the toleration values were never set anywhere in the code above. It turns out they are assigned in the api-server, so here is a brief look at the api-server's toleration-related flow:

// kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/installer.go
// register the resource handlers
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
    for _, action := range actions {
        switch action.Verb {
            case "PUT": // Update a resource.
                handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulUpdateResource(updater, reqScope, admit))

            case "POST": // Create a resource.
                handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
        }
    }
}


func restfulUpdateResource(r rest.Updater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		handlers.UpdateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
	}
}

func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interface) http.HandlerFunc {
    if mutatingAdmission, ok := admit.(admission.MutationInterface); ok {
        if mutatingAdmission.Handles(admission.Update) { // call Admit on the mutating admission plugins
            return newObj, mutatingAdmission.Admit(ctx, admission.NewAttributesRecord(newObj, oldObj, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, options, dryrun.IsDryRun(options.DryRun), userInfo), scope)
        }
    }
}


// kubernetes/plugin/pkg/admission/defaulttolerationseconds/admission.go
// TaintNodeNotReady = "node.kubernetes.io/not-ready"
// TaintNodeUnreachable = "node.kubernetes.io/unreachable"

func (p *Plugin) Admit(ctx context.Context, attributes admission.Attributes, o admission.ObjectInterfaces) (err error) {
    for _, toleration := range tolerations { // check whether the pod already tolerates the two taints
		if (toleration.Key == v1.TaintNodeNotReady || len(toleration.Key) == 0) &&
			(toleration.Effect == api.TaintEffectNoExecute || len(toleration.Effect) == 0) {
			toleratesNodeNotReady = true
		}

		if (toleration.Key == v1.TaintNodeUnreachable || len(toleration.Key) == 0) &&
			(toleration.Effect == api.TaintEffectNoExecute || len(toleration.Effect) == 0) {
			toleratesNodeUnreachable = true
		}
	}

    // if not already configured, append the default tolerations
	if !toleratesNodeNotReady {
		pod.Spec.Tolerations = append(pod.Spec.Tolerations, notReadyToleration)
	}

	if !toleratesNodeUnreachable {
		pod.Spec.Tolerations = append(pod.Spec.Tolerations, unreachableToleration)
	}
}

The two default tolerations appended above, along with the flags that control them, are defined in the same plugin:
var (
	defaultNotReadyTolerationSeconds = flag.Int64("default-not-ready-toleration-seconds", 300,
		"Indicates the tolerationSeconds of the toleration for notReady:NoExecute"+
			" that is added by default to every pod that does not already have such a toleration.")

	defaultUnreachableTolerationSeconds = flag.Int64("default-unreachable-toleration-seconds", 300,
		"Indicates the tolerationSeconds of the toleration for unreachable:NoExecute"+
			" that is added by default to every pod that does not already have such a toleration.")

	notReadyToleration = api.Toleration{
		Key:               v1.TaintNodeNotReady,
		Operator:          api.TolerationOpExists,
		Effect:            api.TaintEffectNoExecute,
		TolerationSeconds: defaultNotReadyTolerationSeconds,
	}

	unreachableToleration = api.Toleration{
		Key:               v1.TaintNodeUnreachable,
		Operator:          api.TolerationOpExists,
		Effect:            api.TaintEffectNoExecute,
		TolerationSeconds: defaultUnreachableTolerationSeconds, // this 300-second default is why pods on an unreachable node are only evicted and re-created elsewhere after five minutes
	}
)