mirror of
				https://github.com/element-hq/synapse.git
				synced 2025-10-29 00:02:41 -04:00 
			
		
		
		
	Revert https://github.com/element-hq/synapse/pull/18849 Go back to our custom `LogContextScopeManager` after trying OpenTracing's `ContextVarsScopeManager`. Fix https://github.com/element-hq/synapse/issues/19004 ### Why revert? For reference, with the normal reactor, `ContextVarsScopeManager` worked just as good as our custom `LogContextScopeManager` as far as I can tell (and even better in some cases). But since Twisted appears to not fully support `ContextVar`'s, it doesn't work as expected in all cases. Compounding things, `ContextVarsScopeManager` was causing errors with the experimental `SYNAPSE_ASYNC_IO_REACTOR` option. Since we're not getting the full benefit that we originally desired, we might as well revert and figure out alternatives for extending the logcontext lifetimes to support the use case we were trying to unlock (c.f. https://github.com/element-hq/synapse/pull/18804). See https://github.com/element-hq/synapse/issues/19004#issuecomment-3358052171 for more info. ### Does this require backporting and patch releases? No. Since `ContextVarsScopeManager` operates just as good with the normal reactor and was only causing actual errors with the experimental `SYNAPSE_ASYNC_IO_REACTOR` option, I don't think this requires us to backport and make patch releases at all. ### Maintain cross-links between main trace and background process work In order to maintain the functionality introduced in https://github.com/element-hq/synapse/pull/18932 (cross-links between the background process trace and currently active trace), we also needed a small change. Previously, when we were using `ContextVarsScopeManager`, it tracked the tracing scope across the logcontext changes without issue. Now that we're using our own custom `LogContextScopeManager` again, we need to capture the active span from the logcontext before we reset to the sentinel context because of the `PreserveLoggingContext()` below. Added some tests to ensure we maintain the `run_as_background` tracing behavior regardless of the tracing scope manager we use.
This commit is contained in:
		
							parent
							
								
									1c093509ce
								
							
						
					
					
						commit
						06a84f4fe0
					
				
							
								
								
									
										1
									
								
								changelog.d/19007.misc
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								changelog.d/19007.misc
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1 @@ | ||||
| Switch back to our own custom `LogContextScopeManager` instead of OpenTracing's `ContextVarsScopeManager` which was causing problems when using the experimental `SYNAPSE_ASYNC_IO_REACTOR` option with tracing enabled. | ||||
| @ -56,6 +56,7 @@ from twisted.internet import defer, threads | ||||
| from twisted.python.threadpool import ThreadPool | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from synapse.logging.scopecontextmanager import _LogContextScope | ||||
|     from synapse.types import ISynapseReactor | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| @ -238,7 +239,14 @@ class _Sentinel: | ||||
|     we should always know which server the logs are coming from. | ||||
|     """ | ||||
| 
 | ||||
|     __slots__ = ["previous_context", "finished", "server_name", "request", "tag"] | ||||
|     __slots__ = [ | ||||
|         "previous_context", | ||||
|         "finished", | ||||
|         "scope", | ||||
|         "server_name", | ||||
|         "request", | ||||
|         "tag", | ||||
|     ] | ||||
| 
 | ||||
|     def __init__(self) -> None: | ||||
|         # Minimal set for compatibility with LoggingContext | ||||
| @ -246,6 +254,7 @@ class _Sentinel: | ||||
|         self.finished = False | ||||
|         self.server_name = "unknown_server_from_sentinel_context" | ||||
|         self.request = None | ||||
|         self.scope = None | ||||
|         self.tag = None | ||||
| 
 | ||||
|     def __str__(self) -> str: | ||||
| @ -303,6 +312,7 @@ class LoggingContext: | ||||
|         "finished", | ||||
|         "request", | ||||
|         "tag", | ||||
|         "scope", | ||||
|     ] | ||||
| 
 | ||||
|     def __init__( | ||||
| @ -327,6 +337,7 @@ class LoggingContext: | ||||
|         self.main_thread = get_thread_id() | ||||
|         self.request = None | ||||
|         self.tag = "" | ||||
|         self.scope: Optional["_LogContextScope"] = None | ||||
| 
 | ||||
|         # keep track of whether we have hit the __exit__ block for this context | ||||
|         # (suggesting that the the thing that created the context thinks it should | ||||
| @ -340,6 +351,9 @@ class LoggingContext: | ||||
|             # which request this corresponds to | ||||
|             self.request = self.parent_context.request | ||||
| 
 | ||||
|             # we also track the current scope: | ||||
|             self.scope = self.parent_context.scope | ||||
| 
 | ||||
|         if request is not None: | ||||
|             # the request param overrides the request from the parent context | ||||
|             self.request = request | ||||
|  | ||||
| @ -251,17 +251,18 @@ class _DummyTagNames: | ||||
| try: | ||||
|     import opentracing | ||||
|     import opentracing.tags | ||||
|     from opentracing.scope_managers.contextvars import ContextVarsScopeManager | ||||
| 
 | ||||
|     tags = opentracing.tags | ||||
| except ImportError: | ||||
|     opentracing = None  # type: ignore[assignment] | ||||
|     tags = _DummyTagNames  # type: ignore[assignment] | ||||
|     ContextVarsScopeManager = None  # type: ignore | ||||
| try: | ||||
|     from jaeger_client import Config as JaegerConfig | ||||
| 
 | ||||
|     from synapse.logging.scopecontextmanager import LogContextScopeManager | ||||
| except ImportError: | ||||
|     JaegerConfig = None  # type: ignore | ||||
|     LogContextScopeManager = None  # type: ignore | ||||
| 
 | ||||
| 
 | ||||
| try: | ||||
| @ -483,7 +484,7 @@ def init_tracer(hs: "HomeServer") -> None: | ||||
|     config = JaegerConfig( | ||||
|         config=jaeger_config, | ||||
|         service_name=f"{hs.config.server.server_name} {instance_name_by_type}", | ||||
|         scope_manager=ContextVarsScopeManager(), | ||||
|         scope_manager=LogContextScopeManager(), | ||||
|         metrics_factory=PrometheusMetricsFactory(), | ||||
|     ) | ||||
| 
 | ||||
| @ -683,9 +684,21 @@ def start_active_span_from_edu( | ||||
| 
 | ||||
| # Opentracing setters for tags, logs, etc | ||||
| @only_if_tracing | ||||
| def active_span() -> Optional["opentracing.Span"]: | ||||
|     """Get the currently active span, if any""" | ||||
|     return opentracing.tracer.active_span | ||||
| def active_span( | ||||
|     *, | ||||
|     tracer: Optional["opentracing.Tracer"] = None, | ||||
| ) -> Optional["opentracing.Span"]: | ||||
|     """ | ||||
|     Get the currently active span, if any | ||||
| 
 | ||||
|     Args: | ||||
|         tracer: override the opentracing tracer. By default the global tracer is used. | ||||
|     """ | ||||
|     if tracer is None: | ||||
|         # use the global tracer by default | ||||
|         tracer = opentracing.tracer | ||||
| 
 | ||||
|     return tracer.active_span | ||||
| 
 | ||||
| 
 | ||||
| @ensure_active_span("set a tag") | ||||
|  | ||||
							
								
								
									
										161
									
								
								synapse/logging/scopecontextmanager.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										161
									
								
								synapse/logging/scopecontextmanager.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,161 @@ | ||||
| # | ||||
| # This file is licensed under the Affero General Public License (AGPL) version 3. | ||||
| # | ||||
| # Copyright 2019 The Matrix.org Foundation C.I.C. | ||||
| # Copyright (C) 2023 New Vector, Ltd | ||||
| # | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as | ||||
| # published by the Free Software Foundation, either version 3 of the | ||||
| # License, or (at your option) any later version. | ||||
| # | ||||
| # See the GNU Affero General Public License for more details: | ||||
| # <https://www.gnu.org/licenses/agpl-3.0.html>. | ||||
| # | ||||
| # Originally licensed under the Apache License, Version 2.0: | ||||
| # <http://www.apache.org/licenses/LICENSE-2.0>. | ||||
| # | ||||
| # [This file includes modifications made by New Vector Limited] | ||||
| # | ||||
| # | ||||
| 
 | ||||
| import logging | ||||
| from typing import Optional | ||||
| 
 | ||||
| from opentracing import Scope, ScopeManager, Span | ||||
| 
 | ||||
| from synapse.logging.context import ( | ||||
|     LoggingContext, | ||||
|     current_context, | ||||
|     nested_logging_context, | ||||
| ) | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| class LogContextScopeManager(ScopeManager): | ||||
|     """ | ||||
|     The LogContextScopeManager tracks the active scope in opentracing | ||||
|     by using the log contexts which are native to synapse. This is so | ||||
|     that the basic opentracing api can be used across twisted defereds. | ||||
| 
 | ||||
|     It would be nice just to use opentracing's ContextVarsScopeManager, | ||||
|     but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301. | ||||
|     """ | ||||
| 
 | ||||
|     def __init__(self) -> None: | ||||
|         pass | ||||
| 
 | ||||
|     @property | ||||
|     def active(self) -> Optional[Scope]: | ||||
|         """ | ||||
|         Returns the currently active Scope which can be used to access the | ||||
|         currently active Scope.span. | ||||
|         If there is a non-null Scope, its wrapped Span | ||||
|         becomes an implicit parent of any newly-created Span at | ||||
|         Tracer.start_active_span() time. | ||||
| 
 | ||||
