aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2012-10-09 19:20:17 +0300
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2012-10-09 19:20:17 +0300
commitff8f160bf4322c4294deaa0e64ed26467283d525 (patch)
treef2726ac24e503a6cae917d33a26482aa508fe5a5 /src/backend/access/transam/xlog.c
parentf7491616a93e02856a44d4bd3b026610fe58704e (diff)
downloadpostgresql-ff8f160bf4322c4294deaa0e64ed26467283d525.tar.gz
postgresql-ff8f160bf4322c4294deaa0e64ed26467283d525.zip
Put the logic to wait for WAL in standby mode to a separate function.
This is just refactoring with no user-visible effect, to make the code more readable.
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c429
1 files changed, 222 insertions, 207 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3e4da46ac24..17ceda3b1ad 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -157,6 +157,9 @@ HotStandbyState standbyState = STANDBY_DISABLED;
static XLogRecPtr LastRec;
+/* Local copy of WalRcv->receivedUpto */
+static XLogRecPtr receivedUpto = 0;
+
/*
* During recovery, lastFullPageWrites keeps track of full_page_writes that
* the replayed WAL records indicate. It's initialized with full_page_writes
@@ -538,6 +541,7 @@ static int readFile = -1;
static XLogSegNo readSegNo = 0;
static uint32 readOff = 0;
static uint32 readLen = 0;
+static bool readFileHeaderValidated = false;
static int readSource = 0; /* XLOG_FROM_* code */
/*
@@ -628,6 +632,8 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources);
static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
bool randAccess);
+static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
+ bool fetching_ckpt);
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
static void XLogFileClose(void);
static void PreallocXlogFiles(XLogRecPtr endptr);
@@ -2685,6 +2691,9 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
if (source != XLOG_FROM_STREAM)
XLogReceiptTime = GetCurrentTimestamp();
+ /* The file header needs to be validated on first access */
+ readFileHeaderValidated = false;
+
return fd;
}
if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
@@ -9233,12 +9242,9 @@ static bool
XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
bool randAccess)
{
- static XLogRecPtr receivedUpto = 0;
- bool switched_segment = false;
uint32 targetPageOff;
uint32 targetRecOff;
XLogSegNo targetSegNo;
- static pg_time_t last_fail_time = 0;
XLByteToSeg(*RecPtr, targetSegNo);
targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
@@ -9283,208 +9289,9 @@ retry:
{
if (StandbyMode)
{
- /*
- * In standby mode, wait for the requested record to become
- * available, either via restore_command succeeding to restore the
- * segment, or via walreceiver having streamed the record.
- */
- for (;;)
- {
- if (WalRcvInProgress())
- {
- bool havedata;
-
- /*
- * If we find an invalid record in the WAL streamed from
- * master, something is seriously wrong. There's little
- * chance that the problem will just go away, but PANIC is
- * not good for availability either, especially in hot
- * standby mode. Disconnect, and retry from
- * archive/pg_xlog again. The WAL in the archive should be
- * identical to what was streamed, so it's unlikely that
- * it helps, but one can hope...
- */
- if (failedSources & XLOG_FROM_STREAM)
- {
- ShutdownWalRcv();
- continue;
- }
-
- /*
- * Walreceiver is active, so see if new data has arrived.
- *
- * We only advance XLogReceiptTime when we obtain fresh
- * WAL from walreceiver and observe that we had already
- * processed everything before the most recent "chunk"
- * that it flushed to disk. In steady state where we are
- * keeping up with the incoming data, XLogReceiptTime will
- * be updated on each cycle. When we are behind,
- * XLogReceiptTime will not advance, so the grace time
- * alloted to conflicting queries will decrease.
- */
- if (XLByteLT(*RecPtr, receivedUpto))
- havedata = true;
- else
- {
- XLogRecPtr latestChunkStart;
-
- receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
- if (XLByteLT(*RecPtr, receivedUpto))
- {
- havedata = true;
- if (!XLByteLT(*RecPtr, latestChunkStart))
- {
- XLogReceiptTime = GetCurrentTimestamp();
- SetCurrentChunkStartTime(XLogReceiptTime);
- }
- }
- else
- havedata = false;
- }
- if (havedata)
- {
- /*
- * Great, streamed far enough. Open the file if it's
- * not open already. Use XLOG_FROM_STREAM so that
- * source info is set correctly and XLogReceiptTime
- * isn't changed.
- */
- if (readFile < 0)
- {
- readFile =
- XLogFileRead(readSegNo, PANIC,
- recoveryTargetTLI,
- XLOG_FROM_STREAM, false);
- Assert(readFile >= 0);
- switched_segment = true;
- }
- else
- {
- /* just make sure source info is correct... */
- readSource = XLOG_FROM_STREAM;
- XLogReceiptSource = XLOG_FROM_STREAM;
- }
- break;
- }
-
- /*
- * Data not here yet, so check for trigger then sleep for
- * five seconds like in the WAL file polling case below.
- */
- if (CheckForStandbyTrigger())
- goto retry;
-
- /*
- * Wait for more WAL to arrive, or timeout to be reached
- */
- WaitLatch(&XLogCtl->recoveryWakeupLatch,
- WL_LATCH_SET | WL_TIMEOUT,
- 5000L);
- ResetLatch(&XLogCtl->recoveryWakeupLatch);
- }
- else
- {
- int sources;
- pg_time_t now;
-
- /*
- * Until walreceiver manages to reconnect, poll the
- * archive.
- */
- if (readFile >= 0)
- {
- close(readFile);
- readFile = -1;
- }
- /* Reset curFileTLI if random fetch. */
- if (randAccess)
- curFileTLI = 0;
-
- /*
- * Try to restore the file from archive, or read an
- * existing file from pg_xlog.
- */
- sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
- if (!(sources & ~failedSources))
- {
- /*
- * We've exhausted all options for retrieving the
- * file. Retry.
- */
- failedSources = 0;
-
- /*
- * Before we sleep, re-scan for possible new timelines
- * if we were requested to recover to the latest
- * timeline.
- */
- if (recoveryTargetIsLatest)
- {
- if (rescanLatestTimeLine())
- continue;
- }
-
- /*
- * If it hasn't been long since last attempt, sleep to
- * avoid busy-waiting.
- */
- now = (pg_time_t) time(NULL);
- if ((now - last_fail_time) < 5)
- {
- pg_usleep(1000000L * (5 - (now - last_fail_time)));
- now = (pg_time_t) time(NULL);
- }
- last_fail_time = now;
-
- /*
- * If primary_conninfo is set, launch walreceiver to
- * try to stream the missing WAL, before retrying to
- * restore from archive/pg_xlog.
- *
- * If fetching_ckpt is TRUE, RecPtr points to the
- * initial checkpoint location. In that case, we use
- * RedoStartLSN as the streaming start position
- * instead of RecPtr, so that when we later jump
- * backwards to start redo at RedoStartLSN, we will
- * have the logs streamed already.
- */
- if (PrimaryConnInfo)
- {
- RequestXLogStreaming(
- fetching_ckpt ? RedoStartLSN : *RecPtr,
- PrimaryConnInfo);
- continue;
- }
- }
- /* Don't try to read from a source that just failed */
- sources &= ~failedSources;
- readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2,
- sources);
- switched_segment = true;
- if (readFile >= 0)
- break;
-
- /*
- * Nope, not found in archive and/or pg_xlog.
- */
- failedSources |= sources;
-
- /*
- * Check to see if the trigger file exists. Note that we
- * do this only after failure, so when you create the
- * trigger file, we still finish replaying as much as we
- * can from archive and pg_xlog before failover.
- */
- if (CheckForStandbyTrigger())
- goto triggered;
- }
-
- /*
- * This possibly-long loop needs to handle interrupts of
- * startup process.
- */
- HandleStartupProcInterrupts();
- }
+ if (!WaitForWALToBecomeAvailable(*RecPtr, randAccess,
+ fetching_ckpt))
+ goto triggered;
}
else
{
@@ -9502,7 +9309,6 @@ retry:
sources |= XLOG_FROM_ARCHIVE;
readFile = XLogFileReadAnyTLI(readSegNo, emode, sources);
- switched_segment = true;
if (readFile < 0)
return false;
}
@@ -9533,7 +9339,7 @@ retry:
else
readLen = XLOG_BLCKSZ;
- if (switched_segment && targetPageOff != 0)
+ if (!readFileHeaderValidated && targetPageOff != 0)
{
/*
* Whenever switching to a new WAL segment, we read the first page of
@@ -9582,6 +9388,8 @@ retry:
if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
+ readFileHeaderValidated = true;
+
Assert(targetSegNo == readSegNo);
Assert(targetPageOff == readOff);
Assert(targetRecOff < readLen);
@@ -9614,6 +9422,213 @@ triggered:
}
/*
+ * In standby mode, wait for the requested record to become available, either
+ * via restore_command succeeding to restore the segment, or via walreceiver
+ * having streamed the record (or via someone copying the segment directly to
+ * pg_xlog, but that is not documented or recommended).
+ *
+ * When the requested record becomes available, the function opens the file
+ * containing it (if not open already), and returns true. When end of standby
+ * mode is triggered by the user, and there is no more WAL available, returns
+ * false.
+ */
+static bool
+WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
+ bool fetching_ckpt)
+{
+ static pg_time_t last_fail_time = 0;
+
+ for (;;)
+ {
+ if (WalRcvInProgress())
+ {
+ bool havedata;
+
+ /*
+ * If we find an invalid record in the WAL streamed from master,
+ * something is seriously wrong. There's little chance that the
+ * problem will just go away, but PANIC is not good for
+ * availability either, especially in hot standby mode.
+ * Disconnect, and retry from archive/pg_xlog again. The WAL in
+ * the archive should be identical to what was streamed, so it's
+ * unlikely that it helps, but one can hope...
+ */
+ if (failedSources & XLOG_FROM_STREAM)
+ {
+ ShutdownWalRcv();
+ continue;
+ }
+
+ /*
+ * Walreceiver is active, so see if new data has arrived.
+ *
+ * We only advance XLogReceiptTime when we obtain fresh WAL from
+ * walreceiver and observe that we had already processed
+ * everything before the most recent "chunk" that it flushed to
+ * disk. In steady state where we are keeping up with the
+ * incoming data, XLogReceiptTime will be updated on each cycle.
+ * When we are behind, XLogReceiptTime will not advance, so the
+ * grace time allotted to conflicting queries will decrease.
+ */
+ if (XLByteLT(RecPtr, receivedUpto))
+ havedata = true;
+ else
+ {
+ XLogRecPtr latestChunkStart;
+
+ receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
+ if (XLByteLT(RecPtr, receivedUpto))
+ {
+ havedata = true;
+ if (!XLByteLT(RecPtr, latestChunkStart))
+ {
+ XLogReceiptTime = GetCurrentTimestamp();
+ SetCurrentChunkStartTime(XLogReceiptTime);
+ }
+ }
+ else
+ havedata = false;
+ }
+ if (havedata)
+ {
+ /*
+ * Great, streamed far enough. Open the file if it's not open
+ * already. Use XLOG_FROM_STREAM so that source info is set
+ * correctly and XLogReceiptTime isn't changed.
+ */
+ if (readFile < 0)
+ {
+ readFile = XLogFileRead(readSegNo, PANIC,
+ curFileTLI,
+ XLOG_FROM_STREAM, false);
+ Assert(readFile >= 0);
+ }
+ else
+ {
+ /* just make sure source info is correct... */
+ readSource = XLOG_FROM_STREAM;
+ XLogReceiptSource = XLOG_FROM_STREAM;
+ }
+ break;
+ }
+
+ /*
+ * Data not here yet, so check for trigger then sleep for five
+ * seconds like in the WAL file polling case below.
+ */
+ if (CheckForStandbyTrigger())
+ return false;
+
+ /*
+ * Wait for more WAL to arrive, or timeout to be reached
+ */
+ WaitLatch(&XLogCtl->recoveryWakeupLatch,
+ WL_LATCH_SET | WL_TIMEOUT,
+ 5000L);
+ ResetLatch(&XLogCtl->recoveryWakeupLatch);
+ }
+ else
+ {
+ /*
+ * WAL receiver is not active. Poll the archive.
+ */
+ int sources;
+ pg_time_t now;
+
+ if (readFile >= 0)
+ {
+ close(readFile);
+ readFile = -1;
+ }
+ /* Reset curFileTLI if random fetch. */
+ if (randAccess)
+ curFileTLI = 0;
+
+ /*
+ * Try to restore the file from archive, or read an existing file
+ * from pg_xlog.
+ */
+ sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
+ if (!(sources & ~failedSources))
+ {
+ /*
+ * We've exhausted all options for retrieving the file. Retry.
+ */
+ failedSources = 0;
+
+ /*
+ * Before we sleep, re-scan for possible new timelines if we
+ * were requested to recover to the latest timeline.
+ */
+ if (recoveryTargetIsLatest)
+ {
+ if (rescanLatestTimeLine())
+ continue;
+ }
+
+ /*
+ * If it hasn't been long since last attempt, sleep to avoid
+ * busy-waiting.
+ */
+ now = (pg_time_t) time(NULL);
+ if ((now - last_fail_time) < 5)
+ {
+ pg_usleep(1000000L * (5 - (now - last_fail_time)));
+ now = (pg_time_t) time(NULL);
+ }
+ last_fail_time = now;
+
+ /*
+ * If primary_conninfo is set, launch walreceiver to try to
+ * stream the missing WAL, before retrying to restore from
+ * archive/pg_xlog.
+ *
+ * If fetching_ckpt is TRUE, RecPtr points to the initial
+ * checkpoint location. In that case, we use RedoStartLSN as
+ * the streaming start position instead of RecPtr, so that
+ * when we later jump backwards to start redo at RedoStartLSN,
+ * we will have the logs streamed already.
+ */
+ if (PrimaryConnInfo)
+ {
+ XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
+
+ RequestXLogStreaming(ptr, PrimaryConnInfo);
+ continue;
+ }
+ }
+ /* Don't try to read from a source that just failed */
+ sources &= ~failedSources;
+ readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, sources);
+ if (readFile >= 0)
+ break;
+
+ /*
+ * Nope, not found in archive and/or pg_xlog.
+ */
+ failedSources |= sources;
+
+ /*
+ * Check to see if the trigger file exists. Note that we do this
+ * only after failure, so when you create the trigger file, we
+ * still finish replaying as much as we can from archive and
+ * pg_xlog before failover.
+ */
+ if (CheckForStandbyTrigger())
+ return false;
+ }
+
+ /*
+ * This possibly-long loop needs to handle interrupts of startup
+ * process.
+ */
+ HandleStartupProcInterrupts();
+ }
+
+ return true;
+}
+
+/*
* Determine what log level should be used to report a corrupt WAL record
* in the current WAL page, previously read by XLogPageRead().
*