The role cloudcore plays in KubeEdge:
This post analyzes only the two main flows:
- how data from kube-apiserver is synced down to the edge nodes
- how status data reported by edge nodes over websocket/quic is synced back to k8s
cloudcore startup flow:
```
func NewCloudCoreCommand() *cobra.Command
=> client.InitKubeEdgeClient(config.KubeAPIConfig) // initialize the connections to k8s: kubeClient, crdClient, dynamicClient
=> registerModules(config) // register every module that needs to start
=> func Register(m Module) // eventually called; stores the module in the modules map
=> core.Run() // core.go
=> StartModules()
=> range modules // core.go
=> beehiveContext.AddModule(name)
=> func AddModule(module string)
=> func (ctx *ChannelContext) AddModule(module string) // staging/src/github.com/kubeedge/beehive/pkg/core/context/context_channel.go
=> channel := ctx.newChannel()
=> channel := make(chan model.Message, ChannelSizeDefault) // ChannelSizeDefault = 1024
=> ctx.addChannel(module, channel)
=> ctx.channels[module] = moduleCh // one channel per module, default size 1024; taking edgecontroller as an example, module="edgecontroller"
=> beehiveContext.AddModuleGroup(name, module.Group())
=> context.moduleContext.AddModuleGroup(module, group)
=> func (ctx *ChannelContext) AddModuleGroup(module, group string)
=> channel := ctx.getChannel(module)
=> ctx.addTypeChannel(module, group, channel)
=> ctx.typeChannels[group] = make(map[string]chan model.Message) // typeChannels map[string]map[string]chan model.Message
=> ctx.typeChannels[group][module] = moduleCh // taking edgecontroller as an example, group="edgecontroller" module="edgecontroller"
=> go module.Start() // core.go
=> func (a *cloudHub) Start() // cloudhub.go
=> go messageq.DispatchMessage() // receive messages from cloudhub's channel and dispatch them down to the nodes
=> func (q *ChannelMessageQueue) DispatchMessage()
=> msg, err := beehiveContext.Receive(model.SrcCloudHub) // inter-module communication is always received via beehiveContext.Receive; model.SrcCloudHub="cloudhub"
=> q.addListMessageToQueue(nodeID, &msg)
=> nodeListStore.Add(msg)
=> nodeListQueue.Add(messageKey)
=> q.addMessageToQueue(nodeID, &msg)
=> nodeQueue.Add(messageKey)
=> nodeStore.Add(msg)
=> servers.StartCloudHub(messageq)
=> var CloudhubHandler *MessageHandle // declare the CloudhubHandler singleton
=> handler.InitHandler(messageq)
=> CloudhubHandler.initServerEntries()
=> mux.Entry(mux.NewPattern("*").Op("*"), mh.HandleServer) // all incoming requests are routed to this method
=> func (mh *MessageHandle) HandleServer(container *mux.MessageContainer, writer mux.ResponseWriter)
=> if container.Message.GetOperation() == "upload" && container.Message.GetGroup() == modules.UserGroup
=> beehiveContext.Send(modules.RouterModuleName, *container.Message) // sent to the router module, which is disabled by default and must be enabled explicitly in the config file
=> go startWebsocketServer() // start the websocket server on port 10000
=> svc := server.Server{ConnNotify: handler.CloudhubHandler.OnRegister,}
=> go udsserver.StartServer(hubconfig.Config.UnixSocket.Address) // the uds server is only used to communicate with the csi driver from kubeedge on cloud
=> func StartServer(address string)
=> uds.SetContextHandler
=> beehiveContext.SendSync(hubmodel.SrcCloudHub, *msg, constants.CSISyncMsgRespTimeout)
=> uds.StartServer()
=> listener, err := net.Listen(proto, addr)
=> c, err := listener.Accept()
=> go us.handleServerConn(c)
=> us.handleServerContext(string(buf[0:nr]))
=> return us.handler(context) // uds.SetContextHandler
=> func (ec *EdgeController) Start() // edgecontroller.go
=> func (dc *DeviceController) Start() // devicecontroller.go
=> func (sctl *SyncController) Start() // synccontroller.go
=> func (s *cloudStream) Start() // cloudstream.go
```
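To make the beehive channel mechanism above concrete, here is a minimal, self-contained sketch of the idea (not the actual beehive code): AddModule creates a buffered channel per module, Send pushes a message into the target module's channel, and Receive pulls it out on the other side.

```go
package main

import "fmt"

// Message is a stand-in for beehive's model.Message.
type Message struct {
    Source, Resource, Operation string
}

// channels maps module name -> that module's message channel,
// mirroring ChannelContext.channels in context_channel.go.
var channels = map[string]chan Message{}

// AddModule creates the module's channel (beehive defaults the size to 1024).
func AddModule(name string) {
    channels[name] = make(chan Message, 1024)
}

// Send writes a message into the target module's channel.
func Send(module string, msg Message) {
    channels[module] <- msg
}

// Receive blocks until a message arrives for the given module.
func Receive(module string) Message {
    return <-channels[module]
}

func main() {
    AddModule("cloudhub")
    Send("cloudhub", Message{Source: "edgecontroller", Resource: "node/edge-1/pod/nginx", Operation: "update"})
    fmt.Println(Receive("cloudhub")) // the cloudhub "module" consumes the message
}
```

The real ChannelContext adds locking, per-group fan-out via typeChannels, and timeouts on Send/Receive, but the data flow is the same.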
1. Downstream flow (cloud → edge)
CloudHub is the message hub of the whole cloudcore.
It is responsible for sending messages down to edgecore and reading the messages edgecore sends up.
When we apply a Deployment through kubectl, the resource first goes through kube-apiserver and is persisted in etcd. On startup, cloudcore's SyncController module periodically lists all the relevant resources from kube-apiserver:
```go
go wait.Until(sctl.reconcile, 5*time.Second, beehiveContext.Done())

allObjectSyncs, err := sctl.objectSyncLister.List(labels.Everything())
if err != nil {
    klog.Errorf("Failed to list all the ObjectSyncs: %v", err)
}
sctl.manageObjectSync(allObjectSyncs)
```
The message then passes through a few layers of function calls and finally reaches:
sendEvents(err, nodeName, sync, sync.Spec.ObjectKind, object.GetResourceVersion(), object)
sendEvents checks whether the resource has been deleted and whether its resourceVersion is newer than what the edge has, and finally sends the message to the cloudhub module.
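A simplified sketch of that decision (illustrative only: sendEventsSketch and sendToCloudHub are made-up names, the Message stand-in comes from the earlier beehive sketch, and the real code compares the resourceVersions recorded in the ObjectSync status inside cloud/pkg/synccontroller):

```go
// sendEventsSketch shows the branching idea of sendEvents; sendToCloudHub
// plays the role of beehiveContext.Send("cloudhub", ...).
func sendEventsSketch(objectDeleted bool, objectRV, syncedRV uint64, msg Message, sendToCloudHub func(Message)) {
    switch {
    case objectDeleted:
        // the object is gone from kube-apiserver: push a delete event down to the edge
        msg.Operation = "delete"
        sendToCloudHub(msg)
    case objectRV > syncedRV:
        // the edge holds an older resourceVersion: push the newer object down
        msg.Operation = "update"
        sendToCloudHub(msg)
    default:
        // the edge is already up to date: nothing to send
    }
}
```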
When cloudhub receives the message, it parses it and puts it into nodeQueue, a Kubernetes RateLimitingInterface rate-limiting queue. Each node has its own rate-limiting queue:
nodeQueue := q.GetNodeQueue(nodeID)
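The per-node queue idea, sketched with client-go's workqueue package (ChannelMessageQueueSketch and this GetNodeQueue are simplified illustrations, not the exact kubeedge implementation):

```go
package hubsketch

import (
    "sync"

    "k8s.io/client-go/util/workqueue"
)

// ChannelMessageQueueSketch keeps one rate-limited queue per edge node.
type ChannelMessageQueueSketch struct {
    nodeQueues sync.Map // nodeID -> workqueue.RateLimitingInterface
}

// GetNodeQueue returns the queue for nodeID, creating it on first use.
func (q *ChannelMessageQueueSketch) GetNodeQueue(nodeID string) workqueue.RateLimitingInterface {
    // LoadOrStore guarantees each node ends up with exactly one queue,
    // even when several goroutines race on the same nodeID
    v, _ := q.nodeQueues.LoadOrStore(nodeID,
        workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()))
    return v.(workqueue.RateLimitingInterface)
}
```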
When cloudhub initializes, it registers the connection handlers:
```go
CloudhubHandler.Handlers = []HandleFunc{
    CloudhubHandler.KeepaliveCheckLoop,
    CloudhubHandler.MessageWriteLoop,
    CloudhubHandler.ListMessageWriteLoop,
}
CloudhubHandler.initServerEntries() // register the connection handlers; when a node connects, the handlers initialized here are invoked
```
When a new edgecore connection reaches cloudhub, all of these handlers are started for that connection:
```go
// ServeConn starts serving the incoming connection
func (mh *MessageHandle) ServeConn(info *model.HubInfo) {
    err := mh.RegisterNode(info) // register the node and send a connect message to cloudhub
    if err != nil {
        klog.Errorf("fail to register node %s, reason %s", info.NodeID, err.Error())
        return
    }
    klog.Infof("edge node %s for project %s connected", info.NodeID, info.ProjectID)
    exitServe := make(chan ExitCode, len(mh.Handlers))
    for _, handle := range mh.Handlers { // start all the handlers
        go handle(info, exitServe)
    }
    code := <-exitServe
    mh.UnregisterNode(info, code)
}
```
Among the handlers that get started, func (mh *MessageHandle) MessageWriteLoop(info *model.HubInfo, stopServe chan ExitCode)
iterates over the node's own nodeQueue and nodeStore; when a message is available, it looks up the connection in the nodeConns sync.Map and sends the data over that connection.
```go
conn, ok := mh.nodeConns.Load(info.NodeID)
if !ok {
    if retry == 1 {
        break
    }
    retry++
    time.Sleep(time.Second * retryInterval)
    continue
}
err := mh.sendMsg(conn.(hubio.CloudHubIO), info, copyMsg, msg, nodeStore)
if err != nil {
    klog.Errorf("Failed to send event to node: %s, affected event: %s, err: %s",
        info.NodeID, dumpMessageMetadata(copyMsg), err.Error())
    nodeQueue.AddRateLimited(key.(string))
    time.Sleep(time.Second * 2) // if sending fails, sleep two seconds and retry
}
nodeQueue.Forget(key.(string)) // on success, remove the message from the local queue
nodeQueue.Done(key)
```
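For context, the snippet above sits inside the standard workqueue consume loop. A generic sketch of that pattern (not the exact kubeedge loop; consumeLoop and send are made-up names, and nodeStore is assumed to be a client-go cache.Store as in cloudhub):

```go
package hubsketch

import (
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// consumeLoop: take a key from the rate-limited queue, look the message up in
// the store, try to send it, and either retry (AddRateLimited) or acknowledge
// it (Forget/Done).
func consumeLoop(nodeQueue workqueue.RateLimitingInterface, nodeStore cache.Store, send func(interface{}) error) {
    for {
        key, shutdown := nodeQueue.Get() // blocks until a message key is available
        if shutdown {
            return
        }
        if msg, exists, _ := nodeStore.GetByKey(key.(string)); exists {
            if err := send(msg); err != nil {
                nodeQueue.AddRateLimited(key.(string)) // schedule a rate-limited retry
                nodeQueue.Done(key)
                continue
            }
        }
        nodeQueue.Forget(key.(string)) // success: clear the rate-limit history for this key
        nodeQueue.Done(key)            // mark the key as processed
    }
}
```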
2. Upstream flow (edge → cloud)
The initServerEntries mentioned above registers the handler for new connections:
```go
// initServerEntries register handler func
func (mh MessageHandle) initServerEntries() {
    mux.Entry(mux.NewPattern("*").Op("*"), mh.HandleServer)
}
```
Begin aside
The implementation of mux.Entry
(you can skip this; it does not affect the walkthrough of the upstream flow)
A global MessageMux is defined:
```go
MuxDefault = NewMessageMux()
```
A handler is registered for messages:
```go
func Entry(pattern *MessagePattern, handle func(*MessageContainer, ResponseWriter)) *MessageMux {
    MuxDefault.Entry(pattern, handle)
    return MuxDefault
}
```
The Handler interface and the method that implements it:
```go
type Handler interface {
    ServeConn(req *MessageRequest, writer ResponseWriter)
}

func (mux *MessageMux) ServeConn(req *MessageRequest, writer ResponseWriter) {
    err := mux.processFilter(req)
    if err != nil {
        return
    }
    mux.dispatch(req, writer)
}
```
The WebSocket connection's handleMessage then calls this method:
```go
func (conn *WSConnection) handleMessage() {
    ...
    conn.handler.ServeConn(&mux.MessageRequest{
        Header:  conn.state.Headers,
        Message: msg,
    }, &responseWriter{
        Type: api.ProtocolTypeWS,
        Van:  conn.wsConn,
    })
}
```
dispatch then invokes the registered handler:
```go
func (mux *MessageMux) dispatch(req *MessageRequest, writer ResponseWriter) error {
    for _, entry := range mux.muxEntry {
        // select entry
        matched := entry.pattern.Match(req.Message)
        if !matched {
            continue
        }
        // extract parameters
        parameters := mux.extractParameters(entry.pattern.resExpr, req.Message.GetResource())
        // wrap message
        container := mux.wrapMessage(req.Header, req.Message, parameters)
        // call user handle of entry
        entry.handleFunc(container, writer) // invoke the registered handler
        return nil
    }
    return fmt.Errorf("failed to found entry for message")
}
```
End of aside
The actual handler logic:
```go
// HandleServer handle all the request from node
func (mh *MessageHandle) HandleServer(container *mux.MessageContainer, writer mux.ResponseWriter) {
    // ... non-essential logic omitted; we only care about where the message ends up
    err := mh.PubToController(&model.HubInfo{ProjectID: projectID, NodeID: nodeID}, container.Message)
    if err != nil {
        // if err, we should stop node, write data to edgehub, stop nodify
        klog.Errorf("Failed to serve handle with error: %s", err.Error())
    }
}
```
Through the logic above, the message is published to the controller:
```go
func (mh *MessageHandle) PubToController(info *model.HubInfo, msg *beehiveModel.Message) error {
    msg.SetResourceOperation(fmt.Sprintf("node/%s/%s", info.NodeID, msg.GetResource()), msg.GetOperation())
    if model.IsFromEdge(msg) {
        err := mh.MessageQueue.Publish(msg)
        if err != nil {
            ...
        }
    }
    return nil
}
```
The message is then delivered via beehive to the edgecontroller module:
```go
// Publish sends message via the channel to Controllers
func (q *ChannelMessageQueue) Publish(msg *beehiveModel.Message) error {
    ...
    beehiveContext.SendToGroup(model.SrcEdgeController, *msg)
    return nil
}
```
This part is handled mainly by the UpstreamController: beehive delivers the message to edgecontroller, where func (uc *UpstreamController) dispatchMessage() processes it.
dispatchMessage parses the message and, based on its resource type, sends it into one of several channels; each channel is consumed by a group of goroutines dedicated to that resource type.
```go
func (uc *UpstreamController) dispatchMessage() {
    for {
        select {
        case <-beehiveContext.Done():
            klog.Info("stop dispatchMessage")
            return
        default:
        }
        msg, err := uc.messageLayer.Receive()
        ...
        resourceType, err := messagelayer.GetResourceType(msg)
        ...
        switch resourceType {
        case model.ResourceTypeNodeStatus:
            uc.nodeStatusChan <- msg
        ...
        }
    }
}
```
When UpstreamController starts, it launches a pool of goroutines for each resource type, for example for node status:
```go
for i := 0; i < int(uc.config.Load.UpdateNodeStatusWorkers); i++ {
    go uc.updateNodeStatus()
}
```
The number of UpdateNodeStatusWorkers is configurable via the config file.
All of the worker-count options are listed below:
```yaml
deletePodWorkers: 4
queryConfigMapWorkers: 4
queryEndpointsWorkers: 4
queryNodeWorkers: 4
queryPersistentVolumeClaimWorkers: 4
queryPersistentVolumeWorkers: 4
querySecretWorkers: 4
queryServiceWorkers: 4
queryVolumeAttachmentWorkers: 4
updateNodeStatusWorkers: 1
updateNodeWorkers: 4
updatePodStatusWorkers: 1
```
Taking updateNodeStatus as an example, a brief look:
```go
func (uc *UpstreamController) updateNodeStatus() {
    select {
    case <-beehiveContext.Done():
        klog.Warning("stop updateNodeStatus")
        return
    case msg := <-uc.nodeStatusChan: // receive the message and process it
        ...
        switch msg.GetOperation() { // inspect the message's operation
        case model.InsertOperation:
            ...
        case model.UpdateOperation:
            getNode, err := uc.kubeClient.CoreV1().Nodes().Get(context.Background(), name, metaV1.GetOptions{}) // use kubeClient to sync the status to k8s
            ...
        }
    }
}
```
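After the Get above, the handler merges the status reported by the edge into the fetched node and writes it back through the apiserver. Roughly (a simplified sketch: error handling and the response sent back to the edge are omitted, and nodeStatusRequest is an assumed name for the payload decoded from the message):

```go
// minimal sketch of the write-back step, assuming the status from the edge
// has already been unmarshalled into nodeStatusRequest.Status
getNode.Status = nodeStatusRequest.Status
if _, err := uc.kubeClient.CoreV1().Nodes().UpdateStatus(context.Background(), getNode, metaV1.UpdateOptions{}); err != nil {
    klog.Errorf("update node status failed with error: %v", err)
}
```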
With that, the downstream and upstream message flows should be roughly clear.