Use 64-bit atomics for xlblocks array elements.

In preparation for reading the contents of WAL buffers without a
lock. Also, avoids the previously-needed comment in GetXLogBuffer()
explaining why it's safe from torn reads.

Author: Bharath Rupireddy
Discussion: https://postgr.es/m/CALj2ACVfFMfqD5oLzZSQQZWfXiJqd-NdX0_317veP6FuB31QWA@mail.gmail.com
Reviewed-by: Andres Freund
This commit is contained in:
Jeff Davis 2023-12-19 17:35:42 -08:00
parent 1301c80b21
commit c3a8e2a7cb

View File

@@ -501,7 +501,7 @@ typedef struct XLogCtlData
* WALBufMappingLock. * WALBufMappingLock.
*/ */
char *pages; /* buffers for unwritten XLOG pages */ char *pages; /* buffers for unwritten XLOG pages */
XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */ pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
int XLogCacheBlck; /* highest allocated xlog buffer index */ int XLogCacheBlck; /* highest allocated xlog buffer index */
/* /*
@@ -1636,20 +1636,16 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
* out to disk and evicted, and the caller is responsible for making sure * out to disk and evicted, and the caller is responsible for making sure
* that doesn't happen. * that doesn't happen.
* *
* However, we don't hold a lock while we read the value. If someone has * We don't hold a lock while we read the value. If someone is just about
* just initialized the page, it's possible that we get a "torn read" of * to initialize or has just initialized the page, it's possible that we
* the XLogRecPtr if 64-bit fetches are not atomic on this platform. In * get InvalidXLogRecPtr. That's ok, we'll grab the mapping lock (in
* that case we will see a bogus value. That's ok, we'll grab the mapping * AdvanceXLInsertBuffer) and retry if we see anything other than the page
* lock (in AdvanceXLInsertBuffer) and retry if we see anything else than * we're looking for.
* the page we're looking for. But it means that when we do this unlocked
* read, we might see a value that appears to be ahead of the page we're
* looking for. Don't PANIC on that, until we've verified the value while
* holding the lock.
*/ */
expectedEndPtr = ptr; expectedEndPtr = ptr;
expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ; expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
endptr = XLogCtl->xlblocks[idx]; endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
if (expectedEndPtr != endptr) if (expectedEndPtr != endptr)
{ {
XLogRecPtr initializedUpto; XLogRecPtr initializedUpto;
@@ -1680,7 +1676,7 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
WALInsertLockUpdateInsertingAt(initializedUpto); WALInsertLockUpdateInsertingAt(initializedUpto);
AdvanceXLInsertBuffer(ptr, tli, false); AdvanceXLInsertBuffer(ptr, tli, false);
endptr = XLogCtl->xlblocks[idx]; endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
if (expectedEndPtr != endptr) if (expectedEndPtr != endptr)
elog(PANIC, "could not find WAL buffer for %X/%X", elog(PANIC, "could not find WAL buffer for %X/%X",
@@ -1867,7 +1863,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
* be zero if the buffer hasn't been used yet). Fall through if it's * be zero if the buffer hasn't been used yet). Fall through if it's
* already written out. * already written out.
*/ */
OldPageRqstPtr = XLogCtl->xlblocks[nextidx]; OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
if (LogwrtResult.Write < OldPageRqstPtr) if (LogwrtResult.Write < OldPageRqstPtr)
{ {
/* /*
@@ -1989,8 +1985,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
*/ */
pg_write_barrier(); pg_write_barrier();
*((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr; pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
XLogCtl->InitializedUpTo = NewPageEndPtr; XLogCtl->InitializedUpTo = NewPageEndPtr;
npages++; npages++;
@@ -2208,7 +2203,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
* if we're passed a bogus WriteRqst.Write that is past the end of the * if we're passed a bogus WriteRqst.Write that is past the end of the
* last page that's been initialized by AdvanceXLInsertBuffer. * last page that's been initialized by AdvanceXLInsertBuffer.
*/ */
XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx]; XLogRecPtr EndPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[curridx]);
if (LogwrtResult.Write >= EndPtr) if (LogwrtResult.Write >= EndPtr)
elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
@@ -4632,7 +4627,7 @@ XLOGShmemSize(void)
/* WAL insertion locks, plus alignment */ /* WAL insertion locks, plus alignment */
size = add_size(size, mul_size(sizeof(WALInsertLockPadded), NUM_XLOGINSERT_LOCKS + 1)); size = add_size(size, mul_size(sizeof(WALInsertLockPadded), NUM_XLOGINSERT_LOCKS + 1));
/* xlblocks array */ /* xlblocks array */
size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers)); size = add_size(size, mul_size(sizeof(pg_atomic_uint64), XLOGbuffers));
/* extra alignment padding for XLOG I/O buffers */ /* extra alignment padding for XLOG I/O buffers */
size = add_size(size, Max(XLOG_BLCKSZ, PG_IO_ALIGN_SIZE)); size = add_size(size, Max(XLOG_BLCKSZ, PG_IO_ALIGN_SIZE));
/* and the buffers themselves */ /* and the buffers themselves */
@@ -4710,10 +4705,13 @@ XLOGShmemInit(void)
* needed here. * needed here.
*/ */
allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData); allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
XLogCtl->xlblocks = (XLogRecPtr *) allocptr; XLogCtl->xlblocks = (pg_atomic_uint64 *) allocptr;
memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers); allocptr += sizeof(pg_atomic_uint64) * XLOGbuffers;
allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
for (i = 0; i < XLOGbuffers; i++)
{
pg_atomic_init_u64(&XLogCtl->xlblocks[i], InvalidXLogRecPtr);
}
/* WAL insertion locks. Ensure they're aligned to the full padded size */ /* WAL insertion locks. Ensure they're aligned to the full padded size */
allocptr += sizeof(WALInsertLockPadded) - allocptr += sizeof(WALInsertLockPadded) -
@@ -5750,7 +5748,7 @@ StartupXLOG(void)
memcpy(page, endOfRecoveryInfo->lastPage, len); memcpy(page, endOfRecoveryInfo->lastPage, len);
memset(page + len, 0, XLOG_BLCKSZ - len); memset(page + len, 0, XLOG_BLCKSZ - len);
XLogCtl->xlblocks[firstIdx] = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ; pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ; XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
} }
else else