Mirror of https://github.com/element-hq/synapse.git (synced 2025-10-03 00:01:04 -04:00)

Compare commits: 0b70e52351 ... 2fad394c51 (9 commits)

- 2fad394c51
- 70c044db8e
- 6835e7be0d
- d27ff161f5
- 06a84f4fe0
- 1c093509ce
- e5f500140c
- dff8c6934d
- f60859ca72
changelog.d/18903.misc (new file):
@@ -0,0 +1 @@
Wrap the Rust HTTP client with `make_deferred_yieldable` so it follows Synapse logcontext rules.

changelog.d/18966.misc (new file):
@@ -0,0 +1 @@
Add debug logs wherever we change the current logcontext.

changelog.d/18989.removal (new file):
@@ -0,0 +1 @@
Remove the deprecated `LoggingContext.set_current_context`/`LoggingContext.current_context` methods, which have equivalent bare functions in `synapse.logging.context`.

changelog.d/18990.misc (new file):
@@ -0,0 +1 @@
Switch the task scheduler from raw logcontext manipulation to the dedicated logcontext utilities.

changelog.d/19007.misc (new file):
@@ -0,0 +1 @@
Switch back to our own custom `LogContextScopeManager` instead of OpenTracing's `ContextVarsScopeManager`, which was causing problems when the experimental `SYNAPSE_ASYNC_IO_REACTOR` option was used with tracing enabled.

@@ -548,3 +548,19 @@ chain are dropped. Dropping the reference to an awaitable you're
 supposed to be awaiting is bad practice, so this doesn't
 actually happen too much. Unfortunately, when it does happen, it will
 lead to leaked logcontexts which are incredibly hard to track down.
+
+## Debugging logcontext issues
+
+Debugging logcontext issues can be tricky, as leaking or losing a logcontext will surface
+downstream and can point to an unrelated part of the codebase. It's best to enable debug
+logging for `synapse.logging.context.debug` (it needs to be explicitly configured) and go
+backwards in the logs from the point where the issue is observed to find the root cause.
+
+`log.config.yaml`
+```yaml
+loggers:
+  # Unlike other loggers, this one needs to be explicitly configured to see debug logs.
+  synapse.logging.context.debug:
+    level: DEBUG
+```
+
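The same opt-in can be expressed programmatically for tests or one-off scripts. A minimal sketch using only the standard library; the logger name comes from the change above, while the handler wiring is illustrative:

```python
import logging.config

# Minimal sketch: enable the opt-in logcontext debug logger from Python
# instead of log.config.yaml. Only the logger name is Synapse-specific.
logging.config.dictConfig(
    {
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {"console": {"class": "logging.StreamHandler"}},
        "loggers": {
            # Unlike other loggers, this one must be configured explicitly.
            "synapse.logging.context.debug": {
                "level": "DEBUG",
                "handlers": ["console"],
            }
        },
    }
)
```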
@@ -12,7 +12,7 @@
  * <https://www.gnu.org/licenses/agpl-3.0.html>.
  */

-use std::{collections::HashMap, future::Future};
+use std::{collections::HashMap, future::Future, sync::OnceLock};

 use anyhow::Context;
 use futures::TryStreamExt;
@@ -299,5 +299,22 @@ where
         });
     });

-    Ok(deferred)
+    // Make the deferred follow the Synapse logcontext rules
+    make_deferred_yieldable(py, &deferred)
 }
+
+static MAKE_DEFERRED_YIELDABLE: OnceLock<pyo3::Py<pyo3::PyAny>> = OnceLock::new();
+
+/// Given a deferred, make it follow the Synapse logcontext rules
+fn make_deferred_yieldable<'py>(
+    py: Python<'py>,
+    deferred: &Bound<'py, PyAny>,
+) -> PyResult<Bound<'py, PyAny>> {
+    let make_deferred_yieldable = MAKE_DEFERRED_YIELDABLE.get_or_init(|| {
+        let sys = PyModule::import(py, "synapse.logging.context").unwrap();
+        let func = sys.getattr("make_deferred_yieldable").unwrap().unbind();
+        func
+    });
+
+    make_deferred_yieldable.call1(py, (deferred,))?.extract(py)
+}
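From the Python side, the effect of this change is that deferreds returned by the Rust client can be awaited directly under a logcontext. A hedged sketch of the calling pattern; the `client` object and names here are illustrative, while `LoggingContext` and `current_context` are the real Synapse helpers:

```python
from synapse.logging.context import LoggingContext, current_context

async def fetch(client, url: str) -> bytes:
    # `client` is assumed to be the Rust HttpClient. Before this change a
    # caller had to wrap the await in PreserveLoggingContext(); now the Rust
    # side applies make_deferred_yieldable itself.
    with LoggingContext(name="fetch", server_name="example.com") as ctx:
        body = await client.get(url=url, response_limit=1 * 1024 * 1024)
        # Per the logcontext rules, our context is restored after the await.
        assert current_context() is ctx
        return body
```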

@@ -33,7 +33,6 @@ from synapse.api.errors import (
     UnrecognizedRequestError,
 )
 from synapse.http.site import SynapseRequest
-from synapse.logging.context import PreserveLoggingContext
 from synapse.logging.opentracing import (
     active_span,
     force_tracing,
@@ -229,7 +228,6 @@ class MasDelegatedAuth(BaseAuth):
         try:
             with start_active_span("mas-introspect-token"):
                 inject_request_headers(raw_headers)
-                with PreserveLoggingContext():
-                    resp_body = await self._rust_http_client.post(
-                        url=self._introspection_endpoint,
-                        response_limit=1 * 1024 * 1024,
+                resp_body = await self._rust_http_client.post(
+                    url=self._introspection_endpoint,
+                    response_limit=1 * 1024 * 1024,

@@ -38,7 +38,6 @@ from synapse.api.errors import (
     UnrecognizedRequestError,
 )
 from synapse.http.site import SynapseRequest
-from synapse.logging.context import PreserveLoggingContext
 from synapse.logging.opentracing import (
     active_span,
     force_tracing,
@@ -327,7 +326,6 @@ class MSC3861DelegatedAuth(BaseAuth):
         try:
             with start_active_span("mas-introspect-token"):
                 inject_request_headers(raw_headers)
-                with PreserveLoggingContext():
-                    resp_body = await self._rust_http_client.post(
-                        url=uri,
-                        response_limit=1 * 1024 * 1024,
+                resp_body = await self._rust_http_client.post(
+                    url=uri,
+                    response_limit=1 * 1024 * 1024,

@@ -1493,6 +1493,18 @@ def _required_state_changes(
     added, removed and then added again to the required state. In that case we
     only want to re-send that entry down sync if it has changed.

+    Args:
+        prev_required_state_map: Map from state event type to the state_keys requested
+            for the room. The values are close to `StateKey` but actually use a syntax
+            where you can provide a `*` wildcard. `$ME` and `$LAZY` (for lazy-loading
+            room members) should already be expanded into their explicit forms by this
+            point.
+        request_required_state_map: Map from state event type to the state_keys
+            requested for the room. The values use the same syntax as
+            `prev_required_state_map`.
+        state_deltas: Relevant changes to the current state. "Relevant" for sync means
+            in the token range.
+
     Returns:
         A 2-tuple of updated required state config (or None if there is no update)
         and the state filter to use to fetch extra current state that we need to
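To make the docstring concrete, here is a hypothetical pair of required-state maps after `$ME`/`$LAZY` expansion; the values are invented for illustration:

```python
# Hypothetical values for the two maps described above, post-expansion.
prev_required_state_map = {
    "m.room.create": {""},
    "m.room.member": {"@alice:example.com"},  # `$ME` already expanded
}
request_required_state_map = {
    "m.room.create": {""},
    "m.room.name": {""},
    "m.room.member": {"*"},  # `*` wildcard: all state_keys of this type
}
```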

@@ -33,7 +33,6 @@ See doc/log_contexts.rst for details on how this works.
 import logging
 import threading
 import typing
-import warnings
 from types import TracebackType
 from typing import (
     TYPE_CHECKING,
@@ -55,11 +54,29 @@ from typing_extensions import ParamSpec
 from twisted.internet import defer, threads
 from twisted.python.threadpool import ThreadPool

+from synapse.logging.loggers import ExplicitlyConfiguredLogger
+from synapse.util.stringutils import random_string

 if TYPE_CHECKING:
+    from synapse.logging.scopecontextmanager import _LogContextScope
     from synapse.types import ISynapseReactor

 logger = logging.getLogger(__name__)

+original_logger_class = logging.getLoggerClass()
+logging.setLoggerClass(ExplicitlyConfiguredLogger)
+logcontext_debug_logger = logging.getLogger("synapse.logging.context.debug")
+"""
+A logger for debugging when the logcontext switches.
+
+Because this is very noisy and probably something only developers want to see when
+debugging logcontext problems, we want people to explicitly opt in before seeing anything
+in the logs. It requires explicitly setting `synapse.logging.context.debug` in the logging
+configuration and does not inherit the log level from the parent logger.
+"""
+# Restore the original logger class
+logging.setLoggerClass(original_logger_class)
+
 try:
     import resource

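A hedged sketch of the behaviour this buys us, assuming `ExplicitlyConfiguredLogger` works as its name and the docstring above suggest (no level inheritance from parent loggers); the assertions describe intended behaviour, not a verified API:

```python
import logging

import synapse.logging.context  # noqa: F401  # performs the logger-class swap above

# Even with a permissive root logger, the debug logger stays silent ...
logging.basicConfig(level=logging.DEBUG)
debug_logger = logging.getLogger("synapse.logging.context.debug")
assert not debug_logger.isEnabledFor(logging.DEBUG)  # assumed behaviour

# ... until it is configured explicitly, as in the YAML example above.
debug_logger.setLevel(logging.DEBUG)
assert debug_logger.isEnabledFor(logging.DEBUG)
```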
@@ -238,7 +255,14 @@ class _Sentinel:
     we should always know which server the logs are coming from.
     """

-    __slots__ = ["previous_context", "finished", "server_name", "request", "tag"]
+    __slots__ = [
+        "previous_context",
+        "finished",
+        "scope",
+        "server_name",
+        "request",
+        "tag",
+    ]

     def __init__(self) -> None:
         # Minimal set for compatibility with LoggingContext
@@ -246,6 +270,7 @@ class _Sentinel:
         self.finished = False
         self.server_name = "unknown_server_from_sentinel_context"
         self.request = None
+        self.scope = None
         self.tag = None

     def __str__(self) -> str:
@@ -303,6 +328,7 @@ class LoggingContext:
         "finished",
         "request",
         "tag",
+        "scope",
     ]

     def __init__(
@@ -327,6 +353,7 @@ class LoggingContext:
         self.main_thread = get_thread_id()
         self.request = None
         self.tag = ""
+        self.scope: Optional["_LogContextScope"] = None

         # keep track of whether we have hit the __exit__ block for this context
         # (suggesting that the thing that created the context thinks it should
@@ -340,6 +367,9 @@ class LoggingContext:
             # which request this corresponds to
             self.request = self.parent_context.request

+            # we also track the current scope:
+            self.scope = self.parent_context.scope
+
         if request is not None:
             # the request param overrides the request from the parent context
             self.request = request
@@ -347,49 +377,9 @@ class LoggingContext:
     def __str__(self) -> str:
         return self.name

-    @classmethod
-    def current_context(cls) -> LoggingContextOrSentinel:
-        """Get the current logging context from thread local storage
-
-        This exists for backwards compatibility. ``current_context()`` should be
-        called directly.
-
-        Returns:
-            The current logging context
-        """
-        warnings.warn(
-            "synapse.logging.context.LoggingContext.current_context() is deprecated "
-            "in favor of synapse.logging.context.current_context().",
-            DeprecationWarning,
-            stacklevel=2,
-        )
-        return current_context()
-
-    @classmethod
-    def set_current_context(
-        cls, context: LoggingContextOrSentinel
-    ) -> LoggingContextOrSentinel:
-        """Set the current logging context in thread local storage
-
-        This exists for backwards compatibility. ``set_current_context()`` should be
-        called directly.
-
-        Args:
-            context: The context to activate.
-
-        Returns:
-            The context that was previously active
-        """
-        warnings.warn(
-            "synapse.logging.context.LoggingContext.set_current_context() is deprecated "
-            "in favor of synapse.logging.context.set_current_context().",
-            DeprecationWarning,
-            stacklevel=2,
-        )
-        return set_current_context(context)
-
     def __enter__(self) -> "LoggingContext":
         """Enters this logging context into thread local storage"""
+        logcontext_debug_logger.debug("LoggingContext(%s).__enter__", self.name)
         old_context = set_current_context(self)
         if self.previous_context != old_context:
             logcontext_error(
@@ -412,6 +402,9 @@ class LoggingContext:
         Returns:
             None to avoid suppressing any exceptions that were thrown.
         """
+        logcontext_debug_logger.debug(
+            "LoggingContext(%s).__exit__ --> %s", self.name, self.previous_context
+        )
         current = set_current_context(self.previous_context)
         if current is not self:
             if current is SENTINEL_CONTEXT:
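Callers still on the removed classmethods migrate mechanically to the module-level functions named in the changelog entry above; a minimal sketch:

```python
from synapse.logging.context import current_context, set_current_context

# Before (removed above):
#     ctx = LoggingContext.current_context()
#     prev = LoggingContext.set_current_context(ctx)
# After: call the bare functions from synapse.logging.context directly.
ctx = current_context()
prev = set_current_context(ctx)
set_current_context(prev)  # put things back the way we found them
```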
@@ -660,14 +653,21 @@ class PreserveLoggingContext:
     reactor back to the code).
     """

-    __slots__ = ["_old_context", "_new_context"]
+    __slots__ = ["_old_context", "_new_context", "_instance_id"]

     def __init__(
         self, new_context: LoggingContextOrSentinel = SENTINEL_CONTEXT
     ) -> None:
         self._new_context = new_context
+        self._instance_id = random_string(5)

     def __enter__(self) -> None:
+        logcontext_debug_logger.debug(
+            "PreserveLoggingContext(%s).__enter__ %s --> %s",
+            self._instance_id,
+            current_context(),
+            self._new_context,
+        )
         self._old_context = set_current_context(self._new_context)

     def __exit__(
@@ -676,6 +676,12 @@ class PreserveLoggingContext:
         value: Optional[BaseException],
         traceback: Optional[TracebackType],
     ) -> None:
+        logcontext_debug_logger.debug(
+            "PreserveLoggingContext(%s).__exit__ %s --> %s",
+            self._instance_id,
+            current_context(),
+            self._old_context,
+        )
         context = set_current_context(self._old_context)

         if context != self._new_context:
@@ -855,7 +861,11 @@ def run_in_background(
     Note that the returned Deferred does not follow the synapse logcontext
     rules.
     """
+    instance_id = random_string(5)
+    calling_context = current_context()
+    logcontext_debug_logger.debug(
+        "run_in_background(%s): called with logcontext=%s", instance_id, calling_context
+    )
     try:
         # (kick off the task in the current context)
         res = f(*args, **kwargs)
@@ -897,6 +907,11 @@ def run_in_background(
         # to reset the logcontext to the sentinel logcontext as that would run
         # immediately (remember our goal is to maintain the calling logcontext when we
         # return).
+        logcontext_debug_logger.debug(
+            "run_in_background(%s): deferred already completed and the function should have maintained the logcontext %s",
+            instance_id,
+            calling_context,
+        )
         return d

     # Since the function we called may follow the Synapse logcontext rules (Rules for
@@ -907,6 +922,11 @@ def run_in_background(
     #
     # Our goal is to have the caller logcontext unchanged after firing off the
     # background task and returning.
+    logcontext_debug_logger.debug(
+        "run_in_background(%s): restoring calling logcontext %s",
+        instance_id,
+        calling_context,
+    )
     set_current_context(calling_context)

     # If the function we called is playing nice and following the Synapse logcontext
@@ -922,7 +942,23 @@ def run_in_background(
     # which is supposed to have a single entry and exit point. But
     # by spawning off another deferred, we are effectively
     # adding a new exit point.)
-    d.addBoth(_set_context_cb, SENTINEL_CONTEXT)
+    if logcontext_debug_logger.isEnabledFor(logging.DEBUG):
+
+        def _log_set_context_cb(
+            result: ResultT, context: LoggingContextOrSentinel
+        ) -> ResultT:
+            logcontext_debug_logger.debug(
+                "run_in_background(%s): resetting logcontext to %s",
+                instance_id,
+                context,
+            )
+            set_current_context(context)
+            return result
+
+        d.addBoth(_log_set_context_cb, SENTINEL_CONTEXT)
+    else:
+        d.addBoth(_set_context_cb, SENTINEL_CONTEXT)

     return d

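The debug lines above trace the invariant `run_in_background` promises: the task starts in the caller's logcontext, and the caller's logcontext is untouched when the call returns. A minimal sketch, with illustrative names:

```python
from synapse.logging.context import (
    LoggingContext,
    current_context,
    run_in_background,
)

async def do_work() -> None:
    ...  # runs in the caller's logcontext until its first await

with LoggingContext(name="request", server_name="example.com") as ctx:
    # Fire-and-forget: with debug logging enabled, this emits the
    # "called with logcontext=..." and "restoring calling logcontext ..."
    # lines added above.
    run_in_background(do_work)
    assert current_context() is ctx  # caller's logcontext is unchanged
```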
@@ -978,10 +1014,21 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
     restores the old context once the awaitable completes (execution passes from the
     reactor back to the code).
     """
+    instance_id = random_string(5)
+    logcontext_debug_logger.debug(
+        "make_deferred_yieldable(%s): called with logcontext=%s",
+        instance_id,
+        current_context(),
+    )
+
     # The deferred has already completed
     if deferred.called and not deferred.paused:
         # it looks like this deferred is ready to run any callbacks we give it
         # immediately. We may as well optimise out the logcontext faffery.
+        logcontext_debug_logger.debug(
+            "make_deferred_yieldable(%s): deferred already completed and the function should have maintained the logcontext",
+            instance_id,
+        )
         return deferred

     # Our goal is to have the caller logcontext unchanged after they yield/await the
@@ -993,8 +1040,31 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
     # does) while the deferred runs in the reactor event loop, we reset the logcontext
     # and add a callback to the deferred to restore it so the caller's logcontext is
     # active when the deferred completes.
-    prev_context = set_current_context(SENTINEL_CONTEXT)
-    deferred.addBoth(_set_context_cb, prev_context)
+    logcontext_debug_logger.debug(
+        "make_deferred_yieldable(%s): resetting logcontext to %s",
+        instance_id,
+        SENTINEL_CONTEXT,
+    )
+    calling_context = set_current_context(SENTINEL_CONTEXT)
+
+    if logcontext_debug_logger.isEnabledFor(logging.DEBUG):
+
+        def _log_set_context_cb(
+            result: ResultT, context: LoggingContextOrSentinel
+        ) -> ResultT:
+            logcontext_debug_logger.debug(
+                "make_deferred_yieldable(%s): restoring calling logcontext to %s",
+                instance_id,
+                context,
+            )
+            set_current_context(context)
+            return result
+
+        deferred.addBoth(_log_set_context_cb, calling_context)
+    else:
+        deferred.addBoth(_set_context_cb, calling_context)

     return deferred

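`make_deferred_yieldable` is the mirror image of `run_in_background`: it is for awaiting a "foreign" deferred that does not follow the logcontext rules. A minimal sketch; the deferred `d` is illustrative:

```python
from twisted.internet import defer

from synapse.logging.context import (
    LoggingContext,
    current_context,
    make_deferred_yieldable,
)

async def wait_for(d: "defer.Deferred[str]") -> str:
    with LoggingContext(name="caller", server_name="example.com") as ctx:
        # Resets to the sentinel while the reactor drives `d`, then restores
        # our logcontext on completion (the two debug lines added above).
        result = await make_deferred_yieldable(d)
        assert current_context() is ctx
        return result
```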
@@ -251,17 +251,18 @@ class _DummyTagNames:
 try:
     import opentracing
     import opentracing.tags
-    from opentracing.scope_managers.contextvars import ContextVarsScopeManager

     tags = opentracing.tags
 except ImportError:
     opentracing = None  # type: ignore[assignment]
     tags = _DummyTagNames  # type: ignore[assignment]
-    ContextVarsScopeManager = None  # type: ignore

 try:
     from jaeger_client import Config as JaegerConfig
+
+    from synapse.logging.scopecontextmanager import LogContextScopeManager
 except ImportError:
     JaegerConfig = None  # type: ignore
+    LogContextScopeManager = None  # type: ignore


 try:
@@ -483,7 +484,7 @@ def init_tracer(hs: "HomeServer") -> None:
     config = JaegerConfig(
         config=jaeger_config,
         service_name=f"{hs.config.server.server_name} {instance_name_by_type}",
-        scope_manager=ContextVarsScopeManager(),
+        scope_manager=LogContextScopeManager(),
         metrics_factory=PrometheusMetricsFactory(),
     )

@@ -683,9 +684,21 @@ def start_active_span_from_edu(

 # Opentracing setters for tags, logs, etc
 @only_if_tracing
-def active_span() -> Optional["opentracing.Span"]:
-    """Get the currently active span, if any"""
-    return opentracing.tracer.active_span
+def active_span(
+    *,
+    tracer: Optional["opentracing.Tracer"] = None,
+) -> Optional["opentracing.Span"]:
+    """
+    Get the currently active span, if any
+
+    Args:
+        tracer: override the opentracing tracer. By default the global tracer is used.
+    """
+    if tracer is None:
+        # use the global tracer by default
+        tracer = opentracing.tracer
+
+    return tracer.active_span


 @ensure_active_span("set a tag")

synapse/logging/scopecontextmanager.py (new file, 161 lines):
@@ -0,0 +1,161 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#

import logging
from typing import Optional

from opentracing import Scope, ScopeManager, Span

from synapse.logging.context import (
    LoggingContext,
    current_context,
    nested_logging_context,
)

logger = logging.getLogger(__name__)


class LogContextScopeManager(ScopeManager):
    """
    The LogContextScopeManager tracks the active scope in opentracing
    by using the log contexts which are native to Synapse. This is so
    that the basic opentracing API can be used across Twisted deferreds.

    It would be nice just to use opentracing's ContextVarsScopeManager,
    but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301.
    """

    def __init__(self) -> None:
        pass

    @property
    def active(self) -> Optional[Scope]:
        """
        Returns the currently active Scope which can be used to access the
        currently active Scope.span.
        If there is a non-null Scope, its wrapped Span
        becomes an implicit parent of any newly-created Span at
        Tracer.start_active_span() time.

        Returns:
            The Scope that is active, or None if not available.
        """
        ctx = current_context()
        return ctx.scope

    def activate(self, span: Span, finish_on_close: bool) -> Scope:
        """
        Makes a Span active.

        Args:
            span: the span that should become active.
            finish_on_close: whether the Span should be automatically finished when
                Scope.close() is called.

        Returns:
            Scope to control the end of the active period for
            *span*. It is a programming error to neglect to call
            Scope.close() on the returned instance.
        """

        ctx = current_context()

        if not ctx:
            logger.error("Tried to activate scope outside of loggingcontext")
            return Scope(None, span)  # type: ignore[arg-type]

        if ctx.scope is not None:
            # start a new logging context as a child of the existing one.
            # Doing so -- rather than updating the existing logcontext -- means that
            # creating several concurrent spans under the same logcontext works
            # correctly.
            ctx = nested_logging_context("")
            enter_logcontext = True
        else:
            # if there is no span currently associated with the current logcontext, we
            # just store the scope in it.
            #
            # This feels a bit dubious, but it does hack around a problem where a
            # span outlasts its parent logcontext (which would otherwise lead to
            # "Re-starting finished log context" errors).
            enter_logcontext = False

        scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
        ctx.scope = scope
        if enter_logcontext:
            ctx.__enter__()

        return scope


class _LogContextScope(Scope):
    """
    A custom opentracing scope, associated with a LogContext.

    * When the scope is closed, the logcontext's active scope is reset to None,
      and - if enter_logcontext was set - the logcontext is finished too.
    """

    def __init__(
        self,
        manager: LogContextScopeManager,
        span: Span,
        logcontext: LoggingContext,
        enter_logcontext: bool,
        finish_on_close: bool,
    ):
        """
        Args:
            manager:
                the manager that is responsible for this scope.
            span:
                the opentracing span which this scope represents the local
                lifetime for.
            logcontext:
                the log context to which this scope is attached.
            enter_logcontext:
                if True the log context will be exited when the scope is finished
            finish_on_close:
                if True finish the span when the scope is closed
        """
        super().__init__(manager, span)
        self.logcontext = logcontext
        self._finish_on_close = finish_on_close
        self._enter_logcontext = enter_logcontext

    def __str__(self) -> str:
        return f"Scope<{self.span}>"

    def close(self) -> None:
        active_scope = self.manager.active
        if active_scope is not self:
            logger.error(
                "Closing scope %s which is not the currently-active one %s",
                self,
                active_scope,
            )

        if self._finish_on_close:
            self.span.finish()

        self.logcontext.scope = None

        if self._enter_logcontext:
            self.logcontext.__exit__(None, None, None)

@@ -68,6 +68,11 @@ if TYPE_CHECKING:

     from synapse.server import HomeServer

+try:
+    import opentracing
+except ImportError:
+    opentracing = None  # type: ignore[assignment]
+

 logger = logging.getLogger(__name__)

@@ -225,6 +230,7 @@ def run_as_background_process(
     func: Callable[..., Awaitable[Optional[R]]],
     *args: Any,
     bg_start_span: bool = True,
+    test_only_tracer: Optional["opentracing.Tracer"] = None,
     **kwargs: Any,
 ) -> "defer.Deferred[Optional[R]]":
     """Run the given function in its own logcontext, with resource metrics
@@ -250,6 +256,8 @@ def run_as_background_process(
         bg_start_span: Whether to start an opentracing span. Defaults to True.
             Should only be disabled for processes that will not log to or tag
             a span.
+        test_only_tracer: Set the OpenTracing tracer to use. This is only useful
+            for tests.
         args: positional args for func
         kwargs: keyword args for func

@@ -259,6 +267,12 @@ def run_as_background_process(
         rules.
     """

+    # Since we track the tracing scope in the `LoggingContext`, before we move to the
+    # sentinel logcontext (or a new `LoggingContext`), grab the currently active
+    # tracing span (if any) so that we can create a cross-link to the background
+    # process trace.
+    original_active_tracing_span = active_span(tracer=test_only_tracer)
+
     async def run() -> Optional[R]:
         with _bg_metrics_lock:
             count = _background_process_counts.get(desc, 0)
@@ -276,8 +290,6 @@ def run_as_background_process(
         ) as logging_context:
             try:
                 if bg_start_span:
-                    original_active_tracing_span = active_span()
-
                     # If there is already an active span (e.g. because this background
                     # process was started as part of handling a request for example),
                     # because this is a long-running background task that may serve a
@@ -308,6 +320,7 @@ def run_as_background_process(
                         # Create a root span for the background process (disconnected
                         # from other spans)
                         ignore_active_span=True,
+                        tracer=test_only_tracer,
                     )

                     # Also add a span in the original request trace that cross-links
@@ -324,8 +337,11 @@ def run_as_background_process(
                         f"start_bgproc.{desc}",
                         child_of=original_active_tracing_span,
                         ignore_active_span=True,
-                        # Points to the background process span.
+                        # Create the `FOLLOWS_FROM` reference to the background
+                        # process span so there is a loose coupling between the two
+                        # traces and it's easy to jump between.
                         contexts=[root_tracing_scope.span.context],
+                        tracer=test_only_tracer,
                     ):
                         pass

@@ -341,6 +357,7 @@ def run_as_background_process(
                         # span so there is a loose coupling between the two
                         # traces and it's easy to jump between.
                         contexts=[original_active_tracing_span.context],
+                        tracer=test_only_tracer,
                     )

                     # For easy usage down below, we create a context manager that
@@ -359,6 +376,7 @@ def run_as_background_process(
                     tracing_scope = start_active_span(
                         f"bgproc.{desc}",
                         tags={SynapseTags.REQUEST_ID: str(logging_context)},
+                        tracer=test_only_tracer,
                     )
                 else:
                     tracing_scope = nullcontext()

@@ -17,6 +17,10 @@ from twisted.internet.defer import Deferred

 from synapse.types import ISynapseReactor

 class HttpClient:
+    """
+    The returned deferreds follow Synapse logcontext rules.
+    """
+
     def __init__(self, reactor: ISynapseReactor, user_agent: str) -> None: ...
     def get(self, url: str, response_limit: int) -> Deferred[bytes]: ...
     def post(

@@ -27,8 +27,8 @@ from twisted.python.failure import Failure
 from synapse.logging.context import (
     ContextResourceUsage,
     LoggingContext,
+    PreserveLoggingContext,
     nested_logging_context,
-    set_current_context,
 )
 from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
 from synapse.metrics.background_process_metrics import (
@@ -422,14 +422,11 @@ class TaskScheduler:
         """

         current_time = self._clock.time()
-        calling_context = set_current_context(task_log_context)
-        try:
+        with PreserveLoggingContext(task_log_context):
             usage = task_log_context.get_resource_usage()
             TaskScheduler._log_task_usage(
                 "continuing", task, usage, current_time - start_time
             )
-        finally:
-            set_current_context(calling_context)

         async def wrapper() -> None:
             with nested_logging_context(task.id) as log_context:
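`PreserveLoggingContext(new_context)` packages up exactly the enter/restore dance the removed `try`/`finally` spelled out by hand; a minimal sketch with illustrative names:

```python
from synapse.logging.context import (
    LoggingContext,
    PreserveLoggingContext,
    current_context,
)

task_log_context = LoggingContext(name="task", server_name="example.com")

# Enters `task_log_context` for the duration of the block and restores
# whatever context was active before, even if the body raises.
with PreserveLoggingContext(task_log_context):
    assert current_context() is task_log_context
```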

@@ -19,7 +19,7 @@
 #
 #

-from typing import Awaitable, Dict, cast
+from typing import Awaitable, Optional, cast

 from twisted.internet import defer
 from twisted.internet.testing import MemoryReactorClock
@@ -35,20 +35,25 @@ from synapse.logging.opentracing import (
     tag_args,
     trace_with_opname,
 )
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.clock import Clock

-try:
-    import opentracing
-    from opentracing.scope_managers.contextvars import ContextVarsScopeManager
-except ImportError:
-    opentracing = None  # type: ignore
-    ContextVarsScopeManager = None  # type: ignore
+from tests.server import get_clock

 try:
     import jaeger_client
 except ImportError:
     jaeger_client = None  # type: ignore


+try:
+    import opentracing
+
+    from synapse.logging.scopecontextmanager import LogContextScopeManager
+except ImportError:
+    opentracing = None  # type: ignore
+    LogContextScopeManager = None  # type: ignore
+
 import logging

 from tests.unittest import TestCase
@@ -56,7 +61,7 @@ from tests.unittest import TestCase
 logger = logging.getLogger(__name__)


-class TracingScopeTestCase(TestCase):
+class LogContextScopeManagerTestCase(TestCase):
     """
     Test that our tracing machinery works well in a variety of situations (especially
     with Twisted's runtime and deferreds).
@@ -67,7 +72,7 @@ class LogContextScopeManagerTestCase(TestCase):
     opentracing backend is Jaeger.
     """

-    if opentracing is None:
+    if opentracing is None or LogContextScopeManager is None:
         skip = "Requires opentracing"  # type: ignore[unreachable]
     if jaeger_client is None:
         skip = "Requires jaeger_client"  # type: ignore[unreachable]
@@ -77,9 +82,8 @@ class LogContextScopeManagerTestCase(TestCase):
         # global variables that power opentracing. We create our own tracer instance
         # and test with it.

-        scope_manager = ContextVarsScopeManager()
         config = jaeger_client.config.Config(
-            config={}, service_name="test", scope_manager=scope_manager
+            config={}, service_name="test", scope_manager=LogContextScopeManager()
         )

         self._reporter = jaeger_client.reporter.InMemoryReporter()
@@ -220,144 +224,6 @@ class LogContextScopeManagerTestCase(TestCase):
             [scopes[1].span, scopes[2].span, scopes[0].span],
         )

-    def test_run_in_background_active_scope_still_available(self) -> None:
-        """
-        Test that tasks running via `run_in_background` still have access to the
-        active tracing scope.
-
-        This is a regression test for a previous Synapse issue where the tracing scope
-        would `__exit__` and close before the `run_in_background` task completed and our
-        own previous custom `_LogContextScope.close(...)` would clear
-        `LoggingContext.scope` preventing further tracing spans from having the correct
-        parent.
-        """
-        reactor = MemoryReactorClock()
-        # type-ignore: mypy-zope doesn't seem to recognise that `MemoryReactorClock`
-        # implements `ISynapseThreadlessReactor` (combination of the normal Twisted
-        # Reactor/Clock interfaces), via inheritance from
-        # `twisted.internet.testing.MemoryReactor` and `twisted.internet.testing.Clock`
-        # Ignore `multiple-internal-clocks` linter error here since we are creating a `Clock`
-        # for testing purposes.
-        clock = Clock(  # type: ignore[multiple-internal-clocks]
-            reactor,  # type: ignore[arg-type]
-            server_name="test_server",
-        )
-
-        scope_map: Dict[str, opentracing.Scope] = {}
-
-        async def async_task() -> None:
-            root_scope = scope_map["root"]
-            root_context = cast(jaeger_client.SpanContext, root_scope.span.context)
-
-            self.assertEqual(
-                self._tracer.active_span,
-                root_scope.span,
-                "expected to inherit the root tracing scope from where this was run",
-            )
-
-            # Return control back to the reactor thread and wait an arbitrary amount
-            await clock.sleep(4)
-
-            # This is a key part of what we're testing! In a previous version of
-            # Synapse, we would lose the active span at this point.
-            self.assertEqual(
-                self._tracer.active_span,
-                root_scope.span,
-                "expected to still have a root tracing scope/span active",
-            )
-
-            # For complete-ness sake, let's also trace more sub-tasks here and assert
-            # they have the correct span parents as well (root)
-
-            # Start tracing some other sub-task.
-            #
-            # This is a key part of what we're testing! In a previous version of
-            # Synapse, it would have the incorrect span parents.
-            scope = start_active_span(
-                "task1",
-                tracer=self._tracer,
-            )
-            scope_map["task1"] = scope
-
-            # Ensure the span parent is pointing to the root scope
-            context = cast(jaeger_client.SpanContext, scope.span.context)
-            self.assertEqual(
-                context.parent_id,
-                root_context.span_id,
-                "expected task1 parent to be the root span",
-            )
-
-            # Ensure that the active span is our new sub-task now
-            self.assertEqual(self._tracer.active_span, scope.span)
-            # Return control back to the reactor thread and wait an arbitrary amount
-            await clock.sleep(4)
-            # We should still see the active span as the scope wasn't closed yet
-            self.assertEqual(self._tracer.active_span, scope.span)
-            scope.close()
-
-        async def root() -> None:
-            with start_active_span(
-                "root span",
-                tracer=self._tracer,
-                # We will close this off later. We're basically just mimicking the same
-                # pattern for how we handle requests. We pass the span off to the
-                # request for it to finish.
-                finish_on_close=False,
-            ) as root_scope:
-                scope_map["root"] = root_scope
-                self.assertEqual(self._tracer.active_span, root_scope.span)
-
-                # Fire-and-forget a task
-                #
-                # XXX: The root scope context manager will `__exit__` before this task
-                # completes.
-                run_in_background(async_task)
-
-                # Because we used `run_in_background`, the active span should still be
-                # the root.
-                self.assertEqual(self._tracer.active_span, root_scope.span)
-
-            # We shouldn't see any active spans outside of the scope
-            self.assertIsNone(self._tracer.active_span)
-
-        with LoggingContext(name="root context", server_name="test_server"):
-            # Start the test off
-            d_root = defer.ensureDeferred(root())
-
-            # Let the tasks complete
-            reactor.pump((2,) * 8)
-            self.successResultOf(d_root)
-
-            # After we see all of the tasks are done (like a request when it
-            # `_finished_processing`), let's finish our root span
-            scope_map["root"].span.finish()
-
-            # Sanity check again: We shouldn't see any active spans leftover in this
-            # this context.
-            self.assertIsNone(self._tracer.active_span)
-
-        # The spans should be reported in order of their finishing: task 1, task 2,
-        # root.
-        #
-        # We use `assertIncludes` just as an easier way to see if items are missing or
-        # added. We assert the order just below
-        self.assertIncludes(
-            set(self._reporter.get_spans()),
-            {
-                scope_map["task1"].span,
-                scope_map["root"].span,
-            },
-            exact=True,
-        )
-        # This is where we actually assert the correct order
-        self.assertEqual(
-            self._reporter.get_spans(),
-            [
-                scope_map["task1"].span,
-                scope_map["root"].span,
-            ],
-        )
-
     def test_trace_decorator_sync(self) -> None:
         """
         Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
@@ -455,3 +321,203 @@ class LogContextScopeManagerTestCase(TestCase):
             [span.operation_name for span in self._reporter.get_spans()],
             ["fixture_awaitable_return_func"],
         )
+
+    async def test_run_as_background_process_standalone(self) -> None:
+        """
+        Test to make sure that the background process work starts its own trace.
+        """
+        reactor, clock = get_clock()
+
+        callback_finished = False
+        active_span_in_callback: Optional[jaeger_client.Span] = None
+
+        async def bg_task() -> None:
+            nonlocal callback_finished, active_span_in_callback
+            try:
+                assert isinstance(self._tracer.active_span, jaeger_client.Span)
+                active_span_in_callback = self._tracer.active_span
+            finally:
+                # When exceptions happen, we still want to mark the callback as finished
+                # so that the test can complete and we see the underlying error.
+                callback_finished = True
+
+        # type-ignore: We ignore because the point is to test the bare function
+        run_as_background_process(  # type: ignore[untracked-background-process]
+            desc="some-bg-task",
+            server_name="test_server",
+            func=bg_task,
+            test_only_tracer=self._tracer,
+        )
+
+        # Now wait for the background process to finish
+        while not callback_finished:
+            await clock.sleep(0)
+
+        self.assertTrue(
+            callback_finished,
+            "Callback never finished which means the test probably didn't wait long enough",
+        )
+
+        self.assertEqual(
+            active_span_in_callback.operation_name if active_span_in_callback else None,
+            "bgproc.some-bg-task",
+            "expected a new span to be started for the background task",
+        )
+
+        # The spans should be reported in order of their finishing.
+        #
+        # We use `assertIncludes` just as an easier way to see if items are missing or
+        # added. We assert the order just below
+        actual_spans = [span.operation_name for span in self._reporter.get_spans()]
+        expected_spans = ["bgproc.some-bg-task"]
+        self.assertIncludes(
+            set(actual_spans),
+            set(expected_spans),
+            exact=True,
+        )
+        # This is where we actually assert the correct order
+        self.assertEqual(
+            actual_spans,
+            expected_spans,
+        )
+
+    async def test_run_as_background_process_cross_link(self) -> None:
+        """
+        Test to make sure that the background process work has its own trace and is
+        disconnected from any currently active trace (like a request). But we still have
+        cross-links between the two traces if there was already an active trace/span when
+        we kicked off the background process.
+        """
+        reactor, clock = get_clock()
+
+        callback_finished = False
+        active_span_in_callback: Optional[jaeger_client.Span] = None
+
+        async def bg_task() -> None:
+            nonlocal callback_finished, active_span_in_callback
+            try:
+                assert isinstance(self._tracer.active_span, jaeger_client.Span)
+                active_span_in_callback = self._tracer.active_span
+            finally:
+                # When exceptions happen, we still want to mark the callback as finished
+                # so that the test can complete and we see the underlying error.
+                callback_finished = True
+
+        with LoggingContext(name="some-request", server_name="test_server"):
+            with start_active_span(
+                "some-request",
+                tracer=self._tracer,
+            ):
+                # type-ignore: We ignore because the point is to test the bare function
+                run_as_background_process(  # type: ignore[untracked-background-process]
+                    desc="some-bg-task",
+                    server_name="test_server",
+                    func=bg_task,
+                    test_only_tracer=self._tracer,
+                )
+
+        # Now wait for the background process to finish
+        while not callback_finished:
+            await clock.sleep(0)
+
+        self.assertTrue(
+            callback_finished,
+            "Callback never finished which means the test probably didn't wait long enough",
+        )
+
+        # We start `bgproc.some-bg-task` and `bgproc_child.some-bg-task` (see
+        # `run_as_background_process` implementation for why). Either is fine but for
+        # now we expect the child as it's the innermost one that was started.
+        self.assertEqual(
+            active_span_in_callback.operation_name if active_span_in_callback else None,
+            "bgproc_child.some-bg-task",
+            "expected a new span to be started for the background task",
+        )
+
+        # The spans should be reported in order of their finishing.
+        #
+        # We use `assertIncludes` just as an easier way to see if items are missing or
+        # added. We assert the order just below
+        actual_spans = [span.operation_name for span in self._reporter.get_spans()]
+        expected_spans = [
+            "start_bgproc.some-bg-task",
+            "bgproc_child.some-bg-task",
+            "bgproc.some-bg-task",
+            "some-request",
+        ]
+        self.assertIncludes(
+            set(actual_spans),
+            set(expected_spans),
+            exact=True,
+        )
+        # This is where we actually assert the correct order
+        self.assertEqual(
+            actual_spans,
+            expected_spans,
+        )
+
+        span_map = {span.operation_name: span for span in self._reporter.get_spans()}
+        span_id_to_friendly_name = {
+            span.span_id: span.operation_name for span in self._reporter.get_spans()
+        }
+
+        def get_span_friendly_name(span_id: Optional[int]) -> str:
+            if span_id is None:
+                return "None"
+
+            return span_id_to_friendly_name.get(span_id, f"unknown span {span_id}")
+
+        # Ensure the background process trace/span is disconnected from the request
+        # trace/span.
+        self.assertNotEqual(
+            get_span_friendly_name(span_map["bgproc.some-bg-task"].parent_id),
+            get_span_friendly_name(span_map["some-request"].span_id),
+        )
+
+        # We should see a cross-link in the request trace pointing to the background
+        # process trace.
+        #
+        # Make sure `start_bgproc.some-bg-task` is part of the request trace
+        self.assertEqual(
+            get_span_friendly_name(span_map["start_bgproc.some-bg-task"].parent_id),
+            get_span_friendly_name(span_map["some-request"].span_id),
+        )
+        # And has some references to the background process trace
+        self.assertIncludes(
+            {
+                f"{reference.type}:{get_span_friendly_name(reference.referenced_context.span_id)}"
+                if isinstance(reference.referenced_context, jaeger_client.SpanContext)
+                else f"{reference.type}:None"
+                for reference in (
+                    span_map["start_bgproc.some-bg-task"].references or []
+                )
+            },
+            {
+                f"follows_from:{get_span_friendly_name(span_map['bgproc.some-bg-task'].span_id)}"
+            },
+            exact=True,
+        )
+
+        # We should see a cross-link in the background process trace pointing to the
+        # request trace that kicked off the work.
+        #
+        # Make sure `bgproc_child.some-bg-task` is part of the background process trace
+        self.assertEqual(
+            get_span_friendly_name(span_map["bgproc_child.some-bg-task"].parent_id),
+            get_span_friendly_name(span_map["bgproc.some-bg-task"].span_id),
+        )
+        # And has some references to the request trace
+        self.assertIncludes(
+            {
+                f"{reference.type}:{get_span_friendly_name(reference.referenced_context.span_id)}"
+                if isinstance(reference.referenced_context, jaeger_client.SpanContext)
+                else f"{reference.type}:None"
+                for reference in (
+                    span_map["bgproc_child.some-bg-task"].references or []
+                )
+            },
+            {
+                f"follows_from:{get_span_friendly_name(span_map['some-request'].span_id)}"
+            },
+            exact=True,
+        )

@@ -1597,6 +1597,107 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
             exact=True,
         )

+    def test_rooms_required_state_expand_retract_expand_without_new_activity(
+        self,
+    ) -> None:
+        """
+        Test that when expanding, retracting and then expanding the required state, we
+        get the changes that happened, even without new activity in the room that would
+        otherwise send the room down the connection.
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a room with a room name.
+        room_id1 = self.helper.create_room_as(
+            user1_id, tok=user1_tok, extra_content={"name": "Foo"}
+        )
+
+        # Only request the create state event to begin with
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                    ],
+                    "timeline_limit": 1,
+                }
+            }
+        }
+        response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id1)
+        )
+
+        self._assertRequiredStateIncludes(
+            response_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[(EventTypes.Create, "")],
+            },
+            exact=True,
+        )
+
+        # Update the sliding sync request to include the room name
+        sync_body["lists"]["foo-list"]["required_state"] = [
+            [EventTypes.Create, ""],
+            [EventTypes.Name, ""],
+        ]
+        response_body, from_token = self.do_sync(
+            sync_body, since=from_token, tok=user1_tok
+        )
+
+        # We should see the room name, even though there haven't been any
+        # changes.
+        self._assertRequiredStateIncludes(
+            response_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[(EventTypes.Name, "")],
+            },
+            exact=True,
+        )
+
+        # Update the room name
+        self.helper.send_state(
+            room_id1, EventTypes.Name, {"name": "Bar"}, state_key="", tok=user1_tok
+        )
+
+        # Update the sliding sync request to exclude the room name again
+        sync_body["lists"]["foo-list"]["required_state"] = [
+            [EventTypes.Create, ""],
+        ]
+        response_body, from_token = self.do_sync(
+            sync_body, since=from_token, tok=user1_tok
+        )
+
+        # We should not see the updated room name in state (though it will be in
+        # the timeline).
+        self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
+
+        # Update the sliding sync request to include the room name again
+        sync_body["lists"]["foo-list"]["required_state"] = [
+            [EventTypes.Create, ""],
+            [EventTypes.Name, ""],
+        ]
+        response_body, from_token = self.do_sync(
+            sync_body, since=from_token, tok=user1_tok
+        )
+
+        # We should see the *new* room name, even though there haven't been any
+        # changes.
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id1)
+        )
+        self._assertRequiredStateIncludes(
+            response_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[(EventTypes.Name, "")],
+            },
+            exact=True,
+        )
+
     def test_rooms_required_state_expand_deduplicate(self) -> None:
         """Test that when expanding, retracting and then expanding the required
         state, we don't get the state down again if it hasn't changed"""
@@ -1686,3 +1787,77 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
         # We should not see the room name again, as we have already sent that
         # down.
         self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
+
+    def test_rooms_required_state_expand_with_same_pos(
+        self,
+    ) -> None:
+        """
+        Test that when expanding the required state, we get the changes that happened
+        even if we're using the same `pos`.
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a room with a room name.
+        room_id1 = self.helper.create_room_as(
+            user1_id, tok=user1_tok, extra_content={"name": "Foo"}
+        )
+
+        # Only request the create state event to begin with (initial sync)
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                    ],
+                    "timeline_limit": 1,
+                }
+            }
+        }
+        response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id1)
+        )
+
+        self._assertRequiredStateIncludes(
+            response_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[(EventTypes.Create, "")],
+            },
+            exact=True,
+        )
+
+        # Do an incremental sync using the `pos` token from the initial sync
+        response_body, from_token = self.do_sync(
+            sync_body, since=from_token, tok=user1_tok
+        )
+        self._assertRequiredStateIncludes(
+            response_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[(EventTypes.Create, "")],
+            },
+            exact=True,
+        )
+
+        # Update the sliding sync request to include the room name
+        sync_body["lists"]["foo-list"]["required_state"] = [
+            [EventTypes.Create, ""],
+            [EventTypes.Name, ""],
+        ]
+        # Make a request using the same `pos` token from the initial sync
+        response_body, from_token = self.do_sync(
+            sync_body, since=from_token, tok=user1_tok
+        )
+
+        # We should see the room name, even though there haven't been any
+        # changes.
+        self._assertRequiredStateIncludes(
+            response_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[(EventTypes.Name, "")],
+            },
+            exact=True,
+        )

tests/synapse_rust/__init__.py (new file, 11 lines):
@@ -0,0 +1,11 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.

tests/synapse_rust/test_http_client.py (new file, 225 lines):
@@ -0,0 +1,225 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.

import json
import logging
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Coroutine, Generator, TypeVar, Union

from twisted.internet.defer import Deferred, ensureDeferred
from twisted.internet.testing import MemoryReactor

from synapse.logging.context import (
    LoggingContext,
    PreserveLoggingContext,
    _Sentinel,
    current_context,
    run_in_background,
)
from synapse.server import HomeServer
from synapse.synapse_rust.http_client import HttpClient
from synapse.util.clock import Clock
from synapse.util.json import json_decoder

from tests.unittest import HomeserverTestCase

logger = logging.getLogger(__name__)

T = TypeVar("T")


class StubRequestHandler(BaseHTTPRequestHandler):
    server: "StubServer"

    def do_GET(self) -> None:
        self.server.calls += 1

        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps({"ok": True}).encode("utf-8"))

    def log_message(self, format: str, *args: Any) -> None:
        # Don't log anything; by default, the server logs to stderr
        pass


class StubServer(HTTPServer):
    """A stub HTTP server that we can send requests to for testing.

    This opens a real HTTP server on a random port, on a separate thread.
    """

    calls: int = 0
    """How many times the endpoint has been requested."""

    _thread: threading.Thread

    def __init__(self) -> None:
        super().__init__(("127.0.0.1", 0), StubRequestHandler)

        self._thread = threading.Thread(
            target=self.serve_forever,
            name="StubServer",
            kwargs={"poll_interval": 0.01},
            daemon=True,
        )
        self._thread.start()

    def shutdown(self) -> None:
        super().shutdown()
        self._thread.join()

    @property
    def endpoint(self) -> str:
        return f"http://127.0.0.1:{self.server_port}/"


class HttpClientTestCase(HomeserverTestCase):
    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
        hs = self.setup_test_homeserver()

        # XXX: We must create the Rust HTTP client before we call `reactor.run()` below.
        # Twisted's `MemoryReactor` doesn't invoke `callWhenRunning` callbacks if it's
        # already running, and we rely on that to start the Tokio thread pool in Rust. In
        # the future, this may not matter, see https://github.com/twisted/twisted/pull/12514
        self._http_client = hs.get_proxied_http_client()
        self._rust_http_client = HttpClient(
            reactor=hs.get_reactor(),
            user_agent=self._http_client.user_agent.decode("utf8"),
        )

        # This triggers the server startup hooks, which starts the Tokio thread pool
        reactor.run()

        return hs

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.server = StubServer()

    def tearDown(self) -> None:
        # MemoryReactor doesn't trigger the shutdown phases, and we want the
        # Tokio thread pool to be stopped
        # XXX: This logic should probably get moved somewhere else
        shutdown_triggers = self.reactor.triggers.get("shutdown", {})
        for phase in ["before", "during", "after"]:
            triggers = shutdown_triggers.get(phase, [])
            for callback, args, kwargs in triggers:
                callback(*args, **kwargs)

    def till_deferred_has_result(
        self,
        awaitable: Union[
            "Coroutine[Deferred[Any], Any, T]",
            "Generator[Deferred[Any], Any, T]",
            "Deferred[T]",
        ],
    ) -> "Deferred[T]":
        """Wait until a deferred has a result.

        This is useful because the Rust HTTP client resolves its deferreds
        using `reactor.callFromThread`, and those calls only run when we call
        `reactor.advance`.
        """
        deferred = ensureDeferred(awaitable)
        tries = 0
        while not deferred.called:
            time.sleep(0.1)
            self.reactor.advance(0)
            tries += 1
            if tries > 100:
                raise Exception("Timed out waiting for deferred to resolve")

        return deferred

    def _check_current_logcontext(self, expected_logcontext_string: str) -> None:
        context = current_context()
        assert isinstance(context, LoggingContext) or isinstance(context, _Sentinel), (
            f"Expected LoggingContext({expected_logcontext_string}) but saw {context}"
        )
        self.assertEqual(
            str(context),
            expected_logcontext_string,
            f"Expected LoggingContext({expected_logcontext_string}) but saw {context}",
        )

    def test_request_response(self) -> None:
        """
        Test to make sure we can make a basic request and get the expected
        response.
        """

        async def do_request() -> None:
            resp_body = await self._rust_http_client.get(
                url=self.server.endpoint,
                response_limit=1 * 1024 * 1024,
            )
            raw_response = json_decoder.decode(resp_body.decode("utf-8"))
            self.assertEqual(raw_response, {"ok": True})

        self.get_success(self.till_deferred_has_result(do_request()))
        self.assertEqual(self.server.calls, 1)

    async def test_logging_context(self) -> None:
        """
        Test to make sure the `LoggingContext` (logcontext) is handled correctly
        when making requests.
        """
        # Sanity check that we start in the sentinel context
        self._check_current_logcontext("sentinel")

        callback_finished = False

        async def do_request() -> None:
            nonlocal callback_finished
            try:
                # Should have the same logcontext as the caller
                self._check_current_logcontext("foo")

                with LoggingContext(name="competing", server_name="test_server"):
                    # Make the actual request
                    await self._rust_http_client.get(
                        url=self.server.endpoint,
                        response_limit=1 * 1024 * 1024,
                    )
                    self._check_current_logcontext("competing")

                # Back to the caller's context outside of the `LoggingContext` block
                self._check_current_logcontext("foo")
            finally:
                # When exceptions happen, we still want to mark the callback as finished
                # so that the test can complete and we see the underlying error.
                callback_finished = True

        with LoggingContext(name="foo", server_name="test_server"):
            # Fire off the function, but don't wait on it.
            run_in_background(do_request)

            # Now wait for the function under test to have run
            with PreserveLoggingContext():
                while not callback_finished:
                    # await self.hs.get_clock().sleep(0)
                    time.sleep(0.1)
                    self.reactor.advance(0)

            # check that the logcontext is left in a sane state.
            self._check_current_logcontext("foo")

        self.assertTrue(
            callback_finished,
            "Callback never finished which means the test probably didn't wait long enough",
        )

        # Back to the sentinel context
        self._check_current_logcontext("sentinel")