diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 236 |
1 files changed, 202 insertions, 34 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index b2ba11cc781..cfc685b9499 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -259,7 +259,8 @@ static void AlterIndexNamespaces(Relation classRel, Relation rel, static void AlterSeqNamespaces(Relation classRel, Relation rel, Oid oldNspOid, Oid newNspOid, const char *newNspName, LOCKMODE lockmode); -static void ATExecValidateConstraint(Relation rel, char *constrName); +static void ATExecValidateConstraint(Relation rel, char *constrName, + bool recurse, bool recursing, LOCKMODE lockmode); static int transformColumnNameList(Oid relId, List *colList, int16 *attnums, Oid *atttypids); static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, @@ -270,6 +271,7 @@ static Oid transformFkeyCheckAttrs(Relation pkrel, int numattrs, int16 *attnums, Oid *opclasses); static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts); +static void validateCheckConstraint(Relation rel, HeapTuple constrtup); static void validateForeignKeyConstraint(char *conname, Relation rel, Relation pkrel, Oid pkindOid, Oid constraintOid); @@ -561,6 +563,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId) cooked->name = NULL; cooked->attnum = attnum; cooked->expr = colDef->cooked_default; + cooked->skip_validation = false; cooked->is_local = true; /* not used for defaults */ cooked->inhcount = 0; /* ditto */ cookedDefaults = lappend(cookedDefaults, cooked); @@ -1568,6 +1571,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, cooked->name = pstrdup(name); cooked->attnum = 0; /* not used for constraints */ cooked->expr = expr; + cooked->skip_validation = false; cooked->is_local = false; cooked->inhcount = 1; constraints = lappend(constraints, cooked); @@ -2933,7 +2937,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ATPrepAddInherit(rel); pass = AT_PASS_MISC; break; - case AT_ValidateConstraint: + case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */ + ATSimplePermissions(rel, ATT_TABLE); + /* Recursion occurs during execution phase */ + /* No command-specific prep needed except saving recurse flag */ + if (recurse) + cmd->subtype = AT_ValidateConstraintRecurse; + pass = AT_PASS_MISC; + break; case AT_EnableTrig: /* ENABLE TRIGGER variants */ case AT_EnableAlwaysTrig: case AT_EnableReplicaTrig: @@ -3098,8 +3109,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */ ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode); break; - case AT_ValidateConstraint: - ATExecValidateConstraint(rel, cmd->name); + case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */ + ATExecValidateConstraint(rel, cmd->name, false, false, lockmode); + break; + case AT_ValidateConstraintRecurse: /* VALIDATE CONSTRAINT with + * recursion */ + ATExecValidateConstraint(rel, cmd->name, true, false, lockmode); break; case AT_DropConstraint: /* DROP CONSTRAINT */ ATExecDropConstraint(rel, cmd->name, cmd->behavior, @@ -5384,19 +5399,23 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, list_make1(copyObject(constr)), recursing, !recursing); - /* Add each constraint to Phase 3's queue */ + /* Add each to-be-validated constraint to Phase 3's queue */ foreach(lcon, newcons) { CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); - NewConstraint *newcon; - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = ccon->name; - newcon->contype = ccon->contype; - /* ExecQual wants implicit-AND format */ - newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr); + if (!ccon->skip_validation) + { + NewConstraint *newcon; - tab->constraints = lappend(tab->constraints, newcon); + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = ccon->name; + newcon->contype = ccon->contype; + /* ExecQual wants implicit-AND format */ + newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr); + + tab->constraints = lappend(tab->constraints, newcon); + } /* Save the actually assigned name if it was defaulted */ if (constr->conname == NULL) @@ -5755,9 +5774,15 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, /* * ALTER TABLE VALIDATE CONSTRAINT + * + * XXX The reason we handle recursion here rather than at Phase 1 is because + * there's no good way to skip recursing when handling foreign keys: there is + * no need to lock children in that case, yet we wouldn't be able to avoid + * doing so at that level. */ static void -ATExecValidateConstraint(Relation rel, char *constrName) +ATExecValidateConstraint(Relation rel, char *constrName, bool recurse, + bool recursing, LOCKMODE lockmode) { Relation conrel; SysScanDesc scan; @@ -5781,8 +5806,7 @@ ATExecValidateConstraint(Relation rel, char *constrName) while (HeapTupleIsValid(tuple = systable_getnext(scan))) { con = (Form_pg_constraint) GETSTRUCT(tuple); - if (con->contype == CONSTRAINT_FOREIGN && - strcmp(NameStr(con->conname), constrName) == 0) + if (strcmp(NameStr(con->conname), constrName) == 0) { found = true; break; @@ -5792,39 +5816,110 @@ ATExecValidateConstraint(Relation rel, char *constrName) if (!found) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("foreign key constraint \"%s\" of relation \"%s\" does not exist", + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, RelationGetRelationName(rel)))); + + if (con->contype != CONSTRAINT_FOREIGN && + con->contype != CONSTRAINT_CHECK) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key or check constraint", constrName, RelationGetRelationName(rel)))); if (!con->convalidated) { - Oid conid = HeapTupleGetOid(tuple); - HeapTuple copyTuple = heap_copytuple(tuple); - Form_pg_constraint copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); - Relation refrel; + HeapTuple copyTuple; + Form_pg_constraint copy_con; - /* - * Triggers are already in place on both tables, so a concurrent write - * that alters the result here is not possible. Normally we can run a - * query here to do the validation, which would only require - * AccessShareLock. In some cases, it is possible that we might need - * to fire triggers to perform the check, so we take a lock at - * RowShareLock level just in case. - */ - refrel = heap_open(con->confrelid, RowShareLock); + if (con->contype == CONSTRAINT_FOREIGN) + { + Oid conid = HeapTupleGetOid(tuple); + Relation refrel; - validateForeignKeyConstraint(constrName, rel, refrel, - con->conindid, - conid); + /* + * Triggers are already in place on both tables, so a concurrent write + * that alters the result here is not possible. Normally we can run a + * query here to do the validation, which would only require + * AccessShareLock. In some cases, it is possible that we might need + * to fire triggers to perform the check, so we take a lock at + * RowShareLock level just in case. + */ + refrel = heap_open(con->confrelid, RowShareLock); + + validateForeignKeyConstraint(constrName, rel, refrel, + con->conindid, + conid); + heap_close(refrel, NoLock); + + /* + * Foreign keys do not inherit, so we purposedly ignore the + * recursion bit here + */ + } + else if (con->contype == CONSTRAINT_CHECK) + { + List *children = NIL; + ListCell *child; + + /* + * If we're recursing, the parent has already done this, so skip + * it. + */ + if (!recursing) + children = find_all_inheritors(RelationGetRelid(rel), + lockmode, NULL); + + /* + * For CHECK constraints, we must ensure that we only mark the + * constraint as validated on the parent if it's already validated + * on the children. + * + * We recurse before validating on the parent, to reduce risk of + * deadlocks. + */ + foreach(child, children) + { + Oid childoid = lfirst_oid(child); + Relation childrel; + + if (childoid == RelationGetRelid(rel)) + continue; + + /* + * If we are told not to recurse, there had better not be any + * child tables; else the addition would put them out of step. + */ + if (!recurse) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("constraint must be validated on child tables too"))); + + /* find_all_inheritors already got lock */ + childrel = heap_open(childoid, NoLock); + + ATExecValidateConstraint(childrel, constrName, false, + true, lockmode); + heap_close(childrel, NoLock); + } + + validateCheckConstraint(rel, tuple); + + /* + * Invalidate relcache so that others see the new validated + * constraint. + */ + CacheInvalidateRelcache(rel); + } /* * Now update the catalog, while we have the door open. */ + copyTuple = heap_copytuple(tuple); + copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); copy_con->convalidated = true; simple_heap_update(conrel, ©Tuple->t_self, copyTuple); CatalogUpdateIndexes(conrel, copyTuple); heap_freetuple(copyTuple); - - heap_close(refrel, NoLock); } systable_endscan(scan); @@ -6131,6 +6226,79 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts) } /* + * Scan the existing rows in a table to verify they meet a proposed + * CHECK constraint. + * + * The caller must have opened and locked the relation appropriately. + */ +static void +validateCheckConstraint(Relation rel, HeapTuple constrtup) +{ + EState *estate; + Datum val; + char *conbin; + Expr *origexpr; + List *exprstate; + TupleDesc tupdesc; + HeapScanDesc scan; + HeapTuple tuple; + ExprContext *econtext; + MemoryContext oldcxt; + TupleTableSlot *slot; + Form_pg_constraint constrForm; + bool isnull; + + constrForm = (Form_pg_constraint) GETSTRUCT(constrtup); + + estate = CreateExecutorState(); + /* + * XXX this tuple doesn't really come from a syscache, but this doesn't + * matter to SysCacheGetAttr, because it only wants to be able to fetch the + * tupdesc + */ + val = SysCacheGetAttr(CONSTROID, constrtup, Anum_pg_constraint_conbin, + &isnull); + if (isnull) + elog(ERROR, "null conbin for constraint %u", + HeapTupleGetOid(constrtup)); + conbin = TextDatumGetCString(val); + origexpr = (Expr *) stringToNode(conbin); + exprstate = (List *) + ExecPrepareExpr((Expr *) make_ands_implicit(origexpr), estate); + + econtext = GetPerTupleExprContext(estate); + tupdesc = RelationGetDescr(rel); + slot = MakeSingleTupleTableSlot(tupdesc); + econtext->ecxt_scantuple = slot; + + scan = heap_beginscan(rel, SnapshotNow, 0, NULL); + + /* + * Switch to per-tuple memory context and reset it for each tuple + * produced, so we don't leak memory. + */ + oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + ExecStoreTuple(tuple, slot, InvalidBuffer, false); + + if (!ExecQual(exprstate, econtext, true)) + ereport(ERROR, + (errcode(ERRCODE_CHECK_VIOLATION), + errmsg("check constraint \"%s\" is violated by some row", + NameStr(constrForm->conname)))); + + ResetExprContext(econtext); + } + + MemoryContextSwitchTo(oldcxt); + heap_endscan(scan); + ExecDropSingleTupleTableSlot(slot); + FreeExecutorState(estate); +} + +/* * Scan the existing rows in a table to verify they meet a proposed FK * constraint. * |