diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index a53e23c679d..8fe7bb65f1f 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -564,7 +564,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(r); uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; RepOriginId origin_id = XLogRecGetOrigin(r); - Snapshot snapshot; + Snapshot snapshot = NULL; xl_logical_message *message; if (info != XLOG_LOGICAL_MESSAGE) @@ -594,7 +594,17 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) SnapBuildXactNeedsSkip(builder, buf->origptr))) return; - snapshot = SnapBuildGetOrBuildSnapshot(builder); + /* + * If this is a non-transactional change, get the snapshot we're expected + * to use. We only get here when the snapshot is consistent, and the + * change is not meant to be skipped. + * + * For transactional changes we don't need a snapshot, we'll use the + * regular snapshot maintained by ReorderBuffer. We just leave it NULL. + */ + if (!message->transactional) + snapshot = SnapBuildGetOrBuildSnapshot(builder); + ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr, message->transactional, message->message, /* first part of message is diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index d5f90a5f5d2..2d17c551a80 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -856,6 +856,13 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Assert(xid != InvalidTransactionId); + /* + * We don't expect snapshots for transactional changes - we'll use the + * snapshot derived later during apply (unless the change gets + * skipped). + */ + Assert(!snap); + oldcontext = MemoryContextSwitchTo(rb->context); change = ReorderBufferGetChange(rb); @@ -874,6 +881,9 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, ReorderBufferTXN *txn = NULL; volatile Snapshot snapshot_now = snap; + /* Non-transactional changes require a valid snapshot. */ + Assert(snapshot_now); + if (xid != InvalidTransactionId) txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);