diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 95 |
1 files changed, 84 insertions, 11 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index b333d820c72..7f63185b1cc 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -39,6 +39,7 @@ #include "pgstat.h" #include "postmaster/bgwriter.h" #include "postmaster/startup.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/barrier.h" @@ -225,6 +226,7 @@ static TimestampTz recoveryDelayUntilTime; /* options taken from recovery.conf for XLOG streaming */ static bool StandbyModeRequested = false; static char *PrimaryConnInfo = NULL; +static char *PrimarySlotName = NULL; static char *TriggerFile = NULL; /* are we currently in standby mode? */ @@ -485,6 +487,8 @@ typedef struct XLogCtlData uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */ TransactionId ckptXid; XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */ + XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */ + XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG * segment */ @@ -748,6 +752,7 @@ static void LocalSetXLogInsertAllowed(void); static void CreateEndOfRecoveryRecord(void); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); +static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock, XLogRecPtr *lsn, BkpBlock *bkpb); @@ -2909,6 +2914,39 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) } /* + * Record the LSN up to which we can remove WAL because it's not required by + * any replication slot. + */ +void +XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->replicationSlotMinLSN = lsn; + SpinLockRelease(&xlogctl->info_lck); +} + + +/* + * Return the oldest LSN we must retain to satisfy the needs of some + * replication slot. + */ +static XLogRecPtr +XLogGetReplicationSlotMinimumLSN(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr retval; + SpinLockAcquire(&xlogctl->info_lck); + retval = xlogctl->replicationSlotMinLSN; + SpinLockRelease(&xlogctl->info_lck); + + return retval; +} + +/* * Advance minRecoveryPoint in control file. * * If we crash during recovery, we must reach this point again before the @@ -5478,6 +5516,14 @@ readRecoveryCommandFile(void) (errmsg_internal("primary_conninfo = '%s'", PrimaryConnInfo))); } + else if (strcmp(item->name, "primary_slotname") == 0) + { + ReplicationSlotValidateName(item->value, ERROR); + PrimarySlotName = pstrdup(item->value); + ereport(DEBUG2, + (errmsg_internal("primary_slotname = '%s'", + PrimarySlotName))); + } else if (strcmp(item->name, "trigger_file") == 0) { TriggerFile = pstrdup(item->value); @@ -6506,6 +6552,12 @@ StartupXLOG(void) XLogCtl->ckptXid = checkPoint.nextXid; /* + * Initialize replication slots, before there's a chance to remove + * required resources. + */ + StartupReplicationSlots(checkPoint.redo); + + /* * Startup MultiXact. We need to do this early for two reasons: one * is that we might try to access multixacts when we do tuple freezing, * and the other is we need its state initialized because we attempt @@ -8620,6 +8672,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointMultiXact(); CheckPointPredicate(); CheckPointRelationMap(); + CheckPointReplicationSlots(); CheckPointBuffers(flags); /* performs all required fsyncs */ /* We deliberately delay 2PC checkpointing as long as possible */ CheckPointTwoPhase(checkPointRedo); @@ -8938,24 +8991,43 @@ CreateRestartPoint(int flags) /* * Retreat *logSegNo to the last segment that we need to retain because of - * wal_keep_segments. This is calculated by subtracting wal_keep_segments - * from the given xlog location, recptr. + * either wal_keep_segments or replication slots. + * + * This is calculated by subtracting wal_keep_segments from the given xlog + * location, recptr and by making sure that that result is below the + * requirement of replication slots. */ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) { XLogSegNo segno; - - if (wal_keep_segments == 0) - return; + XLogRecPtr keep; XLByteToSeg(recptr, segno); + keep = XLogGetReplicationSlotMinimumLSN(); - /* avoid underflow, don't go below 1 */ - if (segno <= wal_keep_segments) - segno = 1; - else - segno = segno - wal_keep_segments; + /* compute limit for wal_keep_segments first */ + if (wal_keep_segments > 0) + { + /* avoid underflow, don't go below 1 */ + if (segno <= wal_keep_segments) + segno = 1; + else + segno = segno - wal_keep_segments; + } + + /* then check whether slots limit removal further */ + if (max_replication_slots > 0 && keep != InvalidXLogRecPtr) + { + XLogRecPtr slotSegNo; + + XLByteToSeg(keep, slotSegNo); + + if (slotSegNo <= 0) + segno = 1; + else if (slotSegNo < segno) + segno = slotSegNo; + } /* don't delete WAL segments newer than the calculated segment */ if (segno < *logSegNo) @@ -11026,7 +11098,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, tli, curFileTLI); } curFileTLI = tli; - RequestXLogStreaming(tli, ptr, PrimaryConnInfo); + RequestXLogStreaming(tli, ptr, PrimaryConnInfo, + PrimarySlotName); receivedUpto = 0; } |