diff options
author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2014-08-13 15:39:08 +0300 |
---|---|---|
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2014-09-02 15:10:28 +0300 |
commit | f8f4227976a2cdb8ac7c611e49da03aa9e65e0d2 (patch) | |
tree | 92a7aa95ce72fbac48325761761821f8d7a742ed /src/backend/access/gin/ginxlog.c | |
parent | 26f8b99b248aae989e63ca0969a746f30b0c8c21 (diff) | |
download | postgresql-f8f4227976a2cdb8ac7c611e49da03aa9e65e0d2.tar.gz postgresql-f8f4227976a2cdb8ac7c611e49da03aa9e65e0d2.zip |
Refactor per-page logic common to all redo routines to a new function.
Every redo routine uses the same idiom to determine what to do to a page:
check if there's a backup block for it, and if not read, the buffer if the
block exists, and check its LSN. Refactor that into a common function,
XLogReadBufferForRedo, making all the redo routines shorter and more
readable.
This has no user-visible effect, and makes no changes to the WAL format.
Reviewed by Andres Freund, Alvaro Herrera, Michael Paquier.
Diffstat (limited to 'src/backend/access/gin/ginxlog.c')
-rw-r--r-- | src/backend/access/gin/ginxlog.c | 234 |
1 files changed, 88 insertions, 146 deletions
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c index ffabf4e2594..d0553bb8f72 100644 --- a/src/backend/access/gin/ginxlog.c +++ b/src/backend/access/gin/ginxlog.c @@ -20,25 +20,25 @@ static MemoryContext opCtx; /* working memory for operations */ static void -ginRedoClearIncompleteSplit(XLogRecPtr lsn, RelFileNode node, BlockNumber blkno) +ginRedoClearIncompleteSplit(XLogRecPtr lsn, XLogRecord *record, + int block_index, + RelFileNode node, BlockNumber blkno) { Buffer buffer; Page page; - buffer = XLogReadBuffer(node, blkno, false); - if (!BufferIsValid(buffer)) - return; /* page was deleted, nothing to do */ - page = (Page) BufferGetPage(buffer); - - if (lsn > PageGetLSN(page)) + if (XLogReadBufferForRedo(lsn, record, block_index, node, blkno, &buffer) + == BLK_NEEDS_REDO) { + page = (Page) BufferGetPage(buffer); + GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT; PageSetLSN(page, lsn); MarkBufferDirty(buffer); } - - UnlockReleaseBuffer(buffer); + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } static void @@ -332,7 +332,6 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) { ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record); Buffer buffer; - Page page; char *payload; BlockNumber leftChildBlkno = InvalidBlockNumber; BlockNumber rightChildBlkno = InvalidBlockNumber; @@ -351,26 +350,14 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload); payload += sizeof(BlockIdData); - if (record->xl_info & XLR_BKP_BLOCK(0)) - (void) RestoreBackupBlock(lsn, record, 0, false, false); - else - ginRedoClearIncompleteSplit(lsn, data->node, leftChildBlkno); + ginRedoClearIncompleteSplit(lsn, record, 0, data->node, leftChildBlkno); } - /* If we have a full-page image, restore it and we're done */ - if (record->xl_info & XLR_BKP_BLOCK(isLeaf ? 0 : 1)) + if (XLogReadBufferForRedo(lsn, record, isLeaf ? 0 : 1, data->node, + data->blkno, &buffer) == BLK_NEEDS_REDO) { - (void) RestoreBackupBlock(lsn, record, isLeaf ? 0 : 1, false, false); - return; - } - - buffer = XLogReadBuffer(data->node, data->blkno, false); - if (!BufferIsValid(buffer)) - return; /* page was deleted, nothing to do */ - page = (Page) BufferGetPage(buffer); + Page page = BufferGetPage(buffer); - if (lsn > PageGetLSN(page)) - { /* How to insert the payload is tree-type specific */ if (data->flags & GIN_INSERT_ISDATA) { @@ -386,8 +373,8 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) PageSetLSN(page, lsn); MarkBufferDirty(buffer); } - - UnlockReleaseBuffer(buffer); + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } static void @@ -476,12 +463,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) * split */ if (!isLeaf) - { - if (record->xl_info & XLR_BKP_BLOCK(0)) - (void) RestoreBackupBlock(lsn, record, 0, false, false); - else - ginRedoClearIncompleteSplit(lsn, data->node, data->leftChildBlkno); - } + ginRedoClearIncompleteSplit(lsn, record, 0, data->node, data->leftChildBlkno); flags = 0; if (isLeaf) @@ -605,31 +587,21 @@ ginRedoVacuumDataLeafPage(XLogRecPtr lsn, XLogRecord *record) { ginxlogVacuumDataLeafPage *xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetData(record); Buffer buffer; - Page page; - /* If we have a full-page image, restore it and we're done */ - if (record->xl_info & XLR_BKP_BLOCK(0)) + if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->blkno, + &buffer) == BLK_NEEDS_REDO) { - (void) RestoreBackupBlock(lsn, record, 0, false, false); - return; - } - - buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, false); - if (!BufferIsValid(buffer)) - return; - page = (Page) BufferGetPage(buffer); + Page page = BufferGetPage(buffer); - Assert(GinPageIsLeaf(page)); - Assert(GinPageIsData(page)); + Assert(GinPageIsLeaf(page)); + Assert(GinPageIsData(page)); - if (lsn > PageGetLSN(page)) - { ginRedoRecompress(page, &xlrec->data); PageSetLSN(page, lsn); MarkBufferDirty(buffer); } - - UnlockReleaseBuffer(buffer); + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } static void @@ -641,62 +613,42 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) Buffer lbuffer; Page page; - if (record->xl_info & XLR_BKP_BLOCK(0)) - dbuffer = RestoreBackupBlock(lsn, record, 0, false, true); - else + if (XLogReadBufferForRedo(lsn, record, 0, data->node, data->blkno, &dbuffer) + == BLK_NEEDS_REDO) { - dbuffer = XLogReadBuffer(data->node, data->blkno, false); - if (BufferIsValid(dbuffer)) - { - page = BufferGetPage(dbuffer); - if (lsn > PageGetLSN(page)) - { - Assert(GinPageIsData(page)); - GinPageGetOpaque(page)->flags = GIN_DELETED; - PageSetLSN(page, lsn); - MarkBufferDirty(dbuffer); - } - } + page = BufferGetPage(dbuffer); + + Assert(GinPageIsData(page)); + GinPageGetOpaque(page)->flags = GIN_DELETED; + PageSetLSN(page, lsn); + MarkBufferDirty(dbuffer); } - if (record->xl_info & XLR_BKP_BLOCK(1)) - pbuffer = RestoreBackupBlock(lsn, record, 1, false, true); - else + if (XLogReadBufferForRedo(lsn, record, 1, data->node, data->parentBlkno, + &pbuffer) == BLK_NEEDS_REDO) { - pbuffer = XLogReadBuffer(data->node, data->parentBlkno, false); - if (BufferIsValid(pbuffer)) - { - page = BufferGetPage(pbuffer); - if (lsn > PageGetLSN(page)) - { - Assert(GinPageIsData(page)); - Assert(!GinPageIsLeaf(page)); - GinPageDeletePostingItem(page, data->parentOffset); - PageSetLSN(page, lsn); - MarkBufferDirty(pbuffer); - } - } + page = BufferGetPage(pbuffer); + + Assert(GinPageIsData(page)); + Assert(!GinPageIsLeaf(page)); + GinPageDeletePostingItem(page, data->parentOffset); + PageSetLSN(page, lsn); + MarkBufferDirty(pbuffer); } - if (record->xl_info & XLR_BKP_BLOCK(2)) - (void) RestoreBackupBlock(lsn, record, 2, false, false); - else if (data->leftBlkno != InvalidBlockNumber) + if (XLogReadBufferForRedo(lsn, record, 2, data->node, data->leftBlkno, + &lbuffer) == BLK_NEEDS_REDO) { - lbuffer = XLogReadBuffer(data->node, data->leftBlkno, false); - if (BufferIsValid(lbuffer)) - { - page = BufferGetPage(lbuffer); - if (lsn > PageGetLSN(page)) - { - Assert(GinPageIsData(page)); - GinPageGetOpaque(page)->rightlink = data->rightLink; - PageSetLSN(page, lsn); - MarkBufferDirty(lbuffer); - } - UnlockReleaseBuffer(lbuffer); - } + page = BufferGetPage(lbuffer); + + Assert(GinPageIsData(page)); + GinPageGetOpaque(page)->rightlink = data->rightLink; + PageSetLSN(page, lsn); + MarkBufferDirty(lbuffer); } + if (BufferIsValid(lbuffer)) + UnlockReleaseBuffer(lbuffer); if (BufferIsValid(pbuffer)) UnlockReleaseBuffer(pbuffer); if (BufferIsValid(dbuffer)) @@ -730,74 +682,64 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) /* * insert into tail page */ - if (record->xl_info & XLR_BKP_BLOCK(0)) - (void) RestoreBackupBlock(lsn, record, 0, false, false); - else + if (XLogReadBufferForRedo(lsn, record, 0, data->node, + data->metadata.tail, &buffer) + == BLK_NEEDS_REDO) { - buffer = XLogReadBuffer(data->node, data->metadata.tail, false); - if (BufferIsValid(buffer)) - { - Page page = BufferGetPage(buffer); + Page page = BufferGetPage(buffer); + OffsetNumber off; + int i; + Size tupsize; + IndexTuple tuples; - if (lsn > PageGetLSN(page)) - { - OffsetNumber l, - off = (PageIsEmpty(page)) ? FirstOffsetNumber : - OffsetNumberNext(PageGetMaxOffsetNumber(page)); - int i, - tupsize; - IndexTuple tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta)); + tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta)); - for (i = 0; i < data->ntuples; i++) - { - tupsize = IndexTupleSize(tuples); + if (PageIsEmpty(page)) + off = FirstOffsetNumber; + else + off = OffsetNumberNext(PageGetMaxOffsetNumber(page)); - l = PageAddItem(page, (Item) tuples, tupsize, off, false, false); + for (i = 0; i < data->ntuples; i++) + { + tupsize = IndexTupleSize(tuples); - if (l == InvalidOffsetNumber) - elog(ERROR, "failed to add item to index page"); + if (PageAddItem(page, (Item) tuples, tupsize, off, + false, false) == InvalidOffsetNumber) + elog(ERROR, "failed to add item to index page"); - tuples = (IndexTuple) (((char *) tuples) + tupsize); + tuples = (IndexTuple) (((char *) tuples) + tupsize); - off++; - } + off++; + } - /* - * Increase counter of heap tuples - */ - GinPageGetOpaque(page)->maxoff++; + /* + * Increase counter of heap tuples + */ + GinPageGetOpaque(page)->maxoff++; - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); - } + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } else if (data->prevTail != InvalidBlockNumber) { /* * New tail */ - if (record->xl_info & XLR_BKP_BLOCK(0)) - (void) RestoreBackupBlock(lsn, record, 0, false, false); - else + if (XLogReadBufferForRedo(lsn, record, 0, data->node, data->prevTail, + &buffer) == BLK_NEEDS_REDO) { - buffer = XLogReadBuffer(data->node, data->prevTail, false); - if (BufferIsValid(buffer)) - { - Page page = BufferGetPage(buffer); + Page page = BufferGetPage(buffer); - if (lsn > PageGetLSN(page)) - { - GinPageGetOpaque(page)->rightlink = data->newRightlink; + GinPageGetOpaque(page)->rightlink = data->newRightlink; - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - UnlockReleaseBuffer(buffer); - } + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } UnlockReleaseBuffer(metabuffer); |