aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/nbtree/nbtxlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/nbtree/nbtxlog.c')
-rw-r--r--src/backend/access/nbtree/nbtxlog.c34
1 files changed, 22 insertions, 12 deletions
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index b565bcb5401..0986ef07cf3 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -202,7 +202,7 @@ btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record)
}
static void
-btree_xlog_split(bool onleft, XLogReaderState *record)
+btree_xlog_split(bool onleft, bool lhighkey, XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
@@ -248,11 +248,14 @@ btree_xlog_split(bool onleft, XLogReaderState *record)
_bt_restore_page(rpage, datapos, datalen);
+ /* Non-leaf page should always have its high key logged. */
+ Assert(isleaf || lhighkey);
+
/*
- * On leaf level, the high key of the left page is equal to the first key
- * on the right page.
+ * When the high key isn't present is the wal record, then we assume it to
+ * be equal to the first key on the right page.
*/
- if (isleaf)
+ if (!lhighkey)
{
ItemId hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));
@@ -296,13 +299,14 @@ btree_xlog_split(bool onleft, XLogReaderState *record)
}
/* Extract left hikey and its size (assuming 16-bit alignment) */
- if (!isleaf)
+ if (lhighkey)
{
left_hikey = (IndexTuple) datapos;
left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
datapos += left_hikeysz;
datalen -= left_hikeysz;
}
+
Assert(datalen == 0);
newlpage = PageGetTempPageCopySpecial(lpage);
@@ -616,7 +620,7 @@ btree_xlog_delete_get_latestRemovedXid(XLogReaderState *record)
* heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
* Note that we are not looking at tuple data here, just headers.
*/
- hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
+ hoffnum = ItemPointerGetOffsetNumberNoCheck(&(itup->t_tid));
hitemid = PageGetItemId(hpage, hoffnum);
/*
@@ -764,11 +768,11 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
nextoffset = OffsetNumberNext(poffset);
itemid = PageGetItemId(page, nextoffset);
itup = (IndexTuple) PageGetItem(page, itemid);
- rightsib = ItemPointerGetBlockNumber(&itup->t_tid);
+ rightsib = BTreeInnerTupleGetDownLink(itup);
itemid = PageGetItemId(page, poffset);
itup = (IndexTuple) PageGetItem(page, itemid);
- ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
+ BTreeInnerTupleSetDownLink(itup, rightsib);
nextoffset = OffsetNumberNext(poffset);
PageIndexTupleDelete(page, nextoffset);
@@ -798,7 +802,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
MemSet(&trunctuple, 0, sizeof(IndexTupleData));
trunctuple.t_info = sizeof(IndexTupleData);
if (xlrec->topparent != InvalidBlockNumber)
- ItemPointerSet(&trunctuple.t_tid, xlrec->topparent, P_HIKEY);
+ ItemPointerSetBlockNumber(&trunctuple.t_tid, xlrec->topparent);
else
ItemPointerSetInvalid(&trunctuple.t_tid);
if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
@@ -908,7 +912,7 @@ btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
MemSet(&trunctuple, 0, sizeof(IndexTupleData));
trunctuple.t_info = sizeof(IndexTupleData);
if (xlrec->topparent != InvalidBlockNumber)
- ItemPointerSet(&trunctuple.t_tid, xlrec->topparent, P_HIKEY);
+ ItemPointerSetBlockNumber(&trunctuple.t_tid, xlrec->topparent);
else
ItemPointerSetInvalid(&trunctuple.t_tid);
if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
@@ -1004,10 +1008,16 @@ btree_redo(XLogReaderState *record)
btree_xlog_insert(false, true, record);
break;
case XLOG_BTREE_SPLIT_L:
- btree_xlog_split(true, record);
+ btree_xlog_split(true, false, record);
+ break;
+ case XLOG_BTREE_SPLIT_L_HIGHKEY:
+ btree_xlog_split(true, true, record);
break;
case XLOG_BTREE_SPLIT_R:
- btree_xlog_split(false, record);
+ btree_xlog_split(false, false, record);
+ break;
+ case XLOG_BTREE_SPLIT_R_HIGHKEY:
+ btree_xlog_split(false, true, record);
break;
case XLOG_BTREE_VACUUM:
btree_xlog_vacuum(record);