/*-------------------------------------------------------------------------
 *
 * spgvacuum.c
 *	  vacuum for SP-GiST
 *
 *
 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			src/backend/access/spgist/spgvacuum.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/spgist_private.h"
#include "access/transam.h"
#include "catalog/storage.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "storage/procarray.h"


/* local state for vacuum operations */
typedef struct spgBulkDeleteState
{
	/* Parameters passed in to spgvacuumscan */
	IndexVacuumInfo *info;
	IndexBulkDeleteResult *stats;
	IndexBulkDeleteCallback callback;
	void	   *callback_state;

	/* Additional working state */
	SpGistState spgstate;
	TransactionId OldestXmin;
	BlockNumber lastFilledBlock;
} spgBulkDeleteState;

/*
 * Vacuum a regular (non-root) leaf page
 *
 * We must delete tuples that are targeted for deletion by the VACUUM,
 * but not move any tuples that are referenced by outside links; we assume
 * those are the ones that are heads of chains.
 */
static void
vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer)
{
	Page		page = BufferGetPage(buffer);
	spgxlogVacuumLeaf xlrec;
	XLogRecData rdata[8];
	OffsetNumber toDead[MaxIndexTuplesPerPage];
	OffsetNumber toPlaceholder[MaxIndexTuplesPerPage];
	OffsetNumber moveSrc[MaxIndexTuplesPerPage];
	OffsetNumber moveDest[MaxIndexTuplesPerPage];
	OffsetNumber chainSrc[MaxIndexTuplesPerPage];
	OffsetNumber chainDest[MaxIndexTuplesPerPage];
	OffsetNumber predecessor[MaxIndexTuplesPerPage + 1];
	bool		deletable[MaxIndexTuplesPerPage + 1];
	int			nDeletable;
	OffsetNumber i,
				max = PageGetMaxOffsetNumber(page);

	memset(predecessor, 0, sizeof(predecessor));
	memset(deletable, 0, sizeof(deletable));
	nDeletable = 0;

	/* Scan page, identify tuples to delete, accumulate stats */
	for (i = FirstOffsetNumber; i <= max; i++)
	{
		SpGistLeafTuple lt;

		lt = (SpGistLeafTuple) PageGetItem(page,
										   PageGetItemId(page, i));
		if (lt->tupstate == SPGIST_LIVE)
		{
			Assert(ItemPointerIsValid(&lt->heapPtr));

			if (bds->callback(&lt->heapPtr, bds->callback_state))
			{
				bds->stats->tuples_removed += 1;
				deletable[i] = true;
				nDeletable++;
			}
			else
			{
				bds->stats->num_index_tuples += 1;
			}

			/* Form predecessor map, too */
			if (lt->nextOffset != InvalidOffsetNumber)
			{
				/* paranoia about corrupted chain links */
				if (lt->nextOffset < FirstOffsetNumber ||
					lt->nextOffset > max ||
					predecessor[lt->nextOffset] != InvalidOffsetNumber)
					elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"",
						 BufferGetBlockNumber(buffer),
						 RelationGetRelationName(index));
				predecessor[lt->nextOffset] = i;
			}
		}
		else
		{
			Assert(lt->nextOffset == InvalidOffsetNumber);
		}
	}

	if (nDeletable == 0)
		return;					/* nothing more to do */

	/*----------
	 * Figure out exactly what we have to do.  We do this separately from
	 * actually modifying the page, mainly so that we have a representation
	 * that can be dumped into WAL and then the replay code can do exactly
	 * the same thing.  The output of this step consists of six arrays
	 * describing four kinds of operations, to be performed in this order:
	 *
	 * toDead[]: tuple numbers to be replaced with DEAD tuples
	 * toPlaceholder[]: tuple numbers to be replaced with PLACEHOLDER tuples
	 * moveSrc[]: tuple numbers that need to be relocated to another offset
	 * (replacing the tuple there) and then replaced with PLACEHOLDER tuples
	 * moveDest[]: new locations for moveSrc tuples
	 * chainSrc[]: tuple numbers whose chain links (nextOffset) need updates
	 * chainDest[]: new values of nextOffset for chainSrc members
	 *
	 * It's easiest to figure out what we have to do by processing tuple
	 * chains, so we iterate over all the tuples (not just the deletable
	 * ones!) to identify chain heads, then chase down each chain and make
	 * work item entries for deletable tuples within the chain.
	 *----------
	 */
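	/*
	 * For example, consider a chain 2 -> 5 -> 9.  If tuples 2 and 5 are
	 * deletable but 9 is not, then 5 goes into toPlaceholder[], and 9 gets
	 * a moveSrc[]/moveDest[] pair relocating it to offset 2, so that the
	 * externally-referenced chain-head offset keeps pointing at a live
	 * tuple.  If instead only 5 is deletable, 5 goes into toPlaceholder[]
	 * and a chainSrc[]/chainDest[] pair re-links 2 directly to 9.
	 */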
	xlrec.nDead = xlrec.nPlaceholder = xlrec.nMove = xlrec.nChain = 0;

	for (i = FirstOffsetNumber; i <= max; i++)
	{
		SpGistLeafTuple head;
		bool		interveningDeletable;
		OffsetNumber prevLive;
		OffsetNumber j;

		head = (SpGistLeafTuple) PageGetItem(page,
											 PageGetItemId(page, i));
		if (head->tupstate != SPGIST_LIVE)
			continue;			/* can't be a chain member */
		if (predecessor[i] != 0)
			continue;			/* not a chain head */

		/* initialize ... */
		interveningDeletable = false;
		prevLive = deletable[i] ? InvalidOffsetNumber : i;

		/* scan down the chain ... */
		j = head->nextOffset;
		while (j != InvalidOffsetNumber)
		{
			SpGistLeafTuple lt;

			lt = (SpGistLeafTuple) PageGetItem(page,
											   PageGetItemId(page, j));
			if (lt->tupstate != SPGIST_LIVE)
			{
				/* all tuples in chain should be live */
				elog(ERROR, "unexpected SPGiST tuple state: %d",
					 lt->tupstate);
			}

			if (deletable[j])
			{
				/* This tuple should be replaced by a placeholder */
				toPlaceholder[xlrec.nPlaceholder] = j;
				xlrec.nPlaceholder++;
				/* previous live tuple's chain link will need an update */
				interveningDeletable = true;
			}
			else if (prevLive == InvalidOffsetNumber)
			{
				/*
				 * This is the first live tuple in the chain.  It has to move
				 * to the head position.
				 */
				moveSrc[xlrec.nMove] = j;
				moveDest[xlrec.nMove] = i;
				xlrec.nMove++;
				/* Chain updates will be applied after the move */
				prevLive = i;
				interveningDeletable = false;
			}
			else
			{
				/*
				 * Second or later live tuple.  Arrange to re-chain it to the
				 * previous live one, if there was a gap.
				 */
				if (interveningDeletable)
				{
					chainSrc[xlrec.nChain] = prevLive;
					chainDest[xlrec.nChain] = j;
					xlrec.nChain++;
				}
				prevLive = j;
				interveningDeletable = false;
			}

			j = lt->nextOffset;
		}

		if (prevLive == InvalidOffsetNumber)
		{
			/* The chain is entirely removable, so we need a DEAD tuple */
			toDead[xlrec.nDead] = i;
			xlrec.nDead++;
		}
		else if (interveningDeletable)
		{
			/* One or more deletions at end of chain, so close it off */
			chainSrc[xlrec.nChain] = prevLive;
			chainDest[xlrec.nChain] = InvalidOffsetNumber;
			xlrec.nChain++;
		}
	}
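	/*
	 * At this point every deletable tuple is accounted for exactly once:
	 * deletable chain members go into toPlaceholder[], while a deletable
	 * chain head is represented either by a toDead[] entry (whole chain
	 * removed) or by the moveSrc[]/moveDest[] pair that replaces it.  The
	 * cross-check below relies on that accounting.
	 */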
	/* sanity check ... */
	if (nDeletable != xlrec.nDead + xlrec.nPlaceholder + xlrec.nMove)
		elog(ERROR, "inconsistent counts of deletable tuples");

	/* Prepare WAL record */
	xlrec.node = index->rd_node;
	xlrec.blkno = BufferGetBlockNumber(buffer);
	STORE_STATE(&bds->spgstate, xlrec.stateSrc);

	ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
	/* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
	ACCEPT_RDATA_DATA(toDead, sizeof(OffsetNumber) * xlrec.nDead, 1);
	ACCEPT_RDATA_DATA(toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder, 2);
	ACCEPT_RDATA_DATA(moveSrc, sizeof(OffsetNumber) * xlrec.nMove, 3);
	ACCEPT_RDATA_DATA(moveDest, sizeof(OffsetNumber) * xlrec.nMove, 4);
	ACCEPT_RDATA_DATA(chainSrc, sizeof(OffsetNumber) * xlrec.nChain, 5);
	ACCEPT_RDATA_DATA(chainDest, sizeof(OffsetNumber) * xlrec.nChain, 6);
	ACCEPT_RDATA_BUFFER(buffer, 7);

	/* Do the updates */
	START_CRIT_SECTION();

	spgPageIndexMultiDelete(&bds->spgstate, page,
							toDead, xlrec.nDead,
							SPGIST_DEAD, SPGIST_DEAD,
							InvalidBlockNumber, InvalidOffsetNumber);

	spgPageIndexMultiDelete(&bds->spgstate, page,
							toPlaceholder, xlrec.nPlaceholder,
							SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
							InvalidBlockNumber, InvalidOffsetNumber);

	/*
	 * We implement the move step by swapping the item pointers of the source
	 * and target tuples, then replacing the newly-source tuples with
	 * placeholders.  This is perhaps unduly friendly with the page data
	 * representation, but it's fast and doesn't risk page overflow when a
	 * tuple to be relocated is large.
	 */
	for (i = 0; i < xlrec.nMove; i++)
	{
		ItemId		idSrc = PageGetItemId(page, moveSrc[i]);
		ItemId		idDest = PageGetItemId(page, moveDest[i]);
		ItemIdData	tmp;

		tmp = *idSrc;
		*idSrc = *idDest;
		*idDest = tmp;
	}

	spgPageIndexMultiDelete(&bds->spgstate, page,
							moveSrc, xlrec.nMove,
							SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
							InvalidBlockNumber, InvalidOffsetNumber);

	for (i = 0; i < xlrec.nChain; i++)
	{
		SpGistLeafTuple lt;

		lt = (SpGistLeafTuple) PageGetItem(page,
										   PageGetItemId(page, chainSrc[i]));
		Assert(lt->tupstate == SPGIST_LIVE);
		lt->nextOffset = chainDest[i];
	}

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF, rdata);

		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);
	}

	END_CRIT_SECTION();
}

/*
 * Vacuum the root page when it is a leaf
 *
 * On the root, we just delete any dead leaf tuples; no fancy business
 */
static void
vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer)
{
	Page		page = BufferGetPage(buffer);
	spgxlogVacuumRoot xlrec;
	XLogRecData rdata[3];
	OffsetNumber toDelete[MaxIndexTuplesPerPage];
	OffsetNumber i,
				max = PageGetMaxOffsetNumber(page);

	xlrec.nDelete = 0;

	/* Scan page, identify tuples to delete, accumulate stats */
	for (i = FirstOffsetNumber; i <= max; i++)
	{
		SpGistLeafTuple lt;

		lt = (SpGistLeafTuple) PageGetItem(page,
										   PageGetItemId(page, i));
		if (lt->tupstate == SPGIST_LIVE)
		{
			Assert(ItemPointerIsValid(&lt->heapPtr));

			if (bds->callback(&lt->heapPtr, bds->callback_state))
			{
				bds->stats->tuples_removed += 1;
				toDelete[xlrec.nDelete] = i;
				xlrec.nDelete++;
			}
			else
			{
				bds->stats->num_index_tuples += 1;
			}
		}
		else
		{
			/* all tuples on root should be live */
			elog(ERROR, "unexpected SPGiST tuple state: %d",
				 lt->tupstate);
		}
	}

	if (xlrec.nDelete == 0)
		return;					/* nothing more to do */

	/* Prepare WAL record */
	xlrec.node = index->rd_node;
	STORE_STATE(&bds->spgstate, xlrec.stateSrc);

	ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
	/* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
	ACCEPT_RDATA_DATA(toDelete, sizeof(OffsetNumber) * xlrec.nDelete, 1);
	ACCEPT_RDATA_BUFFER(buffer, 2);
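	/*
	 * Note that, unlike on a regular leaf page, we can let
	 * PageIndexMultiDelete compact the line pointer array here: tuples on
	 * the root-as-leaf page aren't chained together, and no other tuple
	 * links into this page, so nothing depends on these offset numbers
	 * remaining stable.
	 */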
	/* Do the update */
	START_CRIT_SECTION();

	/* The tuple numbers are in order, so we can use PageIndexMultiDelete */
	PageIndexMultiDelete(page, toDelete, xlrec.nDelete);

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT, rdata);

		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);
	}

	END_CRIT_SECTION();
}

/*
 * Clean up redirect and placeholder tuples on the given page
 *
 * Redirect tuples can be marked placeholder once they're old enough.
 * Placeholder tuples can be removed if it won't change the offsets of
 * non-placeholder ones.
 *
 * Unlike the routines above, this works on both leaf and inner pages.
 */
static void
vacuumRedirectAndPlaceholder(Relation index, Buffer buffer,
							 TransactionId OldestXmin)
{
	Page		page = BufferGetPage(buffer);
	SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
	OffsetNumber i,
				max = PageGetMaxOffsetNumber(page),
				firstPlaceholder = InvalidOffsetNumber;
	bool		hasNonPlaceholder = false;
	bool		hasUpdate = false;
	OffsetNumber itemToPlaceholder[MaxIndexTuplesPerPage];
	OffsetNumber itemnos[MaxIndexTuplesPerPage];
	spgxlogVacuumRedirect xlrec;
	XLogRecData rdata[3];

	xlrec.node = index->rd_node;
	xlrec.blkno = BufferGetBlockNumber(buffer);
	xlrec.nToPlaceholder = 0;

	START_CRIT_SECTION();
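	/*
	 * A redirection tuple can be recycled once the transaction that created
	 * it (saved in dt->xid) precedes OldestXmin, since by then any index
	 * scan that was in flight when the page split occurred must have
	 * completed, and only such scans could still need to follow the link.
	 */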
	/*
	 * Scan backwards to convert old redirection tuples to placeholder
	 * tuples, and identify location of last non-placeholder tuple while at
	 * it.
	 */
	for (i = max;
		 i >= FirstOffsetNumber &&
		 (opaque->nRedirection > 0 || !hasNonPlaceholder);
		 i--)
	{
		SpGistDeadTuple dt;

		dt = (SpGistDeadTuple) PageGetItem(page, PageGetItemId(page, i));

		if (dt->tupstate == SPGIST_REDIRECT &&
			TransactionIdPrecedes(dt->xid, OldestXmin))
		{
			dt->tupstate = SPGIST_PLACEHOLDER;
			Assert(opaque->nRedirection > 0);
			opaque->nRedirection--;
			opaque->nPlaceholder++;

			ItemPointerSetInvalid(&dt->pointer);

			itemToPlaceholder[xlrec.nToPlaceholder] = i;
			xlrec.nToPlaceholder++;

			hasUpdate = true;
		}

		if (dt->tupstate == SPGIST_PLACEHOLDER)
		{
			if (!hasNonPlaceholder)
				firstPlaceholder = i;
		}
		else
		{
			hasNonPlaceholder = true;
		}
	}

	/*
	 * Any placeholder tuples at the end of page can safely be removed.  We
	 * can't remove ones before the last non-placeholder, though, because we
	 * can't alter the offset numbers of non-placeholder tuples.
	 */
	if (firstPlaceholder != InvalidOffsetNumber)
	{
		/*
		 * We do not store this array to rdata because it's easy to recreate.
		 */
		for (i = firstPlaceholder; i <= max; i++)
			itemnos[i - firstPlaceholder] = i;

		i = max - firstPlaceholder + 1;
		Assert(opaque->nPlaceholder >= i);
		opaque->nPlaceholder -= i;

		/* The array is surely sorted, so can use PageIndexMultiDelete */
		PageIndexMultiDelete(page, itemnos, i);

		hasUpdate = true;
	}

	xlrec.firstPlaceholder = firstPlaceholder;

	if (hasUpdate)
		MarkBufferDirty(buffer);

	if (hasUpdate && RelationNeedsWAL(index))
	{
		XLogRecPtr	recptr;

		ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
		ACCEPT_RDATA_DATA(itemToPlaceholder,
						  sizeof(OffsetNumber) * xlrec.nToPlaceholder, 1);
		ACCEPT_RDATA_BUFFER(buffer, 2);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT, rdata);

		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);
	}

	END_CRIT_SECTION();
}

/*
 * Process one page during a bulkdelete scan
 */
static void
spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno)
{
	Relation	index = bds->info->index;
	Buffer		buffer;
	Page		page;

	/* call vacuum_delay_point while not holding any buffer lock */
	vacuum_delay_point();

	buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
								RBM_NORMAL, bds->info->strategy);
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
	page = (Page) BufferGetPage(buffer);

	if (PageIsNew(page))
	{
		/*
		 * We found an all-zero page, which could happen if the database
		 * crashed just after extending the file.  Initialize and recycle it.
		 */
		SpGistInitBuffer(buffer, 0);
		SpGistPageSetDeleted(page);
		/* We don't bother to WAL-log this action; easy to redo */
		MarkBufferDirty(buffer);
	}
	else if (SpGistPageIsDeleted(page))
	{
		/* nothing to do */
	}
	else if (SpGistPageIsLeaf(page))
	{
		if (blkno == SPGIST_HEAD_BLKNO)
		{
			vacuumLeafRoot(bds, index, buffer);
			/* no need for vacuumRedirectAndPlaceholder */
		}
		else
		{
			vacuumLeafPage(bds, index, buffer);
			vacuumRedirectAndPlaceholder(index, buffer, bds->OldestXmin);
		}
	}
	else
	{
		/* inner page */
		vacuumRedirectAndPlaceholder(index, buffer, bds->OldestXmin);
	}

	/*
	 * The root page must never be deleted, nor marked as available in FSM,
	 * because we don't want it ever returned by a search for a place to put
	 * a new tuple.  Otherwise, check for empty/deletable page, and make sure
	 * FSM knows about it.
	 */
	if (blkno != SPGIST_HEAD_BLKNO)
	{
		/* If page is now empty, mark it deleted */
		if (PageIsEmpty(page) && !SpGistPageIsDeleted(page))
		{
			SpGistPageSetDeleted(page);
			/* We don't bother to WAL-log this action; easy to redo */
			MarkBufferDirty(buffer);
		}

		if (SpGistPageIsDeleted(page))
		{
			RecordFreeIndexPage(index, blkno);
			bds->stats->pages_deleted++;
		}
		else
			bds->lastFilledBlock = blkno;
	}

	SpGistSetLastUsedPage(index, buffer);

	UnlockReleaseBuffer(buffer);
}

/*
 * Perform a bulkdelete scan
 */
static void
spgvacuumscan(spgBulkDeleteState *bds)
{
	Relation	index = bds->info->index;
	bool		needLock;
	BlockNumber num_pages,
				blkno;

	/* Finish setting up spgBulkDeleteState */
	initSpGistState(&bds->spgstate, index);
	bds->OldestXmin = GetOldestXmin(true, false);
	bds->lastFilledBlock = SPGIST_HEAD_BLKNO;

	/*
	 * Reset counts that will be incremented during the scan; needed in case
	 * of multiple scans during a single VACUUM command
	 */
	bds->stats->estimated_count = false;
	bds->stats->num_index_tuples = 0;
	bds->stats->pages_deleted = 0;

	/* We can skip locking for new or temp relations */
	needLock = !RELATION_IS_LOCAL(index);

	/*
	 * The outer loop iterates over all index pages except the metapage, in
	 * physical order (we hope the kernel will cooperate in providing
	 * read-ahead for speed).  It is critical that we visit all leaf pages,
	 * including ones added after we start the scan, else we might fail to
	 * delete some deletable tuples.  See more extensive comments about this
	 * in btvacuumscan().
	 */
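	/*
	 * The relation-extension lock taken below while fetching the length
	 * closes a race against concurrent extension: without it we might see,
	 * and wrongly recycle, a new all-zero page that its creator has not yet
	 * locked and initialized.  btvacuumscan() discusses this in detail.
	 */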
	blkno = SPGIST_HEAD_BLKNO;
	for (;;)
	{
		/* Get the current relation length */
		if (needLock)
			LockRelationForExtension(index, ExclusiveLock);
		num_pages = RelationGetNumberOfBlocks(index);
		if (needLock)
			UnlockRelationForExtension(index, ExclusiveLock);

		/* Quit if we've scanned the whole relation */
		if (blkno >= num_pages)
			break;
		/* Iterate over pages, then loop back to recheck length */
		for (; blkno < num_pages; blkno++)
		{
			spgvacuumpage(bds, blkno);
		}
	}

	/* Propagate local lastUsedPage cache to metablock */
	SpGistUpdateMetaPage(index);

	/*
	 * Truncate index if possible
	 *
	 * XXX disabled because it's unsafe due to possible concurrent inserts.
	 * We'd have to rescan the pages to make sure they're still empty, and it
	 * doesn't seem worth it.  Note that btree doesn't do this either.
	 */
#ifdef NOT_USED
	if (num_pages > bds->lastFilledBlock + 1)
	{
		BlockNumber lastBlock = num_pages - 1;

		num_pages = bds->lastFilledBlock + 1;
		RelationTruncate(index, num_pages);
		bds->stats->pages_removed += lastBlock - bds->lastFilledBlock;
		bds->stats->pages_deleted -= lastBlock - bds->lastFilledBlock;
	}
#endif
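	/*
	 * All pages we marked deleted were also reported to the FSM, so the
	 * deleted-page count doubles as the count of reusable pages.
	 */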
	/* Report final stats */
	bds->stats->num_pages = num_pages;
	bds->stats->pages_free = bds->stats->pages_deleted;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
spgbulkdelete(PG_FUNCTION_ARGS)
{
	IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
	IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
	IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
	void	   *callback_state = (void *) PG_GETARG_POINTER(3);
	spgBulkDeleteState bds;

	/* allocate stats if first time through, else re-use existing struct */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	bds.info = info;
	bds.stats = stats;
	bds.callback = callback;
	bds.callback_state = callback_state;

	spgvacuumscan(&bds);

	PG_RETURN_POINTER(stats);
}

/* Dummy callback to delete no tuples during spgvacuumcleanup */
static bool
dummy_callback(ItemPointer itemptr, void *state)
{
	return false;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
spgvacuumcleanup(PG_FUNCTION_ARGS)
{
	IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
	IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
	Relation	index = info->index;
	spgBulkDeleteState bds;

	/* No-op in ANALYZE ONLY mode */
	if (info->analyze_only)
		PG_RETURN_POINTER(stats);

	/*
	 * We don't need to scan the index if there was a preceding bulkdelete
	 * pass.  Otherwise, make a pass that won't delete any live tuples, but
	 * might still accomplish useful stuff with redirect/placeholder cleanup,
	 * and in any case will provide stats.
	 */
	if (stats == NULL)
	{
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
		bds.info = info;
		bds.stats = stats;
		bds.callback = dummy_callback;
		bds.callback_state = NULL;

		spgvacuumscan(&bds);
	}

	/* Finally, vacuum the FSM */
	IndexFreeSpaceMapVacuum(index);

	/*
	 * It's quite possible for us to be fooled by concurrent page splits into
	 * double-counting some index tuples, so disbelieve any total that
	 * exceeds the underlying heap's count ... if we know that accurately.
	 * Otherwise this might just make matters worse.
	 */
	if (!info->estimated_count)
	{
		if (stats->num_index_tuples > info->num_heap_tuples)
			stats->num_index_tuples = info->num_heap_tuples;
	}

	PG_RETURN_POINTER(stats);
}