aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/launcher.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r--src/backend/replication/logical/launcher.c107
1 files changed, 56 insertions, 51 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 09c87d7c53a..4e2c350dc7e 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -57,8 +57,8 @@
/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L
-int max_logical_replication_workers = 4;
-int max_sync_workers_per_subscription = 2;
+int max_logical_replication_workers = 4;
+int max_sync_workers_per_subscription = 2;
LogicalRepWorker *MyLogicalRepWorker = NULL;
@@ -68,7 +68,7 @@ typedef struct LogicalRepCtxStruct
pid_t launcher_pid;
/* Background workers. */
- LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
+ LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;
LogicalRepCtxStruct *LogicalRepCtx;
@@ -83,9 +83,9 @@ static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
volatile sig_atomic_t got_SIGHUP = false;
volatile sig_atomic_t got_SIGTERM = false;
-static bool on_commit_launcher_wakeup = false;
+static bool on_commit_launcher_wakeup = false;
-Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
+Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
/*
@@ -122,8 +122,8 @@ get_subscription_list(void)
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
{
Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
- Subscription *sub;
- MemoryContext oldcxt;
+ Subscription *sub;
+ MemoryContext oldcxt;
/*
* Allocate our results in the caller's context, not the
@@ -224,15 +224,16 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
LogicalRepWorker *
logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
- int i;
- LogicalRepWorker *res = NULL;
+ int i;
+ LogicalRepWorker *res = NULL;
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
/* Search for attached worker for a given subscription id. */
for (i = 0; i < max_logical_replication_workers; i++)
{
- LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
if (w->in_use && w->subid == subid && w->relid == relid &&
(!only_running || w->proc))
{
@@ -251,17 +252,17 @@ void
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
Oid relid)
{
- BackgroundWorker bgw;
+ BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
- int i;
- int slot = 0;
- LogicalRepWorker *worker = NULL;
- int nsyncworkers;
- TimestampTz now;
+ int i;
+ int slot = 0;
+ LogicalRepWorker *worker = NULL;
+ int nsyncworkers;
+ TimestampTz now;
ereport(LOG,
- (errmsg("starting logical replication worker for subscription \"%s\"",
- subname)));
+ (errmsg("starting logical replication worker for subscription \"%s\"",
+ subname)));
/* Report this after the initial starting message for consistency. */
if (max_replication_slots == 0)
@@ -300,7 +301,7 @@ retry:
*/
if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
{
- bool did_cleanup = false;
+ bool did_cleanup = false;
for (i = 0; i < max_logical_replication_workers; i++)
{
@@ -373,7 +374,7 @@ retry:
/* Register the new dynamic worker. */
memset(&bgw, 0, sizeof(bgw));
- bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
@@ -394,7 +395,7 @@ retry:
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
- errhint("You might need to increase max_worker_processes.")));
+ errhint("You might need to increase max_worker_processes.")));
return;
}
@@ -410,7 +411,7 @@ void
logicalrep_worker_stop(Oid subid, Oid relid)
{
LogicalRepWorker *worker;
- uint16 generation;
+ uint16 generation;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
@@ -435,7 +436,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
*/
while (worker->in_use && !worker->proc)
{
- int rc;
+ int rc;
LWLockRelease(LogicalRepWorkerLock);
@@ -478,7 +479,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
/* ... and wait for it to die. */
for (;;)
{
- int rc;
+ int rc;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
if (!worker->proc || worker->generation != generation)
@@ -509,7 +510,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
void
logicalrep_worker_wakeup(Oid subid, Oid relid)
{
- LogicalRepWorker *worker;
+ LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(subid, relid, true);
@@ -544,18 +545,18 @@ logicalrep_worker_attach(int slot)
{
LWLockRelease(LogicalRepWorkerLock);
ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("logical replication worker slot %d is empty, cannot attach",
- slot)));
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication worker slot %d is empty, cannot attach",
+ slot)));
}
if (MyLogicalRepWorker->proc)
{
LWLockRelease(LogicalRepWorkerLock);
ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("logical replication worker slot %d is already used by "
- "another worker, cannot attach", slot)));
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication worker slot %d is already used by "
+ "another worker, cannot attach", slot)));
}
MyLogicalRepWorker->proc = MyProc;
@@ -620,7 +621,7 @@ logicalrep_worker_onexit(int code, Datum arg)
void
logicalrep_worker_sigterm(SIGNAL_ARGS)
{
- int save_errno = errno;
+ int save_errno = errno;
got_SIGTERM = true;
@@ -634,7 +635,7 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
void
logicalrep_worker_sighup(SIGNAL_ARGS)
{
- int save_errno = errno;
+ int save_errno = errno;
got_SIGHUP = true;
@@ -651,15 +652,16 @@ logicalrep_worker_sighup(SIGNAL_ARGS)
int
logicalrep_sync_worker_count(Oid subid)
{
- int i;
- int res = 0;
+ int i;
+ int res = 0;
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
/* Search for attached worker for a given subscription id. */
for (i = 0; i < max_logical_replication_workers; i++)
{
- LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
if (w->subid == subid && OidIsValid(w->relid))
res++;
}
@@ -699,7 +701,7 @@ ApplyLauncherRegister(void)
return;
memset(&bgw, 0, sizeof(bgw));
- bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
@@ -729,7 +731,7 @@ ApplyLauncherShmemInit(void)
if (!found)
{
- int slot;
+ int slot;
memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
@@ -783,7 +785,7 @@ ApplyLauncherWakeup(void)
void
ApplyLauncherMain(Datum main_arg)
{
- TimestampTz last_start_time = 0;
+ TimestampTz last_start_time = 0;
ereport(DEBUG1,
(errmsg("logical replication launcher started")));
@@ -813,10 +815,10 @@ ApplyLauncherMain(Datum main_arg)
int rc;
List *sublist;
ListCell *lc;
- MemoryContext subctx;
- MemoryContext oldctx;
- TimestampTz now;
- long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
+ MemoryContext subctx;
+ MemoryContext oldctx;
+ TimestampTz now;
+ long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
now = GetCurrentTimestamp();
@@ -826,7 +828,7 @@ ApplyLauncherMain(Datum main_arg)
{
/* Use temporary context for the database list and worker info. */
subctx = AllocSetContextCreate(TopMemoryContext,
- "Logical Replication Launcher sublist",
+ "Logical Replication Launcher sublist",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
@@ -838,8 +840,8 @@ ApplyLauncherMain(Datum main_arg)
/* Start the missing workers for enabled subscriptions. */
foreach(lc, sublist)
{
- Subscription *sub = (Subscription *) lfirst(lc);
- LogicalRepWorker *w;
+ Subscription *sub = (Subscription *) lfirst(lc);
+ LogicalRepWorker *w;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
w = logicalrep_worker_find(sub->oid, InvalidOid, false);
@@ -864,9 +866,9 @@ ApplyLauncherMain(Datum main_arg)
{
/*
* The wait in previous cycle was interrupted in less than
- * wal_retrieve_retry_interval since last worker was started,
- * this usually means crash of the worker, so we should retry
- * in wal_retrieve_retry_interval again.
+ * wal_retrieve_retry_interval since last worker was started, this
+ * usually means crash of the worker, so we should retry in
+ * wal_retrieve_retry_interval again.
*/
wait_time = wal_retrieve_retry_interval;
}
@@ -948,7 +950,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
Datum values[PG_STAT_GET_SUBSCRIPTION_COLS];
bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
int worker_pid;
- LogicalRepWorker worker;
+ LogicalRepWorker worker;
memcpy(&worker, &LogicalRepCtx->workers[i],
sizeof(LogicalRepWorker));
@@ -992,7 +994,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
- /* If only a single subscription was requested, and we found it, break. */
+ /*
+ * If only a single subscription was requested, and we found it,
+ * break.
+ */
if (OidIsValid(subid))
break;
}