aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/launcher.c57
-rw-r--r--src/backend/replication/logical/tablesync.c33
-rw-r--r--src/backend/replication/logical/worker.c43
3 files changed, 79 insertions, 54 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7cc0a16d3bc..72e44d5a02d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -468,39 +468,44 @@ retry:
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
- if (is_parallel_apply_worker)
+ switch (worker->type)
{
- snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
- snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication parallel apply worker for subscription %u",
- subid);
- snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
- }
- else if (is_tablesync_worker)
- {
- snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
- snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication tablesync worker for subscription %u sync %u",
- subid,
- relid);
- snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
- }
- else
- {
- snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
- snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication apply worker for subscription %u",
- subid);
- snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+ case WORKERTYPE_APPLY:
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication apply worker for subscription %u",
+ subid);
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+ break;
+
+ case WORKERTYPE_PARALLEL_APPLY:
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication parallel apply worker for subscription %u",
+ subid);
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+
+ memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
+ break;
+
+ case WORKERTYPE_TABLESYNC:
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication tablesync worker for subscription %u sync %u",
+ subid,
+ relid);
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
+ break;
+
+ case WORKERTYPE_UNKNOWN:
+ /* Should never happen. */
+ elog(ERROR, "unknown worker type");
}
bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProcPid;
bgw.bgw_main_arg = Int32GetDatum(slot);
- if (is_parallel_apply_worker)
- memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
-
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{
/* Failed to start worker, so clean up the worker slot. */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 67bdd14095e..e2cee92cf26 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -649,18 +649,29 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
void
process_syncing_tables(XLogRecPtr current_lsn)
{
- /*
- * Skip for parallel apply workers because they only operate on tables
- * that are in a READY state. See pa_can_start() and
- * should_apply_changes_for_rel().
- */
- if (am_parallel_apply_worker())
- return;
+ switch (MyLogicalRepWorker->type)
+ {
+ case WORKERTYPE_PARALLEL_APPLY:
- if (am_tablesync_worker())
- process_syncing_tables_for_sync(current_lsn);
- else
- process_syncing_tables_for_apply(current_lsn);
+ /*
+ * Skip for parallel apply workers because they only operate on
+ * tables that are in a READY state. See pa_can_start() and
+ * should_apply_changes_for_rel().
+ */
+ break;
+
+ case WORKERTYPE_TABLESYNC:
+ process_syncing_tables_for_sync(current_lsn);
+ break;
+
+ case WORKERTYPE_APPLY:
+ process_syncing_tables_for_apply(current_lsn);
+ break;
+
+ case WORKERTYPE_UNKNOWN:
+ /* Should never happen. */
+ elog(ERROR, "Unknown worker type");
+ }
}
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a20d4c11716..597947410f8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -485,25 +485,34 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
- if (am_tablesync_worker())
- return MyLogicalRepWorker->relid == rel->localreloid;
- else if (am_parallel_apply_worker())
+ switch (MyLogicalRepWorker->type)
{
- /* We don't synchronize rel's that are in unknown state. */
- if (rel->state != SUBREL_STATE_READY &&
- rel->state != SUBREL_STATE_UNKNOWN)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
- MySubscription->name),
- errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
-
- return rel->state == SUBREL_STATE_READY;
+ case WORKERTYPE_TABLESYNC:
+ return MyLogicalRepWorker->relid == rel->localreloid;
+
+ case WORKERTYPE_PARALLEL_APPLY:
+ /* We don't synchronize rel's that are in unknown state. */
+ if (rel->state != SUBREL_STATE_READY &&
+ rel->state != SUBREL_STATE_UNKNOWN)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+ MySubscription->name),
+ errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+
+ return rel->state == SUBREL_STATE_READY;
+
+ case WORKERTYPE_APPLY:
+ return (rel->state == SUBREL_STATE_READY ||
+ (rel->state == SUBREL_STATE_SYNCDONE &&
+ rel->statelsn <= remote_final_lsn));
+
+ case WORKERTYPE_UNKNOWN:
+ /* Should never happen. */
+ elog(ERROR, "Unknown worker type");
}
- else
- return (rel->state == SUBREL_STATE_READY ||
- (rel->state == SUBREL_STATE_SYNCDONE &&
- rel->statelsn <= remote_final_lsn));
+
+ return false; /* dummy for compiler */
}
/*