diff options
Diffstat (limited to 'src/backend/postmaster/pgstat.c')
-rw-r--r-- | src/backend/postmaster/pgstat.c | 340 |
1 files changed, 196 insertions, 144 deletions
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 6e8dee97842..ba335fd3429 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -106,6 +106,7 @@ #define PGSTAT_DB_HASH_SIZE 16 #define PGSTAT_TAB_HASH_SIZE 512 #define PGSTAT_FUNCTION_HASH_SIZE 512 +#define PGSTAT_REPLSLOT_HASH_SIZE 32 /* ---------- @@ -278,8 +279,7 @@ static PgStat_ArchiverStats archiverStats; static PgStat_GlobalStats globalStats; static PgStat_WalStats walStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; -static PgStat_ReplSlotStats *replSlotStats; -static int nReplSlotStats; +static HTAB *replSlotStatHash = NULL; static PgStat_RecoveryPrefetchStats recoveryPrefetchStats; /* @@ -319,8 +319,8 @@ static void backend_read_statsfile(void); static bool pgstat_write_statsfile_needed(void); static bool pgstat_db_requested(Oid databaseid); -static int pgstat_replslot_index(const char *name, bool create_it); -static void pgstat_reset_replslot(int i, TimestampTz ts); +static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it); +static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts); static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg); static void pgstat_send_funcstats(void); @@ -1110,6 +1110,24 @@ pgstat_vacuum_stat(void) hash_destroy(htab); /* + * Search for all the dead replication slots in stats hashtable and tell + * the stats collector to drop them. + */ + if (replSlotStatHash) + { + PgStat_StatReplSlotEntry *slotentry; + + hash_seq_init(&hstat, replSlotStatHash); + while ((slotentry = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL) + { + CHECK_FOR_INTERRUPTS(); + + if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL) + pgstat_report_replslot_drop(NameStr(slotentry->slotname)); + } + } + + /* * Lookup our own database entry; if not found, nothing more to do. */ dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash, @@ -1516,30 +1534,6 @@ pgstat_reset_replslot_counter(const char *name) if (name) { - ReplicationSlot *slot; - - /* - * Check if the slot exists with the given name. It is possible that by - * the time this message is executed the slot is dropped but at least - * this check will ensure that the given name is for a valid slot. - */ - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - slot = SearchNamedReplicationSlot(name); - LWLockRelease(ReplicationSlotControlLock); - - if (!slot) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("replication slot \"%s\" does not exist", - name))); - - /* - * Nothing to do for physical slots as we collect stats only for - * logical slots. - */ - if (SlotIsPhysical(slot)) - return; - namestrcpy(&msg.m_slotname, name); msg.clearall = false; } @@ -1813,7 +1807,7 @@ pgstat_report_tempfile(size_t filesize) * ---------- */ void -pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat) +pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat) { PgStat_MsgReplSlot msg; @@ -1822,6 +1816,7 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat) */ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname)); + msg.m_create = false; msg.m_drop = false; msg.m_spill_txns = repSlotStat->spill_txns; msg.m_spill_count = repSlotStat->spill_count; @@ -1835,6 +1830,24 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat) } /* ---------- + * pgstat_report_replslot_create() - + * + * Tell the collector about creating the replication slot. + * ---------- + */ +void +pgstat_report_replslot_create(const char *slotname) +{ + PgStat_MsgReplSlot msg; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); + namestrcpy(&msg.m_slotname, slotname); + msg.m_create = true; + msg.m_drop = false; + pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); +} + +/* ---------- * pgstat_report_replslot_drop() - * * Tell the collector about dropping the replication slot. @@ -1847,6 +1860,7 @@ pgstat_report_replslot_drop(const char *slotname) pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); namestrcpy(&msg.m_slotname, slotname); + msg.m_create = false; msg.m_drop = true; pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } @@ -2872,17 +2886,15 @@ pgstat_fetch_slru(void) * pgstat_fetch_replslot() - * * Support function for the SQL-callable pgstat* functions. Returns - * a pointer to the replication slot statistics struct and sets the - * number of entries in nslots_p. + * a pointer to the replication slot statistics struct. * --------- */ -PgStat_ReplSlotStats * -pgstat_fetch_replslot(int *nslots_p) +PgStat_StatReplSlotEntry * +pgstat_fetch_replslot(NameData slotname) { backend_read_statsfile(); - *nslots_p = nReplSlotStats; - return replSlotStats; + return pgstat_get_replslot_entry(slotname, false); } /* @@ -3654,7 +3666,6 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; int rc; - int i; elog(DEBUG2, "writing stats file \"%s\"", statfile); @@ -3744,11 +3755,17 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) /* * Write replication slot stats struct */ - for (i = 0; i < nReplSlotStats; i++) + if (replSlotStatHash) { - fputc('R', fpout); - rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout); - (void) rc; /* we'll check for error with ferror */ + PgStat_StatReplSlotEntry *slotent; + + hash_seq_init(&hstat, replSlotStatHash); + while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL) + { + fputc('R', fpout); + rc = fwrite(slotent, sizeof(PgStat_StatReplSlotEntry), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } } /* @@ -3975,12 +3992,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - /* Allocate the space for replication slot statistics */ - replSlotStats = MemoryContextAllocZero(pgStatLocalContext, - max_replication_slots - * sizeof(PgStat_ReplSlotStats)); - nReplSlotStats = 0; - /* * Clear out global, archiver, WAL and SLRU statistics so they start from * zero in case we can't load an existing statsfile. @@ -4006,12 +4017,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; /* - * Set the same reset timestamp for all replication slots too. - */ - for (i = 0; i < max_replication_slots; i++) - replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; - - /* * Try to open the stats file. If it doesn't exist, the backends simply * return zero for anything and the collector simply starts from scratch * with empty counters. @@ -4197,21 +4202,43 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) break; /* - * 'R' A PgStat_ReplSlotStats struct describing a replication - * slot follows. + * 'R' A PgStat_StatReplSlotEntry struct describing a + * replication slot follows. */ case 'R': - if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin) - != sizeof(PgStat_ReplSlotStats)) { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); - goto done; + PgStat_StatReplSlotEntry slotbuf; + PgStat_StatReplSlotEntry *slotent; + + if (fread(&slotbuf, 1, sizeof(PgStat_StatReplSlotEntry), fpin) + != sizeof(PgStat_StatReplSlotEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + /* Create hash table if we don't have it already. */ + if (replSlotStatHash == NULL) + { + HASHCTL hash_ctl; + + hash_ctl.keysize = sizeof(NameData); + hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry); + hash_ctl.hcxt = pgStatLocalContext; + replSlotStatHash = hash_create("Replication slots hash", + PGSTAT_REPLSLOT_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + } + + slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash, + (void *) &slotbuf.slotname, + HASH_ENTER, NULL); + memcpy(slotent, &slotbuf, sizeof(PgStat_StatReplSlotEntry)); + break; } - nReplSlotStats++; - break; case 'E': goto done; @@ -4424,7 +4451,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_ArchiverStats myArchiverStats; PgStat_WalStats myWalStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; - PgStat_ReplSlotStats myReplSlotStats; + PgStat_StatReplSlotEntry myReplSlotStats; PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats; FILE *fpin; int32 format_id; @@ -4553,12 +4580,12 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, break; /* - * 'R' A PgStat_ReplSlotStats struct describing a replication - * slot follows. + * 'R' A PgStat_StatReplSlotEntry struct describing a + * replication slot follows. */ case 'R': - if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin) - != sizeof(PgStat_ReplSlotStats)) + if (fread(&myReplSlotStats, 1, sizeof(PgStat_StatReplSlotEntry), fpin) + != sizeof(PgStat_StatReplSlotEntry)) { ereport(pgStatRunningInCollector ? LOG : WARNING, (errmsg("corrupted statistics file \"%s\"", @@ -4764,8 +4791,7 @@ pgstat_clear_snapshot(void) /* Reset variables */ pgStatLocalContext = NULL; pgStatDBHash = NULL; - replSlotStats = NULL; - nReplSlotStats = 0; + replSlotStatHash = NULL; /* * Historically the backend_status.c facilities lived in this file, and @@ -5189,20 +5215,26 @@ static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len) { - int i; - int idx = -1; + PgStat_StatReplSlotEntry *slotent; TimestampTz ts; + /* Return if we don't have replication slot statistics */ + if (replSlotStatHash == NULL) + return; + ts = GetCurrentTimestamp(); if (msg->clearall) { - for (i = 0; i < nReplSlotStats; i++) - pgstat_reset_replslot(i, ts); + HASH_SEQ_STATUS sstat; + + hash_seq_init(&sstat, replSlotStatHash); + while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&sstat)) != NULL) + pgstat_reset_replslot(slotent, ts); } else { - /* Get the index of replication slot statistics to reset */ - idx = pgstat_replslot_index(NameStr(msg->m_slotname), false); + /* Get the slot statistics to reset */ + slotent = pgstat_get_replslot_entry(msg->m_slotname, false); /* * Nothing to do if the given slot entry is not found. This could @@ -5210,11 +5242,11 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, * corresponding statistics entry is also removed before receiving the * reset message. */ - if (idx < 0) + if (!slotent) return; /* Reset the stats for the requested replication slot */ - pgstat_reset_replslot(idx, ts); + pgstat_reset_replslot(slotent, ts); } } @@ -5532,46 +5564,45 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len) static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) { - int idx; - - /* - * Get the index of replication slot statistics. On dropping, we don't - * create the new statistics. - */ - idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop); - - /* - * The slot entry is not found or there is no space to accommodate the new - * entry. This could happen when the message for the creation of a slot - * reached before the drop message even though the actual operations - * happen in reverse order. In such a case, the next update of the - * statistics for the same slot will create the required entry. - */ - if (idx < 0) - return; - - /* it must be a valid replication slot index */ - Assert(idx < nReplSlotStats); - if (msg->m_drop) { + Assert(!msg->m_create); + /* Remove the replication slot statistics with the given name */ - if (idx < nReplSlotStats - 1) - memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1], - sizeof(PgStat_ReplSlotStats)); - nReplSlotStats--; + if (replSlotStatHash != NULL) + (void) hash_search(replSlotStatHash, + (void *) &(msg->m_slotname), + HASH_REMOVE, + NULL); } else { - /* Update the replication slot statistics */ - replSlotStats[idx].spill_txns += msg->m_spill_txns; - replSlotStats[idx].spill_count += msg->m_spill_count; - replSlotStats[idx].spill_bytes += msg->m_spill_bytes; - replSlotStats[idx].stream_txns += msg->m_stream_txns; - replSlotStats[idx].stream_count += msg->m_stream_count; - replSlotStats[idx].stream_bytes += msg->m_stream_bytes; - replSlotStats[idx].total_txns += msg->m_total_txns; - replSlotStats[idx].total_bytes += msg->m_total_bytes; + PgStat_StatReplSlotEntry *slotent; + + slotent = pgstat_get_replslot_entry(msg->m_slotname, true); + Assert(slotent); + + if (msg->m_create) + { + /* + * If the message for dropping the slot with the same name gets + * lost, slotent has stats for the old slot. So we initialize all + * counters at slot creation. + */ + pgstat_reset_replslot(slotent, 0); + } + else + { + /* Update the replication slot statistics */ + slotent->spill_txns += msg->m_spill_txns; + slotent->spill_count += msg->m_spill_count; + slotent->spill_bytes += msg->m_spill_bytes; + slotent->stream_txns += msg->m_stream_txns; + slotent->stream_count += msg->m_stream_count; + slotent->stream_bytes += msg->m_stream_bytes; + slotent->total_txns += msg->m_total_txns; + slotent->total_bytes += msg->m_total_bytes; + } } } @@ -5749,59 +5780,80 @@ pgstat_db_requested(Oid databaseid) } /* ---------- - * pgstat_replslot_index + * pgstat_replslot_entry * - * Return the index of entry of a replication slot with the given name, or - * -1 if the slot is not found. + * Return the entry of replication slot stats with the given name. Return + * NULL if not found and the caller didn't request to create it. * - * create_it tells whether to create the new slot entry if it is not found. + * create tells whether to create the new slot entry if it is not found. * ---------- */ -static int -pgstat_replslot_index(const char *name, bool create_it) +static PgStat_StatReplSlotEntry * +pgstat_get_replslot_entry(NameData name, bool create) { - int i; + PgStat_StatReplSlotEntry *slotent; + bool found; - Assert(nReplSlotStats <= max_replication_slots); - for (i = 0; i < nReplSlotStats; i++) + if (replSlotStatHash == NULL) { - if (namestrcmp(&replSlotStats[i].slotname, name) == 0) - return i; /* found */ + HASHCTL hash_ctl; + + /* + * Quick return NULL if the hash table is empty and the caller didn't + * request to create the entry. + */ + if (!create) + return NULL; + + hash_ctl.keysize = sizeof(NameData); + hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry); + replSlotStatHash = hash_create("Replication slots hash", + PGSTAT_REPLSLOT_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS); } - /* - * The slot is not found. We don't want to register the new statistics if - * the list is already full or the caller didn't request. - */ - if (i == max_replication_slots || !create_it) - return -1; + slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash, + (void *) &name, + create ? HASH_ENTER : HASH_FIND, + &found); - /* Register new slot */ - memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); - namestrcpy(&replSlotStats[nReplSlotStats].slotname, name); + if (!slotent) + { + /* not found */ + Assert(!create && !found); + return NULL; + } + + /* initialize the entry */ + if (create && !found) + { + namestrcpy(&(slotent->slotname), NameStr(name)); + pgstat_reset_replslot(slotent, 0); + } - return nReplSlotStats++; + return slotent; } /* ---------- * pgstat_reset_replslot * - * Reset the replication slot stats at index 'i'. + * Reset the given replication slot stats. * ---------- */ static void -pgstat_reset_replslot(int i, TimestampTz ts) +pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts) { /* reset only counters. Don't clear slot name */ - replSlotStats[i].spill_txns = 0; - replSlotStats[i].spill_count = 0; - replSlotStats[i].spill_bytes = 0; - replSlotStats[i].stream_txns = 0; - replSlotStats[i].stream_count = 0; - replSlotStats[i].stream_bytes = 0; - replSlotStats[i].total_txns = 0; - replSlotStats[i].total_bytes = 0; - replSlotStats[i].stat_reset_timestamp = ts; + slotent->spill_txns = 0; + slotent->spill_count = 0; + slotent->spill_bytes = 0; + slotent->stream_txns = 0; + slotent->stream_count = 0; + slotent->stream_bytes = 0; + slotent->total_txns = 0; + slotent->total_bytes = 0; + slotent->stat_reset_timestamp = ts; } /* |