mirror of
				https://github.com/postgres/postgres.git
				synced 2025-11-04 00:02:52 -05:00 
			
		
		
		
	Commit 0aa8a01d04 extends the output plugin API to allow decoding of prepared xacts and allowed the user to enable/disable the two-phase option via pg_logical_slot_get_changes(). This can lead to a problem such that the first time when it gets changes via pg_logical_slot_get_changes() without two_phase option enabled it will not get the prepared even though prepare is after consistent snapshot. Now next time during getting changes, if the two_phase option is enabled it can skip prepare because by that time start decoding point has been moved. So the user will only get commit prepared. Allow to enable/disable this option at the create slot time and default will be false. It will break the existing slots which is fine in a major release. Author: Ajin Cherian Reviewed-by: Amit Kapila and Vignesh C Discussion: https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com
		
			
				
	
	
		
			126 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			126 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
-- Test streaming of two-phase commits
 | 
						|
SET synchronous_commit = on;
 | 
						|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
 | 
						|
 ?column? 
 | 
						|
----------
 | 
						|
 init
 | 
						|
(1 row)
 | 
						|
 | 
						|
CREATE TABLE stream_test(data text);
 | 
						|
-- consume DDL
 | 
						|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 | 
						|
 data 
 | 
						|
------
 | 
						|
(0 rows)
 | 
						|
 | 
						|
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
 | 
						|
BEGIN;
 | 
						|
SAVEPOINT s1;
 | 
						|
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
 | 
						|
 ?column? 
 | 
						|
----------
 | 
						|
 msg5
 | 
						|
(1 row)
 | 
						|
 | 
						|
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
 | 
						|
TRUNCATE table stream_test;
 | 
						|
ROLLBACK TO s1;
 | 
						|
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
 | 
						|
PREPARE TRANSACTION 'test1';
 | 
						|
-- should show the inserts after a ROLLBACK
 | 
						|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 | 
						|
                           data                           
 | 
						|
----------------------------------------------------------
 | 
						|
 streaming message: transactional: 1 prefix: test, sz: 50
 | 
						|
 opening a streamed block for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 streaming change for transaction
 | 
						|
 closing a streamed block for transaction
 | 
						|
 preparing streamed transaction 'test1'
 | 
						|
(24 rows)
 | 
						|
 | 
						|
COMMIT PREPARED 'test1';
 | 
						|
--should show the COMMIT PREPARED and the other changes in the transaction
 | 
						|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 | 
						|
          data           
 | 
						|
-------------------------
 | 
						|
 COMMIT PREPARED 'test1'
 | 
						|
(1 row)
 | 
						|
 | 
						|
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
 | 
						|
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
 | 
						|
BEGIN;
 | 
						|
SAVEPOINT s1;
 | 
						|
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
 | 
						|
 ?column? 
 | 
						|
----------
 | 
						|
 msg5
 | 
						|
(1 row)
 | 
						|
 | 
						|
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
 | 
						|
TRUNCATE table stream_test;
 | 
						|
ROLLBACK to s1;
 | 
						|
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
 | 
						|
PREPARE TRANSACTION 'test1_nodecode';
 | 
						|
-- should NOT show inserts after a ROLLBACK
 | 
						|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 | 
						|
                           data                           
 | 
						|
----------------------------------------------------------
 | 
						|
 streaming message: transactional: 1 prefix: test, sz: 50
 | 
						|
(1 row)
 | 
						|
 | 
						|
COMMIT PREPARED 'test1_nodecode';
 | 
						|
-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
 | 
						|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 | 
						|
                            data                             
 | 
						|
-------------------------------------------------------------
 | 
						|
 BEGIN
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
 | 
						|
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
 | 
						|
 COMMIT
 | 
						|
(22 rows)
 | 
						|
 | 
						|
DROP TABLE stream_test;
 | 
						|
SELECT pg_drop_replication_slot('regression_slot');
 | 
						|
 pg_drop_replication_slot 
 | 
						|
--------------------------
 | 
						|
 
 | 
						|
(1 row)
 | 
						|
 |