diff options
author | Simon Riggs <simon@2ndQuadrant.com> | 2011-02-16 19:29:37 +0000 |
---|---|---|
committer | Simon Riggs <simon@2ndQuadrant.com> | 2011-02-16 19:29:37 +0000 |
commit | bca8b7f16a3e720794cb0afbdb3733be4f8d9c2c (patch) | |
tree | 4d2e650f058ffa99af77f9068bf64a223de17246 /src/backend/replication/walreceiver.c | |
parent | 65076269ea54a8cd6e39f066a208c7d13aceac0a (diff) | |
download | postgresql-bca8b7f16a3e720794cb0afbdb3733be4f8d9c2c.tar.gz postgresql-bca8b7f16a3e720794cb0afbdb3733be4f8d9c2c.zip |
Hot Standby feedback for avoidance of cleanup conflicts on standby.
Standby optionally sends back information about oldestXmin of queries
which is then checked and applied to the WALSender's proc->xmin.
GetOldestXmin() is modified slightly to agree with GetSnapshotData(),
so that all backends on primary include WALSender within their snapshots.
Note this does nothing to change the snapshot xmin on either master or
standby. Feedback piggybacks on the standby reply message.
vacuum_defer_cleanup_age is no longer used on standby, though parameter
still exists on primary, since some use cases still exist.
Simon Riggs, review comments from Fujii Masao, Heikki Linnakangas, Robert Haas
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- | src/backend/replication/walreceiver.c | 36 |
1 files changed, 33 insertions, 3 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b1e5247f12a..ee09468db17 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -38,6 +38,7 @@ #include <signal.h> #include <unistd.h> +#include "access/transam.h" #include "access/xlog_internal.h" #include "libpq/pqsignal.h" #include "miscadmin.h" @@ -45,6 +46,7 @@ #include "replication/walreceiver.h" #include "storage/ipc.h" #include "storage/pmsignal.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -56,6 +58,7 @@ bool am_walreceiver; /* GUC variable */ int wal_receiver_status_interval; +bool hot_standby_feedback; /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; @@ -610,16 +613,43 @@ XLogWalRcvSendReply(void) wal_receiver_status_interval * 1000)) return; - /* Construct a new message. */ + /* Construct a new message */ reply_message.write = LogstreamResult.Write; reply_message.flush = LogstreamResult.Flush; reply_message.apply = GetXLogReplayRecPtr(); reply_message.sendTime = now; - elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", + /* + * Get the OldestXmin and its associated epoch + */ + if (hot_standby_feedback && HotStandbyActive()) + { + TransactionId nextXid; + uint32 nextEpoch; + + reply_message.xmin = GetOldestXmin(true, false); + + /* + * Get epoch and adjust if nextXid and oldestXmin are different + * sides of the epoch boundary. + */ + GetNextXidAndEpoch(&nextXid, &nextEpoch); + if (nextXid < reply_message.xmin) + nextEpoch--; + reply_message.epoch = nextEpoch; + } + else + { + reply_message.xmin = InvalidTransactionId; + reply_message.epoch = 0; + } + + elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u", reply_message.write.xlogid, reply_message.write.xrecoff, reply_message.flush.xlogid, reply_message.flush.xrecoff, - reply_message.apply.xlogid, reply_message.apply.xrecoff); + reply_message.apply.xlogid, reply_message.apply.xrecoff, + reply_message.xmin, + reply_message.epoch); /* Prepend with the message type and send it. */ buf[0] = 'r'; |