Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Handle remote device list updates during partial join #13913

Merged
merged 5 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13913.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Faster remote room joins: correctly handle remote device list updates during a partial join.
58 changes: 58 additions & 0 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,15 @@ async def on_federation_query_user_devices(self, user_id: str) -> JsonDict:
"self_signing_key": self_signing_key,
}

async def handle_room_un_partial_stated(self, room_id: str) -> None:
"""Handles sending appropriate device list updates in a room that has
gone from partial to full state.
"""

logger.error(
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"Trying handling device list state for partial join: not supported on workers."
)


class DeviceHandler(DeviceWorkerHandler):
def __init__(self, hs: "HomeServer"):
Expand Down Expand Up @@ -746,6 +755,15 @@ async def _handle_new_device_update_async(self) -> None:
finally:
self._handle_new_device_update_is_processing = False

async def handle_room_un_partial_stated(self, room_id: str) -> None:
"""Handles sending appropriate device list updates in a room that has
gone from partial to full state.
"""

# We defer to the device list updater implementation as we're on the
# right worker.
await self.device_list_updater.handle_room_un_partial_stated(room_id)


def _update_device_from_client_ips(
device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
Expand Down Expand Up @@ -836,6 +854,16 @@ async def incoming_device_list_update(
)
return

# Check if we are partially joining any rooms. If so we need to store
# all device list updates so that we can handle them correctly once we
# know who is in the room.
partial_rooms = await self.store.get_partial_state_rooms_and_servers()
if partial_rooms:
await self.store.add_remote_device_list_to_pending(
user_id,
device_id,
)

room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
Expand Down Expand Up @@ -1175,3 +1203,33 @@ async def process_cross_signing_key_update(
device_ids.append(verify_key.version)

return device_ids

async def handle_room_un_partial_stated(self, room_id: str) -> None:
"""Handles sending appropriate device list updates in a room that has
gone from partial to full state.
"""

pending_updates = (
await self.store.get_pending_remote_device_list_updates_for_room(room_id)
)

for user_id, device_id in pending_updates:
logger.info(
"Got pending device list update in room %s: %s / %s",
room_id,
user_id,
device_id,
)
position = await self.store.add_device_change_to_streams(
user_id,
[device_id],
room_ids=[room_id],
)

if not position:
# This should only happen if there are no updates, so we bail.
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unreachable, isn't it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Err, yeah actually. Should we do something else here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we adjust the comment to explain that it's unreachable?


self.device_handler.notifier.on_new_event(
StreamKeyType.DEVICE_LIST, position, rooms=[room_id]
)
4 changes: 4 additions & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(self, hs: "HomeServer"):
self.http_client = hs.get_proxied_blacklisted_http_client()
self._replication = hs.get_replication_data_handler()
self._federation_event_handler = hs.get_federation_event_handler()
self._device_handler = hs.get_device_handler()

self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
Expand Down Expand Up @@ -1624,6 +1625,9 @@ async def _sync_partial_state_room(
# https://github.com/matrix-org/synapse/issues/12994
await self.state_handler.update_current_state(room_id)

logger.info("Handling any pending device list updates")
await self._device_handler.handle_room_un_partial_stated(room_id)

logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
if success:
Expand Down
55 changes: 55 additions & 0 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1995,3 +1995,58 @@ def add_device_list_outbound_pokes_txn(
add_device_list_outbound_pokes_txn,
stream_ids,
)

async def add_remote_device_list_to_pending(
self, user_id: str, device_id: str
) -> None:
"""Add a device list update to the table tracking remote device list
updates during partial joins.
"""

async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined]
await self.db_pool.simple_upsert(
table="device_lists_remote_pending",
keyvalues={
"user_id": user_id,
"device_id": device_id,
},
values={"stream_id": stream_id},
desc="add_remote_device_list_to_pending",
)

async def get_pending_remote_device_list_updates_for_room(
self, room_id: str
) -> Collection[Tuple[str, str]]:
"""Get the set of remote device list updates from the pending table for
the room.
"""

min_device_stream_id = await self.db_pool.simple_select_one_onecol(
table="partial_state_rooms",
keyvalues={
"room_id": room_id,
},
retcol="device_lists_stream_id",
desc="get_pending_remote_device_list_updates_for_room_device",
)

sql = """
SELECT user_id, device_id FROM device_lists_remote_pending AS d
INNER JOIN current_state_events AS c ON
type = 'm.room.member'
AND state_key = user_id
AND membership = 'join'
WHERE
room_id = ? AND stream_id > ?
"""

def get_pending_remote_device_list_updates_for_room_txn(
txn: LoggingTransaction,
) -> Collection[Tuple[str, str]]:
txn.execute(sql, (room_id, min_device_stream_id))
return cast(Collection[Tuple[str, str]], txn.fetchall())

return await self.db_pool.runInteraction(
"get_pending_remote_device_list_updates_for_room",
get_pending_remote_device_list_updates_for_room_txn,
)
20 changes: 20 additions & 0 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,26 @@ def _clear_partial_state_room_txn(
)
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))

# We now delete anything from `device_lists_remote_pending` with a
# stream ID less than the minimum
# `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
txn,
table="partial_state_rooms",
keyvalues={},
retcol="MIN(device_lists_stream_id)",
allow_none=True,
)
if device_lists_stream_id is None:
# There are no rooms being currently partially joined, so we delete everything.
txn.execute("DELETE FROM device_lists_remote_pending")
else:
sql = """
DELETE FROM device_lists_remote_pending
WHERE stream_id < ?
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""
txn.execute(sql, (device_lists_stream_id,))

@cached()
async def is_partial_state_room(self, room_id: str) -> bool:
"""Checks if this room has partial state.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Stores remote device lists we have received for remote users while a partial
-- join is in progress.
--
-- This allows us to replay any device list updates if it turns out the remote
-- user was in the partially joined room
CREATE TABLE device_lists_remote_pending(
stream_id BIGINT PRIMARY KEY,
user_id TEXT NOT NULL,
device_id TEXT NOT NULL
);

-- We only keep the most recent update for a given user/device pair.
CREATE UNIQUE INDEX device_lists_remote_pending_user_device_id ON device_lists_remote_pending(user_id, device_id);
squahtx marked this conversation as resolved.
Show resolved Hide resolved