gRPC连接维护五个状态:

// Connectivity states of a ClientConn. iota numbers them in declaration
// order, from Idle (0) through Shutdown (4).
const (
	// Idle indicates the ClientConn is idle.
	Idle State = iota
	// Connecting indicates the ClientConn is connecting.
	Connecting
	// Ready indicates the ClientConn is ready for work.
	Ready
	// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
	TransientFailure
	// Shutdown indicates the ClientConn has started shutting down.
	Shutdown
)

这五个状态贯穿整个连接的生命周期。

经过一系列的准备:服务发现、负载均衡等策略,最终要和服务端建立真正的连接,对应的代码是addrConn的connect方法:

// google.golang.org/grpc@v1.41.0/clientconn.go
// connect starts creating a transport.
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
func (ac *addrConn) connect() error {
	ac.mu.Lock()
	switch ac.state {
	case connectivity.Shutdown:
		// The conn has already been closed, so there is nothing left to
		// connect; report that to the caller.
		ac.mu.Unlock()
		return errConnClosing
	case connectivity.Idle:
		// A freshly created addrConn starts out Idle (see newAddrConn); this
		// is the only state from which a connection attempt is started.
	default:
		// Any other state means there is nothing to do here.
		ac.mu.Unlock()
		return nil
	}

	// Move to Connecting while the lock is still held, so that subsequent or
	// concurrent callers fail the Idle check above and the transport is not
	// reset more than once.
	ac.updateConnectivityState(connectivity.Connecting, nil)
	ac.mu.Unlock()

	ac.resetTransport()
	return nil
}

newAddrConn方法:

// google.golang.org/grpc@v1.41.0/clientconn.go
// newAddrConn builds an addrConn for addrs that starts in the Idle state and
// carries the ClientConn's dial options.
// NOTE: this is an excerpt; the `...` line marks code elided from the listing.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
	ac := &addrConn{
		state:        connectivity.Idle, // initial state of every new addrConn
		cc:           cc,
		addrs:        addrs,
		scopts:       opts,
		dopts:        cc.dopts,
		czData:       new(channelzData),
		resetBackoff: make(chan struct{}),
	}
...
	return ac, nil
}

之后的主要逻辑通过func (ac *addrConn) resetTransport()调到tryAllAddrs, tryAllAddrs遍历所有的addr,发起连接(ac.createTransport(addr, copts, connectDeadline))。

在createTransport中transport.NewClientTransport一路向下调用到http2的newHTTP2Client方法:

// google.golang.org/grpc@v1.41.0/internal/transport/http2_client.go
// NOTE: this is an excerpt; each `...` line marks code elided from the
// listing, and the function body is not shown to its end.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
...
    conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
... 

	if kp.Time == 0 { // fall back to the default keepalive time when none is set
		kp.Time = defaultClientKeepaliveTime
	}
	if kp.Timeout == 0 { // fall back to the default keepalive timeout when none is set
		kp.Timeout = defaultClientKeepaliveTimeout
	}
...
    writeBufSize := opts.WriteBufferSize  // write buffer size from the connect options
	readBufSize := opts.ReadBufferSize    // read buffer size from the connect options

    // Build the http2Client struct.
    	t := &http2Client{
		ctx:                   ctx,
		ctxDone:               ctx.Done(), // Cache Done chan.
		cancel:                cancel,
		userAgent:             opts.UserAgent,
		conn:                  conn,
		remoteAddr:            conn.RemoteAddr(),
		localAddr:             conn.LocalAddr(),
		authInfo:              authInfo,
...
	}

    // Start the keepalive goroutine.
    go t.keepalive()

    // Start the reader goroutine for incoming message. Each transport has
	// a dedicated goroutine which reads HTTP2 frame from network. Then it
	// dispatches the frame to the corresponding stream entity.
    go t.reader() // see the comment above

    // Send connection preface to server.
	n, err := t.conn.Write(clientPreface) // write the client preface bytes to the connection

