diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 1264 |
1 files changed, 827 insertions, 437 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index eaa81424270..ccd9645e7d2 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -371,7 +371,8 @@ static void truncate_check_activity(Relation rel); static void RangeVarCallbackForTruncate(const RangeVar *relation, Oid relId, Oid oldRelId, void *arg); static List *MergeAttributes(List *columns, const List *supers, char relpersistence, - bool is_partition, List **supconstr); + bool is_partition, List **supconstr, + List **supnotnulls); static List *MergeCheckConstraint(List *constraints, const char *name, Node *expr); static void MergeChildAttribute(List *inh_columns, int exist_attno, int newcol_attno, const ColumnDef *newdef); static ColumnDef *MergeInheritedAttribute(List *inh_columns, int exist_attno, const ColumnDef *newdef); @@ -456,15 +457,14 @@ static bool check_for_column_name_collision(Relation rel, const char *colname, bool if_not_exists); static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid); static void add_column_collation_dependency(Oid relid, int32 attnum, Oid collid); -static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode); -static void ATPrepSetNotNull(List **wqueue, Relation rel, - AlterTableCmd *cmd, bool recurse, bool recursing, - LOCKMODE lockmode, - AlterTableUtilityContext *context); -static ObjectAddress ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, - const char *colName, LOCKMODE lockmode); -static void ATExecCheckNotNull(AlteredTableInfo *tab, Relation rel, - const char *colName, LOCKMODE lockmode); +static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, bool recurse, + LOCKMODE lockmode); +static void set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum, + LOCKMODE lockmode); +static ObjectAddress ATExecSetNotNull(List **wqueue, Relation rel, + char *constrname, char *colName, + bool recurse, bool recursing, + LOCKMODE lockmode); static bool NotNullImpliedByRelConstraints(Relation rel, Form_pg_attribute attr); static bool ConstraintImpliedByRelConstraint(Relation scanrel, List *testConstraint, List *provenConstraint); @@ -496,6 +496,9 @@ static ObjectAddress ATExecDropColumn(List **wqueue, Relation rel, const char *c bool recurse, bool recursing, bool missing_ok, LOCKMODE lockmode, ObjectAddresses *addrs); +static void ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd, + bool recurse, LOCKMODE lockmode, + AlterTableUtilityContext *context); static ObjectAddress ATExecAddIndex(AlteredTableInfo *tab, Relation rel, IndexStmt *stmt, bool is_rebuild, LOCKMODE lockmode); static ObjectAddress ATExecAddStatistics(AlteredTableInfo *tab, Relation rel, @@ -507,11 +510,11 @@ static ObjectAddress ATExecAddConstraint(List **wqueue, static char *ChooseForeignKeyConstraintNameAddition(List *colnames); static ObjectAddress ATExecAddIndexConstraint(AlteredTableInfo *tab, Relation rel, IndexStmt *stmt, LOCKMODE lockmode); -static ObjectAddress ATAddCheckConstraint(List **wqueue, - AlteredTableInfo *tab, Relation rel, - Constraint *constr, - bool recurse, bool recursing, bool is_readd, - LOCKMODE lockmode); +static ObjectAddress ATAddCheckNNConstraint(List **wqueue, + AlteredTableInfo *tab, Relation rel, + Constraint *constr, + bool recurse, bool recursing, bool is_readd, + LOCKMODE lockmode); static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, Constraint *fkconstraint, bool recurse, bool recursing, @@ -577,9 +580,12 @@ static void GetForeignKeyCheckTriggers(Relation trigrel, Oid *insertTriggerOid, Oid *updateTriggerOid); static void ATExecDropConstraint(Relation rel, const char *constrName, - DropBehavior behavior, - bool recurse, bool recursing, + DropBehavior behavior, bool recurse, bool missing_ok, LOCKMODE lockmode); +static ObjectAddress dropconstraint_internal(Relation rel, + HeapTuple constraintTup, DropBehavior behavior, + bool recurse, bool recursing, + bool missing_ok, LOCKMODE lockmode); static void ATPrepAlterColumnType(List **wqueue, AlteredTableInfo *tab, Relation rel, bool recurse, bool recursing, @@ -677,6 +683,7 @@ static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, static void validatePartitionedIndex(Relation partedIdx, Relation partedTbl); static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl); +static void verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partIdx); static List *GetParentedForeignKeyRefs(Relation partition); static void ATDetachCheckNoForeignKeyRefs(Relation partition); static char GetAttributeCompression(Oid atttypid, const char *compression); @@ -714,8 +721,10 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, TupleDesc descriptor; List *inheritOids; List *old_constraints; + List *old_notnulls; List *rawDefaults; List *cookedDefaults; + List *nncols; Datum reloptions; ListCell *listptr; AttrNumber attnum; @@ -906,12 +915,13 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, MergeAttributes(stmt->tableElts, inheritOids, stmt->relation->relpersistence, stmt->partbound != NULL, - &old_constraints); + &old_constraints, &old_notnulls); /* * Create a tuple descriptor from the relation schema. Note that this - * deals with column names, types, and not-null constraints, but not - * default values or CHECK constraints; we handle those below. + * deals with column names, types, and in-descriptor NOT NULL flags, but + * not default values, NOT NULL or CHECK constraints; we handle those + * below. */ descriptor = BuildDescForRelation(stmt->tableElts); @@ -1283,6 +1293,17 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, AddRelationNewConstraints(rel, NIL, stmt->constraints, true, true, false, queryString); + /* + * Finally, merge the not-null constraints that are declared directly with + * those that come from parent relations (making sure to count inheritance + * appropriately for each), create them, and set the attnotnull flag on + * columns that don't yet have it. + */ + nncols = AddRelationNotNullConstraints(rel, stmt->nnconstraints, + old_notnulls); + foreach_int(attrnum, nncols) + set_attnotnull(NULL, rel, attrnum, NoLock); + ObjectAddressSet(address, RelationRelationId, relationId); /* @@ -2414,6 +2435,8 @@ storage_name(char c) * Output arguments: * 'supconstr' receives a list of constraints belonging to the parents, * updated as necessary to be valid for the child. + * 'supnotnulls' receives a list of CookedConstraints that corresponds to + * constraints coming from inheritance parents. * * Return value: * Completed schema list. @@ -2444,7 +2467,10 @@ storage_name(char c) * * Constraints (including not-null constraints) for the child table * are the union of all relevant constraints, from both the child schema - * and parent tables. + * and parent tables. In addition, in legacy inheritance, each column that + * appears in a primary key in any of the parents also gets a NOT NULL + * constraint (partitioning doesn't need this, because the PK itself gets + * inherited.) * * The default value for a child column is defined as: * (1) If the child schema specifies a default, that value is used. @@ -2463,10 +2489,11 @@ storage_name(char c) */ static List * MergeAttributes(List *columns, const List *supers, char relpersistence, - bool is_partition, List **supconstr) + bool is_partition, List **supconstr, List **supnotnulls) { List *inh_columns = NIL; List *constraints = NIL; + List *nnconstraints = NIL; bool have_bogus_defaults = false; int child_attno; static Node bogus_marker = {0}; /* marks conflicting defaults */ @@ -2577,8 +2604,10 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, AttrMap *newattmap; List *inherited_defaults; List *cols_with_defaults; + List *nnconstrs; ListCell *lc1; ListCell *lc2; + Bitmapset *nncols = NULL; /* caller already got lock */ relation = table_open(parent, NoLock); @@ -2666,6 +2695,15 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, /* We can't process inherited defaults until newattmap is complete. */ inherited_defaults = cols_with_defaults = NIL; + /* + * Request attnotnull on columns that have a not-null constraint + * that's not marked NO INHERIT. + */ + nnconstrs = RelationGetNotNullConstraints(RelationGetRelid(relation), + true, false); + foreach_ptr(CookedConstraint, cc, nnconstrs) + nncols = bms_add_member(nncols, cc->attnum); + for (AttrNumber parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) { @@ -2687,7 +2725,6 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, */ newdef = makeColumnDef(attributeName, attribute->atttypid, attribute->atttypmod, attribute->attcollation); - newdef->is_not_null = attribute->attnotnull; newdef->storage = attribute->attstorage; newdef->generated = attribute->attgenerated; if (CompressionMethodIsValid(attribute->attcompression)) @@ -2736,6 +2773,12 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, } /* + * mark attnotnull if parent has it + */ + if (bms_is_member(parent_attno, nncols)) + mergeddef->is_not_null = true; + + /* * Locate default/generation expression if any */ if (attribute->atthasdef) @@ -2846,6 +2889,19 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, } } + /* + * Also copy the not-null constraints from this parent. The + * attnotnull markings were already installed above. + */ + foreach_ptr(CookedConstraint, nn, nnconstrs) + { + Assert(nn->contype == CONSTR_NOTNULL); + + nn->attnum = newattmap->attnums[nn->attnum - 1]; + + nnconstraints = lappend(nnconstraints, nn); + } + free_attrmap(newattmap); /* @@ -2916,8 +2972,7 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, /* * Now that we have the column definition list for a partition, we can * check whether the columns referenced in the column constraint specs - * actually exist. Also, we merge parent's not-null constraints and - * defaults into each corresponding column definition. + * actually exist. Also, merge column defaults. */ if (is_partition) { @@ -2934,7 +2989,6 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, if (strcmp(coldef->colname, restdef->colname) == 0) { found = true; - coldef->is_not_null |= restdef->is_not_null; /* * Check for conflicts related to generated columns. @@ -3023,6 +3077,7 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, } *supconstr = constraints; + *supnotnulls = nnconstraints; return columns; } @@ -3304,11 +3359,6 @@ MergeInheritedAttribute(List *inh_columns, format_type_with_typemod(newtypeid, newtypmod)))); /* - * Merge of not-null constraints = OR 'em together - */ - prevdef->is_not_null |= newdef->is_not_null; - - /* * Must have the same collation */ prevcollid = GetColumnDefCollation(NULL, prevdef, prevtypeid); @@ -3946,7 +3996,10 @@ rename_constraint_internal(Oid myrelid, constraintOid); con = (Form_pg_constraint) GETSTRUCT(tuple); - if (myrelid && con->contype == CONSTRAINT_CHECK && !con->connoinherit) + if (myrelid && + (con->contype == CONSTRAINT_CHECK || + con->contype == CONSTRAINT_NOTNULL) && + !con->connoinherit) { if (recurse) { @@ -4704,15 +4757,6 @@ AlterTableGetLockLevel(List *cmds) cmd_lockmode = ShareUpdateExclusiveLock; break; - case AT_CheckNotNull: - - /* - * This only examines the table's schema; but lock must be - * strong enough to prevent concurrent DROP NOT NULL. - */ - cmd_lockmode = AccessShareLock; - break; - default: /* oops */ elog(ERROR, "unrecognized alter table type: %d", (int) cmd->subtype); @@ -4881,22 +4925,17 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_PARTITIONED_TABLE | ATT_FOREIGN_TABLE); - ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); + /* Set up recursion for phase 2; no other prep needed */ + if (recurse) + cmd->recurse = true; pass = AT_PASS_DROP; break; case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_PARTITIONED_TABLE | ATT_FOREIGN_TABLE); - /* Need command-specific recursion decision */ - ATPrepSetNotNull(wqueue, rel, cmd, recurse, recursing, - lockmode, context); - pass = AT_PASS_COL_ATTRS; - break; - case AT_CheckNotNull: /* check column is already marked NOT NULL */ - ATSimplePermissions(cmd->subtype, rel, - ATT_TABLE | ATT_PARTITIONED_TABLE | ATT_FOREIGN_TABLE); - ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); - /* No command-specific prep needed */ + /* Set up recursion for phase 2; no other prep needed */ + if (recurse) + cmd->recurse = true; pass = AT_PASS_COL_ATTRS; break; case AT_SetExpression: /* ALTER COLUMN SET EXPRESSION */ @@ -4961,10 +5000,13 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, case AT_AddConstraint: /* ADD CONSTRAINT */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_PARTITIONED_TABLE | ATT_FOREIGN_TABLE); - /* Recursion occurs during execution phase */ - /* No command-specific prep needed except saving recurse flag */ + ATPrepAddPrimaryKey(wqueue, rel, cmd, recurse, lockmode, context); if (recurse) + { + /* recurses at exec time; lock descendants and set flag */ + (void) find_all_inheritors(RelationGetRelid(rel), lockmode, NULL); cmd->recurse = true; + } pass = AT_PASS_ADD_CONSTR; break; case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */ @@ -5279,13 +5321,11 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, address = ATExecDropIdentity(rel, cmd->name, cmd->missing_ok, lockmode, cmd->recurse, false); break; case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ - address = ATExecDropNotNull(rel, cmd->name, lockmode); + address = ATExecDropNotNull(rel, cmd->name, cmd->recurse, lockmode); break; case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ - address = ATExecSetNotNull(tab, rel, cmd->name, lockmode); - break; - case AT_CheckNotNull: /* check column is already marked NOT NULL */ - ATExecCheckNotNull(tab, rel, cmd->name, lockmode); + address = ATExecSetNotNull(wqueue, rel, NULL, cmd->name, + cmd->recurse, false, lockmode); break; case AT_SetExpression: address = ATExecSetExpression(tab, rel, cmd->name, cmd->def, lockmode); @@ -5368,7 +5408,7 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, break; case AT_DropConstraint: /* DROP CONSTRAINT */ ATExecDropConstraint(rel, cmd->name, cmd->behavior, - cmd->recurse, false, + cmd->recurse, cmd->missing_ok, lockmode); break; case AT_AlterColumnType: /* ALTER COLUMN TYPE */ @@ -5631,21 +5671,10 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, */ switch (cmd2->subtype) { - case AT_SetNotNull: - /* Need command-specific recursion decision */ - ATPrepSetNotNull(wqueue, rel, cmd2, - recurse, false, - lockmode, context); - pass = AT_PASS_COL_ATTRS; - break; case AT_AddIndex: - /* This command never recurses */ - /* No command-specific prep needed */ pass = AT_PASS_ADD_INDEX; break; case AT_AddIndexConstraint: - /* This command never recurses */ - /* No command-specific prep needed */ pass = AT_PASS_ADD_INDEXCONSTR; break; case AT_AddConstraint: @@ -5654,6 +5683,9 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, cmd2->recurse = true; switch (castNode(Constraint, cmd2->def)->contype) { + case CONSTR_NOTNULL: + pass = AT_PASS_COL_ATTRS; + break; case CONSTR_PRIMARY: case CONSTR_UNIQUE: case CONSTR_EXCLUSION: @@ -6093,8 +6125,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) /* * If we are rebuilding the tuples OR if we added any new but not * verified not-null constraints, check all not-null constraints. This - * is a bit of overkill but it minimizes risk of bugs, and - * heap_attisnull is a pretty cheap test anyway. + * is a bit of overkill but it minimizes risk of bugs. */ for (i = 0; i < newTupDesc->natts; i++) { @@ -6314,6 +6345,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) RelationGetRelationName(oldrel)), errtableconstraint(oldrel, con->name))); break; + case CONSTR_NOTNULL: case CONSTR_FOREIGN: /* Nothing to do here */ break; @@ -6427,8 +6459,6 @@ alter_table_type_to_string(AlterTableType cmdtype) return "ALTER COLUMN ... SET EXPRESSION"; case AT_DropExpression: return "ALTER COLUMN ... DROP EXPRESSION"; - case AT_CheckNotNull: - return NULL; /* not real grammar */ case AT_SetStatistics: return "ALTER COLUMN ... SET STATISTICS"; case AT_SetOptions: @@ -7524,13 +7554,14 @@ add_column_collation_dependency(Oid relid, int32 attnum, Oid collid) * nullable, InvalidObjectAddress is returned. */ static ObjectAddress -ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) +ATExecDropNotNull(Relation rel, const char *colName, bool recurse, + LOCKMODE lockmode) { HeapTuple tuple; + HeapTuple conTup; Form_pg_attribute attTup; AttrNumber attnum; Relation attr_rel; - List *indexoidlist; ObjectAddress address; /* @@ -7546,6 +7577,15 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) colName, RelationGetRelationName(rel)))); attTup = (Form_pg_attribute) GETSTRUCT(tuple); attnum = attTup->attnum; + ObjectAddressSubSet(address, RelationRelationId, + RelationGetRelid(rel), attnum); + + /* If the column is already nullable there's nothing to do. */ + if (!attTup->attnotnull) + { + table_close(attr_rel, RowExclusiveLock); + return InvalidObjectAddress; + } /* Prevent them from altering a system attribute */ if (attnum <= 0) @@ -7561,60 +7601,8 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) colName, RelationGetRelationName(rel)))); /* - * Check that the attribute is not in a primary key or in an index used as - * a replica identity. - * - * Note: we'll throw error even if the pkey index is not valid. + * If rel is partition, shouldn't drop NOT NULL if parent has the same. */ - - /* Loop over all indexes on the relation */ - indexoidlist = RelationGetIndexList(rel); - - foreach_oid(indexoid, indexoidlist) - { - HeapTuple indexTuple; - Form_pg_index indexStruct; - - indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); - if (!HeapTupleIsValid(indexTuple)) - elog(ERROR, "cache lookup failed for index %u", indexoid); - indexStruct = (Form_pg_index) GETSTRUCT(indexTuple); - - /* - * If the index is not a primary key or an index used as replica - * identity, skip the check. - */ - if (indexStruct->indisprimary || indexStruct->indisreplident) - { - /* - * Loop over each attribute in the primary key or the index used - * as replica identity and see if it matches the to-be-altered - * attribute. - */ - for (int i = 0; i < indexStruct->indnkeyatts; i++) - { - if (indexStruct->indkey.values[i] == attnum) - { - if (indexStruct->indisprimary) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("column \"%s\" is in a primary key", - colName))); - else - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("column \"%s\" is in index used as replica identity", - colName))); - } - } - } - - ReleaseSysCache(indexTuple); - } - - list_free(indexoidlist); - - /* If rel is partition, shouldn't drop NOT NULL if parent has the same */ if (rel->rd_rel->relispartition) { Oid parentId = get_partition_parent(RelationGetRelid(rel), false); @@ -7632,19 +7620,18 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) } /* - * Okay, actually perform the catalog change ... if needed + * Find the constraint that makes this column NOT NULL, and drop it. + * dropconstraint_internal() resets attnotnull. */ - if (attTup->attnotnull) - { - attTup->attnotnull = false; + conTup = findNotNullConstraintAttnum(RelationGetRelid(rel), attnum); + if (conTup == NULL) + elog(ERROR, "cache lookup failed for not-null constraint on column \"%s\" of relation \"%s\"", + colName, RelationGetRelationName(rel)); - CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); - - ObjectAddressSubSet(address, RelationRelationId, - RelationGetRelid(rel), attnum); - } - else - address = InvalidObjectAddress; + /* The normal case: we have a pg_constraint row, remove it */ + dropconstraint_internal(rel, conTup, DROP_RESTRICT, recurse, false, + false, lockmode); + heap_freetuple(conTup); InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), attnum); @@ -7655,104 +7642,105 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) } /* - * ALTER TABLE ALTER COLUMN SET NOT NULL + * Helper to set pg_attribute.attnotnull if it isn't set, and to tell phase 3 + * to verify it. + * + * When called to alter an existing table, 'wqueue' must be given so that we + * can queue a check that existing tuples pass the constraint. When called + * from table creation, 'wqueue' should be passed as NULL. */ - static void -ATPrepSetNotNull(List **wqueue, Relation rel, - AlterTableCmd *cmd, bool recurse, bool recursing, - LOCKMODE lockmode, AlterTableUtilityContext *context) +set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum, + LOCKMODE lockmode) { + Form_pg_attribute attr; + + CheckAlterTableIsSafe(rel); + /* - * If we're already recursing, there's nothing to do; the topmost - * invocation of ATSimpleRecursion already visited all children. + * Exit quickly by testing attnotnull from the tupledesc's copy of the + * attribute. */ - if (recursing) + attr = TupleDescAttr(RelationGetDescr(rel), attnum - 1); + if (attr->attisdropped) return; - /* - * If the target column is already marked NOT NULL, we can skip recursing - * to children, because their columns should already be marked NOT NULL as - * well. But there's no point in checking here unless the relation has - * some children; else we can just wait till execution to check. (If it - * does have children, however, this can save taking per-child locks - * unnecessarily. This greatly improves concurrency in some parallel - * restore scenarios.) - * - * Unfortunately, we can only apply this optimization to partitioned - * tables, because traditional inheritance doesn't enforce that child - * columns be NOT NULL when their parent is. (That's a bug that should - * get fixed someday.) - */ - if (rel->rd_rel->relhassubclass && - rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + if (!attr->attnotnull) { + Relation attr_rel; HeapTuple tuple; - bool attnotnull; - tuple = SearchSysCacheAttName(RelationGetRelid(rel), cmd->name); + attr_rel = table_open(AttributeRelationId, RowExclusiveLock); - /* Might as well throw the error now, if name is bad */ + tuple = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum); if (!HeapTupleIsValid(tuple)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" of relation \"%s\" does not exist", - cmd->name, RelationGetRelationName(rel)))); + elog(ERROR, "cache lookup failed for attribute %d of relation %u", + attnum, RelationGetRelid(rel)); - attnotnull = ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull; - ReleaseSysCache(tuple); - if (attnotnull) - return; - } + attr = (Form_pg_attribute) GETSTRUCT(tuple); + Assert(!attr->attnotnull); + attr->attnotnull = true; + CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); - /* - * If we have ALTER TABLE ONLY ... SET NOT NULL on a partitioned table, - * apply ALTER TABLE ... CHECK NOT NULL to every child. Otherwise, use - * normal recursion logic. - */ - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && - !recurse) - { - AlterTableCmd *newcmd = makeNode(AlterTableCmd); + /* + * If the nullness isn't already proven by validated constraints, have + * ALTER TABLE phase 3 test for it. + */ + if (wqueue && !NotNullImpliedByRelConstraints(rel, attr)) + { + AlteredTableInfo *tab; + + tab = ATGetQueueEntry(wqueue, rel); + tab->verify_new_notnull = true; + } + + CommandCounterIncrement(); - newcmd->subtype = AT_CheckNotNull; - newcmd->name = pstrdup(cmd->name); - ATSimpleRecursion(wqueue, rel, newcmd, true, lockmode, context); + table_close(attr_rel, RowExclusiveLock); + heap_freetuple(tuple); } - else - ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); } /* - * Return the address of the modified column. If the column was already NOT - * NULL, InvalidObjectAddress is returned. + * ALTER TABLE ALTER COLUMN SET NOT NULL + * + * Add a not-null constraint to a single table and its children. Returns + * the address of the constraint added to the parent relation, if one gets + * added, or InvalidObjectAddress otherwise. + * + * We must recurse to child tables during execution, rather than using + * ALTER TABLE's normal prep-time recursion. */ static ObjectAddress -ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, - const char *colName, LOCKMODE lockmode) +ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName, + bool recurse, bool recursing, LOCKMODE lockmode) { HeapTuple tuple; - Form_pg_attribute attTup; AttrNumber attnum; - Relation attr_rel; ObjectAddress address; + Constraint *constraint; + CookedConstraint *ccon; + List *cooked; + bool is_no_inherit = false; - /* - * lookup the attribute - */ - attr_rel = table_open(AttributeRelationId, RowExclusiveLock); + /* Guard against stack overflow due to overly deep inheritance tree. */ + check_stack_depth(); - tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); + /* At top level, permission check was done in ATPrepCmd, else do it */ + if (recursing) + { + ATSimplePermissions(AT_AddConstraint, rel, + ATT_PARTITIONED_TABLE | ATT_TABLE | ATT_FOREIGN_TABLE); + Assert(conName != NULL); + } - if (!HeapTupleIsValid(tuple)) + attnum = get_attnum(RelationGetRelid(rel), colName); + if (attnum == InvalidAttrNumber) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("column \"%s\" of relation \"%s\" does not exist", colName, RelationGetRelationName(rel)))); - attTup = (Form_pg_attribute) GETSTRUCT(tuple); - attnum = attTup->attnum; - /* Prevent them from altering a system attribute */ if (attnum <= 0) ereport(ERROR, @@ -7760,79 +7748,130 @@ ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, errmsg("cannot alter system column \"%s\"", colName))); - /* - * Okay, actually perform the catalog change ... if needed - */ - if (!attTup->attnotnull) + /* See if there's already a constraint */ + tuple = findNotNullConstraintAttnum(RelationGetRelid(rel), attnum); + if (HeapTupleIsValid(tuple)) { - attTup->attnotnull = true; + Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(tuple); + bool changed = false; - CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); + /* + * Don't let a NO INHERIT constraint be changed into inherit. + */ + if (conForm->connoinherit && recurse) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot change NO INHERIT status of NOT NULL constraint \"%s\" on relation \"%s\"", + NameStr(conForm->conname), + RelationGetRelationName(rel))); /* - * Ordinarily phase 3 must ensure that no NULLs exist in columns that - * are set NOT NULL; however, if we can find a constraint which proves - * this then we can skip that. We needn't bother looking if we've - * already found that we must verify some other not-null constraint. + * If we find an appropriate constraint, we're almost done, but just + * need to change some properties on it: if we're recursing, increment + * coninhcount; if not, set conislocal if not already set. */ - if (!tab->verify_new_notnull && !NotNullImpliedByRelConstraints(rel, attTup)) + if (recursing) { - /* Tell Phase 3 it needs to test the constraint */ - tab->verify_new_notnull = true; + if (pg_add_s16_overflow(conForm->coninhcount, 1, + &conForm->coninhcount)) + ereport(ERROR, + errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("too many inheritance parents")); + changed = true; + } + else if (!conForm->conislocal) + { + conForm->conislocal = true; + changed = true; } - ObjectAddressSubSet(address, RelationRelationId, - RelationGetRelid(rel), attnum); + if (changed) + { + Relation constr_rel; + + constr_rel = table_open(ConstraintRelationId, RowExclusiveLock); + + CatalogTupleUpdate(constr_rel, &tuple->t_self, tuple); + ObjectAddressSet(address, ConstraintRelationId, conForm->oid); + table_close(constr_rel, RowExclusiveLock); + } + + if (changed) + return address; + else + return InvalidObjectAddress; + } + + /* + * If we're asked not to recurse, and children exist, raise an error for + * partitioned tables. For inheritance, we act as if NO INHERIT had been + * specified. + */ + if (!recurse && + find_inheritance_children(RelationGetRelid(rel), + NoLock) != NIL) + { + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("constraint must be added to child tables too"), + errhint("Do not specify the ONLY keyword.")); + else + is_no_inherit = true; } - else - address = InvalidObjectAddress; + + /* + * No constraint exists; we must add one. First determine a name to use, + * if we haven't already. + */ + if (!recursing) + { + Assert(conName == NULL); + conName = ChooseConstraintName(RelationGetRelationName(rel), + colName, "not_null", + RelationGetNamespace(rel), + NIL); + } + + constraint = makeNotNullConstraint(makeString(colName)); + constraint->is_no_inherit = is_no_inherit; + constraint->conname = conName; + + /* and do it */ + cooked = AddRelationNewConstraints(rel, NIL, list_make1(constraint), + false, !recursing, false, NULL); + ccon = linitial(cooked); + ObjectAddressSet(address, ConstraintRelationId, ccon->conoid); InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), attnum); - table_close(attr_rel, RowExclusiveLock); + /* Mark pg_attribute.attnotnull for the column */ + set_attnotnull(wqueue, rel, attnum, lockmode); - return address; -} + /* + * Recurse to propagate the constraint to children that don't have one. + */ + if (recurse) + { + List *children; -/* - * ALTER TABLE ALTER COLUMN CHECK NOT NULL - * - * This doesn't exist in the grammar, but we generate AT_CheckNotNull - * commands against the partitions of a partitioned table if the user - * writes ALTER TABLE ONLY ... SET NOT NULL on the partitioned table, - * or tries to create a primary key on it (which internally creates - * AT_SetNotNull on the partitioned table). Such a command doesn't - * allow us to actually modify any partition, but we want to let it - * go through if the partitions are already properly marked. - * - * In future, this might need to adjust the child table's state, likely - * by incrementing an inheritance count for the attnotnull constraint. - * For now we need only check for the presence of the flag. - */ -static void -ATExecCheckNotNull(AlteredTableInfo *tab, Relation rel, - const char *colName, LOCKMODE lockmode) -{ - HeapTuple tuple; + children = find_inheritance_children(RelationGetRelid(rel), + lockmode); - tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName); + foreach_oid(childoid, children) + { + Relation childrel = table_open(childoid, NoLock); - if (!HeapTupleIsValid(tuple)) - ereport(ERROR, - errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" of relation \"%s\" does not exist", - colName, RelationGetRelationName(rel))); + CommandCounterIncrement(); - if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("constraint must be added to child tables too"), - errdetail("Column \"%s\" of relation \"%s\" is not already NOT NULL.", - colName, RelationGetRelationName(rel)), - errhint("Do not specify the ONLY keyword."))); + ATExecSetNotNull(wqueue, childrel, conName, colName, + recurse, true, lockmode); + table_close(childrel, NoLock); + } + } - ReleaseSysCache(tuple); + return address; } /* @@ -9140,6 +9179,71 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, } /* + * Prepare to add a primary key on table, by adding not-null constraints + * on all columns. + */ +static void +ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd, + bool recurse, LOCKMODE lockmode, + AlterTableUtilityContext *context) +{ + ListCell *lc; + Constraint *pkconstr; + + pkconstr = castNode(Constraint, cmd->def); + if (pkconstr->contype != CONSTR_PRIMARY) + return; + + /* + * If not recursing, we must ensure that all children have a NOT NULL + * constraint on the columns, and error out if not. + */ + if (!recurse) + { + List *children; + + children = find_inheritance_children(RelationGetRelid(rel), + lockmode); + foreach_oid(childrelid, children) + { + foreach(lc, pkconstr->keys) + { + HeapTuple tup; + Form_pg_attribute attrForm; + char *attname = strVal(lfirst(lc)); + + tup = SearchSysCacheAttName(childrelid, attname); + if (!tup) + elog(ERROR, "cache lookup failed for attribute %s of relation %u", + attname, childrelid); + attrForm = (Form_pg_attribute) GETSTRUCT(tup); + if (!attrForm->attnotnull) + ereport(ERROR, + errmsg("column \"%s\" of table \"%s\" is not marked NOT NULL", + attname, get_rel_name(childrelid))); + ReleaseSysCache(tup); + } + } + } + + /* Insert not-null constraints in the queue for the PK columns */ + foreach(lc, pkconstr->keys) + { + AlterTableCmd *newcmd; + Constraint *nnconstr; + + nnconstr = makeNotNullConstraint(lfirst(lc)); + + newcmd = makeNode(AlterTableCmd); + newcmd->subtype = AT_AddConstraint; + newcmd->recurse = true; + newcmd->def = (Node *) nnconstr; + + ATPrepCmd(wqueue, rel, newcmd, true, false, lockmode, context); + } +} + +/* * ALTER TABLE ADD INDEX * * There is no such command in the grammar, but parse_utilcmd.c converts @@ -9334,17 +9438,18 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, Assert(IsA(newConstraint, Constraint)); /* - * Currently, we only expect to see CONSTR_CHECK and CONSTR_FOREIGN nodes - * arriving here (see the preprocessing done in parse_utilcmd.c). Use a - * switch anyway to make it easier to add more code later. + * Currently, we only expect to see CONSTR_CHECK, CONSTR_NOTNULL and + * CONSTR_FOREIGN nodes arriving here (see the preprocessing done in + * parse_utilcmd.c). */ switch (newConstraint->contype) { case CONSTR_CHECK: + case CONSTR_NOTNULL: address = - ATAddCheckConstraint(wqueue, tab, rel, - newConstraint, recurse, false, is_readd, - lockmode); + ATAddCheckNNConstraint(wqueue, tab, rel, + newConstraint, recurse, false, is_readd, + lockmode); break; case CONSTR_FOREIGN: @@ -9425,9 +9530,9 @@ ChooseForeignKeyConstraintNameAddition(List *colnames) } /* - * Add a check constraint to a single table and its children. Returns the - * address of the constraint added to the parent relation, if one gets added, - * or InvalidObjectAddress otherwise. + * Add a check or not-null constraint to a single table and its children. + * Returns the address of the constraint added to the parent relation, + * if one gets added, or InvalidObjectAddress otherwise. * * Subroutine for ATExecAddConstraint. * @@ -9440,9 +9545,9 @@ ChooseForeignKeyConstraintNameAddition(List *colnames) * the parent table and pass that down. */ static ObjectAddress -ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, - Constraint *constr, bool recurse, bool recursing, - bool is_readd, LOCKMODE lockmode) +ATAddCheckNNConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, + Constraint *constr, bool recurse, bool recursing, + bool is_readd, LOCKMODE lockmode) { List *newcons; ListCell *lcon; @@ -9450,6 +9555,9 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ListCell *child; ObjectAddress address = InvalidObjectAddress; + /* Guard against stack overflow due to overly deep inheritance tree. */ + check_stack_depth(); + /* At top level, permission check was done in ATPrepCmd, else do it */ if (recursing) ATSimplePermissions(AT_AddConstraint, rel, @@ -9481,7 +9589,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, { CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); - if (!ccon->skip_validation) + if (!ccon->skip_validation && ccon->contype != CONSTR_NOTNULL) { NewConstraint *newcon; @@ -9497,11 +9605,18 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, if (constr->conname == NULL) constr->conname = ccon->name; + /* + * If adding a not-null constraint, set the pg_attribute flag and tell + * phase 3 to verify existing rows, if needed. + */ + if (constr->contype == CONSTR_NOTNULL) + set_attnotnull(wqueue, rel, ccon->attnum, lockmode); + ObjectAddressSet(address, ConstraintRelationId, ccon->conoid); } /* At this point we must have a locked-down name to use */ - Assert(constr->conname != NULL); + Assert(newcons == NIL || constr->conname != NULL); /* Advance command counter in case same table is visited multiple times */ CommandCounterIncrement(); @@ -9531,7 +9646,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, /* * Check if ONLY was specified with ALTER TABLE. If so, allow the - * constraint creation only if there are no children currently. Error out + * constraint creation only if there are no children currently. Error out * otherwise. */ if (!recurse && children != NIL) @@ -9539,6 +9654,9 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("constraint must be added to child tables too"))); + /* + * Recurse to create the constraint on each child. + */ foreach(child, children) { Oid childrelid = lfirst_oid(child); @@ -9552,9 +9670,9 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, /* Find or create work queue entry for this table */ childtab = ATGetQueueEntry(wqueue, childrel); - /* Recurse to child */ - ATAddCheckConstraint(wqueue, childtab, childrel, - constr, recurse, true, is_readd, lockmode); + /* Recurse to this child */ + ATAddCheckNNConstraint(wqueue, childtab, childrel, + constr, recurse, true, is_readd, lockmode); table_close(childrel, NoLock); } @@ -12667,24 +12785,14 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, */ static void ATExecDropConstraint(Relation rel, const char *constrName, - DropBehavior behavior, - bool recurse, bool recursing, + DropBehavior behavior, bool recurse, bool missing_ok, LOCKMODE lockmode) { - List *children; Relation conrel; - Form_pg_constraint con; SysScanDesc scan; ScanKeyData skey[3]; HeapTuple tuple; bool found = false; - bool is_no_inherit_constraint = false; - char contype; - - /* At top level, permission check was done in ATPrepCmd, else do it */ - if (recursing) - ATSimplePermissions(AT_DropConstraint, rel, - ATT_TABLE | ATT_PARTITIONED_TABLE | ATT_FOREIGN_TABLE); conrel = table_open(ConstraintRelationId, RowExclusiveLock); @@ -12709,80 +12817,190 @@ ATExecDropConstraint(Relation rel, const char *constrName, /* There can be at most one matching row */ if (HeapTupleIsValid(tuple = systable_getnext(scan))) { - ObjectAddress conobj; + dropconstraint_internal(rel, tuple, behavior, recurse, false, + missing_ok, lockmode); + found = true; + } - con = (Form_pg_constraint) GETSTRUCT(tuple); + systable_endscan(scan); - /* Don't drop inherited constraints */ - if (con->coninhcount > 0 && !recursing) + if (!found) + { + if (!missing_ok) ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"", - constrName, RelationGetRelationName(rel)))); + errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, RelationGetRelationName(rel))); + else + ereport(NOTICE, + errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping", + constrName, RelationGetRelationName(rel))); + } - is_no_inherit_constraint = con->connoinherit; - contype = con->contype; + table_close(conrel, RowExclusiveLock); +} + +/* + * Remove a constraint, using its pg_constraint tuple + * + * Implementation for ALTER TABLE DROP CONSTRAINT and ALTER TABLE ALTER COLUMN + * DROP NOT NULL. + * + * Returns the address of the constraint being removed. + */ +static ObjectAddress +dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior behavior, + bool recurse, bool recursing, bool missing_ok, + LOCKMODE lockmode) +{ + Relation conrel; + Form_pg_constraint con; + ObjectAddress conobj; + List *children; + bool is_no_inherit_constraint = false; + char *constrName; + char *colname = NULL; + + /* Guard against stack overflow due to overly deep inheritance tree. */ + check_stack_depth(); + + /* At top level, permission check was done in ATPrepCmd, else do it */ + if (recursing) + ATSimplePermissions(AT_DropConstraint, rel, + ATT_TABLE | ATT_PARTITIONED_TABLE | ATT_FOREIGN_TABLE); + + conrel = table_open(ConstraintRelationId, RowExclusiveLock); + + con = (Form_pg_constraint) GETSTRUCT(constraintTup); + constrName = NameStr(con->conname); + + /* Don't allow drop of inherited constraints */ + if (con->coninhcount > 0 && !recursing) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"", + constrName, RelationGetRelationName(rel)))); + + /* + * Reset pg_constraint.attnotnull, if this is a not-null constraint. + * + * While doing that, we're in a good position to disallow dropping a not- + * null constraint underneath a primary key, a replica identity index, or + * a generated identity column. + */ + if (con->contype == CONSTRAINT_NOTNULL) + { + Relation attrel = table_open(AttributeRelationId, RowExclusiveLock); + AttrNumber attnum = extractNotNullColumn(constraintTup); + Bitmapset *pkattrs; + Bitmapset *irattrs; + HeapTuple atttup; + Form_pg_attribute attForm; + + /* save column name for recursion step */ + colname = get_attname(RelationGetRelid(rel), attnum, false); /* - * If it's a foreign-key constraint, we'd better lock the referenced - * table and check that that's not in use, just as we've already done - * for the constrained table (else we might, eg, be dropping a trigger - * that has unfired events). But we can/must skip that in the - * self-referential case. + * Disallow if it's in the primary key. For partitioned tables we + * cannot rely solely on RelationGetIndexAttrBitmap, because it'll + * return NULL if the primary key is invalid; but we still need to + * protect not-null constraints under such a constraint, so check the + * slow way. */ - if (contype == CONSTRAINT_FOREIGN && - con->confrelid != RelationGetRelid(rel)) + pkattrs = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_PRIMARY_KEY); + + if (pkattrs == NULL && + rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { - Relation frel; + Oid pkindex = RelationGetPrimaryKeyIndex(rel, true); + + if (OidIsValid(pkindex)) + { + Relation pk = relation_open(pkindex, AccessShareLock); + + pkattrs = NULL; + for (int i = 0; i < pk->rd_index->indnkeyatts; i++) + pkattrs = bms_add_member(pkattrs, pk->rd_index->indkey.values[i]); - /* Must match lock taken by RemoveTriggerById: */ - frel = table_open(con->confrelid, AccessExclusiveLock); - CheckAlterTableIsSafe(frel); - table_close(frel, NoLock); + relation_close(pk, AccessShareLock); + } } - /* - * Perform the actual constraint deletion - */ - conobj.classId = ConstraintRelationId; - conobj.objectId = con->oid; - conobj.objectSubId = 0; + if (pkattrs && + bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, pkattrs)) + ereport(ERROR, + errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("column \"%s\" is in a primary key", + get_attname(RelationGetRelid(rel), attnum, false))); + + /* Disallow if it's in the replica identity */ + irattrs = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY); + if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, irattrs)) + ereport(ERROR, + errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("column \"%s\" is in index used as replica identity", + get_attname(RelationGetRelid(rel), attnum, false))); + + /* Disallow if it's a GENERATED AS IDENTITY column */ + atttup = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum); + if (!HeapTupleIsValid(atttup)) + elog(ERROR, "cache lookup failed for attribute %d of relation %u", + attnum, RelationGetRelid(rel)); + attForm = (Form_pg_attribute) GETSTRUCT(atttup); + if (attForm->attidentity != '\0') + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("column \"%s\" of relation \"%s\" is an identity column", + get_attname(RelationGetRelid(rel), attnum, + false), + RelationGetRelationName(rel))); - performDeletion(&conobj, behavior, 0); + /* All good -- reset attnotnull if needed */ + if (attForm->attnotnull) + { + attForm->attnotnull = false; + CatalogTupleUpdate(attrel, &atttup->t_self, atttup); + } - found = true; + table_close(attrel, RowExclusiveLock); } - systable_endscan(scan); + is_no_inherit_constraint = con->connoinherit; - if (!found) + /* + * If it's a foreign-key constraint, we'd better lock the referenced table + * and check that that's not in use, just as we've already done for the + * constrained table (else we might, eg, be dropping a trigger that has + * unfired events). But we can/must skip that in the self-referential + * case. + */ + if (con->contype == CONSTRAINT_FOREIGN && + con->confrelid != RelationGetRelid(rel)) { - if (!missing_ok) - { - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" of relation \"%s\" does not exist", - constrName, RelationGetRelationName(rel)))); - } - else - { - ereport(NOTICE, - (errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping", - constrName, RelationGetRelationName(rel)))); - table_close(conrel, RowExclusiveLock); - return; - } + Relation frel; + + /* Must match lock taken by RemoveTriggerById: */ + frel = table_open(con->confrelid, AccessExclusiveLock); + CheckAlterTableIsSafe(frel); + table_close(frel, NoLock); } /* - * For partitioned tables, non-CHECK inherited constraints are dropped via - * the dependency mechanism, so we're done here. + * Perform the actual constraint deletion */ - if (contype != CONSTRAINT_CHECK && + ObjectAddressSet(conobj, ConstraintRelationId, con->oid); + performDeletion(&conobj, behavior, 0); + + /* + * For partitioned tables, non-CHECK, non-NOT-NULL inherited constraints + * are dropped via the dependency mechanism, so we're done here. + */ + if (con->contype != CONSTRAINT_CHECK && + con->contype != CONSTRAINT_NOTNULL && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { table_close(conrel, RowExclusiveLock); - return; + return conobj; } /* @@ -12798,48 +13016,65 @@ ATExecDropConstraint(Relation rel, const char *constrName, foreach_oid(childrelid, children) { Relation childrel; - HeapTuple copy_tuple; + HeapTuple tuple; + Form_pg_constraint childcon; /* find_inheritance_children already got lock */ childrel = table_open(childrelid, NoLock); CheckAlterTableIsSafe(childrel); - ScanKeyInit(&skey[0], - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(childrelid)); - ScanKeyInit(&skey[1], - Anum_pg_constraint_contypid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(InvalidOid)); - ScanKeyInit(&skey[2], - Anum_pg_constraint_conname, - BTEqualStrategyNumber, F_NAMEEQ, - CStringGetDatum(constrName)); - scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, - true, NULL, 3, skey); - - /* There can be at most one matching row */ - if (!HeapTupleIsValid(tuple = systable_getnext(scan))) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" of relation \"%s\" does not exist", - constrName, - RelationGetRelationName(childrel)))); - - copy_tuple = heap_copytuple(tuple); - - systable_endscan(scan); + /* + * We search for not-null constraints by column name, and others by + * constraint name. + */ + if (con->contype == CONSTRAINT_NOTNULL) + { + tuple = findNotNullConstraint(childrelid, colname); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for not-null constraint on column \"%s\" of relation %u", + colname, RelationGetRelid(childrel)); + } + else + { + SysScanDesc scan; + ScanKeyData skey[3]; + + ScanKeyInit(&skey[0], + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(childrelid)); + ScanKeyInit(&skey[1], + Anum_pg_constraint_contypid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(InvalidOid)); + ScanKeyInit(&skey[2], + Anum_pg_constraint_conname, + BTEqualStrategyNumber, F_NAMEEQ, + CStringGetDatum(constrName)); + scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, + true, NULL, 3, skey); + /* There can only be one, so no need to loop */ + tuple = systable_getnext(scan); + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, + RelationGetRelationName(childrel)))); + tuple = heap_copytuple(tuple); + systable_endscan(scan); + } - con = (Form_pg_constraint) GETSTRUCT(copy_tuple); + childcon = (Form_pg_constraint) GETSTRUCT(tuple); - /* Right now only CHECK constraints can be inherited */ - if (con->contype != CONSTRAINT_CHECK) - elog(ERROR, "inherited constraint is not a CHECK constraint"); + /* Right now only CHECK and not-null constraints can be inherited */ + if (childcon->contype != CONSTRAINT_CHECK && + childcon->contype != CONSTRAINT_NOTNULL) + elog(ERROR, "inherited constraint is not a CHECK or not-null constraint"); - if (con->coninhcount <= 0) /* shouldn't happen */ + if (childcon->coninhcount <= 0) /* shouldn't happen */ elog(ERROR, "relation %u has non-inherited constraint \"%s\"", - childrelid, constrName); + childrelid, NameStr(childcon->conname)); if (recurse) { @@ -12847,18 +13082,18 @@ ATExecDropConstraint(Relation rel, const char *constrName, * If the child constraint has other definition sources, just * decrement its inheritance count; if not, recurse to delete it. */ - if (con->coninhcount == 1 && !con->conislocal) + if (childcon->coninhcount == 1 && !childcon->conislocal) { /* Time to delete this child constraint, too */ - ATExecDropConstraint(childrel, constrName, behavior, - true, true, - false, lockmode); + dropconstraint_internal(childrel, tuple, behavior, + recurse, true, missing_ok, + lockmode); } else { /* Child constraint must survive my deletion */ - con->coninhcount--; - CatalogTupleUpdate(conrel, ©_tuple->t_self, copy_tuple); + childcon->coninhcount--; + CatalogTupleUpdate(conrel, &tuple->t_self, tuple); /* Make update visible */ CommandCounterIncrement(); @@ -12867,25 +13102,29 @@ ATExecDropConstraint(Relation rel, const char *constrName, else { /* - * If we were told to drop ONLY in this table (no recursion), we - * need to mark the inheritors' constraints as locally defined - * rather than inherited. + * If we were told to drop ONLY in this table (no recursion) and + * there are no further parents for this constraint, we need to + * mark the inheritors' constraints as locally defined rather than + * inherited. */ - con->coninhcount--; - con->conislocal = true; + childcon->coninhcount--; + if (childcon->coninhcount == 0) + childcon->conislocal = true; - CatalogTupleUpdate(conrel, ©_tuple->t_self, copy_tuple); + CatalogTupleUpdate(conrel, &tuple->t_self, tuple); /* Make update visible */ CommandCounterIncrement(); } - heap_freetuple(copy_tuple); + heap_freetuple(tuple); table_close(childrel, NoLock); } table_close(conrel, RowExclusiveLock); + + return conobj; } /* @@ -13834,10 +14073,26 @@ RememberConstraintForRebuilding(Oid conoid, AlteredTableInfo *tab) char *defstring = pg_get_constraintdef_command(conoid); Oid indoid; - tab->changedConstraintOids = lappend_oid(tab->changedConstraintOids, - conoid); - tab->changedConstraintDefs = lappend(tab->changedConstraintDefs, - defstring); + /* + * It is critical to create not-null constraints ahead of primary key + * indexes; otherwise, the not-null constraint would be created by the + * primary key, and the constraint name would be wrong. + */ + if (get_constraint_type(conoid) == CONSTRAINT_NOTNULL) + { + tab->changedConstraintOids = lcons_oid(conoid, + tab->changedConstraintOids); + tab->changedConstraintDefs = lcons(defstring, + tab->changedConstraintDefs); + } + else + { + + tab->changedConstraintOids = lappend_oid(tab->changedConstraintOids, + conoid); + tab->changedConstraintDefs = lappend(tab->changedConstraintDefs, + defstring); + } /* * For the index of a constraint, if any, remember if it is used for @@ -14000,9 +14255,10 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode) /* * If the constraint is inherited (only), we don't want to inject a - * new definition here; it'll get recreated when ATAddCheckConstraint - * recurses from adding the parent table's constraint. But we had to - * carry the info this far so that we can drop the constraint below. + * new definition here; it'll get recreated when + * ATAddCheckNNConstraint recurses from adding the parent table's + * constraint. But we had to carry the info this far so that we can + * drop the constraint below. */ if (!conislocal) continue; @@ -14241,23 +14497,21 @@ ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, char *cmd, tab->subcmds[AT_PASS_OLD_CONSTR] = lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd); - /* recreate any comment on the constraint */ - RebuildConstraintComment(tab, - AT_PASS_OLD_CONSTR, - oldId, - rel, - NIL, - con->conname); - } - else if (cmd->subtype == AT_SetNotNull) - { /* - * The parser will create AT_SetNotNull subcommands for - * columns of PRIMARY KEY indexes/constraints, but we need - * not do anything with them here, because the columns' - * NOT NULL marks will already have been propagated into - * the new table definition. + * Recreate any comment on the constraint. If we have + * recreated a primary key, then transformTableConstraint + * has added an unnamed not-null constraint here; skip + * this in that case. */ + if (con->conname) + RebuildConstraintComment(tab, + AT_PASS_OLD_CONSTR, + oldId, + rel, + NIL, + con->conname); + else + Assert(con->contype == CONSTR_NOTNULL); } else elog(ERROR, "unexpected statement subtype: %d", @@ -16012,14 +16266,24 @@ MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel, bool ispart RelationGetRelationName(child_rel), parent_attname))); /* - * Check child doesn't discard NOT NULL property. (Other - * constraints are checked elsewhere.) + * If the parent has a not-null constraint that's not NO INHERIT, + * make sure the child has one too. + * + * Other constraints are checked elsewhere. */ if (parent_att->attnotnull && !child_att->attnotnull) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("column \"%s\" in child table must be marked NOT NULL", - parent_attname))); + { + HeapTuple contup; + + contup = findNotNullConstraintAttnum(RelationGetRelid(parent_rel), + parent_att->attnum); + if (HeapTupleIsValid(contup) && + !((Form_pg_constraint) GETSTRUCT(contup))->connoinherit) + ereport(ERROR, + errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("column \"%s\" in child table \"%s\" must be marked NOT NULL", + parent_attname, RelationGetRelationName(child_rel))); + } /* * Child column must be generated if and only if parent column is. @@ -16101,6 +16365,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) ScanKeyData parent_key; HeapTuple parent_tuple; Oid parent_relid = RelationGetRelid(parent_rel); + AttrMap *attmap; constraintrel = table_open(ConstraintRelationId, RowExclusiveLock); @@ -16112,21 +16377,32 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) parent_scan = systable_beginscan(constraintrel, ConstraintRelidTypidNameIndexId, true, NULL, 1, &parent_key); + attmap = build_attrmap_by_name(RelationGetDescr(parent_rel), + RelationGetDescr(child_rel), + true); + while (HeapTupleIsValid(parent_tuple = systable_getnext(parent_scan))) { Form_pg_constraint parent_con = (Form_pg_constraint) GETSTRUCT(parent_tuple); SysScanDesc child_scan; ScanKeyData child_key; HeapTuple child_tuple; + AttrNumber parent_attno; bool found = false; - if (parent_con->contype != CONSTRAINT_CHECK) + if (parent_con->contype != CONSTRAINT_CHECK && + parent_con->contype != CONSTRAINT_NOTNULL) continue; /* if the parent's constraint is marked NO INHERIT, it's not inherited */ if (parent_con->connoinherit) continue; + if (parent_con->contype == CONSTRAINT_NOTNULL) + parent_attno = extractNotNullColumn(parent_tuple); + else + parent_attno = InvalidAttrNumber; + /* Search for a child constraint matching this one */ ScanKeyInit(&child_key, Anum_pg_constraint_conrelid, @@ -16140,20 +16416,46 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) Form_pg_constraint child_con = (Form_pg_constraint) GETSTRUCT(child_tuple); HeapTuple child_copy; - if (child_con->contype != CONSTRAINT_CHECK) + if (child_con->contype != parent_con->contype) continue; - if (strcmp(NameStr(parent_con->conname), - NameStr(child_con->conname)) != 0) - continue; + /* + * CHECK constraint are matched by constraint name, NOT NULL ones + * by attribute number. + */ + if (child_con->contype == CONSTRAINT_CHECK) + { + if (strcmp(NameStr(parent_con->conname), + NameStr(child_con->conname)) != 0) + continue; + } + else if (child_con->contype == CONSTRAINT_NOTNULL) + { + Form_pg_attribute parent_attr; + Form_pg_attribute child_attr; + AttrNumber child_attno; + + parent_attr = TupleDescAttr(parent_rel->rd_att, parent_attno - 1); + child_attno = extractNotNullColumn(child_tuple); + if (parent_attno != attmap->attnums[child_attno - 1]) + continue; - if (!constraints_equivalent(parent_tuple, child_tuple, RelationGetDescr(constraintrel))) + child_attr = TupleDescAttr(child_rel->rd_att, child_attno - 1); + /* there shouldn't be constraints on dropped columns */ + if (parent_attr->attisdropped || child_attr->attisdropped) + elog(ERROR, "found not-null constraint on dropped columns"); + } + + if (child_con->contype == CONSTRAINT_CHECK && + !constraints_equivalent(parent_tuple, child_tuple, RelationGetDescr(constraintrel))) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("child table \"%s\" has different definition for check constraint \"%s\"", RelationGetRelationName(child_rel), NameStr(parent_con->conname)))); - /* If the child constraint is "no inherit" then cannot merge */ + /* + * If the child constraint is "no inherit" then cannot merge + */ if (child_con->connoinherit) ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), @@ -16204,10 +16506,21 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) systable_endscan(child_scan); if (!found) + { + if (parent_con->contype == CONSTRAINT_NOTNULL) + ereport(ERROR, + errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("column \"%s\" in child table \"%s\" must be marked NOT NULL", + get_attname(parent_relid, + extractNotNullColumn(parent_tuple), + false), + RelationGetRelationName(child_rel))); + ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("child table is missing constraint \"%s\"", NameStr(parent_con->conname)))); + } } systable_endscan(parent_scan); @@ -16352,7 +16665,9 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) ScanKeyData key[3]; HeapTuple attributeTuple, constraintTuple; + AttrMap *attmap; List *connames; + List *nncolumns; bool found; bool is_partitioning; @@ -16417,11 +16732,18 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) table_close(catalogRelation, RowExclusiveLock); /* - * Likewise, find inherited check constraints and disinherit them. To do - * this, we first need a list of the names of the parent's check - * constraints. (We cheat a bit by only checking for name matches, + * Likewise, find inherited check and not-null constraints and disinherit + * them. To do this, we first need a list of the names of the parent's + * check constraints. (We cheat a bit by only checking for name matches, * assuming that the expressions will match.) + * + * For NOT NULL columns, we store column numbers to match, mapping them in + * to the child rel's attribute numbers. */ + attmap = build_attrmap_by_name(RelationGetDescr(child_rel), + RelationGetDescr(parent_rel), + false); + catalogRelation = table_open(ConstraintRelationId, RowExclusiveLock); ScanKeyInit(&key[0], Anum_pg_constraint_conrelid, @@ -16431,18 +16753,28 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) true, NULL, 1, key); connames = NIL; + nncolumns = NIL; while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) { Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple); + if (con->connoinherit) + continue; + if (con->contype == CONSTRAINT_CHECK) connames = lappend(connames, pstrdup(NameStr(con->conname))); + if (con->contype == CONSTRAINT_NOTNULL) + { + AttrNumber parent_attno = extractNotNullColumn(constraintTuple); + + nncolumns = lappend_int(nncolumns, attmap->attnums[parent_attno - 1]); + } } systable_endscan(scan); - /* Now scan the child's constraints */ + /* Now scan the child's constraints to find matches */ ScanKeyInit(&key[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, @@ -16453,20 +16785,41 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) { Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple); - bool match; - - if (con->contype != CONSTRAINT_CHECK) - continue; + bool match = false; - match = false; - foreach_ptr(char, chkname, connames) + /* + * Match CHECK constraints by name, not-null constraints by column + * number, and ignore all others. + */ + if (con->contype == CONSTRAINT_CHECK) { - if (strcmp(NameStr(con->conname), chkname) == 0) + foreach_ptr(char, chkname, connames) { - match = true; - break; + if (con->contype == CONSTRAINT_CHECK && + strcmp(NameStr(con->conname), chkname) == 0) + { + match = true; + connames = foreach_delete_current(connames, chkname); + break; + } + } + } + else if (con->contype == CONSTRAINT_NOTNULL) + { + AttrNumber child_attno = extractNotNullColumn(constraintTuple); + + foreach_int(prevattno, nncolumns) + { + if (prevattno == child_attno) + { + match = true; + nncolumns = foreach_delete_current(nncolumns, prevattno); + break; + } } } + else + continue; if (match) { @@ -16487,6 +16840,12 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) } } + /* We should have matched all constraints */ + if (connames != NIL || nncolumns != NIL) + elog(ERROR, "%d unmatched constraints while removing inheritance from \"%s\" to \"%s\"", + list_length(connames) + list_length(nncolumns), + RelationGetRelationName(child_rel), RelationGetRelationName(parent_rel)); + systable_endscan(scan); table_close(catalogRelation, RowExclusiveLock); @@ -19039,7 +19398,8 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) /* * If no suitable index was found in the partition-to-be, create one - * now. + * now. Note that if this is a PK, not-null constraints must already + * exist. */ if (!found) { @@ -19737,7 +20097,7 @@ ATExecDetachPartitionFinalize(Relation rel, RangeVar *name) * DetachAddConstraintIfNeeded * Subroutine for ATExecDetachPartition. Create a constraint that * takes the place of the partition constraint, but avoid creating - * a dupe if an constraint already exists which implies the needed + * a dupe if a constraint already exists which implies the needed * constraint. */ static void @@ -19770,8 +20130,8 @@ DetachAddConstraintIfNeeded(List **wqueue, Relation partRel) n->initially_valid = true; n->skip_validation = true; /* It's a re-add, since it nominally already exists */ - ATAddCheckConstraint(wqueue, tab, partRel, n, - true, false, true, ShareUpdateExclusiveLock); + ATAddCheckNNConstraint(wqueue, tab, partRel, n, + true, false, true, ShareUpdateExclusiveLock); } } @@ -20040,6 +20400,13 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name) RelationGetRelationName(partIdx)))); } + /* + * If it's a primary key, make sure the columns in the partition are + * NOT NULL. + */ + if (parentIdx->rd_index->indisprimary) + verifyPartitionIndexNotNull(childInfo, partTbl); + /* All good -- do it */ IndexSetParentIndex(partIdx, RelationGetRelid(parentIdx)); if (OidIsValid(constraintOid)) @@ -20184,6 +20551,29 @@ validatePartitionedIndex(Relation partedIdx, Relation partedTbl) } /* + * When attaching an index as a partition of a partitioned index which is a + * primary key, verify that all the columns in the partition are marked NOT + * NULL. + */ +static void +verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partition) +{ + for (int i = 0; i < iinfo->ii_NumIndexKeyAttrs; i++) + { + Form_pg_attribute att = TupleDescAttr(RelationGetDescr(partition), + iinfo->ii_IndexAttrNumbers[i] - 1); + + if (!att->attnotnull) + ereport(ERROR, + errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("invalid primary key definition"), + errdetail("Column \"%s\" of relation \"%s\" is not marked NOT NULL.", + NameStr(att->attname), + RelationGetRelationName(partition))); + } +} + +/* * Return an OID list of constraints that reference the given relation * that are marked as having a parent constraints. */ |