aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c236
1 files changed, 202 insertions, 34 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index b2ba11cc781..cfc685b9499 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -259,7 +259,8 @@ static void AlterIndexNamespaces(Relation classRel, Relation rel,
static void AlterSeqNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid,
const char *newNspName, LOCKMODE lockmode);
-static void ATExecValidateConstraint(Relation rel, char *constrName);
+static void ATExecValidateConstraint(Relation rel, char *constrName,
+ bool recurse, bool recursing, LOCKMODE lockmode);
static int transformColumnNameList(Oid relId, List *colList,
int16 *attnums, Oid *atttypids);
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
@@ -270,6 +271,7 @@ static Oid transformFkeyCheckAttrs(Relation pkrel,
int numattrs, int16 *attnums,
Oid *opclasses);
static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts);
+static void validateCheckConstraint(Relation rel, HeapTuple constrtup);
static void validateForeignKeyConstraint(char *conname,
Relation rel, Relation pkrel,
Oid pkindOid, Oid constraintOid);
@@ -561,6 +563,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
cooked->name = NULL;
cooked->attnum = attnum;
cooked->expr = colDef->cooked_default;
+ cooked->skip_validation = false;
cooked->is_local = true; /* not used for defaults */
cooked->inhcount = 0; /* ditto */
cookedDefaults = lappend(cookedDefaults, cooked);
@@ -1568,6 +1571,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
cooked->name = pstrdup(name);
cooked->attnum = 0; /* not used for constraints */
cooked->expr = expr;
+ cooked->skip_validation = false;
cooked->is_local = false;
cooked->inhcount = 1;
constraints = lappend(constraints, cooked);
@@ -2933,7 +2937,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
ATPrepAddInherit(rel);
pass = AT_PASS_MISC;
break;
- case AT_ValidateConstraint:
+ case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
+ ATSimplePermissions(rel, ATT_TABLE);
+ /* Recursion occurs during execution phase */
+ /* No command-specific prep needed except saving recurse flag */
+ if (recurse)
+ cmd->subtype = AT_ValidateConstraintRecurse;
+ pass = AT_PASS_MISC;
+ break;
case AT_EnableTrig: /* ENABLE TRIGGER variants */
case AT_EnableAlwaysTrig:
case AT_EnableReplicaTrig:
@@ -3098,8 +3109,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */
ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode);
break;
- case AT_ValidateConstraint:
- ATExecValidateConstraint(rel, cmd->name);
+ case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
+ ATExecValidateConstraint(rel, cmd->name, false, false, lockmode);
+ break;
+ case AT_ValidateConstraintRecurse: /* VALIDATE CONSTRAINT with
+ * recursion */
+ ATExecValidateConstraint(rel, cmd->name, true, false, lockmode);
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
ATExecDropConstraint(rel, cmd->name, cmd->behavior,
@@ -5384,19 +5399,23 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
list_make1(copyObject(constr)),
recursing, !recursing);
- /* Add each constraint to Phase 3's queue */
+ /* Add each to-be-validated constraint to Phase 3's queue */
foreach(lcon, newcons)
{
CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
- NewConstraint *newcon;
- newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
- newcon->name = ccon->name;
- newcon->contype = ccon->contype;
- /* ExecQual wants implicit-AND format */
- newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr);
+ if (!ccon->skip_validation)
+ {
+ NewConstraint *newcon;
- tab->constraints = lappend(tab->constraints, newcon);
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+ newcon->name = ccon->name;
+ newcon->contype = ccon->contype;
+ /* ExecQual wants implicit-AND format */
+ newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr);
+
+ tab->constraints = lappend(tab->constraints, newcon);
+ }
/* Save the actually assigned name if it was defaulted */
if (constr->conname == NULL)
@@ -5755,9 +5774,15 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
/*
* ALTER TABLE VALIDATE CONSTRAINT
+ *
+ * XXX The reason we handle recursion here rather than at Phase 1 is because
+ * there's no good way to skip recursing when handling foreign keys: there is
+ * no need to lock children in that case, yet we wouldn't be able to avoid
+ * doing so at that level.
*/
static void
-ATExecValidateConstraint(Relation rel, char *constrName)
+ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
+ bool recursing, LOCKMODE lockmode)
{
Relation conrel;
SysScanDesc scan;
@@ -5781,8 +5806,7 @@ ATExecValidateConstraint(Relation rel, char *constrName)
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
con = (Form_pg_constraint) GETSTRUCT(tuple);
- if (con->contype == CONSTRAINT_FOREIGN &&
- strcmp(NameStr(con->conname), constrName) == 0)
+ if (strcmp(NameStr(con->conname), constrName) == 0)
{
found = true;
break;
@@ -5792,39 +5816,110 @@ ATExecValidateConstraint(Relation rel, char *constrName)
if (!found)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("foreign key constraint \"%s\" of relation \"%s\" does not exist",
+ errmsg("constraint \"%s\" of relation \"%s\" does not exist",
+ constrName, RelationGetRelationName(rel))));
+
+ if (con->contype != CONSTRAINT_FOREIGN &&
+ con->contype != CONSTRAINT_CHECK)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key or check constraint",
constrName, RelationGetRelationName(rel))));
if (!con->convalidated)
{
- Oid conid = HeapTupleGetOid(tuple);
- HeapTuple copyTuple = heap_copytuple(tuple);
- Form_pg_constraint copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
- Relation refrel;
+ HeapTuple copyTuple;
+ Form_pg_constraint copy_con;
- /*
- * Triggers are already in place on both tables, so a concurrent write
- * that alters the result here is not possible. Normally we can run a
- * query here to do the validation, which would only require
- * AccessShareLock. In some cases, it is possible that we might need
- * to fire triggers to perform the check, so we take a lock at
- * RowShareLock level just in case.
- */
- refrel = heap_open(con->confrelid, RowShareLock);
+ if (con->contype == CONSTRAINT_FOREIGN)
+ {
+ Oid conid = HeapTupleGetOid(tuple);
+ Relation refrel;
- validateForeignKeyConstraint(constrName, rel, refrel,
- con->conindid,
- conid);
+ /*
+ * Triggers are already in place on both tables, so a concurrent write
+ * that alters the result here is not possible. Normally we can run a
+ * query here to do the validation, which would only require
+ * AccessShareLock. In some cases, it is possible that we might need
+ * to fire triggers to perform the check, so we take a lock at
+ * RowShareLock level just in case.
+ */
+ refrel = heap_open(con->confrelid, RowShareLock);
+
+ validateForeignKeyConstraint(constrName, rel, refrel,
+ con->conindid,
+ conid);
+ heap_close(refrel, NoLock);
+
+ /*
+ * Foreign keys do not inherit, so we purposedly ignore the
+ * recursion bit here
+ */
+ }
+ else if (con->contype == CONSTRAINT_CHECK)
+ {
+ List *children = NIL;
+ ListCell *child;
+
+ /*
+ * If we're recursing, the parent has already done this, so skip
+ * it.
+ */
+ if (!recursing)
+ children = find_all_inheritors(RelationGetRelid(rel),
+ lockmode, NULL);
+
+ /*
+ * For CHECK constraints, we must ensure that we only mark the
+ * constraint as validated on the parent if it's already validated
+ * on the children.
+ *
+ * We recurse before validating on the parent, to reduce risk of
+ * deadlocks.
+ */
+ foreach(child, children)
+ {
+ Oid childoid = lfirst_oid(child);
+ Relation childrel;
+
+ if (childoid == RelationGetRelid(rel))
+ continue;
+
+ /*
+ * If we are told not to recurse, there had better not be any
+ * child tables; else the addition would put them out of step.
+ */
+ if (!recurse)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("constraint must be validated on child tables too")));
+
+ /* find_all_inheritors already got lock */
+ childrel = heap_open(childoid, NoLock);
+
+ ATExecValidateConstraint(childrel, constrName, false,
+ true, lockmode);
+ heap_close(childrel, NoLock);
+ }
+
+ validateCheckConstraint(rel, tuple);
+
+ /*
+ * Invalidate relcache so that others see the new validated
+ * constraint.
+ */
+ CacheInvalidateRelcache(rel);
+ }
/*
* Now update the catalog, while we have the door open.
*/
+ copyTuple = heap_copytuple(tuple);
+ copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
copy_con->convalidated = true;
simple_heap_update(conrel, &copyTuple->t_self, copyTuple);
CatalogUpdateIndexes(conrel, copyTuple);
heap_freetuple(copyTuple);
-
- heap_close(refrel, NoLock);
}
systable_endscan(scan);
@@ -6131,6 +6226,79 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
}
/*
+ * Scan the existing rows in a table to verify they meet a proposed
+ * CHECK constraint.
+ *
+ * The caller must have opened and locked the relation appropriately.
+ */
+static void
+validateCheckConstraint(Relation rel, HeapTuple constrtup)
+{
+ EState *estate;
+ Datum val;
+ char *conbin;
+ Expr *origexpr;
+ List *exprstate;
+ TupleDesc tupdesc;
+ HeapScanDesc scan;
+ HeapTuple tuple;
+ ExprContext *econtext;
+ MemoryContext oldcxt;
+ TupleTableSlot *slot;
+ Form_pg_constraint constrForm;
+ bool isnull;
+
+ constrForm = (Form_pg_constraint) GETSTRUCT(constrtup);
+
+ estate = CreateExecutorState();
+ /*
+ * XXX this tuple doesn't really come from a syscache, but this doesn't
+ * matter to SysCacheGetAttr, because it only wants to be able to fetch the
+ * tupdesc
+ */
+ val = SysCacheGetAttr(CONSTROID, constrtup, Anum_pg_constraint_conbin,
+ &isnull);
+ if (isnull)
+ elog(ERROR, "null conbin for constraint %u",
+ HeapTupleGetOid(constrtup));
+ conbin = TextDatumGetCString(val);
+ origexpr = (Expr *) stringToNode(conbin);
+ exprstate = (List *)
+ ExecPrepareExpr((Expr *) make_ands_implicit(origexpr), estate);
+
+ econtext = GetPerTupleExprContext(estate);
+ tupdesc = RelationGetDescr(rel);
+ slot = MakeSingleTupleTableSlot(tupdesc);
+ econtext->ecxt_scantuple = slot;
+
+ scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+
+ /*
+ * Switch to per-tuple memory context and reset it for each tuple
+ * produced, so we don't leak memory.
+ */
+ oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+
+ if (!ExecQual(exprstate, econtext, true))
+ ereport(ERROR,
+ (errcode(ERRCODE_CHECK_VIOLATION),
+ errmsg("check constraint \"%s\" is violated by some row",
+ NameStr(constrForm->conname))));
+
+ ResetExprContext(econtext);
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+ heap_endscan(scan);
+ ExecDropSingleTupleTableSlot(slot);
+ FreeExecutorState(estate);
+}
+
+/*
* Scan the existing rows in a table to verify they meet a proposed FK
* constraint.
*