Clean up old device_federation_inbox rows (#18546)

Fixes https://github.com/element-hq/synapse/issues/17370
Erik Johnston 2025-06-18 12:58:31 +01:00 committed by GitHub
parent 73a38384f5
commit 33e0c25279
5 changed files with 250 additions and 1 deletion

changelog.d/18546.misc (new file)

@@ -0,0 +1 @@
Clean up old, unused rows from the `device_federation_inbox` table.

synapse/storage/databases/main/deviceinbox.py

@@ -42,6 +42,7 @@ from synapse.logging.opentracing import (
    start_active_span,
    trace,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
@@ -52,7 +53,7 @@ from synapse.storage.database import (
)
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util import Duration, json_encoder
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.stringutils import parse_and_validate_server_name
@@ -63,6 +64,18 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# How long to keep messages in the device federation inbox before deleting them.
DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS = 7 * Duration.DAY_MS
# How often to run the task to clean up old device_federation_inbox rows.
DEVICE_FEDERATION_INBOX_CLEANUP_INTERVAL_MS = 5 * Duration.MINUTE_MS
# Update name for the device federation inbox received timestamp index.
DEVICE_FEDERATION_INBOX_RECEIVED_INDEX_UPDATE = (
    "device_federation_inbox_received_ts_index"
)

class DeviceInboxWorkerStore(SQLBaseStore):
    def __init__(
        self,
@@ -134,6 +147,14 @@ class DeviceInboxWorkerStore(SQLBaseStore):
            prefilled_cache=device_outbox_prefill,
        )

        if hs.config.worker.run_background_tasks:
            self._clock.looping_call(
                run_as_background_process,
                DEVICE_FEDERATION_INBOX_CLEANUP_INTERVAL_MS,
                "_delete_old_federation_inbox_rows",
                self._delete_old_federation_inbox_rows,
            )
    def process_replication_rows(
        self,
        stream_name: str,
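
A note on the wiring above: `Clock.looping_call(f, interval_ms, *args)` invokes `f(*args)` every `interval_ms`, so each tick calls `run_as_background_process(...)`, which starts the cleanup coroutine in a metrics/log-context wrapper without awaiting it. A minimal sketch of the same pattern in plain Twisted (illustrative only; `delete_old_rows` and `fire` are made-up names, not Synapse APIs):

```python
# Sketch: periodic fire-and-forget cleanup with Twisted's LoopingCall.
from twisted.internet import task
from twisted.internet.defer import ensureDeferred

CLEANUP_INTERVAL_SECONDS = 5 * 60  # DEVICE_FEDERATION_INBOX_CLEANUP_INTERVAL_MS / 1000

async def delete_old_rows() -> None:
    ...  # batched DELETE, as in _delete_old_federation_inbox_rows below

def fire() -> None:
    # Like run_as_background_process: kick off the coroutine, don't await it.
    ensureDeferred(delete_old_rows())

loop = task.LoopingCall(fire)
# now=False matches Synapse's Clock.looping_call: first run after one interval.
loop.start(CLEANUP_INTERVAL_SECONDS, now=False)
```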
@@ -960,6 +981,52 @@ class DeviceInboxWorkerStore(SQLBaseStore):
            ],
        )
    async def _delete_old_federation_inbox_rows(self, batch_size: int = 1000) -> None:
        """Delete old rows from the device_federation_inbox table."""

        # We wait until we have the index on `received_ts`, otherwise the query
        # will take a very long time.
        if not await self.db_pool.updates.has_completed_background_update(
            DEVICE_FEDERATION_INBOX_RECEIVED_INDEX_UPDATE
        ):
            return

        def _delete_old_federation_inbox_rows_txn(txn: LoggingTransaction) -> bool:
            # We delete at most `batch_size` rows that are older than
            # DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS.
            delete_before_ts = (
                self._clock.time_msec() - DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS
            )
            sql = """
                WITH to_delete AS (
                    SELECT origin, message_id
                    FROM device_federation_inbox
                    WHERE received_ts < ?
                    ORDER BY received_ts ASC
                    LIMIT ?
                )
                DELETE FROM device_federation_inbox
                WHERE
                    (origin, message_id) IN (
                        SELECT origin, message_id FROM to_delete
                    )
            """
            txn.execute(sql, (delete_before_ts, batch_size))
            return txn.rowcount < batch_size

        while True:
            finished = await self.db_pool.runInteraction(
                "_delete_old_federation_inbox_rows",
                _delete_old_federation_inbox_rows_txn,
                db_autocommit=True,  # We don't need to run in a transaction
            )
            if finished:
                return

            # We sleep a bit so that we don't hammer the database in a tight
            # loop the first time we run this.
            await self._clock.sleep(1)

class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
    DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
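
The deletion strategy above is a common batched-delete idiom: a CTE selects the primary keys of the oldest `batch_size` rows, the `DELETE` removes exactly those rows, and a short batch signals that nothing older than the cutoff remains. A self-contained sketch of the same pattern against SQLite (table and column names mirror the schema; this is an illustration, not Synapse's database layer):

```python
# Sketch: batched deletion of old rows, stopping on the first short batch.
import sqlite3
import time

def delete_old_rows(conn: sqlite3.Connection, cutoff_ts: int, batch_size: int = 1000) -> None:
    while True:
        cur = conn.execute(
            """
            WITH to_delete AS (
                SELECT origin, message_id FROM device_federation_inbox
                WHERE received_ts < ? ORDER BY received_ts ASC LIMIT ?
            )
            DELETE FROM device_federation_inbox
            WHERE (origin, message_id) IN (SELECT origin, message_id FROM to_delete)
            """,
            (cutoff_ts, batch_size),
        )
        conn.commit()
        if cur.rowcount < batch_size:
            return  # short batch: no rows older than the cutoff remain
        time.sleep(1)  # yield between batches, as the store method does
```

Deleting by primary key rather than using a `LIMIT` directly on the `DELETE` keeps the statement portable: PostgreSQL does not support `DELETE ... LIMIT`, while the CTE form works on both engines Synapse supports.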
@@ -995,6 +1062,13 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
            self._cleanup_device_federation_outbox,
        )

        self.db_pool.updates.register_background_index_update(
            update_name=DEVICE_FEDERATION_INBOX_RECEIVED_INDEX_UPDATE,
            index_name="device_federation_inbox_received_ts_index",
            table="device_federation_inbox",
            columns=["received_ts"],
        )
    async def _background_drop_index_device_inbox(
        self, progress: JsonDict, batch_size: int
    ) -> int:
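
Registering the index as a background update means it is built after startup rather than during schema migration. On PostgreSQL the background updater builds such indexes with `CREATE INDEX CONCURRENTLY` so writes to the table are not blocked; the resulting DDL is roughly the following (a sketch; the exact statements are generated inside `BackgroundUpdater`):

```python
# Approximate DDL produced by the background index update (illustrative).
POSTGRES_SQL = """
    CREATE INDEX CONCURRENTLY device_federation_inbox_received_ts_index
    ON device_federation_inbox (received_ts)
"""
# SQLite has no CONCURRENTLY; the index is created directly.
SQLITE_SQL = """
    CREATE INDEX device_federation_inbox_received_ts_index
    ON device_federation_inbox (received_ts)
"""
```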

(new SQL delta file)

@@ -0,0 +1,16 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Background update that adds an index to `device_federation_inbox.received_ts`
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(9206, 'device_federation_inbox_received_ts_index', '{}');
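
The `ordering` value 9206 appears to follow Synapse's convention of schema version (92) plus delta number (06). Inserting this row is what queues the named update for the `BackgroundUpdater`, which removes the row again once the update completes. A hypothetical helper showing how completion could be observed directly in the database (illustrative only; Synapse itself uses `has_completed_background_update`, as the store method above does):

```python
import sqlite3

def index_update_pending(conn: sqlite3.Connection) -> bool:
    # Synapse deletes the row from background_updates when an update
    # finishes, so a missing row means the index has been built.
    row = conn.execute(
        "SELECT 1 FROM background_updates WHERE update_name = ?",
        ("device_federation_inbox_received_ts_index",),
    ).fetchone()
    return row is not None
```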

synapse/util/__init__.py

@@ -55,6 +55,15 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class Duration:
    """Helper class that holds constants for common time durations in
    milliseconds."""

    MINUTE_MS = 60 * 1000
    HOUR_MS = 60 * MINUTE_MS
    DAY_MS = 24 * HOUR_MS


def _reject_invalid_json(val: Any) -> None:
    """Do not allow Infinity, -Infinity, or NaN values in JSON."""
    raise ValueError("Invalid JSON value: '%s'" % val)
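
The new `Duration` constants replace bare millisecond literals. The two cleanup constants defined earlier work out as follows (a quick self-check, duplicating the constants so the snippet stands alone):

```python
# Standalone copy of the Duration constants, to show the arithmetic.
MINUTE_MS = 60 * 1000        # 60_000
HOUR_MS = 60 * MINUTE_MS     # 3_600_000
DAY_MS = 24 * HOUR_MS        # 86_400_000

# Rows older than a week become eligible for deletion...
assert 7 * DAY_MS == 604_800_000
# ...and the cleanup task looks for them every five minutes.
assert 5 * MINUTE_MS == 300_000
```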

tests/storage/databases/main/test_deviceinbox.py

@@ -19,11 +19,17 @@
#
#
from unittest.mock import patch
from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactor
from synapse.rest import admin
from synapse.rest.client import devices
from synapse.server import HomeServer
from synapse.storage.databases.main.deviceinbox import (
    DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS,
)
from synapse.util import Clock
from tests.unittest import HomeserverTestCase
@@ -172,3 +178,146 @@ class DeviceInboxBackgroundUpdateStoreTestCase(HomeserverTestCase):
        )
        self.assertEqual(1, len(res))
        self.assertEqual(res[0], "cur_device")

class DeviceInboxFederationInboxCleanupTestCase(HomeserverTestCase):
    servlets = [
        admin.register_servlets,
        devices.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store = hs.get_datastores().main
        self.db_pool = self.store.db_pool

        # Advance time to ensure we are past the cleanup delay
        self.reactor.advance(DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS * 2 / 1000)
    def test_delete_old_federation_inbox_rows_skips_if_no_index(self) -> None:
        """Test that we don't delete rows if the index hasn't been created yet."""
        # Insert some test data into device_federation_inbox
        for i in range(5):
            self.get_success(
                self.db_pool.simple_insert(
                    "device_federation_inbox",
                    {
                        "origin": "example.com",
                        "message_id": f"msg_{i}",
                        "received_ts": 0,
                    },
                )
            )

        # Mock to report the update as not completed
        with patch(
            "synapse.storage.background_updates.BackgroundUpdater.has_completed_background_update"
        ) as mock:
            mock.return_value = False
            self.get_success(self.store._delete_old_federation_inbox_rows())

        # Check that no rows were deleted
        rows = self.get_success(
            self.db_pool.simple_select_list(
                "device_federation_inbox",
                keyvalues={},
                retcols=["origin", "message_id", "received_ts"],
            )
        )
        self.assertEqual(
            len(rows), 5, "Expected no rows to be deleted when index is missing"
        )
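
The `patch` above works without an explicit `AsyncMock` because, since Python 3.8, `unittest.mock.patch` detects that the patched attribute is an async function and substitutes an `AsyncMock`, whose `return_value` becomes the result of awaiting the call. A standalone illustration of that behaviour (the class and method here are made up for the example):

```python
import asyncio
from unittest.mock import patch

class Updater:
    async def has_completed_background_update(self, name: str) -> bool:
        return True

async def main() -> None:
    with patch.object(Updater, "has_completed_background_update") as mock:
        # The AsyncMock's return_value is what awaiting the call yields.
        mock.return_value = False
        assert await Updater().has_completed_background_update("x") is False

asyncio.run(main())
```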
    def test_delete_old_federation_inbox_rows(self) -> None:
        """Test that old rows are deleted from device_federation_inbox."""
        # Insert old messages
        for i in range(5):
            self.get_success(
                self.db_pool.simple_insert(
                    "device_federation_inbox",
                    {
                        "origin": "old.example.com",
                        "message_id": f"old_msg_{i}",
                        "received_ts": self.clock.time_msec(),
                    },
                )
            )

        self.reactor.advance(2 * DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS / 1000)

        # Insert new messages
        for i in range(5):
            self.get_success(
                self.db_pool.simple_insert(
                    "device_federation_inbox",
                    {
                        "origin": "new.example.com",
                        "message_id": f"new_msg_{i}",
                        "received_ts": self.clock.time_msec(),
                    },
                )
            )

        # Run the cleanup
        self.get_success(self.store._delete_old_federation_inbox_rows())

        # Check that only old messages were deleted
        rows = self.get_success(
            self.db_pool.simple_select_onecol(
                "device_federation_inbox",
                keyvalues={},
                retcol="origin",
            )
        )
        self.assertEqual(len(rows), 5, "Expected only 5 new messages to remain")
        for origin in rows:
            self.assertEqual(origin, "new.example.com")
    def test_delete_old_federation_inbox_rows_batch_limit(self) -> None:
        """Test that the deletion happens in batches."""
        # Insert 10 old messages (more than the batch size of 5 used below)
        for i in range(10):
            self.get_success(
                self.db_pool.simple_insert(
                    "device_federation_inbox",
                    {
                        "origin": "old.example.com",
                        "message_id": f"old_msg_{i}",
                        "received_ts": self.clock.time_msec(),
                    },
                )
            )

        # Advance time to ensure we are past the cleanup delay
        self.reactor.advance(2 * DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS / 1000)

        # Run the cleanup - it should delete in batches and sleep between them
        deferred = defer.ensureDeferred(
            self.store._delete_old_federation_inbox_rows(batch_size=5)
        )

        # Check that the deferred doesn't resolve immediately
        self.assertFalse(deferred.called)

        # Advance the reactor to allow the cleanup to continue and complete
        self.reactor.advance(2)
        self.get_success(deferred)

        # Check that all messages were deleted after multiple batches
        rows = self.get_success(
            self.db_pool.simple_select_list(
                "device_federation_inbox",
                keyvalues={},
                retcols=["origin", "message_id"],
            )
        )
        self.assertEqual(
            len(rows),
            0,
            "Expected all messages to be deleted after multiple batches",
        )
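
The batch test relies on the cleanup coroutine parking in `clock.sleep(1)` after each full batch, so the deferred stays unresolved until the simulated reactor fires those sleeps. A standalone illustration of driving such a sleep loop from a test clock with `twisted.internet.task.Clock` (the `sleep` helper is hand-rolled for the example; Synapse's `Clock.sleep` plays the equivalent role):

```python
from twisted.internet.defer import Deferred, ensureDeferred
from twisted.internet.task import Clock

clock = Clock()

def sleep(seconds: float) -> Deferred:
    # Resolve a Deferred after `seconds` of simulated time.
    d: Deferred = Deferred()
    clock.callLater(seconds, d.callback, None)
    return d

async def work(batches: int) -> None:
    # Mirror the store's loop: one sleep between consecutive full batches.
    for _ in range(batches - 1):
        await sleep(1)

d = ensureDeferred(work(3))
assert not d.called   # parked in the first sleep, like the test's deferred
clock.advance(1)      # first sleep fires; the loop schedules the next one
clock.advance(1)      # second sleep fires and the coroutine completes
assert d.called
```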