Diffstat (limited to 'src/backend/replication/logical/slotsync.c')
-rw-r--r--  src/backend/replication/logical/slotsync.c  702
1 file changed, 652 insertions(+), 50 deletions(-)
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 4cc9148c572..36773cfe73f 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -10,18 +10,25 @@
*
* This file contains the code for slot synchronization on a physical standby
* to fetch logical failover slots information from the primary server, create
- * the slots on the standby and synchronize them. This is done by a call to SQL
- * function pg_sync_replication_slots.
+ * the slots on the standby and synchronize them periodically.
*
- * If on physical standby, the WAL corresponding to the remote's restart_lsn
- * is not available or the remote's catalog_xmin precedes the oldest xid for which
- * it is guaranteed that rows wouldn't have been removed then we cannot create
- * the local standby slot because that would mean moving the local slot
+ * Slot synchronization can be performed either automatically by enabling the
+ * slot sync worker or manually by calling the SQL function
+ * pg_sync_replication_slots().
+ *
+ * If the WAL corresponding to the remote's restart_lsn is not available on the
+ * physical standby, or the remote's catalog_xmin precedes the oldest xid for
+ * which it is guaranteed that rows wouldn't have been removed, then we cannot
+ * create the local standby slot because that would mean moving the local slot
* backward and decoding won't be possible via such a slot. In this case, the
* slot will be marked as RS_TEMPORARY. Once the primary server catches up,
* the slot will be marked as RS_PERSISTENT (which means sync-ready) after
- * which we can call pg_sync_replication_slots() periodically to perform
- * syncs.
+ * which the slot sync worker can perform periodic syncs, or the user can call
+ * pg_sync_replication_slots() periodically to perform the syncs.
+ *
+ * The slot sync worker waits for some time before the next synchronization,
+ * with the duration varying based on whether any slots were updated during
+ * the last cycle. Refer to the comments above wait_for_slot_activity() for
+ * more details.
*
* Any standby synchronized slots will be dropped if they no longer need
* to be synchronized. See comment atop drop_local_obsolete_slots() for more
@@ -31,28 +38,84 @@
#include "postgres.h"
+#include <time.h>
+
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_database.h"
#include "commands/dbcommands.h"
+#include "libpq/pqsignal.h"
+#include "pgstat.h"
+#include "postmaster/fork_process.h"
+#include "postmaster/interrupt.h"
+#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "storage/proc.h"
#include "storage/procarray.h"
+#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
+#include "utils/ps_status.h"
+#include "utils/timeout.h"
-/* Struct for sharing information to control slot synchronization. */
+/*
+ * Struct for sharing information to control slot synchronization.
+ *
+ * The slot sync worker's pid is needed by the startup process to shut it
+ * down during promotion. The startup process shuts down the slot sync worker
+ * and also sets stopSignaled=true to handle the race condition when the
+ * postmaster has not noticed the promotion yet and thus may end up restarting
+ * the slot sync worker. If stopSignaled is set, the worker will exit in such a
+ * case. Note that we don't need to reset this variable because, after
+ * promotion, the slot sync worker won't be restarted: pmState changes from
+ * PM_HOT_STANDBY to PM_RUN, and we don't support demoting the primary without
+ * restarting the server. See MaybeStartSlotSyncWorker.
+ *
+ * The 'syncing' flag is needed to prevent concurrent slot syncs, which could
+ * otherwise overwrite a slot.
+ *
+ * The 'last_start_time' is needed by the postmaster to start the slot sync
+ * worker at most once per SLOTSYNC_RESTART_INTERVAL_SEC. In cases where an
+ * immediate restart is expected (e.g., slot sync GUCs change), the slot sync
+ * worker resets last_start_time before exiting, so that the postmaster can
+ * restart the worker without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
+ *
+ * All the fields except 'syncing' are used only by the slot sync worker.
+ * 'syncing' is used both by the worker and by the SQL function
+ * pg_sync_replication_slots().
+ */
typedef struct SlotSyncCtxStruct
{
- /* prevents concurrent slot syncs to avoid slot overwrites */
+ pid_t pid;
+ bool stopSignaled;
bool syncing;
+ time_t last_start_time;
slock_t mutex;
} SlotSyncCtxStruct;
SlotSyncCtxStruct *SlotSyncCtx = NULL;
+/* GUC variable */
+bool sync_replication_slots = false;
+
+/*
+ * The sleep time (ms) between slot-sync cycles varies dynamically
+ * (within a MIN/MAX range) according to slot activity. See
+ * wait_for_slot_activity() for details.
+ */
+#define MIN_WORKER_NAPTIME_MS 200
+#define MAX_WORKER_NAPTIME_MS 30000 /* 30s */
+
+static long sleep_ms = MIN_WORKER_NAPTIME_MS;
+
+/* The restart interval for slot sync work used by postmaster */
+#define SLOTSYNC_RESTART_INTERVAL_SEC 10
+
+/* Flag to tell if we are in a slot sync worker process */
+static bool am_slotsync_worker = false;
+
/*
* Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
* in SlotSyncCtxStruct, this flag is true only if the current process is
@@ -79,6 +142,13 @@ typedef struct RemoteSlot
ReplicationSlotInvalidationCause invalidated;
} RemoteSlot;
+#ifdef EXEC_BACKEND
+static pid_t slotsyncworker_forkexec(void);
+#endif
+NON_EXEC_STATIC void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn();
+
+static void slotsync_failure_callback(int code, Datum arg);
+
/*
* If necessary, update the local synced slot's metadata based on the data
* from the remote slot.
@@ -343,8 +413,11 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
* If the remote restart_lsn and catalog_xmin have caught up with the
* local ones, then update the LSNs and persist the local synced slot for
* future synchronization; otherwise, do nothing.
+ *
+ * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
+ * false.
*/
-static void
+static bool
update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot = MyReplicationSlot;
@@ -375,7 +448,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
remote_slot->catalog_xmin,
LSN_FORMAT_ARGS(slot->data.restart_lsn),
slot->data.catalog_xmin));
- return;
+ return false;
}
/* First time slot update, the function must return true */
@@ -387,6 +460,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ereport(LOG,
errmsg("newly created slot \"%s\" is sync-ready now",
remote_slot->name));
+
+ return true;
}
/*
@@ -399,12 +474,15 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
* the remote_slot catches up with locally reserved position and local slot is
* updated. The slot is then persisted and is considered as sync-ready for
* periodic syncs.
+ *
+ * Returns TRUE if the local slot is updated.
*/
-static void
+static bool
synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot;
XLogRecPtr latestFlushPtr;
+ bool slot_updated = false;
/*
* Make sure that concerned WAL is received and flushed before syncing
@@ -412,12 +490,17 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
*/
latestFlushPtr = GetStandbyFlushRecPtr(NULL);
if (remote_slot->confirmed_lsn > latestFlushPtr)
- elog(ERROR,
- "skipping slot synchronization as the received slot sync"
- " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
- LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
- remote_slot->name,
- LSN_FORMAT_ARGS(latestFlushPtr));
+ {
+ ereport(am_slotsync_worker ? LOG : ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("skipping slot synchronization as the received slot sync"
+ " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
+ LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+ remote_slot->name,
+ LSN_FORMAT_ARGS(latestFlushPtr)));
+
+ return false;
+ }
/* Search for the named slot */
if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
@@ -465,19 +548,22 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/* Make sure the invalidated state persists across server restart */
ReplicationSlotMarkDirty();
ReplicationSlotSave();
+
+ slot_updated = true;
}
/* Skip the sync of an invalidated slot */
if (slot->data.invalidated != RS_INVAL_NONE)
{
ReplicationSlotRelease();
- return;
+ return slot_updated;
}
/* Slot not ready yet, let's attempt to make it sync-ready now. */
if (slot->data.persistency == RS_TEMPORARY)
{
- update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+ slot_updated = update_and_persist_local_synced_slot(remote_slot,
+ remote_dbid);
}
/* Slot ready for sync, so sync it. */
@@ -500,6 +586,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlotMarkDirty();
ReplicationSlotSave();
+
+ slot_updated = true;
}
}
}
@@ -511,7 +599,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/* Skip creating the local slot if remote_slot is invalidated already */
if (remote_slot->invalidated != RS_INVAL_NONE)
- return;
+ return false;
/*
* We create temporary slots instead of ephemeral slots here because
@@ -548,9 +636,13 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
LWLockRelease(ProcArrayLock);
update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+
+ slot_updated = true;
}
ReplicationSlotRelease();
+
+ return slot_updated;
}
/*
@@ -558,8 +650,10 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
*
* Gets the failover logical slots info from the primary server and updates
* the slots locally. Creates the slots if not present on the standby.
+ *
+ * Returns TRUE if any of the slots gets updated in this sync-cycle.
*/
-static void
+static bool
synchronize_slots(WalReceiverConn *wrconn)
{
#define SLOTSYNC_COLUMN_COUNT 9
@@ -569,6 +663,8 @@ synchronize_slots(WalReceiverConn *wrconn)
WalRcvExecResult *res;
TupleTableSlot *tupslot;
List *remote_slot_list = NIL;
+ bool some_slot_updated = false;
+ bool started_tx = false;
const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
" restart_lsn, catalog_xmin, two_phase, failover,"
" database, conflict_reason"
@@ -589,9 +685,15 @@ synchronize_slots(WalReceiverConn *wrconn)
syncing_slots = true;
+ /* The syscache access in walrcv_exec() needs a transaction env. */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
/* Execute the query */
res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
-
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
errmsg("could not fetch failover logical slots info from the primary server: %s",
@@ -686,7 +788,7 @@ synchronize_slots(WalReceiverConn *wrconn)
*/
LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
- synchronize_one_slot(remote_slot, remote_dbid);
+ some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
}
@@ -696,11 +798,16 @@ synchronize_slots(WalReceiverConn *wrconn)
walrcv_clear_result(res);
+ if (started_tx)
+ CommitTransactionCommand();
+
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->syncing = false;
SpinLockRelease(&SlotSyncCtx->mutex);
syncing_slots = false;
+
+ return some_slot_updated;
}
/*
@@ -720,6 +827,7 @@ validate_remote_info(WalReceiverConn *wrconn)
TupleTableSlot *tupslot;
bool remote_in_recovery;
bool primary_slot_valid;
+ bool started_tx = false;
initStringInfo(&cmd);
appendStringInfo(&cmd,
@@ -728,6 +836,13 @@ validate_remote_info(WalReceiverConn *wrconn)
" WHERE slot_type='physical' AND slot_name=%s",
quote_literal_cstr(PrimarySlotName));
+ /* The syscache access in walrcv_exec() needs a transaction env. */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow);
pfree(cmd.data);
@@ -763,28 +878,73 @@ validate_remote_info(WalReceiverConn *wrconn)
ExecClearTuple(tupslot);
walrcv_clear_result(res);
+
+ if (started_tx)
+ CommitTransactionCommand();
}
/*
- * Check all necessary GUCs for slot synchronization are set
- * appropriately, otherwise, raise ERROR.
+ * Checks if dbname is specified in 'primary_conninfo'.
+ *
+ * Error out if it is not specified; otherwise, return it.
*/
-void
-ValidateSlotSyncParams(void)
+char *
+CheckAndGetDbnameFromConninfo(void)
{
char *dbname;
/*
+ * The slot synchronization needs a database connection for walrcv_exec to
+ * work.
+ */
+ dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
+ if (dbname == NULL)
+ ereport(ERROR,
+
+ /*
+ * translator: dbname is a specific option; %s is a GUC variable name
+ */
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("slot synchronization requires dbname to be specified in %s",
+ "primary_conninfo"));
+ return dbname;
+}
+
+/*
+ * Return true if all necessary GUCs for slot synchronization are set
+ * appropriately; otherwise, return false.
+ */
+bool
+ValidateSlotSyncParams(int elevel)
+{
+ /*
+ * Logical slot sync/creation requires wal_level >= logical.
+ *
+	 * Since altering wal_level requires a server restart, error out in this
+	 * case regardless of the elevel provided by the caller.
+ */
+ if (wal_level < WAL_LEVEL_LOGICAL)
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("slot synchronization requires wal_level >= \"logical\""));
+ return false;
+ }
+
+ /*
* A physical replication slot(primary_slot_name) is required on the
* primary to ensure that the rows needed by the standby are not removed
* after restarting, so that the synchronized slot on the standby will not
* be invalidated.
*/
if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
- ereport(ERROR,
+ {
+ ereport(elevel,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be defined", "primary_slot_name"));
+ return false;
+ }
/*
* hot_standby_feedback must be enabled to cooperate with the physical
@@ -792,47 +952,478 @@ ValidateSlotSyncParams(void)
* catalog_xmin values on the standby.
*/
if (!hot_standby_feedback)
- ereport(ERROR,
+ {
+ ereport(elevel,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be enabled",
"hot_standby_feedback"));
-
- /* Logical slot sync/creation requires wal_level >= logical. */
- if (wal_level < WAL_LEVEL_LOGICAL)
- ereport(ERROR,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("slot synchronization requires wal_level >= \"logical\""));
+ return false;
+ }
/*
* The primary_conninfo is required to make connection to primary for
* getting slots information.
*/
if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
- ereport(ERROR,
+ {
+ ereport(elevel,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be defined",
"primary_conninfo"));
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * Re-read the config file.
+ *
+ * Exit if any of the slot sync GUCs have changed; the postmaster will then
+ * restart the worker.
+ */
+static void
+slotsync_reread_config(void)
+{
+ char *old_primary_conninfo = pstrdup(PrimaryConnInfo);
+ char *old_primary_slotname = pstrdup(PrimarySlotName);
+ bool old_sync_replication_slots = sync_replication_slots;
+ bool old_hot_standby_feedback = hot_standby_feedback;
+ bool conninfo_changed;
+ bool primary_slotname_changed;
+
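+	/* The worker is only launched when sync_replication_slots is enabled. */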
+ Assert(sync_replication_slots);
+
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+
+ conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
+ primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
+ pfree(old_primary_conninfo);
+ pfree(old_primary_slotname);
+
+ if (old_sync_replication_slots != sync_replication_slots)
+ {
+ ereport(LOG,
+ /* translator: %s is a GUC variable name */
+ errmsg("slot sync worker will shutdown because %s is disabled", "sync_replication_slots"));
+ proc_exit(0);
+ }
+
+ if (conninfo_changed ||
+ primary_slotname_changed ||
+ (old_hot_standby_feedback != hot_standby_feedback))
+ {
+ ereport(LOG,
+ errmsg("slot sync worker will restart because of a parameter change"));
+
+ /*
+ * Reset the last-start time for this worker so that the postmaster
+ * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
+ */
+ SlotSyncCtx->last_start_time = 0;
+
+ proc_exit(0);
+ }
+}
+
+/*
+ * Interrupt handler for main loop of slot sync worker.
+ */
+static void
+ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
+{
+ CHECK_FOR_INTERRUPTS();
+
+ if (ShutdownRequestPending)
+ {
+ ereport(LOG,
+ errmsg("slot sync worker is shutting down on receiving SIGINT"));
+
+ proc_exit(0);
+ }
+
+ if (ConfigReloadPending)
+ slotsync_reread_config();
+}
+
+/*
+ * Cleanup function for slotsync worker.
+ *
+ * Called on slotsync worker exit.
+ */
+static void
+slotsync_worker_onexit(int code, Datum arg)
+{
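+	/* Clear our advertised PID; ShutDownSlotSync() polls it to detect exit. */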
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->pid = InvalidPid;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+}
+
+/*
+ * Sleep for long enough that we believe it's likely that the slots on the
+ * primary get updated.
+ *
+ * If there is no slot activity, the wait time between sync-cycles doubles
+ * (up to a maximum of 30s). If there is some slot activity, the wait time is
+ * reset to the minimum (200ms).
+ */
+static void
+wait_for_slot_activity(bool some_slot_updated)
+{
+ int rc;
+
+ if (!some_slot_updated)
+ {
+ /*
+ * No slots were updated, so double the sleep time, but not beyond the
+ * maximum allowable value.
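+		 * (The sleep thus grows 200ms, 400ms, 800ms, ... capped at 30s.)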
+ */
+ sleep_ms = Min(sleep_ms * 2, MAX_WORKER_NAPTIME_MS);
+ }
+ else
+ {
+ /*
+ * Some slots were updated since the last sleep, so reset the sleep
+ * time.
+ */
+ sleep_ms = MIN_WORKER_NAPTIME_MS;
+ }
+
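+	/*
+	 * Nap for the computed interval; a set latch (e.g. from a signal handler)
+	 * ends the sleep early, and WL_EXIT_ON_PM_DEATH makes us exit if the
+	 * postmaster dies.
+	 */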
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ sleep_ms,
+ WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+}
+
+/*
+ * The main loop of our worker process.
+ *
+ * It connects to the primary server and periodically fetches logical failover
+ * slot information in order to create and sync the slots.
+ */
+NON_EXEC_STATIC void
+ReplSlotSyncWorkerMain(int argc, char *argv[])
+{
+ WalReceiverConn *wrconn = NULL;
+ char *dbname;
+ char *err;
+ sigjmp_buf local_sigjmp_buf;
+ StringInfoData app_name;
+
+ am_slotsync_worker = true;
+
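+	/* Identify ourselves as the slot sync worker, e.g. for logging and monitoring. */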
+ MyBackendType = B_SLOTSYNC_WORKER;
+
+ init_ps_display(NULL);
+
+ SetProcessingMode(InitProcessing);
/*
- * The slot synchronization needs a database connection for walrcv_exec to
- * work.
+ * Create a per-backend PGPROC struct in shared memory. We must do this
+ * before we access any shared memory.
*/
- dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
- if (dbname == NULL)
- ereport(ERROR,
+ InitProcess();
+
+ /*
+ * Early initialization.
+ */
+ BaseInit();
+
+ Assert(SlotSyncCtx != NULL);
+
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ Assert(SlotSyncCtx->pid == InvalidPid);
+
+ /*
+	 * The startup process signaled the slot sync worker to stop, so if the
+	 * postmaster meanwhile ended up starting the worker again, exit.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ proc_exit(0);
+ }
+
+ /* Advertise our PID so that the startup process can kill us on promotion */
+ SlotSyncCtx->pid = MyProcPid;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ ereport(LOG, errmsg("slot sync worker started"));
+
+ /* Register it as soon as SlotSyncCtx->pid is initialized. */
+ before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
+
+ /* Setup signal handling */
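+	/* (SIGINT is what the startup process sends us to shut down at promotion.) */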
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ /*
+	 * Establish the SIGALRM handler and initialize the timeout module. This is
+	 * needed by InitPostgres to register different timeouts.
+ */
+ InitializeTimeouts();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
+ /*
+ * If an exception is encountered, processing resumes here.
+ *
+ * We just need to clean up, report the error, and go away.
+ *
+ * If we do not have this handling here, then since this worker process
+ * operates at the bottom of the exception stack, ERRORs turn into FATALs.
+ * Therefore, we create our own exception handler to catch ERRORs.
+ */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* since not using PG_TRY, must reset error stack by hand */
+ error_context_stack = NULL;
+
+		/* Prevent interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Report the error to the server log */
+ EmitErrorReport();
/*
- * translator: dbname is a specific option; %s is a GUC variable name
+ * We can now go away. Note that because we called InitProcess, a
+ * callback was registered to do ProcKill, which will clean up
+ * necessary state.
*/
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("slot synchronization requires dbname to be specified in %s",
- "primary_conninfo"));
+ proc_exit(0);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ /*
+ * Unblock signals (they were blocked when the postmaster forked us)
+ */
+ sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
+
+ dbname = CheckAndGetDbnameFromConninfo();
+
+ /*
+ * Connect to the database specified by the user in primary_conninfo. We
+ * need a database connection for walrcv_exec to work which we use to
+ * fetch slot information from the remote node. See comments atop
+ * libpqrcv_exec.
+ *
+ * We do not specify a specific user here since the slot sync worker will
+ * operate as a superuser. This is safe because the slot sync worker does
+ * not interact with user tables, eliminating the risk of executing
+ * arbitrary code within triggers.
+ */
+ InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
+
+ SetProcessingMode(NormalProcessing);
+
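+	/* Build the application_name to use for the connection to the primary. */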
+ initStringInfo(&app_name);
+ if (cluster_name[0])
+ appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker");
+ else
+ appendStringInfo(&app_name, "%s", "slotsync worker");
+
+ /*
+ * Establish the connection to the primary server for slot
+ * synchronization.
+ */
+ wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
+ app_name.data, &err);
+ pfree(app_name.data);
+
+ if (!wrconn)
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the primary server: %s", err));
+
+ /*
+ * Register the failure callback once we have the connection.
+ *
+	 * XXX: This could be combined with the earlier cleanup registration of
+	 * slotsync_worker_onexit(), but that would require making the connection a
+	 * global variable, which we want to avoid.
+ */
+ before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+
+ /*
+ * Using the specified primary server connection, check that we are not a
+	 * cascading standby and that the slot configured in 'primary_slot_name'
+	 * exists on the primary server.
+ */
+ validate_remote_info(wrconn);
+
+ /* Main loop to synchronize slots */
+ for (;;)
+ {
+ bool some_slot_updated = false;
+
+ ProcessSlotSyncInterrupts(wrconn);
+
+ some_slot_updated = synchronize_slots(wrconn);
+
+ wait_for_slot_activity(some_slot_updated);
+ }
+
+ /*
+ * The slot sync worker can't get here because it will only stop when it
+ * receives a SIGINT from the startup process, or when there is an error.
+ */
+ Assert(false);
+}
+
+/*
+ * Main entry point for slot sync worker process, to be called from the
+ * postmaster.
+ */
+int
+StartSlotSyncWorker(void)
+{
+ pid_t pid;
+
+#ifdef EXEC_BACKEND
+ switch ((pid = slotsyncworker_forkexec()))
+ {
+#else
+ switch ((pid = fork_process()))
+ {
+ case 0:
+ /* in postmaster child ... */
+ InitPostmasterChild();
+
+ /* Close the postmaster's sockets */
+ ClosePostmasterPorts(false);
+
+ ReplSlotSyncWorkerMain(0, NULL);
+ break;
+#endif
+ case -1:
+ ereport(LOG,
+ (errmsg("could not fork slot sync worker process: %m")));
+ return 0;
+
+ default:
+ return (int) pid;
+ }
+
+ /* shouldn't get here */
+ return 0;
+}
+
+#ifdef EXEC_BACKEND
+/*
+ * The forkexec routine for the slot sync worker process.
+ *
+ * Format up the arglist, then fork and exec.
+ */
+static pid_t
+slotsyncworker_forkexec(void)
+{
+ char *av[10];
+ int ac = 0;
+
+ av[ac++] = "postgres";
+ av[ac++] = "--forkssworker";
+ av[ac++] = NULL; /* filled in by postmaster_forkexec */
+ av[ac] = NULL;
+
+ Assert(ac < lengthof(av));
+
+ return postmaster_forkexec(ac, av);
+}
+#endif
+
+/*
+ * Shut down the slot sync worker.
+ */
+void
+ShutDownSlotSync(void)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ SlotSyncCtx->stopSignaled = true;
+
+ if (SlotSyncCtx->pid == InvalidPid)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ return;
+ }
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
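+	/* Tell the worker to exit; it treats SIGINT as a shutdown request. */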
+ kill(SlotSyncCtx->pid, SIGINT);
+
+ /* Wait for it to die */
+ for (;;)
+ {
+ int rc;
+
+ /* Wait a bit, we don't expect to have to wait long */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ 10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
+
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ /* Is it gone? */
+ if (SlotSyncCtx->pid == InvalidPid)
+ break;
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ }
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+}
+
+/*
+ * SlotSyncWorkerCanRestart
+ *
+ * Returns true if enough time (SLOTSYNC_RESTART_INTERVAL_SEC) has passed
+ * since it was launched last. Otherwise returns false.
+ *
+ * This is a safety valve to protect against continuous respawn attempts if the
+ * worker is dying immediately at launch. Note that since we retry launching
+ * the worker from the postmaster main loop, it will get another chance later.
+ */
+bool
+SlotSyncWorkerCanRestart(void)
+{
+ time_t curtime = time(NULL);
+
+ /* Return false if too soon since last start. */
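+	/* (The unsigned comparison also copes with the clock going backwards, in which case the restart is simply allowed.) */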
+ if ((unsigned int) (curtime - SlotSyncCtx->last_start_time) <
+ (unsigned int) SLOTSYNC_RESTART_INTERVAL_SEC)
+ return false;
+
+ SlotSyncCtx->last_start_time = curtime;
+
+ return true;
}
/*
- * Is current process syncing replication slots ?
+ * Is current process syncing replication slots?
+ *
+ * It could be either a backend executing the SQL function or the slot sync
+ * worker.
*/
bool
IsSyncingReplicationSlots(void)
@@ -841,6 +1432,15 @@ IsSyncingReplicationSlots(void)
}
/*
+ * Is current process a slot sync worker?
+ */
+bool
+IsLogicalSlotSyncWorker(void)
+{
+ return am_slotsync_worker;
+}
+
+/*
* Amount of shared memory required for slot synchronization.
*/
Size
@@ -855,14 +1455,16 @@ SlotSyncShmemSize(void)
void
SlotSyncShmemInit(void)
{
+ Size size = SlotSyncShmemSize();
bool found;
SlotSyncCtx = (SlotSyncCtxStruct *)
- ShmemInitStruct("Slot Sync Data", SlotSyncShmemSize(), &found);
+ ShmemInitStruct("Slot Sync Data", size, &found);
if (!found)
{
- SlotSyncCtx->syncing = false;
+ memset(SlotSyncCtx, 0, size);
+ SlotSyncCtx->pid = InvalidPid;
SpinLockInit(&SlotSyncCtx->mutex);
}
}