aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2023-08-03 08:59:50 +0530
committerAmit Kapila <akapila@postgresql.org>2023-08-03 08:59:50 +0530
commit02c1b64fb15ca018f0c159a0152497c8d6704d40 (patch)
treee00d9c84a1df14e4f34aa457352a1dab762a5490 /src/backend/replication/logical/tablesync.c
parent0125c4e21d7e9c8b3da95ffcd3e34c0f61c9b69a (diff)
downloadpostgresql-02c1b64fb15ca018f0c159a0152497c8d6704d40.tar.gz
postgresql-02c1b64fb15ca018f0c159a0152497c8d6704d40.zip
Refactor to split Apply and Tablesync Workers code.
Both apply and tablesync workers were using ApplyWorkerMain() as entry point. As the name implies, ApplyWorkerMain() should be considered as the main function for apply workers. Tablesync worker's path was hidden and does not have enough in common to share the same main function with apply worker. Also, most of the code shared by both worker types is already combined in LogicalRepApplyLoop(). There is no need to combine the rest in ApplyWorkerMain() anymore. This patch introduces TablesyncWorkerMain() as a new entry point for tablesync workers. This aims to increase code readability and would help with future improvements like the reuse of tablesync workers in the initial synchronization. Author: Melih Mutlu based on suggestions by Melanie Plageman Reviewed-by: Peter Smith, Kuroda Hayato, Amit Kapila Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c91
1 files changed, 90 insertions, 1 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d461654ab4..651a7750653 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -106,6 +106,7 @@
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "replication/slot.h"
@@ -1241,7 +1242,7 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
*
* The returned slot name is palloc'ed in current memory context.
*/
-char *
+static char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
char *slotname;
@@ -1585,6 +1586,94 @@ FetchTableStates(bool *started_tx)
}
/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if it's required.
+ *
+ * Allocate the slot name in long-lived context on return. Note that we don't
+ * handle FATAL errors which are probably because of system resource error and
+ * are not repeatable.
+ */
+static void
+start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
+{
+ char *sync_slotname = NULL;
+
+ Assert(am_tablesync_worker());
+
+ PG_TRY();
+ {
+ /* Call initial sync. */
+ sync_slotname = LogicalRepSyncTableStart(origin_startpos);
+ }
+ PG_CATCH();
+ {
+ if (MySubscription->disableonerr)
+ DisableSubscriptionAndExit();
+ else
+ {
+ /*
+ * Report the worker failed during table synchronization. Abort
+ * the current transaction so that the stats message is sent in an
+ * idle state.
+ */
+ AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid, false);
+
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+
+ /* allocate slot name in long-lived context */
+ *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
+ pfree(sync_slotname);
+}
+
+/*
+ * Runs the tablesync worker.
+ *
+ * It starts syncing tables. After a successful sync, sets streaming options
+ * and starts streaming to catchup with apply worker.
+ */
+static void
+run_tablesync_worker()
+{
+ char originname[NAMEDATALEN];
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char *slotname = NULL;
+ WalRcvStreamOptions options;
+
+ start_table_sync(&origin_startpos, &slotname);
+
+ ReplicationOriginNameForLogicalRep(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+
+ set_apply_error_context_origin(originname);
+
+ set_stream_options(&options, slotname, &origin_startpos);
+
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+
+ /* Apply the changes till we catchup with the apply worker. */
+ start_apply(origin_startpos);
+}
+
+/* Logical Replication Tablesync worker entry point */
+void
+TablesyncWorkerMain(Datum main_arg)
+{
+ int worker_slot = DatumGetInt32(main_arg);
+
+ SetupApplyOrSyncWorker(worker_slot);
+
+ run_tablesync_worker();
+
+ finish_sync_worker();
+}
+
+/*
* If the subscription has no tables then return false.
*
* Otherwise, are all tablesyncs READY?