Diffstat (limited to 'src/backend/access/transam')
-rw-r--r--   src/backend/access/transam/Makefile      |    2
-rw-r--r--   src/backend/access/transam/clog.c        |    3
-rw-r--r--   src/backend/access/transam/multixact.c   |    2
-rw-r--r--   src/backend/access/transam/twophase.c    |    1
-rw-r--r--   src/backend/access/transam/varsup.c      |    1
-rw-r--r--   src/backend/access/transam/xact.c        |    2
-rw-r--r--   src/backend/access/transam/xlog.c        |  761
-rw-r--r--   src/backend/access/transam/xloginsert.c  |  633
-rw-r--r--   src/backend/access/transam/xlogreader.c  |    2
-rw-r--r--   src/backend/access/transam/xlogutils.c   |  121
10 files changed, 839 insertions(+), 689 deletions(-)
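
This commit splits WAL record assembly out of xlog.c into the new xloginsert.c: XLogInsert() now builds the record, including any full-page images, and hands the finished rdata chain to the lower-level XLogInsertRecord() in xlog.c, retrying from scratch if its cached RedoRecPtr/doPageWrites values prove stale. The calling convention for rmgr code is unchanged. A minimal caller-side sketch against the 9.4-era interface shown in the diff below; RM_FOO_ID, XLOG_FOO_DO_STUFF, xl_foo and foo_log_action are hypothetical names, not part of this commit:

#include "postgres.h"

#include "access/xloginsert.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"

typedef struct xl_foo			/* hypothetical record payload */
{
	BlockNumber blkno;
} xl_foo;

/*
 * Assemble an rdata chain and let XLogInsert() decide whether the buffer
 * needs a full-page image.  Assumes the caller holds an exclusive lock on
 * 'buf', has marked it dirty, and is inside a critical section.
 * RM_FOO_ID / XLOG_FOO_DO_STUFF stand in for a real rmgr id and info byte.
 */
static XLogRecPtr
foo_log_action(Buffer buf, xl_foo *xlrec)
{
	XLogRecData rdata[2];
	XLogRecPtr	recptr;

	/* chunk 0: plain data; a record must carry some not-in-a-buffer data */
	rdata[0].data = (char *) xlrec;
	rdata[0].len = sizeof(xl_foo);
	rdata[0].buffer = InvalidBuffer;
	rdata[0].next = &rdata[1];

	/* chunk 1: buffer reference; dropped if a full-page image is taken */
	rdata[1].data = NULL;
	rdata[1].len = 0;
	rdata[1].buffer = buf;
	rdata[1].buffer_std = true;
	rdata[1].next = NULL;

	recptr = XLogInsert(RM_FOO_ID, XLOG_FOO_DO_STUFF, rdata);

	PageSetLSN(BufferGetPage(buf), recptr);
	return recptr;
}

The chunk-0 requirement mirrors the "len == 0" check in the new XLogRecordAssemble(): every record must supply some not-in-a-buffer data, with XLOG SWITCH records as the only exception.
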
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index eb6cfc5c44e..82a6c7695fc 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \ timeline.o twophase.o twophase_rmgr.o xlog.o xlogarchive.o xlogfuncs.o \ - xlogreader.o xlogutils.o + xloginsert.o xlogreader.o xlogutils.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index 27ca4c65673..5ee070bd0a9 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -35,6 +35,9 @@ #include "access/clog.h" #include "access/slru.h" #include "access/transam.h" +#include "access/xlog.h" +#include "access/xloginsert.h" +#include "access/xlogutils.h" #include "miscadmin.h" #include "pg_trace.h" diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c index 33346c76643..bfbe738530e 100644 --- a/src/backend/access/transam/multixact.c +++ b/src/backend/access/transam/multixact.c @@ -72,6 +72,8 @@ #include "access/twophase.h" #include "access/twophase_rmgr.h" #include "access/xact.h" +#include "access/xlog.h" +#include "access/xloginsert.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" #include "funcapi.h" diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c4069c39a20..d23c292edcd 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -48,6 +48,7 @@ #include "access/twophase_rmgr.h" #include "access/xact.h" #include "access/xlog.h" +#include "access/xloginsert.h" #include "access/xlogutils.h" #include "catalog/pg_type.h" #include "catalog/storage.h" diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 7013fb894b4..d51cca406c7 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -17,6 +17,7 @@ #include "access/subtrans.h" #include "access/transam.h" #include "access/xact.h" +#include "access/xlog.h" #include "commands/dbcommands.h" #include "miscadmin.h" #include "postmaster/autovacuum.h" diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 651a5c40f46..6f92bad07ca 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -25,6 +25,8 @@ #include "access/transam.h" #include "access/twophase.h" #include "access/xact.h" +#include "access/xlog.h" +#include "access/xloginsert.h" #include "access/xlogutils.h" #include "catalog/catalog.h" #include "catalog/namespace.h" diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 3160db72458..563d442a7a3 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -31,6 +31,7 @@ #include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xloginsert.h" #include "access/xlogreader.h" #include "access/xlogutils.h" #include "catalog/catversion.h" @@ -300,14 +301,21 @@ XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; * (which is almost but not quite the same as a pointer to the most recent * CHECKPOINT record). We update this from the shared-memory copy, * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we - * hold an insertion lock). See XLogInsert for details. 
We are also allowed - * to update from XLogCtl->RedoRecPtr if we hold the info_lck; + * hold an insertion lock). See XLogInsertRecord for details. We are also + * allowed to update from XLogCtl->RedoRecPtr if we hold the info_lck; * see GetRedoRecPtr. A freshly spawned backend obtains the value during * InitXLOGAccess. */ static XLogRecPtr RedoRecPtr; /* + * doPageWrites is this backend's local copy of (forcePageWrites || + * fullPageWrites). It is used together with RedoRecPtr to decide whether + * a full-page image of a page need to be taken. + */ +static bool doPageWrites; + +/* * RedoStartLSN points to the checkpoint's REDO location which is specified * in a backup label file, backup history file or control file. In standby * mode, XLOG streaming usually starts from the position where an invalid @@ -419,7 +427,7 @@ typedef union WALInsertLockPadded } WALInsertLockPadded; /* - * Shared state data for XLogInsert. + * Shared state data for WAL insertion. */ typedef struct XLogCtlInsert { @@ -765,10 +773,6 @@ static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); -static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock, - XLogRecPtr *lsn, BkpBlock *bkpb); -static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, - char *blk, bool get_cleanup_lock, bool keep_buffer); static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic); static bool XLogCheckpointNeeded(XLogSegNo new_segno); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible); @@ -831,226 +835,45 @@ static void WALInsertLockRelease(void); static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt); /* - * Insert an XLOG record having the specified RMID and info bytes, - * with the body of the record being the data chunk(s) described by - * the rdata chain (see xlog.h for notes about rdata). + * Insert an XLOG record represented by an already-constructed chain of data + * chunks. This is a low-level routine; to construct the WAL record header + * and data, use the higher-level routines in xloginsert.c. + * + * If 'fpw_lsn' is valid, it is the oldest LSN among the pages that this + * WAL record applies to, that were not included in the record as full page + * images. If fpw_lsn >= RedoRecPtr, the function does not perform the + * insertion and returns InvalidXLogRecPtr. The caller can then recalculate + * which pages need a full-page image, and retry. If fpw_lsn is invalid, the + * record is always inserted. + * + * The first XLogRecData in the chain must be for the record header, and its + * data must be MAXALIGNed. XLogInsertRecord fills in the xl_prev and + * xl_crc fields in the header, the rest of the header must already be filled + * by the caller. * * Returns XLOG pointer to end of record (beginning of next record). * This can be used as LSN for data pages affected by the logged action. * (LSN is the XLOG point up to which the XLOG must be flushed to disk * before the data page can be written out. This implements the basic * WAL rule "write the log before the data".) - * - * NB: this routine feels free to scribble on the XLogRecData structs, - * though not on the data they reference. This is OK since the XLogRecData - * structs are always just temporaries in the calling code. 
*/ XLogRecPtr -XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) +XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn) { XLogCtlInsert *Insert = &XLogCtl->Insert; XLogRecData *rdt; - XLogRecData *rdt_lastnormal; - Buffer dtbuf[XLR_MAX_BKP_BLOCKS]; - bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS]; - BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS]; - XLogRecPtr dtbuf_lsn[XLR_MAX_BKP_BLOCKS]; - XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS]; - XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS]; - XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS]; - XLogRecData hdr_rdt; pg_crc32 rdata_crc; - uint32 len, - write_len; - unsigned i; - bool doPageWrites; - bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH); bool inserted; - uint8 info_orig = info; - static XLogRecord *rechdr; + XLogRecord *rechdr = (XLogRecord *) rdata->data; + bool isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID && + rechdr->xl_info == XLOG_SWITCH); XLogRecPtr StartPos; XLogRecPtr EndPos; - if (rechdr == NULL) - { - static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF]; - - rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf); - MemSet(rechdr, 0, SizeOfXLogRecord); - } - /* cross-check on whether we should be here or not */ if (!XLogInsertAllowed()) elog(ERROR, "cannot make new WAL entries during recovery"); - /* info's high bits are reserved for use by me */ - if (info & XLR_INFO_MASK) - elog(PANIC, "invalid xlog info mask %02X", info); - - TRACE_POSTGRESQL_XLOG_INSERT(rmid, info); - - /* - * In bootstrap mode, we don't actually log anything but XLOG resources; - * return a phony record pointer. - */ - if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID) - { - EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */ - return EndPos; - } - - /* - * Here we scan the rdata chain, to determine which buffers must be backed - * up. - * - * We may have to loop back to here if a race condition is detected below. - * We could prevent the race by doing all this work while holding an - * insertion lock, but it seems better to avoid doing CRC calculations - * while holding one. - * - * We add entries for backup blocks to the chain, so that they don't need - * any special treatment in the critical section where the chunks are - * copied into the WAL buffers. Those entries have to be unlinked from the - * chain if we have to loop back here. - */ -begin:; - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - dtbuf[i] = InvalidBuffer; - dtbuf_bkp[i] = false; - } - - /* - * Decide if we need to do full-page writes in this XLOG record: true if - * full_page_writes is on or we have a PITR request for it. Since we - * don't yet have an insertion lock, fullPageWrites and forcePageWrites - * could change under us, but we'll recheck them once we have a lock. 
- */ - doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites; - - len = 0; - for (rdt = rdata;;) - { - if (rdt->buffer == InvalidBuffer) - { - /* Simple data, just include it */ - len += rdt->len; - } - else - { - /* Find info for buffer */ - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - if (rdt->buffer == dtbuf[i]) - { - /* Buffer already referenced by earlier chain item */ - if (dtbuf_bkp[i]) - { - rdt->data = NULL; - rdt->len = 0; - } - else if (rdt->data) - len += rdt->len; - break; - } - if (dtbuf[i] == InvalidBuffer) - { - /* OK, put it in this slot */ - dtbuf[i] = rdt->buffer; - if (doPageWrites && XLogCheckBuffer(rdt, true, - &(dtbuf_lsn[i]), &(dtbuf_xlg[i]))) - { - dtbuf_bkp[i] = true; - rdt->data = NULL; - rdt->len = 0; - } - else if (rdt->data) - len += rdt->len; - break; - } - } - if (i >= XLR_MAX_BKP_BLOCKS) - elog(PANIC, "can backup at most %d blocks per xlog record", - XLR_MAX_BKP_BLOCKS); - } - /* Break out of loop when rdt points to last chain item */ - if (rdt->next == NULL) - break; - rdt = rdt->next; - } - - /* - * NOTE: We disallow len == 0 because it provides a useful bit of extra - * error checking in ReadRecord. This means that all callers of - * XLogInsert must supply at least some not-in-a-buffer data. However, we - * make an exception for XLOG SWITCH records because we don't want them to - * ever cross a segment boundary. - */ - if (len == 0 && !isLogSwitch) - elog(PANIC, "invalid xlog record length %u", len); - - /* - * Make additional rdata chain entries for the backup blocks, so that we - * don't need to special-case them in the write loop. This modifies the - * original rdata chain, but we keep a pointer to the last regular entry, - * rdt_lastnormal, so that we can undo this if we have to loop back to the - * beginning. - * - * At the exit of this loop, write_len includes the backup block data. - * - * Also set the appropriate info bits to show which buffers were backed - * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer - * value (ignoring InvalidBuffer) appearing in the rdata chain. - */ - rdt_lastnormal = rdt; - write_len = len; - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - BkpBlock *bkpb; - char *page; - - if (!dtbuf_bkp[i]) - continue; - - info |= XLR_BKP_BLOCK(i); - - bkpb = &(dtbuf_xlg[i]); - page = (char *) BufferGetBlock(dtbuf[i]); - - rdt->next = &(dtbuf_rdt1[i]); - rdt = rdt->next; - - rdt->data = (char *) bkpb; - rdt->len = sizeof(BkpBlock); - write_len += sizeof(BkpBlock); - - rdt->next = &(dtbuf_rdt2[i]); - rdt = rdt->next; - - if (bkpb->hole_length == 0) - { - rdt->data = page; - rdt->len = BLCKSZ; - write_len += BLCKSZ; - rdt->next = NULL; - } - else - { - /* must skip the hole */ - rdt->data = page; - rdt->len = bkpb->hole_offset; - write_len += bkpb->hole_offset; - - rdt->next = &(dtbuf_rdt3[i]); - rdt = rdt->next; - - rdt->data = page + (bkpb->hole_offset + bkpb->hole_length); - rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length); - write_len += rdt->len; - rdt->next = NULL; - } - } - /* * Calculate CRC of the data, including all the backup blocks * @@ -1060,29 +883,15 @@ begin:; * header. */ INIT_CRC32C(rdata_crc); - for (rdt = rdata; rdt != NULL; rdt = rdt->next) + for (rdt = rdata->next; rdt != NULL; rdt = rdt->next) COMP_CRC32C(rdata_crc, rdt->data, rdt->len); /* - * Construct record header (prev-link is filled in later, after reserving - * the space for the record), and make that the first chunk in the chain. 
- * - * The CRC calculated for the header here doesn't include prev-link, - * because we don't know it yet. It will be added later. - */ - rechdr->xl_xid = GetCurrentTransactionIdIfAny(); - rechdr->xl_tot_len = SizeOfXLogRecord + write_len; - rechdr->xl_len = len; /* doesn't include backup blocks */ - rechdr->xl_info = info; - rechdr->xl_rmid = rmid; - rechdr->xl_prev = InvalidXLogRecPtr; + * Calculate CRC of the header, except for prev-link, because we don't + * know it yet. It will be added later. + */ COMP_CRC32C(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev)); - hdr_rdt.next = rdata; - hdr_rdt.data = (char *) rechdr; - hdr_rdt.len = SizeOfXLogRecord; - write_len += SizeOfXLogRecord; - /*---------- * * We have now done all the preparatory work we can without holding a @@ -1122,56 +931,33 @@ begin:; WALInsertLockAcquire(); /* - * Check to see if my RedoRecPtr is out of date. If so, may have to go - * back and recompute everything. This can only happen just after a - * checkpoint, so it's better to be slow in this case and fast otherwise. + * Check to see if my copy of RedoRecPtr or doPageWrites is out of date. + * If so, may have to go back and have the caller recompute everything. + * This can only happen just after a checkpoint, so it's better to be + * slow in this case and fast otherwise. * * If we aren't doing full-page writes then RedoRecPtr doesn't actually * affect the contents of the XLOG record, so we'll update our local copy - * but not force a recomputation. + * but not force a recomputation. (If doPageWrites was just turned off, + * we could recompute the record without full pages, but we choose not + * to bother.) */ if (RedoRecPtr != Insert->RedoRecPtr) { Assert(RedoRecPtr < Insert->RedoRecPtr); RedoRecPtr = Insert->RedoRecPtr; - - if (doPageWrites) - { - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - if (dtbuf[i] == InvalidBuffer) - continue; - if (dtbuf_bkp[i] == false && - dtbuf_lsn[i] <= RedoRecPtr) - { - /* - * Oops, this buffer now needs to be backed up, but we - * didn't think so above. Start over. - */ - WALInsertLockRelease(); - END_CRIT_SECTION(); - rdt_lastnormal->next = NULL; - info = info_orig; - goto begin; - } - } - } } + doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites); - /* - * Also check to see if fullPageWrites or forcePageWrites was just turned - * on; if we weren't already doing full-page writes then go back and - * recompute. (If it was just turned off, we could recompute the record - * without full pages, but we choose not to bother.) - */ - if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites) + if (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= RedoRecPtr && doPageWrites) { - /* Oops, must redo it with full-page data. */ + /* + * Oops, some buffer now needs to be backed up that the caller + * didn't back up. Start over. + */ WALInsertLockRelease(); END_CRIT_SECTION(); - rdt_lastnormal->next = NULL; - info = info_orig; - goto begin; + return InvalidXLogRecPtr; } /* @@ -1182,7 +968,7 @@ begin:; inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev); else { - ReserveXLogInsertLocation(write_len, &StartPos, &EndPos, + ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos, &rechdr->xl_prev); inserted = true; } @@ -1201,7 +987,8 @@ begin:; * All the record data, including the header, is now ready to be * inserted. Copy the record in the space reserved. 
*/ - CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos); + CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata, + StartPos, EndPos); } else { @@ -1437,7 +1224,7 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) } /* - * Subroutine of XLogInsert. Copies a WAL record to an already-reserved + * Subroutine of XLogInsertRecord. Copies a WAL record to an already-reserved * area in the WAL. */ static void @@ -2004,93 +1791,6 @@ XLogRecPtrToBytePos(XLogRecPtr ptr) } /* - * Determine whether the buffer referenced has to be backed up. - * - * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites - * could change later, so the result should be used for optimization purposes - * only. - */ -bool -XLogCheckBufferNeedsBackup(Buffer buffer) -{ - bool doPageWrites; - Page page; - - page = BufferGetPage(buffer); - - doPageWrites = XLogCtl->Insert.fullPageWrites || XLogCtl->Insert.forcePageWrites; - - if (doPageWrites && PageGetLSN(page) <= RedoRecPtr) - return true; /* buffer requires backup */ - - return false; /* buffer does not need to be backed up */ -} - -/* - * Determine whether the buffer referenced by an XLogRecData item has to - * be backed up, and if so fill a BkpBlock struct for it. In any case - * save the buffer's LSN at *lsn. - */ -static bool -XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock, - XLogRecPtr *lsn, BkpBlock *bkpb) -{ - Page page; - - page = BufferGetPage(rdata->buffer); - - /* - * We assume page LSN is first data on *every* page that can be passed to - * XLogInsert, whether it has the standard page layout or not. We don't - * need to take the buffer header lock for PageGetLSN if we hold an - * exclusive lock on the page and/or the relation. - */ - if (holdsExclusiveLock) - *lsn = PageGetLSN(page); - else - *lsn = BufferGetLSNAtomic(rdata->buffer); - - if (*lsn <= RedoRecPtr) - { - /* - * The page needs to be backed up, so set up *bkpb - */ - BufferGetTag(rdata->buffer, &bkpb->node, &bkpb->fork, &bkpb->block); - - if (rdata->buffer_std) - { - /* Assume we can omit data between pd_lower and pd_upper */ - uint16 lower = ((PageHeader) page)->pd_lower; - uint16 upper = ((PageHeader) page)->pd_upper; - - if (lower >= SizeOfPageHeaderData && - upper > lower && - upper <= BLCKSZ) - { - bkpb->hole_offset = lower; - bkpb->hole_length = upper - lower; - } - else - { - /* No "hole" to compress out */ - bkpb->hole_offset = 0; - bkpb->hole_length = 0; - } - } - else - { - /* Not a standard page header, don't try to eliminate "hole" */ - bkpb->hole_offset = 0; - bkpb->hole_length = 0; - } - - return true; /* buffer requires backup */ - } - - return false; /* buffer does not need to be backed up */ -} - -/* * Initialize XLOG buffers, writing out old buffers if they still contain * unwritten data, upto the page containing 'upto'. Or if 'opportunistic' is * true, initialize as many pages as we can without having to write out @@ -3944,128 +3644,6 @@ CleanupBackupHistory(void) } /* - * Restore a full-page image from a backup block attached to an XLOG record. - * - * lsn: LSN of the XLOG record being replayed - * record: the complete XLOG record - * block_index: which backup block to restore (0 .. XLR_MAX_BKP_BLOCKS - 1) - * get_cleanup_lock: TRUE to get a cleanup rather than plain exclusive lock - * keep_buffer: TRUE to return the buffer still locked and pinned - * - * Returns the buffer number containing the page. Note this is not terribly - * useful unless keep_buffer is specified as TRUE. 
- * - * Note: when a backup block is available in XLOG, we restore it - * unconditionally, even if the page in the database appears newer. - * This is to protect ourselves against database pages that were partially - * or incorrectly written during a crash. We assume that the XLOG data - * must be good because it has passed a CRC check, while the database - * page might not be. This will force us to replay all subsequent - * modifications of the page that appear in XLOG, rather than possibly - * ignoring them as already applied, but that's not a huge drawback. - * - * If 'get_cleanup_lock' is true, a cleanup lock is obtained on the buffer, - * else a normal exclusive lock is used. During crash recovery, that's just - * pro forma because there can't be any regular backends in the system, but - * in hot standby mode the distinction is important. - * - * If 'keep_buffer' is true, return without releasing the buffer lock and pin; - * then caller is responsible for doing UnlockReleaseBuffer() later. This - * is needed in some cases when replaying XLOG records that touch multiple - * pages, to prevent inconsistent states from being visible to other backends. - * (Again, that's only important in hot standby mode.) - */ -Buffer -RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index, - bool get_cleanup_lock, bool keep_buffer) -{ - BkpBlock bkpb; - char *blk; - int i; - - /* Locate requested BkpBlock in the record */ - blk = (char *) XLogRecGetData(record) + record->xl_len; - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - if (!(record->xl_info & XLR_BKP_BLOCK(i))) - continue; - - memcpy(&bkpb, blk, sizeof(BkpBlock)); - blk += sizeof(BkpBlock); - - if (i == block_index) - { - /* Found it, apply the update */ - return RestoreBackupBlockContents(lsn, bkpb, blk, get_cleanup_lock, - keep_buffer); - } - - blk += BLCKSZ - bkpb.hole_length; - } - - /* Caller specified a bogus block_index */ - elog(ERROR, "failed to restore block_index %d", block_index); - return InvalidBuffer; /* keep compiler quiet */ -} - -/* - * Workhorse for RestoreBackupBlock usable without an xlog record - * - * Restores a full-page image from BkpBlock and a data pointer. - */ -static Buffer -RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk, - bool get_cleanup_lock, bool keep_buffer) -{ - Buffer buffer; - Page page; - - buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block, - RBM_ZERO); - Assert(BufferIsValid(buffer)); - if (get_cleanup_lock) - LockBufferForCleanup(buffer); - else - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - - page = (Page) BufferGetPage(buffer); - - if (bkpb.hole_length == 0) - { - memcpy((char *) page, blk, BLCKSZ); - } - else - { - memcpy((char *) page, blk, bkpb.hole_offset); - /* must zero-fill the hole */ - MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length); - memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length), - blk + bkpb.hole_offset, - BLCKSZ - (bkpb.hole_offset + bkpb.hole_length)); - } - - /* - * The checksum value on this page is currently invalid. We don't need to - * reset it here since it will be set before being written. - */ - - /* - * The page may be uninitialized. If so, we can't set the LSN because that - * would corrupt the page. - */ - if (!PageIsNew(page)) - { - PageSetLSN(page, lsn); - } - MarkBufferDirty(buffer); - - if (!keep_buffer) - UnlockReleaseBuffer(buffer); - - return buffer; -} - -/* * Attempt to read an XLOG record. * * If RecPtr is not NULL, try to read a record at that position. 
Otherwise @@ -6352,6 +5930,7 @@ StartupXLOG(void) lastFullPageWrites = checkPoint.fullPageWrites; RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo; + doPageWrites = lastFullPageWrites; if (RecPtr < checkPoint.redo) ereport(PANIC, @@ -7606,12 +7185,16 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, void InitXLOGAccess(void) { + XLogCtlInsert *Insert = &XLogCtl->Insert; + /* ThisTimeLineID doesn't change so we need no lock to copy it */ ThisTimeLineID = XLogCtl->ThisTimeLineID; Assert(ThisTimeLineID != 0 || IsBootstrapProcessingMode()); /* Use GetRedoRecPtr to copy the RedoRecPtr safely */ (void) GetRedoRecPtr(); + /* Also update our copy of doPageWrites. */ + doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites); } /* @@ -7640,6 +7223,21 @@ GetRedoRecPtr(void) } /* + * Return information needed to decide whether a modified block needs a + * full-page image to be included in the WAL record. + * + * The returned values are cached copies from backend-private memory, and + * possibly out-of-date. XLogInsertRecord will re-check them against + * up-to-date values, while holding the WAL insert lock. + */ +void +GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p) +{ + *RedoRecPtr_p = RedoRecPtr; + *doPageWrites_p = doPageWrites; +} + +/* * GetInsertRecPtr -- Returns the current insert position. * * NOTE: The value *actually* returned is the position of the last full @@ -8793,218 +8391,6 @@ XLogRestorePoint(const char *rpName) } /* - * Write a backup block if needed when we are setting a hint. Note that - * this may be called for a variety of page types, not just heaps. - * - * Callable while holding just share lock on the buffer content. - * - * We can't use the plain backup block mechanism since that relies on the - * Buffer being exclusively locked. Since some modifications (setting LSN, hint - * bits) are allowed in a sharelocked buffer that can lead to wal checksum - * failures. So instead we copy the page and insert the copied data as normal - * record data. - * - * We only need to do something if page has not yet been full page written in - * this checkpoint round. The LSN of the inserted wal record is returned if we - * had to write, InvalidXLogRecPtr otherwise. - * - * It is possible that multiple concurrent backends could attempt to write WAL - * records. In that case, multiple copies of the same block would be recorded - * in separate WAL records by different backends, though that is still OK from - * a correctness perspective. - */ -XLogRecPtr -XLogSaveBufferForHint(Buffer buffer, bool buffer_std) -{ - XLogRecPtr recptr = InvalidXLogRecPtr; - XLogRecPtr lsn; - XLogRecData rdata[2]; - BkpBlock bkpb; - - /* - * Ensure no checkpoint can change our view of RedoRecPtr. - */ - Assert(MyPgXact->delayChkpt); - - /* - * Update RedoRecPtr so XLogCheckBuffer can make the right decision - */ - GetRedoRecPtr(); - - /* - * Setup phony rdata element for use within XLogCheckBuffer only. We reuse - * and reset rdata for any actual WAL record insert. - */ - rdata[0].buffer = buffer; - rdata[0].buffer_std = buffer_std; - - /* - * Check buffer while not holding an exclusive lock. - */ - if (XLogCheckBuffer(rdata, false, &lsn, &bkpb)) - { - char copied_buffer[BLCKSZ]; - char *origdata = (char *) BufferGetBlock(buffer); - - /* - * Copy buffer so we don't have to worry about concurrent hint bit or - * lsn updates. We assume pd_lower/upper cannot be changed without an - * exclusive lock, so the contents bkp are not racy. 
- * - * With buffer_std set to false, XLogCheckBuffer() sets hole_length - * and hole_offset to 0; so the following code is safe for either - * case. - */ - memcpy(copied_buffer, origdata, bkpb.hole_offset); - memcpy(copied_buffer + bkpb.hole_offset, - origdata + bkpb.hole_offset + bkpb.hole_length, - BLCKSZ - bkpb.hole_offset - bkpb.hole_length); - - /* - * Header for backup block. - */ - rdata[0].data = (char *) &bkpb; - rdata[0].len = sizeof(BkpBlock); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); - - /* - * Save copy of the buffer. - */ - rdata[1].data = copied_buffer; - rdata[1].len = BLCKSZ - bkpb.hole_length; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = NULL; - - recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata); - } - - return recptr; -} - -/* - * Write a WAL record containing a full image of a page. Caller is responsible - * for writing the page to disk after calling this routine. - * - * Note: If you're using this function, you should be building pages in private - * memory and writing them directly to smgr. If you're using buffers, call - * log_newpage_buffer instead. - * - * If the page follows the standard page layout, with a PageHeader and unused - * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows - * the unused space to be left out from the WAL record, making it smaller. - */ -XLogRecPtr -log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, - Page page, bool page_std) -{ - BkpBlock bkpb; - XLogRecPtr recptr; - XLogRecData rdata[3]; - - /* NO ELOG(ERROR) from here till newpage op is logged */ - START_CRIT_SECTION(); - - bkpb.node = *rnode; - bkpb.fork = forkNum; - bkpb.block = blkno; - - if (page_std) - { - /* Assume we can omit data between pd_lower and pd_upper */ - uint16 lower = ((PageHeader) page)->pd_lower; - uint16 upper = ((PageHeader) page)->pd_upper; - - if (lower >= SizeOfPageHeaderData && - upper > lower && - upper <= BLCKSZ) - { - bkpb.hole_offset = lower; - bkpb.hole_length = upper - lower; - } - else - { - /* No "hole" to compress out */ - bkpb.hole_offset = 0; - bkpb.hole_length = 0; - } - } - else - { - /* Not a standard page header, don't try to eliminate "hole" */ - bkpb.hole_offset = 0; - bkpb.hole_length = 0; - } - - rdata[0].data = (char *) &bkpb; - rdata[0].len = sizeof(BkpBlock); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); - - if (bkpb.hole_length == 0) - { - rdata[1].data = (char *) page; - rdata[1].len = BLCKSZ; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = NULL; - } - else - { - /* must skip the hole */ - rdata[1].data = (char *) page; - rdata[1].len = bkpb.hole_offset; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = &rdata[2]; - - rdata[2].data = (char *) page + (bkpb.hole_offset + bkpb.hole_length); - rdata[2].len = BLCKSZ - (bkpb.hole_offset + bkpb.hole_length); - rdata[2].buffer = InvalidBuffer; - rdata[2].next = NULL; - } - - recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata); - - /* - * The page may be uninitialized. If so, we can't set the LSN because that - * would corrupt the page. - */ - if (!PageIsNew(page)) - { - PageSetLSN(page, recptr); - } - - END_CRIT_SECTION(); - - return recptr; -} - -/* - * Write a WAL record containing a full image of a page. - * - * Caller should initialize the buffer and mark it dirty before calling this - * function. This function will set the page LSN. - * - * If the page follows the standard page layout, with a PageHeader and unused - * space between pd_lower and pd_upper, set 'page_std' to TRUE. 
That allows - * the unused space to be left out from the WAL record, making it smaller. - */ -XLogRecPtr -log_newpage_buffer(Buffer buffer, bool page_std) -{ - Page page = BufferGetPage(buffer); - RelFileNode rnode; - ForkNumber forkNum; - BlockNumber blkno; - - /* Shared buffers should be modified in a critical section. */ - Assert(CritSectionCount > 0); - - BufferGetTag(buffer, &rnode, &forkNum, &blkno); - - return log_newpage(&rnode, forkNum, blkno, page, page_std); -} - -/* * Check if any of the GUC parameters that are critical for hot standby * have changed, and update the value in pg_control file if necessary. */ @@ -9757,7 +9143,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, * the standby. * * We must hold all the insertion locks to change the value of - * forcePageWrites, to ensure adequate interlocking against XLogInsert(). + * forcePageWrites, to ensure adequate interlocking against + * XLogInsertRecord(). */ WALInsertLockAcquireExclusive(); if (exclusive) diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c new file mode 100644 index 00000000000..b83343bf5bd --- /dev/null +++ b/src/backend/access/transam/xloginsert.c @@ -0,0 +1,633 @@ +/*------------------------------------------------------------------------- + * + * xloginsert.c + * Functions for constructing WAL records + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/transam/xloginsert.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xloginsert.h" +#include "catalog/pg_control.h" +#include "miscadmin.h" +#include "storage/bufmgr.h" +#include "storage/proc.h" +#include "utils/memutils.h" +#include "pg_trace.h" + +static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, + XLogRecData *rdata, + XLogRecPtr RedoRecPtr, bool doPageWrites, + XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal); +static void XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb); + +/* + * Insert an XLOG record having the specified RMID and info bytes, + * with the body of the record being the data chunk(s) described by + * the rdata chain (see xloginsert.h for notes about rdata). + * + * Returns XLOG pointer to end of record (beginning of next record). + * This can be used as LSN for data pages affected by the logged action. + * (LSN is the XLOG point up to which the XLOG must be flushed to disk + * before the data page can be written out. This implements the basic + * WAL rule "write the log before the data".) + * + * NB: this routine feels free to scribble on the XLogRecData structs, + * though not on the data they reference. This is OK since the XLogRecData + * structs are always just temporaries in the calling code. + */ +XLogRecPtr +XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) +{ + XLogRecPtr RedoRecPtr; + bool doPageWrites; + XLogRecPtr EndPos; + XLogRecPtr fpw_lsn; + XLogRecData *rdt; + XLogRecData *rdt_lastnormal; + + /* info's high bits are reserved for use by me */ + if (info & XLR_INFO_MASK) + elog(PANIC, "invalid xlog info mask %02X", info); + + TRACE_POSTGRESQL_XLOG_INSERT(rmid, info); + + /* + * In bootstrap mode, we don't actually log anything but XLOG resources; + * return a phony record pointer. 
+ */ + if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID) + { + EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */ + return EndPos; + } + + /* + * Get values needed to decide whether to do full-page writes. Since we + * don't yet have an insertion lock, these could change under us, but + * XLogInsertRecord will recheck them once it has a lock. + */ + GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); + + /* + * Assemble an XLogRecData chain representing the WAL record, including + * any backup blocks needed. + * + * We may have to loop back to here if a race condition is detected in + * XLogInsertRecord. We could prevent the race by doing all this work + * while holding an insertion lock, but it seems better to avoid doing CRC + * calculations while holding one. + */ +retry: + rdt = XLogRecordAssemble(rmid, info, rdata, RedoRecPtr, doPageWrites, + &fpw_lsn, &rdt_lastnormal); + + EndPos = XLogInsertRecord(rdt, fpw_lsn); + + if (EndPos == InvalidXLogRecPtr) + { + /* + * Undo the changes we made to the rdata chain, and retry. + * + * XXX: This doesn't undo *all* the changes; the XLogRecData + * entries for buffers that we had already decided to back up have + * had their data-pointers cleared. That's OK, as long as we + * decide to back them up on the next iteration as well. Hence, + * don't allow "doPageWrites" value to go from true to false after + * we've modified the rdata chain. + */ + bool newDoPageWrites; + + GetFullPageWriteInfo(&RedoRecPtr, &newDoPageWrites); + doPageWrites = doPageWrites || newDoPageWrites; + rdt_lastnormal->next = NULL; + + goto retry; + } + + return EndPos; +} + +/* + * Assemble a full WAL record, including backup blocks, from an XLogRecData + * chain, ready for insertion with XLogInsertRecord(). The record header + * fields are filled in, except for the xl_prev field and CRC. + * + * The rdata chain is modified, adding entries for full-page images. + * *rdt_lastnormal is set to point to the last normal (ie. not added by + * this function) entry. It can be used to reset the chain to its original + * state. + * + * If the rdata chain contains any buffer references, and a full-page image + * was not taken of all the buffers, *fpw_lsn is set to the lowest LSN among + * such pages. This signals that the assembled record is only good for + * insertion on the assumption that the RedoRecPtr and doPageWrites values + * were up-to-date. + */ +static XLogRecData * +XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecData *rdata, + XLogRecPtr RedoRecPtr, bool doPageWrites, + XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal) +{ + bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH); + XLogRecData *rdt; + Buffer dtbuf[XLR_MAX_BKP_BLOCKS]; + bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS]; + uint32 len, + total_len; + unsigned i; + + /* + * These need to be static because they are returned to the caller as part + * of the XLogRecData chain. 
+ */ + static BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS]; + static XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS]; + static XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS]; + static XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS]; + static XLogRecData hdr_rdt; + static XLogRecord *rechdr; + + if (rechdr == NULL) + { + static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF]; + + rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf); + MemSet(rechdr, 0, SizeOfXLogRecord); + } + + /* The record begins with the header */ + hdr_rdt.data = (char *) rechdr; + hdr_rdt.len = SizeOfXLogRecord; + hdr_rdt.next = rdata; + total_len = SizeOfXLogRecord; + + /* + * Here we scan the rdata chain, to determine which buffers must be backed + * up. + * + * We add entries for backup blocks to the chain, so that they don't need + * any special treatment in the critical section where the chunks are + * copied into the WAL buffers. Those entries have to be unlinked from the + * chain if we have to loop back here. + */ + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) + { + dtbuf[i] = InvalidBuffer; + dtbuf_bkp[i] = false; + } + + *fpw_lsn = InvalidXLogRecPtr; + len = 0; + for (rdt = rdata;;) + { + if (rdt->buffer == InvalidBuffer) + { + /* Simple data, just include it */ + len += rdt->len; + } + else + { + /* Find info for buffer */ + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) + { + if (rdt->buffer == dtbuf[i]) + { + /* Buffer already referenced by earlier chain item */ + if (dtbuf_bkp[i]) + { + rdt->data = NULL; + rdt->len = 0; + } + else if (rdt->data) + len += rdt->len; + break; + } + if (dtbuf[i] == InvalidBuffer) + { + /* OK, put it in this slot */ + XLogRecPtr page_lsn; + bool needs_backup; + + dtbuf[i] = rdt->buffer; + + /* + * Determine whether the buffer has to be backed up. + * + * We assume page LSN is first data on *every* page that + * can be passed to XLogInsert, whether it has the + * standard page layout or not. We don't need to take the + * buffer header lock for PageGetLSN because we hold an + * exclusive lock on the page and/or the relation. + */ + page_lsn = PageGetLSN(BufferGetPage(rdt->buffer)); + if (!doPageWrites) + needs_backup = false; + else if (page_lsn <= RedoRecPtr) + needs_backup = true; + else + needs_backup = false; + + if (needs_backup) + { + /* + * The page needs to be backed up, so set up BkpBlock + */ + XLogFillBkpBlock(rdt->buffer, rdt->buffer_std, + &(dtbuf_xlg[i])); + dtbuf_bkp[i] = true; + rdt->data = NULL; + rdt->len = 0; + } + else + { + if (rdt->data) + len += rdt->len; + if (*fpw_lsn == InvalidXLogRecPtr || + page_lsn < *fpw_lsn) + { + *fpw_lsn = page_lsn; + } + } + break; + } + } + if (i >= XLR_MAX_BKP_BLOCKS) + elog(PANIC, "can backup at most %d blocks per xlog record", + XLR_MAX_BKP_BLOCKS); + } + /* Break out of loop when rdt points to last chain item */ + if (rdt->next == NULL) + break; + rdt = rdt->next; + } + total_len += len; + + /* + * Make additional rdata chain entries for the backup blocks, so that we + * don't need to special-case them in the write loop. This modifies the + * original rdata chain, but we keep a pointer to the last regular entry, + * rdt_lastnormal, so that we can undo this if we have to start over. + * + * At the exit of this loop, total_len includes the backup block data. + * + * Also set the appropriate info bits to show which buffers were backed + * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer + * value (ignoring InvalidBuffer) appearing in the rdata chain. 
+ */ + *rdt_lastnormal = rdt; + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) + { + BkpBlock *bkpb; + char *page; + + if (!dtbuf_bkp[i]) + continue; + + info |= XLR_BKP_BLOCK(i); + + bkpb = &(dtbuf_xlg[i]); + page = (char *) BufferGetBlock(dtbuf[i]); + + rdt->next = &(dtbuf_rdt1[i]); + rdt = rdt->next; + + rdt->data = (char *) bkpb; + rdt->len = sizeof(BkpBlock); + total_len += sizeof(BkpBlock); + + rdt->next = &(dtbuf_rdt2[i]); + rdt = rdt->next; + + if (bkpb->hole_length == 0) + { + rdt->data = page; + rdt->len = BLCKSZ; + total_len += BLCKSZ; + rdt->next = NULL; + } + else + { + /* must skip the hole */ + rdt->data = page; + rdt->len = bkpb->hole_offset; + total_len += bkpb->hole_offset; + + rdt->next = &(dtbuf_rdt3[i]); + rdt = rdt->next; + + rdt->data = page + (bkpb->hole_offset + bkpb->hole_length); + rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length); + total_len += rdt->len; + rdt->next = NULL; + } + } + + /* + * We disallow len == 0 because it provides a useful bit of extra error + * checking in ReadRecord. This means that all callers of XLogInsert + * must supply at least some not-in-a-buffer data. However, we make an + * exception for XLOG SWITCH records because we don't want them to ever + * cross a segment boundary. + */ + if (len == 0 && !isLogSwitch) + elog(PANIC, "invalid xlog record length %u", rechdr->xl_len); + + /* + * Fill in the fields in the record header. Prev-link is filled in later, + * once we know where in the WAL the record will be inserted. CRC is also + * not calculated yet. + */ + rechdr->xl_xid = GetCurrentTransactionIdIfAny(); + rechdr->xl_tot_len = total_len; + rechdr->xl_len = len; /* doesn't include backup blocks */ + rechdr->xl_info = info; + rechdr->xl_rmid = rmid; + rechdr->xl_prev = InvalidXLogRecPtr; + + return &hdr_rdt; +} + +/* + * Determine whether the buffer referenced has to be backed up. + * + * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites + * could change later, so the result should be used for optimization purposes + * only. + */ +bool +XLogCheckBufferNeedsBackup(Buffer buffer) +{ + XLogRecPtr RedoRecPtr; + bool doPageWrites; + Page page; + + GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); + + page = BufferGetPage(buffer); + + if (doPageWrites && PageGetLSN(page) <= RedoRecPtr) + return true; /* buffer requires backup */ + + return false; /* buffer does not need to be backed up */ +} + +/* + * Write a backup block if needed when we are setting a hint. Note that + * this may be called for a variety of page types, not just heaps. + * + * Callable while holding just share lock on the buffer content. + * + * We can't use the plain backup block mechanism since that relies on the + * Buffer being exclusively locked. Since some modifications (setting LSN, hint + * bits) are allowed in a sharelocked buffer that can lead to wal checksum + * failures. So instead we copy the page and insert the copied data as normal + * record data. + * + * We only need to do something if page has not yet been full page written in + * this checkpoint round. The LSN of the inserted wal record is returned if we + * had to write, InvalidXLogRecPtr otherwise. + * + * It is possible that multiple concurrent backends could attempt to write WAL + * records. In that case, multiple copies of the same block would be recorded + * in separate WAL records by different backends, though that is still OK from + * a correctness perspective. 
+ */ +XLogRecPtr +XLogSaveBufferForHint(Buffer buffer, bool buffer_std) +{ + XLogRecPtr recptr = InvalidXLogRecPtr; + XLogRecPtr lsn; + XLogRecPtr RedoRecPtr; + + /* + * Ensure no checkpoint can change our view of RedoRecPtr. + */ + Assert(MyPgXact->delayChkpt); + + /* + * Update RedoRecPtr so that we can make the right decision + */ + RedoRecPtr = GetRedoRecPtr(); + + /* + * We assume page LSN is first data on *every* page that can be passed to + * XLogInsert, whether it has the standard page layout or not. Since we're + * only holding a share-lock on the page, we must take the buffer header + * lock when we look at the LSN. + */ + lsn = BufferGetLSNAtomic(buffer); + + if (lsn <= RedoRecPtr) + { + XLogRecData rdata[2]; + BkpBlock bkpb; + char copied_buffer[BLCKSZ]; + char *origdata = (char *) BufferGetBlock(buffer); + + /* Make a BkpBlock struct representing the buffer */ + XLogFillBkpBlock(buffer, buffer_std, &bkpb); + + /* + * Copy buffer so we don't have to worry about concurrent hint bit or + * lsn updates. We assume pd_lower/upper cannot be changed without an + * exclusive lock, so the contents bkp are not racy. + * + * With buffer_std set to false, XLogFillBkpBlock() sets hole_length + * and hole_offset to 0; so the following code is safe for either + * case. + */ + memcpy(copied_buffer, origdata, bkpb.hole_offset); + memcpy(copied_buffer + bkpb.hole_offset, + origdata + bkpb.hole_offset + bkpb.hole_length, + BLCKSZ - bkpb.hole_offset - bkpb.hole_length); + + /* + * Header for backup block. + */ + rdata[0].data = (char *) &bkpb; + rdata[0].len = sizeof(BkpBlock); + rdata[0].buffer = InvalidBuffer; + rdata[0].next = &(rdata[1]); + + /* + * Save copy of the buffer. + */ + rdata[1].data = copied_buffer; + rdata[1].len = BLCKSZ - bkpb.hole_length; + rdata[1].buffer = InvalidBuffer; + rdata[1].next = NULL; + + recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata); + } + + return recptr; +} + +/* + * Write a WAL record containing a full image of a page. Caller is responsible + * for writing the page to disk after calling this routine. + * + * Note: If you're using this function, you should be building pages in private + * memory and writing them directly to smgr. If you're using buffers, call + * log_newpage_buffer instead. + * + * If the page follows the standard page layout, with a PageHeader and unused + * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows + * the unused space to be left out from the WAL record, making it smaller. 
+ */ +XLogRecPtr +log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, + Page page, bool page_std) +{ + BkpBlock bkpb; + XLogRecPtr recptr; + XLogRecData rdata[3]; + + /* NO ELOG(ERROR) from here till newpage op is logged */ + START_CRIT_SECTION(); + + bkpb.node = *rnode; + bkpb.fork = forkNum; + bkpb.block = blkno; + + if (page_std) + { + /* Assume we can omit data between pd_lower and pd_upper */ + uint16 lower = ((PageHeader) page)->pd_lower; + uint16 upper = ((PageHeader) page)->pd_upper; + + if (lower >= SizeOfPageHeaderData && + upper > lower && + upper <= BLCKSZ) + { + bkpb.hole_offset = lower; + bkpb.hole_length = upper - lower; + } + else + { + /* No "hole" to compress out */ + bkpb.hole_offset = 0; + bkpb.hole_length = 0; + } + } + else + { + /* Not a standard page header, don't try to eliminate "hole" */ + bkpb.hole_offset = 0; + bkpb.hole_length = 0; + } + + rdata[0].data = (char *) &bkpb; + rdata[0].len = sizeof(BkpBlock); + rdata[0].buffer = InvalidBuffer; + rdata[0].next = &(rdata[1]); + + if (bkpb.hole_length == 0) + { + rdata[1].data = (char *) page; + rdata[1].len = BLCKSZ; + rdata[1].buffer = InvalidBuffer; + rdata[1].next = NULL; + } + else + { + /* must skip the hole */ + rdata[1].data = (char *) page; + rdata[1].len = bkpb.hole_offset; + rdata[1].buffer = InvalidBuffer; + rdata[1].next = &rdata[2]; + + rdata[2].data = (char *) page + (bkpb.hole_offset + bkpb.hole_length); + rdata[2].len = BLCKSZ - (bkpb.hole_offset + bkpb.hole_length); + rdata[2].buffer = InvalidBuffer; + rdata[2].next = NULL; + } + + recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata); + + /* + * The page may be uninitialized. If so, we can't set the LSN because that + * would corrupt the page. + */ + if (!PageIsNew(page)) + { + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); + + return recptr; +} + +/* + * Write a WAL record containing a full image of a page. + * + * Caller should initialize the buffer and mark it dirty before calling this + * function. This function will set the page LSN. + * + * If the page follows the standard page layout, with a PageHeader and unused + * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows + * the unused space to be left out from the WAL record, making it smaller. + */ +XLogRecPtr +log_newpage_buffer(Buffer buffer, bool page_std) +{ + Page page = BufferGetPage(buffer); + RelFileNode rnode; + ForkNumber forkNum; + BlockNumber blkno; + + /* Shared buffers should be modified in a critical section. */ + Assert(CritSectionCount > 0); + + BufferGetTag(buffer, &rnode, &forkNum, &blkno); + + return log_newpage(&rnode, forkNum, blkno, page, page_std); +} + +/* + * Fill a BkpBlock for a buffer. 
+ */ +static void +XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb) +{ + BufferGetTag(buffer, &bkpb->node, &bkpb->fork, &bkpb->block); + + if (buffer_std) + { + /* Assume we can omit data between pd_lower and pd_upper */ + Page page = BufferGetPage(buffer); + uint16 lower = ((PageHeader) page)->pd_lower; + uint16 upper = ((PageHeader) page)->pd_upper; + + if (lower >= SizeOfPageHeaderData && + upper > lower && + upper <= BLCKSZ) + { + bkpb->hole_offset = lower; + bkpb->hole_length = upper - lower; + } + else + { + /* No "hole" to compress out */ + bkpb->hole_offset = 0; + bkpb->hole_length = 0; + } + } + else + { + /* Not a standard page header, don't try to eliminate "hole" */ + bkpb->hole_offset = 0; + bkpb->hole_length = 0; + } +} diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index da7ed92941c..7d573cc585d 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -16,7 +16,7 @@ #include "postgres.h" #include "access/transam.h" -#include "access/xlog.h" +#include "access/xlogrecord.h" #include "access/xlog_internal.h" #include "access/xlogreader.h" #include "catalog/pg_control.h" diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index ef827dbc404..1a21dac8538 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -456,6 +456,127 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, return buffer; } +/* + * Restore a full-page image from a backup block attached to an XLOG record. + * + * lsn: LSN of the XLOG record being replayed + * record: the complete XLOG record + * block_index: which backup block to restore (0 .. XLR_MAX_BKP_BLOCKS - 1) + * get_cleanup_lock: TRUE to get a cleanup rather than plain exclusive lock + * keep_buffer: TRUE to return the buffer still locked and pinned + * + * Returns the buffer number containing the page. Note this is not terribly + * useful unless keep_buffer is specified as TRUE. + * + * Note: when a backup block is available in XLOG, we restore it + * unconditionally, even if the page in the database appears newer. + * This is to protect ourselves against database pages that were partially + * or incorrectly written during a crash. We assume that the XLOG data + * must be good because it has passed a CRC check, while the database + * page might not be. This will force us to replay all subsequent + * modifications of the page that appear in XLOG, rather than possibly + * ignoring them as already applied, but that's not a huge drawback. + * + * If 'get_cleanup_lock' is true, a cleanup lock is obtained on the buffer, + * else a normal exclusive lock is used. During crash recovery, that's just + * pro forma because there can't be any regular backends in the system, but + * in hot standby mode the distinction is important. + * + * If 'keep_buffer' is true, return without releasing the buffer lock and pin; + * then caller is responsible for doing UnlockReleaseBuffer() later. This + * is needed in some cases when replaying XLOG records that touch multiple + * pages, to prevent inconsistent states from being visible to other backends. + * (Again, that's only important in hot standby mode.) 
+ */ +Buffer +RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index, + bool get_cleanup_lock, bool keep_buffer) +{ + BkpBlock bkpb; + char *blk; + int i; + + /* Locate requested BkpBlock in the record */ + blk = (char *) XLogRecGetData(record) + record->xl_len; + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) + { + if (!(record->xl_info & XLR_BKP_BLOCK(i))) + continue; + + memcpy(&bkpb, blk, sizeof(BkpBlock)); + blk += sizeof(BkpBlock); + + if (i == block_index) + { + /* Found it, apply the update */ + return RestoreBackupBlockContents(lsn, bkpb, blk, get_cleanup_lock, + keep_buffer); + } + + blk += BLCKSZ - bkpb.hole_length; + } + + /* Caller specified a bogus block_index */ + elog(ERROR, "failed to restore block_index %d", block_index); + return InvalidBuffer; /* keep compiler quiet */ +} + +/* + * Workhorse for RestoreBackupBlock usable without an xlog record + * + * Restores a full-page image from BkpBlock and a data pointer. + */ +Buffer +RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk, + bool get_cleanup_lock, bool keep_buffer) +{ + Buffer buffer; + Page page; + + buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block, + RBM_ZERO); + Assert(BufferIsValid(buffer)); + if (get_cleanup_lock) + LockBufferForCleanup(buffer); + else + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + + page = (Page) BufferGetPage(buffer); + + if (bkpb.hole_length == 0) + { + memcpy((char *) page, blk, BLCKSZ); + } + else + { + memcpy((char *) page, blk, bkpb.hole_offset); + /* must zero-fill the hole */ + MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length); + memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length), + blk + bkpb.hole_offset, + BLCKSZ - (bkpb.hole_offset + bkpb.hole_length)); + } + + /* + * The checksum value on this page is currently invalid. We don't need to + * reset it here since it will be set before being written. + */ + + /* + * The page may be uninitialized. If so, we can't set the LSN because that + * would corrupt the page. + */ + if (!PageIsNew(page)) + { + PageSetLSN(page, lsn); + } + MarkBufferDirty(buffer); + + if (!keep_buffer) + UnlockReleaseBuffer(buffer); + + return buffer; +} /* * Struct actually returned by XLogFakeRelcacheEntry, though the declared |
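
The redo-side routines RestoreBackupBlock() and RestoreBackupBlockContents() now live in xlogutils.c, next to the other redo support code, with the latter newly exported. A minimal sketch of how an rmgr redo routine of this era consumes a backup block, assuming a hypothetical single-block record type; xl_foo and foo_redo are illustrations, not part of this commit, and the exact header set is an assumption:

#include "postgres.h"

#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/bufmgr.h"

typedef struct xl_foo			/* hypothetical record payload */
{
	RelFileNode node;
	BlockNumber blkno;
} xl_foo;

static void
foo_redo(XLogRecPtr lsn, XLogRecord *record)
{
	xl_foo	   *xlrec = (xl_foo *) XLogRecGetData(record);

	/* A full-page image, if present, replaces per-page replay entirely. */
	if (record->xl_info & XLR_BKP_BLOCK(0))
		(void) RestoreBackupBlock(lsn, record, 0, false, false);
	else
	{
		Buffer		buffer;

		buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM,
										xlrec->blkno, RBM_NORMAL);
		if (BufferIsValid(buffer))
		{
			Page		page;

			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
			page = BufferGetPage(buffer);

			/* Skip if this record was already applied to the page. */
			if (lsn > PageGetLSN(page))
			{
				/* ... reapply the logged change to 'page' here ... */
				PageSetLSN(page, lsn);
				MarkBufferDirty(buffer);
			}
			UnlockReleaseBuffer(buffer);
		}
	}
}

Restoring the image unconditionally, rather than comparing LSNs first, is what protects against partially written pages: the copy in WAL has passed a CRC check, while the on-disk page may not have.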