Compare commits

...

6 Commits

Author SHA1 Message Date
Will Hunt
369d30b2d6
Merge 0355e6fd6d4f46546a07de685d20528ebb26ae6a into cc8da2c5ed0cecc771919d76533704a04de9a41e 2025-07-02 16:06:37 +01:00
Erik Johnston
cc8da2c5ed
Log the room ID we're purging state for (#18625)
So we can see what we're deleting.
2025-07-02 15:02:12 +01:00
Will Hunt
0355e6fd6d Add hacks to sync to send the deleted room down sync. 2025-06-27 17:25:56 +01:00
Will Hunt
1904389b5d Store deleted room members before deleting the room 2025-06-27 17:25:48 +01:00
Will Hunt
8f33017653 Add functions to store and get the deleted room members 2025-06-27 17:25:28 +01:00
Will Hunt
b590056ba7 Add deleted rooms table to schema. 2025-06-27 17:25:15 +01:00
6 changed files with 169 additions and 9 deletions

1
changelog.d/18625.misc Normal file
View File

@ -0,0 +1 @@
Log the room ID we're purging state for.

View File

@ -1921,6 +1921,13 @@ class RoomShutdownHandler:
logger.info("Shutting down room %r", room_id)
users = await self.store.get_local_users_related_to_room(room_id)
# When deleting a room, we want to store the local membership state so that we
# can still send synthetic leaves down sync after the room has been purged (if indeed it has).
# We must do this prior to kicking as otherwise the current_state_events
# table will be empty.
await self.store.store_deleted_room_members(room_id)
for user_id, membership in users:
# If the user is not in the room (or is banned), nothing to do.
if membership not in (Membership.JOIN, Membership.INVITE, Membership.KNOCK):

View File

