Add log to determine whether clients are using /messages as expected (#19226)

Spawned from wanting better homeserver logs to debug
https://github.com/element-hq/synapse/issues/19153. With this log, we can check
whether we returned a `/messages` response with an `end` pagination token, and
then whether the client made another `/messages` request using that token.

Clients should have similar logs and debugging capabilities to determine this
themselves, but this just makes it easier for us: when someone creates an issue
claiming a backend problem, we can ask them for homeserver logs.
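
With this change, a healthy pagination session should show up in the homeserver
logs roughly like the following (the user, room, URI, and tokens below are made
up; the format comes from the `logger.info` call added in this commit):

    Responding to `/messages` request: {@alice:example.org} GET /_matrix/client/v3/rooms/!abc:example.org/messages?dir=b&limit=20 -> 20 messages with end_token=t570-1234
    Responding to `/messages` request: {@alice:example.org} GET /_matrix/client/v3/rooms/!abc:example.org/messages?dir=b&limit=20&from=t570-1234 -> 20 messages with end_token=t540-1200

If an `end_token` is logged but never reappears as the `from` parameter of a
follow-up request, the client stopped paginating on its own.
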
Eric Eastwood 2025-12-01 17:10:22 -06:00 committed by GitHub
parent 08e1b63b30
commit 88310fe7ed
4 changed files with 210 additions and 42 deletions

changelog.d/19226.misc (new file)

@ -0,0 +1 @@
Add log to determine whether clients are using `/messages` as expected.

synapse/handlers/pagination.py

@ -21,22 +21,25 @@
 import logging
 from typing import TYPE_CHECKING, cast

+import attr
+
 from twisted.python.failure import Failure

 from synapse.api.constants import Direction, EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
-from synapse.events.utils import SerializeEventConfig
+from synapse.events import EventBase
+from synapse.handlers.relations import BundledAggregations
 from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
 from synapse.logging.opentracing import trace
 from synapse.rest.admin._base import assert_user_is_admin
 from synapse.streams.config import PaginationConfig
 from synapse.types import (
-    JsonDict,
     JsonMapping,
     Requester,
     ScheduledTask,
     StreamKeyType,
+    StreamToken,
     TaskStatus,
 )
 from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse
@ -70,6 +73,58 @@ PURGE_ROOM_ACTION_NAME = "purge_room"
 SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME = "shutdown_and_purge_room"


+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class GetMessagesResult:
+    """
+    Everything needed to serialize a `/messages` response.
+    """
+
+    messages_chunk: list[EventBase]
+    """
+    A list of room events.
+
+    - When the request is `Direction.FORWARDS`, events will be in the range:
+      `start_token` < x <= `end_token`, (ascending topological_order)
+    - When the request is `Direction.BACKWARDS`, events will be in the range:
+      `start_token` >= x > `end_token`, (descending topological_order)
+
+    Note that an empty chunk does not necessarily imply that no more events are
+    available. Clients should continue to paginate until no `end_token` property is
+    returned.
+    """
+
+    bundled_aggregations: dict[str, BundledAggregations]
+    """
+    A map of event ID to the bundled aggregations for the events in the chunk.
+
+    If an event doesn't have any bundled aggregations, it may not appear in the map.
+    """
+
+    state: list[EventBase] | None
+    """
+    A list of state events relevant to showing the chunk. For example, if
+    lazy_load_members is enabled in the filter then this may contain the membership
+    events for the senders of events in the chunk.
+
+    Omitted from the response when `None`.
+    """
+
+    start_token: StreamToken
+    """
+    Token corresponding to the start of chunk. This will be the same as the value given
+    in `from` query parameter of the `/messages` request.
+    """
+
+    end_token: StreamToken | None
+    """
+    A token corresponding to the end of chunk. This token can be passed back to this
+    endpoint to request further events.
+
+    If no further events are available (either because we have reached the start of the
+    timeline, or because the user does not have permission to see any more events), this
+    property is omitted from the response.
+    """
+
+
 class PaginationHandler:
     """Handles pagination and purge history requests.
@ -418,7 +473,7 @@ class PaginationHandler:
         as_client_event: bool = True,
         event_filter: Filter | None = None,
         use_admin_priviledge: bool = False,
-    ) -> JsonDict:
+    ) -> GetMessagesResult:
         """Get messages in a room.

         Args:
@ -617,10 +672,13 @@ class PaginationHandler:
         # In that case we do not return end, to tell the client
         # there is no need for further queries.
         if not events:
-            return {
-                "chunk": [],
-                "start": await from_token.to_string(self.store),
-            }
+            return GetMessagesResult(
+                messages_chunk=[],
+                bundled_aggregations={},
+                state=None,
+                start_token=from_token,
+                end_token=None,
+            )

         if event_filter:
             events = await event_filter.filter(events)
@ -636,11 +694,13 @@ class PaginationHandler:
         # if after the filter applied there are no more events
         # return immediately - but there might be more in next_token batch
         if not events:
-            return {
-                "chunk": [],
-                "start": await from_token.to_string(self.store),
-                "end": await next_token.to_string(self.store),
-            }
+            return GetMessagesResult(
+                messages_chunk=[],
+                bundled_aggregations={},
+                state=None,
+                start_token=from_token,
+                end_token=next_token,
+            )

         state = None
         if event_filter and event_filter.lazy_load_members and len(events) > 0:
@ -657,38 +717,20 @@ class PaginationHandler:
             if state_ids:
                 state_dict = await self.store.get_events(list(state_ids.values()))
-                state = state_dict.values()
+                state = list(state_dict.values())

         aggregations = await self._relations_handler.get_bundled_aggregations(
             events, user_id
         )

-        time_now = self.clock.time_msec()
-        serialize_options = SerializeEventConfig(
-            as_client_event=as_client_event, requester=requester
+        return GetMessagesResult(
+            messages_chunk=events,
+            bundled_aggregations=aggregations,
+            state=state,
+            start_token=from_token,
+            end_token=next_token,
         )

-        chunk = {
-            "chunk": (
-                await self._event_serializer.serialize_events(
-                    events,
-                    time_now,
-                    config=serialize_options,
-                    bundle_aggregations=aggregations,
-                )
-            ),
-            "start": await from_token.to_string(self.store),
-            "end": await next_token.to_string(self.store),
-        }
-
-        if state:
-            chunk["state"] = await self._event_serializer.serialize_events(
-                state, time_now, config=serialize_options
-            )
-
-        return chunk

     async def _shutdown_and_purge_room(
         self,
         task: ScheduledTask,
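
The `GetMessagesResult.end_token` docstring above captures the contract this
logging is meant to verify: a client keeps calling `/messages`, feeding each
response's `end` token back as the next `from`, until a response has no `end`
at all. A minimal client-side sketch of that loop, assuming the third-party
`requests` library and placeholder homeserver, room, and access-token values:

    from urllib.parse import quote

    import requests

    HOMESERVER = "https://matrix.example.org"  # placeholder
    ROOM_ID = "!abc:example.org"  # placeholder
    ACCESS_TOKEN = "syt_..."  # placeholder


    def fetch_all_messages() -> list[dict]:
        """Page backwards through a room until the server stops returning an `end` token."""
        events: list[dict] = []
        # Omitting `from` on the first request starts from the most recent events.
        from_token: str | None = None
        while True:
            params: dict[str, str] = {"dir": "b", "limit": "100"}
            if from_token is not None:
                params["from"] = from_token
            resp = requests.get(
                f"{HOMESERVER}/_matrix/client/v3/rooms/{quote(ROOM_ID, safe='')}/messages",
                params=params,
                headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
                timeout=30,
            )
            resp.raise_for_status()
            body = resp.json()
            events.extend(body.get("chunk", []))
            # An empty chunk does NOT mean we are done; only a missing `end` does.
            from_token = body.get("end")
            if from_token is None:
                return events

Each iteration of this loop should line up with one of the new
"Responding to `/messages` request" log lines, with the `from` query parameter
matching the previous line's `end_token`.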

synapse/rest/admin/rooms.py

@ -28,9 +28,13 @@ from immutabledict import immutabledict
 from synapse.api.constants import Direction, EventTypes, JoinRules, Membership
 from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
 from synapse.api.filtering import Filter
+from synapse.events.utils import (
+    SerializeEventConfig,
+)
 from synapse.handlers.pagination import (
     PURGE_ROOM_ACTION_NAME,
     SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME,
+    GetMessagesResult,
 )
 from synapse.http.servlet import (
     ResolveRoomIdMixin,
@ -44,11 +48,13 @@ from synapse.http.servlet import (
     parse_string,
 )
 from synapse.http.site import SynapseRequest
+from synapse.logging.opentracing import trace
 from synapse.rest.admin._base import (
     admin_patterns,
     assert_requester_is_admin,
     assert_user_is_admin,
 )
+from synapse.rest.client.room import SerializeMessagesDeps, encode_messages_response
 from synapse.storage.databases.main.room import RoomSortOrder
 from synapse.streams.config import PaginationConfig
 from synapse.types import JsonDict, RoomID, ScheduledTask, UserID, create_requester
@ -976,6 +982,7 @@ class RoomMessagesRestServlet(RestServlet):
         self._pagination_handler = hs.get_pagination_handler()
         self._auth = hs.get_auth()
         self._store = hs.get_datastores().main
+        self._event_serializer = hs.get_event_client_serializer()

     async def on_GET(
         self, request: SynapseRequest, room_id: str
@ -999,7 +1006,11 @@ class RoomMessagesRestServlet(RestServlet):
         ):
             as_client_event = False

-        msgs = await self._pagination_handler.get_messages(
+        serialize_options = SerializeEventConfig(
+            as_client_event=as_client_event, requester=requester
+        )
+
+        get_messages_result = await self._pagination_handler.get_messages(
             room_id=room_id,
             requester=requester,
             pagin_config=pagination_config,
@ -1008,7 +1019,27 @@ class RoomMessagesRestServlet(RestServlet):
             use_admin_priviledge=True,
         )

-        return HTTPStatus.OK, msgs
+        response_content = await self.encode_response(
+            get_messages_result, serialize_options
+        )
+
+        return HTTPStatus.OK, response_content
+
+    @trace
+    async def encode_response(
+        self,
+        get_messages_result: GetMessagesResult,
+        serialize_options: SerializeEventConfig,
+    ) -> JsonDict:
+        return await encode_messages_response(
+            get_messages_result=get_messages_result,
+            serialize_options=serialize_options,
+            serialize_deps=SerializeMessagesDeps(
+                clock=self._clock,
+                event_serializer=self._event_serializer,
+                store=self._store,
+            ),
+        )


 class RoomTimestampToEventRestServlet(RestServlet):

synapse/rest/client/room.py

@ -28,6 +28,7 @@ from http import HTTPStatus
 from typing import TYPE_CHECKING, Awaitable
 from urllib import parse as urlparse

+import attr
 from prometheus_client.core import Histogram
 from twisted.web.server import Request
@ -45,10 +46,12 @@ from synapse.api.errors import (
 )
 from synapse.api.filtering import Filter
 from synapse.events.utils import (
+    EventClientSerializer,
     SerializeEventConfig,
     format_event_for_client_v2,
     serialize_event,
 )
+from synapse.handlers.pagination import GetMessagesResult
 from synapse.http.server import HttpServer
 from synapse.http.servlet import (
     ResolveRoomIdMixin,
@ -64,15 +67,17 @@ from synapse.http.servlet import (
 )
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import make_deferred_yieldable, run_in_background
-from synapse.logging.opentracing import set_tag
+from synapse.logging.opentracing import set_tag, trace
 from synapse.metrics import SERVER_NAME_LABEL
 from synapse.rest.client._base import client_patterns
 from synapse.rest.client.transactions import HttpTransactionCache
 from synapse.state import CREATE_KEY, POWER_KEY
+from synapse.storage.databases.main import DataStore
 from synapse.streams.config import PaginationConfig
 from synapse.types import JsonDict, Requester, StreamToken, ThirdPartyInstanceID, UserID
 from synapse.types.state import StateFilter
 from synapse.util.cancellation import cancellable
+from synapse.util.clock import Clock
 from synapse.util.events import generate_fake_event_id
 from synapse.util.stringutils import parse_and_validate_server_name
@ -790,6 +795,56 @@ class JoinedRoomMemberListRestServlet(RestServlet):
         return 200, {"joined": users_with_profile}


+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class SerializeMessagesDeps:
+    clock: Clock
+    event_serializer: EventClientSerializer
+    store: DataStore
+
+
+@trace
+async def encode_messages_response(
+    *,
+    get_messages_result: GetMessagesResult,
+    serialize_options: SerializeEventConfig,
+    serialize_deps: SerializeMessagesDeps,
+) -> JsonDict:
+    """
+    Serialize a `GetMessagesResult` into the JSON response format for the `/messages`
+    endpoint.
+
+    This logic is shared between the client API and Synapse admin API.
+    """
+    time_now = serialize_deps.clock.time_msec()
+
+    serialized_result = {
+        "chunk": (
+            await serialize_deps.event_serializer.serialize_events(
+                get_messages_result.messages_chunk,
+                time_now,
+                config=serialize_options,
+                bundle_aggregations=get_messages_result.bundled_aggregations,
+            )
+        ),
+        "start": await get_messages_result.start_token.to_string(serialize_deps.store),
+    }
+
+    if get_messages_result.end_token is not None:
+        serialized_result["end"] = await get_messages_result.end_token.to_string(
+            serialize_deps.store
+        )
+
+    if get_messages_result.state is not None:
+        serialized_result[
+            "state"
+        ] = await serialize_deps.event_serializer.serialize_events(
+            get_messages_result.state, time_now, config=serialize_options
+        )
+
+    return serialized_result


 # TODO: Needs better unit testing
 class RoomMessageListRestServlet(RestServlet):
     PATTERNS = client_patterns("/rooms/(?P<room_id>[^/]*)/messages$", v1=True)
@ -806,6 +861,7 @@ class RoomMessageListRestServlet(RestServlet):
         self.pagination_handler = hs.get_pagination_handler()
         self.auth = hs.get_auth()
         self.store = hs.get_datastores().main
+        self.event_serializer = hs.get_event_client_serializer()

     async def on_GET(
         self, request: SynapseRequest, room_id: str
@ -839,7 +895,11 @@ class RoomMessageListRestServlet(RestServlet):
         ):
             as_client_event = False

-        msgs = await self.pagination_handler.get_messages(
+        serialize_options = SerializeEventConfig(
+            as_client_event=as_client_event, requester=requester
+        )
+
+        get_messages_result = await self.pagination_handler.get_messages(
             room_id=room_id,
             requester=requester,
             pagin_config=pagination_config,
@ -847,6 +907,24 @@ class RoomMessageListRestServlet(RestServlet):
             event_filter=event_filter,
         )

+        # Useful for debugging timeline/pagination issues. For example, if a client
+        # isn't seeing the full history, we can check the homeserver logs to see if the
+        # client just never made the next request with the given `end` token.
+        logger.info(
+            "Responding to `/messages` request: {%s} %s %s -> %d messages with end_token=%s",
+            requester.user.to_string(),
+            request.get_method(),
+            request.get_redacted_uri(),
+            len(get_messages_result.messages_chunk),
+            (await get_messages_result.end_token.to_string(self.store))
+            if get_messages_result.end_token
+            else None,
+        )
+
+        response_content = await self.encode_response(
+            get_messages_result, serialize_options
+        )
+
         processing_end_time = self.clock.time_msec()
         room_member_count = await make_deferred_yieldable(room_member_count_deferred)
         messsages_response_timer.labels(
@ -854,7 +932,23 @@ class RoomMessageListRestServlet(RestServlet):
             **{SERVER_NAME_LABEL: self.server_name},
         ).observe((processing_end_time - processing_start_time) / 1000)

-        return 200, msgs
+        return 200, response_content
+
+    @trace
+    async def encode_response(
+        self,
+        get_messages_result: GetMessagesResult,
+        serialize_options: SerializeEventConfig,
+    ) -> JsonDict:
+        return await encode_messages_response(
+            get_messages_result=get_messages_result,
+            serialize_options=serialize_options,
+            serialize_deps=SerializeMessagesDeps(
+                clock=self.clock,
+                event_serializer=self.event_serializer,
+                store=self.store,
+            ),
+        )


 # TODO: Needs unit testing
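
For reference, the wire format produced by `encode_messages_response` is unchanged
by this refactor. A `/messages` response body still looks roughly like the sketch
below (the event, tokens, and IDs are invented for illustration):

    example_response = {
        "start": "t570-1234",  # echoes the `from` token of the request
        "chunk": [
            {
                "type": "m.room.message",
                "sender": "@alice:example.org",
                "room_id": "!abc:example.org",
                "event_id": "$143273582443PhrSn:example.org",
                "origin_server_ts": 1732910000000,
                "content": {"msgtype": "m.text", "body": "hello"},
            },
        ],
        "state": [],  # only present when the filter produced lazy-loaded state
        "end": "t540-1200",  # omitted entirely when there is nothing further to fetch
    }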