diff options
author | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2018-03-23 10:48:22 -0300 |
---|---|---|
committer | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2018-03-23 10:48:22 -0300 |
commit | 86f575948c773b0ec5b0f27066e37dd93a7f0a96 (patch) | |
tree | 564d827795d68a7628429d08043c3ee87c635e9c /src/backend/commands/tablecmds.c | |
parent | 5700aa130186e0b5d600806645b051bfd9067f09 (diff) | |
download | postgresql-86f575948c773b0ec5b0f27066e37dd93a7f0a96.tar.gz postgresql-86f575948c773b0ec5b0f27066e37dd93a7f0a96.zip |
Allow FOR EACH ROW triggers on partitioned tables
Previously, FOR EACH ROW triggers were not allowed in partitioned
tables. Now we allow AFTER triggers on them, and on trigger creation we
cascade to create an identical trigger in each partition. We also clone
the triggers to each partition that is created or attached later.
This means that deferred unique keys are allowed on partitioned tables,
too.
Author: Álvaro Herrera
Reviewed-by: Peter Eisentraut, Simon Riggs, Amit Langote, Robert Haas,
Thomas Munro
Discussion: https://postgr.es/m/20171229225319.ajltgss2ojkfd3kp@alvherre.pgsql
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 150 |
1 files changed, 143 insertions, 7 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index f5c744b9f5a..e74fb1f4691 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -487,6 +487,7 @@ static void ValidatePartitionConstraints(List **wqueue, Relation scanrel, List *scanrel_children, List *partConstraint, bool validate_default); +static void CloneRowTriggersToPartition(Relation parent, Relation partition); static ObjectAddress ATExecDetachPartition(Relation rel, RangeVar *name); static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation rel, RangeVar *name); @@ -906,9 +907,11 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, } /* - * If we're creating a partition, create now all the indexes defined in - * the parent. We can't do it earlier, because DefineIndex wants to know - * the partition key which we just stored. + * If we're creating a partition, create now all the indexes and triggers + * defined in the parent. + * + * We can't do it earlier, because DefineIndex wants to know the partition + * key which we just stored. */ if (stmt->partbound) { @@ -949,6 +952,14 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, } list_free(idxlist); + + /* + * If there are any row-level triggers, clone them to the new + * partition. + */ + if (parent->trigdesc != NULL) + CloneRowTriggersToPartition(parent, rel); + heap_close(parent, NoLock); } @@ -7491,6 +7502,7 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, fkconstraint->deferrable, fkconstraint->initdeferred, fkconstraint->initially_valid, + InvalidOid, /* no parent constraint */ RelationGetRelid(rel), fkattnum, numfks, @@ -8445,7 +8457,7 @@ CreateFKCheckTrigger(Oid myRelOid, Oid refRelOid, Constraint *fkconstraint, fk_trigger->args = NIL; (void) CreateTrigger(fk_trigger, NULL, myRelOid, refRelOid, constraintOid, - indexOid, true); + indexOid, InvalidOid, InvalidOid, NULL, true, false); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -8519,7 +8531,7 @@ createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, fk_trigger->args = NIL; (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid, - indexOid, true); + indexOid, InvalidOid, InvalidOid, NULL, true, false); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -8574,7 +8586,7 @@ createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, fk_trigger->args = NIL; (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid, - indexOid, true); + indexOid, InvalidOid, InvalidOid, NULL, true, false); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -11114,7 +11126,7 @@ static void ATExecEnableDisableTrigger(Relation rel, const char *trigname, char fires_when, bool skip_system, LOCKMODE lockmode) { - EnableDisableTrigger(rel, trigname, fires_when, skip_system); + EnableDisableTrigger(rel, trigname, fires_when, skip_system, lockmode); } /* @@ -14031,6 +14043,9 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) /* Ensure there exists a correct set of indexes in the partition. */ AttachPartitionEnsureIndexes(rel, attachrel); + /* and triggers */ + CloneRowTriggersToPartition(rel, attachrel); + /* * Generate partition constraint from the partition bound specification. * If the parent itself is a partition, make sure to include its @@ -14255,6 +14270,127 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) } /* + * CloneRowTriggersToPartition + * subroutine for ATExecAttachPartition/DefineRelation to create row + * triggers on partitions + */ +static void +CloneRowTriggersToPartition(Relation parent, Relation partition) +{ + Relation pg_trigger; + ScanKeyData key; + SysScanDesc scan; + HeapTuple tuple; + MemoryContext oldcxt, + perTupCxt; + + ScanKeyInit(&key, Anum_pg_trigger_tgrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(parent))); + pg_trigger = heap_open(TriggerRelationId, RowExclusiveLock); + scan = systable_beginscan(pg_trigger, TriggerRelidNameIndexId, + true, NULL, 1, &key); + + perTupCxt = AllocSetContextCreate(CurrentMemoryContext, + "clone trig", ALLOCSET_SMALL_SIZES); + oldcxt = MemoryContextSwitchTo(perTupCxt); + + while (HeapTupleIsValid(tuple = systable_getnext(scan))) + { + Form_pg_trigger trigForm; + CreateTrigStmt *trigStmt; + Node *qual = NULL; + Datum value; + bool isnull; + List *cols = NIL; + + trigForm = (Form_pg_trigger) GETSTRUCT(tuple); + + /* + * Ignore statement-level triggers; those are not cloned. + */ + if (!TRIGGER_FOR_ROW(trigForm->tgtype)) + continue; + + /* + * Complain if we find an unexpected trigger type. + */ + if (!TRIGGER_FOR_AFTER(trigForm->tgtype)) + elog(ERROR, "unexpected trigger \"%s\" found", + NameStr(trigForm->tgname)); + + /* + * If there is a WHEN clause, generate a 'cooked' version of it that's + * appropriate for the partition. + */ + value = heap_getattr(tuple, Anum_pg_trigger_tgqual, + RelationGetDescr(pg_trigger), &isnull); + if (!isnull) + { + bool found_whole_row; + + qual = stringToNode(TextDatumGetCString(value)); + qual = (Node *) map_partition_varattnos((List *) qual, PRS2_OLD_VARNO, + partition, parent, + &found_whole_row); + if (found_whole_row) + elog(ERROR, "unexpected whole-row reference found in trigger WHEN clause"); + qual = (Node *) map_partition_varattnos((List *) qual, PRS2_NEW_VARNO, + partition, parent, + &found_whole_row); + if (found_whole_row) + elog(ERROR, "unexpected whole-row reference found in trigger WHEN clause"); + } + + /* + * If there is a column list, transform it to a list of column names. + * Note we don't need to map this list in any way ... + */ + if (trigForm->tgattr.dim1 > 0) + { + int i; + + for (i = 0; i < trigForm->tgattr.dim1; i++) + { + Form_pg_attribute col; + + col = TupleDescAttr(parent->rd_att, + trigForm->tgattr.values[i] - 1); + cols = lappend(cols, makeString(NameStr(col->attname))); + } + } + + trigStmt = makeNode(CreateTrigStmt); + trigStmt->trigname = NameStr(trigForm->tgname); + trigStmt->relation = NULL; + trigStmt->funcname = NULL; /* passed separately */ + trigStmt->args = NULL; /* passed separately */ + trigStmt->row = true; + trigStmt->timing = trigForm->tgtype & TRIGGER_TYPE_TIMING_MASK; + trigStmt->events = trigForm->tgtype & TRIGGER_TYPE_EVENT_MASK; + trigStmt->columns = cols; + trigStmt->whenClause = NULL; /* passed separately */ + trigStmt->isconstraint = OidIsValid(trigForm->tgconstraint); + trigStmt->transitionRels = NIL; /* not supported at present */ + trigStmt->deferrable = trigForm->tgdeferrable; + trigStmt->initdeferred = trigForm->tginitdeferred; + trigStmt->constrrel = NULL; /* passed separately */ + + CreateTrigger(trigStmt, NULL, RelationGetRelid(partition), + trigForm->tgconstrrelid, InvalidOid, InvalidOid, + trigForm->tgfoid, HeapTupleGetOid(tuple), qual, + false, true); + + MemoryContextReset(perTupCxt); + } + + MemoryContextSwitchTo(oldcxt); + MemoryContextDelete(perTupCxt); + + systable_endscan(scan); + heap_close(pg_trigger, RowExclusiveLock); +} + +/* * ALTER TABLE DETACH PARTITION * * Return the address of the relation that is no longer a partition of rel. |