|         Return: | ||||
|             The Scope that is active, or None if not available. | ||||
|         """ | ||||
|         ctx = current_context() | ||||
|         return ctx.scope | ||||
| 
 | ||||
|     def activate(self, span: Span, finish_on_close: bool) -> Scope: | ||||
|         """ | ||||
|         Makes a Span active. | ||||
|         Args | ||||
|             span: the span that should become active. | ||||
|             finish_on_close: whether Span should be automatically finished when | ||||
|                 Scope.close() is called. | ||||
| 
 | ||||
|         Returns: | ||||
|             Scope to control the end of the active period for | ||||
|             *span*. It is a programming error to neglect to call | ||||
|             Scope.close() on the returned instance. | ||||
|         """ | ||||
| 
 | ||||
|         ctx = current_context() | ||||
| 
 | ||||
|         if not ctx: | ||||
|             logger.error("Tried to activate scope outside of loggingcontext") | ||||
|             return Scope(None, span)  # type: ignore[arg-type] | ||||
| 
 | ||||
|         if ctx.scope is not None: | ||||
|             # start a new logging context as a child of the existing one. | ||||
|             # Doing so -- rather than updating the existing logcontext -- means that | ||||
|             # creating several concurrent spans under the same logcontext works | ||||
|             # correctly. | ||||
|             ctx = nested_logging_context("") | ||||
|             enter_logcontext = True | ||||
|         else: | ||||
|             # if there is no span currently associated with the current logcontext, we | ||||
|             # just store the scope in it. | ||||
|             # | ||||
|             # This feels a bit dubious, but it does hack around a problem where a | ||||
|             # span outlasts its parent logcontext (which would otherwise lead to | ||||
|             # "Re-starting finished log context" errors). | ||||
|             enter_logcontext = False | ||||
| 
 | ||||
|         scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close) | ||||
|         ctx.scope = scope | ||||
|         if enter_logcontext: | ||||
|             ctx.__enter__() | ||||
| 
 | ||||
|         return scope | ||||
| 
 | ||||
| 
 | ||||
| class _LogContextScope(Scope): | ||||
|     """ | ||||
|     A custom opentracing scope, associated with a LogContext | ||||
| 
 | ||||
