1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
|
/*-------------------------------------------------------------------------
*
* bulk_write.c
* Efficiently and reliably populate a new relation
*
* The assumption is that no other backends access the relation while we are
* loading it, so we can take some shortcuts. Pages already present in the
* indicated fork when the bulk write operation is started are not modified
* unless explicitly written to. Do not mix operations through the regular
* buffer manager and the bulk loading interface!
*
* We bypass the buffer manager to avoid the locking overhead, and call
* smgrextend() directly. A downside is that the pages will need to be
* re-read into shared buffers on first use after the build finishes. That's
* usually a good tradeoff for large relations, and for small relations, the
* overhead isn't very significant compared to creating the relation in the
* first place.
*
* The pages are WAL-logged if needed. To save on WAL header overhead, we
* WAL-log several pages in one record.
*
* One tricky point is that because we bypass the buffer manager, we need to
* register the relation for fsyncing at the next checkpoint ourselves, and
* make sure that the relation is correctly fsync'd by us or the checkpointer
* even if a checkpoint happens concurrently.
*
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/storage/smgr/bulk_write.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/xloginsert.h"
#include "access/xlogrecord.h"
#include "storage/bufpage.h"
#include "storage/bulk_write.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/rel.h"
/*
 * Maximum number of page writes that are queued up before they get flushed.
 * This sizes BulkWriteState.pending_writes as well as the per-batch block
 * and page arrays handed to log_newpages(), and smgr_bulk_write() triggers a
 * flush as soon as the queue reaches this length.
 *
 * NOTE(review): it is tied to XLR_MAX_BLOCK_ID presumably so that one batch
 * always fits in a single WAL record -- confirm against access/xlogrecord.h.
 */
#define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
/*
 * An all-zero page.  smgr_bulk_flush() writes it into the gap when a queued
 * block lies beyond the current end of the relation fork.
 */
static const PGIOAlignedBlock zero_buffer = {0}; /* worth BLCKSZ */
/*
 * One queued page write: the page image, the block it is destined for, and
 * the layout flag the caller supplied with it.
 */
typedef struct PendingWrite
{
/* page contents; handed over by the caller and freed after being written */
BulkWriteBuffer buf;
/* target block number within the fork */
BlockNumber blkno;
/* caller-supplied flag, passed on to log_newpages() when WAL-logging */
bool page_std;
} PendingWrite;
/*
 * Bulk writer state for one relation fork.
 *
 * Allocated by smgr_bulk_start_smgr() and passed to every other function of
 * the bulk write interface.
 */
struct BulkWriteState
{
/* Information about the target relation we're writing */
SMgrRelation smgr;
ForkNumber forknum;
/* true if the pages are WAL-logged as they are flushed */
bool use_wal;
/*
 * We keep several writes queued, and WAL-log them in batches.  'npending'
 * is the number of valid entries at the start of 'pending_writes'.
 */
int npending;
PendingWrite pending_writes[MAX_PENDING_WRITES];
/*
 * Current size of the relation, in blocks.  Advanced by smgr_bulk_flush()
 * every time it extends the fork.
 */
BlockNumber relsize;
/*
 * The RedoRecPtr at the time that the bulk operation started.
 * smgr_bulk_finish() compares it with the current value to find out
 * whether a checkpoint happened in the meantime.
 */
XLogRecPtr start_RedoRecPtr;
/* memory context that smgr_bulk_get_buf() allocates page buffers in */
MemoryContext memcxt;
};
/* Forward declaration: smgr_bulk_finish() calls this before its definition. */
static void smgr_bulk_flush(BulkWriteState *bulkstate);
/*
 * Begin a bulk write operation on one fork of a relation, given its
 * relcache entry.
 *
 * The storage manager handle is looked up from 'rel'.  Pages will be
 * WAL-logged if the relation needs WAL, and always when writing the init
 * fork, which is handled like a permanent relation even when it belongs to
 * an unlogged one.
 *
 * Returns the state object to pass to the other smgr_bulk_* functions.
 */
BulkWriteState *
smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
{
	SMgrRelation smgr = RelationGetSmgr(rel);
	bool		use_wal = RelationNeedsWAL(rel) || forknum == INIT_FORKNUM;

	return smgr_bulk_start_smgr(smgr, forknum, use_wal);
}
/*
 * Begin a bulk write operation on one fork of a relation.
 *
 * Same as smgr_bulk_start_rel, except that the caller supplies the
 * SMgrRelation and the WAL-logging decision directly, so no relcache entry
 * is required.
 *
 * Returns a freshly palloc'd state object with an empty write queue.
 */
