diff --git a/clientconn.go b/clientconn.go index e4d6a0b66024..f21a568525d3 100644 --- a/clientconn.go +++ b/clientconn.go @@ -559,15 +559,16 @@ func (cc *ClientConn) scWatcher() { // resetAddrConn creates an addrConn for addr and adds it to cc.conns. // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. // If tearDownErr is nil, errConnDrain will be used instead. +// +// We should never need to replace an addrConn with a new one. This function is only used +// as newAddrConn to create new addrConn. +// TODO rename this function and clean up the code. func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error { ac := &addrConn{ cc: cc, addr: addr, dopts: cc.dopts, } - cc.mu.RLock() - ac.dopts.copts.KeepaliveParams = cc.mkp - cc.mu.RUnlock() ac.ctx, ac.cancel = context.WithCancel(cc.ctx) ac.stateCV = sync.NewCond(&ac.mu) if EnableTracing { @@ -598,10 +599,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) cc.mu.Unlock() if stale != nil { // There is an addrConn alive on ac.addr already. This could be due to - // 1) a buggy Balancer notifies duplicated Addresses; - // 2) goaway was received, a new ac will replace the old ac. - // The old ac should be deleted from cc.conns, but the - // underlying transport should drain rather than close. + // a buggy Balancer that reports duplicated Addresses. if tearDownErr == nil { // tearDownErr is nil if resetAddrConn is called by // 1) Dial @@ -828,26 +826,44 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti return ac.state, nil } -func (ac *addrConn) resetTransport(closeTransport bool) error { +// resetTransport recreates a transport to the address for ac. +// For the old transport: +// - if drain is true, it will be gracefully closed. +// - otherwise, it will be closed. +func (ac *addrConn) resetTransport(drain bool) error { + ac.mu.Lock() + if ac.state == Shutdown { + ac.mu.Unlock() + return errConnClosing + } + ac.printf("connecting") + if ac.down != nil { + ac.down(downErrorf(false, true, "%v", errNetworkIO)) + ac.down = nil + } + ac.state = Connecting + ac.stateCV.Broadcast() + t := ac.transport + ac.transport = nil + ac.mu.Unlock() + if t != nil { + if drain { + t.GracefulClose() + } else { + t.Close() + } + } + ac.cc.mu.RLock() + ac.dopts.copts.KeepaliveParams = ac.cc.mkp + ac.cc.mu.RUnlock() for retries := 0; ; retries++ { ac.mu.Lock() - ac.printf("connecting") if ac.state == Shutdown { // ac.tearDown(...) has been invoked. ac.mu.Unlock() return errConnClosing } - if ac.down != nil { - ac.down(downErrorf(false, true, "%v", errNetworkIO)) - ac.down = nil - } - ac.state = Connecting - ac.stateCV.Broadcast() - t := ac.transport ac.mu.Unlock() - if closeTransport && t != nil { - t.Close() - } sleepTime := ac.dopts.bs.backoff(retries) timeout := minConnectTimeout if timeout < sleepTime { @@ -883,7 +899,6 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { ac.ready = nil } ac.mu.Unlock() - closeTransport = false timer := time.NewTimer(sleepTime - time.Since(connectTime)) select { case <-timer.C: @@ -936,19 +951,25 @@ func (ac *addrConn) transportMonitor() { return case <-t.GoAway(): ac.adjustParams(t.GetGoAwayReason()) - // If GoAway happens without any network I/O error, ac is closed without shutting down the - // underlying transport (the transport will be closed when all the pending RPCs finished or - // failed.). - // If GoAway and some network I/O error happen concurrently, ac and its underlying transport - // are closed. - // In both cases, a new ac is created. + // If GoAway happens without any network I/O error, the underlying transport + // will be gracefully closed, and a new transport will be created. + // (The transport will be closed when all the pending RPCs finished or failed.) + // If GoAway and some network I/O error happen concurrently, the underlying transport + // will be closed, and a new transport will be created. + var drain bool select { case <-t.Error(): - ac.cc.resetAddrConn(ac.addr, false, errNetworkIO) default: - ac.cc.resetAddrConn(ac.addr, false, errConnDrain) + drain = true + } + if err := ac.resetTransport(drain); err != nil { + grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) + if err != errConnClosing { + // Keep this ac in cc.conns, to get the reason it's torn down. + ac.tearDown(err) + } + return } - return case <-t.Error(): select { case <-ac.ctx.Done(): @@ -956,8 +977,14 @@ func (ac *addrConn) transportMonitor() { return case <-t.GoAway(): ac.adjustParams(t.GetGoAwayReason()) - ac.cc.resetAddrConn(ac.addr, false, errNetworkIO) - return + if err := ac.resetTransport(false); err != nil { + grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) + if err != errConnClosing { + // Keep this ac in cc.conns, to get the reason it's torn down. + ac.tearDown(err) + } + return + } default: } ac.mu.Lock() @@ -969,7 +996,8 @@ func (ac *addrConn) transportMonitor() { ac.state = TransientFailure ac.stateCV.Broadcast() ac.mu.Unlock() - if err := ac.resetTransport(true); err != nil { + if err := ac.resetTransport(false); err != nil { + grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) ac.mu.Lock() ac.printf("transport exiting: %v", err) ac.mu.Unlock()