diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/transam/xlog.c | 265 | ||||
-rw-r--r-- | src/backend/postmaster/postmaster.c | 20 | ||||
-rw-r--r-- | src/backend/replication/basebackup.c | 5 | ||||
-rw-r--r-- | src/backend/replication/syncrep.c | 7 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 7 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 108 | ||||
-rw-r--r-- | src/include/access/xlog.h | 6 | ||||
-rw-r--r-- | src/include/replication/walsender.h | 5 |
8 files changed, 284 insertions, 139 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 662b26bc27d..6a6959f728c 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -446,6 +446,8 @@ typedef struct XLogCtlData XLogRecPtr recoveryLastRecPtr; /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ TimestampTz recoveryLastXTime; + /* end of the last record restored from the archive */ + XLogRecPtr restoreLastRecPtr; /* Are we requested to pause recovery? */ bool recoveryPause; @@ -612,6 +614,7 @@ static void CheckRequiredParameterValues(void); static void XLogReportParameters(void); static void LocalSetXLogInsertAllowed(void); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); +static void KeepLogSeg(XLogRecPtr recptr, uint32 *logId, uint32 *logSeg); static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites, XLogRecPtr *lsn, BkpBlock *bkpb); @@ -2729,6 +2732,61 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, elog(ERROR, "invalid XLogFileRead source %d", source); } + /* + * If the segment was fetched from archival storage, replace + * the existing xlog segment (if any) with the archival version. + */ + if (source == XLOG_FROM_ARCHIVE) + { + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr endptr; + char xlogfpath[MAXPGPATH]; + bool reload = false; + struct stat statbuf; + + XLogFilePath(xlogfpath, tli, log, seg); + if (stat(xlogfpath, &statbuf) == 0) + { + if (unlink(xlogfpath) != 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", + xlogfpath))); + reload = true; + } + + if (rename(path, xlogfpath) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rename file \"%s\" to \"%s\": %m", + path, xlogfpath))); + + /* + * If the existing segment was replaced, since walsenders might have + * it open, request them to reload a currently-open segment. 
+ */ + if (reload) + WalSndRqstFileReload(); + + /* + * Calculate the end location of the restored WAL file and save it in + * shmem. It's used as current standby flush position, and cascading + * walsenders try to send WAL records up to this location. + */ + endptr.xlogid = log; + endptr.xrecoff = seg * XLogSegSize; + XLByteAdvance(endptr, XLogSegSize); + + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->restoreLastRecPtr = endptr; + SpinLockRelease(&xlogctl->info_lck); + + /* Signal walsender that new WAL has arrived */ + if (AllowCascadeReplication()) + WalSndWakeup(); + } + fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); if (fd >= 0) { @@ -3361,18 +3419,7 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr) strspn(xlde->d_name, "0123456789ABCDEF") == 24 && strcmp(xlde->d_name + 8, lastoff + 8) <= 0) { - /* - * Normally we don't delete old XLOG files during recovery to - * avoid accidentally deleting a file that looks stale due to a - * bug or hardware issue, but in fact contains important data. - * During streaming recovery, however, we will eventually fill the - * disk if we never clean up, so we have to. That's not an issue - * with file-based archive recovery because in that case we - * restore one XLOG file at a time, on-demand, and with a - * different filename that can't be confused with regular XLOG - * files. - */ - if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name)) + if (RecoveryInProgress() || XLogArchiveCheckDone(xlde->d_name)) { snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name); @@ -5484,62 +5531,23 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg) } /* - * If the segment was fetched from archival storage, we want to replace - * the existing xlog segment (if any) with the archival version. This is - * because whatever is in XLOGDIR is very possibly older than what we have - * from the archives, since it could have come from restoring a PGDATA - * backup. 
In any case, the archival version certainly is more - * descriptive of what our current database state is, because that is what - * we replayed from. + * If we are establishing a new timeline, we have to copy data from + * the last WAL segment of the old timeline to create a starting WAL + * segment for the new timeline. * - * Note that if we are establishing a new timeline, ThisTimeLineID is - * already set to the new value, and so we will create a new file instead - * of overwriting any existing file. (This is, in fact, always the case - * at present.) + * Notify the archiver that the last WAL segment of the old timeline + * is ready to copy to archival storage. Otherwise, it is not archived + * for a while. */ - snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG"); - XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg); - - if (restoredFromArchive) + if (endTLI != ThisTimeLineID) { - ereport(DEBUG3, - (errmsg_internal("moving last restored xlog to \"%s\"", - xlogpath))); - unlink(xlogpath); /* might or might not exist */ - if (rename(recoveryPath, xlogpath) != 0) - ereport(FATAL, - (errcode_for_file_access(), - errmsg("could not rename file \"%s\" to \"%s\": %m", - recoveryPath, xlogpath))); - /* XXX might we need to fix permissions on the file? */ - } - else - { - /* - * If the latest segment is not archival, but there's still a - * RECOVERYXLOG laying about, get rid of it. - */ - unlink(recoveryPath); /* ignore any error */ + XLogFileCopy(endLogId, endLogSeg, + endTLI, endLogId, endLogSeg); - /* - * If we are establishing a new timeline, we have to copy data from - * the last WAL segment of the old timeline to create a starting WAL - * segment for the new timeline. - * - * Notify the archiver that the last WAL segment of the old timeline - * is ready to copy to archival storage. Otherwise, it is not archived - * for a while. 
- */ - if (endTLI != ThisTimeLineID) + if (XLogArchivingActive()) { - XLogFileCopy(endLogId, endLogSeg, - endTLI, endLogId, endLogSeg); - - if (XLogArchivingActive()) - { - XLogFileName(xlogpath, endTLI, endLogId, endLogSeg); - XLogArchiveNotify(xlogpath); - } + XLogFileName(xlogpath, endTLI, endLogId, endLogSeg); + XLogArchiveNotify(xlogpath); } } @@ -5550,6 +5558,13 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg) XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg); XLogArchiveCleanup(xlogpath); + /* + * Since there might be a partial WAL segment named RECOVERYXLOG, + * get rid of it. + */ + snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG"); + unlink(recoveryPath); /* ignore any error */ + /* Get rid of any remaining recovered timeline-history file, too */ snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY"); unlink(recoveryPath); /* ignore any error */ @@ -7871,46 +7886,7 @@ CreateCheckPoint(int flags) */ if (_logId || _logSeg) { - /* - * Calculate the last segment that we need to retain because of - * wal_keep_segments, by subtracting wal_keep_segments from the new - * checkpoint location. 
- */ - if (wal_keep_segments > 0) - { - uint32 log; - uint32 seg; - int d_log; - int d_seg; - - XLByteToSeg(recptr, log, seg); - - d_seg = wal_keep_segments % XLogSegsPerFile; - d_log = wal_keep_segments / XLogSegsPerFile; - if (seg < d_seg) - { - d_log += 1; - seg = seg - d_seg + XLogSegsPerFile; - } - else - seg = seg - d_seg; - /* avoid underflow, don't go below (0,1) */ - if (log < d_log || (log == d_log && seg == 0)) - { - log = 0; - seg = 1; - } - else - log = log - d_log; - - /* don't delete WAL segments newer than the calculated segment */ - if (log < _logId || (log == _logId && seg < _logSeg)) - { - _logId = log; - _logSeg = seg; - } - } - + KeepLogSeg(recptr, &_logId, &_logSeg); PrevLogSeg(_logId, _logSeg); RemoveOldXlogFiles(_logId, _logSeg, recptr); } @@ -8151,17 +8127,16 @@ CreateRestartPoint(int flags) /* * Delete old log files (those no longer needed even for previous * checkpoint/restartpoint) to prevent the disk holding the xlog from - * growing full. We don't need do this during normal recovery, but during - * streaming recovery we have to or the disk will eventually fill up from - * old log files streamed from master. + * growing full. */ - if (WalRcvInProgress() && (_logId || _logSeg)) + if (_logId || _logSeg) { XLogRecPtr endptr; /* Get the current (or recent) end of xlog */ - endptr = GetWalRcvWriteRecPtr(NULL); + endptr = GetStandbyFlushRecPtr(); + KeepLogSeg(endptr, &_logId, &_logSeg); PrevLogSeg(_logId, _logSeg); RemoveOldXlogFiles(_logId, _logSeg, endptr); @@ -8207,6 +8182,50 @@ CreateRestartPoint(int flags) } /* + * Calculate the last segment that we need to retain because of + * wal_keep_segments, by subtracting wal_keep_segments from + * the given xlog location, recptr. 
+ */ +static void +KeepLogSeg(XLogRecPtr recptr, uint32 *logId, uint32 *logSeg) +{ + uint32 log; + uint32 seg; + int d_log; + int d_seg; + + if (wal_keep_segments == 0) + return; + + XLByteToSeg(recptr, log, seg); + + d_seg = wal_keep_segments % XLogSegsPerFile; + d_log = wal_keep_segments / XLogSegsPerFile; + if (seg < d_seg) + { + d_log += 1; + seg = seg - d_seg + XLogSegsPerFile; + } + else + seg = seg - d_seg; + /* avoid underflow, don't go below (0,1) */ + if (log < d_log || (log == d_log && seg == 0)) + { + log = 0; + seg = 1; + } + else + log = log - d_log; + + /* don't delete WAL segments newer than the calculated segment */ + if (log < *logId || (log == *logId && seg < *logSeg)) + { + *logId = log; + *logSeg = seg; + } +} + +/* * Write a NEXTOID log record */ void @@ -9549,10 +9568,14 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS) /* * Get latest redo apply position. * + * Optionally, returns the end byte position of the last restored + * WAL segment. Callers not interested in that value may pass + * NULL for restoreLastRecPtr. + * * Exported to allow WALReceiver to read the pointer directly. */ XLogRecPtr -GetXLogReplayRecPtr(void) +GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; @@ -9560,12 +9583,34 @@ GetXLogReplayRecPtr(void) SpinLockAcquire(&xlogctl->info_lck); recptr = xlogctl->recoveryLastRecPtr; + if (restoreLastRecPtr) + *restoreLastRecPtr = xlogctl->restoreLastRecPtr; SpinLockRelease(&xlogctl->info_lck); return recptr; } /* + * Get current standby flush position, ie, the last WAL position + * known to be fsync'd to disk in standby. + */ +XLogRecPtr +GetStandbyFlushRecPtr(void) +{ + XLogRecPtr receivePtr; + XLogRecPtr replayPtr; + XLogRecPtr restorePtr; + + receivePtr = GetWalRcvWriteRecPtr(NULL); + replayPtr = GetXLogReplayRecPtr(&restorePtr); + + if (XLByteLT(receivePtr, replayPtr)) + return XLByteLT(replayPtr, restorePtr) ? 
restorePtr : replayPtr; + else + return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr; +} + +/* * Report the last WAL replay location (same format as pg_start_backup etc) * * This is useful for determining how much of WAL is visible to read-only @@ -9577,7 +9622,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS) XLogRecPtr recptr; char location[MAXFNAMELEN]; - recptr = GetXLogReplayRecPtr(); + recptr = GetXLogReplayRecPtr(NULL); if (recptr.xlogid == 0 && recptr.xrecoff == 0) PG_RETURN_NULL(); diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 9bcbf212f8c..412bc96465c 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -2317,6 +2317,26 @@ reaper(SIGNAL_ARGS) pmState = PM_RUN; /* + * Kill any walsenders to force the downstream standby(s) to + * reread the timeline history file, adjust their timelines and + * establish replication connections again. This is required + * because the timeline of cascading standby is not consistent + * with that of cascaded one just after failover. We LOG this + * message since we need to leave a record to explain this + * disconnection. + * + * XXX should avoid the need for disconnection. When we do, + * am_cascading_walsender should be replaced with RecoveryInProgress() + */ + if (max_wal_senders > 0) + { + ereport(LOG, + (errmsg("terminating all walsender processes to force cascaded " + "standby(s) to update timeline and reconnect"))); + SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND); + } + + /* * Crank up the background writer, if we didn't do that already * when we entered consistent recovery state. It doesn't matter * if this fails, we'll just try again later. 
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index bcde19c71b6..74d28440bf4 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -339,6 +339,11 @@ SendBaseBackup(BaseBackupCmd *cmd) MemoryContext old_context; basebackup_options opt; + if (am_cascading_walsender) + ereport(FATAL, + (errcode(ERRCODE_CANNOT_CONNECT_NOW), + errmsg("recovery is still in progress, can't accept WAL streaming connections for backup"))); + parse_basebackup_options(cmd->options, &opt); backup_context = AllocSetContextCreate(CurrentMemoryContext, diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index b73d225a8ef..32db2bc4c52 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -469,6 +469,13 @@ SyncRepGetStandbyPriority(void) int priority = 0; bool found = false; + /* + * Since synchronous cascade replication is not allowed, we always + * set the priority of cascading walsender to zero. 
+ */ + if (am_cascading_walsender) + return 0; + /* Need a modifiable copy of string */ rawstring = pstrdup(SyncRepStandbyNames); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ea6f6cdcdaf..c24fa87394d 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -44,6 +44,7 @@ #include "miscadmin.h" #include "replication/walprotocol.h" #include "replication/walreceiver.h" +#include "replication/walsender.h" #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/procarray.h" @@ -564,8 +565,10 @@ XLogWalRcvFlush(bool dying) } SpinLockRelease(&walrcv->mutex); - /* Signal the startup process that new WAL has arrived */ + /* Signal the startup process and walsender that new WAL has arrived */ WakeupRecovery(); + if (AllowCascadeReplication()) + WalSndWakeup(); /* Report XLOG streaming progress in PS display */ if (update_process_title) @@ -625,7 +628,7 @@ XLogWalRcvSendReply(void) /* Construct a new message */ reply_message.write = LogstreamResult.Write; reply_message.flush = LogstreamResult.Flush; - reply_message.apply = GetXLogReplayRecPtr(); + reply_message.apply = GetXLogReplayRecPtr(NULL); reply_message.sendTime = now; elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index bc5b3300d23..63a63048dbb 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -48,6 +48,7 @@ #include "replication/basebackup.h" #include "replication/replnodes.h" #include "replication/walprotocol.h" +#include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -70,6 +71,7 @@ WalSnd *MyWalSnd = NULL; /* Global state */ bool am_walsender = false; /* Am I a walsender process ? */ +bool am_cascading_walsender = false; /* Am I cascading WAL to another standby ? 
*/ /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ @@ -135,10 +137,7 @@ WalSenderMain(void) { MemoryContext walsnd_context; - if (RecoveryInProgress()) - ereport(FATAL, - (errcode(ERRCODE_CANNOT_CONNECT_NOW), - errmsg("recovery is still in progress, can't accept WAL streaming connections"))); + am_cascading_walsender = RecoveryInProgress(); /* Create a per-walsender data structure in shared memory */ InitWalSnd(); @@ -165,6 +164,12 @@ WalSenderMain(void) /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); + /* + * Use the recovery target timeline ID during recovery + */ + if (am_cascading_walsender) + ThisTimeLineID = GetRecoveryTargetTLI(); + /* Tell the standby that walsender is ready for receiving commands */ ReadyForQuery(DestRemote); @@ -290,7 +295,7 @@ IdentifySystem(void) GetSystemIdentifier()); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); - logptr = GetInsertRecPtr(); + logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr(); snprintf(xpos, sizeof(xpos), "%X/%X", logptr.xlogid, logptr.xrecoff); @@ -364,19 +369,13 @@ StartReplication(StartReplicationCmd *cmd) SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); /* - * Check that we're logging enough information in the WAL for - * log-shipping. + * We assume here that we're logging enough information in the WAL for + * log-shipping, since this is checked in PostmasterMain(). * - * NOTE: This only checks the current value of wal_level. Even if the - * current setting is not 'minimal', there can be old WAL in the pg_xlog - * directory that was created with 'minimal'. So this is not bulletproof, - * the purpose is just to give a user-friendly error message that hints - * how to configure the system correctly. 
+ * NOTE: wal_level can only change at shutdown, so in most cases it is + * difficult for there to be WAL data that we can still see that was written + * at wal_level='minimal'. */ - if (wal_level == WAL_LEVEL_MINIMAL) - ereport(FATAL, - (errcode(ERRCODE_CANNOT_CONNECT_NOW), - errmsg("standby connections not allowed because wal_level=minimal"))); /* * When we first start replication the standby will be behind the primary. @@ -601,7 +600,8 @@ ProcessStandbyReplyMessage(void) SpinLockRelease(&walsnd->mutex); } - SyncRepReleaseWaiters(); + if (!am_cascading_walsender) + SyncRepReleaseWaiters(); } /* @@ -764,6 +764,8 @@ WalSndLoop(void) /* * When SIGUSR2 arrives, we send any outstanding logs up to the * shutdown checkpoint record (i.e., the latest record) and exit. + * This may be a normal termination at shutdown, or a promotion, + * the walsender is not sure which. */ if (walsender_ready_to_stop && !pq_is_send_pending()) { @@ -933,7 +935,7 @@ WalSndKill(int code, Datum arg) } /* - * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr' + * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' * * XXX probably this should be improved to suck data directly from the * WAL buffers when possible. @@ -944,15 +946,21 @@ WalSndKill(int code, Datum arg) * more than one. 
*/ void -XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) +XLogRead(char *buf, XLogRecPtr startptr, Size count) { - XLogRecPtr startRecPtr = recptr; - char path[MAXPGPATH]; + char *p; + XLogRecPtr recptr; + Size nbytes; uint32 lastRemovedLog; uint32 lastRemovedSeg; uint32 log; uint32 seg; +retry: + p = buf; + recptr = startptr; + nbytes = count; + while (nbytes > 0) { uint32 startoff; @@ -963,6 +971,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg)) { + char path[MAXPGPATH]; + /* Switch to another logfile segment */ if (sendFile >= 0) close(sendFile); @@ -1014,7 +1024,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) else segbytes = nbytes; - readbytes = read(sendFile, buf, segbytes); + readbytes = read(sendFile, p, segbytes); if (readbytes <= 0) ereport(ERROR, (errcode_for_file_access(), @@ -1027,7 +1037,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) sendOff += readbytes; nbytes -= readbytes; - buf += readbytes; + p += readbytes; } /* @@ -1038,7 +1048,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) * already have been overwritten with new WAL records. */ XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg); - XLByteToSeg(startRecPtr, log, seg); + XLByteToSeg(startptr, log, seg); if (log < lastRemovedLog || (log == lastRemovedLog && seg <= lastRemovedSeg)) { @@ -1050,6 +1060,32 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) errmsg("requested WAL segment %s has already been removed", filename))); } + + /* + * During recovery, the currently-open WAL file might be replaced with + * the file of the same name retrieved from archive. So we always need + * to check what we read was valid after reading into the buffer. If it's + * invalid, we try to open and read the file again. 
+ */ + if (am_cascading_walsender) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + bool reload; + + SpinLockAcquire(&walsnd->mutex); + reload = walsnd->needreload; + walsnd->needreload = false; + SpinLockRelease(&walsnd->mutex); + + if (reload && sendFile >= 0) + { + close(sendFile); + sendFile = -1; + + goto retry; + } + } } /* @@ -1082,7 +1118,7 @@ XLogSend(char *msgbuf, bool *caughtup) * subsequently crashes and restarts, slaves must not have applied any WAL * that gets lost on the master. */ - SendRqstPtr = GetFlushRecPtr(); + SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr(); /* Quick exit if nothing to do */ if (XLByteLE(SendRqstPtr, sentPtr)) @@ -1187,6 +1223,28 @@ XLogSend(char *msgbuf, bool *caughtup) return; } +/* + * Request walsenders to reload the currently-open WAL file + */ +void +WalSndRqstFileReload(void) +{ + int i; + + for (i = 0; i < max_wal_senders; i++) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + if (walsnd->pid == 0) + continue; + + SpinLockAcquire(&walsnd->mutex); + walsnd->needreload = true; + SpinLockRelease(&walsnd->mutex); + } +} + /* SIGHUP: set flag to re-read config file at next convenient time */ static void WalSndSigHupHandler(SIGNAL_ARGS) diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 7056fd61891..cdbf63fa76e 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -221,6 +221,9 @@ extern int wal_level; /* Do we need to WAL-log information required only for Hot Standby? */ #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY) +/* Can we allow the standby to accept replication connection from another standby? 
*/ +#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0) + #ifdef WAL_DEBUG extern bool XLOG_DEBUG; #endif @@ -292,7 +295,8 @@ extern bool RecoveryInProgress(void); extern bool HotStandbyActive(void); extern bool XLogInsertAllowed(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); -extern XLogRecPtr GetXLogReplayRecPtr(void); +extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr); +extern XLogRecPtr GetStandbyFlushRecPtr(void); extern void UpdateControlFile(void); extern uint64 GetSystemIdentifier(void); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 6ee8668d0a4..cb8e70ef38a 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -35,6 +35,7 @@ typedef struct WalSnd pid_t pid; /* this walsender's process id, or 0 */ WalSndState state; /* this walsender's state */ XLogRecPtr sentPtr; /* WAL has been sent up to this point */ + bool needreload; /* does currently-open file need to be reloaded? */ /* * The xlog locations that have been written, flushed, and applied by @@ -92,6 +93,7 @@ extern WalSndCtlData *WalSndCtl; /* global state */ extern bool am_walsender; +extern bool am_cascading_walsender; extern volatile sig_atomic_t walsender_shutdown_requested; extern volatile sig_atomic_t walsender_ready_to_stop; @@ -106,7 +108,8 @@ extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); extern void WalSndWakeup(void); extern void WalSndSetState(WalSndState state); -extern void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); +extern void XLogRead(char *buf, XLogRecPtr startptr, Size count); +extern void WalSndRqstFileReload(void); extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS); |