diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 1326 |
1 files changed, 329 insertions, 997 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 03466e495ac..d9bbeafd82c 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -203,7 +203,7 @@ typedef struct AlteredTableInfo typedef struct NewConstraint { char *name; /* Constraint name, or NULL if none */ - ConstrType contype; /* CHECK, FOREIGN */ + ConstrType contype; /* CHECK or FOREIGN */ Oid refrelid; /* PK rel, if FOREIGN */ Oid refindid; /* OID of PK's index, if FOREIGN */ Oid conid; /* OID of pg_constraint entry, if FOREIGN */ @@ -350,8 +350,7 @@ static void truncate_check_activity(Relation rel); static void RangeVarCallbackForTruncate(const RangeVar *relation, Oid relId, Oid oldRelId, void *arg); static List *MergeAttributes(List *schema, List *supers, char relpersistence, - bool is_partition, List **supconstr, - List **additional_notnulls); + bool is_partition, List **supconstr); static bool MergeCheckConstraint(List *constraints, char *name, Node *expr); static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel); static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel); @@ -432,16 +431,14 @@ static bool check_for_column_name_collision(Relation rel, const char *colname, bool if_not_exists); static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid); static void add_column_collation_dependency(Oid relid, int32 attnum, Oid collid); -static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, bool recurse, - LOCKMODE lockmode); -static void set_attnotnull(List **wqueue, Relation rel, - AttrNumber attnum, bool recurse, LOCKMODE lockmode); -static ObjectAddress ATExecSetNotNull(List **wqueue, Relation rel, - char *constrname, char *colName, - bool recurse, bool recursing, - List **readyRels, LOCKMODE lockmode); -static void ATExecSetAttNotNull(List **wqueue, Relation rel, - const char *colName, LOCKMODE lockmode); +static void ATPrepDropNotNull(Relation rel, bool recurse, bool recursing); +static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode); +static void ATPrepSetNotNull(List **wqueue, Relation rel, + AlterTableCmd *cmd, bool recurse, bool recursing, + LOCKMODE lockmode, + AlterTableUtilityContext *context); +static ObjectAddress ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, + const char *colName, LOCKMODE lockmode); static void ATExecCheckNotNull(AlteredTableInfo *tab, Relation rel, const char *colName, LOCKMODE lockmode); static bool NotNullImpliedByRelConstraints(Relation rel, Form_pg_attribute attr); @@ -544,11 +541,6 @@ static void ATExecDropConstraint(Relation rel, const char *constrName, DropBehavior behavior, bool recurse, bool recursing, bool missing_ok, LOCKMODE lockmode); -static ObjectAddress dropconstraint_internal(Relation rel, - HeapTuple constraintTup, DropBehavior behavior, - bool recurse, bool recursing, - bool missing_ok, List **readyRels, - LOCKMODE lockmode); static void ATPrepAlterColumnType(List **wqueue, AlteredTableInfo *tab, Relation rel, bool recurse, bool recursing, @@ -624,7 +616,7 @@ static void RemoveInheritance(Relation child_rel, Relation parent_rel, static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, AlterTableUtilityContext *context); -static void AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel); +static void AttachPartitionEnsureIndexes(Relation rel, Relation attachrel); static void QueuePartitionConstraintValidation(List **wqueue, Relation scanrel, List *partConstraint, bool validate_default); @@ -642,7 +634,6 @@ static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, static void validatePartitionedIndex(Relation partedIdx, Relation partedTbl); static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl); -static void verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partIdx); static List *GetParentedForeignKeyRefs(Relation partition); static void ATDetachCheckNoForeignKeyRefs(Relation partition); static char GetAttributeCompression(Oid atttypid, char *compression); @@ -680,10 +671,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, TupleDesc descriptor; List *inheritOids; List *old_constraints; - List *old_notnulls; List *rawDefaults; List *cookedDefaults; - List *nncols; Datum reloptions; ListCell *listptr; AttrNumber attnum; @@ -873,13 +862,12 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, MergeAttributes(stmt->tableElts, inheritOids, stmt->relation->relpersistence, stmt->partbound != NULL, - &old_constraints, &old_notnulls); + &old_constraints); /* * Create a tuple descriptor from the relation schema. Note that this - * deals with column names, types, and in-descriptor NOT NULL flags, but - * not default values, NOT NULL or CHECK constraints; we handle those - * below. + * deals with column names, types, and NOT NULL constraints, but not + * default values or CHECK constraints; we handle those below. */ descriptor = BuildDescForRelation(stmt->tableElts); @@ -1262,17 +1250,6 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, AddRelationNewConstraints(rel, NIL, stmt->constraints, true, true, false, queryString); - /* - * Finally, merge the NOT NULL constraints that are directly declared with - * those that come from parent relations (making sure to count inheritance - * appropriately for each), create them, and set the attnotnull flag on - * columns that don't yet have it. - */ - nncols = AddRelationNotNullConstraints(rel, stmt->nnconstraints, - old_notnulls); - foreach(listptr, nncols) - set_attnotnull(NULL, rel, lfirst_int(listptr), false, NoLock); - ObjectAddressSet(address, RelationRelationId, relationId); /* @@ -2321,8 +2298,6 @@ storage_name(char c) * Output arguments: * 'supconstr' receives a list of constraints belonging to the parents, * updated as necessary to be valid for the child. - * 'nnconstraints' receives a list of CookedConstraints that corresponds to - * constraints coming from inheritance parents. * * Return value: * Completed schema list. @@ -2353,10 +2328,7 @@ storage_name(char c) * * Constraints (including NOT NULL constraints) for the child table * are the union of all relevant constraints, from both the child schema - * and parent tables. In addition, in legacy inheritance, each column that - * appears in a primary key in any of the parents also gets a NOT NULL - * constraint (partitioning doesn't need this, because the PK itself gets - * inherited.) + * and parent tables. * * The default value for a child column is defined as: * (1) If the child schema specifies a default, that value is used. @@ -2375,11 +2347,10 @@ storage_name(char c) */ static List * MergeAttributes(List *schema, List *supers, char relpersistence, - bool is_partition, List **supconstr, List **supnotnulls) + bool is_partition, List **supconstr) { List *inhSchema = NIL; List *constraints = NIL; - List *nnconstraints = NIL; bool have_bogus_defaults = false; int child_attno; static Node bogus_marker = {0}; /* marks conflicting defaults */ @@ -2491,13 +2462,9 @@ MergeAttributes(List *schema, List *supers, char relpersistence, AttrMap *newattmap; List *inherited_defaults; List *cols_with_defaults; - List *nnconstrs; AttrNumber parent_attno; ListCell *lc1; ListCell *lc2; - Bitmapset *pkattrs; - Bitmapset *nncols = NULL; - /* caller already got lock */ relation = table_open(parent, NoLock); @@ -2586,20 +2553,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence, /* We can't process inherited defaults until newattmap is complete. */ inherited_defaults = cols_with_defaults = NIL; - /* - * All columns that are part of the parent's primary key need to be - * NOT NULL; if partition just the attnotnull bit, otherwise a full - * constraint (if they don't have one already). Also, we request - * attnotnull on columns that have a NOT NULL constraint that's not - * marked NO INHERIT. - */ - pkattrs = RelationGetIndexAttrBitmap(relation, - INDEX_ATTR_BITMAP_PRIMARY_KEY); - nnconstrs = RelationGetNotNullConstraints(relation, true); - foreach(lc1, nnconstrs) - nncols = bms_add_member(nncols, - ((CookedConstraint *) lfirst(lc1))->attnum); - for (parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) { @@ -2695,38 +2648,9 @@ MergeAttributes(List *schema, List *supers, char relpersistence, } /* - * In regular inheritance, columns in the parent's primary key - * get an extra CHECK (NOT NULL) constraint. Partitioning - * doesn't need this, because the PK itself is going to be - * cloned to the partition. - */ - if (!is_partition && - bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber, - pkattrs)) - { - CookedConstraint *nn; - - nn = palloc(sizeof(CookedConstraint)); - nn->contype = CONSTR_NOTNULL; - nn->conoid = InvalidOid; - nn->name = NULL; - nn->attnum = exist_attno; - nn->expr = NULL; - nn->skip_validation = false; - nn->is_local = false; - nn->inhcount = 1; - nn->is_no_inherit = false; - - nnconstraints = lappend(nnconstraints, nn); - } - - /* - * mark attnotnull if parent has it and it's not NO INHERIT + * Merge of NOT NULL constraints = OR 'em together */ - if (bms_is_member(parent_attno, nncols) || - bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber, - pkattrs)) - def->is_not_null = true; + def->is_not_null |= attribute->attnotnull; /* * Check for GENERATED conflicts @@ -2760,11 +2684,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, attribute->atttypmod); def->inhcount = 1; def->is_local = false; - /* mark attnotnull if parent has it and it's not NO INHERIT */ - if (bms_is_member(parent_attno, nncols) || - bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber, - pkattrs)) - def->is_not_null = true; + def->is_not_null = attribute->attnotnull; def->is_from_type = false; def->storage = attribute->attstorage; def->raw_default = NULL; @@ -2781,33 +2701,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence, def->compression = NULL; inhSchema = lappend(inhSchema, def); newattmap->attnums[parent_attno - 1] = ++child_attno; - - /* - * In regular inheritance, columns in the parent's primary key - * get an extra NOT NULL constraint. Partitioning doesn't - * need this, because the PK itself is going to be cloned to - * the partition. - */ - if (!is_partition && - bms_is_member(parent_attno - - FirstLowInvalidHeapAttributeNumber, - pkattrs)) - { - CookedConstraint *nn; - - nn = palloc(sizeof(CookedConstraint)); - nn->contype = CONSTR_NOTNULL; - nn->conoid = InvalidOid; - nn->name = NULL; - nn->attnum = newattmap->attnums[parent_attno - 1]; - nn->expr = NULL; - nn->skip_validation = false; - nn->is_local = false; - nn->inhcount = 1; - nn->is_no_inherit = false; - - nnconstraints = lappend(nnconstraints, nn); - } } /* @@ -2952,19 +2845,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence, } } - /* - * Also copy the NOT NULL constraints from this parent. The - * attnotnull markings were already installed above. - */ - foreach(lc1, nnconstrs) - { - CookedConstraint *nn = lfirst(lc1); - - nn->attnum = newattmap->attnums[nn->attnum - 1]; - - nnconstraints = lappend(nnconstraints, nn); - } - free_attrmap(newattmap); /* @@ -3171,7 +3051,8 @@ MergeAttributes(List *schema, List *supers, char relpersistence, /* * Now that we have the column definition list for a partition, we can * check whether the columns referenced in the column constraint specs - * actually exist. + * actually exist. Also, we merge parent's NOT NULL constraints and + * defaults into each corresponding column definition. */ if (is_partition) { @@ -3277,8 +3158,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence, } *supconstr = constraints; - *supnotnulls = nnconstraints; - return schema; } @@ -3330,85 +3209,6 @@ MergeCheckConstraint(List *constraints, char *name, Node *expr) return false; } -/* - * RelationGetNotNullConstraints -- get list of NOT NULL constraints - * - * Caller can request cooked constraints, or raw. - * - * This is seldom needed, so we just scan pg_constraint each time. - * - * XXX This is only used to create derived tables, so NO INHERIT constraints - * are always skipped. - */ -List * -RelationGetNotNullConstraints(Relation relation, bool cooked) -{ - List *notnulls = NIL; - Relation constrRel; - HeapTuple htup; - SysScanDesc conscan; - ScanKeyData skey; - - constrRel = table_open(ConstraintRelationId, AccessShareLock); - ScanKeyInit(&skey, - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(relation))); - conscan = systable_beginscan(constrRel, ConstraintRelidTypidNameIndexId, true, - NULL, 1, &skey); - - while (HeapTupleIsValid(htup = systable_getnext(conscan))) - { - Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(htup); - AttrNumber colnum; - - if (conForm->contype != CONSTRAINT_NOTNULL) - continue; - if (conForm->connoinherit) - continue; - - colnum = extractNotNullColumn(htup); - - if (cooked) - { - CookedConstraint *cooked; - - cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint)); - - cooked->contype = CONSTR_NOTNULL; - cooked->name = pstrdup(NameStr(conForm->conname)); - cooked->attnum = colnum; - cooked->expr = NULL; - cooked->skip_validation = false; - cooked->is_local = true; - cooked->inhcount = 0; - cooked->is_no_inherit = conForm->connoinherit; - - notnulls = lappend(notnulls, cooked); - } - else - { - Constraint *constr; - - constr = makeNode(Constraint); - constr->contype = CONSTR_NOTNULL; - constr->conname = pstrdup(NameStr(conForm->conname)); - constr->deferrable = false; - constr->initdeferred = false; - constr->location = -1; - constr->colname = get_attname(RelationGetRelid(relation), - colnum, false); - constr->skip_validation = false; - constr->initially_valid = true; - notnulls = lappend(notnulls, constr); - } - } - - systable_endscan(conscan); - table_close(constrRel, AccessShareLock); - - return notnulls; -} /* * StoreCatalogInheritance @@ -3969,10 +3769,7 @@ rename_constraint_internal(Oid myrelid, constraintOid); con = (Form_pg_constraint) GETSTRUCT(tuple); - if (myrelid && - (con->contype == CONSTRAINT_CHECK || - con->contype == CONSTRAINT_NOTNULL) && - !con->connoinherit) + if (myrelid && con->contype == CONSTRAINT_CHECK && !con->connoinherit) { if (recurse) { @@ -4557,7 +4354,6 @@ AlterTableGetLockLevel(List *cmds) case AT_AddIndexConstraint: case AT_ReplicaIdentity: case AT_SetNotNull: - case AT_SetAttNotNull: case AT_EnableRowSecurity: case AT_DisableRowSecurity: case AT_ForceRowSecurity: @@ -4856,23 +4652,15 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, break; case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE); - /* Set up recursion for phase 2; no other prep needed */ - if (recurse) - cmd->recurse = true; + ATPrepDropNotNull(rel, recurse, recursing); + ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); pass = AT_PASS_DROP; break; case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE); /* Need command-specific recursion decision */ - if (recurse) - cmd->recurse = true; - pass = AT_PASS_COL_ATTRS; - break; - case AT_SetAttNotNull: /* set pg_attribute.attnotnull without adding - * a constraint */ - ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE); - /* Need command-specific recursion decision */ - ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); + ATPrepSetNotNull(wqueue, rel, cmd, recurse, recursing, + lockmode, context); pass = AT_PASS_COL_ATTRS; break; case AT_CheckNotNull: /* check column is already marked NOT NULL */ @@ -5257,14 +5045,10 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, address = ATExecDropIdentity(rel, cmd->name, cmd->missing_ok, lockmode); break; case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ - address = ATExecDropNotNull(rel, cmd->name, cmd->recurse, lockmode); + address = ATExecDropNotNull(rel, cmd->name, lockmode); break; case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ - address = ATExecSetNotNull(wqueue, rel, NULL, cmd->name, - cmd->recurse, false, NULL, lockmode); - break; - case AT_SetAttNotNull: /* set pg_attribute.attnotnull */ - ATExecSetAttNotNull(wqueue, rel, cmd->name, lockmode); + address = ATExecSetNotNull(tab, rel, cmd->name, lockmode); break; case AT_CheckNotNull: /* check column is already marked NOT NULL */ ATExecCheckNotNull(tab, rel, cmd->name, lockmode); @@ -5603,8 +5387,11 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, */ switch (cmd2->subtype) { - case AT_SetAttNotNull: - ATSimpleRecursion(wqueue, rel, cmd2, recurse, lockmode, context); + case AT_SetNotNull: + /* Need command-specific recursion decision */ + ATPrepSetNotNull(wqueue, rel, cmd2, + recurse, false, + lockmode, context); pass = AT_PASS_COL_ATTRS; break; case AT_AddIndex: @@ -6280,7 +6067,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) RelationGetRelationName(oldrel)), errtableconstraint(oldrel, con->name))); break; - case CONSTR_NOTNULL: case CONSTR_FOREIGN: /* Nothing to do here */ break; @@ -6389,8 +6175,6 @@ alter_table_type_to_string(AlterTableType cmdtype) return "ALTER COLUMN ... DROP NOT NULL"; case AT_SetNotNull: return "ALTER COLUMN ... SET NOT NULL"; - case AT_SetAttNotNull: - return NULL; /* not real grammar */ case AT_DropExpression: return "ALTER COLUMN ... DROP EXPRESSION"; case AT_CheckNotNull: @@ -6990,7 +6774,8 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing, */ static ObjectAddress ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel, - AlterTableCmd **cmd, bool recurse, bool recursing, + AlterTableCmd **cmd, + bool recurse, bool recursing, LOCKMODE lockmode, int cur_pass, AlterTableUtilityContext *context) { @@ -7505,19 +7290,41 @@ add_column_collation_dependency(Oid relid, int32 attnum, Oid collid) /* * ALTER TABLE ALTER COLUMN DROP NOT NULL - * + */ + +static void +ATPrepDropNotNull(Relation rel, bool recurse, bool recursing) +{ + /* + * If the parent is a partitioned table, like check constraints, we do not + * support removing the NOT NULL while partitions exist. + */ + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + PartitionDesc partdesc = RelationGetPartitionDesc(rel, true); + + Assert(partdesc != NULL); + if (partdesc->nparts > 0 && !recurse && !recursing) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot remove constraint from only the partitioned table when partitions exist"), + errhint("Do not specify the ONLY keyword."))); + } +} + +/* * Return the address of the modified column. If the column was already * nullable, InvalidObjectAddress is returned. */ static ObjectAddress -ATExecDropNotNull(Relation rel, const char *colName, bool recurse, - LOCKMODE lockmode) +ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) { HeapTuple tuple; - HeapTuple conTup; Form_pg_attribute attTup; AttrNumber attnum; Relation attr_rel; + List *indexoidlist; + ListCell *indexoidscan; ObjectAddress address; /* @@ -7533,15 +7340,6 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse, colName, RelationGetRelationName(rel)))); attTup = (Form_pg_attribute) GETSTRUCT(tuple); attnum = attTup->attnum; - ObjectAddressSubSet(address, RelationRelationId, - RelationGetRelid(rel), attnum); - - /* If the column is already nullable there's nothing to do. */ - if (!attTup->attnotnull) - { - table_close(attr_rel, RowExclusiveLock); - return InvalidObjectAddress; - } /* Prevent them from altering a system attribute */ if (attnum <= 0) @@ -7557,43 +7355,68 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse, colName, RelationGetRelationName(rel)))); /* - * It's not OK to remove a constraint only for the parent and leave it in - * the children, so disallow that. + * Check that the attribute is not in a primary key or in an index used as + * a replica identity. + * + * Note: we'll throw error even if the pkey index is not valid. */ - if (!recurse) + + /* Loop over all indexes on the relation */ + indexoidlist = RelationGetIndexList(rel); + + foreach(indexoidscan, indexoidlist) { - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - { - PartitionDesc partdesc; + Oid indexoid = lfirst_oid(indexoidscan); + HeapTuple indexTuple; + Form_pg_index indexStruct; + int i; - partdesc = RelationGetPartitionDesc(rel, true); + indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); + if (!HeapTupleIsValid(indexTuple)) + elog(ERROR, "cache lookup failed for index %u", indexoid); + indexStruct = (Form_pg_index) GETSTRUCT(indexTuple); - if (partdesc->nparts > 0) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot remove constraint from only the partitioned table when partitions exist"), - errhint("Do not specify the ONLY keyword.")); - } - else if (rel->rd_rel->relhassubclass && - find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL) + /* + * If the index is not a primary key or an index used as replica + * identity, skip the check. + */ + if (indexStruct->indisprimary || indexStruct->indisreplident) { - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("NOT NULL constraint on column \"%s\" must be removed in child tables too", - colName), - errhint("Do not specify the ONLY keyword.")); + /* + * Loop over each attribute in the primary key or the index used + * as replica identity and see if it matches the to-be-altered + * attribute. + */ + for (i = 0; i < indexStruct->indnkeyatts; i++) + { + if (indexStruct->indkey.values[i] == attnum) + { + if (indexStruct->indisprimary) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("column \"%s\" is in a primary key", + colName))); + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("column \"%s\" is in index used as replica identity", + colName))); + } + } } + + ReleaseSysCache(indexTuple); } - /* - * If rel is partition, shouldn't drop NOT NULL if parent has the same. - */ + list_free(indexoidlist); + + /* If rel is partition, shouldn't drop NOT NULL if parent has the same */ if (rel->rd_rel->relispartition) { - Oid parentId = get_partition_parent(RelationGetRelid(rel), false); - Relation parent = table_open(parentId, AccessShareLock); - TupleDesc tupDesc = RelationGetDescr(parent); - AttrNumber parent_attnum; + Oid parentId = get_partition_parent(RelationGetRelid(rel), false); + Relation parent = table_open(parentId, AccessShareLock); + TupleDesc tupDesc = RelationGetDescr(parent); + AttrNumber parent_attnum; parent_attnum = get_attnum(parentId, colName); if (TupleDescAttr(tupDesc, parent_attnum - 1)->attnotnull) @@ -7605,33 +7428,22 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse, } /* - * Find the constraint that makes this column NOT NULL. + * Okay, actually perform the catalog change ... if needed */ - conTup = findNotNullConstraint(rel, colName); - if (conTup == NULL) + if (attTup->attnotnull) { - Bitmapset *pkcols; + attTup->attnotnull = false; - /* - * There's no NOT NULL constraint, so throw an error. If the column - * is in a primary key, we can throw a specific error. Otherwise, - * this is unexpected. - */ - pkcols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_PRIMARY_KEY); - if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, - pkcols)) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("column \"%s\" is in a primary key", colName)); + CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); - /* this shouldn't happen */ - elog(ERROR, "no NOT NULL constraint found to drop"); + ObjectAddressSubSet(address, RelationRelationId, + RelationGetRelid(rel), attnum); } + else + address = InvalidObjectAddress; - dropconstraint_internal(rel, conTup, DROP_RESTRICT, recurse, false, - false, NULL, lockmode); - - heap_freetuple(conTup); + InvokeObjectPostAlterHook(RelationRelationId, + RelationGetRelid(rel), attnum); table_close(attr_rel, RowExclusiveLock); @@ -7639,126 +7451,102 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse, } /* - * Helper to set pg_attribute.attnotnull if it isn't set, and to tell phase 3 - * to verify it; recurses to apply the same to children. - * - * When called to alter an existing table, 'wqueue' must be given so that we can - * queue a check that existing tuples pass the constraint. When called from - * table creation, 'wqueue' should be passed as NULL. + * ALTER TABLE ALTER COLUMN SET NOT NULL */ + static void -set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum, bool recurse, - LOCKMODE lockmode) +ATPrepSetNotNull(List **wqueue, Relation rel, + AlterTableCmd *cmd, bool recurse, bool recursing, + LOCKMODE lockmode, AlterTableUtilityContext *context) { - HeapTuple tuple; - Form_pg_attribute attForm; - List *children; - ListCell *lc; + /* + * If we're already recursing, there's nothing to do; the topmost + * invocation of ATSimpleRecursion already visited all children. + */ + if (recursing) + return; - tuple = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum); - if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for attribute %d of relation %u", - attnum, RelationGetRelid(rel)); - attForm = (Form_pg_attribute) GETSTRUCT(tuple); - if (!attForm->attnotnull) + /* + * If the target column is already marked NOT NULL, we can skip recursing + * to children, because their columns should already be marked NOT NULL as + * well. But there's no point in checking here unless the relation has + * some children; else we can just wait till execution to check. (If it + * does have children, however, this can save taking per-child locks + * unnecessarily. This greatly improves concurrency in some parallel + * restore scenarios.) + * + * Unfortunately, we can only apply this optimization to partitioned + * tables, because traditional inheritance doesn't enforce that child + * columns be NOT NULL when their parent is. (That's a bug that should + * get fixed someday.) + */ + if (rel->rd_rel->relhassubclass && + rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { - Relation attr_rel; - - attr_rel = table_open(AttributeRelationId, RowExclusiveLock); - - attForm->attnotnull = true; - CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); + HeapTuple tuple; + bool attnotnull; - table_close(attr_rel, RowExclusiveLock); + tuple = SearchSysCacheAttName(RelationGetRelid(rel), cmd->name); - /* - * And set up for existing values to be checked, unless another constraint - * already proves this. - */ - if (wqueue && !NotNullImpliedByRelConstraints(rel, attForm)) - { - AlteredTableInfo *tab; + /* Might as well throw the error now, if name is bad */ + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + cmd->name, RelationGetRelationName(rel)))); - tab = ATGetQueueEntry(wqueue, rel); - tab->verify_new_notnull = true; - } + attnotnull = ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull; + ReleaseSysCache(tuple); + if (attnotnull) + return; } - /* if no recursion is desired, we're done */ - if (!recurse) - return; - - children = find_inheritance_children(RelationGetRelid(rel), lockmode); - foreach(lc, children) + /* + * If we have ALTER TABLE ONLY ... SET NOT NULL on a partitioned table, + * apply ALTER TABLE ... CHECK NOT NULL to every child. Otherwise, use + * normal recursion logic. + */ + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && + !recurse) { - Oid childrelid = lfirst_oid(lc); - Relation childrel; - AttrNumber childattno; - - /* find_inheritance_children already got lock */ - childrel = table_open(childrelid, NoLock); - CheckTableNotInUse(childrel, "ALTER TABLE"); + AlterTableCmd *newcmd = makeNode(AlterTableCmd); - childattno = get_attnum(RelationGetRelid(childrel), - get_attname(RelationGetRelid(rel), attnum, - false)); - set_attnotnull(wqueue, childrel, childattno, - recurse, lockmode); - table_close(childrel, NoLock); + newcmd->subtype = AT_CheckNotNull; + newcmd->name = pstrdup(cmd->name); + ATSimpleRecursion(wqueue, rel, newcmd, true, lockmode, context); } + else + ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); } /* - * ALTER TABLE ALTER COLUMN SET NOT NULL - * - * Add a NOT NULL constraint to a single table and its children. Returns - * the address of the constraint added to the parent relation, if one gets - * added, or InvalidObjectAddress otherwise. - * - * We must recurse to child tables during execution, rather than using - * ALTER TABLE's normal prep-time recursion. + * Return the address of the modified column. If the column was already NOT + * NULL, InvalidObjectAddress is returned. */ static ObjectAddress -ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName, - bool recurse, bool recursing, List **readyRels, - LOCKMODE lockmode) +ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, + const char *colName, LOCKMODE lockmode) { HeapTuple tuple; - Relation constr_rel; - ScanKeyData skey; - SysScanDesc conscan; AttrNumber attnum; + Relation attr_rel; ObjectAddress address; - Constraint *constraint; - CookedConstraint *ccon; - List *cooked; - List *ready = NIL; /* - * In cases of multiple inheritance, we might visit the same child more - * than once. In the topmost call, set up a list that we fill with all - * visited relations, to skip those. + * lookup the attribute */ - if (readyRels == NULL) - readyRels = &ready; - if (list_member_oid(*readyRels, RelationGetRelid(rel))) - return InvalidObjectAddress; - *readyRels = lappend_oid(*readyRels, RelationGetRelid(rel)); + attr_rel = table_open(AttributeRelationId, RowExclusiveLock); - /* At top level, permission check was done in ATPrepCmd, else do it */ - if (recursing) - { - ATSimplePermissions(AT_AddConstraint, rel, ATT_TABLE | ATT_FOREIGN_TABLE); - Assert(conName != NULL); - } + tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); - attnum = get_attnum(RelationGetRelid(rel), colName); - if (attnum == InvalidAttrNumber) + if (!HeapTupleIsValid(tuple)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("column \"%s\" of relation \"%s\" does not exist", colName, RelationGetRelationName(rel)))); + attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum; + /* Prevent them from altering a system attribute */ if (attnum <= 0) ereport(ERROR, @@ -7766,168 +7554,43 @@ ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName, errmsg("cannot alter system column \"%s\"", colName))); - /* See if there's already a constraint */ - constr_rel = table_open(ConstraintRelationId, RowExclusiveLock); - ScanKeyInit(&skey, - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(rel))); - conscan = systable_beginscan(constr_rel, ConstraintRelidTypidNameIndexId, true, - NULL, 1, &skey); - - while (HeapTupleIsValid(tuple = systable_getnext(conscan))) + /* + * Okay, actually perform the catalog change ... if needed + */ + if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull) { - Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(tuple); - bool changed = false; - HeapTuple copytup; + ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = true; - if (conForm->contype != CONSTRAINT_NOTNULL) - continue; - - if (extractNotNullColumn(tuple) != attnum) - continue; - - copytup = heap_copytuple(tuple); - conForm = (Form_pg_constraint) GETSTRUCT(copytup); + CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); /* - * If we find an appropriate constraint, we're almost done, but just - * need to change some properties on it: if we're recursing, increment - * coninhcount; if not, set conislocal if not already set. + * Ordinarily phase 3 must ensure that no NULLs exist in columns that + * are set NOT NULL; however, if we can find a constraint which proves + * this then we can skip that. We needn't bother looking if we've + * already found that we must verify some other NOT NULL constraint. */ - if (recursing) - { - conForm->coninhcount++; - changed = true; - } - else if (!conForm->conislocal) + if (!tab->verify_new_notnull && + !NotNullImpliedByRelConstraints(rel, (Form_pg_attribute) GETSTRUCT(tuple))) { - conForm->conislocal = true; - changed = true; - } - - if (changed) - { - CatalogTupleUpdate(constr_rel, ©tup->t_self, copytup); - ObjectAddressSet(address, ConstraintRelationId, conForm->oid); + /* Tell Phase 3 it needs to test the constraint */ + tab->verify_new_notnull = true; } - systable_endscan(conscan); - table_close(constr_rel, RowExclusiveLock); - - if (changed) - return address; - else - return InvalidObjectAddress; - } - - systable_endscan(conscan); - table_close(constr_rel, RowExclusiveLock); - - /* - * If we're asked not to recurse, and children exist, raise an error. - */ - if (!recurse && - find_inheritance_children(RelationGetRelid(rel), - NoLock) != NIL) - { - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot add constraint to only the partitioned table when partitions exist"), - errhint("Do not specify the ONLY keyword.")); - else - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot add constraint only to table with inheritance children"), - errhint("Do not specify the ONLY keyword.")); - } - - /* - * No constraint exists; we must add one. First determine a name to use, - * if we haven't already. - */ - if (!recursing) - { - Assert(conName == NULL); - conName = ChooseConstraintName(RelationGetRelationName(rel), - colName, "not_null", - RelationGetNamespace(rel), - NIL); + ObjectAddressSubSet(address, RelationRelationId, + RelationGetRelid(rel), attnum); } - constraint = makeNode(Constraint); - constraint->contype = CONSTR_NOTNULL; - constraint->conname = conName; - constraint->deferrable = false; - constraint->initdeferred = false; - constraint->location = -1; - constraint->colname = colName; - constraint->skip_validation = false; - constraint->initially_valid = true; - - /* and do it */ - cooked = AddRelationNewConstraints(rel, NIL, list_make1(constraint), - false, !recursing, false, NULL); - ccon = linitial(cooked); - ObjectAddressSet(address, ConstraintRelationId, ccon->conoid); - - /* - * Mark pg_attribute.attnotnull for the column. Tell that function not to - * recurse, because we're going to do it here. - */ - set_attnotnull(wqueue, rel, attnum, false, lockmode); - - /* - * Recurse to propagate the constraint to children that don't have one. - */ - if (recurse) - { - List *children; - ListCell *lc; - - children = find_inheritance_children(RelationGetRelid(rel), - lockmode); - - foreach(lc, children) - { - Relation childrel; - - childrel = table_open(lfirst_oid(lc), NoLock); + else + address = InvalidObjectAddress; - ATExecSetNotNull(wqueue, childrel, - conName, colName, recurse, true, - readyRels, lockmode); + InvokeObjectPostAlterHook(RelationRelationId, + RelationGetRelid(rel), attnum); - table_close(childrel, NoLock); - } - } + table_close(attr_rel, RowExclusiveLock); return address; } /* - * ALTER TABLE ALTER COLUMN SET ATTNOTNULL - * - * This doesn't exist in the grammar; it's used when creating a - * primary key and the column is not already marked attnotnull. - */ -static void -ATExecSetAttNotNull(List **wqueue, Relation rel, - const char *colName, LOCKMODE lockmode) -{ - AttrNumber attnum; - - attnum = get_attnum(RelationGetRelid(rel), colName); - if (attnum == InvalidAttrNumber) /* XXX should not happen .. elog? */ - ereport(ERROR, - errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" of relation \"%s\" does not exist", - colName, RelationGetRelationName(rel))); - - set_attnotnull(wqueue, rel, attnum, false, lockmode); -} - -/* * ALTER TABLE ALTER COLUMN CHECK NOT NULL * * This doesn't exist in the grammar, but we generate AT_CheckNotNull @@ -9209,14 +8872,13 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, Assert(IsA(newConstraint, Constraint)); /* - * Currently, we only expect to see CONSTR_CHECK, CONSTR_NOTNULL and - * CONSTR_FOREIGN nodes arriving here (see the preprocessing done in - * parse_utilcmd.c). + * Currently, we only expect to see CONSTR_CHECK and CONSTR_FOREIGN nodes + * arriving here (see the preprocessing done in parse_utilcmd.c). Use a + * switch anyway to make it easier to add more code later. */ switch (newConstraint->contype) { case CONSTR_CHECK: - case CONSTR_NOTNULL: address = ATAddCheckConstraint(wqueue, tab, rel, newConstraint, recurse, false, is_readd, @@ -9301,9 +8963,9 @@ ChooseForeignKeyConstraintNameAddition(List *colnames) } /* - * Add a check or NOT NULL constraint to a single table and its children. - * Returns the address of the constraint added to the parent relation, - * if one gets added, or InvalidObjectAddress otherwise. + * Add a check constraint to a single table and its children. Returns the + * address of the constraint added to the parent relation, if one gets added, + * or InvalidObjectAddress otherwise. * * Subroutine for ATExecAddConstraint. * @@ -9356,7 +9018,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, { CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); - if (!ccon->skip_validation && ccon->contype != CONSTR_NOTNULL) + if (!ccon->skip_validation) { NewConstraint *newcon; @@ -9372,14 +9034,6 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, if (constr->conname == NULL) constr->conname = ccon->name; - /* - * If adding a NOT NULL constraint, set the pg_attribute flag and - * tell phase 3 to verify existing rows, if needed. - */ - if (constr->contype == CONSTR_NOTNULL) - set_attnotnull(wqueue, rel, ccon->attnum, - !ccon->is_no_inherit, lockmode); - ObjectAddressSet(address, ConstraintRelationId, ccon->conoid); } @@ -12304,11 +11958,16 @@ ATExecDropConstraint(Relation rel, const char *constrName, bool recurse, bool recursing, bool missing_ok, LOCKMODE lockmode) { + List *children; + ListCell *child; Relation conrel; + Form_pg_constraint con; SysScanDesc scan; ScanKeyData skey[3]; HeapTuple tuple; bool found = false; + bool is_no_inherit_constraint = false; + char contype; /* At top level, permission check was done in ATPrepCmd, else do it */ if (recursing) @@ -12337,258 +11996,80 @@ ATExecDropConstraint(Relation rel, const char *constrName, /* There can be at most one matching row */ if (HeapTupleIsValid(tuple = systable_getnext(scan))) { - dropconstraint_internal(rel, tuple, behavior, recurse, recursing, - missing_ok, NULL, lockmode); - found = true; - } + ObjectAddress conobj; - systable_endscan(scan); + con = (Form_pg_constraint) GETSTRUCT(tuple); - if (!found) - { - if (!missing_ok) + /* Don't drop inherited constraints */ + if (con->coninhcount > 0 && !recursing) ereport(ERROR, - errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" of relation \"%s\" does not exist", - constrName, RelationGetRelationName(rel))); - else - ereport(NOTICE, - errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping", - constrName, RelationGetRelationName(rel))); - } - - table_close(conrel, RowExclusiveLock); -} - -/* - * Remove a constraint, using its pg_constraint tuple - * - * Implementation for ALTER TABLE DROP CONSTRAINT and ALTER TABLE ALTER COLUMN - * DROP NOT NULL. - * - * Returns the address of the constraint being removed. - */ -static ObjectAddress -dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior behavior, - bool recurse, bool recursing, bool missing_ok, List **readyRels, - LOCKMODE lockmode) -{ - Relation conrel; - Form_pg_constraint con; - ObjectAddress conobj; - List *children; - ListCell *child; - bool is_no_inherit_constraint = false; - bool dropping_pk = false; - char *constrName; - List *unconstrained_cols = NIL; - char *colname; /* to match NOT NULL constraints when recursing */ - List *ready = NIL; - - if (readyRels == NULL) - readyRels = &ready; - if (list_member_oid(*readyRels, RelationGetRelid(rel))) - return InvalidObjectAddress; - *readyRels = lappend_oid(*readyRels, RelationGetRelid(rel)); - - conrel = table_open(ConstraintRelationId, RowExclusiveLock); - - con = (Form_pg_constraint) GETSTRUCT(constraintTup); - constrName = NameStr(con->conname); - - /* - * If the constraint is marked conislocal and is also inherited, then we - * just set conislocal false and we're done. The constraint doesn't go - * away, and we don't modify any children. - */ - if (con->conislocal && con->coninhcount > 0) - { - HeapTuple copytup; - - /* make a copy we can scribble on */ - copytup = heap_copytuple(constraintTup); - con = (Form_pg_constraint) GETSTRUCT(copytup); - con->conislocal = false; - CatalogTupleUpdate(conrel, ©tup->t_self, copytup); - - table_close(conrel, RowExclusiveLock); - - ObjectAddressSet(conobj, ConstraintRelationId, con->oid); - return conobj; - } - - /* Don't drop inherited constraints */ - if (con->coninhcount > 0 && !recursing) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"", - constrName, RelationGetRelationName(rel)))); - - /* - * See if we have a NOT NULL constraint or a PRIMARY KEY. If so, we have - * more checks and actions below, so obtain the list of columns that are - * constrained by the constraint being dropped. - */ - if (con->contype == CONSTRAINT_NOTNULL) - { - AttrNumber colnum = extractNotNullColumn(constraintTup); - - if (colnum != InvalidAttrNumber) - unconstrained_cols = list_make1_int(colnum); - } - else if (con->contype == CONSTRAINT_PRIMARY) - { - Datum adatum; - ArrayType *arr; - int numkeys; - bool isNull; - int16 *attnums; + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"", + constrName, RelationGetRelationName(rel)))); - dropping_pk = true; + is_no_inherit_constraint = con->connoinherit; + contype = con->contype; - adatum = heap_getattr(constraintTup, Anum_pg_constraint_conkey, - RelationGetDescr(conrel), &isNull); - if (isNull) - elog(ERROR, "null conkey for constraint %u", con->oid); - arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ - numkeys = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - numkeys < 0 || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != INT2OID) - elog(ERROR, "conkey is not a 1-D smallint array"); - attnums = (int16 *) ARR_DATA_PTR(arr); + /* + * If it's a foreign-key constraint, we'd better lock the referenced + * table and check that that's not in use, just as we've already done + * for the constrained table (else we might, eg, be dropping a trigger + * that has unfired events). But we can/must skip that in the + * self-referential case. + */ + if (contype == CONSTRAINT_FOREIGN && + con->confrelid != RelationGetRelid(rel)) + { + Relation frel; - for (int i = 0; i < numkeys; i++) - unconstrained_cols = lappend_int(unconstrained_cols, attnums[i]); - } + /* Must match lock taken by RemoveTriggerById: */ + frel = table_open(con->confrelid, AccessExclusiveLock); + CheckTableNotInUse(frel, "ALTER TABLE"); + table_close(frel, NoLock); + } - is_no_inherit_constraint = con->connoinherit; + /* + * Perform the actual constraint deletion + */ + conobj.classId = ConstraintRelationId; + conobj.objectId = con->oid; + conobj.objectSubId = 0; - /* - * If it's a foreign-key constraint, we'd better lock the referenced table - * and check that that's not in use, just as we've already done for the - * constrained table (else we might, eg, be dropping a trigger that has - * unfired events). But we can/must skip that in the self-referential - * case. - */ - if (con->contype == CONSTRAINT_FOREIGN && - con->confrelid != RelationGetRelid(rel)) - { - Relation frel; + performDeletion(&conobj, behavior, 0); - /* Must match lock taken by RemoveTriggerById: */ - frel = table_open(con->confrelid, AccessExclusiveLock); - CheckTableNotInUse(frel, "ALTER TABLE"); - table_close(frel, NoLock); + found = true; } - /* - * Perform the actual constraint deletion - */ - ObjectAddressSet(conobj, ConstraintRelationId, con->oid); - performDeletion(&conobj, behavior, 0); + systable_endscan(scan); - /* - * If this was a NOT NULL or the primary key, the constrained columns must - * have had pg_attribute.attnotnull set. See if we need to reset it, and - * do so. - */ - if (unconstrained_cols) + if (!found) { - Relation attrel; - Bitmapset *pkcols; - Bitmapset *ircols; - ListCell *lc; - - /* Make the above deletion visible */ - CommandCounterIncrement(); - - attrel = table_open(AttributeRelationId, RowExclusiveLock); - - /* - * We want to test columns for their presence in the primary key, but - * only if we're not dropping it. - */ - pkcols = dropping_pk ? NULL : - RelationGetIndexAttrBitmap(rel, - INDEX_ATTR_BITMAP_PRIMARY_KEY); - ircols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY); - - foreach(lc, unconstrained_cols) + if (!missing_ok) { - AttrNumber attnum = lfirst_int(lc); - HeapTuple atttup; - HeapTuple contup; - Form_pg_attribute attForm; - - /* - * Obtain pg_attribute tuple and verify conditions on it. We use - * a copy we can scribble on. - */ - atttup = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum); - if (!HeapTupleIsValid(atttup)) - elog(ERROR, "cache lookup failed for column %d", attnum); - attForm = (Form_pg_attribute) GETSTRUCT(atttup); - - /* - * Since the above deletion has been made visible, we can now - * search for any remaining constraints on this column (or these - * columns, in the case we're dropping a multicol primary key.) - * Then, verify whether any further NOT NULL or primary key - * exists, and reset attnotnull if none. - * - * However, if this is a generated identity column, abort the - * whole thing with a specific error message, because the - * constraint is required in that case. - */ - contup = findNotNullConstraintAttnum(rel, attnum); - if (contup || - bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, - pkcols)) - continue; - - /* - * It's not valid to drop the last NOT NULL constraint for a - * GENERATED AS IDENTITY column. - */ - if (attForm->attidentity) - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("column \"%s\" of relation \"%s\" is an identity column", - get_attname(RelationGetRelid(rel), attnum, - false), - RelationGetRelationName(rel))); - - /* - * It's not valid to drop the last NOT NULL constraint for the - * replica identity either. XXX make exception for FULL? - */ - if (bms_is_member(lfirst_int(lc) - FirstLowInvalidHeapAttributeNumber, ircols)) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("column \"%s\" is in index used as replica identity", - get_attname(RelationGetRelid(rel), lfirst_int(lc), false))); - - /* Reset attnotnull */ - if (attForm->attnotnull) - { - attForm->attnotnull = false; - CatalogTupleUpdate(attrel, &atttup->t_self, atttup); - } + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, RelationGetRelationName(rel)))); + } + else + { + ereport(NOTICE, + (errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping", + constrName, RelationGetRelationName(rel)))); + table_close(conrel, RowExclusiveLock); + return; } - table_close(attrel, RowExclusiveLock); } /* * For partitioned tables, non-CHECK inherited constraints are dropped via * the dependency mechanism, so we're done here. */ - if (con->contype != CONSTRAINT_CHECK && + if (contype != CONSTRAINT_CHECK && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { table_close(conrel, RowExclusiveLock); - return conobj; + return; } /* @@ -12613,104 +12094,50 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha errmsg("cannot remove constraint from only the partitioned table when partitions exist"), errhint("Do not specify the ONLY keyword."))); - /* For NOT NULL constraints we recurse by column name */ - if (con->contype == CONSTRAINT_NOTNULL) - colname = NameStr(TupleDescAttr(RelationGetDescr(rel), - linitial_int(unconstrained_cols) - 1)->attname); - else - colname = NULL; /* keep compiler quiet */ - foreach(child, children) { Oid childrelid = lfirst_oid(child); Relation childrel; - HeapTuple tuple; - Form_pg_constraint childcon; HeapTuple copy_tuple; - SysScanDesc scan; - ScanKeyData skey[3]; - - if (list_member_oid(*readyRels, childrelid)) - continue; /* child already processed */ /* find_inheritance_children already got lock */ childrel = table_open(childrelid, NoLock); CheckTableNotInUse(childrel, "ALTER TABLE"); - /* - * We search for NOT NULL constraint by column number, and other - * constraints by name. - */ - if (con->contype == CONSTRAINT_NOTNULL) - { - bool found = false; - AttrNumber child_colnum; - HeapTuple child_tup; - - child_colnum = get_attnum(RelationGetRelid(childrel), colname); - ScanKeyInit(&skey[0], - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(childrelid)); - scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, - true, NULL, 1, skey); - while (HeapTupleIsValid(child_tup = systable_getnext(scan))) - { - Form_pg_constraint constr = (Form_pg_constraint) GETSTRUCT(child_tup); - AttrNumber constr_colnum; - - if (constr->contype != CONSTRAINT_NOTNULL) - continue; - constr_colnum = extractNotNullColumn(child_tup); - if (constr_colnum != child_colnum) - continue; + ScanKeyInit(&skey[0], + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(childrelid)); + ScanKeyInit(&skey[1], + Anum_pg_constraint_contypid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(InvalidOid)); + ScanKeyInit(&skey[2], + Anum_pg_constraint_conname, + BTEqualStrategyNumber, F_NAMEEQ, + CStringGetDatum(constrName)); + scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, + true, NULL, 3, skey); + + /* There can be at most one matching row */ + if (!HeapTupleIsValid(tuple = systable_getnext(scan))) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, + RelationGetRelationName(childrel)))); - found = true; - break; /* found it */ - } - if (!found) /* shouldn't happen? */ - elog(ERROR, "failed to find NOT NULL constraint for column \"%s\" in table \"%s\"", - colname, RelationGetRelationName(childrel)); + copy_tuple = heap_copytuple(tuple); - copy_tuple = heap_copytuple(child_tup); - systable_endscan(scan); - } - else - { - ScanKeyInit(&skey[0], - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(childrelid)); - ScanKeyInit(&skey[1], - Anum_pg_constraint_contypid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(InvalidOid)); - ScanKeyInit(&skey[2], - Anum_pg_constraint_conname, - BTEqualStrategyNumber, F_NAMEEQ, - CStringGetDatum(constrName)); - scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, - true, NULL, 3, skey); - /* There can only be one, so no need to loop */ - tuple = systable_getnext(scan); - if (!HeapTupleIsValid(tuple)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" of relation \"%s\" does not exist", - constrName, - RelationGetRelationName(childrel)))); - copy_tuple = heap_copytuple(tuple); - systable_endscan(scan); - } + systable_endscan(scan); - childcon = (Form_pg_constraint) GETSTRUCT(copy_tuple); + con = (Form_pg_constraint) GETSTRUCT(copy_tuple); - /* Right now only CHECK and NOT NULL constraints can be inherited */ - if (childcon->contype != CONSTRAINT_CHECK && - childcon->contype != CONSTRAINT_NOTNULL) - elog(ERROR, "inherited constraint is not a CHECK or NOT NULL constraint"); + /* Right now only CHECK constraints can be inherited */ + if (con->contype != CONSTRAINT_CHECK) + elog(ERROR, "inherited constraint is not a CHECK constraint"); - if (childcon->coninhcount <= 0) /* shouldn't happen */ + if (con->coninhcount <= 0) /* shouldn't happen */ elog(ERROR, "relation %u has non-inherited constraint \"%s\"", childrelid, constrName); @@ -12720,17 +12147,17 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha * If the child constraint has other definition sources, just * decrement its inheritance count; if not, recurse to delete it. */ - if (childcon->coninhcount == 1 && !childcon->conislocal) + if (con->coninhcount == 1 && !con->conislocal) { /* Time to delete this child constraint, too */ - dropconstraint_internal(childrel, copy_tuple, behavior, - recurse, true, missing_ok, readyRels, - lockmode); + ATExecDropConstraint(childrel, constrName, behavior, + true, true, + false, lockmode); } else { /* Child constraint must survive my deletion */ - childcon->coninhcount--; + con->coninhcount--; CatalogTupleUpdate(conrel, ©_tuple->t_self, copy_tuple); /* Make update visible */ @@ -12744,8 +12171,8 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha * need to mark the inheritors' constraints as locally defined * rather than inherited. */ - childcon->coninhcount--; - childcon->conislocal = true; + con->coninhcount--; + con->conislocal = true; CatalogTupleUpdate(conrel, ©_tuple->t_self, copy_tuple); @@ -12759,8 +12186,6 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha } table_close(conrel, RowExclusiveLock); - - return conobj; } /* @@ -14086,10 +13511,10 @@ ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, char *cmd, NIL, con->conname); } - else if (cmd->subtype == AT_SetAttNotNull) + else if (cmd->subtype == AT_SetNotNull) { /* - * The parser will create AT_AttSetNotNull subcommands for + * The parser will create AT_SetNotNull subcommands for * columns of PRIMARY KEY indexes/constraints, but we need * not do anything with them here, because the columns' * NOT NULL marks will already have been propagated into @@ -15833,7 +15258,6 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) SysScanDesc parent_scan; ScanKeyData parent_key; HeapTuple parent_tuple; - Oid parent_relid = RelationGetRelid(parent_rel); bool child_is_partition = false; catalog_relation = table_open(ConstraintRelationId, RowExclusiveLock); @@ -15847,7 +15271,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) ScanKeyInit(&parent_key, Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(parent_relid)); + ObjectIdGetDatum(RelationGetRelid(parent_rel))); parent_scan = systable_beginscan(catalog_relation, ConstraintRelidTypidNameIndexId, true, NULL, 1, &parent_key); @@ -15859,8 +15283,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) HeapTuple child_tuple; bool found = false; - if (parent_con->contype != CONSTRAINT_CHECK && - parent_con->contype != CONSTRAINT_NOTNULL) + if (parent_con->contype != CONSTRAINT_CHECK) continue; /* if the parent's constraint is marked NO INHERIT, it's not inherited */ @@ -15880,30 +15303,14 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) Form_pg_constraint child_con = (Form_pg_constraint) GETSTRUCT(child_tuple); HeapTuple child_copy; - if (child_con->contype != parent_con->contype) + if (child_con->contype != CONSTRAINT_CHECK) continue; - /* - * CHECK constraint are matched by name, NOT NULL ones by - * attribute number - */ - if (child_con->contype == CONSTRAINT_CHECK && - strcmp(NameStr(parent_con->conname), + if (strcmp(NameStr(parent_con->conname), NameStr(child_con->conname)) != 0) continue; - else if (child_con->contype == CONSTRAINT_NOTNULL) - { - AttrNumber parent_attno = extractNotNullColumn(parent_tuple); - AttrNumber child_attno = extractNotNullColumn(child_tuple); - - if (strcmp(get_attname(parent_relid, parent_attno, false), - get_attname(RelationGetRelid(child_rel), child_attno, - false)) != 0) - continue; - } - if (child_con->contype == CONSTRAINT_CHECK && - !constraints_equivalent(parent_tuple, child_tuple, tuple_desc)) + if (!constraints_equivalent(parent_tuple, child_tuple, tuple_desc)) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("child table \"%s\" has different definition for check constraint \"%s\"", @@ -16111,7 +15518,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) HeapTuple attributeTuple, constraintTuple; List *connames; - List *nncolumns; bool found; bool child_is_partition = false; @@ -16182,8 +15588,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) * this, we first need a list of the names of the parent's check * constraints. (We cheat a bit by only checking for name matches, * assuming that the expressions will match.) - * - * For NOT NULL columns, we store column numbers to match. */ catalogRelation = table_open(ConstraintRelationId, RowExclusiveLock); ScanKeyInit(&key[0], @@ -16194,7 +15598,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) true, NULL, 1, key); connames = NIL; - nncolumns = NIL; while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) { @@ -16202,8 +15605,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) if (con->contype == CONSTRAINT_CHECK) connames = lappend(connames, pstrdup(NameStr(con->conname))); - if (con->contype == CONSTRAINT_NOTNULL) - nncolumns = lappend_int(nncolumns, extractNotNullColumn(constraintTuple)); } systable_endscan(scan); @@ -16219,40 +15620,21 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) { Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple); - bool match = false; + bool match; ListCell *lc; - /* - * Match CHECK constraints by name, NOT NULL constraints by column - * number, and ignore all others. - */ - if (con->contype == CONSTRAINT_CHECK) - { - foreach(lc, connames) - { - if (con->contype == CONSTRAINT_CHECK && - strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0) - { - match = true; - break; - } - } - } - else if (con->contype == CONSTRAINT_NOTNULL) - { - AttrNumber child_attno = extractNotNullColumn(constraintTuple); + if (con->contype != CONSTRAINT_CHECK) + continue; - foreach(lc, nncolumns) + match = false; + foreach(lc, connames) + { + if (strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0) { - if (lfirst_int(lc) == child_attno) - { - match = true; - break; - } + match = true; + break; } } - else - continue; if (match) { @@ -18543,7 +17925,7 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, StorePartitionBound(attachrel, rel, cmd->bound); /* Ensure there exists a correct set of indexes in the partition. */ - AttachPartitionEnsureIndexes(wqueue, rel, attachrel); + AttachPartitionEnsureIndexes(rel, attachrel); /* and triggers */ CloneRowTriggersToPartition(rel, attachrel); @@ -18656,12 +18038,13 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, * partitioned table. */ static void -AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) +AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) { List *idxes; List *attachRelIdxs; Relation *attachrelIdxRels; IndexInfo **attachInfos; + int i; ListCell *cell; MemoryContext cxt; MemoryContext oldcxt; @@ -18677,13 +18060,14 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) attachInfos = palloc(sizeof(IndexInfo *) * list_length(attachRelIdxs)); /* Build arrays of all existing indexes and their IndexInfos */ + i = 0; foreach(cell, attachRelIdxs) { Oid cldIdxId = lfirst_oid(cell); - int i = foreach_current_index(cell); attachrelIdxRels[i] = index_open(cldIdxId, AccessShareLock); attachInfos[i] = BuildIndexInfo(attachrelIdxRels[i]); + i++; } /* @@ -18749,7 +18133,7 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) * the first matching, unattached one we find, if any, as partition of * the parent index. If we find one, we're done. */ - for (int i = 0; i < list_length(attachRelIdxs); i++) + for (i = 0; i < list_length(attachRelIdxs); i++) { Oid cldIdxId = RelationGetRelid(attachrelIdxRels[i]); Oid cldConstrOid = InvalidOid; @@ -18805,28 +18189,6 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) stmt = generateClonedIndexStmt(NULL, idxRel, attmap, &conOid); - - /* - * If the index is a primary key, mark all columns as NOT NULL if - * they aren't already. - */ - if (stmt->primary) - { - MemoryContextSwitchTo(oldcxt); - for (int j = 0; j < info->ii_NumIndexKeyAttrs; j++) - { - AttrNumber childattno; - - childattno = get_attnum(RelationGetRelid(attachrel), - get_attname(RelationGetRelid(rel), - info->ii_IndexAttrNumbers[j], - false)); - set_attnotnull(wqueue, attachrel, childattno, - true, AccessExclusiveLock); - } - MemoryContextSwitchTo(cxt); - } - DefineIndex(RelationGetRelid(attachrel), stmt, InvalidOid, RelationGetRelid(idxRel), conOid, @@ -18839,7 +18201,7 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) out: /* Clean up. */ - for (int i = 0; i < list_length(attachRelIdxs); i++) + for (i = 0; i < list_length(attachRelIdxs); i++) index_close(attachrelIdxRels[i], AccessShareLock); MemoryContextSwitchTo(oldcxt); MemoryContextDelete(cxt); @@ -19740,13 +19102,6 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name) RelationGetRelationName(partIdx)))); } - /* - * If it's a primary key, make sure the columns in the partition are - * NOT NULL. - */ - if (parentIdx->rd_index->indisprimary) - verifyPartitionIndexNotNull(childInfo, partTbl); - /* All good -- do it */ IndexSetParentIndex(partIdx, RelationGetRelid(parentIdx)); if (OidIsValid(constraintOid)) @@ -19884,29 +19239,6 @@ validatePartitionedIndex(Relation partedIdx, Relation partedTbl) } /* - * When attaching an index as a partition of a partitioned index which is a - * primary key, verify that all the columns in the partition are marked NOT - * NULL. - */ -static void -verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partition) -{ - for (int i = 0; i < iinfo->ii_NumIndexKeyAttrs; i++) - { - Form_pg_attribute att = TupleDescAttr(RelationGetDescr(partition), - iinfo->ii_IndexAttrNumbers[i] - 1); - - if (!att->attnotnull) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("invalid primary key definition"), - errdetail("Column \"%s\" of relation \"%s\" is not marked NOT NULL.", - NameStr(att->attname), - RelationGetRelationName(partition))); - } -} - -/* * Return an OID list of constraints that reference the given relation * that are marked as having a parent constraints. */ |