diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 702 |
1 files changed, 401 insertions, 301 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 1eb877e5fcb..60d40d4505b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.361 2010/01/26 00:07:13 sriggs Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.362 2010/01/27 15:27:50 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -144,16 +144,6 @@ HotStandbyState standbyState = STANDBY_DISABLED; static XLogRecPtr LastRec; /* - * Are we doing recovery from XLOG stream? If so, we recover without using - * offline XLOG archives even though InArchiveRecovery==true. This flag is - * used only in standby mode. - */ -static bool InStreamingRecovery = false; - -/* The current log page is partially-filled, and so needs to be read again? */ -static bool needReread = false; - -/* * Local copy of SharedRecoveryInProgress variable. True actually means "not * known, need to check the shared state". */ @@ -457,12 +447,16 @@ static uint32 openLogOff = 0; * These variables are used similarly to the ones above, but for reading * the XLOG. Note, however, that readOff generally represents the offset * of the page just read, not the seek position of the FD itself, which - * will be just past that page. + * will be just past that page. readLen indicates how much of the current + * page has been read into readBuf. */ static int readFile = -1; static uint32 readId = 0; static uint32 readSeg = 0; static uint32 readOff = 0; +static uint32 readLen = 0; +/* Is the currently open segment being streamed from primary? 
*/ +static bool readStreamed = false; /* Buffer for currently read page (XLOG_BLCKSZ bytes) */ static char *readBuf = NULL; @@ -474,7 +468,6 @@ static uint32 readRecordBufSize = 0; /* State information for XLOG reading */ static XLogRecPtr ReadRecPtr; /* start of last record read */ static XLogRecPtr EndRecPtr; /* end+1 of last record read */ -static XLogRecord *nextRecord = NULL; static TimeLineID lastPageTLI = 0; static XLogRecPtr minRecoveryPoint; /* local copy of @@ -516,7 +509,12 @@ static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch); static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, bool find_free, int *max_advance, bool use_lock); -static int XLogFileRead(uint32 log, uint32 seg, int emode); +static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, + bool fromArchive, bool notexistOk); +static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, + bool fromArchive); +static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, + bool randAccess); static void XLogFileClose(void); static bool RestoreArchivedFile(char *path, const char *xlogfname, const char *recovername, off_t expectedSize); @@ -526,8 +524,7 @@ static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr); static void ValidateXLOGDirectoryStructure(void); static void CleanupBackupHistory(void); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); -static XLogRecord *FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt); -static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode); +static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt); static bool ValidXLOGHeader(XLogPageHeader hdr, int emode); static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt); static List *readTimeLineHistory(TimeLineID targetTLI); @@ -539,6 +536,7 @@ static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI, static void 
WriteControlFile(void); static void ReadControlFile(void); static char *str_time(pg_time_t tnow); +static bool CheckForStandbyTrigger(void); #ifdef WAL_DEBUG static void xlog_outrec(StringInfo buf, XLogRecord *record); @@ -2586,36 +2584,22 @@ XLogFileOpen(uint32 log, uint32 seg) /* * Open a logfile segment for reading (during recovery). + * + * If fromArchive is true, the segment is retrieved from archive, otherwise + * it's read from pg_xlog. */ static int -XLogFileRead(uint32 log, uint32 seg, int emode) +XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, + bool fromArchive, bool notfoundOk) { - char path[MAXPGPATH]; char xlogfname[MAXFNAMELEN]; char activitymsg[MAXFNAMELEN + 16]; - ListCell *cell; + char path[MAXPGPATH]; int fd; - /* - * Loop looking for a suitable timeline ID: we might need to read any of - * the timelines listed in expectedTLIs. - * - * We expect curFileTLI on entry to be the TLI of the preceding file in - * sequence, or 0 if there was no predecessor. We do not allow curFileTLI - * to go backwards; this prevents us from picking up the wrong file when a - * parent timeline extends to higher segment numbers than the child we - * want to read. 
- */ - foreach(cell, expectedTLIs) - { - TimeLineID tli = (TimeLineID) lfirst_int(cell); - - if (tli < curFileTLI) - break; /* don't bother looking at too-old TLIs */ - XLogFileName(xlogfname, tli, log, seg); - if (InArchiveRecovery && !InStreamingRecovery) + if (fromArchive) { /* Report recovery progress in PS display */ snprintf(activitymsg, sizeof(activitymsg), "waiting for %s", @@ -2625,9 +2609,14 @@ XLogFileRead(uint32 log, uint32 seg, int emode) restoredFromArchive = RestoreArchivedFile(path, xlogfname, "RECOVERYXLOG", XLogSegSize); + if (!restoredFromArchive) + return -1; } else + { XLogFilePath(path, tli, log, seg); + restoredFromArchive = false; + } fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); if (fd >= 0) @@ -2642,11 +2631,62 @@ XLogFileRead(uint32 log, uint32 seg, int emode) return fd; } - if (errno != ENOENT) /* unexpected failure? */ + if (errno != ENOENT || !notfoundOk) /* unexpected failure? */ ereport(PANIC, (errcode_for_file_access(), errmsg("could not open file \"%s\" (log file %u, segment %u): %m", path, log, seg))); + return -1; +} + +/* + * Open a logfile segment for reading (during recovery). + * + * This version searches for the segment with any TLI listed in expectedTLIs. + * If not in StandbyMode and fromArchive is true, the segment is also + * searched in pg_xlog if not found in archive. + */ +static int +XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive) +{ + char path[MAXPGPATH]; + ListCell *cell; + int fd; + + /* + * Loop looking for a suitable timeline ID: we might need to read any of + * the timelines listed in expectedTLIs. + * + * We expect curFileTLI on entry to be the TLI of the preceding file in + * sequence, or 0 if there was no predecessor. We do not allow curFileTLI + * to go backwards; this prevents us from picking up the wrong file when a + * parent timeline extends to higher segment numbers than the child we + * want to read. 
+ */ + foreach(cell, expectedTLIs) + { + TimeLineID tli = (TimeLineID) lfirst_int(cell); + + if (tli < curFileTLI) + break; /* don't bother looking at too-old TLIs */ + + fd = XLogFileRead(log, seg, emode, tli, fromArchive, true); + if (fd != -1) + return fd; + + /* + * If not in StandbyMode, fall back to searching pg_xlog. In + * StandbyMode we're streaming segments from the primary to pg_xlog, + * and we mustn't confuse the (possibly partial) segments in pg_xlog + * with complete segments ready to be applied. We rather wait for + * the records to arrive through streaming. + */ + if (!StandbyMode && fromArchive) + { + fd = XLogFileRead(log, seg, emode, tli, false, true); + if (fd != -1) + return fd; + } } /* Couldn't find it. For simplicity, complain about front timeline */ @@ -3163,7 +3203,7 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr) * different filename that can't be confused with regular XLOG * files. */ - if (InStreamingRecovery || XLogArchiveCheckDone(xlde->d_name)) + if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name)) { snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name); @@ -3474,92 +3514,19 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) } /* - * Attempt to fetch an XLOG record. - * - * If RecPtr is not NULL, try to fetch a record at that position. Otherwise - * try to fetch a record just after the last one previously read. - * - * In standby mode, if we failed in reading a valid record and are not doing - * recovery from XLOG stream yet, we ignore the failure and start walreceiver - * process to fetch the record from the primary. Otherwise, returns NULL, - * or fails if emode is PANIC. (emode must be either PANIC or LOG.) - * - * If fetching_ckpt is TRUE, RecPtr points to the checkpoint location. In - * this case, if we have to start XLOG streaming, we use RedoStartLSN as the - * streaming start position instead of RecPtr. 
- * - * The record is copied into readRecordBuf, so that on successful return, - * the returned record pointer always points there. - */ -static XLogRecord * -FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) -{ - if (StandbyMode && !InStreamingRecovery) - { - XLogRecord *record; - XLogRecPtr startlsn; - bool haveNextRecord = (nextRecord != NULL); - - /* An invalid record is OK here, so we set emode to DEBUG2 */ - record = ReadRecord(RecPtr, DEBUG2); - if (record != NULL) - return record; - - /* - * Start XLOG streaming if there is no more valid records available - * in the archive. - * - * We need to calculate the start position of XLOG streaming. If we - * read a record in the middle of a segment which doesn't exist in - * pg_xlog, we use the start of the segment as the start position. - * That prevents a broken segment (i.e., with no records in the - * first half of a segment) from being created by XLOG streaming, - * which might cause trouble later on if the segment is e.g - * archived. - */ - startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr; - if (startlsn.xrecoff % XLogSegSize != 0) - { - char xlogpath[MAXPGPATH]; - struct stat stat_buf; - uint32 log; - uint32 seg; - - XLByteToSeg(startlsn, log, seg); - XLogFilePath(xlogpath, recoveryTargetTLI, log, seg); - - if (stat(xlogpath, &stat_buf) != 0) - startlsn.xrecoff -= startlsn.xrecoff % XLogSegSize; - } - RequestXLogStreaming(startlsn, PrimaryConnInfo); - - /* Needs to read the current page again if the next record is in it */ - needReread = haveNextRecord; - nextRecord = NULL; - - InStreamingRecovery = true; - ereport(LOG, - (errmsg("starting streaming recovery at %X/%X", - startlsn.xlogid, startlsn.xrecoff))); - } - - return ReadRecord(RecPtr, emode); -} - -/* * Attempt to read an XLOG record. * * If RecPtr is not NULL, try to read a record at that position. Otherwise * try to read a record just after the last one previously read. 
* * If no valid record is available, returns NULL, or fails if emode is PANIC. - * (emode must be either PANIC, LOG or DEBUG2.) + * (emode must be either PANIC or LOG.) * * The record is copied into readRecordBuf, so that on successful return, * the returned record pointer always points there. */ static XLogRecord * -ReadRecord(XLogRecPtr *RecPtr, int emode_arg) +ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) { XLogRecord *record; char *buffer; @@ -3567,11 +3534,8 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) bool randAccess = false; uint32 len, total_len; - uint32 targetPageOff; uint32 targetRecOff; uint32 pageHeaderSize; - XLogRecPtr receivedUpto = {0,0}; - bool finished; int emode; /* @@ -3579,7 +3543,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) * should never hit the end of WAL because we wait for it to be streamed. * Therefore treat any broken WAL as PANIC, instead of failing over. */ - if (InStreamingRecovery) + if (StandbyMode) emode = PANIC; else emode = emode_arg; @@ -3600,20 +3564,16 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) if (RecPtr == NULL) { RecPtr = &tmpRecPtr; - /* fast case if next record is on same page */ - if (nextRecord != NULL) - { - record = nextRecord; - goto got_record; - } /* - * Align old recptr to next page if the current page is filled and - * doesn't need to be read again. + * Align recptr to next page if no more records can fit on the + * current page. */ - if (!needReread) + if (XLOG_BLCKSZ - (RecPtr->xrecoff % XLOG_BLCKSZ) < SizeOfXLogRecord) + { NextLogPage(tmpRecPtr); - /* We will account for page header size below */ + /* We will account for page header size below */ + } } else { @@ -3633,81 +3593,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) randAccess = true; /* allow curFileTLI to go backwards too */ } - if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg)) - { - close(readFile); - readFile = -1; - } - - /* Is the target record ready yet? 
*/ - if (InStreamingRecovery) - { - receivedUpto = WaitNextXLogAvailable(*RecPtr, &finished); - if (finished) - { - if (emode_arg == PANIC) - ereport(PANIC, - (errmsg("streaming recovery ended"))); - else - return NULL; - } - } - - XLByteToSeg(*RecPtr, readId, readSeg); - if (readFile < 0) - { - /* Now it's okay to reset curFileTLI if random fetch */ - if (randAccess) - curFileTLI = 0; - - readFile = XLogFileRead(readId, readSeg, emode); - if (readFile < 0) - goto next_record_is_invalid; - - /* - * Whenever switching to a new WAL segment, we read the first page of - * the file and validate its header, even if that's not where the - * target record is. This is so that we can check the additional - * identification info that is present in the first page's "long" - * header. - */ - readOff = 0; - if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) - { - ereport(emode, - (errcode_for_file_access(), - errmsg("could not read from log file %u, segment %u, offset %u: %m", - readId, readSeg, readOff))); - goto next_record_is_invalid; - } - if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) - goto next_record_is_invalid; - } + /* Read the page containing the record */ + if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess)) + return NULL; - targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ; - if (readOff != targetPageOff || needReread) - { - readOff = targetPageOff; - needReread = false; - if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0) - { - ereport(emode, - (errcode_for_file_access(), - errmsg("could not seek in log file %u, segment %u to offset %u: %m", - readId, readSeg, readOff))); - goto next_record_is_invalid; - } - if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) - { - ereport(emode, - (errcode_for_file_access(), - errmsg("could not read from log file %u, segment %u, offset %u: %m", - readId, readSeg, readOff))); - goto next_record_is_invalid; - } - if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) - goto 
next_record_is_invalid; - } pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ; if (targetRecOff == 0) @@ -3737,8 +3626,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) } record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ); -got_record:; - /* * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is * required. @@ -3838,58 +3725,35 @@ got_record:; } buffer = readRecordBuf; - nextRecord = NULL; len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ; if (total_len > len) { /* Need to reassemble record */ XLogContRecord *contrecord; - XLogRecPtr nextpagelsn = *RecPtr; + XLogRecPtr pagelsn; uint32 gotlen = len; + /* Initialize pagelsn to the beginning of the page this record is on */ + pagelsn = *RecPtr; + pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ; + memcpy(buffer, record, len); record = (XLogRecord *) buffer; buffer += len; for (;;) { - /* Is the next page ready yet? */ - if (InStreamingRecovery) + /* Calculate pointer to beginning of next page */ + pagelsn.xrecoff += XLOG_BLCKSZ; + if (pagelsn.xrecoff >= XLogFileSize) { - if (gotlen != len) - nextpagelsn.xrecoff += XLOG_BLCKSZ; - NextLogPage(nextpagelsn); - receivedUpto = WaitNextXLogAvailable(nextpagelsn, &finished); - if (finished) - { - if (emode_arg == PANIC) - ereport(PANIC, - (errmsg("streaming recovery ended"))); - else - return NULL; - } + (pagelsn.xlogid)++; + pagelsn.xrecoff = 0; } + /* Wait for the next page to become available */ + if (!XLogPageRead(&pagelsn, emode, false, false)) + return NULL; - readOff += XLOG_BLCKSZ; - if (readOff >= XLogSegSize) - { - close(readFile); - readFile = -1; - NextLogSeg(readId, readSeg); - readFile = XLogFileRead(readId, readSeg, emode); - if (readFile < 0) - goto next_record_is_invalid; - readOff = 0; - } - if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) - { - ereport(emode, - (errcode_for_file_access(), - errmsg("could not read from log file %u, 
segment %u, offset %u: %m", - readId, readSeg, readOff))); - goto next_record_is_invalid; - } - if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) - goto next_record_is_invalid; + /* Check that the continuation record looks valid */ if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD)) { ereport(emode, @@ -3923,31 +3787,11 @@ got_record:; if (!RecordIsValid(record, *RecPtr, emode)) goto next_record_is_invalid; pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); - if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize + - MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len)) - { - nextRecord = (XLogRecord *) ((char *) contrecord + - MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len)); - } EndRecPtr.xlogid = readId; EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff + pageHeaderSize + MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len); - /* - * Check whether the current page needs to be read again. If there is no - * unread record in the current page (nextRecord == NULL), obviously we - * don't need to reread it. If we're not in streaming recovery mode yet, - * partially-filled page doesn't need to be reread because it is the - * last valid page. - */ - if (nextRecord != NULL && InStreamingRecovery && - XLByteLE(receivedUpto, EndRecPtr)) - { - nextRecord = NULL; - needReread = true; - } - ReadRecPtr = *RecPtr; /* needn't worry about XLOG SWITCH, it can't cross page boundaries */ return record; @@ -3956,26 +3800,9 @@ got_record:; /* Record does not cross a page boundary */ if (!RecordIsValid(record, *RecPtr, emode)) goto next_record_is_invalid; - if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ + - MAXALIGN(total_len)) - nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len)); EndRecPtr.xlogid = RecPtr->xlogid; EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len); - /* - * Check whether the current page needs to be read again. 
If there is no - * unread record in the current page (nextRecord == NULL), obviously we - * don't need to reread it. If we're not in streaming recovery mode yet, - * partially-filled page doesn't need to be reread because it is the last - * valid page. - */ - if (nextRecord != NULL && InStreamingRecovery && - XLByteLE(receivedUpto, EndRecPtr)) - { - nextRecord = NULL; - needReread = true; - } - ReadRecPtr = *RecPtr; memcpy(buffer, record, total_len); @@ -3987,8 +3814,6 @@ got_record:; /* Pretend it extends to end of segment */ EndRecPtr.xrecoff += XLogSegSize - 1; EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize; - nextRecord = NULL; /* definitely not on same page */ - needReread = false; /* * Pretend that readBuf contains the last page of the segment. This is @@ -4005,7 +3830,6 @@ next_record_is_invalid:; close(readFile); readFile = -1; } - nextRecord = NULL; return NULL; } @@ -5730,7 +5554,7 @@ StartupXLOG(void) (errmsg("checkpoint record is at %X/%X", checkPointLoc.xlogid, checkPointLoc.xrecoff))); } - else if (InStreamingRecovery) + else if (StandbyMode) { /* * The last valid checkpoint record required for a streaming @@ -5938,12 +5762,12 @@ StartupXLOG(void) if (XLByteLT(checkPoint.redo, RecPtr)) { /* back up to find the record */ - record = FetchRecord(&(checkPoint.redo), PANIC, false); + record = ReadRecord(&(checkPoint.redo), PANIC, false); } else { /* just have to read next record after CheckPoint */ - record = FetchRecord(NULL, LOG, false); + record = ReadRecord(NULL, LOG, false); } if (record != NULL) @@ -6096,7 +5920,7 @@ StartupXLOG(void) LastRec = ReadRecPtr; - record = FetchRecord(NULL, LOG, false); + record = ReadRecord(NULL, LOG, false); } while (record != NULL && recoveryContinue); /* @@ -6130,22 +5954,17 @@ StartupXLOG(void) /* * We are now done reading the xlog from stream. Turn off streaming - * recovery, and restart fetching the files (which would be required - * at end of recovery, e.g., timeline history file) from archive. 
+ * recovery to force fetching the files (which would be required + * at end of recovery, e.g., timeline history file) from archive or + * pg_xlog. */ - if (InStreamingRecovery) - { - /* We are no longer in streaming recovery state */ - InStreamingRecovery = false; - ereport(LOG, - (errmsg("streaming recovery complete"))); - } + StandbyMode = false; /* * Re-fetch the last valid or last applied record, so we can identify the * exact endpoint of what we consider the valid portion of WAL. */ - record = ReadRecord(&LastRec, PANIC); + record = ReadRecord(&LastRec, PANIC, false); EndOfLog = EndRecPtr; XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg); @@ -6515,7 +6334,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt) return NULL; } - record = FetchRecord(&RecPtr, LOG, true); + record = ReadRecord(&RecPtr, LOG, true); if (record == NULL) { @@ -7461,10 +7280,6 @@ CreateRestartPoint(int flags) } LWLockRelease(ControlFileLock); - /* Are we doing recovery from XLOG stream? */ - if (!InStreamingRecovery) - InStreamingRecovery = WalRcvInProgress(); - /* * Delete old log files (those no longer needed even for previous * checkpoint/restartpoint) to prevent the disk holding the xlog from @@ -7472,7 +7287,7 @@ CreateRestartPoint(int flags) * streaming recovery we have to or the disk will eventually fill up from * old log files streamed from master. */ - if (InStreamingRecovery && (_logId || _logSeg)) + if (WalRcvInProgress() && (_logId || _logSeg)) { XLogRecPtr endptr; @@ -8791,6 +8606,13 @@ HandleStartupProcInterrupts(void) */ if (shutdown_requested) proc_exit(1); + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (IsUnderPostmaster && !PostmasterIsAlive(true)) + exit(1); } /* Main entry point for startup process */ @@ -8843,3 +8665,281 @@ StartupProcessMain(void) */ proc_exit(0); } + +/* + * Read the XLOG page containing RecPtr into readBuf (if not read already). 
+ * Returns true if successful, false otherwise or fails if emode is PANIC. + * + * This is responsible for restoring files from archive as needed, as well + * as for waiting for the requested WAL record to arrive in standby mode. + */ +static bool +XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, + bool randAccess) +{ + static XLogRecPtr receivedUpto = {0, 0}; + bool switched_segment = false; + uint32 targetPageOff; + uint32 targetRecOff; + uint32 targetId; + uint32 targetSeg; + + XLByteToSeg(*RecPtr, targetId, targetSeg); + targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ; + targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ; + + /* Fast exit if we have read the record in the current buffer already */ + if (targetId == readId && targetSeg == readSeg && + targetPageOff == readOff && targetRecOff < readLen) + return true; + + /* + * See if we need to switch to a new segment because the requested record + * is not in the currently open one. + */ + if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg)) + { + close(readFile); + readFile = -1; + } + + XLByteToSeg(*RecPtr, readId, readSeg); + + /* See if we need to retrieve more data */ + if (readFile < 0 || + (readStreamed && !XLByteLT(*RecPtr, receivedUpto))) + { + if (StandbyMode) + { + bool last_restore_failed = false; + + /* + * In standby mode, wait for the requested record to become + * available, either via restore_command succeeding to restore + * the segment, or via walreceiver having streamed the record. + */ + for (;;) + { + if (WalRcvInProgress()) + { + /* + * While walreceiver is active, wait for new WAL to + * arrive from primary. + */ + receivedUpto = GetWalRcvWriteRecPtr(); + if (XLByteLT(*RecPtr, receivedUpto)) + { + /* + * Great, streamed far enough. Open the file if it's + * not open already. 
+ */ + if (readFile < 0) + { + readFile = + XLogFileRead(readId, readSeg, PANIC, + recoveryTargetTLI, false, false); + switched_segment = true; + readStreamed = true; + } + break; + } + + if (CheckForStandbyTrigger()) + goto next_record_is_invalid; + + /* + * When streaming is active, we want to react quickly when + * the next WAL record arrives, so sleep only a bit. + */ + pg_usleep(100000L); /* 100ms */ + } + else + { + /* + * Until walreceiver manages to reconnect, poll the + * archive. + */ + if (readFile >= 0) + { + close(readFile); + readFile = -1; + } + /* Reset curFileTLI if random fetch. */ + if (randAccess) + curFileTLI = 0; + readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true); + switched_segment = true; + readStreamed = false; + if (readFile != -1) + { + elog(DEBUG1, "got WAL segment from archive"); + break; + } + + /* + * If we succeeded restoring some segments from archive + * since the last connection attempt (or we haven't + * tried streaming yet), retry immediately. But if we + * haven't, assume the problem is persistent, so be + * less aggressive. + */ + if (last_restore_failed) + { + /* + * Check to see if the trigger file exists. Note that + * we do this only after failure, so when you create + * the trigger file, we still finish replaying as much + * as we can before failover. + */ + if (CheckForStandbyTrigger()) + goto next_record_is_invalid; + pg_usleep(5000000L); /* 5 seconds */ + } + last_restore_failed = true; + + /* + * Nope, not found in archive. Try to stream it. + * + * If fetching_ckpt is TRUE, RecPtr points to the initial + * checkpoint location. In that case, we use RedoStartLSN + * as the streaming start position instead of RecPtr, so + * that when we later jump backwards to start redo at + * RedoStartLSN, we will have the logs streamed already. + */ + RequestXLogStreaming(fetching_ckpt ? RedoStartLSN : *RecPtr, + PrimaryConnInfo); + } + + /* + * This possibly-long loop needs to handle interrupts of startup + * process. 
+ */ + HandleStartupProcInterrupts(); + } + } + else + { + /* In archive or crash recovery. */ + if (readFile < 0) + { + /* Reset curFileTLI if random fetch. */ + if (randAccess) + curFileTLI = 0; + readFile = XLogFileReadAnyTLI(readId, readSeg, emode, + InArchiveRecovery); + switched_segment = true; + readStreamed = false; + if (readFile < 0) + return false; + } + } + } + + /* + * At this point, we have the right segment open and we know the + * requested record is in it. + */ + Assert(readFile != -1); + + /* + * If the current segment is being streamed from master, calculate + * how much of the current page we have received already. We know the + * requested record has been received, but this is for the benefit + * of future calls, to allow quick exit at the top of this function. + */ + if (readStreamed) + { + if (RecPtr->xlogid != receivedUpto.xlogid || + (RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ)) + { + readLen = XLOG_BLCKSZ; + } + else + readLen = receivedUpto.xrecoff % XLogSegSize - targetPageOff; + } + else + readLen = XLOG_BLCKSZ; + + if (switched_segment && targetPageOff != 0) + { + /* + * Whenever switching to a new WAL segment, we read the first page of + * the file and validate its header, even if that's not where the + * target record is. This is so that we can check the additional + * identification info that is present in the first page's "long" + * header. 
+ */ + readOff = 0; + if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) + { + ereport(emode, + (errcode_for_file_access(), + errmsg("could not read from log file %u, segment %u, offset %u: %m", + readId, readSeg, readOff))); + goto next_record_is_invalid; + } + if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) + goto next_record_is_invalid; + } + + /* Read the requested page */ + readOff = targetPageOff; + if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0) + { + ereport(emode, + (errcode_for_file_access(), + errmsg("could not seek in log file %u, segment %u to offset %u: %m", + readId, readSeg, readOff))); + goto next_record_is_invalid; + } + if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) + { + ereport(emode, + (errcode_for_file_access(), + errmsg("could not read from log file %u, segment %u, offset %u: %m", + readId, readSeg, readOff))); + goto next_record_is_invalid; + } + if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) + goto next_record_is_invalid; + + Assert(targetId == readId); + Assert(targetSeg == readSeg); + Assert(targetPageOff == readOff); + Assert(targetRecOff < readLen); + + return true; + +next_record_is_invalid: + if (readFile >= 0) + close(readFile); + readFile = -1; + readStreamed = false; + readLen = 0; + + return false; +} + +/* + * Check to see if the trigger file exists. If it does, request postmaster + * to shut down walreceiver, wait for it to exit, remove the trigger + * file, and return true. + */ +static bool +CheckForStandbyTrigger(void) +{ + struct stat stat_buf; + + if (TriggerFile == NULL) + return false; + + if (stat(TriggerFile, &stat_buf) == 0) + { + ereport(LOG, + (errmsg("trigger file found: %s", TriggerFile))); + ShutdownWalRcv(); + unlink(TriggerFile); + return true; + } + return false; +} |