aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVadim B. Mikheev <vadim4o@yahoo.com>2000-10-13 02:03:02 +0000
committerVadim B. Mikheev <vadim4o@yahoo.com>2000-10-13 02:03:02 +0000
commit25a26a7ab8a70ee45dcbc6b060ce6ba274857a44 (patch)
tree1f064e59737b6a5891566267d6a1d76c12a9277b
parent0b33ace6785dda49d461d7889a9623f67d81e3f7 (diff)
downloadpostgresql-25a26a7ab8a70ee45dcbc6b060ce6ba274857a44.tar.gz
postgresql-25a26a7ab8a70ee45dcbc6b060ce6ba274857a44.zip
WAL
-rw-r--r--src/backend/access/heap/heapam.c20
-rw-r--r--src/backend/access/nbtree/nbtinsert.c85
-rw-r--r--src/backend/access/nbtree/nbtpage.c25
-rw-r--r--src/backend/access/nbtree/nbtree.c582
-rw-r--r--src/include/access/nbtree.h31
5 files changed, 703 insertions, 40 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index dbcefbf2733..3e1de33bfe4 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.86 2000/10/04 00:04:41 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.87 2000/10/13 02:02:59 vadim Exp $
*
*
* INTERFACE ROUTINES
@@ -2016,6 +2016,22 @@ void heap_redo(XLogRecPtr lsn, XLogRecord *record)
elog(STOP, "heap_redo: unknown op code %u", info);
}
+/*
+ * heap_undo -- route a heap WAL record to its handler in undo mode.
+ *
+ * The operation code is whatever remains of record->xl_info once the
+ * XLR_INFO_MASK bits are cleared.  Each recognized code is forwarded to
+ * the matching heap_xlog_* routine with redo = false; anything else is
+ * reported through elog(STOP).
+ */
+void
+heap_undo(XLogRecPtr lsn, XLogRecord *record)
+{
+    uint8 opcode = record->xl_info & ~XLR_INFO_MASK;
+
+    switch (opcode)
+    {
+        case XLOG_HEAP_INSERT:
+            heap_xlog_insert(false, lsn, record);
+            break;
+        case XLOG_HEAP_DELETE:
+            heap_xlog_delete(false, lsn, record);
+            break;
+        case XLOG_HEAP_UPDATE:
+            heap_xlog_update(false, lsn, record);
+            break;
+        case XLOG_HEAP_MOVE:
+            heap_xlog_move(false, lsn, record);
+            break;
+        default:
+            elog(STOP, "heap_undo: unknown op code %u", opcode);
+            break;
+    }
+}
+
void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_heap_delete *xlrec = (xl_heap_delete*) XLogRecGetData(record);
@@ -2199,7 +2215,7 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
else /* we can't delete tuple right now */
{
lp->lp_flags |= LP_DELETE; /* mark for deletion */
- MarkBufferForCleanup(buffer, PageCleanup);
+ MarkBufferForCleanup(buffer, HeapPageCleanup);
}
}
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index e454a989ee4..c72b8ca3df6 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.64 2000/10/05 20:10:20 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.65 2000/10/13 02:03:00 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -61,6 +61,10 @@ static void _bt_pgaddtup(Relation rel, Page page,
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
+#ifdef XLOG
+static Relation _xlheapRel; /* temporary hack */
+#endif
+
/*
* _bt_doinsert() -- Handle insertion of a single btitem in the tree.
*
@@ -119,6 +123,10 @@ top:
}
}
+#ifdef XLOG
+ _xlheapRel = heapRel; /* temporary hack */
+#endif
+
/* do the insertion */
res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0);
@@ -517,21 +525,38 @@ _bt_insertonpg(Relation rel,
#ifdef XLOG
/* XLOG stuff */
{
- char xlbuf[sizeof(xl_btree_insert) + 2 * sizeof(CommandId)];
+ char xlbuf[sizeof(xl_btree_insert) +
+ sizeof(CommandId) + sizeof(RelFileNode)];
xl_btree_insert *xlrec = xlbuf;
int hsize = SizeOfBtreeInsert;
+ BTItemData truncitem;
+ BTItem xlitem = btitem;
+ Size xlsize = IndexTupleDSize(btitem->bti_itup) +
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
xlrec->target.node = rel->rd_node;
ItemPointerSet(&(xlrec->target.tid), BufferGetBlockNumber(buf), newitemoff);
if (P_ISLEAF(lpageop))
- {
+ {
CommandId cid = GetCurrentCommandId();
- memcpy(xlbuf + SizeOfBtreeInsert, &(char*)cid, sizeof(CommandId));
+ memcpy(xlbuf + hsize, &cid, sizeof(CommandId));
hsize += sizeof(CommandId);
+ memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode));
+ hsize += sizeof(RelFileNode);
+ }
+ /*
+ * Read comments in _bt_pgaddtup
+ */
+ else if (newitemoff == P_FIRSTDATAKEY(lpageop))
+ {
+ truncitem = *btitem;
+ truncitem.bti_itup.t_info = sizeof(BTItemData);
+ xlitem = &truncitem;
+ xlsize = sizeof(BTItemData);
}
XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT,
- xlbuf, hsize, (char*) btitem, itemsz);
+ xlbuf, hsize, (char*) xlitem, xlsize);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
@@ -752,7 +777,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
*/
{
char xlbuf[sizeof(xl_btree_split) +
- 2 * sizeof(CommandId) + BLCKSZ];
+ sizeof(CommandId) + sizeof(RelFileNode) + BLCKSZ];
xl_btree_split *xlrec = xlbuf;
int hsize = SizeOfBtreeSplit;
int flag = (newitemonleft) ?
@@ -765,11 +790,30 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
CommandId cid = GetCurrentCommandId();
memcpy(xlbuf + hsize, &(char*)cid, sizeof(CommandId));
hsize += sizeof(CommandId);
+ memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode));
+ hsize += sizeof(RelFileNode);
}
if (newitemonleft)
{
- memcpy(xlbuf + hsize, (char*) newitem, newitemsz);
- hsize += newitemsz;
+ /*
+ * Read comments in _bt_pgaddtup.
+ * Actually, seems that in non-leaf splits newitem shouldn't
+ * go to first data key position.
+ */
+ if (! P_ISLEAF(lopaque) && itup_off == P_FIRSTDATAKEY(lopaque))
+ {
+ BTItemData truncitem = *newitem;
+ truncitem.bti_itup.t_info = sizeof(BTItemData);
+ memcpy(xlbuf + hsize, &truncitem, sizeof(BTItemData));
+ hsize += sizeof(BTItemData);
+ }
+ else
+ {
+ Size itemsz = IndexTupleDSize(newitem->bti_itup) +
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
+ memcpy(xlbuf + hsize, (char*) newitem, itemsz);
+ hsize += itemsz;
+ }
xlrec->otherblk = BufferGetBlockNumber(rbuf);
}
else
@@ -1012,7 +1056,7 @@ static Buffer
_bt_getstackbuf(Relation rel, BTStack stack)
{
BlockNumber blkno;
- Buffer buf;
+ Buffer buf, newbuf;
OffsetNumber start,
offnum,
maxoff;
@@ -1101,11 +1145,18 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
Size itemsz;
BTItem new_item;
+#ifdef XLOG
+ Buffer metabuf;
+#endif
+
/* get a new root page */
rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
rootpage = BufferGetPage(rootbuf);
rootblknum = BufferGetBlockNumber(rootbuf);
+#ifdef XLOG
+ metabuf = _bt_getbuf(rel, BTREE_METAPAGE,BT_WRITE);
+#endif
/* NO ELOG(ERROR) from here till newroot op is logged */
@@ -1168,9 +1219,12 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
#ifdef XLOG
/* XLOG stuff */
{
- xl_btree_newroot xlrec;
+ xl_btree_newroot xlrec;
+ Page metapg = BufferGetPage(metabuf);
+ BTMetaPageData *metad = BTPageGetMeta(metapg);
+
xlrec.node = rel->rd_node;
- xlrec.rootblk = rootblknum;
+ BlockIdSet(&(xlrec.rootblk), rootblknum);
/*
* Dirrect access to page is not good but faster - we should
@@ -1181,16 +1235,25 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
(char*)rootpage + (PageHeader) rootpage)->pd_upper,
((PageHeader) rootpage)->pd_special - ((PageHeader) rootpage)->upper);
+ metad->btm_root = rootblknum;
+ (metad->btm_level)++;
+
PageSetLSN(rootpage, recptr);
PageSetSUI(rootpage, ThisStartUpID);
+ PageSetLSN(metapg, recptr);
+ PageSetSUI(metapg, ThisStartUpID);
+
+ _bt_wrtbuf(rel, metabuf);
}
#endif
/* write and let go of the new root buffer */
_bt_wrtbuf(rel, rootbuf);
+#ifndef XLOG
/* update metadata page with new root block number */
_bt_metaproot(rel, rootblknum, 0);
+#endif
/* update and release new sibling, and finally the old root */
_bt_wrtbuf(rel, rbuf);
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 2da74219010..41acd11659c 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.38 2000/10/04 00:04:42 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.39 2000/10/13 02:03:00 vadim Exp $
*
* NOTES
* Postgres btree pages look like ordinary relation pages. The opaque
@@ -27,23 +27,6 @@
#include "access/nbtree.h"
#include "miscadmin.h"
-#define BTREE_METAPAGE 0
-#define BTREE_MAGIC 0x053162
-
-#define BTREE_VERSION 1
-
-typedef struct BTMetaPageData
-{
- uint32 btm_magic;
- uint32 btm_version;
- BlockNumber btm_root;
- int32 btm_level;
-} BTMetaPageData;
-
-#define BTPageGetMeta(p) \
- ((BTMetaPageData *) &((PageHeader) p)->pd_linp[0])
-
-
/*
* We use high-concurrency locking on btrees. There are two cases in
* which we don't do locking. One is when we're building the btree.
@@ -188,14 +171,18 @@ _bt_getroot(Relation rel, int access)
#ifdef XLOG
/* XLOG stuff */
{
- xl_btree_insert xlrec;
+ xl_btree_newroot xlrec;
+
xlrec.node = rel->rd_node;
+ BlockIdSet(&(xlrec.rootblk), rootblkno);
XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT,
&xlrec, SizeOfBtreeNewroot, NULL, 0);
PageSetLSN(rootpage, recptr);
PageSetSUI(rootpage, ThisStartUpID);
+ PageSetLSN(metapg, recptr);
+ PageSetSUI(metapg, ThisStartUpID);
}
#endif
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 7fec982fa2d..1064c2bb107 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -12,7 +12,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.63 2000/08/10 02:33:20 inoue Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.64 2000/10/13 02:03:00 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -730,3 +730,583 @@ _bt_restscan(IndexScanDesc scan)
so->btso_curbuf = buf;
}
}
+
+#ifdef XLOG
+/*
+ * btree_redo -- route a btree WAL record to its handler in redo mode.
+ *
+ * The operation code is record->xl_info with the XLR_INFO_MASK bits
+ * cleared.  Both split codes share one handler; the second argument of
+ * btree_xlog_split tells it whether the new item went to the left page.
+ * An unrecognized code is reported through elog(STOP).
+ */
+void
+btree_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+    uint8 opcode = record->xl_info & ~XLR_INFO_MASK;
+
+    switch (opcode)
+    {
+        case XLOG_BTREE_DELETE:
+            btree_xlog_delete(true, lsn, record);
+            break;
+        case XLOG_BTREE_INSERT:
+            btree_xlog_insert(true, lsn, record);
+            break;
+        case XLOG_BTREE_SPLIT:
+            /* new item on the right */
+            btree_xlog_split(true, false, lsn, record);
+            break;
+        case XLOG_BTREE_SPLEFT:
+            /* new item on the left */
+            btree_xlog_split(true, true, lsn, record);
+            break;
+        case XLOG_BTREE_NEWROOT:
+            btree_xlog_newroot(true, lsn, record);
+            break;
+        default:
+            elog(STOP, "btree_redo: unknown op code %u", opcode);
+            break;
+    }
+}
+
+/*
+ * btree_undo -- route a btree WAL record to its handler in undo mode.
+ *
+ * Mirrors btree_redo, but every handler is invoked with redo = false.
+ * An unrecognized operation code is reported through elog(STOP).
+ */
+void
+btree_undo(XLogRecPtr lsn, XLogRecord *record)
+{
+    uint8 opcode = record->xl_info & ~XLR_INFO_MASK;
+
+    switch (opcode)
+    {
+        case XLOG_BTREE_DELETE:
+            btree_xlog_delete(false, lsn, record);
+            break;
+        case XLOG_BTREE_INSERT:
+            btree_xlog_insert(false, lsn, record);
+            break;
+        case XLOG_BTREE_SPLIT:
+            /* new item on the right */
+            btree_xlog_split(false, false, lsn, record);
+            break;
+        case XLOG_BTREE_SPLEFT:
+            /* new item on the left */
+            btree_xlog_split(false, true, lsn, record);
+            break;
+        case XLOG_BTREE_NEWROOT:
+            btree_xlog_newroot(false, lsn, record);
+            break;
+        default:
+            elog(STOP, "btree_undo: unknown op code %u", opcode);
+            break;
+    }
+}
+
+/*
+ * btree_xlog_delete -- replay the deletion of one index tuple.
+ *
+ * redo    true when replaying the log; undo of a delete is a no-op here.
+ * lsn     log position of the record being replayed.
+ * record  the XLOG_BTREE_DELETE record; its data is an xl_btree_delete
+ *         whose target names the relation, block and offset to delete.
+ *
+ * The page is modified only if its LSN is older than the record's, so
+ * replaying the same record twice cannot remove a second, unrelated tuple.
+ * After the change the page is stamped with the record's LSN and the
+ * current startup id, and the buffer is written and released, matching
+ * the other redo routines in this file.
+ */
+static void
+btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
+{
+    xl_btree_delete *xlrec;
+    Relation    reln;
+    Buffer      buffer;
+    Page        page;
+
+    if (!redo)
+        return;
+
+    xlrec = (xl_btree_delete *) XLogRecGetData(record);
+    reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
+    if (!RelationIsValid(reln))
+        return;
+    buffer = XLogReadBuffer(false, reln,
+                ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+    if (!BufferIsValid(buffer))
+        elog(STOP, "btree_delete_redo: block unfound");
+    page = (Page) BufferGetPage(buffer);
+    if (PageIsNew((PageHeader) page))
+        elog(STOP, "btree_delete_redo: uninitialized page");
+
+    /* Change already present on the page: just let go of the buffer. */
+    if (XLByteLE(lsn, PageGetLSN(page)))
+    {
+        UnlockAndReleaseBuffer(buffer);
+        return;
+    }
+
+    PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid)));
+
+    /* Record that the page now reflects this log record, then write it. */
+    PageSetLSN(page, lsn);
+    PageSetSUI(page, ThisStartUpID);
+    UnlockAndWriteBuffer(buffer);
+
+    return;
+}
+
+/*
+ * btree_xlog_insert -- redo or undo a plain (non-split) btree insertion.
+ *
+ * redo    true to replay the insertion, false to roll it back.
+ * lsn     log position of the record.
+ * record  the XLOG_BTREE_INSERT record.  Its data starts with an
+ *         xl_btree_insert; for leaf pages a CommandId and the heap
+ *         relation's RelFileNode follow, and then the btitem itself.
+ *
+ * Redo: if the page LSN is older than the record, the logged item is
+ * put back at the logged offset via _bt_add_item and the page is stamped
+ * and written.  Undo: only leaf insertions are rolled back, by handing
+ * the logged item header to _bt_del_item.
+ *
+ * Returns silently when the relation or the target block is unavailable.
+ */
+static void
+btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
+{
+    xl_btree_insert *xlrec;
+    Relation    reln;       /* a Relation, as _bt_del_item expects */
+    Buffer      buffer;
+    Page        page;
+    BTPageOpaque pageop;
+
+    xlrec = (xl_btree_insert *) XLogRecGetData(record);
+    reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
+    if (!RelationIsValid(reln))
+        return;
+    buffer = XLogReadBuffer((redo) ? true : false, reln,
+                ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+    if (!BufferIsValid(buffer))
+        return;
+    page = (Page) BufferGetPage(buffer);
+    if (PageIsNew((PageHeader) page))
+        elog(STOP, "btree_insert_%s: uninitialized page",
+            (redo) ? "redo" : "undo");
+    pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+    if (redo)
+    {
+        /* Nothing to do when the page already carries this change. */
+        if (XLByteLE(lsn, PageGetLSN(page)))
+            UnlockAndReleaseBuffer(buffer);
+        else
+        {
+            Size        hsize = SizeOfBtreeInsert;
+            RelFileNode hnode;
+
+            /*
+             * Non-leaf records carry no heap node; keep the struct in a
+             * defined state instead of passing indeterminate bytes on.
+             */
+            memset(&hnode, 0, sizeof(RelFileNode));
+
+            if (P_ISLEAF(pageop))
+            {
+                /* skip CommandId + heap node, remembering the latter */
+                hsize += (sizeof(CommandId) + sizeof(RelFileNode));
+                memcpy(&hnode, (char*)xlrec + SizeOfBtreeInsert +
+                            sizeof(CommandId), sizeof(RelFileNode));
+            }
+
+            /* everything after the header is the btitem to re-insert */
+            if (! _bt_add_item(page,
+                    ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
+                    (char*)xlrec + hsize,
+                    record->xl_len - hsize,
+                    &hnode))
+                elog(STOP, "btree_insert_redo: failed to add item");
+
+            PageSetLSN(page, lsn);
+            PageSetSUI(page, ThisStartUpID);
+            UnlockAndWriteBuffer(buffer);
+        }
+    }
+    else
+    {
+        BTItemData  btdata;
+
+        /* the page must already contain the change we are undoing */
+        if (XLByteLT(PageGetLSN(page), lsn))
+            elog(STOP, "btree_insert_undo: bad page LSN");
+
+        /* insertions into non-leaf pages are not rolled back */
+        if (! P_ISLEAF(pageop))
+        {
+            UnlockAndReleaseBuffer(buffer);
+            return;
+        }
+
+        /* fixed-size item header sits after CommandId and heap node */
+        memcpy(&btdata, (char*)xlrec + SizeOfBtreeInsert +
+                    sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData));
+
+        _bt_del_item(reln, buffer, &btdata, true, lsn, record);
+
+    }
+
+    return;
+}
+
+/*
+ * btree_xlog_split -- redo or undo a btree page split.
+ *
+ * redo    true to replay the split, false to roll it back.
+ * onleft  true when the newly inserted item went to the left sibling
+ *         (XLOG_BTREE_SPLEFT), false when it went to the right one.
+ * lsn     log position of the record.
+ * record  the split record; its data starts with an xl_btree_split.
+ *
+ * Three pages are visited in turn: the left (original) sibling, the
+ * right (new) sibling and the page that follows the new right sibling.
+ * When onleft is true, target.tid names the left sibling and otherblk
+ * the right one; otherwise the roles are swapped.
+ *
+ * NOTE(review): reln is declared "Relation *" but is passed to
+ * _bt_del_item, whose parameter is a plain Relation -- confirm the type.
+ * NOTE(review): BlockIdGetBlockNumber is given xlrec->otherblk and
+ * xlrec->rightblk by value here, while btree_xlog_newroot passes
+ * &(xlrec->rootblk) -- confirm which form the macro expects.
+ */
+static void
+btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
+{
+ xl_btree_split *xlrec;
+ Relation *reln;
+ BlockNumber blkno;
+ BlockNumber parent;
+ Buffer buffer;
+ Page page;
+ BTPageOpaque pageop;
+ char *op = (redo) ? "redo" : "undo";
+ bool isleaf;
+
+ xlrec = (xl_btree_split*) XLogRecGetData(record);
+ reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
+ if (!RelationIsValid(reln))
+ return;
+
+ /* Left (original) sibling */
+ blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
+ BlockIdGetBlockNumber(xlrec->otherblk);
+ buffer = XLogReadBuffer(false, reln, blkno);
+ if (!BufferIsValid(buffer))
+ elog(STOP, "btree_split_%s: lost left sibling", op);
+
+ page = (Page) BufferGetPage(buffer);
+ if (PageIsNew((PageHeader) page))
+ elog(STOP, "btree_split_%s: uninitialized left sibling", op);
+
+ pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ /* leaf flag and parent of the left page are reused for the right page */
+ isleaf = P_ISLEAF(pageop);
+ parent = pageop->btpo_parent;
+
+ if (redo)
+ {
+ /* page already reflects this record: nothing to replay */
+ if (XLByteLE(lsn, PageGetLSN(page)))
+ UnlockAndReleaseBuffer(buffer);
+ else
+ {
+ /* Delete items related to new right sibling */
+ _bt_thin_left_page(page, record);
+
+ if (onleft)
+ {
+ BTItemData btdata;
+ Size hsize = SizeOfBtreeSplit;
+ Size itemsz;
+ RelFileNode hnode;
+
+ pageop->btpo_next = BlockIdGetBlockNumber(xlrec->otherblk);
+ if (isleaf)
+ {
+ /* leaf records carry CommandId + heap node before the item */
+ hsize += (sizeof(CommandId) + sizeof(RelFileNode));
+ memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit +
+ sizeof(CommandId), sizeof(RelFileNode));
+ }
+
+ /* copy the item header out to learn the new item's real size */
+ memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
+ itemsz = IndexTupleDSize(btdata.bti_itup) +
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
+
+ if (! _bt_add_item(page,
+ ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
+ (char*)xlrec + hsize,
+ itemsz,
+ &hnode))
+ elog(STOP, "btree_split_redo: failed to add item");
+ }
+ else
+ pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+
+ PageSetLSN(page, lsn);
+ PageSetSUI(page, ThisStartUpID);
+ UnlockAndWriteBuffer(buffer);
+ }
+ }
+ else /* undo */
+ {
+ if (XLByteLT(PageGetLSN(page), lsn))
+ elog(STOP, "btree_split_undo: bad left sibling LSN");
+
+ /* only a leaf insertion that landed on the left page is removed here */
+ if (! isleaf || ! onleft)
+ UnlockAndReleaseBuffer(buffer);
+ else
+ {
+ BTItemData btdata;
+
+ memcpy(&btdata, (char*)xlrec + SizeOfBtreeSplit +
+ sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData));
+
+ _bt_del_item(reln, buffer, &btdata, false, lsn, record);
+ }
+ }
+
+ /* Right (new) sibling */
+ blkno = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) :
+ ItemPointerGetBlockNumber(&(xlrec->target.tid));
+ buffer = XLogReadBuffer((redo) ? true : false, reln, blkno);
+ if (!BufferIsValid(buffer))
+ elog(STOP, "btree_split_%s: lost right sibling", op);
+
+ page = (Page) BufferGetPage(buffer);
+ if (PageIsNew((PageHeader) page))
+ {
+ /* a never-initialized right page is acceptable only when replaying */
+ if (!redo)
+ elog(STOP, "btree_split_undo: uninitialized right sibling");
+ PageInit(page, BufferGetPageSize(buffer), 0);
+ }
+
+ if (redo)
+ {
+ if (XLByteLE(lsn, PageGetLSN(page)))
+ UnlockAndReleaseBuffer(buffer);
+ else
+ {
+ Size hsize = SizeOfBtreeSplit;
+ BTItemData btdata;
+ Size itemsz;
+
+ /* rebuild the right page from scratch out of the logged items */
+ _bt_pageinit(page, BufferGetPageSize(buffer));
+ pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ if (isleaf)
+ {
+ pageop->btpo_flags |= BTP_LEAF;
+ hsize += (sizeof(CommandId) + sizeof(RelFileNode));
+ }
+ if (onleft) /* skip target item */
+ {
+ memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
+ itemsz = IndexTupleDSize(btdata.bti_itup) +
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
+ hsize += itemsz;
+ }
+
+ /*
+ * Walk the logged items, adding each at FirstOffsetNumber.
+ * NOTE(review): the end bound is computed from "record", whereas
+ * btree_xlog_insert treats xl_len as counted from xlrec -- confirm
+ * which base is intended.
+ */
+ for (char* item = (char*)xlrec + hsize;
+ item < (char*)record + record->xl_len; )
+ {
+ memcpy(&btdata, item, sizeof(BTItemData));
+ itemsz = IndexTupleDSize(btdata.bti_itup) +
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
+ itemsz = MAXALIGN(itemsz);
+ if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber,
+ LP_USED) == InvalidOffsetNumber)
+ elog(STOP, "btree_split_redo: can't add item to right sibling");
+ item += itemsz;
+ }
+
+ /* link the rebuilt page to its left neighbour, next page and parent */
+ pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
+ BlockIdGetBlockNumber(xlrec->otherblk);
+ pageop->btpo_next = BlockIdGetBlockNumber(xlrec->rightblk);
+ pageop->btpo_parent = parent;
+
+ PageSetLSN(page, lsn);
+ PageSetSUI(page, ThisStartUpID);
+ UnlockAndWriteBuffer(buffer);
+ }
+ }
+ else /* undo */
+ {
+ if (XLByteLT(PageGetLSN(page), lsn))
+ elog(STOP, "btree_split_undo: bad right sibling LSN");
+
+ /* only a leaf insertion that landed on the right page is removed here */
+ if (! isleaf || onleft)
+ UnlockAndReleaseBuffer(buffer);
+ else
+ {
+ char tbuf[BLCKSZ];
+ int cnt;
+ char *item;
+ Size itemsz;
+
+ /* collect the fixed-size header of every logged item into tbuf */
+ item = (char*)xlrec + SizeOfBtreeSplit +
+ sizeof(CommandId) + sizeof(RelFileNode);
+ for (cnt = 0; item < (char*)record + record->xl_len; )
+ {
+ BTItem btitem = (BTItem)
+ (tbuf + cnt * (MAXALIGN(sizeof(BTItemData))));
+ memcpy(btitem, item, sizeof(BTItemData));
+ itemsz = IndexTupleDSize(btitem->bti_itup) +
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
+ itemsz = MAXALIGN(itemsz);
+ item += itemsz;
+ cnt++;
+ }
+ /* index of the target header in tbuf: item count minus target offset */
+ cnt -= ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+ if (cnt < 0)
+ elog(STOP, "btree_split_undo: target item unfound in right sibling");
+
+ item = tbuf + cnt * (MAXALIGN(sizeof(BTItemData)));
+
+ _bt_del_item(reln, buffer, (BTItem)item, false, lsn, record);
+ }
+ }
+
+ /* Right (next) page */
+ /* NOTE(review): no special case is visible for a split with no next page -- confirm rightblk is always readable */
+ blkno = BlockIdGetBlockNumber(xlrec->rightblk);
+ buffer = XLogReadBuffer(false, reln, blkno);
+ if (!BufferIsValid(buffer))
+ elog(STOP, "btree_split_%s: lost next right page", op);
+
+ page = (Page) BufferGetPage(buffer);
+ if (PageIsNew((PageHeader) page))
+ elog(STOP, "btree_split_%s: uninitialized next right page", op);
+
+ if (redo)
+ {
+ if (XLByteLE(lsn, PageGetLSN(page)))
+ UnlockAndReleaseBuffer(buffer);
+ else
+ {
+ /* make the next page point back at the new right sibling */
+ pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ pageop->btpo_prev = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) :
+ ItemPointerGetBlockNumber(&(xlrec->target.tid));
+
+ PageSetLSN(page, lsn);
+ PageSetSUI(page, ThisStartUpID);
+ UnlockAndWriteBuffer(buffer);
+ }
+ }
+ else /* undo */
+ {
+ /* nothing is changed on the next page during undo; only the LSN is checked */
+ if (XLByteLT(PageGetLSN(page), lsn))
+ elog(STOP, "btree_split_undo: bad next right page LSN");
+
+ UnlockAndReleaseBuffer(buffer);
+ }
+
+}
+
+/*
+ * btree_xlog_newroot -- replay creation of a new btree root page.
+ *
+ * redo    true when replaying the log; undo of a new root is a no-op here.
+ * lsn     log position of the record.
+ * record  the XLOG_BTREE_NEWROOT record.  Its data starts with an
+ *         xl_btree_newroot (relation node + root block); any bytes after
+ *         SizeOfBtreeNewroot are the btitems of the new root page.
+ *
+ * Two pages are brought up to date:
+ *  - the root page is re-initialized and refilled when it is new or its
+ *    LSN is older than the record; a record with no items yields a root
+ *    that is also a leaf;
+ *  - the metapage is initialized if new, and when its LSN is older than
+ *    the record it is pointed at the new root and its level is bumped.
+ */
+static void
+btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
+{
+    xl_btree_newroot *xlrec;
+    Relation    reln;
+    Buffer      buffer;
+    Page        page;
+    BTPageOpaque pageop;
+    Buffer      metabuf;
+    Page        metapg;
+
+    if (!redo)
+        return;
+
+    xlrec = (xl_btree_newroot *) XLogRecGetData(record);
+    reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
+    if (!RelationIsValid(reln))
+        return;
+    buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk)));
+    if (!BufferIsValid(buffer))
+        elog(STOP, "btree_newroot_redo: no root page");
+    metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE);
+    /* check the metapage buffer itself, not the root buffer again */
+    if (!BufferIsValid(metabuf))
+        elog(STOP, "btree_newroot_redo: no metapage");
+    page = (Page) BufferGetPage(buffer);
+
+    if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn))
+    {
+        _bt_pageinit(page, BufferGetPageSize(buffer));
+        pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+        pageop->btpo_flags |= BTP_ROOT;
+        pageop->btpo_prev = pageop->btpo_next = P_NONE;
+        pageop->btpo_parent = BTREE_METAPAGE;
+
+        if (record->xl_len == SizeOfBtreeNewroot)   /* no childs */
+            pageop->btpo_flags |= BTP_LEAF;
+        else
+        {
+            BTItemData  btdata;
+            Size        itemsz;
+            char       *item;
+
+            /* re-add every logged item, each one at the first offset */
+            for (item = (char*)xlrec + SizeOfBtreeNewroot;
+                 item < (char*)record + record->xl_len; )
+            {
+                memcpy(&btdata, item, sizeof(BTItemData));
+                itemsz = IndexTupleDSize(btdata.bti_itup) +
+                        (sizeof(BTItemData) - sizeof(IndexTupleData));
+                itemsz = MAXALIGN(itemsz);
+                if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber,
+                        LP_USED) == InvalidOffsetNumber)
+                    elog(STOP, "btree_newroot_redo: can't add item");
+                item += itemsz;
+            }
+        }
+
+        PageSetLSN(page, lsn);
+        PageSetSUI(page, ThisStartUpID);
+        UnlockAndWriteBuffer(buffer);
+    }
+    else
+        UnlockAndReleaseBuffer(buffer);
+
+    metapg = BufferGetPage(metabuf);
+    if (PageIsNew((PageHeader) metapg))
+    {
+        BTMetaPageData md;
+
+        /* brand-new metapage: lay down an empty-tree header first */
+        _bt_pageinit(metapg, BufferGetPageSize(metabuf));
+        md.btm_magic = BTREE_MAGIC;
+        md.btm_version = BTREE_VERSION;
+        md.btm_root = P_NONE;
+        md.btm_level = 0;
+        memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md));
+    }
+
+    if (XLByteLT(PageGetLSN(metapg), lsn))
+    {
+        BTMetaPageData *metad = BTPageGetMeta(metapg);
+
+        metad->btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk));
+        (metad->btm_level)++;
+        PageSetLSN(metapg, lsn);
+        PageSetSUI(metapg, ThisStartUpID);
+        UnlockAndWriteBuffer(metabuf);
+    }
+    else
+        UnlockAndReleaseBuffer(metabuf);
+
+    return;
+}
+
+/*
+ * _bt_del_item -- UNDO an insertion on a *leaf* page.
+ *
+ * Starting with the page held in "buffer", look for the tuple matching
+ * btitem, stepping to right siblings until it is found, the rightmost
+ * page is reached, or a page older than the record (by LSN) turns up.
+ *
+ * Once found, the tuple is removed when:
+ *  - it is already marked deleted (allowed only during recovery), or
+ *  - during recovery, XLogCheckHeapTuple reports a positive result for
+ *    the heap tuple, the record's xid and the logged command id, or
+ *  - during a normal rollback, the buffer is updatable.
+ * In a normal rollback with a non-updatable buffer the tuple is only
+ * marked LP_DELETE and the buffer is queued for cleanup.
+ *
+ * "insert" selects the record layout (plain insert vs. split) and the
+ * wording of error messages.
+ */
+static void
+_bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
+             XLogRecPtr lsn, XLogRecord *record)
+{
+    char           *logdata = (char*) XLogRecGetData(record);
+    Page            page = (Page) BufferGetPage(buffer);
+    const char     *opname = (insert) ? "insert" : "split";
+    BTPageOpaque    opaque;
+    OffsetNumber    offno;
+    ItemId          itemid;
+
+    /* search this page, then keep moving right while it makes sense */
+    offno = _bt_find_btitem(page, btitem);
+    while (offno == InvalidOffsetNumber)
+    {
+        BlockNumber nextblk;
+
+        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+        if (P_RIGHTMOST(opaque))
+            break;
+        nextblk = opaque->btpo_next;
+        UnlockAndReleaseBuffer(buffer);
+        buffer = XLogReadBuffer(false, reln, nextblk);
+        if (!BufferIsValid(buffer))
+            elog(STOP, "btree_%s_undo: lost right sibling", opname);
+        page = (Page) BufferGetPage(buffer);
+        if (PageIsNew((PageHeader) page))
+            elog(STOP, "btree_%s_undo: uninitialized right sibling", opname);
+        if (XLByteLT(PageGetLSN(page), lsn))
+            break;
+        offno = _bt_find_btitem(page, btitem);
+    }
+
+    /* tuple was not found anywhere */
+    if (offno == InvalidOffsetNumber)
+    {
+        if (!InRecovery)
+            elog(STOP, "btree_%s_undo: lost target tuple in rollback", opname);
+        UnlockAndReleaseBuffer(buffer);
+        return;
+    }
+
+    itemid = PageGetItemId(page, offno);
+    if (!ItemIdDeleted(itemid))
+    {
+        if (InRecovery)
+        {
+            /* consult the heap tuple before touching the index tuple */
+            Size        hdrsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit;
+            CommandId   cid;
+            RelFileNode hnode;
+            int         result;
+
+            memcpy(&cid, (char*)logdata + hdrsize, sizeof(CommandId));
+            memcpy(&hnode, (char*)logdata + hdrsize + sizeof(CommandId), sizeof(RelFileNode));
+            result = XLogCheckHeapTuple(hnode, &(btitem->bti_itup.t_tid),
+                        record->xl_xid, cid);
+            if (result <= 0)    /* no tuple or not owner */
+            {
+                UnlockAndReleaseBuffer(buffer);
+                return;
+            }
+        }
+        else if (! BufferIsUpdatable(buffer))
+        {
+            /* normal rollback, page busy: defer the physical removal */
+            itemid->lp_flags |= LP_DELETE;
+            MarkBufferForCleanup(buffer, IndexPageCleanup);
+            return;
+        }
+    }
+    else if (!InRecovery)
+    {
+        /* marked for deletion */
+        elog(STOP, "btree_%s_undo: deleted target tuple in rollback", opname);
+    }
+
+    PageIndexTupleDelete(page, offno);
+    if (InRecovery)
+    {
+        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+        opaque->btpo_flags |= BTP_REORDER;
+    }
+    UnlockAndWriteBuffer(buffer);
+
+    return;
+}
+
+/*
+ * _bt_add_item -- put a logged item back on a btree page during redo.
+ *
+ * page    target page.
+ * offno   offset the item had when it was logged.  If it lies beyond
+ *         the first free slot, the item is appended instead and the
+ *         page is flagged BTP_REORDER (with a NOTICE when the flag was
+ *         not already set).
+ * item    start of the item, size bytes long.
+ * hnode   heap relation node, handed to _bt_cleanup_page when a leaf
+ *         page has to be cleaned up to make room.
+ *
+ * Returns true when the item was added.  Returns false when the page is
+ * full and either it is not a leaf, the cleanup fails, or the item still
+ * does not fit after the cleanup.
+ */
+static bool
+_bt_add_item(Page page, OffsetNumber offno,
+             char* item, Size size, RelFileNode* hnode)
+{
+    BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+    if (offno > PageGetMaxOffsetNumber(page) + 1)
+    {
+        if ((opaque->btpo_flags & BTP_REORDER) == 0)
+        {
+            elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected");
+            opaque->btpo_flags |= BTP_REORDER;
+        }
+        offno = PageGetMaxOffsetNumber(page) + 1;
+    }
+
+    /* common case: the item fits straight away */
+    if (PageAddItem(page, (Item) item, size, offno,
+            LP_USED) != InvalidOffsetNumber)
+        return(true);
+
+    /* out of space: only leaf pages can be cleaned up for a retry */
+    if (! P_ISLEAF(opaque))
+        return(false);
+    if (! _bt_cleanup_page(page, hnode))
+        return(false);
+    if (PageAddItem(page, (Item) item, size, offno,
+            LP_USED) == InvalidOffsetNumber)
+        return(false);
+
+    return(true);
+}
+
+#endif
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 437b6637b24..4ca61e0c630 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: nbtree.h,v 1.43 2000/10/04 00:04:43 vadim Exp $
+ * $Id: nbtree.h,v 1.44 2000/10/13 02:03:02 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -42,11 +42,28 @@ typedef struct BTPageOpaqueData
#define BTP_FREE (1 << 2) /* not currently used... */
#define BTP_META (1 << 3) /* Set in the meta-page only */
+#ifdef XLOG
+#define BTP_REORDER (1 << 4) /* items must be re-ordered */
+#endif
} BTPageOpaqueData;
typedef BTPageOpaqueData *BTPageOpaque;
#define BTREE_METAPAGE 0 /* first page is meta */
+#define BTREE_MAGIC 0x053162
+
+#define BTREE_VERSION 1
+
+/*
+ * Contents of the btree metapage (block BTREE_METAPAGE).
+ */
+typedef struct BTMetaPageData
+{
+    uint32      btm_magic;      /* set to BTREE_MAGIC */
+    uint32      btm_version;    /* set to BTREE_VERSION */
+    BlockNumber btm_root;       /* block number of the current root page,
+                                 * P_NONE while there is none */
+    int32       btm_level;      /* starts at 0, incremented each time a
+                                 * new root is installed */
+} BTMetaPageData;
+
+/*
+ * Address of the BTMetaPageData stored on metapage "p".  The argument is
+ * parenthesized so that any pointer expression can be passed safely.
+ */
+#define BTPageGetMeta(p) \
+    ((BTMetaPageData *) &((PageHeader) (p))->pd_linp[0])
/*
* BTScanOpaqueData is used to remember which buffers we're currently
@@ -228,13 +245,13 @@ typedef struct xl_btree_delete
/*
* This is what we need to know about pure (without split) insert -
- * 14 + [4] + btitem with key data. Note that we need in CommandID
- * (4 bytes) only for leaf page insert.
+ * 14 + [4+8] + btitem with key data. Note that we need the CommandID
+ * and HeapNode (4 + 8 bytes) only for leaf page inserts.
*/
typedef struct xl_btree_insert
{
xl_btreetid target; /* inserted tuple id */
- /* [CommandID and ] BTITEM FOLLOWS AT END OF STRUCT */
+ /* [CommandID, HeapNode and ] BTITEM FOLLOWS AT END OF STRUCT */
} xl_btree_insert;
#define SizeOfBtreeInsert (offsetof(xl_btreetid, tid) + SizeOfIptrData)
@@ -242,8 +259,8 @@ typedef struct xl_btree_insert
/*
* This is what we need to know about insert with split -
- * 22 + [4] + [btitem] + right sibling btitems. Note that we need in
- * CommandID (4 bytes) only for leaf page insert.
+ * 22 + [4+8] + [btitem] + right sibling btitems. Note that we need the
+ * CommandID and HeapNode (4 + 8 bytes) only for leaf page inserts.
*/
typedef struct xl_btree_split
{
@@ -255,7 +272,7 @@ typedef struct xl_btree_split
* We log all btitems from the right sibling. If new btitem goes on
* the left sibling then we log it too and it will be the first
* BTItemData at the end of this struct, but after (for the leaf
- * pages) CommandId.
+ * pages) CommandId and HeapNode.
*/
} xl_btree_split;