aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils/adt/ri_triggers.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils/adt/ri_triggers.c')
-rw-r--r--src/backend/utils/adt/ri_triggers.c564
1 files changed, 330 insertions, 234 deletions
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 01d4c22cfce..088b4027008 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -23,6 +23,7 @@
#include "postgres.h"
+#include "access/genam.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/table.h"
@@ -33,6 +34,7 @@
#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
#include "commands/trigger.h"
+#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "lib/ilist.h"
@@ -68,19 +70,14 @@
#define RI_KEYS_NONE_NULL 2
/* RI query type codes */
-/* these queries are executed against the PK (referenced) table: */
-#define RI_PLAN_CHECK_LOOKUPPK 1
-#define RI_PLAN_CHECK_LOOKUPPK_FROM_PK 2
-#define RI_PLAN_LAST_ON_PK RI_PLAN_CHECK_LOOKUPPK_FROM_PK
-/* these queries are executed against the FK (referencing) table: */
-#define RI_PLAN_CASCADE_ONDELETE 3
-#define RI_PLAN_CASCADE_ONUPDATE 4
+#define RI_PLAN_CASCADE_ONDELETE 1
+#define RI_PLAN_CASCADE_ONUPDATE 2
/* For RESTRICT, the same plan can be used for both ON DELETE and ON UPDATE triggers. */
-#define RI_PLAN_RESTRICT 5
-#define RI_PLAN_SETNULL_ONDELETE 6
-#define RI_PLAN_SETNULL_ONUPDATE 7
-#define RI_PLAN_SETDEFAULT_ONDELETE 8
-#define RI_PLAN_SETDEFAULT_ONUPDATE 9
+#define RI_PLAN_RESTRICT 3
+#define RI_PLAN_SETNULL_ONDELETE 4
+#define RI_PLAN_SETNULL_ONUPDATE 5
+#define RI_PLAN_SETDEFAULT_ONDELETE 6
+#define RI_PLAN_SETDEFAULT_ONUPDATE 7
#define MAX_QUOTED_NAME_LEN (NAMEDATALEN*2+3)
#define MAX_QUOTED_REL_NAME_LEN (MAX_QUOTED_NAME_LEN*2)
@@ -229,10 +226,279 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
Relation pk_rel, Relation fk_rel,
TupleTableSlot *violatorslot, TupleDesc tupdesc,
- int queryno, bool partgone) pg_attribute_noreturn();
+ bool on_fk, bool partgone) pg_attribute_noreturn();
+static Oid get_fkey_unique_index(Oid conoid);
/*
+ * Checks whether a tuple containing the unique key as extracted from the
+ * tuple provided in 'slot' exists in 'pk_rel'. The key is extracted using the
+ * constraint's index given in 'riinfo', which is also scanned to check the
+ * existence of the key.
+ *
+ * If 'pk_rel' is a partitioned table, the check is performed on its leaf
+ * partition that would contain the key.
+ *
+ * The provided tuple is either the one being inserted into the referencing
+ * relation ('fk_rel' is non-NULL), or the one being deleted from the
+ * referenced relation, that is, 'pk_rel' ('fk_rel' is NULL).
+ */
+static bool
+ri_ReferencedKeyExists(Relation pk_rel, Relation fk_rel,
+ TupleTableSlot *slot, const RI_ConstraintInfo *riinfo)
+{
+ Oid constr_id = riinfo->constraint_id;
+ Oid idxoid;
+ Relation idxrel;
+ Relation leaf_pk_rel = NULL;
+ int num_pk;
+ int i;
+ bool found = false;
+ const Oid *eq_oprs;
+ Datum pk_vals[INDEX_MAX_KEYS];
+ char pk_nulls[INDEX_MAX_KEYS];
+ ScanKeyData skey[INDEX_MAX_KEYS];
+ Snapshot snap = InvalidSnapshot;
+ bool pushed_latest_snapshot = false;
+ IndexScanDesc scan;
+ TupleTableSlot *outslot;
+ Oid saved_userid;
+ int saved_sec_context;
+ AclResult aclresult;
+
+ /*
+ * Extract the unique key from the provided slot and choose the equality
+ * operators to use when scanning the index below.
+ */
+ if (fk_rel)
+ {
+ ri_ExtractValues(fk_rel, slot, riinfo, false, pk_vals, pk_nulls);
+ /* Use PK = FK equality operator. */
+ eq_oprs = riinfo->pf_eq_oprs;
+
+ /*
+ * May need to cast each of the individual values of the foreign key
+ * to the corresponding PK column's type if the equality operator
+ * demands it.
+ */
+ for (i = 0; i < riinfo->nkeys; i++)
+ {
+ if (pk_nulls[i] != 'n')
+ {
+ Oid eq_opr = eq_oprs[i];
+ Oid typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+ RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
+
+ if (OidIsValid(entry->cast_func_finfo.fn_oid))
+ pk_vals[i] = FunctionCall3(&entry->cast_func_finfo,
+ pk_vals[i],
+ Int32GetDatum(-1), /* typmod */
+ BoolGetDatum(false)); /* implicit coercion */
+ }
+ }
+ }
+ else
+ {
+ ri_ExtractValues(pk_rel, slot, riinfo, true, pk_vals, pk_nulls);
+ /* Use PK = PK equality operator. */
+ eq_oprs = riinfo->pp_eq_oprs;
+ }
+
+ /*
+ * Switch to referenced table's owner to perform the below operations as.
+ * This matches what ri_PerformCheck() does.
+ *
+ * Note that as with queries done by ri_PerformCheck(), the way we select
+ * the referenced row below effectively bypasses any RLS policies that may
+ * be present on the referenced table.
+ */
+ GetUserIdAndSecContext(&saved_userid, &saved_sec_context);
+ SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner,
+ saved_sec_context | SECURITY_LOCAL_USERID_CHANGE);
+
+ /*
+ * Also check that the new user has permissions to look into the schema of
+ * and SELECT from the referenced table.
+ */
+ aclresult = pg_namespace_aclcheck(RelationGetNamespace(pk_rel),
+ GetUserId(), ACL_USAGE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, OBJECT_SCHEMA,
+ get_namespace_name(RelationGetNamespace(pk_rel)));
+ aclresult = pg_class_aclcheck(RelationGetRelid(pk_rel), GetUserId(),
+ ACL_SELECT);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, OBJECT_TABLE,
+ RelationGetRelationName(pk_rel));
+
+ /* Make the changes of the current command visible in all cases. */
+ CommandCounterIncrement();
+
+ /*
+ * In the case of scanning the PK index for ri_Check_Pk_Match(), we'd like
+ * to see all rows that could be interesting, even those that would not be
+ * visible to the transaction snapshot. To do so, force-push the latest
+ * snapshot.
+ */
+ if (fk_rel == NULL)
+ {
+ snap = GetLatestSnapshot();
+ PushActiveSnapshot(snap);
+ pushed_latest_snapshot = true;
+ }
+ else
+ {
+ snap = GetTransactionSnapshot();
+ PushActiveSnapshot(snap);
+ }
+
+ /*
+ * Open the constraint index to be scanned.
+ *
+ * If the target table is partitioned, we must look up the leaf partition
+ * and its corresponding unique index to search the keys in.
+ */
+ idxoid = get_fkey_unique_index(constr_id);
+ if (pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ Oid leaf_idxoid;
+ Snapshot mysnap = InvalidSnapshot;
+
+ /*
+ * XXX the partition descriptor machinery has a hack that assumes that
+ * the queries originating in this module push the latest snapshot in
+ * the transaction-snapshot mode. If we haven't pushed one already,
+ * do so now.
+ */
+ if (!pushed_latest_snapshot)
+ {
+ mysnap = GetLatestSnapshot();
+ PushActiveSnapshot(mysnap);
+ }
+
+ leaf_pk_rel = ExecGetLeafPartitionForKey(pk_rel, riinfo->nkeys,
+ riinfo->pk_attnums,
+ pk_vals, pk_nulls,
+ idxoid, RowShareLock,
+ &leaf_idxoid);
+
+ /*
+ * XXX done fiddling with the partition descriptor machinery so unset
+ * the active snapshot if we must.
+ */
+ if (mysnap != InvalidSnapshot)
+ PopActiveSnapshot();
+
+ /*
+ * If no suitable leaf partition exists, neither can the key we're
+ * looking for.
+ */
+ if (leaf_pk_rel == NULL)
+ {
+ SetUserIdAndSecContext(saved_userid, saved_sec_context);
+ PopActiveSnapshot();
+ return false;
+ }
+
+ pk_rel = leaf_pk_rel;
+ idxoid = leaf_idxoid;
+ }
+ idxrel = index_open(idxoid, RowShareLock);
+
+ /* Set up ScanKeys for the index scan. */
+ num_pk = IndexRelationGetNumberOfKeyAttributes(idxrel);
+ for (i = 0; i < num_pk; i++)
+ {
+ int pkattno = i + 1;
+ Oid operator = eq_oprs[i];
+ Oid opfamily = idxrel->rd_opfamily[i];
+ StrategyNumber strat = get_op_opfamily_strategy(operator, opfamily);
+ RegProcedure regop = get_opcode(operator);
+
+ /* Initialize the scankey. */
+ ScanKeyInit(&skey[i],
+ pkattno,
+ strat,
+ regop,
+ pk_vals[i]);
+
+ skey[i].sk_collation = idxrel->rd_indcollation[i];
+
+ /*
+ * Check for null value. Should not occur, because callers currently
+ * take care of the cases in which they do occur.
+ */
+ if (pk_nulls[i] == 'n')
+ skey[i].sk_flags |= SK_ISNULL;
+ }
+
+ scan = index_beginscan(pk_rel, idxrel, snap, num_pk, 0);
+ index_rescan(scan, skey, num_pk, NULL, 0);
+
+ /* Look for the tuple, and if found, try to lock it in key share mode. */
+ outslot = table_slot_create(pk_rel, NULL);
+ if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+ {
+ /*
+ * If we fail to lock the tuple for whatever reason, assume it doesn't
+ * exist.
+ */
+ found = ExecLockTableTuple(pk_rel, &(outslot->tts_tid), outslot,
+ snap,
+ GetCurrentCommandId(false),
+ LockTupleKeyShare,
+ LockWaitBlock, NULL);
+ }
+
+ index_endscan(scan);
+ ExecDropSingleTupleTableSlot(outslot);
+
+ /* Don't release lock until commit. */
+ index_close(idxrel, NoLock);
+
+ /* Close leaf partition relation if any. */
+ if (leaf_pk_rel)
+ table_close(leaf_pk_rel, NoLock);
+
+ /* Restore UID and security context */
+ SetUserIdAndSecContext(saved_userid, saved_sec_context);
+
+ PopActiveSnapshot();
+
+ return found;
+}
+
+/*
+ * get_fkey_unique_index
+ * Returns the unique index used by a supposedly foreign key constraint
+ *
+ * XXX This is very similar to get_constraint_index; probably they should be
+ * unified.
+ */
+static Oid
+get_fkey_unique_index(Oid conoid)
+{
+ Oid result = InvalidOid;
+ HeapTuple tp;
+
+ tp = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid));
+ if (HeapTupleIsValid(tp))
+ {
+ Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(tp);
+
+ if (contup->contype == CONSTRAINT_FOREIGN)
+ result = contup->conindid;
+ ReleaseSysCache(tp);
+ }
+
+ if (!OidIsValid(result))
+ elog(ERROR, "unique index not found for foreign key constraint %u",
+ conoid);
+
+ return result;
+}
+
+/*
* RI_FKey_check -
*
* Check foreign key existence (combined for INSERT and UPDATE).
@@ -244,8 +510,6 @@ RI_FKey_check(TriggerData *trigdata)
Relation fk_rel;
Relation pk_rel;
TupleTableSlot *newslot;
- RI_QueryKey qkey;
- SPIPlanPtr qplan;
riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
trigdata->tg_relation, false);
@@ -325,9 +589,9 @@ RI_FKey_check(TriggerData *trigdata)
/*
* MATCH PARTIAL - all non-null columns must match. (not
- * implemented, can be done by modifying the query below
- * to only include non-null columns, or by writing a
- * special version here)
+ * implemented, can be done by modifying
+ * ri_ReferencedKeyExists() to only include non-null
+ * columns.
*/
break;
#endif
@@ -342,74 +606,12 @@ RI_FKey_check(TriggerData *trigdata)
break;
}
- if (SPI_connect() != SPI_OK_CONNECT)
- elog(ERROR, "SPI_connect failed");
-
- /* Fetch or prepare a saved plan for the real check */
- ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
-
- if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
- {
- StringInfoData querybuf;
- char pkrelname[MAX_QUOTED_REL_NAME_LEN];
- char attname[MAX_QUOTED_NAME_LEN];
- char paramname[16];
- const char *querysep;
- Oid queryoids[RI_MAX_NUMKEYS];
- const char *pk_only;
-
- /* ----------
- * The query string built is
- * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
- * FOR KEY SHARE OF x
- * The type id's for the $ parameters are those of the
- * corresponding FK attributes.
- * ----------
- */
- initStringInfo(&querybuf);
- pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
- "" : "ONLY ";
- quoteRelationName(pkrelname, pk_rel);
- appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
- pk_only, pkrelname);
- querysep = "WHERE";
- for (int i = 0; i < riinfo->nkeys; i++)
- {
- Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
- Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
-
- quoteOneName(attname,
- RIAttName(pk_rel, riinfo->pk_attnums[i]));
- sprintf(paramname, "$%d", i + 1);
- ri_GenerateQual(&querybuf, querysep,
- attname, pk_type,
- riinfo->pf_eq_oprs[i],
- paramname, fk_type);
- querysep = "AND";
- queryoids[i] = fk_type;
- }
- appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
-
- /* Prepare and save the plan */
- qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
- &qkey, fk_rel, pk_rel);
- }
-
- /*
- * Now check that foreign key exists in PK table
- *
- * XXX detectNewRows must be true when a partitioned table is on the
- * referenced side. The reason is that our snapshot must be fresh in
- * order for the hack in find_inheritance_children() to work.
- */
- ri_PerformCheck(riinfo, &qkey, qplan,
- fk_rel, pk_rel,
- NULL, newslot,
- pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE,
- SPI_OK_SELECT);
-
- if (SPI_finish() != SPI_OK_FINISH)
- elog(ERROR, "SPI_finish failed");
+ if (!ri_ReferencedKeyExists(pk_rel, fk_rel, newslot, riinfo))
+ ri_ReportViolation(riinfo,
+ pk_rel, fk_rel,
+ newslot,
+ NULL,
+ true, false);
table_close(pk_rel, RowShareLock);
@@ -464,81 +666,10 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
TupleTableSlot *oldslot,
const RI_ConstraintInfo *riinfo)
{
- SPIPlanPtr qplan;
- RI_QueryKey qkey;
- bool result;
-
/* Only called for non-null rows */
Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL);
- if (SPI_connect() != SPI_OK_CONNECT)
- elog(ERROR, "SPI_connect failed");
-
- /*
- * Fetch or prepare a saved plan for checking PK table with values coming
- * from a PK row
- */
- ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK_FROM_PK);
-
- if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
- {
- StringInfoData querybuf;
- char pkrelname[MAX_QUOTED_REL_NAME_LEN];
- char attname[MAX_QUOTED_NAME_LEN];
- char paramname[16];
- const char *querysep;
- const char *pk_only;
- Oid queryoids[RI_MAX_NUMKEYS];
-
- /* ----------
- * The query string built is
- * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
- * FOR KEY SHARE OF x
- * The type id's for the $ parameters are those of the
- * PK attributes themselves.
- * ----------
- */
- initStringInfo(&querybuf);
- pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
- "" : "ONLY ";
- quoteRelationName(pkrelname, pk_rel);
- appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
- pk_only, pkrelname);
- querysep = "WHERE";
- for (int i = 0; i < riinfo->nkeys; i++)
- {
- Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
-
- quoteOneName(attname,
- RIAttName(pk_rel, riinfo->pk_attnums[i]));
- sprintf(paramname, "$%d", i + 1);
- ri_GenerateQual(&querybuf, querysep,
- attname, pk_type,
- riinfo->pp_eq_oprs[i],
- paramname, pk_type);
- querysep = "AND";
- queryoids[i] = pk_type;
- }
- appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
-
- /* Prepare and save the plan */
- qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
- &qkey, fk_rel, pk_rel);
- }
-
- /*
- * We have a plan now. Run it.
- */
- result = ri_PerformCheck(riinfo, &qkey, qplan,
- fk_rel, pk_rel,
- oldslot, NULL,
- true, /* treat like update */
- SPI_OK_SELECT);
-
- if (SPI_finish() != SPI_OK_FINISH)
- elog(ERROR, "SPI_finish failed");
-
- return result;
+ return ri_ReferencedKeyExists(pk_rel, NULL, oldslot, riinfo);
}
@@ -1608,15 +1739,10 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
errtableconstraint(fk_rel,
NameStr(fake_riinfo.conname))));
- /*
- * We tell ri_ReportViolation we were doing the RI_PLAN_CHECK_LOOKUPPK
- * query, which isn't true, but will cause it to use
- * fake_riinfo.fk_attnums as we need.
- */
ri_ReportViolation(&fake_riinfo,
pk_rel, fk_rel,
slot, tupdesc,
- RI_PLAN_CHECK_LOOKUPPK, false);
+ true, false);
ExecDropSingleTupleTableSlot(slot);
}
@@ -1833,7 +1959,7 @@ RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
fake_riinfo.pk_attnums[i] = i + 1;
ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
- slot, tupdesc, 0, true);
+ slot, tupdesc, true, true);
}
if (SPI_finish() != SPI_OK_FINISH)
@@ -1970,9 +2096,8 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
{
/*
* Inherited constraints with a common ancestor can share ri_query_cache
- * entries for all query types except RI_PLAN_CHECK_LOOKUPPK_FROM_PK.
- * Except in that case, the query processes the other table involved in
- * the FK constraint (i.e., not the table on which the trigger has been
+ * entries, because each query processes the other table involved in the
+ * FK constraint (i.e., not the table on which the trigger has been
* fired), and so it will be the same for all members of the inheritance
* tree. So we may use the root constraint's OID in the hash key, rather
* than the constraint's own OID. This avoids creating duplicate SPI
@@ -1983,13 +2108,13 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
* constraint, because partitions can have different column orders,
* resulting in different pk_attnums[] or fk_attnums[] array contents.)
*
+ * (Note also that for a standalone or non-inherited constraint,
+ * constraint_root_id is same as constraint_id.)
+ *
* We assume struct RI_QueryKey contains no padding bytes, else we'd need
* to use memset to clear them.
*/
- if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK)
- key->constr_id = riinfo->constraint_root_id;
- else
- key->constr_id = riinfo->constraint_id;
+ key->constr_id = riinfo->constraint_root_id;
key->constr_queryno = constr_queryno;
}
@@ -2260,19 +2385,12 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
{
SPIPlanPtr qplan;
- Relation query_rel;
+
+ /* There are currently no queries that run on PK table. */
+ Relation query_rel = fk_rel;
Oid save_userid;
int save_sec_context;
- /*
- * Use the query type code to determine whether the query is run against
- * the PK or FK table; we'll do the check as that table's owner
- */
- if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
- query_rel = pk_rel;
- else
- query_rel = fk_rel;
-
/* Switch to proper UID to perform check as */
GetUserIdAndSecContext(&save_userid, &save_sec_context);
SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
@@ -2305,9 +2423,9 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
TupleTableSlot *oldslot, TupleTableSlot *newslot,
bool detectNewRows, int expect_OK)
{
- Relation query_rel,
- source_rel;
- bool source_is_pk;
+ /* There are currently no queries that run on PK table. */
+ Relation query_rel = fk_rel,
+ source_rel = pk_rel;
Snapshot test_snapshot;
Snapshot crosscheck_snapshot;
int limit;
@@ -2317,46 +2435,17 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
Datum vals[RI_MAX_NUMKEYS * 2];
char nulls[RI_MAX_NUMKEYS * 2];
- /*
- * Use the query type code to determine whether the query is run against
- * the PK or FK table; we'll do the check as that table's owner
- */
- if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
- query_rel = pk_rel;
- else
- query_rel = fk_rel;
-
- /*
- * The values for the query are taken from the table on which the trigger
- * is called - it is normally the other one with respect to query_rel. An
- * exception is ri_Check_Pk_Match(), which uses the PK table for both (and
- * sets queryno to RI_PLAN_CHECK_LOOKUPPK_FROM_PK). We might eventually
- * need some less klugy way to determine this.
- */
- if (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)
- {
- source_rel = fk_rel;
- source_is_pk = false;
- }
- else
- {
- source_rel = pk_rel;
- source_is_pk = true;
- }
-
/* Extract the parameters to be passed into the query */
if (newslot)
{
- ri_ExtractValues(source_rel, newslot, riinfo, source_is_pk,
- vals, nulls);
+ ri_ExtractValues(source_rel, newslot, riinfo, true, vals, nulls);
if (oldslot)
- ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
+ ri_ExtractValues(source_rel, oldslot, riinfo, true,
vals + riinfo->nkeys, nulls + riinfo->nkeys);
}
else
{
- ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
- vals, nulls);
+ ri_ExtractValues(source_rel, oldslot, riinfo, true, vals, nulls);
}
/*
@@ -2420,14 +2509,12 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
errhint("This is most likely due to a rule having rewritten the query.")));
/* XXX wouldn't it be clearer to do this part at the caller? */
- if (qkey->constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
- expect_OK == SPI_OK_SELECT &&
- (SPI_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK))
+ if (expect_OK == SPI_OK_SELECT && SPI_processed != 0)
ri_ReportViolation(riinfo,
pk_rel, fk_rel,
newslot ? newslot : oldslot,
NULL,
- qkey->constr_queryno, false);
+ false, false);
return SPI_processed != 0;
}
@@ -2458,9 +2545,9 @@ ri_ExtractValues(Relation rel, TupleTableSlot *slot,
/*
* Produce an error report
*
- * If the failed constraint was on insert/update to the FK table,
- * we want the key names and values extracted from there, and the error
- * message to look like 'key blah is not present in PK'.
+ * If the failed constraint was on insert/update to the FK table (on_fk is
+ * true), we want the key names and values extracted from there, and the
+ * error message to look like 'key blah is not present in PK'.
* Otherwise, the attr names and values come from the PK table and the
* message looks like 'key blah is still referenced from FK'.
*/
@@ -2468,22 +2555,20 @@ static void
ri_ReportViolation(const RI_ConstraintInfo *riinfo,
Relation pk_rel, Relation fk_rel,
TupleTableSlot *violatorslot, TupleDesc tupdesc,
- int queryno, bool partgone)
+ bool on_fk, bool partgone)
{
StringInfoData key_names;
StringInfoData key_values;
- bool onfk;
const int16 *attnums;
Oid rel_oid;
AclResult aclresult;
bool has_perm = true;
/*
- * Determine which relation to complain about. If tupdesc wasn't passed
- * by caller, assume the violator tuple came from there.
+ * If tupdesc wasn't passed by caller, assume the violator tuple came from
+ * there.
*/
- onfk = (queryno == RI_PLAN_CHECK_LOOKUPPK);
- if (onfk)
+ if (on_fk)
{
attnums = riinfo->fk_attnums;
rel_oid = fk_rel->rd_id;
@@ -2585,7 +2670,7 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
key_names.data, key_values.data,
RelationGetRelationName(fk_rel)),
errtableconstraint(fk_rel, NameStr(riinfo->conname))));
- else if (onfk)
+ else if (on_fk)
ereport(ERROR,
(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
@@ -2892,7 +2977,10 @@ ri_AttributesEqual(Oid eq_opr, Oid typeid,
* ri_HashCompareOp -
*
* See if we know how to compare two values, and create a new hash entry
- * if not.
+ * if not. The entry contains the FmgrInfo of the equality operator function
+ * and that of the cast function, if one is needed to convert the right
+ * operand (whose type OID has been passed) before passing it to the equality
+ * function.
*/
static RI_CompareHashEntry *
ri_HashCompareOp(Oid eq_opr, Oid typeid)
@@ -2948,8 +3036,16 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
* moment since that will never be generated for implicit coercions.
*/
op_input_types(eq_opr, &lefttype, &righttype);
- Assert(lefttype == righttype);
- if (typeid == lefttype)
+
+ /*
+ * Don't need to cast if the values that will be passed to the
+ * operator will be of expected operand type(s). The operator can be
+ * cross-type (such as when called by ri_ReferencedKeyExists()), in
+ * which case, we only need the cast if the right operand value
+ * doesn't match the type expected by the operator.
+ */
+ if ((lefttype == righttype && typeid == lefttype) ||
+ (lefttype != righttype && typeid == righttype))
castfunc = InvalidOid; /* simplest case */
else
{