aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/gin/ginxlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/gin/ginxlog.c')
-rw-r--r--src/backend/access/gin/ginxlog.c121
1 files changed, 85 insertions, 36 deletions
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
index 250619cb2c1..4536c9c63dc 100644
--- a/src/backend/access/gin/ginxlog.c
+++ b/src/backend/access/gin/ginxlog.c
@@ -77,6 +77,9 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
MetaBuffer;
Page page;
+ /* Backup blocks are not used in create_index records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
Assert(BufferIsValid(MetaBuffer));
page = (Page) BufferGetPage(MetaBuffer);
@@ -109,6 +112,9 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
+ /* Backup blocks are not used in create_ptree records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
buffer = XLogReadBuffer(data->node, data->blkno, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
@@ -159,9 +165,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
}
}
- /* nothing else to do if page was backed up */
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(data->node, data->blkno, false);
if (!BufferIsValid(buffer))
@@ -256,6 +265,9 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
if (data->isData)
flags |= GIN_DATA;
+ /* Backup blocks are not used in split records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
Assert(BufferIsValid(lbuffer));
lpage = (Page) BufferGetPage(lbuffer);
@@ -369,9 +381,12 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
- /* nothing to do if page was backed up (and no info to do it with) */
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(data->node, data->blkno, false);
if (!BufferIsValid(buffer))
@@ -420,33 +435,38 @@ static void
ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
- Buffer buffer;
+ Buffer dbuffer;
+ Buffer pbuffer;
+ Buffer lbuffer;
Page page;
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ dbuffer = RestoreBackupBlock(lsn, record, 0, false, true);
+ else
{
- buffer = XLogReadBuffer(data->node, data->blkno, false);
- if (BufferIsValid(buffer))
+ dbuffer = XLogReadBuffer(data->node, data->blkno, false);
+ if (BufferIsValid(dbuffer))
{
- page = BufferGetPage(buffer);
+ page = BufferGetPage(dbuffer);
if (!XLByteLE(lsn, PageGetLSN(page)))
{
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->flags = GIN_DELETED;
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
+ MarkBufferDirty(dbuffer);
}
- UnlockReleaseBuffer(buffer);
}
}
- if (!(record->xl_info & XLR_BKP_BLOCK_2))
+ if (record->xl_info & XLR_BKP_BLOCK(1))
+ pbuffer = RestoreBackupBlock(lsn, record, 1, false, true);
+ else
{
- buffer = XLogReadBuffer(data->node, data->parentBlkno, false);
- if (BufferIsValid(buffer))
+ pbuffer = XLogReadBuffer(data->node, data->parentBlkno, false);
+ if (BufferIsValid(pbuffer))
{
- page = BufferGetPage(buffer);
+ page = BufferGetPage(pbuffer);
if (!XLByteLE(lsn, PageGetLSN(page)))
{
Assert(GinPageIsData(page));
@@ -454,29 +474,35 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
GinPageDeletePostingItem(page, data->parentOffset);
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
+ MarkBufferDirty(pbuffer);
}
- UnlockReleaseBuffer(buffer);
}
}
- if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
+ if (record->xl_info & XLR_BKP_BLOCK(2))
+ (void) RestoreBackupBlock(lsn, record, 2, false, false);
+ else if (data->leftBlkno != InvalidBlockNumber)
{
- buffer = XLogReadBuffer(data->node, data->leftBlkno, false);
- if (BufferIsValid(buffer))
+ lbuffer = XLogReadBuffer(data->node, data->leftBlkno, false);
+ if (BufferIsValid(lbuffer))
{
- page = BufferGetPage(buffer);
+ page = BufferGetPage(lbuffer);
if (!XLByteLE(lsn, PageGetLSN(page)))
{
Assert(GinPageIsData(page));
GinPageGetOpaque(page)->rightlink = data->rightLink;
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
+ MarkBufferDirty(lbuffer);
}
- UnlockReleaseBuffer(buffer);
+ UnlockReleaseBuffer(lbuffer);
}
}
+
+ if (BufferIsValid(pbuffer))
+ UnlockReleaseBuffer(pbuffer);
+ if (BufferIsValid(dbuffer))
+ UnlockReleaseBuffer(dbuffer);
}
static void
@@ -505,7 +531,9 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
/*
* insert into tail page
*/
- if (!(record->xl_info & XLR_BKP_BLOCK_1))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
if (BufferIsValid(buffer))
@@ -553,20 +581,25 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
/*
* New tail
*/
- buffer = XLogReadBuffer(data->node, data->prevTail, false);
- if (BufferIsValid(buffer))
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
{
- Page page = BufferGetPage(buffer);
-
- if (!XLByteLE(lsn, PageGetLSN(page)))
+ buffer = XLogReadBuffer(data->node, data->prevTail, false);
+ if (BufferIsValid(buffer))
{
- GinPageGetOpaque(page)->rightlink = data->newRightlink;
+ Page page = BufferGetPage(buffer);
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ GinPageGetOpaque(page)->rightlink = data->newRightlink;
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
}
- UnlockReleaseBuffer(buffer);
}
}
@@ -585,8 +618,12 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)
tupsize;
IndexTuple tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage));
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(data->node, data->blkno, true);
Assert(BufferIsValid(buffer));
@@ -632,6 +669,9 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
Page metapage;
int i;
+ /* Backup blocks are not used in delete_listpage records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
if (!BufferIsValid(metabuffer))
return; /* assume index was deleted, nothing to do */
@@ -645,6 +685,16 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
MarkBufferDirty(metabuffer);
}
+ /*
+ * In normal operation, shiftList() takes exclusive lock on all the
+ * pages-to-be-deleted simultaneously. During replay, however, it should
+ * be all right to lock them one at a time. This is dependent on the fact
+ * that we are deleting pages from the head of the list, and that readers
+ * share-lock the next page before releasing the one they are on. So we
+ * cannot get past a reader that is on, or due to visit, any page we are
+ * going to delete. New incoming readers will block behind our metapage
+ * lock and then see a fully updated page list.
+ */
for (i = 0; i < data->ndeleted; i++)
{
Buffer buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
@@ -678,7 +728,6 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record)
* implement a similar optimization as we have in b-tree, and remove
* killed tuples outside VACUUM, we'll need to handle that here.
*/
- RestoreBkpBlocks(lsn, record, false);
topCtx = MemoryContextSwitchTo(opCtx);
switch (info)