createTransport中会用select case等待连接返回,如果连接成功,调用startHealthCheck,保存新连接,开启健康检查:

// google.golang.org/grpc@v1.41.0/clientconn.go
// startHealthCheck launches the configured health check function in its own
// goroutine.
// NOTE: this is an excerpt; the `...` line marks code elided from the listing.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	var healthcheckManagingState bool
	// On return, if healthcheckManagingState is still false, the deferred
	// function moves the conn to Ready itself.
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready, nil) // mark the conn Ready
		}
	}()

...
    ac.curAddr = addr // remember the address just used as the current one so the next connection attempt reuses it
    ac.transport = newTr // store the new transport on the addrConn
	// Start the health checking stream.
	go func() {
		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			// An Unimplemented status is logged as "health check disabled";
			// every other error is logged as unexpected.
			if status.Code(err) == codes.Unimplemented {
				channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
			}
		}
	}()
}

如果用户没有指定healthCheckFunc,则使用gRPC默认提供的实现,即health包在init中注册的clientHealthCheck:

// google.golang.org/grpc@v1.41.0/health/client.go
// init installs clientHealthCheck as internal.HealthCheckFunc.
func init() {
	internal.HealthCheckFunc = clientHealthCheck
}

// This function implements the protocol defined at:
// https://github.com/grpc/grpc/blob/master/doc/health-checking.md
//
// It loops until ctx is done or backoffFunc reports false (returning nil in
// both cases), until the server answers Unimplemented (returning that error
// after reporting Ready), or until newStream yields something that is not a
// grpc.ClientStream (returning a descriptive error after reporting Ready).
func clientHealthCheck(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State, error), service string) error {
	attempts := 0

retryConnection:
	for {
		// Back off before every retry that follows an attempt which ended
		// without receiving a message, so that retries do not happen too fast.
		if attempts > 0 && !backoffFunc(ctx, attempts-1) {
			return nil
		}
		attempts++

		if ctx.Err() != nil {
			return nil
		}
		// A new attempt begins: report Connecting.
		setConnectivityState(connectivity.Connecting, nil)

		// newStream is backed by newNonRetryClientStream in
		// google.golang.org/grpc@v1.41.0/stream.go:
		//   func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error)
		raw, err := newStream(healthCheckMethod)
		if err != nil {
			continue retryConnection
		}

		stream, ok := raw.(grpc.ClientStream)
		if !ok {
			// Ideally, this should never happen. But if it happens, the
			// server is marked as healthy for LBing purposes.
			setConnectivityState(connectivity.Ready, nil)
			return fmt.Errorf("newStream returned %v (type %T); want grpc.ClientStream", raw, raw)
		}

		// Send the health check request.
		if err = stream.SendMsg(&healthpb.HealthCheckRequest{Service: service}); err != nil && err != io.EOF {
			// Stream should have been closed, so we can safely continue to
			// create a new stream.
			continue retryConnection
		}
		stream.CloseSend()

		reply := new(healthpb.HealthCheckResponse)
		for {
			// Block until the next message (or an error) arrives.
			err = stream.RecvMsg(reply)

			switch {
			case status.Code(err) == codes.Unimplemented:
				// Reports healthy for the LBing purposes if health check is
				// not implemented in the server.
				setConnectivityState(connectivity.Ready, nil)
				return err
			case err != nil:
				// Reports unhealthy if server's Watch method gives an error
				// other than UNIMPLEMENTED, then starts a fresh attempt.
				setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but received health check RPC error: %v", err))
				continue retryConnection
			}

			// As a message has been received, removes the need for backoff
			// for the next retry by resetting the attempt count.
			attempts = 0
			if reply.Status == healthpb.HealthCheckResponse_SERVING {
				setConnectivityState(connectivity.Ready, nil)
			} else {
				setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but health check failed. status=%s", reply.Status))
			}
		}
	}
}

借助etcd list-watch实现断线重连

