From 12461b3a090b054104d992693d30df311d5f67a3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Oct 2023 13:50:06 +0100 Subject: [PATCH 1/5] Reduce spurious replication catchup --- synapse/replication/tcp/handler.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 1d586fb180b6..b4d65a35cb5f 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -611,10 +611,14 @@ async def _process_position( # Find where we previously streamed up to. current_token = stream.current_token(cmd.instance_name) - # If the position token matches our current token then we're up to - # date and there's nothing to do. Otherwise, fetch all updates - # between then and now. - missing_updates = cmd.prev_token != current_token + # If the position token matches our current token then we're up to date + # and there's nothing to do. Otherwise, fetch all updates between then + # and now. + # + # Note: We have to check that `current_token` is within the range, to + # handle the case where the stream gets "reset" (e.g. for `caches` and + # `typing` after the writer's restart). + missing_updates = not (cmd.prev_token <= current_token <= cmd.new_token) while missing_updates: # Note: There may very well not be any new updates, but we check to # make sure. This can particularly happen for the event stream where @@ -644,7 +648,7 @@ async def _process_position( [stream.parse_row(row) for row in rows], ) - logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token) + logger.info("Caught up with stream '%s' to %i", stream_name, current_token) # We've now caught up to position sent to us, notify handler. await self._replication_data_handler.on_position( From b43fc5e9c236d6082bc6c57fcadc58f72edf12b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Oct 2023 13:57:25 +0100 Subject: [PATCH 2/5] Newsfile --- changelog.d/16555.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16555.misc diff --git a/changelog.d/16555.misc b/changelog.d/16555.misc new file mode 100644 index 000000000000..d02efb21147b --- /dev/null +++ b/changelog.d/16555.misc @@ -0,0 +1 @@ +Reduce some spurious logging in worker mode. From 78bdba5c54285e3ff7db3fad3b1cedb1620d1651 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Oct 2023 13:13:12 +0100 Subject: [PATCH 3/5] Update comments --- synapse/replication/tcp/handler.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index b4d65a35cb5f..ed216bb22fa8 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -611,13 +611,13 @@ async def _process_position( # Find where we previously streamed up to. current_token = stream.current_token(cmd.instance_name) - # If the position token matches our current token then we're up to date - # and there's nothing to do. Otherwise, fetch all updates between then - # and now. + # If the incoming previous position is less than our current position + # then we're up to date and there's nothing to do. Otherwise, fetch + # all updates between then and now. # - # Note: We have to check that `current_token` is within the range, to - # handle the case where the stream gets "reset" (e.g. for `caches` and - # `typing` after the writer's restart). + # Note: We also have to check that `current_token` is at least the + # new position, to handle the case where the stream gets "reset" + # (e.g. for `caches` and `typing` after the writer's restart). missing_updates = not (cmd.prev_token <= current_token <= cmd.new_token) while missing_updates: # Note: There may very well not be any new updates, but we check to From 60ec71aacb5d3ee5f5851366cae8cf8977a53853 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Oct 2023 13:23:49 +0100 Subject: [PATCH 4/5] Update synapse/replication/tcp/handler.py Co-authored-by: Patrick Cloke --- synapse/replication/tcp/handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index ed216bb22fa8..893f9939a972 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -615,7 +615,7 @@ async def _process_position( # then we're up to date and there's nothing to do. Otherwise, fetch # all updates between then and now. # - # Note: We also have to check that `current_token` is at least the + # Note: We also have to check that `current_token` is at most the # new position, to handle the case where the stream gets "reset" # (e.g. for `caches` and `typing` after the writer's restart). missing_updates = not (cmd.prev_token <= current_token <= cmd.new_token) From 363b3551afbce02c66fc26d4ab04ec83bb4b15eb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Oct 2023 13:33:27 +0100 Subject: [PATCH 5/5] Fix lint --- synapse/replication/tcp/handler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 893f9939a972..afd03137f0fd 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -612,11 +612,11 @@ async def _process_position( current_token = stream.current_token(cmd.instance_name) # If the incoming previous position is less than our current position - # then we're up to date and there's nothing to do. Otherwise, fetch - # all updates between then and now. + # then we're up to date and there's nothing to do. Otherwise, fetch + # all updates between then and now. # # Note: We also have to check that `current_token` is at most the - # new position, to handle the case where the stream gets "reset" + # new position, to handle the case where the stream gets "reset" # (e.g. for `caches` and `typing` after the writer's restart). missing_updates = not (cmd.prev_token <= current_token <= cmd.new_token) while missing_updates: