diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 380 |
1 files changed, 173 insertions, 207 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 832b1cf7642..a9f5fa7dfc0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -396,8 +396,6 @@ static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); -static void DisableSubscriptionAndExit(void); - static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, @@ -4328,6 +4326,57 @@ stream_open_and_write_change(TransactionId xid, char action, StringInfo s) } /* + * Sets streaming options including replication slot name and origin start + * position. Workers need these options for logical replication. + */ +void +set_stream_options(WalRcvStreamOptions *options, + char *slotname, + XLogRecPtr *origin_startpos) +{ + int server_version; + + options->logical = true; + options->startpoint = *origin_startpos; + options->slotname = slotname; + + server_version = walrcv_server_version(LogRepWorkerWalRcvConn); + options->proto.logical.proto_version = + server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM : + server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : + server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : + LOGICALREP_PROTO_VERSION_NUM; + + options->proto.logical.publication_names = MySubscription->publications; + options->proto.logical.binary = MySubscription->binary; + + /* + * Assign the appropriate option value for streaming option according to + * the 'streaming' mode and the publisher's ability to support that mode. + */ + if (server_version >= 160000 && + MySubscription->stream == LOGICALREP_STREAM_PARALLEL) + { + options->proto.logical.streaming_str = "parallel"; + MyLogicalRepWorker->parallel_apply = true; + } + else if (server_version >= 140000 && + MySubscription->stream != LOGICALREP_STREAM_OFF) + { + options->proto.logical.streaming_str = "on"; + MyLogicalRepWorker->parallel_apply = false; + } + else + { + options->proto.logical.streaming_str = NULL; + MyLogicalRepWorker->parallel_apply = false; + } + + options->proto.logical.twophase = false; + options->proto.logical.origin = pstrdup(MySubscription->origin); +} + +/* * Cleanup the memory for subxacts and reset the related variables. */ static inline void @@ -4361,24 +4410,18 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid) } /* - * Execute the initial sync with error handling. Disable the subscription, - * if it's required. + * Common function to run the apply loop with error handling. Disable the + * subscription, if necessary. * - * Allocate the slot name in long-lived context on return. Note that we don't - * handle FATAL errors which are probably because of system resource error and - * are not repeatable. + * Note that we don't handle FATAL errors which are probably because + * of system resource error and are not repeatable. */ -static void -start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) +void +start_apply(XLogRecPtr origin_startpos) { - char *syncslotname = NULL; - - Assert(am_tablesync_worker()); - PG_TRY(); { - /* Call initial sync. */ - syncslotname = LogicalRepSyncTableStart(origin_startpos); + LogicalRepApplyLoop(origin_startpos); } PG_CATCH(); { @@ -4387,65 +4430,132 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) else { /* - * Report the worker failed during table synchronization. Abort - * the current transaction so that the stats message is sent in an + * Report the worker failed while applying changes. Abort the + * current transaction so that the stats message is sent in an * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); PG_RE_THROW(); } } PG_END_TRY(); - - /* allocate slot name in long-lived context */ - *myslotname = MemoryContextStrdup(ApplyContext, syncslotname); - pfree(syncslotname); } /* - * Run the apply loop with error handling. Disable the subscription, - * if necessary. + * Runs the leader apply worker. * - * Note that we don't handle FATAL errors which are probably because - * of system resource error and are not repeatable. + * It sets up replication origin, streaming options and then starts streaming. */ static void -start_apply(XLogRecPtr origin_startpos) +run_apply_worker() { - PG_TRY(); + char originname[NAMEDATALEN]; + XLogRecPtr origin_startpos = InvalidXLogRecPtr; + char *slotname = NULL; + WalRcvStreamOptions options; + RepOriginId originid; + TimeLineID startpointTLI; + char *err; + bool must_use_password; + + slotname = MySubscription->slotname; + + /* + * This shouldn't happen if the subscription is enabled, but guard against + * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if + * slot is NULL.) + */ + if (!slotname) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("subscription has no replication slot set"))); + + /* Setup replication origin tracking. */ + ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, + originname, sizeof(originname)); + StartTransactionCommand(); + originid = replorigin_by_name(originname, true); + if (!OidIsValid(originid)) + originid = replorigin_create(originname); + replorigin_session_setup(originid, 0); + replorigin_session_origin = originid; + origin_startpos = replorigin_session_get_progress(false); + + /* Is the use of a password mandatory? */ + must_use_password = MySubscription->passwordrequired && + !superuser_arg(MySubscription->owner); + + /* Note that the superuser_arg call can access the DB */ + CommitTransactionCommand(); + + LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, + must_use_password, + MySubscription->name, &err); + + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + /* + * We don't really use the output identify_system for anything but it does + * some initializations on the upstream so let's still call it. + */ + (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + + set_apply_error_context_origin(originname); + + set_stream_options(&options, slotname, &origin_startpos); + + /* + * Even when the two_phase mode is requested by the user, it remains as + * the tri-state PENDING until all tablesyncs have reached READY state. + * Only then, can it become ENABLED. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) { - LogicalRepApplyLoop(origin_startpos); + /* Start streaming with two_phase enabled */ + options.proto.logical.twophase = true; + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + + StartTransactionCommand(); + UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); + MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + CommitTransactionCommand(); } - PG_CATCH(); + else { - if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed while applying changes. Abort the - * current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); - - PG_RE_THROW(); - } + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); } - PG_END_TRY(); + + ereport(DEBUG1, + (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s", + MySubscription->name, + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : + "?"))); + + /* Run the main loop. */ + start_apply(origin_startpos); } /* - * Common initialization for leader apply worker and parallel apply worker. + * Common initialization for leader apply worker, parallel apply worker and + * tablesync worker. * * Initialize the database connection, in-memory subscription and necessary * config options. */ void -InitializeApplyWorker(void) +InitializeLogRepWorker(void) { MemoryContext oldctx; @@ -4518,22 +4628,15 @@ InitializeApplyWorker(void) CommitTransactionCommand(); } -/* Logical Replication Apply worker entry point */ +/* Common function to setup the leader apply or tablesync worker. */ void -ApplyWorkerMain(Datum main_arg) +SetupApplyOrSyncWorker(int worker_slot) { - int worker_slot = DatumGetInt32(main_arg); - char originname[NAMEDATALEN]; - XLogRecPtr origin_startpos = InvalidXLogRecPtr; - char *myslotname = NULL; - WalRcvStreamOptions options; - int server_version; - - InitializingApplyWorker = true; - /* Attach to slot */ logicalrep_worker_attach(worker_slot); + Assert(am_tablesync_worker() || am_leader_apply_worker()); + /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); pqsignal(SIGTERM, die); @@ -4551,79 +4654,12 @@ ApplyWorkerMain(Datum main_arg) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - InitializeApplyWorker(); - - InitializingApplyWorker = false; + InitializeLogRepWorker(); /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); - if (am_tablesync_worker()) - { - start_table_sync(&origin_startpos, &myslotname); - - ReplicationOriginNameForLogicalRep(MySubscription->oid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); - set_apply_error_context_origin(originname); - } - else - { - /* This is the leader apply worker */ - RepOriginId originid; - TimeLineID startpointTLI; - char *err; - bool must_use_password; - - myslotname = MySubscription->slotname; - - /* - * This shouldn't happen if the subscription is enabled, but guard - * against DDL bugs or manual catalog changes. (libpqwalreceiver will - * crash if slot is NULL.) - */ - if (!myslotname) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("subscription has no replication slot set"))); - - /* Setup replication origin tracking. */ - StartTransactionCommand(); - ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, - originname, sizeof(originname)); - originid = replorigin_by_name(originname, true); - if (!OidIsValid(originid)) - originid = replorigin_create(originname); - replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; - origin_startpos = replorigin_session_get_progress(false); - - /* Is the use of a password mandatory? */ - must_use_password = MySubscription->passwordrequired && - !superuser_arg(MySubscription->owner); - - /* Note that the superuser_arg call can access the DB */ - CommitTransactionCommand(); - - LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, - must_use_password, - MySubscription->name, &err); - if (LogRepWorkerWalRcvConn == NULL) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not connect to the publisher: %s", err))); - - /* - * We don't really use the output identify_system for anything but it - * does some initializations on the upstream so let's still call it. - */ - (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); - - set_apply_error_context_origin(originname); - } - /* * Setup callback for syscache so that we know when something changes in * the subscription relation state. @@ -4631,91 +4667,21 @@ ApplyWorkerMain(Datum main_arg) CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, invalidate_syncing_table_states, (Datum) 0); +} - /* Build logical replication streaming options. */ - options.logical = true; - options.startpoint = origin_startpos; - options.slotname = myslotname; - - server_version = walrcv_server_version(LogRepWorkerWalRcvConn); - options.proto.logical.proto_version = - server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM : - server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : - server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : - LOGICALREP_PROTO_VERSION_NUM; - - options.proto.logical.publication_names = MySubscription->publications; - options.proto.logical.binary = MySubscription->binary; - - /* - * Assign the appropriate option value for streaming option according to - * the 'streaming' mode and the publisher's ability to support that mode. - */ - if (server_version >= 160000 && - MySubscription->stream == LOGICALREP_STREAM_PARALLEL) - { - options.proto.logical.streaming_str = "parallel"; - MyLogicalRepWorker->parallel_apply = true; - } - else if (server_version >= 140000 && - MySubscription->stream != LOGICALREP_STREAM_OFF) - { - options.proto.logical.streaming_str = "on"; - MyLogicalRepWorker->parallel_apply = false; - } - else - { - options.proto.logical.streaming_str = NULL; - MyLogicalRepWorker->parallel_apply = false; - } - - options.proto.logical.twophase = false; - options.proto.logical.origin = pstrdup(MySubscription->origin); +/* Logical Replication Apply worker entry point */ +void +ApplyWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); - if (!am_tablesync_worker()) - { - /* - * Even when the two_phase mode is requested by the user, it remains - * as the tri-state PENDING until all tablesyncs have reached READY - * state. Only then, can it become ENABLED. - * - * Note: If the subscription has no tables then leave the state as - * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to - * work. - */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && - AllTablesyncsReady()) - { - /* Start streaming with two_phase enabled */ - options.proto.logical.twophase = true; - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + InitializingApplyWorker = true; - StartTransactionCommand(); - UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); - MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; - CommitTransactionCommand(); - } - else - { - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); - } + SetupApplyOrSyncWorker(worker_slot); - ereport(DEBUG1, - (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s", - MySubscription->name, - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : - "?"))); - } - else - { - /* Start normal logical streaming replication. */ - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); - } + InitializingApplyWorker = false; - /* Run the main loop. */ - start_apply(origin_startpos); + run_apply_worker(); proc_exit(0); } @@ -4724,7 +4690,7 @@ ApplyWorkerMain(Datum main_arg) * After error recovery, disable the subscription in a new transaction * and exit cleanly. */ -static void +void DisableSubscriptionAndExit(void) { /* |