diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/common/tupdesc.c | 1 | ||||
-rw-r--r-- | src/backend/catalog/heap.c | 13 | ||||
-rw-r--r-- | src/backend/commands/tablecmds.c | 236 | ||||
-rw-r--r-- | src/backend/commands/typecmds.c | 140 | ||||
-rw-r--r-- | src/backend/optimizer/util/plancat.c | 11 | ||||
-rw-r--r-- | src/backend/parser/gram.y | 12 | ||||
-rw-r--r-- | src/backend/tcop/utility.c | 4 | ||||
-rw-r--r-- | src/backend/utils/cache/relcache.c | 1 | ||||
-rw-r--r-- | src/include/access/tupdesc.h | 1 | ||||
-rw-r--r-- | src/include/catalog/heap.h | 1 | ||||
-rw-r--r-- | src/include/commands/typecmds.h | 1 | ||||
-rw-r--r-- | src/include/nodes/parsenodes.h | 3 | ||||
-rw-r--r-- | src/test/regress/expected/alter_table.out | 101 | ||||
-rw-r--r-- | src/test/regress/expected/domain.out | 11 | ||||
-rw-r--r-- | src/test/regress/sql/alter_table.sql | 46 | ||||
-rw-r--r-- | src/test/regress/sql/domain.sql | 10 |
16 files changed, 535 insertions, 57 deletions
diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c index 16979c4ea72..4656dba642c 100644 --- a/src/backend/access/common/tupdesc.c +++ b/src/backend/access/common/tupdesc.c @@ -200,6 +200,7 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc) cpy->check[i].ccname = pstrdup(constr->check[i].ccname); if (constr->check[i].ccbin) cpy->check[i].ccbin = pstrdup(constr->check[i].ccbin); + cpy->check[i].ccvalid = constr->check[i].ccvalid; } } diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index e606ac2b9ed..6b8eb53dd1b 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -98,7 +98,7 @@ static Oid AddNewRelationType(const char *typeName, Oid new_array_type); static void RelationRemoveInheritance(Oid relid); static void StoreRelCheck(Relation rel, char *ccname, Node *expr, - bool is_local, int inhcount); + bool is_validated, bool is_local, int inhcount); static void StoreConstraints(Relation rel, List *cooked_constraints); static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr, bool allow_merge, bool is_local); @@ -1846,7 +1846,7 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr) */ static void StoreRelCheck(Relation rel, char *ccname, Node *expr, - bool is_local, int inhcount) + bool is_validated, bool is_local, int inhcount) { char *ccbin; char *ccsrc; @@ -1907,7 +1907,7 @@ StoreRelCheck(Relation rel, char *ccname, Node *expr, CONSTRAINT_CHECK, /* Constraint Type */ false, /* Is Deferrable */ false, /* Is Deferred */ - true, /* Is Validated */ + is_validated, RelationGetRelid(rel), /* relation */ attNos, /* attrs in the constraint */ keycount, /* # attrs in the constraint */ @@ -1967,7 +1967,7 @@ StoreConstraints(Relation rel, List *cooked_constraints) StoreAttrDefault(rel, con->attnum, con->expr); break; case CONSTR_CHECK: - StoreRelCheck(rel, con->name, con->expr, + StoreRelCheck(rel, con->name, con->expr, !con->skip_validation, con->is_local, con->inhcount); numchecks++; break; @@ -2081,6 +2081,7 @@ AddRelationNewConstraints(Relation rel, cooked->name = NULL; cooked->attnum = colDef->attnum; cooked->expr = expr; + cooked->skip_validation = false; cooked->is_local = is_local; cooked->inhcount = is_local ? 0 : 1; cookedConstraints = lappend(cookedConstraints, cooked); @@ -2194,7 +2195,8 @@ AddRelationNewConstraints(Relation rel, /* * OK, store it. */ - StoreRelCheck(rel, ccname, expr, is_local, is_local ? 0 : 1); + StoreRelCheck(rel, ccname, expr, !cdef->skip_validation, is_local, + is_local ? 0 : 1); numchecks++; @@ -2203,6 +2205,7 @@ AddRelationNewConstraints(Relation rel, cooked->name = ccname; cooked->attnum = 0; cooked->expr = expr; + cooked->skip_validation = cdef->skip_validation; cooked->is_local = is_local; cooked->inhcount = is_local ? 0 : 1; cookedConstraints = lappend(cookedConstraints, cooked); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index b2ba11cc781..cfc685b9499 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -259,7 +259,8 @@ static void AlterIndexNamespaces(Relation classRel, Relation rel, static void AlterSeqNamespaces(Relation classRel, Relation rel, Oid oldNspOid, Oid newNspOid, const char *newNspName, LOCKMODE lockmode); -static void ATExecValidateConstraint(Relation rel, char *constrName); +static void ATExecValidateConstraint(Relation rel, char *constrName, + bool recurse, bool recursing, LOCKMODE lockmode); static int transformColumnNameList(Oid relId, List *colList, int16 *attnums, Oid *atttypids); static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, @@ -270,6 +271,7 @@ static Oid transformFkeyCheckAttrs(Relation pkrel, int numattrs, int16 *attnums, Oid *opclasses); static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts); +static void validateCheckConstraint(Relation rel, HeapTuple constrtup); static void validateForeignKeyConstraint(char *conname, Relation rel, Relation pkrel, Oid pkindOid, Oid constraintOid); @@ -561,6 +563,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId) cooked->name = NULL; cooked->attnum = attnum; cooked->expr = colDef->cooked_default; + cooked->skip_validation = false; cooked->is_local = true; /* not used for defaults */ cooked->inhcount = 0; /* ditto */ cookedDefaults = lappend(cookedDefaults, cooked); @@ -1568,6 +1571,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, cooked->name = pstrdup(name); cooked->attnum = 0; /* not used for constraints */ cooked->expr = expr; + cooked->skip_validation = false; cooked->is_local = false; cooked->inhcount = 1; constraints = lappend(constraints, cooked); @@ -2933,7 +2937,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ATPrepAddInherit(rel); pass = AT_PASS_MISC; break; - case AT_ValidateConstraint: + case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */ + ATSimplePermissions(rel, ATT_TABLE); + /* Recursion occurs during execution phase */ + /* No command-specific prep needed except saving recurse flag */ + if (recurse) + cmd->subtype = AT_ValidateConstraintRecurse; + pass = AT_PASS_MISC; + break; case AT_EnableTrig: /* ENABLE TRIGGER variants */ case AT_EnableAlwaysTrig: case AT_EnableReplicaTrig: @@ -3098,8 +3109,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */ ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode); break; - case AT_ValidateConstraint: - ATExecValidateConstraint(rel, cmd->name); + case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */ + ATExecValidateConstraint(rel, cmd->name, false, false, lockmode); + break; + case AT_ValidateConstraintRecurse: /* VALIDATE CONSTRAINT with + * recursion */ + ATExecValidateConstraint(rel, cmd->name, true, false, lockmode); break; case AT_DropConstraint: /* DROP CONSTRAINT */ ATExecDropConstraint(rel, cmd->name, cmd->behavior, @@ -5384,19 +5399,23 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, list_make1(copyObject(constr)), recursing, !recursing); - /* Add each constraint to Phase 3's queue */ + /* Add each to-be-validated constraint to Phase 3's queue */ foreach(lcon, newcons) { CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); - NewConstraint *newcon; - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = ccon->name; - newcon->contype = ccon->contype; - /* ExecQual wants implicit-AND format */ - newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr); + if (!ccon->skip_validation) + { + NewConstraint *newcon; - tab->constraints = lappend(tab->constraints, newcon); + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = ccon->name; + newcon->contype = ccon->contype; + /* ExecQual wants implicit-AND format */ + newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr); + + tab->constraints = lappend(tab->constraints, newcon); + } /* Save the actually assigned name if it was defaulted */ if (constr->conname == NULL) @@ -5755,9 +5774,15 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, /* * ALTER TABLE VALIDATE CONSTRAINT + * + * XXX The reason we handle recursion here rather than at Phase 1 is because + * there's no good way to skip recursing when handling foreign keys: there is + * no need to lock children in that case, yet we wouldn't be able to avoid + * doing so at that level. */ static void -ATExecValidateConstraint(Relation rel, char *constrName) +ATExecValidateConstraint(Relation rel, char *constrName, bool recurse, + bool recursing, LOCKMODE lockmode) { Relation conrel; SysScanDesc scan; @@ -5781,8 +5806,7 @@ ATExecValidateConstraint(Relation rel, char *constrName) while (HeapTupleIsValid(tuple = systable_getnext(scan))) { con = (Form_pg_constraint) GETSTRUCT(tuple); - if (con->contype == CONSTRAINT_FOREIGN && - strcmp(NameStr(con->conname), constrName) == 0) + if (strcmp(NameStr(con->conname), constrName) == 0) { found = true; break; @@ -5792,39 +5816,110 @@ ATExecValidateConstraint(Relation rel, char *constrName) if (!found) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("foreign key constraint \"%s\" of relation \"%s\" does not exist", + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, RelationGetRelationName(rel)))); + + if (con->contype != CONSTRAINT_FOREIGN && + con->contype != CONSTRAINT_CHECK) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key or check constraint", constrName, RelationGetRelationName(rel)))); if (!con->convalidated) { - Oid conid = HeapTupleGetOid(tuple); - HeapTuple copyTuple = heap_copytuple(tuple); - Form_pg_constraint copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); - Relation refrel; + HeapTuple copyTuple; + Form_pg_constraint copy_con; - /* - * Triggers are already in place on both tables, so a concurrent write - * that alters the result here is not possible. Normally we can run a - * query here to do the validation, which would only require - * AccessShareLock. In some cases, it is possible that we might need - * to fire triggers to perform the check, so we take a lock at - * RowShareLock level just in case. - */ - refrel = heap_open(con->confrelid, RowShareLock); + if (con->contype == CONSTRAINT_FOREIGN) + { + Oid conid = HeapTupleGetOid(tuple); + Relation refrel; - validateForeignKeyConstraint(constrName, rel, refrel, - con->conindid, - conid); + /* + * Triggers are already in place on both tables, so a concurrent write + * that alters the result here is not possible. Normally we can run a + * query here to do the validation, which would only require + * AccessShareLock. In some cases, it is possible that we might need + * to fire triggers to perform the check, so we take a lock at + * RowShareLock level just in case. + */ + refrel = heap_open(con->confrelid, RowShareLock); + + validateForeignKeyConstraint(constrName, rel, refrel, + con->conindid, + conid); + heap_close(refrel, NoLock); + + /* + * Foreign keys do not inherit, so we purposedly ignore the + * recursion bit here + */ + } + else if (con->contype == CONSTRAINT_CHECK) + { + List *children = NIL; + ListCell *child; + + /* + * If we're recursing, the parent has already done this, so skip + * it. + */ + if (!recursing) + children = find_all_inheritors(RelationGetRelid(rel), + lockmode, NULL); + + /* + * For CHECK constraints, we must ensure that we only mark the + * constraint as validated on the parent if it's already validated + * on the children. + * + * We recurse before validating on the parent, to reduce risk of + * deadlocks. + */ + foreach(child, children) + { + Oid childoid = lfirst_oid(child); + Relation childrel; + + if (childoid == RelationGetRelid(rel)) + continue; + + /* + * If we are told not to recurse, there had better not be any + * child tables; else the addition would put them out of step. + */ + if (!recurse) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("constraint must be validated on child tables too"))); + + /* find_all_inheritors already got lock */ + childrel = heap_open(childoid, NoLock); + + ATExecValidateConstraint(childrel, constrName, false, + true, lockmode); + heap_close(childrel, NoLock); + } + + validateCheckConstraint(rel, tuple); + + /* + * Invalidate relcache so that others see the new validated + * constraint. + */ + CacheInvalidateRelcache(rel); + } /* * Now update the catalog, while we have the door open. */ + copyTuple = heap_copytuple(tuple); + copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); copy_con->convalidated = true; simple_heap_update(conrel, ©Tuple->t_self, copyTuple); CatalogUpdateIndexes(conrel, copyTuple); heap_freetuple(copyTuple); - - heap_close(refrel, NoLock); } systable_endscan(scan); @@ -6131,6 +6226,79 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts) } /* + * Scan the existing rows in a table to verify they meet a proposed + * CHECK constraint. + * + * The caller must have opened and locked the relation appropriately. + */ +static void +validateCheckConstraint(Relation rel, HeapTuple constrtup) +{ + EState *estate; + Datum val; + char *conbin; + Expr *origexpr; + List *exprstate; + TupleDesc tupdesc; + HeapScanDesc scan; + HeapTuple tuple; + ExprContext *econtext; + MemoryContext oldcxt; + TupleTableSlot *slot; + Form_pg_constraint constrForm; + bool isnull; + + constrForm = (Form_pg_constraint) GETSTRUCT(constrtup); + + estate = CreateExecutorState(); + /* + * XXX this tuple doesn't really come from a syscache, but this doesn't + * matter to SysCacheGetAttr, because it only wants to be able to fetch the + * tupdesc + */ + val = SysCacheGetAttr(CONSTROID, constrtup, Anum_pg_constraint_conbin, + &isnull); + if (isnull) + elog(ERROR, "null conbin for constraint %u", + HeapTupleGetOid(constrtup)); + conbin = TextDatumGetCString(val); + origexpr = (Expr *) stringToNode(conbin); + exprstate = (List *) + ExecPrepareExpr((Expr *) make_ands_implicit(origexpr), estate); + + econtext = GetPerTupleExprContext(estate); + tupdesc = RelationGetDescr(rel); + slot = MakeSingleTupleTableSlot(tupdesc); + econtext->ecxt_scantuple = slot; + + scan = heap_beginscan(rel, SnapshotNow, 0, NULL); + + /* + * Switch to per-tuple memory context and reset it for each tuple + * produced, so we don't leak memory. + */ + oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + ExecStoreTuple(tuple, slot, InvalidBuffer, false); + + if (!ExecQual(exprstate, econtext, true)) + ereport(ERROR, + (errcode(ERRCODE_CHECK_VIOLATION), + errmsg("check constraint \"%s\" is violated by some row", + NameStr(constrForm->conname)))); + + ResetExprContext(econtext); + } + + MemoryContextSwitchTo(oldcxt); + heap_endscan(scan); + ExecDropSingleTupleTableSlot(slot); + FreeExecutorState(estate); +} + +/* * Scan the existing rows in a table to verify they meet a proposed FK * constraint. * diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c index 66c11de6723..437d23a8102 100644 --- a/src/backend/commands/typecmds.c +++ b/src/backend/commands/typecmds.c @@ -86,6 +86,7 @@ static Oid findTypeSendFunction(List *procname, Oid typeOid); static Oid findTypeTypmodinFunction(List *procname); static Oid findTypeTypmodoutFunction(List *procname); static Oid findTypeAnalyzeFunction(List *procname, Oid typeOid); +static void validateDomainConstraint(Oid domainoid, char *ccbin); static List *get_rels_with_domain(Oid domainOid, LOCKMODE lockmode); static void checkDomainOwner(HeapTuple tup); static void checkEnumOwner(HeapTuple tup); @@ -1941,14 +1942,8 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) Relation typrel; HeapTuple tup; Form_pg_type typTup; - List *rels; - ListCell *rt; - EState *estate; - ExprContext *econtext; - char *ccbin; - Expr *expr; - ExprState *exprstate; Constraint *constr; + char *ccbin; /* Make a TypeName so we can use standard type lookup machinery */ typename = makeTypeNameFromNameList(names); @@ -2027,10 +2022,129 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) constr, NameStr(typTup->typname)); /* - * Test all values stored in the attributes based on the domain the - * constraint is being added to. + * If requested to validate the constraint, test all values stored in the + * attributes based on the domain the constraint is being added to. */ - expr = (Expr *) stringToNode(ccbin); + if (!constr->skip_validation) + validateDomainConstraint(domainoid, ccbin); + + /* Clean up */ + heap_close(typrel, RowExclusiveLock); +} + +/* + * AlterDomainValidateConstraint + * + * Implements the ALTER DOMAIN .. VALIDATE CONSTRAINT statement. + */ +void +AlterDomainValidateConstraint(List *names, char *constrName) +{ + TypeName *typename; + Oid domainoid; + Relation typrel; + Relation conrel; + HeapTuple tup; + Form_pg_type typTup; + Form_pg_constraint con; + Form_pg_constraint copy_con; + char *conbin; + SysScanDesc scan; + Datum val; + bool found = false; + bool isnull; + HeapTuple tuple; + HeapTuple copyTuple; + ScanKeyData key; + + /* Make a TypeName so we can use standard type lookup machinery */ + typename = makeTypeNameFromNameList(names); + domainoid = typenameTypeId(NULL, typename); + + /* Look up the domain in the type table */ + typrel = heap_open(TypeRelationId, AccessShareLock); + + tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(domainoid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for type %u", domainoid); + typTup = (Form_pg_type) GETSTRUCT(tup); + + /* Check it's a domain and check user has permission for ALTER DOMAIN */ + checkDomainOwner(tup); + + /* + * Find and check the target constraint + */ + conrel = heap_open(ConstraintRelationId, RowExclusiveLock); + ScanKeyInit(&key, + Anum_pg_constraint_contypid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(domainoid)); + scan = systable_beginscan(conrel, ConstraintTypidIndexId, + true, SnapshotNow, 1, &key); + + while (HeapTupleIsValid(tuple = systable_getnext(scan))) + { + con = (Form_pg_constraint) GETSTRUCT(tuple); + if (strcmp(NameStr(con->conname), constrName) == 0) + { + found = true; + break; + } + } + + if (!found) + { + con = NULL; /* keep compiler quiet */ + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of domain \"%s\" does not exist", + constrName, NameStr(con->conname)))); + } + + if (con->contype != CONSTRAINT_CHECK) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("constraint \"%s\" of domain \"%s\" is not a check constraint", + constrName, NameStr(con->conname)))); + + val = SysCacheGetAttr(CONSTROID, tuple, + Anum_pg_constraint_conbin, + &isnull); + if (isnull) + elog(ERROR, "null conbin for constraint %u", + HeapTupleGetOid(tuple)); + conbin = TextDatumGetCString(val); + + validateDomainConstraint(domainoid, conbin); + + /* + * Now update the catalog, while we have the door open. + */ + copyTuple = heap_copytuple(tuple); + copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); + copy_con->convalidated = true; + simple_heap_update(conrel, ©Tuple->t_self, copyTuple); + CatalogUpdateIndexes(conrel, copyTuple); + heap_freetuple(copyTuple); + + systable_endscan(scan); + + heap_close(typrel, AccessShareLock); + heap_close(conrel, RowExclusiveLock); + + ReleaseSysCache(tup); +} + +static void +validateDomainConstraint(Oid domainoid, char *ccbin) +{ + Expr *expr = (Expr *) stringToNode(ccbin); + List *rels; + ListCell *rt; + EState *estate; + ExprContext *econtext; + ExprState *exprstate; /* Need an EState to run ExecEvalExpr */ estate = CreateExecutorState(); @@ -2092,11 +2206,7 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) } FreeExecutorState(estate); - - /* Clean up */ - heap_close(typrel, RowExclusiveLock); } - /* * get_rels_with_domain * @@ -2416,7 +2526,7 @@ domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, CONSTRAINT_CHECK, /* Constraint Type */ false, /* Is Deferrable */ false, /* Is Deferred */ - true, /* Is Validated */ + !constr->skip_validation, /* Is Validated */ InvalidOid, /* not a relation constraint */ NULL, 0, diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c index b28681630b3..0ec17505a7d 100644 --- a/src/backend/optimizer/util/plancat.c +++ b/src/backend/optimizer/util/plancat.c @@ -552,7 +552,7 @@ get_relation_data_width(Oid relid, int32 *attr_widths) /* * get_relation_constraints * - * Retrieve the CHECK constraint expressions of the given relation. + * Retrieve the validated CHECK constraint expressions of the given relation. * * Returns a List (possibly empty) of constraint expressions. Each one * has been canonicalized, and its Vars are changed to have the varno @@ -591,6 +591,13 @@ get_relation_constraints(PlannerInfo *root, { Node *cexpr; + /* + * If this constraint hasn't been fully validated yet, we must + * ignore it here. + */ + if (!constr->check[i].ccvalid) + continue; + cexpr = stringToNode(constr->check[i].ccbin); /* @@ -663,7 +670,7 @@ get_relation_constraints(PlannerInfo *root, * * Detect whether the relation need not be scanned because it has either * self-inconsistent restrictions, or restrictions inconsistent with the - * relation's CHECK constraints. + * relation's validated CHECK constraints. * * Note: this examines only rel->relid, rel->reloptkind, and * rel->baserestrictinfo; therefore it can be called before filling in diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 62cff8a7de3..72260320c38 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -2759,8 +2759,9 @@ ConstraintElem: n->raw_expr = $3; n->cooked_expr = NULL; processCASbits($5, @5, "CHECK", - NULL, NULL, NULL, + NULL, NULL, &n->skip_validation, yyscanner); + n->initially_valid = !n->skip_validation; $$ = (Node *)n; } | UNIQUE '(' columnList ')' opt_definition OptConsTableSpace @@ -7559,6 +7560,15 @@ AlterDomainStmt: n->behavior = $7; $$ = (Node *)n; } + /* ALTER DOMAIN <domain> VALIDATE CONSTRAINT <name> */ + | ALTER DOMAIN_P any_name VALIDATE CONSTRAINT name + { + AlterDomainStmt *n = makeNode(AlterDomainStmt); + n->subtype = 'V'; + n->typeName = $3; + n->name = $6; + $$ = (Node *)n; + } ; opt_as: AS {} diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 0559998c71f..224e1f3761d 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -820,6 +820,10 @@ standard_ProcessUtility(Node *parsetree, stmt->name, stmt->behavior); break; + case 'V': /* VALIDATE CONSTRAINT */ + AlterDomainValidateConstraint(stmt->typeName, + stmt->name); + break; default: /* oops */ elog(ERROR, "unrecognized alter domain type: %d", (int) stmt->subtype); diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 1a400a0a576..0b9d77a61d2 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -3251,6 +3251,7 @@ CheckConstraintFetch(Relation relation) elog(ERROR, "unexpected constraint record found for rel %s", RelationGetRelationName(relation)); + check[found].ccvalid = conform->convalidated; check[found].ccname = MemoryContextStrdup(CacheMemoryContext, NameStr(conform->conname)); diff --git a/src/include/access/tupdesc.h b/src/include/access/tupdesc.h index 99448efe12f..8b99cb849d8 100644 --- a/src/include/access/tupdesc.h +++ b/src/include/access/tupdesc.h @@ -29,6 +29,7 @@ typedef struct constrCheck { char *ccname; char *ccbin; /* nodeToString representation of expr */ + bool ccvalid; } ConstrCheck; /* This structure contains constraints of a tuple */ diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h index c95e91303b8..0b7b190dd10 100644 --- a/src/include/catalog/heap.h +++ b/src/include/catalog/heap.h @@ -30,6 +30,7 @@ typedef struct CookedConstraint char *name; /* name, or NULL if none */ AttrNumber attnum; /* which attr (only for DEFAULT) */ Node *expr; /* transformed default or check expr */ + bool skip_validation; /* skip validation? (only for CHECK) */ bool is_local; /* constraint has local (non-inherited) def */ int inhcount; /* number of times constraint is inherited */ } CookedConstraint; diff --git a/src/include/commands/typecmds.h b/src/include/commands/typecmds.h index 6d9d1ccaa95..23726fb0fb2 100644 --- a/src/include/commands/typecmds.h +++ b/src/include/commands/typecmds.h @@ -31,6 +31,7 @@ extern Oid AssignTypeArrayOid(void); extern void AlterDomainDefault(List *names, Node *defaultRaw); extern void AlterDomainNotNull(List *names, bool notNull); extern void AlterDomainAddConstraint(List *names, Node *constr); +extern void AlterDomainValidateConstraint(List *names, char *constrName); extern void AlterDomainDropConstraint(List *names, const char *constrName, DropBehavior behavior); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index c65e3cd6e8c..00c1269bd14 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1190,6 +1190,7 @@ typedef enum AlterTableType AT_AddConstraint, /* add constraint */ AT_AddConstraintRecurse, /* internal to commands/tablecmds.c */ AT_ValidateConstraint, /* validate constraint */ + AT_ValidateConstraintRecurse, /* internal to commands/tablecmds.c */ AT_ProcessedConstraint, /* pre-processed add constraint (local in * parser/parse_utilcmd.c) */ AT_AddIndexConstraint, /* add constraint using existing index */ @@ -1543,6 +1544,8 @@ typedef struct Constraint char fk_matchtype; /* FULL, PARTIAL, UNSPECIFIED */ char fk_upd_action; /* ON UPDATE action */ char fk_del_action; /* ON DELETE action */ + + /* Fields used for constraints that allow a NOT VALID specification */ bool skip_validation; /* skip validation of existing rows? */ bool initially_valid; /* mark the new constraint as valid? */ } Constraint; diff --git a/src/test/regress/expected/alter_table.out b/src/test/regress/expected/alter_table.out index 9ab84f983ed..b78b1510efb 100644 --- a/src/test/regress/expected/alter_table.out +++ b/src/test/regress/expected/alter_table.out @@ -196,14 +196,115 @@ DELETE FROM tmp3 where a=5; -- Try (and succeed) and repeat to show it works on already valid constraint ALTER TABLE tmp3 validate constraint tmpconstr; ALTER TABLE tmp3 validate constraint tmpconstr; +-- Try a non-verified CHECK constraint +ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10); -- fail +ERROR: check constraint "b_greater_than_ten" is violated by some row +ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10) NOT VALID; -- succeeds +ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- fails +ERROR: check constraint "b_greater_than_ten" is violated by some row +DELETE FROM tmp3 WHERE NOT b > 10; +ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds +ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds +-- Test inherited NOT VALID CHECK constraints +select * from tmp3; + a | b +---+---- + 1 | 20 +(1 row) + +CREATE TABLE tmp6 () INHERITS (tmp3); +CREATE TABLE tmp7 () INHERITS (tmp3); +INSERT INTO tmp6 VALUES (6, 30), (7, 16); +ALTER TABLE tmp3 ADD CONSTRAINT b_le_20 CHECK (b <= 20) NOT VALID; +ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- fails +ERROR: check constraint "b_le_20" is violated by some row +DELETE FROM tmp6 WHERE b > 20; +ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- succeeds +-- An already validated constraint must not be revalidated +CREATE FUNCTION boo(int) RETURNS int IMMUTABLE STRICT LANGUAGE plpgsql AS $$ BEGIN RAISE NOTICE 'boo: %', $1; RETURN $1; END; $$; +INSERT INTO tmp7 VALUES (8, 18); +ALTER TABLE tmp7 ADD CONSTRAINT identity CHECK (b = boo(b)); +NOTICE: boo: 18 +ALTER TABLE tmp3 ADD CONSTRAINT IDENTITY check (b = boo(b)) NOT VALID; +NOTICE: merging constraint "identity" with inherited definition +ALTER TABLE tmp3 VALIDATE CONSTRAINT identity; +NOTICE: boo: 16 +NOTICE: boo: 20 -- Try (and fail) to create constraint from tmp5(a) to tmp4(a) - unique constraint on -- tmp4 is a,b ALTER TABLE tmp5 add constraint tmpconstr foreign key(a) references tmp4(a) match full; ERROR: there is no unique constraint matching given keys for referenced table "tmp4" +DROP TABLE tmp7; +DROP TABLE tmp6; DROP TABLE tmp5; DROP TABLE tmp4; DROP TABLE tmp3; DROP TABLE tmp2; +-- NOT VALID with plan invalidation -- ensure we don't use a constraint for +-- exclusion until validated +set constraint_exclusion TO 'partition'; +create table nv_parent (d date); +create table nv_child_2010 () inherits (nv_parent); +create table nv_child_2011 () inherits (nv_parent); +alter table nv_child_2010 add check (d between '2010-01-01'::date and '2010-12-31'::date) not valid; +alter table nv_child_2011 add check (d between '2011-01-01'::date and '2011-12-31'::date) not valid; +explain (costs off) select * from nv_parent where d between '2011-08-01' and '2011-08-31'; + QUERY PLAN +--------------------------------------------------------------------------------- + Result + -> Append + -> Seq Scan on nv_parent + Filter: ((d >= '08-01-2011'::date) AND (d <= '08-31-2011'::date)) + -> Seq Scan on nv_child_2010 nv_parent + Filter: ((d >= '08-01-2011'::date) AND (d <= '08-31-2011'::date)) + -> Seq Scan on nv_child_2011 nv_parent + Filter: ((d >= '08-01-2011'::date) AND (d <= '08-31-2011'::date)) +(8 rows) + +create table nv_child_2009 (check (d between '2009-01-01'::date and '2009-12-31'::date)) inherits (nv_parent); +explain (costs off) select * from nv_parent where d between '2011-08-01'::date and '2011-08-31'::date; + QUERY PLAN +--------------------------------------------------------------------------------- + Result + -> Append + -> Seq Scan on nv_parent + Filter: ((d >= '08-01-2011'::date) AND (d <= '08-31-2011'::date)) + -> Seq Scan on nv_child_2010 nv_parent + Filter: ((d >= '08-01-2011'::date) AND (d <= '08-31-2011'::date)) + -> Seq Scan on nv_child_2011 nv_parent + Filter: ((d >= '08-01-2011'::date) AND (d <= '08-31-2011'::date)) +(8 rows) + +explain (costs off) select * from nv_parent where d between '2009-08-01'::date and '2009-08-31'::date; + QUERY PLAN +--------------------------------------------------------------------------------- + Result + -> Append + -> Seq Scan on nv_parent + Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date)) + -> Seq Scan on nv_child_2010 nv_parent + Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date)) + -> Seq Scan on nv_child_2011 nv_parent + Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date)) + -> Seq Scan on nv_child_2009 nv_parent + Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date)) +(10 rows) + +-- after validation, the constraint should be used +alter table nv_child_2011 VALIDATE CONSTRAINT nv_child_2011_d_check; +explain (costs off) select * from nv_parent where d between '2009-08-01'::date and '2009-08-31'::date; + QUERY PLAN +--------------------------------------------------------------------------------- + Result + -> Append + -> Seq Scan on nv_parent + Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date)) + -> Seq Scan on nv_child_2010 nv_parent + Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date)) + -> Seq Scan on nv_child_2009 nv_parent + Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date)) +(8 rows) + -- Foreign key adding test with mixed types -- Note: these tables are TEMP to avoid name conflicts when this test -- is run in parallel with foreign_key.sql. diff --git a/src/test/regress/expected/domain.out b/src/test/regress/expected/domain.out index 34bc31ab380..521fe01fa17 100644 --- a/src/test/regress/expected/domain.out +++ b/src/test/regress/expected/domain.out @@ -352,6 +352,17 @@ alter domain con drop constraint t; insert into domcontest values (-5); --fails ERROR: value for domain con violates check constraint "con_check" insert into domcontest values (42); +-- Test ALTER DOMAIN .. CONSTRAINT .. NOT VALID +create domain things AS INT; +CREATE TABLE thethings (stuff things); +INSERT INTO thethings (stuff) VALUES (55); +ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11); +ERROR: column "stuff" of table "thethings" contains values that violate the new constraint +ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11) NOT VALID; +ALTER DOMAIN things VALIDATE CONSTRAINT meow; +ERROR: column "stuff" of table "thethings" contains values that violate the new constraint +UPDATE thethings SET stuff = 10; +ALTER DOMAIN things VALIDATE CONSTRAINT meow; -- Confirm ALTER DOMAIN with RULES. create table domtab (col1 integer); create domain dom as integer; diff --git a/src/test/regress/sql/alter_table.sql b/src/test/regress/sql/alter_table.sql index b5d76ea68e3..bb2c27718a0 100644 --- a/src/test/regress/sql/alter_table.sql +++ b/src/test/regress/sql/alter_table.sql @@ -236,12 +236,41 @@ DELETE FROM tmp3 where a=5; ALTER TABLE tmp3 validate constraint tmpconstr; ALTER TABLE tmp3 validate constraint tmpconstr; +-- Try a non-verified CHECK constraint +ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10); -- fail +ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10) NOT VALID; -- succeeds +ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- fails +DELETE FROM tmp3 WHERE NOT b > 10; +ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds +ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds + +-- Test inherited NOT VALID CHECK constraints +select * from tmp3; +CREATE TABLE tmp6 () INHERITS (tmp3); +CREATE TABLE tmp7 () INHERITS (tmp3); + +INSERT INTO tmp6 VALUES (6, 30), (7, 16); +ALTER TABLE tmp3 ADD CONSTRAINT b_le_20 CHECK (b <= 20) NOT VALID; +ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- fails +DELETE FROM tmp6 WHERE b > 20; +ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- succeeds + +-- An already validated constraint must not be revalidated +CREATE FUNCTION boo(int) RETURNS int IMMUTABLE STRICT LANGUAGE plpgsql AS $$ BEGIN RAISE NOTICE 'boo: %', $1; RETURN $1; END; $$; +INSERT INTO tmp7 VALUES (8, 18); +ALTER TABLE tmp7 ADD CONSTRAINT identity CHECK (b = boo(b)); +ALTER TABLE tmp3 ADD CONSTRAINT IDENTITY check (b = boo(b)) NOT VALID; +ALTER TABLE tmp3 VALIDATE CONSTRAINT identity; -- Try (and fail) to create constraint from tmp5(a) to tmp4(a) - unique constraint on -- tmp4 is a,b ALTER TABLE tmp5 add constraint tmpconstr foreign key(a) references tmp4(a) match full; +DROP TABLE tmp7; + +DROP TABLE tmp6; + DROP TABLE tmp5; DROP TABLE tmp4; @@ -250,6 +279,23 @@ DROP TABLE tmp3; DROP TABLE tmp2; +-- NOT VALID with plan invalidation -- ensure we don't use a constraint for +-- exclusion until validated +set constraint_exclusion TO 'partition'; +create table nv_parent (d date); +create table nv_child_2010 () inherits (nv_parent); +create table nv_child_2011 () inherits (nv_parent); +alter table nv_child_2010 add check (d between '2010-01-01'::date and '2010-12-31'::date) not valid; +alter table nv_child_2011 add check (d between '2011-01-01'::date and '2011-12-31'::date) not valid; +explain (costs off) select * from nv_parent where d between '2011-08-01' and '2011-08-31'; +create table nv_child_2009 (check (d between '2009-01-01'::date and '2009-12-31'::date)) inherits (nv_parent); +explain (costs off) select * from nv_parent where d between '2011-08-01'::date and '2011-08-31'::date; +explain (costs off) select * from nv_parent where d between '2009-08-01'::date and '2009-08-31'::date; +-- after validation, the constraint should be used +alter table nv_child_2011 VALIDATE CONSTRAINT nv_child_2011_d_check; +explain (costs off) select * from nv_parent where d between '2009-08-01'::date and '2009-08-31'::date; + + -- Foreign key adding test with mixed types -- Note: these tables are TEMP to avoid name conflicts when this test diff --git a/src/test/regress/sql/domain.sql b/src/test/regress/sql/domain.sql index 1cc447b8a24..449b4234a5c 100644 --- a/src/test/regress/sql/domain.sql +++ b/src/test/regress/sql/domain.sql @@ -259,6 +259,16 @@ alter domain con drop constraint t; insert into domcontest values (-5); --fails insert into domcontest values (42); +-- Test ALTER DOMAIN .. CONSTRAINT .. NOT VALID +create domain things AS INT; +CREATE TABLE thethings (stuff things); +INSERT INTO thethings (stuff) VALUES (55); +ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11); +ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11) NOT VALID; +ALTER DOMAIN things VALIDATE CONSTRAINT meow; +UPDATE thethings SET stuff = 10; +ALTER DOMAIN things VALIDATE CONSTRAINT meow; + -- Confirm ALTER DOMAIN with RULES. create table domtab (col1 integer); create domain dom as integer; |