Compare commits

...

4 Commits

Eric Eastwood
b670864af9 Merge branch 'develop' into madlittlemods/18357-rust-http-client-remove-preserve-logging-context 2025-10-02 12:00:57 -05:00
Eric Eastwood
d27ff161f5
Add debug logs wherever we change current logcontext (#18966)
Add debug logs wherever we change current logcontext (`LoggingContext`).
I've had to make this same set of changes over and over as I've been
debugging things, so it seems useful enough to include by default.

Instead of tracing things at the `set_current_context(...)` level, I've
added the debug logging to all of the utilities that utilize
`set_current_context(...)`. It's much easier to reason about the log
context changing because `PreserveLoggingContext` changed it than because
of an opaque `set_current_context(...)` call.
2025-10-02 11:51:17 -05:00
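
As a rough illustration of the new debug logging (a hypothetical sketch: the format strings are in the `synapse/logging/context.py` diff below, but the context name and the five-character instance ID here are made up):

```python
from synapse.logging.context import LoggingContext, PreserveLoggingContext

with LoggingContext(name="request-1", server_name="example.com"):
    # With `synapse.logging.context.debug` enabled, entering emits something like:
    #   PreserveLoggingContext(ab1cd).__enter__ request-1 --> sentinel
    with PreserveLoggingContext():
        ...  # reactor/global work happens off the request's logcontext
    # ...and exiting emits:
    #   PreserveLoggingContext(ab1cd).__exit__ sentinel --> request-1

# A bare set_current_context(...) call logs nothing, which is the point:
# the utility names make each logcontext transition legible.
```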
Eric Eastwood
06a84f4fe0
Revert "Switch to OpenTracing's ContextVarsScopeManager (#18849)" (#19007)
Revert https://github.com/element-hq/synapse/pull/18849

Go back to our custom `LogContextScopeManager` after trying
OpenTracing's `ContextVarsScopeManager`.

Fix https://github.com/element-hq/synapse/issues/19004

### Why revert?

For reference, with the normal reactor, `ContextVarsScopeManager` worked
just as well as our custom `LogContextScopeManager` as far as I can tell
(and even better in some cases). But since Twisted appears to not fully
support `ContextVar`s, it doesn't work as expected in all cases.
Compounding things, `ContextVarsScopeManager` was causing errors with
the experimental `SYNAPSE_ASYNC_IO_REACTOR` option.

Since we're not getting the full benefit that we originally desired, we
might as well revert and figure out alternatives for extending the
logcontext lifetimes to support the use case we were trying to unlock
(c.f. https://github.com/element-hq/synapse/pull/18804).

See
https://github.com/element-hq/synapse/issues/19004#issuecomment-3358052171
for more info.


### Does this require backporting and patch releases?

No. Since `ContextVarsScopeManager` works just as well with the
normal reactor and was only causing actual errors with the experimental
`SYNAPSE_ASYNC_IO_REACTOR` option, I don't think this requires us to
backport and make patch releases at all.



### Maintain cross-links between main trace and background process work

In order to maintain the functionality introduced in https://github.com/element-hq/synapse/pull/18932 (cross-links between the background process trace and the currently active trace), we also needed a small change.

Previously, when we were using `ContextVarsScopeManager`, it tracked the tracing scope across logcontext changes without issue. Now that we're using our own custom `LogContextScopeManager` again, we need to capture the active span from the logcontext before we reset to the sentinel context for the `PreserveLoggingContext()` below, as sketched after this paragraph.
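
A condensed sketch of that pattern (simplified from the `run_as_background_process` diff below; the names are shortened stand-ins, not the exact code):

```python
from synapse.logging.context import LoggingContext
from synapse.logging.opentracing import active_span, start_active_span

# Capture the active span while the caller's logcontext is still current: our
# custom LogContextScopeManager stores the scope on the logcontext itself, so
# the span is no longer reachable once we switch contexts.
original_active_tracing_span = active_span()

with LoggingContext(name="bgproc", server_name="example.com"):
    # Root span for the background work, deliberately disconnected from the
    # caller's trace...
    with start_active_span("bgproc.task", ignore_active_span=True) as root_scope:
        if original_active_tracing_span is not None:
            # ...plus a FOLLOWS_FROM cross-link in the original trace that
            # points at the background process trace.
            with start_active_span(
                "start_bgproc.task",
                child_of=original_active_tracing_span,
                ignore_active_span=True,
                contexts=[root_scope.span.context],
            ):
                pass
        ...  # the background work itself
```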

Added some tests to ensure we maintain the `run_as_background_process` tracing behavior regardless of the tracing scope manager we use.
2025-10-02 11:27:26 -05:00
Eric Eastwood
1c093509ce
Switch task scheduler from raw logcontext manipulation (set_current_context) to utils (PreserveLoggingContext) (#18990)
Prefer the utils over raw logcontext manipulation.

Spun out of adding some logcontext debug logs in
https://github.com/element-hq/synapse/pull/18966: since we're not
logging at the `set_current_context(...)` level (see the reasoning there),
this removes some usage of `set_current_context(...)`.
2025-10-02 10:22:25 -05:00
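
In other words (a minimal sketch, not the scheduler's exact code), the utility is the context-manager form of the manual save/restore dance:

```python
from synapse.logging.context import (
    LoggingContext,
    PreserveLoggingContext,
    set_current_context,
)

task_log_context = LoggingContext(name="task-abc123", server_name="example.com")

# Raw manipulation (what the task scheduler used to do):
calling_context = set_current_context(task_log_context)
try:
    ...  # work under task_log_context
finally:
    set_current_context(calling_context)

# Utility form (what it does now), which also picks up the new debug logs:
with PreserveLoggingContext(task_log_context):
    ...  # work under task_log_context
```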
10 changed files with 554 additions and 168 deletions

changelog.d/18966.misc (new file)

@@ -0,0 +1 @@
Add debug logs wherever we change current logcontext.

changelog.d/18990.misc (new file)

@@ -0,0 +1 @@
Switch task scheduler from raw logcontext manipulation to using the dedicated logcontext utils.

changelog.d/19007.misc (new file)

@@ -0,0 +1 @@
Switch back to our own custom `LogContextScopeManager` instead of OpenTracing's `ContextVarsScopeManager`, which was causing problems when using the experimental `SYNAPSE_ASYNC_IO_REACTOR` option with tracing enabled.

docs/log_contexts.md

@@ -548,3 +548,19 @@ chain are dropped. Dropping the reference to an awaitable you're
supposed to be awaiting is bad practice, so this doesn't
actually happen too much. Unfortunately, when it does happen, it will
lead to leaked logcontexts which are incredibly hard to track down.
## Debugging logcontext issues

Debugging logcontext issues can be tricky because a leaked or lost logcontext often
only surfaces downstream and can point to an unrelated part of the codebase. It's best
to enable debug logging for `synapse.logging.context.debug` (which needs to be
explicitly configured) and work backwards in the logs from the point where the issue
is observed to find the root cause.

`log.config.yaml`
```yaml
loggers:
  # Unlike other loggers, this one needs to be explicitly configured to see debug logs.
  synapse.logging.context.debug:
    level: DEBUG
```
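
With that logger enabled, the instrumented utilities emit lines roughly of this shape (a hypothetical trail pieced together from the format strings in the `synapse/logging/context.py` diff below; the five-character instance IDs are random):

```
LoggingContext(POST-1234).__enter__
run_in_background(xy9zw): called with logcontext=POST-1234
run_in_background(xy9zw): restoring calling logcontext POST-1234
make_deferred_yieldable(ab1cd): called with logcontext=POST-1234
make_deferred_yieldable(ab1cd): resetting logcontext to sentinel
LoggingContext(POST-1234).__exit__ --> sentinel
```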

synapse/logging/context.py

@@ -55,11 +55,29 @@ from typing_extensions import ParamSpec
from twisted.internet import defer, threads
from twisted.python.threadpool import ThreadPool
from synapse.logging.loggers import ExplicitlyConfiguredLogger
from synapse.util.stringutils import random_string
if TYPE_CHECKING:
from synapse.logging.scopecontextmanager import _LogContextScope
from synapse.types import ISynapseReactor
logger = logging.getLogger(__name__)
original_logger_class = logging.getLoggerClass()
logging.setLoggerClass(ExplicitlyConfiguredLogger)
logcontext_debug_logger = logging.getLogger("synapse.logging.context.debug")
"""
A logger for debugging when the logcontext switches.
Because this is very noisy and probably something only developers want to see when
debugging logcontext problems, we want people to explicitly opt in before seeing anything
in the logs. Requires explicitly setting `synapse.logging.context.debug` in the logging
configuration and does not inherit the log level from the parent logger.
"""
# Restore the original logger class
logging.setLoggerClass(original_logger_class)
try:
import resource
@@ -238,7 +256,14 @@ class _Sentinel:
we should always know which server the logs are coming from.
"""
__slots__ = ["previous_context", "finished", "server_name", "request", "tag"]
__slots__ = [
"previous_context",
"finished",
"scope",
"server_name",
"request",
"tag",
]
def __init__(self) -> None:
# Minimal set for compatibility with LoggingContext
@@ -246,6 +271,7 @@
self.finished = False
self.server_name = "unknown_server_from_sentinel_context"
self.request = None
self.scope = None
self.tag = None
def __str__(self) -> str:
@@ -303,6 +329,7 @@ class LoggingContext:
"finished",
"request",
"tag",
"scope",
]
def __init__(
@@ -327,6 +354,7 @@
self.main_thread = get_thread_id()
self.request = None
self.tag = ""
self.scope: Optional["_LogContextScope"] = None
# keep track of whether we have hit the __exit__ block for this context
# (suggesting that the thing that created the context thinks it should
@@ -340,6 +368,9 @@
# which request this corresponds to
self.request = self.parent_context.request
# we also track the current scope:
self.scope = self.parent_context.scope
if request is not None:
# the request param overrides the request from the parent context
self.request = request
@@ -390,6 +421,7 @@
def __enter__(self) -> "LoggingContext":
"""Enters this logging context into thread local storage"""
logcontext_debug_logger.debug("LoggingContext(%s).__enter__", self.name)
old_context = set_current_context(self)
if self.previous_context != old_context:
logcontext_error(
@@ -412,6 +444,9 @@
Returns:
None to avoid suppressing any exceptions that were thrown.
"""
logcontext_debug_logger.debug(
"LoggingContext(%s).__exit__ --> %s", self.name, self.previous_context
)
current = set_current_context(self.previous_context)
if current is not self:
if current is SENTINEL_CONTEXT:
@@ -660,14 +695,21 @@ class PreserveLoggingContext:
reactor back to the code).
"""
__slots__ = ["_old_context", "_new_context"]
__slots__ = ["_old_context", "_new_context", "_instance_id"]
def __init__(
self, new_context: LoggingContextOrSentinel = SENTINEL_CONTEXT
) -> None:
self._new_context = new_context
self._instance_id = random_string(5)
def __enter__(self) -> None:
logcontext_debug_logger.debug(
"PreserveLoggingContext(%s).__enter__ %s --> %s",
self._instance_id,
current_context(),
self._new_context,
)
self._old_context = set_current_context(self._new_context)
def __exit__(
@@ -676,6 +718,12 @@
value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
logcontext_debug_logger.debug(
"PreserveLoggingContext(%s).__exit %s --> %s",
self._instance_id,
current_context(),
self._old_context,
)
context = set_current_context(self._old_context)
if context != self._new_context:
@@ -855,7 +903,11 @@ def run_in_background(
Note that the returned Deferred does not follow the synapse logcontext
rules.
"""
instance_id = random_string(5)
calling_context = current_context()
logcontext_debug_logger.debug(
"run_in_background(%s): called with logcontext=%s", instance_id, calling_context
)
try:
# (kick off the task in the current context)
res = f(*args, **kwargs)
@@ -897,6 +949,11 @@ def run_in_background(
# to reset the logcontext to the sentinel logcontext as that would run
# immediately (remember our goal is to maintain the calling logcontext when we
# return).
logcontext_debug_logger.debug(
"run_in_background(%s): deferred already completed and the function should have maintained the logcontext %s",
instance_id,
calling_context,
)
return d
# Since the function we called may follow the Synapse logcontext rules (Rules for
@@ -907,6 +964,11 @@
#
# Our goal is to have the caller logcontext unchanged after firing off the
# background task and returning.
logcontext_debug_logger.debug(
"run_in_background(%s): restoring calling logcontext %s",
instance_id,
calling_context,
)
set_current_context(calling_context)
# If the function we called is playing nice and following the Synapse logcontext
@@ -922,7 +984,23 @@
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, SENTINEL_CONTEXT)
if logcontext_debug_logger.isEnabledFor(logging.DEBUG):
def _log_set_context_cb(
result: ResultT, context: LoggingContextOrSentinel
) -> ResultT:
logcontext_debug_logger.debug(
"run_in_background(%s): resetting logcontext to %s",
instance_id,
context,
)
set_current_context(context)
return result
d.addBoth(_log_set_context_cb, SENTINEL_CONTEXT)
else:
d.addBoth(_set_context_cb, SENTINEL_CONTEXT)
return d
@@ -978,10 +1056,21 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]
restores the old context once the awaitable completes (execution passes from the
reactor back to the code).
"""
instance_id = random_string(5)
logcontext_debug_logger.debug(
"make_deferred_yieldable(%s): called with logcontext=%s",
instance_id,
current_context(),
)
# The deferred has already completed
if deferred.called and not deferred.paused:
# it looks like this deferred is ready to run any callbacks we give it
# immediately. We may as well optimise out the logcontext faffery.
logcontext_debug_logger.debug(
"make_deferred_yieldable(%s): deferred already completed and the function should have maintained the logcontext",
instance_id,
)
return deferred
# Our goal is to have the caller logcontext unchanged after they yield/await the
@@ -993,8 +1082,31 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]
# does) while the deferred runs in the reactor event loop, we reset the logcontext
# and add a callback to the deferred to restore it so the caller's logcontext is
# active when the deferred completes.
prev_context = set_current_context(SENTINEL_CONTEXT)
deferred.addBoth(_set_context_cb, prev_context)
logcontext_debug_logger.debug(
"make_deferred_yieldable(%s): resetting logcontext to %s",
instance_id,
SENTINEL_CONTEXT,
)
calling_context = set_current_context(SENTINEL_CONTEXT)
if logcontext_debug_logger.isEnabledFor(logging.DEBUG):
def _log_set_context_cb(
result: ResultT, context: LoggingContextOrSentinel
) -> ResultT:
logcontext_debug_logger.debug(
"make_deferred_yieldable(%s): restoring calling logcontext to %s",
instance_id,
context,
)
set_current_context(context)
return result
deferred.addBoth(_log_set_context_cb, calling_context)
else:
deferred.addBoth(_set_context_cb, calling_context)
return deferred

synapse/logging/opentracing.py

@@ -251,17 +251,18 @@ class _DummyTagNames:
try:
import opentracing
import opentracing.tags
from opentracing.scope_managers.contextvars import ContextVarsScopeManager
tags = opentracing.tags
except ImportError:
opentracing = None # type: ignore[assignment]
tags = _DummyTagNames # type: ignore[assignment]
ContextVarsScopeManager = None # type: ignore
try:
from jaeger_client import Config as JaegerConfig
from synapse.logging.scopecontextmanager import LogContextScopeManager
except ImportError:
JaegerConfig = None # type: ignore
LogContextScopeManager = None # type: ignore
try:
@@ -483,7 +484,7 @@ def init_tracer(hs: "HomeServer") -> None:
config = JaegerConfig(
config=jaeger_config,
service_name=f"{hs.config.server.server_name} {instance_name_by_type}",
scope_manager=ContextVarsScopeManager(),
scope_manager=LogContextScopeManager(),
metrics_factory=PrometheusMetricsFactory(),
)
@@ -683,9 +684,21 @@ def start_active_span_from_edu(
# Opentracing setters for tags, logs, etc
@only_if_tracing
def active_span() -> Optional["opentracing.Span"]:
"""Get the currently active span, if any"""
return opentracing.tracer.active_span
def active_span(
*,
tracer: Optional["opentracing.Tracer"] = None,
) -> Optional["opentracing.Span"]:
"""
Get the currently active span, if any
Args:
tracer: override the opentracing tracer. By default the global tracer is used.
"""
if tracer is None:
# use the global tracer by default
tracer = opentracing.tracer
return tracer.active_span
@ensure_active_span("set a tag")
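
A quick usage sketch for the new keyword argument (`my_test_tracer` is a hypothetical stand-in):

```python
# Default: consult the global opentracing tracer.
span = active_span()

# Tests can pass their own tracer instead of mutating global state; this is
# how `test_only_tracer` gets threaded through `run_as_background_process`.
span = active_span(tracer=my_test_tracer)
```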

synapse/logging/scopecontextmanager.py (new file)

@@ -0,0 +1,161 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging
from typing import Optional
from opentracing import Scope, ScopeManager, Span
from synapse.logging.context import (
LoggingContext,
current_context,
nested_logging_context,
)
logger = logging.getLogger(__name__)
class LogContextScopeManager(ScopeManager):
"""
The LogContextScopeManager tracks the active scope in opentracing
by using the log contexts which are native to synapse. This is so
that the basic opentracing api can be used across Twisted Deferreds.
It would be nice just to use opentracing's ContextVarsScopeManager,
but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301.
"""
def __init__(self) -> None:
pass
@property
def active(self) -> Optional[Scope]:
"""
Returns the currently active Scope which can be used to access the
currently active Scope.span.
If there is a non-null Scope, its wrapped Span
becomes an implicit parent of any newly-created Span at
Tracer.start_active_span() time.
Returns:
The Scope that is active, or None if not available.
"""
ctx = current_context()
return ctx.scope
def activate(self, span: Span, finish_on_close: bool) -> Scope:
"""
Makes a Span active.
Args:
span: the span that should become active.
finish_on_close: whether Span should be automatically finished when
Scope.close() is called.
Returns:
Scope to control the end of the active period for
*span*. It is a programming error to neglect to call
Scope.close() on the returned instance.
"""
ctx = current_context()
if not ctx:
logger.error("Tried to activate scope outside of loggingcontext")
return Scope(None, span) # type: ignore[arg-type]
if ctx.scope is not None:
# start a new logging context as a child of the existing one.
# Doing so -- rather than updating the existing logcontext -- means that
# creating several concurrent spans under the same logcontext works
# correctly.
ctx = nested_logging_context("")
enter_logcontext = True
else:
# if there is no span currently associated with the current logcontext, we
# just store the scope in it.
#
# This feels a bit dubious, but it does hack around a problem where a
# span outlasts its parent logcontext (which would otherwise lead to
# "Re-starting finished log context" errors).
enter_logcontext = False
scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
ctx.scope = scope
if enter_logcontext:
ctx.__enter__()
return scope
class _LogContextScope(Scope):
"""
A custom opentracing scope, associated with a LogContext
* When the scope is closed, the logcontext's active scope is reset to None,
and (if `enter_logcontext` was set) the logcontext is exited too.
"""
def __init__(
self,
manager: LogContextScopeManager,
span: Span,
logcontext: LoggingContext,
enter_logcontext: bool,
finish_on_close: bool,
):
"""
Args:
manager:
the manager that is responsible for this scope.
span:
the opentracing span which this scope represents the local
lifetime for.
logcontext:
the log context to which this scope is attached.
enter_logcontext:
if True, the logcontext will be exited when the scope is closed
finish_on_close:
if True, the span will be finished when the scope is closed
"""
super().__init__(manager, span)
self.logcontext = logcontext
self._finish_on_close = finish_on_close
self._enter_logcontext = enter_logcontext
def __str__(self) -> str:
return f"Scope<{self.span}>"
def close(self) -> None:
active_scope = self.manager.active
if active_scope is not self:
logger.error(
"Closing scope %s which is not the currently-active one %s",
self,
active_scope,
)
if self._finish_on_close:
self.span.finish()
self.logcontext.scope = None
if self._enter_logcontext:
self.logcontext.__exit__(None, None, None)
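
A minimal usage sketch (assuming a Jaeger tracer wired up with `LogContextScopeManager`, as `init_tracer` above and the tests below do): activating a span stashes its scope on the current `LoggingContext`, so whatever preserves the logcontext also preserves the active span.

```python
import jaeger_client

from synapse.logging.context import LoggingContext
from synapse.logging.scopecontextmanager import LogContextScopeManager

config = jaeger_client.config.Config(
    config={}, service_name="example", scope_manager=LogContextScopeManager()
)
tracer = config.initialize_tracer()

with LoggingContext(name="request-1", server_name="example.com"):
    # The scope is stored on the logcontext (ctx.scope), not in a ContextVar.
    scope = tracer.start_active_span("handle_request")
    assert tracer.active_span is scope.span
    scope.close()  # resets ctx.scope and finishes the span
```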

synapse/metrics/background_process_metrics.py

@@ -68,6 +68,11 @@ if TYPE_CHECKING:
from synapse.server import HomeServer
try:
import opentracing
except ImportError:
opentracing = None # type: ignore[assignment]
logger = logging.getLogger(__name__)
@@ -225,6 +230,7 @@ def run_as_background_process(
func: Callable[..., Awaitable[Optional[R]]],
*args: Any,
bg_start_span: bool = True,
test_only_tracer: Optional["opentracing.Tracer"] = None,
**kwargs: Any,
) -> "defer.Deferred[Optional[R]]":
"""Run the given function in its own logcontext, with resource metrics
@@ -250,6 +256,8 @@
bg_start_span: Whether to start an opentracing span. Defaults to True.
Should only be disabled for processes that will not log to or tag
a span.
test_only_tracer: Set the OpenTracing tracer to use. This is only useful for
tests.
args: positional args for func
kwargs: keyword args for func
@@ -259,6 +267,12 @@
rules.
"""
# Since we track the tracing scope in the `LoggingContext`, before we move to the
# sentinel logcontext (or a new `LoggingContext`), grab the currently active
# tracing span (if any) so that we can create a cross-link to the background process
# trace.
original_active_tracing_span = active_span(tracer=test_only_tracer)
async def run() -> Optional[R]:
with _bg_metrics_lock:
count = _background_process_counts.get(desc, 0)
@@ -276,8 +290,6 @@
) as logging_context:
try:
if bg_start_span:
original_active_tracing_span = active_span()
# If there is already an active span (e.g. because this background
# process was started as part of handling a request for example),
# because this is a long-running background task that may serve a
@@ -308,6 +320,7 @@
# Create a root span for the background process (disconnected
# from other spans)
ignore_active_span=True,
tracer=test_only_tracer,
)
# Also add a span in the original request trace that cross-links
@@ -324,8 +337,11 @@
f"start_bgproc.{desc}",
child_of=original_active_tracing_span,
ignore_active_span=True,
# Points to the background process span.
# Create the `FOLLOWS_FROM` reference to the background
# process span so there is a loose coupling between the two
# traces and it's easy to jump between.
contexts=[root_tracing_scope.span.context],
tracer=test_only_tracer,
):
pass
@@ -341,6 +357,7 @@
# span so there is a loose coupling between the two
# traces and it's easy to jump between.
contexts=[original_active_tracing_span.context],
tracer=test_only_tracer,
)
# For easy usage down below, we create a context manager that
@@ -359,6 +376,7 @@
tracing_scope = start_active_span(
f"bgproc.{desc}",
tags={SynapseTags.REQUEST_ID: str(logging_context)},
tracer=test_only_tracer,
)
else:
tracing_scope = nullcontext()

synapse/util/task_scheduler.py

@@ -27,8 +27,8 @@ from twisted.python.failure import Failure
from synapse.logging.context import (
ContextResourceUsage,
LoggingContext,
PreserveLoggingContext,
nested_logging_context,
set_current_context,
)
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.metrics.background_process_metrics import (
@@ -422,14 +422,11 @@ class TaskScheduler:
"""
current_time = self._clock.time()
calling_context = set_current_context(task_log_context)
try:
with PreserveLoggingContext(task_log_context):
usage = task_log_context.get_resource_usage()
TaskScheduler._log_task_usage(
"continuing", task, usage, current_time - start_time
)
finally:
set_current_context(calling_context)
async def wrapper() -> None:
with nested_logging_context(task.id) as log_context:

tests/logging/test_opentracing.py

@@ -19,7 +19,7 @@
#
#
from typing import Awaitable, Dict, cast
from typing import Awaitable, Optional, cast
from twisted.internet import defer
from twisted.internet.testing import MemoryReactorClock
@@ -35,20 +35,25 @@ from synapse.logging.opentracing import (
tag_args,
trace_with_opname,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.clock import Clock
try:
import opentracing
from opentracing.scope_managers.contextvars import ContextVarsScopeManager
except ImportError:
opentracing = None # type: ignore
ContextVarsScopeManager = None # type: ignore
from tests.server import get_clock
try:
import jaeger_client
except ImportError:
jaeger_client = None # type: ignore
try:
import opentracing
from synapse.logging.scopecontextmanager import LogContextScopeManager
except ImportError:
opentracing = None # type: ignore
LogContextScopeManager = None # type: ignore
import logging
from tests.unittest import TestCase
@@ -56,7 +61,7 @@ from tests.unittest import TestCase
logger = logging.getLogger(__name__)
class TracingScopeTestCase(TestCase):
class LogContextScopeManagerTestCase(TestCase):
"""
Test that our tracing machinery works well in a variety of situations (especially
with Twisted's runtime and deferreds).
@@ -67,7 +72,7 @@ class TracingScopeTestCase(TestCase):
opentracing backend is Jaeger.
"""
if opentracing is None:
if opentracing is None or LogContextScopeManager is None:
skip = "Requires opentracing" # type: ignore[unreachable]
if jaeger_client is None:
skip = "Requires jaeger_client" # type: ignore[unreachable]
@@ -77,9 +82,8 @@
# global variables that power opentracing. We create our own tracer instance
# and test with it.
scope_manager = ContextVarsScopeManager()
config = jaeger_client.config.Config(
config={}, service_name="test", scope_manager=scope_manager
config={}, service_name="test", scope_manager=LogContextScopeManager()
)
self._reporter = jaeger_client.reporter.InMemoryReporter()
@@ -220,144 +224,6 @@ class TracingScopeTestCase(TestCase):
[scopes[1].span, scopes[2].span, scopes[0].span],
)
def test_run_in_background_active_scope_still_available(self) -> None:
"""
Test that tasks running via `run_in_background` still have access to the
active tracing scope.
This is a regression test for a previous Synapse issue where the tracing scope
would `__exit__` and close before the `run_in_background` task completed, and
our previous custom `_LogContextScope.close(...)` would clear
`LoggingContext.scope`, preventing further tracing spans from having the
correct parent.
"""
reactor = MemoryReactorClock()
# type-ignore: mypy-zope doesn't seem to recognise that `MemoryReactorClock`
# implements `ISynapseThreadlessReactor` (combination of the normal Twisted
# Reactor/Clock interfaces), via inheritance from
# `twisted.internet.testing.MemoryReactor` and `twisted.internet.testing.Clock`
# Ignore `multiple-internal-clocks` linter error here since we are creating a `Clock`
# for testing purposes.
clock = Clock( # type: ignore[multiple-internal-clocks]
reactor, # type: ignore[arg-type]
server_name="test_server",
)
scope_map: Dict[str, opentracing.Scope] = {}
async def async_task() -> None:
root_scope = scope_map["root"]
root_context = cast(jaeger_client.SpanContext, root_scope.span.context)
self.assertEqual(
self._tracer.active_span,
root_scope.span,
"expected to inherit the root tracing scope from where this was run",
)
# Return control back to the reactor thread and wait an arbitrary amount
await clock.sleep(4)
# This is a key part of what we're testing! In a previous version of
# Synapse, we would lose the active span at this point.
self.assertEqual(
self._tracer.active_span,
root_scope.span,
"expected to still have a root tracing scope/span active",
)
# For completeness' sake, let's also trace more sub-tasks here and assert
# they have the correct span parents as well (root)
# Start tracing some other sub-task.
#
# This is a key part of what we're testing! In a previous version of
# Synapse, it would have the incorrect span parents.
scope = start_active_span(
"task1",
tracer=self._tracer,
)
scope_map["task1"] = scope
# Ensure the span parent is pointing to the root scope
context = cast(jaeger_client.SpanContext, scope.span.context)
self.assertEqual(
context.parent_id,
root_context.span_id,
"expected task1 parent to be the root span",
)
# Ensure that the active span is our new sub-task now
self.assertEqual(self._tracer.active_span, scope.span)
# Return control back to the reactor thread and wait an arbitrary amount
await clock.sleep(4)
# We should still see the active span as the scope wasn't closed yet
self.assertEqual(self._tracer.active_span, scope.span)
scope.close()
async def root() -> None:
with start_active_span(
"root span",
tracer=self._tracer,
# We will close this off later. We're basically just mimicking the same
# pattern for how we handle requests. We pass the span off to the
# request for it to finish.
finish_on_close=False,
) as root_scope:
scope_map["root"] = root_scope
self.assertEqual(self._tracer.active_span, root_scope.span)
# Fire-and-forget a task
#
# XXX: The root scope context manager will `__exit__` before this task
# completes.
run_in_background(async_task)
# Because we used `run_in_background`, the active span should still be
# the root.
self.assertEqual(self._tracer.active_span, root_scope.span)
# We shouldn't see any active spans outside of the scope
self.assertIsNone(self._tracer.active_span)
with LoggingContext(name="root context", server_name="test_server"):
# Start the test off
d_root = defer.ensureDeferred(root())
# Let the tasks complete
reactor.pump((2,) * 8)
self.successResultOf(d_root)
# After we see all of the tasks are done (like a request when it
# `_finished_processing`), let's finish our root span
scope_map["root"].span.finish()
# Sanity check again: we shouldn't see any active spans left over in
# this context.
self.assertIsNone(self._tracer.active_span)
# The spans should be reported in order of their finishing: task1, then
# root.
#
# We use `assertIncludes` just as an easier way to see if items are missing or
# added. We assert the order just below
self.assertIncludes(
set(self._reporter.get_spans()),
{
scope_map["task1"].span,
scope_map["root"].span,
},
exact=True,
)
# This is where we actually assert the correct order
self.assertEqual(
self._reporter.get_spans(),
[
scope_map["task1"].span,
scope_map["root"].span,
],
)
def test_trace_decorator_sync(self) -> None:
"""
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
@@ -455,3 +321,203 @@ class TracingScopeTestCase(TestCase):
[span.operation_name for span in self._reporter.get_spans()],
["fixture_awaitable_return_func"],
)
async def test_run_as_background_process_standalone(self) -> None:
"""
Test to make sure that the background process work starts its own trace.
"""
reactor, clock = get_clock()
callback_finished = False
active_span_in_callback: Optional[jaeger_client.Span] = None
async def bg_task() -> None:
nonlocal callback_finished, active_span_in_callback
try:
assert isinstance(self._tracer.active_span, jaeger_client.Span)
active_span_in_callback = self._tracer.active_span
finally:
# When exceptions happen, we still want to mark the callback as finished
# so that the test can complete and we see the underlying error.
callback_finished = True
# type-ignore: We ignore because the point is to test the bare function
run_as_background_process( # type: ignore[untracked-background-process]
desc="some-bg-task",
server_name="test_server",
func=bg_task,
test_only_tracer=self._tracer,
)
# Now wait for the background process to finish
while not callback_finished:
await clock.sleep(0)
self.assertTrue(
callback_finished,
"Callback never finished which means the test probably didn't wait long enough",
)
self.assertEqual(
active_span_in_callback.operation_name if active_span_in_callback else None,
"bgproc.some-bg-task",
"expected a new span to be started for the background task",
)
# The spans should be reported in order of their finishing.
#
# We use `assertIncludes` just as an easier way to see if items are missing or
# added. We assert the order just below
actual_spans = [span.operation_name for span in self._reporter.get_spans()]
expected_spans = ["bgproc.some-bg-task"]
self.assertIncludes(
set(actual_spans),
set(expected_spans),
exact=True,
)
# This is where we actually assert the correct order
self.assertEqual(
actual_spans,
expected_spans,
)
async def test_run_as_background_process_cross_link(self) -> None:
"""
Test to make sure that the background process work has its own trace and is
disconnected from any currently active trace (like a request). But we still have
cross-links between the two traces if there was already an active trace/span when
we kicked off the background process.
"""
reactor, clock = get_clock()
callback_finished = False
active_span_in_callback: Optional[jaeger_client.Span] = None
async def bg_task() -> None:
nonlocal callback_finished, active_span_in_callback
try:
assert isinstance(self._tracer.active_span, jaeger_client.Span)
active_span_in_callback = self._tracer.active_span
finally:
# When exceptions happen, we still want to mark the callback as finished
# so that the test can complete and we see the underlying error.
callback_finished = True
with LoggingContext(name="some-request", server_name="test_server"):
with start_active_span(
"some-request",
tracer=self._tracer,
):
# type-ignore: We ignore because the point is to test the bare function
run_as_background_process( # type: ignore[untracked-background-process]
desc="some-bg-task",
server_name="test_server",
func=bg_task,
test_only_tracer=self._tracer,
)
# Now wait for the background process to finish
while not callback_finished:
await clock.sleep(0)
self.assertTrue(
callback_finished,
"Callback never finished which means the test probably didn't wait long enough",
)
# We start `bgproc.some-bg-task` and `bgproc_child.some-bg-task` (see the
# `run_as_background_process` implementation for why). Either is fine, but for
# now we expect the child as it's the innermost one that was started.
self.assertEqual(
active_span_in_callback.operation_name if active_span_in_callback else None,
"bgproc_child.some-bg-task",
"expected a new span to be started for the background task",
)
# The spans should be reported in order of their finishing.
#
# We use `assertIncludes` just as an easier way to see if items are missing or
# added. We assert the order just below
actual_spans = [span.operation_name for span in self._reporter.get_spans()]
expected_spans = [
"start_bgproc.some-bg-task",
"bgproc_child.some-bg-task",
"bgproc.some-bg-task",
"some-request",
]
self.assertIncludes(
set(actual_spans),
set(expected_spans),
exact=True,
)
# This is where we actually assert the correct order
self.assertEqual(
actual_spans,
expected_spans,
)
span_map = {span.operation_name: span for span in self._reporter.get_spans()}
span_id_to_friendly_name = {
span.span_id: span.operation_name for span in self._reporter.get_spans()
}
def get_span_friendly_name(span_id: Optional[int]) -> str:
if span_id is None:
return "None"
return span_id_to_friendly_name.get(span_id, f"unknown span {span_id}")
# Ensure the background process trace/span is disconnected from the request
# trace/span.
self.assertNotEqual(
get_span_friendly_name(span_map["bgproc.some-bg-task"].parent_id),
get_span_friendly_name(span_map["some-request"].span_id),
)
# We should see a cross-link in the request trace pointing to the background
# process trace.
#
# Make sure `start_bgproc.some-bg-task` is part of the request trace
self.assertEqual(
get_span_friendly_name(span_map["start_bgproc.some-bg-task"].parent_id),
get_span_friendly_name(span_map["some-request"].span_id),
)
# And has some references to the background process trace
self.assertIncludes(
{
f"{reference.type}:{get_span_friendly_name(reference.referenced_context.span_id)}"
if isinstance(reference.referenced_context, jaeger_client.SpanContext)
else f"{reference.type}:None"
for reference in (
span_map["start_bgproc.some-bg-task"].references or []
)
},
{
f"follows_from:{get_span_friendly_name(span_map['bgproc.some-bg-task'].span_id)}"
},
exact=True,
)
# We should see a cross-link in the background process trace pointing to the
# request trace that kicked off the work.
#
# Make sure `bgproc_child.some-bg-task` is part of the background process trace
self.assertEqual(
get_span_friendly_name(span_map["bgproc_child.some-bg-task"].parent_id),
get_span_friendly_name(span_map["bgproc.some-bg-task"].span_id),
)
# And has a `follows_from` reference back to the request trace
self.assertIncludes(
{
f"{reference.type}:{get_span_friendly_name(reference.referenced_context.span_id)}"
if isinstance(reference.referenced_context, jaeger_client.SpanContext)
else f"{reference.type}:None"
for reference in (
span_map["bgproc_child.some-bg-task"].references or []
)
},
{
f"follows_from:{get_span_friendly_name(span_map['some-request'].span_id)}"
},
exact=True,
)