about | summary | refs | log | tree | commit | diff
path: root/src/backend/access/spgist/spgxlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/spgist/spgxlog.c')
-rw-r--r-- src/backend/access/spgist/spgxlog.c | 275
1 files changed, 183 insertions, 92 deletions
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index 54e78f18b54..8746b353080 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -76,6 +76,9 @@ spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
+ /* Backup blocks are not used in create_index records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
buffer = XLogReadBuffer(*node, SPGIST_METAPAGE_BLKNO, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
@@ -117,7 +120,14 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
ptr += sizeof(spgxlogAddLeaf);
leafTuple = (SpGistLeafTuple) ptr;
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ /*
+ * In normal operation we would have both current and parent pages locked
+ * simultaneously; but in WAL replay it should be safe to update the leaf
+ * page before updating the parent.
+ */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf,
xldata->newPage);
@@ -169,8 +179,9 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
}
/* update parent downlink if necessary */
- if (xldata->blknoParent != InvalidBlockNumber &&
- !(record->xl_info & XLR_BKP_BLOCK_2))
+ if (record->xl_info & XLR_BKP_BLOCK(1))
+ (void) RestoreBackupBlock(lsn, record, 1, false, false);
+ else if (xldata->blknoParent != InvalidBlockNumber)
{
buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
if (BufferIsValid(buffer))
@@ -219,8 +230,16 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
/* now ptr points to the list of leaf tuples */
+ /*
+ * In normal operation we would have all three pages (source, dest, and
+ * parent) locked simultaneously; but in WAL replay it should be safe to
+ * update them one at a time, as long as we do it in the right order.
+ */
+
/* Insert tuples on the dest page (do first, so redirect is valid) */
- if (!(record->xl_info & XLR_BKP_BLOCK_2))
+ if (record->xl_info & XLR_BKP_BLOCK(1))
+ (void) RestoreBackupBlock(lsn, record, 1, false, false);
+ else
{
buffer = XLogReadBuffer(xldata->node, xldata->blknoDst,
xldata->newPage);
@@ -253,7 +272,9 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
}
/* Delete tuples from the source page, inserting a redirection pointer */
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
buffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
if (BufferIsValid(buffer))
@@ -276,7 +297,9 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
}
/* And update the parent downlink */
- if (!(record->xl_info & XLR_BKP_BLOCK_3))
+ if (record->xl_info & XLR_BKP_BLOCK(2))
+ (void) RestoreBackupBlock(lsn, record, 2, false, false);
+ else
{
buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
if (BufferIsValid(buffer))
@@ -322,7 +345,9 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
{
/* update in place */
Assert(xldata->blknoParent == InvalidBlockNumber);
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
if (BufferIsValid(buffer))
@@ -347,8 +372,22 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
}
else
{
+ /*
+ * In normal operation we would have all three pages (source, dest,
+ * and parent) locked simultaneously; but in WAL replay it should be
+ * safe to update them one at a time, as long as we do it in the right
+ * order.
+ *
+ * The logic here depends on the assumption that blkno != blknoNew,
+ * else we can't tell which BKP bit goes with which page, and the LSN
+ * checks could go wrong too.
+ */
+ Assert(xldata->blkno != xldata->blknoNew);
+
/* Install new tuple first so redirect is valid */
- if (!(record->xl_info & XLR_BKP_BLOCK_2))
+ if (record->xl_info & XLR_BKP_BLOCK(1))
+ (void) RestoreBackupBlock(lsn, record, 1, false, false);
+ else
{
buffer = XLogReadBuffer(xldata->node, xldata->blknoNew,
xldata->newPage);
@@ -365,8 +404,17 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
addOrReplaceTuple(page, (Item) innerTuple,
innerTuple->size, xldata->offnumNew);
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
+ /*
+ * If parent is in this same page, don't advance LSN;
+ * doing so would fool us into not applying the parent
+ * downlink update below. We'll update the LSN when we
+ * fix the parent downlink.
+ */
+ if (xldata->blknoParent != xldata->blknoNew)
+ {
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ }
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
@@ -374,7 +422,9 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
}
/* Delete old tuple, replacing it with redirect or placeholder tuple */
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
if (BufferIsValid(buffer))
@@ -405,8 +455,17 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
else
SpGistPageGetOpaque(page)->nRedirection++;
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
+ /*
+ * If parent is in this same page, don't advance LSN;
+ * doing so would fool us into not applying the parent
+ * downlink update below. We'll update the LSN when we
+ * fix the parent downlink.
+ */
+ if (xldata->blknoParent != xldata->blkno)
+ {
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ }
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
@@ -425,7 +484,12 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
else
bbi = 2;
- if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
+ if (record->xl_info & XLR_BKP_BLOCK(bbi))
+ {
+ if (bbi == 2) /* else we already did it */
+ (void) RestoreBackupBlock(lsn, record, bbi, false, false);
+ }
+ else
{
buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
if (BufferIsValid(buffer))
@@ -467,9 +531,16 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
ptr += prefixTuple->size;
postfixTuple = (SpGistInnerTuple) ptr;
+ /*
+ * In normal operation we would have both pages locked simultaneously; but
+ * in WAL replay it should be safe to update them one at a time, as long
+ * as we do it in the right order.
+ */
+
/* insert postfix tuple first to avoid dangling link */
- if (xldata->blknoPostfix != xldata->blknoPrefix &&
- !(record->xl_info & XLR_BKP_BLOCK_2))
+ if (record->xl_info & XLR_BKP_BLOCK(1))
+ (void) RestoreBackupBlock(lsn, record, 1, false, false);
+ else if (xldata->blknoPostfix != xldata->blknoPrefix)
{
buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix,
xldata->newPage);
@@ -495,7 +566,9 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
}
/* now handle the original page */
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
buffer = XLogReadBuffer(xldata->node, xldata->blknoPrefix, false);
if (BufferIsValid(buffer))
@@ -535,6 +608,8 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
uint8 *leafPageSelect;
Buffer srcBuffer;
Buffer destBuffer;
+ Page srcPage;
+ Page destPage;
Page page;
int bbi;
int i;
@@ -563,13 +638,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
{
/* when splitting root, we touch it only in the guise of new inner */
srcBuffer = InvalidBuffer;
+ srcPage = NULL;
}
else if (xldata->initSrc)
{
/* just re-init the source page */
srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, true);
Assert(BufferIsValid(srcBuffer));
- page = (Page) BufferGetPage(srcBuffer);
+ srcPage = (Page) BufferGetPage(srcBuffer);
SpGistInitBuffer(srcBuffer,
SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
@@ -577,14 +653,24 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
}
else
{
- /* delete the specified tuples from source page */
- if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
+ /*
+ * Delete the specified tuples from source page. (In case we're in
+ * Hot Standby, we need to hold lock on the page till we're done
+ * inserting leaf tuples and the new inner tuple, else the added
+ * redirect tuple will be a dangling link.)
+ */
+ if (record->xl_info & XLR_BKP_BLOCK(bbi))
+ {
+ srcBuffer = RestoreBackupBlock(lsn, record, bbi, false, true);
+ srcPage = NULL; /* don't need to do any page updates */
+ }
+ else
{
srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
if (BufferIsValid(srcBuffer))
{
- page = BufferGetPage(srcBuffer);
- if (!XLByteLE(lsn, PageGetLSN(page)))
+ srcPage = BufferGetPage(srcBuffer);
+ if (!XLByteLE(lsn, PageGetLSN(srcPage)))
{
/*
* We have it a bit easier here than in doPickSplit(),
@@ -592,14 +678,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
* we can inject the correct redirection tuple now.
*/
if (!state.isBuild)
- spgPageIndexMultiDelete(&state, page,
+ spgPageIndexMultiDelete(&state, srcPage,
toDelete, xldata->nDelete,
SPGIST_REDIRECT,
SPGIST_PLACEHOLDER,
xldata->blknoInner,
xldata->offnumInner);
else
- spgPageIndexMultiDelete(&state, page,
+ spgPageIndexMultiDelete(&state, srcPage,
toDelete, xldata->nDelete,
SPGIST_PLACEHOLDER,
SPGIST_PLACEHOLDER,
@@ -608,10 +694,12 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
/* don't update LSN etc till we're done with it */
}
+ else
+ srcPage = NULL; /* don't do any page updates */
}
+ else
+ srcPage = NULL;
}
- else
- srcBuffer = InvalidBuffer;
bbi++;
}
@@ -619,13 +707,14 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
if (xldata->blknoDest == InvalidBlockNumber)
{
destBuffer = InvalidBuffer;
+ destPage = NULL;
}
else if (xldata->initDest)
{
/* just re-init the dest page */
destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, true);
Assert(BufferIsValid(destBuffer));
- page = (Page) BufferGetPage(destBuffer);
+ destPage = (Page) BufferGetPage(destBuffer);
SpGistInitBuffer(destBuffer,
SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
@@ -633,10 +722,27 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
}
else
{
- if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
- destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, false);
+ /*
+ * We could probably release the page lock immediately in the
+ * full-page-image case, but for safety let's hold it till later.
+ */
+ if (record->xl_info & XLR_BKP_BLOCK(bbi))
+ {
+ destBuffer = RestoreBackupBlock(lsn, record, bbi, false, true);
+ destPage = NULL; /* don't need to do any page updates */
+ }
else
- destBuffer = InvalidBuffer;
+ {
+ destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, false);
+ if (BufferIsValid(destBuffer))
+ {
+ destPage = (Page) BufferGetPage(destBuffer);
+ if (XLByteLE(lsn, PageGetLSN(destPage)))
+ destPage = NULL; /* don't do any page updates */
+ }
+ else
+ destPage = NULL;
+ }
bbi++;
}
@@ -644,47 +750,34 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
for (i = 0; i < xldata->nInsert; i++)
{
SpGistLeafTuple lt = (SpGistLeafTuple) ptr;
- Buffer leafBuffer;
ptr += lt->size;
- leafBuffer = leafPageSelect[i] ? destBuffer : srcBuffer;
- if (!BufferIsValid(leafBuffer))
+ page = leafPageSelect[i] ? destPage : srcPage;
+ if (page == NULL)
continue; /* no need to touch this page */
- page = BufferGetPage(leafBuffer);
- if (!XLByteLE(lsn, PageGetLSN(page)))
- {
- addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]);
- }
+ addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]);
}
- /* Now update src and dest page LSNs */
- if (BufferIsValid(srcBuffer))
+ /* Now update src and dest page LSNs if needed */
+ if (srcPage != NULL)
{
- page = BufferGetPage(srcBuffer);
- if (!XLByteLE(lsn, PageGetLSN(page)))
- {
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(srcBuffer);
- }
- UnlockReleaseBuffer(srcBuffer);
+ PageSetLSN(srcPage, lsn);
+ PageSetTLI(srcPage, ThisTimeLineID);
+ MarkBufferDirty(srcBuffer);
}
- if (BufferIsValid(destBuffer))
+ if (destPage != NULL)
{
- page = BufferGetPage(destBuffer);
- if (!XLByteLE(lsn, PageGetLSN(page)))
- {
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(destBuffer);
- }
- UnlockReleaseBuffer(destBuffer);
+ PageSetLSN(destPage, lsn);
+ PageSetTLI(destPage, ThisTimeLineID);
+ MarkBufferDirty(destBuffer);
}
/* restore new inner tuple */
- if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
+ if (record->xl_info & XLR_BKP_BLOCK(bbi))
+ (void) RestoreBackupBlock(lsn, record, bbi, false, false);
+ else
{
Buffer buffer = XLogReadBuffer(xldata->node, xldata->blknoInner,
xldata->initInner);
@@ -722,6 +815,15 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
}
bbi++;
+ /*
+ * Now we can release the leaf-page locks. It's okay to do this before
+ * updating the parent downlink.
+ */
+ if (BufferIsValid(srcBuffer))
+ UnlockReleaseBuffer(srcBuffer);
+ if (BufferIsValid(destBuffer))
+ UnlockReleaseBuffer(destBuffer);
+
/* update parent downlink, unless we did it above */
if (xldata->blknoParent == InvalidBlockNumber)
{
@@ -730,7 +832,9 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
}
else if (xldata->blknoInner != xldata->blknoParent)
{
- if (!(record->xl_info & XLR_SET_BKP_BLOCK(bbi)))
+ if (record->xl_info & XLR_BKP_BLOCK(bbi))
+ (void) RestoreBackupBlock(lsn, record, bbi, false, false);
+ else
{
Buffer buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
@@ -788,7 +892,9 @@ spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record)
ptr += sizeof(OffsetNumber) * xldata->nChain;
chainDest = (OffsetNumber *) ptr;
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
if (BufferIsValid(buffer))
@@ -857,7 +963,9 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record)
ptr += sizeof(spgxlogVacuumRoot);
toDelete = (OffsetNumber *) ptr;
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
if (BufferIsValid(buffer))
@@ -889,7 +997,20 @@ spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record)
ptr += sizeof(spgxlogVacuumRedirect);
itemToPlaceholder = (OffsetNumber *) ptr;
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ /*
+ * If any redirection tuples are being removed, make sure there are no
+ * live Hot Standby transactions that might need to see them.
+ */
+ if (InHotStandby)
+ {
+ if (TransactionIdIsValid(xldata->newestRedirectXid))
+ ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
+ xldata->node);
+ }
+
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
@@ -954,36 +1075,6 @@ spg_redo(XLogRecPtr lsn, XLogRecord *record)
uint8 info = record->xl_info & ~XLR_INFO_MASK;
MemoryContext oldCxt;
- /*
- * If we have any conflict processing to do, it must happen before we
- * update the page.
- */
- if (InHotStandby)
- {
- switch (info)
- {
- case XLOG_SPGIST_VACUUM_REDIRECT:
- {
- spgxlogVacuumRedirect *xldata =
- (spgxlogVacuumRedirect *) XLogRecGetData(record);
-
- /*
- * If any redirection tuples are being removed, make sure
- * there are no live Hot Standby transactions that might
- * need to see them.
- */
- if (TransactionIdIsValid(xldata->newestRedirectXid))
- ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
- xldata->node);
- break;
- }
- default:
- break;
- }
- }
-
- RestoreBkpBlocks(lsn, record, false);
-
oldCxt = MemoryContextSwitchTo(opCtx);
switch (info)
{