Peter Geoghegan d2e5e20e57 Add xl_btree_delete optimization.
Commit 558a9165e08 taught _bt_delitems_delete() to produce its own XID
horizon on the primary.  Standbys no longer needed to generate their own
latestRemovedXid, since they could just use the explicitly logged value
from the primary instead.  The deleted offset numbers array from the
xl_btree_delete WAL record was no longer used by the REDO routine for
anything other than deleting the items.

This enables a minor optimization:  We now treat the array as buffer
state, not generic WAL data, following _bt_delitems_vacuum()'s example.
This should be a minor win, since it allows us to avoid including the
deleted items array in cases where XLogInsert() stores the whole buffer
anyway.  The primary goal here is to make the code more maintainable,
though.  Removing inessential differences between the two functions
highlights the fundamental differences that remain.

Also change xl_btree_delete to use uint32 for the size of the array of
item offsets being deleted.  This brings xl_btree_delete closer to
xl_btree_vacuum.  Furthermore, it seems like a good idea to use an
explicit-width integer type (the field was previously an "int").

Bump XLOG_PAGE_MAGIC because xl_btree_delete changed.

Discussion: https://postgr.es/m/CAH2-Wzkz4TjmezzfAbaV1zYrh=fr0bCpzuJTvBe5iUQ3aUPsCQ@mail.gmail.com
2020-01-03 12:18:13 -08:00

820 lines
23 KiB
C

/*-------------------------------------------------------------------------
*
* nbtxlog.c
* WAL replay logic for btrees.
*
*
* Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/nbtree/nbtxlog.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/bufmask.h"
#include "access/nbtree.h"
#include "access/nbtxlog.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "storage/procarray.h"
/*
 * _bt_restore_page -- re-enter all the index tuples on a page
 *
 * The page is freshly init'd, and *from (length len) is a copy of what
 * had been its upper part (pd_upper to pd_special).  We assume that the
 * tuples had been added to the page in item-number order, and therefore
 * the one with highest item number appears first (lowest on the page).
 *
 *	page: the page that receives the tuples
 *	from: start of the packed, MAXALIGN-padded tuple images
 *	len:  number of bytes available at *from
 *
 * Failure to add any tuple to the page is reported with elog(PANIC).
 */
