diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/walsender.c | 36 | ||||
-rw-r--r-- | src/test/recovery/t/001_stream_rep.pl | 16 | ||||
-rw-r--r-- | src/test/subscription/t/001_rep_changes.pl | 16 |
3 files changed, 66 insertions, 2 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 1028919aecb..216baeda5cd 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -91,10 +91,14 @@ #include "utils/guc.h" #include "utils/memutils.h" #include "utils/pg_lsn.h" +#include "utils/pgstat_internal.h" #include "utils/ps_status.h" #include "utils/timeout.h" #include "utils/timestamp.h" +/* Minimum interval used by walsender for stats flushes, in ms */ +#define WALSENDER_STATS_FLUSH_INTERVAL 1000 + /* * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ. * @@ -1797,6 +1801,7 @@ WalSndWaitForWal(XLogRecPtr loc) int wakeEvents; uint32 wait_event = 0; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + TimestampTz last_flush = 0; /* * Fast path to avoid acquiring the spinlock in case we already know we @@ -1817,6 +1822,7 @@ WalSndWaitForWal(XLogRecPtr loc) { bool wait_for_standby_at_stop = false; long sleeptime; + TimestampTz now; /* Clear any already-pending wakeups */ ResetLatch(MyLatch); @@ -1927,7 +1933,8 @@ WalSndWaitForWal(XLogRecPtr loc) * new WAL to be generated. (But if we have nothing to send, we don't * want to wake on socket-writable.) */ - sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + now = GetCurrentTimestamp(); + sleeptime = WalSndComputeSleeptime(now); wakeEvents = WL_SOCKET_READABLE; @@ -1936,6 +1943,15 @@ WalSndWaitForWal(XLogRecPtr loc) Assert(wait_event != 0); + /* Report IO statistics, if needed */ + if (TimestampDifferenceExceeds(last_flush, now, + WALSENDER_STATS_FLUSH_INTERVAL)) + { + pgstat_flush_io(false); + (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO); + last_flush = now; + } + WalSndWait(wakeEvents, sleeptime, wait_event); } @@ -2742,6 +2758,8 @@ WalSndCheckTimeOut(void) static void WalSndLoop(WalSndSendDataCallback send_data) { + TimestampTz last_flush = 0; + /* * Initialize the last reply timestamp. That enables timeout processing * from hereon. @@ -2836,6 +2854,9 @@ WalSndLoop(WalSndSendDataCallback send_data) * WalSndWaitForWal() handle any other blocking; idle receivers need * its additional actions. For physical replication, also block if * caught up; its send_data does not block. + * + * The IO statistics are reported in WalSndWaitForWal() for the + * logical WAL senders. */ if ((WalSndCaughtUp && send_data != XLogSendLogical && !streamingDoneSending) || @@ -2843,6 +2864,7 @@ WalSndLoop(WalSndSendDataCallback send_data) { long sleeptime; int wakeEvents; + TimestampTz now; if (!streamingDoneReceiving) wakeEvents = WL_SOCKET_READABLE; @@ -2853,11 +2875,21 @@ WalSndLoop(WalSndSendDataCallback send_data) * Use fresh timestamp, not last_processing, to reduce the chance * of reaching wal_sender_timeout before sending a keepalive. */ - sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + now = GetCurrentTimestamp(); + sleeptime = WalSndComputeSleeptime(now); if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; + /* Report IO statistics, if needed */ + if (TimestampDifferenceExceeds(last_flush, now, + WALSENDER_STATS_FLUSH_INTERVAL)) + { + pgstat_flush_io(false); + (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO); + last_flush = now; + } + /* Sleep until something happens or we time out */ WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN); } diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl index ccd8417d449..2cbcc509d76 100644 --- a/src/test/recovery/t/001_stream_rep.pl +++ b/src/test/recovery/t/001_stream_rep.pl @@ -42,6 +42,9 @@ $node_standby_2->init_from_backup($node_standby_1, $backup_name, has_streaming => 1); $node_standby_2->start; +# Reset IO statistics, for the WAL sender check with pg_stat_io. +$node_primary->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')"); + # Create some content on primary and check its presence in standby nodes $node_primary->safe_psql('postgres', "CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a"); @@ -333,6 +336,19 @@ $node_primary->psql( note "switching to physical replication slot"; +# Wait for the physical WAL sender to update its IO statistics. This is +# done before the next restart, which would force a flush of its stats, and +# far enough from the reset done above to not impact the run time. +$node_primary->poll_query_until( + 'postgres', + qq[SELECT sum(reads) > 0 + FROM pg_catalog.pg_stat_io + WHERE backend_type = 'walsender' + AND object = 'wal'] + ) + or die + "Timed out while waiting for the walsender to update its IO statistics"; + # Switch to using a physical replication slot. We can do this without a new # backup since physical slots can go backwards if needed. Do so on both # standbys. Since we're going to be testing things that affect the slot state, diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 8726fe04ad2..916fdb48b3b 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -113,6 +113,9 @@ $node_subscriber->safe_psql('postgres', # Wait for initial table sync to finish $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); +# Reset IO statistics, for the WAL sender check with pg_stat_io. +$node_publisher->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')"); + my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep"); is($result, qq(0), 'check non-replicated table is empty on subscriber'); @@ -184,6 +187,19 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_no_col"); is($result, qq(2), 'check replicated changes for table having no columns'); +# Wait for the logical WAL sender to update its IO statistics. This is +# done before the next restart, which would force a flush of its stats, and +# far enough from the reset done above to not impact the run time. +$node_publisher->poll_query_until( + 'postgres', + qq[SELECT sum(reads) > 0 + FROM pg_catalog.pg_stat_io + WHERE backend_type = 'walsender' + AND object = 'wal'] + ) + or die + "Timed out while waiting for the walsender to update its IO statistics"; + # insert some duplicate rows $node_publisher->safe_psql('postgres', "INSERT INTO tab_full SELECT generate_series(1,10)"); |