diff options
author | Robert Haas <rhaas@postgresql.org> | 2014-01-31 22:45:17 -0500 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2014-01-31 22:45:36 -0500 |
commit | 858ec11858a914d4c380971985709b6d6b7dd6fc (patch) | |
tree | 59eb508185cd8544c3485919a25dee15f3818c21 /src/backend/access/transam/xlog.c | |
parent | 5bdef38b8917cfbe206d14969c61a5d38fc822b6 (diff) | |
download | postgresql-858ec11858a914d4c380971985709b6d6b7dd6fc.tar.gz postgresql-858ec11858a914d4c380971985709b6d6b7dd6fc.zip |
Introduce replication slots.
Replication slots are a crash-safe data structure which can be created
on either a master or a standby to prevent premature removal of
write-ahead log segments needed by a standby, as well as (with
hot_standby_feedback=on) pruning of tuples whose removal would cause
replication conflicts. Slots have some advantages over existing
techniques, as explained in the documentation.
In a few places, we refer to the type of replication slots introduced
by this patch as "physical" slots, because forthcoming patches for
logical decoding will also have slots, but with somewhat different
properties.
Andres Freund and Robert Haas
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 95 |
1 files changed, 84 insertions, 11 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index b333d820c72..7f63185b1cc 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -39,6 +39,7 @@ #include "pgstat.h" #include "postmaster/bgwriter.h" #include "postmaster/startup.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/barrier.h" @@ -225,6 +226,7 @@ static TimestampTz recoveryDelayUntilTime; /* options taken from recovery.conf for XLOG streaming */ static bool StandbyModeRequested = false; static char *PrimaryConnInfo = NULL; +static char *PrimarySlotName = NULL; static char *TriggerFile = NULL; /* are we currently in standby mode? */ @@ -485,6 +487,8 @@ typedef struct XLogCtlData uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */ TransactionId ckptXid; XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */ + XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */ + XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG * segment */ @@ -748,6 +752,7 @@ static void LocalSetXLogInsertAllowed(void); static void CreateEndOfRecoveryRecord(void); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); +static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock, XLogRecPtr *lsn, BkpBlock *bkpb); @@ -2909,6 +2914,39 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) } /* + * Record the LSN up to which we can remove WAL because it's not required by + * any replication slot. + */ +void +XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->replicationSlotMinLSN = lsn; + SpinLockRelease(&xlogctl->info_lck); +} + + +/* + * Return the oldest LSN we must retain to satisfy the needs of some + * replication slot. + */ +static XLogRecPtr +XLogGetReplicationSlotMinimumLSN(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr retval; + SpinLockAcquire(&xlogctl->info_lck); + retval = xlogctl->replicationSlotMinLSN; + SpinLockRelease(&xlogctl->info_lck); + + return retval; +} + +/* * Advance minRecoveryPoint in control file. * * If we crash during recovery, we must reach this point again before the @@ -5478,6 +5516,14 @@ readRecoveryCommandFile(void) (errmsg_internal("primary_conninfo = '%s'", PrimaryConnInfo))); } + else if (strcmp(item->name, "primary_slotname") == 0) + { + ReplicationSlotValidateName(item->value, ERROR); + PrimarySlotName = pstrdup(item->value); + ereport(DEBUG2, + (errmsg_internal("primary_slotname = '%s'", + PrimarySlotName))); + } else if (strcmp(item->name, "trigger_file") == 0) { TriggerFile = pstrdup(item->value); @@ -6506,6 +6552,12 @@ StartupXLOG(void) XLogCtl->ckptXid = checkPoint.nextXid; /* + * Initialize replication slots, before there's a chance to remove + * required resources. + */ + StartupReplicationSlots(checkPoint.redo); + + /* * Startup MultiXact. We need to do this early for two reasons: one * is that we might try to access multixacts when we do tuple freezing, * and the other is we need its state initialized because we attempt @@ -8620,6 +8672,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointMultiXact(); CheckPointPredicate(); CheckPointRelationMap(); + CheckPointReplicationSlots(); CheckPointBuffers(flags); /* performs all required fsyncs */ /* We deliberately delay 2PC checkpointing as long as possible */ CheckPointTwoPhase(checkPointRedo); @@ -8938,24 +8991,43 @@ CreateRestartPoint(int flags) /* * Retreat *logSegNo to the last segment that we need to retain because of - * wal_keep_segments. This is calculated by subtracting wal_keep_segments - * from the given xlog location, recptr. + * either wal_keep_segments or replication slots. + * + * This is calculated by subtracting wal_keep_segments from the given xlog + * location, recptr and by making sure that that result is below the + * requirement of replication slots. */ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) { XLogSegNo segno; - - if (wal_keep_segments == 0) - return; + XLogRecPtr keep; XLByteToSeg(recptr, segno); + keep = XLogGetReplicationSlotMinimumLSN(); - /* avoid underflow, don't go below 1 */ - if (segno <= wal_keep_segments) - segno = 1; - else - segno = segno - wal_keep_segments; + /* compute limit for wal_keep_segments first */ + if (wal_keep_segments > 0) + { + /* avoid underflow, don't go below 1 */ + if (segno <= wal_keep_segments) + segno = 1; + else + segno = segno - wal_keep_segments; + } + + /* then check whether slots limit removal further */ + if (max_replication_slots > 0 && keep != InvalidXLogRecPtr) + { + XLogRecPtr slotSegNo; + + XLByteToSeg(keep, slotSegNo); + + if (slotSegNo <= 0) + segno = 1; + else if (slotSegNo < segno) + segno = slotSegNo; + } /* don't delete WAL segments newer than the calculated segment */ if (segno < *logSegNo) @@ -11026,7 +11098,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, tli, curFileTLI); } curFileTLI = tli; - RequestXLogStreaming(tli, ptr, PrimaryConnInfo); + RequestXLogStreaming(tli, ptr, PrimaryConnInfo, + PrimarySlotName); receivedUpto = 0; } |