aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/cluster.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/cluster.c')
-rw-r--r--src/backend/commands/cluster.c306
1 files changed, 242 insertions, 64 deletions
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 837390744fb..ff3ac9f8e55 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -15,7 +15,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.83 2002/07/12 18:43:15 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.84 2002/08/10 20:43:46 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -27,45 +27,74 @@
#include "catalog/dependency.h"
#include "catalog/heap.h"
#include "catalog/index.h"
+#include "catalog/indexing.h"
+#include "catalog/catname.h"
#include "catalog/pg_index.h"
#include "catalog/pg_proc.h"
#include "commands/cluster.h"
#include "commands/tablecmds.h"
#include "miscadmin.h"
#include "utils/builtins.h"
+#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
+#include "utils/relcache.h"
+/*
+ * We need one of these structs for each index in the relation to be
+ * clustered. It's basically the data needed by index_create() so
+ * we can recreate the indexes after destroying the old heap.
+ */
+typedef struct
+{
+ char *indexName;
+ IndexInfo *indexInfo;
+ Oid accessMethodOID;
+ Oid *classOID;
+ Oid indexOID;
+ bool isPrimary;
+} IndexAttrs;
static Oid copy_heap(Oid OIDOldHeap, const char *NewName);
-static Oid copy_index(Oid OIDOldIndex, Oid OIDNewHeap,
- const char *NewIndexName);
static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
+static List *get_indexattr_list (Oid OIDOldHeap);
+static void recreate_indexattr(Oid OIDOldHeap, List *indexes);
+static void swap_relfilenodes(Oid r1, Oid r2);
+
+Relation RelationIdGetRelation(Oid relationId);
/*
* cluster
*
- * STILL TO DO:
- * Create a list of all the other indexes on this relation. Because
- * the cluster will wreck all the tids, I'll need to destroy bogus
- * indexes. The user will have to re-create them. Not nice, but
- * I'm not a nice guy. The alternative is to try some kind of post
- * destroy re-build. This may be possible. I'll check out what the
- * index create functiond want in the way of paramaters. On the other
- * hand, re-creating n indexes may blow out the space.
+ * This clusters the table by creating a new, clustered table and
+ * swapping the relfilenodes of the new table and the old table, so
+ * the OID of the original table is preserved. Thus we do not lose
+ * GRANT, inheritance nor references to this table (this was a bug
+ * in releases thru 7.3)
+ *
+ * Also create new indexes and swap the filenodes with the old indexes
+ * the same way we do for the relation.
+ *
+ * TODO:
+ * maybe we can get away with AccessShareLock for the table.
+ * Concurrency would be much improved. Only acquire
+ * AccessExclusiveLock right before swapping the filenodes.
+ * This would allow users to CLUSTER on a regular basis,
+ * practically eliminating the need for auto-clustered indexes.
+ *
+ * Preserve constraint bit for the indexes.
*/
void
cluster(RangeVar *oldrelation, char *oldindexname)
{
Oid OIDOldHeap,
OIDOldIndex,
- OIDNewHeap,
- OIDNewIndex;
+ OIDNewHeap;
Relation OldHeap,
OldIndex;
char NewHeapName[NAMEDATALEN];
- char NewIndexName[NAMEDATALEN];
ObjectAddress object;
+ List *indexes;
/*
* We grab exclusive access to the target rel and index for the
@@ -96,6 +125,9 @@ cluster(RangeVar *oldrelation, char *oldindexname)
heap_close(OldHeap, NoLock);
index_close(OldIndex);
+ /* Save the information of all indexes on the relation. */
+ indexes = get_indexattr_list(OIDOldHeap);
+
/*
* Create the new heap with a temporary name.
*/
@@ -113,29 +145,27 @@ cluster(RangeVar *oldrelation, char *oldindexname)
/* To make the new heap's data visible. */
CommandCounterIncrement();
- /* Create new index over the tuples of the new heap. */
- snprintf(NewIndexName, NAMEDATALEN, "temp_%u", OIDOldIndex);
-
- OIDNewIndex = copy_index(OIDOldIndex, OIDNewHeap, NewIndexName);
+ /* Swap the relfilenodes of the old and new heaps. */
+ swap_relfilenodes(OIDNewHeap, OIDOldHeap);
CommandCounterIncrement();
- /* Destroy old heap (along with its index) and rename new. */
+ /* Destroy new heap with old filenode */
object.classId = RelOid_pg_class;
- object.objectId = OIDOldHeap;
+ object.objectId = OIDNewHeap;
object.objectSubId = 0;
- /* XXX better to use DROP_CASCADE here? */
+ /* The relation is local to our transaction and we know nothin
+ * depends on it, so DROP_RESTRICT should be OK.
+ */
performDeletion(&object, DROP_RESTRICT);
/* performDeletion does CommandCounterIncrement at end */
- renamerel(OIDNewHeap, oldrelation->relname);
-
- /* This one might be unnecessary, but let's be safe. */
- CommandCounterIncrement();
-
- renamerel(OIDNewIndex, oldindexname);
+ /* Recreate the indexes on the relation. We do not need
+ * CommandCounterIncrement() because recreate_indexattr does it.
+ */
+ recreate_indexattr(OIDOldHeap, indexes);
}
static Oid
@@ -181,43 +211,6 @@ copy_heap(Oid OIDOldHeap, const char *NewName)
return OIDNewHeap;
}
-static Oid
-copy_index(Oid OIDOldIndex, Oid OIDNewHeap, const char *NewIndexName)
-{
- Oid OIDNewIndex;
- Relation OldIndex,
- NewHeap;
- IndexInfo *indexInfo;
-
- NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock);
- OldIndex = index_open(OIDOldIndex);
-
- /*
- * Create a new index like the old one. To do this I get the info
- * from pg_index, and add a new index with a temporary name (that will
- * be changed later).
- */
- indexInfo = BuildIndexInfo(OldIndex->rd_index);
-
- OIDNewIndex = index_create(OIDNewHeap,
- NewIndexName,
- indexInfo,
- OldIndex->rd_rel->relam,
- OldIndex->rd_index->indclass,
- OldIndex->rd_index->indisprimary,
- false, /* XXX losing constraint status */
- allowSystemTableMods);
-
- setRelhasindex(OIDNewHeap, true,
- OldIndex->rd_index->indisprimary, InvalidOid);
-
- index_close(OldIndex);
- heap_close(NewHeap, NoLock);
-
- return OIDNewIndex;
-}
-
-
static void
rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
{
@@ -261,3 +254,188 @@ rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
heap_close(LocalOldHeap, NoLock);
heap_close(LocalNewHeap, NoLock);
}
+
+/* Get the necessary info about the indexes in the relation and
+ * return a List of IndexAttrs.
+ */
+List *
+get_indexattr_list (Oid OIDOldHeap)
+{
+ ScanKeyData entry;
+ HeapScanDesc scan;
+ Relation indexRelation;
+ HeapTuple indexTuple;
+ List *indexes = NIL;
+ IndexAttrs *attrs;
+ HeapTuple tuple;
+ Form_pg_index index;
+
+ /* Grab the index tuples by looking into RelationRelationName
+ * by the OID of the old heap.
+ */
+ indexRelation = heap_openr(IndexRelationName, AccessShareLock);
+ ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
+ F_OIDEQ, ObjectIdGetDatum(OIDOldHeap));
+ scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry);
+ while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ index = (Form_pg_index) GETSTRUCT(indexTuple);
+
+ attrs = (IndexAttrs *) palloc(sizeof(IndexAttrs));
+ attrs->indexInfo = BuildIndexInfo(index);
+ attrs->isPrimary = index->indisprimary;
+ attrs->indexOID = index->indexrelid;
+
+ /* The opclasses are copied verbatim from the original indexes.
+ */
+ attrs->classOID = (Oid *)palloc(sizeof(Oid) *
+ attrs->indexInfo->ii_NumIndexAttrs);
+ memcpy(attrs->classOID, index->indclass,
+ sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs);
+
+ /* Name and access method of each index come from
+ * RelationRelationName.
+ */
+ tuple = SearchSysCache(RELOID,
+ ObjectIdGetDatum(attrs->indexOID),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "CLUSTER: cannot find index %u", attrs->indexOID);
+ attrs->indexName = pstrdup(NameStr(((Form_pg_class) GETSTRUCT(tuple))->relname));
+ attrs->accessMethodOID = ((Form_pg_class) GETSTRUCT(tuple))->relam;
+ ReleaseSysCache(tuple);
+
+ /* Cons the gathered data into the list. We do not care about
+ * ordering, and this is more efficient than append.
+ */
+ indexes=lcons((void *)attrs, indexes);
+ }
+ heap_endscan(scan);
+ heap_close(indexRelation, AccessShareLock);
+ return indexes;
+}
+
+/* Create new indexes and swap the filenodes with old indexes. Then drop
+ * the new index (carrying the old heap along).
+ */
+void
+recreate_indexattr(Oid OIDOldHeap, List *indexes)
+{
+ IndexAttrs *attrs;
+ List *elem;
+ Oid newIndexOID;
+ char newIndexName[NAMEDATALEN];
+ ObjectAddress object;
+
+ foreach (elem, indexes)
+ {
+ attrs=(IndexAttrs *) lfirst(elem);
+
+ /* Create the new index under a temporary name */
+ snprintf(newIndexName, NAMEDATALEN, "temp_%u", attrs->indexOID);
+
+ /* The new index will have constraint status set to false,
+ * but since we will only use its filenode it doesn't matter:
+ * after the filenode swap the index will keep the constraint
+ * status of the old index.
+ */
+ newIndexOID = index_create(OIDOldHeap, newIndexName,
+ attrs->indexInfo, attrs->accessMethodOID,
+ attrs->classOID, attrs->isPrimary,
+ false, allowSystemTableMods);
+ CommandCounterIncrement();
+
+ /* Swap the filenodes. */
+ swap_relfilenodes(newIndexOID, attrs->indexOID);
+ setRelhasindex(OIDOldHeap, true, attrs->isPrimary, InvalidOid);
+
+ /* I'm not sure this one is needed, but let's be safe. */
+ CommandCounterIncrement();
+
+ /* Destroy new index with old filenode */
+ object.classId = RelOid_pg_class;
+ object.objectId = newIndexOID;
+ object.objectSubId = 0;
+
+ /* The relation is local to our transaction and we know
+ * nothing depends on it, so DROP_RESTRICT should be OK.
+ */
+ performDeletion(&object, DROP_RESTRICT);
+
+ /* performDeletion does CommandCounterIncrement() at its end */
+
+ pfree(attrs->classOID);
+ pfree(attrs);
+ }
+ freeList(indexes);
+}
+
+/* Swap the relfilenodes for two given relations.
+ */
+void
+swap_relfilenodes(Oid r1, Oid r2)
+{
+ /* I can probably keep RelationRelationName open in the main
+ * function and pass the Relation around so I don't have to open
+ * it every time.
+ */
+ Relation relRelation,
+ irels[Num_pg_class_indices],
+ rel;
+ HeapTuple reltup[2];
+ Oid tempRFNode;
+ int i;
+
+ /* We need both RelationRelationName tuples. */
+ relRelation = heap_openr(RelationRelationName, RowExclusiveLock);
+
+ reltup[0] = SearchSysCacheCopy(RELOID,
+ ObjectIdGetDatum(r1),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(reltup[0]))
+ elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r1);
+ reltup[1] = SearchSysCacheCopy(RELOID,
+ ObjectIdGetDatum(r2),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(reltup[1]))
+ elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r2);
+
+ /* The buffer manager gets confused if we swap relfilenodes for
+ * relations that are not both local or non-local to this transaction.
+ * Flush the buffers on both relations so the buffer manager can
+ * forget about'em.
+ */
+
+ rel = RelationIdGetRelation(r1);
+ i = FlushRelationBuffers(rel, 0);
+ if (i < 0)
+ elog(ERROR, "CLUSTER: FlushRelationBuffers returned %d", i);
+ RelationClose(rel);
+ rel = RelationIdGetRelation(r1);
+ i = FlushRelationBuffers(rel, 0);
+ if (i < 0)
+ elog(ERROR, "CLUSTER: FlushRelationBuffers returned %d", i);
+ RelationClose(rel);
+
+ /* Actually swap the filenodes */
+
+ tempRFNode = ((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode;
+ ((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode =
+ ((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode;
+ ((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode = tempRFNode;
+
+ /* Update the RelationRelationName tuples */
+ simple_heap_update(relRelation, &reltup[1]->t_self, reltup[1]);
+ simple_heap_update(relRelation, &reltup[0]->t_self, reltup[0]);
+
+ /* To keep system catalogs current. */
+ CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, irels);
+ CatalogIndexInsert(irels, Num_pg_class_indices, relRelation, reltup[1]);
+ CatalogIndexInsert(irels, Num_pg_class_indices, relRelation, reltup[0]);
+ CatalogCloseIndices(Num_pg_class_indices, irels);
+ CommandCounterIncrement();
+
+ heap_close(relRelation, NoLock);
+ heap_freetuple(reltup[0]);
+ heap_freetuple(reltup[1]);
+}