Compare commits

...

12 Commits

Author SHA1 Message Date
Eric Eastwood
30b91fdd86
Merge e0f8992ee34562d41b6b78991132bd4fe678a30f into 24bcdb3f3c5aa2d8d2000dc34d54a8002a914616 2025-07-01 10:52:00 -05:00
Eric Eastwood
e0f8992ee3 Fix failing to save metrics because of incorrect label names
We would see this in the logs before:
```
Failed to save metrics! Usage: <ContextResourceUsage ...> Error: Incorrect label names
```
2025-06-26 16:43:13 -05:00
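The "Incorrect label names" error above comes from `prometheus_client`: once a metric declares `labelnames`, every `.labels(**kwargs)` call must supply exactly those names, otherwise the library raises `ValueError`, which `Measure.__exit__` catches and logs. A minimal standalone sketch reproducing that failure mode (the metric name here is illustrative, not one from this diff):
```python
from prometheus_client import Counter

# Declared with two label names, mirroring the refactored block metrics.
example_block_counter = Counter(
    "example_block_count", "", labelnames=["block_name", "instance"]
)

try:
    # Supplying only one of the two declared labels raises
    # ValueError: Incorrect label names
    example_block_counter.labels(block_name="send_queue._clear").inc()
except ValueError as exc:
    print(f"Failed to save metrics! Error: {exc}")

# Supplying both labels succeeds.
example_block_counter.labels(
    block_name="send_queue._clear", instance="hs1.example.com"
).inc()
```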
Eric Eastwood
06f9af155b Add introduction comment 2025-06-26 16:19:35 -05:00
Eric Eastwood
5ad555cefc Add docstrings for block metrics 2025-06-26 16:18:37 -05:00
Eric Eastwood
652c34bda6 Better docstrings for _InFlightMetric -> _BlockInFlightMetric 2025-06-26 16:16:37 -05:00
Eric Eastwood
521c68cafe Add changelog 2025-06-26 16:13:39 -05:00
Eric Eastwood
c232ec7b3b Fix mypy complaining about unknown types by changing property order around
Fix mypy complaints

```
synapse/handlers/delayed_events.py:266: error: Cannot determine type of "validator"  [has-type]
synapse/handlers/delayed_events.py:267: error: Cannot determine type of "event_builder_factory"  [has-type]
```
2025-06-26 16:09:34 -05:00
Eric Eastwood
c7d15dbcc7 Bulk refactor @measure_func decorator usage 2025-06-26 16:06:50 -05:00
Eric Eastwood
6731c4bbf0 Refactor Measure in WellKnownResolver 2025-06-26 16:06:50 -05:00
Eric Eastwood
d05b6ca4c1 Bulk refactor Measure(...) to add server_name 2025-06-26 16:06:50 -05:00
Eric Eastwood
65035b6098 Refactor @measure_func decorator to include server name
Update `@measure_func` docstring
2025-06-26 16:06:49 -05:00
Eric Eastwood
02a7668bb2 Add instance label to Measure 2025-06-26 15:53:24 -05:00
35 changed files with 376 additions and 91 deletions

changelog.d/18601.misc Normal file
View File

@ -0,0 +1 @@
Refactor `Measure` block metrics to be homeserver-scoped.

View File

@ -156,7 +156,9 @@ class FederationRemoteSendQueue(AbstractFederationSender):
def _clear_queue_before_pos(self, position_to_delete: int) -> None:
"""Clear all the queues from before a given position"""
with Measure(self.clock, "send_queue._clear"):
with Measure(
self.clock, name="send_queue._clear", server_name=self.server_name
):
# Delete things out of presence maps
keys = self.presence_destinations.keys()
i = self.presence_destinations.bisect_left(position_to_delete)

View File

@ -657,7 +657,11 @@ class FederationSender(AbstractFederationSender):
logger.debug(
"Handling %i events in room %s", len(events), events[0].room_id
)
with Measure(self.clock, "handle_room_events"):
with Measure(
self.clock,
name="handle_room_events",
server_name=self.server_name,
):
for event in events:
await handle_event(event)

View File

@ -58,7 +58,7 @@ class TransactionManager:
"""
def __init__(self, hs: "synapse.server.HomeServer"):
self._server_name = hs.hostname
self.server_name = hs.hostname # nb must be called this for @measure_func
self.clock = hs.get_clock() # nb must be called this for @measure_func
self._store = hs.get_datastores().main
self._transaction_actions = TransactionActions(self._store)
@ -116,7 +116,7 @@ class TransactionManager:
transaction = Transaction(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self._server_name,
origin=self.server_name,
destination=destination,
pdus=[p.get_pdu_json() for p in pdus],
edus=[edu.get_dict() for edu in edus],

View File

@ -73,6 +73,7 @@ events_processed_counter = Counter("synapse_handlers_appservice_events_processed
class ApplicationServicesHandler:
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.store = hs.get_datastores().main
self.is_mine_id = hs.is_mine_id
self.appservice_api = hs.get_application_service_api()
@ -120,7 +121,9 @@ class ApplicationServicesHandler:
@wrap_as_background_process("notify_interested_services")
async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
with Measure(self.clock, "notify_interested_services"):
with Measure(
self.clock, name="notify_interested_services", server_name=self.server_name
):
self.is_processing = True
try:
upper_bound = -1
@ -329,7 +332,11 @@ class ApplicationServicesHandler:
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s", stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"):
with Measure(
self.clock,
name="notify_interested_services_ephemeral",
server_name=self.server_name,
):
for service in services:
if stream_key == StreamKeyType.TYPING:
# Note that we don't persist the token (via set_appservice_stream_type_pos)

View File

@ -54,6 +54,7 @@ logger = logging.getLogger(__name__)
class DelayedEventsHandler:
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self._store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._config = hs.config
@ -159,7 +160,9 @@ class DelayedEventsHandler:
# Loop round handling deltas until we're up to date
while True:
with Measure(self._clock, "delayed_events_delta"):
with Measure(
self._clock, name="delayed_events_delta", server_name=self.server_name
):
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
return

View File

@ -526,6 +526,8 @@ class DeviceHandler(DeviceWorkerHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.server_name = hs.hostname # nb must be called this for @measure_func
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.federation_sender = hs.get_federation_sender()
self._account_data_handler = hs.get_account_data_handler()
self._storage_controllers = hs.get_storage_controllers()
@ -1215,7 +1217,8 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
def __init__(self, hs: "HomeServer", device_handler: DeviceHandler):
self.store = hs.get_datastores().main
self.federation = hs.get_federation_client()
self.clock = hs.get_clock()
self.server_name = hs.hostname # nb must be called this for @measure_func
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.device_handler = device_handler
self._notifier = hs.get_notifier()

View File

@ -476,16 +476,16 @@ _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
class EventCreationHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.validator = EventValidator()
self.event_builder_factory = hs.get_event_builder_factory()
self.server_name = hs.hostname # nb must be called this for @measure_func
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.auth_blocking = hs.get_auth_blocking()
self._event_auth_handler = hs.get_event_auth_handler()
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.validator = EventValidator()
self.profile_handler = hs.get_profile_handler()
self.event_builder_factory = hs.get_event_builder_factory()
self.server_name = hs.hostname
self.notifier = hs.get_notifier()
self.config = hs.config
self.require_membership_for_aliases = (

View File

@ -747,6 +747,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
class PresenceHandler(BasePresenceHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.server_name = hs.hostname
self.wheel_timer: WheelTimer[str] = WheelTimer()
self.notifier = hs.get_notifier()
@ -941,7 +942,9 @@ class PresenceHandler(BasePresenceHandler):
now = self.clock.time_msec()
with Measure(self.clock, "presence_update_states"):
with Measure(
self.clock, name="presence_update_states", server_name=self.server_name
):
# NOTE: We purposefully don't await between now and when we've
# calculated what we want to do with the new states, to avoid races.
@ -1497,7 +1500,9 @@ class PresenceHandler(BasePresenceHandler):
async def _unsafe_process(self) -> None:
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "presence_delta"):
with Measure(
self.clock, name="presence_delta", server_name=self.server_name
):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
return
@ -1759,6 +1764,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
# Same with get_presence_router:
#
# AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
self.server_name = hs.hostname
self.get_presence_handler = hs.get_presence_handler
self.get_presence_router = hs.get_presence_router
self.clock = hs.get_clock()
@ -1792,7 +1798,9 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
user_id = user.to_string()
stream_change_cache = self.store.presence_stream_cache
with Measure(self.clock, "presence.get_new_events"):
with Measure(
self.clock, name="presence.get_new_events", server_name=self.server_name
):
if from_key is not None:
from_key = int(from_key)

View File

@ -329,6 +329,7 @@ class E2eeSyncResult:
class SyncHandler:
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.hs_config = hs.config
self.store = hs.get_datastores().main
self.notifier = hs.get_notifier()
@ -710,7 +711,9 @@ class SyncHandler:
sync_config = sync_result_builder.sync_config
with Measure(self.clock, "ephemeral_by_room"):
with Measure(
self.clock, name="ephemeral_by_room", server_name=self.server_name
):
typing_key = since_token.typing_key if since_token else 0
room_ids = sync_result_builder.joined_room_ids
@ -783,7 +786,9 @@ class SyncHandler:
and current token to send down to clients.
newly_joined_room
"""
with Measure(self.clock, "load_filtered_recents"):
with Measure(
self.clock, name="load_filtered_recents", server_name=self.server_name
):
timeline_limit = sync_config.filter_collection.timeline_limit()
block_all_timeline = (
sync_config.filter_collection.blocks_all_room_timeline()
@ -1174,7 +1179,9 @@ class SyncHandler:
# updates even if they occurred logically before the previous event.
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
with Measure(
self.clock, name="compute_state_delta", server_name=self.server_name
):
# The memberships needed for events in the timeline.
# Only calculated when `lazy_load_members` is on.
members_to_fetch: Optional[Set[str]] = None
@ -1791,7 +1798,9 @@ class SyncHandler:
# the DB.
return RoomNotifCounts.empty()
with Measure(self.clock, "unread_notifs_for_room_id"):
with Measure(
self.clock, name="unread_notifs_for_room_id", server_name=self.server_name
):
return await self.store.get_unread_event_push_actions_by_room_for_user(
room_id,
sync_config.user.to_string(),

View File

@ -503,6 +503,7 @@ class TypingWriterHandler(FollowerTypingHandler):
class TypingNotificationEventSource(EventSource[int, JsonMapping]):
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self._main_store = hs.get_datastores().main
self.clock = hs.get_clock()
# We can't call get_typing_handler here because there's a cycle:
@ -535,7 +536,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
appservice may be interested in.
* The latest known room serial.
"""
with Measure(self.clock, "typing.get_new_events_as"):
with Measure(
self.clock, name="typing.get_new_events_as", server_name=self.server_name
):
handler = self.get_typing_handler()
events = []
@ -571,7 +574,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
Find typing notifications for given rooms (> `from_token` and <= `to_token`)
"""
with Measure(self.clock, "typing.get_new_events"):
with Measure(
self.clock, name="typing.get_new_events", server_name=self.server_name
):
from_key = int(from_key)
handler = self.get_typing_handler()

View File

@ -237,7 +237,9 @@ class UserDirectoryHandler(StateDeltasHandler):
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "user_dir_delta"):
with Measure(
self.clock, name="user_dir_delta", server_name=self.server_name
):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self.pos == room_max_stream_ordering:
return

View File

@ -92,6 +92,7 @@ class MatrixFederationAgent:
def __init__(
self,
server_name: str,
reactor: ISynapseReactor,
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
user_agent: bytes,
@ -100,6 +101,11 @@ class MatrixFederationAgent:
_srv_resolver: Optional[SrvResolver] = None,
_well_known_resolver: Optional[WellKnownResolver] = None,
):
"""
Args:
server_name: Our homeserver name (used to label metrics) (`hs.hostname`).
"""
# proxy_reactor is not blocklisting reactor
proxy_reactor = reactor
@ -127,6 +133,7 @@ class MatrixFederationAgent:
if _well_known_resolver is None:
_well_known_resolver = WellKnownResolver(
server_name,
reactor,
agent=BlocklistingAgentWrapper(
ProxyAgent(

View File

@ -91,12 +91,19 @@ class WellKnownResolver:
def __init__(
self,
server_name: str,
reactor: IReactorTime,
agent: IAgent,
user_agent: bytes,
well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None,
had_well_known_cache: Optional[TTLCache[bytes, bool]] = None,
):
"""
Args:
server_name: Our homeserver name (used to label metrics) (`hs.hostname`).
"""
self.server_name = server_name
self._reactor = reactor
self._clock = Clock(reactor)
@ -134,7 +141,13 @@ class WellKnownResolver:
# TODO: should we linearise so that we don't end up doing two .well-known
# requests for the same server in parallel?
try:
with Measure(self._clock, "get_well_known"):
with Measure(
self._clock,
name="get_well_known",
# This should be our homeserver where the code is running (used to
# label metrics)
server_name=self.server_name,
):
result: Optional[bytes]
cache_period: float

View File

@ -417,6 +417,7 @@ class MatrixFederationHttpClient:
if hs.get_instance_name() in outbound_federation_restricted_to:
# Talk to federation directly
federation_agent: IAgent = MatrixFederationAgent(
self.server_name,
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
@ -697,7 +698,11 @@ class MatrixFederationHttpClient:
outgoing_requests_counter.labels(request.method).inc()
try:
with Measure(self.clock, "outbound_request"):
with Measure(
self.clock,
name="outbound_request",
server_name=self.server_name,
):
# we don't want all the fancy cookie and redirect handling
# that treq.request gives: just use the raw Agent.

View File

@ -66,6 +66,19 @@ all_gauges: Dict[str, Collector] = {}
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
INSTANCE_LABEL_NAME = "instance"
"""
The standard Prometheus label name used to identify which server instance the metrics
came from.
In the case of a Synapse homeserver, this should be set to the homeserver name
(`hs.hostname`).
Normally, this would be set automatically by the Prometheus server scraping the data but
since we support multiple instances of Synapse running in the same process and all
metrics are in a single global `REGISTRY`, we need to manually label any metrics.
"""
class _RegistryProxy:
@staticmethod
@ -192,7 +205,16 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
same key.
Note that `callback` may be called on a separate thread.
Args:
key: A tuple of label values, which must match the order of the
`labels` given to the constructor.
callback
"""
assert len(key) == len(self.labels), (
f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}"
)
with self._lock:
self._registrations.setdefault(key, set()).add(callback)
@ -201,7 +223,17 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
key: Tuple[str, ...],
callback: Callable[[MetricsEntry], None],
) -> None:
"""Registers that we've exited a block with labels `key`."""
"""
Registers that we've exited a block with labels `key`.
Args:
key: A tuple of label values, which must match the order of the
`labels` given to the constructor.
callback
"""
assert len(key) == len(self.labels), (
f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}"
)
with self._lock:
self._registrations.setdefault(key, set()).discard(callback)
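As the assertions above spell out, the `key` tuple handed to `register`/`unregister` must contain one value per declared label, in the same order. A hedged sketch of that contract using the gauge API as it appears in this diff (the gauge name, labels, and callback are illustrative):
```python
from synapse.metrics import InFlightGauge

example_gauge = InFlightGauge(
    "example_in_flight",
    "",
    labels=["block_name", "instance"],
    sub_metrics=["real_time_sum"],
)

def _on_collect(metrics) -> None:
    # Called at scrape time for each registered key; folds data into the entry.
    metrics.real_time_sum += 0.0

# One label value per declared label, in declaration order.
key = ("send_queue._clear", "hs1.example.com")
example_gauge.register(key, _on_collect)
# ... the measured work happens here ...
example_gauge.unregister(key, _on_collect)

# A key of the wrong length now trips the assertion added in this change:
# example_gauge.register(("send_queue._clear",), _on_collect)  # AssertionError
```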
@ -225,7 +257,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
with self._lock:
callbacks = set(self._registrations[key])
in_flight.add_metric(key, len(callbacks))
in_flight.add_metric(labels=key, value=len(callbacks))
metrics = self._metrics_class()
metrics_by_key[key] = metrics
@ -239,7 +271,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
"_".join([self.name, name]), "", labels=self.labels
)
for key, metrics in metrics_by_key.items():
gauge.add_metric(key, getattr(metrics, name))
gauge.add_metric(labels=key, value=getattr(metrics, name))
yield gauge
def _register_with_collector(self) -> None:

View File

@ -31,6 +31,7 @@ IS_USER_ALLOWED_TO_UPLOAD_MEDIA_OF_SIZE_CALLBACK = Callable[[str, int], Awaitabl
class MediaRepositoryModuleApiCallbacks:
def __init__(self, hs: "HomeServer") -> None:
self.server_name = hs.hostname
self.clock = hs.get_clock()
self._get_media_config_for_user_callbacks: List[
GET_MEDIA_CONFIG_FOR_USER_CALLBACK
@ -57,7 +58,11 @@ class MediaRepositoryModuleApiCallbacks:
async def get_media_config_for_user(self, user_id: str) -> Optional[JsonDict]:
for callback in self._get_media_config_for_user_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res: Optional[JsonDict] = await delay_cancellation(callback(user_id))
if res:
return res
@ -68,7 +73,11 @@ class MediaRepositoryModuleApiCallbacks:
self, user_id: str, size: int
) -> bool:
for callback in self._is_user_allowed_to_upload_media_of_size_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res: bool = await delay_cancellation(callback(user_id, size))
if not res:
return res

View File

@ -43,6 +43,7 @@ GET_RATELIMIT_OVERRIDE_FOR_USER_CALLBACK = Callable[
class RatelimitModuleApiCallbacks:
def __init__(self, hs: "HomeServer") -> None:
self.server_name = hs.hostname
self.clock = hs.get_clock()
self._get_ratelimit_override_for_user_callbacks: List[
GET_RATELIMIT_OVERRIDE_FOR_USER_CALLBACK
@ -64,7 +65,11 @@ class RatelimitModuleApiCallbacks:
self, user_id: str, limiter_name: str
) -> Optional[RatelimitOverride]:
for callback in self._get_ratelimit_override_for_user_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res: Optional[RatelimitOverride] = await delay_cancellation(
callback(user_id, limiter_name)
)

View File

@ -356,6 +356,7 @@ class SpamCheckerModuleApiCallbacks:
NOT_SPAM: Literal["NOT_SPAM"] = "NOT_SPAM"
def __init__(self, hs: "synapse.server.HomeServer") -> None:
self.server_name = hs.hostname
self.clock = hs.get_clock()
self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = []
@ -490,7 +491,11 @@ class SpamCheckerModuleApiCallbacks:
generally discouraged as it doesn't support internationalization.
"""
for callback in self._check_event_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(callback(event))
if res is False or res == self.NOT_SPAM:
# This spam-checker accepts the event.
@ -543,7 +548,11 @@ class SpamCheckerModuleApiCallbacks:
True if the event should be silently dropped
"""
for callback in self._should_drop_federated_event_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res: Union[bool, str] = await delay_cancellation(callback(event))
if res:
return res
@ -565,7 +574,11 @@ class SpamCheckerModuleApiCallbacks:
NOT_SPAM if the operation is permitted, [Codes, Dict] otherwise.
"""
for callback in self._user_may_join_room_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(callback(user_id, room_id, is_invited))
# Normalize return values to `Codes` or `"NOT_SPAM"`.
if res is True or res is self.NOT_SPAM:
@ -604,7 +617,11 @@ class SpamCheckerModuleApiCallbacks:
NOT_SPAM if the operation is permitted, Codes otherwise.
"""
for callback in self._user_may_invite_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(
callback(inviter_userid, invitee_userid, room_id)
)
@ -643,7 +660,11 @@ class SpamCheckerModuleApiCallbacks:
NOT_SPAM if the operation is permitted, Codes otherwise.
"""
for callback in self._federated_user_may_invite_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(callback(event))
# Normalize return values to `Codes` or `"NOT_SPAM"`.
if res is True or res is self.NOT_SPAM:
@ -686,7 +707,11 @@ class SpamCheckerModuleApiCallbacks:
NOT_SPAM if the operation is permitted, Codes otherwise.
"""
for callback in self._user_may_send_3pid_invite_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(
callback(inviter_userid, medium, address, room_id)
)
@ -722,7 +747,11 @@ class SpamCheckerModuleApiCallbacks:
room_config: The room creation configuration which is the body of the /createRoom request
"""
for callback in self._user_may_create_room_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
checker_args = inspect.signature(callback)
# Also ensure backwards compatibility with spam checker callbacks
# that don't expect the room_config argument.
@ -786,7 +815,11 @@ class SpamCheckerModuleApiCallbacks:
content: The content of the state event
"""
for callback in self._user_may_send_state_event_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
# We make a copy of the content to ensure that the spam checker cannot modify it.
res = await delay_cancellation(
callback(user_id, room_id, event_type, state_key, deepcopy(content))
@ -814,7 +847,11 @@ class SpamCheckerModuleApiCallbacks:
"""
for callback in self._user_may_create_room_alias_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(callback(userid, room_alias))
if res is True or res is self.NOT_SPAM:
continue
@ -847,7 +884,11 @@ class SpamCheckerModuleApiCallbacks:
room_id: The ID of the room that would be published
"""
for callback in self._user_may_publish_room_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(callback(userid, room_id))
if res is True or res is self.NOT_SPAM:
continue
@ -889,7 +930,11 @@ class SpamCheckerModuleApiCallbacks:
True if the user is spammy.
"""
for callback in self._check_username_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
checker_args = inspect.signature(callback)
# Make a copy of the user profile object to ensure the spam checker cannot
# modify it.
@ -938,7 +983,11 @@ class SpamCheckerModuleApiCallbacks:
"""
for callback in self._check_registration_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
behaviour = await delay_cancellation(
callback(email_threepid, username, request_info, auth_provider_id)
)
@ -980,7 +1029,11 @@ class SpamCheckerModuleApiCallbacks:
"""
for callback in self._check_media_file_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(callback(file_wrapper, file_info))
# Normalize return values to `Codes` or `"NOT_SPAM"`.
if res is False or res is self.NOT_SPAM:
@ -1027,7 +1080,11 @@ class SpamCheckerModuleApiCallbacks:
"""
for callback in self._check_login_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(
callback(
user_id,

View File

@ -129,7 +129,8 @@ class BulkPushRuleEvaluator:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self.server_name = hs.hostname # nb must be called this for @measure_func
self.clock = hs.get_clock() # nb must be called this for @measure_func
self._event_auth_handler = hs.get_event_auth_handler()
self.should_calculate_push_rules = self.hs.config.push.enable_push

View File

@ -76,6 +76,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.server_name = hs.hostname
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
@ -122,7 +123,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict
) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_fed_send_events_parse"):
with Measure(
self.clock, name="repl_fed_send_events_parse", server_name=self.server_name
):
room_id = content["room_id"]
backfilled = content["backfilled"]

View File

@ -76,6 +76,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.server_name = hs.hostname
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
@ -121,7 +122,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, event_id: str
) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_send_event_parse"):
with Measure(
self.clock, name="repl_send_event_parse", server_name=self.server_name
):
event_dict = content["event"]
room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]]
internal_metadata = content["internal_metadata"]

View File

@ -77,6 +77,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.server_name = hs.hostname
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
@ -122,7 +123,9 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override]
self, request: Request, payload: JsonDict
) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_send_events_parse"):
with Measure(
self.clock, name="repl_send_events_parse", server_name=self.server_name
):
events_and_context = []
events = payload["events"]
rooms = set()

View File

@ -75,6 +75,7 @@ class ReplicationDataHandler:
"""
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.store = hs.get_datastores().main
self.notifier = hs.get_notifier()
self._reactor = hs.get_reactor()
@ -342,7 +343,11 @@ class ReplicationDataHandler:
waiting_list.add((position, deferred))
# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
with Measure(
self._clock,
name="repl.wait_for_stream_position",
server_name=self.server_name,
):
logger.info(
"Waiting for repl stream %r to reach %s (%s); currently at: %s",
stream_name,

View File

@ -78,6 +78,7 @@ class ReplicationStreamer:
"""
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
@ -155,7 +156,11 @@ class ReplicationStreamer:
while self.pending_updates:
self.pending_updates = False
with Measure(self.clock, "repl.stream.get_updates"):
with Measure(
self.clock,
name="repl.stream.get_updates",
server_name=self.server_name,
):
all_streams = self.streams
if self._replication_torture_level is not None:

View File

@ -189,7 +189,8 @@ class StateHandler:
"""
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.server_name = hs.hostname # nb must be called this for @measure_func
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.store = hs.get_datastores().main
self._state_storage_controller = hs.get_storage_controllers().state
self.hs = hs
@ -631,6 +632,7 @@ class StateResolutionHandler:
"""
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.resolve_linearizer = Linearizer(name="state_resolve_lock")
@ -747,7 +749,9 @@ class StateResolutionHandler:
# which will be used as a cache key for future resolutions, but
# not get persisted.
with Measure(self.clock, "state.create_group_ids"):
with Measure(
self.clock, name="state.create_group_ids", server_name=self.server_name
):
cache = _make_state_cache_entry(new_state, state_groups_ids)
self._state_cache[group_names] = cache
@ -785,7 +789,9 @@ class StateResolutionHandler:
a map from (type, state_key) to event_id.
"""
try:
with Measure(self.clock, "state._resolve_events") as m:
with Measure(
self.clock, name="state._resolve_events", server_name=self.server_name
) as m:
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
if room_version_obj.state_res == StateResolutionVersions.V1:
return await v1.resolve_events_with_store(

View File

@ -55,6 +55,7 @@ class SQLBaseStore(metaclass=ABCMeta):
hs: "HomeServer",
):
self.hs = hs
self.server_name = hs.hostname
self._clock = hs.get_clock()
self.database_engine = database.engine
self.db_pool = database

View File

@ -337,6 +337,7 @@ class EventsPersistenceStorageController:
assert stores.persist_events
self.persist_events_store = stores.persist_events
self.server_name = hs.hostname
self._clock = hs.get_clock()
self._instance_name = hs.get_instance_name()
self.is_mine_id = hs.is_mine_id
@ -616,7 +617,11 @@ class EventsPersistenceStorageController:
state_delta_for_room = None
if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
with Measure(
self._clock,
name="_calculate_state_and_extrem",
server_name=self.server_name,
):
# Work out the new "current state" for the room.
# We do this by working out what the new extremities are and then
# calculating the state from that.
@ -627,7 +632,11 @@ class EventsPersistenceStorageController:
room_id, chunk
)
with Measure(self._clock, "calculate_chain_cover_index_for_events"):
with Measure(
self._clock,
name="calculate_chain_cover_index_for_events",
server_name=self.server_name,
):
# We now calculate chain ID/sequence numbers for any state events we're
# persisting. We ignore out of band memberships as we're not in the room
# and won't have their auth chain (we'll fix it up later if we join the
@ -719,7 +728,11 @@ class EventsPersistenceStorageController:
break
logger.debug("Calculating state delta for room %s", room_id)
with Measure(self._clock, "persist_events.get_new_state_after_events"):
with Measure(
self._clock,
name="persist_events.get_new_state_after_events",
server_name=self.server_name,
):
res = await self._get_new_state_after_events(
room_id,
ev_ctx_rm,
@ -746,7 +759,11 @@ class EventsPersistenceStorageController:
# removed keys entirely.
delta = DeltaState([], delta_ids)
elif current_state is not None:
with Measure(self._clock, "persist_events.calculate_state_delta"):
with Measure(
self._clock,
name="persist_events.calculate_state_delta",
server_name=self.server_name,
):
delta = await self._calculate_state_delta(room_id, current_state)
if delta:

View File

@ -68,6 +68,7 @@ class StateStorageController:
"""
def __init__(self, hs: "HomeServer", stores: "Databases"):
self.server_name = hs.hostname
self._is_mine_id = hs.is_mine_id
self._clock = hs.get_clock()
self.stores = stores
@ -812,7 +813,9 @@ class StateStorageController:
state_group = object()
assert state_group is not None
with Measure(self._clock, "get_joined_hosts"):
with Measure(
self._clock, name="get_joined_hosts", server_name=self.server_name
):
return await self._get_joined_hosts(
room_id, state_group, state_entry=state_entry
)

View File

@ -1246,7 +1246,9 @@ class EventsWorkerStore(SQLBaseStore):
to event row. Note that it may well contain additional events that
were not part of this request.
"""
with Measure(self._clock, "_fetch_event_list"):
with Measure(
self._clock, name="_fetch_event_list", server_name=self.server_name
):
try:
events_to_fetch = {
event_id for events, _ in event_list for event_id in events

View File

@ -983,7 +983,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
`_get_user_ids_from_membership_event_ids` for any uncached events.
"""
with Measure(self._clock, "get_joined_user_ids_from_state"):
with Measure(
self._clock,
name="get_joined_user_ids_from_state",
server_name=self.server_name,
):
users_in_room = set()
member_event_ids = [
e_id for key, e_id in state.items() if key[0] == EventTypes.Member

View File

@ -41,49 +41,83 @@ from synapse.logging.context import (
LoggingContext,
current_context,
)
from synapse.metrics import InFlightGauge
from synapse.metrics import INSTANCE_LABEL_NAME, InFlightGauge
from synapse.util import Clock
logger = logging.getLogger(__name__)
block_counter = Counter("synapse_util_metrics_block_count", "", ["block_name"])
# Metrics to see the number of calls to, and how much time is spent in, various blocks of code.
#
block_counter = Counter(
"synapse_util_metrics_block_count",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
)
"""The number of times this block has been called."""
block_timer = Counter("synapse_util_metrics_block_time_seconds", "", ["block_name"])
block_timer = Counter(
"synapse_util_metrics_block_time_seconds",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
)
"""The cumulative time spent executing this block across all calls, in seconds."""
block_ru_utime = Counter(
"synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"]
"synapse_util_metrics_block_ru_utime_seconds",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
)
"""Resource usage: user CPU time in seconds used in this block"""
block_ru_stime = Counter(
"synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"]
"synapse_util_metrics_block_ru_stime_seconds",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
)
"""Resource usage: system CPU time in seconds used in this block"""
block_db_txn_count = Counter(
"synapse_util_metrics_block_db_txn_count", "", ["block_name"]
"synapse_util_metrics_block_db_txn_count",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
)
"""Number of database transactions completed in this block"""
# seconds spent waiting for db txns, excluding scheduling time, in this block
block_db_txn_duration = Counter(
"synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"]
"synapse_util_metrics_block_db_txn_duration_seconds",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
)
"""Seconds spent waiting for database txns, excluding scheduling time, in this block"""
# seconds spent waiting for a db connection, in this block
block_db_sched_duration = Counter(
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]
"synapse_util_metrics_block_db_sched_duration_seconds",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
)
"""Seconds spent waiting for a db connection, in this block"""
# This is dynamically created in InFlightGauge.__init__.
class _InFlightMetric(Protocol):
class _BlockInFlightMetric(Protocol):
"""
Sub-metrics used for the `InFlightGauge` for blocks.
"""
real_time_max: float
"""The longest observed duration of any single execution of this block, in seconds."""
real_time_sum: float
"""The cumulative time spent executing this block across all calls, in seconds."""
# Tracks the number of blocks currently active
in_flight: InFlightGauge[_InFlightMetric] = InFlightGauge(
in_flight: InFlightGauge[_BlockInFlightMetric] = InFlightGauge(
"synapse_util_metrics_block_in_flight",
"",
labels=["block_name"],
labels=["block_name", INSTANCE_LABEL_NAME],
# Matches the fields in the `_BlockInFlightMetric`
sub_metrics=["real_time_max", "real_time_sum"],
)
@ -92,8 +126,16 @@ P = ParamSpec("P")
R = TypeVar("R")
class HasClock(Protocol):
class HasClockAndServerName(Protocol):
clock: Clock
"""
Used to measure functions
"""
server_name: str
"""
The homeserver name that this Measure is associated with (used to label the metric)
(`hs.hostname`).
"""
def measure_func(
@ -101,8 +143,9 @@ def measure_func(
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
"""Decorate an async method with a `Measure` context manager.
The Measure is created using `self.clock`; it should only be used to decorate
methods in classes defining an instance-level `clock` attribute.
The Measure is created using `self.clock` and `self.server_name`; it should only be
used to decorate methods in classes that define instance-level `clock` and
`server_name` attributes.
Usage:
@ -116,16 +159,21 @@ def measure_func(
with Measure(...):
...
Args:
name: The name of the metric to report (the block name) (used to label the
metric). Defaults to the name of the decorated function.
"""
def wrapper(
func: Callable[Concatenate[HasClock, P], Awaitable[R]],
func: Callable[Concatenate[HasClockAndServerName, P], Awaitable[R]],
) -> Callable[P, Awaitable[R]]:
block_name = func.__name__ if name is None else name
@wraps(func)
async def measured_func(self: HasClock, *args: P.args, **kwargs: P.kwargs) -> R:
with Measure(self.clock, block_name):
async def measured_func(
self: HasClockAndServerName, *args: P.args, **kwargs: P.kwargs
) -> R:
with Measure(self.clock, name=block_name, server_name=self.server_name):
r = await func(self, *args, **kwargs)
return r
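A hedged usage sketch of the decorator as refactored here (the handler class and block name are illustrative): the decorated object must expose `clock` and `server_name` instance attributes, matching the `HasClockAndServerName` protocol above.
```python
from synapse.util.metrics import measure_func

class ExampleHandler:
    """Hypothetical handler showing the attribute names @measure_func relies on."""

    def __init__(self, hs) -> None:
        self.server_name = hs.hostname  # nb must be called this for @measure_func
        self.clock = hs.get_clock()  # nb must be called this for @measure_func

    @measure_func("example_do_work")
    async def do_work(self) -> None:
        # Runs inside Measure(self.clock, name="example_do_work",
        # server_name=self.server_name), so every block_* metric for this
        # method is labelled with the homeserver name.
        ...
```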
@ -142,19 +190,24 @@ class Measure:
__slots__ = [
"clock",
"name",
"server_name",
"_logging_context",
"start",
]
def __init__(self, clock: Clock, name: str) -> None:
def __init__(self, clock: Clock, *, name: str, server_name: str) -> None:
"""
Args:
clock: An object with a "time()" method, which returns the current
time in seconds.
name: The name of the metric to report.
name: The name of the metric to report (the block name) (used to label the
metric).
server_name: The homeserver name that this Measure is associated with (used to
label the metric) (`hs.hostname`).
"""
self.clock = clock
self.name = name
self.server_name = server_name
curr_context = current_context()
if not curr_context:
logger.warning(
@ -174,7 +227,7 @@ class Measure:
self.start = self.clock.time()
self._logging_context.__enter__()
in_flight.register((self.name,), self._update_in_flight)
in_flight.register((self.name, self.server_name), self._update_in_flight)
logger.debug("Entering block %s", self.name)
@ -194,19 +247,20 @@ class Measure:
duration = self.clock.time() - self.start
usage = self.get_resource_usage()
in_flight.unregister((self.name,), self._update_in_flight)
in_flight.unregister((self.name, self.server_name), self._update_in_flight)
self._logging_context.__exit__(exc_type, exc_val, exc_tb)
try:
block_counter.labels(self.name).inc()
block_timer.labels(self.name).inc(duration)
block_ru_utime.labels(self.name).inc(usage.ru_utime)
block_ru_stime.labels(self.name).inc(usage.ru_stime)
block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
except ValueError:
logger.warning("Failed to save metrics! Usage: %s", usage)
labels = {"block_name": self.name, INSTANCE_LABEL_NAME: self.server_name}
block_counter.labels(**labels).inc()
block_timer.labels(**labels).inc(duration)
block_ru_utime.labels(**labels).inc(usage.ru_utime)
block_ru_stime.labels(**labels).inc(usage.ru_stime)
block_db_txn_count.labels(**labels).inc(usage.db_txn_count)
block_db_txn_duration.labels(**labels).inc(usage.db_txn_duration_sec)
block_db_sched_duration.labels(**labels).inc(usage.db_sched_duration_sec)
except ValueError as exc:
logger.warning("Failed to save metrics! Usage: %s Error: %s", usage, exc)
def get_resource_usage(self) -> ContextResourceUsage:
"""Get the resources used within this Measure block
@ -215,7 +269,7 @@ class Measure:
"""
return self._logging_context.get_resource_usage()
def _update_in_flight(self, metrics: _InFlightMetric) -> None:
def _update_in_flight(self, metrics: _BlockInFlightMetric) -> None:
"""Gets called when processing in flight metrics"""
assert self.start is not None
duration = self.clock.time() - self.start
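The diff for `_update_in_flight` is truncated above; for context, the callback plausibly completes by folding the current duration into the two sub-metrics declared on the gauge (a hedged reconstruction, not part of this diff):
```python
def _update_in_flight(self, metrics: "_BlockInFlightMetric") -> None:
    """Gets called when processing in flight metrics"""
    assert self.start is not None
    duration = self.clock.time() - self.start
    # Longest single execution of this block seen so far ...
    metrics.real_time_max = max(metrics.real_time_max, duration)
    # ... and the cumulative time across all executions still in flight.
    metrics.real_time_sum += duration
```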

View File

@ -86,6 +86,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
self.mock_federation_client = AsyncMock(spec=["put_json"])
self.mock_federation_client.put_json.return_value = (200, "OK")
self.mock_federation_client.agent = MatrixFederationAgent(
"OUR_STUB_HOMESERVER_NAME",
reactor,
tls_client_options_factory=None,
user_agent=b"SynapseInTrialTest/0.0.0",

View File

@ -91,6 +91,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
"test_cache", timer=self.reactor.seconds
)
self.well_known_resolver = WellKnownResolver(
"OUR_STUB_HOMESERVER_NAME",
self.reactor,
Agent(self.reactor, contextFactory=self.tls_factory),
b"test-agent",
@ -269,6 +270,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
because it is created too early during setUp
"""
return MatrixFederationAgent(
"OUR_STUB_HOMESERVER_NAME",
reactor=cast(ISynapseReactor, self.reactor),
tls_client_options_factory=self.tls_factory,
user_agent=b"test-agent", # Note that this is unused since _well_known_resolver is provided.
@ -1011,6 +1013,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
# Build a new agent and WellKnownResolver with a different tls factory
tls_factory = FederationPolicyForHTTPS(config)
agent = MatrixFederationAgent(
"OUR_STUB_HOMESERVER_NAME",
reactor=self.reactor,
tls_client_options_factory=tls_factory,
user_agent=b"test-agent", # This is unused since _well_known_resolver is passed below.
@ -1018,6 +1021,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
ip_blocklist=IPSet(),
_srv_resolver=self.mock_resolver,
_well_known_resolver=WellKnownResolver(
"OUR_STUB_HOMESERVER_NAME",
cast(ISynapseReactor, self.reactor),
Agent(self.reactor, contextFactory=tls_factory),
b"test-agent",

View File

@ -68,6 +68,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
reactor, _ = get_clock()
self.matrix_federation_agent = MatrixFederationAgent(
"OUR_STUB_HOMESERVER_NAME",
reactor,
tls_client_options_factory=None,
user_agent=b"SynapseInTrialTest/0.0.0",