aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/waitlsn.c
blob: 1a83c34e09f7328b05c3db9a34c06fe3af3584be (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
/*-------------------------------------------------------------------------
 *
 * waitlsn.c
 *	  Implements waiting for the given replay LSN, which is used in
 *	  CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout int8).
 *
 * Copyright (c) 2024, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/commands/waitlsn.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <float.h>
#include <math.h>

#include "pgstat.h"
#include "access/xlog.h"
#include "access/xlogrecovery.h"
#include "commands/waitlsn.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "utils/fmgrprotos.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
#include "utils/wait_event_types.h"

/*
 * Forward declaration of the comparator used for waitLSN->waitersHeap; the
 * definition appears further down in this file.
 */
static int	lsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
					void *arg);

/*
 * Process-local pointer to the WaitLSNState structure.  It starts out NULL
 * and is assigned by WaitLSNShmemInit().
 */
struct WaitLSNState *waitLSN = NULL;

/*
 * WaitLSNShmemSize
 *		Return the number of bytes of shared memory WaitLSNState needs.
 *
 * The total is the fixed part of the struct (everything preceding the
 * procInfos member) plus one WaitLSNProcInfo element for each of the
 * MaxBackends possible slots.
 */
Size
WaitLSNShmemSize(void)
{
	Size		procInfosBytes = mul_size(MaxBackends, sizeof(WaitLSNProcInfo));

	return add_size(offsetof(WaitLSNState, procInfos), procInfosBytes);
}

/*
 * WaitLSNShmemInit
 *		Set up the process-local 'waitLSN' pointer and, if the structure was
 *		not found already existing, initialize its contents.
 */
void
WaitLSNShmemInit(void)
{
	bool		alreadyInitialized;

	waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
											   WaitLSNShmemSize(),
											   &alreadyInitialized);

	/* Somebody else has set the structure up before us; nothing to do. */
	if (alreadyInitialized)
		return;

	/*
	 * Fresh structure: no waiters yet.  PG_UINT64_MAX is the value
	 * updateMinWaitedLSN() stores when the heap is empty, so start with it.
	 */
	pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX);
	pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL);

	/* Zero every per-process slot, which also clears all 'inHeap' flags. */
	memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
}

/*
 * Comparison function for waitLSN->waitersHeap heap.  Waiting processes are
 * ordered by lsn, so that the waiter with smallest lsn is at the top.
 */
static int
lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
{
	const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a);
	const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b);

	if (aproc->waitLSN < bproc->waitLSN)
		return 1;
	else if (aproc->waitLSN > bproc->waitLSN)
		return -1;
	else
		return 0;
}

/*
 * Update waitLSN->minWaitedLSN according to the current state of
 * waitLSN->waitersHeap.
 */
static void
updateMinWaitedLSN(void)
{
	XLogRecPtr	minWaitedLSN = PG_UINT64_MAX;

	if (!pairingheap_is_empty(&waitLSN->waitersHeap))
	{
		pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap);

		minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN;
	}

	pg_atomic_write_u64(&waitLSN->minWaitedLSN, minWaitedLSN);
}

/*
 * Register the current process as waiting for 'lsn'.
 *
 * Fills in this process's procInfos[] slot, inserts it into the waiters heap
 * and refreshes minWaitedLSN, all under WaitLSNLock held exclusively.  The
 * process must not be in the heap already.
 */
static void
addLSNWaiter(XLogRecPtr lsn)
{
	WaitLSNProcInfo *self = &waitLSN->procInfos[MyProcNumber];

	LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);

	Assert(!self->inHeap);

	/* The heap is ordered by waitLSN, so set it before inserting the node. */
	self->waitLSN = lsn;
	self->procnum = MyProcNumber;
	self->inHeap = true;
	pairingheap_add(&waitLSN->waitersHeap, &self->phNode);

	updateMinWaitedLSN();

	LWLockRelease(WaitLSNLock);
}

