diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtxlog.c')
-rw-r--r-- | src/backend/access/nbtree/nbtxlog.c | 353 |
1 files changed, 144 insertions, 209 deletions
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 13951be62af..52aef9b9836 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -72,17 +72,23 @@ _bt_restore_page(Page page, char *from, int len) } static void -_bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn, - BlockNumber root, uint32 level, - BlockNumber fastroot, uint32 fastlevel) +_bt_restore_meta(XLogReaderState *record, uint8 block_id) { + XLogRecPtr lsn = record->EndRecPtr; Buffer metabuf; Page metapg; BTMetaPageData *md; BTPageOpaque pageop; + xl_btree_metadata *xlrec; + char *ptr; + Size len; - metabuf = XLogReadBuffer(rnode, BTREE_METAPAGE, true); - Assert(BufferIsValid(metabuf)); + metabuf = XLogInitBufferForRedo(record, block_id); + ptr = XLogRecGetBlockData(record, block_id, &len); + + Assert(len == sizeof(xl_btree_metadata)); + Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE); + xlrec = (xl_btree_metadata *) ptr; metapg = BufferGetPage(metabuf); _bt_pageinit(metapg, BufferGetPageSize(metabuf)); @@ -90,10 +96,10 @@ _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn, md = BTPageGetMeta(metapg); md->btm_magic = BTREE_MAGIC; md->btm_version = BTREE_VERSION; - md->btm_root = root; - md->btm_level = level; - md->btm_fastroot = fastroot; - md->btm_fastlevel = fastlevel; + md->btm_root = xlrec->root; + md->btm_level = xlrec->level; + md->btm_fastroot = xlrec->fastroot; + md->btm_fastlevel = xlrec->fastlevel; pageop = (BTPageOpaque) PageGetSpecialPointer(metapg); pageop->btpo_flags = BTP_META; @@ -117,14 +123,12 @@ _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn, * types that can insert a downlink: insert, split, and newroot. */ static void -_bt_clear_incomplete_split(XLogRecPtr lsn, XLogRecord *record, - int block_index, - RelFileNode rnode, BlockNumber cblock) +_bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id) { + XLogRecPtr lsn = record->EndRecPtr; Buffer buf; - if (XLogReadBufferForRedo(lsn, record, block_index, rnode, cblock, &buf) - == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO) { Page page = (Page) BufferGetPage(buf); BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -140,38 +144,12 @@ _bt_clear_incomplete_split(XLogRecPtr lsn, XLogRecord *record, } static void -btree_xlog_insert(bool isleaf, bool ismeta, - XLogRecPtr lsn, XLogRecord *record) +btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record); Buffer buffer; Page page; - char *datapos; - int datalen; - xl_btree_metadata md; - BlockNumber cblkno = 0; - int main_blk_index; - - datapos = (char *) xlrec + SizeOfBtreeInsert; - datalen = record->xl_len - SizeOfBtreeInsert; - - /* - * if this insert finishes a split at lower level, extract the block - * number of the (left) child. - */ - if (!isleaf && (record->xl_info & XLR_BKP_BLOCK(0)) == 0) - { - memcpy(&cblkno, datapos, sizeof(BlockNumber)); - Assert(cblkno != 0); - datapos += sizeof(BlockNumber); - datalen -= sizeof(BlockNumber); - } - if (ismeta) - { - memcpy(&md, datapos, sizeof(xl_btree_metadata)); - datapos += sizeof(xl_btree_metadata); - datalen -= sizeof(xl_btree_metadata); - } /* * Insertion to an internal page finishes an incomplete split at the child @@ -183,21 +161,15 @@ btree_xlog_insert(bool isleaf, bool ismeta, * cannot be updates happening. */ if (!isleaf) + _bt_clear_incomplete_split(record, 1); + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { - _bt_clear_incomplete_split(lsn, record, 0, xlrec->target.node, cblkno); - main_blk_index = 1; - } - else - main_blk_index = 0; + Size datalen; + char *datapos = XLogRecGetBlockData(record, 0, &datalen); - if (XLogReadBufferForRedo(lsn, record, main_blk_index, xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - &buffer) == BLK_NEEDS_REDO) - { page = BufferGetPage(buffer); - if (PageAddItem(page, (Item) datapos, datalen, - ItemPointerGetOffsetNumber(&(xlrec->target.tid)), + if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum, false, false) == InvalidOffsetNumber) elog(PANIC, "btree_insert_redo: failed to add item"); @@ -215,15 +187,13 @@ btree_xlog_insert(bool isleaf, bool ismeta, * obsolete link from the metapage. */ if (ismeta) - _bt_restore_meta(xlrec->target.node, lsn, - md.root, md.level, - md.fastroot, md.fastlevel); + _bt_restore_meta(record, 2); } static void -btree_xlog_split(bool onleft, bool isroot, - XLogRecPtr lsn, XLogRecord *record) +btree_xlog_split(bool onleft, bool isroot, XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record); bool isleaf = (xlrec->level == 0); Buffer lbuf; @@ -231,56 +201,17 @@ btree_xlog_split(bool onleft, bool isroot, Page rpage; BTPageOpaque ropaque; char *datapos; - int datalen; - OffsetNumber newitemoff = 0; - Item newitem = NULL; - Size newitemsz = 0; + Size datalen; Item left_hikey = NULL; Size left_hikeysz = 0; - BlockNumber cblkno = InvalidBlockNumber; - - datapos = (char *) xlrec + SizeOfBtreeSplit; - datalen = record->xl_len - SizeOfBtreeSplit; - - /* Extract newitemoff and newitem, if present */ - if (onleft) - { - memcpy(&newitemoff, datapos, sizeof(OffsetNumber)); - datapos += sizeof(OffsetNumber); - datalen -= sizeof(OffsetNumber); - } - if (onleft && !(record->xl_info & XLR_BKP_BLOCK(0))) - { - /* - * We assume that 16-bit alignment is enough to apply IndexTupleSize - * (since it's fetching from a uint16 field) and also enough for - * PageAddItem to insert the tuple. - */ - newitem = (Item) datapos; - newitemsz = MAXALIGN(IndexTupleSize(newitem)); - datapos += newitemsz; - datalen -= newitemsz; - } - - /* Extract left hikey and its size (still assuming 16-bit alignment) */ - if (!isleaf && !(record->xl_info & XLR_BKP_BLOCK(0))) - { - left_hikey = (Item) datapos; - left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey)); - datapos += left_hikeysz; - datalen -= left_hikeysz; - } + BlockNumber leftsib; + BlockNumber rightsib; + BlockNumber rnext; - /* - * If this insertion finishes an incomplete split, get the block number of - * the child. - */ - if (!isleaf && !(record->xl_info & XLR_BKP_BLOCK(1))) - { - memcpy(&cblkno, datapos, sizeof(BlockNumber)); - datapos += sizeof(BlockNumber); - datalen -= sizeof(BlockNumber); - } + XLogRecGetBlockTag(record, 0, NULL, NULL, &leftsib); + XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib); + if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext)) + rnext = P_NONE; /* * Clear the incomplete split flag on the left sibling of the child page @@ -288,18 +219,18 @@ btree_xlog_split(bool onleft, bool isroot, * before locking the other pages) */ if (!isleaf) - _bt_clear_incomplete_split(lsn, record, 1, xlrec->node, cblkno); + _bt_clear_incomplete_split(record, 3); /* Reconstruct right (new) sibling page from scratch */ - rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true); - Assert(BufferIsValid(rbuf)); + rbuf = XLogInitBufferForRedo(record, 1); + datapos = XLogRecGetBlockData(record, 1, &datalen); rpage = (Page) BufferGetPage(rbuf); _bt_pageinit(rpage, BufferGetPageSize(rbuf)); ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage); - ropaque->btpo_prev = xlrec->leftsib; - ropaque->btpo_next = xlrec->rnext; + ropaque->btpo_prev = leftsib; + ropaque->btpo_next = rnext; ropaque->btpo.level = xlrec->level; ropaque->btpo_flags = isleaf ? BTP_LEAF : 0; ropaque->btpo_cycleid = 0; @@ -324,8 +255,7 @@ btree_xlog_split(bool onleft, bool isroot, /* don't release the buffer yet; we touch right page's first item below */ /* Now reconstruct left (original) sibling page */ - if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->leftsib, - &lbuf) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &lbuf) == BLK_NEEDS_REDO) { /* * To retain the same physical order of the tuples that they had, we @@ -339,9 +269,31 @@ btree_xlog_split(bool onleft, bool isroot, Page lpage = (Page) BufferGetPage(lbuf); BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage); OffsetNumber off; + Item newitem; + Size newitemsz = 0; Page newlpage; OffsetNumber leftoff; + datapos = XLogRecGetBlockData(record, 0, &datalen); + + if (onleft) + { + newitem = (Item) datapos; + newitemsz = MAXALIGN(IndexTupleSize(newitem)); + datapos += newitemsz; + datalen -= newitemsz; + } + + /* Extract left hikey and its size (assuming 16-bit alignment) */ + if (!isleaf) + { + left_hikey = (Item) datapos; + left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey)); + datapos += left_hikeysz; + datalen -= left_hikeysz; + } + Assert(datalen == 0); + newlpage = PageGetTempPageCopySpecial(lpage); /* Set high key */ @@ -358,7 +310,7 @@ btree_xlog_split(bool onleft, bool isroot, Item item; /* add the new item if it was inserted on left page */ - if (onleft && off == newitemoff) + if (onleft && off == xlrec->newitemoff) { if (PageAddItem(newlpage, newitem, newitemsz, leftoff, false, false) == InvalidOffsetNumber) @@ -376,7 +328,7 @@ btree_xlog_split(bool onleft, bool isroot, } /* cope with possibility that newitem goes at the end */ - if (onleft && off == newitemoff) + if (onleft && off == xlrec->newitemoff) { if (PageAddItem(newlpage, newitem, newitemsz, leftoff, false, false) == InvalidOffsetNumber) @@ -390,7 +342,7 @@ btree_xlog_split(bool onleft, bool isroot, lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT; if (isleaf) lopaque->btpo_flags |= BTP_LEAF; - lopaque->btpo_next = xlrec->rightsib; + lopaque->btpo_next = rightsib; lopaque->btpo_cycleid = 0; PageSetLSN(lpage, lsn); @@ -410,22 +362,16 @@ btree_xlog_split(bool onleft, bool isroot, * replay, because no other index update can be in progress, and readers * will cope properly when following an obsolete left-link. */ - if (xlrec->rnext != P_NONE) + if (rnext != P_NONE) { - /* - * the backup block containing right sibling is 1 or 2, depending - * whether this was a leaf or internal page. - */ - int rnext_index = isleaf ? 1 : 2; Buffer buffer; - if (XLogReadBufferForRedo(lsn, record, rnext_index, xlrec->node, - xlrec->rnext, &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) { Page page = (Page) BufferGetPage(buffer); BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); - pageop->btpo_prev = xlrec->rightsib; + pageop->btpo_prev = rightsib; PageSetLSN(page, lsn); MarkBufferDirty(buffer); @@ -436,8 +382,9 @@ btree_xlog_split(bool onleft, bool isroot, } static void -btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record) +btree_xlog_vacuum(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record); Buffer buffer; Page page; @@ -466,9 +413,13 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record) */ if (HotStandbyActiveInReplay()) { + RelFileNode thisrnode; + BlockNumber thisblkno; BlockNumber blkno; - for (blkno = xlrec->lastBlockVacuumed + 1; blkno < xlrec->block; blkno++) + XLogRecGetBlockTag(record, 0, &thisrnode, NULL, &thisblkno); + + for (blkno = xlrec->lastBlockVacuumed + 1; blkno < thisblkno; blkno++) { /* * We use RBM_NORMAL_NO_LOG mode because it's not an error @@ -483,7 +434,7 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record) * buffer manager we could optimise this so that if the block is * not in shared_buffers we confirm it as unpinned. */ - buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, blkno, + buffer = XLogReadBufferExtended(thisrnode, MAIN_FORKNUM, blkno, RBM_NORMAL_NO_LOG); if (BufferIsValid(buffer)) { @@ -497,20 +448,23 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record) * Like in btvacuumpage(), we need to take a cleanup lock on every leaf * page. See nbtree/README for details. */ - if (XLogReadBufferForRedoExtended(lsn, record, 0, - xlrec->node, MAIN_FORKNUM, xlrec->block, - RBM_NORMAL, true, &buffer) + if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer) == BLK_NEEDS_REDO) { + char *ptr; + Size len; + + ptr = XLogRecGetBlockData(record, 0, &len); + page = (Page) BufferGetPage(buffer); - if (record->xl_len > SizeOfBtreeVacuum) + if (len > 0) { OffsetNumber *unused; OffsetNumber *unend; - unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeVacuum); - unend = (OffsetNumber *) ((char *) xlrec + record->xl_len); + unused = (OffsetNumber *) ptr; + unend = (OffsetNumber *) ((char *) ptr + len); if ((unend - unused) > 0) PageIndexMultiDelete(page, unused, unend - unused); @@ -542,13 +496,16 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record) * XXX optimise later with something like XLogPrefetchBuffer() */ static TransactionId -btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec) +btree_xlog_delete_get_latestRemovedXid(XLogReaderState *record) { + xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record); OffsetNumber *unused; Buffer ibuffer, hbuffer; Page ipage, hpage; + RelFileNode rnode; + BlockNumber blkno; ItemId iitemid, hitemid; IndexTuple itup; @@ -588,9 +545,11 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec) * InvalidTransactionId to cancel all HS transactions. That's probably * overkill, but it's safe, and certainly better than panicking here. */ - ibuffer = XLogReadBuffer(xlrec->node, xlrec->block, false); + XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); + ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL); if (!BufferIsValid(ibuffer)) return InvalidTransactionId; + LockBuffer(ibuffer, BT_READ); ipage = (Page) BufferGetPage(ibuffer); /* @@ -611,12 +570,13 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec) * Locate the heap page that the index tuple points at */ hblkno = ItemPointerGetBlockNumber(&(itup->t_tid)); - hbuffer = XLogReadBuffer(xlrec->hnode, hblkno, false); + hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM, hblkno, RBM_NORMAL); if (!BufferIsValid(hbuffer)) { UnlockReleaseBuffer(ibuffer); return InvalidTransactionId; } + LockBuffer(hbuffer, BUFFER_LOCK_SHARE); hpage = (Page) BufferGetPage(hbuffer); /* @@ -678,8 +638,9 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec) } static void -btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record) +btree_xlog_delete(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record); Buffer buffer; Page page; @@ -698,21 +659,23 @@ btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record) */ if (InHotStandby) { - TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(xlrec); + TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(record); + RelFileNode rnode; - ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node); + XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); + + ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode); } /* * We don't need to take a cleanup lock to apply these changes. See * nbtree/README for details. */ - if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->block, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = (Page) BufferGetPage(buffer); - if (record->xl_len > SizeOfBtreeDelete) + if (XLogRecGetDataLen(record) > SizeOfBtreeDelete) { OffsetNumber *unused; @@ -736,17 +699,15 @@ btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record) } static void -btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record) +btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record); - BlockNumber parent; Buffer buffer; Page page; BTPageOpaque pageop; IndexTupleData trunctuple; - parent = ItemPointerGetBlockNumber(&(xlrec->target.tid)); - /* * In normal operation, we would lock all the pages this WAL record * touches before changing any of them. In WAL replay, it should be okay @@ -756,8 +717,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record) */ /* parent page */ - if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node, parent, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) { OffsetNumber poffset; ItemId itemid; @@ -768,7 +728,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record) page = (Page) BufferGetPage(buffer); pageop = (BTPageOpaque) PageGetSpecialPointer(page); - poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + poffset = xlrec->poffset; nextoffset = OffsetNumberNext(poffset); itemid = PageGetItemId(page, nextoffset); @@ -788,8 +748,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); /* Rewrite the leaf page as a halfdead page */ - buffer = XLogReadBuffer(xlrec->target.node, xlrec->leafblk, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 0); page = (Page) BufferGetPage(buffer); _bt_pageinit(page, BufferGetPageSize(buffer)); @@ -822,17 +781,16 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record) static void -btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) +btree_xlog_unlink_page(uint8 info, XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record); - BlockNumber target; BlockNumber leftsib; BlockNumber rightsib; Buffer buffer; Page page; BTPageOpaque pageop; - target = xlrec->deadblk; leftsib = xlrec->leftsib; rightsib = xlrec->rightsib; @@ -845,8 +803,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) */ /* Fix left-link of right sibling */ - if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, rightsib, &buffer) - == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) { page = (Page) BufferGetPage(buffer); pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -861,8 +818,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) /* Fix right-link of left sibling, if any */ if (leftsib != P_NONE) { - if (XLogReadBufferForRedo(lsn, record, 1, xlrec->node, leftsib, &buffer) - == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) { page = (Page) BufferGetPage(buffer); pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -876,8 +832,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) } /* Rewrite target page as empty deleted page */ - buffer = XLogReadBuffer(xlrec->node, target, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 0); page = (Page) BufferGetPage(buffer); _bt_pageinit(page, BufferGetPageSize(buffer)); @@ -898,7 +853,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) * itself, update the leaf to point to the next remaining child in the * branch. */ - if (target != xlrec->leafblk) + if (XLogRecHasBlockRef(record, 3)) { /* * There is no real data on the page, so we just re-create it from @@ -906,8 +861,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) */ IndexTupleData trunctuple; - buffer = XLogReadBuffer(xlrec->node, xlrec->leafblk, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 3); page = (Page) BufferGetPage(buffer); pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -936,27 +890,21 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) /* Update metapage if needed */ if (info == XLOG_BTREE_UNLINK_PAGE_META) - { - xl_btree_metadata md; - - memcpy(&md, (char *) xlrec + SizeOfBtreeUnlinkPage, - sizeof(xl_btree_metadata)); - _bt_restore_meta(xlrec->node, lsn, - md.root, md.level, - md.fastroot, md.fastlevel); - } + _bt_restore_meta(record, 4); } static void -btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) +btree_xlog_newroot(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record); Buffer buffer; Page page; BTPageOpaque pageop; + char *ptr; + Size len; - buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 0); page = (Page) BufferGetPage(buffer); _bt_pageinit(page, BufferGetPageSize(buffer)); @@ -969,34 +917,24 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) pageop->btpo_flags |= BTP_LEAF; pageop->btpo_cycleid = 0; - if (record->xl_len > SizeOfBtreeNewroot) + if (xlrec->level > 0) { - IndexTuple itup; - BlockNumber cblkno; - - _bt_restore_page(page, - (char *) xlrec + SizeOfBtreeNewroot, - record->xl_len - SizeOfBtreeNewroot); - /* extract block number of the left-hand split page */ - itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_HIKEY)); - cblkno = ItemPointerGetBlockNumber(&(itup->t_tid)); - Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); + ptr = XLogRecGetBlockData(record, 0, &len); + _bt_restore_page(page, ptr, len); /* Clear the incomplete-split flag in left child */ - _bt_clear_incomplete_split(lsn, record, 0, xlrec->node, cblkno); + _bt_clear_incomplete_split(record, 1); } PageSetLSN(page, lsn); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); - _bt_restore_meta(xlrec->node, lsn, - xlrec->rootblk, xlrec->level, - xlrec->rootblk, xlrec->level); + _bt_restore_meta(record, 2); } static void -btree_xlog_reuse_page(XLogRecPtr lsn, XLogRecord *record) +btree_xlog_reuse_page(XLogReaderState *record) { xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record); @@ -1015,58 +953,55 @@ btree_xlog_reuse_page(XLogRecPtr lsn, XLogRecord *record) ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node); } - - /* Backup blocks are not used in reuse_page records */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); } void -btree_redo(XLogRecPtr lsn, XLogRecord *record) +btree_redo(XLogReaderState *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; switch (info) { case XLOG_BTREE_INSERT_LEAF: - btree_xlog_insert(true, false, lsn, record); + btree_xlog_insert(true, false, record); break; case XLOG_BTREE_INSERT_UPPER: - btree_xlog_insert(false, false, lsn, record); + btree_xlog_insert(false, false, record); break; case XLOG_BTREE_INSERT_META: - btree_xlog_insert(false, true, lsn, record); + btree_xlog_insert(false, true, record); break; case XLOG_BTREE_SPLIT_L: - btree_xlog_split(true, false, lsn, record); + btree_xlog_split(true, false, record); break; case XLOG_BTREE_SPLIT_R: - btree_xlog_split(false, false, lsn, record); + btree_xlog_split(false, false, record); break; case XLOG_BTREE_SPLIT_L_ROOT: - btree_xlog_split(true, true, lsn, record); + btree_xlog_split(true, true, record); break; case XLOG_BTREE_SPLIT_R_ROOT: - btree_xlog_split(false, true, lsn, record); + btree_xlog_split(false, true, record); break; case XLOG_BTREE_VACUUM: - btree_xlog_vacuum(lsn, record); + btree_xlog_vacuum(record); break; case XLOG_BTREE_DELETE: - btree_xlog_delete(lsn, record); + btree_xlog_delete(record); break; case XLOG_BTREE_MARK_PAGE_HALFDEAD: - btree_xlog_mark_page_halfdead(info, lsn, record); + btree_xlog_mark_page_halfdead(info, record); break; case XLOG_BTREE_UNLINK_PAGE: case XLOG_BTREE_UNLINK_PAGE_META: - btree_xlog_unlink_page(info, lsn, record); + btree_xlog_unlink_page(info, record); break; case XLOG_BTREE_NEWROOT: - btree_xlog_newroot(lsn, record); + btree_xlog_newroot(record); break; case XLOG_BTREE_REUSE_PAGE: - btree_xlog_reuse_page(lsn, record); + btree_xlog_reuse_page(record); break; default: elog(PANIC, "btree_redo: unknown op code %u", info); |