static void
_bt_restore_page(Page page, char *from, int len)
{
	IndexTupleData itupdata;
	Size		itemsz;
	char	   *end = from + len;
	Item		items[MaxIndexTuplesPerPage];
	uint16		itemsizes[MaxIndexTuplesPerPage];
	int			i;
	int			nitems;

	/*
	 * To get the items back in the original order, we add them to the page in
	 * reverse.  To figure out where one tuple ends and another begins, we
	 * have to scan them in forward order first.
	 */
	i = 0;
	while (from < end)
	{
		/*
		 * As we step through the items, 'from' won't always be properly
		 * aligned, so we need to use memcpy().  Further, we use Item (which
		 * is just a char*) here for our items array for the same reason;
		 * wouldn't want the compiler or anyone thinking that an item is
		 * aligned when it isn't.
		 */
		memcpy(&itupdata, from, sizeof(IndexTupleData));
		itemsz = IndexTupleSize(&itupdata);
		itemsz = MAXALIGN(itemsz);

		items[i] = (Item) from;
		itemsizes[i] = itemsz;
		i++;

		from += itemsz;
	}
	nitems = i;

	/*
	 * Add the collected tuples back-to-front: the last one found gets offset
	 * number 1, the first one found gets offset number nitems.  Note that
	 * 'from' is not touched here; every tuple start was already recorded in
	 * items[] by the scan above.
	 */
	for (i = nitems - 1; i >= 0; i--)
	{
		if (PageAddItem(page, items[i], itemsizes[i], nitems - i,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "_bt_restore_page: cannot add item to page");
	}
}
/*
 * _bt_restore_meta -- rebuild the btree metapage during replay
 *
 * The buffer registered under block_id is initialized from scratch and then
 * filled in from the xl_btree_metadata struct carried as that block's data.
 */
static void
_bt_restore_meta(XLogReaderState *record, uint8 block_id)
{
	XLogRecPtr	end_lsn = record->EndRecPtr;
	Buffer		buf;
	Page		page;
	BTMetaPageData *meta;
	BTPageOpaque opaque;
	xl_btree_metadata *logged;
	char	   *meta_end;
	Size		datalen;

	buf = XLogInitBufferForRedo(record, block_id);
	logged = (xl_btree_metadata *) XLogRecGetBlockData(record, block_id,
													   &datalen);

	Assert(datalen == sizeof(xl_btree_metadata));
	Assert(BufferGetBlockNumber(buf) == BTREE_METAPAGE);

	/* Wipe the page, then copy every logged field into the meta area */
	page = BufferGetPage(buf);
	_bt_pageinit(page, BufferGetPageSize(buf));

	meta = BTPageGetMeta(page);
	meta->btm_magic = BTREE_MAGIC;
	meta->btm_version = logged->version;
	meta->btm_root = logged->root;
	meta->btm_level = logged->level;
	meta->btm_fastroot = logged->fastroot;
	meta->btm_fastlevel = logged->fastlevel;

	/* Cannot log BTREE_MIN_VERSION index metapage without upgrade */
	Assert(meta->btm_version >= BTREE_NOVAC_VERSION);
	meta->btm_oldest_btpo_xact = logged->oldest_btpo_xact;
	meta->btm_last_cleanup_num_heap_tuples = logged->last_cleanup_num_heap_tuples;

	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	opaque->btpo_flags = BTP_META;

	/*
	 * pd_lower must point just past the end of the metadata.  This is
	 * essential: otherwise the metadata would be lost if xlog.c compresses
	 * the page.
	 */
	meta_end = (char *) meta + sizeof(BTMetaPageData);
	((PageHeader) page)->pd_lower = meta_end - (char *) page;

	PageSetLSN(page, end_lsn);
	MarkBufferDirty(buf);
	UnlockReleaseBuffer(buf);
}
/*
 * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
 *
 * Shared by the redo routines of every WAL record type that can insert a
 * downlink: insert, split, and newroot.  block_id identifies the block
 * reference of the page whose flag is to be cleared.
 */
static void
_bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
{
	Buffer		target;

	if (XLogReadBufferForRedo(record, block_id, &target) == BLK_NEEDS_REDO)
	{
		Page		targetpage = (Page) BufferGetPage(target);
		BTPageOpaque targetopaque;

		targetopaque = (BTPageOpaque) PageGetSpecialPointer(targetpage);

		/* The flag is expected to still be set when redo is needed */
		Assert(P_INCOMPLETE_SPLIT(targetopaque));
		targetopaque->btpo_flags = targetopaque->btpo_flags & ~BTP_INCOMPLETE_SPLIT;

		PageSetLSN(targetpage, record->EndRecPtr);
		MarkBufferDirty(target);
	}

	/* Release the buffer whenever one was returned, redo or not */
	if (BufferIsValid(target))
		UnlockReleaseBuffer(target);
}
/*
 * btree_xlog_insert -- replay insertion of a single tuple
 *
 * isleaf: the insertion went into a leaf page (no child flag to clear)
 * ismeta: the record also carries a metapage image in block 2
 *
 * Block 0 is the page inserted into; its block data is the tuple itself.
 * Block 1 (internal-page insertions only) is the child whose
 * incomplete-split flag must be cleared.
 */
static void
btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record)
{
	xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
	Buffer		target;

	/*
	 * Insertion to an internal page finishes an incomplete split at the child
	 * level.  Clear the incomplete-split flag in the child.  Note: during
	 * normal operation, the child and parent pages are locked at the same
	 * time, so that clearing the flag and inserting the downlink appear
	 * atomic to other backends.  We don't bother with that during replay,
	 * because readers don't care about the incomplete-split flag and there
	 * cannot be updates happening.
	 */
	if (!isleaf)
		_bt_clear_incomplete_split(record, 1);

	if (XLogReadBufferForRedo(record, 0, &target) == BLK_NEEDS_REDO)
	{
		Size		tuplen;
		char	   *tup = XLogRecGetBlockData(record, 0, &tuplen);
		Page		targetpage = BufferGetPage(target);

		/* Put the logged tuple back at the offset recorded in the record */
		if (PageAddItem(targetpage, (Item) tup, tuplen, xlrec->offnum,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "btree_xlog_insert: failed to add item");

		PageSetLSN(targetpage, record->EndRecPtr);
		MarkBufferDirty(target);
	}

	if (BufferIsValid(target))
		UnlockReleaseBuffer(target);

	/*
	 * Note: in normal operation, we'd update the metapage while still holding
	 * lock on the page we inserted into.  But during replay it's not
	 * necessary to hold that lock, since no other index updates can be
	 * happening concurrently, and readers will cope fine with following an
	 * obsolete link from the metapage.
	 */
	if (ismeta)
		_bt_restore_meta(record, 2);
}
/*
* btree_xlog_split -- replay a page split
*
* onleft tells whether the newly inserted item went onto the left page; if
* so, that item is the first thing stored in block 0's data, followed by
* the left page's new high key.
*
* Block references used by this routine:
* 0 - left (original) page, rebuilt in place when redo is needed
* 1 - right (new) sibling, always reconstructed from scratch
* 2 - page to the right of the new right sibling, if there is one
* 3 - page whose incomplete-split flag is cleared (non-leaf splits only)
*/
static void
btree_xlog_split(bool onleft, XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
bool isleaf = (xlrec->level == 0);
Buffer lbuf;
Buffer rbuf;
Page rpage;
BTPageOpaque ropaque;
char *datapos;
Size datalen;
BlockNumber leftsib;
BlockNumber rightsib;
BlockNumber rnext;
/*
* Look up the block numbers of the pages involved. Block 2 is optional;
* when it is absent the new right sibling has no right neighbor.
*/
XLogRecGetBlockTag(record, 0, NULL, NULL, &leftsib);
XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib);
if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext))
rnext = P_NONE;
/*
* Clear the incomplete split flag on the left sibling of the child page
* this is a downlink for. (Like in btree_xlog_insert, this can be done
* before locking the other pages)
*/
if (!isleaf)
_bt_clear_incomplete_split(record, 3);
/* Reconstruct right (new) sibling page from scratch */
rbuf = XLogInitBufferForRedo(record, 1);
datapos = XLogRecGetBlockData(record, 1, &datalen);
rpage = (Page) BufferGetPage(rbuf);
_bt_pageinit(rpage, BufferGetPageSize(rbuf));
ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);
ropaque->btpo_prev = leftsib;
ropaque->btpo_next = rnext;
ropaque->btpo.level = xlrec->level;
ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
ropaque->btpo_cycleid = 0;
/* Block 1's data holds the tuples of the right page */
_bt_restore_page(rpage, datapos, datalen);
PageSetLSN(rpage, lsn);
MarkBufferDirty(rbuf);
/* Now reconstruct left (original) sibling page */
if (XLogReadBufferForRedo(record, 0, &lbuf) == BLK_NEEDS_REDO)
{
/*
* To retain the same physical order of the tuples that they had, we
* initialize a temporary empty page for the left page and add all the
* items to that in item number order. This mirrors how _bt_split()
* works. Retaining the same physical order makes WAL consistency
* checking possible. See also _bt_restore_page(), which does the
* same for the right page.
*/
Page lpage = (Page) BufferGetPage(lbuf);
BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
OffsetNumber off;
IndexTuple newitem = NULL,
left_hikey = NULL;
Size newitemsz = 0,
left_hikeysz = 0;
Page newlpage;
OffsetNumber leftoff;
datapos = XLogRecGetBlockData(record, 0, &datalen);
/* The new item, when it belongs on the left page, comes first */
if (onleft)
{
newitem = (IndexTuple) datapos;
newitemsz = MAXALIGN(IndexTupleSize(newitem));
datapos += newitemsz;
datalen -= newitemsz;
}
/*
* Extract left hikey and its size. We assume that 16-bit alignment
* is enough to apply IndexTupleSize (since it's fetching from a
* uint16 field).
*/
left_hikey = (IndexTuple) datapos;
left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
datapos += left_hikeysz;
datalen -= left_hikeysz;
/* Nothing else may remain in block 0's data */
Assert(datalen == 0);
newlpage = PageGetTempPageCopySpecial(lpage);
/* Set high key */
leftoff = P_HIKEY;
if (PageAddItem(newlpage, (Item) left_hikey, left_hikeysz,
P_HIKEY, false, false) == InvalidOffsetNumber)
elog(PANIC, "failed to add high key to left page after split");
leftoff = OffsetNumberNext(leftoff);
/*
* Copy over the items that stay on the left page (everything before
* firstright). 'off' walks the old page, 'leftoff' is the next free
* offset on the rebuilt page.
*/
for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstright; off++)
{
ItemId itemid;
Size itemsz;
IndexTuple item;
/* add the new item if it was inserted on left page */
if (onleft && off == xlrec->newitemoff)
{
if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
itemid = PageGetItemId(lpage, off);
itemsz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(lpage, itemid);
if (PageAddItem(newlpage, (Item) item, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add old item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
/* cope with possibility that newitem goes at the end */
if (onleft && off == xlrec->newitemoff)
{
if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
/* Install the rebuilt contents as the left page */
PageRestoreTempPage(newlpage, lpage);
/* Fix opaque fields */
lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
if (isleaf)
lopaque->btpo_flags |= BTP_LEAF;
lopaque->btpo_next = rightsib;
lopaque->btpo_cycleid = 0;
PageSetLSN(lpage, lsn);
MarkBufferDirty(lbuf);
}
/*
* We no longer need the buffers. They must be released together, so that
* readers cannot observe two inconsistent halves.
*/
if (BufferIsValid(lbuf))
UnlockReleaseBuffer(lbuf);
UnlockReleaseBuffer(rbuf);
/*
* Fix left-link of the page to the right of the new right sibling.
*
* Note: in normal operation, we do this while still holding lock on the
* two split pages. However, that's not necessary for correctness in WAL
* replay, because no other index update can be in progress, and readers
* will cope properly when following an obsolete left-link.
*/
if (rnext != P_NONE)
{
Buffer buffer;
if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
{
Page page = (Page) BufferGetPage(buffer);
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_prev = rightsib;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
}
/*
 * btree_xlog_vacuum -- replay removal of index tuples by VACUUM
 *
 * Block 0 is the page being vacuumed; its block data is the array of
 * offset numbers to delete, xlrec->ndeleted entries long.
 */
static void
btree_xlog_vacuum(XLogReaderState *record)
{
	xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
	Buffer		buf;

	/*
	 * We need to take a cleanup lock here, just like btvacuumpage().  However,
	 * it isn't necessary to exhaustively get a cleanup lock on every block in
	 * the index during recovery (just getting a cleanup lock on pages with
	 * items to kill suffices).  See nbtree/README for details.
	 */
	if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buf)
		== BLK_NEEDS_REDO)
	{
		char	   *deleted = XLogRecGetBlockData(record, 0, NULL);
		Page		vacpage = (Page) BufferGetPage(buf);
		BTPageOpaque vacopaque;

		PageIndexMultiDelete(vacpage, (OffsetNumber *) deleted,
							 xlrec->ndeleted);

		/*
		 * Mark the page as not containing any LP_DEAD items --- see comments
		 * in _bt_delitems_vacuum().
		 */
		vacopaque = (BTPageOpaque) PageGetSpecialPointer(vacpage);
		vacopaque->btpo_flags = vacopaque->btpo_flags & ~BTP_HAS_GARBAGE;

		PageSetLSN(vacpage, record->EndRecPtr);
		MarkBufferDirty(buf);
	}

	if (BufferIsValid(buf))
		UnlockReleaseBuffer(buf);
}
/*
 * btree_xlog_delete -- replay deletion of index tuples from a page
 *
 * Block 0 is the affected page; its block data is the array of offset
 * numbers to delete, xlrec->ndeleted entries long.  In hot standby, the
 * logged latestRemovedXid is used to resolve recovery conflicts first.
 */
static void
btree_xlog_delete(XLogReaderState *record)
{
	xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
	Buffer		buf;

	/*
	 * If we have any conflict processing to do, it must happen before we
	 * update the page
	 */
	if (InHotStandby)
	{
		RelFileNode rnode;

		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
	}

	/*
	 * We don't need to take a cleanup lock to apply these changes.  See
	 * nbtree/README for details.
	 */
	if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
	{
		char	   *deleted = XLogRecGetBlockData(record, 0, NULL);
		Page		delpage = (Page) BufferGetPage(buf);
		BTPageOpaque delopaque;

		PageIndexMultiDelete(delpage, (OffsetNumber *) deleted,
							 xlrec->ndeleted);

		/* Mark the page as not containing any LP_DEAD items */
		delopaque = (BTPageOpaque) PageGetSpecialPointer(delpage);
		delopaque->btpo_flags = delopaque->btpo_flags & ~BTP_HAS_GARBAGE;

		PageSetLSN(delpage, record->EndRecPtr);
		MarkBufferDirty(buf);
	}

	if (BufferIsValid(buf))
		UnlockReleaseBuffer(buf);
}
/*
 * btree_xlog_mark_page_halfdead -- replay marking a leaf page half-dead
 *
 * Block 1 is the parent page, block 0 the leaf page.  The 'info' argument
 * is not consulted here.
 */
static void
btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;
	IndexTupleData trunctuple;

	/*
	 * In normal operation, we would lock all the pages this WAL record
	 * touches before changing any of them.  In WAL replay, it should be okay
	 * to lock just one page at a time, since no concurrent index updates can
	 * be happening, and readers should not care whether they arrive at the
	 * target page or not (since it's surely empty).
	 */

	/* parent page */
	if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
	{
		OffsetNumber poffset = xlrec->poffset;
		OffsetNumber nextoffset = OffsetNumberNext(poffset);
		IndexTuple	thistup;
		IndexTuple	nexttup;
		BlockNumber rightsib;

		page = (Page) BufferGetPage(buffer);
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);

		/* Read the downlink held by the item that follows poffset ... */
		nexttup = (IndexTuple) PageGetItem(page,
										   PageGetItemId(page, nextoffset));
		rightsib = BTreeTupleGetDownLink(nexttup);

		/* ... copy it into the item at poffset ... */
		thistup = (IndexTuple) PageGetItem(page,
										   PageGetItemId(page, poffset));
		BTreeTupleSetDownLink(thistup, rightsib);

		/* ... and drop the following item from the parent */
		PageIndexTupleDelete(page, nextoffset);

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

	/* Rewrite the leaf page as a halfdead page */
	buffer = XLogInitBufferForRedo(record, 0);
	page = (Page) BufferGetPage(buffer);
	_bt_pageinit(page, BufferGetPageSize(buffer));

	pageop = (BTPageOpaque) PageGetSpecialPointer(page);
	pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
	pageop->btpo.level = 0;
	pageop->btpo_cycleid = 0;
	pageop->btpo_prev = xlrec->leftblk;
	pageop->btpo_next = xlrec->rightblk;

	/*
	 * Construct a dummy hikey item that points to the next parent to be
	 * deleted (if any).
	 */
	MemSet(&trunctuple, 0, sizeof(IndexTupleData));
	trunctuple.t_info = sizeof(IndexTupleData);
	BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);

	if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
					false, false) == InvalidOffsetNumber)
		elog(ERROR, "could not add dummy high key to half-dead page");

	PageSetLSN(page, lsn);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
}
/*
 * btree_xlog_unlink_page -- replay unlinking a page from its siblings
 *
 * Block references: 0 is the target page, 1 its left sibling (touched only
 * when one exists), 2 its right sibling, 3 the leaf page to re-create when
 * present in the record, and 4 the metapage, restored when info is
 * XLOG_BTREE_UNLINK_PAGE_META.
 */
