diff options
Diffstat (limited to 'src/backend/access/gin/ginxlog.c')
-rw-r--r-- | src/backend/access/gin/ginxlog.c | 479 |
1 files changed, 206 insertions, 273 deletions
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c index ca7bee1f340..4e43ae9feb1 100644 --- a/src/backend/access/gin/ginxlog.c +++ b/src/backend/access/gin/ginxlog.c @@ -18,55 +18,27 @@ #include "utils/memutils.h" static MemoryContext opCtx; /* working memory for operations */ -static MemoryContext topCtx; - -typedef struct ginIncompleteSplit -{ - RelFileNode node; - BlockNumber leftBlkno; - BlockNumber rightBlkno; - BlockNumber rootBlkno; -} ginIncompleteSplit; - -static List *incomplete_splits; static void -pushIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber rightBlkno, BlockNumber rootBlkno) +ginRedoClearIncompleteSplit(XLogRecPtr lsn, RelFileNode node, BlockNumber blkno) { - ginIncompleteSplit *split; - - MemoryContextSwitchTo(topCtx); - - split = palloc(sizeof(ginIncompleteSplit)); - - split->node = node; - split->leftBlkno = leftBlkno; - split->rightBlkno = rightBlkno; - split->rootBlkno = rootBlkno; - - incomplete_splits = lappend(incomplete_splits, split); - - MemoryContextSwitchTo(opCtx); -} + Buffer buffer; + Page page; -static void -forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno) -{ - ListCell *l; + buffer = XLogReadBuffer(node, blkno, false); + if (!BufferIsValid(buffer)) + return; /* page was deleted, nothing to do */ + page = (Page) BufferGetPage(buffer); - foreach(l, incomplete_splits) + if (lsn > PageGetLSN(page)) { - ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l); + GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT; - if (RelFileNodeEquals(node, split->node) && - leftBlkno == split->leftBlkno && - updateBlkno == split->rightBlkno) - { - incomplete_splits = list_delete_ptr(incomplete_splits, split); - pfree(split); - break; - } + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } + + UnlockReleaseBuffer(buffer); } static void @@ -128,44 +100,105 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record) } static void -ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) +ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno, + void *rdata) { - ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record); - Buffer buffer; - Page page; + Page page = BufferGetPage(buffer); + ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata; + IndexTuple itup; - /* first, forget any incomplete split this insertion completes */ - if (data->isData) + if (rightblkno != InvalidBlockNumber) { - Assert(data->isDelete == FALSE); - if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) - { - PostingItem *pitem; + /* update link to right page after split */ + Assert(!GinPageIsLeaf(page)); + Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page)); + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offset)); + GinSetDownlink(itup, rightblkno); + } - pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); - forgetIncompleteSplit(data->node, - PostingItemGetBlockNumber(pitem), - data->updateBlkno); - } + if (data->isDelete) + { + Assert(GinPageIsLeaf(page)); + Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page)); + PageIndexTupleDelete(page, offset); + } + + itup = &data->tuple; + + if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), offset, false, false) == InvalidOffsetNumber) + { + RelFileNode node; + ForkNumber forknum; + BlockNumber blknum; + BufferGetTag(buffer, &node, &forknum, &blknum); + elog(ERROR, "failed to add item to index page in %u/%u/%u", + node.spcNode, node.dbNode, node.relNode); + } +} + +static void +ginRedoInsertData(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno, + void *rdata) +{ + Page page = BufferGetPage(buffer); + + if (GinPageIsLeaf(page)) + { + ginxlogInsertDataLeaf *data = (ginxlogInsertDataLeaf *) rdata; + ItemPointerData *items = data->items; + OffsetNumber i; + + for (i = 0; i < data->nitem; i++) + GinDataPageAddItemPointer(page, &items[i], offset + i); } else { - if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) - { - IndexTuple itup; + PostingItem *pitem = (PostingItem *) rdata; + PostingItem *oldpitem; - itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert)); - forgetIncompleteSplit(data->node, - GinGetDownlink(itup), - data->updateBlkno); - } + /* update link to right page after split */ + oldpitem = GinDataPageGetPostingItem(page, offset); + PostingItemSetBlockNumber(oldpitem, rightblkno); + + GinDataPageAddPostingItem(page, pitem, offset); + } +} + +static void +ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) +{ + ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record); + Buffer buffer; + Page page; + char *payload; + BlockNumber leftChildBlkno = InvalidBlockNumber; + BlockNumber rightChildBlkno = InvalidBlockNumber; + bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0; + + payload = XLogRecGetData(record) + sizeof(ginxlogInsert); + + /* + * First clear incomplete-split flag on child page if this finishes + * a split. + */ + if (!isLeaf) + { + leftChildBlkno = BlockIdGetBlockNumber((BlockId) payload); + payload += sizeof(BlockIdData); + rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload); + payload += sizeof(BlockIdData); + + if (record->xl_info & XLR_BKP_BLOCK(0)) + (void) RestoreBackupBlock(lsn, record, 0, false, false); + else + ginRedoClearIncompleteSplit(lsn, data->node, leftChildBlkno); } /* If we have a full-page image, restore it and we're done */ - if (record->xl_info & XLR_BKP_BLOCK(0)) + if (record->xl_info & XLR_BKP_BLOCK(isLeaf ? 0 : 1)) { - (void) RestoreBackupBlock(lsn, record, 0, false, false); + (void) RestoreBackupBlock(lsn, record, isLeaf ? 0 : 1, false, false); return; } @@ -176,74 +209,82 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) if (lsn > PageGetLSN(page)) { - if (data->isData) + /* How to insert the payload is tree-type specific */ + if (data->flags & GIN_INSERT_ISDATA) { Assert(GinPageIsData(page)); - - if (data->isLeaf) - { - OffsetNumber i; - ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); - - Assert(GinPageIsLeaf(page)); - Assert(data->updateBlkno == InvalidBlockNumber); - - for (i = 0; i < data->nitem; i++) - GinDataPageAddItemPointer(page, &items[i], data->offset + i); - } - else - { - PostingItem *pitem; - - Assert(!GinPageIsLeaf(page)); - - if (data->updateBlkno != InvalidBlockNumber) - { - /* update link to right page after split */ - pitem = GinDataPageGetPostingItem(page, data->offset); - PostingItemSetBlockNumber(pitem, data->updateBlkno); - } - - pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); - - GinDataPageAddPostingItem(page, pitem, data->offset); - } + ginRedoInsertData(buffer, data->offset, rightChildBlkno, payload); } else { - IndexTuple itup; - Assert(!GinPageIsData(page)); + ginRedoInsertEntry(buffer, data->offset, rightChildBlkno, payload); + } - if (data->updateBlkno != InvalidBlockNumber) - { - /* update link to right page after split */ - Assert(!GinPageIsLeaf(page)); - Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page)); - itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset)); - GinSetDownlink(itup, data->updateBlkno); - } + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } - if (data->isDelete) - { - Assert(GinPageIsLeaf(page)); - Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page)); - PageIndexTupleDelete(page, data->offset); - } + UnlockReleaseBuffer(buffer); +} - itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert)); +static void +ginRedoSplitEntry(Page lpage, Page rpage, void *rdata) +{ + ginxlogSplitEntry *data = (ginxlogSplitEntry *) rdata; + IndexTuple itup = (IndexTuple) ((char *) rdata + sizeof(ginxlogSplitEntry)); + OffsetNumber i; - if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber) - elog(ERROR, "failed to add item to index page in %u/%u/%u", - data->node.spcNode, data->node.dbNode, data->node.relNode); - } + for (i = 0; i < data->separator; i++) + { + if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber) + elog(ERROR, "failed to add item to gin index page"); + itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup))); + } - PageSetLSN(page, lsn); + for (i = data->separator; i < data->nitem; i++) + { + if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber) + elog(ERROR, "failed to add item to gin index page"); + itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup))); + } +} - MarkBufferDirty(buffer); +static void +ginRedoSplitData(Page lpage, Page rpage, void *rdata) +{ + ginxlogSplitData *data = (ginxlogSplitData *) rdata; + bool isleaf = GinPageIsLeaf(lpage); + char *ptr = (char *) rdata + sizeof(ginxlogSplitData); + OffsetNumber i; + ItemPointer bound; + + if (isleaf) + { + ItemPointer items = (ItemPointer) ptr; + for (i = 0; i < data->separator; i++) + GinDataPageAddItemPointer(lpage, &items[i], InvalidOffsetNumber); + for (i = data->separator; i < data->nitem; i++) + GinDataPageAddItemPointer(rpage, &items[i], InvalidOffsetNumber); + } + else + { + PostingItem *items = (PostingItem *) ptr; + for (i = 0; i < data->separator; i++) + GinDataPageAddPostingItem(lpage, &items[i], InvalidOffsetNumber); + for (i = data->separator; i < data->nitem; i++) + GinDataPageAddPostingItem(rpage, &items[i], InvalidOffsetNumber); } - UnlockReleaseBuffer(buffer); + /* set up right key */ + bound = GinDataPageGetRightBound(lpage); + if (isleaf) + *bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff); + else + *bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key; + + bound = GinDataPageGetRightBound(rpage); + *bound = data->rightbound; } static void @@ -255,14 +296,30 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) Page lpage, rpage; uint32 flags = 0; + char *payload; + bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0; + bool isData = (data->flags & GIN_INSERT_ISDATA) != 0; + bool isRoot = (data->flags & GIN_SPLIT_ROOT) != 0; + + payload = XLogRecGetData(record) + sizeof(ginxlogSplit); + + /* + * First clear incomplete-split flag on child page if this finishes + * a split + */ + if (!isLeaf) + { + if (record->xl_info & XLR_BKP_BLOCK(0)) + (void) RestoreBackupBlock(lsn, record, 0, false, false); + else + ginRedoClearIncompleteSplit(lsn, data->node, data->leftChildBlkno); + } - if (data->isLeaf) + if (isLeaf) flags |= GIN_LEAF; - if (data->isData) - flags |= GIN_DATA; - /* Backup blocks are not used in split records */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + if (isData) + flags |= GIN_DATA; lbuffer = XLogReadBuffer(data->node, data->lblkno, true); Assert(BufferIsValid(lbuffer)); @@ -275,64 +332,13 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) GinInitBuffer(rbuffer, flags); GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer); - GinPageGetOpaque(rpage)->rightlink = data->rrlink; - - if (data->isData) - { - char *ptr = XLogRecGetData(record) + sizeof(ginxlogSplit); - Size sizeofitem = GinSizeOfDataPageItem(lpage); - OffsetNumber i; - ItemPointer bound; - - for (i = 0; i < data->separator; i++) - { - if (data->isLeaf) - GinDataPageAddItemPointer(lpage, (ItemPointer) ptr, InvalidOffsetNumber); - else - GinDataPageAddPostingItem(lpage, (PostingItem *) ptr, InvalidOffsetNumber); - ptr += sizeofitem; - } - - for (i = data->separator; i < data->nitem; i++) - { - if (data->isLeaf) - GinDataPageAddItemPointer(rpage, (ItemPointer) ptr, InvalidOffsetNumber); - else - GinDataPageAddPostingItem(rpage, (PostingItem *) ptr, InvalidOffsetNumber); - ptr += sizeofitem; - } + GinPageGetOpaque(rpage)->rightlink = isRoot ? InvalidBlockNumber : data->rrlink; - /* set up right key */ - bound = GinDataPageGetRightBound(lpage); - if (data->isLeaf) - *bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff); - else - *bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key; - - bound = GinDataPageGetRightBound(rpage); - *bound = data->rightbound; - } + /* Do the tree-type specific portion to restore the page contents */ + if (isData) + ginRedoSplitData(lpage, rpage, payload); else - { - IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogSplit)); - OffsetNumber i; - - for (i = 0; i < data->separator; i++) - { - if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber) - elog(ERROR, "failed to add item to index page in %u/%u/%u", - data->node.spcNode, data->node.dbNode, data->node.relNode); - itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup))); - } - - for (i = data->separator; i < data->nitem; i++) - { - if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber) - elog(ERROR, "failed to add item to index page in %u/%u/%u", - data->node.spcNode, data->node.dbNode, data->node.relNode); - itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup))); - } - } + ginRedoSplitEntry(lpage, rpage, payload); PageSetLSN(rpage, lsn); MarkBufferDirty(rbuffer); @@ -340,25 +346,31 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) PageSetLSN(lpage, lsn); MarkBufferDirty(lbuffer); - if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) - forgetIncompleteSplit(data->node, data->leftChildBlkno, data->updateBlkno); - - if (data->isRootSplit) + if (isRoot) { - Buffer rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true); + BlockNumber rootBlkno = data->rrlink; + Buffer rootBuf = XLogReadBuffer(data->node, rootBlkno, true); Page rootPage = BufferGetPage(rootBuf); GinInitBuffer(rootBuf, flags & ~GIN_LEAF); - if (data->isData) + if (isData) { - Assert(data->rootBlkno != GIN_ROOT_BLKNO); - ginDataFillRoot(NULL, rootBuf, lbuffer, rbuffer); + Assert(rootBlkno != GIN_ROOT_BLKNO); + ginDataFillRoot(NULL, BufferGetPage(rootBuf), + BufferGetBlockNumber(lbuffer), + BufferGetPage(lbuffer), + BufferGetBlockNumber(rbuffer), + BufferGetPage(rbuffer)); } else { - Assert(data->rootBlkno == GIN_ROOT_BLKNO); - ginEntryFillRoot(NULL, rootBuf, lbuffer, rbuffer); + Assert(rootBlkno == GIN_ROOT_BLKNO); + ginEntryFillRoot(NULL, BufferGetPage(rootBuf), + BufferGetBlockNumber(lbuffer), + BufferGetPage(lbuffer), + BufferGetBlockNumber(rbuffer), + BufferGetPage(rbuffer)); } PageSetLSN(rootPage, lsn); @@ -366,8 +378,6 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) MarkBufferDirty(rootBuf); UnlockReleaseBuffer(rootBuf); } - else - pushIncompleteSplit(data->node, data->lblkno, data->rblkno, data->rootBlkno); UnlockReleaseBuffer(rbuffer); UnlockReleaseBuffer(lbuffer); @@ -711,6 +721,7 @@ void gin_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; + MemoryContext oldCtx; /* * GIN indexes do not require any conflict processing. NB: If we ever @@ -718,7 +729,7 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record) * killed tuples outside VACUUM, we'll need to handle that here. */ - topCtx = MemoryContextSwitchTo(opCtx); + oldCtx = MemoryContextSwitchTo(opCtx); switch (info) { case XLOG_GIN_CREATE_INDEX: @@ -751,15 +762,13 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record) default: elog(PANIC, "gin_redo: unknown op code %u", info); } - MemoryContextSwitchTo(topCtx); + MemoryContextSwitchTo(oldCtx); MemoryContextReset(opCtx); } void gin_xlog_startup(void) { - incomplete_splits = NIL; - opCtx = AllocSetContextCreate(CurrentMemoryContext, "GIN recovery temporary context", ALLOCSET_DEFAULT_MINSIZE, @@ -767,84 +776,8 @@ gin_xlog_startup(void) ALLOCSET_DEFAULT_MAXSIZE); } -static void -ginContinueSplit(ginIncompleteSplit *split) -{ - GinBtreeData btree; - GinState ginstate; - Relation reln; - Buffer buffer; - GinBtreeStack *stack; - - /* - * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno, - * split->leftBlkno, split->rightBlkno); - */ - buffer = XLogReadBuffer(split->node, split->leftBlkno, false); - - /* - * Failure should be impossible here, because we wrote the page earlier. - */ - if (!BufferIsValid(buffer)) - elog(PANIC, "ginContinueSplit: left block %u not found", - split->leftBlkno); - - reln = CreateFakeRelcacheEntry(split->node); - - if (split->rootBlkno == GIN_ROOT_BLKNO) - { - MemSet(&ginstate, 0, sizeof(ginstate)); - ginstate.index = reln; - - ginPrepareEntryScan(&btree, - InvalidOffsetNumber, (Datum) 0, GIN_CAT_NULL_KEY, - &ginstate); - } - else - { - ginPrepareDataScan(&btree, reln, split->rootBlkno); - } - - stack = palloc(sizeof(GinBtreeStack)); - stack->blkno = split->leftBlkno; - stack->buffer = buffer; - stack->off = InvalidOffsetNumber; - stack->parent = NULL; - - ginFindParents(&btree, stack); - LockBuffer(stack->parent->buffer, GIN_UNLOCK); - ginFinishSplit(&btree, stack, NULL); - - /* buffer is released by ginFinishSplit */ - - FreeFakeRelcacheEntry(reln); -} - void gin_xlog_cleanup(void) { - ListCell *l; - MemoryContext topCtx; - - topCtx = MemoryContextSwitchTo(opCtx); - - foreach(l, incomplete_splits) - { - ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l); - - ginContinueSplit(split); - MemoryContextReset(opCtx); - } - - MemoryContextSwitchTo(topCtx); MemoryContextDelete(opCtx); - incomplete_splits = NIL; -} - -bool -gin_safe_restartpoint(void) -{ - if (incomplete_splits) - return false; - return true; } |