gRPC提供一个Builder interface,用来监控服务地址的变化:当有服务退出或者新增时,通过etcd的watch机制更新服务信息,实现重连。

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}

下面是该接口Build方法的一个实现:

// Build splits target.Authority on EndpointSepChar into hosts, subscribes to
// target.Endpoint on those hosts, and pushes the subscriber's current values
// to cc as resolver addresses, both immediately and on every later change
// notification. It returns the subscriber's construction error, if any.
func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
	resolver.Resolver, error) {
	isSeparator := func(r rune) bool {
		return r == EndpointSepChar
	}
	hosts := strings.FieldsFunc(target.Authority, isSeparator)

	sub, err := discov.NewSubscriber(hosts, target.Endpoint)
	if err != nil {
		return nil, err
	}

	// update is the callback invoked when the subscribed values change: it
	// rebuilds the address list and hands it to the ClientConn.
	update := func() {
		var addrs []resolver.Address
		for _, val := range subset(sub.Values(), subsetSize) {
			addrs = append(addrs, resolver.Address{Addr: val})
		}

		state := resolver.State{Addresses: addrs}
		if err := cc.UpdateState(state); err != nil {
			log.Error(err)
		}
	}

	// Register the callback first, then run it once so cc starts with the
	// values that are already known.
	sub.AddListener(update)
	update()

	return &nopResolver{cc: cc}, nil
}

注册就是将函数append到listeners slice中,当变动时遍历所有的listener。

watch实现:在上面的NewSubscriber中,会起一个goroutine调用monitor,monitor负责启动watch:

// monitor registers l as a listener for key, performs an initial load with
// the client obtained from c.getClient, and then starts a goroutine that
// watches key. It returns the error from c.getClient, or nil once the watcher
// goroutine has been started.
func (c *cluster) monitor(key string, l UpdateListener) error {
	// The listener table is only touched while holding c.lock.
	c.lock.Lock()
	registered := append(c.listeners[key], l)
	c.listeners[key] = registered
	c.lock.Unlock()

	client, err := c.getClient()
	if err != nil {
		return err
	}

	c.load(client, key)

	// The watcher goroutine is accounted for in c.watchGroup for as long as
	// c.watch is running.
	c.watchGroup.Add(1)
	go func() {
		defer c.watchGroup.Done()
		c.watch(client, key)
	}()

	return nil
}

monitor中的watch最终调用到此处的watchStream,watchStream启动一个etcd的watch,当有事件时,调用handleWatchEvents回调上面注册的函数:

// watchStream opens a prefix watch on key through cli and forwards every
// batch of events to c.handleWatchEvents.
//
// It returns true when c.done fires. It returns false, after logging the
// reason, when the watch channel is closed, the watch is canceled, or a watch
// response carries an error.
func (c *cluster) watchStream(cli EtcdClient, key string) bool {
	rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
	for {
		select {
		case wresp, ok := <-rch:
			if !ok {
				log.Error("etcd monitor chan has been closed")
				return false
			}
			if wresp.Canceled {
				log.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err())
				return false
			}
			// Use the formatting logger directly, matching the canceled branch
			// above, and evaluate Err only once.
			if err := wresp.Err(); err != nil {
				log.Errorf("etcd monitor chan error: %v", err)
				return false
			}

			c.handleWatchEvents(key, wresp.Events)
		case <-c.done:
			return true
		}
	}
}

当gRPC的cc.UpdateState被调用时,调用关系如下:

// google.golang.org/grpc@v1.43.0/resolver_conn_wrapper.go
func (ccr *ccResolverWrapper) UpdateState(s resolver.State)
		// google.golang.org/grpc@v1.43.0/clientconn.go
	=> func (cc *ClientConn) updateResolverState(s resolver.State, err error)
		=> func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState)
			=> b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
				=> ccb.cc.newAddrConn(addrs, opts)
					=> func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) 
	

可以发现,经过一系列的调用最终调用到上面讲到的newAddrConn方法。