aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c91
1 files changed, 90 insertions, 1 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d461654ab4..651a7750653 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -106,6 +106,7 @@
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "replication/slot.h"
@@ -1241,7 +1242,7 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
*
* The returned slot name is palloc'ed in current memory context.
*/
-char *
+static char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
char *slotname;
@@ -1585,6 +1586,94 @@ FetchTableStates(bool *started_tx)
}
/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if it's required.
+ *
+ * Allocate the slot name in long-lived context on return. Note that we don't
+ * handle FATAL errors which are probably because of system resource error and
+ * are not repeatable.
+ */
+static void
+start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
+{
+ char *sync_slotname = NULL;
+
+ Assert(am_tablesync_worker());
+
+ PG_TRY();
+ {
+ /* Call initial sync. */
+ sync_slotname = LogicalRepSyncTableStart(origin_startpos);
+ }
+ PG_CATCH();
+ {
+ if (MySubscription->disableonerr)
+ DisableSubscriptionAndExit();
+ else
+ {
+ /*
+ * Report the worker failed during table synchronization. Abort
+ * the current transaction so that the stats message is sent in an
+ * idle state.
+ */
+ AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid, false);
+
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+
+ /* allocate slot name in long-lived context */
+ *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
+ pfree(sync_slotname);
+}
+
+/*
+ * Runs the tablesync worker.
+ *
+ * It starts syncing tables. After a successful sync, sets streaming options
+ * and starts streaming to catchup with apply worker.
+ */
+static void
+run_tablesync_worker()
+{
+ char originname[NAMEDATALEN];
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char *slotname = NULL;
+ WalRcvStreamOptions options;
+
+ start_table_sync(&origin_startpos, &slotname);
+
+ ReplicationOriginNameForLogicalRep(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+
+ set_apply_error_context_origin(originname);
+
+ set_stream_options(&options, slotname, &origin_startpos);
+
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+
+ /* Apply the changes till we catchup with the apply worker. */
+ start_apply(origin_startpos);
+}
+
+/* Logical Replication Tablesync worker entry point */
+void
+TablesyncWorkerMain(Datum main_arg)
+{
+ int worker_slot = DatumGetInt32(main_arg);
+
+ SetupApplyOrSyncWorker(worker_slot);
+
+ run_tablesync_worker();
+
+ finish_sync_worker();
+}
+
+/*
* If the subscription has no tables then return false.
*
* Otherwise, are all tablesyncs READY?