aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/postmaster/bgworker.c3
-rw-r--r--src/backend/replication/logical/applyparallelworker.c2
-rw-r--r--src/backend/replication/logical/launcher.c32
-rw-r--r--src/backend/replication/logical/tablesync.c91
-rw-r--r--src/backend/replication/logical/worker.c380
-rw-r--r--src/include/replication/logicalworker.h1
-rw-r--r--src/include/replication/worker_internal.h14
7 files changed, 299 insertions, 224 deletions
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 5b4bd71694b..505e38376c3 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -131,6 +131,9 @@ static const struct
},
{
"ParallelApplyWorkerMain", ParallelApplyWorkerMain
+ },
+ {
+ "TablesyncWorkerMain", TablesyncWorkerMain
}
};
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 6fb96148f4a..1d4e83c4c1f 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -942,7 +942,7 @@ ParallelApplyWorkerMain(Datum main_arg)
MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
MyLogicalRepWorker->reply_time = 0;
- InitializeApplyWorker();
+ InitializeLogRepWorker();
InitializingApplyWorker = false;
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 542af7d863d..e231fa7f951 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -459,24 +459,30 @@ retry:
snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
if (is_parallel_apply_worker)
+ {
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
- else
- snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
-
- if (OidIsValid(relid))
snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication worker for subscription %u sync %u", subid, relid);
- else if (is_parallel_apply_worker)
+ "logical replication parallel apply worker for subscription %u",
+ subid);
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+ }
+ else if (OidIsValid(relid))
+ {
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication parallel apply worker for subscription %u", subid);
+ "logical replication tablesync worker for subscription %u sync %u",
+ subid,
+ relid);
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
+ }
else
+ {
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication apply worker for subscription %u", subid);
-
- if (is_parallel_apply_worker)
- snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
- else
- snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
+ "logical replication apply worker for subscription %u",
+ subid);
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+ }
bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProcPid;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d461654ab4..651a7750653 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -106,6 +106,7 @@
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "replication/slot.h"
@@ -1241,7 +1242,7 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
*
* The returned slot name is palloc'ed in current memory context.
*/
-char *
+static char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
char *slotname;
@@ -1585,6 +1586,94 @@ FetchTableStates(bool *started_tx)
}
/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if it's required.
+ *
+ * Allocate the slot name in long-lived context on return. Note that we don't
+ * handle FATAL errors which are probably because of system resource error and
+ * are not repeatable.
+ */
+static void
+start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
+{
+ char *sync_slotname = NULL;
+
+ Assert(am_tablesync_worker());
+
+ PG_TRY();
+ {
+ /* Call initial sync. */
+ sync_slotname = LogicalRepSyncTableStart(origin_startpos);
+ }
+ PG_CATCH();
+ {
+ if (MySubscription->disableonerr)
+ DisableSubscriptionAndExit();
+ else
+ {
+ /*
+ * Report the worker failed during table synchronization. Abort
+ * the current transaction so that the stats message is sent in an
+ * idle state.
+ */
+ AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid, false);
+
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+
+ /* allocate slot name in long-lived context */
+ *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
+ pfree(sync_slotname);
+}
+
+/*
+ * Runs the tablesync worker.
+ *
+ * It starts syncing tables. After a successful sync, sets streaming options
+ * and starts streaming to catchup with apply worker.
+ */
+static void
+run_tablesync_worker()
+{
+ char originname[NAMEDATALEN];
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char *slotname = NULL;
+ WalRcvStreamOptions options;
+
+ start_table_sync(&origin_startpos, &slotname);
+
+ ReplicationOriginNameForLogicalRep(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+
+ set_apply_error_context_origin(originname);
+
+ set_stream_options(&options, slotname, &origin_startpos);
+
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+
+ /* Apply the changes till we catchup with the apply worker. */
+ start_apply(origin_startpos);
+}
+
+/* Logical Replication Tablesync worker entry point */
+void
+TablesyncWorkerMain(Datum main_arg)
+{
+ int worker_slot = DatumGetInt32(main_arg);
+
+ SetupApplyOrSyncWorker(worker_slot);
+
+ run_tablesync_worker();
+
+ finish_sync_worker();
+}
+
+/*
* If the subscription has no tables then return false.
*
* Otherwise, are all tablesyncs READY?
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 832b1cf7642..a9f5fa7dfc0 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -396,8 +396,6 @@ static void stream_close_file(void);
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
-static void DisableSubscriptionAndExit(void);
-
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
@@ -4328,6 +4326,57 @@ stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
}
/*
+ * Sets streaming options including replication slot name and origin start
+ * position. Workers need these options for logical replication.
+ */
+void
+set_stream_options(WalRcvStreamOptions *options,
+ char *slotname,
+ XLogRecPtr *origin_startpos)
+{
+ int server_version;
+
+ options->logical = true;
+ options->startpoint = *origin_startpos;
+ options->slotname = slotname;
+
+ server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
+ options->proto.logical.proto_version =
+ server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
+ server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
+ server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
+ LOGICALREP_PROTO_VERSION_NUM;
+
+ options->proto.logical.publication_names = MySubscription->publications;
+ options->proto.logical.binary = MySubscription->binary;
+
+ /*
+ * Assign the appropriate option value for streaming option according to
+ * the 'streaming' mode and the publisher's ability to support that mode.
+ */
+ if (server_version >= 160000 &&
+ MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
+ {
+ options->proto.logical.streaming_str = "parallel";
+ MyLogicalRepWorker->parallel_apply = true;
+ }
+ else if (server_version >= 140000 &&
+ MySubscription->stream != LOGICALREP_STREAM_OFF)
+ {
+ options->proto.logical.streaming_str = "on";
+ MyLogicalRepWorker->parallel_apply = false;
+ }
+ else
+ {
+ options->proto.logical.streaming_str = NULL;
+ MyLogicalRepWorker->parallel_apply = false;
+ }
+
+ options->proto.logical.twophase = false;
+ options->proto.logical.origin = pstrdup(MySubscription->origin);
+}
+
+/*
* Cleanup the memory for subxacts and reset the related variables.
*/
static inline void
@@ -4361,24 +4410,18 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
}
/*
- * Execute the initial sync with error handling. Disable the subscription,
- * if it's required.
+ * Common function to run the apply loop with error handling. Disable the
+ * subscription, if necessary.
*
- * Allocate the slot name in long-lived context on return. Note that we don't
- * handle FATAL errors which are probably because of system resource error and
- * are not repeatable.
+ * Note that we don't handle FATAL errors which are probably because
+ * of system resource error and are not repeatable.
*/
-static void
-start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+void
+start_apply(XLogRecPtr origin_startpos)
{
- char *syncslotname = NULL;
-
- Assert(am_tablesync_worker());
-
PG_TRY();
{
- /* Call initial sync. */
- syncslotname = LogicalRepSyncTableStart(origin_startpos);
+ LogicalRepApplyLoop(origin_startpos);
}
PG_CATCH();
{
@@ -4387,65 +4430,132 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
else
{
/*
- * Report the worker failed during table synchronization. Abort
- * the current transaction so that the stats message is sent in an
+ * Report the worker failed while applying changes. Abort the
+ * current transaction so that the stats message is sent in an
* idle state.
*/
AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, false);
+ pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
PG_RE_THROW();
}
}
PG_END_TRY();
-
- /* allocate slot name in long-lived context */
- *myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
- pfree(syncslotname);
}
/*
- * Run the apply loop with error handling. Disable the subscription,
- * if necessary.
+ * Runs the leader apply worker.
*
- * Note that we don't handle FATAL errors which are probably because
- * of system resource error and are not repeatable.
+ * It sets up replication origin, streaming options and then starts streaming.
*/
static void
-start_apply(XLogRecPtr origin_startpos)
+run_apply_worker()
{
- PG_TRY();
+ char originname[NAMEDATALEN];
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char *slotname = NULL;
+ WalRcvStreamOptions options;
+ RepOriginId originid;
+ TimeLineID startpointTLI;
+ char *err;
+ bool must_use_password;
+
+ slotname = MySubscription->slotname;
+
+ /*
+ * This shouldn't happen if the subscription is enabled, but guard against
+ * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
+ * slot is NULL.)
+ */
+ if (!slotname)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("subscription has no replication slot set")));
+
+ /* Setup replication origin tracking. */
+ ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+ originname, sizeof(originname));
+ StartTransactionCommand();
+ originid = replorigin_by_name(originname, true);
+ if (!OidIsValid(originid))
+ originid = replorigin_create(originname);
+ replorigin_session_setup(originid, 0);
+ replorigin_session_origin = originid;
+ origin_startpos = replorigin_session_get_progress(false);
+
+ /* Is the use of a password mandatory? */
+ must_use_password = MySubscription->passwordrequired &&
+ !superuser_arg(MySubscription->owner);
+
+ /* Note that the superuser_arg call can access the DB */
+ CommitTransactionCommand();
+
+ LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+ must_use_password,
+ MySubscription->name, &err);
+
+ if (LogRepWorkerWalRcvConn == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher: %s", err)));
+
+ /*
+ * We don't really use the output identify_system for anything but it does
+ * some initializations on the upstream so let's still call it.
+ */
+ (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+ set_apply_error_context_origin(originname);
+
+ set_stream_options(&options, slotname, &origin_startpos);
+
+ /*
+ * Even when the two_phase mode is requested by the user, it remains as
+ * the tri-state PENDING until all tablesyncs have reached READY state.
+ * Only then, can it become ENABLED.
+ *
+ * Note: If the subscription has no tables then leave the state as
+ * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+ * work.
+ */
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())
{
- LogicalRepApplyLoop(origin_startpos);
+ /* Start streaming with two_phase enabled */
+ options.proto.logical.twophase = true;
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+
+ StartTransactionCommand();
+ UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
+ MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+ CommitTransactionCommand();
}
- PG_CATCH();
+ else
{
- if (MySubscription->disableonerr)
- DisableSubscriptionAndExit();
- else
- {
- /*
- * Report the worker failed while applying changes. Abort the
- * current transaction so that the stats message is sent in an
- * idle state.
- */
- AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
-
- PG_RE_THROW();
- }
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
}
- PG_END_TRY();
+
+ ereport(DEBUG1,
+ (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
+ MySubscription->name,
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
+ "?")));
+
+ /* Run the main loop. */
+ start_apply(origin_startpos);
}
/*
- * Common initialization for leader apply worker and parallel apply worker.
+ * Common initialization for leader apply worker, parallel apply worker and
+ * tablesync worker.
*
* Initialize the database connection, in-memory subscription and necessary
* config options.
*/
void
-InitializeApplyWorker(void)
+InitializeLogRepWorker(void)
{
MemoryContext oldctx;
@@ -4518,22 +4628,15 @@ InitializeApplyWorker(void)
CommitTransactionCommand();
}
-/* Logical Replication Apply worker entry point */
+/* Common function to setup the leader apply or tablesync worker. */
void
-ApplyWorkerMain(Datum main_arg)
+SetupApplyOrSyncWorker(int worker_slot)
{
- int worker_slot = DatumGetInt32(main_arg);
- char originname[NAMEDATALEN];
- XLogRecPtr origin_startpos = InvalidXLogRecPtr;
- char *myslotname = NULL;
- WalRcvStreamOptions options;
- int server_version;
-
- InitializingApplyWorker = true;
-
/* Attach to slot */
logicalrep_worker_attach(worker_slot);
+ Assert(am_tablesync_worker() || am_leader_apply_worker());
+
/* Setup signal handling */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
@@ -4551,79 +4654,12 @@ ApplyWorkerMain(Datum main_arg)
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
- InitializeApplyWorker();
-
- InitializingApplyWorker = false;
+ InitializeLogRepWorker();
/* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo);
- if (am_tablesync_worker())
- {
- start_table_sync(&origin_startpos, &myslotname);
-
- ReplicationOriginNameForLogicalRep(MySubscription->oid,
- MyLogicalRepWorker->relid,
- originname,
- sizeof(originname));
- set_apply_error_context_origin(originname);
- }
- else
- {
- /* This is the leader apply worker */
- RepOriginId originid;
- TimeLineID startpointTLI;
- char *err;
- bool must_use_password;
-
- myslotname = MySubscription->slotname;
-
- /*
- * This shouldn't happen if the subscription is enabled, but guard
- * against DDL bugs or manual catalog changes. (libpqwalreceiver will
- * crash if slot is NULL.)
- */
- if (!myslotname)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("subscription has no replication slot set")));
-
- /* Setup replication origin tracking. */
- StartTransactionCommand();
- ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
- originname, sizeof(originname));
- originid = replorigin_by_name(originname, true);
- if (!OidIsValid(originid))
- originid = replorigin_create(originname);
- replorigin_session_setup(originid, 0);
- replorigin_session_origin = originid;
- origin_startpos = replorigin_session_get_progress(false);
-
- /* Is the use of a password mandatory? */
- must_use_password = MySubscription->passwordrequired &&
- !superuser_arg(MySubscription->owner);
-
- /* Note that the superuser_arg call can access the DB */
- CommitTransactionCommand();
-
- LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
- must_use_password,
- MySubscription->name, &err);
- if (LogRepWorkerWalRcvConn == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("could not connect to the publisher: %s", err)));
-
- /*
- * We don't really use the output identify_system for anything but it
- * does some initializations on the upstream so let's still call it.
- */
- (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
-
- set_apply_error_context_origin(originname);
- }
-
/*
* Setup callback for syscache so that we know when something changes in
* the subscription relation state.
@@ -4631,91 +4667,21 @@ ApplyWorkerMain(Datum main_arg)
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
invalidate_syncing_table_states,
(Datum) 0);
+}
- /* Build logical replication streaming options. */
- options.logical = true;
- options.startpoint = origin_startpos;
- options.slotname = myslotname;
-
- server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
- options.proto.logical.proto_version =
- server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
- server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
- server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
- LOGICALREP_PROTO_VERSION_NUM;
-
- options.proto.logical.publication_names = MySubscription->publications;
- options.proto.logical.binary = MySubscription->binary;
-
- /*
- * Assign the appropriate option value for streaming option according to
- * the 'streaming' mode and the publisher's ability to support that mode.
- */
- if (server_version >= 160000 &&
- MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
- {
- options.proto.logical.streaming_str = "parallel";
- MyLogicalRepWorker->parallel_apply = true;
- }
- else if (server_version >= 140000 &&
- MySubscription->stream != LOGICALREP_STREAM_OFF)
- {
- options.proto.logical.streaming_str = "on";
- MyLogicalRepWorker->parallel_apply = false;
- }
- else
- {
- options.proto.logical.streaming_str = NULL;
- MyLogicalRepWorker->parallel_apply = false;
- }
-
- options.proto.logical.twophase = false;
- options.proto.logical.origin = pstrdup(MySubscription->origin);
+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)
+{
+ int worker_slot = DatumGetInt32(main_arg);
- if (!am_tablesync_worker())
- {
- /*
- * Even when the two_phase mode is requested by the user, it remains
- * as the tri-state PENDING until all tablesyncs have reached READY
- * state. Only then, can it become ENABLED.
- *
- * Note: If the subscription has no tables then leave the state as
- * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
- * work.
- */
- if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
- AllTablesyncsReady())
- {
- /* Start streaming with two_phase enabled */
- options.proto.logical.twophase = true;
- walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+ InitializingApplyWorker = true;
- StartTransactionCommand();
- UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
- MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
- CommitTransactionCommand();
- }
- else
- {
- walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
- }
+ SetupApplyOrSyncWorker(worker_slot);
- ereport(DEBUG1,
- (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
- MySubscription->name,
- MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
- MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
- MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
- "?")));
- }
- else
- {
- /* Start normal logical streaming replication. */
- walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
- }
+ InitializingApplyWorker = false;
- /* Run the main loop. */
- start_apply(origin_startpos);
+ run_apply_worker();
proc_exit(0);
}
@@ -4724,7 +4690,7 @@ ApplyWorkerMain(Datum main_arg)
* After error recovery, disable the subscription in a new transaction
* and exit cleanly.
*/
-static void
+void
DisableSubscriptionAndExit(void)
{
/*
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 39588da79fd..bbd71d0b420 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -18,6 +18,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
extern void ApplyWorkerMain(Datum main_arg);
extern void ParallelApplyWorkerMain(Datum main_arg);
+extern void TablesyncWorkerMain(Datum main_arg);
extern bool IsLogicalWorker(void);
extern bool IsLogicalParallelApplyWorker(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 343e7818965..672a7117c0c 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -19,6 +19,7 @@
#include "datatype/timestamp.h"
#include "miscadmin.h"
#include "replication/logicalrelation.h"
+#include "replication/walreceiver.h"
#include "storage/buffile.h"
#include "storage/fileset.h"
#include "storage/lock.h"
@@ -243,7 +244,6 @@ extern int logicalrep_sync_worker_count(Oid subid);
extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
char *originname, Size szoriginname);
-extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
extern bool AllTablesyncsReady(void);
extern void UpdateTwoPhaseState(Oid suboid, char new_state);
@@ -265,7 +265,17 @@ extern void maybe_reread_subscription(void);
extern void stream_cleanup_files(Oid subid, TransactionId xid);
-extern void InitializeApplyWorker(void);
+extern void set_stream_options(WalRcvStreamOptions *options,
+ char *slotname,
+ XLogRecPtr *origin_startpos);
+
+extern void start_apply(XLogRecPtr origin_startpos);
+
+extern void InitializeLogRepWorker(void);
+
+extern void SetupApplyOrSyncWorker(int worker_slot);
+
+extern void DisableSubscriptionAndExit(void);
extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);