aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r--src/backend/replication/walreceiver.c36
1 files changed, 33 insertions, 3 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b1e5247f12a..ee09468db17 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -38,6 +38,7 @@
#include <signal.h>
#include <unistd.h>
+#include "access/transam.h"
#include "access/xlog_internal.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
@@ -45,6 +46,7 @@
#include "replication/walreceiver.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
+#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
@@ -56,6 +58,7 @@ bool am_walreceiver;
/* GUC variable */
int wal_receiver_status_interval;
+bool hot_standby_feedback;
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
@@ -610,16 +613,43 @@ XLogWalRcvSendReply(void)
wal_receiver_status_interval * 1000))
return;
- /* Construct a new message. */
+ /* Construct a new message */
reply_message.write = LogstreamResult.Write;
reply_message.flush = LogstreamResult.Flush;
reply_message.apply = GetXLogReplayRecPtr();
reply_message.sendTime = now;
- elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
+ /*
+ * Get the OldestXmin and its associated epoch
+ */
+ if (hot_standby_feedback && HotStandbyActive())
+ {
+ TransactionId nextXid;
+ uint32 nextEpoch;
+
+ reply_message.xmin = GetOldestXmin(true, false);
+
+ /*
+ * Get epoch and adjust if nextXid and oldestXmin are different
+ * sides of the epoch boundary.
+ */
+ GetNextXidAndEpoch(&nextXid, &nextEpoch);
+ if (nextXid < reply_message.xmin)
+ nextEpoch--;
+ reply_message.epoch = nextEpoch;
+ }
+ else
+ {
+ reply_message.xmin = InvalidTransactionId;
+ reply_message.epoch = 0;
+ }
+
+ elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
reply_message.write.xlogid, reply_message.write.xrecoff,
reply_message.flush.xlogid, reply_message.flush.xrecoff,
- reply_message.apply.xlogid, reply_message.apply.xrecoff);
+ reply_message.apply.xlogid, reply_message.apply.xrecoff,
+ reply_message.xmin,
+ reply_message.epoch);
/* Prepend with the message type and send it. */
buf[0] = 'r';