aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/nbtree/nbtree.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/nbtree/nbtree.c')
-rw-r--r--src/backend/access/nbtree/nbtree.c615
1 files changed, 341 insertions, 274 deletions
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 60ea3162d6c..7c715496735 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -12,7 +12,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.66 2000/10/20 11:01:03 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.67 2000/10/21 15:43:18 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -32,6 +32,14 @@ bool BuildingBtree = false; /* see comment in btbuild() */
bool FastBuild = true; /* use sort/build instead of insertion
* build */
+#ifdef XLOG
+#include "access/xlogutils.h"
+
+void btree_redo(XLogRecPtr lsn, XLogRecord *record);
+void btree_undo(XLogRecPtr lsn, XLogRecord *record);
+void btree_desc(char *buf, uint8 xl_info, char* rec);
+#endif
+
static void _bt_restscan(IndexScanDesc scan);
/*
@@ -732,46 +740,264 @@ _bt_restscan(IndexScanDesc scan)
}
#ifdef XLOG
-void btree_redo(XLogRecPtr lsn, XLogRecord *record)
+
+static bool
+_bt_cleanup_page(Page page, RelFileNode hnode)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
+ BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ OffsetNumber offno;
+ ItemId lp;
+ BTItem item;
+ bool result = false;
- if (info == XLOG_BTREE_DELETE)
- btree_xlog_delete(true, lsn, record);
- else if (info == XLOG_BTREE_INSERT)
- btree_xlog_insert(true, lsn, record);
- else if (info == XLOG_BTREE_SPLIT)
- btree_xlog_split(true, false, lsn, record); /* new item on the right */
- else if (info == XLOG_BTREE_SPLEFT)
- btree_xlog_split(true, true, lsn, record); /* new item on the left */
- else if (info == XLOG_BTREE_NEWROOT)
- btree_xlog_newroot(true, lsn, record);
- else
- elog(STOP, "btree_redo: unknown op code %u", info);
+ for (offno = P_FIRSTDATAKEY(pageop); offno <= maxoff; )
+ {
+ lp = PageGetItemId(page, offno);
+ item = (BTItem) PageGetItem(page, lp);
+ if (XLogIsValidTuple(hnode, &(item->bti_itup.t_tid)))
+ offno = OffsetNumberNext(offno);
+ else
+ {
+ PageIndexTupleDelete(page, offno);
+ maxoff = PageGetMaxOffsetNumber(page);
+ result = true;
+ }
+ }
+
+ return(result);
}
-void btree_undo(XLogRecPtr lsn, XLogRecord *record)
+static bool
+_bt_add_item(Page page, OffsetNumber offno,
+ char* item, Size size, RelFileNode hnode)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
- if (info == XLOG_BTREE_DELETE)
- btree_xlog_delete(false, lsn, record);
- else if (info == XLOG_BTREE_INSERT)
- btree_xlog_insert(false, lsn, record);
- else if (info == XLOG_BTREE_SPLIT)
- btree_xlog_split(false, false, lsn, record);/* new item on the right */
- else if (info == XLOG_BTREE_SPLEFT)
- btree_xlog_split(false, true, lsn, record); /* new item on the left */
- else if (info == XLOG_BTREE_NEWROOT)
- btree_xlog_newroot(false, lsn, record);
+ if (offno > PageGetMaxOffsetNumber(page) + 1)
+ {
+ if (! (pageop->btpo_flags & BTP_REORDER))
+ {
+ elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected");
+ pageop->btpo_flags |= BTP_REORDER;
+ }
+ offno = PageGetMaxOffsetNumber(page) + 1;
+ }
+
+ if (PageAddItem(page, (Item) item, size, offno,
+ LP_USED) == InvalidOffsetNumber)
+ {
+ /* ops, not enough space - try to deleted dead tuples */
+ bool result;
+
+ if (! P_ISLEAF(pageop))
+ return(false);
+ result = _bt_cleanup_page(page, hnode);
+ if (!result || PageAddItem(page, (Item) item, size, offno,
+ LP_USED) == InvalidOffsetNumber)
+ return(false);
+ }
+
+ return(true);
+}
+
+/*
+ * Remove from left sibling items belonging to right sibling
+ * and change P_HIKEY
+ */
+static void
+_bt_fix_left_page(Page page, XLogRecord *record, bool onleft)
+{
+ char *xlrec = (char*) XLogRecGetData(record);
+ BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ Size hsize = SizeOfBtreeSplit;
+ RelFileNode hnode;
+ BTItemData btdata;
+ OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
+ OffsetNumber offno;
+ char *item;
+ Size itemsz;
+ char *previtem = NULL;
+ char *lhikey = NULL;
+ Size lhisize = 0;
+
+ if (pageop->btpo_flags & BTP_LEAF)
+ {
+ hsize += (sizeof(CommandId) + sizeof(RelFileNode));
+ memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit +
+ sizeof(CommandId), sizeof(RelFileNode));
+ }
else
- elog(STOP, "btree_undo: unknown op code %u", info);
+ {
+ lhikey = (char*)xlrec + hsize;
+ memcpy(&btdata, lhikey, sizeof(BTItemData));
+ lhisize = IndexTupleDSize(btdata.bti_itup) +
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
+ hsize += lhisize;
+ }
+
+ if (! P_RIGHTMOST(pageop))
+ PageIndexTupleDelete(page, P_HIKEY);
+
+ if (onleft) /* skip target item */
+ {
+ memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
+ itemsz = IndexTupleDSize(btdata.bti_itup) +
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
+ hsize += itemsz;
+ }
+
+ for (item = (char*)xlrec + hsize; ; )
+ {
+ memcpy(&btdata, item, sizeof(BTItemData));
+ for (offno = P_FIRSTDATAKEY(pageop);
+ offno <= maxoff;
+ offno = OffsetNumberNext(offno))
+ {
+ ItemId lp = PageGetItemId(page, offno);
+ BTItem btitem = (BTItem) PageGetItem(page, lp);
+
+ if (BTItemSame(&btdata, btitem))
+ {
+ PageIndexTupleDelete(page, offno);
+ break;
+ }
+ }
+
+ itemsz = IndexTupleDSize(btdata.bti_itup) +
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
+ itemsz = MAXALIGN(itemsz);
+
+ if (item + itemsz < (char*)record + record->xl_len)
+ {
+ previtem = item;
+ item += itemsz;
+ }
+ else
+ break;
+ }
+
+ /* time to insert hi-key */
+ if (pageop->btpo_flags & BTP_LEAF)
+ {
+ lhikey = (P_RIGHTMOST(pageop)) ? item : previtem;
+ memcpy(&btdata, lhikey, sizeof(BTItemData));
+ lhisize = IndexTupleDSize(btdata.bti_itup) +
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
+ }
+
+ if (! _bt_add_item(page,
+ P_HIKEY,
+ lhikey,
+ lhisize,
+ hnode))
+ elog(STOP, "btree_split_redo: failed to add hi key to left sibling");
+
+ return;
+}
+
+/*
+ * UNDO insertion on *leaf* page:
+ * - find inserted tuple;
+ * - delete it if heap tuple was inserted by the same xaction
+ */
+static void
+_bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
+ XLogRecPtr lsn, XLogRecord *record)
+{
+ char *xlrec = (char*) XLogRecGetData(record);
+ Page page = (Page) BufferGetPage(buffer);
+ BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ BlockNumber blkno;
+ OffsetNumber offno;
+ ItemId lp;
+ BTItem item;
+
+ for ( ; ; )
+ {
+ OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
+
+ for (offno = P_FIRSTDATAKEY(pageop);
+ offno <= maxoff;
+ offno = OffsetNumberNext(offno))
+ {
+ lp = PageGetItemId(page, offno);
+ item = (BTItem) PageGetItem(page, lp);
+ if (BTItemSame(item, btitem))
+ break;
+ }
+ if (offno <= maxoff)
+ break;
+ offno = InvalidOffsetNumber;
+ if (P_RIGHTMOST(pageop))
+ break;
+ blkno = pageop->btpo_next;
+ UnlockAndReleaseBuffer(buffer);
+ buffer = XLogReadBuffer(false, reln, blkno);
+ if (!BufferIsValid(buffer))
+ elog(STOP, "btree_%s_undo: lost right sibling",
+ (insert) ? "insert" : "split");
+ page = (Page) BufferGetPage(buffer);
+ if (PageIsNew((PageHeader) page))
+ elog(STOP, "btree_%s_undo: uninitialized right sibling",
+ (insert) ? "insert" : "split");
+ pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ if (XLByteLT(PageGetLSN(page), lsn))
+ break;
+ }
+
+ if (offno == InvalidOffsetNumber) /* not found */
+ {
+ if (!InRecovery)
+ elog(STOP, "btree_%s_undo: lost target tuple in rollback",
+ (insert) ? "insert" : "split");
+ UnlockAndReleaseBuffer(buffer);
+ return;
+ }
+
+ lp = PageGetItemId(page, offno);
+
+ if (InRecovery) /* check heap tuple */
+ {
+ if (!ItemIdDeleted(lp))
+ {
+ int result;
+ CommandId cid;
+ RelFileNode hnode;
+ Size hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit;
+
+ memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId));
+ memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode));
+ result = XLogIsOwnerOfTuple(hnode, &(btitem->bti_itup.t_tid),
+ record->xl_xid, cid);
+ if (result < 0) /* not owner */
+ {
+ UnlockAndReleaseBuffer(buffer);
+ return;
+ }
+ }
+ PageIndexTupleDelete(page, offno);
+ pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ pageop->btpo_flags |= BTP_REORDER;
+ UnlockAndWriteBuffer(buffer);
+ return;
+ }
+
+ /* normal rollback */
+ if (ItemIdDeleted(lp)) /* marked for deletion ?! */
+ elog(STOP, "btree_%s_undo: deleted target tuple in rollback",
+ (insert) ? "insert" : "split");
+
+ lp->lp_flags |= LP_DELETE;
+ MarkBufferForCleanup(buffer, IndexPageCleanup);
+ return;
}
-static void btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
+static void
+btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_delete *xlrec;
- Relation *reln;
+ Relation reln;
Buffer buffer;
Page page;
@@ -795,10 +1021,11 @@ static void btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
return;
}
-static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
+static void
+btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_insert *xlrec;
- Relation *reln;
+ Relation reln;
Buffer buffer;
Page page;
BTPageOpaque pageop;
@@ -872,7 +1099,7 @@ static void
btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_split *xlrec;
- Relation *reln;
+ Relation reln;
BlockNumber blkno;
BlockNumber parent;
Buffer buffer;
@@ -888,7 +1115,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
/* Left (original) sibling */
blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
- BlockIdGetBlockNumber(xlrec->otherblk);
+ BlockIdGetBlockNumber(&(xlrec->otherblk));
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_split_%s: lost left sibling", op);
@@ -917,7 +1144,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
Size itemsz;
RelFileNode hnode;
- pageop->btpo_next = BlockIdGetBlockNumber(xlrec->otherblk);
+ pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->otherblk));
if (isleaf)
{
hsize += (sizeof(CommandId) + sizeof(RelFileNode));
@@ -970,7 +1197,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
}
/* Right (new) sibling */
- blkno = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) :
+ blkno = (onleft) ? BlockIdGetBlockNumber(&(xlrec->otherblk)) :
ItemPointerGetBlockNumber(&(xlrec->target.tid));
buffer = XLogReadBuffer((redo) ? true : false, reln, blkno);
if (!BufferIsValid(buffer))
@@ -993,6 +1220,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
Size hsize = SizeOfBtreeSplit;
BTItemData btdata;
Size itemsz;
+ char *item;
_bt_pageinit(page, BufferGetPageSize(buffer));
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -1016,7 +1244,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
hsize += itemsz;
}
- for (char* item = (char*)xlrec + hsize;
+ for (item = (char*)xlrec + hsize;
item < (char*)record + record->xl_len; )
{
memcpy(&btdata, item, sizeof(BTItemData));
@@ -1030,8 +1258,8 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
}
pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
- BlockIdGetBlockNumber(xlrec->otherblk);
- pageop->btpo_next = BlockIdGetBlockNumber(xlrec->rightblk);
+ BlockIdGetBlockNumber(&(xlrec->otherblk));
+ pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->rightblk));
pageop->btpo_parent = parent;
PageSetLSN(page, lsn);
@@ -1077,7 +1305,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
}
/* Right (next) page */
- blkno = BlockIdGetBlockNumber(xlrec->rightblk);
+ blkno = BlockIdGetBlockNumber(&(xlrec->rightblk));
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_split_%s: lost next right page", op);
@@ -1093,7 +1321,8 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
else
{
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
- pageop->btpo_prev = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) :
+ pageop->btpo_prev = (onleft) ?
+ BlockIdGetBlockNumber(&(xlrec->otherblk)) :
ItemPointerGetBlockNumber(&(xlrec->target.tid));
PageSetLSN(page, lsn);
@@ -1111,10 +1340,11 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
}
-static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
+static void
+btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_newroot *xlrec;
- Relation *reln;
+ Relation reln;
Buffer buffer;
Page page;
Buffer metabuf;
@@ -1137,6 +1367,8 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn))
{
+ BTPageOpaque pageop;
+
_bt_pageinit(page, BufferGetPageSize(buffer));
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -1150,8 +1382,9 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
BTItemData btdata;
Size itemsz;
+ char *item;
- for (char* item = (char*)xlrec + SizeOfBtreeNewroot;
+ for (item = (char*)xlrec + SizeOfBtreeNewroot;
item < (char*)record + record->xl_len; )
{
memcpy(&btdata, item, sizeof(BTItemData));
@@ -1182,7 +1415,7 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
md.btm_version = BTREE_VERSION;
md.btm_root = P_NONE;
md.btm_level = 0;
- memcpy((char *) BTPageGetMeta(pg), (char *) &md, sizeof(md));
+ memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md));
}
if (XLByteLT(PageGetLSN(metapg), lsn))
@@ -1201,255 +1434,89 @@ static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
return;
}
-/*
- * UNDO insertion on *leaf* page:
- * - find inserted tuple;
- * - delete it if heap tuple was inserted by the same xaction
- */
-static void
-_bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
- XLogRecPtr lsn, XLogRecord *record)
+void
+btree_redo(XLogRecPtr lsn, XLogRecord *record)
{
- char *xlrec = (char*) XLogRecGetData(record);
- Page page = (Page) BufferGetPage(buffer);
- BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
- BlockNumber blkno;
- OffsetNumber offno;
- ItemId lp;
- BTItem item;
-
- for ( ; ; )
- {
- OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
-
- for (offno = P_FIRSTDATAKEY(pageop);
- offno <= maxoff;
- offno = OffsetNumberNext(offno))
- {
- lp = PageGetItemId(page, offno);
- item = (BTItem) PageGetItem(page, lp);
- if (BTItemSame(item, btitem))
- break;
- }
- if (offno <= maxoff)
- break;
- offno = InvalidOffsetNumber;
- if (P_RIGHTMOST(pageop))
- break;
- blkno = pageop->btpo_next;
- UnlockAndReleaseBuffer(buffer);
- buffer = XLogReadBuffer(false, reln, blkno);
- if (!BufferIsValid(buffer))
- elog(STOP, "btree_%s_undo: lost right sibling",
- (insert) ? "insert" : "split");
- page = (Page) BufferGetPage(buffer);
- if (PageIsNew((PageHeader) page))
- elog(STOP, "btree_%s_undo: uninitialized right sibling",
- (insert) ? "insert" : "split");
- pageop = (BTPageOpaque) PageGetSpecialPointer(page);
- if (XLByteLT(PageGetLSN(page), lsn))
- break;
- }
-
- if (offno == InvalidOffsetNumber) /* not found */
- {
- if (!InRecovery)
- elog(STOP, "btree_%s_undo: lost target tuple in rollback",
- (insert) ? "insert" : "split");
- UnlockAndReleaseBuffer(buffer);
- return;
- }
-
- lp = PageGetItemId(page, offno);
-
- if (InRecovery) /* check heap tuple */
- {
- if (!ItemIdDeleted(lp))
- {
- int result;
- CommandId cid;
- RelFileNode hnode;
- Size hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit;
-
- memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId));
- memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode));
- result = XLogIsOwnerOfTuple(hnode, &(btitem->bti_itup.t_tid),
- record->xl_xid, cid);
- if (result < 0) /* not owner */
- {
- UnlockAndReleaseBuffer(buffer);
- return;
- }
- }
- PageIndexTupleDelete(page, offno);
- pageop = (BTPageOpaque) PageGetSpecialPointer(page);
- pageop->btpo_flags |= BTP_REORDER;
- UnlockAndWriteBuffer(buffer);
- return;
- }
-
- /* normal rollback */
- if (ItemIdDeleted(lp)) /* marked for deletion ?! */
- elog(STOP, "btree_%s_undo: deleted target tuple in rollback",
- (insert) ? "insert" : "split");
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
- lp->lp_flags |= LP_DELETE;
- MarkBufferForCleanup(buffer, IndexPageCleanup);
- return;
+ if (info == XLOG_BTREE_DELETE)
+ btree_xlog_delete(true, lsn, record);
+ else if (info == XLOG_BTREE_INSERT)
+ btree_xlog_insert(true, lsn, record);
+ else if (info == XLOG_BTREE_SPLIT)
+ btree_xlog_split(true, false, lsn, record); /* new item on the right */
+ else if (info == XLOG_BTREE_SPLEFT)
+ btree_xlog_split(true, true, lsn, record); /* new item on the left */
+ else if (info == XLOG_BTREE_NEWROOT)
+ btree_xlog_newroot(true, lsn, record);
+ else
+ elog(STOP, "btree_redo: unknown op code %u", info);
}
-static bool
-_bt_add_item(Page page, OffsetNumber offno,
- char* item, Size size, RelFileNode hnode)
+void
+btree_undo(XLogRecPtr lsn, XLogRecord *record)
{
- BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
-
- if (offno > PageGetMaxOffsetNumber(page) + 1)
- {
- if (! (pageop->btpo_flags & BTP_REORDER))
- {
- elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected");
- pageop->btpo_flags |= BTP_REORDER;
- }
- offno = PageGetMaxOffsetNumber(page) + 1;
- }
-
- if (PageAddItem(page, (Item) item, size, offno,
- LP_USED) == InvalidOffsetNumber)
- {
- /* ops, not enough space - try to deleted dead tuples */
- bool result;
-
- if (! P_ISLEAF(pageop))
- return(false);
- result = _bt_cleanup_page(page, hnode);
- if (!result || PageAddItem(page, (Item) item, size, offno,
- LP_USED) == InvalidOffsetNumber)
- return(false);
- }
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
- return(true);
+ if (info == XLOG_BTREE_DELETE)
+ btree_xlog_delete(false, lsn, record);
+ else if (info == XLOG_BTREE_INSERT)
+ btree_xlog_insert(false, lsn, record);
+ else if (info == XLOG_BTREE_SPLIT)
+ btree_xlog_split(false, false, lsn, record);/* new item on the right */
+ else if (info == XLOG_BTREE_SPLEFT)
+ btree_xlog_split(false, true, lsn, record); /* new item on the left */
+ else if (info == XLOG_BTREE_NEWROOT)
+ btree_xlog_newroot(false, lsn, record);
+ else
+ elog(STOP, "btree_undo: unknown op code %u", info);
}
-static bool
-_bt_cleanup_page(Page page, RelFileNode hnode)
+static void
+out_target(char *buf, xl_btreetid *target)
{
- OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
- OffsetNumber offno;
- ItemId lp;
- BTItem item;
- bool result = false;
-
- for (offno = P_FIRSTDATAKEY(pageop); offno <= maxoff; )
- {
- lp = PageGetItemId(page, offno);
- item = (BTItem) PageGetItem(page, lp);
- if (XLogIsValidTuple(hnode, &(item->bti_itup.t_tid))
- offno = OffsetNumberNext(offno);
- else
- {
- PageIndexTupleDelete(page, offno);
- maxoff = PageGetMaxOffsetNumber(page);
- result = true;
- }
- }
-
- return(result);
+ sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u",
+ target->node.tblNode, target->node.relNode,
+ ItemPointerGetBlockNumber(&(target->tid)),
+ ItemPointerGetOffsetNumber(&(target->tid)));
}
-
-/*
- * Remove from left sibling items belonging to right sibling
- * and change P_HIKEY
- */
-static void
-_bt_fix_left_page(Page page, XLogRecord *record, bool onleft)
+
+void
+btree_desc(char *buf, uint8 xl_info, char* rec)
{
- char *xlrec = (char*) XLogRecGetData(record);
- BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
- Size hsize = SizeOfBtreeSplit;
- RelFileNode hnode;
- BTItemData btdata;
- OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
- OffsetNumber offno;
- char *item;
- Size itemsz;
- char *previtem = NULL;
- char *lhikey = NULL;
- Size lhisize = 0;
+ uint8 info = xl_info & ~XLR_INFO_MASK;
- if (pageop->btpo_flags & BTP_LEAF)
+ if (info == XLOG_BTREE_INSERT)
{
- hsize += (sizeof(CommandId) + sizeof(RelFileNode));
- memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit +
- sizeof(CommandId), sizeof(RelFileNode));
+ xl_btree_insert *xlrec = (xl_btree_insert*) rec;
+ strcat(buf, "insert: ");
+ out_target(buf, &(xlrec->target));
}
- else
+ else if (info == XLOG_BTREE_DELETE)
{
- lhikey = (char*)xlrec + hsize;
- memcpy(&btdata, lhikey, sizeof(BTItemData));
- lhisize = IndexTupleDSize(btdata.bti_itup) +
- (sizeof(BTItemData) - sizeof(IndexTupleData));
- hsize += lhisize;
+ xl_btree_delete *xlrec = (xl_btree_delete*) rec;
+ strcat(buf, "delete: ");
+ out_target(buf, &(xlrec->target));
}
-
- if (! P_RIGHTMOST(pageop))
- PageIndexTupleDelete(page, P_HIKEY);
-
- if (onleft) /* skip target item */
+ else if (info == XLOG_BTREE_SPLIT || info == XLOG_BTREE_SPLEFT)
{
- memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
- itemsz = IndexTupleDSize(btdata.bti_itup) +
- (sizeof(BTItemData) - sizeof(IndexTupleData));
- hsize += itemsz;
+ xl_btree_split *xlrec = (xl_btree_split*) rec;
+ sprintf(buf + strlen(buf), "split(%s): ",
+ (info == XLOG_BTREE_SPLIT) ? "right" : "left");
+ out_target(buf, &(xlrec->target));
+ sprintf(buf + strlen(buf), "; oth %u; rgh %u",
+ BlockIdGetBlockNumber(&xlrec->otherblk),
+ BlockIdGetBlockNumber(&xlrec->rightblk));
}
-
- for (item = (char*)xlrec + hsize; ; )
- {
- memcpy(&btdata, item, sizeof(BTItemData));
- for (offno = P_FIRSTDATAKEY(pageop);
- offno <= maxoff;
- offno = OffsetNumberNext(offno))
- {
- ItemId lp = PageGetItemId(page, offno);
- BTItem btitem = (BTItem) PageGetItem(page, lp);
-
- if (BTItemSame(&btdata, btitem))
- {
- PageIndexTupleDelete(page, offno);
- break;
- }
- }
-
- itemsz = IndexTupleDSize(btdata.bti_itup) +
- (sizeof(BTItemData) - sizeof(IndexTupleData));
- itemsz = MAXALIGN(itemsz);
-
- if (item + itemsz < (char*)record + record->xl_len)
- {
- previtem = item;
- item += itemsz;
- }
- else
- break;
- }
-
- /* time to insert hi-key */
- if (pageop->btpo_flags & BTP_LEAF)
+ else if (info == XLOG_BTREE_NEWROOT)
{
- lhikey = (P_RIGHTMOST(pageop)) ? item : previtem;
- memcpy(&btdata, lhikey, sizeof(BTItemData));
- lhisize = IndexTupleDSize(btdata.bti_itup) +
- (sizeof(BTItemData) - sizeof(IndexTupleData));
+ xl_btree_newroot *xlrec = (xl_btree_newroot*) rec;
+ sprintf(buf + strlen(buf), "root: node %u/%u; blk %u",
+ xlrec->node.tblNode, xlrec->node.relNode,
+ BlockIdGetBlockNumber(&xlrec->rootblk));
}
-
- if (! _bt_add_item(page,
- P_HIKEY,
- lhikey,
- lhisize,
- &hnode))
- elog(STOP, "btree_split_redo: failed to add hi key to left sibling");
-
- return;
+ else
+ strcat(buf, "UNKNOWN");
}
#endif