diff options
Diffstat (limited to 'src/backend/access/transam')
-rw-r--r-- | src/backend/access/transam/Makefile | 3 | ||||
-rw-r--r-- | src/backend/access/transam/meson.build | 1 | ||||
-rw-r--r-- | src/backend/access/transam/xact.c | 6 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 7 | ||||
-rw-r--r-- | src/backend/access/transam/xlogfuncs.c | 116 | ||||
-rw-r--r-- | src/backend/access/transam/xlogrecovery.c | 11 | ||||
-rw-r--r-- | src/backend/access/transam/xlogwait.c | 337 |
7 files changed, 2 insertions, 479 deletions
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index a32f473e0a2..661c55a9db7 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -36,8 +36,7 @@ OBJS = \ xlogreader.o \ xlogrecovery.o \ xlogstats.o \ - xlogutils.o \ - xlogwait.o + xlogutils.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build index 91d258f9df1..8a3522557cd 100644 --- a/src/backend/access/transam/meson.build +++ b/src/backend/access/transam/meson.build @@ -24,7 +24,6 @@ backend_sources += files( 'xlogrecovery.c', 'xlogstats.c', 'xlogutils.c', - 'xlogwait.c', ) # used by frontend programs to build a frontend xlogreader diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 004f7e10e55..b7ebcc2a557 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -31,7 +31,6 @@ #include "access/xloginsert.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" -#include "access/xlogwait.h" #include "catalog/index.h" #include "catalog/namespace.h" #include "catalog/pg_enum.h" @@ -2827,11 +2826,6 @@ AbortTransaction(void) */ LWLockReleaseAll(); - /* - * Cleanup waiting for LSN if any. - */ - WaitLSNCleanup(); - /* Clear wait information and command progress indicator */ pgstat_report_wait_end(); pgstat_progress_end_command(); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f14d3933aec..6f58412bcab 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -62,7 +62,6 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" -#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" @@ -6175,12 +6174,6 @@ StartupXLOG(void) LWLockRelease(ControlFileLock); /* - * Wake up all waiters for replay LSN. They need to report an error that - * recovery was ended before reaching the target LSN. - */ - WaitLSNWakeup(InvalidXLogRecPtr); - - /* * Shutdown the recovery environment. This must occur after * RecoverPreparedTransactions() (see notes in lock_twophase_recover()) * and after switching SharedRecoveryState to RECOVERY_STATE_DONE so as diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index bca1d395683..b0c6d7c6875 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -22,19 +22,17 @@ #include "access/xlog_internal.h" #include "access/xlogbackup.h" #include "access/xlogrecovery.h" -#include "access/xlogwait.h" #include "catalog/pg_type.h" #include "funcapi.h" #include "miscadmin.h" #include "pgstat.h" #include "replication/walreceiver.h" #include "storage/fd.h" -#include "storage/proc.h" +#include "storage/latch.h" #include "storage/standby.h" #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/pg_lsn.h" -#include "utils/snapmgr.h" #include "utils/timestamp.h" /* @@ -750,115 +748,3 @@ pg_promote(PG_FUNCTION_ARGS) wait_seconds))); PG_RETURN_BOOL(false); } - -static WaitLSNResult lastWaitLSNResult = WAIT_LSN_RESULT_SUCCESS; - -/* - * Waits until recovery replays the target LSN with optional timeout. Unless - * 'no_error' provided throws an error on failure - */ -Datum -pg_wal_replay_wait(PG_FUNCTION_ARGS) -{ - XLogRecPtr target_lsn = PG_GETARG_LSN(0); - int64 timeout = PG_GETARG_INT64(1); - bool no_error = PG_GETARG_BOOL(2); - - if (timeout < 0) - ereport(ERROR, - (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), - errmsg("\"timeout\" must not be negative"))); - - /* - * We are going to wait for the LSN replay. We should first care that we - * don't hold a snapshot and correspondingly our MyProc->xmin is invalid. - * Otherwise, our snapshot could prevent the replay of WAL records - * implying a kind of self-deadlock. This is the reason why - * pg_wal_replay_wait() is a procedure, not a function. - * - * At first, we should check there is no active snapshot. According to - * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is - * processed with a snapshot. Thankfully, we can pop this snapshot, - * because PortalRunUtility() can tolerate this. - */ - if (ActiveSnapshotSet()) - PopActiveSnapshot(); - - /* - * At second, invalidate a catalog snapshot if any. And we should be done - * with the preparation. - */ - InvalidateCatalogSnapshot(); - - /* Give up if there is still an active or registered snapshot. */ - if (GetOldestSnapshot()) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("pg_wal_replay_wait() must be only called without an active or registered snapshot"), - errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction with an isolation level higher than READ COMMITTED, another procedure, or a function."))); - - /* - * As the result we should hold no snapshot, and correspondingly our xmin - * should be unset. - */ - Assert(MyProc->xmin == InvalidTransactionId); - - lastWaitLSNResult = WaitForLSNReplay(target_lsn, timeout); - - if (no_error) - PG_RETURN_VOID(); - - /* - * Process the result of WaitForLSNReplay(). Throw appropriate error if - * needed. - */ - switch (lastWaitLSNResult) - { - case WAIT_LSN_RESULT_SUCCESS: - /* Nothing to do on success */ - break; - - case WAIT_LSN_RESULT_TIMEOUT: - ereport(ERROR, - (errcode(ERRCODE_QUERY_CANCELED), - errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X", - LSN_FORMAT_ARGS(target_lsn), - LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))))); - break; - - case WAIT_LSN_RESULT_NOT_IN_RECOVERY: - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("recovery is not in progress"), - errdetail("Recovery ended before replaying target LSN %X/%X; last replay LSN %X/%X.", - LSN_FORMAT_ARGS(target_lsn), - LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))))); - break; - } - - PG_RETURN_VOID(); -} - -Datum -pg_wal_replay_wait_status(PG_FUNCTION_ARGS) -{ - const char *result_string = ""; - - /* Process the result of WaitForLSNReplay(). */ - switch (lastWaitLSNResult) - { - case WAIT_LSN_RESULT_SUCCESS: - result_string = "success"; - break; - - case WAIT_LSN_RESULT_TIMEOUT: - result_string = "timeout"; - break; - - case WAIT_LSN_RESULT_NOT_IN_RECOVERY: - result_string = "not in recovery"; - break; - } - - PG_RETURN_TEXT_P(cstring_to_text(result_string)); -} diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 869cb524082..05c738d6614 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -40,7 +40,6 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" -#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/pg_control.h" #include "commands/tablespace.h" @@ -1829,16 +1828,6 @@ PerformWalRecovery(void) break; } - /* - * If we replayed an LSN that someone was waiting for then walk - * over the shared memory array and set latches to notify the - * waiters. - */ - if (waitLSNState && - (XLogRecoveryCtl->lastReplayedEndRecPtr >= - pg_atomic_read_u64(&waitLSNState->minWaitedLSN))) - WaitLSNWakeup(XLogRecoveryCtl->lastReplayedEndRecPtr); - /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); } while (record != NULL); diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c deleted file mode 100644 index 4c489e4cea3..00000000000 --- a/src/backend/access/transam/xlogwait.c +++ /dev/null @@ -1,337 +0,0 @@ -/*------------------------------------------------------------------------- - * - * xlogwait.c - * Implements waiting for the given replay LSN, which is used in - * CALL pg_wal_replay_wait(target_lsn pg_lsn, - * timeout float8, no_error bool). - * - * Copyright (c) 2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/backend/access/transam/xlogwait.c - * - *------------------------------------------------------------------------- - */ - -#include "postgres.h" - -#include <float.h> -#include <math.h> - -#include "pgstat.h" -#include "access/xlog.h" -#include "access/xlogrecovery.h" -#include "access/xlogwait.h" -#include "miscadmin.h" -#include "storage/latch.h" -#include "storage/proc.h" -#include "storage/shmem.h" -#include "utils/fmgrprotos.h" -#include "utils/pg_lsn.h" -#include "utils/snapmgr.h" - -static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, - void *arg); - -struct WaitLSNState *waitLSNState = NULL; - -/* Report the amount of shared memory space needed for WaitLSNState. */ -Size -WaitLSNShmemSize(void) -{ - Size size; - - size = offsetof(WaitLSNState, procInfos); - size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo))); - return size; -} - -/* Initialize the WaitLSNState in the shared memory. */ -void -WaitLSNShmemInit(void) -{ - bool found; - - waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState", - WaitLSNShmemSize(), - &found); - if (!found) - { - pg_atomic_init_u64(&waitLSNState->minWaitedLSN, PG_UINT64_MAX); - pairingheap_initialize(&waitLSNState->waitersHeap, waitlsn_cmp, NULL); - memset(&waitLSNState->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo)); - } -} - -/* - * Comparison function for waitLSN->waitersHeap heap. Waiting processes are - * ordered by lsn, so that the waiter with smallest lsn is at the top. - */ -static int -waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) -{ - const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a); - const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b); - - if (aproc->waitLSN < bproc->waitLSN) - return 1; - else if (aproc->waitLSN > bproc->waitLSN) - return -1; - else - return 0; -} - -/* - * Update waitLSN->minWaitedLSN according to the current state of - * waitLSN->waitersHeap. - */ -static void -updateMinWaitedLSN(void) -{ - XLogRecPtr minWaitedLSN = PG_UINT64_MAX; - - if (!pairingheap_is_empty(&waitLSNState->waitersHeap)) - { - pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap); - - minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN; - } - - pg_atomic_write_u64(&waitLSNState->minWaitedLSN, minWaitedLSN); -} - -/* - * Put the current process into the heap of LSN waiters. - */ -static void -addLSNWaiter(XLogRecPtr lsn) -{ - WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber]; - - LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); - - Assert(!procInfo->inHeap); - - procInfo->procno = MyProcNumber; - procInfo->waitLSN = lsn; - - pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode); - procInfo->inHeap = true; - updateMinWaitedLSN(); - - LWLockRelease(WaitLSNLock); -} - -/* - * Remove the current process from the heap of LSN waiters if it's there. - */ -static void -deleteLSNWaiter(void) -{ - WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber]; - - LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); - - if (!procInfo->inHeap) - { - LWLockRelease(WaitLSNLock); - return; - } - - pairingheap_remove(&waitLSNState->waitersHeap, &procInfo->phNode); - procInfo->inHeap = false; - updateMinWaitedLSN(); - - LWLockRelease(WaitLSNLock); -} - -/* - * Remove waiters whose LSN has been replayed from the heap and set their - * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap - * and set latches for all waiters. - */ -void -WaitLSNWakeup(XLogRecPtr currentLSN) -{ - int i; - ProcNumber *wakeUpProcs; - int numWakeUpProcs = 0; - - wakeUpProcs = palloc(sizeof(ProcNumber) * MaxBackends); - - LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); - - /* - * Iterate the pairing heap of waiting processes till we find LSN not yet - * replayed. Record the process numbers to wake up, but to avoid holding - * the lock for too long, send the wakeups only after releasing the lock. - */ - while (!pairingheap_is_empty(&waitLSNState->waitersHeap)) - { - pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap); - WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node); - - if (!XLogRecPtrIsInvalid(currentLSN) && - procInfo->waitLSN > currentLSN) - break; - - wakeUpProcs[numWakeUpProcs++] = procInfo->procno; - (void) pairingheap_remove_first(&waitLSNState->waitersHeap); - procInfo->inHeap = false; - } - - updateMinWaitedLSN(); - - LWLockRelease(WaitLSNLock); - - /* - * Set latches for processes, whose waited LSNs are already replayed. As - * the time consuming operations, we do it this outside of WaitLSNLock. - * This is actually fine because procLatch isn't ever freed, so we just - * can potentially set the wrong process' (or no process') latch. - */ - for (i = 0; i < numWakeUpProcs; i++) - { - SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch); - } - pfree(wakeUpProcs); -} - -/* - * Delete our item from shmem array if any. - */ -void -WaitLSNCleanup(void) -{ - /* - * We do a fast-path check of the 'inHeap' flag without the lock. This - * flag is set to true only by the process itself. So, it's only possible - * to get a false positive. But that will be eliminated by a recheck - * inside deleteLSNWaiter(). - */ - if (waitLSNState->procInfos[MyProcNumber].inHeap) - deleteLSNWaiter(); -} - -/* - * Wait using MyLatch till the given LSN is replayed, the postmaster dies or - * timeout happens. - */ -WaitLSNResult -WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout) -{ - XLogRecPtr currentLSN; - TimestampTz endtime = 0; - int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH; - - /* Shouldn't be called when shmem isn't initialized */ - Assert(waitLSNState); - - /* Should have a valid proc number */ - Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends); - - if (!RecoveryInProgress()) - { - /* - * Recovery is not in progress. Given that we detected this in the - * very first check, this procedure was mistakenly called on primary. - * However, it's possible that standby was promoted concurrently to - * the procedure call, while target LSN is replayed. So, we still - * check the last replay LSN before reporting an error. - */ - if (targetLSN <= GetXLogReplayRecPtr(NULL)) - return WAIT_LSN_RESULT_SUCCESS; - return WAIT_LSN_RESULT_NOT_IN_RECOVERY; - } - else - { - /* If target LSN is already replayed, exit immediately */ - if (targetLSN <= GetXLogReplayRecPtr(NULL)) - return WAIT_LSN_RESULT_SUCCESS; - } - - if (timeout > 0) - { - endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout); - wake_events |= WL_TIMEOUT; - } - - /* - * Add our process to the pairing heap of waiters. It might happen that - * target LSN gets replayed before we do. Another check at the beginning - * of the loop below prevents the race condition. - */ - addLSNWaiter(targetLSN); - - for (;;) - { - int rc; - long delay_ms = 0; - - /* Recheck that recovery is still in-progress */ - if (!RecoveryInProgress()) - { - /* - * Recovery was ended, but recheck if target LSN was already - * replayed. See the comment regarding deleteLSNWaiter() below. - */ - deleteLSNWaiter(); - currentLSN = GetXLogReplayRecPtr(NULL); - if (targetLSN <= currentLSN) - return WAIT_LSN_RESULT_SUCCESS; - return WAIT_LSN_RESULT_NOT_IN_RECOVERY; - } - else - { - /* Check if the waited LSN has been replayed */ - currentLSN = GetXLogReplayRecPtr(NULL); - if (targetLSN <= currentLSN) - break; - } - - /* - * If the timeout value is specified, calculate the number of - * milliseconds before the timeout. Exit if the timeout is already - * reached. - */ - if (timeout > 0) - { - delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime); - if (delay_ms <= 0) - break; - } - - CHECK_FOR_INTERRUPTS(); - - rc = WaitLatch(MyLatch, wake_events, delay_ms, - WAIT_EVENT_WAIT_FOR_WAL_REPLAY); - - /* - * Emergency bailout if postmaster has died. This is to avoid the - * necessity for manual cleanup of all postmaster children. - */ - if (rc & WL_POSTMASTER_DEATH) - ereport(FATAL, - (errcode(ERRCODE_ADMIN_SHUTDOWN), - errmsg("terminating connection due to unexpected postmaster exit"), - errcontext("while waiting for LSN replay"))); - - if (rc & WL_LATCH_SET) - ResetLatch(MyLatch); - } - - /* - * Delete our process from the shared memory pairing heap. We might - * already be deleted by the startup process. The 'inHeap' flag prevents - * us from the double deletion. - */ - deleteLSNWaiter(); - - /* - * If we didn't reach the target LSN, we must be exited by timeout. - */ - if (targetLSN > currentLSN) - return WAIT_LSN_RESULT_TIMEOUT; - - return WAIT_LSN_RESULT_SUCCESS; -} |