/*------------------------------------------------------------------------- * * waitlsn.c * Implements waiting for the given replay LSN, which is used in * CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8). * * Copyright (c) 2024, PostgreSQL Global Development Group * * IDENTIFICATION * src/backend/commands/waitlsn.c * *------------------------------------------------------------------------- */ #include "postgres.h" #include #include #include "pgstat.h" #include "access/xlog.h" #include "access/xlogrecovery.h" #include "commands/waitlsn.h" #include "funcapi.h" #include "miscadmin.h" #include "storage/latch.h" #include "storage/proc.h" #include "storage/shmem.h" #include "utils/fmgrprotos.h" #include "utils/pg_lsn.h" #include "utils/snapmgr.h" #include "utils/wait_event_types.h" static int lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg); struct WaitLSNState *waitLSN = NULL; /* Report the amount of shared memory space needed for WaitLSNState. */ Size WaitLSNShmemSize(void) { Size size; size = offsetof(WaitLSNState, procInfos); size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo))); return size; } /* Initialize the WaitLSNState in the shared memory. */ void WaitLSNShmemInit(void) { bool found; waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState", WaitLSNShmemSize(), &found); if (!found) { pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX); pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL); memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo)); } } /* * Comparison function for waitLSN->waitersHeap heap. Waiting processes are * ordered by lsn, so that the waiter with smallest lsn is at the top. */ static int lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) { const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a); const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b); if (aproc->waitLSN < bproc->waitLSN) return 1; else if (aproc->waitLSN > bproc->waitLSN) return -1; else return 0; } /* * Update waitLSN->minWaitedLSN according to the current state of * waitLSN->waitersHeap. */ static void updateMinWaitedLSN(void) { XLogRecPtr minWaitedLSN = PG_UINT64_MAX; if (!pairingheap_is_empty(&waitLSN->waitersHeap)) { pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap); minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN; } pg_atomic_write_u64(&waitLSN->minWaitedLSN, minWaitedLSN); } /* * Put the current process into the heap of LSN waiters. */ static void addLSNWaiter(XLogRecPtr lsn) { WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); Assert(!procInfo->inHeap); procInfo->procnum = MyProcNumber; procInfo->waitLSN = lsn; pairingheap_add(&waitLSN->waitersHeap, &procInfo->phNode); procInfo->inHeap = true; updateMinWaitedLSN(); LWLockRelease(WaitLSNLock); } /* * Remove the current process from the heap of LSN waiters if it's there. */ static void deleteLSNWaiter(void) { WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); if (!procInfo->inHeap) { LWLockRelease(WaitLSNLock); return; } pairingheap_remove(&waitLSN->waitersHeap, &procInfo->phNode); procInfo->inHeap = false; updateMinWaitedLSN(); LWLockRelease(WaitLSNLock); } /* * Set latches of LSN waiters whose LSN has been replayed. Set latches of all * LSN waiters when InvalidXLogRecPtr is given. */ void WaitLSNSetLatches(XLogRecPtr currentLSN) { int i; int *wakeUpProcNums; int numWakeUpProcs = 0; wakeUpProcNums = palloc(sizeof(int) * MaxBackends); LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); /* * Iterate the pairing heap of waiting processes till we find LSN not yet * replayed. Record the process numbers to set their latches later. */ while (!pairingheap_is_empty(&waitLSN->waitersHeap)) { pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap); WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node); if (!XLogRecPtrIsInvalid(currentLSN) && procInfo->waitLSN > currentLSN) break; wakeUpProcNums[numWakeUpProcs++] = procInfo->procnum; (void) pairingheap_remove_first(&waitLSN->waitersHeap); procInfo->inHeap = false; } updateMinWaitedLSN(); LWLockRelease(WaitLSNLock); /* * Set latches for processes, whose waited LSNs are already replayed. This * involves spinlocks. So, we shouldn't do this under a spinlock. */ for (i = 0; i < numWakeUpProcs; i++) { PGPROC *backend; backend = GetPGProcByNumber(wakeUpProcNums[i]); SetLatch(&backend->procLatch); } pfree(wakeUpProcNums); } /* * Delete our item from shmem array if any. */ void WaitLSNCleanup(void) { /* * We do a fast-path check of the 'inHeap' flag without the lock. This * flag is set to true only by the process itself. So, it's only possible * to get a false positive. But that will be eliminated by a recheck * inside deleteLSNWaiter(). */ if (waitLSN->procInfos[MyProcNumber].inHeap) deleteLSNWaiter(); } /* * Wait using MyLatch till the given LSN is replayed, the postmaster dies or * timeout happens. */ void WaitForLSN(XLogRecPtr targetLSN, int64 timeout) { XLogRecPtr currentLSN; TimestampTz endtime = 0; /* Shouldn't be called when shmem isn't initialized */ Assert(waitLSN); /* Should be only called by a backend */ Assert(MyBackendType == B_BACKEND && MyProcNumber <= MaxBackends); if (!RecoveryInProgress()) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("recovery is not in progress"), errhint("Waiting for LSN can only be executed during recovery."))); /* If target LSN is already replayed, exit immediately */ if (targetLSN <= GetXLogReplayRecPtr(NULL)) return; if (timeout > 0) endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout); addLSNWaiter(targetLSN); for (;;) { int rc; int latch_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH; long delay_ms = 0; /* Check if the waited LSN has been replayed */ currentLSN = GetXLogReplayRecPtr(NULL); if (targetLSN <= currentLSN) break; /* Recheck that recovery is still in-progress */ if (!RecoveryInProgress()) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("recovery is not in progress"), errdetail("Recovery ended before replaying the target LSN %X/%X; last replay LSN %X/%X.", LSN_FORMAT_ARGS(targetLSN), LSN_FORMAT_ARGS(currentLSN)))); if (timeout > 0) { delay_ms = (endtime - GetCurrentTimestamp()) / 1000; latch_events |= WL_TIMEOUT; if (delay_ms <= 0) break; } CHECK_FOR_INTERRUPTS(); rc = WaitLatch(MyLatch, latch_events, delay_ms, WAIT_EVENT_WAIT_FOR_WAL_REPLAY); if (rc & WL_LATCH_SET) ResetLatch(MyLatch); } if (targetLSN > currentLSN) { deleteLSNWaiter(); ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X", LSN_FORMAT_ARGS(targetLSN), LSN_FORMAT_ARGS(currentLSN)))); } } Datum pg_wal_replay_wait(PG_FUNCTION_ARGS) { XLogRecPtr target_lsn = PG_GETARG_LSN(0); int64 timeout = PG_GETARG_INT64(1); CallContext *context = (CallContext *) fcinfo->context; if (timeout < 0) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), errmsg("\"timeout\" must not be negative"))); /* * We are going to wait for the LSN replay. We should first care that we * don't hold a snapshot and correspondingly our MyProc->xmin is invalid. * Otherwise, our snapshot could prevent the replay of WAL records * implying a kind of self-deadlock. This is the reason why * pg_wal_replay_wait() is a procedure, not a function. * * At first, we check that pg_wal_replay_wait() is called in a non-atomic * context. That is, a procedure call isn't wrapped into a transaction, * another procedure call, or a function call. * * Secondly, according to PlannedStmtRequiresSnapshot(), even in an atomic * context, CallStmt is processed with a snapshot. Thankfully, we can pop * this snapshot, because PortalRunUtility() can tolerate this. */ if (context->atomic) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_wal_replay_wait() must be only called in non-atomic context"), errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function."))); if (ActiveSnapshotSet()) PopActiveSnapshot(); Assert(!ActiveSnapshotSet()); InvalidateCatalogSnapshot(); Assert(MyProc->xmin == InvalidTransactionId); (void) WaitForLSN(target_lsn, timeout); PG_RETURN_VOID(); }