aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/xlog.c58
-rw-r--r--src/backend/replication/walreceiver.c36
-rw-r--r--src/backend/replication/walsender.c71
-rw-r--r--src/backend/storage/ipc/procarray.c45
-rw-r--r--src/backend/utils/misc/guc.c9
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample1
-rw-r--r--src/include/access/xlog.h1
-rw-r--r--src/include/replication/walprotocol.h9
-rw-r--r--src/include/replication/walreceiver.h1
9 files changed, 204 insertions, 27 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6fdaaff9140..3ba1f29197f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -158,6 +158,11 @@ static XLogRecPtr LastRec;
* known, need to check the shared state".
*/
static bool LocalRecoveryInProgress = true;
+/*
+ * Local copy of SharedHotStandbyActive variable. False actually means "not
+ * known, need to check the shared state".
+ */
+static bool LocalHotStandbyActive = false;
/*
* Local state for XLogInsertAllowed():
@@ -406,6 +411,12 @@ typedef struct XLogCtlData
bool SharedRecoveryInProgress;
/*
+ * SharedHotStandbyActive indicates if we allow hot standby queries to be
+ * run. Protected by info_lck.
+ */
+ bool SharedHotStandbyActive;
+
+ /*
* recoveryWakeupLatch is used to wake up the startup process to
* continue WAL replay, if it is waiting for WAL to arrive or failover
* trigger file to appear.
@@ -4917,6 +4928,7 @@ XLOGShmemInit(void)
*/
XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
XLogCtl->SharedRecoveryInProgress = true;
+ XLogCtl->SharedHotStandbyActive = false;
XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
SpinLockInit(&XLogCtl->info_lck);
InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
@@ -6790,8 +6802,6 @@ StartupXLOG(void)
static void
CheckRecoveryConsistency(void)
{
- static bool backendsAllowed = false;
-
/*
* Have we passed our safe starting point?
*/
@@ -6811,11 +6821,19 @@ CheckRecoveryConsistency(void)
* enabling connections.
*/
if (standbyState == STANDBY_SNAPSHOT_READY &&
- !backendsAllowed &&
+ !LocalHotStandbyActive &&
reachedMinRecoveryPoint &&
IsUnderPostmaster)
{
- backendsAllowed = true;
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->SharedHotStandbyActive = true;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ LocalHotStandbyActive = true;
+
SendPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY);
}
}
@@ -6863,6 +6881,38 @@ RecoveryInProgress(void)
}
/*
+ * Is Hot Standby active yet? Returns the shared SharedHotStandbyActive
+ * flag. This only matters in special backends, since normal backends
+ * cannot connect until this returns true. Postmaster learns of it by
+ * signal, not via shared memory. Unlike testing standbyState, this works
+ * in any process that's connected to shared memory. A true result is
+ * cached in LocalHotStandbyActive, so later calls skip the spinlock.
+ */
+bool
+HotStandbyActive(void)
+{
+ /*
+ * We check shared state each time only until Hot Standby is active. We
+ * can't de-activate Hot Standby, so there's no need to keep checking after
+ * the shared variable has once been seen true.
+ */
+ if (LocalHotStandbyActive)
+ return true;
+ else
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
+ /* spinlock is essential on machines with weak memory ordering! */
+ SpinLockAcquire(&xlogctl->info_lck);
+ LocalHotStandbyActive = xlogctl->SharedHotStandbyActive;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return LocalHotStandbyActive;
+ }
+}
+
+/*
* Is this process allowed to insert new WAL records?
*
* Ordinarily this is essentially equivalent to !RecoveryInProgress().
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b1e5247f12a..ee09468db17 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -38,6 +38,7 @@
#include <signal.h>
#include <unistd.h>
+#include "access/transam.h"
#include "access/xlog_internal.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
@@ -45,6 +46,7 @@
#include "replication/walreceiver.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
+#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
@@ -56,6 +58,7 @@ bool am_walreceiver;
/* GUC variable */
int wal_receiver_status_interval;
+bool hot_standby_feedback;
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
@@ -610,16 +613,43 @@ XLogWalRcvSendReply(void)
wal_receiver_status_interval * 1000))
return;
- /* Construct a new message. */
+ /* Construct a new message */
reply_message.write = LogstreamResult.Write;
reply_message.flush = LogstreamResult.Flush;
reply_message.apply = GetXLogReplayRecPtr();
reply_message.sendTime = now;
- elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
+ /*
+ * Get the OldestXmin and its associated epoch
+ */
+ if (hot_standby_feedback && HotStandbyActive())
+ {
+ TransactionId nextXid;
+ uint32 nextEpoch;
+
+ reply_message.xmin = GetOldestXmin(true, false);
+
+ /*
+ * Get epoch and adjust if nextXid and oldestXmin are different
+ * sides of the epoch boundary.
+ */
+ GetNextXidAndEpoch(&nextXid, &nextEpoch);
+ if (nextXid < reply_message.xmin)
+ nextEpoch--;
+ reply_message.epoch = nextEpoch;
+ }
+ else
+ {
+ reply_message.xmin = InvalidTransactionId;
+ reply_message.epoch = 0;
+ }
+
+ elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
reply_message.write.xlogid, reply_message.write.xrecoff,
reply_message.flush.xlogid, reply_message.flush.xrecoff,
- reply_message.apply.xlogid, reply_message.apply.xrecoff);
+ reply_message.apply.xlogid, reply_message.apply.xrecoff,
+ reply_message.xmin,
+ reply_message.epoch);
/* Prepend with the message type and send it. */
buf[0] = 'r';
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fe9961638c6..0fdf722f852 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -53,6 +53,7 @@
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
+#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
@@ -502,6 +503,7 @@ ProcessStandbyReplyMessage(void)
{
StandbyReplyMessage reply;
char msgtype;
+ TransactionId newxmin = InvalidTransactionId;
resetStringInfo(&reply_message);
@@ -531,10 +533,12 @@ ProcessStandbyReplyMessage(void)
pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
- elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X ",
+ elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
reply.write.xlogid, reply.write.xrecoff,
reply.flush.xlogid, reply.flush.xrecoff,
- reply.apply.xlogid, reply.apply.xrecoff);
+ reply.apply.xlogid, reply.apply.xrecoff,
+ reply.xmin,
+ reply.epoch);
/*
* Update shared state for this WalSender process
@@ -550,6 +554,69 @@ ProcessStandbyReplyMessage(void)
walsnd->apply = reply.apply;
SpinLockRelease(&walsnd->mutex);
}
+
+ /*
+ * Update the WalSender's proc xmin to allow it to be visible
+ * to snapshots. This will hold back the removal of dead rows
+ * and thereby prevent the generation of cleanup conflicts
+ * on the standby server.
+ */
+ if (TransactionIdIsValid(reply.xmin))
+ {
+ TransactionId nextXid;
+ uint32 nextEpoch;
+ bool epochOK = false; /* stays false unless an epoch check below passes */
+
+ GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+ /*
+ * The standby's epoch must equal ours, or be exactly one less when
+ * its xmin is numerically above our nextXid (counter has wrapped).
+ */
+ if (reply.xmin <= nextXid)
+ {
+ if (reply.epoch == nextEpoch)
+ epochOK = true;
+ }
+ else
+ {
+ if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
+ epochOK = true;
+ }
+
+ /*
+ * Feedback from standby must not go backwards, nor should it go
+ * forwards further than our most recent xid.
+ */
+ if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
+ {
+ if (!TransactionIdIsValid(MyProc->xmin))
+ {
+ TransactionId oldestXmin = GetOldestXmin(true, true);
+ if (TransactionIdPrecedes(oldestXmin, reply.xmin))
+ newxmin = reply.xmin;
+ else
+ newxmin = oldestXmin;
+ }
+ else
+ {
+ if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
+ newxmin = reply.xmin;
+ else
+ newxmin = MyProc->xmin; /* stay the same */
+ }
+ }
+ }
+
+ /*
+ * Grab the ProcArrayLock to set xmin, or invalidate for bad reply
+ */
+ if (MyProc->xmin != newxmin)
+ {
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+ MyProc->xmin = newxmin;
+ LWLockRelease(ProcArrayLock);
+ }
}
/* Main loop of walsender process */
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 8b36df4759f..2473881e8fb 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1034,7 +1034,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
if (ignoreVacuum && (proc->vacuumFlags & PROC_IN_VACUUM))
continue;
- if (allDbs || proc->databaseId == MyDatabaseId)
+ if (allDbs ||
+ proc->databaseId == MyDatabaseId ||
+ proc->databaseId == 0) /* include WalSender */
{
/* Fetch xid just once - see GetNewTransactionId */
TransactionId xid = proc->xid;
@@ -1066,28 +1068,35 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
*/
TransactionId kaxmin = KnownAssignedXidsGetOldestXmin();
+ LWLockRelease(ProcArrayLock);
+
if (TransactionIdIsNormal(kaxmin) &&
TransactionIdPrecedes(kaxmin, result))
result = kaxmin;
}
+ else
+ {
+ /*
+ * No other information needed, so release the lock immediately.
+ */
+ LWLockRelease(ProcArrayLock);
- LWLockRelease(ProcArrayLock);
-
- /*
- * Compute the cutoff XID, being careful not to generate a "permanent"
- * XID.
- *
- * vacuum_defer_cleanup_age provides some additional "slop" for the
- * benefit of hot standby queries on slave servers. This is quick and
- * dirty, and perhaps not all that useful unless the master has a
- * predictable transaction rate, but it's what we've got. Note that we
- * are assuming vacuum_defer_cleanup_age isn't large enough to cause
- * wraparound --- so guc.c should limit it to no more than the
- * xidStopLimit threshold in varsup.c.
- */
- result -= vacuum_defer_cleanup_age;
- if (!TransactionIdIsNormal(result))
- result = FirstNormalTransactionId;
+ /*
+ * Compute the cutoff XID, being careful not to generate a "permanent"
+ * XID. We need do this only on the primary, never on standby.
+ *
+ * vacuum_defer_cleanup_age provides some additional "slop" for the
+ * benefit of hot standby queries on slave servers. This is quick and
+ * dirty, and perhaps not all that useful unless the master has a
+ * predictable transaction rate, but it's what we've got. Note that we
+ * are assuming vacuum_defer_cleanup_age isn't large enough to cause
+ * wraparound --- so guc.c should limit it to no more than the
+ * xidStopLimit threshold in varsup.c.
+ */
+ result -= vacuum_defer_cleanup_age;
+ if (!TransactionIdIsNormal(result))
+ result = FirstNormalTransactionId;
+ }
return result;
}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 56885576b92..55cbf757b49 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1279,6 +1279,15 @@ static struct config_bool ConfigureNamesBool[] =
},
{
+ {"hot_standby_feedback", PGC_SIGHUP, WAL_STANDBY_SERVERS,
+ gettext_noop("Allows feedback from a hot standby to the primary that will avoid query conflicts."),
+ NULL
+ },
+ &hot_standby_feedback,
+ false, NULL, NULL
+ },
+
+ {
{"allow_system_table_mods", PGC_POSTMASTER, DEVELOPER_OPTIONS,
gettext_noop("Allows modifications of the structure of system tables."),
NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 3b00a032621..6726733235b 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -196,6 +196,7 @@
#hot_standby = off # "on" allows queries during recovery
# (change requires restart)
+#hot_standby_feedback = off # info from standby to prevent query conflicts
#max_standby_archive_delay = 30s # max delay before canceling queries
# when reading WAL from archive;
# -1 allows indefinite delay
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 7cd07a25d22..7e9bad6e3a4 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -289,6 +289,7 @@ extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec);
extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg);
extern bool RecoveryInProgress(void);
+extern bool HotStandbyActive(void);
extern bool XLogInsertAllowed(void);
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
extern XLogRecPtr GetXLogReplayRecPtr(void);
diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h
index 32c49620c1d..da94b6b2f30 100644
--- a/src/include/replication/walprotocol.h
+++ b/src/include/replication/walprotocol.h
@@ -56,6 +56,15 @@ typedef struct
XLogRecPtr flush;
XLogRecPtr apply;
+ /*
+ * The current xmin and epoch from the standby, for Hot Standby feedback.
+ * This may be invalid if the standby-side does not support feedback,
+ * or Hot Standby is not yet available.
+ */
+ TransactionId xmin;
+ uint32 epoch;
+
+
/* Sender's system clock at the time of transmission */
TimestampTz sendTime;
} StandbyReplyMessage;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index aa5bfb7aea1..9137b861c7d 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -18,6 +18,7 @@
extern bool am_walreceiver;
extern int wal_receiver_status_interval;
+extern bool hot_standby_feedback;
/*
* MAXCONNINFO: maximum size of a connection string.