Fix xmin advancement during fast_forward decoding.

During logical decoding, we advance catalog_xmin of logical slots too early in
fast_forward mode, resulting in required catalog data being removed by
vacuum. This mode is normally used to advance the slot without processing
the changes, but we still can't let the slot's xmin advance to an
incorrect value.

Commit f49a80c481 fixed a similar issue where the logical slot's
catalog_xmin was getting advanced prematurely during non-fast-forward
mode. During xl_running_xacts processing, instead of directly advancing
the slot's xmin to the oldest running xid in the record, it allowed the
xmin to be held back for snapshots that can be used for
not-yet-replayed transactions, as those might consider older txns as
running too. However, it missed the fact that the same problem can happen
during fast_forward mode decoding, as we won't build a base snapshot in
that mode, and the future call to get_changes from the same slot can miss
seeing the required catalog changes, leading to incorrect results.

This commit allows building the base snapshot even in fast_forward mode to
prevent the early advancement of xmin.

Reported-by: Amit Kapila <amit.kapila16@gmail.com>
Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Backpatch-through: 13
Discussion: https://postgr.es/m/CAA4eK1LqWncUOqKijiafe+Ypt1gQAQRjctKLMY953J79xDBgAg@mail.gmail.com
Discussion: https://postgr.es/m/OS0PR01MB57163087F86621D44D9A72BF94BB2@OS0PR01MB5716.jpnprd01.prod.outlook.com
This commit is contained in:
Amit Kapila 2025-04-28 11:55:00 +05:30
parent 4164d69763
commit d65485b02b
3 changed files with 71 additions and 13 deletions

View File

@ -38,3 +38,44 @@ COMMIT
stop
(1 row)
starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit s0_checkpoint s0_advance_slot s0_advance_slot s1_commit s0_vacuum s0_get_changes
step s0_begin: BEGIN;
step s0_getxid: SELECT pg_current_xact_id() IS NULL;
?column?
--------
f
(1 row)
step s1_begin: BEGIN;
step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3));
step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos;
step s0_commit: COMMIT;
step s0_checkpoint: CHECKPOINT;
step s0_advance_slot: SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn());
slot_name
--------------
isolation_slot
(1 row)
step s0_advance_slot: SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn());
slot_name
--------------
isolation_slot
(1 row)
step s1_commit: COMMIT;
step s0_vacuum: VACUUM pg_attribute;
step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------------------------------------------------------
BEGIN
table public.harvest: INSERT: fruits[basket]:'(1,2,3)'
COMMIT
(3 rows)
?column?
--------
stop
(1 row)

View File

@ -25,6 +25,7 @@ step "s0_commit" { COMMIT; }
step "s0_checkpoint" { CHECKPOINT; }
step "s0_vacuum" { VACUUM pg_attribute; }
step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
step "s0_advance_slot" { SELECT slot_name FROM pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn()); }
session "s1"
setup { SET synchronous_commit=on; }
@ -40,3 +41,7 @@ step "s1_commit" { COMMIT; }
# will be removed (xmax set) before T1 commits. That is, interlocking doesn't
# forbid modifying catalog after someone read it (and didn't commit yet).
permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_get_changes" "s0_get_changes" "s1_commit" "s0_vacuum" "s0_get_changes"
# Perform the same testing process as described above, but use advance_slot to
# force xmin advancement during fast forward decoding.
permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_advance_slot" "s0_advance_slot" "s1_commit" "s0_vacuum" "s0_get_changes"

View File

@ -362,20 +362,24 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* If we don't have snapshot or we are just fast-forwarding, there is no
* point in decoding changes.
* point in decoding data changes. However, it's crucial to build the base
* snapshot during fast-forward mode (as is done in
* SnapBuildProcessChange()) because we require the snapshot's xmin when
* determining the candidate catalog_xmin for the replication slot. See
* SnapBuildProcessRunningXacts().
*/
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
ctx->fast_forward)
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
switch (info)
{
case XLOG_HEAP2_MULTI_INSERT:
if (!ctx->fast_forward &&
SnapBuildProcessChange(builder, xid, buf->origptr))
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
!ctx->fast_forward)
DecodeMultiInsert(ctx, buf);
break;
case XLOG_HEAP2_NEW_CID:
if (!ctx->fast_forward)
{
xl_heap_new_cid *xlrec;
@ -422,16 +426,20 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* If we don't have snapshot or we are just fast-forwarding, there is no
* point in decoding data changes.
* point in decoding data changes. However, it's crucial to build the base
* snapshot during fast-forward mode (as is done in
* SnapBuildProcessChange()) because we require the snapshot's xmin when
* determining the candidate catalog_xmin for the replication slot. See
* SnapBuildProcessRunningXacts().
*/
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
ctx->fast_forward)
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
switch (info)
{
case XLOG_HEAP_INSERT:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
!ctx->fast_forward)
DecodeInsert(ctx, buf);
break;
@ -442,17 +450,20 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
case XLOG_HEAP_HOT_UPDATE:
case XLOG_HEAP_UPDATE:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
!ctx->fast_forward)
DecodeUpdate(ctx, buf);
break;
case XLOG_HEAP_DELETE:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
!ctx->fast_forward)
DecodeDelete(ctx, buf);
break;
case XLOG_HEAP_TRUNCATE:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
!ctx->fast_forward)
DecodeTruncate(ctx, buf);
break;
@ -480,7 +491,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
case XLOG_HEAP_CONFIRM:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
!ctx->fast_forward)
DecodeSpecConfirm(ctx, buf);
break;