Allow a few admin APIs used by MAS to run on workers (#18313)

This should be reviewed commit by commit.

It adds a few admin servlets that are used by MAS when in delegation
mode to workers

---------

Co-authored-by: Olivier 'reivilibre <oliverw@matrix.org>
Co-authored-by: Devon Hudson <devon.dmytro@gmail.com>
Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
This commit is contained in:
Quentin Gliech 2025-05-02 15:37:58 +02:00 committed by GitHub
parent 411d239db4
commit b8146d4b03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 249 additions and 200 deletions

1
changelog.d/18313.misc Normal file
View File

@ -0,0 +1 @@
Allow a few admin APIs used by matrix-authentication-service to run on workers.

View File

@ -323,6 +323,15 @@ For multiple workers not handling the SSO endpoints properly, see
[#7530](https://github.com/matrix-org/synapse/issues/7530) and [#7530](https://github.com/matrix-org/synapse/issues/7530) and
[#9427](https://github.com/matrix-org/synapse/issues/9427). [#9427](https://github.com/matrix-org/synapse/issues/9427).
Additionally, when MSC3861 is enabled (`experimental_features.msc3861.enabled`
set to `true`), the following endpoints can be handled by the worker:
^/_synapse/admin/v2/users/[^/]+$
^/_synapse/admin/v1/username_available$
^/_synapse/admin/v1/users/[^/]+/_allow_cross_signing_replacement_without_uia$
# Only the GET method:
^/_synapse/admin/v1/users/[^/]+/devices$
Note that a [HTTP listener](usage/configuration/config_documentation.md#listeners) Note that a [HTTP listener](usage/configuration/config_documentation.md#listeners)
with `client` and `federation` `resources` must be configured in the with `client` and `federation` `resources` must be configured in the
[`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners)

View File

@ -51,8 +51,7 @@ from synapse.http.server import JsonResource, OptionsResource
from synapse.logging.context import LoggingContext from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.rest import ClientRestResource from synapse.rest import ClientRestResource, admin
from synapse.rest.admin import AdminRestResource, register_servlets_for_media_repo
from synapse.rest.health import HealthResource from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyResource from synapse.rest.key.v2 import KeyResource
from synapse.rest.synapse.client import build_synapse_client_resource_tree from synapse.rest.synapse.client import build_synapse_client_resource_tree
@ -176,8 +175,13 @@ class GenericWorkerServer(HomeServer):
def _listen_http(self, listener_config: ListenerConfig) -> None: def _listen_http(self, listener_config: ListenerConfig) -> None:
assert listener_config.http_options is not None assert listener_config.http_options is not None
# We always include a health resource. # We always include an admin resource that we populate with servlets as needed
resources: Dict[str, Resource] = {"/health": HealthResource()} admin_resource = JsonResource(self, canonical_json=False)
resources: Dict[str, Resource] = {
# We always include a health resource.
"/health": HealthResource(),
"/_synapse/admin": admin_resource,
}
for res in listener_config.http_options.resources: for res in listener_config.http_options.resources:
for name in res.names: for name in res.names:
@ -190,7 +194,7 @@ class GenericWorkerServer(HomeServer):
resources.update(build_synapse_client_resource_tree(self)) resources.update(build_synapse_client_resource_tree(self))
resources["/.well-known"] = well_known_resource(self) resources["/.well-known"] = well_known_resource(self)
resources["/_synapse/admin"] = AdminRestResource(self) admin.register_servlets(self, admin_resource)
elif name == "federation": elif name == "federation":
resources[FEDERATION_PREFIX] = TransportLayerServer(self) resources[FEDERATION_PREFIX] = TransportLayerServer(self)
@ -200,15 +204,13 @@ class GenericWorkerServer(HomeServer):
# We need to serve the admin servlets for media on the # We need to serve the admin servlets for media on the
# worker. # worker.
admin_resource = JsonResource(self, canonical_json=False) admin.register_servlets_for_media_repo(self, admin_resource)
register_servlets_for_media_repo(self, admin_resource)
resources.update( resources.update(
{ {
MEDIA_R0_PREFIX: media_repo, MEDIA_R0_PREFIX: media_repo,
MEDIA_V3_PREFIX: media_repo, MEDIA_V3_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo,
"/_synapse/admin": admin_resource,
} }
) )

View File

@ -54,6 +54,7 @@ from synapse.config.server import ListenerConfig, TCPListenerConfig
from synapse.federation.transport.server import TransportLayerServer from synapse.federation.transport.server import TransportLayerServer
from synapse.http.additional_resource import AdditionalResource from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import ( from synapse.http.server import (
JsonResource,
OptionsResource, OptionsResource,
RootOptionsRedirectResource, RootOptionsRedirectResource,
StaticResource, StaticResource,
@ -61,8 +62,7 @@ from synapse.http.server import (
from synapse.logging.context import LoggingContext from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.rest import ClientRestResource from synapse.rest import ClientRestResource, admin
from synapse.rest.admin import AdminRestResource
from synapse.rest.health import HealthResource from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyResource from synapse.rest.key.v2 import KeyResource
from synapse.rest.synapse.client import build_synapse_client_resource_tree from synapse.rest.synapse.client import build_synapse_client_resource_tree
@ -180,11 +180,14 @@ class SynapseHomeServer(HomeServer):
if compress: if compress:
client_resource = gz_wrap(client_resource) client_resource = gz_wrap(client_resource)
admin_resource = JsonResource(self, canonical_json=False)
admin.register_servlets(self, admin_resource)
resources.update( resources.update(
{ {
CLIENT_API_PREFIX: client_resource, CLIENT_API_PREFIX: client_resource,
"/.well-known": well_known_resource(self), "/.well-known": well_known_resource(self),
"/_synapse/admin": AdminRestResource(self), "/_synapse/admin": admin_resource,
**build_synapse_client_resource_tree(self), **build_synapse_client_resource_tree(self),
} }
) )

View File

@ -36,10 +36,17 @@ class SetPasswordHandler:
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self._auth_handler = hs.get_auth_handler() self._auth_handler = hs.get_auth_handler()
# This can only be instantiated on the main process.
device_handler = hs.get_device_handler() # We don't need the device handler if password changing is disabled.
assert isinstance(device_handler, DeviceHandler) # This allows us to instantiate the SetPasswordHandler on the workers
self._device_handler = device_handler # that have admin APIs for MAS
if self._auth_handler.can_change_password():
# This can only be instantiated on the main process.
device_handler = hs.get_device_handler()
assert isinstance(device_handler, DeviceHandler)
self._device_handler: Optional[DeviceHandler] = device_handler
else:
self._device_handler = None
async def set_password( async def set_password(
self, self,
@ -51,6 +58,9 @@ class SetPasswordHandler:
if not self._auth_handler.can_change_password(): if not self._auth_handler.can_change_password():
raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN) raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN)
# We should have this available only if password changing is enabled.
assert self._device_handler is not None
try: try:
await self.store.user_set_password_hash(user_id, password_hash) await self.store.user_set_password_hash(user_id, password_hash)
except StoreError as e: except StoreError as e:

View File

@ -187,7 +187,6 @@ class ClientRestResource(JsonResource):
mutual_rooms.register_servlets, mutual_rooms.register_servlets,
login_token_request.register_servlets, login_token_request.register_servlets,
rendezvous.register_servlets, rendezvous.register_servlets,
auth_metadata.register_servlets,
]: ]:
continue continue

View File

@ -39,7 +39,7 @@ from typing import TYPE_CHECKING, Optional, Tuple
from synapse.api.errors import Codes, NotFoundError, SynapseError from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.handlers.pagination import PURGE_HISTORY_ACTION_NAME from synapse.handlers.pagination import PURGE_HISTORY_ACTION_NAME
from synapse.http.server import HttpServer, JsonResource from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
@ -51,6 +51,7 @@ from synapse.rest.admin.background_updates import (
from synapse.rest.admin.devices import ( from synapse.rest.admin.devices import (
DeleteDevicesRestServlet, DeleteDevicesRestServlet,
DeviceRestServlet, DeviceRestServlet,
DevicesGetRestServlet,
DevicesRestServlet, DevicesRestServlet,
) )
from synapse.rest.admin.event_reports import ( from synapse.rest.admin.event_reports import (
@ -264,14 +265,6 @@ class PurgeHistoryStatusRestServlet(RestServlet):
######################################################################################## ########################################################################################
class AdminRestResource(JsonResource):
"""The REST resource which gets mounted at /_synapse/admin"""
def __init__(self, hs: "HomeServer"):
JsonResource.__init__(self, hs, canonical_json=False)
register_servlets(hs, self)
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
""" """
Register all the admin servlets. Register all the admin servlets.
@ -280,6 +273,10 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
# Admin servlets below may not work on workers. # Admin servlets below may not work on workers.
if hs.config.worker.worker_app is not None: if hs.config.worker.worker_app is not None:
# Some admin servlets can be mounted on workers when MSC3861 is enabled.
if hs.config.experimental.msc3861.enabled:
register_servlets_for_msc3861_delegation(hs, http_server)
return return
register_servlets_for_client_rest_resource(hs, http_server) register_servlets_for_client_rest_resource(hs, http_server)
@ -367,4 +364,16 @@ def register_servlets_for_client_rest_resource(
ListMediaInRoom(hs).register(http_server) ListMediaInRoom(hs).register(http_server)
# don't add more things here: new servlets should only be exposed on # don't add more things here: new servlets should only be exposed on
# /_synapse/admin so should not go here. Instead register them in AdminRestResource. # /_synapse/admin so should not go here. Instead register them in register_servlets.
def register_servlets_for_msc3861_delegation(
hs: "HomeServer", http_server: HttpServer
) -> None:
"""Register servlets needed by MAS when MSC3861 is enabled"""
assert hs.config.experimental.msc3861.enabled
UserRestServletV2(hs).register(http_server)
UsernameAvailableRestServlet(hs).register(http_server)
UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
DevicesGetRestServlet(hs).register(http_server)

View File

@ -113,18 +113,19 @@ class DeviceRestServlet(RestServlet):
return HTTPStatus.OK, {} return HTTPStatus.OK, {}
class DevicesRestServlet(RestServlet): class DevicesGetRestServlet(RestServlet):
""" """
Retrieve the given user's devices Retrieve the given user's devices
This can be mounted on workers as it is read-only, as opposed
to `DevicesRestServlet`.
""" """
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/devices$", "v2") PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/devices$", "v2")
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth() self.auth = hs.get_auth()
handler = hs.get_device_handler() self.device_worker_handler = hs.get_device_handler()
assert isinstance(handler, DeviceHandler)
self.device_handler = handler
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self.is_mine = hs.is_mine self.is_mine = hs.is_mine
@ -141,9 +142,24 @@ class DevicesRestServlet(RestServlet):
if u is None: if u is None:
raise NotFoundError("Unknown user") raise NotFoundError("Unknown user")
devices = await self.device_handler.get_devices_by_user(target_user.to_string()) devices = await self.device_worker_handler.get_devices_by_user(
target_user.to_string()
)
return HTTPStatus.OK, {"devices": devices, "total": len(devices)} return HTTPStatus.OK, {"devices": devices, "total": len(devices)}
class DevicesRestServlet(DevicesGetRestServlet):
"""
Retrieve the given user's devices
"""
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/devices$", "v2")
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
assert isinstance(self.device_worker_handler, DeviceHandler)
self.device_handler = self.device_worker_handler
async def on_POST( async def on_POST(
self, request: SynapseRequest, user_id: str self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]: ) -> Tuple[int, JsonDict]:

View File

@ -1501,6 +1501,45 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
"delete_old_otks_for_next_user_batch", impl "delete_old_otks_for_next_user_batch", impl
) )
async def allow_master_cross_signing_key_replacement_without_uia(
self, user_id: str, duration_ms: int
) -> Optional[int]:
"""Mark this user's latest master key as being replaceable without UIA.
Said replacement will only be permitted for a short time after calling this
function. That time period is controlled by the duration argument.
Returns:
None, if there is no such key.
Otherwise, the timestamp before which replacement is allowed without UIA.
"""
timestamp = self._clock.time_msec() + duration_ms
def impl(txn: LoggingTransaction) -> Optional[int]:
txn.execute(
"""
UPDATE e2e_cross_signing_keys
SET updatable_without_uia_before_ms = ?
WHERE stream_id = (
SELECT stream_id
FROM e2e_cross_signing_keys
WHERE user_id = ? AND keytype = 'master'
ORDER BY stream_id DESC
LIMIT 1
)
""",
(timestamp, user_id),
)
if txn.rowcount == 0:
return None
return timestamp
return await self.db_pool.runInteraction(
"allow_master_cross_signing_key_replacement_without_uia",
impl,
)
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def __init__( def __init__(
@ -1755,42 +1794,3 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
], ],
desc="add_e2e_signing_key", desc="add_e2e_signing_key",
) )
async def allow_master_cross_signing_key_replacement_without_uia(
self, user_id: str, duration_ms: int
) -> Optional[int]:
"""Mark this user's latest master key as being replaceable without UIA.
Said replacement will only be permitted for a short time after calling this
function. That time period is controlled by the duration argument.
Returns:
None, if there is no such key.
Otherwise, the timestamp before which replacement is allowed without UIA.
"""
timestamp = self._clock.time_msec() + duration_ms
def impl(txn: LoggingTransaction) -> Optional[int]:
txn.execute(
"""
UPDATE e2e_cross_signing_keys
SET updatable_without_uia_before_ms = ?
WHERE stream_id = (
SELECT stream_id
FROM e2e_cross_signing_keys
WHERE user_id = ? AND keytype = 'master'
ORDER BY stream_id DESC
LIMIT 1
)
""",
(timestamp, user_id),
)
if txn.rowcount == 0:
return None
return timestamp
return await self.db_pool.runInteraction(
"allow_master_cross_signing_key_replacement_without_uia",
impl,
)

View File

@ -2105,6 +2105,136 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
func=is_user_approved_txn, func=is_user_approved_txn,
) )
async def set_user_deactivated_status(
self, user_id: str, deactivated: bool
) -> None:
"""Set the `deactivated` property for the provided user to the provided value.
Args:
user_id: The ID of the user to set the status for.
deactivated: The value to set for `deactivated`.
"""
await self.db_pool.runInteraction(
"set_user_deactivated_status",
self.set_user_deactivated_status_txn,
user_id,
deactivated,
)
def set_user_deactivated_status_txn(
self, txn: LoggingTransaction, user_id: str, deactivated: bool
) -> None:
self.db_pool.simple_update_one_txn(
txn=txn,
table="users",
keyvalues={"name": user_id},
updatevalues={"deactivated": 1 if deactivated else 0},
)
self._invalidate_cache_and_stream(
txn, self.get_user_deactivated_status, (user_id,)
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
self._invalidate_cache_and_stream(txn, self.is_guest, (user_id,))
async def set_user_suspended_status(self, user_id: str, suspended: bool) -> None:
"""
Set whether the user's account is suspended in the `users` table.
Args:
user_id: The user ID of the user in question
suspended: True if the user is suspended, false if not
"""
await self.db_pool.runInteraction(
"set_user_suspended_status",
self.set_user_suspended_status_txn,
user_id,
suspended,
)
def set_user_suspended_status_txn(
self, txn: LoggingTransaction, user_id: str, suspended: bool
) -> None:
self.db_pool.simple_update_one_txn(
txn=txn,
table="users",
keyvalues={"name": user_id},
updatevalues={"suspended": suspended},
)
self._invalidate_cache_and_stream(
txn, self.get_user_suspended_status, (user_id,)
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
async def set_user_locked_status(self, user_id: str, locked: bool) -> None:
"""Set the `locked` property for the provided user to the provided value.
Args:
user_id: The ID of the user to set the status for.
locked: The value to set for `locked`.
"""
await self.db_pool.runInteraction(
"set_user_locked_status",
self.set_user_locked_status_txn,
user_id,
locked,
)
def set_user_locked_status_txn(
self, txn: LoggingTransaction, user_id: str, locked: bool
) -> None:
self.db_pool.simple_update_one_txn(
txn=txn,
table="users",
keyvalues={"name": user_id},
updatevalues={"locked": locked},
)
self._invalidate_cache_and_stream(txn, self.get_user_locked_status, (user_id,))
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
async def update_user_approval_status(
self, user_id: UserID, approved: bool
) -> None:
"""Set the user's 'approved' flag to the given value.
The boolean will be turned into an int (in update_user_approval_status_txn)
because the column is a smallint.
Args:
user_id: the user to update the flag for.
approved: the value to set the flag to.
"""
await self.db_pool.runInteraction(
"update_user_approval_status",
self.update_user_approval_status_txn,
user_id.to_string(),
approved,
)
def update_user_approval_status_txn(
self, txn: LoggingTransaction, user_id: str, approved: bool
) -> None:
"""Set the user's 'approved' flag to the given value.
The boolean is turned into an int because the column is a smallint.
Args:
txn: the current database transaction.
user_id: the user to update the flag for.
approved: the value to set the flag to.
"""
self.db_pool.simple_update_one_txn(
txn=txn,
table="users",
keyvalues={"name": user_id},
updatevalues={"approved": approved},
)
# Invalidate the caches of methods that read the value of the 'approved' flag.
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
self._invalidate_cache_and_stream(txn, self.is_user_approved, (user_id,))
class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
def __init__( def __init__(
@ -2217,117 +2347,6 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
return nb_processed return nb_processed
async def set_user_deactivated_status(
self, user_id: str, deactivated: bool
) -> None:
"""Set the `deactivated` property for the provided user to the provided value.
Args:
user_id: The ID of the user to set the status for.
deactivated: The value to set for `deactivated`.
"""
await self.db_pool.runInteraction(
"set_user_deactivated_status",
self.set_user_deactivated_status_txn,
user_id,
deactivated,
)
def set_user_deactivated_status_txn(
self, txn: LoggingTransaction, user_id: str, deactivated: bool
) -> None:
self.db_pool.simple_update_one_txn(
txn=txn,
table="users",
keyvalues={"name": user_id},
updatevalues={"deactivated": 1 if deactivated else 0},
)
self._invalidate_cache_and_stream(
txn, self.get_user_deactivated_status, (user_id,)
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
txn.call_after(self.is_guest.invalidate, (user_id,))
async def set_user_suspended_status(self, user_id: str, suspended: bool) -> None:
"""
Set whether the user's account is suspended in the `users` table.
Args:
user_id: The user ID of the user in question
suspended: True if the user is suspended, false if not
"""
await self.db_pool.runInteraction(
"set_user_suspended_status",
self.set_user_suspended_status_txn,
user_id,
suspended,
)
def set_user_suspended_status_txn(
self, txn: LoggingTransaction, user_id: str, suspended: bool
) -> None:
self.db_pool.simple_update_one_txn(
txn=txn,
table="users",
keyvalues={"name": user_id},
updatevalues={"suspended": suspended},
)
self._invalidate_cache_and_stream(
txn, self.get_user_suspended_status, (user_id,)
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
async def set_user_locked_status(self, user_id: str, locked: bool) -> None:
"""Set the `locked` property for the provided user to the provided value.
Args:
user_id: The ID of the user to set the status for.
locked: The value to set for `locked`.
"""
await self.db_pool.runInteraction(
"set_user_locked_status",
self.set_user_locked_status_txn,
user_id,
locked,
)
def set_user_locked_status_txn(
self, txn: LoggingTransaction, user_id: str, locked: bool
) -> None:
self.db_pool.simple_update_one_txn(
txn=txn,
table="users",
keyvalues={"name": user_id},
updatevalues={"locked": locked},
)
self._invalidate_cache_and_stream(txn, self.get_user_locked_status, (user_id,))
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
def update_user_approval_status_txn(
self, txn: LoggingTransaction, user_id: str, approved: bool
) -> None:
"""Set the user's 'approved' flag to the given value.
The boolean is turned into an int because the column is a smallint.
Args:
txn: the current database transaction.
user_id: the user to update the flag for.
approved: the value to set the flag to.
"""
self.db_pool.simple_update_one_txn(
txn=txn,
table="users",
keyvalues={"name": user_id},
updatevalues={"approved": approved},
)
# Invalidate the caches of methods that read the value of the 'approved' flag.
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
self._invalidate_cache_and_stream(txn, self.is_user_approved, (user_id,))
class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
def __init__( def __init__(
@ -2956,25 +2975,6 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
start_or_continue_validation_session_txn, start_or_continue_validation_session_txn,
) )
async def update_user_approval_status(
self, user_id: UserID, approved: bool
) -> None:
"""Set the user's 'approved' flag to the given value.
The boolean will be turned into an int (in update_user_approval_status_txn)
because the column is a smallint.
Args:
user_id: the user to update the flag for.
approved: the value to set the flag to.
"""
await self.db_pool.runInteraction(
"update_user_approval_status",
self.update_user_approval_status_txn,
user_id.to_string(),
approved,
)
@wrap_as_background_process("delete_expired_login_tokens") @wrap_as_background_process("delete_expired_login_tokens")
async def _delete_expired_login_tokens(self) -> None: async def _delete_expired_login_tokens(self) -> None:
"""Remove login tokens with expiry dates that have passed.""" """Remove login tokens with expiry dates that have passed."""