diff options
Diffstat (limited to 'src/backend/access/gist/gist.c')
-rw-r--r-- | src/backend/access/gist/gist.c | 1009 |
1 files changed, 653 insertions, 356 deletions
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index b34830bb424..7cd144e2f09 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -31,6 +31,12 @@ typedef struct MemoryContext tmpCtx; } GISTBuildState; +/* A List of these is used represent a split-in-progress. */ +typedef struct +{ + Buffer buf; /* the split page "half" */ + IndexTuple downlink; /* downlink for this half. */ +} GISTPageSplitInfo; /* non-export function prototypes */ static void gistbuildCallback(Relation index, @@ -43,8 +49,13 @@ static void gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *GISTstate); -static void gistfindleaf(GISTInsertState *state, - GISTSTATE *giststate); +static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate); +static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack, + GISTSTATE *giststate, + IndexTuple *tuples, int ntup, OffsetNumber oldoffnum, + Buffer leftchild); +static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack, + GISTSTATE *giststate, List *splitinfo); #define ROTATEDIST(d) do { \ @@ -251,41 +262,52 @@ gistinsert(PG_FUNCTION_ARGS) /* - * Workhouse routine for doing insertion into a GiST index. Note that - * this routine assumes it is invoked in a short-lived memory context, - * so it does not bother releasing palloc'd allocations. + * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple + * at that offset is atomically removed along with inserting the new tuples. + * This is used to replace a tuple with a new one. + * + * If 'leftchildbuf' is valid, we're inserting the downlink for the page + * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'. + * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set. + * + * If there is not enough room on the page, it is split. All the split + * pages are kept pinned and locked and returned in *splitinfo, the caller + * is responsible for inserting the downlinks for them. However, if + * 'buffer' is the root page and it needs to be split, gistplacetopage() + * performs the split as one atomic operation, and *splitinfo is set to NIL. + * In that case, we continue to hold the root page locked, and the child + * pages are released; note that new tuple(s) are *not* on the root page + * but in one of the new child pages. */ -static void -gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) +static bool +gistplacetopage(GISTInsertState *state, GISTSTATE *giststate, + Buffer buffer, + IndexTuple *itup, int ntup, OffsetNumber oldoffnum, + Buffer leftchildbuf, + List **splitinfo) { - GISTInsertState state; - - memset(&state, 0, sizeof(GISTInsertState)); - - state.itup = (IndexTuple *) palloc(sizeof(IndexTuple)); - state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup)); - memcpy(state.itup[0], itup, IndexTupleSize(itup)); - state.ituplen = 1; - state.freespace = freespace; - state.r = r; - state.key = itup->t_tid; - state.needInsertComplete = true; - - state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); - state.stack->blkno = GIST_ROOT_BLKNO; + Page page = BufferGetPage(buffer); + bool is_leaf = (GistPageIsLeaf(page)) ? true : false; + XLogRecPtr recptr; + int i; + bool is_split; - gistfindleaf(&state, giststate); - gistmakedeal(&state, giststate); -} + /* + * Refuse to modify a page that's incompletely split. This should + * not happen because we finish any incomplete splits while we walk + * down the tree. However, it's remotely possible that another + * concurrent inserter splits a parent page, and errors out before + * completing the split. We will just throw an error in that case, + * and leave any split we had in progress unfinished too. The next + * insert that comes along will clean up the mess. + */ + if (GistFollowRight(page)) + elog(ERROR, "concurrent GiST page split was incomplete"); -static bool -gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) -{ - bool is_splitted = false; - bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false; + *splitinfo = NIL; /* - * if (!is_leaf) remove old key: This node's key has been modified, either + * if isupdate, remove old key: This node's key has been modified, either * because a child split occurred or because we needed to adjust our key * for an insert in a child node. Therefore, remove the old version of * this node's key. @@ -293,77 +315,134 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) * for WAL replay, in the non-split case we handle this by setting up a * one-element todelete array; in the split case, it's handled implicitly * because the tuple vector passed to gistSplit won't include this tuple. - * - * XXX: If we want to change fillfactors between node and leaf, fillfactor - * = (is_leaf ? state->leaf_fillfactor : state->node_fillfactor) */ - if (gistnospace(state->stack->page, state->itup, state->ituplen, - is_leaf ? InvalidOffsetNumber : state->stack->childoffnum, - state->freespace)) + is_split = gistnospace(page, itup, ntup, oldoffnum, state->freespace); + if (is_split) { /* no space for insertion */ IndexTuple *itvec; int tlen; SplitedPageLayout *dist = NULL, *ptr; - BlockNumber rrlink = InvalidBlockNumber; - GistNSN oldnsn; + BlockNumber oldrlink = InvalidBlockNumber; + GistNSN oldnsn = { 0, 0 }; + SplitedPageLayout rootpg; + BlockNumber blkno = BufferGetBlockNumber(buffer); + bool is_rootsplit; - is_splitted = true; + is_rootsplit = (blkno == GIST_ROOT_BLKNO); /* - * Form index tuples vector to split: remove old tuple if t's needed - * and add new tuples to vector + * Form index tuples vector to split. If we're replacing an old tuple, + * remove the old version from the vector. */ - itvec = gistextractpage(state->stack->page, &tlen); - if (!is_leaf) + itvec = gistextractpage(page, &tlen); + if (OffsetNumberIsValid(oldoffnum)) { /* on inner page we should remove old tuple */ - int pos = state->stack->childoffnum - FirstOffsetNumber; + int pos = oldoffnum - FirstOffsetNumber; tlen--; if (pos != tlen) memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos)); } - itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen); - dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate); - - state->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * tlen); - state->ituplen = 0; + itvec = gistjoinvector(itvec, &tlen, itup, ntup); + dist = gistSplit(state->r, page, itvec, tlen, giststate); - if (state->stack->blkno != GIST_ROOT_BLKNO) + /* + * Set up pages to work with. Allocate new buffers for all but the + * leftmost page. The original page becomes the new leftmost page, + * and is just replaced with the new contents. + * + * For a root-split, allocate new buffers for all child pages, the + * original page is overwritten with new root page containing + * downlinks to the new child pages. + */ + ptr = dist; + if (!is_rootsplit) { - /* - * if non-root split then we should not allocate new buffer, but - * we must create temporary page to operate - */ - dist->buffer = state->stack->buffer; - dist->page = PageGetTempPageCopySpecial(BufferGetPage(dist->buffer)); + /* save old rightlink and NSN */ + oldrlink = GistPageGetOpaque(page)->rightlink; + oldnsn = GistPageGetOpaque(page)->nsn; + + dist->buffer = buffer; + dist->block.blkno = BufferGetBlockNumber(buffer); + dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer)); /* clean all flags except F_LEAF */ GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0; + + ptr = ptr->next; + } + for (; ptr; ptr = ptr->next) + { + /* Allocate new page */ + ptr->buffer = gistNewBuffer(state->r); + GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0); + ptr->page = BufferGetPage(ptr->buffer); + ptr->block.blkno = BufferGetBlockNumber(ptr->buffer); } - /* make new pages and fills them */ + /* + * Now that we know whick blocks the new pages go to, set up downlink + * tuples to point to them. + */ for (ptr = dist; ptr; ptr = ptr->next) { - int i; - char *data; + ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); + GistTupleSetValid(ptr->itup); + } - /* get new page */ - if (ptr->buffer == InvalidBuffer) + /* + * If this is a root split, we construct the new root page with the + * downlinks here directly, instead of requiring the caller to insert + * them. Add the new root page to the list along with the child pages. + */ + if (is_rootsplit) + { + IndexTuple *downlinks; + int ndownlinks = 0; + int i; + + rootpg.buffer = buffer; + rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer)); + GistPageGetOpaque(rootpg.page)->flags = 0; + + /* Prepare a vector of all the downlinks */ + for (ptr = dist; ptr; ptr = ptr->next) + ndownlinks++; + downlinks = palloc(sizeof(IndexTuple) * ndownlinks); + for (i = 0, ptr = dist; ptr; ptr = ptr->next) + downlinks[i++] = ptr->itup; + + rootpg.block.blkno = GIST_ROOT_BLKNO; + rootpg.block.num = ndownlinks; + rootpg.list = gistfillitupvec(downlinks, ndownlinks, + &(rootpg.lenlist)); + rootpg.itup = NULL; + + rootpg.next = dist; + dist = &rootpg; + } + else + { + /* Prepare split-info to be returned to caller */ + for (ptr = dist; ptr; ptr = ptr->next) { - ptr->buffer = gistNewBuffer(state->r); - GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0); - ptr->page = BufferGetPage(ptr->buffer); + GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo)); + si->buf = ptr->buffer; + si->downlink = ptr->itup; + *splitinfo = lappend(*splitinfo, si); } - ptr->block.blkno = BufferGetBlockNumber(ptr->buffer); + } - /* - * fill page, we can do it because all these pages are new (ie not - * linked in tree or masked by temp page - */ - data = (char *) (ptr->list); + /* + * Fill all pages. All the pages are new, ie. freshly allocated empty + * pages, or a temporary copy of the old page. + */ + for (ptr = dist; ptr; ptr = ptr->next) + { + char *data = (char *) (ptr->list); for (i = 0; i < ptr->block.num; i++) { if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber) @@ -371,276 +450,381 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) data += IndexTupleSize((IndexTuple) data); } - /* set up ItemPointer and remember it for parent */ - ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); - state->itup[state->ituplen] = ptr->itup; - state->ituplen++; - } + /* Set up rightlinks */ + if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO) + GistPageGetOpaque(ptr->page)->rightlink = + ptr->next->block.blkno; + else + GistPageGetOpaque(ptr->page)->rightlink = oldrlink; + + if (ptr->next && !is_rootsplit) + GistMarkFollowRight(ptr->page); + else + GistClearFollowRight(ptr->page); - /* saves old rightlink */ - if (state->stack->blkno != GIST_ROOT_BLKNO) - rrlink = GistPageGetOpaque(dist->page)->rightlink; + /* + * Copy the NSN of the original page to all pages. The + * F_FOLLOW_RIGHT flags ensure that scans will follow the + * rightlinks until the downlinks are inserted. + */ + GistPageGetOpaque(ptr->page)->nsn = oldnsn; + } START_CRIT_SECTION(); /* - * must mark buffers dirty before XLogInsert, even though we'll still - * be changing their opaque fields below. set up right links. + * Must mark buffers dirty before XLogInsert, even though we'll still + * be changing their opaque fields below. */ for (ptr = dist; ptr; ptr = ptr->next) - { MarkBufferDirty(ptr->buffer); - GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ? - ptr->next->block.blkno : rrlink; - } + if (BufferIsValid(leftchildbuf)) + MarkBufferDirty(leftchildbuf); - /* restore splitted non-root page */ - if (state->stack->blkno != GIST_ROOT_BLKNO) - { - PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer)); - dist->page = BufferGetPage(dist->buffer); - } + /* + * The first page in the chain was a temporary working copy meant + * to replace the old page. Copy it over the old page. + */ + PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer)); + dist->page = BufferGetPage(dist->buffer); + /* Write the WAL record */ if (RelationNeedsWAL(state->r)) - { - XLogRecPtr recptr; - XLogRecData *rdata; - - rdata = formSplitRdata(state->r->rd_node, state->stack->blkno, - is_leaf, &(state->key), dist); - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); - - for (ptr = dist; ptr; ptr = ptr->next) - { - PageSetLSN(ptr->page, recptr); - PageSetTLI(ptr->page, ThisTimeLineID); - } - } + recptr = gistXLogSplit(state->r->rd_node, blkno, is_leaf, + dist, oldrlink, oldnsn, leftchildbuf); else - { - for (ptr = dist; ptr; ptr = ptr->next) - { - PageSetLSN(ptr->page, GetXLogRecPtrForTemp()); - } - } - - /* set up NSN */ - oldnsn = GistPageGetOpaque(dist->page)->nsn; - if (state->stack->blkno == GIST_ROOT_BLKNO) - /* if root split we should put initial value */ - oldnsn = PageGetLSN(dist->page); + recptr = GetXLogRecPtrForTemp(); for (ptr = dist; ptr; ptr = ptr->next) { - /* only for last set oldnsn */ - GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ? - PageGetLSN(ptr->page) : oldnsn; + PageSetLSN(ptr->page, recptr); + PageSetTLI(ptr->page, ThisTimeLineID); } /* - * release buffers, if it was a root split then release all buffers - * because we create all buffers + * Return the new child buffers to the caller. + * + * If this was a root split, we've already inserted the downlink + * pointers, in the form of a new root page. Therefore we can + * release all the new buffers, and keep just the root page locked. */ - ptr = (state->stack->blkno == GIST_ROOT_BLKNO) ? dist : dist->next; - for (; ptr; ptr = ptr->next) - UnlockReleaseBuffer(ptr->buffer); - - if (state->stack->blkno == GIST_ROOT_BLKNO) + if (is_rootsplit) { - gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key)); - state->needInsertComplete = false; + for (ptr = dist->next; ptr; ptr = ptr->next) + UnlockReleaseBuffer(ptr->buffer); } - - END_CRIT_SECTION(); } else { - /* enough space */ + /* + * Enough space. We also get here if ntuples==0. + */ START_CRIT_SECTION(); - if (!is_leaf) - PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); - gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber); + if (OffsetNumberIsValid(oldoffnum)) + PageIndexTupleDelete(page, oldoffnum); + gistfillbuffer(page, itup, ntup, InvalidOffsetNumber); - MarkBufferDirty(state->stack->buffer); + MarkBufferDirty(buffer); + + if (BufferIsValid(leftchildbuf)) + MarkBufferDirty(leftchildbuf); if (RelationNeedsWAL(state->r)) { - OffsetNumber noffs = 0, - offs[1]; - XLogRecPtr recptr; - XLogRecData *rdata; + OffsetNumber ndeloffs = 0, + deloffs[1]; - if (!is_leaf) + if (OffsetNumberIsValid(oldoffnum)) { - /* only on inner page we should delete previous version */ - offs[0] = state->stack->childoffnum; - noffs = 1; + deloffs[0] = oldoffnum; + ndeloffs = 1; } - rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer, - offs, noffs, - state->itup, state->ituplen, - &(state->key)); + recptr = gistXLogUpdate(state->r->rd_node, buffer, + deloffs, ndeloffs, itup, ntup, + leftchildbuf); - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); - PageSetLSN(state->stack->page, recptr); - PageSetTLI(state->stack->page, ThisTimeLineID); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); } else - PageSetLSN(state->stack->page, GetXLogRecPtrForTemp()); - - if (state->stack->blkno == GIST_ROOT_BLKNO) - state->needInsertComplete = false; + { + recptr = GetXLogRecPtrForTemp(); + PageSetLSN(page, recptr); + } - END_CRIT_SECTION(); + *splitinfo = NIL; + } - if (state->ituplen > 1) - { /* previous is_splitted==true */ + /* + * If we inserted the downlink for a child page, set NSN and clear + * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know + * to follow the rightlink if and only if they looked at the parent page + * before we inserted the downlink. + * + * Note that we do this *after* writing the WAL record. That means that + * the possible full page image in the WAL record does not include + * these changes, and they must be replayed even if the page is restored + * from the full page image. There's a chicken-and-egg problem: if we + * updated the child pages first, we wouldn't know the recptr of the WAL + * record we're about to write. + */ + if (BufferIsValid(leftchildbuf)) + { + Page leftpg = BufferGetPage(leftchildbuf); - /* - * child was splited, so we must form union for insertion in - * parent - */ - IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate); + GistPageGetOpaque(leftpg)->nsn = recptr; + GistClearFollowRight(leftpg); - ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno); - state->itup[0] = newtup; - state->ituplen = 1; - } - else if (is_leaf) - { - /* - * itup[0] store key to adjust parent, we set it to valid to - * correct check by GistTupleIsInvalid macro in gistgetadjusted() - */ - ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno); - GistTupleSetValid(state->itup[0]); - } + PageSetLSN(leftpg, recptr); + PageSetTLI(leftpg, ThisTimeLineID); } - return is_splitted; + + END_CRIT_SECTION(); + + return is_split; } /* - * returns stack of pages, all pages in stack are pinned, and - * leaf is X-locked + * Workhouse routine for doing insertion into a GiST index. Note that + * this routine assumes it is invoked in a short-lived memory context, + * so it does not bother releasing palloc'd allocations. */ - static void -gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) +gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) { ItemId iid; IndexTuple idxtuple; - GISTPageOpaque opaque; + GISTInsertStack firststack; + GISTInsertStack *stack; + GISTInsertState state; + bool xlocked = false; + + memset(&state, 0, sizeof(GISTInsertState)); + state.freespace = freespace; + state.r = r; + + /* Start from the root */ + firststack.blkno = GIST_ROOT_BLKNO; + firststack.lsn.xrecoff = 0; + firststack.parent = NULL; + state.stack = stack = &firststack; /* - * walk down, We don't lock page for a long time, but so we should be - * ready to recheck path in a bad case... We remember, that page->lsn - * should never be invalid. + * Walk down along the path of smallest penalty, updating the parent + * pointers with the key we're inserting as we go. If we crash in the + * middle, the tree is consistent, although the possible parent updates + * were a waste. */ for (;;) { - if (XLogRecPtrIsInvalid(state->stack->lsn)) - state->stack->buffer = ReadBuffer(state->r, state->stack->blkno); - LockBuffer(state->stack->buffer, GIST_SHARE); - gistcheckpage(state->r, state->stack->buffer); + if (XLogRecPtrIsInvalid(stack->lsn)) + stack->buffer = ReadBuffer(state.r, stack->blkno); + + /* + * Be optimistic and grab shared lock first. Swap it for an + * exclusive lock later if we need to update the page. + */ + if (!xlocked) + { + LockBuffer(stack->buffer, GIST_SHARE); + gistcheckpage(state.r, stack->buffer); + } + + stack->page = (Page) BufferGetPage(stack->buffer); + stack->lsn = PageGetLSN(stack->page); + Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn)); - state->stack->page = (Page) BufferGetPage(state->stack->buffer); - opaque = GistPageGetOpaque(state->stack->page); + /* + * If this page was split but the downlink was never inserted to + * the parent because the inserting backend crashed before doing + * that, fix that now. + */ + if (GistFollowRight(stack->page)) + { + if (!xlocked) + { + LockBuffer(stack->buffer, GIST_UNLOCK); + LockBuffer(stack->buffer, GIST_EXCLUSIVE); + xlocked = true; + /* someone might've completed the split when we unlocked */ + if (!GistFollowRight(stack->page)) + continue; + } + gistfixsplit(&state, giststate); - state->stack->lsn = PageGetLSN(state->stack->page); - Assert(!RelationNeedsWAL(state->r) || !XLogRecPtrIsInvalid(state->stack->lsn)); + UnlockReleaseBuffer(stack->buffer); + xlocked = false; + state.stack = stack = stack->parent; + continue; + } - if (state->stack->blkno != GIST_ROOT_BLKNO && - XLByteLT(state->stack->parent->lsn, opaque->nsn)) + if (stack->blkno != GIST_ROOT_BLKNO && + XLByteLT(stack->parent->lsn, + GistPageGetOpaque(stack->page)->nsn)) { /* - * caused split non-root page is detected, go up to parent to - * choose best child + * Concurrent split detected. There's no guarantee that the + * downlink for this page is consistent with the tuple we're + * inserting anymore, so go back to parent and rechoose the + * best child. */ - UnlockReleaseBuffer(state->stack->buffer); - state->stack = state->stack->parent; + UnlockReleaseBuffer(stack->buffer); + xlocked = false; + state.stack = stack = stack->parent; continue; } - if (!GistPageIsLeaf(state->stack->page)) + if (!GistPageIsLeaf(stack->page)) { /* - * This is an internal page, so continue to walk down the tree. We - * find the child node that has the minimum insertion penalty and - * recursively invoke ourselves to modify that node. Once the - * recursive call returns, we may need to adjust the parent node - * for two reasons: the child node split, or the key in this node - * needs to be adjusted for the newly inserted key below us. + * This is an internal page so continue to walk down the tree. + * Find the child node that has the minimum insertion penalty. */ - GISTInsertStack *item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); - - state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate); + BlockNumber childblkno; + IndexTuple newtup; + GISTInsertStack *item; - iid = PageGetItemId(state->stack->page, state->stack->childoffnum); - idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid); - item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); - LockBuffer(state->stack->buffer, GIST_UNLOCK); + stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate); + iid = PageGetItemId(stack->page, stack->childoffnum); + idxtuple = (IndexTuple) PageGetItem(stack->page, iid); + childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); - item->parent = state->stack; - item->child = NULL; - if (state->stack) - state->stack->child = item; - state->stack = item; - } - else - { - /* be carefull, during unlock/lock page may be changed... */ - LockBuffer(state->stack->buffer, GIST_UNLOCK); - LockBuffer(state->stack->buffer, GIST_EXCLUSIVE); - state->stack->page = (Page) BufferGetPage(state->stack->buffer); - opaque = GistPageGetOpaque(state->stack->page); + /* + * Check that it's not a leftover invalid tuple from pre-9.1 + */ + if (GistTupleIsInvalid(idxtuple)) + ereport(ERROR, + (errmsg("index \"%s\" contains an inner tuple marked as invalid", + RelationGetRelationName(r)), + errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."), + errhint("Please REINDEX it."))); - if (state->stack->blkno == GIST_ROOT_BLKNO) + /* + * Check that the key representing the target child node is + * consistent with the key we're inserting. Update it if it's not. + */ + newtup = gistgetadjusted(state.r, idxtuple, itup, giststate); + if (newtup) { /* - * the only page can become inner instead of leaf is a root - * page, so for root we should recheck it + * Swap shared lock for an exclusive one. Beware, the page + * may change while we unlock/lock the page... */ - if (!GistPageIsLeaf(state->stack->page)) + if (!xlocked) { - /* - * very rarely situation: during unlock/lock index with - * number of pages = 1 was increased - */ - LockBuffer(state->stack->buffer, GIST_UNLOCK); - continue; - } + LockBuffer(stack->buffer, GIST_UNLOCK); + LockBuffer(stack->buffer, GIST_EXCLUSIVE); + xlocked = true; + stack->page = (Page) BufferGetPage(stack->buffer); + if (!XLByteEQ(PageGetLSN(stack->page), stack->lsn)) + { + /* the page was changed while we unlocked it, retry */ + continue; + } + } /* - * we don't need to check root split, because checking - * leaf/inner is enough to recognize split for root + * Update the tuple. + * + * gistinserthere() might have to split the page to make the + * updated tuple fit. It will adjust the stack so that after + * the call, we'll be holding a lock on the page containing + * the tuple, which might have moved right. + * + * Except if this causes a root split, gistinserthere() + * returns 'true'. In that case, stack only holds the new + * root, and the child page was released. Have to start + * all over. */ - + if (gistinserttuples(&state, stack, giststate, &newtup, 1, + stack->childoffnum, InvalidBuffer)) + { + UnlockReleaseBuffer(stack->buffer); + xlocked = false; + state.stack = stack = stack->parent; + continue; + } } - else if (XLByteLT(state->stack->parent->lsn, opaque->nsn)) + LockBuffer(stack->buffer, GIST_UNLOCK); + xlocked = false; + + /* descend to the chosen child */ + item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); + item->blkno = childblkno; + item->parent = stack; + state.stack = stack = item; + } + else + { + /* + * Leaf page. Insert the new key. We've already updated all the + * parents on the way down, but we might have to split the page + * if it doesn't fit. gistinserthere() will take care of that. + */ + + /* + * Swap shared lock for an exclusive one. Be careful, the page + * may change while we unlock/lock the page... + */ + if (!xlocked) { - /* - * detecting split during unlock/lock, so we should find - * better child on parent - */ + LockBuffer(stack->buffer, GIST_UNLOCK); + LockBuffer(stack->buffer, GIST_EXCLUSIVE); + xlocked = true; + stack->page = (Page) BufferGetPage(stack->buffer); + stack->lsn = PageGetLSN(stack->page); - /* forget buffer */ - UnlockReleaseBuffer(state->stack->buffer); + if (stack->blkno == GIST_ROOT_BLKNO) + { + /* + * the only page that can become inner instead of leaf + * is the root page, so for root we should recheck it + */ + if (!GistPageIsLeaf(stack->page)) + { + /* + * very rare situation: during unlock/lock index with + * number of pages = 1 was increased + */ + LockBuffer(stack->buffer, GIST_UNLOCK); + xlocked = false; + continue; + } - state->stack = state->stack->parent; - continue; + /* + * we don't need to check root split, because checking + * leaf/inner is enough to recognize split for root + */ + } + else if (GistFollowRight(stack->page) || + XLByteLT(stack->parent->lsn, + GistPageGetOpaque(stack->page)->nsn)) + { + /* + * The page was split while we momentarily unlocked the + * page. Go back to parent. + */ + UnlockReleaseBuffer(stack->buffer); + xlocked = false; + state.stack = stack = stack->parent; + continue; + } } - state->stack->lsn = PageGetLSN(state->stack->page); + /* now state.stack->(page, buffer and blkno) points to leaf page */ - /* ok we found a leaf page and it X-locked */ + gistinserttuples(&state, stack, giststate, &itup, 1, + InvalidOffsetNumber, InvalidBuffer); + LockBuffer(stack->buffer, GIST_UNLOCK); + + /* Release any pins we might still hold before exiting */ + for (; stack; stack = stack->parent) + ReleaseBuffer(stack->buffer); break; } } - - /* now state->stack->(page, buffer and blkno) points to leaf page */ } /* @@ -648,7 +832,7 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) * * returns from the beginning of closest parent; * - * To prevent deadlocks, this should lock only one page simultaneously. + * To prevent deadlocks, this should lock only one page at a time. */ GISTInsertStack * gistFindPath(Relation r, BlockNumber child) @@ -683,6 +867,13 @@ gistFindPath(Relation r, BlockNumber child) top->lsn = PageGetLSN(page); + /* + * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a + * downlink. This should not normally happen.. + */ + if (GistFollowRight(page)) + elog(ERROR, "concurrent GiST page split was incomplete"); + if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) && GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ ) { @@ -711,8 +902,6 @@ gistFindPath(Relation r, BlockNumber child) ptr = top; while (ptr->parent) { - /* set child link */ - ptr->parent->child = ptr; /* move childoffnum.. */ if (ptr == top) { @@ -754,17 +943,16 @@ gistFindPath(Relation r, BlockNumber child) return NULL; } - /* - * Returns X-locked parent of stack page + * Updates the stack so that child->parent is the correct parent of the + * child. child->parent must be exclusively locked on entry, and will + * remain so at exit, but it might not be the same page anymore. */ - static void gistFindCorrectParent(Relation r, GISTInsertStack *child) { GISTInsertStack *parent = child->parent; - LockBuffer(parent->buffer, GIST_EXCLUSIVE); gistcheckpage(r, parent->buffer); parent->page = (Page) BufferGetPage(parent->buffer); @@ -836,83 +1024,231 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child) /* install new chain of parents to stack */ child->parent = parent; - parent->child = child; /* make recursive call to normal processing */ + LockBuffer(child->parent->buffer, GIST_EXCLUSIVE); gistFindCorrectParent(r, child); } return; } -void -gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) +/* + * Form a downlink pointer for the page in 'buf'. + */ +static IndexTuple +gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate, + GISTInsertStack *stack) { - int is_splitted; - ItemId iid; - IndexTuple oldtup, - newtup; + Page page = BufferGetPage(buf); + OffsetNumber maxoff; + OffsetNumber offset; + IndexTuple downlink = NULL; - /* walk up */ - while (true) + maxoff = PageGetMaxOffsetNumber(page); + for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset)) { - /* - * After this call: 1. if child page was splited, then itup contains - * keys for each page 2. if child page wasn't splited, then itup - * contains additional for adjustment of current key - */ - - if (state->stack->parent) + IndexTuple ituple = (IndexTuple) + PageGetItem(page, PageGetItemId(page, offset)); + if (downlink == NULL) + downlink = CopyIndexTuple(ituple); + else { - /* - * X-lock parent page before proceed child, gistFindCorrectParent - * should find and lock it - */ - gistFindCorrectParent(state->r, state->stack); + IndexTuple newdownlink; + newdownlink = gistgetadjusted(rel, downlink, ituple, + giststate); + if (newdownlink) + downlink = newdownlink; } - is_splitted = gistplacetopage(state, giststate); + } + + /* + * If the page is completely empty, we can't form a meaningful + * downlink for it. But we have to insert a downlink for the page. + * Any key will do, as long as its consistent with the downlink of + * parent page, so that we can legally insert it to the parent. + * A minimal one that matches as few scans as possible would be best, + * to keep scans from doing useless work, but we don't know how to + * construct that. So we just use the downlink of the original page + * that was split - that's as far from optimal as it can get but will + * do.. + */ + if (!downlink) + { + ItemId iid; + + LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE); + gistFindCorrectParent(rel, stack); + iid = PageGetItemId(stack->parent->page, stack->parent->childoffnum); + downlink = (IndexTuple) PageGetItem(stack->parent->page, iid); + downlink = CopyIndexTuple(downlink); + LockBuffer(stack->parent->buffer, GIST_UNLOCK); + } - /* parent locked above, so release child buffer */ - UnlockReleaseBuffer(state->stack->buffer); + ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf)); + GistTupleSetValid(downlink); - /* pop parent page from stack */ - state->stack = state->stack->parent; + return downlink; +} - /* stack is void */ - if (!state->stack) - break; - /* - * child did not split, so we can check is it needed to update parent - * tuple - */ - if (!is_splitted) - { - /* parent's tuple */ - iid = PageGetItemId(state->stack->page, state->stack->childoffnum); - oldtup = (IndexTuple) PageGetItem(state->stack->page, iid); - newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate); - - if (!newtup) - { /* not need to update key */ - LockBuffer(state->stack->buffer, GIST_UNLOCK); - break; - } +/* + * Complete the incomplete split of state->stack->page. + */ +static void +gistfixsplit(GISTInsertState *state, GISTSTATE *giststate) +{ + GISTInsertStack *stack = state->stack; + Buffer buf; + Page page; + List *splitinfo = NIL; + + elog(LOG, "fixing incomplete split in index \"%s\", block %u", + RelationGetRelationName(state->r), stack->blkno); - state->itup[0] = newtup; + Assert(GistFollowRight(stack->page)); + Assert(OffsetNumberIsValid(stack->parent->childoffnum)); + + buf = stack->buffer; + + /* + * Read the chain of split pages, following the rightlinks. Construct + * a downlink tuple for each page. + */ + for (;;) + { + GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo)); + IndexTuple downlink; + + page = BufferGetPage(buf); + + /* Form the new downlink tuples to insert to parent */ + downlink = gistformdownlink(state->r, buf, giststate, stack); + + si->buf = buf; + si->downlink = downlink; + + splitinfo = lappend(splitinfo, si); + + if (GistFollowRight(page)) + { + /* lock next page */ + buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink); + LockBuffer(buf, GIST_EXCLUSIVE); } - } /* while */ + else + break; + } + + /* Insert the downlinks */ + gistfinishsplit(state, stack, giststate, splitinfo); +} + +/* + * Insert tuples to stack->buffer. If 'oldoffnum' is valid, the new tuples + * replace an old tuple at oldoffnum. The caller must hold an exclusive lock + * on the page. + * + * If leftchild is valid, we're inserting/updating the downlink for the + * page to the right of leftchild. We clear the F_FOLLOW_RIGHT flag and + * update NSN on leftchild, atomically with the insertion of the downlink. + * + * Returns 'true' if the page had to be split. On return, we will continue + * to hold an exclusive lock on state->stack->buffer, but if we had to split + * the page, it might not contain the tuple we just inserted/updated. + */ +static bool +gistinserttuples(GISTInsertState *state, GISTInsertStack *stack, + GISTSTATE *giststate, + IndexTuple *tuples, int ntup, OffsetNumber oldoffnum, + Buffer leftchild) +{ + List *splitinfo; + bool is_split; + + is_split = gistplacetopage(state, giststate, stack->buffer, + tuples, ntup, oldoffnum, + leftchild, + &splitinfo); + if (splitinfo) + gistfinishsplit(state, stack, giststate, splitinfo); + + return is_split; +} + +/* + * Finish an incomplete split by inserting/updating the downlinks in + * parent page. 'splitinfo' contains all the child pages, exclusively-locked, + * involved in the split, from left-to-right. + */ +static void +gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack, + GISTSTATE *giststate, List *splitinfo) +{ + ListCell *lc; + List *reversed; + GISTPageSplitInfo *right; + GISTPageSplitInfo *left; + IndexTuple tuples[2]; + + /* A split always contains at least two halves */ + Assert(list_length(splitinfo) >= 2); + + /* + * We need to insert downlinks for each new page, and update the + * downlink for the original (leftmost) page in the split. Begin at + * the rightmost page, inserting one downlink at a time until there's + * only two pages left. Finally insert the downlink for the last new + * page and update the downlink for the original page as one operation. + */ - /* release all parent buffers */ - while (state->stack) + /* for convenience, create a copy of the list in reverse order */ + reversed = NIL; + foreach(lc, splitinfo) { - ReleaseBuffer(state->stack->buffer); - state->stack = state->stack->parent; + reversed = lcons(lfirst(lc), reversed); } - /* say to xlog that insert is completed */ - if (state->needInsertComplete && RelationNeedsWAL(state->r)) - gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1); + LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE); + gistFindCorrectParent(state->r, stack); + + while(list_length(reversed) > 2) + { + right = (GISTPageSplitInfo *) linitial(reversed); + left = (GISTPageSplitInfo *) lsecond(reversed); + + if (gistinserttuples(state, stack->parent, giststate, + &right->downlink, 1, + InvalidOffsetNumber, + left->buf)) + { + /* + * If the parent page was split, need to relocate the original + * parent pointer. + */ + gistFindCorrectParent(state->r, stack); + } + UnlockReleaseBuffer(right->buf); + reversed = list_delete_first(reversed); + } + + right = (GISTPageSplitInfo *) linitial(reversed); + left = (GISTPageSplitInfo *) lsecond(reversed); + + /* + * Finally insert downlink for the remaining right page and update the + * downlink for the original page to not contain the tuples that were + * moved to the new pages. + */ + tuples[0] = left->downlink; + tuples[1] = right->downlink; + gistinserttuples(state, stack->parent, giststate, + tuples, 2, + stack->parent->childoffnum, + left->buf); + LockBuffer(stack->parent->buffer, GIST_UNLOCK); + UnlockReleaseBuffer(right->buf); + Assert(left->buf == stack->buffer); } /* @@ -963,8 +1299,7 @@ gistSplit(Relation r, ROTATEDIST(res); res->block.num = v.splitVector.spl_nright; res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist)); - res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false) - : gist_form_invalid_tuple(GIST_ROOT_BLKNO); + res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false); } if (!gistfitpage(lvectup, v.splitVector.spl_nleft)) @@ -986,51 +1321,13 @@ gistSplit(Relation r, ROTATEDIST(res); res->block.num = v.splitVector.spl_nleft; res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist)); - res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false) - : gist_form_invalid_tuple(GIST_ROOT_BLKNO); + res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false); } return res; } /* - * buffer must be pinned and locked by caller - */ -void -gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key) -{ - Page page; - - Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO); - page = BufferGetPage(buffer); - - START_CRIT_SECTION(); - - GISTInitBuffer(buffer, 0); - gistfillbuffer(page, itup, len, FirstOffsetNumber); - - MarkBufferDirty(buffer); - - if (RelationNeedsWAL(r)) - { - XLogRecPtr recptr; - XLogRecData *rdata; - - rdata = formUpdateRdata(r->rd_node, buffer, - NULL, 0, - itup, len, key); - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata); - PageSetLSN(page, recptr); - PageSetTLI(page, ThisTimeLineID); - } - else - PageSetLSN(page, GetXLogRecPtrForTemp()); - - END_CRIT_SECTION(); -} - -/* * Fill a GISTSTATE with information about the index */ void |