aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/catalog/system_views.sql2
-rw-r--r--src/backend/postmaster/pgstat.c6
-rw-r--r--src/backend/replication/logical/logical.c18
-rw-r--r--src/backend/replication/logical/reorderbuffer.c21
-rw-r--r--src/backend/utils/adt/pgstatfuncs.c8
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat6
-rw-r--r--src/include/pgstat.h4
-rw-r--r--src/include/replication/reorderbuffer.h7
-rw-r--r--src/test/regress/expected/rules.out4
10 files changed, 63 insertions, 15 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 451db2ee0a0..6d78b335908 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -875,6 +875,8 @@ CREATE VIEW pg_stat_replication_slots AS
s.stream_txns,
s.stream_count,
s.stream_bytes,
+ s.total_txns,
+ s.total_bytes,
s.stats_reset
FROM pg_stat_get_replication_slots() AS s;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 666ce95d083..e1ec7d8b7d6 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1829,6 +1829,8 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
msg.m_stream_txns = repSlotStat->stream_txns;
msg.m_stream_count = repSlotStat->stream_count;
msg.m_stream_bytes = repSlotStat->stream_bytes;
+ msg.m_total_txns = repSlotStat->total_txns;
+ msg.m_total_bytes = repSlotStat->total_bytes;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
@@ -5568,6 +5570,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
replSlotStats[idx].stream_txns += msg->m_stream_txns;
replSlotStats[idx].stream_count += msg->m_stream_count;
replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
+ replSlotStats[idx].total_txns += msg->m_total_txns;
+ replSlotStats[idx].total_bytes += msg->m_total_bytes;
}
}
@@ -5795,6 +5799,8 @@ pgstat_reset_replslot(int i, TimestampTz ts)
replSlotStats[i].stream_txns = 0;
replSlotStats[i].stream_count = 0;
replSlotStats[i].stream_bytes = 0;
+ replSlotStats[i].total_txns = 0;
+ replSlotStats[i].total_bytes = 0;
replSlotStats[i].stat_reset_timestamp = ts;
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 68e210ce12b..35b0c676412 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1775,21 +1775,20 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
ReorderBuffer *rb = ctx->reorder;
PgStat_ReplSlotStats repSlotStat;
- /*
- * Nothing to do if we haven't spilled or streamed anything since the last
- * time the stats has been sent.
- */
- if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
+ /* Nothing to do if we don't have any replication stats to be sent. */
+ if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
return;
- elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
+ elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
rb,
(long long) rb->spillTxns,
(long long) rb->spillCount,
(long long) rb->spillBytes,
(long long) rb->streamTxns,
(long long) rb->streamCount,
- (long long) rb->streamBytes);
+ (long long) rb->streamBytes,
+ (long long) rb->totalTxns,
+ (long long) rb->totalBytes);
namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
repSlotStat.spill_txns = rb->spillTxns;
@@ -1798,12 +1797,17 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
repSlotStat.stream_txns = rb->streamTxns;
repSlotStat.stream_count = rb->streamCount;
repSlotStat.stream_bytes = rb->streamBytes;
+ repSlotStat.total_txns = rb->totalTxns;
+ repSlotStat.total_bytes = rb->totalBytes;
pgstat_report_replslot(&repSlotStat);
+
rb->spillTxns = 0;
rb->spillCount = 0;
rb->spillBytes = 0;
rb->streamTxns = 0;
rb->streamCount = 0;
rb->streamBytes = 0;
+ rb->totalTxns = 0;
+ rb->totalBytes = 0;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 52d06285a21..5cb484f0323 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -350,6 +350,8 @@ ReorderBufferAllocate(void)
buffer->streamTxns = 0;
buffer->streamCount = 0;
buffer->streamBytes = 0;
+ buffer->totalTxns = 0;
+ buffer->totalBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
@@ -1363,6 +1365,11 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
dlist_delete(&change->node);
dlist_push_tail(&state->old_change, &change->node);
+ /*
+ * Update the total bytes processed before releasing the current set
+ * of changes and restoring the new set of changes.
+ */
+ rb->totalBytes += rb->size;
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
&state->entries[off].segno))
{
@@ -2364,6 +2371,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
iterstate = NULL;
/*
+ * Update total transaction count and total transaction bytes
+ * processed. Ensure to not count the streamed transaction multiple
+ * times.
+ *
+ * Note that the statistics computation has to be done after
+ * ReorderBufferIterTXNFinish as it releases the serialized change
+ * which we have already accounted in ReorderBufferIterTXNNext.
+ */
+ if (!rbtxn_is_streamed(txn))
+ rb->totalTxns++;
+
+ rb->totalBytes += rb->size;
+
+ /*
* Done with current changes, send the last message for this set of
* changes depending upon streaming mode.
*/
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 521ba736143..2680190a402 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2284,7 +2284,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
Datum
pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -2335,11 +2335,13 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
values[4] = Int64GetDatum(s->stream_txns);
values[5] = Int64GetDatum(s->stream_count);
values[6] = Int64GetDatum(s->stream_bytes);
+ values[7] = Int64GetDatum(s->total_txns);
+ values[8] = Int64GetDatum(s->total_bytes);
if (s->stat_reset_timestamp == 0)
- nulls[7] = true;
+ nulls[9] = true;
else
- values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
+ values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 87e9596da56..904b0c97ec2 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202104151
+#define CATALOG_VERSION_NO 202104161
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f4957653ae6..591753fe817 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5315,9 +5315,9 @@
proname => 'pg_stat_get_replication_slots', prorows => '10',
proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
- proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
+ proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slots' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 8e11215058e..2aeb3cded4d 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -548,6 +548,8 @@ typedef struct PgStat_MsgReplSlot
PgStat_Counter m_stream_txns;
PgStat_Counter m_stream_count;
PgStat_Counter m_stream_bytes;
+ PgStat_Counter m_total_txns;
+ PgStat_Counter m_total_bytes;
} PgStat_MsgReplSlot;
/* ----------
@@ -924,6 +926,8 @@ typedef struct PgStat_ReplSlotStats
PgStat_Counter stream_txns;
PgStat_Counter stream_count;
PgStat_Counter stream_bytes;
+ PgStat_Counter total_txns;
+ PgStat_Counter total_bytes;
TimestampTz stat_reset_timestamp;
} PgStat_ReplSlotStats;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 565a961d6ab..bfab8303ee7 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -618,6 +618,13 @@ struct ReorderBuffer
int64 streamTxns; /* number of transactions streamed */
int64 streamCount; /* streaming invocation counter */
int64 streamBytes; /* amount of data streamed */
+
+ /*
+ * Statistics about all the transactions sent to the decoding output
+ * plugin
+ */
+ int64 totalTxns; /* total number of transactions sent */
+ int64 totalBytes; /* total amount of data sent */
};
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 186e6c966c6..6399f3feef8 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2068,8 +2068,10 @@ pg_stat_replication_slots| SELECT s.slot_name,
s.stream_txns,
s.stream_count,
s.stream_bytes,
+ s.total_txns,
+ s.total_bytes,
s.stats_reset
- FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
+ FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,