diff options
author | Alexander Korotkov <akorotkov@postgresql.org> | 2024-04-03 18:15:17 +0300 |
---|---|---|
committer | Alexander Korotkov <akorotkov@postgresql.org> | 2024-04-03 18:15:41 +0300 |
commit | bf1e65080629e2b0ac47ffe245576da96eff8420 (patch) | |
tree | d6eae6fbb978d03d72388d54a2ec86848a8979dd /src/backend/commands/waitlsn.c | |
parent | 936e3fa3787a51397280c1081587586e83c20399 (diff) | |
download | postgresql-bf1e65080629e2b0ac47ffe245576da96eff8420.tar.gz postgresql-bf1e65080629e2b0ac47ffe245576da96eff8420.zip |
Use the pairing heap instead of a flat array for LSN replay waiters
06c418e163 introduced the pg_wal_replay_wait() procedure, which allows waiting
for a particular LSN to be replayed on a standby. The waiters were stored in
a flat array. Even though scanning a small array is fast, that might become a
problem at scale (with a lot of waiting processes).
This commit replaces the flat shared memory array with a pairing heap,
which keeps the waiter with the smallest LSN at the top. This gives us O(log N)
complexity for both inserting and removing waiters.
Reported-by: Alvaro Herrera
Discussion: https://postgr.es/m/202404030658.hhj3vfxeyhft%40alvherre.pgsql
Diffstat (limited to 'src/backend/commands/waitlsn.c')
-rw-r--r-- | src/backend/commands/waitlsn.c | 168 |
1 files changed, 81 insertions, 87 deletions
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c index 63e9ebf1730..51a34d422e2 100644 --- a/src/backend/commands/waitlsn.c +++ b/src/backend/commands/waitlsn.c @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * waitlsn.c - * Implements waiting for the given LSN, which is used in + * Implements waiting for the given replay LSN, which is used in * CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8). * * Copyright (c) 2024, PostgreSQL Global Development Group @@ -26,21 +26,17 @@ #include "storage/latch.h" #include "storage/proc.h" #include "storage/shmem.h" +#include "utils/fmgrprotos.h" #include "utils/pg_lsn.h" #include "utils/snapmgr.h" -#include "utils/fmgrprotos.h" #include "utils/wait_event_types.h" -/* Add to / delete from shared memory array */ -static void addLSNWaiter(XLogRecPtr lsn); -static void deleteLSNWaiter(void); +static int lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, + void *arg); struct WaitLSNState *waitLSN = NULL; -static volatile sig_atomic_t haveShmemItem = false; -/* - * Report the amount of shared memory space needed for WaitLSNState - */ +/* Report the amount of shared memory space needed for WaitLSNState. */ Size WaitLSNShmemSize(void) { @@ -51,7 +47,7 @@ WaitLSNShmemSize(void) return size; } -/* Initialize the WaitLSNState in the shared memory */ +/* Initialize the WaitLSNState in the shared memory. */ void WaitLSNShmemInit(void) { @@ -62,81 +58,93 @@ WaitLSNShmemInit(void) &found); if (!found) { - SpinLockInit(&waitLSN->mutex); - waitLSN->numWaitedProcs = 0; - pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX); + SpinLockInit(&waitLSN->waitersHeapMutex); + pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX); + pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL); + memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo)); } } /* - * Add the information about the LSN waiter backend to the shared memory - * array. 
+ * Comparison function for waitLSN->waitersHeap heap. Waiting processes are + * ordered by lsn, so that the waiter with smallest lsn is at the top. */ -static void -addLSNWaiter(XLogRecPtr lsn) +static int +lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) { - WaitLSNProcInfo cur; - int i; + const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a); + const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b); - cur.procnum = MyProcNumber; - cur.waitLSN = lsn; + if (aproc->waitLSN < bproc->waitLSN) + return 1; + else if (aproc->waitLSN > bproc->waitLSN) + return -1; + else + return 0; +} - SpinLockAcquire(&waitLSN->mutex); +/* + * Update waitLSN->minWaitedLSN according to the current state of + * waitLSN->waitersHeap. + */ +static void +updateMinWaitedLSN(void) +{ + XLogRecPtr minWaitedLSN = PG_UINT64_MAX; - for (i = 0; i < waitLSN->numWaitedProcs; i++) + if (!pairingheap_is_empty(&waitLSN->waitersHeap)) { - if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN) - { - WaitLSNProcInfo tmp; + pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap); - tmp = waitLSN->procInfos[i]; - waitLSN->procInfos[i] = cur; - cur = tmp; - } + minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN; } - waitLSN->procInfos[i] = cur; - waitLSN->numWaitedProcs++; - pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); - SpinLockRelease(&waitLSN->mutex); + pg_atomic_write_u64(&waitLSN->minWaitedLSN, minWaitedLSN); } /* - * Delete the information about the LSN waiter backend from the shared memory - * array. + * Put the current process into the heap of LSN waiters. 
*/ static void -deleteLSNWaiter(void) +addLSNWaiter(XLogRecPtr lsn) { - int i; - bool found = false; + WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; - SpinLockAcquire(&waitLSN->mutex); + Assert(!procInfo->inHeap); - for (i = 0; i < waitLSN->numWaitedProcs; i++) - { - if (waitLSN->procInfos[i].procnum == MyProcNumber) - found = true; + procInfo->procnum = MyProcNumber; + procInfo->waitLSN = lsn; - if (found && i < waitLSN->numWaitedProcs - 1) - { - waitLSN->procInfos[i] = waitLSN->procInfos[i + 1]; - } - } + SpinLockAcquire(&waitLSN->waitersHeapMutex); - if (!found) + pairingheap_add(&waitLSN->waitersHeap, &procInfo->phNode); + procInfo->inHeap = true; + updateMinWaitedLSN(); + + SpinLockRelease(&waitLSN->waitersHeapMutex); +} + +/* + * Remove the current process from the heap of LSN waiters if it's there. + */ +static void +deleteLSNWaiter(void) +{ + WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; + + SpinLockAcquire(&waitLSN->waitersHeapMutex); + + if (!procInfo->inHeap) { - SpinLockRelease(&waitLSN->mutex); + SpinLockRelease(&waitLSN->waitersHeapMutex); return; } - waitLSN->numWaitedProcs--; - if (waitLSN->numWaitedProcs != 0) - pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); - else - pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX); + pairingheap_remove(&waitLSN->waitersHeap, &procInfo->phNode); + procInfo->inHeap = false; + updateMinWaitedLSN(); - SpinLockRelease(&waitLSN->mutex); + SpinLockRelease(&waitLSN->waitersHeapMutex); } /* @@ -148,41 +156,33 @@ WaitLSNSetLatches(XLogRecPtr currentLSN) { int i; int *wakeUpProcNums; - int numWakeUpProcs; + int numWakeUpProcs = 0; wakeUpProcNums = palloc(sizeof(int) * MaxBackends); - SpinLockAcquire(&waitLSN->mutex); + SpinLockAcquire(&waitLSN->waitersHeapMutex); /* - * Remember processes, whose waited LSNs are already replayed. We should - * set their latches later after spinlock release. 
+ * Iterate the pairing heap of waiting processes till we find LSN not yet + * replayed. Record the process numbers to set their latches later. */ - for (i = 0; i < waitLSN->numWaitedProcs; i++) + while (!pairingheap_is_empty(&waitLSN->waitersHeap)) { + pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap); + WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node); + if (!XLogRecPtrIsInvalid(currentLSN) && - waitLSN->procInfos[i].waitLSN > currentLSN) + procInfo->waitLSN > currentLSN) break; - wakeUpProcNums[i] = waitLSN->procInfos[i].procnum; + wakeUpProcNums[numWakeUpProcs++] = procInfo->procnum; + (void) pairingheap_remove_first(&waitLSN->waitersHeap); + procInfo->inHeap = false; } - /* - * Immediately remove those processes from the shmem array. Otherwise, - * shmem array items will be here till corresponding processes wake up and - * delete themselves. - */ - numWakeUpProcs = i; - for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++) - waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs]; - waitLSN->numWaitedProcs -= numWakeUpProcs; - - if (waitLSN->numWaitedProcs != 0) - pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); - else - pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX); + updateMinWaitedLSN(); - SpinLockRelease(&waitLSN->mutex); + SpinLockRelease(&waitLSN->waitersHeapMutex); /* * Set latches for processes, whose waited LSNs are already replayed. 
This @@ -204,7 +204,7 @@ WaitLSNSetLatches(XLogRecPtr currentLSN) void WaitLSNCleanup(void) { - if (haveShmemItem) + if (waitLSN->procInfos[MyProcNumber].inHeap) deleteLSNWaiter(); } @@ -222,7 +222,7 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout) Assert(waitLSN); /* Should be only called by a backend */ - Assert(MyBackendType == B_BACKEND); + Assert(MyBackendType == B_BACKEND && MyProcNumber <= MaxBackends); if (!RecoveryInProgress()) ereport(ERROR, @@ -238,7 +238,6 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout) endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout); addLSNWaiter(targetLSN); - haveShmemItem = true; for (;;) { @@ -280,17 +279,12 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout) if (targetLSN > currentLSN) { deleteLSNWaiter(); - haveShmemItem = false; ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X", LSN_FORMAT_ARGS(targetLSN), LSN_FORMAT_ARGS(currentLSN)))); } - else - { - haveShmemItem = false; - } } Datum |