aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c653
1 files changed, 592 insertions, 61 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 59806349cc7..57ee112653f 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -266,6 +266,12 @@ static const struct dropmsgstrings dropmsgstringarray[] = {
gettext_noop("table \"%s\" does not exist, skipping"),
gettext_noop("\"%s\" is not a table"),
gettext_noop("Use DROP TABLE to remove a table.")},
+ {RELKIND_PARTITIONED_INDEX,
+ ERRCODE_UNDEFINED_OBJECT,
+ gettext_noop("index \"%s\" does not exist"),
+ gettext_noop("index \"%s\" does not exist, skipping"),
+ gettext_noop("\"%s\" is not an index"),
+ gettext_noop("Use DROP INDEX to remove an index.")},
{'\0', 0, NULL, NULL, NULL, NULL}
};
@@ -284,6 +290,7 @@ struct DropRelationCallbackState
#define ATT_INDEX 0x0008
#define ATT_COMPOSITE_TYPE 0x0010
#define ATT_FOREIGN_TABLE 0x0020
+#define ATT_PARTITIONED_INDEX 0x0040
/*
* Partition tables are expected to be dropped when the parent partitioned
@@ -475,11 +482,17 @@ static void CreateInheritance(Relation child_rel, Relation parent_rel);
static void RemoveInheritance(Relation child_rel, Relation parent_rel);
static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel,
PartitionCmd *cmd);
+static void AttachPartitionEnsureIndexes(Relation rel, Relation attachrel);
static void ValidatePartitionConstraints(List **wqueue, Relation scanrel,
List *scanrel_children,
List *partConstraint,
bool validate_default);
static ObjectAddress ATExecDetachPartition(Relation rel, RangeVar *name);
+static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation rel,
+ RangeVar *name);
+static void validatePartitionedIndex(Relation partedIdx, Relation partedTbl);
+static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
+ Relation partitionTbl);
/* ----------------------------------------------------------------
@@ -897,6 +910,53 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
StorePartitionKey(rel, strategy, partnatts, partattrs, partexprs,
partopclass, partcollation);
+
+ /* make it all visible */
+ CommandCounterIncrement();
+ }
+
+ /*
+ * If we're creating a partition, create now all the indexes defined in
+ * the parent. We can't do it earlier, because DefineIndex wants to know
+ * the partition key which we just stored.
+ */
+ if (stmt->partbound)
+ {
+ Oid parentId = linitial_oid(inheritOids);
+ Relation parent;
+ List *idxlist;
+ ListCell *cell;
+
+ /* Already have strong enough lock on the parent */
+ parent = heap_open(parentId, NoLock);
+ idxlist = RelationGetIndexList(parent);
+
+ /*
+ * For each index in the parent table, create one in the partition
+ */
+ foreach(cell, idxlist)
+ {
+ Relation idxRel = index_open(lfirst_oid(cell), AccessShareLock);
+ AttrNumber *attmap;
+ IndexStmt *idxstmt;
+
+ attmap = convert_tuples_by_name_map(RelationGetDescr(rel),
+ RelationGetDescr(parent),
+ gettext_noop("could not convert row type"));
+ idxstmt =
+ generateClonedIndexStmt(NULL, RelationGetRelid(rel), idxRel,
+ attmap, RelationGetDescr(rel)->natts);
+ DefineIndex(RelationGetRelid(rel),
+ idxstmt,
+ InvalidOid,
+ RelationGetRelid(idxRel),
+ false, false, false, false, false);
+
+ index_close(idxRel, AccessShareLock);
+ }
+
+ list_free(idxlist);
+ heap_close(parent, NoLock);
}
/*
@@ -1179,10 +1239,13 @@ RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid,
* but RemoveRelations() can only pass one relkind for a given relation.
* It chooses RELKIND_RELATION for both regular and partitioned tables.
* That means we must be careful before giving the wrong type error when
- * the relation is RELKIND_PARTITIONED_TABLE.
+ * the relation is RELKIND_PARTITIONED_TABLE. An equivalent problem
+ * exists with indexes.
*/
if (classform->relkind == RELKIND_PARTITIONED_TABLE)
expected_relkind = RELKIND_RELATION;
+ else if (classform->relkind == RELKIND_PARTITIONED_INDEX)
+ expected_relkind = RELKIND_INDEX;
else
expected_relkind = classform->relkind;
@@ -1210,7 +1273,8 @@ RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid,
* we do it the other way around. No error if we don't find a pg_index
* entry, though --- the relation may have been dropped.
*/
- if (relkind == RELKIND_INDEX && relOid != oldRelOid)
+ if ((relkind == RELKIND_INDEX || relkind == RELKIND_PARTITIONED_INDEX) &&
+ relOid != oldRelOid)
{
state->heapOid = IndexGetRelation(relOid, true);
if (OidIsValid(state->heapOid))
@@ -2396,27 +2460,11 @@ StoreCatalogInheritance1(Oid relationId, Oid parentOid,
int32 seqNumber, Relation inhRelation,
bool child_is_partition)
{
- TupleDesc desc = RelationGetDescr(inhRelation);
- Datum values[Natts_pg_inherits];
- bool nulls[Natts_pg_inherits];
ObjectAddress childobject,
parentobject;
- HeapTuple tuple;
-
- /*
- * Make the pg_inherits entry
- */
- values[Anum_pg_inherits_inhrelid - 1] = ObjectIdGetDatum(relationId);
- values[Anum_pg_inherits_inhparent - 1] = ObjectIdGetDatum(parentOid);
- values[Anum_pg_inherits_inhseqno - 1] = Int32GetDatum(seqNumber);
- memset(nulls, 0, sizeof(nulls));
-
- tuple = heap_form_tuple(desc, values, nulls);
-
- CatalogTupleInsert(inhRelation, tuple);
-
- heap_freetuple(tuple);
+ /* store the pg_inherits row */
+ StoreSingleInheritance(relationId, parentOid, seqNumber);
/*
* Store a dependency too
@@ -2540,6 +2588,7 @@ renameatt_check(Oid myrelid, Form_pg_class classform, bool recursing)
relkind != RELKIND_MATVIEW &&
relkind != RELKIND_COMPOSITE_TYPE &&
relkind != RELKIND_INDEX &&
+ relkind != RELKIND_PARTITIONED_INDEX &&
relkind != RELKIND_FOREIGN_TABLE &&
relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
@@ -3019,7 +3068,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal)
/*
* Also rename the associated constraint, if any.
*/
- if (targetrelation->rd_rel->relkind == RELKIND_INDEX)
+ if (targetrelation->rd_rel->relkind == RELKIND_INDEX ||
+ targetrelation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
{
Oid constraintId = get_index_constraint(myrelid);
@@ -3073,6 +3123,7 @@ CheckTableNotInUse(Relation rel, const char *stmt)
stmt, RelationGetRelationName(rel))));
if (rel->rd_rel->relkind != RELKIND_INDEX &&
+ rel->rd_rel->relkind != RELKIND_PARTITIONED_INDEX &&
AfterTriggerPendingOnRel(RelationGetRelid(rel)))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
@@ -3764,6 +3815,10 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
pass = AT_PASS_MISC;
break;
case AT_AttachPartition:
+ ATSimplePermissions(rel, ATT_TABLE | ATT_PARTITIONED_INDEX);
+ /* No command-specific prep needed */
+ pass = AT_PASS_MISC;
+ break;
case AT_DetachPartition:
ATSimplePermissions(rel, ATT_TABLE);
/* No command-specific prep needed */
@@ -4112,9 +4167,15 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
ATExecGenericOptions(rel, (List *) cmd->def);
break;
case AT_AttachPartition:
- ATExecAttachPartition(wqueue, rel, (PartitionCmd *) cmd->def);
+ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ ATExecAttachPartition(wqueue, rel, (PartitionCmd *) cmd->def);
+ else
+ ATExecAttachPartitionIdx(wqueue, rel,
+ ((PartitionCmd *) cmd->def)->name);
break;
case AT_DetachPartition:
+ /* ATPrepCmd ensures it must be a table */
+ Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
ATExecDetachPartition(rel, ((PartitionCmd *) cmd->def)->name);
break;
default: /* oops */
@@ -4148,9 +4209,13 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode)
{
AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
- /* Foreign tables have no storage, nor do partitioned tables. */
+ /*
+ * Foreign tables have no storage, nor do partitioned tables and
+ * indexes.
+ */
if (tab->relkind == RELKIND_FOREIGN_TABLE ||
- tab->relkind == RELKIND_PARTITIONED_TABLE)
+ tab->relkind == RELKIND_PARTITIONED_TABLE ||
+ tab->relkind == RELKIND_PARTITIONED_INDEX)
continue;
/*
@@ -4752,6 +4817,9 @@ ATSimplePermissions(Relation rel, int allowed_targets)
case RELKIND_INDEX:
actual_target = ATT_INDEX;
break;
+ case RELKIND_PARTITIONED_INDEX:
+ actual_target = ATT_PARTITIONED_INDEX;
+ break;
case RELKIND_COMPOSITE_TYPE:
actual_target = ATT_COMPOSITE_TYPE;
break;
@@ -6194,6 +6262,7 @@ ATPrepSetStatistics(Relation rel, const char *colName, int16 colNum, Node *newVa
if (rel->rd_rel->relkind != RELKIND_RELATION &&
rel->rd_rel->relkind != RELKIND_MATVIEW &&
rel->rd_rel->relkind != RELKIND_INDEX &&
+ rel->rd_rel->relkind != RELKIND_PARTITIONED_INDEX &&
rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
@@ -6205,7 +6274,9 @@ ATPrepSetStatistics(Relation rel, const char *colName, int16 colNum, Node *newVa
* We allow referencing columns by numbers only for indexes, since table
* column numbers could contain gaps if columns are later dropped.
*/
- if (rel->rd_rel->relkind != RELKIND_INDEX && !colName)
+ if (rel->rd_rel->relkind != RELKIND_INDEX &&
+ rel->rd_rel->relkind != RELKIND_PARTITIONED_INDEX &&
+ !colName)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot refer to non-index column by number")));
@@ -6283,7 +6354,8 @@ ATExecSetStatistics(Relation rel, const char *colName, int16 colNum, Node *newVa
errmsg("cannot alter system column \"%s\"",
colName)));
- if (rel->rd_rel->relkind == RELKIND_INDEX &&
+ if ((rel->rd_rel->relkind == RELKIND_INDEX ||
+ rel->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
rel->rd_index->indkey.values[attnum - 1] != 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -6736,6 +6808,7 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
address = DefineIndex(RelationGetRelid(rel),
stmt,
InvalidOid, /* no predefined OID */
+ InvalidOid, /* no parent index */
true, /* is_alter_table */
check_rights,
false, /* check_not_in_use - we did it already */
@@ -9139,7 +9212,8 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
{
char relKind = get_rel_relkind(foundObject.objectId);
- if (relKind == RELKIND_INDEX)
+ if (relKind == RELKIND_INDEX ||
+ relKind == RELKIND_PARTITIONED_INDEX)
{
Assert(foundObject.objectSubId == 0);
if (!list_member_oid(tab->changedIndexOids, foundObject.objectId))
@@ -9982,6 +10056,15 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lock
newOwnerId = tuple_class->relowner;
}
break;
+ case RELKIND_PARTITIONED_INDEX:
+ if (recursing)
+ break;
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot change owner of index \"%s\"",
+ NameStr(tuple_class->relname)),
+ errhint("Change the ownership of the index's table, instead.")));
+ break;
case RELKIND_SEQUENCE:
if (!recursing &&
tuple_class->relowner != newOwnerId)
@@ -10103,6 +10186,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lock
*/
if (tuple_class->relkind != RELKIND_COMPOSITE_TYPE &&
tuple_class->relkind != RELKIND_INDEX &&
+ tuple_class->relkind != RELKIND_PARTITIONED_INDEX &&
tuple_class->relkind != RELKIND_TOASTVALUE)
changeDependencyOnOwner(RelationRelationId, relationOid,
newOwnerId);
@@ -10110,7 +10194,8 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lock
/*
* Also change the ownership of the table's row type, if it has one
*/
- if (tuple_class->relkind != RELKIND_INDEX)
+ if (tuple_class->relkind != RELKIND_INDEX &&
+ tuple_class->relkind != RELKIND_PARTITIONED_INDEX)
AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId);
/*
@@ -10119,6 +10204,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lock
* relation, as well as its toast table (if it has one).
*/
if (tuple_class->relkind == RELKIND_RELATION ||
+ tuple_class->relkind == RELKIND_PARTITIONED_TABLE ||
tuple_class->relkind == RELKIND_MATVIEW ||
tuple_class->relkind == RELKIND_TOASTVALUE)
{
@@ -10427,6 +10513,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
(void) view_reloptions(newOptions, true);
break;
case RELKIND_INDEX:
+ case RELKIND_PARTITIONED_INDEX:
(void) index_reloptions(rel->rd_amroutine->amoptions, newOptions, true);
break;
default:
@@ -10839,7 +10926,8 @@ AlterTableMoveAll(AlterTableMoveAllStmt *stmt)
relForm->relkind != RELKIND_RELATION &&
relForm->relkind != RELKIND_PARTITIONED_TABLE) ||
(stmt->objtype == OBJECT_INDEX &&
- relForm->relkind != RELKIND_INDEX) ||
+ relForm->relkind != RELKIND_INDEX &&
+ relForm->relkind != RELKIND_PARTITIONED_INDEX) ||
(stmt->objtype == OBJECT_MATVIEW &&
relForm->relkind != RELKIND_MATVIEW))
continue;
@@ -11633,45 +11721,18 @@ RemoveInheritance(Relation child_rel, Relation parent_rel)
Relation catalogRelation;
SysScanDesc scan;
ScanKeyData key[3];
- HeapTuple inheritsTuple,
- attributeTuple,
+ HeapTuple attributeTuple,
constraintTuple;
List *connames;
- bool found = false;
+ bool found;
bool child_is_partition = false;
/* If parent_rel is a partitioned table, child_rel must be a partition */
if (parent_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
child_is_partition = true;
- /*
- * Find and destroy the pg_inherits entry linking the two, or error out if
- * there is none.
- */
- catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
- ScanKeyInit(&key[0],
- Anum_pg_inherits_inhrelid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(RelationGetRelid(child_rel)));
- scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
- true, NULL, 1, key);
-
- while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
- {
- Oid inhparent;
-
- inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
- if (inhparent == RelationGetRelid(parent_rel))
- {
- CatalogTupleDelete(catalogRelation, &inheritsTuple->t_self);
- found = true;
- break;
- }
- }
-
- systable_endscan(scan);
- heap_close(catalogRelation, RowExclusiveLock);
-
+ found = DeleteInheritsTuple(RelationGetRelid(child_rel),
+ RelationGetRelid(parent_rel));
if (!found)
{
if (child_is_partition)
@@ -13226,7 +13287,8 @@ RangeVarCallbackForAlterRelation(const RangeVar *rv, Oid relid, Oid oldrelid,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a composite type", rv->relname)));
- if (reltype == OBJECT_INDEX && relkind != RELKIND_INDEX
+ if (reltype == OBJECT_INDEX && relkind != RELKIND_INDEX &&
+ relkind != RELKIND_PARTITIONED_INDEX
&& !IsA(stmt, RenameStmt))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
@@ -13946,6 +14008,9 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
/* Update the pg_class entry. */
StorePartitionBound(attachrel, rel, cmd->bound);
+ /* Ensure there exists a correct set of indexes in the partition. */
+ AttachPartitionEnsureIndexes(rel, attachrel);
+
/*
* Generate partition constraint from the partition bound specification.
* If the parent itself is a partition, make sure to include its
@@ -14016,6 +14081,127 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
}
/*
+ * AttachPartitionEnsureIndexes
+ * subroutine for ATExecAttachPartition to create/match indexes
+ *
+ * Enforce the indexing rule for partitioned tables during ALTER TABLE / ATTACH
+ * PARTITION: every partition must have an index attached to each index on the
+ * partitioned table.
+ */
+static void
+AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
+{
+ List *idxes;
+ List *attachRelIdxs;
+ Relation *attachrelIdxRels;
+ IndexInfo **attachInfos;
+ int i;
+ ListCell *cell;
+ MemoryContext cxt;
+ MemoryContext oldcxt;
+
+ cxt = AllocSetContextCreate(CurrentMemoryContext,
+ "AttachPartitionEnsureIndexes",
+ ALLOCSET_DEFAULT_SIZES);
+ oldcxt = MemoryContextSwitchTo(cxt);
+
+ idxes = RelationGetIndexList(rel);
+ attachRelIdxs = RelationGetIndexList(attachrel);
+ attachrelIdxRels = palloc(sizeof(Relation) * list_length(attachRelIdxs));
+ attachInfos = palloc(sizeof(IndexInfo *) * list_length(attachRelIdxs));
+
+ /* Build arrays of all existing indexes and their IndexInfos */
+ i = 0;
+ foreach(cell, attachRelIdxs)
+ {
+ Oid cldIdxId = lfirst_oid(cell);
+
+ attachrelIdxRels[i] = index_open(cldIdxId, AccessShareLock);
+ attachInfos[i] = BuildIndexInfo(attachrelIdxRels[i]);
+ i++;
+ }
+
+ /*
+ * For each index on the partitioned table, find a matching one in the
+ * partition-to-be; if one is not found, create one.
+ */
+ foreach(cell, idxes)
+ {
+ Oid idx = lfirst_oid(cell);
+ Relation idxRel = index_open(idx, AccessShareLock);
+ IndexInfo *info;
+ AttrNumber *attmap;
+ bool found = false;
+
+ /*
+ * Ignore indexes in the partitioned table other than partitioned
+ * indexes.
+ */
+ if (idxRel->rd_rel->relkind != RELKIND_PARTITIONED_INDEX)
+ {
+ index_close(idxRel, AccessShareLock);
+ continue;
+ }
+
+ /* construct an indexinfo to compare existing indexes against */
+ info = BuildIndexInfo(idxRel);
+ attmap = convert_tuples_by_name_map(RelationGetDescr(attachrel),
+ RelationGetDescr(rel),
+ gettext_noop("could not convert row type"));
+
+ /*
+ * Scan the list of existing indexes in the partition-to-be, and mark
+ * the first matching, unattached one we find, if any, as partition of
+ * the parent index. If we find one, we're done.
+ */
+ for (i = 0; i < list_length(attachRelIdxs); i++)
+ {
+ /* does this index have a parent? if so, can't use it */
+ if (has_superclass(RelationGetRelid(attachrelIdxRels[i])))
+ continue;
+
+ if (CompareIndexInfo(attachInfos[i], info,
+ attachrelIdxRels[i]->rd_indcollation,
+ idxRel->rd_indcollation,
+ attachrelIdxRels[i]->rd_opfamily,
+ idxRel->rd_opfamily,
+ attmap,
+ RelationGetDescr(rel)->natts))
+ {
+ /* bingo. */
+ IndexSetParentIndex(attachrelIdxRels[i], idx);
+ found = true;
+ break;
+ }
+ }
+
+ /*
+ * If no suitable index was found in the partition-to-be, create one
+ * now.
+ */
+ if (!found)
+ {
+ IndexStmt *stmt;
+
+ stmt = generateClonedIndexStmt(NULL, RelationGetRelid(attachrel),
+ idxRel, attmap,
+ RelationGetDescr(rel)->natts);
+ DefineIndex(RelationGetRelid(attachrel), stmt, InvalidOid,
+ RelationGetRelid(idxRel),
+ false, false, false, false, false);
+ }
+
+ index_close(idxRel, AccessShareLock);
+ }
+
+ /* Clean up. */
+ for (i = 0; i < list_length(attachRelIdxs); i++)
+ index_close(attachrelIdxRels[i], AccessShareLock);
+ MemoryContextSwitchTo(oldcxt);
+ MemoryContextDelete(cxt);
+}
+
+/*
* ALTER TABLE DETACH PARTITION
*
* Return the address of the relation that is no longer a partition of rel.
@@ -14033,6 +14219,8 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
new_repl[Natts_pg_class];
ObjectAddress address;
Oid defaultPartOid;
+ List *indexes;
+ ListCell *cell;
/*
* We must lock the default partition, because detaching this partition
@@ -14094,6 +14282,24 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
}
}
+ /* detach indexes too */
+ indexes = RelationGetIndexList(partRel);
+ foreach(cell, indexes)
+ {
+ Oid idxid = lfirst_oid(cell);
+ Relation idx;
+
+ if (!has_superclass(idxid))
+ continue;
+
+ Assert((IndexGetRelation(get_partition_parent(idxid), false) ==
+ RelationGetRelid(rel)));
+
+ idx = index_open(idxid, AccessExclusiveLock);
+ IndexSetParentIndex(idx, InvalidOid);
+ relation_close(idx, AccessExclusiveLock);
+ }
+
/*
* Invalidate the parent's relcache so that the partition is no longer
* included in its partition descriptor.
@@ -14107,3 +14313,328 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
return address;
}
+
+/*
+ * Before acquiring lock on an index, acquire the same lock on the owning
+ * table.
+ */
+struct AttachIndexCallbackState
+{
+ Oid partitionOid;
+ Oid parentTblOid;
+ bool lockedParentTbl;
+};
+
+static void
+RangeVarCallbackForAttachIndex(const RangeVar *rv, Oid relOid, Oid oldRelOid,
+ void *arg)
+{
+ struct AttachIndexCallbackState *state;
+ Form_pg_class classform;
+ HeapTuple tuple;
+
+ state = (struct AttachIndexCallbackState *) arg;
+
+ if (!state->lockedParentTbl)
+ {
+ LockRelationOid(state->parentTblOid, AccessShareLock);
+ state->lockedParentTbl = true;
+ }
+
+ /*
+ * If we previously locked some other heap, and the name we're looking up
+ * no longer refers to an index on that relation, release the now-useless
+ * lock. XXX maybe we should do *after* we verify whether the index does
+ * not actually belong to the same relation ...
+ */
+ if (relOid != oldRelOid && OidIsValid(state->partitionOid))
+ {
+ UnlockRelationOid(state->partitionOid, AccessShareLock);
+ state->partitionOid = InvalidOid;
+ }
+
+ /* Didn't find a relation, so no need for locking or permission checks. */
+ if (!OidIsValid(relOid))
+ return;
+
+ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
+ if (!HeapTupleIsValid(tuple))
+ return; /* concurrently dropped, so nothing to do */
+ classform = (Form_pg_class) GETSTRUCT(tuple);
+ if (classform->relkind != RELKIND_PARTITIONED_INDEX &&
+ classform->relkind != RELKIND_INDEX)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("\"%s\" is not an index", rv->relname)));
+ ReleaseSysCache(tuple);
+
+ /*
+ * Since we need only examine the heap's tupledesc, an access share lock
+ * on it (preventing any DDL) is sufficient.
+ */
+ state->partitionOid = IndexGetRelation(relOid, false);
+ LockRelationOid(state->partitionOid, AccessShareLock);
+}
+
+/*
+ * ALTER INDEX i1 ATTACH PARTITION i2
+ */
+static ObjectAddress
+ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
+{
+ Relation partIdx;
+ Relation partTbl;
+ Relation parentTbl;
+ ObjectAddress address;
+ Oid partIdxId;
+ Oid currParent;
+ struct AttachIndexCallbackState state;
+
+ /*
+ * We need to obtain lock on the index 'name' to modify it, but we also
+ * need to read its owning table's tuple descriptor -- so we need to lock
+ * both. To avoid deadlocks, obtain lock on the table before doing so on
+ * the index. Furthermore, we need to examine the parent table of the
+ * partition, so lock that one too.
+ */
+ state.partitionOid = InvalidOid;
+ state.parentTblOid = parentIdx->rd_index->indrelid;
+ state.lockedParentTbl = false;
+ partIdxId =
+ RangeVarGetRelidExtended(name, AccessExclusiveLock, false, false,
+ RangeVarCallbackForAttachIndex,
+ (void *) &state);
+ /* Not there? */
+ if (!OidIsValid(partIdxId))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("index \"%s\" does not exist", name->relname)));
+
+ /* no deadlock risk: RangeVarGetRelidExtended already acquired the lock */
+ partIdx = relation_open(partIdxId, AccessExclusiveLock);
+
+ /* we already hold locks on both tables, so this is safe: */
+ parentTbl = relation_open(parentIdx->rd_index->indrelid, AccessShareLock);
+ partTbl = relation_open(partIdx->rd_index->indrelid, NoLock);
+
+ ObjectAddressSet(address, RelationRelationId, RelationGetRelid(partIdx));
+
+ /* Silently do nothing if already in the right state */
+ currParent = !has_superclass(partIdxId) ? InvalidOid :
+ get_partition_parent(partIdxId);
+ if (currParent != RelationGetRelid(parentIdx))
+ {
+ IndexInfo *childInfo;
+ IndexInfo *parentInfo;
+ AttrNumber *attmap;
+ bool found;
+ int i;
+ PartitionDesc partDesc;
+
+ /*
+ * If this partition already has an index attached, refuse the operation.
+ */
+ refuseDupeIndexAttach(parentIdx, partIdx, partTbl);
+
+ if (OidIsValid(currParent))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
+ RelationGetRelationName(partIdx),
+ RelationGetRelationName(parentIdx)),
+ errdetail("Index \"%s\" is already attached to another index.",
+ RelationGetRelationName(partIdx))));
+
+ /* Make sure it indexes a partition of the other index's table */
+ partDesc = RelationGetPartitionDesc(parentTbl);
+ found = false;
+ for (i = 0; i < partDesc->nparts; i++)
+ {
+ if (partDesc->oids[i] == state.partitionOid)
+ {
+ found = true;
+ break;
+ }
+ }
+ if (!found)
+ ereport(ERROR,
+ (errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
+ RelationGetRelationName(partIdx),
+ RelationGetRelationName(parentIdx)),
+ errdetail("Index \"%s\" is not an index on any partition of table \"%s\".",
+ RelationGetRelationName(partIdx),
+ RelationGetRelationName(parentTbl))));
+
+ /* Ensure the indexes are compatible */
+ childInfo = BuildIndexInfo(partIdx);
+ parentInfo = BuildIndexInfo(parentIdx);
+ attmap = convert_tuples_by_name_map(RelationGetDescr(partTbl),
+ RelationGetDescr(parentTbl),
+ gettext_noop("could not convert row type"));
+ if (!CompareIndexInfo(childInfo, parentInfo,
+ partIdx->rd_indcollation,
+ parentIdx->rd_indcollation,
+ partIdx->rd_opfamily,
+ parentIdx->rd_opfamily,
+ attmap,
+ RelationGetDescr(partTbl)->natts))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
+ RelationGetRelationName(partIdx),
+ RelationGetRelationName(parentIdx)),
+ errdetail("The index definitions do not match.")));
+
+ /* All good -- do it */
+ IndexSetParentIndex(partIdx, RelationGetRelid(parentIdx));
+ pfree(attmap);
+
+ CommandCounterIncrement();
+
+ validatePartitionedIndex(parentIdx, parentTbl);
+ }
+
+ relation_close(parentTbl, AccessShareLock);
+ /* keep these locks till commit */
+ relation_close(partTbl, NoLock);
+ relation_close(partIdx, NoLock);
+
+ return address;
+}
+
+/*
+ * Verify whether the given partition already contains an index attached
+ * to the given partitioned index. If so, raise an error.
+ */
+static void
+refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl)
+{
+ Relation pg_inherits;
+ ScanKeyData key;
+ HeapTuple tuple;
+ SysScanDesc scan;
+
+ pg_inherits = heap_open(InheritsRelationId, AccessShareLock);
+ ScanKeyInit(&key, Anum_pg_inherits_inhparent,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(parentIdx)));
+ scan = systable_beginscan(pg_inherits, InheritsParentIndexId, true,
+ NULL, 1, &key);
+ while (HeapTupleIsValid(tuple = systable_getnext(scan)))
+ {
+ Form_pg_inherits inhForm;
+ Oid tab;
+
+ inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
+ tab = IndexGetRelation(inhForm->inhrelid, false);
+ if (tab == RelationGetRelid(partitionTbl))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
+ RelationGetRelationName(partIdx),
+ RelationGetRelationName(parentIdx)),
+ errdetail("Another index is already attached for partition \"%s\".",
+ RelationGetRelationName(partitionTbl))));
+ }
+
+ systable_endscan(scan);
+ heap_close(pg_inherits, AccessShareLock);
+}
+
+/*
+ * Verify whether the set of attached partition indexes to a parent index on
+ * a partitioned table is complete. If it is, mark the parent index valid.
+ *
+ * This should be called each time a partition index is attached.
+ */
+static void
+validatePartitionedIndex(Relation partedIdx, Relation partedTbl)
+{
+ Relation inheritsRel;
+ SysScanDesc scan;
+ ScanKeyData key;
+ int tuples = 0;
+ HeapTuple inhTup;
+ bool updated = false;
+
+ Assert(partedIdx->rd_rel->relkind == RELKIND_PARTITIONED_INDEX);
+
+ /*
+ * Scan pg_inherits for this parent index. Count each valid index we find
+ * (verifying the pg_index entry for each), and if we reach the total
+ * amount we expect, we can mark this parent index as valid.
+ */
+ inheritsRel = heap_open(InheritsRelationId, AccessShareLock);
+ ScanKeyInit(&key, Anum_pg_inherits_inhparent,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(partedIdx)));
+ scan = systable_beginscan(inheritsRel, InheritsParentIndexId, true,
+ NULL, 1, &key);
+ while ((inhTup = systable_getnext(scan)) != NULL)
+ {
+ Form_pg_inherits inhForm = (Form_pg_inherits) GETSTRUCT(inhTup);
+ HeapTuple indTup;
+ Form_pg_index indexForm;
+
+ indTup = SearchSysCache1(INDEXRELID,
+ ObjectIdGetDatum(inhForm->inhrelid));
+ if (!indTup)
+ elog(ERROR, "cache lookup failed for index %u",
+ inhForm->inhrelid);
+ indexForm = (Form_pg_index) GETSTRUCT(indTup);
+ if (IndexIsValid(indexForm))
+ tuples += 1;
+ ReleaseSysCache(indTup);
+ }
+
+ /* Done with pg_inherits */
+ systable_endscan(scan);
+ heap_close(inheritsRel, AccessShareLock);
+
+ /*
+ * If we found as many inherited indexes as the partitioned table has
+ * partitions, we're good; update pg_index to set indisvalid.
+ */
+ if (tuples == RelationGetPartitionDesc(partedTbl)->nparts)
+ {
+ Relation idxRel;
+ HeapTuple newtup;
+
+ idxRel = heap_open(IndexRelationId, RowExclusiveLock);
+
+ newtup = heap_copytuple(partedIdx->rd_indextuple);
+ ((Form_pg_index) GETSTRUCT(newtup))->indisvalid = true;
+ updated = true;
+
+ CatalogTupleUpdate(idxRel, &partedIdx->rd_indextuple->t_self, newtup);
+
+ heap_close(idxRel, RowExclusiveLock);
+ }
+
+ /*
+ * If this index is in turn a partition of a larger index, validating it
+ * might cause the parent to become valid also. Try that.
+ */
+ if (updated &&
+ has_superclass(RelationGetRelid(partedIdx)))
+ {
+ Oid parentIdxId,
+ parentTblId;
+ Relation parentIdx,
+ parentTbl;
+
+ /* make sure we see the validation we just did */
+ CommandCounterIncrement();
+
+ parentIdxId = get_partition_parent(RelationGetRelid(partedIdx));
+ parentTblId = get_partition_parent(RelationGetRelid(partedTbl));
+ parentIdx = relation_open(parentIdxId, AccessExclusiveLock);
+ parentTbl = relation_open(parentTblId, AccessExclusiveLock);
+ Assert(!parentIdx->rd_index->indisvalid);
+
+ validatePartitionedIndex(parentIdx, parentTbl);
+
+ relation_close(parentIdx, AccessExclusiveLock);
+ relation_close(parentTbl, AccessExclusiveLock);
+ }
+}