diff --git a/changelog.d/18725.misc b/changelog.d/18725.misc
new file mode 100644
index 0000000000..7fa5b47b89
--- /dev/null
+++ b/changelog.d/18725.misc
@@ -0,0 +1 @@
+Refactor `Gauge` metrics to be homeserver-scoped.
diff --git a/contrib/grafana/synapse.json b/contrib/grafana/synapse.json
index 62b58a199d..e23afcf2d3 100644
--- a/contrib/grafana/synapse.json
+++ b/contrib/grafana/synapse.json
@@ -4396,7 +4396,7 @@
           "exemplar": false,
           "expr": "(time() - max without (job, index, host) (avg_over_time(synapse_federation_last_received_pdu_time[10m]))) / 60",
           "instant": false,
-          "legendFormat": "{{server_name}} ",
+          "legendFormat": "{{origin_server_name}} ",
           "range": true,
           "refId": "A"
         }
@@ -4518,7 +4518,7 @@
           "exemplar": false,
           "expr": "(time() - max without (job, index, host) (avg_over_time(synapse_federation_last_sent_pdu_time[10m]))) / 60",
           "instant": false,
-          "legendFormat": "{{server_name}}",
+          "legendFormat": "{{destination_server_name}}",
           "range": true,
           "refId": "A"
         }
diff --git a/docs/upgrade.md b/docs/upgrade.md
index e79ca93c04..9decc43727 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -117,6 +117,25 @@ each upgrade are complete before moving on to the next upgrade, to avoid
 stacking them up. You can monitor the currently running background updates with
 [the Admin API](usage/administration/admin_api/background_updates.html#status).
 
+# Upgrading to v1.136.0
+
+## Metric labels have changed on `synapse_federation_last_received_pdu_time` and `synapse_federation_last_sent_pdu_time`
+
+Previously, the `synapse_federation_last_received_pdu_time` and
+`synapse_federation_last_sent_pdu_time` metrics both used the `server_name` label to
+differentiate between the remote servers that we send events to and receive events from.
+
+Since we're now using the `server_name` label to differentiate between different Synapse
+homeserver instances running in the same process, these metrics have been changed as follows:
+
+ - `synapse_federation_last_received_pdu_time` now uses the `origin_server_name` label
+ - `synapse_federation_last_sent_pdu_time` now uses the `destination_server_name` label
+
+The Grafana dashboard JSON in `contrib/grafana/synapse.json` has been updated to reflect
+this change, but you will need to manually update any of your own Grafana dashboards
+that use these metrics.
+
+
 # Upgrading to v1.135.0
 
 ## `on_user_registration` module API callback may now run on any worker
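For dashboard authors, the relabelling above amounts to a one-for-one label swap. A sketch of the before/after selectors, using hypothetical domain values (the metric and label names are real, the values are not):

```promql
# Before: the remote domain lived in `server_name`
synapse_federation_last_received_pdu_time{server_name="remote.example.com"}

# After: the remote domain moves to `origin_server_name`, while `server_name`
# now identifies the local homeserver that emitted the sample
synapse_federation_last_received_pdu_time{origin_server_name="remote.example.com",server_name="hs1.example.com"}
```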
diff --git a/scripts-dev/mypy_synapse_plugin.py b/scripts-dev/mypy_synapse_plugin.py
index 2ccac97103..3eab2b3b73 100644
--- a/scripts-dev/mypy_synapse_plugin.py
+++ b/scripts-dev/mypy_synapse_plugin.py
@@ -28,7 +28,7 @@ from typing import Callable, Optional, Tuple, Type, Union
 import mypy.types
 from mypy.erasetype import remove_instance_last_known_values
 from mypy.errorcodes import ErrorCode
-from mypy.nodes import ARG_NAMED_OPT, ListExpr, NameExpr, TempNode, Var
+from mypy.nodes import ARG_NAMED_OPT, ListExpr, NameExpr, TempNode, TupleExpr, Var
 from mypy.plugin import (
     FunctionLike,
     FunctionSigContext,
@@ -61,6 +61,7 @@ class SynapsePlugin(Plugin):
     ) -> Optional[Callable[[FunctionSigContext], FunctionLike]]:
         if fullname in (
             "prometheus_client.metrics.Counter",
+            "prometheus_client.metrics.Gauge",
             # TODO: Add other prometheus_client metrics that need checking as we
             # refactor, see https://github.com/element-hq/synapse/issues/18592
         ):
@@ -98,8 +99,8 @@ def check_prometheus_metric_instantiation(ctx: FunctionSigContext) -> CallableTy
     ensures metrics are correctly separated by homeserver.
 
     There are also some metrics that apply at the process level, such as CPU usage,
-    Python garbage collection, Twisted reactor tick time which shouldn't have the
-    `SERVER_NAME_LABEL`. In those cases, use use a type ignore comment to disable the
+    Python garbage collection, and Twisted reactor tick time, which shouldn't have the
+    `SERVER_NAME_LABEL`. In those cases, use a type ignore comment to disable the
     check, e.g. `# type: ignore[missing-server-name-label]`.
     """
     # The true signature, this isn't being modified so this is what will be returned.
@@ -136,7 +137,7 @@ def check_prometheus_metric_instantiation(ctx: FunctionSigContext) -> CallableTy
     #     ]
     # ```
     labelnames_arg_expression = ctx.args[2][0] if len(ctx.args[2]) > 0 else None
-    if isinstance(labelnames_arg_expression, ListExpr):
+    if isinstance(labelnames_arg_expression, (ListExpr, TupleExpr)):
         # Check if the `labelnames` argument includes the `server_name` label (`SERVER_NAME_LABEL`).
         for labelname_expression in labelnames_arg_expression.items:
             if (
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 6b62d37dca..48989540bb 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -525,8 +525,12 @@ async def start(hs: "HomeServer") -> None:
     )
 
     # Register the threadpools with our metrics.
-    register_threadpool("default", reactor.getThreadPool())
-    register_threadpool("gai_resolver", resolver_threadpool)
+    register_threadpool(
+        name="default", server_name=server_name, threadpool=reactor.getThreadPool()
+    )
+    register_threadpool(
+        name="gai_resolver", server_name=server_name, threadpool=resolver_threadpool
+    )
 
     # Set up the SIGHUP machinery.
     if hasattr(signal, "SIGHUP"):
diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py
index 57b6db6ea5..69d3ac78fd 100644
--- a/synapse/app/phone_stats_home.py
+++ b/synapse/app/phone_stats_home.py
@@ -28,6 +28,7 @@ from prometheus_client import Gauge
 
 from twisted.internet import defer
 
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
 )
@@ -57,16 +58,25 @@ Phone home stats are sent every 3 hours
 _stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
 
 # Gauges to expose monthly active user control metrics
-current_mau_gauge = Gauge("synapse_admin_mau_current", "Current MAU")
+current_mau_gauge = Gauge(
+    "synapse_admin_mau_current",
+    "Current MAU",
+    labelnames=[SERVER_NAME_LABEL],
+)
 current_mau_by_service_gauge = Gauge(
     "synapse_admin_mau_current_mau_by_service",
     "Current MAU by service",
-    ["app_service"],
+    labelnames=["app_service", SERVER_NAME_LABEL],
+)
+max_mau_gauge = Gauge(
+    "synapse_admin_mau_max",
+    "MAU Limit",
+    labelnames=[SERVER_NAME_LABEL],
 )
-max_mau_gauge = Gauge("synapse_admin_mau_max", "MAU Limit")
 registered_reserved_users_mau_gauge = Gauge(
     "synapse_admin_mau_registered_reserved_users",
     "Registered users with reserved threepids",
+    labelnames=[SERVER_NAME_LABEL],
 )
@@ -237,13 +247,21 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
                 await store.get_monthly_active_count_by_service()
             )
             reserved_users = await store.get_registered_reserved_users()
-            current_mau_gauge.set(float(current_mau_count))
+            current_mau_gauge.labels(**{SERVER_NAME_LABEL: server_name}).set(
+                float(current_mau_count)
+            )
             for app_service, count in current_mau_count_by_service.items():
-                current_mau_by_service_gauge.labels(app_service).set(float(count))
+                current_mau_by_service_gauge.labels(
+                    app_service=app_service, **{SERVER_NAME_LABEL: server_name}
+                ).set(float(count))
 
-            registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
-            max_mau_gauge.set(float(hs.config.server.max_mau_value))
+            registered_reserved_users_mau_gauge.labels(
+                **{SERVER_NAME_LABEL: server_name}
+            ).set(float(len(reserved_users)))
+            max_mau_gauge.labels(**{SERVER_NAME_LABEL: server_name}).set(
+                float(hs.config.server.max_mau_value)
+            )
 
     return run_as_background_process(
         "generate_monthly_active_users",
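The MAU gauges above illustrate the pattern this change applies throughout: the label name lives in the `SERVER_NAME_LABEL` constant, and call sites pass it by keyword expansion rather than hard-coding `server_name=`. A minimal self-contained sketch of that pattern (the metric name here is hypothetical, not one added by this change):

```python
from prometheus_client import Gauge

# Mirrors synapse.metrics.SERVER_NAME_LABEL; keeping the label name in a
# single constant is what lets the mypy plugin above check for its presence.
SERVER_NAME_LABEL = "server_name"

example_gauge = Gauge(
    "synapse_example_current",  # hypothetical metric name
    "An example homeserver-scoped gauge",
    labelnames=[SERVER_NAME_LABEL],
)

# Setting a value now means selecting the homeserver's time series first.
example_gauge.labels(**{SERVER_NAME_LABEL: "hs1.example.com"}).set(42.0)
```

Writing `**{SERVER_NAME_LABEL: ...}` instead of `server_name=...` keeps the constant as the single source of truth for the label name at every call site.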
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d8e1d0e288..7e1e05580d 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -127,7 +127,7 @@ pdu_process_time = Histogram(
 last_pdu_ts_metric = Gauge(
     "synapse_federation_last_received_pdu_time",
     "The timestamp of the last PDU which was successfully received from the given domain",
-    labelnames=("server_name",),
+    labelnames=("origin_server_name", SERVER_NAME_LABEL),
 )
 
@@ -554,7 +554,9 @@ class FederationServer(FederationBase):
         )
 
         if newest_pdu_ts and origin in self._federation_metrics_domains:
-            last_pdu_ts_metric.labels(server_name=origin).set(newest_pdu_ts / 1000)
+            last_pdu_ts_metric.labels(
+                origin_server_name=origin, **{SERVER_NAME_LABEL: self.server_name}
+            ).set(newest_pdu_ts / 1000)
 
         return pdu_results
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 70afe16739..9b52848792 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -705,10 +705,12 @@ class FederationSender(AbstractFederationSender):
                     assert ts is not None
 
                     synapse.metrics.event_processing_lag.labels(
-                        "federation_sender"
+                        name="federation_sender",
+                        **{SERVER_NAME_LABEL: self.server_name},
                     ).set(now - ts)
                     synapse.metrics.event_processing_last_ts.labels(
-                        "federation_sender"
+                        name="federation_sender",
+                        **{SERVER_NAME_LABEL: self.server_name},
                     ).set(ts)
 
                     events_processed_counter.labels(
@@ -726,7 +728,7 @@ class FederationSender(AbstractFederationSender):
                     ).inc()
 
                     synapse.metrics.event_processing_positions.labels(
-                        "federation_sender"
+                        name="federation_sender", **{SERVER_NAME_LABEL: self.server_name}
                     ).set(next_token)
 
             finally:
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 21e2fed085..63ed13c6fa 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -34,6 +34,7 @@ from synapse.logging.opentracing import (
     tags,
     whitelisted_homeserver,
 )
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.types import JsonDict
 from synapse.util import json_decoder
 from synapse.util.metrics import measure_func
@@ -47,7 +48,7 @@ issue_8631_logger = logging.getLogger("synapse.8631_debug")
 last_pdu_ts_metric = Gauge(
     "synapse_federation_last_sent_pdu_time",
     "The timestamp of the last PDU which was successfully sent to the given domain",
-    labelnames=("server_name",),
+    labelnames=("destination_server_name", SERVER_NAME_LABEL),
 )
 
@@ -191,6 +192,7 @@ class TransactionManager:
 
         if pdus and destination in self._federation_metrics_domains:
             last_pdu = pdus[-1]
-            last_pdu_ts_metric.labels(server_name=destination).set(
-                last_pdu.origin_server_ts / 1000
-            )
+            last_pdu_ts_metric.labels(
+                destination_server_name=destination,
+                **{SERVER_NAME_LABEL: self.server_name},
+            ).set(last_pdu.origin_server_ts / 1000)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index fcbb46f8fe..9397e2cc8c 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -207,7 +207,8 @@ class ApplicationServicesHandler:
                 await self.store.set_appservice_last_pos(upper_bound)
 
                 synapse.metrics.event_processing_positions.labels(
-                    "appservice_sender"
+                    name="appservice_sender",
+                    **{SERVER_NAME_LABEL: self.server_name},
                 ).set(upper_bound)
 
                 events_processed_counter.labels(
@@ -230,10 +231,12 @@ class ApplicationServicesHandler:
                     assert ts is not None
 
                     synapse.metrics.event_processing_lag.labels(
-                        "appservice_sender"
+                        name="appservice_sender",
+                        **{SERVER_NAME_LABEL: self.server_name},
                     ).set(now - ts)
                     synapse.metrics.event_processing_last_ts.labels(
-                        "appservice_sender"
+                        name="appservice_sender",
+                        **{SERVER_NAME_LABEL: self.server_name},
                     ).set(ts)
             finally:
                 self.is_processing = False
diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py
index a093505ca0..ce13dcc737 100644
--- a/synapse/handlers/delayed_events.py
+++ b/synapse/handlers/delayed_events.py
@@ -22,7 +22,7 @@ from synapse.api.errors import ShadowBanError
 from synapse.api.ratelimiting import Ratelimiter
 from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
 from synapse.logging.opentracing import set_tag
-from synapse.metrics import event_processing_positions
+from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.delayed_events import (
     ReplicationAddedDelayedEventRestServlet,
@@ -191,7 +191,9 @@ class DelayedEventsHandler:
             self._event_pos = max_pos
 
             # Expose current event processing position to prometheus
-            event_processing_positions.labels("delayed_events").set(max_pos)
+            event_processing_positions.labels(
+                name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
+            ).set(max_pos)
 
             await self._store.update_delayed_events_stream_pos(max_pos)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 3e8866649b..a85178d5aa 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1568,9 +1568,9 @@ class PresenceHandler(BasePresenceHandler):
                 self._event_pos = max_pos
 
                 # Expose current event processing position to prometheus
-                synapse.metrics.event_processing_positions.labels("presence").set(
-                    max_pos
-                )
+                synapse.metrics.event_processing_positions.labels(
+                    name="presence", **{SERVER_NAME_LABEL: self.server_name}
+                ).set(max_pos)
 
     async def _handle_state_delta(self, room_id: str, deltas: List[StateDelta]) -> None:
         """Process current state deltas for the room to find new joins that need
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 897731e4df..379c843057 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -49,7 +49,7 @@ from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
 from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
 from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
 from synapse.logging import opentracing
-from synapse.metrics import event_processing_positions
+from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.push import ReplicationCopyPusherRestServlet
 from synapse.storage.databases.main.state_deltas import StateDelta
@@ -2255,7 +2255,9 @@ class RoomForgetterHandler(StateDeltasHandler):
             self.pos = max_pos
 
             # Expose current event processing position to prometheus
-            event_processing_positions.labels("room_forgetter").set(max_pos)
+            event_processing_positions.labels(
+                name="room_forgetter", **{SERVER_NAME_LABEL: self.server_name}
+            ).set(max_pos)
 
             await self._store.update_room_forgetter_stream_pos(max_pos)
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 5cd4ec2b82..a2602ea818 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -32,7 +32,7 @@ from typing import (
 )
 
 from synapse.api.constants import EventContentFields, EventTypes, Membership
-from synapse.metrics import event_processing_positions
+from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.databases.main.state_deltas import StateDelta
 from synapse.types import JsonDict
@@ -147,7 +147,9 @@ class StatsHandler:
 
             logger.debug("Handled room stats to %s -> %s", self.pos, max_pos)
 
-            event_processing_positions.labels("stats").set(max_pos)
+            event_processing_positions.labels(
+                name="stats", **{SERVER_NAME_LABEL: self.server_name}
+            ).set(max_pos)
 
             self.pos = max_pos
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 17a3a87d86..130099a239 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -35,6 +35,7 @@ from synapse.api.constants import (
 )
 from synapse.api.errors import Codes, SynapseError
 from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.databases.main.state_deltas import StateDelta
 from synapse.storage.databases.main.user_directory import SearchResult
@@ -262,9 +263,9 @@ class UserDirectoryHandler(StateDeltasHandler):
                 self.pos = max_pos
 
                 # Expose current event processing position to prometheus
-                synapse.metrics.event_processing_positions.labels("user_dir").set(
-                    max_pos
-                )
+                synapse.metrics.event_processing_positions.labels(
+                    name="user_dir", **{SERVER_NAME_LABEL: self.server_name}
+                ).set(max_pos)
 
                 await self.store.update_user_directory_stream_pos(max_pos)
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index bb4213a060..9c482c760f 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -495,19 +495,27 @@ event_processing_loop_room_count = Counter(
 
 # Used to track where various components have processed in the event stream,
 # e.g. federation sending, appservice sending, etc.
-event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
+event_processing_positions = Gauge(
+    "synapse_event_processing_positions", "", labelnames=["name", SERVER_NAME_LABEL]
+)
 
 # Used to track the current max events stream position
-event_persisted_position = Gauge("synapse_event_persisted_position", "")
+event_persisted_position = Gauge(
+    "synapse_event_persisted_position", "", labelnames=[SERVER_NAME_LABEL]
+)
 
 # Used to track the received_ts of the last event processed by various
 # components
-event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])
+event_processing_last_ts = Gauge(
+    "synapse_event_processing_last_ts", "", labelnames=["name", SERVER_NAME_LABEL]
+)
 
 # Used to track the lag processing events. This is the time difference
 # between the last processed event's received_ts and the time it was
 # finished being processed.
-event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
+event_processing_lag = Gauge(
+    "synapse_event_processing_lag", "", labelnames=["name", SERVER_NAME_LABEL]
+)
 
 event_processing_lag_by_event = Histogram(
     "synapse_event_processing_lag_by_event",
@@ -516,7 +524,11 @@ event_processing_lag_by_event = Histogram(
 )
 
 # Build info of the running server.
-build_info = Gauge(
+#
+# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`. We
+# consider this process-level because all Synapse homeservers running in the process
+# will use the same Synapse version.
+build_info = Gauge(  # type: ignore[missing-server-name-label]
     "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
 )
 build_info.labels(
@@ -538,38 +550,51 @@ threepid_send_requests = Histogram(
 threadpool_total_threads = Gauge(
     "synapse_threadpool_total_threads",
     "Total number of threads currently in the threadpool",
-    ["name"],
+    labelnames=["name", SERVER_NAME_LABEL],
 )
 
 threadpool_total_working_threads = Gauge(
     "synapse_threadpool_working_threads",
     "Number of threads currently working in the threadpool",
-    ["name"],
+    labelnames=["name", SERVER_NAME_LABEL],
 )
 
 threadpool_total_min_threads = Gauge(
     "synapse_threadpool_min_threads",
     "Minimum number of threads configured in the threadpool",
-    ["name"],
+    labelnames=["name", SERVER_NAME_LABEL],
 )
 
 threadpool_total_max_threads = Gauge(
     "synapse_threadpool_max_threads",
     "Maximum number of threads configured in the threadpool",
-    ["name"],
+    labelnames=["name", SERVER_NAME_LABEL],
 )
 
 
-def register_threadpool(name: str, threadpool: ThreadPool) -> None:
-    """Add metrics for the threadpool."""
+def register_threadpool(*, name: str, server_name: str, threadpool: ThreadPool) -> None:
+    """
+    Add metrics for the threadpool.
 
-    threadpool_total_min_threads.labels(name).set(threadpool.min)
-    threadpool_total_max_threads.labels(name).set(threadpool.max)
+    Args:
+        name: The name of the threadpool, used to identify it in the metrics.
+        server_name: The homeserver name used to label the metrics (this should be `hs.hostname`).
+        threadpool: The threadpool to register metrics for.
+    """
 
-    threadpool_total_threads.labels(name).set_function(lambda: len(threadpool.threads))
-    threadpool_total_working_threads.labels(name).set_function(
-        lambda: len(threadpool.working)
-    )
+    threadpool_total_min_threads.labels(
+        name=name, **{SERVER_NAME_LABEL: server_name}
+    ).set(threadpool.min)
+    threadpool_total_max_threads.labels(
+        name=name, **{SERVER_NAME_LABEL: server_name}
+    ).set(threadpool.max)
+
+    threadpool_total_threads.labels(
+        name=name, **{SERVER_NAME_LABEL: server_name}
+    ).set_function(lambda: len(threadpool.threads))
+    threadpool_total_working_threads.labels(
+        name=name, **{SERVER_NAME_LABEL: server_name}
+    ).set_function(lambda: len(threadpool.working))
 
 
 class MetricsResource(Resource):
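For genuinely process-level metrics such as `build_info` above (and the GC metrics below), the fix is not to add the label but to suppress the plugin's check explicitly. A sketch of that escape hatch, using a hypothetical metric name:

```python
from prometheus_client import Gauge

# Process-level: every homeserver in this process shares one value, so the
# check from scripts-dev/mypy_synapse_plugin.py is deliberately silenced.
process_gauge = Gauge(  # type: ignore[missing-server-name-label]
    "synapse_example_process_level",
    "An example process-level gauge shared by all homeservers in the process",
)
```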
diff --git a/synapse/metrics/_gc.py b/synapse/metrics/_gc.py
index d16481a0f6..ee86e27479 100644
--- a/synapse/metrics/_gc.py
+++ b/synapse/metrics/_gc.py
@@ -54,7 +54,8 @@ running_on_pypy = platform.python_implementation() == "PyPy"
 # Python GC metrics
 #
 
-gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
+# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`.
+gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])  # type: ignore[missing-server-name-label]
 
 gc_time = Histogram(
     "python_gc_time",
     "Time taken to GC (sec)",
diff --git a/synapse/metrics/common_usage_metrics.py b/synapse/metrics/common_usage_metrics.py
index ea1ffd171d..cd1c3c8649 100644
--- a/synapse/metrics/common_usage_metrics.py
+++ b/synapse/metrics/common_usage_metrics.py
@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING
 
 import attr
 
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.metrics.background_process_metrics import run_as_background_process
 
 if TYPE_CHECKING:
@@ -33,6 +34,7 @@ from prometheus_client import Gauge
 current_dau_gauge = Gauge(
     "synapse_admin_daily_active_users",
     "Current daily active users count",
+    labelnames=[SERVER_NAME_LABEL],
 )
 
 
@@ -89,4 +91,6 @@ class CommonUsageMetricsManager:
         """Update the Prometheus gauges."""
         metrics = await self._collect()
 
-        current_dau_gauge.set(float(metrics.daily_active_users))
+        current_dau_gauge.labels(
+            **{SERVER_NAME_LABEL: self.server_name},
+        ).set(float(metrics.daily_active_users))
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index d6d5de2ec5..ee51872c2c 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Dict, Iterable, Optional
 from prometheus_client import Gauge
 
 from synapse.api.errors import Codes, SynapseError
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -44,7 +45,9 @@ logger = logging.getLogger(__name__)
 
 
 synapse_pushers = Gauge(
-    "synapse_pushers", "Number of active synapse pushers", ["kind", "app_id"]
+    "synapse_pushers",
+    "Number of active synapse pushers",
+    labelnames=["kind", "app_id", SERVER_NAME_LABEL],
 )
 
@@ -420,11 +423,17 @@ class PusherPool:
                 previous_pusher.on_stop()
 
                 synapse_pushers.labels(
-                    type(previous_pusher).__name__, previous_pusher.app_id
+                    kind=type(previous_pusher).__name__,
+                    app_id=previous_pusher.app_id,
+                    **{SERVER_NAME_LABEL: self.server_name},
                 ).dec()
 
             byuser[appid_pushkey] = pusher
-            synapse_pushers.labels(type(pusher).__name__, pusher.app_id).inc()
+            synapse_pushers.labels(
+                kind=type(pusher).__name__,
+                app_id=pusher.app_id,
+                **{SERVER_NAME_LABEL: self.server_name},
+            ).inc()
 
             logger.info("Starting pusher %s / %s", pusher.user_id, appid_pushkey)
 
@@ -476,4 +485,8 @@ class PusherPool:
             pusher = byuser.pop(appid_pushkey)
             pusher.on_stop()
 
-            synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
+            synapse_pushers.labels(
+                kind=type(pusher).__name__,
+                app_id=pusher.app_id,
+                **{SERVER_NAME_LABEL: self.server_name},
+            ).dec()
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index e924678a5b..0850a99e0c 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -52,7 +52,7 @@ logger = logging.getLogger(__name__)
 _pending_outgoing_requests = Gauge(
     "synapse_pending_outgoing_replication_requests",
     "Number of active outgoing replication requests, by replication method name",
-    ["name"],
+    labelnames=["name", SERVER_NAME_LABEL],
 )
 
 _outgoing_request_counter = Counter(
@@ -213,7 +213,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
 
         instance_map = hs.config.worker.instance_map
 
-        outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME)
+        outgoing_gauge = _pending_outgoing_requests.labels(
+            name=cls.NAME,
+            **{SERVER_NAME_LABEL: server_name},
+        )
 
         replication_secret = None
         if hs.config.worker.worker_replication_secret:
diff --git a/synapse/server.py b/synapse/server.py
index b6d1aca616..1dc2781e4f 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -981,7 +981,10 @@ class HomeServer(metaclass=abc.ABCMeta):
         )
 
         # Register the threadpool with our metrics.
-        register_threadpool("media", media_threadpool)
+        server_name = self.hostname
+        register_threadpool(
+            name="media", server_name=server_name, threadpool=media_threadpool
+        )
 
         return media_threadpool
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 0ecfe42152..5ef9139c29 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -126,9 +126,11 @@ class _PoolConnection(Connection):
 
 
 def make_pool(
+    *,
     reactor: IReactorCore,
     db_config: DatabaseConnectionConfig,
     engine: BaseDatabaseEngine,
+    server_name: str,
 ) -> adbapi.ConnectionPool:
     """Get the connection pool for the database."""
 
@@ -152,7 +154,11 @@ def make_pool(
         **db_args,
     )
 
-    register_threadpool(f"database-{db_config.name}", connection_pool.threadpool)
+    register_threadpool(
+        name=f"database-{db_config.name}",
+        server_name=server_name,
+        threadpool=connection_pool.threadpool,
+    )
 
     return connection_pool
 
@@ -573,7 +579,12 @@ class DatabasePool:
         self._clock = hs.get_clock()
         self._txn_limit = database_config.config.get("txn_limit", 0)
         self._database_config = database_config
-        self._db_pool = make_pool(hs.get_reactor(), database_config, engine)
+        self._db_pool = make_pool(
+            reactor=hs.get_reactor(),
+            db_config=database_config,
+            engine=engine,
+            server_name=self.server_name,
+        )
 
         self.updates = BackgroundUpdater(hs, self)
 
         LaterGauge(
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index abe215d26e..bee34ef6a3 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -71,11 +71,13 @@ if TYPE_CHECKING:
 oldest_pdu_in_federation_staging = Gauge(
     "synapse_federation_server_oldest_inbound_pdu_in_staging",
     "The age in seconds since we received the oldest pdu in the federation staging area",
+    labelnames=[SERVER_NAME_LABEL],
 )
 
 number_pdus_in_federation_queue = Gauge(
     "synapse_federation_server_number_inbound_pdu_in_staging",
     "The total number of events in the inbound federation staging",
+    labelnames=[SERVER_NAME_LABEL],
 )
 
 pdus_pruned_from_federation_queue = Counter(
@@ -2060,8 +2062,12 @@ class EventFederationWorkerStore(
             "_get_stats_for_federation_staging", _get_stats_for_federation_staging_txn
         )
 
-        number_pdus_in_federation_queue.set(count)
-        oldest_pdu_in_federation_staging.set(age)
+        number_pdus_in_federation_queue.labels(
+            **{SERVER_NAME_LABEL: self.server_name}
+        ).set(count)
+        oldest_pdu_in_federation_staging.labels(
+            **{SERVER_NAME_LABEL: self.server_name}
+        ).set(age)
 
     async def clean_room_for_join(self, room_id: str) -> None:
         await self.db_pool.runInteraction(
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1b2e584ccc..3ac602cd67 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -368,7 +368,9 @@ class PersistEventsStore:
         if not use_negative_stream_ordering:
             # we don't want to set the event_persisted_position to a negative
             # stream_ordering.
-            synapse.metrics.event_persisted_position.set(stream)
+            synapse.metrics.event_persisted_position.labels(
+                **{SERVER_NAME_LABEL: self.server_name}
+            ).set(stream)
 
         for event, context in events_and_contexts:
             if context.app_service:
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e067b53d97..7f015aa22c 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -68,6 +68,7 @@ from synapse.logging.opentracing import (
     tag_args,
     trace,
 )
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -138,6 +139,7 @@ EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 event_fetch_ongoing_gauge = Gauge(
     "synapse_event_fetch_ongoing",
     "The number of event fetchers that are running",
+    labelnames=[SERVER_NAME_LABEL],
 )
 
 
@@ -312,7 +314,9 @@ class EventsWorkerStore(SQLBaseStore):
             Tuple[Iterable[str], "defer.Deferred[Dict[str, _EventRow]]"]
         ] = []
         self._event_fetch_ongoing = 0
-        event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
+        event_fetch_ongoing_gauge.labels(**{SERVER_NAME_LABEL: self.server_name}).set(
+            self._event_fetch_ongoing
+        )
 
         # We define this sequence here so that it can be referenced from both
         # the DataStore and PersistEventStore.
@@ -1140,7 +1144,9 @@ class EventsWorkerStore(SQLBaseStore):
                 and self._event_fetch_ongoing < EVENT_QUEUE_THREADS
             ):
                 self._event_fetch_ongoing += 1
-                event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
+                event_fetch_ongoing_gauge.labels(
+                    **{SERVER_NAME_LABEL: self.server_name}
+                ).set(self._event_fetch_ongoing)
                 # `_event_fetch_ongoing` is decremented in `_fetch_thread`.
                 should_start = True
             else:
@@ -1164,7 +1170,9 @@ class EventsWorkerStore(SQLBaseStore):
         event_fetches_to_fail = []
         with self._event_fetch_lock:
             self._event_fetch_ongoing -= 1
-            event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
+            event_fetch_ongoing_gauge.labels(
+                **{SERVER_NAME_LABEL: self.server_name}
+            ).set(self._event_fetch_ongoing)
 
             # There may still be work remaining in `_event_fetch_list` if we
             # failed, or it was added in between us deciding to exit and
diff --git a/synapse/util/batching_queue.py b/synapse/util/batching_queue.py
index 3a2d664d28..4c0f129423 100644
--- a/synapse/util/batching_queue.py
+++ b/synapse/util/batching_queue.py
@@ -37,6 +37,7 @@ from prometheus_client import Gauge
 from twisted.internet import defer
 
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util import Clock
 
@@ -49,19 +50,19 @@ R = TypeVar("R")
 number_queued = Gauge(
     "synapse_util_batching_queue_number_queued",
     "The number of items waiting in the queue across all keys",
-    labelnames=("name",),
+    labelnames=("name", SERVER_NAME_LABEL),
 )
 
 number_in_flight = Gauge(
     "synapse_util_batching_queue_number_pending",
     "The number of items across all keys either being processed or waiting in a queue",
-    labelnames=("name",),
+    labelnames=("name", SERVER_NAME_LABEL),
 )
 
 number_of_keys = Gauge(
     "synapse_util_batching_queue_number_of_keys",
     "The number of distinct keys that have items queued",
-    labelnames=("name",),
+    labelnames=("name", SERVER_NAME_LABEL),
 )
 
 
@@ -114,14 +115,18 @@ class BatchingQueue(Generic[V, R]):
 
         # The function to call with batches of values.
         self._process_batch_callback = process_batch_callback
 
-        number_queued.labels(self._name).set_function(
-            lambda: sum(len(q) for q in self._next_values.values())
+        number_queued.labels(
+            name=self._name, **{SERVER_NAME_LABEL: self.server_name}
+        ).set_function(lambda: sum(len(q) for q in self._next_values.values()))
+
+        number_of_keys.labels(
+            name=self._name, **{SERVER_NAME_LABEL: self.server_name}
+        ).set_function(lambda: len(self._next_values))
+
+        self._number_in_flight_metric: Gauge = number_in_flight.labels(
+            name=self._name, **{SERVER_NAME_LABEL: self.server_name}
         )
 
-        number_of_keys.labels(self._name).set_function(lambda: len(self._next_values))
-
-        self._number_in_flight_metric: Gauge = number_in_flight.labels(self._name)
-
     async def add_to_queue(self, value: V, key: Hashable = ()) -> R:
         """Adds the value to the queue with the given key, returning the result
         of the processing function for the batch that included the given value.
diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py
index 0c6c912918..92d446ce2a 100644
--- a/synapse/util/caches/deferred_cache.py
+++ b/synapse/util/caches/deferred_cache.py
@@ -43,6 +43,7 @@ from prometheus_client import Gauge
 from twisted.internet import defer
 from twisted.python.failure import Failure
 
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
@@ -50,7 +51,7 @@ from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
 cache_pending_metric = Gauge(
     "synapse_util_caches_cache_pending",
     "Number of lookups currently pending for this cache",
-    ["name"],
+    labelnames=["name", SERVER_NAME_LABEL],
 )
 
 T = TypeVar("T")
@@ -111,7 +112,9 @@ class DeferredCache(Generic[KT, VT]):
         ] = cache_type()
 
         def metrics_cb() -> None:
-            cache_pending_metric.labels(name).set(len(self._pending_deferred_cache))
+            cache_pending_metric.labels(
+                name=name, **{SERVER_NAME_LABEL: server_name}
+            ).set(len(self._pending_deferred_cache))
 
         # cache is used for completed results and maps to the result itself, rather than
         # a Deferred.
diff --git a/tests/server.py b/tests/server.py
index 0ace329619..9e8ca9ccfb 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -702,6 +702,7 @@ def make_fake_db_pool(
     reactor: ISynapseReactor,
     db_config: DatabaseConnectionConfig,
     engine: BaseDatabaseEngine,
+    server_name: str,
 ) -> adbapi.ConnectionPool:
     """Wrapper for `make_pool` which builds a pool which runs db queries synchronously.
 
@@ -710,7 +711,9 @@ def make_fake_db_pool(
     is a drop-in replacement for the normal `make_pool` which builds such a connection
     pool.
     """
-    pool = make_pool(reactor, db_config, engine)
+    pool = make_pool(
+        reactor=reactor, db_config=db_config, engine=engine, server_name=server_name
+    )
 
     def runWithConnection(
         func: Callable[..., R], *args: Any, **kwargs: Any
diff --git a/tests/util/test_batching_queue.py b/tests/util/test_batching_queue.py
index 49ddc11725..532582cf87 100644
--- a/tests/util/test_batching_queue.py
+++ b/tests/util/test_batching_queue.py
@@ -42,9 +42,9 @@ class BatchingQueueTestCase(TestCase):
 
         # We ensure that we remove any existing metrics for "test_queue".
         try:
-            number_queued.remove("test_queue")
-            number_of_keys.remove("test_queue")
-            number_in_flight.remove("test_queue")
+            number_queued.remove("test_queue", "test_server")
+            number_of_keys.remove("test_queue", "test_server")
+            number_in_flight.remove("test_queue", "test_server")
         except KeyError:
             pass
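One subtlety in the teardown above: `remove()` takes label *values* positionally, in `labelnames` order, which is why `"test_queue"` (the `name` value) precedes `"test_server"` (the `server_name` value). To confirm the per-homeserver scoping, a test or REPL session can render the default registry; a minimal sketch (the sample output line is illustrative):

```python
from prometheus_client import REGISTRY, generate_latest

# After a queue named "test_queue" is created for homeserver "test_server",
# the exposition output contains a line like:
#   synapse_util_batching_queue_number_queued{name="test_queue",server_name="test_server"} 0.0
print(generate_latest(REGISTRY).decode("utf-8"))
```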