From 1bddd25a85d82b2ef4a2a42f6ecd476108d7dd96 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 1 Dec 2025 13:55:06 +0000 Subject: [PATCH] Port `Clock` functions to use `Duration` class (#19229) This changes the arguments in clock functions to be `Duration` and converts call sites and constants into `Duration`. There are still some more functions around that should be converted (e.g. `timeout_deferred`), but we leave that to another PR. We also changes `.as_secs()` to return a float, as the rounding broke things subtly. The only reason to keep it (its the same as `timedelta.total_seconds()`) is for symmetry with `as_millis()`. Follows on from https://github.com/element-hq/synapse/pull/19223 --- changelog.d/19229.misc | 1 + rust/src/duration.rs | 56 +++++++++++++ rust/src/lib.rs | 1 + rust/src/rendezvous/mod.rs | 5 +- synapse/api/ratelimiting.py | 3 +- synapse/app/phone_stats_home.py | 12 +-- synapse/appservice/scheduler.py | 3 +- synapse/federation/federation_client.py | 3 +- synapse/federation/federation_server.py | 7 +- synapse/federation/send_queue.py | 3 +- synapse/federation/sender/__init__.py | 13 +-- synapse/handlers/account_validity.py | 3 +- synapse/handlers/auth.py | 3 +- synapse/handlers/delayed_events.py | 9 ++- synapse/handlers/device.py | 13 ++- synapse/handlers/e2e_keys.py | 3 +- synapse/handlers/federation.py | 5 +- synapse/handlers/federation_event.py | 3 +- synapse/handlers/message.py | 18 ++--- synapse/handlers/pagination.py | 3 +- synapse/handlers/presence.py | 17 ++-- synapse/handlers/profile.py | 3 +- synapse/handlers/room.py | 3 +- synapse/handlers/room_member.py | 9 ++- synapse/handlers/stats.py | 3 +- synapse/handlers/typing.py | 26 +++--- synapse/handlers/user_directory.py | 19 +++-- synapse/handlers/worker_lock.py | 4 +- synapse/http/client.py | 5 +- .../http/federation/well_known_resolver.py | 3 +- synapse/http/server.py | 3 +- synapse/media/media_repository.py | 9 ++- synapse/media/media_storage.py | 5 +- synapse/media/url_previewer.py | 5 +- synapse/metrics/common_usage_metrics.py | 3 +- synapse/module_api/__init__.py | 8 +- synapse/notifier.py | 11 ++- synapse/push/emailpusher.py | 3 +- synapse/push/httppusher.py | 5 +- synapse/replication/http/_base.py | 5 +- synapse/replication/tcp/client.py | 3 +- synapse/replication/tcp/protocol.py | 5 +- synapse/replication/tcp/redis.py | 3 +- synapse/replication/tcp/resource.py | 3 +- synapse/rest/client/account.py | 13 ++- synapse/rest/client/register.py | 9 ++- synapse/rest/client/transactions.py | 2 +- synapse/state/__init__.py | 3 +- synapse/state/v2.py | 15 ++-- synapse/storage/background_updates.py | 3 +- synapse/storage/controllers/purge_events.py | 3 +- synapse/storage/database.py | 7 +- synapse/storage/databases/main/cache.py | 13 +-- .../storage/databases/main/censor_events.py | 3 +- synapse/storage/databases/main/client_ips.py | 5 +- synapse/storage/databases/main/deviceinbox.py | 4 +- synapse/storage/databases/main/devices.py | 3 +- .../databases/main/event_federation.py | 7 +- .../databases/main/event_push_actions.py | 11 ++- .../storage/databases/main/events_worker.py | 3 +- synapse/storage/databases/main/lock.py | 11 +-- synapse/storage/databases/main/metrics.py | 3 +- .../storage/databases/main/registration.py | 9 +-- synapse/storage/databases/main/roommember.py | 5 +- synapse/storage/databases/main/session.py | 3 +- .../storage/databases/main/sliding_sync.py | 2 +- .../storage/databases/main/transactions.py | 3 +- synapse/util/async_helpers.py | 7 +- synapse/util/batching_queue.py | 3 +- synapse/util/caches/expiringcache.py | 3 +- synapse/util/caches/lrucache.py | 7 +- synapse/util/caches/response_cache.py | 7 +- synapse/util/clock.py | 33 ++++---- synapse/util/duration.py | 81 ++++++++++++++++++- synapse/util/ratelimitutils.py | 7 +- synapse/util/task_scheduler.py | 15 ++-- synmark/suites/logging.py | 3 +- .../federation/transport/server/test__base.py | 5 +- tests/handlers/test_device.py | 2 +- tests/handlers/test_typing.py | 2 +- tests/http/test_servlet.py | 5 +- tests/logging/test_opentracing.py | 9 ++- tests/replication/http/test__base.py | 5 +- tests/rest/admin/test_background_updates.py | 3 +- tests/rest/admin/test_room.py | 7 +- tests/rest/client/test_transactions.py | 3 +- tests/server_notices/__init__.py | 3 +- tests/state/test_v2.py | 3 +- tests/state/test_v21.py | 3 +- tests/storage/databases/main/test_lock.py | 4 +- tests/storage/test_background_update.py | 5 +- tests/test_server.py | 9 ++- tests/util/caches/test_response_cache.py | 5 +- tests/util/test_logcontext.py | 55 +++++++------ tests/util/test_task_scheduler.py | 7 +- 95 files changed, 511 insertions(+), 260 deletions(-) create mode 100644 changelog.d/19229.misc create mode 100644 rust/src/duration.rs diff --git a/changelog.d/19229.misc b/changelog.d/19229.misc new file mode 100644 index 0000000000..8caebead72 --- /dev/null +++ b/changelog.d/19229.misc @@ -0,0 +1 @@ +Move towards using a dedicated `Duration` type. diff --git a/rust/src/duration.rs b/rust/src/duration.rs new file mode 100644 index 0000000000..a3dbe919b2 --- /dev/null +++ b/rust/src/duration.rs @@ -0,0 +1,56 @@ +/* + * This file is licensed under the Affero General Public License (AGPL) version 3. + * + * Copyright (C) 2025 Element Creations, Ltd + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * See the GNU Affero General Public License for more details: + * . + */ + +use once_cell::sync::OnceCell; +use pyo3::{ + types::{IntoPyDict, PyAnyMethods}, + Bound, BoundObject, IntoPyObject, Py, PyAny, PyErr, PyResult, Python, +}; + +/// A reference to the `synapse.util.duration` module. +static DURATION: OnceCell> = OnceCell::new(); + +/// Access to the `synapse.util.duration` module. +fn duration_module(py: Python<'_>) -> PyResult<&Bound<'_, PyAny>> { + Ok(DURATION + .get_or_try_init(|| py.import("synapse.util.duration").map(Into::into))? + .bind(py)) +} + +/// Mirrors the `synapse.util.duration.Duration` Python class. +pub struct SynapseDuration { + microseconds: u64, +} + +impl SynapseDuration { + /// For now we only need to create durations from milliseconds. + pub fn from_milliseconds(milliseconds: u64) -> Self { + Self { + microseconds: milliseconds * 1_000, + } + } +} + +impl<'py> IntoPyObject<'py> for &SynapseDuration { + type Target = PyAny; + type Output = Bound<'py, Self::Target>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let duration_module = duration_module(py)?; + let kwargs = [("microseconds", self.microseconds)].into_py_dict(py)?; + let duration_instance = duration_module.call_method("Duration", (), Some(&kwargs))?; + Ok(duration_instance.into_bound()) + } +} diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 6522148fa1..fe880af2ea 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -5,6 +5,7 @@ use pyo3::prelude::*; use pyo3_log::ResetHandle; pub mod acl; +pub mod duration; pub mod errors; pub mod events; pub mod http; diff --git a/rust/src/rendezvous/mod.rs b/rust/src/rendezvous/mod.rs index 848b5035bb..9a6da9fcc3 100644 --- a/rust/src/rendezvous/mod.rs +++ b/rust/src/rendezvous/mod.rs @@ -35,6 +35,7 @@ use ulid::Ulid; use self::session::Session; use crate::{ + duration::SynapseDuration, errors::{NotFoundError, SynapseError}, http::{http_request_from_twisted, http_response_to_twisted, HeaderMapPyExt}, UnwrapInfallible, @@ -132,6 +133,8 @@ impl RendezvousHandler { .unwrap_infallible() .unbind(); + let eviction_duration = SynapseDuration::from_milliseconds(eviction_interval); + // Construct a Python object so that we can get a reference to the // evict method and schedule it to run. let self_ = Py::new( @@ -149,7 +152,7 @@ impl RendezvousHandler { let evict = self_.getattr(py, "_evict")?; homeserver.call_method0("get_clock")?.call_method( "looping_call", - (evict, eviction_interval), + (evict, &eviction_duration), None, )?; diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py index df884d47d7..d6cc3d26b5 100644 --- a/synapse/api/ratelimiting.py +++ b/synapse/api/ratelimiting.py @@ -27,6 +27,7 @@ from synapse.config.ratelimiting import RatelimitSettings from synapse.storage.databases.main import DataStore from synapse.types import Requester from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.wheel_timer import WheelTimer if TYPE_CHECKING: @@ -100,7 +101,7 @@ class Ratelimiter: # and doesn't affect correctness. self._timer: WheelTimer[Hashable] = WheelTimer() - self.clock.looping_call(self._prune_message_counts, 15 * 1000) + self.clock.looping_call(self._prune_message_counts, Duration(seconds=15)) def _get_key(self, requester: Requester | None, key: Hashable | None) -> Hashable: """Use the requester's MXID as a fallback key if no key is provided.""" diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py index d278e30850..7b4bf25c28 100644 --- a/synapse/app/phone_stats_home.py +++ b/synapse/app/phone_stats_home.py @@ -218,13 +218,13 @@ def start_phone_stats_home(hs: "HomeServer") -> None: # table will decrease clock.looping_call( hs.get_datastores().main.generate_user_daily_visits, - Duration(minutes=5).as_millis(), + Duration(minutes=5), ) # monthly active user limiting functionality clock.looping_call( hs.get_datastores().main.reap_monthly_active_users, - Duration(hours=1).as_millis(), + Duration(hours=1), ) hs.get_datastores().main.reap_monthly_active_users() @@ -263,14 +263,14 @@ def start_phone_stats_home(hs: "HomeServer") -> None: if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only: generate_monthly_active_users() - clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000) + clock.looping_call(generate_monthly_active_users, Duration(minutes=5)) # End of monthly active user settings if hs.config.metrics.report_stats: logger.info("Scheduling stats reporting for 3 hour intervals") clock.looping_call( phone_stats_home, - PHONE_HOME_INTERVAL.as_millis(), + PHONE_HOME_INTERVAL, hs, stats, ) @@ -278,14 +278,14 @@ def start_phone_stats_home(hs: "HomeServer") -> None: # We need to defer this init for the cases that we daemonize # otherwise the process ID we get is that of the non-daemon process clock.call_later( - 0, + Duration(seconds=0), performance_stats_init, ) # We wait 5 minutes to send the first set of stats as the server can # be quite busy the first few minutes clock.call_later( - INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME.as_secs(), + INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME, phone_stats_home, hs, stats, diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 250f84d644..befb4ae44b 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -77,6 +77,7 @@ from synapse.logging.context import run_in_background from synapse.storage.databases.main import DataStore from synapse.types import DeviceListUpdates, JsonMapping from synapse.util.clock import Clock, DelayedCallWrapper +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -504,7 +505,7 @@ class _Recoverer: self.scheduled_recovery: DelayedCallWrapper | None = None def recover(self) -> None: - delay = 2**self.backoff_counter + delay = Duration(seconds=2**self.backoff_counter) logger.info("Scheduling retries on %s in %fs", self.service.id, delay) self.scheduled_recovery = self.clock.call_later( delay, diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 4110a90ed6..ba738ad65e 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -75,6 +75,7 @@ from synapse.types import JsonDict, StrCollection, UserID, get_domain_from_id from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.duration import Duration from synapse.util.retryutils import NotRetryingDestination if TYPE_CHECKING: @@ -132,7 +133,7 @@ class FederationClient(FederationBase): super().__init__(hs) self.pdu_destination_tried: dict[str, dict[str, int]] = {} - self._clock.looping_call(self._clear_tried_cache, 60 * 1000) + self._clock.looping_call(self._clear_tried_cache, Duration(minutes=1)) self.state = hs.get_state_handler() self.transport_layer = hs.get_federation_transport_client() diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 34abac1cec..b909f1e595 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -89,6 +89,7 @@ from synapse.types import JsonDict, StateMap, UserID, get_domain_from_id from synapse.util import unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results from synapse.util.caches.response_cache import ResponseCache +from synapse.util.duration import Duration from synapse.util.stringutils import parse_server_name if TYPE_CHECKING: @@ -226,7 +227,7 @@ class FederationServer(FederationBase): ) # We pause a bit so that we don't start handling all rooms at once. - await self._clock.sleep(random.uniform(0, 0.1)) + await self._clock.sleep(Duration(seconds=random.uniform(0, 0.1))) async def on_backfill_request( self, origin: str, room_id: str, versions: list[str], limit: int @@ -301,7 +302,9 @@ class FederationServer(FederationBase): # Start a periodic check for old staged events. This is to handle # the case where locks time out, e.g. if another process gets killed # without dropping its locks. - self._clock.looping_call(self._handle_old_staged_events, 60 * 1000) + self._clock.looping_call( + self._handle_old_staged_events, Duration(minutes=1) + ) # keep this as early as possible to make the calculated origin ts as # accurate as possible. diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index cf70e10a58..4a6d155217 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -53,6 +53,7 @@ from synapse.federation.sender import AbstractFederationSender, FederationSender from synapse.metrics import SERVER_NAME_LABEL, LaterGauge from synapse.replication.tcp.streams.federation import FederationStream from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection +from synapse.util.duration import Duration from synapse.util.metrics import Measure from .units import Edu @@ -137,7 +138,7 @@ class FederationRemoteSendQueue(AbstractFederationSender): assert isinstance(queue, Sized) register(queue_name, queue=queue) - self.clock.looping_call(self._clear_queue, 30 * 1000) + self.clock.looping_call(self._clear_queue, Duration(seconds=30)) def shutdown(self) -> None: """Stops this federation sender instance from sending further transactions.""" diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 0bd97c25df..f7240c2f7f 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -174,6 +174,7 @@ from synapse.types import ( get_domain_from_id, ) from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.metrics import Measure from synapse.util.retryutils import filter_destinations_by_retry_limiter @@ -218,12 +219,12 @@ transaction_queue_pending_edus_gauge = LaterGauge( # Please note that rate limiting still applies, so while the loop is # executed every X seconds the destinations may not be woken up because # they are being rate limited following previous attempt failures. -WAKEUP_RETRY_PERIOD_SEC = 60 +WAKEUP_RETRY_PERIOD = Duration(minutes=1) -# Time (in s) to wait in between waking up each destination, i.e. one destination +# Time to wait in between waking up each destination, i.e. one destination # will be woken up every seconds until we have woken every destination # has outstanding catch-up. -WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5 +WAKEUP_INTERVAL_BETWEEN_DESTINATIONS = Duration(seconds=5) class AbstractFederationSender(metaclass=abc.ABCMeta): @@ -379,7 +380,7 @@ class _DestinationWakeupQueue: queue.attempt_new_transaction() - await self.clock.sleep(current_sleep_seconds) + await self.clock.sleep(Duration(seconds=current_sleep_seconds)) if not self.queue: break @@ -468,7 +469,7 @@ class FederationSender(AbstractFederationSender): # Regularly wake up destinations that have outstanding PDUs to be caught up self.clock.looping_call_now( self.hs.run_as_background_process, - WAKEUP_RETRY_PERIOD_SEC * 1000.0, + WAKEUP_RETRY_PERIOD, "wake_destinations_needing_catchup", self._wake_destinations_needing_catchup, ) @@ -1161,4 +1162,4 @@ class FederationSender(AbstractFederationSender): last_processed, ) self.wake_destination(destination) - await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC) + await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS) diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py index bc50efa1a7..ba40d5763e 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py @@ -28,6 +28,7 @@ from synapse.metrics.background_process_metrics import wrap_as_background_proces from synapse.types import UserID from synapse.util import stringutils from synapse.util.async_helpers import delay_cancellation +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -73,7 +74,7 @@ class AccountValidityHandler: # Check the renewal emails to send and send them every 30min. if hs.config.worker.run_background_tasks: - self.clock.looping_call(self._send_renewal_emails, 30 * 60 * 1000) + self.clock.looping_call(self._send_renewal_emails, Duration(minutes=30)) async def is_user_expired(self, user_id: str) -> bool: """Checks if a user has expired against third-party modules. diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index d9355d33da..b5c0cbdba2 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -74,6 +74,7 @@ from synapse.storage.databases.main.registration import ( from synapse.types import JsonDict, Requester, StrCollection, UserID from synapse.util import stringutils as stringutils from synapse.util.async_helpers import delay_cancellation, maybe_awaitable +from synapse.util.duration import Duration from synapse.util.msisdn import phone_number_to_msisdn from synapse.util.stringutils import base62_encode from synapse.util.threepids import canonicalise_email @@ -242,7 +243,7 @@ class AuthHandler: if hs.config.worker.run_background_tasks: self._clock.looping_call( run_as_background_process, - 5 * 60 * 1000, + Duration(minutes=5), "expire_old_sessions", self.server_name, self._expire_old_sessions, diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py index 8817b65316..cb0a4dd6b2 100644 --- a/synapse/handlers/delayed_events.py +++ b/synapse/handlers/delayed_events.py @@ -42,6 +42,7 @@ from synapse.types import ( UserID, create_requester, ) +from synapse.util.duration import Duration from synapse.util.events import generate_fake_event_id from synapse.util.metrics import Measure from synapse.util.sentinel import Sentinel @@ -92,7 +93,7 @@ class DelayedEventsHandler: # Kick off again (without blocking) to catch any missed notifications # that may have fired before the callback was added. self._clock.call_later( - 0, + Duration(seconds=0), self.notify_new_event, ) @@ -508,17 +509,17 @@ class DelayedEventsHandler: def _schedule_next_at(self, next_send_ts: Timestamp) -> None: delay = next_send_ts - self._get_current_ts() - delay_sec = delay / 1000 if delay > 0 else 0 + delay_duration = Duration(milliseconds=max(delay, 0)) if self._next_delayed_event_call is None: self._next_delayed_event_call = self._clock.call_later( - delay_sec, + delay_duration, self.hs.run_as_background_process, "_send_on_timeout", self._send_on_timeout, ) else: - self._next_delayed_event_call.reset(delay_sec) + self._next_delayed_event_call.reset(delay_duration.as_secs()) async def get_all_for_user(self, requester: Requester) -> list[JsonDict]: """Return all pending delayed events requested by the given user.""" diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 3f1a5fe6d6..1b7de57ab9 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -71,6 +71,7 @@ from synapse.util import stringutils from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from synapse.util.metrics import measure_func from synapse.util.retryutils import ( NotRetryingDestination, @@ -85,7 +86,7 @@ logger = logging.getLogger(__name__) DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages" MAX_DEVICE_DISPLAY_NAME_LEN = 100 -DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000 +DELETE_STALE_DEVICES_INTERVAL = Duration(days=1) def _check_device_name_length(name: str | None) -> None: @@ -186,7 +187,7 @@ class DeviceHandler: ): self.clock.looping_call( self.hs.run_as_background_process, - DELETE_STALE_DEVICES_INTERVAL_MS, + DELETE_STALE_DEVICES_INTERVAL, desc="delete_stale_devices", func=self._delete_stale_devices, ) @@ -915,7 +916,7 @@ class DeviceHandler: ) DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000 - DEVICE_MSGS_DELETE_SLEEP_MS = 100 + DEVICE_MSGS_DELETE_SLEEP = Duration(milliseconds=100) async def _delete_device_messages( self, @@ -941,9 +942,7 @@ class DeviceHandler: if from_stream_id is None: return TaskStatus.COMPLETE, None, None - await self.clock.sleep( - DeviceWriterHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0 - ) + await self.clock.sleep(DeviceWriterHandler.DEVICE_MSGS_DELETE_SLEEP) class DeviceWriterHandler(DeviceHandler): @@ -1469,7 +1468,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater): self._resync_retry_lock = Lock() self.clock.looping_call( self.hs.run_as_background_process, - 30 * 1000, + Duration(seconds=30), func=self._maybe_retry_device_resync, desc="_maybe_retry_device_resync", ) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 41d27d47da..64f705a3da 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -46,6 +46,7 @@ from synapse.types import ( ) from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from synapse.util.json import json_decoder from synapse.util.retryutils import ( NotRetryingDestination, @@ -1634,7 +1635,7 @@ class E2eKeysHandler: # matrix.org has about 15M users in the e2e_one_time_keys_json table # (comprising 20M devices). We want this to take about a week, so we need # to do about one batch of 100 users every 4 seconds. - await self.clock.sleep(4) + await self.clock.sleep(Duration(seconds=4)) def _check_cross_signing_key( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1bba3fc758..7808f8928b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -72,6 +72,7 @@ from synapse.storage.invite_rule import InviteRule from synapse.types import JsonDict, StrCollection, get_domain_from_id from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer +from synapse.util.duration import Duration from synapse.util.retryutils import NotRetryingDestination from synapse.visibility import filter_events_for_server @@ -1972,7 +1973,9 @@ class FederationHandler: logger.warning( "%s; waiting for %d ms...", e, e.retry_after_ms ) - await self.clock.sleep(e.retry_after_ms / 1000) + await self.clock.sleep( + Duration(milliseconds=e.retry_after_ms) + ) # Success, no need to try the rest of the destinations. break diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 01e98f60ad..e314180e12 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -91,6 +91,7 @@ from synapse.types import ( ) from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer, concurrently_execute +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter, partition, sorted_topologically from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr @@ -1802,7 +1803,7 @@ class FederationEventHandler: # the reactor. For large rooms let's yield to the reactor # occasionally to ensure we don't block other work. if (i + 1) % 1000 == 0: - await self._clock.sleep(0) + await self._clock.sleep(Duration(seconds=0)) # Also persist the new event in batches for similar reasons as above. for batch in batch_iter(events_and_contexts_to_persist, 1000): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7679303a36..bac4bd9361 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -83,6 +83,7 @@ from synapse.types.state import StateFilter from synapse.util import log_failure, unwrapFirstError from synapse.util.async_helpers import Linearizer, gather_results from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.duration import Duration from synapse.util.json import json_decoder, json_encoder from synapse.util.metrics import measure_func from synapse.visibility import get_effective_room_visibility_from_state @@ -433,14 +434,11 @@ class MessageHandler: # Figure out how many seconds we need to wait before expiring the event. now_ms = self.clock.time_msec() - delay = (expiry_ts - now_ms) / 1000 + delay = Duration(milliseconds=max(expiry_ts - now_ms, 0)) - # callLater doesn't support negative delays, so trim the delay to 0 if we're - # in that case. - if delay < 0: - delay = 0 - - logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay) + logger.info( + "Scheduling expiry for event %s in %.3fs", event_id, delay.as_secs() + ) self._scheduled_expiry = self.clock.call_later( delay, @@ -551,7 +549,7 @@ class EventCreationHandler: "send_dummy_events_to_fill_extremities", self._send_dummy_events_to_fill_extremities, ), - 5 * 60 * 1000, + Duration(minutes=5), ) self._message_handler = hs.get_message_handler() @@ -1012,7 +1010,7 @@ class EventCreationHandler: if not ignore_shadow_ban and requester.shadow_banned: # We randomly sleep a bit just to annoy the requester. - await self.clock.sleep(random.randint(1, 10)) + await self.clock.sleep(Duration(seconds=random.randint(1, 10))) raise ShadowBanError() room_version = None @@ -1515,7 +1513,7 @@ class EventCreationHandler: and requester.shadow_banned ): # We randomly sleep a bit just to annoy the requester. - await self.clock.sleep(random.randint(1, 10)) + await self.clock.sleep(Duration(seconds=random.randint(1, 10))) raise ShadowBanError() if event.is_state(): diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index a90ed3193c..f869a41c5e 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -42,6 +42,7 @@ from synapse.types import ( from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse from synapse.types.state import StateFilter from synapse.util.async_helpers import ReadWriteLock +from synapse.util.duration import Duration from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -116,7 +117,7 @@ class PaginationHandler: self.clock.looping_call( self.hs.run_as_background_process, - job.interval, + Duration(milliseconds=job.interval), "purge_history_for_rooms_in_range", self.purge_history_for_rooms_in_range, job.shortest_max_lifetime, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ca5002cab3..4c3adca46e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -121,6 +121,7 @@ from synapse.types import ( get_domain_from_id, ) from synapse.util.async_helpers import Linearizer +from synapse.util.duration import Duration from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -203,7 +204,7 @@ EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000 # Delay before a worker tells the presence handler that a user has stopped # syncing. -UPDATE_SYNCING_USERS_MS = 10 * 1000 +UPDATE_SYNCING_USERS = Duration(seconds=10) assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER @@ -528,7 +529,7 @@ class WorkerPresenceHandler(BasePresenceHandler): self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) self._set_state_client = ReplicationPresenceSetState.make_client(hs) - self.clock.looping_call(self.send_stop_syncing, UPDATE_SYNCING_USERS_MS) + self.clock.looping_call(self.send_stop_syncing, UPDATE_SYNCING_USERS) hs.register_async_shutdown_handler( phase="before", @@ -581,7 +582,7 @@ class WorkerPresenceHandler(BasePresenceHandler): for (user_id, device_id), last_sync_ms in list( self._user_devices_going_offline.items() ): - if now - last_sync_ms > UPDATE_SYNCING_USERS_MS: + if now - last_sync_ms > UPDATE_SYNCING_USERS.as_millis(): self._user_devices_going_offline.pop((user_id, device_id), None) self.send_user_sync(user_id, device_id, False, last_sync_ms) @@ -861,20 +862,20 @@ class PresenceHandler(BasePresenceHandler): # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. self.clock.call_later( - 30, + Duration(seconds=30), self.clock.looping_call, self._handle_timeouts, - 5000, + Duration(seconds=5), ) # Presence information is persisted, whether or not it is being tracked # internally. if self._presence_enabled: self.clock.call_later( - 60, + Duration(minutes=1), self.clock.looping_call, self._persist_unpersisted_changes, - 60 * 1000, + Duration(minutes=1), ) presence_wheel_timer_size_gauge.register_hook( @@ -2430,7 +2431,7 @@ class PresenceFederationQueue: _KEEP_ITEMS_IN_QUEUE_FOR_MS = 5 * 60 * 1000 # How often to check if we can expire entries from the queue. - _CLEAR_ITEMS_EVERY_MS = 60 * 1000 + _CLEAR_ITEMS_EVERY_MS = Duration(minutes=1) def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler): self._clock = hs.get_clock() diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 59904cd995..8f16ae6dec 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -34,6 +34,7 @@ from synapse.api.errors import ( from synapse.storage.databases.main.media_repository import LocalMedia, RemoteMedia from synapse.types import JsonDict, JsonValue, Requester, UserID, create_requester from synapse.util.caches.descriptors import cached +from synapse.util.duration import Duration from synapse.util.stringutils import parse_and_validate_mxc_uri if TYPE_CHECKING: @@ -583,7 +584,7 @@ class ProfileHandler: # Do not actually update the room state for shadow-banned users. if requester.shadow_banned: # We randomly sleep a bit just to annoy the requester. - await self.clock.sleep(random.randint(1, 10)) + await self.clock.sleep(Duration(seconds=random.randint(1, 10))) return room_ids = await self.store.get_rooms_for_user(target_user.to_string()) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d62ad5393f..1026bfd876 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -92,6 +92,7 @@ from synapse.types.state import StateFilter from synapse.util import stringutils from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.response_cache import ResponseCache +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter from synapse.util.stringutils import parse_and_validate_server_name from synapse.visibility import filter_events_for_client @@ -1179,7 +1180,7 @@ class RoomCreationHandler: if (invite_list or invite_3pid_list) and requester.shadow_banned: # We randomly sleep a bit just to annoy the requester. - await self.clock.sleep(random.randint(1, 10)) + await self.clock.sleep(Duration(seconds=random.randint(1, 10))) # Allow the request to go through, but remove any associated invites. invite_3pid_list = [] diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index d5f72c1732..6f8481de9a 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -66,6 +66,7 @@ from synapse.types import ( from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_left_room +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -642,7 +643,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if action == Membership.INVITE and requester.shadow_banned: # We randomly sleep a bit just to annoy the requester. - await self.clock.sleep(random.randint(1, 10)) + await self.clock.sleep(Duration(seconds=random.randint(1, 10))) raise ShadowBanError() key = (room_id,) @@ -1647,7 +1648,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if requester.shadow_banned: # We randomly sleep a bit just to annoy the requester. - await self.clock.sleep(random.randint(1, 10)) + await self.clock.sleep(Duration(seconds=random.randint(1, 10))) raise ShadowBanError() # We need to rate limit *before* we send out any 3PID invites, so we @@ -2190,7 +2191,7 @@ class RoomForgetterHandler(StateDeltasHandler): # We kick this off to pick up outstanding work from before the last restart. self._clock.call_later( - 0, + Duration(seconds=0), self.notify_new_event, ) @@ -2232,7 +2233,7 @@ class RoomForgetterHandler(StateDeltasHandler): # # We wait for a short time so that we don't "tight" loop just # keeping the table up to date. - await self._clock.sleep(0.5) + await self._clock.sleep(Duration(milliseconds=500)) self.pos = self._store.get_room_max_stream_ordering() await self._store.update_room_forgetter_stream_pos(self.pos) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 6d661453ac..c87b5f854a 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -32,6 +32,7 @@ from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions from synapse.storage.databases.main.state_deltas import StateDelta from synapse.types import JsonDict +from synapse.util.duration import Duration from synapse.util.events import get_plain_text_topic_from_event_content if TYPE_CHECKING: @@ -72,7 +73,7 @@ class StatsHandler: # We kick this off so that we don't have to wait for a change before # we start populating stats self.clock.call_later( - 0, + Duration(seconds=0), self.notify_new_event, ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 8b577d5d58..e66396fecc 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -41,6 +41,7 @@ from synapse.types import ( UserID, ) from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.util.duration import Duration from synapse.util.metrics import Measure from synapse.util.retryutils import filter_destinations_by_retry_limiter from synapse.util.wheel_timer import WheelTimer @@ -60,15 +61,15 @@ class RoomMember: # How often we expect remote servers to resend us presence. -FEDERATION_TIMEOUT = 60 * 1000 +FEDERATION_TIMEOUT = Duration(minutes=1) # How often to resend typing across federation. -FEDERATION_PING_INTERVAL = 40 * 1000 +FEDERATION_PING_INTERVAL = Duration(seconds=40) # How long to remember a typing notification happened in a room before # forgetting about it. -FORGET_TIMEOUT = 10 * 60 * 1000 +FORGET_TIMEOUT = Duration(minutes=10) class FollowerTypingHandler: @@ -106,7 +107,7 @@ class FollowerTypingHandler: self._rooms_updated: set[str] = set() - self.clock.looping_call(self._handle_timeouts, 5000) + self.clock.looping_call(self._handle_timeouts, Duration(seconds=5)) self.clock.looping_call(self._prune_old_typing, FORGET_TIMEOUT) def _reset(self) -> None: @@ -141,7 +142,10 @@ class FollowerTypingHandler: # user. if self.federation and self.is_mine_id(member.user_id): last_fed_poke = self._member_last_federation_poke.get(member, None) - if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now: + if ( + not last_fed_poke + or last_fed_poke + FEDERATION_PING_INTERVAL.as_millis() <= now + ): self.hs.run_as_background_process( "typing._push_remote", self._push_remote, @@ -165,7 +169,7 @@ class FollowerTypingHandler: now = self.clock.time_msec() self.wheel_timer.insert( - now=now, obj=member, then=now + FEDERATION_PING_INTERVAL + now=now, obj=member, then=now + FEDERATION_PING_INTERVAL.as_millis() ) hosts: StrCollection = ( @@ -315,7 +319,7 @@ class TypingWriterHandler(FollowerTypingHandler): if requester.shadow_banned: # We randomly sleep a bit just to annoy the requester. - await self.clock.sleep(random.randint(1, 10)) + await self.clock.sleep(Duration(seconds=random.randint(1, 10))) raise ShadowBanError() await self.auth.check_user_in_room(room_id, requester) @@ -350,7 +354,7 @@ class TypingWriterHandler(FollowerTypingHandler): if requester.shadow_banned: # We randomly sleep a bit just to annoy the requester. - await self.clock.sleep(random.randint(1, 10)) + await self.clock.sleep(Duration(seconds=random.randint(1, 10))) raise ShadowBanError() await self.auth.check_user_in_room(room_id, requester) @@ -428,8 +432,10 @@ class TypingWriterHandler(FollowerTypingHandler): if user.domain in domains: logger.info("Got typing update from %s: %r", user_id, content) now = self.clock.time_msec() - self._member_typing_until[member] = now + FEDERATION_TIMEOUT - self.wheel_timer.insert(now=now, obj=member, then=now + FEDERATION_TIMEOUT) + self._member_typing_until[member] = now + FEDERATION_TIMEOUT.as_millis() + self.wheel_timer.insert( + now=now, obj=member, then=now + FEDERATION_TIMEOUT.as_millis() + ) self._push_update_local(member=member, typing=content["typing"]) def _push_update_local(self, member: RoomMember, typing: bool) -> None: diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index e5210a3e97..36b037e8e1 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -40,6 +40,7 @@ from synapse.storage.databases.main.state_deltas import StateDelta from synapse.storage.databases.main.user_directory import SearchResult from synapse.storage.roommember import ProfileInfo from synapse.types import UserID +from synapse.util.duration import Duration from synapse.util.metrics import Measure from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import non_null_str_or_none @@ -52,7 +53,7 @@ logger = logging.getLogger(__name__) # Don't refresh a stale user directory entry, using a Federation /profile request, # for 60 seconds. This gives time for other state events to arrive (which will # then be coalesced such that only one /profile request is made). -USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000 +USER_DIRECTORY_STALE_REFRESH_TIME = Duration(minutes=1) # Maximum number of remote servers that we will attempt to refresh profiles for # in one go. @@ -60,7 +61,7 @@ MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO = 5 # As long as we have servers to refresh (without backoff), keep adding more # every 15 seconds. -INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15 +INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = Duration(seconds=15) def calculate_time_of_next_retry(now_ts: int, retry_count: int) -> int: @@ -137,13 +138,13 @@ class UserDirectoryHandler(StateDeltasHandler): # We kick this off so that we don't have to wait for a change before # we start populating the user directory self.clock.call_later( - 0, + Duration(seconds=0), self.notify_new_event, ) # Kick off the profile refresh process on startup self._refresh_remote_profiles_call_later = self.clock.call_later( - 10, + Duration(seconds=10), self.kick_off_remote_profile_refresh_process, ) @@ -550,7 +551,7 @@ class UserDirectoryHandler(StateDeltasHandler): now_ts = self.clock.time_msec() await self.store.set_remote_user_profile_in_user_dir_stale( user_id, - next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS, + next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME.as_millis(), retry_counter=0, ) # Schedule a wake-up to refresh the user directory for this server. @@ -558,13 +559,13 @@ class UserDirectoryHandler(StateDeltasHandler): # other servers ahead of it in the queue to get in the way of updating # the profile if the server only just sent us an event. self.clock.call_later( - USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1, + USER_DIRECTORY_STALE_REFRESH_TIME + Duration(seconds=1), self.kick_off_remote_profile_refresh_process_for_remote_server, UserID.from_string(user_id).domain, ) # Schedule a wake-up to handle any backoffs that may occur in the future. self.clock.call_later( - 2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1, + USER_DIRECTORY_STALE_REFRESH_TIME * 2 + Duration(seconds=1), self.kick_off_remote_profile_refresh_process, ) return @@ -656,7 +657,9 @@ class UserDirectoryHandler(StateDeltasHandler): if not users: return _, _, next_try_at_ts = users[0] - delay = ((next_try_at_ts - self.clock.time_msec()) // 1000) + 2 + delay = Duration( + milliseconds=next_try_at_ts - self.clock.time_msec() + ) + Duration(seconds=2) self._refresh_remote_profiles_call_later = self.clock.call_later( delay, self.kick_off_remote_profile_refresh_process, diff --git a/synapse/handlers/worker_lock.py b/synapse/handlers/worker_lock.py index 4f9c632f5c..1537a18cc0 100644 --- a/synapse/handlers/worker_lock.py +++ b/synapse/handlers/worker_lock.py @@ -72,7 +72,7 @@ class WorkerLocksHandler: # that lock. self._locks: dict[tuple[str, str], WeakSet[WaitingLock | WaitingMultiLock]] = {} - self._clock.looping_call(self._cleanup_locks, 30_000) + self._clock.looping_call(self._cleanup_locks, Duration(seconds=30)) self._notifier.add_lock_released_callback(self._on_lock_released) @@ -187,7 +187,7 @@ class WorkerLocksHandler: lock.release_lock() self._clock.call_later( - 0, + Duration(seconds=0), _wake_all_locks, locks, ) diff --git a/synapse/http/client.py b/synapse/http/client.py index cb9b8cd683..f0b9201086 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -87,6 +87,7 @@ from synapse.metrics import SERVER_NAME_LABEL from synapse.types import ISynapseReactor, StrSequence from synapse.util.async_helpers import timeout_deferred from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.json import json_decoder if TYPE_CHECKING: @@ -161,7 +162,9 @@ def _is_ip_blocked( return False -_EPSILON = 0.00000001 +# The delay used by the scheduler to schedule tasks "as soon as possible", while +# still allowing other tasks to run between runs. +_EPSILON = Duration(microseconds=1) def _make_scheduler(clock: Clock) -> Callable[[Callable[[], object]], IDelayedCall]: diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index ec72e178c9..303b3856a2 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -37,6 +37,7 @@ from synapse.logging.context import make_deferred_yieldable from synapse.types import ISynapseThreadlessReactor from synapse.util.caches.ttlcache import TTLCache from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.json import json_decoder from synapse.util.metrics import Measure @@ -315,7 +316,7 @@ class WellKnownResolver: logger.info("Error fetching %s: %s. Retrying", uri_str, e) # Sleep briefly in the hopes that they come back up - await self._clock.sleep(0.5) + await self._clock.sleep(Duration(milliseconds=500)) def _cache_period_from_headers( diff --git a/synapse/http/server.py b/synapse/http/server.py index 5f4e7484fd..226cb00831 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -76,6 +76,7 @@ from synapse.logging.opentracing import active_span, start_active_span, trace_se from synapse.util.caches import intern_dict from synapse.util.cancellation import is_function_cancellable from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.iterutils import chunk_seq from synapse.util.json import json_encoder @@ -334,7 +335,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): callback_return = await self._async_render(request) except LimitExceededError as e: if e.pause: - await self._clock.sleep(e.pause) + await self._clock.sleep(Duration(seconds=e.pause)) raise if callback_return is not None: diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 7b4408b2bc..29c5e66ec4 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -70,6 +70,7 @@ from synapse.media.url_previewer import UrlPreviewer from synapse.storage.databases.main.media_repository import LocalMedia, RemoteMedia from synapse.types import UserID from synapse.util.async_helpers import Linearizer +from synapse.util.duration import Duration from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import random_string @@ -80,10 +81,10 @@ logger = logging.getLogger(__name__) # How often to run the background job to update the "recently accessed" # attribute of local and remote media. -UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000 # 1 minute +UPDATE_RECENTLY_ACCESSED_TS = Duration(minutes=1) # How often to run the background job to check for local and remote media # that should be purged according to the configured media retention settings. -MEDIA_RETENTION_CHECK_PERIOD_MS = 60 * 60 * 1000 # 1 hour +MEDIA_RETENTION_CHECK_PERIOD = Duration(hours=1) class MediaRepository: @@ -166,7 +167,7 @@ class MediaRepository: # with the duration between runs dictated by the homeserver config. self.clock.looping_call( self._start_apply_media_retention_rules, - MEDIA_RETENTION_CHECK_PERIOD_MS, + MEDIA_RETENTION_CHECK_PERIOD, ) if hs.config.media.url_preview_enabled: @@ -485,7 +486,7 @@ class MediaRepository: if now >= wait_until: break - await self.clock.sleep(0.5) + await self.clock.sleep(Duration(milliseconds=500)) logger.info("Media %s has not yet been uploaded", media_id) self.respond_not_yet_uploaded(request) diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index bc12212c46..e83869bf4d 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -51,6 +51,7 @@ from synapse.logging.context import defer_to_thread, run_in_background from synapse.logging.opentracing import start_active_span, trace, trace_with_opname from synapse.media._base import ThreadedFileSender from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.file_consumer import BackgroundFileConsumer from ..types import JsonDict @@ -457,7 +458,7 @@ class ReadableFileWrapper: callback(chunk) # We yield to the reactor by sleeping for 0 seconds. - await self.clock.sleep(0) + await self.clock.sleep(Duration(seconds=0)) @implementer(interfaces.IConsumer) @@ -652,7 +653,7 @@ class MultipartFileConsumer: self.paused = False while not self.paused: producer.resumeProducing() - await self.clock.sleep(0) + await self.clock.sleep(Duration(seconds=0)) class Header: diff --git a/synapse/media/url_previewer.py b/synapse/media/url_previewer.py index bbd8017b13..2c5e518918 100644 --- a/synapse/media/url_previewer.py +++ b/synapse/media/url_previewer.py @@ -47,6 +47,7 @@ from synapse.media.preview_html import decode_body, parse_html_to_open_graph from synapse.types import JsonDict, UserID from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.duration import Duration from synapse.util.json import json_encoder from synapse.util.stringutils import random_string @@ -208,7 +209,9 @@ class UrlPreviewer: ) if self._worker_run_media_background_jobs: - self.clock.looping_call(self._start_expire_url_cache_data, 10 * 1000) + self.clock.looping_call( + self._start_expire_url_cache_data, Duration(seconds=10) + ) async def preview(self, url: str, user: UserID, ts: int) -> bytes: # the in-memory cache: diff --git a/synapse/metrics/common_usage_metrics.py b/synapse/metrics/common_usage_metrics.py index 3f38412fa7..bd27a9ca9f 100644 --- a/synapse/metrics/common_usage_metrics.py +++ b/synapse/metrics/common_usage_metrics.py @@ -23,6 +23,7 @@ from typing import TYPE_CHECKING import attr from synapse.metrics import SERVER_NAME_LABEL +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -70,7 +71,7 @@ class CommonUsageMetricsManager: ) self._clock.looping_call( self._hs.run_as_background_process, - 5 * 60 * 1000, + Duration(minutes=5), desc="common_usage_metrics_update_gauges", func=self._update_gauges, ) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 6a2d152e3f..0580f3665c 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -158,6 +158,7 @@ from synapse.types.state import StateFilter from synapse.util.async_helpers import maybe_awaitable from synapse.util.caches.descriptors import CachedFunction, cached as _cached from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.frozenutils import freeze if TYPE_CHECKING: @@ -1389,7 +1390,7 @@ class ModuleApi: if self._hs.config.worker.run_background_tasks or run_on_all_instances: self._clock.looping_call( self._hs.run_as_background_process, - msec, + Duration(milliseconds=msec), desc, lambda: maybe_awaitable(f(*args, **kwargs)), ) @@ -1444,8 +1445,7 @@ class ModuleApi: desc = f.__name__ return self._clock.call_later( - # convert ms to seconds as needed by call_later. - msec * 0.001, + Duration(milliseconds=msec), self._hs.run_as_background_process, desc, lambda: maybe_awaitable(f(*args, **kwargs)), @@ -1457,7 +1457,7 @@ class ModuleApi: Added in Synapse v1.49.0. """ - await self._clock.sleep(seconds) + await self._clock.sleep(Duration(seconds=seconds)) async def send_http_push_notification( self, diff --git a/synapse/notifier.py b/synapse/notifier.py index 260a2c0d87..d8d2db17f1 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -61,6 +61,7 @@ from synapse.types import ( from synapse.util.async_helpers import ( timeout_deferred, ) +from synapse.util.duration import Duration from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_client @@ -235,7 +236,7 @@ class Notifier: Primarily used from the /events stream. """ - UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 + UNUSED_STREAM_EXPIRY = Duration(minutes=10) def __init__(self, hs: "HomeServer"): self.user_to_user_stream: dict[str, _NotifierUserStream] = {} @@ -269,9 +270,7 @@ class Notifier: self.state_handler = hs.get_state_handler() - self.clock.looping_call( - self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS - ) + self.clock.looping_call(self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY) # This is not a very cheap test to perform, but it's only executed # when rendering the metrics page, which is likely once per minute at @@ -861,7 +860,7 @@ class Notifier: logged = True # TODO: be better - await self.clock.sleep(0.5) + await self.clock.sleep(Duration(milliseconds=500)) async def _get_room_ids( self, user: UserID, explicit_room_id: str | None @@ -889,7 +888,7 @@ class Notifier: def remove_expired_streams(self) -> None: time_now_ms = self.clock.time_msec() expired_streams = [] - expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS + expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY.as_millis() for stream in self.user_to_user_stream.values(): if stream.count_listeners(): continue diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 36dc9bf6fc..ce4a2102e4 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -29,6 +29,7 @@ from synapse.push import Pusher, PusherConfig, PusherConfigException, ThrottlePa from synapse.push.mailer import Mailer from synapse.push.push_types import EmailReason from synapse.storage.databases.main.event_push_actions import EmailPushAction +from synapse.util.duration import Duration from synapse.util.threepids import validate_email if TYPE_CHECKING: @@ -229,7 +230,7 @@ class EmailPusher(Pusher): if soonest_due_at is not None: delay = self.seconds_until(soonest_due_at) self.timed_call = self.hs.get_clock().call_later( - delay, + Duration(seconds=delay), self.on_timer, ) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index edcabf0c29..1e7e742ddd 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -40,6 +40,7 @@ from . import push_tools if TYPE_CHECKING: from synapse.server import HomeServer +from synapse.util.duration import Duration logger = logging.getLogger(__name__) @@ -336,7 +337,7 @@ class HttpPusher(Pusher): else: logger.info("Push failed: delaying for %ds", self.backoff_delay) self.timed_call = self.hs.get_clock().call_later( - self.backoff_delay, + Duration(seconds=self.backoff_delay), self.on_timer, ) self.backoff_delay = min( @@ -371,7 +372,7 @@ class HttpPusher(Pusher): delay_ms = random.randint(1, self.push_jitter_delay_ms) diff_ms = event.origin_server_ts + delay_ms - self.clock.time_msec() if diff_ms > 0: - await self.clock.sleep(diff_ms / 1000) + await self.clock.sleep(Duration(milliseconds=diff_ms)) rejected = await self.dispatch_push_event(event, tweaks, badge) if rejected is False: diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index d76b40cf39..2bab9c2d71 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -42,6 +42,7 @@ from synapse.metrics import SERVER_NAME_LABEL from synapse.types import JsonDict from synapse.util.caches.response_cache import ResponseCache from synapse.util.cancellation import is_function_cancellable +from synapse.util.duration import Duration from synapse.util.stringutils import random_string if TYPE_CHECKING: @@ -317,7 +318,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): # If we timed out we probably don't need to worry about backing # off too much, but lets just wait a little anyway. - await clock.sleep(1) + await clock.sleep(Duration(seconds=1)) except (ConnectError, DNSLookupError) as e: if not cls.RETRY_ON_CONNECT_ERROR: raise @@ -332,7 +333,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): e, ) - await clock.sleep(delay) + await clock.sleep(Duration(seconds=delay)) attempts += 1 except HttpResponseException as e: # We convert to SynapseError as we know that it was a SynapseError diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 297feb0049..fdda932ead 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -55,6 +55,7 @@ from synapse.replication.tcp.streams.partial_state import ( ) from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.util.async_helpers import Linearizer, timeout_deferred +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -173,7 +174,7 @@ class ReplicationDataHandler: ) # Yield to reactor so that we don't block. - await self._clock.sleep(0) + await self._clock.sleep(Duration(seconds=0)) elif stream_name == PushersStream.NAME: for row in rows: if row.deleted: diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 3068e60af0..489a2c76a6 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -55,6 +55,7 @@ from synapse.replication.tcp.commands import ( parse_command_from_line, ) from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.stringutils import random_string if TYPE_CHECKING: @@ -193,7 +194,9 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self._send_pending_commands() # Starts sending pings - self._send_ping_loop = self.clock.looping_call(self.send_ping, 5000) + self._send_ping_loop = self.clock.looping_call( + self.send_ping, Duration(seconds=5) + ) # Always send the initial PING so that the other side knows that they # can time us out. diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 27d43e6fba..93ba48b406 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -53,6 +53,7 @@ from synapse.replication.tcp.protocol import ( tcp_inbound_commands_counter, tcp_outbound_commands_counter, ) +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.replication.tcp.handler import ReplicationCommandHandler @@ -317,7 +318,7 @@ class SynapseRedisFactory(RedisFactory): self.hs = hs # nb must be called this for @wrap_as_background_process self.server_name = hs.hostname - hs.get_clock().looping_call(self._send_ping, 30 * 1000) + hs.get_clock().looping_call(self._send_ping, Duration(seconds=30)) @wrap_as_background_process("redis_ping") async def _send_ping(self) -> None: diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 134d8d921f..36dd39ed67 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -34,6 +34,7 @@ from synapse.replication.tcp.commands import PositionCommand from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol from synapse.replication.tcp.streams import EventsStream from synapse.replication.tcp.streams._base import CachesStream, StreamRow, Token +from synapse.util.duration import Duration from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -116,7 +117,7 @@ class ReplicationStreamer: # # Note that if the position hasn't advanced then we won't send anything. if any(EventsStream.NAME == s.NAME for s in self.streams): - self.clock.looping_call(self.on_notifier_poke, 1000) + self.clock.looping_call(self.on_notifier_poke, Duration(seconds=1)) def on_notifier_poke(self) -> None: """Checks if there is actually any new data and sends it to the diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py index b052052be0..3cb1e09f44 100644 --- a/synapse/rest/client/account.py +++ b/synapse/rest/client/account.py @@ -58,6 +58,7 @@ from synapse.types.rest.client import ( EmailRequestTokenBody, MsisdnRequestTokenBody, ) +from synapse.util.duration import Duration from synapse.util.msisdn import phone_number_to_msisdn from synapse.util.stringutils import assert_valid_client_secret, random_string from synapse.util.threepids import check_3pid_allowed, validate_email @@ -125,7 +126,9 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): # comments for request_token_inhibit_3pid_errors. # Also wait for some random amount of time between 100ms and 1s to make it # look like we did something. - await self.hs.get_clock().sleep(random.randint(1, 10) / 10) + await self.hs.get_clock().sleep( + Duration(milliseconds=random.randint(100, 1000)) + ) return 200, {"sid": random_string(16)} raise SynapseError(400, "Email not found", Codes.THREEPID_NOT_FOUND) @@ -383,7 +386,9 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): # comments for request_token_inhibit_3pid_errors. # Also wait for some random amount of time between 100ms and 1s to make it # look like we did something. - await self.hs.get_clock().sleep(random.randint(1, 10) / 10) + await self.hs.get_clock().sleep( + Duration(milliseconds=random.randint(100, 1000)) + ) return 200, {"sid": random_string(16)} raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE) @@ -449,7 +454,9 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet): # comments for request_token_inhibit_3pid_errors. # Also wait for some random amount of time between 100ms and 1s to make it # look like we did something. - await self.hs.get_clock().sleep(random.randint(1, 10) / 10) + await self.hs.get_clock().sleep( + Duration(milliseconds=random.randint(100, 1000)) + ) return 200, {"sid": random_string(16)} logger.info("MSISDN %s is already in use by %s", msisdn, existing_user_id) diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py index 9503446b92..fdd2f1985a 100644 --- a/synapse/rest/client/register.py +++ b/synapse/rest/client/register.py @@ -59,6 +59,7 @@ from synapse.http.site import SynapseRequest from synapse.metrics import SERVER_NAME_LABEL, threepid_send_requests from synapse.push.mailer import Mailer from synapse.types import JsonDict +from synapse.util.duration import Duration from synapse.util.msisdn import phone_number_to_msisdn from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.stringutils import assert_valid_client_secret, random_string @@ -150,7 +151,9 @@ class EmailRegisterRequestTokenRestServlet(RestServlet): # Also wait for some random amount of time between 100ms and 1s to make it # look like we did something. await self.already_in_use_mailer.send_already_in_use_mail(email) - await self.hs.get_clock().sleep(random.randint(1, 10) / 10) + await self.hs.get_clock().sleep( + Duration(milliseconds=random.randint(100, 1000)) + ) return 200, {"sid": random_string(16)} raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE) @@ -219,7 +222,9 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet): # comments for request_token_inhibit_3pid_errors. # Also wait for some random amount of time between 100ms and 1s to make it # look like we did something. - await self.hs.get_clock().sleep(random.randint(1, 10) / 10) + await self.hs.get_clock().sleep( + Duration(milliseconds=random.randint(100, 1000)) + ) return 200, {"sid": random_string(16)} raise SynapseError( diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 0c1ac1f11b..43c7b6f993 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -57,7 +57,7 @@ class HttpTransactionCache: ] = {} # Try to clean entries every 30 mins. This means entries will exist # for at *LEAST* 30 mins, and at *MOST* 60 mins. - self.clock.looping_call(self._cleanup, CLEANUP_PERIOD.as_millis()) + self.clock.looping_call(self._cleanup, CLEANUP_PERIOD) def _get_transaction_key(self, request: IRequest, requester: Requester) -> Hashable: """A helper function which returns a transaction key that can be used diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 9fc49be4b1..a92233c863 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -54,6 +54,7 @@ from synapse.types import StateMap, StrCollection from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.duration import Duration from synapse.util.metrics import Measure, measure_func from synapse.util.stringutils import shortstr @@ -663,7 +664,7 @@ class StateResolutionHandler: _StateResMetrics ) - self.clock.looping_call(self._report_metrics, 120 * 1000) + self.clock.looping_call(self._report_metrics, Duration(minutes=2)) async def resolve_state_groups( self, diff --git a/synapse/state/v2.py b/synapse/state/v2.py index c410c3a7ec..1241a4d66e 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -40,6 +40,7 @@ from synapse.api.room_versions import RoomVersion, StateResolutionVersions from synapse.events import EventBase, is_creator from synapse.storage.databases.main.event_federation import StateDifference from synapse.types import MutableStateMap, StateMap, StrCollection +from synapse.util.duration import Duration logger = logging.getLogger(__name__) @@ -48,7 +49,7 @@ class Clock(Protocol): # This is usually synapse.util.Clock, but it's replaced with a FakeClock in tests. # We only ever sleep(0) though, so that other async functions can make forward # progress without waiting for stateres to complete. - async def sleep(self, duration_ms: float) -> None: ... + async def sleep(self, duration: Duration) -> None: ... class StateResolutionStore(Protocol): @@ -639,7 +640,7 @@ async def _reverse_topological_power_sort( # We await occasionally when we're working with large data sets to # ensure that we don't block the reactor loop for too long. if idx % _AWAIT_AFTER_ITERATIONS == 0: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) event_to_pl = {} for idx, event_id in enumerate(graph, start=1): @@ -651,7 +652,7 @@ async def _reverse_topological_power_sort( # We await occasionally when we're working with large data sets to # ensure that we don't block the reactor loop for too long. if idx % _AWAIT_AFTER_ITERATIONS == 0: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) def _get_power_order(event_id: str) -> tuple[int, int, str]: ev = event_map[event_id] @@ -745,7 +746,7 @@ async def _iterative_auth_checks( # We await occasionally when we're working with large data sets to # ensure that we don't block the reactor loop for too long. if idx % _AWAIT_AFTER_ITERATIONS == 0: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) return resolved_state @@ -796,7 +797,7 @@ async def _mainline_sort( # We await occasionally when we're working with large data sets to # ensure that we don't block the reactor loop for too long. if idx != 0 and idx % _AWAIT_AFTER_ITERATIONS == 0: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) idx += 1 @@ -814,7 +815,7 @@ async def _mainline_sort( # We await occasionally when we're working with large data sets to # ensure that we don't block the reactor loop for too long. if idx % _AWAIT_AFTER_ITERATIONS == 0: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) event_ids.sort(key=lambda ev_id: order_map[ev_id]) @@ -865,7 +866,7 @@ async def _get_mainline_depth_for_event( idx += 1 if idx % _AWAIT_AFTER_ITERATIONS == 0: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) # Didn't find a power level auth event, so we just return 0 return 0 diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index c71bcdb7fb..311534c5e7 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -40,6 +40,7 @@ from synapse.storage.engines import PostgresEngine from synapse.storage.types import Connection, Cursor from synapse.types import JsonDict, StrCollection from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.json import json_encoder from . import engines @@ -162,7 +163,7 @@ class _BackgroundUpdateContextManager: async def __aenter__(self) -> int: if self._sleep: - await self._clock.sleep(self._sleep_duration_ms / 1000) + await self._clock.sleep(Duration(milliseconds=self._sleep_duration_ms)) return self._update_duration_ms diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 4ca3f8f4e1..8a2053d25a 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -32,6 +32,7 @@ from synapse.metrics.background_process_metrics import wrap_as_background_proces from synapse.storage.database import LoggingTransaction from synapse.storage.databases import Databases from synapse.types.storage import _BackgroundUpdates +from synapse.util.duration import Duration from synapse.util.stringutils import shortstr if TYPE_CHECKING: @@ -50,7 +51,7 @@ class PurgeEventsStorageController: if hs.config.worker.run_background_tasks: self._delete_state_loop_call = hs.get_clock().looping_call( - self._delete_state_groups_loop, 60 * 1000 + self._delete_state_groups_loop, Duration(minutes=1) ) self.stores.state.db_pool.updates.register_background_update_handler( diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 18f0eac585..2d5e1d3c48 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -62,6 +62,7 @@ from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3E from synapse.storage.types import Connection, Cursor, SQLQueryParameters from synapse.types import StrCollection from synapse.util.async_helpers import delay_cancellation +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -631,7 +632,7 @@ class DatabasePool: # Check ASAP (and then later, every 1s) to see if we have finished # background updates of tables that aren't safe to update. self._clock.call_later( - 0.0, + Duration(seconds=0), self.hs.run_as_background_process, "upsert_safety_check", self._check_safe_to_upsert, @@ -679,7 +680,7 @@ class DatabasePool: # If there's any updates still running, reschedule to run. if background_update_names: self._clock.call_later( - 15.0, + Duration(seconds=15), self.hs.run_as_background_process, "upsert_safety_check", self._check_safe_to_upsert, @@ -706,7 +707,7 @@ class DatabasePool: "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters ) - self._clock.looping_call(loop, 10000) + self._clock.looping_call(loop, Duration(seconds=10)) def new_transaction( self, diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index b7b9b42461..a4530796f2 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -45,6 +45,7 @@ from synapse.storage.database import ( from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.util.caches.descriptors import CachedFunction +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -71,11 +72,11 @@ GET_E2E_CROSS_SIGNING_SIGNATURES_FOR_DEVICE_CACHE_NAME = ( # How long between cache invalidation table cleanups, once we have caught up # with the backlog. -REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h") +REGULAR_CLEANUP_INTERVAL = Duration(hours=1) # How long between cache invalidation table cleanups, before we have caught # up with the backlog. -CATCH_UP_CLEANUP_INTERVAL_MS = Config.parse_duration("1m") +CATCH_UP_CLEANUP_INTERVAL = Duration(minutes=1) # Maximum number of cache invalidation rows to delete at once. CLEAN_UP_MAX_BATCH_SIZE = 20_000 @@ -139,7 +140,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self.database_engine, PostgresEngine ): self.hs.get_clock().call_later( - CATCH_UP_CLEANUP_INTERVAL_MS / 1000, + CATCH_UP_CLEANUP_INTERVAL, self._clean_up_cache_invalidation_wrapper, ) @@ -825,12 +826,12 @@ class CacheInvalidationWorkerStore(SQLBaseStore): # Vary how long we wait before calling again depending on whether we # are still sifting through backlog or we have caught up. if in_backlog: - next_interval = CATCH_UP_CLEANUP_INTERVAL_MS + next_interval = CATCH_UP_CLEANUP_INTERVAL else: - next_interval = REGULAR_CLEANUP_INTERVAL_MS + next_interval = REGULAR_CLEANUP_INTERVAL self.hs.get_clock().call_later( - next_interval / 1000, + next_interval, self._clean_up_cache_invalidation_wrapper, ) diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index 5d667a5345..a5ae4bf506 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -32,6 +32,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.util.duration import Duration from synapse.util.json import json_encoder if TYPE_CHECKING: @@ -54,7 +55,7 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase hs.config.worker.run_background_tasks and self.hs.config.server.redaction_retention_period is not None ): - hs.get_clock().looping_call(self._censor_redactions, 5 * 60 * 1000) + hs.get_clock().looping_call(self._censor_redactions, Duration(minutes=5)) @wrap_as_background_process("_censor_redactions") async def _censor_redactions(self) -> None: diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 4948d0c286..7cd3667a2b 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -42,6 +42,7 @@ from synapse.storage.databases.main.monthly_active_users import ( ) from synapse.types import JsonDict, UserID from synapse.util.caches.lrucache import LruCache +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -437,7 +438,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke ) if hs.config.worker.run_background_tasks and self.user_ips_max_age: - self.clock.looping_call(self._prune_old_user_ips, 5 * 1000) + self.clock.looping_call(self._prune_old_user_ips, Duration(seconds=5)) if self._update_on_this_worker: # This is the designated worker that can write to the client IP @@ -448,7 +449,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke tuple[str, str, str], tuple[str, str | None, int] ] = {} - self.clock.looping_call(self._update_client_ips_batch, 5 * 1000) + self.clock.looping_call(self._update_client_ips_batch, Duration(seconds=5)) hs.register_async_shutdown_handler( phase="before", eventType="shutdown", diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 28e706d5c3..fc61f46c1c 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -152,7 +152,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): if hs.config.worker.run_background_tasks: self.clock.looping_call( run_as_background_process, - DEVICE_FEDERATION_INBOX_CLEANUP_INTERVAL.as_millis(), + DEVICE_FEDERATION_INBOX_CLEANUP_INTERVAL, "_delete_old_federation_inbox_rows", self.server_name, self._delete_old_federation_inbox_rows, @@ -1029,7 +1029,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): # We sleep a bit so that we don't hammer the database in a tight # loop first time we run this. - await self.clock.sleep(1) + await self.clock.sleep(Duration(seconds=1)) async def get_devices_with_messages( self, user_id: str, device_ids: StrCollection diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index caae2a0648..cbad40faf7 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -62,6 +62,7 @@ from synapse.types import ( from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter from synapse.util.json import json_decoder, json_encoder from synapse.util.stringutils import shortstr @@ -191,7 +192,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): if hs.config.worker.run_background_tasks: self.clock.looping_call( - self._prune_old_outbound_device_pokes, 60 * 60 * 1000 + self._prune_old_outbound_device_pokes, Duration(hours=1) ) def process_replication_rows( diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index b2f0aeaf58..cc7083b605 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -56,6 +56,7 @@ from synapse.types import JsonDict, StrCollection from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter from synapse.util.json import json_encoder @@ -155,7 +156,7 @@ class EventFederationWorkerStore( if hs.config.worker.run_background_tasks: hs.get_clock().looping_call( - self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + self._delete_old_forward_extrem_cache, Duration(hours=1) ) # Cache of event ID to list of auth event IDs and their depths. @@ -171,7 +172,9 @@ class EventFederationWorkerStore( # index. self.tests_allow_no_chain_cover_index = True - self.clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000) + self.clock.looping_call( + self._get_stats_for_federation_staging, Duration(seconds=30) + ) if isinstance(self.database_engine, PostgresEngine): self.db_pool.updates.register_background_validate_constraint_and_delete_rows( diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 2e99d7314e..a66caa672c 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -105,6 +105,7 @@ from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.types import JsonDict, StrCollection from synapse.util.caches.descriptors import cached +from synapse.util.duration import Duration from synapse.util.json import json_encoder if TYPE_CHECKING: @@ -270,15 +271,17 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas self._find_stream_orderings_for_times_txn(cur) cur.close() - self.clock.looping_call(self._find_stream_orderings_for_times, 10 * 60 * 1000) + self.clock.looping_call( + self._find_stream_orderings_for_times, Duration(minutes=10) + ) self._rotate_count = 10000 self._doing_notif_rotation = False if hs.config.worker.run_background_tasks: - self.clock.looping_call(self._rotate_notifs, 30 * 1000) + self.clock.looping_call(self._rotate_notifs, Duration(seconds=30)) self.clock.looping_call( - self._clear_old_push_actions_staging, 30 * 60 * 1000 + self._clear_old_push_actions_staging, Duration(minutes=30) ) self.db_pool.updates.register_background_index_update( @@ -1817,7 +1820,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas return # We sleep to ensure that we don't overwhelm the DB. - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) async def get_push_actions_for_user( self, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 4cf708442d..ae6ee50dc2 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -92,6 +92,7 @@ from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import AsyncLruCache from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -278,7 +279,7 @@ class EventsWorkerStore(SQLBaseStore): # We periodically clean out old transaction ID mappings self.clock.looping_call( self._cleanup_old_transaction_ids, - 5 * 60 * 1000, + Duration(minutes=5), ) self._get_event_cache: AsyncLruCache[tuple[str], EventCacheEntry] = ( diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index 51f04acbcb..dd49f98366 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -38,6 +38,7 @@ from synapse.storage.database import ( ) from synapse.types import ISynapseReactor from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.stringutils import random_string if TYPE_CHECKING: @@ -49,11 +50,13 @@ logger = logging.getLogger(__name__) # How often to renew an acquired lock by updating the `last_renewed_ts` time in # the lock table. -_RENEWAL_INTERVAL_MS = 30 * 1000 +_RENEWAL_INTERVAL = Duration(seconds=30) # How long before an acquired lock times out. _LOCK_TIMEOUT_MS = 2 * 60 * 1000 +_LOCK_REAP_INTERVAL = Duration(milliseconds=_LOCK_TIMEOUT_MS / 10.0) + class LockStore(SQLBaseStore): """Provides a best effort distributed lock between worker instances. @@ -106,9 +109,7 @@ class LockStore(SQLBaseStore): self._acquiring_locks: set[tuple[str, str]] = set() - self.clock.looping_call( - self._reap_stale_read_write_locks, _LOCK_TIMEOUT_MS / 10.0 - ) + self.clock.looping_call(self._reap_stale_read_write_locks, _LOCK_REAP_INTERVAL) @wrap_as_background_process("LockStore._on_shutdown") async def _on_shutdown(self) -> None: @@ -410,7 +411,7 @@ class Lock: def _setup_looping_call(self) -> None: self._looping_call = self._clock.looping_call( self._renew, - _RENEWAL_INTERVAL_MS, + _RENEWAL_INTERVAL, self._server_name, self._store, self._hs, diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index dc8e2c1616..b2b4561247 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -34,6 +34,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.event_push_actions import ( EventPushActionsWorkerStore, ) +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -78,7 +79,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): # Read the extrems every 60 minutes if hs.config.worker.run_background_tasks: - self.clock.looping_call(self._read_forward_extremities, 60 * 60 * 1000) + self.clock.looping_call(self._read_forward_extremities, Duration(hours=1)) # Used in _generate_user_daily_visits to keep track of progress self._last_user_visit_update = self._get_start_of_day() diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 545b0f11c4..9a9c0fffc7 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -49,13 +49,12 @@ from synapse.storage.util.id_generators import IdGenerator from synapse.storage.util.sequence import build_sequence_generator from synapse.types import JsonDict, StrCollection, UserID, UserInfo from synapse.util.caches.descriptors import cached +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter if TYPE_CHECKING: from synapse.server import HomeServer -THIRTY_MINUTES_IN_MS = 30 * 60 * 1000 - logger = logging.getLogger(__name__) @@ -213,7 +212,7 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore): if hs.config.worker.run_background_tasks: self.clock.call_later( - 0.0, + Duration(seconds=0), self._set_expiration_date_when_missing, ) @@ -227,7 +226,7 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore): # Create a background job for culling expired 3PID validity tokens if hs.config.worker.run_background_tasks: self.clock.looping_call( - self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS + self.cull_expired_threepid_validation_tokens, Duration(minutes=30) ) async def register_user( @@ -2739,7 +2738,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): # Create a background job for removing expired login tokens if hs.config.worker.run_background_tasks: self.clock.looping_call( - self._delete_expired_login_tokens, THIRTY_MINUTES_IN_MS + self._delete_expired_login_tokens, Duration(minutes=30) ) async def add_access_token_to_user( diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 4fb7779d38..9b06ab69fe 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -63,6 +63,7 @@ from synapse.types import ( get_domain_from_id, ) from synapse.util.caches.descriptors import _CacheContext, cached, cachedList +from synapse.util.duration import Duration from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -110,10 +111,10 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): self._known_servers_count = 1 self.hs.get_clock().looping_call( self._count_known_servers, - 60 * 1000, + Duration(minutes=1), ) self.hs.get_clock().call_later( - 1, + Duration(seconds=1), self._count_known_servers, ) federation_known_servers_gauge.register_hook( diff --git a/synapse/storage/databases/main/session.py b/synapse/storage/databases/main/session.py index 1154bb2d59..f088a8d88c 100644 --- a/synapse/storage/databases/main/session.py +++ b/synapse/storage/databases/main/session.py @@ -30,6 +30,7 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.types import JsonDict +from synapse.util.duration import Duration from synapse.util.json import json_encoder if TYPE_CHECKING: @@ -55,7 +56,7 @@ class SessionStore(SQLBaseStore): # Create a background job for culling expired sessions. if hs.config.worker.run_background_tasks: - self.clock.looping_call(self._delete_expired_sessions, 30 * 60 * 1000) + self.clock.looping_call(self._delete_expired_sessions, Duration(minutes=30)) async def create_session( self, session_type: str, value: JsonDict, expiry_ms: int diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index 6f87308cde..828eed3a73 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -96,7 +96,7 @@ class SlidingSyncStore(SQLBaseStore): if self.hs.config.worker.run_background_tasks: self.clock.looping_call( self.delete_old_sliding_sync_connections, - CONNECTION_EXPIRY_FREQUENCY.as_millis(), + CONNECTION_EXPIRY_FREQUENCY, ) async def get_latest_bump_stamp_for_room( diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 70c5b928fd..2fdd27d3da 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -37,6 +37,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.types import JsonDict, StrCollection from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -81,7 +82,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): super().__init__(database, db_conn, hs) if hs.config.worker.run_background_tasks: - self.clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) + self.clock.looping_call(self._cleanup_transactions, Duration(minutes=30)) @wrap_as_background_process("cleanup_transactions") async def _cleanup_transactions(self) -> None: diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 6f9bbcac67..818f8b1a69 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -58,6 +58,7 @@ from synapse.logging.context import ( run_in_background, ) from synapse.util.clock import Clock +from synapse.util.duration import Duration logger = logging.getLogger(__name__) @@ -640,7 +641,7 @@ class Linearizer: # This needs to happen while we hold the lock. We could put it on the # exit path, but that would slow down the uncontended case. try: - await self._clock.sleep(0) + await self._clock.sleep(Duration(seconds=0)) except CancelledError: self._release_lock(key, entry) raise @@ -818,7 +819,9 @@ def timeout_deferred( # We don't track these calls since they are short. delayed_call = clock.call_later( - timeout, time_it_out, call_later_cancel_on_shutdown=cancel_on_shutdown + Duration(seconds=timeout), + time_it_out, + call_later_cancel_on_shutdown=cancel_on_shutdown, ) def convert_cancelled(value: Failure) -> Failure: diff --git a/synapse/util/batching_queue.py b/synapse/util/batching_queue.py index 514abcbec1..43eefcb7f1 100644 --- a/synapse/util/batching_queue.py +++ b/synapse/util/batching_queue.py @@ -36,6 +36,7 @@ from twisted.internet import defer from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.metrics import SERVER_NAME_LABEL from synapse.util.clock import Clock +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -175,7 +176,7 @@ class BatchingQueue(Generic[V, R]): # pattern is to call `add_to_queue` multiple times at once, and # deferring to the next reactor tick allows us to batch all of # those up. - await self._clock.sleep(0) + await self._clock.sleep(Duration(seconds=0)) next_values = self._next_values.pop(key, []) if not next_values: diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 528e4bb852..87870f4223 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -38,6 +38,7 @@ from twisted.internet import defer from synapse.config import cache as cache_config from synapse.util.caches import EvictionReason, register_cache from synapse.util.clock import Clock +from synapse.util.duration import Duration if TYPE_CHECKING: from synapse.server import HomeServer @@ -112,7 +113,7 @@ class ExpiringCache(Generic[KT, VT]): def f() -> "defer.Deferred[None]": return hs.run_as_background_process("prune_cache", self._prune_cache) - self._clock.looping_call(f, self._expiry_ms / 2) + self._clock.looping_call(f, Duration(milliseconds=self._expiry_ms / 2)) def __setitem__(self, key: KT, value: VT) -> None: now = self._clock.time_msec() diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index d304e804e9..a3e7bd4d03 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -50,6 +50,7 @@ from synapse.util.caches.treecache import ( iterate_tree_cache_items, ) from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.linked_list import ListNode if TYPE_CHECKING: @@ -202,9 +203,9 @@ def _expire_old_entries( if (i + 1) % 10000 == 0: logger.debug("Waiting during drop") if node.last_access_ts_secs > now - expiry_seconds: - await clock.sleep(0.5) + await clock.sleep(Duration(milliseconds=500)) else: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) logger.debug("Waking during drop") node = next_node @@ -248,7 +249,7 @@ def setup_expire_lru_cache_entries(hs: "HomeServer") -> None: clock = hs.get_clock() clock.looping_call( _expire_old_entries, - 30 * 1000, + Duration(seconds=30), server_name, hs, clock, diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index b1cdc81dda..0289e13f6a 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -42,6 +42,7 @@ from synapse.logging.opentracing import ( from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred from synapse.util.caches import EvictionReason, register_cache from synapse.util.clock import Clock +from synapse.util.duration import Duration logger = logging.getLogger(__name__) @@ -120,7 +121,7 @@ class ResponseCache(Generic[KV]): self._result_cache: dict[KV, ResponseCacheEntry] = {} self.clock = clock - self.timeout_sec = timeout_ms / 1000.0 + self.timeout = Duration(milliseconds=timeout_ms) self._name = name self._metrics = register_cache( @@ -195,9 +196,9 @@ class ResponseCache(Generic[KV]): # if this cache has a non-zero timeout, and the callback has not cleared # the should_cache bit, we leave it in the cache for now and schedule # its removal later. - if self.timeout_sec and context.should_cache: + if self.timeout and context.should_cache: self.clock.call_later( - self.timeout_sec, + self.timeout, self._entry_timeout, key, # We don't need to track these calls since they don't hold any strong diff --git a/synapse/util/clock.py b/synapse/util/clock.py index 65f7164896..6fd31864b7 100644 --- a/synapse/util/clock.py +++ b/synapse/util/clock.py @@ -31,6 +31,7 @@ from twisted.internet.task import LoopingCall from synapse.logging import context from synapse.types import ISynapseThreadlessReactor from synapse.util import log_failure +from synapse.util.duration import Duration from synapse.util.stringutils import random_string_insecure_fast P = ParamSpec("P") @@ -84,14 +85,14 @@ class Clock: self.cancel_all_looping_calls() self.cancel_all_delayed_calls() - async def sleep(self, seconds: float) -> None: + async def sleep(self, duration: Duration) -> None: d: defer.Deferred[float] = defer.Deferred() # Start task in the `sentinel` logcontext, to avoid leaking the current context # into the reactor once it finishes. with context.PreserveLoggingContext(): # We can ignore the lint here since this class is the one location callLater should # be called. - self._reactor.callLater(seconds, d.callback, seconds) # type: ignore[call-later-not-tracked] + self._reactor.callLater(duration.as_secs(), d.callback, duration.as_secs()) # type: ignore[call-later-not-tracked] await d def time(self) -> float: @@ -105,13 +106,13 @@ class Clock: def looping_call( self, f: Callable[P, object], - msec: float, + duration: Duration, *args: P.args, **kwargs: P.kwargs, ) -> LoopingCall: """Call a function repeatedly. - Waits `msec` initially before calling `f` for the first time. + Waits `duration` initially before calling `f` for the first time. If the function given to `looping_call` returns an awaitable/deferred, the next call isn't scheduled until after the returned awaitable has finished. We get @@ -124,16 +125,16 @@ class Clock: Args: f: The function to call repeatedly. - msec: How long to wait between calls in milliseconds. + duration: How long to wait between calls. *args: Positional arguments to pass to function. **kwargs: Key arguments to pass to function. """ - return self._looping_call_common(f, msec, False, *args, **kwargs) + return self._looping_call_common(f, duration, False, *args, **kwargs) def looping_call_now( self, f: Callable[P, object], - msec: float, + duration: Duration, *args: P.args, **kwargs: P.kwargs, ) -> LoopingCall: @@ -148,16 +149,16 @@ class Clock: Args: f: The function to call repeatedly. - msec: How long to wait between calls in milliseconds. + duration: How long to wait between calls. *args: Positional arguments to pass to function. **kwargs: Key arguments to pass to function. """ - return self._looping_call_common(f, msec, True, *args, **kwargs) + return self._looping_call_common(f, duration, True, *args, **kwargs) def _looping_call_common( self, f: Callable[P, object], - msec: float, + duration: Duration, now: bool, *args: P.args, **kwargs: P.kwargs, @@ -217,7 +218,7 @@ class Clock: # We want to start the task in the `sentinel` logcontext, to avoid leaking the # current context into the reactor after the function finishes. with context.PreserveLoggingContext(): - d = call.start(msec / 1000.0, now=now) + d = call.start(duration.as_secs(), now=now) d.addErrback(log_failure, "Looping call died", consumeErrors=False) self._looping_calls.append(call) @@ -225,7 +226,7 @@ class Clock: "%s(%s): Scheduled looping call every %sms later", looping_call_context_string, instance_id, - msec, + duration.as_millis(), # Find out who is scheduling the call which makes it easy to follow in the # logs. stack_info=True, @@ -251,7 +252,7 @@ class Clock: def call_later( self, - delay: float, + delay: Duration, callback: Callable, *args: Any, call_later_cancel_on_shutdown: bool = True, @@ -264,7 +265,7 @@ class Clock: `run_as_background_process` to give it more specific label and track metrics. Args: - delay: How long to wait in seconds. + delay: How long to wait. callback: Function to call *args: Postional arguments to pass to function. call_later_cancel_on_shutdown: Whether this call should be tracked for cleanup during @@ -322,7 +323,9 @@ class Clock: # We can ignore the lint here since this class is the one location callLater should # be called. - call = self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) # type: ignore[call-later-not-tracked] + call = self._reactor.callLater( + delay.as_secs(), wrapped_callback, *args, **kwargs + ) # type: ignore[call-later-not-tracked] logger.debug( "call_later(%s): Scheduled call for %ss later (tracked for shutdown: %s)", diff --git a/synapse/util/duration.py b/synapse/util/duration.py index 3419f6dda6..135b980852 100644 --- a/synapse/util/duration.py +++ b/synapse/util/duration.py @@ -13,6 +13,7 @@ # from datetime import timedelta +from typing import overload # Constant so we don't keep creating new timedelta objects when calling # `.as_millis()`. @@ -35,6 +36,82 @@ class Duration(timedelta): """Returns the duration in milliseconds.""" return int(self / _ONE_MILLISECOND) - def as_secs(self) -> int: + def as_secs(self) -> float: """Returns the duration in seconds.""" - return int(self.total_seconds()) + return self.total_seconds() + + # Override arithmetic operations to return Duration instances + + def __add__(self, other: timedelta) -> "Duration": + """Add two durations together, returning a Duration.""" + result = super().__add__(other) + return Duration(seconds=result.total_seconds()) + + def __radd__(self, other: timedelta) -> "Duration": + """Add two durations together (reversed), returning a Duration.""" + result = super().__radd__(other) + return Duration(seconds=result.total_seconds()) + + def __sub__(self, other: timedelta) -> "Duration": + """Subtract two durations, returning a Duration.""" + result = super().__sub__(other) + return Duration(seconds=result.total_seconds()) + + def __rsub__(self, other: timedelta) -> "Duration": + """Subtract two durations (reversed), returning a Duration.""" + result = super().__rsub__(other) + return Duration(seconds=result.total_seconds()) + + def __mul__(self, other: float) -> "Duration": + """Multiply a duration by a scalar, returning a Duration.""" + result = super().__mul__(other) + return Duration(seconds=result.total_seconds()) + + def __rmul__(self, other: float) -> "Duration": + """Multiply a duration by a scalar (reversed), returning a Duration.""" + result = super().__rmul__(other) + return Duration(seconds=result.total_seconds()) + + @overload + def __truediv__(self, other: timedelta) -> float: ... + + @overload + def __truediv__(self, other: float) -> "Duration": ... + + def __truediv__(self, other: float | timedelta) -> "Duration | float": + """Divide a duration by a scalar or another duration. + + If dividing by a scalar, returns a Duration. + If dividing by a timedelta, returns a float ratio. + """ + result = super().__truediv__(other) + if isinstance(other, timedelta): + # Dividing by a timedelta gives a float ratio + assert isinstance(result, float) + return result + else: + # Dividing by a scalar gives a Duration + assert isinstance(result, timedelta) + return Duration(seconds=result.total_seconds()) + + @overload + def __floordiv__(self, other: timedelta) -> int: ... + + @overload + def __floordiv__(self, other: int) -> "Duration": ... + + def __floordiv__(self, other: int | timedelta) -> "Duration | int": + """Floor divide a duration by a scalar or another duration. + + If dividing by a scalar, returns a Duration. + If dividing by a timedelta, returns an int ratio. + """ + result = super().__floordiv__(other) + if isinstance(other, timedelta): + # Dividing by a timedelta gives an int ratio + assert isinstance(result, int) + return result + else: + # Dividing by a scalar gives a Duration + assert isinstance(result, timedelta) + return Duration(seconds=result.total_seconds()) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 024706d9cf..d1053d227b 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -48,6 +48,7 @@ from synapse.logging.context import ( from synapse.logging.opentracing import start_active_span from synapse.metrics import SERVER_NAME_LABEL, Histogram, LaterGauge from synapse.util.clock import Clock +from synapse.util.duration import Duration if typing.TYPE_CHECKING: from contextlib import _GeneratorContextManager @@ -353,7 +354,9 @@ class _PerHostRatelimiter: rate_limiter_name=self.metrics_name, **{SERVER_NAME_LABEL: self.our_server_name}, ).inc() - ret_defer = run_in_background(self.clock.sleep, self.sleep_sec) + ret_defer = run_in_background( + self.clock.sleep, Duration(seconds=self.sleep_sec) + ) self.sleeping_requests.add(request_id) @@ -414,6 +417,6 @@ class _PerHostRatelimiter: pass self.clock.call_later( - 0.0, + Duration(seconds=0), start_next_request, ) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 3b4423a1ff..353ddb70bc 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -35,6 +35,7 @@ from synapse.metrics.background_process_metrics import ( wrap_as_background_process, ) from synapse.types import JsonMapping, ScheduledTask, TaskStatus +from synapse.util.duration import Duration from synapse.util.stringutils import random_string if TYPE_CHECKING: @@ -92,8 +93,8 @@ class TaskScheduler: """ # Precision of the scheduler, evaluation of tasks to run will only happen - # every `SCHEDULE_INTERVAL_MS` ms - SCHEDULE_INTERVAL_MS = 1 * 60 * 1000 # 1mn + # every `SCHEDULE_INTERVAL` + SCHEDULE_INTERVAL = Duration(minutes=1) # How often to clean up old tasks. CLEANUP_INTERVAL_MS = 30 * 60 * 1000 # Time before a complete or failed task is deleted from the DB @@ -103,7 +104,7 @@ class TaskScheduler: # Time from the last task update after which we will log a warning LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs # Report a running task's status and usage every so often. - OCCASIONAL_REPORT_INTERVAL_MS = 5 * 60 * 1000 # 5 minutes + OCCASIONAL_REPORT_INTERVAL = Duration(minutes=5) def __init__(self, hs: "HomeServer"): self.hs = hs # nb must be called this for @wrap_as_background_process @@ -127,11 +128,11 @@ class TaskScheduler: if self._run_background_tasks: self._clock.looping_call( self._launch_scheduled_tasks, - TaskScheduler.SCHEDULE_INTERVAL_MS, + TaskScheduler.SCHEDULE_INTERVAL, ) self._clock.looping_call( self._clean_scheduled_tasks, - TaskScheduler.SCHEDULE_INTERVAL_MS, + TaskScheduler.SCHEDULE_INTERVAL, ) running_tasks_gauge.register_hook( @@ -433,7 +434,7 @@ class TaskScheduler: start_time = self._clock.time() occasional_status_call = self._clock.looping_call( _occasional_report, - TaskScheduler.OCCASIONAL_REPORT_INTERVAL_MS, + TaskScheduler.OCCASIONAL_REPORT_INTERVAL, log_context, start_time, ) @@ -468,7 +469,7 @@ class TaskScheduler: # Try launch a new task since we've finished with this one. self._clock.call_later( - 0.1, + Duration(milliseconds=100), self._launch_scheduled_tasks, ) diff --git a/synmark/suites/logging.py b/synmark/suites/logging.py index d89f487d3d..243f9dbca0 100644 --- a/synmark/suites/logging.py +++ b/synmark/suites/logging.py @@ -37,6 +37,7 @@ from synapse.logging import RemoteHandler from synapse.synapse_rust import reset_logging_config from synapse.types import ISynapseReactor from synapse.util.clock import Clock +from synapse.util.duration import Duration class LineCounter(LineOnlyReceiver): @@ -141,7 +142,7 @@ async def main(reactor: ISynapseReactor, loops: int) -> float: if len(handler._buffer) == handler.maximum_buffer: while len(handler._buffer) > handler.maximum_buffer / 2: - await clock.sleep(0.01) + await clock.sleep(Duration(milliseconds=10)) await logger_factory.on_done diff --git a/tests/federation/transport/server/test__base.py b/tests/federation/transport/server/test__base.py index 3c553e6e40..00a9c2064c 100644 --- a/tests/federation/transport/server/test__base.py +++ b/tests/federation/transport/server/test__base.py @@ -30,6 +30,7 @@ from synapse.http.server import JsonResource from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from synapse.util.ratelimitutils import FederationRateLimiter from tests import unittest @@ -53,13 +54,13 @@ class CancellableFederationServlet(BaseFederationServlet): async def on_GET( self, origin: str, content: None, query: dict[bytes, list[bytes]] ) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} async def on_POST( self, origin: str, content: JsonDict, query: dict[bytes, list[bytes]] ) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index fa6bb4970b..183234b8a0 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -250,7 +250,7 @@ class DeviceTestCase(unittest.HomeserverTestCase): self.assertEqual(10, len(res)) # wait for the task scheduler to do a second delete pass - self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL.as_secs()) # remaining messages should now be deleted res = self.get_success( diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 70557a4a5f..623eef0ecb 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -544,7 +544,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ) self.assertEqual(rows, [(2, [ROOM_ID, []])]) - self.reactor.advance(FORGET_TIMEOUT) + self.reactor.advance(FORGET_TIMEOUT.as_secs()) rows, _, _ = self.get_success( self.handler.get_all_typing_updates( diff --git a/tests/http/test_servlet.py b/tests/http/test_servlet.py index 5bf8305d05..2f1c8f03c6 100644 --- a/tests/http/test_servlet.py +++ b/tests/http/test_servlet.py @@ -34,6 +34,7 @@ from synapse.rest.client._base import client_patterns from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from tests import unittest from tests.http.server._base import test_disconnect @@ -108,11 +109,11 @@ class CancellableRestServlet(RestServlet): @cancellable async def on_GET(self, request: SynapseRequest) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} async def on_POST(self, request: SynapseRequest) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} diff --git a/tests/logging/test_opentracing.py b/tests/logging/test_opentracing.py index 3aaa743265..d5e643585d 100644 --- a/tests/logging/test_opentracing.py +++ b/tests/logging/test_opentracing.py @@ -37,6 +37,7 @@ from synapse.logging.opentracing import ( ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests.server import get_clock @@ -184,7 +185,7 @@ class LogContextScopeManagerTestCase(TestCase): scopes.append(scope) self.assertEqual(self._tracer.active_span, scope.span) - await clock.sleep(4) + await clock.sleep(Duration(seconds=4)) self.assertEqual(self._tracer.active_span, scope.span) scope.close() @@ -194,7 +195,7 @@ class LogContextScopeManagerTestCase(TestCase): scopes.append(root_scope) d1 = run_in_background(task, 1) - await clock.sleep(2) + await clock.sleep(Duration(seconds=2)) d2 = run_in_background(task, 2) # because we did run_in_background, the active span should still be the @@ -351,7 +352,7 @@ class LogContextScopeManagerTestCase(TestCase): # Now wait for the background process to finish while not callback_finished: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self.assertTrue( callback_finished, @@ -418,7 +419,7 @@ class LogContextScopeManagerTestCase(TestCase): # Now wait for the background process to finish while not callback_finished: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self.assertTrue( callback_finished, diff --git a/tests/replication/http/test__base.py b/tests/replication/http/test__base.py index b757c6428a..1c7e7e997b 100644 --- a/tests/replication/http/test__base.py +++ b/tests/replication/http/test__base.py @@ -30,6 +30,7 @@ from synapse.replication.http._base import ReplicationEndpoint from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util.cancellation import cancellable +from synapse.util.duration import Duration from tests import unittest from tests.http.server._base import test_disconnect @@ -52,7 +53,7 @@ class CancellableReplicationEndpoint(ReplicationEndpoint): async def _handle_request( # type: ignore[override] self, request: Request, content: JsonDict ) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} @@ -73,7 +74,7 @@ class UncancellableReplicationEndpoint(ReplicationEndpoint): async def _handle_request( # type: ignore[override] self, request: Request, content: JsonDict ) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} diff --git a/tests/rest/admin/test_background_updates.py b/tests/rest/admin/test_background_updates.py index 25112baaa2..a4a3112e20 100644 --- a/tests/rest/admin/test_background_updates.py +++ b/tests/rest/admin/test_background_updates.py @@ -31,6 +31,7 @@ from synapse.server import HomeServer from synapse.storage.background_updates import BackgroundUpdater from synapse.types import JsonDict from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest @@ -105,7 +106,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): "Adds a bg update but doesn't start it" async def _fake_update(progress: JsonDict, batch_size: int) -> int: - await self.clock.sleep(0.2) + await self.clock.sleep(Duration(milliseconds=200)) return batch_size self.store.db_pool.updates.register_background_update_handler( diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index 7daf13ad22..1c340efa0c 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -44,6 +44,7 @@ from synapse.storage.databases.main.purge_events import ( ) from synapse.types import UserID from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.task_scheduler import TaskScheduler from tests import unittest @@ -1161,7 +1162,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): # Mock PaginationHandler.purge_room to sleep for 100s, so we have time to do a second call # before the purge is over. Note that it doesn't purge anymore, but we don't care. async def purge_room(room_id: str, force: bool) -> None: - await self.hs.get_clock().sleep(100) + await self.hs.get_clock().sleep(Duration(seconds=100)) self.pagination_handler.purge_room = AsyncMock(side_effect=purge_room) # type: ignore[method-assign] @@ -1464,7 +1465,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): self._is_purged(room_id) # Wait for next scheduler run - self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS) + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL.as_secs()) self._is_purged(room_id) @@ -1501,7 +1502,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): self._is_purged(room_id) # Wait for next scheduler run - self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS) + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL.as_secs()) # Test that all users has been kicked (room is shutdown) self._has_no_members(room_id) diff --git a/tests/rest/client/test_transactions.py b/tests/rest/client/test_transactions.py index 0407bb5347..31586a451f 100644 --- a/tests/rest/client/test_transactions.py +++ b/tests/rest/client/test_transactions.py @@ -29,6 +29,7 @@ from synapse.logging.context import SENTINEL_CONTEXT, LoggingContext, current_co from synapse.rest.client.transactions import CLEANUP_PERIOD, HttpTransactionCache from synapse.types import ISynapseReactor, JsonDict from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest from tests.server import get_clock @@ -93,7 +94,7 @@ class HttpTransactionCacheTestCase(unittest.TestCase): # Ignore `multiple-internal-clocks` linter error here since we are creating a `Clock` # for testing purposes. yield defer.ensureDeferred( - Clock(reactor, server_name="test_server").sleep(0) # type: ignore[multiple-internal-clocks] + Clock(reactor, server_name="test_server").sleep(Duration(seconds=0)) # type: ignore[multiple-internal-clocks] ) return 1, {} diff --git a/tests/server_notices/__init__.py b/tests/server_notices/__init__.py index eca52930db..19bda218e3 100644 --- a/tests/server_notices/__init__.py +++ b/tests/server_notices/__init__.py @@ -20,6 +20,7 @@ from synapse.rest.client import login, room, sync from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest from tests.unittest import override_config @@ -131,7 +132,7 @@ class ServerNoticesTests(unittest.HomeserverTestCase): break # Sleep and try again. - self.get_success(self.clock.sleep(0.1)) + self.get_success(self.clock.sleep(Duration(milliseconds=100))) else: self.fail( f"Failed to join the server notices room. No 'join' field in sync_body['rooms']: {sync_body['rooms']}" diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py index 7db710846d..85ce5bede2 100644 --- a/tests/state/test_v2.py +++ b/tests/state/test_v2.py @@ -42,6 +42,7 @@ from synapse.state.v2 import ( ) from synapse.storage.databases.main.event_federation import StateDifference from synapse.types import EventID, StateMap +from synapse.util.duration import Duration from tests import unittest @@ -61,7 +62,7 @@ ORIGIN_SERVER_TS = 0 class FakeClock: - async def sleep(self, msec: float) -> None: + async def sleep(self, duration: Duration) -> None: return None diff --git a/tests/state/test_v21.py b/tests/state/test_v21.py index b17773fb56..58d800f921 100644 --- a/tests/state/test_v21.py +++ b/tests/state/test_v21.py @@ -39,6 +39,7 @@ from synapse.state.v2 import ( ) from synapse.types import StateMap from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest from tests.state.test_v2 import TestStateResolutionStore @@ -66,7 +67,7 @@ def monotonic_timestamp() -> int: class FakeClock: - async def sleep(self, duration_ms: float) -> None: + async def sleep(self, duration: Duration) -> None: defer.succeed(None) diff --git a/tests/storage/databases/main/test_lock.py b/tests/storage/databases/main/test_lock.py index 3743a4a386..622eb96ded 100644 --- a/tests/storage/databases/main/test_lock.py +++ b/tests/storage/databases/main/test_lock.py @@ -26,7 +26,7 @@ from twisted.internet.defer import Deferred from twisted.internet.testing import MemoryReactor from synapse.server import HomeServer -from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS, _RENEWAL_INTERVAL_MS +from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS, _RENEWAL_INTERVAL from synapse.util.clock import Clock from tests import unittest @@ -377,7 +377,7 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase): # Wait for ages with the lock, we should not be able to get the lock. for _ in range(10): - self.reactor.advance((_RENEWAL_INTERVAL_MS / 1000)) + self.reactor.advance((_RENEWAL_INTERVAL.as_secs())) lock2 = self.get_success( self.store.try_acquire_read_write_lock("name", "key", write=True) diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 3505423691..e3f79d7670 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -38,6 +38,7 @@ from synapse.storage.database import LoggingTransaction from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import JsonDict from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest from tests.unittest import override_config @@ -59,7 +60,7 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase): async def update(self, progress: JsonDict, count: int) -> int: duration_ms = 10 - await self.clock.sleep((count * duration_ms) / 1000) + await self.clock.sleep(Duration(milliseconds=count * duration_ms)) progress = {"my_key": progress["my_key"] + 1} await self.store.db_pool.runInteraction( "update_progress", @@ -309,7 +310,7 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase): # Run the update with the long-running update item async def update_long(progress: JsonDict, count: int) -> int: - await self.clock.sleep((count * duration_ms) / 1000) + await self.clock.sleep(Duration(milliseconds=count * duration_ms)) progress = {"my_key": progress["my_key"] + 1} await self.store.db_pool.runInteraction( "update_progress", diff --git a/tests/test_server.py b/tests/test_server.py index 2df6bdfa44..ec31b6cc5f 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -38,6 +38,7 @@ from synapse.logging.context import make_deferred_yieldable from synapse.types import JsonDict from synapse.util.cancellation import cancellable from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest from tests.http.server._base import test_disconnect @@ -406,11 +407,11 @@ class CancellableDirectServeJsonResource(DirectServeJsonResource): @cancellable async def _async_render_GET(self, request: SynapseRequest) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} async def _async_render_POST(self, request: SynapseRequest) -> tuple[int, JsonDict]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, {"result": True} @@ -423,11 +424,11 @@ class CancellableDirectServeHtmlResource(DirectServeHtmlResource): @cancellable async def _async_render_GET(self, request: SynapseRequest) -> tuple[int, bytes]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, b"ok" async def _async_render_POST(self, request: SynapseRequest) -> tuple[int, bytes]: - await self.clock.sleep(1.0) + await self.clock.sleep(Duration(seconds=1)) return HTTPStatus.OK, b"ok" diff --git a/tests/util/caches/test_response_cache.py b/tests/util/caches/test_response_cache.py index 30cd6ef0e4..def5c817db 100644 --- a/tests/util/caches/test_response_cache.py +++ b/tests/util/caches/test_response_cache.py @@ -26,6 +26,7 @@ from parameterized import parameterized from twisted.internet import defer from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext +from synapse.util.duration import Duration from tests.server import get_clock from tests.unittest import TestCase @@ -55,7 +56,7 @@ class ResponseCacheTestCase(TestCase): return o async def delayed_return(self, o: str) -> str: - await self.clock.sleep(1) + await self.clock.sleep(Duration(seconds=1)) return o def test_cache_hit(self) -> None: @@ -182,7 +183,7 @@ class ResponseCacheTestCase(TestCase): async def non_caching(o: str, cache_context: ResponseCacheContext[int]) -> str: nonlocal call_count call_count += 1 - await self.clock.sleep(1) + await self.clock.sleep(Duration(seconds=1)) cache_context.should_cache = should_cache return o diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index ca805bb20a..a4114cdfcc 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -37,6 +37,7 @@ from synapse.logging.context import ( ) from synapse.types import ISynapseReactor from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest from tests.unittest import logcontext_clean @@ -82,7 +83,7 @@ class LoggingContextTestCase(unittest.TestCase): self._check_test_key("sentinel") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("sentinel") @@ -94,9 +95,9 @@ class LoggingContextTestCase(unittest.TestCase): reactor.callLater(0, lambda: defer.ensureDeferred(competing_callback())) # type: ignore[call-later-not-tracked] with LoggingContext(name="foo", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") self.assertTrue( @@ -128,7 +129,7 @@ class LoggingContextTestCase(unittest.TestCase): self._check_test_key("looping_call") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("looping_call") @@ -139,12 +140,12 @@ class LoggingContextTestCase(unittest.TestCase): with LoggingContext(name="foo", server_name="test_server"): lc = clock.looping_call( - lambda: defer.ensureDeferred(competing_callback()), 0 + lambda: defer.ensureDeferred(competing_callback()), Duration(seconds=0) ) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") self.assertTrue( @@ -179,7 +180,7 @@ class LoggingContextTestCase(unittest.TestCase): self._check_test_key("looping_call") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("looping_call") @@ -190,10 +191,10 @@ class LoggingContextTestCase(unittest.TestCase): with LoggingContext(name="foo", server_name="test_server"): lc = clock.looping_call_now( - lambda: defer.ensureDeferred(competing_callback()), 0 + lambda: defer.ensureDeferred(competing_callback()), Duration(seconds=0) ) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") self.assertTrue( @@ -228,7 +229,7 @@ class LoggingContextTestCase(unittest.TestCase): self._check_test_key("call_later") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("call_later") @@ -238,11 +239,13 @@ class LoggingContextTestCase(unittest.TestCase): callback_finished = True with LoggingContext(name="foo", server_name="test_server"): - clock.call_later(0, lambda: defer.ensureDeferred(competing_callback())) + clock.call_later( + Duration(seconds=0), lambda: defer.ensureDeferred(competing_callback()) + ) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") self.assertTrue( @@ -280,7 +283,7 @@ class LoggingContextTestCase(unittest.TestCase): self._check_test_key("foo") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("foo") @@ -303,7 +306,7 @@ class LoggingContextTestCase(unittest.TestCase): await d self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self.assertTrue( callback_finished, @@ -338,7 +341,7 @@ class LoggingContextTestCase(unittest.TestCase): self._check_test_key("sentinel") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("sentinel") @@ -364,7 +367,7 @@ class LoggingContextTestCase(unittest.TestCase): d.callback(None) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self.assertTrue( callback_finished, @@ -400,7 +403,7 @@ class LoggingContextTestCase(unittest.TestCase): self._check_test_key("foo") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("foo") @@ -446,7 +449,7 @@ class LoggingContextTestCase(unittest.TestCase): run_in_background(lambda: (d.callback(None), d)[1]) # type: ignore[call-overload, func-returns-value] self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self.assertTrue( callback_finished, @@ -486,7 +489,7 @@ class LoggingContextTestCase(unittest.TestCase): # Now wait for the function under test to have run, and check that # the logcontext is left in a sane state. while not callback_finished: - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") self.assertTrue( @@ -501,7 +504,7 @@ class LoggingContextTestCase(unittest.TestCase): async def test_run_in_background_with_blocking_fn(self) -> None: async def blocking_function() -> None: # Ignore linter error since we are creating a `Clock` for testing purposes. - await Clock(reactor, server_name="test_server").sleep(0) # type: ignore[multiple-internal-clocks] + await Clock(reactor, server_name="test_server").sleep(Duration(seconds=0)) # type: ignore[multiple-internal-clocks] await self._test_run_in_background(blocking_function) @@ -535,7 +538,9 @@ class LoggingContextTestCase(unittest.TestCase): async def testfunc() -> None: self._check_test_key("foo") # Ignore linter error since we are creating a `Clock` for testing purposes. - d = defer.ensureDeferred(Clock(reactor, server_name="test_server").sleep(0)) # type: ignore[multiple-internal-clocks] + d = defer.ensureDeferred( + Clock(reactor, server_name="test_server").sleep(Duration(seconds=0)) # type: ignore[multiple-internal-clocks] + ) self.assertIs(current_context(), SENTINEL_CONTEXT) await d self._check_test_key("foo") @@ -579,7 +584,7 @@ class LoggingContextTestCase(unittest.TestCase): self._check_test_key("foo") with LoggingContext(name="competing", server_name="test_server"): - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("competing") self._check_test_key("foo") @@ -591,7 +596,7 @@ class LoggingContextTestCase(unittest.TestCase): with LoggingContext(name="foo", server_name="test_server"): run_coroutine_in_background(competing_callback()) self._check_test_key("foo") - await clock.sleep(0) + await clock.sleep(Duration(seconds=0)) self._check_test_key("foo") self.assertTrue( diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py index e33ded8a7f..2c8e21b339 100644 --- a/tests/util/test_task_scheduler.py +++ b/tests/util/test_task_scheduler.py @@ -26,6 +26,7 @@ from synapse.logging.context import make_deferred_yieldable from synapse.server import HomeServer from synapse.types import JsonMapping, ScheduledTask, TaskStatus from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.task_scheduler import TaskScheduler from tests.replication._base import BaseMultiWorkerStreamTestCase @@ -68,7 +69,7 @@ class TestTaskScheduler(HomeserverTestCase): # The timestamp being 30s after now the task should been executed # after the first scheduling loop is run - self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL.as_secs()) task = self.get_success(self.task_scheduler.get_task(task_id)) assert task is not None @@ -87,7 +88,7 @@ class TestTaskScheduler(HomeserverTestCase): self, task: ScheduledTask ) -> tuple[TaskStatus, JsonMapping | None, str | None]: # Sleep for a second - await self.hs.get_clock().sleep(1) + await self.hs.get_clock().sleep(Duration(seconds=1)) return TaskStatus.COMPLETE, None, None def test_schedule_lot_of_tasks(self) -> None: @@ -187,7 +188,7 @@ class TestTaskScheduler(HomeserverTestCase): # Simulate a synapse restart by emptying the list of running tasks self.task_scheduler._running_tasks = set() - self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)) + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL.as_secs())) task = self.get_success(self.task_scheduler.get_task(task_id)) assert task is not None