aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/gin/ginxlog.c
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-08-13 15:39:08 +0300
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-09-02 15:10:28 +0300
commitf8f4227976a2cdb8ac7c611e49da03aa9e65e0d2 (patch)
tree92a7aa95ce72fbac48325761761821f8d7a742ed /src/backend/access/gin/ginxlog.c
parent26f8b99b248aae989e63ca0969a746f30b0c8c21 (diff)
downloadpostgresql-f8f4227976a2cdb8ac7c611e49da03aa9e65e0d2.tar.gz
postgresql-f8f4227976a2cdb8ac7c611e49da03aa9e65e0d2.zip
Refactor per-page logic common to all redo routines to a new function.
Every redo routine uses the same idiom to determine what to do to a page: check if there's a backup block for it, and if not read, the buffer if the block exists, and check its LSN. Refactor that into a common function, XLogReadBufferForRedo, making all the redo routines shorter and more readable. This has no user-visible effect, and makes no changes to the WAL format. Reviewed by Andres Freund, Alvaro Herrera, Michael Paquier.
Diffstat (limited to 'src/backend/access/gin/ginxlog.c')
-rw-r--r--src/backend/access/gin/ginxlog.c234
1 files changed, 88 insertions, 146 deletions
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
index ffabf4e2594..d0553bb8f72 100644
--- a/src/backend/access/gin/ginxlog.c
+++ b/src/backend/access/gin/ginxlog.c
@@ -20,25 +20,25 @@
static MemoryContext opCtx; /* working memory for operations */
static void
-ginRedoClearIncompleteSplit(XLogRecPtr lsn, RelFileNode node, BlockNumber blkno)
+ginRedoClearIncompleteSplit(XLogRecPtr lsn, XLogRecord *record,
+ int block_index,
+ RelFileNode node, BlockNumber blkno)
{
Buffer buffer;
Page page;
- buffer = XLogReadBuffer(node, blkno, false);
- if (!BufferIsValid(buffer))
- return; /* page was deleted, nothing to do */
- page = (Page) BufferGetPage(buffer);
-
- if (lsn > PageGetLSN(page))
+ if (XLogReadBufferForRedo(lsn, record, block_index, node, blkno, &buffer)
+ == BLK_NEEDS_REDO)
{
+ page = (Page) BufferGetPage(buffer);
+
GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
-
- UnlockReleaseBuffer(buffer);
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
static void
@@ -332,7 +332,6 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
Buffer buffer;
- Page page;
char *payload;
BlockNumber leftChildBlkno = InvalidBlockNumber;
BlockNumber rightChildBlkno = InvalidBlockNumber;
@@ -351,26 +350,14 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
payload += sizeof(BlockIdData);
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- else
- ginRedoClearIncompleteSplit(lsn, data->node, leftChildBlkno);
+ ginRedoClearIncompleteSplit(lsn, record, 0, data->node, leftChildBlkno);
}
- /* If we have a full-page image, restore it and we're done */
- if (record->xl_info & XLR_BKP_BLOCK(isLeaf ? 0 : 1))
+ if (XLogReadBufferForRedo(lsn, record, isLeaf ? 0 : 1, data->node,
+ data->blkno, &buffer) == BLK_NEEDS_REDO)
{
- (void) RestoreBackupBlock(lsn, record, isLeaf ? 0 : 1, false, false);
- return;
- }
-
- buffer = XLogReadBuffer(data->node, data->blkno, false);
- if (!BufferIsValid(buffer))
- return; /* page was deleted, nothing to do */
- page = (Page) BufferGetPage(buffer);
+ Page page = BufferGetPage(buffer);
- if (lsn > PageGetLSN(page))
- {
/* How to insert the payload is tree-type specific */
if (data->flags & GIN_INSERT_ISDATA)
{
@@ -386,8 +373,8 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
-
- UnlockReleaseBuffer(buffer);
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
static void
@@ -476,12 +463,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
* split
*/
if (!isLeaf)
- {
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- else
- ginRedoClearIncompleteSplit(lsn, data->node, data->leftChildBlkno);
- }
+ ginRedoClearIncompleteSplit(lsn, record, 0, data->node, data->leftChildBlkno);
flags = 0;
if (isLeaf)
@@ -605,31 +587,21 @@ ginRedoVacuumDataLeafPage(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogVacuumDataLeafPage *xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetData(record);
Buffer buffer;
- Page page;
- /* If we have a full-page image, restore it and we're done */
- if (record->xl_info & XLR_BKP_BLOCK(0))
+ if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->blkno,
+ &buffer) == BLK_NEEDS_REDO)
{
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- return;
- }
-
- buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, false);
- if (!BufferIsValid(buffer))
- return;
- page = (Page) BufferGetPage(buffer);
+ Page page = BufferGetPage(buffer);
- Assert(GinPageIsLeaf(page));
- Assert(GinPageIsData(page));
+ Assert(GinPageIsLeaf(page));
+ Assert(GinPageIsData(page));
- if (lsn > PageGetLSN(page))
- {
ginRedoRecompress(page, &xlrec->data);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
-
- UnlockReleaseBuffer(buffer);
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
static void
@@ -641,62 +613,42 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
Buffer lbuffer;
Page page;
- if (record->xl_info & XLR_BKP_BLOCK(0))
- dbuffer = RestoreBackupBlock(lsn, record, 0, false, true);
- else
+ if (XLogReadBufferForRedo(lsn, record, 0, data->node, data->blkno, &dbuffer)
+ == BLK_NEEDS_REDO)
{
- dbuffer = XLogReadBuffer(data->node, data->blkno, false);
- if (BufferIsValid(dbuffer))
- {
- page = BufferGetPage(dbuffer);
- if (lsn > PageGetLSN(page))
- {
- Assert(GinPageIsData(page));
- GinPageGetOpaque(page)->flags = GIN_DELETED;
- PageSetLSN(page, lsn);
- MarkBufferDirty(dbuffer);
- }
- }
+ page = BufferGetPage(dbuffer);
+
+ Assert(GinPageIsData(page));
+ GinPageGetOpaque(page)->flags = GIN_DELETED;
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(dbuffer);
}
- if (record->xl_info & XLR_BKP_BLOCK(1))
- pbuffer = RestoreBackupBlock(lsn, record, 1, false, true);
- else
+ if (XLogReadBufferForRedo(lsn, record, 1, data->node, data->parentBlkno,
+ &pbuffer) == BLK_NEEDS_REDO)
{
- pbuffer = XLogReadBuffer(data->node, data->parentBlkno, false);
- if (BufferIsValid(pbuffer))
- {
- page = BufferGetPage(pbuffer);
- if (lsn > PageGetLSN(page))
- {
- Assert(GinPageIsData(page));
- Assert(!GinPageIsLeaf(page));
- GinPageDeletePostingItem(page, data->parentOffset);
- PageSetLSN(page, lsn);
- MarkBufferDirty(pbuffer);
- }
- }
+ page = BufferGetPage(pbuffer);
+
+ Assert(GinPageIsData(page));
+ Assert(!GinPageIsLeaf(page));
+ GinPageDeletePostingItem(page, data->parentOffset);
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(pbuffer);
}
- if (record->xl_info & XLR_BKP_BLOCK(2))
- (void) RestoreBackupBlock(lsn, record, 2, false, false);
- else if (data->leftBlkno != InvalidBlockNumber)
+ if (XLogReadBufferForRedo(lsn, record, 2, data->node, data->leftBlkno,
+ &lbuffer) == BLK_NEEDS_REDO)
{
- lbuffer = XLogReadBuffer(data->node, data->leftBlkno, false);
- if (BufferIsValid(lbuffer))
- {
- page = BufferGetPage(lbuffer);
- if (lsn > PageGetLSN(page))
- {
- Assert(GinPageIsData(page));
- GinPageGetOpaque(page)->rightlink = data->rightLink;
- PageSetLSN(page, lsn);
- MarkBufferDirty(lbuffer);
- }
- UnlockReleaseBuffer(lbuffer);
- }
+ page = BufferGetPage(lbuffer);
+
+ Assert(GinPageIsData(page));
+ GinPageGetOpaque(page)->rightlink = data->rightLink;
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(lbuffer);
}
+ if (BufferIsValid(lbuffer))
+ UnlockReleaseBuffer(lbuffer);
if (BufferIsValid(pbuffer))
UnlockReleaseBuffer(pbuffer);
if (BufferIsValid(dbuffer))
@@ -730,74 +682,64 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
/*
* insert into tail page
*/
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- else
+ if (XLogReadBufferForRedo(lsn, record, 0, data->node,
+ data->metadata.tail, &buffer)
+ == BLK_NEEDS_REDO)
{
- buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
- if (BufferIsValid(buffer))
- {
- Page page = BufferGetPage(buffer);
+ Page page = BufferGetPage(buffer);
+ OffsetNumber off;
+ int i;
+ Size tupsize;
+ IndexTuple tuples;
- if (lsn > PageGetLSN(page))
- {
- OffsetNumber l,
- off = (PageIsEmpty(page)) ? FirstOffsetNumber :
- OffsetNumberNext(PageGetMaxOffsetNumber(page));
- int i,
- tupsize;
- IndexTuple tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
+ tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
- for (i = 0; i < data->ntuples; i++)
- {
- tupsize = IndexTupleSize(tuples);
+ if (PageIsEmpty(page))
+ off = FirstOffsetNumber;
+ else
+ off = OffsetNumberNext(PageGetMaxOffsetNumber(page));
- l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
+ for (i = 0; i < data->ntuples; i++)
+ {
+ tupsize = IndexTupleSize(tuples);
- if (l == InvalidOffsetNumber)
- elog(ERROR, "failed to add item to index page");
+ if (PageAddItem(page, (Item) tuples, tupsize, off,
+ false, false) == InvalidOffsetNumber)
+ elog(ERROR, "failed to add item to index page");
- tuples = (IndexTuple) (((char *) tuples) + tupsize);
+ tuples = (IndexTuple) (((char *) tuples) + tupsize);
- off++;
- }
+ off++;
+ }
- /*
- * Increase counter of heap tuples
- */
- GinPageGetOpaque(page)->maxoff++;
+ /*
+ * Increase counter of heap tuples
+ */
+ GinPageGetOpaque(page)->maxoff++;
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
- }
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
else if (data->prevTail != InvalidBlockNumber)
{
/*
* New tail
*/
- if (record->xl_info & XLR_BKP_BLOCK(0))
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
- else
+ if (XLogReadBufferForRedo(lsn, record, 0, data->node, data->prevTail,
+ &buffer) == BLK_NEEDS_REDO)
{
- buffer = XLogReadBuffer(data->node, data->prevTail, false);
- if (BufferIsValid(buffer))
- {
- Page page = BufferGetPage(buffer);
+ Page page = BufferGetPage(buffer);
- if (lsn > PageGetLSN(page))
- {
- GinPageGetOpaque(page)->rightlink = data->newRightlink;
+ GinPageGetOpaque(page)->rightlink = data->newRightlink;
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- UnlockReleaseBuffer(buffer);
- }
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
}
UnlockReleaseBuffer(metabuffer);