diff options
Diffstat (limited to 'src/backend/utils/adt/ri_triggers.c')
-rw-r--r-- | src/backend/utils/adt/ri_triggers.c | 564 |
1 files changed, 330 insertions, 234 deletions
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c index 01d4c22cfce..088b4027008 100644 --- a/src/backend/utils/adt/ri_triggers.c +++ b/src/backend/utils/adt/ri_triggers.c @@ -23,6 +23,7 @@ #include "postgres.h" +#include "access/genam.h" #include "access/htup_details.h" #include "access/sysattr.h" #include "access/table.h" @@ -33,6 +34,7 @@ #include "catalog/pg_operator.h" #include "catalog/pg_type.h" #include "commands/trigger.h" +#include "executor/execPartition.h" #include "executor/executor.h" #include "executor/spi.h" #include "lib/ilist.h" @@ -68,19 +70,14 @@ #define RI_KEYS_NONE_NULL 2 /* RI query type codes */ -/* these queries are executed against the PK (referenced) table: */ -#define RI_PLAN_CHECK_LOOKUPPK 1 -#define RI_PLAN_CHECK_LOOKUPPK_FROM_PK 2 -#define RI_PLAN_LAST_ON_PK RI_PLAN_CHECK_LOOKUPPK_FROM_PK -/* these queries are executed against the FK (referencing) table: */ -#define RI_PLAN_CASCADE_ONDELETE 3 -#define RI_PLAN_CASCADE_ONUPDATE 4 +#define RI_PLAN_CASCADE_ONDELETE 1 +#define RI_PLAN_CASCADE_ONUPDATE 2 /* For RESTRICT, the same plan can be used for both ON DELETE and ON UPDATE triggers. */ -#define RI_PLAN_RESTRICT 5 -#define RI_PLAN_SETNULL_ONDELETE 6 -#define RI_PLAN_SETNULL_ONUPDATE 7 -#define RI_PLAN_SETDEFAULT_ONDELETE 8 -#define RI_PLAN_SETDEFAULT_ONUPDATE 9 +#define RI_PLAN_RESTRICT 3 +#define RI_PLAN_SETNULL_ONDELETE 4 +#define RI_PLAN_SETNULL_ONUPDATE 5 +#define RI_PLAN_SETDEFAULT_ONDELETE 6 +#define RI_PLAN_SETDEFAULT_ONUPDATE 7 #define MAX_QUOTED_NAME_LEN (NAMEDATALEN*2+3) #define MAX_QUOTED_REL_NAME_LEN (MAX_QUOTED_NAME_LEN*2) @@ -229,10 +226,279 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot, static void ri_ReportViolation(const RI_ConstraintInfo *riinfo, Relation pk_rel, Relation fk_rel, TupleTableSlot *violatorslot, TupleDesc tupdesc, - int queryno, bool partgone) pg_attribute_noreturn(); + bool on_fk, bool partgone) pg_attribute_noreturn(); +static Oid get_fkey_unique_index(Oid conoid); /* + * Checks whether a tuple containing the unique key as extracted from the + * tuple provided in 'slot' exists in 'pk_rel'. The key is extracted using the + * constraint's index given in 'riinfo', which is also scanned to check the + * existence of the key. + * + * If 'pk_rel' is a partitioned table, the check is performed on its leaf + * partition that would contain the key. + * + * The provided tuple is either the one being inserted into the referencing + * relation ('fk_rel' is non-NULL), or the one being deleted from the + * referenced relation, that is, 'pk_rel' ('fk_rel' is NULL). + */ +static bool +ri_ReferencedKeyExists(Relation pk_rel, Relation fk_rel, + TupleTableSlot *slot, const RI_ConstraintInfo *riinfo) +{ + Oid constr_id = riinfo->constraint_id; + Oid idxoid; + Relation idxrel; + Relation leaf_pk_rel = NULL; + int num_pk; + int i; + bool found = false; + const Oid *eq_oprs; + Datum pk_vals[INDEX_MAX_KEYS]; + char pk_nulls[INDEX_MAX_KEYS]; + ScanKeyData skey[INDEX_MAX_KEYS]; + Snapshot snap = InvalidSnapshot; + bool pushed_latest_snapshot = false; + IndexScanDesc scan; + TupleTableSlot *outslot; + Oid saved_userid; + int saved_sec_context; + AclResult aclresult; + + /* + * Extract the unique key from the provided slot and choose the equality + * operators to use when scanning the index below. + */ + if (fk_rel) + { + ri_ExtractValues(fk_rel, slot, riinfo, false, pk_vals, pk_nulls); + /* Use PK = FK equality operator. */ + eq_oprs = riinfo->pf_eq_oprs; + + /* + * May need to cast each of the individual values of the foreign key + * to the corresponding PK column's type if the equality operator + * demands it. + */ + for (i = 0; i < riinfo->nkeys; i++) + { + if (pk_nulls[i] != 'n') + { + Oid eq_opr = eq_oprs[i]; + Oid typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]); + RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid); + + if (OidIsValid(entry->cast_func_finfo.fn_oid)) + pk_vals[i] = FunctionCall3(&entry->cast_func_finfo, + pk_vals[i], + Int32GetDatum(-1), /* typmod */ + BoolGetDatum(false)); /* implicit coercion */ + } + } + } + else + { + ri_ExtractValues(pk_rel, slot, riinfo, true, pk_vals, pk_nulls); + /* Use PK = PK equality operator. */ + eq_oprs = riinfo->pp_eq_oprs; + } + + /* + * Switch to referenced table's owner to perform the below operations as. + * This matches what ri_PerformCheck() does. + * + * Note that as with queries done by ri_PerformCheck(), the way we select + * the referenced row below effectively bypasses any RLS policies that may + * be present on the referenced table. + */ + GetUserIdAndSecContext(&saved_userid, &saved_sec_context); + SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner, + saved_sec_context | SECURITY_LOCAL_USERID_CHANGE); + + /* + * Also check that the new user has permissions to look into the schema of + * and SELECT from the referenced table. + */ + aclresult = pg_namespace_aclcheck(RelationGetNamespace(pk_rel), + GetUserId(), ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_SCHEMA, + get_namespace_name(RelationGetNamespace(pk_rel))); + aclresult = pg_class_aclcheck(RelationGetRelid(pk_rel), GetUserId(), + ACL_SELECT); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_TABLE, + RelationGetRelationName(pk_rel)); + + /* Make the changes of the current command visible in all cases. */ + CommandCounterIncrement(); + + /* + * In the case of scanning the PK index for ri_Check_Pk_Match(), we'd like + * to see all rows that could be interesting, even those that would not be + * visible to the transaction snapshot. To do so, force-push the latest + * snapshot. + */ + if (fk_rel == NULL) + { + snap = GetLatestSnapshot(); + PushActiveSnapshot(snap); + pushed_latest_snapshot = true; + } + else + { + snap = GetTransactionSnapshot(); + PushActiveSnapshot(snap); + } + + /* + * Open the constraint index to be scanned. + * + * If the target table is partitioned, we must look up the leaf partition + * and its corresponding unique index to search the keys in. + */ + idxoid = get_fkey_unique_index(constr_id); + if (pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + Oid leaf_idxoid; + Snapshot mysnap = InvalidSnapshot; + + /* + * XXX the partition descriptor machinery has a hack that assumes that + * the queries originating in this module push the latest snapshot in + * the transaction-snapshot mode. If we haven't pushed one already, + * do so now. + */ + if (!pushed_latest_snapshot) + { + mysnap = GetLatestSnapshot(); + PushActiveSnapshot(mysnap); + } + + leaf_pk_rel = ExecGetLeafPartitionForKey(pk_rel, riinfo->nkeys, + riinfo->pk_attnums, + pk_vals, pk_nulls, + idxoid, RowShareLock, + &leaf_idxoid); + + /* + * XXX done fiddling with the partition descriptor machinery so unset + * the active snapshot if we must. + */ + if (mysnap != InvalidSnapshot) + PopActiveSnapshot(); + + /* + * If no suitable leaf partition exists, neither can the key we're + * looking for. + */ + if (leaf_pk_rel == NULL) + { + SetUserIdAndSecContext(saved_userid, saved_sec_context); + PopActiveSnapshot(); + return false; + } + + pk_rel = leaf_pk_rel; + idxoid = leaf_idxoid; + } + idxrel = index_open(idxoid, RowShareLock); + + /* Set up ScanKeys for the index scan. */ + num_pk = IndexRelationGetNumberOfKeyAttributes(idxrel); + for (i = 0; i < num_pk; i++) + { + int pkattno = i + 1; + Oid operator = eq_oprs[i]; + Oid opfamily = idxrel->rd_opfamily[i]; + StrategyNumber strat = get_op_opfamily_strategy(operator, opfamily); + RegProcedure regop = get_opcode(operator); + + /* Initialize the scankey. */ + ScanKeyInit(&skey[i], + pkattno, + strat, + regop, + pk_vals[i]); + + skey[i].sk_collation = idxrel->rd_indcollation[i]; + + /* + * Check for null value. Should not occur, because callers currently + * take care of the cases in which they do occur. + */ + if (pk_nulls[i] == 'n') + skey[i].sk_flags |= SK_ISNULL; + } + + scan = index_beginscan(pk_rel, idxrel, snap, num_pk, 0); + index_rescan(scan, skey, num_pk, NULL, 0); + + /* Look for the tuple, and if found, try to lock it in key share mode. */ + outslot = table_slot_create(pk_rel, NULL); + if (index_getnext_slot(scan, ForwardScanDirection, outslot)) + { + /* + * If we fail to lock the tuple for whatever reason, assume it doesn't + * exist. + */ + found = ExecLockTableTuple(pk_rel, &(outslot->tts_tid), outslot, + snap, + GetCurrentCommandId(false), + LockTupleKeyShare, + LockWaitBlock, NULL); + } + + index_endscan(scan); + ExecDropSingleTupleTableSlot(outslot); + + /* Don't release lock until commit. */ + index_close(idxrel, NoLock); + + /* Close leaf partition relation if any. */ + if (leaf_pk_rel) + table_close(leaf_pk_rel, NoLock); + + /* Restore UID and security context */ + SetUserIdAndSecContext(saved_userid, saved_sec_context); + + PopActiveSnapshot(); + + return found; +} + +/* + * get_fkey_unique_index + * Returns the unique index used by a supposedly foreign key constraint + * + * XXX This is very similar to get_constraint_index; probably they should be + * unified. + */ +static Oid +get_fkey_unique_index(Oid conoid) +{ + Oid result = InvalidOid; + HeapTuple tp; + + tp = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid)); + if (HeapTupleIsValid(tp)) + { + Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(tp); + + if (contup->contype == CONSTRAINT_FOREIGN) + result = contup->conindid; + ReleaseSysCache(tp); + } + + if (!OidIsValid(result)) + elog(ERROR, "unique index not found for foreign key constraint %u", + conoid); + + return result; +} + +/* * RI_FKey_check - * * Check foreign key existence (combined for INSERT and UPDATE). @@ -244,8 +510,6 @@ RI_FKey_check(TriggerData *trigdata) Relation fk_rel; Relation pk_rel; TupleTableSlot *newslot; - RI_QueryKey qkey; - SPIPlanPtr qplan; riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger, trigdata->tg_relation, false); @@ -325,9 +589,9 @@ RI_FKey_check(TriggerData *trigdata) /* * MATCH PARTIAL - all non-null columns must match. (not - * implemented, can be done by modifying the query below - * to only include non-null columns, or by writing a - * special version here) + * implemented, can be done by modifying + * ri_ReferencedKeyExists() to only include non-null + * columns. */ break; #endif @@ -342,74 +606,12 @@ RI_FKey_check(TriggerData *trigdata) break; } - if (SPI_connect() != SPI_OK_CONNECT) - elog(ERROR, "SPI_connect failed"); - - /* Fetch or prepare a saved plan for the real check */ - ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK); - - if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL) - { - StringInfoData querybuf; - char pkrelname[MAX_QUOTED_REL_NAME_LEN]; - char attname[MAX_QUOTED_NAME_LEN]; - char paramname[16]; - const char *querysep; - Oid queryoids[RI_MAX_NUMKEYS]; - const char *pk_only; - - /* ---------- - * The query string built is - * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] - * FOR KEY SHARE OF x - * The type id's for the $ parameters are those of the - * corresponding FK attributes. - * ---------- - */ - initStringInfo(&querybuf); - pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? - "" : "ONLY "; - quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", - pk_only, pkrelname); - querysep = "WHERE"; - for (int i = 0; i < riinfo->nkeys; i++) - { - Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]); - Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]); - - quoteOneName(attname, - RIAttName(pk_rel, riinfo->pk_attnums[i])); - sprintf(paramname, "$%d", i + 1); - ri_GenerateQual(&querybuf, querysep, - attname, pk_type, - riinfo->pf_eq_oprs[i], - paramname, fk_type); - querysep = "AND"; - queryoids[i] = fk_type; - } - appendStringInfoString(&querybuf, " FOR KEY SHARE OF x"); - - /* Prepare and save the plan */ - qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids, - &qkey, fk_rel, pk_rel); - } - - /* - * Now check that foreign key exists in PK table - * - * XXX detectNewRows must be true when a partitioned table is on the - * referenced side. The reason is that our snapshot must be fresh in - * order for the hack in find_inheritance_children() to work. - */ - ri_PerformCheck(riinfo, &qkey, qplan, - fk_rel, pk_rel, - NULL, newslot, - pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE, - SPI_OK_SELECT); - - if (SPI_finish() != SPI_OK_FINISH) - elog(ERROR, "SPI_finish failed"); + if (!ri_ReferencedKeyExists(pk_rel, fk_rel, newslot, riinfo)) + ri_ReportViolation(riinfo, + pk_rel, fk_rel, + newslot, + NULL, + true, false); table_close(pk_rel, RowShareLock); @@ -464,81 +666,10 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, TupleTableSlot *oldslot, const RI_ConstraintInfo *riinfo) { - SPIPlanPtr qplan; - RI_QueryKey qkey; - bool result; - /* Only called for non-null rows */ Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL); - if (SPI_connect() != SPI_OK_CONNECT) - elog(ERROR, "SPI_connect failed"); - - /* - * Fetch or prepare a saved plan for checking PK table with values coming - * from a PK row - */ - ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK_FROM_PK); - - if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL) - { - StringInfoData querybuf; - char pkrelname[MAX_QUOTED_REL_NAME_LEN]; - char attname[MAX_QUOTED_NAME_LEN]; - char paramname[16]; - const char *querysep; - const char *pk_only; - Oid queryoids[RI_MAX_NUMKEYS]; - - /* ---------- - * The query string built is - * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] - * FOR KEY SHARE OF x - * The type id's for the $ parameters are those of the - * PK attributes themselves. - * ---------- - */ - initStringInfo(&querybuf); - pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? - "" : "ONLY "; - quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", - pk_only, pkrelname); - querysep = "WHERE"; - for (int i = 0; i < riinfo->nkeys; i++) - { - Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]); - - quoteOneName(attname, - RIAttName(pk_rel, riinfo->pk_attnums[i])); - sprintf(paramname, "$%d", i + 1); - ri_GenerateQual(&querybuf, querysep, - attname, pk_type, - riinfo->pp_eq_oprs[i], - paramname, pk_type); - querysep = "AND"; - queryoids[i] = pk_type; - } - appendStringInfoString(&querybuf, " FOR KEY SHARE OF x"); - - /* Prepare and save the plan */ - qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids, - &qkey, fk_rel, pk_rel); - } - - /* - * We have a plan now. Run it. - */ - result = ri_PerformCheck(riinfo, &qkey, qplan, - fk_rel, pk_rel, - oldslot, NULL, - true, /* treat like update */ - SPI_OK_SELECT); - - if (SPI_finish() != SPI_OK_FINISH) - elog(ERROR, "SPI_finish failed"); - - return result; + return ri_ReferencedKeyExists(pk_rel, NULL, oldslot, riinfo); } @@ -1608,15 +1739,10 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) errtableconstraint(fk_rel, NameStr(fake_riinfo.conname)))); - /* - * We tell ri_ReportViolation we were doing the RI_PLAN_CHECK_LOOKUPPK - * query, which isn't true, but will cause it to use - * fake_riinfo.fk_attnums as we need. - */ ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel, slot, tupdesc, - RI_PLAN_CHECK_LOOKUPPK, false); + true, false); ExecDropSingleTupleTableSlot(slot); } @@ -1833,7 +1959,7 @@ RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) fake_riinfo.pk_attnums[i] = i + 1; ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel, - slot, tupdesc, 0, true); + slot, tupdesc, true, true); } if (SPI_finish() != SPI_OK_FINISH) @@ -1970,9 +2096,8 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo, { /* * Inherited constraints with a common ancestor can share ri_query_cache - * entries for all query types except RI_PLAN_CHECK_LOOKUPPK_FROM_PK. - * Except in that case, the query processes the other table involved in - * the FK constraint (i.e., not the table on which the trigger has been + * entries, because each query processes the other table involved in the + * FK constraint (i.e., not the table on which the trigger has been * fired), and so it will be the same for all members of the inheritance * tree. So we may use the root constraint's OID in the hash key, rather * than the constraint's own OID. This avoids creating duplicate SPI @@ -1983,13 +2108,13 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo, * constraint, because partitions can have different column orders, * resulting in different pk_attnums[] or fk_attnums[] array contents.) * + * (Note also that for a standalone or non-inherited constraint, + * constraint_root_id is same as constraint_id.) + * * We assume struct RI_QueryKey contains no padding bytes, else we'd need * to use memset to clear them. */ - if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK) - key->constr_id = riinfo->constraint_root_id; - else - key->constr_id = riinfo->constraint_id; + key->constr_id = riinfo->constraint_root_id; key->constr_queryno = constr_queryno; } @@ -2260,19 +2385,12 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes, RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel) { SPIPlanPtr qplan; - Relation query_rel; + + /* There are currently no queries that run on PK table. */ + Relation query_rel = fk_rel; Oid save_userid; int save_sec_context; - /* - * Use the query type code to determine whether the query is run against - * the PK or FK table; we'll do the check as that table's owner - */ - if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK) - query_rel = pk_rel; - else - query_rel = fk_rel; - /* Switch to proper UID to perform check as */ GetUserIdAndSecContext(&save_userid, &save_sec_context); SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner, @@ -2305,9 +2423,9 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool detectNewRows, int expect_OK) { - Relation query_rel, - source_rel; - bool source_is_pk; + /* There are currently no queries that run on PK table. */ + Relation query_rel = fk_rel, + source_rel = pk_rel; Snapshot test_snapshot; Snapshot crosscheck_snapshot; int limit; @@ -2317,46 +2435,17 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo, Datum vals[RI_MAX_NUMKEYS * 2]; char nulls[RI_MAX_NUMKEYS * 2]; - /* - * Use the query type code to determine whether the query is run against - * the PK or FK table; we'll do the check as that table's owner - */ - if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK) - query_rel = pk_rel; - else - query_rel = fk_rel; - - /* - * The values for the query are taken from the table on which the trigger - * is called - it is normally the other one with respect to query_rel. An - * exception is ri_Check_Pk_Match(), which uses the PK table for both (and - * sets queryno to RI_PLAN_CHECK_LOOKUPPK_FROM_PK). We might eventually - * need some less klugy way to determine this. - */ - if (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK) - { - source_rel = fk_rel; - source_is_pk = false; - } - else - { - source_rel = pk_rel; - source_is_pk = true; - } - /* Extract the parameters to be passed into the query */ if (newslot) { - ri_ExtractValues(source_rel, newslot, riinfo, source_is_pk, - vals, nulls); + ri_ExtractValues(source_rel, newslot, riinfo, true, vals, nulls); if (oldslot) - ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk, + ri_ExtractValues(source_rel, oldslot, riinfo, true, vals + riinfo->nkeys, nulls + riinfo->nkeys); } else { - ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk, - vals, nulls); + ri_ExtractValues(source_rel, oldslot, riinfo, true, vals, nulls); } /* @@ -2420,14 +2509,12 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo, errhint("This is most likely due to a rule having rewritten the query."))); /* XXX wouldn't it be clearer to do this part at the caller? */ - if (qkey->constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK && - expect_OK == SPI_OK_SELECT && - (SPI_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)) + if (expect_OK == SPI_OK_SELECT && SPI_processed != 0) ri_ReportViolation(riinfo, pk_rel, fk_rel, newslot ? newslot : oldslot, NULL, - qkey->constr_queryno, false); + false, false); return SPI_processed != 0; } @@ -2458,9 +2545,9 @@ ri_ExtractValues(Relation rel, TupleTableSlot *slot, /* * Produce an error report * - * If the failed constraint was on insert/update to the FK table, - * we want the key names and values extracted from there, and the error - * message to look like 'key blah is not present in PK'. + * If the failed constraint was on insert/update to the FK table (on_fk is + * true), we want the key names and values extracted from there, and the + * error message to look like 'key blah is not present in PK'. * Otherwise, the attr names and values come from the PK table and the * message looks like 'key blah is still referenced from FK'. */ @@ -2468,22 +2555,20 @@ static void ri_ReportViolation(const RI_ConstraintInfo *riinfo, Relation pk_rel, Relation fk_rel, TupleTableSlot *violatorslot, TupleDesc tupdesc, - int queryno, bool partgone) + bool on_fk, bool partgone) { StringInfoData key_names; StringInfoData key_values; - bool onfk; const int16 *attnums; Oid rel_oid; AclResult aclresult; bool has_perm = true; /* - * Determine which relation to complain about. If tupdesc wasn't passed - * by caller, assume the violator tuple came from there. + * If tupdesc wasn't passed by caller, assume the violator tuple came from + * there. */ - onfk = (queryno == RI_PLAN_CHECK_LOOKUPPK); - if (onfk) + if (on_fk) { attnums = riinfo->fk_attnums; rel_oid = fk_rel->rd_id; @@ -2585,7 +2670,7 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo, key_names.data, key_values.data, RelationGetRelationName(fk_rel)), errtableconstraint(fk_rel, NameStr(riinfo->conname)))); - else if (onfk) + else if (on_fk) ereport(ERROR, (errcode(ERRCODE_FOREIGN_KEY_VIOLATION), errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"", @@ -2892,7 +2977,10 @@ ri_AttributesEqual(Oid eq_opr, Oid typeid, * ri_HashCompareOp - * * See if we know how to compare two values, and create a new hash entry - * if not. + * if not. The entry contains the FmgrInfo of the equality operator function + * and that of the cast function, if one is needed to convert the right + * operand (whose type OID has been passed) before passing it to the equality + * function. */ static RI_CompareHashEntry * ri_HashCompareOp(Oid eq_opr, Oid typeid) @@ -2948,8 +3036,16 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid) * moment since that will never be generated for implicit coercions. */ op_input_types(eq_opr, &lefttype, &righttype); - Assert(lefttype == righttype); - if (typeid == lefttype) + + /* + * Don't need to cast if the values that will be passed to the + * operator will be of expected operand type(s). The operator can be + * cross-type (such as when called by ri_ReferencedKeyExists()), in + * which case, we only need the cast if the right operand value + * doesn't match the type expected by the operator. + */ + if ((lefttype == righttype && typeid == lefttype) || + (lefttype != righttype && typeid == righttype)) castfunc = InvalidOid; /* simplest case */ else { |