diff options
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/tablecmds.c | 150 |
1 files changed, 104 insertions, 46 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index d8f0a99ad93..57662fd7662 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -574,8 +574,9 @@ static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid, Oid indexOid, Oid parentDelTrigger, Oid parentUpdTrigger, Oid *deleteTrigOid, Oid *updateTrigOid); -static bool tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, - Oid partRelid, +static bool tryAttachPartitionForeignKey(List **wqueue, + ForeignKeyCacheInfo *fk, + Relation partition, Oid parentConstrOid, int numfks, AttrNumber *mapped_conkey, AttrNumber *confkey, Oid *conpfeqop, @@ -9772,22 +9773,12 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * Validity checks (permission checks wait till we have the column * numbers) */ - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - { - if (!recurse) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot use ONLY for foreign key on partitioned table \"%s\" referencing relation \"%s\"", - RelationGetRelationName(rel), - RelationGetRelationName(pkrel)))); - if (fkconstraint->skip_validation && !fkconstraint->initially_valid) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot add NOT VALID foreign key on partitioned table \"%s\" referencing relation \"%s\"", - RelationGetRelationName(rel), - RelationGetRelationName(pkrel)), - errdetail("This feature is not yet supported on partitioned tables."))); - } + if (!recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot use ONLY for foreign key on partitioned table \"%s\" referencing relation \"%s\"", + RelationGetRelationName(rel), + RelationGetRelationName(pkrel))); if (pkrel->rd_rel->relkind != RELKIND_RELATION && pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) @@ -10782,14 +10773,12 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel, */ for (int i = 0; i < pd->nparts; i++) { - Oid partitionId = pd->oids[i]; - Relation partition = table_open(partitionId, lockmode); + Relation partition = table_open(pd->oids[i], lockmode); List *partFKs; AttrMap *attmap; AttrNumber mapped_fkattnum[INDEX_MAX_KEYS]; bool attached; ObjectAddress address; - ListCell *cell; CheckAlterTableIsSafe(partition); @@ -10802,13 +10791,11 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel, /* Check whether an existing constraint can be repurposed */ partFKs = copyObject(RelationGetFKeyList(partition)); attached = false; - foreach(cell, partFKs) + foreach_node(ForeignKeyCacheInfo, fk, partFKs) { - ForeignKeyCacheInfo *fk; - - fk = lfirst_node(ForeignKeyCacheInfo, cell); - if (tryAttachPartitionForeignKey(fk, - partitionId, + if (tryAttachPartitionForeignKey(wqueue, + fk, + partition, parentConstr, numfks, mapped_fkattnum, @@ -11260,8 +11247,9 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel) { ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, lc); - if (tryAttachPartitionForeignKey(fk, - RelationGetRelid(partRel), + if (tryAttachPartitionForeignKey(wqueue, + fk, + partRel, parentConstrOid, numfks, mapped_conkey, @@ -11364,8 +11352,9 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel) * return false. */ static bool -tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, - Oid partRelid, +tryAttachPartitionForeignKey(List **wqueue, + ForeignKeyCacheInfo *fk, + Relation partition, Oid parentConstrOid, int numfks, AttrNumber *mapped_conkey, @@ -11379,6 +11368,7 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, Form_pg_constraint parentConstr; HeapTuple partcontup; Form_pg_constraint partConstr; + bool queueValidation; ScanKeyData key; SysScanDesc scan; HeapTuple trigtup; @@ -11411,18 +11401,12 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, } } - /* - * Looks good so far; do some more extensive checks. Presumably the check - * for 'convalidated' could be dropped, since we don't really care about - * that, but let's be careful for now. - */ - partcontup = SearchSysCache1(CONSTROID, - ObjectIdGetDatum(fk->conoid)); + /* Looks good so far; perform more extensive checks. */ + partcontup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(fk->conoid)); if (!HeapTupleIsValid(partcontup)) elog(ERROR, "cache lookup failed for constraint %u", fk->conoid); partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); if (OidIsValid(partConstr->conparentid) || - !partConstr->convalidated || partConstr->condeferrable != parentConstr->condeferrable || partConstr->condeferred != parentConstr->condeferred || partConstr->confupdtype != parentConstr->confupdtype || @@ -11434,6 +11418,13 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, return false; } + /* + * Will we need to validate this constraint? A valid parent constraint + * implies that all child constraints have been validated, so if this one + * isn't, we must trigger phase 3 validation. + */ + queueValidation = parentConstr->convalidated && !partConstr->convalidated; + ReleaseSysCache(partcontup); ReleaseSysCache(parentConstrTup); @@ -11481,7 +11472,8 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, systable_endscan(scan); - ConstraintSetParentConstraint(fk->conoid, parentConstrOid, partRelid); + ConstraintSetParentConstraint(fk->conoid, parentConstrOid, + RelationGetRelid(partition)); /* * Like the constraint, attach partition's "check" triggers to the @@ -11492,10 +11484,10 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, &insertTriggerOid, &updateTriggerOid); Assert(OidIsValid(insertTriggerOid) && OidIsValid(parentInsTrigger)); TriggerSetParentTrigger(trigrel, insertTriggerOid, parentInsTrigger, - partRelid); + RelationGetRelid(partition)); Assert(OidIsValid(updateTriggerOid) && OidIsValid(parentUpdTrigger)); TriggerSetParentTrigger(trigrel, updateTriggerOid, parentUpdTrigger, - partRelid); + RelationGetRelid(partition)); /* * If the referenced table is partitioned, then the partition we're @@ -11572,7 +11564,33 @@ tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, table_close(pg_constraint, RowShareLock); } - CommandCounterIncrement(); + /* If validation is needed, put it in the queue now. */ + if (queueValidation) + { + Relation conrel; + + /* + * We updated this pg_constraint row above to set its parent; + * validating it will cause its convalidated flag to change, so we + * need CCI here. XXX it might work better to effect the convalidated + * changes for all constraints together during phase 3, but that + * requires more invasive code surgery. + */ + CommandCounterIncrement(); + + conrel = table_open(ConstraintRelationId, RowExclusiveLock); + + partcontup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(fk->conoid)); + if (!HeapTupleIsValid(partcontup)) + elog(ERROR, "cache lookup failed for constraint %u", fk->conoid); + + /* Use the same lock as for AT_ValidateConstraint */ + QueueFKConstraintValidation(wqueue, conrel, partition, partcontup, + ShareUpdateExclusiveLock); + ReleaseSysCache(partcontup); + table_close(conrel, RowExclusiveLock); + } + return true; } @@ -12113,7 +12131,7 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName, * * Add an entry to the wqueue to validate the given foreign key constraint in * Phase 3 and update the convalidated field in the pg_constraint catalog - * for the specified relation. + * for the specified relation and all its children. */ static void QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel, @@ -12126,6 +12144,7 @@ QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel, con = (Form_pg_constraint) GETSTRUCT(contuple); Assert(con->contype == CONSTRAINT_FOREIGN); + Assert(!con->convalidated); if (rel->rd_rel->relkind == RELKIND_RELATION) { @@ -12151,9 +12170,48 @@ QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel, } /* - * We disallow creating invalid foreign keys to or from partitioned - * tables, so ignoring the recursion bit is okay. + * If the table at either end of the constraint is partitioned, we need to + * recurse and handle every constraint that is a child of this constraint. */ + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE || + get_rel_relkind(con->confrelid) == RELKIND_PARTITIONED_TABLE) + { + ScanKeyData pkey; + SysScanDesc pscan; + HeapTuple childtup; + + ScanKeyInit(&pkey, + Anum_pg_constraint_conparentid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(con->oid)); + + pscan = systable_beginscan(conrel, ConstraintParentIndexId, + true, NULL, 1, &pkey); + + while (HeapTupleIsValid(childtup = systable_getnext(pscan))) + { + Form_pg_constraint childcon; + Relation childrel; + + childcon = (Form_pg_constraint) GETSTRUCT(childtup); + + /* + * If the child constraint has already been validated, no further + * action is required for it or its descendants, as they are all + * valid. + */ + if (childcon->convalidated) + continue; + + childrel = table_open(childcon->conrelid, lockmode); + + QueueFKConstraintValidation(wqueue, conrel, childrel, childtup, + lockmode); + table_close(childrel, NoLock); + } + + systable_endscan(pscan); + } /* * Now update the catalog, while we have the door open. |