Skip to content

Commit

Permalink
http2: surface errors occurring very early in a client conn's lifetime
Browse files Browse the repository at this point in the history
When we create a new connection for a request, the request should
fail if the connection attempt fails.

There is a race condition which can cause this to not happen:

- net/http sends a request to a http2.Transport
- the http2.Transport returns ErrNoCachedConn
- net/http creates a new tls.Conn and passes it to the http2.Transport
- the http2.Transport adds the conn to its connection pool
- the connection immediately encounters an error
- the http2.Transport removes the conn from its connection pool
- net/http resends the request to the http2.Transport
- the http2.Transport returns ErrNoCachedConn, and the process repeats

If the request is sent to the http2.Transport before the connection
encounters an error, then the request fails. But otherwise, we get
stuck in an infinite loop of the http2.Transport asking for a new
connection, receiving one, and throwing it away.

To fix this, leave a dead connection in the pool for a short while
if it has never had a request sent to it. If a dead connection is
used for a new request, return an error and remove the connection
from the pool.

Change-Id: I64eb15a8f1512a6bda52db423072b945fab6f4b5
Reviewed-on: https://go-review.googlesource.com/c/net/+/625398
Reviewed-by: Brad Fitzpatrick <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
Reviewed-by: Jonathan Amsterdam <[email protected]>
  • Loading branch information
neild committed Nov 6, 2024
1 parent 0aa844c commit 858db1a
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 20 deletions.
30 changes: 23 additions & 7 deletions http2/clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package http2
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -112,27 +113,40 @@ func newTestClientConnFromClientConn(t *testing.T, cc *ClientConn) *testClientCo
cc: cc,
group: cc.t.transportTestHooks.group.(*synctestGroup),
}
cli, srv := synctestNetPipe(tc.group)

// srv is the side controlled by the test.
var srv *synctestNetConn
if cc.tconn == nil {
// If cc.tconn is nil, we're being called with a new conn created by the
// Transport's client pool. This path skips dialing the server, and we
// create a test connection pair here.
cc.tconn, srv = synctestNetPipe(tc.group)
} else {
// If cc.tconn is non-nil, we're in a test which provides a conn to the
// Transport via a TLSNextProto hook. Extract the test connection pair.
if tc, ok := cc.tconn.(*tls.Conn); ok {
// Unwrap any *tls.Conn to the underlying net.Conn,
// to avoid dealing with encryption in tests.
cc.tconn = tc.NetConn()
}
srv = cc.tconn.(*synctestNetConn).peer
}

srv.SetReadDeadline(tc.group.Now())
srv.autoWait = true
tc.netconn = srv
tc.enc = hpack.NewEncoder(&tc.encbuf)

// all writes and reads are finished.
//
// cli is the ClientConn's side, srv is the side controlled by the test.
cc.tconn = cli
tc.fr = NewFramer(srv, srv)
tc.testConnFramer = testConnFramer{
t: t,
fr: tc.fr,
dec: hpack.NewDecoder(initialHeaderTableSize, nil),
}

tc.fr.SetMaxReadFrameSize(10 << 20)
t.Cleanup(func() {
tc.closeWrite()
})

return tc
}

Expand Down Expand Up @@ -503,6 +517,8 @@ func newTestTransport(t *testing.T, opts ...any) *testTransport {
o(tr.t1)
case func(*Transport):
o(tr)
case *Transport:
tr = o
}
}
tt.tr = tr
Expand Down
10 changes: 8 additions & 2 deletions http2/netconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ func synctestNetPipe(group *synctestGroup) (r, w *synctestNetConn) {
s2addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:8001"))
s1 := newSynctestNetConnHalf(s1addr)
s2 := newSynctestNetConnHalf(s2addr)
return &synctestNetConn{group: group, loc: s1, rem: s2},
&synctestNetConn{group: group, loc: s2, rem: s1}
r = &synctestNetConn{group: group, loc: s1, rem: s2}
w = &synctestNetConn{group: group, loc: s2, rem: s1}
r.peer = w
w.peer = r
return r, w
}

// A synctestNetConn is one endpoint of the connection created by synctestNetPipe.
Expand All @@ -43,6 +46,9 @@ type synctestNetConn struct {

// When set, group.Wait is automatically called before reads and after writes.
autoWait bool

// peer is the other endpoint.
peer *synctestNetConn
}

// Read reads data from the connection.
Expand Down
86 changes: 75 additions & 11 deletions http2/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,20 @@ func (t *Transport) markNewGoroutine() {
}
}

// now returns the current time, using the synthetic test clock when
// this Transport is running under test hooks.
func (t *Transport) now() time.Time {
	if t == nil || t.transportTestHooks == nil {
		return time.Now()
	}
	return t.transportTestHooks.group.Now()
}

// timeSince reports the duration elapsed since when, using the
// synthetic test clock when this Transport is running under test hooks.
func (t *Transport) timeSince(when time.Time) time.Duration {
	if t == nil || t.transportTestHooks == nil {
		return time.Since(when)
	}
	return t.now().Sub(when)
}

// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (t *Transport) newTimer(d time.Duration) timer {
if t.transportTestHooks != nil {
Expand Down Expand Up @@ -343,7 +357,7 @@ type ClientConn struct {
t *Transport
tconn net.Conn // usually *tls.Conn, except specialized impls
tlsState *tls.ConnectionState // nil only for specialized impls
reused uint32 // whether conn is being reused; atomic
atomicReused uint32 // whether conn is being reused; atomic
singleUse bool // whether being used for a single http.Request
getConnCalled bool // used by clientConnPool

Expand Down Expand Up @@ -609,7 +623,7 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
return nil, err
}
reused := !atomic.CompareAndSwapUint32(&cc.reused, 0, 1)
reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
traceGotConn(req, cc, reused)
res, err := cc.RoundTrip(req)
if err != nil && retry <= 6 {
Expand All @@ -634,6 +648,22 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
}
}
}
if err == errClientConnNotEstablished {
// This ClientConn was created recently,
// this is the first request to use it,
// and the connection is closed and not usable.
//
// In this state, cc.idleTimer will remove the conn from the pool
// when it fires. Stop the timer and remove it here so future requests
// won't try to use this connection.
//
// If the timer has already fired and we're racing it, the redundant
// call to MarkDead is harmless.
if cc.idleTimer != nil {
cc.idleTimer.Stop()
}
t.connPool().MarkDead(cc)
}
if err != nil {
t.vlogf("RoundTrip failure: %v", err)
return nil, err
Expand All @@ -652,9 +682,10 @@ func (t *Transport) CloseIdleConnections() {
}

var (
errClientConnClosed = errors.New("http2: client conn is closed")
errClientConnUnusable = errors.New("http2: client conn not usable")
errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
errClientConnClosed = errors.New("http2: client conn is closed")
errClientConnUnusable = errors.New("http2: client conn not usable")
errClientConnNotEstablished = errors.New("http2: client conn could not be established")
errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
)

// shouldRetryRequest is called by RoundTrip when a request fails to get
Expand Down Expand Up @@ -793,6 +824,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
pingTimeout: conf.PingTimeout,
pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1),
lastActive: t.now(),
}
var group synctestGroupInterface
if t.transportTestHooks != nil {
Expand Down Expand Up @@ -1041,6 +1073,16 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
!cc.doNotReuse &&
int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
!cc.tooIdleLocked()

// If this connection has never been used for a request and is closed,
// then let it take a request (which will fail).
//
// This avoids a situation where an error early in a connection's lifetime
// goes unreported.
if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed {
st.canTakeNewRequest = true
}

return
}

Expand All @@ -1062,7 +1104,7 @@ func (cc *ClientConn) tooIdleLocked() bool {
// times are compared based on their wall time. We don't want
// to reuse a connection that's been sitting idle during
// VM/laptop suspend if monotonic time was also frozen.
return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && cc.t.timeSince(cc.lastIdle.Round(0)) > cc.idleTimeout
}

