diff options
Diffstat (limited to 'src/backend/access/spgist/spgxlog.c')
-rw-r--r-- | src/backend/access/spgist/spgxlog.c | 335 |
1 files changed, 153 insertions, 182 deletions
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index 920739436ac..ac6d4bd369a 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -71,33 +71,30 @@ addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset) } static void -spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) +spgRedoCreateIndex(XLogReaderState *record) { - RelFileNode *node = (RelFileNode *) XLogRecGetData(record); + XLogRecPtr lsn = record->EndRecPtr; Buffer buffer; Page page; - /* Backup blocks are not used in create_index records */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); - - buffer = XLogReadBuffer(*node, SPGIST_METAPAGE_BLKNO, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 0); + Assert(BufferGetBlockNumber(buffer) == SPGIST_METAPAGE_BLKNO); page = (Page) BufferGetPage(buffer); SpGistInitMetapage(page); PageSetLSN(page, lsn); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); - buffer = XLogReadBuffer(*node, SPGIST_ROOT_BLKNO, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 1); + Assert(BufferGetBlockNumber(buffer) == SPGIST_ROOT_BLKNO); SpGistInitBuffer(buffer, SPGIST_LEAF); page = (Page) BufferGetPage(buffer); PageSetLSN(page, lsn); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); - buffer = XLogReadBuffer(*node, SPGIST_NULL_BLKNO, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 2); + Assert(BufferGetBlockNumber(buffer) == SPGIST_NULL_BLKNO); SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS); page = (Page) BufferGetPage(buffer); PageSetLSN(page, lsn); @@ -106,8 +103,9 @@ spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) +spgRedoAddLeaf(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr; char *leafTuple; @@ -128,15 +126,13 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) */ if (xldata->newPage) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf, true); + buffer = XLogInitBufferForRedo(record, 0); SpGistInitBuffer(buffer, SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 0, - xldata->node, xldata->blknoLeaf, - &buffer); + action = XLogReadBufferForRedo(record, 0, &buffer); if (action == BLK_NEEDS_REDO) { @@ -164,7 +160,8 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) { /* replacing a DEAD tuple */ PageIndexTupleDelete(page, xldata->offnumLeaf); - if (PageAddItem(page, (Item) leafTuple, leafTupleHdr.size, + if (PageAddItem(page, + (Item) leafTuple, leafTupleHdr.size, xldata->offnumLeaf, false, false) != xldata->offnumLeaf) elog(ERROR, "failed to add item of size %u to SPGiST index page", leafTupleHdr.size); @@ -177,13 +174,14 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); /* update parent downlink if necessary */ - if (xldata->blknoParent != InvalidBlockNumber) + if (xldata->offnumParent != InvalidOffsetNumber) { - if (XLogReadBufferForRedo(lsn, record, 1, - xldata->node, xldata->blknoParent, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) { SpGistInnerTuple tuple; + BlockNumber blknoLeaf; + + XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf); page = BufferGetPage(buffer); @@ -191,7 +189,7 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) PageGetItemId(page, xldata->offnumParent)); spgUpdateNodeLink(tuple, xldata->nodeI, - xldata->blknoLeaf, xldata->offnumLeaf); + blknoLeaf, xldata->offnumLeaf); PageSetLSN(page, lsn); MarkBufferDirty(buffer); @@ -202,8 +200,9 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) +spgRedoMoveLeafs(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr; SpGistState state; @@ -213,6 +212,9 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; XLogRedoAction action; + BlockNumber blknoDst; + + XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst); fillFakeState(&state, xldata->stateSrc); @@ -235,15 +237,14 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) /* Insert tuples on the dest page (do first, so redirect is valid) */ if (xldata->newPage) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoDst, true); + buffer = XLogInitBufferForRedo(record, 1); SpGistInitBuffer(buffer, SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 1, - xldata->node, xldata->blknoDst, - &buffer); + action = XLogReadBufferForRedo(record, 1, &buffer); + if (action == BLK_NEEDS_REDO) { int i; @@ -260,7 +261,8 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) * field. */ leafTuple = ptr; - memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData)); + memcpy(&leafTupleHdr, leafTuple, + sizeof(SpGistLeafTupleData)); addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size, toInsert[i]); @@ -274,14 +276,14 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); /* Delete tuples from the source page, inserting a redirection pointer */ - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blknoSrc, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); + spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves, state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT, SPGIST_PLACEHOLDER, - xldata->blknoDst, + blknoDst, toInsert[nInsert - 1]); PageSetLSN(page, lsn); @@ -291,8 +293,7 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); /* And update the parent downlink */ - if (XLogReadBufferForRedo(lsn, record, 2, xldata->node, xldata->blknoParent, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) { SpGistInnerTuple tuple; @@ -302,7 +303,7 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) PageGetItemId(page, xldata->offnumParent)); spgUpdateNodeLink(tuple, xldata->nodeI, - xldata->blknoDst, toInsert[nInsert - 1]); + blknoDst, toInsert[nInsert - 1]); PageSetLSN(page, lsn); MarkBufferDirty(buffer); @@ -312,8 +313,9 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) +spgRedoAddNode(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogAddNode *xldata = (spgxlogAddNode *) ptr; char *innerTuple; @@ -321,7 +323,6 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) SpGistState state; Buffer buffer; Page page; - int bbi; XLogRedoAction action; ptr += sizeof(spgxlogAddNode); @@ -331,17 +332,18 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) fillFakeState(&state, xldata->stateSrc); - if (xldata->blknoNew == InvalidBlockNumber) + if (!XLogRecHasBlockRef(record, 1)) { /* update in place */ - Assert(xldata->blknoParent == InvalidBlockNumber); - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + Assert(xldata->parentBlk == -1); + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); + PageIndexTupleDelete(page, xldata->offnum); if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size, - xldata->offnum, false, false) != xldata->offnum) + xldata->offnum, + false, false) != xldata->offnum) elog(ERROR, "failed to add item of size %u to SPGiST index page", innerTupleHdr.size); @@ -353,30 +355,30 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) } else { + BlockNumber blkno; + BlockNumber blknoNew; + + XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno); + XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew); + /* * In normal operation we would have all three pages (source, dest, * and parent) locked simultaneously; but in WAL replay it should be * safe to update them one at a time, as long as we do it in the right - * order. - * - * The logic here depends on the assumption that blkno != blknoNew, - * else we can't tell which BKP bit goes with which page, and the LSN - * checks could go wrong too. + * order. We must insert the new tuple before replacing the old tuple + * with the redirect tuple. */ - Assert(xldata->blkno != xldata->blknoNew); /* Install new tuple first so redirect is valid */ if (xldata->newPage) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoNew, true); /* AddNode is not used for nulls pages */ + buffer = XLogInitBufferForRedo(record, 1); SpGistInitBuffer(buffer, 0); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 1, - xldata->node, xldata->blknoNew, - &buffer); + action = XLogReadBufferForRedo(record, 1, &buffer); if (action == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); @@ -385,22 +387,26 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) innerTupleHdr.size, xldata->offnumNew); /* - * If parent is in this same page, don't advance LSN; doing so - * would fool us into not applying the parent downlink update - * below. We'll update the LSN when we fix the parent downlink. + * If parent is in this same page, update it now. */ - if (xldata->blknoParent != xldata->blknoNew) + if (xldata->parentBlk == 1) { - PageSetLSN(page, lsn); + SpGistInnerTuple parentTuple; + + parentTuple = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumParent)); + + spgUpdateNodeLink(parentTuple, xldata->nodeI, + blknoNew, xldata->offnumNew); } + PageSetLSN(page, lsn); MarkBufferDirty(buffer); } if (BufferIsValid(buffer)) UnlockReleaseBuffer(buffer); /* Delete old tuple, replacing it with redirect or placeholder tuple */ - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { SpGistDeadTuple dt; @@ -412,11 +418,12 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) InvalidOffsetNumber); else dt = spgFormDeadTuple(&state, SPGIST_REDIRECT, - xldata->blknoNew, + blknoNew, xldata->offnumNew); PageIndexTupleDelete(page, xldata->offnum); - if (PageAddItem(page, (Item) dt, dt->size, xldata->offnum, + if (PageAddItem(page, (Item) dt, dt->size, + xldata->offnum, false, false) != xldata->offnum) elog(ERROR, "failed to add item of size %u to SPGiST index page", dt->size); @@ -427,67 +434,55 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) SpGistPageGetOpaque(page)->nRedirection++; /* - * If parent is in this same page, don't advance LSN; doing so - * would fool us into not applying the parent downlink update - * below. We'll update the LSN when we fix the parent downlink. + * If parent is in this same page, update it now. */ - if (xldata->blknoParent != xldata->blkno) + if (xldata->parentBlk == 0) { - PageSetLSN(page, lsn); + SpGistInnerTuple parentTuple; + + parentTuple = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumParent)); + + spgUpdateNodeLink(parentTuple, xldata->nodeI, + blknoNew, xldata->offnumNew); } + PageSetLSN(page, lsn); MarkBufferDirty(buffer); } if (BufferIsValid(buffer)) UnlockReleaseBuffer(buffer); /* - * Update parent downlink. Since parent could be in either of the - * previous two buffers, it's a bit tricky to determine which BKP bit - * applies. + * Update parent downlink (if we didn't do it as part of the source or + * destination page update already). */ - if (xldata->blknoParent == xldata->blkno) - bbi = 0; - else if (xldata->blknoParent == xldata->blknoNew) - bbi = 1; - else - bbi = 2; - - if (record->xl_info & XLR_BKP_BLOCK(bbi)) + if (xldata->parentBlk == 2) { - if (bbi == 2) /* else we already did it */ - (void) RestoreBackupBlock(lsn, record, bbi, false, false); - action = BLK_RESTORED; - buffer = InvalidBuffer; - } - else - { - action = XLogReadBufferForRedo(lsn, record, bbi, xldata->node, - xldata->blknoParent, &buffer); - Assert(action != BLK_RESTORED); - } - if (action == BLK_NEEDS_REDO) - { - SpGistInnerTuple innerTuple; + if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) + { + SpGistInnerTuple parentTuple; - page = BufferGetPage(buffer); + page = BufferGetPage(buffer); - innerTuple = (SpGistInnerTuple) PageGetItem(page, + parentTuple = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); - spgUpdateNodeLink(innerTuple, xldata->nodeI, - xldata->blknoNew, xldata->offnumNew); + spgUpdateNodeLink(parentTuple, xldata->nodeI, + blknoNew, xldata->offnumNew); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } - if (BufferIsValid(buffer)) - UnlockReleaseBuffer(buffer); } } static void -spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) +spgRedoSplitTuple(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr; char *prefixTuple; @@ -496,6 +491,7 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) SpGistInnerTupleData postfixTupleHdr; Buffer buffer; Page page; + XLogRedoAction action; ptr += sizeof(spgxlogSplitTuple); prefixTuple = ptr; @@ -513,22 +509,17 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) */ /* insert postfix tuple first to avoid dangling link */ - if (xldata->blknoPostfix != xldata->blknoPrefix) + if (!xldata->postfixBlkSame) { - XLogRedoAction action; - if (xldata->newPage) { - buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix, true); + buffer = XLogInitBufferForRedo(record, 1); /* SplitTuple is not used for nulls pages */ SpGistInitBuffer(buffer, 0); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, 1, - xldata->node, xldata->blknoPostfix, - &buffer); - + action = XLogReadBufferForRedo(record, 1, &buffer); if (action == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); @@ -544,18 +535,19 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) } /* now handle the original page */ - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blknoPrefix, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); + PageIndexTupleDelete(page, xldata->offnumPrefix); if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size, xldata->offnumPrefix, false, false) != xldata->offnumPrefix) elog(ERROR, "failed to add item of size %u to SPGiST index page", prefixTupleHdr.size); - if (xldata->blknoPostfix == xldata->blknoPrefix) - addOrReplaceTuple(page, (Item) postfixTuple, postfixTupleHdr.size, + if (xldata->postfixBlkSame) + addOrReplaceTuple(page, (Item) postfixTuple, + postfixTupleHdr.size, xldata->offnumPostfix); PageSetLSN(page, lsn); @@ -566,8 +558,9 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) +spgRedoPickSplit(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr; char *innerTuple; @@ -578,14 +571,16 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) uint8 *leafPageSelect; Buffer srcBuffer; Buffer destBuffer; + Buffer innerBuffer; Page srcPage; Page destPage; - Buffer innerBuffer; Page page; - int bbi; int i; + BlockNumber blknoInner; XLogRedoAction action; + XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner); + fillFakeState(&state, xldata->stateSrc); ptr += SizeOfSpgxlogPickSplit; @@ -603,13 +598,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) /* now ptr points to the list of leaf tuples */ - /* - * It's a bit tricky to identify which pages have been handled as - * full-page images, so we explicitly count each referenced buffer. - */ - bbi = 0; - - if (SpGistBlockIsRoot(xldata->blknoSrc)) + if (xldata->isRootSplit) { /* when splitting root, we touch it only in the guise of new inner */ srcBuffer = InvalidBuffer; @@ -618,8 +607,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) else if (xldata->initSrc) { /* just re-init the source page */ - srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, true); - Assert(BufferIsValid(srcBuffer)); + srcBuffer = XLogInitBufferForRedo(record, 0); srcPage = (Page) BufferGetPage(srcBuffer); SpGistInitBuffer(srcBuffer, @@ -634,9 +622,8 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) * inserting leaf tuples and the new inner tuple, else the added * redirect tuple will be a dangling link.) */ - if (XLogReadBufferForRedo(lsn, record, bbi, - xldata->node, xldata->blknoSrc, - &srcBuffer) == BLK_NEEDS_REDO) + srcPage = NULL; + if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO) { srcPage = BufferGetPage(srcBuffer); @@ -650,7 +637,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) toDelete, xldata->nDelete, SPGIST_REDIRECT, SPGIST_PLACEHOLDER, - xldata->blknoInner, + blknoInner, xldata->offnumInner); else spgPageIndexMultiDelete(&state, srcPage, @@ -662,15 +649,10 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) /* don't update LSN etc till we're done with it */ } - else - { - srcPage = NULL; /* don't do any page updates */ - } - bbi++; } /* try to access dest page if any */ - if (xldata->blknoDest == InvalidBlockNumber) + if (!XLogRecHasBlockRef(record, 1)) { destBuffer = InvalidBuffer; destPage = NULL; @@ -678,8 +660,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) else if (xldata->initDest) { /* just re-init the dest page */ - destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, true); - Assert(BufferIsValid(destBuffer)); + destBuffer = XLogInitBufferForRedo(record, 1); destPage = (Page) BufferGetPage(destBuffer); SpGistInitBuffer(destBuffer, @@ -692,17 +673,10 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) * We could probably release the page lock immediately in the * full-page-image case, but for safety let's hold it till later. */ - if (XLogReadBufferForRedo(lsn, record, bbi, - xldata->node, xldata->blknoDest, - &destBuffer) == BLK_NEEDS_REDO) - { + if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO) destPage = (Page) BufferGetPage(destBuffer); - } else - { destPage = NULL; /* don't do any page updates */ - } - bbi++; } /* restore leaf tuples to src and/or dest page */ @@ -739,14 +713,12 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) /* restore new inner tuple */ if (xldata->initInner) { - innerBuffer = XLogReadBuffer(xldata->node, xldata->blknoInner, true); - SpGistInitBuffer(innerBuffer, - (xldata->storesNulls ? SPGIST_NULLS : 0)); + innerBuffer = XLogInitBufferForRedo(record, 2); + SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0)); action = BLK_NEEDS_REDO; } else - action = XLogReadBufferForRedo(lsn, record, bbi, xldata->node, - xldata->blknoInner, &innerBuffer); + action = XLogReadBufferForRedo(record, 2, &innerBuffer); if (action == BLK_NEEDS_REDO) { @@ -756,14 +728,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) xldata->offnumInner); /* if inner is also parent, update link while we're here */ - if (xldata->blknoInner == xldata->blknoParent) + if (xldata->innerIsParent) { SpGistInnerTuple parent; parent = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); spgUpdateNodeLink(parent, xldata->nodeI, - xldata->blknoInner, xldata->offnumInner); + blknoInner, xldata->offnumInner); } PageSetLSN(page, lsn); @@ -771,7 +743,6 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) } if (BufferIsValid(innerBuffer)) UnlockReleaseBuffer(innerBuffer); - bbi++; /* * Now we can release the leaf-page locks. It's okay to do this before @@ -783,18 +754,11 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(destBuffer); /* update parent downlink, unless we did it above */ - if (xldata->blknoParent == InvalidBlockNumber) - { - /* no parent cause we split the root */ - Assert(SpGistBlockIsRoot(xldata->blknoInner)); - } - else if (xldata->blknoInner != xldata->blknoParent) + if (XLogRecHasBlockRef(record, 3)) { Buffer parentBuffer; - if (XLogReadBufferForRedo(lsn, record, bbi, - xldata->node, xldata->blknoParent, - &parentBuffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO) { SpGistInnerTuple parent; @@ -803,7 +767,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) parent = (SpGistInnerTuple) PageGetItem(page, PageGetItemId(page, xldata->offnumParent)); spgUpdateNodeLink(parent, xldata->nodeI, - xldata->blknoInner, xldata->offnumInner); + blknoInner, xldata->offnumInner); PageSetLSN(page, lsn); MarkBufferDirty(parentBuffer); @@ -811,11 +775,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) if (BufferIsValid(parentBuffer)) UnlockReleaseBuffer(parentBuffer); } + else + Assert(xldata->innerIsParent || xldata->isRootSplit); } static void -spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record) +spgRedoVacuumLeaf(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr; OffsetNumber *toDead; @@ -844,8 +811,7 @@ spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record) ptr += sizeof(OffsetNumber) * xldata->nChain; chainDest = (OffsetNumber *) ptr; - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); @@ -897,8 +863,9 @@ spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record) +spgRedoVacuumRoot(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr; OffsetNumber *toDelete; @@ -907,8 +874,7 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record) toDelete = xldata->offsets; - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); @@ -923,8 +889,9 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record) } static void -spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record) +spgRedoVacuumRedirect(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; char *ptr = XLogRecGetData(record); spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr; OffsetNumber *itemToPlaceholder; @@ -939,12 +906,16 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record) if (InHotStandby) { if (TransactionIdIsValid(xldata->newestRedirectXid)) + { + RelFileNode node; + + XLogRecGetBlockTag(record, 0, &node, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, - xldata->node); + node); + } } - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { Page page = BufferGetPage(buffer); SpGistPageOpaque opaque = SpGistPageGetOpaque(page); @@ -995,40 +966,40 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record) } void -spg_redo(XLogRecPtr lsn, XLogRecord *record) +spg_redo(XLogReaderState *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; MemoryContext oldCxt; oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { case XLOG_SPGIST_CREATE_INDEX: - spgRedoCreateIndex(lsn, record); + spgRedoCreateIndex(record); break; case XLOG_SPGIST_ADD_LEAF: - spgRedoAddLeaf(lsn, record); + spgRedoAddLeaf(record); break; case XLOG_SPGIST_MOVE_LEAFS: - spgRedoMoveLeafs(lsn, record); + spgRedoMoveLeafs(record); break; case XLOG_SPGIST_ADD_NODE: - spgRedoAddNode(lsn, record); + spgRedoAddNode(record); break; case XLOG_SPGIST_SPLIT_TUPLE: - spgRedoSplitTuple(lsn, record); + spgRedoSplitTuple(record); break; case XLOG_SPGIST_PICKSPLIT: - spgRedoPickSplit(lsn, record); + spgRedoPickSplit(record); break; case XLOG_SPGIST_VACUUM_LEAF: - spgRedoVacuumLeaf(lsn, record); + spgRedoVacuumLeaf(record); break; case XLOG_SPGIST_VACUUM_ROOT: - spgRedoVacuumRoot(lsn, record); + spgRedoVacuumRoot(record); break; case XLOG_SPGIST_VACUUM_REDIRECT: - spgRedoVacuumRedirect(lsn, record); + spgRedoVacuumRedirect(record); break; default: elog(PANIC, "spg_redo: unknown op code %u", info); |