aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c1326
1 files changed, 329 insertions, 997 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 03466e495ac..d9bbeafd82c 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -203,7 +203,7 @@ typedef struct AlteredTableInfo
typedef struct NewConstraint
{
char *name; /* Constraint name, or NULL if none */
- ConstrType contype; /* CHECK, FOREIGN */
+ ConstrType contype; /* CHECK or FOREIGN */
Oid refrelid; /* PK rel, if FOREIGN */
Oid refindid; /* OID of PK's index, if FOREIGN */
Oid conid; /* OID of pg_constraint entry, if FOREIGN */
@@ -350,8 +350,7 @@ static void truncate_check_activity(Relation rel);
static void RangeVarCallbackForTruncate(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg);
static List *MergeAttributes(List *schema, List *supers, char relpersistence,
- bool is_partition, List **supconstr,
- List **additional_notnulls);
+ bool is_partition, List **supconstr);
static bool MergeCheckConstraint(List *constraints, char *name, Node *expr);
static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel);
static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel);
@@ -432,16 +431,14 @@ static bool check_for_column_name_collision(Relation rel, const char *colname,
bool if_not_exists);
static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
static void add_column_collation_dependency(Oid relid, int32 attnum, Oid collid);
-static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
- LOCKMODE lockmode);
-static void set_attnotnull(List **wqueue, Relation rel,
- AttrNumber attnum, bool recurse, LOCKMODE lockmode);
-static ObjectAddress ATExecSetNotNull(List **wqueue, Relation rel,
- char *constrname, char *colName,
- bool recurse, bool recursing,
- List **readyRels, LOCKMODE lockmode);
-static void ATExecSetAttNotNull(List **wqueue, Relation rel,
- const char *colName, LOCKMODE lockmode);
+static void ATPrepDropNotNull(Relation rel, bool recurse, bool recursing);
+static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode);
+static void ATPrepSetNotNull(List **wqueue, Relation rel,
+ AlterTableCmd *cmd, bool recurse, bool recursing,
+ LOCKMODE lockmode,
+ AlterTableUtilityContext *context);
+static ObjectAddress ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
+ const char *colName, LOCKMODE lockmode);
static void ATExecCheckNotNull(AlteredTableInfo *tab, Relation rel,
const char *colName, LOCKMODE lockmode);
static bool NotNullImpliedByRelConstraints(Relation rel, Form_pg_attribute attr);
@@ -544,11 +541,6 @@ static void ATExecDropConstraint(Relation rel, const char *constrName,
DropBehavior behavior,
bool recurse, bool recursing,
bool missing_ok, LOCKMODE lockmode);
-static ObjectAddress dropconstraint_internal(Relation rel,
- HeapTuple constraintTup, DropBehavior behavior,
- bool recurse, bool recursing,
- bool missing_ok, List **readyRels,
- LOCKMODE lockmode);
static void ATPrepAlterColumnType(List **wqueue,
AlteredTableInfo *tab, Relation rel,
bool recurse, bool recursing,
@@ -624,7 +616,7 @@ static void RemoveInheritance(Relation child_rel, Relation parent_rel,
static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel,
PartitionCmd *cmd,
AlterTableUtilityContext *context);
-static void AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel);
+static void AttachPartitionEnsureIndexes(Relation rel, Relation attachrel);
static void QueuePartitionConstraintValidation(List **wqueue, Relation scanrel,
List *partConstraint,
bool validate_default);
@@ -642,7 +634,6 @@ static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx,
static void validatePartitionedIndex(Relation partedIdx, Relation partedTbl);
static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
Relation partitionTbl);
-static void verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partIdx);
static List *GetParentedForeignKeyRefs(Relation partition);
static void ATDetachCheckNoForeignKeyRefs(Relation partition);
static char GetAttributeCompression(Oid atttypid, char *compression);
@@ -680,10 +671,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
TupleDesc descriptor;
List *inheritOids;
List *old_constraints;
- List *old_notnulls;
List *rawDefaults;
List *cookedDefaults;
- List *nncols;
Datum reloptions;
ListCell *listptr;
AttrNumber attnum;
@@ -873,13 +862,12 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
MergeAttributes(stmt->tableElts, inheritOids,
stmt->relation->relpersistence,
stmt->partbound != NULL,
- &old_constraints, &old_notnulls);
+ &old_constraints);
/*
* Create a tuple descriptor from the relation schema. Note that this
- * deals with column names, types, and in-descriptor NOT NULL flags, but
- * not default values, NOT NULL or CHECK constraints; we handle those
- * below.
+ * deals with column names, types, and NOT NULL constraints, but not
+ * default values or CHECK constraints; we handle those below.
*/
descriptor = BuildDescForRelation(stmt->tableElts);
@@ -1262,17 +1250,6 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
AddRelationNewConstraints(rel, NIL, stmt->constraints,
true, true, false, queryString);
- /*
- * Finally, merge the NOT NULL constraints that are directly declared with
- * those that come from parent relations (making sure to count inheritance
- * appropriately for each), create them, and set the attnotnull flag on
- * columns that don't yet have it.
- */
- nncols = AddRelationNotNullConstraints(rel, stmt->nnconstraints,
- old_notnulls);
- foreach(listptr, nncols)
- set_attnotnull(NULL, rel, lfirst_int(listptr), false, NoLock);
-
ObjectAddressSet(address, RelationRelationId, relationId);
/*
@@ -2321,8 +2298,6 @@ storage_name(char c)
* Output arguments:
* 'supconstr' receives a list of constraints belonging to the parents,
* updated as necessary to be valid for the child.
- * 'nnconstraints' receives a list of CookedConstraints that corresponds to
- * constraints coming from inheritance parents.
*
* Return value:
* Completed schema list.
@@ -2353,10 +2328,7 @@ storage_name(char c)
*
* Constraints (including NOT NULL constraints) for the child table
* are the union of all relevant constraints, from both the child schema
- * and parent tables. In addition, in legacy inheritance, each column that
- * appears in a primary key in any of the parents also gets a NOT NULL
- * constraint (partitioning doesn't need this, because the PK itself gets
- * inherited.)
+ * and parent tables.
*
* The default value for a child column is defined as:
* (1) If the child schema specifies a default, that value is used.
@@ -2375,11 +2347,10 @@ storage_name(char c)
*/
static List *
MergeAttributes(List *schema, List *supers, char relpersistence,
- bool is_partition, List **supconstr, List **supnotnulls)
+ bool is_partition, List **supconstr)
{
List *inhSchema = NIL;
List *constraints = NIL;
- List *nnconstraints = NIL;
bool have_bogus_defaults = false;
int child_attno;
static Node bogus_marker = {0}; /* marks conflicting defaults */
@@ -2491,13 +2462,9 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
AttrMap *newattmap;
List *inherited_defaults;
List *cols_with_defaults;
- List *nnconstrs;
AttrNumber parent_attno;
ListCell *lc1;
ListCell *lc2;
- Bitmapset *pkattrs;
- Bitmapset *nncols = NULL;
-
/* caller already got lock */
relation = table_open(parent, NoLock);
@@ -2586,20 +2553,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
/* We can't process inherited defaults until newattmap is complete. */
inherited_defaults = cols_with_defaults = NIL;
- /*
- * All columns that are part of the parent's primary key need to be
- * NOT NULL; if partition just the attnotnull bit, otherwise a full
- * constraint (if they don't have one already). Also, we request
- * attnotnull on columns that have a NOT NULL constraint that's not
- * marked NO INHERIT.
- */
- pkattrs = RelationGetIndexAttrBitmap(relation,
- INDEX_ATTR_BITMAP_PRIMARY_KEY);
- nnconstrs = RelationGetNotNullConstraints(relation, true);
- foreach(lc1, nnconstrs)
- nncols = bms_add_member(nncols,
- ((CookedConstraint *) lfirst(lc1))->attnum);
-
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++)
{
@@ -2695,38 +2648,9 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
}
/*
- * In regular inheritance, columns in the parent's primary key
- * get an extra CHECK (NOT NULL) constraint. Partitioning
- * doesn't need this, because the PK itself is going to be
- * cloned to the partition.
- */
- if (!is_partition &&
- bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber,
- pkattrs))
- {
- CookedConstraint *nn;
-
- nn = palloc(sizeof(CookedConstraint));
- nn->contype = CONSTR_NOTNULL;
- nn->conoid = InvalidOid;
- nn->name = NULL;
- nn->attnum = exist_attno;
- nn->expr = NULL;
- nn->skip_validation = false;
- nn->is_local = false;
- nn->inhcount = 1;
- nn->is_no_inherit = false;
-
- nnconstraints = lappend(nnconstraints, nn);
- }
-
- /*
- * mark attnotnull if parent has it and it's not NO INHERIT
+ * Merge of NOT NULL constraints = OR 'em together
*/
- if (bms_is_member(parent_attno, nncols) ||
- bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber,
- pkattrs))
- def->is_not_null = true;
+ def->is_not_null |= attribute->attnotnull;
/*
* Check for GENERATED conflicts
@@ -2760,11 +2684,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
attribute->atttypmod);
def->inhcount = 1;
def->is_local = false;
- /* mark attnotnull if parent has it and it's not NO INHERIT */
- if (bms_is_member(parent_attno, nncols) ||
- bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber,
- pkattrs))
- def->is_not_null = true;
+ def->is_not_null = attribute->attnotnull;
def->is_from_type = false;
def->storage = attribute->attstorage;
def->raw_default = NULL;
@@ -2781,33 +2701,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
def->compression = NULL;
inhSchema = lappend(inhSchema, def);
newattmap->attnums[parent_attno - 1] = ++child_attno;
-
- /*
- * In regular inheritance, columns in the parent's primary key
- * get an extra NOT NULL constraint. Partitioning doesn't
- * need this, because the PK itself is going to be cloned to
- * the partition.
- */
- if (!is_partition &&
- bms_is_member(parent_attno -
- FirstLowInvalidHeapAttributeNumber,
- pkattrs))
- {
- CookedConstraint *nn;
-
- nn = palloc(sizeof(CookedConstraint));
- nn->contype = CONSTR_NOTNULL;
- nn->conoid = InvalidOid;
- nn->name = NULL;
- nn->attnum = newattmap->attnums[parent_attno - 1];
- nn->expr = NULL;
- nn->skip_validation = false;
- nn->is_local = false;
- nn->inhcount = 1;
- nn->is_no_inherit = false;
-
- nnconstraints = lappend(nnconstraints, nn);
- }
}
/*
@@ -2952,19 +2845,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
}
}
- /*
- * Also copy the NOT NULL constraints from this parent. The
- * attnotnull markings were already installed above.
- */
- foreach(lc1, nnconstrs)
- {
- CookedConstraint *nn = lfirst(lc1);
-
- nn->attnum = newattmap->attnums[nn->attnum - 1];
-
- nnconstraints = lappend(nnconstraints, nn);
- }
-
free_attrmap(newattmap);
/*
@@ -3171,7 +3051,8 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
/*
* Now that we have the column definition list for a partition, we can
* check whether the columns referenced in the column constraint specs
- * actually exist.
+ * actually exist. Also, we merge parent's NOT NULL constraints and
+ * defaults into each corresponding column definition.
*/
if (is_partition)
{
@@ -3277,8 +3158,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
}
*supconstr = constraints;
- *supnotnulls = nnconstraints;
-
return schema;
}
@@ -3330,85 +3209,6 @@ MergeCheckConstraint(List *constraints, char *name, Node *expr)
return false;
}
-/*
- * RelationGetNotNullConstraints -- get list of NOT NULL constraints
- *
- * Caller can request cooked constraints, or raw.
- *
- * This is seldom needed, so we just scan pg_constraint each time.
- *
- * XXX This is only used to create derived tables, so NO INHERIT constraints
- * are always skipped.
- */
-List *
-RelationGetNotNullConstraints(Relation relation, bool cooked)
-{
- List *notnulls = NIL;
- Relation constrRel;
- HeapTuple htup;
- SysScanDesc conscan;
- ScanKeyData skey;
-
- constrRel = table_open(ConstraintRelationId, AccessShareLock);
- ScanKeyInit(&skey,
- Anum_pg_constraint_conrelid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(RelationGetRelid(relation)));
- conscan = systable_beginscan(constrRel, ConstraintRelidTypidNameIndexId, true,
- NULL, 1, &skey);
-
- while (HeapTupleIsValid(htup = systable_getnext(conscan)))
- {
- Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(htup);
- AttrNumber colnum;
-
- if (conForm->contype != CONSTRAINT_NOTNULL)
- continue;
- if (conForm->connoinherit)
- continue;
-
- colnum = extractNotNullColumn(htup);
-
- if (cooked)
- {
- CookedConstraint *cooked;
-
- cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
-
- cooked->contype = CONSTR_NOTNULL;
- cooked->name = pstrdup(NameStr(conForm->conname));
- cooked->attnum = colnum;
- cooked->expr = NULL;
- cooked->skip_validation = false;
- cooked->is_local = true;
- cooked->inhcount = 0;
- cooked->is_no_inherit = conForm->connoinherit;
-
- notnulls = lappend(notnulls, cooked);
- }
- else
- {
- Constraint *constr;
-
- constr = makeNode(Constraint);
- constr->contype = CONSTR_NOTNULL;
- constr->conname = pstrdup(NameStr(conForm->conname));
- constr->deferrable = false;
- constr->initdeferred = false;
- constr->location = -1;
- constr->colname = get_attname(RelationGetRelid(relation),
- colnum, false);
- constr->skip_validation = false;
- constr->initially_valid = true;
- notnulls = lappend(notnulls, constr);
- }
- }
-
- systable_endscan(conscan);
- table_close(constrRel, AccessShareLock);
-
- return notnulls;
-}
/*
* StoreCatalogInheritance
@@ -3969,10 +3769,7 @@ rename_constraint_internal(Oid myrelid,
constraintOid);
con = (Form_pg_constraint) GETSTRUCT(tuple);
- if (myrelid &&
- (con->contype == CONSTRAINT_CHECK ||
- con->contype == CONSTRAINT_NOTNULL) &&
- !con->connoinherit)
+ if (myrelid && con->contype == CONSTRAINT_CHECK && !con->connoinherit)
{
if (recurse)
{
@@ -4557,7 +4354,6 @@ AlterTableGetLockLevel(List *cmds)
case AT_AddIndexConstraint:
case AT_ReplicaIdentity:
case AT_SetNotNull:
- case AT_SetAttNotNull:
case AT_EnableRowSecurity:
case AT_DisableRowSecurity:
case AT_ForceRowSecurity:
@@ -4856,23 +4652,15 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
break;
case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE);
- /* Set up recursion for phase 2; no other prep needed */
- if (recurse)
- cmd->recurse = true;
+ ATPrepDropNotNull(rel, recurse, recursing);
+ ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context);
pass = AT_PASS_DROP;
break;
case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE);
/* Need command-specific recursion decision */
- if (recurse)
- cmd->recurse = true;
- pass = AT_PASS_COL_ATTRS;
- break;
- case AT_SetAttNotNull: /* set pg_attribute.attnotnull without adding
- * a constraint */
- ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE);
- /* Need command-specific recursion decision */
- ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context);
+ ATPrepSetNotNull(wqueue, rel, cmd, recurse, recursing,
+ lockmode, context);
pass = AT_PASS_COL_ATTRS;
break;
case AT_CheckNotNull: /* check column is already marked NOT NULL */
@@ -5257,14 +5045,10 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab,
address = ATExecDropIdentity(rel, cmd->name, cmd->missing_ok, lockmode);
break;
case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
- address = ATExecDropNotNull(rel, cmd->name, cmd->recurse, lockmode);
+ address = ATExecDropNotNull(rel, cmd->name, lockmode);
break;
case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
- address = ATExecSetNotNull(wqueue, rel, NULL, cmd->name,
- cmd->recurse, false, NULL, lockmode);
- break;
- case AT_SetAttNotNull: /* set pg_attribute.attnotnull */
- ATExecSetAttNotNull(wqueue, rel, cmd->name, lockmode);
+ address = ATExecSetNotNull(tab, rel, cmd->name, lockmode);
break;
case AT_CheckNotNull: /* check column is already marked NOT NULL */
ATExecCheckNotNull(tab, rel, cmd->name, lockmode);
@@ -5603,8 +5387,11 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
*/
switch (cmd2->subtype)
{
- case AT_SetAttNotNull:
- ATSimpleRecursion(wqueue, rel, cmd2, recurse, lockmode, context);
+ case AT_SetNotNull:
+ /* Need command-specific recursion decision */
+ ATPrepSetNotNull(wqueue, rel, cmd2,
+ recurse, false,
+ lockmode, context);
pass = AT_PASS_COL_ATTRS;
break;
case AT_AddIndex:
@@ -6280,7 +6067,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
RelationGetRelationName(oldrel)),
errtableconstraint(oldrel, con->name)));
break;
- case CONSTR_NOTNULL:
case CONSTR_FOREIGN:
/* Nothing to do here */
break;
@@ -6389,8 +6175,6 @@ alter_table_type_to_string(AlterTableType cmdtype)
return "ALTER COLUMN ... DROP NOT NULL";
case AT_SetNotNull:
return "ALTER COLUMN ... SET NOT NULL";
- case AT_SetAttNotNull:
- return NULL; /* not real grammar */
case AT_DropExpression:
return "ALTER COLUMN ... DROP EXPRESSION";
case AT_CheckNotNull:
@@ -6990,7 +6774,8 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing,
*/
static ObjectAddress
ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
- AlterTableCmd **cmd, bool recurse, bool recursing,
+ AlterTableCmd **cmd,
+ bool recurse, bool recursing,
LOCKMODE lockmode, int cur_pass,
AlterTableUtilityContext *context)
{
@@ -7505,19 +7290,41 @@ add_column_collation_dependency(Oid relid, int32 attnum, Oid collid)
/*
* ALTER TABLE ALTER COLUMN DROP NOT NULL
- *
+ */
+
+static void
+ATPrepDropNotNull(Relation rel, bool recurse, bool recursing)
+{
+ /*
+ * If the parent is a partitioned table, like check constraints, we do not
+ * support removing the NOT NULL while partitions exist.
+ */
+ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ PartitionDesc partdesc = RelationGetPartitionDesc(rel, true);
+
+ Assert(partdesc != NULL);
+ if (partdesc->nparts > 0 && !recurse && !recursing)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("cannot remove constraint from only the partitioned table when partitions exist"),
+ errhint("Do not specify the ONLY keyword.")));
+ }
+}
+
+/*
* Return the address of the modified column. If the column was already
* nullable, InvalidObjectAddress is returned.
*/
static ObjectAddress
-ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
- LOCKMODE lockmode)
+ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
{
HeapTuple tuple;
- HeapTuple conTup;
Form_pg_attribute attTup;
AttrNumber attnum;
Relation attr_rel;
+ List *indexoidlist;
+ ListCell *indexoidscan;
ObjectAddress address;
/*
@@ -7533,15 +7340,6 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
colName, RelationGetRelationName(rel))));
attTup = (Form_pg_attribute) GETSTRUCT(tuple);
attnum = attTup->attnum;
- ObjectAddressSubSet(address, RelationRelationId,
- RelationGetRelid(rel), attnum);
-
- /* If the column is already nullable there's nothing to do. */
- if (!attTup->attnotnull)
- {
- table_close(attr_rel, RowExclusiveLock);
- return InvalidObjectAddress;
- }
/* Prevent them from altering a system attribute */
if (attnum <= 0)
@@ -7557,43 +7355,68 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
colName, RelationGetRelationName(rel))));
/*
- * It's not OK to remove a constraint only for the parent and leave it in
- * the children, so disallow that.
+ * Check that the attribute is not in a primary key or in an index used as
+ * a replica identity.
+ *
+ * Note: we'll throw error even if the pkey index is not valid.
*/
- if (!recurse)
+
+ /* Loop over all indexes on the relation */
+ indexoidlist = RelationGetIndexList(rel);
+
+ foreach(indexoidscan, indexoidlist)
{
- if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- {
- PartitionDesc partdesc;
+ Oid indexoid = lfirst_oid(indexoidscan);
+ HeapTuple indexTuple;
+ Form_pg_index indexStruct;
+ int i;
- partdesc = RelationGetPartitionDesc(rel, true);
+ indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
+ if (!HeapTupleIsValid(indexTuple))
+ elog(ERROR, "cache lookup failed for index %u", indexoid);
+ indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
- if (partdesc->nparts > 0)
- ereport(ERROR,
- errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("cannot remove constraint from only the partitioned table when partitions exist"),
- errhint("Do not specify the ONLY keyword."));
- }
- else if (rel->rd_rel->relhassubclass &&
- find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL)
+ /*
+ * If the index is not a primary key or an index used as replica
+ * identity, skip the check.
+ */
+ if (indexStruct->indisprimary || indexStruct->indisreplident)
{
- ereport(ERROR,
- errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("NOT NULL constraint on column \"%s\" must be removed in child tables too",
- colName),
- errhint("Do not specify the ONLY keyword."));
+ /*
+ * Loop over each attribute in the primary key or the index used
+ * as replica identity and see if it matches the to-be-altered
+ * attribute.
+ */
+ for (i = 0; i < indexStruct->indnkeyatts; i++)
+ {
+ if (indexStruct->indkey.values[i] == attnum)
+ {
+ if (indexStruct->indisprimary)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("column \"%s\" is in a primary key",
+ colName)));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("column \"%s\" is in index used as replica identity",
+ colName)));
+ }
+ }
}
+
+ ReleaseSysCache(indexTuple);
}
- /*
- * If rel is partition, shouldn't drop NOT NULL if parent has the same.
- */
+ list_free(indexoidlist);
+
+ /* If rel is partition, shouldn't drop NOT NULL if parent has the same */
if (rel->rd_rel->relispartition)
{
- Oid parentId = get_partition_parent(RelationGetRelid(rel), false);
- Relation parent = table_open(parentId, AccessShareLock);
- TupleDesc tupDesc = RelationGetDescr(parent);
- AttrNumber parent_attnum;
+ Oid parentId = get_partition_parent(RelationGetRelid(rel), false);
+ Relation parent = table_open(parentId, AccessShareLock);
+ TupleDesc tupDesc = RelationGetDescr(parent);
+ AttrNumber parent_attnum;
parent_attnum = get_attnum(parentId, colName);
if (TupleDescAttr(tupDesc, parent_attnum - 1)->attnotnull)
@@ -7605,33 +7428,22 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
}
/*
- * Find the constraint that makes this column NOT NULL.
+ * Okay, actually perform the catalog change ... if needed
*/
- conTup = findNotNullConstraint(rel, colName);
- if (conTup == NULL)
+ if (attTup->attnotnull)
{
- Bitmapset *pkcols;
+ attTup->attnotnull = false;
- /*
- * There's no NOT NULL constraint, so throw an error. If the column
- * is in a primary key, we can throw a specific error. Otherwise,
- * this is unexpected.
- */
- pkcols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_PRIMARY_KEY);
- if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
- pkcols))
- ereport(ERROR,
- errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("column \"%s\" is in a primary key", colName));
+ CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
- /* this shouldn't happen */
- elog(ERROR, "no NOT NULL constraint found to drop");
+ ObjectAddressSubSet(address, RelationRelationId,
+ RelationGetRelid(rel), attnum);
}
+ else
+ address = InvalidObjectAddress;
- dropconstraint_internal(rel, conTup, DROP_RESTRICT, recurse, false,
- false, NULL, lockmode);
-
- heap_freetuple(conTup);
+ InvokeObjectPostAlterHook(RelationRelationId,
+ RelationGetRelid(rel), attnum);
table_close(attr_rel, RowExclusiveLock);
@@ -7639,126 +7451,102 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse,
}
/*
- * Helper to set pg_attribute.attnotnull if it isn't set, and to tell phase 3
- * to verify it; recurses to apply the same to children.
- *
- * When called to alter an existing table, 'wqueue' must be given so that we can
- * queue a check that existing tuples pass the constraint. When called from
- * table creation, 'wqueue' should be passed as NULL.
+ * ALTER TABLE ALTER COLUMN SET NOT NULL
*/
+
static void
-set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum, bool recurse,
- LOCKMODE lockmode)
+ATPrepSetNotNull(List **wqueue, Relation rel,
+ AlterTableCmd *cmd, bool recurse, bool recursing,
+ LOCKMODE lockmode, AlterTableUtilityContext *context)
{
- HeapTuple tuple;
- Form_pg_attribute attForm;
- List *children;
- ListCell *lc;
+ /*
+ * If we're already recursing, there's nothing to do; the topmost
+ * invocation of ATSimpleRecursion already visited all children.
+ */
+ if (recursing)
+ return;
- tuple = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum);
- if (!HeapTupleIsValid(tuple))
- elog(ERROR, "cache lookup failed for attribute %d of relation %u",
- attnum, RelationGetRelid(rel));
- attForm = (Form_pg_attribute) GETSTRUCT(tuple);
- if (!attForm->attnotnull)
+ /*
+ * If the target column is already marked NOT NULL, we can skip recursing
+ * to children, because their columns should already be marked NOT NULL as
+ * well. But there's no point in checking here unless the relation has
+ * some children; else we can just wait till execution to check. (If it
+ * does have children, however, this can save taking per-child locks
+ * unnecessarily. This greatly improves concurrency in some parallel
+ * restore scenarios.)
+ *
+ * Unfortunately, we can only apply this optimization to partitioned
+ * tables, because traditional inheritance doesn't enforce that child
+ * columns be NOT NULL when their parent is. (That's a bug that should
+ * get fixed someday.)
+ */
+ if (rel->rd_rel->relhassubclass &&
+ rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
- Relation attr_rel;
-
- attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
-
- attForm->attnotnull = true;
- CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
+ HeapTuple tuple;
+ bool attnotnull;
- table_close(attr_rel, RowExclusiveLock);
+ tuple = SearchSysCacheAttName(RelationGetRelid(rel), cmd->name);
- /*
- * And set up for existing values to be checked, unless another constraint
- * already proves this.
- */
- if (wqueue && !NotNullImpliedByRelConstraints(rel, attForm))
- {
- AlteredTableInfo *tab;
+ /* Might as well throw the error now, if name is bad */
+ if (!HeapTupleIsValid(tuple))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" of relation \"%s\" does not exist",
+ cmd->name, RelationGetRelationName(rel))));
- tab = ATGetQueueEntry(wqueue, rel);
- tab->verify_new_notnull = true;
- }
+ attnotnull = ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull;
+ ReleaseSysCache(tuple);
+ if (attnotnull)
+ return;
}
- /* if no recursion is desired, we're done */
- if (!recurse)
- return;
-
- children = find_inheritance_children(RelationGetRelid(rel), lockmode);
- foreach(lc, children)
+ /*
+ * If we have ALTER TABLE ONLY ... SET NOT NULL on a partitioned table,
+ * apply ALTER TABLE ... CHECK NOT NULL to every child. Otherwise, use
+ * normal recursion logic.
+ */
+ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
+ !recurse)
{
- Oid childrelid = lfirst_oid(lc);
- Relation childrel;
- AttrNumber childattno;
-
- /* find_inheritance_children already got lock */
- childrel = table_open(childrelid, NoLock);
- CheckTableNotInUse(childrel, "ALTER TABLE");
+ AlterTableCmd *newcmd = makeNode(AlterTableCmd);
- childattno = get_attnum(RelationGetRelid(childrel),
- get_attname(RelationGetRelid(rel), attnum,
- false));
- set_attnotnull(wqueue, childrel, childattno,
- recurse, lockmode);
- table_close(childrel, NoLock);
+ newcmd->subtype = AT_CheckNotNull;
+ newcmd->name = pstrdup(cmd->name);
+ ATSimpleRecursion(wqueue, rel, newcmd, true, lockmode, context);
}
+ else
+ ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context);
}
/*
- * ALTER TABLE ALTER COLUMN SET NOT NULL
- *
- * Add a NOT NULL constraint to a single table and its children. Returns
- * the address of the constraint added to the parent relation, if one gets
- * added, or InvalidObjectAddress otherwise.
- *
- * We must recurse to child tables during execution, rather than using
- * ALTER TABLE's normal prep-time recursion.
+ * Return the address of the modified column. If the column was already NOT
+ * NULL, InvalidObjectAddress is returned.
*/
static ObjectAddress
-ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName,
- bool recurse, bool recursing, List **readyRels,
- LOCKMODE lockmode)
+ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
+ const char *colName, LOCKMODE lockmode)
{
HeapTuple tuple;
- Relation constr_rel;
- ScanKeyData skey;
- SysScanDesc conscan;
AttrNumber attnum;
+ Relation attr_rel;
ObjectAddress address;
- Constraint *constraint;
- CookedConstraint *ccon;
- List *cooked;
- List *ready = NIL;
/*
- * In cases of multiple inheritance, we might visit the same child more
- * than once. In the topmost call, set up a list that we fill with all
- * visited relations, to skip those.
+ * lookup the attribute
*/
- if (readyRels == NULL)
- readyRels = &ready;
- if (list_member_oid(*readyRels, RelationGetRelid(rel)))
- return InvalidObjectAddress;
- *readyRels = lappend_oid(*readyRels, RelationGetRelid(rel));
+ attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
- /* At top level, permission check was done in ATPrepCmd, else do it */
- if (recursing)
- {
- ATSimplePermissions(AT_AddConstraint, rel, ATT_TABLE | ATT_FOREIGN_TABLE);
- Assert(conName != NULL);
- }
+ tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
- attnum = get_attnum(RelationGetRelid(rel), colName);
- if (attnum == InvalidAttrNumber)
+ if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" of relation \"%s\" does not exist",
colName, RelationGetRelationName(rel))));
+ attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
+
/* Prevent them from altering a system attribute */
if (attnum <= 0)
ereport(ERROR,
@@ -7766,168 +7554,43 @@ ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName,
errmsg("cannot alter system column \"%s\"",
colName)));
- /* See if there's already a constraint */
- constr_rel = table_open(ConstraintRelationId, RowExclusiveLock);
- ScanKeyInit(&skey,
- Anum_pg_constraint_conrelid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(RelationGetRelid(rel)));
- conscan = systable_beginscan(constr_rel, ConstraintRelidTypidNameIndexId, true,
- NULL, 1, &skey);
-
- while (HeapTupleIsValid(tuple = systable_getnext(conscan)))
+ /*
+ * Okay, actually perform the catalog change ... if needed
+ */
+ if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
{
- Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(tuple);
- bool changed = false;
- HeapTuple copytup;
+ ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = true;
- if (conForm->contype != CONSTRAINT_NOTNULL)
- continue;
-
- if (extractNotNullColumn(tuple) != attnum)
- continue;
-
- copytup = heap_copytuple(tuple);
- conForm = (Form_pg_constraint) GETSTRUCT(copytup);
+ CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
/*
- * If we find an appropriate constraint, we're almost done, but just
- * need to change some properties on it: if we're recursing, increment
- * coninhcount; if not, set conislocal if not already set.
+ * Ordinarily phase 3 must ensure that no NULLs exist in columns that
+ * are set NOT NULL; however, if we can find a constraint which proves
+ * this then we can skip that. We needn't bother looking if we've
+ * already found that we must verify some other NOT NULL constraint.
*/
- if (recursing)
- {
- conForm->coninhcount++;
- changed = true;
- }
- else if (!conForm->conislocal)
+ if (!tab->verify_new_notnull &&
+ !NotNullImpliedByRelConstraints(rel, (Form_pg_attribute) GETSTRUCT(tuple)))
{
- conForm->conislocal = true;
- changed = true;
- }
-
- if (changed)
- {
- CatalogTupleUpdate(constr_rel, &copytup->t_self, copytup);
- ObjectAddressSet(address, ConstraintRelationId, conForm->oid);
+ /* Tell Phase 3 it needs to test the constraint */
+ tab->verify_new_notnull = true;
}
- systable_endscan(conscan);
- table_close(constr_rel, RowExclusiveLock);
-
- if (changed)
- return address;
- else
- return InvalidObjectAddress;
- }
-
- systable_endscan(conscan);
- table_close(constr_rel, RowExclusiveLock);
-
- /*
- * If we're asked not to recurse, and children exist, raise an error.
- */
- if (!recurse &&
- find_inheritance_children(RelationGetRelid(rel),
- NoLock) != NIL)
- {
- if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- ereport(ERROR,
- errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("cannot add constraint to only the partitioned table when partitions exist"),
- errhint("Do not specify the ONLY keyword."));
- else
- ereport(ERROR,
- errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("cannot add constraint only to table with inheritance children"),
- errhint("Do not specify the ONLY keyword."));
- }
-
- /*
- * No constraint exists; we must add one. First determine a name to use,
- * if we haven't already.
- */
- if (!recursing)
- {
- Assert(conName == NULL);
- conName = ChooseConstraintName(RelationGetRelationName(rel),
- colName, "not_null",
- RelationGetNamespace(rel),
- NIL);
+ ObjectAddressSubSet(address, RelationRelationId,
+ RelationGetRelid(rel), attnum);
}
- constraint = makeNode(Constraint);
- constraint->contype = CONSTR_NOTNULL;
- constraint->conname = conName;
- constraint->deferrable = false;
- constraint->initdeferred = false;
- constraint->location = -1;
- constraint->colname = colName;
- constraint->skip_validation = false;
- constraint->initially_valid = true;
-
- /* and do it */
- cooked = AddRelationNewConstraints(rel, NIL, list_make1(constraint),
- false, !recursing, false, NULL);
- ccon = linitial(cooked);
- ObjectAddressSet(address, ConstraintRelationId, ccon->conoid);
-
- /*
- * Mark pg_attribute.attnotnull for the column. Tell that function not to
- * recurse, because we're going to do it here.
- */
- set_attnotnull(wqueue, rel, attnum, false, lockmode);
-
- /*
- * Recurse to propagate the constraint to children that don't have one.
- */
- if (recurse)
- {
- List *children;
- ListCell *lc;
-
- children = find_inheritance_children(RelationGetRelid(rel),
- lockmode);
-
- foreach(lc, children)
- {
- Relation childrel;
-
- childrel = table_open(lfirst_oid(lc), NoLock);
+ else
+ address = InvalidObjectAddress;
- ATExecSetNotNull(wqueue, childrel,
- conName, colName, recurse, true,
- readyRels, lockmode);
+ InvokeObjectPostAlterHook(RelationRelationId,
+ RelationGetRelid(rel), attnum);
- table_close(childrel, NoLock);
- }
- }
+ table_close(attr_rel, RowExclusiveLock);
return address;
}
/*
- * ALTER TABLE ALTER COLUMN SET ATTNOTNULL
- *
- * This doesn't exist in the grammar; it's used when creating a
- * primary key and the column is not already marked attnotnull.
- */
-static void
-ATExecSetAttNotNull(List **wqueue, Relation rel,
- const char *colName, LOCKMODE lockmode)
-{
- AttrNumber attnum;
-
- attnum = get_attnum(RelationGetRelid(rel), colName);
- if (attnum == InvalidAttrNumber) /* XXX should not happen .. elog? */
- ereport(ERROR,
- errcode(ERRCODE_UNDEFINED_COLUMN),
- errmsg("column \"%s\" of relation \"%s\" does not exist",
- colName, RelationGetRelationName(rel)));
-
- set_attnotnull(wqueue, rel, attnum, false, lockmode);
-}
-
-/*
* ALTER TABLE ALTER COLUMN CHECK NOT NULL
*
* This doesn't exist in the grammar, but we generate AT_CheckNotNull
@@ -9209,14 +8872,13 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
Assert(IsA(newConstraint, Constraint));
/*
- * Currently, we only expect to see CONSTR_CHECK, CONSTR_NOTNULL and
- * CONSTR_FOREIGN nodes arriving here (see the preprocessing done in
- * parse_utilcmd.c).
+ * Currently, we only expect to see CONSTR_CHECK and CONSTR_FOREIGN nodes
+ * arriving here (see the preprocessing done in parse_utilcmd.c). Use a
+ * switch anyway to make it easier to add more code later.
*/
switch (newConstraint->contype)
{
case CONSTR_CHECK:
- case CONSTR_NOTNULL:
address =
ATAddCheckConstraint(wqueue, tab, rel,
newConstraint, recurse, false, is_readd,
@@ -9301,9 +8963,9 @@ ChooseForeignKeyConstraintNameAddition(List *colnames)
}
/*
- * Add a check or NOT NULL constraint to a single table and its children.
- * Returns the address of the constraint added to the parent relation,
- * if one gets added, or InvalidObjectAddress otherwise.
+ * Add a check constraint to a single table and its children. Returns the
+ * address of the constraint added to the parent relation, if one gets added,
+ * or InvalidObjectAddress otherwise.
*
* Subroutine for ATExecAddConstraint.
*
@@ -9356,7 +9018,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
{
CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
- if (!ccon->skip_validation && ccon->contype != CONSTR_NOTNULL)
+ if (!ccon->skip_validation)
{
NewConstraint *newcon;
@@ -9372,14 +9034,6 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
if (constr->conname == NULL)
constr->conname = ccon->name;
- /*
- * If adding a NOT NULL constraint, set the pg_attribute flag and
- * tell phase 3 to verify existing rows, if needed.
- */
- if (constr->contype == CONSTR_NOTNULL)
- set_attnotnull(wqueue, rel, ccon->attnum,
- !ccon->is_no_inherit, lockmode);
-
ObjectAddressSet(address, ConstraintRelationId, ccon->conoid);
}
@@ -12304,11 +11958,16 @@ ATExecDropConstraint(Relation rel, const char *constrName,
bool recurse, bool recursing,
bool missing_ok, LOCKMODE lockmode)
{
+ List *children;
+ ListCell *child;
Relation conrel;
+ Form_pg_constraint con;
SysScanDesc scan;
ScanKeyData skey[3];
HeapTuple tuple;
bool found = false;
+ bool is_no_inherit_constraint = false;
+ char contype;
/* At top level, permission check was done in ATPrepCmd, else do it */
if (recursing)
@@ -12337,258 +11996,80 @@ ATExecDropConstraint(Relation rel, const char *constrName,
/* There can be at most one matching row */
if (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
- dropconstraint_internal(rel, tuple, behavior, recurse, recursing,
- missing_ok, NULL, lockmode);
- found = true;
- }
+ ObjectAddress conobj;
- systable_endscan(scan);
+ con = (Form_pg_constraint) GETSTRUCT(tuple);
- if (!found)
- {
- if (!missing_ok)
+ /* Don't drop inherited constraints */
+ if (con->coninhcount > 0 && !recursing)
ereport(ERROR,
- errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("constraint \"%s\" of relation \"%s\" does not exist",
- constrName, RelationGetRelationName(rel)));
- else
- ereport(NOTICE,
- errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping",
- constrName, RelationGetRelationName(rel)));
- }
-
- table_close(conrel, RowExclusiveLock);
-}
-
-/*
- * Remove a constraint, using its pg_constraint tuple
- *
- * Implementation for ALTER TABLE DROP CONSTRAINT and ALTER TABLE ALTER COLUMN
- * DROP NOT NULL.
- *
- * Returns the address of the constraint being removed.
- */
-static ObjectAddress
-dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior behavior,
- bool recurse, bool recursing, bool missing_ok, List **readyRels,
- LOCKMODE lockmode)
-{
- Relation conrel;
- Form_pg_constraint con;
- ObjectAddress conobj;
- List *children;
- ListCell *child;
- bool is_no_inherit_constraint = false;
- bool dropping_pk = false;
- char *constrName;
- List *unconstrained_cols = NIL;
- char *colname; /* to match NOT NULL constraints when recursing */
- List *ready = NIL;
-
- if (readyRels == NULL)
- readyRels = &ready;
- if (list_member_oid(*readyRels, RelationGetRelid(rel)))
- return InvalidObjectAddress;
- *readyRels = lappend_oid(*readyRels, RelationGetRelid(rel));
-
- conrel = table_open(ConstraintRelationId, RowExclusiveLock);
-
- con = (Form_pg_constraint) GETSTRUCT(constraintTup);
- constrName = NameStr(con->conname);
-
- /*
- * If the constraint is marked conislocal and is also inherited, then we
- * just set conislocal false and we're done. The constraint doesn't go
- * away, and we don't modify any children.
- */
- if (con->conislocal && con->coninhcount > 0)
- {
- HeapTuple copytup;
-
- /* make a copy we can scribble on */
- copytup = heap_copytuple(constraintTup);
- con = (Form_pg_constraint) GETSTRUCT(copytup);
- con->conislocal = false;
- CatalogTupleUpdate(conrel, &copytup->t_self, copytup);
-
- table_close(conrel, RowExclusiveLock);
-
- ObjectAddressSet(conobj, ConstraintRelationId, con->oid);
- return conobj;
- }
-
- /* Don't drop inherited constraints */
- if (con->coninhcount > 0 && !recursing)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"",
- constrName, RelationGetRelationName(rel))));
-
- /*
- * See if we have a NOT NULL constraint or a PRIMARY KEY. If so, we have
- * more checks and actions below, so obtain the list of columns that are
- * constrained by the constraint being dropped.
- */
- if (con->contype == CONSTRAINT_NOTNULL)
- {
- AttrNumber colnum = extractNotNullColumn(constraintTup);
-
- if (colnum != InvalidAttrNumber)
- unconstrained_cols = list_make1_int(colnum);
- }
- else if (con->contype == CONSTRAINT_PRIMARY)
- {
- Datum adatum;
- ArrayType *arr;
- int numkeys;
- bool isNull;
- int16 *attnums;
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"",
+ constrName, RelationGetRelationName(rel))));
- dropping_pk = true;
+ is_no_inherit_constraint = con->connoinherit;
+ contype = con->contype;
- adatum = heap_getattr(constraintTup, Anum_pg_constraint_conkey,
- RelationGetDescr(conrel), &isNull);
- if (isNull)
- elog(ERROR, "null conkey for constraint %u", con->oid);
- arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
- numkeys = ARR_DIMS(arr)[0];
- if (ARR_NDIM(arr) != 1 ||
- numkeys < 0 ||
- ARR_HASNULL(arr) ||
- ARR_ELEMTYPE(arr) != INT2OID)
- elog(ERROR, "conkey is not a 1-D smallint array");
- attnums = (int16 *) ARR_DATA_PTR(arr);
+ /*
+ * If it's a foreign-key constraint, we'd better lock the referenced
+ * table and check that that's not in use, just as we've already done
+ * for the constrained table (else we might, eg, be dropping a trigger
+ * that has unfired events). But we can/must skip that in the
+ * self-referential case.
+ */
+ if (contype == CONSTRAINT_FOREIGN &&
+ con->confrelid != RelationGetRelid(rel))
+ {
+ Relation frel;
- for (int i = 0; i < numkeys; i++)
- unconstrained_cols = lappend_int(unconstrained_cols, attnums[i]);
- }
+ /* Must match lock taken by RemoveTriggerById: */
+ frel = table_open(con->confrelid, AccessExclusiveLock);
+ CheckTableNotInUse(frel, "ALTER TABLE");
+ table_close(frel, NoLock);
+ }
- is_no_inherit_constraint = con->connoinherit;
+ /*
+ * Perform the actual constraint deletion
+ */
+ conobj.classId = ConstraintRelationId;
+ conobj.objectId = con->oid;
+ conobj.objectSubId = 0;
- /*
- * If it's a foreign-key constraint, we'd better lock the referenced table
- * and check that that's not in use, just as we've already done for the
- * constrained table (else we might, eg, be dropping a trigger that has
- * unfired events). But we can/must skip that in the self-referential
- * case.
- */
- if (con->contype == CONSTRAINT_FOREIGN &&
- con->confrelid != RelationGetRelid(rel))
- {
- Relation frel;
+ performDeletion(&conobj, behavior, 0);
- /* Must match lock taken by RemoveTriggerById: */
- frel = table_open(con->confrelid, AccessExclusiveLock);
- CheckTableNotInUse(frel, "ALTER TABLE");
- table_close(frel, NoLock);
+ found = true;
}
- /*
- * Perform the actual constraint deletion
- */
- ObjectAddressSet(conobj, ConstraintRelationId, con->oid);
- performDeletion(&conobj, behavior, 0);
+ systable_endscan(scan);
- /*
- * If this was a NOT NULL or the primary key, the constrained columns must
- * have had pg_attribute.attnotnull set. See if we need to reset it, and
- * do so.
- */
- if (unconstrained_cols)
+ if (!found)
{
- Relation attrel;
- Bitmapset *pkcols;
- Bitmapset *ircols;
- ListCell *lc;
-
- /* Make the above deletion visible */
- CommandCounterIncrement();
-
- attrel = table_open(AttributeRelationId, RowExclusiveLock);
-
- /*
- * We want to test columns for their presence in the primary key, but
- * only if we're not dropping it.
- */
- pkcols = dropping_pk ? NULL :
- RelationGetIndexAttrBitmap(rel,
- INDEX_ATTR_BITMAP_PRIMARY_KEY);
- ircols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY);
-
- foreach(lc, unconstrained_cols)
+ if (!missing_ok)
{
- AttrNumber attnum = lfirst_int(lc);
- HeapTuple atttup;
- HeapTuple contup;
- Form_pg_attribute attForm;
-
- /*
- * Obtain pg_attribute tuple and verify conditions on it. We use
- * a copy we can scribble on.
- */
- atttup = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum);
- if (!HeapTupleIsValid(atttup))
- elog(ERROR, "cache lookup failed for column %d", attnum);
- attForm = (Form_pg_attribute) GETSTRUCT(atttup);
-
- /*
- * Since the above deletion has been made visible, we can now
- * search for any remaining constraints on this column (or these
- * columns, in the case we're dropping a multicol primary key.)
- * Then, verify whether any further NOT NULL or primary key
- * exists, and reset attnotnull if none.
- *
- * However, if this is a generated identity column, abort the
- * whole thing with a specific error message, because the
- * constraint is required in that case.
- */
- contup = findNotNullConstraintAttnum(rel, attnum);
- if (contup ||
- bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
- pkcols))
- continue;
-
- /*
- * It's not valid to drop the last NOT NULL constraint for a
- * GENERATED AS IDENTITY column.
- */
- if (attForm->attidentity)
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("column \"%s\" of relation \"%s\" is an identity column",
- get_attname(RelationGetRelid(rel), attnum,
- false),
- RelationGetRelationName(rel)));
-
- /*
- * It's not valid to drop the last NOT NULL constraint for the
- * replica identity either. XXX make exception for FULL?
- */
- if (bms_is_member(lfirst_int(lc) - FirstLowInvalidHeapAttributeNumber, ircols))
- ereport(ERROR,
- errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("column \"%s\" is in index used as replica identity",
- get_attname(RelationGetRelid(rel), lfirst_int(lc), false)));
-
- /* Reset attnotnull */
- if (attForm->attnotnull)
- {
- attForm->attnotnull = false;
- CatalogTupleUpdate(attrel, &atttup->t_self, atttup);
- }
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist",
+ constrName, RelationGetRelationName(rel))));
+ }
+ else
+ {
+ ereport(NOTICE,
+ (errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping",
+ constrName, RelationGetRelationName(rel))));
+ table_close(conrel, RowExclusiveLock);
+ return;
}
- table_close(attrel, RowExclusiveLock);
}
/*
* For partitioned tables, non-CHECK inherited constraints are dropped via
* the dependency mechanism, so we're done here.
*/
- if (con->contype != CONSTRAINT_CHECK &&
+ if (contype != CONSTRAINT_CHECK &&
rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
table_close(conrel, RowExclusiveLock);
- return conobj;
+ return;
}
/*
@@ -12613,104 +12094,50 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
errmsg("cannot remove constraint from only the partitioned table when partitions exist"),
errhint("Do not specify the ONLY keyword.")));
- /* For NOT NULL constraints we recurse by column name */
- if (con->contype == CONSTRAINT_NOTNULL)
- colname = NameStr(TupleDescAttr(RelationGetDescr(rel),
- linitial_int(unconstrained_cols) - 1)->attname);
- else
- colname = NULL; /* keep compiler quiet */
-
foreach(child, children)
{
Oid childrelid = lfirst_oid(child);
Relation childrel;
- HeapTuple tuple;
- Form_pg_constraint childcon;
HeapTuple copy_tuple;
- SysScanDesc scan;
- ScanKeyData skey[3];
-
- if (list_member_oid(*readyRels, childrelid))
- continue; /* child already processed */
/* find_inheritance_children already got lock */
childrel = table_open(childrelid, NoLock);
CheckTableNotInUse(childrel, "ALTER TABLE");
- /*
- * We search for NOT NULL constraint by column number, and other
- * constraints by name.
- */
- if (con->contype == CONSTRAINT_NOTNULL)
- {
- bool found = false;
- AttrNumber child_colnum;
- HeapTuple child_tup;
-
- child_colnum = get_attnum(RelationGetRelid(childrel), colname);
- ScanKeyInit(&skey[0],
- Anum_pg_constraint_conrelid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(childrelid));
- scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId,
- true, NULL, 1, skey);
- while (HeapTupleIsValid(child_tup = systable_getnext(scan)))
- {
- Form_pg_constraint constr = (Form_pg_constraint) GETSTRUCT(child_tup);
- AttrNumber constr_colnum;
-
- if (constr->contype != CONSTRAINT_NOTNULL)
- continue;
- constr_colnum = extractNotNullColumn(child_tup);
- if (constr_colnum != child_colnum)
- continue;
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(childrelid));
+ ScanKeyInit(&skey[1],
+ Anum_pg_constraint_contypid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(InvalidOid));
+ ScanKeyInit(&skey[2],
+ Anum_pg_constraint_conname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(constrName));
+ scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId,
+ true, NULL, 3, skey);
+
+ /* There can be at most one matching row */
+ if (!HeapTupleIsValid(tuple = systable_getnext(scan)))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist",
+ constrName,
+ RelationGetRelationName(childrel))));
- found = true;
- break; /* found it */
- }
- if (!found) /* shouldn't happen? */
- elog(ERROR, "failed to find NOT NULL constraint for column \"%s\" in table \"%s\"",
- colname, RelationGetRelationName(childrel));
+ copy_tuple = heap_copytuple(tuple);
- copy_tuple = heap_copytuple(child_tup);
- systable_endscan(scan);
- }
- else
- {
- ScanKeyInit(&skey[0],
- Anum_pg_constraint_conrelid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(childrelid));
- ScanKeyInit(&skey[1],
- Anum_pg_constraint_contypid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(InvalidOid));
- ScanKeyInit(&skey[2],
- Anum_pg_constraint_conname,
- BTEqualStrategyNumber, F_NAMEEQ,
- CStringGetDatum(constrName));
- scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId,
- true, NULL, 3, skey);
- /* There can only be one, so no need to loop */
- tuple = systable_getnext(scan);
- if (!HeapTupleIsValid(tuple))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("constraint \"%s\" of relation \"%s\" does not exist",
- constrName,
- RelationGetRelationName(childrel))));
- copy_tuple = heap_copytuple(tuple);
- systable_endscan(scan);
- }
+ systable_endscan(scan);
- childcon = (Form_pg_constraint) GETSTRUCT(copy_tuple);
+ con = (Form_pg_constraint) GETSTRUCT(copy_tuple);
- /* Right now only CHECK and NOT NULL constraints can be inherited */
- if (childcon->contype != CONSTRAINT_CHECK &&
- childcon->contype != CONSTRAINT_NOTNULL)
- elog(ERROR, "inherited constraint is not a CHECK or NOT NULL constraint");
+ /* Right now only CHECK constraints can be inherited */
+ if (con->contype != CONSTRAINT_CHECK)
+ elog(ERROR, "inherited constraint is not a CHECK constraint");
- if (childcon->coninhcount <= 0) /* shouldn't happen */
+ if (con->coninhcount <= 0) /* shouldn't happen */
elog(ERROR, "relation %u has non-inherited constraint \"%s\"",
childrelid, constrName);
@@ -12720,17 +12147,17 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
* If the child constraint has other definition sources, just
* decrement its inheritance count; if not, recurse to delete it.
*/
- if (childcon->coninhcount == 1 && !childcon->conislocal)
+ if (con->coninhcount == 1 && !con->conislocal)
{
/* Time to delete this child constraint, too */
- dropconstraint_internal(childrel, copy_tuple, behavior,
- recurse, true, missing_ok, readyRels,
- lockmode);
+ ATExecDropConstraint(childrel, constrName, behavior,
+ true, true,
+ false, lockmode);
}
else
{
/* Child constraint must survive my deletion */
- childcon->coninhcount--;
+ con->coninhcount--;
CatalogTupleUpdate(conrel, &copy_tuple->t_self, copy_tuple);
/* Make update visible */
@@ -12744,8 +12171,8 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
* need to mark the inheritors' constraints as locally defined
* rather than inherited.
*/
- childcon->coninhcount--;
- childcon->conislocal = true;
+ con->coninhcount--;
+ con->conislocal = true;
CatalogTupleUpdate(conrel, &copy_tuple->t_self, copy_tuple);
@@ -12759,8 +12186,6 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha
}
table_close(conrel, RowExclusiveLock);
-
- return conobj;
}
/*
@@ -14086,10 +13511,10 @@ ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, char *cmd,
NIL,
con->conname);
}
- else if (cmd->subtype == AT_SetAttNotNull)
+ else if (cmd->subtype == AT_SetNotNull)
{
/*
- * The parser will create AT_AttSetNotNull subcommands for
+ * The parser will create AT_SetNotNull subcommands for
* columns of PRIMARY KEY indexes/constraints, but we need
* not do anything with them here, because the columns'
* NOT NULL marks will already have been propagated into
@@ -15833,7 +15258,6 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
SysScanDesc parent_scan;
ScanKeyData parent_key;
HeapTuple parent_tuple;
- Oid parent_relid = RelationGetRelid(parent_rel);
bool child_is_partition = false;
catalog_relation = table_open(ConstraintRelationId, RowExclusiveLock);
@@ -15847,7 +15271,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
ScanKeyInit(&parent_key,
Anum_pg_constraint_conrelid,
BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(parent_relid));
+ ObjectIdGetDatum(RelationGetRelid(parent_rel)));
parent_scan = systable_beginscan(catalog_relation, ConstraintRelidTypidNameIndexId,
true, NULL, 1, &parent_key);
@@ -15859,8 +15283,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
HeapTuple child_tuple;
bool found = false;
- if (parent_con->contype != CONSTRAINT_CHECK &&
- parent_con->contype != CONSTRAINT_NOTNULL)
+ if (parent_con->contype != CONSTRAINT_CHECK)
continue;
/* if the parent's constraint is marked NO INHERIT, it's not inherited */
@@ -15880,30 +15303,14 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
Form_pg_constraint child_con = (Form_pg_constraint) GETSTRUCT(child_tuple);
HeapTuple child_copy;
- if (child_con->contype != parent_con->contype)
+ if (child_con->contype != CONSTRAINT_CHECK)
continue;
- /*
- * CHECK constraint are matched by name, NOT NULL ones by
- * attribute number
- */
- if (child_con->contype == CONSTRAINT_CHECK &&
- strcmp(NameStr(parent_con->conname),
+ if (strcmp(NameStr(parent_con->conname),
NameStr(child_con->conname)) != 0)
continue;
- else if (child_con->contype == CONSTRAINT_NOTNULL)
- {
- AttrNumber parent_attno = extractNotNullColumn(parent_tuple);
- AttrNumber child_attno = extractNotNullColumn(child_tuple);
-
- if (strcmp(get_attname(parent_relid, parent_attno, false),
- get_attname(RelationGetRelid(child_rel), child_attno,
- false)) != 0)
- continue;
- }
- if (child_con->contype == CONSTRAINT_CHECK &&
- !constraints_equivalent(parent_tuple, child_tuple, tuple_desc))
+ if (!constraints_equivalent(parent_tuple, child_tuple, tuple_desc))
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("child table \"%s\" has different definition for check constraint \"%s\"",
@@ -16111,7 +15518,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
HeapTuple attributeTuple,
constraintTuple;
List *connames;
- List *nncolumns;
bool found;
bool child_is_partition = false;
@@ -16182,8 +15588,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
* this, we first need a list of the names of the parent's check
* constraints. (We cheat a bit by only checking for name matches,
* assuming that the expressions will match.)
- *
- * For NOT NULL columns, we store column numbers to match.
*/
catalogRelation = table_open(ConstraintRelationId, RowExclusiveLock);
ScanKeyInit(&key[0],
@@ -16194,7 +15598,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
true, NULL, 1, key);
connames = NIL;
- nncolumns = NIL;
while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
{
@@ -16202,8 +15605,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
if (con->contype == CONSTRAINT_CHECK)
connames = lappend(connames, pstrdup(NameStr(con->conname)));
- if (con->contype == CONSTRAINT_NOTNULL)
- nncolumns = lappend_int(nncolumns, extractNotNullColumn(constraintTuple));
}
systable_endscan(scan);
@@ -16219,40 +15620,21 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
{
Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
- bool match = false;
+ bool match;
ListCell *lc;
- /*
- * Match CHECK constraints by name, NOT NULL constraints by column
- * number, and ignore all others.
- */
- if (con->contype == CONSTRAINT_CHECK)
- {
- foreach(lc, connames)
- {
- if (con->contype == CONSTRAINT_CHECK &&
- strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0)
- {
- match = true;
- break;
- }
- }
- }
- else if (con->contype == CONSTRAINT_NOTNULL)
- {
- AttrNumber child_attno = extractNotNullColumn(constraintTuple);
+ if (con->contype != CONSTRAINT_CHECK)
+ continue;
- foreach(lc, nncolumns)
+ match = false;
+ foreach(lc, connames)
+ {
+ if (strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0)
{
- if (lfirst_int(lc) == child_attno)
- {
- match = true;
- break;
- }
+ match = true;
+ break;
}
}
- else
- continue;
if (match)
{
@@ -18543,7 +17925,7 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd,
StorePartitionBound(attachrel, rel, cmd->bound);
/* Ensure there exists a correct set of indexes in the partition. */
- AttachPartitionEnsureIndexes(wqueue, rel, attachrel);
+ AttachPartitionEnsureIndexes(rel, attachrel);
/* and triggers */
CloneRowTriggersToPartition(rel, attachrel);
@@ -18656,12 +18038,13 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd,
* partitioned table.
*/
static void
-AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel)
+AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
{
List *idxes;
List *attachRelIdxs;
Relation *attachrelIdxRels;
IndexInfo **attachInfos;
+ int i;
ListCell *cell;
MemoryContext cxt;
MemoryContext oldcxt;
@@ -18677,13 +18060,14 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel)
attachInfos = palloc(sizeof(IndexInfo *) * list_length(attachRelIdxs));
/* Build arrays of all existing indexes and their IndexInfos */
+ i = 0;
foreach(cell, attachRelIdxs)
{
Oid cldIdxId = lfirst_oid(cell);
- int i = foreach_current_index(cell);
attachrelIdxRels[i] = index_open(cldIdxId, AccessShareLock);
attachInfos[i] = BuildIndexInfo(attachrelIdxRels[i]);
+ i++;
}
/*
@@ -18749,7 +18133,7 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel)
* the first matching, unattached one we find, if any, as partition of
* the parent index. If we find one, we're done.
*/
- for (int i = 0; i < list_length(attachRelIdxs); i++)
+ for (i = 0; i < list_length(attachRelIdxs); i++)
{
Oid cldIdxId = RelationGetRelid(attachrelIdxRels[i]);
Oid cldConstrOid = InvalidOid;
@@ -18805,28 +18189,6 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel)
stmt = generateClonedIndexStmt(NULL,
idxRel, attmap,
&conOid);
-
- /*
- * If the index is a primary key, mark all columns as NOT NULL if
- * they aren't already.
- */
- if (stmt->primary)
- {
- MemoryContextSwitchTo(oldcxt);
- for (int j = 0; j < info->ii_NumIndexKeyAttrs; j++)
- {
- AttrNumber childattno;
-
- childattno = get_attnum(RelationGetRelid(attachrel),
- get_attname(RelationGetRelid(rel),
- info->ii_IndexAttrNumbers[j],
- false));
- set_attnotnull(wqueue, attachrel, childattno,
- true, AccessExclusiveLock);
- }
- MemoryContextSwitchTo(cxt);
- }
-
DefineIndex(RelationGetRelid(attachrel), stmt, InvalidOid,
RelationGetRelid(idxRel),
conOid,
@@ -18839,7 +18201,7 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel)
out:
/* Clean up. */
- for (int i = 0; i < list_length(attachRelIdxs); i++)
+ for (i = 0; i < list_length(attachRelIdxs); i++)
index_close(attachrelIdxRels[i], AccessShareLock);
MemoryContextSwitchTo(oldcxt);
MemoryContextDelete(cxt);
@@ -19740,13 +19102,6 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
RelationGetRelationName(partIdx))));
}
- /*
- * If it's a primary key, make sure the columns in the partition are
- * NOT NULL.
- */
- if (parentIdx->rd_index->indisprimary)
- verifyPartitionIndexNotNull(childInfo, partTbl);
-
/* All good -- do it */
IndexSetParentIndex(partIdx, RelationGetRelid(parentIdx));
if (OidIsValid(constraintOid))
@@ -19884,29 +19239,6 @@ validatePartitionedIndex(Relation partedIdx, Relation partedTbl)
}
/*
- * When attaching an index as a partition of a partitioned index which is a
- * primary key, verify that all the columns in the partition are marked NOT
- * NULL.
- */
-static void
-verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partition)
-{
- for (int i = 0; i < iinfo->ii_NumIndexKeyAttrs; i++)
- {
- Form_pg_attribute att = TupleDescAttr(RelationGetDescr(partition),
- iinfo->ii_IndexAttrNumbers[i] - 1);
-
- if (!att->attnotnull)
- ereport(ERROR,
- errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("invalid primary key definition"),
- errdetail("Column \"%s\" of relation \"%s\" is not marked NOT NULL.",
- NameStr(att->attname),
- RelationGetRelationName(partition)));
- }
-}
-
-/*
* Return an OID list of constraints that reference the given relation
* that are marked as having a parent constraints.
*/