mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-26 00:02:18 -04:00 
			
		
		
		
	The logical decoding functions do BeginInternalSubTransaction and RollbackAndReleaseCurrentSubTransaction to clean up after themselves. It turns out that AtEOSubXact_SPI has an unrecognized assumption that we always need to cancel the active SPI operation in the SPI context that surrounds the subtransaction (if there is one). That's true when the RollbackAndReleaseCurrentSubTransaction call is coming from the SPI-using function itself, but not when it's happening inside some unrelated function invoked by a SPI query. In practice the affected callers are the various PLs. To fix, record the current subtransaction ID when we begin a SPI operation, and clean up only if that ID is the subtransaction being canceled. Also, remove AtEOSubXact_SPI's assertion that it must have cleaned up the surrounding SPI context's active tuptable. That's proven wrong by the same test case. Also clarify (or, if you prefer, reinterpret) the calling conventions for _SPI_begin_call and _SPI_end_call. The memory context cleanup in the latter means that these have always had the flavor of a matched resource-management pair, but they weren't documented that way before. Per report from Ben Chobot. Back-patch to 9.4 where logical decoding came in. In principle, the SPI changes should go all the way back, since the problem dates back to commit 7ec1c5a86. But given the lack of field complaints it seems few people are using internal subtransactions in this way. So I don't feel a need to take any risks in 9.2/9.3. Discussion: https://postgr.es/m/73FBA179-C68C-4540-9473-71E865408B15@silentmedia.com
		
			
				
	
	
		
			112 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			112 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
| -- test that we can insert the result of a get_changes call into a
 | |
| -- logged relation. That's really not a good idea in practical terms,
 | |
| -- but provides a nice test.
 | |
| -- predictability
 | |
| SET synchronous_commit = on;
 | |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 | |
|  ?column? 
 | |
| ----------
 | |
|  init
 | |
| (1 row)
 | |
| 
 | |
| -- slot works
 | |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 | |
|  data 
 | |
| ------
 | |
| (0 rows)
 | |
| 
 | |
| -- create some changes
 | |
| CREATE TABLE somechange(id serial primary key);
 | |
| INSERT INTO somechange DEFAULT VALUES;
 | |
| CREATE TABLE changeresult AS
 | |
|     SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 | |
| SELECT * FROM changeresult;
 | |
|                       data                      
 | |
| ------------------------------------------------
 | |
|  BEGIN
 | |
|  table public.somechange: INSERT: id[integer]:1
 | |
|  COMMIT
 | |
| (3 rows)
 | |
| 
 | |
| INSERT INTO changeresult
 | |
|     SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 | |
| INSERT INTO changeresult
 | |
|     SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 | |
| SELECT * FROM changeresult;
 | |
|                                                                        data                                                                       
 | |
| --------------------------------------------------------------------------------------------------------------------------------------------------
 | |
|  BEGIN
 | |
|  table public.somechange: INSERT: id[integer]:1
 | |
|  COMMIT
 | |
|  BEGIN
 | |
|  table public.changeresult: INSERT: data[text]:'BEGIN'
 | |
|  table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1'
 | |
|  table public.changeresult: INSERT: data[text]:'COMMIT'
 | |
|  COMMIT
 | |
|  BEGIN
 | |
|  table public.changeresult: INSERT: data[text]:'BEGIN'
 | |
|  table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1'
 | |
|  table public.changeresult: INSERT: data[text]:'COMMIT'
 | |
|  COMMIT
 | |
|  BEGIN
 | |
|  table public.changeresult: INSERT: data[text]:'BEGIN'
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
 | |
|  table public.changeresult: INSERT: data[text]:'COMMIT'
 | |
|  COMMIT
 | |
| (20 rows)
 | |
| 
 | |
| DROP TABLE changeresult;
 | |
| DROP TABLE somechange;
 | |
| -- check calling logical decoding from pl/pgsql
 | |
| CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$
 | |
| BEGIN
 | |
|   RETURN QUERY
 | |
|     SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 | |
| END$$ LANGUAGE plpgsql;
 | |
| SELECT * FROM slot_changes_wrapper('regression_slot');
 | |
|                                                                                           slot_changes_wrapper                                                                                          
 | |
| --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 | |
|  BEGIN
 | |
|  table public.changeresult: INSERT: data[text]:'BEGIN'
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
 | |
|  table public.changeresult: INSERT: data[text]:'COMMIT'
 | |
|  table public.changeresult: INSERT: data[text]:'BEGIN'
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN'''''''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1'''''''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT'''''''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
 | |
|  table public.changeresult: INSERT: data[text]:'COMMIT'
 | |
|  COMMIT
 | |
| (14 rows)
 | |
| 
 | |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 | |
|                                                                                                   data                                                                                                  
 | |
| --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 | |
|  BEGIN
 | |
|  table public.changeresult: INSERT: data[text]:'BEGIN'
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
 | |
|  table public.changeresult: INSERT: data[text]:'COMMIT'
 | |
|  table public.changeresult: INSERT: data[text]:'BEGIN'
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN'''''''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1'''''''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT'''''''
 | |
|  table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
 | |
|  table public.changeresult: INSERT: data[text]:'COMMIT'
 | |
|  COMMIT
 | |
| (14 rows)
 | |
| 
 | |
| SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
 | |
|  ?column? 
 | |
| ----------
 | |
|  stop
 | |
| (1 row)
 | |
| 
 |