diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 12 | ||||
-rw-r--r-- | src/backend/replication/logical/slotsync.c | 11 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 403 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 12 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 159 | ||||
-rw-r--r-- | src/backend/utils/activity/wait_event_names.txt | 1 | ||||
-rw-r--r-- | src/backend/utils/misc/guc_tables.c | 14 | ||||
-rw-r--r-- | src/backend/utils/misc/postgresql.conf.sample | 2 |
8 files changed, 594 insertions, 20 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index a1ff631e5ed..b4dd5cce75b 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -105,6 +105,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin MemoryContext per_query_ctx; MemoryContext oldcontext; XLogRecPtr end_of_wal; + XLogRecPtr wait_for_wal_lsn; LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; @@ -224,6 +225,17 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin NameStr(MyReplicationSlot->data.plugin), format_procedure(fcinfo->flinfo->fn_oid)))); + /* + * Wait for specified streaming replication standby servers (if any) + * to confirm receipt of WAL up to wait_for_wal_lsn. + */ + if (XLogRecPtrIsInvalid(upto_lsn)) + wait_for_wal_lsn = end_of_wal; + else + wait_for_wal_lsn = Min(upto_lsn, end_of_wal); + + WaitForStandbyConfirmation(wait_for_wal_lsn); + ctx->output_writer_private = p; /* diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index ad0fc6a04b6..5074c8409f7 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -488,6 +488,10 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) latestFlushPtr = GetStandbyFlushRecPtr(NULL); if (remote_slot->confirmed_lsn > latestFlushPtr) { + /* + * Can get here only if GUC 'standby_slot_names' on the primary server + * was not configured correctly. + */ ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("skipping slot synchronization as the received slot sync" @@ -857,6 +861,13 @@ validate_remote_info(WalReceiverConn *wrconn) remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); Assert(!isnull); + /* + * Slot sync is currently not supported on a cascading standby. This is + * because if we allow it, the primary server needs to wait for all the + * cascading standbys, otherwise, logical subscribers can still be ahead + * of one of the cascading standbys which we plan to promote. Thus, to + * avoid this additional complexity, we restrict it for the time being. + */ if (remote_in_recovery) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 2614f98ddd2..b8bf98b1822 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -46,13 +46,17 @@ #include "common/string.h" #include "miscadmin.h" #include "pgstat.h" +#include "postmaster/interrupt.h" #include "replication/slotsync.h" #include "replication/slot.h" +#include "replication/walsender_private.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/varlena.h" /* * Replication slot on-disk data structure. @@ -78,6 +82,24 @@ typedef struct ReplicationSlotOnDisk } ReplicationSlotOnDisk; /* + * Struct for the configuration of standby_slot_names. + * + * Note: this must be a flat representation that can be held in a single chunk + * of guc_malloc'd memory, so that it can be stored as the "extra" data for the + * standby_slot_names GUC. + */ +typedef struct +{ + /* Number of slot names in the slot_names[] */ + int nslotnames; + + /* + * slot_names contains 'nslotnames' consecutive null-terminated C strings. + */ + char slot_names[FLEXIBLE_ARRAY_MEMBER]; +} StandbySlotNamesConfigData; + +/* * Lookup table for slot invalidation causes. */ const char *const SlotInvalidationCauses[] = { @@ -115,10 +137,25 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; /* My backend's replication slot in the shared memory array */ ReplicationSlot *MyReplicationSlot = NULL; -/* GUC variable */ +/* GUC variables */ int max_replication_slots = 10; /* the maximum number of replication * slots */ +/* + * This GUC lists streaming replication standby server slot names that + * logical WAL sender processes will wait for. + */ +char *standby_slot_names; + +/* This is the parsed and cached configuration for standby_slot_names */ +static StandbySlotNamesConfigData *standby_slot_names_config; + +/* + * Oldest LSN that has been confirmed to be flushed to the standbys + * corresponding to the physical slots specified in the standby_slot_names GUC. + */ +static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr; + static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropPtr(ReplicationSlot *slot); @@ -2345,3 +2382,367 @@ GetSlotInvalidationCause(const char *conflict_reason) Assert(found); return result; } + +/* + * A helper function to validate slots specified in GUC standby_slot_names. + * + * The rawname will be parsed, and the result will be saved into *elemlist. + */ +static bool +validate_standby_slots(char *rawname, List **elemlist) +{ + bool ok; + + /* Verify syntax and parse string into a list of identifiers */ + ok = SplitIdentifierString(rawname, ',', elemlist); + + if (!ok) + { + GUC_check_errdetail("List syntax is invalid."); + } + else if (!ReplicationSlotCtl) + { + /* + * We cannot validate the replication slot if the replication slots' + * data has not been initialized. This is ok as we will anyway + * validate the specified slot when waiting for them to catch up. See + * StandbySlotsHaveCaughtup() for details. + */ + } + else + { + /* Check that the specified slots exist and are logical slots */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + foreach_ptr(char, name, *elemlist) + { + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, false); + + if (!slot) + { + GUC_check_errdetail("replication slot \"%s\" does not exist", + name); + ok = false; + break; + } + + if (!SlotIsPhysical(slot)) + { + GUC_check_errdetail("\"%s\" is not a physical replication slot", + name); + ok = false; + break; + } + } + + LWLockRelease(ReplicationSlotControlLock); + } + + return ok; +} + +/* + * GUC check_hook for standby_slot_names + */ +bool +check_standby_slot_names(char **newval, void **extra, GucSource source) +{ + char *rawname; + char *ptr; + List *elemlist; + int size; + bool ok; + StandbySlotNamesConfigData *config; + + if ((*newval)[0] == '\0') + return true; + + /* Need a modifiable copy of the GUC string */ + rawname = pstrdup(*newval); + + /* Now verify if the specified slots exist and have correct type */ + ok = validate_standby_slots(rawname, &elemlist); + + if (!ok || elemlist == NIL) + { + pfree(rawname); + list_free(elemlist); + return ok; + } + + /* Compute the size required for the StandbySlotNamesConfigData struct */ + size = offsetof(StandbySlotNamesConfigData, slot_names); + foreach_ptr(char, slot_name, elemlist) + size += strlen(slot_name) + 1; + + /* GUC extra value must be guc_malloc'd, not palloc'd */ + config = (StandbySlotNamesConfigData *) guc_malloc(LOG, size); + + /* Transform the data into StandbySlotNamesConfigData */ + config->nslotnames = list_length(elemlist); + + ptr = config->slot_names; + foreach_ptr(char, slot_name, elemlist) + { + strcpy(ptr, slot_name); + ptr += strlen(slot_name) + 1; + } + + *extra = (void *) config; + + pfree(rawname); + list_free(elemlist); + return true; +} + +/* + * GUC assign_hook for standby_slot_names + */ +void +assign_standby_slot_names(const char *newval, void *extra) +{ + /* + * The standby slots may have changed, so we must recompute the oldest + * LSN. + */ + ss_oldest_flush_lsn = InvalidXLogRecPtr; + + standby_slot_names_config = (StandbySlotNamesConfigData *) extra; +} + +/* + * Check if the passed slot_name is specified in the standby_slot_names GUC. + */ +bool +SlotExistsInStandbySlotNames(const char *slot_name) +{ + const char *standby_slot_name; + + /* Return false if there is no value in standby_slot_names */ + if (standby_slot_names_config == NULL) + return false; + + /* + * XXX: We are not expecting this list to be long so a linear search + * shouldn't hurt but if that turns out not to be true then we can cache + * this information for each WalSender as well. + */ + standby_slot_name = standby_slot_names_config->slot_names; + for (int i = 0; i < standby_slot_names_config->nslotnames; i++) + { + if (strcmp(standby_slot_name, slot_name) == 0) + return true; + + standby_slot_name += strlen(standby_slot_name) + 1; + } + + return false; +} + +/* + * Return true if the slots specified in standby_slot_names have caught up to + * the given WAL location, false otherwise. + * + * The elevel parameter specifies the error level used for logging messages + * related to slots that do not exist, are invalidated, or are inactive. + */ +bool +StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) +{ + const char *name; + int caught_up_slot_num = 0; + XLogRecPtr min_restart_lsn = InvalidXLogRecPtr; + + /* + * Don't need to wait for the standbys to catch up if there is no value in + * standby_slot_names. + */ + if (standby_slot_names_config == NULL) + return true; + + /* + * Don't need to wait for the standbys to catch up if we are on a standby + * server, since we do not support syncing slots to cascading standbys. + */ + if (RecoveryInProgress()) + return true; + + /* + * Don't need to wait for the standbys to catch up if they are already + * beyond the specified WAL location. + */ + if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) && + ss_oldest_flush_lsn >= wait_for_lsn) + return true; + + /* + * To prevent concurrent slot dropping and creation while filtering the + * slots, take the ReplicationSlotControlLock outside of the loop. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + name = standby_slot_names_config->slot_names; + for (int i = 0; i < standby_slot_names_config->nslotnames; i++) + { + XLogRecPtr restart_lsn; + bool invalidated; + bool inactive; + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, false); + + if (!slot) + { + /* + * If a slot name provided in standby_slot_names does not exist, + * report a message and exit the loop. A user can specify a slot + * name that does not exist just before the server startup. The + * GUC check_hook(validate_standby_slots) cannot validate such a + * slot during startup as the ReplicationSlotCtl shared memory is + * not initialized at that time. It is also possible for a user to + * drop the slot in standby_slot_names afterwards. + */ + ereport(elevel, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication slot \"%s\" specified in parameter %s does not exist", + name, "standby_slot_names"), + errdetail("Logical replication is waiting on the standby associated with \"%s\".", + name), + errhint("Consider creating the slot \"%s\" or amend parameter %s.", + name, "standby_slot_names")); + break; + } + + if (SlotIsLogical(slot)) + { + /* + * If a logical slot name is provided in standby_slot_names, + * report a message and exit the loop. Similar to the non-existent + * case, a user can specify a logical slot name in + * standby_slot_names before the server startup, or drop an + * existing physical slot and recreate a logical slot with the + * same name. + */ + ereport(elevel, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot have logical replication slot \"%s\" in parameter %s", + name, "standby_slot_names"), + errdetail("Logical replication is waiting for correction on \"%s\".", + name), + errhint("Consider removing logical slot \"%s\" from parameter %s.", + name, "standby_slot_names")); + break; + } + + SpinLockAcquire(&slot->mutex); + restart_lsn = slot->data.restart_lsn; + invalidated = slot->data.invalidated != RS_INVAL_NONE; + inactive = slot->active_pid == 0; + SpinLockRelease(&slot->mutex); + + if (invalidated) + { + /* Specified physical slot has been invalidated */ + ereport(elevel, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("physical slot \"%s\" specified in parameter %s has been invalidated", + name, "standby_slot_names"), + errdetail("Logical replication is waiting on the standby associated with \"%s\".", + name), + errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.", + name, "standby_slot_names")); + break; + } + + if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn) + { + /* Log a message if no active_pid for this physical slot */ + if (inactive) + ereport(elevel, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid", + name, "standby_slot_names"), + errdetail("Logical replication is waiting on the standby associated with \"%s\".", + name), + errhint("Consider starting standby associated with \"%s\" or amend parameter %s.", + name, "standby_slot_names")); + + /* Continue if the current slot hasn't caught up. */ + break; + } + + Assert(restart_lsn >= wait_for_lsn); + + if (XLogRecPtrIsInvalid(min_restart_lsn) || + min_restart_lsn > restart_lsn) + min_restart_lsn = restart_lsn; + + caught_up_slot_num++; + + name += strlen(name) + 1; + } + + LWLockRelease(ReplicationSlotControlLock); + + /* + * Return false if not all the standbys have caught up to the specified + * WAL location. + */ + if (caught_up_slot_num != standby_slot_names_config->nslotnames) + return false; + + /* The ss_oldest_flush_lsn must not retreat. */ + Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) || + min_restart_lsn >= ss_oldest_flush_lsn); + + ss_oldest_flush_lsn = min_restart_lsn; + + return true; +} + +/* + * Wait for physical standbys to confirm receiving the given lsn. + * + * Used by logical decoding SQL functions. It waits for physical standbys + * corresponding to the physical slots specified in the standby_slot_names GUC. + */ +void +WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn) +{ + /* + * Don't need to wait for the standby to catch up if the current acquired + * slot is not a logical failover slot, or there is no value in + * standby_slot_names. + */ + if (!MyReplicationSlot->data.failover || !standby_slot_names_config) + return; + + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + + for (;;) + { + CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* Exit if done waiting for every slot. */ + if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING)) + break; + + /* + * Wait for the slots in the standby_slot_names to catch up, but use a + * timeout (1s) so we can also check if the standby_slot_names has + * been changed. + */ + ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000, + WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION); + } + + ConditionVariableCancelSleep(); +} diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 768a304723b..ad79e1fccd6 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -464,6 +464,12 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) * crash, but this makes the data consistent after a clean shutdown. */ ReplicationSlotMarkDirty(); + + /* + * Wake up logical walsenders holding logical failover slots after + * updating the restart_lsn of the physical slot. + */ + PhysicalWakeupLogicalWalSnd(); } return retlsn; @@ -505,6 +511,12 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) NULL, NULL, NULL); /* + * Wait for specified streaming replication standby servers (if any) + * to confirm receipt of WAL up to moveto lsn. + */ + WaitForStandbyConfirmation(moveto); + + /* * Start reading at the slot's restart_lsn, which we know to point to * a valid record. */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 0f1047179cb..25edb5e1412 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1726,25 +1726,109 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId } /* + * Wake up the logical walsender processes with logical failover slots if the + * currently acquired physical slot is specified in standby_slot_names GUC. + */ +void +PhysicalWakeupLogicalWalSnd(void) +{ + Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot)); + + /* + * If we are running in a standby, there is no need to wake up walsenders. + * This is because we do not support syncing slots to cascading standbys, + * so, there are no walsenders waiting for standbys to catch up. + */ + if (RecoveryInProgress()) + return; + + if (SlotExistsInStandbySlotNames(NameStr(MyReplicationSlot->data.name))) + ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv); +} + +/* + * Returns true if not all standbys have caught up to the flushed position + * (flushed_lsn) when the current acquired slot is a logical failover + * slot and we are streaming; otherwise, returns false. + * + * If returning true, the function sets the appropriate wait event in + * wait_event; otherwise, wait_event is set to 0. + */ +static bool +NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event) +{ + int elevel = got_STOPPING ? ERROR : WARNING; + bool failover_slot; + + failover_slot = (replication_active && MyReplicationSlot->data.failover); + + /* + * Note that after receiving the shutdown signal, an ERROR is reported if + * any slots are dropped, invalidated, or inactive. This measure is taken + * to prevent the walsender from waiting indefinitely. + */ + if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel)) + { + *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION; + return true; + } + + *wait_event = 0; + return false; +} + +/* + * Returns true if we need to wait for WALs to be flushed to disk, or if not + * all standbys have caught up to the flushed position (flushed_lsn) when the + * current acquired slot is a logical failover slot and we are + * streaming; otherwise, returns false. + * + * If returning true, the function sets the appropriate wait event in + * wait_event; otherwise, wait_event is set to 0. + */ +static bool +NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, + uint32 *wait_event) +{ + /* Check if we need to wait for WALs to be flushed to disk */ + if (target_lsn > flushed_lsn) + { + *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL; + return true; + } + + /* Check if the standby slots have caught up to the flushed position */ + return NeedToWaitForStandbys(flushed_lsn, wait_event); +} + +/* * Wait till WAL < loc is flushed to disk so it can be safely sent to client. * - * Returns end LSN of flushed WAL. Normally this will be >= loc, but - * if we detect a shutdown request (either from postmaster or client) - * we will return early, so caller must always check. + * If the walsender holds a logical failover slot, we also wait for all the + * specified streaming replication standby servers to confirm receipt of WAL + * up to RecentFlushPtr. It is beneficial to wait here for the confirmation + * up to RecentFlushPtr rather than waiting before transmitting each change + * to logical subscribers, which is already covered by RecentFlushPtr. + * + * Returns end LSN of flushed WAL. Normally this will be >= loc, but if we + * detect a shutdown request (either from postmaster or client) we will return + * early, so caller must always check. */ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; + uint32 wait_event = 0; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; /* * Fast path to avoid acquiring the spinlock in case we already know we - * have enough WAL available. This is particularly interesting if we're - * far behind. + * have enough WAL available and all the standby servers have confirmed + * receipt of WAL up to RecentFlushPtr. This is particularly interesting + * if we're far behind. */ - if (RecentFlushPtr != InvalidXLogRecPtr && - loc <= RecentFlushPtr) + if (!XLogRecPtrIsInvalid(RecentFlushPtr) && + !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event)) return RecentFlushPtr; /* Get a more recent flush pointer. */ @@ -1753,8 +1837,14 @@ WalSndWaitForWal(XLogRecPtr loc) else RecentFlushPtr = GetXLogReplayRecPtr(NULL); + /* + * Within the loop, we wait for the necessary WALs to be flushed to disk + * first, followed by waiting for standbys to catch up if there are enough + * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal. + */ for (;;) { + bool wait_for_standby_at_stop = false; long sleeptime; /* Clear any already-pending wakeups */ @@ -1781,21 +1871,35 @@ WalSndWaitForWal(XLogRecPtr loc) if (got_STOPPING) XLogBackgroundFlush(); - /* Update our idea of the currently flushed position. */ - if (!RecoveryInProgress()) - RecentFlushPtr = GetFlushRecPtr(NULL); - else - RecentFlushPtr = GetXLogReplayRecPtr(NULL); + /* + * To avoid the scenario where standbys need to catch up to a newer + * WAL location in each iteration, we update our idea of the currently + * flushed position only if we are not waiting for standbys to catch + * up. + */ + if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION) + { + if (!RecoveryInProgress()) + RecentFlushPtr = GetFlushRecPtr(NULL); + else + RecentFlushPtr = GetXLogReplayRecPtr(NULL); + } /* - * If postmaster asked us to stop, don't wait anymore. + * If postmaster asked us to stop and the standby slots have caught up + * to the flushed position, don't wait anymore. * * It's important to do this check after the recomputation of * RecentFlushPtr, so we can send all remaining data before shutting * down. */ if (got_STOPPING) - break; + { + if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event)) + wait_for_standby_at_stop = true; + else + break; + } /* * We only send regular messages to the client for full decoded @@ -1810,11 +1914,18 @@ WalSndWaitForWal(XLogRecPtr loc) !waiting_for_ping_response) WalSndKeepalive(false, InvalidXLogRecPtr); - /* check whether we're done */ - if (loc <= RecentFlushPtr) + /* + * Exit the loop if already caught up and doesn't need to wait for + * standby slots. + */ + if (!wait_for_standby_at_stop && + !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event)) break; - /* Waiting for new WAL. Since we need to wait, we're now caught up. */ + /* + * Waiting for new WAL or waiting for standbys to catch up. Since we + * need to wait, we're now caught up. + */ WalSndCaughtUp = true; /* @@ -1852,7 +1963,9 @@ WalSndWaitForWal(XLogRecPtr loc) if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL); + Assert(wait_event != 0); + + WalSndWait(wakeEvents, sleeptime, wait_event); } /* reactivate latch so WalSndLoop knows to continue */ @@ -2262,6 +2375,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredLSN(); + PhysicalWakeupLogicalWalSnd(); } /* @@ -3535,6 +3649,7 @@ WalSndShmemInit(void) ConditionVariableInit(&WalSndCtl->wal_flush_cv); ConditionVariableInit(&WalSndCtl->wal_replay_cv); + ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv); } } @@ -3604,8 +3719,14 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) * * And, we use separate shared memory CVs for physical and logical * walsenders for selective wake ups, see WalSndWakeup() for more details. + * + * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV + * until awakened by physical walsenders after the walreceiver confirms + * the receipt of the LSN. */ - if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) + if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION) + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv); else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index ec2f31f82af..c08e00d1d6a 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -78,6 +78,7 @@ GSS_OPEN_SERVER "Waiting to read data from the client while establishing a GSSAP LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to remote server." LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." +WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 45013582a74..d77214795de 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -4670,6 +4670,20 @@ struct config_string ConfigureNamesString[] = check_debug_io_direct, assign_debug_io_direct, NULL }, + { + {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY, + gettext_noop("Lists streaming replication standby server slot " + "names that logical WAL sender processes will wait for."), + gettext_noop("Logical WAL sender processes will send decoded " + "changes to plugins only after the specified " + "replication slots confirm receiving WAL."), + GUC_LIST_INPUT + }, + &standby_slot_names, + "", + check_standby_slot_names, assign_standby_slot_names, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index edcc0282b2d..2244ee52f79 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -343,6 +343,8 @@ # method to choose sync standbys, number of sync standbys, # and comma-separated list of application_name # from standby(s); '*' = all +#standby_slot_names = '' # streaming replication standby server slot names that + # logical walsender processes will wait for # - Standby Servers - |