synapse/tests/handlers/test_worker_lock.py
Eric Eastwood 5a9ca1e3d9
Introduce Clock.call_when_running(...) to include logcontext by default (#18944)
Introduce `Clock.call_when_running(...)` to wrap startup code in a
logcontext, ensuring we can identify which server generated the logs.

Background:

>  Ideally, nothing from the Synapse homeserver would be logged against the `sentinel` 
>  logcontext as we want to know which server the logs came from. In practice, this is not 
>  always the case yet especially outside of request handling. 
>   
>  Global things outside of Synapse (e.g. Twisted reactor code) should run in the 
>  `sentinel` logcontext. It's only when it calls into application code that a logcontext 
>  gets activated. This means the reactor should be started in the `sentinel` logcontext, 
>  and any time an awaitable yields control back to the reactor, it should reset the 
>  logcontext to be the `sentinel` logcontext. This is important to avoid leaking the 
>  current logcontext to the reactor (which would then get picked up and associated with 
>  the next thing the reactor does). 
>
> *-- `docs/log_contexts.md`

Also adds a lint to prefer `Clock.call_when_running(...)` over
`reactor.callWhenRunning(...)`

Part of https://github.com/element-hq/synapse/issues/18905
2025-09-22 10:27:59 -05:00

127 lines
4.3 KiB
Python

#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2023 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging
import platform
from twisted.internet import defer
from twisted.internet.testing import MemoryReactor
from synapse.server import HomeServer
from synapse.util.clock import Clock
from tests import unittest
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.utils import test_timeout
logger = logging.getLogger(__name__)
class WorkerLockTestCase(unittest.HomeserverTestCase):
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self.worker_lock_handler = self.hs.get_worker_locks_handler()
def test_wait_for_lock_locally(self) -> None:
"""Test waiting for a lock on a single worker"""
lock1 = self.worker_lock_handler.acquire_lock("name", "key")
self.get_success(lock1.__aenter__())
lock2 = self.worker_lock_handler.acquire_lock("name", "key")
d2 = defer.ensureDeferred(lock2.__aenter__())
self.assertNoResult(d2)
self.get_success(lock1.__aexit__(None, None, None))
self.get_success(d2)
self.get_success(lock2.__aexit__(None, None, None))
def test_lock_contention(self) -> None:
"""Test lock contention when a lot of locks wait on a single worker"""
nb_locks_to_test = 500
current_machine = platform.machine().lower()
if current_machine.startswith("riscv"):
# RISC-V specific settings
timeout_seconds = 15 # Increased timeout for RISC-V
# add a print or log statement here for visibility in CI logs
logger.info( # use logger.info
"Detected RISC-V architecture (%s). "
"Adjusting test_lock_contention: timeout=%ss",
current_machine,
timeout_seconds,
)
else:
# Settings for other architectures
timeout_seconds = 5
# It takes around 0.5s on a 5+ years old laptop
with test_timeout(timeout_seconds): # Use the dynamically set timeout
d = self._take_locks(
nb_locks_to_test
) # Use the (potentially adjusted) number of locks
self.assertEqual(
self.get_success(d), nb_locks_to_test
) # Assert against the used number of locks
async def _take_locks(self, nb_locks: int) -> int:
locks = [
self.hs.get_worker_locks_handler().acquire_lock("test_lock", "")
for _ in range(nb_locks)
]
nb_locks_taken = 0
for lock in locks:
async with lock:
nb_locks_taken += 1
return nb_locks_taken
class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase):
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self.main_worker_lock_handler = self.hs.get_worker_locks_handler()
def test_wait_for_lock_worker(self) -> None:
"""Test waiting for a lock on another worker"""
worker = self.make_worker_hs(
"synapse.app.generic_worker",
extra_config={
"redis": {"enabled": True},
},
)
worker_lock_handler = worker.get_worker_locks_handler()
lock1 = self.main_worker_lock_handler.acquire_lock("name", "key")
self.get_success(lock1.__aenter__())
lock2 = worker_lock_handler.acquire_lock("name", "key")
d2 = defer.ensureDeferred(lock2.__aenter__())
self.assertNoResult(d2)
self.get_success(lock1.__aexit__(None, None, None))
self.get_success(d2)
self.get_success(lock2.__aexit__(None, None, None))