aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c380
1 files changed, 173 insertions, 207 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 832b1cf7642..a9f5fa7dfc0 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -396,8 +396,6 @@ static void stream_close_file(void);
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
-static void DisableSubscriptionAndExit(void);
-
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
@@ -4328,6 +4326,57 @@ stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
}
/*
+ * Sets streaming options including replication slot name and origin start
+ * position. Workers need these options for logical replication.
+ */
+void
+set_stream_options(WalRcvStreamOptions *options,
+ char *slotname,
+ XLogRecPtr *origin_startpos)
+{
+ int server_version;
+
+ options->logical = true;
+ options->startpoint = *origin_startpos;
+ options->slotname = slotname;
+
+ server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
+ options->proto.logical.proto_version =
+ server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
+ server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
+ server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
+ LOGICALREP_PROTO_VERSION_NUM;
+
+ options->proto.logical.publication_names = MySubscription->publications;
+ options->proto.logical.binary = MySubscription->binary;
+
+ /*
+ * Assign the appropriate option value for streaming option according to
+ * the 'streaming' mode and the publisher's ability to support that mode.
+ */
+ if (server_version >= 160000 &&
+ MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
+ {
+ options->proto.logical.streaming_str = "parallel";
+ MyLogicalRepWorker->parallel_apply = true;
+ }
+ else if (server_version >= 140000 &&
+ MySubscription->stream != LOGICALREP_STREAM_OFF)
+ {
+ options->proto.logical.streaming_str = "on";
+ MyLogicalRepWorker->parallel_apply = false;
+ }
+ else
+ {
+ options->proto.logical.streaming_str = NULL;
+ MyLogicalRepWorker->parallel_apply = false;
+ }
+
+ options->proto.logical.twophase = false;
+ options->proto.logical.origin = pstrdup(MySubscription->origin);
+}
+
+/*
* Cleanup the memory for subxacts and reset the related variables.
*/
static inline void
@@ -4361,24 +4410,18 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
}
/*
- * Execute the initial sync with error handling. Disable the subscription,
- * if it's required.
+ * Common function to run the apply loop with error handling. Disable the
+ * subscription, if necessary.
*
- * Allocate the slot name in long-lived context on return. Note that we don't
- * handle FATAL errors which are probably because of system resource error and
- * are not repeatable.
+ * Note that we don't handle FATAL errors which are probably because
+ * of system resource error and are not repeatable.
*/
-static void
-start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+void
+start_apply(XLogRecPtr origin_startpos)
{
- char *syncslotname = NULL;
-
- Assert(am_tablesync_worker());
-
PG_TRY();
{
- /* Call initial sync. */
- syncslotname = LogicalRepSyncTableStart(origin_startpos);
+ LogicalRepApplyLoop(origin_startpos);
}
PG_CATCH();
{
@@ -4387,65 +4430,132 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
else
{
/*
- * Report the worker failed during table synchronization. Abort
- * the current transaction so that the stats message is sent in an
+ * Report the worker failed while applying changes. Abort the
+ * current transaction so that the stats message is sent in an
* idle state.
*/
AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, false);
+ pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
PG_RE_THROW();
}
}
PG_END_TRY();
-
- /* allocate slot name in long-lived context */
- *myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
- pfree(syncslotname);
}
/*
- * Run the apply loop with error handling. Disable the subscription,
- * if necessary.
+ * Runs the leader apply worker.
*
- * Note that we don't handle FATAL errors which are probably because
- * of system resource error and are not repeatable.
+ * It sets up replication origin, streaming options and then starts streaming.
*/
static void
-start_apply(XLogRecPtr origin_startpos)
+run_apply_worker()
{
- PG_TRY();
+ char originname[NAMEDATALEN];
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char *slotname = NULL;
+ WalRcvStreamOptions options;
+ RepOriginId originid;
+ TimeLineID startpointTLI;
+ char *err;
+ bool must_use_password;
+
+ slotname = MySubscription->slotname;
+
+ /*
+ * This shouldn't happen if the subscription is enabled, but guard against
+ * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
+ * slot is NULL.)
+ */
+ if (!slotname)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("subscription has no replication slot set")));
+
+ /* Setup replication origin tracking. */
+ ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+ originname, sizeof(originname));
+ StartTransactionCommand();
+ originid = replorigin_by_name(originname, true);
+ if (!OidIsValid(originid))
+ originid = replorigin_create(originname);
+ replorigin_session_setup(originid, 0);
+ replorigin_session_origin = originid;
+ origin_startpos = replorigin_session_get_progress(false);
+
+ /* Is the use of a password mandatory? */
+ must_use_password = MySubscription->passwordrequired &&
+ !superuser_arg(MySubscription->owner);
+
+ /* Note that the superuser_arg call can access the DB */
+ CommitTransactionCommand();
+
+ LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+ must_use_password,
+ MySubscription->name, &err);
+
+ if (LogRepWorkerWalRcvConn == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher: %s", err)));
+
+ /*
+ * We don't really use the output identify_system for anything but it does
+ * some initializations on the upstream so let's still call it.
+ */
+ (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+ set_apply_error_context_origin(originname);
+
+ set_stream_options(&options, slotname, &origin_startpos);
+
+ /*
+ * Even when the two_phase mode is requested by the user, it remains as
+ * the tri-state PENDING until all tablesyncs have reached READY state.
+ * Only then, can it become ENABLED.
+ *
+ * Note: If the subscription has no tables then leave the state as
+ * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+ * work.
+ */
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())
{
- LogicalRepApplyLoop(origin_startpos);
+ /* Start streaming with two_phase enabled */
+ options.proto.logical.twophase = true;
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+
+ StartTransactionCommand();
+ UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
+ MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+ CommitTransactionCommand();
}
- PG_CATCH();
+ else
{
- if (MySubscription->disableonerr)
- DisableSubscriptionAndExit();
- else
- {
- /*
- * Report the worker failed while applying changes. Abort the
- * current transaction so that the stats message is sent in an
- * idle state.
- */
- AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
-
- PG_RE_THROW();
- }
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
}
- PG_END_TRY();
+
+ ereport(DEBUG1,
+ (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
+ MySubscription->name,
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
+ "?")));
+
+ /* Run the main loop. */
+ start_apply(origin_startpos);
}
/*
- * Common initialization for leader apply worker and parallel apply worker.
+ * Common initialization for leader apply worker, parallel apply worker and
+ * tablesync worker.
*
* Initialize the database connection, in-memory subscription and necessary
* config options.
*/
void
-InitializeApplyWorker(void)
+InitializeLogRepWorker(void)
{
MemoryContext oldctx;
@@ -4518,22 +4628,15 @@ InitializeApplyWorker(void)
CommitTransactionCommand();
}
-/* Logical Replication Apply worker entry point */
+/* Common function to setup the leader apply or tablesync worker. */
void
-ApplyWorkerMain(Datum main_arg)
+SetupApplyOrSyncWorker(int worker_slot)
{
- int worker_slot = DatumGetInt32(main_arg);
- char originname[NAMEDATALEN];
- XLogRecPtr origin_startpos = InvalidXLogRecPtr;
- char *myslotname = NULL;
- WalRcvStreamOptions options;
- int server_version;
-
- InitializingApplyWorker = true;
-
/* Attach to slot */
logicalrep_worker_attach(worker_slot);
+ Assert(am_tablesync_worker() || am_leader_apply_worker());
+
/* Setup signal handling */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
@@ -4551,79 +4654,12 @@ ApplyWorkerMain(Datum main_arg)
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
- InitializeApplyWorker();
-
- InitializingApplyWorker = false;
+ InitializeLogRepWorker();
/* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo);
- if (am_tablesync_worker())
- {
- start_table_sync(&origin_startpos, &myslotname);
-
- ReplicationOriginNameForLogicalRep(MySubscription->oid,
- MyLogicalRepWorker->relid,
- originname,
- sizeof(originname));
- set_apply_error_context_origin(originname);
- }
- else
- {
- /* This is the leader apply worker */
- RepOriginId originid;
- TimeLineID startpointTLI;
- char *err;
- bool must_use_password;
-
- myslotname = MySubscription->slotname;
-
- /*
- * This shouldn't happen if the subscription is enabled, but guard
- * against DDL bugs or manual catalog changes. (libpqwalreceiver will
- * crash if slot is NULL.)
- */
- if (!myslotname)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("subscription has no replication slot set")));
-
- /* Setup replication origin tracking. */
- StartTransactionCommand();
- ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
- originname, sizeof(originname));
- originid = replorigin_by_name(originname, true);
- if (!OidIsValid(originid))
- originid = replorigin_create(originname);
- replorigin_session_setup(originid, 0);
- replorigin_session_origin = originid;
- origin_startpos = replorigin_session_get_progress(false);
-
- /* Is the use of a password mandatory? */
- must_use_password = MySubscription->passwordrequired &&
- !superuser_arg(MySubscription->owner);
-
- /* Note that the superuser_arg call can access the DB */
- CommitTransactionCommand();
-
- LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
- must_use_password,
- MySubscription->name, &err);
- if (LogRepWorkerWalRcvConn == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("could not connect to the publisher: %s", err)));
-
- /*
- * We don't really use the output identify_system for anything but it
- * does some initializations on the upstream so let's still call it.
- */
- (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
-
- set_apply_error_context_origin(originname);
- }
-
/*
* Setup callback for syscache so that we know when something changes in
* the subscription relation state.
@@ -4631,91 +4667,21 @@ ApplyWorkerMain(Datum main_arg)
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
invalidate_syncing_table_states,
(Datum) 0);
+}
- /* Build logical replication streaming options. */
- options.logical = true;
- options.startpoint = origin_startpos;
- options.slotname = myslotname;
-
- server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
- options.proto.logical.proto_version =
- server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
- server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
- server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
- LOGICALREP_PROTO_VERSION_NUM;
-
- options.proto.logical.publication_names = MySubscription->publications;
- options.proto.logical.binary = MySubscription->binary;
-
- /*
- * Assign the appropriate option value for streaming option according to
- * the 'streaming' mode and the publisher's ability to support that mode.
- */
- if (server_version >= 160000 &&
- MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
- {
- options.proto.logical.streaming_str = "parallel";
- MyLogicalRepWorker->parallel_apply = true;
- }
- else if (server_version >= 140000 &&
- MySubscription->stream != LOGICALREP_STREAM_OFF)
- {
- options.proto.logical.streaming_str = "on";
- MyLogicalRepWorker->parallel_apply = false;
- }
- else
- {
- options.proto.logical.streaming_str = NULL;
- MyLogicalRepWorker->parallel_apply = false;
- }
-
- options.proto.logical.twophase = false;
- options.proto.logical.origin = pstrdup(MySubscription->origin);
+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)
+{
+ int worker_slot = DatumGetInt32(main_arg);
- if (!am_tablesync_worker())
- {
- /*
- * Even when the two_phase mode is requested by the user, it remains
- * as the tri-state PENDING until all tablesyncs have reached READY
- * state. Only then, can it become ENABLED.
- *
- * Note: If the subscription has no tables then leave the state as
- * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
- * work.
- */
- if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
- AllTablesyncsReady())
- {
- /* Start streaming with two_phase enabled */
- options.proto.logical.twophase = true;
- walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+ InitializingApplyWorker = true;
- StartTransactionCommand();
- UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
- MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
- CommitTransactionCommand();
- }
- else
- {
- walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
- }
+ SetupApplyOrSyncWorker(worker_slot);
- ereport(DEBUG1,
- (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
- MySubscription->name,
- MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
- MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
- MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
- "?")));
- }
- else
- {
- /* Start normal logical streaming replication. */
- walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
- }
+ InitializingApplyWorker = false;
- /* Run the main loop. */
- start_apply(origin_startpos);
+ run_apply_worker();
proc_exit(0);
}
@@ -4724,7 +4690,7 @@ ApplyWorkerMain(Datum main_arg)
* After error recovery, disable the subscription in a new transaction
* and exit cleanly.
*/
-static void
+void
DisableSubscriptionAndExit(void)
{
/*