// onIdleTimeout is called from a time.AfterFunc goroutine. It will
Expand Down Expand Up @@ -1706,7 +1748,12 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
// Must hold cc.mu.
func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
for {
cc.lastActive = time.Now()
if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
// This is the very first request sent to this connection.
// Return a fatal error which aborts the retry loop.
return errClientConnNotEstablished
}
cc.lastActive = cc.t.now()
if cc.closed || !cc.canTakeNewRequestLocked() {
return errClientConnUnusable
}
Expand Down Expand Up @@ -2253,10 +2300,10 @@ func (cc *ClientConn) forgetStreamID(id uint32) {
if len(cc.streams) != slen-1 {
panic("forgetting unknown stream id")
}
cc.lastActive = time.Now()
cc.lastActive = cc.t.now()
if len(cc.streams) == 0 && cc.idleTimer != nil {
cc.idleTimer.Reset(cc.idleTimeout)
cc.lastIdle = time.Now()
cc.lastIdle = cc.t.now()
}
// Wake up writeRequestBody via clientStream.awaitFlowControl and
// wake up RoundTrip if there is a pending request.
Expand Down Expand Up @@ -2316,7 +2363,6 @@ func isEOFOrNetReadError(err error) bool {

func (rl *clientConnReadLoop) cleanup() {
cc := rl.cc
cc.t.connPool().MarkDead(cc)
defer cc.closeConn()
defer close(cc.readerDone)

Expand All @@ -2340,6 +2386,24 @@ func (rl *clientConnReadLoop) cleanup() {
}
cc.closed = true

// If the connection has never been used, and has been open for only a short time,
// leave it in the connection pool for a little while.
//
// This avoids a situation where new connections are constantly created,
// added to the pool, fail, and are removed from the pool, without any error
// being surfaced to the user.
const unusedWaitTime = 5 * time.Second
idleTime := cc.t.now().Sub(cc.lastActive)
if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime {
cc.idleTimer = cc.t.afterFunc(unusedWaitTime-idleTime, func() {
cc.t.connPool().MarkDead(cc)
})
} else {
cc.mu.Unlock() // avoid any deadlocks in MarkDead
cc.t.connPool().MarkDead(cc)
cc.mu.Lock()
}

for _, cs := range cc.streams {
select {
case <-cs.peerClosed:
Expand Down Expand Up @@ -3332,7 +3396,7 @@ func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
cc.mu.Lock()
ci.WasIdle = len(cc.streams) == 0 && reused
if ci.WasIdle && !cc.lastActive.IsZero() {
ci.IdleTime = time.Since(cc.lastActive)
ci.IdleTime = cc.t.timeSince(cc.lastActive)
}
cc.mu.Unlock()

Expand Down
113 changes: 113 additions & 0 deletions http2/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5638,3 +5638,116 @@ func TestTransportConnBecomesUnresponsive(t *testing.T) {
rt2.wantStatus(200)
rt2.response().Body.Close()
}

// Test that the Transport can use a conn provided to it by a TLSNextProto hook.
func TestTransportTLSNextProtoConnOK(t *testing.T) {
	baseTransport := &http.Transport{}
	h2Transport, _ := ConfigureTransports(baseTransport)
	tt := newTestTransport(t, h2Transport)

	// Hand the Transport a synthetic connection through the TLSNextProto hook,
	// the same way net/http would after finishing a TLS handshake.
	rawConn, _ := synctestNetPipe(tt.group)
	tlsConn := tls.Client(rawConn, tlsConfigInsecure)
	go func() {
		tt.group.Join()
		baseTransport.TLSNextProto["h2"]("dummy.tld", tlsConn)
	}()
	tt.sync()
	tc := tt.getConn()
	tc.greet()

	// Issue a request; the Transport should route it over the conn we supplied.
	req := must(http.NewRequest("GET", "https://dummy.tld/", nil))
	rt := tt.roundTrip(req)
	tc.wantHeaders(wantHeader{
		streamID:  1,
		endStream: true,
		header: http.Header{
			":authority": []string{"dummy.tld"},
			":method":    []string{"GET"},
			":path":      []string{"/"},
		},
	})
	tc.writeHeaders(HeadersFrameParam{
		StreamID:   1,
		EndHeaders: true,
		EndStream:  true,
		BlockFragment: tc.makeHeaderBlockFragment(
			":status", "200",
		),
	})
	rt.wantStatus(200)
	rt.wantBody(nil)
}

// Test the case where a conn provided via a TLSNextProto hook immediately encounters an error.
func TestTransportTLSNextProtoConnImmediateFailureUsed(t *testing.T) {
	baseTransport := &http.Transport{}
	h2Transport, _ := ConfigureTransports(baseTransport)
	tt := newTestTransport(t, h2Transport)

	// Hand the Transport a synthetic connection through the TLSNextProto hook.
	rawConn, _ := synctestNetPipe(tt.group)
	tlsConn := tls.Client(rawConn, tlsConfigInsecure)
	go func() {
		tt.group.Join()
		baseTransport.TLSNextProto["h2"]("dummy.tld", tlsConn)
	}()
	tt.sync()
	tc := tt.getConn()

	// Break the connection before any request has used it.
	tc.closeWrite()

	// The first request must surface the connection's failure:
	// anything but ErrNoCachedConn, which would make net/http retry forever.
	req := must(http.NewRequest("GET", "https://dummy.tld/", nil))
	rt := tt.roundTrip(req)
	if err := rt.err(); err == nil || errors.Is(err, ErrNoCachedConn) {
		t.Fatalf("RoundTrip with broken conn: got %v, want an error other than ErrNoCachedConn", err)
	}

	// A second request should see ErrNoCachedConn:
	// using the dead conn above evicted it from the pool.
	rt = tt.roundTrip(req)
	if err := rt.err(); !errors.Is(err, ErrNoCachedConn) {
		t.Fatalf("RoundTrip after broken conn is used: got %v, want ErrNoCachedConn", err)
	}
}

// Test the case where a conn provided via a TLSNextProto hook immediately encounters an error,
// but no requests are sent which would use the bad connection.
func TestTransportTLSNextProtoConnImmediateFailureUnused(t *testing.T) {
	baseTransport := &http.Transport{}
	h2Transport, _ := ConfigureTransports(baseTransport)
	tt := newTestTransport(t, h2Transport)

	// Hand the Transport a synthetic connection through the TLSNextProto hook.
	rawConn, _ := synctestNetPipe(tt.group)
	tlsConn := tls.Client(rawConn, tlsConfigInsecure)
	go func() {
		tt.group.Join()
		baseTransport.TLSNextProto["h2"]("dummy.tld", tlsConn)
	}()
	tt.sync()
	tc := tt.getConn()

	// Break the connection before any request has used it.
	tc.closeWrite()

	// After enough synthetic time passes, the unused dead conn
	// should be dropped from the pool on its own.
	tc.advance(10 * time.Second)

	// With the pool now empty, a request reports ErrNoCachedConn.
	req := must(http.NewRequest("GET", "https://dummy.tld/", nil))
	rt := tt.roundTrip(req)
	if err := rt.err(); !errors.Is(err, ErrNoCachedConn) {
		t.Fatalf("RoundTrip after broken conn expires: got %v, want ErrNoCachedConn", err)
	}
}

0 comments on commit 858db1a

Please sign in to comment.