1. 同步k8s数据
    负责同步k8s数据的模块是cloudcore中edgecontroller的controller/downstream.go。
    DownstreamController由NewDownstreamController方法实例化,实例化时对原生的资源同步(informer)做了一层包装,比如:
1
2
// Take the Pod informer from k8sInformerFactory and pass its underlying
// informer, together with config, to manager.NewPodManager.
podInformer := k8sInformerFactory.Core().V1().Pods()
podManager, err := manager.NewPodManager(config, podInformer.Informer())
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// NewPodManager creates a PodManager from config.
//
// Two event channels are allocated, both buffered to config.Buffer.PodEvent:
// one that receives events from the informer's event handler and one that
// carries the merged events. A background goroutine running merge is started
// before the manager is returned. The returned error is always nil.
func NewPodManager(config *v1alpha1.EdgeController, si cache.SharedIndexInformer) (*PodManager, error) {
	bufSize := config.Buffer.PodEvent
	raw := make(chan watch.Event, bufSize)
	merged := make(chan watch.Event, bufSize)

	// Events from the informer are delivered into the raw channel.
	si.AddEventHandler(NewCommonResourceEventHandler(raw))

	podMgr := &PodManager{realEvents: raw, mergedEvents: merged}

	// Merge and de-duplicate the events coming from k8s.
	go podMgr.merge()

	return podMgr, nil
}

DownstreamController的Start方法负责启动各类资源的数据接收和流转。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Start launches the DownstreamController's sync loops, one goroutine per
// resource kind, and returns without waiting for them. It always returns nil.
func (dc *DownstreamController) Start() error {
	klog.Info("start downstream controller")

	// Launched in this order: pod, configmap, secret, nodes, rule, ruleendpoint.
	syncLoops := []func(){
		dc.syncPod,
		dc.syncConfigMap,
		dc.syncSecret,
		dc.syncEdgeNodes,
		dc.syncRule,
		dc.syncRuleEndpoint,
	}
	for _, loop := range syncLoops {
		go loop()
	}

	return nil
}

以syncPod()为例,分析接收到k8s pod变更后,直至同步下发到edgecore的流程。

1
2
3
4
5
6
// syncPod (abridged excerpt) shows the steps taken for a pod event: receive the
// event from podManager, check the pod's target node, build the resource for
// the internal message, and send the message through the message layer.
func (dc *DownstreamController) syncPod() {
    case e := <-dc.podManager.Events(): // receive an event
        dc.lc.IsEdgeNode(pod.Spec.NodeName) // decide whether the pod's target node is an edge node, based on whether that node is connected to cloudcore
        resource, err := messagelayer.BuildResource(pod.Spec.NodeName, pod.Namespace, model.ResourceTypePod, pod.Name) // build the internal message
        dc.messageLayer.Send(*msg)  // send to cloudhub
}
1
2
3
4
5
6
7
8
// EdgeControllerMessageLayer returns the MessageLayer used by the edge
// controller: it sends to and responds via the CloudHub module, uses the Router
// module as the router send target, and receives as the EdgeController module.
func EdgeControllerMessageLayer() MessageLayer {
	layer := new(ContextMessageLayer)

	// Outbound targets.
	layer.SendModuleName = modules.CloudHubModuleName // cloudhub
	layer.SendRouterModuleName = modules.RouterModuleName

	// Inbound module and response target.
	layer.ReceiveModuleName = modules.EdgeControllerModuleName
	layer.ResponseModuleName = modules.CloudHubModuleName

	return layer
}
  1. 发送到cloudhub
    cloudhub在for循环中持续接收各个模块发来的消息:
1
2
3
4
5
6
7
8
// DispatchMessage (abridged excerpt) loops forever, receiving messages via
// beehiveContext.Receive(model.SrcCloudHub) and handing each one to
// addMessageToQueue.
func (q *ChannelMessageQueue) DispatchMessage() {
    for {
        msg, err := beehiveContext.Receive(model.SrcCloudHub)

        // final handler for the message
        q.addMessageToQueue(nodeID, &msg)
    }
}

