diff options
Diffstat (limited to 'src/backend/access/gist/gistxlog.c')
-rw-r--r-- | src/backend/access/gist/gistxlog.c | 191 |
1 files changed, 101 insertions, 90 deletions
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 76029d9949a..4440499d48a 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -32,35 +32,48 @@ typedef struct static MemoryContext opCtx; /* working memory for operations */ /* - * Replay the clearing of F_FOLLOW_RIGHT flag. + * Replay the clearing of F_FOLLOW_RIGHT flag on a child page. + * + * Even if the WAL record includes a full-page image, we have to update the + * follow-right flag, because that change is not included in the full-page + * image. To be sure that the intermediate state with the wrong flag value is + * not visible to concurrent Hot Standby queries, this function handles + * restoring the full-page image as well as updating the flag. (Note that + * we never need to do anything else to the child page in the current WAL + * action.) */ static void -gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn, - BlockNumber leftblkno) +gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index, + RelFileNode node, BlockNumber childblkno) { Buffer buffer; + Page page; - buffer = XLogReadBuffer(node, leftblkno, false); - if (BufferIsValid(buffer)) + if (record->xl_info & XLR_BKP_BLOCK(block_index)) + buffer = RestoreBackupBlock(lsn, record, block_index, false, true); + else { - Page page = (Page) BufferGetPage(buffer); + buffer = XLogReadBuffer(node, childblkno, false); + if (!BufferIsValid(buffer)) + return; /* page was deleted, nothing to do */ + } + page = (Page) BufferGetPage(buffer); - /* - * Note that we still update the page even if page LSN is equal to the - * LSN of this record, because the updated NSN is not included in the - * full page image. - */ - if (!XLByteLT(lsn, PageGetLSN(page))) - { - GistPageGetOpaque(page)->nsn = lsn; - GistClearFollowRight(page); + /* + * Note that we still update the page even if page LSN is equal to the LSN + * of this record, because the updated NSN is not included in the full + * page image. + */ + if (!XLByteLT(lsn, PageGetLSN(page))) + { + GistPageGetOpaque(page)->nsn = lsn; + GistClearFollowRight(page); - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); } + UnlockReleaseBuffer(buffer); } /* @@ -75,18 +88,37 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) Page page; char *data; + /* + * We need to acquire and hold lock on target page while updating the left + * child page. If we have a full-page image of target page, getting the + * lock is a side-effect of restoring that image. Note that even if the + * target page no longer exists, we'll still attempt to replay the change + * on the child page. + */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + buffer = RestoreBackupBlock(lsn, record, 0, false, true); + else + buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); + + /* Fix follow-right data on left child page */ if (BlockNumberIsValid(xldata->leftchild)) - gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild); + gistRedoClearFollowRight(lsn, record, 1, + xldata->node, xldata->leftchild); - /* nothing more to do if page was backed up (and no info to do it with) */ - if (record->xl_info & XLR_BKP_BLOCK_1) + /* Done if target page no longer exists */ + if (!BufferIsValid(buffer)) return; - buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); - if (!BufferIsValid(buffer)) + /* nothing more to do if page was backed up (and no info to do it with) */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + UnlockReleaseBuffer(buffer); return; + } + page = (Page) BufferGetPage(buffer); + /* nothing more to do if change already applied */ if (XLByteLE(lsn, PageGetLSN(page))) { UnlockReleaseBuffer(buffer); @@ -140,13 +172,16 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) GistClearTuplesDeleted(page); } - if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO) - + if (!GistPageIsLeaf(page) && + PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && + xldata->blkno == GIST_ROOT_BLKNO) + { /* * all links on non-leaf root page was deleted by vacuum full, so root * page becomes a leaf */ GistPageSetLeaf(page); + } GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; PageSetLSN(page, lsn); @@ -156,30 +191,6 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) } static void -gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record) -{ - gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record); - Buffer buffer; - Page page; - - /* nothing else to do if page was backed up (and no info to do it with) */ - if (record->xl_info & XLR_BKP_BLOCK_1) - return; - - buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); - if (!BufferIsValid(buffer)) - return; - - page = (Page) BufferGetPage(buffer); - GistPageSetDeleted(page); - - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); -} - -static void decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) { char *begin = XLogRecGetData(record), @@ -215,15 +226,22 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) { gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record); PageSplitRecord xlrec; + Buffer firstbuffer = InvalidBuffer; Buffer buffer; Page page; int i; bool isrootsplit = false; - if (BlockNumberIsValid(xldata->leftchild)) - gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild); decodePageSplitRecord(&xlrec, record); + /* + * We must hold lock on the first-listed page throughout the action, + * including while updating the left child page (if any). We can unlock + * remaining pages in the list as soon as they've been written, because + * there is no path for concurrent queries to reach those pages without + * first visiting the first-listed page. + */ + /* loop around all pages */ for (i = 0; i < xlrec.data->npage; i++) { @@ -273,8 +291,20 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + + if (i == 0) + firstbuffer = buffer; + else + UnlockReleaseBuffer(buffer); } + + /* Fix follow-right data on left child page, if any */ + if (BlockNumberIsValid(xldata->leftchild)) + gistRedoClearFollowRight(lsn, record, 0, + xldata->node, xldata->leftchild); + + /* Finally, release lock on the first page */ + UnlockReleaseBuffer(firstbuffer); } static void @@ -284,6 +314,9 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; + /* Backup blocks are not used in create_index records */ + Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); @@ -308,7 +341,6 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) * implement a similar optimization we have in b-tree, and remove killed * tuples outside VACUUM, we'll need to handle that here. */ - RestoreBkpBlocks(lsn, record, false); oldCxt = MemoryContextSwitchTo(opCtx); switch (info) @@ -316,9 +348,6 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) case XLOG_GIST_PAGE_UPDATE: gistRedoPageUpdateRecord(lsn, record); break; - case XLOG_GIST_PAGE_DELETE: - gistRedoPageDeleteRecord(lsn, record); - break; case XLOG_GIST_PAGE_SPLIT: gistRedoPageSplitRecord(lsn, record); break; @@ -348,14 +377,6 @@ out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) } static void -out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec) -{ - appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u", - xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode, - xlrec->blkno); -} - -static void out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec) { appendStringInfo(buf, "page_split: "); @@ -375,9 +396,6 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) appendStringInfo(buf, "page_update: "); out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec); break; - case XLOG_GIST_PAGE_DELETE: - out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec); - break; case XLOG_GIST_PAGE_SPLIT: out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec); break; @@ -498,37 +516,30 @@ gistXLogUpdate(RelFileNode node, Buffer buffer, Buffer leftchildbuf) { XLogRecData *rdata; - gistxlogPageUpdate *xlrec; + gistxlogPageUpdate xlrec; int cur, i; XLogRecPtr recptr; - rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen)); - xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); + rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen)); - xlrec->node = node; - xlrec->blkno = BufferGetBlockNumber(buffer); - xlrec->ntodelete = ntodelete; - xlrec->leftchild = + xlrec.node = node; + xlrec.blkno = BufferGetBlockNumber(buffer); + xlrec.ntodelete = ntodelete; + xlrec.leftchild = BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; - rdata[0].buffer = buffer; - rdata[0].buffer_std = true; - rdata[0].data = NULL; - rdata[0].len = 0; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof(gistxlogPageUpdate); + rdata[0].buffer = InvalidBuffer; rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) xlrec; - rdata[1].len = sizeof(gistxlogPageUpdate); - rdata[1].buffer = InvalidBuffer; - rdata[1].next = &(rdata[2]); - - rdata[2].data = (char *) todelete; - rdata[2].len = sizeof(OffsetNumber) * ntodelete; - rdata[2].buffer = buffer; - rdata[2].buffer_std = true; + rdata[1].data = (char *) todelete; + rdata[1].len = sizeof(OffsetNumber) * ntodelete; + rdata[1].buffer = buffer; + rdata[1].buffer_std = true; - cur = 3; + cur = 2; /* new tuples */ for (i = 0; i < ituplen; i++) |