1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
|
/*-------------------------------------------------------------------------
*
* xlogwait.c
* Implements waiting for the given replay LSN, which is used in
* CALL pg_wal_replay_wait(target_lsn pg_lsn,
* timeout float8, no_error bool).
*
* Copyright (c) 2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/access/transam/xlogwait.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <float.h>
#include <math.h>
#include "pgstat.h"
#include "access/xlog.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "utils/fmgrprotos.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
/* Forward declaration of the waiters-heap comparator defined later in this file. */
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
void *arg);
/*
* Pointer to the shared WaitLSNState. It stays NULL until WaitLSNShmemInit()
* stores the result of ShmemInitStruct() here.
*/
struct WaitLSNState *waitLSNState = NULL;
/*
 * Compute the number of bytes of shared memory WaitLSNState requires: the
 * fixed part of the struct up to 'procInfos', followed by one
 * WaitLSNProcInfo entry per backend (MaxBackends of them).
 */
Size
WaitLSNShmemSize(void)
{
    Size        procInfosBytes = mul_size(MaxBackends, sizeof(WaitLSNProcInfo));

    return add_size(offsetof(WaitLSNState, procInfos), procInfosBytes);
}
/*
 * Attach to (or create) the WaitLSNState struct in shared memory.
 *
 * When ShmemInitStruct() reports that the struct already existed, only the
 * global pointer is set.  Otherwise the struct is initialized here: no
 * waiters (minWaitedLSN = PG_UINT64_MAX), an empty waiters heap, and
 * zeroed per-process slots.
 */
void
WaitLSNShmemInit(void)
{
    bool        alreadyExists;

    waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
                                                    WaitLSNShmemSize(),
                                                    &alreadyExists);
    if (alreadyExists)
        return;

    /* First-time setup of the shared state. */
    pg_atomic_init_u64(&waitLSNState->minWaitedLSN, PG_UINT64_MAX);
    pairingheap_initialize(&waitLSNState->waitersHeap, waitlsn_cmp, NULL);
    memset(&waitLSNState->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
}
/*
* Comparison function for waitLSN->waitersHeap heap. Waiting processes are
* ordered by lsn, so that the waiter with smallest lsn is at the top.
*/
static int
waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
{
const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a);
const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b);
if (aproc->waitLSN < bproc->waitLSN)
return 1;
else if (aproc->waitLSN > bproc->waitLSN)
return -1;
else
return 0;
}
/*
* Update waitLSN->minWaitedLSN according to the current state of
* waitLSN->waitersHeap.
*/
static void
updateMinWaitedLSN(void)
{
XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
if (!pairingheap_is_empty(&waitLSNState->waitersHeap))
{
pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap);
minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN;
}
pg_atomic_write_u64(&waitLSNState->minWaitedLSN, minWaitedLSN);
}
/*
 * Register the calling process as a waiter for 'lsn'.
 *
 * Under an exclusive WaitLSNLock, fill in this process's slot in
 * procInfos, insert it into the waiters heap, mark it as being in the heap
 * and refresh minWaitedLSN.
 */
static void
addLSNWaiter(XLogRecPtr lsn)
{
    WaitLSNProcInfo *mySlot = &waitLSNState->procInfos[MyProcNumber];

    LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);

    /* The slot is expected to be free; this catches registering twice. */
    Assert(!mySlot->inHeap);

    mySlot->waitLSN = lsn;
    mySlot->procno = MyProcNumber;

    pairingheap_add(&waitLSNState->waitersHeap, &mySlot->phNode);
    mySlot->inHeap = true;

    /* The new entry may now be the smallest waited-for LSN. */
    updateMinWaitedLSN();

    LWLockRelease(WaitLSNLock);
}
/*
 * Take the calling process out of the heap of LSN waiters, if it is still
 * there.
 *
 * The 'inHeap' flag is examined under an exclusive WaitLSNLock, so the call
 * is a no-op when the entry has already been removed (WaitLSNWakeup()
 * clears the flag for the waiters it removes).
 */
static void
deleteLSNWaiter(void)
{
    WaitLSNProcInfo *mySlot = &waitLSNState->procInfos[MyProcNumber];

    LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);

    if (mySlot->inHeap)
    {
        pairingheap_remove(&waitLSNState->waitersHeap, &mySlot->phNode);
        mySlot->inHeap = false;

        /* The heap's top entry may have changed. */
        updateMinWaitedLSN();
    }

    LWLockRelease(WaitLSNLock);
}
/*
 * Wake up waiters whose target LSN has been replayed.
 *
 * Every waiter with waitLSN <= currentLSN is removed from the heap and has
 * its latch set.  Passing InvalidXLogRecPtr removes and wakes all waiters
 * regardless of their target LSN.
 */
