mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 00:02:08 -05:00
Fix run_coroutine_in_background(...) incorrectly handling logcontext (#18964)
Regressed in https://github.com/element-hq/synapse/pull/18900#discussion_r2331554278 (see conversation there for more context) ### How is this a regression? > To give this an update with more hindsight; this logic *was* redundant with the early return and it is safe to remove this complexity ✅ > > It seems like this actually has to do with completed vs incomplete deferreds... > > To explain how things previously worked *without* the early-return shortcut: > > With the normal case of **incomplete awaitable**, we store the `calling_context` and the `f` function is called and runs until it yields to the reactor. Because `f` follows the logcontext rules, it sets the `sentinel` logcontext. Then in `run_in_background(...)`, we restore the `calling_context`, store the current `ctx` (which is `sentinel`) and return. When the deferred completes, we restore `ctx` (which is `sentinel`) before yielding to the reactor again (all good ✅) > > With the other case where we see a **completed awaitable**, we store the `calling_context` and the `f` function is called and runs to completion (no logcontext change). *This is where the shortcut would kick in but I'm going to continue explaining as if we commented out the shortcut.* -- Then in `run_in_background(...)`, we restore the `calling_context`, store the current `ctx` (which is same as the `calling_context`). Because the deferred is already completed, our extra callback is called immediately and we restore `ctx` (which is same as the `calling_context`). Since we never yield to the reactor, the `calling_context` is perfect as that's what we want again (all good ✅) > > --- > > But this also means that our early-return shortcut is no longer just an optimization and is *necessary* to act correctly in the **completed awaitable** case as we want to return with the `calling_context` and not reset to the `sentinel` context. I've updated the comment in https://github.com/element-hq/synapse/pull/18964 to explain the necessity as it's currently just described as an optimization. > > But because we made the same change to `run_coroutine_in_background(...)` which didn't have the same early-return shortcut, we regressed the correct behavior ❌ . This is being fixed in https://github.com/element-hq/synapse/pull/18964 > > > *-- @MadLittleMods, https://github.com/element-hq/synapse/pull/18900#discussion_r2373582917* ### How did we find this problem? Spawning from @wrjlewis [seeing](https://matrix.to/#/!SGNQGPGUwtcPBUotTL:matrix.org/$h3TxxPVlqC6BTL07dbrsz6PmaUoZxLiXnSTEY-QYDtA?via=jki.re&via=matrix.org&via=element.io) `Starting metrics collection 'typing.get_new_events' from sentinel context: metrics will be lost` in the logs: <details> <summary>More logs</summary> ``` synapse.http.request_metrics - 222 - ERROR - sentinel - Trying to stop RequestMetrics in the sentinel context. 2025-09-23 14:43:19,712 - synapse.util.metrics - 212 - WARNING - sentinel - Starting metrics collection 'typing.get_new_events' from sentinel context: metrics will be lost 2025-09-23 14:43:19,713 - synapse.rest.client.sync - 851 - INFO - sentinel - Client has disconnected; not serializing response. 2025-09-23 14:43:19,713 - synapse.http.server - 825 - WARNING - sentinel - Not sending response to request <XForwardedForRequest at 0x7f23e8111ed0 method='POST' uri='/_matrix/client/unstable/org.matrix.simplified_msc3575/sync?pos=281963%2Fs929324_147053_10_2652457_147960_2013_25554_4709564_0_164_2&timeout=30000' clientproto='HTTP/1.1' site='8008'>, already dis connected. 2025-09-23 14:43:19,713 - synapse.access.http.8008 - 515 - INFO - sentinel - 92.40.194.87 - 8008 - {@me:wi11.co.uk} Processed request: 30.005sec/-8.041sec (0.001sec, 0.000sec) (0.000sec/0.002sec/2) 0B 200! "POST /_matrix/client/unstable/org.matrix.simplified_msc3575/ ``` </details> From the logs there, we can see things relating to `typing.get_new_events` and `/_matrix/client/unstable/org.matrix.simplified_msc3575/sync` which led me to trying out Sliding Sync with the typing extension enabled and allowed me to reproduce the problem locally. Sliding Sync is a unique scenario as it's the only place we use `gather_optional_coroutines(...)` -> `run_coroutine_in_background(...)` (introduced in https://github.com/element-hq/synapse/pull/17884) to exhibit this behavior. ### Testing strategy 1. Configure Synapse to enable [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186): Simplified Sliding Sync which is actually under [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) ```yaml experimental_features: msc3575_enabled: true ``` 1. Start synapse: `poetry run synapse_homeserver --config-path homeserver.yaml` 1. Make a Sliding Sync request with one of the extensions enabled ```http POST http://localhost:8008/_matrix/client/unstable/org.matrix.simplified_msc3575/sync { "lists": {}, "room_subscriptions": { "!FlgJYGQKAIvAscfBhq:my.synapse.linux.server": { "required_state": [], "timeline_limit": 1 } }, "extensions": { "typing": { "enabled": true } } } ``` 1. Open your homeserver logs and notice warnings about `Starting ... from sentinel context: metrics will be lost`
This commit is contained in:
parent
25fa555395
commit
0458f691b6
1
changelog.d/18964.misc
Normal file
1
changelog.d/18964.misc
Normal file
@ -0,0 +1 @@
|
||||
Fix `run_coroutine_in_background(...)` incorrectly handling logcontext.
|
||||
@ -802,13 +802,15 @@ def run_in_background(
|
||||
deferred returned by the function completes.
|
||||
|
||||
To explain how the log contexts work here:
|
||||
- When `run_in_background` is called, the current context is stored ("original"),
|
||||
we kick off the background task in the current context, and we restore that
|
||||
original context before returning
|
||||
- When the background task finishes, we don't want to leak our context into the
|
||||
reactor which would erroneously get attached to the next operation picked up by
|
||||
the event loop. We add a callback to the deferred which will clear the logging
|
||||
context after it finishes and yields control back to the reactor.
|
||||
- When `run_in_background` is called, the calling logcontext is stored
|
||||
("original"), we kick off the background task in the current context, and we
|
||||
restore that original context before returning.
|
||||
- For a completed deferred, that's the end of the story.
|
||||
- For an incomplete deferred, when the background task finishes, we don't want to
|
||||
leak our context into the reactor which would erroneously get attached to the
|
||||
next operation picked up by the event loop. We add a callback to the deferred
|
||||
which will clear the logging context after it finishes and yields control back to
|
||||
the reactor.
|
||||
|
||||
Useful for wrapping functions that return a deferred or coroutine, which you don't
|
||||
yield or await on (for instance because you want to pass it to
|
||||
@ -857,22 +859,36 @@ def run_in_background(
|
||||
|
||||
# The deferred has already completed
|
||||
if d.called and not d.paused:
|
||||
# The function should have maintained the logcontext, so we can
|
||||
# optimise out the messing about
|
||||
# If the function messes with logcontexts, we can assume it follows the Synapse
|
||||
# logcontext rules (Rules for functions returning awaitables: "If the awaitable
|
||||
# is already complete, the function returns with the same logcontext it started
|
||||
# with."). If it function doesn't touch logcontexts at all, we can also assume
|
||||
# the logcontext is unchanged.
|
||||
#
|
||||
# Either way, the function should have maintained the calling logcontext, so we
|
||||
# can avoid messing with it further. Additionally, if the deferred has already
|
||||
# completed, then it would be a mistake to then add a deferred callback (below)
|
||||
# to reset the logcontext to the sentinel logcontext as that would run
|
||||
# immediately (remember our goal is to maintain the calling logcontext when we
|
||||
# return).
|
||||
return d
|
||||
|
||||
# The function may have reset the context before returning, so we need to restore it
|
||||
# now.
|
||||
# Since the function we called may follow the Synapse logcontext rules (Rules for
|
||||
# functions returning awaitables: "If the awaitable is incomplete, the function
|
||||
# clears the logcontext before returning"), the function may have reset the
|
||||
# logcontext before returning, so we need to restore the calling logcontext now
|
||||
# before we return ourselves.
|
||||
#
|
||||
# Our goal is to have the caller logcontext unchanged after firing off the
|
||||
# background task and returning.
|
||||
set_current_context(calling_context)
|
||||
|
||||
# The original logcontext will be restored when the deferred completes, but
|
||||
# there is nothing waiting for it, so it will get leaked into the reactor (which
|
||||
# would then get picked up by the next thing the reactor does). We therefore
|
||||
# need to reset the logcontext here (set the `sentinel` logcontext) before
|
||||
# yielding control back to the reactor.
|
||||
# If the function we called is playing nice and following the Synapse logcontext
|
||||
# rules, it will restore original calling logcontext when the deferred completes;
|
||||
# but there is nothing waiting for it, so it will get leaked into the reactor (which
|
||||
# would then get picked up by the next thing the reactor does). We therefore need to
|
||||
# reset the logcontext here (set the `sentinel` logcontext) before yielding control
|
||||
# back to the reactor.
|
||||
#
|
||||
# (If this feels asymmetric, consider it this way: we are
|
||||
# effectively forking a new thread of execution. We are
|
||||
@ -894,10 +910,9 @@ def run_coroutine_in_background(
|
||||
Useful for wrapping coroutines that you don't yield or await on (for
|
||||
instance because you want to pass it to deferred.gatherResults()).
|
||||
|
||||
This is a special case of `run_in_background` where we can accept a
|
||||
coroutine directly rather than a function. We can do this because coroutines
|
||||
do not run until called, and so calling an async function without awaiting
|
||||
cannot change the log contexts.
|
||||
This is a special case of `run_in_background` where we can accept a coroutine
|
||||
directly rather than a function. We can do this because coroutines do not continue
|
||||
running once they have yielded.
|
||||
|
||||
This is an ergonomic helper so we can do this:
|
||||
```python
|
||||
@ -908,33 +923,7 @@ def run_coroutine_in_background(
|
||||
run_in_background(lambda: func1(arg1))
|
||||
```
|
||||
"""
|
||||
calling_context = current_context()
|
||||
|
||||
# Wrap the coroutine in a deferred, which will have the side effect of executing the
|
||||
# coroutine in the background.
|
||||
d = defer.ensureDeferred(coroutine)
|
||||
|
||||
# The function may have reset the context before returning, so we need to restore it
|
||||
# now.
|
||||
#
|
||||
# Our goal is to have the caller logcontext unchanged after firing off the
|
||||
# background task and returning.
|
||||
set_current_context(calling_context)
|
||||
|
||||
# The original logcontext will be restored when the deferred completes, but
|
||||
# there is nothing waiting for it, so it will get leaked into the reactor (which
|
||||
# would then get picked up by the next thing the reactor does). We therefore
|
||||
# need to reset the logcontext here (set the `sentinel` logcontext) before
|
||||
# yielding control back to the reactor.
|
||||
#
|
||||
# (If this feels asymmetric, consider it this way: we are
|
||||
# effectively forking a new thread of execution. We are
|
||||
# probably currently within a ``with LoggingContext()`` block,
|
||||
# which is supposed to have a single entry and exit point. But
|
||||
# by spawning off another deferred, we are effectively
|
||||
# adding a new exit point.)
|
||||
d.addBoth(_set_context_cb, SENTINEL_CONTEXT)
|
||||
return d
|
||||
return run_in_background(lambda: coroutine)
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
@ -22,7 +22,6 @@
|
||||
import logging
|
||||
from typing import Callable, Generator, cast
|
||||
|
||||
import twisted.python.failure
|
||||
from twisted.internet import defer, reactor as _reactor
|
||||
|
||||
from synapse.logging.context import (
|
||||
@ -33,6 +32,7 @@ from synapse.logging.context import (
|
||||
current_context,
|
||||
make_deferred_yieldable,
|
||||
nested_logging_context,
|
||||
run_coroutine_in_background,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.types import ISynapseReactor
|
||||
@ -249,73 +249,80 @@ class LoggingContextTestCase(unittest.TestCase):
|
||||
# Back to the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
def _test_run_in_background(self, function: Callable[[], object]) -> defer.Deferred:
|
||||
sentinel_context = current_context()
|
||||
async def _test_run_in_background(self, function: Callable[[], object]) -> None:
|
||||
clock = Clock(reactor)
|
||||
|
||||
callback_completed = False
|
||||
# Sanity check that we start in the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
callback_finished = False
|
||||
|
||||
with LoggingContext("foo"):
|
||||
# fire off function, but don't wait on it.
|
||||
d2 = run_in_background(function)
|
||||
# Fire off the function, but don't wait on it.
|
||||
deferred = run_in_background(function)
|
||||
self._check_test_key("foo")
|
||||
|
||||
def cb(res: object) -> object:
|
||||
nonlocal callback_completed
|
||||
callback_completed = True
|
||||
return res
|
||||
def callback(result: object) -> object:
|
||||
nonlocal callback_finished
|
||||
callback_finished = True
|
||||
# Pass through the result
|
||||
return result
|
||||
|
||||
d2.addCallback(cb)
|
||||
# We `addBoth` because when exceptions happen, we still want to mark the
|
||||
# callback as finished so that the test can complete and we see the
|
||||
# underlying error.
|
||||
deferred.addBoth(callback)
|
||||
|
||||
self._check_test_key("foo")
|
||||
|
||||
# now wait for the function under test to have run, and check that
|
||||
# the logcontext is left in a sane state.
|
||||
d2 = defer.Deferred()
|
||||
# Now wait for the function under test to have run, and check that
|
||||
# the logcontext is left in a sane state.
|
||||
while not callback_finished:
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("foo")
|
||||
|
||||
def check_logcontext() -> None:
|
||||
if not callback_completed:
|
||||
reactor.callLater(0.01, check_logcontext)
|
||||
return
|
||||
self.assertTrue(
|
||||
callback_finished,
|
||||
"Callback never finished which means the test probably didn't wait long enough",
|
||||
)
|
||||
|
||||
# make sure that the context was reset before it got thrown back
|
||||
# into the reactor
|
||||
try:
|
||||
self.assertIs(current_context(), sentinel_context)
|
||||
d2.callback(None)
|
||||
except BaseException:
|
||||
d2.errback(twisted.python.failure.Failure())
|
||||
|
||||
reactor.callLater(0.01, check_logcontext)
|
||||
|
||||
# test is done once d2 finishes
|
||||
return d2
|
||||
# Back to the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
@logcontext_clean
|
||||
def test_run_in_background_with_blocking_fn(self) -> defer.Deferred:
|
||||
async def test_run_in_background_with_blocking_fn(self) -> None:
|
||||
async def blocking_function() -> None:
|
||||
await Clock(reactor).sleep(0)
|
||||
|
||||
return self._test_run_in_background(blocking_function)
|
||||
await self._test_run_in_background(blocking_function)
|
||||
|
||||
@logcontext_clean
|
||||
def test_run_in_background_with_non_blocking_fn(self) -> defer.Deferred:
|
||||
async def test_run_in_background_with_non_blocking_fn(self) -> None:
|
||||
@defer.inlineCallbacks
|
||||
def nonblocking_function() -> Generator["defer.Deferred[object]", object, None]:
|
||||
with PreserveLoggingContext():
|
||||
yield defer.succeed(None)
|
||||
|
||||
return self._test_run_in_background(nonblocking_function)
|
||||
await self._test_run_in_background(nonblocking_function)
|
||||
|
||||
@logcontext_clean
|
||||
def test_run_in_background_with_chained_deferred(self) -> defer.Deferred:
|
||||
async def test_run_in_background_with_chained_deferred(self) -> None:
|
||||
# a function which returns a deferred which looks like it has been
|
||||
# called, but is actually paused
|
||||
def testfunc() -> defer.Deferred:
|
||||
return make_deferred_yieldable(_chained_deferred_function())
|
||||
|
||||
return self._test_run_in_background(testfunc)
|
||||
await self._test_run_in_background(testfunc)
|
||||
|
||||
@logcontext_clean
|
||||
def test_run_in_background_with_coroutine(self) -> defer.Deferred:
|
||||
async def test_run_in_background_with_coroutine(self) -> None:
|
||||
"""
|
||||
Test `run_in_background` with a coroutine that yields control back to the
|
||||
reactor.
|
||||
|
||||
This will stress the logic around incomplete deferreds in `run_in_background`.
|
||||
"""
|
||||
|
||||
async def testfunc() -> None:
|
||||
self._check_test_key("foo")
|
||||
d = defer.ensureDeferred(Clock(reactor).sleep(0))
|
||||
@ -323,14 +330,111 @@ class LoggingContextTestCase(unittest.TestCase):
|
||||
await d
|
||||
self._check_test_key("foo")
|
||||
|
||||
return self._test_run_in_background(testfunc)
|
||||
await self._test_run_in_background(testfunc)
|
||||
|
||||
@logcontext_clean
|
||||
def test_run_in_background_with_nonblocking_coroutine(self) -> defer.Deferred:
|
||||
async def test_run_in_background_with_nonblocking_coroutine(self) -> None:
|
||||
"""
|
||||
Test `run_in_background` with a "nonblocking" coroutine (never yields control
|
||||
back to the reactor).
|
||||
|
||||
This will stress the logic around completed deferreds in `run_in_background`.
|
||||
"""
|
||||
|
||||
async def testfunc() -> None:
|
||||
self._check_test_key("foo")
|
||||
|
||||
return self._test_run_in_background(testfunc)
|
||||
await self._test_run_in_background(testfunc)
|
||||
|
||||
@logcontext_clean
|
||||
async def test_run_coroutine_in_background(self) -> None:
|
||||
"""
|
||||
Test `run_coroutine_in_background` with a coroutine that yields control back to the
|
||||
reactor.
|
||||
|
||||
This will stress the logic around incomplete deferreds in `run_coroutine_in_background`.
|
||||
"""
|
||||
clock = Clock(reactor)
|
||||
|
||||
# Sanity check that we start in the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
callback_finished = False
|
||||
|
||||
async def competing_callback() -> None:
|
||||
nonlocal callback_finished
|
||||
try:
|
||||
# The callback should have the same logcontext as the caller
|
||||
self._check_test_key("foo")
|
||||
|
||||
with LoggingContext("competing"):
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("competing")
|
||||
|
||||
self._check_test_key("foo")
|
||||
finally:
|
||||
# When exceptions happen, we still want to mark the callback as finished
|
||||
# so that the test can complete and we see the underlying error.
|
||||
callback_finished = True
|
||||
|
||||
with LoggingContext("foo"):
|
||||
run_coroutine_in_background(competing_callback())
|
||||
self._check_test_key("foo")
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("foo")
|
||||
|
||||
self.assertTrue(
|
||||
callback_finished,
|
||||
"Callback never finished which means the test probably didn't wait long enough",
|
||||
)
|
||||
|
||||
# Back to the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
@logcontext_clean
|
||||
async def test_run_coroutine_in_background_with_nonblocking_coroutine(self) -> None:
|
||||
"""
|
||||
Test `run_coroutine_in_background` with a "nonblocking" coroutine (never yields control
|
||||
back to the reactor).
|
||||
|
||||
This will stress the logic around completed deferreds in `run_coroutine_in_background`.
|
||||
"""
|
||||
# Sanity check that we start in the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
callback_finished = False
|
||||
|
||||
async def competing_callback() -> None:
|
||||
nonlocal callback_finished
|
||||
try:
|
||||
# The callback should have the same logcontext as the caller
|
||||
self._check_test_key("foo")
|
||||
|
||||
with LoggingContext("competing"):
|
||||
# We `await` here but there is nothing to wait for here since the
|
||||
# deferred is already complete so we should immediately continue
|
||||
# executing in the same context.
|
||||
await defer.succeed(None)
|
||||
|
||||
self._check_test_key("competing")
|
||||
|
||||
self._check_test_key("foo")
|
||||
finally:
|
||||
# When exceptions happen, we still want to mark the callback as finished
|
||||
# so that the test can complete and we see the underlying error.
|
||||
callback_finished = True
|
||||
|
||||
with LoggingContext("foo"):
|
||||
run_coroutine_in_background(competing_callback())
|
||||
self._check_test_key("foo")
|
||||
|
||||
self.assertTrue(
|
||||
callback_finished,
|
||||
"Callback never finished which means the test probably didn't wait long enough",
|
||||
)
|
||||
|
||||
# Back to the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
@logcontext_clean
|
||||
@defer.inlineCallbacks
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user