aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam')
-rw-r--r--src/backend/access/transam/Makefile3
-rw-r--r--src/backend/access/transam/meson.build1
-rw-r--r--src/backend/access/transam/xact.c6
-rw-r--r--src/backend/access/transam/xlog.c7
-rw-r--r--src/backend/access/transam/xlogfuncs.c116
-rw-r--r--src/backend/access/transam/xlogrecovery.c11
-rw-r--r--src/backend/access/transam/xlogwait.c337
7 files changed, 2 insertions, 479 deletions
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index a32f473e0a2..661c55a9db7 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -36,8 +36,7 @@ OBJS = \
xlogreader.o \
xlogrecovery.o \
xlogstats.o \
- xlogutils.o \
- xlogwait.o
+ xlogutils.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build
index 91d258f9df1..8a3522557cd 100644
--- a/src/backend/access/transam/meson.build
+++ b/src/backend/access/transam/meson.build
@@ -24,7 +24,6 @@ backend_sources += files(
'xlogrecovery.c',
'xlogstats.c',
'xlogutils.c',
- 'xlogwait.c',
)
# used by frontend programs to build a frontend xlogreader
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 004f7e10e55..b7ebcc2a557 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -31,7 +31,6 @@
#include "access/xloginsert.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
-#include "access/xlogwait.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_enum.h"
@@ -2827,11 +2826,6 @@ AbortTransaction(void)
*/
LWLockReleaseAll();
- /*
- * Cleanup waiting for LSN if any.
- */
- WaitLSNCleanup();
-
/* Clear wait information and command progress indicator */
pgstat_report_wait_end();
pgstat_progress_end_command();
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f14d3933aec..6f58412bcab 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -62,7 +62,6 @@
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
-#include "access/xlogwait.h"
#include "backup/basebackup.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
@@ -6175,12 +6174,6 @@ StartupXLOG(void)
LWLockRelease(ControlFileLock);
/*
- * Wake up all waiters for replay LSN. They need to report an error that
- * recovery was ended before reaching the target LSN.
- */
- WaitLSNWakeup(InvalidXLogRecPtr);
-
- /*
* Shutdown the recovery environment. This must occur after
* RecoverPreparedTransactions() (see notes in lock_twophase_recover())
* and after switching SharedRecoveryState to RECOVERY_STATE_DONE so as
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index bca1d395683..b0c6d7c6875 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -22,19 +22,17 @@
#include "access/xlog_internal.h"
#include "access/xlogbackup.h"
#include "access/xlogrecovery.h"
-#include "access/xlogwait.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "storage/fd.h"
-#include "storage/proc.h"
+#include "storage/latch.h"
#include "storage/standby.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
-#include "utils/snapmgr.h"
#include "utils/timestamp.h"
/*
@@ -750,115 +748,3 @@ pg_promote(PG_FUNCTION_ARGS)
wait_seconds)));
PG_RETURN_BOOL(false);
}
-
-static WaitLSNResult lastWaitLSNResult = WAIT_LSN_RESULT_SUCCESS;
-
-/*
- * Waits until recovery replays the target LSN with optional timeout. Unless
- * 'no_error' provided throws an error on failure
- */
-Datum
-pg_wal_replay_wait(PG_FUNCTION_ARGS)
-{
- XLogRecPtr target_lsn = PG_GETARG_LSN(0);
- int64 timeout = PG_GETARG_INT64(1);
- bool no_error = PG_GETARG_BOOL(2);
-
- if (timeout < 0)
- ereport(ERROR,
- (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
- errmsg("\"timeout\" must not be negative")));
-
- /*
- * We are going to wait for the LSN replay. We should first care that we
- * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
- * Otherwise, our snapshot could prevent the replay of WAL records
- * implying a kind of self-deadlock. This is the reason why
- * pg_wal_replay_wait() is a procedure, not a function.
- *
- * At first, we should check there is no active snapshot. According to
- * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
- * processed with a snapshot. Thankfully, we can pop this snapshot,
- * because PortalRunUtility() can tolerate this.
- */
- if (ActiveSnapshotSet())
- PopActiveSnapshot();
-
- /*
- * At second, invalidate a catalog snapshot if any. And we should be done
- * with the preparation.
- */
- InvalidateCatalogSnapshot();
-
- /* Give up if there is still an active or registered snapshot. */
- if (GetOldestSnapshot())
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("pg_wal_replay_wait() must be only called without an active or registered snapshot"),
- errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction with an isolation level higher than READ COMMITTED, another procedure, or a function.")));
-
- /*
- * As the result we should hold no snapshot, and correspondingly our xmin
- * should be unset.
- */
- Assert(MyProc->xmin == InvalidTransactionId);
-
- lastWaitLSNResult = WaitForLSNReplay(target_lsn, timeout);
-
- if (no_error)
- PG_RETURN_VOID();
-
- /*
- * Process the result of WaitForLSNReplay(). Throw appropriate error if
- * needed.
- */
- switch (lastWaitLSNResult)
- {
- case WAIT_LSN_RESULT_SUCCESS:
- /* Nothing to do on success */
- break;
-
- case WAIT_LSN_RESULT_TIMEOUT:
- ereport(ERROR,
- (errcode(ERRCODE_QUERY_CANCELED),
- errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
- LSN_FORMAT_ARGS(target_lsn),
- LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))));
- break;
-
- case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("recovery is not in progress"),
- errdetail("Recovery ended before replaying target LSN %X/%X; last replay LSN %X/%X.",
- LSN_FORMAT_ARGS(target_lsn),
- LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))));
- break;
- }
-
- PG_RETURN_VOID();
-}
-
-Datum
-pg_wal_replay_wait_status(PG_FUNCTION_ARGS)
-{
- const char *result_string = "";
-
- /* Process the result of WaitForLSNReplay(). */
- switch (lastWaitLSNResult)
- {
- case WAIT_LSN_RESULT_SUCCESS:
- result_string = "success";
- break;
-
- case WAIT_LSN_RESULT_TIMEOUT:
- result_string = "timeout";
- break;
-
- case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
- result_string = "not in recovery";
- break;
- }
-
- PG_RETURN_TEXT_P(cstring_to_text(result_string));
-}
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 869cb524082..05c738d6614 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -40,7 +40,6 @@
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
-#include "access/xlogwait.h"
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
@@ -1829,16 +1828,6 @@ PerformWalRecovery(void)
break;
}
- /*
- * If we replayed an LSN that someone was waiting for then walk
- * over the shared memory array and set latches to notify the
- * waiters.
- */
- if (waitLSNState &&
- (XLogRecoveryCtl->lastReplayedEndRecPtr >=
- pg_atomic_read_u64(&waitLSNState->minWaitedLSN)))
- WaitLSNWakeup(XLogRecoveryCtl->lastReplayedEndRecPtr);
-
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
deleted file mode 100644
index 4c489e4cea3..00000000000
--- a/src/backend/access/transam/xlogwait.c
+++ /dev/null
@@ -1,337 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * xlogwait.c
- * Implements waiting for the given replay LSN, which is used in
- * CALL pg_wal_replay_wait(target_lsn pg_lsn,
- * timeout float8, no_error bool).
- *
- * Copyright (c) 2024, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/backend/access/transam/xlogwait.c
- *
- *-------------------------------------------------------------------------
- */
-
-#include "postgres.h"
-
-#include <float.h>
-#include <math.h>
-
-#include "pgstat.h"
-#include "access/xlog.h"
-#include "access/xlogrecovery.h"
-#include "access/xlogwait.h"
-#include "miscadmin.h"
-#include "storage/latch.h"
-#include "storage/proc.h"
-#include "storage/shmem.h"
-#include "utils/fmgrprotos.h"
-#include "utils/pg_lsn.h"
-#include "utils/snapmgr.h"
-
-static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
- void *arg);
-
-struct WaitLSNState *waitLSNState = NULL;
-
-/* Report the amount of shared memory space needed for WaitLSNState. */
-Size
-WaitLSNShmemSize(void)
-{
- Size size;
-
- size = offsetof(WaitLSNState, procInfos);
- size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo)));
- return size;
-}
-
-/* Initialize the WaitLSNState in the shared memory. */
-void
-WaitLSNShmemInit(void)
-{
- bool found;
-
- waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
- WaitLSNShmemSize(),
- &found);
- if (!found)
- {
- pg_atomic_init_u64(&waitLSNState->minWaitedLSN, PG_UINT64_MAX);
- pairingheap_initialize(&waitLSNState->waitersHeap, waitlsn_cmp, NULL);
- memset(&waitLSNState->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
- }
-}
-
-/*
- * Comparison function for waitLSN->waitersHeap heap. Waiting processes are
- * ordered by lsn, so that the waiter with smallest lsn is at the top.
- */
-static int
-waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
-{
- const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a);
- const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b);
-
- if (aproc->waitLSN < bproc->waitLSN)
- return 1;
- else if (aproc->waitLSN > bproc->waitLSN)
- return -1;
- else
- return 0;
-}
-
-/*
- * Update waitLSN->minWaitedLSN according to the current state of
- * waitLSN->waitersHeap.
- */
-static void
-updateMinWaitedLSN(void)
-{
- XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
-
- if (!pairingheap_is_empty(&waitLSNState->waitersHeap))
- {
- pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap);
-
- minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN;
- }
-
- pg_atomic_write_u64(&waitLSNState->minWaitedLSN, minWaitedLSN);
-}
-
-/*
- * Put the current process into the heap of LSN waiters.
- */
-static void
-addLSNWaiter(XLogRecPtr lsn)
-{
- WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
-
- LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
-
- Assert(!procInfo->inHeap);
-
- procInfo->procno = MyProcNumber;
- procInfo->waitLSN = lsn;
-
- pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode);
- procInfo->inHeap = true;
- updateMinWaitedLSN();
-
- LWLockRelease(WaitLSNLock);
-}
-
-/*
- * Remove the current process from the heap of LSN waiters if it's there.
- */
-static void
-deleteLSNWaiter(void)
-{
- WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
-
- LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
-
- if (!procInfo->inHeap)
- {
- LWLockRelease(WaitLSNLock);
- return;
- }
-
- pairingheap_remove(&waitLSNState->waitersHeap, &procInfo->phNode);
- procInfo->inHeap = false;
- updateMinWaitedLSN();
-
- LWLockRelease(WaitLSNLock);
-}
-
-/*
- * Remove waiters whose LSN has been replayed from the heap and set their
- * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
- * and set latches for all waiters.
- */
-void
-WaitLSNWakeup(XLogRecPtr currentLSN)
-{
- int i;
- ProcNumber *wakeUpProcs;
- int numWakeUpProcs = 0;
-
- wakeUpProcs = palloc(sizeof(ProcNumber) * MaxBackends);
-
- LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
-
- /*
- * Iterate the pairing heap of waiting processes till we find LSN not yet
- * replayed. Record the process numbers to wake up, but to avoid holding
- * the lock for too long, send the wakeups only after releasing the lock.
- */
- while (!pairingheap_is_empty(&waitLSNState->waitersHeap))
- {
- pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap);
- WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node);
-
- if (!XLogRecPtrIsInvalid(currentLSN) &&
- procInfo->waitLSN > currentLSN)
- break;
-
- wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
- (void) pairingheap_remove_first(&waitLSNState->waitersHeap);
- procInfo->inHeap = false;
- }
-
- updateMinWaitedLSN();
-
- LWLockRelease(WaitLSNLock);
-
- /*
- * Set latches for processes, whose waited LSNs are already replayed. As
- * the time consuming operations, we do it this outside of WaitLSNLock.
- * This is actually fine because procLatch isn't ever freed, so we just
- * can potentially set the wrong process' (or no process') latch.
- */
- for (i = 0; i < numWakeUpProcs; i++)
- {
- SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
- }
- pfree(wakeUpProcs);
-}
-
-/*
- * Delete our item from shmem array if any.
- */
-void
-WaitLSNCleanup(void)
-{
- /*
- * We do a fast-path check of the 'inHeap' flag without the lock. This
- * flag is set to true only by the process itself. So, it's only possible
- * to get a false positive. But that will be eliminated by a recheck
- * inside deleteLSNWaiter().
- */
- if (waitLSNState->procInfos[MyProcNumber].inHeap)
- deleteLSNWaiter();
-}
-
-/*
- * Wait using MyLatch till the given LSN is replayed, the postmaster dies or
- * timeout happens.
- */
-WaitLSNResult
-WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
-{
- XLogRecPtr currentLSN;
- TimestampTz endtime = 0;
- int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
-
- /* Shouldn't be called when shmem isn't initialized */
- Assert(waitLSNState);
-
- /* Should have a valid proc number */
- Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends);
-
- if (!RecoveryInProgress())
- {
- /*
- * Recovery is not in progress. Given that we detected this in the
- * very first check, this procedure was mistakenly called on primary.
- * However, it's possible that standby was promoted concurrently to
- * the procedure call, while target LSN is replayed. So, we still
- * check the last replay LSN before reporting an error.
- */
- if (targetLSN <= GetXLogReplayRecPtr(NULL))
- return WAIT_LSN_RESULT_SUCCESS;
- return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
- }
- else
- {
- /* If target LSN is already replayed, exit immediately */
- if (targetLSN <= GetXLogReplayRecPtr(NULL))
- return WAIT_LSN_RESULT_SUCCESS;
- }
-
- if (timeout > 0)
- {
- endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
- wake_events |= WL_TIMEOUT;
- }
-
- /*
- * Add our process to the pairing heap of waiters. It might happen that
- * target LSN gets replayed before we do. Another check at the beginning
- * of the loop below prevents the race condition.
- */
- addLSNWaiter(targetLSN);
-
- for (;;)
- {
- int rc;
- long delay_ms = 0;
-
- /* Recheck that recovery is still in-progress */
- if (!RecoveryInProgress())
- {
- /*
- * Recovery was ended, but recheck if target LSN was already
- * replayed. See the comment regarding deleteLSNWaiter() below.
- */
- deleteLSNWaiter();
- currentLSN = GetXLogReplayRecPtr(NULL);
- if (targetLSN <= currentLSN)
- return WAIT_LSN_RESULT_SUCCESS;
- return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
- }
- else
- {
- /* Check if the waited LSN has been replayed */
- currentLSN = GetXLogReplayRecPtr(NULL);
- if (targetLSN <= currentLSN)
- break;
- }
-
- /*
- * If the timeout value is specified, calculate the number of
- * milliseconds before the timeout. Exit if the timeout is already
- * reached.
- */
- if (timeout > 0)
- {
- delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
- if (delay_ms <= 0)
- break;
- }
-
- CHECK_FOR_INTERRUPTS();
-
- rc = WaitLatch(MyLatch, wake_events, delay_ms,
- WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
-
- /*
- * Emergency bailout if postmaster has died. This is to avoid the
- * necessity for manual cleanup of all postmaster children.
- */
- if (rc & WL_POSTMASTER_DEATH)
- ereport(FATAL,
- (errcode(ERRCODE_ADMIN_SHUTDOWN),
- errmsg("terminating connection due to unexpected postmaster exit"),
- errcontext("while waiting for LSN replay")));
-
- if (rc & WL_LATCH_SET)
- ResetLatch(MyLatch);
- }
-
- /*
- * Delete our process from the shared memory pairing heap. We might
- * already be deleted by the startup process. The 'inHeap' flag prevents
- * us from the double deletion.
- */
- deleteLSNWaiter();
-
- /*
- * If we didn't reach the target LSN, we must be exited by timeout.
- */
- if (targetLSN > currentLSN)
- return WAIT_LSN_RESULT_TIMEOUT;
-
- return WAIT_LSN_RESULT_SUCCESS;
-}