Bust _membership_stream_cache cache when current state changes (#17732)

This is particularly a problem in a state reset scenario where the membership
might change without a corresponding event.

This PR is targeting a scenario where a state reset happens which causes
room membership to change. Previously, the cache would just hold onto
stale data and now we properly bust the cache in this scenario.

We have a few tests for these scenarios which you can see are now fixed
because we can remove the `FIXME` where we were previously manually
busting the cache in the test itself.

This is a general Synapse thing so by it's nature it helps out Sliding
Sync.

Fix https://github.com/element-hq/synapse/issues/17368

Prerequisite for https://github.com/element-hq/synapse/issues/17929

---

Match when are busting `_curr_state_delta_stream_cache`
This commit is contained in:
Eric Eastwood 2025-01-08 10:11:09 -06:00 committed by GitHub
parent d0677dca39
commit aab3672037
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 88 additions and 26 deletions

1
changelog.d/17732.bugfix Normal file
View File

@ -0,0 +1 @@
Fix membership caches not updating in state reset scenarios.

View File

@ -86,7 +86,9 @@ class SQLBaseStore(metaclass=ABCMeta):
""" """
def _invalidate_state_caches( def _invalidate_state_caches(
self, room_id: str, members_changed: Collection[str] self,
room_id: str,
members_changed: Collection[str],
) -> None: ) -> None:
"""Invalidates caches that are based on the current state, but does """Invalidates caches that are based on the current state, but does
not stream invalidations down replication. not stream invalidations down replication.

View File

@ -219,6 +219,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
room_id = row.keys[0] room_id = row.keys[0]
members_changed = set(row.keys[1:]) members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed) self._invalidate_state_caches(room_id, members_changed)
self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined]
room_id, token
)
for user_id in members_changed:
self._membership_stream_cache.entity_has_changed(user_id, token) # type: ignore[attr-defined]
elif row.cache_func == PURGE_HISTORY_CACHE_NAME: elif row.cache_func == PURGE_HISTORY_CACHE_NAME:
if row.keys is None: if row.keys is None:
raise Exception( raise Exception(
@ -236,6 +241,35 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
room_id = row.keys[0] room_id = row.keys[0]
self._invalidate_caches_for_room_events(room_id) self._invalidate_caches_for_room_events(room_id)
self._invalidate_caches_for_room(room_id) self._invalidate_caches_for_room(room_id)
self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined]
room_id, token
)
# Note: This code is commented out to improve cache performance.
# While uncommenting would provide complete correctness, our
# automatic forgotten room purge logic (see
# `forgotten_room_retention_period`) means this would frequently
# clear the entire cache (effectively) and probably have a noticable
# impact on the cache hit ratio.
#
# Not updating the cache here is safe because:
#
# 1. `_membership_stream_cache` is only used to indicate the
# *absence* of changes, i.e. "nothing has changed between tokens
# X and Y and so return early and don't query the database".
# 2. `_membership_stream_cache` is used when we query data from
# `current_state_delta_stream` and `room_memberships` but since
# nothing new is written to the database for those tables when
# purging/deleting a room (only deleting rows), there is nothing
# changed to care about.
#
# At worst, the cache might indicate a change at token X, at which
# point, we will query the database and discover nothing is there.
#
# Ideally, we would make it so that we could clear the cache on a
# more granular level but that's a bit complex and fiddly to do with
# room membership.
#
# self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined]
else: else:
self._attempt_to_invalidate_cache(row.cache_func, row.keys) self._attempt_to_invalidate_cache(row.cache_func, row.keys)
@ -275,6 +309,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache( self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", None "get_sliding_sync_rooms_for_user", None
) )
self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined]
elif data.type == EventTypes.RoomEncryption: elif data.type == EventTypes.RoomEncryption:
self._attempt_to_invalidate_cache( self._attempt_to_invalidate_cache(
"get_room_encryption", (data.room_id,) "get_room_encryption", (data.room_id,)
@ -291,6 +326,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
# Similar to the above, but the entire caches are invalidated. This is # Similar to the above, but the entire caches are invalidated. This is
# unfortunate for the membership caches, but should recover quickly. # unfortunate for the membership caches, but should recover quickly.
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined]
self._attempt_to_invalidate_cache("get_rooms_for_user", None) self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,)) self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,)) self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,))

View File

