aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/xlog.c265
-rw-r--r--src/backend/postmaster/postmaster.c20
-rw-r--r--src/backend/replication/basebackup.c5
-rw-r--r--src/backend/replication/syncrep.c7
-rw-r--r--src/backend/replication/walreceiver.c7
-rw-r--r--src/backend/replication/walsender.c108
-rw-r--r--src/include/access/xlog.h6
-rw-r--r--src/include/replication/walsender.h5
8 files changed, 284 insertions, 139 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 662b26bc27d..6a6959f728c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -446,6 +446,8 @@ typedef struct XLogCtlData
XLogRecPtr recoveryLastRecPtr;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
+ /* end of the last record restored from the archive */
+ XLogRecPtr restoreLastRecPtr;
/* Are we requested to pause recovery? */
bool recoveryPause;
@@ -612,6 +614,7 @@ static void CheckRequiredParameterValues(void);
static void XLogReportParameters(void);
static void LocalSetXLogInsertAllowed(void);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static void KeepLogSeg(XLogRecPtr recptr, uint32 *logId, uint32 *logSeg);
static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
XLogRecPtr *lsn, BkpBlock *bkpb);
@@ -2729,6 +2732,61 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
elog(ERROR, "invalid XLogFileRead source %d", source);
}
+ /*
+ * If the segment was fetched from archival storage, replace
+ * the existing xlog segment (if any) with the archival version.
+ */
+ if (source == XLOG_FROM_ARCHIVE)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr endptr;
+ char xlogfpath[MAXPGPATH];
+ bool reload = false;
+ struct stat statbuf;
+
+ XLogFilePath(xlogfpath, tli, log, seg);
+ if (stat(xlogfpath, &statbuf) == 0)
+ {
+ if (unlink(xlogfpath) != 0)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg("could not remove file \"%s\": %m",
+ xlogfpath)));
+ reload = true;
+ }
+
+ if (rename(path, xlogfpath) < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rename file \"%s\" to \"%s\": %m",
+ path, xlogfpath)));
+
+ /*
+ * If the existing segment was replaced, since walsenders might have
+ * it open, request them to reload a currently-open segment.
+ */
+ if (reload)
+ WalSndRqstFileReload();
+
+ /*
+ * Calculate the end location of the restored WAL file and save it in
+ * shmem. It's used as current standby flush position, and cascading
+ * walsenders try to send WAL records up to this location.
+ */
+ endptr.xlogid = log;
+ endptr.xrecoff = seg * XLogSegSize;
+ XLByteAdvance(endptr, XLogSegSize);
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->restoreLastRecPtr = endptr;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ /* Signal walsender that new WAL has arrived */
+ if (AllowCascadeReplication())
+ WalSndWakeup();
+ }
+
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (fd >= 0)
{
@@ -3361,18 +3419,7 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
{
- /*
- * Normally we don't delete old XLOG files during recovery to
- * avoid accidentally deleting a file that looks stale due to a
- * bug or hardware issue, but in fact contains important data.
- * During streaming recovery, however, we will eventually fill the
- * disk if we never clean up, so we have to. That's not an issue
- * with file-based archive recovery because in that case we
- * restore one XLOG file at a time, on-demand, and with a
- * different filename that can't be confused with regular XLOG
- * files.
- */
- if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name))
+ if (RecoveryInProgress() || XLogArchiveCheckDone(xlde->d_name))
{
snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
@@ -5484,62 +5531,23 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
}
/*
- * If the segment was fetched from archival storage, we want to replace
- * the existing xlog segment (if any) with the archival version. This is
- * because whatever is in XLOGDIR is very possibly older than what we have
- * from the archives, since it could have come from restoring a PGDATA
- * backup. In any case, the archival version certainly is more
- * descriptive of what our current database state is, because that is what
- * we replayed from.
+ * If we are establishing a new timeline, we have to copy data from
+ * the last WAL segment of the old timeline to create a starting WAL
+ * segment for the new timeline.
*
- * Note that if we are establishing a new timeline, ThisTimeLineID is
- * already set to the new value, and so we will create a new file instead
- * of overwriting any existing file. (This is, in fact, always the case
- * at present.)
+ * Notify the archiver that the last WAL segment of the old timeline
+ * is ready to copy to archival storage. Otherwise, it is not archived
+ * for a while.
*/
- snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
- XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
-
- if (restoredFromArchive)
+ if (endTLI != ThisTimeLineID)
{
- ereport(DEBUG3,
- (errmsg_internal("moving last restored xlog to \"%s\"",
- xlogpath)));
- unlink(xlogpath); /* might or might not exist */
- if (rename(recoveryPath, xlogpath) != 0)
- ereport(FATAL,
- (errcode_for_file_access(),
- errmsg("could not rename file \"%s\" to \"%s\": %m",
- recoveryPath, xlogpath)));
- /* XXX might we need to fix permissions on the file? */
- }
- else
- {
- /*
- * If the latest segment is not archival, but there's still a
- * RECOVERYXLOG laying about, get rid of it.
- */
- unlink(recoveryPath); /* ignore any error */
+ XLogFileCopy(endLogId, endLogSeg,
+ endTLI, endLogId, endLogSeg);
- /*
- * If we are establishing a new timeline, we have to copy data from
- * the last WAL segment of the old timeline to create a starting WAL
- * segment for the new timeline.
- *
- * Notify the archiver that the last WAL segment of the old timeline
- * is ready to copy to archival storage. Otherwise, it is not archived
- * for a while.
- */
- if (endTLI != ThisTimeLineID)
+ if (XLogArchivingActive())
{
- XLogFileCopy(endLogId, endLogSeg,
- endTLI, endLogId, endLogSeg);
-
- if (XLogArchivingActive())
- {
- XLogFileName(xlogpath, endTLI, endLogId, endLogSeg);
- XLogArchiveNotify(xlogpath);
- }
+ XLogFileName(xlogpath, endTLI, endLogId, endLogSeg);
+ XLogArchiveNotify(xlogpath);
}
}
@@ -5550,6 +5558,13 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
XLogArchiveCleanup(xlogpath);
+ /*
+ * Since there might be a partial WAL segment named RECOVERYXLOG,
+ * get rid of it.
+ */
+ snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
+ unlink(recoveryPath); /* ignore any error */
+
/* Get rid of any remaining recovered timeline-history file, too */
snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
unlink(recoveryPath); /* ignore any error */
@@ -7871,46 +7886,7 @@ CreateCheckPoint(int flags)
*/
if (_logId || _logSeg)
{
- /*
- * Calculate the last segment that we need to retain because of
- * wal_keep_segments, by subtracting wal_keep_segments from the new
- * checkpoint location.
- */
- if (wal_keep_segments > 0)
- {
- uint32 log;
- uint32 seg;
- int d_log;
- int d_seg;
-
- XLByteToSeg(recptr, log, seg);
-
- d_seg = wal_keep_segments % XLogSegsPerFile;
- d_log = wal_keep_segments / XLogSegsPerFile;
- if (seg < d_seg)
- {
- d_log += 1;
- seg = seg - d_seg + XLogSegsPerFile;
- }
- else
- seg = seg - d_seg;
- /* avoid underflow, don't go below (0,1) */
- if (log < d_log || (log == d_log && seg == 0))
- {
- log = 0;
- seg = 1;
- }
- else
- log = log - d_log;
-
- /* don't delete WAL segments newer than the calculated segment */
- if (log < _logId || (log == _logId && seg < _logSeg))
- {
- _logId = log;
- _logSeg = seg;
- }
- }
-
+ KeepLogSeg(recptr, &_logId, &_logSeg);
PrevLogSeg(_logId, _logSeg);
RemoveOldXlogFiles(_logId, _logSeg, recptr);
}
@@ -8151,17 +8127,16 @@ CreateRestartPoint(int flags)
/*
* Delete old log files (those no longer needed even for previous
* checkpoint/restartpoint) to prevent the disk holding the xlog from
- * growing full. We don't need do this during normal recovery, but during
- * streaming recovery we have to or the disk will eventually fill up from
- * old log files streamed from master.
+ * growing full.
*/
- if (WalRcvInProgress() && (_logId || _logSeg))
+ if (_logId || _logSeg)
{
XLogRecPtr endptr;
/* Get the current (or recent) end of xlog */
- endptr = GetWalRcvWriteRecPtr(NULL);
+ endptr = GetStandbyFlushRecPtr();
+ KeepLogSeg(endptr, &_logId, &_logSeg);
PrevLogSeg(_logId, _logSeg);
RemoveOldXlogFiles(_logId, _logSeg, endptr);
@@ -8207,6 +8182,50 @@ CreateRestartPoint(int flags)
}
/*
+ * Calculate the last segment that we need to retain because of
+ * wal_keep_segments, by subtracting wal_keep_segments from
+ * the given xlog location, recptr.
+ */
+static void
+KeepLogSeg(XLogRecPtr recptr, uint32 *logId, uint32 *logSeg)
+{
+ uint32 log;
+ uint32 seg;
+ int d_log;
+ int d_seg;
+
+ if (wal_keep_segments == 0)
+ return;
+
+ XLByteToSeg(recptr, log, seg);
+
+ d_seg = wal_keep_segments % XLogSegsPerFile;
+ d_log = wal_keep_segments / XLogSegsPerFile;
+ if (seg < d_seg)
+ {
+ d_log += 1;
+ seg = seg - d_seg + XLogSegsPerFile;
+ }
+ else
+ seg = seg - d_seg;
+ /* avoid underflow, don't go below (0,1) */
+ if (log < d_log || (log == d_log && seg == 0))
+ {
+ log = 0;
+ seg = 1;
+ }
+ else
+ log = log - d_log;
+
+ /* don't delete WAL segments newer than the calculated segment */
+ if (log < *logId || (log == *logId && seg < *logSeg))
+ {
+ *logId = log;
+ *logSeg = seg;
+ }
+}
+
+/*
* Write a NEXTOID log record
*/
void
@@ -9549,10 +9568,14 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
/*
* Get latest redo apply position.
*
+ * Optionally, returns the end byte position of the last restored
+ * WAL segment. Callers not interested in that value may pass
+ * NULL for restoreLastRecPtr.
+ *
* Exported to allow WALReceiver to read the pointer directly.
*/
XLogRecPtr
-GetXLogReplayRecPtr(void)
+GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
@@ -9560,12 +9583,34 @@ GetXLogReplayRecPtr(void)
SpinLockAcquire(&xlogctl->info_lck);
recptr = xlogctl->recoveryLastRecPtr;
+ if (restoreLastRecPtr)
+ *restoreLastRecPtr = xlogctl->restoreLastRecPtr;
SpinLockRelease(&xlogctl->info_lck);
return recptr;
}
/*
+ * Get current standby flush position, ie, the last WAL position
+ * known to be fsync'd to disk in standby.
+ */
+XLogRecPtr
+GetStandbyFlushRecPtr(void)
+{
+ XLogRecPtr receivePtr;
+ XLogRecPtr replayPtr;
+ XLogRecPtr restorePtr;
+
+ receivePtr = GetWalRcvWriteRecPtr(NULL);
+ replayPtr = GetXLogReplayRecPtr(&restorePtr);
+
+ if (XLByteLT(receivePtr, replayPtr))
+ return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr;
+ else
+ return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr;
+}
+
+/*
* Report the last WAL replay location (same format as pg_start_backup etc)
*
* This is useful for determining how much of WAL is visible to read-only
@@ -9577,7 +9622,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
XLogRecPtr recptr;
char location[MAXFNAMELEN];
- recptr = GetXLogReplayRecPtr();
+ recptr = GetXLogReplayRecPtr(NULL);
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL();
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 9bcbf212f8c..412bc96465c 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2317,6 +2317,26 @@ reaper(SIGNAL_ARGS)
pmState = PM_RUN;
/*
+ * Kill any walsenders to force the downstream standby(s) to
+ * reread the timeline history file, adjust their timelines and
+ * establish replication connections again. This is required
+ * because the timeline of cascading standby is not consistent
+ * with that of cascaded one just after failover. We LOG this
+ * message since we need to leave a record to explain this
+ * disconnection.
+ *
+ * XXX should avoid the need for disconnection. When we do,
+ * am_cascading_walsender should be replaced with RecoveryInProgress()
+ */
+ if (max_wal_senders > 0)
+ {
+ ereport(LOG,
+ (errmsg("terminating walsender all processes to force cascaded"
+ "standby(s) to update timeline and reconnect")));
+ SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND);
+ }
+
+ /*
* Crank up the background writer, if we didn't do that already
* when we entered consistent recovery state. It doesn't matter
* if this fails, we'll just try again later.
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index bcde19c71b6..74d28440bf4 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -339,6 +339,11 @@ SendBaseBackup(BaseBackupCmd *cmd)
MemoryContext old_context;
basebackup_options opt;
+ if (am_cascading_walsender)
+ ereport(FATAL,
+ (errcode(ERRCODE_CANNOT_CONNECT_NOW),
+ errmsg("recovery is still in progress, can't accept WAL streaming connections for backup")));
+
parse_basebackup_options(cmd->options, &opt);
backup_context = AllocSetContextCreate(CurrentMemoryContext,
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index b73d225a8ef..32db2bc4c52 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -469,6 +469,13 @@ SyncRepGetStandbyPriority(void)
int priority = 0;
bool found = false;
+ /*
+ * Since synchronous cascade replication is not allowed, we always
+ * set the priority of cascading walsender to zero.
+ */
+ if (am_cascading_walsender)
+ return 0;
+
/* Need a modifiable copy of string */
rawstring = pstrdup(SyncRepStandbyNames);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ea6f6cdcdaf..c24fa87394d 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -44,6 +44,7 @@
#include "miscadmin.h"
#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
+#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
@@ -564,8 +565,10 @@ XLogWalRcvFlush(bool dying)
}
SpinLockRelease(&walrcv->mutex);
- /* Signal the startup process that new WAL has arrived */
+ /* Signal the startup process and walsender that new WAL has arrived */
WakeupRecovery();
+ if (AllowCascadeReplication())
+ WalSndWakeup();
/* Report XLOG streaming progress in PS display */
if (update_process_title)
@@ -625,7 +628,7 @@ XLogWalRcvSendReply(void)
/* Construct a new message */
reply_message.write = LogstreamResult.Write;
reply_message.flush = LogstreamResult.Flush;
- reply_message.apply = GetXLogReplayRecPtr();
+ reply_message.apply = GetXLogReplayRecPtr(NULL);
reply_message.sendTime = now;
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc5b3300d23..63a63048dbb 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -48,6 +48,7 @@
#include "replication/basebackup.h"
#include "replication/replnodes.h"
#include "replication/walprotocol.h"
+#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -70,6 +71,7 @@ WalSnd *MyWalSnd = NULL;
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
+bool am_cascading_walsender = false; /* Am I cascading WAL to another standby ? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
@@ -135,10 +137,7 @@ WalSenderMain(void)
{
MemoryContext walsnd_context;
- if (RecoveryInProgress())
- ereport(FATAL,
- (errcode(ERRCODE_CANNOT_CONNECT_NOW),
- errmsg("recovery is still in progress, can't accept WAL streaming connections")));
+ am_cascading_walsender = RecoveryInProgress();
/* Create a per-walsender data structure in shared memory */
InitWalSnd();
@@ -165,6 +164,12 @@ WalSenderMain(void)
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
+ /*
+ * Use the recovery target timeline ID during recovery
+ */
+ if (am_cascading_walsender)
+ ThisTimeLineID = GetRecoveryTargetTLI();
+
/* Tell the standby that walsender is ready for receiving commands */
ReadyForQuery(DestRemote);
@@ -290,7 +295,7 @@ IdentifySystem(void)
GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
- logptr = GetInsertRecPtr();
+ logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
snprintf(xpos, sizeof(xpos), "%X/%X",
logptr.xlogid, logptr.xrecoff);
@@ -364,19 +369,13 @@ StartReplication(StartReplicationCmd *cmd)
SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
/*
- * Check that we're logging enough information in the WAL for
- * log-shipping.
+ * We assume here that we're logging enough information in the WAL for
+ * log-shipping, since this is checked in PostmasterMain().
*
- * NOTE: This only checks the current value of wal_level. Even if the
- * current setting is not 'minimal', there can be old WAL in the pg_xlog
- * directory that was created with 'minimal'. So this is not bulletproof,
- * the purpose is just to give a user-friendly error message that hints
- * how to configure the system correctly.
+ * NOTE: wal_level can only change at shutdown, so in most cases it is
+ * difficult for there to be WAL data that we can still see that was written
+ * at wal_level='minimal'.
*/
- if (wal_level == WAL_LEVEL_MINIMAL)
- ereport(FATAL,
- (errcode(ERRCODE_CANNOT_CONNECT_NOW),
- errmsg("standby connections not allowed because wal_level=minimal")));
/*
* When we first start replication the standby will be behind the primary.
@@ -601,7 +600,8 @@ ProcessStandbyReplyMessage(void)
SpinLockRelease(&walsnd->mutex);
}
- SyncRepReleaseWaiters();
+ if (!am_cascading_walsender)
+ SyncRepReleaseWaiters();
}
/*
@@ -764,6 +764,8 @@ WalSndLoop(void)
/*
* When SIGUSR2 arrives, we send any outstanding logs up to the
* shutdown checkpoint record (i.e., the latest record) and exit.
+ * This may be a normal termination at shutdown, or a promotion,
+ * the walsender is not sure which.
*/
if (walsender_ready_to_stop && !pq_is_send_pending())
{
@@ -933,7 +935,7 @@ WalSndKill(int code, Datum arg)
}
/*
- * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
*
* XXX probably this should be improved to suck data directly from the
* WAL buffers when possible.
@@ -944,15 +946,21 @@ WalSndKill(int code, Datum arg)
* more than one.
*/
void
-XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
+XLogRead(char *buf, XLogRecPtr startptr, Size count)
{
- XLogRecPtr startRecPtr = recptr;
- char path[MAXPGPATH];
+ char *p;
+ XLogRecPtr recptr;
+ Size nbytes;
uint32 lastRemovedLog;
uint32 lastRemovedSeg;
uint32 log;
uint32 seg;
+retry:
+ p = buf;
+ recptr = startptr;
+ nbytes = count;
+
while (nbytes > 0)
{
uint32 startoff;
@@ -963,6 +971,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
{
+ char path[MAXPGPATH];
+
/* Switch to another logfile segment */
if (sendFile >= 0)
close(sendFile);
@@ -1014,7 +1024,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
else
segbytes = nbytes;
- readbytes = read(sendFile, buf, segbytes);
+ readbytes = read(sendFile, p, segbytes);
if (readbytes <= 0)
ereport(ERROR,
(errcode_for_file_access(),
@@ -1027,7 +1037,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
sendOff += readbytes;
nbytes -= readbytes;
- buf += readbytes;
+ p += readbytes;
}
/*
@@ -1038,7 +1048,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
* already have been overwritten with new WAL records.
*/
XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
- XLByteToSeg(startRecPtr, log, seg);
+ XLByteToSeg(startptr, log, seg);
if (log < lastRemovedLog ||
(log == lastRemovedLog && seg <= lastRemovedSeg))
{
@@ -1050,6 +1060,32 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
errmsg("requested WAL segment %s has already been removed",
filename)));
}
+
+ /*
+ * During recovery, the currently-open WAL file might be replaced with
+ * the file of the same name retrieved from archive. So we always need
+ * to check what we read was valid after reading into the buffer. If it's
+ * invalid, we try to open and read the file again.
+ */
+ if (am_cascading_walsender)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+ bool reload;
+
+ SpinLockAcquire(&walsnd->mutex);
+ reload = walsnd->needreload;
+ walsnd->needreload = false;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (reload && sendFile >= 0)
+ {
+ close(sendFile);
+ sendFile = -1;
+
+ goto retry;
+ }
+ }
}
/*
@@ -1082,7 +1118,7 @@ XLogSend(char *msgbuf, bool *caughtup)
* subsequently crashes and restarts, slaves must not have applied any WAL
* that gets lost on the master.
*/
- SendRqstPtr = GetFlushRecPtr();
+ SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
/* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr))
@@ -1187,6 +1223,28 @@ XLogSend(char *msgbuf, bool *caughtup)
return;
}
+/*
+ * Request walsenders to reload the currently-open WAL file
+ */
+void
+WalSndRqstFileReload(void)
+{
+ int i;
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ if (walsnd->pid == 0)
+ continue;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->needreload = true;
+ SpinLockRelease(&walsnd->mutex);
+ }
+}
+
/* SIGHUP: set flag to re-read config file at next convenient time */
static void
WalSndSigHupHandler(SIGNAL_ARGS)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 7056fd61891..cdbf63fa76e 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -221,6 +221,9 @@ extern int wal_level;
/* Do we need to WAL-log information required only for Hot Standby? */
#define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY)
+/* Can we allow the standby to accept replication connection from another standby? */
+#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
+
#ifdef WAL_DEBUG
extern bool XLOG_DEBUG;
#endif
@@ -292,7 +295,8 @@ extern bool RecoveryInProgress(void);
extern bool HotStandbyActive(void);
extern bool XLogInsertAllowed(void);
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
-extern XLogRecPtr GetXLogReplayRecPtr(void);
+extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr);
+extern XLogRecPtr GetStandbyFlushRecPtr(void);
extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void);
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 6ee8668d0a4..cb8e70ef38a 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -35,6 +35,7 @@ typedef struct WalSnd
pid_t pid; /* this walsender's process id, or 0 */
WalSndState state; /* this walsender's state */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
+ bool needreload; /* does currently-open file need to be reloaded? */
/*
* The xlog locations that have been written, flushed, and applied by
@@ -92,6 +93,7 @@ extern WalSndCtlData *WalSndCtl;
/* global state */
extern bool am_walsender;
+extern bool am_cascading_walsender;
extern volatile sig_atomic_t walsender_shutdown_requested;
extern volatile sig_atomic_t walsender_ready_to_stop;
@@ -106,7 +108,8 @@ extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
extern void WalSndWakeup(void);
extern void WalSndSetState(WalSndState state);
-extern void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
+extern void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+extern void WalSndRqstFileReload(void);
extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);