diff options
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 64 |
1 files changed, 47 insertions, 17 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index afb7acddaa6..27e58566cec 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -410,7 +410,7 @@ retry: worker->relstate = SUBREL_STATE_UNKNOWN; worker->relstate_lsn = InvalidXLogRecPtr; worker->stream_fileset = NULL; - worker->apply_leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; + worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); @@ -732,7 +732,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker) worker->userid = InvalidOid; worker->subid = InvalidOid; worker->relid = InvalidOid; - worker->apply_leader_pid = InvalidPid; + worker->leader_pid = InvalidPid; worker->parallel_apply = false; } @@ -1067,12 +1067,40 @@ IsLogicalLauncher(void) } /* + * Return the pid of the leader apply worker if the given pid is the pid of a + * parallel apply worker, otherwise, return InvalidPid. + */ +pid_t +GetLeaderApplyWorkerPid(pid_t pid) +{ + int leader_pid = InvalidPid; + int i; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid) + { + leader_pid = w->leader_pid; + break; + } + } + + LWLockRelease(LogicalRepWorkerLock); + + return leader_pid; +} + +/* * Returns state of the subscriptions. */ Datum pg_stat_get_subscription(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_COLS 8 +#define PG_STAT_GET_SUBSCRIPTION_COLS 9 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); int i; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; @@ -1098,10 +1126,6 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) if (OidIsValid(subid) && worker.subid != subid) continue; - /* Skip if this is a parallel apply worker */ - if (isParallelApplyWorker(&worker)) - continue; - worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); @@ -1110,26 +1134,32 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) else nulls[1] = true; values[2] = Int32GetDatum(worker_pid); - if (XLogRecPtrIsInvalid(worker.last_lsn)) + + if (isParallelApplyWorker(&worker)) + values[3] = Int32GetDatum(worker.leader_pid); + else nulls[3] = true; + + if (XLogRecPtrIsInvalid(worker.last_lsn)) + nulls[4] = true; else - values[3] = LSNGetDatum(worker.last_lsn); + values[4] = LSNGetDatum(worker.last_lsn); if (worker.last_send_time == 0) - nulls[4] = true; + nulls[5] = true; else - values[4] = TimestampTzGetDatum(worker.last_send_time); + values[5] = TimestampTzGetDatum(worker.last_send_time); if (worker.last_recv_time == 0) - nulls[5] = true; + nulls[6] = true; else - values[5] = TimestampTzGetDatum(worker.last_recv_time); + values[6] = TimestampTzGetDatum(worker.last_recv_time); if (XLogRecPtrIsInvalid(worker.reply_lsn)) - nulls[6] = true; + nulls[7] = true; else - values[6] = LSNGetDatum(worker.reply_lsn); + values[7] = LSNGetDatum(worker.reply_lsn); if (worker.reply_time == 0) - nulls[7] = true; + nulls[8] = true; else - values[7] = TimestampTzGetDatum(worker.reply_time); + values[8] = TimestampTzGetDatum(worker.reply_time); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); |