Diffstat (limited to 'src')
-rw-r--r--  src/backend/replication/walsender.c          36
-rw-r--r--  src/test/recovery/t/001_stream_rep.pl         16
-rw-r--r--  src/test/subscription/t/001_rep_changes.pl    16
3 files changed, 66 insertions, 2 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1028919aecb..216baeda5cd 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -91,10 +91,14 @@
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
+#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
+/* Minimum interval used by walsender for stats flushes, in ms */
+#define WALSENDER_STATS_FLUSH_INTERVAL 1000
+
/*
* Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
*
@@ -1797,6 +1801,7 @@ WalSndWaitForWal(XLogRecPtr loc)
int wakeEvents;
uint32 wait_event = 0;
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+ TimestampTz last_flush = 0;
/*
* Fast path to avoid acquiring the spinlock in case we already know we
@@ -1817,6 +1822,7 @@ WalSndWaitForWal(XLogRecPtr loc)
{
bool wait_for_standby_at_stop = false;
long sleeptime;
+ TimestampTz now;
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
@@ -1927,7 +1933,8 @@ WalSndWaitForWal(XLogRecPtr loc)
* new WAL to be generated. (But if we have nothing to send, we don't
* want to wake on socket-writable.)
*/
- sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+ now = GetCurrentTimestamp();
+ sleeptime = WalSndComputeSleeptime(now);
wakeEvents = WL_SOCKET_READABLE;
@@ -1936,6 +1943,15 @@ WalSndWaitForWal(XLogRecPtr loc)
Assert(wait_event != 0);
+ /* Report IO statistics, if needed */
+ if (TimestampDifferenceExceeds(last_flush, now,
+ WALSENDER_STATS_FLUSH_INTERVAL))
+ {
+ pgstat_flush_io(false);
+ (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+ last_flush = now;
+ }
+
WalSndWait(wakeEvents, sleeptime, wait_event);
}
@@ -2742,6 +2758,8 @@ WalSndCheckTimeOut(void)
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
+ TimestampTz last_flush = 0;
+
/*
* Initialize the last reply timestamp. That enables timeout processing
* from hereon.
@@ -2836,6 +2854,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
* WalSndWaitForWal() handle any other blocking; idle receivers need
* its additional actions. For physical replication, also block if
* caught up; its send_data does not block.
+ *
+ * The IO statistics are reported in WalSndWaitForWal() for the
+ * logical WAL senders.
*/
if ((WalSndCaughtUp && send_data != XLogSendLogical &&
!streamingDoneSending) ||
@@ -2843,6 +2864,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
{
long sleeptime;
int wakeEvents;
+ TimestampTz now;
if (!streamingDoneReceiving)
wakeEvents = WL_SOCKET_READABLE;
@@ -2853,11 +2875,21 @@ WalSndLoop(WalSndSendDataCallback send_data)
* Use fresh timestamp, not last_processing, to reduce the chance
* of reaching wal_sender_timeout before sending a keepalive.
*/
- sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+ now = GetCurrentTimestamp();
+ sleeptime = WalSndComputeSleeptime(now);
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
+ /* Report IO statistics, if needed */
+ if (TimestampDifferenceExceeds(last_flush, now,
+ WALSENDER_STATS_FLUSH_INTERVAL))
+ {
+ pgstat_flush_io(false);
+ (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+ last_flush = now;
+ }
+
/* Sleep until something happens or we time out */
WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
}
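
The walsender change above boils down to a time-throttled flush: remember when
statistics were last pushed, and only call the pgstat flush routines again once
WALSENDER_STATS_FLUSH_INTERVAL has elapsed, reusing the timestamp already taken
to compute the sleep time. The sketch below isolates that pattern; the helper
name flush_io_stats_throttled is hypothetical (the patch inlines the same logic
in WalSndWaitForWal() and WalSndLoop()), and the exact set of headers beyond
the utils/pgstat_internal.h include added by the patch is an assumption.

    /* Minimal sketch of the throttled stats flush used by the patch. */
    #include "postgres.h"

    #include "pgstat.h"                     /* pgstat_flush_io(); header choice assumed */
    #include "utils/pgstat_internal.h"      /* pgstat_flush_backend(), PGSTAT_BACKEND_FLUSH_IO */
    #include "utils/timestamp.h"            /* TimestampDifferenceExceeds() */

    /* Minimum interval between flushes, in ms (same value as the patch) */
    #define WALSENDER_STATS_FLUSH_INTERVAL 1000

    /*
     * Flush IO and per-backend IO statistics at most once per interval.
     * "last_flush" is owned by the caller and carried across loop iterations;
     * "now" is the timestamp the caller already fetched to compute its sleep
     * time, so no extra GetCurrentTimestamp() call is needed here.
     */
    static void
    flush_io_stats_throttled(TimestampTz *last_flush, TimestampTz now)
    {
        if (!TimestampDifferenceExceeds(*last_flush, now,
                                        WALSENDER_STATS_FLUSH_INTERVAL))
            return;             /* flushed recently enough, nothing to do */

        /* nowait = false: blocking is fine, the walsender is about to sleep */
        pgstat_flush_io(false);
        (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);

        *last_flush = now;
    }

Tracking last_flush per loop rather than globally matches the patch:
WalSndWaitForWal() and WalSndLoop() each keep their own timestamp, which is
enough because a logical walsender sleeps in WalSndWaitForWal() while a
physical one sleeps in WalSndLoop(), as the comment added to WalSndLoop()
notes.
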
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index ccd8417d449..2cbcc509d76 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -42,6 +42,9 @@ $node_standby_2->init_from_backup($node_standby_1, $backup_name,
has_streaming => 1);
$node_standby_2->start;
+# Reset IO statistics, for the WAL sender check with pg_stat_io.
+$node_primary->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
+
# Create some content on primary and check its presence in standby nodes
$node_primary->safe_psql('postgres',
"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
@@ -333,6 +336,19 @@ $node_primary->psql(
note "switching to physical replication slot";
+# Wait for the physical WAL sender to update its IO statistics. This is
+# done before the next restart, which would force a flush of its stats, and
+# far enough from the reset done above to not impact the run time.
+$node_primary->poll_query_until(
+ 'postgres',
+ qq[SELECT sum(reads) > 0
+ FROM pg_catalog.pg_stat_io
+ WHERE backend_type = 'walsender'
+ AND object = 'wal']
+ )
+ or die
+ "Timed out while waiting for the walsender to update its IO statistics";
+
# Switch to using a physical replication slot. We can do this without a new
# backup since physical slots can go backwards if needed. Do so on both
# standbys. Since we're going to be testing things that affect the slot state,
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 8726fe04ad2..916fdb48b3b 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -113,6 +113,9 @@ $node_subscriber->safe_psql('postgres',
# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+# Reset IO statistics, for the WAL sender check with pg_stat_io.
+$node_publisher->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
+
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
is($result, qq(0), 'check non-replicated table is empty on subscriber');
@@ -184,6 +187,19 @@ $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_no_col");
is($result, qq(2), 'check replicated changes for table having no columns');
+# Wait for the logical WAL sender to update its IO statistics. This is
+# done before the next restart, which would force a flush of its stats, and
+# far enough from the reset done above to not impact the run time.
+$node_publisher->poll_query_until(
+ 'postgres',
+ qq[SELECT sum(reads) > 0
+ FROM pg_catalog.pg_stat_io
+ WHERE backend_type = 'walsender'
+ AND object = 'wal']
+ )
+ or die
+ "Timed out while waiting for the walsender to update its IO statistics";
+
# insert some duplicate rows
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_full SELECT generate_series(1,10)");