Mirror of https://github.com/element-hq/synapse.git (synced 2025-07-04 00:00:27 -04:00)

Compare commits: c570605c7b ... 369d30b2d6 (6 commits)

- 369d30b2d6
- cc8da2c5ed
- 0355e6fd6d
- 1904389b5d
- 8f33017653
- b590056ba7
changelog.d/18625.misc (new file)
@@ -0,0 +1 @@
+Log the room ID we're purging state for.
@@ -1921,6 +1921,13 @@ class RoomShutdownHandler:
         logger.info("Shutting down room %r", room_id)
 
         users = await self.store.get_local_users_related_to_room(room_id)
+
+        # When deleting a room, we want to store the local membership state so that we
+        # can still send synthetic leaves down sync after the room has been purged (if indeed it has).
+        # We must do this prior to kicking as otherwise the current_state_events
+        # table will be empty.
+        await self.store.store_deleted_room_members(room_id)
+
         for user_id, membership in users:
             # If the user is not in the room (or is banned), nothing to do.
             if membership not in (Membership.JOIN, Membership.INVITE, Membership.KNOCK):
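The ordering called out in the comment above is the whole point: once members are kicked and the room is purged, `current_state_events` no longer holds the memberships needed to build synthetic leaves. A minimal, self-contained sketch of that dependency (toy code only; the dict-based "tables" and helper names are invented for illustration and are not Synapse APIs):

# Toy illustration only -- plain dicts stand in for Synapse's database tables.
from typing import Dict, List, Tuple

current_state_events: Dict[str, List[Tuple[str, str]]] = {
    "!room:example.org": [("@alice:example.org", "join"), ("@bob:example.org", "ban")],
}
deleted_room_members: List[Tuple[str, str]] = []

def store_deleted_room_members(room_id: str) -> None:
    # Snapshot local memberships while current_state_events still has them.
    for user_id, _membership in current_state_events.get(room_id, []):
        deleted_room_members.append((room_id, user_id))

def purge_room(room_id: str) -> None:
    # After the purge there is nothing left to snapshot.
    current_state_events.pop(room_id, None)

def synthetic_leaves(room_id: str) -> List[dict]:
    # Later, sync can still hand out leave events built from the snapshot.
    return [
        {"type": "m.room.member", "state_key": user_id, "content": {"membership": "leave"}}
        for r, user_id in deleted_room_members
        if r == room_id
    ]

store_deleted_room_members("!room:example.org")  # must come before the purge
purge_room("!room:example.org")
print(synthetic_leaves("!room:example.org"))

Running it prints one synthetic leave per snapshotted member; swap the first two calls and the output is empty, which is the failure mode the comment is guarding against.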
@@ -27,6 +27,7 @@ from typing import (
     Any,
     Dict,
     FrozenSet,
+    Iterable,
     List,
     Literal,
     Mapping,
@@ -51,8 +52,8 @@ from synapse.api.constants import (
 )
 from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
-from synapse.events import EventBase
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
+from synapse.events import EventBase, FrozenEventV3
 from synapse.handlers.relations import BundledAggregations
 from synapse.logging import issue9533_logger
 from synapse.logging.context import current_context
@@ -235,7 +236,7 @@ class _RoomChanges:
     invited: List[InvitedSyncResult]
     knocked: List[KnockedSyncResult]
     newly_joined_rooms: List[str]
-    newly_left_rooms: List[str]
+    newly_left_rooms: set[str]
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -1829,7 +1830,7 @@ class SyncHandler:
             full_state,
         )
 
-        logger.debug(
+        logger.info(
             "Calculating sync response for %r between %s and %s",
             sync_config.user,
             sync_result_builder.since_token,
@@ -2386,6 +2387,7 @@ class SyncHandler:
 
         since_token = sync_result_builder.since_token
         user_id = sync_result_builder.sync_config.user.to_string()
+        logger.info("Generating _generate_sync_entry_for_rooms for %s %s", user_id, since_token)
 
         blocks_all_rooms = (
             sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
@@ -2427,19 +2429,20 @@
         # no point in going further.
         if not sync_result_builder.full_state:
             if since_token and not ephemeral_by_room and not account_data_by_room:
-                have_changed = await self._have_rooms_changed(sync_result_builder)
+                have_changed = await self._have_rooms_changed(sync_result_builder, user_id)
                 log_kv({"rooms_have_changed": have_changed})
                 if not have_changed:
                     tags_by_room = await self.store.get_updated_tags(
                         user_id, since_token.account_data_key
                     )
                     if not tags_by_room:
-                        logger.debug("no-oping sync")
+                        logger.info("no-oping sync")
                         return set(), set()
 
         # 3. Work out which rooms need reporting in the sync response.
         ignored_users = await self.store.ignored_users(user_id)
         if since_token:
+            logger.info("With since_token %s %s", user_id, since_token)
             room_changes = await self._get_room_changes_for_incremental_sync(
                 sync_result_builder, ignored_users
             )
@@ -2481,10 +2484,10 @@
         sync_result_builder.invited.extend(invited)
         sync_result_builder.knocked.extend(knocked)
 
-        return set(newly_joined_rooms), set(newly_left_rooms)
+        return set(newly_joined_rooms), newly_left_rooms
 
     async def _have_rooms_changed(
-        self, sync_result_builder: "SyncResultBuilder"
+        self, sync_result_builder: "SyncResultBuilder", user_id: str
     ) -> bool:
         """Returns whether there may be any new events that should be sent down
         the sync. Returns True if there are.
@@ -2499,6 +2502,10 @@
         if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
             return True
 
+        # TODO: Hack to check if we have any deleted rooms
+        if len(await self.store.get_deleted_rooms_for_user(user_id, since_token.room_key.stream)):
+            return True
+
         stream_id = since_token.room_key.stream
         for room_id in sync_result_builder.joined_room_ids:
             if self.store.has_room_changed_since(room_id, stream_id):
@@ -2536,6 +2543,7 @@
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
         membership_change_events = sync_result_builder.membership_change_events
+        logger.info("Generating _get_room_changes_for_incremental_sync for %s", user_id)
 
         assert since_token
 
@@ -2757,6 +2765,46 @@
 
             room_entries.append(entry)
 
+        deleted_left_rooms = await self.store.get_deleted_rooms_for_user(user_id, since_token.room_key.stream)
+
+        for room_id, deleted_stream_id in deleted_left_rooms:
+            if room_id in newly_left_rooms:
+                continue
+            logger.info("Generating synthetic leave for %s in %s as room was deleted.", user_id, room_id)
+            # Synthetic leaves for deleted rooms
+            leave_evt = FrozenEventV3({
+                    "state_key": user_id,
+                    "sender": "@server:patroclus",
+                    "room_id": room_id,
+                    "type": "m.room.member",
+                    "depth": 1,
+                    "content": {
+                        "membership": "leave",
+                        "reason": "Room has been deleted"
+                    }
+                },
+                RoomVersions.V10,  # TODO: Fetch this!
+            )
+            leave_evt.internal_metadata.outlier = True
+            leave_evt.internal_metadata.out_of_band_membership = True
+            leave_evt.internal_metadata.stream_ordering = deleted_stream_id
+
+            room_entries.append(
+                RoomSyncResultBuilder(
+                    room_id=room_id,
+                    rtype="archived",
+                    events=[leave_evt],
+                    newly_joined=False,
+                    full_state=False,
+                    # TODO: THESE ARE ALL LIES, DAMNED LIES
+                    since_token=since_token,
+                    upto_token=since_token,
+                    end_token=since_token,
+                    out_of_band=True
+                )
+            )
+            newly_left_rooms.append(room_id)
+
         return _RoomChanges(
             room_entries,
             invited,
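`RoomVersions.V10` is hardcoded above with a "Fetch this!" TODO because the room's real version would have to come from somewhere that survives the purge. If the version string were snapshotted alongside the membership rows (a hypothetical extra column, not part of this diff), resolving it would reduce to a dictionary lookup against `KNOWN_ROOM_VERSIONS`, roughly:

# Hypothetical sketch, not part of this diff: assumes the room version string
# (e.g. "10") was stored at deletion time alongside the membership snapshot.
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions

stored_version_str = "10"  # would be read back from the snapshot in this sketch
room_version = KNOWN_ROOM_VERSIONS.get(stored_version_str, RoomVersions.V10)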
@@ -34,6 +34,7 @@ from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage.database import LoggingTransaction
 from synapse.storage.databases import Databases
 from synapse.types.storage import _BackgroundUpdates
+from synapse.util.stringutils import shortstr
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -167,6 +168,12 @@ class PurgeEventsStorageController:
                 break
 
             (room_id, groups_to_sequences) = next_to_delete
+
+            logger.info(
+                "[purge] deleting state groups for room %s: %s",
+                room_id,
+                shortstr(groups_to_sequences.keys(), maxitems=10),
+            )
             made_progress = await self._delete_state_groups(
                 room_id, groups_to_sequences
             )
@@ -102,6 +102,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         super().__init__(database, db_conn, hs)
 
         self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
+        self._our_server_name = hs.config.server.server_name
 
         if (
             self.hs.config.worker.run_background_tasks
@@ -1388,7 +1389,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
 
         rows = cast(
             List[Tuple[str, str, str]],
-            await self.db_pool.simple_select_many_batch(
+            await self.db_pool.simple_select_onecol(
                 table="room_memberships",
                 column="event_id",
                 iterable=member_event_ids,
@@ -1845,6 +1846,83 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             "_get_room_participation_txn", _get_room_participation_txn, user_id, room_id
         )
 
+    async def store_deleted_room_members(
+        self,
+        room_id: str,
+    ) -> None:
+        """TODO: WRITE
+
+        Args:
+            room_id: the ID of the room
+            stream_id: stream ID at the point the room was deleted
+            user_ids: all users who were ever present in the room
+        """
+
+        # Welcome to the pain zone. We need to first extract all the local members
+        sql = """
+            SELECT state_key, membership, event_stream_ordering FROM current_state_events
+            WHERE type = 'm.room.member'
+            AND room_id = ?
+            AND state_key LIKE ?
+        """
+
+        # TODO: Should we check for any joins, everyone should be banned or left at this point...
+
+        # We do need to be careful to ensure that host doesn't have any wild cards
+        # in it, but we checked above for known ones and we'll check below that
+        # the returned user actually has the correct domain.
+        user_rows = await self.db_pool.execute(
+            "store_deleted_room_members_get_members", sql, room_id, ("%:" + self._our_server_name)
+        )
+
+        logger.info("store_deleted_room_members %s %s %s %s %s", room_id, user_rows, sql, room_id, ("%:" + self._our_server_name))
+
+        await self.db_pool.runInteraction(
+            "store_deleted_room_members",
+            self._store_deleted_room_members_txn,
+            room_id,
+            user_rows
+        )
+
+    def _store_deleted_room_members_txn(
+        self,
+        txn: LoggingTransaction,
+        room_id: str,
+        users: Iterable[Tuple[str, str, int]],
+    ) -> None:
+        # If the user is still currently joined, they are about to get kicked so
+        # use the latest stream position
+        max = self.get_room_max_stream_ordering()
+        return DatabasePool.simple_insert_many_txn(
+            txn,
+            table="deleted_room_members",
+            keys=("room_id", "user_id", "deleted_at_stream_id"),
+            values=[(room_id, user[0], user[2] if user[1] in [Membership.BAN, Membership.LEAVE] else max) for user in users],
+        )
+
+    async def get_deleted_rooms_for_user(
+        self, user_id: str, stream_pos: int
+    ) -> List[Tuple[str, int]]:
+        """Get the rooms that were deleted out from under this user.
+
+        Returns (room_id, deleted_at_stream_id) pairs from `deleted_room_members`
+        for rows whose deletion stream ID is newer than the given stream
+        position.
+        """
+
+        def _get_deleted_rooms_for_user(txn: LoggingTransaction) -> List[Tuple[str, int]]:
+            sql = """
+                SELECT room_id, deleted_at_stream_id FROM deleted_room_members
+                WHERE user_id = ?
+                AND ? < deleted_at_stream_id
+            """
+            txn.execute(sql, (user_id, stream_pos))
+            return [(r[0], r[1]) for r in txn]
+
+        return await self.db_pool.runInteraction(
+            "get_deleted_rooms_for_user",
+            _get_deleted_rooms_for_user
+        )
 
 class RoomMemberBackgroundUpdateStore(SQLBaseStore):
     def __init__(
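The list comprehension in `_store_deleted_room_members_txn` packs the key rule into one line: members who had already left or been banned keep the stream ordering of their own membership event, while anyone still joined is about to be kicked and is stamped with the current maximum stream ordering. A standalone restatement of that rule (toy code, not Synapse's API; the function name is invented):

# Toy restatement of the deleted_at_stream_id rule; names are illustrative only.
from typing import Tuple

def deleted_at_stream_id(member: Tuple[str, str, int], current_max: int) -> int:
    user_id, membership, event_stream_ordering = member
    if membership in ("ban", "leave"):
        # Their membership event already marks when they stopped seeing the room.
        return event_stream_ordering
    # Still joined: they are about to be kicked, so use the latest position.
    return current_max

assert deleted_at_stream_id(("@bob:example.org", "ban", 1234), 9000) == 1234
assert deleted_at_stream_id(("@alice:example.org", "join", 1200), 9000) == 9000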
@@ -0,0 +1,19 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2025 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+CREATE TABLE deleted_room_members (
+    room_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    deleted_at_stream_id bigint NOT NULL
+);
+CREATE UNIQUE INDEX deleted_room_member_idx ON deleted_room_members(room_id, user_id);
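The new table is simple enough to exercise end to end with SQLite. The sketch below is illustrative only (Synapse runs this through its own database pool, and the stream IDs are made up); it creates the table, inserts two snapshot rows, and applies the same `? < deleted_at_stream_id` filter that `get_deleted_rooms_for_user` uses:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE deleted_room_members (
        room_id TEXT NOT NULL,
        user_id TEXT NOT NULL,
        deleted_at_stream_id BIGINT NOT NULL
    );
    CREATE UNIQUE INDEX deleted_room_member_idx ON deleted_room_members(room_id, user_id);
    """
)
# Two snapshot rows for the same user: one room they had already left (their leave
# event was at stream 1000), one room they were still in when it was deleted
# (stamped with the deletion-time position, stream 5000).
conn.executemany(
    "INSERT INTO deleted_room_members VALUES (?, ?, ?)",
    [
        ("!old:example.org", "@alice:example.org", 1000),
        ("!fresh:example.org", "@alice:example.org", 5000),
    ],
)

# An incremental sync from stream position 2000 only needs a synthetic leave
# for the room deleted after that position.
rows = conn.execute(
    "SELECT room_id, deleted_at_stream_id FROM deleted_room_members"
    " WHERE user_id = ? AND ? < deleted_at_stream_id",
    ("@alice:example.org", 2000),
).fetchall()
print(rows)  # [('!fresh:example.org', 5000)]

A sync that started before the deletion picks up the deletion-time row; one that started afterwards gets nothing back, so no synthetic leave is generated.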