Mirror of https://github.com/element-hq/synapse.git (synced 2025-10-03 00:01:04 -04:00)

Compare commits (15): 19bdff8716...1b9f22d520
Commits:

1b9f22d520
70c044db8e
6835e7be0d
d27ff161f5
06a84f4fe0
1c093509ce
0615b64bb4
339660851d
2584a6fe8f
9766e110e4
960bb62788
c7e4846a2e
97dc729005
0f35e9af04
33e0abff8c
changelog.d/18253.feature (new file, +1)
@@ -0,0 +1 @@
Add admin API for fetching (paginated) room reports.
changelog.d/18903.misc (new file, +1)
@@ -0,0 +1 @@
Wrap the Rust HTTP client with `make_deferred_yieldable` so it follows Synapse logcontext rules.
changelog.d/18966.misc (new file, +1)
@@ -0,0 +1 @@
Add debug logs wherever we change the current logcontext.
changelog.d/18989.removal (new file, +1)
@@ -0,0 +1 @@
Remove deprecated `LoggingContext.set_current_context`/`LoggingContext.current_context` methods which already have equivalent bare methods in `synapse.logging.context`.
changelog.d/18990.misc (new file, +1)
@@ -0,0 +1 @@
Switch task scheduler from raw logcontext manipulation to using the dedicated logcontext utils.
changelog.d/19007.misc (new file, +1)
@@ -0,0 +1 @@
Switch back to our own custom `LogContextScopeManager` instead of OpenTracing's `ContextVarsScopeManager`, which was causing problems when using the experimental `SYNAPSE_ASYNC_IO_REACTOR` option with tracing enabled.
docs/admin_api/room_reports.md (new file, +76)
@@ -0,0 +1,76 @@
# Show reported rooms

This API returns information about reported rooms.

To use it, you will need to authenticate by providing an `access_token`
for a server admin: see [Admin API](../usage/administration/admin_api/).

The API is:
```
GET /_synapse/admin/v1/room_reports?from=0&limit=10
```

It returns a JSON body like the following:

```json
{
    "room_reports": [
        {
            "id": 2,
            "reason": "foo",
            "received_ts": 1570897107409,
            "canonical_alias": "#alias1:matrix.org",
            "room_id": "!ERAgBpSOcCCuTJqQPk:matrix.org",
            "name": "Matrix HQ",
            "user_id": "@foo:matrix.org"
        },
        {
            "id": 3,
            "reason": "bar",
            "received_ts": 1598889612059,
            "canonical_alias": "#alias2:matrix.org",
            "room_id": "!eGvUQuTCkHGVwNMOjv:matrix.org",
            "name": "Your room name here",
            "user_id": "@bar:matrix.org"
        }
    ],
    "next_token": 2,
    "total": 4
}
```

To paginate, check for `next_token` and if present, call the endpoint again with `from`
set to the value of `next_token` and the same `limit`. This will return a new page.

If the endpoint does not return a `next_token` then there are no more reports to
paginate through. A runnable sketch of this loop follows this section.

**Query parameters:**

* `limit`: integer - Optional, used for pagination. The maximum number
  of items to return in this call. Defaults to `100`.
* `from`: integer - Optional, used for pagination. The offset in the
  returned results. This should be treated as an opaque value and not explicitly set to
  anything other than the return value of `next_token` from a previous call. Defaults to `0`.
* `dir`: string - Direction of room report order. Whether to fetch the most recent
  first (`b`) or the oldest first (`f`). Defaults to `b`.
* `user_id`: optional string - Filter by the user ID of the reporter. This is the user who
  reported the room and wrote the reason.
* `room_id`: optional string - Filter by (reported) room ID.

**Response**

The following fields are returned in the JSON response body:

* `id`: integer - ID of the room report.
* `received_ts`: integer - The timestamp (in milliseconds since the unix epoch) when this
  report was sent.
* `room_id`: string - The ID of the room being reported.
* `name`: string - The name of the room.
* `user_id`: string - The user who reported the room and wrote the reason.
* `reason`: string - Comment made by the `user_id` in this report. May be blank or `null`.
* `canonical_alias`: string - The canonical alias of the room. `null` if the room does not
  have a canonical alias set.
* `next_token`: integer - Indication for pagination. See above.
* `total`: integer - Total number of room reports related to the query
  (`user_id` and `room_id`).
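As an illustration of the pagination scheme the new doc describes, here is a minimal Python loop. The homeserver URL and admin token are hypothetical placeholders, and any HTTP client would do; only the endpoint, query parameters, and `next_token` handling come from the doc above.

```python
import requests  # any HTTP client works; requests is used here for brevity

BASE_URL = "https://homeserver.example.com"  # hypothetical homeserver
ADMIN_TOKEN = "syt_..."  # hypothetical admin access token


def fetch_all_room_reports() -> list:
    """Collect every room report by following `next_token` until it disappears."""
    reports = []
    params = {"from": 0, "limit": 100}
    while True:
        resp = requests.get(
            f"{BASE_URL}/_synapse/admin/v1/room_reports",
            headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
            params=params,
        )
        resp.raise_for_status()
        body = resp.json()
        reports.extend(body["room_reports"])
        # No `next_token` in the response means there are no more pages.
        if "next_token" not in body:
            return reports
        params["from"] = body["next_token"]
```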
@@ -548,3 +548,19 @@ chain are dropped. Dropping the reference to an awaitable you're
supposed to be awaiting is bad practice, so this doesn't
actually happen too much. Unfortunately, when it does happen, it will
lead to leaked logcontexts which are incredibly hard to track down.


## Debugging logcontext issues

Debugging logcontext issues can be tricky, as a leaked or lost logcontext surfaces
downstream and can point at an unrelated part of the codebase. It's best to enable debug
logging for `synapse.logging.context.debug` (it needs to be explicitly configured) and work
backwards in the logs from the point where the issue is observed to find the root cause.

`log.config.yaml`
```yaml
loggers:
  # Unlike other loggers, this one needs to be explicitly configured to see debug logs.
  synapse.logging.context.debug:
    level: DEBUG
```
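For context on the transitions those debug lines trace, here is a minimal sketch of the logcontext rules in action. The function names `raw_twisted_api`, `subtask`, and `handler` are illustrative; `make_deferred_yieldable` and `run_in_background` are the real helpers from `synapse.logging.context`.

```python
from twisted.internet import defer

from synapse.logging.context import make_deferred_yieldable, run_in_background


def raw_twisted_api() -> "defer.Deferred[str]":
    # Stand-in for a raw Twisted API that does not follow Synapse's logcontext
    # rules (it starts and ends in whatever logcontext happens to be active).
    return defer.succeed("result")


async def subtask() -> None:
    # Illustrative fire-and-forget work.
    ...


async def handler() -> None:
    # Wrapping the raw deferred resets to the sentinel context while the
    # reactor owns the deferred, then restores this caller's logcontext when
    # it resumes: these are the transitions the debug logger records.
    result = await make_deferred_yieldable(raw_twisted_api())

    # run_in_background kicks off `subtask` but leaves the caller's
    # logcontext unchanged on return.
    run_in_background(subtask)
```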
poetry.lock (generated, 6 lines changed)
@@ -1589,14 +1589,14 @@ files = [

[[package]]
name = "phonenumbers"
-version = "9.0.14"
+version = "9.0.15"
description = "Python version of Google's common library for parsing, formatting, storing and validating international phone numbers."
optional = false
python-versions = "*"
groups = ["main"]
files = [
-    {file = "phonenumbers-9.0.14-py2.py3-none-any.whl", hash = "sha256:6bdf5c46dbfefa1d941d122432d1958418d1dfe3f8c8c81d4c8e80f5442ea41f"},
-    {file = "phonenumbers-9.0.14.tar.gz", hash = "sha256:98afb3e86bf9ae02cc7c98ca44fa8827babb72842f90da9884c5d998937572ae"},
+    {file = "phonenumbers-9.0.15-py2.py3-none-any.whl", hash = "sha256:269b73bc05258e8fd57582770b9559307099ea677c8f1dc5272476f661344776"},
+    {file = "phonenumbers-9.0.15.tar.gz", hash = "sha256:345ff7f23768332d866f37732f815cdf1d33c7f0961246562a5c5b78c12c3ff3"},
]

[[package]]
@@ -12,7 +12,7 @@
 * <https://www.gnu.org/licenses/agpl-3.0.html>.
 */

-use std::{collections::HashMap, future::Future};
+use std::{collections::HashMap, future::Future, sync::OnceLock};

use anyhow::Context;
use futures::TryStreamExt;

@@ -299,5 +299,22 @@ where
        });
    });

-    Ok(deferred)
+    // Make the deferred follow the Synapse logcontext rules
+    make_deferred_yieldable(py, &deferred)
}

static MAKE_DEFERRED_YIELDABLE: OnceLock<pyo3::Py<pyo3::PyAny>> = OnceLock::new();

/// Given a deferred, make it follow the Synapse logcontext rules
fn make_deferred_yieldable<'py>(
    py: Python<'py>,
    deferred: &Bound<'py, PyAny>,
) -> PyResult<Bound<'py, PyAny>> {
    let make_deferred_yieldable = MAKE_DEFERRED_YIELDABLE.get_or_init(|| {
        let sys = PyModule::import(py, "synapse.logging.context").unwrap();
        let func = sys.getattr("make_deferred_yieldable").unwrap().unbind();
        func
    });

    make_deferred_yieldable.call1(py, (deferred,))?.extract(py)
}
@@ -33,7 +33,6 @@ from synapse.api.errors import (
    UnrecognizedRequestError,
)
from synapse.http.site import SynapseRequest
-from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import (
    active_span,
    force_tracing,

@@ -229,13 +228,12 @@ class MasDelegatedAuth(BaseAuth):
        try:
            with start_active_span("mas-introspect-token"):
                inject_request_headers(raw_headers)
-                with PreserveLoggingContext():
-                    resp_body = await self._rust_http_client.post(
-                        url=self._introspection_endpoint,
-                        response_limit=1 * 1024 * 1024,
-                        headers=raw_headers,
-                        request_body=body,
-                    )
+                resp_body = await self._rust_http_client.post(
+                    url=self._introspection_endpoint,
+                    response_limit=1 * 1024 * 1024,
+                    headers=raw_headers,
+                    request_body=body,
+                )
        except HttpResponseException as e:
            end_time = self._clock.time()
            introspection_response_timer.labels(
@@ -38,7 +38,6 @@ from synapse.api.errors import (
    UnrecognizedRequestError,
)
from synapse.http.site import SynapseRequest
-from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import (
    active_span,
    force_tracing,

@@ -327,13 +326,12 @@ class MSC3861DelegatedAuth(BaseAuth):
        try:
            with start_active_span("mas-introspect-token"):
                inject_request_headers(raw_headers)
-                with PreserveLoggingContext():
-                    resp_body = await self._rust_http_client.post(
-                        url=uri,
-                        response_limit=1 * 1024 * 1024,
-                        headers=raw_headers,
-                        request_body=body,
-                    )
+                resp_body = await self._rust_http_client.post(
+                    url=uri,
+                    response_limit=1 * 1024 * 1024,
+                    headers=raw_headers,
+                    request_body=body,
+                )
        except HttpResponseException as e:
            end_time = self._clock.time()
            introspection_response_timer.labels(
@@ -33,7 +33,6 @@ See doc/log_contexts.rst for details on how this works.
import logging
import threading
import typing
-import warnings
from types import TracebackType
from typing import (
    TYPE_CHECKING,

@@ -55,11 +54,29 @@ from typing_extensions import ParamSpec
from twisted.internet import defer, threads
from twisted.python.threadpool import ThreadPool

+from synapse.logging.loggers import ExplicitlyConfiguredLogger
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
+    from synapse.logging.scopecontextmanager import _LogContextScope
    from synapse.types import ISynapseReactor

logger = logging.getLogger(__name__)

+original_logger_class = logging.getLoggerClass()
+logging.setLoggerClass(ExplicitlyConfiguredLogger)
+logcontext_debug_logger = logging.getLogger("synapse.logging.context.debug")
+"""
+A logger for debugging when the logcontext switches.
+
+Because this is very noisy and probably something only developers want to see when
+debugging logcontext problems, we want people to explicitly opt in before seeing anything
+in the logs. Requires explicitly setting `synapse.logging.context.debug` in the logging
+configuration and does not inherit the log level from the parent logger.
+"""
+# Restore the original logger class
+logging.setLoggerClass(original_logger_class)

try:
    import resource
@@ -238,7 +255,14 @@ class _Sentinel:
    we should always know which server the logs are coming from.
    """

-    __slots__ = ["previous_context", "finished", "server_name", "request", "tag"]
+    __slots__ = [
+        "previous_context",
+        "finished",
+        "scope",
+        "server_name",
+        "request",
+        "tag",
+    ]

    def __init__(self) -> None:
        # Minimal set for compatibility with LoggingContext

@@ -246,6 +270,7 @@ class _Sentinel:
        self.finished = False
        self.server_name = "unknown_server_from_sentinel_context"
        self.request = None
+        self.scope = None
        self.tag = None

    def __str__(self) -> str:

@@ -303,6 +328,7 @@ class LoggingContext:
        "finished",
        "request",
        "tag",
+        "scope",
    ]

    def __init__(

@@ -327,6 +353,7 @@ class LoggingContext:
        self.main_thread = get_thread_id()
        self.request = None
        self.tag = ""
+        self.scope: Optional["_LogContextScope"] = None

        # keep track of whether we have hit the __exit__ block for this context
        # (suggesting that the thing that created the context thinks it should

@@ -340,6 +367,9 @@ class LoggingContext:
            # which request this corresponds to
            self.request = self.parent_context.request

+            # we also track the current scope:
+            self.scope = self.parent_context.scope

        if request is not None:
            # the request param overrides the request from the parent context
            self.request = request
@@ -347,49 +377,9 @@ class LoggingContext:
    def __str__(self) -> str:
        return self.name

-    @classmethod
-    def current_context(cls) -> LoggingContextOrSentinel:
-        """Get the current logging context from thread local storage
-
-        This exists for backwards compatibility. ``current_context()`` should be
-        called directly.
-
-        Returns:
-            The current logging context
-        """
-        warnings.warn(
-            "synapse.logging.context.LoggingContext.current_context() is deprecated "
-            "in favor of synapse.logging.context.current_context().",
-            DeprecationWarning,
-            stacklevel=2,
-        )
-        return current_context()
-
-    @classmethod
-    def set_current_context(
-        cls, context: LoggingContextOrSentinel
-    ) -> LoggingContextOrSentinel:
-        """Set the current logging context in thread local storage
-
-        This exists for backwards compatibility. ``set_current_context()`` should be
-        called directly.
-
-        Args:
-            context: The context to activate.
-
-        Returns:
-            The context that was previously active
-        """
-        warnings.warn(
-            "synapse.logging.context.LoggingContext.set_current_context() is deprecated "
-            "in favor of synapse.logging.context.set_current_context().",
-            DeprecationWarning,
-            stacklevel=2,
-        )
-        return set_current_context(context)
-
    def __enter__(self) -> "LoggingContext":
        """Enters this logging context into thread local storage"""
+        logcontext_debug_logger.debug("LoggingContext(%s).__enter__", self.name)
        old_context = set_current_context(self)
        if self.previous_context != old_context:
            logcontext_error(
@@ -412,6 +402,9 @@ class LoggingContext:
        Returns:
            None to avoid suppressing any exceptions that were thrown.
        """
+        logcontext_debug_logger.debug(
+            "LoggingContext(%s).__exit__ --> %s", self.name, self.previous_context
+        )
        current = set_current_context(self.previous_context)
        if current is not self:
            if current is SENTINEL_CONTEXT:
@@ -660,14 +653,21 @@ class PreserveLoggingContext:
    reactor back to the code).
    """

-    __slots__ = ["_old_context", "_new_context"]
+    __slots__ = ["_old_context", "_new_context", "_instance_id"]

    def __init__(
        self, new_context: LoggingContextOrSentinel = SENTINEL_CONTEXT
    ) -> None:
        self._new_context = new_context
+        self._instance_id = random_string(5)

    def __enter__(self) -> None:
+        logcontext_debug_logger.debug(
+            "PreserveLoggingContext(%s).__enter__ %s --> %s",
+            self._instance_id,
+            current_context(),
+            self._new_context,
+        )
        self._old_context = set_current_context(self._new_context)

    def __exit__(

@@ -676,6 +676,12 @@ class PreserveLoggingContext:
        value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
+        logcontext_debug_logger.debug(
+            "PreserveLoggingContext(%s).__exit__ %s --> %s",
+            self._instance_id,
+            current_context(),
+            self._old_context,
+        )
        context = set_current_context(self._old_context)

        if context != self._new_context:
@@ -855,7 +861,11 @@ def run_in_background(
    Note that the returned Deferred does not follow the synapse logcontext
    rules.
    """
    instance_id = random_string(5)
    calling_context = current_context()
    logcontext_debug_logger.debug(
        "run_in_background(%s): called with logcontext=%s", instance_id, calling_context
    )
    try:
        # (kick off the task in the current context)
        res = f(*args, **kwargs)

@@ -897,6 +907,11 @@ def run_in_background(
        # to reset the logcontext to the sentinel logcontext as that would run
        # immediately (remember our goal is to maintain the calling logcontext when we
        # return).
+        logcontext_debug_logger.debug(
+            "run_in_background(%s): deferred already completed and the function should have maintained the logcontext %s",
+            instance_id,
+            calling_context,
+        )
        return d

    # Since the function we called may follow the Synapse logcontext rules (Rules for

@@ -907,6 +922,11 @@ def run_in_background(
    #
    # Our goal is to have the caller logcontext unchanged after firing off the
    # background task and returning.
+    logcontext_debug_logger.debug(
+        "run_in_background(%s): restoring calling logcontext %s",
+        instance_id,
+        calling_context,
+    )
    set_current_context(calling_context)

    # If the function we called is playing nice and following the Synapse logcontext

@@ -922,7 +942,23 @@ def run_in_background(
    # which is supposed to have a single entry and exit point. But
    # by spawning off another deferred, we are effectively
    # adding a new exit point.)
-    d.addBoth(_set_context_cb, SENTINEL_CONTEXT)
+    if logcontext_debug_logger.isEnabledFor(logging.DEBUG):
+
+        def _log_set_context_cb(
+            result: ResultT, context: LoggingContextOrSentinel
+        ) -> ResultT:
+            logcontext_debug_logger.debug(
+                "run_in_background(%s): resetting logcontext to %s",
+                instance_id,
+                context,
+            )
+            set_current_context(context)
+            return result
+
+        d.addBoth(_log_set_context_cb, SENTINEL_CONTEXT)
+    else:
+        d.addBoth(_set_context_cb, SENTINEL_CONTEXT)

    return d
@@ -978,10 +1014,21 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
    restores the old context once the awaitable completes (execution passes from the
    reactor back to the code).
    """
+    instance_id = random_string(5)
+    logcontext_debug_logger.debug(
+        "make_deferred_yieldable(%s): called with logcontext=%s",
+        instance_id,
+        current_context(),
+    )

    # The deferred has already completed
    if deferred.called and not deferred.paused:
        # it looks like this deferred is ready to run any callbacks we give it
        # immediately. We may as well optimise out the logcontext faffery.
+        logcontext_debug_logger.debug(
+            "make_deferred_yieldable(%s): deferred already completed and the function should have maintained the logcontext",
+            instance_id,
+        )
        return deferred

    # Our goal is to have the caller logcontext unchanged after they yield/await the

@@ -993,8 +1040,31 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
    # does) while the deferred runs in the reactor event loop, we reset the logcontext
    # and add a callback to the deferred to restore it so the caller's logcontext is
    # active when the deferred completes.
-    prev_context = set_current_context(SENTINEL_CONTEXT)
-    deferred.addBoth(_set_context_cb, prev_context)
+    logcontext_debug_logger.debug(
+        "make_deferred_yieldable(%s): resetting logcontext to %s",
+        instance_id,
+        SENTINEL_CONTEXT,
+    )
+    calling_context = set_current_context(SENTINEL_CONTEXT)
+
+    if logcontext_debug_logger.isEnabledFor(logging.DEBUG):
+
+        def _log_set_context_cb(
+            result: ResultT, context: LoggingContextOrSentinel
+        ) -> ResultT:
+            logcontext_debug_logger.debug(
+                "make_deferred_yieldable(%s): restoring calling logcontext to %s",
+                instance_id,
+                context,
+            )
+            set_current_context(context)
+            return result
+
+        deferred.addBoth(_log_set_context_cb, calling_context)
+    else:
+        deferred.addBoth(_set_context_cb, calling_context)

    return deferred
@@ -251,17 +251,18 @@ class _DummyTagNames:
try:
    import opentracing
    import opentracing.tags
-    from opentracing.scope_managers.contextvars import ContextVarsScopeManager

    tags = opentracing.tags
except ImportError:
    opentracing = None  # type: ignore[assignment]
    tags = _DummyTagNames  # type: ignore[assignment]
-    ContextVarsScopeManager = None  # type: ignore
try:
    from jaeger_client import Config as JaegerConfig
+
+    from synapse.logging.scopecontextmanager import LogContextScopeManager
except ImportError:
    JaegerConfig = None  # type: ignore
+    LogContextScopeManager = None  # type: ignore


try:

@@ -483,7 +484,7 @@ def init_tracer(hs: "HomeServer") -> None:
    config = JaegerConfig(
        config=jaeger_config,
        service_name=f"{hs.config.server.server_name} {instance_name_by_type}",
-        scope_manager=ContextVarsScopeManager(),
+        scope_manager=LogContextScopeManager(),
        metrics_factory=PrometheusMetricsFactory(),
    )

@@ -683,9 +684,21 @@ def start_active_span_from_edu(

# Opentracing setters for tags, logs, etc
@only_if_tracing
-def active_span() -> Optional["opentracing.Span"]:
-    """Get the currently active span, if any"""
-    return opentracing.tracer.active_span
+def active_span(
+    *,
+    tracer: Optional["opentracing.Tracer"] = None,
+) -> Optional["opentracing.Span"]:
+    """
+    Get the currently active span, if any
+
+    Args:
+        tracer: override the opentracing tracer. By default the global tracer is used.
+    """
+    if tracer is None:
+        # use the global tracer by default
+        tracer = opentracing.tracer
+
+    return tracer.active_span


@ensure_active_span("set a tag")
synapse/logging/scopecontextmanager.py (new file, +161)
@@ -0,0 +1,161 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#

import logging
from typing import Optional

from opentracing import Scope, ScopeManager, Span

from synapse.logging.context import (
    LoggingContext,
    current_context,
    nested_logging_context,
)

logger = logging.getLogger(__name__)


class LogContextScopeManager(ScopeManager):
    """
    The LogContextScopeManager tracks the active scope in opentracing
    by using the log contexts which are native to Synapse. This is so
    that the basic opentracing API can be used across Twisted deferreds.

    It would be nice just to use opentracing's ContextVarsScopeManager,
    but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301.
    """

    def __init__(self) -> None:
        pass

    @property
    def active(self) -> Optional[Scope]:
        """
        Returns the currently active Scope which can be used to access the
        currently active Scope.span.
        If there is a non-null Scope, its wrapped Span
        becomes an implicit parent of any newly-created Span at
        Tracer.start_active_span() time.

        Returns:
            The Scope that is active, or None if not available.
        """
        ctx = current_context()
        return ctx.scope

    def activate(self, span: Span, finish_on_close: bool) -> Scope:
        """
        Makes a Span active.

        Args:
            span: the span that should become active.
            finish_on_close: whether Span should be automatically finished when
                Scope.close() is called.

        Returns:
            Scope to control the end of the active period for
            *span*. It is a programming error to neglect to call
            Scope.close() on the returned instance.
        """

        ctx = current_context()

        if not ctx:
            logger.error("Tried to activate scope outside of loggingcontext")
            return Scope(None, span)  # type: ignore[arg-type]

        if ctx.scope is not None:
            # start a new logging context as a child of the existing one.
            # Doing so -- rather than updating the existing logcontext -- means that
            # creating several concurrent spans under the same logcontext works
            # correctly.
            ctx = nested_logging_context("")
            enter_logcontext = True
        else:
            # if there is no span currently associated with the current logcontext, we
            # just store the scope in it.
            #
            # This feels a bit dubious, but it does hack around a problem where a
            # span outlasts its parent logcontext (which would otherwise lead to
            # "Re-starting finished log context" errors).
            enter_logcontext = False

        scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
        ctx.scope = scope
        if enter_logcontext:
            ctx.__enter__()

        return scope


class _LogContextScope(Scope):
    """
    A custom opentracing scope, associated with a LogContext

    * When the scope is closed, the logcontext's active scope is reset to None,
      and - if enter_logcontext was set - the logcontext is finished too.
    """

    def __init__(
        self,
        manager: LogContextScopeManager,
        span: Span,
        logcontext: LoggingContext,
        enter_logcontext: bool,
        finish_on_close: bool,
    ):
        """
        Args:
            manager:
                the manager that is responsible for this scope.
            span:
                the opentracing span which this scope represents the local
                lifetime for.
            logcontext:
                the log context to which this scope is attached.
            enter_logcontext:
                if True the log context will be exited when the scope is finished
            finish_on_close:
                if True finish the span when the scope is closed
        """
        super().__init__(manager, span)
        self.logcontext = logcontext
        self._finish_on_close = finish_on_close
        self._enter_logcontext = enter_logcontext

    def __str__(self) -> str:
        return f"Scope<{self.span}>"

    def close(self) -> None:
        active_scope = self.manager.active
        if active_scope is not self:
            logger.error(
                "Closing scope %s which is not the currently-active one %s",
                self,
                active_scope,
            )

        if self._finish_on_close:
            self.span.finish()

        self.logcontext.scope = None

        if self._enter_logcontext:
            self.logcontext.__exit__(None, None, None)
@@ -68,6 +68,11 @@ if TYPE_CHECKING:

    from synapse.server import HomeServer

+try:
+    import opentracing
+except ImportError:
+    opentracing = None  # type: ignore[assignment]
+

logger = logging.getLogger(__name__)

@@ -225,6 +230,7 @@ def run_as_background_process(
    func: Callable[..., Awaitable[Optional[R]]],
    *args: Any,
    bg_start_span: bool = True,
+    test_only_tracer: Optional["opentracing.Tracer"] = None,
    **kwargs: Any,
) -> "defer.Deferred[Optional[R]]":
    """Run the given function in its own logcontext, with resource metrics

@@ -250,6 +256,8 @@ def run_as_background_process(
        bg_start_span: Whether to start an opentracing span. Defaults to True.
            Should only be disabled for processes that will not log to or tag
            a span.
+        test_only_tracer: Set the OpenTracing tracer to use. This is only useful for
+            tests.
        args: positional args for func
        kwargs: keyword args for func

@@ -259,6 +267,12 @@ def run_as_background_process(
        rules.
    """

+    # Since we track the tracing scope in the `LoggingContext`, before we move to the
+    # sentinel logcontext (or a new `LoggingContext`), grab the currently active
+    # tracing span (if any) so that we can create a cross-link to the background process
+    # trace.
+    original_active_tracing_span = active_span(tracer=test_only_tracer)
+
    async def run() -> Optional[R]:
        with _bg_metrics_lock:
            count = _background_process_counts.get(desc, 0)

@@ -276,8 +290,6 @@ def run_as_background_process(
        ) as logging_context:
            try:
                if bg_start_span:
-                    original_active_tracing_span = active_span()
-
                    # If there is already an active span (e.g. because this background
                    # process was started as part of handling a request for example),
                    # because this is a long-running background task that may serve a

@@ -308,6 +320,7 @@ def run_as_background_process(
                        # Create a root span for the background process (disconnected
                        # from other spans)
                        ignore_active_span=True,
+                        tracer=test_only_tracer,
                    )

                    # Also add a span in the original request trace that cross-links

@@ -324,8 +337,11 @@ def run_as_background_process(
                        f"start_bgproc.{desc}",
                        child_of=original_active_tracing_span,
                        ignore_active_span=True,
-                        # Points to the background process span.
+                        # Create the `FOLLOWS_FROM` reference to the background
+                        # process span so there is a loose coupling between the two
+                        # traces and it's easy to jump between.
                        contexts=[root_tracing_scope.span.context],
+                        tracer=test_only_tracer,
                    ):
                        pass

@@ -341,6 +357,7 @@ def run_as_background_process(
                    # span so there is a loose coupling between the two
                    # traces and it's easy to jump between.
                    contexts=[original_active_tracing_span.context],
+                    tracer=test_only_tracer,
                )

                # For easy usage down below, we create a context manager that

@@ -359,6 +376,7 @@ def run_as_background_process(
                tracing_scope = start_active_span(
                    f"bgproc.{desc}",
                    tags={SynapseTags.REQUEST_ID: str(logging_context)},
+                    tracer=test_only_tracer,
                )
            else:
                tracing_scope = nullcontext()
@@ -70,6 +70,7 @@ from synapse.rest.admin.registration_tokens import (
    NewRegistrationTokenRestServlet,
    RegistrationTokenRestServlet,
)
+from synapse.rest.admin.room_reports import RoomReportsRestServlet
from synapse.rest.admin.rooms import (
    BlockRoomRestServlet,
    DeleteRoomStatusByDeleteIdRestServlet,

@@ -303,6 +304,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
    LargestRoomsStatistics(hs).register(http_server)
    EventReportDetailRestServlet(hs).register(http_server)
    EventReportsRestServlet(hs).register(http_server)
+    RoomReportsRestServlet(hs).register(http_server)
    AccountDataRestServlet(hs).register(http_server)
    PushersRestServlet(hs).register(http_server)
    MakeRoomAdminRestServlet(hs).register(http_server)
synapse/rest/admin/room_reports.py (new file, +91)
@@ -0,0 +1,91 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
#

import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, Tuple

from synapse.api.constants import Direction
from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import RestServlet, parse_enum, parse_integer, parse_string
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
from synapse.types import JsonDict

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)


# Based upon EventReportsRestServlet
class RoomReportsRestServlet(RestServlet):
    """
    List all reported rooms that are known to the homeserver. Results are returned
    in a dictionary containing report information. Supports pagination.
    The requester must have administrator access in Synapse.

    GET /_synapse/admin/v1/room_reports
    returns:
        200 OK with a list of reports if successful, otherwise an error.

    Args:
        The parameters `from` and `limit` are required only for pagination.
        By default, a `limit` of 100 is used.
        The parameter `dir` can be used to define the order of results.
        The `user_id` query parameter filters by the user ID of the reporter.
        The `room_id` query parameter filters by room ID.
    Returns:
        A list of reported rooms and an integer representing the total number of
        reported rooms that exist given this query
    """

    PATTERNS = admin_patterns("/room_reports$")

    def __init__(self, hs: "HomeServer"):
        self._auth = hs.get_auth()
        self._store = hs.get_datastores().main

    async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
        await assert_requester_is_admin(self._auth, request)

        start = parse_integer(request, "from", default=0)
        limit = parse_integer(request, "limit", default=100)
        direction = parse_enum(request, "dir", Direction, Direction.BACKWARDS)
        user_id = parse_string(request, "user_id")
        room_id = parse_string(request, "room_id")

        if start < 0:
            raise SynapseError(
                HTTPStatus.BAD_REQUEST,
                "The start parameter must be a positive integer.",
                errcode=Codes.INVALID_PARAM,
            )

        if limit < 0:
            raise SynapseError(
                HTTPStatus.BAD_REQUEST,
                "The limit parameter must be a positive integer.",
                errcode=Codes.INVALID_PARAM,
            )

        room_reports, total = await self._store.get_room_reports_paginate(
            start, limit, direction, user_id, room_id
        )
        ret = {"room_reports": room_reports, "total": total}
        if (start + limit) < total:
            ret["next_token"] = start + len(room_reports)

        return HTTPStatus.OK, ret
@@ -1868,6 +1868,107 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
            "get_event_reports_paginate", _get_event_reports_paginate_txn
        )

    async def get_room_reports_paginate(
        self,
        start: int,
        limit: int,
        direction: Direction = Direction.BACKWARDS,
        user_id: Optional[str] = None,
        room_id: Optional[str] = None,
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Retrieve a paginated list of room reports

        Args:
            start: room offset to begin the query from
            limit: number of rows to retrieve
            direction: Whether to fetch the most recent first (backwards) or the
                oldest first (forwards)
            user_id: search for user_id. Ignored if user_id is None
            room_id: search for room_id. Ignored if room_id is None
        Returns:
            Tuple of:
                json list of room reports
                total number of room reports matching the filter criteria
        """

        def _get_room_reports_paginate_txn(
            txn: LoggingTransaction,
        ) -> Tuple[List[Dict[str, Any]], int]:
            filters = []
            args: List[object] = []

            if user_id:
                filters.append("rr.user_id LIKE ?")
                args.extend(["%" + user_id + "%"])
            if room_id:
                filters.append("rr.room_id LIKE ?")
                args.extend(["%" + room_id + "%"])

            if direction == Direction.BACKWARDS:
                order = "DESC"
            else:
                order = "ASC"

            where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else ""

            # We join on room_stats_state despite not using any columns from it
            # because the join can influence the number of rows returned;
            # e.g. a room that doesn't have state, maybe because it was deleted.
            # The query returning the total count should be consistent with
            # the query returning the results.
            sql = """
                SELECT COUNT(*) as total_room_reports
                FROM room_reports AS rr
                JOIN room_stats_state ON room_stats_state.room_id = rr.room_id
                {}
            """.format(where_clause)
            txn.execute(sql, args)
            count = cast(Tuple[int], txn.fetchone())[0]

            sql = """
                SELECT
                    rr.id,
                    rr.received_ts,
                    rr.room_id,
                    rr.user_id,
                    rr.reason,
                    room_stats_state.canonical_alias,
                    room_stats_state.name
                FROM room_reports AS rr
                JOIN room_stats_state
                    ON room_stats_state.room_id = rr.room_id
                {where_clause}
                ORDER BY rr.received_ts {order}
                LIMIT ?
                OFFSET ?
            """.format(
                where_clause=where_clause,
                order=order,
            )

            args += [limit, start]
            txn.execute(sql, args)

            room_reports = []
            for row in txn:
                room_reports.append(
                    {
                        "id": row[0],
                        "received_ts": row[1],
                        "room_id": row[2],
                        "user_id": row[3],
                        "reason": row[4],
                        "canonical_alias": row[5],
                        "name": row[6],
                    }
                )

            return room_reports, count

        return await self.db_pool.runInteraction(
            "get_room_reports_paginate", _get_room_reports_paginate_txn
        )

    async def delete_event_report(self, report_id: int) -> bool:
        """Remove an event report from database.
@@ -17,6 +17,10 @@ from twisted.internet.defer import Deferred
from synapse.types import ISynapseReactor

class HttpClient:
+    """
+    The returned deferreds follow Synapse logcontext rules.
+    """
+
    def __init__(self, reactor: ISynapseReactor, user_agent: str) -> None: ...
    def get(self, url: str, response_limit: int) -> Deferred[bytes]: ...
    def post(
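With the stub now promising logcontext-safe deferreds, callers can await the Rust client directly, which is exactly why the two auth diffs above drop their `PreserveLoggingContext` wrappers. A minimal sketch follows; the module path and the empty headers value are assumptions for illustration, not taken from this diff, while the `post(...)` keyword arguments mirror the auth code shown above.

```python
from synapse.synapse_rust.http_client import HttpClient  # assumed module path


async def introspect_token(client: HttpClient, url: str, body: str) -> bytes:
    # No PreserveLoggingContext wrapper is needed any more: the returned
    # deferred already follows Synapse's logcontext rules, so it can be
    # awaited directly from within the caller's logcontext.
    return await client.post(
        url=url,
        response_limit=1 * 1024 * 1024,  # same 1 MiB limit the auth code uses
        headers={},  # illustrative; real callers pass the introspection headers
        request_body=body,
    )
```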
@@ -27,8 +27,8 @@ from twisted.python.failure import Failure
from synapse.logging.context import (
    ContextResourceUsage,
    LoggingContext,
+    PreserveLoggingContext,
    nested_logging_context,
-    set_current_context,
)
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.metrics.background_process_metrics import (

@@ -422,14 +422,11 @@ class TaskScheduler:
            """

            current_time = self._clock.time()
-            calling_context = set_current_context(task_log_context)
-            try:
+            with PreserveLoggingContext(task_log_context):
                usage = task_log_context.get_resource_usage()
                TaskScheduler._log_task_usage(
                    "continuing", task, usage, current_time - start_time
                )
-            finally:
-                set_current_context(calling_context)

        async def wrapper() -> None:
            with nested_logging_context(task.id) as log_context:
@@ -19,7 +19,7 @@
#
#

-from typing import Awaitable, Dict, cast
+from typing import Awaitable, Optional, cast

from twisted.internet import defer
from twisted.internet.testing import MemoryReactorClock

@@ -35,20 +35,25 @@ from synapse.logging.opentracing import (
    tag_args,
    trace_with_opname,
)
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.clock import Clock

-try:
-    import opentracing
-    from opentracing.scope_managers.contextvars import ContextVarsScopeManager
-except ImportError:
-    opentracing = None  # type: ignore
-    ContextVarsScopeManager = None  # type: ignore
+from tests.server import get_clock

try:
    import jaeger_client
except ImportError:
    jaeger_client = None  # type: ignore


+try:
+    import opentracing
+
+    from synapse.logging.scopecontextmanager import LogContextScopeManager
+except ImportError:
+    opentracing = None  # type: ignore
+    LogContextScopeManager = None  # type: ignore

import logging

from tests.unittest import TestCase

@@ -56,7 +61,7 @@ from tests.unittest import TestCase
logger = logging.getLogger(__name__)


-class TracingScopeTestCase(TestCase):
+class LogContextScopeManagerTestCase(TestCase):
    """
    Test that our tracing machinery works well in a variety of situations (especially
    with Twisted's runtime and deferreds).

@@ -67,7 +72,7 @@ class TracingScopeTestCase(TestCase):
    opentracing backend is Jaeger.
    """

-    if opentracing is None:
+    if opentracing is None or LogContextScopeManager is None:
        skip = "Requires opentracing"  # type: ignore[unreachable]
    if jaeger_client is None:
        skip = "Requires jaeger_client"  # type: ignore[unreachable]

@@ -77,9 +82,8 @@ class TracingScopeTestCase(TestCase):
        # global variables that power opentracing. We create our own tracer instance
        # and test with it.

-        scope_manager = ContextVarsScopeManager()
        config = jaeger_client.config.Config(
-            config={}, service_name="test", scope_manager=scope_manager
+            config={}, service_name="test", scope_manager=LogContextScopeManager()
        )

        self._reporter = jaeger_client.reporter.InMemoryReporter()
@@ -220,144 +224,6 @@ class TracingScopeTestCase(TestCase):
            [scopes[1].span, scopes[2].span, scopes[0].span],
        )

    def test_run_in_background_active_scope_still_available(self) -> None:
        """
        Test that tasks running via `run_in_background` still have access to the
        active tracing scope.

        This is a regression test for a previous Synapse issue where the tracing scope
        would `__exit__` and close before the `run_in_background` task completed and our
        own previous custom `_LogContextScope.close(...)` would clear
        `LoggingContext.scope` preventing further tracing spans from having the correct
        parent.
        """
        reactor = MemoryReactorClock()
        # type-ignore: mypy-zope doesn't seem to recognise that `MemoryReactorClock`
        # implements `ISynapseThreadlessReactor` (combination of the normal Twisted
        # Reactor/Clock interfaces), via inheritance from
        # `twisted.internet.testing.MemoryReactor` and `twisted.internet.testing.Clock`
        # Ignore `multiple-internal-clocks` linter error here since we are creating a `Clock`
        # for testing purposes.
        clock = Clock(  # type: ignore[multiple-internal-clocks]
            reactor,  # type: ignore[arg-type]
            server_name="test_server",
        )

        scope_map: Dict[str, opentracing.Scope] = {}

        async def async_task() -> None:
            root_scope = scope_map["root"]
            root_context = cast(jaeger_client.SpanContext, root_scope.span.context)

            self.assertEqual(
                self._tracer.active_span,
                root_scope.span,
                "expected to inherit the root tracing scope from where this was run",
            )

            # Return control back to the reactor thread and wait an arbitrary amount
            await clock.sleep(4)

            # This is a key part of what we're testing! In a previous version of
            # Synapse, we would lose the active span at this point.
            self.assertEqual(
                self._tracer.active_span,
                root_scope.span,
                "expected to still have a root tracing scope/span active",
            )

            # For complete-ness sake, let's also trace more sub-tasks here and assert
            # they have the correct span parents as well (root)

            # Start tracing some other sub-task.
            #
            # This is a key part of what we're testing! In a previous version of
            # Synapse, it would have the incorrect span parents.
            scope = start_active_span(
                "task1",
                tracer=self._tracer,
            )
            scope_map["task1"] = scope

            # Ensure the span parent is pointing to the root scope
            context = cast(jaeger_client.SpanContext, scope.span.context)
            self.assertEqual(
                context.parent_id,
                root_context.span_id,
                "expected task1 parent to be the root span",
            )

            # Ensure that the active span is our new sub-task now
            self.assertEqual(self._tracer.active_span, scope.span)
            # Return control back to the reactor thread and wait an arbitrary amount
            await clock.sleep(4)
            # We should still see the active span as the scope wasn't closed yet
            self.assertEqual(self._tracer.active_span, scope.span)
            scope.close()

        async def root() -> None:
            with start_active_span(
                "root span",
                tracer=self._tracer,
                # We will close this off later. We're basically just mimicking the same
                # pattern for how we handle requests. We pass the span off to the
                # request for it to finish.
                finish_on_close=False,
            ) as root_scope:
                scope_map["root"] = root_scope
                self.assertEqual(self._tracer.active_span, root_scope.span)

                # Fire-and-forget a task
                #
                # XXX: The root scope context manager will `__exit__` before this task
                # completes.
                run_in_background(async_task)

                # Because we used `run_in_background`, the active span should still be
                # the root.
                self.assertEqual(self._tracer.active_span, root_scope.span)

            # We shouldn't see any active spans outside of the scope
            self.assertIsNone(self._tracer.active_span)

        with LoggingContext(name="root context", server_name="test_server"):
            # Start the test off
            d_root = defer.ensureDeferred(root())

            # Let the tasks complete
            reactor.pump((2,) * 8)
            self.successResultOf(d_root)

            # After we see all of the tasks are done (like a request when it
            # `_finished_processing`), let's finish our root span
            scope_map["root"].span.finish()

            # Sanity check again: We shouldn't see any active spans leftover in
            # this context.
            self.assertIsNone(self._tracer.active_span)

            # The spans should be reported in order of their finishing: task 1, task 2,
            # root.
            #
            # We use `assertIncludes` just as an easier way to see if items are missing or
            # added. We assert the order just below
            self.assertIncludes(
                set(self._reporter.get_spans()),
                {
                    scope_map["task1"].span,
                    scope_map["root"].span,
                },
                exact=True,
            )
            # This is where we actually assert the correct order
            self.assertEqual(
                self._reporter.get_spans(),
                [
                    scope_map["task1"].span,
                    scope_map["root"].span,
                ],
            )

    def test_trace_decorator_sync(self) -> None:
        """
        Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
@ -455,3 +321,203 @@ class TracingScopeTestCase(TestCase):
|
||||
[span.operation_name for span in self._reporter.get_spans()],
|
||||
["fixture_awaitable_return_func"],
|
||||
)
|
||||
|
||||
async def test_run_as_background_process_standalone(self) -> None:
|
||||
"""
|
||||
Test to make sure that the background process work starts its own trace.
|
||||
"""
|
||||
reactor, clock = get_clock()
|
||||
|
||||
callback_finished = False
|
||||
active_span_in_callback: Optional[jaeger_client.Span] = None
|
||||
|
||||
async def bg_task() -> None:
|
||||
nonlocal callback_finished, active_span_in_callback
|
||||
try:
|
||||
assert isinstance(self._tracer.active_span, jaeger_client.Span)
|
||||
active_span_in_callback = self._tracer.active_span
|
||||
finally:
|
||||
# When exceptions happen, we still want to mark the callback as finished
|
||||
# so that the test can complete and we see the underlying error.
|
||||
callback_finished = True
|
||||
|
||||
# type-ignore: We ignore because the point is to test the bare function
|
||||
run_as_background_process( # type: ignore[untracked-background-process]
|
||||
desc="some-bg-task",
|
||||
server_name="test_server",
|
||||
func=bg_task,
|
||||
test_only_tracer=self._tracer,
|
||||
)
|
||||
|
||||
# Now wait for the background process to finish
|
||||
while not callback_finished:
|
||||
await clock.sleep(0)
|
||||
|
||||
self.assertTrue(
|
||||
callback_finished,
|
||||
"Callback never finished which means the test probably didn't wait long enough",
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
active_span_in_callback.operation_name if active_span_in_callback else None,
|
||||
"bgproc.some-bg-task",
|
||||
"expected a new span to be started for the background task",
|
||||
)
|
||||
|
||||
# The spans should be reported in order of their finishing.
|
||||
#
|
||||
# We use `assertIncludes` just as an easier way to see if items are missing or
|
||||
# added. We assert the order just below
|
||||
actual_spans = [span.operation_name for span in self._reporter.get_spans()]
|
||||
expected_spans = ["bgproc.some-bg-task"]
|
||||
self.assertIncludes(
|
||||
set(actual_spans),
|
||||
set(expected_spans),
|
||||
exact=True,
|
||||
)
|
||||
# This is where we actually assert the correct order
|
||||
self.assertEqual(
|
||||
actual_spans,
|
||||
expected_spans,
|
||||
)
|
||||
|
||||
async def test_run_as_background_process_cross_link(self) -> None:
|
||||
"""
|
||||
Test to make sure that the background process work has its own trace and is
|
||||
disconnected from any currently active trace (like a request). But we still have
|
||||
cross-links between the two traces if there was already an active trace/span when
|
||||
we kicked off the background process.
|
||||
"""
|
||||
reactor, clock = get_clock()
|
||||
|
||||
callback_finished = False
|
||||
active_span_in_callback: Optional[jaeger_client.Span] = None
|
||||
|
||||
async def bg_task() -> None:
|
||||
nonlocal callback_finished, active_span_in_callback
|
||||
try:
|
||||
assert isinstance(self._tracer.active_span, jaeger_client.Span)
|
||||
active_span_in_callback = self._tracer.active_span
|
||||
finally:
|
||||
# When exceptions happen, we still want to mark the callback as finished
|
||||
# so that the test can complete and we see the underlying error.
|
||||
callback_finished = True
|
||||
|
||||
with LoggingContext(name="some-request", server_name="test_server"):
|
||||
with start_active_span(
|
||||
"some-request",
|
||||
tracer=self._tracer,
|
||||
):
|
||||
# type-ignore: We ignore because the point is to test the bare function
|
||||
run_as_background_process( # type: ignore[untracked-background-process]
|
||||
desc="some-bg-task",
|
||||
server_name="test_server",
|
||||
func=bg_task,
|
||||
test_only_tracer=self._tracer,
|
||||
)
|
||||
|
||||
        # Now wait for the background process to finish
        while not callback_finished:
            await clock.sleep(0)

        self.assertTrue(
            callback_finished,
            "Callback never finished which means the test probably didn't wait long enough",
        )

        # We start `bgproc.some-bg-task` and `bgproc_child.some-bg-task` (see
        # `run_as_background_process` implementation for why). Either is fine but for
        # now we expect the child as it's the innermost one that was started.
        self.assertEqual(
            active_span_in_callback.operation_name if active_span_in_callback else None,
            "bgproc_child.some-bg-task",
            "expected a new span to be started for the background task",
        )

        # The spans should be reported in order of their finishing.
        #
        # We use `assertIncludes` just as an easier way to see if items are missing or
        # added. We assert the order just below.
        actual_spans = [span.operation_name for span in self._reporter.get_spans()]
        expected_spans = [
            "start_bgproc.some-bg-task",
            "bgproc_child.some-bg-task",
            "bgproc.some-bg-task",
            "some-request",
        ]
        self.assertIncludes(
            set(actual_spans),
            set(expected_spans),
            exact=True,
        )
        # This is where we actually assert the correct order
        self.assertEqual(
            actual_spans,
            expected_spans,
        )

        span_map = {span.operation_name: span for span in self._reporter.get_spans()}
        span_id_to_friendly_name = {
            span.span_id: span.operation_name for span in self._reporter.get_spans()
        }

        def get_span_friendly_name(span_id: Optional[int]) -> str:
            if span_id is None:
                return "None"

            return span_id_to_friendly_name.get(span_id, f"unknown span {span_id}")

        # Ensure the background process trace/span is disconnected from the request
        # trace/span.
        self.assertNotEqual(
            get_span_friendly_name(span_map["bgproc.some-bg-task"].parent_id),
            get_span_friendly_name(span_map["some-request"].span_id),
        )

        # We should see a cross-link in the request trace pointing to the background
        # process trace.
        #
        # Make sure `start_bgproc.some-bg-task` is part of the request trace
        self.assertEqual(
            get_span_friendly_name(span_map["start_bgproc.some-bg-task"].parent_id),
            get_span_friendly_name(span_map["some-request"].span_id),
        )
        # And has some references to the background process trace
        self.assertIncludes(
            {
                f"{reference.type}:{get_span_friendly_name(reference.referenced_context.span_id)}"
                if isinstance(reference.referenced_context, jaeger_client.SpanContext)
                else f"{reference.type}:None"
                for reference in (
                    span_map["start_bgproc.some-bg-task"].references or []
                )
            },
            {
                f"follows_from:{get_span_friendly_name(span_map['bgproc.some-bg-task'].span_id)}"
            },
            exact=True,
        )

        # We should see a cross-link in the background process trace pointing to the
        # request trace that kicked off the work.
        #
        # Make sure `bgproc_child.some-bg-task` is part of the background process trace
        self.assertEqual(
            get_span_friendly_name(span_map["bgproc_child.some-bg-task"].parent_id),
            get_span_friendly_name(span_map["bgproc.some-bg-task"].span_id),
        )
        # And has some references to the request trace
        self.assertIncludes(
            {
                f"{reference.type}:{get_span_friendly_name(reference.referenced_context.span_id)}"
                if isinstance(reference.referenced_context, jaeger_client.SpanContext)
                else f"{reference.type}:None"
                for reference in (
                    span_map["bgproc_child.some-bg-task"].references or []
                )
            },
            {
                f"follows_from:{get_span_friendly_name(span_map['some-request'].span_id)}"
            },
            exact=True,
        )
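The cross-linking shape these assertions pin down can be sketched with the plain `opentracing` API. This is a minimal illustration, not Synapse's actual `run_as_background_process` implementation; only the span names are taken from the test:

```python
import opentracing

tracer = opentracing.global_tracer()

with tracer.start_active_span("some-request") as request_scope:
    # A short-lived child span inside the request trace, marking where the
    # background work was kicked off.
    with tracer.start_active_span("start_bgproc.some-bg-task"):
        # The background work gets its own span that is not a child of the
        # request span; it only carries a `follows_from` reference back to it,
        # so the two traces stay disconnected but cross-linked.
        bg_span = tracer.start_span(
            "bgproc.some-bg-task",
            references=[opentracing.follows_from(request_scope.span.context)],
            ignore_active_span=True,
        )
    # ... the background task would run under `bg_span` ...
    bg_span.finish()
```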
417
tests/rest/admin/test_room_reports.py
Normal file
@ -0,0 +1,417 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
from typing import List

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login, reporting, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock

from tests import unittest


# Based upon EventReportsTestCase
class RoomReportsTestCase(unittest.HomeserverTestCase):
    servlets = [
        synapse.rest.admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
        reporting.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        self.other_user = self.register_user("user", "pass")
        self.other_user_tok = self.login("user", "pass")

        self.room_id1 = self.helper.create_room_as(
            self.other_user, tok=self.other_user_tok, is_public=True
        )
        self.helper.join(self.room_id1, user=self.admin_user, tok=self.admin_user_tok)

        self.room_id2 = self.helper.create_room_as(
            self.other_user, tok=self.other_user_tok, is_public=True
        )
        self.helper.join(self.room_id2, user=self.admin_user, tok=self.admin_user_tok)

        # Every user reports both rooms
        self._report_room(self.room_id1, self.other_user_tok)
        self._report_room(self.room_id2, self.other_user_tok)
        self._report_room_without_parameters(self.room_id1, self.admin_user_tok)
        self._report_room_without_parameters(self.room_id2, self.admin_user_tok)

        self.url = "/_synapse/admin/v1/room_reports"

    def test_no_auth(self) -> None:
        """
        Try to get a list of room reports without authentication.
        """
        channel = self.make_request("GET", self.url, {})

        self.assertEqual(401, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])

    def test_requester_is_no_admin(self) -> None:
        """
        If the user is not a server admin, an error 403 is returned.
        """

        channel = self.make_request(
            "GET",
            self.url,
            access_token=self.other_user_tok,
        )

        self.assertEqual(403, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    def test_default_success(self) -> None:
        """
        Testing list of reported rooms
        """

        channel = self.make_request(
            "GET",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(channel.json_body["total"], 4)
        self.assertEqual(len(channel.json_body["room_reports"]), 4)
        self.assertNotIn("next_token", channel.json_body)
        self._check_fields(channel.json_body["room_reports"])

    def test_limit(self) -> None:
        """
        Testing list of reported rooms with limit
        """

        channel = self.make_request(
            "GET",
            self.url + "?limit=2",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(channel.json_body["total"], 4)
        self.assertEqual(len(channel.json_body["room_reports"]), 2)
        self.assertEqual(channel.json_body["next_token"], 2)
        self._check_fields(channel.json_body["room_reports"])

    def test_from(self) -> None:
        """
        Testing list of reported rooms with a defined starting point (from)
        """

        channel = self.make_request(
            "GET",
            self.url + "?from=2",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(channel.json_body["total"], 4)
        self.assertEqual(len(channel.json_body["room_reports"]), 2)
        self.assertNotIn("next_token", channel.json_body)
        self._check_fields(channel.json_body["room_reports"])

    def test_limit_and_from(self) -> None:
        """
        Testing list of reported rooms with a defined starting point and limit
        """

        channel = self.make_request(
            "GET",
            self.url + "?from=2&limit=1",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(channel.json_body["total"], 4)
        self.assertEqual(channel.json_body["next_token"], 3)
        self.assertEqual(len(channel.json_body["room_reports"]), 1)
        self._check_fields(channel.json_body["room_reports"])

    def test_filter_room(self) -> None:
        """
        Testing list of reported rooms with a filter of room
        """

        channel = self.make_request(
            "GET",
            self.url + "?room_id=%s" % self.room_id1,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(channel.json_body["total"], 2)
        self.assertEqual(len(channel.json_body["room_reports"]), 2)
        self.assertNotIn("next_token", channel.json_body)
        self._check_fields(channel.json_body["room_reports"])

        for report in channel.json_body["room_reports"]:
            self.assertEqual(report["room_id"], self.room_id1)

    def test_filter_user(self) -> None:
        """
        Testing list of reported rooms with a filter of user
        """

        channel = self.make_request(
            "GET",
            self.url + "?user_id=%s" % self.other_user,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(channel.json_body["total"], 2)
        self.assertEqual(len(channel.json_body["room_reports"]), 2)
        self.assertNotIn("next_token", channel.json_body)
        self._check_fields(channel.json_body["room_reports"])

        for report in channel.json_body["room_reports"]:
            self.assertEqual(report["user_id"], self.other_user)

    def test_filter_user_and_room(self) -> None:
        """
        Testing list of reported rooms with a filter of user and room
        """

        channel = self.make_request(
            "GET",
            self.url + "?user_id=%s&room_id=%s" % (self.other_user, self.room_id1),
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(channel.json_body["total"], 1)
        self.assertEqual(len(channel.json_body["room_reports"]), 1)
        self.assertNotIn("next_token", channel.json_body)
        self._check_fields(channel.json_body["room_reports"])

        for report in channel.json_body["room_reports"]:
            self.assertEqual(report["user_id"], self.other_user)
            self.assertEqual(report["room_id"], self.room_id1)

    def test_valid_search_order(self) -> None:
        """
        Testing search order. Order by timestamps.
        """

        # fetch the most recent first, largest timestamp
        channel = self.make_request(
            "GET",
            self.url + "?dir=b",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(channel.json_body["total"], 4)
        self.assertEqual(len(channel.json_body["room_reports"]), 4)
        report = 1
        while report < len(channel.json_body["room_reports"]):
            self.assertGreaterEqual(
                channel.json_body["room_reports"][report - 1]["received_ts"],
                channel.json_body["room_reports"][report]["received_ts"],
            )
            report += 1

        # fetch the oldest first, smallest timestamp
        channel = self.make_request(
            "GET",
            self.url + "?dir=f",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(channel.json_body["total"], 4)
        self.assertEqual(len(channel.json_body["room_reports"]), 4)
        report = 1
        while report < len(channel.json_body["room_reports"]):
            self.assertLessEqual(
                channel.json_body["room_reports"][report - 1]["received_ts"],
                channel.json_body["room_reports"][report]["received_ts"],
            )
            report += 1

    def test_invalid_search_order(self) -> None:
        """
        Testing that an invalid search order returns a 400
        """

        channel = self.make_request(
            "GET",
            self.url + "?dir=bar",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(400, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
        self.assertEqual(
            "Query parameter 'dir' must be one of ['b', 'f']",
            channel.json_body["error"],
        )

    def test_limit_is_negative(self) -> None:
        """
        Testing that a negative limit parameter returns a 400
        """

        channel = self.make_request(
            "GET",
            self.url + "?limit=-5",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(400, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])

    def test_from_is_negative(self) -> None:
        """
        Testing that a negative from parameter returns a 400
        """

        channel = self.make_request(
            "GET",
            self.url + "?from=-5",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(400, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])

    def test_next_token(self) -> None:
        """
        Testing that `next_token` appears at the right place
        """

        # `next_token` does not appear
        # Number of results is the number of entries
        channel = self.make_request(
            "GET",
            self.url + "?limit=4",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(channel.json_body["total"], 4)
        self.assertEqual(len(channel.json_body["room_reports"]), 4)
        self.assertNotIn("next_token", channel.json_body)

        # `next_token` does not appear
        # Number of max results is larger than the number of entries
        channel = self.make_request(
            "GET",
            self.url + "?limit=5",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(channel.json_body["total"], 4)
        self.assertEqual(len(channel.json_body["room_reports"]), 4)
        self.assertNotIn("next_token", channel.json_body)

        # `next_token` does appear
        # Number of max results is smaller than the number of entries
        channel = self.make_request(
            "GET",
            self.url + "?limit=3",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(channel.json_body["total"], 4)
        self.assertEqual(len(channel.json_body["room_reports"]), 3)
        self.assertEqual(channel.json_body["next_token"], 3)

        # Check
        # Set `from` to the value of `next_token` to request the remaining entries
        # `next_token` does not appear
        channel = self.make_request(
            "GET",
            self.url + "?from=3",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(channel.json_body["total"], 4)
        self.assertEqual(len(channel.json_body["room_reports"]), 1)
        self.assertNotIn("next_token", channel.json_body)

    def _report_room(self, room_id: str, user_tok: str) -> None:
        """Report a room"""
        channel = self.make_request(
            "POST",
            f"/_matrix/client/v3/rooms/{room_id}/report",
            {"reason": "this makes me sad"},
            access_token=user_tok,
        )
        self.assertEqual(200, channel.code, msg=channel.json_body)

    def _report_room_without_parameters(self, room_id: str, user_tok: str) -> None:
        """Report a room, but omit reason"""
        channel = self.make_request(
            "POST",
            f"/_matrix/client/v3/rooms/{room_id}/report",
            {},
            access_token=user_tok,
        )
        self.assertEqual(200, channel.code, msg=channel.json_body)

    def _check_fields(self, content: List[JsonDict]) -> None:
        """Checks that all attributes are present in a room report"""
        for c in content:
            self.assertIn("id", c)
            self.assertIn("received_ts", c)
            self.assertIn("room_id", c)
            self.assertIn("user_id", c)
            self.assertIn("canonical_alias", c)
            self.assertIn("name", c)
            self.assertIn("reason", c)

            self.assertEqual(len(c.keys()), 7)

    def test_count_correct_despite_table_deletions(self) -> None:
        """
        Tests that the count matches the number of rows, even if rows in joined tables
        are missing.
        """

        # Delete rows from room_stats_state for one of our rooms.
        self.get_success(
            self.hs.get_datastores().main.db_pool.simple_delete(
                "room_stats_state", {"room_id": self.room_id1}, desc="_"
            )
        )

        channel = self.make_request(
            "GET",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        # The 'total' field is 2 because only 2 reports will actually
        # be retrievable since we deleted the rows in the room_stats_state
        # table for one of the two rooms.
        self.assertEqual(channel.json_body["total"], 2)
        # This is consistent with the number of rows actually returned.
        self.assertEqual(len(channel.json_body["room_reports"]), 2)
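As a usage note, the pagination contract exercised by `test_next_token` (keep requesting with `from` set to the previous `next_token` until the field disappears) looks like this from a client's point of view. A minimal sketch; the `requests` library and the `fetch_all_room_reports` helper are illustrative choices, only the endpoint and parameters come from the admin API:

```python
from typing import Any, Dict, List

import requests  # illustrative HTTP library choice


def fetch_all_room_reports(base_url: str, access_token: str) -> List[Dict[str, Any]]:
    """Page through the room reports admin API until `next_token` disappears."""
    reports: List[Dict[str, Any]] = []
    params: Dict[str, int] = {"from": 0, "limit": 100}
    while True:
        resp = requests.get(
            f"{base_url}/_synapse/admin/v1/room_reports",
            params=params,
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=10,
        )
        resp.raise_for_status()
        body = resp.json()
        reports.extend(body["room_reports"])
        # No `next_token` in the response means this was the last page.
        if "next_token" not in body:
            return reports
        params["from"] = body["next_token"]
```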
11
tests/synapse_rust/__init__.py
Normal file
@ -0,0 +1,11 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
225
tests/synapse_rust/test_http_client.py
Normal file
@ -0,0 +1,225 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.

import json
import logging
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Coroutine, Generator, TypeVar, Union

from twisted.internet.defer import Deferred, ensureDeferred
from twisted.internet.testing import MemoryReactor

from synapse.logging.context import (
    LoggingContext,
    PreserveLoggingContext,
    _Sentinel,
    current_context,
    run_in_background,
)
from synapse.server import HomeServer
from synapse.synapse_rust.http_client import HttpClient
from synapse.util.clock import Clock
from synapse.util.json import json_decoder

from tests.unittest import HomeserverTestCase

logger = logging.getLogger(__name__)

T = TypeVar("T")


class StubRequestHandler(BaseHTTPRequestHandler):
    server: "StubServer"

    def do_GET(self) -> None:
        self.server.calls += 1

        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps({"ok": True}).encode("utf-8"))

    def log_message(self, format: str, *args: Any) -> None:
        # Don't log anything; by default, the server logs to stderr
        pass


class StubServer(HTTPServer):
    """A stub HTTP server that we can send requests to for testing.

    This opens a real HTTP server on a random port, on a separate thread.
    """

    calls: int = 0
    """How many times the endpoint has been requested."""

    _thread: threading.Thread

    def __init__(self) -> None:
        super().__init__(("127.0.0.1", 0), StubRequestHandler)

        self._thread = threading.Thread(
            target=self.serve_forever,
            name="StubServer",
            kwargs={"poll_interval": 0.01},
            daemon=True,
        )
        self._thread.start()

    def shutdown(self) -> None:
        super().shutdown()
        self._thread.join()

    @property
    def endpoint(self) -> str:
        return f"http://127.0.0.1:{self.server_port}/"


class HttpClientTestCase(HomeserverTestCase):
    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
        hs = self.setup_test_homeserver()

        # XXX: We must create the Rust HTTP client before we call `reactor.run()` below.
        # Twisted's `MemoryReactor` doesn't invoke `callWhenRunning` callbacks if it's
        # already running and we rely on that to start the Tokio thread pool in Rust. In
        # the future, this may not matter, see https://github.com/twisted/twisted/pull/12514
        self._http_client = hs.get_proxied_http_client()
        self._rust_http_client = HttpClient(
            reactor=hs.get_reactor(),
            user_agent=self._http_client.user_agent.decode("utf8"),
        )

        # This triggers the server startup hooks, which starts the Tokio thread pool
        reactor.run()

        return hs

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.server = StubServer()

    def tearDown(self) -> None:
        # MemoryReactor doesn't trigger the shutdown phases, and we want the
        # Tokio thread pool to be stopped
        # XXX: This logic should probably get moved somewhere else
        shutdown_triggers = self.reactor.triggers.get("shutdown", {})
        for phase in ["before", "during", "after"]:
            triggers = shutdown_triggers.get(phase, [])
            for callback, args, kwargs in triggers:
                callback(*args, **kwargs)

    def till_deferred_has_result(
        self,
        awaitable: Union[
            "Coroutine[Deferred[Any], Any, T]",
            "Generator[Deferred[Any], Any, T]",
            "Deferred[T]",
        ],
    ) -> "Deferred[T]":
        """Wait until a deferred has a result.

        This is useful because the Rust HTTP client resolves its deferreds via
        `reactor.callFromThread`, and those calls are only run when we call
        `reactor.advance`.
        """
        deferred = ensureDeferred(awaitable)
        tries = 0
        while not deferred.called:
            time.sleep(0.1)
            self.reactor.advance(0)
            tries += 1
            if tries > 100:
                raise Exception("Timed out waiting for deferred to resolve")

        return deferred

    def _check_current_logcontext(self, expected_logcontext_string: str) -> None:
        context = current_context()
        assert isinstance(context, (LoggingContext, _Sentinel)), (
            f"Expected LoggingContext({expected_logcontext_string}) but saw {context}"
        )
        self.assertEqual(
            str(context),
            expected_logcontext_string,
            f"Expected LoggingContext({expected_logcontext_string}) but saw {context}",
        )

    def test_request_response(self) -> None:
        """
        Test to make sure we can make a basic request and get the expected
        response.
        """

        async def do_request() -> None:
            resp_body = await self._rust_http_client.get(
                url=self.server.endpoint,
                response_limit=1 * 1024 * 1024,
            )
            raw_response = json_decoder.decode(resp_body.decode("utf-8"))
            self.assertEqual(raw_response, {"ok": True})

        self.get_success(self.till_deferred_has_result(do_request()))
        self.assertEqual(self.server.calls, 1)

    async def test_logging_context(self) -> None:
        """
        Test to make sure the `LoggingContext` (logcontext) is handled correctly
        when making requests.
        """
        # Sanity check that we start in the sentinel context
        self._check_current_logcontext("sentinel")

        callback_finished = False

        async def do_request() -> None:
            nonlocal callback_finished
            try:
                # Should have the same logcontext as the caller
                self._check_current_logcontext("foo")

                with LoggingContext(name="competing", server_name="test_server"):
                    # Make the actual request
                    await self._rust_http_client.get(
                        url=self.server.endpoint,
                        response_limit=1 * 1024 * 1024,
                    )
                    self._check_current_logcontext("competing")

                # Back to the caller's context outside of the `LoggingContext` block
                self._check_current_logcontext("foo")
            finally:
                # When exceptions happen, we still want to mark the callback as finished
                # so that the test can complete and we see the underlying error.
                callback_finished = True

        with LoggingContext(name="foo", server_name="test_server"):
            # Fire off the function, but don't wait on it.
            run_in_background(do_request)

            # Now wait for the function under test to have run
            with PreserveLoggingContext():
                while not callback_finished:
                    # await self.hs.get_clock().sleep(0)
                    time.sleep(0.1)
                    self.reactor.advance(0)

            # check that the logcontext is left in a sane state.
            self._check_current_logcontext("foo")

        self.assertTrue(
            callback_finished,
            "Callback never finished which means the test probably didn't wait long enough",
        )

        # Back to the sentinel context
        self._check_current_logcontext("sentinel")
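The discipline this test pins down is the standard Synapse logcontext rule for deferreds resolved from another thread (here, the Rust/Tokio pool via `reactor.callFromThread`): the awaiting code must wrap them so the caller's context is parked while waiting and restored on resume. A minimal sketch of the calling pattern, using the real `make_deferred_yieldable` helper; the `fetch` wrapper itself is hypothetical:

```python
from twisted.internet.defer import Deferred

from synapse.logging.context import make_deferred_yieldable


async def fetch(d: "Deferred[bytes]") -> bytes:
    # `d` is fired via `reactor.callFromThread`, i.e. with no logcontext set.
    # `make_deferred_yieldable` resets to the sentinel context while we wait
    # and restores the caller's logcontext when the result arrives, which is
    # what `test_logging_context` asserts above.
    return await make_deferred_yieldable(d)
```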