aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-06 13:52:08 +0200
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-06 13:55:36 +0200
commit2076db2aea766c4c828dccc34ae35f614129000d (patch)
tree5004b943d2014fdf3c2d4bed820fe550c5013c96 /src/backend/access/transam/xlog.c
parentd2b8a2c7ec1098e7b98160ccdc0e3a513964fb08 (diff)
downloadpostgresql-2076db2aea766c4c828dccc34ae35f614129000d.tar.gz
postgresql-2076db2aea766c4c828dccc34ae35f614129000d.zip
Move the backup-block logic from XLogInsert to a new file, xloginsert.c.
xlog.c is huge, this makes it a little bit smaller, which is nice. Functions related to putting together the WAL record are in xloginsert.c, and the lower level stuff for managing WAL buffers and such are in xlog.c. Also move the definition of XLogRecord to a separate header file. This causes churn in the #includes of all the files that write WAL records, and redo routines, but it avoids pulling in xlog.h into most places. Reviewed by Michael Paquier, Alvaro Herrera, Andres Freund and Amit Kapila.
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c761
1 file changed, 74 insertions, 687 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3160db72458..563d442a7a3 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -31,6 +31,7 @@
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xloginsert.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/catversion.h"
@@ -300,14 +301,21 @@ XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
* (which is almost but not quite the same as a pointer to the most recent
* CHECKPOINT record). We update this from the shared-memory copy,
* XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold an insertion lock). See XLogInsert for details. We are also allowed
- * to update from XLogCtl->RedoRecPtr if we hold the info_lck;
+ * hold an insertion lock). See XLogInsertRecord for details. We are also
+ * allowed to update from XLogCtl->RedoRecPtr if we hold the info_lck;
* see GetRedoRecPtr. A freshly spawned backend obtains the value during
* InitXLOGAccess.
*/
static XLogRecPtr RedoRecPtr;
/*
+ * doPageWrites is this backend's local copy of (forcePageWrites ||
+ * fullPageWrites). It is used together with RedoRecPtr to decide whether
+ * a full-page image of a page needs to be taken.
+ */
+static bool doPageWrites;
+
+/*
* RedoStartLSN points to the checkpoint's REDO location which is specified
* in a backup label file, backup history file or control file. In standby
* mode, XLOG streaming usually starts from the position where an invalid
@@ -419,7 +427,7 @@ typedef union WALInsertLockPadded
} WALInsertLockPadded;
/*
- * Shared state data for XLogInsert.
+ * Shared state data for WAL insertion.
*/
typedef struct XLogCtlInsert
{
@@ -765,10 +773,6 @@ static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
-static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
- XLogRecPtr *lsn, BkpBlock *bkpb);
-static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb,
- char *blk, bool get_cleanup_lock, bool keep_buffer);
static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
static bool XLogCheckpointNeeded(XLogSegNo new_segno);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
@@ -831,226 +835,45 @@ static void WALInsertLockRelease(void);
static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
/*
- * Insert an XLOG record having the specified RMID and info bytes,
- * with the body of the record being the data chunk(s) described by
- * the rdata chain (see xlog.h for notes about rdata).
+ * Insert an XLOG record represented by an already-constructed chain of data
+ * chunks. This is a low-level routine; to construct the WAL record header
+ * and data, use the higher-level routines in xloginsert.c.
+ *
+ * If 'fpw_lsn' is valid, it is the oldest LSN among the pages that this
+ * WAL record applies to, that were not included in the record as full page
+ * images. If fpw_lsn >= RedoRecPtr, the function does not perform the
+ * insertion and returns InvalidXLogRecPtr. The caller can then recalculate
+ * which pages need a full-page image, and retry. If fpw_lsn is invalid, the
+ * record is always inserted.
+ *
+ * The first XLogRecData in the chain must be for the record header, and its
+ * data must be MAXALIGNed. XLogInsertRecord fills in the xl_prev and
+ * xl_crc fields in the header, the rest of the header must already be filled
+ * by the caller.
*
* Returns XLOG pointer to end of record (beginning of next record).
* This can be used as LSN for data pages affected by the logged action.
* (LSN is the XLOG point up to which the XLOG must be flushed to disk
* before the data page can be written out. This implements the basic
* WAL rule "write the log before the data".)
- *
- * NB: this routine feels free to scribble on the XLogRecData structs,
- * though not on the data they reference. This is OK since the XLogRecData
- * structs are always just temporaries in the calling code.
*/
XLogRecPtr
-XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
+XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogRecData *rdt;
- XLogRecData *rdt_lastnormal;
- Buffer dtbuf[XLR_MAX_BKP_BLOCKS];
- bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
- BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
- XLogRecPtr dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
- XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
- XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
- XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
- XLogRecData hdr_rdt;
pg_crc32 rdata_crc;
- uint32 len,
- write_len;
- unsigned i;
- bool doPageWrites;
- bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
bool inserted;
- uint8 info_orig = info;
- static XLogRecord *rechdr;
+ XLogRecord *rechdr = (XLogRecord *) rdata->data;
+ bool isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID &&
+ rechdr->xl_info == XLOG_SWITCH);
XLogRecPtr StartPos;
XLogRecPtr EndPos;
- if (rechdr == NULL)
- {
- static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF];
-
- rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf);
- MemSet(rechdr, 0, SizeOfXLogRecord);
- }
-
/* cross-check on whether we should be here or not */
if (!XLogInsertAllowed())
elog(ERROR, "cannot make new WAL entries during recovery");
- /* info's high bits are reserved for use by me */
- if (info & XLR_INFO_MASK)
- elog(PANIC, "invalid xlog info mask %02X", info);
-
- TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
-
- /*
- * In bootstrap mode, we don't actually log anything but XLOG resources;
- * return a phony record pointer.
- */
- if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
- {
- EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
- return EndPos;
- }
-
- /*
- * Here we scan the rdata chain, to determine which buffers must be backed
- * up.
- *
- * We may have to loop back to here if a race condition is detected below.
- * We could prevent the race by doing all this work while holding an
- * insertion lock, but it seems better to avoid doing CRC calculations
- * while holding one.
- *
- * We add entries for backup blocks to the chain, so that they don't need
- * any special treatment in the critical section where the chunks are
- * copied into the WAL buffers. Those entries have to be unlinked from the
- * chain if we have to loop back here.
- */
-begin:;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- dtbuf[i] = InvalidBuffer;
- dtbuf_bkp[i] = false;
- }
-
- /*
- * Decide if we need to do full-page writes in this XLOG record: true if
- * full_page_writes is on or we have a PITR request for it. Since we
- * don't yet have an insertion lock, fullPageWrites and forcePageWrites
- * could change under us, but we'll recheck them once we have a lock.
- */
- doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
-
- len = 0;
- for (rdt = rdata;;)
- {
- if (rdt->buffer == InvalidBuffer)
- {
- /* Simple data, just include it */
- len += rdt->len;
- }
- else
- {
- /* Find info for buffer */
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- if (rdt->buffer == dtbuf[i])
- {
- /* Buffer already referenced by earlier chain item */
- if (dtbuf_bkp[i])
- {
- rdt->data = NULL;
- rdt->len = 0;
- }
- else if (rdt->data)
- len += rdt->len;
- break;
- }
- if (dtbuf[i] == InvalidBuffer)
- {
- /* OK, put it in this slot */
- dtbuf[i] = rdt->buffer;
- if (doPageWrites && XLogCheckBuffer(rdt, true,
- &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
- {
- dtbuf_bkp[i] = true;
- rdt->data = NULL;
- rdt->len = 0;
- }
- else if (rdt->data)
- len += rdt->len;
- break;
- }
- }
- if (i >= XLR_MAX_BKP_BLOCKS)
- elog(PANIC, "can backup at most %d blocks per xlog record",
- XLR_MAX_BKP_BLOCKS);
- }
- /* Break out of loop when rdt points to last chain item */
- if (rdt->next == NULL)
- break;
- rdt = rdt->next;
- }
-
- /*
- * NOTE: We disallow len == 0 because it provides a useful bit of extra
- * error checking in ReadRecord. This means that all callers of
- * XLogInsert must supply at least some not-in-a-buffer data. However, we
- * make an exception for XLOG SWITCH records because we don't want them to
- * ever cross a segment boundary.
- */
- if (len == 0 && !isLogSwitch)
- elog(PANIC, "invalid xlog record length %u", len);
-
- /*
- * Make additional rdata chain entries for the backup blocks, so that we
- * don't need to special-case them in the write loop. This modifies the
- * original rdata chain, but we keep a pointer to the last regular entry,
- * rdt_lastnormal, so that we can undo this if we have to loop back to the
- * beginning.
- *
- * At the exit of this loop, write_len includes the backup block data.
- *
- * Also set the appropriate info bits to show which buffers were backed
- * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer
- * value (ignoring InvalidBuffer) appearing in the rdata chain.
- */
- rdt_lastnormal = rdt;
- write_len = len;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- BkpBlock *bkpb;
- char *page;
-
- if (!dtbuf_bkp[i])
- continue;
-
- info |= XLR_BKP_BLOCK(i);
-
- bkpb = &(dtbuf_xlg[i]);
- page = (char *) BufferGetBlock(dtbuf[i]);
-
- rdt->next = &(dtbuf_rdt1[i]);
- rdt = rdt->next;
-
- rdt->data = (char *) bkpb;
- rdt->len = sizeof(BkpBlock);
- write_len += sizeof(BkpBlock);
-
- rdt->next = &(dtbuf_rdt2[i]);
- rdt = rdt->next;
-
- if (bkpb->hole_length == 0)
- {
- rdt->data = page;
- rdt->len = BLCKSZ;
- write_len += BLCKSZ;
- rdt->next = NULL;
- }
- else
- {
- /* must skip the hole */
- rdt->data = page;
- rdt->len = bkpb->hole_offset;
- write_len += bkpb->hole_offset;
-
- rdt->next = &(dtbuf_rdt3[i]);
- rdt = rdt->next;
-
- rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
- rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
- write_len += rdt->len;
- rdt->next = NULL;
- }
- }
-
/*
* Calculate CRC of the data, including all the backup blocks
*
@@ -1060,29 +883,15 @@ begin:;
* header.
*/
INIT_CRC32C(rdata_crc);
- for (rdt = rdata; rdt != NULL; rdt = rdt->next)
+ for (rdt = rdata->next; rdt != NULL; rdt = rdt->next)
COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
/*
- * Construct record header (prev-link is filled in later, after reserving
- * the space for the record), and make that the first chunk in the chain.
- *
- * The CRC calculated for the header here doesn't include prev-link,
- * because we don't know it yet. It will be added later.
- */
- rechdr->xl_xid = GetCurrentTransactionIdIfAny();
- rechdr->xl_tot_len = SizeOfXLogRecord + write_len;
- rechdr->xl_len = len; /* doesn't include backup blocks */
- rechdr->xl_info = info;
- rechdr->xl_rmid = rmid;
- rechdr->xl_prev = InvalidXLogRecPtr;
+ * Calculate CRC of the header, except for prev-link, because we don't
+ * know it yet. It will be added later.
+ */
COMP_CRC32C(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
- hdr_rdt.next = rdata;
- hdr_rdt.data = (char *) rechdr;
- hdr_rdt.len = SizeOfXLogRecord;
- write_len += SizeOfXLogRecord;
-
/*----------
*
* We have now done all the preparatory work we can without holding a
@@ -1122,56 +931,33 @@ begin:;
WALInsertLockAcquire();
/*
- * Check to see if my RedoRecPtr is out of date. If so, may have to go
- * back and recompute everything. This can only happen just after a
- * checkpoint, so it's better to be slow in this case and fast otherwise.
+ * Check to see if my copy of RedoRecPtr or doPageWrites is out of date.
+ * If so, may have to go back and have the caller recompute everything.
+ * This can only happen just after a checkpoint, so it's better to be
+ * slow in this case and fast otherwise.
*
* If we aren't doing full-page writes then RedoRecPtr doesn't actually
* affect the contents of the XLOG record, so we'll update our local copy
- * but not force a recomputation.
+ * but not force a recomputation. (If doPageWrites was just turned off,
+ * we could recompute the record without full pages, but we choose not
+ * to bother.)
*/
if (RedoRecPtr != Insert->RedoRecPtr)
{
Assert(RedoRecPtr < Insert->RedoRecPtr);
RedoRecPtr = Insert->RedoRecPtr;
-
- if (doPageWrites)
- {
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- if (dtbuf[i] == InvalidBuffer)
- continue;
- if (dtbuf_bkp[i] == false &&
- dtbuf_lsn[i] <= RedoRecPtr)
- {
- /*
- * Oops, this buffer now needs to be backed up, but we
- * didn't think so above. Start over.
- */
- WALInsertLockRelease();
- END_CRIT_SECTION();
- rdt_lastnormal->next = NULL;
- info = info_orig;
- goto begin;
- }
- }
- }
}
+ doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);
- /*
- * Also check to see if fullPageWrites or forcePageWrites was just turned
- * on; if we weren't already doing full-page writes then go back and
- * recompute. (If it was just turned off, we could recompute the record
- * without full pages, but we choose not to bother.)
- */
- if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
+ if (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= RedoRecPtr && doPageWrites)
{
- /* Oops, must redo it with full-page data. */
+ /*
+ * Oops, some buffer now needs to be backed up that the caller
+ * didn't back up. Start over.
+ */
WALInsertLockRelease();
END_CRIT_SECTION();
- rdt_lastnormal->next = NULL;
- info = info_orig;
- goto begin;
+ return InvalidXLogRecPtr;
}
/*
@@ -1182,7 +968,7 @@ begin:;
inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
else
{
- ReserveXLogInsertLocation(write_len, &StartPos, &EndPos,
+ ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos,
&rechdr->xl_prev);
inserted = true;
}
@@ -1201,7 +987,8 @@ begin:;
* All the record data, including the header, is now ready to be
* inserted. Copy the record in the space reserved.
*/
- CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos);
+ CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
+ StartPos, EndPos);
}
else
{
@@ -1437,7 +1224,7 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
}
/*
- * Subroutine of XLogInsert. Copies a WAL record to an already-reserved
+ * Subroutine of XLogInsertRecord. Copies a WAL record to an already-reserved
* area in the WAL.
*/
static void
@@ -2004,93 +1791,6 @@ XLogRecPtrToBytePos(XLogRecPtr ptr)
}
/*
- * Determine whether the buffer referenced has to be backed up.
- *
- * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
- * could change later, so the result should be used for optimization purposes
- * only.
- */
-bool
-XLogCheckBufferNeedsBackup(Buffer buffer)
-{
- bool doPageWrites;
- Page page;
-
- page = BufferGetPage(buffer);
-
- doPageWrites = XLogCtl->Insert.fullPageWrites || XLogCtl->Insert.forcePageWrites;
-
- if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
- return true; /* buffer requires backup */
-
- return false; /* buffer does not need to be backed up */
-}
-
-/*
- * Determine whether the buffer referenced by an XLogRecData item has to
- * be backed up, and if so fill a BkpBlock struct for it. In any case
- * save the buffer's LSN at *lsn.
- */
-static bool
-XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
- XLogRecPtr *lsn, BkpBlock *bkpb)
-{
- Page page;
-
- page = BufferGetPage(rdata->buffer);
-
- /*
- * We assume page LSN is first data on *every* page that can be passed to
- * XLogInsert, whether it has the standard page layout or not. We don't
- * need to take the buffer header lock for PageGetLSN if we hold an
- * exclusive lock on the page and/or the relation.
- */
- if (holdsExclusiveLock)
- *lsn = PageGetLSN(page);
- else
- *lsn = BufferGetLSNAtomic(rdata->buffer);
-
- if (*lsn <= RedoRecPtr)
- {
- /*
- * The page needs to be backed up, so set up *bkpb
- */
- BufferGetTag(rdata->buffer, &bkpb->node, &bkpb->fork, &bkpb->block);
-
- if (rdata->buffer_std)
- {
- /* Assume we can omit data between pd_lower and pd_upper */
- uint16 lower = ((PageHeader) page)->pd_lower;
- uint16 upper = ((PageHeader) page)->pd_upper;
-
- if (lower >= SizeOfPageHeaderData &&
- upper > lower &&
- upper <= BLCKSZ)
- {
- bkpb->hole_offset = lower;
- bkpb->hole_length = upper - lower;
- }
- else
- {
- /* No "hole" to compress out */
- bkpb->hole_offset = 0;
- bkpb->hole_length = 0;
- }
- }
- else
- {
- /* Not a standard page header, don't try to eliminate "hole" */
- bkpb->hole_offset = 0;
- bkpb->hole_length = 0;
- }
-
- return true; /* buffer requires backup */
- }
-
- return false; /* buffer does not need to be backed up */
-}
-
-/*
* Initialize XLOG buffers, writing out old buffers if they still contain
* unwritten data, upto the page containing 'upto'. Or if 'opportunistic' is
* true, initialize as many pages as we can without having to write out
@@ -3944,128 +3644,6 @@ CleanupBackupHistory(void)
}
/*
- * Restore a full-page image from a backup block attached to an XLOG record.
- *
- * lsn: LSN of the XLOG record being replayed
- * record: the complete XLOG record
- * block_index: which backup block to restore (0 .. XLR_MAX_BKP_BLOCKS - 1)
- * get_cleanup_lock: TRUE to get a cleanup rather than plain exclusive lock
- * keep_buffer: TRUE to return the buffer still locked and pinned
- *
- * Returns the buffer number containing the page. Note this is not terribly
- * useful unless keep_buffer is specified as TRUE.
- *
- * Note: when a backup block is available in XLOG, we restore it
- * unconditionally, even if the page in the database appears newer.
- * This is to protect ourselves against database pages that were partially
- * or incorrectly written during a crash. We assume that the XLOG data
- * must be good because it has passed a CRC check, while the database
- * page might not be. This will force us to replay all subsequent
- * modifications of the page that appear in XLOG, rather than possibly
- * ignoring them as already applied, but that's not a huge drawback.
- *
- * If 'get_cleanup_lock' is true, a cleanup lock is obtained on the buffer,
- * else a normal exclusive lock is used. During crash recovery, that's just
- * pro forma because there can't be any regular backends in the system, but
- * in hot standby mode the distinction is important.
- *
- * If 'keep_buffer' is true, return without releasing the buffer lock and pin;
- * then caller is responsible for doing UnlockReleaseBuffer() later. This
- * is needed in some cases when replaying XLOG records that touch multiple
- * pages, to prevent inconsistent states from being visible to other backends.
- * (Again, that's only important in hot standby mode.)
- */
-Buffer
-RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
- bool get_cleanup_lock, bool keep_buffer)
-{
- BkpBlock bkpb;
- char *blk;
- int i;
-
- /* Locate requested BkpBlock in the record */
- blk = (char *) XLogRecGetData(record) + record->xl_len;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- if (!(record->xl_info & XLR_BKP_BLOCK(i)))
- continue;
-
- memcpy(&bkpb, blk, sizeof(BkpBlock));
- blk += sizeof(BkpBlock);
-
- if (i == block_index)
- {
- /* Found it, apply the update */
- return RestoreBackupBlockContents(lsn, bkpb, blk, get_cleanup_lock,
- keep_buffer);
- }
-
- blk += BLCKSZ - bkpb.hole_length;
- }
-
- /* Caller specified a bogus block_index */
- elog(ERROR, "failed to restore block_index %d", block_index);
- return InvalidBuffer; /* keep compiler quiet */
-}
-
-/*
- * Workhorse for RestoreBackupBlock usable without an xlog record
- *
- * Restores a full-page image from BkpBlock and a data pointer.
- */
-static Buffer
-RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk,
- bool get_cleanup_lock, bool keep_buffer)
-{
- Buffer buffer;
- Page page;
-
- buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
- RBM_ZERO);
- Assert(BufferIsValid(buffer));
- if (get_cleanup_lock)
- LockBufferForCleanup(buffer);
- else
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
- page = (Page) BufferGetPage(buffer);
-
- if (bkpb.hole_length == 0)
- {
- memcpy((char *) page, blk, BLCKSZ);
- }
- else
- {
- memcpy((char *) page, blk, bkpb.hole_offset);
- /* must zero-fill the hole */
- MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
- memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
- blk + bkpb.hole_offset,
- BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
- }
-
- /*
- * The checksum value on this page is currently invalid. We don't need to
- * reset it here since it will be set before being written.
- */
-
- /*
- * The page may be uninitialized. If so, we can't set the LSN because that
- * would corrupt the page.
- */
- if (!PageIsNew(page))
- {
- PageSetLSN(page, lsn);
- }
- MarkBufferDirty(buffer);
-
- if (!keep_buffer)
- UnlockReleaseBuffer(buffer);
-
- return buffer;
-}
-
-/*
* Attempt to read an XLOG record.
*
* If RecPtr is not NULL, try to read a record at that position. Otherwise
@@ -6352,6 +5930,7 @@ StartupXLOG(void)
lastFullPageWrites = checkPoint.fullPageWrites;
RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
+ doPageWrites = lastFullPageWrites;
if (RecPtr < checkPoint.redo)
ereport(PANIC,
@@ -7606,12 +7185,16 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
void
InitXLOGAccess(void)
{
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
+
/* ThisTimeLineID doesn't change so we need no lock to copy it */
ThisTimeLineID = XLogCtl->ThisTimeLineID;
Assert(ThisTimeLineID != 0 || IsBootstrapProcessingMode());
/* Use GetRedoRecPtr to copy the RedoRecPtr safely */
(void) GetRedoRecPtr();
+ /* Also update our copy of doPageWrites. */
+ doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);
}
/*
@@ -7640,6 +7223,21 @@ GetRedoRecPtr(void)
}
/*
+ * Return information needed to decide whether a modified block needs a
+ * full-page image to be included in the WAL record.
+ *
+ * The returned values are cached copies from backend-private memory, and
+ * possibly out-of-date. XLogInsertRecord will re-check them against
+ * up-to-date values, while holding the WAL insert lock.
+ */
+void
+GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p)
+{
+ *RedoRecPtr_p = RedoRecPtr;
+ *doPageWrites_p = doPageWrites;
+}
+
+/*
* GetInsertRecPtr -- Returns the current insert position.
*
* NOTE: The value *actually* returned is the position of the last full
@@ -8793,218 +8391,6 @@ XLogRestorePoint(const char *rpName)
}
/*
- * Write a backup block if needed when we are setting a hint. Note that
- * this may be called for a variety of page types, not just heaps.
- *
- * Callable while holding just share lock on the buffer content.
- *
- * We can't use the plain backup block mechanism since that relies on the
- * Buffer being exclusively locked. Since some modifications (setting LSN, hint
- * bits) are allowed in a sharelocked buffer that can lead to wal checksum
- * failures. So instead we copy the page and insert the copied data as normal
- * record data.
- *
- * We only need to do something if page has not yet been full page written in
- * this checkpoint round. The LSN of the inserted wal record is returned if we
- * had to write, InvalidXLogRecPtr otherwise.
- *
- * It is possible that multiple concurrent backends could attempt to write WAL
- * records. In that case, multiple copies of the same block would be recorded
- * in separate WAL records by different backends, though that is still OK from
- * a correctness perspective.
- */
-XLogRecPtr
-XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
-{
- XLogRecPtr recptr = InvalidXLogRecPtr;
- XLogRecPtr lsn;
- XLogRecData rdata[2];
- BkpBlock bkpb;
-
- /*
- * Ensure no checkpoint can change our view of RedoRecPtr.
- */
- Assert(MyPgXact->delayChkpt);
-
- /*
- * Update RedoRecPtr so XLogCheckBuffer can make the right decision
- */
- GetRedoRecPtr();
-
- /*
- * Setup phony rdata element for use within XLogCheckBuffer only. We reuse
- * and reset rdata for any actual WAL record insert.
- */
- rdata[0].buffer = buffer;
- rdata[0].buffer_std = buffer_std;
-
- /*
- * Check buffer while not holding an exclusive lock.
- */
- if (XLogCheckBuffer(rdata, false, &lsn, &bkpb))
- {
- char copied_buffer[BLCKSZ];
- char *origdata = (char *) BufferGetBlock(buffer);
-
- /*
- * Copy buffer so we don't have to worry about concurrent hint bit or
- * lsn updates. We assume pd_lower/upper cannot be changed without an
- * exclusive lock, so the contents bkp are not racy.
- *
- * With buffer_std set to false, XLogCheckBuffer() sets hole_length
- * and hole_offset to 0; so the following code is safe for either
- * case.
- */
- memcpy(copied_buffer, origdata, bkpb.hole_offset);
- memcpy(copied_buffer + bkpb.hole_offset,
- origdata + bkpb.hole_offset + bkpb.hole_length,
- BLCKSZ - bkpb.hole_offset - bkpb.hole_length);
-
- /*
- * Header for backup block.
- */
- rdata[0].data = (char *) &bkpb;
- rdata[0].len = sizeof(BkpBlock);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- /*
- * Save copy of the buffer.
- */
- rdata[1].data = copied_buffer;
- rdata[1].len = BLCKSZ - bkpb.hole_length;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
-
- recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata);
- }
-
- return recptr;
-}
-
-/*
- * Write a WAL record containing a full image of a page. Caller is responsible
- * for writing the page to disk after calling this routine.
- *
- * Note: If you're using this function, you should be building pages in private
- * memory and writing them directly to smgr. If you're using buffers, call
- * log_newpage_buffer instead.
- *
- * If the page follows the standard page layout, with a PageHeader and unused
- * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
- * the unused space to be left out from the WAL record, making it smaller.
- */
-XLogRecPtr
-log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
- Page page, bool page_std)
-{
- BkpBlock bkpb;
- XLogRecPtr recptr;
- XLogRecData rdata[3];
-
- /* NO ELOG(ERROR) from here till newpage op is logged */
- START_CRIT_SECTION();
-
- bkpb.node = *rnode;
- bkpb.fork = forkNum;
- bkpb.block = blkno;
-
- if (page_std)
- {
- /* Assume we can omit data between pd_lower and pd_upper */
- uint16 lower = ((PageHeader) page)->pd_lower;
- uint16 upper = ((PageHeader) page)->pd_upper;
-
- if (lower >= SizeOfPageHeaderData &&
- upper > lower &&
- upper <= BLCKSZ)
- {
- bkpb.hole_offset = lower;
- bkpb.hole_length = upper - lower;
- }
- else
- {
- /* No "hole" to compress out */
- bkpb.hole_offset = 0;
- bkpb.hole_length = 0;
- }
- }
- else
- {
- /* Not a standard page header, don't try to eliminate "hole" */
- bkpb.hole_offset = 0;
- bkpb.hole_length = 0;
- }
-
- rdata[0].data = (char *) &bkpb;
- rdata[0].len = sizeof(BkpBlock);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- if (bkpb.hole_length == 0)
- {
- rdata[1].data = (char *) page;
- rdata[1].len = BLCKSZ;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
- }
- else
- {
- /* must skip the hole */
- rdata[1].data = (char *) page;
- rdata[1].len = bkpb.hole_offset;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = &rdata[2];
-
- rdata[2].data = (char *) page + (bkpb.hole_offset + bkpb.hole_length);
- rdata[2].len = BLCKSZ - (bkpb.hole_offset + bkpb.hole_length);
- rdata[2].buffer = InvalidBuffer;
- rdata[2].next = NULL;
- }
-
- recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata);
-
- /*
- * The page may be uninitialized. If so, we can't set the LSN because that
- * would corrupt the page.
- */
- if (!PageIsNew(page))
- {
- PageSetLSN(page, recptr);
- }
-
- END_CRIT_SECTION();
-
- return recptr;
-}
-
-/*
- * Write a WAL record containing a full image of a page.
- *
- * Caller should initialize the buffer and mark it dirty before calling this
- * function. This function will set the page LSN.
- *
- * If the page follows the standard page layout, with a PageHeader and unused
- * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
- * the unused space to be left out from the WAL record, making it smaller.
- */
-XLogRecPtr
-log_newpage_buffer(Buffer buffer, bool page_std)
-{
- Page page = BufferGetPage(buffer);
- RelFileNode rnode;
- ForkNumber forkNum;
- BlockNumber blkno;
-
- /* Shared buffers should be modified in a critical section. */
- Assert(CritSectionCount > 0);
-
- BufferGetTag(buffer, &rnode, &forkNum, &blkno);
-
- return log_newpage(&rnode, forkNum, blkno, page, page_std);
-}
-
-/*
* Check if any of the GUC parameters that are critical for hot standby
* have changed, and update the value in pg_control file if necessary.
*/
@@ -9757,7 +9143,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
* the standby.
*
* We must hold all the insertion locks to change the value of
- * forcePageWrites, to ensure adequate interlocking against XLogInsert().
+ * forcePageWrites, to ensure adequate interlocking against
+ * XLogInsertRecord().
*/
WALInsertLockAcquireExclusive();
if (exclusive)