aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-04-27 09:09:11 +0530
committerAmit Kapila <akapila@postgresql.org>2021-04-27 09:09:11 +0530
commit3fa17d37716f978f80dfcdab4e7c73f3a24e7a48 (patch)
tree43a865e413ebd2852e418535b8fbbdb4a83d6d78 /src
parente7eea52b2d61917fbbdac7f3f895e4ef636e935b (diff)
downloadpostgresql-3fa17d37716f978f80dfcdab4e7c73f3a24e7a48.tar.gz
postgresql-3fa17d37716f978f80dfcdab4e7c73f3a24e7a48.zip
Use HTAB for replication slot statistics.
Previously, we used to use the array of size max_replication_slots to store stats for replication slots. But that had two problems in the cases where a message for dropping a slot gets lost: 1) the stats for the new slot are not recorded if the array is full and 2) writing beyond the end of the array if the user reduces the max_replication_slots. This commit uses HTAB for replication slot statistics, resolving both problems. Now, pgstat_vacuum_stat() search for all the dead replication slots in stats hashtable and tell the collector to remove them. To avoid showing the stats for the already-dropped slots, pg_stat_replication_slots view searches slot stats by the slot name taken from pg_replication_slots. Also, we send a message for creating a slot at slot creation, initializing the stats. This reduces the possibility that the stats are accumulated into the old slot stats when a message for dropping a slot gets lost. Reported-by: Andres Freund Author: Sawada Masahiko, test case by Vignesh C Reviewed-by: Amit Kapila, Vignesh C, Dilip Kumar Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de
Diffstat (limited to 'src')
-rw-r--r--src/backend/catalog/system_views.sql30
-rw-r--r--src/backend/postmaster/pgstat.c340
-rw-r--r--src/backend/replication/logical/logical.c2
-rw-r--r--src/backend/replication/slot.c28
-rw-r--r--src/backend/utils/adt/pgstatfuncs.c146
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat14
-rw-r--r--src/include/pgstat.h10
-rw-r--r--src/include/replication/slot.h2
-rw-r--r--src/test/regress/expected/rules.out4
-rw-r--r--src/tools/pgindent/typedefs.list2
11 files changed, 335 insertions, 245 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 70e578894f5..08f95c43cae 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -866,20 +866,6 @@ CREATE VIEW pg_stat_replication AS
JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
-CREATE VIEW pg_stat_replication_slots AS
- SELECT
- s.slot_name,
- s.spill_txns,
- s.spill_count,
- s.spill_bytes,
- s.stream_txns,
- s.stream_count,
- s.stream_bytes,
- s.total_txns,
- s.total_bytes,
- s.stats_reset
- FROM pg_stat_get_replication_slots() AS s;
-
CREATE VIEW pg_stat_slru AS
SELECT
s.name,
@@ -984,6 +970,22 @@ CREATE VIEW pg_replication_slots AS
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
+CREATE VIEW pg_stat_replication_slots AS
+ SELECT
+ s.slot_name,
+ s.spill_txns,
+ s.spill_count,
+ s.spill_bytes,
+ s.stream_txns,
+ s.stream_count,
+ s.stream_bytes,
+ s.total_txns,
+ s.total_bytes,
+ s.stats_reset
+ FROM pg_replication_slots as r,
+ LATERAL pg_stat_get_replication_slot(slot_name) as s
+ WHERE r.datoid IS NOT NULL; -- excluding physical slots
+
CREATE VIEW pg_stat_database AS
SELECT
D.oid AS datid,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 6e8dee97842..ba335fd3429 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -106,6 +106,7 @@
#define PGSTAT_DB_HASH_SIZE 16
#define PGSTAT_TAB_HASH_SIZE 512
#define PGSTAT_FUNCTION_HASH_SIZE 512
+#define PGSTAT_REPLSLOT_HASH_SIZE 32
/* ----------
@@ -278,8 +279,7 @@ static PgStat_ArchiverStats archiverStats;
static PgStat_GlobalStats globalStats;
static PgStat_WalStats walStats;
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
-static PgStat_ReplSlotStats *replSlotStats;
-static int nReplSlotStats;
+static HTAB *replSlotStatHash = NULL;
static PgStat_RecoveryPrefetchStats recoveryPrefetchStats;
/*
@@ -319,8 +319,8 @@ static void backend_read_statsfile(void);
static bool pgstat_write_statsfile_needed(void);
static bool pgstat_db_requested(Oid databaseid);
-static int pgstat_replslot_index(const char *name, bool create_it);
-static void pgstat_reset_replslot(int i, TimestampTz ts);
+static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
+static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts);
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
static void pgstat_send_funcstats(void);
@@ -1110,6 +1110,24 @@ pgstat_vacuum_stat(void)
hash_destroy(htab);
/*
+ * Search for all the dead replication slots in stats hashtable and tell
+ * the stats collector to drop them.
+ */
+ if (replSlotStatHash)
+ {
+ PgStat_StatReplSlotEntry *slotentry;
+
+ hash_seq_init(&hstat, replSlotStatHash);
+ while ((slotentry = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
+ pgstat_report_replslot_drop(NameStr(slotentry->slotname));
+ }
+ }
+
+ /*
* Lookup our own database entry; if not found, nothing more to do.
*/
dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
@@ -1516,30 +1534,6 @@ pgstat_reset_replslot_counter(const char *name)
if (name)
{
- ReplicationSlot *slot;
-
- /*
- * Check if the slot exists with the given name. It is possible that by
- * the time this message is executed the slot is dropped but at least
- * this check will ensure that the given name is for a valid slot.
- */
- LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- slot = SearchNamedReplicationSlot(name);
- LWLockRelease(ReplicationSlotControlLock);
-
- if (!slot)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("replication slot \"%s\" does not exist",
- name)));
-
- /*
- * Nothing to do for physical slots as we collect stats only for
- * logical slots.
- */
- if (SlotIsPhysical(slot))
- return;
-
namestrcpy(&msg.m_slotname, name);
msg.clearall = false;
}
@@ -1813,7 +1807,7 @@ pgstat_report_tempfile(size_t filesize)
* ----------
*/
void
-pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
+pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat)
{
PgStat_MsgReplSlot msg;
@@ -1822,6 +1816,7 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
*/
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname));
+ msg.m_create = false;
msg.m_drop = false;
msg.m_spill_txns = repSlotStat->spill_txns;
msg.m_spill_count = repSlotStat->spill_count;
@@ -1835,6 +1830,24 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
}
/* ----------
+ * pgstat_report_replslot_create() -
+ *
+ * Tell the collector about creating the replication slot.
+ * ----------
+ */
+void
+pgstat_report_replslot_create(const char *slotname)
+{
+ PgStat_MsgReplSlot msg;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+ namestrcpy(&msg.m_slotname, slotname);
+ msg.m_create = true;
+ msg.m_drop = false;
+ pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
+
+/* ----------
* pgstat_report_replslot_drop() -
*
* Tell the collector about dropping the replication slot.
@@ -1847,6 +1860,7 @@ pgstat_report_replslot_drop(const char *slotname)
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
namestrcpy(&msg.m_slotname, slotname);
+ msg.m_create = false;
msg.m_drop = true;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
@@ -2872,17 +2886,15 @@ pgstat_fetch_slru(void)
* pgstat_fetch_replslot() -
*
* Support function for the SQL-callable pgstat* functions. Returns
- * a pointer to the replication slot statistics struct and sets the
- * number of entries in nslots_p.
+ * a pointer to the replication slot statistics struct.
* ---------
*/
-PgStat_ReplSlotStats *
-pgstat_fetch_replslot(int *nslots_p)
+PgStat_StatReplSlotEntry *
+pgstat_fetch_replslot(NameData slotname)
{
backend_read_statsfile();
- *nslots_p = nReplSlotStats;
- return replSlotStats;
+ return pgstat_get_replslot_entry(slotname, false);
}
/*
@@ -3654,7 +3666,6 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
int rc;
- int i;
elog(DEBUG2, "writing stats file \"%s\"", statfile);
@@ -3744,11 +3755,17 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
/*
* Write replication slot stats struct
*/
- for (i = 0; i < nReplSlotStats; i++)
+ if (replSlotStatHash)
{
- fputc('R', fpout);
- rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
+ PgStat_StatReplSlotEntry *slotent;
+
+ hash_seq_init(&hstat, replSlotStatHash);
+ while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+ {
+ fputc('R', fpout);
+ rc = fwrite(slotent, sizeof(PgStat_StatReplSlotEntry), 1, fpout);
+ (void) rc; /* we'll check for error with ferror */
+ }
}
/*
@@ -3975,12 +3992,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
- /* Allocate the space for replication slot statistics */
- replSlotStats = MemoryContextAllocZero(pgStatLocalContext,
- max_replication_slots
- * sizeof(PgStat_ReplSlotStats));
- nReplSlotStats = 0;
-
/*
* Clear out global, archiver, WAL and SLRU statistics so they start from
* zero in case we can't load an existing statsfile.
@@ -4006,12 +4017,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
/*
- * Set the same reset timestamp for all replication slots too.
- */
- for (i = 0; i < max_replication_slots; i++)
- replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
-
- /*
* Try to open the stats file. If it doesn't exist, the backends simply
* return zero for anything and the collector simply starts from scratch
* with empty counters.
@@ -4197,21 +4202,43 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
break;
/*
- * 'R' A PgStat_ReplSlotStats struct describing a replication
- * slot follows.
+ * 'R' A PgStat_StatReplSlotEntry struct describing a
+ * replication slot follows.
*/
case 'R':
- if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
- != sizeof(PgStat_ReplSlotStats))
{
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
- goto done;
+ PgStat_StatReplSlotEntry slotbuf;
+ PgStat_StatReplSlotEntry *slotent;
+
+ if (fread(&slotbuf, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
+ != sizeof(PgStat_StatReplSlotEntry))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ goto done;
+ }
+
+ /* Create hash table if we don't have it already. */
+ if (replSlotStatHash == NULL)
+ {
+ HASHCTL hash_ctl;
+
+ hash_ctl.keysize = sizeof(NameData);
+ hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
+ hash_ctl.hcxt = pgStatLocalContext;
+ replSlotStatHash = hash_create("Replication slots hash",
+ PGSTAT_REPLSLOT_HASH_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ }
+
+ slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
+ (void *) &slotbuf.slotname,
+ HASH_ENTER, NULL);
+ memcpy(slotent, &slotbuf, sizeof(PgStat_StatReplSlotEntry));
+ break;
}
- nReplSlotStats++;
- break;
case 'E':
goto done;
@@ -4424,7 +4451,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
PgStat_ArchiverStats myArchiverStats;
PgStat_WalStats myWalStats;
PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
- PgStat_ReplSlotStats myReplSlotStats;
+ PgStat_StatReplSlotEntry myReplSlotStats;
PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
FILE *fpin;
int32 format_id;
@@ -4553,12 +4580,12 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
break;
/*
- * 'R' A PgStat_ReplSlotStats struct describing a replication
- * slot follows.
+ * 'R' A PgStat_StatReplSlotEntry struct describing a
+ * replication slot follows.
*/
case 'R':
- if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
- != sizeof(PgStat_ReplSlotStats))
+ if (fread(&myReplSlotStats, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
+ != sizeof(PgStat_StatReplSlotEntry))
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"",
@@ -4764,8 +4791,7 @@ pgstat_clear_snapshot(void)
/* Reset variables */
pgStatLocalContext = NULL;
pgStatDBHash = NULL;
- replSlotStats = NULL;
- nReplSlotStats = 0;
+ replSlotStatHash = NULL;
/*
* Historically the backend_status.c facilities lived in this file, and
@@ -5189,20 +5215,26 @@ static void
pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
int len)
{
- int i;
- int idx = -1;
+ PgStat_StatReplSlotEntry *slotent;
TimestampTz ts;
+ /* Return if we don't have replication slot statistics */
+ if (replSlotStatHash == NULL)
+ return;
+
ts = GetCurrentTimestamp();
if (msg->clearall)
{
- for (i = 0; i < nReplSlotStats; i++)
- pgstat_reset_replslot(i, ts);
+ HASH_SEQ_STATUS sstat;
+
+ hash_seq_init(&sstat, replSlotStatHash);
+ while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&sstat)) != NULL)
+ pgstat_reset_replslot(slotent, ts);
}
else
{
- /* Get the index of replication slot statistics to reset */
- idx = pgstat_replslot_index(NameStr(msg->m_slotname), false);
+ /* Get the slot statistics to reset */
+ slotent = pgstat_get_replslot_entry(msg->m_slotname, false);
/*
* Nothing to do if the given slot entry is not found. This could
@@ -5210,11 +5242,11 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
* corresponding statistics entry is also removed before receiving the
* reset message.
*/
- if (idx < 0)
+ if (!slotent)
return;
/* Reset the stats for the requested replication slot */
- pgstat_reset_replslot(idx, ts);
+ pgstat_reset_replslot(slotent, ts);
}
}
@@ -5532,46 +5564,45 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
static void
pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
{
- int idx;
-
- /*
- * Get the index of replication slot statistics. On dropping, we don't
- * create the new statistics.
- */
- idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop);
-
- /*
- * The slot entry is not found or there is no space to accommodate the new
- * entry. This could happen when the message for the creation of a slot
- * reached before the drop message even though the actual operations
- * happen in reverse order. In such a case, the next update of the
- * statistics for the same slot will create the required entry.
- */
- if (idx < 0)
- return;
-
- /* it must be a valid replication slot index */
- Assert(idx < nReplSlotStats);
-
if (msg->m_drop)
{
+ Assert(!msg->m_create);
+
/* Remove the replication slot statistics with the given name */
- if (idx < nReplSlotStats - 1)
- memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
- sizeof(PgStat_ReplSlotStats));
- nReplSlotStats--;
+ if (replSlotStatHash != NULL)
+ (void) hash_search(replSlotStatHash,
+ (void *) &(msg->m_slotname),
+ HASH_REMOVE,
+ NULL);
}
else
{
- /* Update the replication slot statistics */
- replSlotStats[idx].spill_txns += msg->m_spill_txns;
- replSlotStats[idx].spill_count += msg->m_spill_count;
- replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
- replSlotStats[idx].stream_txns += msg->m_stream_txns;
- replSlotStats[idx].stream_count += msg->m_stream_count;
- replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
- replSlotStats[idx].total_txns += msg->m_total_txns;
- replSlotStats[idx].total_bytes += msg->m_total_bytes;
+ PgStat_StatReplSlotEntry *slotent;
+
+ slotent = pgstat_get_replslot_entry(msg->m_slotname, true);
+ Assert(slotent);
+
+ if (msg->m_create)
+ {
+ /*
+ * If the message for dropping the slot with the same name gets
+ * lost, slotent has stats for the old slot. So we initialize all
+ * counters at slot creation.
+ */
+ pgstat_reset_replslot(slotent, 0);
+ }
+ else
+ {
+ /* Update the replication slot statistics */
+ slotent->spill_txns += msg->m_spill_txns;
+ slotent->spill_count += msg->m_spill_count;
+ slotent->spill_bytes += msg->m_spill_bytes;
+ slotent->stream_txns += msg->m_stream_txns;
+ slotent->stream_count += msg->m_stream_count;
+ slotent->stream_bytes += msg->m_stream_bytes;
+ slotent->total_txns += msg->m_total_txns;
+ slotent->total_bytes += msg->m_total_bytes;
+ }
}
}
@@ -5749,59 +5780,80 @@ pgstat_db_requested(Oid databaseid)
}
/* ----------
- * pgstat_replslot_index
+ * pgstat_replslot_entry
*
- * Return the index of entry of a replication slot with the given name, or
- * -1 if the slot is not found.
+ * Return the entry of replication slot stats with the given name. Return
+ * NULL if not found and the caller didn't request to create it.
*
- * create_it tells whether to create the new slot entry if it is not found.
+ * create tells whether to create the new slot entry if it is not found.
* ----------
*/
-static int
-pgstat_replslot_index(const char *name, bool create_it)
+static PgStat_StatReplSlotEntry *
+pgstat_get_replslot_entry(NameData name, bool create)
{
- int i;
+ PgStat_StatReplSlotEntry *slotent;
+ bool found;
- Assert(nReplSlotStats <= max_replication_slots);
- for (i = 0; i < nReplSlotStats; i++)
+ if (replSlotStatHash == NULL)
{
- if (namestrcmp(&replSlotStats[i].slotname, name) == 0)
- return i; /* found */
+ HASHCTL hash_ctl;
+
+ /*
+ * Quick return NULL if the hash table is empty and the caller didn't
+ * request to create the entry.
+ */
+ if (!create)
+ return NULL;
+
+ hash_ctl.keysize = sizeof(NameData);
+ hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
+ replSlotStatHash = hash_create("Replication slots hash",
+ PGSTAT_REPLSLOT_HASH_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS);
}
- /*
- * The slot is not found. We don't want to register the new statistics if
- * the list is already full or the caller didn't request.
- */
- if (i == max_replication_slots || !create_it)
- return -1;
+ slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
+ (void *) &name,
+ create ? HASH_ENTER : HASH_FIND,
+ &found);
- /* Register new slot */
- memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
- namestrcpy(&replSlotStats[nReplSlotStats].slotname, name);
+ if (!slotent)
+ {
+ /* not found */
+ Assert(!create && !found);
+ return NULL;
+ }
+
+ /* initialize the entry */
+ if (create && !found)
+ {
+ namestrcpy(&(slotent->slotname), NameStr(name));
+ pgstat_reset_replslot(slotent, 0);
+ }
- return nReplSlotStats++;
+ return slotent;
}
/* ----------
* pgstat_reset_replslot
*
- * Reset the replication slot stats at index 'i'.
+ * Reset the given replication slot stats.
* ----------
*/
static void
-pgstat_reset_replslot(int i, TimestampTz ts)
+pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
{
/* reset only counters. Don't clear slot name */
- replSlotStats[i].spill_txns = 0;
- replSlotStats[i].spill_count = 0;
- replSlotStats[i].spill_bytes = 0;
- replSlotStats[i].stream_txns = 0;
- replSlotStats[i].stream_count = 0;
- replSlotStats[i].stream_bytes = 0;
- replSlotStats[i].total_txns = 0;
- replSlotStats[i].total_bytes = 0;
- replSlotStats[i].stat_reset_timestamp = ts;
+ slotent->spill_txns = 0;
+ slotent->spill_count = 0;
+ slotent->spill_bytes = 0;
+ slotent->stream_txns = 0;
+ slotent->stream_count = 0;
+ slotent->stream_bytes = 0;
+ slotent->total_txns = 0;
+ slotent->total_bytes = 0;
+ slotent->stat_reset_timestamp = ts;
}
/*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 35b0c676412..00543ede45a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1773,7 +1773,7 @@ void
UpdateDecodingStats(LogicalDecodingContext *ctx)
{
ReorderBuffer *rb = ctx->reorder;
- PgStat_ReplSlotStats repSlotStat;
+ PgStat_StatReplSlotEntry repSlotStat;
/* Nothing to do if we don't have any replication stats to be sent. */
if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index f61b163f78d..cf261e200e4 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -328,12 +328,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
* ReplicationSlotAllocationLock.
*/
if (SlotIsLogical(slot))
- {
- PgStat_ReplSlotStats repSlotStat;
- MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats));
- namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name));
- pgstat_report_replslot(&repSlotStat);
- }
+ pgstat_report_replslot_create(NameStr(slot->data.name));
/*
* Now that the slot has been marked as in_use and active, it's safe to
@@ -349,17 +344,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
* Search for the named replication slot.
*
* Return the replication slot if found, otherwise NULL.
- *
- * The caller must hold ReplicationSlotControlLock in shared mode.
*/
ReplicationSlot *
-SearchNamedReplicationSlot(const char *name)
+SearchNamedReplicationSlot(const char *name, bool need_lock)
{
int i;
- ReplicationSlot *slot = NULL;
+ ReplicationSlot *slot = NULL;
- Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
- LW_SHARED));
+ if (need_lock)
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
@@ -372,6 +365,9 @@ SearchNamedReplicationSlot(const char *name)
}
}
+ if (need_lock)
+ LWLockRelease(ReplicationSlotControlLock);
+
return slot;
}
@@ -416,7 +412,7 @@ retry:
* Search for the slot with the specified name if the slot to acquire is
* not given. If the slot is not found, we either return -1 or error out.
*/
- s = slot ? slot : SearchNamedReplicationSlot(name);
+ s = slot ? slot : SearchNamedReplicationSlot(name, false);
if (s == NULL || !s->in_use)
{
LWLockRelease(ReplicationSlotControlLock);
@@ -713,6 +709,12 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
* reduce that possibility. If the messages reached in reverse, we would
* lose one statistics update message. But the next update message will
* create the statistics for the replication slot.
+ *
+ * XXX In case, the messages for creation and drop slot of the same name
+ * get lost and create happens before (auto)vacuum cleans up the dead
+ * slot, the stats will be accumulated into the old slot. One can imagine
+ * having OIDs for each slot to avoid the accumulation of stats but that
+ * doesn't seem worth doing as in practice this won't happen frequently.
*/
if (SlotIsLogical(slot))
pgstat_report_replslot_drop(NameStr(slot->data.name));
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 87f02d572e6..14056f53471 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -24,6 +24,7 @@
#include "pgstat.h"
#include "postmaster/bgworker_internals.h"
#include "postmaster/postmaster.h"
+#include "replication/slot.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/acl.h"
@@ -2207,8 +2208,33 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
char *target = NULL;
if (!PG_ARGISNULL(0))
+ {
+ ReplicationSlot *slot;
+
target = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ /*
+ * Check if the slot exists with the given name. It is possible that
+ * by the time this message is executed the slot is dropped but at
+ * least this check will ensure that the given name is for a valid
+ * slot.
+ */
+ slot = SearchNamedReplicationSlot(target, true);
+
+ if (!slot)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("replication slot \"%s\" does not exist",
+ target)));
+
+ /*
+ * Nothing to do for physical slots as we collect stats only for
+ * logical slots.
+ */
+ if (SlotIsPhysical(slot))
+ PG_RETURN_VOID();
+ }
+
pgstat_reset_replslot_counter(target);
PG_RETURN_VOID();
@@ -2280,73 +2306,77 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
-/* Get the statistics for the replication slots */
+/*
+ * Get the statistics for the replication slot. If the slot statistics is not
+ * available, return all-zeroes stats.
+ */
Datum
-pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
+pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
- ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ text *slotname_text = PG_GETARG_TEXT_P(0);
+ NameData slotname;
TupleDesc tupdesc;
- Tuplestorestate *tupstore;
- MemoryContext per_query_ctx;
- MemoryContext oldcontext;
- PgStat_ReplSlotStats *slotstats;
- int nstats;
- int i;
+ Datum values[10];
+ bool nulls[10];
+ PgStat_StatReplSlotEntry *slotent;
+ PgStat_StatReplSlotEntry allzero;
- /* check to see if caller supports us returning a tuplestore */
- if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("set-valued function called in context that cannot accept a set")));
- if (!(rsinfo->allowedModes & SFRM_Materialize))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("materialize mode required, but it is not allowed in this context")));
-
- /* Build a tuple descriptor for our result type */
- if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
- elog(ERROR, "return type must be a row type");
-
- per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
- oldcontext = MemoryContextSwitchTo(per_query_ctx);
-
- tupstore = tuplestore_begin_heap(true, false, work_mem);
- rsinfo->returnMode = SFRM_Materialize;
- rsinfo->setResult = tupstore;
- rsinfo->setDesc = tupdesc;
+ /* Initialise values and NULL flags arrays */
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, 0, sizeof(nulls));
- MemoryContextSwitchTo(oldcontext);
+ /* Initialise attributes information in the tuple descriptor */
+ tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_REPLICATION_SLOT_COLS);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "slot_name",
+ TEXTOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2, "spill_txns",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 3, "spill_count",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "spill_bytes",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 5, "stream_txns",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 6, "stream_count",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 8, "total_txns",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+ TIMESTAMPTZOID, -1, 0);
+ BlessTupleDesc(tupdesc);
- slotstats = pgstat_fetch_replslot(&nstats);
- for (i = 0; i < nstats; i++)
+ namestrcpy(&slotname, text_to_cstring(slotname_text));
+ slotent = pgstat_fetch_replslot(slotname);
+ if (!slotent)
{
- Datum values[PG_STAT_GET_REPLICATION_SLOT_COLS];
- bool nulls[PG_STAT_GET_REPLICATION_SLOT_COLS];
- PgStat_ReplSlotStats *s = &(slotstats[i]);
-
- MemSet(values, 0, sizeof(values));
- MemSet(nulls, 0, sizeof(nulls));
-
- values[0] = CStringGetTextDatum(NameStr(s->slotname));
- values[1] = Int64GetDatum(s->spill_txns);
- values[2] = Int64GetDatum(s->spill_count);
- values[3] = Int64GetDatum(s->spill_bytes);
- values[4] = Int64GetDatum(s->stream_txns);
- values[5] = Int64GetDatum(s->stream_count);
- values[6] = Int64GetDatum(s->stream_bytes);
- values[7] = Int64GetDatum(s->total_txns);
- values[8] = Int64GetDatum(s->total_bytes);
-
- if (s->stat_reset_timestamp == 0)
- nulls[9] = true;
- else
- values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
-
- tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ /*
+ * If the slot is not found, initialise its stats. This is possible if
+ * the create slot message is lost.
+ */
+ memset(&allzero, 0, sizeof(PgStat_StatReplSlotEntry));
+ slotent = &allzero;
}
- tuplestore_donestoring(tupstore);
+ values[0] = CStringGetTextDatum(NameStr(slotname));
+ values[1] = Int64GetDatum(slotent->spill_txns);
+ values[2] = Int64GetDatum(slotent->spill_count);
+ values[3] = Int64GetDatum(slotent->spill_bytes);
+ values[4] = Int64GetDatum(slotent->stream_txns);
+ values[5] = Int64GetDatum(slotent->stream_count);
+ values[6] = Int64GetDatum(slotent->stream_bytes);
+ values[7] = Int64GetDatum(slotent->total_txns);
+ values[8] = Int64GetDatum(slotent->total_bytes);
- return (Datum) 0;
+ if (slotent->stat_reset_timestamp == 0)
+ nulls[9] = true;
+ else
+ values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+
+ /* Returns the record as Datum */
+ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index ba1a0d03333..22dcd0a270c 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202104231
+#define CATALOG_VERSION_NO 202104271
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index db1abc149c6..91f0ea2212c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5308,14 +5308,14 @@
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
prosrc => 'pg_stat_get_wal_receiver' },
-{ oid => '8595', descr => 'statistics: information about replication slots',
- proname => 'pg_stat_get_replication_slots', prorows => '10',
+{ oid => '8595', descr => 'statistics: information about replication slot',
+ proname => 'pg_stat_get_replication_slot', prorows => '1',
proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
- prorettype => 'record', proargtypes => '',
- proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
- prosrc => 'pg_stat_get_replication_slots' },
+ prorettype => 'record', proargtypes => 'text',
+ proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
+ prosrc => 'pg_stat_get_replication_slot' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5c5920b0b5f..1ce363e7d18 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -541,6 +541,7 @@ typedef struct PgStat_MsgReplSlot
{
PgStat_MsgHdr m_hdr;
NameData m_slotname;
+ bool m_create;
bool m_drop;
PgStat_Counter m_spill_txns;
PgStat_Counter m_spill_count;
@@ -917,7 +918,7 @@ typedef struct PgStat_SLRUStats
/*
* Replication slot statistics kept in the stats collector
*/
-typedef struct PgStat_ReplSlotStats
+typedef struct PgStat_StatReplSlotEntry
{
NameData slotname;
PgStat_Counter spill_txns;
@@ -929,7 +930,7 @@ typedef struct PgStat_ReplSlotStats
PgStat_Counter total_txns;
PgStat_Counter total_bytes;
TimestampTz stat_reset_timestamp;
-} PgStat_ReplSlotStats;
+} PgStat_StatReplSlotEntry;
/*
@@ -1031,7 +1032,8 @@ extern void pgstat_report_recovery_conflict(int reason);
extern void pgstat_report_deadlock(void);
extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
extern void pgstat_report_checksum_failure(void);
-extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat);
+extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
+extern void pgstat_report_replslot_create(const char *slotname);
extern void pgstat_report_replslot_drop(const char *slotname);
extern void pgstat_initialize(void);
@@ -1129,7 +1131,7 @@ extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
extern PgStat_GlobalStats *pgstat_fetch_global(void);
extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
extern PgStat_SLRUStats *pgstat_fetch_slru(void);
-extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
+extern PgStat_StatReplSlotEntry *pgstat_fetch_replslot(NameData slotname);
extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void);
extern void pgstat_count_slru_page_zeroed(int slru_idx);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 1ad5e6c50df..357068403a1 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -223,7 +223,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
-extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6dff5439e00..572bc2057cc 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2071,7 +2071,9 @@ pg_stat_replication_slots| SELECT s.slot_name,
s.total_txns,
s.total_bytes,
s.stats_reset
- FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
+ FROM pg_replication_slots r,
+ LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset)
+ WHERE (r.datoid IS NOT NULL);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c7aff677d4b..878b67a276d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1870,12 +1870,12 @@ PgStat_MsgTabstat
PgStat_MsgTempFile
PgStat_MsgVacuum
PgStat_MsgWal
-PgStat_ReplSlotStats
PgStat_SLRUStats
PgStat_Shared_Reset_Target
PgStat_Single_Reset_Type
PgStat_StatDBEntry
PgStat_StatFuncEntry
+PgStat_StatReplSlotEntry
PgStat_StatTabEntry
PgStat_SubXactStatus
PgStat_TableCounts