Compare commits

...

7 Commits

Author SHA1 Message Date
reivilibre
29c6e393de
Merge d76ae3969534ea7f00da19ad10209ad6f80503bb into 1c093509ceb04ee8ce0eb6a408b76b0fda3ac87c 2025-10-02 17:03:15 +01:00
Eric Eastwood
1c093509ce
Switch task scheduler from raw logcontext manipulation (set_current_context) to utils (PreserveLoggingContext) (#18990)
Prefer the utils over raw logcontext manipulation.

Spawning from adding some logcontext debug logs in
https://github.com/element-hq/synapse/pull/18966 and since we're not
logging at the `set_current_context(...)` level (see reasoning there),
this removes some usage of `set_current_context(...)`.
2025-10-02 10:22:25 -05:00
Olivier 'reivilibre
d76ae39695 Newsfile
Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org>
2025-09-19 17:49:20 +01:00
Olivier 'reivilibre
de3d453d82 Streams: add cheatsheet for adding a new one 2025-09-19 17:47:58 +01:00
Olivier 'reivilibre
164ebd3203 Streams: fix header levels 2025-09-19 17:47:47 +01:00
Olivier 'reivilibre
7f4f178501 Improve call_after documentation 2025-09-19 17:35:51 +01:00
Olivier 'reivilibre
da12f9535f Improve MultiWriterIdGenerator documentation 2025-09-19 17:35:44 +01:00
6 changed files with 59 additions and 15 deletions

1
changelog.d/18943.doc Normal file
View File

@ -0,0 +1 @@
Improve documentation around streams, particularly ID generators and adding new streams.

1
changelog.d/18990.misc Normal file
View File

@ -0,0 +1 @@
Switch task scheduler from raw logcontext manipulation to using the dedicated logcontext utils.

View File

@ -1,4 +1,4 @@
## Streams
# Streams
Synapse has a concept of "streams", which are roughly described in [`id_generators.py`](
https://github.com/element-hq/synapse/blob/develop/synapse/storage/util/id_generators.py
@ -19,7 +19,7 @@ To that end, let's describe streams formally, paraphrasing from the docstring of
https://github.com/element-hq/synapse/blob/a719b703d9bd0dade2565ddcad0e2f3a7a9d4c37/synapse/storage/util/id_generators.py#L96
).
### Definition
## Definition
A stream is an append-only log `T1, T2, ..., Tn, ...` of facts[^1] which grows over time.
Only "writers" can add facts to a stream, and there may be multiple writers.
@ -47,7 +47,7 @@ But unhappy cases (e.g. transaction rollback due to an error) also count as comp
Once completed, the rows written with that stream ID are fixed, and no new rows
will be inserted with that ID.
### Current stream ID
## Current stream ID
For any given stream reader (including writers themselves), we may define a per-writer current stream ID:
@ -93,7 +93,7 @@ Consider a single-writer stream which is initially at ID 1.
| Complete 6 | 6 | |
### Multi-writer streams
## Multi-writer streams
There are two ways to view a multi-writer stream.
@ -115,7 +115,7 @@ The facts this stream holds are instructions to "you should now invalidate these
We only ever treat this as a multiple single-writer streams as there is no important ordering between cache invalidations.
(Invalidations are self-contained facts; and the invalidations commute/are idempotent).
### Writing to streams
## Writing to streams
Writers need to track:
- track their current position (i.e. its own per-writer stream ID).
@ -133,7 +133,7 @@ To complete a fact, first remove it from your map of facts currently awaiting co
Then, if no earlier fact is awaiting completion, the writer can advance its current position in that stream.
Upon doing so it should emit an `RDATA` message[^3], once for every fact between the old and the new stream ID.
### Subscribing to streams
## Subscribing to streams
Readers need to track the current position of every writer.
@ -146,10 +146,44 @@ The `RDATA` itself is not a self-contained representation of the fact;
readers will have to query the stream tables for the full details.
Readers must also advance their record of the writer's current position for that stream.
# Summary
## Summary
In a nutshell: we have an append-only log with a "buffer/scratchpad" at the end where we have to wait for the sequence to be linear and contiguous.
---
## Cheatsheet for creating a new stream
These rough notes and links may help you to create a new stream and add all the
necessary registration and event handling.
**Create your stream:**
- [create a stream class and stream row class](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/_base.py#L728)
- will need an [ID generator](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L75)
- may need [writer configuration](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/config/workers.py#L177), if there isn't already an obvious source of configuration for which workers should be designated as writers to your new stream.
- if adding new writer configuration, add Docker-worker configuration, which lets us configure the writer worker in Complement tests: [[1]](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L331), [[2]](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L440)
- most of the time, you will likely introduce a new datastore class for the concept represented by the new stream, unless there is already an obvious datastore that covers it.
- consider whether it may make sense to introduce a handler
**Register your stream in:**
- [`STREAMS_MAP`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/__init__.py#L71)
**Advance your stream in:**
- [`process_replication_position` of your appropriate datastore](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L111)
- don't forget the super call
**If you're going to do any caching that needs invalidation from new rows:**
- add invalidations to [`process_replication_rows` of your appropriate datastore](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L91)
- don't forget the super call
- add local-only [invalidations to your writer transactions](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L201)
**For streams to be used in sync:**
- add a new field to [`StreamToken`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L1003)
- add a new [`StreamKeyType`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L999)
- add appropriate wake-up rules
- in [`on_rdata`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/client.py#L260)
- locally on the same worker when completing a write, [e.g. in your handler](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/handlers/thread_subscriptions.py#L139)
- add the stream in [`bound_future_token`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/streams/events.py#L127)
---

View File

@ -327,7 +327,7 @@ class LoggingTransaction:
self, callback: Callable[P, object], *args: P.args, **kwargs: P.kwargs
) -> None:
"""Call the given callback on the main twisted thread after the transaction has
finished.
finished successfully.
Mostly used to invalidate the caches on the correct thread.
@ -348,7 +348,7 @@ class LoggingTransaction:
self, callback: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs
) -> None:
"""Call the given asynchronous callback on the main twisted thread after
the transaction has finished (but before those added in `call_after`).
the transaction has finished successfully (but before those added in `call_after`).
Mostly used to invalidate remote caches after transactions.

View File

@ -182,7 +182,8 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
Uses a Postgres sequence to coordinate ID assignment, but positions of other
writers will only get updated when `advance` is called (by replication).
Note: Only works with Postgres.
On SQLite, falls back to a single-writer implementation, which is fine because
Synapse only supports monolith mode when SQLite is the database driver.
Warning: Streams using this generator start at ID 2, because ID 1 is always assumed
to have been 'seen as persisted'.
@ -543,6 +544,16 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
def get_next_txn(self, txn: LoggingTransaction) -> int:
"""
Generate an ID for immediate use within a database transaction.
The ID will automatically be marked as finished at the end of the
database transaction, therefore the stream rows MUST be persisted
within the active transaction (MUST NOT be persisted in a later
transaction).
The replication notifier will automatically be notified when the
transaction ends successfully.
Usage:
stream_id = stream_id_gen.get_next_txn(txn)

View File

@ -27,8 +27,8 @@ from twisted.python.failure import Failure
from synapse.logging.context import (
ContextResourceUsage,
LoggingContext,
PreserveLoggingContext,
nested_logging_context,
set_current_context,
)
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.metrics.background_process_metrics import (
@ -422,14 +422,11 @@ class TaskScheduler:
"""
current_time = self._clock.time()
calling_context = set_current_context(task_log_context)
try:
with PreserveLoggingContext(task_log_context):
usage = task_log_context.get_resource_usage()
TaskScheduler._log_task_usage(
"continuing", task, usage, current_time - start_time
)
finally:
set_current_context(calling_context)
async def wrapper() -> None:
with nested_logging_context(task.id) as log_context: