Assert that buffers are marked dirty before XLogRegisterBuffer().

Enforce the rule from transam/README in XLogRegisterBuffer(), and
update callers to follow the rule.

Hash indexes sometimes register clean pages as a part of the locking
protocol, so provide a REGBUF_NO_CHANGE flag to support that use.

Discussion: https://postgr.es/m/c84114f8-c7f1-5b57-f85a-3adc31e1a904@iki.fi
Reviewed-by: Heikki Linnakangas
This commit is contained in:
Jeff Davis 2023-10-23 17:17:46 -07:00
parent befe9451fb
commit 00d7fb5e2e
10 changed files with 118 additions and 20 deletions

View File

@ -387,24 +387,22 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
START_CRIT_SECTION();
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{
XLogBeginInsert();
XLogRegisterBuffer(0, stack->buffer, REGBUF_STANDARD);
if (BufferIsValid(childbuf))
XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
}
/* Perform the page update, and register any extra WAL data */
/*
* Perform the page update, dirty and register stack->buffer, and
* register any extra WAL data.
*/
btree->execPlaceToPage(btree, stack->buffer, stack,
insertdata, updateblkno, ptp_workspace);
MarkBufferDirty(stack->buffer);
/* An insert to an internal page finishes the split of the child. */
if (BufferIsValid(childbuf))
{
GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
MarkBufferDirty(childbuf);
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
}
if (RelationNeedsWAL(btree->index) && !btree->isBuild)

View File