@ -27,6 +27,7 @@ from typing import (
Any,
Dict,
FrozenSet,
Iterable,
List,
Literal,
Mapping,
@ -51,8 +52,8 @@ from synapse.api.constants import (
)
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.events import EventBase, FrozenEventV3
from synapse.handlers.relations import BundledAggregations
from synapse.logging import issue9533_logger
from synapse.logging.context import current_context
@ -235,7 +236,7 @@ class _RoomChanges:
invited: List[InvitedSyncResult]
knocked: List[KnockedSyncResult]
newly_joined_rooms: List[str]
newly_left_rooms: List[str]
newly_left_rooms: set[str]
@attr.s(slots=True, frozen=True, auto_attribs=True)
@ -1829,7 +1830,7 @@ class SyncHandler:
full_state,
)
logger.debug(
logger.info(
"Calculating sync response for %r between %s and %s",
sync_config.user,
sync_result_builder.since_token,
@ -2386,6 +2387,7 @@ class SyncHandler:
since_token = sync_result_builder.since_token
user_id = sync_result_builder.sync_config.user.to_string()
logger.info("Generating _generate_sync_entry_for_rooms for %s %s", user_id, since_token)
blocks_all_rooms = (
sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
@ -2427,19 +2429,20 @@ class SyncHandler:
# no point in going further.
if not sync_result_builder.full_state:
if since_token and not ephemeral_by_room and not account_data_by_room:
have_changed = await self._have_rooms_changed(sync_result_builder)
have_changed = await self._have_rooms_changed(sync_result_builder, user_id)
log_kv({"rooms_have_changed": have_changed})
if not have_changed:
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
if not tags_by_room:
logger.debug("no-oping sync")
logger.info("no-oping sync")
return set(), set()
# 3. Work out which rooms need reporting in the sync response.
ignored_users = await self.store.ignored_users(user_id)
if since_token:
logger.info("With since_token %s %s", user_id, since_token)
room_changes = await self._get_room_changes_for_incremental_sync(
sync_result_builder, ignored_users
)
@ -2481,10 +2484,10 @@ class SyncHandler:
sync_result_builder.invited.extend(invited)
sync_result_builder.knocked.extend(knocked)
return set(newly_joined_rooms), set(newly_left_rooms)
return set(newly_joined_rooms), newly_left_rooms
async def _have_rooms_changed(
self, sync_result_builder: "SyncResultBuilder"
self, sync_result_builder: "SyncResultBuilder", user_id: str
) -> bool:
"""Returns whether there may be any new events that should be sent down
the sync. Returns True if there are.
@ -2499,6 +2502,10 @@ class SyncHandler:
if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
return True
# TODO: Hack to check if we have any deleted rooms
if len(await self.store.get_deleted_rooms_for_user(user_id, since_token.room_key.stream)):
return True
stream_id = since_token.room_key.stream
for room_id in sync_result_builder.joined_room_ids:
if self.store.has_room_changed_since(room_id, stream_id):
@ -2536,6 +2543,7 @@ class SyncHandler:
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
membership_change_events = sync_result_builder.membership_change_events
logger.info("Generating _get_room_changes_for_incremental_sync for %s", user_id)
assert since_token
@ -2757,6 +2765,46 @@ class SyncHandler:
room_entries.append(entry)
deleted_left_rooms = await self.store.get_deleted_rooms_for_user(user_id, since_token.room_key.stream)
for room_id, deleted_stream_id in deleted_left_rooms:
if room_id in newly_left_rooms:
continue
logger.info("Generating synthetic leave for %s in %s as room was deleted.", user_id, room_id)
# Synthetic leaves for deleted rooms
leave_evt = FrozenEventV3({
"state_key": user_id,
"sender": "@server:patroclus",
"room_id": room_id,
"type": "m.room.member",
"depth": 1,
"content": {
"membership": "leave",
"reason": "Room has been deleted"
}
},
RoomVersions.V10, # TODO: Fetch this!
)
leave_evt.internal_metadata.outlier = True
leave_evt.internal_metadata.out_of_band_membership = True
leave_evt.internal_metadata.stream_ordering = deleted_stream_id
room_entries.append(
RoomSyncResultBuilder(
room_id=room_id,
rtype="archived",
events=[leave_evt],
newly_joined=False,
full_state=False,
# TODO: THESE ARE ALL LIES, DAMNED LIES
since_token=since_token,
upto_token=since_token,
end_token=since_token,
out_of_band=True
)
)
newly_left_rooms.append(room_id)
return _RoomChanges(
room_entries,
invited,

View File

@ -34,6 +34,7 @@ from synapse.metrics.background_process_metrics import wrap_as_background_proces
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases import Databases
from synapse.types.storage import _BackgroundUpdates
from synapse.util.stringutils import shortstr
if TYPE_CHECKING:
from synapse.server import HomeServer
@ -167,6 +168,12 @@ class PurgeEventsStorageController:
break
(room_id, groups_to_sequences) = next_to_delete
logger.info(
"[purge] deleting state groups for room %s: %s",
room_id,
shortstr(groups_to_sequences.keys(), maxitems=10),
)
made_progress = await self._delete_state_groups(
room_id, groups_to_sequences
)

View File

@ -102,6 +102,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
super().__init__(database, db_conn, hs)
self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
self._our_server_name = hs.config.server.server_name
if (
self.hs.config.worker.run_background_tasks
@ -1388,7 +1389,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
rows = cast(
List[Tuple[str, str, str]],
await self.db_pool.simple_select_many_batch(
await self.db_pool.simple_select_onecol(
table="room_memberships",
column="event_id",
iterable=member_event_ids,
@ -1845,6 +1846,83 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
"_get_room_participation_txn", _get_room_participation_txn, user_id, room_id
)
async def store_deleted_room_members(
self,
room_id: str,
) -> None:
"""TODO: WRITE
Args:
room_id: the ID of the room
stream_id: stream ID at the point the room was deleted
user_ids: all users who were ever present in the room
"""
# Welcome to the pain zone. We need to first extract all the local members
sql = """
SELECT state_key, membership, event_stream_ordering FROM current_state_events
WHERE type = 'm.room.member'
AND room_id = ?
AND state_key LIKE ?
"""
# TODO: Should we check for any joins, everyone should be banned or left at this point...
# We do need to be careful to ensure that host doesn't have any wild cards
# in it, but we checked above for known ones and we'll check below that
# the returned user actually has the correct domain.
user_rows = await self.db_pool.execute(
"store_deleted_room_members_get_members", sql, room_id, ("%:" + self._our_server_name)
)
logger.info("store_deleted_room_members %s %s %s %s %s", room_id, user_rows, sql, room_id, ("%:" + self._our_server_name))
await self.db_pool.runInteraction(
"store_deleted_room_members",
self._store_deleted_room_members_txn,
room_id,
user_rows
)
def _store_deleted_room_members_txn(
self,
txn: LoggingTransaction,
room_id: str,
users: Iterable[Tuple[str, str, int]],
) -> None:
# If the user is still currently joined, they are about to get kicked so
# use the latest stream position
max = self.get_room_max_stream_ordering()
return DatabasePool.simple_insert_many_txn(
txn,
table="deleted_room_members",
keys=("room_id", "user_id", "deleted_at_stream_id"),
values=[(room_id, user[0], user[2] if user[1] in [Membership.BAN, Membership.LEAVE] else max) for user in users],
)
async def get_deleted_rooms_for_user(
self, user_id: str, stream_pos: int
) -> list[(str, int)]:
"""Checks if the given rooms have partial state.
Returns true for "partial-state" rooms, which means that the state
at events in the room, and `current_state_events`, may not yet be
complete.
"""
def _get_deleted_rooms_for_user(txn: LoggingTransaction) -> list[(str, int)]:
sql = """
SELECT room_id, deleted_at_stream_id FROM deleted_room_members
WHERE user_id = ?
AND ? < deleted_at_stream_id
"""
txn.execute(sql, (user_id, stream_pos))
return set([(r[0], r[1]) for r in txn])
return await self.db_pool.runInteraction(
"get_deleted_rooms_for_user",
_get_deleted_rooms_for_user
)
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(

View File

@ -0,0 +1,19 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
CREATE TABLE deleted_room_members (
room_id TEXT NOT NULL,
user_id TEXT NOT NULL,
deleted_at_stream_id bigint NOT NULL
);
CREATE UNIQUE INDEX deleted_room_member_idx ON deleted_room_members(room_id, user_id);