aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/commands/tablecmds.c9
-rw-r--r--src/backend/utils/adt/ri_triggers.c226
-rw-r--r--src/include/commands/trigger.h5
3 files changed, 229 insertions, 11 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3ece4f80f95..395077081a7 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.85 2003/10/02 06:36:37 petere Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.86 2003/10/06 16:38:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -3455,6 +3455,13 @@ validateForeignKeyConstraint(FkConstraint *fkconstraint,
int count;
/*
+ * See if we can do it with a single LEFT JOIN query. A FALSE result
+ * indicates we must proceed with the fire-the-trigger method.
+ */
+ if (RI_Initial_Check(fkconstraint, rel, pkrel))
+ return;
+
+ /*
* Scan through each tuple, calling RI_FKey_check_ins (insert trigger)
* as if that tuple had just been inserted. If any of those fail, it
* should ereport(ERROR) and that's that.
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 11b7e84df03..306dda0aa4e 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -17,7 +17,7 @@
*
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
*
- * $Header: /cvsroot/pgsql/src/backend/utils/adt/ri_triggers.c,v 1.61 2003/10/01 21:30:52 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/utils/adt/ri_triggers.c,v 1.62 2003/10/06 16:38:28 tgl Exp $
*
* ----------
*/
@@ -40,6 +40,7 @@
#include "rewrite/rewriteHandler.h"
#include "utils/lsyscache.h"
#include "utils/typcache.h"
+#include "utils/acl.h"
#include "miscadmin.h"
@@ -164,7 +165,8 @@ static void ri_ExtractValues(RI_QueryKey *qkey, int key_idx,
Datum *vals, char *nulls);
static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
Relation pk_rel, Relation fk_rel,
- HeapTuple violator, bool spi_err);
+ HeapTuple violator, TupleDesc tupdesc,
+ bool spi_err);
/* ----------
@@ -2540,7 +2542,205 @@ RI_FKey_keyequal_upd(TriggerData *trigdata)
}
+/* ----------
+ * RI_Initial_Check -
+ *
+ * Check an entire table for non-matching values using a single query.
+ * This is not a trigger procedure, but is called during ALTER TABLE
+ * ADD FOREIGN KEY to validate the initial table contents.
+ *
+ * We expect that an exclusive lock has been taken on rel and pkrel;
+ * hence, we do not need to lock individual rows for the check.
+ *
+ * If the check fails because the current user doesn't have permissions
+ * to read both tables, return false to let our caller know that they will
+ * need to do something else to check the constraint.
+ * ----------
+ */
+bool
+RI_Initial_Check(FkConstraint *fkconstraint, Relation rel, Relation pkrel)
+{
+ const char *constrname = fkconstraint->constr_name;
+ char querystr[MAX_QUOTED_REL_NAME_LEN * 2 + 250 +
+ (MAX_QUOTED_NAME_LEN + 32) * ((RI_MAX_NUMKEYS * 4)+1)];
+ char pkrelname[MAX_QUOTED_REL_NAME_LEN];
+ char relname[MAX_QUOTED_REL_NAME_LEN];
+ char attname[MAX_QUOTED_NAME_LEN];
+ char fkattname[MAX_QUOTED_NAME_LEN];
+ const char *sep;
+ List *list;
+ List *list2;
+ int spi_result;
+ void *qplan;
+
+ /*
+ * Check to make sure current user has enough permissions to do the
+ * test query. (If not, caller can fall back to the trigger method,
+ * which works because it changes user IDs on the fly.)
+ *
+ * XXX are there any other show-stopper conditions to check?
+ */
+ if (pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), ACL_SELECT) != ACLCHECK_OK)
+ return false;
+ if (pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(), ACL_SELECT) != ACLCHECK_OK)
+ return false;
+
+ /*----------
+ * The query string built is:
+ * SELECT fk.keycols FROM ONLY relname fk
+ * LEFT OUTER JOIN ONLY pkrelname pk
+ * ON (pk.pkkeycol1=fk.keycol1 [AND ...])
+ * WHERE pk.pkkeycol1 IS NULL AND
+ * For MATCH unspecified:
+ * (fk.keycol1 IS NOT NULL [AND ...])
+ * For MATCH FULL:
+ * (fk.keycol1 IS NOT NULL [OR ...])
+ *----------
+ */
+
+ sprintf(querystr, "SELECT ");
+ sep="";
+ foreach(list, fkconstraint->fk_attrs)
+ {
+ quoteOneName(attname, strVal(lfirst(list)));
+ snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+ "%sfk.%s", sep, attname);
+ sep = ", ";
+ }
+
+ quoteRelationName(pkrelname, pkrel);
+ quoteRelationName(relname, rel);
+ snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+ " FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON (",
+ relname, pkrelname);
+
+ sep="";
+ for (list=fkconstraint->pk_attrs, list2=fkconstraint->fk_attrs;
+ list != NIL && list2 != NIL;
+ list=lnext(list), list2=lnext(list2))
+ {
+ quoteOneName(attname, strVal(lfirst(list)));
+ quoteOneName(fkattname, strVal(lfirst(list2)));
+ snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+ "%spk.%s=fk.%s",
+ sep, attname, fkattname);
+ sep = " AND ";
+ }
+ /*
+ * It's sufficient to test any one pk attribute for null to detect a
+ * join failure.
+ */
+ quoteOneName(attname, strVal(lfirst(fkconstraint->pk_attrs)));
+ snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+ ") WHERE pk.%s IS NULL AND (", attname);
+
+ sep="";
+ foreach(list, fkconstraint->fk_attrs)
+ {
+ quoteOneName(attname, strVal(lfirst(list)));
+ snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+ "%sfk.%s IS NOT NULL",
+ sep, attname);
+ switch (fkconstraint->fk_matchtype)
+ {
+ case FKCONSTR_MATCH_UNSPECIFIED:
+ sep=" AND ";
+ break;
+ case FKCONSTR_MATCH_FULL:
+ sep=" OR ";
+ break;
+ case FKCONSTR_MATCH_PARTIAL:
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("MATCH PARTIAL not yet implemented")));
+ break;
+ default:
+ elog(ERROR, "unrecognized match type: %d",
+ fkconstraint->fk_matchtype);
+ break;
+ }
+ }
+ snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+ ")");
+
+ if (SPI_connect() != SPI_OK_CONNECT)
+ elog(ERROR, "SPI_connect failed");
+ /*
+ * Generate the plan. We don't need to cache it, and there are no
+ * arguments to the plan.
+ */
+ qplan = SPI_prepare(querystr, 0, NULL);
+
+ if (qplan == NULL)
+ elog(ERROR, "SPI_prepare returned %d for %s", SPI_result, querystr);
+
+ /*
+ * Run the plan. For safety we force a current query snapshot to be
+ * used. (In serializable mode, this arguably violates serializability,
+ * but we really haven't got much choice.) We need at most one tuple
+ * returned, so pass limit = 1.
+ */
+ spi_result = SPI_execp_current(qplan, NULL, NULL, true, 1);
+
+ /* Check result */
+ if (spi_result != SPI_OK_SELECT)
+ elog(ERROR, "SPI_execp_current returned %d", spi_result);
+
+ /* Did we find a tuple violating the constraint? */
+ if (SPI_processed > 0)
+ {
+ HeapTuple tuple = SPI_tuptable->vals[0];
+ TupleDesc tupdesc = SPI_tuptable->tupdesc;
+ int nkeys = length(fkconstraint->fk_attrs);
+ int i;
+ RI_QueryKey qkey;
+
+ /*
+ * If it's MATCH FULL, and there are any nulls in the FK keys,
+ * complain about that rather than the lack of a match. MATCH FULL
+ * disallows partially-null FK rows.
+ */
+ if (fkconstraint->fk_matchtype == FKCONSTR_MATCH_FULL)
+ {
+ bool isnull = false;
+
+ for (i = 1; i <= nkeys; i++)
+ {
+ (void) SPI_getbinval(tuple, tupdesc, i, &isnull);
+ if (isnull)
+ break;
+ }
+ if (isnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
+ errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
+ RelationGetRelationName(rel),
+ constrname),
+ errdetail("MATCH FULL does not allow mixing of null and nonnull key values.")));
+ }
+
+ /*
+ * Although we didn't cache the query, we need to set up a fake
+ * query key to pass to ri_ReportViolation.
+ */
+ MemSet(&qkey, 0, sizeof(qkey));
+ qkey.constr_queryno = RI_PLAN_CHECK_LOOKUPPK;
+ qkey.nkeypairs = nkeys;
+ for (i = 0; i < nkeys; i++)
+ qkey.keypair[i][RI_KEYPAIR_FK_IDX] = i + 1;
+
+ ri_ReportViolation(&qkey, constrname,
+ pkrel, rel,
+ tuple, tupdesc,
+ false);
+ }
+
+ if (SPI_finish() != SPI_OK_FINISH)
+ elog(ERROR, "SPI_finish failed");
+
+ return true;
+}
/* ----------
@@ -2782,6 +2982,9 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
/* Create the plan */
qplan = SPI_prepare(querystr, nargs, argtypes);
+ if (qplan == NULL)
+ elog(ERROR, "SPI_prepare returned %d for %s", SPI_result, querystr);
+
/* Restore UID */
SetUserId(save_uid);
@@ -2905,6 +3108,7 @@ ri_PerformCheck(RI_QueryKey *qkey, void *qplan,
ri_ReportViolation(qkey, constrname ? constrname : "",
pk_rel, fk_rel,
new_tuple ? new_tuple : old_tuple,
+ NULL,
true);
/* XXX wouldn't it be clearer to do this part at the caller? */
@@ -2913,6 +3117,7 @@ ri_PerformCheck(RI_QueryKey *qkey, void *qplan,
ri_ReportViolation(qkey, constrname,
pk_rel, fk_rel,
new_tuple ? new_tuple : old_tuple,
+ NULL,
false);
return SPI_processed != 0;
@@ -2950,7 +3155,8 @@ ri_ExtractValues(RI_QueryKey *qkey, int key_idx,
static void
ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
Relation pk_rel, Relation fk_rel,
- HeapTuple violator, bool spi_err)
+ HeapTuple violator, TupleDesc tupdesc,
+ bool spi_err)
{
#define BUFLENGTH 512
char key_names[BUFLENGTH];
@@ -2958,7 +3164,6 @@ ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
char *name_ptr = key_names;
char *val_ptr = key_values;
bool onfk;
- Relation rel;
int idx,
key_idx;
@@ -2972,18 +3177,21 @@ ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
errhint("This is most likely due to a rule having rewritten the query.")));
/*
- * rel is set to where the tuple description is coming from.
+ * Determine which relation to complain about. If tupdesc wasn't
+ * passed by caller, assume the violator tuple came from there.
*/
onfk = (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK);
if (onfk)
{
- rel = fk_rel;
key_idx = RI_KEYPAIR_FK_IDX;
+ if (tupdesc == NULL)
+ tupdesc = fk_rel->rd_att;
}
else
{
- rel = pk_rel;
key_idx = RI_KEYPAIR_PK_IDX;
+ if (tupdesc == NULL)
+ tupdesc = pk_rel->rd_att;
}
/*
@@ -3008,8 +3216,8 @@ ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
char *name,
*val;
- name = SPI_fname(rel->rd_att, fnum);
- val = SPI_getvalue(violator, rel->rd_att, fnum);
+ name = SPI_fname(tupdesc, fnum);
+ val = SPI_getvalue(violator, tupdesc, fnum);
if (!val)
val = "null";
diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h
index d3aa9e385af..fe8f6af61d9 100644
--- a/src/include/commands/trigger.h
+++ b/src/include/commands/trigger.h
@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: trigger.h,v 1.43 2003/08/04 02:40:13 momjian Exp $
+ * $Id: trigger.h,v 1.44 2003/10/06 16:38:28 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -197,5 +197,8 @@ extern void DeferredTriggerSetState(ConstraintsSetStmt *stmt);
* in utils/adt/ri_triggers.c
*/
extern bool RI_FKey_keyequal_upd(TriggerData *trigdata);
+extern bool RI_Initial_Check(FkConstraint *fkconstraint,
+ Relation rel,
+ Relation pkrel);
#endif /* TRIGGER_H */