diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/transam/xlog.c | 58 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 36 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 71 | ||||
-rw-r--r-- | src/backend/storage/ipc/procarray.c | 45 | ||||
-rw-r--r-- | src/backend/utils/misc/guc.c | 9 | ||||
-rw-r--r-- | src/backend/utils/misc/postgresql.conf.sample | 1 | ||||
-rw-r--r-- | src/include/access/xlog.h | 1 | ||||
-rw-r--r-- | src/include/replication/walprotocol.h | 9 | ||||
-rw-r--r-- | src/include/replication/walreceiver.h | 1 |
9 files changed, 204 insertions, 27 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6fdaaff9140..3ba1f29197f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -158,6 +158,11 @@ static XLogRecPtr LastRec; * known, need to check the shared state". */ static bool LocalRecoveryInProgress = true; +/* + * Local copy of SharedHotStandbyActive variable. False actually means "not + * known, need to check the shared state". + */ +static bool LocalHotStandbyActive = false; /* * Local state for XLogInsertAllowed(): @@ -406,6 +411,12 @@ typedef struct XLogCtlData bool SharedRecoveryInProgress; /* + * SharedHotStandbyActive indicates if Hot Standby has been activated, + * i.e. hot standby queries are allowed. Protected by info_lck. + */ + bool SharedHotStandbyActive; + + /* * recoveryWakeupLatch is used to wake up the startup process to * continue WAL replay, if it is waiting for WAL to arrive or failover * trigger file to appear. @@ -4917,6 +4928,7 @@ XLOGShmemInit(void) */ XLogCtl->XLogCacheBlck = XLOGbuffers - 1; XLogCtl->SharedRecoveryInProgress = true; + XLogCtl->SharedHotStandbyActive = false; XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages); SpinLockInit(&XLogCtl->info_lck); InitSharedLatch(&XLogCtl->recoveryWakeupLatch); @@ -6790,8 +6802,6 @@ StartupXLOG(void) static void CheckRecoveryConsistency(void) { - static bool backendsAllowed = false; - /* * Have we passed our safe starting point? */ @@ -6811,11 +6821,19 @@ CheckRecoveryConsistency(void) * enabling connections. 
*/ if (standbyState == STANDBY_SNAPSHOT_READY && - !backendsAllowed && + !LocalHotStandbyActive && reachedMinRecoveryPoint && IsUnderPostmaster) { - backendsAllowed = true; + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->SharedHotStandbyActive = true; + SpinLockRelease(&xlogctl->info_lck); + + LocalHotStandbyActive = true; + SendPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY); } } @@ -6863,6 +6881,38 @@ RecoveryInProgress(void) } /* + * Is HotStandby active yet? This is only important in special backends + * since normal backends won't ever be able to connect until this returns + * true. Postmaster knows this by way of signal, not via shared memory. + * + * Unlike testing standbyState, this works in any process that's connected to + * shared memory. + */ +bool +HotStandbyActive(void) +{ + /* + * We check shared state each time only until Hot Standby is active. We + * can't de-activate Hot Standby, so there's no need to keep checking after + * the shared variable has once been seen true. + */ + if (LocalHotStandbyActive) + return true; + else + { + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + + /* spinlock is essential on machines with weak memory ordering! */ + SpinLockAcquire(&xlogctl->info_lck); + LocalHotStandbyActive = xlogctl->SharedHotStandbyActive; + SpinLockRelease(&xlogctl->info_lck); + + return LocalHotStandbyActive; + } +} + +/* * Is this process allowed to insert new WAL records? * * Ordinarily this is essentially equivalent to !RecoveryInProgress(). 
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b1e5247f12a..ee09468db17 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -38,6 +38,7 @@ #include <signal.h> #include <unistd.h> +#include "access/transam.h" #include "access/xlog_internal.h" #include "libpq/pqsignal.h" #include "miscadmin.h" @@ -45,6 +46,7 @@ #include "replication/walreceiver.h" #include "storage/ipc.h" #include "storage/pmsignal.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -56,6 +58,7 @@ bool am_walreceiver; /* GUC variable */ int wal_receiver_status_interval; +bool hot_standby_feedback; /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; @@ -610,16 +613,43 @@ XLogWalRcvSendReply(void) wal_receiver_status_interval * 1000)) return; - /* Construct a new message. */ + /* Construct a new message */ reply_message.write = LogstreamResult.Write; reply_message.flush = LogstreamResult.Flush; reply_message.apply = GetXLogReplayRecPtr(); reply_message.sendTime = now; - elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", + /* + * Get the OldestXmin and its associated epoch + */ + if (hot_standby_feedback && HotStandbyActive()) + { + TransactionId nextXid; + uint32 nextEpoch; + + reply_message.xmin = GetOldestXmin(true, false); + + /* + * Get epoch and adjust if nextXid and oldestXmin are different + * sides of the epoch boundary. 
+ */ + GetNextXidAndEpoch(&nextXid, &nextEpoch); + if (nextXid < reply_message.xmin) + nextEpoch--; + reply_message.epoch = nextEpoch; + } + else + { + reply_message.xmin = InvalidTransactionId; + reply_message.epoch = 0; + } + + elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u", reply_message.write.xlogid, reply_message.write.xrecoff, reply_message.flush.xlogid, reply_message.flush.xrecoff, - reply_message.apply.xlogid, reply_message.apply.xrecoff); + reply_message.apply.xlogid, reply_message.apply.xrecoff, + reply_message.xmin, + reply_message.epoch); /* Prepend with the message type and send it. */ buf[0] = 'r'; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index fe9961638c6..0fdf722f852 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -53,6 +53,7 @@ #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/proc.h" +#include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/guc.h" @@ -502,6 +503,7 @@ ProcessStandbyReplyMessage(void) { StandbyReplyMessage reply; char msgtype; + TransactionId newxmin = InvalidTransactionId; resetStringInfo(&reply_message); @@ -531,10 +533,12 @@ ProcessStandbyReplyMessage(void) pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage)); - elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X ", + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u", reply.write.xlogid, reply.write.xrecoff, reply.flush.xlogid, reply.flush.xrecoff, - reply.apply.xlogid, reply.apply.xrecoff); + reply.apply.xlogid, reply.apply.xrecoff, + reply.xmin, + reply.epoch); /* * Update shared state for this WalSender process @@ -550,6 +554,69 @@ ProcessStandbyReplyMessage(void) walsnd->apply = reply.apply; SpinLockRelease(&walsnd->mutex); } + + /* + * Update the WalSender's proc xmin to allow it to be visible + * to snapshots. 
This will hold back the removal of dead rows + * and thereby prevent the generation of cleanup conflicts + * on the standby server. + */ + if (TransactionIdIsValid(reply.xmin)) + { + TransactionId nextXid; + uint32 nextEpoch; + bool epochOK; + + GetNextXidAndEpoch(&nextXid, &nextEpoch); + + /* + * Epoch of oldestXmin should be same as standby or + * if the counter has wrapped, then one less than reply. + */ + if (reply.xmin <= nextXid) + { + if (reply.epoch == nextEpoch) + epochOK = true; + } + else + { + if (nextEpoch > 0 && reply.epoch == nextEpoch - 1) + epochOK = true; + } + + /* + * Feedback from standby must not go backwards, nor should it go + * forwards further than our most recent xid. + */ + if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid)) + { + if (!TransactionIdIsValid(MyProc->xmin)) + { + TransactionId oldestXmin = GetOldestXmin(true, true); + if (TransactionIdPrecedes(oldestXmin, reply.xmin)) + newxmin = reply.xmin; + else + newxmin = oldestXmin; + } + else + { + if (TransactionIdPrecedes(MyProc->xmin, reply.xmin)) + newxmin = reply.xmin; + else + newxmin = MyProc->xmin; /* stay the same */ + } + } + } + + /* + * Grab the ProcArrayLock to set xmin, or invalidate for bad reply + */ + if (MyProc->xmin != newxmin) + { + LWLockAcquire(ProcArrayLock, LW_SHARED); + MyProc->xmin = newxmin; + LWLockRelease(ProcArrayLock); + } } /* Main loop of walsender process */ diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 8b36df4759f..2473881e8fb 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1034,7 +1034,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) if (ignoreVacuum && (proc->vacuumFlags & PROC_IN_VACUUM)) continue; - if (allDbs || proc->databaseId == MyDatabaseId) + if (allDbs || + proc->databaseId == MyDatabaseId || + proc->databaseId == 0) /* include WalSender */ { /* Fetch xid just once - see GetNewTransactionId */ TransactionId xid = proc->xid; @@ 
-1066,28 +1068,35 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) */ TransactionId kaxmin = KnownAssignedXidsGetOldestXmin(); + LWLockRelease(ProcArrayLock); + if (TransactionIdIsNormal(kaxmin) && TransactionIdPrecedes(kaxmin, result)) result = kaxmin; } + else + { + /* + * No other information needed, so release the lock immediately. + */ + LWLockRelease(ProcArrayLock); - LWLockRelease(ProcArrayLock); - - /* - * Compute the cutoff XID, being careful not to generate a "permanent" - * XID. - * - * vacuum_defer_cleanup_age provides some additional "slop" for the - * benefit of hot standby queries on slave servers. This is quick and - * dirty, and perhaps not all that useful unless the master has a - * predictable transaction rate, but it's what we've got. Note that we - * are assuming vacuum_defer_cleanup_age isn't large enough to cause - * wraparound --- so guc.c should limit it to no more than the - * xidStopLimit threshold in varsup.c. - */ - result -= vacuum_defer_cleanup_age; - if (!TransactionIdIsNormal(result)) - result = FirstNormalTransactionId; + /* + * Compute the cutoff XID, being careful not to generate a "permanent" + * XID. We need to do this only on the primary, never on standby. + * + * vacuum_defer_cleanup_age provides some additional "slop" for the + * benefit of hot standby queries on slave servers. This is quick and + * dirty, and perhaps not all that useful unless the master has a + * predictable transaction rate, but it's what we've got. Note that we + * are assuming vacuum_defer_cleanup_age isn't large enough to cause + * wraparound --- so guc.c should limit it to no more than the + * xidStopLimit threshold in varsup.c. 
+ */ + result -= vacuum_defer_cleanup_age; + if (!TransactionIdIsNormal(result)) + result = FirstNormalTransactionId; + } return result; } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 56885576b92..55cbf757b49 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1279,6 +1279,15 @@ static struct config_bool ConfigureNamesBool[] = }, { + {"hot_standby_feedback", PGC_SIGHUP, WAL_STANDBY_SERVERS, + gettext_noop("Allows feedback from a hot standby primary that will avoid query conflicts."), + NULL + }, + &hot_standby_feedback, + false, NULL, NULL + }, + + { {"allow_system_table_mods", PGC_POSTMASTER, DEVELOPER_OPTIONS, gettext_noop("Allows modifications of the structure of system tables."), NULL, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 3b00a032621..6726733235b 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -196,6 +196,7 @@ #hot_standby = off # "on" allows queries during recovery # (change requires restart) +#hot_standby_feedback = off # info from standby to prevent query conflicts #max_standby_archive_delay = 30s # max delay before canceling queries # when reading WAL from archive; # -1 allows indefinite delay diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 7cd07a25d22..7e9bad6e3a4 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -289,6 +289,7 @@ extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec); extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg); extern bool RecoveryInProgress(void); +extern bool HotStandbyActive(void); extern bool XLogInsertAllowed(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern XLogRecPtr GetXLogReplayRecPtr(void); diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h index 32c49620c1d..da94b6b2f30 100644 --- 
a/src/include/replication/walprotocol.h +++ b/src/include/replication/walprotocol.h @@ -56,6 +56,15 @@ typedef struct XLogRecPtr flush; XLogRecPtr apply; + /* + * The current xmin and epoch from the standby, for Hot Standby feedback. + * This may be invalid if the standby-side does not support feedback, + * or Hot Standby is not yet available. + */ + TransactionId xmin; + uint32 epoch; + + /* Sender's system clock at the time of transmission */ TimestampTz sendTime; } StandbyReplyMessage; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index aa5bfb7aea1..9137b861c7d 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -18,6 +18,7 @@ extern bool am_walreceiver; extern int wal_receiver_status_interval; +extern bool hot_standby_feedback; /* * MAXCONNINFO: maximum size of a connection string. |