mirror of
https://github.com/postgres/postgres.git
synced 2025-10-24 00:03:18 -04:00
When implementing a replication solution on top of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consists of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in an efficient and crash-safe manner. 3) The ability to filter out changes performed at the behest of a replication origin during logical decoding; this allows complex replication topologies, e.g. by filtering out all replayed changes. Most of this could also be implemented in "userspace", e.g. by inserting additional rows containing origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there are only 2^16 different origins; but now they integrate with logical decoding. Additionally, more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add), changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and WAL page magic.
Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
65 lines
2.8 KiB
PL/PgSQL
65 lines
2.8 KiB
PL/PgSQL
-- predictability: run the whole script with synchronous commit switched on
-- NOTE(review): presumably this keeps commit/flush-dependent output stable
-- between runs -- confirm
|
|
SET synchronous_commit = on;
|
|
|
|
-- origin_tbl receives the "original" local changes made by this script
CREATE TABLE origin_tbl(id serial primary key, data text);
|
|
-- target_tbl receives the decoded change text read back from the logical slot
CREATE TABLE target_tbl(id serial primary key, data text);
|
|
|
|
-- register the replication origin that the rest of the script sets up, queries
-- and finally drops
SELECT pg_replication_origin_create('test_decoding: regression_slot');
|
|
-- ensure duplicate creations fail: the same origin name is created a second time
|
|
SELECT pg_replication_origin_create('test_decoding: regression_slot');
|
|
|
|
-- ensure deletions work (once): create a throwaway origin and drop it twice
-- NOTE(review): going by "(once)", only the first drop is expected to succeed
-- and the second to report an error -- confirm against the expected output
|
|
SELECT pg_replication_origin_create('test_decoding: temp');
|
|
SELECT pg_replication_origin_drop('test_decoding: temp');
|
|
SELECT pg_replication_origin_drop('test_decoding: temp');
|
|
|
|
-- create the logical slot 'regression_slot' with the test_decoding output
-- plugin; only the literal 'init' is selected, not the function's own result
-- columns (presumably because those vary between runs -- confirm)
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
|
|
|
|
-- origin tx: a plain local change, made before any replication origin is set
-- up for this session
|
|
INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
|
|
-- "replay" step: consume the changes pending on the slot and store their
-- decoded text in target_tbl
INSERT INTO target_tbl(data)
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
|
|
|
-- as is normal, the insert into target_tbl shows up: the 'only-local' option
-- is not passed in this call, so nothing is filtered by origin
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
|
|
|
-- second local change; as its text says, it is meant to be replayed once but
-- the replayed copy must not be decoded again
INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
|
|
|
|
-- mark session as replaying: associate this session with the
-- 'test_decoding: regression_slot' origin
|
|
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
|
|
|
|
-- ensure we prevent duplicate setup: the identical call is repeated while the
-- session is still set up from the call before
|
|
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
|
|
|
|
-- replay transaction: apply the pending slot changes to target_tbl inside an
-- explicit transaction that is tagged with origin information
BEGIN;
|
|
-- setup transaction origin
-- NOTE(review): the two arguments look like the remote LSN and remote commit
-- timestamp of the transaction being replayed -- confirm against the
-- pg_replication_origin_xact_setup documentation
|
|
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
|
|
-- unlike the earlier reads from the slot, this one also passes 'only-local'
INSERT INTO target_tbl(data)
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
|
|
COMMIT;
|
|
|
|
-- check replication progress for the session is correct; the function is
-- called once with false and once with true
-- NOTE(review): the boolean is presumably the "flush" flag -- confirm its
-- semantics in the PostgreSQL documentation
|
|
SELECT pg_replication_origin_session_progress(false);
|
|
SELECT pg_replication_origin_session_progress(true);
|
|
|
|
-- undo the session-level origin setup done earlier in the script
SELECT pg_replication_origin_session_reset();
|
|
|
|
-- list every row of pg_replication_origin_status; local_lsn is only compared
-- against '0/0' instead of being printed
-- (presumably because its exact value varies between runs -- confirm)
SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
|
|
|
|
-- check replication progress identified by name is correct: same check as the
-- session-level one, but the origin is passed explicitly by name, again with
-- the boolean argument set to false and then true
|
|
SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
|
|
SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
|
|
|
|
-- ensure reset requires previously setup state: the session origin has already
-- been reset earlier in the script, so this call runs with nothing set up
|
|
SELECT pg_replication_origin_session_reset();
|
|
|
|
-- and magically the replayed xact will be filtered!
-- the slot is read with 'only-local' set, so the transaction that was replayed
-- under the replication origin is expected not to appear in the output
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
|
|
|
|
-- but new original changes still show up: this insert runs after the session
-- origin was reset, and the slot is read with 'only-local' again
|
|
INSERT INTO origin_tbl(data) VALUES ('will be replicated');
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
|
|
|
|
-- cleanup: drop the logical slot and the replication origin created by this
-- script
SELECT pg_drop_replication_slot('regression_slot');
|
|
SELECT pg_replication_origin_drop('test_decoding: regression_slot');
|