-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
transport: Fix deadlock in transport caused by GOAWAY race with new stream creation #5652
Conversation
9bd4117
to
05537c1
Compare
05537c1
to
6004b0d
Compare
internal/transport/http2_client.go
Outdated
activeStreams := make(map[uint32]*Stream) | ||
for streamID, stream := range t.activeStreams { | ||
activeStreams[streamID] = stream | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use a slice; only add streams to be closed to it. Early exit if t.activeStreams
is zero?
t.prevGoAwayID = id
if len(t.activeStreams) == 0 {
t.mu.Unlock()
t.Close(...)
return
}
streamsToClose := make([]*Stream, 0, len(t.activeStreams))
for streamID, stream := range t.activeStreams {
if streamID > id && streamID <= upperLimit {
streamsToClose = append(streamsToClose, stream)
}
}
t.mu.Unlock()
for _, stream := range(streamsToClose) {
atomic.StoreUint32()
t.closeStream()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it important to have the cap(streamsToClose) len(t.activeStreams)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, sorry, I tried this way and for some reason it still induced deadlock. I'm going to keep it as is. I couldn't figure out why your way wouldn't work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Verified on my e2e_test I checked in and also on the transport test which I didn't check in since it wrote directly to framer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comment. Sorry I couldn't get your solution working. The e2e test with the new helper class that actually induced it took everything out of me.
internal/transport/http2_client.go
Outdated
activeStreams := make(map[uint32]*Stream) | ||
for streamID, stream := range t.activeStreams { | ||
activeStreams[streamID] = stream | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it important to have the cap(streamsToClose) len(t.activeStreams)?
internal/transport/http2_client.go
Outdated
activeStreams := make(map[uint32]*Stream) | ||
for streamID, stream := range t.activeStreams { | ||
activeStreams[streamID] = stream | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, sorry, I tried this way and for some reason it still induced deadlock. I'm going to keep it as is. I couldn't figure out why your way wouldn't work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks! Just a few really small things.
return | ||
} | ||
|
||
streamsToClose := make([]*Stream, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var streamsToClose []*Stream
Or delete the ,0
but make
is unnecessary for a slice either way if you aren't allocating anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Switched. You initially suggested make for the capacity length. Why was that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you specify the capacity then it won't need to do reallocations as you add to it. In this case, since we don't have any idea how many active streams are before/after the ID (and it isn't worth computing it first) then this should be fine. It will need reallocations (Go doubles the slice capacity as you add so it's O(logN)), but this is a rare case and not worth optimizing.
t.mu.Unlock() | ||
if active == 0 { | ||
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams")) | ||
for _, stream := range streamsToClose { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a comment about why this is called outside t.mu
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
test/end2end_test.go
Outdated
t.Errorf("error in lis.Accept(): %v", err) | ||
} | ||
ct := newClientTester(t, conn) | ||
ct.greet() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fold this into new
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, but I'm doing this very begrudingly. ServerTester is done this way, and I feel like it' made a lot clearer to the user of the clientTester struct that it gets to the point where the client is happy with it's HTTP2 connection establisment.
test/end2end_test.go
Outdated
ct, ok := val.(*clientTester) | ||
if !ok { | ||
t.Fatalf("value received not a clientTester") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just let it panic and keep the code simpler. This is just a test and the panic is as informative as this error, and it's a coding error if it happens. ct := val.(*clientTester)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
if i == 10 { | ||
<-goAwayWritten.Done() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this do that is different from blocking on <-goAwayWritten.Done()
immediately after someStreamsCreate.Fire()
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I honestly was just playing around with this until it induced deadlock, it was hard to induce on master. This allows it to induce it because the only way to induce it is to concurrently create a stream (controlBuf.mu then transport.mu), where the closeStream was (transport.mu then controlBuf.mu). If I block where you suggested, there are no concurrent streams trying to be created while the goAway is being sent from server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comments :D!
return | ||
} | ||
|
||
streamsToClose := make([]*Stream, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Switched. You initially suggested make for the capacity length. Why was that?
test/end2end_test.go
Outdated
ct, ok := val.(*clientTester) | ||
if !ok { | ||
t.Fatalf("value received not a clientTester") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
if i == 10 { | ||
<-goAwayWritten.Done() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I honestly was just playing around with this until it induced deadlock, it was hard to induce on master. This allows it to induce it because the only way to induce it is to concurrently create a stream (controlBuf.mu then transport.mu), where the closeStream was (transport.mu then controlBuf.mu). If I block where you suggested, there are no concurrent streams trying to be created while the goAway is being sent from server.
test/end2end_test.go
Outdated
t.Errorf("error in lis.Accept(): %v", err) | ||
} | ||
ct := newClientTester(t, conn) | ||
ct.greet() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, but I'm doing this very begrudingly. ServerTester is done this way, and I feel like it' made a lot clearer to the user of the clientTester struct that it gets to the point where the client is happy with it's HTTP2 connection establisment.
t.mu.Unlock() | ||
if active == 0 { | ||
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams")) | ||
for _, stream := range streamsToClose { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
return | ||
} | ||
|
||
streamsToClose := make([]*Stream, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you specify the capacity then it won't need to do reallocations as you add to it. In this case, since we don't have any idea how many active streams are before/after the ID (and it isn't worth computing it first) then this should be fine. It will need reallocations (Go doubles the slice capacity as you add so it's O(logN)), but this is a rare case and not worth optimizing.
I clearly see this in the 1.50.x commit history: https://github.com/grpc/grpc-go/commits/v1.50.x. Scroll to 29 days ago. |
Fixes #5644.
RELEASE NOTES: