From f06885c49ee65b6d61e6f9e42ee90d33e9777ad1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Oct 2022 10:19:36 +0100 Subject: [PATCH 01/10] Use `(user_id, room_id)` bounds for `event_push_summary` As `event_push_summary` doesn't have an index on `stream_ordering`. --- .../databases/main/event_push_actions.py | 42 +++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 7f7bcb70940b..d2101c29b57c 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -316,6 +316,44 @@ def add_thread_id_txn( return processed_rows + def add_thread_id_summary_txn(txn: LoggingTransaction) -> int: + min_user_id = progress.get("max_summary_user_id", "") + min_room_id = progress.get("max_summary_room_id", "") + + sql = """ + SELECT user_id, room_id FROM event_push_summary + WHERE (user_id, room_id) > (?, ?) + AND thread_id IS NULL + ORDER BY user_id, room_id + LIMIT 1 + OFFSET ? + """ + + txn.execute(sql, (min_user_id, min_room_id, batch_size)) + row = txn.fetchone() + if not row: + return 0 + + max_user_id, max_room_id = row + + sql = """ + UPDATE event_push_summary + SET thread_id = 'main' + WHERE + (?, ?) < (user_id, room_id) AND (user_id, room_id) < (?, ?) + AND thread_is IS NULL + """ + txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id)) + processed_rows = txn.rowcount + + progress["max_summary_user_id"] = max_user_id + progress["max_summary_room_id"] = max_room_id + self.db_pool.updates._background_update_progress_txn( + txn, "event_push_backfill_thread_id", progress + ) + + return processed_rows + # First update the event_push_actions table, then the event_push_summary table. 
# # Note that the event_push_actions_staging table is ignored since it is @@ -331,9 +369,7 @@ def add_thread_id_txn( else: result = await self.db_pool.runInteraction( "event_push_backfill_thread_id", - add_thread_id_txn, - "event_push_summary", - progress.get("max_event_push_summary_stream_ordering", 0), + add_thread_id_summary_txn, ) # Only done after the event_push_summary table is done. From e036bf201a800ce05ef0387d7f5b5163dabb1618 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Oct 2022 10:20:39 +0100 Subject: [PATCH 02/10] Remove unnecessary generic option --- synapse/storage/databases/main/event_push_actions.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index d2101c29b57c..9eeed420a869 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -269,11 +269,11 @@ async def _background_backfill_thread_id( event_push_actions_done = progress.get("event_push_actions_done", False) def add_thread_id_txn( - txn: LoggingTransaction, table_name: str, start_stream_ordering: int + txn: LoggingTransaction, start_stream_ordering: int ) -> int: sql = f""" SELECT stream_ordering - FROM {table_name} + FROM event_push_actions WHERE thread_id IS NULL AND stream_ordering > ? @@ -285,7 +285,7 @@ def add_thread_id_txn( # No more rows to process. rows = txn.fetchall() if not rows: - progress[f"{table_name}_done"] = True + progress["event_push_actions_done"] = True self.db_pool.updates._background_update_progress_txn( txn, "event_push_backfill_thread_id", progress ) @@ -294,8 +294,8 @@ def add_thread_id_txn( # Update the thread ID for any of those rows. max_stream_ordering = rows[-1][0] - sql = f""" - UPDATE {table_name} + sql = """ + UPDATE event_push_actions SET thread_id = 'main' WHERE ? < stream_ordering AND stream_ordering <= ? 
AND thread_id IS NULL """ @@ -309,7 +309,7 @@ def add_thread_id_txn( # Update progress. processed_rows = txn.rowcount - progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering + progress["max_event_push_actions_stream_ordering"] = max_stream_ordering self.db_pool.updates._background_update_progress_txn( txn, "event_push_backfill_thread_id", progress ) From 872b91564ae780b6504e18390fad47995956690d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Oct 2022 10:23:56 +0100 Subject: [PATCH 03/10] Newsfile --- changelog.d/14181.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14181.bugfix diff --git a/changelog.d/14181.bugfix b/changelog.d/14181.bugfix new file mode 100644 index 000000000000..36521c670c60 --- /dev/null +++ b/changelog.d/14181.bugfix @@ -0,0 +1 @@ +Fix poor performance of the `event_push_backfill_thread_id` background update, which was introduced in Synapse 1.68.0rc1. From 3044652b7f628d92fdf29031b53e7462fd7cda0e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Oct 2022 10:30:49 +0100 Subject: [PATCH 04/10] Fix lint --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 9eeed420a869..1adba199bdd0 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -271,7 +271,7 @@ async def _background_backfill_thread_id( def add_thread_id_txn( txn: LoggingTransaction, start_stream_ordering: int ) -> int: - sql = f""" + sql = """ SELECT stream_ordering FROM event_push_actions WHERE From 6af46896eb96435dd7686a96659804ba40d17f1e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Oct 2022 11:21:48 +0100 Subject: [PATCH 05/10] Fix bg update --- synapse/storage/databases/main/event_push_actions.py | 1 - 1 file changed, 1 deletion(-) diff --git 
a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 1adba199bdd0..89fd8c6e0463 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -363,7 +363,6 @@ def add_thread_id_summary_txn(txn: LoggingTransaction) -> int: result = await self.db_pool.runInteraction( "event_push_backfill_thread_id", add_thread_id_txn, - "event_push_actions", progress.get("max_event_push_actions_stream_ordering", 0), ) else: From eae6ed45351697e7680f5302e8e91687b9d173a9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Oct 2022 11:40:28 +0100 Subject: [PATCH 06/10] Need an inclusive bound --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 89fd8c6e0463..a4190d685154 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -340,7 +340,7 @@ def add_thread_id_summary_txn(txn: LoggingTransaction) -> int: UPDATE event_push_summary SET thread_id = 'main' WHERE - (?, ?) < (user_id, room_id) AND (user_id, room_id) < (?, ?) + (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?) 
AND thread_is IS NULL """ txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id)) From d0325efccda0733ac84e7c29389225e06c6aee17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Oct 2022 12:13:23 +0100 Subject: [PATCH 07/10] Handle the last batch --- .../databases/main/event_push_actions.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index a4190d685154..6e56e28f224f 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -331,19 +331,26 @@ def add_thread_id_summary_txn(txn: LoggingTransaction) -> int: txn.execute(sql, (min_user_id, min_room_id, batch_size)) row = txn.fetchone() - if not row: - return 0 - max_user_id, max_room_id = row + # Calculate an upper bound. If the previous query doesn't return + # anything then we are almost at the end and process any remaining + # rows. + upper_bound = "" + upper_bound_args = [] + if row: + max_user_id, max_room_id = row - sql = """ + upper_bound = "AND (user_id, room_id) <= (?, ?)" + upper_bound_args = [max_user_id, max_room_id] + + sql = f""" UPDATE event_push_summary SET thread_id = 'main' WHERE - (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?) + (?, ?) 
< (user_id, room_id) {upper_bound} AND thread_is IS NULL """ - txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id)) + txn.execute(sql, [min_user_id, min_room_id] + upper_bound_args) processed_rows = txn.rowcount progress["max_summary_user_id"] = max_user_id From b4711a324e7ca41475ec144fb7c607004d9af246 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Oct 2022 13:24:10 +0100 Subject: [PATCH 08/10] Fix typo --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 6e56e28f224f..9f9ae00fd786 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -348,7 +348,7 @@ def add_thread_id_summary_txn(txn: LoggingTransaction) -> int: SET thread_id = 'main' WHERE (?, ?) < (user_id, room_id) {upper_bound} - AND thread_is IS NULL + AND thread_id IS NULL """ txn.execute(sql, [min_user_id, min_room_id] + upper_bound_args) processed_rows = txn.rowcount From b4e1edae3ebb4e424cb24e785263694c50cf3070 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Oct 2022 13:57:18 +0100 Subject: [PATCH 09/10] Different approach --- .../databases/main/event_push_actions.py | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 9f9ae00fd786..4875ea2181c1 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -320,37 +320,35 @@ def add_thread_id_summary_txn(txn: LoggingTransaction) -> int: min_user_id = progress.get("max_summary_user_id", "") min_room_id = progress.get("max_summary_room_id", "") + # Slightly overcomplicated query for getting the Nth user ID / room + # ID tuple, or the last if there are 
less than N remaining. sql = """ - SELECT user_id, room_id FROM event_push_summary - WHERE (user_id, room_id) > (?, ?) - AND thread_id IS NULL - ORDER BY user_id, room_id + SELECT user_id, room_id FROM ( + SELECT user_id, room_id FROM event_push_summary + WHERE (user_id, room_id) > (?, ?) + AND thread_id IS NULL + ORDER BY user_id, room_id + LIMIT ? + ) AS e + ORDER BY user_id DESC, room_id DESC LIMIT 1 - OFFSET ? """ txn.execute(sql, (min_user_id, min_room_id, batch_size)) row = txn.fetchone() + if not row: + return 0 - # Calculate an upper bound. If the previous query doesn't return - # anything then we are almost at the end and process any remaining - # rows. - upper_bound = "" - upper_bound_args = [] - if row: - max_user_id, max_room_id = row - - upper_bound = "AND (user_id, room_id) <= (?, ?)" - upper_bound_args = [max_user_id, max_room_id] + max_user_id, max_room_id = row sql = f""" UPDATE event_push_summary SET thread_id = 'main' WHERE - (?, ?) < (user_id, room_id) {upper_bound} + (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?) 
AND thread_id IS NULL """ - txn.execute(sql, [min_user_id, min_room_id] + upper_bound_args) + txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id)) processed_rows = txn.rowcount progress["max_summary_user_id"] = max_user_id From 56f7d4c8c94140df0a032aa68472905cdd617d94 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Oct 2022 14:06:35 +0100 Subject: [PATCH 10/10] LINT --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 4875ea2181c1..72cf91eb39c7 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -341,7 +341,7 @@ def add_thread_id_summary_txn(txn: LoggingTransaction) -> int: max_user_id, max_room_id = row - sql = f""" + sql = """ UPDATE event_push_summary SET thread_id = 'main' WHERE