aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/src/sgml/monitoring.sgml5
-rw-r--r--src/backend/catalog/system_views.sql3
-rw-r--r--src/backend/replication/walsender.c71
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat6
-rw-r--r--src/include/replication/walsender_private.h5
-rw-r--r--src/test/regress/expected/rules.out5
7 files changed, 77 insertions, 20 deletions
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 40e4298cf4e..96bcc3a63be 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1920,6 +1920,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
</itemizedlist>
</entry>
</row>
+ <row>
+ <entry><structfield>reply_time</structfield></entry>
+ <entry><type>timestamp with time zone</type></entry>
+ <entry>Send time of last reply message received from standby server</entry>
+ </row>
</tbody>
</tgroup>
</table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 715995dd883..8630542bb34 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS
W.flush_lag,
W.replay_lag,
W.sync_priority,
- W.sync_state
+ W.sync_state,
+ W.reply_time
FROM pg_stat_get_activity(NULL) AS S
JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 46edb525e88..d1a8113cb66 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1763,6 +1763,7 @@ ProcessStandbyReplyMessage(void)
applyLag;
bool clearLagTimes;
TimestampTz now;
+ TimestampTz replyTime;
static bool fullyAppliedLastTime = false;
@@ -1770,14 +1771,25 @@ ProcessStandbyReplyMessage(void)
writePtr = pq_getmsgint64(&reply_message);
flushPtr = pq_getmsgint64(&reply_message);
applyPtr = pq_getmsgint64(&reply_message);
- (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
+ replyTime = pq_getmsgint64(&reply_message);
replyRequested = pq_getmsgbyte(&reply_message);
- elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
- (uint32) (writePtr >> 32), (uint32) writePtr,
- (uint32) (flushPtr >> 32), (uint32) flushPtr,
- (uint32) (applyPtr >> 32), (uint32) applyPtr,
- replyRequested ? " (reply requested)" : "");
+ if (log_min_messages <= DEBUG2)
+ {
+ char *replyTimeStr;
+
+ /* Copy because timestamptz_to_str returns a static buffer */
+ replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
+
+ elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
+ (uint32) (writePtr >> 32), (uint32) writePtr,
+ (uint32) (flushPtr >> 32), (uint32) flushPtr,
+ (uint32) (applyPtr >> 32), (uint32) applyPtr,
+ replyRequested ? " (reply requested)" : "",
+ replyTimeStr);
+
+ pfree(replyTimeStr);
+ }
/* See if we can compute the round-trip lag for these positions. */
now = GetCurrentTimestamp();
@@ -1824,6 +1836,7 @@ ProcessStandbyReplyMessage(void)
walsnd->flushLag = flushLag;
if (applyLag != -1 || clearLagTimes)
walsnd->applyLag = applyLag;
+ walsnd->replyTime = replyTime;
SpinLockRelease(&walsnd->mutex);
}
@@ -1927,23 +1940,47 @@ ProcessStandbyHSFeedbackMessage(void)
uint32 feedbackEpoch;
TransactionId feedbackCatalogXmin;
uint32 feedbackCatalogEpoch;
+ TimestampTz replyTime;
/*
* Decipher the reply message. The caller already consumed the msgtype
* byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
* of this message.
*/
- (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
+ replyTime = pq_getmsgint64(&reply_message);
feedbackXmin = pq_getmsgint(&reply_message, 4);
feedbackEpoch = pq_getmsgint(&reply_message, 4);
feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
- elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
- feedbackXmin,
- feedbackEpoch,
- feedbackCatalogXmin,
- feedbackCatalogEpoch);
+ if (log_min_messages <= DEBUG2)
+ {
+ char *replyTimeStr;
+
+ /* Copy because timestamptz_to_str returns a static buffer */
+ replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
+
+ elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
+ feedbackXmin,
+ feedbackEpoch,
+ feedbackCatalogXmin,
+ feedbackCatalogEpoch,
+ replyTimeStr);
+
+ pfree(replyTimeStr);
+ }
+
+ /*
+ * Update shared state for this WalSender process based on reply data from
+ * standby.
+ */
+ {
+ WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->replyTime = replyTime;
+ SpinLockRelease(&walsnd->mutex);
+ }
/*
* Unset WalSender's xmins if the feedback message values are invalid.
@@ -2265,6 +2302,7 @@ InitWalSenderSlot(void)
walsnd->applyLag = -1;
walsnd->state = WALSNDSTATE_STARTUP;
walsnd->latch = &MyProc->procLatch;
+ walsnd->replyTime = 0;
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
MyWalSnd = (WalSnd *) walsnd;
@@ -3179,7 +3217,7 @@ offset_to_interval(TimeOffset offset)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 11
+#define PG_STAT_GET_WAL_SENDERS_COLS 12
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -3233,6 +3271,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
int priority;
int pid;
WalSndState state;
+ TimestampTz replyTime;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -3252,6 +3291,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
flushLag = walsnd->flushLag;
applyLag = walsnd->applyLag;
priority = walsnd->sync_standby_priority;
+ replyTime = walsnd->replyTime;
SpinLockRelease(&walsnd->mutex);
memset(nulls, 0, sizeof(nulls));
@@ -3328,6 +3368,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
else
values[10] = CStringGetTextDatum("potential");
+
+ if (replyTime == 0)
+ nulls[11] = true;
+ else
+ values[11] = TimestampTzGetDatum(replyTime);
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index be72bddd17c..e16ec9dd778 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201811201
+#define CATALOG_VERSION_NO 201812091
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 034a41eb556..f79fcfe029f 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5023,9 +5023,9 @@
proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
- proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text}',
- proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}',
+ proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
prosrc => 'pg_stat_get_wal_senders' },
{ oid => '3317', descr => 'statistics: information about WAL receiver',
proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 4b904779361..53314b1fae5 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -75,6 +75,11 @@ typedef struct WalSnd
* SyncRepLock.
*/
int sync_standby_priority;
+
+ /*
+ * Timestamp of the last message received from standby.
+ */
+ TimestampTz replyTime;
} WalSnd;
extern WalSnd *MyWalSnd;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 735dd37acff..b68b8d273f3 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1861,9 +1861,10 @@ pg_stat_replication| SELECT s.pid,
w.flush_lag,
w.replay_lag,
w.sync_priority,
- w.sync_state
+ w.sync_state,
+ w.reply_time
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn)
- JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid)))
+ JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_ssl| SELECT s.pid,
s.ssl,