aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/nbtree
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/nbtree')
-rw-r--r--src/backend/access/nbtree/nbtinsert.c207
-rw-r--r--src/backend/access/nbtree/nbtpage.c175
-rw-r--r--src/backend/access/nbtree/nbtxlog.c353
3 files changed, 257 insertions, 478 deletions
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index bcaba7e5e84..2c4f9904e1a 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -837,37 +837,25 @@ _bt_insertonpg(Relation rel,
if (RelationNeedsWAL(rel))
{
xl_btree_insert xlrec;
- BlockNumber xlleftchild;
xl_btree_metadata xlmeta;
uint8 xlinfo;
XLogRecPtr recptr;
- XLogRecData rdata[4];
- XLogRecData *nextrdata;
IndexTupleData trunctuple;
- xlrec.target.node = rel->rd_node;
- ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
+ xlrec.offnum = itup_off;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeInsert;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = nextrdata = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
if (P_ISLEAF(lpageop))
xlinfo = XLOG_BTREE_INSERT_LEAF;
else
{
/*
- * Include the block number of the left child, whose
- * INCOMPLETE_SPLIT flag was cleared.
+ * Register the left child whose INCOMPLETE_SPLIT flag was
+ * cleared.
*/
- xlleftchild = BufferGetBlockNumber(cbuf);
- nextrdata->data = (char *) &xlleftchild;
- nextrdata->len = sizeof(BlockNumber);
- nextrdata->buffer = cbuf;
- nextrdata->buffer_std = true;
- nextrdata->next = nextrdata + 1;
- nextrdata++;
+ XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);
xlinfo = XLOG_BTREE_INSERT_UPPER;
}
@@ -879,33 +867,25 @@ _bt_insertonpg(Relation rel,
xlmeta.fastroot = metad->btm_fastroot;
xlmeta.fastlevel = metad->btm_fastlevel;
- nextrdata->data = (char *) &xlmeta;
- nextrdata->len = sizeof(xl_btree_metadata);
- nextrdata->buffer = InvalidBuffer;
- nextrdata->next = nextrdata + 1;
- nextrdata++;
+ XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
+ XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));
xlinfo = XLOG_BTREE_INSERT_META;
}
/* Read comments in _bt_pgaddtup */
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
{
trunctuple = *itup;
trunctuple.t_info = sizeof(IndexTupleData);
- nextrdata->data = (char *) &trunctuple;
- nextrdata->len = sizeof(IndexTupleData);
+ XLogRegisterBufData(0, (char *) &trunctuple,
+ sizeof(IndexTupleData));
}
else
- {
- nextrdata->data = (char *) itup;
- nextrdata->len = IndexTupleDSize(*itup);
- }
- nextrdata->buffer = buf;
- nextrdata->buffer_std = true;
- nextrdata->next = NULL;
+ XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
- recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, xlinfo);
if (BufferIsValid(metabuf))
{
@@ -1260,56 +1240,37 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
xl_btree_split xlrec;
uint8 xlinfo;
XLogRecPtr recptr;
- XLogRecData rdata[7];
- XLogRecData *lastrdata;
- BlockNumber cblkno;
-
- xlrec.node = rel->rd_node;
- xlrec.leftsib = origpagenumber;
- xlrec.rightsib = rightpagenumber;
- xlrec.rnext = ropaque->btpo_next;
+
xlrec.level = ropaque->btpo.level;
xlrec.firstright = firstright;
+ xlrec.newitemoff = newitemoff;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeSplit;
- rdata[0].buffer = InvalidBuffer;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
- lastrdata = &rdata[0];
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
+ /* Log the right sibling, because we've changed its prev-pointer. */
+ if (!P_RIGHTMOST(ropaque))
+ XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
+ if (BufferIsValid(cbuf))
+ XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
/*
- * Log the new item and its offset, if it was inserted on the left
- * page. (If it was put on the right page, we don't need to explicitly
- * WAL log it because it's included with all the other items on the
- * right page.) Show the new item as belonging to the left page
- * buffer, so that it is not stored if XLogInsert decides it needs a
- * full-page image of the left page. We store the offset anyway,
- * though, to support archive compression of these records.
+ * Log the new item, if it was inserted on the left page. (If it was
+ * put on the right page, we don't need to explicitly WAL log it
+ * because it's included with all the other items on the right page.)
+ * Show the new item as belonging to the left page buffer, so that it
+ * is not stored if XLogInsert decides it needs a full-page image of
+ * the left page. We store the offset anyway, though, to support
+ * archive compression of these records.
*/
if (newitemonleft)
- {
- lastrdata->next = lastrdata + 1;
- lastrdata++;
-
- lastrdata->data = (char *) &newitemoff;
- lastrdata->len = sizeof(OffsetNumber);
- lastrdata->buffer = InvalidBuffer;
-
- lastrdata->next = lastrdata + 1;
- lastrdata++;
-
- lastrdata->data = (char *) newitem;
- lastrdata->len = MAXALIGN(newitemsz);
- lastrdata->buffer = buf; /* backup block 0 */
- lastrdata->buffer_std = true;
- }
+ XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
/* Log left page */
if (!isleaf)
{
- lastrdata->next = lastrdata + 1;
- lastrdata++;
-
/*
* We must also log the left page's high key, because the right
* page's leftmost key is suppressed on non-leaf levels. Show it
@@ -1319,43 +1280,7 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
*/
itemid = PageGetItemId(origpage, P_HIKEY);
item = (IndexTuple) PageGetItem(origpage, itemid);
- lastrdata->data = (char *) item;
- lastrdata->len = MAXALIGN(IndexTupleSize(item));
- lastrdata->buffer = buf; /* backup block 0 */
- lastrdata->buffer_std = true;
- }
-
- if (isleaf && !newitemonleft)
- {
- lastrdata->next = lastrdata + 1;
- lastrdata++;
-
- /*
- * Although we don't need to WAL-log anything on the left page, we
- * still need XLogInsert to consider storing a full-page image of
- * the left page, so make an empty entry referencing that buffer.
- * This also ensures that the left page is always backup block 0.
- */
- lastrdata->data = NULL;
- lastrdata->len = 0;
- lastrdata->buffer = buf; /* backup block 0 */
- lastrdata->buffer_std = true;
- }
-
- /*
- * Log block number of left child, whose INCOMPLETE_SPLIT flag this
- * insertion clears.
- */
- if (!isleaf)
- {
- lastrdata->next = lastrdata + 1;
- lastrdata++;
-
- cblkno = BufferGetBlockNumber(cbuf);
- lastrdata->data = (char *) &cblkno;
- lastrdata->len = sizeof(BlockNumber);
- lastrdata->buffer = cbuf; /* backup block 1 */
- lastrdata->buffer_std = true;
+ XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
}
/*
@@ -1370,35 +1295,16 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
* and so the item pointers can be reconstructed. See comments for
* _bt_restore_page().
*/
- lastrdata->next = lastrdata + 1;
- lastrdata++;
-
- lastrdata->data = (char *) rightpage +
- ((PageHeader) rightpage)->pd_upper;
- lastrdata->len = ((PageHeader) rightpage)->pd_special -
- ((PageHeader) rightpage)->pd_upper;
- lastrdata->buffer = InvalidBuffer;
-
- /* Log the right sibling, because we've changed its' prev-pointer. */
- if (!P_RIGHTMOST(ropaque))
- {
- lastrdata->next = lastrdata + 1;
- lastrdata++;
-
- lastrdata->data = NULL;
- lastrdata->len = 0;
- lastrdata->buffer = sbuf; /* bkp block 1 (leaf) or 2 (non-leaf) */
- lastrdata->buffer_std = true;
- }
-
- lastrdata->next = NULL;
+ XLogRegisterBufData(1,
+ (char *) rightpage + ((PageHeader) rightpage)->pd_upper,
+ ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
if (isroot)
xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
else
xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
- recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, xlinfo);
PageSetLSN(origpage, recptr);
PageSetLSN(rightpage, recptr);
@@ -2090,34 +1996,35 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
{
xl_btree_newroot xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[3];
+ xl_btree_metadata md;
- xlrec.node = rel->rd_node;
xlrec.rootblk = rootblknum;
xlrec.level = metad->btm_level;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeNewroot;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);
+
+ XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
+ XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
+ XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
+
+ md.root = rootblknum;
+ md.level = metad->btm_level;
+ md.fastroot = rootblknum;
+ md.fastlevel = metad->btm_level;
+
+ XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));
/*
* Direct access to page is not good but faster - we should implement
* some new func in page API.
*/
- rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
- rdata[1].len = ((PageHeader) rootpage)->pd_special -
- ((PageHeader) rootpage)->pd_upper;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = &(rdata[2]);
-
- /* Make a full-page image of the left child if needed */
- rdata[2].data = NULL;
- rdata[2].len = 0;
- rdata[2].buffer = lbuf;
- rdata[2].next = NULL;
-
- recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
+ XLogRegisterBufData(0,
+ (char *) rootpage + ((PageHeader) rootpage)->pd_upper,
+ ((PageHeader) rootpage)->pd_special -
+ ((PageHeader) rootpage)->pd_upper);
+
+ recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);
PageSetLSN(lpage, recptr);
PageSetLSN(rootpage, recptr);
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index ea95ce6e1ec..a25dafeb400 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -236,18 +236,25 @@ _bt_getroot(Relation rel, int access)
{
xl_btree_newroot xlrec;
XLogRecPtr recptr;
- XLogRecData rdata;
+ xl_btree_metadata md;
+
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
+ XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
+
+ md.root = rootblkno;
+ md.level = 0;
+ md.fastroot = rootblkno;
+ md.fastlevel = 0;
+
+ XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));
- xlrec.node = rel->rd_node;
xlrec.rootblk = rootblkno;
xlrec.level = 0;
- rdata.data = (char *) &xlrec;
- rdata.len = SizeOfBtreeNewroot;
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);
- recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, &rdata);
+ recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);
PageSetLSN(rootpage, recptr);
PageSetLSN(metapg, recptr);
@@ -528,39 +535,23 @@ _bt_checkpage(Relation rel, Buffer buf)
static void
_bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedXid)
{
- if (!RelationNeedsWAL(rel))
- return;
-
- /* No ereport(ERROR) until changes are logged */
- START_CRIT_SECTION();
+ xl_btree_reuse_page xlrec_reuse;
/*
- * We don't do MarkBufferDirty here because we're about to initialise the
- * page, and nobody else can see it yet.
+ * Note that we don't register the buffer with the record, because this
+ * operation doesn't modify the page. This record only exists to provide a
+ * conflict point for Hot Standby.
*/
/* XLOG stuff */
- {
- XLogRecData rdata[1];
- xl_btree_reuse_page xlrec_reuse;
+ xlrec_reuse.node = rel->rd_node;
+ xlrec_reuse.block = blkno;
+ xlrec_reuse.latestRemovedXid = latestRemovedXid;
- xlrec_reuse.node = rel->rd_node;
- xlrec_reuse.block = blkno;
- xlrec_reuse.latestRemovedXid = latestRemovedXid;
- rdata[0].data = (char *) &xlrec_reuse;
- rdata[0].len = SizeOfBtreeReusePage;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = NULL;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec_reuse, SizeOfBtreeReusePage);
- XLogInsert(RM_BTREE_ID, XLOG_BTREE_REUSE_PAGE, rdata);
-
- /*
- * We don't do PageSetLSN here because we're about to initialise the
- * page, so no need.
- */
- }
-
- END_CRIT_SECTION();
+ XLogInsert(RM_BTREE_ID, XLOG_BTREE_REUSE_PAGE);
}
/*
@@ -633,7 +624,7 @@ _bt_getbuf(Relation rel, BlockNumber blkno, int access)
* WAL record that will allow us to conflict with queries
* running on standby.
*/
- if (XLogStandbyInfoActive())
+ if (XLogStandbyInfoActive() && RelationNeedsWAL(rel))
{
BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -830,17 +821,13 @@ _bt_delitems_vacuum(Relation rel, Buffer buf,
if (RelationNeedsWAL(rel))
{
XLogRecPtr recptr;
- XLogRecData rdata[2];
xl_btree_vacuum xlrec_vacuum;
- xlrec_vacuum.node = rel->rd_node;
- xlrec_vacuum.block = BufferGetBlockNumber(buf);
-
xlrec_vacuum.lastBlockVacuumed = lastBlockVacuumed;
- rdata[0].data = (char *) &xlrec_vacuum;
- rdata[0].len = SizeOfBtreeVacuum;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterData((char *) &xlrec_vacuum, SizeOfBtreeVacuum);
/*
* The target-offsets array is not in the buffer, but pretend that it
@@ -848,20 +835,9 @@ _bt_delitems_vacuum(Relation rel, Buffer buf,
* need not be stored too.
*/
if (nitems > 0)
- {
- rdata[1].data = (char *) itemnos;
- rdata[1].len = nitems * sizeof(OffsetNumber);
- }
- else
- {
- rdata[1].data = NULL;
- rdata[1].len = 0;
- }
- rdata[1].buffer = buf;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ XLogRegisterBufData(0, (char *) itemnos, nitems * sizeof(OffsetNumber));
- recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_VACUUM, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_VACUUM);
PageSetLSN(page, recptr);
}
@@ -919,36 +895,23 @@ _bt_delitems_delete(Relation rel, Buffer buf,
if (RelationNeedsWAL(rel))
{
XLogRecPtr recptr;
- XLogRecData rdata[3];
xl_btree_delete xlrec_delete;
- xlrec_delete.node = rel->rd_node;
xlrec_delete.hnode = heapRel->rd_node;
- xlrec_delete.block = BufferGetBlockNumber(buf);
xlrec_delete.nitems = nitems;
- rdata[0].data = (char *) &xlrec_delete;
- rdata[0].len = SizeOfBtreeDelete;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterData((char *) &xlrec_delete, SizeOfBtreeDelete);
/*
* We need the target-offsets array whether or not we store the whole
* buffer, to allow us to find the latestRemovedXid on a standby
* server.
*/
- rdata[1].data = (char *) itemnos;
- rdata[1].len = nitems * sizeof(OffsetNumber);
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = &(rdata[2]);
-
- rdata[2].data = NULL;
- rdata[2].len = 0;
- rdata[2].buffer = buf;
- rdata[2].buffer_std = true;
- rdata[2].next = NULL;
+ XLogRegisterData((char *) itemnos, nitems * sizeof(OffsetNumber));
- recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE);
PageSetLSN(page, recptr);
}
@@ -1493,33 +1456,26 @@ _bt_mark_page_halfdead(Relation rel, Buffer leafbuf, BTStack stack)
{
xl_btree_mark_page_halfdead xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
- xlrec.target.node = rel->rd_node;
- ItemPointerSet(&(xlrec.target.tid), BufferGetBlockNumber(topparent), topoff);
+ xlrec.poffset = topoff;
xlrec.leafblk = leafblkno;
if (target != leafblkno)
xlrec.topparent = target;
else
xlrec.topparent = InvalidBlockNumber;
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, leafbuf, REGBUF_WILL_INIT);
+ XLogRegisterBuffer(1, topparent, REGBUF_STANDARD);
+
page = BufferGetPage(leafbuf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
xlrec.leftblk = opaque->btpo_prev;
xlrec.rightblk = opaque->btpo_next;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeMarkPageHalfDead;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].buffer = topparent;
- rdata[1].buffer_std = true;
- rdata[1].next = NULL;
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeMarkPageHalfDead);
- recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_MARK_PAGE_HALFDEAD, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_MARK_PAGE_HALFDEAD);
page = BufferGetPage(topparent);
PageSetLSN(page, recptr);
@@ -1826,63 +1782,44 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, bool *rightsib_empty)
xl_btree_metadata xlmeta;
uint8 xlinfo;
XLogRecPtr recptr;
- XLogRecData rdata[4];
- XLogRecData *nextrdata;
- xlrec.node = rel->rd_node;
+ XLogBeginInsert();
+
+ XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
+ if (BufferIsValid(lbuf))
+ XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
+ XLogRegisterBuffer(2, rbuf, REGBUF_STANDARD);
+ if (target != leafblkno)
+ XLogRegisterBuffer(3, leafbuf, REGBUF_WILL_INIT);
/* information on the unlinked block */
- xlrec.deadblk = target;
xlrec.leftsib = leftsib;
xlrec.rightsib = rightsib;
xlrec.btpo_xact = opaque->btpo.xact;
/* information needed to recreate the leaf block (if not the target) */
- xlrec.leafblk = leafblkno;
xlrec.leafleftsib = leafleftsib;
xlrec.leafrightsib = leafrightsib;
xlrec.topparent = nextchild;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeUnlinkPage;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = nextrdata = &(rdata[1]);
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeUnlinkPage);
if (BufferIsValid(metabuf))
{
+ XLogRegisterBuffer(4, metabuf, REGBUF_WILL_INIT);
+
xlmeta.root = metad->btm_root;
xlmeta.level = metad->btm_level;
xlmeta.fastroot = metad->btm_fastroot;
xlmeta.fastlevel = metad->btm_fastlevel;
- nextrdata->data = (char *) &xlmeta;
- nextrdata->len = sizeof(xl_btree_metadata);
- nextrdata->buffer = InvalidBuffer;
- nextrdata->next = nextrdata + 1;
- nextrdata++;
+ XLogRegisterBufData(4, (char *) &xlmeta, sizeof(xl_btree_metadata));
xlinfo = XLOG_BTREE_UNLINK_PAGE_META;
}
else
xlinfo = XLOG_BTREE_UNLINK_PAGE;
- nextrdata->data = NULL;
- nextrdata->len = 0;
- nextrdata->buffer = rbuf;
- nextrdata->buffer_std = true;
- nextrdata->next = NULL;
-
- if (BufferIsValid(lbuf))
- {
- nextrdata->next = nextrdata + 1;
- nextrdata++;
- nextrdata->data = NULL;
- nextrdata->len = 0;
- nextrdata->buffer = lbuf;
- nextrdata->buffer_std = true;
- nextrdata->next = NULL;
- }
-
- recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, xlinfo);
if (BufferIsValid(metabuf))
{
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 13951be62af..52aef9b9836 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -72,17 +72,23 @@ _bt_restore_page(Page page, char *from, int len)
}
static void
-_bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
- BlockNumber root, uint32 level,
- BlockNumber fastroot, uint32 fastlevel)
+_bt_restore_meta(XLogReaderState *record, uint8 block_id)
{
+ XLogRecPtr lsn = record->EndRecPtr;
Buffer metabuf;
Page metapg;
BTMetaPageData *md;
BTPageOpaque pageop;
+ xl_btree_metadata *xlrec;
+ char *ptr;
+ Size len;
- metabuf = XLogReadBuffer(rnode, BTREE_METAPAGE, true);
- Assert(BufferIsValid(metabuf));
+ metabuf = XLogInitBufferForRedo(record, block_id);
+ ptr = XLogRecGetBlockData(record, block_id, &len);
+
+ Assert(len == sizeof(xl_btree_metadata));
+ Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
+ xlrec = (xl_btree_metadata *) ptr;
metapg = BufferGetPage(metabuf);
_bt_pageinit(metapg, BufferGetPageSize(metabuf));
@@ -90,10 +96,10 @@ _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
md = BTPageGetMeta(metapg);
md->btm_magic = BTREE_MAGIC;
md->btm_version = BTREE_VERSION;
- md->btm_root = root;
- md->btm_level = level;
- md->btm_fastroot = fastroot;
- md->btm_fastlevel = fastlevel;
+ md->btm_root = xlrec->root;
+ md->btm_level = xlrec->level;
+ md->btm_fastroot = xlrec->fastroot;
+ md->btm_fastlevel = xlrec->fastlevel;
pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
pageop->btpo_flags = BTP_META;
@@ -117,14 +123,12 @@ _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
* types that can insert a downlink: insert, split, and newroot.
*/
static void
-_bt_clear_incomplete_split(XLogRecPtr lsn, XLogRecord *record,
- int block_index,
- RelFileNode rnode, BlockNumber cblock)
+_bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
{
+ XLogRecPtr lsn = record->EndRecPtr;
Buffer buf;
- if (XLogReadBufferForRedo(lsn, record, block_index, rnode, cblock, &buf)
- == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO)
{
Page page = (Page) BufferGetPage(buf);
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -140,38 +144,12 @@ _bt_clear_incomplete_split(XLogRecPtr lsn, XLogRecord *record,
}
static void
-btree_xlog_insert(bool isleaf, bool ismeta,
- XLogRecPtr lsn, XLogRecord *record)
+btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
Buffer buffer;
Page page;
- char *datapos;
- int datalen;
- xl_btree_metadata md;
- BlockNumber cblkno = 0;
- int main_blk_index;
-
- datapos = (char *) xlrec + SizeOfBtreeInsert;
- datalen = record->xl_len - SizeOfBtreeInsert;
-
- /*
- * if this insert finishes a split at lower level, extract the block
- * number of the (left) child.
- */
- if (!isleaf && (record->xl_info & XLR_BKP_BLOCK(0)) == 0)
- {
- memcpy(&cblkno, datapos, sizeof(BlockNumber));
- Assert(cblkno != 0);
- datapos += sizeof(BlockNumber);
- datalen -= sizeof(BlockNumber);
- }
- if (ismeta)
- {
- memcpy(&md, datapos, sizeof(xl_btree_metadata));
- datapos += sizeof(xl_btree_metadata);
- datalen -= sizeof(xl_btree_metadata);
- }
/*
* Insertion to an internal page finishes an incomplete split at the child
@@ -183,21 +161,15 @@ btree_xlog_insert(bool isleaf, bool ismeta,
* cannot be updates happening.
*/
if (!isleaf)
+ _bt_clear_incomplete_split(record, 1);
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
- _bt_clear_incomplete_split(lsn, record, 0, xlrec->target.node, cblkno);
- main_blk_index = 1;
- }
- else
- main_blk_index = 0;
+ Size datalen;
+ char *datapos = XLogRecGetBlockData(record, 0, &datalen);
- if (XLogReadBufferForRedo(lsn, record, main_blk_index, xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)),
- &buffer) == BLK_NEEDS_REDO)
- {
page = BufferGetPage(buffer);
- if (PageAddItem(page, (Item) datapos, datalen,
- ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
+ if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
false, false) == InvalidOffsetNumber)
elog(PANIC, "btree_insert_redo: failed to add item");
@@ -215,15 +187,13 @@ btree_xlog_insert(bool isleaf, bool ismeta,
* obsolete link from the metapage.
*/
if (ismeta)
- _bt_restore_meta(xlrec->target.node, lsn,
- md.root, md.level,
- md.fastroot, md.fastlevel);
+ _bt_restore_meta(record, 2);
}
static void
-btree_xlog_split(bool onleft, bool isroot,
- XLogRecPtr lsn, XLogRecord *record)
+btree_xlog_split(bool onleft, bool isroot, XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
bool isleaf = (xlrec->level == 0);
Buffer lbuf;
@@ -231,56 +201,17 @@ btree_xlog_split(bool onleft, bool isroot,
Page rpage;
BTPageOpaque ropaque;
char *datapos;
- int datalen;
- OffsetNumber newitemoff = 0;
- Item newitem = NULL;
- Size newitemsz = 0;
+ Size datalen;
Item left_hikey = NULL;
Size left_hikeysz = 0;
- BlockNumber cblkno = InvalidBlockNumber;
-
- datapos = (char *) xlrec + SizeOfBtreeSplit;
- datalen = record->xl_len - SizeOfBtreeSplit;
-
- /* Extract newitemoff and newitem, if present */
- if (onleft)
- {
- memcpy(&newitemoff, datapos, sizeof(OffsetNumber));
- datapos += sizeof(OffsetNumber);
- datalen -= sizeof(OffsetNumber);
- }
- if (onleft && !(record->xl_info & XLR_BKP_BLOCK(0)))
- {
- /*
- * We assume that 16-bit alignment is enough to apply IndexTupleSize
- * (since it's fetching from a uint16 field) and also enough for
- * PageAddItem to insert the tuple.
- */
- newitem = (Item) datapos;
- newitemsz = MAXALIGN(IndexTupleSize(newitem));
- datapos += newitemsz;
- datalen -= newitemsz;
- }
-
- /* Extract left hikey and its size (still assuming 16-bit alignment) */
- if (!isleaf && !(record->xl_info & XLR_BKP_BLOCK(0)))
- {
- left_hikey = (Item) datapos;
- left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
- datapos += left_hikeysz;
- datalen -= left_hikeysz;
- }
+ BlockNumber leftsib;
+ BlockNumber rightsib;
+ BlockNumber rnext;
- /*
- * If this insertion finishes an incomplete split, get the block number of
- * the child.
- */
- if (!isleaf && !(record->xl_info & XLR_BKP_BLOCK(1)))
- {
- memcpy(&cblkno, datapos, sizeof(BlockNumber));
- datapos += sizeof(BlockNumber);
- datalen -= sizeof(BlockNumber);
- }
+ XLogRecGetBlockTag(record, 0, NULL, NULL, &leftsib);
+ XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib);
+ if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext))
+ rnext = P_NONE;
/*
* Clear the incomplete split flag on the left sibling of the child page
@@ -288,18 +219,18 @@ btree_xlog_split(bool onleft, bool isroot,
* before locking the other pages)
*/
if (!isleaf)
- _bt_clear_incomplete_split(lsn, record, 1, xlrec->node, cblkno);
+ _bt_clear_incomplete_split(record, 3);
/* Reconstruct right (new) sibling page from scratch */
- rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
- Assert(BufferIsValid(rbuf));
+ rbuf = XLogInitBufferForRedo(record, 1);
+ datapos = XLogRecGetBlockData(record, 1, &datalen);
rpage = (Page) BufferGetPage(rbuf);
_bt_pageinit(rpage, BufferGetPageSize(rbuf));
ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);
- ropaque->btpo_prev = xlrec->leftsib;
- ropaque->btpo_next = xlrec->rnext;
+ ropaque->btpo_prev = leftsib;
+ ropaque->btpo_next = rnext;
ropaque->btpo.level = xlrec->level;
ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
ropaque->btpo_cycleid = 0;
@@ -324,8 +255,7 @@ btree_xlog_split(bool onleft, bool isroot,
/* don't release the buffer yet; we touch right page's first item below */
/* Now reconstruct left (original) sibling page */
- if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->leftsib,
- &lbuf) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &lbuf) == BLK_NEEDS_REDO)
{
/*
* To retain the same physical order of the tuples that they had, we
@@ -339,9 +269,31 @@ btree_xlog_split(bool onleft, bool isroot,
Page lpage = (Page) BufferGetPage(lbuf);
BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
OffsetNumber off;
+ Item newitem;
+ Size newitemsz = 0;
Page newlpage;
OffsetNumber leftoff;
+ datapos = XLogRecGetBlockData(record, 0, &datalen);
+
+ if (onleft)
+ {
+ newitem = (Item) datapos;
+ newitemsz = MAXALIGN(IndexTupleSize(newitem));
+ datapos += newitemsz;
+ datalen -= newitemsz;
+ }
+
+ /* Extract left hikey and its size (assuming 16-bit alignment) */
+ if (!isleaf)
+ {
+ left_hikey = (Item) datapos;
+ left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
+ datapos += left_hikeysz;
+ datalen -= left_hikeysz;
+ }
+ Assert(datalen == 0);
+
newlpage = PageGetTempPageCopySpecial(lpage);
/* Set high key */
@@ -358,7 +310,7 @@ btree_xlog_split(bool onleft, bool isroot,
Item item;
/* add the new item if it was inserted on left page */
- if (onleft && off == newitemoff)
+ if (onleft && off == xlrec->newitemoff)
{
if (PageAddItem(newlpage, newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
@@ -376,7 +328,7 @@ btree_xlog_split(bool onleft, bool isroot,
}
/* cope with possibility that newitem goes at the end */
- if (onleft && off == newitemoff)
+ if (onleft && off == xlrec->newitemoff)
{
if (PageAddItem(newlpage, newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
@@ -390,7 +342,7 @@ btree_xlog_split(bool onleft, bool isroot,
lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
if (isleaf)
lopaque->btpo_flags |= BTP_LEAF;
- lopaque->btpo_next = xlrec->rightsib;
+ lopaque->btpo_next = rightsib;
lopaque->btpo_cycleid = 0;
PageSetLSN(lpage, lsn);
@@ -410,22 +362,16 @@ btree_xlog_split(bool onleft, bool isroot,
* replay, because no other index update can be in progress, and readers
* will cope properly when following an obsolete left-link.
*/
- if (xlrec->rnext != P_NONE)
+ if (rnext != P_NONE)
{
- /*
- * the backup block containing right sibling is 1 or 2, depending
- * whether this was a leaf or internal page.
- */
- int rnext_index = isleaf ? 1 : 2;
Buffer buffer;
- if (XLogReadBufferForRedo(lsn, record, rnext_index, xlrec->node,
- xlrec->rnext, &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
{
Page page = (Page) BufferGetPage(buffer);
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
- pageop->btpo_prev = xlrec->rightsib;
+ pageop->btpo_prev = rightsib;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
@@ -436,8 +382,9 @@ btree_xlog_split(bool onleft, bool isroot,
}
static void
-btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
+btree_xlog_vacuum(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
Buffer buffer;
Page page;
@@ -466,9 +413,13 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
*/
if (HotStandbyActiveInReplay())
{
+ RelFileNode thisrnode;
+ BlockNumber thisblkno;
BlockNumber blkno;
- for (blkno = xlrec->lastBlockVacuumed + 1; blkno < xlrec->block; blkno++)
+ XLogRecGetBlockTag(record, 0, &thisrnode, NULL, &thisblkno);
+
+ for (blkno = xlrec->lastBlockVacuumed + 1; blkno < thisblkno; blkno++)
{
/*
* We use RBM_NORMAL_NO_LOG mode because it's not an error
@@ -483,7 +434,7 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
* buffer manager we could optimise this so that if the block is
* not in shared_buffers we confirm it as unpinned.
*/
- buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, blkno,
+ buffer = XLogReadBufferExtended(thisrnode, MAIN_FORKNUM, blkno,
RBM_NORMAL_NO_LOG);
if (BufferIsValid(buffer))
{
@@ -497,20 +448,23 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
* Like in btvacuumpage(), we need to take a cleanup lock on every leaf
* page. See nbtree/README for details.
*/
- if (XLogReadBufferForRedoExtended(lsn, record, 0,
- xlrec->node, MAIN_FORKNUM, xlrec->block,
- RBM_NORMAL, true, &buffer)
+ if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer)
== BLK_NEEDS_REDO)
{
+ char *ptr;
+ Size len;
+
+ ptr = XLogRecGetBlockData(record, 0, &len);
+
page = (Page) BufferGetPage(buffer);
- if (record->xl_len > SizeOfBtreeVacuum)
+ if (len > 0)
{
OffsetNumber *unused;
OffsetNumber *unend;
- unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeVacuum);
- unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
+ unused = (OffsetNumber *) ptr;
+ unend = (OffsetNumber *) ((char *) ptr + len);
if ((unend - unused) > 0)
PageIndexMultiDelete(page, unused, unend - unused);
@@ -542,13 +496,16 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
* XXX optimise later with something like XLogPrefetchBuffer()
*/
static TransactionId
-btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
+btree_xlog_delete_get_latestRemovedXid(XLogReaderState *record)
{
+ xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
OffsetNumber *unused;
Buffer ibuffer,
hbuffer;
Page ipage,
hpage;
+ RelFileNode rnode;
+ BlockNumber blkno;
ItemId iitemid,
hitemid;
IndexTuple itup;
@@ -588,9 +545,11 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
* InvalidTransactionId to cancel all HS transactions. That's probably
* overkill, but it's safe, and certainly better than panicking here.
*/
- ibuffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
+ XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
+ ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);
if (!BufferIsValid(ibuffer))
return InvalidTransactionId;
+ LockBuffer(ibuffer, BT_READ);
ipage = (Page) BufferGetPage(ibuffer);
/*
@@ -611,12 +570,13 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
* Locate the heap page that the index tuple points at
*/
hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
- hbuffer = XLogReadBuffer(xlrec->hnode, hblkno, false);
+ hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM, hblkno, RBM_NORMAL);
if (!BufferIsValid(hbuffer))
{
UnlockReleaseBuffer(ibuffer);
return InvalidTransactionId;
}
+ LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
hpage = (Page) BufferGetPage(hbuffer);
/*
@@ -678,8 +638,9 @@ btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
}
static void
-btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
+btree_xlog_delete(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
Buffer buffer;
Page page;
@@ -698,21 +659,23 @@ btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
*/
if (InHotStandby)
{
- TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(xlrec);
+ TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(record);
+ RelFileNode rnode;
- ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node);
+ XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
+
+ ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
}
/*
* We don't need to take a cleanup lock to apply these changes. See
* nbtree/README for details.
*/
- if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->block,
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(buffer);
- if (record->xl_len > SizeOfBtreeDelete)
+ if (XLogRecGetDataLen(record) > SizeOfBtreeDelete)
{
OffsetNumber *unused;
@@ -736,17 +699,15 @@ btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
}
static void
-btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record)
+btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
- BlockNumber parent;
Buffer buffer;
Page page;
BTPageOpaque pageop;
IndexTupleData trunctuple;
- parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));
-
/*
* In normal operation, we would lock all the pages this WAL record
* touches before changing any of them. In WAL replay, it should be okay
@@ -756,8 +717,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record)
*/
/* parent page */
- if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node, parent,
- &buffer) == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
{
OffsetNumber poffset;
ItemId itemid;
@@ -768,7 +728,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record)
page = (Page) BufferGetPage(buffer);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
- poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+ poffset = xlrec->poffset;
nextoffset = OffsetNumberNext(poffset);
itemid = PageGetItemId(page, nextoffset);
@@ -788,8 +748,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer);
/* Rewrite the leaf page as a halfdead page */
- buffer = XLogReadBuffer(xlrec->target.node, xlrec->leafblk, true);
- Assert(BufferIsValid(buffer));
+ buffer = XLogInitBufferForRedo(record, 0);
page = (Page) BufferGetPage(buffer);
_bt_pageinit(page, BufferGetPageSize(buffer));
@@ -822,17 +781,16 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record)
static void
-btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
+btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
- BlockNumber target;
BlockNumber leftsib;
BlockNumber rightsib;
Buffer buffer;
Page page;
BTPageOpaque pageop;
- target = xlrec->deadblk;
leftsib = xlrec->leftsib;
rightsib = xlrec->rightsib;
@@ -845,8 +803,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
*/
/* Fix left-link of right sibling */
- if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, rightsib, &buffer)
- == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(buffer);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -861,8 +818,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
/* Fix right-link of left sibling, if any */
if (leftsib != P_NONE)
{
- if (XLogReadBufferForRedo(lsn, record, 1, xlrec->node, leftsib, &buffer)
- == BLK_NEEDS_REDO)
+ if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(buffer);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -876,8 +832,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
}
/* Rewrite target page as empty deleted page */
- buffer = XLogReadBuffer(xlrec->node, target, true);
- Assert(BufferIsValid(buffer));
+ buffer = XLogInitBufferForRedo(record, 0);
page = (Page) BufferGetPage(buffer);
_bt_pageinit(page, BufferGetPageSize(buffer));
@@ -898,7 +853,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
* itself, update the leaf to point to the next remaining child in the
* branch.
*/
- if (target != xlrec->leafblk)
+ if (XLogRecHasBlockRef(record, 3))
{
/*
* There is no real data on the page, so we just re-create it from
@@ -906,8 +861,7 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
*/
IndexTupleData trunctuple;
- buffer = XLogReadBuffer(xlrec->node, xlrec->leafblk, true);
- Assert(BufferIsValid(buffer));
+ buffer = XLogInitBufferForRedo(record, 3);
page = (Page) BufferGetPage(buffer);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -936,27 +890,21 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
/* Update metapage if needed */
if (info == XLOG_BTREE_UNLINK_PAGE_META)
- {
- xl_btree_metadata md;
-
- memcpy(&md, (char *) xlrec + SizeOfBtreeUnlinkPage,
- sizeof(xl_btree_metadata));
- _bt_restore_meta(xlrec->node, lsn,
- md.root, md.level,
- md.fastroot, md.fastlevel);
- }
+ _bt_restore_meta(record, 4);
}
static void
-btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
+btree_xlog_newroot(XLogReaderState *record)
{
+ XLogRecPtr lsn = record->EndRecPtr;
xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
Buffer buffer;
Page page;
BTPageOpaque pageop;
+ char *ptr;
+ Size len;
- buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
- Assert(BufferIsValid(buffer));
+ buffer = XLogInitBufferForRedo(record, 0);
page = (Page) BufferGetPage(buffer);
_bt_pageinit(page, BufferGetPageSize(buffer));
@@ -969,34 +917,24 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
pageop->btpo_flags |= BTP_LEAF;
pageop->btpo_cycleid = 0;
- if (record->xl_len > SizeOfBtreeNewroot)
+ if (xlrec->level > 0)
{
- IndexTuple itup;
- BlockNumber cblkno;
-
- _bt_restore_page(page,
- (char *) xlrec + SizeOfBtreeNewroot,
- record->xl_len - SizeOfBtreeNewroot);
- /* extract block number of the left-hand split page */
- itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_HIKEY));
- cblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
- Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
+ ptr = XLogRecGetBlockData(record, 0, &len);
+ _bt_restore_page(page, ptr, len);
/* Clear the incomplete-split flag in left child */
- _bt_clear_incomplete_split(lsn, record, 0, xlrec->node, cblkno);
+ _bt_clear_incomplete_split(record, 1);
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
- _bt_restore_meta(xlrec->node, lsn,
- xlrec->rootblk, xlrec->level,
- xlrec->rootblk, xlrec->level);
+ _bt_restore_meta(record, 2);
}
static void
-btree_xlog_reuse_page(XLogRecPtr lsn, XLogRecord *record)
+btree_xlog_reuse_page(XLogReaderState *record)
{
xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);
@@ -1015,58 +953,55 @@ btree_xlog_reuse_page(XLogRecPtr lsn, XLogRecord *record)
ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
xlrec->node);
}
-
- /* Backup blocks are not used in reuse_page records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
}
void
-btree_redo(XLogRecPtr lsn, XLogRecord *record)
+btree_redo(XLogReaderState *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
switch (info)
{
case XLOG_BTREE_INSERT_LEAF:
- btree_xlog_insert(true, false, lsn, record);
+ btree_xlog_insert(true, false, record);
break;
case XLOG_BTREE_INSERT_UPPER:
- btree_xlog_insert(false, false, lsn, record);
+ btree_xlog_insert(false, false, record);
break;
case XLOG_BTREE_INSERT_META:
- btree_xlog_insert(false, true, lsn, record);
+ btree_xlog_insert(false, true, record);
break;
case XLOG_BTREE_SPLIT_L:
- btree_xlog_split(true, false, lsn, record);
+ btree_xlog_split(true, false, record);
break;
case XLOG_BTREE_SPLIT_R:
- btree_xlog_split(false, false, lsn, record);
+ btree_xlog_split(false, false, record);
break;
case XLOG_BTREE_SPLIT_L_ROOT:
- btree_xlog_split(true, true, lsn, record);
+ btree_xlog_split(true, true, record);
break;
case XLOG_BTREE_SPLIT_R_ROOT:
- btree_xlog_split(false, true, lsn, record);
+ btree_xlog_split(false, true, record);
break;
case XLOG_BTREE_VACUUM:
- btree_xlog_vacuum(lsn, record);
+ btree_xlog_vacuum(record);
break;
case XLOG_BTREE_DELETE:
- btree_xlog_delete(lsn, record);
+ btree_xlog_delete(record);
break;
case XLOG_BTREE_MARK_PAGE_HALFDEAD:
- btree_xlog_mark_page_halfdead(info, lsn, record);
+ btree_xlog_mark_page_halfdead(info, record);
break;
case XLOG_BTREE_UNLINK_PAGE:
case XLOG_BTREE_UNLINK_PAGE_META:
- btree_xlog_unlink_page(info, lsn, record);
+ btree_xlog_unlink_page(info, record);
break;
case XLOG_BTREE_NEWROOT:
- btree_xlog_newroot(lsn, record);
+ btree_xlog_newroot(record);
break;
case XLOG_BTREE_REUSE_PAGE:
- btree_xlog_reuse_page(lsn, record);
+ btree_xlog_reuse_page(record);
break;
default:
elog(PANIC, "btree_redo: unknown op code %u", info);