static void
btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
	BlockNumber leftsib = xlrec->leftsib;
	BlockNumber rightsib = xlrec->rightsib;
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;

	/*
	 * In normal operation, we would lock all the pages this WAL record
	 * touches before changing any of them.  In WAL replay, it should be okay
	 * to lock just one page at a time, since no concurrent index updates can
	 * be happening, and readers should not care whether they arrive at the
	 * target page or not (since it's surely empty).
	 */

	/* Fix left-link of right sibling */
	if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
	{
		page = (Page) BufferGetPage(buffer);
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
		pageop->btpo_prev = leftsib;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

	/* Fix right-link of left sibling, if any */
	if (leftsib != P_NONE)
	{
		if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
		{
			page = (Page) BufferGetPage(buffer);
			pageop = (BTPageOpaque) PageGetSpecialPointer(page);
			pageop->btpo_next = rightsib;

			PageSetLSN(page, lsn);
			MarkBufferDirty(buffer);
		}
		if (BufferIsValid(buffer))
			UnlockReleaseBuffer(buffer);
	}

	/* Rewrite target page as empty deleted page */
	buffer = XLogInitBufferForRedo(record, 0);
	page = (Page) BufferGetPage(buffer);
	_bt_pageinit(page, BufferGetPageSize(buffer));

	pageop = (BTPageOpaque) PageGetSpecialPointer(page);
	pageop->btpo_flags = BTP_DELETED;
	pageop->btpo.xact = xlrec->btpo_xact;
	pageop->btpo_cycleid = 0;
	pageop->btpo_prev = leftsib;
	pageop->btpo_next = rightsib;

	PageSetLSN(page, lsn);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);

	/*
	 * If we deleted a parent of the targeted leaf page, instead of the leaf
	 * itself, update the leaf to point to the next remaining child in the
	 * branch.
	 */
	if (XLogRecHasBlockRef(record, 3))
	{
		/*
		 * There is no real data on the page, so we just re-create it from
		 * scratch using the information from the WAL record.
		 */
		IndexTupleData trunctuple;

		buffer = XLogInitBufferForRedo(record, 3);
		page = (Page) BufferGetPage(buffer);
		_bt_pageinit(page, BufferGetPageSize(buffer));

		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
		pageop->btpo.level = 0;
		pageop->btpo_cycleid = 0;
		pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
		pageop->btpo_prev = xlrec->leafleftsib;
		pageop->btpo_next = xlrec->leafrightsib;

		/* Add a dummy hikey item */
		MemSet(&trunctuple, 0, sizeof(IndexTupleData));
		trunctuple.t_info = sizeof(IndexTupleData);
		BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);

		if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
						false, false) == InvalidOffsetNumber)
			elog(ERROR, "could not add dummy high key to half-dead page");

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
		UnlockReleaseBuffer(buffer);
	}

	/* Update metapage if needed */
	if (info == XLOG_BTREE_UNLINK_PAGE_META)
		_bt_restore_meta(record, 4);
}
/*
 * btree_xlog_newroot -- replay creation of a new root page
 *
 * Block 0 is the new root, built from scratch.  When the root is above leaf
 * level, its tuples come from block 0's data and block 1 is the left child
 * whose incomplete-split flag gets cleared.  Block 2 is the metapage, which
 * is always restored.
 */
