aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/launcher.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r--src/backend/replication/logical/launcher.c64
1 files changed, 47 insertions, 17 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index afb7acddaa6..27e58566cec 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -410,7 +410,7 @@ retry:
worker->relstate = SUBREL_STATE_UNKNOWN;
worker->relstate_lsn = InvalidXLogRecPtr;
worker->stream_fileset = NULL;
- worker->apply_leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
+ worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
worker->parallel_apply = is_parallel_apply_worker;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
@@ -732,7 +732,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
worker->userid = InvalidOid;
worker->subid = InvalidOid;
worker->relid = InvalidOid;
- worker->apply_leader_pid = InvalidPid;
+ worker->leader_pid = InvalidPid;
worker->parallel_apply = false;
}
@@ -1067,12 +1067,40 @@ IsLogicalLauncher(void)
}
/*
+ * Return the pid of the leader apply worker if the given pid is the pid of a
+ * parallel apply worker, otherwise, return InvalidPid.
+ */
+pid_t
+GetLeaderApplyWorkerPid(pid_t pid)
+{
+ int leader_pid = InvalidPid;
+ int i;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
+ {
+ leader_pid = w->leader_pid;
+ break;
+ }
+ }
+
+ LWLockRelease(LogicalRepWorkerLock);
+
+ return leader_pid;
+}
+
+/*
* Returns state of the subscriptions.
*/
Datum
pg_stat_get_subscription(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_COLS 8
+#define PG_STAT_GET_SUBSCRIPTION_COLS 9
Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
int i;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1098,10 +1126,6 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
if (OidIsValid(subid) && worker.subid != subid)
continue;
- /* Skip if this is a parallel apply worker */
- if (isParallelApplyWorker(&worker))
- continue;
-
worker_pid = worker.proc->pid;
values[0] = ObjectIdGetDatum(worker.subid);
@@ -1110,26 +1134,32 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
else
nulls[1] = true;
values[2] = Int32GetDatum(worker_pid);
- if (XLogRecPtrIsInvalid(worker.last_lsn))
+
+ if (isParallelApplyWorker(&worker))
+ values[3] = Int32GetDatum(worker.leader_pid);
+ else
nulls[3] = true;
+
+ if (XLogRecPtrIsInvalid(worker.last_lsn))
+ nulls[4] = true;
else
- values[3] = LSNGetDatum(worker.last_lsn);
+ values[4] = LSNGetDatum(worker.last_lsn);
if (worker.last_send_time == 0)
- nulls[4] = true;
+ nulls[5] = true;
else
- values[4] = TimestampTzGetDatum(worker.last_send_time);
+ values[5] = TimestampTzGetDatum(worker.last_send_time);
if (worker.last_recv_time == 0)
- nulls[5] = true;
+ nulls[6] = true;
else
- values[5] = TimestampTzGetDatum(worker.last_recv_time);
+ values[6] = TimestampTzGetDatum(worker.last_recv_time);
if (XLogRecPtrIsInvalid(worker.reply_lsn))
- nulls[6] = true;
+ nulls[7] = true;
else
- values[6] = LSNGetDatum(worker.reply_lsn);
+ values[7] = LSNGetDatum(worker.reply_lsn);
if (worker.reply_time == 0)
- nulls[7] = true;
+ nulls[8] = true;
else
- values[7] = TimestampTzGetDatum(worker.reply_time);
+ values[8] = TimestampTzGetDatum(worker.reply_time);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
values, nulls);