- Syncing Kubernetes data
The module responsible for syncing Kubernetes data is the controller in downstream.go of cloudcore's edgecontroller. It is instantiated by NewDownstreamController, which also wraps the native informer-based resource sync, for example:
```go
podInformer := k8sInformerFactory.Core().V1().Pods()
podManager, err := manager.NewPodManager(config, podInformer.Informer())
```
```go
// NewPodManager create PodManager from config
func NewPodManager(config *v1alpha1.EdgeController, si cache.SharedIndexInformer) (*PodManager, error) {
	realEvents := make(chan watch.Event, config.Buffer.PodEvent)
	mergedEvents := make(chan watch.Event, config.Buffer.PodEvent)
	rh := NewCommonResourceEventHandler(realEvents)
	si.AddEventHandler(rh)
	pm := &PodManager{realEvents: realEvents, mergedEvents: mergedEvents}
	go pm.merge() // merge and deduplicate the events coming from Kubernetes
	return pm, nil
}
```
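The merge goroutine itself is not shown above. A minimal sketch of what such a merge/dedup loop could look like, assuming events carry *v1.Pod objects; the real implementation in the manager package applies more elaborate merge rules:

```go
import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/watch"
)

// merge forwards events from realEvents to mergedEvents, dropping
// notifications that carry nothing newer than what was already sent.
func (pm *PodManager) merge() {
	lastSeen := make(map[types.UID]string) // last resourceVersion forwarded per pod
	for e := range pm.realEvents {
		pod, ok := e.Object.(*v1.Pod)
		if !ok {
			continue
		}
		// drop a Modified event whose resourceVersion we already forwarded
		if e.Type == watch.Modified && lastSeen[pod.UID] == pod.ResourceVersion {
			continue
		}
		lastSeen[pod.UID] = pod.ResourceVersion
		pm.mergedEvents <- e
	}
}
```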
DownstreamController's Start method launches one goroutine per resource type to receive events and move the data along:
```go
// Start DownstreamController
func (dc *DownstreamController) Start() error {
	klog.Info("start downstream controller")
	// pod
	go dc.syncPod()
	// configmap
	go dc.syncConfigMap()
	// secret
	go dc.syncSecret()
	// nodes
	go dc.syncEdgeNodes()
	// rule
	go dc.syncRule()
	// ruleendpoint
	go dc.syncRuleEndpoint()
	return nil
}
```
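Each sync loop consumes its manager's merged event stream through an Events() accessor. A sketch of that accessor, assuming the PodManager fields shown earlier:

```go
// Events exposes the merged, deduplicated event stream to the sync loops.
func (pm *PodManager) Events() chan watch.Event {
	return pm.mergedEvents
}
```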
Taking syncPod() as an example, here is the flow from receiving a Kubernetes pod change to syncing it down to edgecore, abbreviated to the key steps:
```go
func (dc *DownstreamController) syncPod() {
	for {
		select {
		case e := <-dc.podManager.Events(): // receive a pod event
			pod, _ := e.Object.(*v1.Pod)
			dc.lc.IsEdgeNode(pod.Spec.NodeName) // is the pod's node an edge node connected to cloudcore?
			resource, err := messagelayer.BuildResource(pod.Spec.NodeName, pod.Namespace, model.ResourceTypePod, pod.Name) // build the internal message resource
			dc.messageLayer.Send(*msg) // send to cloudhub
		}
	}
}
```
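The excerpt elides how msg itself is assembled from the event. A hedged sketch of that step using beehive's message builder; the operation mapping and the constants.GroupResource name are simplifying assumptions here, not the exact source:

```go
// Sketch: map the watch event type to a beehive operation and wrap the pod.
operation := model.UpdateOperation
switch e.Type {
case watch.Added:
	operation = model.InsertOperation
case watch.Deleted:
	operation = model.DeleteOperation
}
msg := model.NewMessage("").
	SetResourceVersion(pod.ResourceVersion).
	BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, operation).
	FillBody(pod)
```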
dc.messageLayer is built by EdgeControllerMessageLayer(), which addresses messages from the edgecontroller to cloudhub:

```go
func EdgeControllerMessageLayer() MessageLayer {
	return &ContextMessageLayer{
		SendModuleName:       modules.CloudHubModuleName, // cloudhub
		SendRouterModuleName: modules.RouterModuleName,
		ReceiveModuleName:    modules.EdgeControllerModuleName,
		ResponseModuleName:   modules.CloudHubModuleName,
	}
}
```
- Sending to cloudhub
cloudhub keeps receiving messages from the other cloud modules in a for loop:
```go
func (q *ChannelMessageQueue) DispatchMessage() {
	for {
		msg, err := beehiveContext.Receive(model.SrcCloudHub)
		// addMessageToQueue is the final handler for each message
		q.addMessageToQueue(nodeID, &msg)
	}
}
```
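The nodeID used above is parsed from the message's resource field, which BuildResource formatted as node/{nodeID}/{namespace}/{type}/{name}. A hypothetical helper showing the idea; the real extraction lives in the messagelayer package:

```go
import (
	"fmt"
	"strings"
)

// getNodeID recovers the node ID from a resource string such as
// "node/edge-node-1/default/pod/nginx". Illustrative only.
func getNodeID(resource string) (string, error) {
	parts := strings.Split(resource, "/")
	if len(parts) < 2 || parts[0] != "node" {
		return "", fmt.Errorf("no node ID in resource %q", resource)
	}
	return parts[1], nil
}
```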
addMessageToQueue() adds the message to the node's local nodeQueue and nodeStore, and also persists it in the objectSync CRD:
```go
func (q *ChannelMessageQueue) addMessageToQueue(nodeID string, msg *beehiveModel.Message) {
	nodeQueue := q.GetNodeQueue(nodeID)
	nodeStore := q.GetNodeStore(nodeID)
	messageKey, err := getMsgKey(msg) // the message's UID
	// delete-type messages are synced directly, without CRD bookkeeping
	if !isDeleteMessage(msg) && msg.GetOperation() != beehiveModel.ResponseOperation {
		item, exist, _ := nodeStore.GetByKey(messageKey)
		if !exist { // first time this object is seen: record it in the CRD
			objectSyncStatus, err := q.crdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Create(context.Background(), objectSync, metav1.CreateOptions{})
		}
	}
	// store the full message
	nodeStore.Add(msg)
	// enqueue only its key
	nodeQueue.Add(messageKey)
}
```
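The objectSync passed to Create records which object has been delivered to which node, so cloudhub can replay missed updates after a reconnect. A sketch of the shape of such a record; the field names follow the reliablesyncs v1alpha1 API and should be treated as an assumption:

```go
// Sketch: one ObjectSync record per (node, object) pair.
objectSync := &v1alpha1.ObjectSync{
	ObjectMeta: metav1.ObjectMeta{
		Name: nodeID + "." + string(pod.UID), // e.g. "edge-node-1.<object-uid>"
	},
	Spec: v1alpha1.ObjectSyncSpec{
		ObjectAPIVersion: "v1",
		ObjectKind:       "pods",
		ObjectName:       pod.Name,
	},
}
// Status.ObjectResourceVersion is advanced once the edge acknowledges the
// message, so only newer versions are re-sent on reconnect.
```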
- Sending to edgecore
cloudhub's handler package watches the per-node queues and sends the messages down to edgecore on the edge side.
MessageHandle has a MessageWriteLoop method that is invoked as a callback whenever a new edgecore connects:
```go
func (mh *MessageHandle) MessageWriteLoop(info *model.HubInfo, stopServe chan ExitCode) {
	nodeQueue := mh.MessageQueue.GetNodeQueue(info.NodeID)
	nodeStore := mh.MessageQueue.GetNodeStore(info.NodeID)
	for {
		key, quit := nodeQueue.Get() // blocks until a key is available
		obj, exist, _ := nodeStore.GetByKey(key.(string)) // fetch the message by its key
		err := mh.sendMsg(conn.(hubio.CloudHubIO), info, copyMsg, msg, nodeStore) // send the message to the edge
		nodeQueue.Forget(key.(string)) // done with this key; stop tracking it for retries
	}
}
```
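sendMsg is where the reliability comes in: for resource messages, cloudhub waits for an acknowledgement from the edge before it drops the message and advances the ObjectSync status. A simplified sketch of that idea; the writer interface and the timeout are stand-ins, not the real handler API:

```go
import (
	"fmt"
	"time"
)

// writer abstracts the cloudhub connection for this sketch.
type writer interface {
	WriteData(interface{}) error
}

// sendWithAck writes a message and waits for the edge's ack before returning,
// so the caller can safely remove the message from its store.
func sendWithAck(w writer, msgID string, msg interface{}, ack <-chan struct{}) error {
	if err := w.WriteData(msg); err != nil {
		return err
	}
	select {
	case <-ack: // edge confirmed receipt
		return nil
	case <-time.After(5 * time.Second):
		return fmt.Errorf("timeout waiting for ack of message %s", msgID)
	}
}
```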
Why can the key returned by nodeQueue.Get() be passed directly to nodeStore.GetByKey()?
Because when a new connection is established:
```go
func (q *ChannelMessageQueue) Connect(info *model.HubInfo) {
	nodeStore := cache.NewStore(getMsgKey) // the store is created with getMsgKey as its key function
	q.storePool.Store(info.NodeID, nodeStore)
}

// so the keys enqueued in nodeQueue (computed by getMsgKey in addMessageToQueue)
// and the keys nodeStore.GetByKey expects come from the same key function
func getMsgKey(obj interface{}) (string, error) {
	msg := obj.(*beehiveModel.Message)
	if msg.GetGroup() == edgeconst.GroupResource {
		return GetMessageUID(*msg)
	}
	return "", fmt.Errorf("failed to get message key")
}
```
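To see why the two structures agree, here is a minimal, self-contained demo in the same spirit: a cache.Store keyed by a custom function and a workqueue carrying those same keys. The message type is a toy, not KubeEdge code:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

type message struct{ UID, Body string }

// keyFunc plays the role of getMsgKey: derive the key from the object itself.
func keyFunc(obj interface{}) (string, error) {
	return obj.(*message).UID, nil
}

func main() {
	store := cache.NewStore(keyFunc) // nodeStore analogue
	queue := workqueue.New()         // nodeQueue analogue
	msg := &message{UID: "uid-123", Body: "hello"}

	store.Add(msg) // store indexes msg under keyFunc(msg)
	key, _ := keyFunc(msg)
	queue.Add(key) // queue carries only the key

	k, _ := queue.Get()
	obj, exists, _ := store.GetByKey(k.(string)) // same key space, so this finds msg
	fmt.Println(exists, obj.(*message).Body)
	queue.Done(k)
}
```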