diff options
Diffstat (limited to 'src/backend/utils/adt/ri_triggers.c')
-rw-r--r-- | src/backend/utils/adt/ri_triggers.c | 169 |
1 files changed, 146 insertions, 23 deletions
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c index 25931f397f7..6896e1ae638 100644 --- a/src/backend/utils/adt/ri_triggers.c +++ b/src/backend/utils/adt/ri_triggers.c @@ -30,6 +30,7 @@ #include "access/xact.h" #include "catalog/pg_collation.h" #include "catalog/pg_constraint.h" +#include "catalog/pg_proc.h" #include "commands/trigger.h" #include "executor/executor.h" #include "executor/spi.h" @@ -45,6 +46,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/rangetypes.h" #include "utils/rel.h" #include "utils/rls.h" #include "utils/ruleutils.h" @@ -96,6 +98,9 @@ * * Information extracted from an FK pg_constraint entry. This is cached in * ri_constraint_cache. + * + * Note that pf/pp/ff_eq_oprs may hold the overlaps operator instead of equals + * for the PERIOD part of a temporal foreign key. */ typedef struct RI_ConstraintInfo { @@ -115,12 +120,15 @@ typedef struct RI_ConstraintInfo int16 confdelsetcols[RI_MAX_NUMKEYS]; /* attnums of cols to set on * delete */ char confmatchtype; /* foreign key's match type */ + bool hasperiod; /* if the foreign key uses PERIOD */ int nkeys; /* number of key columns */ int16 pk_attnums[RI_MAX_NUMKEYS]; /* attnums of referenced cols */ int16 fk_attnums[RI_MAX_NUMKEYS]; /* attnums of referencing cols */ Oid pf_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = FK) */ Oid pp_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = PK) */ Oid ff_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (FK = FK) */ + Oid period_contained_by_oper; /* anyrange <@ anyrange */ + Oid agged_period_contained_by_oper; /* fkattr <@ range_agg(pkattr) */ dlist_node valid_link; /* Link in list of valid entries */ } RI_ConstraintInfo; @@ -199,8 +207,8 @@ static void ri_BuildQueryKey(RI_QueryKey *key, int32 constr_queryno); static bool ri_KeysEqual(Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, const RI_ConstraintInfo *riinfo, bool rel_is_pk); -static bool ri_AttributesEqual(Oid eq_opr, Oid typeid, - Datum oldvalue, Datum newvalue); +static bool ri_CompareWithCast(Oid eq_opr, Oid typeid, + Datum lhs, Datum rhs); static void ri_InitHashTables(void); static void InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue); @@ -361,14 +369,41 @@ RI_FKey_check(TriggerData *trigdata) * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * corresponding FK attributes. + * + * But for temporal FKs we need to make sure + * the FK's range is completely covered. + * So we use this query instead: + * SELECT 1 + * FROM ( + * SELECT pkperiodatt AS r + * FROM [ONLY] pktable x + * WHERE pkatt1 = $1 [AND ...] + * AND pkperiodatt && $n + * FOR KEY SHARE OF x + * ) x1 + * HAVING $n <@ range_agg(x1.r) + * Note if FOR KEY SHARE ever allows GROUP BY and HAVING + * we can make this a bit simpler. * ---------- */ initStringInfo(&querybuf); pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", - pk_only, pkrelname); + if (riinfo->hasperiod) + { + quoteOneName(attname, + RIAttName(pk_rel, riinfo->pk_attnums[riinfo->nkeys - 1])); + + appendStringInfo(&querybuf, + "SELECT 1 FROM (SELECT %s AS r FROM %s%s x", + attname, pk_only, pkrelname); + } + else + { + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); + } querysep = "WHERE"; for (int i = 0; i < riinfo->nkeys; i++) { @@ -386,6 +421,18 @@ RI_FKey_check(TriggerData *trigdata) queryoids[i] = fk_type; } appendStringInfoString(&querybuf, " FOR KEY SHARE OF x"); + if (riinfo->hasperiod) + { + Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[riinfo->nkeys - 1]); + + appendStringInfo(&querybuf, ") x1 HAVING "); + sprintf(paramname, "$%d", riinfo->nkeys); + ri_GenerateQual(&querybuf, "", + paramname, fk_type, + riinfo->agged_period_contained_by_oper, + "pg_catalog.range_agg", ANYMULTIRANGEOID); + appendStringInfo(&querybuf, "(x1.r)"); + } /* Prepare and save the plan */ qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids, @@ -492,14 +539,40 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * PK attributes themselves. + * + * But for temporal FKs we need to make sure + * the old PK's range is completely covered. + * So we use this query instead: + * SELECT 1 + * FROM ( + * SELECT pkperiodatt AS r + * FROM [ONLY] pktable x + * WHERE pkatt1 = $1 [AND ...] + * AND pkperiodatt && $n + * FOR KEY SHARE OF x + * ) x1 + * HAVING $n <@ range_agg(x1.r) + * Note if FOR KEY SHARE ever allows GROUP BY and HAVING + * we can make this a bit simpler. * ---------- */ initStringInfo(&querybuf); pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", - pk_only, pkrelname); + if (riinfo->hasperiod) + { + quoteOneName(attname, RIAttName(pk_rel, riinfo->pk_attnums[riinfo->nkeys - 1])); + + appendStringInfo(&querybuf, + "SELECT 1 FROM (SELECT %s AS r FROM %s%s x", + attname, pk_only, pkrelname); + } + else + { + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); + } querysep = "WHERE"; for (int i = 0; i < riinfo->nkeys; i++) { @@ -516,6 +589,18 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, queryoids[i] = pk_type; } appendStringInfoString(&querybuf, " FOR KEY SHARE OF x"); + if (riinfo->hasperiod) + { + Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[riinfo->nkeys - 1]); + + appendStringInfo(&querybuf, ") x1 HAVING "); + sprintf(paramname, "$%d", riinfo->nkeys); + ri_GenerateQual(&querybuf, "", + paramname, fk_type, + riinfo->agged_period_contained_by_oper, + "pg_catalog.range_agg", ANYMULTIRANGEOID); + appendStringInfo(&querybuf, "(x1.r)"); + } /* Prepare and save the plan */ qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids, @@ -2154,6 +2239,7 @@ ri_LoadConstraintInfo(Oid constraintOid) riinfo->confupdtype = conForm->confupdtype; riinfo->confdeltype = conForm->confdeltype; riinfo->confmatchtype = conForm->confmatchtype; + riinfo->hasperiod = conForm->conperiod; DeconstructFkConstraintRow(tup, &riinfo->nkeys, @@ -2165,6 +2251,20 @@ ri_LoadConstraintInfo(Oid constraintOid) &riinfo->ndelsetcols, riinfo->confdelsetcols); + /* + * For temporal FKs, get the operators and functions we need. We ask the + * opclass of the PK element for these. This all gets cached (as does the + * generated plan), so there's no performance issue. + */ + if (riinfo->hasperiod) + { + Oid opclass = get_index_column_opclass(conForm->conindid, riinfo->nkeys); + + FindFKPeriodOpers(opclass, + &riinfo->period_contained_by_oper, + &riinfo->agged_period_contained_by_oper); + } + ReleaseSysCache(tup); /* @@ -2776,7 +2876,10 @@ ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan) /* * ri_KeysEqual - * - * Check if all key values in OLD and NEW are equal. + * Check if all key values in OLD and NEW are "equivalent": + * For normal FKs we check for equality. + * For temporal FKs we check that the PK side is a superset of its old value, + * or the FK side is a subset of its old value. * * Note: at some point we might wish to redefine this as checking for * "IS NOT DISTINCT" rather than "=", that is, allow two nulls to be @@ -2832,13 +2935,25 @@ ri_KeysEqual(Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, } else { + Oid eq_opr; + + /* + * When comparing the PERIOD columns we can skip the check + * whenever the referencing column stayed equal or shrank, so test + * with the contained-by operator instead. + */ + if (riinfo->hasperiod && i == riinfo->nkeys - 1) + eq_opr = riinfo->period_contained_by_oper; + else + eq_opr = riinfo->ff_eq_oprs[i]; + /* * For the FK table, compare with the appropriate equality * operator. Changes that compare equal will still satisfy the * constraint after the update. */ - if (!ri_AttributesEqual(riinfo->ff_eq_oprs[i], RIAttType(rel, attnums[i]), - oldvalue, newvalue)) + if (!ri_CompareWithCast(eq_opr, RIAttType(rel, attnums[i]), + newvalue, oldvalue)) return false; } } @@ -2848,29 +2963,31 @@ ri_KeysEqual(Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, /* - * ri_AttributesEqual - + * ri_CompareWithCast - * - * Call the appropriate equality comparison operator for two values. + * Call the appropriate comparison operator for two values. + * Normally this is equality, but for the PERIOD part of foreign keys + * it is ContainedBy, so the order of lhs vs rhs is significant. * * NB: we have already checked that neither value is null. */ static bool -ri_AttributesEqual(Oid eq_opr, Oid typeid, - Datum oldvalue, Datum newvalue) +ri_CompareWithCast(Oid eq_opr, Oid typeid, + Datum lhs, Datum rhs) { RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid); /* Do we need to cast the values? */ if (OidIsValid(entry->cast_func_finfo.fn_oid)) { - oldvalue = FunctionCall3(&entry->cast_func_finfo, - oldvalue, - Int32GetDatum(-1), /* typmod */ - BoolGetDatum(false)); /* implicit coercion */ - newvalue = FunctionCall3(&entry->cast_func_finfo, - newvalue, - Int32GetDatum(-1), /* typmod */ - BoolGetDatum(false)); /* implicit coercion */ + lhs = FunctionCall3(&entry->cast_func_finfo, + lhs, + Int32GetDatum(-1), /* typmod */ + BoolGetDatum(false)); /* implicit coercion */ + rhs = FunctionCall3(&entry->cast_func_finfo, + rhs, + Int32GetDatum(-1), /* typmod */ + BoolGetDatum(false)); /* implicit coercion */ } /* @@ -2884,10 +3001,16 @@ ri_AttributesEqual(Oid eq_opr, Oid typeid, * open), we'll just use the default collation here, which could lead to * some false negatives. All this would break if we ever allow * database-wide collations to be nondeterministic. + * + * With range/multirangetypes, the collation of the base type is stored as + * part of the rangetype (pg_range.rngcollation), and always used, so + * there is no danger of inconsistency even using a non-equals operator. + * But if we support arbitrary types with PERIOD, we should perhaps just + * always force a re-check. */ return DatumGetBool(FunctionCall2Coll(&entry->eq_opr_finfo, DEFAULT_COLLATION_OID, - oldvalue, newvalue)); + lhs, rhs)); } /* @@ -2942,7 +3065,7 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid) * the cast function to get to the operator's input type. * * XXX eventually it would be good to support array-coercion cases - * here and in ri_AttributesEqual(). At the moment there is no point + * here and in ri_CompareWithCast(). At the moment there is no point * because cases involving nonidentical array types will be rejected * at constraint creation time. * |