aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c1264
1 files changed, 827 insertions, 437 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index eaa81424270..ccd9645e7d2 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -371,7 +371,8 @@ static void truncate_check_activity(Relation rel);
static void RangeVarCallbackForTruncate(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg);
static List *MergeAttributes(List *columns, const List *supers, char relpersistence,
- bool is_partition, List **supconstr);
+ bool is_partition, List **supconstr,
+ List **supnotnulls);
static List *MergeCheckConstraint(List *constraints, const char *name, Node *expr);
static void MergeChildAttribute(List *inh_columns, int exist_attno, int newcol_attno, const ColumnDef *newdef);
static ColumnDef *MergeInheritedAttribute(List *inh_columns, int exist_attno, const ColumnDef *newdef);
@@ -456,15 +457,14 @@ static bool check_for_column_name_collision(Relation rel, const char *colname,
bool if_not_exists);
static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
static void add_column_collation_dependency(Oid relid, int32 attnum, Oid collid);
-static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode);
-static void ATPrepSetNotNull(List **wqueue, Relation rel,
- AlterTableCmd *cmd, bool recurse, bool recursing,
- LOCKMODE lockmode,
- AlterTableUtilityContext *context);
-static ObjectAddress ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
- const char *colName, LOCKMODE lockmode);
-static void ATExecCheckNotNull(AlteredTableInfo *tab, Relation rel,
- const char *colName, LOCKMODE lockmode);
+static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
+ LOCKMODE lockmode);
+static void set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum,
+ LOCKMODE lockmode);
+static ObjectAddress ATExecSetNotNull(List **wqueue, Relation rel,
+ char *constrname, char *colName,
+ bool recurse, bool recursing,
+ LOCKMODE lockmode);
static bool NotNullImpliedByRelConstraints(Relation rel, Form_pg_attribute attr);
static bool ConstraintImpliedByRelConstraint(Relation scanrel,
List *testConstraint, List *provenConstraint);
@@ -496,6 +496,9 @@ static ObjectAddress ATExecDropColumn(List **wqueue, Relation rel, const char *c
bool recurse, bool recursing,
bool missing_ok, LOCKMODE lockmode,
ObjectAddresses *addrs);
+static void ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd,
+ bool recurse, LOCKMODE lockmode,
+ AlterTableUtilityContext *context);
static ObjectAddress ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
IndexStmt *stmt, bool is_rebuild, LOCKMODE lockmode);
static ObjectAddress ATExecAddStatistics(AlteredTableInfo *tab, Relation rel,
@@ -507,11 +510,11 @@ static ObjectAddress ATExecAddConstraint(List **wqueue,
static char *ChooseForeignKeyConstraintNameAddition(List *colnames);
static ObjectAddress ATExecAddIndexConstraint(AlteredTableInfo *tab, Relation rel,
IndexStmt *stmt, LOCKMODE lockmode);
-static ObjectAddress ATAddCheckConstraint(List **wqueue,
- AlteredTableInfo *tab, Relation rel,
- Constraint *constr,
- bool recurse, bool recursing, bool is_readd,
- LOCKMODE lockmode);
+static ObjectAddress ATAddCheckNNConstraint(List **wqueue,
+ AlteredTableInfo *tab, Relation rel,
+ Constraint *constr,
+ bool recurse, bool recursing, bool is_readd,
+ LOCKMODE lockmode);
static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab,
Relation rel, Constraint *fkconstraint,
bool recurse, bool recursing,
@@ -577,9 +580,12 @@ static void GetForeignKeyCheckTriggers(Relation trigrel,
Oid *insertTriggerOid,
Oid *updateTriggerOid);
static void ATExecDropConstraint(Relation rel, const char *constrName,
- DropBehavior behavior,
- bool recurse, bool recursing,
+ DropBehavior behavior, bool recurse,
bool missing_ok, LOCKMODE lockmode);
+static ObjectAddress dropconstraint_internal(Relation rel,
+ HeapTuple constraintTup, DropBehavior behavior,
+ bool recurse, bool recursing,
+ bool missing_ok, LOCKMODE lockmode);
static void ATPrepAlterColumnType(List **wqueue,
AlteredTableInfo *tab, Relation rel,
bool recurse, bool recursing,
@@ -677,6 +683,7 @@ static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx,
static void validatePartitionedIndex(Relation partedIdx, Relation partedTbl);
static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
Relation partitionTbl);
+static void verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partIdx);
static List *GetParentedForeignKeyRefs(Relation partition);
static void ATDetachCheckNoForeignKeyRefs(Relation partition);
static char GetAttributeCompression(Oid atttypid, const char *compression);
@@ -714,8 +721,10 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
TupleDesc descriptor;
List *inheritOids;
List *old_constraints;
+ List *old_notnulls;
List *rawDefaults;
List *cookedDefaults;
+ List *nncols;
Datum reloptions;
ListCell *listptr;
AttrNumber attnum;
@@ -906,12 +915,13 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
MergeAttributes(stmt->tableElts, inheritOids,
stmt->relation->relpersistence,
stmt->partbound != NULL,
- &old_constraints);
+ &old_constraints, &old_notnulls);
/*
* Create a tuple descriptor from the relation schema. Note that this
- * deals with column names, types, and not-null constraints, but not
- * default values or CHECK constraints; we handle those below.
+ * deals with column names, types, and in-descriptor NOT NULL flags, but
+ * not default values, NOT NULL or CHECK constraints; we handle those
+ * below.
*/
descriptor = BuildDescForRelation(stmt->tableElts);
@@ -1283,6 +1293,17 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
AddRelationNewConstraints(rel, NIL, stmt->constraints,
true, true, false, queryString);
+ /*
+ * Finally, merge the not-null constraints that are declared directly with
+ * those that come from parent relations (making sure to count inheritance
+ * appropriately for each), create them, and set the attnotnull flag on
+ * columns that don't yet have it.
+ */
+ nncols = AddRelationNotNullConstraints(rel, stmt->nnconstraints,
+ old_notnulls);
+ foreach_int(attrnum, nncols)
+ set_attnotnull(NULL, rel, attrnum, NoLock);
+
ObjectAddressSet(address, RelationRelationId, relationId);
/*
@@ -2414,6 +2435,8 @@ storage_name(char c)
* Output arguments:
* 'supconstr' receives a list of constraints belonging to the parents,
* updated as necessary to be valid for the child.
+ * 'supnotnulls' receives a list of CookedConstraints that corresponds to
+ * constraints coming from inheritance parents.
*
* Return value:
* Completed schema list.
@@ -2444,7 +2467,10 @@ storage_name(char c)
*
* Constraints (including not-null constraints) for the child table
* are the union of all relevant constraints, from both the child schema
- * and parent tables.
+ * and parent tables. In addition, in legacy inheritance, each column that
+ * appears in a primary key in any of the parents also gets a NOT NULL
+ * constraint (partitioning doesn't need this, because the PK itself gets
+ * inherited.)
*
* The default value for a child column is defined as:
* (1) If the child schema specifies a default, that value is used.
@@ -2463,10 +2489,11 @@ storage_name(char c)
*/
static List *
MergeAttributes(List *columns, const List *supers, char relpersistence,
- bool is_partition, List **supconstr)
+ bool is_partition, List **supconstr, List **supnotnulls)
{
List *inh_columns = NIL;
List *constraints = NIL;
+ List *nnconstraints = NIL;
bool have_bogus_defaults = false;
int child_attno;
static Node bogus_marker = {0}; /* marks conflicting defaults */
@@ -2577,8 +2604,10 @@ MergeAttributes(List *columns, const List *supers, char relpersistence,
AttrMap *newattmap;
List *inherited_defaults;
List *cols_with_defaults;
+ List *nnconstrs;
ListCell *lc1;
ListCell *lc2;
+ Bitmapset *nncols = NULL;
/* caller already got lock */
relation = table_open(parent, NoLock);
@@ -2666,6 +2695,15 @@ MergeAttributes(List *columns, const List *supers, char relpersistence,
/* We can't process inherited defaults until newattmap is complete. */
inherited_defaults = cols_with_defaults = NIL;
+ /*
+ * Request attnotnull on columns that have a not-null constraint
+ * that's not marked NO INHERIT.
+ */
+ nnconstrs = RelationGetNotNullConstraints(RelationGetRelid(relation),
+ true, false);
+ foreach_ptr(CookedConstraint, cc, nnconstrs)
+ nncols = bms_add_member(nncols, cc->attnum);
+
for (AttrNumber parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++)
{
@@ -2687,7 +2725,6 @@ MergeAttributes(List *columns, const List *supers, char relpersistence,
*/
newdef = makeColumnDef(attributeName, attribute->atttypid,
attribute->atttypmod, attribute->attcollation);
- newdef->is_not_null = attribute->attnotnull;
newdef->storage = attribute->attstorage;
newdef->generated = attribute->attgenerated;
if (CompressionMethodIsValid(attribute->attcompression))
@@ -2736,6 +2773,12 @@ MergeAttributes(List *columns, const List *supers, char relpersistence,
}
/*
+ * mark attnotnull if parent has it
+ */
+ if (bms_is_member(parent_attno, nncols))
+ mergeddef->is_not_null = true;
+
+ /*
* Locate default/generation expression if any
*/
if (attribute->atthasdef)
@@ -2846,6 +2889,19 @@ MergeAttributes(List *columns, const List *supers, char relpersistence,
}
}
+ /*
+ * Also copy the not-null constraints from this parent. The
+ * attnotnull markings were already installed above.
+ */
+ foreach_ptr(CookedConstraint, nn, nnconstrs)
+ {
+ Assert(nn->contype == CONSTR_NOTNULL);
+
+ nn->attnum = newattmap->attnums[nn->attnum - 1];
+
+ nnconstraints = lappend(nnconstraints, nn);
+ }
+
free_attrmap(newattmap);
/*
@@ -2916,8 +2972,7 @@ MergeAttributes(List *columns, const List *supers, char relpersistence,
/*
* Now that we have the column definition list for a partition, we can
* check whether the columns referenced in the column constraint specs
- * actually exist. Also, we merge parent's not-null constraints and
- * defaults into each corresponding column definition.
+ * actually exist. Also, merge column defaults.
*/
if (is_partition)
{
@@ -2934,7 +2989,6 @@ MergeAttributes(List *columns, const List *supers, char relpersistence,
if (strcmp(coldef->colname, restdef->colname) == 0)
{
found = true;
- coldef->is_not_null |= restdef->is_not_null;
/*
* Check for conflicts related to generated columns.
@@ -3023,6 +3077,7 @@ MergeAttributes(List *columns, const List *supers, char relpersistence,
}
*supconstr = constraints;
+ *supnotnulls = nnconstraints;
return columns;
}
@@ -3304,11 +3359,6 @@ MergeInheritedAttribute(List *inh_columns,
format_type_with_typemod(newtypeid, newtypmod))));
/*
- * Merge of not-null constraints = OR 'em together
- */
- prevdef->is_not_null |= newdef->is_not_null;
-
- /*
* Must have the same collation
*/
prevcollid = GetColumnDefCollation(NULL, prevdef, prevtypeid);
@@ -3946,7 +3996,10 @@ rename_constraint_internal(Oid myrelid,
constraintOid);
con = (Form_pg_constraint) GETSTRUCT(tuple);
- if (myrelid && con->contype == CONSTRAINT_CHECK && !con->connoinherit)
+ if (myrelid &&
+ (con->contype == CONSTRAINT_CHECK ||
+ con->contype == CONSTRAINT_NOTNULL) &&
+ !con->connoinherit)
{
if (recurse)
{
@@ -4704,15 +4757,6 @@ AlterTableGetLockLevel(List *cmds)
cmd_lockmode = ShareUpdateExclusiveLock;
break;
- case AT_CheckNotNull:
-
- /*
- * This only examines the table's schema; but lock must be
- * strong enough to prevent concurrent DROP NOT NULL.
- */
- cmd_lockmode = AccessShareLock;
- break;
-
default: /* oops */
elog(ERROR, "unrecognized alter table type: %d",
(int) cmd->subtype);
@@ -4881,22 +4925,17 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
ATSimplePermissions(cmd->subtype, rel,
ATT_TABLE | ATT_PARTITIONED_TABLE | ATT_FOREIGN_TABLE);
- ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context);
+ /* Set up recursion for phase 2; no other prep needed */
+ if (recurse)
+ cmd->recurse = true;
pass = AT_PASS_DROP;
break;
case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
ATSimplePermissions(cmd->subtype, rel,
ATT_TABLE | ATT_PARTITIONED_TABLE | ATT_FOREIGN_TABLE);
- /* Need command-specific recursion decision */
- ATPrepSetNotNull(wqueue, rel, cmd, recurse, recursing,
- lockmode, context);
- pass = AT_PASS_COL_ATTRS;
- break;
- case AT_CheckNotNull: /* check column is already marked NOT NULL */
- ATSimplePermissions(cmd->subtype, rel,
- ATT_TABLE | ATT_PARTITIONED_TABLE | ATT_FOREIGN_TABLE);
- ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context);
- /* No command-specific prep needed */
+ /* Set up recursion for phase 2; no other prep needed */
+ if (recurse)
+ cmd->recurse = true;
pass = AT_PASS_COL_ATTRS;
break;
case AT_SetExpression: /* ALTER COLUMN SET EXPRESSION */
@@ -4961,10 +5000,13 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
case AT_AddConstraint: /* ADD CONSTRAINT */
ATSimplePermissions(cmd->subtype, rel,
ATT_TABLE | ATT_PARTITIONED_TABLE | ATT_FOREIGN_TABLE);
- /* Recursion occurs during execution phase */
- /* No command-specific prep needed except saving recurse flag */
+ ATPrepAddPrimaryKey(wqueue, rel, cmd, recurse, lockmode, context);
if (recurse)
+ {
+ /* recurses at exec time; lock descendants and set flag */
+ (void) find_all_inheritors(RelationGetRelid(rel), lockmode, NULL);
cmd->recurse = true;
+ }
pass = AT_PASS_ADD_CONSTR;
break;
case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */
@@ -5279,13 +5321,11 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab,
address = ATExecDropIdentity(rel, cmd->name, cmd->missing_ok, lockmode, cmd->recurse, false);
break;
case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
- address = ATExecDropNotNull(rel, cmd->name, lockmode);
+ address = ATExecDropNotNull(rel, cmd->name, cmd->recurse, lockmode);
break;
case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
- address = ATExecSetNotNull(tab, rel, cmd->name, lockmode);
- break;
- case AT_CheckNotNull: /* check column is already marked NOT NULL */
- ATExecCheckNotNull(tab, rel, cmd->name, lockmode);
+ address = ATExecSetNotNull(wqueue, rel, NULL, cmd->name,
+ cmd->recurse, false, lockmode);
break;
case AT_SetExpression:
address = ATExecSetExpression(tab, rel, cmd->name, cmd->def, lockmode);
@@ -5368,7 +5408,7 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab,
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
ATExecDropConstraint(rel, cmd->name, cmd->behavior,
- cmd->recurse, false,
+ cmd->recurse,
cmd->missing_ok, lockmode);
break;
case AT_AlterColumnType: /* ALTER COLUMN TYPE */
@@ -5631,21 +5671,10 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
*/
switch (cmd2->subtype)
{
- case AT_SetNotNull:
- /* Need command-specific recursion decision */
- ATPrepSetNotNull(wqueue, rel, cmd2,
- recurse, false,
- lockmode, context);
- pass = AT_PASS_COL_ATTRS;
- break;
case AT_AddIndex:
- /* This command never recurses */
- /* No command-specific prep needed */
pass = AT_PASS_ADD_INDEX;
break;
case AT_AddIndexConstraint:
- /* This command never recurses */
- /* No command-specific prep needed */
pass = AT_PASS_ADD_INDEXCONSTR;
break;
case AT_AddConstraint:
@@ -5654,6 +5683,9 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
cmd2->recurse = true;
switch (castNode(Constraint, cmd2->def)->contype)
{
+ case CONSTR_NOTNULL:
+ pass = AT_PASS_COL_ATTRS;
+ break;
case CONSTR_PRIMARY:
case CONSTR_UNIQUE:
case CONSTR_EXCLUSION:
@@ -6093,8 +6125,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
/*
* If we are rebuilding the tuples OR if we added any new but not
* verified not-null constraints, check all not-null constraints. This
- * is a bit of overkill but it minimizes risk of bugs, and
- * heap_attisnull is a pretty cheap test anyway.
+ * is a bit of overkill but it minimizes risk of bugs.
*/
for (i = 0; i < newTupDesc->natts; i++)
{
@@ -6314,6 +6345,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
RelationGetRelationName(oldrel)),
errtableconstraint(oldrel, con->name)));
break;
+ case CONSTR_NOTNULL:
case CONSTR_FOREIGN:
/* Nothing to do here */
break;
@@ -6427,8 +6459,6 @@ alter_table_type_to_string(AlterTableType cmdtype)
return "ALTER COLUMN ... SET EXPRESSION";
case AT_DropExpression:
return "ALTER COLUMN ... DROP EXPRESSION";
- case AT_CheckNotNull:
- return NULL; /* not real grammar */
case AT_SetStatistics:
return "ALTER COLUMN ... SET STATISTICS";
case AT_SetOptions:
@@ -7524,13 +7554,14 @@ add_column_collation_dependency(Oid relid, int32 attnum, Oid collid)
* nullable, InvalidObjectAddress is returned.
*/
static ObjectAddress
-ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
+ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
+ LOCKMODE lockmode)
{
HeapTuple tuple;
+ HeapTuple conTup;
Form_pg_attribute attTup;
AttrNumber attnum;
Relation attr_rel;
- List *indexoidlist;
ObjectAddress address;
/*
@@ -7546,6 +7577,15 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
colName, RelationGetRelationName(rel))));
attTup = (Form_pg_attribute) GETSTRUCT(tuple);
attnum = attTup->attnum;
+ ObjectAddressSubSet(address, RelationRelationId,
+ RelationGetRelid(rel), attnum);
+
+ /* If the column is already nullable there's nothing to do. */
+ if (!attTup->attnotnull)
+ {
+ table_close(attr_rel, RowExclusiveLock);
+ return InvalidObjectAddress;
+ }
/* Prevent them from altering a system attribute */
if (attnum <= 0)
@@ -7561,60 +7601,8 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
colName, RelationGetRelationName(rel))));
/*
- * Check that the attribute is not in a primary key or in an index used as
- * a replica identity.
- *
- * Note: we'll throw error even if the pkey index is not valid.
+ * If rel is partition, shouldn't drop NOT NULL if parent has the same.
*/
-
- /* Loop over all indexes on the relation */
- indexoidlist = RelationGetIndexList(rel);
-
- foreach_oid(indexoid, indexoidlist)
- {
- HeapTuple indexTuple;
- Form_pg_index indexStruct;
-
- indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
- if (!HeapTupleIsValid(indexTuple))
- elog(ERROR, "cache lookup failed for index %u", indexoid);
- indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
-
- /*
- * If the index is not a primary key or an index used as replica
- * identity, skip the check.
- */
- if (indexStruct->indisprimary || indexStruct->indisreplident)
- {
- /*
- * Loop over each attribute in the primary key or the index used
- * as replica identity and see if it matches the to-be-altered
- * attribute.
- */
- for (int i = 0; i < indexStruct->indnkeyatts; i++)
- {
- if (indexStruct->indkey.values[i] == attnum)
- {
- if (indexStruct->indisprimary)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("column \"%s\" is in a primary key",
- colName)));
- else
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("column \"%s\" is in index used as replica identity",
- colName)));
- }
- }
- }
-
- ReleaseSysCache(indexTuple);
- }
-
- list_free(indexoidlist);
-
- /* If rel is partition, shouldn't drop NOT NULL if parent has the same */
if (rel->rd_rel->relispartition)
{
Oid parentId = get_partition_parent(RelationGetRelid(rel), false);
@@ -7632,19 +7620,18 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
}
/*
- * Okay, actually perform the catalog change ... if needed
+ * Find the constraint that makes this column NOT NULL, and drop it.
+ * dropconstraint_internal() resets attnotnull.
*/
- if (attTup->attnotnull)
- {
- attTup->attnotnull = false;
+ conTup = findNotNullConstraintAttnum(RelationGetRelid(rel), attnum);
+ if (conTup == NULL)
+ elog(ERROR, "cache lookup failed for not-null constraint on column \"%s\" of relation \"%s\"",
+ colName, RelationGetRelationName(rel));
- CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
-
- ObjectAddressSubSet(address, RelationRelationId,
- RelationGetRelid(rel), attnum);
- }
- else
- address = InvalidObjectAddress;
+ /* The normal case: we have a pg_constraint row, remove it */
+ dropconstraint_internal(rel, conTup, DROP_RESTRICT, recurse, false,
+ false, lockmode);
+ heap_freetuple(conTup);
InvokeObjectPostAlterHook(RelationRelationId,
RelationGetRelid(rel), attnum);
@@ -7655,104 +7642,105 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
}
/*
- * ALTER TABLE ALTER COLUMN SET NOT NULL
+ * Helper to set pg_attribute.attnotnull if it isn't set, and to tell phase 3
+ * to verify it.
+ *
+ * When called to alter an existing table, 'wqueue' must be given so that we
+ * can queue a check that existing tuples pass the constraint. When called
+ * from table creation, 'wqueue' should be passed as NULL.
*/
-
static void
-ATPrepSetNotNull(List **wqueue, Relation rel,
- AlterTableCmd *cmd, bool recurse, bool recursing,
- LOCKMODE lockmode, AlterTableUtilityContext *context)
+set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum,
+ LOCKMODE lockmode)
{
+ Form_pg_attribute attr;
+
+ CheckAlterTableIsSafe(rel);
+
/*
- * If we're already recursing, there's nothing to do; the topmost
- * invocation of ATSimpleRecursion already visited all children.
+ * Exit quickly by testing attnotnull from the tupledesc's copy of the
+ * attribute.
*/
- if (recursing)
+ attr = TupleDescAttr(RelationGetDescr(rel), attnum - 1);
+ if (attr->attisdropped)
return;
- /*
- * If the target column is already marked NOT NULL, we can skip recursing
- * to children, because their columns should already be marked NOT NULL as
- * well. But there's no point in checking here unless the relation has
- * some children; else we can just wait till execution to check. (If it
- * does have children, however, this can save taking per-child locks
- * unnecessarily. This greatly improves concurrency in some parallel
- * restore scenarios.)
- *
- * Unfortunately, we can only apply this optimization to partitioned
- * tables, because traditional inheritance doesn't enforce that child
- * columns be NOT NULL when their parent is. (That's a bug that should
- * get fixed someday.)
- */
- if (rel->rd_rel->relhassubclass &&
- rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ if (!attr->attnotnull)
{
+ Relation attr_rel;
HeapTuple tuple;
- bool attnotnull;
- tuple = SearchSysCacheAttName(RelationGetRelid(rel), cmd->name);
+ attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
- /* Might as well throw the error now, if name is bad */
+ tuple = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum);
if (!HeapTupleIsValid(tuple))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_COLUMN),
- errmsg("column \"%s\" of relation \"%s\" does not exist",
- cmd->name, RelationGetRelationName(rel))));
+ elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+ attnum, RelationGetRelid(rel));
- attnotnull = ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull;
- ReleaseSysCache(tuple);
- if (attnotnull)
- return;
- }
+ attr = (Form_pg_attribute) GETSTRUCT(tuple);
+ Assert(!attr->attnotnull);
+ attr->attnotnull = true;
+ CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
- /*
- * If we have ALTER TABLE ONLY ... SET NOT NULL on a partitioned table,
- * apply ALTER TABLE ... CHECK NOT NULL to every child. Otherwise, use
- * normal recursion logic.
- */
- if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
- !recurse)
- {
- AlterTableCmd *newcmd = makeNode(AlterTableCmd);
+ /*
+ * If the nullness isn't already proven by validated constraints, have
+ * ALTER TABLE phase 3 test for it.
+ */
+ if (wqueue && !NotNullImpliedByRelConstraints(rel, attr))
+ {
+ AlteredTableInfo *tab;
+
+ tab = ATGetQueueEntry(wqueue, rel);
+ tab->verify_new_notnull = true;
+ }
+
+ CommandCounterIncrement();
- newcmd->subtype = AT_CheckNotNull;
- newcmd->name = pstrdup(cmd->name);
- ATSimpleRecursion(wqueue, rel, newcmd, true, lockmode, context);
+ table_close(attr_rel, RowExclusiveLock);
+ heap_freetuple(tuple);
}
- else
- ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context);
}
/*
- * Return the address of the modified column. If the column was already NOT
- * NULL, InvalidObjectAddress is returned.
+ * ALTER TABLE ALTER COLUMN SET NOT NULL
+ *
+ * Add a not-null constraint to a single table and its children. Returns
+ * the address of the constraint added to the parent relation, if one gets
+ * added, or InvalidObjectAddress otherwise.
+ *
+ * We must recurse to child tables during execution, rather than using
+ * ALTER TABLE's normal prep-time recursion.
*/
static ObjectAddress
-ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
- const char *colName, LOCKMODE lockmode)
+ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName,
+ bool recurse, bool recursing, LOCKMODE lockmode)
{
HeapTuple tuple;
- Form_pg_attribute attTup;
AttrNumber attnum;
- Relation attr_rel;
ObjectAddress address;
+ Constraint *constraint;
+ CookedConstraint *ccon;
+ List *cooked;
+ bool is_no_inherit = false;
- /*
- * lookup the attribute
- */
- attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
+ /* Guard against stack overflow due to overly deep inheritance tree. */
+ check_stack_depth();
- tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
+ /* At top level, permission check was done in ATPrepCmd, else do it */
+ if (recursing)
+ {
+ ATSimplePermissions(AT_AddConstraint, rel,
+ ATT_PARTITIONED_TABLE | ATT_TABLE | ATT_FOREIGN_TABLE);
+ Assert(conName != NULL);
+ }
- if (!HeapTupleIsValid(tuple))
+ attnum = get_attnum(RelationGetRelid(rel), colName);
+ if (attnum == InvalidAttrNumber)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" of relation \"%s\" does not exist",
colName, RelationGetRelationName(rel))));
- attTup = (Form_pg_attribute) GETSTRUCT(tuple);
- attnum = attTup->attnum;
-
/* Prevent them from altering a system attribute */
if (attnum <= 0)
ereport(ERROR,
@@ -7760,79 +7748,130 @@ ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
errmsg("cannot alter system column \"%s\"",
colName)));
- /*
- * Okay, actually perform the catalog change ... if needed
- */
- if (!attTup->attnotnull)
+ /* See if there's already a constraint */
+ tuple = findNotNullConstraintAttnum(RelationGetRelid(rel), attnum);
+ if (HeapTupleIsValid(tuple))
{
- attTup->attnotnull = true;
+ Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(tuple);
+ bool changed = false;
- CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
+ /*
+ * Don't let a NO INHERIT constraint be changed into inherit.
+ */
+ if (conForm->connoinherit && recurse)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot change NO INHERIT status of NOT NULL constraint \"%s\" on relation \"%s\"",
+ NameStr(conForm->conname),
+ RelationGetRelationName(rel)));
/*
- * Ordinarily phase 3 must ensure that no NULLs exist in columns that
- * are set NOT NULL; however, if we can find a constraint which proves
- * this then we can skip that. We needn't bother looking if we've
- * already found that we must verify some other not-null constraint.
+ * If we find an appropriate constraint, we're almost done, but just
+ * need to change some properties on it: if we're recursing, increment
+ * coninhcount; if not, set conislocal if not already set.
*/
- if (!tab->verify_new_notnull && !NotNullImpliedByRelConstraints(rel, attTup))
+ if (recursing)
{
- /* Tell Phase 3 it needs to test the constraint */
- tab->verify_new_notnull = true;
+ if (pg_add_s16_overflow(conForm->coninhcount, 1,
+ &conForm->coninhcount))
+ ereport(ERROR,
+ errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("too many inheritance parents"));
+ changed = true;
+ }
+ else if (!conForm->conislocal)
+ {
+ conForm->conislocal = true;
+ changed = true;
}
- ObjectAddressSubSet(address, RelationRelationId,
- RelationGetRelid(rel), attnum);
+ if (changed)
+ {
+ Relation constr_rel;
+
+ constr_rel = table_open(ConstraintRelationId, RowExclusiveLock);
+
+ CatalogTupleUpdate(constr_rel, &tuple->t_self, tuple);
+ ObjectAddressSet(address, ConstraintRelationId, conForm->oid);
+ table_close(constr_rel, RowExclusiveLock);
+ }
+
+ if (changed)
+ return address;
+ else
+ return InvalidObjectAddress;
+ }
+
+ /*
+ * If we're asked not to recurse, and children exist, raise an error for
+ * partitioned tables. For inheritance, we act as if NO INHERIT had been
+ * specified.
+ */
+ if (!recurse &&
+ find_inheritance_children(RelationGetRelid(rel),
+ NoLock) != NIL)
+ {
+ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("constraint must be added to child tables too"),
+ errhint("Do not specify the ONLY keyword."));
+ else
+ is_no_inherit = true;
}
- else
- address = InvalidObjectAddress;
+
+ /*
+ * No constraint exists; we must add one. First determine a name to use,
+ * if we haven't already.
+ */
+ if (!recursing)
+ {
+ Assert(conName == NULL);
+ conName = ChooseConstraintName(RelationGetRelationName(rel),
+ colName, "not_null",
+ RelationGetNamespace(rel),
+ NIL);
+ }
+
+ constraint = makeNotNullConstraint(makeString(colName));
+ constraint->is_no_inherit = is_no_inherit;
+ constraint->conname = conName;
+
+ /* and do it */
+ cooked = AddRelationNewConstraints(rel, NIL, list_make1(constraint),
+ false, !recursing, false, NULL);
+ ccon = linitial(cooked);
+ ObjectAddressSet(address, ConstraintRelationId, ccon->conoid);
InvokeObjectPostAlterHook(RelationRelationId,
RelationGetRelid(rel), attnum);
- table_close(attr_rel, RowExclusiveLock);
+ /* Mark pg_attribute.attnotnull for the column */
+ set_attnotnull(wqueue, rel, attnum, lockmode);
- return address;
-}
+ /*
+ * Recurse to propagate the constraint to children that don't have one.
+ */
+ if (recurse)
+ {
+ List *children;
-/*
- * ALTER TABLE ALTER COLUMN CHECK NOT NULL
- *
- * This doesn't exist in the grammar, but we generate AT_CheckNotNull
- * commands against the partitions of a partitioned table if the user
- * writes ALTER TABLE ONLY ... SET NOT NULL on the partitioned table,
- * or tries to create a primary key on it (which internally creates
- * AT_SetNotNull on the partitioned table). Such a command doesn't
- * allow us to actually modify any partition, but we want to let it
- * go through if the partitions are already properly marked.
- *
- * In future, this might need to adjust the child table's state, likely
- * by incrementing an inheritance count for the attnotnull constraint.
- * For now we need only check for the presence of the flag.
- */
-static void
-ATExecCheckNotNull(AlteredTableInfo *tab, Relation rel,
- const char *colName, LOCKMODE lockmode)
-{
- HeapTuple tuple;
+ children = find_inheritance_children(RelationGetRelid(rel),
+ lockmode);
- tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
+ foreach_oid(childoid, children)
+ {
+ Relation childrel = table_open(childoid, NoLock);
- if (!HeapTupleIsValid(tuple))
- ereport(ERROR,
- errcode(ERRCODE_UNDEFINED_COLUMN),
- errmsg("column \"%s\" of relation \"%s\" does not exist",
- colName, RelationGetRelationName(rel)));
+ CommandCounterIncrement();
- if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("constraint must be added to child tables too"),
- errdetail("Column \"%s\" of relation \"%s\" is not already NOT NULL.",
- colName, RelationGetRelationName(rel)),
- errhint("Do not specify the ONLY keyword.")));
+ ATExecSetNotNull(wqueue, childrel, conName, colName,
+ recurse, true, lockmode);
+ table_close(childrel, NoLock);
+ }
+ }
- ReleaseSysCache(tuple);
+ return address;
}
/*
@@ -9140,6 +9179,71 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
}
/*
+ * Prepare to add a primary key on table, by adding not-null constraints
+ * on all columns.
+ */
+static void
+ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd,
+ bool recurse, LOCKMODE lockmode,
+ AlterTableUtilityContext *context)
+{
+ ListCell *lc;
+ Constraint *pkconstr;
+
+ pkconstr = castNode(Constraint, cmd->def);
+ if (pkconstr->contype != CONSTR_PRIMARY)
+ return;
+
+ /*
+ * If not recursing, we must ensure that all children have a NOT NULL
+ * constraint on the columns, and error out if not.
+ */
+ if (!recurse)
+ {
+ List *children;
+
+ children = find_inheritance_children(RelationGetRelid(rel),
+ lockmode);
+ foreach_oid(childrelid, children)
+ {
+ foreach(lc, pkconstr->keys)
+ {
+ HeapTuple tup;
+ Form_pg_attribute attrForm;
+ char *attname = strVal(lfirst(lc));
+
+ tup = SearchSysCacheAttName(childrelid, attname);
+ if (!tup)
+ elog(ERROR, "cache lookup failed for attribute %s of relation %u",
+ attname, childrelid);
+ attrForm = (Form_pg_attribute) GETSTRUCT(tup);
+ if (!attrForm->attnotnull)
+ ereport(ERROR,
+ errmsg("column \"%s\" of table \"%s\" is not marked NOT NULL",
+ attname, get_rel_name(childrelid)));
+ ReleaseSysCache(tup);
+ }
+ }
+ }
+
+ /* Insert not-null constraints in the queue for the PK columns */
+ foreach(lc, pkconstr->keys)
+ {
+ AlterTableCmd *newcmd;
+ Constraint *nnconstr;
+
+ nnconstr = makeNotNullConstraint(lfirst(lc));
+
+ newcmd = makeNode(AlterTableCmd);
+ newcmd->subtype = AT_AddConstraint;
+ newcmd->recurse = true;
+ newcmd->def = (Node *) nnconstr;
+
+ ATPrepCmd(wqueue, rel, newcmd, true, false, lockmode, context);
+ }
+}
+
+/*
* ALTER TABLE ADD INDEX
*
* There is no such command in the grammar, but parse_utilcmd.c converts
@@ -9334,17 +9438,18 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
Assert(IsA(newConstraint, Constraint));
/*
- * Currently, we only expect to see CONSTR_CHECK and CONSTR_FOREIGN nodes
- * arriving here (see the preprocessing done in parse_utilcmd.c). Use a
- * switch anyway to make it easier to add more code later.
+ * Currently, we only expect to see CONSTR_CHECK, CONSTR_NOTNULL and
+ * CONSTR_FOREIGN nodes arriving here (see the preprocessing done in
+ * parse_utilcmd.c).
*/
switch (newConstraint->contype)
{
case CONSTR_CHECK:
+ case CONSTR_NOTNULL:
address =
- ATAddCheckConstraint(wqueue, tab, rel,
- newConstraint, recurse, false, is_readd,
- lockmode);
+ ATAddCheckNNConstraint(wqueue, tab, rel,
+ newConstraint, recurse, false, is_readd,
+ lockmode);
break;
case CONSTR_FOREIGN:
@@ -9425,9 +9530,9 @@ ChooseForeignKeyConstraintNameAddition(List *colnames)
}
/*
- * Add a check constraint to a single table and its children. Returns the
- * address of the constraint added to the parent relation, if one gets added,
- * or InvalidObjectAddress otherwise.
+ * Add a check or not-null constraint to a single table and its children.
+ * Returns the address of the constraint added to the parent relation,
+ * if one gets added, or InvalidObjectAddress otherwise.
*
* Subroutine for ATExecAddConstraint.
*
@@ -9440,9 +9545,9 @@ ChooseForeignKeyConstraintNameAddition(List *colnames)
* the parent table and pass that down.
*/
static ObjectAddress
-ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
- Constraint *constr, bool recurse, bool recursing,
- bool is_readd, LOCKMODE lockmode)
+ATAddCheckNNConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ Constraint *constr, bool recurse, bool recursing,
+ bool is_readd, LOCKMODE lockmode)
{
List *newcons;
ListCell *lcon;
@@ -9450,6 +9555,9 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
ListCell *child;
ObjectAddress address = InvalidObjectAddress;
+ /* Guard against stack overflow due to overly deep inheritance tree. */
+ check_stack_depth();
+
/* At top level, permission check was done in ATPrepCmd, else do it */
if (recursing)
ATSimplePermissions(AT_AddConstraint, rel,
@@ -9481,7 +9589,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
{
CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
- if (!ccon->skip_validation)
+ if (!ccon->skip_validation && ccon->contype != CONSTR_NOTNULL)
{
NewConstraint *newcon;
@@ -9497,11 +9605,18 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
if (constr->conname == NULL)
constr->conname = ccon->name;
+ /*
+ * If adding a not-null constraint, set the pg_attribute flag and tell
+ * phase 3 to verify existing rows, if needed.
+ */
+ if (constr->contype == CONSTR_NOTNULL)
+ set_attnotnull(wqueue, rel, ccon->attnum, lockmode);
+
ObjectAddressSet(address, ConstraintRelationId, ccon->conoid);
}
/* At this point we must have a locked-down name to use */
- Assert(constr->conname != NULL);
+ Assert(newcons == NIL || constr->conname != NULL);
/* Advance command counter in case same table is visited multiple times */
CommandCounterIncrement();
@@ -9531,7 +9646,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
/*
* Check if ONLY was specified with ALTER TABLE. If so, allow the
- * constraint creation only if there are no children currently. Error out
+ * constraint creation only if there are no children currently. Error out
* otherwise.
*/
if (!recurse && children != NIL)
@@ -9539,6 +9654,9 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("constraint must be added to child tables too")));
+ /*
+ * Recurse to create the constraint on each child.
+ */
foreach(child, children)
{
Oid childrelid = lfirst_oid(child);
@@ -9552,9 +9670,9 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
/* Find or create work queue entry for this table */
childtab = ATGetQueueEntry(wqueue, childrel);
- /* Recurse to child */
- ATAddCheckConstraint(wqueue, childtab, childrel,
- constr, recurse, true, is_readd, lockmode);
+ /* Recurse to this child */
+ ATAddCheckNNConstraint(wqueue, childtab, childrel,
+ constr, recurse, true, is_readd, lockmode);
table_close(childrel, NoLock);
}
@@ -12667,24 +12785,14 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
*/
static void
ATExecDropConstraint(Relation rel, const char *constrName,
- DropBehavior behavior,
- bool recurse, bool recursing,
+ DropBehavior behavior, bool recurse,
bool missing_ok, LOCKMODE lockmode)
{
- List *children;
Relation conrel;
- Form_pg_constraint con;
SysScanDesc scan;
ScanKeyData skey[3];
HeapTuple tuple;
bool found = false;
- bool is_no_inherit_constraint = false;
- char contype;
-
- /* At top level, permission check was done in ATPrepCmd, else do it */
- if (recursing)
- ATSimplePermissions(AT_DropConstraint, rel,
- ATT_TABLE | ATT_PARTITIONED_TABLE | ATT_FOREIGN_TABLE);
conrel = table_open(ConstraintRelationId, RowExclusiveLock);
@@ -12709,80 +12817,190 @@ ATExecDropConstraint(Relation rel, const char *constrName,
/* There can be at most one matching row */
if (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
- ObjectAddress conobj;
+ dropconstraint_internal(rel, tuple, behavior, recurse, false,
+ missing_ok, lockmode);
+ found = true;
+ }
- con = (Form_pg_constraint) GETSTRUCT(tuple);
+ systable_endscan(scan);
- /* Don't drop inherited constraints */
- if (con->coninhcount > 0 && !recursing)
+ if (!found)
+ {
+ if (!missing_ok)
ereport(ERROR,
- (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"",
- constrName, RelationGetRelationName(rel))));
+ errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist",
+ constrName, RelationGetRelationName(rel)));
+ else
+ ereport(NOTICE,
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping",
+ constrName, RelationGetRelationName(rel)));
+ }
- is_no_inherit_constraint = con->connoinherit;
- contype = con->contype;
+ table_close(conrel, RowExclusiveLock);
+}
+
+/*
+ * Remove a constraint, using its pg_constraint tuple
+ *
+ * Implementation for ALTER TABLE DROP CONSTRAINT and ALTER TABLE ALTER COLUMN
+ * DROP NOT NULL.
+ *
+ * Returns the address of the constraint being removed.
+ */
+static ObjectAddress
+dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior behavior,
+ bool recurse, bool recursing, bool missing_ok,
+ LOCKMODE lockmode)
+{
+ Relation conrel;
+ Form_pg_constraint con;
+ ObjectAddress conobj;
+ List *children;
+ bool is_no_inherit_constraint = false;
+ char *constrName;
+ char *colname = NULL;
+
+ /* Guard against stack overflow due to overly deep inheritance tree. */
+ check_stack_depth();
+
+ /* At top level, permission check was done in ATPrepCmd, else do it */
+ if (recursing)
+ ATSimplePermissions(AT_DropConstraint, rel,
+ ATT_TABLE | ATT_PARTITIONED_TABLE | ATT_FOREIGN_TABLE);
+
+ conrel = table_open(ConstraintRelationId, RowExclusiveLock);
+
+ con = (Form_pg_constraint) GETSTRUCT(constraintTup);
+ constrName = NameStr(con->conname);
+
+ /* Don't allow drop of inherited constraints */
+ if (con->coninhcount > 0 && !recursing)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"",
+ constrName, RelationGetRelationName(rel))));
+
+ /*
+ * Reset pg_constraint.attnotnull, if this is a not-null constraint.
+ *
+ * While doing that, we're in a good position to disallow dropping a not-
+ * null constraint underneath a primary key, a replica identity index, or
+ * a generated identity column.
+ */
+ if (con->contype == CONSTRAINT_NOTNULL)
+ {
+ Relation attrel = table_open(AttributeRelationId, RowExclusiveLock);
+ AttrNumber attnum = extractNotNullColumn(constraintTup);
+ Bitmapset *pkattrs;
+ Bitmapset *irattrs;
+ HeapTuple atttup;
+ Form_pg_attribute attForm;
+
+ /* save column name for recursion step */
+ colname = get_attname(RelationGetRelid(rel), attnum, false);
/*
- * If it's a foreign-key constraint, we'd better lock the referenced
- * table and check that that's not in use, just as we've already done
- * for the constrained table (else we might, eg, be dropping a trigger
- * that has unfired events). But we can/must skip that in the
- * self-referential case.
+ * Disallow if it's in the primary key. For partitioned tables we
+ * cannot rely solely on RelationGetIndexAttrBitmap, because it'll
+ * return NULL if the primary key is invalid; but we still need to
+ * protect not-null constraints under such a constraint, so check the
+ * slow way.
*/
- if (contype == CONSTRAINT_FOREIGN &&
- con->confrelid != RelationGetRelid(rel))
+ pkattrs = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_PRIMARY_KEY);
+
+ if (pkattrs == NULL &&
+ rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
- Relation frel;
+ Oid pkindex = RelationGetPrimaryKeyIndex(rel, true);
+
+ if (OidIsValid(pkindex))
+ {
+ Relation pk = relation_open(pkindex, AccessShareLock);
+
+ pkattrs = NULL;
+ for (int i = 0; i < pk->rd_index->indnkeyatts; i++)
+ pkattrs = bms_add_member(pkattrs, pk->rd_index->indkey.values[i]);
- /* Must match lock taken by RemoveTriggerById: */
- frel = table_open(con->confrelid, AccessExclusiveLock);
- CheckAlterTableIsSafe(frel);
- table_close(frel, NoLock);
+ relation_close(pk, AccessShareLock);
+ }
}
- /*
- * Perform the actual constraint deletion
- */
- conobj.classId = ConstraintRelationId;
- conobj.objectId = con->oid;
- conobj.objectSubId = 0;
+ if (pkattrs &&
+ bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, pkattrs))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("column \"%s\" is in a primary key",
+ get_attname(RelationGetRelid(rel), attnum, false)));
+
+ /* Disallow if it's in the replica identity */
+ irattrs = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY);
+ if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, irattrs))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("column \"%s\" is in index used as replica identity",
+ get_attname(RelationGetRelid(rel), attnum, false)));
+
+ /* Disallow if it's a GENERATED AS IDENTITY column */
+ atttup = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum);
+ if (!HeapTupleIsValid(atttup))
+ elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+ attnum, RelationGetRelid(rel));
+ attForm = (Form_pg_attribute) GETSTRUCT(atttup);
+ if (attForm->attidentity != '\0')
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("column \"%s\" of relation \"%s\" is an identity column",
+ get_attname(RelationGetRelid(rel), attnum,
+ false),
+ RelationGetRelationName(rel)));
- performDeletion(&conobj, behavior, 0);
+ /* All good -- reset attnotnull if needed */
+ if (attForm->attnotnull)
+ {
+ attForm->attnotnull = false;
+ CatalogTupleUpdate(attrel, &atttup->t_self, atttup);
+ }
- found = true;
+ table_close(attrel, RowExclusiveLock);
}
- systable_endscan(scan);
+ is_no_inherit_constraint = con->connoinherit;
- if (!found)
+ /*
+ * If it's a foreign-key constraint, we'd better lock the referenced table
+ * and check that that's not in use, just as we've already done for the
+ * constrained table (else we might, eg, be dropping a trigger that has
+ * unfired events). But we can/must skip that in the self-referential
+ * case.
+ */
+ if (con->contype == CONSTRAINT_FOREIGN &&
+ con->confrelid != RelationGetRelid(rel))
{
- if (!missing_ok)
- {
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("constraint \"%s\" of relation \"%s\" does not exist",
- constrName, RelationGetRelationName(rel))));
- }
- else
- {
- ereport(NOTICE,
- (errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping",
- constrName, RelationGetRelationName(rel))));
- table_close(conrel, RowExclusiveLock);
- return;
- }
+ Relation frel;
+
+ /* Must match lock taken by RemoveTriggerById: */
+ frel = table_open(con->confrelid, AccessExclusiveLock);
+ CheckAlterTableIsSafe(frel);
+ table_close(frel, NoLock);
}
/*
- * For partitioned tables, non-CHECK inherited constraints are dropped via
- * the dependency mechanism, so we're done here.
+ * Perform the actual constraint deletion
*/
- if (contype != CONSTRAINT_CHECK &&
+ ObjectAddressSet(conobj, ConstraintRelationId, con->oid);
+ performDeletion(&conobj, behavior, 0);
+
+ /*
+ * For partitioned tables, non-CHECK, non-NOT-NULL inherited constraints
+ * are dropped via the dependency mechanism, so we're done here.
+ */
+ if (con->contype != CONSTRAINT_CHECK &&
+ con->contype != CONSTRAINT_NOTNULL &&
rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
table_close(conrel, RowExclusiveLock);
- return;
+ return conobj;
}
/*
@@ -12798,48 +13016,65 @@ ATExecDropConstraint(Relation rel, const char *constrName,
foreach_oid(childrelid, children)
{
Relation childrel;
- HeapTuple copy_tuple;
+ HeapTuple tuple;
+ Form_pg_constraint childcon;
/* find_inheritance_children already got lock */
childrel = table_open(childrelid, NoLock);
CheckAlterTableIsSafe(childrel);
- ScanKeyInit(&skey[0],
- Anum_pg_constraint_conrelid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(childrelid));
- ScanKeyInit(&skey[1],
- Anum_pg_constraint_contypid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(InvalidOid));
- ScanKeyInit(&skey[2],
- Anum_pg_constraint_conname,
- BTEqualStrategyNumber, F_NAMEEQ,
- CStringGetDatum(constrName));
- scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId,
- true, NULL, 3, skey);
-
- /* There can be at most one matching row */
- if (!HeapTupleIsValid(tuple = systable_getnext(scan)))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("constraint \"%s\" of relation \"%s\" does not exist",
- constrName,
- RelationGetRelationName(childrel))));
-
- copy_tuple = heap_copytuple(tuple);
-
- systable_endscan(scan);
+ /*
+ * We search for not-null constraints by column name, and others by
+ * constraint name.
+ */
+ if (con->contype == CONSTRAINT_NOTNULL)
+ {
+ tuple = findNotNullConstraint(childrelid, colname);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for not-null constraint on column \"%s\" of relation %u",
+ colname, RelationGetRelid(childrel));
+ }
+ else
+ {
+ SysScanDesc scan;
+ ScanKeyData skey[3];
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(childrelid));
+ ScanKeyInit(&skey[1],
+ Anum_pg_constraint_contypid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(InvalidOid));
+ ScanKeyInit(&skey[2],
+ Anum_pg_constraint_conname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(constrName));
+ scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId,
+ true, NULL, 3, skey);
+ /* There can only be one, so no need to loop */
+ tuple = systable_getnext(scan);
+ if (!HeapTupleIsValid(tuple))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist",
+ constrName,
+ RelationGetRelationName(childrel))));
+ tuple = heap_copytuple(tuple);
+ systable_endscan(scan);
+ }
- con = (Form_pg_constraint) GETSTRUCT(copy_tuple);
+ childcon = (Form_pg_constraint) GETSTRUCT(tuple);
- /* Right now only CHECK constraints can be inherited */
- if (con->contype != CONSTRAINT_CHECK)
- elog(ERROR, "inherited constraint is not a CHECK constraint");
+ /* Right now only CHECK and not-null constraints can be inherited */
+ if (childcon->contype != CONSTRAINT_CHECK &&
+ childcon->contype != CONSTRAINT_NOTNULL)
+ elog(ERROR, "inherited constraint is not a CHECK or not-null constraint");
- if (con->coninhcount <= 0) /* shouldn't happen */
+ if (childcon->coninhcount <= 0) /* shouldn't happen */
elog(ERROR, "relation %u has non-inherited constraint \"%s\"",
- childrelid, constrName);
+ childrelid, NameStr(childcon->conname));
if (recurse)
{
@@ -12847,18 +13082,18 @@ ATExecDropConstraint(Relation rel, const char *constrName,
* If the child constraint has other definition sources, just
* decrement its inheritance count; if not, recurse to delete it.
*/
- if (con->coninhcount == 1 && !con->conislocal)
+ if (childcon->coninhcount == 1 && !childcon->conislocal)
{
/* Time to delete this child constraint, too */
- ATExecDropConstraint(childrel, constrName, behavior,
- true, true,
- false, lockmode);
+ dropconstraint_internal(childrel, tuple, behavior,
+ recurse, true, missing_ok,
+ lockmode);
}
else
{
/* Child constraint must survive my deletion */
- con->coninhcount--;
- CatalogTupleUpdate(conrel, &copy_tuple->t_self, copy_tuple);
+ childcon->coninhcount--;
+ CatalogTupleUpdate(conrel, &tuple->t_self, tuple);
/* Make update visible */
CommandCounterIncrement();
@@ -12867,25 +13102,29 @@ ATExecDropConstraint(Relation rel, const char *constrName,
else
{
/*
- * If we were told to drop ONLY in this table (no recursion), we
- * need to mark the inheritors' constraints as locally defined
- * rather than inherited.
+ * If we were told to drop ONLY in this table (no recursion) and
+ * there are no further parents for this constraint, we need to
+ * mark the inheritors' constraints as locally defined rather than
+ * inherited.
*/
- con->coninhcount--;
- con->conislocal = true;
+ childcon->coninhcount--;
+ if (childcon->coninhcount == 0)
+ childcon->conislocal = true;
- CatalogTupleUpdate(conrel, &copy_tuple->t_self, copy_tuple);
+ CatalogTupleUpdate(conrel, &tuple->t_self, tuple);
/* Make update visible */
CommandCounterIncrement();
}
- heap_freetuple(copy_tuple);
+ heap_freetuple(tuple);
table_close(childrel, NoLock);
}
table_close(conrel, RowExclusiveLock);
+
+ return conobj;
}
/*
@@ -13834,10 +14073,26 @@ RememberConstraintForRebuilding(Oid conoid, AlteredTableInfo *tab)
char *defstring = pg_get_constraintdef_command(conoid);
Oid indoid;
- tab->changedConstraintOids = lappend_oid(tab->changedConstraintOids,
- conoid);
- tab->changedConstraintDefs = lappend(tab->changedConstraintDefs,
- defstring);
+ /*
+ * It is critical to create not-null constraints ahead of primary key
+ * indexes; otherwise, the not-null constraint would be created by the
+ * primary key, and the constraint name would be wrong.
+ */
+ if (get_constraint_type(conoid) == CONSTRAINT_NOTNULL)
+ {
+ tab->changedConstraintOids = lcons_oid(conoid,
+ tab->changedConstraintOids);
+ tab->changedConstraintDefs = lcons(defstring,
+ tab->changedConstraintDefs);
+ }
+ else
+ {
+
+ tab->changedConstraintOids = lappend_oid(tab->changedConstraintOids,
+ conoid);
+ tab->changedConstraintDefs = lappend(tab->changedConstraintDefs,
+ defstring);
+ }
/*
* For the index of a constraint, if any, remember if it is used for
@@ -14000,9 +14255,10 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
/*
* If the constraint is inherited (only), we don't want to inject a
- * new definition here; it'll get recreated when ATAddCheckConstraint
- * recurses from adding the parent table's constraint. But we had to
- * carry the info this far so that we can drop the constraint below.
+ * new definition here; it'll get recreated when
+ * ATAddCheckNNConstraint recurses from adding the parent table's
+ * constraint. But we had to carry the info this far so that we can
+ * drop the constraint below.
*/
if (!conislocal)
continue;
@@ -14241,23 +14497,21 @@ ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, char *cmd,
tab->subcmds[AT_PASS_OLD_CONSTR] =
lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd);
- /* recreate any comment on the constraint */
- RebuildConstraintComment(tab,
- AT_PASS_OLD_CONSTR,
- oldId,
- rel,
- NIL,
- con->conname);
- }
- else if (cmd->subtype == AT_SetNotNull)
- {
/*
- * The parser will create AT_SetNotNull subcommands for
- * columns of PRIMARY KEY indexes/constraints, but we need
- * not do anything with them here, because the columns'
- * NOT NULL marks will already have been propagated into
- * the new table definition.
+ * Recreate any comment on the constraint. If we have
+ * recreated a primary key, then transformTableConstraint
+ * has added an unnamed not-null constraint here; skip
+ * this in that case.
*/
+ if (con->conname)
+ RebuildConstraintComment(tab,
+ AT_PASS_OLD_CONSTR,
+ oldId,
+ rel,
+ NIL,
+ con->conname);
+ else
+ Assert(con->contype == CONSTR_NOTNULL);
}
else
elog(ERROR, "unexpected statement subtype: %d",
@@ -16012,14 +16266,24 @@ MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel, bool ispart
RelationGetRelationName(child_rel), parent_attname)));
/*
- * Check child doesn't discard NOT NULL property. (Other
- * constraints are checked elsewhere.)
+ * If the parent has a not-null constraint that's not NO INHERIT,
+ * make sure the child has one too.
+ *
+ * Other constraints are checked elsewhere.
*/
if (parent_att->attnotnull && !child_att->attnotnull)
- ereport(ERROR,
- (errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("column \"%s\" in child table must be marked NOT NULL",
- parent_attname)));
+ {
+ HeapTuple contup;
+
+ contup = findNotNullConstraintAttnum(RelationGetRelid(parent_rel),
+ parent_att->attnum);
+ if (HeapTupleIsValid(contup) &&
+ !((Form_pg_constraint) GETSTRUCT(contup))->connoinherit)
+ ereport(ERROR,
+ errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("column \"%s\" in child table \"%s\" must be marked NOT NULL",
+ parent_attname, RelationGetRelationName(child_rel)));
+ }
/*
* Child column must be generated if and only if parent column is.
@@ -16101,6 +16365,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
ScanKeyData parent_key;
HeapTuple parent_tuple;
Oid parent_relid = RelationGetRelid(parent_rel);
+ AttrMap *attmap;
constraintrel = table_open(ConstraintRelationId, RowExclusiveLock);
@@ -16112,21 +16377,32 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
parent_scan = systable_beginscan(constraintrel, ConstraintRelidTypidNameIndexId,
true, NULL, 1, &parent_key);
+ attmap = build_attrmap_by_name(RelationGetDescr(parent_rel),
+ RelationGetDescr(child_rel),
+ true);
+
while (HeapTupleIsValid(parent_tuple = systable_getnext(parent_scan)))
{
Form_pg_constraint parent_con = (Form_pg_constraint) GETSTRUCT(parent_tuple);
SysScanDesc child_scan;
ScanKeyData child_key;
HeapTuple child_tuple;
+ AttrNumber parent_attno;
bool found = false;
- if (parent_con->contype != CONSTRAINT_CHECK)
+ if (parent_con->contype != CONSTRAINT_CHECK &&
+ parent_con->contype != CONSTRAINT_NOTNULL)
continue;
/* if the parent's constraint is marked NO INHERIT, it's not inherited */
if (parent_con->connoinherit)
continue;
+ if (parent_con->contype == CONSTRAINT_NOTNULL)
+ parent_attno = extractNotNullColumn(parent_tuple);
+ else
+ parent_attno = InvalidAttrNumber;
+
/* Search for a child constraint matching this one */
ScanKeyInit(&child_key,
Anum_pg_constraint_conrelid,
@@ -16140,20 +16416,46 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
Form_pg_constraint child_con = (Form_pg_constraint) GETSTRUCT(child_tuple);
HeapTuple child_copy;
- if (child_con->contype != CONSTRAINT_CHECK)
+ if (child_con->contype != parent_con->contype)
continue;
- if (strcmp(NameStr(parent_con->conname),
- NameStr(child_con->conname)) != 0)
- continue;
+ /*
+ * CHECK constraint are matched by constraint name, NOT NULL ones
+ * by attribute number.
+ */
+ if (child_con->contype == CONSTRAINT_CHECK)
+ {
+ if (strcmp(NameStr(parent_con->conname),
+ NameStr(child_con->conname)) != 0)
+ continue;
+ }
+ else if (child_con->contype == CONSTRAINT_NOTNULL)
+ {
+ Form_pg_attribute parent_attr;
+ Form_pg_attribute child_attr;
+ AttrNumber child_attno;
+
+ parent_attr = TupleDescAttr(parent_rel->rd_att, parent_attno - 1);
+ child_attno = extractNotNullColumn(child_tuple);
+ if (parent_attno != attmap->attnums[child_attno - 1])
+ continue;
- if (!constraints_equivalent(parent_tuple, child_tuple, RelationGetDescr(constraintrel)))
+ child_attr = TupleDescAttr(child_rel->rd_att, child_attno - 1);
+ /* there shouldn't be constraints on dropped columns */
+ if (parent_attr->attisdropped || child_attr->attisdropped)
+ elog(ERROR, "found not-null constraint on dropped columns");
+ }
+
+ if (child_con->contype == CONSTRAINT_CHECK &&
+ !constraints_equivalent(parent_tuple, child_tuple, RelationGetDescr(constraintrel)))
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("child table \"%s\" has different definition for check constraint \"%s\"",
RelationGetRelationName(child_rel), NameStr(parent_con->conname))));
- /* If the child constraint is "no inherit" then cannot merge */
+ /*
+ * If the child constraint is "no inherit" then cannot merge
+ */
if (child_con->connoinherit)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
@@ -16204,10 +16506,21 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
systable_endscan(child_scan);
if (!found)
+ {
+ if (parent_con->contype == CONSTRAINT_NOTNULL)
+ ereport(ERROR,
+ errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("column \"%s\" in child table \"%s\" must be marked NOT NULL",
+ get_attname(parent_relid,
+ extractNotNullColumn(parent_tuple),
+ false),
+ RelationGetRelationName(child_rel)));
+
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("child table is missing constraint \"%s\"",
NameStr(parent_con->conname))));
+ }
}
systable_endscan(parent_scan);
@@ -16352,7 +16665,9 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
ScanKeyData key[3];
HeapTuple attributeTuple,
constraintTuple;
+ AttrMap *attmap;
List *connames;
+ List *nncolumns;
bool found;
bool is_partitioning;
@@ -16417,11 +16732,18 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
table_close(catalogRelation, RowExclusiveLock);
/*
- * Likewise, find inherited check constraints and disinherit them. To do
- * this, we first need a list of the names of the parent's check
- * constraints. (We cheat a bit by only checking for name matches,
+ * Likewise, find inherited check and not-null constraints and disinherit
+ * them. To do this, we first need a list of the names of the parent's
+ * check constraints. (We cheat a bit by only checking for name matches,
* assuming that the expressions will match.)
+ *
+ * For NOT NULL columns, we store column numbers to match, mapping them in
+ * to the child rel's attribute numbers.
*/
+ attmap = build_attrmap_by_name(RelationGetDescr(child_rel),
+ RelationGetDescr(parent_rel),
+ false);
+
catalogRelation = table_open(ConstraintRelationId, RowExclusiveLock);
ScanKeyInit(&key[0],
Anum_pg_constraint_conrelid,
@@ -16431,18 +16753,28 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
true, NULL, 1, key);
connames = NIL;
+ nncolumns = NIL;
while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
{
Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
+ if (con->connoinherit)
+ continue;
+
if (con->contype == CONSTRAINT_CHECK)
connames = lappend(connames, pstrdup(NameStr(con->conname)));
+ if (con->contype == CONSTRAINT_NOTNULL)
+ {
+ AttrNumber parent_attno = extractNotNullColumn(constraintTuple);
+
+ nncolumns = lappend_int(nncolumns, attmap->attnums[parent_attno - 1]);
+ }
}
systable_endscan(scan);
- /* Now scan the child's constraints */
+ /* Now scan the child's constraints to find matches */
ScanKeyInit(&key[0],
Anum_pg_constraint_conrelid,
BTEqualStrategyNumber, F_OIDEQ,
@@ -16453,20 +16785,41 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
{
Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
- bool match;
-
- if (con->contype != CONSTRAINT_CHECK)
- continue;
+ bool match = false;
- match = false;
- foreach_ptr(char, chkname, connames)
+ /*
+ * Match CHECK constraints by name, not-null constraints by column
+ * number, and ignore all others.
+ */
+ if (con->contype == CONSTRAINT_CHECK)
{
- if (strcmp(NameStr(con->conname), chkname) == 0)
+ foreach_ptr(char, chkname, connames)
{
- match = true;
- break;
+ if (con->contype == CONSTRAINT_CHECK &&
+ strcmp(NameStr(con->conname), chkname) == 0)
+ {
+ match = true;
+ connames = foreach_delete_current(connames, chkname);
+ break;
+ }
+ }
+ }
+ else if (con->contype == CONSTRAINT_NOTNULL)
+ {
+ AttrNumber child_attno = extractNotNullColumn(constraintTuple);
+
+ foreach_int(prevattno, nncolumns)
+ {
+ if (prevattno == child_attno)
+ {
+ match = true;
+ nncolumns = foreach_delete_current(nncolumns, prevattno);
+ break;
+ }
}
}
+ else
+ continue;
if (match)
{
@@ -16487,6 +16840,12 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
}
}
+ /* We should have matched all constraints */
+ if (connames != NIL || nncolumns != NIL)
+ elog(ERROR, "%d unmatched constraints while removing inheritance from \"%s\" to \"%s\"",
+ list_length(connames) + list_length(nncolumns),
+ RelationGetRelationName(child_rel), RelationGetRelationName(parent_rel));
+
systable_endscan(scan);
table_close(catalogRelation, RowExclusiveLock);
@@ -19039,7 +19398,8 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel)
/*
* If no suitable index was found in the partition-to-be, create one
- * now.
+ * now. Note that if this is a PK, not-null constraints must already
+ * exist.
*/
if (!found)
{
@@ -19737,7 +20097,7 @@ ATExecDetachPartitionFinalize(Relation rel, RangeVar *name)
* DetachAddConstraintIfNeeded
* Subroutine for ATExecDetachPartition. Create a constraint that
* takes the place of the partition constraint, but avoid creating
- * a dupe if an constraint already exists which implies the needed
+ * a dupe if a constraint already exists which implies the needed
* constraint.
*/
static void
@@ -19770,8 +20130,8 @@ DetachAddConstraintIfNeeded(List **wqueue, Relation partRel)
n->initially_valid = true;
n->skip_validation = true;
/* It's a re-add, since it nominally already exists */
- ATAddCheckConstraint(wqueue, tab, partRel, n,
- true, false, true, ShareUpdateExclusiveLock);
+ ATAddCheckNNConstraint(wqueue, tab, partRel, n,
+ true, false, true, ShareUpdateExclusiveLock);
}
}
@@ -20040,6 +20400,13 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
RelationGetRelationName(partIdx))));
}
+ /*
+ * If it's a primary key, make sure the columns in the partition are
+ * NOT NULL.
+ */
+ if (parentIdx->rd_index->indisprimary)
+ verifyPartitionIndexNotNull(childInfo, partTbl);
+
/* All good -- do it */
IndexSetParentIndex(partIdx, RelationGetRelid(parentIdx));
if (OidIsValid(constraintOid))
@@ -20184,6 +20551,29 @@ validatePartitionedIndex(Relation partedIdx, Relation partedTbl)
}
/*
+ * When attaching an index as a partition of a partitioned index which is a
+ * primary key, verify that all the columns in the partition are marked NOT
+ * NULL.
+ */
+static void
+verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partition)
+{
+ for (int i = 0; i < iinfo->ii_NumIndexKeyAttrs; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(RelationGetDescr(partition),
+ iinfo->ii_IndexAttrNumbers[i] - 1);
+
+ if (!att->attnotnull)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("invalid primary key definition"),
+ errdetail("Column \"%s\" of relation \"%s\" is not marked NOT NULL.",
+ NameStr(att->attname),
+ RelationGetRelationName(partition)));
+ }
+}
+
+/*
* Return an OID list of constraints that reference the given relation
* that are marked as having a parent constraints.
*/