aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/gist
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2010-12-23 16:03:08 +0200
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2010-12-23 16:21:47 +0200
commit9de3aa65f01fb51cbc725e8508ea233e4e92c46c (patch)
treed78b3dcde6cd6955af5ddb913f9a32bffd1a25a9 /src/backend/access/gist
parent7a1ca8977fd109a86b540444f71f24bd2f38ea43 (diff)
downloadpostgresql-9de3aa65f01fb51cbc725e8508ea233e4e92c46c.tar.gz
postgresql-9de3aa65f01fb51cbc725e8508ea233e4e92c46c.zip
Rewrite the GiST insertion logic so that we don't need the post-recovery
cleanup stage to finish incomplete inserts or splits anymore. There was two reasons for the cleanup step: 1. When a new tuple was inserted to a leaf page, the downlink in the parent needed to be updated to contain (ie. to be consistent with) the new key. Updating the parent in turn might require recursively updating the parent of the parent. We now handle that by updating the parent while traversing down the tree, so that when we insert the leaf tuple, all the parents are already consistent with the new key, and the tree is consistent at every step. 2. When a page is split, we need to insert the downlink for the new right page(s), and update the downlink for the original page to not include keys that moved to the right page(s). We now handle that by setting a new flag, F_FOLLOW_RIGHT, on the non-rightmost pages in the split. When that flag is set, scans always follow the rightlink, regardless of the NSN mechanism used to detect concurrent page splits. That way the tree is consistent right after split, even though the downlink is still missing. This is very similar to the way B-tree splits are handled. When the downlink is inserted in the parent, the flag is cleared. To keep the insertion algorithm simple, when an insertion sees an incomplete split, indicated by the F_FOLLOW_RIGHT flag, it finishes the split before doing anything else. These changes allow removing the whole "invalid tuple" mechanism, but I retained the scan code to still follow invalid tuples correctly. While we don't create any such tuples anymore, we want to handle them gracefully in case you pg_upgrade a GiST index that has them. If we encounter any on an insert, though, we just throw an error saying that you need to REINDEX. The issue that got me into doing this is that if you did a checkpoint while an insert or split was in progress, and the checkpoint finishes quickly so that there is no WAL record related to the insert between RedoRecPtr and the checkpoint record, recovery from that checkpoint would not know to finish the incomplete insert. IOW, we have the same issue we solved with the rm_safe_restartpoint mechanism during normal operation too. It's highly unlikely to happen in practice, and this fix is far too large to backpatch, so we're just going to live with in previous versions, but this refactoring fixes it going forward. With this patch, you don't get the annoying 'index "FOO" needs VACUUM or REINDEX to finish crash recovery' notices anymore if you crash at an unfortunate moment.
Diffstat (limited to 'src/backend/access/gist')
-rw-r--r--src/backend/access/gist/README181
-rw-r--r--src/backend/access/gist/gist.c1009
-rw-r--r--src/backend/access/gist/gistget.c10
-rw-r--r--src/backend/access/gist/gistsplit.c60
-rw-r--r--src/backend/access/gist/gistutil.c28
-rw-r--r--src/backend/access/gist/gistvacuum.c57
-rw-r--r--src/backend/access/gist/gistxlog.c797
7 files changed, 980 insertions, 1162 deletions
diff --git a/src/backend/access/gist/README b/src/backend/access/gist/README
index 03069f02685..2d78dcb0dfa 100644
--- a/src/backend/access/gist/README
+++ b/src/backend/access/gist/README
@@ -108,43 +108,71 @@ Penalty is used for choosing a subtree to insert; method PickSplit is used for
the node splitting algorithm; method Union is used for propagating changes
upward to maintain the tree properties.
-NOTICE: We modified original INSERT algorithm for performance reason. In
-particularly, it is now a single-pass algorithm.
-
-Function findLeaf is used to identify subtree for insertion. Page, in which
-insertion is proceeded, is locked as well as its parent page. Functions
-findParent and findPath are used to find parent pages, which could be changed
-because of concurrent access. Function pageSplit is recurrent and could split
-page by more than 2 pages, which could be necessary if keys have different
-lengths or more than one key are inserted (in such situation, user defined
-function pickSplit cannot guarantee free space on page).
-
-findLeaf(new-key)
- push(stack, [root, 0]) //page, LSN
- while(true)
- ptr = top of stack
- latch( ptr->page, S-mode )
- ptr->lsn = ptr->page->lsn
- if ( exists ptr->parent AND ptr->parent->lsn < ptr->page->nsn )
- unlatch( ptr->page )
- pop stack
- else if ( ptr->page is not leaf )
- push( stack, [get_best_child(ptr->page, new-key), 0] )
- unlatch( ptr->page )
- else
- unlatch( ptr->page )
- latch( ptr->page, X-mode )
- if ( ptr->page is not leaf )
- //the only root page can become a non-leaf
- unlatch( ptr->page )
- else if ( ptr->parent->lsn < ptr->page->nsn )
- unlatch( ptr->page )
- pop stack
- else
- return stack
- end
- end
- end
+To insert a tuple, we first have to find a suitable leaf page to insert to.
+The algorithm walks down the tree, starting from the root, along the path
+of smallest Penalty. At each step:
+
+1. Has this page been split since we looked at the parent? If so, it's
+possible that we should be inserting to the other half instead, so retreat
+back to the parent.
+2. If this is a leaf node, we've found our target node.
+3. Otherwise use Penalty to pick a new target subtree.
+4. Check the key representing the target subtree. If it doesn't already cover
+the key we're inserting, replace it with the Union of the old downlink key
+and the key being inserted. (Actually, we always call Union, and just skip
+the replacement if the Unioned key is the same as the existing key)
+5. Replacing the key in step 4 might cause the page to be split. In that case,
+propagate the change upwards and restart the algorithm from the first parent
+that didn't need to be split.
+6. Walk down to the target subtree, and goto 1.
+
+This differs from the insertion algorithm in the original paper. In the
+original paper, you first walk down the tree until you reach a leaf page, and
+then you adjust the downlink in the parent, and propagating the adjustment up,
+all the way up to the root in the worst case. But we adjust the downlinks to
+cover the new key already when we walk down, so that when we reach the leaf
+page, we don't need to update the parents anymore, except to insert the
+downlinks if we have to split the page. This makes crash recovery simpler:
+after inserting a key to the page, the tree is immediately self-consistent
+without having to update the parents. Even if we split a page and crash before
+inserting the downlink to the parent, the tree is self-consistent because the
+right half of the split is accessible via the rightlink of the left page
+(which replaced the original page).
+
+Note that the algorithm can walk up and down the tree before reaching a leaf
+page, if internal pages need to split while adjusting the downlinks for the
+new key. Eventually, you should reach the bottom, and proceed with the
+insertion of the new tuple.
+
+Once we've found the target page to insert to, we check if there's room
+for the new tuple. If there is, the tuple is inserted, and we're done.
+If it doesn't fit, however, the page needs to be split. Note that it is
+possible that a page needs to be split into more than two pages, if keys have
+different lengths or more than one key is being inserted at a time (which can
+happen when inserting downlinks for a page split that resulted in more than
+two pages at the lower level). After splitting a page, the parent page needs
+to be updated. The downlink for the new page needs to be inserted, and the
+downlink for the old page, which became the left half of the split, needs to
+be updated to only cover those tuples that stayed on the left page. Inserting
+the downlink in the parent can again lead to a page split, recursing up to the
+root page in the worst case.
+
+gistplacetopage is the workhorse function that performs one step of the
+insertion. If the tuple fits, it inserts it to the given page, otherwise
+it splits the page, and constructs the new downlink tuples for the split
+pages. The caller must then call gistplacetopage() on the parent page to
+insert the downlink tuples. The parent page that holds the downlink to
+the child might have migrated as a result of concurrent splits of the
+parent, gistfindCorrectParent() is used to find the parent page.
+
+Splitting the root page works slightly differently. At root split,
+gistplacetopage() allocates the new child pages and replaces the old root
+page with the new root containing downlinks to the new children, all in one
+operation.
+
+
+findPath is a subroutine of findParent, used when the correct parent page
+can't be found by following the rightlinks at the parent level:
findPath( stack item )
push stack, [root, 0, 0] // page, LSN, parent
@@ -165,9 +193,13 @@ findPath( stack item )
pop stack
end
+
+gistFindCorrectParent is used to re-find the parent of a page during
+insertion. It might have migrated to the right since we traversed down the
+tree because of page splits.
+
findParent( stack item )
parent = item->parent
- latch( parent->page, X-mode )
if ( parent->page->lsn != parent->lsn )
while(true)
search parent tuple on parent->page, if found the return
@@ -181,9 +213,13 @@ findParent( stack item )
end
newstack = findPath( item->parent )
replace part of stack to new one
+ latch( parent->page, X-mode )
return findParent( item )
end
+pageSplit function decides how to distribute keys to the new pages after
+page split:
+
pageSplit(page, allkeys)
(lkeys, rkeys) = pickSplit( allkeys )
if ( page is root )
@@ -204,39 +240,44 @@ pageSplit(page, allkeys)
return newkeys
-placetopage(page, keysarray)
- if ( no space left on page )
- keysarray = pageSplit(page, [ extract_keys(page), keysarray])
- last page in chain gets old NSN,
- original and others - new NSN equals to LSN
- if ( page is root )
- make new root with keysarray
- end
- else
- put keysarray on page
- if ( length of keysarray > 1 )
- keysarray = [ union(keysarray) ]
- end
- end
-insert(new-key)
- stack = findLeaf(new-key)
- keysarray = [new-key]
- ptr = top of stack
- while(true)
- findParent( ptr ) //findParent latches parent page
- keysarray = placetopage(ptr->page, keysarray)
- unlatch( ptr->page )
- pop stack;
- ptr = top of stack
- if (length of keysarray == 1)
- newboundingkey = union(oldboundingkey, keysarray)
- if (newboundingkey == oldboundingkey)
- unlatch ptr->page
- break loop
- end
- end
- end
+Concurrency control
+-------------------
+As a rule of thumb, if you need to hold a lock on multiple pages at the
+same time, the locks should be acquired in the following order: child page
+before parent, and left-to-right at the same level. Always acquiring the
+locks in the same order avoids deadlocks.
+
+The search algorithm only looks at and locks one page at a time. Consequently
+there's a race condition between a search and a page split. A page split
+happens in two phases: 1. The page is split 2. The downlink is inserted to the
+parent. If a search looks at the parent page between those steps, before the
+downlink is inserted, it will still find the new right half by following the
+rightlink on the left half. But it must not follow the rightlink if it saw the
+downlink in the parent, or the page will be visited twice!
+
+A split initially marks the left page with the F_FOLLOW_RIGHT flag. If a scan
+sees that flag set, it knows that the right page is missing the downlink, and
+should be visited too. When split inserts the downlink to the parent, it
+clears the F_FOLLOW_RIGHT flag in the child, and sets the NSN field in the
+child page header to match the LSN of the insertion on the parent. If the
+F_FOLLOW_RIGHT flag is not set, a scan compares the NSN on the child and the
+LSN it saw in the parent. If NSN < LSN, the scan looked at the parent page
+before the downlink was inserted, so it should follow the rightlink. Otherwise
+the scan saw the downlink in the parent page, and will/did follow that as
+usual.
+
+A scan can't normally see a page with the F_FOLLOW_RIGHT flag set, because
+a page split keeps the child pages locked until the downlink has been inserted
+to the parent and the flag cleared again. But if a crash happens in the middle
+of a page split, before the downlinks are inserted into the parent, that will
+leave a page with F_FOLLOW_RIGHT in the tree. Scans handle that just fine,
+but we'll eventually want to fix that for performance reasons. And more
+importantly, dealing with pages with missing downlink pointers in the parent
+would complicate the insertion algorithm. So when an insertion sees a page
+with F_FOLLOW_RIGHT set, it immediately tries to bring the split that
+crashed in the middle to completion by adding the downlink in the parent.
+
Authors:
Teodor Sigaev <teodor@sigaev.ru>
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index b34830bb424..7cd144e2f09 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -31,6 +31,12 @@ typedef struct
MemoryContext tmpCtx;
} GISTBuildState;
+/* A List of these is used represent a split-in-progress. */
+typedef struct
+{
+ Buffer buf; /* the split page "half" */
+ IndexTuple downlink; /* downlink for this half. */
+} GISTPageSplitInfo;
/* non-export function prototypes */
static void gistbuildCallback(Relation index,
@@ -43,8 +49,13 @@ static void gistdoinsert(Relation r,
IndexTuple itup,
Size freespace,
GISTSTATE *GISTstate);
-static void gistfindleaf(GISTInsertState *state,
- GISTSTATE *giststate);
+static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
+static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
+ GISTSTATE *giststate,
+ IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
+ Buffer leftchild);
+static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
+ GISTSTATE *giststate, List *splitinfo);
#define ROTATEDIST(d) do { \
@@ -251,41 +262,52 @@ gistinsert(PG_FUNCTION_ARGS)
/*
- * Workhouse routine for doing insertion into a GiST index. Note that
- * this routine assumes it is invoked in a short-lived memory context,
- * so it does not bother releasing palloc'd allocations.
+ * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
+ * at that offset is atomically removed along with inserting the new tuples.
+ * This is used to replace a tuple with a new one.
+ *
+ * If 'leftchildbuf' is valid, we're inserting the downlink for the page
+ * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
+ * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
+ *
+ * If there is not enough room on the page, it is split. All the split
+ * pages are kept pinned and locked and returned in *splitinfo, the caller
+ * is responsible for inserting the downlinks for them. However, if
+ * 'buffer' is the root page and it needs to be split, gistplacetopage()
+ * performs the split as one atomic operation, and *splitinfo is set to NIL.
+ * In that case, we continue to hold the root page locked, and the child
+ * pages are released; note that new tuple(s) are *not* on the root page
+ * but in one of the new child pages.
*/
-static void
-gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
+static bool
+gistplacetopage(GISTInsertState *state, GISTSTATE *giststate,
+ Buffer buffer,
+ IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
+ Buffer leftchildbuf,
+ List **splitinfo)
{
- GISTInsertState state;
-
- memset(&state, 0, sizeof(GISTInsertState));
-
- state.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
- state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
- memcpy(state.itup[0], itup, IndexTupleSize(itup));
- state.ituplen = 1;
- state.freespace = freespace;
- state.r = r;
- state.key = itup->t_tid;
- state.needInsertComplete = true;
-
- state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
- state.stack->blkno = GIST_ROOT_BLKNO;
+ Page page = BufferGetPage(buffer);
+ bool is_leaf = (GistPageIsLeaf(page)) ? true : false;
+ XLogRecPtr recptr;
+ int i;
+ bool is_split;
- gistfindleaf(&state, giststate);
- gistmakedeal(&state, giststate);
-}
+ /*
+ * Refuse to modify a page that's incompletely split. This should
+ * not happen because we finish any incomplete splits while we walk
+ * down the tree. However, it's remotely possible that another
+ * concurrent inserter splits a parent page, and errors out before
+ * completing the split. We will just throw an error in that case,
+ * and leave any split we had in progress unfinished too. The next
+ * insert that comes along will clean up the mess.
+ */
+ if (GistFollowRight(page))
+ elog(ERROR, "concurrent GiST page split was incomplete");
-static bool
-gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
-{
- bool is_splitted = false;
- bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
+ *splitinfo = NIL;
/*
- * if (!is_leaf) remove old key: This node's key has been modified, either
+ * if isupdate, remove old key: This node's key has been modified, either
* because a child split occurred or because we needed to adjust our key
* for an insert in a child node. Therefore, remove the old version of
* this node's key.
@@ -293,77 +315,134 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
* for WAL replay, in the non-split case we handle this by setting up a
* one-element todelete array; in the split case, it's handled implicitly
* because the tuple vector passed to gistSplit won't include this tuple.
- *
- * XXX: If we want to change fillfactors between node and leaf, fillfactor
- * = (is_leaf ? state->leaf_fillfactor : state->node_fillfactor)
*/
- if (gistnospace(state->stack->page, state->itup, state->ituplen,
- is_leaf ? InvalidOffsetNumber : state->stack->childoffnum,
- state->freespace))
+ is_split = gistnospace(page, itup, ntup, oldoffnum, state->freespace);
+ if (is_split)
{
/* no space for insertion */
IndexTuple *itvec;
int tlen;
SplitedPageLayout *dist = NULL,
*ptr;
- BlockNumber rrlink = InvalidBlockNumber;
- GistNSN oldnsn;
+ BlockNumber oldrlink = InvalidBlockNumber;
+ GistNSN oldnsn = { 0, 0 };
+ SplitedPageLayout rootpg;
+ BlockNumber blkno = BufferGetBlockNumber(buffer);
+ bool is_rootsplit;
- is_splitted = true;
+ is_rootsplit = (blkno == GIST_ROOT_BLKNO);
/*
- * Form index tuples vector to split: remove old tuple if t's needed
- * and add new tuples to vector
+ * Form index tuples vector to split. If we're replacing an old tuple,
+ * remove the old version from the vector.
*/
- itvec = gistextractpage(state->stack->page, &tlen);
- if (!is_leaf)
+ itvec = gistextractpage(page, &tlen);
+ if (OffsetNumberIsValid(oldoffnum))
{
/* on inner page we should remove old tuple */
- int pos = state->stack->childoffnum - FirstOffsetNumber;
+ int pos = oldoffnum - FirstOffsetNumber;
tlen--;
if (pos != tlen)
memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
}
- itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
- dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate);
-
- state->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * tlen);
- state->ituplen = 0;
+ itvec = gistjoinvector(itvec, &tlen, itup, ntup);
+ dist = gistSplit(state->r, page, itvec, tlen, giststate);
- if (state->stack->blkno != GIST_ROOT_BLKNO)
+ /*
+ * Set up pages to work with. Allocate new buffers for all but the
+ * leftmost page. The original page becomes the new leftmost page,
+ * and is just replaced with the new contents.
+ *
+ * For a root-split, allocate new buffers for all child pages, the
+ * original page is overwritten with new root page containing
+ * downlinks to the new child pages.
+ */
+ ptr = dist;
+ if (!is_rootsplit)
{
- /*
- * if non-root split then we should not allocate new buffer, but
- * we must create temporary page to operate
- */
- dist->buffer = state->stack->buffer;
- dist->page = PageGetTempPageCopySpecial(BufferGetPage(dist->buffer));
+ /* save old rightlink and NSN */
+ oldrlink = GistPageGetOpaque(page)->rightlink;
+ oldnsn = GistPageGetOpaque(page)->nsn;
+
+ dist->buffer = buffer;
+ dist->block.blkno = BufferGetBlockNumber(buffer);
+ dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
/* clean all flags except F_LEAF */
GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
+
+ ptr = ptr->next;
+ }
+ for (; ptr; ptr = ptr->next)
+ {
+ /* Allocate new page */
+ ptr->buffer = gistNewBuffer(state->r);
+ GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
+ ptr->page = BufferGetPage(ptr->buffer);
+ ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
}
- /* make new pages and fills them */
+ /*
+ * Now that we know whick blocks the new pages go to, set up downlink
+ * tuples to point to them.
+ */
for (ptr = dist; ptr; ptr = ptr->next)
{
- int i;
- char *data;
+ ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+ GistTupleSetValid(ptr->itup);
+ }
- /* get new page */
- if (ptr->buffer == InvalidBuffer)
+ /*
+ * If this is a root split, we construct the new root page with the
+ * downlinks here directly, instead of requiring the caller to insert
+ * them. Add the new root page to the list along with the child pages.
+ */
+ if (is_rootsplit)
+ {
+ IndexTuple *downlinks;
+ int ndownlinks = 0;
+ int i;
+
+ rootpg.buffer = buffer;
+ rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
+ GistPageGetOpaque(rootpg.page)->flags = 0;
+
+ /* Prepare a vector of all the downlinks */
+ for (ptr = dist; ptr; ptr = ptr->next)
+ ndownlinks++;
+ downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
+ for (i = 0, ptr = dist; ptr; ptr = ptr->next)
+ downlinks[i++] = ptr->itup;
+
+ rootpg.block.blkno = GIST_ROOT_BLKNO;
+ rootpg.block.num = ndownlinks;
+ rootpg.list = gistfillitupvec(downlinks, ndownlinks,
+ &(rootpg.lenlist));
+ rootpg.itup = NULL;
+
+ rootpg.next = dist;
+ dist = &rootpg;
+ }
+ else
+ {
+ /* Prepare split-info to be returned to caller */
+ for (ptr = dist; ptr; ptr = ptr->next)
{
- ptr->buffer = gistNewBuffer(state->r);
- GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
- ptr->page = BufferGetPage(ptr->buffer);
+ GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
+ si->buf = ptr->buffer;
+ si->downlink = ptr->itup;
+ *splitinfo = lappend(*splitinfo, si);
}
- ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
+ }
- /*
- * fill page, we can do it because all these pages are new (ie not
- * linked in tree or masked by temp page
- */
- data = (char *) (ptr->list);
+ /*
+ * Fill all pages. All the pages are new, ie. freshly allocated empty
+ * pages, or a temporary copy of the old page.
+ */
+ for (ptr = dist; ptr; ptr = ptr->next)
+ {
+ char *data = (char *) (ptr->list);
for (i = 0; i < ptr->block.num; i++)
{
if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
@@ -371,276 +450,381 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
data += IndexTupleSize((IndexTuple) data);
}
- /* set up ItemPointer and remember it for parent */
- ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
- state->itup[state->ituplen] = ptr->itup;
- state->ituplen++;
- }
+ /* Set up rightlinks */
+ if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
+ GistPageGetOpaque(ptr->page)->rightlink =
+ ptr->next->block.blkno;
+ else
+ GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
+
+ if (ptr->next && !is_rootsplit)
+ GistMarkFollowRight(ptr->page);
+ else
+ GistClearFollowRight(ptr->page);
- /* saves old rightlink */
- if (state->stack->blkno != GIST_ROOT_BLKNO)
- rrlink = GistPageGetOpaque(dist->page)->rightlink;
+ /*
+ * Copy the NSN of the original page to all pages. The
+ * F_FOLLOW_RIGHT flags ensure that scans will follow the
+ * rightlinks until the downlinks are inserted.
+ */
+ GistPageGetOpaque(ptr->page)->nsn = oldnsn;
+ }
START_CRIT_SECTION();
/*
- * must mark buffers dirty before XLogInsert, even though we'll still
- * be changing their opaque fields below. set up right links.
+ * Must mark buffers dirty before XLogInsert, even though we'll still
+ * be changing their opaque fields below.
*/
for (ptr = dist; ptr; ptr = ptr->next)
- {
MarkBufferDirty(ptr->buffer);
- GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ?
- ptr->next->block.blkno : rrlink;
- }
+ if (BufferIsValid(leftchildbuf))
+ MarkBufferDirty(leftchildbuf);
- /* restore splitted non-root page */
- if (state->stack->blkno != GIST_ROOT_BLKNO)
- {
- PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
- dist->page = BufferGetPage(dist->buffer);
- }
+ /*
+ * The first page in the chain was a temporary working copy meant
+ * to replace the old page. Copy it over the old page.
+ */
+ PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
+ dist->page = BufferGetPage(dist->buffer);
+ /* Write the WAL record */
if (RelationNeedsWAL(state->r))
- {
- XLogRecPtr recptr;
- XLogRecData *rdata;
-
- rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
- is_leaf, &(state->key), dist);
-
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
-
- for (ptr = dist; ptr; ptr = ptr->next)
- {
- PageSetLSN(ptr->page, recptr);
- PageSetTLI(ptr->page, ThisTimeLineID);
- }
- }
+ recptr = gistXLogSplit(state->r->rd_node, blkno, is_leaf,
+ dist, oldrlink, oldnsn, leftchildbuf);
else
- {
- for (ptr = dist; ptr; ptr = ptr->next)
- {
- PageSetLSN(ptr->page, GetXLogRecPtrForTemp());
- }
- }
-
- /* set up NSN */
- oldnsn = GistPageGetOpaque(dist->page)->nsn;
- if (state->stack->blkno == GIST_ROOT_BLKNO)
- /* if root split we should put initial value */
- oldnsn = PageGetLSN(dist->page);
+ recptr = GetXLogRecPtrForTemp();
for (ptr = dist; ptr; ptr = ptr->next)
{
- /* only for last set oldnsn */
- GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ?
- PageGetLSN(ptr->page) : oldnsn;
+ PageSetLSN(ptr->page, recptr);
+ PageSetTLI(ptr->page, ThisTimeLineID);
}
/*
- * release buffers, if it was a root split then release all buffers
- * because we create all buffers
+ * Return the new child buffers to the caller.
+ *
+ * If this was a root split, we've already inserted the downlink
+ * pointers, in the form of a new root page. Therefore we can
+ * release all the new buffers, and keep just the root page locked.
*/
- ptr = (state->stack->blkno == GIST_ROOT_BLKNO) ? dist : dist->next;
- for (; ptr; ptr = ptr->next)
- UnlockReleaseBuffer(ptr->buffer);
-
- if (state->stack->blkno == GIST_ROOT_BLKNO)
+ if (is_rootsplit)
{
- gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
- state->needInsertComplete = false;
+ for (ptr = dist->next; ptr; ptr = ptr->next)
+ UnlockReleaseBuffer(ptr->buffer);
}
-
- END_CRIT_SECTION();
}
else
{
- /* enough space */
+ /*
+ * Enough space. We also get here if ntuples==0.
+ */
START_CRIT_SECTION();
- if (!is_leaf)
- PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
- gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
+ if (OffsetNumberIsValid(oldoffnum))
+ PageIndexTupleDelete(page, oldoffnum);
+ gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
- MarkBufferDirty(state->stack->buffer);
+ MarkBufferDirty(buffer);
+
+ if (BufferIsValid(leftchildbuf))
+ MarkBufferDirty(leftchildbuf);
if (RelationNeedsWAL(state->r))
{
- OffsetNumber noffs = 0,
- offs[1];
- XLogRecPtr recptr;
- XLogRecData *rdata;
+ OffsetNumber ndeloffs = 0,
+ deloffs[1];
- if (!is_leaf)
+ if (OffsetNumberIsValid(oldoffnum))
{
- /* only on inner page we should delete previous version */
- offs[0] = state->stack->childoffnum;
- noffs = 1;
+ deloffs[0] = oldoffnum;
+ ndeloffs = 1;
}
- rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
- offs, noffs,
- state->itup, state->ituplen,
- &(state->key));
+ recptr = gistXLogUpdate(state->r->rd_node, buffer,
+ deloffs, ndeloffs, itup, ntup,
+ leftchildbuf);
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
- PageSetLSN(state->stack->page, recptr);
- PageSetTLI(state->stack->page, ThisTimeLineID);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
}
else
- PageSetLSN(state->stack->page, GetXLogRecPtrForTemp());
-
- if (state->stack->blkno == GIST_ROOT_BLKNO)
- state->needInsertComplete = false;
+ {
+ recptr = GetXLogRecPtrForTemp();
+ PageSetLSN(page, recptr);
+ }
- END_CRIT_SECTION();
+ *splitinfo = NIL;
+ }
- if (state->ituplen > 1)
- { /* previous is_splitted==true */
+ /*
+ * If we inserted the downlink for a child page, set NSN and clear
+ * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know
+ * to follow the rightlink if and only if they looked at the parent page
+ * before we inserted the downlink.
+ *
+ * Note that we do this *after* writing the WAL record. That means that
+ * the possible full page image in the WAL record does not include
+ * these changes, and they must be replayed even if the page is restored
+ * from the full page image. There's a chicken-and-egg problem: if we
+ * updated the child pages first, we wouldn't know the recptr of the WAL
+ * record we're about to write.
+ */
+ if (BufferIsValid(leftchildbuf))
+ {
+ Page leftpg = BufferGetPage(leftchildbuf);
- /*
- * child was splited, so we must form union for insertion in
- * parent
- */
- IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate);
+ GistPageGetOpaque(leftpg)->nsn = recptr;
+ GistClearFollowRight(leftpg);
- ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno);
- state->itup[0] = newtup;
- state->ituplen = 1;
- }
- else if (is_leaf)
- {
- /*
- * itup[0] store key to adjust parent, we set it to valid to
- * correct check by GistTupleIsInvalid macro in gistgetadjusted()
- */
- ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno);
- GistTupleSetValid(state->itup[0]);
- }
+ PageSetLSN(leftpg, recptr);
+ PageSetTLI(leftpg, ThisTimeLineID);
}
- return is_splitted;
+
+ END_CRIT_SECTION();
+
+ return is_split;
}
/*
- * returns stack of pages, all pages in stack are pinned, and
- * leaf is X-locked
+ * Workhouse routine for doing insertion into a GiST index. Note that
+ * this routine assumes it is invoked in a short-lived memory context,
+ * so it does not bother releasing palloc'd allocations.
*/
-
static void
-gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
+gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
{
ItemId iid;
IndexTuple idxtuple;
- GISTPageOpaque opaque;
+ GISTInsertStack firststack;
+ GISTInsertStack *stack;
+ GISTInsertState state;
+ bool xlocked = false;
+
+ memset(&state, 0, sizeof(GISTInsertState));
+ state.freespace = freespace;
+ state.r = r;
+
+ /* Start from the root */
+ firststack.blkno = GIST_ROOT_BLKNO;
+ firststack.lsn.xrecoff = 0;
+ firststack.parent = NULL;
+ state.stack = stack = &firststack;
/*
- * walk down, We don't lock page for a long time, but so we should be
- * ready to recheck path in a bad case... We remember, that page->lsn
- * should never be invalid.
+ * Walk down along the path of smallest penalty, updating the parent
+ * pointers with the key we're inserting as we go. If we crash in the
+ * middle, the tree is consistent, although the possible parent updates
+ * were a waste.
*/
for (;;)
{
- if (XLogRecPtrIsInvalid(state->stack->lsn))
- state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
- LockBuffer(state->stack->buffer, GIST_SHARE);
- gistcheckpage(state->r, state->stack->buffer);
+ if (XLogRecPtrIsInvalid(stack->lsn))
+ stack->buffer = ReadBuffer(state.r, stack->blkno);
+
+ /*
+ * Be optimistic and grab shared lock first. Swap it for an
+ * exclusive lock later if we need to update the page.
+ */
+ if (!xlocked)
+ {
+ LockBuffer(stack->buffer, GIST_SHARE);
+ gistcheckpage(state.r, stack->buffer);
+ }
+
+ stack->page = (Page) BufferGetPage(stack->buffer);
+ stack->lsn = PageGetLSN(stack->page);
+ Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
- state->stack->page = (Page) BufferGetPage(state->stack->buffer);
- opaque = GistPageGetOpaque(state->stack->page);
+ /*
+ * If this page was split but the downlink was never inserted to
+ * the parent because the inserting backend crashed before doing
+ * that, fix that now.
+ */
+ if (GistFollowRight(stack->page))
+ {
+ if (!xlocked)
+ {
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+ LockBuffer(stack->buffer, GIST_EXCLUSIVE);
+ xlocked = true;
+ /* someone might've completed the split when we unlocked */
+ if (!GistFollowRight(stack->page))
+ continue;
+ }
+ gistfixsplit(&state, giststate);
- state->stack->lsn = PageGetLSN(state->stack->page);
- Assert(!RelationNeedsWAL(state->r) || !XLogRecPtrIsInvalid(state->stack->lsn));
+ UnlockReleaseBuffer(stack->buffer);
+ xlocked = false;
+ state.stack = stack = stack->parent;
+ continue;
+ }
- if (state->stack->blkno != GIST_ROOT_BLKNO &&
- XLByteLT(state->stack->parent->lsn, opaque->nsn))
+ if (stack->blkno != GIST_ROOT_BLKNO &&
+ XLByteLT(stack->parent->lsn,
+ GistPageGetOpaque(stack->page)->nsn))
{
/*
- * caused split non-root page is detected, go up to parent to
- * choose best child
+ * Concurrent split detected. There's no guarantee that the
+ * downlink for this page is consistent with the tuple we're
+ * inserting anymore, so go back to parent and rechoose the
+ * best child.
*/
- UnlockReleaseBuffer(state->stack->buffer);
- state->stack = state->stack->parent;
+ UnlockReleaseBuffer(stack->buffer);
+ xlocked = false;
+ state.stack = stack = stack->parent;
continue;
}
- if (!GistPageIsLeaf(state->stack->page))
+ if (!GistPageIsLeaf(stack->page))
{
/*
- * This is an internal page, so continue to walk down the tree. We
- * find the child node that has the minimum insertion penalty and
- * recursively invoke ourselves to modify that node. Once the
- * recursive call returns, we may need to adjust the parent node
- * for two reasons: the child node split, or the key in this node
- * needs to be adjusted for the newly inserted key below us.
+ * This is an internal page so continue to walk down the tree.
+ * Find the child node that has the minimum insertion penalty.
*/
- GISTInsertStack *item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
-
- state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate);
+ BlockNumber childblkno;
+ IndexTuple newtup;
+ GISTInsertStack *item;
- iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
- idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid);
- item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
- LockBuffer(state->stack->buffer, GIST_UNLOCK);
+ stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate);
+ iid = PageGetItemId(stack->page, stack->childoffnum);
+ idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
+ childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
- item->parent = state->stack;
- item->child = NULL;
- if (state->stack)
- state->stack->child = item;
- state->stack = item;
- }
- else
- {
- /* be carefull, during unlock/lock page may be changed... */
- LockBuffer(state->stack->buffer, GIST_UNLOCK);
- LockBuffer(state->stack->buffer, GIST_EXCLUSIVE);
- state->stack->page = (Page) BufferGetPage(state->stack->buffer);
- opaque = GistPageGetOpaque(state->stack->page);
+ /*
+ * Check that it's not a leftover invalid tuple from pre-9.1
+ */
+ if (GistTupleIsInvalid(idxtuple))
+ ereport(ERROR,
+ (errmsg("index \"%s\" contains an inner tuple marked as invalid",
+ RelationGetRelationName(r)),
+ errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."),
+ errhint("Please REINDEX it.")));
- if (state->stack->blkno == GIST_ROOT_BLKNO)
+ /*
+ * Check that the key representing the target child node is
+ * consistent with the key we're inserting. Update it if it's not.
+ */
+ newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
+ if (newtup)
{
/*
- * the only page can become inner instead of leaf is a root
- * page, so for root we should recheck it
+ * Swap shared lock for an exclusive one. Beware, the page
+ * may change while we unlock/lock the page...
*/
- if (!GistPageIsLeaf(state->stack->page))
+ if (!xlocked)
{
- /*
- * very rarely situation: during unlock/lock index with
- * number of pages = 1 was increased
- */
- LockBuffer(state->stack->buffer, GIST_UNLOCK);
- continue;
- }
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+ LockBuffer(stack->buffer, GIST_EXCLUSIVE);
+ xlocked = true;
+ stack->page = (Page) BufferGetPage(stack->buffer);
+ if (!XLByteEQ(PageGetLSN(stack->page), stack->lsn))
+ {
+ /* the page was changed while we unlocked it, retry */
+ continue;
+ }
+ }
/*
- * we don't need to check root split, because checking
- * leaf/inner is enough to recognize split for root
+ * Update the tuple.
+ *
+ * gistinserthere() might have to split the page to make the
+ * updated tuple fit. It will adjust the stack so that after
+ * the call, we'll be holding a lock on the page containing
+ * the tuple, which might have moved right.
+ *
+ * Except if this causes a root split, gistinserthere()
+ * returns 'true'. In that case, stack only holds the new
+ * root, and the child page was released. Have to start
+ * all over.
*/
-
+ if (gistinserttuples(&state, stack, giststate, &newtup, 1,
+ stack->childoffnum, InvalidBuffer))
+ {
+ UnlockReleaseBuffer(stack->buffer);
+ xlocked = false;
+ state.stack = stack = stack->parent;
+ continue;
+ }
}
- else if (XLByteLT(state->stack->parent->lsn, opaque->nsn))
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+ xlocked = false;
+
+ /* descend to the chosen child */
+ item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
+ item->blkno = childblkno;
+ item->parent = stack;
+ state.stack = stack = item;
+ }
+ else
+ {
+ /*
+ * Leaf page. Insert the new key. We've already updated all the
+ * parents on the way down, but we might have to split the page
+ * if it doesn't fit. gistinserthere() will take care of that.
+ */
+
+ /*
+ * Swap shared lock for an exclusive one. Be careful, the page
+ * may change while we unlock/lock the page...
+ */
+ if (!xlocked)
{
- /*
- * detecting split during unlock/lock, so we should find
- * better child on parent
- */
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+ LockBuffer(stack->buffer, GIST_EXCLUSIVE);
+ xlocked = true;
+ stack->page = (Page) BufferGetPage(stack->buffer);
+ stack->lsn = PageGetLSN(stack->page);
- /* forget buffer */
- UnlockReleaseBuffer(state->stack->buffer);
+ if (stack->blkno == GIST_ROOT_BLKNO)
+ {
+ /*
+ * the only page that can become inner instead of leaf
+ * is the root page, so for root we should recheck it
+ */
+ if (!GistPageIsLeaf(stack->page))
+ {
+ /*
+ * very rare situation: during unlock/lock index with
+ * number of pages = 1 was increased
+ */
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+ xlocked = false;
+ continue;
+ }
- state->stack = state->stack->parent;
- continue;
+ /*
+ * we don't need to check root split, because checking
+ * leaf/inner is enough to recognize split for root
+ */
+ }
+ else if (GistFollowRight(stack->page) ||
+ XLByteLT(stack->parent->lsn,
+ GistPageGetOpaque(stack->page)->nsn))
+ {
+ /*
+ * The page was split while we momentarily unlocked the
+ * page. Go back to parent.
+ */
+ UnlockReleaseBuffer(stack->buffer);
+ xlocked = false;
+ state.stack = stack = stack->parent;
+ continue;
+ }
}
- state->stack->lsn = PageGetLSN(state->stack->page);
+ /* now state.stack->(page, buffer and blkno) points to leaf page */
- /* ok we found a leaf page and it X-locked */
+ gistinserttuples(&state, stack, giststate, &itup, 1,
+ InvalidOffsetNumber, InvalidBuffer);
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+
+ /* Release any pins we might still hold before exiting */
+ for (; stack; stack = stack->parent)
+ ReleaseBuffer(stack->buffer);
break;
}
}
-
- /* now state->stack->(page, buffer and blkno) points to leaf page */
}
/*
@@ -648,7 +832,7 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
*
* returns from the beginning of closest parent;
*
- * To prevent deadlocks, this should lock only one page simultaneously.
+ * To prevent deadlocks, this should lock only one page at a time.
*/
GISTInsertStack *
gistFindPath(Relation r, BlockNumber child)
@@ -683,6 +867,13 @@ gistFindPath(Relation r, BlockNumber child)
top->lsn = PageGetLSN(page);
+ /*
+ * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
+ * downlink. This should not normally happen..
+ */
+ if (GistFollowRight(page))
+ elog(ERROR, "concurrent GiST page split was incomplete");
+
if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) &&
GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
{
@@ -711,8 +902,6 @@ gistFindPath(Relation r, BlockNumber child)
ptr = top;
while (ptr->parent)
{
- /* set child link */
- ptr->parent->child = ptr;
/* move childoffnum.. */
if (ptr == top)
{
@@ -754,17 +943,16 @@ gistFindPath(Relation r, BlockNumber child)
return NULL;
}
-
/*
- * Returns X-locked parent of stack page
+ * Updates the stack so that child->parent is the correct parent of the
+ * child. child->parent must be exclusively locked on entry, and will
+ * remain so at exit, but it might not be the same page anymore.
*/
-
static void
gistFindCorrectParent(Relation r, GISTInsertStack *child)
{
GISTInsertStack *parent = child->parent;
- LockBuffer(parent->buffer, GIST_EXCLUSIVE);
gistcheckpage(r, parent->buffer);
parent->page = (Page) BufferGetPage(parent->buffer);
@@ -836,83 +1024,231 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child)
/* install new chain of parents to stack */
child->parent = parent;
- parent->child = child;
/* make recursive call to normal processing */
+ LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
gistFindCorrectParent(r, child);
}
return;
}
-void
-gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
+/*
+ * Form a downlink pointer for the page in 'buf'.
+ */
+static IndexTuple
+gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
+ GISTInsertStack *stack)
{
- int is_splitted;
- ItemId iid;
- IndexTuple oldtup,
- newtup;
+ Page page = BufferGetPage(buf);
+ OffsetNumber maxoff;
+ OffsetNumber offset;
+ IndexTuple downlink = NULL;
- /* walk up */
- while (true)
+ maxoff = PageGetMaxOffsetNumber(page);
+ for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
{
- /*
- * After this call: 1. if child page was splited, then itup contains
- * keys for each page 2. if child page wasn't splited, then itup
- * contains additional for adjustment of current key
- */
-
- if (state->stack->parent)
+ IndexTuple ituple = (IndexTuple)
+ PageGetItem(page, PageGetItemId(page, offset));
+ if (downlink == NULL)
+ downlink = CopyIndexTuple(ituple);
+ else
{
- /*
- * X-lock parent page before proceed child, gistFindCorrectParent
- * should find and lock it
- */
- gistFindCorrectParent(state->r, state->stack);
+ IndexTuple newdownlink;
+ newdownlink = gistgetadjusted(rel, downlink, ituple,
+ giststate);
+ if (newdownlink)
+ downlink = newdownlink;
}
- is_splitted = gistplacetopage(state, giststate);
+ }
+
+ /*
+ * If the page is completely empty, we can't form a meaningful
+ * downlink for it. But we have to insert a downlink for the page.
+ * Any key will do, as long as its consistent with the downlink of
+ * parent page, so that we can legally insert it to the parent.
+ * A minimal one that matches as few scans as possible would be best,
+ * to keep scans from doing useless work, but we don't know how to
+ * construct that. So we just use the downlink of the original page
+ * that was split - that's as far from optimal as it can get but will
+ * do..
+ */
+ if (!downlink)
+ {
+ ItemId iid;
+
+ LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
+ gistFindCorrectParent(rel, stack);
+ iid = PageGetItemId(stack->parent->page, stack->parent->childoffnum);
+ downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
+ downlink = CopyIndexTuple(downlink);
+ LockBuffer(stack->parent->buffer, GIST_UNLOCK);
+ }
- /* parent locked above, so release child buffer */
- UnlockReleaseBuffer(state->stack->buffer);
+ ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
+ GistTupleSetValid(downlink);
- /* pop parent page from stack */
- state->stack = state->stack->parent;
+ return downlink;
+}
- /* stack is void */
- if (!state->stack)
- break;
- /*
- * child did not split, so we can check is it needed to update parent
- * tuple
- */
- if (!is_splitted)
- {
- /* parent's tuple */
- iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
- oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
- newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate);
-
- if (!newtup)
- { /* not need to update key */
- LockBuffer(state->stack->buffer, GIST_UNLOCK);
- break;
- }
+/*
+ * Complete the incomplete split of state->stack->page.
+ */
+static void
+gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
+{
+ GISTInsertStack *stack = state->stack;
+ Buffer buf;
+ Page page;
+ List *splitinfo = NIL;
+
+ elog(LOG, "fixing incomplete split in index \"%s\", block %u",
+ RelationGetRelationName(state->r), stack->blkno);
- state->itup[0] = newtup;
+ Assert(GistFollowRight(stack->page));
+ Assert(OffsetNumberIsValid(stack->parent->childoffnum));
+
+ buf = stack->buffer;
+
+ /*
+ * Read the chain of split pages, following the rightlinks. Construct
+ * a downlink tuple for each page.
+ */
+ for (;;)
+ {
+ GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
+ IndexTuple downlink;
+
+ page = BufferGetPage(buf);
+
+ /* Form the new downlink tuples to insert to parent */
+ downlink = gistformdownlink(state->r, buf, giststate, stack);
+
+ si->buf = buf;
+ si->downlink = downlink;
+
+ splitinfo = lappend(splitinfo, si);
+
+ if (GistFollowRight(page))
+ {
+ /* lock next page */
+ buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
+ LockBuffer(buf, GIST_EXCLUSIVE);
}
- } /* while */
+ else
+ break;
+ }
+
+ /* Insert the downlinks */
+ gistfinishsplit(state, stack, giststate, splitinfo);
+}
+
+/*
+ * Insert tuples to stack->buffer. If 'oldoffnum' is valid, the new tuples
+ * replace an old tuple at oldoffnum. The caller must hold an exclusive lock
+ * on the page.
+ *
+ * If leftchild is valid, we're inserting/updating the downlink for the
+ * page to the right of leftchild. We clear the F_FOLLOW_RIGHT flag and
+ * update NSN on leftchild, atomically with the insertion of the downlink.
+ *
+ * Returns 'true' if the page had to be split. On return, we will continue
+ * to hold an exclusive lock on state->stack->buffer, but if we had to split
+ * the page, it might not contain the tuple we just inserted/updated.
+ */
+static bool
+gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
+ GISTSTATE *giststate,
+ IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
+ Buffer leftchild)
+{
+ List *splitinfo;
+ bool is_split;
+
+ is_split = gistplacetopage(state, giststate, stack->buffer,
+ tuples, ntup, oldoffnum,
+ leftchild,
+ &splitinfo);
+ if (splitinfo)
+ gistfinishsplit(state, stack, giststate, splitinfo);
+
+ return is_split;
+}
+
+/*
+ * Finish an incomplete split by inserting/updating the downlinks in
+ * parent page. 'splitinfo' contains all the child pages, exclusively-locked,
+ * involved in the split, from left-to-right.
+ */
+static void
+gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
+ GISTSTATE *giststate, List *splitinfo)
+{
+ ListCell *lc;
+ List *reversed;
+ GISTPageSplitInfo *right;
+ GISTPageSplitInfo *left;
+ IndexTuple tuples[2];
+
+ /* A split always contains at least two halves */
+ Assert(list_length(splitinfo) >= 2);
+
+ /*
+ * We need to insert downlinks for each new page, and update the
+ * downlink for the original (leftmost) page in the split. Begin at
+ * the rightmost page, inserting one downlink at a time until there's
+ * only two pages left. Finally insert the downlink for the last new
+ * page and update the downlink for the original page as one operation.
+ */
- /* release all parent buffers */
- while (state->stack)
+ /* for convenience, create a copy of the list in reverse order */
+ reversed = NIL;
+ foreach(lc, splitinfo)
{
- ReleaseBuffer(state->stack->buffer);
- state->stack = state->stack->parent;
+ reversed = lcons(lfirst(lc), reversed);
}
- /* say to xlog that insert is completed */
- if (state->needInsertComplete && RelationNeedsWAL(state->r))
- gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
+ LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
+ gistFindCorrectParent(state->r, stack);
+
+ while(list_length(reversed) > 2)
+ {
+ right = (GISTPageSplitInfo *) linitial(reversed);
+ left = (GISTPageSplitInfo *) lsecond(reversed);
+
+ if (gistinserttuples(state, stack->parent, giststate,
+ &right->downlink, 1,
+ InvalidOffsetNumber,
+ left->buf))
+ {
+ /*
+ * If the parent page was split, need to relocate the original
+ * parent pointer.
+ */
+ gistFindCorrectParent(state->r, stack);
+ }
+ UnlockReleaseBuffer(right->buf);
+ reversed = list_delete_first(reversed);
+ }
+
+ right = (GISTPageSplitInfo *) linitial(reversed);
+ left = (GISTPageSplitInfo *) lsecond(reversed);
+
+ /*
+ * Finally insert downlink for the remaining right page and update the
+ * downlink for the original page to not contain the tuples that were
+ * moved to the new pages.
+ */
+ tuples[0] = left->downlink;
+ tuples[1] = right->downlink;
+ gistinserttuples(state, stack->parent, giststate,
+ tuples, 2,
+ stack->parent->childoffnum,
+ left->buf);
+ LockBuffer(stack->parent->buffer, GIST_UNLOCK);
+ UnlockReleaseBuffer(right->buf);
+ Assert(left->buf == stack->buffer);
}
/*
@@ -963,8 +1299,7 @@ gistSplit(Relation r,
ROTATEDIST(res);
res->block.num = v.splitVector.spl_nright;
res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
- res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false)
- : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
+ res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
}
if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
@@ -986,51 +1321,13 @@ gistSplit(Relation r,
ROTATEDIST(res);
res->block.num = v.splitVector.spl_nleft;
res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
- res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false)
- : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
+ res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
}
return res;
}
/*
- * buffer must be pinned and locked by caller
- */
-void
-gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key)
-{
- Page page;
-
- Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
- page = BufferGetPage(buffer);
-
- START_CRIT_SECTION();
-
- GISTInitBuffer(buffer, 0);
- gistfillbuffer(page, itup, len, FirstOffsetNumber);
-
- MarkBufferDirty(buffer);
-
- if (RelationNeedsWAL(r))
- {
- XLogRecPtr recptr;
- XLogRecData *rdata;
-
- rdata = formUpdateRdata(r->rd_node, buffer,
- NULL, 0,
- itup, len, key);
-
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
- PageSetLSN(page, recptr);
- PageSetTLI(page, ThisTimeLineID);
- }
- else
- PageSetLSN(page, GetXLogRecPtrForTemp());
-
- END_CRIT_SECTION();
-}
-
-/*
* Fill a GISTSTATE with information about the index
*/
void
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index 56921cfee01..5061003a3bf 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -254,9 +254,15 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
page = BufferGetPage(buffer);
opaque = GistPageGetOpaque(page);
- /* check if page split occurred since visit to parent */
+ /*
+ * Check if we need to follow the rightlink. We need to follow it if the
+ * page was concurrently split since we visited the parent (in which case
+ * parentlsn < nsn), or if the the system crashed after a page split but
+ * before the downlink was inserted into the parent.
+ */
if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) &&
- XLByteLT(pageItem->data.parentlsn, opaque->nsn) &&
+ (GistFollowRight(page) ||
+ XLByteLT(pageItem->data.parentlsn, opaque->nsn)) &&
opaque->rightlink != InvalidBlockNumber /* sanity check */ )
{
/* There was a page split, follow right link to add pages */
diff --git a/src/backend/access/gist/gistsplit.c b/src/backend/access/gist/gistsplit.c
index 0ce317cdbc8..ac9a6578414 100644
--- a/src/backend/access/gist/gistsplit.c
+++ b/src/backend/access/gist/gistsplit.c
@@ -500,58 +500,6 @@ gistSplitHalf(GIST_SPLITVEC *v, int len)
}
/*
- * if it was invalid tuple then we need special processing.
- * We move all invalid tuples on right page.
- *
- * if there is no place on left page, gistSplit will be called one more
- * time for left page.
- *
- * Normally, we never exec this code, but after crash replay it's possible
- * to get 'invalid' tuples (probability is low enough)
- */
-static void
-gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len)
-{
- int i;
- static OffsetNumber offInvTuples[MaxOffsetNumber];
- int nOffInvTuples = 0;
-
- for (i = 1; i <= len; i++)
- if (GistTupleIsInvalid(itup[i - 1]))
- offInvTuples[nOffInvTuples++] = i;
-
- if (nOffInvTuples == len)
- {
- /* corner case, all tuples are invalid */
- v->spl_rightvalid = v->spl_leftvalid = false;
- gistSplitHalf(&v->splitVector, len);
- }
- else
- {
- GistSplitUnion gsvp;
-
- v->splitVector.spl_right = offInvTuples;
- v->splitVector.spl_nright = nOffInvTuples;
- v->spl_rightvalid = false;
-
- v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
- v->splitVector.spl_nleft = 0;
- for (i = 1; i <= len; i++)
- if (!GistTupleIsInvalid(itup[i - 1]))
- v->splitVector.spl_left[v->splitVector.spl_nleft++] = i;
- v->spl_leftvalid = true;
-
- gsvp.equiv = NULL;
- gsvp.attr = v->spl_lattr;
- gsvp.len = v->splitVector.spl_nleft;
- gsvp.entries = v->splitVector.spl_left;
- gsvp.isnull = v->spl_lisnull;
-
- gistunionsubkeyvec(giststate, itup, &gsvp, 0);
- }
-}
-
-/*
* trys to split page by attno key, in a case of null
* values move its to separate page.
*/
@@ -568,12 +516,6 @@ gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist
Datum datum;
bool IsNull;
- if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
- {
- gistSplitByInvalid(giststate, v, itup, len);
- return;
- }
-
datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, attno, &(entryvec->vector[i]),
datum, r, page, i,
@@ -582,8 +524,6 @@ gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist
offNullTuples[nOffNullTuples++] = i;
}
- v->spl_leftvalid = v->spl_rightvalid = true;
-
if (nOffNullTuples == len)
{
/*
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index d488bbb4200..8a0d3568604 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -152,7 +152,7 @@ gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
* invalid tuple. Resulting Datums aren't compressed.
*/
-bool
+void
gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
Datum *attr, bool *isnull)
{
@@ -180,10 +180,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke
Datum datum;
bool IsNull;
- if (GistTupleIsInvalid(itvec[j]))
- return FALSE; /* signals that union with invalid tuple =>
- * result is invalid */
-
datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
if (IsNull)
continue;
@@ -218,8 +214,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke
isnull[i] = FALSE;
}
}
-
- return TRUE;
}
/*
@@ -231,8 +225,7 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
{
memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts);
- if (!gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS))
- return gist_form_invalid_tuple(InvalidBlockNumber);
+ gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS);
return gistFormTuple(giststate, r, attrS, isnullS, false);
}
@@ -328,9 +321,6 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
IndexTuple newtup = NULL;
int i;
- if (GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup))
- return gist_form_invalid_tuple(ItemPointerGetBlockNumber(&(oldtup->t_tid)));
-
gistDeCompressAtt(giststate, r, oldtup, NULL,
(OffsetNumber) 0, oldentries, oldisnull);
@@ -401,14 +391,6 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
int j;
IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
- if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup))
- {
- ereport(LOG,
- (errmsg("index \"%s\" needs VACUUM or REINDEX to finish crash recovery",
- RelationGetRelationName(r))));
- continue;
- }
-
sum_grow = 0;
for (j = 0; j < r->rd_att->natts; j++)
{
@@ -521,7 +503,11 @@ gistFormTuple(GISTSTATE *giststate, Relation r,
}
res = index_form_tuple(giststate->tupdesc, compatt, isnull);
- GistTupleSetValid(res);
+ /*
+ * The offset number on tuples on internal pages is unused. For historical
+ * reasons, it is set 0xffff.
+ */
+ ItemPointerSetOffsetNumber( &(res->t_tid), 0xffff);
return res;
}
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index e02e72d4313..86af414dced 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -26,13 +26,6 @@
#include "utils/memutils.h"
-typedef struct GistBulkDeleteResult
-{
- IndexBulkDeleteResult std; /* common state */
- bool needReindex;
-} GistBulkDeleteResult;
-
-
/*
* VACUUM cleanup: update FSM
*/
@@ -40,7 +33,7 @@ Datum
gistvacuumcleanup(PG_FUNCTION_ARGS)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
- GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1);
+ IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
Relation rel = info->index;
BlockNumber npages,
blkno;
@@ -56,10 +49,10 @@ gistvacuumcleanup(PG_FUNCTION_ARGS)
/* Set up all-zero stats if gistbulkdelete wasn't called */
if (stats == NULL)
{
- stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult));
+ stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
/* use heap's tuple count */
- stats->std.num_index_tuples = info->num_heap_tuples;
- stats->std.estimated_count = info->estimated_count;
+ stats->num_index_tuples = info->num_heap_tuples;
+ stats->estimated_count = info->estimated_count;
/*
* XXX the above is wrong if index is partial. Would it be OK to just
@@ -67,11 +60,6 @@ gistvacuumcleanup(PG_FUNCTION_ARGS)
*/
}
- if (stats->needReindex)
- ereport(NOTICE,
- (errmsg("index \"%s\" needs VACUUM FULL or REINDEX to finish crash recovery",
- RelationGetRelationName(rel))));
-
/*
* Need lock unless it's local to this backend.
*/
@@ -112,10 +100,10 @@ gistvacuumcleanup(PG_FUNCTION_ARGS)
IndexFreeSpaceMapVacuum(info->index);
/* return statistics */
- stats->std.pages_free = totFreePages;
+ stats->pages_free = totFreePages;
if (needLock)
LockRelationForExtension(rel, ExclusiveLock);
- stats->std.num_pages = RelationGetNumberOfBlocks(rel);
+ stats->num_pages = RelationGetNumberOfBlocks(rel);
if (needLock)
UnlockRelationForExtension(rel, ExclusiveLock);
@@ -135,7 +123,7 @@ pushStackIfSplited(Page page, GistBDItem *stack)
GISTPageOpaque opaque = GistPageGetOpaque(page);
if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) &&
- XLByteLT(stack->parentlsn, opaque->nsn) &&
+ (GistFollowRight(page) || XLByteLT(stack->parentlsn, opaque->nsn)) &&
opaque->rightlink != InvalidBlockNumber /* sanity check */ )
{
/* split page detected, install right link to the stack */
@@ -162,7 +150,7 @@ Datum
gistbulkdelete(PG_FUNCTION_ARGS)
{
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
- GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1);
+ IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
void *callback_state = (void *) PG_GETARG_POINTER(3);
Relation rel = info->index;
@@ -171,10 +159,10 @@ gistbulkdelete(PG_FUNCTION_ARGS)
/* first time through? */
if (stats == NULL)
- stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult));
+ stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
/* we'll re-count the tuples each time */
- stats->std.estimated_count = false;
- stats->std.num_index_tuples = 0;
+ stats->estimated_count = false;
+ stats->num_index_tuples = 0;
stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
stack->blkno = GIST_ROOT_BLKNO;
@@ -232,10 +220,10 @@ gistbulkdelete(PG_FUNCTION_ARGS)
{
todelete[ntodelete] = i - ntodelete;
ntodelete++;
- stats->std.tuples_removed += 1;
+ stats->tuples_removed += 1;
}
else
- stats->std.num_index_tuples += 1;
+ stats->num_index_tuples += 1;
}
if (ntodelete)
@@ -250,22 +238,13 @@ gistbulkdelete(PG_FUNCTION_ARGS)
if (RelationNeedsWAL(rel))
{
- XLogRecData *rdata;
XLogRecPtr recptr;
- gistxlogPageUpdate *xlinfo;
- rdata = formUpdateRdata(rel->rd_node, buffer,
+ recptr = gistXLogUpdate(rel->rd_node, buffer,
todelete, ntodelete,
- NULL, 0,
- NULL);
- xlinfo = (gistxlogPageUpdate *) rdata->next->data;
-
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
+ NULL, 0, InvalidBuffer);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
-
- pfree(xlinfo);
- pfree(rdata);
}
else
PageSetLSN(page, GetXLogRecPtrForTemp());
@@ -293,7 +272,11 @@ gistbulkdelete(PG_FUNCTION_ARGS)
stack->next = ptr;
if (GistTupleIsInvalid(idxtuple))
- stats->needReindex = true;
+ ereport(LOG,
+ (errmsg("index \"%s\" contains an inner tuple marked as invalid",
+ RelationGetRelationName(rel)),
+ errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."),
+ errhint("Please REINDEX it.")));
}
}
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index a90303e547b..7d25728d8d8 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -20,15 +20,6 @@
#include "utils/memutils.h"
#include "utils/rel.h"
-
-typedef struct
-{
- gistxlogPageUpdate *data;
- int len;
- IndexTuple *itup;
- OffsetNumber *todelete;
-} PageUpdateRecord;
-
typedef struct
{
gistxlogPage *header;
@@ -41,144 +32,37 @@ typedef struct
NewPage *page;
} PageSplitRecord;
-/* track for incomplete inserts, idea was taken from nbtxlog.c */
-
-typedef struct gistIncompleteInsert
-{
- RelFileNode node;
- BlockNumber origblkno; /* for splits */
- ItemPointerData key;
- int lenblk;
- BlockNumber *blkno;
- XLogRecPtr lsn;
- BlockNumber *path;
- int pathlen;
-} gistIncompleteInsert;
-
-
static MemoryContext opCtx; /* working memory for operations */
-static MemoryContext insertCtx; /* holds incomplete_inserts list */
-static List *incomplete_inserts;
-
-
-#define ItemPointerEQ(a, b) \
- ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
- ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) )
-
+/*
+ * Replay the clearing of F_FOLLOW_RIGHT flag.
+ */
static void
-pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
- BlockNumber *blkno, int lenblk,
- PageSplitRecord *xlinfo /* to extract blkno info */ )
+gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn,
+ BlockNumber leftblkno)
{
- MemoryContext oldCxt;
- gistIncompleteInsert *ninsert;
+ Buffer buffer;
- if (!ItemPointerIsValid(&key))
+ buffer = XLogReadBuffer(node, leftblkno, false);
+ if (BufferIsValid(buffer))
+ {
+ Page page = (Page) BufferGetPage(buffer);
/*
- * if key is null then we should not store insertion as incomplete,
- * because it's a vacuum operation..
+ * Note that we still update the page even if page LSN is equal to the
+ * LSN of this record, because the updated NSN is not included in the
+ * full page image.
*/
- return;
-
- oldCxt = MemoryContextSwitchTo(insertCtx);
- ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
-
- ninsert->node = node;
- ninsert->key = key;
- ninsert->lsn = lsn;
-
- if (lenblk && blkno)
- {
- ninsert->lenblk = lenblk;
- ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
- memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk);
- ninsert->origblkno = *blkno;
- }
- else
- {
- int i;
-
- Assert(xlinfo);
- ninsert->lenblk = xlinfo->data->npage;
- ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
- for (i = 0; i < ninsert->lenblk; i++)
- ninsert->blkno[i] = xlinfo->page[i].header->blkno;
- ninsert->origblkno = xlinfo->data->origblkno;
- }
- Assert(ninsert->lenblk > 0);
-
- /*
- * Stick the new incomplete insert onto the front of the list, not the
- * back. This is so that gist_xlog_cleanup will process incompletions in
- * last-in-first-out order.
- */
- incomplete_inserts = lcons(ninsert, incomplete_inserts);
-
- MemoryContextSwitchTo(oldCxt);
-}
-
-static void
-forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
-{
- ListCell *l;
-
- if (!ItemPointerIsValid(&key))
- return;
-
- if (incomplete_inserts == NIL)
- return;
-
- foreach(l, incomplete_inserts)
- {
- gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
-
- if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
+ if (!XLByteLT(lsn, PageGetLSN(page)))
{
- /* found */
- incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
- pfree(insert->blkno);
- pfree(insert);
- break;
- }
- }
-}
+ GistPageGetOpaque(page)->nsn = lsn;
+ GistClearFollowRight(page);
-static void
-decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
-{
- char *begin = XLogRecGetData(record),
- *ptr;
- int i = 0,
- addpath = 0;
-
- decoded->data = (gistxlogPageUpdate *) begin;
-
- if (decoded->data->ntodelete)
- {
- decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath);
- addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
- }
- else
- decoded->todelete = NULL;
-
- decoded->len = 0;
- ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
- while (ptr - begin < record->xl_len)
- {
- decoded->len++;
- ptr += IndexTupleSize((IndexTuple) ptr);
- }
-
- decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);
-
- ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
- while (ptr - begin < record->xl_len)
- {
- decoded->itup[i] = (IndexTuple) ptr;
- ptr += IndexTupleSize(decoded->itup[i]);
- i++;
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
}
}
@@ -186,29 +70,22 @@ decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
* redo any page update (except page split)
*/
static void
-gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
+gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
{
- gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
- PageUpdateRecord xlrec;
+ char *begin = XLogRecGetData(record);
+ gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
Buffer buffer;
Page page;
+ char *data;
- /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */
- forgetIncompleteInsert(xldata->node, xldata->key);
+ if (BlockNumberIsValid(xldata->leftchild))
+ gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
- if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
- /* operation with root always finalizes insertion */
- pushIncompleteInsert(xldata->node, lsn, xldata->key,
- &(xldata->blkno), 1,
- NULL);
-
- /* nothing else to do if page was backed up (and no info to do it with) */
+ /* nothing more to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
- decodePageUpdateRecord(&xlrec, record);
-
- buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false);
+ buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
@@ -219,28 +96,49 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
return;
}
- if (isnewroot)
- GISTInitBuffer(buffer, 0);
- else if (xlrec.data->ntodelete)
+ data = begin + sizeof(gistxlogPageUpdate);
+
+ /* Delete old tuples */
+ if (xldata->ntodelete > 0)
{
int i;
+ OffsetNumber *todelete = (OffsetNumber *) data;
+ data += sizeof(OffsetNumber) * xldata->ntodelete;
- for (i = 0; i < xlrec.data->ntodelete; i++)
- PageIndexTupleDelete(page, xlrec.todelete[i]);
+ for (i = 0; i < xldata->ntodelete; i++)
+ PageIndexTupleDelete(page, todelete[i]);
if (GistPageIsLeaf(page))
GistMarkTuplesDeleted(page);
}
/* add tuples */
- if (xlrec.len > 0)
- gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
-
- /*
- * special case: leafpage, nothing to insert, nothing to delete, then
- * vacuum marks page
- */
- if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
- GistClearTuplesDeleted(page);
+ if (data - begin < record->xl_len)
+ {
+ OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
+ OffsetNumberNext(PageGetMaxOffsetNumber(page));
+ while (data - begin < record->xl_len)
+ {
+ IndexTuple itup = (IndexTuple) data;
+ Size sz = IndexTupleSize(itup);
+ OffsetNumber l;
+ data += sz;
+
+ l = PageAddItem(page, (Item) itup, sz, off, false, false);
+ if (l == InvalidOffsetNumber)
+ elog(ERROR, "failed to add item to GiST index page, size %d bytes",
+ (int) sz);
+ off++;
+ }
+ }
+ else
+ {
+ /*
+ * special case: leafpage, nothing to insert, nothing to delete, then
+ * vacuum marks page
+ */
+ if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
+ GistClearTuplesDeleted(page);
+ }
if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)
@@ -315,41 +213,67 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
static void
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
{
+ gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
PageSplitRecord xlrec;
Buffer buffer;
Page page;
int i;
- int flags;
+ bool isrootsplit = false;
+ if (BlockNumberIsValid(xldata->leftchild))
+ gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
decodePageSplitRecord(&xlrec, record);
- flags = xlrec.data->origleaf ? F_LEAF : 0;
/* loop around all pages */
for (i = 0; i < xlrec.data->npage; i++)
{
NewPage *newpage = xlrec.page + i;
+ int flags;
+
+ if (newpage->header->blkno == GIST_ROOT_BLKNO)
+ {
+ Assert(i == 0);
+ isrootsplit = true;
+ }
buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
/* ok, clear buffer */
+ if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
+ flags = F_LEAF;
+ else
+ flags = 0;
GISTInitBuffer(buffer, flags);
/* and fill it */
gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
+ if (newpage->header->blkno == GIST_ROOT_BLKNO)
+ {
+ GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
+ GistPageGetOpaque(page)->nsn = xldata->orignsn;
+ GistClearFollowRight(page);
+ }
+ else
+ {
+ if (i < xlrec.data->npage - 1)
+ GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
+ else
+ GistPageGetOpaque(page)->rightlink = xldata->origrlink;
+ GistPageGetOpaque(page)->nsn = xldata->orignsn;
+ if (i < xlrec.data->npage - 1 && !isrootsplit)
+ GistMarkFollowRight(page);
+ else
+ GistClearFollowRight(page);
+ }
+
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
-
- forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
-
- pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
- NULL, 0,
- &xlrec);
}
static void
@@ -372,24 +296,6 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer);
}
-static void
-gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record)
-{
- char *begin = XLogRecGetData(record),
- *ptr;
- gistxlogInsertComplete *xlrec;
-
- xlrec = (gistxlogInsertComplete *) begin;
-
- ptr = begin + sizeof(gistxlogInsertComplete);
- while (ptr - begin < record->xl_len)
- {
- Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData));
- forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr));
- ptr += sizeof(ItemPointerData);
- }
-}
-
void
gist_redo(XLogRecPtr lsn, XLogRecord *record)
{
@@ -401,30 +307,23 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
* implement a similar optimization we have in b-tree, and remove killed
* tuples outside VACUUM, we'll need to handle that here.
*/
-
RestoreBkpBlocks(lsn, record, false);
oldCxt = MemoryContextSwitchTo(opCtx);
switch (info)
{
case XLOG_GIST_PAGE_UPDATE:
- gistRedoPageUpdateRecord(lsn, record, false);
+ gistRedoPageUpdateRecord(lsn, record);
break;
case XLOG_GIST_PAGE_DELETE:
gistRedoPageDeleteRecord(lsn, record);
break;
- case XLOG_GIST_NEW_ROOT:
- gistRedoPageUpdateRecord(lsn, record, true);
- break;
case XLOG_GIST_PAGE_SPLIT:
gistRedoPageSplitRecord(lsn, record);
break;
case XLOG_GIST_CREATE_INDEX:
gistRedoCreateIndex(lsn, record);
break;
- case XLOG_GIST_INSERT_COMPLETE:
- gistRedoCompleteInsert(lsn, record);
- break;
default:
elog(PANIC, "gist_redo: unknown op code %u", info);
}
@@ -434,20 +333,16 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
}
static void
-out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
+out_target(StringInfo buf, RelFileNode node)
{
appendStringInfo(buf, "rel %u/%u/%u",
node.spcNode, node.dbNode, node.relNode);
- if (ItemPointerIsValid(&key))
- appendStringInfo(buf, "; tid %u/%u",
- ItemPointerGetBlockNumber(&key),
- ItemPointerGetOffsetNumber(&key));
}
static void
out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
{
- out_target(buf, xlrec->node, xlrec->key);
+ out_target(buf, xlrec->node);
appendStringInfo(buf, "; block number %u", xlrec->blkno);
}
@@ -463,7 +358,7 @@ static void
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
{
appendStringInfo(buf, "page_split: ");
- out_target(buf, xlrec->node, xlrec->key);
+ out_target(buf, xlrec->node);
appendStringInfo(buf, "; block number %u splits to %d pages",
xlrec->origblkno, xlrec->npage);
}
@@ -482,10 +377,6 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec)
case XLOG_GIST_PAGE_DELETE:
out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
break;
- case XLOG_GIST_NEW_ROOT:
- appendStringInfo(buf, "new_root: ");
- out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
- break;
case XLOG_GIST_PAGE_SPLIT:
out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
break;
@@ -495,415 +386,102 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec)
((RelFileNode *) rec)->dbNode,
((RelFileNode *) rec)->relNode);
break;
- case XLOG_GIST_INSERT_COMPLETE:
- appendStringInfo(buf, "complete_insert: rel %u/%u/%u",
- ((gistxlogInsertComplete *) rec)->node.spcNode,
- ((gistxlogInsertComplete *) rec)->node.dbNode,
- ((gistxlogInsertComplete *) rec)->node.relNode);
- break;
default:
appendStringInfo(buf, "unknown gist op code %u", info);
break;
}
}
-IndexTuple
-gist_form_invalid_tuple(BlockNumber blkno)
-{
- /*
- * we don't alloc space for null's bitmap, this is invalid tuple, be
- * carefull in read and write code
- */
- Size size = IndexInfoFindDataOffset(0);
- IndexTuple tuple = (IndexTuple) palloc0(size);
-
- tuple->t_info |= size;
-
- ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
- GistTupleSetInvalid(tuple);
-
- return tuple;
-}
-
-
-static void
-gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
-{
- GISTInsertStack *top;
-
- insert->pathlen = 0;
- insert->path = NULL;
-
- if ((top = gistFindPath(index, insert->origblkno)) != NULL)
- {
- int i;
- GISTInsertStack *ptr;
-
- for (ptr = top; ptr; ptr = ptr->parent)
- insert->pathlen++;
-
- insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);
-
- i = 0;
- for (ptr = top; ptr; ptr = ptr->parent)
- insert->path[i++] = ptr->blkno;
- }
- else
- elog(ERROR, "lost parent for block %u", insert->origblkno);
-}
-
-static SplitedPageLayout *
-gistMakePageLayout(Buffer *buffers, int nbuffers)
-{
- SplitedPageLayout *res = NULL,
- *resptr;
-
- while (nbuffers-- > 0)
- {
- Page page = BufferGetPage(buffers[nbuffers]);
- IndexTuple *vec;
- int veclen;
-
- resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout));
-
- resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]);
- resptr->block.num = PageGetMaxOffsetNumber(page);
-
- vec = gistextractpage(page, &veclen);
- resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist));
-
- resptr->next = res;
- res = resptr;
- }
-
- return res;
-}
-
-/*
- * Continue insert after crash. In normal situations, there aren't any
- * incomplete inserts, but if a crash occurs partway through an insertion
- * sequence, we'll need to finish making the index valid at the end of WAL
- * replay.
- *
- * Note that we assume the index is now in a valid state, except for the
- * unfinished insertion. In particular it's safe to invoke gistFindPath();
- * there shouldn't be any garbage pages for it to run into.
- *
- * To complete insert we can't use basic insertion algorithm because
- * during insertion we can't call user-defined support functions of opclass.
- * So, we insert 'invalid' tuples without real key and do it by separate algorithm.
- * 'invalid' tuple should be updated by vacuum full.
- */
-static void
-gistContinueInsert(gistIncompleteInsert *insert)
-{
- IndexTuple *itup;
- int i,
- lenitup;
- Relation index;
-
- index = CreateFakeRelcacheEntry(insert->node);
-
- /*
- * needed vector itup never will be more than initial lenblkno+2, because
- * during this processing Indextuple can be only smaller
- */
- lenitup = insert->lenblk;
- itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ ));
-
- for (i = 0; i < insert->lenblk; i++)
- itup[i] = gist_form_invalid_tuple(insert->blkno[i]);
-
- /*
- * any insertion of itup[] should make LOG message about
- */
-
- if (insert->origblkno == GIST_ROOT_BLKNO)
- {
- /*
- * it was split root, so we should only make new root. it can't be
- * simple insert into root, we should replace all content of root.
- */
- Buffer buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true);
-
- gistnewroot(index, buffer, itup, lenitup, NULL);
- UnlockReleaseBuffer(buffer);
- }
- else
- {
- Buffer *buffers;
- Page *pages;
- int numbuffer;
- OffsetNumber *todelete;
-
- /* construct path */
- gistxlogFindPath(index, insert);
-
- Assert(insert->pathlen > 0);
-
- buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
- pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
- todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ ));
-
- for (i = 0; i < insert->pathlen; i++)
- {
- int j,
- k,
- pituplen = 0;
- uint8 xlinfo;
- XLogRecData *rdata;
- XLogRecPtr recptr;
- Buffer tempbuffer = InvalidBuffer;
- int ntodelete = 0;
-
- numbuffer = 1;
- buffers[0] = ReadBuffer(index, insert->path[i]);
- LockBuffer(buffers[0], GIST_EXCLUSIVE);
-
- /*
- * we check buffer, because we restored page earlier
- */
- gistcheckpage(index, buffers[0]);
-
- pages[0] = BufferGetPage(buffers[0]);
- Assert(!GistPageIsLeaf(pages[0]));
-
- pituplen = PageGetMaxOffsetNumber(pages[0]);
-
- /* find remove old IndexTuples to remove */
- for (j = 0; j < pituplen && ntodelete < lenitup; j++)
- {
- BlockNumber blkno;
- ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber);
- IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid);
-
- blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));
-
- for (k = 0; k < lenitup; k++)
- if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
- {
- todelete[ntodelete] = j + FirstOffsetNumber - ntodelete;
- ntodelete++;
- break;
- }
- }
-
- if (ntodelete == 0)
- elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)");
-
- /*
- * we check space with subtraction only first tuple to delete,
- * hope, that wiil be enough space....
- */
-
- if (gistnospace(pages[0], itup, lenitup, *todelete, 0))
- {
-
- /* no space left on page, so we must split */
- buffers[numbuffer] = ReadBuffer(index, P_NEW);
- LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
- GISTInitBuffer(buffers[numbuffer], 0);
- pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
- gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber);
- numbuffer++;
-
- if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
- {
- Buffer tmp;
-
- /*
- * we split root, just copy content from root to new page
- */
-
- /* sanity check */
- if (i + 1 != insert->pathlen)
- elog(PANIC, "unexpected pathlen in index \"%s\"",
- RelationGetRelationName(index));
-
- /* fill new page, root will be changed later */
- tempbuffer = ReadBuffer(index, P_NEW);
- LockBuffer(tempbuffer, GIST_EXCLUSIVE);
- memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer));
-
- /* swap buffers[0] (was root) and temp buffer */
- tmp = buffers[0];
- buffers[0] = tempbuffer;
- tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO,
- * it is still unchanged */
-
- pages[0] = BufferGetPage(buffers[0]);
- }
-
- START_CRIT_SECTION();
-
- for (j = 0; j < ntodelete; j++)
- PageIndexTupleDelete(pages[0], todelete[j]);
-
- xlinfo = XLOG_GIST_PAGE_SPLIT;
- rdata = formSplitRdata(index->rd_node, insert->path[i],
- false, &(insert->key),
- gistMakePageLayout(buffers, numbuffer));
-
- }
- else
- {
- START_CRIT_SECTION();
-
- for (j = 0; j < ntodelete; j++)
- PageIndexTupleDelete(pages[0], todelete[j]);
- gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber);
-
- xlinfo = XLOG_GIST_PAGE_UPDATE;
- rdata = formUpdateRdata(index->rd_node, buffers[0],
- todelete, ntodelete,
- itup, lenitup, &(insert->key));
- }
-
- /*
- * use insert->key as mark for completion of insert (form*Rdata()
- * above) for following possible replays
- */
-
- /* write pages, we should mark it dirty befor XLogInsert() */
- for (j = 0; j < numbuffer; j++)
- {
- GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
- MarkBufferDirty(buffers[j]);
- }
- recptr = XLogInsert(RM_GIST_ID, xlinfo, rdata);
- for (j = 0; j < numbuffer; j++)
- {
- PageSetLSN(pages[j], recptr);
- PageSetTLI(pages[j], ThisTimeLineID);
- }
-
- END_CRIT_SECTION();
-
- lenitup = numbuffer;
- for (j = 0; j < numbuffer; j++)
- {
- itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
- UnlockReleaseBuffer(buffers[j]);
- }
-
- if (tempbuffer != InvalidBuffer)
- {
- /*
- * it was a root split, so fill it by new values
- */
- gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key));
- UnlockReleaseBuffer(tempbuffer);
- }
- }
- }
-
- FreeFakeRelcacheEntry(index);
-
- ereport(LOG,
- (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
- insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
- errdetail("Incomplete insertion detected during crash replay.")));
-}
-
void
gist_xlog_startup(void)
{
- incomplete_inserts = NIL;
- insertCtx = AllocSetContextCreate(CurrentMemoryContext,
- "GiST recovery temporary context",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
opCtx = createTempGistContext();
}
void
gist_xlog_cleanup(void)
{
- ListCell *l;
- MemoryContext oldCxt;
-
- oldCxt = MemoryContextSwitchTo(opCtx);
-
- foreach(l, incomplete_inserts)
- {
- gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
-
- gistContinueInsert(insert);
- MemoryContextReset(opCtx);
- }
- MemoryContextSwitchTo(oldCxt);
-
MemoryContextDelete(opCtx);
- MemoryContextDelete(insertCtx);
-}
-
-bool
-gist_safe_restartpoint(void)
-{
- if (incomplete_inserts)
- return false;
- return true;
}
-
-XLogRecData *
-formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
- ItemPointer key, SplitedPageLayout *dist)
+/*
+ * Write WAL record of a page split.
+ */
+XLogRecPtr
+gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
+ SplitedPageLayout *dist,
+ BlockNumber origrlink, GistNSN orignsn,
+ Buffer leftchildbuf)
{
XLogRecData *rdata;
- gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
+ gistxlogPageSplit xlrec;
SplitedPageLayout *ptr;
int npage = 0,
- cur = 1;
+ cur;
+ XLogRecPtr recptr;
- ptr = dist;
- while (ptr)
- {
+ for (ptr = dist; ptr; ptr = ptr->next)
npage++;
- ptr = ptr->next;
- }
rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
- xlrec->node = node;
- xlrec->origblkno = blkno;
- xlrec->origleaf = page_is_leaf;
- xlrec->npage = (uint16) npage;
- if (key)
- xlrec->key = *key;
- else
- ItemPointerSetInvalid(&(xlrec->key));
+ xlrec.node = node;
+ xlrec.origblkno = blkno;
+ xlrec.origrlink = origrlink;
+ xlrec.orignsn = orignsn;
+ xlrec.origleaf = page_is_leaf;
+ xlrec.npage = (uint16) npage;
+ xlrec.leftchild =
+ BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) xlrec;
+ rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(gistxlogPageSplit);
- rdata[0].next = NULL;
+ rdata[0].buffer = InvalidBuffer;
- ptr = dist;
- while (ptr)
+ cur = 1;
+
+ /*
+ * Include a full page image of the child buf. (only necessary if a
+ * checkpoint happened since the child page was split)
+ */
+ if (BufferIsValid(leftchildbuf))
{
+ rdata[cur - 1].next = &(rdata[cur]);
+ rdata[cur].data = NULL;
+ rdata[cur].len = 0;
+ rdata[cur].buffer = leftchildbuf;
+ rdata[cur].buffer_std = true;
+ cur++;
+ }
+
+ for (ptr = dist; ptr; ptr = ptr->next)
+ {
+ rdata[cur - 1].next = &(rdata[cur]);
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char *) &(ptr->block);
rdata[cur].len = sizeof(gistxlogPage);
- rdata[cur - 1].next = &(rdata[cur]);
cur++;
+ rdata[cur - 1].next = &(rdata[cur]);
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char *) (ptr->list);
rdata[cur].len = ptr->lenlist;
- rdata[cur - 1].next = &(rdata[cur]);
- rdata[cur].next = NULL;
cur++;
- ptr = ptr->next;
}
+ rdata[cur - 1].next = NULL;
+
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
- return rdata;
+ pfree(rdata);
+ return recptr;
}
/*
- * Construct the rdata array for an XLOG record describing a page update
- * (deletion and/or insertion of tuples on a single index page).
+ * Write XLOG record describing a page update. The update can include any
+ * number of deletions and/or insertions of tuples on a single index page.
+ *
+ * If this update inserts a downlink for a split page, also record that
+ * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
*
* Note that both the todelete array and the tuples are marked as belonging
* to the target buffer; they need not be stored in XLOG if XLogInsert decides
@@ -911,27 +489,26 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
* at least one rdata item referencing the buffer, even when ntodelete and
* ituplen are both zero; this ensures that XLogInsert knows about the buffer.
*/
-XLogRecData *
-formUpdateRdata(RelFileNode node, Buffer buffer,
- OffsetNumber *todelete, int ntodelete,
- IndexTuple *itup, int ituplen, ItemPointer key)
+XLogRecPtr
+gistXLogUpdate(RelFileNode node, Buffer buffer,
+ OffsetNumber *todelete, int ntodelete,
+ IndexTuple *itup, int ituplen,
+ Buffer leftchildbuf)
{
XLogRecData *rdata;
gistxlogPageUpdate *xlrec;
int cur,
i;
+ XLogRecPtr recptr;
- rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
+ rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen));
xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
xlrec->node = node;
xlrec->blkno = BufferGetBlockNumber(buffer);
xlrec->ntodelete = ntodelete;
-
- if (key)
- xlrec->key = *key;
- else
- ItemPointerSetInvalid(&(xlrec->key));
+ xlrec->leftchild =
+ BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
rdata[0].buffer = buffer;
rdata[0].buffer_std = true;
@@ -945,13 +522,13 @@ formUpdateRdata(RelFileNode node, Buffer buffer,
rdata[1].next = &(rdata[2]);
rdata[2].data = (char *) todelete;
- rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
+ rdata[2].len = sizeof(OffsetNumber) * ntodelete;
rdata[2].buffer = buffer;
rdata[2].buffer_std = true;
- rdata[2].next = NULL;
- /* new tuples */
cur = 3;
+
+ /* new tuples */
for (i = 0; i < ituplen; i++)
{
rdata[cur - 1].next = &(rdata[cur]);
@@ -959,38 +536,26 @@ formUpdateRdata(RelFileNode node, Buffer buffer,
rdata[cur].len = IndexTupleSize(itup[i]);
rdata[cur].buffer = buffer;
rdata[cur].buffer_std = true;
- rdata[cur].next = NULL;
cur++;
}
- return rdata;
-}
-
-XLogRecPtr
-gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len)
-{
- gistxlogInsertComplete xlrec;
- XLogRecData rdata[2];
- XLogRecPtr recptr;
-
- Assert(len > 0);
- xlrec.node = node;
-
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = sizeof(gistxlogInsertComplete);
- rdata[0].next = &(rdata[1]);
-
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) keys;
- rdata[1].len = sizeof(ItemPointerData) * len;
- rdata[1].next = NULL;
-
- START_CRIT_SECTION();
-
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);
+ /*
+ * Include a full page image of the child buf. (only necessary if
+ * a checkpoint happened since the child page was split)
+ */
+ if (BufferIsValid(leftchildbuf))
+ {
+ rdata[cur - 1].next = &(rdata[cur]);
+ rdata[cur].data = NULL;
+ rdata[cur].len = 0;
+ rdata[cur].buffer = leftchildbuf;
+ rdata[cur].buffer_std = true;
+ cur++;
+ }
+ rdata[cur - 1].next = NULL;
- END_CRIT_SECTION();
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
+ pfree(rdata);
return recptr;
}