aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/xlogrecovery.c2
-rw-r--r--src/backend/commands/waitlsn.c168
-rw-r--r--src/backend/lib/pairingheap.c18
-rw-r--r--src/include/commands/waitlsn.h44
-rw-r--r--src/include/lib/pairingheap.h3
5 files changed, 140 insertions, 95 deletions
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 24ab1b2b213..b2fe2d04ccf 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1836,7 +1836,7 @@ PerformWalRecovery(void)
*/
if (waitLSN &&
(XLogRecoveryCtl->lastReplayedEndRecPtr >=
- pg_atomic_read_u64(&waitLSN->minLSN)))
+ pg_atomic_read_u64(&waitLSN->minWaitedLSN)))
WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
/* Else, try to fetch the next WAL record */
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
index 63e9ebf1730..51a34d422e2 100644
--- a/src/backend/commands/waitlsn.c
+++ b/src/backend/commands/waitlsn.c
@@ -1,7 +1,7 @@
/*-------------------------------------------------------------------------
*
* waitlsn.c
- * Implements waiting for the given LSN, which is used in
+ * Implements waiting for the given replay LSN, which is used in
* CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8).
*
* Copyright (c) 2024, PostgreSQL Global Development Group
@@ -26,21 +26,17 @@
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/shmem.h"
+#include "utils/fmgrprotos.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
-#include "utils/fmgrprotos.h"
#include "utils/wait_event_types.h"
-/* Add to / delete from shared memory array */
-static void addLSNWaiter(XLogRecPtr lsn);
-static void deleteLSNWaiter(void);
+static int lsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
+ void *arg);
struct WaitLSNState *waitLSN = NULL;
-static volatile sig_atomic_t haveShmemItem = false;
-/*
- * Report the amount of shared memory space needed for WaitLSNState
- */
+/* Report the amount of shared memory space needed for WaitLSNState. */
Size
WaitLSNShmemSize(void)
{
@@ -51,7 +47,7 @@ WaitLSNShmemSize(void)
return size;
}
-/* Initialize the WaitLSNState in the shared memory */
+/* Initialize the WaitLSNState in the shared memory. */
void
WaitLSNShmemInit(void)
{
@@ -62,81 +58,93 @@ WaitLSNShmemInit(void)
&found);
if (!found)
{
- SpinLockInit(&waitLSN->mutex);
- waitLSN->numWaitedProcs = 0;
- pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+ SpinLockInit(&waitLSN->waitersHeapMutex);
+ pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX);
+ pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL);
+ memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
}
}
/*
- * Add the information about the LSN waiter backend to the shared memory
- * array.
+ * Comparison function for waitLSN->waitersHeap heap. Waiting processes are
+ * ordered by lsn, so that the waiter with smallest lsn is at the top.
*/
-static void
-addLSNWaiter(XLogRecPtr lsn)
+static int
+lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
{
- WaitLSNProcInfo cur;
- int i;
+ const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a);
+ const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b);
- cur.procnum = MyProcNumber;
- cur.waitLSN = lsn;
+ if (aproc->waitLSN < bproc->waitLSN)
+ return 1;
+ else if (aproc->waitLSN > bproc->waitLSN)
+ return -1;
+ else
+ return 0;
+}
- SpinLockAcquire(&waitLSN->mutex);
+/*
+ * Update waitLSN->minWaitedLSN according to the current state of
+ * waitLSN->waitersHeap.
+ */
+static void
+updateMinWaitedLSN(void)
+{
+ XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
- for (i = 0; i < waitLSN->numWaitedProcs; i++)
+ if (!pairingheap_is_empty(&waitLSN->waitersHeap))
{
- if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN)
- {
- WaitLSNProcInfo tmp;
+ pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap);
- tmp = waitLSN->procInfos[i];
- waitLSN->procInfos[i] = cur;
- cur = tmp;
- }
+ minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN;
}
- waitLSN->procInfos[i] = cur;
- waitLSN->numWaitedProcs++;
- pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
- SpinLockRelease(&waitLSN->mutex);
+ pg_atomic_write_u64(&waitLSN->minWaitedLSN, minWaitedLSN);
}
/*
- * Delete the information about the LSN waiter backend from the shared memory
- * array.
+ * Put the current process into the heap of LSN waiters.
*/
static void
-deleteLSNWaiter(void)
+addLSNWaiter(XLogRecPtr lsn)
{
- int i;
- bool found = false;
+ WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber];
- SpinLockAcquire(&waitLSN->mutex);
+ Assert(!procInfo->inHeap);
- for (i = 0; i < waitLSN->numWaitedProcs; i++)
- {
- if (waitLSN->procInfos[i].procnum == MyProcNumber)
- found = true;
+ procInfo->procnum = MyProcNumber;
+ procInfo->waitLSN = lsn;
- if (found && i < waitLSN->numWaitedProcs - 1)
- {
- waitLSN->procInfos[i] = waitLSN->procInfos[i + 1];
- }
- }
+ SpinLockAcquire(&waitLSN->waitersHeapMutex);
- if (!found)
+ pairingheap_add(&waitLSN->waitersHeap, &procInfo->phNode);
+ procInfo->inHeap = true;
+ updateMinWaitedLSN();
+
+ SpinLockRelease(&waitLSN->waitersHeapMutex);
+}
+
+/*
+ * Remove the current process from the heap of LSN waiters if it's there.
+ */
+static void
+deleteLSNWaiter(void)
+{
+ WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber];
+
+ SpinLockAcquire(&waitLSN->waitersHeapMutex);
+
+ if (!procInfo->inHeap)
{
- SpinLockRelease(&waitLSN->mutex);
+ SpinLockRelease(&waitLSN->waitersHeapMutex);
return;
}
- waitLSN->numWaitedProcs--;
- if (waitLSN->numWaitedProcs != 0)
- pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
- else
- pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+ pairingheap_remove(&waitLSN->waitersHeap, &procInfo->phNode);
+ procInfo->inHeap = false;
+ updateMinWaitedLSN();
- SpinLockRelease(&waitLSN->mutex);
+ SpinLockRelease(&waitLSN->waitersHeapMutex);
}
/*
@@ -148,41 +156,33 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
{
int i;
int *wakeUpProcNums;
- int numWakeUpProcs;
+ int numWakeUpProcs = 0;
wakeUpProcNums = palloc(sizeof(int) * MaxBackends);
- SpinLockAcquire(&waitLSN->mutex);
+ SpinLockAcquire(&waitLSN->waitersHeapMutex);
/*
- * Remember processes, whose waited LSNs are already replayed. We should
- * set their latches later after spinlock release.
+ * Iterate the pairing heap of waiting processes till we find LSN not yet
+ * replayed. Record the process numbers to set their latches later.
*/
- for (i = 0; i < waitLSN->numWaitedProcs; i++)
+ while (!pairingheap_is_empty(&waitLSN->waitersHeap))
{
+ pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap);
+ WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node);
+
if (!XLogRecPtrIsInvalid(currentLSN) &&
- waitLSN->procInfos[i].waitLSN > currentLSN)
+ procInfo->waitLSN > currentLSN)
break;
- wakeUpProcNums[i] = waitLSN->procInfos[i].procnum;
+ wakeUpProcNums[numWakeUpProcs++] = procInfo->procnum;
+ (void) pairingheap_remove_first(&waitLSN->waitersHeap);
+ procInfo->inHeap = false;
}
- /*
- * Immediately remove those processes from the shmem array. Otherwise,
- * shmem array items will be here till corresponding processes wake up and
- * delete themselves.
- */
- numWakeUpProcs = i;
- for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++)
- waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs];
- waitLSN->numWaitedProcs -= numWakeUpProcs;
-
- if (waitLSN->numWaitedProcs != 0)
- pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
- else
- pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+ updateMinWaitedLSN();
- SpinLockRelease(&waitLSN->mutex);
+ SpinLockRelease(&waitLSN->waitersHeapMutex);
/*
* Set latches for processes, whose waited LSNs are already replayed. This
@@ -204,7 +204,7 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
void
WaitLSNCleanup(void)
{
- if (haveShmemItem)
+ if (waitLSN->procInfos[MyProcNumber].inHeap)
deleteLSNWaiter();
}
@@ -222,7 +222,7 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
Assert(waitLSN);
/* Should be only called by a backend */
- Assert(MyBackendType == B_BACKEND);
+ Assert(MyBackendType == B_BACKEND && MyProcNumber <= MaxBackends);
if (!RecoveryInProgress())
ereport(ERROR,
@@ -238,7 +238,6 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
addLSNWaiter(targetLSN);
- haveShmemItem = true;
for (;;)
{
@@ -280,17 +279,12 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
if (targetLSN > currentLSN)
{
deleteLSNWaiter();
- haveShmemItem = false;
ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
LSN_FORMAT_ARGS(targetLSN),
LSN_FORMAT_ARGS(currentLSN))));
}
- else
- {
- haveShmemItem = false;
- }
}
Datum
diff --git a/src/backend/lib/pairingheap.c b/src/backend/lib/pairingheap.c
index fe1deba13ec..7858e5e076b 100644
--- a/src/backend/lib/pairingheap.c
+++ b/src/backend/lib/pairingheap.c
@@ -44,12 +44,26 @@ pairingheap_allocate(pairingheap_comparator compare, void *arg)
pairingheap *heap;
heap = (pairingheap *) palloc(sizeof(pairingheap));
+ pairingheap_initialize(heap, compare, arg);
+
+ return heap;
+}
+
+/*
+ * pairingheap_initialize
+ *
+ * Same as pairingheap_allocate(), but initializes the pairing heap in-place
+ * rather than allocating a new chunk of memory. Useful to store the pairing
+ * heap in a shared memory.
+ */
+void
+pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare,
+ void *arg)
+{
heap->ph_compare = compare;
heap->ph_arg = arg;
heap->ph_root = NULL;
-
- return heap;
}
/*
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
index 10ef63f0c09..0d80248682c 100644
--- a/src/include/commands/waitlsn.h
+++ b/src/include/commands/waitlsn.h
@@ -1,7 +1,7 @@
/*-------------------------------------------------------------------------
*
* waitlsn.h
- * Declarations for LSN waiting routines.
+ * Declarations for LSN replay waiting routines.
*
* Copyright (c) 2024, PostgreSQL Global Development Group
*
@@ -12,23 +12,57 @@
#ifndef WAIT_LSN_H
#define WAIT_LSN_H
+#include "lib/pairingheap.h"
#include "postgres.h"
#include "port/atomics.h"
#include "storage/spin.h"
#include "tcop/dest.h"
-/* Shared memory structures */
+/*
+ * WaitLSNProcInfo – the shared memory structure representing information
+ * about the single process, which may wait for LSN replay. An item of
+ * waitLSN->procInfos array.
+ */
typedef struct WaitLSNProcInfo
{
+ /*
+ * A process number, same as the index of this item in waitLSN->procInfos.
+ * Stored for convenience.
+ */
int procnum;
+
+ /* LSN, which this process is waiting for */
XLogRecPtr waitLSN;
+
+ /* A pairing heap node for participation in waitLSN->waitersHeap */
+ pairingheap_node phNode;
+
+ /* A flag indicating that this item is added to waitLSN->waitersHeap */
+ bool inHeap;
} WaitLSNProcInfo;
+/*
+ * WaitLSNState - the shared memory state for the replay LSN waiting facility.
+ */
typedef struct WaitLSNState
{
- pg_atomic_uint64 minLSN;
- slock_t mutex;
- int numWaitedProcs;
+ /*
+ * The minimum LSN value some process is waiting for. Used for the
+ * fast-path checking if we need to wake up any waiters after replaying a
+ * WAL record.
+ */
+ pg_atomic_uint64 minWaitedLSN;
+
+ /*
+ * A pairing heap of waiting processes order by LSN values (least LSN is
+ * on top).
+ */
+ pairingheap waitersHeap;
+
+ /* A mutex protecting the pairing heap above */
+ slock_t waitersHeapMutex;
+
+ /* An array with per-process information, indexed by the process number */
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
} WaitLSNState;
diff --git a/src/include/lib/pairingheap.h b/src/include/lib/pairingheap.h
index 7eade81535a..9e1c26033a1 100644
--- a/src/include/lib/pairingheap.h
+++ b/src/include/lib/pairingheap.h
@@ -77,6 +77,9 @@ typedef struct pairingheap
extern pairingheap *pairingheap_allocate(pairingheap_comparator compare,
void *arg);
+extern void pairingheap_initialize(pairingheap *heap,
+ pairingheap_comparator compare,
+ void *arg);
extern void pairingheap_free(pairingheap *heap);
extern void pairingheap_add(pairingheap *heap, pairingheap_node *node);
extern pairingheap_node *pairingheap_first(pairingheap *heap);