Skip to content

Commit

Permalink
Fixed deadlock in transport
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Sep 13, 2022
1 parent 552de12 commit 05537c1
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
13 changes: 10 additions & 3 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1232,16 +1232,23 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
if upperLimit == 0 { // This is the first GoAway Frame.
upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
}

activeStreams := make(map[uint32]*Stream)
for streamID, stream := range t.activeStreams {
activeStreams[streamID] = stream
}

t.prevGoAwayID = id
t.mu.Unlock()
for streamID, stream := range activeStreams {
if streamID > id && streamID <= upperLimit {
// The stream was unprocessed by the server.
atomic.StoreUint32(&stream.unprocessed, 1)
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
}
}
t.prevGoAwayID = id
active := len(t.activeStreams)
t.mu.Unlock()

active := len(activeStreams)
if active == 0 {
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
}
Expand Down
44 changes: 44 additions & 0 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2501,3 +2501,47 @@ func (s) TestPeerSetInServerContext(t *testing.T) {
}
server.mu.Unlock()
}

// TestGoAwayCloseStreams tests the scenario where a client has many streams
// created, and the server sends a GOAWAY frame with a stream id less than some
// of them, while the client is still creating new streams. This should not
// induce a deadlock.
func (s) TestGoAwayCloseStreams(t *testing.T) {
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
defer cancel()
defer server.stop()
defer ct.Close(fmt.Errorf("closed manually by test"))
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i := 0; i < 5; i++ {
_, err := ct.NewStream(ctx, &CallHdr{})
if err != nil {
t.Fatalf("error creating stream: %v", err)
}
}

waitWhileTrue(t, func() (bool, error) {
server.mu.Lock()
defer server.mu.Unlock()

if len(server.conns) == 0 {
return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
}
return false, nil
})

var st *http2Server
server.mu.Lock()
for k := range server.conns {
st = k.(*http2Server)
}
server.mu.Unlock()

st.framer.fr.WriteGoAway(5, http2.ErrCodeNo, []byte{})
for i := 0; i < 10; i++ {
_, err := ct.NewStream(ctx, &CallHdr{})
if err != nil {
t.Fatalf("error creating stream: %v", err)
}
}
}

0 comments on commit 05537c1

Please sign in to comment.