static void
btree_xlog_newroot(XLogReaderState *record)
{
	xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
	Buffer		rootbuf;
	Page		rootpage;
	BTPageOpaque rootopaque;

	rootbuf = XLogInitBufferForRedo(record, 0);
	rootpage = (Page) BufferGetPage(rootbuf);
	_bt_pageinit(rootpage, BufferGetPageSize(rootbuf));

	rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
	rootopaque->btpo_prev = P_NONE;
	rootopaque->btpo_next = P_NONE;
	rootopaque->btpo.level = xlrec->level;
	rootopaque->btpo_cycleid = 0;

	/* A root at level zero is at the same time a leaf */
	rootopaque->btpo_flags = (xlrec->level == 0) ? (BTP_ROOT | BTP_LEAF) : BTP_ROOT;

	if (xlrec->level > 0)
	{
		Size		datalen;
		char	   *tuples = XLogRecGetBlockData(record, 0, &datalen);

		_bt_restore_page(rootpage, tuples, datalen);

		/* Clear the incomplete-split flag in left child */
		_bt_clear_incomplete_split(record, 1);
	}

	PageSetLSN(rootpage, record->EndRecPtr);
	MarkBufferDirty(rootbuf);
	UnlockReleaseBuffer(rootbuf);

	_bt_restore_meta(record, 2);
}
/*
 * btree_xlog_reuse_page -- replay a page-reuse record
 *
 * No page is modified; outside hot standby this is a no-op.
 */
