diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 653 |
1 files changed, 592 insertions, 61 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 59806349cc7..57ee112653f 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -266,6 +266,12 @@ static const struct dropmsgstrings dropmsgstringarray[] = { gettext_noop("table \"%s\" does not exist, skipping"), gettext_noop("\"%s\" is not a table"), gettext_noop("Use DROP TABLE to remove a table.")}, + {RELKIND_PARTITIONED_INDEX, + ERRCODE_UNDEFINED_OBJECT, + gettext_noop("index \"%s\" does not exist"), + gettext_noop("index \"%s\" does not exist, skipping"), + gettext_noop("\"%s\" is not an index"), + gettext_noop("Use DROP INDEX to remove an index.")}, {'\0', 0, NULL, NULL, NULL, NULL} }; @@ -284,6 +290,7 @@ struct DropRelationCallbackState #define ATT_INDEX 0x0008 #define ATT_COMPOSITE_TYPE 0x0010 #define ATT_FOREIGN_TABLE 0x0020 +#define ATT_PARTITIONED_INDEX 0x0040 /* * Partition tables are expected to be dropped when the parent partitioned @@ -475,11 +482,17 @@ static void CreateInheritance(Relation child_rel, Relation parent_rel); static void RemoveInheritance(Relation child_rel, Relation parent_rel); static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd); +static void AttachPartitionEnsureIndexes(Relation rel, Relation attachrel); static void ValidatePartitionConstraints(List **wqueue, Relation scanrel, List *scanrel_children, List *partConstraint, bool validate_default); static ObjectAddress ATExecDetachPartition(Relation rel, RangeVar *name); +static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation rel, + RangeVar *name); +static void validatePartitionedIndex(Relation partedIdx, Relation partedTbl); +static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, + Relation partitionTbl); /* ---------------------------------------------------------------- @@ -897,6 +910,53 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, StorePartitionKey(rel, strategy, partnatts, partattrs, partexprs, partopclass, partcollation); + + /* make it all visible */ + CommandCounterIncrement(); + } + + /* + * If we're creating a partition, create now all the indexes defined in + * the parent. We can't do it earlier, because DefineIndex wants to know + * the partition key which we just stored. + */ + if (stmt->partbound) + { + Oid parentId = linitial_oid(inheritOids); + Relation parent; + List *idxlist; + ListCell *cell; + + /* Already have strong enough lock on the parent */ + parent = heap_open(parentId, NoLock); + idxlist = RelationGetIndexList(parent); + + /* + * For each index in the parent table, create one in the partition + */ + foreach(cell, idxlist) + { + Relation idxRel = index_open(lfirst_oid(cell), AccessShareLock); + AttrNumber *attmap; + IndexStmt *idxstmt; + + attmap = convert_tuples_by_name_map(RelationGetDescr(rel), + RelationGetDescr(parent), + gettext_noop("could not convert row type")); + idxstmt = + generateClonedIndexStmt(NULL, RelationGetRelid(rel), idxRel, + attmap, RelationGetDescr(rel)->natts); + DefineIndex(RelationGetRelid(rel), + idxstmt, + InvalidOid, + RelationGetRelid(idxRel), + false, false, false, false, false); + + index_close(idxRel, AccessShareLock); + } + + list_free(idxlist); + heap_close(parent, NoLock); } /* @@ -1179,10 +1239,13 @@ RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid, * but RemoveRelations() can only pass one relkind for a given relation. * It chooses RELKIND_RELATION for both regular and partitioned tables. * That means we must be careful before giving the wrong type error when - * the relation is RELKIND_PARTITIONED_TABLE. + * the relation is RELKIND_PARTITIONED_TABLE. An equivalent problem + * exists with indexes. */ if (classform->relkind == RELKIND_PARTITIONED_TABLE) expected_relkind = RELKIND_RELATION; + else if (classform->relkind == RELKIND_PARTITIONED_INDEX) + expected_relkind = RELKIND_INDEX; else expected_relkind = classform->relkind; @@ -1210,7 +1273,8 @@ RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid, * we do it the other way around. No error if we don't find a pg_index * entry, though --- the relation may have been dropped. */ - if (relkind == RELKIND_INDEX && relOid != oldRelOid) + if ((relkind == RELKIND_INDEX || relkind == RELKIND_PARTITIONED_INDEX) && + relOid != oldRelOid) { state->heapOid = IndexGetRelation(relOid, true); if (OidIsValid(state->heapOid)) @@ -2396,27 +2460,11 @@ StoreCatalogInheritance1(Oid relationId, Oid parentOid, int32 seqNumber, Relation inhRelation, bool child_is_partition) { - TupleDesc desc = RelationGetDescr(inhRelation); - Datum values[Natts_pg_inherits]; - bool nulls[Natts_pg_inherits]; ObjectAddress childobject, parentobject; - HeapTuple tuple; - - /* - * Make the pg_inherits entry - */ - values[Anum_pg_inherits_inhrelid - 1] = ObjectIdGetDatum(relationId); - values[Anum_pg_inherits_inhparent - 1] = ObjectIdGetDatum(parentOid); - values[Anum_pg_inherits_inhseqno - 1] = Int32GetDatum(seqNumber); - memset(nulls, 0, sizeof(nulls)); - - tuple = heap_form_tuple(desc, values, nulls); - - CatalogTupleInsert(inhRelation, tuple); - - heap_freetuple(tuple); + /* store the pg_inherits row */ + StoreSingleInheritance(relationId, parentOid, seqNumber); /* * Store a dependency too @@ -2540,6 +2588,7 @@ renameatt_check(Oid myrelid, Form_pg_class classform, bool recursing) relkind != RELKIND_MATVIEW && relkind != RELKIND_COMPOSITE_TYPE && relkind != RELKIND_INDEX && + relkind != RELKIND_PARTITIONED_INDEX && relkind != RELKIND_FOREIGN_TABLE && relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, @@ -3019,7 +3068,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal) /* * Also rename the associated constraint, if any. */ - if (targetrelation->rd_rel->relkind == RELKIND_INDEX) + if (targetrelation->rd_rel->relkind == RELKIND_INDEX || + targetrelation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) { Oid constraintId = get_index_constraint(myrelid); @@ -3073,6 +3123,7 @@ CheckTableNotInUse(Relation rel, const char *stmt) stmt, RelationGetRelationName(rel)))); if (rel->rd_rel->relkind != RELKIND_INDEX && + rel->rd_rel->relkind != RELKIND_PARTITIONED_INDEX && AfterTriggerPendingOnRel(RelationGetRelid(rel))) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), @@ -3764,6 +3815,10 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, pass = AT_PASS_MISC; break; case AT_AttachPartition: + ATSimplePermissions(rel, ATT_TABLE | ATT_PARTITIONED_INDEX); + /* No command-specific prep needed */ + pass = AT_PASS_MISC; + break; case AT_DetachPartition: ATSimplePermissions(rel, ATT_TABLE); /* No command-specific prep needed */ @@ -4112,9 +4167,15 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, ATExecGenericOptions(rel, (List *) cmd->def); break; case AT_AttachPartition: - ATExecAttachPartition(wqueue, rel, (PartitionCmd *) cmd->def); + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + ATExecAttachPartition(wqueue, rel, (PartitionCmd *) cmd->def); + else + ATExecAttachPartitionIdx(wqueue, rel, + ((PartitionCmd *) cmd->def)->name); break; case AT_DetachPartition: + /* ATPrepCmd ensures it must be a table */ + Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); ATExecDetachPartition(rel, ((PartitionCmd *) cmd->def)->name); break; default: /* oops */ @@ -4148,9 +4209,13 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode) { AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab); - /* Foreign tables have no storage, nor do partitioned tables. */ + /* + * Foreign tables have no storage, nor do partitioned tables and + * indexes. + */ if (tab->relkind == RELKIND_FOREIGN_TABLE || - tab->relkind == RELKIND_PARTITIONED_TABLE) + tab->relkind == RELKIND_PARTITIONED_TABLE || + tab->relkind == RELKIND_PARTITIONED_INDEX) continue; /* @@ -4752,6 +4817,9 @@ ATSimplePermissions(Relation rel, int allowed_targets) case RELKIND_INDEX: actual_target = ATT_INDEX; break; + case RELKIND_PARTITIONED_INDEX: + actual_target = ATT_PARTITIONED_INDEX; + break; case RELKIND_COMPOSITE_TYPE: actual_target = ATT_COMPOSITE_TYPE; break; @@ -6194,6 +6262,7 @@ ATPrepSetStatistics(Relation rel, const char *colName, int16 colNum, Node *newVa if (rel->rd_rel->relkind != RELKIND_RELATION && rel->rd_rel->relkind != RELKIND_MATVIEW && rel->rd_rel->relkind != RELKIND_INDEX && + rel->rd_rel->relkind != RELKIND_PARTITIONED_INDEX && rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, @@ -6205,7 +6274,9 @@ ATPrepSetStatistics(Relation rel, const char *colName, int16 colNum, Node *newVa * We allow referencing columns by numbers only for indexes, since table * column numbers could contain gaps if columns are later dropped. */ - if (rel->rd_rel->relkind != RELKIND_INDEX && !colName) + if (rel->rd_rel->relkind != RELKIND_INDEX && + rel->rd_rel->relkind != RELKIND_PARTITIONED_INDEX && + !colName) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot refer to non-index column by number"))); @@ -6283,7 +6354,8 @@ ATExecSetStatistics(Relation rel, const char *colName, int16 colNum, Node *newVa errmsg("cannot alter system column \"%s\"", colName))); - if (rel->rd_rel->relkind == RELKIND_INDEX && + if ((rel->rd_rel->relkind == RELKIND_INDEX || + rel->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) && rel->rd_index->indkey.values[attnum - 1] != 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -6736,6 +6808,7 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel, address = DefineIndex(RelationGetRelid(rel), stmt, InvalidOid, /* no predefined OID */ + InvalidOid, /* no parent index */ true, /* is_alter_table */ check_rights, false, /* check_not_in_use - we did it already */ @@ -9139,7 +9212,8 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, { char relKind = get_rel_relkind(foundObject.objectId); - if (relKind == RELKIND_INDEX) + if (relKind == RELKIND_INDEX || + relKind == RELKIND_PARTITIONED_INDEX) { Assert(foundObject.objectSubId == 0); if (!list_member_oid(tab->changedIndexOids, foundObject.objectId)) @@ -9982,6 +10056,15 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lock newOwnerId = tuple_class->relowner; } break; + case RELKIND_PARTITIONED_INDEX: + if (recursing) + break; + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot change owner of index \"%s\"", + NameStr(tuple_class->relname)), + errhint("Change the ownership of the index's table, instead."))); + break; case RELKIND_SEQUENCE: if (!recursing && tuple_class->relowner != newOwnerId) @@ -10103,6 +10186,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lock */ if (tuple_class->relkind != RELKIND_COMPOSITE_TYPE && tuple_class->relkind != RELKIND_INDEX && + tuple_class->relkind != RELKIND_PARTITIONED_INDEX && tuple_class->relkind != RELKIND_TOASTVALUE) changeDependencyOnOwner(RelationRelationId, relationOid, newOwnerId); @@ -10110,7 +10194,8 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lock /* * Also change the ownership of the table's row type, if it has one */ - if (tuple_class->relkind != RELKIND_INDEX) + if (tuple_class->relkind != RELKIND_INDEX && + tuple_class->relkind != RELKIND_PARTITIONED_INDEX) AlterTypeOwnerInternal(tuple_class->reltype, newOwnerId); /* @@ -10119,6 +10204,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lock * relation, as well as its toast table (if it has one). */ if (tuple_class->relkind == RELKIND_RELATION || + tuple_class->relkind == RELKIND_PARTITIONED_TABLE || tuple_class->relkind == RELKIND_MATVIEW || tuple_class->relkind == RELKIND_TOASTVALUE) { @@ -10427,6 +10513,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation, (void) view_reloptions(newOptions, true); break; case RELKIND_INDEX: + case RELKIND_PARTITIONED_INDEX: (void) index_reloptions(rel->rd_amroutine->amoptions, newOptions, true); break; default: @@ -10839,7 +10926,8 @@ AlterTableMoveAll(AlterTableMoveAllStmt *stmt) relForm->relkind != RELKIND_RELATION && relForm->relkind != RELKIND_PARTITIONED_TABLE) || (stmt->objtype == OBJECT_INDEX && - relForm->relkind != RELKIND_INDEX) || + relForm->relkind != RELKIND_INDEX && + relForm->relkind != RELKIND_PARTITIONED_INDEX) || (stmt->objtype == OBJECT_MATVIEW && relForm->relkind != RELKIND_MATVIEW)) continue; @@ -11633,45 +11721,18 @@ RemoveInheritance(Relation child_rel, Relation parent_rel) Relation catalogRelation; SysScanDesc scan; ScanKeyData key[3]; - HeapTuple inheritsTuple, - attributeTuple, + HeapTuple attributeTuple, constraintTuple; List *connames; - bool found = false; + bool found; bool child_is_partition = false; /* If parent_rel is a partitioned table, child_rel must be a partition */ if (parent_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) child_is_partition = true; - /* - * Find and destroy the pg_inherits entry linking the two, or error out if - * there is none. - */ - catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock); - ScanKeyInit(&key[0], - Anum_pg_inherits_inhrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(child_rel))); - scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, - true, NULL, 1, key); - - while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan))) - { - Oid inhparent; - - inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent; - if (inhparent == RelationGetRelid(parent_rel)) - { - CatalogTupleDelete(catalogRelation, &inheritsTuple->t_self); - found = true; - break; - } - } - - systable_endscan(scan); - heap_close(catalogRelation, RowExclusiveLock); - + found = DeleteInheritsTuple(RelationGetRelid(child_rel), + RelationGetRelid(parent_rel)); if (!found) { if (child_is_partition) @@ -13226,7 +13287,8 @@ RangeVarCallbackForAlterRelation(const RangeVar *rv, Oid relid, Oid oldrelid, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is not a composite type", rv->relname))); - if (reltype == OBJECT_INDEX && relkind != RELKIND_INDEX + if (reltype == OBJECT_INDEX && relkind != RELKIND_INDEX && + relkind != RELKIND_PARTITIONED_INDEX && !IsA(stmt, RenameStmt)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), @@ -13946,6 +14008,9 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) /* Update the pg_class entry. */ StorePartitionBound(attachrel, rel, cmd->bound); + /* Ensure there exists a correct set of indexes in the partition. */ + AttachPartitionEnsureIndexes(rel, attachrel); + /* * Generate partition constraint from the partition bound specification. * If the parent itself is a partition, make sure to include its @@ -14016,6 +14081,127 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) } /* + * AttachPartitionEnsureIndexes + * subroutine for ATExecAttachPartition to create/match indexes + * + * Enforce the indexing rule for partitioned tables during ALTER TABLE / ATTACH + * PARTITION: every partition must have an index attached to each index on the + * partitioned table. + */ +static void +AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) +{ + List *idxes; + List *attachRelIdxs; + Relation *attachrelIdxRels; + IndexInfo **attachInfos; + int i; + ListCell *cell; + MemoryContext cxt; + MemoryContext oldcxt; + + cxt = AllocSetContextCreate(CurrentMemoryContext, + "AttachPartitionEnsureIndexes", + ALLOCSET_DEFAULT_SIZES); + oldcxt = MemoryContextSwitchTo(cxt); + + idxes = RelationGetIndexList(rel); + attachRelIdxs = RelationGetIndexList(attachrel); + attachrelIdxRels = palloc(sizeof(Relation) * list_length(attachRelIdxs)); + attachInfos = palloc(sizeof(IndexInfo *) * list_length(attachRelIdxs)); + + /* Build arrays of all existing indexes and their IndexInfos */ + i = 0; + foreach(cell, attachRelIdxs) + { + Oid cldIdxId = lfirst_oid(cell); + + attachrelIdxRels[i] = index_open(cldIdxId, AccessShareLock); + attachInfos[i] = BuildIndexInfo(attachrelIdxRels[i]); + i++; + } + + /* + * For each index on the partitioned table, find a matching one in the + * partition-to-be; if one is not found, create one. + */ + foreach(cell, idxes) + { + Oid idx = lfirst_oid(cell); + Relation idxRel = index_open(idx, AccessShareLock); + IndexInfo *info; + AttrNumber *attmap; + bool found = false; + + /* + * Ignore indexes in the partitioned table other than partitioned + * indexes. + */ + if (idxRel->rd_rel->relkind != RELKIND_PARTITIONED_INDEX) + { + index_close(idxRel, AccessShareLock); + continue; + } + + /* construct an indexinfo to compare existing indexes against */ + info = BuildIndexInfo(idxRel); + attmap = convert_tuples_by_name_map(RelationGetDescr(attachrel), + RelationGetDescr(rel), + gettext_noop("could not convert row type")); + + /* + * Scan the list of existing indexes in the partition-to-be, and mark + * the first matching, unattached one we find, if any, as partition of + * the parent index. If we find one, we're done. + */ + for (i = 0; i < list_length(attachRelIdxs); i++) + { + /* does this index have a parent? if so, can't use it */ + if (has_superclass(RelationGetRelid(attachrelIdxRels[i]))) + continue; + + if (CompareIndexInfo(attachInfos[i], info, + attachrelIdxRels[i]->rd_indcollation, + idxRel->rd_indcollation, + attachrelIdxRels[i]->rd_opfamily, + idxRel->rd_opfamily, + attmap, + RelationGetDescr(rel)->natts)) + { + /* bingo. */ + IndexSetParentIndex(attachrelIdxRels[i], idx); + found = true; + break; + } + } + + /* + * If no suitable index was found in the partition-to-be, create one + * now. + */ + if (!found) + { + IndexStmt *stmt; + + stmt = generateClonedIndexStmt(NULL, RelationGetRelid(attachrel), + idxRel, attmap, + RelationGetDescr(rel)->natts); + DefineIndex(RelationGetRelid(attachrel), stmt, InvalidOid, + RelationGetRelid(idxRel), + false, false, false, false, false); + } + + index_close(idxRel, AccessShareLock); + } + + /* Clean up. */ + for (i = 0; i < list_length(attachRelIdxs); i++) + index_close(attachrelIdxRels[i], AccessShareLock); + MemoryContextSwitchTo(oldcxt); + MemoryContextDelete(cxt); +} + +/* * ALTER TABLE DETACH PARTITION * * Return the address of the relation that is no longer a partition of rel. @@ -14033,6 +14219,8 @@ ATExecDetachPartition(Relation rel, RangeVar *name) new_repl[Natts_pg_class]; ObjectAddress address; Oid defaultPartOid; + List *indexes; + ListCell *cell; /* * We must lock the default partition, because detaching this partition @@ -14094,6 +14282,24 @@ ATExecDetachPartition(Relation rel, RangeVar *name) } } + /* detach indexes too */ + indexes = RelationGetIndexList(partRel); + foreach(cell, indexes) + { + Oid idxid = lfirst_oid(cell); + Relation idx; + + if (!has_superclass(idxid)) + continue; + + Assert((IndexGetRelation(get_partition_parent(idxid), false) == + RelationGetRelid(rel))); + + idx = index_open(idxid, AccessExclusiveLock); + IndexSetParentIndex(idx, InvalidOid); + relation_close(idx, AccessExclusiveLock); + } + /* * Invalidate the parent's relcache so that the partition is no longer * included in its partition descriptor. @@ -14107,3 +14313,328 @@ ATExecDetachPartition(Relation rel, RangeVar *name) return address; } + +/* + * Before acquiring lock on an index, acquire the same lock on the owning + * table. + */ +struct AttachIndexCallbackState +{ + Oid partitionOid; + Oid parentTblOid; + bool lockedParentTbl; +}; + +static void +RangeVarCallbackForAttachIndex(const RangeVar *rv, Oid relOid, Oid oldRelOid, + void *arg) +{ + struct AttachIndexCallbackState *state; + Form_pg_class classform; + HeapTuple tuple; + + state = (struct AttachIndexCallbackState *) arg; + + if (!state->lockedParentTbl) + { + LockRelationOid(state->parentTblOid, AccessShareLock); + state->lockedParentTbl = true; + } + + /* + * If we previously locked some other heap, and the name we're looking up + * no longer refers to an index on that relation, release the now-useless + * lock. XXX maybe we should do *after* we verify whether the index does + * not actually belong to the same relation ... + */ + if (relOid != oldRelOid && OidIsValid(state->partitionOid)) + { + UnlockRelationOid(state->partitionOid, AccessShareLock); + state->partitionOid = InvalidOid; + } + + /* Didn't find a relation, so no need for locking or permission checks. */ + if (!OidIsValid(relOid)) + return; + + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid)); + if (!HeapTupleIsValid(tuple)) + return; /* concurrently dropped, so nothing to do */ + classform = (Form_pg_class) GETSTRUCT(tuple); + if (classform->relkind != RELKIND_PARTITIONED_INDEX && + classform->relkind != RELKIND_INDEX) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("\"%s\" is not an index", rv->relname))); + ReleaseSysCache(tuple); + + /* + * Since we need only examine the heap's tupledesc, an access share lock + * on it (preventing any DDL) is sufficient. + */ + state->partitionOid = IndexGetRelation(relOid, false); + LockRelationOid(state->partitionOid, AccessShareLock); +} + +/* + * ALTER INDEX i1 ATTACH PARTITION i2 + */ +static ObjectAddress +ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name) +{ + Relation partIdx; + Relation partTbl; + Relation parentTbl; + ObjectAddress address; + Oid partIdxId; + Oid currParent; + struct AttachIndexCallbackState state; + + /* + * We need to obtain lock on the index 'name' to modify it, but we also + * need to read its owning table's tuple descriptor -- so we need to lock + * both. To avoid deadlocks, obtain lock on the table before doing so on + * the index. Furthermore, we need to examine the parent table of the + * partition, so lock that one too. + */ + state.partitionOid = InvalidOid; + state.parentTblOid = parentIdx->rd_index->indrelid; + state.lockedParentTbl = false; + partIdxId = + RangeVarGetRelidExtended(name, AccessExclusiveLock, false, false, + RangeVarCallbackForAttachIndex, + (void *) &state); + /* Not there? */ + if (!OidIsValid(partIdxId)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("index \"%s\" does not exist", name->relname))); + + /* no deadlock risk: RangeVarGetRelidExtended already acquired the lock */ + partIdx = relation_open(partIdxId, AccessExclusiveLock); + + /* we already hold locks on both tables, so this is safe: */ + parentTbl = relation_open(parentIdx->rd_index->indrelid, AccessShareLock); + partTbl = relation_open(partIdx->rd_index->indrelid, NoLock); + + ObjectAddressSet(address, RelationRelationId, RelationGetRelid(partIdx)); + + /* Silently do nothing if already in the right state */ + currParent = !has_superclass(partIdxId) ? InvalidOid : + get_partition_parent(partIdxId); + if (currParent != RelationGetRelid(parentIdx)) + { + IndexInfo *childInfo; + IndexInfo *parentInfo; + AttrNumber *attmap; + bool found; + int i; + PartitionDesc partDesc; + + /* + * If this partition already has an index attached, refuse the operation. + */ + refuseDupeIndexAttach(parentIdx, partIdx, partTbl); + + if (OidIsValid(currParent)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot attach index \"%s\" as a partition of index \"%s\"", + RelationGetRelationName(partIdx), + RelationGetRelationName(parentIdx)), + errdetail("Index \"%s\" is already attached to another index.", + RelationGetRelationName(partIdx)))); + + /* Make sure it indexes a partition of the other index's table */ + partDesc = RelationGetPartitionDesc(parentTbl); + found = false; + for (i = 0; i < partDesc->nparts; i++) + { + if (partDesc->oids[i] == state.partitionOid) + { + found = true; + break; + } + } + if (!found) + ereport(ERROR, + (errmsg("cannot attach index \"%s\" as a partition of index \"%s\"", + RelationGetRelationName(partIdx), + RelationGetRelationName(parentIdx)), + errdetail("Index \"%s\" is not an index on any partition of table \"%s\".", + RelationGetRelationName(partIdx), + RelationGetRelationName(parentTbl)))); + + /* Ensure the indexes are compatible */ + childInfo = BuildIndexInfo(partIdx); + parentInfo = BuildIndexInfo(parentIdx); + attmap = convert_tuples_by_name_map(RelationGetDescr(partTbl), + RelationGetDescr(parentTbl), + gettext_noop("could not convert row type")); + if (!CompareIndexInfo(childInfo, parentInfo, + partIdx->rd_indcollation, + parentIdx->rd_indcollation, + partIdx->rd_opfamily, + parentIdx->rd_opfamily, + attmap, + RelationGetDescr(partTbl)->natts)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("cannot attach index \"%s\" as a partition of index \"%s\"", + RelationGetRelationName(partIdx), + RelationGetRelationName(parentIdx)), + errdetail("The index definitions do not match."))); + + /* All good -- do it */ + IndexSetParentIndex(partIdx, RelationGetRelid(parentIdx)); + pfree(attmap); + + CommandCounterIncrement(); + + validatePartitionedIndex(parentIdx, parentTbl); + } + + relation_close(parentTbl, AccessShareLock); + /* keep these locks till commit */ + relation_close(partTbl, NoLock); + relation_close(partIdx, NoLock); + + return address; +} + +/* + * Verify whether the given partition already contains an index attached + * to the given partitioned index. If so, raise an error. + */ +static void +refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl) +{ + Relation pg_inherits; + ScanKeyData key; + HeapTuple tuple; + SysScanDesc scan; + + pg_inherits = heap_open(InheritsRelationId, AccessShareLock); + ScanKeyInit(&key, Anum_pg_inherits_inhparent, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(parentIdx))); + scan = systable_beginscan(pg_inherits, InheritsParentIndexId, true, + NULL, 1, &key); + while (HeapTupleIsValid(tuple = systable_getnext(scan))) + { + Form_pg_inherits inhForm; + Oid tab; + + inhForm = (Form_pg_inherits) GETSTRUCT(tuple); + tab = IndexGetRelation(inhForm->inhrelid, false); + if (tab == RelationGetRelid(partitionTbl)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot attach index \"%s\" as a partition of index \"%s\"", + RelationGetRelationName(partIdx), + RelationGetRelationName(parentIdx)), + errdetail("Another index is already attached for partition \"%s\".", + RelationGetRelationName(partitionTbl)))); + } + + systable_endscan(scan); + heap_close(pg_inherits, AccessShareLock); +} + +/* + * Verify whether the set of attached partition indexes to a parent index on + * a partitioned table is complete. If it is, mark the parent index valid. + * + * This should be called each time a partition index is attached. + */ +static void +validatePartitionedIndex(Relation partedIdx, Relation partedTbl) +{ + Relation inheritsRel; + SysScanDesc scan; + ScanKeyData key; + int tuples = 0; + HeapTuple inhTup; + bool updated = false; + + Assert(partedIdx->rd_rel->relkind == RELKIND_PARTITIONED_INDEX); + + /* + * Scan pg_inherits for this parent index. Count each valid index we find + * (verifying the pg_index entry for each), and if we reach the total + * amount we expect, we can mark this parent index as valid. + */ + inheritsRel = heap_open(InheritsRelationId, AccessShareLock); + ScanKeyInit(&key, Anum_pg_inherits_inhparent, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(partedIdx))); + scan = systable_beginscan(inheritsRel, InheritsParentIndexId, true, + NULL, 1, &key); + while ((inhTup = systable_getnext(scan)) != NULL) + { + Form_pg_inherits inhForm = (Form_pg_inherits) GETSTRUCT(inhTup); + HeapTuple indTup; + Form_pg_index indexForm; + + indTup = SearchSysCache1(INDEXRELID, + ObjectIdGetDatum(inhForm->inhrelid)); + if (!indTup) + elog(ERROR, "cache lookup failed for index %u", + inhForm->inhrelid); + indexForm = (Form_pg_index) GETSTRUCT(indTup); + if (IndexIsValid(indexForm)) + tuples += 1; + ReleaseSysCache(indTup); + } + + /* Done with pg_inherits */ + systable_endscan(scan); + heap_close(inheritsRel, AccessShareLock); + + /* + * If we found as many inherited indexes as the partitioned table has + * partitions, we're good; update pg_index to set indisvalid. + */ + if (tuples == RelationGetPartitionDesc(partedTbl)->nparts) + { + Relation idxRel; + HeapTuple newtup; + + idxRel = heap_open(IndexRelationId, RowExclusiveLock); + + newtup = heap_copytuple(partedIdx->rd_indextuple); + ((Form_pg_index) GETSTRUCT(newtup))->indisvalid = true; + updated = true; + + CatalogTupleUpdate(idxRel, &partedIdx->rd_indextuple->t_self, newtup); + + heap_close(idxRel, RowExclusiveLock); + } + + /* + * If this index is in turn a partition of a larger index, validating it + * might cause the parent to become valid also. Try that. + */ + if (updated && + has_superclass(RelationGetRelid(partedIdx))) + { + Oid parentIdxId, + parentTblId; + Relation parentIdx, + parentTbl; + + /* make sure we see the validation we just did */ + CommandCounterIncrement(); + + parentIdxId = get_partition_parent(RelationGetRelid(partedIdx)); + parentTblId = get_partition_parent(RelationGetRelid(partedTbl)); + parentIdx = relation_open(parentIdxId, AccessExclusiveLock); + parentTbl = relation_open(parentTblId, AccessExclusiveLock); + Assert(!parentIdx->rd_index->indisvalid); + + validatePartitionedIndex(parentIdx, parentTbl); + + relation_close(parentIdx, AccessExclusiveLock); + relation_close(parentTbl, AccessExclusiveLock); + } +} |