diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b4499cda7b3..e3fb26f5abe 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -469,6 +469,7 @@ typedef struct XLogCtlData
 	XLogRecPtr	lastSegSwitchLSN;
 
 	/* These are accessed using atomics -- info_lck not needed */
+	pg_atomic_uint64 logInsertResult;	/* last byte + 1 inserted to buffers */
 	pg_atomic_uint64 logWriteResult;	/* last byte + 1 written out */
 	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
@@ -1499,6 +1500,7 @@ static XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
 	uint64		bytepos;
+	XLogRecPtr	inserted;
 	XLogRecPtr	reservedUpto;
 	XLogRecPtr	finishedUpto;
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -1507,6 +1509,14 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	if (MyProc == NULL)
 		elog(PANIC, "cannot wait without a PGPROC structure");
 
+	/*
+	 * Check if there's any work to do. Use a barrier to ensure we get the
+	 * freshest value.
+	 */
+	inserted = pg_atomic_read_membarrier_u64(&XLogCtl->logInsertResult);
+	if (upto <= inserted)
+		return inserted;
+
 	/* Read the current insert position */
 	SpinLockAcquire(&Insert->insertpos_lck);
 	bytepos = Insert->CurrBytePos;
@@ -1586,6 +1596,15 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 		if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
 			finishedUpto = insertingat;
 	}
+
+	/*
+	 * Advance the limit we know to have been inserted and return the freshest
+	 * value we know of, which might be beyond what we requested if somebody
+	 * is concurrently doing this with an 'upto' pointer ahead of us.
+	 */
+	finishedUpto = pg_atomic_monotonic_advance_u64(&XLogCtl->logInsertResult,
+												   finishedUpto);
+
 	return finishedUpto;
 }
 
@@ -1727,13 +1746,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 {
 	char	   *pdst = dstbuf;
 	XLogRecPtr	recptr = startptr;
+	XLogRecPtr	inserted;
 	Size		nbytes = count;
 
 	if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
 		return 0;
 
 	Assert(!XLogRecPtrIsInvalid(startptr));
-	Assert(startptr + count <= LogwrtResult.Write);
+
+	/*
+	 * Caller should ensure that the requested data has been inserted into
+	 * WAL buffers before we try to read it.
+	 */
+	inserted = pg_atomic_read_u64(&XLogCtl->logInsertResult);
+	if (startptr + count > inserted)
+		ereport(ERROR,
+				errmsg("cannot read past end of generated WAL: requested %X/%X, current position %X/%X",
+					   LSN_FORMAT_ARGS(startptr + count),
+					   LSN_FORMAT_ARGS(inserted)));
 
 	/*
 	 * Loop through the buffers without a lock. For each buffer, atomically
@@ -2571,13 +2601,19 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	{
 		XLogRecPtr	Flush;
 		XLogRecPtr	Write;
+		XLogRecPtr	Insert;
 
 		Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult);
 		pg_read_barrier();
 		Write = pg_atomic_read_u64(&XLogCtl->logWriteResult);
+		pg_read_barrier();
+		Insert = pg_atomic_read_u64(&XLogCtl->logInsertResult);
 
 		/* WAL written to disk is always ahead of WAL flushed */
 		Assert(Write >= Flush);
+
+		/* WAL inserted to buffers is always ahead of WAL written */
+		Assert(Insert >= Write);
 	}
 #endif
 }
@@ -4951,6 +4987,7 @@ XLOGShmemInit(void)
 
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
+	pg_atomic_init_u64(&XLogCtl->logInsertResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
@@ -5979,6 +6016,7 @@ StartupXLOG(void)
 	 * because no other process can be reading or writing WAL yet.
 	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+	pg_atomic_write_u64(&XLogCtl->logInsertResult, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
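
Note on the new debug-only Assert(Insert >= Write) in XLogWrite(): the three WAL positions are only ever advanced in insert -> write -> flush order, so reading them back in the opposite order, with a read barrier between each load, can never observe the invariant violated. Below is a minimal standalone C11/pthreads sketch of that argument, not PostgreSQL code; all names (insert_pos, writer, checker) are illustrative, and acquire loads and release stores stand in for pg_read_barrier() and the update-side ordering.

#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>

static _Atomic uint64_t insert_pos, write_pos, flush_pos;

/* Advance the three positions the way the WAL path does: insert first. */
static void *
writer(void *arg)
{
	(void) arg;
	for (uint64_t lsn = 1; lsn <= 1000000; lsn++)
	{
		atomic_store_explicit(&insert_pos, lsn, memory_order_release);
		atomic_store_explicit(&write_pos, lsn, memory_order_release);
		atomic_store_explicit(&flush_pos, lsn, memory_order_release);
	}
	return NULL;
}

/* Read in the opposite order; the ordered loads make the checks safe. */
static void *
checker(void *arg)
{
	(void) arg;
	for (int i = 0; i < 1000000; i++)
	{
		uint64_t	flush = atomic_load_explicit(&flush_pos, memory_order_acquire);
		uint64_t	write = atomic_load_explicit(&write_pos, memory_order_acquire);
		uint64_t	insert = atomic_load_explicit(&insert_pos, memory_order_acquire);

		assert(write >= flush);		/* written is ahead of flushed */
		assert(insert >= write);	/* inserted is ahead of written */
	}
	return NULL;
}

int
main(void)
{
	pthread_t	w, c;

	pthread_create(&w, NULL, writer, NULL);
	pthread_create(&c, NULL, checker, NULL);
	pthread_join(w, NULL);
	pthread_join(c, NULL);
	return 0;
}

Each acquire load guarantees that everything the writer stored before the matching release store is visible to the later loads, so a snapshot taken in flush -> write -> insert order always satisfies insert >= write >= flush.
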
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index ff47782cdba..78987f3154a 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -570,6 +570,42 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Monotonically advance the given variable using only atomic operations until
+ * it's at least the target value. Returns the latest value observed, which
+ * may or may not be the target value.
+ *
+ * Full barrier semantics (even when value is unchanged).
+ */
+static inline uint64
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64_impl(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return currval;
+	}
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(&currval, 8);
+#endif
+
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64_impl(ptr, &currval, target_))
+			break;
+	}
+
+	return Max(target_, currval);
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
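
The CAS loop in pg_atomic_monotonic_advance_u64() terminates because a failed compare-exchange refreshes currval with the value actually found, so each iteration either installs target_ or discovers that a concurrent caller has already advanced the variable past it. Below is a minimal standalone sketch of the same semantics in portable C11, assuming only <stdatomic.h>; monotonic_advance_u64 is an illustrative name, not a PostgreSQL API.

#include <inttypes.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Advance *ptr to at least target; never move it backward. Returns the
 * latest value known, which is target on success or whatever larger value
 * a concurrent caller installed.
 */
static uint64_t
monotonic_advance_u64(_Atomic uint64_t *ptr, uint64_t target)
{
	uint64_t	currval = atomic_load(ptr);

	/* Fast path: already past the target; keep full-barrier semantics. */
	if (currval >= target)
	{
		atomic_thread_fence(memory_order_seq_cst);
		return currval;
	}

	while (currval < target)
	{
		/*
		 * On failure, compare_exchange writes the value it actually found
		 * into currval, so the loop re-checks against fresh state; on
		 * success the variable now holds target.
		 */
		if (atomic_compare_exchange_strong(ptr, &currval, target))
			return target;
	}

	/* A concurrent caller advanced past our target first. */
	return currval;
}

int
main(void)
{
	_Atomic uint64_t pos = 100;

	printf("%" PRIu64 "\n", monotonic_advance_u64(&pos, 150));	/* 150 */
	printf("%" PRIu64 "\n", monotonic_advance_u64(&pos, 120));	/* still 150 */
	return 0;
}

Returning Max(target_, currval), as the patch does, is equivalent to the two return paths above: the result is always the freshest position known, and never smaller than what the caller asked to install.
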