mirror of
https://github.com/element-hq/synapse.git
synced 2025-07-05 00:00:37 -04:00
Compare commits
12 Commits
67ec14afc9
...
30b91fdd86
Author | SHA1 | Date | |
---|---|---|---|
|
30b91fdd86 | ||
|
e0f8992ee3 | ||
|
06f9af155b | ||
|
5ad555cefc | ||
|
652c34bda6 | ||
|
521c68cafe | ||
|
c232ec7b3b | ||
|
c7d15dbcc7 | ||
|
6731c4bbf0 | ||
|
d05b6ca4c1 | ||
|
65035b6098 | ||
|
02a7668bb2 |
1
changelog.d/18601.misc
Normal file
1
changelog.d/18601.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Refactor `Measure` block metrics to be homeserver-scoped.
|
@ -156,7 +156,9 @@ class FederationRemoteSendQueue(AbstractFederationSender):
|
|||||||
|
|
||||||
def _clear_queue_before_pos(self, position_to_delete: int) -> None:
|
def _clear_queue_before_pos(self, position_to_delete: int) -> None:
|
||||||
"""Clear all the queues from before a given position"""
|
"""Clear all the queues from before a given position"""
|
||||||
with Measure(self.clock, "send_queue._clear"):
|
with Measure(
|
||||||
|
self.clock, name="send_queue._clear", server_name=self.server_name
|
||||||
|
):
|
||||||
# Delete things out of presence maps
|
# Delete things out of presence maps
|
||||||
keys = self.presence_destinations.keys()
|
keys = self.presence_destinations.keys()
|
||||||
i = self.presence_destinations.bisect_left(position_to_delete)
|
i = self.presence_destinations.bisect_left(position_to_delete)
|
||||||
|
@ -657,7 +657,11 @@ class FederationSender(AbstractFederationSender):
|
|||||||
logger.debug(
|
logger.debug(
|
||||||
"Handling %i events in room %s", len(events), events[0].room_id
|
"Handling %i events in room %s", len(events), events[0].room_id
|
||||||
)
|
)
|
||||||
with Measure(self.clock, "handle_room_events"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name="handle_room_events",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
for event in events:
|
for event in events:
|
||||||
await handle_event(event)
|
await handle_event(event)
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ class TransactionManager:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, hs: "synapse.server.HomeServer"):
|
def __init__(self, hs: "synapse.server.HomeServer"):
|
||||||
self._server_name = hs.hostname
|
self.server_name = hs.hostname # nb must be called this for @measure_func
|
||||||
self.clock = hs.get_clock() # nb must be called this for @measure_func
|
self.clock = hs.get_clock() # nb must be called this for @measure_func
|
||||||
self._store = hs.get_datastores().main
|
self._store = hs.get_datastores().main
|
||||||
self._transaction_actions = TransactionActions(self._store)
|
self._transaction_actions = TransactionActions(self._store)
|
||||||
@ -116,7 +116,7 @@ class TransactionManager:
|
|||||||
transaction = Transaction(
|
transaction = Transaction(
|
||||||
origin_server_ts=int(self.clock.time_msec()),
|
origin_server_ts=int(self.clock.time_msec()),
|
||||||
transaction_id=txn_id,
|
transaction_id=txn_id,
|
||||||
origin=self._server_name,
|
origin=self.server_name,
|
||||||
destination=destination,
|
destination=destination,
|
||||||
pdus=[p.get_pdu_json() for p in pdus],
|
pdus=[p.get_pdu_json() for p in pdus],
|
||||||
edus=[edu.get_dict() for edu in edus],
|
edus=[edu.get_dict() for edu in edus],
|
||||||
|
@ -73,6 +73,7 @@ events_processed_counter = Counter("synapse_handlers_appservice_events_processed
|
|||||||
|
|
||||||
class ApplicationServicesHandler:
|
class ApplicationServicesHandler:
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
self.server_name = hs.hostname
|
||||||
self.store = hs.get_datastores().main
|
self.store = hs.get_datastores().main
|
||||||
self.is_mine_id = hs.is_mine_id
|
self.is_mine_id = hs.is_mine_id
|
||||||
self.appservice_api = hs.get_application_service_api()
|
self.appservice_api = hs.get_application_service_api()
|
||||||
@ -120,7 +121,9 @@ class ApplicationServicesHandler:
|
|||||||
|
|
||||||
@wrap_as_background_process("notify_interested_services")
|
@wrap_as_background_process("notify_interested_services")
|
||||||
async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
|
async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
|
||||||
with Measure(self.clock, "notify_interested_services"):
|
with Measure(
|
||||||
|
self.clock, name="notify_interested_services", server_name=self.server_name
|
||||||
|
):
|
||||||
self.is_processing = True
|
self.is_processing = True
|
||||||
try:
|
try:
|
||||||
upper_bound = -1
|
upper_bound = -1
|
||||||
@ -329,7 +332,11 @@ class ApplicationServicesHandler:
|
|||||||
users: Collection[Union[str, UserID]],
|
users: Collection[Union[str, UserID]],
|
||||||
) -> None:
|
) -> None:
|
||||||
logger.debug("Checking interested services for %s", stream_key)
|
logger.debug("Checking interested services for %s", stream_key)
|
||||||
with Measure(self.clock, "notify_interested_services_ephemeral"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name="notify_interested_services_ephemeral",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
for service in services:
|
for service in services:
|
||||||
if stream_key == StreamKeyType.TYPING:
|
if stream_key == StreamKeyType.TYPING:
|
||||||
# Note that we don't persist the token (via set_appservice_stream_type_pos)
|
# Note that we don't persist the token (via set_appservice_stream_type_pos)
|
||||||
|
@ -54,6 +54,7 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
class DelayedEventsHandler:
|
class DelayedEventsHandler:
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
self.server_name = hs.hostname
|
||||||
self._store = hs.get_datastores().main
|
self._store = hs.get_datastores().main
|
||||||
self._storage_controllers = hs.get_storage_controllers()
|
self._storage_controllers = hs.get_storage_controllers()
|
||||||
self._config = hs.config
|
self._config = hs.config
|
||||||
@ -159,7 +160,9 @@ class DelayedEventsHandler:
|
|||||||
|
|
||||||
# Loop round handling deltas until we're up to date
|
# Loop round handling deltas until we're up to date
|
||||||
while True:
|
while True:
|
||||||
with Measure(self._clock, "delayed_events_delta"):
|
with Measure(
|
||||||
|
self._clock, name="delayed_events_delta", server_name=self.server_name
|
||||||
|
):
|
||||||
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
||||||
if self._event_pos == room_max_stream_ordering:
|
if self._event_pos == room_max_stream_ordering:
|
||||||
return
|
return
|
||||||
|
@ -526,6 +526,8 @@ class DeviceHandler(DeviceWorkerHandler):
|
|||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
super().__init__(hs)
|
super().__init__(hs)
|
||||||
|
|
||||||
|
self.server_name = hs.hostname # nb must be called this for @measure_func
|
||||||
|
self.clock = hs.get_clock() # nb must be called this for @measure_func
|
||||||
self.federation_sender = hs.get_federation_sender()
|
self.federation_sender = hs.get_federation_sender()
|
||||||
self._account_data_handler = hs.get_account_data_handler()
|
self._account_data_handler = hs.get_account_data_handler()
|
||||||
self._storage_controllers = hs.get_storage_controllers()
|
self._storage_controllers = hs.get_storage_controllers()
|
||||||
@ -1215,7 +1217,8 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
|
|||||||
def __init__(self, hs: "HomeServer", device_handler: DeviceHandler):
|
def __init__(self, hs: "HomeServer", device_handler: DeviceHandler):
|
||||||
self.store = hs.get_datastores().main
|
self.store = hs.get_datastores().main
|
||||||
self.federation = hs.get_federation_client()
|
self.federation = hs.get_federation_client()
|
||||||
self.clock = hs.get_clock()
|
self.server_name = hs.hostname # nb must be called this for @measure_func
|
||||||
|
self.clock = hs.get_clock() # nb must be called this for @measure_func
|
||||||
self.device_handler = device_handler
|
self.device_handler = device_handler
|
||||||
self._notifier = hs.get_notifier()
|
self._notifier = hs.get_notifier()
|
||||||
|
|
||||||
|
@ -476,16 +476,16 @@ _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
|
|||||||
class EventCreationHandler:
|
class EventCreationHandler:
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
|
self.validator = EventValidator()
|
||||||
|
self.event_builder_factory = hs.get_event_builder_factory()
|
||||||
|
self.server_name = hs.hostname # nb must be called this for @measure_func
|
||||||
|
self.clock = hs.get_clock() # nb must be called this for @measure_func
|
||||||
self.auth_blocking = hs.get_auth_blocking()
|
self.auth_blocking = hs.get_auth_blocking()
|
||||||
self._event_auth_handler = hs.get_event_auth_handler()
|
self._event_auth_handler = hs.get_event_auth_handler()
|
||||||
self.store = hs.get_datastores().main
|
self.store = hs.get_datastores().main
|
||||||
self._storage_controllers = hs.get_storage_controllers()
|
self._storage_controllers = hs.get_storage_controllers()
|
||||||
self.state = hs.get_state_handler()
|
self.state = hs.get_state_handler()
|
||||||
self.clock = hs.get_clock()
|
|
||||||
self.validator = EventValidator()
|
|
||||||
self.profile_handler = hs.get_profile_handler()
|
self.profile_handler = hs.get_profile_handler()
|
||||||
self.event_builder_factory = hs.get_event_builder_factory()
|
|
||||||
self.server_name = hs.hostname
|
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
self.config = hs.config
|
self.config = hs.config
|
||||||
self.require_membership_for_aliases = (
|
self.require_membership_for_aliases = (
|
||||||
|
@ -747,6 +747,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|||||||
class PresenceHandler(BasePresenceHandler):
|
class PresenceHandler(BasePresenceHandler):
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
super().__init__(hs)
|
super().__init__(hs)
|
||||||
|
self.server_name = hs.hostname
|
||||||
self.wheel_timer: WheelTimer[str] = WheelTimer()
|
self.wheel_timer: WheelTimer[str] = WheelTimer()
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
|
|
||||||
@ -941,7 +942,9 @@ class PresenceHandler(BasePresenceHandler):
|
|||||||
|
|
||||||
now = self.clock.time_msec()
|
now = self.clock.time_msec()
|
||||||
|
|
||||||
with Measure(self.clock, "presence_update_states"):
|
with Measure(
|
||||||
|
self.clock, name="presence_update_states", server_name=self.server_name
|
||||||
|
):
|
||||||
# NOTE: We purposefully don't await between now and when we've
|
# NOTE: We purposefully don't await between now and when we've
|
||||||
# calculated what we want to do with the new states, to avoid races.
|
# calculated what we want to do with the new states, to avoid races.
|
||||||
|
|
||||||
@ -1497,7 +1500,9 @@ class PresenceHandler(BasePresenceHandler):
|
|||||||
async def _unsafe_process(self) -> None:
|
async def _unsafe_process(self) -> None:
|
||||||
# Loop round handling deltas until we're up to date
|
# Loop round handling deltas until we're up to date
|
||||||
while True:
|
while True:
|
||||||
with Measure(self.clock, "presence_delta"):
|
with Measure(
|
||||||
|
self.clock, name="presence_delta", server_name=self.server_name
|
||||||
|
):
|
||||||
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
|
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
|
||||||
if self._event_pos == room_max_stream_ordering:
|
if self._event_pos == room_max_stream_ordering:
|
||||||
return
|
return
|
||||||
@ -1759,6 +1764,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
|
|||||||
# Same with get_presence_router:
|
# Same with get_presence_router:
|
||||||
#
|
#
|
||||||
# AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
|
# AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
|
||||||
|
self.server_name = hs.hostname
|
||||||
self.get_presence_handler = hs.get_presence_handler
|
self.get_presence_handler = hs.get_presence_handler
|
||||||
self.get_presence_router = hs.get_presence_router
|
self.get_presence_router = hs.get_presence_router
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
@ -1792,7 +1798,9 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
|
|||||||
user_id = user.to_string()
|
user_id = user.to_string()
|
||||||
stream_change_cache = self.store.presence_stream_cache
|
stream_change_cache = self.store.presence_stream_cache
|
||||||
|
|
||||||
with Measure(self.clock, "presence.get_new_events"):
|
with Measure(
|
||||||
|
self.clock, name="presence.get_new_events", server_name=self.server_name
|
||||||
|
):
|
||||||
if from_key is not None:
|
if from_key is not None:
|
||||||
from_key = int(from_key)
|
from_key = int(from_key)
|
||||||
|
|
||||||
|
@ -329,6 +329,7 @@ class E2eeSyncResult:
|
|||||||
|
|
||||||
class SyncHandler:
|
class SyncHandler:
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
self.server_name = hs.hostname
|
||||||
self.hs_config = hs.config
|
self.hs_config = hs.config
|
||||||
self.store = hs.get_datastores().main
|
self.store = hs.get_datastores().main
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
@ -710,7 +711,9 @@ class SyncHandler:
|
|||||||
|
|
||||||
sync_config = sync_result_builder.sync_config
|
sync_config = sync_result_builder.sync_config
|
||||||
|
|
||||||
with Measure(self.clock, "ephemeral_by_room"):
|
with Measure(
|
||||||
|
self.clock, name="ephemeral_by_room", server_name=self.server_name
|
||||||
|
):
|
||||||
typing_key = since_token.typing_key if since_token else 0
|
typing_key = since_token.typing_key if since_token else 0
|
||||||
|
|
||||||
room_ids = sync_result_builder.joined_room_ids
|
room_ids = sync_result_builder.joined_room_ids
|
||||||
@ -783,7 +786,9 @@ class SyncHandler:
|
|||||||
and current token to send down to clients.
|
and current token to send down to clients.
|
||||||
newly_joined_room
|
newly_joined_room
|
||||||
"""
|
"""
|
||||||
with Measure(self.clock, "load_filtered_recents"):
|
with Measure(
|
||||||
|
self.clock, name="load_filtered_recents", server_name=self.server_name
|
||||||
|
):
|
||||||
timeline_limit = sync_config.filter_collection.timeline_limit()
|
timeline_limit = sync_config.filter_collection.timeline_limit()
|
||||||
block_all_timeline = (
|
block_all_timeline = (
|
||||||
sync_config.filter_collection.blocks_all_room_timeline()
|
sync_config.filter_collection.blocks_all_room_timeline()
|
||||||
@ -1174,7 +1179,9 @@ class SyncHandler:
|
|||||||
# updates even if they occurred logically before the previous event.
|
# updates even if they occurred logically before the previous event.
|
||||||
# TODO(mjark) Check for new redactions in the state events.
|
# TODO(mjark) Check for new redactions in the state events.
|
||||||
|
|
||||||
with Measure(self.clock, "compute_state_delta"):
|
with Measure(
|
||||||
|
self.clock, name="compute_state_delta", server_name=self.server_name
|
||||||
|
):
|
||||||
# The memberships needed for events in the timeline.
|
# The memberships needed for events in the timeline.
|
||||||
# Only calculated when `lazy_load_members` is on.
|
# Only calculated when `lazy_load_members` is on.
|
||||||
members_to_fetch: Optional[Set[str]] = None
|
members_to_fetch: Optional[Set[str]] = None
|
||||||
@ -1791,7 +1798,9 @@ class SyncHandler:
|
|||||||
# the DB.
|
# the DB.
|
||||||
return RoomNotifCounts.empty()
|
return RoomNotifCounts.empty()
|
||||||
|
|
||||||
with Measure(self.clock, "unread_notifs_for_room_id"):
|
with Measure(
|
||||||
|
self.clock, name="unread_notifs_for_room_id", server_name=self.server_name
|
||||||
|
):
|
||||||
return await self.store.get_unread_event_push_actions_by_room_for_user(
|
return await self.store.get_unread_event_push_actions_by_room_for_user(
|
||||||
room_id,
|
room_id,
|
||||||
sync_config.user.to_string(),
|
sync_config.user.to_string(),
|
||||||
|
@ -503,6 +503,7 @@ class TypingWriterHandler(FollowerTypingHandler):
|
|||||||
|
|
||||||
class TypingNotificationEventSource(EventSource[int, JsonMapping]):
|
class TypingNotificationEventSource(EventSource[int, JsonMapping]):
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
self.server_name = hs.hostname
|
||||||
self._main_store = hs.get_datastores().main
|
self._main_store = hs.get_datastores().main
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
# We can't call get_typing_handler here because there's a cycle:
|
# We can't call get_typing_handler here because there's a cycle:
|
||||||
@ -535,7 +536,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
|
|||||||
appservice may be interested in.
|
appservice may be interested in.
|
||||||
* The latest known room serial.
|
* The latest known room serial.
|
||||||
"""
|
"""
|
||||||
with Measure(self.clock, "typing.get_new_events_as"):
|
with Measure(
|
||||||
|
self.clock, name="typing.get_new_events_as", server_name=self.server_name
|
||||||
|
):
|
||||||
handler = self.get_typing_handler()
|
handler = self.get_typing_handler()
|
||||||
|
|
||||||
events = []
|
events = []
|
||||||
@ -571,7 +574,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
|
|||||||
Find typing notifications for given rooms (> `from_token` and <= `to_token`)
|
Find typing notifications for given rooms (> `from_token` and <= `to_token`)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
with Measure(self.clock, "typing.get_new_events"):
|
with Measure(
|
||||||
|
self.clock, name="typing.get_new_events", server_name=self.server_name
|
||||||
|
):
|
||||||
from_key = int(from_key)
|
from_key = int(from_key)
|
||||||
handler = self.get_typing_handler()
|
handler = self.get_typing_handler()
|
||||||
|
|
||||||
|
@ -237,7 +237,9 @@ class UserDirectoryHandler(StateDeltasHandler):
|
|||||||
|
|
||||||
# Loop round handling deltas until we're up to date
|
# Loop round handling deltas until we're up to date
|
||||||
while True:
|
while True:
|
||||||
with Measure(self.clock, "user_dir_delta"):
|
with Measure(
|
||||||
|
self.clock, name="user_dir_delta", server_name=self.server_name
|
||||||
|
):
|
||||||
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
|
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
|
||||||
if self.pos == room_max_stream_ordering:
|
if self.pos == room_max_stream_ordering:
|
||||||
return
|
return
|
||||||
|
@ -92,6 +92,7 @@ class MatrixFederationAgent:
|
|||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
server_name: str,
|
||||||
reactor: ISynapseReactor,
|
reactor: ISynapseReactor,
|
||||||
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
|
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
|
||||||
user_agent: bytes,
|
user_agent: bytes,
|
||||||
@ -100,6 +101,11 @@ class MatrixFederationAgent:
|
|||||||
_srv_resolver: Optional[SrvResolver] = None,
|
_srv_resolver: Optional[SrvResolver] = None,
|
||||||
_well_known_resolver: Optional[WellKnownResolver] = None,
|
_well_known_resolver: Optional[WellKnownResolver] = None,
|
||||||
):
|
):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
server_name: Our homeserver name (used to label metrics) (`hs.hostname`).
|
||||||
|
"""
|
||||||
|
|
||||||
# proxy_reactor is not blocklisting reactor
|
# proxy_reactor is not blocklisting reactor
|
||||||
proxy_reactor = reactor
|
proxy_reactor = reactor
|
||||||
|
|
||||||
@ -127,6 +133,7 @@ class MatrixFederationAgent:
|
|||||||
|
|
||||||
if _well_known_resolver is None:
|
if _well_known_resolver is None:
|
||||||
_well_known_resolver = WellKnownResolver(
|
_well_known_resolver = WellKnownResolver(
|
||||||
|
server_name,
|
||||||
reactor,
|
reactor,
|
||||||
agent=BlocklistingAgentWrapper(
|
agent=BlocklistingAgentWrapper(
|
||||||
ProxyAgent(
|
ProxyAgent(
|
||||||
|
@ -91,12 +91,19 @@ class WellKnownResolver:
|
|||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
server_name: str,
|
||||||
reactor: IReactorTime,
|
reactor: IReactorTime,
|
||||||
agent: IAgent,
|
agent: IAgent,
|
||||||
user_agent: bytes,
|
user_agent: bytes,
|
||||||
well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None,
|
well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None,
|
||||||
had_well_known_cache: Optional[TTLCache[bytes, bool]] = None,
|
had_well_known_cache: Optional[TTLCache[bytes, bool]] = None,
|
||||||
):
|
):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
server_name: Our homeserver name (used to label metrics) (`hs.hostname`).
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.server_name = server_name
|
||||||
self._reactor = reactor
|
self._reactor = reactor
|
||||||
self._clock = Clock(reactor)
|
self._clock = Clock(reactor)
|
||||||
|
|
||||||
@ -134,7 +141,13 @@ class WellKnownResolver:
|
|||||||
# TODO: should we linearise so that we don't end up doing two .well-known
|
# TODO: should we linearise so that we don't end up doing two .well-known
|
||||||
# requests for the same server in parallel?
|
# requests for the same server in parallel?
|
||||||
try:
|
try:
|
||||||
with Measure(self._clock, "get_well_known"):
|
with Measure(
|
||||||
|
self._clock,
|
||||||
|
name="get_well_known",
|
||||||
|
# This should be our homeserver where the the code is running (used to
|
||||||
|
# label metrics)
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
result: Optional[bytes]
|
result: Optional[bytes]
|
||||||
cache_period: float
|
cache_period: float
|
||||||
|
|
||||||
|
@ -417,6 +417,7 @@ class MatrixFederationHttpClient:
|
|||||||
if hs.get_instance_name() in outbound_federation_restricted_to:
|
if hs.get_instance_name() in outbound_federation_restricted_to:
|
||||||
# Talk to federation directly
|
# Talk to federation directly
|
||||||
federation_agent: IAgent = MatrixFederationAgent(
|
federation_agent: IAgent = MatrixFederationAgent(
|
||||||
|
self.server_name,
|
||||||
self.reactor,
|
self.reactor,
|
||||||
tls_client_options_factory,
|
tls_client_options_factory,
|
||||||
user_agent.encode("ascii"),
|
user_agent.encode("ascii"),
|
||||||
@ -697,7 +698,11 @@ class MatrixFederationHttpClient:
|
|||||||
outgoing_requests_counter.labels(request.method).inc()
|
outgoing_requests_counter.labels(request.method).inc()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with Measure(self.clock, "outbound_request"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name="outbound_request",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
# we don't want all the fancy cookie and redirect handling
|
# we don't want all the fancy cookie and redirect handling
|
||||||
# that treq.request gives: just use the raw Agent.
|
# that treq.request gives: just use the raw Agent.
|
||||||
|
|
||||||
|
@ -66,6 +66,19 @@ all_gauges: Dict[str, Collector] = {}
|
|||||||
|
|
||||||
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
|
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
|
||||||
|
|
||||||
|
INSTANCE_LABEL_NAME = "instance"
|
||||||
|
"""
|
||||||
|
The standard Prometheus label name used to identify which server instance the metrics
|
||||||
|
came from.
|
||||||
|
|
||||||
|
In the case of a Synapse homeserver, this should be set to the homeserver name
|
||||||
|
(`hs.hostname`).
|
||||||
|
|
||||||
|
Normally, this would be set automatically by the Prometheus server scraping the data but
|
||||||
|
since we support multiple instances of Synapse running in the same process and all
|
||||||
|
metrics are in a single global `REGISTRY`, we need to manually label any metrics.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
class _RegistryProxy:
|
class _RegistryProxy:
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@ -192,7 +205,16 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
|
|||||||
same key.
|
same key.
|
||||||
|
|
||||||
Note that `callback` may be called on a separate thread.
|
Note that `callback` may be called on a separate thread.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: A tuple of label values, which must match the order of the
|
||||||
|
`labels` given to the constructor.
|
||||||
|
callback
|
||||||
"""
|
"""
|
||||||
|
assert len(key) == len(self.labels), (
|
||||||
|
f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}"
|
||||||
|
)
|
||||||
|
|
||||||
with self._lock:
|
with self._lock:
|
||||||
self._registrations.setdefault(key, set()).add(callback)
|
self._registrations.setdefault(key, set()).add(callback)
|
||||||
|
|
||||||
@ -201,7 +223,17 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
|
|||||||
key: Tuple[str, ...],
|
key: Tuple[str, ...],
|
||||||
callback: Callable[[MetricsEntry], None],
|
callback: Callable[[MetricsEntry], None],
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Registers that we've exited a block with labels `key`."""
|
"""
|
||||||
|
Registers that we've exited a block with labels `key`.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: A tuple of label values, which must match the order of the
|
||||||
|
`labels` given to the constructor.
|
||||||
|
callback
|
||||||
|
"""
|
||||||
|
assert len(key) == len(self.labels), (
|
||||||
|
f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}"
|
||||||
|
)
|
||||||
|
|
||||||
with self._lock:
|
with self._lock:
|
||||||
self._registrations.setdefault(key, set()).discard(callback)
|
self._registrations.setdefault(key, set()).discard(callback)
|
||||||
@ -225,7 +257,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
|
|||||||
with self._lock:
|
with self._lock:
|
||||||
callbacks = set(self._registrations[key])
|
callbacks = set(self._registrations[key])
|
||||||
|
|
||||||
in_flight.add_metric(key, len(callbacks))
|
in_flight.add_metric(labels=key, value=len(callbacks))
|
||||||
|
|
||||||
metrics = self._metrics_class()
|
metrics = self._metrics_class()
|
||||||
metrics_by_key[key] = metrics
|
metrics_by_key[key] = metrics
|
||||||
@ -239,7 +271,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
|
|||||||
"_".join([self.name, name]), "", labels=self.labels
|
"_".join([self.name, name]), "", labels=self.labels
|
||||||
)
|
)
|
||||||
for key, metrics in metrics_by_key.items():
|
for key, metrics in metrics_by_key.items():
|
||||||
gauge.add_metric(key, getattr(metrics, name))
|
gauge.add_metric(labels=key, value=getattr(metrics, name))
|
||||||
yield gauge
|
yield gauge
|
||||||
|
|
||||||
def _register_with_collector(self) -> None:
|
def _register_with_collector(self) -> None:
|
||||||
|
@ -31,6 +31,7 @@ IS_USER_ALLOWED_TO_UPLOAD_MEDIA_OF_SIZE_CALLBACK = Callable[[str, int], Awaitabl
|
|||||||
|
|
||||||
class MediaRepositoryModuleApiCallbacks:
|
class MediaRepositoryModuleApiCallbacks:
|
||||||
def __init__(self, hs: "HomeServer") -> None:
|
def __init__(self, hs: "HomeServer") -> None:
|
||||||
|
self.server_name = hs.hostname
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self._get_media_config_for_user_callbacks: List[
|
self._get_media_config_for_user_callbacks: List[
|
||||||
GET_MEDIA_CONFIG_FOR_USER_CALLBACK
|
GET_MEDIA_CONFIG_FOR_USER_CALLBACK
|
||||||
@ -57,7 +58,11 @@ class MediaRepositoryModuleApiCallbacks:
|
|||||||
|
|
||||||
async def get_media_config_for_user(self, user_id: str) -> Optional[JsonDict]:
|
async def get_media_config_for_user(self, user_id: str) -> Optional[JsonDict]:
|
||||||
for callback in self._get_media_config_for_user_callbacks:
|
for callback in self._get_media_config_for_user_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res: Optional[JsonDict] = await delay_cancellation(callback(user_id))
|
res: Optional[JsonDict] = await delay_cancellation(callback(user_id))
|
||||||
if res:
|
if res:
|
||||||
return res
|
return res
|
||||||
@ -68,7 +73,11 @@ class MediaRepositoryModuleApiCallbacks:
|
|||||||
self, user_id: str, size: int
|
self, user_id: str, size: int
|
||||||
) -> bool:
|
) -> bool:
|
||||||
for callback in self._is_user_allowed_to_upload_media_of_size_callbacks:
|
for callback in self._is_user_allowed_to_upload_media_of_size_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res: bool = await delay_cancellation(callback(user_id, size))
|
res: bool = await delay_cancellation(callback(user_id, size))
|
||||||
if not res:
|
if not res:
|
||||||
return res
|
return res
|
||||||
|
@ -43,6 +43,7 @@ GET_RATELIMIT_OVERRIDE_FOR_USER_CALLBACK = Callable[
|
|||||||
|
|
||||||
class RatelimitModuleApiCallbacks:
|
class RatelimitModuleApiCallbacks:
|
||||||
def __init__(self, hs: "HomeServer") -> None:
|
def __init__(self, hs: "HomeServer") -> None:
|
||||||
|
self.server_name = hs.hostname
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self._get_ratelimit_override_for_user_callbacks: List[
|
self._get_ratelimit_override_for_user_callbacks: List[
|
||||||
GET_RATELIMIT_OVERRIDE_FOR_USER_CALLBACK
|
GET_RATELIMIT_OVERRIDE_FOR_USER_CALLBACK
|
||||||
@ -64,7 +65,11 @@ class RatelimitModuleApiCallbacks:
|
|||||||
self, user_id: str, limiter_name: str
|
self, user_id: str, limiter_name: str
|
||||||
) -> Optional[RatelimitOverride]:
|
) -> Optional[RatelimitOverride]:
|
||||||
for callback in self._get_ratelimit_override_for_user_callbacks:
|
for callback in self._get_ratelimit_override_for_user_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res: Optional[RatelimitOverride] = await delay_cancellation(
|
res: Optional[RatelimitOverride] = await delay_cancellation(
|
||||||
callback(user_id, limiter_name)
|
callback(user_id, limiter_name)
|
||||||
)
|
)
|
||||||
|
@ -356,6 +356,7 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
NOT_SPAM: Literal["NOT_SPAM"] = "NOT_SPAM"
|
NOT_SPAM: Literal["NOT_SPAM"] = "NOT_SPAM"
|
||||||
|
|
||||||
def __init__(self, hs: "synapse.server.HomeServer") -> None:
|
def __init__(self, hs: "synapse.server.HomeServer") -> None:
|
||||||
|
self.server_name = hs.hostname
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
|
||||||
self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = []
|
self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = []
|
||||||
@ -490,7 +491,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
generally discouraged as it doesn't support internationalization.
|
generally discouraged as it doesn't support internationalization.
|
||||||
"""
|
"""
|
||||||
for callback in self._check_event_for_spam_callbacks:
|
for callback in self._check_event_for_spam_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res = await delay_cancellation(callback(event))
|
res = await delay_cancellation(callback(event))
|
||||||
if res is False or res == self.NOT_SPAM:
|
if res is False or res == self.NOT_SPAM:
|
||||||
# This spam-checker accepts the event.
|
# This spam-checker accepts the event.
|
||||||
@ -543,7 +548,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
True if the event should be silently dropped
|
True if the event should be silently dropped
|
||||||
"""
|
"""
|
||||||
for callback in self._should_drop_federated_event_callbacks:
|
for callback in self._should_drop_federated_event_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res: Union[bool, str] = await delay_cancellation(callback(event))
|
res: Union[bool, str] = await delay_cancellation(callback(event))
|
||||||
if res:
|
if res:
|
||||||
return res
|
return res
|
||||||
@ -565,7 +574,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
NOT_SPAM if the operation is permitted, [Codes, Dict] otherwise.
|
NOT_SPAM if the operation is permitted, [Codes, Dict] otherwise.
|
||||||
"""
|
"""
|
||||||
for callback in self._user_may_join_room_callbacks:
|
for callback in self._user_may_join_room_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res = await delay_cancellation(callback(user_id, room_id, is_invited))
|
res = await delay_cancellation(callback(user_id, room_id, is_invited))
|
||||||
# Normalize return values to `Codes` or `"NOT_SPAM"`.
|
# Normalize return values to `Codes` or `"NOT_SPAM"`.
|
||||||
if res is True or res is self.NOT_SPAM:
|
if res is True or res is self.NOT_SPAM:
|
||||||
@ -604,7 +617,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
NOT_SPAM if the operation is permitted, Codes otherwise.
|
NOT_SPAM if the operation is permitted, Codes otherwise.
|
||||||
"""
|
"""
|
||||||
for callback in self._user_may_invite_callbacks:
|
for callback in self._user_may_invite_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res = await delay_cancellation(
|
res = await delay_cancellation(
|
||||||
callback(inviter_userid, invitee_userid, room_id)
|
callback(inviter_userid, invitee_userid, room_id)
|
||||||
)
|
)
|
||||||
@ -643,7 +660,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
NOT_SPAM if the operation is permitted, Codes otherwise.
|
NOT_SPAM if the operation is permitted, Codes otherwise.
|
||||||
"""
|
"""
|
||||||
for callback in self._federated_user_may_invite_callbacks:
|
for callback in self._federated_user_may_invite_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res = await delay_cancellation(callback(event))
|
res = await delay_cancellation(callback(event))
|
||||||
# Normalize return values to `Codes` or `"NOT_SPAM"`.
|
# Normalize return values to `Codes` or `"NOT_SPAM"`.
|
||||||
if res is True or res is self.NOT_SPAM:
|
if res is True or res is self.NOT_SPAM:
|
||||||
@ -686,7 +707,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
NOT_SPAM if the operation is permitted, Codes otherwise.
|
NOT_SPAM if the operation is permitted, Codes otherwise.
|
||||||
"""
|
"""
|
||||||
for callback in self._user_may_send_3pid_invite_callbacks:
|
for callback in self._user_may_send_3pid_invite_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res = await delay_cancellation(
|
res = await delay_cancellation(
|
||||||
callback(inviter_userid, medium, address, room_id)
|
callback(inviter_userid, medium, address, room_id)
|
||||||
)
|
)
|
||||||
@ -722,7 +747,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
room_config: The room creation configuration which is the body of the /createRoom request
|
room_config: The room creation configuration which is the body of the /createRoom request
|
||||||
"""
|
"""
|
||||||
for callback in self._user_may_create_room_callbacks:
|
for callback in self._user_may_create_room_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
checker_args = inspect.signature(callback)
|
checker_args = inspect.signature(callback)
|
||||||
# Also ensure backwards compatibility with spam checker callbacks
|
# Also ensure backwards compatibility with spam checker callbacks
|
||||||
# that don't expect the room_config argument.
|
# that don't expect the room_config argument.
|
||||||
@ -786,7 +815,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
content: The content of the state event
|
content: The content of the state event
|
||||||
"""
|
"""
|
||||||
for callback in self._user_may_send_state_event_callbacks:
|
for callback in self._user_may_send_state_event_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
# We make a copy of the content to ensure that the spam checker cannot modify it.
|
# We make a copy of the content to ensure that the spam checker cannot modify it.
|
||||||
res = await delay_cancellation(
|
res = await delay_cancellation(
|
||||||
callback(user_id, room_id, event_type, state_key, deepcopy(content))
|
callback(user_id, room_id, event_type, state_key, deepcopy(content))
|
||||||
@ -814,7 +847,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
for callback in self._user_may_create_room_alias_callbacks:
|
for callback in self._user_may_create_room_alias_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res = await delay_cancellation(callback(userid, room_alias))
|
res = await delay_cancellation(callback(userid, room_alias))
|
||||||
if res is True or res is self.NOT_SPAM:
|
if res is True or res is self.NOT_SPAM:
|
||||||
continue
|
continue
|
||||||
@ -847,7 +884,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
room_id: The ID of the room that would be published
|
room_id: The ID of the room that would be published
|
||||||
"""
|
"""
|
||||||
for callback in self._user_may_publish_room_callbacks:
|
for callback in self._user_may_publish_room_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res = await delay_cancellation(callback(userid, room_id))
|
res = await delay_cancellation(callback(userid, room_id))
|
||||||
if res is True or res is self.NOT_SPAM:
|
if res is True or res is self.NOT_SPAM:
|
||||||
continue
|
continue
|
||||||
@ -889,7 +930,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
True if the user is spammy.
|
True if the user is spammy.
|
||||||
"""
|
"""
|
||||||
for callback in self._check_username_for_spam_callbacks:
|
for callback in self._check_username_for_spam_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
checker_args = inspect.signature(callback)
|
checker_args = inspect.signature(callback)
|
||||||
# Make a copy of the user profile object to ensure the spam checker cannot
|
# Make a copy of the user profile object to ensure the spam checker cannot
|
||||||
# modify it.
|
# modify it.
|
||||||
@ -938,7 +983,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
for callback in self._check_registration_for_spam_callbacks:
|
for callback in self._check_registration_for_spam_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
behaviour = await delay_cancellation(
|
behaviour = await delay_cancellation(
|
||||||
callback(email_threepid, username, request_info, auth_provider_id)
|
callback(email_threepid, username, request_info, auth_provider_id)
|
||||||
)
|
)
|
||||||
@ -980,7 +1029,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
for callback in self._check_media_file_for_spam_callbacks:
|
for callback in self._check_media_file_for_spam_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res = await delay_cancellation(callback(file_wrapper, file_info))
|
res = await delay_cancellation(callback(file_wrapper, file_info))
|
||||||
# Normalize return values to `Codes` or `"NOT_SPAM"`.
|
# Normalize return values to `Codes` or `"NOT_SPAM"`.
|
||||||
if res is False or res is self.NOT_SPAM:
|
if res is False or res is self.NOT_SPAM:
|
||||||
@ -1027,7 +1080,11 @@ class SpamCheckerModuleApiCallbacks:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
for callback in self._check_login_for_spam_callbacks:
|
for callback in self._check_login_for_spam_callbacks:
|
||||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name=f"{callback.__module__}.{callback.__qualname__}",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res = await delay_cancellation(
|
res = await delay_cancellation(
|
||||||
callback(
|
callback(
|
||||||
user_id,
|
user_id,
|
||||||
|
@ -129,7 +129,8 @@ class BulkPushRuleEvaluator:
|
|||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.store = hs.get_datastores().main
|
self.store = hs.get_datastores().main
|
||||||
self.clock = hs.get_clock()
|
self.server_name = hs.hostname # nb must be called this for @measure_func
|
||||||
|
self.clock = hs.get_clock() # nb must be called this for @measure_func
|
||||||
self._event_auth_handler = hs.get_event_auth_handler()
|
self._event_auth_handler = hs.get_event_auth_handler()
|
||||||
self.should_calculate_push_rules = self.hs.config.push.enable_push
|
self.should_calculate_push_rules = self.hs.config.push.enable_push
|
||||||
|
|
||||||
|
@ -76,6 +76,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
|||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
super().__init__(hs)
|
super().__init__(hs)
|
||||||
|
|
||||||
|
self.server_name = hs.hostname
|
||||||
self.store = hs.get_datastores().main
|
self.store = hs.get_datastores().main
|
||||||
self._storage_controllers = hs.get_storage_controllers()
|
self._storage_controllers = hs.get_storage_controllers()
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
@ -122,7 +123,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
|||||||
async def _handle_request( # type: ignore[override]
|
async def _handle_request( # type: ignore[override]
|
||||||
self, request: Request, content: JsonDict
|
self, request: Request, content: JsonDict
|
||||||
) -> Tuple[int, JsonDict]:
|
) -> Tuple[int, JsonDict]:
|
||||||
with Measure(self.clock, "repl_fed_send_events_parse"):
|
with Measure(
|
||||||
|
self.clock, name="repl_fed_send_events_parse", server_name=self.server_name
|
||||||
|
):
|
||||||
room_id = content["room_id"]
|
room_id = content["room_id"]
|
||||||
backfilled = content["backfilled"]
|
backfilled = content["backfilled"]
|
||||||
|
|
||||||
|
@ -76,6 +76,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
|
|||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
super().__init__(hs)
|
super().__init__(hs)
|
||||||
|
|
||||||
|
self.server_name = hs.hostname
|
||||||
self.event_creation_handler = hs.get_event_creation_handler()
|
self.event_creation_handler = hs.get_event_creation_handler()
|
||||||
self.store = hs.get_datastores().main
|
self.store = hs.get_datastores().main
|
||||||
self._storage_controllers = hs.get_storage_controllers()
|
self._storage_controllers = hs.get_storage_controllers()
|
||||||
@ -121,7 +122,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
|
|||||||
async def _handle_request( # type: ignore[override]
|
async def _handle_request( # type: ignore[override]
|
||||||
self, request: Request, content: JsonDict, event_id: str
|
self, request: Request, content: JsonDict, event_id: str
|
||||||
) -> Tuple[int, JsonDict]:
|
) -> Tuple[int, JsonDict]:
|
||||||
with Measure(self.clock, "repl_send_event_parse"):
|
with Measure(
|
||||||
|
self.clock, name="repl_send_event_parse", server_name=self.server_name
|
||||||
|
):
|
||||||
event_dict = content["event"]
|
event_dict = content["event"]
|
||||||
room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]]
|
room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]]
|
||||||
internal_metadata = content["internal_metadata"]
|
internal_metadata = content["internal_metadata"]
|
||||||
|
@ -77,6 +77,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
|
|||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
super().__init__(hs)
|
super().__init__(hs)
|
||||||
|
|
||||||
|
self.server_name = hs.hostname
|
||||||
self.event_creation_handler = hs.get_event_creation_handler()
|
self.event_creation_handler = hs.get_event_creation_handler()
|
||||||
self.store = hs.get_datastores().main
|
self.store = hs.get_datastores().main
|
||||||
self._storage_controllers = hs.get_storage_controllers()
|
self._storage_controllers = hs.get_storage_controllers()
|
||||||
@ -122,7 +123,9 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
|
|||||||
async def _handle_request( # type: ignore[override]
|
async def _handle_request( # type: ignore[override]
|
||||||
self, request: Request, payload: JsonDict
|
self, request: Request, payload: JsonDict
|
||||||
) -> Tuple[int, JsonDict]:
|
) -> Tuple[int, JsonDict]:
|
||||||
with Measure(self.clock, "repl_send_events_parse"):
|
with Measure(
|
||||||
|
self.clock, name="repl_send_events_parse", server_name=self.server_name
|
||||||
|
):
|
||||||
events_and_context = []
|
events_and_context = []
|
||||||
events = payload["events"]
|
events = payload["events"]
|
||||||
rooms = set()
|
rooms = set()
|
||||||
|
@ -75,6 +75,7 @@ class ReplicationDataHandler:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
self.server_name = hs.hostname
|
||||||
self.store = hs.get_datastores().main
|
self.store = hs.get_datastores().main
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
self._reactor = hs.get_reactor()
|
self._reactor = hs.get_reactor()
|
||||||
@ -342,7 +343,11 @@ class ReplicationDataHandler:
|
|||||||
waiting_list.add((position, deferred))
|
waiting_list.add((position, deferred))
|
||||||
|
|
||||||
# We measure here to get in flight counts and average waiting time.
|
# We measure here to get in flight counts and average waiting time.
|
||||||
with Measure(self._clock, "repl.wait_for_stream_position"):
|
with Measure(
|
||||||
|
self._clock,
|
||||||
|
name="repl.wait_for_stream_position",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
logger.info(
|
logger.info(
|
||||||
"Waiting for repl stream %r to reach %s (%s); currently at: %s",
|
"Waiting for repl stream %r to reach %s (%s); currently at: %s",
|
||||||
stream_name,
|
stream_name,
|
||||||
|
@ -78,6 +78,7 @@ class ReplicationStreamer:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
self.server_name = hs.hostname
|
||||||
self.store = hs.get_datastores().main
|
self.store = hs.get_datastores().main
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
@ -155,7 +156,11 @@ class ReplicationStreamer:
|
|||||||
while self.pending_updates:
|
while self.pending_updates:
|
||||||
self.pending_updates = False
|
self.pending_updates = False
|
||||||
|
|
||||||
with Measure(self.clock, "repl.stream.get_updates"):
|
with Measure(
|
||||||
|
self.clock,
|
||||||
|
name="repl.stream.get_updates",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
all_streams = self.streams
|
all_streams = self.streams
|
||||||
|
|
||||||
if self._replication_torture_level is not None:
|
if self._replication_torture_level is not None:
|
||||||
|
@ -189,7 +189,8 @@ class StateHandler:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
self.clock = hs.get_clock()
|
self.server_name = hs.hostname # nb must be called this for @measure_func
|
||||||
|
self.clock = hs.get_clock() # nb must be called this for @measure_func
|
||||||
self.store = hs.get_datastores().main
|
self.store = hs.get_datastores().main
|
||||||
self._state_storage_controller = hs.get_storage_controllers().state
|
self._state_storage_controller = hs.get_storage_controllers().state
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
@ -631,6 +632,7 @@ class StateResolutionHandler:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
self.server_name = hs.hostname
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
|
||||||
self.resolve_linearizer = Linearizer(name="state_resolve_lock")
|
self.resolve_linearizer = Linearizer(name="state_resolve_lock")
|
||||||
@ -747,7 +749,9 @@ class StateResolutionHandler:
|
|||||||
# which will be used as a cache key for future resolutions, but
|
# which will be used as a cache key for future resolutions, but
|
||||||
# not get persisted.
|
# not get persisted.
|
||||||
|
|
||||||
with Measure(self.clock, "state.create_group_ids"):
|
with Measure(
|
||||||
|
self.clock, name="state.create_group_ids", server_name=self.server_name
|
||||||
|
):
|
||||||
cache = _make_state_cache_entry(new_state, state_groups_ids)
|
cache = _make_state_cache_entry(new_state, state_groups_ids)
|
||||||
|
|
||||||
self._state_cache[group_names] = cache
|
self._state_cache[group_names] = cache
|
||||||
@ -785,7 +789,9 @@ class StateResolutionHandler:
|
|||||||
a map from (type, state_key) to event_id.
|
a map from (type, state_key) to event_id.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
with Measure(self.clock, "state._resolve_events") as m:
|
with Measure(
|
||||||
|
self.clock, name="state._resolve_events", server_name=self.server_name
|
||||||
|
) as m:
|
||||||
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
|
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
|
||||||
if room_version_obj.state_res == StateResolutionVersions.V1:
|
if room_version_obj.state_res == StateResolutionVersions.V1:
|
||||||
return await v1.resolve_events_with_store(
|
return await v1.resolve_events_with_store(
|
||||||
|
@ -55,6 +55,7 @@ class SQLBaseStore(metaclass=ABCMeta):
|
|||||||
hs: "HomeServer",
|
hs: "HomeServer",
|
||||||
):
|
):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
|
self.server_name = hs.hostname
|
||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
self.database_engine = database.engine
|
self.database_engine = database.engine
|
||||||
self.db_pool = database
|
self.db_pool = database
|
||||||
|
@ -337,6 +337,7 @@ class EventsPersistenceStorageController:
|
|||||||
assert stores.persist_events
|
assert stores.persist_events
|
||||||
self.persist_events_store = stores.persist_events
|
self.persist_events_store = stores.persist_events
|
||||||
|
|
||||||
|
self.server_name = hs.hostname
|
||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
self._instance_name = hs.get_instance_name()
|
self._instance_name = hs.get_instance_name()
|
||||||
self.is_mine_id = hs.is_mine_id
|
self.is_mine_id = hs.is_mine_id
|
||||||
@ -616,7 +617,11 @@ class EventsPersistenceStorageController:
|
|||||||
state_delta_for_room = None
|
state_delta_for_room = None
|
||||||
|
|
||||||
if not backfilled:
|
if not backfilled:
|
||||||
with Measure(self._clock, "_calculate_state_and_extrem"):
|
with Measure(
|
||||||
|
self._clock,
|
||||||
|
name="_calculate_state_and_extrem",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
# Work out the new "current state" for the room.
|
# Work out the new "current state" for the room.
|
||||||
# We do this by working out what the new extremities are and then
|
# We do this by working out what the new extremities are and then
|
||||||
# calculating the state from that.
|
# calculating the state from that.
|
||||||
@ -627,7 +632,11 @@ class EventsPersistenceStorageController:
|
|||||||
room_id, chunk
|
room_id, chunk
|
||||||
)
|
)
|
||||||
|
|
||||||
with Measure(self._clock, "calculate_chain_cover_index_for_events"):
|
with Measure(
|
||||||
|
self._clock,
|
||||||
|
name="calculate_chain_cover_index_for_events",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
# We now calculate chain ID/sequence numbers for any state events we're
|
# We now calculate chain ID/sequence numbers for any state events we're
|
||||||
# persisting. We ignore out of band memberships as we're not in the room
|
# persisting. We ignore out of band memberships as we're not in the room
|
||||||
# and won't have their auth chain (we'll fix it up later if we join the
|
# and won't have their auth chain (we'll fix it up later if we join the
|
||||||
@ -719,7 +728,11 @@ class EventsPersistenceStorageController:
|
|||||||
break
|
break
|
||||||
|
|
||||||
logger.debug("Calculating state delta for room %s", room_id)
|
logger.debug("Calculating state delta for room %s", room_id)
|
||||||
with Measure(self._clock, "persist_events.get_new_state_after_events"):
|
with Measure(
|
||||||
|
self._clock,
|
||||||
|
name="persist_events.get_new_state_after_events",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
res = await self._get_new_state_after_events(
|
res = await self._get_new_state_after_events(
|
||||||
room_id,
|
room_id,
|
||||||
ev_ctx_rm,
|
ev_ctx_rm,
|
||||||
@ -746,7 +759,11 @@ class EventsPersistenceStorageController:
|
|||||||
# removed keys entirely.
|
# removed keys entirely.
|
||||||
delta = DeltaState([], delta_ids)
|
delta = DeltaState([], delta_ids)
|
||||||
elif current_state is not None:
|
elif current_state is not None:
|
||||||
with Measure(self._clock, "persist_events.calculate_state_delta"):
|
with Measure(
|
||||||
|
self._clock,
|
||||||
|
name="persist_events.calculate_state_delta",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
delta = await self._calculate_state_delta(room_id, current_state)
|
delta = await self._calculate_state_delta(room_id, current_state)
|
||||||
|
|
||||||
if delta:
|
if delta:
|
||||||
|
@ -68,6 +68,7 @@ class StateStorageController:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, hs: "HomeServer", stores: "Databases"):
|
def __init__(self, hs: "HomeServer", stores: "Databases"):
|
||||||
|
self.server_name = hs.hostname
|
||||||
self._is_mine_id = hs.is_mine_id
|
self._is_mine_id = hs.is_mine_id
|
||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
self.stores = stores
|
self.stores = stores
|
||||||
@ -812,7 +813,9 @@ class StateStorageController:
|
|||||||
state_group = object()
|
state_group = object()
|
||||||
|
|
||||||
assert state_group is not None
|
assert state_group is not None
|
||||||
with Measure(self._clock, "get_joined_hosts"):
|
with Measure(
|
||||||
|
self._clock, name="get_joined_hosts", server_name=self.server_name
|
||||||
|
):
|
||||||
return await self._get_joined_hosts(
|
return await self._get_joined_hosts(
|
||||||
room_id, state_group, state_entry=state_entry
|
room_id, state_group, state_entry=state_entry
|
||||||
)
|
)
|
||||||
|
@ -1246,7 +1246,9 @@ class EventsWorkerStore(SQLBaseStore):
|
|||||||
to event row. Note that it may well contain additional events that
|
to event row. Note that it may well contain additional events that
|
||||||
were not part of this request.
|
were not part of this request.
|
||||||
"""
|
"""
|
||||||
with Measure(self._clock, "_fetch_event_list"):
|
with Measure(
|
||||||
|
self._clock, name="_fetch_event_list", server_name=self.server_name
|
||||||
|
):
|
||||||
try:
|
try:
|
||||||
events_to_fetch = {
|
events_to_fetch = {
|
||||||
event_id for events, _ in event_list for event_id in events
|
event_id for events, _ in event_list for event_id in events
|
||||||
|
@ -983,7 +983,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
|||||||
`_get_user_ids_from_membership_event_ids` for any uncached events.
|
`_get_user_ids_from_membership_event_ids` for any uncached events.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
with Measure(self._clock, "get_joined_user_ids_from_state"):
|
with Measure(
|
||||||
|
self._clock,
|
||||||
|
name="get_joined_user_ids_from_state",
|
||||||
|
server_name=self.server_name,
|
||||||
|
):
|
||||||
users_in_room = set()
|
users_in_room = set()
|
||||||
member_event_ids = [
|
member_event_ids = [
|
||||||
e_id for key, e_id in state.items() if key[0] == EventTypes.Member
|
e_id for key, e_id in state.items() if key[0] == EventTypes.Member
|
||||||
|
@ -41,49 +41,83 @@ from synapse.logging.context import (
|
|||||||
LoggingContext,
|
LoggingContext,
|
||||||
current_context,
|
current_context,
|
||||||
)
|
)
|
||||||
from synapse.metrics import InFlightGauge
|
from synapse.metrics import INSTANCE_LABEL_NAME, InFlightGauge
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
block_counter = Counter("synapse_util_metrics_block_count", "", ["block_name"])
|
# Metrics to see the number of and how much time is spend in various blocks of code.
|
||||||
|
#
|
||||||
|
block_counter = Counter(
|
||||||
|
"synapse_util_metrics_block_count",
|
||||||
|
"",
|
||||||
|
labelnames=["block_name", INSTANCE_LABEL_NAME],
|
||||||
|
)
|
||||||
|
"""The number of times this block has been called."""
|
||||||
|
|
||||||
block_timer = Counter("synapse_util_metrics_block_time_seconds", "", ["block_name"])
|
block_timer = Counter(
|
||||||
|
"synapse_util_metrics_block_time_seconds",
|
||||||
|
"",
|
||||||
|
labelnames=["block_name", INSTANCE_LABEL_NAME],
|
||||||
|
)
|
||||||
|
"""The cumulative time spent executing this block across all calls, in seconds."""
|
||||||
|
|
||||||
block_ru_utime = Counter(
|
block_ru_utime = Counter(
|
||||||
"synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"]
|
"synapse_util_metrics_block_ru_utime_seconds",
|
||||||
|
"",
|
||||||
|
labelnames=["block_name", INSTANCE_LABEL_NAME],
|
||||||
)
|
)
|
||||||
|
"""Resource usage: user CPU time in seconds used in this block"""
|
||||||
|
|
||||||
block_ru_stime = Counter(
|
block_ru_stime = Counter(
|
||||||
"synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"]
|
"synapse_util_metrics_block_ru_stime_seconds",
|
||||||
|
"",
|
||||||
|
labelnames=["block_name", INSTANCE_LABEL_NAME],
|
||||||
)
|
)
|
||||||
|
"""Resource usage: system CPU time in seconds used in this block"""
|
||||||
|
|
||||||
block_db_txn_count = Counter(
|
block_db_txn_count = Counter(
|
||||||
"synapse_util_metrics_block_db_txn_count", "", ["block_name"]
|
"synapse_util_metrics_block_db_txn_count",
|
||||||
|
"",
|
||||||
|
labelnames=["block_name", INSTANCE_LABEL_NAME],
|
||||||
)
|
)
|
||||||
|
"""Number of database transactions completed in this block"""
|
||||||
|
|
||||||
# seconds spent waiting for db txns, excluding scheduling time, in this block
|
# seconds spent waiting for db txns, excluding scheduling time, in this block
|
||||||
block_db_txn_duration = Counter(
|
block_db_txn_duration = Counter(
|
||||||
"synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"]
|
"synapse_util_metrics_block_db_txn_duration_seconds",
|
||||||
|
"",
|
||||||
|
labelnames=["block_name", INSTANCE_LABEL_NAME],
|
||||||
)
|
)
|
||||||
|
"""Seconds spent waiting for database txns, excluding scheduling time, in this block"""
|
||||||
|
|
||||||
# seconds spent waiting for a db connection, in this block
|
# seconds spent waiting for a db connection, in this block
|
||||||
block_db_sched_duration = Counter(
|
block_db_sched_duration = Counter(
|
||||||
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]
|
"synapse_util_metrics_block_db_sched_duration_seconds",
|
||||||
|
"",
|
||||||
|
labelnames=["block_name", INSTANCE_LABEL_NAME],
|
||||||
)
|
)
|
||||||
|
"""Seconds spent waiting for a db connection, in this block"""
|
||||||
|
|
||||||
|
|
||||||
# This is dynamically created in InFlightGauge.__init__.
|
# This is dynamically created in InFlightGauge.__init__.
|
||||||
class _InFlightMetric(Protocol):
|
class _BlockInFlightMetric(Protocol):
|
||||||
|
"""
|
||||||
|
Sub-metrics used for the `InFlightGauge` for blocks.
|
||||||
|
"""
|
||||||
|
|
||||||
real_time_max: float
|
real_time_max: float
|
||||||
|
"""The longest observed duration of any single execution of this block, in seconds."""
|
||||||
real_time_sum: float
|
real_time_sum: float
|
||||||
|
"""The cumulative time spent executing this block across all calls, in seconds."""
|
||||||
|
|
||||||
|
|
||||||
# Tracks the number of blocks currently active
|
# Tracks the number of blocks currently active
|
||||||
in_flight: InFlightGauge[_InFlightMetric] = InFlightGauge(
|
in_flight: InFlightGauge[_BlockInFlightMetric] = InFlightGauge(
|
||||||
"synapse_util_metrics_block_in_flight",
|
"synapse_util_metrics_block_in_flight",
|
||||||
"",
|
"",
|
||||||
labels=["block_name"],
|
labels=["block_name", INSTANCE_LABEL_NAME],
|
||||||
|
# Matches the fields in the `_BlockInFlightMetric`
|
||||||
sub_metrics=["real_time_max", "real_time_sum"],
|
sub_metrics=["real_time_max", "real_time_sum"],
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -92,8 +126,16 @@ P = ParamSpec("P")
|
|||||||
R = TypeVar("R")
|
R = TypeVar("R")
|
||||||
|
|
||||||
|
|
||||||
class HasClock(Protocol):
|
class HasClockAndServerName(Protocol):
|
||||||
clock: Clock
|
clock: Clock
|
||||||
|
"""
|
||||||
|
Used to measure functions
|
||||||
|
"""
|
||||||
|
server_name: str
|
||||||
|
"""
|
||||||
|
The homeserver name that this Measure is associated with (used to label the metric)
|
||||||
|
(`hs.hostname`).
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
def measure_func(
|
def measure_func(
|
||||||
@ -101,8 +143,9 @@ def measure_func(
|
|||||||
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
|
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
|
||||||
"""Decorate an async method with a `Measure` context manager.
|
"""Decorate an async method with a `Measure` context manager.
|
||||||
|
|
||||||
The Measure is created using `self.clock`; it should only be used to decorate
|
The Measure is created using `self.clock` and `self.server_name; it should only be
|
||||||
methods in classes defining an instance-level `clock` attribute.
|
used to decorate methods in classes defining an instance-level `clock` and
|
||||||
|
`server_name` attributes.
|
||||||
|
|
||||||
Usage:
|
Usage:
|
||||||
|
|
||||||
@ -116,16 +159,21 @@ def measure_func(
|
|||||||
with Measure(...):
|
with Measure(...):
|
||||||
...
|
...
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: The name of the metric to report (the block name) (used to label the
|
||||||
|
metric). Defaults to the name of the decorated function.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def wrapper(
|
def wrapper(
|
||||||
func: Callable[Concatenate[HasClock, P], Awaitable[R]],
|
func: Callable[Concatenate[HasClockAndServerName, P], Awaitable[R]],
|
||||||
) -> Callable[P, Awaitable[R]]:
|
) -> Callable[P, Awaitable[R]]:
|
||||||
block_name = func.__name__ if name is None else name
|
block_name = func.__name__ if name is None else name
|
||||||
|
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
async def measured_func(self: HasClock, *args: P.args, **kwargs: P.kwargs) -> R:
|
async def measured_func(
|
||||||
with Measure(self.clock, block_name):
|
self: HasClockAndServerName, *args: P.args, **kwargs: P.kwargs
|
||||||
|
) -> R:
|
||||||
|
with Measure(self.clock, name=block_name, server_name=self.server_name):
|
||||||
r = await func(self, *args, **kwargs)
|
r = await func(self, *args, **kwargs)
|
||||||
return r
|
return r
|
||||||
|
|
||||||
@ -142,19 +190,24 @@ class Measure:
|
|||||||
__slots__ = [
|
__slots__ = [
|
||||||
"clock",
|
"clock",
|
||||||
"name",
|
"name",
|
||||||
|
"server_name",
|
||||||
"_logging_context",
|
"_logging_context",
|
||||||
"start",
|
"start",
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, clock: Clock, name: str) -> None:
|
def __init__(self, clock: Clock, *, name: str, server_name: str) -> None:
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
clock: An object with a "time()" method, which returns the current
|
clock: An object with a "time()" method, which returns the current
|
||||||
time in seconds.
|
time in seconds.
|
||||||
name: The name of the metric to report.
|
name: The name of the metric to report (the block name) (used to label the
|
||||||
|
metric).
|
||||||
|
server_name: The homeserver name that this Measure is associated with (used to
|
||||||
|
label the metric) (`hs.hostname`).
|
||||||
"""
|
"""
|
||||||
self.clock = clock
|
self.clock = clock
|
||||||
self.name = name
|
self.name = name
|
||||||
|
self.server_name = server_name
|
||||||
curr_context = current_context()
|
curr_context = current_context()
|
||||||
if not curr_context:
|
if not curr_context:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
@ -174,7 +227,7 @@ class Measure:
|
|||||||
|
|
||||||
self.start = self.clock.time()
|
self.start = self.clock.time()
|
||||||
self._logging_context.__enter__()
|
self._logging_context.__enter__()
|
||||||
in_flight.register((self.name,), self._update_in_flight)
|
in_flight.register((self.name, self.server_name), self._update_in_flight)
|
||||||
|
|
||||||
logger.debug("Entering block %s", self.name)
|
logger.debug("Entering block %s", self.name)
|
||||||
|
|
||||||
@ -194,19 +247,20 @@ class Measure:
|
|||||||
duration = self.clock.time() - self.start
|
duration = self.clock.time() - self.start
|
||||||
usage = self.get_resource_usage()
|
usage = self.get_resource_usage()
|
||||||
|
|
||||||
in_flight.unregister((self.name,), self._update_in_flight)
|
in_flight.unregister((self.name, self.server_name), self._update_in_flight)
|
||||||
self._logging_context.__exit__(exc_type, exc_val, exc_tb)
|
self._logging_context.__exit__(exc_type, exc_val, exc_tb)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
block_counter.labels(self.name).inc()
|
labels = {"block_name": self.name, INSTANCE_LABEL_NAME: self.server_name}
|
||||||
block_timer.labels(self.name).inc(duration)
|
block_counter.labels(**labels).inc()
|
||||||
block_ru_utime.labels(self.name).inc(usage.ru_utime)
|
block_timer.labels(**labels).inc(duration)
|
||||||
block_ru_stime.labels(self.name).inc(usage.ru_stime)
|
block_ru_utime.labels(**labels).inc(usage.ru_utime)
|
||||||
block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
|
block_ru_stime.labels(**labels).inc(usage.ru_stime)
|
||||||
block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
|
block_db_txn_count.labels(**labels).inc(usage.db_txn_count)
|
||||||
block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
|
block_db_txn_duration.labels(**labels).inc(usage.db_txn_duration_sec)
|
||||||
except ValueError:
|
block_db_sched_duration.labels(**labels).inc(usage.db_sched_duration_sec)
|
||||||
logger.warning("Failed to save metrics! Usage: %s", usage)
|
except ValueError as exc:
|
||||||
|
logger.warning("Failed to save metrics! Usage: %s Error: %s", usage, exc)
|
||||||
|
|
||||||
def get_resource_usage(self) -> ContextResourceUsage:
|
def get_resource_usage(self) -> ContextResourceUsage:
|
||||||
"""Get the resources used within this Measure block
|
"""Get the resources used within this Measure block
|
||||||
@ -215,7 +269,7 @@ class Measure:
|
|||||||
"""
|
"""
|
||||||
return self._logging_context.get_resource_usage()
|
return self._logging_context.get_resource_usage()
|
||||||
|
|
||||||
def _update_in_flight(self, metrics: _InFlightMetric) -> None:
|
def _update_in_flight(self, metrics: _BlockInFlightMetric) -> None:
|
||||||
"""Gets called when processing in flight metrics"""
|
"""Gets called when processing in flight metrics"""
|
||||||
assert self.start is not None
|
assert self.start is not None
|
||||||
duration = self.clock.time() - self.start
|
duration = self.clock.time() - self.start
|
||||||
|
@ -86,6 +86,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
|
|||||||
self.mock_federation_client = AsyncMock(spec=["put_json"])
|
self.mock_federation_client = AsyncMock(spec=["put_json"])
|
||||||
self.mock_federation_client.put_json.return_value = (200, "OK")
|
self.mock_federation_client.put_json.return_value = (200, "OK")
|
||||||
self.mock_federation_client.agent = MatrixFederationAgent(
|
self.mock_federation_client.agent = MatrixFederationAgent(
|
||||||
|
"OUR_STUB_HOMESERVER_NAME",
|
||||||
reactor,
|
reactor,
|
||||||
tls_client_options_factory=None,
|
tls_client_options_factory=None,
|
||||||
user_agent=b"SynapseInTrialTest/0.0.0",
|
user_agent=b"SynapseInTrialTest/0.0.0",
|
||||||
|
@ -91,6 +91,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
|
|||||||
"test_cache", timer=self.reactor.seconds
|
"test_cache", timer=self.reactor.seconds
|
||||||
)
|
)
|
||||||
self.well_known_resolver = WellKnownResolver(
|
self.well_known_resolver = WellKnownResolver(
|
||||||
|
"OUR_STUB_HOMESERVER_NAME",
|
||||||
self.reactor,
|
self.reactor,
|
||||||
Agent(self.reactor, contextFactory=self.tls_factory),
|
Agent(self.reactor, contextFactory=self.tls_factory),
|
||||||
b"test-agent",
|
b"test-agent",
|
||||||
@ -269,6 +270,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
|
|||||||
because it is created too early during setUp
|
because it is created too early during setUp
|
||||||
"""
|
"""
|
||||||
return MatrixFederationAgent(
|
return MatrixFederationAgent(
|
||||||
|
"OUR_STUB_HOMESERVER_NAME",
|
||||||
reactor=cast(ISynapseReactor, self.reactor),
|
reactor=cast(ISynapseReactor, self.reactor),
|
||||||
tls_client_options_factory=self.tls_factory,
|
tls_client_options_factory=self.tls_factory,
|
||||||
user_agent=b"test-agent", # Note that this is unused since _well_known_resolver is provided.
|
user_agent=b"test-agent", # Note that this is unused since _well_known_resolver is provided.
|
||||||
@ -1011,6 +1013,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
|
|||||||
# Build a new agent and WellKnownResolver with a different tls factory
|
# Build a new agent and WellKnownResolver with a different tls factory
|
||||||
tls_factory = FederationPolicyForHTTPS(config)
|
tls_factory = FederationPolicyForHTTPS(config)
|
||||||
agent = MatrixFederationAgent(
|
agent = MatrixFederationAgent(
|
||||||
|
"OUR_STUB_HOMESERVER_NAME",
|
||||||
reactor=self.reactor,
|
reactor=self.reactor,
|
||||||
tls_client_options_factory=tls_factory,
|
tls_client_options_factory=tls_factory,
|
||||||
user_agent=b"test-agent", # This is unused since _well_known_resolver is passed below.
|
user_agent=b"test-agent", # This is unused since _well_known_resolver is passed below.
|
||||||
@ -1018,6 +1021,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
|
|||||||
ip_blocklist=IPSet(),
|
ip_blocklist=IPSet(),
|
||||||
_srv_resolver=self.mock_resolver,
|
_srv_resolver=self.mock_resolver,
|
||||||
_well_known_resolver=WellKnownResolver(
|
_well_known_resolver=WellKnownResolver(
|
||||||
|
"OUR_STUB_HOMESERVER_NAME",
|
||||||
cast(ISynapseReactor, self.reactor),
|
cast(ISynapseReactor, self.reactor),
|
||||||
Agent(self.reactor, contextFactory=tls_factory),
|
Agent(self.reactor, contextFactory=tls_factory),
|
||||||
b"test-agent",
|
b"test-agent",
|
||||||
|
@ -68,6 +68,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
|
|||||||
|
|
||||||
reactor, _ = get_clock()
|
reactor, _ = get_clock()
|
||||||
self.matrix_federation_agent = MatrixFederationAgent(
|
self.matrix_federation_agent = MatrixFederationAgent(
|
||||||
|
"OUR_STUB_HOMESERVER_NAME",
|
||||||
reactor,
|
reactor,
|
||||||
tls_client_options_factory=None,
|
tls_client_options_factory=None,
|
||||||
user_agent=b"SynapseInTrialTest/0.0.0",
|
user_agent=b"SynapseInTrialTest/0.0.0",
|
||||||
|
Loading…
x
Reference in New Issue
Block a user