diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtinsert.c')
-rw-r--r-- | src/backend/access/nbtree/nbtinsert.c | 119 |
1 files changed, 84 insertions, 35 deletions
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index fd7360278db..9bfa0e9acec 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -82,7 +82,7 @@ static void _bt_checksplitloc(FindSplitData *state, int dataitemstoleft, Size firstoldonrightsz); static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup, OffsetNumber itup_off); -static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, +static bool _bt_isequal(Relation idxrel, Page page, OffsetNumber offnum, int keysz, ScanKey scankey); static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel); @@ -109,13 +109,16 @@ _bt_doinsert(Relation rel, IndexTuple itup, IndexUniqueCheck checkUnique, Relation heapRel) { bool is_unique = false; - int natts = rel->rd_rel->relnatts; + int indnkeyatts; ScanKey itup_scankey; BTStack stack = NULL; Buffer buf; OffsetNumber offset; bool fastpath; + indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel); + Assert(indnkeyatts != 0); + /* we need an insertion scan key to do our search, so build one */ itup_scankey = _bt_mkscankey(rel, itup); @@ -173,12 +176,12 @@ top: * page. */ if (P_ISLEAF(lpageop) && P_RIGHTMOST(lpageop) && - !P_INCOMPLETE_SPLIT(lpageop) && - !P_IGNORE(lpageop) && - (PageGetFreeSpace(page) > itemsz) && - PageGetMaxOffsetNumber(page) >= P_FIRSTDATAKEY(lpageop) && - _bt_compare(rel, natts, itup_scankey, page, - P_FIRSTDATAKEY(lpageop)) > 0) + !P_INCOMPLETE_SPLIT(lpageop) && + !P_IGNORE(lpageop) && + (PageGetFreeSpace(page) > itemsz) && + PageGetMaxOffsetNumber(page) >= P_FIRSTDATAKEY(lpageop) && + _bt_compare(rel, indnkeyatts, itup_scankey, page, + P_FIRSTDATAKEY(lpageop)) > 0) { fastpath = true; } @@ -209,7 +212,7 @@ top: if (!fastpath) { /* find the first page containing this key */ - stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, + stack = _bt_search(rel, indnkeyatts, itup_scankey, false, &buf, BT_WRITE, NULL); /* trade in our read lock for a write lock */ @@ -223,7 +226,7 @@ top: * need to move right in the tree. See Lehman and Yao for an * excruciatingly precise description. */ - buf = _bt_moveright(rel, buf, natts, itup_scankey, false, + buf = _bt_moveright(rel, buf, indnkeyatts, itup_scankey, false, true, stack, BT_WRITE, NULL); } @@ -253,7 +256,7 @@ top: TransactionId xwait; uint32 speculativeToken; - offset = _bt_binsrch(rel, buf, natts, itup_scankey, false); + offset = _bt_binsrch(rel, buf, indnkeyatts, itup_scankey, false); xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey, checkUnique, &is_unique, &speculativeToken); @@ -287,10 +290,12 @@ top: * actual location of the insert is hard to predict because of the * random search used to prevent O(N^2) performance when there are * many duplicate entries, we can just use the "first valid" page. + * This reasoning also applies to INCLUDE indexes, whose extra + * attributes are not considered part of the key space. */ CheckForSerializableConflictIn(rel, NULL, buf); /* do the insertion */ - _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, + _bt_findinsertloc(rel, &buf, &offset, indnkeyatts, itup_scankey, itup, stack, heapRel); _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false); } @@ -333,8 +338,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, IndexUniqueCheck checkUnique, bool *is_unique, uint32 *speculativeToken) { - TupleDesc itupdesc = RelationGetDescr(rel); - int natts = rel->rd_rel->relnatts; + int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel); SnapshotData SnapshotDirty; OffsetNumber maxoff; Page page; @@ -393,7 +397,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, * in real comparison, but only for ordering/finding items on * pages. - vadim 03/24/97 */ - if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey)) + if (!_bt_isequal(rel, page, offset, indnkeyatts, itup_scankey)) break; /* we're past all the equal tuples */ /* okay, we gotta fetch the heap tuple ... */ @@ -557,8 +561,8 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel, /* If scankey == hikey we gotta check the next page too */ if (P_RIGHTMOST(opaque)) break; - if (!_bt_isequal(itupdesc, page, P_HIKEY, - natts, itup_scankey)) + if (!_bt_isequal(rel, page, P_HIKEY, + indnkeyatts, itup_scankey)) break; /* Advance to next non-dead page --- there must be one */ for (;;) @@ -1087,6 +1091,9 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, OffsetNumber maxoff; OffsetNumber i; bool isleaf; + IndexTuple lefthikey; + int indnatts = IndexRelationGetNumberOfAttributes(rel); + int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel); /* Acquire a new page to split into */ rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); @@ -1186,7 +1193,23 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, itemsz = ItemIdGetLength(itemid); item = (IndexTuple) PageGetItem(origpage, itemid); } - if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, + + /* + * We must truncate included attributes of the "high key" item, before + * insert it onto the leaf page. It's the only point in insertion + * process, where we perform truncation. All other functions work with + * this high key and do not change it. + */ + if (indnatts != indnkeyatts && isleaf) + { + lefthikey = _bt_truncate_tuple(rel, item); + itemsz = IndexTupleSize(lefthikey); + itemsz = MAXALIGN(itemsz); + } + else + lefthikey = item; + + if (PageAddItem(leftpage, (Item) lefthikey, itemsz, leftoff, false, false) == InvalidOffsetNumber) { memset(rightpage, 0, BufferGetPageSize(rbuf)); @@ -1375,6 +1398,7 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, xl_btree_split xlrec; uint8 xlinfo; XLogRecPtr recptr; + bool loglhikey = false; xlrec.level = ropaque->btpo.level; xlrec.firstright = firstright; @@ -1404,18 +1428,20 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz)); /* Log left page */ - if (!isleaf) + if (!isleaf || indnatts != indnkeyatts) { /* - * We must also log the left page's high key, because the right - * page's leftmost key is suppressed on non-leaf levels. Show it - * as belonging to the left page buffer, so that it is not stored - * if XLogInsert decides it needs a full-page image of the left - * page. + * We must also log the left page's high key. There are two + * reasons for that: right page's leftmost key is suppressed on + * non-leaf levels and in covering indexes included columns are + * truncated from high keys. Show it as belonging to the left + * page buffer, so that it is not stored if XLogInsert decides it + * needs a full-page image of the left page. */ itemid = PageGetItemId(origpage, P_HIKEY); item = (IndexTuple) PageGetItem(origpage, itemid); XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item))); + loglhikey = true; } /* @@ -1434,7 +1460,9 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, (char *) rightpage + ((PageHeader) rightpage)->pd_upper, ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper); - xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R; + xlinfo = newitemonleft ? + (loglhikey ? XLOG_BTREE_SPLIT_L_HIGHKEY : XLOG_BTREE_SPLIT_L) : + (loglhikey ? XLOG_BTREE_SPLIT_R_HIGHKEY : XLOG_BTREE_SPLIT_R); recptr = XLogInsert(RM_BTREE_ID, xlinfo); PageSetLSN(origpage, recptr); @@ -1664,7 +1692,12 @@ _bt_checksplitloc(FindSplitData *state, /* * The first item on the right page becomes the high key of the left page; - * therefore it counts against left space as well as right space. + * therefore it counts against left space as well as right space. When + * index has included attribues, then those attributes of left page high + * key will be truncate leaving that page with slightly more free space. + * However, that shouldn't affect our ability to find valid split + * location, because anyway split location should exists even without high + * key truncation. */ leftfree -= firstrightitemsz; @@ -1787,18 +1820,18 @@ _bt_insert_parent(Relation rel, stack = &fakestack; stack->bts_blkno = BufferGetBlockNumber(pbuf); stack->bts_offset = InvalidOffsetNumber; - /* bts_btentry will be initialized below */ + stack->bts_btentry = InvalidBlockNumber; stack->bts_parent = NULL; _bt_relbuf(rel, pbuf); } - /* get high key from left page == lowest key on new right page */ + /* get high key from left page == lower bound for new right page */ ritem = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_HIKEY)); /* form an index tuple that points at the new right page */ new_item = CopyIndexTuple(ritem); - ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY); + BTreeInnerTupleSetDownLink(new_item, rbknum); /* * Find the parent buffer and get the parent page. @@ -1807,7 +1840,7 @@ _bt_insert_parent(Relation rel, * want to find parent pointing to where we are, right ? - vadim * 05/27/97 */ - ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY); + stack->bts_btentry = bknum; pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); /* @@ -1962,7 +1995,8 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access) { itemid = PageGetItemId(page, offnum); item = (IndexTuple) PageGetItem(page, itemid); - if (BTEntrySame(item, &stack->bts_btentry)) + + if (BTreeInnerTupleGetDownLink(item) == stack->bts_btentry) { /* Return accurate pointer to where link is now */ stack->bts_blkno = blkno; @@ -1977,7 +2011,8 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access) { itemid = PageGetItemId(page, offnum); item = (IndexTuple) PageGetItem(page, itemid); - if (BTEntrySame(item, &stack->bts_btentry)) + + if (BTreeInnerTupleGetDownLink(item) == stack->bts_btentry) { /* Return accurate pointer to where link is now */ stack->bts_blkno = blkno; @@ -2067,7 +2102,8 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) left_item_sz = sizeof(IndexTupleData); left_item = (IndexTuple) palloc(left_item_sz); left_item->t_info = left_item_sz; - ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY); + BTreeInnerTupleSetDownLink(left_item, lbkno); + BTreeTupSetNAtts(left_item, 0); /* * Create downlink item for right page. The key for it is obtained from @@ -2077,7 +2113,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) right_item_sz = ItemIdGetLength(itemid); item = (IndexTuple) PageGetItem(lpage, itemid); right_item = CopyIndexTuple(item); - ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY); + BTreeInnerTupleSetDownLink(right_item, rbkno); /* NO EREPORT(ERROR) from here till newroot op is logged */ START_CRIT_SECTION(); @@ -2208,6 +2244,7 @@ _bt_pgaddtup(Page page, { trunctuple = *itup; trunctuple.t_info = sizeof(IndexTupleData); + BTreeTupSetNAtts(&trunctuple, 0); itup = &trunctuple; itemsize = sizeof(IndexTupleData); } @@ -2226,9 +2263,10 @@ _bt_pgaddtup(Page page, * Rule is simple: NOT_NULL not equal NULL, NULL not equal NULL too. */ static bool -_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, +_bt_isequal(Relation idxrel, Page page, OffsetNumber offnum, int keysz, ScanKey scankey) { + TupleDesc itupdesc = RelationGetDescr(idxrel); IndexTuple itup; int i; @@ -2237,6 +2275,17 @@ _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); + /* + * Index tuple shouldn't be truncated. Despite we technically could + * compare truncated tuple as well, this function should be only called + * for regular non-truncated leaf tuples and P_HIKEY tuple on + * rightmost leaf page. + */ + Assert((P_RIGHTMOST((BTPageOpaque) PageGetSpecialPointer(page)) || + offnum != P_HIKEY) + ? BTreeTupGetNAtts(itup, idxrel) == itupdesc->natts + : true); + for (i = 1; i <= keysz; i++) { AttrNumber attno; |