aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/replication/logical/logicalfuncs.c12
-rw-r--r--src/backend/replication/logical/slotsync.c11
-rw-r--r--src/backend/replication/slot.c403
-rw-r--r--src/backend/replication/slotfuncs.c12
-rw-r--r--src/backend/replication/walsender.c159
-rw-r--r--src/backend/utils/activity/wait_event_names.txt1
-rw-r--r--src/backend/utils/misc/guc_tables.c14
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample2
8 files changed, 594 insertions, 20 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index a1ff631e5ed..b4dd5cce75b 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -105,6 +105,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
MemoryContext per_query_ctx;
MemoryContext oldcontext;
XLogRecPtr end_of_wal;
+ XLogRecPtr wait_for_wal_lsn;
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
ArrayType *arr;
@@ -224,6 +225,17 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
NameStr(MyReplicationSlot->data.plugin),
format_procedure(fcinfo->flinfo->fn_oid))));
+ /*
+ * Wait for specified streaming replication standby servers (if any)
+ * to confirm receipt of WAL up to wait_for_wal_lsn.
+ */
+ if (XLogRecPtrIsInvalid(upto_lsn))
+ wait_for_wal_lsn = end_of_wal;
+ else
+ wait_for_wal_lsn = Min(upto_lsn, end_of_wal);
+
+ WaitForStandbyConfirmation(wait_for_wal_lsn);
+
ctx->output_writer_private = p;
/*
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index ad0fc6a04b6..5074c8409f7 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -488,6 +488,10 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
latestFlushPtr = GetStandbyFlushRecPtr(NULL);
if (remote_slot->confirmed_lsn > latestFlushPtr)
{
+ /*
+ * Can get here only if GUC 'standby_slot_names' on the primary server
+ * was not configured correctly.
+ */
ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("skipping slot synchronization as the received slot sync"
@@ -857,6 +861,13 @@ validate_remote_info(WalReceiverConn *wrconn)
remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
Assert(!isnull);
+ /*
+ * Slot sync is currently not supported on a cascading standby. This is
+ * because if we allow it, the primary server needs to wait for all the
+ * cascading standbys, otherwise, logical subscribers can still be ahead
+ * of one of the cascading standbys which we plan to promote. Thus, to
+ * avoid this additional complexity, we restrict it for the time being.
+ */
if (remote_in_recovery)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2614f98ddd2..b8bf98b1822 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -46,13 +46,17 @@
#include "common/string.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "postmaster/interrupt.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
+#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
+#include "utils/guc_hooks.h"
+#include "utils/varlena.h"
/*
* Replication slot on-disk data structure.
@@ -78,6 +82,24 @@ typedef struct ReplicationSlotOnDisk
} ReplicationSlotOnDisk;
/*
+ * Struct for the configuration of standby_slot_names.
+ *
+ * Note: this must be a flat representation that can be held in a single chunk
+ * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
+ * standby_slot_names GUC.
+ */
+typedef struct
+{
+ /* Number of slot names in the slot_names[] */
+ int nslotnames;
+
+ /*
+ * slot_names contains 'nslotnames' consecutive null-terminated C strings.
+ */
+ char slot_names[FLEXIBLE_ARRAY_MEMBER];
+} StandbySlotNamesConfigData;
+
+/*
* Lookup table for slot invalidation causes.
*/
const char *const SlotInvalidationCauses[] = {
@@ -115,10 +137,25 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;
-/* GUC variable */
+/* GUC variables */
int max_replication_slots = 10; /* the maximum number of replication
* slots */
+/*
+ * This GUC lists streaming replication standby server slot names that
+ * logical WAL sender processes will wait for.
+ */
+char *standby_slot_names;
+
+/* This is the parsed and cached configuration for standby_slot_names */
+static StandbySlotNamesConfigData *standby_slot_names_config;
+
+/*
+ * Oldest LSN that has been confirmed to be flushed to the standbys
+ * corresponding to the physical slots specified in the standby_slot_names GUC.
+ */
+static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
+
static void ReplicationSlotShmemExit(int code, Datum arg);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
@@ -2345,3 +2382,367 @@ GetSlotInvalidationCause(const char *conflict_reason)
Assert(found);
return result;
}
+
+/*
+ * A helper function to validate slots specified in GUC standby_slot_names.
+ *
+ * The rawname will be parsed, and the result will be saved into *elemlist.
+ */
+static bool
+validate_standby_slots(char *rawname, List **elemlist)
+{
+ bool ok;
+
+ /* Verify syntax and parse string into a list of identifiers */
+ ok = SplitIdentifierString(rawname, ',', elemlist);
+
+ if (!ok)
+ {
+ GUC_check_errdetail("List syntax is invalid.");
+ }
+ else if (!ReplicationSlotCtl)
+ {
+ /*
+ * We cannot validate the replication slot if the replication slots'
+ * data has not been initialized. This is ok as we will anyway
+ * validate the specified slot when waiting for them to catch up. See
+ * StandbySlotsHaveCaughtup() for details.
+ */
+ }
+ else
+ {
+ /* Check that the specified slots exist and are logical slots */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+ foreach_ptr(char, name, *elemlist)
+ {
+ ReplicationSlot *slot;
+
+ slot = SearchNamedReplicationSlot(name, false);
+
+ if (!slot)
+ {
+ GUC_check_errdetail("replication slot \"%s\" does not exist",
+ name);
+ ok = false;
+ break;
+ }
+
+ if (!SlotIsPhysical(slot))
+ {
+ GUC_check_errdetail("\"%s\" is not a physical replication slot",
+ name);
+ ok = false;
+ break;
+ }
+ }
+
+ LWLockRelease(ReplicationSlotControlLock);
+ }
+
+ return ok;
+}
+
+/*
+ * GUC check_hook for standby_slot_names
+ */
+bool
+check_standby_slot_names(char **newval, void **extra, GucSource source)
+{
+ char *rawname;
+ char *ptr;
+ List *elemlist;
+ int size;
+ bool ok;
+ StandbySlotNamesConfigData *config;
+
+ if ((*newval)[0] == '\0')
+ return true;
+
+ /* Need a modifiable copy of the GUC string */
+ rawname = pstrdup(*newval);
+
+ /* Now verify if the specified slots exist and have correct type */
+ ok = validate_standby_slots(rawname, &elemlist);
+
+ if (!ok || elemlist == NIL)
+ {
+ pfree(rawname);
+ list_free(elemlist);
+ return ok;
+ }
+
+ /* Compute the size required for the StandbySlotNamesConfigData struct */
+ size = offsetof(StandbySlotNamesConfigData, slot_names);
+ foreach_ptr(char, slot_name, elemlist)
+ size += strlen(slot_name) + 1;
+
+ /* GUC extra value must be guc_malloc'd, not palloc'd */
+ config = (StandbySlotNamesConfigData *) guc_malloc(LOG, size);
+
+ /* Transform the data into StandbySlotNamesConfigData */
+ config->nslotnames = list_length(elemlist);
+
+ ptr = config->slot_names;
+ foreach_ptr(char, slot_name, elemlist)
+ {
+ strcpy(ptr, slot_name);
+ ptr += strlen(slot_name) + 1;
+ }
+
+ *extra = (void *) config;
+
+ pfree(rawname);
+ list_free(elemlist);
+ return true;
+}
+
+/*
+ * GUC assign_hook for standby_slot_names
+ */
+void
+assign_standby_slot_names(const char *newval, void *extra)
+{
+ /*
+ * The standby slots may have changed, so we must recompute the oldest
+ * LSN.
+ */
+ ss_oldest_flush_lsn = InvalidXLogRecPtr;
+
+ standby_slot_names_config = (StandbySlotNamesConfigData *) extra;
+}
+
+/*
+ * Check if the passed slot_name is specified in the standby_slot_names GUC.
+ */
+bool
+SlotExistsInStandbySlotNames(const char *slot_name)
+{
+ const char *standby_slot_name;
+
+ /* Return false if there is no value in standby_slot_names */
+ if (standby_slot_names_config == NULL)
+ return false;
+
+ /*
+ * XXX: We are not expecting this list to be long so a linear search
+ * shouldn't hurt but if that turns out not to be true then we can cache
+ * this information for each WalSender as well.
+ */
+ standby_slot_name = standby_slot_names_config->slot_names;
+ for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
+ {
+ if (strcmp(standby_slot_name, slot_name) == 0)
+ return true;
+
+ standby_slot_name += strlen(standby_slot_name) + 1;
+ }
+
+ return false;
+}
+
+/*
+ * Return true if the slots specified in standby_slot_names have caught up to
+ * the given WAL location, false otherwise.
+ *
+ * The elevel parameter specifies the error level used for logging messages
+ * related to slots that do not exist, are invalidated, or are inactive.
+ */
+bool
+StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
+{
+ const char *name;
+ int caught_up_slot_num = 0;
+ XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
+
+ /*
+ * Don't need to wait for the standbys to catch up if there is no value in
+ * standby_slot_names.
+ */
+ if (standby_slot_names_config == NULL)
+ return true;
+
+ /*
+ * Don't need to wait for the standbys to catch up if we are on a standby
+ * server, since we do not support syncing slots to cascading standbys.
+ */
+ if (RecoveryInProgress())
+ return true;
+
+ /*
+ * Don't need to wait for the standbys to catch up if they are already
+ * beyond the specified WAL location.
+ */
+ if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
+ ss_oldest_flush_lsn >= wait_for_lsn)
+ return true;
+
+ /*
+ * To prevent concurrent slot dropping and creation while filtering the
+ * slots, take the ReplicationSlotControlLock outside of the loop.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+ name = standby_slot_names_config->slot_names;
+ for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
+ {
+ XLogRecPtr restart_lsn;
+ bool invalidated;
+ bool inactive;
+ ReplicationSlot *slot;
+
+ slot = SearchNamedReplicationSlot(name, false);
+
+ if (!slot)
+ {
+ /*
+ * If a slot name provided in standby_slot_names does not exist,
+ * report a message and exit the loop. A user can specify a slot
+ * name that does not exist just before the server startup. The
+ * GUC check_hook(validate_standby_slots) cannot validate such a
+ * slot during startup as the ReplicationSlotCtl shared memory is
+ * not initialized at that time. It is also possible for a user to
+ * drop the slot in standby_slot_names afterwards.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("replication slot \"%s\" specified in parameter %s does not exist",
+ name, "standby_slot_names"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider creating the slot \"%s\" or amend parameter %s.",
+ name, "standby_slot_names"));
+ break;
+ }
+
+ if (SlotIsLogical(slot))
+ {
+ /*
+ * If a logical slot name is provided in standby_slot_names,
+ * report a message and exit the loop. Similar to the non-existent
+ * case, a user can specify a logical slot name in
+ * standby_slot_names before the server startup, or drop an
+ * existing physical slot and recreate a logical slot with the
+ * same name.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot have logical replication slot \"%s\" in parameter %s",
+ name, "standby_slot_names"),
+ errdetail("Logical replication is waiting for correction on \"%s\".",
+ name),
+ errhint("Consider removing logical slot \"%s\" from parameter %s.",
+ name, "standby_slot_names"));
+ break;
+ }
+
+ SpinLockAcquire(&slot->mutex);
+ restart_lsn = slot->data.restart_lsn;
+ invalidated = slot->data.invalidated != RS_INVAL_NONE;
+ inactive = slot->active_pid == 0;
+ SpinLockRelease(&slot->mutex);
+
+ if (invalidated)
+ {
+ /* Specified physical slot has been invalidated */
+ ereport(elevel,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
+ name, "standby_slot_names"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
+ name, "standby_slot_names"));
+ break;
+ }
+
+ if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
+ {
+ /* Log a message if no active_pid for this physical slot */
+ if (inactive)
+ ereport(elevel,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
+ name, "standby_slot_names"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+ name, "standby_slot_names"));
+
+ /* Continue if the current slot hasn't caught up. */
+ break;
+ }
+
+ Assert(restart_lsn >= wait_for_lsn);
+
+ if (XLogRecPtrIsInvalid(min_restart_lsn) ||
+ min_restart_lsn > restart_lsn)
+ min_restart_lsn = restart_lsn;
+
+ caught_up_slot_num++;
+
+ name += strlen(name) + 1;
+ }
+
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /*
+ * Return false if not all the standbys have caught up to the specified
+ * WAL location.
+ */
+ if (caught_up_slot_num != standby_slot_names_config->nslotnames)
+ return false;
+
+ /* The ss_oldest_flush_lsn must not retreat. */
+ Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
+ min_restart_lsn >= ss_oldest_flush_lsn);
+
+ ss_oldest_flush_lsn = min_restart_lsn;
+
+ return true;
+}
+
+/*
+ * Wait for physical standbys to confirm receiving the given lsn.
+ *
+ * Used by logical decoding SQL functions. It waits for physical standbys
+ * corresponding to the physical slots specified in the standby_slot_names GUC.
+ */
+void
+WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
+{
+ /*
+ * Don't need to wait for the standby to catch up if the current acquired
+ * slot is not a logical failover slot, or there is no value in
+ * standby_slot_names.
+ */
+ if (!MyReplicationSlot->data.failover || !standby_slot_names_config)
+ return;
+
+ ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+
+ for (;;)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ /* Exit if done waiting for every slot. */
+ if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
+ break;
+
+ /*
+ * Wait for the slots in the standby_slot_names to catch up, but use a
+ * timeout (1s) so we can also check if the standby_slot_names has
+ * been changed.
+ */
+ ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
+ WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
+ }
+
+ ConditionVariableCancelSleep();
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 768a304723b..ad79e1fccd6 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -464,6 +464,12 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
* crash, but this makes the data consistent after a clean shutdown.
*/
ReplicationSlotMarkDirty();
+
+ /*
+ * Wake up logical walsenders holding logical failover slots after
+ * updating the restart_lsn of the physical slot.
+ */
+ PhysicalWakeupLogicalWalSnd();
}
return retlsn;
@@ -505,6 +511,12 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
NULL, NULL, NULL);
/*
+ * Wait for specified streaming replication standby servers (if any)
+ * to confirm receipt of WAL up to moveto lsn.
+ */
+ WaitForStandbyConfirmation(moveto);
+
+ /*
* Start reading at the slot's restart_lsn, which we know to point to
* a valid record.
*/
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0f1047179cb..25edb5e1412 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1726,25 +1726,109 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
}
/*
+ * Wake up the logical walsender processes with logical failover slots if the
+ * currently acquired physical slot is specified in standby_slot_names GUC.
+ */
+void
+PhysicalWakeupLogicalWalSnd(void)
+{
+ Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot));
+
+ /*
+ * If we are running in a standby, there is no need to wake up walsenders.
+ * This is because we do not support syncing slots to cascading standbys,
+ * so, there are no walsenders waiting for standbys to catch up.
+ */
+ if (RecoveryInProgress())
+ return;
+
+ if (SlotExistsInStandbySlotNames(NameStr(MyReplicationSlot->data.name)))
+ ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
+}
+
+/*
+ * Returns true if not all standbys have caught up to the flushed position
+ * (flushed_lsn) when the current acquired slot is a logical failover
+ * slot and we are streaming; otherwise, returns false.
+ *
+ * If returning true, the function sets the appropriate wait event in
+ * wait_event; otherwise, wait_event is set to 0.
+ */
+static bool
+NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
+{
+ int elevel = got_STOPPING ? ERROR : WARNING;
+ bool failover_slot;
+
+ failover_slot = (replication_active && MyReplicationSlot->data.failover);
+
+ /*
+ * Note that after receiving the shutdown signal, an ERROR is reported if
+ * any slots are dropped, invalidated, or inactive. This measure is taken
+ * to prevent the walsender from waiting indefinitely.
+ */
+ if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel))
+ {
+ *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
+ return true;
+ }
+
+ *wait_event = 0;
+ return false;
+}
+
+/*
+ * Returns true if we need to wait for WALs to be flushed to disk, or if not
+ * all standbys have caught up to the flushed position (flushed_lsn) when the
+ * current acquired slot is a logical failover slot and we are
+ * streaming; otherwise, returns false.
+ *
+ * If returning true, the function sets the appropriate wait event in
+ * wait_event; otherwise, wait_event is set to 0.
+ */
+static bool
+NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn,
+ uint32 *wait_event)
+{
+ /* Check if we need to wait for WALs to be flushed to disk */
+ if (target_lsn > flushed_lsn)
+ {
+ *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
+ return true;
+ }
+
+ /* Check if the standby slots have caught up to the flushed position */
+ return NeedToWaitForStandbys(flushed_lsn, wait_event);
+}
+
+/*
* Wait till WAL < loc is flushed to disk so it can be safely sent to client.
*
- * Returns end LSN of flushed WAL. Normally this will be >= loc, but
- * if we detect a shutdown request (either from postmaster or client)
- * we will return early, so caller must always check.
+ * If the walsender holds a logical failover slot, we also wait for all the
+ * specified streaming replication standby servers to confirm receipt of WAL
+ * up to RecentFlushPtr. It is beneficial to wait here for the confirmation
+ * up to RecentFlushPtr rather than waiting before transmitting each change
+ * to logical subscribers, which is already covered by RecentFlushPtr.
+ *
+ * Returns end LSN of flushed WAL. Normally this will be >= loc, but if we
+ * detect a shutdown request (either from postmaster or client) we will return
+ * early, so caller must always check.
*/
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
int wakeEvents;
+ uint32 wait_event = 0;
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
/*
* Fast path to avoid acquiring the spinlock in case we already know we
- * have enough WAL available. This is particularly interesting if we're
- * far behind.
+ * have enough WAL available and all the standby servers have confirmed
+ * receipt of WAL up to RecentFlushPtr. This is particularly interesting
+ * if we're far behind.
*/
- if (RecentFlushPtr != InvalidXLogRecPtr &&
- loc <= RecentFlushPtr)
+ if (!XLogRecPtrIsInvalid(RecentFlushPtr) &&
+ !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
return RecentFlushPtr;
/* Get a more recent flush pointer. */
@@ -1753,8 +1837,14 @@ WalSndWaitForWal(XLogRecPtr loc)
else
RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+ /*
+ * Within the loop, we wait for the necessary WALs to be flushed to disk
+ * first, followed by waiting for standbys to catch up if there are enough
+ * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
+ */
for (;;)
{
+ bool wait_for_standby_at_stop = false;
long sleeptime;
/* Clear any already-pending wakeups */
@@ -1781,21 +1871,35 @@ WalSndWaitForWal(XLogRecPtr loc)
if (got_STOPPING)
XLogBackgroundFlush();
- /* Update our idea of the currently flushed position. */
- if (!RecoveryInProgress())
- RecentFlushPtr = GetFlushRecPtr(NULL);
- else
- RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+ /*
+ * To avoid the scenario where standbys need to catch up to a newer
+ * WAL location in each iteration, we update our idea of the currently
+ * flushed position only if we are not waiting for standbys to catch
+ * up.
+ */
+ if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
+ {
+ if (!RecoveryInProgress())
+ RecentFlushPtr = GetFlushRecPtr(NULL);
+ else
+ RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+ }
/*
- * If postmaster asked us to stop, don't wait anymore.
+ * If postmaster asked us to stop and the standby slots have caught up
+ * to the flushed position, don't wait anymore.
*
* It's important to do this check after the recomputation of
* RecentFlushPtr, so we can send all remaining data before shutting
* down.
*/
if (got_STOPPING)
- break;
+ {
+ if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event))
+ wait_for_standby_at_stop = true;
+ else
+ break;
+ }
/*
* We only send regular messages to the client for full decoded
@@ -1810,11 +1914,18 @@ WalSndWaitForWal(XLogRecPtr loc)
!waiting_for_ping_response)
WalSndKeepalive(false, InvalidXLogRecPtr);
- /* check whether we're done */
- if (loc <= RecentFlushPtr)
+ /*
+ * Exit the loop if already caught up and doesn't need to wait for
+ * standby slots.
+ */
+ if (!wait_for_standby_at_stop &&
+ !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
break;
- /* Waiting for new WAL. Since we need to wait, we're now caught up. */
+ /*
+ * Waiting for new WAL or waiting for standbys to catch up. Since we
+ * need to wait, we're now caught up.
+ */
WalSndCaughtUp = true;
/*
@@ -1852,7 +1963,9 @@ WalSndWaitForWal(XLogRecPtr loc)
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
- WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL);
+ Assert(wait_event != 0);
+
+ WalSndWait(wakeEvents, sleeptime, wait_event);
}
/* reactivate latch so WalSndLoop knows to continue */
@@ -2262,6 +2375,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
{
ReplicationSlotMarkDirty();
ReplicationSlotsComputeRequiredLSN();
+ PhysicalWakeupLogicalWalSnd();
}
/*
@@ -3535,6 +3649,7 @@ WalSndShmemInit(void)
ConditionVariableInit(&WalSndCtl->wal_flush_cv);
ConditionVariableInit(&WalSndCtl->wal_replay_cv);
+ ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv);
}
}
@@ -3604,8 +3719,14 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
*
* And, we use separate shared memory CVs for physical and logical
* walsenders for selective wake ups, see WalSndWakeup() for more details.
+ *
+ * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
+ * until awakened by physical walsenders after the walreceiver confirms
+ * the receipt of the LSN.
*/
- if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
+ if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
+ ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+ else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index ec2f31f82af..c08e00d1d6a 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -78,6 +78,7 @@ GSS_OPEN_SERVER "Waiting to read data from the client while establishing a GSSAP
LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to remote server."
LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server."
SSL_OPEN_SERVER "Waiting for SSL while attempting connection."
+WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby."
WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process."
WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process."
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 45013582a74..d77214795de 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -4670,6 +4670,20 @@ struct config_string ConfigureNamesString[] =
check_debug_io_direct, assign_debug_io_direct, NULL
},
+ {
+ {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY,
+ gettext_noop("Lists streaming replication standby server slot "
+ "names that logical WAL sender processes will wait for."),
+ gettext_noop("Logical WAL sender processes will send decoded "
+ "changes to plugins only after the specified "
+ "replication slots confirm receiving WAL."),
+ GUC_LIST_INPUT
+ },
+ &standby_slot_names,
+ "",
+ check_standby_slot_names, assign_standby_slot_names, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index edcc0282b2d..2244ee52f79 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -343,6 +343,8 @@
# method to choose sync standbys, number of sync standbys,
# and comma-separated list of application_name
# from standby(s); '*' = all
+#standby_slot_names = '' # streaming replication standby server slot names that
+ # logical walsender processes will wait for
# - Standby Servers -