Diffstat (limited to 'src/backend/access/nbtree')
-rw-r--r--   src/backend/access/nbtree/nbtinsert.c   207
-rw-r--r--   src/backend/access/nbtree/nbtpage.c      175
-rw-r--r--   src/backend/access/nbtree/nbtxlog.c      353
3 files changed, 257 insertions, 478 deletions
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index bcaba7e5e84..2c4f9904e1a 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -837,37 +837,25 @@ _bt_insertonpg(Relation rel, if (RelationNeedsWAL(rel)) { xl_btree_insert xlrec; - BlockNumber xlleftchild; xl_btree_metadata xlmeta; uint8 xlinfo; XLogRecPtr recptr; - XLogRecData rdata[4]; - XLogRecData *nextrdata; IndexTupleData trunctuple; - xlrec.target.node = rel->rd_node; - ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off); + xlrec.offnum = itup_off; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfBtreeInsert; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = nextrdata = &(rdata[1]); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert); if (P_ISLEAF(lpageop)) xlinfo = XLOG_BTREE_INSERT_LEAF; else { /* - * Include the block number of the left child, whose - * INCOMPLETE_SPLIT flag was cleared. + * Register the left child whose INCOMPLETE_SPLIT flag was + * cleared. */ - xlleftchild = BufferGetBlockNumber(cbuf); - nextrdata->data = (char *) &xlleftchild; - nextrdata->len = sizeof(BlockNumber); - nextrdata->buffer = cbuf; - nextrdata->buffer_std = true; - nextrdata->next = nextrdata + 1; - nextrdata++; + XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD); xlinfo = XLOG_BTREE_INSERT_UPPER; } @@ -879,33 +867,25 @@ _bt_insertonpg(Relation rel, xlmeta.fastroot = metad->btm_fastroot; xlmeta.fastlevel = metad->btm_fastlevel; - nextrdata->data = (char *) &xlmeta; - nextrdata->len = sizeof(xl_btree_metadata); - nextrdata->buffer = InvalidBuffer; - nextrdata->next = nextrdata + 1; - nextrdata++; + XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT); + XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata)); xlinfo = XLOG_BTREE_INSERT_META; } /* Read comments in _bt_pgaddtup */ + XLogRegisterBuffer(0, buf, REGBUF_STANDARD); if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop)) { trunctuple = *itup; trunctuple.t_info = sizeof(IndexTupleData); - nextrdata->data = (char *) &trunctuple; - nextrdata->len = sizeof(IndexTupleData); + XLogRegisterBufData(0, (char *) &trunctuple, + sizeof(IndexTupleData)); } else - { - nextrdata->data = (char *) itup; - nextrdata->len = IndexTupleDSize(*itup); - } - nextrdata->buffer = buf; - nextrdata->buffer_std = true; - nextrdata->next = NULL; + XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup)); - recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata); + recptr = XLogInsert(RM_BTREE_ID, xlinfo); if (BufferIsValid(metabuf)) { @@ -1260,56 +1240,37 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, xl_btree_split xlrec; uint8 xlinfo; XLogRecPtr recptr; - XLogRecData rdata[7]; - XLogRecData *lastrdata; - BlockNumber cblkno; - - xlrec.node = rel->rd_node; - xlrec.leftsib = origpagenumber; - xlrec.rightsib = rightpagenumber; - xlrec.rnext = ropaque->btpo_next; + xlrec.level = ropaque->btpo.level; xlrec.firstright = firstright; + xlrec.newitemoff = newitemoff; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfBtreeSplit; - rdata[0].buffer = InvalidBuffer; + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit); - lastrdata = &rdata[0]; + XLogRegisterBuffer(0, buf, REGBUF_STANDARD); + XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT); + /* Log the right sibling, because we've changed its prev-pointer. 
*/ + if (!P_RIGHTMOST(ropaque)) + XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD); + if (BufferIsValid(cbuf)) + XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD); /* - * Log the new item and its offset, if it was inserted on the left - * page. (If it was put on the right page, we don't need to explicitly - * WAL log it because it's included with all the other items on the - * right page.) Show the new item as belonging to the left page - * buffer, so that it is not stored if XLogInsert decides it needs a - * full-page image of the left page. We store the offset anyway, - * though, to support archive compression of these records. + * Log the new item, if it was inserted on the left page. (If it was + * put on the right page, we don't need to explicitly WAL log it + * because it's included with all the other items on the right page.) + * Show the new item as belonging to the left page buffer, so that it + * is not stored if XLogInsert decides it needs a full-page image of + * the left page. We store the offset anyway, though, to support + * archive compression of these records. */ if (newitemonleft) - { - lastrdata->next = lastrdata + 1; - lastrdata++; - - lastrdata->data = (char *) &newitemoff; - lastrdata->len = sizeof(OffsetNumber); - lastrdata->buffer = InvalidBuffer; - - lastrdata->next = lastrdata + 1; - lastrdata++; - - lastrdata->data = (char *) newitem; - lastrdata->len = MAXALIGN(newitemsz); - lastrdata->buffer = buf; /* backup block 0 */ - lastrdata->buffer_std = true; - } + XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz)); /* Log left page */ if (!isleaf) { - lastrdata->next = lastrdata + 1; - lastrdata++; - /* * We must also log the left page's high key, because the right * page's leftmost key is suppressed on non-leaf levels. Show it @@ -1319,43 +1280,7 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, */ itemid = PageGetItemId(origpage, P_HIKEY); item = (IndexTuple) PageGetItem(origpage, itemid); - lastrdata->data = (char *) item; - lastrdata->len = MAXALIGN(IndexTupleSize(item)); - lastrdata->buffer = buf; /* backup block 0 */ - lastrdata->buffer_std = true; - } - - if (isleaf && !newitemonleft) - { - lastrdata->next = lastrdata + 1; - lastrdata++; - - /* - * Although we don't need to WAL-log anything on the left page, we - * still need XLogInsert to consider storing a full-page image of - * the left page, so make an empty entry referencing that buffer. - * This also ensures that the left page is always backup block 0. - */ - lastrdata->data = NULL; - lastrdata->len = 0; - lastrdata->buffer = buf; /* backup block 0 */ - lastrdata->buffer_std = true; - } - - /* - * Log block number of left child, whose INCOMPLETE_SPLIT flag this - * insertion clears. - */ - if (!isleaf) - { - lastrdata->next = lastrdata + 1; - lastrdata++; - - cblkno = BufferGetBlockNumber(cbuf); - lastrdata->data = (char *) &cblkno; - lastrdata->len = sizeof(BlockNumber); - lastrdata->buffer = cbuf; /* backup block 1 */ - lastrdata->buffer_std = true; + XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item))); } /* @@ -1370,35 +1295,16 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, * and so the item pointers can be reconstructed. See comments for * _bt_restore_page(). 
*/ - lastrdata->next = lastrdata + 1; - lastrdata++; - - lastrdata->data = (char *) rightpage + - ((PageHeader) rightpage)->pd_upper; - lastrdata->len = ((PageHeader) rightpage)->pd_special - - ((PageHeader) rightpage)->pd_upper; - lastrdata->buffer = InvalidBuffer; - - /* Log the right sibling, because we've changed its' prev-pointer. */ - if (!P_RIGHTMOST(ropaque)) - { - lastrdata->next = lastrdata + 1; - lastrdata++; - - lastrdata->data = NULL; - lastrdata->len = 0; - lastrdata->buffer = sbuf; /* bkp block 1 (leaf) or 2 (non-leaf) */ - lastrdata->buffer_std = true; - } - - lastrdata->next = NULL; + XLogRegisterBufData(1, + (char *) rightpage + ((PageHeader) rightpage)->pd_upper, + ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper); if (isroot) xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT; else xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R; - recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata); + recptr = XLogInsert(RM_BTREE_ID, xlinfo); PageSetLSN(origpage, recptr); PageSetLSN(rightpage, recptr); @@ -2090,34 +1996,35 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) { xl_btree_newroot xlrec; XLogRecPtr recptr; - XLogRecData rdata[3]; + xl_btree_metadata md; - xlrec.node = rel->rd_node; xlrec.rootblk = rootblknum; xlrec.level = metad->btm_level; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfBtreeNewroot; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot); + + XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT); + XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD); + XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT); + + md.root = rootblknum; + md.level = metad->btm_level; + md.fastroot = rootblknum; + md.fastlevel = metad->btm_level; + + XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata)); /* * Direct access to page is not good but faster - we should implement * some new func in page API. 
*/ - rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper; - rdata[1].len = ((PageHeader) rootpage)->pd_special - - ((PageHeader) rootpage)->pd_upper; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = &(rdata[2]); - - /* Make a full-page image of the left child if needed */ - rdata[2].data = NULL; - rdata[2].len = 0; - rdata[2].buffer = lbuf; - rdata[2].next = NULL; - - recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata); + XLogRegisterBufData(0, + (char *) rootpage + ((PageHeader) rootpage)->pd_upper, + ((PageHeader) rootpage)->pd_special - + ((PageHeader) rootpage)->pd_upper); + + recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT); PageSetLSN(lpage, recptr); PageSetLSN(rootpage, recptr); diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index ea95ce6e1ec..a25dafeb400 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -236,18 +236,25 @@ _bt_getroot(Relation rel, int access) { xl_btree_newroot xlrec; XLogRecPtr recptr; - XLogRecData rdata; + xl_btree_metadata md; + + XLogBeginInsert(); + XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT); + XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT); + + md.root = rootblkno; + md.level = 0; + md.fastroot = rootblkno; + md.fastlevel = 0; + + XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata)); - xlrec.node = rel->rd_node; xlrec.rootblk = rootblkno; xlrec.level = 0; - rdata.data = (char *) &xlrec; - rdata.len = SizeOfBtreeNewroot; - rdata.buffer = InvalidBuffer; - rdata.next = NULL; + XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot); - recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, &rdata); + recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT); PageSetLSN(rootpage, recptr); PageSetLSN(metapg, recptr); @@ -528,39 +535,23 @@ _bt_checkpage(Relation rel, Buffer buf) static void _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedXid) { - if (!RelationNeedsWAL(rel)) - return; - - /* No ereport(ERROR) until changes are logged */ - START_CRIT_SECTION(); + xl_btree_reuse_page xlrec_reuse; /* - * We don't do MarkBufferDirty here because we're about to initialise the - * page, and nobody else can see it yet. + * Note that we don't register the buffer with the record, because this + * operation doesn't modify the page. This record only exists to provide a + * conflict point for Hot Standby. */ /* XLOG stuff */ - { - XLogRecData rdata[1]; - xl_btree_reuse_page xlrec_reuse; + xlrec_reuse.node = rel->rd_node; + xlrec_reuse.block = blkno; + xlrec_reuse.latestRemovedXid = latestRemovedXid; - xlrec_reuse.node = rel->rd_node; - xlrec_reuse.block = blkno; - xlrec_reuse.latestRemovedXid = latestRemovedXid; - rdata[0].data = (char *) &xlrec_reuse; - rdata[0].len = SizeOfBtreeReusePage; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = NULL; + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec_reuse, SizeOfBtreeReusePage); - XLogInsert(RM_BTREE_ID, XLOG_BTREE_REUSE_PAGE, rdata); - - /* - * We don't do PageSetLSN here because we're about to initialise the - * page, so no need. - */ - } - - END_CRIT_SECTION(); + XLogInsert(RM_BTREE_ID, XLOG_BTREE_REUSE_PAGE); } /* @@ -633,7 +624,7 @@ _bt_getbuf(Relation rel, BlockNumber blkno, int access) * WAL record that will allow us to conflict with queries * running on standby. 
*/ - if (XLogStandbyInfoActive()) + if (XLogStandbyInfoActive() && RelationNeedsWAL(rel)) { BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page); @@ -830,17 +821,13 @@ _bt_delitems_vacuum(Relation rel, Buffer buf, if (RelationNeedsWAL(rel)) { XLogRecPtr recptr; - XLogRecData rdata[2]; xl_btree_vacuum xlrec_vacuum; - xlrec_vacuum.node = rel->rd_node; - xlrec_vacuum.block = BufferGetBlockNumber(buf); - xlrec_vacuum.lastBlockVacuumed = lastBlockVacuumed; - rdata[0].data = (char *) &xlrec_vacuum; - rdata[0].len = SizeOfBtreeVacuum; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + + XLogBeginInsert(); + XLogRegisterBuffer(0, buf, REGBUF_STANDARD); + XLogRegisterData((char *) &xlrec_vacuum, SizeOfBtreeVacuum); /* * The target-offsets array is not in the buffer, but pretend that it @@ -848,20 +835,9 @@ _bt_delitems_vacuum(Relation rel, Buffer buf, * need not be stored too. */ if (nitems > 0) - { - rdata[1].data = (char *) itemnos; - rdata[1].len = nitems * sizeof(OffsetNumber); - } - else - { - rdata[1].data = NULL; - rdata[1].len = 0; - } - rdata[1].buffer = buf; - rdata[1].buffer_std = true; - rdata[1].next = NULL; + XLogRegisterBufData(0, (char *) itemnos, nitems * sizeof(OffsetNumber)); - recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_VACUUM, rdata); + recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_VACUUM); PageSetLSN(page, recptr); } @@ -919,36 +895,23 @@ _bt_delitems_delete(Relation rel, Buffer buf, if (RelationNeedsWAL(rel)) { XLogRecPtr recptr; - XLogRecData rdata[3]; xl_btree_delete xlrec_delete; - xlrec_delete.node = rel->rd_node; xlrec_delete.hnode = heapRel->rd_node; - xlrec_delete.block = BufferGetBlockNumber(buf); xlrec_delete.nitems = nitems; - rdata[0].data = (char *) &xlrec_delete; - rdata[0].len = SizeOfBtreeDelete; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + XLogBeginInsert(); + XLogRegisterBuffer(0, buf, REGBUF_STANDARD); + XLogRegisterData((char *) &xlrec_delete, SizeOfBtreeDelete); /* * We need the target-offsets array whether or not we store the whole * buffer, to allow us to find the latestRemovedXid on a standby * server. 
*/ - rdata[1].data = (char *) itemnos; - rdata[1].len = nitems * sizeof(OffsetNumber); - rdata[1].buffer = InvalidBuffer; - rdata[1].next = &(rdata[2]); - - rdata[2].data = NULL; - rdata[2].len = 0; - rdata[2].buffer = buf; - rdata[2].buffer_std = true; - rdata[2].next = NULL; + XLogRegisterData((char *) itemnos, nitems * sizeof(OffsetNumber)); - recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, rdata); + recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE); PageSetLSN(page, recptr); } @@ -1493,33 +1456,26 @@ _bt_mark_page_halfdead(Relation rel, Buffer leafbuf, BTStack stack) { xl_btree_mark_page_halfdead xlrec; XLogRecPtr recptr; - XLogRecData rdata[2]; - xlrec.target.node = rel->rd_node; - ItemPointerSet(&(xlrec.target.tid), BufferGetBlockNumber(topparent), topoff); + xlrec.poffset = topoff; xlrec.leafblk = leafblkno; if (target != leafblkno) xlrec.topparent = target; else xlrec.topparent = InvalidBlockNumber; + XLogBeginInsert(); + XLogRegisterBuffer(0, leafbuf, REGBUF_WILL_INIT); + XLogRegisterBuffer(1, topparent, REGBUF_STANDARD); + page = BufferGetPage(leafbuf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); xlrec.leftblk = opaque->btpo_prev; xlrec.rightblk = opaque->btpo_next; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfBtreeMarkPageHalfDead; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); - - rdata[1].data = NULL; - rdata[1].len = 0; - rdata[1].buffer = topparent; - rdata[1].buffer_std = true; - rdata[1].next = NULL; + XLogRegisterData((char *) &xlrec, SizeOfBtreeMarkPageHalfDead); - recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_MARK_PAGE_HALFDEAD, rdata); + recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_MARK_PAGE_HALFDEAD); page = BufferGetPage(topparent); PageSetLSN(page, recptr); @@ -1826,63 +1782,44 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, bool *rightsib_empty) xl_btree_metadata xlmeta; uint8 xlinfo; XLogRecPtr recptr; - XLogRecData rdata[4]; - XLogRecData *nextrdata; - xlrec.node = rel->rd_node; + XLogBeginInsert(); + + XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); + if (BufferIsValid(lbuf)) + XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD); + XLogRegisterBuffer(2, rbuf, REGBUF_STANDARD); + if (target != leafblkno) + XLogRegisterBuffer(3, leafbuf, REGBUF_WILL_INIT); /* information on the unlinked block */ - xlrec.deadblk = target; xlrec.leftsib = leftsib; xlrec.rightsib = rightsib; xlrec.btpo_xact = opaque->btpo.xact; /* information needed to recreate the leaf block (if not the target) */ - xlrec.leafblk = leafblkno; xlrec.leafleftsib = leafleftsib; xlrec.leafrightsib = leafrightsib; xlrec.topparent = nextchild; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfBtreeUnlinkPage; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = nextrdata = &(rdata[1]); + XLogRegisterData((char *) &xlrec, SizeOfBtreeUnlinkPage); if (BufferIsValid(metabuf)) { + XLogRegisterBuffer(4, metabuf, REGBUF_WILL_INIT); + xlmeta.root = metad->btm_root; xlmeta.level = metad->btm_level; xlmeta.fastroot = metad->btm_fastroot; xlmeta.fastlevel = metad->btm_fastlevel; - nextrdata->data = (char *) &xlmeta; - nextrdata->len = sizeof(xl_btree_metadata); - nextrdata->buffer = InvalidBuffer; - nextrdata->next = nextrdata + 1; - nextrdata++; + XLogRegisterBufData(4, (char *) &xlmeta, sizeof(xl_btree_metadata)); xlinfo = XLOG_BTREE_UNLINK_PAGE_META; } else xlinfo = XLOG_BTREE_UNLINK_PAGE; - nextrdata->data = NULL; - nextrdata->len = 0; - nextrdata->buffer = rbuf; - nextrdata->buffer_std = true; - nextrdata->next = NULL; - - if (BufferIsValid(lbuf)) - { - 
nextrdata->next = nextrdata + 1; - nextrdata++; - nextrdata->data = NULL; - nextrdata->len = 0; - nextrdata->buffer = lbuf; - nextrdata->buffer_std = true; - nextrdata->next = NULL; - } - - recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata); + recptr = XLogInsert(RM_BTREE_ID, xlinfo); if (BufferIsValid(metabuf)) { diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 13951be62af..52aef9b9836 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -72,17 +72,23 @@ _bt_restore_page(Page page, char *from, int len) } static void -_bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn, - BlockNumber root, uint32 level, - BlockNumber fastroot, uint32 fastlevel) +_bt_restore_meta(XLogReaderState *record, uint8 block_id) { + XLogRecPtr lsn = record->EndRecPtr; Buffer metabuf; Page metapg; BTMetaPageData *md; BTPageOpaque pageop; + xl_btree_metadata *xlrec; + char *ptr; + Size len; - metabuf = XLogReadBuffer(rnode, BTREE_METAPAGE, true); - Assert(BufferIsValid(metabuf)); + metabuf = XLogInitBufferForRedo(record, block_id); + ptr = XLogRecGetBlockData(record, block_id, &len); + + Assert(len == sizeof(xl_btree_metadata)); + Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE); + xlrec = (xl_btree_metadata *) ptr; metapg = BufferGetPage(metabuf); _bt_pageinit(metapg, BufferGetPageSize(metabuf)); @@ -90,10 +96,10 @@ _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn, md = BTPageGetMeta(metapg); md->btm_magic = BTREE_MAGIC; md->btm_version = BTREE_VERSION; - md->btm_root = root; - md->btm_level = level; - md->btm_fastroot = fastroot; - md->btm_fastlevel = fastlevel; + md->btm_root = xlrec->root; + md->btm_level = xlrec->level; + md->btm_fastroot = xlrec->fastroot; + md->btm_fastlevel = xlrec->fastlevel; pageop = (BTPageOpaque) PageGetSpecialPointer(metapg); pageop->btpo_flags = BTP_META; @@ -117,14 +123,12 @@ _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn, * types that can insert a downlink: insert, split, and newroot. */ static void -_bt_clear_incomplete_split(XLogRecPtr lsn, XLogRecord *record, - int block_index, - RelFileNode rnode, BlockNumber cblock) +_bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id) { + XLogRecPtr lsn = record->EndRecPtr; Buffer buf; - if (XLogReadBufferForRedo(lsn, record, block_index, rnode, cblock, &buf) - == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO) { Page page = (Page) BufferGetPage(buf); BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -140,38 +144,12 @@ _bt_clear_incomplete_split(XLogRecPtr lsn, XLogRecord *record, } static void -btree_xlog_insert(bool isleaf, bool ismeta, - XLogRecPtr lsn, XLogRecord *record) +btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record); Buffer buffer; Page page; - char *datapos; - int datalen; - xl_btree_metadata md; - BlockNumber cblkno = 0; - int main_blk_index; - - datapos = (char *) xlrec + SizeOfBtreeInsert; - datalen = record->xl_len - SizeOfBtreeInsert; - - /* - * if this insert finishes a split at lower level, extract the block - * number of the (left) child. 
- */ - if (!isleaf && (record->xl_info & XLR_BKP_BLOCK(0)) == 0) - { - memcpy(&cblkno, datapos, sizeof(BlockNumber)); - Assert(cblkno != 0); - datapos += sizeof(BlockNumber); - datalen -= sizeof(BlockNumber); - } - if (ismeta) - { - memcpy(&md, datapos, sizeof(xl_btree_metadata)); - datapos += sizeof(xl_btree_metadata); - datalen -= sizeof(xl_btree_metadata); - } /* * Insertion to an internal page finishes an incomplete split at the child @@ -183,21 +161,15 @@ btree_xlog_insert(bool isleaf, bool ismeta, * cannot be updates happening. */ if (!isleaf) + _bt_clear_incomplete_split(record, 1); + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { - _bt_clear_incomplete_split(lsn, record, 0, xlrec->target.node, cblkno); - main_blk_index = 1; - } - else - main_blk_index = 0; + Size datalen; + char *datapos = XLogRecGetBlockData(record, 0, &datalen); - if (XLogReadBufferForRedo(lsn, record, main_blk_index, xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - &buffer) == BLK_NEEDS_REDO) - { page = BufferGetPage(buffer); - if (PageAddItem(page, (Item) datapos, datalen, - ItemPointerGetOffsetNumber(&(xlrec->target.tid)), + if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum, false, false) == InvalidOffsetNumber) elog(PANIC, "btree_insert_redo: failed to add item"); @@ -215,15 +187,13 @@ btree_xlog_insert(bool isleaf, bool ismeta, * obsolete link from the metapage. */ if (ismeta) - _bt_restore_meta(xlrec->target.node, lsn, - md.root, md.level, - md.fastroot, md.fastlevel); + _bt_restore_meta(record, 2); } static void -btree_xlog_split(bool onleft, bool isroot, - XLogRecPtr lsn, XLogRecord *record) +btree_xlog_split(bool onleft, bool isroot, XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record); bool isleaf = (xlrec->level == 0); Buffer lbuf; @@ -231,56 +201,17 @@ btree_xlog_split(bool onleft, bool isroot, Page rpage; BTPageOpaque ropaque; char *datapos; - int datalen; - OffsetNumber newitemoff = 0; - Item newitem = NULL; - Size newitemsz = 0; + Size datalen; Item left_hikey = NULL; Size left_hikeysz = 0; - BlockNumber cblkno = InvalidBlockNumber; - - datapos = (char *) xlrec + SizeOfBtreeSplit; - datalen = record->xl_len - SizeOfBtreeSplit; - - /* Extract newitemoff and newitem, if present */ - if (onleft) - { - memcpy(&newitemoff, datapos, sizeof(OffsetNumber)); - datapos += sizeof(OffsetNumber); - datalen -= sizeof(OffsetNumber); - } - if (onleft && !(record->xl_info & XLR_BKP_BLOCK(0))) - { - /* - * We assume that 16-bit alignment is enough to apply IndexTupleSize - * (since it's fetching from a uint16 field) and also enough for - * PageAddItem to insert the tuple. - */ - newitem = (Item) datapos; - newitemsz = MAXALIGN(IndexTupleSize(newitem)); - datapos += newitemsz; - datalen -= newitemsz; - } - - /* Extract left hikey and its size (still assuming 16-bit alignment) */ - if (!isleaf && !(record->xl_info & XLR_BKP_BLOCK(0))) - { - left_hikey = (Item) datapos; - left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey)); - datapos += left_hikeysz; - datalen -= left_hikeysz; - } + BlockNumber leftsib; + BlockNumber rightsib; + BlockNumber rnext; - /* - * If this insertion finishes an incomplete split, get the block number of - * the child. 
- */ - if (!isleaf && !(record->xl_info & XLR_BKP_BLOCK(1))) - { - memcpy(&cblkno, datapos, sizeof(BlockNumber)); - datapos += sizeof(BlockNumber); - datalen -= sizeof(BlockNumber); - } + XLogRecGetBlockTag(record, 0, NULL, NULL, &leftsib); + XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib); + if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext)) + rnext = P_NONE; /* * Clear the incomplete split flag on the left sibling of the child page @@ -288,18 +219,18 @@ btree_xlog_split(bool onleft, bool isroot, * before locking the other pages) */ if (!isleaf) - _bt_clear_incomplete_split(lsn, record, 1, xlrec->node, cblkno); + _bt_clear_incomplete_split(record, 3); /* Reconstruct right (new) sibling page from scratch */ - rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true); - Assert(BufferIsValid(rbuf)); + rbuf = XLogInitBufferForRedo(record, 1); + datapos = XLogRecGetBlockData(record, 1, &datalen); rpage = (Page) BufferGetPage(rbuf); _bt_pageinit(rpage, BufferGetPageSize(rbuf)); ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage); - ropaque->btpo_prev = xlrec->leftsib; - ropaque->btpo_next = xlrec->rnext; + ropaque->btpo_prev = leftsib; + ropaque->btpo_next = rnext; ropaque->btpo.level = xlrec->level; ropaque->btpo_flags = isleaf ? BTP_LEAF : 0; ropaque->btpo_cycleid = 0; @@ -324,8 +255,7 @@ btree_xlog_split(bool onleft, bool isroot, /* don't release the buffer yet; we touch right page's first item below */ /* Now reconstruct left (original) sibling page */ - if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->leftsib, - &lbuf) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &lbuf) == BLK_NEEDS_REDO) { /* * To retain the same physical order of the tuples that they had, we @@ -339,9 +269,31 @@ btree_xlog_split(bool onleft, bool isroot, Page lpage = (Page) BufferGetPage(lbuf); BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage); OffsetNumber off; + Item newitem; + Size newitemsz = 0; Page newlpage; OffsetNumber leftoff; + datapos = XLogRecGetBlockData(record, 0, &datalen); + + if (onleft) + { + newitem = (Item) datapos; + newitemsz = MAXALIGN(IndexTupleSize(newitem)); + datapos += newitemsz; + datalen -= newitemsz; + } + + /* Extract left hikey and its size (assuming 16-bit alignment) */ + if (!isleaf) + { + left_hikey = (Item) datapos; + left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey)); + datapos += left_hikeysz; + datalen -= left_hikeysz; + } + Assert(datalen == 0); + newlpage = PageGetTempPageCopySpecial(lpage); /* Set high key */ @@ -358,7 +310,7 @@ btree_xlog_split(bool onleft, bool isroot, Item item; /* add the new item if it was inserted on left page */ - if (onleft && off == newitemoff) + if (onleft && off == xlrec->newitemoff) { if (PageAddItem(newlpage, newitem, newitemsz, leftoff, false, false) == InvalidOffsetNumber) @@ -376,7 +328,7 @@ btree_xlog_split(bool onleft, bool isroot, } /* cope with possibility that newitem goes at the end */ - if (onleft && off == newitemoff) + if (onleft && off == xlrec->newitemoff) { if (PageAddItem(newlpage, newitem, newitemsz, leftoff, false, false) == InvalidOffsetNumber) @@ -390,7 +342,7 @@ btree_xlog_split(bool onleft, bool isroot, lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT; if (isleaf) lopaque->btpo_flags |= BTP_LEAF; - lopaque->btpo_next = xlrec->rightsib; + lopaque->btpo_next = rightsib; lopaque->btpo_cycleid = 0; PageSetLSN(lpage, lsn); @@ -410,22 +362,16 @@ btree_xlog_split(bool onleft, bool isroot, * replay, because no other index update can be in progress, and readers * will cope properly when 
following an obsolete left-link. */ - if (xlrec->rnext != P_NONE) + if (rnext != P_NONE) { - /* - * the backup block containing right sibling is 1 or 2, depending - * whether this was a leaf or internal page. - */ - int rnext_index = isleaf ? 1 : 2; Buffer buffer; - if (XLogReadBufferForRedo(lsn, record, rnext_index, xlrec->node, - xlrec->rnext, &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) { Page page = (Page) BufferGetPage(buffer); BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); - pageop->btpo_prev = xlrec->rightsib; + pageop->btpo_prev = rightsib; PageSetLSN(page, lsn); MarkBufferDirty(buffer); @@ -436,8 +382,9 @@ btree_xlog_split(bool onleft, bool isroot, } static void -btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record) +btree_xlog_vacuum(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record); Buffer buffer; Page page; @@ -466,9 +413,13 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record) */ if (HotStandbyActiveInReplay()) { + RelFileNode thisrnode; + BlockNumber thisblkno; BlockNumber blkno; - for (blkno = xlrec->lastBlockVacuumed + 1; blkno < xlrec->block; blkno++) + XLogRecGetBlockTag(record, 0, &thisrnode, NULL, &thisblkno); + + for (blkno = xlrec->lastBlockVacuumed + 1; blkno < thisblkno; blkno++) { /* * We use RBM_NORMAL_NO_LOG mode because it's not an error @@ -483,7 +434,7 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record) * buffer manager we could optimise this so that if the block is * not in shared_buffers we confirm it as unpinned. */ - buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, blkno, + buffer = XLogReadBufferExtended(thisrnode, MAIN_FORKNUM, blkno, RBM_NORMAL_NO_LOG); if (BufferIsValid(buffer)) { @@ -497,20 +448,23 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record) * Like in btvacuumpage(), we need to take a cleanup lock on every leaf * page. See nbtree/README for details. */ - if (XLogReadBufferForRedoExtended(lsn, record, 0, - xlrec->node, MAIN_FORKNUM, xlrec->block, - RBM_NORMAL, true, &buffer) + if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer) == BLK_NEEDS_REDO) { + char *ptr; + Size len; + + ptr = XLogRecGetBlockData(record, 0, &len); + page = (Page) BufferGetPage(buffer); - if (record->xl_len > SizeOfBtreeVacuum) + if (len > 0) { OffsetNumber *unused; OffsetNumber *unend; - unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeVacuum); - unend = (OffsetNumber *) ((char *) xlrec + record->xl_len); + unused = (OffsetNumber *) ptr; + unend = (OffsetNumber *) ((char *) ptr + len); if ((unend - unused) > 0) PageIndexMultiDelete(page, unused, unend - unused); @@ -542,13 +496,16 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record) * XXX optimise later with something like XLogPrefetchBuffer() */ static TransactionId -btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec) +btree_xlog_delete_get_latestRemovedXid(XLogReaderState *record) { + xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record); OffsetNumber *unused; Buffer ibuffer, hbuffer; Page ipage, hpage; + RelFileNode rnode; + BlockNumber blkno; ItemId iitemid, hitemid; IndexTuple itup; @@ -588,9 +545,11 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec) * InvalidTransactionId to cancel all HS transactions. That's probably * overkill, but it's safe, and certainly better than panicking here. 
*/ - ibuffer = XLogReadBuffer(xlrec->node, xlrec->block, false); + XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); + ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL); if (!BufferIsValid(ibuffer)) return InvalidTransactionId; + LockBuffer(ibuffer, BT_READ); ipage = (Page) BufferGetPage(ibuffer); /* @@ -611,12 +570,13 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec) * Locate the heap page that the index tuple points at */ hblkno = ItemPointerGetBlockNumber(&(itup->t_tid)); - hbuffer = XLogReadBuffer(xlrec->hnode, hblkno, false); + hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM, hblkno, RBM_NORMAL); if (!BufferIsValid(hbuffer)) { UnlockReleaseBuffer(ibuffer); return InvalidTransactionId; } + LockBuffer(hbuffer, BUFFER_LOCK_SHARE); hpage = (Page) BufferGetPage(hbuffer); /* @@ -678,8 +638,9 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec) } static void -btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record) +btree_xlog_delete(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record); Buffer buffer; Page page; @@ -698,21 +659,23 @@ btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record) */ if (InHotStandby) { - TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(xlrec); + TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(record); + RelFileNode rnode; - ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node); + XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); + + ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode); } /* * We don't need to take a cleanup lock to apply these changes. See * nbtree/README for details. */ - if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->block, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = (Page) BufferGetPage(buffer); - if (record->xl_len > SizeOfBtreeDelete) + if (XLogRecGetDataLen(record) > SizeOfBtreeDelete) { OffsetNumber *unused; @@ -736,17 +699,15 @@ btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record) } static void -btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record) +btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record); - BlockNumber parent; Buffer buffer; Page page; BTPageOpaque pageop; IndexTupleData trunctuple; - parent = ItemPointerGetBlockNumber(&(xlrec->target.tid)); - /* * In normal operation, we would lock all the pages this WAL record * touches before changing any of them. 
In WAL replay, it should be okay @@ -756,8 +717,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record) */ /* parent page */ - if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node, parent, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) { OffsetNumber poffset; ItemId itemid; @@ -768,7 +728,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record) page = (Page) BufferGetPage(buffer); pageop = (BTPageOpaque) PageGetSpecialPointer(page); - poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + poffset = xlrec->poffset; nextoffset = OffsetNumberNext(poffset); itemid = PageGetItemId(page, nextoffset); @@ -788,8 +748,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); /* Rewrite the leaf page as a halfdead page */ - buffer = XLogReadBuffer(xlrec->target.node, xlrec->leafblk, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 0); page = (Page) BufferGetPage(buffer); _bt_pageinit(page, BufferGetPageSize(buffer)); @@ -822,17 +781,16 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record) static void -btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) +btree_xlog_unlink_page(uint8 info, XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record); - BlockNumber target; BlockNumber leftsib; BlockNumber rightsib; Buffer buffer; Page page; BTPageOpaque pageop; - target = xlrec->deadblk; leftsib = xlrec->leftsib; rightsib = xlrec->rightsib; @@ -845,8 +803,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) */ /* Fix left-link of right sibling */ - if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, rightsib, &buffer) - == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) { page = (Page) BufferGetPage(buffer); pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -861,8 +818,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) /* Fix right-link of left sibling, if any */ if (leftsib != P_NONE) { - if (XLogReadBufferForRedo(lsn, record, 1, xlrec->node, leftsib, &buffer) - == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) { page = (Page) BufferGetPage(buffer); pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -876,8 +832,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) } /* Rewrite target page as empty deleted page */ - buffer = XLogReadBuffer(xlrec->node, target, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 0); page = (Page) BufferGetPage(buffer); _bt_pageinit(page, BufferGetPageSize(buffer)); @@ -898,7 +853,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) * itself, update the leaf to point to the next remaining child in the * branch. 
*/ - if (target != xlrec->leafblk) + if (XLogRecHasBlockRef(record, 3)) { /* * There is no real data on the page, so we just re-create it from @@ -906,8 +861,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) */ IndexTupleData trunctuple; - buffer = XLogReadBuffer(xlrec->node, xlrec->leafblk, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 3); page = (Page) BufferGetPage(buffer); pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -936,27 +890,21 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) /* Update metapage if needed */ if (info == XLOG_BTREE_UNLINK_PAGE_META) - { - xl_btree_metadata md; - - memcpy(&md, (char *) xlrec + SizeOfBtreeUnlinkPage, - sizeof(xl_btree_metadata)); - _bt_restore_meta(xlrec->node, lsn, - md.root, md.level, - md.fastroot, md.fastlevel); - } + _bt_restore_meta(record, 4); } static void -btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) +btree_xlog_newroot(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record); Buffer buffer; Page page; BTPageOpaque pageop; + char *ptr; + Size len; - buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 0); page = (Page) BufferGetPage(buffer); _bt_pageinit(page, BufferGetPageSize(buffer)); @@ -969,34 +917,24 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) pageop->btpo_flags |= BTP_LEAF; pageop->btpo_cycleid = 0; - if (record->xl_len > SizeOfBtreeNewroot) + if (xlrec->level > 0) { - IndexTuple itup; - BlockNumber cblkno; - - _bt_restore_page(page, - (char *) xlrec + SizeOfBtreeNewroot, - record->xl_len - SizeOfBtreeNewroot); - /* extract block number of the left-hand split page */ - itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_HIKEY)); - cblkno = ItemPointerGetBlockNumber(&(itup->t_tid)); - Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); + ptr = XLogRecGetBlockData(record, 0, &len); + _bt_restore_page(page, ptr, len); /* Clear the incomplete-split flag in left child */ - _bt_clear_incomplete_split(lsn, record, 0, xlrec->node, cblkno); + _bt_clear_incomplete_split(record, 1); } PageSetLSN(page, lsn); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); - _bt_restore_meta(xlrec->node, lsn, - xlrec->rootblk, xlrec->level, - xlrec->rootblk, xlrec->level); + _bt_restore_meta(record, 2); } static void -btree_xlog_reuse_page(XLogRecPtr lsn, XLogRecord *record) +btree_xlog_reuse_page(XLogReaderState *record) { xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record); @@ -1015,58 +953,55 @@ btree_xlog_reuse_page(XLogRecPtr lsn, XLogRecord *record) ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node); } - - /* Backup blocks are not used in reuse_page records */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); } void -btree_redo(XLogRecPtr lsn, XLogRecord *record) +btree_redo(XLogReaderState *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; switch (info) { case XLOG_BTREE_INSERT_LEAF: - btree_xlog_insert(true, false, lsn, record); + btree_xlog_insert(true, false, record); break; case XLOG_BTREE_INSERT_UPPER: - btree_xlog_insert(false, false, lsn, record); + btree_xlog_insert(false, false, record); break; case XLOG_BTREE_INSERT_META: - btree_xlog_insert(false, true, lsn, record); + btree_xlog_insert(false, true, record); break; case 
XLOG_BTREE_SPLIT_L: - btree_xlog_split(true, false, lsn, record); + btree_xlog_split(true, false, record); break; case XLOG_BTREE_SPLIT_R: - btree_xlog_split(false, false, lsn, record); + btree_xlog_split(false, false, record); break; case XLOG_BTREE_SPLIT_L_ROOT: - btree_xlog_split(true, true, lsn, record); + btree_xlog_split(true, true, record); break; case XLOG_BTREE_SPLIT_R_ROOT: - btree_xlog_split(false, true, lsn, record); + btree_xlog_split(false, true, record); break; case XLOG_BTREE_VACUUM: - btree_xlog_vacuum(lsn, record); + btree_xlog_vacuum(record); break; case XLOG_BTREE_DELETE: - btree_xlog_delete(lsn, record); + btree_xlog_delete(record); break; case XLOG_BTREE_MARK_PAGE_HALFDEAD: - btree_xlog_mark_page_halfdead(info, lsn, record); + btree_xlog_mark_page_halfdead(info, record); break; case XLOG_BTREE_UNLINK_PAGE: case XLOG_BTREE_UNLINK_PAGE_META: - btree_xlog_unlink_page(info, lsn, record); + btree_xlog_unlink_page(info, record); break; case XLOG_BTREE_NEWROOT: - btree_xlog_newroot(lsn, record); + btree_xlog_newroot(record); break; case XLOG_BTREE_REUSE_PAGE: - btree_xlog_reuse_page(lsn, record); + btree_xlog_reuse_page(record); break; default: elog(PANIC, "btree_redo: unknown op code %u", info); |
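
The nbtinsert.c and nbtpage.c hunks above all follow the same write-side sequence: start the record with XLogBeginInsert(), register each touched page under a small fixed block ID with XLogRegisterBuffer(), attach per-record data with XLogRegisterData() and per-block data with XLogRegisterBufData(), then call XLogInsert() with just the resource manager and info code. Below is a minimal sketch of that sequence, assuming the usual 9.5-era backend headers; the function name, payload arguments, and XLOG_EXAMPLE_INFO code are hypothetical placeholders, not symbols from this diff.

#include "postgres.h"

#include "access/rmgr.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "utils/rel.h"

/* hypothetical info code, for illustration only */
#define XLOG_EXAMPLE_INFO	0x00

static void
log_one_page_change(Relation rel, Buffer buf,
					void *rec, Size reclen,		/* fixed part of the record */
					void *items, Size itemslen) /* data tied to the page */
{
	Page		page = BufferGetPage(buf);

	/* no ereport(ERROR) between modifying the page and logging it */
	START_CRIT_SECTION();

	/* ... apply the change to "page" here ... */
	MarkBufferDirty(buf);

	if (RelationNeedsWAL(rel))
	{
		XLogRecPtr	recptr;

		XLogBeginInsert();

		/* record payload that is not associated with any block */
		XLogRegisterData((char *) rec, reclen);

		/*
		 * Block 0 is the modified page; REGBUF_STANDARD says it has a
		 * standard page header, so a full-page image can omit the hole.
		 */
		XLogRegisterBuffer(0, buf, REGBUF_STANDARD);

		/*
		 * Data attached to block 0: not stored if XLogInsert() decides to
		 * include a full-page image of that block instead.
		 */
		XLogRegisterBufData(0, (char *) items, itemslen);

		recptr = XLogInsert(RM_BTREE_ID, XLOG_EXAMPLE_INFO);
		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();
}

Compared with the removed XLogRecData chains, the caller no longer fills in rel->rd_node or block numbers: those travel with the registered buffers.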
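
On the replay side, the nbtxlog.c hunks drop the explicit RelFileNode/BlockNumber arguments and let XLogReadBufferForRedo() look the page up by block ID, telling the caller whether the change still needs to be applied or the page was already restored from a full-page image. A sketch of that pattern, with a hypothetical function name:

#include "postgres.h"

#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"

static void
example_redo_one_page(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		buffer;

	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
	{
		Page		page = (Page) BufferGetPage(buffer);
		Size		len;
		char	   *ptr = XLogRecGetBlockData(record, 0, &len);

		/* ... re-apply the change described by ptr/len to "page" ... */
		(void) ptr;
		(void) len;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}

Pages that the write side registered with REGBUF_WILL_INIT are instead re-created from scratch with XLogInitBufferForRedo(), as _bt_restore_meta() and btree_xlog_newroot() do above.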
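
The reason xl_btree_split, xl_btree_delete and friends could shed their node and block fields is that each registered block reference carries the RelFileNode and block number for its block ID, and redo can read them back. A sketch of that lookup, mirroring what btree_xlog_split() does above for the original left page, the new right page, and the optional old right sibling (block 2 is only registered when the split page was not rightmost):

#include "postgres.h"

#include "access/nbtree.h"
#include "access/xlogreader.h"
#include "storage/block.h"
#include "storage/relfilenode.h"

static void
example_get_split_block_tags(XLogReaderState *record)
{
	RelFileNode rnode;
	BlockNumber leftsib;
	BlockNumber rightsib;
	BlockNumber rnext;

	/* block 0 = original (left) page, block 1 = new right page */
	XLogRecGetBlockTag(record, 0, &rnode, NULL, &leftsib);
	XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib);

	/* block 2 = old right sibling; absent when the split page was rightmost */
	if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext))
		rnext = P_NONE;

	(void) rnode;
	(void) leftsib;
	(void) rightsib;
	(void) rnext;
}

btree_xlog_delete() above uses the same call to fetch the RelFileNode it passes to ResolveRecoveryConflictWithSnapshot().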