aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c761
1 file changed, 74 insertions, 687 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3160db72458..563d442a7a3 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -31,6 +31,7 @@
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xloginsert.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/catversion.h"
@@ -300,14 +301,21 @@ XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
* (which is almost but not quite the same as a pointer to the most recent
* CHECKPOINT record). We update this from the shared-memory copy,
* XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold an insertion lock). See XLogInsert for details. We are also allowed
- * to update from XLogCtl->RedoRecPtr if we hold the info_lck;
+ * hold an insertion lock). See XLogInsertRecord for details. We are also
+ * allowed to update from XLogCtl->RedoRecPtr if we hold the info_lck;
* see GetRedoRecPtr. A freshly spawned backend obtains the value during
* InitXLOGAccess.
*/
static XLogRecPtr RedoRecPtr;
/*
+ * doPageWrites is this backend's local copy of (forcePageWrites ||
+ * fullPageWrites). It is used together with RedoRecPtr to decide whether
+ * a full-page image of a page needs to be taken.
+ */
+static bool doPageWrites;
+
+/*
* RedoStartLSN points to the checkpoint's REDO location which is specified
* in a backup label file, backup history file or control file. In standby
* mode, XLOG streaming usually starts from the position where an invalid
@@ -419,7 +427,7 @@ typedef union WALInsertLockPadded
} WALInsertLockPadded;
/*
- * Shared state data for XLogInsert.
+ * Shared state data for WAL insertion.
*/
typedef struct XLogCtlInsert
{
@@ -765,10 +773,6 @@ static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
-static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
- XLogRecPtr *lsn, BkpBlock *bkpb);
-static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb,
- char *blk, bool get_cleanup_lock, bool keep_buffer);
static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
static bool XLogCheckpointNeeded(XLogSegNo new_segno);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
@@ -831,226 +835,45 @@ static void WALInsertLockRelease(void);
static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
/*
- * Insert an XLOG record having the specified RMID and info bytes,
- * with the body of the record being the data chunk(s) described by
- * the rdata chain (see xlog.h for notes about rdata).
+ * Insert an XLOG record represented by an already-constructed chain of data
+ * chunks. This is a low-level routine; to construct the WAL record header
+ * and data, use the higher-level routines in xloginsert.c.
+ *
+ * If 'fpw_lsn' is valid, it is the oldest LSN among the pages that this
+ * WAL record applies to, that were not included in the record as full page
+ * images. If fpw_lsn >= RedoRecPtr, the function does not perform the
+ * insertion and returns InvalidXLogRecPtr. The caller can then recalculate
+ * which pages need a full-page image, and retry. If fpw_lsn is invalid, the
+ * record is always inserted.
+ *
+ * The first XLogRecData in the chain must be for the record header, and its
+ * data must be MAXALIGNed. XLogInsertRecord fills in the xl_prev and
+ * xl_crc fields in the header; the rest of the header must already be filled
+ * by the caller.
*
* Returns XLOG pointer to end of record (beginning of next record).
* This can be used as LSN for data pages affected by the logged action.
* (LSN is the XLOG point up to which the XLOG must be flushed to disk
* before the data page can be written out. This implements the basic
* WAL rule "write the log before the data".)
- *
- * NB: this routine feels free to scribble on the XLogRecData structs,
- * though not on the data they reference. This is OK since the XLogRecData
- * structs are always just temporaries in the calling code.
*/
XLogRecPtr
-XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
+XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogRecData *rdt;
- XLogRecData *rdt_lastnormal;
- Buffer dtbuf[XLR_MAX_BKP_BLOCKS];
- bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
- BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
- XLogRecPtr dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
- XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
- XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
- XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
- XLogRecData hdr_rdt;
pg_crc32 rdata_crc;
- uint32 len,
- write_len;
- unsigned i;
- bool doPageWrites;
- bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
bool inserted;
- uint8 info_orig = info;
- static XLogRecord *rechdr;
+ XLogRecord *rechdr = (XLogRecord *) rdata->data;
+ bool isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID &&
+ rechdr->xl_info == XLOG_SWITCH);
XLogRecPtr StartPos;
XLogRecPtr EndPos;
- if (rechdr == NULL)
- {
- static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF];
-
- rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf);
- MemSet(rechdr, 0, SizeOfXLogRecord);
- }
-
/* cross-check on whether we should be here or not */
if (!XLogInsertAllowed())
elog(ERROR, "cannot make new WAL entries during recovery");
- /* info's high bits are reserved for use by me */
- if (info & XLR_INFO_MASK)
- elog(PANIC, "invalid xlog info mask %02X", info);
-
- TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
-
- /*
- * In bootstrap mode, we don't actually log anything but XLOG resources;
- * return a phony record pointer.
- */
- if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
- {
- EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
- return EndPos;
- }
-
- /*
- * Here we scan the rdata chain, to determine which buffers must be backed
- * up.
- *
- * We may have to loop back to here if a race condition is detected below.
- * We could prevent the race by doing all this work while holding an
- * insertion lock, but it seems better to avoid doing CRC calculations
- * while holding one.
- *
- * We add entries for backup blocks to the chain, so that they don't need
- * any special treatment in the critical section where the chunks are
- * copied into the WAL buffers. Those entries have to be unlinked from the
- * chain if we have to loop back here.
- */
-begin:;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- dtbuf[i] = InvalidBuffer;
- dtbuf_bkp[i] = false;
- }
-
- /*
- * Decide if we need to do full-page writes in this XLOG record: true if
- * full_page_writes is on or we have a PITR request for it. Since we
- * don't yet have an insertion lock, fullPageWrites and forcePageWrites
- * could change under us, but we'll recheck them once we have a lock.
- */
- doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
-
- len = 0;
- for (rdt = rdata;;)
- {
- if (rdt->buffer == InvalidBuffer)
- {
- /* Simple data, just include it */
- len += rdt->len;
- }
- else
- {
- /* Find info for buffer */
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- if (rdt->buffer == dtbuf[i])
- {
- /* Buffer already referenced by earlier chain item */
- if (dtbuf_bkp[i])
- {
- rdt->data = NULL;
- rdt->len = 0;
- }
- else if (rdt->data)
- len += rdt->len;
- break;
- }
- if (dtbuf[i] == InvalidBuffer)
- {
- /* OK, put it in this slot */
- dtbuf[i] = rdt->buffer;
- if (doPageWrites && XLogCheckBuffer(rdt, true,
- &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
- {
- dtbuf_bkp[i] = true;
- rdt->data = NULL;
- rdt->len = 0;
- }
- else if (rdt->data)
- len += rdt->len;
- break;
- }
- }
- if (i >= XLR_MAX_BKP_BLOCKS)
- elog(PANIC, "can backup at most %d blocks per xlog record",
- XLR_MAX_BKP_BLOCKS);
- }
- /* Break out of loop when rdt points to last chain item */
- if (rdt->next == NULL)
- break;
- rdt = rdt->next;
- }
-
- /*
- * NOTE: We disallow len == 0 because it provides a useful bit of extra
- * error checking in ReadRecord. This means that all callers of
- * XLogInsert must supply at least some not-in-a-buffer data. However, we
- * make an exception for XLOG SWITCH records because we don't want them to
- * ever cross a segment boundary.
- */
- if (len == 0 && !isLogSwitch)
- elog(PANIC, "invalid xlog record length %u", len);
-
- /*
- * Make additional rdata chain entries for the backup blocks, so that we
- * don't need to special-case them in the write loop. This modifies the
- * original rdata chain, but we keep a pointer to the last regular entry,
- * rdt_lastnormal, so that we can undo this if we have to loop back to the
- * beginning.
- *
- * At the exit of this loop, write_len includes the backup block data.
- *
- * Also set the appropriate info bits to show which buffers were backed
- * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer
- * value (ignoring InvalidBuffer) appearing in the rdata chain.
- */
- rdt_lastnormal = rdt;
- write_len = len;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- BkpBlock *bkpb;
- char *page;
-
- if (!dtbuf_bkp[i])
- continue;
-
- info |= XLR_BKP_BLOCK(i);
-
- bkpb = &(dtbuf_xlg[i]);
- page = (char *) BufferGetBlock(dtbuf[i]);
-
- rdt->next = &(dtbuf_rdt1[i]);
- rdt = rdt->next;
-
- rdt->data = (char *) bkpb;
- rdt->len = sizeof(BkpBlock);
- write_len += sizeof(BkpBlock);
-
- rdt->next = &(dtbuf_rdt2[i]);
- rdt = rdt->next;
-
- if (bkpb->hole_length == 0)
- {
- rdt->data = page;
- rdt->len = BLCKSZ;
- write_len += BLCKSZ;
- rdt->next = NULL;
- }
- else
- {
- /* must skip the hole */
- rdt->data = page;
- rdt->len = bkpb->hole_offset;
- write_len += bkpb->hole_offset;
-
- rdt->next = &(dtbuf_rdt3[i]);
- rdt = rdt->next;
-
- rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
- rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
- write_len += rdt->len;
- rdt->next = NULL;
- }
- }
-
/*
* Calculate CRC of the data, including all the backup blocks
*
@@ -1060,29 +883,15 @@ begin:;
* header.
*/
INIT_CRC32C(rdata_crc);
- for (rdt = rdata; rdt != NULL; rdt = rdt->next)
+ for (rdt = rdata->next; rdt != NULL; rdt = rdt->next)
COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
/*
- * Construct record header (prev-link is filled in later, after reserving
- * the space for the record), and make that the first chunk in the chain.
- *
- * The CRC calculated for the header here doesn't include prev-link,
- * because we don't know it yet. It will be added later.
- */
- rechdr->xl_xid = GetCurrentTransactionIdIfAny();
- rechdr->xl_tot_len = SizeOfXLogRecord + write_len;
- rechdr->xl_len = len; /* doesn't include backup blocks */
- rechdr->xl_info = info;
- rechdr->xl_rmid = rmid;
- rechdr->xl_prev = InvalidXLogRecPtr;
+ * Calculate CRC of the header, except for prev-link, because we don't
+ * know it yet. It will be added later.
+ */
COMP_CRC32C(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
- hdr_rdt.next = rdata;
- hdr_rdt.data = (char *) rechdr;
- hdr_rdt.len = SizeOfXLogRecord;
- write_len += SizeOfXLogRecord;
-
/*----------
*
* We have now done all the preparatory work we can without holding a
@@ -1122,56 +931,33 @@ begin:;
WALInsertLockAcquire();
/*
- * Check to see if my RedoRecPtr is out of date. If so, may have to go
- * back and recompute everything. This can only happen just after a
- * checkpoint, so it's better to be slow in this case and fast otherwise.
+ * Check to see if my copy of RedoRecPtr or doPageWrites is out of date.
+ * If so, may have to go back and have the caller recompute everything.
+ * This can only happen just after a checkpoint, so it's better to be
+ * slow in this case and fast otherwise.
*
* If we aren't doing full-page writes then RedoRecPtr doesn't actually
* affect the contents of the XLOG record, so we'll update our local copy
- * but not force a recomputation.
+ * but not force a recomputation. (If doPageWrites was just turned off,
+ * we could recompute the record without full pages, but we choose not
+ * to bother.)
*/
if (RedoRecPtr != Insert->RedoRecPtr)
{
Assert(RedoRecPtr < Insert->RedoRecPtr);
RedoRecPtr = Insert->RedoRecPtr;
-
- if (doPageWrites)
- {
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- if (dtbuf[i] == InvalidBuffer)
- continue;
- if (dtbuf_bkp[i] == false &&
- dtbuf_lsn[i] <= RedoRecPtr)
- {
- /*
- * Oops, this buffer now needs to be backed up, but we
- * didn't think so above. Start over.
- */
- WALInsertLockRelease();
- END_CRIT_SECTION();
- rdt_lastnormal->next = NULL;
- info = info_orig;
- goto begin;
- }
- }
- }
}
+ doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);
- /*
- * Also check to see if fullPageWrites or forcePageWrites was just turned
- * on; if we weren't already doing full-page writes then go back and
- * recompute. (If it was just turned off, we could recompute the record
- * without full pages, but we choose not to bother.)
- */
- if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
+ if (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= RedoRecPtr && doPageWrites)
{
- /* Oops, must redo it with full-page data. */
+ /*
+ * Oops, some buffer now needs to be backed up that the caller
+ * didn't back up. Start over.
+ */
WALInsertLockRelease();
END_CRIT_SECTION();
- rdt_lastnormal->next = NULL;
- info = info_orig;
- goto begin;
+ return InvalidXLogRecPtr;
}
/*
@@ -1182,7 +968,7 @@ begin:;
inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
else
{
- ReserveXLogInsertLocation(write_len, &StartPos, &EndPos,
+ ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos,
&rechdr->xl_prev);
inserted = true;
}
@@ -1201,7 +987,8 @@ begin:;
* All the record data, including the header, is now ready to be
* inserted. Copy the record in the space reserved.
*/
- CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos);
+ CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
+ StartPos, EndPos);
}
else
{
@@ -1437,7 +1224,7 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
}
/*
- * Subroutine of XLogInsert. Copies a WAL record to an already-reserved
+ * Subroutine of XLogInsertRecord. Copies a WAL record to an already-reserved
* area in the WAL.
*/
static void
@@ -2004,93 +1791,6 @@ XLogRecPtrToBytePos(XLogRecPtr ptr)
}
/*
- * Determine whether the buffer referenced has to be backed up.
- *
- * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
- * could change later, so the result should be used for optimization purposes
- * only.
- */
-bool
-XLogCheckBufferNeedsBackup(Buffer buffer)
-{
- bool doPageWrites;
- Page page;
-
- page = BufferGetPage(buffer);
-
- doPageWrites = XLogCtl->Insert.fullPageWrites || XLogCtl->Insert.forcePageWrites;
-
- if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
- return true; /* buffer requires backup */
-
- return false; /* buffer does not need to be backed up */
-}
-
-/*
- * Determine whether the buffer referenced by an XLogRecData item has to
- * be backed up, and if so fill a BkpBlock struct for it. In any case
- * save the buffer's LSN at *lsn.
- */
-static bool
-XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
- XLogRecPtr *lsn, BkpBlock *bkpb)
-{
- Page page;
-
- page = BufferGetPage(rdata->buffer);
-
- /*
- * We assume page LSN is first data on *every* page that can be passed to
- * XLogInsert, whether it has the standard page layout or not. We don't
- * need to take the buffer header lock for PageGetLSN if we hold an
- * exclusive lock on the page and/or the relation.
- */
- if (holdsExclusiveLock)
- *lsn = PageGetLSN(page);
- else
- *lsn = BufferGetLSNAtomic(rdata->buffer);
-
- if (*lsn <= RedoRecPtr)
- {
- /*
- * The page needs to be backed up, so set up *bkpb
- */
- BufferGetTag(rdata->buffer, &bkpb->node, &bkpb->fork, &bkpb->block);
-
- if (rdata->buffer_std)
- {
- /* Assume we can omit data between pd_lower and pd_upper */
- uint16 lower = ((PageHeader) page)->pd_lower;
- uint16 upper = ((PageHeader) page)->pd_upper;
-
- if (lower >= SizeOfPageHeaderData &&
- upper > lower &&
- upper <= BLCKSZ)
- {
- bkpb->hole_offset = lower;
- bkpb->hole_length = upper - lower;
- }
- else
- {
- /* No "hole" to compress out */
- bkpb->hole_offset = 0;
- bkpb->hole_length = 0;
- }
- }
- else
- {
- /* Not a standard page header, don't try to eliminate "hole" */
- bkpb->hole_offset = 0;
- bkpb->hole_length = 0;
- }
-
- return true; /* buffer requires backup */
- }
-
- return false; /* buffer does not need to be backed up */
-}
-
-/*
* Initialize XLOG buffers, writing out old buffers if they still contain
* unwritten data, upto the page containing 'upto'. Or if 'opportunistic' is
* true, initialize as many pages as we can without having to write out
@@ -3944,128 +3644,6 @@ CleanupBackupHistory(void)
}
/*
- * Restore a full-page image from a backup block attached to an XLOG record.
- *
- * lsn: LSN of the XLOG record being replayed
- * record: the complete XLOG record
- * block_index: which backup block to restore (0 .. XLR_MAX_BKP_BLOCKS - 1)
- * get_cleanup_lock: TRUE to get a cleanup rather than plain exclusive lock
- * keep_buffer: TRUE to return the buffer still locked and pinned
- *
- * Returns the buffer number containing the page. Note this is not terribly
- * useful unless keep_buffer is specified as TRUE.
- *
- * Note: when a backup block is available in XLOG, we restore it
- * unconditionally, even if the page in the database appears newer.
- * This is to protect ourselves against database pages that were partially
- * or incorrectly written during a crash. We assume that the XLOG data
- * must be good because it has passed a CRC check, while the database
- * page might not be. This will force us to replay all subsequent
- * modifications of the page that appear in XLOG, rather than possibly
- * ignoring them as already applied, but that's not a huge drawback.
- *
- * If 'get_cleanup_lock' is true, a cleanup lock is obtained on the buffer,
- * else a normal exclusive lock is used. During crash recovery, that's just
- * pro forma because there can't be any regular backends in the system, but
- * in hot standby mode the distinction is important.
- *
- * If 'keep_buffer' is true, return without releasing the buffer lock and pin;
- * then caller is responsible for doing UnlockReleaseBuffer() later. This
- * is needed in some cases when replaying XLOG records that touch multiple
- * pages, to prevent inconsistent states from being visible to other backends.
- * (Again, that's only important in hot standby mode.)
- */
-Buffer
-RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
- bool get_cleanup_lock, bool keep_buffer)
-{
- BkpBlock bkpb;
- char *blk;
- int i;
-
- /* Locate requested BkpBlock in the record */
- blk = (char *) XLogRecGetData(record) + record->xl_len;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- if (!(record->xl_info & XLR_BKP_BLOCK(i)))
- continue;
-
- memcpy(&bkpb, blk, sizeof(BkpBlock));
- blk += sizeof(BkpBlock);
-
- if (i == block_index)
- {
- /* Found it, apply the update */
- return RestoreBackupBlockContents(lsn, bkpb, blk, get_cleanup_lock,
- keep_buffer);
- }
-
- blk += BLCKSZ - bkpb.hole_length;
- }
-
- /* Caller specified a bogus block_index */
- elog(ERROR, "failed to restore block_index %d", block_index);
- return InvalidBuffer; /* keep compiler quiet */
-}
-
-/*
- * Workhorse for RestoreBackupBlock usable without an xlog record
- *
- * Restores a full-page image from BkpBlock and a data pointer.
- */
-static Buffer
-RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk,
- bool get_cleanup_lock, bool keep_buffer)
-{
- Buffer buffer;
- Page page;
-
- buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
- RBM_ZERO);
- Assert(BufferIsValid(buffer));
- if (get_cleanup_lock)
- LockBufferForCleanup(buffer);
- else
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
- page = (Page) BufferGetPage(buffer);
-
- if (bkpb.hole_length == 0)
- {
- memcpy((char *) page, blk, BLCKSZ);
- }
- else
- {
- memcpy((char *) page, blk, bkpb.hole_offset);
- /* must zero-fill the hole */
- MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
- memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
- blk + bkpb.hole_offset,
- BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
- }
-
- /*
- * The checksum value on this page is currently invalid. We don't need to
- * reset it here since it will be set before being written.
- */
-
- /*
- * The page may be uninitialized. If so, we can't set the LSN because that
- * would corrupt the page.
- */
- if (!PageIsNew(page))
- {
- PageSetLSN(page, lsn);
- }
- MarkBufferDirty(buffer);
-
- if (!keep_buffer)
- UnlockReleaseBuffer(buffer);
-
- return buffer;
-}
-
-/*
* Attempt to read an XLOG record.
*
* If RecPtr is not NULL, try to read a record at that position. Otherwise
@@ -6352,6 +5930,7 @@ StartupXLOG(void)
lastFullPageWrites = checkPoint.fullPageWrites;
RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
+ doPageWrites = lastFullPageWrites;
if (RecPtr < checkPoint.redo)
ereport(PANIC,
@@ -7606,12 +7185,16 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
void
InitXLOGAccess(void)
{
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
+
/* ThisTimeLineID doesn't change so we need no lock to copy it */
ThisTimeLineID = XLogCtl->ThisTimeLineID;
Assert(ThisTimeLineID != 0 || IsBootstrapProcessingMode());
/* Use GetRedoRecPtr to copy the RedoRecPtr safely */
(void) GetRedoRecPtr();
+ /* Also update our copy of doPageWrites. */
+ doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);
}
/*
@@ -7640,6 +7223,21 @@ GetRedoRecPtr(void)
}
/*
+ * Return information needed to decide whether a modified block needs a
+ * full-page image to be included in the WAL record.
+ *
+ * The returned values are cached copies from backend-private memory, and
+ * possibly out-of-date. XLogInsertRecord will re-check them against
+ * up-to-date values, while holding the WAL insert lock.
+ */
+void
+GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p)
+{
+ *RedoRecPtr_p = RedoRecPtr;
+ *doPageWrites_p = doPageWrites;
+}
+
+/*
* GetInsertRecPtr -- Returns the current insert position.
*
* NOTE: The value *actually* returned is the position of the last full
@@ -8793,218 +8391,6 @@ XLogRestorePoint(const char *rpName)
}
/*
- * Write a backup block if needed when we are setting a hint. Note that
- * this may be called for a variety of page types, not just heaps.
- *
- * Callable while holding just share lock on the buffer content.
- *
- * We can't use the plain backup block mechanism since that relies on the
- * Buffer being exclusively locked. Since some modifications (setting LSN, hint
- * bits) are allowed in a sharelocked buffer that can lead to wal checksum
- * failures. So instead we copy the page and insert the copied data as normal
- * record data.
- *
- * We only need to do something if page has not yet been full page written in
- * this checkpoint round. The LSN of the inserted wal record is returned if we
- * had to write, InvalidXLogRecPtr otherwise.
- *
- * It is possible that multiple concurrent backends could attempt to write WAL
- * records. In that case, multiple copies of the same block would be recorded
- * in separate WAL records by different backends, though that is still OK from
- * a correctness perspective.
- */
-XLogRecPtr
-XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
-{
- XLogRecPtr recptr = InvalidXLogRecPtr;
- XLogRecPtr lsn;
- XLogRecData rdata[2];
- BkpBlock bkpb;
-
- /*
- * Ensure no checkpoint can change our view of RedoRecPtr.
- */
- Assert(MyPgXact->delayChkpt);
-
- /*
- * Update RedoRecPtr so XLogCheckBuffer can make the right decision
- */
- GetRedoRecPtr();
-
- /*
- * Setup phony rdata element for use within XLogCheckBuffer only. We reuse
- * and reset rdata for any actual WAL record insert.
- */
- rdata[0].buffer = buffer;
- rdata[0].buffer_std = buffer_std;
-
- /*
- * Check buffer while not holding an exclusive lock.
- */
- if (XLogCheckBuffer(rdata, false, &lsn, &bkpb))
- {
- char copied_buffer[BLCKSZ];
- char *origdata = (char *) BufferGetBlock(buffer);
-
- /*
- * Copy buffer so we don't have to worry about concurrent hint bit or
- * lsn updates. We assume pd_lower/upper cannot be changed without an
- * exclusive lock, so the contents bkp are not racy.
- *
- * With buffer_std set to false, XLogCheckBuffer() sets hole_length
- * and hole_offset to 0; so the following code is safe for either
- * case.
- */
- memcpy(copied_buffer, origdata, bkpb.hole_offset);
- memcpy(copied_buffer + bkpb.hole_offset,
- origdata + bkpb.hole_offset + bkpb.hole_length,
- BLCKSZ - bkpb.hole_offset - bkpb.hole_length);
-
- /*
- * Header for backup block.
- */
- rdata[0].data = (char *) &bkpb;
- rdata[0].len = sizeof(BkpBlock);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- /*
- * Save copy of the buffer.
- */
- rdata[1].data = copied_buffer;
- rdata[1].len = BLCKSZ - bkpb.hole_length;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
-
- recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata);
- }
-
- return recptr;
-}
-
-/*
- * Write a WAL record containing a full image of a page. Caller is responsible
- * for writing the page to disk after calling this routine.
- *
- * Note: If you're using this function, you should be building pages in private
- * memory and writing them directly to smgr. If you're using buffers, call
- * log_newpage_buffer instead.
- *
- * If the page follows the standard page layout, with a PageHeader and unused
- * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
- * the unused space to be left out from the WAL record, making it smaller.
- */
-XLogRecPtr
-log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
- Page page, bool page_std)
-{
- BkpBlock bkpb;
- XLogRecPtr recptr;
- XLogRecData rdata[3];
-
- /* NO ELOG(ERROR) from here till newpage op is logged */
- START_CRIT_SECTION();
-
- bkpb.node = *rnode;
- bkpb.fork = forkNum;
- bkpb.block = blkno;
-
- if (page_std)
- {
- /* Assume we can omit data between pd_lower and pd_upper */
- uint16 lower = ((PageHeader) page)->pd_lower;
- uint16 upper = ((PageHeader) page)->pd_upper;
-
- if (lower >= SizeOfPageHeaderData &&
- upper > lower &&
- upper <= BLCKSZ)
- {
- bkpb.hole_offset = lower;
- bkpb.hole_length = upper - lower;
- }
- else
- {
- /* No "hole" to compress out */
- bkpb.hole_offset = 0;
- bkpb.hole_length = 0;
- }
- }
- else
- {
- /* Not a standard page header, don't try to eliminate "hole" */
- bkpb.hole_offset = 0;
- bkpb.hole_length = 0;
- }
-
- rdata[0].data = (char *) &bkpb;
- rdata[0].len = sizeof(BkpBlock);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- if (bkpb.hole_length == 0)
- {
- rdata[1].data = (char *) page;
- rdata[1].len = BLCKSZ;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
- }
- else
- {
- /* must skip the hole */
- rdata[1].data = (char *) page;
- rdata[1].len = bkpb.hole_offset;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = &rdata[2];
-
- rdata[2].data = (char *) page + (bkpb.hole_offset + bkpb.hole_length);
- rdata[2].len = BLCKSZ - (bkpb.hole_offset + bkpb.hole_length);
- rdata[2].buffer = InvalidBuffer;
- rdata[2].next = NULL;
- }
-
- recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata);
-
- /*
- * The page may be uninitialized. If so, we can't set the LSN because that
- * would corrupt the page.
- */
- if (!PageIsNew(page))
- {
- PageSetLSN(page, recptr);
- }
-
- END_CRIT_SECTION();
-
- return recptr;
-}
-
-/*
- * Write a WAL record containing a full image of a page.
- *
- * Caller should initialize the buffer and mark it dirty before calling this
- * function. This function will set the page LSN.
- *
- * If the page follows the standard page layout, with a PageHeader and unused
- * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
- * the unused space to be left out from the WAL record, making it smaller.
- */
-XLogRecPtr
-log_newpage_buffer(Buffer buffer, bool page_std)
-{
- Page page = BufferGetPage(buffer);
- RelFileNode rnode;
- ForkNumber forkNum;
- BlockNumber blkno;
-
- /* Shared buffers should be modified in a critical section. */
- Assert(CritSectionCount > 0);
-
- BufferGetTag(buffer, &rnode, &forkNum, &blkno);
-
- return log_newpage(&rnode, forkNum, blkno, page, page_std);
-}
-
-/*
* Check if any of the GUC parameters that are critical for hot standby
* have changed, and update the value in pg_control file if necessary.
*/
@@ -9757,7 +9143,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
* the standby.
*
* We must hold all the insertion locks to change the value of
- * forcePageWrites, to ensure adequate interlocking against XLogInsert().
+ * forcePageWrites, to ensure adequate interlocking against
+ * XLogInsertRecord().
*/
WALInsertLockAcquireExclusive();
if (exclusive)