diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 54 |
1 files changed, 51 insertions, 3 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ae1b391bdae..2e79302a48a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3332,6 +3332,7 @@ void ApplyWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); + MemoryContext cctx = CurrentMemoryContext; MemoryContext oldctx; char originname[NAMEDATALEN]; XLogRecPtr origin_startpos; @@ -3432,8 +3433,30 @@ ApplyWorkerMain(Datum main_arg) { char *syncslotname; - /* This is table synchronization worker, call initial sync. */ - syncslotname = LogicalRepSyncTableStart(&origin_startpos); + PG_TRY(); + { + /* This is table synchronization worker, call initial sync. */ + syncslotname = LogicalRepSyncTableStart(&origin_startpos); + } + PG_CATCH(); + { + MemoryContext ecxt = MemoryContextSwitchTo(cctx); + ErrorData *errdata = CopyErrorData(); + + /* + * Report the table sync error. There is no corresponding message + * type for table synchronization. + */ + pgstat_report_subworker_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relid, + 0, /* message type */ + InvalidTransactionId, + errdata->message); + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } + PG_END_TRY(); /* allocate slot name in long-lived context */ myslotname = MemoryContextStrdup(ApplyContext, syncslotname); @@ -3551,7 +3574,32 @@ ApplyWorkerMain(Datum main_arg) } /* Run the main loop. */ - LogicalRepApplyLoop(origin_startpos); + PG_TRY(); + { + LogicalRepApplyLoop(origin_startpos); + } + PG_CATCH(); + { + /* report the apply error */ + if (apply_error_callback_arg.command != 0) + { + MemoryContext ecxt = MemoryContextSwitchTo(cctx); + ErrorData *errdata = CopyErrorData(); + + pgstat_report_subworker_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + apply_error_callback_arg.rel != NULL + ? apply_error_callback_arg.rel->localreloid + : InvalidOid, + apply_error_callback_arg.command, + apply_error_callback_arg.remote_xid, + errdata->message); + MemoryContextSwitchTo(ecxt); + } + + PG_RE_THROW(); + } + PG_END_TRY(); proc_exit(0); } |