void
WaitLSNWakeup(XLogRecPtr currentLSN)
{
    ProcNumber *toWake = palloc(sizeof(ProcNumber) * MaxBackends);
    int         nToWake = 0;
    int         idx;

    LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);

    /*
     * Pop entries off the top of the heap (smallest LSN first) until we
     * reach one whose LSN has not been replayed yet.  Only the process
     * numbers are collected here; the latches are set after the lock is
     * released, to keep the time spent holding the lock short.
     */
    for (;;)
    {
        pairingheap_node *top;
        WaitLSNProcInfo *waiter;

        if (pairingheap_is_empty(&waitLSNState->waitersHeap))
            break;

        top = pairingheap_first(&waitLSNState->waitersHeap);
        waiter = pairingheap_container(WaitLSNProcInfo, phNode, top);

        /* Stop at the first waiter still ahead of replay, unless waking all. */
        if (!XLogRecPtrIsInvalid(currentLSN) &&
            waiter->waitLSN > currentLSN)
            break;

        toWake[nToWake++] = waiter->procno;
        (void) pairingheap_remove_first(&waitLSNState->waitersHeap);
        waiter->inHeap = false;
    }

    updateMinWaitedLSN();

    LWLockRelease(WaitLSNLock);

    /*
     * Set the latches of the collected processes.  Since this is the
     * time-consuming part, it is done outside of WaitLSNLock.  That is
     * acceptable because procLatch is never freed: the worst outcome is
     * setting the latch of the wrong process, or of no process at all.
     */
    for (idx = 0; idx < nToWake; idx++)
        SetLatch(&GetPGProcByNumber(toWake[idx])->procLatch);

    pfree(toWake);
}
/*
 * Remove the calling process's entry from the waiters heap, if it has one.
 */
void
WaitLSNCleanup(void)
{
    /*
     * Fast path: peek at our 'inHeap' flag without taking the lock.  Only
     * the owning process ever sets the flag to true, so an unlocked read
     * can at worst report a stale "true" (a false positive).  That case is
     * resolved by the recheck deleteLSNWaiter() performs under the lock.
     */
    if (!waitLSNState->procInfos[MyProcNumber].inHeap)
        return;

    deleteLSNWaiter();
}
/*
* Wait using MyLatch until the given LSN is replayed, recovery ends, the
* timeout expires, or the postmaster dies.
*
* targetLSN: the replay LSN to wait for.
* timeout: maximum time to wait, in milliseconds. A value <= 0 means there
* is no time limit (WL_TIMEOUT is never added to the wake events).
*
* Returns WAIT_LSN_RESULT_SUCCESS once the replay LSN has reached targetLSN,
* WAIT_LSN_RESULT_NOT_IN_RECOVERY if recovery is not (or is no longer) in
* progress and the target has not been reached, and WAIT_LSN_RESULT_TIMEOUT
* if the timeout expired first. Postmaster death is reported through
* ereport(FATAL) rather than through the return value.
*/
WaitLSNResult
WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
{
XLogRecPtr currentLSN;
TimestampTz endtime = 0;
int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
/* Shouldn't be called when shmem isn't initialized */
Assert(waitLSNState);
/* Should have a valid proc number (procInfos has MaxBackends entries) */
Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends);
if (!RecoveryInProgress())
{
/*
* Recovery is not in progress. Given that we detected this in the
* very first check, this procedure was most likely called on a
* primary by mistake. However, it's possible that a standby was
* promoted concurrently with the call, after the target LSN had
* already been replayed. So, we still check the last replay LSN
* before reporting an error.
*/
if (targetLSN <= GetXLogReplayRecPtr(NULL))
return WAIT_LSN_RESULT_SUCCESS;
return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
}
else
{
/* If target LSN is already replayed, exit immediately */
if (targetLSN <= GetXLogReplayRecPtr(NULL))
return WAIT_LSN_RESULT_SUCCESS;
}
/*
* Only a positive timeout arms WL_TIMEOUT; 'endtime' is the absolute
* deadline against which the remaining delay is recomputed on each loop
* iteration.
*/
if (timeout > 0)
{
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
wake_events |= WL_TIMEOUT;
}
/*
* Add our process to the pairing heap of waiters. It might happen that
* the target LSN gets replayed before we are added. The check at the
* beginning of the loop below closes that race.
*/
addLSNWaiter(targetLSN);
for (;;)
{
int rc;
/* Stays 0 when no timeout was requested (WL_TIMEOUT is not armed then) */
long delay_ms = 0;
/* Recheck that recovery is still in-progress */
if (!RecoveryInProgress())
{
/*
* Recovery has ended, but recheck whether the target LSN was
* already replayed. See the comment regarding deleteLSNWaiter()
* below.
*/
deleteLSNWaiter();
currentLSN = GetXLogReplayRecPtr(NULL);
if (targetLSN <= currentLSN)
return WAIT_LSN_RESULT_SUCCESS;
return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
}
else
{
/* Check if the waited LSN has been replayed */
currentLSN = GetXLogReplayRecPtr(NULL);
if (targetLSN <= currentLSN)
break;
}
/*
* If the timeout value is specified, calculate the number of
* milliseconds before the timeout. Exit if the timeout is already
* reached.
*/
if (timeout > 0)
{
delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
if (delay_ms <= 0)
break;
}
/*
* NOTE(review): if an interrupt is raised here we are still in the
* waiters heap; presumably WaitLSNCleanup() is invoked from error/exit
* handling elsewhere -- confirm.
*/
CHECK_FOR_INTERRUPTS();
rc = WaitLatch(MyLatch, wake_events, delay_ms,
WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
*/
if (rc & WL_POSTMASTER_DEATH)
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating connection due to unexpected postmaster exit"),
errcontext("while waiting for LSN replay")));
/* Reset the latch; the next iteration re-evaluates all exit conditions */
if (rc & WL_LATCH_SET)
ResetLatch(MyLatch);
}
/*
* Delete our process from the shared memory pairing heap. We might
* already have been deleted by the process that called WaitLSNWakeup().
* The 'inHeap' flag protects us from a double deletion.
*/
deleteLSNWaiter();
/*
* If we didn't reach the target LSN, we must have left the loop because
* the timeout expired.
*/
if (targetLSN > currentLSN)
return WAIT_LSN_RESULT_TIMEOUT;
return WAIT_LSN_RESULT_SUCCESS;
}
|