Compare commits

...

12 Commits

Author SHA1 Message Date
Eric Eastwood
30b91fdd86
Merge e0f8992ee34562d41b6b78991132bd4fe678a30f into 24bcdb3f3c5aa2d8d2000dc34d54a8002a914616 2025-07-01 10:52:00 -05:00
Eric Eastwood
e0f8992ee3 Fix failing to save metrics because of incorrect label names
We would see this in the logs before:
```
Failed to save metrics! Usage: <ContextResourceUsage ...> Error: Incorrect label names
```
2025-06-26 16:43:13 -05:00
Eric Eastwood
06f9af155b Add introduction comment 2025-06-26 16:19:35 -05:00
Eric Eastwood
5ad555cefc Add docstrings for block metrics 2025-06-26 16:18:37 -05:00
Eric Eastwood
652c34bda6 Better docstrings for _InFlightMetric -> _BlockInFlightMetric 2025-06-26 16:16:37 -05:00
Eric Eastwood
521c68cafe Add changelog 2025-06-26 16:13:39 -05:00
Eric Eastwood
c232ec7b3b Fix mypy complaining about unknown types by changing property order around
Fix mypy complaints

```
synapse/handlers/delayed_events.py:266: error: Cannot determine type of "validator"  [has-type]
synapse/handlers/delayed_events.py:267: error: Cannot determine type of "event_builder_factory"  [has-type]
```
2025-06-26 16:09:34 -05:00
Eric Eastwood
c7d15dbcc7 Bulk refactor @measure_func decorator usage 2025-06-26 16:06:50 -05:00
Eric Eastwood
6731c4bbf0 Refactor Measure in WellKnownResolver 2025-06-26 16:06:50 -05:00
Eric Eastwood
d05b6ca4c1 Bulk refactor Measure(...) to add server_name 2025-06-26 16:06:50 -05:00
Eric Eastwood
65035b6098 Refactor @measure_func decorator to include server name
Update `@measure_func` docstring
2025-06-26 16:06:49 -05:00
Eric Eastwood
02a7668bb2 Add instance label to Measure 2025-06-26 15:53:24 -05:00
35 changed files with 376 additions and 91 deletions

1
changelog.d/18601.misc Normal file
View File

@ -0,0 +1 @@
Refactor `Measure` block metrics to be homeserver-scoped.

View File

@ -156,7 +156,9 @@ class FederationRemoteSendQueue(AbstractFederationSender):
def _clear_queue_before_pos(self, position_to_delete: int) -> None: def _clear_queue_before_pos(self, position_to_delete: int) -> None:
"""Clear all the queues from before a given position""" """Clear all the queues from before a given position"""
with Measure(self.clock, "send_queue._clear"): with Measure(
self.clock, name="send_queue._clear", server_name=self.server_name
):
# Delete things out of presence maps # Delete things out of presence maps
keys = self.presence_destinations.keys() keys = self.presence_destinations.keys()
i = self.presence_destinations.bisect_left(position_to_delete) i = self.presence_destinations.bisect_left(position_to_delete)

View File

@ -657,7 +657,11 @@ class FederationSender(AbstractFederationSender):
logger.debug( logger.debug(
"Handling %i events in room %s", len(events), events[0].room_id "Handling %i events in room %s", len(events), events[0].room_id
) )
with Measure(self.clock, "handle_room_events"): with Measure(
self.clock,
name="handle_room_events",
server_name=self.server_name,
):
for event in events: for event in events:
await handle_event(event) await handle_event(event)

View File

