Compare commits

...

6 Commits

Author SHA1 Message Date
Will Hunt
369d30b2d6
Merge 0355e6fd6d4f46546a07de685d20528ebb26ae6a into cc8da2c5ed0cecc771919d76533704a04de9a41e 2025-07-02 16:06:37 +01:00
Erik Johnston
cc8da2c5ed
Log the room ID we're purging state for (#18625)
So we can see what we're deleting.
2025-07-02 15:02:12 +01:00
Will Hunt
0355e6fd6d Add hacks to sync to send the deleted room down sync. 2025-06-27 17:25:56 +01:00
Will Hunt
1904389b5d Store deleted room members before deleting the room 2025-06-27 17:25:48 +01:00
Will Hunt
8f33017653 Add functions to store and get the deleted room members 2025-06-27 17:25:28 +01:00
Will Hunt
b590056ba7 Add deleted rooms table to schema. 2025-06-27 17:25:15 +01:00
6 changed files with 169 additions and 9 deletions

1
changelog.d/18625.misc Normal file
View File

@ -0,0 +1 @@
Log the room ID we're purging state for.

View File

@ -1921,6 +1921,13 @@ class RoomShutdownHandler:
logger.info("Shutting down room %r", room_id) logger.info("Shutting down room %r", room_id)
users = await self.store.get_local_users_related_to_room(room_id) users = await self.store.get_local_users_related_to_room(room_id)
# When deleting a room, we want to store the local membership state so that we
# can still send synthetic leaves down sync after the room has been purged (if indeed it has).
# We must do this prior to kicking as otherwise the current_state_events
# table will be empty.
await self.store.store_deleted_room_members(room_id)
for user_id, membership in users: for user_id, membership in users:
# If the user is not in the room (or is banned), nothing to do. # If the user is not in the room (or is banned), nothing to do.
if membership not in (Membership.JOIN, Membership.INVITE, Membership.KNOCK): if membership not in (Membership.JOIN, Membership.INVITE, Membership.KNOCK):

View File

