aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorAlvaro Herrera <alvherre@alvh.no-ip.org>2011-06-01 18:43:50 -0400
committerAlvaro Herrera <alvherre@alvh.no-ip.org>2011-06-30 11:24:31 -0400
commit897795240cfaaed724af2f53ed2c50c9862f951f (patch)
treea646222fe29936f565e715a1cce3a65016587057 /src/backend
parentb36927fbe922d1aac5d6e42c04eecf65bf37f5f3 (diff)
downloadpostgresql-897795240cfaaed724af2f53ed2c50c9862f951f.tar.gz
postgresql-897795240cfaaed724af2f53ed2c50c9862f951f.zip
Enable CHECK constraints to be declared NOT VALID
This means that they can initially be added to a large existing table without checking its initial contents, but new tuples must comply to them; a separate pass invoked by ALTER TABLE / VALIDATE can verify existing data and ensure it complies with the constraint, at which point it is marked validated and becomes a normal part of the table ecosystem. An non-validated CHECK constraint is ignored in the planner for constraint_exclusion purposes; when validated, cached plans are recomputed so that partitioning starts working right away. This patch also enables domains to have unvalidated CHECK constraints attached to them as well by way of ALTER DOMAIN / ADD CONSTRAINT / NOT VALID, which can later be validated with ALTER DOMAIN / VALIDATE CONSTRAINT. Thanks to Thom Brown, Dean Rasheed and Jaime Casanova for the various reviews, and Robert Hass for documentation wording improvement suggestions. This patch was sponsored by Enova Financial.
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/common/tupdesc.c1
-rw-r--r--src/backend/catalog/heap.c13
-rw-r--r--src/backend/commands/tablecmds.c236
-rw-r--r--src/backend/commands/typecmds.c140
-rw-r--r--src/backend/optimizer/util/plancat.c11
-rw-r--r--src/backend/parser/gram.y12
-rw-r--r--src/backend/tcop/utility.c4
-rw-r--r--src/backend/utils/cache/relcache.c1
8 files changed, 361 insertions, 57 deletions
diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c
index 16979c4ea72..4656dba642c 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -200,6 +200,7 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc)
cpy->check[i].ccname = pstrdup(constr->check[i].ccname);
if (constr->check[i].ccbin)
cpy->check[i].ccbin = pstrdup(constr->check[i].ccbin);
+ cpy->check[i].ccvalid = constr->check[i].ccvalid;
}
}
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index e606ac2b9ed..6b8eb53dd1b 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -98,7 +98,7 @@ static Oid AddNewRelationType(const char *typeName,
Oid new_array_type);
static void RelationRemoveInheritance(Oid relid);
static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
- bool is_local, int inhcount);
+ bool is_validated, bool is_local, int inhcount);
static void StoreConstraints(Relation rel, List *cooked_constraints);
static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
bool allow_merge, bool is_local);
@@ -1846,7 +1846,7 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr)
*/
static void
StoreRelCheck(Relation rel, char *ccname, Node *expr,
- bool is_local, int inhcount)
+ bool is_validated, bool is_local, int inhcount)
{
char *ccbin;
char *ccsrc;
@@ -1907,7 +1907,7 @@ StoreRelCheck(Relation rel, char *ccname, Node *expr,
CONSTRAINT_CHECK, /* Constraint Type */
false, /* Is Deferrable */
false, /* Is Deferred */
- true, /* Is Validated */
+ is_validated,
RelationGetRelid(rel), /* relation */
attNos, /* attrs in the constraint */
keycount, /* # attrs in the constraint */
@@ -1967,7 +1967,7 @@ StoreConstraints(Relation rel, List *cooked_constraints)
StoreAttrDefault(rel, con->attnum, con->expr);
break;
case CONSTR_CHECK:
- StoreRelCheck(rel, con->name, con->expr,
+ StoreRelCheck(rel, con->name, con->expr, !con->skip_validation,
con->is_local, con->inhcount);
numchecks++;
break;
@@ -2081,6 +2081,7 @@ AddRelationNewConstraints(Relation rel,
cooked->name = NULL;
cooked->attnum = colDef->attnum;
cooked->expr = expr;
+ cooked->skip_validation = false;
cooked->is_local = is_local;
cooked->inhcount = is_local ? 0 : 1;
cookedConstraints = lappend(cookedConstraints, cooked);
@@ -2194,7 +2195,8 @@ AddRelationNewConstraints(Relation rel,
/*
* OK, store it.
*/
- StoreRelCheck(rel, ccname, expr, is_local, is_local ? 0 : 1);
+ StoreRelCheck(rel, ccname, expr, !cdef->skip_validation, is_local,
+ is_local ? 0 : 1);
numchecks++;
@@ -2203,6 +2205,7 @@ AddRelationNewConstraints(Relation rel,
cooked->name = ccname;
cooked->attnum = 0;
cooked->expr = expr;
+ cooked->skip_validation = cdef->skip_validation;
cooked->is_local = is_local;
cooked->inhcount = is_local ? 0 : 1;
cookedConstraints = lappend(cookedConstraints, cooked);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index b2ba11cc781..cfc685b9499 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -259,7 +259,8 @@ static void AlterIndexNamespaces(Relation classRel, Relation rel,
static void AlterSeqNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid,
const char *newNspName, LOCKMODE lockmode);
-static void ATExecValidateConstraint(Relation rel, char *constrName);
+static void ATExecValidateConstraint(Relation rel, char *constrName,
+ bool recurse, bool recursing, LOCKMODE lockmode);
static int transformColumnNameList(Oid relId, List *colList,
int16 *attnums, Oid *atttypids);
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
@@ -270,6 +271,7 @@ static Oid transformFkeyCheckAttrs(Relation pkrel,
int numattrs, int16 *attnums,
Oid *opclasses);
static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts);
+static void validateCheckConstraint(Relation rel, HeapTuple constrtup);
static void validateForeignKeyConstraint(char *conname,
Relation rel, Relation pkrel,
Oid pkindOid, Oid constraintOid);
@@ -561,6 +563,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
cooked->name = NULL;
cooked->attnum = attnum;
cooked->expr = colDef->cooked_default;
+ cooked->skip_validation = false;
cooked->is_local = true; /* not used for defaults */
cooked->inhcount = 0; /* ditto */
cookedDefaults = lappend(cookedDefaults, cooked);
@@ -1568,6 +1571,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
cooked->name = pstrdup(name);
cooked->attnum = 0; /* not used for constraints */
cooked->expr = expr;
+ cooked->skip_validation = false;
cooked->is_local = false;
cooked->inhcount = 1;
constraints = lappend(constraints, cooked);
@@ -2933,7 +2937,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
ATPrepAddInherit(rel);
pass = AT_PASS_MISC;
break;
- case AT_ValidateConstraint:
+ case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
+ ATSimplePermissions(rel, ATT_TABLE);
+ /* Recursion occurs during execution phase */
+ /* No command-specific prep needed except saving recurse flag */
+ if (recurse)
+ cmd->subtype = AT_ValidateConstraintRecurse;
+ pass = AT_PASS_MISC;
+ break;
case AT_EnableTrig: /* ENABLE TRIGGER variants */
case AT_EnableAlwaysTrig:
case AT_EnableReplicaTrig:
@@ -3098,8 +3109,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */
ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode);
break;
- case AT_ValidateConstraint:
- ATExecValidateConstraint(rel, cmd->name);
+ case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
+ ATExecValidateConstraint(rel, cmd->name, false, false, lockmode);
+ break;
+ case AT_ValidateConstraintRecurse: /* VALIDATE CONSTRAINT with
+ * recursion */
+ ATExecValidateConstraint(rel, cmd->name, true, false, lockmode);
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
ATExecDropConstraint(rel, cmd->name, cmd->behavior,
@@ -5384,19 +5399,23 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
list_make1(copyObject(constr)),
recursing, !recursing);
- /* Add each constraint to Phase 3's queue */
+ /* Add each to-be-validated constraint to Phase 3's queue */
foreach(lcon, newcons)
{
CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
- NewConstraint *newcon;
- newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
- newcon->name = ccon->name;
- newcon->contype = ccon->contype;
- /* ExecQual wants implicit-AND format */
- newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr);
+ if (!ccon->skip_validation)
+ {
+ NewConstraint *newcon;
- tab->constraints = lappend(tab->constraints, newcon);
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+ newcon->name = ccon->name;
+ newcon->contype = ccon->contype;
+ /* ExecQual wants implicit-AND format */
+ newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr);
+
+ tab->constraints = lappend(tab->constraints, newcon);
+ }
/* Save the actually assigned name if it was defaulted */
if (constr->conname == NULL)
@@ -5755,9 +5774,15 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
/*
* ALTER TABLE VALIDATE CONSTRAINT
+ *
+ * XXX The reason we handle recursion here rather than at Phase 1 is because
+ * there's no good way to skip recursing when handling foreign keys: there is
+ * no need to lock children in that case, yet we wouldn't be able to avoid
+ * doing so at that level.
*/
static void
-ATExecValidateConstraint(Relation rel, char *constrName)
+ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
+ bool recursing, LOCKMODE lockmode)
{
Relation conrel;
SysScanDesc scan;
@@ -5781,8 +5806,7 @@ ATExecValidateConstraint(Relation rel, char *constrName)
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
con = (Form_pg_constraint) GETSTRUCT(tuple);
- if (con->contype == CONSTRAINT_FOREIGN &&
- strcmp(NameStr(con->conname), constrName) == 0)
+ if (strcmp(NameStr(con->conname), constrName) == 0)
{
found = true;
break;
@@ -5792,39 +5816,110 @@ ATExecValidateConstraint(Relation rel, char *constrName)
if (!found)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("foreign key constraint \"%s\" of relation \"%s\" does not exist",
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist",
+ constrName, RelationGetRelationName(rel))));
+
+ if (con->contype != CONSTRAINT_FOREIGN &&
+ con->contype != CONSTRAINT_CHECK)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key or check constraint",
constrName, RelationGetRelationName(rel))));
if (!con->convalidated)
{
- Oid conid = HeapTupleGetOid(tuple);
- HeapTuple copyTuple = heap_copytuple(tuple);
- Form_pg_constraint copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
- Relation refrel;
+ HeapTuple copyTuple;
+ Form_pg_constraint copy_con;
- /*
- * Triggers are already in place on both tables, so a concurrent write
- * that alters the result here is not possible. Normally we can run a
- * query here to do the validation, which would only require
- * AccessShareLock. In some cases, it is possible that we might need
- * to fire triggers to perform the check, so we take a lock at
- * RowShareLock level just in case.
- */
- refrel = heap_open(con->confrelid, RowShareLock);
+ if (con->contype == CONSTRAINT_FOREIGN)
+ {
+ Oid conid = HeapTupleGetOid(tuple);
+ Relation refrel;
- validateForeignKeyConstraint(constrName, rel, refrel,
- con->conindid,
- conid);
+ /*
+ * Triggers are already in place on both tables, so a concurrent write
+ * that alters the result here is not possible. Normally we can run a
+ * query here to do the validation, which would only require
+ * AccessShareLock. In some cases, it is possible that we might need
+ * to fire triggers to perform the check, so we take a lock at
+ * RowShareLock level just in case.
+ */
+ refrel = heap_open(con->confrelid, RowShareLock);
+
+ validateForeignKeyConstraint(constrName, rel, refrel,
+ con->conindid,
+ conid);
+ heap_close(refrel, NoLock);
+
+ /*
+ * Foreign keys do not inherit, so we purposedly ignore the
+ * recursion bit here
+ */
+ }
+ else if (con->contype == CONSTRAINT_CHECK)
+ {
+ List *children = NIL;
+ ListCell *child;
+
+ /*
+ * If we're recursing, the parent has already done this, so skip
+ * it.
+ */
+ if (!recursing)
+ children = find_all_inheritors(RelationGetRelid(rel),
+ lockmode, NULL);
+
+ /*
+ * For CHECK constraints, we must ensure that we only mark the
+ * constraint as validated on the parent if it's already validated
+ * on the children.
+ *
+ * We recurse before validating on the parent, to reduce risk of
+ * deadlocks.
+ */
+ foreach(child, children)
+ {
+ Oid childoid = lfirst_oid(child);
+ Relation childrel;
+
+ if (childoid == RelationGetRelid(rel))
+ continue;
+
+ /*
+ * If we are told not to recurse, there had better not be any
+ * child tables; else the addition would put them out of step.
+ */
+ if (!recurse)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("constraint must be validated on child tables too")));
+
+ /* find_all_inheritors already got lock */
+ childrel = heap_open(childoid, NoLock);
+
+ ATExecValidateConstraint(childrel, constrName, false,
+ true, lockmode);
+ heap_close(childrel, NoLock);
+ }
+
+ validateCheckConstraint(rel, tuple);
+
+ /*
+ * Invalidate relcache so that others see the new validated
+ * constraint.
+ */
+ CacheInvalidateRelcache(rel);
+ }
/*
* Now update the catalog, while we have the door open.
*/
+ copyTuple = heap_copytuple(tuple);
+ copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
copy_con->convalidated = true;
simple_heap_update(conrel, &copyTuple->t_self, copyTuple);
CatalogUpdateIndexes(conrel, copyTuple);
heap_freetuple(copyTuple);
-
- heap_close(refrel, NoLock);
}
systable_endscan(scan);
@@ -6131,6 +6226,79 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
}
/*
+ * Scan the existing rows in a table to verify they meet a proposed
+ * CHECK constraint.
+ *
+ * The caller must have opened and locked the relation appropriately.
+ */
+static void
+validateCheckConstraint(Relation rel, HeapTuple constrtup)
+{
+ EState *estate;
+ Datum val;
+ char *conbin;
+ Expr *origexpr;
+ List *exprstate;
+ TupleDesc tupdesc;
+ HeapScanDesc scan;
+ HeapTuple tuple;
+ ExprContext *econtext;
+ MemoryContext oldcxt;
+ TupleTableSlot *slot;
+ Form_pg_constraint constrForm;
+ bool isnull;
+
+ constrForm = (Form_pg_constraint) GETSTRUCT(constrtup);
+
+ estate = CreateExecutorState();
+ /*
+ * XXX this tuple doesn't really come from a syscache, but this doesn't
+ * matter to SysCacheGetAttr, because it only wants to be able to fetch the
+ * tupdesc
+ */
+ val = SysCacheGetAttr(CONSTROID, constrtup, Anum_pg_constraint_conbin,
+ &isnull);
+ if (isnull)
+ elog(ERROR, "null conbin for constraint %u",
+ HeapTupleGetOid(constrtup));
+ conbin = TextDatumGetCString(val);
+ origexpr = (Expr *) stringToNode(conbin);
+ exprstate = (List *)
+ ExecPrepareExpr((Expr *) make_ands_implicit(origexpr), estate);
+
+ econtext = GetPerTupleExprContext(estate);
+ tupdesc = RelationGetDescr(rel);
+ slot = MakeSingleTupleTableSlot(tupdesc);
+ econtext->ecxt_scantuple = slot;
+
+ scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+
+ /*
+ * Switch to per-tuple memory context and reset it for each tuple
+ * produced, so we don't leak memory.
+ */
+ oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+
+ if (!ExecQual(exprstate, econtext, true))
+ ereport(ERROR,
+ (errcode(ERRCODE_CHECK_VIOLATION),
+ errmsg("check constraint \"%s\" is violated by some row",
+ NameStr(constrForm->conname))));
+
+ ResetExprContext(econtext);
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+ heap_endscan(scan);
+ ExecDropSingleTupleTableSlot(slot);
+ FreeExecutorState(estate);
+}
+
+/*
* Scan the existing rows in a table to verify they meet a proposed FK
* constraint.
*
diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c
index 66c11de6723..437d23a8102 100644
--- a/src/backend/commands/typecmds.c
+++ b/src/backend/commands/typecmds.c
@@ -86,6 +86,7 @@ static Oid findTypeSendFunction(List *procname, Oid typeOid);
static Oid findTypeTypmodinFunction(List *procname);
static Oid findTypeTypmodoutFunction(List *procname);
static Oid findTypeAnalyzeFunction(List *procname, Oid typeOid);
+static void validateDomainConstraint(Oid domainoid, char *ccbin);
static List *get_rels_with_domain(Oid domainOid, LOCKMODE lockmode);
static void checkDomainOwner(HeapTuple tup);
static void checkEnumOwner(HeapTuple tup);
@@ -1941,14 +1942,8 @@ AlterDomainAddConstraint(List *names, Node *newConstraint)
Relation typrel;
HeapTuple tup;
Form_pg_type typTup;
- List *rels;
- ListCell *rt;
- EState *estate;
- ExprContext *econtext;
- char *ccbin;
- Expr *expr;
- ExprState *exprstate;
Constraint *constr;
+ char *ccbin;
/* Make a TypeName so we can use standard type lookup machinery */
typename = makeTypeNameFromNameList(names);
@@ -2027,10 +2022,129 @@ AlterDomainAddConstraint(List *names, Node *newConstraint)
constr, NameStr(typTup->typname));
/*
- * Test all values stored in the attributes based on the domain the
- * constraint is being added to.
+ * If requested to validate the constraint, test all values stored in the
+ * attributes based on the domain the constraint is being added to.
*/
- expr = (Expr *) stringToNode(ccbin);
+ if (!constr->skip_validation)
+ validateDomainConstraint(domainoid, ccbin);
+
+ /* Clean up */
+ heap_close(typrel, RowExclusiveLock);
+}
+
+/*
+ * AlterDomainValidateConstraint
+ *
+ * Implements the ALTER DOMAIN .. VALIDATE CONSTRAINT statement.
+ */
+void
+AlterDomainValidateConstraint(List *names, char *constrName)
+{
+ TypeName *typename;
+ Oid domainoid;
+ Relation typrel;
+ Relation conrel;
+ HeapTuple tup;
+ Form_pg_type typTup;
+ Form_pg_constraint con;
+ Form_pg_constraint copy_con;
+ char *conbin;
+ SysScanDesc scan;
+ Datum val;
+ bool found = false;
+ bool isnull;
+ HeapTuple tuple;
+ HeapTuple copyTuple;
+ ScanKeyData key;
+
+ /* Make a TypeName so we can use standard type lookup machinery */
+ typename = makeTypeNameFromNameList(names);
+ domainoid = typenameTypeId(NULL, typename);
+
+ /* Look up the domain in the type table */
+ typrel = heap_open(TypeRelationId, AccessShareLock);
+
+ tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(domainoid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for type %u", domainoid);
+ typTup = (Form_pg_type) GETSTRUCT(tup);
+
+ /* Check it's a domain and check user has permission for ALTER DOMAIN */
+ checkDomainOwner(tup);
+
+ /*
+ * Find and check the target constraint
+ */
+ conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
+ ScanKeyInit(&key,
+ Anum_pg_constraint_contypid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(domainoid));
+ scan = systable_beginscan(conrel, ConstraintTypidIndexId,
+ true, SnapshotNow, 1, &key);
+
+ while (HeapTupleIsValid(tuple = systable_getnext(scan)))
+ {
+ con = (Form_pg_constraint) GETSTRUCT(tuple);
+ if (strcmp(NameStr(con->conname), constrName) == 0)
+ {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ {
+ con = NULL; /* keep compiler quiet */
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("constraint \"%s\" of domain \"%s\" does not exist",
+ constrName, NameStr(con->conname))));
+ }
+
+ if (con->contype != CONSTRAINT_CHECK)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("constraint \"%s\" of domain \"%s\" is not a check constraint",
+ constrName, NameStr(con->conname))));
+
+ val = SysCacheGetAttr(CONSTROID, tuple,
+ Anum_pg_constraint_conbin,
+ &isnull);
+ if (isnull)
+ elog(ERROR, "null conbin for constraint %u",
+ HeapTupleGetOid(tuple));
+ conbin = TextDatumGetCString(val);
+
+ validateDomainConstraint(domainoid, conbin);
+
+ /*
+ * Now update the catalog, while we have the door open.
+ */
+ copyTuple = heap_copytuple(tuple);
+ copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
+ copy_con->convalidated = true;
+ simple_heap_update(conrel, &copyTuple->t_self, copyTuple);
+ CatalogUpdateIndexes(conrel, copyTuple);
+ heap_freetuple(copyTuple);
+
+ systable_endscan(scan);
+
+ heap_close(typrel, AccessShareLock);
+ heap_close(conrel, RowExclusiveLock);
+
+ ReleaseSysCache(tup);
+}
+
+static void
+validateDomainConstraint(Oid domainoid, char *ccbin)
+{
+ Expr *expr = (Expr *) stringToNode(ccbin);
+ List *rels;
+ ListCell *rt;
+ EState *estate;
+ ExprContext *econtext;
+ ExprState *exprstate;
/* Need an EState to run ExecEvalExpr */
estate = CreateExecutorState();
@@ -2092,11 +2206,7 @@ AlterDomainAddConstraint(List *names, Node *newConstraint)
}
FreeExecutorState(estate);
-
- /* Clean up */
- heap_close(typrel, RowExclusiveLock);
}
-
/*
* get_rels_with_domain
*
@@ -2416,7 +2526,7 @@ domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid,
CONSTRAINT_CHECK, /* Constraint Type */
false, /* Is Deferrable */
false, /* Is Deferred */
- true, /* Is Validated */
+ !constr->skip_validation, /* Is Validated */
InvalidOid, /* not a relation constraint */
NULL,
0,
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index b28681630b3..0ec17505a7d 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -552,7 +552,7 @@ get_relation_data_width(Oid relid, int32 *attr_widths)
/*
* get_relation_constraints
*
- * Retrieve the CHECK constraint expressions of the given relation.
+ * Retrieve the validated CHECK constraint expressions of the given relation.
*
* Returns a List (possibly empty) of constraint expressions. Each one
* has been canonicalized, and its Vars are changed to have the varno
@@ -591,6 +591,13 @@ get_relation_constraints(PlannerInfo *root,
{
Node *cexpr;
+ /*
+ * If this constraint hasn't been fully validated yet, we must
+ * ignore it here.
+ */
+ if (!constr->check[i].ccvalid)
+ continue;
+
cexpr = stringToNode(constr->check[i].ccbin);
/*
@@ -663,7 +670,7 @@ get_relation_constraints(PlannerInfo *root,
*
* Detect whether the relation need not be scanned because it has either
* self-inconsistent restrictions, or restrictions inconsistent with the
- * relation's CHECK constraints.
+ * relation's validated CHECK constraints.
*
* Note: this examines only rel->relid, rel->reloptkind, and
* rel->baserestrictinfo; therefore it can be called before filling in
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 62cff8a7de3..72260320c38 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -2759,8 +2759,9 @@ ConstraintElem:
n->raw_expr = $3;
n->cooked_expr = NULL;
processCASbits($5, @5, "CHECK",
- NULL, NULL, NULL,
+ NULL, NULL, &n->skip_validation,
yyscanner);
+ n->initially_valid = !n->skip_validation;
$$ = (Node *)n;
}
| UNIQUE '(' columnList ')' opt_definition OptConsTableSpace
@@ -7559,6 +7560,15 @@ AlterDomainStmt:
n->behavior = $7;
$$ = (Node *)n;
}
+ /* ALTER DOMAIN <domain> VALIDATE CONSTRAINT <name> */
+ | ALTER DOMAIN_P any_name VALIDATE CONSTRAINT name
+ {
+ AlterDomainStmt *n = makeNode(AlterDomainStmt);
+ n->subtype = 'V';
+ n->typeName = $3;
+ n->name = $6;
+ $$ = (Node *)n;
+ }
;
opt_as: AS {}
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 0559998c71f..224e1f3761d 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -820,6 +820,10 @@ standard_ProcessUtility(Node *parsetree,
stmt->name,
stmt->behavior);
break;
+ case 'V': /* VALIDATE CONSTRAINT */
+ AlterDomainValidateConstraint(stmt->typeName,
+ stmt->name);
+ break;
default: /* oops */
elog(ERROR, "unrecognized alter domain type: %d",
(int) stmt->subtype);
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 1a400a0a576..0b9d77a61d2 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -3251,6 +3251,7 @@ CheckConstraintFetch(Relation relation)
elog(ERROR, "unexpected constraint record found for rel %s",
RelationGetRelationName(relation));
+ check[found].ccvalid = conform->convalidated;
check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
NameStr(conform->conname));