@ -58,7 +58,7 @@ class TransactionManager:
""" """
def __init__(self, hs: "synapse.server.HomeServer"): def __init__(self, hs: "synapse.server.HomeServer"):
self._server_name = hs.hostname self.server_name = hs.hostname # nb must be called this for @measure_func
self.clock = hs.get_clock() # nb must be called this for @measure_func self.clock = hs.get_clock() # nb must be called this for @measure_func
self._store = hs.get_datastores().main self._store = hs.get_datastores().main
self._transaction_actions = TransactionActions(self._store) self._transaction_actions = TransactionActions(self._store)
@ -116,7 +116,7 @@ class TransactionManager:
transaction = Transaction( transaction = Transaction(
origin_server_ts=int(self.clock.time_msec()), origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id, transaction_id=txn_id,
origin=self._server_name, origin=self.server_name,
destination=destination, destination=destination,
pdus=[p.get_pdu_json() for p in pdus], pdus=[p.get_pdu_json() for p in pdus],
edus=[edu.get_dict() for edu in edus], edus=[edu.get_dict() for edu in edus],

View File

@ -73,6 +73,7 @@ events_processed_counter = Counter("synapse_handlers_appservice_events_processed
class ApplicationServicesHandler: class ApplicationServicesHandler:
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self.is_mine_id = hs.is_mine_id self.is_mine_id = hs.is_mine_id
self.appservice_api = hs.get_application_service_api() self.appservice_api = hs.get_application_service_api()
@ -120,7 +121,9 @@ class ApplicationServicesHandler:
@wrap_as_background_process("notify_interested_services") @wrap_as_background_process("notify_interested_services")
async def _notify_interested_services(self, max_token: RoomStreamToken) -> None: async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
with Measure(self.clock, "notify_interested_services"): with Measure(
self.clock, name="notify_interested_services", server_name=self.server_name
):
self.is_processing = True self.is_processing = True
try: try:
upper_bound = -1 upper_bound = -1
@ -329,7 +332,11 @@ class ApplicationServicesHandler:
users: Collection[Union[str, UserID]], users: Collection[Union[str, UserID]],
) -> None: ) -> None:
logger.debug("Checking interested services for %s", stream_key) logger.debug("Checking interested services for %s", stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"): with Measure(
self.clock,
name="notify_interested_services_ephemeral",
server_name=self.server_name,
):
for service in services: for service in services:
if stream_key == StreamKeyType.TYPING: if stream_key == StreamKeyType.TYPING:
# Note that we don't persist the token (via set_appservice_stream_type_pos) # Note that we don't persist the token (via set_appservice_stream_type_pos)

View File

@ -54,6 +54,7 @@ logger = logging.getLogger(__name__)
class DelayedEventsHandler: class DelayedEventsHandler:
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self._store = hs.get_datastores().main self._store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers() self._storage_controllers = hs.get_storage_controllers()
self._config = hs.config self._config = hs.config
@ -159,7 +160,9 @@ class DelayedEventsHandler:
# Loop round handling deltas until we're up to date # Loop round handling deltas until we're up to date
while True: while True:
with Measure(self._clock, "delayed_events_delta"): with Measure(
self._clock, name="delayed_events_delta", server_name=self.server_name
):
room_max_stream_ordering = self._store.get_room_max_stream_ordering() room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering: if self._event_pos == room_max_stream_ordering:
return return

View File

@ -526,6 +526,8 @@ class DeviceHandler(DeviceWorkerHandler):
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
super().__init__(hs) super().__init__(hs)
self.server_name = hs.hostname # nb must be called this for @measure_func
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.federation_sender = hs.get_federation_sender() self.federation_sender = hs.get_federation_sender()
self._account_data_handler = hs.get_account_data_handler() self._account_data_handler = hs.get_account_data_handler()
self._storage_controllers = hs.get_storage_controllers() self._storage_controllers = hs.get_storage_controllers()
@ -1215,7 +1217,8 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
def __init__(self, hs: "HomeServer", device_handler: DeviceHandler): def __init__(self, hs: "HomeServer", device_handler: DeviceHandler):
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self.federation = hs.get_federation_client() self.federation = hs.get_federation_client()
self.clock = hs.get_clock() self.server_name = hs.hostname # nb must be called this for @measure_func
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.device_handler = device_handler self.device_handler = device_handler
self._notifier = hs.get_notifier() self._notifier = hs.get_notifier()

View File

@ -476,16 +476,16 @@ _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
class EventCreationHandler: class EventCreationHandler:
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
self.hs = hs self.hs = hs
self.validator = EventValidator()
self.event_builder_factory = hs.get_event_builder_factory()
self.server_name = hs.hostname # nb must be called this for @measure_func
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.auth_blocking = hs.get_auth_blocking() self.auth_blocking = hs.get_auth_blocking()
self._event_auth_handler = hs.get_event_auth_handler() self._event_auth_handler = hs.get_event_auth_handler()
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers() self._storage_controllers = hs.get_storage_controllers()
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.validator = EventValidator()
self.profile_handler = hs.get_profile_handler() self.profile_handler = hs.get_profile_handler()
self.event_builder_factory = hs.get_event_builder_factory()
self.server_name = hs.hostname
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.config = hs.config self.config = hs.config
self.require_membership_for_aliases = ( self.require_membership_for_aliases = (

View File

@ -747,6 +747,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
class PresenceHandler(BasePresenceHandler): class PresenceHandler(BasePresenceHandler):
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
super().__init__(hs) super().__init__(hs)
self.server_name = hs.hostname
self.wheel_timer: WheelTimer[str] = WheelTimer() self.wheel_timer: WheelTimer[str] = WheelTimer()
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
@ -941,7 +942,9 @@ class PresenceHandler(BasePresenceHandler):
now = self.clock.time_msec() now = self.clock.time_msec()
with Measure(self.clock, "presence_update_states"): with Measure(
self.clock, name="presence_update_states", server_name=self.server_name
):
# NOTE: We purposefully don't await between now and when we've # NOTE: We purposefully don't await between now and when we've
# calculated what we want to do with the new states, to avoid races. # calculated what we want to do with the new states, to avoid races.
@ -1497,7 +1500,9 @@ class PresenceHandler(BasePresenceHandler):
async def _unsafe_process(self) -> None: async def _unsafe_process(self) -> None:
# Loop round handling deltas until we're up to date # Loop round handling deltas until we're up to date
while True: while True:
with Measure(self.clock, "presence_delta"): with Measure(
self.clock, name="presence_delta", server_name=self.server_name
):
room_max_stream_ordering = self.store.get_room_max_stream_ordering() room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering: if self._event_pos == room_max_stream_ordering:
return return
@ -1759,6 +1764,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
# Same with get_presence_router: # Same with get_presence_router:
# #
# AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler # AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
self.server_name = hs.hostname
self.get_presence_handler = hs.get_presence_handler self.get_presence_handler = hs.get_presence_handler
self.get_presence_router = hs.get_presence_router self.get_presence_router = hs.get_presence_router
self.clock = hs.get_clock() self.clock = hs.get_clock()
@ -1792,7 +1798,9 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
user_id = user.to_string() user_id = user.to_string()
stream_change_cache = self.store.presence_stream_cache stream_change_cache = self.store.presence_stream_cache
with Measure(self.clock, "presence.get_new_events"): with Measure(
self.clock, name="presence.get_new_events", server_name=self.server_name
):
if from_key is not None: if from_key is not None:
from_key = int(from_key) from_key = int(from_key)

View File

@ -329,6 +329,7 @@ class E2eeSyncResult:
class SyncHandler: class SyncHandler:
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.hs_config = hs.config self.hs_config = hs.config
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
@ -710,7 +711,9 @@ class SyncHandler:
sync_config = sync_result_builder.sync_config sync_config = sync_result_builder.sync_config
with Measure(self.clock, "ephemeral_by_room"): with Measure(
self.clock, name="ephemeral_by_room", server_name=self.server_name
):
typing_key = since_token.typing_key if since_token else 0 typing_key = since_token.typing_key if since_token else 0
room_ids = sync_result_builder.joined_room_ids room_ids = sync_result_builder.joined_room_ids
@ -783,7 +786,9 @@ class SyncHandler:
and current token to send down to clients. and current token to send down to clients.
newly_joined_room newly_joined_room
""" """
with Measure(self.clock, "load_filtered_recents"): with Measure(
self.clock, name="load_filtered_recents", server_name=self.server_name
):
timeline_limit = sync_config.filter_collection.timeline_limit() timeline_limit = sync_config.filter_collection.timeline_limit()
block_all_timeline = ( block_all_timeline = (
sync_config.filter_collection.blocks_all_room_timeline() sync_config.filter_collection.blocks_all_room_timeline()
@ -1174,7 +1179,9 @@ class SyncHandler:
# updates even if they occurred logically before the previous event. # updates even if they occurred logically before the previous event.
# TODO(mjark) Check for new redactions in the state events. # TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"): with Measure(
self.clock, name="compute_state_delta", server_name=self.server_name
):
# The memberships needed for events in the timeline. # The memberships needed for events in the timeline.
# Only calculated when `lazy_load_members` is on. # Only calculated when `lazy_load_members` is on.
members_to_fetch: Optional[Set[str]] = None members_to_fetch: Optional[Set[str]] = None
@ -1791,7 +1798,9 @@ class SyncHandler:
# the DB. # the DB.
return RoomNotifCounts.empty() return RoomNotifCounts.empty()
with Measure(self.clock, "unread_notifs_for_room_id"): with Measure(
self.clock, name="unread_notifs_for_room_id", server_name=self.server_name
):
return await self.store.get_unread_event_push_actions_by_room_for_user( return await self.store.get_unread_event_push_actions_by_room_for_user(
room_id, room_id,
sync_config.user.to_string(), sync_config.user.to_string(),

View File

@ -503,6 +503,7 @@ class TypingWriterHandler(FollowerTypingHandler):
class TypingNotificationEventSource(EventSource[int, JsonMapping]): class TypingNotificationEventSource(EventSource[int, JsonMapping]):
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self._main_store = hs.get_datastores().main self._main_store = hs.get_datastores().main
self.clock = hs.get_clock() self.clock = hs.get_clock()
# We can't call get_typing_handler here because there's a cycle: # We can't call get_typing_handler here because there's a cycle:
@ -535,7 +536,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
appservice may be interested in. appservice may be interested in.
* The latest known room serial. * The latest known room serial.
""" """
with Measure(self.clock, "typing.get_new_events_as"): with Measure(
self.clock, name="typing.get_new_events_as", server_name=self.server_name
):
handler = self.get_typing_handler() handler = self.get_typing_handler()
events = [] events = []
@ -571,7 +574,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
Find typing notifications for given rooms (> `from_token` and <= `to_token`) Find typing notifications for given rooms (> `from_token` and <= `to_token`)
""" """
with Measure(self.clock, "typing.get_new_events"): with Measure(
self.clock, name="typing.get_new_events", server_name=self.server_name
):
from_key = int(from_key) from_key = int(from_key)
handler = self.get_typing_handler() handler = self.get_typing_handler()

View File

@ -237,7 +237,9 @@ class UserDirectoryHandler(StateDeltasHandler):
# Loop round handling deltas until we're up to date # Loop round handling deltas until we're up to date
while True: while True:
with Measure(self.clock, "user_dir_delta"): with Measure(
self.clock, name="user_dir_delta", server_name=self.server_name
):
room_max_stream_ordering = self.store.get_room_max_stream_ordering() room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self.pos == room_max_stream_ordering: if self.pos == room_max_stream_ordering:
return return

View File

@ -92,6 +92,7 @@ class MatrixFederationAgent:
def __init__( def __init__(
self, self,
server_name: str,
reactor: ISynapseReactor, reactor: ISynapseReactor,
tls_client_options_factory: Optional[FederationPolicyForHTTPS], tls_client_options_factory: Optional[FederationPolicyForHTTPS],
user_agent: bytes, user_agent: bytes,
@ -100,6 +101,11 @@ class MatrixFederationAgent:
_srv_resolver: Optional[SrvResolver] = None, _srv_resolver: Optional[SrvResolver] = None,
_well_known_resolver: Optional[WellKnownResolver] = None, _well_known_resolver: Optional[WellKnownResolver] = None,
): ):
"""
Args:
server_name: Our homeserver name (used to label metrics) (`hs.hostname`).
"""
# proxy_reactor is not blocklisting reactor # proxy_reactor is not blocklisting reactor
proxy_reactor = reactor proxy_reactor = reactor
@ -127,6 +133,7 @@ class MatrixFederationAgent:
if _well_known_resolver is None: if _well_known_resolver is None:
_well_known_resolver = WellKnownResolver( _well_known_resolver = WellKnownResolver(
server_name,
reactor, reactor,
agent=BlocklistingAgentWrapper( agent=BlocklistingAgentWrapper(
ProxyAgent( ProxyAgent(

View File

@ -91,12 +91,19 @@ class WellKnownResolver:
def __init__( def __init__(
self, self,
server_name: str,
reactor: IReactorTime, reactor: IReactorTime,
agent: IAgent, agent: IAgent,
user_agent: bytes, user_agent: bytes,
well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None, well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None,
had_well_known_cache: Optional[TTLCache[bytes, bool]] = None, had_well_known_cache: Optional[TTLCache[bytes, bool]] = None,
): ):
"""
Args:
server_name: Our homeserver name (used to label metrics) (`hs.hostname`).
"""
self.server_name = server_name
self._reactor = reactor self._reactor = reactor
self._clock = Clock(reactor) self._clock = Clock(reactor)
@ -134,7 +141,13 @@ class WellKnownResolver:
# TODO: should we linearise so that we don't end up doing two .well-known # TODO: should we linearise so that we don't end up doing two .well-known
# requests for the same server in parallel? # requests for the same server in parallel?
try: try:
with Measure(self._clock, "get_well_known"): with Measure(
self._clock,
name="get_well_known",
# This should be our homeserver where the code is running (used to # This should be our homeserver where the code is running (used to
# label metrics)
server_name=self.server_name,
):
result: Optional[bytes] result: Optional[bytes]
cache_period: float cache_period: float

View File

@ -417,6 +417,7 @@ class MatrixFederationHttpClient:
if hs.get_instance_name() in outbound_federation_restricted_to: if hs.get_instance_name() in outbound_federation_restricted_to:
# Talk to federation directly # Talk to federation directly
federation_agent: IAgent = MatrixFederationAgent( federation_agent: IAgent = MatrixFederationAgent(
self.server_name,
self.reactor, self.reactor,
tls_client_options_factory, tls_client_options_factory,
user_agent.encode("ascii"), user_agent.encode("ascii"),
@ -697,7 +698,11 @@ class MatrixFederationHttpClient:
outgoing_requests_counter.labels(request.method).inc() outgoing_requests_counter.labels(request.method).inc()
try: try:
with Measure(self.clock, "outbound_request"): with Measure(
self.clock,
name="outbound_request",
server_name=self.server_name,
):
# we don't want all the fancy cookie and redirect handling # we don't want all the fancy cookie and redirect handling
# that treq.request gives: just use the raw Agent. # that treq.request gives: just use the raw Agent.

View File

@ -66,6 +66,19 @@ all_gauges: Dict[str, Collector] = {}
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
INSTANCE_LABEL_NAME = "instance"
"""
The standard Prometheus label name used to identify which server instance the metrics
came from.
In the case of a Synapse homeserver, this should be set to the homeserver name
(`hs.hostname`).
Normally, this would be set automatically by the Prometheus server scraping the data but
since we support multiple instances of Synapse running in the same process and all
metrics are in a single global `REGISTRY`, we need to manually label any metrics.
"""
class _RegistryProxy: class _RegistryProxy:
@staticmethod @staticmethod
@ -192,7 +205,16 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
same key. same key.
Note that `callback` may be called on a separate thread. Note that `callback` may be called on a separate thread.
Args:
key: A tuple of label values, which must match the order of the
`labels` given to the constructor.
callback
""" """
assert len(key) == len(self.labels), (
f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}"
)
with self._lock: with self._lock:
self._registrations.setdefault(key, set()).add(callback) self._registrations.setdefault(key, set()).add(callback)
@ -201,7 +223,17 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
key: Tuple[str, ...], key: Tuple[str, ...],
callback: Callable[[MetricsEntry], None], callback: Callable[[MetricsEntry], None],
) -> None: ) -> None:
"""Registers that we've exited a block with labels `key`.""" """
Registers that we've exited a block with labels `key`.
Args:
key: A tuple of label values, which must match the order of the
`labels` given to the constructor.
callback
"""
assert len(key) == len(self.labels), (
f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}"
)
with self._lock: with self._lock:
self._registrations.setdefault(key, set()).discard(callback) self._registrations.setdefault(key, set()).discard(callback)
@ -225,7 +257,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
with self._lock: with self._lock:
callbacks = set(self._registrations[key]) callbacks = set(self._registrations[key])
in_flight.add_metric(key, len(callbacks)) in_flight.add_metric(labels=key, value=len(callbacks))
metrics = self._metrics_class() metrics = self._metrics_class()
metrics_by_key[key] = metrics metrics_by_key[key] = metrics
@ -239,7 +271,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
"_".join([self.name, name]), "", labels=self.labels "_".join([self.name, name]), "", labels=self.labels
) )
for key, metrics in metrics_by_key.items(): for key, metrics in metrics_by_key.items():
gauge.add_metric(key, getattr(metrics, name)) gauge.add_metric(labels=key, value=getattr(metrics, name))
yield gauge yield gauge
def _register_with_collector(self) -> None: def _register_with_collector(self) -> None:

View File

@ -31,6 +31,7 @@ IS_USER_ALLOWED_TO_UPLOAD_MEDIA_OF_SIZE_CALLBACK = Callable[[str, int], Awaitabl
class MediaRepositoryModuleApiCallbacks: class MediaRepositoryModuleApiCallbacks:
def __init__(self, hs: "HomeServer") -> None: def __init__(self, hs: "HomeServer") -> None:
self.server_name = hs.hostname
self.clock = hs.get_clock() self.clock = hs.get_clock()
self._get_media_config_for_user_callbacks: List[ self._get_media_config_for_user_callbacks: List[
GET_MEDIA_CONFIG_FOR_USER_CALLBACK GET_MEDIA_CONFIG_FOR_USER_CALLBACK
@ -57,7 +58,11 @@ class MediaRepositoryModuleApiCallbacks:
async def get_media_config_for_user(self, user_id: str) -> Optional[JsonDict]: async def get_media_config_for_user(self, user_id: str) -> Optional[JsonDict]:
for callback in self._get_media_config_for_user_callbacks: for callback in self._get_media_config_for_user_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res: Optional[JsonDict] = await delay_cancellation(callback(user_id)) res: Optional[JsonDict] = await delay_cancellation(callback(user_id))
if res: if res:
return res return res
@ -68,7 +73,11 @@ class MediaRepositoryModuleApiCallbacks:
self, user_id: str, size: int self, user_id: str, size: int
) -> bool: ) -> bool:
for callback in self._is_user_allowed_to_upload_media_of_size_callbacks: for callback in self._is_user_allowed_to_upload_media_of_size_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res: bool = await delay_cancellation(callback(user_id, size)) res: bool = await delay_cancellation(callback(user_id, size))
if not res: if not res:
return res return res

View File

@ -43,6 +43,7 @@ GET_RATELIMIT_OVERRIDE_FOR_USER_CALLBACK = Callable[
class RatelimitModuleApiCallbacks: class RatelimitModuleApiCallbacks:
def __init__(self, hs: "HomeServer") -> None: def __init__(self, hs: "HomeServer") -> None:
self.server_name = hs.hostname
self.clock = hs.get_clock() self.clock = hs.get_clock()
self._get_ratelimit_override_for_user_callbacks: List[ self._get_ratelimit_override_for_user_callbacks: List[
GET_RATELIMIT_OVERRIDE_FOR_USER_CALLBACK GET_RATELIMIT_OVERRIDE_FOR_USER_CALLBACK
@ -64,7 +65,11 @@ class RatelimitModuleApiCallbacks:
self, user_id: str, limiter_name: str self, user_id: str, limiter_name: str
) -> Optional[RatelimitOverride]: ) -> Optional[RatelimitOverride]:
for callback in self._get_ratelimit_override_for_user_callbacks: for callback in self._get_ratelimit_override_for_user_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res: Optional[RatelimitOverride] = await delay_cancellation( res: Optional[RatelimitOverride] = await delay_cancellation(
callback(user_id, limiter_name) callback(user_id, limiter_name)
) )

View File

@ -356,6 +356,7 @@ class SpamCheckerModuleApiCallbacks:
NOT_SPAM: Literal["NOT_SPAM"] = "NOT_SPAM" NOT_SPAM: Literal["NOT_SPAM"] = "NOT_SPAM"
def __init__(self, hs: "synapse.server.HomeServer") -> None: def __init__(self, hs: "synapse.server.HomeServer") -> None:
self.server_name = hs.hostname
self.clock = hs.get_clock() self.clock = hs.get_clock()
self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = [] self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = []
@ -490,7 +491,11 @@ class SpamCheckerModuleApiCallbacks:
generally discouraged as it doesn't support internationalization. generally discouraged as it doesn't support internationalization.
""" """
for callback in self._check_event_for_spam_callbacks: for callback in self._check_event_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(callback(event)) res = await delay_cancellation(callback(event))
if res is False or res == self.NOT_SPAM: if res is False or res == self.NOT_SPAM:
# This spam-checker accepts the event. # This spam-checker accepts the event.
@ -543,7 +548,11 @@ class SpamCheckerModuleApiCallbacks:
True if the event should be silently dropped True if the event should be silently dropped
""" """
for callback in self._should_drop_federated_event_callbacks: for callback in self._should_drop_federated_event_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res: Union[bool, str] = await delay_cancellation(callback(event)) res: Union[bool, str] = await delay_cancellation(callback(event))
if res: if res:
return res return res
@ -565,7 +574,11 @@ class SpamCheckerModuleApiCallbacks:
NOT_SPAM if the operation is permitted, [Codes, Dict] otherwise. NOT_SPAM if the operation is permitted, [Codes, Dict] otherwise.
""" """
for callback in self._user_may_join_room_callbacks: for callback in self._user_may_join_room_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(callback(user_id, room_id, is_invited)) res = await delay_cancellation(callback(user_id, room_id, is_invited))
# Normalize return values to `Codes` or `"NOT_SPAM"`. # Normalize return values to `Codes` or `"NOT_SPAM"`.
if res is True or res is self.NOT_SPAM: if res is True or res is self.NOT_SPAM:
@ -604,7 +617,11 @@ class SpamCheckerModuleApiCallbacks:
NOT_SPAM if the operation is permitted, Codes otherwise. NOT_SPAM if the operation is permitted, Codes otherwise.
""" """
for callback in self._user_may_invite_callbacks: for callback in self._user_may_invite_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation( res = await delay_cancellation(
callback(inviter_userid, invitee_userid, room_id) callback(inviter_userid, invitee_userid, room_id)
) )
@ -643,7 +660,11 @@ class SpamCheckerModuleApiCallbacks:
NOT_SPAM if the operation is permitted, Codes otherwise. NOT_SPAM if the operation is permitted, Codes otherwise.
""" """
for callback in self._federated_user_may_invite_callbacks: for callback in self._federated_user_may_invite_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(callback(event)) res = await delay_cancellation(callback(event))
# Normalize return values to `Codes` or `"NOT_SPAM"`. # Normalize return values to `Codes` or `"NOT_SPAM"`.
if res is True or res is self.NOT_SPAM: if res is True or res is self.NOT_SPAM:
@ -686,7 +707,11 @@ class SpamCheckerModuleApiCallbacks:
NOT_SPAM if the operation is permitted, Codes otherwise. NOT_SPAM if the operation is permitted, Codes otherwise.
""" """
for callback in self._user_may_send_3pid_invite_callbacks: for callback in self._user_may_send_3pid_invite_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation( res = await delay_cancellation(
callback(inviter_userid, medium, address, room_id) callback(inviter_userid, medium, address, room_id)
) )
@ -722,7 +747,11 @@ class SpamCheckerModuleApiCallbacks:
room_config: The room creation configuration which is the body of the /createRoom request room_config: The room creation configuration which is the body of the /createRoom request
""" """
for callback in self._user_may_create_room_callbacks: for callback in self._user_may_create_room_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
checker_args = inspect.signature(callback) checker_args = inspect.signature(callback)
# Also ensure backwards compatibility with spam checker callbacks # Also ensure backwards compatibility with spam checker callbacks
# that don't expect the room_config argument. # that don't expect the room_config argument.
@ -786,7 +815,11 @@ class SpamCheckerModuleApiCallbacks:
content: The content of the state event content: The content of the state event
""" """
for callback in self._user_may_send_state_event_callbacks: for callback in self._user_may_send_state_event_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
# We make a copy of the content to ensure that the spam checker cannot modify it. # We make a copy of the content to ensure that the spam checker cannot modify it.
res = await delay_cancellation( res = await delay_cancellation(
callback(user_id, room_id, event_type, state_key, deepcopy(content)) callback(user_id, room_id, event_type, state_key, deepcopy(content))
@ -814,7 +847,11 @@ class SpamCheckerModuleApiCallbacks:
""" """
for callback in self._user_may_create_room_alias_callbacks: for callback in self._user_may_create_room_alias_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(callback(userid, room_alias)) res = await delay_cancellation(callback(userid, room_alias))
if res is True or res is self.NOT_SPAM: if res is True or res is self.NOT_SPAM:
continue continue
@ -847,7 +884,11 @@ class SpamCheckerModuleApiCallbacks:
room_id: The ID of the room that would be published room_id: The ID of the room that would be published
""" """
for callback in self._user_may_publish_room_callbacks: for callback in self._user_may_publish_room_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(callback(userid, room_id)) res = await delay_cancellation(callback(userid, room_id))
if res is True or res is self.NOT_SPAM: if res is True or res is self.NOT_SPAM:
continue continue
@ -889,7 +930,11 @@ class SpamCheckerModuleApiCallbacks:
True if the user is spammy. True if the user is spammy.
""" """
for callback in self._check_username_for_spam_callbacks: for callback in self._check_username_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
checker_args = inspect.signature(callback) checker_args = inspect.signature(callback)
# Make a copy of the user profile object to ensure the spam checker cannot # Make a copy of the user profile object to ensure the spam checker cannot
# modify it. # modify it.
@ -938,7 +983,11 @@ class SpamCheckerModuleApiCallbacks:
""" """
for callback in self._check_registration_for_spam_callbacks: for callback in self._check_registration_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
behaviour = await delay_cancellation( behaviour = await delay_cancellation(
callback(email_threepid, username, request_info, auth_provider_id) callback(email_threepid, username, request_info, auth_provider_id)
) )
@ -980,7 +1029,11 @@ class SpamCheckerModuleApiCallbacks:
""" """
for callback in self._check_media_file_for_spam_callbacks: for callback in self._check_media_file_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation(callback(file_wrapper, file_info)) res = await delay_cancellation(callback(file_wrapper, file_info))
# Normalize return values to `Codes` or `"NOT_SPAM"`. # Normalize return values to `Codes` or `"NOT_SPAM"`.
if res is False or res is self.NOT_SPAM: if res is False or res is self.NOT_SPAM:
@ -1027,7 +1080,11 @@ class SpamCheckerModuleApiCallbacks:
""" """
for callback in self._check_login_for_spam_callbacks: for callback in self._check_login_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res = await delay_cancellation( res = await delay_cancellation(
callback( callback(
user_id, user_id,

View File

@ -129,7 +129,8 @@ class BulkPushRuleEvaluator:
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
self.hs = hs self.hs = hs
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self.clock = hs.get_clock() self.server_name = hs.hostname # nb must be called this for @measure_func
self.clock = hs.get_clock() # nb must be called this for @measure_func
self._event_auth_handler = hs.get_event_auth_handler() self._event_auth_handler = hs.get_event_auth_handler()
self.should_calculate_push_rules = self.hs.config.push.enable_push self.should_calculate_push_rules = self.hs.config.push.enable_push

View File

@ -76,6 +76,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
super().__init__(hs) super().__init__(hs)
self.server_name = hs.hostname
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers() self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock() self.clock = hs.get_clock()
@ -122,7 +123,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override] async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict self, request: Request, content: JsonDict
) -> Tuple[int, JsonDict]: ) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_fed_send_events_parse"): with Measure(
self.clock, name="repl_fed_send_events_parse", server_name=self.server_name
):
room_id = content["room_id"] room_id = content["room_id"]
backfilled = content["backfilled"] backfilled = content["backfilled"]

View File

@ -76,6 +76,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
super().__init__(hs) super().__init__(hs)
self.server_name = hs.hostname
self.event_creation_handler = hs.get_event_creation_handler() self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers() self._storage_controllers = hs.get_storage_controllers()
@ -121,7 +122,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override] async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, event_id: str self, request: Request, content: JsonDict, event_id: str
) -> Tuple[int, JsonDict]: ) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_send_event_parse"): with Measure(
self.clock, name="repl_send_event_parse", server_name=self.server_name
):
event_dict = content["event"] event_dict = content["event"]
room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]] room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]]
internal_metadata = content["internal_metadata"] internal_metadata = content["internal_metadata"]

View File

@ -77,6 +77,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
super().__init__(hs) super().__init__(hs)
self.server_name = hs.hostname
self.event_creation_handler = hs.get_event_creation_handler() self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers() self._storage_controllers = hs.get_storage_controllers()
@ -122,7 +123,9 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override] async def _handle_request( # type: ignore[override]
self, request: Request, payload: JsonDict self, request: Request, payload: JsonDict
) -> Tuple[int, JsonDict]: ) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_send_events_parse"): with Measure(
self.clock, name="repl_send_events_parse", server_name=self.server_name
):
events_and_context = [] events_and_context = []
events = payload["events"] events = payload["events"]
rooms = set() rooms = set()

View File

@ -75,6 +75,7 @@ class ReplicationDataHandler:
""" """
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self._reactor = hs.get_reactor() self._reactor = hs.get_reactor()
@ -342,7 +343,11 @@ class ReplicationDataHandler:
waiting_list.add((position, deferred)) waiting_list.add((position, deferred))
# We measure here to get in flight counts and average waiting time. # We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"): with Measure(
self._clock,
name="repl.wait_for_stream_position",
server_name=self.server_name,
):
logger.info( logger.info(
"Waiting for repl stream %r to reach %s (%s); currently at: %s", "Waiting for repl stream %r to reach %s (%s); currently at: %s",
stream_name, stream_name,

View File

@ -78,6 +78,7 @@ class ReplicationStreamer:
""" """
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
@ -155,7 +156,11 @@ class ReplicationStreamer:
while self.pending_updates: while self.pending_updates:
self.pending_updates = False self.pending_updates = False
with Measure(self.clock, "repl.stream.get_updates"): with Measure(
self.clock,
name="repl.stream.get_updates",
server_name=self.server_name,
):
all_streams = self.streams all_streams = self.streams
if self._replication_torture_level is not None: if self._replication_torture_level is not None:

View File

@ -189,7 +189,8 @@ class StateHandler:
""" """
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock() self.server_name = hs.hostname # nb must be called this for @measure_func
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self._state_storage_controller = hs.get_storage_controllers().state self._state_storage_controller = hs.get_storage_controllers().state
self.hs = hs self.hs = hs
@ -631,6 +632,7 @@ class StateResolutionHandler:
""" """
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.resolve_linearizer = Linearizer(name="state_resolve_lock") self.resolve_linearizer = Linearizer(name="state_resolve_lock")
@ -747,7 +749,9 @@ class StateResolutionHandler:
# which will be used as a cache key for future resolutions, but # which will be used as a cache key for future resolutions, but
# not get persisted. # not get persisted.
with Measure(self.clock, "state.create_group_ids"): with Measure(
self.clock, name="state.create_group_ids", server_name=self.server_name
):
cache = _make_state_cache_entry(new_state, state_groups_ids) cache = _make_state_cache_entry(new_state, state_groups_ids)
self._state_cache[group_names] = cache self._state_cache[group_names] = cache
@ -785,7 +789,9 @@ class StateResolutionHandler:
a map from (type, state_key) to event_id. a map from (type, state_key) to event_id.
""" """
try: try:
with Measure(self.clock, "state._resolve_events") as m: with Measure(
self.clock, name="state._resolve_events", server_name=self.server_name
) as m:
room_version_obj = KNOWN_ROOM_VERSIONS[room_version] room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
if room_version_obj.state_res == StateResolutionVersions.V1: if room_version_obj.state_res == StateResolutionVersions.V1:
return await v1.resolve_events_with_store( return await v1.resolve_events_with_store(

View File

@ -55,6 +55,7 @@ class SQLBaseStore(metaclass=ABCMeta):
hs: "HomeServer", hs: "HomeServer",
): ):
self.hs = hs self.hs = hs
self.server_name = hs.hostname
self._clock = hs.get_clock() self._clock = hs.get_clock()
self.database_engine = database.engine self.database_engine = database.engine
self.db_pool = database self.db_pool = database

View File

@ -337,6 +337,7 @@ class EventsPersistenceStorageController:
assert stores.persist_events assert stores.persist_events
self.persist_events_store = stores.persist_events self.persist_events_store = stores.persist_events
self.server_name = hs.hostname
self._clock = hs.get_clock() self._clock = hs.get_clock()
self._instance_name = hs.get_instance_name() self._instance_name = hs.get_instance_name()
self.is_mine_id = hs.is_mine_id self.is_mine_id = hs.is_mine_id
@ -616,7 +617,11 @@ class EventsPersistenceStorageController:
state_delta_for_room = None state_delta_for_room = None
if not backfilled: if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"): with Measure(
self._clock,
name="_calculate_state_and_extrem",
server_name=self.server_name,
):
# Work out the new "current state" for the room. # Work out the new "current state" for the room.
# We do this by working out what the new extremities are and then # We do this by working out what the new extremities are and then
# calculating the state from that. # calculating the state from that.
@ -627,7 +632,11 @@ class EventsPersistenceStorageController:
room_id, chunk room_id, chunk
) )
with Measure(self._clock, "calculate_chain_cover_index_for_events"): with Measure(
self._clock,
name="calculate_chain_cover_index_for_events",
server_name=self.server_name,
):
# We now calculate chain ID/sequence numbers for any state events we're # We now calculate chain ID/sequence numbers for any state events we're
# persisting. We ignore out of band memberships as we're not in the room # persisting. We ignore out of band memberships as we're not in the room
# and won't have their auth chain (we'll fix it up later if we join the # and won't have their auth chain (we'll fix it up later if we join the
@ -719,7 +728,11 @@ class EventsPersistenceStorageController:
break break
logger.debug("Calculating state delta for room %s", room_id) logger.debug("Calculating state delta for room %s", room_id)
with Measure(self._clock, "persist_events.get_new_state_after_events"): with Measure(
self._clock,
name="persist_events.get_new_state_after_events",
server_name=self.server_name,
):
res = await self._get_new_state_after_events( res = await self._get_new_state_after_events(
room_id, room_id,
ev_ctx_rm, ev_ctx_rm,
@ -746,7 +759,11 @@ class EventsPersistenceStorageController:
# removed keys entirely. # removed keys entirely.
delta = DeltaState([], delta_ids) delta = DeltaState([], delta_ids)
elif current_state is not None: elif current_state is not None:
with Measure(self._clock, "persist_events.calculate_state_delta"): with Measure(
self._clock,
name="persist_events.calculate_state_delta",
server_name=self.server_name,
):
delta = await self._calculate_state_delta(room_id, current_state) delta = await self._calculate_state_delta(room_id, current_state)
if delta: if delta:

View File

@ -68,6 +68,7 @@ class StateStorageController:
""" """
def __init__(self, hs: "HomeServer", stores: "Databases"): def __init__(self, hs: "HomeServer", stores: "Databases"):
self.server_name = hs.hostname
self._is_mine_id = hs.is_mine_id self._is_mine_id = hs.is_mine_id
self._clock = hs.get_clock() self._clock = hs.get_clock()
self.stores = stores self.stores = stores
@ -812,7 +813,9 @@ class StateStorageController:
state_group = object() state_group = object()
assert state_group is not None assert state_group is not None
with Measure(self._clock, "get_joined_hosts"): with Measure(
self._clock, name="get_joined_hosts", server_name=self.server_name
):
return await self._get_joined_hosts( return await self._get_joined_hosts(
room_id, state_group, state_entry=state_entry room_id, state_group, state_entry=state_entry
) )

View File

@ -1246,7 +1246,9 @@ class EventsWorkerStore(SQLBaseStore):
to event row. Note that it may well contain additional events that to event row. Note that it may well contain additional events that
were not part of this request. were not part of this request.
""" """
with Measure(self._clock, "_fetch_event_list"): with Measure(
self._clock, name="_fetch_event_list", server_name=self.server_name
):
try: try:
events_to_fetch = { events_to_fetch = {
event_id for events, _ in event_list for event_id in events event_id for events, _ in event_list for event_id in events

View File

@ -983,7 +983,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
`_get_user_ids_from_membership_event_ids` for any uncached events. `_get_user_ids_from_membership_event_ids` for any uncached events.
""" """
with Measure(self._clock, "get_joined_user_ids_from_state"): with Measure(
self._clock,
name="get_joined_user_ids_from_state",
server_name=self.server_name,
):
users_in_room = set() users_in_room = set()
member_event_ids = [ member_event_ids = [
e_id for key, e_id in state.items() if key[0] == EventTypes.Member e_id for key, e_id in state.items() if key[0] == EventTypes.Member

View File

@ -41,49 +41,83 @@ from synapse.logging.context import (
LoggingContext, LoggingContext,
current_context, current_context,
) )
from synapse.metrics import InFlightGauge from synapse.metrics import INSTANCE_LABEL_NAME, InFlightGauge
from synapse.util import Clock from synapse.util import Clock
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
block_counter = Counter("synapse_util_metrics_block_count", "", ["block_name"]) # Metrics to see the number of and how much time is spend in various blocks of code.
#
block_counter = Counter(
"synapse_util_metrics_block_count",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
)
"""The number of times this block has been called."""
block_timer = Counter("synapse_util_metrics_block_time_seconds", "", ["block_name"]) block_timer = Counter(
"synapse_util_metrics_block_time_seconds",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
)
"""The cumulative time spent executing this block across all calls, in seconds."""
block_ru_utime = Counter( block_ru_utime = Counter(
"synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"] "synapse_util_metrics_block_ru_utime_seconds",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
) )
"""Resource usage: user CPU time in seconds used in this block"""
block_ru_stime = Counter( block_ru_stime = Counter(
"synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"] "synapse_util_metrics_block_ru_stime_seconds",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
) )
"""Resource usage: system CPU time in seconds used in this block"""
block_db_txn_count = Counter( block_db_txn_count = Counter(
"synapse_util_metrics_block_db_txn_count", "", ["block_name"] "synapse_util_metrics_block_db_txn_count",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
) )
"""Number of database transactions completed in this block"""
# seconds spent waiting for db txns, excluding scheduling time, in this block # seconds spent waiting for db txns, excluding scheduling time, in this block
block_db_txn_duration = Counter( block_db_txn_duration = Counter(
"synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"] "synapse_util_metrics_block_db_txn_duration_seconds",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
) )
"""Seconds spent waiting for database txns, excluding scheduling time, in this block"""
# seconds spent waiting for a db connection, in this block # seconds spent waiting for a db connection, in this block
block_db_sched_duration = Counter( block_db_sched_duration = Counter(
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"] "synapse_util_metrics_block_db_sched_duration_seconds",
"",
labelnames=["block_name", INSTANCE_LABEL_NAME],
) )
"""Seconds spent waiting for a db connection, in this block"""
# This is dynamically created in InFlightGauge.__init__. # This is dynamically created in InFlightGauge.__init__.
class _InFlightMetric(Protocol): class _BlockInFlightMetric(Protocol):
"""
Sub-metrics used for the `InFlightGauge` for blocks.
"""
real_time_max: float real_time_max: float
"""The longest observed duration of any single execution of this block, in seconds."""
real_time_sum: float real_time_sum: float
"""The cumulative time spent executing this block across all calls, in seconds."""
# Tracks the number of blocks currently active # Tracks the number of blocks currently active
in_flight: InFlightGauge[_InFlightMetric] = InFlightGauge( in_flight: InFlightGauge[_BlockInFlightMetric] = InFlightGauge(
"synapse_util_metrics_block_in_flight", "synapse_util_metrics_block_in_flight",
"", "",
labels=["block_name"], labels=["block_name", INSTANCE_LABEL_NAME],
# Matches the fields in the `_BlockInFlightMetric`
sub_metrics=["real_time_max", "real_time_sum"], sub_metrics=["real_time_max", "real_time_sum"],
) )
@ -92,8 +126,16 @@ P = ParamSpec("P")
R = TypeVar("R") R = TypeVar("R")
class HasClock(Protocol): class HasClockAndServerName(Protocol):
clock: Clock clock: Clock
"""
Used to measure functions
"""
server_name: str
"""
The homeserver name that this Measure is associated with (used to label the metric)
(`hs.hostname`).
"""
def measure_func( def measure_func(
@ -101,8 +143,9 @@ def measure_func(
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: ) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
"""Decorate an async method with a `Measure` context manager. """Decorate an async method with a `Measure` context manager.
The Measure is created using `self.clock`; it should only be used to decorate The Measure is created using `self.clock` and `self.server_name; it should only be
methods in classes defining an instance-level `clock` attribute. used to decorate methods in classes defining an instance-level `clock` and
`server_name` attributes.
Usage: Usage:
@ -116,16 +159,21 @@ def measure_func(
with Measure(...): with Measure(...):
... ...
Args:
name: The name of the metric to report (the block name) (used to label the
metric). Defaults to the name of the decorated function.
""" """
def wrapper( def wrapper(
func: Callable[Concatenate[HasClock, P], Awaitable[R]], func: Callable[Concatenate[HasClockAndServerName, P], Awaitable[R]],
) -> Callable[P, Awaitable[R]]: ) -> Callable[P, Awaitable[R]]:
block_name = func.__name__ if name is None else name block_name = func.__name__ if name is None else name
@wraps(func) @wraps(func)
async def measured_func(self: HasClock, *args: P.args, **kwargs: P.kwargs) -> R: async def measured_func(
with Measure(self.clock, block_name): self: HasClockAndServerName, *args: P.args, **kwargs: P.kwargs
) -> R:
with Measure(self.clock, name=block_name, server_name=self.server_name):
r = await func(self, *args, **kwargs) r = await func(self, *args, **kwargs)
return r return r
@ -142,19 +190,24 @@ class Measure:
__slots__ = [ __slots__ = [
"clock", "clock",
"name", "name",
"server_name",
"_logging_context", "_logging_context",
"start", "start",
] ]
def __init__(self, clock: Clock, name: str) -> None: def __init__(self, clock: Clock, *, name: str, server_name: str) -> None:
""" """
Args: Args:
clock: An object with a "time()" method, which returns the current clock: An object with a "time()" method, which returns the current
time in seconds. time in seconds.
name: The name of the metric to report. name: The name of the metric to report (the block name) (used to label the
metric).
server_name: The homeserver name that this Measure is associated with (used to
label the metric) (`hs.hostname`).
""" """
self.clock = clock self.clock = clock
self.name = name self.name = name
self.server_name = server_name
curr_context = current_context() curr_context = current_context()
if not curr_context: if not curr_context:
logger.warning( logger.warning(
@ -174,7 +227,7 @@ class Measure:
self.start = self.clock.time() self.start = self.clock.time()
self._logging_context.__enter__() self._logging_context.__enter__()
in_flight.register((self.name,), self._update_in_flight) in_flight.register((self.name, self.server_name), self._update_in_flight)
logger.debug("Entering block %s", self.name) logger.debug("Entering block %s", self.name)
@ -194,19 +247,20 @@ class Measure:
duration = self.clock.time() - self.start duration = self.clock.time() - self.start
usage = self.get_resource_usage() usage = self.get_resource_usage()
in_flight.unregister((self.name,), self._update_in_flight) in_flight.unregister((self.name, self.server_name), self._update_in_flight)
self._logging_context.__exit__(exc_type, exc_val, exc_tb) self._logging_context.__exit__(exc_type, exc_val, exc_tb)
try: try:
block_counter.labels(self.name).inc() labels = {"block_name": self.name, INSTANCE_LABEL_NAME: self.server_name}
block_timer.labels(self.name).inc(duration) block_counter.labels(**labels).inc()
block_ru_utime.labels(self.name).inc(usage.ru_utime) block_timer.labels(**labels).inc(duration)
block_ru_stime.labels(self.name).inc(usage.ru_stime) block_ru_utime.labels(**labels).inc(usage.ru_utime)
block_db_txn_count.labels(self.name).inc(usage.db_txn_count) block_ru_stime.labels(**labels).inc(usage.ru_stime)
block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec) block_db_txn_count.labels(**labels).inc(usage.db_txn_count)
block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec) block_db_txn_duration.labels(**labels).inc(usage.db_txn_duration_sec)
except ValueError: block_db_sched_duration.labels(**labels).inc(usage.db_sched_duration_sec)
logger.warning("Failed to save metrics! Usage: %s", usage) except ValueError as exc:
logger.warning("Failed to save metrics! Usage: %s Error: %s", usage, exc)
def get_resource_usage(self) -> ContextResourceUsage: def get_resource_usage(self) -> ContextResourceUsage:
"""Get the resources used within this Measure block """Get the resources used within this Measure block
@ -215,7 +269,7 @@ class Measure:
""" """
return self._logging_context.get_resource_usage() return self._logging_context.get_resource_usage()
def _update_in_flight(self, metrics: _InFlightMetric) -> None: def _update_in_flight(self, metrics: _BlockInFlightMetric) -> None:
"""Gets called when processing in flight metrics""" """Gets called when processing in flight metrics"""
assert self.start is not None assert self.start is not None
duration = self.clock.time() - self.start duration = self.clock.time() - self.start

View File

@ -86,6 +86,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
self.mock_federation_client = AsyncMock(spec=["put_json"]) self.mock_federation_client = AsyncMock(spec=["put_json"])
self.mock_federation_client.put_json.return_value = (200, "OK") self.mock_federation_client.put_json.return_value = (200, "OK")
self.mock_federation_client.agent = MatrixFederationAgent( self.mock_federation_client.agent = MatrixFederationAgent(
"OUR_STUB_HOMESERVER_NAME",
reactor, reactor,
tls_client_options_factory=None, tls_client_options_factory=None,
user_agent=b"SynapseInTrialTest/0.0.0", user_agent=b"SynapseInTrialTest/0.0.0",

View File

@ -91,6 +91,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
"test_cache", timer=self.reactor.seconds "test_cache", timer=self.reactor.seconds
) )
self.well_known_resolver = WellKnownResolver( self.well_known_resolver = WellKnownResolver(
"OUR_STUB_HOMESERVER_NAME",
self.reactor, self.reactor,
Agent(self.reactor, contextFactory=self.tls_factory), Agent(self.reactor, contextFactory=self.tls_factory),
b"test-agent", b"test-agent",
@ -269,6 +270,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
because it is created too early during setUp because it is created too early during setUp
""" """
return MatrixFederationAgent( return MatrixFederationAgent(
"OUR_STUB_HOMESERVER_NAME",
reactor=cast(ISynapseReactor, self.reactor), reactor=cast(ISynapseReactor, self.reactor),
tls_client_options_factory=self.tls_factory, tls_client_options_factory=self.tls_factory,
user_agent=b"test-agent", # Note that this is unused since _well_known_resolver is provided. user_agent=b"test-agent", # Note that this is unused since _well_known_resolver is provided.
@ -1011,6 +1013,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
# Build a new agent and WellKnownResolver with a different tls factory # Build a new agent and WellKnownResolver with a different tls factory
tls_factory = FederationPolicyForHTTPS(config) tls_factory = FederationPolicyForHTTPS(config)
agent = MatrixFederationAgent( agent = MatrixFederationAgent(
"OUR_STUB_HOMESERVER_NAME",
reactor=self.reactor, reactor=self.reactor,
tls_client_options_factory=tls_factory, tls_client_options_factory=tls_factory,
user_agent=b"test-agent", # This is unused since _well_known_resolver is passed below. user_agent=b"test-agent", # This is unused since _well_known_resolver is passed below.
@ -1018,6 +1021,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
ip_blocklist=IPSet(), ip_blocklist=IPSet(),
_srv_resolver=self.mock_resolver, _srv_resolver=self.mock_resolver,
_well_known_resolver=WellKnownResolver( _well_known_resolver=WellKnownResolver(
"OUR_STUB_HOMESERVER_NAME",
cast(ISynapseReactor, self.reactor), cast(ISynapseReactor, self.reactor),
Agent(self.reactor, contextFactory=tls_factory), Agent(self.reactor, contextFactory=tls_factory),
b"test-agent", b"test-agent",

View File

@ -68,6 +68,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
reactor, _ = get_clock() reactor, _ = get_clock()
self.matrix_federation_agent = MatrixFederationAgent( self.matrix_federation_agent = MatrixFederationAgent(
"OUR_STUB_HOMESERVER_NAME",
reactor, reactor,
tls_client_options_factory=None, tls_client_options_factory=None,
user_agent=b"SynapseInTrialTest/0.0.0", user_agent=b"SynapseInTrialTest/0.0.0",