aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils/adt/ri_triggers.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils/adt/ri_triggers.c')
-rw-r--r--src/backend/utils/adt/ri_triggers.c263
1 files changed, 248 insertions, 15 deletions
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 72f8a9d69cf..095334b3363 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -50,6 +50,7 @@
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/rls.h"
+#include "utils/ruleutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
@@ -220,8 +221,8 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
Datum *vals, char *nulls);
static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
Relation pk_rel, Relation fk_rel,
- TupleTableSlot *violator, TupleDesc tupdesc,
- int queryno) pg_attribute_noreturn();
+ TupleTableSlot *violatorslot, TupleDesc tupdesc,
+ int queryno, bool partgone) pg_attribute_noreturn();
/*
@@ -348,18 +349,22 @@ RI_FKey_check(TriggerData *trigdata)
char paramname[16];
const char *querysep;
Oid queryoids[RI_MAX_NUMKEYS];
+ const char *pk_only;
/* ----------
* The query string built is
- * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...]
+ * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
* FOR KEY SHARE OF x
* The type id's for the $ parameters are those of the
* corresponding FK attributes.
* ----------
*/
initStringInfo(&querybuf);
+ pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
quoteRelationName(pkrelname, pk_rel);
- appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
+ appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+ pk_only, pkrelname);
querysep = "WHERE";
for (int i = 0; i < riinfo->nkeys; i++)
{
@@ -471,19 +476,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
char attname[MAX_QUOTED_NAME_LEN];
char paramname[16];
const char *querysep;
+ const char *pk_only;
Oid queryoids[RI_MAX_NUMKEYS];
/* ----------
* The query string built is
- * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...]
+ * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
* FOR KEY SHARE OF x
* The type id's for the $ parameters are those of the
* PK attributes themselves.
* ----------
*/
initStringInfo(&querybuf);
+ pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
quoteRelationName(pkrelname, pk_rel);
- appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
+ appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+ pk_only, pkrelname);
querysep = "WHERE";
for (int i = 0; i < riinfo->nkeys; i++)
{
@@ -1293,6 +1302,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
RangeTblEntry *fkrte;
const char *sep;
const char *fk_only;
+ const char *pk_only;
int save_nestlevel;
char workmembuf[32];
int spi_result;
@@ -1350,7 +1360,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
/*----------
* The query string built is:
* SELECT fk.keycols FROM [ONLY] relname fk
- * LEFT OUTER JOIN ONLY pkrelname pk
+ * LEFT OUTER JOIN [ONLY] pkrelname pk
* ON (pk.pkkeycol1=fk.keycol1 [AND ...])
* WHERE pk.pkkeycol1 IS NULL AND
* For MATCH SIMPLE:
@@ -1377,9 +1387,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
quoteRelationName(fkrelname, fk_rel);
fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
+ pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
appendStringInfo(&querybuf,
- " FROM %s%s fk LEFT OUTER JOIN ONLY %s pk ON",
- fk_only, fkrelname, pkrelname);
+ " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON",
+ fk_only, fkrelname, pk_only, pkrelname);
strcpy(pkattname, "pk.");
strcpy(fkattname, "fk.");
@@ -1530,7 +1542,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
ri_ReportViolation(&fake_riinfo,
pk_rel, fk_rel,
slot, tupdesc,
- RI_PLAN_CHECK_LOOKUPPK);
+ RI_PLAN_CHECK_LOOKUPPK, false);
ExecDropSingleTupleTableSlot(slot);
}
@@ -1546,6 +1558,214 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
return true;
}
+/*
+ * RI_PartitionRemove_Check -
+ *
+ * Verify no referencing values exist, when a partition is detached on
+ * the referenced side of a foreign key constraint.
+ */
+void
+RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
+{
+ const RI_ConstraintInfo *riinfo;
+ StringInfoData querybuf;
+ char *constraintDef;
+ char pkrelname[MAX_QUOTED_REL_NAME_LEN];
+ char fkrelname[MAX_QUOTED_REL_NAME_LEN];
+ char pkattname[MAX_QUOTED_NAME_LEN + 3];
+ char fkattname[MAX_QUOTED_NAME_LEN + 3];
+ const char *sep;
+ const char *fk_only;
+ int save_nestlevel;
+ char workmembuf[32];
+ int spi_result;
+ SPIPlanPtr qplan;
+ int i;
+
+ riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
+
+ /*
+ * We don't check permissions before displaying the error message, on the
+ * assumption that the user detaching the partition must have enough
+ * privileges to examine the table contents anyhow.
+ */
+
+ /*----------
+ * The query string built is:
+ * SELECT fk.keycols FROM [ONLY] relname fk
+ * JOIN pkrelname pk
+ * ON (pk.pkkeycol1=fk.keycol1 [AND ...])
+ * WHERE (<partition constraint>) AND
+ * For MATCH SIMPLE:
+ * (fk.keycol1 IS NOT NULL [AND ...])
+ * For MATCH FULL:
+ * (fk.keycol1 IS NOT NULL [OR ...])
+ *
+ * We attach COLLATE clauses to the operators when comparing columns
+ * that have different collations.
+ *----------
+ */
+ initStringInfo(&querybuf);
+ appendStringInfoString(&querybuf, "SELECT ");
+ sep = "";
+ for (i = 0; i < riinfo->nkeys; i++)
+ {
+ quoteOneName(fkattname,
+ RIAttName(fk_rel, riinfo->fk_attnums[i]));
+ appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname);
+ sep = ", ";
+ }
+
+ quoteRelationName(pkrelname, pk_rel);
+ quoteRelationName(fkrelname, fk_rel);
+ fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
+ appendStringInfo(&querybuf,
+ " FROM %s%s fk JOIN %s pk ON",
+ fk_only, fkrelname, pkrelname);
+ strcpy(pkattname, "pk.");
+ strcpy(fkattname, "fk.");
+ sep = "(";
+ for (i = 0; i < riinfo->nkeys; i++)
+ {
+ Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+ Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+ Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
+ Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
+
+ quoteOneName(pkattname + 3,
+ RIAttName(pk_rel, riinfo->pk_attnums[i]));
+ quoteOneName(fkattname + 3,
+ RIAttName(fk_rel, riinfo->fk_attnums[i]));
+ ri_GenerateQual(&querybuf, sep,
+ pkattname, pk_type,
+ riinfo->pf_eq_oprs[i],
+ fkattname, fk_type);
+ if (pk_coll != fk_coll)
+ ri_GenerateQualCollation(&querybuf, pk_coll);
+ sep = "AND";
+ }
+
+ /*
+ * Start the WHERE clause with the partition constraint (except if this is
+ * the default partition and there's no other partition, because the
+ * partition constraint is the empty string in that case.)
+ */
+ constraintDef = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk");
+ if (constraintDef && constraintDef[0] != '\0')
+ appendStringInfo(&querybuf, ") WHERE %s AND (",
+ constraintDef);
+ else
+ appendStringInfo(&querybuf, ") WHERE (");
+
+ sep = "";
+ for (i = 0; i < riinfo->nkeys; i++)
+ {
+ quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i]));
+ appendStringInfo(&querybuf,
+ "%sfk.%s IS NOT NULL",
+ sep, fkattname);
+ switch (riinfo->confmatchtype)
+ {
+ case FKCONSTR_MATCH_SIMPLE:
+ sep = " AND ";
+ break;
+ case FKCONSTR_MATCH_FULL:
+ sep = " OR ";
+ break;
+ }
+ }
+ appendStringInfoChar(&querybuf, ')');
+
+ /*
+ * Temporarily increase work_mem so that the check query can be executed
+ * more efficiently. It seems okay to do this because the query is simple
+ * enough to not use a multiple of work_mem, and one typically would not
+ * have many large foreign-key validations happening concurrently. So
+ * this seems to meet the criteria for being considered a "maintenance"
+ * operation, and accordingly we use maintenance_work_mem.
+ *
+ * We use the equivalent of a function SET option to allow the setting to
+ * persist for exactly the duration of the check query. guc.c also takes
+ * care of undoing the setting on error.
+ */
+ save_nestlevel = NewGUCNestLevel();
+
+ snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem);
+ (void) set_config_option("work_mem", workmembuf,
+ PGC_USERSET, PGC_S_SESSION,
+ GUC_ACTION_SAVE, true, 0, false);
+
+ if (SPI_connect() != SPI_OK_CONNECT)
+ elog(ERROR, "SPI_connect failed");
+
+ /*
+ * Generate the plan. We don't need to cache it, and there are no
+ * arguments to the plan.
+ */
+ qplan = SPI_prepare(querybuf.data, 0, NULL);
+
+ if (qplan == NULL)
+ elog(ERROR, "SPI_prepare returned %s for %s",
+ SPI_result_code_string(SPI_result), querybuf.data);
+
+ /*
+ * Run the plan. For safety we force a current snapshot to be used. (In
+ * transaction-snapshot mode, this arguably violates transaction isolation
+ * rules, but we really haven't got much choice.) We don't need to
+ * register the snapshot, because SPI_execute_snapshot will see to it. We
+ * need at most one tuple returned, so pass limit = 1.
+ */
+ spi_result = SPI_execute_snapshot(qplan,
+ NULL, NULL,
+ GetLatestSnapshot(),
+ InvalidSnapshot,
+ true, false, 1);
+
+ /* Check result */
+ if (spi_result != SPI_OK_SELECT)
+ elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
+
+ /* Did we find a tuple that would violate the constraint? */
+ if (SPI_processed > 0)
+ {
+ TupleTableSlot *slot;
+ HeapTuple tuple = SPI_tuptable->vals[0];
+ TupleDesc tupdesc = SPI_tuptable->tupdesc;
+ RI_ConstraintInfo fake_riinfo;
+
+ slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
+
+ heap_deform_tuple(tuple, tupdesc,
+ slot->tts_values, slot->tts_isnull);
+ ExecStoreVirtualTuple(slot);
+
+ /*
+ * The columns to look at in the result tuple are 1..N, not whatever
+ * they are in the fk_rel. Hack up riinfo so that ri_ReportViolation
+ * will behave properly.
+ *
+ * In addition to this, we have to pass the correct tupdesc to
+ * ri_ReportViolation, overriding its normal habit of using the pk_rel
+ * or fk_rel's tupdesc.
+ */
+ memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo));
+ for (i = 0; i < fake_riinfo.nkeys; i++)
+ fake_riinfo.pk_attnums[i] = i + 1;
+
+ ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
+ slot, tupdesc, 0, true);
+ }
+
+ if (SPI_finish() != SPI_OK_FINISH)
+ elog(ERROR, "SPI_finish failed");
+
+ /*
+ * Restore work_mem.
+ */
+ AtEOXact_GUC(true, save_nestlevel);
+}
+
/* ----------
* Local functions below
@@ -2078,7 +2298,7 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
pk_rel, fk_rel,
newslot ? newslot : oldslot,
NULL,
- qkey->constr_queryno);
+ qkey->constr_queryno, false);
return SPI_processed != 0;
}
@@ -2119,7 +2339,7 @@ static void
ri_ReportViolation(const RI_ConstraintInfo *riinfo,
Relation pk_rel, Relation fk_rel,
TupleTableSlot *violatorslot, TupleDesc tupdesc,
- int queryno)
+ int queryno, bool partgone)
{
StringInfoData key_names;
StringInfoData key_values;
@@ -2158,9 +2378,13 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
*
* Check table-level permissions next and, failing that, column-level
* privileges.
+ *
+ * When a partition at the referenced side is being detached/dropped, we
+ * needn't check, since the user must be the table owner anyway.
*/
-
- if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
+ if (partgone)
+ has_perm = true;
+ else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
{
aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT);
if (aclresult != ACLCHECK_OK)
@@ -2222,7 +2446,16 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
}
}
- if (onfk)
+ if (partgone)
+ ereport(ERROR,
+ (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
+ errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"",
+ RelationGetRelationName(pk_rel),
+ NameStr(riinfo->conname)),
+ errdetail("Key (%s)=(%s) still referenced from table \"%s\".",
+ key_names.data, key_values.data,
+ RelationGetRelationName(fk_rel))));
+ else if (onfk)
ereport(ERROR,
(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",