@ -27,6 +27,7 @@ from typing import (
Any, Any,
Dict, Dict,
FrozenSet, FrozenSet,
Iterable,
List, List,
Literal, Literal,
Mapping, Mapping,
@ -51,8 +52,8 @@ from synapse.api.constants import (
) )
from synapse.api.filtering import FilterCollection from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.events import EventBase from synapse.events import EventBase, FrozenEventV3
from synapse.handlers.relations import BundledAggregations from synapse.handlers.relations import BundledAggregations
from synapse.logging import issue9533_logger from synapse.logging import issue9533_logger
from synapse.logging.context import current_context from synapse.logging.context import current_context
@ -235,7 +236,7 @@ class _RoomChanges:
invited: List[InvitedSyncResult] invited: List[InvitedSyncResult]
knocked: List[KnockedSyncResult] knocked: List[KnockedSyncResult]
newly_joined_rooms: List[str] newly_joined_rooms: List[str]
newly_left_rooms: List[str] newly_left_rooms: set[str]
@attr.s(slots=True, frozen=True, auto_attribs=True) @attr.s(slots=True, frozen=True, auto_attribs=True)
@ -1829,7 +1830,7 @@ class SyncHandler:
full_state, full_state,
) )
logger.debug( logger.info(
"Calculating sync response for %r between %s and %s", "Calculating sync response for %r between %s and %s",
sync_config.user, sync_config.user,
sync_result_builder.since_token, sync_result_builder.since_token,
@ -2386,6 +2387,7 @@ class SyncHandler:
since_token = sync_result_builder.since_token since_token = sync_result_builder.since_token
user_id = sync_result_builder.sync_config.user.to_string() user_id = sync_result_builder.sync_config.user.to_string()
logger.info("Generating _generate_sync_entry_for_rooms for %s %s", user_id, since_token)
blocks_all_rooms = ( blocks_all_rooms = (
sync_result_builder.sync_config.filter_collection.blocks_all_rooms() sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
@ -2427,19 +2429,20 @@ class SyncHandler:
# no point in going further. # no point in going further.
if not sync_result_builder.full_state: if not sync_result_builder.full_state:
if since_token and not ephemeral_by_room and not account_data_by_room: if since_token and not ephemeral_by_room and not account_data_by_room:
have_changed = await self._have_rooms_changed(sync_result_builder) have_changed = await self._have_rooms_changed(sync_result_builder, user_id)
log_kv({"rooms_have_changed": have_changed}) log_kv({"rooms_have_changed": have_changed})
if not have_changed: if not have_changed:
tags_by_room = await self.store.get_updated_tags( tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key user_id, since_token.account_data_key
) )
if not tags_by_room: if not tags_by_room:
logger.debug("no-oping sync") logger.info("no-oping sync")
return set(), set() return set(), set()
# 3. Work out which rooms need reporting in the sync response. # 3. Work out which rooms need reporting in the sync response.
ignored_users = await self.store.ignored_users(user_id) ignored_users = await self.store.ignored_users(user_id)
if since_token: if since_token:
logger.info("With since_token %s %s", user_id, since_token)
room_changes = await self._get_room_changes_for_incremental_sync( room_changes = await self._get_room_changes_for_incremental_sync(
sync_result_builder, ignored_users sync_result_builder, ignored_users
) )
@ -2481,10 +2484,10 @@ class SyncHandler:
sync_result_builder.invited.extend(invited) sync_result_builder.invited.extend(invited)
sync_result_builder.knocked.extend(knocked) sync_result_builder.knocked.extend(knocked)
return set(newly_joined_rooms), set(newly_left_rooms) return set(newly_joined_rooms), newly_left_rooms
async def _have_rooms_changed( async def _have_rooms_changed(
self, sync_result_builder: "SyncResultBuilder" self, sync_result_builder: "SyncResultBuilder", user_id: str
) -> bool: ) -> bool:
"""Returns whether there may be any new events that should be sent down """Returns whether there may be any new events that should be sent down
the sync. Returns True if there are. the sync. Returns True if there are.
@ -2499,6 +2502,10 @@ class SyncHandler:
if membership_change_events or sync_result_builder.forced_newly_joined_room_ids: if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
return True return True
# TODO: Hack to check if we have any deleted rooms
if len(await self.store.get_deleted_rooms_for_user(user_id, since_token.room_key.stream)):
return True
stream_id = since_token.room_key.stream stream_id = since_token.room_key.stream
for room_id in sync_result_builder.joined_room_ids: for room_id in sync_result_builder.joined_room_ids:
if self.store.has_room_changed_since(room_id, stream_id): if self.store.has_room_changed_since(room_id, stream_id):
@ -2536,6 +2543,7 @@ class SyncHandler:
now_token = sync_result_builder.now_token now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config sync_config = sync_result_builder.sync_config
membership_change_events = sync_result_builder.membership_change_events membership_change_events = sync_result_builder.membership_change_events
logger.info("Generating _get_room_changes_for_incremental_sync for %s", user_id)
assert since_token assert since_token
@ -2757,6 +2765,46 @@ class SyncHandler:
room_entries.append(entry) room_entries.append(entry)
deleted_left_rooms = await self.store.get_deleted_rooms_for_user(user_id, since_token.room_key.stream)
for room_id, deleted_stream_id in deleted_left_rooms:
if room_id in newly_left_rooms:
continue
logger.info("Generating synthetic leave for %s in %s as room was deleted.", user_id, room_id)
# Synthetic leaves for deleted rooms
leave_evt = FrozenEventV3({
"state_key": user_id,
"sender": "@server:patroclus",
"room_id": room_id,
"type": "m.room.member",
"depth": 1,
"content": {
"membership": "leave",
"reason": "Room has been deleted"
}
},
RoomVersions.V10, # TODO: Fetch this!
)
leave_evt.internal_metadata.outlier = True
leave_evt.internal_metadata.out_of_band_membership = True
leave_evt.internal_metadata.stream_ordering = deleted_stream_id
room_entries.append(
RoomSyncResultBuilder(
room_id=room_id,
rtype="archived",
events=[leave_evt],
newly_joined=False,
full_state=False,
# TODO: THESE ARE ALL LIES, DAMNED LIES
since_token=since_token,
upto_token=since_token,
end_token=since_token,
out_of_band=True
)
)
newly_left_rooms.append(room_id)
return _RoomChanges( return _RoomChanges(
room_entries, room_entries,
invited, invited,

View File

@ -34,6 +34,7 @@ from synapse.metrics.background_process_metrics import wrap_as_background_proces
from synapse.storage.database import LoggingTransaction from synapse.storage.database import LoggingTransaction
from synapse.storage.databases import Databases from synapse.storage.databases import Databases
from synapse.types.storage import _BackgroundUpdates from synapse.types.storage import _BackgroundUpdates
from synapse.util.stringutils import shortstr
if TYPE_CHECKING: if TYPE_CHECKING:
from synapse.server import HomeServer from synapse.server import HomeServer
@ -167,6 +168,12 @@ class PurgeEventsStorageController:
break break
(room_id, groups_to_sequences) = next_to_delete (room_id, groups_to_sequences) = next_to_delete
logger.info(
"[purge] deleting state groups for room %s: %s",
room_id,
shortstr(groups_to_sequences.keys(), maxitems=10),
)
made_progress = await self._delete_state_groups( made_progress = await self._delete_state_groups(
room_id, groups_to_sequences room_id, groups_to_sequences
) )

View File

@ -102,6 +102,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
super().__init__(database, db_conn, hs) super().__init__(database, db_conn, hs)
self._server_notices_mxid = hs.config.servernotices.server_notices_mxid self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
self._our_server_name = hs.config.server.server_name
if ( if (
self.hs.config.worker.run_background_tasks self.hs.config.worker.run_background_tasks
@ -1388,7 +1389,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
rows = cast( rows = cast(
List[Tuple[str, str, str]], List[Tuple[str, str, str]],
await self.db_pool.simple_select_many_batch( await self.db_pool.simple_select_onecol(
table="room_memberships", table="room_memberships",
column="event_id", column="event_id",
iterable=member_event_ids, iterable=member_event_ids,
@ -1845,6 +1846,83 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
"_get_room_participation_txn", _get_room_participation_txn, user_id, room_id "_get_room_participation_txn", _get_room_participation_txn, user_id, room_id
) )
async def store_deleted_room_members(
    self,
    room_id: str,
) -> None:
    """Snapshot the local members of a room that is about to be purged.

    Copies the local membership rows from `current_state_events` into the
    `deleted_room_members` table, so that synthetic leave events can still
    be generated for sync after the room's real events have been deleted.

    Must be called *before* members are kicked / the room is purged, as it
    reads `current_state_events`, which is emptied by the purge.

    Args:
        room_id: the ID of the room being deleted
    """
    # Extract every local member's current membership and the stream
    # position of their membership event.
    sql = """
        SELECT state_key, membership, event_stream_ordering FROM current_state_events
        WHERE type = 'm.room.member'
        AND room_id = ?
        AND state_key LIKE ?
    """
    # TODO: Should we check for any joins, everyone should be banned or left at this point...
    # NOTE(review): the LIKE pattern restricts to users on this server. The
    # server name is operator-controlled config, but it could in principle
    # contain LIKE wildcards ('%', '_') — rows should be re-validated
    # against the exact domain before being trusted.
    user_rows = await self.db_pool.execute(
        "store_deleted_room_members_get_members",
        sql,
        room_id,
        ("%:" + self._our_server_name),
    )
    logger.info(
        "store_deleted_room_members %s %s %s %s %s",
        room_id,
        user_rows,
        sql,
        room_id,
        ("%:" + self._our_server_name),
    )
    await self.db_pool.runInteraction(
        "store_deleted_room_members",
        self._store_deleted_room_members_txn,
        room_id,
        user_rows,
    )
def _store_deleted_room_members_txn(
    self,
    txn: LoggingTransaction,
    room_id: str,
    users: Iterable[Tuple[str, str, int]],
) -> None:
    """Insert rows into `deleted_room_members` for the given users.

    Args:
        txn: the database transaction
        room_id: the room being deleted
        users: (user_id, membership, event_stream_ordering) tuples taken
            from `current_state_events` for local members of the room
    """
    # Users who already left or were banned keep the stream position of
    # that membership event. Anyone else (still joined/invited/knocking)
    # is about to be kicked by the shutdown, so record the current max
    # stream position for them instead.
    # (Renamed from `max`, which shadowed the builtin.)
    max_stream_id = self.get_room_max_stream_ordering()
    DatabasePool.simple_insert_many_txn(
        txn,
        table="deleted_room_members",
        keys=("room_id", "user_id", "deleted_at_stream_id"),
        values=[
            (
                room_id,
                user_id,
                stream_id
                if membership in (Membership.BAN, Membership.LEAVE)
                else max_stream_id,
            )
            for user_id, membership, stream_id in users
        ],
    )
async def get_deleted_rooms_for_user(
    self, user_id: str, stream_pos: int
) -> List[Tuple[str, int]]:
    """Get rooms the user was a member of that were deleted after `stream_pos`.

    Args:
        user_id: the local user to look up
        stream_pos: only return rooms deleted strictly after this stream
            position

    Returns:
        A list of (room_id, deleted_at_stream_id) tuples, one per deleted
        room the user was in. (Rows are already distinct: (room_id,
        user_id) is unique in `deleted_room_members`.)
    """

    def _get_deleted_rooms_for_user(
        txn: LoggingTransaction,
    ) -> List[Tuple[str, int]]:
        sql = """
            SELECT room_id, deleted_at_stream_id FROM deleted_room_members
            WHERE user_id = ?
            AND ? < deleted_at_stream_id
        """
        txn.execute(sql, (user_id, stream_pos))
        return [(row[0], row[1]) for row in txn]

    return await self.db_pool.runInteraction(
        "get_deleted_rooms_for_user", _get_deleted_rooms_for_user
    )
class RoomMemberBackgroundUpdateStore(SQLBaseStore): class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__( def __init__(

View File

@ -0,0 +1,19 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Records the local members of rooms that have been deleted (purged), so
-- that synthetic leave events can still be sent down sync afterwards.
CREATE TABLE deleted_room_members (
    room_id TEXT NOT NULL,
    user_id TEXT NOT NULL,
    -- Stream ordering at which the user's membership ended: their
    -- leave/ban event, or the stream position at deletion time if they
    -- were still joined.
    deleted_at_stream_id BIGINT NOT NULL
);

-- One row per (room, user).
CREATE UNIQUE INDEX deleted_room_member_idx ON deleted_room_members(room_id, user_id);

-- Sync-time lookups filter by user_id alone, which the (room_id, user_id)
-- index cannot serve (wrong leftmost column) — index user_id directly.
CREATE INDEX deleted_room_member_user_idx ON deleted_room_members(user_id);