BulkWriteState *
smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
{
	BulkWriteState *bulkstate = palloc(sizeof(*bulkstate));

	/* Which fork we are loading, and whether its pages get WAL-logged */
	bulkstate->smgr = smgr;
	bulkstate->forknum = forknum;
	bulkstate->use_wal = use_wal;

	/* Nothing is queued yet; start from the fork's current length */
	bulkstate->npending = 0;
	bulkstate->relsize = smgrnblocks(smgr, forknum);

	/*
	 * Note the redo pointer as of now.  smgr_bulk_finish() compares it with
	 * the value at that time to tell whether a checkpoint has happened in
	 * between.
	 */
	bulkstate->start_RedoRecPtr = GetRedoRecPtr();

	/*
	 * Page buffers handed out later by smgr_bulk_get_buf() are all allocated
	 * in the memory context that is current right now, so remember it.
	 */
	bulkstate->memcxt = CurrentMemoryContext;

	return bulkstate;
}
/*
 * Complete a bulk write operation.
 *
 * Whatever is still sitting in the write queue is WAL-logged (when
 * applicable) and written out.  After that the relation fork is either
 * fsync'd immediately, registered to be synced by the checkpointer, or left
 * alone, depending on what kind of relation it is.
 */
void
smgr_bulk_finish(BulkWriteState *bulkstate)
{
	bool		checkpoint_intervened;

	/* Push out the remaining queued pages, WAL-logging them first if needed */
	smgr_bulk_flush(bulkstate);

	/*
	 * What is left is to fsync the relation, or to register it for the next
	 * checkpoint, where that is necessary.
	 */

	/* Temporary relations don't need to be fsync'd, ever */
	if (SmgrIsTemp(bulkstate->smgr))
		return;

	if (!bulkstate->use_wal)
	{
		/*----------
		 * No WAL was written, so this is one of two cases: an unlogged
		 * relation, or a permanent relation whose WAL-logging was skipped
		 * because wal_level=minimal.
		 *
		 * A) Unlogged relation
		 *
		 *	  An unlogged relation disappears on crash, but it still has to
		 *	  be fsync'd on a clean shutdown.  Calling smgrregistersync() is
		 *	  enough for that: it makes sure the checkpointer flushes the
		 *	  relation at the shutdown checkpoint.  (The next online
		 *	  checkpoint will flush it as well, which is not strictly
		 *	  necessary.)
		 *
		 *	  The init-fork of an unlogged relation does not count as
		 *	  unlogged for our purposes.  It is handled like a regular
		 *	  permanent relation, and callers pass use_wal=true for it.
		 *
		 * B) Permanent relation, WAL-logging skipped because
		 *	  wal_level=minimal
		 *
		 *	  The relation is new and its pages were not WAL-logged as we
		 *	  wrote them, yet they must be fsync'd before commit.
		 *
		 *	  That does not have to happen here, though: the fsync() is
		 *	  performed at commit by smgrDoPendingSyncs() (*).
		 *
		 *	  (*) For a very small relation, smgrDoPendingSyncs() might
		 *	  choose to WAL-log the whole relation at commit rather than
		 *	  fsyncing it, but either way that is smgrDoPendingSyncs()'s
		 *	  responsibility.
		 *
		 * The two cases cannot be told apart from here, so be conservative
		 * and treat the relation as unlogged.  For a permanent relation with
		 * wal_level=minimal no action would have been required, see above.
		 */
		smgrregistersync(bulkstate->smgr, bulkstate->forknum);
		return;
	}

	/*
	 * Permanent relation, WAL-logged normally.
	 *
	 * Every page has already been WAL-logged, so a crash would have them
	 * replayed from WAL.  The pages were written with skipFsync=true,
	 * however, to spare us the cost of registering each individual write
	 * with the checkpointer.  Register the relation as a whole now instead.
	 *
	 * That scheme has one hole: a checkpoint that ran while we were writing
	 * has already missed fsyncing the pages we wrote before it started.  A
	 * later crash would replay WAL only from that checkpoint onwards, and
	 * hence would not replay our earlier WAL records.  Therefore, if a
	 * checkpoint started after the bulk write did, fsync the files right
	 * here.
	 */

	/*
	 * Hold off the start of any checkpoint between the GetRedoRecPtr() call
	 * and the smgrregistersync() call.
	 */
	Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
	MyProc->delayChkptFlags |= DELAY_CHKPT_START;

	checkpoint_intervened = (bulkstate->start_RedoRecPtr != GetRedoRecPtr());

	if (!checkpoint_intervened)
	{
		/* No checkpoint since we started; let the next one sync the fork */
		smgrregistersync(bulkstate->smgr, bulkstate->forknum);
		MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
		return;
	}

	/*
	 * A checkpoint took place without knowing about our writes, so we have
	 * to fsync() the relation ourselves.  The delay flag is dropped first,
	 * before doing the sync.
	 */
	MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
	smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
	elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
}
static int
buffer_cmp(const void *a, const void *b)
{
const PendingWrite *bufa = (const PendingWrite *) a;
const PendingWrite *bufb = (const PendingWrite *) b;
/* We should not see duplicated writes for the same block */
Assert(bufa->blkno != bufb->blkno);
if (bufa->blkno > bufb->blkno)
return 1;
else
return -1;
}
/*
 * Write out everything in the pending queue, and empty it.
 *
 * The queued pages are sorted by block number, WAL-logged together in one
 * log_newpages() call when use_wal is set, and then written to the fork one
 * by one.  Each page buffer is freed right after it has been written.
 */