@ -721,9 +721,12 @@ dataExecPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
/* Apply changes to page */
dataPlaceToPageLeafRecompress(buf, leaf);
MarkBufferDirty(buf);
/* If needed, register WAL data built by computeLeafRecompressWALData */
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{
XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
}
}
@ -1155,6 +1158,8 @@ dataExecPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
pitem = (PostingItem *) insertdata;
GinDataPageAddPostingItem(page, pitem, off);
MarkBufferDirty(buf);
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{
/*
@ -1167,6 +1172,7 @@ dataExecPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
data.offset = off;
data.newitem = *pitem;
XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
XLogRegisterBufData(0, (char *) &data,
sizeof(ginxlogInsertDataInternal));
}

View File

@ -571,6 +571,8 @@ entryExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
elog(ERROR, "failed to add item to index page in \"%s\"",
RelationGetRelationName(btree->index));
MarkBufferDirty(buf);
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{
/*
@ -583,6 +585,7 @@ entryExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
data.isDelete = insertData->isDelete;
data.offset = off;
XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
XLogRegisterBufData(0, (char *) &data,
offsetof(ginxlogInsertEntry, tuple));
XLogRegisterBufData(0, (char *) insertData->entry,

View File

@ -397,6 +397,9 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
}
Assert((ptr - collectordata) <= collector->sumsize);
MarkBufferDirty(buffer);
if (needWal)
{
XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
@ -404,8 +407,6 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
}
metadata->tailFreeSize = PageGetExactFreeSpace(page);
MarkBufferDirty(buffer);
}
/*

View File

@ -824,11 +824,16 @@ hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
XLogRegisterData((char *) &xlrec, SizeOfHashDelete);
/*
* bucket buffer needs to be registered to ensure that we can
* acquire a cleanup lock on it during replay.
* bucket buffer was not changed, but still needs to be
* registered to ensure that we can acquire a cleanup lock on
* it during replay.
*/
if (!xlrec.is_primary_bucket_page)
XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);
{
uint8 flags = REGBUF_STANDARD | REGBUF_NO_IMAGE | REGBUF_NO_CHANGE;
XLogRegisterBuffer(0, bucket_buf, flags);
}
XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
XLogRegisterBufData(1, (char *) deletable,

View File

@ -658,11 +658,15 @@ _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
XLogRegisterData((char *) &xlrec, SizeOfHashSqueezePage);
/*
* bucket buffer needs to be registered to ensure that we can acquire
* a cleanup lock on it during replay.
* bucket buffer was not changed, but still needs to be registered to
* ensure that we can acquire a cleanup lock on it during replay.
*/
if (!xlrec.is_prim_bucket_same_wrt)
XLogRegisterBuffer(0, bucketbuf, REGBUF_STANDARD | REGBUF_NO_IMAGE);
{
uint8 flags = REGBUF_STANDARD | REGBUF_NO_IMAGE | REGBUF_NO_CHANGE;
XLogRegisterBuffer(0, bucketbuf, flags);
}
XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD);
if (xlrec.ntups > 0)
@ -960,11 +964,16 @@ readpage:
XLogRegisterData((char *) &xlrec, SizeOfHashMovePageContents);
/*
* bucket buffer needs to be registered to ensure that
* we can acquire a cleanup lock on it during replay.
* bucket buffer was not changed, but still needs to
* be registered to ensure that we can acquire a
* cleanup lock on it during replay.
*/
if (!xlrec.is_prim_bucket_same_wrt)
XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);
{
int flags = REGBUF_STANDARD | REGBUF_NO_IMAGE | REGBUF_NO_CHANGE;
XLogRegisterBuffer(0, bucket_buf, flags);
}
XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD);
XLogRegisterBufData(1, (char *) itup_offsets,

View File

@ -248,6 +248,20 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
Assert(begininsert_called);
/*
* Ordinarily, buffer should be exclusive-locked and marked dirty before
* we get here, otherwise we could end up violating one of the rules in
* access/transam/README.
*
* Some callers intentionally register a clean page and never update that
* page's LSN; in that case they can pass the flag REGBUF_NO_CHANGE to
* bypass these checks.
*/
#ifdef USE_ASSERT_CHECKING
if (!(flags & REGBUF_NO_CHANGE))
Assert(BufferIsExclusiveLocked(buffer) && BufferIsDirty(buffer));
#endif
if (block_id >= max_registered_block_id)
{
if (block_id >= max_registered_buffers)
@ -1313,8 +1327,8 @@ log_newpage_range(Relation rel, ForkNumber forknum,
START_CRIT_SECTION();
for (i = 0; i < nbufs; i++)
{
XLogRegisterBuffer(i, bufpack[i], flags);
MarkBufferDirty(bufpack[i]);
XLogRegisterBuffer(i, bufpack[i], flags);
}
recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);

View File

@ -2098,6 +2098,65 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
return first_block;
}
/*
* BufferIsExclusiveLocked
*
* Checks if buffer is exclusive-locked.
*
* Buffer must be pinned.
*/
bool
BufferIsExclusiveLocked(Buffer buffer)
{
BufferDesc *bufHdr;
if (BufferIsLocal(buffer))
{
int bufid = -buffer - 1;
bufHdr = GetLocalBufferDescriptor(bufid);
}
else
{
bufHdr = GetBufferDescriptor(buffer - 1);
}
Assert(BufferIsPinned(buffer));
return LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
LW_EXCLUSIVE);
}
/*
* BufferIsDirty
*
* Checks if buffer is already dirty.
*
* Buffer must be pinned and exclusive-locked. (Without an exclusive lock,
* the result may be stale before it's returned.)
*/
bool
BufferIsDirty(Buffer buffer)
{
BufferDesc *bufHdr;
if (BufferIsLocal(buffer))
{
int bufid = -buffer - 1;
bufHdr = GetLocalBufferDescriptor(bufid);
}
else
{
bufHdr = GetBufferDescriptor(buffer - 1);
}
Assert(BufferIsPinned(buffer));
Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
LW_EXCLUSIVE));
return pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY;
}
/*
* MarkBufferDirty
*

View File

@ -37,6 +37,7 @@
* will be skipped) */
#define REGBUF_KEEP_DATA 0x10 /* include data even if a full-page image
* is taken */
#define REGBUF_NO_CHANGE 0x20 /* intentionally register clean buffer */
/* prototypes for public functions in xloginsert.c: */
extern void XLogBeginInsert(void);

View File

@ -179,6 +179,8 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator,
bool permanent);
extern void ReleaseBuffer(Buffer buffer);
extern void UnlockReleaseBuffer(Buffer buffer);
extern bool BufferIsExclusiveLocked(Buffer buffer);
extern bool BufferIsDirty(Buffer buffer);
extern void MarkBufferDirty(Buffer buffer);
extern void IncrBufferRefCount(Buffer buffer);
extern void CheckBufferIsPinnedOnce(Buffer buffer);