Diffstat (limited to 'src')
-rw-r--r--	src/backend/replication/logical/launcher.c	181
-rw-r--r--	src/include/replication/worker_internal.h	9
2 files changed, 155 insertions, 35 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index f6ae61060ec..965b898ee50 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -38,6 +38,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/slot.h"
+#include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "storage/ipc.h"
@@ -76,6 +77,7 @@ static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
+static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
 
 /* Flags set by signal handlers */
 volatile sig_atomic_t got_SIGHUP = false;
@@ -154,15 +156,19 @@ get_subscription_list(void)
 /*
  * Wait for a background worker to start up and attach to the shmem context.
  *
- * This is like WaitForBackgroundWorkerStartup(), except that we wait for
- * attaching, not just start and we also just exit if postmaster died.
+ * This is only needed for cleaning up the shared memory in case the worker
+ * fails to attach.
  */
-static bool
+static void
 WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 							   BackgroundWorkerHandle *handle)
 {
 	BgwHandleStatus status;
 	int			rc;
+	uint16		generation;
+
+	/* Remember generation for future identification. */
+	generation = worker->generation;
 
 	for (;;)
 	{
@@ -170,18 +176,29 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 		CHECK_FOR_INTERRUPTS();
 
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+		/* Worker either died or has started; no need to do anything. */
+		if (!worker->in_use || worker->proc)
+		{
+			LWLockRelease(LogicalRepWorkerLock);
+			return;
+		}
+
+		LWLockRelease(LogicalRepWorkerLock);
+
+		/* Check if worker has died before attaching, and clean up after it. */
 		status = GetBackgroundWorkerPid(handle, &pid);
 
-		/*
-		 * Worker started and attached to our shmem. This check is safe
-		 * because only launcher ever starts the workers, so nobody can steal
-		 * the worker slot.
-		 */
-		if (status == BGWH_STARTED && worker->proc)
-			return true;
-
-		/* Worker didn't start or died before attaching to our shmem. */
 		if (status == BGWH_STOPPED)
-			return false;
+		{
+			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+			/* Ensure that this was indeed the worker we waited for. */
+			if (generation == worker->generation)
+				logicalrep_worker_cleanup(worker);
+			LWLockRelease(LogicalRepWorkerLock);
+			return;
+		}
 
 		/*
 		 * We need timeout because we generally don't get notified via latch
@@ -197,7 +214,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 		ResetLatch(MyLatch);
 	}
 
-	return false;
+	return;
 }
 
 /*
@@ -216,8 +233,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 	for (i = 0; i < max_logical_replication_workers; i++)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && w->relid == relid &&
-			(!only_running || (w->proc && IsBackendPid(w->proc->pid))))
+		if (w->in_use && w->subid == subid && w->relid == relid &&
+			(!only_running || w->proc))
 		{
 			res = w;
 			break;
@@ -236,8 +253,11 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 {
 	BackgroundWorker bgw;
 	BackgroundWorkerHandle *bgw_handle;
+	int			i;
 	int			slot;
 	LogicalRepWorker *worker = NULL;
+	int			nsyncworkers;
+	TimestampTz now;
 
 	ereport(LOG,
 			(errmsg("starting logical replication worker for subscription \"%s\"",
@@ -255,17 +275,73 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	 */
 	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
+retry:
 	/* Find unused worker slot. */
-	for (slot = 0; slot < max_logical_replication_workers; slot++)
+	for (i = 0; i < max_logical_replication_workers; i++)
 	{
-		if (!LogicalRepCtx->workers[slot].proc)
+		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+		if (!w->in_use)
 		{
-			worker = &LogicalRepCtx->workers[slot];
+			worker = w;
+			slot = i;
 			break;
 		}
 	}
 
-	/* Bail if not found */
+	nsyncworkers = logicalrep_sync_worker_count(subid);
+
+	now = GetCurrentTimestamp();
+
+	/*
+	 * If we didn't find a free slot, try to do garbage collection.  We do
+	 * this because if some worker failed to start up and its parent crashed
+	 * while waiting, the in_use state was never cleared.
+	 */
+	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+	{
+		bool		did_cleanup = false;
+
+		for (i = 0; i < max_logical_replication_workers; i++)
+		{
+			LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+			/*
+			 * If the worker was marked in use but didn't manage to attach in
+			 * time, clean it up.
+			 */
+			if (w->in_use && !w->proc &&
+				TimestampDifferenceExceeds(w->launch_time, now,
+										   wal_receiver_timeout))
+			{
+				elog(WARNING,
+					 "logical replication worker for subscription %u took too long to start; canceled",
+					 w->subid);
+
+				logicalrep_worker_cleanup(w);
+				did_cleanup = true;
+			}
+		}
+
+		if (did_cleanup)
+			goto retry;
+	}
+
+	/*
+	 * If we reached the sync worker limit per subscription, just exit
+	 * silently as we might get here because of an otherwise harmless race
+	 * condition.
+	 */
+	if (nsyncworkers >= max_sync_workers_per_subscription)
+	{
+		LWLockRelease(LogicalRepWorkerLock);
+		return;
+	}
+
+	/*
+	 * However, if there are no more free worker slots, inform the user
+	 * before exiting.
+	 */
 	if (worker == NULL)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
@@ -276,7 +352,10 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 		return;
 	}
 
-	/* Prepare the worker info. */
+	/* Prepare the worker slot. */
+	worker->launch_time = now;
+	worker->in_use = true;
+	worker->generation++;
 	worker->proc = NULL;
 	worker->dbid = dbid;
 	worker->userid = userid;
@@ -331,6 +410,7 @@ void
 logicalrep_worker_stop(Oid subid, Oid relid)
 {
 	LogicalRepWorker *worker;
+	uint16		generation;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
@@ -344,10 +424,16 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 	}
 
 	/*
+	 * Remember the generation of our worker so we can later check that we
+	 * are still looking at the same one.
+	 */
+	generation = worker->generation;
+
+	/*
 	 * If we found a worker but it does not have proc set, it is still
 	 * starting up; wait for it to finish and then kill it.
 	 */
-	while (worker && !worker->proc)
+	while (worker->in_use && !worker->proc)
 	{
 		int			rc;
 
@@ -370,10 +456,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
 		/*
-		 * Worker is no longer associated with subscription. It must have
-		 * exited, nothing more for us to do.
+		 * Check whether the worker slot is no longer used, which would mean
+		 * that the worker has exited, or whether the worker generation is
+		 * different, meaning that a different worker has taken the slot.
 		 */
-		if (worker->subid == InvalidOid)
+		if (!worker->in_use || worker->generation != generation)
 		{
 			LWLockRelease(LogicalRepWorkerLock);
 			return;
@@ -394,7 +481,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 		int			rc;
 
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-		if (!worker->proc)
+		if (!worker->proc || worker->generation != generation)
 		{
 			LWLockRelease(LogicalRepWorkerLock);
 			break;
@@ -453,11 +540,23 @@ logicalrep_worker_attach(int slot)
 	Assert(slot >= 0 && slot < max_logical_replication_workers);
 	MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
 
+	if (!MyLogicalRepWorker->in_use)
+	{
+		LWLockRelease(LogicalRepWorkerLock);
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("logical replication worker slot %d is empty, cannot attach",
+						slot)));
+	}
+
 	if (MyLogicalRepWorker->proc)
+	{
+		LWLockRelease(LogicalRepWorkerLock);
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("logical replication worker slot %d already used by "
-						"another worker", slot)));
+				 errmsg("logical replication worker slot %d is already used by "
+						"another worker, cannot attach", slot)));
+	}
+
 	MyLogicalRepWorker->proc = MyProc;
 	before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
 
@@ -474,15 +573,28 @@ logicalrep_worker_detach(void)
 	/* Block concurrent access. */
 	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
-	MyLogicalRepWorker->dbid = InvalidOid;
-	MyLogicalRepWorker->userid = InvalidOid;
-	MyLogicalRepWorker->subid = InvalidOid;
-	MyLogicalRepWorker->proc = NULL;
+	logicalrep_worker_cleanup(MyLogicalRepWorker);
 
 	LWLockRelease(LogicalRepWorkerLock);
 }
 
 /*
+ * Clean up worker info.
+ */
+static void
+logicalrep_worker_cleanup(LogicalRepWorker *worker)
+{
+	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
+
+	worker->in_use = false;
+	worker->proc = NULL;
+	worker->dbid = InvalidOid;
+	worker->userid = InvalidOid;
+	worker->subid = InvalidOid;
+	worker->relid = InvalidOid;
+}
+
+/*
  * Cleanup function for logical replication launcher.
  *
  * Called on logical replication launcher exit.
@@ -732,12 +844,11 @@ ApplyLauncherMain(Datum main_arg)
 
 				if (sub->enabled && w == NULL)
 				{
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid);
 					last_start_time = now;
 					wait_time = wal_retrieve_retry_interval;
-					/* Limit to one worker per mainloop cycle. */
-					break;
+
+					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+											 sub->owner, InvalidOid);
 				}
 			}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index b8e35d4b4dd..f6fee102b2a 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -21,6 +21,15 @@
 
 typedef struct LogicalRepWorker
 {
+	/* Time at which this worker was launched. */
+	TimestampTz launch_time;
+
+	/* Indicates if this slot is used or free. */
+	bool		in_use;
+
+	/* Increased every time the slot is taken by a new worker. */
+	uint16		generation;
+
 	/* Pointer to proc array.  NULL if not running. */
 	PGPROC	   *proc;
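
The core mechanism in this patch is the generation counter: any backend that wants to act on a worker slot across a lock release first snapshots worker->generation, and later proceeds only if the slot is still in_use with the same generation. Because the launcher bumps the counter every time it hands the slot to a new worker, a recycled slot can never be mistaken for the original occupant. Below is a minimal standalone C sketch of that pattern, not PostgreSQL code; all names (WorkerSlot, slot_take, and so on) are illustrative, and the locking is elided so the recycling race can be replayed deterministically in a single thread.

/*
 * Standalone sketch of the generation-counter guard introduced by this
 * patch.  Illustrative only; none of these names come from the tree.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct WorkerSlot
{
	bool		in_use;			/* slot taken? (worker may not be attached yet) */
	uint16		generation;		/* bumped each time the slot is reused */
	int			pid;			/* 0 until the worker "attaches" */
} WorkerSlot;

/* Taking a slot marks it used and starts a new generation. */
static void
slot_take(WorkerSlot *slot)
{
	slot->in_use = true;
	slot->generation++;
	slot->pid = 0;
}

/* Freeing a slot plays the role of logicalrep_worker_cleanup(). */
static void
slot_free(WorkerSlot *slot)
{
	slot->in_use = false;
	slot->pid = 0;
}

/*
 * What a waiter does when it re-checks the slot: act only if the slot
 * still belongs to the worker whose generation it remembered.
 */
static bool
slot_is_same_worker(const WorkerSlot *slot, uint16 remembered)
{
	return slot->in_use && slot->generation == remembered;
}

int
main(void)
{
	WorkerSlot	slot = {0};
	uint16		remembered;

	slot_take(&slot);				/* worker A is launched into the slot */
	remembered = slot.generation;	/* waiter snapshots the generation */

	slot_free(&slot);				/* worker A dies before attaching */
	slot_take(&slot);				/* slot is recycled for worker B */

	/* Without the counter this check would wrongly target worker B. */
	printf("same worker: %s\n",
		   slot_is_same_worker(&slot, remembered) ? "yes" : "no");
	return 0;
}

In the patch itself, each snapshot and re-check happens while holding LogicalRepWorkerLock, so the counter only has to bridge the windows where the lock is released, for example between the lookup and the kill loop in logicalrep_worker_stop(), or across the sleeps in WaitForReplicationWorkerAttach().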