diff options
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 31 |
1 files changed, 21 insertions, 10 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index e231fa7f951..7cc0a16d3bc 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running) * Returns true on success, false on failure. */ bool -logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, +logicalrep_worker_launch(LogicalRepWorkerType wtype, + Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm) { BackgroundWorker bgw; @@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, int nsyncworkers; int nparallelapplyworkers; TimestampTz now; - bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID); - - /* Sanity check - tablesync worker cannot be a subworker */ - Assert(!(is_parallel_apply_worker && OidIsValid(relid))); + bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC); + bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY); + + /*---------- + * Sanity checks: + * - must be valid worker type + * - tablesync workers are only ones to have relid + * - parallel apply worker is the only kind of subworker + */ + Assert(wtype != WORKERTYPE_UNKNOWN); + Assert(is_tablesync_worker == OidIsValid(relid)); + Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID)); ereport(DEBUG1, (errmsg_internal("starting logical replication worker for subscription \"%s\"", @@ -393,7 +402,7 @@ retry: * sync worker limit per subscription. So, just return silently as we * might get here because of an otherwise harmless race condition. */ - if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription) + if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return false; @@ -427,6 +436,7 @@ retry: } /* Prepare the worker slot. */ + worker->type = wtype; worker->launch_time = now; worker->in_use = true; worker->generation++; @@ -466,7 +476,7 @@ retry: subid); snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker"); } - else if (OidIsValid(relid)) + else if (is_tablesync_worker) { snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, @@ -847,7 +857,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->subid == subid && OidIsValid(w->relid)) + if (w->subid == subid && isTablesyncWorker(w)) res++; } @@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg) (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) { ApplyLauncherSetWorkerStartTime(sub->oid, now); - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, + logicalrep_worker_launch(WORKERTYPE_APPLY, + sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, DSM_HANDLE_INVALID); } @@ -1290,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); - if (OidIsValid(worker.relid)) + if (isTablesyncWorker(&worker)) values[1] = ObjectIdGetDatum(worker.relid); else nulls[1] = true; |