aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/nbtree/nbtinsert.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/nbtree/nbtinsert.c')
-rw-r--r--src/backend/access/nbtree/nbtinsert.c119
1 files changed, 84 insertions, 35 deletions
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index fd7360278db..9bfa0e9acec 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -82,7 +82,7 @@ static void _bt_checksplitloc(FindSplitData *state,
int dataitemstoleft, Size firstoldonrightsz);
static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
OffsetNumber itup_off);
-static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
+static bool _bt_isequal(Relation idxrel, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
@@ -109,13 +109,16 @@ _bt_doinsert(Relation rel, IndexTuple itup,
IndexUniqueCheck checkUnique, Relation heapRel)
{
bool is_unique = false;
- int natts = rel->rd_rel->relnatts;
+ int indnkeyatts;
ScanKey itup_scankey;
BTStack stack = NULL;
Buffer buf;
OffsetNumber offset;
bool fastpath;
+ indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
+ Assert(indnkeyatts != 0);
+
/* we need an insertion scan key to do our search, so build one */
itup_scankey = _bt_mkscankey(rel, itup);
@@ -173,12 +176,12 @@ top:
* page.
*/
if (P_ISLEAF(lpageop) && P_RIGHTMOST(lpageop) &&
- !P_INCOMPLETE_SPLIT(lpageop) &&
- !P_IGNORE(lpageop) &&
- (PageGetFreeSpace(page) > itemsz) &&
- PageGetMaxOffsetNumber(page) >= P_FIRSTDATAKEY(lpageop) &&
- _bt_compare(rel, natts, itup_scankey, page,
- P_FIRSTDATAKEY(lpageop)) > 0)
+ !P_INCOMPLETE_SPLIT(lpageop) &&
+ !P_IGNORE(lpageop) &&
+ (PageGetFreeSpace(page) > itemsz) &&
+ PageGetMaxOffsetNumber(page) >= P_FIRSTDATAKEY(lpageop) &&
+ _bt_compare(rel, indnkeyatts, itup_scankey, page,
+ P_FIRSTDATAKEY(lpageop)) > 0)
{
fastpath = true;
}
@@ -209,7 +212,7 @@ top:
if (!fastpath)
{
/* find the first page containing this key */
- stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE,
+ stack = _bt_search(rel, indnkeyatts, itup_scankey, false, &buf, BT_WRITE,
NULL);
/* trade in our read lock for a write lock */
@@ -223,7 +226,7 @@ top:
* need to move right in the tree. See Lehman and Yao for an
* excruciatingly precise description.
*/
- buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
+ buf = _bt_moveright(rel, buf, indnkeyatts, itup_scankey, false,
true, stack, BT_WRITE, NULL);
}
@@ -253,7 +256,7 @@ top:
TransactionId xwait;
uint32 speculativeToken;
- offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+ offset = _bt_binsrch(rel, buf, indnkeyatts, itup_scankey, false);
xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
checkUnique, &is_unique, &speculativeToken);
@@ -287,10 +290,12 @@ top:
* actual location of the insert is hard to predict because of the
* random search used to prevent O(N^2) performance when there are
* many duplicate entries, we can just use the "first valid" page.
+ * This reasoning also applies to INCLUDE indexes, whose extra
+ * attributes are not considered part of the key space.
*/
CheckForSerializableConflictIn(rel, NULL, buf);
/* do the insertion */
- _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
+ _bt_findinsertloc(rel, &buf, &offset, indnkeyatts, itup_scankey, itup,
stack, heapRel);
_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
}
@@ -333,8 +338,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
IndexUniqueCheck checkUnique, bool *is_unique,
uint32 *speculativeToken)
{
- TupleDesc itupdesc = RelationGetDescr(rel);
- int natts = rel->rd_rel->relnatts;
+ int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
SnapshotData SnapshotDirty;
OffsetNumber maxoff;
Page page;
@@ -393,7 +397,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
* in real comparison, but only for ordering/finding items on
* pages. - vadim 03/24/97
*/
- if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
+ if (!_bt_isequal(rel, page, offset, indnkeyatts, itup_scankey))
break; /* we're past all the equal tuples */
/* okay, we gotta fetch the heap tuple ... */
@@ -557,8 +561,8 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
/* If scankey == hikey we gotta check the next page too */
if (P_RIGHTMOST(opaque))
break;
- if (!_bt_isequal(itupdesc, page, P_HIKEY,
- natts, itup_scankey))
+ if (!_bt_isequal(rel, page, P_HIKEY,
+ indnkeyatts, itup_scankey))
break;
/* Advance to next non-dead page --- there must be one */
for (;;)
@@ -1087,6 +1091,9 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
OffsetNumber maxoff;
OffsetNumber i;
bool isleaf;
+ IndexTuple lefthikey;
+ int indnatts = IndexRelationGetNumberOfAttributes(rel);
+ int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
/* Acquire a new page to split into */
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
@@ -1186,7 +1193,23 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
itemsz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(origpage, itemid);
}
- if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
+
+ /*
+ * We must truncate included attributes of the "high key" item, before
+ * insert it onto the leaf page. It's the only point in insertion
+ * process, where we perform truncation. All other functions work with
+ * this high key and do not change it.
+ */
+ if (indnatts != indnkeyatts && isleaf)
+ {
+ lefthikey = _bt_truncate_tuple(rel, item);
+ itemsz = IndexTupleSize(lefthikey);
+ itemsz = MAXALIGN(itemsz);
+ }
+ else
+ lefthikey = item;
+
+ if (PageAddItem(leftpage, (Item) lefthikey, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
{
memset(rightpage, 0, BufferGetPageSize(rbuf));
@@ -1375,6 +1398,7 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
xl_btree_split xlrec;
uint8 xlinfo;
XLogRecPtr recptr;
+ bool loglhikey = false;
xlrec.level = ropaque->btpo.level;
xlrec.firstright = firstright;
@@ -1404,18 +1428,20 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
/* Log left page */
- if (!isleaf)
+ if (!isleaf || indnatts != indnkeyatts)
{
/*
- * We must also log the left page's high key, because the right
- * page's leftmost key is suppressed on non-leaf levels. Show it
- * as belonging to the left page buffer, so that it is not stored
- * if XLogInsert decides it needs a full-page image of the left
- * page.
+ * We must also log the left page's high key. There are two
+ * reasons for that: right page's leftmost key is suppressed on
+ * non-leaf levels and in covering indexes included columns are
+ * truncated from high keys. Show it as belonging to the left
+ * page buffer, so that it is not stored if XLogInsert decides it
+ * needs a full-page image of the left page.
*/
itemid = PageGetItemId(origpage, P_HIKEY);
item = (IndexTuple) PageGetItem(origpage, itemid);
XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
+ loglhikey = true;
}
/*
@@ -1434,7 +1460,9 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
(char *) rightpage + ((PageHeader) rightpage)->pd_upper,
((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
- xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
+ xlinfo = newitemonleft ?
+ (loglhikey ? XLOG_BTREE_SPLIT_L_HIGHKEY : XLOG_BTREE_SPLIT_L) :
+ (loglhikey ? XLOG_BTREE_SPLIT_R_HIGHKEY : XLOG_BTREE_SPLIT_R);
recptr = XLogInsert(RM_BTREE_ID, xlinfo);
PageSetLSN(origpage, recptr);
@@ -1664,7 +1692,12 @@ _bt_checksplitloc(FindSplitData *state,
/*
* The first item on the right page becomes the high key of the left page;
- * therefore it counts against left space as well as right space.
+ * therefore it counts against left space as well as right space. When
+ * index has included attribues, then those attributes of left page high
+ * key will be truncate leaving that page with slightly more free space.
+ * However, that shouldn't affect our ability to find valid split
+ * location, because anyway split location should exists even without high
+ * key truncation.
*/
leftfree -= firstrightitemsz;
@@ -1787,18 +1820,18 @@ _bt_insert_parent(Relation rel,
stack = &fakestack;
stack->bts_blkno = BufferGetBlockNumber(pbuf);
stack->bts_offset = InvalidOffsetNumber;
- /* bts_btentry will be initialized below */
+ stack->bts_btentry = InvalidBlockNumber;
stack->bts_parent = NULL;
_bt_relbuf(rel, pbuf);
}
- /* get high key from left page == lowest key on new right page */
+ /* get high key from left page == lower bound for new right page */
ritem = (IndexTuple) PageGetItem(page,
PageGetItemId(page, P_HIKEY));
/* form an index tuple that points at the new right page */
new_item = CopyIndexTuple(ritem);
- ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);
+ BTreeInnerTupleSetDownLink(new_item, rbknum);
/*
* Find the parent buffer and get the parent page.
@@ -1807,7 +1840,7 @@ _bt_insert_parent(Relation rel,
* want to find parent pointing to where we are, right ? - vadim
* 05/27/97
*/
- ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
+ stack->bts_btentry = bknum;
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
/*
@@ -1962,7 +1995,8 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access)
{
itemid = PageGetItemId(page, offnum);
item = (IndexTuple) PageGetItem(page, itemid);
- if (BTEntrySame(item, &stack->bts_btentry))
+
+ if (BTreeInnerTupleGetDownLink(item) == stack->bts_btentry)
{
/* Return accurate pointer to where link is now */
stack->bts_blkno = blkno;
@@ -1977,7 +2011,8 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access)
{
itemid = PageGetItemId(page, offnum);
item = (IndexTuple) PageGetItem(page, itemid);
- if (BTEntrySame(item, &stack->bts_btentry))
+
+ if (BTreeInnerTupleGetDownLink(item) == stack->bts_btentry)
{
/* Return accurate pointer to where link is now */
stack->bts_blkno = blkno;
@@ -2067,7 +2102,8 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
left_item_sz = sizeof(IndexTupleData);
left_item = (IndexTuple) palloc(left_item_sz);
left_item->t_info = left_item_sz;
- ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);
+ BTreeInnerTupleSetDownLink(left_item, lbkno);
+ BTreeTupSetNAtts(left_item, 0);
/*
* Create downlink item for right page. The key for it is obtained from
@@ -2077,7 +2113,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
right_item_sz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(lpage, itemid);
right_item = CopyIndexTuple(item);
- ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
+ BTreeInnerTupleSetDownLink(right_item, rbkno);
/* NO EREPORT(ERROR) from here till newroot op is logged */
START_CRIT_SECTION();
@@ -2208,6 +2244,7 @@ _bt_pgaddtup(Page page,
{
trunctuple = *itup;
trunctuple.t_info = sizeof(IndexTupleData);
+ BTreeTupSetNAtts(&trunctuple, 0);
itup = &trunctuple;
itemsize = sizeof(IndexTupleData);
}
@@ -2226,9 +2263,10 @@ _bt_pgaddtup(Page page,
* Rule is simple: NOT_NULL not equal NULL, NULL not equal NULL too.
*/
static bool
-_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
+_bt_isequal(Relation idxrel, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey)
{
+ TupleDesc itupdesc = RelationGetDescr(idxrel);
IndexTuple itup;
int i;
@@ -2237,6 +2275,17 @@ _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
+ /*
+ * Index tuple shouldn't be truncated. Despite we technically could
+ * compare truncated tuple as well, this function should be only called
+ * for regular non-truncated leaf tuples and P_HIKEY tuple on
+ * rightmost leaf page.
+ */
+ Assert((P_RIGHTMOST((BTPageOpaque) PageGetSpecialPointer(page)) ||
+ offnum != P_HIKEY)
+ ? BTreeTupGetNAtts(itup, idxrel) == itupdesc->natts
+ : true);
+
for (i = 1; i <= keysz; i++)
{
AttrNumber attno;