diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9e8fab62f9c6..bcb18d8bfd45 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -23,6 +23,7 @@ """ import abc import contextlib +import itertools import logging from bisect import bisect from contextlib import contextmanager @@ -191,7 +192,9 @@ async def user_syncing( """ @abc.abstractmethod - def get_currently_syncing_users_for_replication(self) -> Iterable[str]: + def get_currently_syncing_users_for_replication( + self, + ) -> Iterable[Tuple[str, Optional[str]]]: """Get an iterable of syncing users on this worker, to send to the presence handler This is called when a replication connection is established. It should return @@ -286,7 +289,12 @@ async def bump_presence_active_time( """ async def update_external_syncs_row( # noqa: B027 (no-op by design) - self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int + self, + process_id: str, + user_id: str, + device_id: Optional[str], + is_syncing: bool, + sync_time_msec: int, ) -> None: """Update the syncing users for an external process as a delta. @@ -297,6 +305,7 @@ async def update_external_syncs_row( # noqa: B027 (no-op by design) syncing against. This allows synapse to process updates as user start and stop syncing against a given process. user_id: The user who has started or stopped syncing + device_id: The device ID of the device that has started or stopped syncing. is_syncing: Whether or not the user is now syncing sync_time_msec: Time in ms when the user was last syncing """ @@ -427,16 +436,18 @@ def __init__(self, hs: "HomeServer"): hs.config.worker.writers.presence, ) - # The number of ongoing syncs on this process, by user id. + # The number of ongoing syncs on this process, by (user ID, device ID). # Empty if _presence_enabled is false. - self._user_to_num_current_syncs: Dict[str, int] = {} + self._user_device_to_num_current_syncs: Dict[ + Tuple[str, Optional[str]], int + ] = {} self.notifier = hs.get_notifier() self.instance_id = hs.get_instance_id() - # user_id -> last_sync_ms. Lists the users that have stopped syncing but - # we haven't notified the presence writer of that yet - self.users_going_offline: Dict[str, int] = {} + # (user_id, device_id) -> last_sync_ms. Lists the devices that have stopped + # syncing but we haven't notified the presence writer of that yet + self.user_devices_going_offline: Dict[Tuple[str, Optional[str]], int] = {} self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) self._set_state_client = ReplicationPresenceSetState.make_client(hs) @@ -459,39 +470,47 @@ async def _on_shutdown(self) -> None: ClearUserSyncsCommand(self.instance_id) ) - def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None: + def send_user_sync( + self, + user_id: str, + device_id: Optional[str], + is_syncing: bool, + last_sync_ms: int, + ) -> None: if self._presence_enabled: self.hs.get_replication_command_handler().send_user_sync( - self.instance_id, user_id, is_syncing, last_sync_ms + self.instance_id, user_id, device_id, is_syncing, last_sync_ms ) - def mark_as_coming_online(self, user_id: str) -> None: + def mark_as_coming_online(self, user_id: str, device_id: Optional[str]) -> None: """A user has started syncing. Send a UserSync to the presence writer, unless they had recently stopped syncing. """ - going_offline = self.users_going_offline.pop(user_id, None) + going_offline = self.user_devices_going_offline.pop((user_id, device_id), None) if not going_offline: # Safe to skip because we haven't yet told the presence writer they # were offline - self.send_user_sync(user_id, True, self.clock.time_msec()) + self.send_user_sync(user_id, device_id, True, self.clock.time_msec()) - def mark_as_going_offline(self, user_id: str) -> None: + def mark_as_going_offline(self, user_id: str, device_id: Optional[str]) -> None: """A user has stopped syncing. We wait before notifying the presence writer as its likely they'll come back soon. This allows us to avoid sending a stopped syncing immediately followed by a started syncing notification to the presence writer """ - self.users_going_offline[user_id] = self.clock.time_msec() + self.user_devices_going_offline[(user_id, device_id)] = self.clock.time_msec() def send_stop_syncing(self) -> None: """Check if there are any users who have stopped syncing a while ago and haven't come back yet. If there are poke the presence writer about them. """ now = self.clock.time_msec() - for user_id, last_sync_ms in list(self.users_going_offline.items()): + for (user_id, device_id), last_sync_ms in list( + self.user_devices_going_offline.items() + ): if now - last_sync_ms > UPDATE_SYNCING_USERS_MS: - self.users_going_offline.pop(user_id, None) - self.send_user_sync(user_id, False, last_sync_ms) + self.user_devices_going_offline.pop((user_id, device_id), None) + self.send_user_sync(user_id, device_id, False, last_sync_ms) async def user_syncing( self, @@ -523,23 +542,23 @@ async def user_syncing( is_sync=True, ) - curr_sync = self._user_to_num_current_syncs.get(user_id, 0) - self._user_to_num_current_syncs[user_id] = curr_sync + 1 + curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0) + self._user_device_to_num_current_syncs[(user_id, device_id)] = curr_sync + 1 # If we went from no in flight sync to some, notify replication - if self._user_to_num_current_syncs[user_id] == 1: - self.mark_as_coming_online(user_id) + if self._user_device_to_num_current_syncs[(user_id, device_id)] == 1: + self.mark_as_coming_online(user_id, device_id) def _end() -> None: # We check that the user_id is in user_to_num_current_syncs because # user_to_num_current_syncs may have been cleared if we are # shutting down. - if user_id in self._user_to_num_current_syncs: - self._user_to_num_current_syncs[user_id] -= 1 + if (user_id, device_id) in self._user_device_to_num_current_syncs: + self._user_device_to_num_current_syncs[(user_id, device_id)] -= 1 # If we went from one in flight sync to non, notify replication - if self._user_to_num_current_syncs[user_id] == 0: - self.mark_as_going_offline(user_id) + if self._user_device_to_num_current_syncs[(user_id, device_id)] == 0: + self.mark_as_going_offline(user_id, device_id) @contextlib.contextmanager def _user_syncing() -> Generator[None, None, None]: @@ -606,10 +625,15 @@ async def process_replication_rows( # If this is a federation sender, notify about presence updates. await self.maybe_send_presence_to_interested_destinations(state_to_notify) - def get_currently_syncing_users_for_replication(self) -> Iterable[str]: + def get_currently_syncing_users_for_replication( + self, + ) -> Iterable[Tuple[str, Optional[str]]]: return [ - user_id - for user_id, count in self._user_to_num_current_syncs.items() + (user_id, device_id) + for ( + user_id, + device_id, + ), count in self._user_device_to_num_current_syncs.items() if count > 0 ] @@ -736,14 +760,18 @@ def __init__(self, hs: "HomeServer"): ] = {} # Keeps track of the number of *ongoing* syncs on other processes. - # While any sync is ongoing on another process the user will never + # + # While any sync is ongoing on another process the user's device will never # go offline. + # # Each process has a unique identifier and an update frequency. If # no update is received from that process within the update period then # we assume that all the sync requests on that process have stopped. - # Stored as a dict from process_id to set of user_id, and a dict of - # process_id to millisecond timestamp last updated. - self.external_process_to_current_syncs: Dict[str, Set[str]] = {} + # Stored as a dict from process_id to set of (user_id, device_id), and + # a dict of process_id to millisecond timestamp last updated. + self.external_process_to_current_syncs: Dict[ + str, Set[Tuple[str, Optional[str]]] + ] = {} self.external_process_last_updated_ms: Dict[str, int] = {} self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") @@ -953,7 +981,10 @@ async def _handle_timeouts(self) -> None: # that were syncing on that process to see if they need to be timed # out. users_to_check.update( - self.external_process_to_current_syncs.pop(process_id, ()) + user_id + for user_id, device_id in self.external_process_to_current_syncs.pop( + process_id, () + ) ) self.external_process_last_updated_ms.pop(process_id) @@ -969,9 +1000,10 @@ async def _handle_timeouts(self) -> None: user_id: {device_id for device_id, count in devices.items() if count} for user_id, devices in self.user_to_device_to_num_current_syncs.items() } - # XXX This will not work yet. - # for user_ids in self.external_process_to_current_syncs.values(): - # syncing_user_devices.update(user_ids) + for user_id, device_id in itertools.chain( + *self.external_process_to_current_syncs.values() + ): + syncing_user_devices.setdefault(user_id, set()).add(device_id) changes = handle_timeouts( states, @@ -1072,6 +1104,9 @@ async def user_syncing( # To keep the single process behaviour consistent with worker mode, run the # same logic as `update_external_syncs_row`, even though it looks weird. + # + # TODO This does not really match update_external_syncs_row which sets + # the device state and then generates a new state from that. if prev_state.state == PresenceState.OFFLINE: await self._update_states( [ @@ -1115,12 +1150,19 @@ def _user_syncing() -> Generator[None, None, None]: return _user_syncing() - def get_currently_syncing_users_for_replication(self) -> Iterable[str]: + def get_currently_syncing_users_for_replication( + self, + ) -> Iterable[Tuple[str, Optional[str]]]: # since we are the process handling presence, there is nothing to do here. return [] async def update_external_syncs_row( - self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int + self, + process_id: str, + user_id: str, + device_id: Optional[str], + is_syncing: bool, + sync_time_msec: int, ) -> None: """Update the syncing users for an external process as a delta. @@ -1129,6 +1171,7 @@ async def update_external_syncs_row( syncing against. This allows synapse to process updates as user start and stop syncing against a given process. user_id: The user who has started or stopped syncing + device_id: The device ID of the device that has started or stopped syncing. is_syncing: Whether or not the user is now syncing sync_time_msec: Time in ms when the user was last syncing """ @@ -1139,12 +1182,23 @@ async def update_external_syncs_row( process_id, set() ) + devices = self.user_to_device_to_current_state.setdefault(user_id, {}) + device_state = devices.setdefault( + device_id, UserDevicePresenceState.default(user_id, device_id) + ) + device_state.last_sync_ts = sync_time_msec + updates = [] - if is_syncing and user_id not in process_presence: - if prev_state.state == PresenceState.OFFLINE: + if is_syncing and (user_id, device_id) not in process_presence: + if device_state.state == PresenceState.OFFLINE: + # Mark the device as online since it is not syncing, this + # might mark the user as online, so send an update. + device_state.state = PresenceState.ONLINE + presence = _combine_device_states(devices.values()) + updates.append( prev_state.copy_and_replace( - state=PresenceState.ONLINE, + state=presence, last_active_ts=sync_time_msec, last_user_sync_ts=sync_time_msec, ) @@ -1153,14 +1207,14 @@ async def update_external_syncs_row( updates.append( prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec) ) - process_presence.add(user_id) - elif user_id in process_presence: + process_presence.add((user_id, device_id)) + elif (user_id, device_id) in process_presence: updates.append( prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec) ) if not is_syncing: - process_presence.discard(user_id) + process_presence.discard((user_id, device_id)) if updates: await self._update_states(updates) @@ -1177,9 +1231,23 @@ async def update_external_syncs_clear(self, process_id: str) -> None: process_presence = self.external_process_to_current_syncs.pop( process_id, set() ) - prev_states = await self.current_state_for_users(process_presence) + + # Mark each device as having a last sync time. time_now_ms = self.clock.time_msec() + updated_users = set() + for user_id, device_id in process_presence: + device_state = self.user_to_device_to_current_state.setdefault( + user_id, {} + ).setdefault( + device_id, UserDevicePresenceState.default(user_id, device_id) + ) + + device_state.last_sync_ts = time_now_ms + updated_users.add(user_id) + # Update eah user (and insert into the appropriate timers to check if + # they've gone offline). + prev_states = await self.current_state_for_users(updated_users) await self._update_states( [ prev_state.copy_and_replace(last_user_sync_ts=time_now_ms) diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 10f5c98ff8a9..165f10cecd4a 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -267,27 +267,38 @@ class UserSyncCommand(Command): NAME = "USER_SYNC" def __init__( - self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int + self, + instance_id: str, + user_id: str, + device_id: Optional[str], + is_syncing: bool, + last_sync_ms: int, ): self.instance_id = instance_id self.user_id = user_id + self.device_id = device_id self.is_syncing = is_syncing self.last_sync_ms = last_sync_ms @classmethod def from_line(cls: Type["UserSyncCommand"], line: str) -> "UserSyncCommand": - instance_id, user_id, state, last_sync_ms = line.split(" ", 3) + device_id: Optional[str] + instance_id, user_id, device_id, state, last_sync_ms = line.split(" ", 4) + + if device_id == "None": + device_id = None if state not in ("start", "end"): raise Exception("Invalid USER_SYNC state %r" % (state,)) - return cls(instance_id, user_id, state == "start", int(last_sync_ms)) + return cls(instance_id, user_id, device_id, state == "start", int(last_sync_ms)) def to_line(self) -> str: return " ".join( ( self.instance_id, self.user_id, + str(self.device_id), "start" if self.is_syncing else "end", str(self.last_sync_ms), ) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index a2cabba7b1c5..2a999935528f 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -423,7 +423,11 @@ def on_USER_SYNC( if self._is_presence_writer: return self._presence_handler.update_external_syncs_row( - cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms + cmd.instance_id, + cmd.user_id, + cmd.device_id, + cmd.is_syncing, + cmd.last_sync_ms, ) else: return None @@ -685,9 +689,9 @@ def new_connection(self, connection: IReplicationConnection) -> None: ) now = self._clock.time_msec() - for user_id in currently_syncing: + for user_id, device_id in currently_syncing: connection.send_command( - UserSyncCommand(self._instance_id, user_id, True, now) + UserSyncCommand(self._instance_id, user_id, device_id, True, now) ) def lost_connection(self, connection: IReplicationConnection) -> None: @@ -739,11 +743,16 @@ def send_federation_ack(self, token: int) -> None: self.send_command(FederationAckCommand(self._instance_name, token)) def send_user_sync( - self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int + self, + instance_id: str, + user_id: str, + device_id: Optional[str], + is_syncing: bool, + last_sync_ms: int, ) -> None: """Poke the master that a user has started/stopped syncing.""" self.send_command( - UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms) + UserSyncCommand(instance_id, user_id, device_id, is_syncing, last_sync_ms) ) def send_user_ip( diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 73edd6aca13d..7034789e2bed 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -625,7 +625,7 @@ def test_external_process_timeout(self) -> None: # Notify handler that a user is now syncing. self.get_success( self.presence_handler.update_external_syncs_row( - process_id, user_id, True, self.clock.time_msec() + process_id, user_id, None, True, self.clock.time_msec() ) ) @@ -957,6 +957,22 @@ def test_set_presence_from_syncing_multi_device( ) self.assertEqual(state.state, expected_state_1) + # When testing with workers, make another random sync (with any *different* + # user) to keep the process information from expiring. + # + # This is due to EXTERNAL_PROCESS_EXPIRY being equivalent to IDLE_TIMER. + if test_with_workers: + with self.get_success( + worker_presence_handler.user_syncing( + f"@other-user:{self.hs.config.server.server_name}", + "dev-3", + affect_presence=True, + presence_state=PresenceState.ONLINE, + ), + by=0.01, + ): + pass + # 5. Advance such that the first device should be discarded (the idle timer), # then pump so _handle_timeouts function to called. self.reactor.advance(IDLE_TIMER / 1000 / 2)