-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
Progress notifications requested using ProgressRequest were sent directly using the ctrlStream, which means that they could race against watch responses in the watchStream. This would especially happen when the stream was not synced - e.g. if you requested a progress notification on a freshly created unsynced watcher, the notification would typically arrive indicating a revision for which not all watch responses had been sent. This changes the behaviour so that v3rpc always goes through the watch stream, using a new RequestProgressAll function that closely matches the behaviour of the v3rpc code - i.e. 1. Generate a message with WatchId -1, indicating the revision for *all* watchers in the stream 2. Guarantee that a response is (eventually) sent The latter might require us to defer the response until all watchers are synced, which is likely the behaviour we want anyway. Note that we do *not* guarantee that the number of progress notifications matches the number of requests, only that eventually at least one gets sent.
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -316,10 +316,9 @@ func (sws *serverWatchStream) recvLoop() error { | |||
} | ||||
This comment has been minimized.
Sorry, something went wrong. |
||||
case *pb.WatchRequest_ProgressRequest: | ||||
if uv.ProgressRequest != nil { | ||||
sws.ctrlStream <- &pb.WatchResponse{ | ||||
Header: sws.newResponseHeader(sws.watchStream.Rev()), | ||||
WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels | ||||
} | ||||
// Request progress for all watchers, | ||||
// force generation of a response | ||||
sws.watchStream.RequestProgressAll(true) | ||||
} | ||||
default: | ||||
// we probably should not shutdown the entire stream when | ||||
|
@@ -363,6 +362,7 @@ func (sws *serverWatchStream) sendLoop() { | |||
// either return []*mvccpb.Event from the mvcc package | ||||
// or define protocol buffer with []mvccpb.Event. | ||||
evs := wresp.Events | ||||
progressNotify := len(evs) == 0 | ||||
This comment has been minimized.
Sorry, something went wrong.
ahrtr
|
w.send(WatchResponse{WatchID: w.id, Revision: s.rev()}) |
This comment has been minimized.
This comment has been minimized.
Sorry, something went wrong.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ var ( | |
// watchable is the interface the watchable store exposes to the watch
// stream layer for registering watchers and driving progress notifications.
type watchable interface {
	// watch registers a watcher on the key range [key, end) starting at
	// startRev, delivering events (filtered by fcs) on ch.
	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
	// progress sends a progress notification for a single synced watcher.
	progress(w *watcher)
	// progress_all requests a progress notification (WatchID -1) covering
	// all watchers on the stream; if force is set and some watchers are
	// unsynced, the notification is deferred until the stream is synced.
	progress_all(force bool)
	// rev reports the current store revision.
	rev() int64
}
|
||
|
@@ -62,6 +63,9 @@ type watchableStore struct { | |
// The key of the map is the key that the watcher watches on. | ||
synced watcherGroup | ||
|
||
// Whether to generate a progress notification once all watchers are synchronised | ||
progressOnSync bool | ||
This comment has been minimized.
Sorry, something went wrong.
ahrtr
|
||
|
||
stopc chan struct{} | ||
wg sync.WaitGroup | ||
} | ||
|
@@ -79,11 +83,12 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci ci | |
lg = zap.NewNop() | ||
} | ||
s := &watchableStore{ | ||
store: NewStore(lg, b, le, ci, cfg), | ||
victimc: make(chan struct{}, 1), | ||
unsynced: newWatcherGroup(), | ||
synced: newWatcherGroup(), | ||
stopc: make(chan struct{}), | ||
store: NewStore(lg, b, le, ci, cfg), | ||
victimc: make(chan struct{}, 1), | ||
unsynced: newWatcherGroup(), | ||
synced: newWatcherGroup(), | ||
stopc: make(chan struct{}), | ||
progressOnSync: false, | ||
} | ||
s.store.ReadView = &readView{s} | ||
s.store.WriteView = &writeView{s} | ||
|
@@ -406,6 +411,15 @@ func (s *watchableStore) syncWatchers() int { | |
} | ||
slowWatcherGauge.Set(float64(s.unsynced.size() + vsz)) | ||
|
||
// Deferred progress notification left to send when synced? | ||
if s.progressOnSync && s.unsynced.size() == 0 { | ||
for w, _ := range s.synced.watchers { | ||
w.send(WatchResponse{WatchID: -1, Revision: s.rev()}) | ||
break | ||
} | ||
s.progressOnSync = false | ||
} | ||
|
||
return s.unsynced.size() | ||
} | ||
|
||
|
@@ -484,6 +498,27 @@ func (s *watchableStore) progress(w *watcher) { | |
} | ||
} | ||
|
||
func (s *watchableStore) progress_all(force bool) { | ||
s.mu.RLock() | ||
defer s.mu.RUnlock() | ||
|
||
// Any watcher unsynced? | ||
if s.unsynced.size() > 0 { | ||
// If forced: Defer progress until successfully synced | ||
if force { | ||
s.progressOnSync = true | ||
} | ||
|
||
} else { | ||
// If all watchers are synchronised, send out progress | ||
// watch response on first watcher (if any) | ||
for w, _ := range s.synced.watchers { | ||
w.send(WatchResponse{WatchID: -1, Revision: s.rev()}) | ||
break | ||
} | ||
} | ||
This comment has been minimized.
Sorry, something went wrong.
ahrtr
|
||
} | ||
|
||
type watcher struct { | ||
// the watcher key | ||
key []byte | ||
|
This commit branches from a commit around the v3.4.10 release. Any fix should be applied to the latest affected etcd version first, and we can backport it later. Please cherry-pick it to the main branch.