diff options
-rw-r--r-- | contrib/bloom/blinsert.c | 11 | ||||
-rw-r--r-- | contrib/bloom/blscan.c | 2 | ||||
-rw-r--r-- | contrib/bloom/blutils.c | 13 | ||||
-rw-r--r-- | contrib/bloom/blvacuum.c | 75 |
4 files changed, 60 insertions, 41 deletions
diff --git a/contrib/bloom/blinsert.c b/contrib/bloom/blinsert.c index 3953af996bb..78eec5c67e0 100644 --- a/contrib/bloom/blinsert.c +++ b/contrib/bloom/blinsert.c @@ -237,6 +237,13 @@ blinsert(Relation index, Datum *values, bool *isnull, state = GenericXLogStart(index); page = GenericXLogRegisterBuffer(state, buffer, 0); + /* + * We might have found a page that was recently deleted by VACUUM. If + * so, we can reuse it, but we must reinitialize it. + */ + if (PageIsNew(page) || BloomPageIsDeleted(page)) + BloomInitPage(page, 0); + if (BloomPageAddItem(&blstate, page, itup)) { /* Success! Apply the change, clean up, and exit */ @@ -295,6 +302,10 @@ blinsert(Relation index, Datum *values, bool *isnull, LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); page = GenericXLogRegisterBuffer(state, buffer, 0); + /* Basically same logic as above */ + if (PageIsNew(page) || BloomPageIsDeleted(page)) + BloomInitPage(page, 0); + if (BloomPageAddItem(&blstate, page, itup)) { /* Success! Apply the changes, clean up, and exit */ diff --git a/contrib/bloom/blscan.c b/contrib/bloom/blscan.c index 0c954dc8d55..316a906734b 100644 --- a/contrib/bloom/blscan.c +++ b/contrib/bloom/blscan.c @@ -135,7 +135,7 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm) page = BufferGetPage(buffer); TestForOldSnapshot(scan->xs_snapshot, scan->indexRelation, page); - if (!BloomPageIsDeleted(page)) + if (!PageIsNew(page) && !BloomPageIsDeleted(page)) { OffsetNumber offset, maxOffset = BloomPageGetMaxOffset(page); diff --git a/contrib/bloom/blutils.c b/contrib/bloom/blutils.c index b5007ed6bb1..debf4f46eb7 100644 --- a/contrib/bloom/blutils.c +++ b/contrib/bloom/blutils.c @@ -299,7 +299,7 @@ BloomFormTuple(BloomState *state, ItemPointer iptr, Datum *values, bool *isnull) /* * Add new bloom tuple to the page. Returns true if new tuple was successfully - * added to the page. Returns false if it doesn't fit the page. + * added to the page. Returns false if it doesn't fit on the page. */ bool BloomPageAddItem(BloomState *state, Page page, BloomTuple *tuple) @@ -308,7 +308,10 @@ BloomPageAddItem(BloomState *state, Page page, BloomTuple *tuple) BloomPageOpaque opaque; Pointer ptr; - /* Does new tuple fit the page */ + /* We shouldn't be pointed to an invalid page */ + Assert(!PageIsNew(page) && !BloomPageIsDeleted(page)); + + /* Does new tuple fit on the page? */ if (BloomPageGetFreeSpace(state, page) < state->sizeOfBloomTuple) return false; @@ -322,6 +325,9 @@ BloomPageAddItem(BloomState *state, Page page, BloomTuple *tuple) ptr = (Pointer) BloomPageGetTuple(state, page, opaque->maxoff + 1); ((PageHeader) page)->pd_lower = ptr - page; + /* Assert we didn't overrun available space */ + Assert(((PageHeader) page)->pd_lower <= ((PageHeader) page)->pd_upper); + return true; } @@ -424,6 +430,9 @@ BloomFillMetapage(Relation index, Page metaPage) metadata->magickNumber = BLOOM_MAGICK_NUMBER; metadata->opts = *opts; ((PageHeader) metaPage)->pd_lower += sizeof(BloomMetaPageData); + + /* If this fails, probably FreeBlockNumberArray size calc is wrong: */ + Assert(((PageHeader) metaPage)->pd_lower <= ((PageHeader) metaPage)->pd_upper); } /* diff --git a/contrib/bloom/blvacuum.c b/contrib/bloom/blvacuum.c index 7c355463a28..482242f1c27 100644 --- a/contrib/bloom/blvacuum.c +++ b/contrib/bloom/blvacuum.c @@ -13,6 +13,7 @@ #include "postgres.h" #include "access/genam.h" +#include "bloom.h" #include "catalog/storage.h" #include "commands/vacuum.h" #include "miscadmin.h" @@ -21,7 +22,6 @@ #include "storage/indexfsm.h" #include "storage/lmgr.h" -#include "bloom.h" /* * Bulk deletion of all index entries pointing to a set of heap tuples. @@ -42,6 +42,7 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, BloomState state; Buffer buffer; Page page; + BloomMetaPageData *metaData; GenericXLogState *gxlogState; if (stats == NULL) @@ -60,6 +61,8 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, *itupPtr, *itupEnd; + vacuum_delay_point(); + buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy); @@ -67,15 +70,18 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, gxlogState = GenericXLogStart(index); page = GenericXLogRegisterBuffer(gxlogState, buffer, 0); - if (BloomPageIsDeleted(page)) + /* Ignore empty/deleted pages until blvacuumcleanup() */ + if (PageIsNew(page) || BloomPageIsDeleted(page)) { UnlockReleaseBuffer(buffer); GenericXLogAbort(gxlogState); - CHECK_FOR_INTERRUPTS(); continue; } - /* Iterate over the tuples */ + /* + * Iterate over the tuples. itup points to current tuple being + * scanned, itupPtr points to where to save next non-deleted tuple. + */ itup = itupPtr = BloomPageGetTuple(&state, page, FirstOffsetNumber); itupEnd = BloomPageGetTuple(&state, page, OffsetNumberNext(BloomPageGetMaxOffset(page))); @@ -84,36 +90,32 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, /* Do we have to delete this tuple? */ if (callback(&itup->heapPtr, callback_state)) { - stats->tuples_removed += 1; + /* Yes; adjust count of tuples that will be left on page */ BloomPageGetOpaque(page)->maxoff--; + stats->tuples_removed += 1; } else { + /* No; copy it to itupPtr++, but skip copy if not needed */ if (itupPtr != itup) - { - /* - * If we already delete something before, we have to move - * this tuple backward. - */ memmove((Pointer) itupPtr, (Pointer) itup, state.sizeOfBloomTuple); - } - stats->num_index_tuples++; itupPtr = BloomPageGetNextTuple(&state, itupPtr); } itup = BloomPageGetNextTuple(&state, itup); } + /* Assert that we counted correctly */ Assert(itupPtr == BloomPageGetTuple(&state, page, OffsetNumberNext(BloomPageGetMaxOffset(page)))); /* - * Add page to notFullPage list if we will not mark page as deleted - * and there is a free space on it + * Add page to new notFullPage list if we will not mark page as + * deleted and there is free space on it */ if (BloomPageGetMaxOffset(page) != 0 && - BloomPageGetFreeSpace(&state, page) > state.sizeOfBloomTuple && + BloomPageGetFreeSpace(&state, page) >= state.sizeOfBloomTuple && countPage < BloomMetaBlockN) notFullPage[countPage++] = blkno; @@ -134,27 +136,26 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, GenericXLogAbort(gxlogState); } UnlockReleaseBuffer(buffer); - CHECK_FOR_INTERRUPTS(); } - if (countPage > 0) - { - BloomMetaPageData *metaData; - - buffer = ReadBuffer(index, BLOOM_METAPAGE_BLKNO); - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + /* + * Update the metapage's notFullPage list with whatever we found. Our + * info could already be out of date at this point, but blinsert() will + * cope if so. + */ + buffer = ReadBuffer(index, BLOOM_METAPAGE_BLKNO); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - gxlogState = GenericXLogStart(index); - page = GenericXLogRegisterBuffer(gxlogState, buffer, 0); + gxlogState = GenericXLogStart(index); + page = GenericXLogRegisterBuffer(gxlogState, buffer, 0); - metaData = BloomPageGetMeta(page); - memcpy(metaData->notFullPage, notFullPage, sizeof(BlockNumber) * countPage); - metaData->nStart = 0; - metaData->nEnd = countPage; + metaData = BloomPageGetMeta(page); + memcpy(metaData->notFullPage, notFullPage, sizeof(BlockNumber) * countPage); + metaData->nStart = 0; + metaData->nEnd = countPage; - GenericXLogFinish(gxlogState); - UnlockReleaseBuffer(buffer); - } + GenericXLogFinish(gxlogState); + UnlockReleaseBuffer(buffer); return stats; } @@ -170,7 +171,6 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) Relation index = info->index; BlockNumber npages, blkno; - BlockNumber totFreePages; if (info->analyze_only) return stats; @@ -183,7 +183,9 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) * statistics. */ npages = RelationGetNumberOfBlocks(index); - totFreePages = 0; + stats->num_pages = npages; + stats->pages_free = 0; + stats->num_index_tuples = 0; for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++) { Buffer buffer; @@ -196,23 +198,20 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) LockBuffer(buffer, BUFFER_LOCK_SHARE); page = (Page) BufferGetPage(buffer); - if (BloomPageIsDeleted(page)) + if (PageIsNew(page) || BloomPageIsDeleted(page)) { RecordFreeIndexPage(index, blkno); - totFreePages++; + stats->pages_free++; } else { stats->num_index_tuples += BloomPageGetMaxOffset(page); - stats->estimated_count += BloomPageGetMaxOffset(page); } UnlockReleaseBuffer(buffer); } IndexFreeSpaceMapVacuum(info->index); - stats->pages_free = totFreePages; - stats->num_pages = RelationGetNumberOfBlocks(index); return stats; } |