Add optional pid parameter to pg_replication_origin_session_setup().

Commit 216a784829c introduced parallel apply workers, allowing multiple
processes to share a replication origin. To support this,
replorigin_session_setup() was extended to accept a pid argument
identifying the process using the origin.

This commit exposes that capability through the SQL interface function
pg_replication_origin_session_setup() by adding an optional pid parameter.
This enables multiple processes to coordinate replication using the same
origin when using SQL-level replication functions.

This change allows the non-builtin logical replication solutions to
implement parallel apply for large transactions.

Additionally, an existing internal error was made user-facing, as it can
now be triggered via the exposed SQL API.

Author: Doruk Yilmaz <doruk@mixrank.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Discussion: https://postgr.es/m/CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com
Discussion: https://postgr.es/m/CAE2gYzyTSNvHY1+iWUwykaLETSuAZsCWyryokjP6rG46ZvRgQA@mail.gmail.com
This commit is contained in:
Amit Kapila 2025-09-19 05:38:40 +00:00
parent 8aac5923a3
commit 5b148706c5
11 changed files with 193 additions and 10 deletions

View File

@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot slot_creation_error catalog_change_snapshot \
skip_snapshot_restore invalidation_distribution
skip_snapshot_restore invalidation_distribution parallel_session_origin
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf

View File

@ -0,0 +1,79 @@
Parsed test spec with 2 sessions
starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
step s0_setup: SELECT pg_replication_origin_session_setup('origin');
pg_replication_origin_session_setup
-----------------------------------
(1 row)
step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
pg_replication_origin_session_is_setup
--------------------------------------
t
(1 row)
step s1_setup:
SELECT pg_replication_origin_session_setup('origin', pid)
FROM pg_stat_activity
WHERE application_name = 'isolation/parallel_session_origin/s0';
pg_replication_origin_session_setup
-----------------------------------
(1 row)
step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
pg_replication_origin_session_is_setup
--------------------------------------
t
(1 row)
step s0_add_message:
SELECT 1
FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
?column?
--------
1
(1 row)
step s0_store_lsn:
INSERT INTO local_lsn_store
SELECT 0, local_lsn FROM pg_replication_origin_status;
step s1_add_message:
SELECT 1
FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
?column?
--------
1
(1 row)
step s1_store_lsn:
INSERT INTO local_lsn_store
SELECT 1, local_lsn FROM pg_replication_origin_status;
step s0_compare:
SELECT s0.lsn < s1.lsn
FROM local_lsn_store as s0, local_lsn_store as s1
WHERE s0.session = 0 AND s1.session = 1;
?column?
--------
t
(1 row)
step s0_reset: SELECT pg_replication_origin_session_reset();
pg_replication_origin_session_reset
-----------------------------------
(1 row)
step s1_reset: SELECT pg_replication_origin_session_reset();
pg_replication_origin_session_reset
-----------------------------------
(1 row)

View File

@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
DETAIL: Key (roname)=(regress_test_decoding: regression_slot) already exists.
-- ensure inactive origin cannot be set as session one if pid is specified
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
ERROR: cannot use PID -1 for inactive replication origin with ID 1
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
pg_replication_origin_create

View File

