diff options
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 987 |
1 files changed, 423 insertions, 564 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 1763b70631d..c6e1eb79b2c 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2132,84 +2132,64 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, xl_heap_insert xlrec; xl_heap_header xlhdr; XLogRecPtr recptr; - XLogRecData rdata[4]; Page page = BufferGetPage(buffer); uint8 info = XLOG_HEAP_INSERT; - bool need_tuple_data; + int bufflags = 0; /* - * For logical decoding, we need the tuple even if we're doing a full - * page write, so make sure to log it separately. (XXX We could - * alternatively store a pointer into the FPW). - * - * Also, if this is a catalog, we need to transmit combocids to - * properly decode, so log that as well. + * If this is a catalog, we need to transmit combocids to properly + * decode, so log that as well. */ - need_tuple_data = RelationIsLogicallyLogged(relation); if (RelationIsAccessibleInLogicalDecoding(relation)) log_heap_new_cid(relation, heaptup); - xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; - xlrec.target.node = relation->rd_node; - xlrec.target.tid = heaptup->t_self; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfHeapInsert; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); - - xlhdr.t_infomask2 = heaptup->t_data->t_infomask2; - xlhdr.t_infomask = heaptup->t_data->t_infomask; - xlhdr.t_hoff = heaptup->t_data->t_hoff; - /* - * note we mark rdata[1] as belonging to buffer; if XLogInsert decides - * to write the whole page to the xlog, we don't need to store - * xl_heap_header in the xlog. + * If this is the single and first tuple on page, we can reinit the + * page instead of restoring the whole thing. Set flag, and hide + * buffer references from XLogInsert. */ - rdata[1].data = (char *) &xlhdr; - rdata[1].len = SizeOfHeapHeader; - rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer; - rdata[1].buffer_std = true; - rdata[1].next = &(rdata[2]); + if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber && + PageGetMaxOffsetNumber(page) == FirstOffsetNumber) + { + info |= XLOG_HEAP_INIT_PAGE; + bufflags |= REGBUF_WILL_INIT; + } - /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ - rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits); - rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits); - rdata[2].buffer = need_tuple_data ? InvalidBuffer : buffer; - rdata[2].buffer_std = true; - rdata[2].next = NULL; + xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self); + xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; + Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer)); /* - * Make a separate rdata entry for the tuple's buffer if we're doing - * logical decoding, so that an eventual FPW doesn't remove the - * tuple's data. + * For logical decoding, we need the tuple even if we're doing a full + * page write, so make sure it's included even if we take a full-page + * image. (XXX We could alternatively store a pointer into the FPW). */ - if (need_tuple_data) + if (RelationIsLogicallyLogged(relation)) { - rdata[2].next = &(rdata[3]); - - rdata[3].data = NULL; - rdata[3].len = 0; - rdata[3].buffer = buffer; - rdata[3].buffer_std = true; - rdata[3].next = NULL; - xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + bufflags |= REGBUF_KEEP_DATA; } + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfHeapInsert); + + xlhdr.t_infomask2 = heaptup->t_data->t_infomask2; + xlhdr.t_infomask = heaptup->t_data->t_infomask; + xlhdr.t_hoff = heaptup->t_data->t_hoff; + /* - * If this is the single and first tuple on page, we can reinit the - * page instead of restoring the whole thing. Set flag, and hide - * buffer references from XLogInsert. + * note we mark xlhdr as belonging to buffer; if XLogInsert decides to + * write the whole page to the xlog, we don't need to store + * xl_heap_header in the xlog. */ - if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber && - PageGetMaxOffsetNumber(page) == FirstOffsetNumber) - { - info |= XLOG_HEAP_INIT_PAGE; - rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer; - } + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags); + XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader); + /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ + XLogRegisterBufData(0, + (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits), + heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits)); - recptr = XLogInsert(RM_HEAP_ID, info, rdata); + recptr = XLogInsert(RM_HEAP_ID, info); PageSetLSN(page, recptr); } @@ -2397,6 +2377,13 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, break; RelationPutHeapTuple(relation, buffer, heaptup); + + /* + * We don't use heap_multi_insert for catalog tuples yet, but + * better be prepared... + */ + if (needwal && need_cids) + log_heap_new_cid(relation, heaptup); } if (PageIsAllVisible(page)) @@ -2419,12 +2406,12 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, { XLogRecPtr recptr; xl_heap_multi_insert *xlrec; - XLogRecData rdata[3]; uint8 info = XLOG_HEAP2_MULTI_INSERT; char *tupledata; int totaldatalen; char *scratchptr = scratch; bool init; + int bufflags = 0; /* * If the page was previously empty, we can reinit the page @@ -2450,8 +2437,6 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, tupledata = scratchptr; xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; - xlrec->node = relation->rd_node; - xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->ntuples = nthispage; /* @@ -2481,64 +2466,40 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, datalen); tuphdr->datalen = datalen; scratchptr += datalen; - - /* - * We don't use heap_multi_insert for catalog tuples yet, but - * better be prepared... - */ - if (need_cids) - log_heap_new_cid(relation, heaptup); } totaldatalen = scratchptr - tupledata; Assert((scratchptr - scratch) < BLCKSZ); - rdata[0].data = (char *) xlrec; - rdata[0].len = tupledata - scratch; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &rdata[1]; - - rdata[1].data = tupledata; - rdata[1].len = totaldatalen; - rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer; - rdata[1].buffer_std = true; - rdata[1].next = NULL; - - /* - * Make a separate rdata entry for the tuple's buffer if we're - * doing logical decoding, so that an eventual FPW doesn't remove - * the tuple's data. - */ if (need_tuple_data) - { - rdata[1].next = &(rdata[2]); - - rdata[2].data = NULL; - rdata[2].len = 0; - rdata[2].buffer = buffer; - rdata[2].buffer_std = true; - rdata[2].next = NULL; xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; - } /* - * If we're going to reinitialize the whole page using the WAL - * record, hide buffer reference from XLogInsert. + * Signal that this is the last xl_heap_multi_insert record + * emitted by this call to heap_multi_insert(). Needed for logical + * decoding so it knows when to cleanup temporary data. */ + if (ndone + nthispage == ntuples) + xlrec->flags |= XLOG_HEAP_LAST_MULTI_INSERT; + if (init) { - rdata[1].buffer = rdata[2].buffer = InvalidBuffer; info |= XLOG_HEAP_INIT_PAGE; + bufflags |= REGBUF_WILL_INIT; } /* - * Signal that this is the last xl_heap_multi_insert record - * emitted by this call to heap_multi_insert(). Needed for logical - * decoding so it knows when to cleanup temporary data. + * If we're doing logical decoding, include the new tuple data + * even if we take a full-page image of the page. */ - if (ndone + nthispage == ntuples) - xlrec->flags |= XLOG_HEAP_LAST_MULTI_INSERT; + if (need_tuple_data) + bufflags |= REGBUF_KEEP_DATA; + + XLogBeginInsert(); + XLogRegisterData((char *) xlrec, tupledata - scratch); + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags); - recptr = XLogInsert(RM_HEAP2_ID, info, rdata); + XLogRegisterBufData(0, tupledata, totaldatalen); + recptr = XLogInsert(RM_HEAP2_ID, info); PageSetLSN(page, recptr); } @@ -2909,7 +2870,6 @@ l1: { xl_heap_delete xlrec; XLogRecPtr recptr; - XLogRecData rdata[4]; /* For logical decode we need combocids to properly decode the catalog */ if (RelationIsAccessibleInLogicalDecoding(relation)) @@ -2918,19 +2878,21 @@ l1: xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask, tp.t_data->t_infomask2); - xlrec.target.node = relation->rd_node; - xlrec.target.tid = tp.t_self; + xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self); xlrec.xmax = new_xmax; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfHeapDelete; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); - rdata[1].data = NULL; - rdata[1].len = 0; - rdata[1].buffer = buffer; - rdata[1].buffer_std = true; - rdata[1].next = NULL; + if (old_key_tuple != NULL) + { + if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; + else + xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; + } + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfHeapDelete); + + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); /* * Log replica identity of the deleted tuple if there is one @@ -2943,27 +2905,14 @@ l1: xlhdr.t_infomask = old_key_tuple->t_data->t_infomask; xlhdr.t_hoff = old_key_tuple->t_data->t_hoff; - rdata[1].next = &(rdata[2]); - rdata[2].data = (char *) &xlhdr; - rdata[2].len = SizeOfHeapHeader; - rdata[2].buffer = InvalidBuffer; - rdata[2].next = NULL; - - rdata[2].next = &(rdata[3]); - rdata[3].data = (char *) old_key_tuple->t_data - + offsetof(HeapTupleHeaderData, t_bits); - rdata[3].len = old_key_tuple->t_len - - offsetof(HeapTupleHeaderData, t_bits); - rdata[3].buffer = InvalidBuffer; - rdata[3].next = NULL; - - if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; - else - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; + XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader); + XLogRegisterData((char *) old_key_tuple->t_data + + offsetof(HeapTupleHeaderData, t_bits), + old_key_tuple->t_len + - offsetof(HeapTupleHeaderData, t_bits)); } - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata); + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE); PageSetLSN(page, recptr); } @@ -4735,25 +4684,17 @@ failed: { xl_heap_lock xlrec; XLogRecPtr recptr; - XLogRecData rdata[2]; - xlrec.target.node = relation->rd_node; - xlrec.target.tid = tuple->t_self; + XLogBeginInsert(); + XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD); + + xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self); xlrec.locking_xid = xid; xlrec.infobits_set = compute_infobits(new_infomask, tuple->t_data->t_infomask2); - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfHeapLock; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); - - rdata[1].data = NULL; - rdata[1].len = 0; - rdata[1].buffer = *buffer; - rdata[1].buffer_std = true; - rdata[1].next = NULL; + XLogRegisterData((char *) &xlrec, SizeOfHeapLock); - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK, rdata); + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK); PageSetLSN(page, recptr); } @@ -5342,26 +5283,18 @@ l4: { xl_heap_lock_updated xlrec; XLogRecPtr recptr; - XLogRecData rdata[2]; Page page = BufferGetPage(buf); - xlrec.target.node = rel->rd_node; - xlrec.target.tid = mytup.t_self; + XLogBeginInsert(); + XLogRegisterBuffer(0, buf, REGBUF_STANDARD); + + xlrec.offnum = ItemPointerGetOffsetNumber(&mytup.t_self); xlrec.xmax = new_xmax; xlrec.infobits_set = compute_infobits(new_infomask, new_infomask2); - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfHeapLockUpdated; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + XLogRegisterData((char *) &xlrec, SizeOfHeapLockUpdated); - rdata[1].data = NULL; - rdata[1].len = 0; - rdata[1].buffer = buf; - rdata[1].buffer_std = true; - rdata[1].next = NULL; - - recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_LOCK_UPDATED, rdata); + recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_LOCK_UPDATED); PageSetLSN(page, recptr); } @@ -5489,23 +5422,16 @@ heap_inplace_update(Relation relation, HeapTuple tuple) { xl_heap_inplace xlrec; XLogRecPtr recptr; - XLogRecData rdata[2]; - xlrec.target.node = relation->rd_node; - xlrec.target.tid = tuple->t_self; + xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self); - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfHeapInplace; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfHeapInplace); - rdata[1].data = (char *) htup + htup->t_hoff; - rdata[1].len = newlen; - rdata[1].buffer = buffer; - rdata[1].buffer_std = true; - rdata[1].next = NULL; + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + XLogRegisterBufData(0, (char *) htup + htup->t_hoff, newlen); - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE, rdata); + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE); PageSetLSN(page, recptr); } @@ -6507,17 +6433,14 @@ log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid) { xl_heap_cleanup_info xlrec; XLogRecPtr recptr; - XLogRecData rdata; xlrec.node = rnode; xlrec.latestRemovedXid = latestRemovedXid; - rdata.data = (char *) &xlrec; - rdata.len = SizeOfHeapCleanupInfo; - rdata.buffer = InvalidBuffer; - rdata.next = NULL; + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfHeapCleanupInfo); - recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_CLEANUP_INFO, &rdata); + recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_CLEANUP_INFO); return recptr; } @@ -6542,23 +6465,19 @@ log_heap_clean(Relation reln, Buffer buffer, TransactionId latestRemovedXid) { xl_heap_clean xlrec; - uint8 info; XLogRecPtr recptr; - XLogRecData rdata[4]; /* Caller should not call me on a non-WAL-logged relation */ Assert(RelationNeedsWAL(reln)); - xlrec.node = reln->rd_node; - xlrec.block = BufferGetBlockNumber(buffer); xlrec.latestRemovedXid = latestRemovedXid; xlrec.nredirected = nredirected; xlrec.ndead = ndead; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfHeapClean; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfHeapClean); + + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); /* * The OffsetNumber arrays are not actually in the buffer, but we pretend @@ -6569,49 +6488,18 @@ log_heap_clean(Relation reln, Buffer buffer, * even if no item pointers changed state. */ if (nredirected > 0) - { - rdata[1].data = (char *) redirected; - rdata[1].len = nredirected * sizeof(OffsetNumber) * 2; - } - else - { - rdata[1].data = NULL; - rdata[1].len = 0; - } - rdata[1].buffer = buffer; - rdata[1].buffer_std = true; - rdata[1].next = &(rdata[2]); + XLogRegisterBufData(0, (char *) redirected, + nredirected * sizeof(OffsetNumber) * 2); if (ndead > 0) - { - rdata[2].data = (char *) nowdead; - rdata[2].len = ndead * sizeof(OffsetNumber); - } - else - { - rdata[2].data = NULL; - rdata[2].len = 0; - } - rdata[2].buffer = buffer; - rdata[2].buffer_std = true; - rdata[2].next = &(rdata[3]); + XLogRegisterBufData(0, (char *) nowdead, + ndead * sizeof(OffsetNumber)); if (nunused > 0) - { - rdata[3].data = (char *) nowunused; - rdata[3].len = nunused * sizeof(OffsetNumber); - } - else - { - rdata[3].data = NULL; - rdata[3].len = 0; - } - rdata[3].buffer = buffer; - rdata[3].buffer_std = true; - rdata[3].next = NULL; + XLogRegisterBufData(0, (char *) nowunused, + nunused * sizeof(OffsetNumber)); - info = XLOG_HEAP2_CLEAN; - recptr = XLogInsert(RM_HEAP2_ID, info, rdata); + recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_CLEAN); return recptr; } @@ -6626,35 +6514,28 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid, { xl_heap_freeze_page xlrec; XLogRecPtr recptr; - XLogRecData rdata[2]; /* Caller should not call me on a non-WAL-logged relation */ Assert(RelationNeedsWAL(reln)); /* nor when there are no tuples to freeze */ Assert(ntuples > 0); - xlrec.node = reln->rd_node; - xlrec.block = BufferGetBlockNumber(buffer); xlrec.cutoff_xid = cutoff_xid; xlrec.ntuples = ntuples; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfHeapFreezePage; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfHeapFreezePage); /* * The freeze plan array is not actually in the buffer, but pretend that * it is. When XLogInsert stores the whole buffer, the freeze plan need * not be stored too. */ - rdata[1].data = (char *) tuples; - rdata[1].len = ntuples * sizeof(xl_heap_freeze_tuple); - rdata[1].buffer = buffer; - rdata[1].buffer_std = true; - rdata[1].next = NULL; + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + XLogRegisterBufData(0, (char *) tuples, + ntuples * sizeof(xl_heap_freeze_tuple)); - recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE_PAGE, rdata); + recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE_PAGE); return recptr; } @@ -6665,8 +6546,8 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid, * corresponding visibility map block. Both should have already been modified * and dirtied. * - * If checksums are enabled, we also add the heap_buffer to the chain to - * protect it from being torn. + * If checksums are enabled, we also generate a full-page image of + * heap_buffer, if necessary. */ XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer, @@ -6674,38 +6555,23 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer, { xl_heap_visible xlrec; XLogRecPtr recptr; - XLogRecData rdata[3]; + uint8 flags; Assert(BufferIsValid(heap_buffer)); Assert(BufferIsValid(vm_buffer)); - xlrec.node = rnode; - xlrec.block = BufferGetBlockNumber(heap_buffer); xlrec.cutoff_xid = cutoff_xid; + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfHeapVisible); - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfHeapVisible; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + XLogRegisterBuffer(0, vm_buffer, 0); - rdata[1].data = NULL; - rdata[1].len = 0; - rdata[1].buffer = vm_buffer; - rdata[1].buffer_std = false; - rdata[1].next = NULL; + flags = REGBUF_STANDARD; + if (!XLogHintBitIsNeeded()) + flags |= REGBUF_NO_IMAGE; + XLogRegisterBuffer(1, heap_buffer, flags); - if (XLogHintBitIsNeeded()) - { - rdata[1].next = &(rdata[2]); - - rdata[2].data = NULL; - rdata[2].len = 0; - rdata[2].buffer = heap_buffer; - rdata[2].buffer_std = true; - rdata[2].next = NULL; - } - - recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VISIBLE, rdata); + recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VISIBLE); return recptr; } @@ -6721,22 +6587,23 @@ log_heap_update(Relation reln, Buffer oldbuf, bool all_visible_cleared, bool new_all_visible_cleared) { xl_heap_update xlrec; - xl_heap_header_len xlhdr; - xl_heap_header_len xlhdr_idx; + xl_heap_header xlhdr; + xl_heap_header xlhdr_idx; uint8 info; uint16 prefix_suffix[2]; uint16 prefixlen = 0, suffixlen = 0; XLogRecPtr recptr; - XLogRecData rdata[9]; Page page = BufferGetPage(newbuf); bool need_tuple_data = RelationIsLogicallyLogged(reln); - int nr; - Buffer newbufref; + bool init; + int bufflags; /* Caller should not call me on a non-WAL-logged relation */ Assert(RelationNeedsWAL(reln)); + XLogBeginInsert(); + if (HeapTupleIsHeapOnly(newtup)) info = XLOG_HEAP_HOT_UPDATE; else @@ -6794,103 +6661,97 @@ log_heap_update(Relation reln, Buffer oldbuf, suffixlen = 0; } - xlrec.target.node = reln->rd_node; - xlrec.target.tid = oldtup->t_self; - xlrec.old_xmax = HeapTupleHeaderGetRawXmax(oldtup->t_data); - xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask, - oldtup->t_data->t_infomask2); - xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data); + /* Prepare main WAL data chain */ xlrec.flags = 0; if (all_visible_cleared) xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED; - xlrec.newtid = newtup->t_self; if (new_all_visible_cleared) xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED; if (prefixlen > 0) xlrec.flags |= XLOG_HEAP_PREFIX_FROM_OLD; if (suffixlen > 0) xlrec.flags |= XLOG_HEAP_SUFFIX_FROM_OLD; + if (need_tuple_data) + { + xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + if (old_key_tuple) + { + if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; + else + xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; + } + } /* If new tuple is the single and first tuple on page... */ if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber && PageGetMaxOffsetNumber(page) == FirstOffsetNumber) { info |= XLOG_HEAP_INIT_PAGE; - newbufref = InvalidBuffer; + init = true; } else - newbufref = newbuf; + init = false; - rdata[0].data = NULL; - rdata[0].len = 0; - rdata[0].buffer = oldbuf; - rdata[0].buffer_std = true; - rdata[0].next = &(rdata[1]); + /* Prepare WAL data for the old page */ + xlrec.old_offnum = ItemPointerGetOffsetNumber(&oldtup->t_self); + xlrec.old_xmax = HeapTupleHeaderGetRawXmax(oldtup->t_data); + xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask, + oldtup->t_data->t_infomask2); + + /* Prepare WAL data for the new page */ + xlrec.new_offnum = ItemPointerGetOffsetNumber(&newtup->t_self); + xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data); + + bufflags = REGBUF_STANDARD; + if (init) + bufflags |= REGBUF_WILL_INIT; + if (need_tuple_data) + bufflags |= REGBUF_KEEP_DATA; - rdata[1].data = (char *) &xlrec; - rdata[1].len = SizeOfHeapUpdate; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = &(rdata[2]); + XLogRegisterBuffer(0, newbuf, bufflags); + if (oldbuf != newbuf) + XLogRegisterBuffer(1, oldbuf, REGBUF_STANDARD); - /* prefix and/or suffix length fields */ + XLogRegisterData((char *) &xlrec, SizeOfHeapUpdate); + + /* + * Prepare WAL data for the new tuple. + */ if (prefixlen > 0 || suffixlen > 0) { if (prefixlen > 0 && suffixlen > 0) { prefix_suffix[0] = prefixlen; prefix_suffix[1] = suffixlen; - rdata[2].data = (char *) &prefix_suffix; - rdata[2].len = 2 * sizeof(uint16); + XLogRegisterBufData(0, (char *) &prefix_suffix, sizeof(uint16) * 2); } else if (prefixlen > 0) { - rdata[2].data = (char *) &prefixlen; - rdata[2].len = sizeof(uint16); + XLogRegisterBufData(0, (char *) &prefixlen, sizeof(uint16)); } else { - rdata[2].data = (char *) &suffixlen; - rdata[2].len = sizeof(uint16); + XLogRegisterBufData(0, (char *) &suffixlen, sizeof(uint16)); } - rdata[2].buffer = newbufref; - rdata[2].buffer_std = true; - rdata[2].next = &(rdata[3]); - nr = 3; } - else - nr = 2; - - xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2; - xlhdr.header.t_infomask = newtup->t_data->t_infomask; - xlhdr.header.t_hoff = newtup->t_data->t_hoff; - Assert(offsetof(HeapTupleHeaderData, t_bits) +prefixlen + suffixlen <= newtup->t_len); - xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) -prefixlen - suffixlen; - /* - * As with insert records, we need not store this rdata segment if we - * decide to store the whole buffer instead, unless we're doing logical - * decoding. - */ - rdata[nr].data = (char *) &xlhdr; - rdata[nr].len = SizeOfHeapHeaderLen; - rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref; - rdata[nr].buffer_std = true; - rdata[nr].next = &(rdata[nr + 1]); - nr++; + xlhdr.t_infomask2 = newtup->t_data->t_infomask2; + xlhdr.t_infomask = newtup->t_data->t_infomask; + xlhdr.t_hoff = newtup->t_data->t_hoff; + Assert(offsetof(HeapTupleHeaderData, t_bits) + prefixlen + suffixlen <= newtup->t_len); /* * PG73FORMAT: write bitmap [+ padding] [+ oid] + data * * The 'data' doesn't include the common prefix or suffix. */ + XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader); if (prefixlen == 0) { - rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits); - rdata[nr].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) -suffixlen; - rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref; - rdata[nr].buffer_std = true; - rdata[nr].next = NULL; - nr++; + XLogRegisterBufData(0, + ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits), + newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) -suffixlen); } else { @@ -6901,75 +6762,33 @@ log_heap_update(Relation reln, Buffer oldbuf, /* bitmap [+ padding] [+ oid] */ if (newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits) >0) { - rdata[nr - 1].next = &(rdata[nr]); - rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits); - rdata[nr].len = newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits); - rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref; - rdata[nr].buffer_std = true; - rdata[nr].next = NULL; - nr++; + XLogRegisterBufData(0, + ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits), + newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits)); } /* data after common prefix */ - rdata[nr - 1].next = &(rdata[nr]); - rdata[nr].data = ((char *) newtup->t_data) + newtup->t_data->t_hoff + prefixlen; - rdata[nr].len = newtup->t_len - newtup->t_data->t_hoff - prefixlen - suffixlen; - rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref; - rdata[nr].buffer_std = true; - rdata[nr].next = NULL; - nr++; + XLogRegisterBufData(0, + ((char *) newtup->t_data) + newtup->t_data->t_hoff + prefixlen, + newtup->t_len - newtup->t_data->t_hoff - prefixlen - suffixlen); } - /* - * Separate storage for the FPW buffer reference of the new page in the - * wal_level >= logical case. - */ - if (need_tuple_data) + /* We need to log a tuple identity */ + if (need_tuple_data && old_key_tuple) { - rdata[nr - 1].next = &(rdata[nr]); - - rdata[nr].data = NULL, - rdata[nr].len = 0; - rdata[nr].buffer = newbufref; - rdata[nr].buffer_std = true; - rdata[nr].next = NULL; - nr++; - - xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + /* don't really need this, but its more comfy to decode */ + xlhdr_idx.t_infomask2 = old_key_tuple->t_data->t_infomask2; + xlhdr_idx.t_infomask = old_key_tuple->t_data->t_infomask; + xlhdr_idx.t_hoff = old_key_tuple->t_data->t_hoff; - /* We need to log a tuple identity */ - if (old_key_tuple) - { - /* don't really need this, but its more comfy to decode */ - xlhdr_idx.header.t_infomask2 = old_key_tuple->t_data->t_infomask2; - xlhdr_idx.header.t_infomask = old_key_tuple->t_data->t_infomask; - xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff; - xlhdr_idx.t_len = old_key_tuple->t_len; - - rdata[nr - 1].next = &(rdata[nr]); - rdata[nr].data = (char *) &xlhdr_idx; - rdata[nr].len = SizeOfHeapHeaderLen; - rdata[nr].buffer = InvalidBuffer; - rdata[nr].next = &(rdata[nr + 1]); - nr++; - - /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ - rdata[nr].data = (char *) old_key_tuple->t_data - + offsetof(HeapTupleHeaderData, t_bits); - rdata[nr].len = old_key_tuple->t_len - - offsetof(HeapTupleHeaderData, t_bits); - rdata[nr].buffer = InvalidBuffer; - rdata[nr].next = NULL; - nr++; + XLogRegisterData((char *) &xlhdr_idx, SizeOfHeapHeader); - if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; - else - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; - } + /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ + XLogRegisterData((char *) old_key_tuple->t_data + offsetof(HeapTupleHeaderData, t_bits), + old_key_tuple->t_len - offsetof(HeapTupleHeaderData, t_bits)); } - recptr = XLogInsert(RM_HEAP_ID, info, rdata); + recptr = XLogInsert(RM_HEAP_ID, info); return recptr; } @@ -6986,15 +6805,14 @@ log_heap_new_cid(Relation relation, HeapTuple tup) xl_heap_new_cid xlrec; XLogRecPtr recptr; - XLogRecData rdata[1]; HeapTupleHeader hdr = tup->t_data; Assert(ItemPointerIsValid(&tup->t_self)); Assert(tup->t_tableOid != InvalidOid); xlrec.top_xid = GetTopTransactionId(); - xlrec.target.node = relation->rd_node; - xlrec.target.tid = tup->t_self; + xlrec.target_node = relation->rd_node; + xlrec.target_tid = tup->t_self; /* * If the tuple got inserted & deleted in the same TX we definitely have a @@ -7035,12 +6853,15 @@ log_heap_new_cid(Relation relation, HeapTuple tup) xlrec.combocid = InvalidCommandId; } - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfHeapNewCid; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = NULL; + /* + * Note that we don't need to register the buffer here, because this + * operation does not modify the page. The insert/update/delete that + * called us certainly did, but that's WAL-logged separately. + */ + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfHeapNewCid); - recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID, rdata); + recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID); return recptr; } @@ -7165,7 +6986,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool * * Handles CLEANUP_INFO */ static void -heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record) +heap_xlog_cleanup_info(XLogReaderState *record) { xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record); @@ -7179,15 +7000,16 @@ heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record) */ /* Backup blocks are not used in cleanup_info records */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + Assert(!XLogRecHasAnyBlockRefs(record)); } /* * Handles HEAP2_CLEAN record type */ static void -heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) +heap_xlog_clean(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record); Buffer buffer; Size freespace = 0; @@ -7195,8 +7017,7 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) BlockNumber blkno; XLogRedoAction action; - rnode = xlrec->node; - blkno = xlrec->block; + XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); /* * We're about to remove tuples. In Hot Standby mode, ensure that there's @@ -7213,9 +7034,8 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) * If we have a full-page image, restore it (using a cleanup lock) and * we're done. */ - action = XLogReadBufferForRedoExtended(lsn, record, 0, - rnode, MAIN_FORKNUM, blkno, - RBM_NORMAL, true, &buffer); + action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, + &buffer); if (action == BLK_NEEDS_REDO) { Page page = (Page) BufferGetPage(buffer); @@ -7226,11 +7046,13 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) int nredirected; int ndead; int nunused; + Size datalen; + + redirected = (OffsetNumber *) XLogRecGetBlockData(record, 0, &datalen); nredirected = xlrec->nredirected; ndead = xlrec->ndead; - end = (OffsetNumber *) ((char *) xlrec + record->xl_len); - redirected = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean); + end = (OffsetNumber *) ((char *) redirected + datalen); nowdead = redirected + (nredirected * 2); nowunused = nowdead + ndead; nunused = (end - nowunused); @@ -7263,7 +7085,7 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) * totally accurate anyway. */ if (action == BLK_NEEDS_REDO) - XLogRecordPageWithFreeSpace(xlrec->node, xlrec->block, freespace); + XLogRecordPageWithFreeSpace(rnode, blkno, freespace); } /* @@ -7275,17 +7097,18 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) * page modification would fail to clear the visibility map bit. */ static void -heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record) +heap_xlog_visible(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record); + Buffer vmbuffer = InvalidBuffer; Buffer buffer; Page page; RelFileNode rnode; BlockNumber blkno; XLogRedoAction action; - rnode = xlrec->node; - blkno = xlrec->block; + XLogRecGetBlockTag(record, 1, &rnode, NULL, &blkno); /* * If there are any Hot Standby transactions running that have an xmin @@ -7304,7 +7127,7 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record) * truncated later in recovery, we don't need to update the page, but we'd * better still update the visibility map. */ - action = XLogReadBufferForRedo(lsn, record, 1, rnode, blkno, &buffer); + action = XLogReadBufferForRedo(record, 1, &buffer); if (action == BLK_NEEDS_REDO) { /* @@ -7341,12 +7164,21 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record) * the visibility map bit does so before checking the page LSN, so any * bits that need to be cleared will still be cleared. */ - if (record->xl_info & XLR_BKP_BLOCK(0)) - (void) RestoreBackupBlock(lsn, record, 0, false, false); - else + if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false, + &vmbuffer) == BLK_NEEDS_REDO) { + Page vmpage = BufferGetPage(vmbuffer); Relation reln; - Buffer vmbuffer = InvalidBuffer; + + /* initialize the page if it was read as zeros */ + if (PageIsNew(vmpage)) + PageInit(vmpage, BLCKSZ, 0); + + /* + * XLogReplayBufferExtended locked the buffer. But visibilitymap_set + * will handle locking itself. + */ + LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK); reln = CreateFakeRelcacheEntry(rnode); visibilitymap_pin(reln, blkno, &vmbuffer); @@ -7362,25 +7194,27 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record) * we did for the heap page. If this results in a dropped bit, no * real harm is done; and the next VACUUM will fix it. */ - if (lsn > PageGetLSN(BufferGetPage(vmbuffer))) + if (lsn > PageGetLSN(vmpage)) visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer, xlrec->cutoff_xid); ReleaseBuffer(vmbuffer); FreeFakeRelcacheEntry(reln); } + else if (BufferIsValid(vmbuffer)) + UnlockReleaseBuffer(vmbuffer); } /* * Replay XLOG_HEAP2_FREEZE_PAGE records */ static void -heap_xlog_freeze_page(XLogRecPtr lsn, XLogRecord *record) +heap_xlog_freeze_page(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) XLogRecGetData(record); TransactionId cutoff_xid = xlrec->cutoff_xid; Buffer buffer; - Page page; int ntup; /* @@ -7388,12 +7222,19 @@ heap_xlog_freeze_page(XLogRecPtr lsn, XLogRecord *record) * consider the frozen xids as running. */ if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node); + { + RelFileNode rnode; - if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->block, - &buffer) == BLK_NEEDS_REDO) + XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); + ResolveRecoveryConflictWithSnapshot(cutoff_xid, rnode); + } + + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { - page = BufferGetPage(buffer); + Page page = BufferGetPage(buffer); + xl_heap_freeze_tuple *tuples; + + tuples = (xl_heap_freeze_tuple *) XLogRecGetBlockData(record, 0, NULL); /* now execute freeze plan for each frozen tuple */ for (ntup = 0; ntup < xlrec->ntuples; ntup++) @@ -7402,7 +7243,7 @@ heap_xlog_freeze_page(XLogRecPtr lsn, XLogRecord *record) ItemId lp; HeapTupleHeader tuple; - xlrec_tp = &xlrec->tuples[ntup]; + xlrec_tp = &tuples[ntup]; lp = PageGetItemId(page, xlrec_tp->offset); /* offsets are one-based */ tuple = (HeapTupleHeader) PageGetItem(page, lp); @@ -7444,19 +7285,21 @@ fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2) } static void -heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) +heap_xlog_delete(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record); Buffer buffer; Page page; - OffsetNumber offnum; ItemId lp = NULL; HeapTupleHeader htup; BlockNumber blkno; RelFileNode target_node; + ItemPointerData target_tid; - blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid)); - target_node = xlrec->target.node; + XLogRecGetBlockTag(record, 0, &target_node, NULL, &blkno); + ItemPointerSetBlockNumber(&target_tid, blkno); + ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum); /* * The visibility map may need to be fixed even if the heap page is @@ -7473,16 +7316,14 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) FreeFakeRelcacheEntry(reln); } - if (XLogReadBufferForRedo(lsn, record, 0, target_node, blkno, &buffer) - == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { - page = (Page) BufferGetPage(buffer); + page = BufferGetPage(buffer); - offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - if (PageGetMaxOffsetNumber(page) >= offnum) - lp = PageGetItemId(page, offnum); + if (PageGetMaxOffsetNumber(page) >= xlrec->offnum) + lp = PageGetItemId(page, xlrec->offnum); - if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp)) elog(PANIC, "heap_delete_redo: invalid lp"); htup = (HeapTupleHeader) PageGetItem(page, lp); @@ -7496,13 +7337,13 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) HeapTupleHeaderSetCmax(htup, FirstCommandId, false); /* Mark the page as a candidate for pruning */ - PageSetPrunable(page, record->xl_xid); + PageSetPrunable(page, XLogRecGetXid(record)); if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); /* Make sure there is no forward chain link in t_ctid */ - htup->t_ctid = xlrec->target.tid; + htup->t_ctid = target_tid; PageSetLSN(page, lsn); MarkBufferDirty(buffer); } @@ -7511,12 +7352,12 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) } static void -heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) +heap_xlog_insert(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record); Buffer buffer; Page page; - OffsetNumber offnum; struct { HeapTupleHeaderData hdr; @@ -7528,10 +7369,12 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) Size freespace = 0; RelFileNode target_node; BlockNumber blkno; + ItemPointerData target_tid; XLogRedoAction action; - target_node = xlrec->target.node; - blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid)); + XLogRecGetBlockTag(record, 0, &target_node, NULL, &blkno); + ItemPointerSetBlockNumber(&target_tid, blkno); + ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum); /* * The visibility map may need to be fixed even if the heap page is @@ -7549,51 +7392,51 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) } /* - * If we inserted the first and only tuple on the page, re-initialize - * the page from scratch. + * If we inserted the first and only tuple on the page, re-initialize the + * page from scratch. */ - if (record->xl_info & XLOG_HEAP_INIT_PAGE) + if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) { - XLogReadBufferForRedoExtended(lsn, record, 0, - target_node, MAIN_FORKNUM, blkno, - RBM_ZERO_AND_LOCK, false, &buffer); + buffer = XLogInitBufferForRedo(record, 0); page = BufferGetPage(buffer); PageInit(page, BufferGetPageSize(buffer), 0); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 0, target_node, blkno, - &buffer); - + action = XLogReadBufferForRedo(record, 0, &buffer); if (action == BLK_NEEDS_REDO) { + Size datalen; + char *data; + page = BufferGetPage(buffer); - offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - if (PageGetMaxOffsetNumber(page) + 1 < offnum) + if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum) elog(PANIC, "heap_insert_redo: invalid max offset number"); - newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader; - Assert(newlen <= MaxHeapTupleSize); - memcpy((char *) &xlhdr, - (char *) xlrec + SizeOfHeapInsert, - SizeOfHeapHeader); + data = XLogRecGetBlockData(record, 0, &datalen); + + newlen = datalen - SizeOfHeapHeader; + Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize); + memcpy((char *) &xlhdr, data, SizeOfHeapHeader); + data += SizeOfHeapHeader; + htup = &tbuf.hdr; MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits), - (char *) xlrec + SizeOfHeapInsert + SizeOfHeapHeader, + data, newlen); newlen += offsetof(HeapTupleHeaderData, t_bits); htup->t_infomask2 = xlhdr.t_infomask2; htup->t_infomask = xlhdr.t_infomask; htup->t_hoff = xlhdr.t_hoff; - HeapTupleHeaderSetXmin(htup, record->xl_xid); + HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); HeapTupleHeaderSetCmin(htup, FirstCommandId); - htup->t_ctid = xlrec->target.tid; + htup->t_ctid = target_tid; - offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); - if (offnum == InvalidOffsetNumber) + if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum, + true, true) == InvalidOffsetNumber) elog(PANIC, "heap_insert_redo: failed to add tuple"); freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ @@ -7618,16 +7461,16 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) * totally accurate anyway. */ if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5) - XLogRecordPageWithFreeSpace(xlrec->target.node, blkno, freespace); + XLogRecordPageWithFreeSpace(target_node, blkno, freespace); } /* * Handles MULTI_INSERT record type. */ static void -heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) +heap_xlog_multi_insert(XLogReaderState *record) { - char *recdata = XLogRecGetData(record); + XLogRecPtr lsn = record->EndRecPtr; xl_heap_multi_insert *xlrec; RelFileNode rnode; BlockNumber blkno; @@ -7642,27 +7485,16 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) uint32 newlen; Size freespace = 0; int i; - bool isinit = (record->xl_info & XLOG_HEAP_INIT_PAGE) != 0; + bool isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0; XLogRedoAction action; /* * Insertion doesn't overwrite MVCC data, so no conflict processing is * required. */ + xlrec = (xl_heap_multi_insert *) XLogRecGetData(record); - xlrec = (xl_heap_multi_insert *) recdata; - recdata += SizeOfHeapMultiInsert; - - rnode = xlrec->node; - blkno = xlrec->blkno; - - /* - * If we're reinitializing the page, the tuples are stored in order from - * FirstOffsetNumber. Otherwise there's an array of offsets in the WAL - * record. - */ - if (!isinit) - recdata += sizeof(OffsetNumber) * xlrec->ntuples; + XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); /* * The visibility map may need to be fixed even if the heap page is @@ -7681,24 +7513,35 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) if (isinit) { - XLogReadBufferForRedoExtended(lsn, record, 0, - rnode, MAIN_FORKNUM, blkno, - RBM_ZERO_AND_LOCK, false, &buffer); + buffer = XLogInitBufferForRedo(record, 0); page = BufferGetPage(buffer); PageInit(page, BufferGetPageSize(buffer), 0); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 0, rnode, blkno, &buffer); - + action = XLogReadBufferForRedo(record, 0, &buffer); if (action == BLK_NEEDS_REDO) { - page = BufferGetPage(buffer); + char *tupdata; + char *endptr; + Size len; + + /* Tuples are stored as block data */ + tupdata = XLogRecGetBlockData(record, 0, &len); + endptr = tupdata + len; + + page = (Page) BufferGetPage(buffer); + for (i = 0; i < xlrec->ntuples; i++) { OffsetNumber offnum; xl_multi_insert_tuple *xlhdr; + /* + * If we're reinitializing the page, the tuples are stored in + * order from FirstOffsetNumber. Otherwise there's an array of + * offsets in the WAL record, and the tuples come after that. + */ if (isinit) offnum = FirstOffsetNumber + i; else @@ -7706,8 +7549,8 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) if (PageGetMaxOffsetNumber(page) + 1 < offnum) elog(PANIC, "heap_multi_insert_redo: invalid max offset number"); - xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata); - recdata = ((char *) xlhdr) + SizeOfMultiInsertTuple; + xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata); + tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple; newlen = xlhdr->datalen; Assert(newlen <= MaxHeapTupleSize); @@ -7715,15 +7558,15 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits), - (char *) recdata, + (char *) tupdata, newlen); - recdata += newlen; + tupdata += newlen; newlen += offsetof(HeapTupleHeaderData, t_bits); htup->t_infomask2 = xlhdr->t_infomask2; htup->t_infomask = xlhdr->t_infomask; htup->t_hoff = xlhdr->t_hoff; - HeapTupleHeaderSetXmin(htup, record->xl_xid); + HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); HeapTupleHeaderSetCmin(htup, FirstCommandId); ItemPointerSetBlockNumber(&htup->t_ctid, blkno); ItemPointerSetOffsetNumber(&htup->t_ctid, offnum); @@ -7732,6 +7575,8 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) if (offnum == InvalidOffsetNumber) elog(PANIC, "heap_multi_insert_redo: failed to add tuple"); } + if (tupdata != endptr) + elog(PANIC, "heap_multi_insert_redo: total tuple length mismatch"); freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ @@ -7755,19 +7600,21 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) * totally accurate anyway. */ if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5) - XLogRecordPageWithFreeSpace(xlrec->node, blkno, freespace); + XLogRecordPageWithFreeSpace(rnode, blkno, freespace); } /* * Handles UPDATE and HOT_UPDATE */ static void -heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) +heap_xlog_update(XLogReaderState *record, bool hot_update) { + XLogRecPtr lsn = record->EndRecPtr; xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record); RelFileNode rnode; BlockNumber oldblk; BlockNumber newblk; + ItemPointerData newtid; Buffer obuffer, nbuffer; Page page; @@ -7775,7 +7622,6 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) ItemId lp = NULL; HeapTupleData oldtup; HeapTupleHeader htup; - char *recdata; uint16 prefixlen = 0, suffixlen = 0; char *newp; @@ -7784,7 +7630,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) HeapTupleHeaderData hdr; char data[MaxHeapTupleSize]; } tbuf; - xl_heap_header_len xlhdr; + xl_heap_header xlhdr; uint32 newlen; Size freespace = 0; XLogRedoAction oldaction; @@ -7794,9 +7640,16 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) oldtup.t_data = NULL; oldtup.t_len = 0; - rnode = xlrec->target.node; - newblk = ItemPointerGetBlockNumber(&xlrec->newtid); - oldblk = ItemPointerGetBlockNumber(&xlrec->target.tid); + XLogRecGetBlockTag(record, 0, &rnode, NULL, &newblk); + if (XLogRecGetBlockTag(record, 1, NULL, NULL, &oldblk)) + { + /* HOT updates are never done across pages */ + Assert(!hot_update); + } + else + oldblk = newblk; + + ItemPointerSet(&newtid, newblk, xlrec->new_offnum); /* * The visibility map may need to be fixed even if the heap page is @@ -7824,12 +7677,12 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) */ /* Deal with old tuple version */ - oldaction = XLogReadBufferForRedo(lsn, record, 0, rnode, oldblk, &obuffer); + oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1, + &obuffer); if (oldaction == BLK_NEEDS_REDO) { - page = (Page) BufferGetPage(obuffer); - - offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + page = BufferGetPage(obuffer); + offnum = xlrec->old_offnum; if (PageGetMaxOffsetNumber(page) >= offnum) lp = PageGetItemId(page, offnum); @@ -7852,10 +7705,10 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) HeapTupleHeaderSetXmax(htup, xlrec->old_xmax); HeapTupleHeaderSetCmax(htup, FirstCommandId, false); /* Set forward chain link in t_ctid */ - htup->t_ctid = xlrec->newtid; + htup->t_ctid = newtid; /* Mark the page as a candidate for pruning */ - PageSetPrunable(page, record->xl_xid); + PageSetPrunable(page, XLogRecGetXid(record)); if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); @@ -7872,18 +7725,15 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) nbuffer = obuffer; newaction = oldaction; } - else if (record->xl_info & XLOG_HEAP_INIT_PAGE) + else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) { - XLogReadBufferForRedoExtended(lsn, record, 1, - rnode, MAIN_FORKNUM, newblk, - RBM_ZERO_AND_LOCK, false, &nbuffer); + nbuffer = XLogInitBufferForRedo(record, 0); page = (Page) BufferGetPage(nbuffer); PageInit(page, BufferGetPageSize(nbuffer), 0); newaction = BLK_NEEDS_REDO; } else - newaction = XLogReadBufferForRedo(lsn, record, 1, rnode, newblk, - &nbuffer); + newaction = XLogReadBufferForRedo(record, 0, &nbuffer); /* * The visibility map may need to be fixed even if the heap page is @@ -7891,7 +7741,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) */ if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) { - Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); + Relation reln = CreateFakeRelcacheEntry(rnode); Buffer vmbuffer = InvalidBuffer; visibilitymap_pin(reln, newblk, &vmbuffer); @@ -7903,14 +7753,20 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) /* Deal with new tuple */ if (newaction == BLK_NEEDS_REDO) { - page = (Page) BufferGetPage(nbuffer); + char *recdata; + char *recdata_end; + Size datalen; + Size tuplen; + + recdata = XLogRecGetBlockData(record, 0, &datalen); + recdata_end = recdata + datalen; - offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid)); + page = BufferGetPage(nbuffer); + + offnum = xlrec->new_offnum; if (PageGetMaxOffsetNumber(page) + 1 < offnum) elog(PANIC, "heap_update_redo: invalid max offset number"); - recdata = (char *) xlrec + SizeOfHeapUpdate; - if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD) { Assert(newblk == oldblk); @@ -7924,10 +7780,12 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) recdata += sizeof(uint16); } - memcpy((char *) &xlhdr, recdata, SizeOfHeapHeaderLen); - recdata += SizeOfHeapHeaderLen; + memcpy((char *) &xlhdr, recdata, SizeOfHeapHeader); + recdata += SizeOfHeapHeader; + + tuplen = recdata_end - recdata; + Assert(tuplen <= MaxHeapTupleSize); - Assert(xlhdr.t_len + prefixlen + suffixlen <= MaxHeapTupleSize); htup = &tbuf.hdr; MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); @@ -7941,7 +7799,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) int len; /* copy bitmap [+ padding] [+ oid] from WAL record */ - len = xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits); + len = xlhdr.t_hoff - offsetof(HeapTupleHeaderData, t_bits); memcpy(newp, recdata, len); recdata += len; newp += len; @@ -7951,7 +7809,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) newp += prefixlen; /* copy new tuple data from WAL record */ - len = xlhdr.t_len - (xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits)); + len = tuplen - (xlhdr.t_hoff - offsetof(HeapTupleHeaderData, t_bits)); memcpy(newp, recdata, len); recdata += len; newp += len; @@ -7962,24 +7820,26 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) * copy bitmap [+ padding] [+ oid] + data from record, all in one * go */ - memcpy(newp, recdata, xlhdr.t_len); - recdata += xlhdr.t_len; - newp += xlhdr.t_len; + memcpy(newp, recdata, tuplen); + recdata += tuplen; + newp += tuplen; } + Assert(recdata == recdata_end); + /* copy suffix from old tuple */ if (suffixlen > 0) memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen); - newlen = offsetof(HeapTupleHeaderData, t_bits) + xlhdr.t_len + prefixlen + suffixlen; - htup->t_infomask2 = xlhdr.header.t_infomask2; - htup->t_infomask = xlhdr.header.t_infomask; - htup->t_hoff = xlhdr.header.t_hoff; + newlen = offsetof(HeapTupleHeaderData, t_bits) + tuplen + prefixlen + suffixlen; + htup->t_infomask2 = xlhdr.t_infomask2; + htup->t_infomask = xlhdr.t_infomask; + htup->t_hoff = xlhdr.t_hoff; - HeapTupleHeaderSetXmin(htup, record->xl_xid); + HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); HeapTupleHeaderSetCmin(htup, FirstCommandId); HeapTupleHeaderSetXmax(htup, xlrec->new_xmax); /* Make sure there is no forward chain link in t_ctid */ - htup->t_ctid = xlrec->newtid; + htup->t_ctid = newtid; offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); if (offnum == InvalidOffsetNumber) @@ -7993,6 +7853,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) PageSetLSN(page, lsn); MarkBufferDirty(nbuffer); } + if (BufferIsValid(nbuffer) && nbuffer != obuffer) UnlockReleaseBuffer(nbuffer); if (BufferIsValid(obuffer)) @@ -8014,14 +7875,13 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) * totally accurate anyway. */ if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5) - XLogRecordPageWithFreeSpace(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->newtid)), - freespace); + XLogRecordPageWithFreeSpace(rnode, newblk, freespace); } static void -heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record) +heap_xlog_lock(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record); Buffer buffer; Page page; @@ -8029,13 +7889,11 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record) ItemId lp = NULL; HeapTupleHeader htup; - if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node, - ItemPointerGetBlockNumber(&xlrec->target.tid), - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = (Page) BufferGetPage(buffer); - offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + offnum = xlrec->offnum; if (PageGetMaxOffsetNumber(page) >= offnum) lp = PageGetItemId(page, offnum); @@ -8055,7 +7913,9 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record) { HeapTupleHeaderClearHotUpdated(htup); /* Make sure there is no forward chain link in t_ctid */ - htup->t_ctid = xlrec->target.tid; + ItemPointerSet(&htup->t_ctid, + BufferGetBlockNumber(buffer), + offnum); } HeapTupleHeaderSetXmax(htup, xlrec->locking_xid); HeapTupleHeaderSetCmax(htup, FirstCommandId, false); @@ -8067,22 +7927,23 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record) } static void -heap_xlog_lock_updated(XLogRecPtr lsn, XLogRecord *record) +heap_xlog_lock_updated(XLogReaderState *record) { - xl_heap_lock_updated *xlrec = - (xl_heap_lock_updated *) XLogRecGetData(record); + XLogRecPtr lsn = record->EndRecPtr; + xl_heap_lock_updated *xlrec; Buffer buffer; Page page; OffsetNumber offnum; ItemId lp = NULL; HeapTupleHeader htup; - if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - &buffer) == BLK_NEEDS_REDO) + xlrec = (xl_heap_lock_updated *) XLogRecGetData(record); + + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); - offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + + offnum = xlrec->offnum; if (PageGetMaxOffsetNumber(page) >= offnum) lp = PageGetItemId(page, offnum); @@ -8103,8 +7964,9 @@ heap_xlog_lock_updated(XLogRecPtr lsn, XLogRecord *record) } static void -heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record) +heap_xlog_inplace(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record); Buffer buffer; Page page; @@ -8112,15 +7974,15 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record) ItemId lp = NULL; HeapTupleHeader htup; uint32 oldlen; - uint32 newlen; + Size newlen; - if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { + char *newtup = XLogRecGetBlockData(record, 0, &newlen); + page = BufferGetPage(buffer); - offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + offnum = xlrec->offnum; if (PageGetMaxOffsetNumber(page) >= offnum) lp = PageGetItemId(page, offnum); @@ -8130,13 +7992,10 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record) htup = (HeapTupleHeader) PageGetItem(page, lp); oldlen = ItemIdGetLength(lp) - htup->t_hoff; - newlen = record->xl_len - SizeOfHeapInplace; if (oldlen != newlen) elog(PANIC, "heap_inplace_redo: wrong tuple length"); - memcpy((char *) htup + htup->t_hoff, - (char *) xlrec + SizeOfHeapInplace, - newlen); + memcpy((char *) htup + htup->t_hoff, newtup, newlen); PageSetLSN(page, lsn); MarkBufferDirty(buffer); @@ -8146,9 +8005,9 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record) } void -heap_redo(XLogRecPtr lsn, XLogRecord *record) +heap_redo(XLogReaderState *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; /* * These operations don't overwrite MVCC data so no conflict processing is @@ -8158,22 +8017,22 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record) switch (info & XLOG_HEAP_OPMASK) { case XLOG_HEAP_INSERT: - heap_xlog_insert(lsn, record); + heap_xlog_insert(record); break; case XLOG_HEAP_DELETE: - heap_xlog_delete(lsn, record); + heap_xlog_delete(record); break; case XLOG_HEAP_UPDATE: - heap_xlog_update(lsn, record, false); + heap_xlog_update(record, false); break; case XLOG_HEAP_HOT_UPDATE: - heap_xlog_update(lsn, record, true); + heap_xlog_update(record, true); break; case XLOG_HEAP_LOCK: - heap_xlog_lock(lsn, record); + heap_xlog_lock(record); break; case XLOG_HEAP_INPLACE: - heap_xlog_inplace(lsn, record); + heap_xlog_inplace(record); break; default: elog(PANIC, "heap_redo: unknown op code %u", info); @@ -8181,29 +8040,29 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record) } void -heap2_redo(XLogRecPtr lsn, XLogRecord *record) +heap2_redo(XLogReaderState *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; switch (info & XLOG_HEAP_OPMASK) { case XLOG_HEAP2_CLEAN: - heap_xlog_clean(lsn, record); + heap_xlog_clean(record); break; case XLOG_HEAP2_FREEZE_PAGE: - heap_xlog_freeze_page(lsn, record); + heap_xlog_freeze_page(record); break; case XLOG_HEAP2_CLEANUP_INFO: - heap_xlog_cleanup_info(lsn, record); + heap_xlog_cleanup_info(record); break; case XLOG_HEAP2_VISIBLE: - heap_xlog_visible(lsn, record); + heap_xlog_visible(record); break; case XLOG_HEAP2_MULTI_INSERT: - heap_xlog_multi_insert(lsn, record); + heap_xlog_multi_insert(record); break; case XLOG_HEAP2_LOCK_UPDATED: - heap_xlog_lock_updated(lsn, record); + heap_xlog_lock_updated(record); break; case XLOG_HEAP2_NEW_CID: @@ -8213,7 +8072,7 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record) */ break; case XLOG_HEAP2_REWRITE: - heap_xlog_logical_rewrite(lsn, record); + heap_xlog_logical_rewrite(record); break; default: elog(PANIC, "heap2_redo: unknown op code %u", info); |