aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c702
1 files changed, 401 insertions, 301 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1eb877e5fcb..60d40d4505b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.361 2010/01/26 00:07:13 sriggs Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.362 2010/01/27 15:27:50 heikki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -144,16 +144,6 @@ HotStandbyState standbyState = STANDBY_DISABLED;
static XLogRecPtr LastRec;
/*
- * Are we doing recovery from XLOG stream? If so, we recover without using
- * offline XLOG archives even though InArchiveRecovery==true. This flag is
- * used only in standby mode.
- */
-static bool InStreamingRecovery = false;
-
-/* The current log page is partially-filled, and so needs to be read again? */
-static bool needReread = false;
-
-/*
* Local copy of SharedRecoveryInProgress variable. True actually means "not
* known, need to check the shared state".
*/
@@ -457,12 +447,16 @@ static uint32 openLogOff = 0;
* These variables are used similarly to the ones above, but for reading
* the XLOG. Note, however, that readOff generally represents the offset
* of the page just read, not the seek position of the FD itself, which
- * will be just past that page.
+ * will be just past that page. readLen indicates how much of the current
+ * page has been read into readBuf.
*/
static int readFile = -1;
static uint32 readId = 0;
static uint32 readSeg = 0;
static uint32 readOff = 0;
+static uint32 readLen = 0;
+/* Is the currently open segment being streamed from primary? */
+static bool readStreamed = false;
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
static char *readBuf = NULL;
@@ -474,7 +468,6 @@ static uint32 readRecordBufSize = 0;
/* State information for XLOG reading */
static XLogRecPtr ReadRecPtr; /* start of last record read */
static XLogRecPtr EndRecPtr; /* end+1 of last record read */
-static XLogRecord *nextRecord = NULL;
static TimeLineID lastPageTLI = 0;
static XLogRecPtr minRecoveryPoint; /* local copy of
@@ -516,7 +509,12 @@ static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
bool find_free, int *max_advance,
bool use_lock);
-static int XLogFileRead(uint32 log, uint32 seg, int emode);
+static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
+ bool fromArchive, bool notexistOk);
+static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
+ bool fromArchive);
+static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
+ bool randAccess);
static void XLogFileClose(void);
static bool RestoreArchivedFile(char *path, const char *xlogfname,
const char *recovername, off_t expectedSize);
@@ -526,8 +524,7 @@ static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
static void ValidateXLOGDirectoryStructure(void);
static void CleanupBackupHistory(void);
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
-static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
+static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
static List *readTimeLineHistory(TimeLineID targetTLI);
@@ -539,6 +536,7 @@ static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
static void WriteControlFile(void);
static void ReadControlFile(void);
static char *str_time(pg_time_t tnow);
+static bool CheckForStandbyTrigger(void);
#ifdef WAL_DEBUG
static void xlog_outrec(StringInfo buf, XLogRecord *record);
@@ -2586,36 +2584,22 @@ XLogFileOpen(uint32 log, uint32 seg)
/*
* Open a logfile segment for reading (during recovery).
+ *
+ * If fromArchive is true, the segment is retrieved from archive, otherwise
+ * it's read from pg_xlog.
*/
static int
-XLogFileRead(uint32 log, uint32 seg, int emode)
+XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
+ bool fromArchive, bool notfoundOk)
{
- char path[MAXPGPATH];
char xlogfname[MAXFNAMELEN];
char activitymsg[MAXFNAMELEN + 16];
- ListCell *cell;
+ char path[MAXPGPATH];
int fd;
- /*
- * Loop looking for a suitable timeline ID: we might need to read any of
- * the timelines listed in expectedTLIs.
- *
- * We expect curFileTLI on entry to be the TLI of the preceding file in
- * sequence, or 0 if there was no predecessor. We do not allow curFileTLI
- * to go backwards; this prevents us from picking up the wrong file when a
- * parent timeline extends to higher segment numbers than the child we
- * want to read.
- */
- foreach(cell, expectedTLIs)
- {
- TimeLineID tli = (TimeLineID) lfirst_int(cell);
-
- if (tli < curFileTLI)
- break; /* don't bother looking at too-old TLIs */
-
XLogFileName(xlogfname, tli, log, seg);
- if (InArchiveRecovery && !InStreamingRecovery)
+ if (fromArchive)
{
/* Report recovery progress in PS display */
snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
@@ -2625,9 +2609,14 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
restoredFromArchive = RestoreArchivedFile(path, xlogfname,
"RECOVERYXLOG",
XLogSegSize);
+ if (!restoredFromArchive)
+ return -1;
}
else
+ {
XLogFilePath(path, tli, log, seg);
+ restoredFromArchive = false;
+ }
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (fd >= 0)
@@ -2642,11 +2631,62 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
return fd;
}
- if (errno != ENOENT) /* unexpected failure? */
+ if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
path, log, seg)));
+ return -1;
+}
+
+/*
+ * Open a logfile segment for reading (during recovery).
+ *
+ * This version searches for the segment with any TLI listed in expectedTLIs.
+ * If not in StandbyMode and fromArchive is true, the segment is also
+ * searched in pg_xlog if not found in archive.
+ */
+static int
+XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
+{
+ char path[MAXPGPATH];
+ ListCell *cell;
+ int fd;
+
+ /*
+ * Loop looking for a suitable timeline ID: we might need to read any of
+ * the timelines listed in expectedTLIs.
+ *
+ * We expect curFileTLI on entry to be the TLI of the preceding file in
+ * sequence, or 0 if there was no predecessor. We do not allow curFileTLI
+ * to go backwards; this prevents us from picking up the wrong file when a
+ * parent timeline extends to higher segment numbers than the child we
+ * want to read.
+ */
+ foreach(cell, expectedTLIs)
+ {
+ TimeLineID tli = (TimeLineID) lfirst_int(cell);
+
+ if (tli < curFileTLI)
+ break; /* don't bother looking at too-old TLIs */
+
+ fd = XLogFileRead(log, seg, emode, tli, fromArchive, true);
+ if (fd != -1)
+ return fd;
+
+ /*
+ * If not in StandbyMode, fall back to searching pg_xlog. In
+ * StandbyMode we're streaming segments from the primary to pg_xlog,
+ * and we mustn't confuse the (possibly partial) segments in pg_xlog
+ * with complete segments ready to be applied. We would rather wait for
+ * the records to arrive through streaming.
+ */
+ if (!StandbyMode && fromArchive)
+ {
+ fd = XLogFileRead(log, seg, emode, tli, false, true);
+ if (fd != -1)
+ return fd;
+ }
}
/* Couldn't find it. For simplicity, complain about front timeline */
@@ -3163,7 +3203,7 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
* different filename that can't be confused with regular XLOG
* files.
*/
- if (InStreamingRecovery || XLogArchiveCheckDone(xlde->d_name))
+ if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name))
{
snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
@@ -3474,92 +3514,19 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
}
/*
- * Attempt to fetch an XLOG record.
- *
- * If RecPtr is not NULL, try to fetch a record at that position. Otherwise
- * try to fetch a record just after the last one previously read.
- *
- * In standby mode, if we failed in reading a valid record and are not doing
- * recovery from XLOG stream yet, we ignore the failure and start walreceiver
- * process to fetch the record from the primary. Otherwise, returns NULL,
- * or fails if emode is PANIC. (emode must be either PANIC or LOG.)
- *
- * If fetching_ckpt is TRUE, RecPtr points to the checkpoint location. In
- * this case, if we have to start XLOG streaming, we use RedoStartLSN as the
- * streaming start position instead of RecPtr.
- *
- * The record is copied into readRecordBuf, so that on successful return,
- * the returned record pointer always points there.
- */
-static XLogRecord *
-FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
-{
- if (StandbyMode && !InStreamingRecovery)
- {
- XLogRecord *record;
- XLogRecPtr startlsn;
- bool haveNextRecord = (nextRecord != NULL);
-
- /* An invalid record is OK here, so we set emode to DEBUG2 */
- record = ReadRecord(RecPtr, DEBUG2);
- if (record != NULL)
- return record;
-
- /*
- * Start XLOG streaming if there is no more valid records available
- * in the archive.
- *
- * We need to calculate the start position of XLOG streaming. If we
- * read a record in the middle of a segment which doesn't exist in
- * pg_xlog, we use the start of the segment as the start position.
- * That prevents a broken segment (i.e., with no records in the
- * first half of a segment) from being created by XLOG streaming,
- * which might cause trouble later on if the segment is e.g
- * archived.
- */
- startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr;
- if (startlsn.xrecoff % XLogSegSize != 0)
- {
- char xlogpath[MAXPGPATH];
- struct stat stat_buf;
- uint32 log;
- uint32 seg;
-
- XLByteToSeg(startlsn, log, seg);
- XLogFilePath(xlogpath, recoveryTargetTLI, log, seg);
-
- if (stat(xlogpath, &stat_buf) != 0)
- startlsn.xrecoff -= startlsn.xrecoff % XLogSegSize;
- }
- RequestXLogStreaming(startlsn, PrimaryConnInfo);
-
- /* Needs to read the current page again if the next record is in it */
- needReread = haveNextRecord;
- nextRecord = NULL;
-
- InStreamingRecovery = true;
- ereport(LOG,
- (errmsg("starting streaming recovery at %X/%X",
- startlsn.xlogid, startlsn.xrecoff)));
- }
-
- return ReadRecord(RecPtr, emode);
-}
-
-/*
* Attempt to read an XLOG record.
*
* If RecPtr is not NULL, try to read a record at that position. Otherwise
* try to read a record just after the last one previously read.
*
* If no valid record is available, returns NULL, or fails if emode is PANIC.
- * (emode must be either PANIC, LOG or DEBUG2.)
+ * (emode must be either PANIC or LOG.)
*
* The record is copied into readRecordBuf, so that on successful return,
* the returned record pointer always points there.
*/
static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
+ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
{
XLogRecord *record;
char *buffer;
@@ -3567,11 +3534,8 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
bool randAccess = false;
uint32 len,
total_len;
- uint32 targetPageOff;
uint32 targetRecOff;
uint32 pageHeaderSize;
- XLogRecPtr receivedUpto = {0,0};
- bool finished;
int emode;
/*
@@ -3579,7 +3543,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
* should never hit the end of WAL because we wait for it to be streamed.
* Therefore treat any broken WAL as PANIC, instead of failing over.
*/
- if (InStreamingRecovery)
+ if (StandbyMode)
emode = PANIC;
else
emode = emode_arg;
@@ -3600,20 +3564,16 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
if (RecPtr == NULL)
{
RecPtr = &tmpRecPtr;
- /* fast case if next record is on same page */
- if (nextRecord != NULL)
- {
- record = nextRecord;
- goto got_record;
- }
/*
- * Align old recptr to next page if the current page is filled and
- * doesn't need to be read again.
+ * Align recptr to next page if no more records can fit on the
+ * current page.
*/
- if (!needReread)
+ if (XLOG_BLCKSZ - (RecPtr->xrecoff % XLOG_BLCKSZ) < SizeOfXLogRecord)
+ {
NextLogPage(tmpRecPtr);
- /* We will account for page header size below */
+ /* We will account for page header size below */
+ }
}
else
{
@@ -3633,81 +3593,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
randAccess = true; /* allow curFileTLI to go backwards too */
}
- if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
- {
- close(readFile);
- readFile = -1;
- }
-
- /* Is the target record ready yet? */
- if (InStreamingRecovery)
- {
- receivedUpto = WaitNextXLogAvailable(*RecPtr, &finished);
- if (finished)
- {
- if (emode_arg == PANIC)
- ereport(PANIC,
- (errmsg("streaming recovery ended")));
- else
- return NULL;
- }
- }
-
- XLByteToSeg(*RecPtr, readId, readSeg);
- if (readFile < 0)
- {
- /* Now it's okay to reset curFileTLI if random fetch */
- if (randAccess)
- curFileTLI = 0;
-
- readFile = XLogFileRead(readId, readSeg, emode);
- if (readFile < 0)
- goto next_record_is_invalid;
-
- /*
- * Whenever switching to a new WAL segment, we read the first page of
- * the file and validate its header, even if that's not where the
- * target record is. This is so that we can check the additional
- * identification info that is present in the first page's "long"
- * header.
- */
- readOff = 0;
- if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
- {
- ereport(emode,
- (errcode_for_file_access(),
- errmsg("could not read from log file %u, segment %u, offset %u: %m",
- readId, readSeg, readOff)));
- goto next_record_is_invalid;
- }
- if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
- goto next_record_is_invalid;
- }
+ /* Read the page containing the record */
+ if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
+ return NULL;
- targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
- if (readOff != targetPageOff || needReread)
- {
- readOff = targetPageOff;
- needReread = false;
- if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
- {
- ereport(emode,
- (errcode_for_file_access(),
- errmsg("could not seek in log file %u, segment %u to offset %u: %m",
- readId, readSeg, readOff)));
- goto next_record_is_invalid;
- }
- if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
- {
- ereport(emode,
- (errcode_for_file_access(),
- errmsg("could not read from log file %u, segment %u, offset %u: %m",
- readId, readSeg, readOff)));
- goto next_record_is_invalid;
- }
- if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
- goto next_record_is_invalid;
- }
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
if (targetRecOff == 0)
@@ -3737,8 +3626,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
}
record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
-got_record:;
-
/*
* xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
* required.
@@ -3838,58 +3725,35 @@ got_record:;
}
buffer = readRecordBuf;
- nextRecord = NULL;
len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
if (total_len > len)
{
/* Need to reassemble record */
XLogContRecord *contrecord;
- XLogRecPtr nextpagelsn = *RecPtr;
+ XLogRecPtr pagelsn;
uint32 gotlen = len;
+ /* Initialize pagelsn to the beginning of the page this record is on */
+ pagelsn = *RecPtr;
+ pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+
memcpy(buffer, record, len);
record = (XLogRecord *) buffer;
buffer += len;
for (;;)
{
- /* Is the next page ready yet? */
- if (InStreamingRecovery)
+ /* Calculate pointer to beginning of next page */
+ pagelsn.xrecoff += XLOG_BLCKSZ;
+ if (pagelsn.xrecoff >= XLogFileSize)
{
- if (gotlen != len)
- nextpagelsn.xrecoff += XLOG_BLCKSZ;
- NextLogPage(nextpagelsn);
- receivedUpto = WaitNextXLogAvailable(nextpagelsn, &finished);
- if (finished)
- {
- if (emode_arg == PANIC)
- ereport(PANIC,
- (errmsg("streaming recovery ended")));
- else
- return NULL;
- }
+ (pagelsn.xlogid)++;
+ pagelsn.xrecoff = 0;
}
+ /* Wait for the next page to become available */
+ if (!XLogPageRead(&pagelsn, emode, false, false))
+ return NULL;
- readOff += XLOG_BLCKSZ;
- if (readOff >= XLogSegSize)
- {
- close(readFile);
- readFile = -1;
- NextLogSeg(readId, readSeg);
- readFile = XLogFileRead(readId, readSeg, emode);
- if (readFile < 0)
- goto next_record_is_invalid;
- readOff = 0;
- }
- if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
- {
- ereport(emode,
- (errcode_for_file_access(),
- errmsg("could not read from log file %u, segment %u, offset %u: %m",
- readId, readSeg, readOff)));
- goto next_record_is_invalid;
- }
- if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
- goto next_record_is_invalid;
+ /* Check that the continuation record looks valid */
if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
{
ereport(emode,
@@ -3923,31 +3787,11 @@ got_record:;
if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid;
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
- if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
- MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
- {
- nextRecord = (XLogRecord *) ((char *) contrecord +
- MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
- }
EndRecPtr.xlogid = readId;
EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
pageHeaderSize +
MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
- /*
- * Check whether the current page needs to be read again. If there is no
- * unread record in the current page (nextRecord == NULL), obviously we
- * don't need to reread it. If we're not in streaming recovery mode yet,
- * partially-filled page doesn't need to be reread because it is the
- * last valid page.
- */
- if (nextRecord != NULL && InStreamingRecovery &&
- XLByteLE(receivedUpto, EndRecPtr))
- {
- nextRecord = NULL;
- needReread = true;
- }
-
ReadRecPtr = *RecPtr;
/* needn't worry about XLOG SWITCH, it can't cross page boundaries */
return record;
@@ -3956,26 +3800,9 @@ got_record:;
/* Record does not cross a page boundary */
if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid;
- if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
- MAXALIGN(total_len))
- nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
EndRecPtr.xlogid = RecPtr->xlogid;
EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
- /*
- * Check whether the current page needs to be read again. If there is no
- * unread record in the current page (nextRecord == NULL), obviously we
- * don't need to reread it. If we're not in streaming recovery mode yet,
- * partially-filled page doesn't need to be reread because it is the last
- * valid page.
- */
- if (nextRecord != NULL && InStreamingRecovery &&
- XLByteLE(receivedUpto, EndRecPtr))
- {
- nextRecord = NULL;
- needReread = true;
- }
-
ReadRecPtr = *RecPtr;
memcpy(buffer, record, total_len);
@@ -3987,8 +3814,6 @@ got_record:;
/* Pretend it extends to end of segment */
EndRecPtr.xrecoff += XLogSegSize - 1;
EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
- nextRecord = NULL; /* definitely not on same page */
- needReread = false;
/*
* Pretend that readBuf contains the last page of the segment. This is
@@ -4005,7 +3830,6 @@ next_record_is_invalid:;
close(readFile);
readFile = -1;
}
- nextRecord = NULL;
return NULL;
}
@@ -5730,7 +5554,7 @@ StartupXLOG(void)
(errmsg("checkpoint record is at %X/%X",
checkPointLoc.xlogid, checkPointLoc.xrecoff)));
}
- else if (InStreamingRecovery)
+ else if (StandbyMode)
{
/*
* The last valid checkpoint record required for a streaming
@@ -5938,12 +5762,12 @@ StartupXLOG(void)
if (XLByteLT(checkPoint.redo, RecPtr))
{
/* back up to find the record */
- record = FetchRecord(&(checkPoint.redo), PANIC, false);
+ record = ReadRecord(&(checkPoint.redo), PANIC, false);
}
else
{
/* just have to read next record after CheckPoint */
- record = FetchRecord(NULL, LOG, false);
+ record = ReadRecord(NULL, LOG, false);
}
if (record != NULL)
@@ -6096,7 +5920,7 @@ StartupXLOG(void)
LastRec = ReadRecPtr;
- record = FetchRecord(NULL, LOG, false);
+ record = ReadRecord(NULL, LOG, false);
} while (record != NULL && recoveryContinue);
/*
@@ -6130,22 +5954,17 @@ StartupXLOG(void)
/*
* We are now done reading the xlog from stream. Turn off streaming
- * recovery, and restart fetching the files (which would be required
- * at end of recovery, e.g., timeline history file) from archive.
+ * recovery to force fetching the files (which would be required
+ * at end of recovery, e.g., timeline history file) from archive or
+ * pg_xlog.
*/
- if (InStreamingRecovery)
- {
- /* We are no longer in streaming recovery state */
- InStreamingRecovery = false;
- ereport(LOG,
- (errmsg("streaming recovery complete")));
- }
+ StandbyMode = false;
/*
* Re-fetch the last valid or last applied record, so we can identify the
* exact endpoint of what we consider the valid portion of WAL.
*/
- record = ReadRecord(&LastRec, PANIC);
+ record = ReadRecord(&LastRec, PANIC, false);
EndOfLog = EndRecPtr;
XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
@@ -6515,7 +6334,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
return NULL;
}
- record = FetchRecord(&RecPtr, LOG, true);
+ record = ReadRecord(&RecPtr, LOG, true);
if (record == NULL)
{
@@ -7461,10 +7280,6 @@ CreateRestartPoint(int flags)
}
LWLockRelease(ControlFileLock);
- /* Are we doing recovery from XLOG stream? */
- if (!InStreamingRecovery)
- InStreamingRecovery = WalRcvInProgress();
-
/*
* Delete old log files (those no longer needed even for previous
* checkpoint/restartpoint) to prevent the disk holding the xlog from
@@ -7472,7 +7287,7 @@ CreateRestartPoint(int flags)
* streaming recovery we have to or the disk will eventually fill up from
* old log files streamed from master.
*/
- if (InStreamingRecovery && (_logId || _logSeg))
+ if (WalRcvInProgress() && (_logId || _logSeg))
{
XLogRecPtr endptr;
@@ -8791,6 +8606,13 @@ HandleStartupProcInterrupts(void)
*/
if (shutdown_requested)
proc_exit(1);
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (IsUnderPostmaster && !PostmasterIsAlive(true))
+ exit(1);
}
/* Main entry point for startup process */
@@ -8843,3 +8665,281 @@ StartupProcessMain(void)
*/
proc_exit(0);
}
+
+/*
+ * Read the XLOG page containing RecPtr into readBuf (if not read already).
+ * Returns true if successful; otherwise returns false, or fails if emode is PANIC.
+ * fetching_ckpt streams from RedoStartLSN, not *RecPtr; randAccess resets curFileTLI.
+ * This is responsible for restoring files from archive as needed, as well
+ * as for waiting for the requested WAL record to arrive in standby mode.
+ */
+static bool
+XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
+ bool randAccess)
+{
+ static XLogRecPtr receivedUpto = {0, 0};
+ bool switched_segment = false;
+ uint32 targetPageOff;
+ uint32 targetRecOff;
+ uint32 targetId;
+ uint32 targetSeg;
+
+ XLByteToSeg(*RecPtr, targetId, targetSeg);
+ targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+ targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
+
+ /* Fast exit if the current buffer already holds the requested record */
+ if (targetId == readId && targetSeg == readSeg &&
+ targetPageOff == readOff && targetRecOff < readLen)
+ return true;
+
+ /*
+ * See if we need to switch to a new segment because the requested record
+ * is not in the currently open one.
+ */
+ if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
+ {
+ close(readFile);
+ readFile = -1;
+ }
+
+ XLByteToSeg(*RecPtr, readId, readSeg);
+
+ /* Need more data if no file is open or streamed WAL doesn't reach RecPtr */
+ if (readFile < 0 ||
+ (readStreamed && !XLByteLT(*RecPtr, receivedUpto)))
+ {
+ if (StandbyMode)
+ {
+ bool last_restore_failed = false;
+
+ /*
+ * In standby mode, wait for the requested record to become
+ * available, either via restore_command succeeding to restore
+ * the segment, or via walreceiver having streamed the record.
+ */
+ for (;;)
+ {
+ if (WalRcvInProgress())
+ {
+ /*
+ * While walreceiver is active, wait for new WAL to
+ * arrive from primary.
+ */
+ receivedUpto = GetWalRcvWriteRecPtr();
+ if (XLByteLT(*RecPtr, receivedUpto))
+ {
+ /*
+ * Great, streamed far enough. Open the file if it's
+ * not open already.
+ */
+ if (readFile < 0)
+ {
+ readFile =
+ XLogFileRead(readId, readSeg, PANIC,
+ recoveryTargetTLI, false, false);
+ switched_segment = true;
+ readStreamed = true;
+ }
+ break;
+ }
+
+ if (CheckForStandbyTrigger())
+ goto next_record_is_invalid;
+
+ /*
+ * When streaming is active, we want to react quickly when
+ * the next WAL record arrives, so sleep only a bit.
+ */
+ pg_usleep(100000L); /* 100ms */
+ }
+ else
+ {
+ /*
+ * Until walreceiver manages to reconnect, poll the
+ * archive.
+ */
+ if (readFile >= 0)
+ {
+ close(readFile);
+ readFile = -1;
+ }
+ /* Reset curFileTLI if random fetch. */
+ if (randAccess)
+ curFileTLI = 0;
+ readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true);
+ switched_segment = true;
+ readStreamed = false;
+ if (readFile != -1)
+ {
+ elog(DEBUG1, "got WAL segment from archive");
+ break;
+ }
+
+ /*
+ * If we succeeded restoring some segments from archive
+ * since the last connection attempt (or we haven't
+ * tried streaming yet), retry immediately. But if we
+ * haven't, assume the problem is persistent, so be
+ * less aggressive.
+ */
+ if (last_restore_failed)
+ {
+ /*
+ * Check to see if the trigger file exists. Note that
+ * we do this only after failure, so when you create
+ * the trigger file, we still finish replaying as much
+ * as we can before failover.
+ */
+ if (CheckForStandbyTrigger())
+ goto next_record_is_invalid;
+ pg_usleep(5000000L); /* 5 seconds */
+ }
+ last_restore_failed = true;
+
+ /*
+ * Nope, not found in archive. Try to stream it.
+ *
+ * If fetching_ckpt is TRUE, RecPtr points to the initial
+ * checkpoint location. In that case, we use RedoStartLSN
+ * as the streaming start position instead of RecPtr, so
+ * that when we later jump backwards to start redo at
+ * RedoStartLSN, we will have the logs streamed already.
+ */
+ RequestXLogStreaming(fetching_ckpt ? RedoStartLSN : *RecPtr,
+ PrimaryConnInfo);
+ }
+
+ /*
+ * This possibly-long loop needs to handle interrupts of startup
+ * process.
+ */
+ HandleStartupProcInterrupts();
+ }
+ }
+ else
+ {
+ /* In archive or crash recovery. */
+ if (readFile < 0)
+ {
+ /* Reset curFileTLI if random fetch. */
+ if (randAccess)
+ curFileTLI = 0;
+ readFile = XLogFileReadAnyTLI(readId, readSeg, emode,
+ InArchiveRecovery);
+ switched_segment = true;
+ readStreamed = false;
+ if (readFile < 0)
+ return false;
+ }
+ }
+ }
+
+ /*
+ * At this point, we have the right segment open and we know the
+ * requested record is in it.
+ */
+ Assert(readFile != -1);
+
+ /*
+ * If the current segment is being streamed from master, calculate
+ * how much of the current page we have received already. We know the
+ * requested record has been received, but this is for the benefit
+ * of future calls, to allow quick exit at the top of this function.
+ */
+ if (readStreamed)
+ {
+ if (RecPtr->xlogid != receivedUpto.xlogid ||
+ (RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
+ {
+ readLen = XLOG_BLCKSZ; /* receivedUpto is on another page: treat the whole page as received */
+ }
+ else
+ readLen = receivedUpto.xrecoff % XLogSegSize - targetPageOff; /* bytes of this page received so far */
+ }
+ else
+ readLen = XLOG_BLCKSZ;
+
+ if (switched_segment && targetPageOff != 0)
+ {
+ /*
+ * Whenever switching to a new WAL segment, we read the first page of
+ * the file and validate its header, even if that's not where the
+ * target record is. This is so that we can check the additional
+ * identification info that is present in the first page's "long"
+ * header.
+ */
+ readOff = 0;
+ if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+ {
+ ereport(emode,
+ (errcode_for_file_access(),
+ errmsg("could not read from log file %u, segment %u, offset %u: %m",
+ readId, readSeg, readOff)));
+ goto next_record_is_invalid;
+ }
+ if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
+ goto next_record_is_invalid;
+ }
+
+ /* Seek to and read the requested page, then validate its header */
+ readOff = targetPageOff;
+ if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
+ {
+ ereport(emode,
+ (errcode_for_file_access(),
+ errmsg("could not seek in log file %u, segment %u to offset %u: %m",
+ readId, readSeg, readOff)));
+ goto next_record_is_invalid;
+ }
+ if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+ {
+ ereport(emode,
+ (errcode_for_file_access(),
+ errmsg("could not read from log file %u, segment %u, offset %u: %m",
+ readId, readSeg, readOff)));
+ goto next_record_is_invalid;
+ }
+ if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
+ goto next_record_is_invalid;
+
+ Assert(targetId == readId);
+ Assert(targetSeg == readSeg);
+ Assert(targetPageOff == readOff);
+ Assert(targetRecOff < readLen);
+
+ return true;
+
+next_record_is_invalid: /* common failure exit: close the file and reset the read state */
+ if (readFile >= 0)
+ close(readFile);
+ readFile = -1;
+ readStreamed = false;
+ readLen = 0;
+
+ return false;
+}
+
+/*
+ * Check to see if the trigger file exists. If it does, request postmaster
+ * to shut down walreceiver, wait for it to exit, remove the trigger file,
+ * and return true. Returns false if TriggerFile is NULL or stat() fails.
+ */
+static bool
+CheckForStandbyTrigger(void)
+{
+ struct stat stat_buf;
+
+ if (TriggerFile == NULL)
+ return false;
+
+ if (stat(TriggerFile, &stat_buf) == 0)
+ {
+ ereport(LOG,
+ (errmsg("trigger file found: %s", TriggerFile)));
+ ShutdownWalRcv();
+ unlink(TriggerFile); /* result is not checked */
+ return true;
+ }
+ return false;
+}