diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 1420 |
1 files changed, 899 insertions, 521 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 358966d4fc4..40c11fb6bd0 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.149 2004/07/19 14:34:39 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.150 2004/07/21 22:31:20 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -24,12 +24,13 @@ #include "access/clog.h" #include "access/subtrans.h" -#include "access/transam.h" #include "access/xact.h" #include "access/xlog.h" +#include "access/xlog_internal.h" #include "access/xlogutils.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" +#include "miscadmin.h" #include "postmaster/bgwriter.h" #include "storage/bufpage.h" #include "storage/fd.h" @@ -41,7 +42,6 @@ #include "utils/builtins.h" #include "utils/guc.h" #include "utils/relcache.h" -#include "miscadmin.h" /* @@ -121,25 +121,57 @@ static int open_sync_bit = DEFAULT_SYNC_FLAGBIT; /* - * ThisStartUpID will be same in all backends --- it identifies current - * instance of the database system. + * ThisTimeLineID will be same in all backends --- it identifies current + * WAL timeline for the database system. */ -StartUpID ThisStartUpID = 0; +TimeLineID ThisTimeLineID = 0; /* Are we doing recovery from XLOG? */ bool InRecovery = false; /* Are we recovering using offline XLOG archives? */ static bool InArchiveRecovery = false; -/* Was the last file restored from archive, or local? */ +/* Was the last xlog file restored from archive, or local? */ static bool restoredFromArchive = false; -static char recoveryRestoreCommand[MAXPGPATH]; +/* options taken from recovery.conf */ +static char *recoveryRestoreCommand = NULL; static bool recoveryTarget = false; static bool recoveryTargetExact = false; static bool recoveryTargetInclusive = true; static TransactionId recoveryTargetXid; static time_t recoveryTargetTime; +/* if recoveryStopsHere returns true, it saves actual stop xid/time here */ +static TransactionId recoveryStopXid; +static time_t recoveryStopTime; +static bool recoveryStopAfter; + +/* + * During normal operation, the only timeline we care about is ThisTimeLineID. + * During recovery, however, things are more complicated. To simplify life + * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we + * scan through the WAL history (that is, it is the line that was active when + * the currently-scanned WAL record was generated). We also need these + * timeline values: + * + * recoveryTargetTLI: the desired timeline that we want to end in. + * + * expectedTLIs: an integer list of recoveryTargetTLI and the TLIs of + * its known parents, newest first (so recoveryTargetTLI is always the + * first list member). Only these TLIs are expected to be seen in the WAL + * segments we read, and indeed only these TLIs will be considered as + * candidate WAL files to open at all. + * + * curFileTLI: the TLI appearing in the name of the current input WAL file. + * (This is not necessarily the same as ThisTimeLineID, because we could + * be scanning data that was copied from an ancestor timeline when the current + * file was created.) During a sequential scan we do not allow this value + * to decrease. + */ +static TimeLineID recoveryTargetTLI; +static List *expectedTLIs; +static TimeLineID curFileTLI; + /* * MyLastRecPtr points to the start of the last XLOG record inserted by the * current transaction. If MyLastRecPtr.xrecoff == 0, then the current @@ -242,12 +274,19 @@ static XLogRecPtr RedoRecPtr; * *---------- */ + typedef struct XLogwrtRqst { XLogRecPtr Write; /* last byte + 1 to write out */ XLogRecPtr Flush; /* last byte + 1 to flush */ } XLogwrtRqst; +typedef struct XLogwrtResult +{ + XLogRecPtr Write; /* last byte + 1 written out */ + XLogRecPtr Flush; /* last byte + 1 flushed */ +} XLogwrtResult; + /* * Shared state data for XLogInsert. */ @@ -293,7 +332,7 @@ typedef struct XLogCtlData XLogRecPtr *xlblocks; /* 1st byte ptr-s + BLCKSZ */ uint32 XLogCacheByte; /* # bytes in xlog buffers */ uint32 XLogCacheBlck; /* highest allocated xlog buffer index */ - StartUpID ThisStartUpID; + TimeLineID ThisTimeLineID; slock_t info_lck; /* locks shared LogwrtRqst/LogwrtResult */ } XLogCtlData; @@ -323,99 +362,15 @@ static ControlFileData *ControlFile = NULL; XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \ ) - -/* Increment an xlogid/segment pair */ -#define NextLogSeg(logId, logSeg) \ - do { \ - if ((logSeg) >= XLogSegsPerFile-1) \ - { \ - (logId)++; \ - (logSeg) = 0; \ - } \ - else \ - (logSeg)++; \ - } while (0) - -/* Decrement an xlogid/segment pair (assume it's not 0,0) */ -#define PrevLogSeg(logId, logSeg) \ - do { \ - if (logSeg) \ - (logSeg)--; \ - else \ - { \ - (logId)--; \ - (logSeg) = XLogSegsPerFile-1; \ - } \ - } while (0) - -/* - * Compute ID and segment from an XLogRecPtr. - * - * For XLByteToSeg, do the computation at face value. For XLByteToPrevSeg, - * a boundary byte is taken to be in the previous segment. This is suitable - * for deciding which segment to write given a pointer to a record end, - * for example. (We can assume xrecoff is not zero, since no valid recptr - * can have that.) - */ -#define XLByteToSeg(xlrp, logId, logSeg) \ - ( logId = (xlrp).xlogid, \ - logSeg = (xlrp).xrecoff / XLogSegSize \ - ) -#define XLByteToPrevSeg(xlrp, logId, logSeg) \ - ( logId = (xlrp).xlogid, \ - logSeg = ((xlrp).xrecoff - 1) / XLogSegSize \ - ) - -/* - * Is an XLogRecPtr within a particular XLOG segment? - * - * For XLByteInSeg, do the computation at face value. For XLByteInPrevSeg, - * a boundary byte is taken to be in the previous segment. - */ -#define XLByteInSeg(xlrp, logId, logSeg) \ - ((xlrp).xlogid == (logId) && \ - (xlrp).xrecoff / XLogSegSize == (logSeg)) - -#define XLByteInPrevSeg(xlrp, logId, logSeg) \ - ((xlrp).xlogid == (logId) && \ - ((xlrp).xrecoff - 1) / XLogSegSize == (logSeg)) - - #define PrevBufIdx(idx) \ (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1)) #define NextBufIdx(idx) \ (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1)) -#define XRecOffIsValid(xrecoff) \ - ((xrecoff) % BLCKSZ >= SizeOfXLogPHD && \ - (BLCKSZ - (xrecoff) % BLCKSZ) >= SizeOfXLogRecord) - -/* - * These macros encapsulate knowledge about the exact layout of XLog file - * names as well as archive-status file names. - */ -#define MAXFNAMELEN 32 - -#define XLogFileName(fname, log, seg) \ - snprintf(fname, MAXFNAMELEN, "%08X%08X", log, seg) - -#define XLogFilePath(path, log, seg) \ - snprintf(path, MAXPGPATH, "%s/%08X%08X", XLogDir, log, seg) - -#define StatusFilePath(path, xlog, suffix) \ - snprintf(path, MAXPGPATH, "%s/archive_status/%s%s", XLogDir, xlog, suffix) - -/* - * _INTL_MAXLOGRECSZ: max space needed for a record including header and - * any backup-block data. - */ -#define _INTL_MAXLOGRECSZ (SizeOfXLogRecord + MAXLOGRECSZ + \ - XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ)) - /* File path names */ -static char XLogDir[MAXPGPATH]; +char XLogDir[MAXPGPATH]; static char ControlFilePath[MAXPGPATH]; /* @@ -453,36 +408,44 @@ static char *readBuf = NULL; static XLogRecPtr ReadRecPtr; static XLogRecPtr EndRecPtr; static XLogRecord *nextRecord = NULL; -static StartUpID lastReadSUI; +static TimeLineID lastPageTLI = 0; static bool InRedo = false; + static void XLogArchiveNotify(const char *xlog); static void XLogArchiveNotifySeg(uint32 log, uint32 seg); static bool XLogArchiveIsDone(const char *xlog); static void XLogArchiveCleanup(const char *xlog); static void readRecoveryCommandFile(void); -static void exitArchiveRecovery(uint32 endLogId, uint32 endLogSeg, - uint32 xrecoff); +static void exitArchiveRecovery(TimeLineID endTLI, + uint32 endLogId, uint32 endLogSeg); static bool recoveryStopsHere(XLogRecord *record, bool *includeThis); static bool AdvanceXLInsertBuffer(void); -static bool WasteXLInsertBuffer(void); static void XLogWrite(XLogwrtRqst WriteRqst); static int XLogFileInit(uint32 log, uint32 seg, bool *use_existent, bool use_lock); static bool InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath, bool find_free, int max_advance, bool use_lock); -static int XLogFileOpen(uint32 log, uint32 seg, bool econt); -static void RestoreArchivedXLog(char *path, uint32 log, uint32 seg); +static int XLogFileOpen(uint32 log, uint32 seg); +static int XLogFileRead(uint32 log, uint32 seg, int emode); +static bool RestoreArchivedFile(char *path, const char *xlogfname, + const char *recovername); static void PreallocXlogFiles(XLogRecPtr endptr); static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr); static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer); -static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI); +static bool ValidXLOGHeader(XLogPageHeader hdr, int emode); static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt, char *buffer); +static List *readTimeLineHistory(TimeLineID targetTLI); +static bool existsTimeLineHistory(TimeLineID probeTLI); +static TimeLineID findNewestTimeLine(TimeLineID startTLI); +static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI, + TimeLineID endTLI, + uint32 endLogId, uint32 endLogSeg); static void WriteControlFile(void); static void ReadControlFile(void); static char *str_time(time_t tnow); @@ -546,7 +509,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID) { RecPtr.xlogid = 0; - RecPtr.xrecoff = SizeOfXLogPHD; /* start of 1st checkpoint record */ + RecPtr.xrecoff = SizeOfXLogLongPHD; /* start of 1st chkpt record */ return (RecPtr); } @@ -755,16 +718,9 @@ begin:; } /* - * Determine exactly where we will place the new XLOG record. If there - * isn't enough space on the current XLOG page for a record header, - * advance to the next page (leaving the unused space as zeroes). - * If there isn't enough space in the current XLOG segment for the whole - * record, advance to the next segment (inserting wasted-space records). - * This avoids needing a continuation record at the start of a segment - * file, which would conflict with placing a FILE_HEADER record there. - * We assume that no XLOG record can be larger than a segment file... + * If there isn't enough space on the current XLOG page for a record + * header, advance to the next page (leaving the unused space as zeroes). */ - updrqst = false; freespace = INSERT_FREESPACE(Insert); if (freespace < SizeOfXLogRecord) @@ -773,27 +729,6 @@ begin:; freespace = INSERT_FREESPACE(Insert); } - if (freespace < (uint32) (SizeOfXLogRecord + write_len)) - { - /* Doesn't fit on this page, so check for overrunning the file */ - uint32 avail; - - /* First figure the space available in remaining pages of file */ - avail = XLogSegSize - BLCKSZ - - (Insert->currpage->xlp_pageaddr.xrecoff % XLogSegSize); - avail /= BLCKSZ; /* convert to pages, then usable bytes */ - avail *= (BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord); - avail += freespace; /* add in the current page too */ - if (avail < (uint32) (SizeOfXLogRecord + write_len)) - { - /* It overruns the file, so waste the rest of the file... */ - do { - updrqst = WasteXLInsertBuffer(); - } while ((Insert->currpage->xlp_pageaddr.xrecoff % XLogSegSize) != 0); - freespace = INSERT_FREESPACE(Insert); - } - } - curridx = Insert->curridx; record = (XLogRecord *) Insert->currpos; @@ -891,14 +826,12 @@ begin:; /* Use next buffer */ updrqst = AdvanceXLInsertBuffer(); curridx = Insert->curridx; - /* This assert checks we did not insert a file header record */ - Assert(INSERT_FREESPACE(Insert) == BLCKSZ - SizeOfXLogPHD); /* Insert cont-record header */ Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD; contrecord = (XLogContRecord *) Insert->currpos; contrecord->xl_rem_len = write_len; Insert->currpos += SizeOfXLogContRecord; - freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord; + freespace = INSERT_FREESPACE(Insert); } /* Ensure next record will be properly aligned */ @@ -949,9 +882,9 @@ begin:; * Create an archive notification file * * The name of the notification file is the message that will be picked up - * by the archiver, e.g. we write 00000001000000C6.ready - * and the archiver then knows to archive XLogDir/00000001000000C6, - * then when complete, rename it to 00000001000000C6.done + * by the archiver, e.g. we write 0000000100000001000000C6.ready + * and the archiver then knows to archive XLogDir/0000000100000001000000C6, + * then when complete, rename it to 0000000100000001000000C6.done */ static void XLogArchiveNotify(const char *xlog) @@ -990,7 +923,7 @@ XLogArchiveNotifySeg(uint32 log, uint32 seg) { char xlog[MAXFNAMELEN]; - XLogFileName(xlog, log, seg); + XLogFileName(xlog, ThisTimeLineID, log, seg); XLogArchiveNotify(xlog); } @@ -1035,16 +968,22 @@ XLogArchiveIsDone(const char *xlog) /* * XLogArchiveCleanup * - * Cleanup an archive notification file for a particular xlog segment + * Cleanup archive notification file(s) for a particular xlog segment */ static void XLogArchiveCleanup(const char *xlog) { char archiveStatusPath[MAXPGPATH]; + /* Remove the .done file */ StatusFilePath(archiveStatusPath, xlog, ".done"); unlink(archiveStatusPath); /* should we complain about failure? */ + + /* Remove the .ready file if present --- normally it shouldn't be */ + StatusFilePath(archiveStatusPath, xlog, ".ready"); + unlink(archiveStatusPath); + /* should we complain about failure? */ } /* @@ -1151,7 +1090,7 @@ AdvanceXLInsertBuffer(void) NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ); Insert->curridx = nextidx; Insert->currpage = NewPage; - Insert->currpos = ((char *) NewPage) + SizeOfXLogPHD; + Insert->currpos = ((char *) NewPage) + SizeOfXLogShortPHD; /* * Be sure to re-zero the buffer so that bytes beyond what we've @@ -1164,104 +1103,27 @@ AdvanceXLInsertBuffer(void) */ NewPage->xlp_magic = XLOG_PAGE_MAGIC; /* NewPage->xlp_info = 0; */ /* done by memset */ - NewPage->xlp_sui = ThisStartUpID; + NewPage->xlp_tli = ThisTimeLineID; NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid; NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ; /* - * If first page of an XLOG segment file, add a FILE_HEADER record. + * If first page of an XLOG segment file, make it a long header. */ if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0) { - XLogRecPtr RecPtr; - XLogRecord *record; - XLogFileHeaderData *fhdr; - crc64 crc; - - record = (XLogRecord *) Insert->currpos; - record->xl_prev = Insert->PrevRecord; - record->xl_xact_prev.xlogid = 0; - record->xl_xact_prev.xrecoff = 0; - record->xl_xid = InvalidTransactionId; - record->xl_len = SizeOfXLogFHD; - record->xl_info = XLOG_FILE_HEADER; - record->xl_rmid = RM_XLOG_ID; - fhdr = (XLogFileHeaderData *) XLogRecGetData(record); - fhdr->xlfhd_sysid = ControlFile->system_identifier; - fhdr->xlfhd_xlogid = NewPage->xlp_pageaddr.xlogid; - fhdr->xlfhd_segno = NewPage->xlp_pageaddr.xrecoff / XLogSegSize; - fhdr->xlfhd_seg_size = XLogSegSize; - - INIT_CRC64(crc); - COMP_CRC64(crc, fhdr, SizeOfXLogFHD); - COMP_CRC64(crc, (char *) record + sizeof(crc64), - SizeOfXLogRecord - sizeof(crc64)); - FIN_CRC64(crc); - record->xl_crc = crc; - - /* Compute record's XLOG location */ - INSERT_RECPTR(RecPtr, Insert, nextidx); - - /* Record begin of record in appropriate places */ - Insert->PrevRecord = RecPtr; + XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage; - Insert->currpos += SizeOfXLogRecord + SizeOfXLogFHD; + NewLongPage->xlp_sysid = ControlFile->system_identifier; + NewLongPage->xlp_seg_size = XLogSegSize; + NewPage->xlp_info |= XLP_LONG_HEADER; + Insert->currpos = ((char *) NewPage) + SizeOfXLogLongPHD; } return update_needed; } /* - * Fill the remainder of the current XLOG page with an XLOG_WASTED_SPACE - * record, and advance to the next page. This has the same calling and - * result conditions as AdvanceXLInsertBuffer, except that - * AdvanceXLInsertBuffer expects the current page to be already filled. - */ -static bool -WasteXLInsertBuffer(void) -{ - XLogCtlInsert *Insert = &XLogCtl->Insert; - XLogRecord *record; - XLogRecPtr RecPtr; - uint32 freespace; - uint16 curridx; - crc64 rdata_crc; - - freespace = INSERT_FREESPACE(Insert); - Assert(freespace >= SizeOfXLogRecord); - freespace -= SizeOfXLogRecord; - - curridx = Insert->curridx; - record = (XLogRecord *) Insert->currpos; - - record->xl_prev = Insert->PrevRecord; - record->xl_xact_prev.xlogid = 0; - record->xl_xact_prev.xrecoff = 0; - - record->xl_xid = InvalidTransactionId; - record->xl_len = freespace; - record->xl_info = XLOG_WASTED_SPACE; - record->xl_rmid = RM_XLOG_ID; - - INIT_CRC64(rdata_crc); - COMP_CRC64(rdata_crc, XLogRecGetData(record), freespace); - COMP_CRC64(rdata_crc, (char *) record + sizeof(crc64), - SizeOfXLogRecord - sizeof(crc64)); - FIN_CRC64(rdata_crc); - record->xl_crc = rdata_crc; - - /* Compute record's XLOG location */ - INSERT_RECPTR(RecPtr, Insert, curridx); - - /* Record begin of record in appropriate places */ - Insert->PrevRecord = RecPtr; - - /* We needn't bother to advance Insert->currpos */ - - return AdvanceXLInsertBuffer(); -} - -/* * Write and/or fsync the log at least as far as WriteRqst indicates. * * Must be called with WALWriteLock held. @@ -1355,7 +1217,7 @@ XLogWrite(XLogwrtRqst WriteRqst) if (openLogFile < 0) { XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); - openLogFile = XLogFileOpen(openLogId, openLogSeg, false); + openLogFile = XLogFileOpen(openLogId, openLogSeg); openLogOff = 0; } @@ -1439,7 +1301,7 @@ XLogWrite(XLogwrtRqst WriteRqst) if (openLogFile < 0) { XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); - openLogFile = XLogFileOpen(openLogId, openLogSeg, false); + openLogFile = XLogFileOpen(openLogId, openLogSeg); openLogOff = 0; } issue_xlog_fsync(); @@ -1617,7 +1479,7 @@ XLogFileInit(uint32 log, uint32 seg, int fd; int nbytes; - XLogFilePath(path, log, seg); + XLogFilePath(path, ThisTimeLineID, log, seg); /* * Try to use existent file (checkpoint maker may have created it @@ -1731,6 +1593,109 @@ XLogFileInit(uint32 log, uint32 seg, } /* + * Create a new XLOG file segment by copying a pre-existing one. + * + * log, seg: identify segment to be created. + * + * srcTLI, srclog, srcseg: identify segment to be copied (could be from + * a different timeline) + * + * Currently this is only used during recovery, and so there are no locking + * considerations. But we should be just as tense as XLogFileInit to avoid + * emplacing a bogus file. + */ +static void +XLogFileCopy(uint32 log, uint32 seg, + TimeLineID srcTLI, uint32 srclog, uint32 srcseg) +{ + char path[MAXPGPATH]; + char tmppath[MAXPGPATH]; + char buffer[BLCKSZ]; + int srcfd; + int fd; + int nbytes; + + /* + * Open the source file + */ + XLogFilePath(path, srcTLI, srclog, srcseg); + srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); + if (srcfd < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + + /* + * Copy into a temp file name. + */ + snprintf(tmppath, MAXPGPATH, "%s/xlogtemp.%d", + XLogDir, (int) getpid()); + + unlink(tmppath); + + /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */ + fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, + S_IRUSR | S_IWUSR); + if (fd < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", tmppath))); + + /* + * Do the data copying. + */ + for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer)) + { + errno = 0; + if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer)) + { + if (errno != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", path))); + else + ereport(PANIC, + (errmsg("insufficient data in file \"%s\"", path))); + } + errno = 0; + if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer)) + { + int save_errno = errno; + + /* + * If we fail to make the file, delete it to release disk + * space + */ + unlink(tmppath); + /* if write didn't set errno, assume problem is no disk space */ + errno = save_errno ? save_errno : ENOSPC; + + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", tmppath))); + } + } + + if (pg_fsync(fd) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", tmppath))); + + if (close(fd)) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", tmppath))); + + close(srcfd); + + /* + * Now move the segment into place with its final name. + */ + if (!InstallXLogFileSegment(log, seg, tmppath, false, 0, false)) + elog(PANIC, "InstallXLogFileSegment should not have failed"); +} + +/* * Install a new XLOG segment file as a current or future log segment. * * This is used both to install a newly-created segment (which has a temp @@ -1763,7 +1728,7 @@ InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath, char path[MAXPGPATH]; struct stat stat_buf; - XLogFilePath(path, log, seg); + XLogFilePath(path, ThisTimeLineID, log, seg); /* * We want to be sure that only one process does this at a time. @@ -1789,7 +1754,7 @@ InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath, return false; } NextLogSeg(log, seg); - XLogFilePath(path, log, seg); + XLogFilePath(path, ThisTimeLineID, log, seg); } } @@ -1820,73 +1785,102 @@ InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath, } /* - * Open a pre-existing logfile segment. + * Open a pre-existing logfile segment for writing. */ static int -XLogFileOpen(uint32 log, uint32 seg, bool econt) +XLogFileOpen(uint32 log, uint32 seg) { char path[MAXPGPATH]; int fd; - if (InArchiveRecovery) - RestoreArchivedXLog(path, log, seg); - else - XLogFilePath(path, log, seg); + XLogFilePath(path, ThisTimeLineID, log, seg); fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT, S_IRUSR | S_IWUSR); if (fd < 0) - { - if (econt && errno == ENOENT) - { - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" (log file %u, segment %u): %m", - path, log, seg))); - return (fd); - } ereport(PANIC, (errcode_for_file_access(), errmsg("could not open file \"%s\" (log file %u, segment %u): %m", path, log, seg))); - } + + return fd; +} + +/* + * Open a logfile segment for reading (during recovery). + */ +static int +XLogFileRead(uint32 log, uint32 seg, int emode) +{ + char path[MAXPGPATH]; + char xlogfname[MAXFNAMELEN]; + ListCell *cell; + int fd; /* - * XXX this is a pretty horrid hack. Remove after implementing timelines. - * - * if we switched back to local xlogs after having been - * restoring from archive, we need to make sure that the - * local files don't get removed by end-of-recovery checkpoint - * in case we need to re-run the recovery - * - * we want to copy these away as soon as possible, so set - * the archive status flag to .ready for them - * in case admin isn't cautious enough to have done this anyway + * Loop looking for a suitable timeline ID: we might need to + * read any of the timelines listed in expectedTLIs. * - * XXX this is completely broken, because there is no guarantee this file - * is actually complete and ready to be archived. Also, what if there's - * a .done file for them? + * We expect curFileTLI on entry to be the TLI of the preceding file + * in sequence, or 0 if there was no predecessor. We do not allow + * curFileTLI to go backwards; this prevents us from picking up the + * wrong file when a parent timeline extends to higher segment numbers + * than the child we want to read. */ - if (InArchiveRecovery && !restoredFromArchive) - XLogArchiveNotifySeg(log, seg); + foreach(cell, expectedTLIs) + { + TimeLineID tli = (TimeLineID) lfirst_int(cell); - return (fd); + if (tli < curFileTLI) + break; /* don't bother looking at too-old TLIs */ + + if (InArchiveRecovery) + { + XLogFileName(xlogfname, tli, log, seg); + restoredFromArchive = RestoreArchivedFile(path, xlogfname, + "RECOVERYXLOG"); + } + else + XLogFilePath(path, tli, log, seg); + + fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); + if (fd >= 0) + { + /* Success! */ + curFileTLI = tli; + return fd; + } + if (errno != ENOENT) /* unexpected failure? */ + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" (log file %u, segment %u): %m", + path, log, seg))); + } + + /* Couldn't find it. For simplicity, complain about front timeline */ + XLogFilePath(path, recoveryTargetTLI, log, seg); + errno = ENOENT; + ereport(emode, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" (log file %u, segment %u): %m", + path, log, seg))); + return -1; } /* - * Get next logfile segment when using off-line archive for recovery - * - * Attempt to retrieve the specified segment from off-line archival storage. + * Attempt to retrieve the specified file from off-line archival storage. * If successful, fill "path" with its complete path (note that this will be - * a temp file name that doesn't follow the normal naming convention). + * a temp file name that doesn't follow the normal naming convention), and + * return TRUE. * - * If not successful, fill "path" with the name of the normal on-line segment - * file (which may or may not actually exist, but we'll try to use it). + * If not successful, fill "path" with the name of the normal on-line file + * (which may or may not actually exist, but we'll try to use it), and return + * FALSE. */ -static void -RestoreArchivedXLog(char *path, uint32 log, uint32 seg) +static bool +RestoreArchivedFile(char *path, const char *xlogfname, + const char *recovername) { - char xlogfname[MAXFNAMELEN]; char xlogpath[MAXPGPATH]; char xlogRestoreCmd[MAXPGPATH]; char *dp; @@ -1919,11 +1913,10 @@ RestoreArchivedXLog(char *path, uint32 log, uint32 seg) * The copy-from-archive filename is always the same, ensuring that we * don't run out of disk space on long recoveries. */ - XLogFileName(xlogfname, log, seg); - snprintf(xlogpath, MAXPGPATH, "%s/RECOVERYXLOG", XLogDir); + snprintf(xlogpath, MAXPGPATH, "%s/%s", XLogDir, recovername); /* - * Make sure there is no existing RECOVERYXLOG file. + * Make sure there is no existing file named recovername. */ if (stat(xlogpath, &stat_buf) != 0) { @@ -2004,8 +1997,7 @@ RestoreArchivedXLog(char *path, uint32 log, uint32 seg) (errmsg("restored log file \"%s\" from archive", xlogfname))); strcpy(path, xlogpath); - restoredFromArchive = true; - return; + return true; } if (errno != ENOENT) ereport(FATAL, @@ -2033,8 +2025,8 @@ RestoreArchivedXLog(char *path, uint32 log, uint32 seg) * In many recovery scenarios we expect this to fail also, but * if so that just means we've reached the end of WAL. */ - XLogFilePath(path, log, seg); - restoredFromArchive = false; + snprintf(path, MAXPGPATH, "%s/%s", XLogDir, xlogfname); + return false; } /* @@ -2085,18 +2077,25 @@ MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr) errmsg("could not open transaction log directory \"%s\": %m", XLogDir))); - XLogFileName(lastoff, log, seg); + XLogFileName(lastoff, ThisTimeLineID, log, seg); errno = 0; while ((xlde = readdir(xldir)) != NULL) { /* - * use the alphanumeric sorting property of the filenames to decide - * which ones are earlier than the lastoff segment + * We ignore the timeline part of the XLOG segment identifiers in + * deciding whether a segment is still needed. This ensures that + * we won't prematurely remove a segment from a parent timeline. + * We could probably be a little more proactive about removing + * segments of non-parent timelines, but that would be a whole lot + * more complicated. + * + * We use the alphanumeric sorting property of the filenames to decide + * which ones are earlier than the lastoff segment. */ - if (strlen(xlde->d_name) == 16 && - strspn(xlde->d_name, "0123456789ABCDEF") == 16 && - strcmp(xlde->d_name, lastoff) <= 0) + if (strlen(xlde->d_name) == 24 && + strspn(xlde->d_name, "0123456789ABCDEF") == 24 && + strcmp(xlde->d_name + 8, lastoff + 8) <= 0) { bool recycle; @@ -2185,7 +2184,7 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn) page = (Page) BufferGetPage(buffer); memcpy((char *) page, blk, BLCKSZ); PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); + PageSetTLI(page, ThisTimeLineID); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(buffer); } @@ -2272,11 +2271,13 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer) { XLogRecord *record; XLogRecPtr tmpRecPtr = EndRecPtr; + bool randAccess = false; uint32 len, total_len; uint32 targetPageOff; + uint32 targetRecOff; + uint32 pageHeaderSize; unsigned i; - bool nextmode = false; if (readBuf == NULL) { @@ -2295,7 +2296,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer) if (RecPtr == NULL) { RecPtr = &tmpRecPtr; - nextmode = true; /* fast case if next record is on same page */ if (nextRecord != NULL) { @@ -2310,12 +2310,24 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer) (tmpRecPtr.xlogid)++; tmpRecPtr.xrecoff = 0; } - tmpRecPtr.xrecoff += SizeOfXLogPHD; + /* We will account for page header size below */ + } + else + { + if (!XRecOffIsValid(RecPtr->xrecoff)) + ereport(PANIC, + (errmsg("invalid record offset at %X/%X", + RecPtr->xlogid, RecPtr->xrecoff))); + /* + * Since we are going to a random position in WAL, forget any + * prior state about what timeline we were in, and allow it + * to be any timeline in expectedTLIs. We also set a flag to + * allow curFileTLI to go backwards (but we can't reset that + * variable right here, since we might not change files at all). + */ + lastPageTLI = 0; /* see comment in ValidXLOGHeader */ + randAccess = true; /* allow curFileTLI to go backwards too */ } - else if (!XRecOffIsValid(RecPtr->xrecoff)) - ereport(PANIC, - (errmsg("invalid record offset at %X/%X", - RecPtr->xlogid, RecPtr->xrecoff))); if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg)) { @@ -2325,7 +2337,11 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer) XLByteToSeg(*RecPtr, readId, readSeg); if (readFile < 0) { - readFile = XLogFileOpen(readId, readSeg, (emode == LOG)); + /* Now it's okay to reset curFileTLI if random fetch */ + if (randAccess) + curFileTLI = 0; + + readFile = XLogFileRead(readId, readSeg, emode); if (readFile < 0) goto next_record_is_invalid; readOff = (uint32) (-1); /* force read to occur below */ @@ -2351,11 +2367,30 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer) readId, readSeg, readOff))); goto next_record_is_invalid; } - if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, nextmode)) + if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) goto next_record_is_invalid; } + pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); + targetRecOff = RecPtr->xrecoff % BLCKSZ; + if (targetRecOff == 0) + { + /* + * Can only get here in the continuing-from-prev-page case, because + * XRecOffIsValid eliminated the zero-page-offset case otherwise. + * Need to skip over the new page's header. + */ + tmpRecPtr.xrecoff += pageHeaderSize; + targetRecOff = pageHeaderSize; + } + else if (targetRecOff < pageHeaderSize) + { + ereport(emode, + (errmsg("invalid record offset at %X/%X", + RecPtr->xlogid, RecPtr->xrecoff))); + goto next_record_is_invalid; + } if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && - RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD) + targetRecOff == pageHeaderSize) { ereport(emode, (errmsg("contrecord is requested by %X/%X", @@ -2428,7 +2463,7 @@ got_record:; close(readFile); readFile = -1; NextLogSeg(readId, readSeg); - readFile = XLogFileOpen(readId, readSeg, (emode == LOG)); + readFile = XLogFileRead(readId, readSeg, emode); if (readFile < 0) goto next_record_is_invalid; readOff = 0; @@ -2441,7 +2476,7 @@ got_record:; readId, readSeg, readOff))); goto next_record_is_invalid; } - if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, true)) + if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) goto next_record_is_invalid; if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD)) { @@ -2450,7 +2485,8 @@ got_record:; readId, readSeg, readOff))); goto next_record_is_invalid; } - contrecord = (XLogContRecord *) ((char *) readBuf + SizeOfXLogPHD); + pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); + contrecord = (XLogContRecord *) ((char *) readBuf + pageHeaderSize); if (contrecord->xl_rem_len == 0 || total_len != (contrecord->xl_rem_len + gotlen)) { @@ -2460,7 +2496,7 @@ got_record:; readId, readSeg, readOff))); goto next_record_is_invalid; } - len = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord; + len = BLCKSZ - pageHeaderSize - SizeOfXLogContRecord; if (contrecord->xl_rem_len > len) { memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len); @@ -2474,7 +2510,8 @@ got_record:; } if (!RecordIsValid(record, *RecPtr, emode)) goto next_record_is_invalid; - if (BLCKSZ - SizeOfXLogRecord >= SizeOfXLogPHD + + pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); + if (BLCKSZ - SizeOfXLogRecord >= pageHeaderSize + SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len)) { nextRecord = (XLogRecord *) ((char *) contrecord + @@ -2482,7 +2519,7 @@ got_record:; } EndRecPtr.xlogid = readId; EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff + - SizeOfXLogPHD + SizeOfXLogContRecord + + pageHeaderSize + SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len); ReadRecPtr = *RecPtr; return record; @@ -2514,7 +2551,7 @@ next_record_is_invalid:; * ReadRecord. It's not intended for use from anywhere else. */ static bool -ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI) +ValidXLOGHeader(XLogPageHeader hdr, int emode) { XLogRecPtr recaddr; @@ -2532,6 +2569,37 @@ ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI) hdr->xlp_info, readId, readSeg, readOff))); return false; } + if (hdr->xlp_info & XLP_LONG_HEADER) + { + XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr; + + if (longhdr->xlp_sysid != ControlFile->system_identifier) + { + char fhdrident_str[32]; + char sysident_str[32]; + + /* + * Format sysids separately to keep platform-dependent format + * code out of the translatable message string. + */ + snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT, + longhdr->xlp_sysid); + snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT, + ControlFile->system_identifier); + ereport(emode, + (errmsg("WAL file is from different system"), + errdetail("WAL file SYSID is %s, pg_control SYSID is %s", + fhdrident_str, sysident_str))); + return false; + } + if (longhdr->xlp_seg_size != XLogSegSize) + { + ereport(emode, + (errmsg("WAL file is from different system"), + errdetail("Incorrect XLOG_SEG_SIZE in page header."))); + return false; + } + } recaddr.xlogid = readId; recaddr.xrecoff = readSeg * XLogSegSize + readOff; if (!XLByteEQ(hdr->xlp_pageaddr, recaddr)) @@ -2544,30 +2612,369 @@ ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI) } /* - * We disbelieve a SUI less than the previous page's SUI, or more than - * a few counts greater. In theory as many as 512 shutdown checkpoint - * records could appear on a 32K-sized xlog page, so that's the most - * differential there could legitimately be. + * Check page TLI is one of the expected values. + */ + if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli)) + { + ereport(emode, + (errmsg("unexpected timeline ID %u in log file %u, segment %u, offset %u", + hdr->xlp_tli, + readId, readSeg, readOff))); + return false; + } + + /* + * Since child timelines are always assigned a TLI greater than their + * immediate parent's TLI, we should never see TLI go backwards across + * successive pages of a consistent WAL sequence. * - * Note this check can only be applied when we are reading the next page - * in sequence, so ReadRecord passes a flag indicating whether to - * check. + * Of course this check should only be applied when advancing sequentially + * across pages; therefore ReadRecord resets lastPageTLI to zero when + * going to a random page. */ - if (checkSUI) + if (hdr->xlp_tli < lastPageTLI) + { + ereport(emode, + (errmsg("out-of-sequence timeline ID %u (after %u) in log file %u, segment %u, offset %u", + hdr->xlp_tli, lastPageTLI, + readId, readSeg, readOff))); + return false; + } + lastPageTLI = hdr->xlp_tli; + return true; +} + +/* + * Try to read a timeline's history file. + * + * If successful, return the list of component TLIs (the given TLI followed by + * its ancestor TLIs). If we can't find the history file, assume that the + * timeline has no parents, and return a list of just the specified timeline + * ID. + */ +static List * +readTimeLineHistory(TimeLineID targetTLI) +{ + List *result; + char path[MAXPGPATH]; + char histfname[MAXFNAMELEN]; + char fline[MAXPGPATH]; + FILE *fd; + + if (InArchiveRecovery) + { + TLHistoryFileName(histfname, targetTLI); + RestoreArchivedFile(path, histfname, "RECOVERYHISTORY"); + } + else + TLHistoryFilePath(path, targetTLI); + + fd = AllocateFile(path, "r"); + if (fd == NULL) { - if (hdr->xlp_sui < lastReadSUI || - hdr->xlp_sui > lastReadSUI + 512) + if (errno != ENOENT) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not open \"%s\": %m", path))); + /* Not there, so assume no parents */ + return list_make1_int((int) targetTLI); + } + + result = NIL; + + /* + * Parse the file... + */ + while (fgets(fline, MAXPGPATH, fd) != NULL) + { + /* skip leading whitespace and check for # comment */ + char *ptr; + char *endptr; + TimeLineID tli; + + for (ptr = fline; *ptr; ptr++) { - ereport(emode, - /* translator: SUI = startup id */ - (errmsg("out-of-sequence SUI %u (after %u) in log file %u, segment %u, offset %u", - hdr->xlp_sui, lastReadSUI, - readId, readSeg, readOff))); - return false; + if (!isspace((unsigned char) *ptr)) + break; } + if (*ptr == '\0' || *ptr == '#') + continue; + + /* expect a numeric timeline ID as first field of line */ + tli = (TimeLineID) strtoul(ptr, &endptr, 0); + if (endptr == ptr) + ereport(FATAL, + (errmsg("syntax error in history file: %s", fline), + errhint("Expected a numeric timeline ID."))); + + if (result && + tli <= (TimeLineID) linitial_int(result)) + ereport(FATAL, + (errmsg("invalid data in history file: %s", fline), + errhint("Timeline IDs must be in increasing sequence."))); + + /* Build list with newest item first */ + result = lcons_int((int) tli, result); + + /* we ignore the remainder of each line */ } - lastReadSUI = hdr->xlp_sui; - return true; + + FreeFile(fd); + + if (result && + targetTLI <= (TimeLineID) linitial_int(result)) + ereport(FATAL, + (errmsg("invalid data in history file \"%s\"", path), + errhint("Timeline IDs must be less than child timeline's ID."))); + + result = lcons_int((int) targetTLI, result); + + ereport(DEBUG3, + (errmsg_internal("history of timeline %u is %s", + targetTLI, nodeToString(result)))); + + return result; +} + +/* + * Probe whether a timeline history file exists for the given timeline ID + */ +static bool +existsTimeLineHistory(TimeLineID probeTLI) +{ + char path[MAXPGPATH]; + char histfname[MAXFNAMELEN]; + FILE *fd; + + if (InArchiveRecovery) + { + TLHistoryFileName(histfname, probeTLI); + RestoreArchivedFile(path, histfname, "RECOVERYHISTORY"); + } + else + TLHistoryFilePath(path, probeTLI); + + fd = AllocateFile(path, "r"); + if (fd != NULL) + { + FreeFile(fd); + return true; + } + else + { + if (errno != ENOENT) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not open \"%s\": %m", path))); + return false; + } +} + +/* + * Find the newest existing timeline, assuming that startTLI exists. + * + * Note: while this is somewhat heuristic, it does positively guarantee + * that (result + 1) is not a known timeline, and therefore it should + * be safe to assign that ID to a new timeline. + */ +static TimeLineID +findNewestTimeLine(TimeLineID startTLI) +{ + TimeLineID newestTLI; + TimeLineID probeTLI; + + /* + * The algorithm is just to probe for the existence of timeline history + * files. XXX is it useful to allow gaps in the sequence? + */ + newestTLI = startTLI; + + for (probeTLI = startTLI + 1; ; probeTLI++) + { + if (existsTimeLineHistory(probeTLI)) + { + newestTLI = probeTLI; /* probeTLI exists */ + } + else + { + /* doesn't exist, assume we're done */ + break; + } + } + + return newestTLI; +} + +/* + * Create a new timeline history file. + * + * newTLI: ID of the new timeline + * parentTLI: ID of its immediate parent + * endTLI et al: ID of the last used WAL file, for annotation purposes + * + * Currently this is only used during recovery, and so there are no locking + * considerations. But we should be just as tense as XLogFileInit to avoid + * emplacing a bogus file. + */ +static void +writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI, + TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg) +{ + char path[MAXPGPATH]; + char tmppath[MAXPGPATH]; + char histfname[MAXFNAMELEN]; + char xlogfname[MAXFNAMELEN]; + char buffer[BLCKSZ]; + int srcfd; + int fd; + int nbytes; + + Assert(newTLI > parentTLI); /* else bad selection of newTLI */ + + /* + * Write into a temp file name. + */ + snprintf(tmppath, MAXPGPATH, "%s/xlogtemp.%d", + XLogDir, (int) getpid()); + + unlink(tmppath); + + /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */ + fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL, + S_IRUSR | S_IWUSR); + if (fd < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", tmppath))); + + /* + * If a history file exists for the parent, copy it verbatim + */ + if (InArchiveRecovery) + { + TLHistoryFileName(histfname, parentTLI); + RestoreArchivedFile(path, histfname, "RECOVERYHISTORY"); + } + else + TLHistoryFilePath(path, parentTLI); + + srcfd = BasicOpenFile(path, O_RDONLY, 0); + if (srcfd < 0) + { + if (errno != ENOENT) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not open \"%s\": %m", path))); + /* Not there, so assume parent has no parents */ + } + else + { + for (;;) + { + errno = 0; + nbytes = (int) read(srcfd, buffer, sizeof(buffer)); + if (nbytes < 0 || errno != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", path))); + if (nbytes == 0) + break; + errno = 0; + if ((int) write(fd, buffer, nbytes) != nbytes) + { + int save_errno = errno; + + /* + * If we fail to make the file, delete it to release disk + * space + */ + unlink(tmppath); + /* if write didn't set errno, assume problem is no disk space */ + errno = save_errno ? save_errno : ENOSPC; + + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", tmppath))); + } + } + close(srcfd); + } + + /* + * Append one line with the details of this timeline split. + * + * If we did have a parent file, insert an extra newline just in case + * the parent file failed to end with one. + */ + XLogFileName(xlogfname, endTLI, endLogId, endLogSeg); + + snprintf(buffer, sizeof(buffer), + "%s%u\t%s\t%s transaction %u at %s\n", + (srcfd < 0) ? "" : "\n", + parentTLI, + xlogfname, + recoveryStopAfter ? "after" : "before", + recoveryStopXid, + str_time(recoveryStopTime)); + + nbytes = strlen(buffer); + errno = 0; + if ((int) write(fd, buffer, nbytes) != nbytes) + { + int save_errno = errno; + + /* + * If we fail to make the file, delete it to release disk + * space + */ + unlink(tmppath); + /* if write didn't set errno, assume problem is no disk space */ + errno = save_errno ? save_errno : ENOSPC; + + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", tmppath))); + } + + if (pg_fsync(fd) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", tmppath))); + + if (close(fd)) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", tmppath))); + + + /* + * Now move the completed history file into place with its final name. + */ + TLHistoryFilePath(path, newTLI); + + /* + * Prefer link() to rename() here just to be really sure that we don't + * overwrite an existing logfile. However, there shouldn't be one, so + * rename() is an acceptable substitute except for the truly paranoid. + */ +#if HAVE_WORKING_LINK + if (link(tmppath, path) < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not link file \"%s\" to \"%s\": %m", + tmppath, path))); + unlink(tmppath); +#else + if (rename(tmppath, path) < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not rename file \"%s\" to \"%s\": %m", + tmppath, path))); +#endif + + /* The history file can be archived immediately. */ + TLHistoryFileName(histfname, newTLI); + XLogArchiveNotify(histfname); } /* @@ -2956,8 +3363,8 @@ BootStrapXLOG(void) CheckPoint checkPoint; char *buffer; XLogPageHeader page; + XLogLongPageHeader longpage; XLogRecord *record; - XLogFileHeaderData *fhdr; bool use_existent; uint64 sysidentifier; struct timeval tv; @@ -2979,6 +3386,9 @@ BootStrapXLOG(void) sysidentifier = ((uint64) tv.tv_sec) << 32; sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec); + /* First timeline ID is always 1 */ + ThisTimeLineID = 1; + /* Use malloc() to ensure buffer is MAXALIGNED */ buffer = (char *) malloc(BLCKSZ); page = (XLogPageHeader) buffer; @@ -2986,9 +3396,9 @@ BootStrapXLOG(void) /* Set up information for the initial checkpoint record */ checkPoint.redo.xlogid = 0; - checkPoint.redo.xrecoff = SizeOfXLogPHD + SizeOfXLogRecord + SizeOfXLogFHD; + checkPoint.redo.xrecoff = SizeOfXLogLongPHD; checkPoint.undo = checkPoint.redo; - checkPoint.ThisStartUpID = 0; + checkPoint.ThisTimeLineID = ThisTimeLineID; checkPoint.nextXid = FirstNormalTransactionId; checkPoint.nextOid = BootstrapObjectIdData; checkPoint.time = time(NULL); @@ -2999,38 +3409,18 @@ BootStrapXLOG(void) /* Set up the XLOG page header */ page->xlp_magic = XLOG_PAGE_MAGIC; - page->xlp_info = 0; - page->xlp_sui = checkPoint.ThisStartUpID; + page->xlp_info = XLP_LONG_HEADER; + page->xlp_tli = ThisTimeLineID; page->xlp_pageaddr.xlogid = 0; page->xlp_pageaddr.xrecoff = 0; - - /* Insert the file header record */ - record = (XLogRecord *) ((char *) page + SizeOfXLogPHD); - record->xl_prev.xlogid = 0; - record->xl_prev.xrecoff = 0; - record->xl_xact_prev.xlogid = 0; - record->xl_xact_prev.xrecoff = 0; - record->xl_xid = InvalidTransactionId; - record->xl_len = SizeOfXLogFHD; - record->xl_info = XLOG_FILE_HEADER; - record->xl_rmid = RM_XLOG_ID; - fhdr = (XLogFileHeaderData *) XLogRecGetData(record); - fhdr->xlfhd_sysid = sysidentifier; - fhdr->xlfhd_xlogid = 0; - fhdr->xlfhd_segno = 0; - fhdr->xlfhd_seg_size = XLogSegSize; - - INIT_CRC64(crc); - COMP_CRC64(crc, fhdr, SizeOfXLogFHD); - COMP_CRC64(crc, (char *) record + sizeof(crc64), - SizeOfXLogRecord - sizeof(crc64)); - FIN_CRC64(crc); - record->xl_crc = crc; + longpage = (XLogLongPageHeader) page; + longpage->xlp_sysid = sysidentifier; + longpage->xlp_seg_size = XLogSegSize; /* Insert the initial checkpoint record */ - record = (XLogRecord *) ((char *) page + SizeOfXLogPHD + SizeOfXLogRecord + SizeOfXLogFHD); + record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD); record->xl_prev.xlogid = 0; - record->xl_prev.xrecoff = SizeOfXLogPHD; + record->xl_prev.xrecoff = 0; record->xl_xact_prev.xlogid = 0; record->xl_xact_prev.xrecoff = 0; record->xl_xid = InvalidTransactionId; @@ -3050,7 +3440,7 @@ BootStrapXLOG(void) use_existent = false; openLogFile = XLogFileInit(0, 0, &use_existent, false); - /* Write the first page with the initial records */ + /* Write the first page with the initial record */ errno = 0; if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ) { @@ -3120,6 +3510,8 @@ readRecoveryCommandFile(void) char recoveryCommandFile[MAXPGPATH]; FILE *fd; char cmdline[MAXPGPATH]; + TimeLineID rtli = 0; + bool rtliGiven = false; bool syntaxError = false; snprintf(recoveryCommandFile, MAXPGPATH, "%s/recovery.conf", DataDir); @@ -3177,11 +3569,31 @@ readRecoveryCommandFile(void) } if (strcmp(tok1,"restore_command") == 0) { - StrNCpy(recoveryRestoreCommand, tok2, MAXPGPATH); + recoveryRestoreCommand = pstrdup(tok2); ereport(LOG, (errmsg("restore_command = \"%s\"", recoveryRestoreCommand))); } + else if (strcmp(tok1,"recovery_target_timeline") == 0) { + rtliGiven = true; + if (strcmp(tok2, "latest") == 0) + rtli = 0; + else + { + errno = 0; + rtli = (TimeLineID) strtoul(tok2, NULL, 0); + if (errno == EINVAL || errno == ERANGE) + ereport(FATAL, + (errmsg("recovery_target_timeline is not a valid number: \"%s\"", + tok2))); + } + if (rtli) + ereport(LOG, + (errmsg("recovery_target_timeline = %u", rtli))); + else + ereport(LOG, + (errmsg("recovery_target_timeline = latest"))); + } else if (strcmp(tok1,"recovery_target_xid") == 0) { errno = 0; recoveryTargetXid = (TransactionId) strtoul(tok2, NULL, 0); @@ -3246,22 +3658,44 @@ readRecoveryCommandFile(void) errhint("Lines should have the format parameter = 'value'."))); /* Check that required parameters were supplied */ - if (recoveryRestoreCommand[0] == '\0') + if (recoveryRestoreCommand == NULL) ereport(FATAL, (errmsg("recovery command file \"%s\" did not specify restore_command", recoveryCommandFile))); + /* Enable fetching from archive recovery area */ + InArchiveRecovery = true; + /* - * clearly indicate our state + * If user specified recovery_target_timeline, validate it or compute the + * "latest" value. We can't do this until after we've gotten the restore + * command and set InArchiveRecovery, because we need to fetch timeline + * history files from the archive. */ - InArchiveRecovery = true; + if (rtliGiven) + { + if (rtli) + { + /* Timeline 1 does not have a history file, all else should */ + if (rtli != 1 && !existsTimeLineHistory(rtli)) + ereport(FATAL, + (errmsg("recovery_target_timeline %u does not exist", + rtli))); + recoveryTargetTLI = rtli; + } + else + { + /* We start the "latest" search from pg_control's timeline */ + recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI); + } + } } /* * Exit archive-recovery state */ static void -exitArchiveRecovery(uint32 endLogId, uint32 endLogSeg, uint32 xrecoff) +exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg) { char recoveryPath[MAXPGPATH]; char xlogpath[MAXPGPATH]; @@ -3269,7 +3703,7 @@ exitArchiveRecovery(uint32 endLogId, uint32 endLogSeg, uint32 xrecoff) char recoveryCommandDone[MAXPGPATH]; /* - * Disable fetches from archive, so we can use XLogFileOpen below. + * We are no longer in archive recovery state. */ InArchiveRecovery = false; @@ -3294,10 +3728,12 @@ exitArchiveRecovery(uint32 endLogId, uint32 endLogSeg, uint32 xrecoff) * more descriptive of what our current database state is, because that * is what we replayed from. * - * XXX there ought to be a timeline increment somewhere around here. + * Note that if we are establishing a new timeline, ThisTimeLineID is + * already set to the new value, and so we will create a new file instead + * of overwriting any existing file. */ snprintf(recoveryPath, MAXPGPATH, "%s/RECOVERYXLOG", XLogDir); - XLogFilePath(xlogpath, endLogId, endLogSeg); + XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg); if (restoredFromArchive) { @@ -3319,61 +3755,26 @@ exitArchiveRecovery(uint32 endLogId, uint32 endLogSeg, uint32 xrecoff) * RECOVERYXLOG laying about, get rid of it. */ unlink(recoveryPath); /* ignore any error */ + /* + * If we are establishing a new timeline, we have to copy data + * from the last WAL segment of the old timeline to create a + * starting WAL segment for the new timeline. + */ + if (endTLI != ThisTimeLineID) + XLogFileCopy(endLogId, endLogSeg, + endTLI, endLogId, endLogSeg); } /* - * If we restored to a point-in-time, then the current WAL segment - * probably contains records beyond the stop point. These represent an - * extreme hazard: if we crash in the near future, the replay apparatus - * will know no reason why it shouldn't replay them. Therefore, - * explicitly zero out all the remaining pages of the segment. (We need - * not worry about the partial page in which the last record ends, since - * StartUpXlog will handle zeroing that. Also, there's nothing to do - * if we are right at a segment boundary.) - * - * XXX segment files beyond thhe current one also represent a hazard - * for the same reason. Need to invent timelines to fix this. + * Let's just make real sure there are not .ready or .done flags posted + * for the new segment. */ + XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg); + XLogArchiveCleanup(xlogpath); - /* align xrecoff to next page, then drop segment part */ - if (xrecoff % BLCKSZ != 0) - xrecoff += (BLCKSZ - xrecoff % BLCKSZ); - xrecoff %= XLogSegSize; - - if (recoveryTarget && xrecoff != 0) - { - int fd; - char zbuffer[BLCKSZ]; - - fd = XLogFileOpen(endLogId, endLogSeg, false); - MemSet(zbuffer, 0, sizeof(zbuffer)); - if (lseek(fd, (off_t) xrecoff, SEEK_SET) < 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not seek in file \"%s\": %m", - xlogpath))); - for (; xrecoff < XLogSegSize; xrecoff += sizeof(zbuffer)) - { - errno = 0; - if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer)) - { - /* if write didn't set errno, assume problem is no disk space */ - if (errno == 0) - errno = ENOSPC; - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not write to file \"%s\": %m", xlogpath))); - } - } - if (pg_fsync(fd) != 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not fsync file \"%s\": %m", xlogpath))); - if (close(fd)) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not close file \"%s\": %m", xlogpath))); - } + /* Get rid of any remaining recovered timeline-history file, too */ + snprintf(recoveryPath, MAXPGPATH, "%s/RECOVERYHISTORY", XLogDir); + unlink(recoveryPath); /* ignore any error */ /* * Rename the config file out of the way, so that we don't accidentally @@ -3398,6 +3799,8 @@ exitArchiveRecovery(uint32 endLogId, uint32 endLogSeg, uint32 xrecoff) * * Returns TRUE if we are stopping, FALSE otherwise. On TRUE return, * *includeThis is set TRUE if we should apply this record before stopping. + * Also, some information is saved in recoveryStopXid et al for use in + * annotating the new timeline's history file. */ static bool recoveryStopsHere(XLogRecord *record, bool *includeThis) @@ -3466,27 +3869,31 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis) if (stopsHere) { + recoveryStopXid = record->xl_xid; + recoveryStopTime = recordXtime; + recoveryStopAfter = *includeThis; + if (record_info == XLOG_XACT_COMMIT) { - if (*includeThis) + if (recoveryStopAfter) ereport(LOG, (errmsg("recovery stopping after commit of transaction %u, time %s", - record->xl_xid, str_time(recordXtime)))); + recoveryStopXid, str_time(recoveryStopTime)))); else ereport(LOG, (errmsg("recovery stopping before commit of transaction %u, time %s", - record->xl_xid, str_time(recordXtime)))); + recoveryStopXid, str_time(recoveryStopTime)))); } else { - if (*includeThis) + if (recoveryStopAfter) ereport(LOG, (errmsg("recovery stopping after abort of transaction %u, time %s", - record->xl_xid, str_time(recordXtime)))); + recoveryStopXid, str_time(recoveryStopTime)))); else ereport(LOG, (errmsg("recovery stopping before abort of transaction %u, time %s", - record->xl_xid, str_time(recordXtime)))); + recoveryStopXid, str_time(recoveryStopTime)))); } } @@ -3502,6 +3909,7 @@ StartupXLOG(void) XLogCtlInsert *Insert; CheckPoint checkPoint; bool wasShutdown; + bool needNewTimeLine = false; XLogRecPtr RecPtr, LastRec, checkPointLoc, @@ -3558,11 +3966,20 @@ StartupXLOG(void) #endif /* + * Initialize on the assumption we want to recover to the same timeline + * that's active according to pg_control. + */ + recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID; + + /* * Check for recovery control file, and if so set up state for * offline recovery */ readRecoveryCommandFile(); + /* Now we can determine the list of expected TLIs */ + expectedTLIs = readTimeLineHistory(recoveryTargetTLI); + /* * Get the last valid checkpoint record. If the latest one according * to pg_control is broken, try the next-to-last one. @@ -3611,17 +4028,11 @@ StartupXLOG(void) ShmemVariableCache->oidCount = 0; /* - * If it was a shutdown checkpoint, then any following WAL entries - * were created under the next StartUpID; if it was a regular - * checkpoint then any following WAL entries were created under the - * same StartUpID. We must replay WAL entries using the same StartUpID - * they were created under, so temporarily adopt that SUI (see also - * xlog_redo()). + * We must replay WAL entries using the same TimeLineID they were created + * under, so temporarily adopt the TLI indicated by the checkpoint (see + * also xlog_redo()). */ - if (wasShutdown) - ThisStartUpID = checkPoint.ThisStartUpID + 1; - else - ThisStartUpID = checkPoint.ThisStartUpID; + ThisTimeLineID = checkPoint.ThisTimeLineID; RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo; @@ -3663,12 +4074,18 @@ StartupXLOG(void) RmgrTable[rmid].rm_startup(); } - /* Is REDO required ? */ + /* + * Find the first record that logically follows the checkpoint --- + * it might physically precede it, though. + */ if (XLByteLT(checkPoint.redo, RecPtr)) + { + /* back up to find the record */ record = ReadRecord(&(checkPoint.redo), PANIC, buffer); + } else { - /* read past CheckPoint record */ + /* just have to read next record after CheckPoint */ record = ReadRecord(NULL, LOG, buffer); } @@ -3708,6 +4125,7 @@ StartupXLOG(void) */ if (recoveryStopsHere(record, &recoveryApply)) { + needNewTimeLine = true; /* see below */ recoveryContinue = false; if (!recoveryApply) break; @@ -3753,13 +4171,33 @@ StartupXLOG(void) XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg); /* + * Consider whether we need to assign a new timeline ID. + * + * If we stopped short of the end of WAL during recovery, then we + * are generating a new timeline and must assign it a unique new ID. + * Otherwise, we can just extend the timeline we were in when we + * ran out of WAL. + */ + if (needNewTimeLine) + { + ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1; + ereport(LOG, + (errmsg("selected new timeline ID: %u", ThisTimeLineID))); + writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI, + curFileTLI, endLogId, endLogSeg); + } + + /* Save the selected TimeLineID in shared memory, too */ + XLogCtl->ThisTimeLineID = ThisTimeLineID; + + /* * We are now done reading the old WAL. Turn off archive fetching * if it was active, and make a writable copy of the last WAL segment. * (Note that we also have a copy of the last block of the old WAL in * readBuf; we will use that below.) */ if (InArchiveRecovery) - exitArchiveRecovery(endLogId, endLogSeg, EndOfLog.xrecoff); + exitArchiveRecovery(curFileTLI, endLogId, endLogSeg); /* * Prepare to write WAL starting at EndOfLog position, and init xlog @@ -3768,7 +4206,7 @@ StartupXLOG(void) */ openLogId = endLogId; openLogSeg = endLogSeg; - openLogFile = XLogFileOpen(openLogId, openLogSeg, false); + openLogFile = XLogFileOpen(openLogId, openLogSeg); openLogOff = 0; ControlFile->logId = openLogId; ControlFile->logSeg = openLogSeg + 1; @@ -3812,9 +4250,8 @@ StartupXLOG(void) * XLogWrite()). * * Note: it might seem we should do AdvanceXLInsertBuffer() here, but - * we can't since we haven't yet determined the correct StartUpID - * to put into the new page's header. The first actual attempt to - * insert a log record will advance the insert state. + * this is sufficient. The first actual attempt to insert a log + * record will advance the insert state. */ XLogCtl->Write.curridx = NextBufIdx(0); } @@ -3861,21 +4298,14 @@ StartupXLOG(void) } /* - * At this point, ThisStartUpID is the largest SUI that we could - * find evidence for in the WAL entries. But check it against - * pg_control's latest checkpoint, to make sure that we can't - * accidentally re-use an already-used SUI. - */ - if (ThisStartUpID < ControlFile->checkPointCopy.ThisStartUpID) - ThisStartUpID = ControlFile->checkPointCopy.ThisStartUpID; - - /* * Perform a new checkpoint to update our recovery activity to * disk. * - * Note that we write a shutdown checkpoint. This is correct since - * the records following it will use SUI one more than what is - * shown in the checkpoint's ThisStartUpID. + * Note that we write a shutdown checkpoint rather than an on-line + * one. This is not particularly critical, but since we may be + * assigning a new TLI, using a shutdown checkpoint allows us to + * have the rule that TLI only changes in shutdown checkpoints, + * which allows some extra error checking in xlog_redo. * * In case we had to use the secondary checkpoint, make sure that it * will still be shown as the secondary checkpoint after this @@ -3890,18 +4320,6 @@ StartupXLOG(void) */ XLogCloseRelationCache(); } - else - { - /* - * If we are not doing recovery, then we saw a checkpoint with - * nothing after it, and we can safely use StartUpID equal to one - * more than the checkpoint's SUI. But just for paranoia's sake, - * check against pg_control too. - */ - ThisStartUpID = checkPoint.ThisStartUpID; - if (ThisStartUpID < ControlFile->checkPointCopy.ThisStartUpID) - ThisStartUpID = ControlFile->checkPointCopy.ThisStartUpID; - } /* * Preallocate additional log files, if wanted. @@ -3909,13 +4327,6 @@ StartupXLOG(void) PreallocXlogFiles(EndOfLog); /* - * Advance StartUpID to one more than the highest value used - * previously. - */ - ThisStartUpID++; - XLogCtl->ThisStartUpID = ThisStartUpID; - - /* * Okay, we're officially UP. */ InRecovery = false; @@ -4018,18 +4429,18 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, /* * This must be called during startup of a backend process, except that * it need not be called in a standalone backend (which does StartupXLOG - * instead). We need to initialize the local copies of ThisStartUpID and + * instead). We need to initialize the local copies of ThisTimeLineID and * RedoRecPtr. * * Note: before Postgres 7.5, we went to some effort to keep the postmaster - * process's copies of ThisStartUpID and RedoRecPtr valid too. This was + * process's copies of ThisTimeLineID and RedoRecPtr valid too. This was * unnecessary however, since the postmaster itself never touches XLOG anyway. */ void InitXLOGAccess(void) { - /* ThisStartUpID doesn't change so we need no lock to copy it */ - ThisStartUpID = XLogCtl->ThisStartUpID; + /* ThisTimeLineID doesn't change so we need no lock to copy it */ + ThisTimeLineID = XLogCtl->ThisTimeLineID; /* Use GetRedoRecPtr to copy the RedoRecPtr safely */ (void) GetRedoRecPtr(); } @@ -4110,7 +4521,7 @@ CreateCheckPoint(bool shutdown, bool force) } MemSet(&checkPoint, 0, sizeof(checkPoint)); - checkPoint.ThisStartUpID = ThisStartUpID; + checkPoint.ThisTimeLineID = ThisTimeLineID; checkPoint.time = time(NULL); LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); @@ -4372,8 +4783,20 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) ShmemVariableCache->nextXid = checkPoint.nextXid; ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->oidCount = 0; - /* Any later WAL records should be run with shutdown SUI plus 1 */ - ThisStartUpID = checkPoint.ThisStartUpID + 1; + /* + * TLI may change in a shutdown checkpoint, but it shouldn't decrease + */ + if (checkPoint.ThisTimeLineID != ThisTimeLineID) + { + if (checkPoint.ThisTimeLineID < ThisTimeLineID || + !list_member_int(expectedTLIs, + (int) checkPoint.ThisTimeLineID)) + ereport(PANIC, + (errmsg("unexpected timeline ID %u (after %u) in checkpoint record", + checkPoint.ThisTimeLineID, ThisTimeLineID))); + /* Following WAL records should be run with new TLI */ + ThisTimeLineID = checkPoint.ThisTimeLineID; + } } else if (info == XLOG_CHECKPOINT_ONLINE) { @@ -4389,40 +4812,11 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->oidCount = 0; } - /* Any later WAL records should be run with the then-active SUI */ - ThisStartUpID = checkPoint.ThisStartUpID; - } - else if (info == XLOG_FILE_HEADER) - { - XLogFileHeaderData fhdr; - - memcpy(&fhdr, XLogRecGetData(record), sizeof(XLogFileHeaderData)); - if (fhdr.xlfhd_sysid != ControlFile->system_identifier) - { - char fhdrident_str[32]; - char sysident_str[32]; - - /* - * Format sysids separately to keep platform-dependent format - * code out of the translatable message string. - */ - snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT, - fhdr.xlfhd_sysid); - snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT, - ControlFile->system_identifier); - ereport(PANIC, - (errmsg("WAL file is from different system"), - errdetail("WAL file SYSID is %s, pg_control SYSID is %s", - fhdrident_str, sysident_str))); - } - if (fhdr.xlfhd_seg_size != XLogSegSize) + /* TLI should not change in an on-line checkpoint */ + if (checkPoint.ThisTimeLineID != ThisTimeLineID) ereport(PANIC, - (errmsg("WAL file is from different system"), - errdetail("Incorrect XLOG_SEG_SIZE in file header."))); - } - else if (info == XLOG_WASTED_SPACE) - { - /* ignore */ + (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record", + checkPoint.ThisTimeLineID, ThisTimeLineID))); } } @@ -4442,10 +4836,10 @@ xlog_desc(char *buf, uint8 xl_info, char *rec) CheckPoint *checkpoint = (CheckPoint *) rec; sprintf(buf + strlen(buf), "checkpoint: redo %X/%X; undo %X/%X; " - "sui %u; xid %u; oid %u; %s", + "tli %u; xid %u; oid %u; %s", checkpoint->redo.xlogid, checkpoint->redo.xrecoff, checkpoint->undo.xlogid, checkpoint->undo.xrecoff, - checkpoint->ThisStartUpID, checkpoint->nextXid, + checkpoint->ThisTimeLineID, checkpoint->nextXid, checkpoint->nextOid, (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online"); } @@ -4456,22 +4850,6 @@ xlog_desc(char *buf, uint8 xl_info, char *rec) memcpy(&nextOid, rec, sizeof(Oid)); sprintf(buf + strlen(buf), "nextOid: %u", nextOid); } - else if (info == XLOG_FILE_HEADER) - { - XLogFileHeaderData *fhdr = (XLogFileHeaderData *) rec; - - sprintf(buf + strlen(buf), - "file header: sysid " UINT64_FORMAT "; " - "xlogid %X segno %X; seg_size %X", - fhdr->xlfhd_sysid, - fhdr->xlfhd_xlogid, - fhdr->xlfhd_segno, - fhdr->xlfhd_seg_size); - } - else if (info == XLOG_WASTED_SPACE) - { - strcat(buf, "wasted space"); - } else strcat(buf, "UNKNOWN"); } |