aboutsummaryrefslogtreecommitdiff
path: root/src/backend/postmaster/pgstat.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/postmaster/pgstat.c')
-rw-r--r--src/backend/postmaster/pgstat.c340
1 files changed, 196 insertions, 144 deletions
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 6e8dee97842..ba335fd3429 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -106,6 +106,7 @@
#define PGSTAT_DB_HASH_SIZE 16
#define PGSTAT_TAB_HASH_SIZE 512
#define PGSTAT_FUNCTION_HASH_SIZE 512
+#define PGSTAT_REPLSLOT_HASH_SIZE 32
/* ----------
@@ -278,8 +279,7 @@ static PgStat_ArchiverStats archiverStats;
static PgStat_GlobalStats globalStats;
static PgStat_WalStats walStats;
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
-static PgStat_ReplSlotStats *replSlotStats;
-static int nReplSlotStats;
+static HTAB *replSlotStatHash = NULL;
static PgStat_RecoveryPrefetchStats recoveryPrefetchStats;
/*
@@ -319,8 +319,8 @@ static void backend_read_statsfile(void);
static bool pgstat_write_statsfile_needed(void);
static bool pgstat_db_requested(Oid databaseid);
-static int pgstat_replslot_index(const char *name, bool create_it);
-static void pgstat_reset_replslot(int i, TimestampTz ts);
+static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
+static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts);
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
static void pgstat_send_funcstats(void);
@@ -1110,6 +1110,24 @@ pgstat_vacuum_stat(void)
hash_destroy(htab);
/*
+ * Search for all the dead replication slots in stats hashtable and tell
+ * the stats collector to drop them.
+ */
+ if (replSlotStatHash)
+ {
+ PgStat_StatReplSlotEntry *slotentry;
+
+ hash_seq_init(&hstat, replSlotStatHash);
+ while ((slotentry = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
+ pgstat_report_replslot_drop(NameStr(slotentry->slotname));
+ }
+ }
+
+ /*
* Lookup our own database entry; if not found, nothing more to do.
*/
dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
@@ -1516,30 +1534,6 @@ pgstat_reset_replslot_counter(const char *name)
if (name)
{
- ReplicationSlot *slot;
-
- /*
- * Check if the slot exists with the given name. It is possible that by
- * the time this message is executed the slot is dropped but at least
- * this check will ensure that the given name is for a valid slot.
- */
- LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- slot = SearchNamedReplicationSlot(name);
- LWLockRelease(ReplicationSlotControlLock);
-
- if (!slot)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("replication slot \"%s\" does not exist",
- name)));
-
- /*
- * Nothing to do for physical slots as we collect stats only for
- * logical slots.
- */
- if (SlotIsPhysical(slot))
- return;
-
namestrcpy(&msg.m_slotname, name);
msg.clearall = false;
}
@@ -1813,7 +1807,7 @@ pgstat_report_tempfile(size_t filesize)
* ----------
*/
void
-pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
+pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat)
{
PgStat_MsgReplSlot msg;
@@ -1822,6 +1816,7 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
*/
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname));
+ msg.m_create = false;
msg.m_drop = false;
msg.m_spill_txns = repSlotStat->spill_txns;
msg.m_spill_count = repSlotStat->spill_count;
@@ -1835,6 +1830,24 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
}
/* ----------
+ * pgstat_report_replslot_create() -
+ *
+ * Tell the collector about creating the replication slot.
+ * ----------
+ */
+void
+pgstat_report_replslot_create(const char *slotname)
+{
+ PgStat_MsgReplSlot msg;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+ namestrcpy(&msg.m_slotname, slotname);
+ msg.m_create = true;
+ msg.m_drop = false;
+ pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
+
+/* ----------
* pgstat_report_replslot_drop() -
*
* Tell the collector about dropping the replication slot.
@@ -1847,6 +1860,7 @@ pgstat_report_replslot_drop(const char *slotname)
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
namestrcpy(&msg.m_slotname, slotname);
+ msg.m_create = false;
msg.m_drop = true;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
@@ -2872,17 +2886,15 @@ pgstat_fetch_slru(void)
* pgstat_fetch_replslot() -
*
* Support function for the SQL-callable pgstat* functions. Returns
- * a pointer to the replication slot statistics struct and sets the
- * number of entries in nslots_p.
+ * a pointer to the replication slot statistics struct.
* ---------
*/
-PgStat_ReplSlotStats *
-pgstat_fetch_replslot(int *nslots_p)
+PgStat_StatReplSlotEntry *
+pgstat_fetch_replslot(NameData slotname)
{
backend_read_statsfile();
- *nslots_p = nReplSlotStats;
- return replSlotStats;
+ return pgstat_get_replslot_entry(slotname, false);
}
/*
@@ -3654,7 +3666,6 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
int rc;
- int i;
elog(DEBUG2, "writing stats file \"%s\"", statfile);
@@ -3744,11 +3755,17 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
/*
* Write replication slot stats struct
*/
- for (i = 0; i < nReplSlotStats; i++)
+ if (replSlotStatHash)
{
- fputc('R', fpout);
- rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
+ PgStat_StatReplSlotEntry *slotent;
+
+ hash_seq_init(&hstat, replSlotStatHash);
+ while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+ {
+ fputc('R', fpout);
+ rc = fwrite(slotent, sizeof(PgStat_StatReplSlotEntry), 1, fpout);
+ (void) rc; /* we'll check for error with ferror */
+ }
}
/*
@@ -3975,12 +3992,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
- /* Allocate the space for replication slot statistics */
- replSlotStats = MemoryContextAllocZero(pgStatLocalContext,
- max_replication_slots
- * sizeof(PgStat_ReplSlotStats));
- nReplSlotStats = 0;
-
/*
* Clear out global, archiver, WAL and SLRU statistics so they start from
* zero in case we can't load an existing statsfile.
@@ -4006,12 +4017,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
/*
- * Set the same reset timestamp for all replication slots too.
- */
- for (i = 0; i < max_replication_slots; i++)
- replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
-
- /*
* Try to open the stats file. If it doesn't exist, the backends simply
* return zero for anything and the collector simply starts from scratch
* with empty counters.
@@ -4197,21 +4202,43 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
break;
/*
- * 'R' A PgStat_ReplSlotStats struct describing a replication
- * slot follows.
+ * 'R' A PgStat_StatReplSlotEntry struct describing a
+ * replication slot follows.
*/
case 'R':
- if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
- != sizeof(PgStat_ReplSlotStats))
{
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
- goto done;
+ PgStat_StatReplSlotEntry slotbuf;
+ PgStat_StatReplSlotEntry *slotent;
+
+ if (fread(&slotbuf, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
+ != sizeof(PgStat_StatReplSlotEntry))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ goto done;
+ }
+
+ /* Create hash table if we don't have it already. */
+ if (replSlotStatHash == NULL)
+ {
+ HASHCTL hash_ctl;
+
+ hash_ctl.keysize = sizeof(NameData);
+ hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
+ hash_ctl.hcxt = pgStatLocalContext;
+ replSlotStatHash = hash_create("Replication slots hash",
+ PGSTAT_REPLSLOT_HASH_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ }
+
+ slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
+ (void *) &slotbuf.slotname,
+ HASH_ENTER, NULL);
+ memcpy(slotent, &slotbuf, sizeof(PgStat_StatReplSlotEntry));
+ break;
}
- nReplSlotStats++;
- break;
case 'E':
goto done;
@@ -4424,7 +4451,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
PgStat_ArchiverStats myArchiverStats;
PgStat_WalStats myWalStats;
PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
- PgStat_ReplSlotStats myReplSlotStats;
+ PgStat_StatReplSlotEntry myReplSlotStats;
PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
FILE *fpin;
int32 format_id;
@@ -4553,12 +4580,12 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
break;
/*
- * 'R' A PgStat_ReplSlotStats struct describing a replication
- * slot follows.
+ * 'R' A PgStat_StatReplSlotEntry struct describing a
+ * replication slot follows.
*/
case 'R':
- if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
- != sizeof(PgStat_ReplSlotStats))
+ if (fread(&myReplSlotStats, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
+ != sizeof(PgStat_StatReplSlotEntry))
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"",
@@ -4764,8 +4791,7 @@ pgstat_clear_snapshot(void)
/* Reset variables */
pgStatLocalContext = NULL;
pgStatDBHash = NULL;
- replSlotStats = NULL;
- nReplSlotStats = 0;
+ replSlotStatHash = NULL;
/*
* Historically the backend_status.c facilities lived in this file, and
@@ -5189,20 +5215,26 @@ static void
pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
int len)
{
- int i;
- int idx = -1;
+ PgStat_StatReplSlotEntry *slotent;
TimestampTz ts;
+ /* Return if we don't have replication slot statistics */
+ if (replSlotStatHash == NULL)
+ return;
+
ts = GetCurrentTimestamp();
if (msg->clearall)
{
- for (i = 0; i < nReplSlotStats; i++)
- pgstat_reset_replslot(i, ts);
+ HASH_SEQ_STATUS sstat;
+
+ hash_seq_init(&sstat, replSlotStatHash);
+ while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&sstat)) != NULL)
+ pgstat_reset_replslot(slotent, ts);
}
else
{
- /* Get the index of replication slot statistics to reset */
- idx = pgstat_replslot_index(NameStr(msg->m_slotname), false);
+ /* Get the slot statistics to reset */
+ slotent = pgstat_get_replslot_entry(msg->m_slotname, false);
/*
* Nothing to do if the given slot entry is not found. This could
@@ -5210,11 +5242,11 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
* corresponding statistics entry is also removed before receiving the
* reset message.
*/
- if (idx < 0)
+ if (!slotent)
return;
/* Reset the stats for the requested replication slot */
- pgstat_reset_replslot(idx, ts);
+ pgstat_reset_replslot(slotent, ts);
}
}
@@ -5532,46 +5564,45 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
static void
pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
{
- int idx;
-
- /*
- * Get the index of replication slot statistics. On dropping, we don't
- * create the new statistics.
- */
- idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop);
-
- /*
- * The slot entry is not found or there is no space to accommodate the new
- * entry. This could happen when the message for the creation of a slot
- * reached before the drop message even though the actual operations
- * happen in reverse order. In such a case, the next update of the
- * statistics for the same slot will create the required entry.
- */
- if (idx < 0)
- return;
-
- /* it must be a valid replication slot index */
- Assert(idx < nReplSlotStats);
-
if (msg->m_drop)
{
+ Assert(!msg->m_create);
+
/* Remove the replication slot statistics with the given name */
- if (idx < nReplSlotStats - 1)
- memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
- sizeof(PgStat_ReplSlotStats));
- nReplSlotStats--;
+ if (replSlotStatHash != NULL)
+ (void) hash_search(replSlotStatHash,
+ (void *) &(msg->m_slotname),
+ HASH_REMOVE,
+ NULL);
}
else
{
- /* Update the replication slot statistics */
- replSlotStats[idx].spill_txns += msg->m_spill_txns;
- replSlotStats[idx].spill_count += msg->m_spill_count;
- replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
- replSlotStats[idx].stream_txns += msg->m_stream_txns;
- replSlotStats[idx].stream_count += msg->m_stream_count;
- replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
- replSlotStats[idx].total_txns += msg->m_total_txns;
- replSlotStats[idx].total_bytes += msg->m_total_bytes;
+ PgStat_StatReplSlotEntry *slotent;
+
+ slotent = pgstat_get_replslot_entry(msg->m_slotname, true);
+ Assert(slotent);
+
+ if (msg->m_create)
+ {
+ /*
+ * If the message for dropping the slot with the same name gets
+ * lost, slotent has stats for the old slot. So we initialize all
+ * counters at slot creation.
+ */
+ pgstat_reset_replslot(slotent, 0);
+ }
+ else
+ {
+ /* Update the replication slot statistics */
+ slotent->spill_txns += msg->m_spill_txns;
+ slotent->spill_count += msg->m_spill_count;
+ slotent->spill_bytes += msg->m_spill_bytes;
+ slotent->stream_txns += msg->m_stream_txns;
+ slotent->stream_count += msg->m_stream_count;
+ slotent->stream_bytes += msg->m_stream_bytes;
+ slotent->total_txns += msg->m_total_txns;
+ slotent->total_bytes += msg->m_total_bytes;
+ }
}
}
@@ -5749,59 +5780,80 @@ pgstat_db_requested(Oid databaseid)
}
/* ----------
- * pgstat_replslot_index
+ * pgstat_replslot_entry
*
- * Return the index of entry of a replication slot with the given name, or
- * -1 if the slot is not found.
+ * Return the entry of replication slot stats with the given name. Return
+ * NULL if not found and the caller didn't request to create it.
*
- * create_it tells whether to create the new slot entry if it is not found.
+ * create tells whether to create the new slot entry if it is not found.
* ----------
*/
-static int
-pgstat_replslot_index(const char *name, bool create_it)
+static PgStat_StatReplSlotEntry *
+pgstat_get_replslot_entry(NameData name, bool create)
{
- int i;
+ PgStat_StatReplSlotEntry *slotent;
+ bool found;
- Assert(nReplSlotStats <= max_replication_slots);
- for (i = 0; i < nReplSlotStats; i++)
+ if (replSlotStatHash == NULL)
{
- if (namestrcmp(&replSlotStats[i].slotname, name) == 0)
- return i; /* found */
+ HASHCTL hash_ctl;
+
+ /*
+ * Quick return NULL if the hash table is empty and the caller didn't
+ * request to create the entry.
+ */
+ if (!create)
+ return NULL;
+
+ hash_ctl.keysize = sizeof(NameData);
+ hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
+ replSlotStatHash = hash_create("Replication slots hash",
+ PGSTAT_REPLSLOT_HASH_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS);
}
- /*
- * The slot is not found. We don't want to register the new statistics if
- * the list is already full or the caller didn't request.
- */
- if (i == max_replication_slots || !create_it)
- return -1;
+ slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
+ (void *) &name,
+ create ? HASH_ENTER : HASH_FIND,
+ &found);
- /* Register new slot */
- memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
- namestrcpy(&replSlotStats[nReplSlotStats].slotname, name);
+ if (!slotent)
+ {
+ /* not found */
+ Assert(!create && !found);
+ return NULL;
+ }
+
+ /* initialize the entry */
+ if (create && !found)
+ {
+ namestrcpy(&(slotent->slotname), NameStr(name));
+ pgstat_reset_replslot(slotent, 0);
+ }
- return nReplSlotStats++;
+ return slotent;
}
/* ----------
* pgstat_reset_replslot
*
- * Reset the replication slot stats at index 'i'.
+ * Reset the given replication slot stats.
* ----------
*/
static void
-pgstat_reset_replslot(int i, TimestampTz ts)
+pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
{
/* reset only counters. Don't clear slot name */
- replSlotStats[i].spill_txns = 0;
- replSlotStats[i].spill_count = 0;
- replSlotStats[i].spill_bytes = 0;
- replSlotStats[i].stream_txns = 0;
- replSlotStats[i].stream_count = 0;
- replSlotStats[i].stream_bytes = 0;
- replSlotStats[i].total_txns = 0;
- replSlotStats[i].total_bytes = 0;
- replSlotStats[i].stat_reset_timestamp = ts;
+ slotent->spill_txns = 0;
+ slotent->spill_count = 0;
+ slotent->spill_bytes = 0;
+ slotent->stream_txns = 0;
+ slotent->stream_count = 0;
+ slotent->stream_bytes = 0;
+ slotent->total_txns = 0;
+ slotent->total_bytes = 0;
+ slotent->stat_reset_timestamp = ts;
}
/*