addMessageToQueue()负责将消息加入到本地nodeQueue和nodeStore,同时将消息保存到objectSync crd中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// addMessageToQueue (abridged excerpt) stores msg in the per-node store and
// enqueues its key on the per-node queue. For messages that are neither delete
// messages nor responses, and whose key is not yet in the store, it first
// creates an ObjectSync object through the CRD client.
func (q *ChannelMessageQueue) addMessageToQueue(nodeID string, msg *beehiveModel.Message) {
    nodeQueue := q.GetNodeQueue(nodeID)
	nodeStore := q.GetNodeStore(nodeID)

    messageKey, err := getMsgKey(msg) // key of the message (its UID)
    // Delete messages are synced directly; only non-delete, non-response
    // messages go through the ObjectSync check below.
    if !isDeleteMessage(msg) && msg.GetOperation() != beehiveModel.ResponseOperation {
        item, exist, _ := nodeStore.GetByKey(messageKey)
        if !exist { // not in the store yet, so persist it as an ObjectSync CRD object
            objectSyncStatus, err := q.crdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Create(context.Background(), objectSync, metav1.CreateOptions{})
        }
    }
    
    // store the message
    nodeStore.Add(msg)

    // enqueue the message key
    nodeQueue.Add(messageKey)
}
  1. 发送到edgecore
    cloudhub的handler包负责监听队列,并将消息发送到边缘端的edgecore。MessageHandle中有一个MessageWriteLoop方法,当有新的edgecore连接时会回调此方法:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// MessageWriteLoop (abridged excerpt) serves the node described by info: in a
// loop it takes a key from that node's queue, looks the message up in that
// node's store, sends it, and then calls Forget on the key.
func (mh *MessageHandle) MessageWriteLoop(info *model.HubInfo, stopServe chan ExitCode) {
    nodeQueue := mh.MessageQueue.GetNodeQueue(info.NodeID)
	nodeStore := mh.MessageQueue.GetNodeStore(info.NodeID)

    for {
        key, quit := nodeQueue.Get() // block waiting for data
        obj, exist, _ := nodeStore.GetByKey(key.(string)) // fetch the message for this key

        err := mh.sendMsg(conn.(hubio.CloudHubIO), info, copyMsg, msg, nodeStore) // send the message
        nodeQueue.Forget(key.(string)) // remove the message
    }
}

为什么 nodeQueue.Get()获取到的key可以为nodeStore.GetByKey所用?
在新建连接时:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Connect (abridged excerpt) creates the message store for the connecting node
// with getMsgKey as its key function and registers it in storePool under the
// node ID.
func (q *ChannelMessageQueue) Connect(info *model.HubInfo) {
    nodeStore := cache.NewStore(getMsgKey) // use getMsgKey as the store's key function
    q.storePool.Store(info.NodeID, nodeStore)
}

// getMsgKey is the key function shared by nodeQueue and nodeStore: it is passed
// to cache.NewStore and also computes the keys placed on the queue, which is
// why a key taken from the queue can be looked up with nodeStore.GetByKey.
//
// It returns the message UID for messages in the resource group and an error
// for messages in any other group. An obj that is not a non-nil
// *beehiveModel.Message yields an error rather than a panic.
func getMsgKey(obj interface{}) (string, error) {
	// Comma-ok assertion: a key function reports bad input through its error
	// return instead of panicking.
	msg, ok := obj.(*beehiveModel.Message)
	if !ok || msg == nil {
		return "", fmt.Errorf("failed to get message key: unexpected object type %T", obj)
	}

	if msg.GetGroup() != edgeconst.GroupResource {
		return "", fmt.Errorf("failed to get message key")
	}

	return GetMessageUID(*msg)
}