aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c54
1 files changed, 51 insertions, 3 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ae1b391bdae..2e79302a48a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3332,6 +3332,7 @@ void
ApplyWorkerMain(Datum main_arg)
{
int worker_slot = DatumGetInt32(main_arg);
+ MemoryContext cctx = CurrentMemoryContext;
MemoryContext oldctx;
char originname[NAMEDATALEN];
XLogRecPtr origin_startpos;
@@ -3432,8 +3433,30 @@ ApplyWorkerMain(Datum main_arg)
{
char *syncslotname;
- /* This is table synchronization worker, call initial sync. */
- syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+ PG_TRY();
+ {
+ /* This is table synchronization worker, call initial sync. */
+ syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+ }
+ PG_CATCH();
+ {
+ MemoryContext ecxt = MemoryContextSwitchTo(cctx);
+ ErrorData *errdata = CopyErrorData();
+
+ /*
+ * Report the table sync error. There is no corresponding message
+ * type for table synchronization.
+ */
+ pgstat_report_subworker_error(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relid,
+ 0, /* message type */
+ InvalidTransactionId,
+ errdata->message);
+ MemoryContextSwitchTo(ecxt);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
/* allocate slot name in long-lived context */
myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
@@ -3551,7 +3574,32 @@ ApplyWorkerMain(Datum main_arg)
}
/* Run the main loop. */
- LogicalRepApplyLoop(origin_startpos);
+ PG_TRY();
+ {
+ LogicalRepApplyLoop(origin_startpos);
+ }
+ PG_CATCH();
+ {
+ /* report the apply error */
+ if (apply_error_callback_arg.command != 0)
+ {
+ MemoryContext ecxt = MemoryContextSwitchTo(cctx);
+ ErrorData *errdata = CopyErrorData();
+
+ pgstat_report_subworker_error(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ apply_error_callback_arg.rel != NULL
+ ? apply_error_callback_arg.rel->localreloid
+ : InvalidOid,
+ apply_error_callback_arg.command,
+ apply_error_callback_arg.remote_xid,
+ errdata->message);
+ MemoryContextSwitchTo(ecxt);
+ }
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
proc_exit(0);
}