diff options
Diffstat (limited to 'src/backend/commands/waitlsn.c')
-rw-r--r-- | src/backend/commands/waitlsn.c | 168 |
1 files changed, 81 insertions, 87 deletions
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c index 63e9ebf1730..51a34d422e2 100644 --- a/src/backend/commands/waitlsn.c +++ b/src/backend/commands/waitlsn.c @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * waitlsn.c - * Implements waiting for the given LSN, which is used in + * Implements waiting for the given replay LSN, which is used in * CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8). * * Copyright (c) 2024, PostgreSQL Global Development Group @@ -26,21 +26,17 @@ #include "storage/latch.h" #include "storage/proc.h" #include "storage/shmem.h" +#include "utils/fmgrprotos.h" #include "utils/pg_lsn.h" #include "utils/snapmgr.h" -#include "utils/fmgrprotos.h" #include "utils/wait_event_types.h" -/* Add to / delete from shared memory array */ -static void addLSNWaiter(XLogRecPtr lsn); -static void deleteLSNWaiter(void); +static int lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, + void *arg); struct WaitLSNState *waitLSN = NULL; -static volatile sig_atomic_t haveShmemItem = false; -/* - * Report the amount of shared memory space needed for WaitLSNState - */ +/* Report the amount of shared memory space needed for WaitLSNState. */ Size WaitLSNShmemSize(void) { @@ -51,7 +47,7 @@ WaitLSNShmemSize(void) return size; } -/* Initialize the WaitLSNState in the shared memory */ +/* Initialize the WaitLSNState in the shared memory. */ void WaitLSNShmemInit(void) { @@ -62,81 +58,93 @@ WaitLSNShmemInit(void) &found); if (!found) { - SpinLockInit(&waitLSN->mutex); - waitLSN->numWaitedProcs = 0; - pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX); + SpinLockInit(&waitLSN->waitersHeapMutex); + pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX); + pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL); + memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo)); } } /* - * Add the information about the LSN waiter backend to the shared memory - * array. + * Comparison function for waitLSN->waitersHeap heap. Waiting processes are + * ordered by lsn, so that the waiter with smallest lsn is at the top. */ -static void -addLSNWaiter(XLogRecPtr lsn) +static int +lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) { - WaitLSNProcInfo cur; - int i; + const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a); + const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b); - cur.procnum = MyProcNumber; - cur.waitLSN = lsn; + if (aproc->waitLSN < bproc->waitLSN) + return 1; + else if (aproc->waitLSN > bproc->waitLSN) + return -1; + else + return 0; +} - SpinLockAcquire(&waitLSN->mutex); +/* + * Update waitLSN->minWaitedLSN according to the current state of + * waitLSN->waitersHeap. + */ +static void +updateMinWaitedLSN(void) +{ + XLogRecPtr minWaitedLSN = PG_UINT64_MAX; - for (i = 0; i < waitLSN->numWaitedProcs; i++) + if (!pairingheap_is_empty(&waitLSN->waitersHeap)) { - if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN) - { - WaitLSNProcInfo tmp; + pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap); - tmp = waitLSN->procInfos[i]; - waitLSN->procInfos[i] = cur; - cur = tmp; - } + minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN; } - waitLSN->procInfos[i] = cur; - waitLSN->numWaitedProcs++; - pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); - SpinLockRelease(&waitLSN->mutex); + pg_atomic_write_u64(&waitLSN->minWaitedLSN, minWaitedLSN); } /* - * Delete the information about the LSN waiter backend from the shared memory - * array. + * Put the current process into the heap of LSN waiters. */ static void -deleteLSNWaiter(void) +addLSNWaiter(XLogRecPtr lsn) { - int i; - bool found = false; + WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; - SpinLockAcquire(&waitLSN->mutex); + Assert(!procInfo->inHeap); - for (i = 0; i < waitLSN->numWaitedProcs; i++) - { - if (waitLSN->procInfos[i].procnum == MyProcNumber) - found = true; + procInfo->procnum = MyProcNumber; + procInfo->waitLSN = lsn; - if (found && i < waitLSN->numWaitedProcs - 1) - { - waitLSN->procInfos[i] = waitLSN->procInfos[i + 1]; - } - } + SpinLockAcquire(&waitLSN->waitersHeapMutex); - if (!found) + pairingheap_add(&waitLSN->waitersHeap, &procInfo->phNode); + procInfo->inHeap = true; + updateMinWaitedLSN(); + + SpinLockRelease(&waitLSN->waitersHeapMutex); +} + +/* + * Remove the current process from the heap of LSN waiters if it's there. + */ +static void +deleteLSNWaiter(void) +{ + WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; + + SpinLockAcquire(&waitLSN->waitersHeapMutex); + + if (!procInfo->inHeap) { - SpinLockRelease(&waitLSN->mutex); + SpinLockRelease(&waitLSN->waitersHeapMutex); return; } - waitLSN->numWaitedProcs--; - if (waitLSN->numWaitedProcs != 0) - pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); - else - pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX); + pairingheap_remove(&waitLSN->waitersHeap, &procInfo->phNode); + procInfo->inHeap = false; + updateMinWaitedLSN(); - SpinLockRelease(&waitLSN->mutex); + SpinLockRelease(&waitLSN->waitersHeapMutex); } /* @@ -148,41 +156,33 @@ WaitLSNSetLatches(XLogRecPtr currentLSN) { int i; int *wakeUpProcNums; - int numWakeUpProcs; + int numWakeUpProcs = 0; wakeUpProcNums = palloc(sizeof(int) * MaxBackends); - SpinLockAcquire(&waitLSN->mutex); + SpinLockAcquire(&waitLSN->waitersHeapMutex); /* - * Remember processes, whose waited LSNs are already replayed. We should - * set their latches later after spinlock release. + * Iterate the pairing heap of waiting processes till we find LSN not yet + * replayed. Record the process numbers to set their latches later. */ - for (i = 0; i < waitLSN->numWaitedProcs; i++) + while (!pairingheap_is_empty(&waitLSN->waitersHeap)) { + pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap); + WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node); + if (!XLogRecPtrIsInvalid(currentLSN) && - waitLSN->procInfos[i].waitLSN > currentLSN) + procInfo->waitLSN > currentLSN) break; - wakeUpProcNums[i] = waitLSN->procInfos[i].procnum; + wakeUpProcNums[numWakeUpProcs++] = procInfo->procnum; + (void) pairingheap_remove_first(&waitLSN->waitersHeap); + procInfo->inHeap = false; } - /* - * Immediately remove those processes from the shmem array. Otherwise, - * shmem array items will be here till corresponding processes wake up and - * delete themselves. - */ - numWakeUpProcs = i; - for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++) - waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs]; - waitLSN->numWaitedProcs -= numWakeUpProcs; - - if (waitLSN->numWaitedProcs != 0) - pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); - else - pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX); + updateMinWaitedLSN(); - SpinLockRelease(&waitLSN->mutex); + SpinLockRelease(&waitLSN->waitersHeapMutex); /* * Set latches for processes, whose waited LSNs are already replayed. This @@ -204,7 +204,7 @@ WaitLSNSetLatches(XLogRecPtr currentLSN) void WaitLSNCleanup(void) { - if (haveShmemItem) + if (waitLSN->procInfos[MyProcNumber].inHeap) deleteLSNWaiter(); } @@ -222,7 +222,7 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout) Assert(waitLSN); /* Should be only called by a backend */ - Assert(MyBackendType == B_BACKEND); + Assert(MyBackendType == B_BACKEND && MyProcNumber <= MaxBackends); if (!RecoveryInProgress()) ereport(ERROR, @@ -238,7 +238,6 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout) endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout); addLSNWaiter(targetLSN); - haveShmemItem = true; for (;;) { @@ -280,17 +279,12 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout) if (targetLSN > currentLSN) { deleteLSNWaiter(); - haveShmemItem = false; ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X", LSN_FORMAT_ARGS(targetLSN), LSN_FORMAT_ARGS(currentLSN)))); } - else - { - haveShmemItem = false; - } } Datum |