Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: Fix deadlock in transport caused by GOAWAY race with new stream creation #5652

Merged
merged 4 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1232,18 +1232,27 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
if upperLimit == 0 { // This is the first GoAway Frame.
upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
}

t.prevGoAwayID = id
if len(t.activeStreams) == 0 {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
return
}

streamsToClose := make([]*Stream, 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var streamsToClose []*Stream

Or delete the ,0 but make is unnecessary for a slice either way if you aren't allocating anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched. You initially suggested make for the capacity length. Why was that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you specify the capacity then it won't need to do reallocations as you add to it. In this case, since we don't have any idea how many active streams are before/after the ID (and it isn't worth computing it first) then this should be fine. It will need reallocations (Go doubles the slice capacity as you add so it's O(logN)), but this is a rare case and not worth optimizing.

for streamID, stream := range t.activeStreams {
if streamID > id && streamID <= upperLimit {
// The stream was unprocessed by the server.
atomic.StoreUint32(&stream.unprocessed, 1)
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
if streamID > id && streamID <= upperLimit {
atomic.StoreUint32(&stream.unprocessed, 1)
streamsToClose = append(streamsToClose, stream)
}
}
}
t.prevGoAwayID = id
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
for _, stream := range streamsToClose {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment about why this is called outside t.mu.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
}
}

Expand Down
108 changes: 108 additions & 0 deletions test/clienttester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package test

import (
"bytes"
"io"
"net"
"testing"

"golang.org/x/net/http2"
)

var (
clientPreface = []byte(http2.ClientPreface)
)

func newClientTester(t *testing.T, conn net.Conn) *clientTester {
ct := &clientTester{
t: t,
conn: conn,
}
ct.fr = http2.NewFramer(conn, conn)
return ct
}

type clientTester struct {
t *testing.T
conn net.Conn
fr *http2.Framer
}

// greet() performs the necessary steps for http2 connection establishment on
// the server side.
func (ct *clientTester) greet() {
ct.wantClientPreface()
ct.wantSettingsFrame()
dfawley marked this conversation as resolved.
Show resolved Hide resolved
ct.writeSettingsFrame()
ct.writeSettingsAck()

for {
f, err := ct.fr.ReadFrame()
if err != nil {
ct.t.Errorf("error reading frame from client side: %v", err)
}
switch f := f.(type) {
case *http2.SettingsFrame:
if f.IsAck() { // HTTP/2 handshake completed.
return
}
default:
ct.t.Errorf("during greet, unexpected frame type %T", f)
}
}
}

func (ct *clientTester) wantClientPreface() {
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(ct.conn, preface); err != nil {
ct.t.Errorf("Error at server-side while reading preface from client. Err: %v", err)
}
if !bytes.Equal(preface, clientPreface) {
ct.t.Errorf("received bogus greeting from client %q", preface)
}
}

func (ct *clientTester) wantSettingsFrame() {
frame, err := ct.fr.ReadFrame()
if err != nil {
ct.t.Errorf("error reading initial settings frame from client: %v", err)
}
_, ok := frame.(*http2.SettingsFrame)
if !ok {
ct.t.Errorf("initial frame sent from client is not a settings frame, type %T", frame)
}
}

func (ct *clientTester) writeSettingsFrame() {
if err := ct.fr.WriteSettings(); err != nil {
ct.t.Fatalf("Error writing initial SETTINGS frame from client to server: %v", err)
}
}

func (ct *clientTester) writeSettingsAck() {
if err := ct.fr.WriteSettingsAck(); err != nil {
ct.t.Fatalf("Error writing ACK of client's SETTINGS: %v", err)
}
}

func (ct *clientTester) writeGoAway(maxStreamID uint32, code http2.ErrCode, debugData []byte) {
if err := ct.fr.WriteGoAway(maxStreamID, code, debugData); err != nil {
ct.t.Fatalf("Error writing GOAWAY: %v", err)
}
}
60 changes: 59 additions & 1 deletion test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7407,7 +7407,6 @@ func (s *httpServer) start(t *testing.T, lis net.Listener) {
return
}
writer.Flush() // necessary since client is expecting preface before declaring connection fully setup.

var sid uint32
// Loop until conn is closed and framer returns io.EOF
for requestNum := 0; ; requestNum = (requestNum + 1) % len(s.responses) {
Expand Down Expand Up @@ -8130,3 +8129,62 @@ func (s) TestRecvWhileReturningStatus(t *testing.T) {
}
}
}

// TestGoAwayStreamIDSmallerThanCreatedStreams tests the scenario where a server
// sends a goaway with a stream id that is smaller than some created streams on
// the client, while the client is simultaneously creating new streams. This
// should not induce a deadlock.
func (s) TestGoAwayStreamIDSmallerThanCreatedStreams(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}

ctCh := testutils.NewChannel()
go func() {
conn, err := lis.Accept()
if err != nil {
t.Errorf("error in lis.Accept(): %v", err)
}
ct := newClientTester(t, conn)
ct.greet()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fold this into new?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but I'm doing this very begrudingly. ServerTester is done this way, and I feel like it' made a lot clearer to the user of the clientTester struct that it gets to the point where the client is happy with it's HTTP2 connection establisment.

ctCh.Send(ct)
}()

cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("error dialing: %v", err)
}
defer cc.Close()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

val, err := ctCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout waiting for client transport (should be given after http2 creation)")
}
ct, ok := val.(*clientTester)
if !ok {
t.Fatalf("value received not a clientTester")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just let it panic and keep the code simpler. This is just a test and the panic is as informative as this error, and it's a coding error if it happens. ct := val.(*clientTester)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


tc := testpb.NewTestServiceClient(cc)
someStreamsCreated := grpcsync.NewEvent()
goAwayWritten := grpcsync.NewEvent()
go func() {
for i := 0; i < 20; i++ {
if i == 10 {
<-goAwayWritten.Done()
}
Comment on lines +8173 to +8175
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this do that is different from blocking on <-goAwayWritten.Done() immediately after someStreamsCreate.Fire() instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly was just playing around with this until it induced deadlock, it was hard to induce on master. This allows it to induce it because the only way to induce it is to concurrently create a stream (controlBuf.mu then transport.mu), where the closeStream was (transport.mu then controlBuf.mu). If I block where you suggested, there are no concurrent streams trying to be created while the goAway is being sent from server.

tc.FullDuplexCall(ctx)
if i == 4 {
someStreamsCreated.Fire()
}
}
}()

<-someStreamsCreated.Done()
ct.writeGoAway(1, http2.ErrCodeNo, []byte{})
goAwayWritten.Fire()
}