aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
authorSimon Riggs <simon@2ndQuadrant.com>2011-07-19 03:40:03 +0100
committerSimon Riggs <simon@2ndQuadrant.com>2011-07-19 03:40:03 +0100
commit5286105800c7d5902f98f32e11b209c471c0c69c (patch)
tree59a5793296a3af901f864a16748e944edbd900ac /src/backend/access/transam/xlog.c
parent3d4890c0c5d27dfdf7d1a8816d7bdcdba3c39d21 (diff)
downloadpostgresql-5286105800c7d5902f98f32e11b209c471c0c69c.tar.gz
postgresql-5286105800c7d5902f98f32e11b209c471c0c69c.zip
Cascading replication feature for streaming log-based replication.
Standby servers can now have WALSender processes, which can work with either WALReceiver or archive_commands to pass data. Fully updated docs, including new conceptual terms of sending server, upstream and downstream servers. WALSenders terminated when promote to master. Fujii Masao, review, rework and doc rewrite by Simon Riggs
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c265
1 files changed, 155 insertions, 110 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 662b26bc27d..6a6959f728c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -446,6 +446,8 @@ typedef struct XLogCtlData
XLogRecPtr recoveryLastRecPtr;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
+ /* end of the last record restored from the archive */
+ XLogRecPtr restoreLastRecPtr;
/* Are we requested to pause recovery? */
bool recoveryPause;
@@ -612,6 +614,7 @@ static void CheckRequiredParameterValues(void);
static void XLogReportParameters(void);
static void LocalSetXLogInsertAllowed(void);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static void KeepLogSeg(XLogRecPtr recptr, uint32 *logId, uint32 *logSeg);
static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
XLogRecPtr *lsn, BkpBlock *bkpb);
@@ -2729,6 +2732,61 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
elog(ERROR, "invalid XLogFileRead source %d", source);
}
+ /*
+ * If the segment was fetched from archival storage, replace
+ * the existing xlog segment (if any) with the archival version.
+ */
+ if (source == XLOG_FROM_ARCHIVE)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr endptr;
+ char xlogfpath[MAXPGPATH];
+ bool reload = false;
+ struct stat statbuf;
+
+ XLogFilePath(xlogfpath, tli, log, seg);
+ if (stat(xlogfpath, &statbuf) == 0)
+ {
+ if (unlink(xlogfpath) != 0)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg("could not remove file \"%s\": %m",
+ xlogfpath)));
+ reload = true;
+ }
+
+ if (rename(path, xlogfpath) < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rename file \"%s\" to \"%s\": %m",
+ path, xlogfpath)));
+
+ /*
+ * If the existing segment was replaced, since walsenders might have
+ * it open, request them to reload a currently-open segment.
+ */
+ if (reload)
+ WalSndRqstFileReload();
+
+ /*
+ * Calculate the end location of the restored WAL file and save it in
+ * shmem. It's used as current standby flush position, and cascading
+ * walsenders try to send WAL records up to this location.
+ */
+ endptr.xlogid = log;
+ endptr.xrecoff = seg * XLogSegSize;
+ XLByteAdvance(endptr, XLogSegSize);
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->restoreLastRecPtr = endptr;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ /* Signal walsender that new WAL has arrived */
+ if (AllowCascadeReplication())
+ WalSndWakeup();
+ }
+
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (fd >= 0)
{
@@ -3361,18 +3419,7 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
{
- /*
- * Normally we don't delete old XLOG files during recovery to
- * avoid accidentally deleting a file that looks stale due to a
- * bug or hardware issue, but in fact contains important data.
- * During streaming recovery, however, we will eventually fill the
- * disk if we never clean up, so we have to. That's not an issue
- * with file-based archive recovery because in that case we
- * restore one XLOG file at a time, on-demand, and with a
- * different filename that can't be confused with regular XLOG
- * files.
- */
- if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name))
+ if (RecoveryInProgress() || XLogArchiveCheckDone(xlde->d_name))
{
snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
@@ -5484,62 +5531,23 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
}
/*
- * If the segment was fetched from archival storage, we want to replace
- * the existing xlog segment (if any) with the archival version. This is
- * because whatever is in XLOGDIR is very possibly older than what we have
- * from the archives, since it could have come from restoring a PGDATA
- * backup. In any case, the archival version certainly is more
- * descriptive of what our current database state is, because that is what
- * we replayed from.
+ * If we are establishing a new timeline, we have to copy data from
+ * the last WAL segment of the old timeline to create a starting WAL
+ * segment for the new timeline.
*
- * Note that if we are establishing a new timeline, ThisTimeLineID is
- * already set to the new value, and so we will create a new file instead
- * of overwriting any existing file. (This is, in fact, always the case
- * at present.)
+ * Notify the archiver that the last WAL segment of the old timeline
+ * is ready to copy to archival storage. Otherwise, it is not archived
+ * for a while.
*/
- snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
- XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
-
- if (restoredFromArchive)
+ if (endTLI != ThisTimeLineID)
{
- ereport(DEBUG3,
- (errmsg_internal("moving last restored xlog to \"%s\"",
- xlogpath)));
- unlink(xlogpath); /* might or might not exist */
- if (rename(recoveryPath, xlogpath) != 0)
- ereport(FATAL,
- (errcode_for_file_access(),
- errmsg("could not rename file \"%s\" to \"%s\": %m",
- recoveryPath, xlogpath)));
- /* XXX might we need to fix permissions on the file? */
- }
- else
- {
- /*
- * If the latest segment is not archival, but there's still a
- * RECOVERYXLOG laying about, get rid of it.
- */
- unlink(recoveryPath); /* ignore any error */
+ XLogFileCopy(endLogId, endLogSeg,
+ endTLI, endLogId, endLogSeg);
- /*
- * If we are establishing a new timeline, we have to copy data from
- * the last WAL segment of the old timeline to create a starting WAL
- * segment for the new timeline.
- *
- * Notify the archiver that the last WAL segment of the old timeline
- * is ready to copy to archival storage. Otherwise, it is not archived
- * for a while.
- */
- if (endTLI != ThisTimeLineID)
+ if (XLogArchivingActive())
{
- XLogFileCopy(endLogId, endLogSeg,
- endTLI, endLogId, endLogSeg);
-
- if (XLogArchivingActive())
- {
- XLogFileName(xlogpath, endTLI, endLogId, endLogSeg);
- XLogArchiveNotify(xlogpath);
- }
+ XLogFileName(xlogpath, endTLI, endLogId, endLogSeg);
+ XLogArchiveNotify(xlogpath);
}
}
@@ -5550,6 +5558,13 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
XLogArchiveCleanup(xlogpath);
+ /*
+ * Since there might be a partial WAL segment named RECOVERYXLOG,
+ * get rid of it.
+ */
+ snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
+ unlink(recoveryPath); /* ignore any error */
+
/* Get rid of any remaining recovered timeline-history file, too */
snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
unlink(recoveryPath); /* ignore any error */
@@ -7871,46 +7886,7 @@ CreateCheckPoint(int flags)
*/
if (_logId || _logSeg)
{
- /*
- * Calculate the last segment that we need to retain because of
- * wal_keep_segments, by subtracting wal_keep_segments from the new
- * checkpoint location.
- */
- if (wal_keep_segments > 0)
- {
- uint32 log;
- uint32 seg;
- int d_log;
- int d_seg;
-
- XLByteToSeg(recptr, log, seg);
-
- d_seg = wal_keep_segments % XLogSegsPerFile;
- d_log = wal_keep_segments / XLogSegsPerFile;
- if (seg < d_seg)
- {
- d_log += 1;
- seg = seg - d_seg + XLogSegsPerFile;
- }
- else
- seg = seg - d_seg;
- /* avoid underflow, don't go below (0,1) */
- if (log < d_log || (log == d_log && seg == 0))
- {
- log = 0;
- seg = 1;
- }
- else
- log = log - d_log;
-
- /* don't delete WAL segments newer than the calculated segment */
- if (log < _logId || (log == _logId && seg < _logSeg))
- {
- _logId = log;
- _logSeg = seg;
- }
- }
-
+ KeepLogSeg(recptr, &_logId, &_logSeg);
PrevLogSeg(_logId, _logSeg);
RemoveOldXlogFiles(_logId, _logSeg, recptr);
}
@@ -8151,17 +8127,16 @@ CreateRestartPoint(int flags)
/*
* Delete old log files (those no longer needed even for previous
* checkpoint/restartpoint) to prevent the disk holding the xlog from
- * growing full. We don't need do this during normal recovery, but during
- * streaming recovery we have to or the disk will eventually fill up from
- * old log files streamed from master.
+ * growing full.
*/
- if (WalRcvInProgress() && (_logId || _logSeg))
+ if (_logId || _logSeg)
{
XLogRecPtr endptr;
/* Get the current (or recent) end of xlog */
- endptr = GetWalRcvWriteRecPtr(NULL);
+ endptr = GetStandbyFlushRecPtr();
+ KeepLogSeg(endptr, &_logId, &_logSeg);
PrevLogSeg(_logId, _logSeg);
RemoveOldXlogFiles(_logId, _logSeg, endptr);
@@ -8207,6 +8182,50 @@ CreateRestartPoint(int flags)
}
/*
+ * Calculate the last segment that we need to retain because of
+ * wal_keep_segments, by subtracting wal_keep_segments from
+ * the given xlog location, recptr.
+ */
+static void
+KeepLogSeg(XLogRecPtr recptr, uint32 *logId, uint32 *logSeg)
+{
+ uint32 log;
+ uint32 seg;
+ int d_log;
+ int d_seg;
+
+ if (wal_keep_segments == 0)
+ return;
+
+ XLByteToSeg(recptr, log, seg);
+
+ d_seg = wal_keep_segments % XLogSegsPerFile;
+ d_log = wal_keep_segments / XLogSegsPerFile;
+ if (seg < d_seg)
+ {
+ d_log += 1;
+ seg = seg - d_seg + XLogSegsPerFile;
+ }
+ else
+ seg = seg - d_seg;
+ /* avoid underflow, don't go below (0,1) */
+ if (log < d_log || (log == d_log && seg == 0))
+ {
+ log = 0;
+ seg = 1;
+ }
+ else
+ log = log - d_log;
+
+ /* don't delete WAL segments newer than the calculated segment */
+ if (log < *logId || (log == *logId && seg < *logSeg))
+ {
+ *logId = log;
+ *logSeg = seg;
+ }
+}
+
+/*
* Write a NEXTOID log record
*/
void
@@ -9549,10 +9568,14 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
/*
* Get latest redo apply position.
*
+ * Optionally, returns the end byte position of the last restored
+ * WAL segment. Callers not interested in that value may pass
+ * NULL for restoreLastRecPtr.
+ *
* Exported to allow WALReceiver to read the pointer directly.
*/
XLogRecPtr
-GetXLogReplayRecPtr(void)
+GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
@@ -9560,12 +9583,34 @@ GetXLogReplayRecPtr(void)
SpinLockAcquire(&xlogctl->info_lck);
recptr = xlogctl->recoveryLastRecPtr;
+ if (restoreLastRecPtr)
+ *restoreLastRecPtr = xlogctl->restoreLastRecPtr;
SpinLockRelease(&xlogctl->info_lck);
return recptr;
}
/*
+ * Get current standby flush position, ie, the last WAL position
+ * known to be fsync'd to disk in standby.
+ */
+XLogRecPtr
+GetStandbyFlushRecPtr(void)
+{
+ XLogRecPtr receivePtr;
+ XLogRecPtr replayPtr;
+ XLogRecPtr restorePtr;
+
+ receivePtr = GetWalRcvWriteRecPtr(NULL);
+ replayPtr = GetXLogReplayRecPtr(&restorePtr);
+
+ if (XLByteLT(receivePtr, replayPtr))
+ return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr;
+ else
+ return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr;
+}
+
+/*
* Report the last WAL replay location (same format as pg_start_backup etc)
*
* This is useful for determining how much of WAL is visible to read-only
@@ -9577,7 +9622,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
XLogRecPtr recptr;
char location[MAXFNAMELEN];
- recptr = GetXLogReplayRecPtr();
+ recptr = GetXLogReplayRecPtr(NULL);
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL();