diff options
author | Teodor Sigaev <teodor@sigaev.ru> | 2005-06-30 17:52:14 +0000 |
---|---|---|
committer | Teodor Sigaev <teodor@sigaev.ru> | 2005-06-30 17:52:14 +0000 |
commit | 898a7bd13bb9d5ae36d2defcf3bbed3bd1d5ffd6 (patch) | |
tree | d4973dbb3c3f10bfc7becb9199f3b0b7b233c9c3 /src/backend/access/gist/gistxlog.c | |
parent | 7a30b1fb966fb14852ebe6bea35d801792f2cc61 (diff) | |
download | postgresql-898a7bd13bb9d5ae36d2defcf3bbed3bd1d5ffd6.tar.gz postgresql-898a7bd13bb9d5ae36d2defcf3bbed3bd1d5ffd6.zip |
Bug fixes for GiST crash recovery.
- add forgotten check of lsn for insert completion
- remove level of pages: hard to check in recovery
- some cleanups
Diffstat (limited to 'src/backend/access/gist/gistxlog.c')
-rw-r--r-- | src/backend/access/gist/gistxlog.c | 114 |
1 files changed, 77 insertions, 37 deletions
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 30fd5b71eeb..15acb18c80d 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.5 2005/06/28 15:51:00 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.6 2005/06/30 17:52:14 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -44,6 +44,7 @@ typedef struct { typedef struct gistIncompleteInsert { RelFileNode node; + BlockNumber origblkno; /* for splits */ ItemPointerData key; int lenblk; BlockNumber *blkno; @@ -79,6 +80,7 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, ninsert->lenblk = lenblk; ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk ); memcpy(ninsert->blkno, blkno, sizeof(BlockNumber)*ninsert->lenblk); + ninsert->origblkno = *blkno; } else { int i; @@ -87,6 +89,7 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk ); for(i=0;i<ninsert->lenblk;i++) ninsert->blkno[i] = xlinfo->page[i].header->blkno; + ninsert->origblkno = xlinfo->data->origblkno; } Assert( ninsert->lenblk>0 ); @@ -209,6 +212,7 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); + GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; LockBuffer(buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(buffer); @@ -466,81 +470,98 @@ gist_form_invalid_tuple(BlockNumber blkno) { return tuple; } +static Buffer +gistXLogReadAndLockBuffer( Relation r, BlockNumber blkno ) { + Buffer buffer = XLogReadBuffer( false, r, blkno ); + if (!BufferIsValid(buffer)) + elog(PANIC, "gistXLogReadAndLockBuffer: block %u unfound", blkno); + if ( PageIsNew( (PageHeader)(BufferGetPage(buffer)) ) ) + elog(PANIC, "gistXLogReadAndLockBuffer: uninitialized page %u", blkno); + + return buffer; +} + + static void gixtxlogFindPath( Relation index, gistIncompleteInsert *insert ) { - int i; GISTInsertStack *top; insert->pathlen = 0; insert->path = NULL; - for(i=0;insert->lenblk;i++) { - if ( (top=gistFindPath(index, insert->blkno[i], XLogReadBuffer)) != NULL ) { - GISTInsertStack *ptr=top; - while(ptr) { - insert->pathlen++; - ptr = ptr->parent; - } + if ( (top=gistFindPath(index, insert->origblkno, gistXLogReadAndLockBuffer)) != NULL ) { + int i; + GISTInsertStack *ptr=top; + while(ptr) { + insert->pathlen++; + ptr = ptr->parent; + } - insert->path=(BlockNumber*)palloc( sizeof(BlockNumber) * insert->pathlen ); + insert->path=(BlockNumber*)palloc( sizeof(BlockNumber) * insert->pathlen ); - i=0; - ptr = top; - while(ptr) { - insert->path[i] = ptr->blkno; - i++; - ptr = ptr->parent; - } - break; + i=0; + ptr = top; + while(ptr) { + insert->path[i] = ptr->blkno; + i++; + ptr = ptr->parent; } - } + } else + elog(LOG, "gixtxlogFindPath: lost parent for block %u", insert->origblkno); } static void gistContinueInsert(gistIncompleteInsert *insert) { IndexTuple *itup; int i, lenitup; - MemoryContext oldCxt; Relation index; - oldCxt = MemoryContextSwitchTo(opCtx); - index = XLogOpenRelation(insert->node); - if (!RelationIsValid(index)) + if (!RelationIsValid(index)) return; - elog(LOG,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index", - insert->node.spcNode, insert->node.dbNode, insert->node.relNode); - /* needed vector itup never will be more than initial lenblkno+2, because during this processing Indextuple can be only smaller */ lenitup = insert->lenblk; itup = (IndexTuple*)palloc(sizeof(IndexTuple)*(lenitup+2 /*guarantee root split*/)); - for(i=0;i<insert->lenblk;i++) + for(i=0;i<insert->lenblk;i++) itup[i] = gist_form_invalid_tuple( insert->blkno[i] ); - /* construct path */ - gixtxlogFindPath( index, insert ); - - if ( insert->pathlen==0 ) { - /*it was split root, so we should only make new root*/ + if ( insert->origblkno==GIST_ROOT_BLKNO ) { + /*it was split root, so we should only make new root. + it can't be simple insert into root, look at call + pushIncompleteInsert in gistRedoPageSplitRecord */ Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO); Page page; if (!BufferIsValid(buffer)) elog(PANIC, "gistContinueInsert: root block unfound"); + page = BufferGetPage(buffer); + if (XLByteLE(insert->lsn, PageGetLSN(page))) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; + } + GISTInitBuffer(buffer, 0); page = BufferGetPage(buffer); gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber); + PageSetLSN(page, insert->lsn); + PageSetTLI(page, ThisTimeLineID); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(buffer); } else { Buffer *buffers; Page *pages; int numbuffer; - + + /* construct path */ + gixtxlogFindPath( index, insert ); + + Assert( insert->pathlen > 0 ); + buffers= (Buffer*) palloc( sizeof(Buffer) * (insert->lenblk+2/*guarantee root split*/) ); pages = (Page*) palloc( sizeof(Page ) * (insert->lenblk+2/*guarantee root split*/) ); @@ -555,6 +576,12 @@ gistContinueInsert(gistIncompleteInsert *insert) { if ( PageIsNew((PageHeader)(pages[numbuffer-1])) ) elog(PANIC, "gistContinueInsert: uninitialized page"); + if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer-1]))) { + LockBuffer(buffers[numbuffer-1], BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffers[numbuffer-1]); + return; + } + pituplen = PageGetMaxOffsetNumber(pages[numbuffer-1]); /* remove old IndexTuples */ @@ -587,9 +614,10 @@ gistContinueInsert(gistIncompleteInsert *insert) { if ( BufferGetBlockNumber( buffers[0] ) == GIST_ROOT_BLKNO ) { IndexTuple *parentitup; + /* we split root, just copy tuples from old root to new page */ parentitup = gistextractbuffer(buffers[numbuffer-1], &pituplen); - /* we split root, just copy tuples from old root to new page */ + /* sanity check */ if ( i+1 != insert->pathlen ) elog(PANIC,"gistContinueInsert: can't restore index '%s'", RelationGetRelationName( index )); @@ -624,14 +652,15 @@ gistContinueInsert(gistIncompleteInsert *insert) { itup[j]=gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) ); PageSetLSN(pages[j], insert->lsn); PageSetTLI(pages[j], ThisTimeLineID); + GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber; LockBuffer(buffers[j], BUFFER_LOCK_UNLOCK); WriteBuffer( buffers[j] ); } } } - MemoryContextSwitchTo(oldCxt); - MemoryContextReset(opCtx); + elog(LOG,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index", + insert->node.spcNode, insert->node.dbNode, insert->node.relNode); } void @@ -648,11 +677,22 @@ gist_xlog_startup(void) { void gist_xlog_cleanup(void) { ListCell *l; + List *reverse=NIL; + MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx); - foreach(l, incomplete_inserts) { + /* we should call gistContinueInsert in reverse order */ + + foreach(l, incomplete_inserts) + reverse = lappend(reverse, lfirst(l)); + + MemoryContextSwitchTo(opCtx); + foreach(l, reverse) { gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l); gistContinueInsert(insert); + MemoryContextReset(opCtx); } + MemoryContextSwitchTo(oldCxt); + MemoryContextDelete(opCtx); MemoryContextDelete(insertCtx); } |