aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands')
-rw-r--r--src/backend/commands/tablecmds.c150
1 files changed, 104 insertions, 46 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index d8f0a99ad93..57662fd7662 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -574,8 +574,9 @@ static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid,
Oid indexOid,
Oid parentDelTrigger, Oid parentUpdTrigger,
Oid *deleteTrigOid, Oid *updateTrigOid);
-static bool tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
- Oid partRelid,
+static bool tryAttachPartitionForeignKey(List **wqueue,
+ ForeignKeyCacheInfo *fk,
+ Relation partition,
Oid parentConstrOid, int numfks,
AttrNumber *mapped_conkey, AttrNumber *confkey,
Oid *conpfeqop,
@@ -9772,22 +9773,12 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* Validity checks (permission checks wait till we have the column
* numbers)
*/
- if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- {
- if (!recurse)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot use ONLY for foreign key on partitioned table \"%s\" referencing relation \"%s\"",
- RelationGetRelationName(rel),
- RelationGetRelationName(pkrel))));
- if (fkconstraint->skip_validation && !fkconstraint->initially_valid)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("cannot add NOT VALID foreign key on partitioned table \"%s\" referencing relation \"%s\"",
- RelationGetRelationName(rel),
- RelationGetRelationName(pkrel)),
- errdetail("This feature is not yet supported on partitioned tables.")));
- }
+ if (!recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ ereport(ERROR,
+ errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot use ONLY for foreign key on partitioned table \"%s\" referencing relation \"%s\"",
+ RelationGetRelationName(rel),
+ RelationGetRelationName(pkrel)));
if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
@@ -10782,14 +10773,12 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
*/
for (int i = 0; i < pd->nparts; i++)
{
- Oid partitionId = pd->oids[i];
- Relation partition = table_open(partitionId, lockmode);
+ Relation partition = table_open(pd->oids[i], lockmode);
List *partFKs;
AttrMap *attmap;
AttrNumber mapped_fkattnum[INDEX_MAX_KEYS];
bool attached;
ObjectAddress address;
- ListCell *cell;
CheckAlterTableIsSafe(partition);
@@ -10802,13 +10791,11 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
/* Check whether an existing constraint can be repurposed */
partFKs = copyObject(RelationGetFKeyList(partition));
attached = false;
- foreach(cell, partFKs)
+ foreach_node(ForeignKeyCacheInfo, fk, partFKs)
{
- ForeignKeyCacheInfo *fk;
-
- fk = lfirst_node(ForeignKeyCacheInfo, cell);
- if (tryAttachPartitionForeignKey(fk,
- partitionId,
+ if (tryAttachPartitionForeignKey(wqueue,
+ fk,
+ partition,
parentConstr,
numfks,
mapped_fkattnum,
@@ -11260,8 +11247,9 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
{
ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, lc);
- if (tryAttachPartitionForeignKey(fk,
- RelationGetRelid(partRel),
+ if (tryAttachPartitionForeignKey(wqueue,
+ fk,
+ partRel,
parentConstrOid,
numfks,
mapped_conkey,
@@ -11364,8 +11352,9 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
* return false.
*/
static bool
-tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
- Oid partRelid,
+tryAttachPartitionForeignKey(List **wqueue,
+ ForeignKeyCacheInfo *fk,
+ Relation partition,
Oid parentConstrOid,
int numfks,
AttrNumber *mapped_conkey,
@@ -11379,6 +11368,7 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
Form_pg_constraint parentConstr;
HeapTuple partcontup;
Form_pg_constraint partConstr;
+ bool queueValidation;
ScanKeyData key;
SysScanDesc scan;
HeapTuple trigtup;
@@ -11411,18 +11401,12 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
}
}
- /*
- * Looks good so far; do some more extensive checks. Presumably the check
- * for 'convalidated' could be dropped, since we don't really care about
- * that, but let's be careful for now.
- */
- partcontup = SearchSysCache1(CONSTROID,
- ObjectIdGetDatum(fk->conoid));
+ /* Looks good so far; perform more extensive checks. */
+ partcontup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(fk->conoid));
if (!HeapTupleIsValid(partcontup))
elog(ERROR, "cache lookup failed for constraint %u", fk->conoid);
partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
if (OidIsValid(partConstr->conparentid) ||
- !partConstr->convalidated ||
partConstr->condeferrable != parentConstr->condeferrable ||
partConstr->condeferred != parentConstr->condeferred ||
partConstr->confupdtype != parentConstr->confupdtype ||
@@ -11434,6 +11418,13 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
return false;
}
+ /*
+ * Will we need to validate this constraint? A valid parent constraint
+ * implies that all child constraints have been validated, so if this one
+ * isn't, we must trigger phase 3 validation.
+ */
+ queueValidation = parentConstr->convalidated && !partConstr->convalidated;
+
ReleaseSysCache(partcontup);
ReleaseSysCache(parentConstrTup);
@@ -11481,7 +11472,8 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
systable_endscan(scan);
- ConstraintSetParentConstraint(fk->conoid, parentConstrOid, partRelid);
+ ConstraintSetParentConstraint(fk->conoid, parentConstrOid,
+ RelationGetRelid(partition));
/*
* Like the constraint, attach partition's "check" triggers to the
@@ -11492,10 +11484,10 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
&insertTriggerOid, &updateTriggerOid);
Assert(OidIsValid(insertTriggerOid) && OidIsValid(parentInsTrigger));
TriggerSetParentTrigger(trigrel, insertTriggerOid, parentInsTrigger,
- partRelid);
+ RelationGetRelid(partition));
Assert(OidIsValid(updateTriggerOid) && OidIsValid(parentUpdTrigger));
TriggerSetParentTrigger(trigrel, updateTriggerOid, parentUpdTrigger,
- partRelid);
+ RelationGetRelid(partition));
/*
* If the referenced table is partitioned, then the partition we're
@@ -11572,7 +11564,33 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
table_close(pg_constraint, RowShareLock);
}
- CommandCounterIncrement();
+ /* If validation is needed, put it in the queue now. */
+ if (queueValidation)
+ {
+ Relation conrel;
+
+ /*
+ * We updated this pg_constraint row above to set its parent;
+ * validating it will cause its convalidated flag to change, so we
+ * need CCI here. XXX it might work better to effect the convalidated
+ * changes for all constraints together during phase 3, but that
+ * requires more invasive code surgery.
+ */
+ CommandCounterIncrement();
+
+ conrel = table_open(ConstraintRelationId, RowExclusiveLock);
+
+ partcontup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(fk->conoid));
+ if (!HeapTupleIsValid(partcontup))
+ elog(ERROR, "cache lookup failed for constraint %u", fk->conoid);
+
+ /* Use the same lock as for AT_ValidateConstraint */
+ QueueFKConstraintValidation(wqueue, conrel, partition, partcontup,
+ ShareUpdateExclusiveLock);
+ ReleaseSysCache(partcontup);
+ table_close(conrel, RowExclusiveLock);
+ }
+
return true;
}
@@ -12113,7 +12131,7 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
*
* Add an entry to the wqueue to validate the given foreign key constraint in
* Phase 3 and update the convalidated field in the pg_constraint catalog
- * for the specified relation.
+ * for the specified relation and all its children.
*/
static void
QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel,
@@ -12126,6 +12144,7 @@ QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel,
con = (Form_pg_constraint) GETSTRUCT(contuple);
Assert(con->contype == CONSTRAINT_FOREIGN);
+ Assert(!con->convalidated);
if (rel->rd_rel->relkind == RELKIND_RELATION)
{
@@ -12151,9 +12170,48 @@ QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel,
}
/*
- * We disallow creating invalid foreign keys to or from partitioned
- * tables, so ignoring the recursion bit is okay.
+ * If the table at either end of the constraint is partitioned, we need to
+ * recurse and handle every constraint that is a child of this constraint.
*/
+ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ||
+ get_rel_relkind(con->confrelid) == RELKIND_PARTITIONED_TABLE)
+ {
+ ScanKeyData pkey;
+ SysScanDesc pscan;
+ HeapTuple childtup;
+
+ ScanKeyInit(&pkey,
+ Anum_pg_constraint_conparentid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(con->oid));
+
+ pscan = systable_beginscan(conrel, ConstraintParentIndexId,
+ true, NULL, 1, &pkey);
+
+ while (HeapTupleIsValid(childtup = systable_getnext(pscan)))
+ {
+ Form_pg_constraint childcon;
+ Relation childrel;
+
+ childcon = (Form_pg_constraint) GETSTRUCT(childtup);
+
+ /*
+ * If the child constraint has already been validated, no further
+ * action is required for it or its descendants, as they are all
+ * valid.
+ */
+ if (childcon->convalidated)
+ continue;
+
+ childrel = table_open(childcon->conrelid, lockmode);
+
+ QueueFKConstraintValidation(wqueue, conrel, childrel, childtup,
+ lockmode);
+ table_close(childrel, NoLock);
+ }
+
+ systable_endscan(pscan);
+ }
/*
* Now update the catalog, while we have the door open.