static void
btree_xlog_reuse_page(XLogReaderState *record)
{
	xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);

	/*
	 * Btree reuse_page records exist to provide a conflict point when we
	 * reuse pages in the index via the FSM.  That's all they do though.
	 *
	 * latestRemovedXid was the page's btpo.xact.  The btpo.xact <
	 * RecentGlobalXmin test in _bt_page_recyclable() conceptually mirrors the
	 * pgxact->xmin > limitXmin test in GetConflictingVirtualXIDs().
	 * Consequently, one XID value achieves the same exclusion effect on
	 * master and standby.
	 */
	if (!InHotStandby)
		return;

	ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
}
/*
 * btree_redo -- dispatch a btree WAL record to its replay routine
 *
 * The record's info bits (with XLR_INFO_MASK stripped) select the routine.
 * An unrecognized value is reported with elog(PANIC).
 */
void
btree_redo(XLogReaderState *record)
{
	uint8		opcode = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

	switch (opcode)
	{
			/* the three insert flavors differ only in the flags passed on */
		case XLOG_BTREE_INSERT_LEAF:
		case XLOG_BTREE_INSERT_UPPER:
		case XLOG_BTREE_INSERT_META:
			btree_xlog_insert(opcode == XLOG_BTREE_INSERT_LEAF,
							  opcode == XLOG_BTREE_INSERT_META,
							  record);
			break;

			/* likewise, SPLIT_L means the new item went to the left page */
		case XLOG_BTREE_SPLIT_L:
		case XLOG_BTREE_SPLIT_R:
			btree_xlog_split(opcode == XLOG_BTREE_SPLIT_L, record);
			break;

		case XLOG_BTREE_VACUUM:
			btree_xlog_vacuum(record);
			break;

		case XLOG_BTREE_DELETE:
			btree_xlog_delete(record);
			break;

		case XLOG_BTREE_MARK_PAGE_HALFDEAD:
			btree_xlog_mark_page_halfdead(opcode, record);
			break;

		case XLOG_BTREE_UNLINK_PAGE:
		case XLOG_BTREE_UNLINK_PAGE_META:
			btree_xlog_unlink_page(opcode, record);
			break;

		case XLOG_BTREE_NEWROOT:
			btree_xlog_newroot(record);
			break;

		case XLOG_BTREE_REUSE_PAGE:
			btree_xlog_reuse_page(record);
			break;

		case XLOG_BTREE_META_CLEANUP:
			_bt_restore_meta(record, 0);
			break;

		default:
			elog(PANIC, "btree_redo: unknown op code %u", opcode);
	}
}
/*
 * btree_mask -- mask a btree page before performing consistency checks
 *
 * pagedata is the page image, modified in place.  blkno is not consulted.
 */
