Compare commits

...

23 Commits

Author SHA1 Message Date
Till
bc2c5c1dc0
Merge ee9768c31fce76a3b46aa50a2fb3c51f52dd641e into 70c044db8efabacf3deaf8635d98c593b722541a 2025-10-02 13:23:26 -05:00
Eric Eastwood
70c044db8e
Remove deprecated LoggingContext.set_current_context/LoggingContext.current_context methods (#18989)
These were added for backwards compatibility (and essentially
deprecated) in https://github.com/matrix-org/synapse/pull/7408
(2020-05-04) because
[`synapse-s3-storage-provider`](https://github.com/matrix-org/synapse-s3-storage-provider)
previously relied on them -- but `synapse-s3-storage-provider` has since been
[updated](https://github.com/matrix-org/synapse-s3-storage-provider/pull/36)
to no longer use them.
2025-10-02 13:21:37 -05:00
Eric Eastwood
6835e7be0d
Wrap the Rust HTTP client with make_deferred_yieldable (#18903)
Wrap the Rust HTTP client with `make_deferred_yieldable` so downstream
usage doesn't need to use `PreserveLoggingContext()` or
`make_deferred_yieldable`.

> it seems like we should have some wrapper around it that uses
[`make_deferred_yieldable(...)`](40edb10a98/docs/log_contexts.md (where-you-create-a-new-awaitable-make-it-follow-the-rules))
to make things right so we don't have to do this in the downstream code.
>
> *-- @MadLittleMods,
https://github.com/element-hq/synapse/pull/18357#discussion_r2294941827*

Spawning from wanting to [remove `PreserveLoggingContext()` from the
codebase](https://github.com/element-hq/synapse/pull/18870) and thinking
that we [shouldn't have to pollute all downstream usage with
`PreserveLoggingContext()` or
`make_deferred_yieldable`](https://github.com/element-hq/synapse/pull/18357#discussion_r2294941827).

Part of https://github.com/element-hq/synapse/issues/18905 (Remove
`sentinel` logcontext where we log in Synapse)
2025-10-02 13:00:50 -05:00
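
The rule this wrapper enforces (park in the sentinel logcontext while the reactor owns the deferred, then restore the caller's logcontext when it fires) can be modelled with a short self-contained sketch. The `FakeDeferred` class, the string contexts, and `SENTINEL` below are illustrative stand-ins for Twisted's `Deferred` and Synapse's logcontexts, not the real APIs:

```python
# Simplified model of the logcontext rule that make_deferred_yieldable
# enforces. All names here are stand-ins for demonstration only.

SENTINEL = "sentinel"
_current = SENTINEL

def current_context():
    return _current

def set_current_context(ctx):
    global _current
    prev = _current
    _current = ctx
    return prev

class FakeDeferred:
    """Minimal stand-in for a Twisted Deferred: stores callbacks, fires once."""
    def __init__(self):
        self._callbacks = []
        self.called = False
        self._result = None
    def addBoth(self, cb, *args):
        if self.called:
            self._result = cb(self._result, *args)
        else:
            self._callbacks.append((cb, args))
        return self
    def callback(self, result):
        self.called = True
        self._result = result
        for cb, args in self._callbacks:
            self._result = cb(self._result, *args)

def make_deferred_yieldable(deferred):
    if deferred.called:
        # Already completed: no context juggling needed.
        return deferred
    # Park in the sentinel while the reactor runs; restore on completion.
    prev = set_current_context(SENTINEL)
    def restore(result, ctx):
        set_current_context(ctx)
        return result
    deferred.addBoth(restore, prev)
    return deferred

# A request context is active; we hand a pending deferred to the wrapper.
set_current_context("request-1")
d = make_deferred_yieldable(FakeDeferred())
assert current_context() == SENTINEL      # reactor time: sentinel is active
d.callback("response")
assert current_context() == "request-1"   # caller's context is restored
```

Wrapping at the source (here, inside the Rust HTTP client) means every `await` site downstream gets this save/restore behaviour for free, which is exactly why the `PreserveLoggingContext()` blocks in the auth code below become removable.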
Eric Eastwood
d27ff161f5
Add debug logs wherever we change current logcontext (#18966)
Add debug logs wherever we change current logcontext (`LoggingContext`).
I've had to make this same set of changes over and over as I've been
debugging things so it seems useful enough to include by default.

Instead of tracing things at the `set_current_context(...)` level, I've
added the debug logging on all of the utilities that utilize
`set_current_context(...)`. It's much easier to reason about the log
context changing because of `PreserveLoggingContext` changing things
than an opaque `set_current_context(...)` call.
2025-10-02 11:51:17 -05:00
Eric Eastwood
06a84f4fe0
Revert "Switch to OpenTracing's ContextVarsScopeManager (#18849)" (#19007)
Revert https://github.com/element-hq/synapse/pull/18849

Go back to our custom `LogContextScopeManager` after trying
OpenTracing's `ContextVarsScopeManager`.

Fix https://github.com/element-hq/synapse/issues/19004

### Why revert?

For reference, with the normal reactor, `ContextVarsScopeManager` worked
just as well as our custom `LogContextScopeManager` as far as I can tell
(and even better in some cases). But since Twisted does not appear to fully
support `ContextVar`s, it doesn't work as expected in all cases.
Compounding things, `ContextVarsScopeManager` was causing errors with
the experimental `SYNAPSE_ASYNC_IO_REACTOR` option.

Since we're not getting the full benefit that we originally desired, we
might as well revert and figure out alternatives for extending the
logcontext lifetimes to support the use case we were trying to unlock
(c.f. https://github.com/element-hq/synapse/pull/18804).

See
https://github.com/element-hq/synapse/issues/19004#issuecomment-3358052171
for more info.


### Does this require backporting and patch releases?

No. Since `ContextVarsScopeManager` works just as well with the
normal reactor and was only causing actual errors with the experimental
`SYNAPSE_ASYNC_IO_REACTOR` option, I don't think this requires us to
backport and make patch releases at all.



### Maintain cross-links between main trace and background process work

In order to maintain the functionality introduced in https://github.com/element-hq/synapse/pull/18932 (cross-links between the background process trace and currently active trace), we also needed a small change.

Previously, when we were using `ContextVarsScopeManager`, it tracked the tracing scope across the logcontext changes without issue. Now that we're using our own custom `LogContextScopeManager` again, we need to capture the active span from the logcontext before we reset to the sentinel context because of the `PreserveLoggingContext()` below.

Added some tests to ensure we maintain the `run_as_background` tracing behavior regardless of the tracing scope manager we use.
2025-10-02 11:27:26 -05:00
Eric Eastwood
1c093509ce
Switch task scheduler from raw logcontext manipulation (set_current_context) to utils (PreserveLoggingContext) (#18990)
Prefer the utils over raw logcontext manipulation.

Spawning from adding some logcontext debug logs in
https://github.com/element-hq/synapse/pull/18966 and since we're not
logging at the `set_current_context(...)` level (see reasoning there),
this removes some usage of `set_current_context(...)`.
2025-10-02 10:22:25 -05:00
Andrew Morgan
ee9768c31f Only validate device_keys fields when device keys are provided 2025-10-01 13:42:28 +01:00
Andrew Morgan
fc4e3f3e3c Extend unit tests to cover new user_id, device_id validation 2025-10-01 13:13:13 +01:00
Andrew Morgan
29fe51b293 Use body directly for this endpoint
There's little point in converting the pydantic model back to a dict when we already have it via the parsed body.
2025-10-01 13:13:02 +01:00
Andrew Morgan
a0c6243798 Merge branch 'develop' of github.com:element-hq/synapse into HEAD 2025-10-01 10:02:32 +01:00
Andrew Morgan
0eaf28fa92 Add further validation of key property format
Some extra validation of the request body.
2025-09-30 18:33:33 +01:00
Andrew Morgan
88bc4bb67e Add a regression unit test 2025-09-30 18:33:33 +01:00
Andrew Morgan
ca0c87c504 Move validation from the handler to the servlet 2025-09-30 18:33:33 +01:00
Andrew Morgan
0d4a08103c Remove redundant validation 2025-09-30 18:33:33 +01:00
Andrew Morgan
b61527b0d8 Validate requests to /keys/upload with pydantic
I really wanted to use a `UserID` instead of a `StrictStr` for the fields that contain a user ID. But this became too
cumbersome due to the handler code wanting to directly json-encode the request body. As `UserID` does not subclass
`str`, one would have to rebuild the entire containing object in order to json-encode it. Perhaps a future PR will do this,
as it would allow us to validate `UserID`s more easily at the edge.
2025-09-30 18:33:33 +01:00
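
The json-encoding snag mentioned in the commit message can be shown with a minimal stand-alone illustration. The `UserIDStr` and `UserIDWrapper` classes below are hypothetical, purely for demonstration: `json.dumps` serialises `str` subclasses for free, but rejects arbitrary wrapper types, which is why a non-`str` `UserID` would force rebuilding the containing object:

```python
# Why a UserID type that subclasses str json-encodes transparently,
# while a wrapper type does not. Both classes are toy examples.
import json

class UserIDStr(str):
    """A user ID that subclasses str: json-encodes transparently."""

class UserIDWrapper:
    """A user ID that wraps a str: json.dumps rejects it."""
    def __init__(self, value):
        self.value = value

body = {"user_id": UserIDStr("@alice:example.com")}
print(json.dumps(body))  # works: the str subclass serialises as a plain string

try:
    json.dumps({"user_id": UserIDWrapper("@alice:example.com")})
except TypeError as e:
    # Without a custom encoder, the containing object would have to be
    # rebuilt with plain strings before encoding -- the cumbersome part.
    print("TypeError:", e)
```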
Andrew Morgan
34d6eba694 Merge branch 'develop' of github.com:element-hq/synapse into HEAD 2025-09-18 15:16:44 +01:00
Till Faelligen
938536186c
Ensure that uploaded keys are dicts 2024-11-22 09:25:11 +01:00
Till Faelligen
9c2d8fd6dd
Merge branch 'develop' of github.com:element-hq/synapse into s7evink/validate-upload-keys-dict 2024-11-22 09:10:31 +01:00
Till Faelligen
75a45e9ce6
Merge branch 'develop' of github.com:element-hq/synapse into s7evink/validate-upload-keys-dict 2024-10-01 11:39:01 +02:00
Till Faelligen
f4c17c5a38
Merge branch 'develop' of github.com:element-hq/synapse into s7evink/validate-upload-keys-dict 2024-09-16 14:10:50 +02:00
Till Faelligen
9d2cd9fe63
Add newsfile 2024-04-16 17:07:14 +02:00
Till Faelligen
67d516d2a4
Run the linters again after changing the file 2024-04-16 17:01:48 +02:00
Till Faelligen
3c0c30ad7d
Ensure that uploaded keys are dicts 2024-04-16 16:56:52 +02:00
22 changed files with 1107 additions and 236 deletions

1
changelog.d/17097.misc Normal file
View File

@ -0,0 +1 @@
Extend validation of uploaded device keys.

1
changelog.d/18903.misc Normal file
View File

@ -0,0 +1 @@
Wrap the Rust HTTP client with `make_deferred_yieldable` so it follows Synapse logcontext rules.

1
changelog.d/18966.misc Normal file
View File

@ -0,0 +1 @@
Add debug logs wherever we change current logcontext.

View File

@ -0,0 +1 @@
Remove deprecated `LoggingContext.set_current_context`/`LoggingContext.current_context` methods which already have equivalent bare methods in `synapse.logging.context`.

1
changelog.d/18990.misc Normal file
View File

@ -0,0 +1 @@
Switch task scheduler from raw logcontext manipulation to using the dedicated logcontext utils.

1
changelog.d/19007.misc Normal file
View File

@ -0,0 +1 @@
Switch back to our own custom `LogContextScopeManager` instead of OpenTracing's `ContextVarsScopeManager` which was causing problems when using the experimental `SYNAPSE_ASYNC_IO_REACTOR` option with tracing enabled.

View File

@ -548,3 +548,19 @@ chain are dropped. Dropping the reference to an awaitable you're
supposed to be awaiting is bad practice, so this doesn't
actually happen too much. Unfortunately, when it does happen, it will
lead to leaked logcontexts which are incredibly hard to track down.
## Debugging logcontext issues
Debugging logcontext issues can be tricky as leaking or losing a logcontext will surface
downstream and can point to an unrelated part of the codebase. It's best to enable debug
logging for `synapse.logging.context.debug` (it needs to be explicitly configured) and work
backwards in the logs from the point where the issue is observed to find the root cause.
`log.config.yaml`
```yaml
loggers:
  # Unlike other loggers, this one needs to be explicitly configured to see debug logs.
  synapse.logging.context.debug:
    level: DEBUG
```
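
The opt-in behaviour can be approximated with a short sketch. This models the idea behind Synapse's `ExplicitlyConfiguredLogger` (stay silent unless the logger's own level is set, rather than inheriting an effective level from parent loggers) and is not the real implementation:

```python
# Minimal model of an "explicitly configured" logger: the debug logger
# stays silent even when a parent logger is at DEBUG, until its own
# level is set. This approximates Synapse's behaviour; it is a sketch.
import logging

class ExplicitlyConfiguredLogger(logging.Logger):
    def isEnabledFor(self, level):
        # NOTSET means "nobody configured me": stay silent instead of
        # inheriting the effective level from parent loggers.
        if self.level == logging.NOTSET:
            return False
        return level >= self.level

logging.setLoggerClass(ExplicitlyConfiguredLogger)
debug_logger = logging.getLogger("synapse.logging.context.debug")
logging.setLoggerClass(logging.Logger)  # restore the default for other loggers

logging.getLogger("synapse").setLevel(logging.DEBUG)
assert not debug_logger.isEnabledFor(logging.DEBUG)  # parent DEBUG is ignored

debug_logger.setLevel(logging.DEBUG)  # the explicit opt-in
assert debug_logger.isEnabledFor(logging.DEBUG)
```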

View File

@ -12,7 +12,7 @@
* <https://www.gnu.org/licenses/agpl-3.0.html>.
*/
use std::{collections::HashMap, future::Future};
use std::{collections::HashMap, future::Future, sync::OnceLock};
use anyhow::Context;
use futures::TryStreamExt;
@ -299,5 +299,22 @@ where
});
});
Ok(deferred)
// Make the deferred follow the Synapse logcontext rules
make_deferred_yieldable(py, &deferred)
}
static MAKE_DEFERRED_YIELDABLE: OnceLock<pyo3::Py<pyo3::PyAny>> = OnceLock::new();
/// Given a deferred, make it follow the Synapse logcontext rules
fn make_deferred_yieldable<'py>(
py: Python<'py>,
deferred: &Bound<'py, PyAny>,
) -> PyResult<Bound<'py, PyAny>> {
let make_deferred_yieldable = MAKE_DEFERRED_YIELDABLE.get_or_init(|| {
let sys = PyModule::import(py, "synapse.logging.context").unwrap();
let func = sys.getattr("make_deferred_yieldable").unwrap().unbind();
func
});
make_deferred_yieldable.call1(py, (deferred,))?.extract(py)
}

View File

@ -33,7 +33,6 @@ from synapse.api.errors import (
UnrecognizedRequestError,
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import (
active_span,
force_tracing,
@ -229,13 +228,12 @@ class MasDelegatedAuth(BaseAuth):
try:
with start_active_span("mas-introspect-token"):
inject_request_headers(raw_headers)
with PreserveLoggingContext():
resp_body = await self._rust_http_client.post(
url=self._introspection_endpoint,
response_limit=1 * 1024 * 1024,
headers=raw_headers,
request_body=body,
)
resp_body = await self._rust_http_client.post(
url=self._introspection_endpoint,
response_limit=1 * 1024 * 1024,
headers=raw_headers,
request_body=body,
)
except HttpResponseException as e:
end_time = self._clock.time()
introspection_response_timer.labels(

View File

@ -38,7 +38,6 @@ from synapse.api.errors import (
UnrecognizedRequestError,
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import (
active_span,
force_tracing,
@ -327,13 +326,12 @@ class MSC3861DelegatedAuth(BaseAuth):
try:
with start_active_span("mas-introspect-token"):
inject_request_headers(raw_headers)
with PreserveLoggingContext():
resp_body = await self._rust_http_client.post(
url=uri,
response_limit=1 * 1024 * 1024,
headers=raw_headers,
request_body=body,
)
resp_body = await self._rust_http_client.post(
url=uri,
response_limit=1 * 1024 * 1024,
headers=raw_headers,
request_body=body,
)
except HttpResponseException as e:
end_time = self._clock.time()
introspection_response_timer.labels(

View File

@ -57,7 +57,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
ONE_TIME_KEY_UPLOAD = "one_time_key_upload_lock"
@ -847,14 +846,22 @@ class E2eKeysHandler:
"""
time_now = self.clock.time_msec()
# TODO: Validate the JSON to make sure it has the right keys.
device_keys = keys.get("device_keys", None)
if device_keys:
log_kv(
{
"message": "Updating device_keys for user.",
"user_id": user_id,
"device_id": device_id,
}
)
await self.upload_device_keys_for_user(
user_id=user_id,
device_id=device_id,
keys={"device_keys": device_keys},
)
else:
log_kv({"message": "Did not update device_keys", "reason": "not a dict"})
one_time_keys = keys.get("one_time_keys", None)
if one_time_keys:
@ -872,8 +879,9 @@ class E2eKeysHandler:
log_kv(
{"message": "Did not update one_time_keys", "reason": "no keys given"}
)
fallback_keys = keys.get("fallback_keys")
if fallback_keys and isinstance(fallback_keys, dict):
if fallback_keys:
log_kv(
{
"message": "Updating fallback_keys for device.",
@ -882,8 +890,6 @@ class E2eKeysHandler:
}
)
await self.store.set_e2e_fallback_keys(user_id, device_id, fallback_keys)
elif fallback_keys:
log_kv({"message": "Did not update fallback_keys", "reason": "not a dict"})
else:
log_kv(
{"message": "Did not update fallback_keys", "reason": "no keys given"}

View File

@ -33,7 +33,6 @@ See doc/log_contexts.rst for details on how this works.
import logging
import threading
import typing
import warnings
from types import TracebackType
from typing import (
TYPE_CHECKING,
@ -55,11 +54,29 @@ from typing_extensions import ParamSpec
from twisted.internet import defer, threads
from twisted.python.threadpool import ThreadPool
from synapse.logging.loggers import ExplicitlyConfiguredLogger
from synapse.util.stringutils import random_string
if TYPE_CHECKING:
from synapse.logging.scopecontextmanager import _LogContextScope
from synapse.types import ISynapseReactor
logger = logging.getLogger(__name__)
original_logger_class = logging.getLoggerClass()
logging.setLoggerClass(ExplicitlyConfiguredLogger)
logcontext_debug_logger = logging.getLogger("synapse.logging.context.debug")
"""
A logger for debugging when the logcontext switches.
Because this is very noisy and probably something only developers want to see when
debugging logcontext problems, we want people to explicitly opt in before seeing anything
in the logs. Requires explicitly setting `synapse.logging.context.debug` in the logging
configuration and does not inherit the log level from the parent logger.
"""
# Restore the original logger class
logging.setLoggerClass(original_logger_class)
try:
import resource
@ -238,7 +255,14 @@ class _Sentinel:
we should always know which server the logs are coming from.
"""
__slots__ = ["previous_context", "finished", "server_name", "request", "tag"]
__slots__ = [
"previous_context",
"finished",
"scope",
"server_name",
"request",
"tag",
]
def __init__(self) -> None:
# Minimal set for compatibility with LoggingContext
@ -246,6 +270,7 @@ class _Sentinel:
self.finished = False
self.server_name = "unknown_server_from_sentinel_context"
self.request = None
self.scope = None
self.tag = None
def __str__(self) -> str:
@ -303,6 +328,7 @@ class LoggingContext:
"finished",
"request",
"tag",
"scope",
]
def __init__(
@ -327,6 +353,7 @@ class LoggingContext:
self.main_thread = get_thread_id()
self.request = None
self.tag = ""
self.scope: Optional["_LogContextScope"] = None
# keep track of whether we have hit the __exit__ block for this context
# (suggesting that the thing that created the context thinks it should
@ -340,6 +367,9 @@ class LoggingContext:
# which request this corresponds to
self.request = self.parent_context.request
# we also track the current scope:
self.scope = self.parent_context.scope
if request is not None:
# the request param overrides the request from the parent context
self.request = request
@ -347,49 +377,9 @@ class LoggingContext:
def __str__(self) -> str:
return self.name
@classmethod
def current_context(cls) -> LoggingContextOrSentinel:
"""Get the current logging context from thread local storage
This exists for backwards compatibility. ``current_context()`` should be
called directly.
Returns:
The current logging context
"""
warnings.warn(
"synapse.logging.context.LoggingContext.current_context() is deprecated "
"in favor of synapse.logging.context.current_context().",
DeprecationWarning,
stacklevel=2,
)
return current_context()
@classmethod
def set_current_context(
cls, context: LoggingContextOrSentinel
) -> LoggingContextOrSentinel:
"""Set the current logging context in thread local storage
This exists for backwards compatibility. ``set_current_context()`` should be
called directly.
Args:
context: The context to activate.
Returns:
The context that was previously active
"""
warnings.warn(
"synapse.logging.context.LoggingContext.set_current_context() is deprecated "
"in favor of synapse.logging.context.set_current_context().",
DeprecationWarning,
stacklevel=2,
)
return set_current_context(context)
def __enter__(self) -> "LoggingContext":
"""Enters this logging context into thread local storage"""
logcontext_debug_logger.debug("LoggingContext(%s).__enter__", self.name)
old_context = set_current_context(self)
if self.previous_context != old_context:
logcontext_error(
@ -412,6 +402,9 @@ class LoggingContext:
Returns:
None to avoid suppressing any exceptions that were thrown.
"""
logcontext_debug_logger.debug(
"LoggingContext(%s).__exit__ --> %s", self.name, self.previous_context
)
current = set_current_context(self.previous_context)
if current is not self:
if current is SENTINEL_CONTEXT:
@ -660,14 +653,21 @@ class PreserveLoggingContext:
reactor back to the code).
"""
__slots__ = ["_old_context", "_new_context"]
__slots__ = ["_old_context", "_new_context", "_instance_id"]
def __init__(
self, new_context: LoggingContextOrSentinel = SENTINEL_CONTEXT
) -> None:
self._new_context = new_context
self._instance_id = random_string(5)
def __enter__(self) -> None:
logcontext_debug_logger.debug(
"PreserveLoggingContext(%s).__enter__ %s --> %s",
self._instance_id,
current_context(),
self._new_context,
)
self._old_context = set_current_context(self._new_context)
def __exit__(
@ -676,6 +676,12 @@ class PreserveLoggingContext:
value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
logcontext_debug_logger.debug(
"PreserveLoggingContext(%s).__exit__ %s --> %s",
self._instance_id,
current_context(),
self._old_context,
)
context = set_current_context(self._old_context)
if context != self._new_context:
@ -855,7 +861,11 @@ def run_in_background(
Note that the returned Deferred does not follow the synapse logcontext
rules.
"""
instance_id = random_string(5)
calling_context = current_context()
logcontext_debug_logger.debug(
"run_in_background(%s): called with logcontext=%s", instance_id, calling_context
)
try:
# (kick off the task in the current context)
res = f(*args, **kwargs)
@ -897,6 +907,11 @@ def run_in_background(
# to reset the logcontext to the sentinel logcontext as that would run
# immediately (remember our goal is to maintain the calling logcontext when we
# return).
logcontext_debug_logger.debug(
"run_in_background(%s): deferred already completed and the function should have maintained the logcontext %s",
instance_id,
calling_context,
)
return d
# Since the function we called may follow the Synapse logcontext rules (Rules for
@ -907,6 +922,11 @@ def run_in_background(
#
# Our goal is to have the caller logcontext unchanged after firing off the
# background task and returning.
logcontext_debug_logger.debug(
"run_in_background(%s): restoring calling logcontext %s",
instance_id,
calling_context,
)
set_current_context(calling_context)
# If the function we called is playing nice and following the Synapse logcontext
@ -922,7 +942,23 @@ def run_in_background(
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, SENTINEL_CONTEXT)
if logcontext_debug_logger.isEnabledFor(logging.DEBUG):
def _log_set_context_cb(
result: ResultT, context: LoggingContextOrSentinel
) -> ResultT:
logcontext_debug_logger.debug(
"run_in_background(%s): resetting logcontext to %s",
instance_id,
context,
)
set_current_context(context)
return result
d.addBoth(_log_set_context_cb, SENTINEL_CONTEXT)
else:
d.addBoth(_set_context_cb, SENTINEL_CONTEXT)
return d
@ -978,10 +1014,21 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]
restores the old context once the awaitable completes (execution passes from the
reactor back to the code).
"""
instance_id = random_string(5)
logcontext_debug_logger.debug(
"make_deferred_yieldable(%s): called with logcontext=%s",
instance_id,
current_context(),
)
# The deferred has already completed
if deferred.called and not deferred.paused:
# it looks like this deferred is ready to run any callbacks we give it
# immediately. We may as well optimise out the logcontext faffery.
logcontext_debug_logger.debug(
"make_deferred_yieldable(%s): deferred already completed and the function should have maintained the logcontext",
instance_id,
)
return deferred
# Our goal is to have the caller logcontext unchanged after they yield/await the
@ -993,8 +1040,31 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]
# does) while the deferred runs in the reactor event loop, we reset the logcontext
# and add a callback to the deferred to restore it so the caller's logcontext is
# active when the deferred completes.
prev_context = set_current_context(SENTINEL_CONTEXT)
deferred.addBoth(_set_context_cb, prev_context)
logcontext_debug_logger.debug(
"make_deferred_yieldable(%s): resetting logcontext to %s",
instance_id,
SENTINEL_CONTEXT,
)
calling_context = set_current_context(SENTINEL_CONTEXT)
if logcontext_debug_logger.isEnabledFor(logging.DEBUG):
def _log_set_context_cb(
result: ResultT, context: LoggingContextOrSentinel
) -> ResultT:
logcontext_debug_logger.debug(
"make_deferred_yieldable(%s): restoring calling logcontext to %s",
instance_id,
context,
)
set_current_context(context)
return result
deferred.addBoth(_log_set_context_cb, calling_context)
else:
deferred.addBoth(_set_context_cb, calling_context)
return deferred

View File

@ -251,17 +251,18 @@ class _DummyTagNames:
try:
import opentracing
import opentracing.tags
from opentracing.scope_managers.contextvars import ContextVarsScopeManager
tags = opentracing.tags
except ImportError:
opentracing = None # type: ignore[assignment]
tags = _DummyTagNames # type: ignore[assignment]
ContextVarsScopeManager = None # type: ignore
try:
from jaeger_client import Config as JaegerConfig
from synapse.logging.scopecontextmanager import LogContextScopeManager
except ImportError:
JaegerConfig = None # type: ignore
LogContextScopeManager = None # type: ignore
try:
@ -483,7 +484,7 @@ def init_tracer(hs: "HomeServer") -> None:
config = JaegerConfig(
config=jaeger_config,
service_name=f"{hs.config.server.server_name} {instance_name_by_type}",
scope_manager=ContextVarsScopeManager(),
scope_manager=LogContextScopeManager(),
metrics_factory=PrometheusMetricsFactory(),
)
@ -683,9 +684,21 @@ def start_active_span_from_edu(
# Opentracing setters for tags, logs, etc
@only_if_tracing
def active_span() -> Optional["opentracing.Span"]:
"""Get the currently active span, if any"""
return opentracing.tracer.active_span
def active_span(
*,
tracer: Optional["opentracing.Tracer"] = None,
) -> Optional["opentracing.Span"]:
"""
Get the currently active span, if any
Args:
tracer: override the opentracing tracer. By default the global tracer is used.
"""
if tracer is None:
# use the global tracer by default
tracer = opentracing.tracer
return tracer.active_span
@ensure_active_span("set a tag")

View File

@ -0,0 +1,161 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging
from typing import Optional
from opentracing import Scope, ScopeManager, Span
from synapse.logging.context import (
LoggingContext,
current_context,
nested_logging_context,
)
logger = logging.getLogger(__name__)
class LogContextScopeManager(ScopeManager):
"""
The LogContextScopeManager tracks the active scope in opentracing
by using the log contexts which are native to Synapse. This is so
that the basic opentracing API can be used across Twisted Deferreds.
It would be nice just to use opentracing's ContextVarsScopeManager,
but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301.
"""
def __init__(self) -> None:
pass
@property
def active(self) -> Optional[Scope]:
"""
Returns the currently active Scope which can be used to access the
currently active Scope.span.
If there is a non-null Scope, its wrapped Span
becomes an implicit parent of any newly-created Span at
Tracer.start_active_span() time.
Return:
The Scope that is active, or None if not available.
"""
ctx = current_context()
return ctx.scope
def activate(self, span: Span, finish_on_close: bool) -> Scope:
"""
Makes a Span active.
Args:
span: the span that should become active.
finish_on_close: whether Span should be automatically finished when
Scope.close() is called.
Returns:
Scope to control the end of the active period for
*span*. It is a programming error to neglect to call
Scope.close() on the returned instance.
"""
ctx = current_context()
if not ctx:
logger.error("Tried to activate scope outside of loggingcontext")
return Scope(None, span) # type: ignore[arg-type]
if ctx.scope is not None:
# start a new logging context as a child of the existing one.
# Doing so -- rather than updating the existing logcontext -- means that
# creating several concurrent spans under the same logcontext works
# correctly.
ctx = nested_logging_context("")
enter_logcontext = True
else:
# if there is no span currently associated with the current logcontext, we
# just store the scope in it.
#
# This feels a bit dubious, but it does hack around a problem where a
# span outlasts its parent logcontext (which would otherwise lead to
# "Re-starting finished log context" errors).
enter_logcontext = False
scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
ctx.scope = scope
if enter_logcontext:
ctx.__enter__()
return scope
class _LogContextScope(Scope):
"""
A custom opentracing scope, associated with a LogContext
* When the scope is closed, the logcontext's active scope is reset to None,
and, if enter_logcontext was set, the logcontext is finished too.
"""
def __init__(
self,
manager: LogContextScopeManager,
span: Span,
logcontext: LoggingContext,
enter_logcontext: bool,
finish_on_close: bool,
):
"""
Args:
manager:
the manager that is responsible for this scope.
span:
the opentracing span which this scope represents the local
lifetime for.
logcontext:
the log context to which this scope is attached.
enter_logcontext:
if True the log context will be exited when the scope is finished
finish_on_close:
if True finish the span when the scope is closed
"""
super().__init__(manager, span)
self.logcontext = logcontext
self._finish_on_close = finish_on_close
self._enter_logcontext = enter_logcontext
def __str__(self) -> str:
return f"Scope<{self.span}>"
def close(self) -> None:
active_scope = self.manager.active
if active_scope is not self:
logger.error(
"Closing scope %s which is not the currently-active one %s",
self,
active_scope,
)
if self._finish_on_close:
self.span.finish()
self.logcontext.scope = None
if self._enter_logcontext:
self.logcontext.__exit__(None, None, None)

View File

@ -68,6 +68,11 @@ if TYPE_CHECKING:
from synapse.server import HomeServer
try:
import opentracing
except ImportError:
opentracing = None # type: ignore[assignment]
logger = logging.getLogger(__name__)
@ -225,6 +230,7 @@ def run_as_background_process(
func: Callable[..., Awaitable[Optional[R]]],
*args: Any,
bg_start_span: bool = True,
test_only_tracer: Optional["opentracing.Tracer"] = None,
**kwargs: Any,
) -> "defer.Deferred[Optional[R]]":
"""Run the given function in its own logcontext, with resource metrics
@ -250,6 +256,8 @@ def run_as_background_process(
bg_start_span: Whether to start an opentracing span. Defaults to True.
Should only be disabled for processes that will not log to or tag
a span.
test_only_tracer: Set the OpenTracing tracer to use. This is only useful for
tests.
args: positional args for func
kwargs: keyword args for func
@ -259,6 +267,12 @@ def run_as_background_process(
rules.
"""
# Since we track the tracing scope in the `LoggingContext`, before we move to the
# sentinel logcontext (or a new `LoggingContext`), grab the currently active
# tracing span (if any) so that we can create a cross-link to the background process
# trace.
original_active_tracing_span = active_span(tracer=test_only_tracer)
async def run() -> Optional[R]:
with _bg_metrics_lock:
count = _background_process_counts.get(desc, 0)
@ -276,8 +290,6 @@ def run_as_background_process(
) as logging_context:
try:
if bg_start_span:
original_active_tracing_span = active_span()
# If there is already an active span (e.g. because this background
# process was started as part of handling a request for example),
# because this is a long-running background task that may serve a
@ -308,6 +320,7 @@ def run_as_background_process(
# Create a root span for the background process (disconnected
# from other spans)
ignore_active_span=True,
tracer=test_only_tracer,
)
# Also add a span in the original request trace that cross-links
@ -324,8 +337,11 @@ def run_as_background_process(
f"start_bgproc.{desc}",
child_of=original_active_tracing_span,
ignore_active_span=True,
# Points to the background process span.
# Create the `FOLLOWS_FROM` reference to the background
# process span so there is a loose coupling between the two
# traces and it's easy to jump between.
contexts=[root_tracing_scope.span.context],
tracer=test_only_tracer,
):
pass
@ -341,6 +357,7 @@ def run_as_background_process(
# span so there is a loose coupling between the two
# traces and it's easy to jump between.
contexts=[original_active_tracing_span.context],
tracer=test_only_tracer,
)
# For easy usage down below, we create a context manager that
@ -359,6 +376,7 @@ def run_as_background_process(
tracing_scope = start_active_span(
f"bgproc.{desc}",
tags={SynapseTags.REQUEST_ID: str(logging_context)},
tracer=test_only_tracer,
)
else:
tracing_scope = nullcontext()

View File

@ -23,10 +23,19 @@
import logging
import re
from collections import Counter
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union
from typing_extensions import Self
from synapse._pydantic_compat import (
StrictBool,
StrictStr,
validator,
)
from synapse.api.auth.mas import MasDelegatedAuth
from synapse.api.errors import (
Codes,
InteractiveAuthIncompleteError,
InvalidAPICallError,
SynapseError,
@ -37,11 +46,13 @@ from synapse.http.servlet import (
parse_integer,
parse_json_object_from_request,
parse_string,
validate_json_object,
)
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import log_kv, set_tag
from synapse.rest.client._base import client_patterns, interactive_auth_handler
from synapse.types import JsonDict, StreamToken
from synapse.types.rest import RequestBodyModel
from synapse.util.cancellation import cancellable
if TYPE_CHECKING:
@@ -59,7 +70,6 @@ class KeyUploadServlet(RestServlet):
"device_keys": {
"user_id": "<user_id>",
"device_id": "<device_id>",
"valid_until_ts": <millisecond_timestamp>,
"algorithms": [
"m.olm.curve25519-aes-sha2",
]
@@ -111,12 +121,123 @@ class KeyUploadServlet(RestServlet):
self._clock = hs.get_clock()
self._store = hs.get_datastores().main
class KeyUploadRequestBody(RequestBodyModel):
"""
The body of a `POST /_matrix/client/v3/keys/upload` request.
Based on https://spec.matrix.org/v1.16/client-server-api/#post_matrixclientv3keysupload.
"""
class DeviceKeys(RequestBodyModel):
algorithms: List[StrictStr]
"""The encryption algorithms supported by this device."""
device_id: StrictStr
"""The ID of the device these keys belong to. Must match the device ID used when logging in."""
keys: Mapping[StrictStr, StrictStr]
"""
Public identity keys. The names of the properties should be in the
format `<algorithm>:<device_id>`. The keys themselves should be encoded as
specified by the key algorithm.
"""
signatures: Mapping[StrictStr, Mapping[StrictStr, StrictStr]]
"""Signatures for the device key object. A map from user ID, to a map from "<algorithm>:<device_id>" to the signature."""
user_id: StrictStr
"""The ID of the user the device belongs to. Must match the user ID used when logging in."""
class KeyObject(RequestBodyModel):
key: StrictStr
"""The key, encoded using unpadded base64."""
fallback: Optional[StrictBool] = False
"""Whether this is a fallback key. Only used when handling fallback keys."""
signatures: Mapping[StrictStr, Mapping[StrictStr, StrictStr]]
"""Signature for the device. Mapped from user ID to another map of key signing identifier to the signature itself.
See the following for more detail: https://spec.matrix.org/v1.16/appendices/#signing-details
"""
device_keys: Optional[DeviceKeys] = None
"""Identity keys for the device. May be absent if no new identity keys are required."""
fallback_keys: Optional[Mapping[StrictStr, Union[StrictStr, KeyObject]]]
"""
The public key which should be used if the device's one-time keys are
exhausted. The fallback key is not deleted once used, but should be
replaced when additional one-time keys are being uploaded. The server
will notify the client of the fallback key being used through `/sync`.
There can only be at most one key per algorithm uploaded, and the server
will only persist one key per algorithm.
When uploading a signed key, an additional fallback: true key should be
included to denote that the key is a fallback key.
May be absent if a new fallback key is not required.
"""
@validator("fallback_keys", pre=True)
def validate_fallback_keys(cls: Self, v: Any) -> Any:
if v is None:
return v
if not isinstance(v, dict):
raise TypeError("fallback_keys must be a mapping")
for k, _ in v.items():
if not len(k.split(":")) == 2:
raise SynapseError(
code=HTTPStatus.BAD_REQUEST,
errcode=Codes.BAD_JSON,
msg=f"Invalid fallback_keys key {k!r}. "
'Expected "<algorithm>:<device_id>".',
)
return v
one_time_keys: Optional[Mapping[StrictStr, Union[StrictStr, KeyObject]]] = None
"""
One-time public keys for pre-key messages. The names of the properties
should be in the format `<algorithm>:<key_id>`.
The format of the key is determined by the key algorithm, see:
https://spec.matrix.org/v1.16/client-server-api/#key-algorithms.
"""
@validator("one_time_keys", pre=True)
def validate_one_time_keys(cls: Self, v: Any) -> Any:
if v is None:
return v
if not isinstance(v, dict):
raise TypeError("one_time_keys must be a mapping")
for k, _ in v.items():
if not len(k.split(":")) == 2:
raise SynapseError(
code=HTTPStatus.BAD_REQUEST,
errcode=Codes.BAD_JSON,
msg=f"Invalid one_time_keys key {k!r}. "
'Expected "<algorithm>:<key_id>".',
)
return v
async def on_POST(
self, request: SynapseRequest, device_id: Optional[str]
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)
user_id = requester.user.to_string()
# Parse the request body. Validate separately, as the handler expects a
# plain dict, rather than any parsed object.
#
# Note: It would be nice to work with a parsed object, but the handler
# needs to encode portions of the request body as canonical JSON before
# storing the result in the DB. There's little point in converting to a
# parsed object and then back to a dict.
body = parse_json_object_from_request(request)
validate_json_object(body, self.KeyUploadRequestBody)
if device_id is not None:
# Providing the device_id should only be done for setting keys
@@ -149,8 +270,31 @@ class KeyUploadServlet(RestServlet):
400, "To upload keys, you must pass device_id when authenticating"
)
if "device_keys" in body:
# Validate the provided `user_id` and `device_id` fields in
# `device_keys` match that of the requesting user. We can't do
# this directly in the pydantic model as we don't have access
# to the requester yet.
#
# TODO: We could use ValidationInfo when we switch to Pydantic v2.
# https://docs.pydantic.dev/latest/concepts/validators/#validation-info
if body["device_keys"]["user_id"] != user_id:
raise SynapseError(
code=HTTPStatus.BAD_REQUEST,
errcode=Codes.BAD_JSON,
msg="Provided `user_id` in `device_keys` does not match that of the authenticated user",
)
if body["device_keys"]["device_id"] != device_id:
raise SynapseError(
code=HTTPStatus.BAD_REQUEST,
errcode=Codes.BAD_JSON,
msg="Provided `device_id` in `device_keys` does not match that of the authenticated user device",
)
result = await self.e2e_keys_handler.upload_keys_for_user(
user_id=user_id, device_id=device_id, keys=body
user_id=user_id,
device_id=device_id,
keys=body,
)
return 200, result
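The validators above reject any `one_time_keys`/`fallback_keys` property whose name doesn't split into exactly two `:`-separated parts. That check can be sketched as a plain standalone function (a hypothetical `validate_key_ids` helper mirroring the diff's `len(k.split(":")) == 2` test, not the pydantic validator itself):

```python
from typing import Mapping, Any

def validate_key_ids(keys: Mapping[str, Any]) -> None:
    """Reject any property name not of the form `<algorithm>:<key_id>`,
    matching the shape check used by the pydantic validators above."""
    for name in keys:
        if len(name.split(":")) != 2:
            raise ValueError(
                f'Invalid key {name!r}. Expected "<algorithm>:<key_id>".'
            )

validate_key_ids({"signed_curve25519:AAAAHQ": "base64+key"})  # passes
try:
    validate_key_ids({"invalid_key": "base64+key"})
except ValueError as e:
    print(e)
```

Note this only checks the property-name shape; the key payloads themselves are validated separately.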


@@ -17,6 +17,10 @@ from twisted.internet.defer import Deferred
from synapse.types import ISynapseReactor
class HttpClient:
"""
The returned deferreds follow Synapse logcontext rules.
"""
def __init__(self, reactor: ISynapseReactor, user_agent: str) -> None: ...
def get(self, url: str, response_limit: int) -> Deferred[bytes]: ...
def post(


@@ -27,8 +27,8 @@ from twisted.python.failure import Failure
from synapse.logging.context import (
ContextResourceUsage,
LoggingContext,
PreserveLoggingContext,
nested_logging_context,
set_current_context,
)
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.metrics.background_process_metrics import (
@@ -422,14 +422,11 @@ class TaskScheduler:
"""
current_time = self._clock.time()
calling_context = set_current_context(task_log_context)
try:
with PreserveLoggingContext(task_log_context):
usage = task_log_context.get_resource_usage()
TaskScheduler._log_task_usage(
"continuing", task, usage, current_time - start_time
)
finally:
set_current_context(calling_context)
async def wrapper() -> None:
with nested_logging_context(task.id) as log_context:


@@ -19,7 +19,7 @@
#
#
from typing import Awaitable, Dict, cast
from typing import Awaitable, Optional, cast
from twisted.internet import defer
from twisted.internet.testing import MemoryReactorClock
@@ -35,20 +35,25 @@ from synapse.logging.opentracing import (
tag_args,
trace_with_opname,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.clock import Clock
try:
import opentracing
from opentracing.scope_managers.contextvars import ContextVarsScopeManager
except ImportError:
opentracing = None # type: ignore
ContextVarsScopeManager = None # type: ignore
from tests.server import get_clock
try:
import jaeger_client
except ImportError:
jaeger_client = None # type: ignore
try:
import opentracing
from synapse.logging.scopecontextmanager import LogContextScopeManager
except ImportError:
opentracing = None # type: ignore
LogContextScopeManager = None # type: ignore
import logging
from tests.unittest import TestCase
@@ -56,7 +61,7 @@ from tests.unittest import TestCase
logger = logging.getLogger(__name__)
class TracingScopeTestCase(TestCase):
class LogContextScopeManagerTestCase(TestCase):
"""
Test that our tracing machinery works well in a variety of situations (especially
with Twisted's runtime and deferreds).
@@ -67,7 +72,7 @@ class TracingScopeTestCase(TestCase):
opentracing backend is Jaeger.
"""
if opentracing is None:
if opentracing is None or LogContextScopeManager is None:
skip = "Requires opentracing" # type: ignore[unreachable]
if jaeger_client is None:
skip = "Requires jaeger_client" # type: ignore[unreachable]
@@ -77,9 +82,8 @@
# global variables that power opentracing. We create our own tracer instance
# and test with it.
scope_manager = ContextVarsScopeManager()
config = jaeger_client.config.Config(
config={}, service_name="test", scope_manager=scope_manager
config={}, service_name="test", scope_manager=LogContextScopeManager()
)
self._reporter = jaeger_client.reporter.InMemoryReporter()
@@ -220,144 +224,6 @@ class TracingScopeTestCase(TestCase):
[scopes[1].span, scopes[2].span, scopes[0].span],
)
def test_run_in_background_active_scope_still_available(self) -> None:
"""
Test that tasks running via `run_in_background` still have access to the
active tracing scope.
This is a regression test for a previous Synapse issue where the tracing scope
would `__exit__` and close before the `run_in_background` task completed and our
own previous custom `_LogContextScope.close(...)` would clear
`LoggingContext.scope` preventing further tracing spans from having the correct
parent.
"""
reactor = MemoryReactorClock()
# type-ignore: mypy-zope doesn't seem to recognise that `MemoryReactorClock`
# implements `ISynapseThreadlessReactor` (combination of the normal Twisted
# Reactor/Clock interfaces), via inheritance from
# `twisted.internet.testing.MemoryReactor` and `twisted.internet.testing.Clock`
# Ignore `multiple-internal-clocks` linter error here since we are creating a `Clock`
# for testing purposes.
clock = Clock( # type: ignore[multiple-internal-clocks]
reactor, # type: ignore[arg-type]
server_name="test_server",
)
scope_map: Dict[str, opentracing.Scope] = {}
async def async_task() -> None:
root_scope = scope_map["root"]
root_context = cast(jaeger_client.SpanContext, root_scope.span.context)
self.assertEqual(
self._tracer.active_span,
root_scope.span,
"expected to inherit the root tracing scope from where this was run",
)
# Return control back to the reactor thread and wait an arbitrary amount
await clock.sleep(4)
# This is a key part of what we're testing! In a previous version of
# Synapse, we would lose the active span at this point.
self.assertEqual(
self._tracer.active_span,
root_scope.span,
"expected to still have a root tracing scope/span active",
)
# For complete-ness sake, let's also trace more sub-tasks here and assert
# they have the correct span parents as well (root)
# Start tracing some other sub-task.
#
# This is a key part of what we're testing! In a previous version of
# Synapse, it would have the incorrect span parents.
scope = start_active_span(
"task1",
tracer=self._tracer,
)
scope_map["task1"] = scope
# Ensure the span parent is pointing to the root scope
context = cast(jaeger_client.SpanContext, scope.span.context)
self.assertEqual(
context.parent_id,
root_context.span_id,
"expected task1 parent to be the root span",
)
# Ensure that the active span is our new sub-task now
self.assertEqual(self._tracer.active_span, scope.span)
# Return control back to the reactor thread and wait an arbitrary amount
await clock.sleep(4)
# We should still see the active span as the scope wasn't closed yet
self.assertEqual(self._tracer.active_span, scope.span)
scope.close()
async def root() -> None:
with start_active_span(
"root span",
tracer=self._tracer,
# We will close this off later. We're basically just mimicking the same
# pattern for how we handle requests. We pass the span off to the
# request for it to finish.
finish_on_close=False,
) as root_scope:
scope_map["root"] = root_scope
self.assertEqual(self._tracer.active_span, root_scope.span)
# Fire-and-forget a task
#
# XXX: The root scope context manager will `__exit__` before this task
# completes.
run_in_background(async_task)
# Because we used `run_in_background`, the active span should still be
# the root.
self.assertEqual(self._tracer.active_span, root_scope.span)
# We shouldn't see any active spans outside of the scope
self.assertIsNone(self._tracer.active_span)
with LoggingContext(name="root context", server_name="test_server"):
# Start the test off
d_root = defer.ensureDeferred(root())
# Let the tasks complete
reactor.pump((2,) * 8)
self.successResultOf(d_root)
# After we see all of the tasks are done (like a request when it
# `_finished_processing`), let's finish our root span
scope_map["root"].span.finish()
# Sanity check again: We shouldn't see any active spans leftover in this
# this context.
self.assertIsNone(self._tracer.active_span)
# The spans should be reported in order of their finishing: task 1, task 2,
# root.
#
# We use `assertIncludes` just as an easier way to see if items are missing or
# added. We assert the order just below
self.assertIncludes(
set(self._reporter.get_spans()),
{
scope_map["task1"].span,
scope_map["root"].span,
},
exact=True,
)
# This is where we actually assert the correct order
self.assertEqual(
self._reporter.get_spans(),
[
scope_map["task1"].span,
scope_map["root"].span,
],
)
def test_trace_decorator_sync(self) -> None:
"""
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
@@ -455,3 +321,203 @@ class TracingScopeTestCase(TestCase):
[span.operation_name for span in self._reporter.get_spans()],
["fixture_awaitable_return_func"],
)
async def test_run_as_background_process_standalone(self) -> None:
"""
Test to make sure that the background process work starts its own trace.
"""
reactor, clock = get_clock()
callback_finished = False
active_span_in_callback: Optional[jaeger_client.Span] = None
async def bg_task() -> None:
nonlocal callback_finished, active_span_in_callback
try:
assert isinstance(self._tracer.active_span, jaeger_client.Span)
active_span_in_callback = self._tracer.active_span
finally:
# When exceptions happen, we still want to mark the callback as finished
# so that the test can complete and we see the underlying error.
callback_finished = True
# type-ignore: We ignore because the point is to test the bare function
run_as_background_process( # type: ignore[untracked-background-process]
desc="some-bg-task",
server_name="test_server",
func=bg_task,
test_only_tracer=self._tracer,
)
# Now wait for the background process to finish
while not callback_finished:
await clock.sleep(0)
self.assertTrue(
callback_finished,
"Callback never finished which means the test probably didn't wait long enough",
)
self.assertEqual(
active_span_in_callback.operation_name if active_span_in_callback else None,
"bgproc.some-bg-task",
"expected a new span to be started for the background task",
)
# The spans should be reported in order of their finishing.
#
# We use `assertIncludes` just as an easier way to see if items are missing or
# added. We assert the order just below
actual_spans = [span.operation_name for span in self._reporter.get_spans()]
expected_spans = ["bgproc.some-bg-task"]
self.assertIncludes(
set(actual_spans),
set(expected_spans),
exact=True,
)
# This is where we actually assert the correct order
self.assertEqual(
actual_spans,
expected_spans,
)
async def test_run_as_background_process_cross_link(self) -> None:
"""
Test to make sure that the background process work has its own trace and is
disconnected from any currently active trace (like a request). But we still have
cross-links between the two traces if there was already an active trace/span when
we kicked off the background process.
"""
reactor, clock = get_clock()
callback_finished = False
active_span_in_callback: Optional[jaeger_client.Span] = None
async def bg_task() -> None:
nonlocal callback_finished, active_span_in_callback
try:
assert isinstance(self._tracer.active_span, jaeger_client.Span)
active_span_in_callback = self._tracer.active_span
finally:
# When exceptions happen, we still want to mark the callback as finished
# so that the test can complete and we see the underlying error.
callback_finished = True
with LoggingContext(name="some-request", server_name="test_server"):
with start_active_span(
"some-request",
tracer=self._tracer,
):
# type-ignore: We ignore because the point is to test the bare function
run_as_background_process( # type: ignore[untracked-background-process]
desc="some-bg-task",
server_name="test_server",
func=bg_task,
test_only_tracer=self._tracer,
)
# Now wait for the background process to finish
while not callback_finished:
await clock.sleep(0)
self.assertTrue(
callback_finished,
"Callback never finished which means the test probably didn't wait long enough",
)
# We start `bgproc.some-bg-task` and `bgproc_child.some-bg-task` (see
# `run_as_background_process` implementation for why). Either is fine but for
# now we expect the child as it's the innermost one that was started.
self.assertEqual(
active_span_in_callback.operation_name if active_span_in_callback else None,
"bgproc_child.some-bg-task",
"expected a new span to be started for the background task",
)
# The spans should be reported in order of their finishing.
#
# We use `assertIncludes` just as an easier way to see if items are missing or
# added. We assert the order just below
actual_spans = [span.operation_name for span in self._reporter.get_spans()]
expected_spans = [
"start_bgproc.some-bg-task",
"bgproc_child.some-bg-task",
"bgproc.some-bg-task",
"some-request",
]
self.assertIncludes(
set(actual_spans),
set(expected_spans),
exact=True,
)
# This is where we actually assert the correct order
self.assertEqual(
actual_spans,
expected_spans,
)
span_map = {span.operation_name: span for span in self._reporter.get_spans()}
span_id_to_friendly_name = {
span.span_id: span.operation_name for span in self._reporter.get_spans()
}
def get_span_friendly_name(span_id: Optional[int]) -> str:
if span_id is None:
return "None"
return span_id_to_friendly_name.get(span_id, f"unknown span {span_id}")
# Ensure the background process trace/span is disconnected from the request
# trace/span.
self.assertNotEqual(
get_span_friendly_name(span_map["bgproc.some-bg-task"].parent_id),
get_span_friendly_name(span_map["some-request"].span_id),
)
# We should see a cross-link in the request trace pointing to the background
# process trace.
#
# Make sure `start_bgproc.some-bg-task` is part of the request trace
self.assertEqual(
get_span_friendly_name(span_map["start_bgproc.some-bg-task"].parent_id),
get_span_friendly_name(span_map["some-request"].span_id),
)
# And has some references to the background process trace
self.assertIncludes(
{
f"{reference.type}:{get_span_friendly_name(reference.referenced_context.span_id)}"
if isinstance(reference.referenced_context, jaeger_client.SpanContext)
else f"{reference.type}:None"
for reference in (
span_map["start_bgproc.some-bg-task"].references or []
)
},
{
f"follows_from:{get_span_friendly_name(span_map['bgproc.some-bg-task'].span_id)}"
},
exact=True,
)
# We should see a cross-link in the background process trace pointing to the
# request trace that kicked off the work.
#
# Make sure `bgproc_child.some-bg-task` is part of the background process trace
self.assertEqual(
get_span_friendly_name(span_map["bgproc_child.some-bg-task"].parent_id),
get_span_friendly_name(span_map["bgproc.some-bg-task"].span_id),
)
# And has a `follows_from` reference back to the request trace
self.assertIncludes(
{
f"{reference.type}:{get_span_friendly_name(reference.referenced_context.span_id)}"
if isinstance(reference.referenced_context, jaeger_client.SpanContext)
else f"{reference.type}:None"
for reference in (
span_map["bgproc_child.some-bg-task"].references or []
)
},
{
f"follows_from:{get_span_friendly_name(span_map['some-request'].span_id)}"
},
exact=True,
)


@@ -40,6 +40,127 @@ from tests.unittest import override_config
from tests.utils import HAS_AUTHLIB
class KeyUploadTestCase(unittest.HomeserverTestCase):
servlets = [
keys.register_servlets,
admin.register_servlets_for_client_rest_resource,
login.register_servlets,
]
def test_upload_keys_fails_on_invalid_structure(self) -> None:
"""Check that we validate the structure of keys upon upload.
Regression test for https://github.com/element-hq/synapse/pull/17097
"""
self.register_user("alice", "wonderland")
alice_token = self.login("alice", "wonderland")
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/upload",
{
# Error: device_keys must be a dict
"device_keys": ["some", "stuff", "weewoo"]
},
alice_token,
)
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
self.assertEqual(
channel.json_body["errcode"],
Codes.BAD_JSON,
channel.result,
)
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/upload",
{
# Error: properties of fallback_keys must be in the form `<algorithm>:<device_id>`
"fallback_keys": {"invalid_key": "signature_base64"}
},
alice_token,
)
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
self.assertEqual(
channel.json_body["errcode"],
Codes.BAD_JSON,
channel.result,
)
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/upload",
{
# Same as above, but for one_time_keys
"one_time_keys": {"invalid_key": "signature_base64"}
},
alice_token,
)
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
self.assertEqual(
channel.json_body["errcode"],
Codes.BAD_JSON,
channel.result,
)
def test_upload_keys_fails_on_invalid_user_id_or_device_id(self) -> None:
"""
Validate that the requesting user is uploading their own keys and nobody
else's.
"""
device_id = "DEVICE_ID"
alice_user_id = self.register_user("alice", "wonderland")
alice_token = self.login("alice", "wonderland", device_id=device_id)
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/upload",
{
"device_keys": {
# Included `user_id` does not match requesting user.
"user_id": "@unknown_user:test",
"device_id": device_id,
"algorithms": ["m.olm.curve25519-aes-sha2"],
"keys": {
f"ed25519:{device_id}": "publickey",
},
"signatures": {},
}
},
alice_token,
)
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
self.assertEqual(
channel.json_body["errcode"],
Codes.BAD_JSON,
channel.result,
)
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/upload",
{
"device_keys": {
"user_id": alice_user_id,
# Included `device_id` does not match requesting user's.
"device_id": "UNKNOWN_DEVICE_ID",
"algorithms": ["m.olm.curve25519-aes-sha2"],
"keys": {
f"ed25519:{device_id}": "publickey",
},
"signatures": {},
}
},
alice_token,
)
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
self.assertEqual(
channel.json_body["errcode"],
Codes.BAD_JSON,
channel.result,
)
class KeyQueryTestCase(unittest.HomeserverTestCase):
servlets = [
keys.register_servlets,


@@ -0,0 +1,11 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.


@@ -0,0 +1,225 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
import json
import logging
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Coroutine, Generator, TypeVar, Union
from twisted.internet.defer import Deferred, ensureDeferred
from twisted.internet.testing import MemoryReactor
from synapse.logging.context import (
LoggingContext,
PreserveLoggingContext,
_Sentinel,
current_context,
run_in_background,
)
from synapse.server import HomeServer
from synapse.synapse_rust.http_client import HttpClient
from synapse.util.clock import Clock
from synapse.util.json import json_decoder
from tests.unittest import HomeserverTestCase
logger = logging.getLogger(__name__)
T = TypeVar("T")
class StubRequestHandler(BaseHTTPRequestHandler):
server: "StubServer"
def do_GET(self) -> None:
self.server.calls += 1
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"ok": True}).encode("utf-8"))
def log_message(self, format: str, *args: Any) -> None:
# Don't log anything; by default, the server logs to stderr
pass
class StubServer(HTTPServer):
"""A stub HTTP server that we can send requests to for testing.
This opens a real HTTP server on a random port, on a separate thread.
"""
calls: int = 0
"""How many times has the endpoint been requested."""
_thread: threading.Thread
def __init__(self) -> None:
super().__init__(("127.0.0.1", 0), StubRequestHandler)
self._thread = threading.Thread(
target=self.serve_forever,
name="StubServer",
kwargs={"poll_interval": 0.01},
daemon=True,
)
self._thread.start()
def shutdown(self) -> None:
super().shutdown()
self._thread.join()
@property
def endpoint(self) -> str:
return f"http://127.0.0.1:{self.server_port}/"
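The `StubServer` pattern above (a real `HTTPServer` bound to port 0 so the OS picks a free port, served from a daemon thread) can be exercised with nothing but the standard library. A minimal sketch of the same shape, outside the Synapse test harness:

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

class Handler(BaseHTTPRequestHandler):
    def do_GET(self) -> None:
        body = json.dumps({"ok": True}).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt: str, *args) -> None:
        pass  # keep test output quiet; BaseHTTPRequestHandler logs to stderr by default

# Port 0 asks the OS for any free port; serve_forever runs on a daemon thread.
server = HTTPServer(("127.0.0.1", 0), Handler)
thread = threading.Thread(
    target=server.serve_forever, kwargs={"poll_interval": 0.01}, daemon=True
)
thread.start()

with urllib.request.urlopen(f"http://127.0.0.1:{server.server_port}/") as resp:
    payload = json.load(resp)
print(payload)  # {'ok': True}

server.shutdown()
thread.join()
```

The `daemon=True` flag matters in a test suite: if a test forgets to call `shutdown()`, the serving thread won't keep the process alive.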
class HttpClientTestCase(HomeserverTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
hs = self.setup_test_homeserver()
# XXX: We must create the Rust HTTP client before we call `reactor.run()` below.
# Twisted's `MemoryReactor` doesn't invoke `callWhenRunning` callbacks if it's
# already running and we rely on that to start the Tokio thread pool in Rust. In
# the future, this may not matter, see https://github.com/twisted/twisted/pull/12514
self._http_client = hs.get_proxied_http_client()
self._rust_http_client = HttpClient(
reactor=hs.get_reactor(),
user_agent=self._http_client.user_agent.decode("utf8"),
)
# This triggers the server startup hooks, which starts the Tokio thread pool
reactor.run()
return hs
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.server = StubServer()
def tearDown(self) -> None:
# MemoryReactor doesn't trigger the shutdown phases, and we want the
# Tokio thread pool to be stopped
# XXX: This logic should probably get moved somewhere else
shutdown_triggers = self.reactor.triggers.get("shutdown", {})
for phase in ["before", "during", "after"]:
triggers = shutdown_triggers.get(phase, [])
for callback, args, kwargs in triggers:
callback(*args, **kwargs)
def till_deferred_has_result(
self,
awaitable: Union[
"Coroutine[Deferred[Any], Any, T]",
"Generator[Deferred[Any], Any, T]",
"Deferred[T]",
],
) -> "Deferred[T]":
"""Wait until a deferred has a result.
This is useful because the Rust HTTP client resolves the deferred via
reactor.callFromThread, and those callbacks are only run when we call
reactor.advance.
"""
deferred = ensureDeferred(awaitable)
tries = 0
while not deferred.called:
time.sleep(0.1)
self.reactor.advance(0)
tries += 1
if tries > 100:
raise Exception("Timed out waiting for deferred to resolve")
return deferred
def _check_current_logcontext(self, expected_logcontext_string: str) -> None:
context = current_context()
assert isinstance(context, LoggingContext) or isinstance(context, _Sentinel), (
f"Expected LoggingContext({expected_logcontext_string}) but saw {context}"
)
self.assertEqual(
str(context),
expected_logcontext_string,
f"Expected LoggingContext({expected_logcontext_string}) but saw {context}",
)
def test_request_response(self) -> None:
"""
Test to make sure we can make a basic request and get the expected
response.
"""
async def do_request() -> None:
resp_body = await self._rust_http_client.get(
url=self.server.endpoint,
response_limit=1 * 1024 * 1024,
)
raw_response = json_decoder.decode(resp_body.decode("utf-8"))
self.assertEqual(raw_response, {"ok": True})
self.get_success(self.till_deferred_has_result(do_request()))
self.assertEqual(self.server.calls, 1)
async def test_logging_context(self) -> None:
"""
Test to make sure the `LoggingContext` (logcontext) is handled correctly
when making requests.
"""
# Sanity check that we start in the sentinel context
self._check_current_logcontext("sentinel")
callback_finished = False
async def do_request() -> None:
nonlocal callback_finished
try:
# Should have the same logcontext as the caller
self._check_current_logcontext("foo")
with LoggingContext(name="competing", server_name="test_server"):
# Make the actual request
await self._rust_http_client.get(
url=self.server.endpoint,
response_limit=1 * 1024 * 1024,
)
self._check_current_logcontext("competing")
# Back to the caller's context outside of the `LoggingContext` block
self._check_current_logcontext("foo")
finally:
# When exceptions happen, we still want to mark the callback as finished
# so that the test can complete and we see the underlying error.
callback_finished = True
with LoggingContext(name="foo", server_name="test_server"):
# Fire off the function, but don't wait on it.
run_in_background(do_request)
# Now wait for the function under test to have run
with PreserveLoggingContext():
while not callback_finished:
# await self.hs.get_clock().sleep(0)
time.sleep(0.1)
self.reactor.advance(0)
# check that the logcontext is left in a sane state.
self._check_current_logcontext("foo")
self.assertTrue(
callback_finished,
"Callback never finished which means the test probably didn't wait long enough",
)
# Back to the sentinel context
self._check_current_logcontext("sentinel")