aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/launcher.c19
1 files changed, 13 insertions, 6 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 6c894421a39..44bdcab3b97 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -168,14 +168,11 @@ get_subscription_list(void)
*/
static void
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
+ uint16 generation,
BackgroundWorkerHandle *handle)
{
BgwHandleStatus status;
int rc;
- uint16 generation;
-
- /* Remember generation for future identification. */
- generation = worker->generation;
for (;;)
{
@@ -282,7 +279,7 @@ logicalrep_workers_find(Oid subid, bool only_running)
}
/*
- * Start new apply background worker.
+ * Start new apply background worker, if possible.
*/
void
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
@@ -290,6 +287,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
+ uint16 generation;
int i;
int slot = 0;
LogicalRepWorker *worker = NULL;
@@ -406,6 +404,9 @@ retry:
worker->reply_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->reply_time);
+ /* Before releasing lock, remember generation for future identification. */
+ generation = worker->generation;
+
LWLockRelease(LogicalRepWorkerLock);
/* Register the new dynamic worker. */
@@ -428,6 +429,12 @@ retry:
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{
+ /* Failed to start worker, so clean up the worker slot. */
+ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+ Assert(generation == worker->generation);
+ logicalrep_worker_cleanup(worker);
+ LWLockRelease(LogicalRepWorkerLock);
+
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
@@ -436,7 +443,7 @@ retry:
}
/* Now wait until it attaches. */
- WaitForReplicationWorkerAttach(worker, bgw_handle);
+ WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}
/*