From 0c8594c9a80d16f7e769e4b70f14d10047de7fce Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 23 Sep 2025 09:47:30 +0100 Subject: [PATCH] Fix performance regression related to delayed events processing (#18926) --- changelog.d/18926.bugfix | 1 + synapse/handlers/delayed_events.py | 108 ++++++++++++++++-- synapse/handlers/presence.py | 2 +- synapse/handlers/sliding_sync/room_lists.py | 8 +- synapse/storage/controllers/state.py | 2 + .../storage/databases/main/delayed_events.py | 15 +++ .../storage/databases/main/events_worker.py | 33 ++++++ .../storage/databases/main/state_deltas.py | 2 + synapse/util/sentinel.py | 21 ++++ tests/storage/test_events.py | 73 +++++++++++- 10 files changed, 245 insertions(+), 20 deletions(-) create mode 100644 changelog.d/18926.bugfix create mode 100644 synapse/util/sentinel.py diff --git a/changelog.d/18926.bugfix b/changelog.d/18926.bugfix new file mode 100644 index 0000000000..c450313764 --- /dev/null +++ b/changelog.d/18926.bugfix @@ -0,0 +1 @@ +Fix a performance regression related to the experimental Delayed Events ([MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140)) feature. 
\ No newline at end of file diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py index a6749801a5..17e134ca32 100644 --- a/synapse/handlers/delayed_events.py +++ b/synapse/handlers/delayed_events.py @@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, List, Optional, Set, Tuple from twisted.internet.interfaces import IDelayedCall from synapse.api.constants import EventTypes -from synapse.api.errors import ShadowBanError +from synapse.api.errors import ShadowBanError, SynapseError from synapse.api.ratelimiting import Ratelimiter from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME from synapse.logging.opentracing import set_tag @@ -45,6 +45,7 @@ from synapse.types import ( ) from synapse.util.events import generate_fake_event_id from synapse.util.metrics import Measure +from synapse.util.sentinel import Sentinel if TYPE_CHECKING: from synapse.server import HomeServer @@ -146,10 +147,37 @@ class DelayedEventsHandler: ) async def _unsafe_process_new_event(self) -> None: + # We purposefully fetch the current max room stream ordering before + # doing anything else, as it could increment during processing of state + # deltas. We want to avoid updating `delayed_events_stream_pos` past + # the stream ordering of the state deltas we've processed. Otherwise + # we'll leave gaps in our processing. + room_max_stream_ordering = self._store.get_room_max_stream_ordering() + + # Check that there are actually any delayed events to process. If not, bail early. + delayed_events_count = await self._store.get_count_of_delayed_events() + if delayed_events_count == 0: + # There are no delayed events to process. Update the + # `delayed_events_stream_pos` to the latest `events` stream pos and + # exit early. + self._event_pos = room_max_stream_ordering + + logger.debug( + "No delayed events to process. 
Updating `delayed_events_stream_pos` to max stream ordering (%s)", + room_max_stream_ordering, + ) + + await self._store.update_delayed_events_stream_pos(room_max_stream_ordering) + + event_processing_positions.labels( + name="delayed_events", **{SERVER_NAME_LABEL: self.server_name} + ).set(room_max_stream_ordering) + + return + # If self._event_pos is None then means we haven't fetched it from the DB yet if self._event_pos is None: self._event_pos = await self._store.get_delayed_events_stream_pos() - room_max_stream_ordering = self._store.get_room_max_stream_ordering() if self._event_pos > room_max_stream_ordering: # apparently, we've processed more events than exist in the database! # this can happen if events are removed with history purge or similar. @@ -167,7 +195,7 @@ class DelayedEventsHandler: self._clock, name="delayed_events_delta", server_name=self.server_name ): room_max_stream_ordering = self._store.get_room_max_stream_ordering() - if self._event_pos == room_max_stream_ordering: + if self._event_pos >= room_max_stream_ordering: return logger.debug( @@ -202,23 +230,81 @@ class DelayedEventsHandler: Process current state deltas to cancel other users' pending delayed events that target the same state. """ + # Get the senders of each delta's state event (as sender information is + # not currently stored in the `current_state_deltas` table). + event_id_and_sender_dict = await self._store.get_senders_for_event_ids( + [delta.event_id for delta in deltas if delta.event_id is not None] + ) + + # Note: No need to batch as `get_current_state_deltas` will only ever + # return 100 rows at a time. for delta in deltas: + logger.debug( + "Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id + ) + + # `delta.event_id` and `delta.sender` can be `None` in a few valid + # cases (see the docstring of + # `get_current_state_delta_membership_changes_for_user` for details). 
if delta.event_id is None: - logger.debug( - "Not handling delta for deleted state: %r %r", + # TODO: Differentiate between this being caused by a state reset + # which removed a user from a room, or the homeserver + # purposefully having left the room. We can do so by checking + # whether there are any local memberships still left in the + # room. If so, then this is the result of a state reset. + # + # If it is a state reset, we should avoid cancelling new, + # delayed state events due to old state resurfacing. So we + # should skip and log a warning in this case. + # + # If the homeserver has left the room, then we should cancel all + # delayed state events intended for this room, as there is no + # need to try and send a delayed event into a room we've left. + logger.warning( + "Skipping state delta (%r, %r) without corresponding event ID. " + "This can happen if the homeserver has left the room (in which " + "case this can be ignored), or if there has been a state reset " + "which has caused the sender to be kicked out of the room", delta.event_type, delta.state_key, ) continue - logger.debug( - "Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id + sender_str = event_id_and_sender_dict.get( + delta.event_id, Sentinel.UNSET_SENTINEL ) - - event = await self._store.get_event(delta.event_id, allow_none=True) - if not event: + if sender_str is None: + # An event exists, but the `sender` field was "null" and Synapse + # incorrectly accepted the event. This is not expected. + logger.error( + "Skipping state delta with event ID '%s' as 'sender' was None. " + "This is unexpected - please report it as a bug!", + delta.event_id, + ) + continue + if sender_str is Sentinel.UNSET_SENTINEL: + # We have an event ID, but the event was not found in the + # datastore. This can happen if a room, or its history, is + # purged. State deltas related to the room are left behind, but + # the event no longer exists. 
+ # + # As we cannot get the sender of this event, we can't calculate + # whether to cancel delayed events related to this one. So we skip. + logger.debug( + "Skipping state delta with event ID '%s' - the room, or its history, may have been purged", + delta.event_id, + ) + continue + + try: + sender = UserID.from_string(sender_str) + except SynapseError as e: + logger.error( + "Skipping state delta with Matrix User ID '%s' that failed to parse: %s", + sender_str, + e, + ) continue - sender = UserID.from_string(event.sender) next_send_ts = await self._store.cancel_delayed_state_events( room_id=delta.room_id, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b253117498..2ec8548473 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1540,7 +1540,7 @@ class PresenceHandler(BasePresenceHandler): self.clock, name="presence_delta", server_name=self.server_name ): room_max_stream_ordering = self.store.get_room_max_stream_ordering() - if self._event_pos == room_max_stream_ordering: + if self._event_pos >= room_max_stream_ordering: return logger.debug( diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index e196199f8a..19116590f7 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -13,7 +13,6 @@ # -import enum import logging from itertools import chain from typing import ( @@ -75,6 +74,7 @@ from synapse.types.handlers.sliding_sync import ( ) from synapse.types.state import StateFilter from synapse.util import MutableOverlayMapping +from synapse.util.sentinel import Sentinel if TYPE_CHECKING: from synapse.server import HomeServer @@ -83,12 +83,6 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class Sentinel(enum.Enum): - # defining a sentinel in this way allows mypy to correctly handle the - # type of a dictionary lookup and subsequent type narrowing. 
- UNSET_SENTINEL = object() - - # Helper definition for the types that we might return. We do this to avoid # copying data between types (which can be expensive for many rooms). RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync] diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index 8997f4526f..ad90a1be13 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -682,6 +682,8 @@ class StateStorageController: - the stream id which these results go up to - list of current_state_delta_stream rows. If it is empty, we are up to date. + + A maximum of 100 rows will be returned. """ # FIXME(faster_joins): what do we do here? # https://github.com/matrix-org/synapse/issues/13008 diff --git a/synapse/storage/databases/main/delayed_events.py b/synapse/storage/databases/main/delayed_events.py index c88682d55c..0809ffc986 100644 --- a/synapse/storage/databases/main/delayed_events.py +++ b/synapse/storage/databases/main/delayed_events.py @@ -182,6 +182,21 @@ class DelayedEventsStore(SQLBaseStore): "restart_delayed_event", restart_delayed_event_txn ) + async def get_count_of_delayed_events(self) -> int: + """Returns the number of pending delayed events in the DB.""" + + def _get_count_of_delayed_events(txn: LoggingTransaction) -> int: + sql = "SELECT count(*) FROM delayed_events" + + txn.execute(sql) + resp = txn.fetchone() + return resp[0] if resp is not None else 0 + + return await self.db_pool.runInteraction( + "get_count_of_delayed_events", + _get_count_of_delayed_events, + ) + async def get_all_delayed_events_for_user( self, user_localpart: str, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index cc031d8996..31e2312211 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -2135,6 +2135,39 @@ class EventsWorkerStore(SQLBaseStore): 
return rows, to_token, True + async def get_senders_for_event_ids( + self, event_ids: Collection[str] + ) -> Dict[str, Optional[str]]: + """ + Given a sequence of event IDs, return the sender associated with each. + + Args: + event_ids: A collection of event IDs as strings. + + Returns: + A dict of event ID -> sender of the event. + + If a given event ID does not exist in the `events` table, then no entry + for that event ID will be returned. + """ + + def _get_senders_for_event_ids( + txn: LoggingTransaction, + ) -> Dict[str, Optional[str]]: + rows = self.db_pool.simple_select_many_txn( + txn=txn, + table="events", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=["event_id", "sender"], + ) + return dict(rows) + + return await self.db_pool.runInteraction( + "get_senders_for_event_ids", _get_senders_for_event_ids + ) + @cached(max_entries=5000) async def get_event_ordering(self, event_id: str, room_id: str) -> Tuple[int, int]: res = await self.db_pool.simple_select_one( diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py index 00f87cc3a1..303b232d7b 100644 --- a/synapse/storage/databases/main/state_deltas.py +++ b/synapse/storage/databases/main/state_deltas.py @@ -94,6 +94,8 @@ class StateDeltasStore(SQLBaseStore): - the stream id which these results go up to - list of current_state_delta_stream rows. If it is empty, we are up to date. + + A maximum of 100 rows will be returned. """ prev_stream_id = int(prev_stream_id) diff --git a/synapse/util/sentinel.py b/synapse/util/sentinel.py new file mode 100644 index 0000000000..c8434fc97a --- /dev/null +++ b/synapse/util/sentinel.py @@ -0,0 +1,21 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. 
+# +# Copyright (C) 2025 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# + +import enum + + +class Sentinel(enum.Enum): + # defining a sentinel in this way allows mypy to correctly handle the + # type of a dictionary lookup and subsequent type narrowing. + UNSET_SENTINEL = object() diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py index 6d2e4e4bbe..39ff49744f 100644 --- a/tests/storage/test_events.py +++ b/tests/storage/test_events.py @@ -20,7 +20,7 @@ # import logging -from typing import List, Optional +from typing import Dict, List, Optional from twisted.internet.testing import MemoryReactor @@ -39,6 +39,77 @@ from tests.unittest import HomeserverTestCase logger = logging.getLogger(__name__) +class EventsTestCase(HomeserverTestCase): + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def prepare( + self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer + ) -> None: + self._store = self.hs.get_datastores().main + + def test_get_senders_for_event_ids(self) -> None: + """Tests the `get_senders_for_event_ids` storage function.""" + + users_and_tokens: Dict[str, str] = {} + for localpart_suffix in range(10): + localpart = f"user_{localpart_suffix}" + user_id = self.register_user(localpart, "rabbit") + token = self.login(localpart, "rabbit") + + users_and_tokens[user_id] = token + + room_creator_user_id = self.register_user("room_creator", "rabbit") + room_creator_token = self.login("room_creator", "rabbit") + users_and_tokens[room_creator_user_id] = room_creator_token + + # Create a room and invite some users. 
+ room_id = self.helper.create_room_as( + room_creator_user_id, tok=room_creator_token + ) + event_ids_to_senders: Dict[str, str] = {} + for user_id, token in users_and_tokens.items(): + if user_id == room_creator_user_id: + continue + + self.helper.invite( + room=room_id, + targ=user_id, + tok=room_creator_token, + ) + + # Have the user accept the invite and join the room. + self.helper.join( + room=room_id, + user=user_id, + tok=token, + ) + + # Have the user send an event. + response = self.helper.send_event( + room_id=room_id, + type="m.room.message", + content={ + "msgtype": "m.text", + "body": f"hello, I'm {user_id}!", + }, + tok=token, + ) + + # Record the event ID and sender. + event_id = response["event_id"] + event_ids_to_senders[event_id] = user_id + + # Check that `get_senders_for_event_ids` returns the correct data. + response = self.get_success( + self._store.get_senders_for_event_ids(list(event_ids_to_senders.keys())) + ) + self.assert_dict(event_ids_to_senders, response) + + class ExtremPruneTestCase(HomeserverTestCase): servlets = [ admin.register_servlets,