/*
 * Take the current process out of the waiters heap, if it is in there.
 *
 * The 'inHeap' flag is examined under WaitLSNLock, so calling this when the
 * entry has already been removed is a harmless no-op.
 */
static void
deleteLSNWaiter(void)
{
	WaitLSNProcInfo *self = &waitLSN->procInfos[MyProcNumber];

	LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);

	if (self->inHeap)
	{
		pairingheap_remove(&waitLSN->waitersHeap, &self->phNode);
		self->inHeap = false;
		updateMinWaitedLSN();
	}

	LWLockRelease(WaitLSNLock);
}

/*
 * Wake up LSN waiters.
 *
 * Every waiter whose waitLSN is <= currentLSN is removed from the heap and
 * gets its process latch set.  When currentLSN is InvalidXLogRecPtr, all
 * waiters are removed and woken regardless of the LSN they wait for.
 */
void
WaitLSNSetLatches(XLogRecPtr currentLSN)
{
	bool		wakeEveryone = XLogRecPtrIsInvalid(currentLSN);
	int		   *procsToWake = palloc(sizeof(int) * MaxBackends);
	int			numToWake = 0;
	int			idx;

	LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);

	/*
	 * Pop entries off the top of the heap (smallest waitLSN first) until we
	 * meet one that is still ahead of currentLSN.  Only remember the process
	 * numbers here; the latches are set after the lock is released.
	 */
	for (;;)
	{
		pairingheap_node *top;
		WaitLSNProcInfo *waiter;

		if (pairingheap_is_empty(&waitLSN->waitersHeap))
			break;

		top = pairingheap_first(&waitLSN->waitersHeap);
		waiter = pairingheap_container(WaitLSNProcInfo, phNode, top);

		if (!wakeEveryone && waiter->waitLSN > currentLSN)
			break;

		procsToWake[numToWake++] = waiter->procnum;
		(void) pairingheap_remove_first(&waitLSN->waitersHeap);
		waiter->inHeap = false;
	}

	updateMinWaitedLSN();

	LWLockRelease(WaitLSNLock);

	/*
	 * Now signal the collected processes.  This is done only after
	 * WaitLSNLock has been released, so the lock is not held while setting
	 * latches.
	 */
	idx = 0;
	while (idx < numToWake)
	{
		PGPROC	   *proc = GetPGProcByNumber(procsToWake[idx]);

		SetLatch(&proc->procLatch);
		idx++;
	}

	pfree(procsToWake);
}

/*
 * Remove the current process's entry from the waiters heap, if present.
 */
void
WaitLSNCleanup(void)
{
	/*
	 * Unlocked fast-path test of our own 'inHeap' flag.  In this file the
	 * flag is only ever set to true by the owning process (addLSNWaiter()),
	 * while WaitLSNSetLatches() may clear it concurrently.  Hence a stale
	 * read can only report "in heap" when we no longer are, never the
	 * opposite, and deleteLSNWaiter() rechecks the flag under the lock.
	 */
	if (!waitLSN->procInfos[MyProcNumber].inHeap)
		return;

	deleteLSNWaiter();
}

/*
 * WaitForLSN
 *		Wait using MyLatch till the given LSN is replayed, the postmaster
 *		dies or timeout happens.
 *
 * targetLSN is the replay position to wait for.  timeout is given in
 * milliseconds; a value that is not positive means there is no time limit.
 *
 * Returns normally once the replay position has reached targetLSN.  Raises
 * an ERROR if recovery is not in progress (either on entry or while
 * waiting), or if the timeout expires before targetLSN is replayed.
 * Interrupts are serviced on each iteration of the wait loop.
 *
 * Whenever this function returns or raises one of its own errors after
 * having registered as a waiter, the process has been removed from the
 * waiters heap again.
 */
void
WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
{
	XLogRecPtr	currentLSN;
	TimestampTz endtime = 0;

	/* Shouldn't be called when shmem isn't initialized */
	Assert(waitLSN);

	/*
	 * Should be only called by a backend.  waitLSN->procInfos[] is sized for
	 * MaxBackends entries, so the valid slot numbers are 0 .. MaxBackends - 1.
	 */
	Assert(MyBackendType == B_BACKEND &&
		   MyProcNumber >= 0 && MyProcNumber < MaxBackends);

	if (!RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is not in progress"),
				 errhint("Waiting for LSN can only be executed during recovery.")));

	/* If target LSN is already replayed, exit immediately */
	if (targetLSN <= GetXLogReplayRecPtr(NULL))
		return;

	if (timeout > 0)
		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);

	addLSNWaiter(targetLSN);

	for (;;)
	{
		int			rc;
		int			latch_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
		long		delay_ms = 0;

		/* Check if the waited LSN has been replayed */
		currentLSN = GetXLogReplayRecPtr(NULL);
		if (targetLSN <= currentLSN)
			break;

		/* Recheck that recovery is still in-progress */
		if (!RecoveryInProgress())
		{
			/* Don't leave a stale entry in the heap when bailing out. */
			deleteLSNWaiter();
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("recovery is not in progress"),
					 errdetail("Recovery ended before replaying the target LSN %X/%X; last replay LSN %X/%X.",
							   LSN_FORMAT_ARGS(targetLSN),
							   LSN_FORMAT_ARGS(currentLSN))));
		}

		if (timeout > 0)
		{
			/* Remaining time, converted to the milliseconds WaitLatch takes. */
			delay_ms = (endtime - GetCurrentTimestamp()) / 1000;
			latch_events |= WL_TIMEOUT;
			if (delay_ms <= 0)
				break;
		}

		CHECK_FOR_INTERRUPTS();

		rc = WaitLatch(MyLatch, latch_events, delay_ms,
					   WAIT_EVENT_WAIT_FOR_WAL_REPLAY);

		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
	}

	/*
	 * Delete our entry from the waiters heap on every exit path, not only on
	 * timeout.  We can leave the loop because we observed the replay position
	 * ourselves, before WaitLSNSetLatches() got to remove our entry; leaving
	 * it behind would trip the assertion in addLSNWaiter() on the next wait
	 * (or add the same node twice in a non-assert build).  If the entry has
	 * already been removed, deleteLSNWaiter() notices that via the 'inHeap'
	 * flag and does nothing.
	 */
	deleteLSNWaiter();

	if (targetLSN > currentLSN)
		ereport(ERROR,
				(errcode(ERRCODE_QUERY_CANCELED),
				 errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
						LSN_FORMAT_ARGS(targetLSN),
						LSN_FORMAT_ARGS(currentLSN))));
}

Datum
pg_wal_replay_wait(PG_FUNCTION_ARGS)
{
	XLogRecPtr	target_lsn = PG_GETARG_LSN(0);
	int64		timeout = PG_GETARG_INT64(1);
	CallContext *context = (CallContext *) fcinfo->context;

	if (timeout < 0)
		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("\"timeout\" must not be negative")));

	/*
	 * We are going to wait for the LSN replay.  We should first care that we
	 * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
	 * Otherwise, our snapshot could prevent the replay of WAL records
	 * implying a kind of self-deadlock.  This is the reason why
	 * pg_wal_replay_wait() is a procedure, not a function.
	 *
	 * At first, we check that pg_wal_replay_wait() is called in a non-atomic
	 * context.  That is, a procedure call isn't wrapped into a transaction,
	 * another procedure call, or a function call.
	 *
	 * Secondly, according to PlannedStmtRequiresSnapshot(), even in an atomic
	 * context, CallStmt is processed with a snapshot.  Thankfully, we can pop
	 * this snapshot, because PortalRunUtility() can tolerate this.
	 */
	if (context->atomic)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("pg_wal_replay_wait() must be only called in non-atomic context"),
				 errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function.")));

	if (ActiveSnapshotSet())
		PopActiveSnapshot();
	Assert(!ActiveSnapshotSet());
	InvalidateCatalogSnapshot();
	Assert(MyProc->xmin == InvalidTransactionId);

	(void) WaitForLSN(target_lsn, timeout);

	PG_RETURN_VOID();
}