void
btree_mask(char *pagedata, BlockNumber blkno)
{
	Page		pg = (Page) pagedata;
	BTPageOpaque opaque;

	mask_page_lsn_and_checksum(pg);
	mask_page_hint_bits(pg);
	mask_unused_space(pg);

	opaque = (BTPageOpaque) PageGetSpecialPointer(pg);

	if (P_ISDELETED(opaque))
	{
		/*
		 * A DELETED page gets re-initialized during replay, so its content
		 * is masked.  See btree_xlog_unlink_page() for details.
		 */
		mask_page_content(pg);
	}
	else if (P_ISLEAF(opaque))
	{
		/*
		 * In btree leaf pages, it is possible to modify the LP_FLAGS without
		 * emitting any WAL record.  Hence, mask the line pointer flags.  See
		 * _bt_killitems(), _bt_check_unique() for details.
		 */
		mask_lp_flags(pg);
	}

	/*
	 * Two flag bits are cleared here:
	 *
	 * BTP_HAS_GARBAGE is just an un-logged hint bit.  See _bt_killitems(),
	 * _bt_check_unique() for details.
	 *
	 * BTP_SPLIT_END is not set on the right sibling during replay of a btree
	 * page split, and the cycle_id of that page is initialized to 0.  See
	 * btree_xlog_split() for details.
	 */
	opaque->btpo_flags &= ~(BTP_HAS_GARBAGE | BTP_SPLIT_END);
	opaque->btpo_cycleid = 0;
}