Mirror of https://github.com/element-hq/synapse.git (synced 2025-07-05 00:00:37 -04:00)

Compare commits: 12 commits, 67ec14afc9 ... 30b91fdd86
30b91fdd86
e0f8992ee3
06f9af155b
5ad555cefc
652c34bda6
521c68cafe
c232ec7b3b
c7d15dbcc7
6731c4bbf0
d05b6ca4c1
65035b6098
02a7668bb2
changelog.d/18601.misc (new file, 1 line)
@@ -0,0 +1 @@
+Refactor `Measure` block metrics to be homeserver-scoped.
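To make the shape of the change concrete before the per-file hunks, here is a minimal sketch of the call-site migration (`handle_room_events` is one of the block names appearing in the diff below):

    # Old call shape: a block was measured by name only.
    with Measure(self.clock, "handle_room_events"):
        ...

    # New call shape: `name` and `server_name` are keyword-only, so block
    # metrics carry the homeserver's name as an extra label.
    with Measure(
        self.clock, name="handle_room_events", server_name=self.server_name
    ):
        ...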
@@ -156,7 +156,9 @@ class FederationRemoteSendQueue(AbstractFederationSender):

     def _clear_queue_before_pos(self, position_to_delete: int) -> None:
         """Clear all the queues from before a given position"""
-        with Measure(self.clock, "send_queue._clear"):
+        with Measure(
+            self.clock, name="send_queue._clear", server_name=self.server_name
+        ):
             # Delete things out of presence maps
             keys = self.presence_destinations.keys()
             i = self.presence_destinations.bisect_left(position_to_delete)
@@ -657,7 +657,11 @@ class FederationSender(AbstractFederationSender):
         logger.debug(
             "Handling %i events in room %s", len(events), events[0].room_id
         )
-        with Measure(self.clock, "handle_room_events"):
+        with Measure(
+            self.clock,
+            name="handle_room_events",
+            server_name=self.server_name,
+        ):
             for event in events:
                 await handle_event(event)

@@ -58,7 +58,7 @@ class TransactionManager:
     """

     def __init__(self, hs: "synapse.server.HomeServer"):
-        self._server_name = hs.hostname
+        self.server_name = hs.hostname  # nb must be called this for @measure_func
         self.clock = hs.get_clock()  # nb must be called this for @measure_func
         self._store = hs.get_datastores().main
         self._transaction_actions = TransactionActions(self._store)
@@ -116,7 +116,7 @@ class TransactionManager:
         transaction = Transaction(
             origin_server_ts=int(self.clock.time_msec()),
             transaction_id=txn_id,
-            origin=self._server_name,
+            origin=self.server_name,
             destination=destination,
             pdus=[p.get_pdu_json() for p in pdus],
             edus=[edu.get_dict() for edu in edus],
@@ -73,6 +73,7 @@ events_processed_counter = Counter("synapse_handlers_appservice_events_processed

 class ApplicationServicesHandler:
     def __init__(self, hs: "HomeServer"):
+        self.server_name = hs.hostname
         self.store = hs.get_datastores().main
         self.is_mine_id = hs.is_mine_id
         self.appservice_api = hs.get_application_service_api()
@@ -120,7 +121,9 @@ class ApplicationServicesHandler:

     @wrap_as_background_process("notify_interested_services")
     async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
-        with Measure(self.clock, "notify_interested_services"):
+        with Measure(
+            self.clock, name="notify_interested_services", server_name=self.server_name
+        ):
             self.is_processing = True
             try:
                 upper_bound = -1
@@ -329,7 +332,11 @@ class ApplicationServicesHandler:
         users: Collection[Union[str, UserID]],
     ) -> None:
         logger.debug("Checking interested services for %s", stream_key)
-        with Measure(self.clock, "notify_interested_services_ephemeral"):
+        with Measure(
+            self.clock,
+            name="notify_interested_services_ephemeral",
+            server_name=self.server_name,
+        ):
             for service in services:
                 if stream_key == StreamKeyType.TYPING:
                     # Note that we don't persist the token (via set_appservice_stream_type_pos)
@@ -54,6 +54,7 @@ logger = logging.getLogger(__name__)

 class DelayedEventsHandler:
     def __init__(self, hs: "HomeServer"):
+        self.server_name = hs.hostname
         self._store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self._config = hs.config
@@ -159,7 +160,9 @@ class DelayedEventsHandler:

         # Loop round handling deltas until we're up to date
         while True:
-            with Measure(self._clock, "delayed_events_delta"):
+            with Measure(
+                self._clock, name="delayed_events_delta", server_name=self.server_name
+            ):
                 room_max_stream_ordering = self._store.get_room_max_stream_ordering()
                 if self._event_pos == room_max_stream_ordering:
                     return
@@ -526,6 +526,8 @@ class DeviceHandler(DeviceWorkerHandler):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)

+        self.server_name = hs.hostname  # nb must be called this for @measure_func
+        self.clock = hs.get_clock()  # nb must be called this for @measure_func
         self.federation_sender = hs.get_federation_sender()
         self._account_data_handler = hs.get_account_data_handler()
         self._storage_controllers = hs.get_storage_controllers()
@@ -1215,7 +1217,8 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
     def __init__(self, hs: "HomeServer", device_handler: DeviceHandler):
         self.store = hs.get_datastores().main
         self.federation = hs.get_federation_client()
-        self.clock = hs.get_clock()
+        self.server_name = hs.hostname  # nb must be called this for @measure_func
+        self.clock = hs.get_clock()  # nb must be called this for @measure_func
         self.device_handler = device_handler
         self._notifier = hs.get_notifier()

@@ -476,16 +476,16 @@ _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
 class EventCreationHandler:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
+        self.validator = EventValidator()
+        self.event_builder_factory = hs.get_event_builder_factory()
+        self.server_name = hs.hostname  # nb must be called this for @measure_func
+        self.clock = hs.get_clock()  # nb must be called this for @measure_func
         self.auth_blocking = hs.get_auth_blocking()
         self._event_auth_handler = hs.get_event_auth_handler()
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self.state = hs.get_state_handler()
-        self.clock = hs.get_clock()
-        self.validator = EventValidator()
         self.profile_handler = hs.get_profile_handler()
-        self.event_builder_factory = hs.get_event_builder_factory()
-        self.server_name = hs.hostname
         self.notifier = hs.get_notifier()
         self.config = hs.config
         self.require_membership_for_aliases = (
@@ -747,6 +747,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
 class PresenceHandler(BasePresenceHandler):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
+        self.server_name = hs.hostname
         self.wheel_timer: WheelTimer[str] = WheelTimer()
         self.notifier = hs.get_notifier()

@@ -941,7 +942,9 @@ class PresenceHandler(BasePresenceHandler):

         now = self.clock.time_msec()

-        with Measure(self.clock, "presence_update_states"):
+        with Measure(
+            self.clock, name="presence_update_states", server_name=self.server_name
+        ):
             # NOTE: We purposefully don't await between now and when we've
             # calculated what we want to do with the new states, to avoid races.

@@ -1497,7 +1500,9 @@ class PresenceHandler(BasePresenceHandler):
     async def _unsafe_process(self) -> None:
         # Loop round handling deltas until we're up to date
         while True:
-            with Measure(self.clock, "presence_delta"):
+            with Measure(
+                self.clock, name="presence_delta", server_name=self.server_name
+            ):
                 room_max_stream_ordering = self.store.get_room_max_stream_ordering()
                 if self._event_pos == room_max_stream_ordering:
                     return
@@ -1759,6 +1764,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
         # Same with get_presence_router:
         #
         # AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
+        self.server_name = hs.hostname
         self.get_presence_handler = hs.get_presence_handler
         self.get_presence_router = hs.get_presence_router
         self.clock = hs.get_clock()
@@ -1792,7 +1798,9 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
         user_id = user.to_string()
         stream_change_cache = self.store.presence_stream_cache

-        with Measure(self.clock, "presence.get_new_events"):
+        with Measure(
+            self.clock, name="presence.get_new_events", server_name=self.server_name
+        ):
             if from_key is not None:
                 from_key = int(from_key)

@@ -329,6 +329,7 @@ class E2eeSyncResult:

 class SyncHandler:
     def __init__(self, hs: "HomeServer"):
+        self.server_name = hs.hostname
         self.hs_config = hs.config
         self.store = hs.get_datastores().main
         self.notifier = hs.get_notifier()
@@ -710,7 +711,9 @@ class SyncHandler:

         sync_config = sync_result_builder.sync_config

-        with Measure(self.clock, "ephemeral_by_room"):
+        with Measure(
+            self.clock, name="ephemeral_by_room", server_name=self.server_name
+        ):
             typing_key = since_token.typing_key if since_token else 0

             room_ids = sync_result_builder.joined_room_ids
@@ -783,7 +786,9 @@ class SyncHandler:
            and current token to send down to clients.
            newly_joined_room
        """
-        with Measure(self.clock, "load_filtered_recents"):
+        with Measure(
+            self.clock, name="load_filtered_recents", server_name=self.server_name
+        ):
             timeline_limit = sync_config.filter_collection.timeline_limit()
             block_all_timeline = (
                 sync_config.filter_collection.blocks_all_room_timeline()
@@ -1174,7 +1179,9 @@ class SyncHandler:
         # updates even if they occurred logically before the previous event.
         # TODO(mjark) Check for new redactions in the state events.

-        with Measure(self.clock, "compute_state_delta"):
+        with Measure(
+            self.clock, name="compute_state_delta", server_name=self.server_name
+        ):
             # The memberships needed for events in the timeline.
             # Only calculated when `lazy_load_members` is on.
             members_to_fetch: Optional[Set[str]] = None
@@ -1791,7 +1798,9 @@ class SyncHandler:
             # the DB.
             return RoomNotifCounts.empty()

-        with Measure(self.clock, "unread_notifs_for_room_id"):
+        with Measure(
+            self.clock, name="unread_notifs_for_room_id", server_name=self.server_name
+        ):
             return await self.store.get_unread_event_push_actions_by_room_for_user(
                 room_id,
                 sync_config.user.to_string(),
@@ -503,6 +503,7 @@ class TypingWriterHandler(FollowerTypingHandler):

 class TypingNotificationEventSource(EventSource[int, JsonMapping]):
     def __init__(self, hs: "HomeServer"):
+        self.server_name = hs.hostname
         self._main_store = hs.get_datastores().main
         self.clock = hs.get_clock()
         # We can't call get_typing_handler here because there's a cycle:
@@ -535,7 +536,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
            appservice may be interested in.
            * The latest known room serial.
         """
-        with Measure(self.clock, "typing.get_new_events_as"):
+        with Measure(
+            self.clock, name="typing.get_new_events_as", server_name=self.server_name
+        ):
             handler = self.get_typing_handler()

             events = []
@@ -571,7 +574,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
         Find typing notifications for given rooms (> `from_token` and <= `to_token`)
         """

-        with Measure(self.clock, "typing.get_new_events"):
+        with Measure(
+            self.clock, name="typing.get_new_events", server_name=self.server_name
+        ):
             from_key = int(from_key)
             handler = self.get_typing_handler()

@@ -237,7 +237,9 @@ class UserDirectoryHandler(StateDeltasHandler):

         # Loop round handling deltas until we're up to date
         while True:
-            with Measure(self.clock, "user_dir_delta"):
+            with Measure(
+                self.clock, name="user_dir_delta", server_name=self.server_name
+            ):
                 room_max_stream_ordering = self.store.get_room_max_stream_ordering()
                 if self.pos == room_max_stream_ordering:
                     return
@@ -92,6 +92,7 @@ class MatrixFederationAgent:

     def __init__(
         self,
+        server_name: str,
         reactor: ISynapseReactor,
         tls_client_options_factory: Optional[FederationPolicyForHTTPS],
         user_agent: bytes,
@@ -100,6 +101,11 @@ class MatrixFederationAgent:
         _srv_resolver: Optional[SrvResolver] = None,
         _well_known_resolver: Optional[WellKnownResolver] = None,
     ):
+        """
+        Args:
+            server_name: Our homeserver name (used to label metrics) (`hs.hostname`).
+        """
+
         # proxy_reactor is not blocklisting reactor
         proxy_reactor = reactor

@@ -127,6 +133,7 @@ class MatrixFederationAgent:

         if _well_known_resolver is None:
             _well_known_resolver = WellKnownResolver(
+                server_name,
                 reactor,
                 agent=BlocklistingAgentWrapper(
                     ProxyAgent(
@@ -91,12 +91,19 @@ class WellKnownResolver:

     def __init__(
         self,
+        server_name: str,
         reactor: IReactorTime,
         agent: IAgent,
         user_agent: bytes,
         well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None,
         had_well_known_cache: Optional[TTLCache[bytes, bool]] = None,
     ):
+        """
+        Args:
+            server_name: Our homeserver name (used to label metrics) (`hs.hostname`).
+        """
+
+        self.server_name = server_name
         self._reactor = reactor
         self._clock = Clock(reactor)

@@ -134,7 +141,13 @@ class WellKnownResolver:
         # TODO: should we linearise so that we don't end up doing two .well-known
         # requests for the same server in parallel?
         try:
-            with Measure(self._clock, "get_well_known"):
+            with Measure(
+                self._clock,
+                name="get_well_known",
+                # This should be our homeserver where the code is running (used to
+                # label metrics)
+                server_name=self.server_name,
+            ):
                 result: Optional[bytes]
                 cache_period: float

|
@ -417,6 +417,7 @@ class MatrixFederationHttpClient:
|
||||
if hs.get_instance_name() in outbound_federation_restricted_to:
|
||||
# Talk to federation directly
|
||||
federation_agent: IAgent = MatrixFederationAgent(
|
||||
self.server_name,
|
||||
self.reactor,
|
||||
tls_client_options_factory,
|
||||
user_agent.encode("ascii"),
|
||||
@@ -697,7 +698,11 @@ class MatrixFederationHttpClient:
         outgoing_requests_counter.labels(request.method).inc()

         try:
-            with Measure(self.clock, "outbound_request"):
+            with Measure(
+                self.clock,
+                name="outbound_request",
+                server_name=self.server_name,
+            ):
                 # we don't want all the fancy cookie and redirect handling
                 # that treq.request gives: just use the raw Agent.

@@ -66,6 +66,19 @@ all_gauges: Dict[str, Collector] = {}

 HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

+INSTANCE_LABEL_NAME = "instance"
+"""
+The standard Prometheus label name used to identify which server instance the metrics
+came from.
+
+In the case of a Synapse homeserver, this should be set to the homeserver name
+(`hs.hostname`).
+
+Normally, this would be set automatically by the Prometheus server scraping the data but
+since we support multiple instances of Synapse running in the same process and all
+metrics are in a single global `REGISTRY`, we need to manually label any metrics.
+"""
+

 class _RegistryProxy:
     @staticmethod
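As a hedged illustration of how this constant is meant to be used (the metric name below is hypothetical, not part of this change):

    from prometheus_client import Counter
    from synapse.metrics import INSTANCE_LABEL_NAME

    example_counter = Counter(
        "synapse_example_total",  # hypothetical metric, for illustration only
        "",
        labelnames=["block_name", INSTANCE_LABEL_NAME],
    )
    # Each homeserver labels its own series manually, since all instances in
    # the process share one global REGISTRY:
    example_counter.labels(block_name="my_block", instance="hs1.example.com").inc()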
@@ -192,7 +205,16 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
         same key.

         Note that `callback` may be called on a separate thread.
+
+        Args:
+            key: A tuple of label values, which must match the order of the
+                `labels` given to the constructor.
+            callback
         """
+        assert len(key) == len(self.labels), (
+            f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}"
+        )
+
         with self._lock:
             self._registrations.setdefault(key, set()).add(callback)

@@ -201,7 +223,17 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
         key: Tuple[str, ...],
         callback: Callable[[MetricsEntry], None],
     ) -> None:
-        """Registers that we've exited a block with labels `key`."""
+        """
+        Registers that we've exited a block with labels `key`.
+
+        Args:
+            key: A tuple of label values, which must match the order of the
+                `labels` given to the constructor.
+            callback
+        """
+        assert len(key) == len(self.labels), (
+            f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}"
+        )
         with self._lock:
             self._registrations.setdefault(key, set()).discard(callback)
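A small sketch of the invariant those new assertions enforce: the `key` tuple passed to `register`/`unregister` must line up positionally with the constructor's `labels` (gauge name and values here are hypothetical):

    def callback(metrics) -> None:  # hypothetical sub-metric updater
        ...

    gauge = InFlightGauge(
        "synapse_example_in_flight",  # hypothetical gauge
        "",
        labels=["block_name", "instance"],
        sub_metrics=["real_time_max", "real_time_sum"],
    )
    gauge.register(("my_block", "hs1.example.com"), callback)  # matches labels order
    # gauge.register(("my_block",), callback)  # would now trip the length assertion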
@@ -225,7 +257,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
             with self._lock:
                 callbacks = set(self._registrations[key])

-            in_flight.add_metric(key, len(callbacks))
+            in_flight.add_metric(labels=key, value=len(callbacks))

             metrics = self._metrics_class()
             metrics_by_key[key] = metrics
@@ -239,7 +271,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
                 "_".join([self.name, name]), "", labels=self.labels
             )
             for key, metrics in metrics_by_key.items():
-                gauge.add_metric(key, getattr(metrics, name))
+                gauge.add_metric(labels=key, value=getattr(metrics, name))
             yield gauge

     def _register_with_collector(self) -> None:
|
@ -31,6 +31,7 @@ IS_USER_ALLOWED_TO_UPLOAD_MEDIA_OF_SIZE_CALLBACK = Callable[[str, int], Awaitabl
|
||||
|
||||
class MediaRepositoryModuleApiCallbacks:
|
||||
def __init__(self, hs: "HomeServer") -> None:
|
||||
self.server_name = hs.hostname
|
||||
self.clock = hs.get_clock()
|
||||
self._get_media_config_for_user_callbacks: List[
|
||||
GET_MEDIA_CONFIG_FOR_USER_CALLBACK
|
||||
@@ -57,7 +58,11 @@ class MediaRepositoryModuleApiCallbacks:

     async def get_media_config_for_user(self, user_id: str) -> Optional[JsonDict]:
         for callback in self._get_media_config_for_user_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 res: Optional[JsonDict] = await delay_cancellation(callback(user_id))
                 if res:
                     return res
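For reference, the `name` each module callback is measured under is derived from the callback itself; a minimal sketch with a hypothetical callback:

    async def get_media_config(user_id: str) -> None:  # hypothetical module callback
        ...

    # f"{get_media_config.__module__}.{get_media_config.__qualname__}"
    # -> e.g. "my_module.get_media_config", now also labelled with this
    # homeserver's name via server_name=self.server_name.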
@@ -68,7 +73,11 @@ class MediaRepositoryModuleApiCallbacks:
         self, user_id: str, size: int
     ) -> bool:
         for callback in self._is_user_allowed_to_upload_media_of_size_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 res: bool = await delay_cancellation(callback(user_id, size))
                 if not res:
                     return res
@@ -43,6 +43,7 @@ GET_RATELIMIT_OVERRIDE_FOR_USER_CALLBACK = Callable[

 class RatelimitModuleApiCallbacks:
     def __init__(self, hs: "HomeServer") -> None:
+        self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self._get_ratelimit_override_for_user_callbacks: List[
             GET_RATELIMIT_OVERRIDE_FOR_USER_CALLBACK
@@ -64,7 +65,11 @@ class RatelimitModuleApiCallbacks:
         self, user_id: str, limiter_name: str
     ) -> Optional[RatelimitOverride]:
         for callback in self._get_ratelimit_override_for_user_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 res: Optional[RatelimitOverride] = await delay_cancellation(
                     callback(user_id, limiter_name)
                 )
@@ -356,6 +356,7 @@ class SpamCheckerModuleApiCallbacks:
     NOT_SPAM: Literal["NOT_SPAM"] = "NOT_SPAM"

     def __init__(self, hs: "synapse.server.HomeServer") -> None:
+        self.server_name = hs.hostname
         self.clock = hs.get_clock()

         self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = []
@@ -490,7 +491,11 @@ class SpamCheckerModuleApiCallbacks:
            generally discouraged as it doesn't support internationalization.
        """
        for callback in self._check_event_for_spam_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 res = await delay_cancellation(callback(event))
                 if res is False or res == self.NOT_SPAM:
                     # This spam-checker accepts the event.
@@ -543,7 +548,11 @@ class SpamCheckerModuleApiCallbacks:
             True if the event should be silently dropped
         """
         for callback in self._should_drop_federated_event_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 res: Union[bool, str] = await delay_cancellation(callback(event))
                 if res:
                     return res
@@ -565,7 +574,11 @@ class SpamCheckerModuleApiCallbacks:
             NOT_SPAM if the operation is permitted, [Codes, Dict] otherwise.
         """
         for callback in self._user_may_join_room_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 res = await delay_cancellation(callback(user_id, room_id, is_invited))
                 # Normalize return values to `Codes` or `"NOT_SPAM"`.
                 if res is True or res is self.NOT_SPAM:
@@ -604,7 +617,11 @@ class SpamCheckerModuleApiCallbacks:
             NOT_SPAM if the operation is permitted, Codes otherwise.
         """
         for callback in self._user_may_invite_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 res = await delay_cancellation(
                     callback(inviter_userid, invitee_userid, room_id)
                 )
@@ -643,7 +660,11 @@ class SpamCheckerModuleApiCallbacks:
             NOT_SPAM if the operation is permitted, Codes otherwise.
         """
         for callback in self._federated_user_may_invite_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 res = await delay_cancellation(callback(event))
                 # Normalize return values to `Codes` or `"NOT_SPAM"`.
                 if res is True or res is self.NOT_SPAM:
@@ -686,7 +707,11 @@ class SpamCheckerModuleApiCallbacks:
             NOT_SPAM if the operation is permitted, Codes otherwise.
         """
         for callback in self._user_may_send_3pid_invite_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 res = await delay_cancellation(
                     callback(inviter_userid, medium, address, room_id)
                 )
@@ -722,7 +747,11 @@ class SpamCheckerModuleApiCallbacks:
             room_config: The room creation configuration which is the body of the /createRoom request
         """
         for callback in self._user_may_create_room_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 checker_args = inspect.signature(callback)
                 # Also ensure backwards compatibility with spam checker callbacks
                 # that don't expect the room_config argument.
@@ -786,7 +815,11 @@ class SpamCheckerModuleApiCallbacks:
             content: The content of the state event
         """
         for callback in self._user_may_send_state_event_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 # We make a copy of the content to ensure that the spam checker cannot modify it.
                 res = await delay_cancellation(
                     callback(user_id, room_id, event_type, state_key, deepcopy(content))
@@ -814,7 +847,11 @@ class SpamCheckerModuleApiCallbacks:

         """
         for callback in self._user_may_create_room_alias_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 res = await delay_cancellation(callback(userid, room_alias))
                 if res is True or res is self.NOT_SPAM:
                     continue
@@ -847,7 +884,11 @@ class SpamCheckerModuleApiCallbacks:
             room_id: The ID of the room that would be published
         """
         for callback in self._user_may_publish_room_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 res = await delay_cancellation(callback(userid, room_id))
                 if res is True or res is self.NOT_SPAM:
                     continue
@@ -889,7 +930,11 @@ class SpamCheckerModuleApiCallbacks:
             True if the user is spammy.
         """
         for callback in self._check_username_for_spam_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 checker_args = inspect.signature(callback)
                 # Make a copy of the user profile object to ensure the spam checker cannot
                 # modify it.
@@ -938,7 +983,11 @@ class SpamCheckerModuleApiCallbacks:
         """

         for callback in self._check_registration_for_spam_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 behaviour = await delay_cancellation(
                     callback(email_threepid, username, request_info, auth_provider_id)
                 )
@@ -980,7 +1029,11 @@ class SpamCheckerModuleApiCallbacks:
         """

         for callback in self._check_media_file_for_spam_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 res = await delay_cancellation(callback(file_wrapper, file_info))
                 # Normalize return values to `Codes` or `"NOT_SPAM"`.
                 if res is False or res is self.NOT_SPAM:
@@ -1027,7 +1080,11 @@ class SpamCheckerModuleApiCallbacks:
         """

         for callback in self._check_login_for_spam_callbacks:
-            with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
+            with Measure(
+                self.clock,
+                name=f"{callback.__module__}.{callback.__qualname__}",
+                server_name=self.server_name,
+            ):
                 res = await delay_cancellation(
                     callback(
                         user_id,
@@ -129,7 +129,8 @@ class BulkPushRuleEvaluator:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.store = hs.get_datastores().main
-        self.clock = hs.get_clock()
+        self.server_name = hs.hostname  # nb must be called this for @measure_func
+        self.clock = hs.get_clock()  # nb must be called this for @measure_func
         self._event_auth_handler = hs.get_event_auth_handler()
         self.should_calculate_push_rules = self.hs.config.push.enable_push

@@ -76,6 +76,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)

+        self.server_name = hs.hostname
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self.clock = hs.get_clock()
@@ -122,7 +123,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
     async def _handle_request(  # type: ignore[override]
         self, request: Request, content: JsonDict
     ) -> Tuple[int, JsonDict]:
-        with Measure(self.clock, "repl_fed_send_events_parse"):
+        with Measure(
+            self.clock, name="repl_fed_send_events_parse", server_name=self.server_name
+        ):
             room_id = content["room_id"]
             backfilled = content["backfilled"]

@@ -76,6 +76,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)

+        self.server_name = hs.hostname
         self.event_creation_handler = hs.get_event_creation_handler()
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
@@ -121,7 +122,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
     async def _handle_request(  # type: ignore[override]
         self, request: Request, content: JsonDict, event_id: str
     ) -> Tuple[int, JsonDict]:
-        with Measure(self.clock, "repl_send_event_parse"):
+        with Measure(
+            self.clock, name="repl_send_event_parse", server_name=self.server_name
+        ):
             event_dict = content["event"]
             room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]]
             internal_metadata = content["internal_metadata"]
@@ -77,6 +77,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)

+        self.server_name = hs.hostname
         self.event_creation_handler = hs.get_event_creation_handler()
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
@@ -122,7 +123,9 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
     async def _handle_request(  # type: ignore[override]
         self, request: Request, payload: JsonDict
     ) -> Tuple[int, JsonDict]:
-        with Measure(self.clock, "repl_send_events_parse"):
+        with Measure(
+            self.clock, name="repl_send_events_parse", server_name=self.server_name
+        ):
             events_and_context = []
             events = payload["events"]
             rooms = set()
@@ -75,6 +75,7 @@ class ReplicationDataHandler:
     """

     def __init__(self, hs: "HomeServer"):
+        self.server_name = hs.hostname
         self.store = hs.get_datastores().main
         self.notifier = hs.get_notifier()
         self._reactor = hs.get_reactor()
@@ -342,7 +343,11 @@ class ReplicationDataHandler:
             waiting_list.add((position, deferred))

         # We measure here to get in flight counts and average waiting time.
-        with Measure(self._clock, "repl.wait_for_stream_position"):
+        with Measure(
+            self._clock,
+            name="repl.wait_for_stream_position",
+            server_name=self.server_name,
+        ):
             logger.info(
                 "Waiting for repl stream %r to reach %s (%s); currently at: %s",
                 stream_name,
@@ -78,6 +78,7 @@ class ReplicationStreamer:
     """

     def __init__(self, hs: "HomeServer"):
+        self.server_name = hs.hostname
         self.store = hs.get_datastores().main
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
@@ -155,7 +156,11 @@ class ReplicationStreamer:
         while self.pending_updates:
             self.pending_updates = False

-            with Measure(self.clock, "repl.stream.get_updates"):
+            with Measure(
+                self.clock,
+                name="repl.stream.get_updates",
+                server_name=self.server_name,
+            ):
                 all_streams = self.streams

                 if self._replication_torture_level is not None:
@@ -189,7 +189,8 @@ class StateHandler:
     """

     def __init__(self, hs: "HomeServer"):
-        self.clock = hs.get_clock()
+        self.server_name = hs.hostname  # nb must be called this for @measure_func
+        self.clock = hs.get_clock()  # nb must be called this for @measure_func
         self.store = hs.get_datastores().main
         self._state_storage_controller = hs.get_storage_controllers().state
         self.hs = hs
@@ -631,6 +632,7 @@ class StateResolutionHandler:
     """

     def __init__(self, hs: "HomeServer"):
+        self.server_name = hs.hostname
         self.clock = hs.get_clock()

         self.resolve_linearizer = Linearizer(name="state_resolve_lock")
|
||||
# which will be used as a cache key for future resolutions, but
|
||||
# not get persisted.
|
||||
|
||||
with Measure(self.clock, "state.create_group_ids"):
|
||||
with Measure(
|
||||
self.clock, name="state.create_group_ids", server_name=self.server_name
|
||||
):
|
||||
cache = _make_state_cache_entry(new_state, state_groups_ids)
|
||||
|
||||
self._state_cache[group_names] = cache
|
||||
@@ -785,7 +789,9 @@ class StateResolutionHandler:
             a map from (type, state_key) to event_id.
         """
         try:
-            with Measure(self.clock, "state._resolve_events") as m:
+            with Measure(
+                self.clock, name="state._resolve_events", server_name=self.server_name
+            ) as m:
                 room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
                 if room_version_obj.state_res == StateResolutionVersions.V1:
                     return await v1.resolve_events_with_store(
@@ -55,6 +55,7 @@ class SQLBaseStore(metaclass=ABCMeta):
         hs: "HomeServer",
     ):
         self.hs = hs
+        self.server_name = hs.hostname
         self._clock = hs.get_clock()
         self.database_engine = database.engine
         self.db_pool = database
@@ -337,6 +337,7 @@ class EventsPersistenceStorageController:
         assert stores.persist_events
         self.persist_events_store = stores.persist_events

+        self.server_name = hs.hostname
         self._clock = hs.get_clock()
         self._instance_name = hs.get_instance_name()
         self.is_mine_id = hs.is_mine_id
@@ -616,7 +617,11 @@ class EventsPersistenceStorageController:
         state_delta_for_room = None

         if not backfilled:
-            with Measure(self._clock, "_calculate_state_and_extrem"):
+            with Measure(
+                self._clock,
+                name="_calculate_state_and_extrem",
+                server_name=self.server_name,
+            ):
                 # Work out the new "current state" for the room.
                 # We do this by working out what the new extremities are and then
                 # calculating the state from that.
@@ -627,7 +632,11 @@ class EventsPersistenceStorageController:
                 room_id, chunk
             )

-        with Measure(self._clock, "calculate_chain_cover_index_for_events"):
+        with Measure(
+            self._clock,
+            name="calculate_chain_cover_index_for_events",
+            server_name=self.server_name,
+        ):
             # We now calculate chain ID/sequence numbers for any state events we're
             # persisting. We ignore out of band memberships as we're not in the room
             # and won't have their auth chain (we'll fix it up later if we join the
@@ -719,7 +728,11 @@ class EventsPersistenceStorageController:
                     break

             logger.debug("Calculating state delta for room %s", room_id)
-            with Measure(self._clock, "persist_events.get_new_state_after_events"):
+            with Measure(
+                self._clock,
+                name="persist_events.get_new_state_after_events",
+                server_name=self.server_name,
+            ):
                 res = await self._get_new_state_after_events(
                     room_id,
                     ev_ctx_rm,
@@ -746,7 +759,11 @@ class EventsPersistenceStorageController:
                 # removed keys entirely.
                 delta = DeltaState([], delta_ids)
             elif current_state is not None:
-                with Measure(self._clock, "persist_events.calculate_state_delta"):
+                with Measure(
+                    self._clock,
+                    name="persist_events.calculate_state_delta",
+                    server_name=self.server_name,
+                ):
                     delta = await self._calculate_state_delta(room_id, current_state)

             if delta:
@@ -68,6 +68,7 @@ class StateStorageController:
     """

     def __init__(self, hs: "HomeServer", stores: "Databases"):
+        self.server_name = hs.hostname
         self._is_mine_id = hs.is_mine_id
         self._clock = hs.get_clock()
         self.stores = stores
@@ -812,7 +813,9 @@ class StateStorageController:
             state_group = object()

         assert state_group is not None
-        with Measure(self._clock, "get_joined_hosts"):
+        with Measure(
+            self._clock, name="get_joined_hosts", server_name=self.server_name
+        ):
             return await self._get_joined_hosts(
                 room_id, state_group, state_entry=state_entry
             )
@@ -1246,7 +1246,9 @@ class EventsWorkerStore(SQLBaseStore):
            to event row. Note that it may well contain additional events that
            were not part of this request.
        """
-        with Measure(self._clock, "_fetch_event_list"):
+        with Measure(
+            self._clock, name="_fetch_event_list", server_name=self.server_name
+        ):
            try:
                events_to_fetch = {
                    event_id for events, _ in event_list for event_id in events
@@ -983,7 +983,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             `_get_user_ids_from_membership_event_ids` for any uncached events.
         """

-        with Measure(self._clock, "get_joined_user_ids_from_state"):
+        with Measure(
+            self._clock,
+            name="get_joined_user_ids_from_state",
+            server_name=self.server_name,
+        ):
             users_in_room = set()
             member_event_ids = [
                 e_id for key, e_id in state.items() if key[0] == EventTypes.Member
@@ -41,49 +41,83 @@ from synapse.logging.context import (
     LoggingContext,
     current_context,
 )
-from synapse.metrics import InFlightGauge
+from synapse.metrics import INSTANCE_LABEL_NAME, InFlightGauge
 from synapse.util import Clock

 logger = logging.getLogger(__name__)

-block_counter = Counter("synapse_util_metrics_block_count", "", ["block_name"])
+# Metrics to see the number of and how much time is spent in various blocks of code.
+#
+block_counter = Counter(
+    "synapse_util_metrics_block_count",
+    "",
+    labelnames=["block_name", INSTANCE_LABEL_NAME],
+)
+"""The number of times this block has been called."""

-block_timer = Counter("synapse_util_metrics_block_time_seconds", "", ["block_name"])
+block_timer = Counter(
+    "synapse_util_metrics_block_time_seconds",
+    "",
+    labelnames=["block_name", INSTANCE_LABEL_NAME],
+)
+"""The cumulative time spent executing this block across all calls, in seconds."""

 block_ru_utime = Counter(
-    "synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"]
+    "synapse_util_metrics_block_ru_utime_seconds",
+    "",
+    labelnames=["block_name", INSTANCE_LABEL_NAME],
 )
+"""Resource usage: user CPU time in seconds used in this block"""

 block_ru_stime = Counter(
-    "synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"]
+    "synapse_util_metrics_block_ru_stime_seconds",
+    "",
+    labelnames=["block_name", INSTANCE_LABEL_NAME],
 )
+"""Resource usage: system CPU time in seconds used in this block"""

 block_db_txn_count = Counter(
-    "synapse_util_metrics_block_db_txn_count", "", ["block_name"]
+    "synapse_util_metrics_block_db_txn_count",
+    "",
+    labelnames=["block_name", INSTANCE_LABEL_NAME],
 )
+"""Number of database transactions completed in this block"""

 # seconds spent waiting for db txns, excluding scheduling time, in this block
 block_db_txn_duration = Counter(
-    "synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"]
+    "synapse_util_metrics_block_db_txn_duration_seconds",
+    "",
+    labelnames=["block_name", INSTANCE_LABEL_NAME],
 )
+"""Seconds spent waiting for database txns, excluding scheduling time, in this block"""

 # seconds spent waiting for a db connection, in this block
 block_db_sched_duration = Counter(
-    "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]
+    "synapse_util_metrics_block_db_sched_duration_seconds",
+    "",
+    labelnames=["block_name", INSTANCE_LABEL_NAME],
 )
+"""Seconds spent waiting for a db connection, in this block"""


 # This is dynamically created in InFlightGauge.__init__.
-class _InFlightMetric(Protocol):
+class _BlockInFlightMetric(Protocol):
+    """
+    Sub-metrics used for the `InFlightGauge` for blocks.
+    """
+
     real_time_max: float
+    """The longest observed duration of any single execution of this block, in seconds."""
     real_time_sum: float
+    """The cumulative time spent executing this block across all calls, in seconds."""


 # Tracks the number of blocks currently active
-in_flight: InFlightGauge[_InFlightMetric] = InFlightGauge(
+in_flight: InFlightGauge[_BlockInFlightMetric] = InFlightGauge(
     "synapse_util_metrics_block_in_flight",
     "",
-    labels=["block_name"],
+    labels=["block_name", INSTANCE_LABEL_NAME],
+    # Matches the fields in the `_BlockInFlightMetric`
     sub_metrics=["real_time_max", "real_time_sum"],
 )
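A hedged sketch of what the relabelling means at increment time (block and server names here are hypothetical), mirroring how `Measure.__exit__` increments these counters later in this diff:

    labels = {"block_name": "my_block", INSTANCE_LABEL_NAME: "hs1.example.com"}
    block_counter.labels(**labels).inc()    # count one entry into the block
    block_timer.labels(**labels).inc(0.25)  # accumulate 0.25s spent in it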
@@ -92,8 +126,16 @@ P = ParamSpec("P")
 R = TypeVar("R")


-class HasClock(Protocol):
+class HasClockAndServerName(Protocol):
     clock: Clock
+    """
+    Used to measure functions
+    """
+    server_name: str
+    """
+    The homeserver name that this Measure is associated with (used to label the metric)
+    (`hs.hostname`).
+    """


 def measure_func(
def measure_func(
|
||||
@ -101,8 +143,9 @@ def measure_func(
|
||||
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
|
||||
"""Decorate an async method with a `Measure` context manager.
|
||||
|
||||
The Measure is created using `self.clock`; it should only be used to decorate
|
||||
methods in classes defining an instance-level `clock` attribute.
|
||||
The Measure is created using `self.clock` and `self.server_name; it should only be
|
||||
used to decorate methods in classes defining an instance-level `clock` and
|
||||
`server_name` attributes.
|
||||
|
||||
Usage:
|
||||
|
||||
@@ -116,16 +159,21 @@ def measure_func(
         with Measure(...):
             ...

+    Args:
+        name: The name of the metric to report (the block name) (used to label the
+            metric). Defaults to the name of the decorated function.
     """

     def wrapper(
-        func: Callable[Concatenate[HasClock, P], Awaitable[R]],
+        func: Callable[Concatenate[HasClockAndServerName, P], Awaitable[R]],
     ) -> Callable[P, Awaitable[R]]:
         block_name = func.__name__ if name is None else name

         @wraps(func)
-        async def measured_func(self: HasClock, *args: P.args, **kwargs: P.kwargs) -> R:
-            with Measure(self.clock, block_name):
+        async def measured_func(
+            self: HasClockAndServerName, *args: P.args, **kwargs: P.kwargs
+        ) -> R:
+            with Measure(self.clock, name=block_name, server_name=self.server_name):
                 r = await func(self, *args, **kwargs)
             return r

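A minimal sketch of a class satisfying the new `HasClockAndServerName` protocol (the handler class and block name are hypothetical):

    class MyHandler:
        def __init__(self, hs: "HomeServer"):
            self.server_name = hs.hostname  # nb must be called this for @measure_func
            self.clock = hs.get_clock()  # nb must be called this for @measure_func

        @measure_func("my_handler_work")
        async def do_work(self) -> None:
            ...  # measured under block_name="my_handler_work", plus the instance label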
@@ -142,19 +190,24 @@ class Measure:
     __slots__ = [
         "clock",
         "name",
+        "server_name",
         "_logging_context",
         "start",
     ]

-    def __init__(self, clock: Clock, name: str) -> None:
+    def __init__(self, clock: Clock, *, name: str, server_name: str) -> None:
         """
         Args:
             clock: An object with a "time()" method, which returns the current
                 time in seconds.
-            name: The name of the metric to report.
+            name: The name of the metric to report (the block name) (used to label the
+                metric).
+            server_name: The homeserver name that this Measure is associated with (used to
+                label the metric) (`hs.hostname`).
         """
         self.clock = clock
         self.name = name
+        self.server_name = server_name
         curr_context = current_context()
         if not curr_context:
             logger.warning(
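Since `name` and `server_name` are now keyword-only, old positional call sites stop working; a sketch of the only valid call shape (values hypothetical):

    with Measure(clock, name="my_block", server_name="hs1.example.com"):
        ...  # work measured as block_name="my_block", instance="hs1.example.com"

    # Measure(clock, "my_block")  # old positional form, no longer accepted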
@@ -174,7 +227,7 @@ class Measure:

         self.start = self.clock.time()
         self._logging_context.__enter__()
-        in_flight.register((self.name,), self._update_in_flight)
+        in_flight.register((self.name, self.server_name), self._update_in_flight)

         logger.debug("Entering block %s", self.name)

@@ -194,19 +247,20 @@ class Measure:
         duration = self.clock.time() - self.start
         usage = self.get_resource_usage()

-        in_flight.unregister((self.name,), self._update_in_flight)
+        in_flight.unregister((self.name, self.server_name), self._update_in_flight)
         self._logging_context.__exit__(exc_type, exc_val, exc_tb)

         try:
-            block_counter.labels(self.name).inc()
-            block_timer.labels(self.name).inc(duration)
-            block_ru_utime.labels(self.name).inc(usage.ru_utime)
-            block_ru_stime.labels(self.name).inc(usage.ru_stime)
-            block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
-            block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
-            block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
-        except ValueError:
-            logger.warning("Failed to save metrics! Usage: %s", usage)
+            labels = {"block_name": self.name, INSTANCE_LABEL_NAME: self.server_name}
+            block_counter.labels(**labels).inc()
+            block_timer.labels(**labels).inc(duration)
+            block_ru_utime.labels(**labels).inc(usage.ru_utime)
+            block_ru_stime.labels(**labels).inc(usage.ru_stime)
+            block_db_txn_count.labels(**labels).inc(usage.db_txn_count)
+            block_db_txn_duration.labels(**labels).inc(usage.db_txn_duration_sec)
+            block_db_sched_duration.labels(**labels).inc(usage.db_sched_duration_sec)
+        except ValueError as exc:
+            logger.warning("Failed to save metrics! Usage: %s Error: %s", usage, exc)
||||
def get_resource_usage(self) -> ContextResourceUsage:
|
||||
"""Get the resources used within this Measure block
|
||||
@ -215,7 +269,7 @@ class Measure:
|
||||
"""
|
||||
return self._logging_context.get_resource_usage()
|
||||
|
||||
def _update_in_flight(self, metrics: _InFlightMetric) -> None:
|
||||
def _update_in_flight(self, metrics: _BlockInFlightMetric) -> None:
|
||||
"""Gets called when processing in flight metrics"""
|
||||
assert self.start is not None
|
||||
duration = self.clock.time() - self.start
|
||||
|
@@ -86,6 +86,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
         self.mock_federation_client = AsyncMock(spec=["put_json"])
         self.mock_federation_client.put_json.return_value = (200, "OK")
         self.mock_federation_client.agent = MatrixFederationAgent(
+            "OUR_STUB_HOMESERVER_NAME",
             reactor,
             tls_client_options_factory=None,
             user_agent=b"SynapseInTrialTest/0.0.0",
@@ -91,6 +91,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
             "test_cache", timer=self.reactor.seconds
         )
         self.well_known_resolver = WellKnownResolver(
+            "OUR_STUB_HOMESERVER_NAME",
            self.reactor,
            Agent(self.reactor, contextFactory=self.tls_factory),
            b"test-agent",
@@ -269,6 +270,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         because it is created too early during setUp
         """
         return MatrixFederationAgent(
+            "OUR_STUB_HOMESERVER_NAME",
             reactor=cast(ISynapseReactor, self.reactor),
             tls_client_options_factory=self.tls_factory,
             user_agent=b"test-agent",  # Note that this is unused since _well_known_resolver is provided.
@@ -1011,6 +1013,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         # Build a new agent and WellKnownResolver with a different tls factory
         tls_factory = FederationPolicyForHTTPS(config)
         agent = MatrixFederationAgent(
+            "OUR_STUB_HOMESERVER_NAME",
             reactor=self.reactor,
             tls_client_options_factory=tls_factory,
             user_agent=b"test-agent",  # This is unused since _well_known_resolver is passed below.
@@ -1018,6 +1021,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
             ip_blocklist=IPSet(),
             _srv_resolver=self.mock_resolver,
             _well_known_resolver=WellKnownResolver(
+                "OUR_STUB_HOMESERVER_NAME",
                 cast(ISynapseReactor, self.reactor),
                 Agent(self.reactor, contextFactory=tls_factory),
                 b"test-agent",
@@ -68,6 +68,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):

         reactor, _ = get_clock()
         self.matrix_federation_agent = MatrixFederationAgent(
+            "OUR_STUB_HOMESERVER_NAME",
             reactor,
             tls_client_options_factory=None,
             user_agent=b"SynapseInTrialTest/0.0.0",