diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtree.c')
-rw-r--r-- | src/backend/access/nbtree/nbtree.c | 582 |
1 files changed, 581 insertions, 1 deletions
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 7fec982fa2d..1064c2bb107 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -12,7 +12,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.63 2000/08/10 02:33:20 inoue Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.64 2000/10/13 02:03:00 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -730,3 +730,583 @@ _bt_restscan(IndexScanDesc scan) so->btso_curbuf = buf; } } + +#ifdef XLOG +void btree_redo(XLogRecPtr lsn, XLogRecord *record) +{ + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_BTREE_DELETE) + btree_xlog_delete(true, lsn, record); + else if (info == XLOG_BTREE_INSERT) + btree_xlog_insert(true, lsn, record); + else if (info == XLOG_BTREE_SPLIT) + btree_xlog_split(true, false, lsn, record); /* new item on the right */ + else if (info == XLOG_BTREE_SPLEFT) + btree_xlog_split(true, true, lsn, record); /* new item on the left */ + else if (info == XLOG_BTREE_NEWROOT) + btree_xlog_newroot(true, lsn, record); + else + elog(STOP, "btree_redo: unknown op code %u", info); +} + +void btree_undo(XLogRecPtr lsn, XLogRecord *record) +{ + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + if (info == XLOG_BTREE_DELETE) + btree_xlog_delete(false, lsn, record); + else if (info == XLOG_BTREE_INSERT) + btree_xlog_insert(false, lsn, record); + else if (info == XLOG_BTREE_SPLIT) + btree_xlog_split(false, false, lsn, record);/* new item on the right */ + else if (info == XLOG_BTREE_SPLEFT) + btree_xlog_split(false, true, lsn, record); /* new item on the left */ + else if (info == XLOG_BTREE_NEWROOT) + btree_xlog_newroot(false, lsn, record); + else + elog(STOP, "btree_undo: unknown op code %u", info); +} + +static void btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) +{ + xl_btree_delete *xlrec; + Relation *reln; + Buffer buffer; + Page page; + + if (!redo) + return; + + xlrec = (xl_btree_delete*) XLogRecGetData(record); + reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer(false, reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid))); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_delete_redo: block unfound"); + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_delete_redo: uninitialized page"); + + PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid))); + + return; +} + +static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) +{ + xl_btree_insert *xlrec; + Relation *reln; + Buffer buffer; + Page page; + BTPageOpaque pageop; + + xlrec = (xl_btree_insert*) XLogRecGetData(record); + reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer((redo) ? true : false, reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid))); + if (!BufferIsValid(buffer)) + return; + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_insert_%s: uninitialized page", + (redo) ? "redo" : "undo"); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) + UnlockAndReleaseBuffer(buffer); + else + { + Size hsize = SizeOfBtreeInsert; + RelFileNode hnode; + + if (P_ISLEAF(pageop)) + { + hsize += (sizeof(CommandId) + sizeof(RelFileNode)); + memcpy(&hnode, (char*)xlrec + SizeOfBtreeInsert + + sizeof(CommandId), sizeof(RelFileNode)); + } + + if (! _bt_add_item(page, + ItemPointerGetOffsetNumber(&(xlrec->target.tid)), + (char*)xlrec + hsize, + record->xl_len - hsize, + &hnode)) + elog(STOP, "btree_insert_redo: failed to add item"); + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + else + { + BTItemData btdata; + + if (XLByteLT(PageGetLSN(page), lsn)) + elog(STOP, "btree_insert_undo: bad page LSN"); + + if (! P_ISLEAF(pageop)) + { + UnlockAndReleaseBuffer(buffer); + return; + } + + memcpy(&btdata, (char*)xlrec + SizeOfBtreeInsert + + sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData)); + + _bt_del_item(reln, buffer, &btdata, true, lsn, record); + + } + + return; +} + +static void +btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) +{ + xl_btree_split *xlrec; + Relation *reln; + BlockNumber blkno; + BlockNumber parent; + Buffer buffer; + Page page; + BTPageOpaque pageop; + char *op = (redo) ? "redo" : "undo"; + bool isleaf; + + xlrec = (xl_btree_split*) XLogRecGetData(record); + reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); + if (!RelationIsValid(reln)) + return; + + /* Left (original) sibling */ + blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) : + BlockIdGetBlockNumber(xlrec->otherblk); + buffer = XLogReadBuffer(false, reln, blkno); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_split_%s: lost left sibling", op); + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_split_%s: uninitialized left sibling", op); + + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + isleaf = P_ISLEAF(pageop); + parent = pageop->btpo_parent; + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) + UnlockAndReleaseBuffer(buffer); + else + { + /* Delete items related to new right sibling */ + _bt_thin_left_page(page, record); + + if (onleft) + { + BTItemData btdata; + Size hsize = SizeOfBtreeSplit; + Size itemsz; + RelFileNode hnode; + + pageop->btpo_next = BlockIdGetBlockNumber(xlrec->otherblk); + if (isleaf) + { + hsize += (sizeof(CommandId) + sizeof(RelFileNode)); + memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + + sizeof(CommandId), sizeof(RelFileNode)); + } + + memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + + if (! _bt_add_item(page, + ItemPointerGetOffsetNumber(&(xlrec->target.tid)), + (char*)xlrec + hsize, + itemsz, + &hnode)) + elog(STOP, "btree_split_redo: failed to add item"); + } + else + pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid)); + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + else /* undo */ + { + if (XLByteLT(PageGetLSN(page), lsn)) + elog(STOP, "btree_split_undo: bad left sibling LSN"); + + if (! isleaf || ! onleft) + UnlockAndReleaseBuffer(buffer); + else + { + BTItemData btdata; + + memcpy(&btdata, (char*)xlrec + SizeOfBtreeSplit + + sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData)); + + _bt_del_item(reln, buffer, &btdata, false, lsn, record); + } + } + + /* Right (new) sibling */ + blkno = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) : + ItemPointerGetBlockNumber(&(xlrec->target.tid)); + buffer = XLogReadBuffer((redo) ? true : false, reln, blkno); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_split_%s: lost right sibling", op); + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + { + if (!redo) + elog(STOP, "btree_split_undo: uninitialized right sibling"); + PageInit(page, BufferGetPageSize(buffer), 0); + } + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) + UnlockAndReleaseBuffer(buffer); + else + { + Size hsize = SizeOfBtreeSplit; + BTItemData btdata; + Size itemsz; + + _bt_pageinit(page, BufferGetPageSize(buffer)); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + if (isleaf) + { + pageop->btpo_flags |= BTP_LEAF; + hsize += (sizeof(CommandId) + sizeof(RelFileNode)); + } + if (onleft) /* skip target item */ + { + memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + hsize += itemsz; + } + + for (char* item = (char*)xlrec + hsize; + item < (char*)record + record->xl_len; ) + { + memcpy(&btdata, item, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); + if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber, + LP_USED) == InvalidOffsetNumber) + elog(STOP, "btree_split_redo: can't add item to right sibling"); + item += itemsz; + } + + pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) : + BlockIdGetBlockNumber(xlrec->otherblk); + pageop->btpo_next = BlockIdGetBlockNumber(xlrec->rightblk); + pageop->btpo_parent = parent; + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + else /* undo */ + { + if (XLByteLT(PageGetLSN(page), lsn)) + elog(STOP, "btree_split_undo: bad right sibling LSN"); + + if (! isleaf || onleft) + UnlockAndReleaseBuffer(buffer); + else + { + char tbuf[BLCKSZ]; + int cnt; + char *item; + Size itemsz; + + item = (char*)xlrec + SizeOfBtreeSplit + + sizeof(CommandId) + sizeof(RelFileNode); + for (cnt = 0; item < (char*)record + record->xl_len; ) + { + BTItem btitem = (BTItem) + (tbuf + cnt * (MAXALIGN(sizeof(BTItemData)))); + memcpy(btitem, item, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btitem->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); + item += itemsz; + cnt++; + } + cnt -= ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + if (cnt < 0) + elog(STOP, "btree_split_undo: target item unfound in right sibling"); + + item = tbuf + cnt * (MAXALIGN(sizeof(BTItemData))); + + _bt_del_item(reln, buffer, (BTItem)item, false, lsn, record); + } + } + + /* Right (next) page */ + blkno = BlockIdGetBlockNumber(xlrec->rightblk); + buffer = XLogReadBuffer(false, reln, blkno); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_split_%s: lost next right page", op); + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_split_%s: uninitialized next right page", op); + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) + UnlockAndReleaseBuffer(buffer); + else + { + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_prev = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) : + ItemPointerGetBlockNumber(&(xlrec->target.tid)); + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + else /* undo */ + { + if (XLByteLT(PageGetLSN(page), lsn)) + elog(STOP, "btree_split_undo: bad next right page LSN"); + + UnlockAndReleaseBuffer(buffer); + } + +} + +static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) +{ + xl_btree_newroot *xlrec; + Relation *reln; + Buffer buffer; + Page page; + Buffer metabuf; + Page metapg; + + if (!redo) + return; + + xlrec = (xl_btree_newroot*) XLogRecGetData(record); + reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk))); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_newroot_redo: no root page"); + metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_newroot_redo: no metapage"); + page = (Page) BufferGetPage(buffer); + + if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn)) + { + _bt_pageinit(page, BufferGetPageSize(buffer)); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + + pageop->btpo_flags |= BTP_ROOT; + pageop->btpo_prev = pageop->btpo_next = P_NONE; + pageop->btpo_parent = BTREE_METAPAGE; + + if (record->xl_len == SizeOfBtreeNewroot) /* no childs */ + pageop->btpo_flags |= BTP_LEAF; + else + { + BTItemData btdata; + Size itemsz; + + for (char* item = (char*)xlrec + SizeOfBtreeNewroot; + item < (char*)record + record->xl_len; ) + { + memcpy(&btdata, item, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); + if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber, + LP_USED) == InvalidOffsetNumber) + elog(STOP, "btree_newroot_redo: can't add item"); + item += itemsz; + } + } + + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + else + UnlockAndReleaseBuffer(buffer); + + metapg = BufferGetPage(metabuf); + if (PageIsNew((PageHeader) metapg)) + { + BTMetaPageData md; + + _bt_pageinit(metapg, BufferGetPageSize(metabuf)); + md.btm_magic = BTREE_MAGIC; + md.btm_version = BTREE_VERSION; + md.btm_root = P_NONE; + md.btm_level = 0; + memcpy((char *) BTPageGetMeta(pg), (char *) &md, sizeof(md)); + } + + if (XLByteLT(PageGetLSN(metapg), lsn)) + { + BTMetaPageData *metad = BTPageGetMeta(metapg); + + metad->btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk)); + (metad->btm_level)++; + PageSetLSN(metapg, lsn); + PageSetSUI(metapg, ThisStartUpID); + UnlockAndWriteBuffer(metabuf); + } + else + UnlockAndReleaseBuffer(metabuf); + + return; +} + +/* + * UNDO insertion on *leaf* page: + * - find inserted tuple; + * - delete it if heap tuple was inserted by the same xaction + */ +static void +_bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, + XLogRecPtr lsn, XLogRecord *record) +{ + char *xlrec = (char*) XLogRecGetData(record); + Page page = (Page) BufferGetPage(buffer); + BTPageOpaque pageop; + BlockNumber blkno; + OffsetNumber offno; + ItemId lp; + + for ( ; ; ) + { + offno = _bt_find_btitem(page, btitem); + if (offno != InvalidOffsetNumber) + break; + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + if (P_RIGHTMOST(pageop)) + break; + blkno = pageop->btpo_next; + UnlockAndReleaseBuffer(buffer); + buffer = XLogReadBuffer(false, reln, blkno); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_%s_undo: lost right sibling", + (insert) ? "insert" : "split"); + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_%s_undo: uninitialized right sibling", + (insert) ? "insert" : "split"); + if (XLByteLT(PageGetLSN(page), lsn)) + break; + } + + if (offno == InvalidOffsetNumber) /* not found */ + { + if (!InRecovery) + elog(STOP, "btree_%s_undo: lost target tuple in rollback", + (insert) ? "insert" : "split"); + UnlockAndReleaseBuffer(buffer); + return; + } + + lp = PageGetItemId(page, offno); + if (ItemIdDeleted(lp)) /* marked for deletion */ + { + if (!InRecovery) + elog(STOP, "btree_%s_undo: deleted target tuple in rollback", + (insert) ? "insert" : "split"); + } + else if (InRecovery) /* check heap tuple */ + { + int result; + CommandId cid; + RelFileNode hnode; + Size hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit; + + memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId)); + memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode)); + result = XLogCheckHeapTuple(hnode, &(btitem->bti_itup.t_tid), + record->xl_xid, cid); + if (result <= 0) /* no tuple or not owner */ + { + UnlockAndReleaseBuffer(buffer); + return; + } + } + else if (! BufferIsUpdatable(buffer)) /* normal rollback */ + { + lp->lp_flags |= LP_DELETE; + MarkBufferForCleanup(buffer, IndexPageCleanup); + return; + } + + PageIndexTupleDelete(page, offno); + if (InRecovery) + { + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_flags |= BTP_REORDER; + } + UnlockAndWriteBuffer(buffer); + + return; +} + +static bool +_bt_add_item(Page page, OffsetNumber offno, + char* item, Size size, RelFileNode* hnode) +{ + BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); + + if (offno > PageGetMaxOffsetNumber(page) + 1) + { + if (! (pageop->btpo_flags & BTP_REORDER)) + { + elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected"); + pageop->btpo_flags |= BTP_REORDER; + } + offno = PageGetMaxOffsetNumber(page) + 1; + } + + if (PageAddItem(page, (Item) item, size, offno, + LP_USED) == InvalidOffsetNumber) + { + /* ops, not enough space - try to deleted dead tuples */ + bool result; + + if (! P_ISLEAF(pageop)) + return(false); + result = _bt_cleanup_page(page, hnode); + if (!result || PageAddItem(page, (Item) item, size, offno, + LP_USED) == InvalidOffsetNumber) + return(false); + } + + return(true); +} + +#endif |