@ -1605,7 +1605,13 @@ class PersistEventsStore:
room_id room_id
delta_state: Deltas that are going to be used to update the delta_state: Deltas that are going to be used to update the
`current_state_events` table. Changes to the current state of the room. `current_state_events` table. Changes to the current state of the room.
stream_id: TODO stream_id: This is expected to be the minimum `stream_ordering` for the
batch of events that we are persisting; which means we do not end up in a
situation where workers see events before the `current_state_delta` updates.
FIXME: However, this function also gets called with next upcoming
`stream_ordering` when we re-sync the state of a partial stated room (see
`update_current_state(...)`) which may be "correct" but it would be good to
nail down what exactly is the expected value here.
sliding_sync_table_changes: Changes to the sliding_sync_table_changes: Changes to the
`sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables
derived from the given `delta_state` (see derived from the given `delta_state` (see
@ -1908,6 +1914,13 @@ class PersistEventsStore:
stream_id, stream_id,
) )
for user_id in members_to_cache_bust:
txn.call_after(
self.store._membership_stream_cache.entity_has_changed,
user_id,
stream_id,
)
# Invalidate the various caches # Invalidate the various caches
self.store._invalidate_state_caches_and_stream( self.store._invalidate_state_caches_and_stream(
txn, room_id, members_to_cache_bust txn, room_id, members_to_cache_bust

View File

@ -314,6 +314,15 @@ class StreamChangeCache:
self._entity_to_key[entity] = stream_pos self._entity_to_key[entity] = stream_pos
self._evict() self._evict()
def all_entities_changed(self, stream_pos: int) -> None:
"""
Mark all entities as changed. This is useful when the cache is invalidated and
there may be some potential change for all of the entities.
"""
self._cache.clear()
self._entity_to_key.clear()
self._earliest_known_stream_pos = stream_pos
def _evict(self) -> None: def _evict(self) -> None:
""" """
Ensure the cache has not exceeded the maximum size. Ensure the cache has not exceeded the maximum size.

View File

@ -1169,12 +1169,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.persistence.persist_event(join_rule_event, join_rule_context) self.persistence.persist_event(join_rule_event, join_rule_context)
) )
# FIXME: We're manually busting the cache since
# https://github.com/element-hq/synapse/issues/17368 is not solved yet
self.store._membership_stream_cache.entity_has_changed(
user1_id, join_rule_event_pos.stream
)
# Ensure that the state reset worked and only user2 is in the room now # Ensure that the state reset worked and only user2 is in the room now
users_in_room = self.get_success(self.store.get_users_in_room(room_id1)) users_in_room = self.get_success(self.store.get_users_in_room(room_id1))
self.assertIncludes(set(users_in_room), {user2_id}, exact=True) self.assertIncludes(set(users_in_room), {user2_id}, exact=True)
@ -1322,12 +1316,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.persistence.persist_event(join_rule_event, join_rule_context) self.persistence.persist_event(join_rule_event, join_rule_context)
) )
# FIXME: We're manually busting the cache since
# https://github.com/element-hq/synapse/issues/17368 is not solved yet
self.store._membership_stream_cache.entity_has_changed(
user1_id, join_rule_event_pos.stream
)
# Ensure that the state reset worked and only user2 is in the room now # Ensure that the state reset worked and only user2 is in the room now
users_in_room = self.get_success(self.store.get_users_in_room(space_room_id)) users_in_room = self.get_success(self.store.get_users_in_room(space_room_id))
self.assertIncludes(set(users_in_room), {user2_id}, exact=True) self.assertIncludes(set(users_in_room), {user2_id}, exact=True)
@ -1506,12 +1494,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.persistence.persist_event(join_rule_event, join_rule_context) self.persistence.persist_event(join_rule_event, join_rule_context)
) )
# FIXME: We're manually busting the cache since
# https://github.com/element-hq/synapse/issues/17368 is not solved yet
self.store._membership_stream_cache.entity_has_changed(
user1_id, join_rule_event_pos.stream
)
# Ensure that the state reset worked and only user2 is in the room now # Ensure that the state reset worked and only user2 is in the room now
users_in_room = self.get_success(self.store.get_users_in_room(space_room_id)) users_in_room = self.get_success(self.store.get_users_in_room(space_room_id))
self.assertIncludes(set(users_in_room), {user2_id}, exact=True) self.assertIncludes(set(users_in_room), {user2_id}, exact=True)

View File

@ -1209,12 +1209,6 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
self.persistence.persist_event(join_rule_event, join_rule_context) self.persistence.persist_event(join_rule_event, join_rule_context)
) )
# FIXME: We're manually busting the cache since
# https://github.com/element-hq/synapse/issues/17368 is not solved yet
self.store._membership_stream_cache.entity_has_changed(
user1_id, join_rule_event_pos.stream
)
after_reset_token = self.event_sources.get_current_token() after_reset_token = self.event_sources.get_current_token()
membership_changes = self.get_success( membership_changes = self.get_success(

View File

@ -255,3 +255,28 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
# Unknown entities will return None # Unknown entities will return None
self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), None) self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), None)
def test_all_entities_changed(self) -> None:
"""
`StreamChangeCache.all_entities_changed(...)` will mark all entites as changed.
"""
cache = StreamChangeCache("#test", 1, max_size=10)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
cache.entity_has_changed("user@elsewhere.org", 4)
cache.all_entities_changed(5)
# Everything should be marked as changed before the stream position where the
# change occurred.
self.assertTrue(cache.has_entity_changed("user@foo.com", 4))
self.assertTrue(cache.has_entity_changed("bar@baz.net", 4))
self.assertTrue(cache.has_entity_changed("user@elsewhere.org", 4))
# Nothing should be marked as changed at/after the stream position where the
# change occurred. In other words, nothing has changed since the stream position
# 5.
self.assertFalse(cache.has_entity_changed("user@foo.com", 5))
self.assertFalse(cache.has_entity_changed("bar@baz.net", 5))
self.assertFalse(cache.has_entity_changed("user@elsewhere.org", 5))