diff options
author | Amit Kapila <akapila@postgresql.org> | 2021-11-30 08:54:30 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2021-11-30 08:54:30 +0530 |
commit | 8d74fc96db5fd547e077bf9bf4c3b67f821d71cd (patch) | |
tree | 3037345a7edabd025edcc5d9b431fb14f780e817 /src/backend/replication/logical/worker.c | |
parent | 98105e53e0ab472b7721a3e8d7b9f1750a635120 (diff) | |
download | postgresql-8d74fc96db5fd547e077bf9bf4c3b67f821d71cd.tar.gz postgresql-8d74fc96db5fd547e077bf9bf4c3b67f821d71cd.zip |
Add a view to show the stats of subscription workers.
This commit adds a new system view pg_stat_subscription_workers, that
shows information about any errors which occur during the application of
logical replication changes as well as during performing initial table
synchronization. The subscription statistics entries are removed when the
corresponding subscription is removed.
It also adds an SQL function pg_stat_reset_subscription_worker() to reset
single subscription errors.
The contents of this view can be used by an upcoming patch that skips the
particular transaction that conflicts with the existing data on the
subscriber.
This view can be extended in the future to track other xact related
statistics like the number of xacts committed/aborted for subscription
workers.
Author: Masahiko Sawada
Reviewed-by: Greg Nancarrow, Hou Zhijie, Tang Haiying, Vignesh C, Dilip Kumar, Takamichi Osumi, Amit Kapila
Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 54 |
1 files changed, 51 insertions, 3 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ae1b391bdae..2e79302a48a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3332,6 +3332,7 @@ void ApplyWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); + MemoryContext cctx = CurrentMemoryContext; MemoryContext oldctx; char originname[NAMEDATALEN]; XLogRecPtr origin_startpos; @@ -3432,8 +3433,30 @@ ApplyWorkerMain(Datum main_arg) { char *syncslotname; - /* This is table synchronization worker, call initial sync. */ - syncslotname = LogicalRepSyncTableStart(&origin_startpos); + PG_TRY(); + { + /* This is table synchronization worker, call initial sync. */ + syncslotname = LogicalRepSyncTableStart(&origin_startpos); + } + PG_CATCH(); + { + MemoryContext ecxt = MemoryContextSwitchTo(cctx); + ErrorData *errdata = CopyErrorData(); + + /* + * Report the table sync error. There is no corresponding message + * type for table synchronization. + */ + pgstat_report_subworker_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relid, + 0, /* message type */ + InvalidTransactionId, + errdata->message); + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } + PG_END_TRY(); /* allocate slot name in long-lived context */ myslotname = MemoryContextStrdup(ApplyContext, syncslotname); @@ -3551,7 +3574,32 @@ ApplyWorkerMain(Datum main_arg) } /* Run the main loop. */ - LogicalRepApplyLoop(origin_startpos); + PG_TRY(); + { + LogicalRepApplyLoop(origin_startpos); + } + PG_CATCH(); + { + /* report the apply error */ + if (apply_error_callback_arg.command != 0) + { + MemoryContext ecxt = MemoryContextSwitchTo(cctx); + ErrorData *errdata = CopyErrorData(); + + pgstat_report_subworker_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + apply_error_callback_arg.rel != NULL + ? apply_error_callback_arg.rel->localreloid + : InvalidOid, + apply_error_callback_arg.command, + apply_error_callback_arg.remote_xid, + errdata->message); + MemoryContextSwitchTo(ecxt); + } + + PG_RE_THROW(); + } + PG_END_TRY(); proc_exit(0); } |