Fix performance regression related to delayed events processing (#18926)

This commit is contained in:
Andrew Morgan 2025-09-23 09:47:30 +01:00 committed by Andrew Morgan
parent fcffd2e897
commit 0c8594c9a8
10 changed files with 245 additions and 20 deletions

1
changelog.d/18926.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a performance regression related to the experimental Delayed Events ([MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140)) feature.

View File

@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, List, Optional, Set, Tuple
from twisted.internet.interfaces import IDelayedCall
from synapse.api.constants import EventTypes
from synapse.api.errors import ShadowBanError
from synapse.api.errors import ShadowBanError, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.logging.opentracing import set_tag
@ -45,6 +45,7 @@ from synapse.types import (
)
from synapse.util.events import generate_fake_event_id
from synapse.util.metrics import Measure
from synapse.util.sentinel import Sentinel
if TYPE_CHECKING:
from synapse.server import HomeServer
@ -146,10 +147,37 @@ class DelayedEventsHandler:
)
async def _unsafe_process_new_event(self) -> None:
# We purposefully fetch the current max room stream ordering before
# doing anything else, as it could increment duing processing of state
# deltas. We want to avoid updating `delayed_events_stream_pos` past
# the stream ordering of the state deltas we've processed. Otherwise
# we'll leave gaps in our processing.
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
# Check that there are actually any delayed events to process. If not, bail early.
delayed_events_count = await self._store.get_count_of_delayed_events()
if delayed_events_count == 0:
# There are no delayed events to process. Update the
# `delayed_events_stream_pos` to the latest `events` stream pos and
# exit early.
self._event_pos = room_max_stream_ordering
logger.debug(
"No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)",
room_max_stream_ordering,
)
await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)
event_processing_positions.labels(
name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
).set(room_max_stream_ordering)
return
# If self._event_pos is None then means we haven't fetched it from the DB yet
if self._event_pos is None:
self._event_pos = await self._store.get_delayed_events_stream_pos()
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos > room_max_stream_ordering:
# apparently, we've processed more events than exist in the database!
# this can happen if events are removed with history purge or similar.
@ -167,7 +195,7 @@ class DelayedEventsHandler:
self._clock, name="delayed_events_delta", server_name=self.server_name
):
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
if self._event_pos >= room_max_stream_ordering:
return
logger.debug(
@ -202,23 +230,81 @@ class DelayedEventsHandler:
Process current state deltas to cancel other users' pending delayed events
that target the same state.
"""
# Get the senders of each delta's state event (as sender information is
# not currently stored in the `current_state_deltas` table).
event_id_and_sender_dict = await self._store.get_senders_for_event_ids(
[delta.event_id for delta in deltas if delta.event_id is not None]
)
# Note: No need to batch as `get_current_state_deltas` will only ever
# return 100 rows at a time.
for delta in deltas:
logger.debug(
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)
# `delta.event_id` and `delta.sender` can be `None` in a few valid
# cases (see the docstring of
# `get_current_state_delta_membership_changes_for_user` for details).
if delta.event_id is None:
logger.debug(
"Not handling delta for deleted state: %r %r",
# TODO: Differentiate between this being caused by a state reset
# which removed a user from a room, or the homeserver
# purposefully having left the room. We can do so by checking
# whether there are any local memberships still left in the
# room. If so, then this is the result of a state reset.
#
# If it is a state reset, we should avoid cancelling new,
# delayed state events due to old state resurfacing. So we
# should skip and log a warning in this case.
#
# If the homeserver has left the room, then we should cancel all
# delayed state events intended for this room, as there is no
# need to try and send a delayed event into a room we've left.
logger.warning(
"Skipping state delta (%r, %r) without corresponding event ID. "
"This can happen if the homeserver has left the room (in which "
"case this can be ignored), or if there has been a state reset "
"which has caused the sender to be kicked out of the room",
delta.event_type,
delta.state_key,
)
continue
logger.debug(
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
sender_str = event_id_and_sender_dict.get(
delta.event_id, Sentinel.UNSET_SENTINEL
)
event = await self._store.get_event(delta.event_id, allow_none=True)
if not event:
if sender_str is None:
# An event exists, but the `sender` field was "null" and Synapse
# incorrectly accepted the event. This is not expected.
logger.error(
"Skipping state delta with event ID '%s' as 'sender' was None. "
"This is unexpected - please report it as a bug!",
delta.event_id,
)
continue
if sender_str is Sentinel.UNSET_SENTINEL:
# We have an event ID, but the event was not found in the
# datastore. This can happen if a room, or its history, is
# purged. State deltas related to the room are left behind, but
# the event no longer exists.
#
# As we cannot get the sender of this event, we can't calculate
# whether to cancel delayed events related to this one. So we skip.
logger.debug(
"Skipping state delta with event ID '%s' - the room, or its history, may have been purged",
delta.event_id,
)
continue
try:
sender = UserID.from_string(sender_str)
except SynapseError as e:
logger.error(
"Skipping state delta with Matrix User ID '%s' that failed to parse: %s",
sender_str,
e,
)
continue
sender = UserID.from_string(event.sender)
next_send_ts = await self._store.cancel_delayed_state_events(
room_id=delta.room_id,

View File

@ -1540,7 +1540,7 @@ class PresenceHandler(BasePresenceHandler):
self.clock, name="presence_delta", server_name=self.server_name
):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
if self._event_pos >= room_max_stream_ordering:
return
logger.debug(

View File

@ -13,7 +13,6 @@
#
import enum
import logging
from itertools import chain
from typing import (
@ -75,6 +74,7 @@ from synapse.types.handlers.sliding_sync import (
)
from synapse.types.state import StateFilter
from synapse.util import MutableOverlayMapping
from synapse.util.sentinel import Sentinel
if TYPE_CHECKING:
from synapse.server import HomeServer
@ -83,12 +83,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class Sentinel(enum.Enum):
# defining a sentinel in this way allows mypy to correctly handle the
# type of a dictionary lookup and subsequent type narrowing.
UNSET_SENTINEL = object()
# Helper definition for the types that we might return. We do this to avoid
# copying data between types (which can be expensive for many rooms).
RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync]

View File

@ -682,6 +682,8 @@ class StateStorageController:
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.
A maximum of 100 rows will be returned.
"""
# FIXME(faster_joins): what do we do here?
# https://github.com/matrix-org/synapse/issues/13008

View File

@ -182,6 +182,21 @@ class DelayedEventsStore(SQLBaseStore):
"restart_delayed_event", restart_delayed_event_txn
)
async def get_count_of_delayed_events(self) -> int:
"""Returns the number of pending delayed events in the DB."""
def _get_count_of_delayed_events(txn: LoggingTransaction) -> int:
sql = "SELECT count(*) FROM delayed_events"
txn.execute(sql)
resp = txn.fetchone()
return resp[0] if resp is not None else 0
return await self.db_pool.runInteraction(
"get_count_of_delayed_events",
_get_count_of_delayed_events,
)
async def get_all_delayed_events_for_user(
self,
user_localpart: str,

View File

@ -2135,6 +2135,39 @@ class EventsWorkerStore(SQLBaseStore):
return rows, to_token, True
async def get_senders_for_event_ids(
self, event_ids: Collection[str]
) -> Dict[str, Optional[str]]:
"""
Given a sequence of event IDs, return the sender associated with each.
Args:
event_ids: A collection of event IDs as strings.
Returns:
A dict of event ID -> sender of the event.
If a given event ID does not exist in the `events` table, then no entry
for that event ID will be returned.
"""
def _get_senders_for_event_ids(
txn: LoggingTransaction,
) -> Dict[str, Optional[str]]:
rows = self.db_pool.simple_select_many_txn(
txn=txn,
table="events",
column="event_id",
iterable=event_ids,
keyvalues={},
retcols=["event_id", "sender"],
)
return dict(rows)
return await self.db_pool.runInteraction(
"get_senders_for_event_ids", _get_senders_for_event_ids
)
@cached(max_entries=5000)
async def get_event_ordering(self, event_id: str, room_id: str) -> Tuple[int, int]:
res = await self.db_pool.simple_select_one(

View File

@ -94,6 +94,8 @@ class StateDeltasStore(SQLBaseStore):
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.
A maximum of 100 rows will be returned.
"""
prev_stream_id = int(prev_stream_id)

21
synapse/util/sentinel.py Normal file
View File

@ -0,0 +1,21 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
import enum
class Sentinel(enum.Enum):
# defining a sentinel in this way allows mypy to correctly handle the
# type of a dictionary lookup and subsequent type narrowing.
UNSET_SENTINEL = object()

View File

@ -20,7 +20,7 @@
#
import logging
from typing import List, Optional
from typing import Dict, List, Optional
from twisted.internet.testing import MemoryReactor
@ -39,6 +39,77 @@ from tests.unittest import HomeserverTestCase
logger = logging.getLogger(__name__)
class EventsTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self._store = self.hs.get_datastores().main
def test_get_senders_for_event_ids(self) -> None:
"""Tests the `get_senders_for_event_ids` storage function."""
users_and_tokens: Dict[str, str] = {}
for localpart_suffix in range(10):
localpart = f"user_{localpart_suffix}"
user_id = self.register_user(localpart, "rabbit")
token = self.login(localpart, "rabbit")
users_and_tokens[user_id] = token
room_creator_user_id = self.register_user("room_creator", "rabbit")
room_creator_token = self.login("room_creator", "rabbit")
users_and_tokens[room_creator_user_id] = room_creator_token
# Create a room and invite some users.
room_id = self.helper.create_room_as(
room_creator_user_id, tok=room_creator_token
)
event_ids_to_senders: Dict[str, str] = {}
for user_id, token in users_and_tokens.items():
if user_id == room_creator_user_id:
continue
self.helper.invite(
room=room_id,
targ=user_id,
tok=room_creator_token,
)
# Have the user accept the invite and join the room.
self.helper.join(
room=room_id,
user=user_id,
tok=token,
)
# Have the user send an event.
response = self.helper.send_event(
room_id=room_id,
type="m.room.message",
content={
"msgtype": "m.text",
"body": f"hello, I'm {user_id}!",
},
tok=token,
)
# Record the event ID and sender.
event_id = response["event_id"]
event_ids_to_senders[event_id] = user_id
# Check that `get_senders_for_event_ids` returns the correct data.
response = self.get_success(
self._store.get_senders_for_event_ids(list(event_ids_to_senders.keys()))
)
self.assert_dict(event_ids_to_senders, response)
class ExtremPruneTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,