|       * When the scope is closed, the logcontext's active scope is reset to None. | ||||
|         and - if enter_logcontext was set - the logcontext is finished too. | ||||
|     """ | ||||
| 
 | ||||
|     def __init__( | ||||
|         self, | ||||
|         manager: LogContextScopeManager, | ||||
|         span: Span, | ||||
|         logcontext: LoggingContext, | ||||
|         enter_logcontext: bool, | ||||
|         finish_on_close: bool, | ||||
|     ): | ||||
|         """ | ||||
|         Args: | ||||
|             manager: | ||||
|                 the manager that is responsible for this scope. | ||||
|             span: | ||||
|                 the opentracing span which this scope represents the local | ||||
|                 lifetime for. | ||||
|             logcontext: | ||||
|                 the log context to which this scope is attached. | ||||
|             enter_logcontext: | ||||
|                 if True the log context will be exited when the scope is finished | ||||
|             finish_on_close: | ||||
|                 if True finish the span when the scope is closed | ||||
|         """ | ||||
|         super().__init__(manager, span) | ||||
|         self.logcontext = logcontext | ||||
|         self._finish_on_close = finish_on_close | ||||
|         self._enter_logcontext = enter_logcontext | ||||
| 
 | ||||
|     def __str__(self) -> str: | ||||
|         return f"Scope<{self.span}>" | ||||
| 
 | ||||
|     def close(self) -> None: | ||||
|         active_scope = self.manager.active | ||||
|         if active_scope is not self: | ||||
|             logger.error( | ||||
|                 "Closing scope %s which is not the currently-active one %s", | ||||
|                 self, | ||||
|                 active_scope, | ||||
|             ) | ||||
| 
 | ||||
|         if self._finish_on_close: | ||||
|             self.span.finish() | ||||
| 
 | ||||
|         self.logcontext.scope = None | ||||
| 
 | ||||
|         if self._enter_logcontext: | ||||
|             self.logcontext.__exit__(None, None, None) | ||||
| @ -68,6 +68,11 @@ if TYPE_CHECKING: | ||||
| 
 | ||||
|     from synapse.server import HomeServer | ||||
| 
 | ||||
|     try: | ||||
|         import opentracing | ||||
|     except ImportError: | ||||
|         opentracing = None  # type: ignore[assignment] | ||||
| 
 | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| @ -225,6 +230,7 @@ def run_as_background_process( | ||||
|     func: Callable[..., Awaitable[Optional[R]]], | ||||
|     *args: Any, | ||||
|     bg_start_span: bool = True, | ||||
|     test_only_tracer: Optional["opentracing.Tracer"] = None, | ||||
|     **kwargs: Any, | ||||
| ) -> "defer.Deferred[Optional[R]]": | ||||
|     """Run the given function in its own logcontext, with resource metrics | ||||
| @ -250,6 +256,8 @@ def run_as_background_process( | ||||
|         bg_start_span: Whether to start an opentracing span. Defaults to True. | ||||
|             Should only be disabled for processes that will not log to or tag | ||||
|             a span. | ||||
|         test_only_tracer: Set the OpenTracing tracer to use. This is only useful for | ||||
|             tests. | ||||
|         args: positional args for func | ||||
|         kwargs: keyword args for func | ||||
| 
 | ||||
| @ -259,6 +267,12 @@ def run_as_background_process( | ||||
|         rules. | ||||
|     """ | ||||
| 
 | ||||
|     # Since we track the tracing scope in the `LoggingContext`, before we move to the | ||||
|     # sentinel logcontext (or a new `LoggingContext`), grab the currently active | ||||
|     # tracing span (if any) so that we can create a cross-link to the background process | ||||
|     # trace. | ||||
|     original_active_tracing_span = active_span(tracer=test_only_tracer) | ||||
| 
 | ||||
|     async def run() -> Optional[R]: | ||||
|         with _bg_metrics_lock: | ||||
|             count = _background_process_counts.get(desc, 0) | ||||
| @ -276,8 +290,6 @@ def run_as_background_process( | ||||
|         ) as logging_context: | ||||
|             try: | ||||
|                 if bg_start_span: | ||||
|                     original_active_tracing_span = active_span() | ||||
| 
 | ||||
|                     # If there is already an active span (e.g. because this background | ||||
|                     # process was started as part of handling a request for example), | ||||
|                     # because this is a long-running background task that may serve a | ||||
| @ -308,6 +320,7 @@ def run_as_background_process( | ||||
|                             # Create a root span for the background process (disconnected | ||||
|                             # from other spans) | ||||
|                             ignore_active_span=True, | ||||
|                             tracer=test_only_tracer, | ||||
|                         ) | ||||
| 
 | ||||
|                         # Also add a span in the original request trace that cross-links | ||||
| @ -324,8 +337,11 @@ def run_as_background_process( | ||||
|                             f"start_bgproc.{desc}", | ||||
|                             child_of=original_active_tracing_span, | ||||
|                             ignore_active_span=True, | ||||
|                             # Points to the background process span. | ||||
|                             # Create the `FOLLOWS_FROM` reference to the background | ||||
|                             # process span so there is a loose coupling between the two | ||||
|                             # traces and it's easy to jump between. | ||||
|                             contexts=[root_tracing_scope.span.context], | ||||
|                             tracer=test_only_tracer, | ||||
|                         ): | ||||
|                             pass | ||||
| 
 | ||||
| @ -341,6 +357,7 @@ def run_as_background_process( | ||||
|                             # span so there is a loose coupling between the two | ||||
|                             # traces and it's easy to jump between. | ||||
|                             contexts=[original_active_tracing_span.context], | ||||
|                             tracer=test_only_tracer, | ||||
|                         ) | ||||
| 
 | ||||
|                         # For easy usage down below, we create a context manager that | ||||
| @ -359,6 +376,7 @@ def run_as_background_process( | ||||
|                         tracing_scope = start_active_span( | ||||
|                             f"bgproc.{desc}", | ||||
|                             tags={SynapseTags.REQUEST_ID: str(logging_context)}, | ||||
|                             tracer=test_only_tracer, | ||||
|                         ) | ||||
|                 else: | ||||
|                     tracing_scope = nullcontext() | ||||
|  | ||||
| @ -19,7 +19,7 @@ | ||||
| # | ||||
| # | ||||
| 
 | ||||
| from typing import Awaitable, Dict, cast | ||||
| from typing import Awaitable, Optional, cast | ||||
| 
 | ||||
| from twisted.internet import defer | ||||
| from twisted.internet.testing import MemoryReactorClock | ||||
| @ -35,20 +35,25 @@ from synapse.logging.opentracing import ( | ||||
|     tag_args, | ||||
|     trace_with_opname, | ||||
| ) | ||||
| from synapse.metrics.background_process_metrics import run_as_background_process | ||||
| from synapse.util.clock import Clock | ||||
| 
 | ||||
| try: | ||||
|     import opentracing | ||||
|     from opentracing.scope_managers.contextvars import ContextVarsScopeManager | ||||
| except ImportError: | ||||
|     opentracing = None  # type: ignore | ||||
|     ContextVarsScopeManager = None  # type: ignore | ||||
| from tests.server import get_clock | ||||
| 
 | ||||
| try: | ||||
|     import jaeger_client | ||||
| except ImportError: | ||||
|     jaeger_client = None  # type: ignore | ||||
| 
 | ||||
| 
 | ||||
| try: | ||||
|     import opentracing | ||||
| 
 | ||||
|     from synapse.logging.scopecontextmanager import LogContextScopeManager | ||||
| except ImportError: | ||||
|     opentracing = None  # type: ignore | ||||
|     LogContextScopeManager = None  # type: ignore | ||||
| 
 | ||||
| import logging | ||||
| 
 | ||||
| from tests.unittest import TestCase | ||||
| @ -56,7 +61,7 @@ from tests.unittest import TestCase | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| class TracingScopeTestCase(TestCase): | ||||
| class LogContextScopeManagerTestCase(TestCase): | ||||
|     """ | ||||
|     Test that our tracing machinery works well in a variety of situations (especially | ||||
|     with Twisted's runtime and deferreds). | ||||
| @ -67,7 +72,7 @@ class TracingScopeTestCase(TestCase): | ||||
|     opentracing backend is Jaeger. | ||||
|     """ | ||||
| 
 | ||||
|     if opentracing is None: | ||||
|     if opentracing is None or LogContextScopeManager is None: | ||||
|         skip = "Requires opentracing"  # type: ignore[unreachable] | ||||
|     if jaeger_client is None: | ||||
|         skip = "Requires jaeger_client"  # type: ignore[unreachable] | ||||
| @ -77,9 +82,8 @@ class TracingScopeTestCase(TestCase): | ||||
|         # global variables that power opentracing. We create our own tracer instance | ||||
|         # and test with it. | ||||
| 
 | ||||
|         scope_manager = ContextVarsScopeManager() | ||||
|         config = jaeger_client.config.Config( | ||||
|             config={}, service_name="test", scope_manager=scope_manager | ||||
|             config={}, service_name="test", scope_manager=LogContextScopeManager() | ||||
|         ) | ||||
| 
 | ||||
|         self._reporter = jaeger_client.reporter.InMemoryReporter() | ||||
| @ -220,144 +224,6 @@ class TracingScopeTestCase(TestCase): | ||||
|             [scopes[1].span, scopes[2].span, scopes[0].span], | ||||
|         ) | ||||
| 
 | ||||
|     def test_run_in_background_active_scope_still_available(self) -> None: | ||||
|         """ | ||||
|         Test that tasks running via `run_in_background` still have access to the | ||||
|         active tracing scope. | ||||
| 
 | ||||
|         This is a regression test for a previous Synapse issue where the tracing scope | ||||
|         would `__exit__` and close before the `run_in_background` task completed and our | ||||
|         own previous custom `_LogContextScope.close(...)` would clear | ||||
|         `LoggingContext.scope` preventing further tracing spans from having the correct | ||||
|         parent. | ||||
|         """ | ||||
|         reactor = MemoryReactorClock() | ||||
|         # type-ignore: mypy-zope doesn't seem to recognise that `MemoryReactorClock` | ||||
|         # implements `ISynapseThreadlessReactor` (combination of the normal Twisted | ||||
|         # Reactor/Clock interfaces), via inheritance from | ||||
|         # `twisted.internet.testing.MemoryReactor` and `twisted.internet.testing.Clock` | ||||
|         # Ignore `multiple-internal-clocks` linter error here since we are creating a `Clock` | ||||
|         # for testing purposes. | ||||
|         clock = Clock(  # type: ignore[multiple-internal-clocks] | ||||
|             reactor,  # type: ignore[arg-type] | ||||
|             server_name="test_server", | ||||
|         ) | ||||
| 
 | ||||
|         scope_map: Dict[str, opentracing.Scope] = {} | ||||
| 
 | ||||
|         async def async_task() -> None: | ||||
|             root_scope = scope_map["root"] | ||||
|             root_context = cast(jaeger_client.SpanContext, root_scope.span.context) | ||||
| 
 | ||||
|             self.assertEqual( | ||||
|                 self._tracer.active_span, | ||||
|                 root_scope.span, | ||||
|                 "expected to inherit the root tracing scope from where this was run", | ||||
|             ) | ||||
| 
 | ||||
|             # Return control back to the reactor thread and wait an arbitrary amount | ||||
|             await clock.sleep(4) | ||||
| 
 | ||||
|             # This is a key part of what we're testing! In a previous version of | ||||
|             # Synapse, we would lose the active span at this point. | ||||
|             self.assertEqual( | ||||
|                 self._tracer.active_span, | ||||
|                 root_scope.span, | ||||
|                 "expected to still have a root tracing scope/span active", | ||||
|             ) | ||||
| 
 | ||||
|             # For complete-ness sake, let's also trace more sub-tasks here and assert | ||||
|             # they have the correct span parents as well (root) | ||||
| 
 | ||||
|             # Start tracing some other sub-task. | ||||
|             # | ||||
|             # This is a key part of what we're testing! In a previous version of | ||||
|             # Synapse, it would have the incorrect span parents. | ||||
|             scope = start_active_span( | ||||
|                 "task1", | ||||
|                 tracer=self._tracer, | ||||
|             ) | ||||
|             scope_map["task1"] = scope | ||||
| 
 | ||||
|             # Ensure the span parent is pointing to the root scope | ||||
|             context = cast(jaeger_client.SpanContext, scope.span.context) | ||||
|             self.assertEqual( | ||||
|                 context.parent_id, | ||||
|                 root_context.span_id, | ||||
|                 "expected task1 parent to be the root span", | ||||
|             ) | ||||
| 
 | ||||
|             # Ensure that the active span is our new sub-task now | ||||
|             self.assertEqual(self._tracer.active_span, scope.span) | ||||
|             # Return control back to the reactor thread and wait an arbitrary amount | ||||
|             await clock.sleep(4) | ||||
|             # We should still see the active span as the scope wasn't closed yet | ||||
|             self.assertEqual(self._tracer.active_span, scope.span) | ||||
|             scope.close() | ||||
| 
 | ||||
|         async def root() -> None: | ||||
|             with start_active_span( | ||||
|                 "root span", | ||||
|                 tracer=self._tracer, | ||||
|                 # We will close this off later. We're basically just mimicking the same | ||||
|                 # pattern for how we handle requests. We pass the span off to the | ||||
|                 # request for it to finish. | ||||
|                 finish_on_close=False, | ||||
|             ) as root_scope: | ||||
|                 scope_map["root"] = root_scope | ||||
|                 self.assertEqual(self._tracer.active_span, root_scope.span) | ||||
| 
 | ||||
|                 # Fire-and-forget a task | ||||
|                 # | ||||
|                 # XXX: The root scope context manager will `__exit__` before this task | ||||
|                 # completes. | ||||
|                 run_in_background(async_task) | ||||
| 
 | ||||
|                 # Because we used `run_in_background`, the active span should still be | ||||
|                 # the root. | ||||
|                 self.assertEqual(self._tracer.active_span, root_scope.span) | ||||
| 
 | ||||
|             # We shouldn't see any active spans outside of the scope | ||||
|             self.assertIsNone(self._tracer.active_span) | ||||
| 
 | ||||
|         with LoggingContext(name="root context", server_name="test_server"): | ||||
|             # Start the test off | ||||
|             d_root = defer.ensureDeferred(root()) | ||||
| 
 | ||||
|             # Let the tasks complete | ||||
|             reactor.pump((2,) * 8) | ||||
|             self.successResultOf(d_root) | ||||
| 
 | ||||
|             # After we see all of the tasks are done (like a request when it | ||||
|             # `_finished_processing`), let's finish our root span | ||||
|             scope_map["root"].span.finish() | ||||
| 
 | ||||
|             # Sanity check again: We shouldn't see any active spans leftover in this | ||||
|             # this context. | ||||
|             self.assertIsNone(self._tracer.active_span) | ||||
| 
 | ||||
|         # The spans should be reported in order of their finishing: task 1, task 2, | ||||
|         # root. | ||||
|         # | ||||
|         # We use `assertIncludes` just as an easier way to see if items are missing or | ||||
|         # added. We assert the order just below | ||||
|         self.assertIncludes( | ||||
|             set(self._reporter.get_spans()), | ||||
|             { | ||||
|                 scope_map["task1"].span, | ||||
|                 scope_map["root"].span, | ||||
|             }, | ||||
|             exact=True, | ||||
|         ) | ||||
|         # This is where we actually assert the correct order | ||||
|         self.assertEqual( | ||||
|             self._reporter.get_spans(), | ||||
|             [ | ||||
|                 scope_map["task1"].span, | ||||
|                 scope_map["root"].span, | ||||
|             ], | ||||
|         ) | ||||
| 
 | ||||
|     def test_trace_decorator_sync(self) -> None: | ||||
|         """ | ||||
|         Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args` | ||||
| @ -455,3 +321,203 @@ class TracingScopeTestCase(TestCase): | ||||
|             [span.operation_name for span in self._reporter.get_spans()], | ||||
|             ["fixture_awaitable_return_func"], | ||||
|         ) | ||||
| 
 | ||||
|     async def test_run_as_background_process_standalone(self) -> None: | ||||
|         """ | ||||
|         Test to make sure that the background process work starts its own trace. | ||||
|         """ | ||||
|         reactor, clock = get_clock() | ||||
| 
 | ||||
|         callback_finished = False | ||||
|         active_span_in_callback: Optional[jaeger_client.Span] = None | ||||
| 
 | ||||
|         async def bg_task() -> None: | ||||
|             nonlocal callback_finished, active_span_in_callback | ||||
|             try: | ||||
|                 assert isinstance(self._tracer.active_span, jaeger_client.Span) | ||||
|                 active_span_in_callback = self._tracer.active_span | ||||
|             finally: | ||||
|                 # When exceptions happen, we still want to mark the callback as finished | ||||
|                 # so that the test can complete and we see the underlying error. | ||||
|                 callback_finished = True | ||||
| 
 | ||||
|         # type-ignore: We ignore because the point is to test the bare function | ||||
|         run_as_background_process(  # type: ignore[untracked-background-process] | ||||
|             desc="some-bg-task", | ||||
|             server_name="test_server", | ||||
|             func=bg_task, | ||||
|             test_only_tracer=self._tracer, | ||||
|         ) | ||||
| 
 | ||||
|         # Now wait for the background process to finish | ||||
|         while not callback_finished: | ||||
|             await clock.sleep(0) | ||||
| 
 | ||||
|         self.assertTrue( | ||||
|             callback_finished, | ||||
|             "Callback never finished which means the test probably didn't wait long enough", | ||||
|         ) | ||||
| 
 | ||||
|         self.assertEqual( | ||||
|             active_span_in_callback.operation_name if active_span_in_callback else None, | ||||
|             "bgproc.some-bg-task", | ||||
|             "expected a new span to be started for the background task", | ||||
|         ) | ||||
| 
 | ||||
|         # The spans should be reported in order of their finishing. | ||||
|         # | ||||
|         # We use `assertIncludes` just as an easier way to see if items are missing or | ||||
|         # added. We assert the order just below | ||||
|         actual_spans = [span.operation_name for span in self._reporter.get_spans()] | ||||
|         expected_spans = ["bgproc.some-bg-task"] | ||||
|         self.assertIncludes( | ||||
|             set(actual_spans), | ||||
|             set(expected_spans), | ||||
|             exact=True, | ||||
|         ) | ||||
|         # This is where we actually assert the correct order | ||||
|         self.assertEqual( | ||||
|             actual_spans, | ||||
|             expected_spans, | ||||
|         ) | ||||
| 
 | ||||
|     async def test_run_as_background_process_cross_link(self) -> None: | ||||
|         """ | ||||
|         Test to make sure that the background process work has its own trace and is | ||||
|         disconnected from any currently active trace (like a request). But we still have | ||||
|         cross-links between the two traces if there was already an active trace/span when | ||||
|         we kicked off the background process. | ||||
|         """ | ||||
|         reactor, clock = get_clock() | ||||
| 
 | ||||
|         callback_finished = False | ||||
|         active_span_in_callback: Optional[jaeger_client.Span] = None | ||||
| 
 | ||||
|         async def bg_task() -> None: | ||||
|             nonlocal callback_finished, active_span_in_callback | ||||
|             try: | ||||
|                 assert isinstance(self._tracer.active_span, jaeger_client.Span) | ||||
|                 active_span_in_callback = self._tracer.active_span | ||||
|             finally: | ||||
|                 # When exceptions happen, we still want to mark the callback as finished | ||||
|                 # so that the test can complete and we see the underlying error. | ||||
|                 callback_finished = True | ||||
| 
 | ||||
|         with LoggingContext(name="some-request", server_name="test_server"): | ||||
|             with start_active_span( | ||||
|                 "some-request", | ||||
|                 tracer=self._tracer, | ||||
|             ): | ||||
|                 # type-ignore: We ignore because the point is to test the bare function | ||||
|                 run_as_background_process(  # type: ignore[untracked-background-process] | ||||
|                     desc="some-bg-task", | ||||
|                     server_name="test_server", | ||||
|                     func=bg_task, | ||||
|                     test_only_tracer=self._tracer, | ||||
|                 ) | ||||
| 
 | ||||
|         # Now wait for the background process to finish | ||||
|         while not callback_finished: | ||||
|             await clock.sleep(0) | ||||
| 
 | ||||
|         self.assertTrue( | ||||
|             callback_finished, | ||||
|             "Callback never finished which means the test probably didn't wait long enough", | ||||
|         ) | ||||
| 
 | ||||
|         # We start `bgproc.some-bg-task` and `bgproc_child.some-bg-task` (see | ||||
|         # `run_as_background_process` implementation for why). Either is fine but for | ||||
|         # now we expect the child as its the innermost one that was started. | ||||
|         self.assertEqual( | ||||
|             active_span_in_callback.operation_name if active_span_in_callback else None, | ||||
|             "bgproc_child.some-bg-task", | ||||
|             "expected a new span to be started for the background task", | ||||
|         ) | ||||
| 
 | ||||
|         # The spans should be reported in order of their finishing. | ||||
|         # | ||||
|         # We use `assertIncludes` just as an easier way to see if items are missing or | ||||
|         # added. We assert the order just below | ||||
|         actual_spans = [span.operation_name for span in self._reporter.get_spans()] | ||||
|         expected_spans = [ | ||||
|             "start_bgproc.some-bg-task", | ||||
|             "bgproc_child.some-bg-task", | ||||
|             "bgproc.some-bg-task", | ||||
|             "some-request", | ||||
|         ] | ||||
|         self.assertIncludes( | ||||
|             set(actual_spans), | ||||
|             set(expected_spans), | ||||
|             exact=True, | ||||
|         ) | ||||
|         # This is where we actually assert the correct order | ||||
|         self.assertEqual( | ||||
|             actual_spans, | ||||
|             expected_spans, | ||||
|         ) | ||||
| 
 | ||||
|         span_map = {span.operation_name: span for span in self._reporter.get_spans()} | ||||
|         span_id_to_friendly_name = { | ||||
|             span.span_id: span.operation_name for span in self._reporter.get_spans() | ||||
|         } | ||||
| 
 | ||||
|         def get_span_friendly_name(span_id: Optional[int]) -> str: | ||||
|             if span_id is None: | ||||
|                 return "None" | ||||
| 
 | ||||
|             return span_id_to_friendly_name.get(span_id, f"unknown span {span_id}") | ||||
| 
 | ||||
|         # Ensure the background process trace/span is disconnected from the request | ||||
|         # trace/span. | ||||
|         self.assertNotEqual( | ||||
|             get_span_friendly_name(span_map["bgproc.some-bg-task"].parent_id), | ||||
|             get_span_friendly_name(span_map["some-request"].span_id), | ||||
|         ) | ||||
| 
 | ||||
|         # We should see a cross-link in the request trace pointing to the background | ||||
|         # process trace. | ||||
|         # | ||||
|         # Make sure `start_bgproc.some-bg-task` is part of the request trace | ||||
|         self.assertEqual( | ||||
|             get_span_friendly_name(span_map["start_bgproc.some-bg-task"].parent_id), | ||||
|             get_span_friendly_name(span_map["some-request"].span_id), | ||||
|         ) | ||||
|         # And has some references to the background process trace | ||||
|         self.assertIncludes( | ||||
|             { | ||||
|                 f"{reference.type}:{get_span_friendly_name(reference.referenced_context.span_id)}" | ||||
|                 if isinstance(reference.referenced_context, jaeger_client.SpanContext) | ||||
|                 else f"{reference.type}:None" | ||||
|                 for reference in ( | ||||
|                     span_map["start_bgproc.some-bg-task"].references or [] | ||||
|                 ) | ||||
|             }, | ||||
|             { | ||||
|                 f"follows_from:{get_span_friendly_name(span_map['bgproc.some-bg-task'].span_id)}" | ||||
|             }, | ||||
|             exact=True, | ||||
|         ) | ||||
| 
 | ||||
|         # We should see a cross-link in the background process trace pointing to the | ||||
|         # request trace that kicked off the work. | ||||
|         # | ||||
|         # Make sure `start_bgproc.some-bg-task` is part of the request trace | ||||
|         self.assertEqual( | ||||
|             get_span_friendly_name(span_map["bgproc_child.some-bg-task"].parent_id), | ||||
|             get_span_friendly_name(span_map["bgproc.some-bg-task"].span_id), | ||||
|         ) | ||||
|         # And has some references to the background process trace | ||||
|         self.assertIncludes( | ||||
|             { | ||||
|                 f"{reference.type}:{get_span_friendly_name(reference.referenced_context.span_id)}" | ||||
|                 if isinstance(reference.referenced_context, jaeger_client.SpanContext) | ||||
|                 else f"{reference.type}:None" | ||||
|                 for reference in ( | ||||
|                     span_map["bgproc_child.some-bg-task"].references or [] | ||||
|                 ) | ||||
|             }, | ||||
|             { | ||||
|                 f"follows_from:{get_span_friendly_name(span_map['some-request'].span_id)}" | ||||
|             }, | ||||
|             exact=True, | ||||
|         ) | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user