aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/gin/ginxlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/gin/ginxlog.c')
-rw-r--r--src/backend/access/gin/ginxlog.c479
1 files changed, 206 insertions, 273 deletions
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
index ca7bee1f340..4e43ae9feb1 100644
--- a/src/backend/access/gin/ginxlog.c
+++ b/src/backend/access/gin/ginxlog.c
@@ -18,55 +18,27 @@
#include "utils/memutils.h"
static MemoryContext opCtx; /* working memory for operations */
-static MemoryContext topCtx;
-
-typedef struct ginIncompleteSplit
-{
- RelFileNode node;
- BlockNumber leftBlkno;
- BlockNumber rightBlkno;
- BlockNumber rootBlkno;
-} ginIncompleteSplit;
-
-static List *incomplete_splits;
static void
-pushIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber rightBlkno, BlockNumber rootBlkno)
+ginRedoClearIncompleteSplit(XLogRecPtr lsn, RelFileNode node, BlockNumber blkno)
{
- ginIncompleteSplit *split;
-
- MemoryContextSwitchTo(topCtx);
-
- split = palloc(sizeof(ginIncompleteSplit));
-
- split->node = node;
- split->leftBlkno = leftBlkno;
- split->rightBlkno = rightBlkno;
- split->rootBlkno = rootBlkno;
-
- incomplete_splits = lappend(incomplete_splits, split);
-
- MemoryContextSwitchTo(opCtx);
-}
+ Buffer buffer;
+ Page page;
-static void
-forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
-{
- ListCell *l;
+ buffer = XLogReadBuffer(node, blkno, false);
+ if (!BufferIsValid(buffer))
+ return; /* page was deleted, nothing to do */
+ page = (Page) BufferGetPage(buffer);
- foreach(l, incomplete_splits)
+ if (lsn > PageGetLSN(page))
{
- ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
+ GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT;
- if (RelFileNodeEquals(node, split->node) &&
- leftBlkno == split->leftBlkno &&
- updateBlkno == split->rightBlkno)
- {
- incomplete_splits = list_delete_ptr(incomplete_splits, split);
- pfree(split);
- break;
- }
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+
+ UnlockReleaseBuffer(buffer);
}
static void
@@ -128,44 +100,105 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
}
static void
-ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
+ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
+ void *rdata)
{
- ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
- Buffer buffer;
- Page page;
+ Page page = BufferGetPage(buffer);
+ ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata;
+ IndexTuple itup;
- /* first, forget any incomplete split this insertion completes */
- if (data->isData)
+ if (rightblkno != InvalidBlockNumber)
{
- Assert(data->isDelete == FALSE);
- if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
- {
- PostingItem *pitem;
+ /* update link to right page after split */
+ Assert(!GinPageIsLeaf(page));
+ Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offset));
+ GinSetDownlink(itup, rightblkno);
+ }
- pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
- forgetIncompleteSplit(data->node,
- PostingItemGetBlockNumber(pitem),
- data->updateBlkno);
- }
+ if (data->isDelete)
+ {
+ Assert(GinPageIsLeaf(page));
+ Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
+ PageIndexTupleDelete(page, offset);
+ }
+
+ itup = &data->tuple;
+
+ if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), offset, false, false) == InvalidOffsetNumber)
+ {
+ RelFileNode node;
+ ForkNumber forknum;
+ BlockNumber blknum;
+ BufferGetTag(buffer, &node, &forknum, &blknum);
+ elog(ERROR, "failed to add item to index page in %u/%u/%u",
+ node.spcNode, node.dbNode, node.relNode);
+ }
+}
+
+static void
+ginRedoInsertData(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
+ void *rdata)
+{
+ Page page = BufferGetPage(buffer);
+
+ if (GinPageIsLeaf(page))
+ {
+ ginxlogInsertDataLeaf *data = (ginxlogInsertDataLeaf *) rdata;
+ ItemPointerData *items = data->items;
+ OffsetNumber i;
+
+ for (i = 0; i < data->nitem; i++)
+ GinDataPageAddItemPointer(page, &items[i], offset + i);
}
else
{
- if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
- {
- IndexTuple itup;
+ PostingItem *pitem = (PostingItem *) rdata;
+ PostingItem *oldpitem;
- itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
- forgetIncompleteSplit(data->node,
- GinGetDownlink(itup),
- data->updateBlkno);
- }
+ /* update link to right page after split */
+ oldpitem = GinDataPageGetPostingItem(page, offset);
+ PostingItemSetBlockNumber(oldpitem, rightblkno);
+
+ GinDataPageAddPostingItem(page, pitem, offset);
+ }
+}
+
+static void
+ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
+{
+ ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
+ Buffer buffer;
+ Page page;
+ char *payload;
+ BlockNumber leftChildBlkno = InvalidBlockNumber;
+ BlockNumber rightChildBlkno = InvalidBlockNumber;
+ bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
+
+ payload = XLogRecGetData(record) + sizeof(ginxlogInsert);
+
+ /*
+ * First clear incomplete-split flag on child page if this finishes
+ * a split.
+ */
+ if (!isLeaf)
+ {
+ leftChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
+ payload += sizeof(BlockIdData);
+ rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
+ payload += sizeof(BlockIdData);
+
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
+ ginRedoClearIncompleteSplit(lsn, data->node, leftChildBlkno);
}
/* If we have a full-page image, restore it and we're done */
- if (record->xl_info & XLR_BKP_BLOCK(0))
+ if (record->xl_info & XLR_BKP_BLOCK(isLeaf ? 0 : 1))
{
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ (void) RestoreBackupBlock(lsn, record, isLeaf ? 0 : 1, false, false);
return;
}
@@ -176,74 +209,82 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
if (lsn > PageGetLSN(page))
{
- if (data->isData)
+ /* How to insert the payload is tree-type specific */
+ if (data->flags & GIN_INSERT_ISDATA)
{
Assert(GinPageIsData(page));
-
- if (data->isLeaf)
- {
- OffsetNumber i;
- ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-
- Assert(GinPageIsLeaf(page));
- Assert(data->updateBlkno == InvalidBlockNumber);
-
- for (i = 0; i < data->nitem; i++)
- GinDataPageAddItemPointer(page, &items[i], data->offset + i);
- }
- else
- {
- PostingItem *pitem;
-
- Assert(!GinPageIsLeaf(page));
-
- if (data->updateBlkno != InvalidBlockNumber)
- {
- /* update link to right page after split */
- pitem = GinDataPageGetPostingItem(page, data->offset);
- PostingItemSetBlockNumber(pitem, data->updateBlkno);
- }
-
- pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-
- GinDataPageAddPostingItem(page, pitem, data->offset);
- }
+ ginRedoInsertData(buffer, data->offset, rightChildBlkno, payload);
}
else
{
- IndexTuple itup;
-
Assert(!GinPageIsData(page));
+ ginRedoInsertEntry(buffer, data->offset, rightChildBlkno, payload);
+ }
- if (data->updateBlkno != InvalidBlockNumber)
- {
- /* update link to right page after split */
- Assert(!GinPageIsLeaf(page));
- Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
- itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
- GinSetDownlink(itup, data->updateBlkno);
- }
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
- if (data->isDelete)
- {
- Assert(GinPageIsLeaf(page));
- Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
- PageIndexTupleDelete(page, data->offset);
- }
+ UnlockReleaseBuffer(buffer);
+}
- itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+static void
+ginRedoSplitEntry(Page lpage, Page rpage, void *rdata)
+{
+ ginxlogSplitEntry *data = (ginxlogSplitEntry *) rdata;
+ IndexTuple itup = (IndexTuple) ((char *) rdata + sizeof(ginxlogSplitEntry));
+ OffsetNumber i;
- if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
- elog(ERROR, "failed to add item to index page in %u/%u/%u",
- data->node.spcNode, data->node.dbNode, data->node.relNode);
- }
+ for (i = 0; i < data->separator; i++)
+ {
+ if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+ elog(ERROR, "failed to add item to gin index page");
+ itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+ }
- PageSetLSN(page, lsn);
+ for (i = data->separator; i < data->nitem; i++)
+ {
+ if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+ elog(ERROR, "failed to add item to gin index page");
+ itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+ }
+}
- MarkBufferDirty(buffer);
+static void
+ginRedoSplitData(Page lpage, Page rpage, void *rdata)
+{
+ ginxlogSplitData *data = (ginxlogSplitData *) rdata;
+ bool isleaf = GinPageIsLeaf(lpage);
+ char *ptr = (char *) rdata + sizeof(ginxlogSplitData);
+ OffsetNumber i;
+ ItemPointer bound;
+
+ if (isleaf)
+ {
+ ItemPointer items = (ItemPointer) ptr;
+ for (i = 0; i < data->separator; i++)
+ GinDataPageAddItemPointer(lpage, &items[i], InvalidOffsetNumber);
+ for (i = data->separator; i < data->nitem; i++)
+ GinDataPageAddItemPointer(rpage, &items[i], InvalidOffsetNumber);
+ }
+ else
+ {
+ PostingItem *items = (PostingItem *) ptr;
+ for (i = 0; i < data->separator; i++)
+ GinDataPageAddPostingItem(lpage, &items[i], InvalidOffsetNumber);
+ for (i = data->separator; i < data->nitem; i++)
+ GinDataPageAddPostingItem(rpage, &items[i], InvalidOffsetNumber);
}
- UnlockReleaseBuffer(buffer);
+ /* set up right key */
+ bound = GinDataPageGetRightBound(lpage);
+ if (isleaf)
+ *bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff);
+ else
+ *bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key;
+
+ bound = GinDataPageGetRightBound(rpage);
+ *bound = data->rightbound;
}
static void
@@ -255,14 +296,30 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
Page lpage,
rpage;
uint32 flags = 0;
+ char *payload;
+ bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
+ bool isData = (data->flags & GIN_INSERT_ISDATA) != 0;
+ bool isRoot = (data->flags & GIN_SPLIT_ROOT) != 0;
+
+ payload = XLogRecGetData(record) + sizeof(ginxlogSplit);
+
+ /*
+ * First clear incomplete-split flag on child page if this finishes
+ * a split
+ */
+ if (!isLeaf)
+ {
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
+ ginRedoClearIncompleteSplit(lsn, data->node, data->leftChildBlkno);
+ }
- if (data->isLeaf)
+ if (isLeaf)
flags |= GIN_LEAF;
- if (data->isData)
- flags |= GIN_DATA;
- /* Backup blocks are not used in split records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ if (isData)
+ flags |= GIN_DATA;
lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
Assert(BufferIsValid(lbuffer));
@@ -275,64 +332,13 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
GinInitBuffer(rbuffer, flags);
GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer);
- GinPageGetOpaque(rpage)->rightlink = data->rrlink;
-
- if (data->isData)
- {
- char *ptr = XLogRecGetData(record) + sizeof(ginxlogSplit);
- Size sizeofitem = GinSizeOfDataPageItem(lpage);
- OffsetNumber i;
- ItemPointer bound;
-
- for (i = 0; i < data->separator; i++)
- {
- if (data->isLeaf)
- GinDataPageAddItemPointer(lpage, (ItemPointer) ptr, InvalidOffsetNumber);
- else
- GinDataPageAddPostingItem(lpage, (PostingItem *) ptr, InvalidOffsetNumber);
- ptr += sizeofitem;
- }
-
- for (i = data->separator; i < data->nitem; i++)
- {
- if (data->isLeaf)
- GinDataPageAddItemPointer(rpage, (ItemPointer) ptr, InvalidOffsetNumber);
- else
- GinDataPageAddPostingItem(rpage, (PostingItem *) ptr, InvalidOffsetNumber);
- ptr += sizeofitem;
- }
+ GinPageGetOpaque(rpage)->rightlink = isRoot ? InvalidBlockNumber : data->rrlink;
- /* set up right key */
- bound = GinDataPageGetRightBound(lpage);
- if (data->isLeaf)
- *bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff);
- else
- *bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key;
-
- bound = GinDataPageGetRightBound(rpage);
- *bound = data->rightbound;
- }
+ /* Do the tree-type specific portion to restore the page contents */
+ if (isData)
+ ginRedoSplitData(lpage, rpage, payload);
else
- {
- IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogSplit));
- OffsetNumber i;
-
- for (i = 0; i < data->separator; i++)
- {
- if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
- elog(ERROR, "failed to add item to index page in %u/%u/%u",
- data->node.spcNode, data->node.dbNode, data->node.relNode);
- itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
- }
-
- for (i = data->separator; i < data->nitem; i++)
- {
- if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
- elog(ERROR, "failed to add item to index page in %u/%u/%u",
- data->node.spcNode, data->node.dbNode, data->node.relNode);
- itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
- }
- }
+ ginRedoSplitEntry(lpage, rpage, payload);
PageSetLSN(rpage, lsn);
MarkBufferDirty(rbuffer);
@@ -340,25 +346,31 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
PageSetLSN(lpage, lsn);
MarkBufferDirty(lbuffer);
- if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
- forgetIncompleteSplit(data->node, data->leftChildBlkno, data->updateBlkno);
-
- if (data->isRootSplit)
+ if (isRoot)
{
- Buffer rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true);
+ BlockNumber rootBlkno = data->rrlink;
+ Buffer rootBuf = XLogReadBuffer(data->node, rootBlkno, true);
Page rootPage = BufferGetPage(rootBuf);
GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
- if (data->isData)
+ if (isData)
{
- Assert(data->rootBlkno != GIN_ROOT_BLKNO);
- ginDataFillRoot(NULL, rootBuf, lbuffer, rbuffer);
+ Assert(rootBlkno != GIN_ROOT_BLKNO);
+ ginDataFillRoot(NULL, BufferGetPage(rootBuf),
+ BufferGetBlockNumber(lbuffer),
+ BufferGetPage(lbuffer),
+ BufferGetBlockNumber(rbuffer),
+ BufferGetPage(rbuffer));
}
else
{
- Assert(data->rootBlkno == GIN_ROOT_BLKNO);
- ginEntryFillRoot(NULL, rootBuf, lbuffer, rbuffer);
+ Assert(rootBlkno == GIN_ROOT_BLKNO);
+ ginEntryFillRoot(NULL, BufferGetPage(rootBuf),
+ BufferGetBlockNumber(lbuffer),
+ BufferGetPage(lbuffer),
+ BufferGetBlockNumber(rbuffer),
+ BufferGetPage(rbuffer));
}
PageSetLSN(rootPage, lsn);
@@ -366,8 +378,6 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
MarkBufferDirty(rootBuf);
UnlockReleaseBuffer(rootBuf);
}
- else
- pushIncompleteSplit(data->node, data->lblkno, data->rblkno, data->rootBlkno);
UnlockReleaseBuffer(rbuffer);
UnlockReleaseBuffer(lbuffer);
@@ -711,6 +721,7 @@ void
gin_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ MemoryContext oldCtx;
/*
* GIN indexes do not require any conflict processing. NB: If we ever
@@ -718,7 +729,7 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record)
* killed tuples outside VACUUM, we'll need to handle that here.
*/
- topCtx = MemoryContextSwitchTo(opCtx);
+ oldCtx = MemoryContextSwitchTo(opCtx);
switch (info)
{
case XLOG_GIN_CREATE_INDEX:
@@ -751,15 +762,13 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record)
default:
elog(PANIC, "gin_redo: unknown op code %u", info);
}
- MemoryContextSwitchTo(topCtx);
+ MemoryContextSwitchTo(oldCtx);
MemoryContextReset(opCtx);
}
void
gin_xlog_startup(void)
{
- incomplete_splits = NIL;
-
opCtx = AllocSetContextCreate(CurrentMemoryContext,
"GIN recovery temporary context",
ALLOCSET_DEFAULT_MINSIZE,
@@ -767,84 +776,8 @@ gin_xlog_startup(void)
ALLOCSET_DEFAULT_MAXSIZE);
}
-static void
-ginContinueSplit(ginIncompleteSplit *split)
-{
- GinBtreeData btree;
- GinState ginstate;
- Relation reln;
- Buffer buffer;
- GinBtreeStack *stack;
-
- /*
- * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno,
- * split->leftBlkno, split->rightBlkno);
- */
- buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
-
- /*
- * Failure should be impossible here, because we wrote the page earlier.
- */
- if (!BufferIsValid(buffer))
- elog(PANIC, "ginContinueSplit: left block %u not found",
- split->leftBlkno);
-
- reln = CreateFakeRelcacheEntry(split->node);
-
- if (split->rootBlkno == GIN_ROOT_BLKNO)
- {
- MemSet(&ginstate, 0, sizeof(ginstate));
- ginstate.index = reln;
-
- ginPrepareEntryScan(&btree,
- InvalidOffsetNumber, (Datum) 0, GIN_CAT_NULL_KEY,
- &ginstate);
- }
- else
- {
- ginPrepareDataScan(&btree, reln, split->rootBlkno);
- }
-
- stack = palloc(sizeof(GinBtreeStack));
- stack->blkno = split->leftBlkno;
- stack->buffer = buffer;
- stack->off = InvalidOffsetNumber;
- stack->parent = NULL;
-
- ginFindParents(&btree, stack);
- LockBuffer(stack->parent->buffer, GIN_UNLOCK);
- ginFinishSplit(&btree, stack, NULL);
-
- /* buffer is released by ginFinishSplit */
-
- FreeFakeRelcacheEntry(reln);
-}
-
void
gin_xlog_cleanup(void)
{
- ListCell *l;
- MemoryContext topCtx;
-
- topCtx = MemoryContextSwitchTo(opCtx);
-
- foreach(l, incomplete_splits)
- {
- ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
-
- ginContinueSplit(split);
- MemoryContextReset(opCtx);
- }
-
- MemoryContextSwitchTo(topCtx);
MemoryContextDelete(opCtx);
- incomplete_splits = NIL;
-}
-
-bool
-gin_safe_restartpoint(void)
-{
- if (incomplete_splits)
- return false;
- return true;
}