diff options
author | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2019-04-03 14:38:20 -0300 |
---|---|---|
committer | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2019-04-03 14:40:21 -0300 |
commit | f56f8f8da6afd8523b4d5284e02a20ed2b33ef8d (patch) | |
tree | e5f59afa60601ff9c2e92d7746df6dba57b73c99 /src/backend/utils/adt/ri_triggers.c | |
parent | 9155580fd5fc2a0cbb23376dfca7cd21f59c2c7b (diff) | |
download | postgresql-f56f8f8da6afd8523b4d5284e02a20ed2b33ef8d.tar.gz postgresql-f56f8f8da6afd8523b4d5284e02a20ed2b33ef8d.zip |
Support foreign keys that reference partitioned tables
Previously, while primary keys could be made on partitioned tables, it
was not possible to define foreign keys that reference those primary
keys. Now it is possible to do that.
Author: Álvaro Herrera
Reviewed-by: Amit Langote, Jesper Pedersen
Discussion: https://postgr.es/m/20181102234158.735b3fevta63msbj@alvherre.pgsql
Diffstat (limited to 'src/backend/utils/adt/ri_triggers.c')
-rw-r--r-- | src/backend/utils/adt/ri_triggers.c | 263 |
1 files changed, 248 insertions, 15 deletions
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c index 72f8a9d69cf..095334b3363 100644 --- a/src/backend/utils/adt/ri_triggers.c +++ b/src/backend/utils/adt/ri_triggers.c @@ -50,6 +50,7 @@ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/rls.h" +#include "utils/ruleutils.h" #include "utils/snapmgr.h" #include "utils/syscache.h" @@ -220,8 +221,8 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot, Datum *vals, char *nulls); static void ri_ReportViolation(const RI_ConstraintInfo *riinfo, Relation pk_rel, Relation fk_rel, - TupleTableSlot *violator, TupleDesc tupdesc, - int queryno) pg_attribute_noreturn(); + TupleTableSlot *violatorslot, TupleDesc tupdesc, + int queryno, bool partgone) pg_attribute_noreturn(); /* @@ -348,18 +349,22 @@ RI_FKey_check(TriggerData *trigdata) char paramname[16]; const char *querysep; Oid queryoids[RI_MAX_NUMKEYS]; + const char *pk_only; /* ---------- * The query string built is - * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...] + * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * corresponding FK attributes. * ---------- */ initStringInfo(&querybuf); + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); querysep = "WHERE"; for (int i = 0; i < riinfo->nkeys; i++) { @@ -471,19 +476,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, char attname[MAX_QUOTED_NAME_LEN]; char paramname[16]; const char *querysep; + const char *pk_only; Oid queryoids[RI_MAX_NUMKEYS]; /* ---------- * The query string built is - * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...] + * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * PK attributes themselves. * ---------- */ initStringInfo(&querybuf); + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); querysep = "WHERE"; for (int i = 0; i < riinfo->nkeys; i++) { @@ -1293,6 +1302,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) RangeTblEntry *fkrte; const char *sep; const char *fk_only; + const char *pk_only; int save_nestlevel; char workmembuf[32]; int spi_result; @@ -1350,7 +1360,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) /*---------- * The query string built is: * SELECT fk.keycols FROM [ONLY] relname fk - * LEFT OUTER JOIN ONLY pkrelname pk + * LEFT OUTER JOIN [ONLY] pkrelname pk * ON (pk.pkkeycol1=fk.keycol1 [AND ...]) * WHERE pk.pkkeycol1 IS NULL AND * For MATCH SIMPLE: @@ -1377,9 +1387,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) quoteRelationName(fkrelname, fk_rel); fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? "" : "ONLY "; + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; appendStringInfo(&querybuf, - " FROM %s%s fk LEFT OUTER JOIN ONLY %s pk ON", - fk_only, fkrelname, pkrelname); + " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON", + fk_only, fkrelname, pk_only, pkrelname); strcpy(pkattname, "pk."); strcpy(fkattname, "fk."); @@ -1530,7 +1542,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel, slot, tupdesc, - RI_PLAN_CHECK_LOOKUPPK); + RI_PLAN_CHECK_LOOKUPPK, false); ExecDropSingleTupleTableSlot(slot); } @@ -1546,6 +1558,214 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) return true; } +/* + * RI_PartitionRemove_Check - + * + * Verify no referencing values exist, when a partition is detached on + * the referenced side of a foreign key constraint. + */ +void +RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) +{ + const RI_ConstraintInfo *riinfo; + StringInfoData querybuf; + char *constraintDef; + char pkrelname[MAX_QUOTED_REL_NAME_LEN]; + char fkrelname[MAX_QUOTED_REL_NAME_LEN]; + char pkattname[MAX_QUOTED_NAME_LEN + 3]; + char fkattname[MAX_QUOTED_NAME_LEN + 3]; + const char *sep; + const char *fk_only; + int save_nestlevel; + char workmembuf[32]; + int spi_result; + SPIPlanPtr qplan; + int i; + + riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false); + + /* + * We don't check permissions before displaying the error message, on the + * assumption that the user detaching the partition must have enough + * privileges to examine the table contents anyhow. + */ + + /*---------- + * The query string built is: + * SELECT fk.keycols FROM [ONLY] relname fk + * JOIN pkrelname pk + * ON (pk.pkkeycol1=fk.keycol1 [AND ...]) + * WHERE (<partition constraint>) AND + * For MATCH SIMPLE: + * (fk.keycol1 IS NOT NULL [AND ...]) + * For MATCH FULL: + * (fk.keycol1 IS NOT NULL [OR ...]) + * + * We attach COLLATE clauses to the operators when comparing columns + * that have different collations. + *---------- + */ + initStringInfo(&querybuf); + appendStringInfoString(&querybuf, "SELECT "); + sep = ""; + for (i = 0; i < riinfo->nkeys; i++) + { + quoteOneName(fkattname, + RIAttName(fk_rel, riinfo->fk_attnums[i])); + appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname); + sep = ", "; + } + + quoteRelationName(pkrelname, pk_rel); + quoteRelationName(fkrelname, fk_rel); + fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; + appendStringInfo(&querybuf, + " FROM %s%s fk JOIN %s pk ON", + fk_only, fkrelname, pkrelname); + strcpy(pkattname, "pk."); + strcpy(fkattname, "fk."); + sep = "("; + for (i = 0; i < riinfo->nkeys; i++) + { + Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]); + Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]); + Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]); + Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]); + + quoteOneName(pkattname + 3, + RIAttName(pk_rel, riinfo->pk_attnums[i])); + quoteOneName(fkattname + 3, + RIAttName(fk_rel, riinfo->fk_attnums[i])); + ri_GenerateQual(&querybuf, sep, + pkattname, pk_type, + riinfo->pf_eq_oprs[i], + fkattname, fk_type); + if (pk_coll != fk_coll) + ri_GenerateQualCollation(&querybuf, pk_coll); + sep = "AND"; + } + + /* + * Start the WHERE clause with the partition constraint (except if this is + * the default partition and there's no other partition, because the + * partition constraint is the empty string in that case.) + */ + constraintDef = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk"); + if (constraintDef && constraintDef[0] != '\0') + appendStringInfo(&querybuf, ") WHERE %s AND (", + constraintDef); + else + appendStringInfo(&querybuf, ") WHERE ("); + + sep = ""; + for (i = 0; i < riinfo->nkeys; i++) + { + quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i])); + appendStringInfo(&querybuf, + "%sfk.%s IS NOT NULL", + sep, fkattname); + switch (riinfo->confmatchtype) + { + case FKCONSTR_MATCH_SIMPLE: + sep = " AND "; + break; + case FKCONSTR_MATCH_FULL: + sep = " OR "; + break; + } + } + appendStringInfoChar(&querybuf, ')'); + + /* + * Temporarily increase work_mem so that the check query can be executed + * more efficiently. It seems okay to do this because the query is simple + * enough to not use a multiple of work_mem, and one typically would not + * have many large foreign-key validations happening concurrently. So + * this seems to meet the criteria for being considered a "maintenance" + * operation, and accordingly we use maintenance_work_mem. + * + * We use the equivalent of a function SET option to allow the setting to + * persist for exactly the duration of the check query. guc.c also takes + * care of undoing the setting on error. + */ + save_nestlevel = NewGUCNestLevel(); + + snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem); + (void) set_config_option("work_mem", workmembuf, + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_SAVE, true, 0, false); + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + /* + * Generate the plan. We don't need to cache it, and there are no + * arguments to the plan. + */ + qplan = SPI_prepare(querybuf.data, 0, NULL); + + if (qplan == NULL) + elog(ERROR, "SPI_prepare returned %s for %s", + SPI_result_code_string(SPI_result), querybuf.data); + + /* + * Run the plan. For safety we force a current snapshot to be used. (In + * transaction-snapshot mode, this arguably violates transaction isolation + * rules, but we really haven't got much choice.) We don't need to + * register the snapshot, because SPI_execute_snapshot will see to it. We + * need at most one tuple returned, so pass limit = 1. + */ + spi_result = SPI_execute_snapshot(qplan, + NULL, NULL, + GetLatestSnapshot(), + InvalidSnapshot, + true, false, 1); + + /* Check result */ + if (spi_result != SPI_OK_SELECT) + elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result)); + + /* Did we find a tuple that would violate the constraint? */ + if (SPI_processed > 0) + { + TupleTableSlot *slot; + HeapTuple tuple = SPI_tuptable->vals[0]; + TupleDesc tupdesc = SPI_tuptable->tupdesc; + RI_ConstraintInfo fake_riinfo; + + slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); + + heap_deform_tuple(tuple, tupdesc, + slot->tts_values, slot->tts_isnull); + ExecStoreVirtualTuple(slot); + + /* + * The columns to look at in the result tuple are 1..N, not whatever + * they are in the fk_rel. Hack up riinfo so that ri_ReportViolation + * will behave properly. + * + * In addition to this, we have to pass the correct tupdesc to + * ri_ReportViolation, overriding its normal habit of using the pk_rel + * or fk_rel's tupdesc. + */ + memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo)); + for (i = 0; i < fake_riinfo.nkeys; i++) + fake_riinfo.pk_attnums[i] = i + 1; + + ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel, + slot, tupdesc, 0, true); + } + + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); + + /* + * Restore work_mem. + */ + AtEOXact_GUC(true, save_nestlevel); +} + /* ---------- * Local functions below @@ -2078,7 +2298,7 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo, pk_rel, fk_rel, newslot ? newslot : oldslot, NULL, - qkey->constr_queryno); + qkey->constr_queryno, false); return SPI_processed != 0; } @@ -2119,7 +2339,7 @@ static void ri_ReportViolation(const RI_ConstraintInfo *riinfo, Relation pk_rel, Relation fk_rel, TupleTableSlot *violatorslot, TupleDesc tupdesc, - int queryno) + int queryno, bool partgone) { StringInfoData key_names; StringInfoData key_values; @@ -2158,9 +2378,13 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo, * * Check table-level permissions next and, failing that, column-level * privileges. + * + * When a partition at the referenced side is being detached/dropped, we + * needn't check, since the user must be the table owner anyway. */ - - if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED) + if (partgone) + has_perm = true; + else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED) { aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT); if (aclresult != ACLCHECK_OK) @@ -2222,7 +2446,16 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo, } } - if (onfk) + if (partgone) + ereport(ERROR, + (errcode(ERRCODE_FOREIGN_KEY_VIOLATION), + errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"", + RelationGetRelationName(pk_rel), + NameStr(riinfo->conname)), + errdetail("Key (%s)=(%s) still referenced from table \"%s\".", + key_names.data, key_values.data, + RelationGetRelationName(fk_rel)))); + else if (onfk) ereport(ERROR, (errcode(ERRCODE_FOREIGN_KEY_VIOLATION), errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"", |