diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtree.c')
-rw-r--r-- | src/backend/access/nbtree/nbtree.c | 615 |
1 files changed, 341 insertions, 274 deletions
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 60ea3162d6c..7c715496735 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -12,7 +12,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.66 2000/10/20 11:01:03 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.67 2000/10/21 15:43:18 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -32,6 +32,14 @@ bool BuildingBtree = false; /* see comment in btbuild() */ bool FastBuild = true; /* use sort/build instead of insertion * build */ +#ifdef XLOG +#include "access/xlogutils.h" + +void btree_redo(XLogRecPtr lsn, XLogRecord *record); +void btree_undo(XLogRecPtr lsn, XLogRecord *record); +void btree_desc(char *buf, uint8 xl_info, char* rec); +#endif + static void _bt_restscan(IndexScanDesc scan); /* @@ -732,46 +740,264 @@ _bt_restscan(IndexScanDesc scan) } #ifdef XLOG -void btree_redo(XLogRecPtr lsn, XLogRecord *record) + +static bool +_bt_cleanup_page(Page page, RelFileNode hnode) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); + OffsetNumber offno; + ItemId lp; + BTItem item; + bool result = false; - if (info == XLOG_BTREE_DELETE) - btree_xlog_delete(true, lsn, record); - else if (info == XLOG_BTREE_INSERT) - btree_xlog_insert(true, lsn, record); - else if (info == XLOG_BTREE_SPLIT) - btree_xlog_split(true, false, lsn, record); /* new item on the right */ - else if (info == XLOG_BTREE_SPLEFT) - btree_xlog_split(true, true, lsn, record); /* new item on the left */ - else if (info == XLOG_BTREE_NEWROOT) - btree_xlog_newroot(true, lsn, record); - else - elog(STOP, "btree_redo: unknown op code %u", info); + for (offno = P_FIRSTDATAKEY(pageop); offno <= maxoff; ) + { + lp = PageGetItemId(page, offno); + item = (BTItem) PageGetItem(page, lp); + if (XLogIsValidTuple(hnode, &(item->bti_itup.t_tid))) + offno = OffsetNumberNext(offno); + else + { + PageIndexTupleDelete(page, offno); + maxoff = PageGetMaxOffsetNumber(page); + result = true; + } + } + + return(result); } -void btree_undo(XLogRecPtr lsn, XLogRecord *record) +static bool +_bt_add_item(Page page, OffsetNumber offno, + char* item, Size size, RelFileNode hnode) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); - if (info == XLOG_BTREE_DELETE) - btree_xlog_delete(false, lsn, record); - else if (info == XLOG_BTREE_INSERT) - btree_xlog_insert(false, lsn, record); - else if (info == XLOG_BTREE_SPLIT) - btree_xlog_split(false, false, lsn, record);/* new item on the right */ - else if (info == XLOG_BTREE_SPLEFT) - btree_xlog_split(false, true, lsn, record); /* new item on the left */ - else if (info == XLOG_BTREE_NEWROOT) - btree_xlog_newroot(false, lsn, record); + if (offno > PageGetMaxOffsetNumber(page) + 1) + { + if (! (pageop->btpo_flags & BTP_REORDER)) + { + elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected"); + pageop->btpo_flags |= BTP_REORDER; + } + offno = PageGetMaxOffsetNumber(page) + 1; + } + + if (PageAddItem(page, (Item) item, size, offno, + LP_USED) == InvalidOffsetNumber) + { + /* ops, not enough space - try to deleted dead tuples */ + bool result; + + if (! P_ISLEAF(pageop)) + return(false); + result = _bt_cleanup_page(page, hnode); + if (!result || PageAddItem(page, (Item) item, size, offno, + LP_USED) == InvalidOffsetNumber) + return(false); + } + + return(true); +} + +/* + * Remove from left sibling items belonging to right sibling + * and change P_HIKEY + */ +static void +_bt_fix_left_page(Page page, XLogRecord *record, bool onleft) +{ + char *xlrec = (char*) XLogRecGetData(record); + BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); + Size hsize = SizeOfBtreeSplit; + RelFileNode hnode; + BTItemData btdata; + OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + OffsetNumber offno; + char *item; + Size itemsz; + char *previtem = NULL; + char *lhikey = NULL; + Size lhisize = 0; + + if (pageop->btpo_flags & BTP_LEAF) + { + hsize += (sizeof(CommandId) + sizeof(RelFileNode)); + memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + + sizeof(CommandId), sizeof(RelFileNode)); + } else - elog(STOP, "btree_undo: unknown op code %u", info); + { + lhikey = (char*)xlrec + hsize; + memcpy(&btdata, lhikey, sizeof(BTItemData)); + lhisize = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + hsize += lhisize; + } + + if (! P_RIGHTMOST(pageop)) + PageIndexTupleDelete(page, P_HIKEY); + + if (onleft) /* skip target item */ + { + memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + hsize += itemsz; + } + + for (item = (char*)xlrec + hsize; ; ) + { + memcpy(&btdata, item, sizeof(BTItemData)); + for (offno = P_FIRSTDATAKEY(pageop); + offno <= maxoff; + offno = OffsetNumberNext(offno)) + { + ItemId lp = PageGetItemId(page, offno); + BTItem btitem = (BTItem) PageGetItem(page, lp); + + if (BTItemSame(&btdata, btitem)) + { + PageIndexTupleDelete(page, offno); + break; + } + } + + itemsz = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); + + if (item + itemsz < (char*)record + record->xl_len) + { + previtem = item; + item += itemsz; + } + else + break; + } + + /* time to insert hi-key */ + if (pageop->btpo_flags & BTP_LEAF) + { + lhikey = (P_RIGHTMOST(pageop)) ? item : previtem; + memcpy(&btdata, lhikey, sizeof(BTItemData)); + lhisize = IndexTupleDSize(btdata.bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + } + + if (! _bt_add_item(page, + P_HIKEY, + lhikey, + lhisize, + hnode)) + elog(STOP, "btree_split_redo: failed to add hi key to left sibling"); + + return; +} + +/* + * UNDO insertion on *leaf* page: + * - find inserted tuple; + * - delete it if heap tuple was inserted by the same xaction + */ +static void +_bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, + XLogRecPtr lsn, XLogRecord *record) +{ + char *xlrec = (char*) XLogRecGetData(record); + Page page = (Page) BufferGetPage(buffer); + BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); + BlockNumber blkno; + OffsetNumber offno; + ItemId lp; + BTItem item; + + for ( ; ; ) + { + OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + + for (offno = P_FIRSTDATAKEY(pageop); + offno <= maxoff; + offno = OffsetNumberNext(offno)) + { + lp = PageGetItemId(page, offno); + item = (BTItem) PageGetItem(page, lp); + if (BTItemSame(item, btitem)) + break; + } + if (offno <= maxoff) + break; + offno = InvalidOffsetNumber; + if (P_RIGHTMOST(pageop)) + break; + blkno = pageop->btpo_next; + UnlockAndReleaseBuffer(buffer); + buffer = XLogReadBuffer(false, reln, blkno); + if (!BufferIsValid(buffer)) + elog(STOP, "btree_%s_undo: lost right sibling", + (insert) ? "insert" : "split"); + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_%s_undo: uninitialized right sibling", + (insert) ? "insert" : "split"); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + if (XLByteLT(PageGetLSN(page), lsn)) + break; + } + + if (offno == InvalidOffsetNumber) /* not found */ + { + if (!InRecovery) + elog(STOP, "btree_%s_undo: lost target tuple in rollback", + (insert) ? "insert" : "split"); + UnlockAndReleaseBuffer(buffer); + return; + } + + lp = PageGetItemId(page, offno); + + if (InRecovery) /* check heap tuple */ + { + if (!ItemIdDeleted(lp)) + { + int result; + CommandId cid; + RelFileNode hnode; + Size hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit; + + memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId)); + memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode)); + result = XLogIsOwnerOfTuple(hnode, &(btitem->bti_itup.t_tid), + record->xl_xid, cid); + if (result < 0) /* not owner */ + { + UnlockAndReleaseBuffer(buffer); + return; + } + } + PageIndexTupleDelete(page, offno); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_flags |= BTP_REORDER; + UnlockAndWriteBuffer(buffer); + return; + } + + /* normal rollback */ + if (ItemIdDeleted(lp)) /* marked for deletion ?! */ + elog(STOP, "btree_%s_undo: deleted target tuple in rollback", + (insert) ? "insert" : "split"); + + lp->lp_flags |= LP_DELETE; + MarkBufferForCleanup(buffer, IndexPageCleanup); + return; } -static void btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) +static void +btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) { xl_btree_delete *xlrec; - Relation *reln; + Relation reln; Buffer buffer; Page page; @@ -795,10 +1021,11 @@ static void btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) return; } -static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) +static void +btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) { xl_btree_insert *xlrec; - Relation *reln; + Relation reln; Buffer buffer; Page page; BTPageOpaque pageop; @@ -872,7 +1099,7 @@ static void btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) { xl_btree_split *xlrec; - Relation *reln; + Relation reln; BlockNumber blkno; BlockNumber parent; Buffer buffer; @@ -888,7 +1115,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) /* Left (original) sibling */ blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) : - BlockIdGetBlockNumber(xlrec->otherblk); + BlockIdGetBlockNumber(&(xlrec->otherblk)); buffer = XLogReadBuffer(false, reln, blkno); if (!BufferIsValid(buffer)) elog(STOP, "btree_split_%s: lost left sibling", op); @@ -917,7 +1144,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) Size itemsz; RelFileNode hnode; - pageop->btpo_next = BlockIdGetBlockNumber(xlrec->otherblk); + pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->otherblk)); if (isleaf) { hsize += (sizeof(CommandId) + sizeof(RelFileNode)); @@ -970,7 +1197,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) } /* Right (new) sibling */ - blkno = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) : + blkno = (onleft) ? BlockIdGetBlockNumber(&(xlrec->otherblk)) : ItemPointerGetBlockNumber(&(xlrec->target.tid)); buffer = XLogReadBuffer((redo) ? true : false, reln, blkno); if (!BufferIsValid(buffer)) @@ -993,6 +1220,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) Size hsize = SizeOfBtreeSplit; BTItemData btdata; Size itemsz; + char *item; _bt_pageinit(page, BufferGetPageSize(buffer)); pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -1016,7 +1244,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) hsize += itemsz; } - for (char* item = (char*)xlrec + hsize; + for (item = (char*)xlrec + hsize; item < (char*)record + record->xl_len; ) { memcpy(&btdata, item, sizeof(BTItemData)); @@ -1030,8 +1258,8 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) } pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) : - BlockIdGetBlockNumber(xlrec->otherblk); - pageop->btpo_next = BlockIdGetBlockNumber(xlrec->rightblk); + BlockIdGetBlockNumber(&(xlrec->otherblk)); + pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->rightblk)); pageop->btpo_parent = parent; PageSetLSN(page, lsn); @@ -1077,7 +1305,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) } /* Right (next) page */ - blkno = BlockIdGetBlockNumber(xlrec->rightblk); + blkno = BlockIdGetBlockNumber(&(xlrec->rightblk)); buffer = XLogReadBuffer(false, reln, blkno); if (!BufferIsValid(buffer)) elog(STOP, "btree_split_%s: lost next right page", op); @@ -1093,7 +1321,8 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) else { pageop = (BTPageOpaque) PageGetSpecialPointer(page); - pageop->btpo_prev = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) : + pageop->btpo_prev = (onleft) ? + BlockIdGetBlockNumber(&(xlrec->otherblk)) : ItemPointerGetBlockNumber(&(xlrec->target.tid)); PageSetLSN(page, lsn); @@ -1111,10 +1340,11 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) } -static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) +static void +btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) { xl_btree_newroot *xlrec; - Relation *reln; + Relation reln; Buffer buffer; Page page; Buffer metabuf; @@ -1137,6 +1367,8 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn)) { + BTPageOpaque pageop; + _bt_pageinit(page, BufferGetPageSize(buffer)); pageop = (BTPageOpaque) PageGetSpecialPointer(page); @@ -1150,8 +1382,9 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) { BTItemData btdata; Size itemsz; + char *item; - for (char* item = (char*)xlrec + SizeOfBtreeNewroot; + for (item = (char*)xlrec + SizeOfBtreeNewroot; item < (char*)record + record->xl_len; ) { memcpy(&btdata, item, sizeof(BTItemData)); @@ -1182,7 +1415,7 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) md.btm_version = BTREE_VERSION; md.btm_root = P_NONE; md.btm_level = 0; - memcpy((char *) BTPageGetMeta(pg), (char *) &md, sizeof(md)); + memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md)); } if (XLByteLT(PageGetLSN(metapg), lsn)) @@ -1201,255 +1434,89 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) return; } -/* - * UNDO insertion on *leaf* page: - * - find inserted tuple; - * - delete it if heap tuple was inserted by the same xaction - */ -static void -_bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, - XLogRecPtr lsn, XLogRecord *record) +void +btree_redo(XLogRecPtr lsn, XLogRecord *record) { - char *xlrec = (char*) XLogRecGetData(record); - Page page = (Page) BufferGetPage(buffer); - BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); - BlockNumber blkno; - OffsetNumber offno; - ItemId lp; - BTItem item; - - for ( ; ; ) - { - OffsetNumber maxoff = PageGetMaxOffsetNumber(page); - - for (offno = P_FIRSTDATAKEY(pageop); - offno <= maxoff; - offno = OffsetNumberNext(offno)) - { - lp = PageGetItemId(page, offno); - item = (BTItem) PageGetItem(page, lp); - if (BTItemSame(item, btitem)) - break; - } - if (offno <= maxoff) - break; - offno = InvalidOffsetNumber; - if (P_RIGHTMOST(pageop)) - break; - blkno = pageop->btpo_next; - UnlockAndReleaseBuffer(buffer); - buffer = XLogReadBuffer(false, reln, blkno); - if (!BufferIsValid(buffer)) - elog(STOP, "btree_%s_undo: lost right sibling", - (insert) ? "insert" : "split"); - page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(STOP, "btree_%s_undo: uninitialized right sibling", - (insert) ? "insert" : "split"); - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - if (XLByteLT(PageGetLSN(page), lsn)) - break; - } - - if (offno == InvalidOffsetNumber) /* not found */ - { - if (!InRecovery) - elog(STOP, "btree_%s_undo: lost target tuple in rollback", - (insert) ? "insert" : "split"); - UnlockAndReleaseBuffer(buffer); - return; - } - - lp = PageGetItemId(page, offno); - - if (InRecovery) /* check heap tuple */ - { - if (!ItemIdDeleted(lp)) - { - int result; - CommandId cid; - RelFileNode hnode; - Size hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit; - - memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId)); - memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode)); - result = XLogIsOwnerOfTuple(hnode, &(btitem->bti_itup.t_tid), - record->xl_xid, cid); - if (result < 0) /* not owner */ - { - UnlockAndReleaseBuffer(buffer); - return; - } - } - PageIndexTupleDelete(page, offno); - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - pageop->btpo_flags |= BTP_REORDER; - UnlockAndWriteBuffer(buffer); - return; - } - - /* normal rollback */ - if (ItemIdDeleted(lp)) /* marked for deletion ?! */ - elog(STOP, "btree_%s_undo: deleted target tuple in rollback", - (insert) ? "insert" : "split"); + uint8 info = record->xl_info & ~XLR_INFO_MASK; - lp->lp_flags |= LP_DELETE; - MarkBufferForCleanup(buffer, IndexPageCleanup); - return; + if (info == XLOG_BTREE_DELETE) + btree_xlog_delete(true, lsn, record); + else if (info == XLOG_BTREE_INSERT) + btree_xlog_insert(true, lsn, record); + else if (info == XLOG_BTREE_SPLIT) + btree_xlog_split(true, false, lsn, record); /* new item on the right */ + else if (info == XLOG_BTREE_SPLEFT) + btree_xlog_split(true, true, lsn, record); /* new item on the left */ + else if (info == XLOG_BTREE_NEWROOT) + btree_xlog_newroot(true, lsn, record); + else + elog(STOP, "btree_redo: unknown op code %u", info); } -static bool -_bt_add_item(Page page, OffsetNumber offno, - char* item, Size size, RelFileNode hnode) +void +btree_undo(XLogRecPtr lsn, XLogRecord *record) { - BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); - - if (offno > PageGetMaxOffsetNumber(page) + 1) - { - if (! (pageop->btpo_flags & BTP_REORDER)) - { - elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected"); - pageop->btpo_flags |= BTP_REORDER; - } - offno = PageGetMaxOffsetNumber(page) + 1; - } - - if (PageAddItem(page, (Item) item, size, offno, - LP_USED) == InvalidOffsetNumber) - { - /* ops, not enough space - try to deleted dead tuples */ - bool result; - - if (! P_ISLEAF(pageop)) - return(false); - result = _bt_cleanup_page(page, hnode); - if (!result || PageAddItem(page, (Item) item, size, offno, - LP_USED) == InvalidOffsetNumber) - return(false); - } + uint8 info = record->xl_info & ~XLR_INFO_MASK; - return(true); + if (info == XLOG_BTREE_DELETE) + btree_xlog_delete(false, lsn, record); + else if (info == XLOG_BTREE_INSERT) + btree_xlog_insert(false, lsn, record); + else if (info == XLOG_BTREE_SPLIT) + btree_xlog_split(false, false, lsn, record);/* new item on the right */ + else if (info == XLOG_BTREE_SPLEFT) + btree_xlog_split(false, true, lsn, record); /* new item on the left */ + else if (info == XLOG_BTREE_NEWROOT) + btree_xlog_newroot(false, lsn, record); + else + elog(STOP, "btree_undo: unknown op code %u", info); } -static bool -_bt_cleanup_page(Page page, RelFileNode hnode) +static void +out_target(char *buf, xl_btreetid *target) { - OffsetNumber maxoff = PageGetMaxOffsetNumber(page); - OffsetNumber offno; - ItemId lp; - BTItem item; - bool result = false; - - for (offno = P_FIRSTDATAKEY(pageop); offno <= maxoff; ) - { - lp = PageGetItemId(page, offno); - item = (BTItem) PageGetItem(page, lp); - if (XLogIsValidTuple(hnode, &(item->bti_itup.t_tid)) - offno = OffsetNumberNext(offno); - else - { - PageIndexTupleDelete(page, offno); - maxoff = PageGetMaxOffsetNumber(page); - result = true; - } - } - - return(result); + sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u", + target->node.tblNode, target->node.relNode, + ItemPointerGetBlockNumber(&(target->tid)), + ItemPointerGetOffsetNumber(&(target->tid))); } - -/* - * Remove from left sibling items belonging to right sibling - * and change P_HIKEY - */ -static void -_bt_fix_left_page(Page page, XLogRecord *record, bool onleft) + +void +btree_desc(char *buf, uint8 xl_info, char* rec) { - char *xlrec = (char*) XLogRecGetData(record); - BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); - Size hsize = SizeOfBtreeSplit; - RelFileNode hnode; - BTItemData btdata; - OffsetNumber maxoff = PageGetMaxOffsetNumber(page); - OffsetNumber offno; - char *item; - Size itemsz; - char *previtem = NULL; - char *lhikey = NULL; - Size lhisize = 0; + uint8 info = xl_info & ~XLR_INFO_MASK; - if (pageop->btpo_flags & BTP_LEAF) + if (info == XLOG_BTREE_INSERT) { - hsize += (sizeof(CommandId) + sizeof(RelFileNode)); - memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + - sizeof(CommandId), sizeof(RelFileNode)); + xl_btree_insert *xlrec = (xl_btree_insert*) rec; + strcat(buf, "insert: "); + out_target(buf, &(xlrec->target)); } - else + else if (info == XLOG_BTREE_DELETE) { - lhikey = (char*)xlrec + hsize; - memcpy(&btdata, lhikey, sizeof(BTItemData)); - lhisize = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - hsize += lhisize; + xl_btree_delete *xlrec = (xl_btree_delete*) rec; + strcat(buf, "delete: "); + out_target(buf, &(xlrec->target)); } - - if (! P_RIGHTMOST(pageop)) - PageIndexTupleDelete(page, P_HIKEY); - - if (onleft) /* skip target item */ + else if (info == XLOG_BTREE_SPLIT || info == XLOG_BTREE_SPLEFT) { - memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - hsize += itemsz; + xl_btree_split *xlrec = (xl_btree_split*) rec; + sprintf(buf + strlen(buf), "split(%s): ", + (info == XLOG_BTREE_SPLIT) ? "right" : "left"); + out_target(buf, &(xlrec->target)); + sprintf(buf + strlen(buf), "; oth %u; rgh %u", + BlockIdGetBlockNumber(&xlrec->otherblk), + BlockIdGetBlockNumber(&xlrec->rightblk)); } - - for (item = (char*)xlrec + hsize; ; ) - { - memcpy(&btdata, item, sizeof(BTItemData)); - for (offno = P_FIRSTDATAKEY(pageop); - offno <= maxoff; - offno = OffsetNumberNext(offno)) - { - ItemId lp = PageGetItemId(page, offno); - BTItem btitem = (BTItem) PageGetItem(page, lp); - - if (BTItemSame(&btdata, btitem)) - { - PageIndexTupleDelete(page, offno); - break; - } - } - - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - itemsz = MAXALIGN(itemsz); - - if (item + itemsz < (char*)record + record->xl_len) - { - previtem = item; - item += itemsz; - } - else - break; - } - - /* time to insert hi-key */ - if (pageop->btpo_flags & BTP_LEAF) + else if (info == XLOG_BTREE_NEWROOT) { - lhikey = (P_RIGHTMOST(pageop)) ? item : previtem; - memcpy(&btdata, lhikey, sizeof(BTItemData)); - lhisize = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); + xl_btree_newroot *xlrec = (xl_btree_newroot*) rec; + sprintf(buf + strlen(buf), "root: node %u/%u; blk %u", + xlrec->node.tblNode, xlrec->node.relNode, + BlockIdGetBlockNumber(&xlrec->rootblk)); } - - if (! _bt_add_item(page, - P_HIKEY, - lhikey, - lhisize, - &hnode)) - elog(STOP, "btree_split_redo: failed to add hi key to left sibling"); - - return; + else + strcat(buf, "UNKNOWN"); } #endif |