aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/heap/heapam.c
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-20 17:56:26 +0200
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-20 18:46:41 +0200
commit2c03216d831160bedd72d45f712601b6f7d03f1c (patch)
treeab6a03d031ffa605d848b0b7067add15e56e2207 /src/backend/access/heap/heapam.c
parent8dc626defec23016dd5988208d8704b858b9d21d (diff)
downloadpostgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.tar.gz
postgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.zip
Revamp the WAL record format.
Each WAL record now carries information about the modified relation and block(s) in a standardized format. That makes it easier to write tools that need that information, like pg_rewind, prefetching the blocks to speed up recovery, etc. There's a whole new API for building WAL records, replacing the XLogRecData chains used previously. The new API consists of XLogRegister* functions, which are called for each buffer and chunk of data that is added to the record. The new API also gives more control over when a full-page image is written, by passing flags to the XLogRegisterBuffer function. This also simplifies the XLogReadBufferForRedo() calls. The function can dig the relation and block number from the WAL record, so they no longer need to be passed as arguments. For the convenience of redo routines, XLogReader now disects each WAL record after reading it, copying the main data part and the per-block data into MAXALIGNed buffers. The data chunks are not aligned within the WAL record, but the redo routines can assume that the pointers returned by XLogRecGet* functions are. Redo routines are now passed the XLogReaderState, which contains the record in the already-disected format, instead of the plain XLogRecord. The new record format also makes the fixed size XLogRecord header smaller, by removing the xl_len field. The length of the "main data" portion is now stored at the end of the WAL record, and there's a separate header after XLogRecord for it. The alignment padding at the end of XLogRecord is also removed. This compansates for the fact that the new format would otherwise be more bulky than the old format. Reviewed by Andres Freund, Amit Kapila, Michael Paquier, Alvaro Herrera, Fujii Masao.
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--src/backend/access/heap/heapam.c987
1 files changed, 423 insertions, 564 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1763b70631d..c6e1eb79b2c 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2132,84 +2132,64 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
xl_heap_insert xlrec;
xl_heap_header xlhdr;
XLogRecPtr recptr;
- XLogRecData rdata[4];
Page page = BufferGetPage(buffer);
uint8 info = XLOG_HEAP_INSERT;
- bool need_tuple_data;
+ int bufflags = 0;
/*
- * For logical decoding, we need the tuple even if we're doing a full
- * page write, so make sure to log it separately. (XXX We could
- * alternatively store a pointer into the FPW).
- *
- * Also, if this is a catalog, we need to transmit combocids to
- * properly decode, so log that as well.
+ * If this is a catalog, we need to transmit combocids to properly
+ * decode, so log that as well.
*/
- need_tuple_data = RelationIsLogicallyLogged(relation);
if (RelationIsAccessibleInLogicalDecoding(relation))
log_heap_new_cid(relation, heaptup);
- xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
- xlrec.target.node = relation->rd_node;
- xlrec.target.tid = heaptup->t_self;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapInsert;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
- xlhdr.t_infomask = heaptup->t_data->t_infomask;
- xlhdr.t_hoff = heaptup->t_data->t_hoff;
-
/*
- * note we mark rdata[1] as belonging to buffer; if XLogInsert decides
- * to write the whole page to the xlog, we don't need to store
- * xl_heap_header in the xlog.
+ * If this is the single and first tuple on page, we can reinit the
+ * page instead of restoring the whole thing. Set flag, and hide
+ * buffer references from XLogInsert.
*/
- rdata[1].data = (char *) &xlhdr;
- rdata[1].len = SizeOfHeapHeader;
- rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = &(rdata[2]);
+ if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
+ PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
+ {
+ info |= XLOG_HEAP_INIT_PAGE;
+ bufflags |= REGBUF_WILL_INIT;
+ }
- /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
- rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
- rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
- rdata[2].buffer = need_tuple_data ? InvalidBuffer : buffer;
- rdata[2].buffer_std = true;
- rdata[2].next = NULL;
+ xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self);
+ xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
+ Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer));
/*
- * Make a separate rdata entry for the tuple's buffer if we're doing
- * logical decoding, so that an eventual FPW doesn't remove the
- * tuple's data.
+ * For logical decoding, we need the tuple even if we're doing a full
+ * page write, so make sure it's included even if we take a full-page
+ * image. (XXX We could alternatively store a pointer into the FPW).
*/
- if (need_tuple_data)
+ if (RelationIsLogicallyLogged(relation))
{
- rdata[2].next = &(rdata[3]);
-
- rdata[3].data = NULL;
- rdata[3].len = 0;
- rdata[3].buffer = buffer;
- rdata[3].buffer_std = true;
- rdata[3].next = NULL;
-
xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+ bufflags |= REGBUF_KEEP_DATA;
}
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapInsert);
+
+ xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
+ xlhdr.t_infomask = heaptup->t_data->t_infomask;
+ xlhdr.t_hoff = heaptup->t_data->t_hoff;
+
/*
- * If this is the single and first tuple on page, we can reinit the
- * page instead of restoring the whole thing. Set flag, and hide
- * buffer references from XLogInsert.
+ * note we mark xlhdr as belonging to buffer; if XLogInsert decides to
+ * write the whole page to the xlog, we don't need to store
+ * xl_heap_header in the xlog.
*/
- if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
- PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
- {
- info |= XLOG_HEAP_INIT_PAGE;
- rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
- }
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
+ XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
+ /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
+ XLogRegisterBufData(0,
+ (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+ heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits));
- recptr = XLogInsert(RM_HEAP_ID, info, rdata);
+ recptr = XLogInsert(RM_HEAP_ID, info);
PageSetLSN(page, recptr);
}
@@ -2397,6 +2377,13 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
break;
RelationPutHeapTuple(relation, buffer, heaptup);
+
+ /*
+ * We don't use heap_multi_insert for catalog tuples yet, but
+ * better be prepared...
+ */
+ if (needwal && need_cids)
+ log_heap_new_cid(relation, heaptup);
}
if (PageIsAllVisible(page))
@@ -2419,12 +2406,12 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
{
XLogRecPtr recptr;
xl_heap_multi_insert *xlrec;
- XLogRecData rdata[3];
uint8 info = XLOG_HEAP2_MULTI_INSERT;
char *tupledata;
int totaldatalen;
char *scratchptr = scratch;
bool init;
+ int bufflags = 0;
/*
* If the page was previously empty, we can reinit the page
@@ -2450,8 +2437,6 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
tupledata = scratchptr;
xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
- xlrec->node = relation->rd_node;
- xlrec->blkno = BufferGetBlockNumber(buffer);
xlrec->ntuples = nthispage;
/*
@@ -2481,64 +2466,40 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
datalen);
tuphdr->datalen = datalen;
scratchptr += datalen;
-
- /*
- * We don't use heap_multi_insert for catalog tuples yet, but
- * better be prepared...
- */
- if (need_cids)
- log_heap_new_cid(relation, heaptup);
}
totaldatalen = scratchptr - tupledata;
Assert((scratchptr - scratch) < BLCKSZ);
- rdata[0].data = (char *) xlrec;
- rdata[0].len = tupledata - scratch;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &rdata[1];
-
- rdata[1].data = tupledata;
- rdata[1].len = totaldatalen;
- rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
-
- /*
- * Make a separate rdata entry for the tuple's buffer if we're
- * doing logical decoding, so that an eventual FPW doesn't remove
- * the tuple's data.
- */
if (need_tuple_data)
- {
- rdata[1].next = &(rdata[2]);
-
- rdata[2].data = NULL;
- rdata[2].len = 0;
- rdata[2].buffer = buffer;
- rdata[2].buffer_std = true;
- rdata[2].next = NULL;
xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
- }
/*
- * If we're going to reinitialize the whole page using the WAL
- * record, hide buffer reference from XLogInsert.
+ * Signal that this is the last xl_heap_multi_insert record
+ * emitted by this call to heap_multi_insert(). Needed for logical
+ * decoding so it knows when to cleanup temporary data.
*/
+ if (ndone + nthispage == ntuples)
+ xlrec->flags |= XLOG_HEAP_LAST_MULTI_INSERT;
+
if (init)
{
- rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
info |= XLOG_HEAP_INIT_PAGE;
+ bufflags |= REGBUF_WILL_INIT;
}
/*
- * Signal that this is the last xl_heap_multi_insert record
- * emitted by this call to heap_multi_insert(). Needed for logical
- * decoding so it knows when to cleanup temporary data.
+ * If we're doing logical decoding, include the new tuple data
+ * even if we take a full-page image of the page.
*/
- if (ndone + nthispage == ntuples)
- xlrec->flags |= XLOG_HEAP_LAST_MULTI_INSERT;
+ if (need_tuple_data)
+ bufflags |= REGBUF_KEEP_DATA;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) xlrec, tupledata - scratch);
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
- recptr = XLogInsert(RM_HEAP2_ID, info, rdata);
+ XLogRegisterBufData(0, tupledata, totaldatalen);
+ recptr = XLogInsert(RM_HEAP2_ID, info);
PageSetLSN(page, recptr);
}
@@ -2909,7 +2870,6 @@ l1:
{
xl_heap_delete xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[4];
/* For logical decode we need combocids to properly decode the catalog */
if (RelationIsAccessibleInLogicalDecoding(relation))
@@ -2918,19 +2878,21 @@ l1:
xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
tp.t_data->t_infomask2);
- xlrec.target.node = relation->rd_node;
- xlrec.target.tid = tp.t_self;
+ xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
xlrec.xmax = new_xmax;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapDelete;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].buffer = buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ if (old_key_tuple != NULL)
+ {
+ if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+ else
+ xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+ }
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapDelete);
+
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
/*
* Log replica identity of the deleted tuple if there is one
@@ -2943,27 +2905,14 @@ l1:
xlhdr.t_infomask = old_key_tuple->t_data->t_infomask;
xlhdr.t_hoff = old_key_tuple->t_data->t_hoff;
- rdata[1].next = &(rdata[2]);
- rdata[2].data = (char *) &xlhdr;
- rdata[2].len = SizeOfHeapHeader;
- rdata[2].buffer = InvalidBuffer;
- rdata[2].next = NULL;
-
- rdata[2].next = &(rdata[3]);
- rdata[3].data = (char *) old_key_tuple->t_data
- + offsetof(HeapTupleHeaderData, t_bits);
- rdata[3].len = old_key_tuple->t_len
- - offsetof(HeapTupleHeaderData, t_bits);
- rdata[3].buffer = InvalidBuffer;
- rdata[3].next = NULL;
-
- if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
- xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
- else
- xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+ XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader);
+ XLogRegisterData((char *) old_key_tuple->t_data
+ + offsetof(HeapTupleHeaderData, t_bits),
+ old_key_tuple->t_len
+ - offsetof(HeapTupleHeaderData, t_bits));
}
- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
+ recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE);
PageSetLSN(page, recptr);
}
@@ -4735,25 +4684,17 @@ failed:
{
xl_heap_lock xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
- xlrec.target.node = relation->rd_node;
- xlrec.target.tid = tuple->t_self;
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD);
+
+ xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
xlrec.locking_xid = xid;
xlrec.infobits_set = compute_infobits(new_infomask,
tuple->t_data->t_infomask2);
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapLock;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].buffer = *buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ XLogRegisterData((char *) &xlrec, SizeOfHeapLock);
- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK, rdata);
+ recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);
PageSetLSN(page, recptr);
}
@@ -5342,26 +5283,18 @@ l4:
{
xl_heap_lock_updated xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
Page page = BufferGetPage(buf);
- xlrec.target.node = rel->rd_node;
- xlrec.target.tid = mytup.t_self;
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+
+ xlrec.offnum = ItemPointerGetOffsetNumber(&mytup.t_self);
xlrec.xmax = new_xmax;
xlrec.infobits_set = compute_infobits(new_infomask, new_infomask2);
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapLockUpdated;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogRegisterData((char *) &xlrec, SizeOfHeapLockUpdated);
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].buffer = buf;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
-
- recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_LOCK_UPDATED, rdata);
+ recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_LOCK_UPDATED);
PageSetLSN(page, recptr);
}
@@ -5489,23 +5422,16 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
{
xl_heap_inplace xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
- xlrec.target.node = relation->rd_node;
- xlrec.target.tid = tuple->t_self;
+ xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapInplace;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapInplace);
- rdata[1].data = (char *) htup + htup->t_hoff;
- rdata[1].len = newlen;
- rdata[1].buffer = buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+ XLogRegisterBufData(0, (char *) htup + htup->t_hoff, newlen);
- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE, rdata);
+ recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE);
PageSetLSN(page, recptr);
}
@@ -6507,17 +6433,14 @@ log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid)
{
xl_heap_cleanup_info xlrec;
XLogRecPtr recptr;
- XLogRecData rdata;
xlrec.node = rnode;
xlrec.latestRemovedXid = latestRemovedXid;
- rdata.data = (char *) &xlrec;
- rdata.len = SizeOfHeapCleanupInfo;
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapCleanupInfo);
- recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_CLEANUP_INFO, &rdata);
+ recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_CLEANUP_INFO);
return recptr;
}
@@ -6542,23 +6465,19 @@ log_heap_clean(Relation reln, Buffer buffer,
TransactionId latestRemovedXid)
{
xl_heap_clean xlrec;
- uint8 info;
XLogRecPtr recptr;
- XLogRecData rdata[4];
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
- xlrec.node = reln->rd_node;
- xlrec.block = BufferGetBlockNumber(buffer);
xlrec.latestRemovedXid = latestRemovedXid;
xlrec.nredirected = nredirected;
xlrec.ndead = ndead;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapClean;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapClean);
+
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
/*
* The OffsetNumber arrays are not actually in the buffer, but we pretend
@@ -6569,49 +6488,18 @@ log_heap_clean(Relation reln, Buffer buffer,
* even if no item pointers changed state.
*/
if (nredirected > 0)
- {
- rdata[1].data = (char *) redirected;
- rdata[1].len = nredirected * sizeof(OffsetNumber) * 2;
- }
- else
- {
- rdata[1].data = NULL;
- rdata[1].len = 0;
- }
- rdata[1].buffer = buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = &(rdata[2]);
+ XLogRegisterBufData(0, (char *) redirected,
+ nredirected * sizeof(OffsetNumber) * 2);
if (ndead > 0)
- {
- rdata[2].data = (char *) nowdead;
- rdata[2].len = ndead * sizeof(OffsetNumber);
- }
- else
- {
- rdata[2].data = NULL;
- rdata[2].len = 0;
- }
- rdata[2].buffer = buffer;
- rdata[2].buffer_std = true;
- rdata[2].next = &(rdata[3]);
+ XLogRegisterBufData(0, (char *) nowdead,
+ ndead * sizeof(OffsetNumber));
if (nunused > 0)
- {
- rdata[3].data = (char *) nowunused;
- rdata[3].len = nunused * sizeof(OffsetNumber);
- }
- else
- {
- rdata[3].data = NULL;
- rdata[3].len = 0;
- }
- rdata[3].buffer = buffer;
- rdata[3].buffer_std = true;
- rdata[3].next = NULL;
+ XLogRegisterBufData(0, (char *) nowunused,
+ nunused * sizeof(OffsetNumber));
- info = XLOG_HEAP2_CLEAN;
- recptr = XLogInsert(RM_HEAP2_ID, info, rdata);
+ recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_CLEAN);
return recptr;
}
@@ -6626,35 +6514,28 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
{
xl_heap_freeze_page xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
/* nor when there are no tuples to freeze */
Assert(ntuples > 0);
- xlrec.node = reln->rd_node;
- xlrec.block = BufferGetBlockNumber(buffer);
xlrec.cutoff_xid = cutoff_xid;
xlrec.ntuples = ntuples;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapFreezePage;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapFreezePage);
/*
* The freeze plan array is not actually in the buffer, but pretend that
* it is. When XLogInsert stores the whole buffer, the freeze plan need
* not be stored too.
*/
- rdata[1].data = (char *) tuples;
- rdata[1].len = ntuples * sizeof(xl_heap_freeze_tuple);
- rdata[1].buffer = buffer;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+ XLogRegisterBufData(0, (char *) tuples,
+ ntuples * sizeof(xl_heap_freeze_tuple));
- recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE_PAGE, rdata);
+ recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE_PAGE);
return recptr;
}
@@ -6665,8 +6546,8 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
* corresponding visibility map block. Both should have already been modified
* and dirtied.
*
- * If checksums are enabled, we also add the heap_buffer to the chain to
- * protect it from being torn.
+ * If checksums are enabled, we also generate a full-page image of
+ * heap_buffer, if necessary.
*/
XLogRecPtr
log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
@@ -6674,38 +6555,23 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
{
xl_heap_visible xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[3];
+ uint8 flags;
Assert(BufferIsValid(heap_buffer));
Assert(BufferIsValid(vm_buffer));
- xlrec.node = rnode;
- xlrec.block = BufferGetBlockNumber(heap_buffer);
xlrec.cutoff_xid = cutoff_xid;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapVisible);
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapVisible;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogRegisterBuffer(0, vm_buffer, 0);
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].buffer = vm_buffer;
- rdata[1].buffer_std = false;
- rdata[1].next = NULL;
+ flags = REGBUF_STANDARD;
+ if (!XLogHintBitIsNeeded())
+ flags |= REGBUF_NO_IMAGE;
+ XLogRegisterBuffer(1, heap_buffer, flags);
- if (XLogHintBitIsNeeded())
- {
- rdata[1].next = &(rdata[2]);
-
- rdata[2].data = NULL;
- rdata[2].len = 0;
- rdata[2].buffer = heap_buffer;
- rdata[2].buffer_std = true;
- rdata[2].next = NULL;
- }
-
- recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VISIBLE, rdata);
+ recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VISIBLE);
return recptr;
}
@@ -6721,22 +6587,23 @@ log_heap_update(Relation reln, Buffer oldbuf,
bool all_visible_cleared, bool new_all_visible_cleared)
{
xl_heap_update xlrec;
- xl_heap_header_len xlhdr;
- xl_heap_header_len xlhdr_idx;
+ xl_heap_header xlhdr;
+ xl_heap_header xlhdr_idx;
uint8 info;
uint16 prefix_suffix[2];
uint16 prefixlen = 0,
suffixlen = 0;
XLogRecPtr recptr;
- XLogRecData rdata[9];
Page page = BufferGetPage(newbuf);
bool need_tuple_data = RelationIsLogicallyLogged(reln);
- int nr;
- Buffer newbufref;
+ bool init;
+ int bufflags;
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
+ XLogBeginInsert();
+
if (HeapTupleIsHeapOnly(newtup))
info = XLOG_HEAP_HOT_UPDATE;
else
@@ -6794,103 +6661,97 @@ log_heap_update(Relation reln, Buffer oldbuf,
suffixlen = 0;
}
- xlrec.target.node = reln->rd_node;
- xlrec.target.tid = oldtup->t_self;
- xlrec.old_xmax = HeapTupleHeaderGetRawXmax(oldtup->t_data);
- xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask,
- oldtup->t_data->t_infomask2);
- xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
+ /* Prepare main WAL data chain */
xlrec.flags = 0;
if (all_visible_cleared)
xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED;
- xlrec.newtid = newtup->t_self;
if (new_all_visible_cleared)
xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED;
if (prefixlen > 0)
xlrec.flags |= XLOG_HEAP_PREFIX_FROM_OLD;
if (suffixlen > 0)
xlrec.flags |= XLOG_HEAP_SUFFIX_FROM_OLD;
+ if (need_tuple_data)
+ {
+ xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+ if (old_key_tuple)
+ {
+ if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+ else
+ xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+ }
+ }
/* If new tuple is the single and first tuple on page... */
if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
info |= XLOG_HEAP_INIT_PAGE;
- newbufref = InvalidBuffer;
+ init = true;
}
else
- newbufref = newbuf;
+ init = false;
- rdata[0].data = NULL;
- rdata[0].len = 0;
- rdata[0].buffer = oldbuf;
- rdata[0].buffer_std = true;
- rdata[0].next = &(rdata[1]);
+ /* Prepare WAL data for the old page */
+ xlrec.old_offnum = ItemPointerGetOffsetNumber(&oldtup->t_self);
+ xlrec.old_xmax = HeapTupleHeaderGetRawXmax(oldtup->t_data);
+ xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask,
+ oldtup->t_data->t_infomask2);
+
+ /* Prepare WAL data for the new page */
+ xlrec.new_offnum = ItemPointerGetOffsetNumber(&newtup->t_self);
+ xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
+
+ bufflags = REGBUF_STANDARD;
+ if (init)
+ bufflags |= REGBUF_WILL_INIT;
+ if (need_tuple_data)
+ bufflags |= REGBUF_KEEP_DATA;
- rdata[1].data = (char *) &xlrec;
- rdata[1].len = SizeOfHeapUpdate;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = &(rdata[2]);
+ XLogRegisterBuffer(0, newbuf, bufflags);
+ if (oldbuf != newbuf)
+ XLogRegisterBuffer(1, oldbuf, REGBUF_STANDARD);
- /* prefix and/or suffix length fields */
+ XLogRegisterData((char *) &xlrec, SizeOfHeapUpdate);
+
+ /*
+ * Prepare WAL data for the new tuple.
+ */
if (prefixlen > 0 || suffixlen > 0)
{
if (prefixlen > 0 && suffixlen > 0)
{
prefix_suffix[0] = prefixlen;
prefix_suffix[1] = suffixlen;
- rdata[2].data = (char *) &prefix_suffix;
- rdata[2].len = 2 * sizeof(uint16);
+ XLogRegisterBufData(0, (char *) &prefix_suffix, sizeof(uint16) * 2);
}
else if (prefixlen > 0)
{
- rdata[2].data = (char *) &prefixlen;
- rdata[2].len = sizeof(uint16);
+ XLogRegisterBufData(0, (char *) &prefixlen, sizeof(uint16));
}
else
{
- rdata[2].data = (char *) &suffixlen;
- rdata[2].len = sizeof(uint16);
+ XLogRegisterBufData(0, (char *) &suffixlen, sizeof(uint16));
}
- rdata[2].buffer = newbufref;
- rdata[2].buffer_std = true;
- rdata[2].next = &(rdata[3]);
- nr = 3;
}
- else
- nr = 2;
-
- xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2;
- xlhdr.header.t_infomask = newtup->t_data->t_infomask;
- xlhdr.header.t_hoff = newtup->t_data->t_hoff;
- Assert(offsetof(HeapTupleHeaderData, t_bits) +prefixlen + suffixlen <= newtup->t_len);
- xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) -prefixlen - suffixlen;
- /*
- * As with insert records, we need not store this rdata segment if we
- * decide to store the whole buffer instead, unless we're doing logical
- * decoding.
- */
- rdata[nr].data = (char *) &xlhdr;
- rdata[nr].len = SizeOfHeapHeaderLen;
- rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
- rdata[nr].buffer_std = true;
- rdata[nr].next = &(rdata[nr + 1]);
- nr++;
+ xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
+ xlhdr.t_infomask = newtup->t_data->t_infomask;
+ xlhdr.t_hoff = newtup->t_data->t_hoff;
+ Assert(offsetof(HeapTupleHeaderData, t_bits) + prefixlen + suffixlen <= newtup->t_len);
/*
* PG73FORMAT: write bitmap [+ padding] [+ oid] + data
*
* The 'data' doesn't include the common prefix or suffix.
*/
+ XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
if (prefixlen == 0)
{
- rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
- rdata[nr].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) -suffixlen;
- rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
- rdata[nr].buffer_std = true;
- rdata[nr].next = NULL;
- nr++;
+ XLogRegisterBufData(0,
+ ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits),
+ newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) -suffixlen);
}
else
{
@@ -6901,75 +6762,33 @@ log_heap_update(Relation reln, Buffer oldbuf,
/* bitmap [+ padding] [+ oid] */
if (newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits) >0)
{
- rdata[nr - 1].next = &(rdata[nr]);
- rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
- rdata[nr].len = newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits);
- rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
- rdata[nr].buffer_std = true;
- rdata[nr].next = NULL;
- nr++;
+ XLogRegisterBufData(0,
+ ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits),
+ newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits));
}
/* data after common prefix */
- rdata[nr - 1].next = &(rdata[nr]);
- rdata[nr].data = ((char *) newtup->t_data) + newtup->t_data->t_hoff + prefixlen;
- rdata[nr].len = newtup->t_len - newtup->t_data->t_hoff - prefixlen - suffixlen;
- rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
- rdata[nr].buffer_std = true;
- rdata[nr].next = NULL;
- nr++;
+ XLogRegisterBufData(0,
+ ((char *) newtup->t_data) + newtup->t_data->t_hoff + prefixlen,
+ newtup->t_len - newtup->t_data->t_hoff - prefixlen - suffixlen);
}
- /*
- * Separate storage for the FPW buffer reference of the new page in the
- * wal_level >= logical case.
- */
- if (need_tuple_data)
+ /* We need to log a tuple identity */
+ if (need_tuple_data && old_key_tuple)
{
- rdata[nr - 1].next = &(rdata[nr]);
-
- rdata[nr].data = NULL,
- rdata[nr].len = 0;
- rdata[nr].buffer = newbufref;
- rdata[nr].buffer_std = true;
- rdata[nr].next = NULL;
- nr++;
-
- xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+ /* don't really need this, but its more comfy to decode */
+ xlhdr_idx.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+ xlhdr_idx.t_infomask = old_key_tuple->t_data->t_infomask;
+ xlhdr_idx.t_hoff = old_key_tuple->t_data->t_hoff;
- /* We need to log a tuple identity */
- if (old_key_tuple)
- {
- /* don't really need this, but its more comfy to decode */
- xlhdr_idx.header.t_infomask2 = old_key_tuple->t_data->t_infomask2;
- xlhdr_idx.header.t_infomask = old_key_tuple->t_data->t_infomask;
- xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff;
- xlhdr_idx.t_len = old_key_tuple->t_len;
-
- rdata[nr - 1].next = &(rdata[nr]);
- rdata[nr].data = (char *) &xlhdr_idx;
- rdata[nr].len = SizeOfHeapHeaderLen;
- rdata[nr].buffer = InvalidBuffer;
- rdata[nr].next = &(rdata[nr + 1]);
- nr++;
-
- /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
- rdata[nr].data = (char *) old_key_tuple->t_data
- + offsetof(HeapTupleHeaderData, t_bits);
- rdata[nr].len = old_key_tuple->t_len
- - offsetof(HeapTupleHeaderData, t_bits);
- rdata[nr].buffer = InvalidBuffer;
- rdata[nr].next = NULL;
- nr++;
+ XLogRegisterData((char *) &xlhdr_idx, SizeOfHeapHeader);
- if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
- xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
- else
- xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
- }
+ /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
+ XLogRegisterData((char *) old_key_tuple->t_data + offsetof(HeapTupleHeaderData, t_bits),
+ old_key_tuple->t_len - offsetof(HeapTupleHeaderData, t_bits));
}
- recptr = XLogInsert(RM_HEAP_ID, info, rdata);
+ recptr = XLogInsert(RM_HEAP_ID, info);
return recptr;
}
@@ -6986,15 +6805,14 @@ log_heap_new_cid(Relation relation, HeapTuple tup)
xl_heap_new_cid xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[1];
HeapTupleHeader hdr = tup->t_data;
Assert(ItemPointerIsValid(&tup->t_self));
Assert(tup->t_tableOid != InvalidOid);
xlrec.top_xid = GetTopTransactionId();
- xlrec.target.node = relation->rd_node;
- xlrec.target.tid = tup->t_self;
+ xlrec.target_node = relation->rd_node;
+ xlrec.target_tid = tup->t_self;
/*
* If the tuple got inserted & deleted in the same TX we definitely have a
@@ -7035,12 +6853,15 @@ log_heap_new_cid(Relation relation, HeapTuple tup)
xlrec.combocid = InvalidCommandId;
}
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapNewCid;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = NULL;
+ /*
+ * Note that we don't need to register the buffer here, because this
+ * operation does not modify the page. The insert/update/delete that
+ * called us certainly did, but that's WAL-logged separately.
+ */
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapNewCid);
- recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID, rdata);
+ recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID);
return recptr;
}
@@ -7165,7 +6986,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *
* Handles CLEANUP_INFO
*/
static void
-heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_cleanup_info(XLogReaderState *record)
{
xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
@@ -7179,15 +7000,16 @@ heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record)
*/
/* Backup blocks are not used in cleanup_info records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ Assert(!XLogRecHasAnyBlockRefs(record));
}
/*
* Handles HEAP2_CLEAN record type
*/
static void
-heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_clean(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record);
Buffer buffer;
Size freespace = 0;
@@ -7195,8 +7017,7 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
BlockNumber blkno;
XLogRedoAction action;
- rnode = xlrec->node;
- blkno = xlrec->block;
+ XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
/*
* We're about to remove tuples. In Hot Standby mode, ensure that there's
@@ -7213,9 +7034,8 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
* If we have a full-page image, restore it (using a cleanup lock) and
* we're done.
*/
- action = XLogReadBufferForRedoExtended(lsn, record, 0,
- rnode, MAIN_FORKNUM, blkno,
- RBM_NORMAL, true, &buffer);
+ action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true,
+ &buffer);
if (action == BLK_NEEDS_REDO)
{
Page page = (Page) BufferGetPage(buffer);
@@ -7226,11 +7046,13 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
int nredirected;
int ndead;
int nunused;
+ Size datalen;
+
+ redirected = (OffsetNumber *) XLogRecGetBlockData(record, 0, &datalen);
nredirected = xlrec->nredirected;
ndead = xlrec->ndead;
- end = (OffsetNumber *) ((char *) xlrec + record->xl_len);
- redirected = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
+ end = (OffsetNumber *) ((char *) redirected + datalen);
nowdead = redirected + (nredirected * 2);
nowunused = nowdead + ndead;
nunused = (end - nowunused);
@@ -7263,7 +7085,7 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
* totally accurate anyway.
*/
if (action == BLK_NEEDS_REDO)
- XLogRecordPageWithFreeSpace(xlrec->node, xlrec->block, freespace);
+ XLogRecordPageWithFreeSpace(rnode, blkno, freespace);
}
/*
@@ -7275,17 +7097,18 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
* page modification would fail to clear the visibility map bit.
*/
static void
-heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_visible(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
+ Buffer vmbuffer = InvalidBuffer;
Buffer buffer;
Page page;
RelFileNode rnode;
BlockNumber blkno;
XLogRedoAction action;
- rnode = xlrec->node;
- blkno = xlrec->block;
+ XLogRecGetBlockTag(record, 1, &rnode, NULL, &blkno);
/*
* If there are any Hot Standby transactions running that have an xmin
@@ -7304,7 +7127,7 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
* truncated later in recovery, we don't need to update the page, but we'd
* better still update the visibility map.
*/
- action = XLogReadBufferForRedo(lsn, record, 1, rnode, blkno, &buffer);
+ action = XLogReadBufferForRedo(record, 1, &buffer);
if (action == BLK_NEEDS_REDO)
{
/*
@@ -7341,12 +7164,21 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
* the visibility map bit does so before checking the page LSN, so any
* bits that need to be cleared will still be cleared.
*/
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- else
+ if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false,
+ &vmbuffer) == BLK_NEEDS_REDO)
{
+ Page vmpage = BufferGetPage(vmbuffer);
Relation reln;
- Buffer vmbuffer = InvalidBuffer;
+
+ /* initialize the page if it was read as zeros */
+ if (PageIsNew(vmpage))
+ PageInit(vmpage, BLCKSZ, 0);
+
+ /*
+ * XLogReplayBufferExtended locked the buffer. But visibilitymap_set
+ * will handle locking itself.
+ */
+ LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK);
reln = CreateFakeRelcacheEntry(rnode);
visibilitymap_pin(reln, blkno, &vmbuffer);
@@ -7362,25 +7194,27 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
* we did for the heap page. If this results in a dropped bit, no
* real harm is done; and the next VACUUM will fix it.
*/
- if (lsn > PageGetLSN(BufferGetPage(vmbuffer)))
+ if (lsn > PageGetLSN(vmpage))
visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
xlrec->cutoff_xid);
ReleaseBuffer(vmbuffer);
FreeFakeRelcacheEntry(reln);
}
+ else if (BufferIsValid(vmbuffer))
+ UnlockReleaseBuffer(vmbuffer);
}
/*
* Replay XLOG_HEAP2_FREEZE_PAGE records
*/
static void
-heap_xlog_freeze_page(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_freeze_page(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) XLogRecGetData(record);
TransactionId cutoff_xid = xlrec->cutoff_xid;
Buffer buffer;
- Page page;
int ntup;
/*
@@ -7388,12 +7222,19 @@ heap_xlog_freeze_page(XLogRecPtr lsn, XLogRecord *record)
* consider the frozen xids as running.
*/
if (InHotStandby)
- ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node);
+ {
+ RelFileNode rnode;
- if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->block,
- &buffer) == BLK_NEEDS_REDO)
+ XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
+ ResolveRecoveryConflictWithSnapshot(cutoff_xid, rnode);
+ }
+
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
- page = BufferGetPage(buffer);
+ Page page = BufferGetPage(buffer);
+ xl_heap_freeze_tuple *tuples;
+
+ tuples = (xl_heap_freeze_tuple *) XLogRecGetBlockData(record, 0, NULL);
/* now execute freeze plan for each frozen tuple */
for (ntup = 0; ntup < xlrec->ntuples; ntup++)
@@ -7402,7 +7243,7 @@ heap_xlog_freeze_page(XLogRecPtr lsn, XLogRecord *record)
ItemId lp;
HeapTupleHeader tuple;
- xlrec_tp = &xlrec->tuples[ntup];
+ xlrec_tp = &tuples[ntup];
lp = PageGetItemId(page, xlrec_tp->offset); /* offsets are one-based */
tuple = (HeapTupleHeader) PageGetItem(page, lp);
@@ -7444,19 +7285,21 @@ fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
}
static void
-heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_delete(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
Buffer buffer;
Page page;
- OffsetNumber offnum;
ItemId lp = NULL;
HeapTupleHeader htup;
BlockNumber blkno;
RelFileNode target_node;
+ ItemPointerData target_tid;
- blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
- target_node = xlrec->target.node;
+ XLogRecGetBlockTag(record, 0, &target_node, NULL, &blkno);
+ ItemPointerSetBlockNumber(&target_tid, blkno);
+ ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
/*
* The visibility map may need to be fixed even if the heap page is
@@ -7473,16 +7316,14 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
FreeFakeRelcacheEntry(reln);
}
- if (XLogReadBufferForRedo(lsn, record, 0, target_node, blkno, &buffer)
- == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
- page = (Page) BufferGetPage(buffer);
+ page = BufferGetPage(buffer);
- offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
- if (PageGetMaxOffsetNumber(page) >= offnum)
- lp = PageGetItemId(page, offnum);
+ if (PageGetMaxOffsetNumber(page) >= xlrec->offnum)
+ lp = PageGetItemId(page, xlrec->offnum);
- if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+ if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp))
elog(PANIC, "heap_delete_redo: invalid lp");
htup = (HeapTupleHeader) PageGetItem(page, lp);
@@ -7496,13 +7337,13 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
/* Mark the page as a candidate for pruning */
- PageSetPrunable(page, record->xl_xid);
+ PageSetPrunable(page, XLogRecGetXid(record));
if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
/* Make sure there is no forward chain link in t_ctid */
- htup->t_ctid = xlrec->target.tid;
+ htup->t_ctid = target_tid;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
@@ -7511,12 +7352,12 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
}
static void
-heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_insert(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
Buffer buffer;
Page page;
- OffsetNumber offnum;
struct
{
HeapTupleHeaderData hdr;
@@ -7528,10 +7369,12 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
Size freespace = 0;
RelFileNode target_node;
BlockNumber blkno;
+ ItemPointerData target_tid;
XLogRedoAction action;
- target_node = xlrec->target.node;
- blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+ XLogRecGetBlockTag(record, 0, &target_node, NULL, &blkno);
+ ItemPointerSetBlockNumber(&target_tid, blkno);
+ ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
/*
* The visibility map may need to be fixed even if the heap page is
@@ -7549,51 +7392,51 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
}
/*
- * If we inserted the first and only tuple on the page, re-initialize
- * the page from scratch.
+ * If we inserted the first and only tuple on the page, re-initialize the
+ * page from scratch.
*/
- if (record->xl_info & XLOG_HEAP_INIT_PAGE)
+ if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
{
- XLogReadBufferForRedoExtended(lsn, record, 0,
- target_node, MAIN_FORKNUM, blkno,
- RBM_ZERO_AND_LOCK, false, &buffer);
+ buffer = XLogInitBufferForRedo(record, 0);
page = BufferGetPage(buffer);
PageInit(page, BufferGetPageSize(buffer), 0);
action = BLK_NEEDS_REDO;
}
else
- action = XLogReadBufferForRedo(lsn, record, 0, target_node, blkno,
- &buffer);
-
+ action = XLogReadBufferForRedo(record, 0, &buffer);
if (action == BLK_NEEDS_REDO)
{
+ Size datalen;
+ char *data;
+
page = BufferGetPage(buffer);
- offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
- if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+ if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum)
elog(PANIC, "heap_insert_redo: invalid max offset number");
- newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader;
- Assert(newlen <= MaxHeapTupleSize);
- memcpy((char *) &xlhdr,
- (char *) xlrec + SizeOfHeapInsert,
- SizeOfHeapHeader);
+ data = XLogRecGetBlockData(record, 0, &datalen);
+
+ newlen = datalen - SizeOfHeapHeader;
+ Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize);
+ memcpy((char *) &xlhdr, data, SizeOfHeapHeader);
+ data += SizeOfHeapHeader;
+
htup = &tbuf.hdr;
MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
- (char *) xlrec + SizeOfHeapInsert + SizeOfHeapHeader,
+ data,
newlen);
newlen += offsetof(HeapTupleHeaderData, t_bits);
htup->t_infomask2 = xlhdr.t_infomask2;
htup->t_infomask = xlhdr.t_infomask;
htup->t_hoff = xlhdr.t_hoff;
- HeapTupleHeaderSetXmin(htup, record->xl_xid);
+ HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
HeapTupleHeaderSetCmin(htup, FirstCommandId);
- htup->t_ctid = xlrec->target.tid;
+ htup->t_ctid = target_tid;
- offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
- if (offnum == InvalidOffsetNumber)
+ if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
+ true, true) == InvalidOffsetNumber)
elog(PANIC, "heap_insert_redo: failed to add tuple");
freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
@@ -7618,16 +7461,16 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
* totally accurate anyway.
*/
if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
- XLogRecordPageWithFreeSpace(xlrec->target.node, blkno, freespace);
+ XLogRecordPageWithFreeSpace(target_node, blkno, freespace);
}
/*
* Handles MULTI_INSERT record type.
*/
static void
-heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_multi_insert(XLogReaderState *record)
{
- char *recdata = XLogRecGetData(record);
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_multi_insert *xlrec;
RelFileNode rnode;
BlockNumber blkno;
@@ -7642,27 +7485,16 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
uint32 newlen;
Size freespace = 0;
int i;
- bool isinit = (record->xl_info & XLOG_HEAP_INIT_PAGE) != 0;
+ bool isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0;
XLogRedoAction action;
/*
* Insertion doesn't overwrite MVCC data, so no conflict processing is
* required.
*/
+ xlrec = (xl_heap_multi_insert *) XLogRecGetData(record);
- xlrec = (xl_heap_multi_insert *) recdata;
- recdata += SizeOfHeapMultiInsert;
-
- rnode = xlrec->node;
- blkno = xlrec->blkno;
-
- /*
- * If we're reinitializing the page, the tuples are stored in order from
- * FirstOffsetNumber. Otherwise there's an array of offsets in the WAL
- * record.
- */
- if (!isinit)
- recdata += sizeof(OffsetNumber) * xlrec->ntuples;
+ XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
/*
* The visibility map may need to be fixed even if the heap page is
@@ -7681,24 +7513,35 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
if (isinit)
{
- XLogReadBufferForRedoExtended(lsn, record, 0,
- rnode, MAIN_FORKNUM, blkno,
- RBM_ZERO_AND_LOCK, false, &buffer);
+ buffer = XLogInitBufferForRedo(record, 0);
page = BufferGetPage(buffer);
PageInit(page, BufferGetPageSize(buffer), 0);
action = BLK_NEEDS_REDO;
}
else
- action = XLogReadBufferForRedo(lsn, record, 0, rnode, blkno, &buffer);
-
+ action = XLogReadBufferForRedo(record, 0, &buffer);
if (action == BLK_NEEDS_REDO)
{
- page = BufferGetPage(buffer);
+ char *tupdata;
+ char *endptr;
+ Size len;
+
+ /* Tuples are stored as block data */
+ tupdata = XLogRecGetBlockData(record, 0, &len);
+ endptr = tupdata + len;
+
+ page = (Page) BufferGetPage(buffer);
+
for (i = 0; i < xlrec->ntuples; i++)
{
OffsetNumber offnum;
xl_multi_insert_tuple *xlhdr;
+ /*
+ * If we're reinitializing the page, the tuples are stored in
+ * order from FirstOffsetNumber. Otherwise there's an array of
+ * offsets in the WAL record, and the tuples come after that.
+ */
if (isinit)
offnum = FirstOffsetNumber + i;
else
@@ -7706,8 +7549,8 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "heap_multi_insert_redo: invalid max offset number");
- xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata);
- recdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+ xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata);
+ tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
newlen = xlhdr->datalen;
Assert(newlen <= MaxHeapTupleSize);
@@ -7715,15 +7558,15 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
- (char *) recdata,
+ (char *) tupdata,
newlen);
- recdata += newlen;
+ tupdata += newlen;
newlen += offsetof(HeapTupleHeaderData, t_bits);
htup->t_infomask2 = xlhdr->t_infomask2;
htup->t_infomask = xlhdr->t_infomask;
htup->t_hoff = xlhdr->t_hoff;
- HeapTupleHeaderSetXmin(htup, record->xl_xid);
+ HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
HeapTupleHeaderSetCmin(htup, FirstCommandId);
ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
@@ -7732,6 +7575,8 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
if (offnum == InvalidOffsetNumber)
elog(PANIC, "heap_multi_insert_redo: failed to add tuple");
}
+ if (tupdata != endptr)
+ elog(PANIC, "heap_multi_insert_redo: total tuple length mismatch");
freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
@@ -7755,19 +7600,21 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
* totally accurate anyway.
*/
if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
- XLogRecordPageWithFreeSpace(xlrec->node, blkno, freespace);
+ XLogRecordPageWithFreeSpace(rnode, blkno, freespace);
}
/*
* Handles UPDATE and HOT_UPDATE
*/
static void
-heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
+heap_xlog_update(XLogReaderState *record, bool hot_update)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
RelFileNode rnode;
BlockNumber oldblk;
BlockNumber newblk;
+ ItemPointerData newtid;
Buffer obuffer,
nbuffer;
Page page;
@@ -7775,7 +7622,6 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
ItemId lp = NULL;
HeapTupleData oldtup;
HeapTupleHeader htup;
- char *recdata;
uint16 prefixlen = 0,
suffixlen = 0;
char *newp;
@@ -7784,7 +7630,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
HeapTupleHeaderData hdr;
char data[MaxHeapTupleSize];
} tbuf;
- xl_heap_header_len xlhdr;
+ xl_heap_header xlhdr;
uint32 newlen;
Size freespace = 0;
XLogRedoAction oldaction;
@@ -7794,9 +7640,16 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
oldtup.t_data = NULL;
oldtup.t_len = 0;
- rnode = xlrec->target.node;
- newblk = ItemPointerGetBlockNumber(&xlrec->newtid);
- oldblk = ItemPointerGetBlockNumber(&xlrec->target.tid);
+ XLogRecGetBlockTag(record, 0, &rnode, NULL, &newblk);
+ if (XLogRecGetBlockTag(record, 1, NULL, NULL, &oldblk))
+ {
+ /* HOT updates are never done across pages */
+ Assert(!hot_update);
+ }
+ else
+ oldblk = newblk;
+
+ ItemPointerSet(&newtid, newblk, xlrec->new_offnum);
/*
* The visibility map may need to be fixed even if the heap page is
@@ -7824,12 +7677,12 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
*/
/* Deal with old tuple version */
- oldaction = XLogReadBufferForRedo(lsn, record, 0, rnode, oldblk, &obuffer);
+ oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
+ &obuffer);
if (oldaction == BLK_NEEDS_REDO)
{
- page = (Page) BufferGetPage(obuffer);
-
- offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+ page = BufferGetPage(obuffer);
+ offnum = xlrec->old_offnum;
if (PageGetMaxOffsetNumber(page) >= offnum)
lp = PageGetItemId(page, offnum);
@@ -7852,10 +7705,10 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
/* Set forward chain link in t_ctid */
- htup->t_ctid = xlrec->newtid;
+ htup->t_ctid = newtid;
/* Mark the page as a candidate for pruning */
- PageSetPrunable(page, record->xl_xid);
+ PageSetPrunable(page, XLogRecGetXid(record));
if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
@@ -7872,18 +7725,15 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
nbuffer = obuffer;
newaction = oldaction;
}
- else if (record->xl_info & XLOG_HEAP_INIT_PAGE)
+ else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
{
- XLogReadBufferForRedoExtended(lsn, record, 1,
- rnode, MAIN_FORKNUM, newblk,
- RBM_ZERO_AND_LOCK, false, &nbuffer);
+ nbuffer = XLogInitBufferForRedo(record, 0);
page = (Page) BufferGetPage(nbuffer);
PageInit(page, BufferGetPageSize(nbuffer), 0);
newaction = BLK_NEEDS_REDO;
}
else
- newaction = XLogReadBufferForRedo(lsn, record, 1, rnode, newblk,
- &nbuffer);
+ newaction = XLogReadBufferForRedo(record, 0, &nbuffer);
/*
* The visibility map may need to be fixed even if the heap page is
@@ -7891,7 +7741,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
*/
if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
{
- Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ Relation reln = CreateFakeRelcacheEntry(rnode);
Buffer vmbuffer = InvalidBuffer;
visibilitymap_pin(reln, newblk, &vmbuffer);
@@ -7903,14 +7753,20 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
/* Deal with new tuple */
if (newaction == BLK_NEEDS_REDO)
{
- page = (Page) BufferGetPage(nbuffer);
+ char *recdata;
+ char *recdata_end;
+ Size datalen;
+ Size tuplen;
+
+ recdata = XLogRecGetBlockData(record, 0, &datalen);
+ recdata_end = recdata + datalen;
- offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid));
+ page = BufferGetPage(nbuffer);
+
+ offnum = xlrec->new_offnum;
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "heap_update_redo: invalid max offset number");
- recdata = (char *) xlrec + SizeOfHeapUpdate;
-
if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD)
{
Assert(newblk == oldblk);
@@ -7924,10 +7780,12 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
recdata += sizeof(uint16);
}
- memcpy((char *) &xlhdr, recdata, SizeOfHeapHeaderLen);
- recdata += SizeOfHeapHeaderLen;
+ memcpy((char *) &xlhdr, recdata, SizeOfHeapHeader);
+ recdata += SizeOfHeapHeader;
+
+ tuplen = recdata_end - recdata;
+ Assert(tuplen <= MaxHeapTupleSize);
- Assert(xlhdr.t_len + prefixlen + suffixlen <= MaxHeapTupleSize);
htup = &tbuf.hdr;
MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
@@ -7941,7 +7799,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
int len;
/* copy bitmap [+ padding] [+ oid] from WAL record */
- len = xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits);
+ len = xlhdr.t_hoff - offsetof(HeapTupleHeaderData, t_bits);
memcpy(newp, recdata, len);
recdata += len;
newp += len;
@@ -7951,7 +7809,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
newp += prefixlen;
/* copy new tuple data from WAL record */
- len = xlhdr.t_len - (xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits));
+ len = tuplen - (xlhdr.t_hoff - offsetof(HeapTupleHeaderData, t_bits));
memcpy(newp, recdata, len);
recdata += len;
newp += len;
@@ -7962,24 +7820,26 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
* copy bitmap [+ padding] [+ oid] + data from record, all in one
* go
*/
- memcpy(newp, recdata, xlhdr.t_len);
- recdata += xlhdr.t_len;
- newp += xlhdr.t_len;
+ memcpy(newp, recdata, tuplen);
+ recdata += tuplen;
+ newp += tuplen;
}
+ Assert(recdata == recdata_end);
+
/* copy suffix from old tuple */
if (suffixlen > 0)
memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
- newlen = offsetof(HeapTupleHeaderData, t_bits) + xlhdr.t_len + prefixlen + suffixlen;
- htup->t_infomask2 = xlhdr.header.t_infomask2;
- htup->t_infomask = xlhdr.header.t_infomask;
- htup->t_hoff = xlhdr.header.t_hoff;
+ newlen = offsetof(HeapTupleHeaderData, t_bits) + tuplen + prefixlen + suffixlen;
+ htup->t_infomask2 = xlhdr.t_infomask2;
+ htup->t_infomask = xlhdr.t_infomask;
+ htup->t_hoff = xlhdr.t_hoff;
- HeapTupleHeaderSetXmin(htup, record->xl_xid);
+ HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
HeapTupleHeaderSetCmin(htup, FirstCommandId);
HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
/* Make sure there is no forward chain link in t_ctid */
- htup->t_ctid = xlrec->newtid;
+ htup->t_ctid = newtid;
offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
if (offnum == InvalidOffsetNumber)
@@ -7993,6 +7853,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
PageSetLSN(page, lsn);
MarkBufferDirty(nbuffer);
}
+
if (BufferIsValid(nbuffer) && nbuffer != obuffer)
UnlockReleaseBuffer(nbuffer);
if (BufferIsValid(obuffer))
@@ -8014,14 +7875,13 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
* totally accurate anyway.
*/
if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5)
- XLogRecordPageWithFreeSpace(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->newtid)),
- freespace);
+ XLogRecordPageWithFreeSpace(rnode, newblk, freespace);
}
static void
-heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_lock(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
Buffer buffer;
Page page;
@@ -8029,13 +7889,11 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
ItemId lp = NULL;
HeapTupleHeader htup;
- if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node,
- ItemPointerGetBlockNumber(&xlrec->target.tid),
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(buffer);
- offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+ offnum = xlrec->offnum;
if (PageGetMaxOffsetNumber(page) >= offnum)
lp = PageGetItemId(page, offnum);
@@ -8055,7 +7913,9 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
{
HeapTupleHeaderClearHotUpdated(htup);
/* Make sure there is no forward chain link in t_ctid */
- htup->t_ctid = xlrec->target.tid;
+ ItemPointerSet(&htup->t_ctid,
+ BufferGetBlockNumber(buffer),
+ offnum);
}
HeapTupleHeaderSetXmax(htup, xlrec->locking_xid);
HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
@@ -8067,22 +7927,23 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
}
static void
-heap_xlog_lock_updated(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_lock_updated(XLogReaderState *record)
{
- xl_heap_lock_updated *xlrec =
- (xl_heap_lock_updated *) XLogRecGetData(record);
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_heap_lock_updated *xlrec;
Buffer buffer;
Page page;
OffsetNumber offnum;
ItemId lp = NULL;
HeapTupleHeader htup;
- if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)),
- &buffer) == BLK_NEEDS_REDO)
+ xlrec = (xl_heap_lock_updated *) XLogRecGetData(record);
+
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
page = BufferGetPage(buffer);
- offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+
+ offnum = xlrec->offnum;
if (PageGetMaxOffsetNumber(page) >= offnum)
lp = PageGetItemId(page, offnum);
@@ -8103,8 +7964,9 @@ heap_xlog_lock_updated(XLogRecPtr lsn, XLogRecord *record)
}
static void
-heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
+heap_xlog_inplace(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
Buffer buffer;
Page page;
@@ -8112,15 +7974,15 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
ItemId lp = NULL;
HeapTupleHeader htup;
uint32 oldlen;
- uint32 newlen;
+ Size newlen;
- if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)),
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
+ char *newtup = XLogRecGetBlockData(record, 0, &newlen);
+
page = BufferGetPage(buffer);
- offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+ offnum = xlrec->offnum;
if (PageGetMaxOffsetNumber(page) >= offnum)
lp = PageGetItemId(page, offnum);
@@ -8130,13 +7992,10 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
htup = (HeapTupleHeader) PageGetItem(page, lp);
oldlen = ItemIdGetLength(lp) - htup->t_hoff;
- newlen = record->xl_len - SizeOfHeapInplace;
if (oldlen != newlen)
elog(PANIC, "heap_inplace_redo: wrong tuple length");
- memcpy((char *) htup + htup->t_hoff,
- (char *) xlrec + SizeOfHeapInplace,
- newlen);
+ memcpy((char *) htup + htup->t_hoff, newtup, newlen);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
@@ -8146,9 +8005,9 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
}
void
-heap_redo(XLogRecPtr lsn, XLogRecord *record)
+heap_redo(XLogReaderState *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
/*
* These operations don't overwrite MVCC data so no conflict processing is
@@ -8158,22 +8017,22 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record)
switch (info & XLOG_HEAP_OPMASK)
{
case XLOG_HEAP_INSERT:
- heap_xlog_insert(lsn, record);
+ heap_xlog_insert(record);
break;
case XLOG_HEAP_DELETE:
- heap_xlog_delete(lsn, record);
+ heap_xlog_delete(record);
break;
case XLOG_HEAP_UPDATE:
- heap_xlog_update(lsn, record, false);
+ heap_xlog_update(record, false);
break;
case XLOG_HEAP_HOT_UPDATE:
- heap_xlog_update(lsn, record, true);
+ heap_xlog_update(record, true);
break;
case XLOG_HEAP_LOCK:
- heap_xlog_lock(lsn, record);
+ heap_xlog_lock(record);
break;
case XLOG_HEAP_INPLACE:
- heap_xlog_inplace(lsn, record);
+ heap_xlog_inplace(record);
break;
default:
elog(PANIC, "heap_redo: unknown op code %u", info);
@@ -8181,29 +8040,29 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record)
}
void
-heap2_redo(XLogRecPtr lsn, XLogRecord *record)
+heap2_redo(XLogReaderState *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
switch (info & XLOG_HEAP_OPMASK)
{
case XLOG_HEAP2_CLEAN:
- heap_xlog_clean(lsn, record);
+ heap_xlog_clean(record);
break;
case XLOG_HEAP2_FREEZE_PAGE:
- heap_xlog_freeze_page(lsn, record);
+ heap_xlog_freeze_page(record);
break;
case XLOG_HEAP2_CLEANUP_INFO:
- heap_xlog_cleanup_info(lsn, record);
+ heap_xlog_cleanup_info(record);
break;
case XLOG_HEAP2_VISIBLE:
- heap_xlog_visible(lsn, record);
+ heap_xlog_visible(record);
break;
case XLOG_HEAP2_MULTI_INSERT:
- heap_xlog_multi_insert(lsn, record);
+ heap_xlog_multi_insert(record);
break;
case XLOG_HEAP2_LOCK_UPDATED:
- heap_xlog_lock_updated(lsn, record);
+ heap_xlog_lock_updated(record);
break;
case XLOG_HEAP2_NEW_CID:
@@ -8213,7 +8072,7 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
*/
break;
case XLOG_HEAP2_REWRITE:
- heap_xlog_logical_rewrite(lsn, record);
+ heap_xlog_logical_rewrite(record);
break;
default:
elog(PANIC, "heap2_redo: unknown op code %u", info);