aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/gist/gistxlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/gist/gistxlog.c')
-rw-r--r--src/backend/access/gist/gistxlog.c191
1 files changed, 101 insertions, 90 deletions
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 76029d9949a..4440499d48a 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -32,35 +32,48 @@ typedef struct
static MemoryContext opCtx; /* working memory for operations */
/*
- * Replay the clearing of F_FOLLOW_RIGHT flag.
+ * Replay the clearing of F_FOLLOW_RIGHT flag on a child page.
+ *
+ * Even if the WAL record includes a full-page image, we have to update the
+ * follow-right flag, because that change is not included in the full-page
+ * image. To be sure that the intermediate state with the wrong flag value is
+ * not visible to concurrent Hot Standby queries, this function handles
+ * restoring the full-page image as well as updating the flag. (Note that
+ * we never need to do anything else to the child page in the current WAL
+ * action.)
*/
static void
-gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn,
- BlockNumber leftblkno)
+gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index,
+ RelFileNode node, BlockNumber childblkno)
{
Buffer buffer;
+ Page page;
- buffer = XLogReadBuffer(node, leftblkno, false);
- if (BufferIsValid(buffer))
+ if (record->xl_info & XLR_BKP_BLOCK(block_index))
+ buffer = RestoreBackupBlock(lsn, record, block_index, false, true);
+ else
{
- Page page = (Page) BufferGetPage(buffer);
+ buffer = XLogReadBuffer(node, childblkno, false);
+ if (!BufferIsValid(buffer))
+ return; /* page was deleted, nothing to do */
+ }
+ page = (Page) BufferGetPage(buffer);
- /*
- * Note that we still update the page even if page LSN is equal to the
- * LSN of this record, because the updated NSN is not included in the
- * full page image.
- */
- if (!XLByteLT(lsn, PageGetLSN(page)))
- {
- GistPageGetOpaque(page)->nsn = lsn;
- GistClearFollowRight(page);
+ /*
+ * Note that we still update the page even if page LSN is equal to the LSN
+ * of this record, because the updated NSN is not included in the full
+ * page image.
+ */
+ if (!XLByteLT(lsn, PageGetLSN(page)))
+ {
+ GistPageGetOpaque(page)->nsn = lsn;
+ GistClearFollowRight(page);
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
}
+ UnlockReleaseBuffer(buffer);
}
/*
@@ -75,18 +88,37 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
Page page;
char *data;
+ /*
+ * We need to acquire and hold lock on target page while updating the left
+ * child page. If we have a full-page image of target page, getting the
+ * lock is a side-effect of restoring that image. Note that even if the
+ * target page no longer exists, we'll still attempt to replay the change
+ * on the child page.
+ */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ buffer = RestoreBackupBlock(lsn, record, 0, false, true);
+ else
+ buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
+
+ /* Fix follow-right data on left child page */
if (BlockNumberIsValid(xldata->leftchild))
- gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
+ gistRedoClearFollowRight(lsn, record, 1,
+ xldata->node, xldata->leftchild);
- /* nothing more to do if page was backed up (and no info to do it with) */
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* Done if target page no longer exists */
+ if (!BufferIsValid(buffer))
return;
- buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
- if (!BufferIsValid(buffer))
+ /* nothing more to do if page was backed up (and no info to do it with) */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ UnlockReleaseBuffer(buffer);
return;
+ }
+
page = (Page) BufferGetPage(buffer);
+ /* nothing more to do if change already applied */
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockReleaseBuffer(buffer);
@@ -140,13 +172,16 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
GistClearTuplesDeleted(page);
}
- if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)
-
+ if (!GistPageIsLeaf(page) &&
+ PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
+ xldata->blkno == GIST_ROOT_BLKNO)
+ {
/*
* all links on non-leaf root page was deleted by vacuum full, so root
* page becomes a leaf
*/
GistPageSetLeaf(page);
+ }
GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
PageSetLSN(page, lsn);
@@ -156,30 +191,6 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
}
static void
-gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
-{
- gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
- Buffer buffer;
- Page page;
-
- /* nothing else to do if page was backed up (and no info to do it with) */
- if (record->xl_info & XLR_BKP_BLOCK_1)
- return;
-
- buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
- if (!BufferIsValid(buffer))
- return;
-
- page = (Page) BufferGetPage(buffer);
- GistPageSetDeleted(page);
-
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
-}
-
-static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
{
char *begin = XLogRecGetData(record),
@@ -215,15 +226,22 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
{
gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
PageSplitRecord xlrec;
+ Buffer firstbuffer = InvalidBuffer;
Buffer buffer;
Page page;
int i;
bool isrootsplit = false;
- if (BlockNumberIsValid(xldata->leftchild))
- gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
decodePageSplitRecord(&xlrec, record);
+ /*
+ * We must hold lock on the first-listed page throughout the action,
+ * including while updating the left child page (if any). We can unlock
+ * remaining pages in the list as soon as they've been written, because
+ * there is no path for concurrent queries to reach those pages without
+ * first visiting the first-listed page.
+ */
+
/* loop around all pages */
for (i = 0; i < xlrec.data->npage; i++)
{
@@ -273,8 +291,20 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+
+ if (i == 0)
+ firstbuffer = buffer;
+ else
+ UnlockReleaseBuffer(buffer);
}
+
+ /* Fix follow-right data on left child page, if any */
+ if (BlockNumberIsValid(xldata->leftchild))
+ gistRedoClearFollowRight(lsn, record, 0,
+ xldata->node, xldata->leftchild);
+
+ /* Finally, release lock on the first page */
+ UnlockReleaseBuffer(firstbuffer);
}
static void
@@ -284,6 +314,9 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
+ /* Backup blocks are not used in create_index records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
@@ -308,7 +341,6 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
* implement a similar optimization we have in b-tree, and remove killed
* tuples outside VACUUM, we'll need to handle that here.
*/
- RestoreBkpBlocks(lsn, record, false);
oldCxt = MemoryContextSwitchTo(opCtx);
switch (info)
@@ -316,9 +348,6 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
case XLOG_GIST_PAGE_UPDATE:
gistRedoPageUpdateRecord(lsn, record);
break;
- case XLOG_GIST_PAGE_DELETE:
- gistRedoPageDeleteRecord(lsn, record);
- break;
case XLOG_GIST_PAGE_SPLIT:
gistRedoPageSplitRecord(lsn, record);
break;
@@ -348,14 +377,6 @@ out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
}
static void
-out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
-{
- appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u",
- xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
- xlrec->blkno);
-}
-
-static void
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
{
appendStringInfo(buf, "page_split: ");
@@ -375,9 +396,6 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec)
appendStringInfo(buf, "page_update: ");
out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
break;
- case XLOG_GIST_PAGE_DELETE:
- out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
- break;
case XLOG_GIST_PAGE_SPLIT:
out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
break;
@@ -498,37 +516,30 @@ gistXLogUpdate(RelFileNode node, Buffer buffer,
Buffer leftchildbuf)
{
XLogRecData *rdata;
- gistxlogPageUpdate *xlrec;
+ gistxlogPageUpdate xlrec;
int cur,
i;
XLogRecPtr recptr;
- rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen));
- xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
+ rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
- xlrec->node = node;
- xlrec->blkno = BufferGetBlockNumber(buffer);
- xlrec->ntodelete = ntodelete;
- xlrec->leftchild =
+ xlrec.node = node;
+ xlrec.blkno = BufferGetBlockNumber(buffer);
+ xlrec.ntodelete = ntodelete;
+ xlrec.leftchild =
BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
- rdata[0].buffer = buffer;
- rdata[0].buffer_std = true;
- rdata[0].data = NULL;
- rdata[0].len = 0;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = sizeof(gistxlogPageUpdate);
+ rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) xlrec;
- rdata[1].len = sizeof(gistxlogPageUpdate);
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = &(rdata[2]);
-
- rdata[2].data = (char *) todelete;
- rdata[2].len = sizeof(OffsetNumber) * ntodelete;
- rdata[2].buffer = buffer;
- rdata[2].buffer_std = true;
+ rdata[1].data = (char *) todelete;
+ rdata[1].len = sizeof(OffsetNumber) * ntodelete;
+ rdata[1].buffer = buffer;
+ rdata[1].buffer_std = true;
- cur = 3;
+ cur = 2;
/* new tuples */
for (i = 0; i < ituplen; i++)