diff options
Diffstat (limited to 'src/backend/access/gin/ginbtree.c')
-rw-r--r-- | src/backend/access/gin/ginbtree.c | 53 |
1 files changed, 42 insertions, 11 deletions
diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c index f032faa22ec..010812dc6d3 100644 --- a/src/backend/access/gin/ginbtree.c +++ b/src/backend/access/gin/ginbtree.c @@ -112,10 +112,8 @@ ginFindLeafPage(GinBtree btree, GinBtreeStack *stack) /* rightmost page */ break; + stack->buffer = ginStepRight(stack->buffer, btree->index, access); stack->blkno = rightlink; - LockBuffer(stack->buffer, GIN_UNLOCK); - stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno); - LockBuffer(stack->buffer, access); page = BufferGetPage(stack->buffer); } @@ -148,6 +146,41 @@ ginFindLeafPage(GinBtree btree, GinBtreeStack *stack) } } +/* + * Step right from current page. + * + * The next page is locked first, before releasing the current page. This is + * crucial to protect from concurrent page deletion (see comment in + * ginDeletePage). + */ +Buffer +ginStepRight(Buffer buffer, Relation index, int lockmode) +{ + Buffer nextbuffer; + Page page = BufferGetPage(buffer); + bool isLeaf = GinPageIsLeaf(page); + bool isData = GinPageIsData(page); + BlockNumber blkno = GinPageGetOpaque(page)->rightlink; + + nextbuffer = ReadBuffer(index, blkno); + LockBuffer(nextbuffer, lockmode); + UnlockReleaseBuffer(buffer); + + /* Sanity check that the page we stepped to is of similar kind. */ + page = BufferGetPage(nextbuffer); + if (isLeaf != GinPageIsLeaf(page) || isData != GinPageIsData(page)) + elog(ERROR, "right sibling of GIN page is of different type"); + + /* + * Given the proper lock sequence above, we should never land on a + * deleted page. + */ + if (GinPageIsDeleted(page)) + elog(ERROR, "right sibling of GIN page was deleted"); + + return nextbuffer; +} + void freeGinBtreeStack(GinBtreeStack *stack) { @@ -235,12 +268,12 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack, while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber) { blkno = GinPageGetOpaque(page)->rightlink; - LockBuffer(buffer, GIN_UNLOCK); - ReleaseBuffer(buffer); if (blkno == InvalidBlockNumber) + { + UnlockReleaseBuffer(buffer); break; - buffer = ReadBuffer(btree->index, blkno); - LockBuffer(buffer, GIN_EXCLUSIVE); + } + buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE); page = BufferGetPage(buffer); } @@ -444,23 +477,21 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats) { BlockNumber rightlink = GinPageGetOpaque(page)->rightlink; - LockBuffer(parent->buffer, GIN_UNLOCK); - if (rightlink == InvalidBlockNumber) { /* * rightmost page, but we don't find parent, we should use * plain search... */ + LockBuffer(parent->buffer, GIN_UNLOCK); ginFindParents(btree, stack, rootBlkno); parent = stack->parent; Assert(parent != NULL); break; } + parent->buffer = ginStepRight(parent->buffer, btree->index, GIN_EXCLUSIVE); parent->blkno = rightlink; - parent->buffer = ReleaseAndReadBuffer(parent->buffer, btree->index, parent->blkno); - LockBuffer(parent->buffer, GIN_EXCLUSIVE); page = BufferGetPage(parent->buffer); } |