diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtinsert.c')
-rw-r--r-- | src/backend/access/nbtree/nbtinsert.c | 2923 |
1 files changed, 1488 insertions, 1435 deletions
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index 4dfa6fd2558..4bafbc2ddbb 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -1,13 +1,13 @@ /*------------------------------------------------------------------------- * * btinsert.c-- - * Item insertion in Lehman and Yao btrees for Postgres. + * Item insertion in Lehman and Yao btrees for Postgres. * * Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.17 1997/08/20 14:53:15 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.18 1997/09/07 04:38:45 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -22,1386 +22,1437 @@ #include <fmgr.h> #ifndef HAVE_MEMMOVE -# include <regex/utils.h> +#include <regex/utils.h> #else -# include <string.h> +#include <string.h> #endif static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem); -static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright); +static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright); static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit); -static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); +static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem); -static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem); -static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem); -static bool _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey); +static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem); +static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem); +static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey); /* - * _bt_doinsert() -- Handle insertion of a single btitem in the tree. + * _bt_doinsert() -- Handle insertion of a single btitem in the tree. * - * This routine is called by the public interface routines, btbuild - * and btinsert. By here, btitem is filled in, and has a unique - * (xid, seqno) pair. + * This routine is called by the public interface routines, btbuild + * and btinsert. By here, btitem is filled in, and has a unique + * (xid, seqno) pair. */ InsertIndexResult _bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel) { - ScanKey itup_scankey; - IndexTuple itup; - BTStack stack; - Buffer buf; - BlockNumber blkno; - int natts = rel->rd_rel->relnatts; - InsertIndexResult res; - - itup = &(btitem->bti_itup); - - /* we need a scan key to do our search, so build one */ - itup_scankey = _bt_mkscankey(rel, itup); - - /* find the page containing this key */ - stack = _bt_search(rel, natts, itup_scankey, &buf); - - blkno = BufferGetBlockNumber(buf); - - /* trade in our read lock for a write lock */ - _bt_relbuf(rel, buf, BT_READ); - buf = _bt_getbuf(rel, blkno, BT_WRITE); - - /* - * If the page was split between the time that we surrendered our - * read lock and acquired our write lock, then this page may no - * longer be the right place for the key we want to insert. In this - * case, we need to move right in the tree. See Lehman and Yao for - * an excruciatingly precise description. - */ - - buf = _bt_moveright(rel, buf, natts, itup_scankey, BT_WRITE); - - /* if we're not allowing duplicates, make sure the key isn't */ - /* already in the node */ - if ( index_is_unique ) - { - OffsetNumber offset, maxoff; - Page page; + ScanKey itup_scankey; + IndexTuple itup; + BTStack stack; + Buffer buf; + BlockNumber blkno; + int natts = rel->rd_rel->relnatts; + InsertIndexResult res; - page = BufferGetPage(buf); - maxoff = PageGetMaxOffsetNumber (page); + itup = &(btitem->bti_itup); + + /* we need a scan key to do our search, so build one */ + itup_scankey = _bt_mkscankey(rel, itup); + + /* find the page containing this key */ + stack = _bt_search(rel, natts, itup_scankey, &buf); - offset = _bt_binsrch(rel, buf, natts, itup_scankey, BT_DESCENT); + blkno = BufferGetBlockNumber(buf); - /* make sure the offset we're given points to an actual */ - /* key on the page before trying to compare it */ - if ( !PageIsEmpty (page) && offset <= maxoff ) + /* trade in our read lock for a write lock */ + _bt_relbuf(rel, buf, BT_READ); + buf = _bt_getbuf(rel, blkno, BT_WRITE); + + /* + * If the page was split between the time that we surrendered our read + * lock and acquired our write lock, then this page may no longer be + * the right place for the key we want to insert. In this case, we + * need to move right in the tree. See Lehman and Yao for an + * excruciatingly precise description. + */ + + buf = _bt_moveright(rel, buf, natts, itup_scankey, BT_WRITE); + + /* if we're not allowing duplicates, make sure the key isn't */ + /* already in the node */ + if (index_is_unique) { - TupleDesc itupdesc; - BTItem btitem; - IndexTuple itup; - HeapTuple htup; - BTPageOpaque opaque; - Buffer nbuf; - BlockNumber blkno; - - itupdesc = RelationGetTupleDescriptor(rel); - nbuf = InvalidBuffer; - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - /* - * _bt_compare returns 0 for (1,NULL) and (1,NULL) - - * this's how we handling NULLs - and so we must not use - * _bt_compare in real comparison, but only for - * ordering/finding items on pages. - vadim 03/24/97 - - while ( !_bt_compare (rel, itupdesc, page, - natts, itup_scankey, offset) ) - */ - while ( _bt_isequal (itupdesc, page, offset, natts, itup_scankey) ) - { /* they're equal */ - btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offset)); - itup = &(btitem->bti_itup); - htup = heap_fetch (heapRel, SelfTimeQual, &(itup->t_tid), NULL); - if ( htup != (HeapTuple) NULL ) - { /* it is a duplicate */ - elog(WARN, "Cannot insert a duplicate key into a unique index."); - } - /* get next offnum */ - if ( offset < maxoff ) - { - offset = OffsetNumberNext(offset); - } - else - { /* move right ? */ - if ( P_RIGHTMOST (opaque) ) - break; - if ( !_bt_isequal (itupdesc, page, P_HIKEY, - natts, itup_scankey) ) - break; - /* - * min key of the right page is the same, - * ooh - so many dead duplicates... - */ - blkno = opaque->btpo_next; - if ( nbuf != InvalidBuffer ) - _bt_relbuf (rel, nbuf, BT_READ); - for (nbuf = InvalidBuffer; ; ) - { - nbuf = _bt_getbuf (rel, blkno, BT_READ); - page = BufferGetPage (nbuf); - maxoff = PageGetMaxOffsetNumber(page); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - offset = P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY; - if ( ! PageIsEmpty (page) && offset <= maxoff ) - { /* Found some key */ - break; - } - else - { /* Empty or "pseudo"-empty page - get next */ - blkno = opaque->btpo_next; - _bt_relbuf (rel, nbuf, BT_READ); - nbuf = InvalidBuffer; - if ( blkno == P_NONE ) - break; + OffsetNumber offset, + maxoff; + Page page; + + page = BufferGetPage(buf); + maxoff = PageGetMaxOffsetNumber(page); + + offset = _bt_binsrch(rel, buf, natts, itup_scankey, BT_DESCENT); + + /* make sure the offset we're given points to an actual */ + /* key on the page before trying to compare it */ + if (!PageIsEmpty(page) && offset <= maxoff) + { + TupleDesc itupdesc; + BTItem btitem; + IndexTuple itup; + HeapTuple htup; + BTPageOpaque opaque; + Buffer nbuf; + BlockNumber blkno; + + itupdesc = RelationGetTupleDescriptor(rel); + nbuf = InvalidBuffer; + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + + /* + * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's + * how we handling NULLs - and so we must not use _bt_compare + * in real comparison, but only for ordering/finding items on + * pages. - vadim 03/24/97 + * + * while ( !_bt_compare (rel, itupdesc, page, natts, + * itup_scankey, offset) ) + */ + while (_bt_isequal(itupdesc, page, offset, natts, itup_scankey)) + { /* they're equal */ + btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offset)); + itup = &(btitem->bti_itup); + htup = heap_fetch(heapRel, SelfTimeQual, &(itup->t_tid), NULL); + if (htup != (HeapTuple) NULL) + { /* it is a duplicate */ + elog(WARN, "Cannot insert a duplicate key into a unique index."); + } + /* get next offnum */ + if (offset < maxoff) + { + offset = OffsetNumberNext(offset); + } + else + { /* move right ? */ + if (P_RIGHTMOST(opaque)) + break; + if (!_bt_isequal(itupdesc, page, P_HIKEY, + natts, itup_scankey)) + break; + + /* + * min key of the right page is the same, ooh - so + * many dead duplicates... + */ + blkno = opaque->btpo_next; + if (nbuf != InvalidBuffer) + _bt_relbuf(rel, nbuf, BT_READ); + for (nbuf = InvalidBuffer;;) + { + nbuf = _bt_getbuf(rel, blkno, BT_READ); + page = BufferGetPage(nbuf); + maxoff = PageGetMaxOffsetNumber(page); + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + offset = P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY; + if (!PageIsEmpty(page) && offset <= maxoff) + { /* Found some key */ + break; + } + else + { /* Empty or "pseudo"-empty page - get next */ + blkno = opaque->btpo_next; + _bt_relbuf(rel, nbuf, BT_READ); + nbuf = InvalidBuffer; + if (blkno == P_NONE) + break; + } + } + if (nbuf == InvalidBuffer) + break; + } } - } - if ( nbuf == InvalidBuffer ) - break; - } - } - if ( nbuf != InvalidBuffer ) - _bt_relbuf(rel, nbuf, BT_READ); + if (nbuf != InvalidBuffer) + _bt_relbuf(rel, nbuf, BT_READ); + } } - } - - /* do the insertion */ - res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, - btitem, (BTItem) NULL); - - /* be tidy */ - _bt_freestack(stack); - _bt_freeskey(itup_scankey); - - return (res); + + /* do the insertion */ + res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, + btitem, (BTItem) NULL); + + /* be tidy */ + _bt_freestack(stack); + _bt_freeskey(itup_scankey); + + return (res); } /* - * _bt_insertonpg() -- Insert a tuple on a particular page in the index. + * _bt_insertonpg() -- Insert a tuple on a particular page in the index. * - * This recursive procedure does the following things: + * This recursive procedure does the following things: * - * + if necessary, splits the target page. - * + finds the right place to insert the tuple (taking into - * account any changes induced by a split). - * + inserts the tuple. - * + if the page was split, pops the parent stack, and finds the - * right place to insert the new child pointer (by walking - * right using information stored in the parent stack). - * + invoking itself with the appropriate tuple for the right - * child page on the parent. + * + if necessary, splits the target page. + * + finds the right place to insert the tuple (taking into + * account any changes induced by a split). + * + inserts the tuple. + * + if the page was split, pops the parent stack, and finds the + * right place to insert the new child pointer (by walking + * right using information stored in the parent stack). + * + invoking itself with the appropriate tuple for the right + * child page on the parent. * - * On entry, we must have the right buffer on which to do the - * insertion, and the buffer must be pinned and locked. On return, - * we will have dropped both the pin and the write lock on the buffer. + * On entry, we must have the right buffer on which to do the + * insertion, and the buffer must be pinned and locked. On return, + * we will have dropped both the pin and the write lock on the buffer. * - * The locking interactions in this code are critical. You should - * grok Lehman and Yao's paper before making any changes. In addition, - * you need to understand how we disambiguate duplicate keys in this - * implementation, in order to be able to find our location using - * L&Y "move right" operations. Since we may insert duplicate user - * keys, and since these dups may propogate up the tree, we use the - * 'afteritem' parameter to position ourselves correctly for the - * insertion on internal pages. + * The locking interactions in this code are critical. You should + * grok Lehman and Yao's paper before making any changes. In addition, + * you need to understand how we disambiguate duplicate keys in this + * implementation, in order to be able to find our location using + * L&Y "move right" operations. Since we may insert duplicate user + * keys, and since these dups may propogate up the tree, we use the + * 'afteritem' parameter to position ourselves correctly for the + * insertion on internal pages. */ -static InsertIndexResult +static InsertIndexResult _bt_insertonpg(Relation rel, - Buffer buf, - BTStack stack, - int keysz, - ScanKey scankey, - BTItem btitem, - BTItem afteritem) + Buffer buf, + BTStack stack, + int keysz, + ScanKey scankey, + BTItem btitem, + BTItem afteritem) { - InsertIndexResult res; - Page page; - BTPageOpaque lpageop; - BlockNumber itup_blkno; - OffsetNumber itup_off; - OffsetNumber firstright = InvalidOffsetNumber; - int itemsz; - bool do_split = false; - bool keys_equal = false; - - page = BufferGetPage(buf); - lpageop = (BTPageOpaque) PageGetSpecialPointer(page); - - itemsz = IndexTupleDSize(btitem->bti_itup) - + (sizeof(BTItemData) - sizeof(IndexTupleData)); - - itemsz = DOUBLEALIGN(itemsz); /* be safe, PageAddItem will do this - but we need to be consistent */ - /* - * If we have to insert item on the leftmost page which is the first - * page in the chain of duplicates then: - * 1. if scankey == hikey (i.e. - new duplicate item) then - * insert it here; - * 2. if scankey < hikey then: - * 2.a if there is duplicate key(s) here - we force splitting; - * 2.b else - we may "eat" this page from duplicates chain. - */ - if ( lpageop->btpo_flags & BTP_CHAIN ) - { - OffsetNumber maxoff = PageGetMaxOffsetNumber (page); - ItemId hitemid; - BTItem hitem; - - Assert ( !P_RIGHTMOST(lpageop) ); - hitemid = PageGetItemId(page, P_HIKEY); - hitem = (BTItem) PageGetItem(page, hitemid); - if ( maxoff > P_HIKEY && - !_bt_itemcmp (rel, keysz, hitem, - (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)), - BTEqualStrategyNumber) ) - elog (FATAL, "btree: bad key on the page in the chain of duplicates"); - - if ( !_bt_skeycmp (rel, keysz, scankey, page, hitemid, - BTEqualStrategyNumber) ) - { - if ( !P_LEFTMOST(lpageop) ) - elog (FATAL, "btree: attempt to insert bad key on the non-leftmost page in the chain of duplicates"); - if ( !_bt_skeycmp (rel, keysz, scankey, page, hitemid, - BTLessStrategyNumber) ) - elog (FATAL, "btree: attempt to insert higher key on the leftmost page in the chain of duplicates"); - if ( maxoff > P_HIKEY ) /* have duplicate(s) */ - { - firstright = P_FIRSTKEY; - do_split = true; - } - else /* "eat" page */ - { - Buffer pbuf; - Page ppage; - - itup_blkno = BufferGetBlockNumber(buf); - itup_off = PageAddItem(page, (Item) btitem, itemsz, - P_FIRSTKEY, LP_USED); - if ( itup_off == InvalidOffsetNumber ) - elog (FATAL, "btree: failed to add item"); - lpageop->btpo_flags &= ~BTP_CHAIN; - pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); - ppage = BufferGetPage(pbuf); - PageIndexTupleDelete(ppage, stack->bts_offset); - pfree(stack->bts_btitem); - stack->bts_btitem = _bt_formitem(&(btitem->bti_itup)); - ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), - itup_blkno, P_HIKEY); - _bt_wrtbuf(rel, buf); - res = _bt_insertonpg(rel, pbuf, stack->bts_parent, - keysz, scankey, stack->bts_btitem, - NULL); - ItemPointerSet(&(res->pointerData), itup_blkno, itup_off); - return (res); - } - } - else - { - keys_equal = true; - if ( PageGetFreeSpace(page) < itemsz ) - do_split = true; - } - } - else if ( PageGetFreeSpace(page) < itemsz ) - do_split = true; - else if ( PageGetFreeSpace(page) < 3*itemsz + 2*sizeof(ItemIdData) ) - { - OffsetNumber offnum = (P_RIGHTMOST(lpageop)) ? P_HIKEY : P_FIRSTKEY; - OffsetNumber maxoff = PageGetMaxOffsetNumber (page); - ItemId itid; - BTItem previtem, chkitem; - Size maxsize; - Size currsize; - - itid = PageGetItemId(page, offnum); - previtem = (BTItem) PageGetItem(page, itid); - maxsize = currsize = (ItemIdGetLength(itid) + sizeof(ItemIdData)); - for (offnum = OffsetNumberNext(offnum); - offnum <= maxoff; offnum = OffsetNumberNext(offnum) ) - { - itid = PageGetItemId(page, offnum); - chkitem = (BTItem) PageGetItem(page, itid); - if ( !_bt_itemcmp (rel, keysz, previtem, chkitem, - BTEqualStrategyNumber) ) - { - if ( currsize > maxsize ) - maxsize = currsize; - currsize = 0; - previtem = chkitem; - } - currsize += (ItemIdGetLength(itid) + sizeof(ItemIdData)); - } - if ( currsize > maxsize ) - maxsize = currsize; - maxsize += sizeof (PageHeaderData) + - DOUBLEALIGN (sizeof (BTPageOpaqueData)); - if ( maxsize >= PageGetPageSize (page) / 2 ) - do_split = true; - } - - if ( do_split ) - { - Buffer rbuf; - Page rpage; - BTItem ritem; - BlockNumber rbknum; - BTPageOpaque rpageop; - Buffer pbuf; - Page ppage; - BTPageOpaque ppageop; - BlockNumber bknum = BufferGetBlockNumber(buf); - BTItem lowLeftItem; - OffsetNumber maxoff; - bool shifted = false; - bool left_chained = ( lpageop->btpo_flags & BTP_CHAIN ) ? true : false; - - /* - * If we have to split leaf page in the chain of duplicates by - * new duplicate then we try to look at our right sibling first. - */ - if ( ( lpageop->btpo_flags & BTP_CHAIN ) && - ( lpageop->btpo_flags & BTP_LEAF ) && keys_equal ) - { - bool use_left = true; - - rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE); - rpage = BufferGetPage(rbuf); - rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); - if ( !P_RIGHTMOST (rpageop) ) /* non-rightmost page */ - { /* - * If we have the same hikey here then it's - * yet another page in chain. - */ - if ( _bt_skeycmp (rel, keysz, scankey, rpage, - PageGetItemId(rpage, P_HIKEY), - BTEqualStrategyNumber) ) - { - if ( !( rpageop->btpo_flags & BTP_CHAIN ) ) - elog (FATAL, "btree: lost page in the chain of duplicates"); - } - else if ( _bt_skeycmp (rel, keysz, scankey, rpage, - PageGetItemId(rpage, P_HIKEY), - BTGreaterStrategyNumber) ) - elog (FATAL, "btree: hikey is out of order"); - else if ( rpageop->btpo_flags & BTP_CHAIN ) - /* - * If hikey > scankey then it's last page in chain and - * BTP_CHAIN must be OFF - */ - elog (FATAL, "btree: lost last page in the chain of duplicates"); - - /* if there is room here then we use this page. */ - if ( PageGetFreeSpace (rpage) > itemsz ) - use_left = false; - } - else /* rightmost page */ - { - Assert ( !( rpageop->btpo_flags & BTP_CHAIN ) ); - /* if there is room here then we use this page. */ - if ( PageGetFreeSpace (rpage) > itemsz ) - use_left = false; - } - if ( !use_left ) /* insert on the right page */ - { - _bt_relbuf(rel, buf, BT_WRITE); - return ( _bt_insertonpg(rel, rbuf, stack, keysz, - scankey, btitem, afteritem) ); - } - _bt_relbuf(rel, rbuf, BT_WRITE); - } + InsertIndexResult res; + Page page; + BTPageOpaque lpageop; + BlockNumber itup_blkno; + OffsetNumber itup_off; + OffsetNumber firstright = InvalidOffsetNumber; + int itemsz; + bool do_split = false; + bool keys_equal = false; + + page = BufferGetPage(buf); + lpageop = (BTPageOpaque) PageGetSpecialPointer(page); + + itemsz = IndexTupleDSize(btitem->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + + itemsz = DOUBLEALIGN(itemsz); /* be safe, PageAddItem will do + * this but we need to be + * consistent */ + /* - * If after splitting un-chained page we'll got chain of pages - * with duplicates then we want to know - * 1. on which of two pages new btitem will go (current - * _bt_findsplitloc is quite bad); - * 2. what parent (if there's one) thinking about it - * (remember about deletions) + * If we have to insert item on the leftmost page which is the first + * page in the chain of duplicates then: 1. if scankey == hikey (i.e. + * - new duplicate item) then insert it here; 2. if scankey < hikey + * then: 2.a if there is duplicate key(s) here - we force splitting; + * 2.b else - we may "eat" this page from duplicates chain. */ - else if ( !( lpageop->btpo_flags & BTP_CHAIN ) ) + if (lpageop->btpo_flags & BTP_CHAIN) { - OffsetNumber start = ( P_RIGHTMOST(lpageop) ) ? P_HIKEY : P_FIRSTKEY; - Size llimit; - - maxoff = PageGetMaxOffsetNumber (page); - llimit = PageGetPageSize(page) - sizeof (PageHeaderData) - - DOUBLEALIGN (sizeof (BTPageOpaqueData)) - + sizeof(ItemIdData); - llimit /= 2; - firstright = _bt_findsplitloc(rel, page, start, maxoff, llimit); - - if ( _bt_itemcmp (rel, keysz, - (BTItem) PageGetItem(page, PageGetItemId(page, start)), - (BTItem) PageGetItem(page, PageGetItemId(page, firstright)), - BTEqualStrategyNumber) ) - { - if ( _bt_skeycmp (rel, keysz, scankey, page, - PageGetItemId(page, firstright), - BTLessStrategyNumber) ) - /* - * force moving current items to the new page: - * new item will go on the current page. - */ - firstright = start; - else - /* - * new btitem >= firstright, start item == firstright - - * new chain of duplicates: if this non-leftmost leaf - * page and parent item < start item then force moving - * all items to the new page - current page will be - * "empty" after it. - */ - { - if ( !P_LEFTMOST (lpageop) && - ( lpageop->btpo_flags & BTP_LEAF ) ) - { - ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), - bknum, P_HIKEY); - pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); - if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, - (BTItem) PageGetItem(page, - PageGetItemId(page, start)), - BTLessStrategyNumber) ) - { - firstright = start; - shifted = true; - } - _bt_relbuf(rel, pbuf, BT_WRITE); - } - } - } /* else - no new chain if start item < firstright one */ - } - - /* split the buffer into left and right halves */ - rbuf = _bt_split(rel, buf, firstright); - - /* which new page (left half or right half) gets the tuple? */ - if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem)) { - /* left page */ - itup_off = _bt_pgaddtup(rel, buf, keysz, scankey, - itemsz, btitem, afteritem); - itup_blkno = BufferGetBlockNumber(buf); - } else { - /* right page */ - itup_off = _bt_pgaddtup(rel, rbuf, keysz, scankey, - itemsz, btitem, afteritem); - itup_blkno = BufferGetBlockNumber(rbuf); + OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + ItemId hitemid; + BTItem hitem; + + Assert(!P_RIGHTMOST(lpageop)); + hitemid = PageGetItemId(page, P_HIKEY); + hitem = (BTItem) PageGetItem(page, hitemid); + if (maxoff > P_HIKEY && + !_bt_itemcmp(rel, keysz, hitem, + (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)), + BTEqualStrategyNumber)) + elog(FATAL, "btree: bad key on the page in the chain of duplicates"); + + if (!_bt_skeycmp(rel, keysz, scankey, page, hitemid, + BTEqualStrategyNumber)) + { + if (!P_LEFTMOST(lpageop)) + elog(FATAL, "btree: attempt to insert bad key on the non-leftmost page in the chain of duplicates"); + if (!_bt_skeycmp(rel, keysz, scankey, page, hitemid, + BTLessStrategyNumber)) + elog(FATAL, "btree: attempt to insert higher key on the leftmost page in the chain of duplicates"); + if (maxoff > P_HIKEY) /* have duplicate(s) */ + { + firstright = P_FIRSTKEY; + do_split = true; + } + else +/* "eat" page */ + { + Buffer pbuf; + Page ppage; + + itup_blkno = BufferGetBlockNumber(buf); + itup_off = PageAddItem(page, (Item) btitem, itemsz, + P_FIRSTKEY, LP_USED); + if (itup_off == InvalidOffsetNumber) + elog(FATAL, "btree: failed to add item"); + lpageop->btpo_flags &= ~BTP_CHAIN; + pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); + ppage = BufferGetPage(pbuf); + PageIndexTupleDelete(ppage, stack->bts_offset); + pfree(stack->bts_btitem); + stack->bts_btitem = _bt_formitem(&(btitem->bti_itup)); + ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), + itup_blkno, P_HIKEY); + _bt_wrtbuf(rel, buf); + res = _bt_insertonpg(rel, pbuf, stack->bts_parent, + keysz, scankey, stack->bts_btitem, + NULL); + ItemPointerSet(&(res->pointerData), itup_blkno, itup_off); + return (res); + } + } + else + { + keys_equal = true; + if (PageGetFreeSpace(page) < itemsz) + do_split = true; + } } - - maxoff = PageGetMaxOffsetNumber (page); - if ( shifted ) - { - if ( maxoff > P_FIRSTKEY ) - elog (FATAL, "btree: shifted page is not empty"); - lowLeftItem = (BTItem) NULL; - } - else - { - if ( maxoff < P_FIRSTKEY ) - elog (FATAL, "btree: un-shifted page is empty"); - lowLeftItem = (BTItem) PageGetItem(page, - PageGetItemId(page, P_FIRSTKEY)); - if ( _bt_itemcmp (rel, keysz, lowLeftItem, - (BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)), - BTEqualStrategyNumber) ) - lpageop->btpo_flags |= BTP_CHAIN; + else if (PageGetFreeSpace(page) < itemsz) + do_split = true; + else if (PageGetFreeSpace(page) < 3 * itemsz + 2 * sizeof(ItemIdData)) + { + OffsetNumber offnum = (P_RIGHTMOST(lpageop)) ? P_HIKEY : P_FIRSTKEY; + OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + ItemId itid; + BTItem previtem, + chkitem; + Size maxsize; + Size currsize; + + itid = PageGetItemId(page, offnum); + previtem = (BTItem) PageGetItem(page, itid); + maxsize = currsize = (ItemIdGetLength(itid) + sizeof(ItemIdData)); + for (offnum = OffsetNumberNext(offnum); + offnum <= maxoff; offnum = OffsetNumberNext(offnum)) + { + itid = PageGetItemId(page, offnum); + chkitem = (BTItem) PageGetItem(page, itid); + if (!_bt_itemcmp(rel, keysz, previtem, chkitem, + BTEqualStrategyNumber)) + { + if (currsize > maxsize) + maxsize = currsize; + currsize = 0; + previtem = chkitem; + } + currsize += (ItemIdGetLength(itid) + sizeof(ItemIdData)); + } + if (currsize > maxsize) + maxsize = currsize; + maxsize += sizeof(PageHeaderData) + + DOUBLEALIGN(sizeof(BTPageOpaqueData)); + if (maxsize >= PageGetPageSize(page) / 2) + do_split = true; } - /* - * By here, - * - * + our target page has been split; - * + the original tuple has been inserted; - * + we have write locks on both the old (left half) and new - * (right half) buffers, after the split; and - * + we have the key we want to insert into the parent. - * - * Do the parent insertion. We need to hold onto the locks for - * the child pages until we locate the parent, but we can release - * them before doing the actual insertion (see Lehman and Yao for - * the reasoning). - */ - - if (stack == (BTStack) NULL) { - - /* create a new root node and release the split buffers */ - _bt_newroot(rel, buf, rbuf); - _bt_relbuf(rel, buf, BT_WRITE); - _bt_relbuf(rel, rbuf, BT_WRITE); - - } else { - ScanKey newskey; - InsertIndexResult newres; - BTItem new_item; - OffsetNumber upditem_offset = P_HIKEY; - bool do_update = false; - bool update_in_place = true; - bool parent_chained; - - /* form a index tuple that points at the new right page */ - rbknum = BufferGetBlockNumber(rbuf); - rpage = BufferGetPage(rbuf); - rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); - - /* - * By convention, the first entry (1) on every - * non-rightmost page is the high key for that page. In - * order to get the lowest key on the new right page, we - * actually look at its second (2) entry. - */ - - if (! P_RIGHTMOST(rpageop)) - { - ritem = (BTItem) PageGetItem(rpage, - PageGetItemId(rpage, P_FIRSTKEY)); - if ( _bt_itemcmp (rel, keysz, ritem, - (BTItem) PageGetItem(rpage, - PageGetItemId(rpage, P_HIKEY)), - BTEqualStrategyNumber) ) - rpageop->btpo_flags |= BTP_CHAIN; - } - else - ritem = (BTItem) PageGetItem(rpage, - PageGetItemId(rpage, P_HIKEY)); - - /* get a unique btitem for this key */ - new_item = _bt_formitem(&(ritem->bti_itup)); - - ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY); - - /* - * Find the parent buffer and get the parent page. - * - * Oops - if we were moved right then we need to - * change stack item! We want to find parent pointing to - * where we are, right ? - vadim 05/27/97 - */ - ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), - bknum, P_HIKEY); - pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); - ppage = BufferGetPage(pbuf); - ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); - parent_chained = (( ppageop->btpo_flags & BTP_CHAIN )) ? true : false; - - if ( parent_chained && !left_chained ) - elog (FATAL, "nbtree: unexpected chained parent of unchained page"); - - /* - * If the key of new_item is < than the key of the item - * in the parent page pointing to the left page - * (stack->bts_btitem), we have to update the latter key; - * otherwise the keys on the parent page wouldn't be - * monotonically increasing after we inserted the new - * pointer to the right page (new_item). This only - * happens if our left page is the leftmost page and a - * new minimum key had been inserted before, which is not - * reflected in the parent page but didn't matter so - * far. If there are duplicate keys and this new minimum - * key spills over to our new right page, we get an - * inconsistency if we don't update the left key in the - * parent page. - * - * Also, new duplicates handling code require us to update - * parent item if some smaller items left on the left page - * (which is possible in splitting leftmost page) and - * current parent item == new_item. - vadim 05/27/97 - */ - if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, new_item, - BTGreaterStrategyNumber) || - ( !shifted && - _bt_itemcmp(rel, keysz, stack->bts_btitem, - new_item, BTEqualStrategyNumber) && - _bt_itemcmp(rel, keysz, lowLeftItem, - new_item, BTLessStrategyNumber) ) ) - { - do_update = true; - /* - * figure out which key is leftmost (if the parent page - * is rightmost, too, it must be the root) + if (do_split) + { + Buffer rbuf; + Page rpage; + BTItem ritem; + BlockNumber rbknum; + BTPageOpaque rpageop; + Buffer pbuf; + Page ppage; + BTPageOpaque ppageop; + BlockNumber bknum = BufferGetBlockNumber(buf); + BTItem lowLeftItem; + OffsetNumber maxoff; + bool shifted = false; + bool left_chained = (lpageop->btpo_flags & BTP_CHAIN) ? true : false; + + /* + * If we have to split leaf page in the chain of duplicates by new + * duplicate then we try to look at our right sibling first. */ - if(P_RIGHTMOST(ppageop)) - upditem_offset = P_HIKEY; + if ((lpageop->btpo_flags & BTP_CHAIN) && + (lpageop->btpo_flags & BTP_LEAF) && keys_equal) + { + bool use_left = true; + + rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE); + rpage = BufferGetPage(rbuf); + rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); + if (!P_RIGHTMOST(rpageop)) /* non-rightmost page */ + { /* If we have the same hikey here then + * it's yet another page in chain. */ + if (_bt_skeycmp(rel, keysz, scankey, rpage, + PageGetItemId(rpage, P_HIKEY), + BTEqualStrategyNumber)) + { + if (!(rpageop->btpo_flags & BTP_CHAIN)) + elog(FATAL, "btree: lost page in the chain of duplicates"); + } + else if (_bt_skeycmp(rel, keysz, scankey, rpage, + PageGetItemId(rpage, P_HIKEY), + BTGreaterStrategyNumber)) + elog(FATAL, "btree: hikey is out of order"); + else if (rpageop->btpo_flags & BTP_CHAIN) + + /* + * If hikey > scankey then it's last page in chain and + * BTP_CHAIN must be OFF + */ + elog(FATAL, "btree: lost last page in the chain of duplicates"); + + /* if there is room here then we use this page. */ + if (PageGetFreeSpace(rpage) > itemsz) + use_left = false; + } + else +/* rightmost page */ + { + Assert(!(rpageop->btpo_flags & BTP_CHAIN)); + /* if there is room here then we use this page. */ + if (PageGetFreeSpace(rpage) > itemsz) + use_left = false; + } + if (!use_left) /* insert on the right page */ + { + _bt_relbuf(rel, buf, BT_WRITE); + return (_bt_insertonpg(rel, rbuf, stack, keysz, + scankey, btitem, afteritem)); + } + _bt_relbuf(rel, rbuf, BT_WRITE); + } + + /* + * If after splitting un-chained page we'll got chain of pages + * with duplicates then we want to know 1. on which of two pages + * new btitem will go (current _bt_findsplitloc is quite bad); 2. + * what parent (if there's one) thinking about it (remember about + * deletions) + */ + else if (!(lpageop->btpo_flags & BTP_CHAIN)) + { + OffsetNumber start = (P_RIGHTMOST(lpageop)) ? P_HIKEY : P_FIRSTKEY; + Size llimit; + + maxoff = PageGetMaxOffsetNumber(page); + llimit = PageGetPageSize(page) - sizeof(PageHeaderData) - + DOUBLEALIGN(sizeof(BTPageOpaqueData)) + + sizeof(ItemIdData); + llimit /= 2; + firstright = _bt_findsplitloc(rel, page, start, maxoff, llimit); + + if (_bt_itemcmp(rel, keysz, + (BTItem) PageGetItem(page, PageGetItemId(page, start)), + (BTItem) PageGetItem(page, PageGetItemId(page, firstright)), + BTEqualStrategyNumber)) + { + if (_bt_skeycmp(rel, keysz, scankey, page, + PageGetItemId(page, firstright), + BTLessStrategyNumber)) + + /* + * force moving current items to the new page: new + * item will go on the current page. + */ + firstright = start; + else + + /* + * new btitem >= firstright, start item == firstright + * - new chain of duplicates: if this non-leftmost + * leaf page and parent item < start item then force + * moving all items to the new page - current page + * will be "empty" after it. + */ + { + if (!P_LEFTMOST(lpageop) && + (lpageop->btpo_flags & BTP_LEAF)) + { + ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), + bknum, P_HIKEY); + pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); + if (_bt_itemcmp(rel, keysz, stack->bts_btitem, + (BTItem) PageGetItem(page, + PageGetItemId(page, start)), + BTLessStrategyNumber)) + { + firstright = start; + shifted = true; + } + _bt_relbuf(rel, pbuf, BT_WRITE); + } + } + } /* else - no new chain if start item < + * firstright one */ + } + + /* split the buffer into left and right halves */ + rbuf = _bt_split(rel, buf, firstright); + + /* which new page (left half or right half) gets the tuple? */ + if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem)) + { + /* left page */ + itup_off = _bt_pgaddtup(rel, buf, keysz, scankey, + itemsz, btitem, afteritem); + itup_blkno = BufferGetBlockNumber(buf); + } else - upditem_offset = P_FIRSTKEY; - if ( !P_LEFTMOST(lpageop) || - stack->bts_offset != upditem_offset ) - elog (FATAL, "btree: items are out of order (leftmost %d, stack %u, update %u)", - P_LEFTMOST(lpageop), stack->bts_offset, upditem_offset); - } - - if ( do_update ) - { - if ( shifted ) - elog (FATAL, "btree: attempt to update parent for shifted page"); - /* - * Try to update in place. If out parent page is chained - * then we must forse insertion. + { + /* right page */ + itup_off = _bt_pgaddtup(rel, rbuf, keysz, scankey, + itemsz, btitem, afteritem); + itup_blkno = BufferGetBlockNumber(rbuf); + } + + maxoff = PageGetMaxOffsetNumber(page); + if (shifted) + { + if (maxoff > P_FIRSTKEY) + elog(FATAL, "btree: shifted page is not empty"); + lowLeftItem = (BTItem) NULL; + } + else + { + if (maxoff < P_FIRSTKEY) + elog(FATAL, "btree: un-shifted page is empty"); + lowLeftItem = (BTItem) PageGetItem(page, + PageGetItemId(page, P_FIRSTKEY)); + if (_bt_itemcmp(rel, keysz, lowLeftItem, + (BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)), + BTEqualStrategyNumber)) + lpageop->btpo_flags |= BTP_CHAIN; + } + + /* + * By here, + * + * + our target page has been split; + the original tuple has been + * inserted; + we have write locks on both the old (left half) + * and new (right half) buffers, after the split; and + we have + * the key we want to insert into the parent. + * + * Do the parent insertion. We need to hold onto the locks for the + * child pages until we locate the parent, but we can release them + * before doing the actual insertion (see Lehman and Yao for the + * reasoning). */ - if ( !parent_chained && - DOUBLEALIGN (IndexTupleDSize (lowLeftItem->bti_itup)) == - DOUBLEALIGN (IndexTupleDSize (stack->bts_btitem->bti_itup)) ) - { - _bt_updateitem(rel, keysz, pbuf, - stack->bts_btitem, lowLeftItem); - _bt_relbuf(rel, buf, BT_WRITE); - _bt_relbuf(rel, rbuf, BT_WRITE); + + if (stack == (BTStack) NULL) + { + + /* create a new root node and release the split buffers */ + _bt_newroot(rel, buf, rbuf); + _bt_relbuf(rel, buf, BT_WRITE); + _bt_relbuf(rel, rbuf, BT_WRITE); + } else { - update_in_place = false; - PageIndexTupleDelete(ppage, upditem_offset); - - /* - * don't write anything out yet--we still have the write - * lock, and now we call another _bt_insertonpg to - * insert the correct key. - * First, make a new item, using the tuple data from - * lowLeftItem. Point it to the left child. - * Update it on the stack at the same time. - */ - pfree(stack->bts_btitem); - stack->bts_btitem = _bt_formitem(&(lowLeftItem->bti_itup)); - ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), - bknum, P_HIKEY); - - /* - * Unlock the children before doing this - * - * Mmm ... I foresee problems here. - vadim 06/10/97 - */ - _bt_relbuf(rel, buf, BT_WRITE); - _bt_relbuf(rel, rbuf, BT_WRITE); - - /* - * A regular _bt_binsrch should find the right place to - * put the new entry, since it should be lower than any - * other key on the page. - * Therefore set afteritem to NULL. - */ - newskey = _bt_mkscankey(rel, &(stack->bts_btitem->bti_itup)); - newres = _bt_insertonpg(rel, pbuf, stack->bts_parent, - keysz, newskey, stack->bts_btitem, - NULL); - - pfree(newres); - pfree(newskey); - - /* - * we have now lost our lock on the parent buffer, and - * need to get it back. - */ - pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); + ScanKey newskey; + InsertIndexResult newres; + BTItem new_item; + OffsetNumber upditem_offset = P_HIKEY; + bool do_update = false; + bool update_in_place = true; + bool parent_chained; + + /* form a index tuple that points at the new right page */ + rbknum = BufferGetBlockNumber(rbuf); + rpage = BufferGetPage(rbuf); + rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); + + /* + * By convention, the first entry (1) on every non-rightmost + * page is the high key for that page. In order to get the + * lowest key on the new right page, we actually look at its + * second (2) entry. + */ + + if (!P_RIGHTMOST(rpageop)) + { + ritem = (BTItem) PageGetItem(rpage, + PageGetItemId(rpage, P_FIRSTKEY)); + if (_bt_itemcmp(rel, keysz, ritem, + (BTItem) PageGetItem(rpage, + PageGetItemId(rpage, P_HIKEY)), + BTEqualStrategyNumber)) + rpageop->btpo_flags |= BTP_CHAIN; + } + else + ritem = (BTItem) PageGetItem(rpage, + PageGetItemId(rpage, P_HIKEY)); + + /* get a unique btitem for this key */ + new_item = _bt_formitem(&(ritem->bti_itup)); + + ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY); + + /* + * Find the parent buffer and get the parent page. + * + * Oops - if we were moved right then we need to change stack + * item! We want to find parent pointing to where we are, + * right ? - vadim 05/27/97 + */ + ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), + bknum, P_HIKEY); + pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); + ppage = BufferGetPage(pbuf); + ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); + parent_chained = ((ppageop->btpo_flags & BTP_CHAIN)) ? true : false; + + if (parent_chained && !left_chained) + elog(FATAL, "nbtree: unexpected chained parent of unchained page"); + + /* + * If the key of new_item is < than the key of the item in the + * parent page pointing to the left page (stack->bts_btitem), + * we have to update the latter key; otherwise the keys on the + * parent page wouldn't be monotonically increasing after we + * inserted the new pointer to the right page (new_item). This + * only happens if our left page is the leftmost page and a + * new minimum key had been inserted before, which is not + * reflected in the parent page but didn't matter so far. If + * there are duplicate keys and this new minimum key spills + * over to our new right page, we get an inconsistency if we + * don't update the left key in the parent page. + * + * Also, new duplicates handling code require us to update parent + * item if some smaller items left on the left page (which is + * possible in splitting leftmost page) and current parent + * item == new_item. - vadim 05/27/97 + */ + if (_bt_itemcmp(rel, keysz, stack->bts_btitem, new_item, + BTGreaterStrategyNumber) || + (!shifted && + _bt_itemcmp(rel, keysz, stack->bts_btitem, + new_item, BTEqualStrategyNumber) && + _bt_itemcmp(rel, keysz, lowLeftItem, + new_item, BTLessStrategyNumber))) + { + do_update = true; + + /* + * figure out which key is leftmost (if the parent page is + * rightmost, too, it must be the root) + */ + if (P_RIGHTMOST(ppageop)) + upditem_offset = P_HIKEY; + else + upditem_offset = P_FIRSTKEY; + if (!P_LEFTMOST(lpageop) || + stack->bts_offset != upditem_offset) + elog(FATAL, "btree: items are out of order (leftmost %d, stack %u, update %u)", + P_LEFTMOST(lpageop), stack->bts_offset, upditem_offset); + } + + if (do_update) + { + if (shifted) + elog(FATAL, "btree: attempt to update parent for shifted page"); + + /* + * Try to update in place. If out parent page is chained + * then we must forse insertion. + */ + if (!parent_chained && + DOUBLEALIGN(IndexTupleDSize(lowLeftItem->bti_itup)) == + DOUBLEALIGN(IndexTupleDSize(stack->bts_btitem->bti_itup))) + { + _bt_updateitem(rel, keysz, pbuf, + stack->bts_btitem, lowLeftItem); + _bt_relbuf(rel, buf, BT_WRITE); + _bt_relbuf(rel, rbuf, BT_WRITE); + } + else + { + update_in_place = false; + PageIndexTupleDelete(ppage, upditem_offset); + + /* + * don't write anything out yet--we still have the + * write lock, and now we call another _bt_insertonpg + * to insert the correct key. First, make a new item, + * using the tuple data from lowLeftItem. Point it to + * the left child. Update it on the stack at the same + * time. + */ + pfree(stack->bts_btitem); + stack->bts_btitem = _bt_formitem(&(lowLeftItem->bti_itup)); + ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), + bknum, P_HIKEY); + + /* + * Unlock the children before doing this + * + * Mmm ... I foresee problems here. - vadim 06/10/97 + */ + _bt_relbuf(rel, buf, BT_WRITE); + _bt_relbuf(rel, rbuf, BT_WRITE); + + /* + * A regular _bt_binsrch should find the right place + * to put the new entry, since it should be lower than + * any other key on the page. Therefore set afteritem + * to NULL. + */ + newskey = _bt_mkscankey(rel, &(stack->bts_btitem->bti_itup)); + newres = _bt_insertonpg(rel, pbuf, stack->bts_parent, + keysz, newskey, stack->bts_btitem, + NULL); + + pfree(newres); + pfree(newskey); + + /* + * we have now lost our lock on the parent buffer, and + * need to get it back. + */ + pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); + } + } + else + { + _bt_relbuf(rel, buf, BT_WRITE); + _bt_relbuf(rel, rbuf, BT_WRITE); + } + + newskey = _bt_mkscankey(rel, &(new_item->bti_itup)); + + afteritem = stack->bts_btitem; + if (parent_chained && !update_in_place) + { + ppage = BufferGetPage(pbuf); + ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); + if (ppageop->btpo_flags & BTP_CHAIN) + elog(FATAL, "btree: unexpected BTP_CHAIN flag in parent after update"); + if (P_RIGHTMOST(ppageop)) + elog(FATAL, "btree: chained parent is RIGHTMOST after update"); + maxoff = PageGetMaxOffsetNumber(ppage); + if (maxoff != P_FIRSTKEY) + elog(FATAL, "btree: FIRSTKEY was unexpected in parent after update"); + if (_bt_skeycmp(rel, keysz, newskey, ppage, + PageGetItemId(ppage, P_FIRSTKEY), + BTLessEqualStrategyNumber)) + elog(FATAL, "btree: parent FIRSTKEY is >= duplicate key after update"); + if (!_bt_skeycmp(rel, keysz, newskey, ppage, + PageGetItemId(ppage, P_HIKEY), + BTEqualStrategyNumber)) + elog(FATAL, "btree: parent HIGHKEY is not equal duplicate key after update"); + afteritem = (BTItem) NULL; + } + else if (left_chained && !update_in_place) + { + ppage = BufferGetPage(pbuf); + ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); + if (!P_RIGHTMOST(ppageop) && + _bt_skeycmp(rel, keysz, newskey, ppage, + PageGetItemId(ppage, P_HIKEY), + BTGreaterStrategyNumber)) + afteritem = (BTItem) NULL; + } + if (afteritem == (BTItem) NULL) + { + rbuf = _bt_getbuf(rel, ppageop->btpo_next, BT_WRITE); + _bt_relbuf(rel, pbuf, BT_WRITE); + pbuf = rbuf; + } + + newres = _bt_insertonpg(rel, pbuf, stack->bts_parent, + keysz, newskey, new_item, + afteritem); + + /* be tidy */ + pfree(newres); + pfree(newskey); + pfree(new_item); } - } - else - { - _bt_relbuf(rel, buf, BT_WRITE); - _bt_relbuf(rel, rbuf, BT_WRITE); - } - - newskey = _bt_mkscankey(rel, &(new_item->bti_itup)); - - afteritem = stack->bts_btitem; - if ( parent_chained && !update_in_place ) - { - ppage = BufferGetPage(pbuf); - ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); - if ( ppageop->btpo_flags & BTP_CHAIN ) - elog (FATAL, "btree: unexpected BTP_CHAIN flag in parent after update"); - if ( P_RIGHTMOST (ppageop) ) - elog (FATAL, "btree: chained parent is RIGHTMOST after update"); - maxoff = PageGetMaxOffsetNumber (ppage); - if ( maxoff != P_FIRSTKEY ) - elog (FATAL, "btree: FIRSTKEY was unexpected in parent after update"); - if ( _bt_skeycmp (rel, keysz, newskey, ppage, - PageGetItemId(ppage, P_FIRSTKEY), - BTLessEqualStrategyNumber) ) - elog (FATAL, "btree: parent FIRSTKEY is >= duplicate key after update"); - if ( !_bt_skeycmp (rel, keysz, newskey, ppage, - PageGetItemId(ppage, P_HIKEY), - BTEqualStrategyNumber) ) - elog (FATAL, "btree: parent HIGHKEY is not equal duplicate key after update"); - afteritem = (BTItem) NULL; - } - else if ( left_chained && !update_in_place ) - { - ppage = BufferGetPage(pbuf); - ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); - if ( !P_RIGHTMOST (ppageop) && - _bt_skeycmp (rel, keysz, newskey, ppage, - PageGetItemId(ppage, P_HIKEY), - BTGreaterStrategyNumber) ) - afteritem = (BTItem) NULL; - } - if ( afteritem == (BTItem) NULL) - { - rbuf = _bt_getbuf(rel, ppageop->btpo_next, BT_WRITE); - _bt_relbuf(rel, pbuf, BT_WRITE); - pbuf = rbuf; - } - - newres = _bt_insertonpg(rel, pbuf, stack->bts_parent, - keysz, newskey, new_item, - afteritem); - - /* be tidy */ - pfree(newres); - pfree(newskey); - pfree(new_item); } - } else { - itup_off = _bt_pgaddtup(rel, buf, keysz, scankey, - itemsz, btitem, afteritem); - itup_blkno = BufferGetBlockNumber(buf); - - _bt_relbuf(rel, buf, BT_WRITE); - } - - /* by here, the new tuple is inserted */ - res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); - ItemPointerSet(&(res->pointerData), itup_blkno, itup_off); - - return (res); + else + { + itup_off = _bt_pgaddtup(rel, buf, keysz, scankey, + itemsz, btitem, afteritem); + itup_blkno = BufferGetBlockNumber(buf); + + _bt_relbuf(rel, buf, BT_WRITE); + } + + /* by here, the new tuple is inserted */ + res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); + ItemPointerSet(&(res->pointerData), itup_blkno, itup_off); + + return (res); } /* - * _bt_split() -- split a page in the btree. + * _bt_split() -- split a page in the btree. * - * On entry, buf is the page to split, and is write-locked and pinned. - * Returns the new right sibling of buf, pinned and write-locked. The - * pin and lock on buf are maintained. + * On entry, buf is the page to split, and is write-locked and pinned. + * Returns the new right sibling of buf, pinned and write-locked. The + * pin and lock on buf are maintained. */ -static Buffer +static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright) { - Buffer rbuf; - Page origpage; - Page leftpage, rightpage; - BTPageOpaque ropaque, lopaque, oopaque; - Buffer sbuf; - Page spage; - BTPageOpaque sopaque; - Size itemsz; - ItemId itemid; - BTItem item; - OffsetNumber leftoff, rightoff; - OffsetNumber start; - OffsetNumber maxoff; - OffsetNumber i; - - rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); - origpage = BufferGetPage(buf); - leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData)); - rightpage = BufferGetPage(rbuf); - - _bt_pageinit(rightpage, BufferGetPageSize(rbuf)); - _bt_pageinit(leftpage, BufferGetPageSize(buf)); - - /* init btree private data */ - oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage); - lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage); - ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage); - - /* if we're splitting this page, it won't be the root when we're done */ - oopaque->btpo_flags &= ~BTP_ROOT; - oopaque->btpo_flags &= ~BTP_CHAIN; - lopaque->btpo_flags = ropaque->btpo_flags = oopaque->btpo_flags; - lopaque->btpo_prev = oopaque->btpo_prev; - ropaque->btpo_prev = BufferGetBlockNumber(buf); - lopaque->btpo_next = BufferGetBlockNumber(rbuf); - ropaque->btpo_next = oopaque->btpo_next; - - /* - * If the page we're splitting is not the rightmost page at its - * level in the tree, then the first (0) entry on the page is the - * high key for the page. We need to copy that to the right - * half. Otherwise (meaning the rightmost page case), we should - * treat the line pointers beginning at zero as user data. - * - * We leave a blank space at the start of the line table for the - * left page. We'll come back later and fill it in with the high - * key item we get from the right key. - */ - - leftoff = P_FIRSTKEY; - ropaque->btpo_next = oopaque->btpo_next; - if (! P_RIGHTMOST(oopaque)) { - /* splitting a non-rightmost page, start at the first data item */ - start = P_FIRSTKEY; - - itemid = PageGetItemId(origpage, P_HIKEY); - itemsz = ItemIdGetLength(itemid); - item = (BTItem) PageGetItem(origpage, itemid); - if ( PageAddItem(rightpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber ) - elog (FATAL, "btree: failed to add hikey to the right sibling"); - rightoff = P_FIRSTKEY; - } else { - /* splitting a rightmost page, "high key" is the first data item */ - start = P_HIKEY; - - /* the new rightmost page will not have a high key */ - rightoff = P_HIKEY; - } - maxoff = PageGetMaxOffsetNumber(origpage); - if ( firstright == InvalidOffsetNumber ) - { - Size llimit = PageGetFreeSpace(leftpage) / 2; - firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit); - } - - for (i = start; i <= maxoff; i = OffsetNumberNext(i)) { - itemid = PageGetItemId(origpage, i); + Buffer rbuf; + Page origpage; + Page leftpage, + rightpage; + BTPageOpaque ropaque, + lopaque, + oopaque; + Buffer sbuf; + Page spage; + BTPageOpaque sopaque; + Size itemsz; + ItemId itemid; + BTItem item; + OffsetNumber leftoff, + rightoff; + OffsetNumber start; + OffsetNumber maxoff; + OffsetNumber i; + + rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); + origpage = BufferGetPage(buf); + leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData)); + rightpage = BufferGetPage(rbuf); + + _bt_pageinit(rightpage, BufferGetPageSize(rbuf)); + _bt_pageinit(leftpage, BufferGetPageSize(buf)); + + /* init btree private data */ + oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage); + lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage); + ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage); + + /* if we're splitting this page, it won't be the root when we're done */ + oopaque->btpo_flags &= ~BTP_ROOT; + oopaque->btpo_flags &= ~BTP_CHAIN; + lopaque->btpo_flags = ropaque->btpo_flags = oopaque->btpo_flags; + lopaque->btpo_prev = oopaque->btpo_prev; + ropaque->btpo_prev = BufferGetBlockNumber(buf); + lopaque->btpo_next = BufferGetBlockNumber(rbuf); + ropaque->btpo_next = oopaque->btpo_next; + + /* + * If the page we're splitting is not the rightmost page at its level + * in the tree, then the first (0) entry on the page is the high key + * for the page. We need to copy that to the right half. Otherwise + * (meaning the rightmost page case), we should treat the line + * pointers beginning at zero as user data. + * + * We leave a blank space at the start of the line table for the left + * page. We'll come back later and fill it in with the high key item + * we get from the right key. + */ + + leftoff = P_FIRSTKEY; + ropaque->btpo_next = oopaque->btpo_next; + if (!P_RIGHTMOST(oopaque)) + { + /* splitting a non-rightmost page, start at the first data item */ + start = P_FIRSTKEY; + + itemid = PageGetItemId(origpage, P_HIKEY); + itemsz = ItemIdGetLength(itemid); + item = (BTItem) PageGetItem(origpage, itemid); + if (PageAddItem(rightpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) + elog(FATAL, "btree: failed to add hikey to the right sibling"); + rightoff = P_FIRSTKEY; + } + else + { + /* splitting a rightmost page, "high key" is the first data item */ + start = P_HIKEY; + + /* the new rightmost page will not have a high key */ + rightoff = P_HIKEY; + } + maxoff = PageGetMaxOffsetNumber(origpage); + if (firstright == InvalidOffsetNumber) + { + Size llimit = PageGetFreeSpace(leftpage) / 2; + + firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit); + } + + for (i = start; i <= maxoff; i = OffsetNumberNext(i)) + { + itemid = PageGetItemId(origpage, i); + itemsz = ItemIdGetLength(itemid); + item = (BTItem) PageGetItem(origpage, itemid); + + /* decide which page to put it on */ + if (i < firstright) + { + if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, + LP_USED) == InvalidOffsetNumber) + elog(FATAL, "btree: failed to add item to the left sibling"); + leftoff = OffsetNumberNext(leftoff); + } + else + { + if (PageAddItem(rightpage, (Item) item, itemsz, rightoff, + LP_USED) == InvalidOffsetNumber) + elog(FATAL, "btree: failed to add item to the right sibling"); + rightoff = OffsetNumberNext(rightoff); + } + } + + /* + * Okay, page has been split, high key on right page is correct. Now + * set the high key on the left page to be the min key on the right + * page. + */ + + if (P_RIGHTMOST(ropaque)) + { + itemid = PageGetItemId(rightpage, P_HIKEY); + } + else + { + itemid = PageGetItemId(rightpage, P_FIRSTKEY); + } itemsz = ItemIdGetLength(itemid); - item = (BTItem) PageGetItem(origpage, itemid); - - /* decide which page to put it on */ - if (i < firstright) { - if ( PageAddItem(leftpage, (Item) item, itemsz, leftoff, - LP_USED) == InvalidOffsetNumber ) - elog (FATAL, "btree: failed to add item to the left sibling"); - leftoff = OffsetNumberNext(leftoff); - } else { - if ( PageAddItem(rightpage, (Item) item, itemsz, rightoff, - LP_USED) == InvalidOffsetNumber ) - elog (FATAL, "btree: failed to add item to the right sibling"); - rightoff = OffsetNumberNext(rightoff); + item = (BTItem) PageGetItem(rightpage, itemid); + + /* + * We left a hole for the high key on the left page; fill it. The + * modal crap is to tell the page manager to put the new item on the + * page and not screw around with anything else. Whoever designed + * this interface has presumably crawled back into the dung heap they + * came from. No one here will admit to it. + */ + + PageManagerModeSet(OverwritePageManagerMode); + if (PageAddItem(leftpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) + elog(FATAL, "btree: failed to add hikey to the left sibling"); + PageManagerModeSet(ShufflePageManagerMode); + + /* + * By here, the original data page has been split into two new halves, + * and these are correct. The algorithm requires that the left page + * never move during a split, so we copy the new left page back on top + * of the original. Note that this is not a waste of time, since we + * also require (in the page management code) that the center of a + * page always be clean, and the most efficient way to guarantee this + * is just to compact the data by reinserting it into a new left page. + */ + + PageRestoreTempPage(leftpage, origpage); + + /* write these guys out */ + _bt_wrtnorelbuf(rel, rbuf); + _bt_wrtnorelbuf(rel, buf); + + /* + * Finally, we need to grab the right sibling (if any) and fix the + * prev pointer there. We are guaranteed that this is deadlock-free + * since no other writer will be moving holding a lock on that page + * and trying to move left, and all readers release locks on a page + * before trying to fetch its neighbors. + */ + + if (!P_RIGHTMOST(ropaque)) + { + sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE); + spage = BufferGetPage(sbuf); + sopaque = (BTPageOpaque) PageGetSpecialPointer(spage); + sopaque->btpo_prev = BufferGetBlockNumber(rbuf); + + /* write and release the old right sibling */ + _bt_wrtbuf(rel, sbuf); } - } - - /* - * Okay, page has been split, high key on right page is correct. Now - * set the high key on the left page to be the min key on the right - * page. - */ - - if (P_RIGHTMOST(ropaque)) { - itemid = PageGetItemId(rightpage, P_HIKEY); - } else { - itemid = PageGetItemId(rightpage, P_FIRSTKEY); - } - itemsz = ItemIdGetLength(itemid); - item = (BTItem) PageGetItem(rightpage, itemid); - - /* - * We left a hole for the high key on the left page; fill it. The - * modal crap is to tell the page manager to put the new item on the - * page and not screw around with anything else. Whoever designed - * this interface has presumably crawled back into the dung heap they - * came from. No one here will admit to it. - */ - - PageManagerModeSet(OverwritePageManagerMode); - if ( PageAddItem(leftpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber ) - elog (FATAL, "btree: failed to add hikey to the left sibling"); - PageManagerModeSet(ShufflePageManagerMode); - - /* - * By here, the original data page has been split into two new halves, - * and these are correct. The algorithm requires that the left page - * never move during a split, so we copy the new left page back on top - * of the original. Note that this is not a waste of time, since we - * also require (in the page management code) that the center of a - * page always be clean, and the most efficient way to guarantee this - * is just to compact the data by reinserting it into a new left page. - */ - - PageRestoreTempPage(leftpage, origpage); - - /* write these guys out */ - _bt_wrtnorelbuf(rel, rbuf); - _bt_wrtnorelbuf(rel, buf); - - /* - * Finally, we need to grab the right sibling (if any) and fix the - * prev pointer there. We are guaranteed that this is deadlock-free - * since no other writer will be moving holding a lock on that page - * and trying to move left, and all readers release locks on a page - * before trying to fetch its neighbors. - */ - - if (! P_RIGHTMOST(ropaque)) { - sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE); - spage = BufferGetPage(sbuf); - sopaque = (BTPageOpaque) PageGetSpecialPointer(spage); - sopaque->btpo_prev = BufferGetBlockNumber(rbuf); - - /* write and release the old right sibling */ - _bt_wrtbuf(rel, sbuf); - } - - /* split's done */ - return (rbuf); + + /* split's done */ + return (rbuf); } /* - * _bt_findsplitloc() -- find a safe place to split a page. + * _bt_findsplitloc() -- find a safe place to split a page. * - * In order to guarantee the proper handling of searches for duplicate - * keys, the first duplicate in the chain must either be the first - * item on the page after the split, or the entire chain must be on - * one of the two pages. That is, - * [1 2 2 2 3 4 5] - * must become - * [1] [2 2 2 3 4 5] - * or - * [1 2 2 2] [3 4 5] - * but not - * [1 2 2] [2 3 4 5]. - * However, - * [2 2 2 2 2 3 4] - * may be split as - * [2 2 2 2] [2 3 4]. + * In order to guarantee the proper handling of searches for duplicate + * keys, the first duplicate in the chain must either be the first + * item on the page after the split, or the entire chain must be on + * one of the two pages. That is, + * [1 2 2 2 3 4 5] + * must become + * [1] [2 2 2 3 4 5] + * or + * [1 2 2 2] [3 4 5] + * but not + * [1 2 2] [2 3 4 5]. + * However, + * [2 2 2 2 2 3 4] + * may be split as + * [2 2 2 2] [2 3 4]. */ -static OffsetNumber +static OffsetNumber _bt_findsplitloc(Relation rel, - Page page, - OffsetNumber start, - OffsetNumber maxoff, - Size llimit) + Page page, + OffsetNumber start, + OffsetNumber maxoff, + Size llimit) { - OffsetNumber i; - OffsetNumber saferight; - ItemId nxtitemid, safeitemid; - BTItem safeitem, nxtitem; - Size nbytes; - int natts; - - if ( start >= maxoff ) - elog (FATAL, "btree: cannot split if start (%d) >= maxoff (%d)", - start, maxoff); - natts = rel->rd_rel->relnatts; - saferight = start; - safeitemid = PageGetItemId(page, saferight); - nbytes = ItemIdGetLength(safeitemid) + sizeof(ItemIdData); - safeitem = (BTItem) PageGetItem(page, safeitemid); - - i = OffsetNumberNext(start); - - while (nbytes < llimit) - { - /* check the next item on the page */ - nxtitemid = PageGetItemId(page, i); - nbytes += (ItemIdGetLength(nxtitemid) + sizeof(ItemIdData)); - nxtitem = (BTItem) PageGetItem(page, nxtitemid); - - /* - * Test against last known safe item: - * if the tuple we're looking at isn't equal to the last safe - * one we saw, then it's our new safe tuple. - */ - if ( !_bt_itemcmp (rel, natts, - safeitem, nxtitem, BTEqualStrategyNumber) ) + OffsetNumber i; + OffsetNumber saferight; + ItemId nxtitemid, + safeitemid; + BTItem safeitem, + nxtitem; + Size nbytes; + int natts; + + if (start >= maxoff) + elog(FATAL, "btree: cannot split if start (%d) >= maxoff (%d)", + start, maxoff); + natts = rel->rd_rel->relnatts; + saferight = start; + safeitemid = PageGetItemId(page, saferight); + nbytes = ItemIdGetLength(safeitemid) + sizeof(ItemIdData); + safeitem = (BTItem) PageGetItem(page, safeitemid); + + i = OffsetNumberNext(start); + + while (nbytes < llimit) { - safeitem = nxtitem; - saferight = i; + /* check the next item on the page */ + nxtitemid = PageGetItemId(page, i); + nbytes += (ItemIdGetLength(nxtitemid) + sizeof(ItemIdData)); + nxtitem = (BTItem) PageGetItem(page, nxtitemid); + + /* + * Test against last known safe item: if the tuple we're looking + * at isn't equal to the last safe one we saw, then it's our new + * safe tuple. + */ + if (!_bt_itemcmp(rel, natts, + safeitem, nxtitem, BTEqualStrategyNumber)) + { + safeitem = nxtitem; + saferight = i; + } + if (i < maxoff) + i = OffsetNumberNext(i); + else + break; } - if ( i < maxoff ) - i = OffsetNumberNext(i); - else - break; - } - - /* - * If the chain of dups starts at the beginning of the page and extends - * past the halfway mark, we can split it in the middle. - */ - - if (saferight == start) - saferight = i; - - if ( saferight == maxoff && ( maxoff - start ) > 1 ) - saferight = start + ( maxoff - start ) / 2; - - return (saferight); + + /* + * If the chain of dups starts at the beginning of the page and + * extends past the halfway mark, we can split it in the middle. + */ + + if (saferight == start) + saferight = i; + + if (saferight == maxoff && (maxoff - start) > 1) + saferight = start + (maxoff - start) / 2; + + return (saferight); } /* - * _bt_newroot() -- Create a new root page for the index. + * _bt_newroot() -- Create a new root page for the index. * - * We've just split the old root page and need to create a new one. - * In order to do this, we add a new root page to the file, then lock - * the metadata page and update it. This is guaranteed to be deadlock- - * free, because all readers release their locks on the metadata page - * before trying to lock the root, and all writers lock the root before - * trying to lock the metadata page. We have a write lock on the old - * root page, so we have not introduced any cycles into the waits-for - * graph. + * We've just split the old root page and need to create a new one. + * In order to do this, we add a new root page to the file, then lock + * the metadata page and update it. This is guaranteed to be deadlock- + * free, because all readers release their locks on the metadata page + * before trying to lock the root, and all writers lock the root before + * trying to lock the metadata page. We have a write lock on the old + * root page, so we have not introduced any cycles into the waits-for + * graph. * - * On entry, lbuf (the old root) and rbuf (its new peer) are write- - * locked. We don't drop the locks in this routine; that's done by - * the caller. On exit, a new root page exists with entries for the - * two new children. The new root page is neither pinned nor locked. + * On entry, lbuf (the old root) and rbuf (its new peer) are write- + * locked. We don't drop the locks in this routine; that's done by + * the caller. On exit, a new root page exists with entries for the + * two new children. The new root page is neither pinned nor locked. */ static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) { - Buffer rootbuf; - Page lpage, rpage, rootpage; - BlockNumber lbkno, rbkno; - BlockNumber rootbknum; - BTPageOpaque rootopaque; - ItemId itemid; - BTItem item; - Size itemsz; - BTItem new_item; - - /* get a new root page */ - rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); - rootpage = BufferGetPage(rootbuf); - _bt_pageinit(rootpage, BufferGetPageSize(rootbuf)); - - /* set btree special data */ - rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage); - rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE; - rootopaque->btpo_flags |= BTP_ROOT; - - /* - * Insert the internal tuple pointers. - */ - - lbkno = BufferGetBlockNumber(lbuf); - rbkno = BufferGetBlockNumber(rbuf); - lpage = BufferGetPage(lbuf); - rpage = BufferGetPage(rbuf); - - /* - * step over the high key on the left page while building the - * left page pointer. - */ - itemid = PageGetItemId(lpage, P_FIRSTKEY); - itemsz = ItemIdGetLength(itemid); - item = (BTItem) PageGetItem(lpage, itemid); - new_item = _bt_formitem(&(item->bti_itup)); - ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY); - - /* - * insert the left page pointer into the new root page. the root - * page is the rightmost page on its level so the "high key" item - * is the first data item. - */ - if ( PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber ) - elog (FATAL, "btree: failed to add leftkey to new root page"); - pfree(new_item); - - /* - * the right page is the rightmost page on the second level, so - * the "high key" item is the first data item on that page as well. - */ - itemid = PageGetItemId(rpage, P_HIKEY); - itemsz = ItemIdGetLength(itemid); - item = (BTItem) PageGetItem(rpage, itemid); - new_item = _bt_formitem(&(item->bti_itup)); - ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY); - - /* - * insert the right page pointer into the new root page. - */ - if ( PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber ) - elog (FATAL, "btree: failed to add rightkey to new root page"); - pfree(new_item); - - /* write and let go of the root buffer */ - rootbknum = BufferGetBlockNumber(rootbuf); - _bt_wrtbuf(rel, rootbuf); - - /* update metadata page with new root block number */ - _bt_metaproot(rel, rootbknum, 0); + Buffer rootbuf; + Page lpage, + rpage, + rootpage; + BlockNumber lbkno, + rbkno; + BlockNumber rootbknum; + BTPageOpaque rootopaque; + ItemId itemid; + BTItem item; + Size itemsz; + BTItem new_item; + + /* get a new root page */ + rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); + rootpage = BufferGetPage(rootbuf); + _bt_pageinit(rootpage, BufferGetPageSize(rootbuf)); + + /* set btree special data */ + rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage); + rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE; + rootopaque->btpo_flags |= BTP_ROOT; + + /* + * Insert the internal tuple pointers. + */ + + lbkno = BufferGetBlockNumber(lbuf); + rbkno = BufferGetBlockNumber(rbuf); + lpage = BufferGetPage(lbuf); + rpage = BufferGetPage(rbuf); + + /* + * step over the high key on the left page while building the left + * page pointer. + */ + itemid = PageGetItemId(lpage, P_FIRSTKEY); + itemsz = ItemIdGetLength(itemid); + item = (BTItem) PageGetItem(lpage, itemid); + new_item = _bt_formitem(&(item->bti_itup)); + ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY); + + /* + * insert the left page pointer into the new root page. the root page + * is the rightmost page on its level so the "high key" item is the + * first data item. + */ + if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) + elog(FATAL, "btree: failed to add leftkey to new root page"); + pfree(new_item); + + /* + * the right page is the rightmost page on the second level, so the + * "high key" item is the first data item on that page as well. + */ + itemid = PageGetItemId(rpage, P_HIKEY); + itemsz = ItemIdGetLength(itemid); + item = (BTItem) PageGetItem(rpage, itemid); + new_item = _bt_formitem(&(item->bti_itup)); + ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY); + + /* + * insert the right page pointer into the new root page. + */ + if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber) + elog(FATAL, "btree: failed to add rightkey to new root page"); + pfree(new_item); + + /* write and let go of the root buffer */ + rootbknum = BufferGetBlockNumber(rootbuf); + _bt_wrtbuf(rel, rootbuf); + + /* update metadata page with new root block number */ + _bt_metaproot(rel, rootbknum, 0); } /* - * _bt_pgaddtup() -- add a tuple to a particular page in the index. + * _bt_pgaddtup() -- add a tuple to a particular page in the index. * - * This routine adds the tuple to the page as requested, and keeps the - * write lock and reference associated with the page's buffer. It is - * an error to call pgaddtup() without a write lock and reference. If - * afteritem is non-null, it's the item that we expect our new item - * to follow. Otherwise, we do a binary search for the correct place - * and insert the new item there. + * This routine adds the tuple to the page as requested, and keeps the + * write lock and reference associated with the page's buffer. It is + * an error to call pgaddtup() without a write lock and reference. If + * afteritem is non-null, it's the item that we expect our new item + * to follow. Otherwise, we do a binary search for the correct place + * and insert the new item there. */ -static OffsetNumber +static OffsetNumber _bt_pgaddtup(Relation rel, - Buffer buf, - int keysz, - ScanKey itup_scankey, - Size itemsize, - BTItem btitem, - BTItem afteritem) + Buffer buf, + int keysz, + ScanKey itup_scankey, + Size itemsize, + BTItem btitem, + BTItem afteritem) { - OffsetNumber itup_off; - OffsetNumber first; - Page page; - BTPageOpaque opaque; - BTItem chkitem; - - page = BufferGetPage(buf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - first = P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY; - - if (afteritem == (BTItem) NULL) { - itup_off = _bt_binsrch(rel, buf, keysz, itup_scankey, BT_INSERTION); - } else { - itup_off = first; - - do { - chkitem = - (BTItem) PageGetItem(page, PageGetItemId(page, itup_off)); - itup_off = OffsetNumberNext(itup_off); - } while ( ! BTItemSame (chkitem, afteritem) ); - } - - if ( PageAddItem(page, (Item) btitem, itemsize, itup_off, LP_USED) == InvalidOffsetNumber ) - elog (FATAL, "btree: failed to add item to the page"); - - /* write the buffer, but hold our lock */ - _bt_wrtnorelbuf(rel, buf); - - return (itup_off); + OffsetNumber itup_off; + OffsetNumber first; + Page page; + BTPageOpaque opaque; + BTItem chkitem; + + page = BufferGetPage(buf); + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + first = P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY; + + if (afteritem == (BTItem) NULL) + { + itup_off = _bt_binsrch(rel, buf, keysz, itup_scankey, BT_INSERTION); + } + else + { + itup_off = first; + + do + { + chkitem = + (BTItem) PageGetItem(page, PageGetItemId(page, itup_off)); + itup_off = OffsetNumberNext(itup_off); + } while (!BTItemSame(chkitem, afteritem)); + } + + if (PageAddItem(page, (Item) btitem, itemsize, itup_off, LP_USED) == InvalidOffsetNumber) + elog(FATAL, "btree: failed to add item to the page"); + + /* write the buffer, but hold our lock */ + _bt_wrtnorelbuf(rel, buf); + + return (itup_off); } /* - * _bt_goesonpg() -- Does a new tuple belong on this page? + * _bt_goesonpg() -- Does a new tuple belong on this page? * - * This is part of the complexity introduced by allowing duplicate - * keys into the index. The tuple belongs on this page if: + * This is part of the complexity introduced by allowing duplicate + * keys into the index. The tuple belongs on this page if: * - * + there is no page to the right of this one; or - * + it is less than the high key on the page; or - * + the item it is to follow ("afteritem") appears on this - * page. + * + there is no page to the right of this one; or + * + it is less than the high key on the page; or + * + the item it is to follow ("afteritem") appears on this + * page. */ -static bool +static bool _bt_goesonpg(Relation rel, - Buffer buf, - Size keysz, - ScanKey scankey, - BTItem afteritem) + Buffer buf, + Size keysz, + ScanKey scankey, + BTItem afteritem) { - Page page; - ItemId hikey; - BTPageOpaque opaque; - BTItem chkitem; - OffsetNumber offnum, maxoff; - bool found; - - page = BufferGetPage(buf); - - /* no right neighbor? */ - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - if (P_RIGHTMOST(opaque)) - return (true); - - /* - * this is a non-rightmost page, so it must have a high key item. - * - * If the scan key is < the high key (the min key on the next page), - * then it for sure belongs here. - */ - hikey = PageGetItemId(page, P_HIKEY); - if (_bt_skeycmp(rel, keysz, scankey, page, hikey, BTLessStrategyNumber)) - return (true); - - /* - * If the scan key is > the high key, then it for sure doesn't belong - * here. - */ - - if (_bt_skeycmp(rel, keysz, scankey, page, hikey, BTGreaterStrategyNumber)) - return (false); - - /* - * If we have no adjacency information, and the item is equal to the - * high key on the page (by here it is), then the item does not belong - * on this page. - * - * Now it's not true in all cases. - vadim 06/10/97 - */ - - if (afteritem == (BTItem) NULL) - { - if ( opaque->btpo_flags & BTP_LEAF ) - return (false); - if ( opaque->btpo_flags & BTP_CHAIN ) - return (true); - if ( _bt_skeycmp (rel, keysz, scankey, page, - PageGetItemId(page, P_FIRSTKEY), - BTEqualStrategyNumber) ) - return (true); - return (false); - } - - /* damn, have to work for it. i hate that. */ - maxoff = PageGetMaxOffsetNumber(page); - - /* - * Search the entire page for the afteroid. We need to do this, rather - * than doing a binary search and starting from there, because if the - * key we're searching for is the leftmost key in the tree at this - * level, then a binary search will do the wrong thing. Splits are - * pretty infrequent, so the cost isn't as bad as it could be. - */ - - found = false; - for (offnum = P_FIRSTKEY; - offnum <= maxoff; - offnum = OffsetNumberNext(offnum)) { - chkitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum)); - - if ( BTItemSame (chkitem, afteritem) ) { - found = true; - break; + Page page; + ItemId hikey; + BTPageOpaque opaque; + BTItem chkitem; + OffsetNumber offnum, + maxoff; + bool found; + + page = BufferGetPage(buf); + + /* no right neighbor? */ + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + if (P_RIGHTMOST(opaque)) + return (true); + + /* + * this is a non-rightmost page, so it must have a high key item. + * + * If the scan key is < the high key (the min key on the next page), then + * it for sure belongs here. + */ + hikey = PageGetItemId(page, P_HIKEY); + if (_bt_skeycmp(rel, keysz, scankey, page, hikey, BTLessStrategyNumber)) + return (true); + + /* + * If the scan key is > the high key, then it for sure doesn't belong + * here. + */ + + if (_bt_skeycmp(rel, keysz, scankey, page, hikey, BTGreaterStrategyNumber)) + return (false); + + /* + * If we have no adjacency information, and the item is equal to the + * high key on the page (by here it is), then the item does not belong + * on this page. + * + * Now it's not true in all cases. - vadim 06/10/97 + */ + + if (afteritem == (BTItem) NULL) + { + if (opaque->btpo_flags & BTP_LEAF) + return (false); + if (opaque->btpo_flags & BTP_CHAIN) + return (true); + if (_bt_skeycmp(rel, keysz, scankey, page, + PageGetItemId(page, P_FIRSTKEY), + BTEqualStrategyNumber)) + return (true); + return (false); + } + + /* damn, have to work for it. i hate that. */ + maxoff = PageGetMaxOffsetNumber(page); + + /* + * Search the entire page for the afteroid. We need to do this, + * rather than doing a binary search and starting from there, because + * if the key we're searching for is the leftmost key in the tree at + * this level, then a binary search will do the wrong thing. Splits + * are pretty infrequent, so the cost isn't as bad as it could be. + */ + + found = false; + for (offnum = P_FIRSTKEY; + offnum <= maxoff; + offnum = OffsetNumberNext(offnum)) + { + chkitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum)); + + if (BTItemSame(chkitem, afteritem)) + { + found = true; + break; + } } - } - - return (found); + + return (found); } /* - * _bt_itemcmp() -- compare item1 to item2 using a requested - * strategy (<, <=, =, >=, >) + * _bt_itemcmp() -- compare item1 to item2 using a requested + * strategy (<, <=, =, >=, >) * */ bool _bt_itemcmp(Relation rel, - Size keysz, - BTItem item1, - BTItem item2, - StrategyNumber strat) + Size keysz, + BTItem item1, + BTItem item2, + StrategyNumber strat) { - TupleDesc tupDes; - IndexTuple indexTuple1, indexTuple2; - Datum attrDatum1, attrDatum2; - int i; - bool isFirstNull, isSecondNull; - bool compare; - bool useEqual = false; - - if ( strat == BTLessEqualStrategyNumber ) - { - useEqual = true; - strat = BTLessStrategyNumber; - } - else if ( strat == BTGreaterEqualStrategyNumber ) - { - useEqual = true; - strat = BTGreaterStrategyNumber; - } - - tupDes = RelationGetTupleDescriptor(rel); - indexTuple1 = &(item1->bti_itup); - indexTuple2 = &(item2->bti_itup); - - for (i = 1; i <= keysz; i++) { - attrDatum1 = index_getattr(indexTuple1, i, tupDes, &isFirstNull); - attrDatum2 = index_getattr(indexTuple2, i, tupDes, &isSecondNull); - - /* see comments about NULLs handling in btbuild */ - if ( isFirstNull ) /* attr in item1 is NULL */ + TupleDesc tupDes; + IndexTuple indexTuple1, + indexTuple2; + Datum attrDatum1, + attrDatum2; + int i; + bool isFirstNull, + isSecondNull; + bool compare; + bool useEqual = false; + + if (strat == BTLessEqualStrategyNumber) { - if ( isSecondNull ) /* attr in item2 is NULL too */ - compare = ( strat == BTEqualStrategyNumber ) ? true : false; - else - compare = ( strat == BTGreaterStrategyNumber ) ? true : false; - } - else if ( isSecondNull ) /* attr in item1 is NOT_NULL and */ - { /* and attr in item2 is NULL */ - compare = ( strat == BTLessStrategyNumber ) ? true : false; - } - else - { - compare = _bt_invokestrat(rel, i, strat, attrDatum1, attrDatum2); + useEqual = true; + strat = BTLessStrategyNumber; } - - if ( compare ) /* true for one of ">, <, =" */ + else if (strat == BTGreaterEqualStrategyNumber) { - if ( strat != BTEqualStrategyNumber ) - return (true); + useEqual = true; + strat = BTGreaterStrategyNumber; } - else /* false for one of ">, <, =" */ + + tupDes = RelationGetTupleDescriptor(rel); + indexTuple1 = &(item1->bti_itup); + indexTuple2 = &(item2->bti_itup); + + for (i = 1; i <= keysz; i++) { - if ( strat == BTEqualStrategyNumber ) - return (false); - /* - * if original strat was "<=, >=" OR - * "<, >" but some attribute(s) left - * - need to test for Equality - */ - if ( useEqual || i < keysz ) - { - if ( isFirstNull || isSecondNull ) - compare = ( isFirstNull && isSecondNull ) ? true : false; - else - compare = _bt_invokestrat(rel, i, BTEqualStrategyNumber, - attrDatum1, attrDatum2); - if ( compare ) /* item1' and item2' attributes are equal */ - continue; /* - try to compare next attributes */ - } - return (false); + attrDatum1 = index_getattr(indexTuple1, i, tupDes, &isFirstNull); + attrDatum2 = index_getattr(indexTuple2, i, tupDes, &isSecondNull); + + /* see comments about NULLs handling in btbuild */ + if (isFirstNull) /* attr in item1 is NULL */ + { + if (isSecondNull) /* attr in item2 is NULL too */ + compare = (strat == BTEqualStrategyNumber) ? true : false; + else + compare = (strat == BTGreaterStrategyNumber) ? true : false; + } + else if (isSecondNull) /* attr in item1 is NOT_NULL and */ + { /* and attr in item2 is NULL */ + compare = (strat == BTLessStrategyNumber) ? true : false; + } + else + { + compare = _bt_invokestrat(rel, i, strat, attrDatum1, attrDatum2); + } + + if (compare) /* true for one of ">, <, =" */ + { + if (strat != BTEqualStrategyNumber) + return (true); + } + else +/* false for one of ">, <, =" */ + { + if (strat == BTEqualStrategyNumber) + return (false); + + /* + * if original strat was "<=, >=" OR "<, >" but some + * attribute(s) left - need to test for Equality + */ + if (useEqual || i < keysz) + { + if (isFirstNull || isSecondNull) + compare = (isFirstNull && isSecondNull) ? true : false; + else + compare = _bt_invokestrat(rel, i, BTEqualStrategyNumber, + attrDatum1, attrDatum2); + if (compare) /* item1' and item2' attributes are equal */ + continue; /* - try to compare next attributes */ + } + return (false); + } } - } - return (true); + return (true); } /* - * _bt_updateitem() -- updates the key of the item identified by the - * oid with the key of newItem (done in place if - * possible) + * _bt_updateitem() -- updates the key of the item identified by the + * oid with the key of newItem (done in place if + * possible) * */ static void _bt_updateitem(Relation rel, - Size keysz, - Buffer buf, - BTItem oldItem, - BTItem newItem) + Size keysz, + Buffer buf, + BTItem oldItem, + BTItem newItem) { - Page page; - OffsetNumber maxoff; - OffsetNumber i; - ItemPointerData itemPtrData; - BTItem item; - IndexTuple oldIndexTuple, newIndexTuple; - int first; - - page = BufferGetPage(buf); - maxoff = PageGetMaxOffsetNumber(page); - - /* locate item on the page */ - first = P_RIGHTMOST((BTPageOpaque) PageGetSpecialPointer(page)) - ? P_HIKEY : P_FIRSTKEY; - i = first; - do { - item = (BTItem) PageGetItem(page, PageGetItemId(page, i)); - i = OffsetNumberNext(i); - } while (i <= maxoff && ! BTItemSame (item, oldItem)); - - /* this should never happen (in theory) */ - if ( ! BTItemSame (item, oldItem) ) { - elog(FATAL, "_bt_getstackbuf was lying!!"); - } - - /* - * It's defined by caller (_bt_insertonpg) - */ - /* - if(IndexTupleDSize(newItem->bti_itup) > - IndexTupleDSize(item->bti_itup)) { - elog(NOTICE, "trying to overwrite a smaller value with a bigger one in _bt_updateitem"); - elog(WARN, "this is not good."); - } - */ - - oldIndexTuple = &(item->bti_itup); - newIndexTuple = &(newItem->bti_itup); + Page page; + OffsetNumber maxoff; + OffsetNumber i; + ItemPointerData itemPtrData; + BTItem item; + IndexTuple oldIndexTuple, + newIndexTuple; + int first; + + page = BufferGetPage(buf); + maxoff = PageGetMaxOffsetNumber(page); + + /* locate item on the page */ + first = P_RIGHTMOST((BTPageOpaque) PageGetSpecialPointer(page)) + ? P_HIKEY : P_FIRSTKEY; + i = first; + do + { + item = (BTItem) PageGetItem(page, PageGetItemId(page, i)); + i = OffsetNumberNext(i); + } while (i <= maxoff && !BTItemSame(item, oldItem)); + + /* this should never happen (in theory) */ + if (!BTItemSame(item, oldItem)) + { + elog(FATAL, "_bt_getstackbuf was lying!!"); + } + + /* + * It's defined by caller (_bt_insertonpg) + */ + + /* + * if(IndexTupleDSize(newItem->bti_itup) > + * IndexTupleDSize(item->bti_itup)) { elog(NOTICE, "trying to + * overwrite a smaller value with a bigger one in _bt_updateitem"); + * elog(WARN, "this is not good."); } + */ + + oldIndexTuple = &(item->bti_itup); + newIndexTuple = &(newItem->bti_itup); /* keep the original item pointer */ - ItemPointerCopy(&(oldIndexTuple->t_tid), &itemPtrData); - CopyIndexTuple(newIndexTuple, &oldIndexTuple); - ItemPointerCopy(&itemPtrData, &(oldIndexTuple->t_tid)); - + ItemPointerCopy(&(oldIndexTuple->t_tid), &itemPtrData); + CopyIndexTuple(newIndexTuple, &oldIndexTuple); + ItemPointerCopy(&itemPtrData, &(oldIndexTuple->t_tid)); + } /* @@ -1409,177 +1460,179 @@ _bt_updateitem(Relation rel, * * Rule is simple: NOT_NULL not equal NULL, NULL not_equal NULL too. */ -static bool -_bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, - int keysz, ScanKey scankey) +static bool +_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, + int keysz, ScanKey scankey) { - Datum datum; - BTItem btitem; - IndexTuple itup; - ScanKey entry; - AttrNumber attno; - long result; - int i; - bool null; - - btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum)); - itup = &(btitem->bti_itup); - - for (i = 1; i <= keysz; i++) - { - entry = &scankey[i - 1]; - attno = entry->sk_attno; - Assert (attno == i); - datum = index_getattr(itup, attno, itupdesc, &null); - - /* NULLs are not equal */ - if ( entry->sk_flags & SK_ISNULL || null ) - return (false); - - result = (long) FMGR_PTR2(entry->sk_func, entry->sk_procedure, - entry->sk_argument, datum); - if (result != 0) - return (false); - } - - /* by here, the keys are equal */ - return (true); + Datum datum; + BTItem btitem; + IndexTuple itup; + ScanKey entry; + AttrNumber attno; + long result; + int i; + bool null; + + btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum)); + itup = &(btitem->bti_itup); + + for (i = 1; i <= keysz; i++) + { + entry = &scankey[i - 1]; + attno = entry->sk_attno; + Assert(attno == i); + datum = index_getattr(itup, attno, itupdesc, &null); + + /* NULLs are not equal */ + if (entry->sk_flags & SK_ISNULL || null) + return (false); + + result = (long) FMGR_PTR2(entry->sk_func, entry->sk_procedure, + entry->sk_argument, datum); + if (result != 0) + return (false); + } + + /* by here, the keys are equal */ + return (true); } #ifdef NOT_USED /* - * _bt_shift - insert btitem on the passed page after shifting page - * to the right in the tree. + * _bt_shift - insert btitem on the passed page after shifting page + * to the right in the tree. * * NOTE: tested for shifting leftmost page only, having btitem < hikey. */ -static InsertIndexResult -_bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz, - ScanKey scankey, BTItem btitem, BTItem hikey) +static InsertIndexResult +_bt_shift(Relation rel, Buffer buf, BTStack stack, int keysz, + ScanKey scankey, BTItem btitem, BTItem hikey) { - InsertIndexResult res; - int itemsz; - Page page; - BlockNumber bknum; - BTPageOpaque pageop; - Buffer rbuf; - Page rpage; - BTPageOpaque rpageop; - Buffer pbuf; - Page ppage; - BTPageOpaque ppageop; - Buffer nbuf; - Page npage; - BTPageOpaque npageop; - BlockNumber nbknum; - BTItem nitem; - OffsetNumber afteroff; - - btitem = _bt_formitem(&(btitem->bti_itup)); - hikey = _bt_formitem(&(hikey->bti_itup)); - - page = BufferGetPage(buf); - - /* grab new page */ - nbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); - nbknum = BufferGetBlockNumber(nbuf); - npage = BufferGetPage(nbuf); - _bt_pageinit(npage, BufferGetPageSize(nbuf)); - npageop = (BTPageOpaque) PageGetSpecialPointer(npage); - - /* copy content of the passed page */ - memmove ((char *) npage, (char *) page, BufferGetPageSize(buf)); - - /* re-init old (passed) page */ - _bt_pageinit(page, BufferGetPageSize(buf)); - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - - /* init old page opaque */ - pageop->btpo_flags = npageop->btpo_flags; /* restore flags */ - pageop->btpo_flags &= ~BTP_CHAIN; - if ( _bt_itemcmp (rel, keysz, hikey, btitem, BTEqualStrategyNumber) ) - pageop->btpo_flags |= BTP_CHAIN; - pageop->btpo_prev = npageop->btpo_prev; /* restore prev */ - pageop->btpo_next = nbknum; /* next points to the new page */ - - /* init shifted page opaque */ - npageop->btpo_prev = bknum = BufferGetBlockNumber(buf); - - /* shifted page is ok, populate old page */ - - /* add passed hikey */ - itemsz = IndexTupleDSize(hikey->bti_itup) - + (sizeof(BTItemData) - sizeof(IndexTupleData)); - itemsz = DOUBLEALIGN(itemsz); - if ( PageAddItem(page, (Item) hikey, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber ) - elog (FATAL, "btree: failed to add hikey in _bt_shift"); - pfree (hikey); - - /* add btitem */ - itemsz = IndexTupleDSize(btitem->bti_itup) - + (sizeof(BTItemData) - sizeof(IndexTupleData)); - itemsz = DOUBLEALIGN(itemsz); - if ( PageAddItem(page, (Item) btitem, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber ) - elog (FATAL, "btree: failed to add firstkey in _bt_shift"); - pfree (btitem); - nitem = (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)); - btitem = _bt_formitem(&(nitem->bti_itup)); - ItemPointerSet(&(btitem->bti_itup.t_tid), bknum, P_HIKEY); - - /* ok, write them out */ - _bt_wrtnorelbuf(rel, nbuf); - _bt_wrtnorelbuf(rel, buf); - - /* fix btpo_prev on right sibling of old page */ - if ( !P_RIGHTMOST (npageop) ) - { - rbuf = _bt_getbuf(rel, npageop->btpo_next, BT_WRITE); - rpage = BufferGetPage(rbuf); - rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); - rpageop->btpo_prev = nbknum; - _bt_wrtbuf(rel, rbuf); - } - - /* get parent pointing to the old page */ - ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), - bknum, P_HIKEY); - pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); - ppage = BufferGetPage(pbuf); - ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); - - _bt_relbuf(rel, nbuf, BT_WRITE); - _bt_relbuf(rel, buf, BT_WRITE); - - /* re-set parent' pointer - we shifted our page to the right ! */ - nitem = (BTItem) PageGetItem (ppage, - PageGetItemId (ppage, stack->bts_offset)); - ItemPointerSet(&(nitem->bti_itup.t_tid), nbknum, P_HIKEY); - ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), nbknum, P_HIKEY); - _bt_wrtnorelbuf(rel, pbuf); - - /* - * Now we want insert into the parent pointer to our old page. It has to - * be inserted before the pointer to new page. You may get problems here - * (in the _bt_goesonpg and/or _bt_pgaddtup), but may be not - I don't - * know. It works if old page is leftmost (nitem is NULL) and - * btitem < hikey and it's all what we need currently. - vadim 05/30/97 - */ - nitem = NULL; - afteroff = P_FIRSTKEY; - if ( !P_RIGHTMOST (ppageop) ) - afteroff = OffsetNumberNext (afteroff); - if ( stack->bts_offset >= afteroff ) - { - afteroff = OffsetNumberPrev (stack->bts_offset); - nitem = (BTItem) PageGetItem (ppage, PageGetItemId (ppage, afteroff)); - nitem = _bt_formitem(&(nitem->bti_itup)); - } - res = _bt_insertonpg(rel, pbuf, stack->bts_parent, - keysz, scankey, btitem, nitem); - pfree (btitem); - - ItemPointerSet(&(res->pointerData), nbknum, P_HIKEY); - - return (res); + InsertIndexResult res; + int itemsz; + Page page; + BlockNumber bknum; + BTPageOpaque pageop; + Buffer rbuf; + Page rpage; + BTPageOpaque rpageop; + Buffer pbuf; + Page ppage; + BTPageOpaque ppageop; + Buffer nbuf; + Page npage; + BTPageOpaque npageop; + BlockNumber nbknum; + BTItem nitem; + OffsetNumber afteroff; + + btitem = _bt_formitem(&(btitem->bti_itup)); + hikey = _bt_formitem(&(hikey->bti_itup)); + + page = BufferGetPage(buf); + + /* grab new page */ + nbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); + nbknum = BufferGetBlockNumber(nbuf); + npage = BufferGetPage(nbuf); + _bt_pageinit(npage, BufferGetPageSize(nbuf)); + npageop = (BTPageOpaque) PageGetSpecialPointer(npage); + + /* copy content of the passed page */ + memmove((char *) npage, (char *) page, BufferGetPageSize(buf)); + + /* re-init old (passed) page */ + _bt_pageinit(page, BufferGetPageSize(buf)); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + + /* init old page opaque */ + pageop->btpo_flags = npageop->btpo_flags; /* restore flags */ + pageop->btpo_flags &= ~BTP_CHAIN; + if (_bt_itemcmp(rel, keysz, hikey, btitem, BTEqualStrategyNumber)) + pageop->btpo_flags |= BTP_CHAIN; + pageop->btpo_prev = npageop->btpo_prev; /* restore prev */ + pageop->btpo_next = nbknum; /* next points to the new page */ + + /* init shifted page opaque */ + npageop->btpo_prev = bknum = BufferGetBlockNumber(buf); + + /* shifted page is ok, populate old page */ + + /* add passed hikey */ + itemsz = IndexTupleDSize(hikey->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = DOUBLEALIGN(itemsz); + if (PageAddItem(page, (Item) hikey, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber) + elog(FATAL, "btree: failed to add hikey in _bt_shift"); + pfree(hikey); + + /* add btitem */ + itemsz = IndexTupleDSize(btitem->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = DOUBLEALIGN(itemsz); + if (PageAddItem(page, (Item) btitem, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber) + elog(FATAL, "btree: failed to add firstkey in _bt_shift"); + pfree(btitem); + nitem = (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)); + btitem = _bt_formitem(&(nitem->bti_itup)); + ItemPointerSet(&(btitem->bti_itup.t_tid), bknum, P_HIKEY); + + /* ok, write them out */ + _bt_wrtnorelbuf(rel, nbuf); + _bt_wrtnorelbuf(rel, buf); + + /* fix btpo_prev on right sibling of old page */ + if (!P_RIGHTMOST(npageop)) + { + rbuf = _bt_getbuf(rel, npageop->btpo_next, BT_WRITE); + rpage = BufferGetPage(rbuf); + rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); + rpageop->btpo_prev = nbknum; + _bt_wrtbuf(rel, rbuf); + } + + /* get parent pointing to the old page */ + ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), + bknum, P_HIKEY); + pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); + ppage = BufferGetPage(pbuf); + ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); + + _bt_relbuf(rel, nbuf, BT_WRITE); + _bt_relbuf(rel, buf, BT_WRITE); + + /* re-set parent' pointer - we shifted our page to the right ! */ + nitem = (BTItem) PageGetItem(ppage, + PageGetItemId(ppage, stack->bts_offset)); + ItemPointerSet(&(nitem->bti_itup.t_tid), nbknum, P_HIKEY); + ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), nbknum, P_HIKEY); + _bt_wrtnorelbuf(rel, pbuf); + + /* + * Now we want insert into the parent pointer to our old page. It has + * to be inserted before the pointer to new page. You may get problems + * here (in the _bt_goesonpg and/or _bt_pgaddtup), but may be not - I + * don't know. It works if old page is leftmost (nitem is NULL) and + * btitem < hikey and it's all what we need currently. - vadim + * 05/30/97 + */ + nitem = NULL; + afteroff = P_FIRSTKEY; + if (!P_RIGHTMOST(ppageop)) + afteroff = OffsetNumberNext(afteroff); + if (stack->bts_offset >= afteroff) + { + afteroff = OffsetNumberPrev(stack->bts_offset); + nitem = (BTItem) PageGetItem(ppage, PageGetItemId(ppage, afteroff)); + nitem = _bt_formitem(&(nitem->bti_itup)); + } + res = _bt_insertonpg(rel, pbuf, stack->bts_parent, + keysz, scankey, btitem, nitem); + pfree(btitem); + + ItemPointerSet(&(res->pointerData), nbknum, P_HIKEY); + + return (res); } + #endif |