diff options
Diffstat (limited to 'src/backend/utils/adt/ri_triggers.c')
-rw-r--r-- | src/backend/utils/adt/ri_triggers.c | 263 |
1 files changed, 248 insertions, 15 deletions
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c index 72f8a9d69cf..095334b3363 100644 --- a/src/backend/utils/adt/ri_triggers.c +++ b/src/backend/utils/adt/ri_triggers.c @@ -50,6 +50,7 @@ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/rls.h" +#include "utils/ruleutils.h" #include "utils/snapmgr.h" #include "utils/syscache.h" @@ -220,8 +221,8 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot, Datum *vals, char *nulls); static void ri_ReportViolation(const RI_ConstraintInfo *riinfo, Relation pk_rel, Relation fk_rel, - TupleTableSlot *violator, TupleDesc tupdesc, - int queryno) pg_attribute_noreturn(); + TupleTableSlot *violatorslot, TupleDesc tupdesc, + int queryno, bool partgone) pg_attribute_noreturn(); /* @@ -348,18 +349,22 @@ RI_FKey_check(TriggerData *trigdata) char paramname[16]; const char *querysep; Oid queryoids[RI_MAX_NUMKEYS]; + const char *pk_only; /* ---------- * The query string built is - * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...] + * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * corresponding FK attributes. * ---------- */ initStringInfo(&querybuf); + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); querysep = "WHERE"; for (int i = 0; i < riinfo->nkeys; i++) { @@ -471,19 +476,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, char attname[MAX_QUOTED_NAME_LEN]; char paramname[16]; const char *querysep; + const char *pk_only; Oid queryoids[RI_MAX_NUMKEYS]; /* ---------- * The query string built is - * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...] + * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * PK attributes themselves. * ---------- */ initStringInfo(&querybuf); + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); querysep = "WHERE"; for (int i = 0; i < riinfo->nkeys; i++) { @@ -1293,6 +1302,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) RangeTblEntry *fkrte; const char *sep; const char *fk_only; + const char *pk_only; int save_nestlevel; char workmembuf[32]; int spi_result; @@ -1350,7 +1360,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) /*---------- * The query string built is: * SELECT fk.keycols FROM [ONLY] relname fk - * LEFT OUTER JOIN ONLY pkrelname pk + * LEFT OUTER JOIN [ONLY] pkrelname pk * ON (pk.pkkeycol1=fk.keycol1 [AND ...]) * WHERE pk.pkkeycol1 IS NULL AND * For MATCH SIMPLE: @@ -1377,9 +1387,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) quoteRelationName(fkrelname, fk_rel); fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? "" : "ONLY "; + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; appendStringInfo(&querybuf, - " FROM %s%s fk LEFT OUTER JOIN ONLY %s pk ON", - fk_only, fkrelname, pkrelname); + " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON", + fk_only, fkrelname, pk_only, pkrelname); strcpy(pkattname, "pk."); strcpy(fkattname, "fk."); @@ -1530,7 +1542,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel, slot, tupdesc, - RI_PLAN_CHECK_LOOKUPPK); + RI_PLAN_CHECK_LOOKUPPK, false); ExecDropSingleTupleTableSlot(slot); } @@ -1546,6 +1558,214 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) return true; } +/* + * RI_PartitionRemove_Check - + * + * Verify no referencing values exist, when a partition is detached on + * the referenced side of a foreign key constraint. + */ +void +RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) +{ + const RI_ConstraintInfo *riinfo; + StringInfoData querybuf; + char *constraintDef; + char pkrelname[MAX_QUOTED_REL_NAME_LEN]; + char fkrelname[MAX_QUOTED_REL_NAME_LEN]; + char pkattname[MAX_QUOTED_NAME_LEN + 3]; + char fkattname[MAX_QUOTED_NAME_LEN + 3]; + const char *sep; + const char *fk_only; + int save_nestlevel; + char workmembuf[32]; + int spi_result; + SPIPlanPtr qplan; + int i; + + riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false); + + /* + * We don't check permissions before displaying the error message, on the + * assumption that the user detaching the partition must have enough + * privileges to examine the table contents anyhow. + */ + + /*---------- + * The query string built is: + * SELECT fk.keycols FROM [ONLY] relname fk + * JOIN pkrelname pk + * ON (pk.pkkeycol1=fk.keycol1 [AND ...]) + * WHERE (<partition constraint>) AND + * For MATCH SIMPLE: + * (fk.keycol1 IS NOT NULL [AND ...]) + * For MATCH FULL: + * (fk.keycol1 IS NOT NULL [OR ...]) + * + * We attach COLLATE clauses to the operators when comparing columns + * that have different collations. + *---------- + */ + initStringInfo(&querybuf); + appendStringInfoString(&querybuf, "SELECT "); + sep = ""; + for (i = 0; i < riinfo->nkeys; i++) + { + quoteOneName(fkattname, + RIAttName(fk_rel, riinfo->fk_attnums[i])); + appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname); + sep = ", "; + } + + quoteRelationName(pkrelname, pk_rel); + quoteRelationName(fkrelname, fk_rel); + fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; + appendStringInfo(&querybuf, + " FROM %s%s fk JOIN %s pk ON", + fk_only, fkrelname, pkrelname); + strcpy(pkattname, "pk."); + strcpy(fkattname, "fk."); + sep = "("; + for (i = 0; i < riinfo->nkeys; i++) + { + Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]); + Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]); + Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]); + Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]); + + quoteOneName(pkattname + 3, + RIAttName(pk_rel, riinfo->pk_attnums[i])); + quoteOneName(fkattname + 3, + RIAttName(fk_rel, riinfo->fk_attnums[i])); + ri_GenerateQual(&querybuf, sep, + pkattname, pk_type, + riinfo->pf_eq_oprs[i], + fkattname, fk_type); + if (pk_coll != fk_coll) + ri_GenerateQualCollation(&querybuf, pk_coll); + sep = "AND"; + } + + /* + * Start the WHERE clause with the partition constraint (except if this is + * the default partition and there's no other partition, because the + * partition constraint is the empty string in that case.) + */ + constraintDef = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk"); + if (constraintDef && constraintDef[0] != '\0') + appendStringInfo(&querybuf, ") WHERE %s AND (", + constraintDef); + else + appendStringInfo(&querybuf, ") WHERE ("); + + sep = ""; + for (i = 0; i < riinfo->nkeys; i++) + { + quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i])); + appendStringInfo(&querybuf, + "%sfk.%s IS NOT NULL", + sep, fkattname); + switch (riinfo->confmatchtype) + { + case FKCONSTR_MATCH_SIMPLE: + sep = " AND "; + break; + case FKCONSTR_MATCH_FULL: + sep = " OR "; + break; + } + } + appendStringInfoChar(&querybuf, ')'); + + /* + * Temporarily increase work_mem so that the check query can be executed + * more efficiently. It seems okay to do this because the query is simple + * enough to not use a multiple of work_mem, and one typically would not + * have many large foreign-key validations happening concurrently. So + * this seems to meet the criteria for being considered a "maintenance" + * operation, and accordingly we use maintenance_work_mem. + * + * We use the equivalent of a function SET option to allow the setting to + * persist for exactly the duration of the check query. guc.c also takes + * care of undoing the setting on error. + */ + save_nestlevel = NewGUCNestLevel(); + + snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem); + (void) set_config_option("work_mem", workmembuf, + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_SAVE, true, 0, false); + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + /* + * Generate the plan. We don't need to cache it, and there are no + * arguments to the plan. + */ + qplan = SPI_prepare(querybuf.data, 0, NULL); + + if (qplan == NULL) + elog(ERROR, "SPI_prepare returned %s for %s", + SPI_result_code_string(SPI_result), querybuf.data); + + /* + * Run the plan. For safety we force a current snapshot to be used. (In + * transaction-snapshot mode, this arguably violates transaction isolation + * rules, but we really haven't got much choice.) We don't need to + * register the snapshot, because SPI_execute_snapshot will see to it. We + * need at most one tuple returned, so pass limit = 1. + */ + spi_result = SPI_execute_snapshot(qplan, + NULL, NULL, + GetLatestSnapshot(), + InvalidSnapshot, + true, false, 1); + + /* Check result */ + if (spi_result != SPI_OK_SELECT) + elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result)); + + /* Did we find a tuple that would violate the constraint? */ + if (SPI_processed > 0) + { + TupleTableSlot *slot; + HeapTuple tuple = SPI_tuptable->vals[0]; + TupleDesc tupdesc = SPI_tuptable->tupdesc; + RI_ConstraintInfo fake_riinfo; + + slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); + + heap_deform_tuple(tuple, tupdesc, + slot->tts_values, slot->tts_isnull); + ExecStoreVirtualTuple(slot); + + /* + * The columns to look at in the result tuple are 1..N, not whatever + * they are in the fk_rel. Hack up riinfo so that ri_ReportViolation + * will behave properly. + * + * In addition to this, we have to pass the correct tupdesc to + * ri_ReportViolation, overriding its normal habit of using the pk_rel + * or fk_rel's tupdesc. + */ + memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo)); + for (i = 0; i < fake_riinfo.nkeys; i++) + fake_riinfo.pk_attnums[i] = i + 1; + + ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel, + slot, tupdesc, 0, true); + } + + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); + + /* + * Restore work_mem. + */ + AtEOXact_GUC(true, save_nestlevel); +} + /* ---------- * Local functions below @@ -2078,7 +2298,7 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo, pk_rel, fk_rel, newslot ? newslot : oldslot, NULL, - qkey->constr_queryno); + qkey->constr_queryno, false); return SPI_processed != 0; } @@ -2119,7 +2339,7 @@ static void ri_ReportViolation(const RI_ConstraintInfo *riinfo, Relation pk_rel, Relation fk_rel, TupleTableSlot *violatorslot, TupleDesc tupdesc, - int queryno) + int queryno, bool partgone) { StringInfoData key_names; StringInfoData key_values; @@ -2158,9 +2378,13 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo, * * Check table-level permissions next and, failing that, column-level * privileges. + * + * When a partition at the referenced side is being detached/dropped, we + * needn't check, since the user must be the table owner anyway. */ - - if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED) + if (partgone) + has_perm = true; + else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED) { aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT); if (aclresult != ACLCHECK_OK) @@ -2222,7 +2446,16 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo, } } - if (onfk) + if (partgone) + ereport(ERROR, + (errcode(ERRCODE_FOREIGN_KEY_VIOLATION), + errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"", + RelationGetRelationName(pk_rel), + NameStr(riinfo->conname)), + errdetail("Key (%s)=(%s) still referenced from table \"%s\".", + key_names.data, key_values.data, + RelationGetRelationName(fk_rel)))); + else if (onfk) ereport(ERROR, (errcode(ERRCODE_FOREIGN_KEY_VIOLATION), errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"", |