From 416eda70f39787679e73f54e15a00dd1fe969241 Mon Sep 17 00:00:00 2001 From: Peter Wortmann Date: Tue, 31 Jan 2023 23:26:23 +0000 Subject: [PATCH] v3rpc, mvcc: Guarantee order of requested progress notifications Progress notifications requested using ProgressRequest were sent directly using the ctrlStream, which means that they could race against watch responses in the watchStream. This would especially happen when the stream was not synced - e.g. if you requested a progress notification on a freshly created unsynced watcher, the notification would typically arrive indicating a revision for which not all watch responses had been sent. This changes the behaviour so that v3rpc always goes through the watch stream, using a new RequestProgressAll function that closely matches the behaviour of the v3rpc code - i.e. 1. Generate a message with WatchId -1, indicating the revision for *all* watchers in the stream 2. Guarantee that a response is (eventually) sent The latter might require us to defer the response until all watchers are synced, which is likely as it should be. Note that we do *not* guarantee that the number of progress notifications matches the number of requests, only that eventually at least one gets sent. Signed-off-by: Peter Wortmann --- server/etcdserver/api/v3rpc/watch.go | 38 ++++++++++++++++++++------ server/storage/mvcc/watchable_store.go | 21 ++++++++++++++ server/storage/mvcc/watcher.go | 13 +++++++++ 3 files changed, 64 insertions(+), 8 deletions(-) diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go index 5153007258d3..fde58d618697 100644 --- a/server/etcdserver/api/v3rpc/watch.go +++ b/server/etcdserver/api/v3rpc/watch.go @@ -145,6 +145,10 @@ type serverWatchStream struct { // records fragmented watch IDs fragment map[mvcc.WatchID]bool + // indicates whether we have an outstanding global progress + // notification to send + deferredProgress bool + // closec indicates the stream is closed. closec chan struct{} @@ -174,6 +178,8 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { prevKV: make(map[mvcc.WatchID]bool), fragment: make(map[mvcc.WatchID]bool), + deferredProgress: false, + closec: make(chan struct{}), } @@ -360,10 +366,16 @@ func (sws *serverWatchStream) recvLoop() error { } case *pb.WatchRequest_ProgressRequest: if uv.ProgressRequest != nil { - sws.ctrlStream <- &pb.WatchResponse{ - Header: sws.newResponseHeader(sws.watchStream.Rev()), - WatchId: clientv3.InvalidWatchID, // response is not associated with any WatchId and will be broadcast to all watch channels + sws.mu.Lock() + // Ignore if deferred progress notification is already in progress + if !sws.deferredProgress { + // Request progress for all watchers, + // force generation of a response + if !sws.watchStream.RequestProgressAll() { + sws.deferredProgress = true + } } + sws.mu.Unlock() } default: // we probably should not shutdown the entire stream when @@ -408,6 +420,7 @@ func (sws *serverWatchStream) sendLoop() { // either return []*mvccpb.Event from the mvcc package // or define protocol buffer with []mvccpb.Event. evs := wresp.Events + progressNotify := len(evs) == 0 events := make([]*mvccpb.Event, len(evs)) sws.mu.RLock() needPrevKV := sws.prevKV[wresp.WatchID] @@ -432,11 +445,15 @@ func (sws *serverWatchStream) sendLoop() { Canceled: canceled, } - if _, okID := ids[wresp.WatchID]; !okID { - // buffer if id not yet announced - wrs := append(pending[wresp.WatchID], wr) - pending[wresp.WatchID] = wrs - continue + // Progress notifications can have WatchID -1 + // if they announce on behalf of multiple watchers + if !progressNotify || wresp.WatchID != clientv3.InvalidWatchID { + if _, okID := ids[wresp.WatchID]; !okID { + // buffer if id not yet announced + wrs := append(pending[wresp.WatchID], wr) + pending[wresp.WatchID] = wrs + continue + } } mvcc.ReportEventReceived(len(evs)) @@ -467,6 +484,11 @@ func (sws *serverWatchStream) sendLoop() { // elide next progress update if sent a key update sws.progress[wresp.WatchID] = false } + if sws.deferredProgress { + if sws.watchStream.RequestProgressAll() { + sws.deferredProgress = false + } + } sws.mu.Unlock() case c, ok := <-sws.ctrlStream: diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index adf07f7755b0..2fd5e6d85839 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -41,6 +41,7 @@ var ( type watchable interface { watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) progress(w *watcher) + progress_all(watchers map[WatchID]*watcher) bool rev() int64 } @@ -482,6 +483,26 @@ func (s *watchableStore) progress(w *watcher) { } } +func (s *watchableStore) progress_all(watchers map[WatchID]*watcher) bool { + s.mu.Lock() + defer s.mu.Unlock() + + // Any watcher unsynced? + for w, _ := range watchers { + if _, ok := s.unsynced.watchers[watchers[w]]; ok { + return false + } + } + + // If all watchers are synchronised, send out progress notification + // on first watcher + for w, _ := range watchers { + watchers[w].send(WatchResponse{WatchID: -1, Revision: s.rev()}) + return true + } + return true +} + type watcher struct { // the watcher key key []byte diff --git a/server/storage/mvcc/watcher.go b/server/storage/mvcc/watcher.go index 7d2490b1d6e9..91b678a99565 100644 --- a/server/storage/mvcc/watcher.go +++ b/server/storage/mvcc/watcher.go @@ -58,6 +58,13 @@ type WatchStream interface { // of the watchers since the watcher is currently synced. RequestProgress(id WatchID) + // RequestProgressAll requests a progress notification for the + // entire watcher group. The responses will be sent through + // the WatchRespone Chan of the first watcher of this stream, + // if any. The response will only be sent if all watchers are + // synced, in which case the function returns true + RequestProgressAll() bool + // Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be // returned. Cancel(id WatchID) error @@ -188,3 +195,9 @@ func (ws *watchStream) RequestProgress(id WatchID) { } ws.watchable.progress(w) } + +func (ws *watchStream) RequestProgressAll() bool { + ws.mu.Lock() + defer ws.mu.Unlock() + return ws.watchable.progress_all(ws.watchers) +}