diff options
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- | src/backend/replication/walreceiver.c | 36 |
1 files changed, 33 insertions, 3 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b1e5247f12a..ee09468db17 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -38,6 +38,7 @@ #include <signal.h> #include <unistd.h> +#include "access/transam.h" #include "access/xlog_internal.h" #include "libpq/pqsignal.h" #include "miscadmin.h" @@ -45,6 +46,7 @@ #include "replication/walreceiver.h" #include "storage/ipc.h" #include "storage/pmsignal.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -56,6 +58,7 @@ bool am_walreceiver; /* GUC variable */ int wal_receiver_status_interval; +bool hot_standby_feedback; /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; @@ -610,16 +613,43 @@ XLogWalRcvSendReply(void) wal_receiver_status_interval * 1000)) return; - /* Construct a new message. */ + /* Construct a new message */ reply_message.write = LogstreamResult.Write; reply_message.flush = LogstreamResult.Flush; reply_message.apply = GetXLogReplayRecPtr(); reply_message.sendTime = now; - elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", + /* + * Get the OldestXmin and its associated epoch + */ + if (hot_standby_feedback && HotStandbyActive()) + { + TransactionId nextXid; + uint32 nextEpoch; + + reply_message.xmin = GetOldestXmin(true, false); + + /* + * Get epoch and adjust if nextXid and oldestXmin are different + * sides of the epoch boundary. + */ + GetNextXidAndEpoch(&nextXid, &nextEpoch); + if (nextXid < reply_message.xmin) + nextEpoch--; + reply_message.epoch = nextEpoch; + } + else + { + reply_message.xmin = InvalidTransactionId; + reply_message.epoch = 0; + } + + elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u", reply_message.write.xlogid, reply_message.write.xrecoff, reply_message.flush.xlogid, reply_message.flush.xrecoff, - reply_message.apply.xlogid, reply_message.apply.xrecoff); + reply_message.apply.xlogid, reply_message.apply.xrecoff, + reply_message.xmin, + reply_message.epoch); /* Prepend with the message type and send it. */ buf[0] = 'r'; |