总体架构图

Kubeedge-log-arch 源码分析准工作:

kubernetes 1.18.8

kubeedge 1.8.1

具体流程分析:

当我们执行查看日志操作(`kubectl logs -f pre-fengniao-daemonset-infra-vtrl6 -n pmpp distro --tail 1 -v=8`),执行了那些步骤?

0、边缘node上线:

当node上线时,node会收集并上报node的信息: node监听端口源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# kubeedge/edge/pkg/edged/edged_status.go
func (e *edged) updateNodeStatus() error {
	nodeStatus, err := e.getNodeStatusRequest(&initNode) // 获取node状态
	if err != nil {
		klog.Errorf("Unable to construct api.NodeStatusRequest object for edge: %v", err)
		return err
	}

	err = e.metaClient.NodeStatus(e.namespace).Update(e.nodeName, *nodeStatus)
	if err != nil {
		klog.Errorf("update node failed, error: %v", err)
	}
	return nil
}

func (e *edged) getNodeStatusRequest(node *v1.Node) (*edgeapi.NodeStatusRequest, error) {
    .....  // 忽略本次无关代码

    e.setNodeStatusDaemonEndpoints(nodeStatus)  // 设置node的endpoints
    
    .....  // 忽略本次无关代码
}

// 设置上报数据中的DaemonEndpoint.Port 这个非常重要  默认10350端口
func (e *edged) setNodeStatusDaemonEndpoints(node *edgeapi.NodeStatusRequest) {
	node.Status.DaemonEndpoints = v1.NodeDaemonEndpoints{
		KubeletEndpoint: v1.DaemonEndpoint{
			Port: constants.ServerPort,
		},
	}
}

node IP地址源码: node 启动时,会从配置文件中读取本node的IP地址

1
2
3
// kubeedge/edge/pkg/edged/edged.go
edged, err := newEdged(e.Enable)
nodeIP:                    net.ParseIP(edgedconfig.Config.NodeIP),

1、客户端发起动作:

客户端可以时kubectl log…, 也可以是通过client-go客户端 但是最后都是调用如下api: /containerLogs/{podNamespace}/{podID}/{containerName}

2、k8s-apiserver

当请求发送到apiserver,根据请求中的参数,查询即将要连接的pod的信息,最主要的就是ip和port。 代码流程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// kubernetes/pkg/registry/core/pod/rest/log.go
// get log请求,先到这里
func (r *LogREST) Get(ctx context.Context, name string, opts runtime.Object) (runtime.Object, error) {
	.....  // 忽略本次无关代码
	location, transport, err := pod.LogLocation(ctx, r.Store, r.KubeletConn, name, logOpts)
	if err != nil {
		return nil, err
	}
	.....  // 忽略本次无关代码
}

// kubernetes/pkg/registry/core/pod/strategy.go
func LogLocation(
	ctx context.Context, getter ResourceGetter,
	connInfo client.ConnectionInfoGetter,
	name string,
	opts *api.PodLogOptions,
) (*url.URL, http.RoundTripper, error) {
	.....  // 忽略本次无关代码
	nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName) // 
	if err != nil {
		return nil, nil, err
	}
	.....  // 忽略本次无关代码
}

// kubernetes/pkg/kubelet/client/kubelet_client.go
func (k *NodeConnectionInfoGetter) GetConnectionInfo(ctx context.Context, nodeName types.NodeName) (*ConnectionInfo, error) {
	node, err := k.nodes.Get(ctx, string(nodeName), metav1.GetOptions{})
	if err != nil {
		return nil, err
	}

	// Find a kubelet-reported address, using preferred address type
	host, err := nodeutil.GetPreferredNodeAddress(node, k.preferredAddressTypes)
	if err != nil {
		return nil, err
	}

	// Use the kubelet-reported port, if present
	port := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port) // 这里的端口就是node上传上来的
	if port <= 0 {
		port = k.defaultPort
	}

	return &ConnectionInfo{
		Scheme:                         k.scheme,
		Hostname:                       host,
		Port:                           strconv.Itoa(port),
		Transport:                      k.transport,
		InsecureSkipTLSVerifyTransport: k.insecureSkipTLSVerifyTransport,
	}, nil
}

代码中node.Status.DaemonEndpoints.KubeletEndpoint.Port 可以通过查看kubectl 查看:

1
2
3
4
5
kubectl get node testnode -o yaml

 daemonEndpoints:
    kubeletEndpoint:
      Port: 10350

3、请求日志数据

至此,请求指定node的日志的ip和port已经拿到,发送请求到 xxx.xxx.xxx.xx:10350的node。 这个逻辑在k8s中是没有问题的,因为master节点和所有的node在同一个网络平面。 但是边缘node和master在不同的内网。所以需要通过iptables nats转发,之前需要显式的通过命令配置: iptables -t nat -A OUTPUT -p tcp --dport 10350 -j DNAT --to cloudcore-ip:10003 即,将apiserver请求的10350的数据转发到cloudcore机器的10003端口。kubeedge1.8以后,这部分工作已经集成在cloudcore代码里了:

1
2
3
4
5
6
// kubeedge/cloud/pkg/cloudstream/iptables/iptables.go
args := append([]string{"-p", "tcp", "-j", "DNAT", "--dport", port, "--to", ip + ":" + strconv.Itoa(int(im.cloudStream.StreamPort))})
		if _, err := im.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, tunnelPortChain, args...); err != nil {
			klog.ErrorS(err, "Failed to ensure rules", "table", utiliptables.TableNAT, "chain", tunnelPortChain)
			return
		}

同时cloudcore的10004和边缘node通过websocket建立连接。所以请求到达cloudcore,cloudcore在经过处理,最终将请求发送到边缘node。