Mirror of https://github.com/element-hq/synapse.git (synced 2025-08-18 00:00:46 -04:00)

Compare commits: 30 commits (212f6646a6...7786060e4a)

7786060e4a
effebb3e88
cdd333b4f1
3958fdab77
d7b8d87dad
dd5f3eb17d
d9003ae507
5831342024
02efa51f0f
122d190e40
6a0d2dc6fc
233e25e193
1e4d9df3cd
7bd1575ddf
a2dc84fc90
e6d3d808aa
cf474a094f
5b2b3120c2
bec0313e1b
bd6b57653f
169c9f85a8
1b7fa7b04a
a6b7aed06a
52af16c561
38f03a09ff
c856ae4724
fe07995e69
52a649580f
28a948f04f
7cb3f8a979
changelog.d/17058.doc (new file)
@@ -0,0 +1 @@
+Document [`/v1/make_knock`](https://spec.matrix.org/v1.10/server-server-api/#get_matrixfederationv1make_knockroomiduserid) and [`/v1/send_knock`](https://spec.matrix.org/v1.10/server-server-api/#put_matrixfederationv1send_knockroomideventid) federation endpoints as worker-compatible.
changelog.d/17195.misc (new file)
@@ -0,0 +1 @@
+Route `/make_knock` and `/send_knock` federation APIs to the federation reader worker in Complement test runs.
changelog.d/17201.misc (new file)
@@ -0,0 +1 @@
+Move the sync cache key parameter out of the sync config to separate concerns.
changelog.d/17202.misc (new file)
@@ -0,0 +1 @@
+Refactor `SyncResultBuilder` assembly into its own function.
changelog.d/17203.misc (new file)
@@ -0,0 +1 @@
+Rename `joined_rooms` to `joined_room_ids` for clarity.
changelog.d/17208.misc (new file)
@@ -0,0 +1 @@
+Rename `joined_rooms` to `joined_room_ids` for clarity.
changelog.d/17210.misc (new file)
@@ -0,0 +1 @@
+Add a short pause when rate-limiting a request.
changelog.d/17215.bugfix (new file)
@@ -0,0 +1 @@
+Fix a bug where duplicate events could be sent down sync when workers are overloaded.
changelog.d/17216.misc (new file)
@@ -0,0 +1 @@
+Improve performance of calculating device list changes in `/sync`.
changelog.d/17219.feature (new file)
@@ -0,0 +1 @@
+Add logging to tasks managed by the task scheduler, showing CPU and database usage.
@@ -211,6 +211,8 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
         "^/_matrix/federation/(v1|v2)/make_leave/",
         "^/_matrix/federation/(v1|v2)/send_join/",
         "^/_matrix/federation/(v1|v2)/send_leave/",
+        "^/_matrix/federation/v1/make_knock/",
+        "^/_matrix/federation/v1/send_knock/",
         "^/_matrix/federation/(v1|v2)/invite/",
         "^/_matrix/federation/(v1|v2)/query_auth/",
         "^/_matrix/federation/(v1|v2)/event_auth/",
@@ -211,6 +211,8 @@ information.
     ^/_matrix/federation/v1/make_leave/
     ^/_matrix/federation/(v1|v2)/send_join/
     ^/_matrix/federation/(v1|v2)/send_leave/
+    ^/_matrix/federation/v1/make_knock/
+    ^/_matrix/federation/v1/send_knock/
     ^/_matrix/federation/(v1|v2)/invite/
     ^/_matrix/federation/v1/event_auth/
     ^/_matrix/federation/v1/timestamp_to_event/
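Both lists above are plain regular expressions matched against the request path, so the addition is easy to sanity-check. A small illustrative snippet (not part of the changeset; the room/user IDs are made up):

```python
import re

# The two patterns added above; the knock endpoints only exist under v1.
knock_patterns = [
    r"^/_matrix/federation/v1/make_knock/",
    r"^/_matrix/federation/v1/send_knock/",
]

sample_paths = [
    "/_matrix/federation/v1/make_knock/!room:example.org/@user:example.org",
    "/_matrix/federation/v1/send_knock/!room:example.org/$event_id",
]

for path in sample_paths:
    # Each knock request path should be claimed by exactly one pattern.
    assert sum(bool(re.match(p, path)) for p in knock_patterns) == 1, path
```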
@@ -200,10 +200,8 @@ netaddr = ">=0.7.18"
 # add a lower bound to the Jinja2 dependency.
 Jinja2 = ">=3.0"
 bleach = ">=1.4.3"
-# We use `ParamSpec` and `Concatenate`, which were added in `typing-extensions` 3.10.0.0.
-# Additionally we need https://github.com/python/typing/pull/817 to allow types to be
-# generic over ParamSpecs.
-typing-extensions = ">=3.10.0.1"
+# We use `Self`, which was added in `typing-extensions` 4.0.
+typing-extensions = ">=4.0"
 # We enforce that we have a `cryptography` version that bundles an `openssl`
 # with the latest security patches.
 cryptography = ">=3.4.7"
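For context, `Self` (standard-library `typing.Self` on Python 3.11+, backported in `typing-extensions` 4.0) lets a method declare that it returns an instance of the enclosing class, including subclasses. A minimal sketch of the pattern (illustrative, not from this changeset):

```python
import attr
from typing_extensions import Self  # typing.Self on Python 3.11+


@attr.s(frozen=True, slots=True, auto_attribs=True)
class Token:
    stream: int

    def copy_and_advance(self, stream: int) -> Self:
        # `Self` means a subclass calling this gets its own type back,
        # which a plain `-> "Token"` annotation would not express.
        return attr.evolve(self, stream=stream)
```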
@@ -316,6 +316,10 @@ class Ratelimiter:
         )

         if not allowed:
+            # We pause for a bit here to stop clients from "tight-looping" on
+            # retrying their request.
+            await self.clock.sleep(0.5)
+
             raise LimitExceededError(
                 limiter_name=self._limiter_name,
                 retry_after_ms=int(1000 * (time_allowed - time_now_s)),
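The effect of the pause is easiest to see from the client's side: a misbehaving client that retries a rejected request in a tight loop is now throttled to roughly two attempts per second by the server-side sleep, while well-behaved clients that honour `retry_after_ms` are unaffected. Back-of-the-envelope numbers (illustrative only):

```python
# Assume a ~5 ms round trip for a rejected request.
attempts_per_second_before = 1 / 0.005          # ~200 retries/s in a tight loop
attempts_per_second_after = 1 / (0.005 + 0.5)   # ~2 retries/s once the pause applies
```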
@@ -159,20 +159,32 @@ class DeviceWorkerHandler:

     @cancellable
     async def get_device_changes_in_shared_rooms(
-        self, user_id: str, room_ids: StrCollection, from_token: StreamToken
+        self,
+        user_id: str,
+        room_ids: StrCollection,
+        from_token: StreamToken,
+        now_token: Optional[StreamToken] = None,
     ) -> Set[str]:
         """Get the set of users whose devices have changed who share a room with
         the given user.
         """
+        now_device_lists_key = self.store.get_device_stream_token()
+        if now_token:
+            now_device_lists_key = now_token.device_list_key
+
         changed_users = await self.store.get_device_list_changes_in_rooms(
-            room_ids, from_token.device_list_key
+            room_ids,
+            from_token.device_list_key,
+            now_device_lists_key,
         )

         if changed_users is not None:
             # We also check if the given user has changed their device. If
             # they're in no rooms then the above query won't include them.
             changed = await self.store.get_users_whose_devices_changed(
-                from_token.device_list_key, [user_id]
+                from_token.device_list_key,
+                [user_id],
+                to_key=now_device_lists_key,
             )
             changed_users.update(changed)
             return changed_users
@@ -190,7 +202,9 @@ class DeviceWorkerHandler:
         tracked_users.add(user_id)

         changed = await self.store.get_users_whose_devices_changed(
-            from_token.device_list_key, tracked_users
+            from_token.device_list_key,
+            tracked_users,
+            to_key=now_device_lists_key,
         )

         return changed
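A hedged usage sketch of the new parameter (the `handler`/`builder` names are assumed; the call shape mirrors the sync-handler change later in this diff). Pinning `now_token` bounds the query to the same snapshot as the rest of the sync response, rather than racing with device changes persisted mid-request:

```python
# Illustrative call site; `handler`, `builder` and `since_token` are assumed.
changed_users = await handler.get_device_changes_in_shared_rooms(
    user_id,
    builder.joined_room_ids,
    from_token=since_token,
    # Without `now_token` the store falls back to its current stream token,
    # so a change persisted after the response's snapshot could be counted
    # here *and* again in the next incremental sync.
    now_token=builder.now_token,
)
```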
@@ -135,7 +135,6 @@ class SyncConfig:
     user: UserID
     filter_collection: FilterCollection
     is_guest: bool
-    request_key: SyncRequestKey
     device_id: Optional[str]
@@ -280,6 +279,23 @@ class SyncResult:
             or self.device_lists
         )

+    @staticmethod
+    def empty(next_batch: StreamToken) -> "SyncResult":
+        "Return a new empty result"
+        return SyncResult(
+            next_batch=next_batch,
+            presence=[],
+            account_data=[],
+            joined=[],
+            invited=[],
+            knocked=[],
+            archived=[],
+            to_device=[],
+            device_lists=DeviceListUpdates(),
+            device_one_time_keys_count={},
+            device_unused_fallback_key_types=[],
+        )
+

 class SyncHandler:
     def __init__(self, hs: "HomeServer"):
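Because `empty` reuses the caller-supplied token as `next_batch`, a client that receives such a response simply retries from the same position; nothing is skipped while the worker catches up (see the catch-up timeout path added below, which returns `SyncResult.empty(since_token)`).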
@@ -328,6 +344,7 @@ class SyncHandler:
         requester: Requester,
         sync_config: SyncConfig,
         sync_version: SyncVersion,
+        request_key: SyncRequestKey,
         since_token: Optional[StreamToken] = None,
         timeout: int = 0,
         full_state: bool = False,
@@ -340,10 +357,10 @@ class SyncHandler:
             requester: The user requesting the sync response.
             sync_config: Config/info necessary to process the sync request.
             sync_version: Determines what kind of sync response to generate.
+            request_key: The key to use for caching the response.
             since_token: The point in the stream to sync from.
             timeout: How long to wait for new data to arrive before giving up.
             full_state: Whether to return the full state for each room.

         Returns:
             When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
         """
@@ -354,7 +371,7 @@ class SyncHandler:
         await self.auth_blocking.check_auth_blocking(requester=requester)

         res = await self.response_cache.wrap(
-            sync_config.request_key,
+            request_key,
             self._wait_for_sync_for_user,
             sync_config,
             sync_version,
@@ -401,6 +418,24 @@ class SyncHandler:
         if context:
             context.tag = sync_label

+        if since_token is not None:
+            # We need to make sure this worker has caught up with the token. If
+            # this returns false it means we timed out waiting, and we should
+            # just return an empty response.
+            start = self.clock.time_msec()
+            if not await self.notifier.wait_for_stream_token(since_token):
+                logger.warning(
+                    "Timed out waiting for worker to catch up. Returning empty response"
+                )
+                return SyncResult.empty(since_token)
+
+            # If we've spent significant time waiting to catch up, take it off
+            # the timeout.
+            now = self.clock.time_msec()
+            if now - start > 1_000:
+                timeout -= now - start
+                timeout = max(timeout, 0)
+
         # if we have a since token, delete any to-device messages before that token
         # (since we now know that the device has received them)
         if since_token is not None:
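The timeout adjustment is plain arithmetic; with illustrative numbers:

```python
# Client long-polls with timeout=30_000 ms; catching up took 2.5 s.
timeout = 30_000
start, now = 0, 2_500  # ms, illustrative values
if now - start > 1_000:
    timeout -= now - start   # 27_500 ms of the long-poll window remains
    timeout = max(timeout, 0)  # clamped so it never goes negative
```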
@@ -1569,128 +1604,17 @@ class SyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()

-        # Note: we get the users room list *before* we get the current token, this
-        # avoids checking back in history if rooms are joined after the token is fetched.
-        token_before_rooms = self.event_sources.get_current_token()
-        mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
-
-        # NB: The now_token gets changed by some of the generate_sync_* methods,
-        # this is due to some of the underlying streams not supporting the ability
-        # to query up to a given point.
-        # Always use the `now_token` in `SyncResultBuilder`
-        now_token = self.event_sources.get_current_token()
-        log_kv({"now_token": now_token})
-
-        # Since we fetched the users room list before the token, there's a small window
-        # during which membership events may have been persisted, so we fetch these now
-        # and modify the joined room list for any changes between the get_rooms_for_user
-        # call and the get_current_token call.
-        membership_change_events = []
-        if since_token:
-            membership_change_events = await self.store.get_membership_changes_for_user(
-                user_id,
-                since_token.room_key,
-                now_token.room_key,
-                self.rooms_to_exclude_globally,
-            )
-
-            mem_last_change_by_room_id: Dict[str, EventBase] = {}
-            for event in membership_change_events:
-                mem_last_change_by_room_id[event.room_id] = event
-
-            # For the latest membership event in each room found, add/remove the room ID
-            # from the joined room list accordingly. In this case we only care if the
-            # latest change is JOIN.
-
-            for room_id, event in mem_last_change_by_room_id.items():
-                assert event.internal_metadata.stream_ordering
-                if (
-                    event.internal_metadata.stream_ordering
-                    < token_before_rooms.room_key.stream
-                ):
-                    continue
-
-                logger.info(
-                    "User membership change between getting rooms and current token: %s %s %s",
-                    user_id,
-                    event.membership,
-                    room_id,
-                )
-                # User joined a room - we have to then check the room state to ensure we
-                # respect any bans if there's a race between the join and ban events.
-                if event.membership == Membership.JOIN:
-                    user_ids_in_room = await self.store.get_users_in_room(room_id)
-                    if user_id in user_ids_in_room:
-                        mutable_joined_room_ids.add(room_id)
-                # The user left the room, or left and was re-invited but not joined yet
-                else:
-                    mutable_joined_room_ids.discard(room_id)
-
-        # Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
-        mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
-        if not sync_config.filter_collection.lazy_load_members():
-            # Non-lazy syncs should never include partially stated rooms.
-            # Exclude all partially stated rooms from this sync.
-            results = await self.store.is_partial_state_room_batched(
-                mutable_joined_room_ids
-            )
-            mutable_rooms_to_exclude.update(
-                room_id
-                for room_id, is_partial_state in results.items()
-                if is_partial_state
-            )
-            membership_change_events = [
-                event
-                for event in membership_change_events
-                if not results.get(event.room_id, False)
-            ]
-
-        # Incremental eager syncs should additionally include rooms that
-        # - we are joined to
-        # - are full-stated
-        # - became fully-stated at some point during the sync period
-        # (These rooms will have been omitted during a previous eager sync.)
-        forced_newly_joined_room_ids: Set[str] = set()
-        if since_token and not sync_config.filter_collection.lazy_load_members():
-            un_partial_stated_rooms = (
-                await self.store.get_un_partial_stated_rooms_between(
-                    since_token.un_partial_stated_rooms_key,
-                    now_token.un_partial_stated_rooms_key,
-                    mutable_joined_room_ids,
-                )
-            )
-            results = await self.store.is_partial_state_room_batched(
-                un_partial_stated_rooms
-            )
-            forced_newly_joined_room_ids.update(
-                room_id
-                for room_id, is_partial_state in results.items()
-                if not is_partial_state
-            )
-
-        # Now we have our list of joined room IDs, exclude as configured and freeze
-        joined_room_ids = frozenset(
-            room_id
-            for room_id in mutable_joined_room_ids
-            if room_id not in mutable_rooms_to_exclude
-        )
+        sync_result_builder = await self.get_sync_result_builder(
+            sync_config,
+            since_token,
+            full_state,
+        )

         logger.debug(
             "Calculating sync response for %r between %s and %s",
             sync_config.user,
-            since_token,
-            now_token,
+            sync_result_builder.since_token,
+            sync_result_builder.now_token,
         )
-
-        sync_result_builder = SyncResultBuilder(
-            sync_config,
-            full_state,
-            since_token=since_token,
-            now_token=now_token,
-            joined_room_ids=joined_room_ids,
-            excluded_room_ids=frozenset(mutable_rooms_to_exclude),
-            forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
-            membership_change_events=membership_change_events,
-        )

         logger.debug("Fetching account data")
@@ -1802,6 +1726,149 @@ class SyncHandler:
             next_batch=sync_result_builder.now_token,
         )

+    async def get_sync_result_builder(
+        self,
+        sync_config: SyncConfig,
+        since_token: Optional[StreamToken] = None,
+        full_state: bool = False,
+    ) -> "SyncResultBuilder":
+        """
+        Assemble a `SyncResultBuilder` with all of the initial context to
+        start building up the sync response:
+
+        - Membership changes between the last sync and the current sync.
+        - Joined room IDs (minus any rooms to exclude).
+        - Rooms that became fully-stated/un-partial stated since the last sync.
+
+        Args:
+            sync_config: Config/info necessary to process the sync request.
+            since_token: The point in the stream to sync from.
+            full_state: Whether to return the full state for each room.
+
+        Returns:
+            `SyncResultBuilder` ready to start generating parts of the sync response.
+        """
+        user_id = sync_config.user.to_string()
+
+        # Note: we get the users room list *before* we get the current token, this
+        # avoids checking back in history if rooms are joined after the token is fetched.
+        token_before_rooms = self.event_sources.get_current_token()
+        mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
+
+        # NB: The `now_token` gets changed by some of the `generate_sync_*` methods,
+        # this is due to some of the underlying streams not supporting the ability
+        # to query up to a given point.
+        # Always use the `now_token` in `SyncResultBuilder`
+        now_token = self.event_sources.get_current_token()
+        log_kv({"now_token": now_token})
+
+        # Since we fetched the users room list before the token, there's a small window
+        # during which membership events may have been persisted, so we fetch these now
+        # and modify the joined room list for any changes between the get_rooms_for_user
+        # call and the get_current_token call.
+        membership_change_events = []
+        if since_token:
+            membership_change_events = await self.store.get_membership_changes_for_user(
+                user_id,
+                since_token.room_key,
+                now_token.room_key,
+                self.rooms_to_exclude_globally,
+            )
+
+            mem_last_change_by_room_id: Dict[str, EventBase] = {}
+            for event in membership_change_events:
+                mem_last_change_by_room_id[event.room_id] = event
+
+            # For the latest membership event in each room found, add/remove the room ID
+            # from the joined room list accordingly. In this case we only care if the
+            # latest change is JOIN.
+
+            for room_id, event in mem_last_change_by_room_id.items():
+                assert event.internal_metadata.stream_ordering
+                if (
+                    event.internal_metadata.stream_ordering
+                    < token_before_rooms.room_key.stream
+                ):
+                    continue
+
+                logger.info(
+                    "User membership change between getting rooms and current token: %s %s %s",
+                    user_id,
+                    event.membership,
+                    room_id,
+                )
+                # User joined a room - we have to then check the room state to ensure we
+                # respect any bans if there's a race between the join and ban events.
+                if event.membership == Membership.JOIN:
+                    user_ids_in_room = await self.store.get_users_in_room(room_id)
+                    if user_id in user_ids_in_room:
+                        mutable_joined_room_ids.add(room_id)
+                # The user left the room, or left and was re-invited but not joined yet
+                else:
+                    mutable_joined_room_ids.discard(room_id)
+
+        # Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
+        mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
+        if not sync_config.filter_collection.lazy_load_members():
+            # Non-lazy syncs should never include partially stated rooms.
+            # Exclude all partially stated rooms from this sync.
+            results = await self.store.is_partial_state_room_batched(
+                mutable_joined_room_ids
+            )
+            mutable_rooms_to_exclude.update(
+                room_id
+                for room_id, is_partial_state in results.items()
+                if is_partial_state
+            )
+            membership_change_events = [
+                event
+                for event in membership_change_events
+                if not results.get(event.room_id, False)
+            ]
+
+        # Incremental eager syncs should additionally include rooms that
+        # - we are joined to
+        # - are full-stated
+        # - became fully-stated at some point during the sync period
+        # (These rooms will have been omitted during a previous eager sync.)
+        forced_newly_joined_room_ids: Set[str] = set()
+        if since_token and not sync_config.filter_collection.lazy_load_members():
+            un_partial_stated_rooms = (
+                await self.store.get_un_partial_stated_rooms_between(
+                    since_token.un_partial_stated_rooms_key,
+                    now_token.un_partial_stated_rooms_key,
+                    mutable_joined_room_ids,
+                )
+            )
+            results = await self.store.is_partial_state_room_batched(
+                un_partial_stated_rooms
+            )
+            forced_newly_joined_room_ids.update(
+                room_id
+                for room_id, is_partial_state in results.items()
+                if not is_partial_state
+            )
+
+        # Now we have our list of joined room IDs, exclude as configured and freeze
+        joined_room_ids = frozenset(
+            room_id
+            for room_id in mutable_joined_room_ids
+            if room_id not in mutable_rooms_to_exclude
+        )
+
+        sync_result_builder = SyncResultBuilder(
+            sync_config,
+            full_state,
+            since_token=since_token,
+            now_token=now_token,
+            joined_room_ids=joined_room_ids,
+            excluded_room_ids=frozenset(mutable_rooms_to_exclude),
+            forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
+            membership_change_events=membership_change_events,
+        )
+
+        return sync_result_builder
+
     @measure_func("_generate_sync_entry_for_device_list")
     async def _generate_sync_entry_for_device_list(
         self,
@@ -1850,42 +1917,18 @@ class SyncHandler:

         users_that_have_changed = set()

-        joined_rooms = sync_result_builder.joined_room_ids
+        joined_room_ids = sync_result_builder.joined_room_ids

         # Step 1a, check for changes in devices of users we share a room
         # with
-        #
-        # We do this in two different ways depending on what we have cached.
-        # If we already have a list of all the user that have changed since
-        # the last sync then it's likely more efficient to compare the rooms
-        # they're in with the rooms the syncing user is in.
-        #
-        # If we don't have that info cached then we get all the users that
-        # share a room with our user and check if those users have changed.
-        cache_result = self.store.get_cached_device_list_changes(
-            since_token.device_list_key
-        )
-        if cache_result.hit:
-            changed_users = cache_result.entities
-
-            result = await self.store.get_rooms_for_users(changed_users)
-
-            for changed_user_id, entries in result.items():
-                # Check if the changed user shares any rooms with the user,
-                # or if the changed user is the syncing user (as we always
-                # want to include device list updates of their own devices).
-                if user_id == changed_user_id or any(
-                    rid in joined_rooms for rid in entries
-                ):
-                    users_that_have_changed.add(changed_user_id)
-        else:
-            users_that_have_changed = (
-                await self._device_handler.get_device_changes_in_shared_rooms(
-                    user_id,
-                    sync_result_builder.joined_room_ids,
-                    from_token=since_token,
-                )
-            )
+        users_that_have_changed = (
+            await self._device_handler.get_device_changes_in_shared_rooms(
+                user_id,
+                sync_result_builder.joined_room_ids,
+                from_token=since_token,
+                now_token=sync_result_builder.now_token,
+            )
+        )

         # Step 1b, check for newly joined rooms
         for room_id in newly_joined_rooms:
@@ -1909,7 +1952,7 @@ class SyncHandler:
         # Remove any users that we still share a room with.
         left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
         for user_id, entries in left_users_rooms.items():
-            if any(rid in joined_rooms for rid in entries):
+            if any(rid in joined_room_ids for rid in entries):
                 newly_left_users.discard(user_id)

         return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users)
@@ -763,6 +763,29 @@ class Notifier:

         return result

+    async def wait_for_stream_token(self, stream_token: StreamToken) -> bool:
+        """Wait for this worker to catch up with the given stream token."""
+
+        start = self.clock.time_msec()
+        while True:
+            current_token = self.event_sources.get_current_token()
+            if stream_token.is_before_or_eq(current_token):
+                return True
+
+            now = self.clock.time_msec()
+
+            if now - start > 10_000:
+                return False
+
+            logger.info(
+                "Waiting for current token to reach %s; currently at %s",
+                stream_token,
+                current_token,
+            )
+
+            # TODO: be better
+            await self.clock.sleep(0.5)
+
     async def _get_room_ids(
         self, user: UserID, explicit_room_id: Optional[str]
     ) -> Tuple[StrCollection, bool]:
@@ -112,6 +112,15 @@ class ReplicationDataHandler:
             token: stream token for this batch of rows
             rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row.
         """
+        all_room_ids: Set[str] = set()
+        if stream_name == DeviceListsStream.NAME:
+            if any(row.entity.startswith("@") and not row.is_signature for row in rows):
+                prev_token = self.store.get_device_stream_token()
+                all_room_ids = await self.store.get_all_device_list_changes(
+                    prev_token, token
+                )
+                self.store.device_lists_in_rooms_have_changed(all_room_ids, token)
+
         self.store.process_replication_rows(stream_name, instance_name, token, rows)
         # NOTE: this must be called after process_replication_rows to ensure any
         # cache invalidations are first handled before any stream ID advances.
@@ -146,12 +155,6 @@ class ReplicationDataHandler:
                 StreamKeyType.TO_DEVICE, token, users=entities
             )
         elif stream_name == DeviceListsStream.NAME:
-            all_room_ids: Set[str] = set()
-            for row in rows:
-                if row.entity.startswith("@") and not row.is_signature:
-                    room_ids = await self.store.get_rooms_for_user(row.entity)
-                    all_room_ids.update(room_ids)
-
             # `all_room_ids` can be large, so let's wake up those streams in batches
             for batched_room_ids in batch_iter(all_room_ids, 100):
                 self.notifier.on_new_event(
@@ -210,7 +210,6 @@ class SyncRestServlet(RestServlet):
             user=user,
             filter_collection=filter_collection,
             is_guest=requester.is_guest,
-            request_key=request_key,
             device_id=device_id,
         )

@@ -234,6 +233,7 @@ class SyncRestServlet(RestServlet):
             requester,
             sync_config,
             SyncVersion.SYNC_V2,
+            request_key,
             since_token=since_token,
             timeout=timeout,
             full_state=full_state,
@@ -70,10 +70,7 @@ from synapse.types import (
 from synapse.util import json_decoder, json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import LruCache
-from synapse.util.caches.stream_change_cache import (
-    AllEntitiesChangedResult,
-    StreamChangeCache,
-)
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 from synapse.util.stringutils import shortstr
@@ -132,6 +129,20 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
             prefilled_cache=device_list_prefill,
         )

+        device_list_room_prefill, min_device_list_room_id = self.db_pool.get_cache_dict(
+            db_conn,
+            "device_lists_changes_in_room",
+            entity_column="room_id",
+            stream_column="stream_id",
+            max_value=device_list_max,
+            limit=10000,
+        )
+        self._device_list_room_stream_cache = StreamChangeCache(
+            "DeviceListRoomStreamChangeCache",
+            min_device_list_room_id,
+            prefilled_cache=device_list_room_prefill,
+        )
+
         (
             user_signature_stream_prefill,
             user_signature_stream_list_id,
@@ -211,6 +222,13 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
                 row.entity, token
             )

+    def device_lists_in_rooms_have_changed(
+        self, room_ids: StrCollection, token: int
+    ) -> None:
+        "Record that device lists have changed in rooms"
+        for room_id in room_ids:
+            self._device_list_room_stream_cache.entity_has_changed(room_id, token)
+
     def get_device_stream_token(self) -> int:
         return self._device_list_id_gen.get_current_token()
@@ -834,16 +852,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
         )
         return {device[0]: db_to_json(device[1]) for device in devices}

-    def get_cached_device_list_changes(
-        self,
-        from_key: int,
-    ) -> AllEntitiesChangedResult:
-        """Get set of users whose devices have changed since `from_key`, or None
-        if that information is not in our cache.
-        """
-
-        return self._device_list_stream_cache.get_all_entities_changed(from_key)
-
     @cancellable
     async def get_all_devices_changed(
         self,
@@ -1459,7 +1467,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):

     @cancellable
     async def get_device_list_changes_in_rooms(
-        self, room_ids: Collection[str], from_id: int
+        self, room_ids: Collection[str], from_id: int, to_id: int
     ) -> Optional[Set[str]]:
         """Return the set of users whose devices have changed in the given rooms
         since the given stream ID.
@@ -1475,9 +1483,15 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
         if min_stream_id > from_id:
             return None

+        changed_room_ids = self._device_list_room_stream_cache.get_entities_changed(
+            room_ids, from_id
+        )
+        if not changed_room_ids:
+            return set()
+
         sql = """
             SELECT DISTINCT user_id FROM device_lists_changes_in_room
-            WHERE {clause} AND stream_id >= ?
+            WHERE {clause} AND stream_id > ? AND stream_id <= ?
         """
@@ -1489,11 +1503,12 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
             return {user_id for user_id, in txn}

         changes = set()
-        for chunk in batch_iter(room_ids, 1000):
+        for chunk in batch_iter(changed_room_ids, 1000):
             clause, args = make_in_list_sql_clause(
                 self.database_engine, "room_id", chunk
             )
             args.append(from_id)
+            args.append(to_id)

             changes |= await self.db_pool.runInteraction(
                 "get_device_list_changes_in_rooms",
@@ -1504,6 +1519,34 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):

         return changes

+    async def get_all_device_list_changes(self, from_id: int, to_id: int) -> Set[str]:
+        """Return the set of rooms where devices have changed between the two
+        given stream IDs.
+
+        Will raise an exception if the given stream ID is too old.
+        """
+
+        min_stream_id = await self._get_min_device_lists_changes_in_room()
+
+        if min_stream_id > from_id:
+            raise Exception("stream ID is too old")
+
+        sql = """
+            SELECT DISTINCT room_id FROM device_lists_changes_in_room
+            WHERE stream_id > ? AND stream_id <= ?
+        """
+
+        def _get_all_device_list_changes_txn(
+            txn: LoggingTransaction,
+        ) -> Set[str]:
+            txn.execute(sql, (from_id, to_id))
+            return {room_id for room_id, in txn}
+
+        return await self.db_pool.runInteraction(
+            "get_all_device_list_changes",
+            _get_all_device_list_changes_txn,
+        )
+
     async def get_device_list_changes_in_room(
         self, room_id: str, min_stream_id: int
     ) -> Collection[Tuple[str, str]]:
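The `WHERE` clause change turns the window into a half-open interval `(from_id, to_id]`, which matches how stream tokens are handed out: a response that ends at token T includes everything up to and including T, and the next request starts strictly after T, so consecutive windows neither overlap nor leave gaps. A toy illustration (not from the diff):

```python
# stream_id -> room that saw a device-list change (toy data)
changes = {1: "roomA", 2: "roomB", 3: "roomC", 4: "roomD"}

def window(from_id: int, to_id: int) -> set[str]:
    # Half-open (from_id, to_id]: exclude the previous token, include the new one.
    return {room for sid, room in changes.items() if from_id < sid <= to_id}

first = window(0, 2)   # {"roomA", "roomB"}
second = window(2, 4)  # {"roomC", "roomD"}
assert not (first & second)                     # no duplicates across syncs
assert first | second == set(changes.values())  # and nothing dropped
```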
@@ -1964,8 +2007,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
     async def add_device_change_to_streams(
         self,
         user_id: str,
-        device_ids: Collection[str],
-        room_ids: Collection[str],
+        device_ids: StrCollection,
+        room_ids: StrCollection,
     ) -> Optional[int]:
         """Persist that a user's devices have been updated, and which hosts
         (if any) should be poked.
@@ -2147,8 +2190,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         self,
         txn: LoggingTransaction,
         user_id: str,
-        device_ids: Iterable[str],
-        room_ids: Collection[str],
+        device_ids: StrCollection,
+        room_ids: StrCollection,
         stream_ids: List[int],
         context: Dict[str, str],
     ) -> None:
@@ -2186,6 +2229,10 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             ],
         )

+        txn.call_after(
+            self.device_lists_in_rooms_have_changed, room_ids, max(stream_ids)
+        )
+
     async def get_uncoverted_outbound_room_pokes(
         self, start_stream_id: int, start_room_id: str, limit: int = 10
     ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
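A note on the `txn.call_after` above: callbacks registered this way run only after the surrounding transaction commits (the usual contract of Synapse's `LoggingTransaction.call_after`), so the room stream-change cache is only marked dirty once the corresponding rows have actually been persisted, keeping the cache and the table consistent.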
@@ -48,7 +48,7 @@ import attr
 from immutabledict import immutabledict
 from signedjson.key import decode_verify_key_bytes
 from signedjson.types import VerifyKey
-from typing_extensions import TypedDict
+from typing_extensions import Self, TypedDict
 from unpaddedbase64 import decode_base64
 from zope.interface import Interface

@@ -515,6 +515,27 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
         # at `self.stream`.
         return self.instance_map.get(instance_name, self.stream)

+    def is_before_or_eq(self, other_token: Self) -> bool:
+        """Whether this token is before the other token, i.e. every constituent
+        part is before the other.
+
+        Essentially it is `self <= other`.
+
+        Note: if `self.is_before_or_eq(other_token) is False` then that does not
+        imply that the reverse is True.
+        """
+        if self.stream > other_token.stream:
+            return False
+
+        instances = self.instance_map.keys() | other_token.instance_map.keys()
+        for instance in instances:
+            if self.instance_map.get(
+                instance, self.stream
+            ) > other_token.instance_map.get(instance, other_token.stream):
+                return False
+
+        return True
+

 @attr.s(frozen=True, slots=True, order=False)
 class RoomStreamToken(AbstractMultiWriterStreamToken):
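Why `is_before_or_eq` is only a partial order (the docstring's note that `False` does not imply the reverse): with multiple writers, two tokens can each be ahead on a different writer, so neither is `<=` the other. An illustrative sketch using plain dicts with the same `stream`/`instance_map` shape:

```python
# Two multi-writer positions: each token is ahead on a different writer.
a = {"stream": 5, "instance_map": {"writer1": 7, "writer2": 5}}
b = {"stream": 5, "instance_map": {"writer1": 5, "writer2": 7}}

def is_before_or_eq(x: dict, y: dict) -> bool:
    if x["stream"] > y["stream"]:
        return False
    instances = x["instance_map"].keys() | y["instance_map"].keys()
    return all(
        x["instance_map"].get(i, x["stream"]) <= y["instance_map"].get(i, y["stream"])
        for i in instances
    )

assert not is_before_or_eq(a, b)  # a is ahead on writer1...
assert not is_before_or_eq(b, a)  # ...and b is ahead on writer2: incomparable
```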
@@ -1008,6 +1029,41 @@ class StreamToken:
         """Returns the stream ID for the given key."""
         return getattr(self, key.value)

+    def is_before_or_eq(self, other_token: "StreamToken") -> bool:
+        """Whether this token is before the other token, i.e. every constituent
+        part is before the other.
+
+        Essentially it is `self <= other`.
+
+        Note: if `self.is_before_or_eq(other_token) is False` then that does not
+        imply that the reverse is True.
+        """
+
+        for _, key in StreamKeyType.__members__.items():
+            if key == StreamKeyType.TYPING:
+                # Typing stream is allowed to "reset", and so comparisons don't
+                # really make sense as is.
+                # TODO: Figure out a better way of tracking resets.
+                continue
+
+            self_value = self.get_field(key)
+            other_value = other_token.get_field(key)
+
+            if isinstance(self_value, RoomStreamToken):
+                assert isinstance(other_value, RoomStreamToken)
+                if not self_value.is_before_or_eq(other_value):
+                    return False
+            elif isinstance(self_value, MultiWriterStreamToken):
+                assert isinstance(other_value, MultiWriterStreamToken)
+                if not self_value.is_before_or_eq(other_value):
+                    return False
+            else:
+                assert isinstance(other_value, int)
+                if self_value > other_value:
+                    return False
+
+        return True
+

 StreamToken.START = StreamToken(
     RoomStreamToken(stream=0), 0, 0, MultiWriterStreamToken(stream=0), 0, 0, 0, 0, 0, 0
@@ -24,7 +24,12 @@ from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set

 from twisted.python.failure import Failure

-from synapse.logging.context import nested_logging_context
+from synapse.logging.context import (
+    ContextResourceUsage,
+    LoggingContext,
+    nested_logging_context,
+    set_current_context,
+)
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
@@ -81,6 +86,8 @@ class TaskScheduler:
     MAX_CONCURRENT_RUNNING_TASKS = 5
     # Time from the last task update after which we will log a warning
     LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000  # 24hrs
+    # Report a running task's status and usage every so often.
+    OCCASIONAL_REPORT_INTERVAL_MS = 5 * 60 * 1000  # 5 minutes

     def __init__(self, hs: "HomeServer"):
         self._hs = hs
@@ -346,6 +353,33 @@ class TaskScheduler:
         assert task.id not in self._running_tasks
         await self._store.delete_scheduled_task(task.id)

+    @staticmethod
+    def _log_task_usage(
+        state: str, task: ScheduledTask, usage: ContextResourceUsage, active_time: float
+    ) -> None:
+        """
+        Log a line describing the state and usage of a task.
+        The log line is inspired by / a copy of the request log line format,
+        but with irrelevant fields removed.
+
+        active_time: Time that the task has been running for, in seconds.
+        """
+
+        logger.info(
+            "Task %s: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
+            " [%d dbevts] %r, %r",
+            state,
+            active_time,
+            usage.ru_utime,
+            usage.ru_stime,
+            usage.db_sched_duration_sec,
+            usage.db_txn_duration_sec,
+            int(usage.db_txn_count),
+            usage.evt_db_fetch_count,
+            task.resource_id,
+            task.params,
+        )
+
     async def _launch_task(self, task: ScheduledTask) -> None:
         """Launch a scheduled task now.
@@ -360,8 +394,32 @@ class TaskScheduler:
         )
         function = self._actions[task.action]

+        def _occasional_report(
+            task_log_context: LoggingContext, start_time: float
+        ) -> None:
+            """
+            Helper to log a 'Task continuing' line every so often.
+            """
+
+            current_time = self._clock.time()
+            calling_context = set_current_context(task_log_context)
+            try:
+                usage = task_log_context.get_resource_usage()
+                TaskScheduler._log_task_usage(
+                    "continuing", task, usage, current_time - start_time
+                )
+            finally:
+                set_current_context(calling_context)
+
         async def wrapper() -> None:
-            with nested_logging_context(task.id):
+            with nested_logging_context(task.id) as log_context:
+                start_time = self._clock.time()
+                occasional_status_call = self._clock.looping_call(
+                    _occasional_report,
+                    TaskScheduler.OCCASIONAL_REPORT_INTERVAL_MS,
+                    log_context,
+                    start_time,
+                )
                 try:
                     (status, result, error) = await function(task)
                 except Exception:
@@ -383,6 +441,13 @@ class TaskScheduler:
                 )
                 self._running_tasks.remove(task.id)

+                current_time = self._clock.time()
+                usage = log_context.get_resource_usage()
+                TaskScheduler._log_task_usage(
+                    status.value, task, usage, current_time - start_time
+                )
+                occasional_status_call.stop()
+
                 # Try launch a new task since we've finished with this one.
                 self._clock.call_later(0.1, self._launch_scheduled_tasks)
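For reference, the `_log_task_usage` format string renders lines of the following shape (all values below are made up):

```
Task complete: 12.345sec (0.210sec, 0.040sec) (0.150sec/0.300sec/25) [7 dbevts] 'some-resource-id', {'some': 'param'}
```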
@@ -116,8 +116,9 @@ class TestRatelimiter(unittest.HomeserverTestCase):
         # Should raise
         with self.assertRaises(LimitExceededError) as context:
             self.get_success_or_raise(
-                limiter.ratelimit(None, key="test_id", _time_now_s=5)
+                limiter.ratelimit(None, key="test_id", _time_now_s=5), by=0.5
             )

         self.assertEqual(context.exception.retry_after_ms, 5000)

         # Shouldn't raise
@@ -192,7 +193,7 @@ class TestRatelimiter(unittest.HomeserverTestCase):
         # Second attempt, 1s later, will fail
         with self.assertRaises(LimitExceededError) as context:
             self.get_success_or_raise(
-                limiter.ratelimit(None, key=("test_id",), _time_now_s=1)
+                limiter.ratelimit(None, key=("test_id",), _time_now_s=1), by=0.5
             )
         self.assertEqual(context.exception.retry_after_ms, 9000)
@@ -36,7 +36,7 @@ from synapse.server import HomeServer
 from synapse.types import JsonDict, StreamToken, create_requester
 from synapse.util import Clock

-from tests.handlers.test_sync import SyncVersion, generate_sync_config
+from tests.handlers.test_sync import SyncRequestKey, SyncVersion, generate_sync_config
 from tests.unittest import (
     FederatingHomeserverTestCase,
     HomeserverTestCase,
@@ -498,6 +498,15 @@ def send_presence_update(
     return channel.json_body


+_request_key = 0
+
+
+def generate_request_key() -> SyncRequestKey:
+    global _request_key
+    _request_key += 1
+    return ("request_key", _request_key)
+
+
 def sync_presence(
     testcase: HomeserverTestCase,
     user_id: str,
@@ -521,7 +530,11 @@ def sync_presence(
     sync_config = generate_sync_config(requester.user.to_string())
     sync_result = testcase.get_success(
         testcase.hs.get_sync_handler().wait_for_sync_for_user(
-            requester, sync_config, SyncVersion.SYNC_V2, since_token
+            requester,
+            sync_config,
+            SyncVersion.SYNC_V2,
+            generate_request_key(),
+            since_token,
         )
     )
@@ -483,6 +483,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
                 event.room_version,
             ),
             exc=LimitExceededError,
+            by=0.5,
         )

     def _build_and_send_join_event(
@@ -70,6 +70,7 @@ class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase):
                 action=Membership.JOIN,
             ),
             LimitExceededError,
+            by=0.5,
         )

     @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
@@ -206,6 +207,7 @@ class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase):
                 remote_room_hosts=[self.OTHER_SERVER_NAME],
             ),
             LimitExceededError,
+            by=0.5,
         )

     # TODO: test that remote joins to a room are rate limited.
@@ -273,6 +275,7 @@ class TestReplicatedJoinsLimitedByPerRoomRateLimiter(BaseMultiWorkerStreamTestCase):
                 action=Membership.JOIN,
             ),
             LimitExceededError,
+            by=0.5,
         )

         # Try to join as Chris on the original worker. Should get denied because Alice
@@ -285,6 +288,7 @@ class TestReplicatedJoinsLimitedByPerRoomRateLimiter(BaseMultiWorkerStreamTestCase):
                 action=Membership.JOIN,
             ),
             LimitExceededError,
+            by=0.5,
         )
@@ -31,7 +31,7 @@ from synapse.api.room_versions import RoomVersion, RoomVersions
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.federation.federation_base import event_from_pdu_json
-from synapse.handlers.sync import SyncConfig, SyncResult, SyncVersion
+from synapse.handlers.sync import SyncConfig, SyncRequestKey, SyncResult, SyncVersion
 from synapse.rest import admin
 from synapse.rest.client import knock, login, room
 from synapse.server import HomeServer
@@ -41,6 +41,14 @@ from synapse.util import Clock
 import tests.unittest
 import tests.utils

+_request_key = 0
+
+
+def generate_request_key() -> SyncRequestKey:
+    global _request_key
+    _request_key += 1
+    return ("request_key", _request_key)
+
+
 class SyncTestCase(tests.unittest.HomeserverTestCase):
     """Tests Sync Handler."""
@@ -77,6 +85,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 requester,
                 sync_config,
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
@@ -87,6 +96,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 requester,
                 sync_config,
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             ),
             ResourceLimitError,
         )
@@ -102,6 +112,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 requester,
                 sync_config,
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             ),
             ResourceLimitError,
         )
@@ -124,6 +135,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 requester,
                 sync_config=generate_sync_config(user, device_id="dev"),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
@@ -157,6 +169,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 requester,
                 sync_config=generate_sync_config(user),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
         self.assertIn(joined_room, [r.room_id for r in result.joined])
@@ -169,6 +182,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 requester,
                 sync_config=generate_sync_config(user, device_id="dev"),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
                 since_token=initial_result.next_batch,
             )
         )
@@ -200,6 +214,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 requester,
                 sync_config=generate_sync_config(user),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
         self.assertNotIn(joined_room, [r.room_id for r in result.joined])
@@ -212,6 +227,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 requester,
                 sync_config=generate_sync_config(user, device_id="dev"),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
                 since_token=initial_result.next_batch,
             )
         )
@@ -254,6 +270,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 create_requester(owner),
                 generate_sync_config(owner),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
         self.assertEqual(len(alice_sync_result.joined), 1)
@@ -277,6 +294,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 eve_requester,
                 eve_sync_config,
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
@@ -295,6 +313,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 eve_requester,
                 eve_sync_config,
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
                 since_token=eve_sync_after_ban.next_batch,
             )
         )
@@ -307,6 +326,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 eve_requester,
                 eve_sync_config,
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
                 since_token=None,
             )
         )
@@ -341,6 +361,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 alice_requester,
                 generate_sync_config(alice),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
         last_room_creation_event_id = (
@@ -369,6 +390,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                     ),
                 ),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
                 since_token=initial_sync_result.next_batch,
             )
         )
@@ -414,6 +436,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 alice_requester,
                 generate_sync_config(alice),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
         last_room_creation_event_id = (
@@ -452,6 +475,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                    ),
                 ),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
                 since_token=initial_sync_result.next_batch,
             )
         )
@@ -498,6 +522,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 alice_requester,
                 generate_sync_config(alice),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
         last_room_creation_event_id = (
@@ -523,6 +548,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                     ),
                 ),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
                 since_token=initial_sync_result.next_batch,
             )
         )
@@ -553,6 +579,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                     ),
                 ),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
                 since_token=incremental_sync.next_batch,
             )
         )
@@ -615,6 +642,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 alice_requester,
                 generate_sync_config(alice),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
         last_room_creation_event_id = (
@@ -639,6 +667,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                     ),
                 ),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
         room_sync = initial_sync_result.joined[0]
@@ -660,6 +689,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 alice_requester,
                 generate_sync_config(alice),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
                 since_token=initial_sync_result.next_batch,
             )
         )
@@ -713,6 +743,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 bob_requester,
                 generate_sync_config(bob),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
@@ -744,6 +775,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                     bob, filter_collection=FilterCollection(self.hs, filter_dict)
                 ),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
                 since_token=None if initial_sync else initial_sync_result.next_batch,
             )
         ).archived[0]
@@ -839,6 +871,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 create_requester(user),
                 generate_sync_config(user),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
         event_ids = []
@@ -887,6 +920,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
                 create_requester(user2),
                 generate_sync_config(user2),
                 sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
         priv_event_ids = []
@@ -909,7 +943,10 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):

         sync_result: SyncResult = self.get_success(
             self.sync_handler.wait_for_sync_for_user(
-                create_requester(user), generate_sync_config(user)
+                create_requester(user),
+                generate_sync_config(user),
+                sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
             )
         )
@@ -923,9 +960,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
             self.fail("No push rules found")


-_request_key = 0
-
-
 def generate_sync_config(
     user_id: str,
     device_id: Optional[str] = "device_id",
@@ -942,12 +976,9 @@ def generate_sync_config(
     if filter_collection is None:
         filter_collection = Filtering(Mock()).DEFAULT_FILTER_COLLECTION

-    global _request_key
-    _request_key += 1
     return SyncConfig(
         user=UserID.from_string(user_id),
         filter_collection=filter_collection,
         is_guest=False,
-        request_key=("request_key", _request_key),
         device_id=device_id,
     )
@@ -637,13 +637,13 @@ class HomeserverTestCase(TestCase):
         return self.successResultOf(deferred)

     def get_failure(
-        self, d: Awaitable[Any], exc: Type[_ExcType]
+        self, d: Awaitable[Any], exc: Type[_ExcType], by: float = 0.0
     ) -> _TypedFailure[_ExcType]:
         """
         Run a Deferred and get a Failure from it. The failure must be of the type `exc`.
         """
         deferred: Deferred[Any] = ensureDeferred(d)  # type: ignore[arg-type]
-        self.pump()
+        self.pump(by)
         return self.failureResultOf(deferred, exc)

     def get_success_or_raise(self, d: Awaitable[TV], by: float = 0.0) -> TV: