diff options
author | Amit Kapila <akapila@postgresql.org> | 2021-11-30 08:54:30 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2021-11-30 08:54:30 +0530 |
commit | 8d74fc96db5fd547e077bf9bf4c3b67f821d71cd (patch) | |
tree | 3037345a7edabd025edcc5d9b431fb14f780e817 /src/backend/utils/adt/pgstatfuncs.c | |
parent | 98105e53e0ab472b7721a3e8d7b9f1750a635120 (diff) | |
download | postgresql-8d74fc96db5fd547e077bf9bf4c3b67f821d71cd.tar.gz postgresql-8d74fc96db5fd547e077bf9bf4c3b67f821d71cd.zip |
Add a view to show the stats of subscription workers.
This commit adds a new system view pg_stat_subscription_workers, that
shows information about any errors which occur during the application of
logical replication changes as well as during performing initial table
synchronization. The subscription statistics entries are removed when the
corresponding subscription is removed.
It also adds an SQL function pg_stat_reset_subscription_worker() to reset
single subscription errors.
The contents of this view can be used by an upcoming patch that skips the
particular transaction that conflicts with the existing data on the
subscriber.
This view can be extended in the future to track other xact related
statistics like the number of xacts committed/aborted for subscription
workers.
Author: Masahiko Sawada
Reviewed-by: Greg Nancarrow, Hou Zhijie, Tang Haiying, Vignesh C, Dilip Kumar, Takamichi Osumi, Amit Kapila
Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com
Diffstat (limited to 'src/backend/utils/adt/pgstatfuncs.c')
-rw-r--r-- | src/backend/utils/adt/pgstatfuncs.c | 128 |
1 files changed, 126 insertions, 2 deletions
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index e64857e5409..f529c1561ab 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2172,7 +2172,7 @@ pg_stat_reset_single_table_counters(PG_FUNCTION_ARGS) { Oid taboid = PG_GETARG_OID(0); - pgstat_reset_single_counter(taboid, RESET_TABLE); + pgstat_reset_single_counter(taboid, InvalidOid, RESET_TABLE); PG_RETURN_VOID(); } @@ -2182,11 +2182,38 @@ pg_stat_reset_single_function_counters(PG_FUNCTION_ARGS) { Oid funcoid = PG_GETARG_OID(0); - pgstat_reset_single_counter(funcoid, RESET_FUNCTION); + pgstat_reset_single_counter(funcoid, InvalidOid, RESET_FUNCTION); PG_RETURN_VOID(); } +Datum +pg_stat_reset_subscription_worker_subrel(PG_FUNCTION_ARGS) +{ + Oid subid = PG_GETARG_OID(0); + Oid relid = PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1); + + pgstat_reset_single_counter(subid, relid, RESET_SUBWORKER); + + PG_RETURN_VOID(); +} + +/* Reset all subscription worker stats associated with the given subscription */ +Datum +pg_stat_reset_subscription_worker_sub(PG_FUNCTION_ARGS) +{ + Oid subid = PG_GETARG_OID(0); + + /* + * Use subscription drop message to remove statistics of all subscription + * workers. + */ + pgstat_report_subscription_drop(subid); + + PG_RETURN_VOID(); +} + + /* Reset SLRU counters (a specific one or all of them). */ Datum pg_stat_reset_slru(PG_FUNCTION_ARGS) @@ -2380,3 +2407,100 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } + +/* + * Get the subscription worker statistics for the given subscription + * (and relation). + */ +Datum +pg_stat_get_subscription_worker(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS 8 + Oid subid = PG_GETARG_OID(0); + Oid subrelid; + TupleDesc tupdesc; + Datum values[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS]; + bool nulls[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS]; + PgStat_StatSubWorkerEntry *wentry; + int i; + + if (PG_ARGISNULL(1)) + subrelid = InvalidOid; + else + subrelid = PG_GETARG_OID(1); + + /* Get subscription worker stats */ + wentry = pgstat_fetch_stat_subworker_entry(subid, subrelid); + + /* Return NULL if there is no worker statistics */ + if (wentry == NULL) + PG_RETURN_NULL(); + + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_WORKER_COLS); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "subid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_error_relid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_xid", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time", + TIMESTAMPTZOID, -1, 0); + BlessTupleDesc(tupdesc); + + /* Initialise values and NULL flags arrays */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + i = 0; + /* subid */ + values[i++] = ObjectIdGetDatum(subid); + + /* subrelid */ + if (OidIsValid(subrelid)) + values[i++] = ObjectIdGetDatum(subrelid); + else + nulls[i++] = true; + + /* last_error_relid */ + if (OidIsValid(wentry->last_error_relid)) + values[i++] = ObjectIdGetDatum(wentry->last_error_relid); + else + nulls[i++] = true; + + /* last_error_command */ + if (wentry->last_error_command != 0) + values[i++] = + CStringGetTextDatum(logicalrep_message_type(wentry->last_error_command)); + else + nulls[i++] = true; + + /* last_error_xid */ + if (TransactionIdIsValid(wentry->last_error_xid)) + values[i++] = TransactionIdGetDatum(wentry->last_error_xid); + else + nulls[i++] = true; + + /* last_error_count */ + values[i++] = Int64GetDatum(wentry->last_error_count); + + /* last_error_message */ + values[i++] = CStringGetTextDatum(wentry->last_error_message); + + /* last_error_time */ + if (wentry->last_error_time != 0) + values[i++] = TimestampTzGetDatum(wentry->last_error_time); + else + nulls[i++] = true; + + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} |