aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c95
1 files changed, 84 insertions, 11 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b333d820c72..7f63185b1cc 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -39,6 +39,7 @@
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "postmaster/startup.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/barrier.h"
@@ -225,6 +226,7 @@ static TimestampTz recoveryDelayUntilTime;
/* options taken from recovery.conf for XLOG streaming */
static bool StandbyModeRequested = false;
static char *PrimaryConnInfo = NULL;
+static char *PrimarySlotName = NULL;
static char *TriggerFile = NULL;
/* are we currently in standby mode? */
@@ -485,6 +487,8 @@ typedef struct XLogCtlData
uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */
TransactionId ckptXid;
XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
+ XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */
+
XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG
* segment */
@@ -748,6 +752,7 @@ static void LocalSetXLogInsertAllowed(void);
static void CreateEndOfRecoveryRecord(void);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
+static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
XLogRecPtr *lsn, BkpBlock *bkpb);
@@ -2909,6 +2914,39 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
}
/*
+ * Record the LSN up to which we can remove WAL because it's not required by
+ * any replication slot.
+ */
+void
+XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->replicationSlotMinLSN = lsn;
+ SpinLockRelease(&xlogctl->info_lck);
+}
+
+
+/*
+ * Return the oldest LSN we must retain to satisfy the needs of some
+ * replication slot.
+ */
+static XLogRecPtr
+XLogGetReplicationSlotMinimumLSN(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr retval;
+ SpinLockAcquire(&xlogctl->info_lck);
+ retval = xlogctl->replicationSlotMinLSN;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return retval;
+}
+
+/*
* Advance minRecoveryPoint in control file.
*
* If we crash during recovery, we must reach this point again before the
@@ -5478,6 +5516,14 @@ readRecoveryCommandFile(void)
(errmsg_internal("primary_conninfo = '%s'",
PrimaryConnInfo)));
}
+ else if (strcmp(item->name, "primary_slotname") == 0)
+ {
+ ReplicationSlotValidateName(item->value, ERROR);
+ PrimarySlotName = pstrdup(item->value);
+ ereport(DEBUG2,
+ (errmsg_internal("primary_slotname = '%s'",
+ PrimarySlotName)));
+ }
else if (strcmp(item->name, "trigger_file") == 0)
{
TriggerFile = pstrdup(item->value);
@@ -6506,6 +6552,12 @@ StartupXLOG(void)
XLogCtl->ckptXid = checkPoint.nextXid;
/*
+ * Initialize replication slots, before there's a chance to remove
+ * required resources.
+ */
+ StartupReplicationSlots(checkPoint.redo);
+
+ /*
* Startup MultiXact. We need to do this early for two reasons: one
* is that we might try to access multixacts when we do tuple freezing,
* and the other is we need its state initialized because we attempt
@@ -8620,6 +8672,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
CheckPointMultiXact();
CheckPointPredicate();
CheckPointRelationMap();
+ CheckPointReplicationSlots();
CheckPointBuffers(flags); /* performs all required fsyncs */
/* We deliberately delay 2PC checkpointing as long as possible */
CheckPointTwoPhase(checkPointRedo);
@@ -8938,24 +8991,43 @@ CreateRestartPoint(int flags)
/*
* Retreat *logSegNo to the last segment that we need to retain because of
- * wal_keep_segments. This is calculated by subtracting wal_keep_segments
- * from the given xlog location, recptr.
+ * either wal_keep_segments or replication slots.
+ *
+ * This is calculated by subtracting wal_keep_segments from the given xlog
+ * location, recptr and by making sure that that result is below the
+ * requirement of replication slots.
*/
static void
KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
{
XLogSegNo segno;
-
- if (wal_keep_segments == 0)
- return;
+ XLogRecPtr keep;
XLByteToSeg(recptr, segno);
+ keep = XLogGetReplicationSlotMinimumLSN();
- /* avoid underflow, don't go below 1 */
- if (segno <= wal_keep_segments)
- segno = 1;
- else
- segno = segno - wal_keep_segments;
+ /* compute limit for wal_keep_segments first */
+ if (wal_keep_segments > 0)
+ {
+ /* avoid underflow, don't go below 1 */
+ if (segno <= wal_keep_segments)
+ segno = 1;
+ else
+ segno = segno - wal_keep_segments;
+ }
+
+ /* then check whether slots limit removal further */
+ if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
+ {
+ XLogRecPtr slotSegNo;
+
+ XLByteToSeg(keep, slotSegNo);
+
+ if (slotSegNo <= 0)
+ segno = 1;
+ else if (slotSegNo < segno)
+ segno = slotSegNo;
+ }
/* don't delete WAL segments newer than the calculated segment */
if (segno < *logSegNo)
@@ -11026,7 +11098,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
tli, curFileTLI);
}
curFileTLI = tli;
- RequestXLogStreaming(tli, ptr, PrimaryConnInfo);
+ RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
+ PrimarySlotName);
receivedUpto = 0;
}