aboutsummaryrefslogtreecommitdiff
path: root/src/backend/catalog/pg_constraint.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/catalog/pg_constraint.c')
-rw-r--r--src/backend/catalog/pg_constraint.c237
1 files changed, 237 insertions, 0 deletions
diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c
index 4f1a27a7d34..153522782d4 100644
--- a/src/backend/catalog/pg_constraint.c
+++ b/src/backend/catalog/pg_constraint.c
@@ -26,6 +26,7 @@
#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
+#include "commands/tablecmds.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@@ -377,6 +378,242 @@ CreateConstraintEntry(const char *constraintName,
return conOid;
}
+/*
+ * CloneForeignKeyConstraints
+ * Clone foreign keys from a partitioned table to a newly acquired
+ * partition.
+ *
+ * relationId is a partition of parentId, so we can be certain that it has the
+ * same columns with the same datatypes. The columns may be in different
+ * order, though.
+ *
+ * The *cloned list is appended ClonedConstraint elements describing what was
+ * created.
+ */
+void
+CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
+{
+ Relation pg_constraint;
+ Relation parentRel;
+ Relation rel;
+ ScanKeyData key;
+ SysScanDesc scan;
+ TupleDesc tupdesc;
+ HeapTuple tuple;
+ AttrNumber *attmap;
+
+ parentRel = heap_open(parentId, NoLock); /* already got lock */
+ /* see ATAddForeignKeyConstraint about lock level */
+ rel = heap_open(relationId, AccessExclusiveLock);
+
+ pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
+ tupdesc = RelationGetDescr(pg_constraint);
+
+ /*
+ * The constraint key may differ, if the columns in the partition are
+ * different. This map is used to convert them.
+ */
+ attmap = convert_tuples_by_name_map(RelationGetDescr(rel),
+ RelationGetDescr(parentRel),
+ gettext_noop("could not convert row type"));
+
+ ScanKeyInit(&key,
+ Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
+ F_OIDEQ, ObjectIdGetDatum(parentId));
+ scan = systable_beginscan(pg_constraint, ConstraintRelidIndexId, true,
+ NULL, 1, &key);
+
+ while ((tuple = systable_getnext(scan)) != NULL)
+ {
+ Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+ AttrNumber conkey[INDEX_MAX_KEYS];
+ AttrNumber mapped_conkey[INDEX_MAX_KEYS];
+ AttrNumber confkey[INDEX_MAX_KEYS];
+ Oid conpfeqop[INDEX_MAX_KEYS];
+ Oid conppeqop[INDEX_MAX_KEYS];
+ Oid conffeqop[INDEX_MAX_KEYS];
+ Constraint *fkconstraint;
+ ClonedConstraint *newc;
+ Oid constrOid;
+ ObjectAddress parentAddr,
+ childAddr;
+ int nelem;
+ int i;
+ ArrayType *arr;
+ Datum datum;
+ bool isnull;
+
+ /* only foreign keys */
+ if (constrForm->contype != CONSTRAINT_FOREIGN)
+ continue;
+
+ ObjectAddressSet(parentAddr, ConstraintRelationId,
+ HeapTupleGetOid(tuple));
+
+ datum = fastgetattr(tuple, Anum_pg_constraint_conkey,
+ tupdesc, &isnull);
+ if (isnull)
+ elog(ERROR, "null conkey");
+ arr = DatumGetArrayTypeP(datum);
+ nelem = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ nelem < 1 ||
+ nelem > INDEX_MAX_KEYS ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != INT2OID)
+ elog(ERROR, "conkey is not a 1-D smallint array");
+ memcpy(conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
+
+ for (i = 0; i < nelem; i++)
+ mapped_conkey[i] = attmap[conkey[i] - 1];
+
+ datum = fastgetattr(tuple, Anum_pg_constraint_confkey,
+ tupdesc, &isnull);
+ if (isnull)
+ elog(ERROR, "null confkey");
+ arr = DatumGetArrayTypeP(datum);
+ nelem = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ nelem < 1 ||
+ nelem > INDEX_MAX_KEYS ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != INT2OID)
+ elog(ERROR, "confkey is not a 1-D smallint array");
+ memcpy(confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
+
+ datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
+ tupdesc, &isnull);
+ if (isnull)
+ elog(ERROR, "null conpfeqop");
+ arr = DatumGetArrayTypeP(datum);
+ nelem = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ nelem < 1 ||
+ nelem > INDEX_MAX_KEYS ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != OIDOID)
+ elog(ERROR, "conpfeqop is not a 1-D OID array");
+ memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
+
+ datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
+ tupdesc, &isnull);
+ if (isnull)
+ elog(ERROR, "null conpfeqop");
+ arr = DatumGetArrayTypeP(datum);
+ nelem = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ nelem < 1 ||
+ nelem > INDEX_MAX_KEYS ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != OIDOID)
+ elog(ERROR, "conpfeqop is not a 1-D OID array");
+ memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
+
+ datum = fastgetattr(tuple, Anum_pg_constraint_conppeqop,
+ tupdesc, &isnull);
+ if (isnull)
+ elog(ERROR, "null conppeqop");
+ arr = DatumGetArrayTypeP(datum);
+ nelem = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ nelem < 1 ||
+ nelem > INDEX_MAX_KEYS ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != OIDOID)
+ elog(ERROR, "conppeqop is not a 1-D OID array");
+ memcpy(conppeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
+
+ datum = fastgetattr(tuple, Anum_pg_constraint_conffeqop,
+ tupdesc, &isnull);
+ if (isnull)
+ elog(ERROR, "null conffeqop");
+ arr = DatumGetArrayTypeP(datum);
+ nelem = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ nelem < 1 ||
+ nelem > INDEX_MAX_KEYS ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != OIDOID)
+ elog(ERROR, "conffeqop is not a 1-D OID array");
+ memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
+
+ constrOid =
+ CreateConstraintEntry(NameStr(constrForm->conname),
+ constrForm->connamespace,
+ CONSTRAINT_FOREIGN,
+ constrForm->condeferrable,
+ constrForm->condeferred,
+ constrForm->convalidated,
+ HeapTupleGetOid(tuple),
+ relationId,
+ mapped_conkey,
+ nelem,
+ InvalidOid, /* not a domain constraint */
+ constrForm->conindid, /* same index */
+ constrForm->confrelid, /* same foreign rel */
+ confkey,
+ conpfeqop,
+ conppeqop,
+ conffeqop,
+ nelem,
+ constrForm->confupdtype,
+ constrForm->confdeltype,
+ constrForm->confmatchtype,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ false,
+ 1, false, true);
+
+ ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
+ recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
+
+ fkconstraint = makeNode(Constraint);
+ /* for now this is all we need */
+ fkconstraint->fk_upd_action = constrForm->confupdtype;
+ fkconstraint->fk_del_action = constrForm->confdeltype;
+ fkconstraint->deferrable = constrForm->condeferrable;
+ fkconstraint->initdeferred = constrForm->condeferred;
+
+ createForeignKeyTriggers(rel, constrForm->confrelid, fkconstraint,
+ constrOid, constrForm->conindid, false);
+
+ if (cloned)
+ {
+ /*
+ * Feed back caller about the constraints we created, so that they can
+ * set up constraint verification.
+ */
+ newc = palloc(sizeof(ClonedConstraint));
+ newc->relid = relationId;
+ newc->refrelid = constrForm->confrelid;
+ newc->conindid = constrForm->conindid;
+ newc->conid = constrOid;
+ newc->constraint = fkconstraint;
+
+ *cloned = lappend(*cloned, newc);
+ }
+ }
+ systable_endscan(scan);
+
+ pfree(attmap);
+
+ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ PartitionDesc partdesc = RelationGetPartitionDesc(rel);
+ int i;
+
+ for (i = 0; i < partdesc->nparts; i++)
+ CloneForeignKeyConstraints(RelationGetRelid(rel),
+ partdesc->oids[i],
+ cloned);
+ }
+
+ heap_close(rel, NoLock); /* keep lock till commit */
+ heap_close(parentRel, NoLock);
+ heap_close(pg_constraint, RowShareLock);
+}
/*
* Test whether given name is currently used as a constraint name