aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walreceiver.c
diff options
context:
space:
mode:
authorSimon Riggs <simon@2ndQuadrant.com>2011-02-16 19:29:37 +0000
committerSimon Riggs <simon@2ndQuadrant.com>2011-02-16 19:29:37 +0000
commitbca8b7f16a3e720794cb0afbdb3733be4f8d9c2c (patch)
tree4d2e650f058ffa99af77f9068bf64a223de17246 /src/backend/replication/walreceiver.c
parent65076269ea54a8cd6e39f066a208c7d13aceac0a (diff)
downloadpostgresql-bca8b7f16a3e720794cb0afbdb3733be4f8d9c2c.tar.gz
postgresql-bca8b7f16a3e720794cb0afbdb3733be4f8d9c2c.zip
Hot Standby feedback for avoidance of cleanup conflicts on standby.
Standby optionally sends back information about oldestXmin of queries which is then checked and applied to the WALSender's proc->xmin. GetOldestXmin() is modified slightly to agree with GetSnapshotData(), so that all backends on primary include WALSender within their snapshots. Note this does nothing to change the snapshot xmin on either master or standby. Feedback piggybacks on the standby reply message. vacuum_defer_cleanup_age is no longer used on standby, though parameter still exists on primary, since some use cases still exist. Simon Riggs, review comments from Fujii Masao, Heikki Linnakangas, Robert Haas
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r--src/backend/replication/walreceiver.c36
1 files changed, 33 insertions, 3 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b1e5247f12a..ee09468db17 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -38,6 +38,7 @@
#include <signal.h>
#include <unistd.h>
+#include "access/transam.h"
#include "access/xlog_internal.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
@@ -45,6 +46,7 @@
#include "replication/walreceiver.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
+#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
@@ -56,6 +58,7 @@ bool am_walreceiver;
/* GUC variable */
int wal_receiver_status_interval;
+bool hot_standby_feedback;
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
@@ -610,16 +613,43 @@ XLogWalRcvSendReply(void)
wal_receiver_status_interval * 1000))
return;
- /* Construct a new message. */
+ /* Construct a new message */
reply_message.write = LogstreamResult.Write;
reply_message.flush = LogstreamResult.Flush;
reply_message.apply = GetXLogReplayRecPtr();
reply_message.sendTime = now;
- elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
+ /*
+ * Get the OldestXmin and its associated epoch
+ */
+ if (hot_standby_feedback && HotStandbyActive())
+ {
+ TransactionId nextXid;
+ uint32 nextEpoch;
+
+ reply_message.xmin = GetOldestXmin(true, false);
+
+ /*
+ * Get epoch and adjust if nextXid and oldestXmin are different
+ * sides of the epoch boundary.
+ */
+ GetNextXidAndEpoch(&nextXid, &nextEpoch);
+ if (nextXid < reply_message.xmin)
+ nextEpoch--;
+ reply_message.epoch = nextEpoch;
+ }
+ else
+ {
+ reply_message.xmin = InvalidTransactionId;
+ reply_message.epoch = 0;
+ }
+
+ elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
reply_message.write.xlogid, reply_message.write.xrecoff,
reply_message.flush.xlogid, reply_message.flush.xrecoff,
- reply_message.apply.xlogid, reply_message.apply.xrecoff);
+ reply_message.apply.xlogid, reply_message.apply.xrecoff,
+ reply_message.xmin,
+ reply_message.epoch);
/* Prepend with the message type and send it. */
buf[0] = 'r';