/*------------------------------------------------------------------------- * * bulk_write.c * Efficiently and reliably populate a new relation * * The assumption is that no other backends access the relation while we are * loading it, so we can take some shortcuts. Pages already present in the * indicated fork when the bulk write operation is started are not modified * unless explicitly written to. Do not mix operations through the regular * buffer manager and the bulk loading interface! * * We bypass the buffer manager to avoid the locking overhead, and call * smgrextend() directly. A downside is that the pages will need to be * re-read into shared buffers on first use after the build finishes. That's * usually a good tradeoff for large relations, and for small relations, the * overhead isn't very significant compared to creating the relation in the * first place. * * The pages are WAL-logged if needed. To save on WAL header overhead, we * WAL-log several pages in one record. * * One tricky point is that because we bypass the buffer manager, we need to * register the relation for fsyncing at the next checkpoint ourselves, and * make sure that the relation is correctly fsync'd by us or the checkpointer * even if a checkpoint happens concurrently. * * * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION * src/backend/storage/smgr/bulk_write.c * *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/xloginsert.h" #include "access/xlogrecord.h" #include "storage/bufpage.h" #include "storage/bulk_write.h" #include "storage/proc.h" #include "storage/smgr.h" #include "utils/rel.h" #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID static const PGIOAlignedBlock zero_buffer = {0}; /* worth BLCKSZ */ typedef struct PendingWrite { BulkWriteBuffer buf; BlockNumber blkno; bool page_std; } PendingWrite; /* * Bulk writer state for one relation fork. */ struct BulkWriteState { /* Information about the target relation we're writing */ SMgrRelation smgr; ForkNumber forknum; bool use_wal; /* We keep several writes queued, and WAL-log them in batches */ int npending; PendingWrite pending_writes[MAX_PENDING_WRITES]; /* Current size of the relation */ BlockNumber relsize; /* The RedoRecPtr at the time that the bulk operation started */ XLogRecPtr start_RedoRecPtr; MemoryContext memcxt; }; static void smgr_bulk_flush(BulkWriteState *bulkstate); /* * Start a bulk write operation on a relation fork. */ BulkWriteState * smgr_bulk_start_rel(Relation rel, ForkNumber forknum) { return smgr_bulk_start_smgr(RelationGetSmgr(rel), forknum, RelationNeedsWAL(rel) || forknum == INIT_FORKNUM); } /* * Start a bulk write operation on a relation fork. * * This is like smgr_bulk_start_rel, but can be used without a relcache entry. */ BulkWriteState * smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal) { BulkWriteState *state; state = palloc(sizeof(BulkWriteState)); state->smgr = smgr; state->forknum = forknum; state->use_wal = use_wal; state->npending = 0; state->relsize = smgrnblocks(smgr, forknum); state->start_RedoRecPtr = GetRedoRecPtr(); /* * Remember the memory context. We will use it to allocate all the * buffers later. */ state->memcxt = CurrentMemoryContext; return state; } /* * Finish bulk write operation. * * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs * the relation if needed. */ void smgr_bulk_finish(BulkWriteState *bulkstate) { /* WAL-log and flush any remaining pages */ smgr_bulk_flush(bulkstate); /* * Fsync the relation, or register it for the next checkpoint, if * necessary. */ if (SmgrIsTemp(bulkstate->smgr)) { /* Temporary relations don't need to be fsync'd, ever */ } else if (!bulkstate->use_wal) { /*---------- * This is either an unlogged relation, or a permanent relation but we * skipped WAL-logging because wal_level=minimal: * * A) Unlogged relation * * Unlogged relations will go away on crash, but they need to be * fsync'd on a clean shutdown. It's sufficient to call * smgrregistersync(), that ensures that the checkpointer will * flush it at the shutdown checkpoint. (It will flush it on the * next online checkpoint too, which is not strictly necessary.) * * Note that the init-fork of an unlogged relation is not * considered unlogged for our purposes. It's treated like a * regular permanent relation. The callers will pass use_wal=true * for the init fork. * * B) Permanent relation, WAL-logging skipped because wal_level=minimal * * This is a new relation, and we didn't WAL-log the pages as we * wrote, but they need to be fsync'd before commit. * * We don't need to do that here, however. The fsync() is done at * commit, by smgrDoPendingSyncs() (*). * * (*) smgrDoPendingSyncs() might decide to WAL-log the whole * relation at commit instead of fsyncing it, if the relation was * very small, but it's smgrDoPendingSyncs() responsibility in any * case. * * We cannot distinguish the two here, so conservatively assume it's * an unlogged relation. A permanent relation with wal_level=minimal * would require no actions, see above. */ smgrregistersync(bulkstate->smgr, bulkstate->forknum); } else { /* * Permanent relation, WAL-logged normally. * * We already WAL-logged all the pages, so they will be replayed from * WAL on crash. However, when we wrote out the pages, we passed * skipFsync=true to avoid the overhead of registering all the writes * with the checkpointer. Register the whole relation now. * * There is one hole in that idea: If a checkpoint occurred while we * were writing the pages, it already missed fsyncing the pages we had * written before the checkpoint started. A crash later on would * replay the WAL starting from the checkpoint, therefore it wouldn't * replay our earlier WAL records. So if a checkpoint started after * the bulk write, fsync the files now. */ /* * Prevent a checkpoint from starting between the GetRedoRecPtr() and * smgrregistersync() calls. */ Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0); MyProc->delayChkptFlags |= DELAY_CHKPT_START; if (bulkstate->start_RedoRecPtr != GetRedoRecPtr()) { /* * A checkpoint occurred and it didn't know about our writes, so * fsync() the relation ourselves. */ MyProc->delayChkptFlags &= ~DELAY_CHKPT_START; smgrimmedsync(bulkstate->smgr, bulkstate->forknum); elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently"); } else { smgrregistersync(bulkstate->smgr, bulkstate->forknum); MyProc->delayChkptFlags &= ~DELAY_CHKPT_START; } } } static int buffer_cmp(const void *a, const void *b) { const PendingWrite *bufa = (const PendingWrite *) a; const PendingWrite *bufb = (const PendingWrite *) b; /* We should not see duplicated writes for the same block */ Assert(bufa->blkno != bufb->blkno); if (bufa->blkno > bufb->blkno) return 1; else return -1; } /* * Finish all the pending writes. */ static void smgr_bulk_flush(BulkWriteState *bulkstate) { int npending = bulkstate->npending; PendingWrite *pending_writes = bulkstate->pending_writes; if (npending == 0) return; if (npending > 1) qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp); if (bulkstate->use_wal) { BlockNumber blknos[MAX_PENDING_WRITES]; Page pages[MAX_PENDING_WRITES]; bool page_std = true; for (int i = 0; i < npending; i++) { blknos[i] = pending_writes[i].blkno; pages[i] = pending_writes[i].buf->data; /* * If any of the pages use !page_std, we log them all as such. * That's a bit wasteful, but in practice, a mix of standard and * non-standard page layout is rare. None of the built-in AMs do * that. */ if (!pending_writes[i].page_std) page_std = false; } log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum, npending, blknos, pages, page_std); } for (int i = 0; i < npending; i++) { BlockNumber blkno = pending_writes[i].blkno; Page page = pending_writes[i].buf->data; PageSetChecksumInplace(page, blkno); if (blkno >= bulkstate->relsize) { /* * If we have to write pages nonsequentially, fill in the space * with zeroes until we come back and overwrite. This is not * logically necessary on standard Unix filesystems (unwritten * space will read as zeroes anyway), but it should help to avoid * fragmentation. The dummy pages aren't WAL-logged though. */ while (blkno > bulkstate->relsize) { /* don't set checksum for all-zero page */ smgrextend(bulkstate->smgr, bulkstate->forknum, bulkstate->relsize, &zero_buffer, true); bulkstate->relsize++; } smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true); bulkstate->relsize++; } else smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true); pfree(page); } bulkstate->npending = 0; } /* * Queue write of 'buf'. * * NB: this takes ownership of 'buf'! * * You are only allowed to write a given block once as part of one bulk write * operation. */ void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std) { PendingWrite *w; w = &bulkstate->pending_writes[bulkstate->npending++]; w->buf = buf; w->blkno = blocknum; w->page_std = page_std; if (bulkstate->npending == MAX_PENDING_WRITES) smgr_bulk_flush(bulkstate); } /* * Allocate a new buffer which can later be written with smgr_bulk_write(). * * There is no function to free the buffer. When you pass it to * smgr_bulk_write(), it takes ownership and frees it when it's no longer * needed. * * This is currently implemented as a simple palloc, but could be implemented * using a ring buffer or larger chunks in the future, so don't rely on it. */ BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate) { return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0); }