@ -64,6 +64,7 @@ tests += {
'slot_creation_error',
'skip_snapshot_restore',
'invalidation_distribution',
'parallel_session_origin',
],
'regress_args': [
'--temp-config', files('logical.conf'),

View File

@ -0,0 +1,56 @@
# Test parallel replication origin manipulations; ensure local_lsn can be
# updated by all attached sessions.
setup
{
SELECT pg_replication_origin_create('origin');
CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn);
}
teardown
{
SELECT pg_replication_origin_drop('origin');
DROP TABLE local_lsn_store;
}
session "s0"
setup { SET synchronous_commit = on; }
step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); }
step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
step "s0_add_message" {
SELECT 1
FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
}
step "s0_store_lsn" {
INSERT INTO local_lsn_store
SELECT 0, local_lsn FROM pg_replication_origin_status;
}
step "s0_compare" {
SELECT s0.lsn < s1.lsn
FROM local_lsn_store as s0, local_lsn_store as s1
WHERE s0.session = 0 AND s1.session = 1;
}
step "s0_reset" { SELECT pg_replication_origin_session_reset(); }
session "s1"
setup { SET synchronous_commit = on; }
step "s1_setup" {
SELECT pg_replication_origin_session_setup('origin', pid)
FROM pg_stat_activity
WHERE application_name = 'isolation/parallel_session_origin/s0';
}
step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
step "s1_add_message" {
SELECT 1
FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
}
step "s1_store_lsn" {
INSERT INTO local_lsn_store
SELECT 1, local_lsn FROM pg_replication_origin_status;
}
step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
# Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
# commits a transaction and store the local_lsn of the replication origin.
# Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"

View File

@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
-- ensure duplicate creations fail
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
-- ensure inactive origin cannot be set as session one if pid is specified
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
SELECT pg_replication_origin_drop('regress_test_decoding: temp');

View File

@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<indexterm>
<primary>pg_replication_origin_session_setup</primary>
</indexterm>
<function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
<function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
<returnvalue>void</returnvalue>
</para>
<para>
@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
origin, allowing replay progress to be tracked.
Can only be used if no origin is currently selected.
Use <function>pg_replication_origin_session_reset</function> to undo.
</para></entry>
If multiple processes can safely use the same replication origin (for
example, parallel apply processes), the optional <parameter>pid</parameter>
parameter can be used to specify the process ID of the first process.
The first process must provide <parameter>pid</parameter> equals to
<literal>0</literal> and the other processes that share the same
replication origin should provide the process ID of the first process.
</para>
<caution>
<para>
When multiple processes share the same replication origin, it is critical
to maintain commit order to prevent data inconsistency. While processes
may send operations out of order, they must commit transactions in the
correct sequence to ensure proper replication consistency. The recommended workflow
for each worker is: set up the replication origin session with the first process's PID,
apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
with the LSN and commit timestamp before committing, then commit the
transaction only if everything succeeded.
</para>
</caution>
</entry>
</row>
<row>

View File

@ -650,6 +650,13 @@ LANGUAGE INTERNAL
CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
AS 'pg_stat_reset_slru';
CREATE OR REPLACE FUNCTION
pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
RETURNS void
LANGUAGE INTERNAL
STRICT VOLATILE PARALLEL UNSAFE
AS 'pg_replication_origin_session_setup';
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;

View File

@ -1167,6 +1167,14 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
curstate->roident, curstate->acquired_by)));
}
else if (curstate->acquired_by != acquired_by)
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
node, acquired_by)));
}
/* ok, found slot */
session_replication_state = curstate;
break;
@ -1181,6 +1189,12 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
errhint("Increase \"max_active_replication_origins\" and try again.")));
else if (session_replication_state == NULL)
{
if (acquired_by)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use PID %d for inactive replication origin with ID %d",
acquired_by, node)));
/* initialize new slot */
session_replication_state = &replication_states[free_slot];
Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
@ -1193,9 +1207,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
if (acquired_by == 0)
session_replication_state->acquired_by = MyProcPid;
else if (session_replication_state->acquired_by != acquired_by)
elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
node, acquired_by);
else
Assert(session_replication_state->acquired_by == acquired_by);
LWLockRelease(ReplicationOriginLock);
@ -1374,12 +1387,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
int pid;
replorigin_check_prerequisites(true, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
replorigin_session_setup(origin, 0);
pid = PG_GETARG_INT32(1);
replorigin_session_setup(origin, pid);
replorigin_session_origin = origin;

View File

@ -57,6 +57,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202509091
#define CATALOG_VERSION_NO 202509191
#endif

View File

@ -12235,7 +12235,7 @@
{ oid => '6006',
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
proparallel => 'u', prorettype => 'void', proargtypes => 'text',
proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
{ oid => '6007', descr => 'teardown configured replication progress tracking',