static void
smgr_bulk_flush(BulkWriteState *bulkstate)
{
	PendingWrite *queue = bulkstate->pending_writes;
	int			nqueued = bulkstate->npending;

	/* An empty queue means there is nothing to do */
	if (nqueued == 0)
		return;

	/* Handle the pages in ascending block number order */
	if (nqueued > 1)
		qsort(queue, nqueued, sizeof(PendingWrite), buffer_cmp);

	if (bulkstate->use_wal)
	{
		BlockNumber blknos[MAX_PENDING_WRITES];
		Page		pages[MAX_PENDING_WRITES];
		bool		all_std = true;

		/*
		 * Collect block numbers and page pointers for the whole batch.
		 *
		 * As soon as one page is flagged !page_std, the entire batch is
		 * logged that way.  This is somewhat wasteful, but mixing standard
		 * and non-standard page layouts is rare in practice, and none of the
		 * built-in AMs do it.
		 */
		for (int i = 0; i < nqueued; i++)
		{
			blknos[i] = queue[i].blkno;
			pages[i] = queue[i].buf->data;
			all_std = all_std && queue[i].page_std;
		}

		log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
					 nqueued, blknos, pages, all_std);
	}

	for (PendingWrite *w = queue; w < queue + nqueued; w++)
	{
		BlockNumber blkno = w->blkno;
		Page		page = w->buf->data;

		PageSetChecksumInplace(page, blkno);

		if (blkno < bulkstate->relsize)
		{
			/* The block already exists in the fork, so overwrite it */
			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
		}
		else
		{
			/*
			 * When pages are written nonsequentially, pad the gap with
			 * zeroes until we come back and overwrite it.  Standard Unix
			 * filesystems do not logically need this (unwritten space reads
			 * back as zeroes anyway), but it should help to avoid
			 * fragmentation.  These dummy pages are not WAL-logged, and no
			 * checksum is set for an all-zero page.
			 */
			for (; bulkstate->relsize < blkno; bulkstate->relsize++)
			{
				smgrextend(bulkstate->smgr, bulkstate->forknum,
						   bulkstate->relsize,
						   &zero_buffer,
						   true);
			}

			/* Now append the real page at the end of the fork */
			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
			bulkstate->relsize++;
		}

		/* The queue owned this buffer; it is no longer needed */
		pfree(page);
	}

	bulkstate->npending = 0;
}
/*
 * Add 'buf' to the write queue, to be written as block 'blocknum'.
 *
 * NB: ownership of 'buf' passes to the bulk writer!
 *
 * 'page_std' is recorded with the page and used when the batch is
 * WAL-logged.  Within one bulk write operation, any given block may be
 * written only once.
 *
 * The write is normally just queued; once the queue holds
 * MAX_PENDING_WRITES entries it is flushed immediately.
 */
void
smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
{
	int			slot = bulkstate->npending;

	/* Fill in the next free queue entry */
	bulkstate->pending_writes[slot].buf = buf;
	bulkstate->pending_writes[slot].blkno = blocknum;
	bulkstate->pending_writes[slot].page_std = page_std;
	bulkstate->npending = slot + 1;

	/* Queue is full: write the batch out now */
	if (bulkstate->npending == MAX_PENDING_WRITES)
		smgr_bulk_flush(bulkstate);
}
/*
 * Hand out a fresh page buffer for use with smgr_bulk_write().
 *
 * No function exists for freeing the buffer.  Passing it to
 * smgr_bulk_write() transfers ownership, and it is freed there once it is
 * no longer needed.
 *
 * Today this is a plain aligned allocation of BLCKSZ bytes in the memory
 * context remembered when the bulk write was started.  A ring buffer or
 * larger chunks might be used in the future, so callers must not rely on
 * how the buffer is allocated.
 */
BulkWriteBuffer
smgr_bulk_get_buf(BulkWriteState *bulkstate)
{
	MemoryContext cxt = bulkstate->memcxt;
	BulkWriteBuffer newbuf;

	newbuf = MemoryContextAllocAligned(cxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);

	return newbuf;
}
|