aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
authorAlvaro Herrera <alvherre@alvh.no-ip.org>2023-08-25 13:31:24 +0200
committerAlvaro Herrera <alvherre@alvh.no-ip.org>2023-08-25 13:31:24 +0200
commitb0e96f311985bceba79825214f8e43f65afa653a (patch)
tree862e344e9a51ef034f3039d44c289a3c26fc656f /src/backend/commands/tablecmds.c
parent9c13b6814ac7943036c64b377675184b243f04e8 (diff)
downloadpostgresql-b0e96f311985bceba79825214f8e43f65afa653a.tar.gz
postgresql-b0e96f311985bceba79825214f8e43f65afa653a.zip
Catalog not-null constraints
We now create contype='n' pg_constraint rows for not-null constraints. We propagate these constraints to other tables during operations such as adding inheritance relationships, creating and attaching partitions and creating tables LIKE other tables. We also spawn not-null constraints for inheritance child tables when their parents have primary keys. These related constraints mostly follow the well-known rules of conislocal and coninhcount that we have for CHECK constraints, with some adaptations: for example, as opposed to CHECK constraints, we don't match not-null ones by name when descending a hierarchy to alter it, instead matching by column name that they apply to. This means we don't require the constraint names to be identical across a hierarchy. For now, we omit them for system catalogs. Maybe this is worth reconsidering. We don't support NOT VALID nor DEFERRABLE clauses either; these can be added as separate features later (this patch is already large and complicated enough.) psql shows these constraints in \d+. pg_dump requires some ad-hoc hacks, particularly when dumping a primary key. We now create one "throwaway" not-null constraint for each column in the PK together with the CREATE TABLE command, and once the PK is created, all those throwaway constraints are removed. This avoids having to check each tuple for nullness when the dump restores the primary key creation. pg_upgrading from an older release requires a somewhat brittle procedure to create a constraint state that matches what would be created if the database were being created fresh in Postgres 17. I have tested all the scenarios I could think of, and it works correctly as far as I can tell, but I could have neglected weird cases. This patch has been very long in the making. The first patch was written by Bernd Helmle in 2010 to add a new pg_constraint.contype value ('n'), which I (Álvaro) then hijacked in 2011 and 2012, until that one was killed by the realization that we ought to use contype='c' instead: manufactured CHECK constraints. However, later SQL standard development, as well as nonobvious emergent properties of that design (mostly, failure to distinguish them from "normal" CHECK constraints as well as the performance implication of having to test the CHECK expression) led us to reconsider this choice, so now the current implementation uses contype='n' again. During Postgres 16 this had already been introduced by commit e056c557aef4, but there were some problems mainly with the pg_upgrade procedure that couldn't be fixed in reasonable time, so it was reverted. In 2016 Vitaly Burovoy also worked on this feature[1] but found no consensus for his proposed approach, which was claimed to be closer to the letter of the standard, requiring an additional pg_attribute column to track the OID of the not-null constraint for that column. [1] https://postgr.es/m/CAKOSWNkN6HSyatuys8xZxzRCR-KL1OkHS5-b9qd9bf1Rad3PLA@mail.gmail.com Author: Álvaro Herrera <alvherre@alvh.no-ip.org> Author: Bernd Helmle <mailings@oopsware.de> Reviewed-by: Justin Pryzby <pryzby@telsasoft.com> Reviewed-by: Peter Eisentraut <peter.eisentraut@enterprisedb.com> Reviewed-by: Dean Rasheed <dean.a.rasheed@gmail.com>
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c1591
1 files changed, 1172 insertions, 419 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index f77de4e7c99..47c900445c7 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -200,7 +200,7 @@ typedef struct AlteredTableInfo
} AlteredTableInfo;
/* Struct describing one new constraint to check in Phase 3 scan */
-/* Note: new NOT NULL constraints are handled elsewhere */
+/* Note: new not-null constraints are handled elsewhere */
typedef struct NewConstraint
{
char *name; /* Constraint name, or NULL if none */
@@ -351,7 +351,8 @@ static void truncate_check_activity(Relation rel);
static void RangeVarCallbackForTruncate(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg);
static List *MergeAttributes(List *schema, List *supers, char relpersistence,
- bool is_partition, List **supconstr);
+ bool is_partition, List **supconstr,
+ List **supnotnulls);
static bool MergeCheckConstraint(List *constraints, char *name, Node *expr);
static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel);
static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel);
@@ -432,16 +433,16 @@ static bool check_for_column_name_collision(Relation rel, const char *colname,
bool if_not_exists);
static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
static void add_column_collation_dependency(Oid relid, int32 attnum, Oid collid);
-static void ATPrepDropNotNull(Relation rel, bool recurse, bool recursing);
-static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode);
-static void ATPrepSetNotNull(List **wqueue, Relation rel,
- AlterTableCmd *cmd, bool recurse, bool recursing,
- LOCKMODE lockmode,
- AlterTableUtilityContext *context);
-static ObjectAddress ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
- const char *colName, LOCKMODE lockmode);
-static void ATExecCheckNotNull(AlteredTableInfo *tab, Relation rel,
- const char *colName, LOCKMODE lockmode);
+static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
+ LOCKMODE lockmode);
+static bool set_attnotnull(List **wqueue, Relation rel,
+ AttrNumber attnum, bool recurse, LOCKMODE lockmode);
+static ObjectAddress ATExecSetNotNull(List **wqueue, Relation rel,
+ char *constrname, char *colName,
+ bool recurse, bool recursing,
+ List **readyRels, LOCKMODE lockmode);
+static ObjectAddress ATExecSetAttNotNull(List **wqueue, Relation rel,
+ const char *colName, LOCKMODE lockmode);
static bool NotNullImpliedByRelConstraints(Relation rel, Form_pg_attribute attr);
static bool ConstraintImpliedByRelConstraint(Relation scanrel,
List *testConstraint, List *provenConstraint);
@@ -470,6 +471,8 @@ static ObjectAddress ATExecDropColumn(List **wqueue, Relation rel, const char *c
bool recurse, bool recursing,
bool missing_ok, LOCKMODE lockmode,
ObjectAddresses *addrs);
+static void ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd,
+ LOCKMODE lockmode, AlterTableUtilityContext *context);
static ObjectAddress ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
IndexStmt *stmt, bool is_rebuild, LOCKMODE lockmode);
static ObjectAddress ATExecAddStatistics(AlteredTableInfo *tab, Relation rel,
@@ -481,11 +484,11 @@ static ObjectAddress ATExecAddConstraint(List **wqueue,
static char *ChooseForeignKeyConstraintNameAddition(List *colnames);
static ObjectAddress ATExecAddIndexConstraint(AlteredTableInfo *tab, Relation rel,
IndexStmt *stmt, LOCKMODE lockmode);
-static ObjectAddress ATAddCheckConstraint(List **wqueue,
- AlteredTableInfo *tab, Relation rel,
- Constraint *constr,
- bool recurse, bool recursing, bool is_readd,
- LOCKMODE lockmode);
+static ObjectAddress ATAddCheckNNConstraint(List **wqueue,
+ AlteredTableInfo *tab, Relation rel,
+ Constraint *constr,
+ bool recurse, bool recursing, bool is_readd,
+ LOCKMODE lockmode);
static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab,
Relation rel, Constraint *fkconstraint,
bool recurse, bool recursing,
@@ -542,6 +545,11 @@ static void ATExecDropConstraint(Relation rel, const char *constrName,
DropBehavior behavior,
bool recurse, bool recursing,
bool missing_ok, LOCKMODE lockmode);
+static ObjectAddress dropconstraint_internal(Relation rel,
+ HeapTuple constraintTup, DropBehavior behavior,
+ bool recurse, bool recursing,
+ bool missing_ok, List **readyRels,
+ LOCKMODE lockmode);
static void ATPrepAlterColumnType(List **wqueue,
AlteredTableInfo *tab, Relation rel,
bool recurse, bool recursing,
@@ -614,10 +622,12 @@ static void ComputePartitionAttrs(ParseState *pstate, Relation rel, List *partPa
static void CreateInheritance(Relation child_rel, Relation parent_rel);
static void RemoveInheritance(Relation child_rel, Relation parent_rel,
bool expect_detached);
+static void ATInheritAdjustNotNulls(Relation parent_rel, Relation child_rel,
+ int inhcount);
static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel,
PartitionCmd *cmd,
AlterTableUtilityContext *context);
-static void AttachPartitionEnsureIndexes(Relation rel, Relation attachrel);
+static void AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel);
static void QueuePartitionConstraintValidation(List **wqueue, Relation scanrel,
List *partConstraint,
bool validate_default);
@@ -635,6 +645,7 @@ static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx,
static void validatePartitionedIndex(Relation partedIdx, Relation partedTbl);
static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
Relation partitionTbl);
+static void verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partIdx);
static List *GetParentedForeignKeyRefs(Relation partition);
static void ATDetachCheckNoForeignKeyRefs(Relation partition);
static char GetAttributeCompression(Oid atttypid, char *compression);
@@ -672,8 +683,10 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
TupleDesc descriptor;
List *inheritOids;
List *old_constraints;
+ List *old_notnulls;
List *rawDefaults;
List *cookedDefaults;
+ List *nncols;
Datum reloptions;
ListCell *listptr;
AttrNumber attnum;
@@ -863,12 +876,13 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
MergeAttributes(stmt->tableElts, inheritOids,
stmt->relation->relpersistence,
stmt->partbound != NULL,
- &old_constraints);
+ &old_constraints, &old_notnulls);
/*
* Create a tuple descriptor from the relation schema. Note that this
- * deals with column names, types, and NOT NULL constraints, but not
- * default values or CHECK constraints; we handle those below.
+ * deals with column names, types, and in-descriptor NOT NULL flags, but
+ * not default values, NOT NULL or CHECK constraints; we handle those
+ * below.
*/
descriptor = BuildDescForRelation(stmt->tableElts);
@@ -1251,6 +1265,17 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
AddRelationNewConstraints(rel, NIL, stmt->constraints,
true, true, false, queryString);
+ /*
+ * Finally, merge the not-null constraints that are declared directly with
+ * those that come from parent relations (making sure to count inheritance
+ * appropriately for each), create them, and set the attnotnull flag on
+ * columns that don't yet have it.
+ */
+ nncols = AddRelationNotNullConstraints(rel, stmt->nnconstraints,
+ old_notnulls);
+ foreach(listptr, nncols)
+ set_attnotnull(NULL, rel, lfirst_int(listptr), false, NoLock);
+
ObjectAddressSet(address, RelationRelationId, relationId);
/*
@@ -2299,6 +2324,8 @@ storage_name(char c)
* Output arguments:
* 'supconstr' receives a list of constraints belonging to the parents,
* updated as necessary to be valid for the child.
+ * 'supnotnulls' receives a list of CookedConstraints that corresponds to
+ * constraints coming from inheritance parents.
*
* Return value:
* Completed schema list.
@@ -2327,9 +2354,12 @@ storage_name(char c)
* If the same attribute name appears multiple times, then it appears
* in the result table in the proper location for its first appearance.
*
- * Constraints (including NOT NULL constraints) for the child table
+ * Constraints (including not-null constraints) for the child table
* are the union of all relevant constraints, from both the child schema
- * and parent tables.
+ * and parent tables. In addition, in legacy inheritance, each column that
+ * appears in a primary key in any of the parents also gets a NOT NULL
+ * constraint (partitioning doesn't need this, because the PK itself gets
+ * inherited.)
*
* The default value for a child column is defined as:
* (1) If the child schema specifies a default, that value is used.
@@ -2348,10 +2378,11 @@ storage_name(char c)
*/
static List *
MergeAttributes(List *schema, List *supers, char relpersistence,
- bool is_partition, List **supconstr)
+ bool is_partition, List **supconstr, List **supnotnulls)
{
List *inhSchema = NIL;
List *constraints = NIL;
+ List *nnconstraints = NIL;
bool have_bogus_defaults = false;
int child_attno;
static Node bogus_marker = {0}; /* marks conflicting defaults */
@@ -2462,9 +2493,12 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
AttrMap *newattmap;
List *inherited_defaults;
List *cols_with_defaults;
+ List *nnconstrs;
AttrNumber parent_attno;
ListCell *lc1;
ListCell *lc2;
+ Bitmapset *pkattrs;
+ Bitmapset *nncols = NULL;
/* caller already got lock */
relation = table_open(parent, NoLock);
@@ -2553,6 +2587,20 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
/* We can't process inherited defaults until newattmap is complete. */
inherited_defaults = cols_with_defaults = NIL;
+ /*
+ * All columns that are part of the parent's primary key need to be
+ * NOT NULL; if partition just the attnotnull bit, otherwise a full
+ * constraint (if they don't have one already). Also, we request
+ * attnotnull on columns that have a not-null constraint that's not
+ * marked NO INHERIT.
+ */
+ pkattrs = RelationGetIndexAttrBitmap(relation,
+ INDEX_ATTR_BITMAP_PRIMARY_KEY);
+ nnconstrs = RelationGetNotNullConstraints(RelationGetRelid(relation), true);
+ foreach(lc1, nnconstrs)
+ nncols = bms_add_member(nncols,
+ ((CookedConstraint *) lfirst(lc1))->attnum);
+
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++)
{
@@ -2648,9 +2696,38 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
}
/*
- * Merge of NOT NULL constraints = OR 'em together
+ * In regular inheritance, columns in the parent's primary key
+ * get an extra not-null constraint. Partitioning doesn't
+ * need this, because the PK itself is going to be cloned to
+ * the partition.
*/
- def->is_not_null |= attribute->attnotnull;
+ if (!is_partition &&
+ bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber,
+ pkattrs))
+ {
+ CookedConstraint *nn;
+
+ nn = palloc(sizeof(CookedConstraint));
+ nn->contype = CONSTR_NOTNULL;
+ nn->conoid = InvalidOid;
+ nn->name = NULL;
+ nn->attnum = exist_attno;
+ nn->expr = NULL;
+ nn->skip_validation = false;
+ nn->is_local = false;
+ nn->inhcount = 1;
+ nn->is_no_inherit = false;
+
+ nnconstraints = lappend(nnconstraints, nn);
+ }
+
+ /*
+ * mark attnotnull if parent has it and it's not NO INHERIT
+ */
+ if (bms_is_member(parent_attno, nncols) ||
+ bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber,
+ pkattrs))
+ def->is_not_null = true;
/*
* Check for GENERATED conflicts
@@ -2684,7 +2761,11 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
attribute->atttypmod);
def->inhcount = 1;
def->is_local = false;
- def->is_not_null = attribute->attnotnull;
+ /* mark attnotnull if parent has it and it's not NO INHERIT */
+ if (bms_is_member(parent_attno, nncols) ||
+ bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber,
+ pkattrs))
+ def->is_not_null = true;
def->is_from_type = false;
def->storage = attribute->attstorage;
def->raw_default = NULL;
@@ -2701,6 +2782,33 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
def->compression = NULL;
inhSchema = lappend(inhSchema, def);
newattmap->attnums[parent_attno - 1] = ++child_attno;
+
+ /*
+ * In regular inheritance, columns in the parent's primary key
+ * get an extra not-null constraint. Partitioning doesn't
+ * need this, because the PK itself is going to be cloned to
+ * the partition.
+ */
+ if (!is_partition &&
+ bms_is_member(parent_attno -
+ FirstLowInvalidHeapAttributeNumber,
+ pkattrs))
+ {
+ CookedConstraint *nn;
+
+ nn = palloc(sizeof(CookedConstraint));
+ nn->contype = CONSTR_NOTNULL;
+ nn->conoid = InvalidOid;
+ nn->name = NULL;
+ nn->attnum = newattmap->attnums[parent_attno - 1];
+ nn->expr = NULL;
+ nn->skip_validation = false;
+ nn->is_local = false;
+ nn->inhcount = 1;
+ nn->is_no_inherit = false;
+
+ nnconstraints = lappend(nnconstraints, nn);
+ }
}
/*
@@ -2845,6 +2953,23 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
}
}
+ /*
+ * Also copy the not-null constraints from this parent. The
+ * attnotnull markings were already installed above.
+ */
+ foreach(lc1, nnconstrs)
+ {
+ CookedConstraint *nn = lfirst(lc1);
+
+ Assert(nn->contype == CONSTR_NOTNULL);
+
+ nn->attnum = newattmap->attnums[nn->attnum - 1];
+ nn->is_local = false;
+ nn->inhcount = 1;
+
+ nnconstraints = lappend(nnconstraints, nn);
+ }
+
free_attrmap(newattmap);
/*
@@ -2972,7 +3097,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
}
/*
- * Merge of NOT NULL constraints = OR 'em together
+ * Merge of not-null constraints = OR 'em together
*/
def->is_not_null |= newdef->is_not_null;
@@ -3051,8 +3176,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
/*
* Now that we have the column definition list for a partition, we can
* check whether the columns referenced in the column constraint specs
- * actually exist. Also, we merge parent's NOT NULL constraints and
- * defaults into each corresponding column definition.
+ * actually exist. Also, merge column defaults.
*/
if (is_partition)
{
@@ -3069,7 +3193,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
if (strcmp(coldef->colname, restdef->colname) == 0)
{
found = true;
- coldef->is_not_null |= restdef->is_not_null;
/*
* Check for conflicts related to generated columns.
@@ -3158,6 +3281,8 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
}
*supconstr = constraints;
+ *supnotnulls = nnconstraints;
+
return schema;
}
@@ -3769,7 +3894,10 @@ rename_constraint_internal(Oid myrelid,
constraintOid);
con = (Form_pg_constraint) GETSTRUCT(tuple);
- if (myrelid && con->contype == CONSTRAINT_CHECK && !con->connoinherit)
+ if (myrelid &&
+ (con->contype == CONSTRAINT_CHECK ||
+ con->contype == CONSTRAINT_NOTNULL) &&
+ !con->connoinherit)
{
if (recurse)
{
@@ -4354,6 +4482,7 @@ AlterTableGetLockLevel(List *cmds)
case AT_AddIndexConstraint:
case AT_ReplicaIdentity:
case AT_SetNotNull:
+ case AT_SetAttNotNull:
case AT_EnableRowSecurity:
case AT_DisableRowSecurity:
case AT_ForceRowSecurity:
@@ -4492,15 +4621,6 @@ AlterTableGetLockLevel(List *cmds)
cmd_lockmode = ShareUpdateExclusiveLock;
break;
- case AT_CheckNotNull:
-
- /*
- * This only examines the table's schema; but lock must be
- * strong enough to prevent concurrent DROP NOT NULL.
- */
- cmd_lockmode = AccessShareLock;
- break;
-
default: /* oops */
elog(ERROR, "unrecognized alter table type: %d",
(int) cmd->subtype);
@@ -4652,21 +4772,23 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
break;
case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE);
- ATPrepDropNotNull(rel, recurse, recursing);
- ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context);
+ /* Set up recursion for phase 2; no other prep needed */
+ if (recurse)
+ cmd->recurse = true;
pass = AT_PASS_DROP;
break;
case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE);
- /* Need command-specific recursion decision */
- ATPrepSetNotNull(wqueue, rel, cmd, recurse, recursing,
- lockmode, context);
+ /* Set up recursion for phase 2; no other prep needed */
+ if (recurse)
+ cmd->recurse = true;
pass = AT_PASS_COL_ATTRS;
break;
- case AT_CheckNotNull: /* check column is already marked NOT NULL */
+ case AT_SetAttNotNull: /* set pg_attribute.attnotnull without adding
+ * a constraint */
ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE);
+ /* Need command-specific recursion decision */
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context);
- /* No command-specific prep needed */
pass = AT_PASS_COL_ATTRS;
break;
case AT_DropExpression: /* ALTER COLUMN DROP EXPRESSION */
@@ -5045,13 +5167,14 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab,
address = ATExecDropIdentity(rel, cmd->name, cmd->missing_ok, lockmode);
break;
case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
- address = ATExecDropNotNull(rel, cmd->name, lockmode);
+ address = ATExecDropNotNull(rel, cmd->name, cmd->recurse, lockmode);
break;
case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
- address = ATExecSetNotNull(tab, rel, cmd->name, lockmode);
+ address = ATExecSetNotNull(wqueue, rel, NULL, cmd->name,
+ cmd->recurse, false, NULL, lockmode);
break;
- case AT_CheckNotNull: /* check column is already marked NOT NULL */
- ATExecCheckNotNull(tab, rel, cmd->name, lockmode);
+ case AT_SetAttNotNull: /* set pg_attribute.attnotnull */
+ address = ATExecSetAttNotNull(wqueue, rel, cmd->name, lockmode);
break;
case AT_DropExpression:
address = ATExecDropExpression(rel, cmd->name, cmd->missing_ok, lockmode);
@@ -5387,21 +5510,23 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
*/
switch (cmd2->subtype)
{
- case AT_SetNotNull:
- /* Need command-specific recursion decision */
- ATPrepSetNotNull(wqueue, rel, cmd2,
- recurse, false,
- lockmode, context);
+ case AT_SetAttNotNull:
+ ATSimpleRecursion(wqueue, rel, cmd2, recurse, lockmode, context);
pass = AT_PASS_COL_ATTRS;
break;
case AT_AddIndex:
- /* This command never recurses */
- /* No command-specific prep needed */
+
+ /*
+ * A primary key on a inheritance parent needs supporting NOT
+ * NULL constraint on its children; enqueue commands to create
+ * those or mark them inherited if they already exist.
+ */
+ ATPrepAddPrimaryKey(wqueue, rel, cmd2, lockmode, context);
pass = AT_PASS_ADD_INDEX;
break;
case AT_AddIndexConstraint:
- /* This command never recurses */
- /* No command-specific prep needed */
+ /* as above */
+ ATPrepAddPrimaryKey(wqueue, rel, cmd2, lockmode, context);
pass = AT_PASS_ADD_INDEXCONSTR;
break;
case AT_AddConstraint:
@@ -5845,7 +5970,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
{
/*
* If we are rebuilding the tuples OR if we added any new but not
- * verified NOT NULL constraints, check all not-null constraints. This
+ * verified not-null constraints, check all not-null constraints. This
* is a bit of overkill but it minimizes risk of bugs, and
* heap_attisnull is a pretty cheap test anyway.
*/
@@ -6067,6 +6192,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
RelationGetRelationName(oldrel)),
errtableconstraint(oldrel, con->name)));
break;
+ case CONSTR_NOTNULL:
case CONSTR_FOREIGN:
/* Nothing to do here */
break;
@@ -6175,10 +6301,10 @@ alter_table_type_to_string(AlterTableType cmdtype)
return "ALTER COLUMN ... DROP NOT NULL";
case AT_SetNotNull:
return "ALTER COLUMN ... SET NOT NULL";
+ case AT_SetAttNotNull:
+ return NULL; /* not real grammar */
case AT_DropExpression:
return "ALTER COLUMN ... DROP EXPRESSION";
- case AT_CheckNotNull:
- return NULL; /* not real grammar */
case AT_SetStatistics:
return "ALTER COLUMN ... SET STATISTICS";
case AT_SetOptions:
@@ -6774,8 +6900,7 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing,
*/
static ObjectAddress
ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
- AlterTableCmd **cmd,
- bool recurse, bool recursing,
+ AlterTableCmd **cmd, bool recurse, bool recursing,
LOCKMODE lockmode, int cur_pass,
AlterTableUtilityContext *context)
{
@@ -7044,7 +7169,7 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
* the effect of NULL values in the new column.
*
* An exception occurs when the new column is of a domain type: the domain
- * might have a NOT NULL constraint, or a check constraint that indirectly
+ * might have a not-null constraint, or a check constraint that indirectly
* rejects nulls. If there are any domain constraints then we construct
* an explicit NULL default value that will be passed through
* CoerceToDomain processing. (This is a tad inefficient, since it causes
@@ -7290,42 +7415,21 @@ add_column_collation_dependency(Oid relid, int32 attnum, Oid collid)
/*
* ALTER TABLE ALTER COLUMN DROP NOT NULL
- */
-
-static void
-ATPrepDropNotNull(Relation rel, bool recurse, bool recursing)
-{
- /*
- * If the parent is a partitioned table, like check constraints, we do not
- * support removing the NOT NULL while partitions exist.
- */
- if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- {
- PartitionDesc partdesc = RelationGetPartitionDesc(rel, true);
-
- Assert(partdesc != NULL);
- if (partdesc->nparts > 0 && !recurse && !recursing)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("cannot remove constraint from only the partitioned table when partitions exist"),
- errhint("Do not specify the ONLY keyword.")));
- }
-}
-
-/*
+ *
* Return the address of the modified column. If the column was already
* nullable, InvalidObjectAddress is returned.
*/
static ObjectAddress
-ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
+ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
+ LOCKMODE lockmode)
{
HeapTuple tuple;
+ HeapTuple conTup;
Form_pg_attribute attTup;
AttrNumber attnum;
Relation attr_rel;
- List *indexoidlist;
- ListCell *indexoidscan;
ObjectAddress address;
+ List *readyRels;
/*
* lookup the attribute
@@ -7340,6 +7444,15 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
colName, RelationGetRelationName(rel))));
attTup = (Form_pg_attribute) GETSTRUCT(tuple);
attnum = attTup->attnum;
+ ObjectAddressSubSet(address, RelationRelationId,
+ RelationGetRelid(rel), attnum);
+
+ /* If the column is already nullable there's nothing to do. */
+ if (!attTup->attnotnull)
+ {
+ table_close(attr_rel, RowExclusiveLock);
+ return InvalidObjectAddress;
+ }
/* Prevent them from altering a system attribute */
if (attnum <= 0)
@@ -7355,62 +7468,37 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
colName, RelationGetRelationName(rel))));
/*
- * Check that the attribute is not in a primary key or in an index used as
- * a replica identity.
- *
- * Note: we'll throw error even if the pkey index is not valid.
+ * It's not OK to remove a constraint only for the parent and leave it in
+ * the children, so disallow that.
*/
-
- /* Loop over all indexes on the relation */
- indexoidlist = RelationGetIndexList(rel);
-
- foreach(indexoidscan, indexoidlist)
+ if (!recurse)
{
- Oid indexoid = lfirst_oid(indexoidscan);
- HeapTuple indexTuple;
- Form_pg_index indexStruct;
- int i;
+ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ PartitionDesc partdesc;
- indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
- if (!HeapTupleIsValid(indexTuple))
- elog(ERROR, "cache lookup failed for index %u", indexoid);
- indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
+ partdesc = RelationGetPartitionDesc(rel, true);
- /*
- * If the index is not a primary key or an index used as replica
- * identity, skip the check.
- */
- if (indexStruct->indisprimary || indexStruct->indisreplident)
+ if (partdesc->nparts > 0)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("cannot remove constraint from only the partitioned table when partitions exist"),
+ errhint("Do not specify the ONLY keyword."));
+ }
+ else if (rel->rd_rel->relhassubclass &&
+ find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL)
{
- /*
- * Loop over each attribute in the primary key or the index used
- * as replica identity and see if it matches the to-be-altered
- * attribute.
- */
- for (i = 0; i < indexStruct->indnkeyatts; i++)
- {
- if (indexStruct->indkey.values[i] == attnum)
- {
- if (indexStruct->indisprimary)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("column \"%s\" is in a primary key",
- colName)));
- else
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("column \"%s\" is in index used as replica identity",
- colName)));
- }
- }
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("not-null constraint on column \"%s\" must be removed in child tables too",
+ colName),
+ errhint("Do not specify the ONLY keyword."));
}
-
- ReleaseSysCache(indexTuple);
}
- list_free(indexoidlist);
-
- /* If rel is partition, shouldn't drop NOT NULL if parent has the same */
+ /*
+ * If rel is partition, shouldn't drop NOT NULL if parent has the same.
+ */
if (rel->rd_rel->relispartition)
{
Oid parentId = get_partition_parent(RelationGetRelid(rel), false);
@@ -7428,19 +7516,35 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
}
/*
- * Okay, actually perform the catalog change ... if needed
+ * Find the constraint that makes this column NOT NULL.
*/
- if (attTup->attnotnull)
+ conTup = findNotNullConstraint(RelationGetRelid(rel), colName);
+ if (conTup == NULL)
{
- attTup->attnotnull = false;
+ Bitmapset *pkcols;
- CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
+ /*
+ * There's no not-null constraint, so throw an error. If the column
+ * is in a primary key, we can throw a specific error. Otherwise,
+ * this is unexpected.
+ */
+ pkcols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_PRIMARY_KEY);
+ if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
+ pkcols))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("column \"%s\" is in a primary key", colName));
- ObjectAddressSubSet(address, RelationRelationId,
- RelationGetRelid(rel), attnum);
+ /* this shouldn't happen */
+ elog(ERROR, "could not find not-null constraint on column \"%s\", relation \"%s\"",
+ colName, RelationGetRelationName(rel));
}
- else
- address = InvalidObjectAddress;
+
+ readyRels = NIL;
+ dropconstraint_internal(rel, conTup, DROP_RESTRICT, recurse, false,
+ false, &readyRels, lockmode);
+
+ heap_freetuple(conTup);
InvokeObjectPostAlterHook(RelationRelationId,
RelationGetRelid(rel), attnum);
@@ -7451,102 +7555,137 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
}
/*
- * ALTER TABLE ALTER COLUMN SET NOT NULL
+ * Helper to set pg_attribute.attnotnull if it isn't set, and to tell phase 3
+ * to verify it; recurses to apply the same to children.
+ *
+ * When called to alter an existing table, 'wqueue' must be given so that we can
+ * queue a check that existing tuples pass the constraint. When called from
+ * table creation, 'wqueue' should be passed as NULL.
+ *
+ * Returns true if the flag was set in any table, otherwise false.
*/
-
-static void
-ATPrepSetNotNull(List **wqueue, Relation rel,
- AlterTableCmd *cmd, bool recurse, bool recursing,
- LOCKMODE lockmode, AlterTableUtilityContext *context)
+static bool
+set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum, bool recurse,
+ LOCKMODE lockmode)
{
- /*
- * If we're already recursing, there's nothing to do; the topmost
- * invocation of ATSimpleRecursion already visited all children.
- */
- if (recursing)
- return;
+ HeapTuple tuple;
+ Form_pg_attribute attForm;
+ bool retval = false;
- /*
- * If the target column is already marked NOT NULL, we can skip recursing
- * to children, because their columns should already be marked NOT NULL as
- * well. But there's no point in checking here unless the relation has
- * some children; else we can just wait till execution to check. (If it
- * does have children, however, this can save taking per-child locks
- * unnecessarily. This greatly improves concurrency in some parallel
- * restore scenarios.)
- *
- * Unfortunately, we can only apply this optimization to partitioned
- * tables, because traditional inheritance doesn't enforce that child
- * columns be NOT NULL when their parent is. (That's a bug that should
- * get fixed someday.)
- */
- if (rel->rd_rel->relhassubclass &&
- rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ tuple = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+ attnum, RelationGetRelid(rel));
+ attForm = (Form_pg_attribute) GETSTRUCT(tuple);
+ if (!attForm->attnotnull)
{
- HeapTuple tuple;
- bool attnotnull;
+ Relation attr_rel;
- tuple = SearchSysCacheAttName(RelationGetRelid(rel), cmd->name);
+ attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
- /* Might as well throw the error now, if name is bad */
- if (!HeapTupleIsValid(tuple))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_COLUMN),
- errmsg("column \"%s\" of relation \"%s\" does not exist",
- cmd->name, RelationGetRelationName(rel))));
+ attForm->attnotnull = true;
+ CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
- attnotnull = ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull;
- ReleaseSysCache(tuple);
- if (attnotnull)
- return;
+ table_close(attr_rel, RowExclusiveLock);
+
+ /*
+ * And set up for existing values to be checked, unless another
+ * constraint already proves this.
+ */
+ if (wqueue && !NotNullImpliedByRelConstraints(rel, attForm))
+ {
+ AlteredTableInfo *tab;
+
+ tab = ATGetQueueEntry(wqueue, rel);
+ tab->verify_new_notnull = true;
+ }
+
+ retval = true;
}
- /*
- * If we have ALTER TABLE ONLY ... SET NOT NULL on a partitioned table,
- * apply ALTER TABLE ... CHECK NOT NULL to every child. Otherwise, use
- * normal recursion logic.
- */
- if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
- !recurse)
+ if (recurse)
{
- AlterTableCmd *newcmd = makeNode(AlterTableCmd);
+ List *children;
+ ListCell *lc;
+
+ children = find_inheritance_children(RelationGetRelid(rel), lockmode);
+ foreach(lc, children)
+ {
+ Oid childrelid = lfirst_oid(lc);
+ Relation childrel;
+ AttrNumber childattno;
+
+ /* find_inheritance_children already got lock */
+ childrel = table_open(childrelid, NoLock);
+ CheckTableNotInUse(childrel, "ALTER TABLE");
- newcmd->subtype = AT_CheckNotNull;
- newcmd->name = pstrdup(cmd->name);
- ATSimpleRecursion(wqueue, rel, newcmd, true, lockmode, context);
+ childattno = get_attnum(RelationGetRelid(childrel),
+ get_attname(RelationGetRelid(rel), attnum,
+ false));
+ retval |= set_attnotnull(wqueue, childrel, childattno,
+ recurse, lockmode);
+ table_close(childrel, NoLock);
+ }
}
- else
- ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context);
+
+ return retval;
}
/*
- * Return the address of the modified column. If the column was already NOT
- * NULL, InvalidObjectAddress is returned.
+ * ALTER TABLE ALTER COLUMN SET NOT NULL
+ *
+ * Add a not-null constraint to a single table and its children. Returns
+ * the address of the constraint added to the parent relation, if one gets
+ * added, or InvalidObjectAddress otherwise.
+ *
+ * We must recurse to child tables during execution, rather than using
+ * ALTER TABLE's normal prep-time recursion.
*/
static ObjectAddress
-ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
- const char *colName, LOCKMODE lockmode)
+ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName,
+ bool recurse, bool recursing, List **readyRels,
+ LOCKMODE lockmode)
{
HeapTuple tuple;
+ Relation constr_rel;
+ ScanKeyData skey;
+ SysScanDesc conscan;
AttrNumber attnum;
- Relation attr_rel;
ObjectAddress address;
+ Constraint *constraint;
+ CookedConstraint *ccon;
+ List *cooked;
+ bool is_no_inherit = false;
+ List *ready = NIL;
/*
- * lookup the attribute
+ * In cases of multiple inheritance, we might visit the same child more
+ * than once. In the topmost call, set up a list that we fill with all
+ * visited relations, to skip those.
*/
- attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
+ if (readyRels == NULL)
+ {
+ Assert(!recursing);
+ readyRels = &ready;
+ }
+ if (list_member_oid(*readyRels, RelationGetRelid(rel)))
+ return InvalidObjectAddress;
+ *readyRels = lappend_oid(*readyRels, RelationGetRelid(rel));
- tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
+ /* At top level, permission check was done in ATPrepCmd, else do it */
+ if (recursing)
+ {
+ ATSimplePermissions(AT_AddConstraint, rel, ATT_TABLE | ATT_FOREIGN_TABLE);
+ Assert(conName != NULL);
+ }
- if (!HeapTupleIsValid(tuple))
+ attnum = get_attnum(RelationGetRelid(rel), colName);
+ if (attnum == InvalidAttrNumber)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" of relation \"%s\" does not exist",
colName, RelationGetRelationName(rel))));
- attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
-
/* Prevent them from altering a system attribute */
if (attnum <= 0)
ereport(ERROR,
@@ -7554,80 +7693,178 @@ ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
errmsg("cannot alter system column \"%s\"",
colName)));
- /*
- * Okay, actually perform the catalog change ... if needed
- */
- if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
+ /* See if there's already a constraint */
+ constr_rel = table_open(ConstraintRelationId, RowExclusiveLock);
+ ScanKeyInit(&skey,
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ conscan = systable_beginscan(constr_rel, ConstraintRelidTypidNameIndexId, true,
+ NULL, 1, &skey);
+
+ while (HeapTupleIsValid(tuple = systable_getnext(conscan)))
{
- ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = true;
+ Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(tuple);
+ bool changed = false;
+ HeapTuple copytup;
- CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
+ if (conForm->contype != CONSTRAINT_NOTNULL)
+ continue;
+
+ if (extractNotNullColumn(tuple) != attnum)
+ continue;
+
+ copytup = heap_copytuple(tuple);
+ conForm = (Form_pg_constraint) GETSTRUCT(copytup);
/*
- * Ordinarily phase 3 must ensure that no NULLs exist in columns that
- * are set NOT NULL; however, if we can find a constraint which proves
- * this then we can skip that. We needn't bother looking if we've
- * already found that we must verify some other NOT NULL constraint.
+ * If we find an appropriate constraint, we're almost done, but just
+ * need to change some properties on it: if we're recursing, increment
+ * coninhcount; if not, set conislocal if not already set.
*/
- if (!tab->verify_new_notnull &&
- !NotNullImpliedByRelConstraints(rel, (Form_pg_attribute) GETSTRUCT(tuple)))
+ if (recursing)
{
- /* Tell Phase 3 it needs to test the constraint */
- tab->verify_new_notnull = true;
+ conForm->coninhcount++;
+ changed = true;
+ }
+ else if (!conForm->conislocal)
+ {
+ conForm->conislocal = true;
+ changed = true;
}
- ObjectAddressSubSet(address, RelationRelationId,
- RelationGetRelid(rel), attnum);
+ if (changed)
+ {
+ CatalogTupleUpdate(constr_rel, &copytup->t_self, copytup);
+ ObjectAddressSet(address, ConstraintRelationId, conForm->oid);
+ }
+
+ systable_endscan(conscan);
+ table_close(constr_rel, RowExclusiveLock);
+
+ if (changed)
+ return address;
+ else
+ return InvalidObjectAddress;
}
- else
- address = InvalidObjectAddress;
+
+ systable_endscan(conscan);
+ table_close(constr_rel, RowExclusiveLock);
+
+ /*
+ * If we're asked not to recurse, and children exist, raise an error for
+ * partitioned tables. For inheritance, we act as if NO INHERIT had been
+ * specified.
+ */
+ if (!recurse &&
+ find_inheritance_children(RelationGetRelid(rel),
+ NoLock) != NIL)
+ {
+ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("constraint must be added to child tables too"),
+ errhint("Do not specify the ONLY keyword."));
+ else
+ is_no_inherit = true;
+ }
+
+ /*
+ * No constraint exists; we must add one. First determine a name to use,
+ * if we haven't already.
+ */
+ if (!recursing)
+ {
+ Assert(conName == NULL);
+ conName = ChooseConstraintName(RelationGetRelationName(rel),
+ colName, "not_null",
+ RelationGetNamespace(rel),
+ NIL);
+ }
+ constraint = makeNode(Constraint);
+ constraint->contype = CONSTR_NOTNULL;
+ constraint->conname = conName;
+ constraint->deferrable = false;
+ constraint->initdeferred = false;
+ constraint->location = -1;
+ constraint->keys = list_make1(makeString(colName));
+ constraint->is_no_inherit = is_no_inherit;
+ constraint->inhcount = recursing ? 1 : 0;
+ constraint->skip_validation = false;
+ constraint->initially_valid = true;
+
+ /* and do it */
+ cooked = AddRelationNewConstraints(rel, NIL, list_make1(constraint),
+ false, !recursing, false, NULL);
+ ccon = linitial(cooked);
+ ObjectAddressSet(address, ConstraintRelationId, ccon->conoid);
InvokeObjectPostAlterHook(RelationRelationId,
RelationGetRelid(rel), attnum);
- table_close(attr_rel, RowExclusiveLock);
+ /*
+ * Mark pg_attribute.attnotnull for the column. Tell that function not to
+ * recurse, because we're going to do it here.
+ */
+ set_attnotnull(wqueue, rel, attnum, false, lockmode);
+
+ /*
+ * Recurse to propagate the constraint to children that don't have one.
+ */
+ if (recurse)
+ {
+ List *children;
+ ListCell *lc;
+
+ children = find_inheritance_children(RelationGetRelid(rel),
+ lockmode);
+
+ foreach(lc, children)
+ {
+ Relation childrel;
+
+ childrel = table_open(lfirst_oid(lc), NoLock);
+
+ ATExecSetNotNull(wqueue, childrel,
+ conName, colName, recurse, true,
+ readyRels, lockmode);
+
+ table_close(childrel, NoLock);
+ }
+ }
return address;
}
/*
- * ALTER TABLE ALTER COLUMN CHECK NOT NULL
+ * ALTER TABLE ALTER COLUMN SET ATTNOTNULL
*
- * This doesn't exist in the grammar, but we generate AT_CheckNotNull
- * commands against the partitions of a partitioned table if the user
- * writes ALTER TABLE ONLY ... SET NOT NULL on the partitioned table,
- * or tries to create a primary key on it (which internally creates
- * AT_SetNotNull on the partitioned table). Such a command doesn't
- * allow us to actually modify any partition, but we want to let it
- * go through if the partitions are already properly marked.
- *
- * In future, this might need to adjust the child table's state, likely
- * by incrementing an inheritance count for the attnotnull constraint.
- * For now we need only check for the presence of the flag.
+ * This doesn't exist in the grammar; it's used when creating a
+ * primary key and the column is not already marked attnotnull.
*/
-static void
-ATExecCheckNotNull(AlteredTableInfo *tab, Relation rel,
- const char *colName, LOCKMODE lockmode)
+static ObjectAddress
+ATExecSetAttNotNull(List **wqueue, Relation rel,
+ const char *colName, LOCKMODE lockmode)
{
- HeapTuple tuple;
-
- tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
+ AttrNumber attnum;
+ ObjectAddress address = InvalidObjectAddress;
- if (!HeapTupleIsValid(tuple))
+ attnum = get_attnum(RelationGetRelid(rel), colName);
+ if (attnum == InvalidAttrNumber)
ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_COLUMN),
- errmsg("column \"%s\" of relation \"%s\" does not exist",
- colName, RelationGetRelationName(rel))));
+ errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" of relation \"%s\" does not exist",
+ colName, RelationGetRelationName(rel)));
- if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("constraint must be added to child tables too"),
- errdetail("Column \"%s\" of relation \"%s\" is not already NOT NULL.",
- colName, RelationGetRelationName(rel)),
- errhint("Do not specify the ONLY keyword.")));
+ /*
+ * Make the change, if necessary, and only if so report the column as
+ * changed
+ */
+ if (set_attnotnull(wqueue, rel, attnum, false, lockmode))
+ ObjectAddressSubSet(address, RelationRelationId,
+ RelationGetRelid(rel), attnum);
- ReleaseSysCache(tuple);
+ return address;
}
/*
@@ -8678,6 +8915,71 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
}
/*
+ * Prepare to add a primary key on an inheritance parent, by adding NOT NULL
+ * constraint on its children.
+ */
+static void
+ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd,
+ LOCKMODE lockmode, AlterTableUtilityContext *context)
+{
+ List *children;
+ List *newconstrs = NIL;
+ ListCell *lc;
+ IndexStmt *stmt;
+
+ /* No work if no legacy inheritance children are present */
+ if (rel->rd_rel->relkind != RELKIND_RELATION ||
+ !rel->rd_rel->relhassubclass)
+ return;
+
+ children = find_inheritance_children(RelationGetRelid(rel), lockmode);
+
+ stmt = castNode(IndexStmt, cmd->def);
+ foreach(lc, stmt->indexParams)
+ {
+ IndexElem *elem = lfirst_node(IndexElem, lc);
+ Constraint *nnconstr;
+
+ Assert(elem->expr == NULL);
+
+ nnconstr = makeNode(Constraint);
+ nnconstr->contype = CONSTR_NOTNULL;
+ nnconstr->conname = NULL; /* XXX use PK name? */
+ nnconstr->inhcount = 1;
+ nnconstr->deferrable = false;
+ nnconstr->initdeferred = false;
+ nnconstr->location = -1;
+ nnconstr->keys = list_make1(makeString(elem->name));
+ nnconstr->skip_validation = false;
+ nnconstr->initially_valid = true;
+
+ newconstrs = lappend(newconstrs, nnconstr);
+ }
+
+ foreach(lc, children)
+ {
+ Oid childrelid = lfirst_oid(lc);
+ Relation childrel = table_open(childrelid, NoLock);
+ AlterTableCmd *newcmd = makeNode(AlterTableCmd);
+ ListCell *lc2;
+
+ newcmd->subtype = AT_AddConstraint;
+ newcmd->recurse = true;
+
+ foreach(lc2, newconstrs)
+ {
+ /* ATPrepCmd copies newcmd, so we can scribble on it here */
+ newcmd->def = lfirst(lc2);
+
+ ATPrepCmd(wqueue, childrel, newcmd,
+ true, false, lockmode, context);
+ }
+
+ table_close(childrel, NoLock);
+ }
+}
+
+/*
* ALTER TABLE ADD INDEX
*
* There is no such command in the grammar, but parse_utilcmd.c converts
@@ -8872,17 +9174,18 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
Assert(IsA(newConstraint, Constraint));
/*
- * Currently, we only expect to see CONSTR_CHECK and CONSTR_FOREIGN nodes
- * arriving here (see the preprocessing done in parse_utilcmd.c). Use a
- * switch anyway to make it easier to add more code later.
+ * Currently, we only expect to see CONSTR_CHECK, CONSTR_NOTNULL and
+ * CONSTR_FOREIGN nodes arriving here (see the preprocessing done in
+ * parse_utilcmd.c).
*/
switch (newConstraint->contype)
{
case CONSTR_CHECK:
+ case CONSTR_NOTNULL:
address =
- ATAddCheckConstraint(wqueue, tab, rel,
- newConstraint, recurse, false, is_readd,
- lockmode);
+ ATAddCheckNNConstraint(wqueue, tab, rel,
+ newConstraint, recurse, false, is_readd,
+ lockmode);
break;
case CONSTR_FOREIGN:
@@ -8963,9 +9266,9 @@ ChooseForeignKeyConstraintNameAddition(List *colnames)
}
/*
- * Add a check constraint to a single table and its children. Returns the
- * address of the constraint added to the parent relation, if one gets added,
- * or InvalidObjectAddress otherwise.
+ * Add a check or not-null constraint to a single table and its children.
+ * Returns the address of the constraint added to the parent relation,
+ * if one gets added, or InvalidObjectAddress otherwise.
*
* Subroutine for ATExecAddConstraint.
*
@@ -8978,9 +9281,9 @@ ChooseForeignKeyConstraintNameAddition(List *colnames)
* the parent table and pass that down.
*/
static ObjectAddress
-ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
- Constraint *constr, bool recurse, bool recursing,
- bool is_readd, LOCKMODE lockmode)
+ATAddCheckNNConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ Constraint *constr, bool recurse, bool recursing,
+ bool is_readd, LOCKMODE lockmode)
{
List *newcons;
ListCell *lcon;
@@ -9018,7 +9321,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
{
CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
- if (!ccon->skip_validation)
+ if (!ccon->skip_validation && ccon->contype != CONSTR_NOTNULL)
{
NewConstraint *newcon;
@@ -9034,11 +9337,19 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
if (constr->conname == NULL)
constr->conname = ccon->name;
+ /*
+ * If adding a not-null constraint, set the pg_attribute flag and tell
+ * phase 3 to verify existing rows, if needed.
+ */
+ if (constr->contype == CONSTR_NOTNULL)
+ set_attnotnull(wqueue, rel, ccon->attnum,
+ !ccon->is_no_inherit, lockmode);
+
ObjectAddressSet(address, ConstraintRelationId, ccon->conoid);
}
/* At this point we must have a locked-down name to use */
- Assert(constr->conname != NULL);
+ Assert(newcons == NIL || constr->conname != NULL);
/* Advance command counter in case same table is visited multiple times */
CommandCounterIncrement();
@@ -9076,6 +9387,12 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("constraint must be added to child tables too")));
+ /*
+ * The constraint must appear as inherited in children, so create a
+ * modified constraint object to use.
+ */
+ constr = copyObject(constr);
+ constr->inhcount = 1;
foreach(child, children)
{
Oid childrelid = lfirst_oid(child);
@@ -9089,9 +9406,13 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
/* Find or create work queue entry for this table */
childtab = ATGetQueueEntry(wqueue, childrel);
- /* Recurse to child */
- ATAddCheckConstraint(wqueue, childtab, childrel,
- constr, recurse, true, is_readd, lockmode);
+ /*
+ * Recurse to child. XXX if we didn't create a constraint on the
+ * parent because it already existed, and we do create one on a child,
+ * should we return that child's constraint ObjectAddress here?
+ */
+ ATAddCheckNNConstraint(wqueue, childtab, childrel,
+ constr, recurse, true, is_readd, lockmode);
table_close(childrel, NoLock);
}
@@ -11958,16 +12279,11 @@ ATExecDropConstraint(Relation rel, const char *constrName,
bool recurse, bool recursing,
bool missing_ok, LOCKMODE lockmode)
{
- List *children;
- ListCell *child;
Relation conrel;
- Form_pg_constraint con;
SysScanDesc scan;
ScanKeyData skey[3];
HeapTuple tuple;
bool found = false;
- bool is_no_inherit_constraint = false;
- char contype;
/* At top level, permission check was done in ATPrepCmd, else do it */
if (recursing)
@@ -11996,80 +12312,238 @@ ATExecDropConstraint(Relation rel, const char *constrName,
/* There can be at most one matching row */
if (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
- ObjectAddress conobj;
+ List *readyRels = NIL;
+
+ dropconstraint_internal(rel, tuple, behavior, recurse, recursing,
+ missing_ok, &readyRels, lockmode);
+ found = true;
+ }
- con = (Form_pg_constraint) GETSTRUCT(tuple);
+ systable_endscan(scan);
- /* Don't drop inherited constraints */
- if (con->coninhcount > 0 && !recursing)
+ if (!found)
+ {
+ if (!missing_ok)
ereport(ERROR,
- (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"",
- constrName, RelationGetRelationName(rel))));
+ errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist",
+ constrName, RelationGetRelationName(rel)));
+ else
+ ereport(NOTICE,
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping",
+ constrName, RelationGetRelationName(rel)));
+ }
- is_no_inherit_constraint = con->connoinherit;
- contype = con->contype;
+ table_close(conrel, RowExclusiveLock);
+}
- /*
- * If it's a foreign-key constraint, we'd better lock the referenced
- * table and check that that's not in use, just as we've already done
- * for the constrained table (else we might, eg, be dropping a trigger
- * that has unfired events). But we can/must skip that in the
- * self-referential case.
- */
- if (contype == CONSTRAINT_FOREIGN &&
- con->confrelid != RelationGetRelid(rel))
- {
- Relation frel;
+/*
+ * Remove a constraint, using its pg_constraint tuple
+ *
+ * Implementation for ALTER TABLE DROP CONSTRAINT and ALTER TABLE ALTER COLUMN
+ * DROP NOT NULL.
+ *
+ * Returns the address of the constraint being removed.
+ */
+static ObjectAddress
+dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior behavior,
+ bool recurse, bool recursing, bool missing_ok, List **readyRels,
+ LOCKMODE lockmode)
+{
+ Relation conrel;
+ Form_pg_constraint con;
+ ObjectAddress conobj;
+ List *children;
+ ListCell *child;
+ bool is_no_inherit_constraint = false;
+ bool dropping_pk = false;
+ char *constrName;
+ List *unconstrained_cols = NIL;
+ char *colname;
- /* Must match lock taken by RemoveTriggerById: */
- frel = table_open(con->confrelid, AccessExclusiveLock);
- CheckTableNotInUse(frel, "ALTER TABLE");
- table_close(frel, NoLock);
- }
+ if (list_member_oid(*readyRels, RelationGetRelid(rel)))
+ return InvalidObjectAddress;
+ *readyRels = lappend_oid(*readyRels, RelationGetRelid(rel));
- /*
- * Perform the actual constraint deletion
- */
- conobj.classId = ConstraintRelationId;
- conobj.objectId = con->oid;
- conobj.objectSubId = 0;
+ conrel = table_open(ConstraintRelationId, RowExclusiveLock);
- performDeletion(&conobj, behavior, 0);
+ con = (Form_pg_constraint) GETSTRUCT(constraintTup);
+ constrName = NameStr(con->conname);
- found = true;
+ /* Don't allow drop of inherited constraints */
+ if (con->coninhcount > 0 && !recursing)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"",
+ constrName, RelationGetRelationName(rel))));
+
+ /*
+ * See if we have a not-null constraint or a PRIMARY KEY. If so, we have
+ * more checks and actions below, so obtain the list of columns that are
+ * constrained by the constraint being dropped.
+ */
+ if (con->contype == CONSTRAINT_NOTNULL)
+ {
+ AttrNumber colnum = extractNotNullColumn(constraintTup);
+
+ if (colnum != InvalidAttrNumber)
+ unconstrained_cols = list_make1_int(colnum);
}
+ else if (con->contype == CONSTRAINT_PRIMARY)
+ {
+ Datum adatum;
+ ArrayType *arr;
+ int numkeys;
+ bool isNull;
+ int16 *attnums;
- systable_endscan(scan);
+ dropping_pk = true;
- if (!found)
+ adatum = heap_getattr(constraintTup, Anum_pg_constraint_conkey,
+ RelationGetDescr(conrel), &isNull);
+ if (isNull)
+ elog(ERROR, "null conkey for constraint %u", con->oid);
+ arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
+ numkeys = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ numkeys < 0 ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != INT2OID)
+ elog(ERROR, "conkey is not a 1-D smallint array");
+ attnums = (int16 *) ARR_DATA_PTR(arr);
+
+ for (int i = 0; i < numkeys; i++)
+ unconstrained_cols = lappend_int(unconstrained_cols, attnums[i]);
+ }
+
+ is_no_inherit_constraint = con->connoinherit;
+
+ /*
+ * If it's a foreign-key constraint, we'd better lock the referenced table
+ * and check that that's not in use, just as we've already done for the
+ * constrained table (else we might, eg, be dropping a trigger that has
+ * unfired events). But we can/must skip that in the self-referential
+ * case.
+ */
+ if (con->contype == CONSTRAINT_FOREIGN &&
+ con->confrelid != RelationGetRelid(rel))
{
- if (!missing_ok)
- {
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("constraint \"%s\" of relation \"%s\" does not exist",
- constrName, RelationGetRelationName(rel))));
- }
- else
+ Relation frel;
+
+ /* Must match lock taken by RemoveTriggerById: */
+ frel = table_open(con->confrelid, AccessExclusiveLock);
+ CheckTableNotInUse(frel, "ALTER TABLE");
+ table_close(frel, NoLock);
+ }
+
+ /*
+ * Perform the actual constraint deletion
+ */
+ ObjectAddressSet(conobj, ConstraintRelationId, con->oid);
+ performDeletion(&conobj, behavior, 0);
+
+ /*
+ * If this was a NOT NULL or the primary key, the constrained columns must
+ * have had pg_attribute.attnotnull set. See if we need to reset it, and
+ * do so.
+ */
+ if (unconstrained_cols)
+ {
+ Relation attrel;
+ Bitmapset *pkcols;
+ Bitmapset *ircols;
+ ListCell *lc;
+
+ /* Make the above deletion visible */
+ CommandCounterIncrement();
+
+ attrel = table_open(AttributeRelationId, RowExclusiveLock);
+
+ /*
+ * We want to test columns for their presence in the primary key, but
+ * only if we're not dropping it.
+ */
+ pkcols = dropping_pk ? NULL :
+ RelationGetIndexAttrBitmap(rel,
+ INDEX_ATTR_BITMAP_PRIMARY_KEY);
+ ircols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+ foreach(lc, unconstrained_cols)
{
- ereport(NOTICE,
- (errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping",
- constrName, RelationGetRelationName(rel))));
- table_close(conrel, RowExclusiveLock);
- return;
+ AttrNumber attnum = lfirst_int(lc);
+ HeapTuple atttup;
+ HeapTuple contup;
+ Form_pg_attribute attForm;
+
+ /*
+ * Obtain pg_attribute tuple and verify conditions on it. We use
+ * a copy we can scribble on.
+ */
+ atttup = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum);
+ if (!HeapTupleIsValid(atttup))
+ elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+ attnum, RelationGetRelid(rel));
+ attForm = (Form_pg_attribute) GETSTRUCT(atttup);
+
+ /*
+ * Since the above deletion has been made visible, we can now
+ * search for any remaining constraints on this column (or these
+ * columns, in the case we're dropping a multicol primary key.)
+ * Then, verify whether any further NOT NULL or primary key
+ * exists, and reset attnotnull if none.
+ *
+ * However, if this is a generated identity column, abort the
+ * whole thing with a specific error message, because the
+ * constraint is required in that case.
+ */
+ contup = findNotNullConstraintAttnum(RelationGetRelid(rel), attnum);
+ if (contup ||
+ bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
+ pkcols))
+ continue;
+
+ /*
+ * It's not valid to drop the not-null constraint for a GENERATED
+ * AS IDENTITY column.
+ */
+ if (attForm->attidentity)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("column \"%s\" of relation \"%s\" is an identity column",
+ get_attname(RelationGetRelid(rel), attnum,
+ false),
+ RelationGetRelationName(rel)));
+
+ /*
+ * It's not valid to drop the not-null constraint for a column in
+ * the replica identity index, either. (FULL is not affected.)
+ */
+ if (bms_is_member(lfirst_int(lc) - FirstLowInvalidHeapAttributeNumber, ircols))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("column \"%s\" is in index used as replica identity",
+ get_attname(RelationGetRelid(rel), lfirst_int(lc), false)));
+
+ /* Reset attnotnull */
+ if (attForm->attnotnull)
+ {
+ attForm->attnotnull = false;
+ CatalogTupleUpdate(attrel, &atttup->t_self, atttup);
+ }
}
+ table_close(attrel, RowExclusiveLock);
}
/*
- * For partitioned tables, non-CHECK inherited constraints are dropped via
- * the dependency mechanism, so we're done here.
+ * For partitioned tables, non-CHECK, non-NOT-NULL inherited constraints
+ * are dropped via the dependency mechanism, so we're done here.
*/
- if (contype != CONSTRAINT_CHECK &&
+ if (con->contype != CONSTRAINT_CHECK &&
+ con->contype != CONSTRAINT_NOTNULL &&
rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
table_close(conrel, RowExclusiveLock);
- return;
+ return conobj;
}
/*
@@ -12094,52 +12568,79 @@ ATExecDropConstraint(Relation rel, const char *constrName,
errmsg("cannot remove constraint from only the partitioned table when partitions exist"),
errhint("Do not specify the ONLY keyword.")));
+ /* For not-null constraints we recurse by column name */
+ if (con->contype == CONSTRAINT_NOTNULL)
+ colname = NameStr(TupleDescAttr(RelationGetDescr(rel),
+ linitial_int(unconstrained_cols) - 1)->attname);
+ else
+ colname = NULL; /* keep compiler quiet */
+
foreach(child, children)
{
Oid childrelid = lfirst_oid(child);
Relation childrel;
- HeapTuple copy_tuple;
+ HeapTuple tuple;
+ Form_pg_constraint childcon;
+
+ if (list_member_oid(*readyRels, childrelid))
+ continue; /* child already processed */
/* find_inheritance_children already got lock */
childrel = table_open(childrelid, NoLock);
CheckTableNotInUse(childrel, "ALTER TABLE");
- ScanKeyInit(&skey[0],
- Anum_pg_constraint_conrelid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(childrelid));
- ScanKeyInit(&skey[1],
- Anum_pg_constraint_contypid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(InvalidOid));
- ScanKeyInit(&skey[2],
- Anum_pg_constraint_conname,
- BTEqualStrategyNumber, F_NAMEEQ,
- CStringGetDatum(constrName));
- scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId,
- true, NULL, 3, skey);
-
- /* There can be at most one matching row */
- if (!HeapTupleIsValid(tuple = systable_getnext(scan)))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("constraint \"%s\" of relation \"%s\" does not exist",
- constrName,
- RelationGetRelationName(childrel))));
-
- copy_tuple = heap_copytuple(tuple);
-
- systable_endscan(scan);
+ /*
+ * We search for not-null constraint by column number, and other
+ * constraints by name.
+ */
+ if (con->contype == CONSTRAINT_NOTNULL)
+ {
+ tuple = findNotNullConstraint(childrelid, colname);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for not-null constraint on column \"%s\" of relation %u",
+ colname, RelationGetRelid(childrel));
+ }
+ else
+ {
+ SysScanDesc scan;
+ ScanKeyData skey[3];
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(childrelid));
+ ScanKeyInit(&skey[1],
+ Anum_pg_constraint_contypid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(InvalidOid));
+ ScanKeyInit(&skey[2],
+ Anum_pg_constraint_conname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(constrName));
+ scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId,
+ true, NULL, 3, skey);
+ /* There can only be one, so no need to loop */
+ tuple = systable_getnext(scan);
+ if (!HeapTupleIsValid(tuple))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist",
+ constrName,
+ RelationGetRelationName(childrel))));
+ tuple = heap_copytuple(tuple);
+ systable_endscan(scan);
+ }
- con = (Form_pg_constraint) GETSTRUCT(copy_tuple);
+ childcon = (Form_pg_constraint) GETSTRUCT(tuple);
- /* Right now only CHECK constraints can be inherited */
- if (con->contype != CONSTRAINT_CHECK)
- elog(ERROR, "inherited constraint is not a CHECK constraint");
+ /* Right now only CHECK and not-null constraints can be inherited */
+ if (childcon->contype != CONSTRAINT_CHECK &&
+ childcon->contype != CONSTRAINT_NOTNULL)
+ elog(ERROR, "inherited constraint is not a CHECK or not-null constraint");
- if (con->coninhcount <= 0) /* shouldn't happen */
+ if (childcon->coninhcount <= 0) /* shouldn't happen */
elog(ERROR, "relation %u has non-inherited constraint \"%s\"",
- childrelid, constrName);
+ childrelid, NameStr(childcon->conname));
if (recurse)
{
@@ -12147,18 +12648,18 @@ ATExecDropConstraint(Relation rel, const char *constrName,
* If the child constraint has other definition sources, just
* decrement its inheritance count; if not, recurse to delete it.
*/
- if (con->coninhcount == 1 && !con->conislocal)
+ if (childcon->coninhcount == 1 && !childcon->conislocal)
{
/* Time to delete this child constraint, too */
- ATExecDropConstraint(childrel, constrName, behavior,
- true, true,
- false, lockmode);
+ dropconstraint_internal(childrel, tuple, behavior,
+ recurse, true, missing_ok, readyRels,
+ lockmode);
}
else
{
/* Child constraint must survive my deletion */
- con->coninhcount--;
- CatalogTupleUpdate(conrel, &copy_tuple->t_self, copy_tuple);
+ childcon->coninhcount--;
+ CatalogTupleUpdate(conrel, &tuple->t_self, tuple);
/* Make update visible */
CommandCounterIncrement();
@@ -12167,25 +12668,94 @@ ATExecDropConstraint(Relation rel, const char *constrName,
else
{
/*
- * If we were told to drop ONLY in this table (no recursion), we
- * need to mark the inheritors' constraints as locally defined
- * rather than inherited.
+ * If we were told to drop ONLY in this table (no recursion) and
+ * there are no further parents for this constraint, we need to
+ * mark the inheritors' constraints as locally defined rather than
+ * inherited.
*/
- con->coninhcount--;
- con->conislocal = true;
+ childcon->coninhcount--;
+ if (childcon->coninhcount == 0)
+ childcon->conislocal = true;
- CatalogTupleUpdate(conrel, &copy_tuple->t_self, copy_tuple);
+ CatalogTupleUpdate(conrel, &tuple->t_self, tuple);
/* Make update visible */
CommandCounterIncrement();
}
- heap_freetuple(copy_tuple);
+ heap_freetuple(tuple);
table_close(childrel, NoLock);
}
+ /*
+ * In addition, when dropping a primary key from a legacy-inheritance
+ * parent table, we must recurse to children to mark the corresponding NOT
+ * NULL constraint as no longer inherited, or drop it if this its last
+ * reference.
+ */
+ if (con->contype == CONSTRAINT_PRIMARY &&
+ rel->rd_rel->relkind == RELKIND_RELATION &&
+ rel->rd_rel->relhassubclass)
+ {
+ List *colnames = NIL;
+ ListCell *lc;
+ List *pkready = NIL;
+
+ /*
+ * Because primary keys are always marked as NO INHERIT, we don't have
+ * a list of children yet, so obtain one now.
+ */
+ children = find_inheritance_children(RelationGetRelid(rel), lockmode);
+
+ /*
+ * Find out the list of column names to process. Fortunately, we
+ * already have the list of column numbers.
+ */
+ foreach(lc, unconstrained_cols)
+ {
+ colnames = lappend(colnames, get_attname(RelationGetRelid(rel),
+ lfirst_int(lc), false));
+ }
+
+ foreach(child, children)
+ {
+ Oid childrelid = lfirst_oid(child);
+ Relation childrel;
+
+ if (list_member_oid(pkready, childrelid))
+ continue; /* child already processed */
+
+ /* find_inheritance_children already got lock */
+ childrel = table_open(childrelid, NoLock);
+ CheckTableNotInUse(childrel, "ALTER TABLE");
+
+ foreach(lc, colnames)
+ {
+ HeapTuple contup;
+ char *colName = lfirst(lc);
+
+ contup = findNotNullConstraint(childrelid, colName);
+ if (contup == NULL)
+ elog(ERROR, "cache lookup failed for not-null constraint on column \"%s\", relation \"%s\"",
+ colName, RelationGetRelationName(childrel));
+
+ dropconstraint_internal(childrel, contup,
+ DROP_RESTRICT, true, true,
+ false, &pkready,
+ lockmode);
+ pkready = NIL;
+ }
+
+ table_close(childrel, NoLock);
+
+ pkready = lappend_oid(pkready, childrelid);
+ }
+ }
+
table_close(conrel, RowExclusiveLock);
+
+ return conobj;
}
/*
@@ -13262,9 +13832,10 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
/*
* If the constraint is inherited (only), we don't want to inject a
- * new definition here; it'll get recreated when ATAddCheckConstraint
- * recurses from adding the parent table's constraint. But we had to
- * carry the info this far so that we can drop the constraint below.
+ * new definition here; it'll get recreated when
+ * ATAddCheckNNConstraint recurses from adding the parent table's
+ * constraint. But we had to carry the info this far so that we can
+ * drop the constraint below.
*/
if (!conislocal)
continue;
@@ -13511,10 +14082,10 @@ ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, char *cmd,
NIL,
con->conname);
}
- else if (cmd->subtype == AT_SetNotNull)
+ else if (cmd->subtype == AT_SetAttNotNull)
{
/*
- * The parser will create AT_SetNotNull subcommands for
+ * The parser will create AT_AttSetNotNull subcommands for
* columns of PRIMARY KEY indexes/constraints, but we need
* not do anything with them here, because the columns'
* NOT NULL marks will already have been propagated into
@@ -14988,6 +15559,13 @@ ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode)
/* OK to create inheritance */
CreateInheritance(child_rel, parent_rel);
+ /*
+ * If parent_rel has a primary key, then child_rel has not-null
+ * constraints that make these columns as non nullable. Make those
+ * constraints as inherited.
+ */
+ ATInheritAdjustNotNulls(parent_rel, child_rel, 1);
+
ObjectAddressSet(address, RelationRelationId,
RelationGetRelid(parent_rel));
@@ -15181,13 +15759,21 @@ MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel)
/*
* Check child doesn't discard NOT NULL property. (Other
- * constraints are checked elsewhere.)
+ * constraints are checked elsewhere.) However, if the constraint
+ * is NO INHERIT in the parent, this is allowed.
*/
if (attribute->attnotnull && !childatt->attnotnull)
- ereport(ERROR,
- (errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("column \"%s\" in child table must be marked NOT NULL",
- attributeName)));
+ {
+ HeapTuple contup;
+
+ contup = findNotNullConstraintAttnum(RelationGetRelid(parent_rel),
+ attribute->attnum);
+ if (!((Form_pg_constraint) GETSTRUCT(contup))->connoinherit)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("column \"%s\" in child table must be marked NOT NULL",
+ attributeName)));
+ }
/*
* Child column must be generated if and only if parent column is.
@@ -15264,6 +15850,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
SysScanDesc parent_scan;
ScanKeyData parent_key;
HeapTuple parent_tuple;
+ Oid parent_relid = RelationGetRelid(parent_rel);
bool child_is_partition = false;
catalog_relation = table_open(ConstraintRelationId, RowExclusiveLock);
@@ -15277,7 +15864,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
ScanKeyInit(&parent_key,
Anum_pg_constraint_conrelid,
BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(RelationGetRelid(parent_rel)));
+ ObjectIdGetDatum(parent_relid));
parent_scan = systable_beginscan(catalog_relation, ConstraintRelidTypidNameIndexId,
true, NULL, 1, &parent_key);
@@ -15289,7 +15876,8 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
HeapTuple child_tuple;
bool found = false;
- if (parent_con->contype != CONSTRAINT_CHECK)
+ if (parent_con->contype != CONSTRAINT_CHECK &&
+ parent_con->contype != CONSTRAINT_NOTNULL)
continue;
/* if the parent's constraint is marked NO INHERIT, it's not inherited */
@@ -15309,22 +15897,49 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
Form_pg_constraint child_con = (Form_pg_constraint) GETSTRUCT(child_tuple);
HeapTuple child_copy;
- if (child_con->contype != CONSTRAINT_CHECK)
+ if (child_con->contype != parent_con->contype)
continue;
- if (strcmp(NameStr(parent_con->conname),
+ /*
+ * CHECK constraint are matched by name, NOT NULL ones by
+ * attribute number
+ */
+ if (child_con->contype == CONSTRAINT_CHECK &&
+ strcmp(NameStr(parent_con->conname),
NameStr(child_con->conname)) != 0)
continue;
+ else if (child_con->contype == CONSTRAINT_NOTNULL)
+ {
+ AttrNumber parent_attno = extractNotNullColumn(parent_tuple);
+ AttrNumber child_attno = extractNotNullColumn(child_tuple);
+
+ if (strcmp(get_attname(parent_relid, parent_attno, false),
+ get_attname(RelationGetRelid(child_rel), child_attno,
+ false)) != 0)
+ continue;
+ }
- if (!constraints_equivalent(parent_tuple, child_tuple, tuple_desc))
+ if (child_con->contype == CONSTRAINT_CHECK &&
+ !constraints_equivalent(parent_tuple, child_tuple, tuple_desc))
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("child table \"%s\" has different definition for check constraint \"%s\"",
RelationGetRelationName(child_rel),
NameStr(parent_con->conname))));
- /* If the child constraint is "no inherit" then cannot merge */
- if (child_con->connoinherit)
+ /*
+ * If the CHECK child constraint is "no inherit" then cannot
+ * merge.
+ *
+ * This is not desirable for not-null constraints, mostly because
+ * it breaks our pg_upgrade strategy, but it also makes sense on
+ * its own: if a child has its own not-null constraint and then
+ * acquires a parent with the same constraint, then we start to
+ * enforce that constraint for all the descendants of that child
+ * too, if any.
+ */
+ if (child_con->contype == CONSTRAINT_CHECK &&
+ child_con->connoinherit)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("constraint \"%s\" conflicts with non-inherited constraint on child table \"%s\"",
@@ -15353,6 +15968,9 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
ereport(ERROR,
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("too many inheritance parents"));
+ if (child_con->contype == CONSTRAINT_NOTNULL &&
+ child_con->connoinherit)
+ child_con->connoinherit = false;
/*
* In case of partitions, an inherited constraint must be
@@ -15416,6 +16034,18 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
/* Off to RemoveInheritance() where most of the work happens */
RemoveInheritance(rel, parent_rel, false);
+ /*
+ * If parent_rel has a primary key, then child_rel has not-null
+ * constraints that make these columns as non nullable. Mark those
+ * constraints as no longer inherited by this parent.
+ */
+ ATInheritAdjustNotNulls(parent_rel, rel, -1);
+
+ /*
+ * If the parent has a primary key, then we decrement counts for all NOT
+ * NULL constraints
+ */
+
ObjectAddressSet(address, RelationRelationId,
RelationGetRelid(parent_rel));
@@ -15524,6 +16154,7 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
HeapTuple attributeTuple,
constraintTuple;
List *connames;
+ List *nncolumns;
bool found;
bool child_is_partition = false;
@@ -15594,6 +16225,8 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
* this, we first need a list of the names of the parent's check
* constraints. (We cheat a bit by only checking for name matches,
* assuming that the expressions will match.)
+ *
+ * For NOT NULL columns, we store column numbers to match.
*/
catalogRelation = table_open(ConstraintRelationId, RowExclusiveLock);
ScanKeyInit(&key[0],
@@ -15604,6 +16237,7 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
true, NULL, 1, key);
connames = NIL;
+ nncolumns = NIL;
while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
{
@@ -15611,6 +16245,8 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
if (con->contype == CONSTRAINT_CHECK)
connames = lappend(connames, pstrdup(NameStr(con->conname)));
+ if (con->contype == CONSTRAINT_NOTNULL)
+ nncolumns = lappend_int(nncolumns, extractNotNullColumn(constraintTuple));
}
systable_endscan(scan);
@@ -15626,21 +16262,40 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
{
Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
- bool match;
+ bool match = false;
ListCell *lc;
- if (con->contype != CONSTRAINT_CHECK)
- continue;
-
- match = false;
- foreach(lc, connames)
+ /*
+ * Match CHECK constraints by name, not-null constraints by column
+ * number, and ignore all others.
+ */
+ if (con->contype == CONSTRAINT_CHECK)
{
- if (strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0)
+ foreach(lc, connames)
{
- match = true;
- break;
+ if (con->contype == CONSTRAINT_CHECK &&
+ strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0)
+ {
+ match = true;
+ break;
+ }
+ }
+ }
+ else if (con->contype == CONSTRAINT_NOTNULL)
+ {
+ AttrNumber child_attno = extractNotNullColumn(constraintTuple);
+
+ foreach(lc, nncolumns)
+ {
+ if (lfirst_int(lc) == child_attno)
+ {
+ match = true;
+ break;
+ }
}
}
+ else
+ continue;
if (match)
{
@@ -15680,6 +16335,54 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
}
/*
+ * Adjust coninhcount of not-null constraints upwards or downwards when a
+ * table is marked as inheriting or no longer doing so a table with a primary
+ * key.
+ *
+ * Note: these constraints are not dropped, even if their inhcount goes to zero
+ * and conislocal is false. Instead we mark the constraints as locally defined.
+ * This is seen as more useful behavior, with no downsides. The user can always
+ * drop them afterwards.
+ */
+static void
+ATInheritAdjustNotNulls(Relation parent_rel, Relation child_rel, int inhcount)
+{
+ Bitmapset *pkattnos;
+
+ /* Quick exit when parent has no PK */
+ if (!parent_rel->rd_rel->relhasindex)
+ return;
+
+ pkattnos = RelationGetIndexAttrBitmap(parent_rel,
+ INDEX_ATTR_BITMAP_PRIMARY_KEY);
+ if (pkattnos != NULL)
+ {
+ Bitmapset *childattnums = NULL;
+ AttrMap *attmap;
+ int i;
+
+ attmap = build_attrmap_by_name(RelationGetDescr(parent_rel),
+ RelationGetDescr(child_rel), true);
+
+ i = -1;
+ while ((i = bms_next_member(pkattnos, i)) >= 0)
+ {
+ childattnums = bms_add_member(childattnums,
+ attmap->attnums[i + FirstLowInvalidHeapAttributeNumber - 1]);
+ }
+
+ /*
+ * CCI is needed in case there's a NOT NULL PRIMARY KEY column in the
+ * parent: the relevant not-null constraint in the child already had
+ * its inhcount modified earlier.
+ */
+ CommandCounterIncrement();
+ AdjustNotNullInheritance(RelationGetRelid(child_rel), childattnums,
+ inhcount);
+ }
+}
+
+/*
* Drop the dependency created by StoreCatalogInheritance1 (CREATE TABLE
* INHERITS/ALTER TABLE INHERIT -- refclassid will be RelationRelationId) or
* heap_create_with_catalog (CREATE TABLE OF/ALTER TABLE OF -- refclassid will
@@ -17530,7 +18233,7 @@ ComputePartitionAttrs(ParseState *pstate, Relation rel, List *partParams, AttrNu
* Do scanrel's existing constraints imply the partition constraint?
*
* "Existing constraints" include its check constraints and column-level
- * NOT NULL constraints. partConstraint describes the partition constraint,
+ * not-null constraints. partConstraint describes the partition constraint,
* in implicit-AND form.
*/
bool
@@ -17910,7 +18613,7 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd,
StorePartitionBound(attachrel, rel, cmd->bound);
/* Ensure there exists a correct set of indexes in the partition. */
- AttachPartitionEnsureIndexes(rel, attachrel);
+ AttachPartitionEnsureIndexes(wqueue, rel, attachrel);
/* and triggers */
CloneRowTriggersToPartition(rel, attachrel);
@@ -18023,13 +18726,12 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd,
* partitioned table.
*/
static void
-AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
+AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel)
{
List *idxes;
List *attachRelIdxs;
Relation *attachrelIdxRels;
IndexInfo **attachInfos;
- int i;
ListCell *cell;
MemoryContext cxt;
MemoryContext oldcxt;
@@ -18045,14 +18747,13 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
attachInfos = palloc(sizeof(IndexInfo *) * list_length(attachRelIdxs));
/* Build arrays of all existing indexes and their IndexInfos */
- i = 0;
foreach(cell, attachRelIdxs)
{
Oid cldIdxId = lfirst_oid(cell);
+ int i = foreach_current_index(cell);
attachrelIdxRels[i] = index_open(cldIdxId, AccessShareLock);
attachInfos[i] = BuildIndexInfo(attachrelIdxRels[i]);
- i++;
}
/*
@@ -18118,7 +18819,7 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
* the first matching, valid, unattached one we find, if any, as
* partition of the parent index. If we find one, we're done.
*/
- for (i = 0; i < list_length(attachRelIdxs); i++)
+ for (int i = 0; i < list_length(attachRelIdxs); i++)
{
Oid cldIdxId = RelationGetRelid(attachrelIdxRels[i]);
Oid cldConstrOid = InvalidOid;
@@ -18178,6 +18879,28 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
stmt = generateClonedIndexStmt(NULL,
idxRel, attmap,
&conOid);
+
+ /*
+ * If the index is a primary key, mark all columns as NOT NULL if
+ * they aren't already.
+ */
+ if (stmt->primary)
+ {
+ MemoryContextSwitchTo(oldcxt);
+ for (int j = 0; j < info->ii_NumIndexKeyAttrs; j++)
+ {
+ AttrNumber childattno;
+
+ childattno = get_attnum(RelationGetRelid(attachrel),
+ get_attname(RelationGetRelid(rel),
+ info->ii_IndexAttrNumbers[j],
+ false));
+ set_attnotnull(wqueue, attachrel, childattno,
+ true, AccessExclusiveLock);
+ }
+ MemoryContextSwitchTo(cxt);
+ }
+
DefineIndex(RelationGetRelid(attachrel), stmt, InvalidOid,
RelationGetRelid(idxRel),
conOid,
@@ -18190,7 +18913,7 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
out:
/* Clean up. */
- for (i = 0; i < list_length(attachRelIdxs); i++)
+ for (int i = 0; i < list_length(attachRelIdxs); i++)
index_close(attachrelIdxRels[i], AccessShareLock);
MemoryContextSwitchTo(oldcxt);
MemoryContextDelete(cxt);
@@ -18821,8 +19544,8 @@ DetachAddConstraintIfNeeded(List **wqueue, Relation partRel)
n->initially_valid = true;
n->skip_validation = true;
/* It's a re-add, since it nominally already exists */
- ATAddCheckConstraint(wqueue, tab, partRel, n,
- true, false, true, ShareUpdateExclusiveLock);
+ ATAddCheckNNConstraint(wqueue, tab, partRel, n,
+ true, false, true, ShareUpdateExclusiveLock);
}
}
@@ -19091,6 +19814,13 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
RelationGetRelationName(partIdx))));
}
+ /*
+ * If it's a primary key, make sure the columns in the partition are
+ * NOT NULL.
+ */
+ if (parentIdx->rd_index->indisprimary)
+ verifyPartitionIndexNotNull(childInfo, partTbl);
+
/* All good -- do it */
IndexSetParentIndex(partIdx, RelationGetRelid(parentIdx));
if (OidIsValid(constraintOid))
@@ -19235,6 +19965,29 @@ validatePartitionedIndex(Relation partedIdx, Relation partedTbl)
}
/*
+ * When attaching an index as a partition of a partitioned index which is a
+ * primary key, verify that all the columns in the partition are marked NOT
+ * NULL.
+ */
+static void
+verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partition)
+{
+ for (int i = 0; i < iinfo->ii_NumIndexKeyAttrs; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(RelationGetDescr(partition),
+ iinfo->ii_IndexAttrNumbers[i] - 1);
+
+ if (!att->attnotnull)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("invalid primary key definition"),
+ errdetail("Column \"%s\" of relation \"%s\" is not marked NOT NULL.",
+ NameStr(att->attname),
+ RelationGetRelationName(partition)));
+ }
+}
+
+/*
* Return an OID list of constraints that reference the given relation
* that are marked as having a parent constraints.
*/