diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 193 |
1 files changed, 157 insertions, 36 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c0f987cc814..ec2f9616edd 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -338,9 +338,6 @@ static void validateCheckConstraint(Relation rel, HeapTuple constrtup); static void validateForeignKeyConstraint(char *conname, Relation rel, Relation pkrel, Oid pkindOid, Oid constraintOid); -static void createForeignKeyTriggers(Relation rel, Oid refRelOid, - Constraint *fkconstraint, - Oid constraintOid, Oid indexOid); static void ATController(AlterTableStmt *parsetree, Relation rel, List *cmds, bool recurse, LOCKMODE lockmode); static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, @@ -411,8 +408,10 @@ static ObjectAddress ATAddCheckConstraint(List **wqueue, Constraint *constr, bool recurse, bool recursing, bool is_readd, LOCKMODE lockmode); -static ObjectAddress ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, - Constraint *fkconstraint, LOCKMODE lockmode); +static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, + Relation rel, Constraint *fkconstraint, Oid parentConstr, + bool recurse, bool recursing, + LOCKMODE lockmode); static void ATExecDropConstraint(Relation rel, const char *constrName, DropBehavior behavior, bool recurse, bool recursing, @@ -505,6 +504,7 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, * relkind: relkind to assign to the new relation * ownerId: if not InvalidOid, use this as the new relation's owner. * typaddress: if not null, it's set to the pg_type entry's address. + * queryString: for error reporting * * Note that permissions checks are done against current user regardless of * ownerId. A nonzero ownerId is used when someone is creating a relation @@ -908,8 +908,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, } /* - * If we're creating a partition, create now all the indexes and triggers - * defined in the parent. + * If we're creating a partition, create now all the indexes, triggers, + * FKs defined in the parent. * * We can't do it earlier, because DefineIndex wants to know the partition * key which we just stored. @@ -961,6 +961,12 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, if (parent->trigdesc != NULL) CloneRowTriggersToPartition(parent, rel); + /* + * And foreign keys too. Note that because we're freshly creating the + * table, there is no need to verify these new constraints. + */ + CloneForeignKeyConstraints(parentId, relationId, NULL); + heap_close(parent, NoLock); } @@ -7025,7 +7031,9 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, RelationGetNamespace(rel), NIL); - address = ATAddForeignKeyConstraint(tab, rel, newConstraint, + address = ATAddForeignKeyConstraint(wqueue, tab, rel, + newConstraint, InvalidOid, + recurse, false, lockmode); break; @@ -7180,8 +7188,9 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * We do permissions checks here, however. */ static ObjectAddress -ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, - Constraint *fkconstraint, LOCKMODE lockmode) +ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, + Constraint *fkconstraint, Oid parentConstr, + bool recurse, bool recursing, LOCKMODE lockmode) { Relation pkrel; int16 pkattnum[INDEX_MAX_KEYS]; @@ -7220,6 +7229,21 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, errmsg("cannot reference partitioned table \"%s\"", RelationGetRelationName(pkrel)))); + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + if (!recurse) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("foreign key referencing partitioned table \"%s\" must not be ONLY", + RelationGetRelationName(pkrel)))); + if (fkconstraint->skip_validation && !fkconstraint->initially_valid) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot add NOT VALID foreign key to relation \"%s\"", + RelationGetRelationName(pkrel)), + errdetail("This feature is not yet supported on partitioned tables."))); + } + if (pkrel->rd_rel->relkind != RELKIND_RELATION) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), @@ -7527,7 +7551,7 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, fkconstraint->deferrable, fkconstraint->initdeferred, fkconstraint->initially_valid, - InvalidOid, /* no parent constraint */ + parentConstr, RelationGetRelid(rel), fkattnum, numfks, @@ -7553,10 +7577,12 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, ObjectAddressSet(address, ConstraintRelationId, constrOid); /* - * Create the triggers that will enforce the constraint. + * Create the triggers that will enforce the constraint. We only want + * the action triggers to appear for the parent partitioned relation, + * even though the constraints also exist below. */ createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint, - constrOid, indexOid); + constrOid, indexOid, !recursing); /* * Tell Phase 3 to check that the constraint is satisfied by existing @@ -7581,6 +7607,40 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, } /* + * When called on a partitioned table, recurse to create the constraint on + * the partitions also. + */ + if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + PartitionDesc partdesc; + + partdesc = RelationGetPartitionDesc(rel); + + for (i = 0; i < partdesc->nparts; i++) + { + Oid partitionId = partdesc->oids[i]; + Relation partition = heap_open(partitionId, lockmode); + AlteredTableInfo *childtab; + ObjectAddress childAddr; + + CheckTableNotInUse(partition, "ALTER TABLE"); + + /* Find or create work queue entry for this table */ + childtab = ATGetQueueEntry(wqueue, partition); + + childAddr = + ATAddForeignKeyConstraint(wqueue, childtab, partition, + fkconstraint, constrOid, + recurse, true, lockmode); + + /* Record this constraint as dependent on the parent one */ + recordDependencyOn(&childAddr, &address, DEPENDENCY_INTERNAL_AUTO); + + heap_close(partition, NoLock); + } + } + + /* * Close pk table, but keep lock until we've committed. */ heap_close(pkrel, NoLock); @@ -7842,8 +7902,8 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse, heap_close(refrel, NoLock); /* - * Foreign keys do not inherit, so we purposely ignore the - * recursion bit here + * We disallow creating invalid foreign keys to or from + * partitioned tables, so ignoring the recursion bit is okay. */ } else if (con->contype == CONSTRAINT_CHECK) @@ -8489,23 +8549,16 @@ CreateFKCheckTrigger(Oid myRelOid, Oid refRelOid, Constraint *fkconstraint, } /* - * Create the triggers that implement an FK constraint. - * - * NB: if you change any trigger properties here, see also - * ATExecAlterConstraint. + * createForeignKeyActionTriggers + * Create the referenced-side "action" triggers that implement a foreign + * key. */ static void -createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, - Oid constraintOid, Oid indexOid) +createForeignKeyActionTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, + Oid constraintOid, Oid indexOid) { - Oid myRelOid; CreateTrigStmt *fk_trigger; - myRelOid = RelationGetRelid(rel); - - /* Make changes-so-far visible */ - CommandCounterIncrement(); - /* * Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON * DELETE action on the referenced table. @@ -8555,7 +8608,8 @@ createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, } fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid, + (void) CreateTrigger(fk_trigger, NULL, refRelOid, RelationGetRelid(rel), + constraintOid, indexOid, InvalidOid, InvalidOid, NULL, true, false); /* Make changes-so-far visible */ @@ -8610,16 +8664,21 @@ createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, } fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid, + (void) CreateTrigger(fk_trigger, NULL, refRelOid, RelationGetRelid(rel), + constraintOid, indexOid, InvalidOid, InvalidOid, NULL, true, false); +} - /* Make changes-so-far visible */ - CommandCounterIncrement(); - - /* - * Build and execute CREATE CONSTRAINT TRIGGER statements for the CHECK - * action for both INSERTs and UPDATEs on the referencing table. - */ +/* + * createForeignKeyCheckTriggers + * Create the referencing-side "check" triggers that implement a foreign + * key. + */ +static void +createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, + Constraint *fkconstraint, Oid constraintOid, + Oid indexOid) +{ CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid, indexOid, true); CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid, @@ -8627,6 +8686,37 @@ createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, } /* + * Create the triggers that implement an FK constraint. + * + * NB: if you change any trigger properties here, see also + * ATExecAlterConstraint. + */ +void +createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, + Oid constraintOid, Oid indexOid, bool create_action) +{ + /* + * For the referenced side, create action triggers, if requested. (If the + * referencing side is partitioned, there is still only one trigger, which + * runs on the referenced side and points to the top of the referencing + * hierarchy.) + */ + if (create_action) + createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid, + indexOid); + + /* + * For the referencing side, create the check triggers. We only need these + * on the partitions. + */ + if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid, + fkconstraint, constraintOid, indexOid); + + CommandCounterIncrement(); +} + +/* * ALTER TABLE DROP CONSTRAINT * * Like DROP COLUMN, we can't use the normal ALTER TABLE recursion mechanism. @@ -13889,6 +13979,8 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) bool found_whole_row; Oid defaultPartOid; List *partBoundConstraint; + List *cloned; + ListCell *l; /* * We must lock the default partition, because attaching a new partition @@ -14072,6 +14164,35 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) CloneRowTriggersToPartition(rel, attachrel); /* + * Clone foreign key constraints, and setup for Phase 3 to verify them. + */ + cloned = NIL; + CloneForeignKeyConstraints(RelationGetRelid(rel), + RelationGetRelid(attachrel), &cloned); + foreach(l, cloned) + { + ClonedConstraint *cloned = lfirst(l); + NewConstraint *newcon; + Relation clonedrel; + AlteredTableInfo *parttab; + + clonedrel = relation_open(cloned->relid, NoLock); + parttab = ATGetQueueEntry(wqueue, clonedrel); + + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = cloned->constraint->conname; + newcon->contype = CONSTR_FOREIGN; + newcon->refrelid = cloned->refrelid; + newcon->refindid = cloned->conindid; + newcon->conid = cloned->conid; + newcon->qual = (Node *) cloned->constraint; + + parttab->constraints = lappend(parttab->constraints, newcon); + + relation_close(clonedrel, NoLock); + } + + /* * Generate partition constraint from the partition bound specification. * If the parent itself is a partition, make sure to include its * constraint as well. |