diff options
author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2014-11-20 17:56:26 +0200 |
---|---|---|
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2014-11-20 18:46:41 +0200 |
commit | 2c03216d831160bedd72d45f712601b6f7d03f1c (patch) | |
tree | ab6a03d031ffa605d848b0b7067add15e56e2207 /src/backend/access/spgist | |
parent | 8dc626defec23016dd5988208d8704b858b9d21d (diff) | |
download | postgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.tar.gz postgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.zip |
Revamp the WAL record format.
Each WAL record now carries information about the modified relation and
block(s) in a standardized format. That makes it easier to write tools that
need that information, like pg_rewind, prefetching the blocks to speed up
recovery, etc.
There's a whole new API for building WAL records, replacing the XLogRecData
chains used previously. The new API consists of XLogRegister* functions,
which are called for each buffer and chunk of data that is added to the
record. The new API also gives more control over when a full-page image is
written, by passing flags to the XLogRegisterBuffer function.
This also simplifies the XLogReadBufferForRedo() calls. The function can dig
the relation and block number from the WAL record, so they no longer need to
be passed as arguments.
For the convenience of redo routines, XLogReader now dissects each WAL record
after reading it, copying the main data part and the per-block data into
MAXALIGNed buffers. The data chunks are not aligned within the WAL record,
but the redo routines can assume that the pointers returned by XLogRecGet*
functions are. Redo routines are now passed the XLogReaderState, which
contains the record in the already-dissected format, instead of the plain
XLogRecord.
The new record format also makes the fixed size XLogRecord header smaller,
by removing the xl_len field. The length of the "main data" portion is now
stored at the end of the WAL record, and there's a separate header after
XLogRecord for it. The alignment padding at the end of XLogRecord is also
removed. This compensates for the fact that the new format would otherwise
be more bulky than the old format.
Reviewed by Andres Freund, Amit Kapila, Michael Paquier, Alvaro Herrera,
Fujii Masao.
Diffstat (limited to 'src/backend/access/spgist')
-rw-r--r-- | src/backend/access/spgist/spgdoinsert.c | 243 | ||||
-rw-r--r-- | src/backend/access/spgist/spginsert.c | 17 | ||||
-rw-r--r-- | src/backend/access/spgist/spgvacuum.c | 72 | ||||
-rw-r--r-- | src/backend/access/spgist/spgxlog.c | 335 |
4 files changed, 325 insertions, 342 deletions
diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c index 21a071ab199..1a17cc467ed 100644 --- a/src/backend/access/spgist/spgdoinsert.c +++ b/src/backend/access/spgist/spgdoinsert.c @@ -16,8 +16,8 @@ #include "postgres.h" #include "access/genam.h" -#include "access/xloginsert.h" #include "access/spgist_private.h" +#include "access/xloginsert.h" #include "miscadmin.h" #include "storage/bufmgr.h" #include "utils/rel.h" @@ -202,25 +202,17 @@ static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew) { - XLogRecData rdata[4]; spgxlogAddLeaf xlrec; - xlrec.node = index->rd_node; - xlrec.blknoLeaf = current->blkno; xlrec.newPage = isNew; xlrec.storesNulls = isNulls; /* these will be filled below as needed */ xlrec.offnumLeaf = InvalidOffsetNumber; xlrec.offnumHeadLeaf = InvalidOffsetNumber; - xlrec.blknoParent = InvalidBlockNumber; xlrec.offnumParent = InvalidOffsetNumber; xlrec.nodeI = 0; - ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0); - ACCEPT_RDATA_DATA(leafTuple, leafTuple->size, 1); - ACCEPT_RDATA_BUFFER(current->buffer, 2); - START_CRIT_SECTION(); if (current->offnum == InvalidOffsetNumber || @@ -237,13 +229,10 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, /* Must update parent's downlink if any */ if (parent->buffer != InvalidBuffer) { - xlrec.blknoParent = parent->blkno; xlrec.offnumParent = parent->offnum; xlrec.nodeI = parent->node; saveNodeLink(index, parent, current->blkno, current->offnum); - - ACCEPT_RDATA_BUFFER(parent->buffer, 3); } } else @@ -303,12 +292,20 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) leafTuple, leafTuple->size); + + XLogRegisterBuffer(0, 
current->buffer, REGBUF_STANDARD); + if (xlrec.offnumParent != InvalidOffsetNumber) + XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF); PageSetLSN(current->page, recptr); /* update parent only if we actually changed it */ - if (xlrec.blknoParent != InvalidBlockNumber) + if (xlrec.offnumParent != InvalidOffsetNumber) { PageSetLSN(parent->page, recptr); } @@ -399,7 +396,6 @@ moveLeafs(Relation index, SpGistState *state, OffsetNumber *toDelete; OffsetNumber *toInsert; BlockNumber nblkno; - XLogRecData rdata[7]; spgxlogMoveLeafs xlrec; char *leafdata, *leafptr; @@ -455,20 +451,6 @@ moveLeafs(Relation index, SpGistState *state, nblkno = BufferGetBlockNumber(nbuf); Assert(nblkno != current->blkno); - /* prepare WAL info */ - xlrec.node = index->rd_node; - STORE_STATE(state, xlrec.stateSrc); - - xlrec.blknoSrc = current->blkno; - xlrec.blknoDst = nblkno; - xlrec.nMoves = nDelete; - xlrec.replaceDead = replaceDead; - xlrec.storesNulls = isNulls; - - xlrec.blknoParent = parent->blkno; - xlrec.offnumParent = parent->offnum; - xlrec.nodeI = parent->node; - leafdata = leafptr = palloc(size); START_CRIT_SECTION(); @@ -533,15 +515,29 @@ moveLeafs(Relation index, SpGistState *state, { XLogRecPtr recptr; - ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogMoveLeafs, 0); - ACCEPT_RDATA_DATA(toDelete, sizeof(OffsetNumber) * nDelete, 1); - ACCEPT_RDATA_DATA(toInsert, sizeof(OffsetNumber) * nInsert, 2); - ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, 3); - ACCEPT_RDATA_BUFFER(current->buffer, 4); - ACCEPT_RDATA_BUFFER(nbuf, 5); - ACCEPT_RDATA_BUFFER(parent->buffer, 6); + /* prepare WAL info */ + STORE_STATE(state, xlrec.stateSrc); - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata); + xlrec.nMoves = nDelete; + xlrec.replaceDead = replaceDead; + xlrec.storesNulls = isNulls; + + xlrec.offnumParent = parent->offnum; + xlrec.nodeI = parent->node; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, 
SizeOfSpgxlogMoveLeafs); + XLogRegisterData((char *) toDelete, + sizeof(OffsetNumber) * nDelete); + XLogRegisterData((char *) toInsert, + sizeof(OffsetNumber) * nInsert); + XLogRegisterData((char *) leafdata, leafptr - leafdata); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0)); + XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS); PageSetLSN(current->page, recptr); PageSetLSN(npage, recptr); @@ -701,8 +697,6 @@ doPickSplit(Relation index, SpGistState *state, int currentFreeSpace; int totalLeafSizes; bool allTheSame; - XLogRecData rdata[10]; - int nRdata; spgxlogPickSplit xlrec; char *leafdata, *leafptr; @@ -725,7 +719,6 @@ doPickSplit(Relation index, SpGistState *state, newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n); leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n); - xlrec.node = index->rd_node; STORE_STATE(state, xlrec.stateSrc); /* @@ -971,10 +964,6 @@ doPickSplit(Relation index, SpGistState *state, } /* - * Because a WAL record can't involve more than four buffers, we can only - * afford to deal with two leaf pages in each picksplit action, ie the - * current page and at most one other. 
- * * The new leaf tuples converted from the existing ones should require the * same or less space, and therefore should all fit onto one page * (although that's not necessarily the current page, since we can't @@ -1108,17 +1097,13 @@ doPickSplit(Relation index, SpGistState *state, } /* Start preparing WAL record */ - xlrec.blknoSrc = current->blkno; - xlrec.blknoDest = InvalidBlockNumber; xlrec.nDelete = 0; xlrec.initSrc = isNew; xlrec.storesNulls = isNulls; + xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno); leafdata = leafptr = (char *) palloc(totalLeafSizes); - ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogPickSplit, 0); - nRdata = 1; - /* Here we begin making the changes to the target pages */ START_CRIT_SECTION(); @@ -1150,12 +1135,6 @@ doPickSplit(Relation index, SpGistState *state, else { xlrec.nDelete = nToDelete; - ACCEPT_RDATA_DATA(toDelete, - sizeof(OffsetNumber) * nToDelete, - nRdata); - nRdata++; - ACCEPT_RDATA_BUFFER(current->buffer, nRdata); - nRdata++; if (!state->isBuild) { @@ -1240,25 +1219,8 @@ doPickSplit(Relation index, SpGistState *state, if (newLeafBuffer != InvalidBuffer) { MarkBufferDirty(newLeafBuffer); - /* also save block number for WAL */ - xlrec.blknoDest = BufferGetBlockNumber(newLeafBuffer); - if (!xlrec.initDest) - { - ACCEPT_RDATA_BUFFER(newLeafBuffer, nRdata); - nRdata++; - } } - xlrec.nInsert = nToInsert; - ACCEPT_RDATA_DATA(toInsert, sizeof(OffsetNumber) * nToInsert, nRdata); - nRdata++; - ACCEPT_RDATA_DATA(leafPageSelect, sizeof(uint8) * nToInsert, nRdata); - nRdata++; - ACCEPT_RDATA_DATA(innerTuple, innerTuple->size, nRdata); - nRdata++; - ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, nRdata); - nRdata++; - /* Remember current buffer, since we're about to change "current" */ saveCurrent = *current; @@ -1276,7 +1238,6 @@ doPickSplit(Relation index, SpGistState *state, current->blkno = parent->blkno; current->buffer = parent->buffer; current->page = parent->page; - xlrec.blknoInner = current->blkno; xlrec.offnumInner = 
current->offnum = SpGistPageAddNewItem(state, current->page, (Item) innerTuple, innerTuple->size, @@ -1285,14 +1246,11 @@ doPickSplit(Relation index, SpGistState *state, /* * Update parent node link and mark parent page dirty */ - xlrec.blknoParent = parent->blkno; + xlrec.innerIsParent = true; xlrec.offnumParent = parent->offnum; xlrec.nodeI = parent->node; saveNodeLink(index, parent, current->blkno, current->offnum); - ACCEPT_RDATA_BUFFER(parent->buffer, nRdata); - nRdata++; - /* * Update redirection link (in old current buffer) */ @@ -1314,7 +1272,6 @@ doPickSplit(Relation index, SpGistState *state, current->buffer = newInnerBuffer; current->blkno = BufferGetBlockNumber(current->buffer); current->page = BufferGetPage(current->buffer); - xlrec.blknoInner = current->blkno; xlrec.offnumInner = current->offnum = SpGistPageAddNewItem(state, current->page, (Item) innerTuple, innerTuple->size, @@ -1326,16 +1283,11 @@ doPickSplit(Relation index, SpGistState *state, /* * Update parent node link and mark parent page dirty */ - xlrec.blknoParent = parent->blkno; + xlrec.innerIsParent = (parent->buffer == current->buffer); xlrec.offnumParent = parent->offnum; xlrec.nodeI = parent->node; saveNodeLink(index, parent, current->blkno, current->offnum); - ACCEPT_RDATA_BUFFER(current->buffer, nRdata); - nRdata++; - ACCEPT_RDATA_BUFFER(parent->buffer, nRdata); - nRdata++; - /* * Update redirection link (in old current buffer) */ @@ -1357,8 +1309,8 @@ doPickSplit(Relation index, SpGistState *state, SpGistInitBuffer(current->buffer, (isNulls ? 
SPGIST_NULLS : 0)); xlrec.initInner = true; + xlrec.innerIsParent = false; - xlrec.blknoInner = current->blkno; xlrec.offnumInner = current->offnum = PageAddItem(current->page, (Item) innerTuple, innerTuple->size, InvalidOffsetNumber, false, false); @@ -1367,7 +1319,6 @@ doPickSplit(Relation index, SpGistState *state, innerTuple->size); /* No parent link to update, nor redirection to do */ - xlrec.blknoParent = InvalidBlockNumber; xlrec.offnumParent = InvalidOffsetNumber; xlrec.nodeI = 0; @@ -1381,9 +1332,46 @@ doPickSplit(Relation index, SpGistState *state, if (RelationNeedsWAL(index)) { XLogRecPtr recptr; + int flags; + + XLogBeginInsert(); + + xlrec.nInsert = nToInsert; + XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit); + + XLogRegisterData((char *) toDelete, + sizeof(OffsetNumber) * xlrec.nDelete); + XLogRegisterData((char *) toInsert, + sizeof(OffsetNumber) * xlrec.nInsert); + XLogRegisterData((char *) leafPageSelect, + sizeof(uint8) * xlrec.nInsert); + XLogRegisterData((char *) innerTuple, innerTuple->size); + XLogRegisterData(leafdata, leafptr - leafdata); + + flags = REGBUF_STANDARD; + if (xlrec.initSrc) + flags |= REGBUF_WILL_INIT; + if (BufferIsValid(saveCurrent.buffer)) + XLogRegisterBuffer(0, saveCurrent.buffer, flags); + + if (BufferIsValid(newLeafBuffer)) + { + flags = REGBUF_STANDARD; + if (xlrec.initDest) + flags |= REGBUF_WILL_INIT; + XLogRegisterBuffer(1, newLeafBuffer, flags); + } + XLogRegisterBuffer(2, current->buffer, REGBUF_STANDARD); + if (parent->buffer != InvalidBuffer) + { + if (parent->buffer != current->buffer) + XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD); + else + Assert(xlrec.innerIsParent); + } /* Issue the WAL record */ - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT, rdata); + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT); /* Update page LSNs on all affected pages */ if (newLeafBuffer != InvalidBuffer) @@ -1489,7 +1477,6 @@ spgAddNodeAction(Relation index, SpGistState *state, int nodeN, 
Datum nodeLabel) { SpGistInnerTuple newInnerTuple; - XLogRecData rdata[5]; spgxlogAddNode xlrec; /* Should not be applied to nulls */ @@ -1499,25 +1486,18 @@ spgAddNodeAction(Relation index, SpGistState *state, newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN); /* Prepare WAL record */ - xlrec.node = index->rd_node; STORE_STATE(state, xlrec.stateSrc); - xlrec.blkno = current->blkno; xlrec.offnum = current->offnum; /* we don't fill these unless we need to change the parent downlink */ - xlrec.blknoParent = InvalidBlockNumber; + xlrec.parentBlk = -1; xlrec.offnumParent = InvalidOffsetNumber; xlrec.nodeI = 0; /* we don't fill these unless tuple has to be moved */ - xlrec.blknoNew = InvalidBlockNumber; xlrec.offnumNew = InvalidOffsetNumber; xlrec.newPage = false; - ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0); - ACCEPT_RDATA_DATA(newInnerTuple, newInnerTuple->size, 1); - ACCEPT_RDATA_BUFFER(current->buffer, 2); - if (PageGetExactFreeSpace(current->page) >= newInnerTuple->size - innerTuple->size) { @@ -1539,7 +1519,13 @@ spgAddNodeAction(Relation index, SpGistState *state, { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) newInnerTuple, newInnerTuple->size); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE); PageSetLSN(current->page, recptr); } @@ -1565,7 +1551,6 @@ spgAddNodeAction(Relation index, SpGistState *state, saveCurrent = *current; - xlrec.blknoParent = parent->blkno; xlrec.offnumParent = parent->offnum; xlrec.nodeI = parent->node; @@ -1580,8 +1565,6 @@ spgAddNodeAction(Relation index, SpGistState *state, current->blkno = BufferGetBlockNumber(current->buffer); current->page = BufferGetPage(current->buffer); - xlrec.blknoNew = current->blkno; - /* * Let's just make real sure new current isn't same as old. 
Right now * that's impossible, but if SpGistGetBuffer ever got smart enough to @@ -1590,17 +1573,19 @@ spgAddNodeAction(Relation index, SpGistState *state, * replay would be subtly wrong, so I think a mere assert isn't enough * here. */ - if (xlrec.blknoNew == xlrec.blkno) + if (current->blkno == saveCurrent.blkno) elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer"); /* * New current and parent buffer will both be modified; but note that * parent buffer could be same as either new or old current. */ - ACCEPT_RDATA_BUFFER(current->buffer, 3); - if (parent->buffer != current->buffer && - parent->buffer != saveCurrent.buffer) - ACCEPT_RDATA_BUFFER(parent->buffer, 4); + if (parent->buffer == saveCurrent.buffer) + xlrec.parentBlk = 0; + else if (parent->buffer == current->buffer) + xlrec.parentBlk = 1; + else + xlrec.parentBlk = 2; START_CRIT_SECTION(); @@ -1647,7 +1632,20 @@ spgAddNodeAction(Relation index, SpGistState *state, { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata); + XLogBeginInsert(); + + /* orig page */ + XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD); + /* new page */ + XLogRegisterBuffer(1, current->buffer, REGBUF_STANDARD); + /* parent page (if different from orig and new) */ + if (xlrec.parentBlk == 2) + XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD); + + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) newInnerTuple, newInnerTuple->size); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE); /* we don't bother to check if any of these are redundant */ PageSetLSN(current->page, recptr); @@ -1682,7 +1680,6 @@ spgSplitNodeAction(Relation index, SpGistState *state, BlockNumber postfixBlkno; OffsetNumber postfixOffset; int i; - XLogRecData rdata[5]; spgxlogSplitTuple xlrec; Buffer newBuffer = InvalidBuffer; @@ -1725,14 +1722,8 @@ spgSplitNodeAction(Relation index, SpGistState *state, postfixTuple->allTheSame = innerTuple->allTheSame; /* prep data for 
WAL record */ - xlrec.node = index->rd_node; xlrec.newPage = false; - ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0); - ACCEPT_RDATA_DATA(prefixTuple, prefixTuple->size, 1); - ACCEPT_RDATA_DATA(postfixTuple, postfixTuple->size, 2); - ACCEPT_RDATA_BUFFER(current->buffer, 3); - /* * If we can't fit both tuples on the current page, get a new page for the * postfix tuple. In particular, can't split to the root page. @@ -1752,7 +1743,6 @@ spgSplitNodeAction(Relation index, SpGistState *state, GBUF_INNER_PARITY(current->blkno + 1), postfixTuple->size + sizeof(ItemIdData), &xlrec.newPage); - ACCEPT_RDATA_BUFFER(newBuffer, 4); } START_CRIT_SECTION(); @@ -1767,27 +1757,28 @@ spgSplitNodeAction(Relation index, SpGistState *state, if (xlrec.offnumPrefix != current->offnum) elog(ERROR, "failed to add item of size %u to SPGiST index page", prefixTuple->size); - xlrec.blknoPrefix = current->blkno; /* * put postfix tuple into appropriate page */ if (newBuffer == InvalidBuffer) { - xlrec.blknoPostfix = postfixBlkno = current->blkno; + postfixBlkno = current->blkno; xlrec.offnumPostfix = postfixOffset = SpGistPageAddNewItem(state, current->page, (Item) postfixTuple, postfixTuple->size, NULL, false); + xlrec.postfixBlkSame = true; } else { - xlrec.blknoPostfix = postfixBlkno = BufferGetBlockNumber(newBuffer); + postfixBlkno = BufferGetBlockNumber(newBuffer); xlrec.offnumPostfix = postfixOffset = SpGistPageAddNewItem(state, BufferGetPage(newBuffer), (Item) postfixTuple, postfixTuple->size, NULL, false); MarkBufferDirty(newBuffer); + xlrec.postfixBlkSame = false; } /* @@ -1808,7 +1799,23 @@ spgSplitNodeAction(Relation index, SpGistState *state, { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) prefixTuple, prefixTuple->size); + XLogRegisterData((char *) postfixTuple, postfixTuple->size); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + 
if (newBuffer != InvalidBuffer) + { + int flags; + + flags = REGBUF_STANDARD; + if (xlrec.newPage) + flags |= REGBUF_WILL_INIT; + XLogRegisterBuffer(1, newBuffer, flags); + } + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE); PageSetLSN(current->page, recptr); diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c index e1dfc8e3580..f168ac5c5cf 100644 --- a/src/backend/access/spgist/spginsert.c +++ b/src/backend/access/spgist/spginsert.c @@ -105,15 +105,18 @@ spgbuild(PG_FUNCTION_ARGS) if (RelationNeedsWAL(index)) { XLogRecPtr recptr; - XLogRecData rdata; - /* WAL data is just the relfilenode */ - rdata.data = (char *) &(index->rd_node); - rdata.len = sizeof(RelFileNode); - rdata.buffer = InvalidBuffer; - rdata.next = NULL; + XLogBeginInsert(); - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_CREATE_INDEX, &rdata); + /* + * Replay will re-initialize the pages, so don't take full pages + * images. No other data to log. + */ + XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT); + XLogRegisterBuffer(1, rootbuffer, REGBUF_WILL_INIT | REGBUF_STANDARD); + XLogRegisterBuffer(2, nullbuffer, REGBUF_WILL_INIT | REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_CREATE_INDEX); PageSetLSN(BufferGetPage(metabuffer), recptr); PageSetLSN(BufferGetPage(rootbuffer), recptr); diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c index 2e05d22b749..c95b80b5c7c 100644 --- a/src/backend/access/spgist/spgvacuum.c +++ b/src/backend/access/spgist/spgvacuum.c @@ -127,7 +127,6 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer, { Page page = BufferGetPage(buffer); spgxlogVacuumLeaf xlrec; - XLogRecData rdata[8]; OffsetNumber toDead[MaxIndexTuplesPerPage]; OffsetNumber toPlaceholder[MaxIndexTuplesPerPage]; OffsetNumber moveSrc[MaxIndexTuplesPerPage]; @@ -323,20 +322,6 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer, if (nDeletable != 
xlrec.nDead + xlrec.nPlaceholder + xlrec.nMove) elog(ERROR, "inconsistent counts of deletable tuples"); - /* Prepare WAL record */ - xlrec.node = index->rd_node; - xlrec.blkno = BufferGetBlockNumber(buffer); - STORE_STATE(&bds->spgstate, xlrec.stateSrc); - - ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogVacuumLeaf, 0); - ACCEPT_RDATA_DATA(toDead, sizeof(OffsetNumber) * xlrec.nDead, 1); - ACCEPT_RDATA_DATA(toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder, 2); - ACCEPT_RDATA_DATA(moveSrc, sizeof(OffsetNumber) * xlrec.nMove, 3); - ACCEPT_RDATA_DATA(moveDest, sizeof(OffsetNumber) * xlrec.nMove, 4); - ACCEPT_RDATA_DATA(chainSrc, sizeof(OffsetNumber) * xlrec.nChain, 5); - ACCEPT_RDATA_DATA(chainDest, sizeof(OffsetNumber) * xlrec.nChain, 6); - ACCEPT_RDATA_BUFFER(buffer, 7); - /* Do the updates */ START_CRIT_SECTION(); @@ -389,7 +374,22 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer, { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF, rdata); + XLogBeginInsert(); + + STORE_STATE(&bds->spgstate, xlrec.stateSrc); + + XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumLeaf); + /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */ + XLogRegisterData((char *) toDead, sizeof(OffsetNumber) * xlrec.nDead); + XLogRegisterData((char *) toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder); + XLogRegisterData((char *) moveSrc, sizeof(OffsetNumber) * xlrec.nMove); + XLogRegisterData((char *) moveDest, sizeof(OffsetNumber) * xlrec.nMove); + XLogRegisterData((char *) chainSrc, sizeof(OffsetNumber) * xlrec.nChain); + XLogRegisterData((char *) chainDest, sizeof(OffsetNumber) * xlrec.nChain); + + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF); PageSetLSN(page, recptr); } @@ -407,12 +407,10 @@ vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer) { Page page = BufferGetPage(buffer); spgxlogVacuumRoot xlrec; - XLogRecData rdata[3]; 
OffsetNumber toDelete[MaxIndexTuplesPerPage]; OffsetNumber i, max = PageGetMaxOffsetNumber(page); - xlrec.blkno = BufferGetBlockNumber(buffer); xlrec.nDelete = 0; /* Scan page, identify tuples to delete, accumulate stats */ @@ -448,15 +446,6 @@ vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer) if (xlrec.nDelete == 0) return; /* nothing more to do */ - /* Prepare WAL record */ - xlrec.node = index->rd_node; - STORE_STATE(&bds->spgstate, xlrec.stateSrc); - - ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogVacuumRoot, 0); - /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */ - ACCEPT_RDATA_DATA(toDelete, sizeof(OffsetNumber) * xlrec.nDelete, 1); - ACCEPT_RDATA_BUFFER(buffer, 2); - /* Do the update */ START_CRIT_SECTION(); @@ -469,7 +458,19 @@ vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer) { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT, rdata); + XLogBeginInsert(); + + /* Prepare WAL record */ + STORE_STATE(&bds->spgstate, xlrec.stateSrc); + + XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRoot); + /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */ + XLogRegisterData((char *) toDelete, + sizeof(OffsetNumber) * xlrec.nDelete); + + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT); PageSetLSN(page, recptr); } @@ -499,10 +500,7 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer) OffsetNumber itemToPlaceholder[MaxIndexTuplesPerPage]; OffsetNumber itemnos[MaxIndexTuplesPerPage]; spgxlogVacuumRedirect xlrec; - XLogRecData rdata[3]; - xlrec.node = index->rd_node; - xlrec.blkno = BufferGetBlockNumber(buffer); xlrec.nToPlaceholder = 0; xlrec.newestRedirectXid = InvalidTransactionId; @@ -585,11 +583,15 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer) { XLogRecPtr recptr; - ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogVacuumRedirect, 0); - ACCEPT_RDATA_DATA(itemToPlaceholder, sizeof(OffsetNumber) * 
xlrec.nToPlaceholder, 1); - ACCEPT_RDATA_BUFFER(buffer, 2); + XLogBeginInsert(); + + XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRedirect); + XLogRegisterData((char *) itemToPlaceholder, + sizeof(OffsetNumber) * xlrec.nToPlaceholder); + + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT, rdata); + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT); PageSetLSN(page, recptr); } diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index 920739436ac..ac6d4bd369a 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -71,33 +71,30 @@ addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset) } static void -spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) +spgRedoCreateIndex(XLogReaderState *record) { - RelFileNode *node = (RelFileNode *) XLogRecGetData(record); + XLogRecPtr lsn = record->EndRecPtr; Buffer buffer; Page page; - /* Backup blocks are not used in create_index records */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); - - buffer = XLogReadBuffer(*node, SPGIST_METAPAGE_BLKNO, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 0); + Assert(BufferGetBlockNumber(buffer) == SPGIST_METAPAGE_BLKNO); page = (Page) BufferGetPage(buffer); SpGistInitMetapage(page); PageSetLSN(page, lsn); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); - buffer = XLogReadBuffer(*node, SPGIST_ROOT_BLKNO, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 1); + Assert(BufferGetBlockNumber(buffer) == SPGIST_ROOT_BLKNO); SpGistInitBuffer(buffer, SPGIST_LEAF); page = (Page) BufferGetPage(buffer); PageSetLSN(page, lsn); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); - buffer = XLogReadBuffer(*node, SPGIST_NULL_BLKNO, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 2); + Assert(BufferGetBlockNumber(buffer) == 
SPGIST_NULL_BLKNO); SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS); page = (Page) BufferGetPage(buffer); PageSetLSN(page, lsn); @@ -106,8 +103,9 @@ spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) +spgRedoAddLeaf(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr; char *leafTuple; @@ -128,15 +126,13 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) */ if (xldata->newPage) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf, true); + buffer = XLogInitBufferForRedo(record, 0); SpGistInitBuffer(buffer, SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 0, - xldata->node, xldata->blknoLeaf, - &buffer); + action = XLogReadBufferForRedo(record, 0, &buffer); if (action == BLK_NEEDS_REDO) { @@ -164,7 +160,8 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) { /* replacing a DEAD tuple */ PageIndexTupleDelete(page, xldata->offnumLeaf); - if (PageAddItem(page, (Item) leafTuple, leafTupleHdr.size, + if (PageAddItem(page, + (Item) leafTuple, leafTupleHdr.size, xldata->offnumLeaf, false, false) != xldata->offnumLeaf) elog(ERROR, "failed to add item of size %u to SPGiST index page", leafTupleHdr.size); @@ -177,13 +174,14 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); /* update parent downlink if necessary */ - if (xldata->blknoParent != InvalidBlockNumber) + if (xldata->offnumParent != InvalidOffsetNumber) { - if (XLogReadBufferForRedo(lsn, record, 1, - xldata->node, xldata->blknoParent, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) { SpGistInnerTuple tuple; + BlockNumber blknoLeaf; + + XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf); page = BufferGetPage(buffer); @@ -191,7 +189,7 @@ spgRedoAddLeaf(XLogRecPtr lsn, 
XLogRecord *record) PageGetItemId(page, xldata->offnumParent)); spgUpdateNodeLink(tuple, xldata->nodeI, - xldata->blknoLeaf, xldata->offnumLeaf); + blknoLeaf, xldata->offnumLeaf); PageSetLSN(page, lsn); MarkBufferDirty(buffer); @@ -202,8 +200,9 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) +spgRedoMoveLeafs(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr; SpGistState state; @@ -213,6 +212,9 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; XLogRedoAction action; + BlockNumber blknoDst; + + XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst); fillFakeState(&state, xldata->stateSrc); @@ -235,15 +237,14 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) /* Insert tuples on the dest page (do first, so redirect is valid) */ if (xldata->newPage) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoDst, true); + buffer = XLogInitBufferForRedo(record, 1); SpGistInitBuffer(buffer, SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 1, - xldata->node, xldata->blknoDst, - &buffer); + action = XLogReadBufferForRedo(record, 1, &buffer); + if (action == BLK_NEEDS_REDO) { int i; @@ -260,7 +261,8 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) * field. 
*/ leafTuple = ptr; - memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData)); + memcpy(&leafTupleHdr, leafTuple, + sizeof(SpGistLeafTupleData)); addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size, toInsert[i]); @@ -274,14 +276,14 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); /* Delete tuples from the source page, inserting a redirection pointer */ - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blknoSrc, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); + spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves, state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT, SPGIST_PLACEHOLDER, - xldata->blknoDst, + blknoDst, toInsert[nInsert - 1]); PageSetLSN(page, lsn); @@ -291,8 +293,7 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); /* And update the parent downlink */ - if (XLogReadBufferForRedo(lsn, record, 2, xldata->node, xldata->blknoParent, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) { SpGistInnerTuple tuple; @@ -302,7 +303,7 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) PageGetItemId(page, xldata->offnumParent)); spgUpdateNodeLink(tuple, xldata->nodeI, - xldata->blknoDst, toInsert[nInsert - 1]); + blknoDst, toInsert[nInsert - 1]); PageSetLSN(page, lsn); MarkBufferDirty(buffer); @@ -312,8 +313,9 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) +spgRedoAddNode(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogAddNode *xldata = (spgxlogAddNode *) ptr; char *innerTuple; @@ -321,7 +323,6 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) SpGistState state; Buffer buffer; Page page; - int bbi; XLogRedoAction action; ptr += sizeof(spgxlogAddNode); @@ -331,17 +332,18 @@ 
spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) fillFakeState(&state, xldata->stateSrc); - if (xldata->blknoNew == InvalidBlockNumber) + if (!XLogRecHasBlockRef(record, 1)) { /* update in place */ - Assert(xldata->blknoParent == InvalidBlockNumber); - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + Assert(xldata->parentBlk == -1); + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); + PageIndexTupleDelete(page, xldata->offnum); if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size, - xldata->offnum, false, false) != xldata->offnum) + xldata->offnum, + false, false) != xldata->offnum) elog(ERROR, "failed to add item of size %u to SPGiST index page", innerTupleHdr.size); @@ -353,30 +355,30 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) } else { + BlockNumber blkno; + BlockNumber blknoNew; + + XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno); + XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew); + /* * In normal operation we would have all three pages (source, dest, * and parent) locked simultaneously; but in WAL replay it should be * safe to update them one at a time, as long as we do it in the right - * order. - * - * The logic here depends on the assumption that blkno != blknoNew, - * else we can't tell which BKP bit goes with which page, and the LSN - * checks could go wrong too. + * order. We must insert the new tuple before replacing the old tuple + * with the redirect tuple. 
*/ - Assert(xldata->blkno != xldata->blknoNew); /* Install new tuple first so redirect is valid */ if (xldata->newPage) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoNew, true); /* AddNode is not used for nulls pages */ + buffer = XLogInitBufferForRedo(record, 1); SpGistInitBuffer(buffer, 0); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 1, - xldata->node, xldata->blknoNew, - &buffer); + action = XLogReadBufferForRedo(record, 1, &buffer); if (action == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); @@ -385,22 +387,26 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) innerTupleHdr.size, xldata->offnumNew); /* - * If parent is in this same page, don't advance LSN; doing so - * would fool us into not applying the parent downlink update - * below. We'll update the LSN when we fix the parent downlink. + * If parent is in this same page, update it now. */ - if (xldata->blknoParent != xldata->blknoNew) + if (xldata->parentBlk == 1) { - PageSetLSN(page, lsn); + SpGistInnerTuple parentTuple; + + parentTuple = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumParent)); + + spgUpdateNodeLink(parentTuple, xldata->nodeI, + blknoNew, xldata->offnumNew); } + PageSetLSN(page, lsn); MarkBufferDirty(buffer); } if (BufferIsValid(buffer)) UnlockReleaseBuffer(buffer); /* Delete old tuple, replacing it with redirect or placeholder tuple */ - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { SpGistDeadTuple dt; @@ -412,11 +418,12 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) InvalidOffsetNumber); else dt = spgFormDeadTuple(&state, SPGIST_REDIRECT, - xldata->blknoNew, + blknoNew, xldata->offnumNew); PageIndexTupleDelete(page, xldata->offnum); - if (PageAddItem(page, (Item) dt, dt->size, xldata->offnum, + if (PageAddItem(page, (Item) dt, dt->size, + xldata->offnum, false, false) != 
xldata->offnum) elog(ERROR, "failed to add item of size %u to SPGiST index page", dt->size); @@ -427,67 +434,55 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) SpGistPageGetOpaque(page)->nRedirection++; /* - * If parent is in this same page, don't advance LSN; doing so - * would fool us into not applying the parent downlink update - * below. We'll update the LSN when we fix the parent downlink. + * If parent is in this same page, update it now. */ - if (xldata->blknoParent != xldata->blkno) + if (xldata->parentBlk == 0) { - PageSetLSN(page, lsn); + SpGistInnerTuple parentTuple; + + parentTuple = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumParent)); + + spgUpdateNodeLink(parentTuple, xldata->nodeI, + blknoNew, xldata->offnumNew); } + PageSetLSN(page, lsn); MarkBufferDirty(buffer); } if (BufferIsValid(buffer)) UnlockReleaseBuffer(buffer); /* - * Update parent downlink. Since parent could be in either of the - * previous two buffers, it's a bit tricky to determine which BKP bit - * applies. + * Update parent downlink (if we didn't do it as part of the source or + * destination page update already). 
*/ - if (xldata->blknoParent == xldata->blkno) - bbi = 0; - else if (xldata->blknoParent == xldata->blknoNew) - bbi = 1; - else - bbi = 2; - - if (record->xl_info & XLR_BKP_BLOCK(bbi)) + if (xldata->parentBlk == 2) { - if (bbi == 2) /* else we already did it */ - (void) RestoreBackupBlock(lsn, record, bbi, false, false); - action = BLK_RESTORED; - buffer = InvalidBuffer; - } - else - { - action = XLogReadBufferForRedo(lsn, record, bbi, xldata->node, - xldata->blknoParent, &buffer); - Assert(action != BLK_RESTORED); - } - if (action == BLK_NEEDS_REDO) - { - SpGistInnerTuple innerTuple; + if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) + { + SpGistInnerTuple parentTuple; - page = BufferGetPage(buffer); + page = BufferGetPage(buffer); - innerTuple = (SpGistInnerTuple) PageGetItem(page, + parentTuple = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); - spgUpdateNodeLink(innerTuple, xldata->nodeI, - xldata->blknoNew, xldata->offnumNew); + spgUpdateNodeLink(parentTuple, xldata->nodeI, + blknoNew, xldata->offnumNew); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } - if (BufferIsValid(buffer)) - UnlockReleaseBuffer(buffer); } } static void -spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) +spgRedoSplitTuple(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr; char *prefixTuple; @@ -496,6 +491,7 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) SpGistInnerTupleData postfixTupleHdr; Buffer buffer; Page page; + XLogRedoAction action; ptr += sizeof(spgxlogSplitTuple); prefixTuple = ptr; @@ -513,22 +509,17 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) */ /* insert postfix tuple first to avoid dangling link */ - if (xldata->blknoPostfix != xldata->blknoPrefix) + if 
(!xldata->postfixBlkSame) { - XLogRedoAction action; - if (xldata->newPage) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix, true); + buffer = XLogInitBufferForRedo(record, 1); /* SplitTuple is not used for nulls pages */ SpGistInitBuffer(buffer, 0); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 1, - xldata->node, xldata->blknoPostfix, - &buffer); - + action = XLogReadBufferForRedo(record, 1, &buffer); if (action == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); @@ -544,18 +535,19 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) } /* now handle the original page */ - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blknoPrefix, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); + PageIndexTupleDelete(page, xldata->offnumPrefix); if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size, xldata->offnumPrefix, false, false) != xldata->offnumPrefix) elog(ERROR, "failed to add item of size %u to SPGiST index page", prefixTupleHdr.size); - if (xldata->blknoPostfix == xldata->blknoPrefix) - addOrReplaceTuple(page, (Item) postfixTuple, postfixTupleHdr.size, + if (xldata->postfixBlkSame) + addOrReplaceTuple(page, (Item) postfixTuple, + postfixTupleHdr.size, xldata->offnumPostfix); PageSetLSN(page, lsn); @@ -566,8 +558,9 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) +spgRedoPickSplit(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr; char *innerTuple; @@ -578,14 +571,16 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) uint8 *leafPageSelect; Buffer srcBuffer; Buffer destBuffer; + Buffer innerBuffer; Page srcPage; Page destPage; - Buffer innerBuffer; Page page; - int bbi; int i; + BlockNumber blknoInner; XLogRedoAction action; + 
XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner); + fillFakeState(&state, xldata->stateSrc); ptr += SizeOfSpgxlogPickSplit; @@ -603,13 +598,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) /* now ptr points to the list of leaf tuples */ - /* - * It's a bit tricky to identify which pages have been handled as - * full-page images, so we explicitly count each referenced buffer. - */ - bbi = 0; - - if (SpGistBlockIsRoot(xldata->blknoSrc)) + if (xldata->isRootSplit) { /* when splitting root, we touch it only in the guise of new inner */ srcBuffer = InvalidBuffer; @@ -618,8 +607,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) else if (xldata->initSrc) { /* just re-init the source page */ - srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, true); - Assert(BufferIsValid(srcBuffer)); + srcBuffer = XLogInitBufferForRedo(record, 0); srcPage = (Page) BufferGetPage(srcBuffer); SpGistInitBuffer(srcBuffer, @@ -634,9 +622,8 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) * inserting leaf tuples and the new inner tuple, else the added * redirect tuple will be a dangling link.) 
*/ - if (XLogReadBufferForRedo(lsn, record, bbi, - xldata->node, xldata->blknoSrc, - &srcBuffer) == BLK_NEEDS_REDO) + srcPage = NULL; + if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO) { srcPage = BufferGetPage(srcBuffer); @@ -650,7 +637,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) toDelete, xldata->nDelete, SPGIST_REDIRECT, SPGIST_PLACEHOLDER, - xldata->blknoInner, + blknoInner, xldata->offnumInner); else spgPageIndexMultiDelete(&state, srcPage, @@ -662,15 +649,10 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) /* don't update LSN etc till we're done with it */ } - else - { - srcPage = NULL; /* don't do any page updates */ - } - bbi++; } /* try to access dest page if any */ - if (xldata->blknoDest == InvalidBlockNumber) + if (!XLogRecHasBlockRef(record, 1)) { destBuffer = InvalidBuffer; destPage = NULL; @@ -678,8 +660,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) else if (xldata->initDest) { /* just re-init the dest page */ - destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, true); - Assert(BufferIsValid(destBuffer)); + destBuffer = XLogInitBufferForRedo(record, 1); destPage = (Page) BufferGetPage(destBuffer); SpGistInitBuffer(destBuffer, @@ -692,17 +673,10 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) * We could probably release the page lock immediately in the * full-page-image case, but for safety let's hold it till later. 
*/ - if (XLogReadBufferForRedo(lsn, record, bbi, - xldata->node, xldata->blknoDest, - &destBuffer) == BLK_NEEDS_REDO) - { + if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO) destPage = (Page) BufferGetPage(destBuffer); - } else - { destPage = NULL; /* don't do any page updates */ - } - bbi++; } /* restore leaf tuples to src and/or dest page */ @@ -739,14 +713,12 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) /* restore new inner tuple */ if (xldata->initInner) { - innerBuffer = XLogReadBuffer(xldata->node, xldata->blknoInner, true); - SpGistInitBuffer(innerBuffer, - (xldata->storesNulls ? SPGIST_NULLS : 0)); + innerBuffer = XLogInitBufferForRedo(record, 2); + SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0)); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, bbi, xldata->node, - xldata->blknoInner, &innerBuffer); + action = XLogReadBufferForRedo(record, 2, &innerBuffer); if (action == BLK_NEEDS_REDO) { @@ -756,14 +728,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) xldata->offnumInner); /* if inner is also parent, update link while we're here */ - if (xldata->blknoInner == xldata->blknoParent) + if (xldata->innerIsParent) { SpGistInnerTuple parent; parent = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); spgUpdateNodeLink(parent, xldata->nodeI, - xldata->blknoInner, xldata->offnumInner); + blknoInner, xldata->offnumInner); } PageSetLSN(page, lsn); @@ -771,7 +743,6 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) } if (BufferIsValid(innerBuffer)) UnlockReleaseBuffer(innerBuffer); - bbi++; /* * Now we can release the leaf-page locks. 
It's okay to do this before @@ -783,18 +754,11 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(destBuffer); /* update parent downlink, unless we did it above */ - if (xldata->blknoParent == InvalidBlockNumber) - { - /* no parent cause we split the root */ - Assert(SpGistBlockIsRoot(xldata->blknoInner)); - } - else if (xldata->blknoInner != xldata->blknoParent) + if (XLogRecHasBlockRef(record, 3)) { Buffer parentBuffer; - if (XLogReadBufferForRedo(lsn, record, bbi, - xldata->node, xldata->blknoParent, - &parentBuffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO) { SpGistInnerTuple parent; @@ -803,7 +767,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) parent = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); spgUpdateNodeLink(parent, xldata->nodeI, - xldata->blknoInner, xldata->offnumInner); + blknoInner, xldata->offnumInner); PageSetLSN(page, lsn); MarkBufferDirty(parentBuffer); @@ -811,11 +775,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) if (BufferIsValid(parentBuffer)) UnlockReleaseBuffer(parentBuffer); } + else + Assert(xldata->innerIsParent || xldata->isRootSplit); } static void -spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record) +spgRedoVacuumLeaf(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr; OffsetNumber *toDead; @@ -844,8 +811,7 @@ spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record) ptr += sizeof(OffsetNumber) * xldata->nChain; chainDest = (OffsetNumber *) ptr; - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); @@ -897,8 +863,9 @@ spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record) 
+spgRedoVacuumRoot(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr; OffsetNumber *toDelete; @@ -907,8 +874,7 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record) toDelete = xldata->offsets; - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); @@ -923,8 +889,9 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record) +spgRedoVacuumRedirect(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr; OffsetNumber *itemToPlaceholder; @@ -939,12 +906,16 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record) if (InHotStandby) { if (TransactionIdIsValid(xldata->newestRedirectXid)) + { + RelFileNode node; + + XLogRecGetBlockTag(record, 0, &node, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, - xldata->node); + node); + } } - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { Page page = BufferGetPage(buffer); SpGistPageOpaque opaque = SpGistPageGetOpaque(page); @@ -995,40 +966,40 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record) } void -spg_redo(XLogRecPtr lsn, XLogRecord *record) +spg_redo(XLogReaderState *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; MemoryContext oldCxt; oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { case XLOG_SPGIST_CREATE_INDEX: - spgRedoCreateIndex(lsn, record); + spgRedoCreateIndex(record); break; case XLOG_SPGIST_ADD_LEAF: - spgRedoAddLeaf(lsn, record); + spgRedoAddLeaf(record); 
break; case XLOG_SPGIST_MOVE_LEAFS: - spgRedoMoveLeafs(lsn, record); + spgRedoMoveLeafs(record); break; case XLOG_SPGIST_ADD_NODE: - spgRedoAddNode(lsn, record); + spgRedoAddNode(record); break; case XLOG_SPGIST_SPLIT_TUPLE: - spgRedoSplitTuple(lsn, record); + spgRedoSplitTuple(record); break; case XLOG_SPGIST_PICKSPLIT: - spgRedoPickSplit(lsn, record); + spgRedoPickSplit(record); break; case XLOG_SPGIST_VACUUM_LEAF: - spgRedoVacuumLeaf(lsn, record); + spgRedoVacuumLeaf(record); break; case XLOG_SPGIST_VACUUM_ROOT: - spgRedoVacuumRoot(lsn, record); + spgRedoVacuumRoot(record); break; case XLOG_SPGIST_VACUUM_REDIRECT: - spgRedoVacuumRedirect(lsn, record); + spgRedoVacuumRedirect(record); break; default: elog(PANIC, "spg_redo: unknown op code %u", info); |