aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/twophase.c14
-rw-r--r--src/backend/access/transam/xlog.c78
-rw-r--r--src/backend/access/transam/xlogarchive.c6
-rw-r--r--src/backend/access/transam/xlogfuncs.c8
-rw-r--r--src/backend/access/transam/xlogutils.c43
-rw-r--r--src/backend/replication/logical/logicalfuncs.c2
-rw-r--r--src/backend/replication/logical/worker.c2
-rw-r--r--src/backend/replication/slotfuncs.c2
-rw-r--r--src/backend/replication/walreceiver.c54
-rw-r--r--src/backend/replication/walsender.c77
-rw-r--r--src/include/access/xlog.h9
-rw-r--r--src/include/access/xlogarchive.h2
-rw-r--r--src/include/access/xlogutils.h4
13 files changed, 169 insertions, 132 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index f6e7fa71d88..ef4b5f639ce 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1373,11 +1373,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
* twophase files and ReadTwoPhaseFile should be used instead.
*
* Note clearly that this function can access WAL during normal operation,
- * similarly to the way WALSender or Logical Decoding would do. While
- * accessing WAL, read_local_xlog_page() may change ThisTimeLineID,
- * particularly if this routine is called for the end-of-recovery checkpoint
- * in the checkpointer itself, so save the current timeline number value
- * and restore it once done.
+ * similarly to the way WALSender or Logical Decoding would do.
*/
static void
XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
@@ -1385,7 +1381,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
XLogRecord *record;
XLogReaderState *xlogreader;
char *errormsg;
- TimeLineID save_currtli = ThisTimeLineID;
xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
XL_ROUTINE(.page_read = &read_local_xlog_page,
@@ -1401,13 +1396,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
XLogBeginRead(xlogreader, lsn);
record = XLogReadRecord(xlogreader, &errormsg);
- /*
- * Restore immediately the timeline where it was previously, as
- * read_local_xlog_page() could have changed it if the record was read
- * while recovery was finishing or if the timeline has jumped in-between.
- */
- ThisTimeLineID = save_currtli;
-
if (record == NULL)
ereport(ERROR,
(errcode_for_file_access(),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0a0771a18eb..9b15735921b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -192,7 +192,7 @@ CheckpointStatsData CheckpointStats;
* ThisTimeLineID will be same in all backends --- it identifies current
* WAL timeline for the database system.
*/
-TimeLineID ThisTimeLineID = 0;
+static TimeLineID ThisTimeLineID = 0;
static XLogRecPtr LastRec;
@@ -917,7 +917,8 @@ static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
static bool XLogCheckpointNeeded(XLogSegNo new_segno);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
- bool find_free, XLogSegNo max_segno);
+ bool find_free, XLogSegNo max_segno,
+ TimeLineID tli);
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
XLogSource source, bool notfoundOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
@@ -2518,7 +2519,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
wal_segment_size);
/* create/use new log file */
- openLogFile = XLogFileInit(openLogSegNo);
+ openLogFile = XLogFileInit(openLogSegNo, ThisTimeLineID);
ReserveExternalFD();
}
@@ -2633,7 +2634,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
*/
if (finishing_seg)
{
- issue_xlog_fsync(openLogFile, openLogSegNo);
+ issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID);
/* signal that we need to wakeup walsenders later */
WalSndWakeupRequest();
@@ -2641,7 +2642,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
if (XLogArchivingActive())
- XLogArchiveNotifySeg(openLogSegNo);
+ XLogArchiveNotifySeg(openLogSegNo, ThisTimeLineID);
XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
@@ -2704,7 +2705,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
ReserveExternalFD();
}
- issue_xlog_fsync(openLogFile, openLogSegNo);
+ issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID);
}
/* signal that we need to wakeup walsenders later */
@@ -3296,7 +3297,8 @@ XLogNeedsFlush(XLogRecPtr record)
* succeed. (This is weird, but it's efficient for the callers.)
*/
static int
-XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
+XLogFileInitInternal(XLogSegNo logsegno, TimeLineID logtli,
+ bool *added, char *path)
{
char tmppath[MAXPGPATH];
PGAlignedXLogBlock zbuffer;
@@ -3305,7 +3307,9 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
int fd;
int save_errno;
- XLogFilePath(path, ThisTimeLineID, logsegno, wal_segment_size);
+ Assert(logtli != 0);
+
+ XLogFilePath(path, logtli, logsegno, wal_segment_size);
/*
* Try to use existent file (checkpoint maker may have created it already)
@@ -3449,7 +3453,8 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
* CheckPointSegments.
*/
max_segno = logsegno + CheckPointSegments;
- if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno))
+ if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno,
+ logtli))
{
*added = true;
elog(DEBUG2, "done creating and filling new WAL file");
@@ -3481,13 +3486,15 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
* in a critical section.
*/
int
-XLogFileInit(XLogSegNo logsegno)
+XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
{
bool ignore_added;
char path[MAXPGPATH];
int fd;
- fd = XLogFileInitInternal(logsegno, &ignore_added, path);
+ Assert(logtli != 0);
+
+ fd = XLogFileInitInternal(logsegno, logtli, &ignore_added, path);
if (fd >= 0)
return fd;
@@ -3629,7 +3636,7 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno,
/*
* Now move the segment into place with its final name.
*/
- if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0))
+ if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0, ThisTimeLineID))
elog(ERROR, "InstallXLogFileSegment should not have failed");
}
@@ -3653,18 +3660,22 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno,
* free slot is found between *segno and max_segno. (Ignored when find_free
* is false.)
*
+ * tli: The timeline on which the new segment should be installed.
+ *
* Returns true if the file was installed successfully. false indicates that
* max_segno limit was exceeded, the startup process has disabled this
* function for now, or an error occurred while renaming the file into place.
*/
static bool
InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
- bool find_free, XLogSegNo max_segno)
+ bool find_free, XLogSegNo max_segno, TimeLineID tli)
{
char path[MAXPGPATH];
struct stat stat_buf;
- XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
+ Assert(tli != 0);
+
+ XLogFilePath(path, tli, *segno, wal_segment_size);
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (!XLogCtl->InstallXLogFileSegmentActive)
@@ -3690,7 +3701,7 @@ InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
return false;
}
(*segno)++;
- XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
+ XLogFilePath(path, tli, *segno, wal_segment_size);
}
}
@@ -3987,7 +3998,7 @@ PreallocXlogFiles(XLogRecPtr endptr)
if (offset >= (uint32) (0.75 * wal_segment_size))
{
_logSegNo++;
- lf = XLogFileInitInternal(_logSegNo, &added, path);
+ lf = XLogFileInitInternal(_logSegNo, ThisTimeLineID, &added, path);
if (lf >= 0)
close(lf);
if (added)
@@ -4266,7 +4277,7 @@ RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
XLogCtl->InstallXLogFileSegmentActive && /* callee rechecks this */
lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
InstallXLogFileSegment(endlogSegNo, path,
- true, recycleSegNo))
+ true, recycleSegNo, ThisTimeLineID))
{
ereport(DEBUG2,
(errmsg_internal("recycled write-ahead log file \"%s\"",
@@ -5401,7 +5412,7 @@ BootStrapXLOG(void)
record->xl_crc = crc;
/* Create first XLOG segment file */
- openLogFile = XLogFileInit(1);
+ openLogFile = XLogFileInit(1, ThisTimeLineID);
/*
* We needn't bother with Reserve/ReleaseExternalFD here, since we'll
@@ -5709,7 +5720,7 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
*/
int fd;
- fd = XLogFileInit(startLogSegNo);
+ fd = XLogFileInit(startLogSegNo, ThisTimeLineID);
if (close(fd) != 0)
{
@@ -8706,16 +8717,36 @@ GetInsertRecPtr(void)
* position known to be fsync'd to disk.
*/
XLogRecPtr
-GetFlushRecPtr(void)
+GetFlushRecPtr(TimeLineID *insertTLI)
{
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
+ /*
+ * If we're writing and flushing WAL, the time line can't be changing,
+ * so no lock is required.
+ */
+ if (insertTLI)
+ *insertTLI = XLogCtl->ThisTimeLineID;
+
return LogwrtResult.Flush;
}
/*
+ * GetWALInsertionTimeLine -- Returns the current timeline of a system that
+ * is not in recovery.
+ */
+TimeLineID
+GetWALInsertionTimeLine(void)
+{
+ Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
+
+ /* Since the value can't be changing, no lock is required. */
+ return XLogCtl->ThisTimeLineID;
+}
+
+/*
* GetLastImportantRecPtr -- Returns the LSN of the last important record
* inserted. All records not explicitly marked as unimportant are considered
* important.
@@ -10849,11 +10880,13 @@ assign_xlog_sync_method(int new_sync_method, void *extra)
* 'segno' is for error reporting purposes.
*/
void
-issue_xlog_fsync(int fd, XLogSegNo segno)
+issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
{
char *msg = NULL;
instr_time start;
+ Assert(tli != 0);
+
/*
* Quick exit if fsync is disabled or write() has already synced the WAL
* file.
@@ -10902,8 +10935,7 @@ issue_xlog_fsync(int fd, XLogSegNo segno)
char xlogfname[MAXFNAMELEN];
int save_errno = errno;
- XLogFileName(xlogfname, ThisTimeLineID, segno,
- wal_segment_size);
+ XLogFileName(xlogfname, tli, segno, wal_segment_size);
errno = save_errno;
ereport(PANIC,
(errcode_for_file_access(),
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index 26b023e754b..7d56dad0def 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -498,11 +498,13 @@ XLogArchiveNotify(const char *xlog)
* Convenience routine to notify using segment number representation of filename
*/
void
-XLogArchiveNotifySeg(XLogSegNo segno)
+XLogArchiveNotifySeg(XLogSegNo segno, TimeLineID tli)
{
char xlog[MAXFNAMELEN];
- XLogFileName(xlog, ThisTimeLineID, segno, wal_segment_size);
+ Assert(tli != 0);
+
+ XLogFileName(xlog, tli, segno, wal_segment_size);
XLogArchiveNotify(xlog);
}
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index b98deb72ec6..dd9a45c1860 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -382,7 +382,7 @@ pg_current_wal_flush_lsn(PG_FUNCTION_ARGS)
errmsg("recovery is in progress"),
errhint("WAL control functions cannot be executed during recovery.")));
- current_recptr = GetFlushRecPtr();
+ current_recptr = GetFlushRecPtr(NULL);
PG_RETURN_LSN(current_recptr);
}
@@ -469,7 +469,8 @@ pg_walfile_name_offset(PG_FUNCTION_ARGS)
* xlogfilename
*/
XLByteToPrevSeg(locationpoint, xlogsegno, wal_segment_size);
- XLogFileName(xlogfilename, ThisTimeLineID, xlogsegno, wal_segment_size);
+ XLogFileName(xlogfilename, GetWALInsertionTimeLine(), xlogsegno,
+ wal_segment_size);
values[0] = CStringGetTextDatum(xlogfilename);
isnull[0] = false;
@@ -511,7 +512,8 @@ pg_walfile_name(PG_FUNCTION_ARGS)
"pg_walfile_name()")));
XLByteToPrevSeg(locationpoint, xlogsegno, wal_segment_size);
- XLogFileName(xlogfilename, ThisTimeLineID, xlogsegno, wal_segment_size);
+ XLogFileName(xlogfilename, GetWALInsertionTimeLine(), xlogsegno,
+ wal_segment_size);
PG_RETURN_TEXT_P(cstring_to_text(xlogfilename));
}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 88a1bfd9394..b33e0531ed1 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -678,6 +678,10 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
* wantLength to the amount of the page that will be read, up to
* XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
*
+ * The currTLI argument should be the system-wide current timeline.
+ * Note that this may be different from state->currTLI, which is the timeline
+ * from which the caller is currently reading previous xlog records.
+ *
* We switch to an xlog segment from the new timeline eagerly when on a
* historical timeline, as soon as we reach the start of the xlog segment
* containing the timeline switch. The server copied the segment to the new
@@ -699,12 +703,11 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
*
* The caller must also make sure it doesn't read past the current replay
* position (using GetXLogReplayRecPtr) if executing in recovery, so it
- * doesn't fail to notice that the current timeline became historical. The
- * caller must also update ThisTimeLineID with the result of
- * GetXLogReplayRecPtr and must check RecoveryInProgress().
+ * doesn't fail to notice that the current timeline became historical.
*/
void
-XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
+XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage,
+ uint32 wantLength, TimeLineID currTLI)
{
const XLogRecPtr lastReadPage = (state->seg.ws_segno *
state->segcxt.ws_segsize + state->segoff);
@@ -712,6 +715,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
Assert(wantLength <= XLOG_BLCKSZ);
Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
+ Assert(currTLI != 0);
/*
* If the desired page is currently read in and valid, we have nothing to
@@ -732,12 +736,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
* just carry on. (Seeking backwards requires a check to make sure the
* older page isn't on a prior timeline).
*
- * ThisTimeLineID might've become historical since we last looked, but the
- * caller is required not to read past the flush limit it saw at the time
- * it looked up the timeline. There's nothing we can do about it if
- * StartupXLOG() renames it to .partial concurrently.
+ * currTLI might've become historical since the caller obtained the value,
+ * but the caller is required not to read past the flush limit it saw at
+ * the time it looked up the timeline. There's nothing we can do about it
+ * if StartupXLOG() renames it to .partial concurrently.
*/
- if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
+ if (state->currTLI == currTLI && wantPage >= lastReadPage)
{
Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
return;
@@ -749,7 +753,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
* the current segment we can just keep reading.
*/
if (state->currTLIValidUntil != InvalidXLogRecPtr &&
- state->currTLI != ThisTimeLineID &&
+ state->currTLI != currTLI &&
state->currTLI != 0 &&
((wantPage + wantLength) / state->segcxt.ws_segsize) <
(state->currTLIValidUntil / state->segcxt.ws_segsize))
@@ -772,7 +776,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
* We need to re-read the timeline history in case it's been changed
* by a promotion or replay from a cascaded replica.
*/
- List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+ List *timelineHistory = readTimeLineHistory(currTLI);
XLogRecPtr endOfSegment;
endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) *
@@ -853,6 +857,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
TimeLineID tli;
int count;
WALReadError errinfo;
+ TimeLineID currTLI;
loc = targetPagePtr + reqLen;
@@ -862,16 +867,12 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
/*
* Determine the limit of xlog we can currently read to, and what the
* most recent timeline is.
- *
- * RecoveryInProgress() will update ThisTimeLineID when it first
- * notices recovery finishes, so we only have to maintain it for the
- * local process until recovery ends.
*/
if (!RecoveryInProgress())
- read_upto = GetFlushRecPtr();
+ read_upto = GetFlushRecPtr(&currTLI);
else
- read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
- tli = ThisTimeLineID;
+ read_upto = GetXLogReplayRecPtr(&currTLI);
+ tli = currTLI;
/*
* Check which timeline to get the record from.
@@ -890,16 +891,16 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
* archive in the timeline will get renamed to .partial by
* StartupXLOG().
*
- * If that happens after our caller updated ThisTimeLineID but before
+ * If that happens after our caller determined the TLI but before
* we actually read the xlog page, we might still try to read from the
* old (now renamed) segment and fail. There's not much we can do
* about this, but it can only happen when we're a leaf of a cascading
* standby whose primary gets promoted while we're decoding, so a
* one-off ERROR isn't too bad.
*/
- XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+ XLogReadDetermineTimeline(state, targetPagePtr, reqLen, tli);
- if (state->currTLI == ThisTimeLineID)
+ if (state->currTLI == currTLI)
{
if (loc <= read_upto)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 5a7fae4a87d..2609a0a7104 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -211,7 +211,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
* Compute the current end-of-wal.
*/
if (!RecoveryInProgress())
- end_of_wal = GetFlushRecPtr();
+ end_of_wal = GetFlushRecPtr(NULL);
else
end_of_wal = GetXLogReplayRecPtr(NULL);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 8d96c926b4f..0bd5d0ee5e0 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2435,7 +2435,7 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
bool *have_pending_txes)
{
dlist_mutable_iter iter;
- XLogRecPtr local_flush = GetFlushRecPtr();
+ XLogRecPtr local_flush = GetFlushRecPtr(NULL);
*write = InvalidXLogRecPtr;
*flush = InvalidXLogRecPtr;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 877a006d503..a80298ba53d 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -625,7 +625,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
* target position accordingly.
*/
if (!RecoveryInProgress())
- moveto = Min(moveto, GetFlushRecPtr());
+ moveto = Min(moveto, GetFlushRecPtr(NULL));
else
moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b90e5ca98ea..7a7eb3784e7 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -122,10 +122,12 @@ static StringInfoData incoming_message;
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
-static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
-static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
-static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvClose(XLogRecPtr recptr);
+static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
+ TimeLineID tli);
+static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
+ TimeLineID tli);
+static void XLogWalRcvFlush(bool dying, TimeLineID tli);
+static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -255,7 +257,7 @@ WalReceiverMain(void)
pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
/* Arrange to clean up at walreceiver exit */
- on_shmem_exit(WalRcvDie, 0);
+ on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
/* Properly accept or ignore signals the postmaster might send us */
pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
@@ -394,7 +396,6 @@ WalReceiverMain(void)
options.startpoint = startpoint;
options.slotname = slotname[0] != '\0' ? slotname : NULL;
options.proto.physical.startpointTLI = startpointTLI;
- ThisTimeLineID = startpointTLI;
if (walrcv_startstreaming(wrconn, &options))
{
if (first_stream)
@@ -462,7 +463,8 @@ WalReceiverMain(void)
*/
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
- XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
+ XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
+ startpointTLI);
}
else if (len == 0)
break;
@@ -487,7 +489,7 @@ WalReceiverMain(void)
* let the startup process and primary server know about
* them.
*/
- XLogWalRcvFlush(false);
+ XLogWalRcvFlush(false, startpointTLI);
}
/* Check if we need to exit the streaming loop. */
@@ -608,7 +610,7 @@ WalReceiverMain(void)
{
char xlogfname[MAXFNAMELEN];
- XLogWalRcvFlush(false);
+ XLogWalRcvFlush(false, startpointTLI);
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
if (close(recvFile) != 0)
ereport(PANIC,
@@ -776,9 +778,12 @@ static void
WalRcvDie(int code, Datum arg)
{
WalRcvData *walrcv = WalRcv;
+ TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
+
+ Assert(*startpointTLI_p != 0);
/* Ensure that all WAL records received are flushed to disk */
- XLogWalRcvFlush(true);
+ XLogWalRcvFlush(true, *startpointTLI_p);
/* Mark ourselves inactive in shared memory */
SpinLockAcquire(&walrcv->mutex);
@@ -808,7 +813,7 @@ WalRcvDie(int code, Datum arg)
* Accept the message from XLOG stream, and process it.
*/
static void
-XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
+XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
{
int hdrlen;
XLogRecPtr dataStart;
@@ -838,7 +843,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
buf += hdrlen;
len -= hdrlen;
- XLogWalRcvWrite(buf, len, dataStart);
+ XLogWalRcvWrite(buf, len, dataStart, tli);
break;
}
case 'k': /* Keepalive */
@@ -875,25 +880,27 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
* Write XLOG data to disk.
*/
static void
-XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
+XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
{
int startoff;
int byteswritten;
+ Assert(tli != 0);
+
while (nbytes > 0)
{
int segbytes;
/* Close the current segment if it's completed */
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
- XLogWalRcvClose(recptr);
+ XLogWalRcvClose(recptr, tli);
if (recvFile < 0)
{
/* Create/use new log file */
XLByteToSeg(recptr, recvSegNo, wal_segment_size);
- recvFile = XLogFileInit(recvSegNo);
- recvFileTLI = ThisTimeLineID;
+ recvFile = XLogFileInit(recvSegNo, tli);
+ recvFileTLI = tli;
}
/* Calculate the start offset of the received logs */
@@ -946,7 +953,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
* segment is received and written.
*/
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
- XLogWalRcvClose(recptr);
+ XLogWalRcvClose(recptr, tli);
}
/*
@@ -956,13 +963,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
* an error, so we skip sending a reply in that case.
*/
static void
-XLogWalRcvFlush(bool dying)
+XLogWalRcvFlush(bool dying, TimeLineID tli)
{
+ Assert(tli != 0);
+
if (LogstreamResult.Flush < LogstreamResult.Write)
{
WalRcvData *walrcv = WalRcv;
- issue_xlog_fsync(recvFile, recvSegNo);
+ issue_xlog_fsync(recvFile, recvSegNo, tli);
LogstreamResult.Flush = LogstreamResult.Write;
@@ -972,7 +981,7 @@ XLogWalRcvFlush(bool dying)
{
walrcv->latestChunkStart = walrcv->flushedUpto;
walrcv->flushedUpto = LogstreamResult.Flush;
- walrcv->receivedTLI = ThisTimeLineID;
+ walrcv->receivedTLI = tli;
}
SpinLockRelease(&walrcv->mutex);
@@ -1009,17 +1018,18 @@ XLogWalRcvFlush(bool dying)
* Create an archive notification file since the segment is known completed.
*/
static void
-XLogWalRcvClose(XLogRecPtr recptr)
+XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
{
char xlogfname[MAXFNAMELEN];
Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
+ Assert(tli != 0);
/*
* fsync() and close current file before we switch to next one. We would
* otherwise have to reopen this file to fsync it later
*/
- XLogWalRcvFlush(false);
+ XLogWalRcvFlush(false, tli);
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d9ab6d6de24..fff7dfc6409 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -230,7 +230,7 @@ static void WalSndShutdown(void) pg_attribute_noreturn();
static void XLogSendPhysical(void);
static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data);
-static XLogRecPtr GetStandbyFlushRecPtr(void);
+static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
static void IdentifySystem(void);
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
@@ -385,6 +385,7 @@ IdentifySystem(void)
TupleDesc tupdesc;
Datum values[4];
bool nulls[4];
+ TimeLineID currTLI;
/*
* Reply with a result set with one row, four columns. First col is system
@@ -397,12 +398,9 @@ IdentifySystem(void)
am_cascading_walsender = RecoveryInProgress();
if (am_cascading_walsender)
- {
- /* this also updates ThisTimeLineID */
- logptr = GetStandbyFlushRecPtr();
- }
+ logptr = GetStandbyFlushRecPtr(&currTLI);
else
- logptr = GetFlushRecPtr();
+ logptr = GetFlushRecPtr(&currTLI);
snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
@@ -441,7 +439,7 @@ IdentifySystem(void)
values[0] = CStringGetTextDatum(sysid);
/* column 2: timeline */
- values[1] = Int32GetDatum(ThisTimeLineID);
+ values[1] = Int32GetDatum(currTLI);
/* column 3: wal location */
values[2] = CStringGetTextDatum(xloc);
@@ -537,7 +535,7 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
if (RecoveryInProgress())
(void) GetXLogReplayRecPtr(&current_timeline);
else
- current_timeline = ThisTimeLineID;
+ current_timeline = GetWALInsertionTimeLine();
timeline_history = readTimeLineHistory(current_timeline);
slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
@@ -671,6 +669,7 @@ StartReplication(StartReplicationCmd *cmd)
{
StringInfoData buf;
XLogRecPtr FlushPtr;
+ TimeLineID FlushTLI;
/* create xlogreader for physical replication */
xlogreader =
@@ -710,24 +709,20 @@ StartReplication(StartReplicationCmd *cmd)
/*
* Select the timeline. If it was given explicitly by the client, use
- * that. Otherwise use the timeline of the last replayed record, which is
- * kept in ThisTimeLineID.
+ * that. Otherwise use the timeline of the last replayed record.
*/
am_cascading_walsender = RecoveryInProgress();
if (am_cascading_walsender)
- {
- /* this also updates ThisTimeLineID */
- FlushPtr = GetStandbyFlushRecPtr();
- }
+ FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
else
- FlushPtr = GetFlushRecPtr();
+ FlushPtr = GetFlushRecPtr(&FlushTLI);
if (cmd->timeline != 0)
{
XLogRecPtr switchpoint;
sendTimeLine = cmd->timeline;
- if (sendTimeLine == ThisTimeLineID)
+ if (sendTimeLine == FlushTLI)
{
sendTimeLineIsHistoric = false;
sendTimeLineValidUpto = InvalidXLogRecPtr;
@@ -742,7 +737,7 @@ StartReplication(StartReplicationCmd *cmd)
* Check that the timeline the client requested exists, and the
* requested start location is on that timeline.
*/
- timeLineHistory = readTimeLineHistory(ThisTimeLineID);
+ timeLineHistory = readTimeLineHistory(FlushTLI);
switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
&sendTimeLineNextTLI);
list_free_deep(timeLineHistory);
@@ -781,7 +776,7 @@ StartReplication(StartReplicationCmd *cmd)
}
else
{
- sendTimeLine = ThisTimeLineID;
+ sendTimeLine = FlushTLI;
sendTimeLineValidUpto = InvalidXLogRecPtr;
sendTimeLineIsHistoric = false;
}
@@ -909,9 +904,16 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
int count;
WALReadError errinfo;
XLogSegNo segno;
+ TimeLineID currTLI = GetWALInsertionTimeLine();
- XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
- sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
+ /*
+ * Since logical decoding is only permitted on a primary server, we know
+ * that the current timeline ID can't be changing any more. If we did this
+ * on a standby, we'd have to worry about the values we compute here
+ * becoming invalid due to a promotion or timeline change.
+ */
+ XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
+ sendTimeLineIsHistoric = (state->currTLI != currTLI);
sendTimeLine = state->currTLI;
sendTimeLineValidUpto = state->currTLIValidUntil;
sendTimeLineNextTLI = state->nextTLI;
@@ -1487,7 +1489,7 @@ WalSndWaitForWal(XLogRecPtr loc)
/* Get a more recent flush pointer. */
if (!RecoveryInProgress())
- RecentFlushPtr = GetFlushRecPtr();
+ RecentFlushPtr = GetFlushRecPtr(NULL);
else
RecentFlushPtr = GetXLogReplayRecPtr(NULL);
@@ -1521,7 +1523,7 @@ WalSndWaitForWal(XLogRecPtr loc)
/* Update our idea of the currently flushed position. */
if (!RecoveryInProgress())
- RecentFlushPtr = GetFlushRecPtr();
+ RecentFlushPtr = GetFlushRecPtr(NULL);
else
RecentFlushPtr = GetXLogReplayRecPtr(NULL);
@@ -2683,6 +2685,8 @@ XLogSendPhysical(void)
}
else if (am_cascading_walsender)
{
+ TimeLineID SendRqstTLI;
+
/*
* Streaming the latest timeline on a standby.
*
@@ -2702,14 +2706,12 @@ XLogSendPhysical(void)
*/
bool becameHistoric = false;
- SendRqstPtr = GetStandbyFlushRecPtr();
+ SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
if (!RecoveryInProgress())
{
- /*
- * We have been promoted. RecoveryInProgress() updated
- * ThisTimeLineID to the new current timeline.
- */
+ /* We have been promoted. */
+ SendRqstTLI = GetWALInsertionTimeLine();
am_cascading_walsender = false;
becameHistoric = true;
}
@@ -2717,10 +2719,9 @@ XLogSendPhysical(void)
{
/*
* Still a cascading standby. But is the timeline we're sending
- * still the one recovery is recovering from? ThisTimeLineID was
- * updated by the GetStandbyFlushRecPtr() call above.
+ * still the one recovery is recovering from?
*/
- if (sendTimeLine != ThisTimeLineID)
+ if (sendTimeLine != SendRqstTLI)
becameHistoric = true;
}
@@ -2733,7 +2734,7 @@ XLogSendPhysical(void)
*/
List *history;
- history = readTimeLineHistory(ThisTimeLineID);
+ history = readTimeLineHistory(SendRqstTLI);
sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
Assert(sendTimeLine < sendTimeLineNextTLI);
@@ -2756,7 +2757,7 @@ XLogSendPhysical(void)
* primary: if the primary subsequently crashes and restarts, standbys
* must not have applied any WAL that got lost on the primary.
*/
- SendRqstPtr = GetFlushRecPtr();
+ SendRqstPtr = GetFlushRecPtr(NULL);
}
/*
@@ -2997,9 +2998,9 @@ XLogSendLogical(void)
* we only need to update flushPtr if EndRecPtr is past it.
*/
if (flushPtr == InvalidXLogRecPtr)
- flushPtr = GetFlushRecPtr();
+ flushPtr = GetFlushRecPtr(NULL);
else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
- flushPtr = GetFlushRecPtr();
+ flushPtr = GetFlushRecPtr(NULL);
/* If EndRecPtr is still past our flushPtr, it means we caught up. */
if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3069,11 +3070,11 @@ WalSndDone(WalSndSendDataCallback send_data)
* can be sent to the standby. This should only be called when in recovery,
* ie. we're streaming to a cascaded standby.
*
- * As a side-effect, ThisTimeLineID is updated to the TLI of the last
+ * As a side-effect, *tli is updated to the TLI of the last
* replayed WAL record.
*/
static XLogRecPtr
-GetStandbyFlushRecPtr(void)
+GetStandbyFlushRecPtr(TimeLineID *tli)
{
XLogRecPtr replayPtr;
TimeLineID replayTLI;
@@ -3090,10 +3091,10 @@ GetStandbyFlushRecPtr(void)
receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
- ThisTimeLineID = replayTLI;
+ *tli = replayTLI;
result = replayPtr;
- if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
+ if (receiveTLI == replayTLI && receivePtr > replayPtr)
result = receivePtr;
return result;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index c0a560204b4..f188c41bedf 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -29,8 +29,6 @@
#define SYNC_METHOD_OPEN_DSYNC 4 /* for O_DSYNC */
extern int sync_method;
-extern PGDLLIMPORT TimeLineID ThisTimeLineID; /* current TLI */
-
/*
* Recovery target type.
* Only set during a Point in Time recovery, not when in standby mode.
@@ -262,7 +260,7 @@ extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
extern void XLogFlush(XLogRecPtr RecPtr);
extern bool XLogBackgroundFlush(void);
extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
-extern int XLogFileInit(XLogSegNo segno);
+extern int XLogFileInit(XLogSegNo segno, TimeLineID tli);
extern int XLogFileOpen(XLogSegNo segno);
extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
@@ -274,7 +272,7 @@ extern void xlog_redo(XLogReaderState *record);
extern void xlog_desc(StringInfo buf, XLogReaderState *record);
extern const char *xlog_identify(uint8 info);
-extern void issue_xlog_fsync(int fd, XLogSegNo segno);
+extern void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli);
extern bool RecoveryInProgress(void);
extern RecoveryState GetRecoveryState(void);
@@ -312,7 +310,8 @@ extern void UpdateFullPageWrites(void);
extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p);
extern XLogRecPtr GetRedoRecPtr(void);
extern XLogRecPtr GetInsertRecPtr(void);
-extern XLogRecPtr GetFlushRecPtr(void);
+extern XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI);
+extern TimeLineID GetWALInsertionTimeLine(void);
extern XLogRecPtr GetLastImportantRecPtr(void);
extern void RemovePromoteSignalFiles(void);
diff --git a/src/include/access/xlogarchive.h b/src/include/access/xlogarchive.h
index 3edd1a976c1..7dcf1bd2dd2 100644
--- a/src/include/access/xlogarchive.h
+++ b/src/include/access/xlogarchive.h
@@ -24,7 +24,7 @@ extern void ExecuteRecoveryCommand(const char *command, const char *commandName,
bool failOnSignal);
extern void KeepFileRestoredFromArchive(const char *path, const char *xlogfname);
extern void XLogArchiveNotify(const char *xlog);
-extern void XLogArchiveNotifySeg(XLogSegNo segno);
+extern void XLogArchiveNotifySeg(XLogSegNo segno, TimeLineID tli);
extern void XLogArchiveForceDone(const char *xlog);
extern bool XLogArchiveCheckDone(const char *xlog);
extern bool XLogArchiveIsBusy(const char *xlog);
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index a5cb3d322c5..eebc91f3a50 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -98,7 +98,9 @@ extern void wal_segment_open(XLogReaderState *state,
extern void wal_segment_close(XLogReaderState *state);
extern void XLogReadDetermineTimeline(XLogReaderState *state,
- XLogRecPtr wantPage, uint32 wantLength);
+ XLogRecPtr wantPage,
+ uint32 wantLength,
+ TimeLineID currTLI);
extern void WALReadRaiseError(WALReadError *errinfo);