diff options
author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2014-11-20 17:56:26 +0200 |
---|---|---|
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2014-11-20 18:46:41 +0200 |
commit | 2c03216d831160bedd72d45f712601b6f7d03f1c (patch) | |
tree | ab6a03d031ffa605d848b0b7067add15e56e2207 /src/backend/access/spgist/spgxlog.c | |
parent | 8dc626defec23016dd5988208d8704b858b9d21d (diff) | |
download | postgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.tar.gz postgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.zip |
Revamp the WAL record format.
Each WAL record now carries information about the modified relation and
block(s) in a standardized format. That makes it easier to write tools that
need that information, like pg_rewind, prefetching the blocks to speed up
recovery, etc.
There's a whole new API for building WAL records, replacing the XLogRecData
chains used previously. The new API consists of XLogRegister* functions,
which are called for each buffer and chunk of data that is added to the
record. The new API also gives more control over when a full-page image is
written, by passing flags to the XLogRegisterBuffer function.
This also simplifies the XLogReadBufferForRedo() calls. The function can dig
the relation and block number from the WAL record, so they no longer need to
be passed as arguments.
For the convenience of redo routines, XLogReader now disects each WAL record
after reading it, copying the main data part and the per-block data into
MAXALIGNed buffers. The data chunks are not aligned within the WAL record,
but the redo routines can assume that the pointers returned by XLogRecGet*
functions are. Redo routines are now passed the XLogReaderState, which
contains the record in the already-disected format, instead of the plain
XLogRecord.
The new record format also makes the fixed size XLogRecord header smaller,
by removing the xl_len field. The length of the "main data" portion is now
stored at the end of the WAL record, and there's a separate header after
XLogRecord for it. The alignment padding at the end of XLogRecord is also
removed. This compansates for the fact that the new format would otherwise
be more bulky than the old format.
Reviewed by Andres Freund, Amit Kapila, Michael Paquier, Alvaro Herrera,
Fujii Masao.
Diffstat (limited to 'src/backend/access/spgist/spgxlog.c')
-rw-r--r-- | src/backend/access/spgist/spgxlog.c | 335 |
1 files changed, 153 insertions, 182 deletions
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index 920739436ac..ac6d4bd369a 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -71,33 +71,30 @@ addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset) } static void -spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) +spgRedoCreateIndex(XLogReaderState *record) { - RelFileNode *node = (RelFileNode *) XLogRecGetData(record); + XLogRecPtr lsn = record->EndRecPtr; Buffer buffer; Page page; - /* Backup blocks are not used in create_index records */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); - - buffer = XLogReadBuffer(*node, SPGIST_METAPAGE_BLKNO, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 0); + Assert(BufferGetBlockNumber(buffer) == SPGIST_METAPAGE_BLKNO); page = (Page) BufferGetPage(buffer); SpGistInitMetapage(page); PageSetLSN(page, lsn); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); - buffer = XLogReadBuffer(*node, SPGIST_ROOT_BLKNO, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 1); + Assert(BufferGetBlockNumber(buffer) == SPGIST_ROOT_BLKNO); SpGistInitBuffer(buffer, SPGIST_LEAF); page = (Page) BufferGetPage(buffer); PageSetLSN(page, lsn); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); - buffer = XLogReadBuffer(*node, SPGIST_NULL_BLKNO, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 2); + Assert(BufferGetBlockNumber(buffer) == SPGIST_NULL_BLKNO); SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS); page = (Page) BufferGetPage(buffer); PageSetLSN(page, lsn); @@ -106,8 +103,9 @@ spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) +spgRedoAddLeaf(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr; char *leafTuple; @@ -128,15 +126,13 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) */ if (xldata->newPage) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf, true); + buffer = XLogInitBufferForRedo(record, 0); SpGistInitBuffer(buffer, SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 0, - xldata->node, xldata->blknoLeaf, - &buffer); + action = XLogReadBufferForRedo(record, 0, &buffer); if (action == BLK_NEEDS_REDO) { @@ -164,7 +160,8 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) { /* replacing a DEAD tuple */ PageIndexTupleDelete(page, xldata->offnumLeaf); - if (PageAddItem(page, (Item) leafTuple, leafTupleHdr.size, + if (PageAddItem(page, + (Item) leafTuple, leafTupleHdr.size, xldata->offnumLeaf, false, false) != xldata->offnumLeaf) elog(ERROR, "failed to add item of size %u to SPGiST index page", leafTupleHdr.size); @@ -177,13 +174,14 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); /* update parent downlink if necessary */ - if (xldata->blknoParent != InvalidBlockNumber) + if (xldata->offnumParent != InvalidOffsetNumber) { - if (XLogReadBufferForRedo(lsn, record, 1, - xldata->node, xldata->blknoParent, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) { SpGistInnerTuple tuple; + BlockNumber blknoLeaf; + + XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf); page = BufferGetPage(buffer); @@ -191,7 +189,7 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) PageGetItemId(page, xldata->offnumParent)); spgUpdateNodeLink(tuple, xldata->nodeI, - xldata->blknoLeaf, xldata->offnumLeaf); + blknoLeaf, xldata->offnumLeaf); PageSetLSN(page, lsn); MarkBufferDirty(buffer); @@ -202,8 +200,9 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) +spgRedoMoveLeafs(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr; SpGistState state; @@ -213,6 +212,9 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; XLogRedoAction action; + BlockNumber blknoDst; + + XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst); fillFakeState(&state, xldata->stateSrc); @@ -235,15 +237,14 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) /* Insert tuples on the dest page (do first, so redirect is valid) */ if (xldata->newPage) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoDst, true); + buffer = XLogInitBufferForRedo(record, 1); SpGistInitBuffer(buffer, SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 1, - xldata->node, xldata->blknoDst, - &buffer); + action = XLogReadBufferForRedo(record, 1, &buffer); + if (action == BLK_NEEDS_REDO) { int i; @@ -260,7 +261,8 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) * field. */ leafTuple = ptr; - memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData)); + memcpy(&leafTupleHdr, leafTuple, + sizeof(SpGistLeafTupleData)); addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size, toInsert[i]); @@ -274,14 +276,14 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); /* Delete tuples from the source page, inserting a redirection pointer */ - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blknoSrc, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); + spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves, state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT, SPGIST_PLACEHOLDER, - xldata->blknoDst, + blknoDst, toInsert[nInsert - 1]); PageSetLSN(page, lsn); @@ -291,8 +293,7 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); /* And update the parent downlink */ - if (XLogReadBufferForRedo(lsn, record, 2, xldata->node, xldata->blknoParent, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) { SpGistInnerTuple tuple; @@ -302,7 +303,7 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) PageGetItemId(page, xldata->offnumParent)); spgUpdateNodeLink(tuple, xldata->nodeI, - xldata->blknoDst, toInsert[nInsert - 1]); + blknoDst, toInsert[nInsert - 1]); PageSetLSN(page, lsn); MarkBufferDirty(buffer); @@ -312,8 +313,9 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) +spgRedoAddNode(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogAddNode *xldata = (spgxlogAddNode *) ptr; char *innerTuple; @@ -321,7 +323,6 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) SpGistState state; Buffer buffer; Page page; - int bbi; XLogRedoAction action; ptr += sizeof(spgxlogAddNode); @@ -331,17 +332,18 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) fillFakeState(&state, xldata->stateSrc); - if (xldata->blknoNew == InvalidBlockNumber) + if (!XLogRecHasBlockRef(record, 1)) { /* update in place */ - Assert(xldata->blknoParent == InvalidBlockNumber); - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + Assert(xldata->parentBlk == -1); + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); + PageIndexTupleDelete(page, xldata->offnum); if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size, - xldata->offnum, false, false) != xldata->offnum) + xldata->offnum, + false, false) != xldata->offnum) elog(ERROR, "failed to add item of size %u to SPGiST index page", innerTupleHdr.size); @@ -353,30 +355,30 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) } else { + BlockNumber blkno; + BlockNumber blknoNew; + + XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno); + XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew); + /* * In normal operation we would have all three pages (source, dest, * and parent) locked simultaneously; but in WAL replay it should be * safe to update them one at a time, as long as we do it in the right - * order. - * - * The logic here depends on the assumption that blkno != blknoNew, - * else we can't tell which BKP bit goes with which page, and the LSN - * checks could go wrong too. + * order. We must insert the new tuple before replacing the old tuple + * with the redirect tuple. */ - Assert(xldata->blkno != xldata->blknoNew); /* Install new tuple first so redirect is valid */ if (xldata->newPage) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoNew, true); /* AddNode is not used for nulls pages */ + buffer = XLogInitBufferForRedo(record, 1); SpGistInitBuffer(buffer, 0); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 1, - xldata->node, xldata->blknoNew, - &buffer); + action = XLogReadBufferForRedo(record, 1, &buffer); if (action == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); @@ -385,22 +387,26 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) innerTupleHdr.size, xldata->offnumNew); /* - * If parent is in this same page, don't advance LSN; doing so - * would fool us into not applying the parent downlink update - * below. We'll update the LSN when we fix the parent downlink. + * If parent is in this same page, update it now. */ - if (xldata->blknoParent != xldata->blknoNew) + if (xldata->parentBlk == 1) { - PageSetLSN(page, lsn); + SpGistInnerTuple parentTuple; + + parentTuple = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumParent)); + + spgUpdateNodeLink(parentTuple, xldata->nodeI, + blknoNew, xldata->offnumNew); } + PageSetLSN(page, lsn); MarkBufferDirty(buffer); } if (BufferIsValid(buffer)) UnlockReleaseBuffer(buffer); /* Delete old tuple, replacing it with redirect or placeholder tuple */ - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { SpGistDeadTuple dt; @@ -412,11 +418,12 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) InvalidOffsetNumber); else dt = spgFormDeadTuple(&state, SPGIST_REDIRECT, - xldata->blknoNew, + blknoNew, xldata->offnumNew); PageIndexTupleDelete(page, xldata->offnum); - if (PageAddItem(page, (Item) dt, dt->size, xldata->offnum, + if (PageAddItem(page, (Item) dt, dt->size, + xldata->offnum, false, false) != xldata->offnum) elog(ERROR, "failed to add item of size %u to SPGiST index page", dt->size); @@ -427,67 +434,55 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) SpGistPageGetOpaque(page)->nRedirection++; /* - * If parent is in this same page, don't advance LSN; doing so - * would fool us into not applying the parent downlink update - * below. We'll update the LSN when we fix the parent downlink. + * If parent is in this same page, update it now. */ - if (xldata->blknoParent != xldata->blkno) + if (xldata->parentBlk == 0) { - PageSetLSN(page, lsn); + SpGistInnerTuple parentTuple; + + parentTuple = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumParent)); + + spgUpdateNodeLink(parentTuple, xldata->nodeI, + blknoNew, xldata->offnumNew); } + PageSetLSN(page, lsn); MarkBufferDirty(buffer); } if (BufferIsValid(buffer)) UnlockReleaseBuffer(buffer); /* - * Update parent downlink. Since parent could be in either of the - * previous two buffers, it's a bit tricky to determine which BKP bit - * applies. + * Update parent downlink (if we didn't do it as part of the source or + * destination page update already). */ - if (xldata->blknoParent == xldata->blkno) - bbi = 0; - else if (xldata->blknoParent == xldata->blknoNew) - bbi = 1; - else - bbi = 2; - - if (record->xl_info & XLR_BKP_BLOCK(bbi)) + if (xldata->parentBlk == 2) { - if (bbi == 2) /* else we already did it */ - (void) RestoreBackupBlock(lsn, record, bbi, false, false); - action = BLK_RESTORED; - buffer = InvalidBuffer; - } - else - { - action = XLogReadBufferForRedo(lsn, record, bbi, xldata->node, - xldata->blknoParent, &buffer); - Assert(action != BLK_RESTORED); - } - if (action == BLK_NEEDS_REDO) - { - SpGistInnerTuple innerTuple; + if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) + { + SpGistInnerTuple parentTuple; - page = BufferGetPage(buffer); + page = BufferGetPage(buffer); - innerTuple = (SpGistInnerTuple) PageGetItem(page, + parentTuple = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); - spgUpdateNodeLink(innerTuple, xldata->nodeI, - xldata->blknoNew, xldata->offnumNew); + spgUpdateNodeLink(parentTuple, xldata->nodeI, + blknoNew, xldata->offnumNew); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } - if (BufferIsValid(buffer)) - UnlockReleaseBuffer(buffer); } } static void -spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) +spgRedoSplitTuple(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr; char *prefixTuple; @@ -496,6 +491,7 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) SpGistInnerTupleData postfixTupleHdr; Buffer buffer; Page page; + XLogRedoAction action; ptr += sizeof(spgxlogSplitTuple); prefixTuple = ptr; @@ -513,22 +509,17 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) */ /* insert postfix tuple first to avoid dangling link */ - if (xldata->blknoPostfix != xldata->blknoPrefix) + if (!xldata->postfixBlkSame) { - XLogRedoAction action; - if (xldata->newPage) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix, true); + buffer = XLogInitBufferForRedo(record, 1); /* SplitTuple is not used for nulls pages */ SpGistInitBuffer(buffer, 0); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 1, - xldata->node, xldata->blknoPostfix, - &buffer); - + action = XLogReadBufferForRedo(record, 1, &buffer); if (action == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); @@ -544,18 +535,19 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) } /* now handle the original page */ - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blknoPrefix, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); + PageIndexTupleDelete(page, xldata->offnumPrefix); if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size, xldata->offnumPrefix, false, false) != xldata->offnumPrefix) elog(ERROR, "failed to add item of size %u to SPGiST index page", prefixTupleHdr.size); - if (xldata->blknoPostfix == xldata->blknoPrefix) - addOrReplaceTuple(page, (Item) postfixTuple, postfixTupleHdr.size, + if (xldata->postfixBlkSame) + addOrReplaceTuple(page, (Item) postfixTuple, + postfixTupleHdr.size, xldata->offnumPostfix); PageSetLSN(page, lsn); @@ -566,8 +558,9 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) +spgRedoPickSplit(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr; char *innerTuple; @@ -578,14 +571,16 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) uint8 *leafPageSelect; Buffer srcBuffer; Buffer destBuffer; + Buffer innerBuffer; Page srcPage; Page destPage; - Buffer innerBuffer; Page page; - int bbi; int i; + BlockNumber blknoInner; XLogRedoAction action; + XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner); + fillFakeState(&state, xldata->stateSrc); ptr += SizeOfSpgxlogPickSplit; @@ -603,13 +598,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) /* now ptr points to the list of leaf tuples */ - /* - * It's a bit tricky to identify which pages have been handled as - * full-page images, so we explicitly count each referenced buffer. - */ - bbi = 0; - - if (SpGistBlockIsRoot(xldata->blknoSrc)) + if (xldata->isRootSplit) { /* when splitting root, we touch it only in the guise of new inner */ srcBuffer = InvalidBuffer; @@ -618,8 +607,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) else if (xldata->initSrc) { /* just re-init the source page */ - srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, true); - Assert(BufferIsValid(srcBuffer)); + srcBuffer = XLogInitBufferForRedo(record, 0); srcPage = (Page) BufferGetPage(srcBuffer); SpGistInitBuffer(srcBuffer, @@ -634,9 +622,8 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) * inserting leaf tuples and the new inner tuple, else the added * redirect tuple will be a dangling link.) */ - if (XLogReadBufferForRedo(lsn, record, bbi, - xldata->node, xldata->blknoSrc, - &srcBuffer) == BLK_NEEDS_REDO) + srcPage = NULL; + if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO) { srcPage = BufferGetPage(srcBuffer); @@ -650,7 +637,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) toDelete, xldata->nDelete, SPGIST_REDIRECT, SPGIST_PLACEHOLDER, - xldata->blknoInner, + blknoInner, xldata->offnumInner); else spgPageIndexMultiDelete(&state, srcPage, @@ -662,15 +649,10 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) /* don't update LSN etc till we're done with it */ } - else - { - srcPage = NULL; /* don't do any page updates */ - } - bbi++; } /* try to access dest page if any */ - if (xldata->blknoDest == InvalidBlockNumber) + if (!XLogRecHasBlockRef(record, 1)) { destBuffer = InvalidBuffer; destPage = NULL; @@ -678,8 +660,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) else if (xldata->initDest) { /* just re-init the dest page */ - destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, true); - Assert(BufferIsValid(destBuffer)); + destBuffer = XLogInitBufferForRedo(record, 1); destPage = (Page) BufferGetPage(destBuffer); SpGistInitBuffer(destBuffer, @@ -692,17 +673,10 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) * We could probably release the page lock immediately in the * full-page-image case, but for safety let's hold it till later. */ - if (XLogReadBufferForRedo(lsn, record, bbi, - xldata->node, xldata->blknoDest, - &destBuffer) == BLK_NEEDS_REDO) - { + if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO) destPage = (Page) BufferGetPage(destBuffer); - } else - { destPage = NULL; /* don't do any page updates */ - } - bbi++; } /* restore leaf tuples to src and/or dest page */ @@ -739,14 +713,12 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) /* restore new inner tuple */ if (xldata->initInner) { - innerBuffer = XLogReadBuffer(xldata->node, xldata->blknoInner, true); - SpGistInitBuffer(innerBuffer, - (xldata->storesNulls ? SPGIST_NULLS : 0)); + innerBuffer = XLogInitBufferForRedo(record, 2); + SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0)); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, bbi, xldata->node, - xldata->blknoInner, &innerBuffer); + action = XLogReadBufferForRedo(record, 2, &innerBuffer); if (action == BLK_NEEDS_REDO) { @@ -756,14 +728,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) xldata->offnumInner); /* if inner is also parent, update link while we're here */ - if (xldata->blknoInner == xldata->blknoParent) + if (xldata->innerIsParent) { SpGistInnerTuple parent; parent = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); spgUpdateNodeLink(parent, xldata->nodeI, - xldata->blknoInner, xldata->offnumInner); + blknoInner, xldata->offnumInner); } PageSetLSN(page, lsn); @@ -771,7 +743,6 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) } if (BufferIsValid(innerBuffer)) UnlockReleaseBuffer(innerBuffer); - bbi++; /* * Now we can release the leaf-page locks. It's okay to do this before @@ -783,18 +754,11 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(destBuffer); /* update parent downlink, unless we did it above */ - if (xldata->blknoParent == InvalidBlockNumber) - { - /* no parent cause we split the root */ - Assert(SpGistBlockIsRoot(xldata->blknoInner)); - } - else if (xldata->blknoInner != xldata->blknoParent) + if (XLogRecHasBlockRef(record, 3)) { Buffer parentBuffer; - if (XLogReadBufferForRedo(lsn, record, bbi, - xldata->node, xldata->blknoParent, - &parentBuffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO) { SpGistInnerTuple parent; @@ -803,7 +767,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) parent = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); spgUpdateNodeLink(parent, xldata->nodeI, - xldata->blknoInner, xldata->offnumInner); + blknoInner, xldata->offnumInner); PageSetLSN(page, lsn); MarkBufferDirty(parentBuffer); @@ -811,11 +775,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) if (BufferIsValid(parentBuffer)) UnlockReleaseBuffer(parentBuffer); } + else + Assert(xldata->innerIsParent || xldata->isRootSplit); } static void -spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record) +spgRedoVacuumLeaf(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr; OffsetNumber *toDead; @@ -844,8 +811,7 @@ spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record) ptr += sizeof(OffsetNumber) * xldata->nChain; chainDest = (OffsetNumber *) ptr; - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); @@ -897,8 +863,9 @@ spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record) +spgRedoVacuumRoot(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr; OffsetNumber *toDelete; @@ -907,8 +874,7 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record) toDelete = xldata->offsets; - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); @@ -923,8 +889,9 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record) +spgRedoVacuumRedirect(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr; OffsetNumber *itemToPlaceholder; @@ -939,12 +906,16 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record) if (InHotStandby) { if (TransactionIdIsValid(xldata->newestRedirectXid)) + { + RelFileNode node; + + XLogRecGetBlockTag(record, 0, &node, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, - xldata->node); + node); + } } - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { Page page = BufferGetPage(buffer); SpGistPageOpaque opaque = SpGistPageGetOpaque(page); @@ -995,40 +966,40 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record) } void -spg_redo(XLogRecPtr lsn, XLogRecord *record) +spg_redo(XLogReaderState *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; MemoryContext oldCxt; oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { case XLOG_SPGIST_CREATE_INDEX: - spgRedoCreateIndex(lsn, record); + spgRedoCreateIndex(record); break; case XLOG_SPGIST_ADD_LEAF: - spgRedoAddLeaf(lsn, record); + spgRedoAddLeaf(record); break; case XLOG_SPGIST_MOVE_LEAFS: - spgRedoMoveLeafs(lsn, record); + spgRedoMoveLeafs(record); break; case XLOG_SPGIST_ADD_NODE: - spgRedoAddNode(lsn, record); + spgRedoAddNode(record); break; case XLOG_SPGIST_SPLIT_TUPLE: - spgRedoSplitTuple(lsn, record); + spgRedoSplitTuple(record); break; case XLOG_SPGIST_PICKSPLIT: - spgRedoPickSplit(lsn, record); + spgRedoPickSplit(record); break; case XLOG_SPGIST_VACUUM_LEAF: - spgRedoVacuumLeaf(lsn, record); + spgRedoVacuumLeaf(record); break; case XLOG_SPGIST_VACUUM_ROOT: - spgRedoVacuumRoot(lsn, record); + spgRedoVacuumRoot(record); break; case XLOG_SPGIST_VACUUM_REDIRECT: - spgRedoVacuumRedirect(lsn, record); + spgRedoVacuumRedirect(record); break; default: elog(PANIC, "spg_redo: unknown op code %u", info); |