diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 1591 |
1 files changed, 1172 insertions, 419 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index f77de4e7c99..47c900445c7 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -200,7 +200,7 @@ typedef struct AlteredTableInfo } AlteredTableInfo; /* Struct describing one new constraint to check in Phase 3 scan */ -/* Note: new NOT NULL constraints are handled elsewhere */ +/* Note: new not-null constraints are handled elsewhere */ typedef struct NewConstraint { char *name; /* Constraint name, or NULL if none */ @@ -351,7 +351,8 @@ static void truncate_check_activity(Relation rel); static void RangeVarCallbackForTruncate(const RangeVar *relation, Oid relId, Oid oldRelId, void *arg); static List *MergeAttributes(List *schema, List *supers, char relpersistence, - bool is_partition, List **supconstr); + bool is_partition, List **supconstr, + List **supnotnulls); static bool MergeCheckConstraint(List *constraints, char *name, Node *expr); static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel); static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel); @@ -432,16 +433,16 @@ static bool check_for_column_name_collision(Relation rel, const char *colname, bool if_not_exists); static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid); static void add_column_collation_dependency(Oid relid, int32 attnum, Oid collid); -static void ATPrepDropNotNull(Relation rel, bool recurse, bool recursing); -static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode); -static void ATPrepSetNotNull(List **wqueue, Relation rel, - AlterTableCmd *cmd, bool recurse, bool recursing, - LOCKMODE lockmode, - AlterTableUtilityContext *context); -static ObjectAddress ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, - const char *colName, LOCKMODE lockmode); -static void ATExecCheckNotNull(AlteredTableInfo *tab, Relation rel, - const char *colName, LOCKMODE lockmode); +static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, bool recurse, + LOCKMODE lockmode); +static bool set_attnotnull(List **wqueue, Relation rel, + AttrNumber attnum, bool recurse, LOCKMODE lockmode); +static ObjectAddress ATExecSetNotNull(List **wqueue, Relation rel, + char *constrname, char *colName, + bool recurse, bool recursing, + List **readyRels, LOCKMODE lockmode); +static ObjectAddress ATExecSetAttNotNull(List **wqueue, Relation rel, + const char *colName, LOCKMODE lockmode); static bool NotNullImpliedByRelConstraints(Relation rel, Form_pg_attribute attr); static bool ConstraintImpliedByRelConstraint(Relation scanrel, List *testConstraint, List *provenConstraint); @@ -470,6 +471,8 @@ static ObjectAddress ATExecDropColumn(List **wqueue, Relation rel, const char *c bool recurse, bool recursing, bool missing_ok, LOCKMODE lockmode, ObjectAddresses *addrs); +static void ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd, + LOCKMODE lockmode, AlterTableUtilityContext *context); static ObjectAddress ATExecAddIndex(AlteredTableInfo *tab, Relation rel, IndexStmt *stmt, bool is_rebuild, LOCKMODE lockmode); static ObjectAddress ATExecAddStatistics(AlteredTableInfo *tab, Relation rel, @@ -481,11 +484,11 @@ static ObjectAddress ATExecAddConstraint(List **wqueue, static char *ChooseForeignKeyConstraintNameAddition(List *colnames); static ObjectAddress ATExecAddIndexConstraint(AlteredTableInfo *tab, Relation rel, IndexStmt *stmt, LOCKMODE lockmode); -static ObjectAddress ATAddCheckConstraint(List **wqueue, - AlteredTableInfo *tab, Relation rel, - Constraint *constr, - bool recurse, bool recursing, bool is_readd, - LOCKMODE lockmode); +static ObjectAddress ATAddCheckNNConstraint(List **wqueue, + AlteredTableInfo *tab, Relation rel, + Constraint *constr, + bool recurse, bool recursing, bool is_readd, + LOCKMODE lockmode); static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, Constraint *fkconstraint, bool recurse, bool recursing, @@ -542,6 +545,11 @@ static void ATExecDropConstraint(Relation rel, const char *constrName, DropBehavior behavior, bool recurse, bool recursing, bool missing_ok, LOCKMODE lockmode); +static ObjectAddress dropconstraint_internal(Relation rel, + HeapTuple constraintTup, DropBehavior behavior, + bool recurse, bool recursing, + bool missing_ok, List **readyRels, + LOCKMODE lockmode); static void ATPrepAlterColumnType(List **wqueue, AlteredTableInfo *tab, Relation rel, bool recurse, bool recursing, @@ -614,10 +622,12 @@ static void ComputePartitionAttrs(ParseState *pstate, Relation rel, List *partPa static void CreateInheritance(Relation child_rel, Relation parent_rel); static void RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached); +static void ATInheritAdjustNotNulls(Relation parent_rel, Relation child_rel, + int inhcount); static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, AlterTableUtilityContext *context); -static void AttachPartitionEnsureIndexes(Relation rel, Relation attachrel); +static void AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel); static void QueuePartitionConstraintValidation(List **wqueue, Relation scanrel, List *partConstraint, bool validate_default); @@ -635,6 +645,7 @@ static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, static void validatePartitionedIndex(Relation partedIdx, Relation partedTbl); static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl); +static void verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partIdx); static List *GetParentedForeignKeyRefs(Relation partition); static void ATDetachCheckNoForeignKeyRefs(Relation partition); static char GetAttributeCompression(Oid atttypid, char *compression); @@ -672,8 +683,10 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, TupleDesc descriptor; List *inheritOids; List *old_constraints; + List *old_notnulls; List *rawDefaults; List *cookedDefaults; + List *nncols; Datum reloptions; ListCell *listptr; AttrNumber attnum; @@ -863,12 +876,13 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, MergeAttributes(stmt->tableElts, inheritOids, stmt->relation->relpersistence, stmt->partbound != NULL, - &old_constraints); + &old_constraints, &old_notnulls); /* * Create a tuple descriptor from the relation schema. Note that this - * deals with column names, types, and NOT NULL constraints, but not - * default values or CHECK constraints; we handle those below. + * deals with column names, types, and in-descriptor NOT NULL flags, but + * not default values, NOT NULL or CHECK constraints; we handle those + * below. */ descriptor = BuildDescForRelation(stmt->tableElts); @@ -1251,6 +1265,17 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, AddRelationNewConstraints(rel, NIL, stmt->constraints, true, true, false, queryString); + /* + * Finally, merge the not-null constraints that are declared directly with + * those that come from parent relations (making sure to count inheritance + * appropriately for each), create them, and set the attnotnull flag on + * columns that don't yet have it. + */ + nncols = AddRelationNotNullConstraints(rel, stmt->nnconstraints, + old_notnulls); + foreach(listptr, nncols) + set_attnotnull(NULL, rel, lfirst_int(listptr), false, NoLock); + ObjectAddressSet(address, RelationRelationId, relationId); /* @@ -2299,6 +2324,8 @@ storage_name(char c) * Output arguments: * 'supconstr' receives a list of constraints belonging to the parents, * updated as necessary to be valid for the child. + * 'supnotnulls' receives a list of CookedConstraints that corresponds to + * constraints coming from inheritance parents. * * Return value: * Completed schema list. @@ -2327,9 +2354,12 @@ storage_name(char c) * If the same attribute name appears multiple times, then it appears * in the result table in the proper location for its first appearance. * - * Constraints (including NOT NULL constraints) for the child table + * Constraints (including not-null constraints) for the child table * are the union of all relevant constraints, from both the child schema - * and parent tables. + * and parent tables. In addition, in legacy inheritance, each column that + * appears in a primary key in any of the parents also gets a NOT NULL + * constraint (partitioning doesn't need this, because the PK itself gets + * inherited.) * * The default value for a child column is defined as: * (1) If the child schema specifies a default, that value is used. @@ -2348,10 +2378,11 @@ storage_name(char c) */ static List * MergeAttributes(List *schema, List *supers, char relpersistence, - bool is_partition, List **supconstr) + bool is_partition, List **supconstr, List **supnotnulls) { List *inhSchema = NIL; List *constraints = NIL; + List *nnconstraints = NIL; bool have_bogus_defaults = false; int child_attno; static Node bogus_marker = {0}; /* marks conflicting defaults */ @@ -2462,9 +2493,12 @@ MergeAttributes(List *schema, List *supers, char relpersistence, AttrMap *newattmap; List *inherited_defaults; List *cols_with_defaults; + List *nnconstrs; AttrNumber parent_attno; ListCell *lc1; ListCell *lc2; + Bitmapset *pkattrs; + Bitmapset *nncols = NULL; /* caller already got lock */ relation = table_open(parent, NoLock); @@ -2553,6 +2587,20 @@ MergeAttributes(List *schema, List *supers, char relpersistence, /* We can't process inherited defaults until newattmap is complete. */ inherited_defaults = cols_with_defaults = NIL; + /* + * All columns that are part of the parent's primary key need to be + * NOT NULL; if partition just the attnotnull bit, otherwise a full + * constraint (if they don't have one already). Also, we request + * attnotnull on columns that have a not-null constraint that's not + * marked NO INHERIT. + */ + pkattrs = RelationGetIndexAttrBitmap(relation, + INDEX_ATTR_BITMAP_PRIMARY_KEY); + nnconstrs = RelationGetNotNullConstraints(RelationGetRelid(relation), true); + foreach(lc1, nnconstrs) + nncols = bms_add_member(nncols, + ((CookedConstraint *) lfirst(lc1))->attnum); + for (parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) { @@ -2648,9 +2696,38 @@ MergeAttributes(List *schema, List *supers, char relpersistence, } /* - * Merge of NOT NULL constraints = OR 'em together + * In regular inheritance, columns in the parent's primary key + * get an extra not-null constraint. Partitioning doesn't + * need this, because the PK itself is going to be cloned to + * the partition. */ - def->is_not_null |= attribute->attnotnull; + if (!is_partition && + bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber, + pkattrs)) + { + CookedConstraint *nn; + + nn = palloc(sizeof(CookedConstraint)); + nn->contype = CONSTR_NOTNULL; + nn->conoid = InvalidOid; + nn->name = NULL; + nn->attnum = exist_attno; + nn->expr = NULL; + nn->skip_validation = false; + nn->is_local = false; + nn->inhcount = 1; + nn->is_no_inherit = false; + + nnconstraints = lappend(nnconstraints, nn); + } + + /* + * mark attnotnull if parent has it and it's not NO INHERIT + */ + if (bms_is_member(parent_attno, nncols) || + bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber, + pkattrs)) + def->is_not_null = true; /* * Check for GENERATED conflicts @@ -2684,7 +2761,11 @@ MergeAttributes(List *schema, List *supers, char relpersistence, attribute->atttypmod); def->inhcount = 1; def->is_local = false; - def->is_not_null = attribute->attnotnull; + /* mark attnotnull if parent has it and it's not NO INHERIT */ + if (bms_is_member(parent_attno, nncols) || + bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber, + pkattrs)) + def->is_not_null = true; def->is_from_type = false; def->storage = attribute->attstorage; def->raw_default = NULL; @@ -2701,6 +2782,33 @@ MergeAttributes(List *schema, List *supers, char relpersistence, def->compression = NULL; inhSchema = lappend(inhSchema, def); newattmap->attnums[parent_attno - 1] = ++child_attno; + + /* + * In regular inheritance, columns in the parent's primary key + * get an extra not-null constraint. Partitioning doesn't + * need this, because the PK itself is going to be cloned to + * the partition. + */ + if (!is_partition && + bms_is_member(parent_attno - + FirstLowInvalidHeapAttributeNumber, + pkattrs)) + { + CookedConstraint *nn; + + nn = palloc(sizeof(CookedConstraint)); + nn->contype = CONSTR_NOTNULL; + nn->conoid = InvalidOid; + nn->name = NULL; + nn->attnum = newattmap->attnums[parent_attno - 1]; + nn->expr = NULL; + nn->skip_validation = false; + nn->is_local = false; + nn->inhcount = 1; + nn->is_no_inherit = false; + + nnconstraints = lappend(nnconstraints, nn); + } } /* @@ -2845,6 +2953,23 @@ MergeAttributes(List *schema, List *supers, char relpersistence, } } + /* + * Also copy the not-null constraints from this parent. The + * attnotnull markings were already installed above. + */ + foreach(lc1, nnconstrs) + { + CookedConstraint *nn = lfirst(lc1); + + Assert(nn->contype == CONSTR_NOTNULL); + + nn->attnum = newattmap->attnums[nn->attnum - 1]; + nn->is_local = false; + nn->inhcount = 1; + + nnconstraints = lappend(nnconstraints, nn); + } + free_attrmap(newattmap); /* @@ -2972,7 +3097,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, } /* - * Merge of NOT NULL constraints = OR 'em together + * Merge of not-null constraints = OR 'em together */ def->is_not_null |= newdef->is_not_null; @@ -3051,8 +3176,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, /* * Now that we have the column definition list for a partition, we can * check whether the columns referenced in the column constraint specs - * actually exist. Also, we merge parent's NOT NULL constraints and - * defaults into each corresponding column definition. + * actually exist. Also, merge column defaults. */ if (is_partition) { @@ -3069,7 +3193,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence, if (strcmp(coldef->colname, restdef->colname) == 0) { found = true; - coldef->is_not_null |= restdef->is_not_null; /* * Check for conflicts related to generated columns. @@ -3158,6 +3281,8 @@ MergeAttributes(List *schema, List *supers, char relpersistence, } *supconstr = constraints; + *supnotnulls = nnconstraints; + return schema; } @@ -3769,7 +3894,10 @@ rename_constraint_internal(Oid myrelid, constraintOid); con = (Form_pg_constraint) GETSTRUCT(tuple); - if (myrelid && con->contype == CONSTRAINT_CHECK && !con->connoinherit) + if (myrelid && + (con->contype == CONSTRAINT_CHECK || + con->contype == CONSTRAINT_NOTNULL) && + !con->connoinherit) { if (recurse) { @@ -4354,6 +4482,7 @@ AlterTableGetLockLevel(List *cmds) case AT_AddIndexConstraint: case AT_ReplicaIdentity: case AT_SetNotNull: + case AT_SetAttNotNull: case AT_EnableRowSecurity: case AT_DisableRowSecurity: case AT_ForceRowSecurity: @@ -4492,15 +4621,6 @@ AlterTableGetLockLevel(List *cmds) cmd_lockmode = ShareUpdateExclusiveLock; break; - case AT_CheckNotNull: - - /* - * This only examines the table's schema; but lock must be - * strong enough to prevent concurrent DROP NOT NULL. - */ - cmd_lockmode = AccessShareLock; - break; - default: /* oops */ elog(ERROR, "unrecognized alter table type: %d", (int) cmd->subtype); @@ -4652,21 +4772,23 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, break; case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE); - ATPrepDropNotNull(rel, recurse, recursing); - ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); + /* Set up recursion for phase 2; no other prep needed */ + if (recurse) + cmd->recurse = true; pass = AT_PASS_DROP; break; case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE); - /* Need command-specific recursion decision */ - ATPrepSetNotNull(wqueue, rel, cmd, recurse, recursing, - lockmode, context); + /* Set up recursion for phase 2; no other prep needed */ + if (recurse) + cmd->recurse = true; pass = AT_PASS_COL_ATTRS; break; - case AT_CheckNotNull: /* check column is already marked NOT NULL */ + case AT_SetAttNotNull: /* set pg_attribute.attnotnull without adding + * a constraint */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE); + /* Need command-specific recursion decision */ ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); - /* No command-specific prep needed */ pass = AT_PASS_COL_ATTRS; break; case AT_DropExpression: /* ALTER COLUMN DROP EXPRESSION */ @@ -5045,13 +5167,14 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, address = ATExecDropIdentity(rel, cmd->name, cmd->missing_ok, lockmode); break; case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ - address = ATExecDropNotNull(rel, cmd->name, lockmode); + address = ATExecDropNotNull(rel, cmd->name, cmd->recurse, lockmode); break; case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ - address = ATExecSetNotNull(tab, rel, cmd->name, lockmode); + address = ATExecSetNotNull(wqueue, rel, NULL, cmd->name, + cmd->recurse, false, NULL, lockmode); break; - case AT_CheckNotNull: /* check column is already marked NOT NULL */ - ATExecCheckNotNull(tab, rel, cmd->name, lockmode); + case AT_SetAttNotNull: /* set pg_attribute.attnotnull */ + address = ATExecSetAttNotNull(wqueue, rel, cmd->name, lockmode); break; case AT_DropExpression: address = ATExecDropExpression(rel, cmd->name, cmd->missing_ok, lockmode); @@ -5387,21 +5510,23 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, */ switch (cmd2->subtype) { - case AT_SetNotNull: - /* Need command-specific recursion decision */ - ATPrepSetNotNull(wqueue, rel, cmd2, - recurse, false, - lockmode, context); + case AT_SetAttNotNull: + ATSimpleRecursion(wqueue, rel, cmd2, recurse, lockmode, context); pass = AT_PASS_COL_ATTRS; break; case AT_AddIndex: - /* This command never recurses */ - /* No command-specific prep needed */ + + /* + * A primary key on a inheritance parent needs supporting NOT + * NULL constraint on its children; enqueue commands to create + * those or mark them inherited if they already exist. + */ + ATPrepAddPrimaryKey(wqueue, rel, cmd2, lockmode, context); pass = AT_PASS_ADD_INDEX; break; case AT_AddIndexConstraint: - /* This command never recurses */ - /* No command-specific prep needed */ + /* as above */ + ATPrepAddPrimaryKey(wqueue, rel, cmd2, lockmode, context); pass = AT_PASS_ADD_INDEXCONSTR; break; case AT_AddConstraint: @@ -5845,7 +5970,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) { /* * If we are rebuilding the tuples OR if we added any new but not - * verified NOT NULL constraints, check all not-null constraints. This + * verified not-null constraints, check all not-null constraints. This * is a bit of overkill but it minimizes risk of bugs, and * heap_attisnull is a pretty cheap test anyway. */ @@ -6067,6 +6192,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) RelationGetRelationName(oldrel)), errtableconstraint(oldrel, con->name))); break; + case CONSTR_NOTNULL: case CONSTR_FOREIGN: /* Nothing to do here */ break; @@ -6175,10 +6301,10 @@ alter_table_type_to_string(AlterTableType cmdtype) return "ALTER COLUMN ... DROP NOT NULL"; case AT_SetNotNull: return "ALTER COLUMN ... SET NOT NULL"; + case AT_SetAttNotNull: + return NULL; /* not real grammar */ case AT_DropExpression: return "ALTER COLUMN ... DROP EXPRESSION"; - case AT_CheckNotNull: - return NULL; /* not real grammar */ case AT_SetStatistics: return "ALTER COLUMN ... SET STATISTICS"; case AT_SetOptions: @@ -6774,8 +6900,7 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing, */ static ObjectAddress ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel, - AlterTableCmd **cmd, - bool recurse, bool recursing, + AlterTableCmd **cmd, bool recurse, bool recursing, LOCKMODE lockmode, int cur_pass, AlterTableUtilityContext *context) { @@ -7044,7 +7169,7 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel, * the effect of NULL values in the new column. * * An exception occurs when the new column is of a domain type: the domain - * might have a NOT NULL constraint, or a check constraint that indirectly + * might have a not-null constraint, or a check constraint that indirectly * rejects nulls. If there are any domain constraints then we construct * an explicit NULL default value that will be passed through * CoerceToDomain processing. (This is a tad inefficient, since it causes @@ -7290,42 +7415,21 @@ add_column_collation_dependency(Oid relid, int32 attnum, Oid collid) /* * ALTER TABLE ALTER COLUMN DROP NOT NULL - */ - -static void -ATPrepDropNotNull(Relation rel, bool recurse, bool recursing) -{ - /* - * If the parent is a partitioned table, like check constraints, we do not - * support removing the NOT NULL while partitions exist. - */ - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - { - PartitionDesc partdesc = RelationGetPartitionDesc(rel, true); - - Assert(partdesc != NULL); - if (partdesc->nparts > 0 && !recurse && !recursing) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot remove constraint from only the partitioned table when partitions exist"), - errhint("Do not specify the ONLY keyword."))); - } -} - -/* + * * Return the address of the modified column. If the column was already * nullable, InvalidObjectAddress is returned. */ static ObjectAddress -ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) +ATExecDropNotNull(Relation rel, const char *colName, bool recurse, + LOCKMODE lockmode) { HeapTuple tuple; + HeapTuple conTup; Form_pg_attribute attTup; AttrNumber attnum; Relation attr_rel; - List *indexoidlist; - ListCell *indexoidscan; ObjectAddress address; + List *readyRels; /* * lookup the attribute @@ -7340,6 +7444,15 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) colName, RelationGetRelationName(rel)))); attTup = (Form_pg_attribute) GETSTRUCT(tuple); attnum = attTup->attnum; + ObjectAddressSubSet(address, RelationRelationId, + RelationGetRelid(rel), attnum); + + /* If the column is already nullable there's nothing to do. */ + if (!attTup->attnotnull) + { + table_close(attr_rel, RowExclusiveLock); + return InvalidObjectAddress; + } /* Prevent them from altering a system attribute */ if (attnum <= 0) @@ -7355,62 +7468,37 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) colName, RelationGetRelationName(rel)))); /* - * Check that the attribute is not in a primary key or in an index used as - * a replica identity. - * - * Note: we'll throw error even if the pkey index is not valid. + * It's not OK to remove a constraint only for the parent and leave it in + * the children, so disallow that. */ - - /* Loop over all indexes on the relation */ - indexoidlist = RelationGetIndexList(rel); - - foreach(indexoidscan, indexoidlist) + if (!recurse) { - Oid indexoid = lfirst_oid(indexoidscan); - HeapTuple indexTuple; - Form_pg_index indexStruct; - int i; + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + PartitionDesc partdesc; - indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); - if (!HeapTupleIsValid(indexTuple)) - elog(ERROR, "cache lookup failed for index %u", indexoid); - indexStruct = (Form_pg_index) GETSTRUCT(indexTuple); + partdesc = RelationGetPartitionDesc(rel, true); - /* - * If the index is not a primary key or an index used as replica - * identity, skip the check. - */ - if (indexStruct->indisprimary || indexStruct->indisreplident) + if (partdesc->nparts > 0) + ereport(ERROR, + errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot remove constraint from only the partitioned table when partitions exist"), + errhint("Do not specify the ONLY keyword.")); + } + else if (rel->rd_rel->relhassubclass && + find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL) { - /* - * Loop over each attribute in the primary key or the index used - * as replica identity and see if it matches the to-be-altered - * attribute. - */ - for (i = 0; i < indexStruct->indnkeyatts; i++) - { - if (indexStruct->indkey.values[i] == attnum) - { - if (indexStruct->indisprimary) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("column \"%s\" is in a primary key", - colName))); - else - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("column \"%s\" is in index used as replica identity", - colName))); - } - } + ereport(ERROR, + errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("not-null constraint on column \"%s\" must be removed in child tables too", + colName), + errhint("Do not specify the ONLY keyword.")); } - - ReleaseSysCache(indexTuple); } - list_free(indexoidlist); - - /* If rel is partition, shouldn't drop NOT NULL if parent has the same */ + /* + * If rel is partition, shouldn't drop NOT NULL if parent has the same. + */ if (rel->rd_rel->relispartition) { Oid parentId = get_partition_parent(RelationGetRelid(rel), false); @@ -7428,19 +7516,35 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) } /* - * Okay, actually perform the catalog change ... if needed + * Find the constraint that makes this column NOT NULL. */ - if (attTup->attnotnull) + conTup = findNotNullConstraint(RelationGetRelid(rel), colName); + if (conTup == NULL) { - attTup->attnotnull = false; + Bitmapset *pkcols; - CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); + /* + * There's no not-null constraint, so throw an error. If the column + * is in a primary key, we can throw a specific error. Otherwise, + * this is unexpected. + */ + pkcols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_PRIMARY_KEY); + if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, + pkcols)) + ereport(ERROR, + errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("column \"%s\" is in a primary key", colName)); - ObjectAddressSubSet(address, RelationRelationId, - RelationGetRelid(rel), attnum); + /* this shouldn't happen */ + elog(ERROR, "could not find not-null constraint on column \"%s\", relation \"%s\"", + colName, RelationGetRelationName(rel)); } - else - address = InvalidObjectAddress; + + readyRels = NIL; + dropconstraint_internal(rel, conTup, DROP_RESTRICT, recurse, false, + false, &readyRels, lockmode); + + heap_freetuple(conTup); InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), attnum); @@ -7451,102 +7555,137 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) } /* - * ALTER TABLE ALTER COLUMN SET NOT NULL + * Helper to set pg_attribute.attnotnull if it isn't set, and to tell phase 3 + * to verify it; recurses to apply the same to children. + * + * When called to alter an existing table, 'wqueue' must be given so that we can + * queue a check that existing tuples pass the constraint. When called from + * table creation, 'wqueue' should be passed as NULL. + * + * Returns true if the flag was set in any table, otherwise false. */ - -static void -ATPrepSetNotNull(List **wqueue, Relation rel, - AlterTableCmd *cmd, bool recurse, bool recursing, - LOCKMODE lockmode, AlterTableUtilityContext *context) +static bool +set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum, bool recurse, + LOCKMODE lockmode) { - /* - * If we're already recursing, there's nothing to do; the topmost - * invocation of ATSimpleRecursion already visited all children. - */ - if (recursing) - return; + HeapTuple tuple; + Form_pg_attribute attForm; + bool retval = false; - /* - * If the target column is already marked NOT NULL, we can skip recursing - * to children, because their columns should already be marked NOT NULL as - * well. But there's no point in checking here unless the relation has - * some children; else we can just wait till execution to check. (If it - * does have children, however, this can save taking per-child locks - * unnecessarily. This greatly improves concurrency in some parallel - * restore scenarios.) - * - * Unfortunately, we can only apply this optimization to partitioned - * tables, because traditional inheritance doesn't enforce that child - * columns be NOT NULL when their parent is. (That's a bug that should - * get fixed someday.) - */ - if (rel->rd_rel->relhassubclass && - rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + tuple = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for attribute %d of relation %u", + attnum, RelationGetRelid(rel)); + attForm = (Form_pg_attribute) GETSTRUCT(tuple); + if (!attForm->attnotnull) { - HeapTuple tuple; - bool attnotnull; + Relation attr_rel; - tuple = SearchSysCacheAttName(RelationGetRelid(rel), cmd->name); + attr_rel = table_open(AttributeRelationId, RowExclusiveLock); - /* Might as well throw the error now, if name is bad */ - if (!HeapTupleIsValid(tuple)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" of relation \"%s\" does not exist", - cmd->name, RelationGetRelationName(rel)))); + attForm->attnotnull = true; + CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); - attnotnull = ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull; - ReleaseSysCache(tuple); - if (attnotnull) - return; + table_close(attr_rel, RowExclusiveLock); + + /* + * And set up for existing values to be checked, unless another + * constraint already proves this. + */ + if (wqueue && !NotNullImpliedByRelConstraints(rel, attForm)) + { + AlteredTableInfo *tab; + + tab = ATGetQueueEntry(wqueue, rel); + tab->verify_new_notnull = true; + } + + retval = true; } - /* - * If we have ALTER TABLE ONLY ... SET NOT NULL on a partitioned table, - * apply ALTER TABLE ... CHECK NOT NULL to every child. Otherwise, use - * normal recursion logic. - */ - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && - !recurse) + if (recurse) { - AlterTableCmd *newcmd = makeNode(AlterTableCmd); + List *children; + ListCell *lc; + + children = find_inheritance_children(RelationGetRelid(rel), lockmode); + foreach(lc, children) + { + Oid childrelid = lfirst_oid(lc); + Relation childrel; + AttrNumber childattno; + + /* find_inheritance_children already got lock */ + childrel = table_open(childrelid, NoLock); + CheckTableNotInUse(childrel, "ALTER TABLE"); - newcmd->subtype = AT_CheckNotNull; - newcmd->name = pstrdup(cmd->name); - ATSimpleRecursion(wqueue, rel, newcmd, true, lockmode, context); + childattno = get_attnum(RelationGetRelid(childrel), + get_attname(RelationGetRelid(rel), attnum, + false)); + retval |= set_attnotnull(wqueue, childrel, childattno, + recurse, lockmode); + table_close(childrel, NoLock); + } } - else - ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); + + return retval; } /* - * Return the address of the modified column. If the column was already NOT - * NULL, InvalidObjectAddress is returned. + * ALTER TABLE ALTER COLUMN SET NOT NULL + * + * Add a not-null constraint to a single table and its children. Returns + * the address of the constraint added to the parent relation, if one gets + * added, or InvalidObjectAddress otherwise. + * + * We must recurse to child tables during execution, rather than using + * ALTER TABLE's normal prep-time recursion. */ static ObjectAddress -ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, - const char *colName, LOCKMODE lockmode) +ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName, + bool recurse, bool recursing, List **readyRels, + LOCKMODE lockmode) { HeapTuple tuple; + Relation constr_rel; + ScanKeyData skey; + SysScanDesc conscan; AttrNumber attnum; - Relation attr_rel; ObjectAddress address; + Constraint *constraint; + CookedConstraint *ccon; + List *cooked; + bool is_no_inherit = false; + List *ready = NIL; /* - * lookup the attribute + * In cases of multiple inheritance, we might visit the same child more + * than once. In the topmost call, set up a list that we fill with all + * visited relations, to skip those. */ - attr_rel = table_open(AttributeRelationId, RowExclusiveLock); + if (readyRels == NULL) + { + Assert(!recursing); + readyRels = &ready; + } + if (list_member_oid(*readyRels, RelationGetRelid(rel))) + return InvalidObjectAddress; + *readyRels = lappend_oid(*readyRels, RelationGetRelid(rel)); - tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); + /* At top level, permission check was done in ATPrepCmd, else do it */ + if (recursing) + { + ATSimplePermissions(AT_AddConstraint, rel, ATT_TABLE | ATT_FOREIGN_TABLE); + Assert(conName != NULL); + } - if (!HeapTupleIsValid(tuple)) + attnum = get_attnum(RelationGetRelid(rel), colName); + if (attnum == InvalidAttrNumber) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("column \"%s\" of relation \"%s\" does not exist", colName, RelationGetRelationName(rel)))); - attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum; - /* Prevent them from altering a system attribute */ if (attnum <= 0) ereport(ERROR, @@ -7554,80 +7693,178 @@ ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, errmsg("cannot alter system column \"%s\"", colName))); - /* - * Okay, actually perform the catalog change ... if needed - */ - if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull) + /* See if there's already a constraint */ + constr_rel = table_open(ConstraintRelationId, RowExclusiveLock); + ScanKeyInit(&skey, + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + conscan = systable_beginscan(constr_rel, ConstraintRelidTypidNameIndexId, true, + NULL, 1, &skey); + + while (HeapTupleIsValid(tuple = systable_getnext(conscan))) { - ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = true; + Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(tuple); + bool changed = false; + HeapTuple copytup; - CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); + if (conForm->contype != CONSTRAINT_NOTNULL) + continue; + + if (extractNotNullColumn(tuple) != attnum) + continue; + + copytup = heap_copytuple(tuple); + conForm = (Form_pg_constraint) GETSTRUCT(copytup); /* - * Ordinarily phase 3 must ensure that no NULLs exist in columns that - * are set NOT NULL; however, if we can find a constraint which proves - * this then we can skip that. We needn't bother looking if we've - * already found that we must verify some other NOT NULL constraint. + * If we find an appropriate constraint, we're almost done, but just + * need to change some properties on it: if we're recursing, increment + * coninhcount; if not, set conislocal if not already set. */ - if (!tab->verify_new_notnull && - !NotNullImpliedByRelConstraints(rel, (Form_pg_attribute) GETSTRUCT(tuple))) + if (recursing) { - /* Tell Phase 3 it needs to test the constraint */ - tab->verify_new_notnull = true; + conForm->coninhcount++; + changed = true; + } + else if (!conForm->conislocal) + { + conForm->conislocal = true; + changed = true; } - ObjectAddressSubSet(address, RelationRelationId, - RelationGetRelid(rel), attnum); + if (changed) + { + CatalogTupleUpdate(constr_rel, ©tup->t_self, copytup); + ObjectAddressSet(address, ConstraintRelationId, conForm->oid); + } + + systable_endscan(conscan); + table_close(constr_rel, RowExclusiveLock); + + if (changed) + return address; + else + return InvalidObjectAddress; } - else - address = InvalidObjectAddress; + + systable_endscan(conscan); + table_close(constr_rel, RowExclusiveLock); + + /* + * If we're asked not to recurse, and children exist, raise an error for + * partitioned tables. For inheritance, we act as if NO INHERIT had been + * specified. + */ + if (!recurse && + find_inheritance_children(RelationGetRelid(rel), + NoLock) != NIL) + { + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("constraint must be added to child tables too"), + errhint("Do not specify the ONLY keyword.")); + else + is_no_inherit = true; + } + + /* + * No constraint exists; we must add one. First determine a name to use, + * if we haven't already. + */ + if (!recursing) + { + Assert(conName == NULL); + conName = ChooseConstraintName(RelationGetRelationName(rel), + colName, "not_null", + RelationGetNamespace(rel), + NIL); + } + constraint = makeNode(Constraint); + constraint->contype = CONSTR_NOTNULL; + constraint->conname = conName; + constraint->deferrable = false; + constraint->initdeferred = false; + constraint->location = -1; + constraint->keys = list_make1(makeString(colName)); + constraint->is_no_inherit = is_no_inherit; + constraint->inhcount = recursing ? 1 : 0; + constraint->skip_validation = false; + constraint->initially_valid = true; + + /* and do it */ + cooked = AddRelationNewConstraints(rel, NIL, list_make1(constraint), + false, !recursing, false, NULL); + ccon = linitial(cooked); + ObjectAddressSet(address, ConstraintRelationId, ccon->conoid); InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), attnum); - table_close(attr_rel, RowExclusiveLock); + /* + * Mark pg_attribute.attnotnull for the column. Tell that function not to + * recurse, because we're going to do it here. + */ + set_attnotnull(wqueue, rel, attnum, false, lockmode); + + /* + * Recurse to propagate the constraint to children that don't have one. + */ + if (recurse) + { + List *children; + ListCell *lc; + + children = find_inheritance_children(RelationGetRelid(rel), + lockmode); + + foreach(lc, children) + { + Relation childrel; + + childrel = table_open(lfirst_oid(lc), NoLock); + + ATExecSetNotNull(wqueue, childrel, + conName, colName, recurse, true, + readyRels, lockmode); + + table_close(childrel, NoLock); + } + } return address; } /* - * ALTER TABLE ALTER COLUMN CHECK NOT NULL + * ALTER TABLE ALTER COLUMN SET ATTNOTNULL * - * This doesn't exist in the grammar, but we generate AT_CheckNotNull - * commands against the partitions of a partitioned table if the user - * writes ALTER TABLE ONLY ... SET NOT NULL on the partitioned table, - * or tries to create a primary key on it (which internally creates - * AT_SetNotNull on the partitioned table). Such a command doesn't - * allow us to actually modify any partition, but we want to let it - * go through if the partitions are already properly marked. - * - * In future, this might need to adjust the child table's state, likely - * by incrementing an inheritance count for the attnotnull constraint. - * For now we need only check for the presence of the flag. + * This doesn't exist in the grammar; it's used when creating a + * primary key and the column is not already marked attnotnull. */ -static void -ATExecCheckNotNull(AlteredTableInfo *tab, Relation rel, - const char *colName, LOCKMODE lockmode) +static ObjectAddress +ATExecSetAttNotNull(List **wqueue, Relation rel, + const char *colName, LOCKMODE lockmode) { - HeapTuple tuple; - - tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName); + AttrNumber attnum; + ObjectAddress address = InvalidObjectAddress; - if (!HeapTupleIsValid(tuple)) + attnum = get_attnum(RelationGetRelid(rel), colName); + if (attnum == InvalidAttrNumber) ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" of relation \"%s\" does not exist", - colName, RelationGetRelationName(rel)))); + errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colName, RelationGetRelationName(rel))); - if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("constraint must be added to child tables too"), - errdetail("Column \"%s\" of relation \"%s\" is not already NOT NULL.", - colName, RelationGetRelationName(rel)), - errhint("Do not specify the ONLY keyword."))); + /* + * Make the change, if necessary, and only if so report the column as + * changed + */ + if (set_attnotnull(wqueue, rel, attnum, false, lockmode)) + ObjectAddressSubSet(address, RelationRelationId, + RelationGetRelid(rel), attnum); - ReleaseSysCache(tuple); + return address; } /* @@ -8678,6 +8915,71 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, } /* + * Prepare to add a primary key on an inheritance parent, by adding NOT NULL + * constraint on its children. + */ +static void +ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd, + LOCKMODE lockmode, AlterTableUtilityContext *context) +{ + List *children; + List *newconstrs = NIL; + ListCell *lc; + IndexStmt *stmt; + + /* No work if no legacy inheritance children are present */ + if (rel->rd_rel->relkind != RELKIND_RELATION || + !rel->rd_rel->relhassubclass) + return; + + children = find_inheritance_children(RelationGetRelid(rel), lockmode); + + stmt = castNode(IndexStmt, cmd->def); + foreach(lc, stmt->indexParams) + { + IndexElem *elem = lfirst_node(IndexElem, lc); + Constraint *nnconstr; + + Assert(elem->expr == NULL); + + nnconstr = makeNode(Constraint); + nnconstr->contype = CONSTR_NOTNULL; + nnconstr->conname = NULL; /* XXX use PK name? */ + nnconstr->inhcount = 1; + nnconstr->deferrable = false; + nnconstr->initdeferred = false; + nnconstr->location = -1; + nnconstr->keys = list_make1(makeString(elem->name)); + nnconstr->skip_validation = false; + nnconstr->initially_valid = true; + + newconstrs = lappend(newconstrs, nnconstr); + } + + foreach(lc, children) + { + Oid childrelid = lfirst_oid(lc); + Relation childrel = table_open(childrelid, NoLock); + AlterTableCmd *newcmd = makeNode(AlterTableCmd); + ListCell *lc2; + + newcmd->subtype = AT_AddConstraint; + newcmd->recurse = true; + + foreach(lc2, newconstrs) + { + /* ATPrepCmd copies newcmd, so we can scribble on it here */ + newcmd->def = lfirst(lc2); + + ATPrepCmd(wqueue, childrel, newcmd, + true, false, lockmode, context); + } + + table_close(childrel, NoLock); + } +} + +/* * ALTER TABLE ADD INDEX * * There is no such command in the grammar, but parse_utilcmd.c converts @@ -8872,17 +9174,18 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, Assert(IsA(newConstraint, Constraint)); /* - * Currently, we only expect to see CONSTR_CHECK and CONSTR_FOREIGN nodes - * arriving here (see the preprocessing done in parse_utilcmd.c). Use a - * switch anyway to make it easier to add more code later. + * Currently, we only expect to see CONSTR_CHECK, CONSTR_NOTNULL and + * CONSTR_FOREIGN nodes arriving here (see the preprocessing done in + * parse_utilcmd.c). */ switch (newConstraint->contype) { case CONSTR_CHECK: + case CONSTR_NOTNULL: address = - ATAddCheckConstraint(wqueue, tab, rel, - newConstraint, recurse, false, is_readd, - lockmode); + ATAddCheckNNConstraint(wqueue, tab, rel, + newConstraint, recurse, false, is_readd, + lockmode); break; case CONSTR_FOREIGN: @@ -8963,9 +9266,9 @@ ChooseForeignKeyConstraintNameAddition(List *colnames) } /* - * Add a check constraint to a single table and its children. Returns the - * address of the constraint added to the parent relation, if one gets added, - * or InvalidObjectAddress otherwise. + * Add a check or not-null constraint to a single table and its children. + * Returns the address of the constraint added to the parent relation, + * if one gets added, or InvalidObjectAddress otherwise. * * Subroutine for ATExecAddConstraint. * @@ -8978,9 +9281,9 @@ ChooseForeignKeyConstraintNameAddition(List *colnames) * the parent table and pass that down. */ static ObjectAddress -ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, - Constraint *constr, bool recurse, bool recursing, - bool is_readd, LOCKMODE lockmode) +ATAddCheckNNConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, + Constraint *constr, bool recurse, bool recursing, + bool is_readd, LOCKMODE lockmode) { List *newcons; ListCell *lcon; @@ -9018,7 +9321,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, { CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); - if (!ccon->skip_validation) + if (!ccon->skip_validation && ccon->contype != CONSTR_NOTNULL) { NewConstraint *newcon; @@ -9034,11 +9337,19 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, if (constr->conname == NULL) constr->conname = ccon->name; + /* + * If adding a not-null constraint, set the pg_attribute flag and tell + * phase 3 to verify existing rows, if needed. + */ + if (constr->contype == CONSTR_NOTNULL) + set_attnotnull(wqueue, rel, ccon->attnum, + !ccon->is_no_inherit, lockmode); + ObjectAddressSet(address, ConstraintRelationId, ccon->conoid); } /* At this point we must have a locked-down name to use */ - Assert(constr->conname != NULL); + Assert(newcons == NIL || constr->conname != NULL); /* Advance command counter in case same table is visited multiple times */ CommandCounterIncrement(); @@ -9076,6 +9387,12 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("constraint must be added to child tables too"))); + /* + * The constraint must appear as inherited in children, so create a + * modified constraint object to use. + */ + constr = copyObject(constr); + constr->inhcount = 1; foreach(child, children) { Oid childrelid = lfirst_oid(child); @@ -9089,9 +9406,13 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, /* Find or create work queue entry for this table */ childtab = ATGetQueueEntry(wqueue, childrel); - /* Recurse to child */ - ATAddCheckConstraint(wqueue, childtab, childrel, - constr, recurse, true, is_readd, lockmode); + /* + * Recurse to child. XXX if we didn't create a constraint on the + * parent because it already existed, and we do create one on a child, + * should we return that child's constraint ObjectAddress here? + */ + ATAddCheckNNConstraint(wqueue, childtab, childrel, + constr, recurse, true, is_readd, lockmode); table_close(childrel, NoLock); } @@ -11958,16 +12279,11 @@ ATExecDropConstraint(Relation rel, const char *constrName, bool recurse, bool recursing, bool missing_ok, LOCKMODE lockmode) { - List *children; - ListCell *child; Relation conrel; - Form_pg_constraint con; SysScanDesc scan; ScanKeyData skey[3]; HeapTuple tuple; bool found = false; - bool is_no_inherit_constraint = false; - char contype; /* At top level, permission check was done in ATPrepCmd, else do it */ if (recursing) @@ -11996,80 +12312,238 @@ ATExecDropConstraint(Relation rel, const char *constrName, /* There can be at most one matching row */ if (HeapTupleIsValid(tuple = systable_getnext(scan))) { - ObjectAddress conobj; + List *readyRels = NIL; + + dropconstraint_internal(rel, tuple, behavior, recurse, recursing, + missing_ok, &readyRels, lockmode); + found = true; + } - con = (Form_pg_constraint) GETSTRUCT(tuple); + systable_endscan(scan); - /* Don't drop inherited constraints */ - if (con->coninhcount > 0 && !recursing) + if (!found) + { + if (!missing_ok) ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"", - constrName, RelationGetRelationName(rel)))); + errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, RelationGetRelationName(rel))); + else + ereport(NOTICE, + errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping", + constrName, RelationGetRelationName(rel))); + } - is_no_inherit_constraint = con->connoinherit; - contype = con->contype; + table_close(conrel, RowExclusiveLock); +} - /* - * If it's a foreign-key constraint, we'd better lock the referenced - * table and check that that's not in use, just as we've already done - * for the constrained table (else we might, eg, be dropping a trigger - * that has unfired events). But we can/must skip that in the - * self-referential case. - */ - if (contype == CONSTRAINT_FOREIGN && - con->confrelid != RelationGetRelid(rel)) - { - Relation frel; +/* + * Remove a constraint, using its pg_constraint tuple + * + * Implementation for ALTER TABLE DROP CONSTRAINT and ALTER TABLE ALTER COLUMN + * DROP NOT NULL. + * + * Returns the address of the constraint being removed. + */ +static ObjectAddress +dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior behavior, + bool recurse, bool recursing, bool missing_ok, List **readyRels, + LOCKMODE lockmode) +{ + Relation conrel; + Form_pg_constraint con; + ObjectAddress conobj; + List *children; + ListCell *child; + bool is_no_inherit_constraint = false; + bool dropping_pk = false; + char *constrName; + List *unconstrained_cols = NIL; + char *colname; - /* Must match lock taken by RemoveTriggerById: */ - frel = table_open(con->confrelid, AccessExclusiveLock); - CheckTableNotInUse(frel, "ALTER TABLE"); - table_close(frel, NoLock); - } + if (list_member_oid(*readyRels, RelationGetRelid(rel))) + return InvalidObjectAddress; + *readyRels = lappend_oid(*readyRels, RelationGetRelid(rel)); - /* - * Perform the actual constraint deletion - */ - conobj.classId = ConstraintRelationId; - conobj.objectId = con->oid; - conobj.objectSubId = 0; + conrel = table_open(ConstraintRelationId, RowExclusiveLock); - performDeletion(&conobj, behavior, 0); + con = (Form_pg_constraint) GETSTRUCT(constraintTup); + constrName = NameStr(con->conname); - found = true; + /* Don't allow drop of inherited constraints */ + if (con->coninhcount > 0 && !recursing) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"", + constrName, RelationGetRelationName(rel)))); + + /* + * See if we have a not-null constraint or a PRIMARY KEY. If so, we have + * more checks and actions below, so obtain the list of columns that are + * constrained by the constraint being dropped. + */ + if (con->contype == CONSTRAINT_NOTNULL) + { + AttrNumber colnum = extractNotNullColumn(constraintTup); + + if (colnum != InvalidAttrNumber) + unconstrained_cols = list_make1_int(colnum); } + else if (con->contype == CONSTRAINT_PRIMARY) + { + Datum adatum; + ArrayType *arr; + int numkeys; + bool isNull; + int16 *attnums; - systable_endscan(scan); + dropping_pk = true; - if (!found) + adatum = heap_getattr(constraintTup, Anum_pg_constraint_conkey, + RelationGetDescr(conrel), &isNull); + if (isNull) + elog(ERROR, "null conkey for constraint %u", con->oid); + arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ + numkeys = ARR_DIMS(arr)[0]; + if (ARR_NDIM(arr) != 1 || + numkeys < 0 || + ARR_HASNULL(arr) || + ARR_ELEMTYPE(arr) != INT2OID) + elog(ERROR, "conkey is not a 1-D smallint array"); + attnums = (int16 *) ARR_DATA_PTR(arr); + + for (int i = 0; i < numkeys; i++) + unconstrained_cols = lappend_int(unconstrained_cols, attnums[i]); + } + + is_no_inherit_constraint = con->connoinherit; + + /* + * If it's a foreign-key constraint, we'd better lock the referenced table + * and check that that's not in use, just as we've already done for the + * constrained table (else we might, eg, be dropping a trigger that has + * unfired events). But we can/must skip that in the self-referential + * case. + */ + if (con->contype == CONSTRAINT_FOREIGN && + con->confrelid != RelationGetRelid(rel)) { - if (!missing_ok) - { - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" of relation \"%s\" does not exist", - constrName, RelationGetRelationName(rel)))); - } - else + Relation frel; + + /* Must match lock taken by RemoveTriggerById: */ + frel = table_open(con->confrelid, AccessExclusiveLock); + CheckTableNotInUse(frel, "ALTER TABLE"); + table_close(frel, NoLock); + } + + /* + * Perform the actual constraint deletion + */ + ObjectAddressSet(conobj, ConstraintRelationId, con->oid); + performDeletion(&conobj, behavior, 0); + + /* + * If this was a NOT NULL or the primary key, the constrained columns must + * have had pg_attribute.attnotnull set. See if we need to reset it, and + * do so. + */ + if (unconstrained_cols) + { + Relation attrel; + Bitmapset *pkcols; + Bitmapset *ircols; + ListCell *lc; + + /* Make the above deletion visible */ + CommandCounterIncrement(); + + attrel = table_open(AttributeRelationId, RowExclusiveLock); + + /* + * We want to test columns for their presence in the primary key, but + * only if we're not dropping it. + */ + pkcols = dropping_pk ? NULL : + RelationGetIndexAttrBitmap(rel, + INDEX_ATTR_BITMAP_PRIMARY_KEY); + ircols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY); + + foreach(lc, unconstrained_cols) { - ereport(NOTICE, - (errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping", - constrName, RelationGetRelationName(rel)))); - table_close(conrel, RowExclusiveLock); - return; + AttrNumber attnum = lfirst_int(lc); + HeapTuple atttup; + HeapTuple contup; + Form_pg_attribute attForm; + + /* + * Obtain pg_attribute tuple and verify conditions on it. We use + * a copy we can scribble on. + */ + atttup = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum); + if (!HeapTupleIsValid(atttup)) + elog(ERROR, "cache lookup failed for attribute %d of relation %u", + attnum, RelationGetRelid(rel)); + attForm = (Form_pg_attribute) GETSTRUCT(atttup); + + /* + * Since the above deletion has been made visible, we can now + * search for any remaining constraints on this column (or these + * columns, in the case we're dropping a multicol primary key.) + * Then, verify whether any further NOT NULL or primary key + * exists, and reset attnotnull if none. + * + * However, if this is a generated identity column, abort the + * whole thing with a specific error message, because the + * constraint is required in that case. + */ + contup = findNotNullConstraintAttnum(RelationGetRelid(rel), attnum); + if (contup || + bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, + pkcols)) + continue; + + /* + * It's not valid to drop the not-null constraint for a GENERATED + * AS IDENTITY column. + */ + if (attForm->attidentity) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("column \"%s\" of relation \"%s\" is an identity column", + get_attname(RelationGetRelid(rel), attnum, + false), + RelationGetRelationName(rel))); + + /* + * It's not valid to drop the not-null constraint for a column in + * the replica identity index, either. (FULL is not affected.) + */ + if (bms_is_member(lfirst_int(lc) - FirstLowInvalidHeapAttributeNumber, ircols)) + ereport(ERROR, + errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("column \"%s\" is in index used as replica identity", + get_attname(RelationGetRelid(rel), lfirst_int(lc), false))); + + /* Reset attnotnull */ + if (attForm->attnotnull) + { + attForm->attnotnull = false; + CatalogTupleUpdate(attrel, &atttup->t_self, atttup); + } } + table_close(attrel, RowExclusiveLock); } /* - * For partitioned tables, non-CHECK inherited constraints are dropped via - * the dependency mechanism, so we're done here. + * For partitioned tables, non-CHECK, non-NOT-NULL inherited constraints + * are dropped via the dependency mechanism, so we're done here. */ - if (contype != CONSTRAINT_CHECK && + if (con->contype != CONSTRAINT_CHECK && + con->contype != CONSTRAINT_NOTNULL && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { table_close(conrel, RowExclusiveLock); - return; + return conobj; } /* @@ -12094,52 +12568,79 @@ ATExecDropConstraint(Relation rel, const char *constrName, errmsg("cannot remove constraint from only the partitioned table when partitions exist"), errhint("Do not specify the ONLY keyword."))); + /* For not-null constraints we recurse by column name */ + if (con->contype == CONSTRAINT_NOTNULL) + colname = NameStr(TupleDescAttr(RelationGetDescr(rel), + linitial_int(unconstrained_cols) - 1)->attname); + else + colname = NULL; /* keep compiler quiet */ + foreach(child, children) { Oid childrelid = lfirst_oid(child); Relation childrel; - HeapTuple copy_tuple; + HeapTuple tuple; + Form_pg_constraint childcon; + + if (list_member_oid(*readyRels, childrelid)) + continue; /* child already processed */ /* find_inheritance_children already got lock */ childrel = table_open(childrelid, NoLock); CheckTableNotInUse(childrel, "ALTER TABLE"); - ScanKeyInit(&skey[0], - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(childrelid)); - ScanKeyInit(&skey[1], - Anum_pg_constraint_contypid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(InvalidOid)); - ScanKeyInit(&skey[2], - Anum_pg_constraint_conname, - BTEqualStrategyNumber, F_NAMEEQ, - CStringGetDatum(constrName)); - scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, - true, NULL, 3, skey); - - /* There can be at most one matching row */ - if (!HeapTupleIsValid(tuple = systable_getnext(scan))) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" of relation \"%s\" does not exist", - constrName, - RelationGetRelationName(childrel)))); - - copy_tuple = heap_copytuple(tuple); - - systable_endscan(scan); + /* + * We search for not-null constraint by column number, and other + * constraints by name. + */ + if (con->contype == CONSTRAINT_NOTNULL) + { + tuple = findNotNullConstraint(childrelid, colname); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for not-null constraint on column \"%s\" of relation %u", + colname, RelationGetRelid(childrel)); + } + else + { + SysScanDesc scan; + ScanKeyData skey[3]; + + ScanKeyInit(&skey[0], + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(childrelid)); + ScanKeyInit(&skey[1], + Anum_pg_constraint_contypid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(InvalidOid)); + ScanKeyInit(&skey[2], + Anum_pg_constraint_conname, + BTEqualStrategyNumber, F_NAMEEQ, + CStringGetDatum(constrName)); + scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, + true, NULL, 3, skey); + /* There can only be one, so no need to loop */ + tuple = systable_getnext(scan); + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, + RelationGetRelationName(childrel)))); + tuple = heap_copytuple(tuple); + systable_endscan(scan); + } - con = (Form_pg_constraint) GETSTRUCT(copy_tuple); + childcon = (Form_pg_constraint) GETSTRUCT(tuple); - /* Right now only CHECK constraints can be inherited */ - if (con->contype != CONSTRAINT_CHECK) - elog(ERROR, "inherited constraint is not a CHECK constraint"); + /* Right now only CHECK and not-null constraints can be inherited */ + if (childcon->contype != CONSTRAINT_CHECK && + childcon->contype != CONSTRAINT_NOTNULL) + elog(ERROR, "inherited constraint is not a CHECK or not-null constraint"); - if (con->coninhcount <= 0) /* shouldn't happen */ + if (childcon->coninhcount <= 0) /* shouldn't happen */ elog(ERROR, "relation %u has non-inherited constraint \"%s\"", - childrelid, constrName); + childrelid, NameStr(childcon->conname)); if (recurse) { @@ -12147,18 +12648,18 @@ ATExecDropConstraint(Relation rel, const char *constrName, * If the child constraint has other definition sources, just * decrement its inheritance count; if not, recurse to delete it. */ - if (con->coninhcount == 1 && !con->conislocal) + if (childcon->coninhcount == 1 && !childcon->conislocal) { /* Time to delete this child constraint, too */ - ATExecDropConstraint(childrel, constrName, behavior, - true, true, - false, lockmode); + dropconstraint_internal(childrel, tuple, behavior, + recurse, true, missing_ok, readyRels, + lockmode); } else { /* Child constraint must survive my deletion */ - con->coninhcount--; - CatalogTupleUpdate(conrel, ©_tuple->t_self, copy_tuple); + childcon->coninhcount--; + CatalogTupleUpdate(conrel, &tuple->t_self, tuple); /* Make update visible */ CommandCounterIncrement(); @@ -12167,25 +12668,94 @@ ATExecDropConstraint(Relation rel, const char *constrName, else { /* - * If we were told to drop ONLY in this table (no recursion), we - * need to mark the inheritors' constraints as locally defined - * rather than inherited. + * If we were told to drop ONLY in this table (no recursion) and + * there are no further parents for this constraint, we need to + * mark the inheritors' constraints as locally defined rather than + * inherited. */ - con->coninhcount--; - con->conislocal = true; + childcon->coninhcount--; + if (childcon->coninhcount == 0) + childcon->conislocal = true; - CatalogTupleUpdate(conrel, ©_tuple->t_self, copy_tuple); + CatalogTupleUpdate(conrel, &tuple->t_self, tuple); /* Make update visible */ CommandCounterIncrement(); } - heap_freetuple(copy_tuple); + heap_freetuple(tuple); table_close(childrel, NoLock); } + /* + * In addition, when dropping a primary key from a legacy-inheritance + * parent table, we must recurse to children to mark the corresponding NOT + * NULL constraint as no longer inherited, or drop it if this its last + * reference. + */ + if (con->contype == CONSTRAINT_PRIMARY && + rel->rd_rel->relkind == RELKIND_RELATION && + rel->rd_rel->relhassubclass) + { + List *colnames = NIL; + ListCell *lc; + List *pkready = NIL; + + /* + * Because primary keys are always marked as NO INHERIT, we don't have + * a list of children yet, so obtain one now. + */ + children = find_inheritance_children(RelationGetRelid(rel), lockmode); + + /* + * Find out the list of column names to process. Fortunately, we + * already have the list of column numbers. + */ + foreach(lc, unconstrained_cols) + { + colnames = lappend(colnames, get_attname(RelationGetRelid(rel), + lfirst_int(lc), false)); + } + + foreach(child, children) + { + Oid childrelid = lfirst_oid(child); + Relation childrel; + + if (list_member_oid(pkready, childrelid)) + continue; /* child already processed */ + + /* find_inheritance_children already got lock */ + childrel = table_open(childrelid, NoLock); + CheckTableNotInUse(childrel, "ALTER TABLE"); + + foreach(lc, colnames) + { + HeapTuple contup; + char *colName = lfirst(lc); + + contup = findNotNullConstraint(childrelid, colName); + if (contup == NULL) + elog(ERROR, "cache lookup failed for not-null constraint on column \"%s\", relation \"%s\"", + colName, RelationGetRelationName(childrel)); + + dropconstraint_internal(childrel, contup, + DROP_RESTRICT, true, true, + false, &pkready, + lockmode); + pkready = NIL; + } + + table_close(childrel, NoLock); + + pkready = lappend_oid(pkready, childrelid); + } + } + table_close(conrel, RowExclusiveLock); + + return conobj; } /* @@ -13262,9 +13832,10 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode) /* * If the constraint is inherited (only), we don't want to inject a - * new definition here; it'll get recreated when ATAddCheckConstraint - * recurses from adding the parent table's constraint. But we had to - * carry the info this far so that we can drop the constraint below. + * new definition here; it'll get recreated when + * ATAddCheckNNConstraint recurses from adding the parent table's + * constraint. But we had to carry the info this far so that we can + * drop the constraint below. */ if (!conislocal) continue; @@ -13511,10 +14082,10 @@ ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, char *cmd, NIL, con->conname); } - else if (cmd->subtype == AT_SetNotNull) + else if (cmd->subtype == AT_SetAttNotNull) { /* - * The parser will create AT_SetNotNull subcommands for + * The parser will create AT_AttSetNotNull subcommands for * columns of PRIMARY KEY indexes/constraints, but we need * not do anything with them here, because the columns' * NOT NULL marks will already have been propagated into @@ -14988,6 +15559,13 @@ ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode) /* OK to create inheritance */ CreateInheritance(child_rel, parent_rel); + /* + * If parent_rel has a primary key, then child_rel has not-null + * constraints that make these columns as non nullable. Make those + * constraints as inherited. + */ + ATInheritAdjustNotNulls(parent_rel, child_rel, 1); + ObjectAddressSet(address, RelationRelationId, RelationGetRelid(parent_rel)); @@ -15181,13 +15759,21 @@ MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel) /* * Check child doesn't discard NOT NULL property. (Other - * constraints are checked elsewhere.) + * constraints are checked elsewhere.) However, if the constraint + * is NO INHERIT in the parent, this is allowed. */ if (attribute->attnotnull && !childatt->attnotnull) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("column \"%s\" in child table must be marked NOT NULL", - attributeName))); + { + HeapTuple contup; + + contup = findNotNullConstraintAttnum(RelationGetRelid(parent_rel), + attribute->attnum); + if (!((Form_pg_constraint) GETSTRUCT(contup))->connoinherit) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("column \"%s\" in child table must be marked NOT NULL", + attributeName))); + } /* * Child column must be generated if and only if parent column is. @@ -15264,6 +15850,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) SysScanDesc parent_scan; ScanKeyData parent_key; HeapTuple parent_tuple; + Oid parent_relid = RelationGetRelid(parent_rel); bool child_is_partition = false; catalog_relation = table_open(ConstraintRelationId, RowExclusiveLock); @@ -15277,7 +15864,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) ScanKeyInit(&parent_key, Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(parent_rel))); + ObjectIdGetDatum(parent_relid)); parent_scan = systable_beginscan(catalog_relation, ConstraintRelidTypidNameIndexId, true, NULL, 1, &parent_key); @@ -15289,7 +15876,8 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) HeapTuple child_tuple; bool found = false; - if (parent_con->contype != CONSTRAINT_CHECK) + if (parent_con->contype != CONSTRAINT_CHECK && + parent_con->contype != CONSTRAINT_NOTNULL) continue; /* if the parent's constraint is marked NO INHERIT, it's not inherited */ @@ -15309,22 +15897,49 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) Form_pg_constraint child_con = (Form_pg_constraint) GETSTRUCT(child_tuple); HeapTuple child_copy; - if (child_con->contype != CONSTRAINT_CHECK) + if (child_con->contype != parent_con->contype) continue; - if (strcmp(NameStr(parent_con->conname), + /* + * CHECK constraint are matched by name, NOT NULL ones by + * attribute number + */ + if (child_con->contype == CONSTRAINT_CHECK && + strcmp(NameStr(parent_con->conname), NameStr(child_con->conname)) != 0) continue; + else if (child_con->contype == CONSTRAINT_NOTNULL) + { + AttrNumber parent_attno = extractNotNullColumn(parent_tuple); + AttrNumber child_attno = extractNotNullColumn(child_tuple); + + if (strcmp(get_attname(parent_relid, parent_attno, false), + get_attname(RelationGetRelid(child_rel), child_attno, + false)) != 0) + continue; + } - if (!constraints_equivalent(parent_tuple, child_tuple, tuple_desc)) + if (child_con->contype == CONSTRAINT_CHECK && + !constraints_equivalent(parent_tuple, child_tuple, tuple_desc)) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("child table \"%s\" has different definition for check constraint \"%s\"", RelationGetRelationName(child_rel), NameStr(parent_con->conname)))); - /* If the child constraint is "no inherit" then cannot merge */ - if (child_con->connoinherit) + /* + * If the CHECK child constraint is "no inherit" then cannot + * merge. + * + * This is not desirable for not-null constraints, mostly because + * it breaks our pg_upgrade strategy, but it also makes sense on + * its own: if a child has its own not-null constraint and then + * acquires a parent with the same constraint, then we start to + * enforce that constraint for all the descendants of that child + * too, if any. + */ + if (child_con->contype == CONSTRAINT_CHECK && + child_con->connoinherit) ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), errmsg("constraint \"%s\" conflicts with non-inherited constraint on child table \"%s\"", @@ -15353,6 +15968,9 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) ereport(ERROR, errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("too many inheritance parents")); + if (child_con->contype == CONSTRAINT_NOTNULL && + child_con->connoinherit) + child_con->connoinherit = false; /* * In case of partitions, an inherited constraint must be @@ -15416,6 +16034,18 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode) /* Off to RemoveInheritance() where most of the work happens */ RemoveInheritance(rel, parent_rel, false); + /* + * If parent_rel has a primary key, then child_rel has not-null + * constraints that make these columns as non nullable. Mark those + * constraints as no longer inherited by this parent. + */ + ATInheritAdjustNotNulls(parent_rel, rel, -1); + + /* + * If the parent has a primary key, then we decrement counts for all NOT + * NULL constraints + */ + ObjectAddressSet(address, RelationRelationId, RelationGetRelid(parent_rel)); @@ -15524,6 +16154,7 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) HeapTuple attributeTuple, constraintTuple; List *connames; + List *nncolumns; bool found; bool child_is_partition = false; @@ -15594,6 +16225,8 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) * this, we first need a list of the names of the parent's check * constraints. (We cheat a bit by only checking for name matches, * assuming that the expressions will match.) + * + * For NOT NULL columns, we store column numbers to match. */ catalogRelation = table_open(ConstraintRelationId, RowExclusiveLock); ScanKeyInit(&key[0], @@ -15604,6 +16237,7 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) true, NULL, 1, key); connames = NIL; + nncolumns = NIL; while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) { @@ -15611,6 +16245,8 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) if (con->contype == CONSTRAINT_CHECK) connames = lappend(connames, pstrdup(NameStr(con->conname))); + if (con->contype == CONSTRAINT_NOTNULL) + nncolumns = lappend_int(nncolumns, extractNotNullColumn(constraintTuple)); } systable_endscan(scan); @@ -15626,21 +16262,40 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) { Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple); - bool match; + bool match = false; ListCell *lc; - if (con->contype != CONSTRAINT_CHECK) - continue; - - match = false; - foreach(lc, connames) + /* + * Match CHECK constraints by name, not-null constraints by column + * number, and ignore all others. + */ + if (con->contype == CONSTRAINT_CHECK) { - if (strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0) + foreach(lc, connames) { - match = true; - break; + if (con->contype == CONSTRAINT_CHECK && + strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0) + { + match = true; + break; + } + } + } + else if (con->contype == CONSTRAINT_NOTNULL) + { + AttrNumber child_attno = extractNotNullColumn(constraintTuple); + + foreach(lc, nncolumns) + { + if (lfirst_int(lc) == child_attno) + { + match = true; + break; + } } } + else + continue; if (match) { @@ -15680,6 +16335,54 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) } /* + * Adjust coninhcount of not-null constraints upwards or downwards when a + * table is marked as inheriting or no longer doing so a table with a primary + * key. + * + * Note: these constraints are not dropped, even if their inhcount goes to zero + * and conislocal is false. Instead we mark the constraints as locally defined. + * This is seen as more useful behavior, with no downsides. The user can always + * drop them afterwards. + */ +static void +ATInheritAdjustNotNulls(Relation parent_rel, Relation child_rel, int inhcount) +{ + Bitmapset *pkattnos; + + /* Quick exit when parent has no PK */ + if (!parent_rel->rd_rel->relhasindex) + return; + + pkattnos = RelationGetIndexAttrBitmap(parent_rel, + INDEX_ATTR_BITMAP_PRIMARY_KEY); + if (pkattnos != NULL) + { + Bitmapset *childattnums = NULL; + AttrMap *attmap; + int i; + + attmap = build_attrmap_by_name(RelationGetDescr(parent_rel), + RelationGetDescr(child_rel), true); + + i = -1; + while ((i = bms_next_member(pkattnos, i)) >= 0) + { + childattnums = bms_add_member(childattnums, + attmap->attnums[i + FirstLowInvalidHeapAttributeNumber - 1]); + } + + /* + * CCI is needed in case there's a NOT NULL PRIMARY KEY column in the + * parent: the relevant not-null constraint in the child already had + * its inhcount modified earlier. + */ + CommandCounterIncrement(); + AdjustNotNullInheritance(RelationGetRelid(child_rel), childattnums, + inhcount); + } +} + +/* * Drop the dependency created by StoreCatalogInheritance1 (CREATE TABLE * INHERITS/ALTER TABLE INHERIT -- refclassid will be RelationRelationId) or * heap_create_with_catalog (CREATE TABLE OF/ALTER TABLE OF -- refclassid will @@ -17530,7 +18233,7 @@ ComputePartitionAttrs(ParseState *pstate, Relation rel, List *partParams, AttrNu * Do scanrel's existing constraints imply the partition constraint? * * "Existing constraints" include its check constraints and column-level - * NOT NULL constraints. partConstraint describes the partition constraint, + * not-null constraints. partConstraint describes the partition constraint, * in implicit-AND form. */ bool @@ -17910,7 +18613,7 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, StorePartitionBound(attachrel, rel, cmd->bound); /* Ensure there exists a correct set of indexes in the partition. */ - AttachPartitionEnsureIndexes(rel, attachrel); + AttachPartitionEnsureIndexes(wqueue, rel, attachrel); /* and triggers */ CloneRowTriggersToPartition(rel, attachrel); @@ -18023,13 +18726,12 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, * partitioned table. */ static void -AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) +AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) { List *idxes; List *attachRelIdxs; Relation *attachrelIdxRels; IndexInfo **attachInfos; - int i; ListCell *cell; MemoryContext cxt; MemoryContext oldcxt; @@ -18045,14 +18747,13 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) attachInfos = palloc(sizeof(IndexInfo *) * list_length(attachRelIdxs)); /* Build arrays of all existing indexes and their IndexInfos */ - i = 0; foreach(cell, attachRelIdxs) { Oid cldIdxId = lfirst_oid(cell); + int i = foreach_current_index(cell); attachrelIdxRels[i] = index_open(cldIdxId, AccessShareLock); attachInfos[i] = BuildIndexInfo(attachrelIdxRels[i]); - i++; } /* @@ -18118,7 +18819,7 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) * the first matching, valid, unattached one we find, if any, as * partition of the parent index. If we find one, we're done. */ - for (i = 0; i < list_length(attachRelIdxs); i++) + for (int i = 0; i < list_length(attachRelIdxs); i++) { Oid cldIdxId = RelationGetRelid(attachrelIdxRels[i]); Oid cldConstrOid = InvalidOid; @@ -18178,6 +18879,28 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) stmt = generateClonedIndexStmt(NULL, idxRel, attmap, &conOid); + + /* + * If the index is a primary key, mark all columns as NOT NULL if + * they aren't already. + */ + if (stmt->primary) + { + MemoryContextSwitchTo(oldcxt); + for (int j = 0; j < info->ii_NumIndexKeyAttrs; j++) + { + AttrNumber childattno; + + childattno = get_attnum(RelationGetRelid(attachrel), + get_attname(RelationGetRelid(rel), + info->ii_IndexAttrNumbers[j], + false)); + set_attnotnull(wqueue, attachrel, childattno, + true, AccessExclusiveLock); + } + MemoryContextSwitchTo(cxt); + } + DefineIndex(RelationGetRelid(attachrel), stmt, InvalidOid, RelationGetRelid(idxRel), conOid, @@ -18190,7 +18913,7 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) out: /* Clean up. */ - for (i = 0; i < list_length(attachRelIdxs); i++) + for (int i = 0; i < list_length(attachRelIdxs); i++) index_close(attachrelIdxRels[i], AccessShareLock); MemoryContextSwitchTo(oldcxt); MemoryContextDelete(cxt); @@ -18821,8 +19544,8 @@ DetachAddConstraintIfNeeded(List **wqueue, Relation partRel) n->initially_valid = true; n->skip_validation = true; /* It's a re-add, since it nominally already exists */ - ATAddCheckConstraint(wqueue, tab, partRel, n, - true, false, true, ShareUpdateExclusiveLock); + ATAddCheckNNConstraint(wqueue, tab, partRel, n, + true, false, true, ShareUpdateExclusiveLock); } } @@ -19091,6 +19814,13 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name) RelationGetRelationName(partIdx)))); } + /* + * If it's a primary key, make sure the columns in the partition are + * NOT NULL. + */ + if (parentIdx->rd_index->indisprimary) + verifyPartitionIndexNotNull(childInfo, partTbl); + /* All good -- do it */ IndexSetParentIndex(partIdx, RelationGetRelid(parentIdx)); if (OidIsValid(constraintOid)) @@ -19235,6 +19965,29 @@ validatePartitionedIndex(Relation partedIdx, Relation partedTbl) } /* + * When attaching an index as a partition of a partitioned index which is a + * primary key, verify that all the columns in the partition are marked NOT + * NULL. + */ +static void +verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partition) +{ + for (int i = 0; i < iinfo->ii_NumIndexKeyAttrs; i++) + { + Form_pg_attribute att = TupleDescAttr(RelationGetDescr(partition), + iinfo->ii_IndexAttrNumbers[i] - 1); + + if (!att->attnotnull) + ereport(ERROR, + errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("invalid primary key definition"), + errdetail("Column \"%s\" of relation \"%s\" is not marked NOT NULL.", + NameStr(att->attname), + RelationGetRelationName(partition))); + } +} + +/* * Return an OID list of constraints that reference the given relation * that are marked as having a parent constraints. */ |