-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
xdsclient: don't reset version info after stream restart #5422
Conversation
7ebf9e4
to
65a605f
Compare
test/xds/xds_client_ack_nack_test.go
Outdated
// corresponds to ACKs, and this is what we want to capture. | ||
if len(request.GetResourceNames()) != 0 && request.GetVersionInfo() != "" { | ||
ackVersionsBeforeRestart[request.GetTypeUrl()] = request.GetVersionInfo() | ||
if len(ackVersionsBeforeRestart) == wantResources { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there be an else
here so it fails if we go over the expectation?
Another option perhaps: maybe push the resources to the channel instead and validate them back in the main test code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pushed resources into a channel and changed to validate them from the main test goroutine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed further test simplification ideas offline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Thanks.
test/xds/xds_client_ack_nack_test.go
Outdated
if ackVersionsAfterRestart[request.GetTypeUrl()] == "" { | ||
ackVersionsAfterRestart[request.GetTypeUrl()] = request.GetVersionInfo() | ||
} | ||
if len(ackVersionsAfterRestart) == wantResources { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Same here)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
test/xds/xds_client_ack_nack_test.go
Outdated
select { | ||
case resourcesRequestedBeforeStreamClose <- struct{}{}: | ||
default: | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just push to the channel normally?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not required anymore.
test/xds/xds_client_ack_nack_test.go
Outdated
// Channels to notify that all expected resources have been requested by the | ||
// xdsClient before and after stream restart. | ||
resourcesRequestedBeforeStreamClose := make(chan struct{}, 1) | ||
resourcesRequestedAfterStreamClose := make(chan struct{}, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these just be normal, unbuffered channels, that are closed when the condition happens? Or use a grpcsync.Event?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now, these channels are buffered to have enough space for 4 resources.
d593272
to
e6adc7b
Compare
test/xds/xds_client_ack_nack_test.go
Outdated
ackVersionsAfterRestart := make(map[string]string) | ||
AcksAfterRestart: | ||
for { | ||
select { | ||
case tuple := <-requestsCh: | ||
if tuple.id == idBeforeRestart { | ||
// Ignore any stray requests from the old stream. | ||
continue | ||
} | ||
if tuple.id != idAfterRestart { | ||
t.Fatalf("Received request with stream ID %d, expecting %d", tuple.id, idAfterRestart) | ||
} | ||
// After stream closure, capture the first request for every resource. | ||
// This should not be set to an empty version string, but instead should | ||
// be set to the version last ACKed before stream closure. | ||
req := tuple.req | ||
if len(req.GetResourceNames()) != 0 { | ||
ackVersionsAfterRestart[req.GetTypeUrl()] = req.GetVersionInfo() | ||
} | ||
if len(ackVersionsAfterRestart) == wantResources { | ||
break AcksAfterRestart | ||
} | ||
case <-ctx.Done(): | ||
t.Fatal("timeout when waiting for resources to be re-requested after stream restart") | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do this in the loop above? I.e. store the first wantResources
requests for each stream ID. Could either be [<id int>]map[string]string
or map[<id int>]map[string]string
, or keep it as two different maps.
You could potentially go even further: get rid of the channel entirely, accumulate the maps directly in the callback instead, and close a channel / fire an event / cancel a context (can have the timeout on it) when the maps have been populated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Moved the logic to populate the map into the callback. The test goroutine now waits for a signal from the callback to go ahead and verify the acks before and after stream restart.
The logic to extract and compare the acks from the populated map seems a little verbose. Not sure if I can do any better though. Thanks.
test/xds/xds_client_ack_nack_test.go
Outdated
acksBeforeRestart := ackVersionsMap[idBeforeRestart] | ||
acksAfterRestart := ackVersionsMap[idAfterRestart] | ||
if !cmp.Equal(acksBeforeRestart, acksAfterRestart) { | ||
t.Fatalf("ACKs before restart: %v and ACKs after restart: %v don't match", acksBeforeRestart, acksAfterRestart) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use cmp.Diff
instead of cmp.Equal
and display that output instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
test/xds/xds_client_ack_nack_test.go
Outdated
var idBeforeRestart, idAfterRestart int64 | ||
for id := range ackVersionsMap { | ||
if id > idAfterRestart { | ||
idBeforeRestart = idAfterRestart | ||
idAfterRestart = id | ||
} else { | ||
idBeforeRestart = id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand why this needs to be so complicated now. Why can't we use 1 and 2 directly anymore as before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we moved to having a map indexed by the stream id, I thought this should be the way to go. Over engineering .. sigh.
Done now. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If Go had better generics support already, I wouldn't mind it so much as:
idBeforeRestart := min(keys(ackVersionsMap))
idAfterRestart := max(keys(ackVersionsMap))
When resending resource names after an ADS stream failure, we were resetting the resource version information received on the previous stream. This was our behavior from day 1, but this was in conflict with that is specified in the spec. This change brings the Go implementation to be in accordance with the spec.
Summary of changes:
xdsClient
to not reset version information about ADS stream restartxdsclient/controller/controller.go
andxdsclient/controller/transport/go
. The remaining changes are test related.ManagementServer
type defined inxds/e2e
package.net.Listener
for tests which need to customize the listener.StartManagementServer
andSetupManagementServer
to accept these options.Fixes #5351
RELEASE NOTES:
xdsClient
will not reset resource version information after ADS stream restart