Diffstat (limited to 'src/backend/commands')
-rw-r--r--   src/backend/commands/indexcmds.c    133
-rw-r--r--   src/backend/commands/sequence.c       4
-rw-r--r--   src/backend/commands/vacuum.c       245
-rw-r--r--   src/backend/commands/vacuumlazy.c   109
4 files changed, 413 insertions, 78 deletions
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index ebac5957bd2..943978e589a 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.165 2007/09/10 21:59:37 alvherre Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.166 2007/09/20 17:56:31 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -119,6 +119,7 @@ DefineIndex(RangeVar *heapRelation,
Oid namespaceId;
Oid tablespaceId;
Relation rel;
+ Relation indexRelation;
HeapTuple tuple;
Form_pg_am accessMethodForm;
bool amcanorder;
@@ -420,7 +421,10 @@ DefineIndex(RangeVar *heapRelation,
indexInfo->ii_Predicate = make_ands_implicit(predicate);
indexInfo->ii_PredicateState = NIL;
indexInfo->ii_Unique = unique;
+ /* In a concurrent build, mark it not-ready-for-inserts */
+ indexInfo->ii_ReadyForInserts = !concurrent;
indexInfo->ii_Concurrent = concurrent;
+ indexInfo->ii_BrokenHotChain = false;
classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
@@ -439,23 +443,38 @@ DefineIndex(RangeVar *heapRelation,
primary ? "PRIMARY KEY" : "UNIQUE",
indexRelationName, RelationGetRelationName(rel))));
- /* save lockrelid for below, then close rel */
+ /* save lockrelid and locktag for below, then close rel */
heaprelid = rel->rd_lockInfo.lockRelId;
+ SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
heap_close(rel, NoLock);
+ if (!concurrent)
+ {
+ indexRelationId =
+ index_create(relationId, indexRelationName, indexRelationId,
+ indexInfo, accessMethodId, tablespaceId, classObjectId,
+ coloptions, reloptions, primary, isconstraint,
+ allowSystemTableMods, skip_build, concurrent);
+
+ return; /* We're done, in the standard case */
+ }
+
+ /*
+ * For a concurrent build, we next insert the catalog entry and add
+ * constraints. We don't build the index just yet; we must first make
+ * the catalog entry so that the new index is visible to updating
+ * transactions. That will prevent them from making incompatible HOT
+ * updates. The new index will be marked not indisready and not
+ * indisvalid, so that no one else tries to either insert into it or use
+ * it for queries. We pass skip_build = true to prevent the build.
+ */
indexRelationId =
index_create(relationId, indexRelationName, indexRelationId,
indexInfo, accessMethodId, tablespaceId, classObjectId,
coloptions, reloptions, primary, isconstraint,
- allowSystemTableMods, skip_build, concurrent);
-
- if (!concurrent)
- return; /* We're done, in the standard case */
+ allowSystemTableMods, true, concurrent);
/*
- * Phase 2 of concurrent index build (see comments for validate_index()
- * for an overview of how this works)
- *
* We must commit our current transaction so that the index becomes
* visible; then start another. Note that all the data structures we just
* built are lost in the commit. The only data we keep past here are the
@@ -476,6 +495,9 @@ DefineIndex(RangeVar *heapRelation,
StartTransactionCommand();
/*
+ * Phase 2 of concurrent index build (see comments for validate_index()
+ * for an overview of how this works)
+ *
* Now we must wait until no running transaction could have the table open
* with the old list of indexes. To do this, inquire which xacts
* currently would conflict with ShareLock on the table -- ie, which ones
@@ -494,7 +516,91 @@ DefineIndex(RangeVar *heapRelation,
* check for that. Also, prepared xacts are not reported, which is
* fine since they certainly aren't going to do anything more.
*/
- SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
+ old_lockholders = GetLockConflicts(&heaplocktag, ShareLock);
+
+ while (VirtualTransactionIdIsValid(*old_lockholders))
+ {
+ VirtualXactLockTableWait(*old_lockholders);
+ old_lockholders++;
+ }
+
+ /*
+ * At this moment we are sure that there are no transactions with the
+ * table open for write that don't have this new index in their list of
+ * indexes. We have waited out all the existing transactions and any new
+ * transaction will have the new index in its list, but the index is still
+ * marked as "not-ready-for-inserts". The index is consulted while
+ * deciding HOT-safety though. This arrangement ensures that no new HOT
+ * chains can be created where the new tuple and the old tuple in the
+ * chain have different index keys.
+ *
+ * We now take a new snapshot, and build the index using all tuples that
+ * are visible in this snapshot. We can be sure that any HOT updates
+ * to these tuples will be compatible with the index, since any updates
+ * made by transactions that didn't know about the index are now committed
+ * or rolled back. Thus, each visible tuple is either the end of its
+ * HOT-chain or the extension of the chain is HOT-safe for this index.
+ */
+
+ /* Open and lock the parent heap relation */
+ rel = heap_openrv(heapRelation, ShareUpdateExclusiveLock);
+
+ /* And the target index relation */
+ indexRelation = index_open(indexRelationId, RowExclusiveLock);
+
+ /* Set ActiveSnapshot since functions in the indexes may need it */
+ ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
+
+ /* We have to re-build the IndexInfo struct, since it was lost in commit */
+ indexInfo = BuildIndexInfo(indexRelation);
+ Assert(!indexInfo->ii_ReadyForInserts);
+ indexInfo->ii_Concurrent = true;
+ indexInfo->ii_BrokenHotChain = false;
+
+ /* Now build the index */
+ index_build(rel, indexRelation, indexInfo, primary);
+
+ /* Close both the relations, but keep the locks */
+ heap_close(rel, NoLock);
+ index_close(indexRelation, NoLock);
+
+ /*
+ * Update the pg_index row to mark the index as ready for inserts.
+ * Once we commit this transaction, any new transactions that
+ * open the table must insert new entries into the index for insertions
+ * and non-HOT updates.
+ */
+ pg_index = heap_open(IndexRelationId, RowExclusiveLock);
+
+ indexTuple = SearchSysCacheCopy(INDEXRELID,
+ ObjectIdGetDatum(indexRelationId),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(indexTuple))
+ elog(ERROR, "cache lookup failed for index %u", indexRelationId);
+ indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
+
+ Assert(!indexForm->indisready);
+ Assert(!indexForm->indisvalid);
+
+ indexForm->indisready = true;
+
+ simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
+ CatalogUpdateIndexes(pg_index, indexTuple);
+
+ heap_close(pg_index, RowExclusiveLock);
+
+ /*
+ * Commit this transaction to make the indisready update visible.
+ */
+ CommitTransactionCommand();
+ StartTransactionCommand();
+
+ /*
+ * Phase 3 of concurrent index build
+ *
+ * We once again wait until no transaction can have the table open with
+ * the index marked as read-only for updates.
+ */
old_lockholders = GetLockConflicts(&heaplocktag, ShareLock);
while (VirtualTransactionIdIsValid(*old_lockholders))
@@ -505,7 +611,7 @@ DefineIndex(RangeVar *heapRelation,
/*
* Now take the "reference snapshot" that will be used by validate_index()
- * to filter candidate tuples. Beware! There might be still snapshots
+ * to filter candidate tuples. Beware! There might still be snapshots
* in use that treat some transaction as in-progress that our reference
* snapshot treats as committed. If such a recently-committed transaction
* deleted tuples in the table, we will not include them in the index; yet
@@ -560,7 +666,7 @@ DefineIndex(RangeVar *heapRelation,
elog(ERROR, "cache lookup failed for index %u", indexRelationId);
indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
- Assert(indexForm->indexrelid = indexRelationId);
+ Assert(indexForm->indisready);
Assert(!indexForm->indisvalid);
indexForm->indisvalid = true;
@@ -575,7 +681,8 @@ DefineIndex(RangeVar *heapRelation,
* relcache entries for the index itself, but we should also send a
* relcache inval on the parent table to force replanning of cached plans.
* Otherwise existing sessions might fail to use the new index where it
- * would be useful.
+ * would be useful. (Note that our earlier commits did not create
+ * reasons to replan; relcache flush on the index itself was sufficient.)
*/
CacheInvalidateRelcacheByRelid(heaprelid.relId);
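The wait-until-no-conflicting-transactions step appears twice in the indexcmds.c changes above (phase 2 and phase 3). As a reading aid only, here is that pattern pulled out into a hypothetical helper; the function name and wrapper are mine, but the GetLockConflicts()/VirtualXactLockTableWait() usage and the invalid-vxid-terminated result array are exactly as in the hunks above.

/*
 * Hypothetical helper (not part of this patch): wait until every transaction
 * whose lock on the heap would conflict with ShareLock has finished.
 * GetLockConflicts() returns an array terminated by an invalid
 * VirtualTransactionId, which is what DefineIndex() relies on above.
 */
static void
wait_for_old_lockholders(LOCKTAG *heaplocktag)
{
	VirtualTransactionId *old_lockholders;

	old_lockholders = GetLockConflicts(heaplocktag, ShareLock);

	while (VirtualTransactionIdIsValid(*old_lockholders))
	{
		VirtualXactLockTableWait(*old_lockholders);
		old_lockholders++;
	}
}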
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 73024a7e703..25d1e2311b6 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.145 2007/09/12 22:10:26 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.146 2007/09/20 17:56:31 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -1281,7 +1281,7 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record)
itemsz = record->xl_len - sizeof(xl_seq_rec);
itemsz = MAXALIGN(itemsz);
if (PageAddItem(page, (Item) item, itemsz,
- FirstOffsetNumber, false) == InvalidOffsetNumber)
+ FirstOffsetNumber, false, false) == InvalidOffsetNumber)
elog(PANIC, "seq_redo: failed to add item to page");
PageSetLSN(page, lsn);
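The sequence.c hunk merely tracks a bufpage API change made by this patch series: PageAddItem() now takes a second boolean. A sketch of the widened declaration as read from the call sites in this diff (the parameter names are an assumption on my part, not taken from the patch):

/*
 * Assumed prototype (cf. src/include/storage/bufpage.h of this era); only the
 * call sites in this diff are authoritative.  seq_redo() passes is_heap =
 * false because a sequence page is not a heap page, while the tuple-moving
 * code in vacuum.c passes true, presumably so that heap-specific limits such
 * as MaxHeapTuplesPerPage can be enforced.
 */
extern OffsetNumber PageAddItem(Page page, Item item, Size size,
								OffsetNumber offsetNumber,
								bool overwrite, bool is_heap);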
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index f9b9423534e..5630fc2730d 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -13,7 +13,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.358 2007/09/12 22:10:26 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.359 2007/09/20 17:56:31 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -124,10 +124,11 @@ typedef VTupleMoveData *VTupleMove;
typedef struct VRelStats
{
/* miscellaneous statistics */
- BlockNumber rel_pages;
- double rel_tuples;
- Size min_tlen;
- Size max_tlen;
+ BlockNumber rel_pages; /* pages in relation */
+ double rel_tuples; /* tuples that remain after vacuuming */
+ double rel_indexed_tuples; /* indexed tuples that remain */
+ Size min_tlen; /* min surviving tuple size */
+ Size max_tlen; /* max surviving tuple size */
bool hasindex;
/* vtlinks array for tuple chain following - sorted by new_tid */
int num_vtlinks;
@@ -1177,6 +1178,7 @@ full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
vacrelstats->rel_pages = 0;
vacrelstats->rel_tuples = 0;
+ vacrelstats->rel_indexed_tuples = 0;
vacrelstats->hasindex = false;
/* scan the heap */
@@ -1195,13 +1197,13 @@ full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
{
for (i = 0; i < nindexes; i++)
vacuum_index(&vacuum_pages, Irel[i],
- vacrelstats->rel_tuples, 0);
+ vacrelstats->rel_indexed_tuples, 0);
}
else
{
/* just scan indexes to update statistic */
for (i = 0; i < nindexes; i++)
- scan_index(Irel[i], vacrelstats->rel_tuples);
+ scan_index(Irel[i], vacrelstats->rel_indexed_tuples);
}
}
@@ -1256,6 +1258,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
BlockNumber empty_pages,
empty_end_pages;
double num_tuples,
+ num_indexed_tuples,
tups_vacuumed,
nkeep,
nunused;
@@ -1278,7 +1281,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
relname)));
empty_pages = empty_end_pages = 0;
- num_tuples = tups_vacuumed = nkeep = nunused = 0;
+ num_tuples = num_indexed_tuples = tups_vacuumed = nkeep = nunused = 0;
free_space = 0;
nblocks = RelationGetNumberOfBlocks(onerel);
@@ -1313,9 +1316,13 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
* background writer will try to write the page if it's already marked
* dirty. To ensure that invalid data doesn't get written to disk, we
* must take exclusive buffer lock wherever we potentially modify
- * pages.
+ * pages. In fact, we insist on cleanup lock so that we can safely
+ * call heap_page_prune(). (This might be overkill, since the bgwriter
+ * pays no attention to individual tuples, but on the other hand it's
+ * unlikely that the bgwriter has this particular page pinned at this
+ * instant. So violating the coding rule would buy us little anyway.)
*/
- LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+ LockBufferForCleanup(buf);
vacpage->blkno = blkno;
vacpage->offsets_used = 0;
@@ -1356,6 +1363,21 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
continue;
}
+ /*
+ * Prune all HOT-update chains in this page.
+ *
+ * We use the redirect_move option so that redirecting line pointers
+ * get collapsed out; this allows us to not worry about them below.
+ *
+ * We count tuples removed by the pruning step as removed by VACUUM.
+ */
+ tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
+ true, false);
+
+ /*
+ * Now scan the page to collect vacuumable items and check for
+ * tuples requiring freezing.
+ */
nfrozen = 0;
notup = true;
maxoff = PageGetMaxOffsetNumber(page);
@@ -1369,7 +1391,9 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
/*
* Collect un-used items too - it's possible to have indexes
- * pointing here after crash.
+ * pointing here after crash. (That's an ancient comment and
+ * is likely obsolete with WAL, but we might as well continue
+ * to check for such problems.)
*/
if (!ItemIdIsUsed(itemid))
{
@@ -1378,6 +1402,23 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
continue;
}
+ /*
+ * DEAD item pointers are to be vacuumed normally; but we don't
+ * count them in tups_vacuumed, else we'd be double-counting
+ * (at least in the common case where heap_page_prune() just
+ * freed up a non-HOT tuple).
+ */
+ if (ItemIdIsDead(itemid))
+ {
+ vacpage->offsets[vacpage->offsets_free++] = offnum;
+ continue;
+ }
+
+ /* Shouldn't have any redirected items anymore */
+ if (!ItemIdIsNormal(itemid))
+ elog(ERROR, "relation \"%s\" TID %u/%u: unexpected redirect item",
+ relname, blkno, offnum);
+
tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
tuple.t_len = ItemIdGetLength(itemid);
ItemPointerSet(&(tuple.t_self), blkno, offnum);
@@ -1410,12 +1451,45 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
}
break;
case HEAPTUPLE_DEAD:
- tupgone = true; /* we can delete the tuple */
/*
- * We need not require XMIN_COMMITTED or XMAX_COMMITTED to
- * be set, since we will remove the tuple without any
- * further examination of its hint bits.
+ * Ordinarily, DEAD tuples would have been removed by
+ * heap_page_prune(), but it's possible that the tuple
+ * state changed since heap_page_prune() looked. In
+ * particular an INSERT_IN_PROGRESS tuple could have
+ * changed to DEAD if the inserter aborted. So this
+ * cannot be considered an error condition, though it
+ * does suggest that someone released a lock early.
+ *
+ * If the tuple is HOT-updated then it must only be
+ * removed by a prune operation; so we keep it as if it
+ * were RECENTLY_DEAD, and abandon shrinking. (XXX is it
+ * worth trying to make the shrinking code smart enough
+ * to handle this? It's an unusual corner case.)
+ *
+ * DEAD heap-only tuples can safely be removed if they
+ * aren't themselves HOT-updated, although this is a bit
+ * inefficient since we'll uselessly try to remove
+ * index entries for them.
*/
+ if (HeapTupleIsHotUpdated(&tuple))
+ {
+ nkeep += 1;
+ if (do_shrinking)
+ ereport(LOG,
+ (errmsg("relation \"%s\" TID %u/%u: dead HOT-updated tuple --- cannot shrink relation",
+ relname, blkno, offnum)));
+ do_shrinking = false;
+ }
+ else
+ {
+ tupgone = true; /* we can delete the tuple */
+ /*
+ * We need not require XMIN_COMMITTED or
+ * XMAX_COMMITTED to be set, since we will remove the
+ * tuple without any further examination of its hint
+ * bits.
+ */
+ }
break;
case HEAPTUPLE_RECENTLY_DEAD:
@@ -1530,6 +1604,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
else
{
num_tuples += 1;
+ if (!HeapTupleIsHeapOnly(&tuple))
+ num_indexed_tuples += 1;
notup = false;
if (tuple.t_len < min_tlen)
min_tlen = tuple.t_len;
@@ -1549,7 +1625,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
if (tempPage != NULL)
{
/* Some tuples are removable; figure free space after removal */
- PageRepairFragmentation(tempPage, NULL);
+ PageRepairFragmentation(tempPage);
vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, tempPage);
pfree(tempPage);
do_reap = true;
@@ -1558,7 +1634,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
{
/* Just use current available space */
vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
- /* Need to reap the page if it has LP_UNUSED line pointers */
+ /* Need to reap the page if it has UNUSED or DEAD line pointers */
do_reap = (vacpage->offsets_free > 0);
}
@@ -1621,6 +1697,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
/* save stats in the rel list for use later */
vacrelstats->rel_tuples = num_tuples;
+ vacrelstats->rel_indexed_tuples = num_indexed_tuples;
vacrelstats->rel_pages = nblocks;
if (num_tuples == 0)
min_tlen = max_tlen = 0;
@@ -1720,6 +1797,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
num_fraged_pages,
vacuumed_pages;
int keep_tuples = 0;
+ int keep_indexed_tuples = 0;
PGRUsage ru0;
pg_rusage_init(&ru0);
@@ -1845,6 +1923,16 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
if (!ItemIdIsUsed(itemid))
continue;
+ if (ItemIdIsDead(itemid))
+ {
+ /* just remember it for vacuum_page() */
+ vacpage->offsets[vacpage->offsets_free++] = offnum;
+ continue;
+ }
+
+ /* Shouldn't have any redirected items now */
+ Assert(ItemIdIsNormal(itemid));
+
tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
tuple_len = tuple.t_len = ItemIdGetLength(itemid);
ItemPointerSet(&(tuple.t_self), blkno, offnum);
@@ -1906,12 +1994,28 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
if (i >= vacpage->offsets_free) /* not found */
{
vacpage->offsets[vacpage->offsets_free++] = offnum;
+ /*
+ * If this is not a heap-only tuple, there must be an
+ * index entry for this item which will be removed in
+ * the index cleanup. Decrement the keep_indexed_tuples
+ * count to remember this.
+ */
+ if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
+ keep_indexed_tuples--;
keep_tuples--;
}
}
else
{
vacpage->offsets[vacpage->offsets_free++] = offnum;
+ /*
+ * If this is not a heap-only tuple, there must be an
+ * index entry for this item which will be removed in
+ * the index cleanup. Decrement the keep_indexed_tuples
+ * count to remember this.
+ */
+ if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
+ keep_indexed_tuples--;
keep_tuples--;
}
continue;
@@ -2028,7 +2132,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
break;
}
nextItemid = PageGetItemId(nextPage, nextOffnum);
- if (!ItemIdIsUsed(nextItemid))
+ if (!ItemIdIsNormal(nextItemid))
{
ReleaseBuffer(nextBuf);
break;
@@ -2166,7 +2270,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
Pitemid = PageGetItemId(Ppage,
ItemPointerGetOffsetNumber(&(tp.t_self)));
/* this can't happen since we saw tuple earlier: */
- if (!ItemIdIsUsed(Pitemid))
+ if (!ItemIdIsNormal(Pitemid))
elog(ERROR, "parent itemid marked as unused");
PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
@@ -2268,6 +2372,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
dst_buffer, dst_page, destvacpage,
&ec, &Ctid, vtmove[ti].cleanVpd);
+ /*
+ * If the tuple we are moving is a heap-only tuple,
+ * this move will generate an additional index entry,
+ * so increment the rel_indexed_tuples count.
+ */
+ if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
+ vacrelstats->rel_indexed_tuples++;
+
num_moved++;
if (destvacpage->blkno > last_move_dest_block)
last_move_dest_block = destvacpage->blkno;
@@ -2280,7 +2392,31 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
vacpage->offsets[vacpage->offsets_free++] =
ItemPointerGetOffsetNumber(&(tuple.t_self));
else
+ {
+ /*
+ * When we move tuple chains, we may need to move
+ * tuples from a block that we haven't yet scanned in
+ * the outer walk-along-the-relation loop. Note that we
+ * can't be moving a tuple from a block that we have
+ * already scanned because if such a tuple exists, then
+ * we must have moved the chain along with that tuple
+ * when we scanned that block. IOW the test of
+ * (Cbuf != buf) guarantees that the tuple we are
+ * looking at right now is in a block which is yet to
+ * be scanned.
+ *
+ * We maintain two counters to correctly count the
+ * moved-off tuples from blocks that are not yet
+ * scanned (keep_tuples) and how many of them have
+ * index pointers (keep_indexed_tuples). The main
+ * reason to track the latter is to help verify
+ * that indexes have the expected number of entries
+ * when all the dust settles.
+ */
+ if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
+ keep_indexed_tuples++;
keep_tuples++;
+ }
ReleaseBuffer(dst_buffer);
ReleaseBuffer(Cbuf);
@@ -2328,6 +2464,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
move_plain_tuple(onerel, buf, page, &tuple,
dst_buffer, dst_page, dst_vacpage, &ec);
+ /*
+ * If the tuple we are moving is a heap-only tuple,
+ * this move will generate an additional index entry,
+ * so increment the rel_indexed_tuples count.
+ */
+ if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
+ vacrelstats->rel_indexed_tuples++;
+
num_moved++;
if (dst_vacpage->blkno > last_move_dest_block)
last_move_dest_block = dst_vacpage->blkno;
@@ -2361,6 +2505,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
if (!ItemIdIsUsed(itemid))
continue;
+ /* Shouldn't be any DEAD or REDIRECT items anymore */
+ Assert(ItemIdIsNormal(itemid));
+
htup = (HeapTupleHeader) PageGetItem(page, itemid);
if (htup->t_infomask & HEAP_XMIN_COMMITTED)
continue;
@@ -2389,6 +2536,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
{
vacpage->offsets[vacpage->offsets_free++] = off;
Assert(keep_tuples > 0);
+ /*
+ * If this is not a heap-only tuple, there must be an
+ * index entry for this item which will be removed in
+ * the index cleanup. Decrement the keep_indexed_tuples
+ * count to remember this.
+ */
+ if (!HeapTupleHeaderIsHeapOnly(htup))
+ keep_indexed_tuples--;
keep_tuples--;
}
}
@@ -2396,6 +2551,8 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
{
vacpage->offsets[vacpage->offsets_free++] = off;
Assert(keep_tuples > 0);
+ if (!HeapTupleHeaderIsHeapOnly(htup))
+ keep_indexed_tuples--;
keep_tuples--;
}
}
@@ -2529,11 +2686,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
* page during chain moves but not been scanned over subsequently.
* The tuple ids of these tuples are not recorded as free offsets
* for any VacPage, so they will not be cleared from the indexes.
+ * keep_indexed_tuples is the portion of these that are expected
+ * to have index entries.
*/
Assert(keep_tuples >= 0);
for (i = 0; i < nindexes; i++)
vacuum_index(&Nvacpagelist, Irel[i],
- vacrelstats->rel_tuples, keep_tuples);
+ vacrelstats->rel_indexed_tuples,
+ keep_indexed_tuples);
}
/*
@@ -2551,7 +2711,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
OffsetNumber unused[MaxOffsetNumber];
OffsetNumber offnum,
maxoff;
- int uncnt;
+ int uncnt = 0;
int num_tuples = 0;
buf = ReadBufferWithStrategy(onerel, vacpage->blkno, vac_strategy);
@@ -2567,6 +2727,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
if (!ItemIdIsUsed(itemid))
continue;
+ /* Shouldn't be any DEAD or REDIRECT items anymore */
+ Assert(ItemIdIsNormal(itemid));
+
htup = (HeapTupleHeader) PageGetItem(page, itemid);
if (htup->t_infomask & HEAP_XMIN_COMMITTED)
continue;
@@ -2584,12 +2747,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
ItemIdSetUnused(itemid);
num_tuples++;
+
+ unused[uncnt++] = offnum;
}
Assert(vacpage->offsets_free == num_tuples);
START_CRIT_SECTION();
- uncnt = PageRepairFragmentation(page, unused);
+ PageRepairFragmentation(page);
MarkBufferDirty(buf);
@@ -2598,7 +2763,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
{
XLogRecPtr recptr;
- recptr = log_heap_clean(onerel, buf, unused, uncnt);
+ recptr = log_heap_clean(onerel, buf,
+ NULL, 0, NULL, 0,
+ unused, uncnt,
+ false);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
}
@@ -2706,15 +2874,17 @@ move_chain_tuple(Relation rel,
/*
* Update the state of the copied tuple, and store it on the destination
- * page.
+ * page. The copied tuple is never part of a HOT chain.
*/
newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
HEAP_XMIN_INVALID |
HEAP_MOVED_OFF);
newtup.t_data->t_infomask |= HEAP_MOVED_IN;
+ HeapTupleHeaderClearHotUpdated(newtup.t_data);
+ HeapTupleHeaderClearHeapOnly(newtup.t_data);
HeapTupleHeaderSetXvac(newtup.t_data, myXID);
newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
- InvalidOffsetNumber, false);
+ InvalidOffsetNumber, false, true);
if (newoff == InvalidOffsetNumber)
elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
(unsigned long) tuple_len, dst_vacpage->blkno);
@@ -2809,17 +2979,19 @@ move_plain_tuple(Relation rel,
START_CRIT_SECTION();
/*
- * Mark new tuple as MOVED_IN by me.
+ * Mark new tuple as MOVED_IN by me; also mark it not HOT.
*/
newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
HEAP_XMIN_INVALID |
HEAP_MOVED_OFF);
newtup.t_data->t_infomask |= HEAP_MOVED_IN;
+ HeapTupleHeaderClearHotUpdated(newtup.t_data);
+ HeapTupleHeaderClearHeapOnly(newtup.t_data);
HeapTupleHeaderSetXvac(newtup.t_data, myXID);
/* add tuple to the page */
newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
- InvalidOffsetNumber, false);
+ InvalidOffsetNumber, false, true);
if (newoff == InvalidOffsetNumber)
elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
(unsigned long) tuple_len,
@@ -2934,6 +3106,9 @@ update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
if (!ItemIdIsUsed(itemid))
continue;
+ /* Shouldn't be any DEAD or REDIRECT items anymore */
+ Assert(ItemIdIsNormal(itemid));
+
htup = (HeapTupleHeader) PageGetItem(page, itemid);
if (htup->t_infomask & HEAP_XMIN_COMMITTED)
continue;
@@ -3019,10 +3194,7 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
static void
vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
{
- OffsetNumber unused[MaxOffsetNumber];
- int uncnt;
Page page = BufferGetPage(buffer);
- ItemId itemid;
int i;
/* There shouldn't be any tuples moved onto the page yet! */
@@ -3032,11 +3204,12 @@ vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
for (i = 0; i < vacpage->offsets_free; i++)
{
- itemid = PageGetItemId(page, vacpage->offsets[i]);
+ ItemId itemid = PageGetItemId(page, vacpage->offsets[i]);
+
ItemIdSetUnused(itemid);
}
- uncnt = PageRepairFragmentation(page, unused);
+ PageRepairFragmentation(page);
MarkBufferDirty(buffer);
@@ -3045,7 +3218,10 @@ vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
{
XLogRecPtr recptr;
- recptr = log_heap_clean(onerel, buffer, unused, uncnt);
+ recptr = log_heap_clean(onerel, buffer,
+ NULL, 0, NULL, 0,
+ vacpage->offsets, vacpage->offsets_free,
+ false);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
}
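Both vacuum.c call sites above, and the lazy VACUUM one in vacuumlazy.c further down, switch to a wider log_heap_clean(). The shape inferred from those calls, with parameter names as my assumption:

/*
 * Assumed prototype.  Old-style VACUUM FULL only ever reports "now unused"
 * line pointers, so it passes NULL/0 for the redirected and now-dead lists
 * and false for redirect_move; the page-pruning code is what exercises the
 * other arguments.
 */
extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
								 OffsetNumber *redirected, int nredirected,
								 OffsetNumber *nowdead, int ndead,
								 OffsetNumber *nowunused, int nunused,
								 bool redirect_move);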
@@ -3527,8 +3703,7 @@ enough_space(VacPage vacpage, Size len)
static Size
PageGetFreeSpaceWithFillFactor(Relation relation, Page page)
{
- PageHeader pd = (PageHeader) page;
- Size freespace = pd->pd_upper - pd->pd_lower;
+ Size freespace = PageGetHeapFreeSpace(page);
Size targetfree;
targetfree = RelationGetTargetPageFreeSpace(relation,
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 3faf172acbf..b9050719cb4 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -36,7 +36,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/vacuumlazy.c,v 1.96 2007/09/16 02:37:46 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/vacuumlazy.c,v 1.97 2007/09/20 17:56:31 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -326,8 +326,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
- /* Initially, we only need shared access to the buffer */
- LockBuffer(buf, BUFFER_LOCK_SHARE);
+ /* We need buffer cleanup lock so that we can prune HOT chains. */
+ LockBufferForCleanup(buf);
page = BufferGetPage(buf);
@@ -341,11 +341,10 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
* We have to be careful here because we could be looking at a
* page that someone has just added to the relation and not yet
* been able to initialize (see RelationGetBufferForTuple). To
- * interlock against that, release the buffer read lock (which we
- * must do anyway) and grab the relation extension lock before
- * re-locking in exclusive mode. If the page is still
- * uninitialized by then, it must be left over from a crashed
- * backend, and we can initialize it.
+ * protect against that, release the buffer lock, grab the
+ * relation extension lock momentarily, and re-lock the buffer.
+ * If the page is still uninitialized by then, it must be left
+ * over from a crashed backend, and we can initialize it.
*
* We don't really need the relation lock when this is a new or
* temp relation, but it's probably not worth the code space to
@@ -357,7 +356,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
LockRelationForExtension(onerel, ExclusiveLock);
UnlockRelationForExtension(onerel, ExclusiveLock);
- LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+ LockBufferForCleanup(buf);
if (PageIsNew(page))
{
ereport(WARNING,
@@ -366,7 +365,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
PageInit(page, BufferGetPageSize(buf), 0);
empty_pages++;
lazy_record_free_space(vacrelstats, blkno,
- PageGetFreeSpace(page));
+ PageGetHeapFreeSpace(page));
}
MarkBufferDirty(buf);
UnlockReleaseBuffer(buf);
@@ -377,11 +376,23 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
{
empty_pages++;
lazy_record_free_space(vacrelstats, blkno,
- PageGetFreeSpace(page));
+ PageGetHeapFreeSpace(page));
UnlockReleaseBuffer(buf);
continue;
}
+ /*
+ * Prune all HOT-update chains in this page.
+ *
+ * We count tuples removed by the pruning step as removed by VACUUM.
+ */
+ tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
+ false, false);
+
+ /*
+ * Now scan the page to collect vacuumable items and check for
+ * tuples requiring freezing.
+ */
nfrozen = 0;
hastup = false;
prev_dead_count = vacrelstats->num_dead_tuples;
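The hunk above gives lazy VACUUM the same pruning call that scan_heap() gained in vacuum.c. From the two call sites, the function appears to have roughly the following shape; parameter names are my guess, and the only behavior the callers depend on is the returned count of removed tuples (added to tups_vacuumed) and the requirement for a buffer cleanup lock (hence LockBufferForCleanup above):

/*
 * Assumed prototype.  VACUUM FULL passes redirect_move = true so that
 * redirect line pointers are collapsed out before the shrinking logic runs;
 * lazy VACUUM leaves redirects in place and passes false.  The final flag
 * looks like a "report stats" switch; both VACUUM callers pass false.
 */
extern int heap_page_prune(Relation relation, Buffer buffer,
						   TransactionId OldestXmin,
						   bool redirect_move, bool report_stats);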
@@ -394,22 +405,64 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
itemid = PageGetItemId(page, offnum);
+ /* Unused items require no processing, but we count 'em */
if (!ItemIdIsUsed(itemid))
{
nunused += 1;
continue;
}
+ /* Redirect items mustn't be touched */
+ if (ItemIdIsRedirected(itemid))
+ {
+ hastup = true; /* this page won't be truncatable */
+ continue;
+ }
+
+ ItemPointerSet(&(tuple.t_self), blkno, offnum);
+
+ /*
+ * DEAD item pointers are to be vacuumed normally; but we don't
+ * count them in tups_vacuumed, else we'd be double-counting
+ * (at least in the common case where heap_page_prune() just
+ * freed up a non-HOT tuple).
+ */
+ if (ItemIdIsDead(itemid))
+ {
+ lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+ continue;
+ }
+
+ Assert(ItemIdIsNormal(itemid));
+
tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
tuple.t_len = ItemIdGetLength(itemid);
- ItemPointerSet(&(tuple.t_self), blkno, offnum);
tupgone = false;
switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
{
case HEAPTUPLE_DEAD:
- tupgone = true; /* we can delete the tuple */
+ /*
+ * Ordinarily, DEAD tuples would have been removed by
+ * heap_page_prune(), but it's possible that the tuple
+ * state changed since heap_page_prune() looked. In
+ * particular an INSERT_IN_PROGRESS tuple could have
+ * changed to DEAD if the inserter aborted. So this
+ * cannot be considered an error condition.
+ *
+ * If the tuple is HOT-updated then it must only be
+ * removed by a prune operation; so we keep it just as
+ * if it were RECENTLY_DEAD. Also, if it's a heap-only
+ * tuple, we choose to keep it, because it'll be a
+ * lot cheaper to get rid of it in the next pruning pass
+ * than to treat it like an indexed tuple.
+ */
+ if (HeapTupleIsHotUpdated(&tuple) ||
+ HeapTupleIsHeapOnly(&tuple))
+ nkeep += 1;
+ else
+ tupgone = true; /* we can delete the tuple */
break;
case HEAPTUPLE_LIVE:
/* Tuple is good --- but let's do some validity checks */
@@ -449,11 +502,10 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
/*
* Each non-removable tuple must be checked to see if it
- * needs freezing. If we already froze anything, then
- * we've already switched the buffer lock to exclusive.
+ * needs freezing. Note we already have exclusive buffer lock.
*/
if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
- (nfrozen > 0) ? InvalidBuffer : buf))
+ InvalidBuffer))
frozen[nfrozen++] = offnum;
}
} /* scan along page */
@@ -485,9 +537,6 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
if (nindexes == 0 &&
vacrelstats->num_dead_tuples > 0)
{
- /* Trade in buffer share lock for super-exclusive lock */
- LockBuffer(buf, BUFFER_LOCK_UNLOCK);
- LockBufferForCleanup(buf);
/* Remove tuples from heap */
lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats);
/* Forget the now-vacuumed tuples, and press on */
@@ -505,7 +554,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
if (vacrelstats->num_dead_tuples == prev_dead_count)
{
lazy_record_free_space(vacrelstats, blkno,
- PageGetFreeSpace(page));
+ PageGetHeapFreeSpace(page));
}
/* Remember the location of the last page with nonremovable tuples */
@@ -598,7 +647,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
/* Now that we've compacted the page, record its available space */
page = BufferGetPage(buf);
lazy_record_free_space(vacrelstats, tblk,
- PageGetFreeSpace(page));
+ PageGetHeapFreeSpace(page));
UnlockReleaseBuffer(buf);
npages++;
}
@@ -615,7 +664,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
* lazy_vacuum_page() -- free dead tuples on a page
* and repair its fragmentation.
*
- * Caller must hold pin and lock on the buffer.
+ * Caller must hold pin and buffer cleanup lock on the buffer.
*
* tupindex is the index in vacrelstats->dead_tuples of the first dead
* tuple for this page. We assume the rest follow sequentially.
@@ -625,10 +674,9 @@ static int
lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
int tupindex, LVRelStats *vacrelstats)
{
- OffsetNumber unused[MaxOffsetNumber];
- int uncnt;
Page page = BufferGetPage(buffer);
- ItemId itemid;
+ OffsetNumber unused[MaxOffsetNumber];
+ int uncnt = 0;
START_CRIT_SECTION();
@@ -636,6 +684,7 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
{
BlockNumber tblk;
OffsetNumber toff;
+ ItemId itemid;
tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
if (tblk != blkno)
@@ -643,9 +692,10 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
itemid = PageGetItemId(page, toff);
ItemIdSetUnused(itemid);
+ unused[uncnt++] = toff;
}
- uncnt = PageRepairFragmentation(page, unused);
+ PageRepairFragmentation(page);
MarkBufferDirty(buffer);
@@ -654,7 +704,10 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
{
XLogRecPtr recptr;
- recptr = log_heap_clean(onerel, buffer, unused, uncnt);
+ recptr = log_heap_clean(onerel, buffer,
+ NULL, 0, NULL, 0,
+ unused, uncnt,
+ false);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
}
@@ -980,7 +1033,7 @@ lazy_record_dead_tuple(LVRelStats *vacrelstats,
/*
* The array shouldn't overflow under normal behavior, but perhaps it
* could if we are given a really small maintenance_work_mem. In that
- * case, just forget the last few tuples.
+ * case, just forget the last few tuples (we'll get 'em next time).
*/
if (vacrelstats->num_dead_tuples < vacrelstats->max_dead_tuples)
{