aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
authorAlvaro Herrera <alvherre@alvh.no-ip.org>2011-06-01 18:43:50 -0400
committerAlvaro Herrera <alvherre@alvh.no-ip.org>2011-06-30 11:24:31 -0400
commit897795240cfaaed724af2f53ed2c50c9862f951f (patch)
treea646222fe29936f565e715a1cce3a65016587057 /src/backend/commands/tablecmds.c
parentb36927fbe922d1aac5d6e42c04eecf65bf37f5f3 (diff)
downloadpostgresql-897795240cfaaed724af2f53ed2c50c9862f951f.tar.gz
postgresql-897795240cfaaed724af2f53ed2c50c9862f951f.zip
Enable CHECK constraints to be declared NOT VALID
This means that they can initially be added to a large existing table without checking its initial contents, but new tuples must comply to them; a separate pass invoked by ALTER TABLE / VALIDATE can verify existing data and ensure it complies with the constraint, at which point it is marked validated and becomes a normal part of the table ecosystem. An non-validated CHECK constraint is ignored in the planner for constraint_exclusion purposes; when validated, cached plans are recomputed so that partitioning starts working right away. This patch also enables domains to have unvalidated CHECK constraints attached to them as well by way of ALTER DOMAIN / ADD CONSTRAINT / NOT VALID, which can later be validated with ALTER DOMAIN / VALIDATE CONSTRAINT. Thanks to Thom Brown, Dean Rasheed and Jaime Casanova for the various reviews, and Robert Hass for documentation wording improvement suggestions. This patch was sponsored by Enova Financial.
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c236
1 files changed, 202 insertions, 34 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index b2ba11cc781..cfc685b9499 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -259,7 +259,8 @@ static void AlterIndexNamespaces(Relation classRel, Relation rel,
static void AlterSeqNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid,
const char *newNspName, LOCKMODE lockmode);
-static void ATExecValidateConstraint(Relation rel, char *constrName);
+static void ATExecValidateConstraint(Relation rel, char *constrName,
+ bool recurse, bool recursing, LOCKMODE lockmode);
static int transformColumnNameList(Oid relId, List *colList,
int16 *attnums, Oid *atttypids);
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
@@ -270,6 +271,7 @@ static Oid transformFkeyCheckAttrs(Relation pkrel,
int numattrs, int16 *attnums,
Oid *opclasses);
static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts);
+static void validateCheckConstraint(Relation rel, HeapTuple constrtup);
static void validateForeignKeyConstraint(char *conname,
Relation rel, Relation pkrel,
Oid pkindOid, Oid constraintOid);
@@ -561,6 +563,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
cooked->name = NULL;
cooked->attnum = attnum;
cooked->expr = colDef->cooked_default;
+ cooked->skip_validation = false;
cooked->is_local = true; /* not used for defaults */
cooked->inhcount = 0; /* ditto */
cookedDefaults = lappend(cookedDefaults, cooked);
@@ -1568,6 +1571,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
cooked->name = pstrdup(name);
cooked->attnum = 0; /* not used for constraints */
cooked->expr = expr;
+ cooked->skip_validation = false;
cooked->is_local = false;
cooked->inhcount = 1;
constraints = lappend(constraints, cooked);
@@ -2933,7 +2937,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
ATPrepAddInherit(rel);
pass = AT_PASS_MISC;
break;
- case AT_ValidateConstraint:
+ case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
+ ATSimplePermissions(rel, ATT_TABLE);
+ /* Recursion occurs during execution phase */
+ /* No command-specific prep needed except saving recurse flag */
+ if (recurse)
+ cmd->subtype = AT_ValidateConstraintRecurse;
+ pass = AT_PASS_MISC;
+ break;
case AT_EnableTrig: /* ENABLE TRIGGER variants */
case AT_EnableAlwaysTrig:
case AT_EnableReplicaTrig:
@@ -3098,8 +3109,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */
ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode);
break;
- case AT_ValidateConstraint:
- ATExecValidateConstraint(rel, cmd->name);
+ case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
+ ATExecValidateConstraint(rel, cmd->name, false, false, lockmode);
+ break;
+ case AT_ValidateConstraintRecurse: /* VALIDATE CONSTRAINT with
+ * recursion */
+ ATExecValidateConstraint(rel, cmd->name, true, false, lockmode);
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
ATExecDropConstraint(rel, cmd->name, cmd->behavior,
@@ -5384,19 +5399,23 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
list_make1(copyObject(constr)),
recursing, !recursing);
- /* Add each constraint to Phase 3's queue */
+ /* Add each to-be-validated constraint to Phase 3's queue */
foreach(lcon, newcons)
{
CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
- NewConstraint *newcon;
- newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
- newcon->name = ccon->name;
- newcon->contype = ccon->contype;
- /* ExecQual wants implicit-AND format */
- newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr);
+ if (!ccon->skip_validation)
+ {
+ NewConstraint *newcon;
- tab->constraints = lappend(tab->constraints, newcon);
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+ newcon->name = ccon->name;
+ newcon->contype = ccon->contype;
+ /* ExecQual wants implicit-AND format */
+ newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr);
+
+ tab->constraints = lappend(tab->constraints, newcon);
+ }
/* Save the actually assigned name if it was defaulted */
if (constr->conname == NULL)
@@ -5755,9 +5774,15 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
/*
* ALTER TABLE VALIDATE CONSTRAINT
+ *
+ * XXX The reason we handle recursion here rather than at Phase 1 is because
+ * there's no good way to skip recursing when handling foreign keys: there is
+ * no need to lock children in that case, yet we wouldn't be able to avoid
+ * doing so at that level.
*/
static void
-ATExecValidateConstraint(Relation rel, char *constrName)
+ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
+ bool recursing, LOCKMODE lockmode)
{
Relation conrel;
SysScanDesc scan;
@@ -5781,8 +5806,7 @@ ATExecValidateConstraint(Relation rel, char *constrName)
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
con = (Form_pg_constraint) GETSTRUCT(tuple);
- if (con->contype == CONSTRAINT_FOREIGN &&
- strcmp(NameStr(con->conname), constrName) == 0)
+ if (strcmp(NameStr(con->conname), constrName) == 0)
{
found = true;
break;
@@ -5792,39 +5816,110 @@ ATExecValidateConstraint(Relation rel, char *constrName)
if (!found)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("foreign key constraint \"%s\" of relation \"%s\" does not exist",
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist",
+ constrName, RelationGetRelationName(rel))));
+
+ if (con->contype != CONSTRAINT_FOREIGN &&
+ con->contype != CONSTRAINT_CHECK)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key or check constraint",
constrName, RelationGetRelationName(rel))));
if (!con->convalidated)
{
- Oid conid = HeapTupleGetOid(tuple);
- HeapTuple copyTuple = heap_copytuple(tuple);
- Form_pg_constraint copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
- Relation refrel;
+ HeapTuple copyTuple;
+ Form_pg_constraint copy_con;
- /*
- * Triggers are already in place on both tables, so a concurrent write
- * that alters the result here is not possible. Normally we can run a
- * query here to do the validation, which would only require
- * AccessShareLock. In some cases, it is possible that we might need
- * to fire triggers to perform the check, so we take a lock at
- * RowShareLock level just in case.
- */
- refrel = heap_open(con->confrelid, RowShareLock);
+ if (con->contype == CONSTRAINT_FOREIGN)
+ {
+ Oid conid = HeapTupleGetOid(tuple);
+ Relation refrel;
- validateForeignKeyConstraint(constrName, rel, refrel,
- con->conindid,
- conid);
+ /*
+ * Triggers are already in place on both tables, so a concurrent write
+ * that alters the result here is not possible. Normally we can run a
+ * query here to do the validation, which would only require
+ * AccessShareLock. In some cases, it is possible that we might need
+ * to fire triggers to perform the check, so we take a lock at
+ * RowShareLock level just in case.
+ */
+ refrel = heap_open(con->confrelid, RowShareLock);
+
+ validateForeignKeyConstraint(constrName, rel, refrel,
+ con->conindid,
+ conid);
+ heap_close(refrel, NoLock);
+
+ /*
+ * Foreign keys do not inherit, so we purposedly ignore the
+ * recursion bit here
+ */
+ }
+ else if (con->contype == CONSTRAINT_CHECK)
+ {
+ List *children = NIL;
+ ListCell *child;
+
+ /*
+ * If we're recursing, the parent has already done this, so skip
+ * it.
+ */
+ if (!recursing)
+ children = find_all_inheritors(RelationGetRelid(rel),
+ lockmode, NULL);
+
+ /*
+ * For CHECK constraints, we must ensure that we only mark the
+ * constraint as validated on the parent if it's already validated
+ * on the children.
+ *
+ * We recurse before validating on the parent, to reduce risk of
+ * deadlocks.
+ */
+ foreach(child, children)
+ {
+ Oid childoid = lfirst_oid(child);
+ Relation childrel;
+
+ if (childoid == RelationGetRelid(rel))
+ continue;
+
+ /*
+ * If we are told not to recurse, there had better not be any
+ * child tables; else the addition would put them out of step.
+ */
+ if (!recurse)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("constraint must be validated on child tables too")));
+
+ /* find_all_inheritors already got lock */
+ childrel = heap_open(childoid, NoLock);
+
+ ATExecValidateConstraint(childrel, constrName, false,
+ true, lockmode);
+ heap_close(childrel, NoLock);
+ }
+
+ validateCheckConstraint(rel, tuple);
+
+ /*
+ * Invalidate relcache so that others see the new validated
+ * constraint.
+ */
+ CacheInvalidateRelcache(rel);
+ }
/*
* Now update the catalog, while we have the door open.
*/
+ copyTuple = heap_copytuple(tuple);
+ copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
copy_con->convalidated = true;
simple_heap_update(conrel, &copyTuple->t_self, copyTuple);
CatalogUpdateIndexes(conrel, copyTuple);
heap_freetuple(copyTuple);
-
- heap_close(refrel, NoLock);
}
systable_endscan(scan);
@@ -6131,6 +6226,79 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
}
/*
+ * Scan the existing rows in a table to verify they meet a proposed
+ * CHECK constraint.
+ *
+ * The caller must have opened and locked the relation appropriately.
+ */
+static void
+validateCheckConstraint(Relation rel, HeapTuple constrtup)
+{
+ EState *estate;
+ Datum val;
+ char *conbin;
+ Expr *origexpr;
+ List *exprstate;
+ TupleDesc tupdesc;
+ HeapScanDesc scan;
+ HeapTuple tuple;
+ ExprContext *econtext;
+ MemoryContext oldcxt;
+ TupleTableSlot *slot;
+ Form_pg_constraint constrForm;
+ bool isnull;
+
+ constrForm = (Form_pg_constraint) GETSTRUCT(constrtup);
+
+ estate = CreateExecutorState();
+ /*
+ * XXX this tuple doesn't really come from a syscache, but this doesn't
+ * matter to SysCacheGetAttr, because it only wants to be able to fetch the
+ * tupdesc
+ */
+ val = SysCacheGetAttr(CONSTROID, constrtup, Anum_pg_constraint_conbin,
+ &isnull);
+ if (isnull)
+ elog(ERROR, "null conbin for constraint %u",
+ HeapTupleGetOid(constrtup));
+ conbin = TextDatumGetCString(val);
+ origexpr = (Expr *) stringToNode(conbin);
+ exprstate = (List *)
+ ExecPrepareExpr((Expr *) make_ands_implicit(origexpr), estate);
+
+ econtext = GetPerTupleExprContext(estate);
+ tupdesc = RelationGetDescr(rel);
+ slot = MakeSingleTupleTableSlot(tupdesc);
+ econtext->ecxt_scantuple = slot;
+
+ scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+
+ /*
+ * Switch to per-tuple memory context and reset it for each tuple
+ * produced, so we don't leak memory.
+ */
+ oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+
+ if (!ExecQual(exprstate, econtext, true))
+ ereport(ERROR,
+ (errcode(ERRCODE_CHECK_VIOLATION),
+ errmsg("check constraint \"%s\" is violated by some row",
+ NameStr(constrForm->conname))));
+
+ ResetExprContext(econtext);
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+ heap_endscan(scan);
+ ExecDropSingleTupleTableSlot(slot);
+ FreeExecutorState(estate);
+}
+
+/*
* Scan the existing rows in a table to verify they meet a proposed FK
* constraint.
*