aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/xlog.c62
-rw-r--r--src/backend/replication/walsender.c28
-rw-r--r--src/include/access/xlog.h4
3 files changed, 59 insertions, 35 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 70b2e1cbeb8..c1c3709fb44 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -407,7 +407,6 @@ typedef struct XLogCtlData
XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
int XLogCacheBlck; /* highest allocated xlog buffer index */
TimeLineID ThisTimeLineID;
- TimeLineID RecoveryTargetTLI;
/*
* archiveCleanupCommand is read from recovery.conf but needs to be in
@@ -456,14 +455,14 @@ typedef struct XLogCtlData
XLogRecPtr recoveryLastRecPtr;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
+ /* current effective recovery target timeline */
+ TimeLineID RecoveryTargetTLI;
/*
* timestamp of when we started replaying the current chunk of WAL data,
* only relevant for replication or archive recovery
*/
TimestampTz currentChunkStartTime;
- /* end of the last record restored from the archive */
- XLogRecPtr restoreLastRecPtr;
/* Are we requested to pause recovery? */
bool recoveryPause;
@@ -2817,18 +2816,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
if (reload)
WalSndRqstFileReload();
- /*
- * Calculate the end location of the restored WAL file and save it in
- * shmem. It's used as current standby flush position, and cascading
- * walsenders try to send WAL records up to this location.
- */
- XLogSegNoOffsetToRecPtr(segno, 0, endptr);
- XLByteAdvance(endptr, XLogSegSize);
-
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->restoreLastRecPtr = endptr;
- SpinLockRelease(&xlogctl->info_lck);
-
/* Signal walsender that new WAL has arrived */
if (AllowCascadeReplication())
WalSndWakeup();
@@ -4470,12 +4457,17 @@ rescanLatestTimeLine(void)
ThisTimeLineID)));
else
{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
/* Switch target */
recoveryTargetTLI = newtarget;
list_free(expectedTLIs);
expectedTLIs = newExpectedTLIs;
- XLogCtl->RecoveryTargetTLI = recoveryTargetTLI;
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->RecoveryTargetTLI = recoveryTargetTLI;
+ SpinLockRelease(&xlogctl->info_lck);
ereport(LOG,
(errmsg("new target timeline is %u",
@@ -7513,13 +7505,20 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
}
/*
- * GetRecoveryTargetTLI - get the recovery target timeline ID
+ * GetRecoveryTargetTLI - get the current recovery target timeline ID
*/
TimeLineID
GetRecoveryTargetTLI(void)
{
- /* RecoveryTargetTLI doesn't change so we need no lock to copy it */
- return XLogCtl->RecoveryTargetTLI;
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ TimeLineID result;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ result = xlogctl->RecoveryTargetTLI;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return result;
}
/*
@@ -8309,7 +8308,7 @@ CreateRestartPoint(int flags)
XLogRecPtr endptr;
/* Get the current (or recent) end of xlog */
- endptr = GetStandbyFlushRecPtr();
+ endptr = GetStandbyFlushRecPtr(NULL);
KeepLogSeg(endptr, &_logSegNo);
_logSegNo--;
@@ -9818,14 +9817,13 @@ do_pg_abort_backup(void)
/*
* Get latest redo apply position.
*
- * Optionally, returns the end byte position of the last restored
- * WAL segment. Callers not interested in that value may pass
- * NULL for restoreLastRecPtr.
+ * Optionally, returns the current recovery target timeline. Callers not
+ * interested in that may pass NULL for targetTLI.
*
* Exported to allow WALReceiver to read the pointer directly.
*/
XLogRecPtr
-GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
+GetXLogReplayRecPtr(TimeLineID *targetTLI)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
@@ -9833,8 +9831,8 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
SpinLockAcquire(&xlogctl->info_lck);
recptr = xlogctl->recoveryLastRecPtr;
- if (restoreLastRecPtr)
- *restoreLastRecPtr = xlogctl->restoreLastRecPtr;
+ if (targetTLI)
+ *targetTLI = xlogctl->RecoveryTargetTLI;
SpinLockRelease(&xlogctl->info_lck);
return recptr;
@@ -9843,21 +9841,23 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
/*
* Get current standby flush position, ie, the last WAL position
* known to be fsync'd to disk in standby.
+ *
+ * If 'targetTLI' is not NULL, it's set to the current recovery target
+ * timeline.
*/
XLogRecPtr
-GetStandbyFlushRecPtr(void)
+GetStandbyFlushRecPtr(TimeLineID *targetTLI)
{
XLogRecPtr receivePtr;
XLogRecPtr replayPtr;
- XLogRecPtr restorePtr;
receivePtr = GetWalRcvWriteRecPtr(NULL);
- replayPtr = GetXLogReplayRecPtr(&restorePtr);
+ replayPtr = GetXLogReplayRecPtr(targetTLI);
if (XLByteLT(receivePtr, replayPtr))
- return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr;
+ return replayPtr;
else
- return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr;
+ return receivePtr;
}
/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 38f7a3f1c36..cc27848318b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -303,7 +303,7 @@ IdentifySystem(void)
GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
- logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
+ logptr = am_cascading_walsender ? GetStandbyFlushRecPtr(NULL) : GetInsertRecPtr();
snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
@@ -1137,7 +1137,31 @@ XLogSend(char *msgbuf, bool *caughtup)
* subsequently crashes and restarts, slaves must not have applied any WAL
* that gets lost on the master.
*/
- SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
+ if (am_cascading_walsender)
+ {
+ TimeLineID currentTargetTLI;
+ SendRqstPtr = GetStandbyFlushRecPtr(&currentTargetTLI);
+
+ /*
+ * If the recovery target timeline changed, bail out. It's a bit
+ * unfortunate that we have to just disconnect, but there is no way
+ * to tell the client that the timeline changed. We also don't know
+ * exactly where the switch happened, so we cannot safely try to send
+ * up to the switchover point before disconnecting.
+ */
+ if (currentTargetTLI != ThisTimeLineID)
+ {
+ if (!walsender_ready_to_stop)
+ ereport(LOG,
+ (errmsg("terminating walsender process to force cascaded standby "
+ "to update timeline and reconnect")));
+ walsender_ready_to_stop = true;
+ *caughtup = true;
+ return;
+ }
+ }
+ else
+ SendRqstPtr = GetFlushRecPtr();
/* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr))
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index ec79870e749..2893f3b3524 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -285,8 +285,8 @@ extern bool RecoveryInProgress(void);
extern bool HotStandbyActive(void);
extern bool XLogInsertAllowed(void);
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
-extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr);
-extern XLogRecPtr GetStandbyFlushRecPtr(void);
+extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *targetTLI);
+extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *targetTLI);
extern XLogRecPtr GetXLogInsertRecPtr(void);
extern XLogRecPtr GetXLogWriteRecPtr(void);
extern bool RecoveryIsPaused(void);