Heikki Linnakangas 077ad4bd76 Relax fsyncing at end of a bulk load that was not WAL-logged
And improve the comments.

Backpatch to v17 where this was introduced.

Reviewed-by: Noah Misch
Discussion: https://www.postgresql.org/message-id/cac7d1b6-8358-40be-af0b-21bc9b27d34c@iki.fi
2024-08-16 14:45:37 +03:00

/*-------------------------------------------------------------------------
 *
 * bulk_write.c
 *	  Efficiently and reliably populate a new relation
 *
 * The assumption is that no other backends access the relation while we are
 * loading it, so we can take some shortcuts. Do not mix operations through
 * the regular buffer manager and the bulk loading interface!
 *
 * We bypass the buffer manager to avoid the locking overhead, and call
 * smgrextend() directly. A downside is that the pages will need to be
 * re-read into shared buffers on first use after the build finishes. That's
 * usually a good tradeoff for large relations, and for small relations, the
 * overhead isn't very significant compared to creating the relation in the
 * first place.
 *
 * The pages are WAL-logged if needed. To save on WAL header overhead, we
 * WAL-log several pages in one record.
 *
 * One tricky point is that because we bypass the buffer manager, we need to
 * register the relation for fsyncing at the next checkpoint ourselves, and
 * make sure that the relation is correctly fsync'd by us or the checkpointer
 * even if a checkpoint happens concurrently.
 *
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/smgr/bulk_write.c
 *
 *-------------------------------------------------------------------------
 */
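
/*
 * A typical caller follows roughly this pattern (a sketch only, not a
 * verbatim caller; the page contents and the 'nblocks' count are the
 * caller's business):
 *
 *     BulkWriteState *bulkstate = smgr_bulk_start_rel(rel, MAIN_FORKNUM);
 *
 *     for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
 *     {
 *         BulkWriteBuffer buf = smgr_bulk_get_buf(bulkstate);
 *
 *         PageInit((Page) buf->data, BLCKSZ, 0);
 *         ... fill the page ...
 *         smgr_bulk_write(bulkstate, blkno, buf, true);
 *     }
 *
 *     smgr_bulk_finish(bulkstate);
 *
 * Note that smgr_bulk_write() takes ownership of 'buf'.
 */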
#include "postgres.h"
#include "access/xloginsert.h"
#include "access/xlogrecord.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/bulk_write.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/rel.h"
#define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
static const PGIOAlignedBlock zero_buffer = {{0}}; /* worth BLCKSZ */
typedef struct PendingWrite
{
BulkWriteBuffer buf;
BlockNumber blkno;
bool page_std;
} PendingWrite;

/*
 * Bulk writer state for one relation fork.
 */
struct BulkWriteState
{
    /* Information about the target relation we're writing */
    SMgrRelation smgr;
    ForkNumber  forknum;
    bool        use_wal;

    /* We keep several writes queued, and WAL-log them in batches */
    int         npending;
    PendingWrite pending_writes[MAX_PENDING_WRITES];

    /* Current size of the relation */
    BlockNumber pages_written;

    /* The RedoRecPtr at the time that the bulk operation started */
    XLogRecPtr  start_RedoRecPtr;

    MemoryContext memcxt;
};

static void smgr_bulk_flush(BulkWriteState *bulkstate);

/*
 * Start a bulk write operation on a relation fork.
 */
BulkWriteState *
smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
{
    return smgr_bulk_start_smgr(RelationGetSmgr(rel),
                                forknum,
                                RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
}

/*
 * Start a bulk write operation on a relation fork.
 *
 * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
 */
BulkWriteState *
smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
{
    BulkWriteState *state;

    state = palloc(sizeof(BulkWriteState));
    state->smgr = smgr;
    state->forknum = forknum;
    state->use_wal = use_wal;
    state->npending = 0;
    state->pages_written = 0;
    state->start_RedoRecPtr = GetRedoRecPtr();

    /*
     * Remember the memory context. We will use it to allocate all the
     * buffers later.
     */
    state->memcxt = CurrentMemoryContext;

    return state;
}

/*
 * Finish bulk write operation.
 *
 * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
 * the relation if needed.
 */
void
smgr_bulk_finish(BulkWriteState *bulkstate)
{
    /* WAL-log and flush any remaining pages */
    smgr_bulk_flush(bulkstate);

    /*
     * Fsync the relation, or register it for the next checkpoint, if
     * necessary.
     */
    if (SmgrIsTemp(bulkstate->smgr))
    {
        /* Temporary relations don't need to be fsync'd, ever */
    }
    else if (!bulkstate->use_wal)
    {
        /*----------
         * This is either an unlogged relation, or a permanent relation but we
         * skipped WAL-logging because wal_level=minimal:
         *
         * A) Unlogged relation
         *
         *    Unlogged relations will go away on crash, but they need to be
         *    fsync'd on a clean shutdown. It's sufficient to call
         *    smgrregistersync(), which ensures that the checkpointer will
         *    flush the relation at the shutdown checkpoint. (It will flush
         *    it at the next online checkpoint too, which is not strictly
         *    necessary.)
         *
         *    Note that the init-fork of an unlogged relation is not
         *    considered unlogged for our purposes. It's treated like a
         *    regular permanent relation. The callers will pass use_wal=true
         *    for the init fork.
         *
         * B) Permanent relation, WAL-logging skipped because wal_level=minimal
         *
         *    This is a new relation, and we didn't WAL-log the pages as we
         *    wrote them, but they need to be fsync'd before commit.
         *
         *    We don't need to do that here, however. The fsync() is done at
         *    commit, by smgrDoPendingSyncs() (*).
         *
         *    (*) smgrDoPendingSyncs() might decide to WAL-log the whole
         *    relation at commit instead of fsyncing it, if the relation was
         *    very small, but either way it is smgrDoPendingSyncs()'s
         *    responsibility.
         *
         * We cannot distinguish the two cases here, so we conservatively
         * assume it's an unlogged relation. A permanent relation with
         * wal_level=minimal would require no action at all here, as
         * explained above.
         */
        smgrregistersync(bulkstate->smgr, bulkstate->forknum);
    }
    else
    {
        /*
         * Permanent relation, WAL-logged normally.
         *
         * We already WAL-logged all the pages, so they will be replayed from
         * WAL on crash. However, when we wrote out the pages, we passed
         * skipFsync=true to avoid the overhead of registering all the writes
         * with the checkpointer. Register the whole relation now.
         *
         * There is one hole in that idea: if a checkpoint occurred while we
         * were writing the pages, it already missed fsyncing the pages we
         * had written before the checkpoint started. A crash later on would
         * replay the WAL starting from the checkpoint, and therefore
         * wouldn't replay our earlier WAL records. So if a checkpoint
         * started after the bulk write began, fsync the files ourselves now.
         */
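
        /*
         * Concretely, the race being closed here looks like this (an
         * illustrative timeline; R1 and R2 stand for arbitrary redo
         * pointers):
         *
         *   1. the bulk write starts; start_RedoRecPtr = R1
         *   2. we smgrextend() some pages with skipFsync=true
         *   3. a checkpoint starts and completes; it knows nothing of our
         *      writes, and the redo pointer advances to R2
         *   4. after a crash, recovery replays WAL from R2 only, skipping
         *      the records for the pages from step 2, which may still be
         *      sitting unsynced in the kernel page cache
         *
         * Comparing start_RedoRecPtr with the current value below detects
         * step 3, in which case we fsync the relation ourselves.
         */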

        /*
         * Prevent a checkpoint from starting between the GetRedoRecPtr() and
         * smgrregistersync() calls.
         */
        Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
        MyProc->delayChkptFlags |= DELAY_CHKPT_START;

        if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
        {
            /*
             * A checkpoint occurred and it didn't know about our writes, so
             * fsync() the relation ourselves.
             */
            MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
            smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
            elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
        }
        else
        {
            smgrregistersync(bulkstate->smgr, bulkstate->forknum);
            MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
        }
    }
}

static int
buffer_cmp(const void *a, const void *b)
{
    const PendingWrite *bufa = (const PendingWrite *) a;
    const PendingWrite *bufb = (const PendingWrite *) b;

    /* We should not see duplicate writes of the same block */
    Assert(bufa->blkno != bufb->blkno);
    if (bufa->blkno > bufb->blkno)
        return 1;
    else
        return -1;
}

/*
 * Finish all the pending writes.
 */
static void
smgr_bulk_flush(BulkWriteState *bulkstate)
{
    int         npending = bulkstate->npending;
    PendingWrite *pending_writes = bulkstate->pending_writes;

    if (npending == 0)
        return;

    if (npending > 1)
        qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);

    if (bulkstate->use_wal)
    {
        BlockNumber blknos[MAX_PENDING_WRITES];
        Page        pages[MAX_PENDING_WRITES];
        bool        page_std = true;

        for (int i = 0; i < npending; i++)
        {
            blknos[i] = pending_writes[i].blkno;
            pages[i] = pending_writes[i].buf->data;

            /*
             * If any of the pages use !page_std, we log them all as such.
             * That's a bit wasteful, but in practice, a mix of standard and
             * non-standard page layouts is rare. None of the built-in AMs
             * produce such a mix.
             */
            if (!pending_writes[i].page_std)
                page_std = false;
        }

        log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
                     npending, blknos, pages, page_std);
    }

    for (int i = 0; i < npending; i++)
    {
        BlockNumber blkno = pending_writes[i].blkno;
        Page        page = pending_writes[i].buf->data;

        PageSetChecksumInplace(page, blkno);

        if (blkno >= bulkstate->pages_written)
        {
            /*
             * If we have to write pages nonsequentially, fill in the space
             * with zeroes until we come back and overwrite. This is not
             * logically necessary on standard Unix filesystems (unwritten
             * space will read as zeroes anyway), but it should help to avoid
             * fragmentation. For example, if pages_written is 10 and blkno
             * is 13, we first extend the relation with zero pages 10 through
             * 12. The dummy pages aren't WAL-logged, though.
             */
            while (blkno > bulkstate->pages_written)
            {
                /* don't set checksum for all-zero page */
                smgrextend(bulkstate->smgr, bulkstate->forknum,
                           bulkstate->pages_written++,
                           &zero_buffer,
                           true);
            }

            smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
            bulkstate->pages_written = blkno + 1;
        }
        else
            smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
        pfree(page);
    }

    bulkstate->npending = 0;
}

/*
 * Queue write of 'buf'.
 *
 * NB: this takes ownership of 'buf'!
 *
 * You are only allowed to write a given block once as part of one bulk write
 * operation.
 */
void
smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
{
    PendingWrite *w;

    w = &bulkstate->pending_writes[bulkstate->npending++];
    w->buf = buf;
    w->blkno = blocknum;
    w->page_std = page_std;

    if (bulkstate->npending == MAX_PENDING_WRITES)
        smgr_bulk_flush(bulkstate);
}

/*
 * Allocate a new buffer which can later be written with smgr_bulk_write().
 *
 * There is no function to free the buffer. When you pass it to
 * smgr_bulk_write(), it takes ownership and frees it when it's no longer
 * needed.
 *
 * This is currently implemented as a simple palloc, but it could be
 * implemented using a ring buffer or larger chunks in the future, so don't
 * rely on the buffer being a separately palloc'd chunk.
 */
BulkWriteBuffer
smgr_bulk_get_buf(BulkWriteState *bulkstate)
{
    return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
}