The role cloudcore plays in KubeEdge as a whole: Kubeedge-log-arch (architecture diagram)

This post analyzes only the two main flows:

  1. How data from kube-apiserver is synced down to the edge nodes
  2. How status data reported by edge nodes over websocket/quic is synced back to Kubernetes

cloudcore startup flow:

func NewCloudCoreCommand() *cobra.Command
  => client.InitKubeEdgeClient(config.KubeAPIConfig) // initialize the connections to k8s: kubeClient, crdClient, dynamicClient
  => registerModules(config) // register all modules that need to be started
    => func Register(m Module)  // the final call; stores each module in the modules map
  => core.Run()  // core.go
  	=> StartModules()
  		=> range modules // core.go
  		=> beehiveContext.AddModule(name)
  			=> func AddModule(module string)
  				=> func (ctx *ChannelContext) AddModule(module string) // staging/src/github.com/kubeedge/beehive/pkg/core/context/context_channel.go
  					=> channel := ctx.newChannel()
  						=> channel := make(chan model.Message, ChannelSizeDefault) // ChannelSizeDefault = 1024
  					=> ctx.addChannel(module, channel)
  						=> ctx.channels[module] = moduleCh  // each module gets its own channel with a default size of 1024; for edgecontroller, module="edgecontroller"
  		=> beehiveContext.AddModuleGroup(name, module.Group())
  			=> context.moduleContext.AddModuleGroup(module, group)
  				=> func (ctx *ChannelContext) AddModuleGroup(module, group string)
  					=> channel := ctx.getChannel(module)
  					=> ctx.addTypeChannel(module, group, channel)
  						=> ctx.typeChannels[group] = make(map[string]chan model.Message) // typeChannels map[string]map[string]chan model.Message
  						=> ctx.typeChannels[group][module] = moduleCh // for edgecontroller, group="edgecontroller" and module="edgecontroller"
  		=> go module.Start() // core.go
  		=> func (a *cloudHub) Start()  // cloudhub.go
  			=> go messageq.DispatchMessage() // receive messages from cloudhub's channel and dispatch them down to the nodes
  				=> func (q *ChannelMessageQueue) DispatchMessage()
  					=> msg, err := beehiveContext.Receive(model.SrcCloudHub)  // all inter-module communication is received via beehiveContext.Receive; model.SrcCloudHub="cloudhub"
  					=> q.addListMessageToQueue(nodeID, &msg)
  						=> nodeListStore.Add(msg)
  						=> nodeListQueue.Add(messageKey)
  					=> q.addMessageToQueue(nodeID, &msg)
  						=> nodeQueue.Add(messageKey)
  						=> nodeStore.Add(msg)

  			=> servers.StartCloudHub(messageq)
  				=> var CloudhubHandler *MessageHandle // declare the singleton CloudhubHandler
  				=> handler.InitHandler(messageq)
  					=> CloudhubHandler.initServerEntries()
  						=> mux.Entry(mux.NewPattern("*").Op("*"), mh.HandleServer) // every incoming message is routed through this handler
	  						=> func (mh *MessageHandle) HandleServer(container *mux.MessageContainer, writer mux.ResponseWriter)
								=> if container.Message.GetOperation() == "upload" && container.Message.GetGroup() == modules.UserGroup
								=> beehiveContext.Send(modules.RouterModuleName, *container.Message)  // send to the router module; it is disabled by default and must be explicitly enabled in the config file
  				=> go startWebsocketServer() // start the websocket server on port 10000
  					=> svc := server.Server{ConnNotify: handler.CloudhubHandler.OnRegister,}
  			=> go udsserver.StartServer(hubconfig.Config.UnixSocket.Address) // the uds server is only used to communicate with the csi driver from kubeedge on the cloud side
  			=> func StartServer(address string)
  				=> uds.SetContextHandler
  					=> beehiveContext.SendSync(hubmodel.SrcCloudHub, *msg, constants.CSISyncMsgRespTimeout)
  				=> uds.StartServer()
  					=> listener, err := net.Listen(proto, addr)
  					=> c, err := listener.Accept()
  					=> go us.handleServerConn(c)
  						=> us.handleServerContext(string(buf[0:nr]))
  							=> return us.handler(context)  // uds.SetContextHandler
  		=> func (ec *EdgeController) Start() // edgecontroller.go
  		=> func (dc *DeviceController) Start() // devicecontroller.go
  		=> func (sctl *SyncController) Start()  // synccontroller.go
  		=> func (s *cloudStream) Start() // cloudstream.go
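
The core data structure behind this startup trace is beehive's per-module channel registry: AddModule creates a buffered channel for each module, and AddModuleGroup indexes that same channel under a group so that a message can be fanned out to every module in the group. Below is a minimal sketch of that idea, not the actual beehive implementation; the Message type, the lock, and the function bodies are simplified stand-ins, and only the map layout and the 1024 buffer size follow the trace above.

package main

import (
	"fmt"
	"sync"
)

// Message stands in for beehive's model.Message.
type Message struct {
	Source  string
	Content string
}

const channelSizeDefault = 1024 // matches ChannelSizeDefault above

// channelContext is a simplified stand-in for beehive's ChannelContext.
type channelContext struct {
	mu           sync.RWMutex
	channels     map[string]chan Message            // module name -> its channel
	typeChannels map[string]map[string]chan Message // group -> module name -> channel
}

func newChannelContext() *channelContext {
	return &channelContext{
		channels:     map[string]chan Message{},
		typeChannels: map[string]map[string]chan Message{},
	}
}

// AddModule creates the per-module buffered channel.
func (ctx *channelContext) AddModule(module string) {
	ctx.mu.Lock()
	defer ctx.mu.Unlock()
	ctx.channels[module] = make(chan Message, channelSizeDefault)
}

// AddModuleGroup indexes an existing module channel under a group name.
func (ctx *channelContext) AddModuleGroup(module, group string) {
	ctx.mu.Lock()
	defer ctx.mu.Unlock()
	if _, ok := ctx.typeChannels[group]; !ok {
		ctx.typeChannels[group] = map[string]chan Message{}
	}
	ctx.typeChannels[group][module] = ctx.channels[module]
}

// SendToGroup delivers a message to every module registered under the group.
func (ctx *channelContext) SendToGroup(group string, msg Message) {
	ctx.mu.RLock()
	defer ctx.mu.RUnlock()
	for _, ch := range ctx.typeChannels[group] {
		ch <- msg
	}
}

func main() {
	ctx := newChannelContext()
	ctx.AddModule("edgecontroller")
	ctx.AddModuleGroup("edgecontroller", "edgecontroller")
	ctx.SendToGroup("edgecontroller", Message{Source: "cloudhub", Content: "hello"})
	fmt.Println(<-ctx.channels["edgecontroller"])
}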

1. Downstream flow (cloud to edge)

CloudHub is the message hub of cloudcore: it pushes messages down to edgecore and reads the messages edgecore sends up.

When we create a deployment with kubectl, the resource goes through kube-apiserver and is first persisted in etcd. The SyncController module of cloudcore, once started, periodically lists all relevant resources from kube-apiserver:

go wait.Until(sctl.reconcile, 5*time.Second, beehiveContext.Done())

allObjectSyncs, err := sctl.objectSyncLister.List(labels.Everything())
if err != nil {
	klog.Errorf("Failed to list all the ObjectSyncs: %v", err)
}

sctl.manageObjectSync(allObjectSyncs)
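
The wait.Until used above is the standard helper from k8s.io/apimachinery: it invokes the given function once per period until the stop channel is closed, which is how SyncController keeps re-listing every 5 seconds. A tiny standalone example of the primitive (the 1-second period and the explicit stop goroutine are only for the demo; SyncController passes beehiveContext.Done() instead):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stop := make(chan struct{})
	// Close the stop channel after a while to end the loop.
	go func() {
		time.Sleep(3 * time.Second)
		close(stop)
	}()

	// The reconcile function runs once per second until stop is closed.
	wait.Until(func() {
		fmt.Println("reconcile tick:", time.Now().Format(time.RFC3339))
	}, time.Second, stop)
}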

The message passes through several layers of function calls and finally reaches:

sendEvents(err, nodeName, sync, sync.Spec.ObjectKind, object.GetResourceVersion(), object)

sendEvents checks whether the resource has been deleted and whether its resourceVersion is newer than the recorded one, and finally sends the message to the cloudhub module.
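
As a rough mental model of that check, the sketch below compares resourceVersions numerically and decides whether anything needs to be sent. This is a hypothetical simplification for illustration only, not the actual sendEvents code; resourceVersionNewer and the hard-coded versions are made up, and Kubernetes officially treats resourceVersion as an opaque string.

package main

import (
	"fmt"
	"strconv"
)

// resourceVersionNewer reports whether current is strictly newer than recorded.
// Integer parsing is a simplification: on a single resource the versions are in
// practice monotonically increasing integers.
func resourceVersionNewer(current, recorded string) bool {
	cur, errCur := strconv.ParseUint(current, 10, 64)
	rec, errRec := strconv.ParseUint(recorded, 10, 64)
	if errCur != nil || errRec != nil {
		return current != recorded // fall back to "changed at all"
	}
	return cur > rec
}

func main() {
	// recordedVersion would come from the ObjectSync CR, currentVersion from the listed object.
	recordedVersion, currentVersion := "1024", "1031"
	deleted := false

	switch {
	case deleted:
		fmt.Println("object gone: send a delete event to cloudhub")
	case resourceVersionNewer(currentVersion, recordedVersion):
		fmt.Println("newer version: send an update event to cloudhub")
	default:
		fmt.Println("already up to date: nothing to send")
	}
}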

When cloudhub receives the message, it parses it and puts it into nodeQueue, a Kubernetes RateLimitingInterface rate-limited work queue. Each node has its own queue: nodeQueue := q.GetNodeQueue(nodeID)
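
The per-node queue is the standard rate-limited work queue from k8s.io/client-go. The sketch below shows the lifecycle that the write loop later relies on: Add enqueues a message key, Get hands it to the sender, AddRateLimited re-enqueues it with backoff on failure, and Forget/Done clear it on success. The queue name, the key, and the sendToNode helper are made up for illustration.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// One rate-limited queue per edge node, as cloudhub does with GetNodeQueue(nodeID).
	nodeQueue := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), "edge-node-1")

	nodeQueue.Add("message-key-1") // DispatchMessage side: enqueue a message key

	key, shutdown := nodeQueue.Get() // write-loop side: take the next key
	if shutdown {
		return
	}

	if err := sendToNode(key.(string)); err != nil {
		nodeQueue.AddRateLimited(key) // retry later with exponential backoff
	} else {
		nodeQueue.Forget(key) // clear the retry counter on success
	}
	nodeQueue.Done(key) // always mark processing as finished
	fmt.Println("processed", key)
}

// sendToNode is a placeholder for sending over the websocket/quic connection.
func sendToNode(key string) error { return nil }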

During initialization, cloudhub sets up its connection handlers:

		CloudhubHandler.Handlers = []HandleFunc{
			CloudhubHandler.KeepaliveCheckLoop,
			CloudhubHandler.MessageWriteLoop,
			CloudhubHandler.ListMessageWriteLoop,
		}

        CloudhubHandler.initServerEntries() // register the connection handler; when a node connects, the handlers registered here are invoked

When an edgecore instance establishes a new connection to cloudhub, all of these handlers are started for that connection:

// ServeConn starts serving the incoming connection
func (mh *MessageHandle) ServeConn(info *model.HubInfo) {
	err := mh.RegisterNode(info) // register the node and send a connect message to cloudhub
	if err != nil {
		klog.Errorf("fail to register node %s, reason %s", info.NodeID, err.Error())
		return
	}

	klog.Infof("edge node %s for project %s connected", info.NodeID, info.ProjectID)
	exitServe := make(chan ExitCode, len(mh.Handlers))

	for _, handle := range mh.Handlers { // start all handlers
		go handle(info, exitServe)
	}

	code := <-exitServe
	mh.UnregisterNode(info, code)
}

Among the started handlers, func (mh *MessageHandle) MessageWriteLoop(info *model.HubInfo, stopServe chan ExitCode) loops over the node's own nodeQueue and nodeStore; whenever there is a message, it looks up the connection in the nodeConns sync.Map and writes the data to the corresponding connection.

			conn, ok := mh.nodeConns.Load(info.NodeID)
			if !ok {
				if retry == 1 {
					break
				}
				retry++
				time.Sleep(time.Second * retryInterval)
				continue
			}
			err := mh.sendMsg(conn.(hubio.CloudHubIO), info, copyMsg, msg, nodeStore)
			if err != nil {
				klog.Errorf("Failed to send event to node: %s, affected event: %s, err: %s",
					info.NodeID, dumpMessageMetadata(copyMsg), err.Error())
				nodeQueue.AddRateLimited(key.(string))
				time.Sleep(time.Second * 2) // if sending fails, sleep two seconds before retrying
			}
			nodeQueue.Forget(key.(string)) // on success, drop the message from the local queue
			nodeQueue.Done(key)

2. Upstream flow (edge to cloud)

The initServerEntries mentioned above registers the handler for new connections:

// initServerEntries register handler func
func (mh *MessageHandle) initServerEntries() {
	mux.Entry(mux.NewPattern("*").Op("*"), mh.HandleServer)
}

Aside begins

The implementation of mux.Entry

(You can skip this aside; it does not affect the upstream flow.) A global MessageMux is defined:

MuxDefault = NewMessageMux()

Entry registers a handler for messages:

func Entry(pattern *MessagePattern, handle func(*MessageContainer, ResponseWriter)) *MessageMux {
	MuxDefault.Entry(pattern, handle)
	return MuxDefault
}

MessageMux implements the Handler interface:

type Handler interface {
	ServeConn(req *MessageRequest, writer ResponseWriter)
}

func (mux *MessageMux) ServeConn(req *MessageRequest, writer ResponseWriter) {
	err := mux.processFilter(req)
	if err != nil {
		return
	}
	mux.dispatch(req, writer)
}

Later, the websocket connection's handleMessage calls this method:

func (conn *WSConnection) handleMessage() {
...
    	conn.handler.ServeConn(&mux.MessageRequest{
			Header:  conn.state.Headers,
			Message: msg,
		}, &responseWriter{
			Type: api.ProtocolTypeWS,
			Van:  conn.wsConn,
		})
}

dispatch then invokes the registered handler:

func (mux *MessageMux) dispatch(req *MessageRequest, writer ResponseWriter) error {
	for _, entry := range mux.muxEntry {
		// select entry
		matched := entry.pattern.Match(req.Message)
		if !matched {
			continue
		}

		// extract parameters
		parameters := mux.extractParameters(entry.pattern.resExpr, req.Message.GetResource())
		// wrap message
		container := mux.wrapMessage(req.Header, req.Message, parameters)
		// call user handle of entry
		entry.handleFunc(container, writer)  // invoke the registered handler
		return nil
	}
	return fmt.Errorf("failed to found entry for message")
}

Aside ends

The handler's real logic:

// HandleServer handle all the request from node
func (mh *MessageHandle) HandleServer(container *mux.MessageContainer, writer mux.ResponseWriter) {
	... non-essential logic omitted; we only care where the message ultimately goes
		err := mh.PubToController(&model.HubInfo{ProjectID: projectID, NodeID: nodeID}, container.Message)
		if err != nil {
			// if err, we should stop node, write data to edgehub, stop nodify
			klog.Errorf("Failed to serve handle with error: %s", err.Error())
		}
	}
}

Through the logic above, the message is published to the controller:

func (mh *MessageHandle) PubToController(info *model.HubInfo, msg *beehiveModel.Message) error {
	msg.SetResourceOperation(fmt.Sprintf("node/%s/%s", info.NodeID, msg.GetResource()), msg.GetOperation())
	if model.IsFromEdge(msg) {
		err := mh.MessageQueue.Publish(msg)
		if err != nil {
...
		}
	}
	return nil
}

From there it is sent to the edgecontroller module via beehive:

// Publish sends message via the channel to Controllers
func (q *ChannelMessageQueue) Publish(msg *beehiveModel.Message) error {
...
		beehiveContext.SendToGroup(model.SrcEdgeController, *msg)
	}
	return nil
}

This part is implemented mainly by the UpstreamController: beehive delivers the message to edgecontroller, where func (uc *UpstreamController) dispatchMessage() handles it.

dispatchMessage parses the message and, based on its resource type, pushes it into a dedicated channel; each channel is drained by a group of goroutines that handle that resource.

func (uc *UpstreamController) dispatchMessage() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Info("stop dispatchMessage")
			return
		default:
		}
		msg, err := uc.messageLayer.Receive()
		...

		resourceType, err := messagelayer.GetResourceType(msg)
	    ...

		switch resourceType {
		case model.ResourceTypeNodeStatus:
			uc.nodeStatusChan <- msg
	...
}

When UpstreamController starts, it launches a group of goroutines per resource type, for example for node status:

for i := 0; i < int(uc.config.Load.UpdateNodeStatusWorkers); i++ {
	go uc.updateNodeStatus()
}

The number of updateNodeStatusWorkers is configurable in the config file. All the worker-count options are:

      deletePodWorkers: 4
      queryConfigMapWorkers: 4
      queryEndpointsWorkers: 4
      queryNodeWorkers: 4
      queryPersistentVolumeClaimWorkers: 4
      queryPersistentVolumeWorkers: 4
      querySecretWorkers: 4
      queryServiceWorkers: 4
      queryVolumeAttachmentWorkers: 4
      updateNodeStatusWorkers: 1
      updateNodeWorkers: 4
      updatePodStatusWorkers: 1

Taking updateNodeStatus as an example for a brief analysis:

func (uc *UpstreamController) updateNodeStatus() {
	select {
	case <-beehiveContext.Done():
		klog.Warning("stop updateNodeStatus")
		return
	case msg := <-uc.nodeStatusChan: // receive a message and process it
		...
		switch msg.GetOperation() { // inspect the operation carried by the message
		case model.InsertOperation:
			...
		case model.UpdateOperation:
			getNode, err := uc.kubeClient.CoreV1().Nodes().Get(context.Background(), name, metaV1.GetOptions{}) // call kubeClient to sync the status into k8s
			...

}
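
Put together, the upstream path inside edgecontroller is an ordinary fan-out worker pool: a dispatcher goroutine routes each message into a per-resource channel, and a configurable number of workers drain that channel and call the Kubernetes API. Below is a minimal self-contained sketch of that shape, assuming a made-up statusMsg type and replacing the kubeClient call with a print; it illustrates the pattern rather than cloudcore's actual code.

package main

import (
	"fmt"
	"sync"
)

// statusMsg stands in for a beehive message carrying a node status update.
type statusMsg struct {
	Node    string
	Payload string
}

func main() {
	nodeStatusChan := make(chan statusMsg, 1024)
	const updateNodeStatusWorkers = 2 // mirrors updateNodeStatusWorkers in the config above

	var wg sync.WaitGroup
	for i := 0; i < updateNodeStatusWorkers; i++ {
		wg.Add(1)
		go func(id int) { // one worker per configured slot, like uc.updateNodeStatus()
			defer wg.Done()
			for msg := range nodeStatusChan {
				// Here cloudcore would Get the Node via kubeClient and update its status.
				fmt.Printf("worker %d syncing status of node %s: %s\n", id, msg.Node, msg.Payload)
			}
		}(i)
	}

	// Dispatcher side: route received messages by resource type into the right channel.
	for _, m := range []statusMsg{{"edge-node-1", "Ready"}, {"edge-node-2", "Ready"}} {
		nodeStatusChan <- m
	}
	close(nodeStatusChan)
	wg.Wait()
}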

At this point, the downstream and upstream message flows should be roughly clear.