diff options
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 107 |
1 files changed, 56 insertions, 51 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 09c87d7c53a..4e2c350dc7e 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -57,8 +57,8 @@ /* max sleep time between cycles (3min) */ #define DEFAULT_NAPTIME_PER_CYCLE 180000L -int max_logical_replication_workers = 4; -int max_sync_workers_per_subscription = 2; +int max_logical_replication_workers = 4; +int max_sync_workers_per_subscription = 2; LogicalRepWorker *MyLogicalRepWorker = NULL; @@ -68,7 +68,7 @@ typedef struct LogicalRepCtxStruct pid_t launcher_pid; /* Background workers. */ - LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; + LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; } LogicalRepCtxStruct; LogicalRepCtxStruct *LogicalRepCtx; @@ -83,9 +83,9 @@ static void logicalrep_worker_cleanup(LogicalRepWorker *worker); volatile sig_atomic_t got_SIGHUP = false; volatile sig_atomic_t got_SIGTERM = false; -static bool on_commit_launcher_wakeup = false; +static bool on_commit_launcher_wakeup = false; -Datum pg_stat_get_subscription(PG_FUNCTION_ARGS); +Datum pg_stat_get_subscription(PG_FUNCTION_ARGS); /* @@ -122,8 +122,8 @@ get_subscription_list(void) while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) { Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup); - Subscription *sub; - MemoryContext oldcxt; + Subscription *sub; + MemoryContext oldcxt; /* * Allocate our results in the caller's context, not the @@ -224,15 +224,16 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running) { - int i; - LogicalRepWorker *res = NULL; + int i; + LogicalRepWorker *res = NULL; Assert(LWLockHeldByMe(LogicalRepWorkerLock)); /* Search for attached worker for a given subscription id. */ for (i = 0; i < max_logical_replication_workers; i++) { - LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + if (w->in_use && w->subid == subid && w->relid == relid && (!only_running || w->proc)) { @@ -251,17 +252,17 @@ void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid) { - BackgroundWorker bgw; + BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; - int i; - int slot = 0; - LogicalRepWorker *worker = NULL; - int nsyncworkers; - TimestampTz now; + int i; + int slot = 0; + LogicalRepWorker *worker = NULL; + int nsyncworkers; + TimestampTz now; ereport(LOG, - (errmsg("starting logical replication worker for subscription \"%s\"", - subname))); + (errmsg("starting logical replication worker for subscription \"%s\"", + subname))); /* Report this after the initial starting message for consistency. */ if (max_replication_slots == 0) @@ -300,7 +301,7 @@ retry: */ if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) { - bool did_cleanup = false; + bool did_cleanup = false; for (i = 0; i < max_logical_replication_workers; i++) { @@ -373,7 +374,7 @@ retry: /* Register the new dynamic worker. */ memset(&bgw, 0, sizeof(bgw)); - bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); @@ -394,7 +395,7 @@ retry: ereport(WARNING, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("out of background worker slots"), - errhint("You might need to increase max_worker_processes."))); + errhint("You might need to increase max_worker_processes."))); return; } @@ -410,7 +411,7 @@ void logicalrep_worker_stop(Oid subid, Oid relid) { LogicalRepWorker *worker; - uint16 generation; + uint16 generation; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); @@ -435,7 +436,7 @@ logicalrep_worker_stop(Oid subid, Oid relid) */ while (worker->in_use && !worker->proc) { - int rc; + int rc; LWLockRelease(LogicalRepWorkerLock); @@ -478,7 +479,7 @@ logicalrep_worker_stop(Oid subid, Oid relid) /* ... and wait for it to die. */ for (;;) { - int rc; + int rc; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); if (!worker->proc || worker->generation != generation) @@ -509,7 +510,7 @@ logicalrep_worker_stop(Oid subid, Oid relid) void logicalrep_worker_wakeup(Oid subid, Oid relid) { - LogicalRepWorker *worker; + LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(subid, relid, true); @@ -544,18 +545,18 @@ logicalrep_worker_attach(int slot) { LWLockRelease(LogicalRepWorkerLock); ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication worker slot %d is empty, cannot attach", - slot))); + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication worker slot %d is empty, cannot attach", + slot))); } if (MyLogicalRepWorker->proc) { LWLockRelease(LogicalRepWorkerLock); ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication worker slot %d is already used by " - "another worker, cannot attach", slot))); + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication worker slot %d is already used by " + "another worker, cannot attach", slot))); } MyLogicalRepWorker->proc = MyProc; @@ -620,7 +621,7 @@ logicalrep_worker_onexit(int code, Datum arg) void logicalrep_worker_sigterm(SIGNAL_ARGS) { - int save_errno = errno; + int save_errno = errno; got_SIGTERM = true; @@ -634,7 +635,7 @@ logicalrep_worker_sigterm(SIGNAL_ARGS) void logicalrep_worker_sighup(SIGNAL_ARGS) { - int save_errno = errno; + int save_errno = errno; got_SIGHUP = true; @@ -651,15 +652,16 @@ logicalrep_worker_sighup(SIGNAL_ARGS) int logicalrep_sync_worker_count(Oid subid) { - int i; - int res = 0; + int i; + int res = 0; Assert(LWLockHeldByMe(LogicalRepWorkerLock)); /* Search for attached worker for a given subscription id. */ for (i = 0; i < max_logical_replication_workers; i++) { - LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + if (w->subid == subid && OidIsValid(w->relid)) res++; } @@ -699,7 +701,7 @@ ApplyLauncherRegister(void) return; memset(&bgw, 0, sizeof(bgw)); - bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); @@ -729,7 +731,7 @@ ApplyLauncherShmemInit(void) if (!found) { - int slot; + int slot; memset(LogicalRepCtx, 0, ApplyLauncherShmemSize()); @@ -783,7 +785,7 @@ ApplyLauncherWakeup(void) void ApplyLauncherMain(Datum main_arg) { - TimestampTz last_start_time = 0; + TimestampTz last_start_time = 0; ereport(DEBUG1, (errmsg("logical replication launcher started"))); @@ -813,10 +815,10 @@ ApplyLauncherMain(Datum main_arg) int rc; List *sublist; ListCell *lc; - MemoryContext subctx; - MemoryContext oldctx; - TimestampTz now; - long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + MemoryContext subctx; + MemoryContext oldctx; + TimestampTz now; + long wait_time = DEFAULT_NAPTIME_PER_CYCLE; now = GetCurrentTimestamp(); @@ -826,7 +828,7 @@ ApplyLauncherMain(Datum main_arg) { /* Use temporary context for the database list and worker info. */ subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", + "Logical Replication Launcher sublist", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); @@ -838,8 +840,8 @@ ApplyLauncherMain(Datum main_arg) /* Start the missing workers for enabled subscriptions. */ foreach(lc, sublist) { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; + Subscription *sub = (Subscription *) lfirst(lc); + LogicalRepWorker *w; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); w = logicalrep_worker_find(sub->oid, InvalidOid, false); @@ -864,9 +866,9 @@ ApplyLauncherMain(Datum main_arg) { /* * The wait in previous cycle was interrupted in less than - * wal_retrieve_retry_interval since last worker was started, - * this usually means crash of the worker, so we should retry - * in wal_retrieve_retry_interval again. + * wal_retrieve_retry_interval since last worker was started, this + * usually means crash of the worker, so we should retry in + * wal_retrieve_retry_interval again. */ wait_time = wal_retrieve_retry_interval; } @@ -948,7 +950,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) Datum values[PG_STAT_GET_SUBSCRIPTION_COLS]; bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS]; int worker_pid; - LogicalRepWorker worker; + LogicalRepWorker worker; memcpy(&worker, &LogicalRepCtx->workers[i], sizeof(LogicalRepWorker)); @@ -992,7 +994,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) tuplestore_putvalues(tupstore, tupdesc, values, nulls); - /* If only a single subscription was requested, and we found it, break. */ + /* + * If only a single subscription was requested, and we found it, + * break. + */ if (OidIsValid(subid)) break; } |