diff options
Diffstat (limited to 'src/backend/access/gin/ginxlog.c')
-rw-r--r-- | src/backend/access/gin/ginxlog.c | 121 |
1 files changed, 85 insertions, 36 deletions
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c index 250619cb2c1..4536c9c63dc 100644 --- a/src/backend/access/gin/ginxlog.c +++ b/src/backend/access/gin/ginxlog.c @@ -77,6 +77,9 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) MetaBuffer; Page page; + /* Backup blocks are not used in create_index records */ + Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true); Assert(BufferIsValid(MetaBuffer)); page = (Page) BufferGetPage(MetaBuffer); @@ -109,6 +112,9 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; + /* Backup blocks are not used in create_ptree records */ + Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + buffer = XLogReadBuffer(data->node, data->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); @@ -159,9 +165,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) } } - /* nothing else to do if page was backed up */ - if (record->xl_info & XLR_BKP_BLOCK_1) + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); return; + } buffer = XLogReadBuffer(data->node, data->blkno, false); if (!BufferIsValid(buffer)) @@ -256,6 +265,9 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) if (data->isData) flags |= GIN_DATA; + /* Backup blocks are not used in split records */ + Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + lbuffer = XLogReadBuffer(data->node, data->lblkno, true); Assert(BufferIsValid(lbuffer)); lpage = (Page) BufferGetPage(lbuffer); @@ -369,9 +381,12 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; - /* nothing to do if page was backed up (and no info to do it with) */ - if (record->xl_info & XLR_BKP_BLOCK_1) + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); return; + } buffer = XLogReadBuffer(data->node, data->blkno, false); if (!BufferIsValid(buffer)) @@ -420,33 +435,38 @@ static void ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) { ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record); - Buffer buffer; + Buffer dbuffer; + Buffer pbuffer; + Buffer lbuffer; Page page; - if (!(record->xl_info & XLR_BKP_BLOCK_1)) + if (record->xl_info & XLR_BKP_BLOCK(0)) + dbuffer = RestoreBackupBlock(lsn, record, 0, false, true); + else { - buffer = XLogReadBuffer(data->node, data->blkno, false); - if (BufferIsValid(buffer)) + dbuffer = XLogReadBuffer(data->node, data->blkno, false); + if (BufferIsValid(dbuffer)) { - page = BufferGetPage(buffer); + page = BufferGetPage(dbuffer); if (!XLByteLE(lsn, PageGetLSN(page))) { Assert(GinPageIsData(page)); GinPageGetOpaque(page)->flags = GIN_DELETED; PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); + MarkBufferDirty(dbuffer); } - UnlockReleaseBuffer(buffer); } } - if (!(record->xl_info & XLR_BKP_BLOCK_2)) + if (record->xl_info & XLR_BKP_BLOCK(1)) + pbuffer = RestoreBackupBlock(lsn, record, 1, false, true); + else { - buffer = XLogReadBuffer(data->node, data->parentBlkno, false); - if (BufferIsValid(buffer)) + pbuffer = XLogReadBuffer(data->node, data->parentBlkno, false); + if (BufferIsValid(pbuffer)) { - page = BufferGetPage(buffer); + page = BufferGetPage(pbuffer); if (!XLByteLE(lsn, PageGetLSN(page))) { Assert(GinPageIsData(page)); @@ -454,29 +474,35 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) GinPageDeletePostingItem(page, data->parentOffset); PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); + MarkBufferDirty(pbuffer); } - UnlockReleaseBuffer(buffer); } } - if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber) + if (record->xl_info & XLR_BKP_BLOCK(2)) + (void) RestoreBackupBlock(lsn, record, 2, false, false); + else if (data->leftBlkno != InvalidBlockNumber) { - buffer = XLogReadBuffer(data->node, data->leftBlkno, false); - if (BufferIsValid(buffer)) + lbuffer = XLogReadBuffer(data->node, data->leftBlkno, false); + if (BufferIsValid(lbuffer)) { - page = BufferGetPage(buffer); + page = BufferGetPage(lbuffer); if (!XLByteLE(lsn, PageGetLSN(page))) { Assert(GinPageIsData(page)); GinPageGetOpaque(page)->rightlink = data->rightLink; PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); + MarkBufferDirty(lbuffer); } - UnlockReleaseBuffer(buffer); + UnlockReleaseBuffer(lbuffer); } } + + if (BufferIsValid(pbuffer)) + UnlockReleaseBuffer(pbuffer); + if (BufferIsValid(dbuffer)) + UnlockReleaseBuffer(dbuffer); } static void @@ -505,7 +531,9 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) /* * insert into tail page */ - if (!(record->xl_info & XLR_BKP_BLOCK_1)) + if (record->xl_info & XLR_BKP_BLOCK(0)) + (void) RestoreBackupBlock(lsn, record, 0, false, false); + else { buffer = XLogReadBuffer(data->node, data->metadata.tail, false); if (BufferIsValid(buffer)) @@ -553,20 +581,25 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) /* * New tail */ - buffer = XLogReadBuffer(data->node, data->prevTail, false); - if (BufferIsValid(buffer)) + if (record->xl_info & XLR_BKP_BLOCK(0)) + (void) RestoreBackupBlock(lsn, record, 0, false, false); + else { - Page page = BufferGetPage(buffer); - - if (!XLByteLE(lsn, PageGetLSN(page))) + buffer = XLogReadBuffer(data->node, data->prevTail, false); + if (BufferIsValid(buffer)) { - GinPageGetOpaque(page)->rightlink = data->newRightlink; + Page page = BufferGetPage(buffer); - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); + if (!XLByteLE(lsn, PageGetLSN(page))) + { + GinPageGetOpaque(page)->rightlink = data->newRightlink; + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); + } + UnlockReleaseBuffer(buffer); } - UnlockReleaseBuffer(buffer); } } @@ -585,8 +618,12 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record) tupsize; IndexTuple tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage)); - if (record->xl_info & XLR_BKP_BLOCK_1) + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); return; + } buffer = XLogReadBuffer(data->node, data->blkno, true); Assert(BufferIsValid(buffer)); @@ -632,6 +669,9 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record) Page metapage; int i; + /* Backup blocks are not used in delete_listpage records */ + Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false); if (!BufferIsValid(metabuffer)) return; /* assume index was deleted, nothing to do */ @@ -645,6 +685,16 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record) MarkBufferDirty(metabuffer); } + /* + * In normal operation, shiftList() takes exclusive lock on all the + * pages-to-be-deleted simultaneously. During replay, however, it should + * be all right to lock them one at a time. This is dependent on the fact + * that we are deleting pages from the head of the list, and that readers + * share-lock the next page before releasing the one they are on. So we + * cannot get past a reader that is on, or due to visit, any page we are + * going to delete. New incoming readers will block behind our metapage + * lock and then see a fully updated page list. + */ for (i = 0; i < data->ndeleted; i++) { Buffer buffer = XLogReadBuffer(data->node, data->toDelete[i], false); @@ -678,7 +728,6 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record) * implement a similar optimization as we have in b-tree, and remove * killed tuples outside VACUUM, we'll need to handle that here. */ - RestoreBkpBlocks(lsn, record, false); topCtx = MemoryContextSwitchTo(opCtx); switch (info) |