gRPC 连接维护以下五种状态:
| const (
// Idle indicates the ClientConn is idle.
Idle State = iota
// Connecting indicates the ClientConn is connecting.
Connecting
// Ready indicates the ClientConn is ready for work.
Ready
// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
TransientFailure
// Shutdown indicates the ClientConn has started shutting down.
Shutdown
)
这五种状态贯穿连接的整个生命周期。
经过服务发现、负载均衡等一系列准备之后,客户端最终要与服务端建立真正的连接,这一步由 addrConn 的 connect 方法发起:
| // google.golang.org/grpc@v1.41.0/clientconn.go
// connect starts creating a transport.
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
func (ac *addrConn) connect() error {
ac.mu.Lock()
if ac.state == connectivity.Shutdown { // 如果调用了close方法,conn的状态为 Shutdown,此时不需要建立连接了,直接退出
ac.mu.Unlock()
return errConnClosing
}
if ac.state != connectivity.Idle { // new出来的conn默认都是Idle状态, 参考下文的 newAddrConn方法
ac.mu.Unlock()
return nil
}
// Update connectivity state within the lock to prevent subsequent or
// concurrent calls from resetting the transport more than once.
ac.updateConnectivityState(connectivity.Connecting, nil) // 更新状态为正在连接
ac.mu.Unlock()
ac.resetTransport()
return nil
}
newAddrConn 方法如下(新建的 addrConn 默认处于 Idle 状态):
| // google.golang.org/grpc@v1.41.0/clientconn.go
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
ac := &addrConn{
state: connectivity.Idle, // 默认状态
cc: cc,
addrs: addrs,
scopts: opts,
dopts: cc.dopts,
czData: new(channelzData),
resetBackoff: make(chan struct{}),
}
...
return ac, nil
}
之后的主要逻辑由 func (ac *addrConn) resetTransport() 调用 tryAllAddrs;tryAllAddrs 遍历所有地址,通过 ac.createTransport(addr, copts, connectDeadline) 逐个发起连接。
在 createTransport 中,transport.NewClientTransport 一路向下,最终调用 HTTP/2 客户端的构造方法 newHTTP2Client:
| // google.golang.org/grpc@v1.41.0/internal/transport/http2_client.go
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
...
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
...
if kp.Time == 0 { //确保设置keepalive
kp.Time = defaultClientKeepaliveTime
}
if kp.Timeout == 0 { //确保设置timeout
kp.Timeout = defaultClientKeepaliveTimeout
}
...
writeBufSize := opts.WriteBufferSize //设置write buf
readBufSize := opts.ReadBufferSize //设置read buf
// 初始化一个http2Client结构体
t := &http2Client{
ctx: ctx,
ctxDone: ctx.Done(), // Cache Done chan.
cancel: cancel,
userAgent: opts.UserAgent,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: authInfo,
...
}
// 开启 keepalive
go t.keepalive()
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity.
go t.reader() // 上面的注释写的很好了
// Send connection preface to server.
n, err := t.conn.Write(clientPreface) //发送一个消息头
在 createTransport 中会通过 select 等待连接建立的结果;如果连接成功,则保存新连接,并调用 startHealthCheck 开启健康检查:
| // google.golang.org/grpc@v1.41.0/clientconn.go
func (ac *addrConn) startHealthCheck(ctx context.Context) {
var healthcheckManagingState bool
defer func() {
if !healthcheckManagingState {
ac.updateConnectivityState(connectivity.Ready, nil) // 更新状态为Ready
}
}()
...
ac.curAddr = addr // 将当前连接用的地址保存为当前地址,下次连接,接着用
ac.transport = newTr // 将连接信息保存到transport
// Start the health checking stream.
go func() {
err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
if err != nil {
if status.Code(err) == codes.Unimplemented {
channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
} else {
channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
}
}
}()
}
如果用户没有指定 healthCheckFunc,则使用 gRPC 默认提供的实现。该实现位于 health 包中,并在 init 中注册到 internal.HealthCheckFunc,因此客户端需要导入该包(例如 import _ "google.golang.org/grpc/health")才会启用;此外,只有服务配置中设置了 healthCheckConfig 时才会真正发起健康检查:
| // google.golang.org/grpc@v1.41.0/health/client.go
func init() {
internal.HealthCheckFunc = clientHealthCheck
}
// clientHealthCheck implements the client side of the health-checking
// protocol defined at:
// https://github.com/grpc/grpc/blob/master/doc/health-checking.md
//
// Each attempt reports Connecting, opens a stream with
// newStream(healthCheckMethod), sends a HealthCheckRequest for service and then
// keeps reading responses from the (Watch) stream, reporting each one through
// setConnectivityState: SERVING -> Ready, any other status -> TransientFailure.
// A receive error other than Unimplemented reports TransientFailure and starts a
// new attempt; failures to create the stream or send the request just start a
// new attempt. Attempts are backed off via backoffFunc unless the previous
// attempt received at least one message.
//
// Returns:
//   - nil when ctx is done or backoffFunc returns false;
//   - the Unimplemented error when the server has no health service (Ready is
//     reported first so the server stays usable for load balancing);
//   - an error when newStream returns something that is not a
//     grpc.ClientStream (Ready is reported first as well).
func clientHealthCheck(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State, error), service string) error {
tryCnt := 0
retryConnection:
for {
// Backs off if the connection has failed in some way without receiving a message in the previous retry.
if tryCnt > 0 && !backoffFunc(ctx, tryCnt-1) { // back off so retries are not issued too quickly
return nil
}
tryCnt++
if ctx.Err() != nil {
return nil
}
setConnectivityState(connectivity.Connecting, nil) // every attempt starts by reporting Connecting
// newStream is supplied by the caller (startHealthCheck passes it to
// healthCheckFunc); per the original article it ends up creating the stream via
// newNonRetryClientStream in google.golang.org/grpc@v1.41.0/stream.go:
//func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error)
rawS, err := newStream(healthCheckMethod)
if err != nil {
continue retryConnection
}
s, ok := rawS.(grpc.ClientStream)
// Ideally, this should never happen. But if it happens, the server is marked as healthy for LBing purposes.
if !ok {
setConnectivityState(connectivity.Ready, nil)
return fmt.Errorf("newStream returned %v (type %T); want grpc.ClientStream", rawS, rawS)
}
// Send the health-check request for the requested service name.
if err = s.SendMsg(&healthpb.HealthCheckRequest{Service: service}); err != nil && err != io.EOF {
// Stream should have been closed, so we can safely continue to create a new stream.
continue retryConnection
}
s.CloseSend()
resp := new(healthpb.HealthCheckResponse)
for {
err = s.RecvMsg(resp) // wait for the next health update (or an error)
// Reports healthy for the LBing purposes if health check is not implemented in the server.
if status.Code(err) == codes.Unimplemented {
setConnectivityState(connectivity.Ready, nil)
return err
}
// Reports unhealthy if server's Watch method gives an error other than UNIMPLEMENTED.
if err != nil { // report unhealthy, then retry with a new stream
setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but received health check RPC error: %v", err))
continue retryConnection
}
// As a message has been received, removes the need for backoff for the next retry by resetting the try count.
tryCnt = 0 // a message arrived, so the next retry skips backoff
if resp.Status == healthpb.HealthCheckResponse_SERVING {
setConnectivityState(connectivity.Ready, nil) // SERVING: report Ready
} else {
setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but health check failed. status=%s", resp.Status))
}
}
}
}
借助 etcd list-watch 实现断线重连
gRPC 提供了 resolver.Builder 接口,用于创建监听服务地址变化的 resolver:当有服务退出或新增时,通过 etcd 的 watch 机制更新服务地址信息,从而实现重连。
| // Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
// Build creates a new resolver for the given target.
//
// gRPC dial calls Build synchronously, and fails if the returned error is
// not nil.
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
Scheme() string
}
下面是 go-zero 中基于 etcd 的 Builder 实现(discovBuilder)的 Build 方法:
| func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
resolver.Resolver, error) {
hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
return r == EndpointSepChar
})
sub, err := discov.NewSubscriber(hosts, target.Endpoint)
if err != nil {
return nil, err
}
update := func() { // 监听key-value变更时watch调用的函数
var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{ // 更新监听server的地址信息
Addr: val,
})
}
if err := cc.UpdateState(resolver.State{
Addresses: addrs,
}); err != nil {
log.Error(err)
}
}
sub.AddListener(update) // 注册update 函数
update()
return &nopResolver{cc: cc}, nil
}
注册就是把 update 函数 append 到 listeners 切片中,数据变动时遍历并调用所有 listener。
再看 watch 的实现:上面的 NewSubscriber 会调用 monitor;monitor 先注册 listener 并加载当前数据,然后启动一个 goroutine 执行 watch:
| func (c *cluster) monitor(key string, l UpdateListener) error {
c.lock.Lock()
c.listeners[key] = append(c.listeners[key], l)
c.lock.Unlock()
cli, err := c.getClient()
if err != nil {
return err
}
c.load(cli, key)
c.watchGroup.Add(1)
go func() {
defer c.watchGroup.Done()
c.watch(cli, key)
}()
return nil
}
monitor 中的 watch 最终会调用下面的 watchStream:watchStream 以 WithPrefix 方式对 key 前缀启动一个 etcd watch,收到事件时调用 handleWatchEvents,进而回调上面注册的函数。
| func (c *cluster) watchStream(cli EtcdClient, key string) bool {
rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
for {
select {
case wresp, ok := <-rch:
if !ok {
log.Error("etcd monitor chan has been closed")
return false
}
if wresp.Canceled {
log.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err())
return false
}
if wresp.Err() != nil {
log.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
return false
}
c.handleWatchEvents(key, wresp.Events)
case <-c.done:
return true
}
}
}
当 gRPC 的 cc.UpdateState 被调用时,调用关系如下(注意:这段调用链引用的是 v1.43.0 的源码,前文代码为 v1.41.0):
// google.golang.org/grpc@v1.43.0/resolver_conn_wrapper.go
func (ccr *ccResolverWrapper) UpdateState(s resolver.State)
// google.golang.org/grpc@v1.43.0/clientconn.go
=> func (cc *ClientConn) updateResolverState(s resolver.State, err error)
=> func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState)
=> b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
=> ccb.cc.newAddrConn(addrs, opts)
=> func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error)
可以看到,经过一系列调用,最终会走到前面讲到的 newAddrConn 方法。