diff options
Diffstat (limited to 'src/backend/access/gist/gistvacuum.c')
-rw-r--r-- | src/backend/access/gist/gistvacuum.c | 483 |
1 files changed, 301 insertions, 182 deletions
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c index e81c0ebf487..9b32304d1ae 100644 --- a/src/backend/access/gist/gistvacuum.c +++ b/src/backend/access/gist/gistvacuum.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.20 2006/05/10 09:19:54 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.21 2006/05/17 16:34:59 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -47,23 +47,235 @@ typedef struct bool emptypage; } ArrayTuple; +/* + * Make union of keys on page + */ +static IndexTuple +PageMakeUnionKey(GistVacuum *gv, Buffer buffer) { + Page page = BufferGetPage( buffer ); + IndexTuple *vec, + tmp, res; + int veclen = 0; + MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx); + + vec = gistextractpage(page, &veclen); + /* we call gistunion() in temprorary context because user-defined functions called in gistunion() + may do not free all memory */ + tmp = gistunion(gv->index, vec, veclen, &(gv->giststate)); + MemoryContextSwitchTo(oldCtx); + + res = (IndexTuple) palloc(IndexTupleSize(tmp)); + memcpy(res, tmp, IndexTupleSize(tmp)); + + ItemPointerSetBlockNumber(&(res->t_tid), BufferGetBlockNumber(buffer)); + GistTupleSetValid(res); + + MemoryContextReset(gv->opCtx); + + return res; +} + +static void +gistDeleteSubtree( GistVacuum *gv, BlockNumber blkno ) { + Buffer buffer; + Page page; + + buffer = ReadBuffer(gv->index, blkno); + LockBuffer(buffer, GIST_EXCLUSIVE); + page = (Page) BufferGetPage(buffer); + + if ( !GistPageIsLeaf(page) ) { + int i; + + for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i = OffsetNumberNext(i)) { + ItemId iid = PageGetItemId(page, i); + IndexTuple idxtuple = (IndexTuple) PageGetItem(page, iid); + gistDeleteSubtree(gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid))); + } + } + + START_CRIT_SECTION(); + + MarkBufferDirty(buffer); + + page = (Page) BufferGetPage(buffer); + GistPageSetDeleted(page); + gv->result->std.pages_deleted++; + + if (!gv->index->rd_istemp) + { + XLogRecData rdata; + XLogRecPtr recptr; + gistxlogPageDelete xlrec; + + xlrec.node = gv->index->rd_node; + xlrec.blkno = blkno; + + rdata.buffer = InvalidBuffer; + rdata.data = (char *) &xlrec; + rdata.len = sizeof(gistxlogPageDelete); + rdata.next = NULL; + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, &rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + } + else + PageSetLSN(page, XLogRecPtrForTemp); + + END_CRIT_SECTION(); + + UnlockReleaseBuffer(buffer); +} + +static Page +GistPageGetCopyPage( Page page ) { + Size pageSize = PageGetPageSize( page ); + Page tmppage; + + tmppage=(Page)palloc( pageSize ); + memcpy( tmppage, page, pageSize ); + + return tmppage; +} + +static ArrayTuple +vacuumSplitPage(GistVacuum *gv, Page tempPage, Buffer buffer, IndexTuple *addon, int curlenaddon) { + ArrayTuple res = {NULL, 0, false}; + IndexTuple *vec; + SplitedPageLayout *dist = NULL, + *ptr; + int i, veclen=0; + BlockNumber blkno = BufferGetBlockNumber(buffer); + MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx); + + vec = gistextractpage(tempPage, &veclen); + vec = gistjoinvector(vec, &veclen, addon, curlenaddon); + dist = gistSplit(gv->index, tempPage, vec, veclen, &(gv->giststate)); + + MemoryContextSwitchTo(oldCtx); + + if (blkno != GIST_ROOT_BLKNO) { + /* if non-root split then we should not allocate new buffer */ + dist->buffer = buffer; + dist->page = tempPage; + /* during vacuum we never split leaf page */ + GistPageGetOpaque(dist->page)->flags = 0; + } else + pfree(tempPage); + + res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen); + res.ituplen = 0; + + /* make new pages and fills them */ + for (ptr = dist; ptr; ptr = ptr->next) { + char *data; + + if ( ptr->buffer == InvalidBuffer ) { + ptr->buffer = gistNewBuffer( gv->index ); + GISTInitBuffer( ptr->buffer, 0 ); + ptr->page = BufferGetPage(ptr->buffer); + } + ptr->block.blkno = BufferGetBlockNumber( ptr->buffer ); + + data = (char*)(ptr->list); + for(i=0;i<ptr->block.num;i++) { + if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber ) + elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index)); + data += IndexTupleSize((IndexTuple)data); + } + + ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); + res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup)); + memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) ); + res.ituplen++; + } + + START_CRIT_SECTION(); + + for (ptr = dist; ptr; ptr = ptr->next) { + MarkBufferDirty(ptr->buffer); + GistPageGetOpaque(ptr->page)->rightlink = InvalidBlockNumber; + } + + /* restore splitted non-root page */ + if (blkno != GIST_ROOT_BLKNO) { + PageRestoreTempPage( dist->page, BufferGetPage( dist->buffer ) ); + dist->page = BufferGetPage( dist->buffer ); + } + + if (!gv->index->rd_istemp) + { + XLogRecPtr recptr; + XLogRecData *rdata; + ItemPointerData key; /* set key for incomplete + * insert */ + char *xlinfo; + + ItemPointerSet(&key, blkno, TUPLE_IS_VALID); + + rdata = formSplitRdata(gv->index->rd_node, blkno, + false, &key, dist); + xlinfo = rdata->data; + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); + for (ptr = dist; ptr; ptr = ptr->next) + { + PageSetLSN(BufferGetPage(ptr->buffer), recptr); + PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID); + } + + pfree(xlinfo); + pfree(rdata); + } + else + { + for (ptr = dist; ptr; ptr = ptr->next) + PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp); + } + + for (ptr = dist; ptr; ptr = ptr->next) + { + /* we must keep the buffer pin on the head page */ + if (BufferGetBlockNumber(ptr->buffer) != blkno) + UnlockReleaseBuffer( ptr->buffer ); + } + + if (blkno == GIST_ROOT_BLKNO) + { + ItemPointerData key; /* set key for incomplete + * insert */ + + ItemPointerSet(&key, blkno, TUPLE_IS_VALID); + + gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key); + } + + END_CRIT_SECTION(); + + MemoryContextReset(gv->opCtx); + + return res; +} static ArrayTuple gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) { ArrayTuple res = {NULL, 0, false}; Buffer buffer; - Page page; + Page page, tempPage = NULL; OffsetNumber i, maxoff; ItemId iid; int lenaddon = 4, curlenaddon = 0, - ntodelete = 0; + nOffToDelete = 0, + nBlkToDelete = 0; IndexTuple idxtuple, *addon = NULL; bool needwrite = false; - OffsetNumber todelete[MaxOffsetNumber]; + OffsetNumber offToDelete[MaxOffsetNumber]; + BlockNumber blkToDelete[MaxOffsetNumber]; ItemPointerData *completed = NULL; int ncompleted = 0, lencompleted = 16; @@ -76,12 +288,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) page = (Page) BufferGetPage(buffer); maxoff = PageGetMaxOffsetNumber(page); - /* - * XXX need to reduce scope of changes to page so we can make this - * critical section less extensive - */ - START_CRIT_SECTION(); - if (GistPageIsLeaf(page)) { if (GistTuplesDeleted(page)) @@ -92,13 +298,16 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) completed = (ItemPointerData *) palloc(sizeof(ItemPointerData) * lencompleted); addon = (IndexTuple *) palloc(sizeof(IndexTuple) * lenaddon); + /* get copy of page to work */ + tempPage = GistPageGetCopyPage(page); + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { ArrayTuple chldtuple; bool needchildunion; - iid = PageGetItemId(page, i); - idxtuple = (IndexTuple) PageGetItem(page, iid); + iid = PageGetItemId(tempPage, i); + idxtuple = (IndexTuple) PageGetItem(tempPage, iid); needchildunion = (GistTupleIsInvalid(idxtuple)) ? true : false; if (needchildunion) @@ -109,14 +318,19 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) needchildunion); if (chldtuple.ituplen || chldtuple.emptypage) { - PageIndexTupleDelete(page, i); - todelete[ntodelete++] = i; + /* update tuple or/and inserts new */ + if ( chldtuple.emptypage ) + blkToDelete[nBlkToDelete++] = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); + offToDelete[nOffToDelete++] = i; + PageIndexTupleDelete(tempPage, i); i--; maxoff--; needwrite = needunion = true; if (chldtuple.ituplen) { + + Assert( chldtuple.emptypage == false ); while (curlenaddon + chldtuple.ituplen >= lenaddon) { lenaddon *= 2; @@ -150,200 +364,102 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) } } } + + Assert( maxoff == PageGetMaxOffsetNumber(tempPage) ); if (curlenaddon) { /* insert updated tuples */ - if (gistnospace(page, addon, curlenaddon, InvalidOffsetNumber)) - { + if (gistnospace(tempPage, addon, curlenaddon, InvalidOffsetNumber)) { /* there is no space on page to insert tuples */ - IndexTuple *vec; - SplitedPageLayout *dist = NULL, - *ptr; - int i, veclen=0; - MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx); - - vec = gistextractbuffer(buffer, &veclen); - vec = gistjoinvector(vec, &veclen, addon, curlenaddon); - dist = gistSplit(gv->index, page, vec, veclen, &(gv->giststate)); - - MemoryContextSwitchTo(oldCtx); - - if (blkno != GIST_ROOT_BLKNO) { - /* if non-root split then we should not allocate new buffer */ - dist->buffer = buffer; - dist->page = BufferGetPage(dist->buffer); - GistPageGetOpaque(dist->page)->flags = 0; - } - - res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen); - res.ituplen = 0; - - /* make new pages and fills them */ - for (ptr = dist; ptr; ptr = ptr->next) { - char *data; - - if ( ptr->buffer == InvalidBuffer ) { - ptr->buffer = gistNewBuffer( gv->index ); - GISTInitBuffer( ptr->buffer, 0 ); - ptr->page = BufferGetPage(ptr->buffer); - } - ptr->block.blkno = BufferGetBlockNumber( ptr->buffer ); - - data = (char*)(ptr->list); - for(i=0;i<ptr->block.num;i++) { - if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber ) - elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index)); - data += IndexTupleSize((IndexTuple)data); - } - - ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); - res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup)); - memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) ); - res.ituplen++; - - MarkBufferDirty(ptr->buffer); - } - - if (!gv->index->rd_istemp) - { - XLogRecPtr recptr; - XLogRecData *rdata; - ItemPointerData key; /* set key for incomplete - * insert */ - char *xlinfo; - - ItemPointerSet(&key, blkno, TUPLE_IS_VALID); + res = vacuumSplitPage(gv, tempPage, buffer, addon, curlenaddon); + tempPage=NULL; /* vacuumSplitPage() free tempPage */ + needwrite = needunion = false; /* gistSplit already forms unions and writes pages */ + } else + /* enough free space */ + gistfillbuffer(gv->index, tempPage, addon, curlenaddon, InvalidOffsetNumber); + } + } - rdata = formSplitRdata(gv->index->rd_node, blkno, - false, &key, dist); - xlinfo = rdata->data; + /* + * If page is empty, we should remove pointer to it before + * deleting page (except root) + */ - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); - for (ptr = dist; ptr; ptr = ptr->next) - { - PageSetLSN(BufferGetPage(ptr->buffer), recptr); - PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID); - } + if ( blkno != GIST_ROOT_BLKNO && ( PageIsEmpty(page) || (tempPage && PageIsEmpty(tempPage)) ) ) { + /* + * New version of page is empty, so leave it unchanged, + * upper call will mark our page as deleted. + * In case of page split we never will be here... + * + * If page was empty it can't become non-empty during processing + */ + res.emptypage = true; + UnlockReleaseBuffer(buffer); + } else { + /* write page and remove its childs if it need */ - pfree(xlinfo); - pfree(rdata); - } - else - { - for (ptr = dist; ptr; ptr = ptr->next) - { - PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp); - } - } + START_CRIT_SECTION(); - for (ptr = dist; ptr; ptr = ptr->next) - { - /* we must keep the buffer pin on the head page */ - if (BufferGetBlockNumber(ptr->buffer) != blkno) - UnlockReleaseBuffer( ptr->buffer ); - } + if ( tempPage && needwrite ) { + PageRestoreTempPage(tempPage, page); + tempPage = NULL; + } - if (blkno == GIST_ROOT_BLKNO) - { - ItemPointerData key; /* set key for incomplete - * insert */ + /* Empty index */ + if (PageIsEmpty(page) && blkno == GIST_ROOT_BLKNO ) + { + needwrite = true; + GistPageSetLeaf(page); + } - ItemPointerSet(&key, blkno, TUPLE_IS_VALID); + + if (needwrite) + { + MarkBufferDirty(buffer); + GistClearTuplesDeleted(page); - oldCtx = MemoryContextSwitchTo(gv->opCtx); - gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key); - MemoryContextSwitchTo(oldCtx); - } + if (!gv->index->rd_istemp) + { + XLogRecData *rdata; + XLogRecPtr recptr; + char *xlinfo; - needwrite = false; + rdata = formUpdateRdata(gv->index->rd_node, buffer, + offToDelete, nOffToDelete, + addon, curlenaddon, NULL); + xlinfo = rdata->next->data; - MemoryContextReset(gv->opCtx); + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); - needunion = false; /* gistSplit already forms unions */ + pfree(xlinfo); + pfree(rdata); } else - { - /* enough free space */ - gistfillbuffer(gv->index, page, addon, curlenaddon, InvalidOffsetNumber); - } + PageSetLSN(page, XLogRecPtrForTemp); } - } - if (needunion) - { - /* forms union for page or check empty */ - if (PageIsEmpty(page)) - { - if (blkno == GIST_ROOT_BLKNO) - { - needwrite = true; - GistPageSetLeaf(page); - } - else - { - needwrite = true; - res.emptypage = true; - GistPageSetDeleted(page); - gv->result->std.pages_deleted++; - } - } - else - { - IndexTuple *vec, - tmp; - int veclen = 0; - MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx); - - vec = gistextractbuffer(buffer, &veclen); - tmp = gistunion(gv->index, vec, veclen, &(gv->giststate)); - MemoryContextSwitchTo(oldCtx); + END_CRIT_SECTION(); + if ( needunion && !PageIsEmpty(page) ) + { res.itup = (IndexTuple *) palloc(sizeof(IndexTuple)); res.ituplen = 1; - res.itup[0] = (IndexTuple) palloc(IndexTupleSize(tmp)); - memcpy(res.itup[0], tmp, IndexTupleSize(tmp)); - - ItemPointerSetBlockNumber(&(res.itup[0]->t_tid), blkno); - GistTupleSetValid(res.itup[0]); - - MemoryContextReset(gv->opCtx); + res.itup[0] = PageMakeUnionKey(gv, buffer); } - } - if (needwrite) - { - MarkBufferDirty(buffer); - GistClearTuplesDeleted(page); - - if (!gv->index->rd_istemp) - { - XLogRecData *rdata; - XLogRecPtr recptr; - char *xlinfo; - - rdata = formUpdateRdata(gv->index->rd_node, buffer, - todelete, ntodelete, res.emptypage, - addon, curlenaddon, NULL); - xlinfo = rdata->data; + UnlockReleaseBuffer(buffer); - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); - PageSetLSN(page, recptr); - PageSetTLI(page, ThisTimeLineID); + /* delete empty children, now we havn't any links to pointed subtrees */ + for(i=0;i<nBlkToDelete;i++) + gistDeleteSubtree(gv, blkToDelete[i]); - pfree(xlinfo); - pfree(rdata); - } - else - PageSetLSN(page, XLogRecPtrForTemp); + if (ncompleted && !gv->index->rd_istemp) + gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted); } - END_CRIT_SECTION(); - - UnlockReleaseBuffer(buffer); - - if (ncompleted && !gv->index->rd_istemp) - gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted); for (i = 0; i < curlenaddon; i++) pfree(addon[i]); @@ -351,6 +467,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) pfree(addon); if (completed) pfree(completed); + if (tempPage) + pfree(tempPage); + return res; } @@ -627,10 +746,10 @@ gistbulkdelete(PG_FUNCTION_ARGS) gistxlogPageUpdate *xlinfo; rdata = formUpdateRdata(rel->rd_node, buffer, - todelete, ntodelete, false, + todelete, ntodelete, NULL, 0, NULL); - xlinfo = (gistxlogPageUpdate *) rdata->data; + xlinfo = (gistxlogPageUpdate *) rdata->next->data; recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); PageSetLSN(page, recptr); |