diff options
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 91 |
1 files changed, 90 insertions, 1 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6d461654ab4..651a7750653 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -106,6 +106,7 @@ #include "pgstat.h" #include "replication/logicallauncher.h" #include "replication/logicalrelation.h" +#include "replication/logicalworker.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "replication/slot.h" @@ -1241,7 +1242,7 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid, * * The returned slot name is palloc'ed in current memory context. */ -char * +static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) { char *slotname; @@ -1585,6 +1586,94 @@ FetchTableStates(bool *started_tx) } /* + * Execute the initial sync with error handling. Disable the subscription, + * if it's required. + * + * Allocate the slot name in long-lived context on return. Note that we don't + * handle FATAL errors which are probably because of system resource error and + * are not repeatable. + */ +static void +start_table_sync(XLogRecPtr *origin_startpos, char **slotname) +{ + char *sync_slotname = NULL; + + Assert(am_tablesync_worker()); + + PG_TRY(); + { + /* Call initial sync. */ + sync_slotname = LogicalRepSyncTableStart(origin_startpos); + } + PG_CATCH(); + { + if (MySubscription->disableonerr) + DisableSubscriptionAndExit(); + else + { + /* + * Report the worker failed during table synchronization. Abort + * the current transaction so that the stats message is sent in an + * idle state. + */ + AbortOutOfAnyTransaction(); + pgstat_report_subscription_error(MySubscription->oid, false); + + PG_RE_THROW(); + } + } + PG_END_TRY(); + + /* allocate slot name in long-lived context */ + *slotname = MemoryContextStrdup(ApplyContext, sync_slotname); + pfree(sync_slotname); +} + +/* + * Runs the tablesync worker. + * + * It starts syncing tables. After a successful sync, sets streaming options + * and starts streaming to catchup with apply worker. + */ +static void +run_tablesync_worker() +{ + char originname[NAMEDATALEN]; + XLogRecPtr origin_startpos = InvalidXLogRecPtr; + char *slotname = NULL; + WalRcvStreamOptions options; + + start_table_sync(&origin_startpos, &slotname); + + ReplicationOriginNameForLogicalRep(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + + set_apply_error_context_origin(originname); + + set_stream_options(&options, slotname, &origin_startpos); + + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + + /* Apply the changes till we catchup with the apply worker. */ + start_apply(origin_startpos); +} + +/* Logical Replication Tablesync worker entry point */ +void +TablesyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + + SetupApplyOrSyncWorker(worker_slot); + + run_tablesync_worker(); + + finish_sync_worker(); +} + +/* * If the subscription has no tables then return false. * * Otherwise, are all tablesyncs READY? |