diff options
Diffstat (limited to 'src/backend/access/spgist/spgxlog.c')
-rw-r--r-- | src/backend/access/spgist/spgxlog.c | 275 |
1 files changed, 183 insertions, 92 deletions
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index 54e78f18b54..8746b353080 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -76,6 +76,9 @@ spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; + /* Backup blocks are not used in create_index records */ + Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + buffer = XLogReadBuffer(*node, SPGIST_METAPAGE_BLKNO, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); @@ -117,7 +120,14 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) ptr += sizeof(spgxlogAddLeaf); leafTuple = (SpGistLeafTuple) ptr; - if (!(record->xl_info & XLR_BKP_BLOCK_1)) + /* + * In normal operation we would have both current and parent pages locked + * simultaneously; but in WAL replay it should be safe to update the leaf + * page before updating the parent. + */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + (void) RestoreBackupBlock(lsn, record, 0, false, false); + else { buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf, xldata->newPage); @@ -169,8 +179,9 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) } /* update parent downlink if necessary */ - if (xldata->blknoParent != InvalidBlockNumber && - !(record->xl_info & XLR_BKP_BLOCK_2)) + if (record->xl_info & XLR_BKP_BLOCK(1)) + (void) RestoreBackupBlock(lsn, record, 1, false, false); + else if (xldata->blknoParent != InvalidBlockNumber) { buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false); if (BufferIsValid(buffer)) @@ -219,8 +230,16 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) /* now ptr points to the list of leaf tuples */ + /* + * In normal operation we would have all three pages (source, dest, and + * parent) locked simultaneously; but in WAL replay it should be safe to + * update them one at a time, as long as we do it in the right order. + */ + /* Insert tuples on the dest page (do first, so redirect is valid) */ - if (!(record->xl_info & XLR_BKP_BLOCK_2)) + if (record->xl_info & XLR_BKP_BLOCK(1)) + (void) RestoreBackupBlock(lsn, record, 1, false, false); + else { buffer = XLogReadBuffer(xldata->node, xldata->blknoDst, xldata->newPage); @@ -253,7 +272,9 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) } /* Delete tuples from the source page, inserting a redirection pointer */ - if (!(record->xl_info & XLR_BKP_BLOCK_1)) + if (record->xl_info & XLR_BKP_BLOCK(0)) + (void) RestoreBackupBlock(lsn, record, 0, false, false); + else { buffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false); if (BufferIsValid(buffer)) @@ -276,7 +297,9 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) } /* And update the parent downlink */ - if (!(record->xl_info & XLR_BKP_BLOCK_3)) + if (record->xl_info & XLR_BKP_BLOCK(2)) + (void) RestoreBackupBlock(lsn, record, 2, false, false); + else { buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false); if (BufferIsValid(buffer)) @@ -322,7 +345,9 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) { /* update in place */ Assert(xldata->blknoParent == InvalidBlockNumber); - if (!(record->xl_info & XLR_BKP_BLOCK_1)) + if (record->xl_info & XLR_BKP_BLOCK(0)) + (void) RestoreBackupBlock(lsn, record, 0, false, false); + else { buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); if (BufferIsValid(buffer)) @@ -347,8 +372,22 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) } else { + /* + * In normal operation we would have all three pages (source, dest, + * and parent) locked simultaneously; but in WAL replay it should be + * safe to update them one at a time, as long as we do it in the right + * order. + * + * The logic here depends on the assumption that blkno != blknoNew, + * else we can't tell which BKP bit goes with which page, and the LSN + * checks could go wrong too. + */ + Assert(xldata->blkno != xldata->blknoNew); + /* Install new tuple first so redirect is valid */ - if (!(record->xl_info & XLR_BKP_BLOCK_2)) + if (record->xl_info & XLR_BKP_BLOCK(1)) + (void) RestoreBackupBlock(lsn, record, 1, false, false); + else { buffer = XLogReadBuffer(xldata->node, xldata->blknoNew, xldata->newPage); @@ -365,8 +404,17 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) addOrReplaceTuple(page, (Item) innerTuple, innerTuple->size, xldata->offnumNew); - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); + /* + * If parent is in this same page, don't advance LSN; + * doing so would fool us into not applying the parent + * downlink update below. We'll update the LSN when we + * fix the parent downlink. + */ + if (xldata->blknoParent != xldata->blknoNew) + { + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + } MarkBufferDirty(buffer); } UnlockReleaseBuffer(buffer); @@ -374,7 +422,9 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) } /* Delete old tuple, replacing it with redirect or placeholder tuple */ - if (!(record->xl_info & XLR_BKP_BLOCK_1)) + if (record->xl_info & XLR_BKP_BLOCK(0)) + (void) RestoreBackupBlock(lsn, record, 0, false, false); + else { buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); if (BufferIsValid(buffer)) @@ -405,8 +455,17 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) else SpGistPageGetOpaque(page)->nRedirection++; - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); + /* + * If parent is in this same page, don't advance LSN; + * doing so would fool us into not applying the parent + * downlink update below. We'll update the LSN when we + * fix the parent downlink. + */ + if (xldata->blknoParent != xldata->blkno) + { + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + } MarkBufferDirty(buffer); } UnlockReleaseBuffer(buffer); @@ -425,7 +484,12 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) else bbi = 2; - if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi))) + if (record->xl_info & XLR_BKP_BLOCK(bbi)) + { + if (bbi == 2) /* else we already did it */ + (void) RestoreBackupBlock(lsn, record, bbi, false, false); + } + else { buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false); if (BufferIsValid(buffer)) @@ -467,9 +531,16 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) ptr += prefixTuple->size; postfixTuple = (SpGistInnerTuple) ptr; + /* + * In normal operation we would have both pages locked simultaneously; but + * in WAL replay it should be safe to update them one at a time, as long + * as we do it in the right order. + */ + /* insert postfix tuple first to avoid dangling link */ - if (xldata->blknoPostfix != xldata->blknoPrefix && - !(record->xl_info & XLR_BKP_BLOCK_2)) + if (record->xl_info & XLR_BKP_BLOCK(1)) + (void) RestoreBackupBlock(lsn, record, 1, false, false); + else if (xldata->blknoPostfix != xldata->blknoPrefix) { buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix, xldata->newPage); @@ -495,7 +566,9 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) } /* now handle the original page */ - if (!(record->xl_info & XLR_BKP_BLOCK_1)) + if (record->xl_info & XLR_BKP_BLOCK(0)) + (void) RestoreBackupBlock(lsn, record, 0, false, false); + else { buffer = XLogReadBuffer(xldata->node, xldata->blknoPrefix, false); if (BufferIsValid(buffer)) @@ -535,6 +608,8 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) uint8 *leafPageSelect; Buffer srcBuffer; Buffer destBuffer; + Page srcPage; + Page destPage; Page page; int bbi; int i; @@ -563,13 +638,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) { /* when splitting root, we touch it only in the guise of new inner */ srcBuffer = InvalidBuffer; + srcPage = NULL; } else if (xldata->initSrc) { /* just re-init the source page */ srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, true); Assert(BufferIsValid(srcBuffer)); - page = (Page) BufferGetPage(srcBuffer); + srcPage = (Page) BufferGetPage(srcBuffer); SpGistInitBuffer(srcBuffer, SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); @@ -577,14 +653,24 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) } else { - /* delete the specified tuples from source page */ - if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi))) + /* + * Delete the specified tuples from source page. (In case we're in + * Hot Standby, we need to hold lock on the page till we're done + * inserting leaf tuples and the new inner tuple, else the added + * redirect tuple will be a dangling link.) + */ + if (record->xl_info & XLR_BKP_BLOCK(bbi)) + { + srcBuffer = RestoreBackupBlock(lsn, record, bbi, false, true); + srcPage = NULL; /* don't need to do any page updates */ + } + else { srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false); if (BufferIsValid(srcBuffer)) { - page = BufferGetPage(srcBuffer); - if (!XLByteLE(lsn, PageGetLSN(page))) + srcPage = BufferGetPage(srcBuffer); + if (!XLByteLE(lsn, PageGetLSN(srcPage))) { /* * We have it a bit easier here than in doPickSplit(), @@ -592,14 +678,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) * we can inject the correct redirection tuple now. */ if (!state.isBuild) - spgPageIndexMultiDelete(&state, page, + spgPageIndexMultiDelete(&state, srcPage, toDelete, xldata->nDelete, SPGIST_REDIRECT, SPGIST_PLACEHOLDER, xldata->blknoInner, xldata->offnumInner); else - spgPageIndexMultiDelete(&state, page, + spgPageIndexMultiDelete(&state, srcPage, toDelete, xldata->nDelete, SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER, @@ -608,10 +694,12 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) /* don't update LSN etc till we're done with it */ } + else + srcPage = NULL; /* don't do any page updates */ } + else + srcPage = NULL; } - else - srcBuffer = InvalidBuffer; bbi++; } @@ -619,13 +707,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) if (xldata->blknoDest == InvalidBlockNumber) { destBuffer = InvalidBuffer; + destPage = NULL; } else if (xldata->initDest) { /* just re-init the dest page */ destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, true); Assert(BufferIsValid(destBuffer)); - page = (Page) BufferGetPage(destBuffer); + destPage = (Page) BufferGetPage(destBuffer); SpGistInitBuffer(destBuffer, SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); @@ -633,10 +722,27 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) } else { - if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi))) - destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, false); + /* + * We could probably release the page lock immediately in the + * full-page-image case, but for safety let's hold it till later. + */ + if (record->xl_info & XLR_BKP_BLOCK(bbi)) + { + destBuffer = RestoreBackupBlock(lsn, record, bbi, false, true); + destPage = NULL; /* don't need to do any page updates */ + } else - destBuffer = InvalidBuffer; + { + destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, false); + if (BufferIsValid(destBuffer)) + { + destPage = (Page) BufferGetPage(destBuffer); + if (XLByteLE(lsn, PageGetLSN(destPage))) + destPage = NULL; /* don't do any page updates */ + } + else + destPage = NULL; + } bbi++; } @@ -644,47 +750,34 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) for (i = 0; i < xldata->nInsert; i++) { SpGistLeafTuple lt = (SpGistLeafTuple) ptr; - Buffer leafBuffer; ptr += lt->size; - leafBuffer = leafPageSelect[i] ? destBuffer : srcBuffer; - if (!BufferIsValid(leafBuffer)) + page = leafPageSelect[i] ? destPage : srcPage; + if (page == NULL) continue; /* no need to touch this page */ - page = BufferGetPage(leafBuffer); - if (!XLByteLE(lsn, PageGetLSN(page))) - { - addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]); - } + addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]); } - /* Now update src and dest page LSNs */ - if (BufferIsValid(srcBuffer)) + /* Now update src and dest page LSNs if needed */ + if (srcPage != NULL) { - page = BufferGetPage(srcBuffer); - if (!XLByteLE(lsn, PageGetLSN(page))) - { - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(srcBuffer); - } - UnlockReleaseBuffer(srcBuffer); + PageSetLSN(srcPage, lsn); + PageSetTLI(srcPage, ThisTimeLineID); + MarkBufferDirty(srcBuffer); } - if (BufferIsValid(destBuffer)) + if (destPage != NULL) { - page = BufferGetPage(destBuffer); - if (!XLByteLE(lsn, PageGetLSN(page))) - { - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(destBuffer); - } - UnlockReleaseBuffer(destBuffer); + PageSetLSN(destPage, lsn); + PageSetTLI(destPage, ThisTimeLineID); + MarkBufferDirty(destBuffer); } /* restore new inner tuple */ - if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi))) + if (record->xl_info & XLR_BKP_BLOCK(bbi)) + (void) RestoreBackupBlock(lsn, record, bbi, false, false); + else { Buffer buffer = XLogReadBuffer(xldata->node, xldata->blknoInner, xldata->initInner); @@ -722,6 +815,15 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) } bbi++; + /* + * Now we can release the leaf-page locks. It's okay to do this before + * updating the parent downlink. + */ + if (BufferIsValid(srcBuffer)) + UnlockReleaseBuffer(srcBuffer); + if (BufferIsValid(destBuffer)) + UnlockReleaseBuffer(destBuffer); + /* update parent downlink, unless we did it above */ if (xldata->blknoParent == InvalidBlockNumber) { @@ -730,7 +832,9 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) } else if (xldata->blknoInner != xldata->blknoParent) { - if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi))) + if (record->xl_info & XLR_BKP_BLOCK(bbi)) + (void) RestoreBackupBlock(lsn, record, bbi, false, false); + else { Buffer buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false); @@ -788,7 +892,9 @@ spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record) ptr += sizeof(OffsetNumber) * xldata->nChain; chainDest = (OffsetNumber *) ptr; - if (!(record->xl_info & XLR_BKP_BLOCK_1)) + if (record->xl_info & XLR_BKP_BLOCK(0)) + (void) RestoreBackupBlock(lsn, record, 0, false, false); + else { buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); if (BufferIsValid(buffer)) @@ -857,7 +963,9 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record) ptr += sizeof(spgxlogVacuumRoot); toDelete = (OffsetNumber *) ptr; - if (!(record->xl_info & XLR_BKP_BLOCK_1)) + if (record->xl_info & XLR_BKP_BLOCK(0)) + (void) RestoreBackupBlock(lsn, record, 0, false, false); + else { buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); if (BufferIsValid(buffer)) @@ -889,7 +997,20 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record) ptr += sizeof(spgxlogVacuumRedirect); itemToPlaceholder = (OffsetNumber *) ptr; - if (!(record->xl_info & XLR_BKP_BLOCK_1)) + /* + * If any redirection tuples are being removed, make sure there are no + * live Hot Standby transactions that might need to see them. + */ + if (InHotStandby) + { + if (TransactionIdIsValid(xldata->newestRedirectXid)) + ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, + xldata->node); + } + + if (record->xl_info & XLR_BKP_BLOCK(0)) + (void) RestoreBackupBlock(lsn, record, 0, false, false); + else { buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); @@ -954,36 +1075,6 @@ spg_redo(XLogRecPtr lsn, XLogRecord *record) uint8 info = record->xl_info & ~XLR_INFO_MASK; MemoryContext oldCxt; - /* - * If we have any conflict processing to do, it must happen before we - * update the page. - */ - if (InHotStandby) - { - switch (info) - { - case XLOG_SPGIST_VACUUM_REDIRECT: - { - spgxlogVacuumRedirect *xldata = - (spgxlogVacuumRedirect *) XLogRecGetData(record); - - /* - * If any redirection tuples are being removed, make sure - * there are no live Hot Standby transactions that might - * need to see them. - */ - if (TransactionIdIsValid(xldata->newestRedirectXid)) - ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, - xldata->node); - break; - } - default: - break; - } - } - - RestoreBkpBlocks(lsn, record, false); - oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { |