This series analyzes how a Deployment is processed inside Kubernetes.
The source code analysis is based on Kubernetes 1.18.8.
Prerequisites
Components
Kubernetes has several components; the most important are kube-apiserver, etcd, kube-scheduler, kube-controller-manager, kubelet, and kube-proxy.
Given the purpose of this article, only selected behavior of kube-scheduler and kube-controller-manager is analyzed in depth, with kube-apiserver covered briefly along the way.
Component responsibilities
The official documentation already explains these well; the excerpts below are carried over so that the later analysis reads more smoothly.
kube-scheduler:
Control plane component that watches for newly created Pods with no assigned node, and selects a node for them to run on.
It is mainly responsible for scheduling Pods onto nodes.
kube-controller-manager:
- Node controller: Responsible for noticing and responding when nodes go down. It tracks and updates node status.
- EndpointSlice controller: Populates EndpointSlice objects (to provide a link between Services and Pods).
- Job controller: Watches for Job objects that represent one-off tasks, then creates Pods to run those tasks to completion.
Together, the components above mainly aim to keep Pods in the correct state and running at the correct count.
Kubernetes has Controllers access the API Server through Informers rather than directly; all Controller operations go through an Informer, which uses the List-and-Watch mechanism.
The Informer is responsible for Watch operations against the Kubernetes API Server; the watched resource can be a built-in Kubernetes object or a CRD.
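To make the List-and-Watch pattern concrete, here is a minimal, self-contained sketch of watching Deployments with client-go's SharedInformerFactory (the kubeconfig path and resync interval are illustrative assumptions, not taken from the kube-controller-manager code; error handling is elided):

```go
package main

import (
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Build a client from the local kubeconfig (illustrative; controllers use in-cluster config)
    cfg, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    client := kubernetes.NewForConfigOrDie(cfg)

    // The factory performs the initial List and then keeps a Watch open,
    // feeding events into a local cache and the registered handlers.
    factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    dInformer := factory.Apps().V1().Deployments().Informer()

    dInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            d := obj.(*appsv1.Deployment)
            fmt.Printf("deployment added: %s/%s\n", d.Namespace, d.Name)
        },
    })

    stopCh := make(chan struct{})
    defer close(stopCh)
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)
    <-stopCh // block; a real controller would run workers here
}
```

The DeploymentController analyzed below is built on exactly this machinery, except that its event handlers push resource keys into a work queue instead of printing them.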
Source code analysis
Initialization
For each function, only the code relevant to this analysis is quoted; the rest is elided.
Creating and initializing the DeploymentController:
```go
// kubernetes/pkg/controller/deployment/deployment_controller.go
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    // ...
    dc := &DeploymentController{
        // The k8s client; the controller communicates with other components through client-go's informer mechanism
        client:        client,
        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
        // The rate-limited work queue; every resource key received from the informers lands here
        queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
    }
    // The handler that actually consumes and processes a deployment
    dc.syncHandler = dc.syncDeployment
    // The function used to enqueue a deployment
    dc.enqueueDeployment = dc.enqueue
    // Register the EventHandlers for deployments and replicaSets
    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
        DeleteFunc: dc.deleteDeployment,
    })
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
    })
    // ...
}
```
enqueue takes a deployment object as its argument; every deployment and replicaSet EventHandler ultimately calls this function to enqueue the resource's key:
```go
func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
    // The key has the form namespace/deploymentName, e.g. gvs/fengniaohezi-1673511852282-deployment-nginx
    key, err := controller.KeyFunc(deployment)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
        return
    }
    dc.queue.Add(key)
}
```
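As a side note, the key format can be reproduced with the plain cache helpers; controller.KeyFunc is a thin wrapper around cache.DeletionHandlingMetaNamespaceKeyFunc. The small program below is hypothetical and only illustrates the round trip from object to key and back:

```go
package main

import (
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    d := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{Namespace: "gvs", Name: "deployment-nginx"},
    }

    // Produces "namespace/name", e.g. "gvs/deployment-nginx"
    key, _ := cache.MetaNamespaceKeyFunc(d)
    fmt.Println(key)

    // syncDeployment/syncReplicaSet later split the key back into its parts
    ns, name, _ := cache.SplitMetaNamespaceKey(key)
    fmt.Println(ns, name)
}
```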
Let's look at two of the EventHandlers:
```go
func (dc *DeploymentController) addDeployment(obj interface{}) {
    d := obj.(*apps.Deployment)
    klog.V(4).Infof("Adding deployment %s", d.Name)
    dc.enqueueDeployment(d) // enqueue the deployment key
}

func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
    oldD := old.(*apps.Deployment)
    curD := cur.(*apps.Deployment)
    klog.V(4).Infof("Updating deployment %s", oldD.Name)
    dc.enqueueDeployment(curD) // enqueue the deployment key
}
```
So after the DeploymentController receives a deployment event from the informer and performs the necessary checks, the resource key always ends up in dc.queue for the subsequent event loop.
Startup
After the DeploymentController is created, its Run method is called to start the controller:
```go
// kubernetes/cmd/kube-controller-manager/app/apps.go
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
    dc, err := deployment.NewDeploymentController(
        ctx.InformerFactory.Apps().V1().Deployments(),
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    )
    // Start the DeploymentController
    go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
}
```
The part of the Run method we care about:
```go
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
    // ...
    for i := 0; i < workers; i++ {
        go wait.Until(dc.worker, time.Second, stopCh)
    }
    // ...
}
```
Run starts as many goroutines as the configured workers value; each one calls dc.worker repeatedly, pausing one second between invocations, until stopCh is closed.
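For readers unfamiliar with the wait package, here is a tiny sketch of wait.Until's semantics (the printed message is illustrative): the function is invoked, the loop sleeps for the period after it returns, and everything stops once the channel is closed:

```go
package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})

    go wait.Until(func() {
        fmt.Println("worker pass at", time.Now().Format(time.RFC3339))
    }, time.Second, stopCh)

    time.Sleep(5 * time.Second)
    close(stopCh) // stops the loop
    time.Sleep(time.Second)
}
```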
```go
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker() {
    for dc.processNextWorkItem() {
    }
}

func (dc *DeploymentController) processNextWorkItem() bool {
    key, quit := dc.queue.Get() // take one key out of the queue
    // Process the deployment; as seen in NewDeploymentController above, syncHandler is
    // func (dc *DeploymentController) syncDeployment(key string) error
    err := dc.syncHandler(key.(string))
}
```
The comment on worker already says it well: dequeue a resource key, process the resource, and never handle the same key concurrently.
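That guarantee comes from the workqueue itself. The following standalone sketch (queue name and handler are illustrative, not the controller's code) shows the Get/Done/Forget/AddRateLimited protocol that processNextWorkItem follows:

```go
package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func handle(key string) error {
    fmt.Println("processing", key)
    return nil
}

func main() {
    queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
    queue.Add("gvs/nginx")
    queue.Add("gvs/nginx") // deduplicated: only one copy is pending

    for i := 0; i < 2; i++ { // two workers, but a key is never handled twice at once
        go func() {
            for {
                key, quit := queue.Get() // blocks until a key is available
                if quit {
                    return
                }
                if err := handle(key.(string)); err != nil {
                    queue.AddRateLimited(key) // requeue with backoff on failure
                } else {
                    queue.Forget(key) // reset the per-key rate limiter
                }
                queue.Done(key) // mark the key as finished so it can be queued again
            }
        }()
    }

    time.Sleep(2 * time.Second)
    queue.ShutDown() // makes Get return quit = true
    time.Sleep(time.Second)
}
```

Because a key added while it is being processed is only handed out again after Done is called, two workers never sync the same deployment at the same time.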
```go
func (dc *DeploymentController) syncDeployment(key string) error {
    // Get the deployment from the informer's Lister
    deployment, err := dc.dLister.Deployments(namespace).Get(name)
    // ...
    // Branch on deployment.Spec.Strategy.Type; for the Recreate strategy,
    // RecreateDeploymentStrategyType = "Recreate"
    switch d.Spec.Strategy.Type {
    case apps.RecreateDeploymentStrategyType:
        return dc.rolloutRecreate(d, rsList, podMap) // create/scale the replicaSet
    case apps.RollingUpdateDeploymentStrategyType:
        return dc.rolloutRolling(d, rsList)
    }
}
```
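For context, the strategy that drives this switch comes from the user's Deployment spec. A hedged client-go sketch of creating a Deployment with the Recreate strategy (namespace, image and names are illustrative; error handling elided):

```go
package main

import (
    "context"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    cfg, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    client := kubernetes.NewForConfigOrDie(cfg)

    replicas := int32(3)
    labels := map[string]string{"app": "nginx"}

    d := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "gvs"},
        Spec: appsv1.DeploymentSpec{
            Replicas: &replicas,
            Selector: &metav1.LabelSelector{MatchLabels: labels},
            // Recreate: delete all old pods before new ones are created
            Strategy: appsv1.DeploymentStrategy{Type: appsv1.RecreateDeploymentStrategyType},
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: labels},
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{Name: "nginx", Image: "nginx:1.19"}},
                },
            },
        },
    }

    _, _ = client.AppsV1().Deployments("gvs").Create(context.TODO(), d, metav1.CreateOptions{})
}
```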
Moving on to the replicaSet-related logic:
```go
// kubernetes/pkg/controller/deployment/recreate.go
func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
    // Create the new replicaSet if it does not exist yet
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    // scale up new replica set.
    if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
        return err
    }
}

// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate".
func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
    scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
    return scaled, err
}

func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment) (bool, *apps.ReplicaSet, error) {
    // Decide whether pods need to be added or removed
    if *(rs.Spec.Replicas) < newScale {
        scalingOperation = "up"
    } else {
        scalingOperation = "down"
    }
    // Scale the replicaSet and record a scaling event
    scaled, newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation)
}

func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {
    // Update the replicaSet through client-go; the replicaSet controller picks up the change via its informer
    rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
}
```
At this point the Deployment controller's part in creation is done, and the ReplicaSet logic takes over.
ReplicaSets
The replicaSet controller lives in kubernetes/pkg/controller/replicaset/replica_set.go and goes through a similar series of initialization steps:
```go
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
    rsc := &ReplicaSetController{
        GroupVersionKind: gvk,
        kubeClient:       kubeClient, // the client-go client
        podControl:       podControl,
        burstReplicas:    burstReplicas,
        expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
        // the rate-limited work queue
        queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
    }
    // Entry point of the event-driven flow: replicaSet events are received and, just like for deployments, their keys end up in the queue
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.addRS,
        UpdateFunc: rsc.updateRS,
        DeleteFunc: rsc.deleteRS,
    })
    // The handler that actually processes a replicaSet
    rsc.syncHandler = rsc.syncReplicaSet
}
```
Since the overall structure mirrors the deployment controller, here is a quick pass over the code:
```go
// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    go wait.Until(rsc.worker, time.Second, stopCh)
}

func (rsc *ReplicaSetController) worker() {
    for rsc.processNextWorkItem() {
    }
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
    err := rsc.syncHandler(key.(string))
}
```
The core processing starts in syncReplicaSet:
```go
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    // As before, fetch the resource by its key
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    // Reconcile the pod count to match the replicaSet's spec; the interesting part starts here
    manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
```
manageReplicas checks the filtered pods against the spec and decides whether to create or delete pods to reach the desired replica count:
```go
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    // Difference between the actual and the desired pod count
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    if diff < 0 { // pods need to be added
        // slowStartBatch creates pods in batches and stops on the first error
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
        })
    } else if diff > 0 { // pods need to be removed
        err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs) // delete the pod
    }
}
```
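slowStartBatch is worth a closer look: it creates pods in batches whose size doubles after every fully successful batch, so a replicaSet that is bound to fail (for example because of exhausted quota) wastes only a few API calls. Below is a simplified re-implementation of the idea, for illustration only, not the original code:

```go
package main

import (
    "fmt"
    "sync"
)

// slowStart calls fn count times in parallel batches, doubling the batch size
// after each fully successful batch and aborting on the first failing batch.
func slowStart(count, initialBatchSize int, fn func() error) (int, error) {
    remaining := count
    successes := 0
    for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(2*batchSize, remaining) {
        var wg sync.WaitGroup
        errCh := make(chan error, batchSize)
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        successes += batchSize - len(errCh)
        if len(errCh) > 0 {
            return successes, <-errCh // stop after the first failing batch
        }
        remaining -= batchSize
    }
    return successes, nil
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}

func main() {
    n, err := slowStart(10, 1, func() error { return nil })
    fmt.Println(n, err)
}
```

With count=10 and an initial batch size of 1, the batches are 1, 2, 4 and 3; the first failing batch aborts the remaining work.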
CreatePodsWithControllerRef is a method of the PodControlInterface; the default implementation lives in kubernetes/pkg/controller/controller_utils.go:
```go
func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
    return r.createPods("", namespace, template, controllerObject, controllerRef)
}

func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
    // The core step is again a client-go call: the Pod object is created through the API server,
    // and the scheduler and kubelet then observe it via their own informers
    newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
}
```
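Roughly, createPods builds the Pod from the template, attaches an OwnerReference pointing back at the ReplicaSet, and sends it to the API server. Here is a hedged, excerpt-style sketch of that idea (createOwnedPod is a hypothetical helper; the real code also copies annotations and finalizers from the template):

```go
package example

import (
    "context"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

func createOwnedPod(client kubernetes.Interface, rs *appsv1.ReplicaSet) error {
    controllerRef := metav1.NewControllerRef(rs, appsv1.SchemeGroupVersion.WithKind("ReplicaSet"))

    pod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            GenerateName: rs.Name + "-",
            Namespace:    rs.Namespace,
            Labels:       rs.Spec.Template.Labels,
            // The owner reference is what shows up under metadata.ownerReferences
            // and what garbage collection relies on
            OwnerReferences: []metav1.OwnerReference{*controllerRef},
        },
        Spec: rs.Spec.Template.Spec,
    }

    _, err := client.CoreV1().Pods(rs.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
    return err
}
```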
At this point the Pod creation flow is essentially complete. One question remains unanswered: nowhere in the code above are the Pod's tolerations set.
It turns out that the default tolerations are assigned inside kube-apiserver.
The following is a brief walk-through of the toleration-related flow in the API server:
```go
// kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/installer.go
// Register the resource handlers
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
    for _, action := range actions {
        switch action.Verb {
        case "PUT": // Update a resource.
            handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulUpdateResource(updater, reqScope, admit))
        case "POST": // Create a resource.
            handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
        }
    }
}

func restfulUpdateResource(r rest.Updater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
    return func(req *restful.Request, res *restful.Response) {
        handlers.UpdateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
    }
}

func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interface) http.HandlerFunc {
    if mutatingAdmission, ok := admit.(admission.MutationInterface); ok {
        if mutatingAdmission.Handles(admission.Update) { // call Admit on the mutating admission plugins
            return newObj, mutatingAdmission.Admit(ctx, admission.NewAttributesRecord(newObj, oldObj, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, options, dryrun.IsDryRun(options.DryRun), userInfo), scope)
        }
    }
}

// kubernetes/plugin/pkg/admission/defaulttolerationseconds/admission.go
TaintNodeNotReady    = "node.kubernetes.io/not-ready"
TaintNodeUnreachable = "node.kubernetes.io/unreachable"

func (p *Plugin) Admit(ctx context.Context, attributes admission.Attributes, o admission.ObjectInterfaces) (err error) {
    for _, toleration := range tolerations { // check whether matching tolerations are already set
        if (toleration.Key == v1.TaintNodeNotReady || len(toleration.Key) == 0) &&
            (toleration.Effect == api.TaintEffectNoExecute || len(toleration.Effect) == 0) {
            toleratesNodeNotReady = true
        }
        if (toleration.Key == v1.TaintNodeUnreachable || len(toleration.Key) == 0) &&
            (toleration.Effect == api.TaintEffectNoExecute || len(toleration.Effect) == 0) {
            toleratesNodeUnreachable = true
        }
    }
    // If they are not configured, add the default tolerations
    if !toleratesNodeNotReady {
        pod.Spec.Tolerations = append(pod.Spec.Tolerations, notReadyToleration)
    }
    if !toleratesNodeUnreachable {
        pod.Spec.Tolerations = append(pod.Spec.Tolerations, unreachableToleration)
    }
}
```
```go
var (
    defaultNotReadyTolerationSeconds = flag.Int64("default-not-ready-toleration-seconds", 300,
        "Indicates the tolerationSeconds of the toleration for notReady:NoExecute"+
            " that is added by default to every pod that does not already have such a toleration.")

    defaultUnreachableTolerationSeconds = flag.Int64("default-unreachable-toleration-seconds", 300,
        "Indicates the tolerationSeconds of the toleration for unreachable:NoExecute"+
            " that is added by default to every pod that does not already have such a toleration.")

    notReadyToleration = api.Toleration{
        Key:               v1.TaintNodeNotReady,
        Operator:          api.TolerationOpExists,
        Effect:            api.TaintEffectNoExecute,
        TolerationSeconds: defaultNotReadyTolerationSeconds,
    }

    unreachableToleration = api.Toleration{
        Key:      v1.TaintNodeUnreachable,
        Operator: api.TolerationOpExists,
        Effect:   api.TaintEffectNoExecute,
        // The 300-second default is the reason an evicted pod is only recreated after five minutes
        TolerationSeconds: defaultUnreachableTolerationSeconds,
    }
)
```
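Since the DefaultTolerationSeconds admission plugin only adds these defaults when no matching NoExecute toleration exists, a workload can shorten the five-minute window by declaring its own tolerations in the pod template. A hedged sketch of such a fragment (the 30-second value and helper name are illustrative); with tolerationSeconds set to 30, pods on a node that becomes not-ready or unreachable would be evicted after roughly 30 seconds instead of the default 300:

```go
package example

import (
    corev1 "k8s.io/api/core/v1"
)

// shortEvictionTolerations returns tolerations to place in a PodSpec's
// Tolerations field so the admission plugin leaves them untouched.
func shortEvictionTolerations() []corev1.Toleration {
    seconds := int64(30)
    return []corev1.Toleration{
        {
            Key:               corev1.TaintNodeNotReady,
            Operator:          corev1.TolerationOpExists,
            Effect:            corev1.TaintEffectNoExecute,
            TolerationSeconds: &seconds,
        },
        {
            Key:               corev1.TaintNodeUnreachable,
            Operator:          corev1.TolerationOpExists,
            Effect:            corev1.TaintEffectNoExecute,
            TolerationSeconds: &seconds,
        },
    }
}
```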