diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 1cf53c74ea6..186edaffd5a 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -11701,22 +11701,35 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1) - Allows streaming or serializing changes immediately in logical decoding. - The allowed values of logical_replication_mode are - buffered and immediate. When set - to immediate, stream each change if + The allowed values are buffered and + immediate. The default is buffered. + This parameter is intended to be used to test logical decoding and + replication of large transactions. The effect of + logical_replication_mode is different for the + publisher and subscriber: + + + + On the publisher side, logical_replication_mode + allows streaming or serializing changes immediately in logical decoding. + When set to immediate, stream each change if the streaming option (see optional parameters set by CREATE SUBSCRIPTION) is enabled, otherwise, serialize each change. When set to - buffered, which is the default, decoding will stream - or serialize changes when logical_decoding_work_mem - is reached. + buffered, the decoding will stream or serialize + changes when logical_decoding_work_mem is reached. + - This parameter is intended to be used to test logical decoding and - replication of large transactions for which otherwise we need to - generate the changes till logical_decoding_work_mem - is reached. + On the subscriber side, if the streaming option is set to + parallel, logical_replication_mode + can be used to direct the leader apply worker to send changes to the + shared memory queue or to serialize all changes to the file. When set to + buffered, the leader sends changes to parallel apply + workers via a shared memory queue. When set to + immediate, the leader serializes all changes to files + and notifies the parallel apply workers to read and apply them at the + end of the transaction. 
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 3579e704fe5..e670ec617a4 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -1149,6 +1149,13 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) Assert(!IsTransactionState()); Assert(!winfo->serialize_changes); + /* + * We don't try to send data to parallel worker for 'immediate' mode. This + * is primarily used for testing purposes. + */ + if (unlikely(logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE)) + return false; + /* * This timeout is a bit arbitrary but testing revealed that it is sufficient * to send the message unless the parallel apply worker is waiting on some @@ -1187,12 +1194,7 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) startTime = GetCurrentTimestamp(); else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(), SHM_SEND_TIMEOUT_MS)) - { - ereport(LOG, - (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file", - winfo->shared->xid))); return false; - } } } @@ -1206,6 +1208,10 @@ void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked) { + ereport(LOG, + (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file", + winfo->shared->xid))); + /* * The parallel apply worker could be stuck for some reason (say waiting * on some lock by other backend), so stop trying to send data directly to diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index c5a95f5dcca..b46e3b8c558 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -4920,8 +4920,10 @@ struct config_enum ConfigureNamesEnum[] = { {"logical_replication_mode", PGC_USERSET, DEVELOPER_OPTIONS, - 
gettext_noop("Controls when to replicate each change."), - gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding."), + gettext_noop("Controls when to replicate or apply each change."), + gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding. " + "On the subscriber, it allows serialization of all changes to files and notifies the " + "parallel apply workers to read and apply them at the end of the transaction."), GUC_NOT_IN_SAMPLE }, &logical_replication_mode, diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index 91e8aa8c0a5..0e0f27f14df 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -312,6 +312,34 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); is($result, qq(10000), 'data replicated to subscriber after dropping index'); +# Test serializing changes to files and notifying the parallel apply worker to +# apply them at the end of the transaction. +$node_subscriber->append_conf('postgresql.conf', + 'logical_replication_mode = immediate'); +# Reset the log_min_messages to default. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)"); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? 
logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(15000), 'parallel apply worker replayed all changes from file'); + $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl index 814daf4d2f9..2b67ae1e0ac 100644 --- a/src/test/subscription/t/018_stream_subxact_abort.pl +++ b/src/test/subscription/t/018_stream_subxact_abort.pl @@ -143,15 +143,17 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b varchar)"); $node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); # Setup structure on subscriber $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); # Setup logical replication my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2"); my $appname = 'tap_sub'; @@ -198,6 +200,63 @@ $node_subscriber->safe_psql('postgres', q{SELECT 1}); test_streaming($node_publisher, $node_subscriber, $appname, 1); +# Test serializing changes to files and notifying the parallel apply worker to +# apply them at the end of the transaction. +$node_subscriber->append_conf('postgresql.conf', + 'logical_replication_mode = immediate'); +# Reset the log_min_messages to default. 
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +my $offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 values(1); + ROLLBACK; + }); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is aborted on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(0), 'check rollback was reflected on subscriber'); + +# Serialize the ABORT sub-transaction. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 values(1); + SAVEPOINT sp; + INSERT INTO test_tab_2 values(1); + ROLLBACK TO sp; + COMMIT; + }); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that only sub-transaction is aborted on subscriber. 
+$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(1), 'check rollback to savepoint was reflected on subscriber'); + $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl index 497245a209c..1cc871fddbd 100644 --- a/src/test/subscription/t/023_twophase_stream.pl +++ b/src/test/subscription/t/023_twophase_stream.pl @@ -319,16 +319,18 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b varchar)"); $node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); # Setup structure on subscriber (columns a and b are compatible with same table name on publisher) $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" ); +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); # Setup logical replication (streaming = on) my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2"); my $appname = 'tap_sub'; @@ -384,6 +386,48 @@ $node_subscriber->safe_psql('postgres', q{SELECT 1}); test_streaming($node_publisher, $node_subscriber, $appname, 1); +# Test serializing changes to files and notifying the parallel apply worker to +# apply them at the end of the transaction. +$node_subscriber->append_conf('postgresql.conf', + 'logical_replication_mode = immediate'); +# Reset the log_min_messages to default. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. 
+$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +my $offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 values(1); + PREPARE TRANSACTION 'xact'; + }); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# Check that 2PC gets committed on subscriber +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'xact';"); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(1), 'transaction is committed on subscriber'); + ############################### # check all the cleanup ###############################