diff options
Diffstat (limited to 'src/backend/access/spgist/spgdoinsert.c')
-rw-r--r-- | src/backend/access/spgist/spgdoinsert.c | 243 |
1 files changed, 125 insertions, 118 deletions
diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c index 21a071ab199..1a17cc467ed 100644 --- a/src/backend/access/spgist/spgdoinsert.c +++ b/src/backend/access/spgist/spgdoinsert.c @@ -16,8 +16,8 @@ #include "postgres.h" #include "access/genam.h" -#include "access/xloginsert.h" #include "access/spgist_private.h" +#include "access/xloginsert.h" #include "miscadmin.h" #include "storage/bufmgr.h" #include "utils/rel.h" @@ -202,25 +202,17 @@ static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew) { - XLogRecData rdata[4]; spgxlogAddLeaf xlrec; - xlrec.node = index->rd_node; - xlrec.blknoLeaf = current->blkno; xlrec.newPage = isNew; xlrec.storesNulls = isNulls; /* these will be filled below as needed */ xlrec.offnumLeaf = InvalidOffsetNumber; xlrec.offnumHeadLeaf = InvalidOffsetNumber; - xlrec.blknoParent = InvalidBlockNumber; xlrec.offnumParent = InvalidOffsetNumber; xlrec.nodeI = 0; - ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0); - ACCEPT_RDATA_DATA(leafTuple, leafTuple->size, 1); - ACCEPT_RDATA_BUFFER(current->buffer, 2); - START_CRIT_SECTION(); if (current->offnum == InvalidOffsetNumber || @@ -237,13 +229,10 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, /* Must update parent's downlink if any */ if (parent->buffer != InvalidBuffer) { - xlrec.blknoParent = parent->blkno; xlrec.offnumParent = parent->offnum; xlrec.nodeI = parent->node; saveNodeLink(index, parent, current->blkno, current->offnum); - - ACCEPT_RDATA_BUFFER(parent->buffer, 3); } } else @@ -303,12 +292,20 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) leafTuple, leafTuple->size); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + if (xlrec.offnumParent != InvalidOffsetNumber) + XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF); PageSetLSN(current->page, recptr); /* update parent only if we actually changed it */ - if (xlrec.blknoParent != InvalidBlockNumber) + if (xlrec.offnumParent != InvalidOffsetNumber) { PageSetLSN(parent->page, recptr); } @@ -399,7 +396,6 @@ moveLeafs(Relation index, SpGistState *state, OffsetNumber *toDelete; OffsetNumber *toInsert; BlockNumber nblkno; - XLogRecData rdata[7]; spgxlogMoveLeafs xlrec; char *leafdata, *leafptr; @@ -455,20 +451,6 @@ moveLeafs(Relation index, SpGistState *state, nblkno = BufferGetBlockNumber(nbuf); Assert(nblkno != current->blkno); - /* prepare WAL info */ - xlrec.node = index->rd_node; - STORE_STATE(state, xlrec.stateSrc); - - xlrec.blknoSrc = current->blkno; - xlrec.blknoDst = nblkno; - xlrec.nMoves = nDelete; - xlrec.replaceDead = replaceDead; - xlrec.storesNulls = isNulls; - - xlrec.blknoParent = parent->blkno; - xlrec.offnumParent = parent->offnum; - xlrec.nodeI = parent->node; - leafdata = leafptr = palloc(size); START_CRIT_SECTION(); @@ -533,15 +515,29 @@ moveLeafs(Relation index, SpGistState *state, { XLogRecPtr recptr; - ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogMoveLeafs, 0); - ACCEPT_RDATA_DATA(toDelete, sizeof(OffsetNumber) * nDelete, 1); - ACCEPT_RDATA_DATA(toInsert, sizeof(OffsetNumber) * nInsert, 2); - ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, 3); - ACCEPT_RDATA_BUFFER(current->buffer, 4); - ACCEPT_RDATA_BUFFER(nbuf, 5); - ACCEPT_RDATA_BUFFER(parent->buffer, 6); + /* prepare WAL info */ + STORE_STATE(state, xlrec.stateSrc); - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata); + xlrec.nMoves = nDelete; + xlrec.replaceDead = replaceDead; + xlrec.storesNulls = isNulls; + + xlrec.offnumParent = parent->offnum; + xlrec.nodeI = parent->node; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs); + XLogRegisterData((char *) toDelete, + sizeof(OffsetNumber) * nDelete); + XLogRegisterData((char *) toInsert, + sizeof(OffsetNumber) * nInsert); + XLogRegisterData((char *) leafdata, leafptr - leafdata); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0)); + XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS); PageSetLSN(current->page, recptr); PageSetLSN(npage, recptr); @@ -701,8 +697,6 @@ doPickSplit(Relation index, SpGistState *state, int currentFreeSpace; int totalLeafSizes; bool allTheSame; - XLogRecData rdata[10]; - int nRdata; spgxlogPickSplit xlrec; char *leafdata, *leafptr; @@ -725,7 +719,6 @@ doPickSplit(Relation index, SpGistState *state, newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n); leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n); - xlrec.node = index->rd_node; STORE_STATE(state, xlrec.stateSrc); /* @@ -971,10 +964,6 @@ doPickSplit(Relation index, SpGistState *state, } /* - * Because a WAL record can't involve more than four buffers, we can only - * afford to deal with two leaf pages in each picksplit action, ie the - * current page and at most one other. - * * The new leaf tuples converted from the existing ones should require the * same or less space, and therefore should all fit onto one page * (although that's not necessarily the current page, since we can't @@ -1108,17 +1097,13 @@ doPickSplit(Relation index, SpGistState *state, } /* Start preparing WAL record */ - xlrec.blknoSrc = current->blkno; - xlrec.blknoDest = InvalidBlockNumber; xlrec.nDelete = 0; xlrec.initSrc = isNew; xlrec.storesNulls = isNulls; + xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno); leafdata = leafptr = (char *) palloc(totalLeafSizes); - ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogPickSplit, 0); - nRdata = 1; - /* Here we begin making the changes to the target pages */ START_CRIT_SECTION(); @@ -1150,12 +1135,6 @@ doPickSplit(Relation index, SpGistState *state, else { xlrec.nDelete = nToDelete; - ACCEPT_RDATA_DATA(toDelete, - sizeof(OffsetNumber) * nToDelete, - nRdata); - nRdata++; - ACCEPT_RDATA_BUFFER(current->buffer, nRdata); - nRdata++; if (!state->isBuild) { @@ -1240,25 +1219,8 @@ doPickSplit(Relation index, SpGistState *state, if (newLeafBuffer != InvalidBuffer) { MarkBufferDirty(newLeafBuffer); - /* also save block number for WAL */ - xlrec.blknoDest = BufferGetBlockNumber(newLeafBuffer); - if (!xlrec.initDest) - { - ACCEPT_RDATA_BUFFER(newLeafBuffer, nRdata); - nRdata++; - } } - xlrec.nInsert = nToInsert; - ACCEPT_RDATA_DATA(toInsert, sizeof(OffsetNumber) * nToInsert, nRdata); - nRdata++; - ACCEPT_RDATA_DATA(leafPageSelect, sizeof(uint8) * nToInsert, nRdata); - nRdata++; - ACCEPT_RDATA_DATA(innerTuple, innerTuple->size, nRdata); - nRdata++; - ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, nRdata); - nRdata++; - /* Remember current buffer, since we're about to change "current" */ saveCurrent = *current; @@ -1276,7 +1238,6 @@ doPickSplit(Relation index, SpGistState *state, current->blkno = parent->blkno; current->buffer = parent->buffer; current->page = parent->page; - xlrec.blknoInner = current->blkno; xlrec.offnumInner = current->offnum = SpGistPageAddNewItem(state, current->page, (Item) innerTuple, innerTuple->size, @@ -1285,14 +1246,11 @@ doPickSplit(Relation index, SpGistState *state, /* * Update parent node link and mark parent page dirty */ - xlrec.blknoParent = parent->blkno; + xlrec.innerIsParent = true; xlrec.offnumParent = parent->offnum; xlrec.nodeI = parent->node; saveNodeLink(index, parent, current->blkno, current->offnum); - ACCEPT_RDATA_BUFFER(parent->buffer, nRdata); - nRdata++; - /* * Update redirection link (in old current buffer) */ @@ -1314,7 +1272,6 @@ doPickSplit(Relation index, SpGistState *state, current->buffer = newInnerBuffer; current->blkno = BufferGetBlockNumber(current->buffer); current->page = BufferGetPage(current->buffer); - xlrec.blknoInner = current->blkno; xlrec.offnumInner = current->offnum = SpGistPageAddNewItem(state, current->page, (Item) innerTuple, innerTuple->size, @@ -1326,16 +1283,11 @@ doPickSplit(Relation index, SpGistState *state, /* * Update parent node link and mark parent page dirty */ - xlrec.blknoParent = parent->blkno; + xlrec.innerIsParent = (parent->buffer == current->buffer); xlrec.offnumParent = parent->offnum; xlrec.nodeI = parent->node; saveNodeLink(index, parent, current->blkno, current->offnum); - ACCEPT_RDATA_BUFFER(current->buffer, nRdata); - nRdata++; - ACCEPT_RDATA_BUFFER(parent->buffer, nRdata); - nRdata++; - /* * Update redirection link (in old current buffer) */ @@ -1357,8 +1309,8 @@ doPickSplit(Relation index, SpGistState *state, SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0)); xlrec.initInner = true; + xlrec.innerIsParent = false; - xlrec.blknoInner = current->blkno; xlrec.offnumInner = current->offnum = PageAddItem(current->page, (Item) innerTuple, innerTuple->size, InvalidOffsetNumber, false, false); @@ -1367,7 +1319,6 @@ doPickSplit(Relation index, SpGistState *state, innerTuple->size); /* No parent link to update, nor redirection to do */ - xlrec.blknoParent = InvalidBlockNumber; xlrec.offnumParent = InvalidOffsetNumber; xlrec.nodeI = 0; @@ -1381,9 +1332,46 @@ doPickSplit(Relation index, SpGistState *state, if (RelationNeedsWAL(index)) { XLogRecPtr recptr; + int flags; + + XLogBeginInsert(); + + xlrec.nInsert = nToInsert; + XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit); + + XLogRegisterData((char *) toDelete, + sizeof(OffsetNumber) * xlrec.nDelete); + XLogRegisterData((char *) toInsert, + sizeof(OffsetNumber) * xlrec.nInsert); + XLogRegisterData((char *) leafPageSelect, + sizeof(uint8) * xlrec.nInsert); + XLogRegisterData((char *) innerTuple, innerTuple->size); + XLogRegisterData(leafdata, leafptr - leafdata); + + flags = REGBUF_STANDARD; + if (xlrec.initSrc) + flags |= REGBUF_WILL_INIT; + if (BufferIsValid(saveCurrent.buffer)) + XLogRegisterBuffer(0, saveCurrent.buffer, flags); + + if (BufferIsValid(newLeafBuffer)) + { + flags = REGBUF_STANDARD; + if (xlrec.initDest) + flags |= REGBUF_WILL_INIT; + XLogRegisterBuffer(1, newLeafBuffer, flags); + } + XLogRegisterBuffer(2, current->buffer, REGBUF_STANDARD); + if (parent->buffer != InvalidBuffer) + { + if (parent->buffer != current->buffer) + XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD); + else + Assert(xlrec.innerIsParent); + } /* Issue the WAL record */ - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT, rdata); + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT); /* Update page LSNs on all affected pages */ if (newLeafBuffer != InvalidBuffer) @@ -1489,7 +1477,6 @@ spgAddNodeAction(Relation index, SpGistState *state, int nodeN, Datum nodeLabel) { SpGistInnerTuple newInnerTuple; - XLogRecData rdata[5]; spgxlogAddNode xlrec; /* Should not be applied to nulls */ @@ -1499,25 +1486,18 @@ spgAddNodeAction(Relation index, SpGistState *state, newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN); /* Prepare WAL record */ - xlrec.node = index->rd_node; STORE_STATE(state, xlrec.stateSrc); - xlrec.blkno = current->blkno; xlrec.offnum = current->offnum; /* we don't fill these unless we need to change the parent downlink */ - xlrec.blknoParent = InvalidBlockNumber; + xlrec.parentBlk = -1; xlrec.offnumParent = InvalidOffsetNumber; xlrec.nodeI = 0; /* we don't fill these unless tuple has to be moved */ - xlrec.blknoNew = InvalidBlockNumber; xlrec.offnumNew = InvalidOffsetNumber; xlrec.newPage = false; - ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0); - ACCEPT_RDATA_DATA(newInnerTuple, newInnerTuple->size, 1); - ACCEPT_RDATA_BUFFER(current->buffer, 2); - if (PageGetExactFreeSpace(current->page) >= newInnerTuple->size - innerTuple->size) { @@ -1539,7 +1519,13 @@ spgAddNodeAction(Relation index, SpGistState *state, { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) newInnerTuple, newInnerTuple->size); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE); PageSetLSN(current->page, recptr); } @@ -1565,7 +1551,6 @@ spgAddNodeAction(Relation index, SpGistState *state, saveCurrent = *current; - xlrec.blknoParent = parent->blkno; xlrec.offnumParent = parent->offnum; xlrec.nodeI = parent->node; @@ -1580,8 +1565,6 @@ spgAddNodeAction(Relation index, SpGistState *state, current->blkno = BufferGetBlockNumber(current->buffer); current->page = BufferGetPage(current->buffer); - xlrec.blknoNew = current->blkno; - /* * Let's just make real sure new current isn't same as old. Right now * that's impossible, but if SpGistGetBuffer ever got smart enough to @@ -1590,17 +1573,19 @@ spgAddNodeAction(Relation index, SpGistState *state, * replay would be subtly wrong, so I think a mere assert isn't enough * here. */ - if (xlrec.blknoNew == xlrec.blkno) + if (current->blkno == saveCurrent.blkno) elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer"); /* * New current and parent buffer will both be modified; but note that * parent buffer could be same as either new or old current. */ - ACCEPT_RDATA_BUFFER(current->buffer, 3); - if (parent->buffer != current->buffer && - parent->buffer != saveCurrent.buffer) - ACCEPT_RDATA_BUFFER(parent->buffer, 4); + if (parent->buffer == saveCurrent.buffer) + xlrec.parentBlk = 0; + else if (parent->buffer == current->buffer) + xlrec.parentBlk = 1; + else + xlrec.parentBlk = 2; START_CRIT_SECTION(); @@ -1647,7 +1632,20 @@ spgAddNodeAction(Relation index, SpGistState *state, { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata); + XLogBeginInsert(); + + /* orig page */ + XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD); + /* new page */ + XLogRegisterBuffer(1, current->buffer, REGBUF_STANDARD); + /* parent page (if different from orig and new) */ + if (xlrec.parentBlk == 2) + XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD); + + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) newInnerTuple, newInnerTuple->size); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE); /* we don't bother to check if any of these are redundant */ PageSetLSN(current->page, recptr); @@ -1682,7 +1680,6 @@ spgSplitNodeAction(Relation index, SpGistState *state, BlockNumber postfixBlkno; OffsetNumber postfixOffset; int i; - XLogRecData rdata[5]; spgxlogSplitTuple xlrec; Buffer newBuffer = InvalidBuffer; @@ -1725,14 +1722,8 @@ spgSplitNodeAction(Relation index, SpGistState *state, postfixTuple->allTheSame = innerTuple->allTheSame; /* prep data for WAL record */ - xlrec.node = index->rd_node; xlrec.newPage = false; - ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0); - ACCEPT_RDATA_DATA(prefixTuple, prefixTuple->size, 1); - ACCEPT_RDATA_DATA(postfixTuple, postfixTuple->size, 2); - ACCEPT_RDATA_BUFFER(current->buffer, 3); - /* * If we can't fit both tuples on the current page, get a new page for the * postfix tuple. In particular, can't split to the root page. @@ -1752,7 +1743,6 @@ spgSplitNodeAction(Relation index, SpGistState *state, GBUF_INNER_PARITY(current->blkno + 1), postfixTuple->size + sizeof(ItemIdData), &xlrec.newPage); - ACCEPT_RDATA_BUFFER(newBuffer, 4); } START_CRIT_SECTION(); @@ -1767,27 +1757,28 @@ spgSplitNodeAction(Relation index, SpGistState *state, if (xlrec.offnumPrefix != current->offnum) elog(ERROR, "failed to add item of size %u to SPGiST index page", prefixTuple->size); - xlrec.blknoPrefix = current->blkno; /* * put postfix tuple into appropriate page */ if (newBuffer == InvalidBuffer) { - xlrec.blknoPostfix = postfixBlkno = current->blkno; + postfixBlkno = current->blkno; xlrec.offnumPostfix = postfixOffset = SpGistPageAddNewItem(state, current->page, (Item) postfixTuple, postfixTuple->size, NULL, false); + xlrec.postfixBlkSame = true; } else { - xlrec.blknoPostfix = postfixBlkno = BufferGetBlockNumber(newBuffer); + postfixBlkno = BufferGetBlockNumber(newBuffer); xlrec.offnumPostfix = postfixOffset = SpGistPageAddNewItem(state, BufferGetPage(newBuffer), (Item) postfixTuple, postfixTuple->size, NULL, false); MarkBufferDirty(newBuffer); + xlrec.postfixBlkSame = false; } /* @@ -1808,7 +1799,23 @@ spgSplitNodeAction(Relation index, SpGistState *state, { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) prefixTuple, prefixTuple->size); + XLogRegisterData((char *) postfixTuple, postfixTuple->size); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + if (newBuffer != InvalidBuffer) + { + int flags; + + flags = REGBUF_STANDARD; + if (xlrec.newPage) + flags |= REGBUF_WILL_INIT; + XLogRegisterBuffer(1, newBuffer, flags); + } + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE); PageSetLSN(current->page, recptr); |