From eaf3ab1436c3360f82b9f8b18c14aacfdea8818c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kate=C5=99ina=20Churanov=C3=A1?= Date: Mon, 22 Aug 2022 18:39:14 +0200 Subject: [PATCH 1/4] fix: Push notifications for invite over federation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Kateřina Churanová --- changelog.d/13719.bugfix | 1 + synapse/events/__init__.py | 13 ++++++++++--- synapse/handlers/federation.py | 6 ++++++ synapse/handlers/federation_event.py | 15 ++++++++------- synapse/push/bulk_push_rule_evaluator.py | 10 +++++++--- synapse/push/push_rule_evaluator.py | 16 ++++++++-------- synapse/storage/controllers/persist_events.py | 10 ++++++---- synapse/storage/databases/main/events.py | 14 ++++++-------- 8 files changed, 52 insertions(+), 33 deletions(-) create mode 100644 changelog.d/13719.bugfix diff --git a/changelog.d/13719.bugfix b/changelog.d/13719.bugfix new file mode 100644 index 000000000000..4318f4dafff5 --- /dev/null +++ b/changelog.d/13719.bugfix @@ -0,0 +1 @@ +Send invite push notifications for invite over federation. diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 39ad2793d98d..c6690ee041f1 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -37,7 +37,7 @@ from typing_extensions import Literal from unpaddedbase64 import encode_base64 -from synapse.api.constants import RelationTypes +from synapse.api.constants import Membership, RelationTypes from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions from synapse.types import JsonDict, RoomStreamToken from synapse.util.caches import intern_dict @@ -339,12 +339,19 @@ def event_id(self) -> str: raise NotImplementedError() @property - def membership(self) -> str: - return self.content["membership"] + def membership(self) -> Optional[str]: + return self.content.get("membership") def is_state(self) -> bool: return self.get_state_key() is not None + @property + def is_notifiable(self) -> bool: + return ( + self.membership == Membership.INVITE + or not self.internal_metadata.is_outlier() + ) + def get_state_key(self) -> Optional[str]: """Get the state key of this event, or None if it's not a state event""" return self._dict.get("state_key") diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index dd4b9f66d10e..1cf09356a013 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -946,6 +946,12 @@ async def on_invite_request( ) context = EventContext.for_outlier(self._storage_controllers) + + if event.is_notifiable: + await self._federation_event_handler.bulk_push_rule_evaluator.action_for_event_by_user( + event, context + ) + await self._federation_event_handler.persist_events_and_notify( event.room_id, [(event, context)] ) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index ace7adcffb61..db04caea0664 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -145,7 +145,7 @@ def __init__(self, hs: "HomeServer"): self._event_creation_handler = hs.get_event_creation_handler() self._event_auth_handler = hs.get_event_auth_handler() self._message_handler = hs.get_message_handler() - self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() + self.bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() self._state_resolution_handler = hs.get_state_resolution_handler() # avoid a circular dependency by deferring execution here self._get_room_member_handler = hs.get_room_member_handler @@ -2110,7 +2110,7 @@ async def _run_push_actions_and_persist_event( min_depth, ) else: - await self._bulk_push_rule_evaluator.action_for_event_by_user( + await self.bulk_push_rule_evaluator.action_for_event_by_user( event, context ) @@ -2153,6 +2153,7 @@ async def persist_events_and_notify( if instance != self._instance_name: # Limit the number of events sent over replication. We choose 200 # here as that is what we default to in `max_request_body_size(..)` + result = {} try: for batch in batch_iter(event_and_contexts, 200): result = await self._send_events( @@ -2173,14 +2174,14 @@ async def persist_events_and_notify( # Note that this returns the events that were persisted, which may not be # the same as were passed in if some were deduplicated due to transaction IDs. ( - events, + output_events, max_stream_token, ) = await self._storage_controllers.persistence.persist_events( event_and_contexts, backfilled=backfilled ) if self._ephemeral_messages_enabled: - for event in events: + for event in output_events: # If there's an expiry timestamp on the event, schedule its expiry. self._message_handler.maybe_schedule_expiry(event) @@ -2188,13 +2189,13 @@ async def persist_events_and_notify( with start_active_span("notify_persisted_events"): set_tag( SynapseTags.RESULT_PREFIX + "event_ids", - str([ev.event_id for ev in events]), + str([ev.event_id for ev in output_events]), ) set_tag( SynapseTags.RESULT_PREFIX + "event_ids.length", - str(len(events)), + str(len(output_events)), ) - for event in events: + for event in output_events: await self._notify_persisted_event(event, max_stream_token) return max_stream_token.stream diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index ccd512be54b9..4e66a79e0d91 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -173,7 +173,11 @@ async def _get_rules_for_event( async def _get_power_levels_and_sender_level( self, event: EventBase, context: EventContext - ) -> Tuple[dict, int]: + ) -> Tuple[dict, Optional[int]]: + # There are no power levels and sender levels possible to get from outlier + if event.internal_metadata.is_outlier(): + return {}, None + event_types = auth_types_for_event(event.room_version, event) prev_state_ids = await context.get_prev_state_ids( StateFilter.from_types(event_types) @@ -258,8 +262,8 @@ async def action_for_event_by_user( should increment the unread count, and insert the results into the event_push_actions_staging table. """ - if event.internal_metadata.is_outlier(): - # This can happen due to out of band memberships + if not event.is_notifiable: + # Push rules for events that aren't notifiable can't be processed by this return count_as_unread = _should_count_as_unread(event, context) diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 3c5632cd9153..f8176c5a4253 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -42,18 +42,18 @@ INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$") -def _room_member_count( - ev: EventBase, condition: Mapping[str, Any], room_member_count: int -) -> bool: +def _room_member_count(condition: Mapping[str, Any], room_member_count: int) -> bool: return _test_ineq_condition(condition, room_member_count) def _sender_notification_permission( - ev: EventBase, condition: Mapping[str, Any], - sender_power_level: int, + sender_power_level: Optional[int], power_levels: Dict[str, Union[int, Dict[str, int]]], ) -> bool: + if sender_power_level is None: + return False + notif_level_key = condition.get("key") if notif_level_key is None: return False @@ -129,7 +129,7 @@ def __init__( self, event: EventBase, room_member_count: int, - sender_power_level: int, + sender_power_level: Optional[int], power_levels: Dict[str, Union[int, Dict[str, int]]], relations: Dict[str, Set[Tuple[str, str]]], relations_match_enabled: bool, @@ -198,10 +198,10 @@ def matches( elif condition["kind"] == "contains_display_name": return self._contains_display_name(display_name) elif condition["kind"] == "room_member_count": - return _room_member_count(self._event, condition, self._room_member_count) + return _room_member_count(condition, self._room_member_count) elif condition["kind"] == "sender_notification_permission": return _sender_notification_permission( - self._event, condition, self._sender_power_level, self._power_levels + condition, self._sender_power_level, self._power_levels ) elif ( condition["kind"] == "org.matrix.msc3772.relation_match" diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index dad3731b9b50..cf7816f15ae7 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -423,16 +423,18 @@ async def enqueue( for d in ret_vals: replaced_events.update(d) - events = [] + persisted_events = [] for event, _ in events_and_contexts: existing_event_id = replaced_events.get(event.event_id) if existing_event_id: - events.append(await self.main_store.get_event(existing_event_id)) + persisted_events.append( + await self.main_store.get_event(existing_event_id) + ) else: - events.append(event) + persisted_events.append(event) return ( - events, + persisted_events, self.main_store.get_room_max_token(), ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a4010ee28dca..2a9fa3d2d6b1 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2180,13 +2180,11 @@ def _set_push_actions_for_event_and_users_txn( appear in events_and_context. """ - # Only non outlier events will have push actions associated with them, + # Only notifiable events will have push actions associated with them, # so let's filter them out. (This makes joining large rooms faster, as # these queries took seconds to process all the state events). - non_outlier_events = [ - event - for event, _ in events_and_contexts - if not event.internal_metadata.is_outlier() + notifiable_events = [ + event for event, _ in events_and_contexts if event.is_notifiable ] sql = """ @@ -2199,7 +2197,7 @@ def _set_push_actions_for_event_and_users_txn( WHERE event_id = ? """ - if non_outlier_events: + if notifiable_events: txn.execute_batch( sql, ( @@ -2209,12 +2207,12 @@ def _set_push_actions_for_event_and_users_txn( event.depth, event.event_id, ) - for event in non_outlier_events + for event in notifiable_events ), ) room_to_event_ids: Dict[str, List[str]] = {} - for e in non_outlier_events: + for e in notifiable_events: room_to_event_ids.setdefault(e.room_id, []).append(e.event_id) for room_id, event_ids in room_to_event_ids.items(): From d3e70fba93829795936b0a420ded3ed722518795 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kate=C5=99ina=20Churanov=C3=A1?= Date: Thu, 15 Sep 2022 15:56:21 +0200 Subject: [PATCH 2/4] fix: Address code review - missing exception handling and code structure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Kateřina Churanová --- synapse/events/__init__.py | 13 +++++-------- synapse/handlers/federation.py | 15 ++++++++------- synapse/handlers/federation_event.py | 4 ++-- synapse/push/bulk_push_rule_evaluator.py | 2 +- synapse/storage/databases/main/events.py | 4 +++- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 29cb7c9e51ee..e26f1010da06 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -37,7 +37,7 @@ from typing_extensions import Literal from unpaddedbase64 import encode_base64 -from synapse.api.constants import Membership, RelationTypes +from synapse.api.constants import RelationTypes from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions from synapse.types import JsonDict, RoomStreamToken from synapse.util.caches import intern_dict @@ -289,6 +289,10 @@ def is_historical(self) -> bool: """ return self._dict.get("historical", False) + def is_notifiable(self) -> bool: + """Whether this event can trigger a push notification""" + return not self.is_outlier() or self.is_out_of_band_membership() + class EventBase(metaclass=abc.ABCMeta): @property @@ -345,13 +349,6 @@ def membership(self) -> Optional[str]: def is_state(self) -> bool: return self.get_state_key() is not None - @property - def is_notifiable(self) -> bool: - return ( - self.membership == Membership.INVITE - or not self.internal_metadata.is_outlier() - ) - def get_state_key(self) -> Optional[str]: """Get the state key of this event, or None if it's not a state event""" return self._dict.get("state_key") diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1cf09356a013..e24a6f74a1a1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -149,6 +149,7 @@ def __init__(self, hs: "HomeServer"): self.http_client = hs.get_proxied_blacklisted_http_client() self._replication = hs.get_replication_data_handler() self._federation_event_handler = hs.get_federation_event_handler() + self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client( hs @@ -947,14 +948,14 @@ async def on_invite_request( context = EventContext.for_outlier(self._storage_controllers) - if event.is_notifiable: - await self._federation_event_handler.bulk_push_rule_evaluator.action_for_event_by_user( - event, context + await self._bulk_push_rule_evaluator.action_for_event_by_user(event, context) + try: + await self._federation_event_handler.persist_events_and_notify( + event.room_id, [(event, context)] ) - - await self._federation_event_handler.persist_events_and_notify( - event.room_id, [(event, context)] - ) + except Exception: + await self.store.remove_push_actions_from_staging(event.event_id) + raise return event diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index db04caea0664..a5a14268b987 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -145,7 +145,7 @@ def __init__(self, hs: "HomeServer"): self._event_creation_handler = hs.get_event_creation_handler() self._event_auth_handler = hs.get_event_auth_handler() self._message_handler = hs.get_message_handler() - self.bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() + self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() self._state_resolution_handler = hs.get_state_resolution_handler() # avoid a circular dependency by deferring execution here self._get_room_member_handler = hs.get_room_member_handler @@ -2110,7 +2110,7 @@ async def _run_push_actions_and_persist_event( min_depth, ) else: - await self.bulk_push_rule_evaluator.action_for_event_by_user( + await self._bulk_push_rule_evaluator.action_for_event_by_user( event, context ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 7b116763ec91..0f6d12bc2a9d 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -262,7 +262,7 @@ async def action_for_event_by_user( should increment the unread count, and insert the results into the event_push_actions_staging table. """ - if not event.is_notifiable: + if not event.internal_metadata.is_notifiable(): # Push rules for events that aren't notifiable can't be processed by this return diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 2a9fa3d2d6b1..868f300a122c 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2184,7 +2184,9 @@ def _set_push_actions_for_event_and_users_txn( # so let's filter them out. (This makes joining large rooms faster, as # these queries took seconds to process all the state events). notifiable_events = [ - event for event, _ in events_and_contexts if event.is_notifiable + event + for event, _ in events_and_contexts + if event.internal_metadata.is_notifiable() ] sql = """ From ec50b5381429e25c51feab15e922d8ce513be737 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kate=C5=99ina=20Churanov=C3=A1?= Date: Thu, 22 Sep 2022 15:48:05 +0200 Subject: [PATCH 3/4] fix: Revert unnecessary change MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Kateřina Churanová --- synapse/handlers/federation_event.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 1ec958907cdf..c9a7a5f335a4 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -2191,14 +2191,14 @@ async def persist_events_and_notify( # Note that this returns the events that were persisted, which may not be # the same as were passed in if some were deduplicated due to transaction IDs. ( - output_events, + events, max_stream_token, ) = await self._storage_controllers.persistence.persist_events( event_and_contexts, backfilled=backfilled ) if self._ephemeral_messages_enabled: - for event in output_events: + for event in events: # If there's an expiry timestamp on the event, schedule its expiry. self._message_handler.maybe_schedule_expiry(event) @@ -2206,13 +2206,13 @@ async def persist_events_and_notify( with start_active_span("notify_persisted_events"): set_tag( SynapseTags.RESULT_PREFIX + "event_ids", - str([ev.event_id for ev in output_events]), + str([ev.event_id for ev in events]), ) set_tag( SynapseTags.RESULT_PREFIX + "event_ids.length", - str(len(output_events)), + str(len(events)), ) - for event in output_events: + for event in events: await self._notify_persisted_event(event, max_stream_token) return max_stream_token.stream From 65f1162c2e1a85264e474b733b98d0c3375915c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kate=C5=99ina=20Churanov=C3=A1?= Date: Wed, 28 Sep 2022 11:46:33 +0200 Subject: [PATCH 4/4] fix: Revert change on membership property MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Kateřina Churanová --- synapse/events/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index e26f1010da06..030c3ca408c0 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -343,8 +343,8 @@ def event_id(self) -> str: raise NotImplementedError() @property - def membership(self) -> Optional[str]: - return self.content.get("membership") + def membership(self) -> str: + return self.content["membership"] def is_state(self) -> bool: return self.get_state_key() is not None