aboutsummaryrefslogtreecommitdiff
path: root/src/backend/catalog/heap.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/catalog/heap.c')
-rw-r--r--src/backend/catalog/heap.c372
1 files changed, 234 insertions, 138 deletions
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 567280e1529..d12e63445a5 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/catalog/heap.c,v 1.332 2008/03/27 03:57:33 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/catalog/heap.c,v 1.333 2008/05/09 23:32:04 tgl Exp $
*
*
* INTERFACE ROUTINES
@@ -77,9 +77,15 @@ static Oid AddNewRelationType(const char *typeName,
char new_rel_kind,
Oid new_array_type);
static void RelationRemoveInheritance(Oid relid);
-static void StoreRelCheck(Relation rel, char *ccname, char *ccbin);
-static void StoreConstraints(Relation rel, TupleDesc tupdesc);
+static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
+ bool is_local, int inhcount);
+static void StoreConstraints(Relation rel, List *cooked_constraints);
+static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
+ bool allow_merge, bool is_local);
static void SetRelationNumChecks(Relation rel, int numchecks);
+static Node *cookConstraint(ParseState *pstate,
+ Node *raw_constraint,
+ char *relname);
static List *insert_ordered_unique_oid(List *list, Oid datum);
@@ -788,6 +794,7 @@ heap_create_with_catalog(const char *relname,
Oid relid,
Oid ownerid,
TupleDesc tupdesc,
+ List *cooked_constraints,
char relkind,
bool shared_relation,
bool oidislocal,
@@ -1004,13 +1011,13 @@ heap_create_with_catalog(const char *relname,
}
/*
- * store constraints and defaults passed in the tupdesc, if any.
+ * Store any supplied constraints and defaults.
*
* NB: this may do a CommandCounterIncrement and rebuild the relcache
* entry, so the relation must be valid and self-consistent at this point.
* In particular, there are not yet constraints and defaults anywhere.
*/
- StoreConstraints(new_rel_desc, tupdesc);
+ StoreConstraints(new_rel_desc, cooked_constraints);
/*
* If there's a special on-commit action, remember it
@@ -1426,12 +1433,11 @@ heap_drop_with_catalog(Oid relid)
/*
* Store a default expression for column attnum of relation rel.
- * The expression must be presented as a nodeToString() string.
*/
void
-StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
+StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr)
{
- Node *expr;
+ char *adbin;
char *adsrc;
Relation adrel;
HeapTuple tuple;
@@ -1445,12 +1451,12 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
defobject;
/*
- * Need to construct source equivalent of given node-string.
+ * Flatten expression to string form for storage.
*/
- expr = stringToNode(adbin);
+ adbin = nodeToString(expr);
/*
- * deparse it
+ * Also deparse it to form the mostly-obsolete adsrc field.
*/
adsrc = deparse_expression(expr,
deparse_context_for(RelationGetRelationName(rel),
@@ -1482,6 +1488,7 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
pfree(DatumGetPointer(values[Anum_pg_attrdef_adsrc - 1]));
heap_freetuple(tuple);
+ pfree(adbin);
pfree(adsrc);
/*
@@ -1525,27 +1532,27 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
/*
* Store a check-constraint expression for the given relation.
- * The expression must be presented as a nodeToString() string.
*
* Caller is responsible for updating the count of constraints
* in the pg_class entry for the relation.
*/
static void
-StoreRelCheck(Relation rel, char *ccname, char *ccbin)
+StoreRelCheck(Relation rel, char *ccname, Node *expr,
+ bool is_local, int inhcount)
{
- Node *expr;
+ char *ccbin;
char *ccsrc;
List *varList;
int keycount;
int16 *attNos;
/*
- * Convert condition to an expression tree.
+ * Flatten expression to string form for storage.
*/
- expr = stringToNode(ccbin);
+ ccbin = nodeToString(expr);
/*
- * deparse it
+ * Also deparse it to form the mostly-obsolete consrc field.
*/
ccsrc = deparse_expression(expr,
deparse_context_for(RelationGetRelationName(rel),
@@ -1553,7 +1560,7 @@ StoreRelCheck(Relation rel, char *ccname, char *ccbin)
false, false);
/*
- * Find columns of rel that are used in ccbin
+ * Find columns of rel that are used in expr
*
* NB: pull_var_clause is okay here only because we don't allow subselects
* in check constraints; it would fail to examine the contents of
@@ -1608,26 +1615,29 @@ StoreRelCheck(Relation rel, char *ccname, char *ccbin)
InvalidOid, /* no associated index */
expr, /* Tree form check constraint */
ccbin, /* Binary form check constraint */
- ccsrc); /* Source form check constraint */
+ ccsrc, /* Source form check constraint */
+ is_local, /* conislocal */
+ inhcount); /* coninhcount */
+ pfree(ccbin);
pfree(ccsrc);
}
/*
- * Store defaults and constraints passed in via the tuple constraint struct.
+ * Store defaults and constraints (passed as a list of CookedConstraint).
*
* NOTE: only pre-cooked expressions will be passed this way, which is to
* say expressions inherited from an existing relation. Newly parsed
* expressions can be added later, by direct calls to StoreAttrDefault
- * and StoreRelCheck (see AddRelationRawConstraints()).
+ * and StoreRelCheck (see AddRelationNewConstraints()).
*/
static void
-StoreConstraints(Relation rel, TupleDesc tupdesc)
+StoreConstraints(Relation rel, List *cooked_constraints)
{
- TupleConstr *constr = tupdesc->constr;
- int i;
+ int numchecks = 0;
+ ListCell *lc;
- if (!constr)
+ if (!cooked_constraints)
return; /* nothing to do */
/*
@@ -1637,33 +1647,46 @@ StoreConstraints(Relation rel, TupleDesc tupdesc)
*/
CommandCounterIncrement();
- for (i = 0; i < constr->num_defval; i++)
- StoreAttrDefault(rel, constr->defval[i].adnum,
- constr->defval[i].adbin);
+ foreach(lc, cooked_constraints)
+ {
+ CookedConstraint *con = (CookedConstraint *) lfirst(lc);
- for (i = 0; i < constr->num_check; i++)
- StoreRelCheck(rel, constr->check[i].ccname,
- constr->check[i].ccbin);
+ switch (con->contype)
+ {
+ case CONSTR_DEFAULT:
+ StoreAttrDefault(rel, con->attnum, con->expr);
+ break;
+ case CONSTR_CHECK:
+ StoreRelCheck(rel, con->name, con->expr,
+ con->is_local, con->inhcount);
+ numchecks++;
+ break;
+ default:
+ elog(ERROR, "unrecognized constraint type: %d",
+ (int) con->contype);
+ }
+ }
- if (constr->num_check > 0)
- SetRelationNumChecks(rel, constr->num_check);
+ if (numchecks > 0)
+ SetRelationNumChecks(rel, numchecks);
}
/*
- * AddRelationRawConstraints
+ * AddRelationNewConstraints
*
- * Add raw (not-yet-transformed) column default expressions and/or constraint
- * check expressions to an existing relation. This is defined to do both
- * for efficiency in DefineRelation, but of course you can do just one or
- * the other by passing empty lists.
+ * Add new column default expressions and/or constraint check expressions
+ * to an existing relation. This is defined to do both for efficiency in
+ * DefineRelation, but of course you can do just one or the other by passing
+ * empty lists.
*
* rel: relation to be modified
- * rawColDefaults: list of RawColumnDefault structures
- * rawConstraints: list of Constraint nodes
+ * newColDefaults: list of RawColumnDefault structures
+ * newConstraints: list of Constraint nodes
+ * allow_merge: TRUE if check constraints may be merged with existing ones
+ * is_local: TRUE if definition is local, FALSE if it's inherited
*
- * All entries in rawColDefaults will be processed. Entries in rawConstraints
- * will be processed only if they are CONSTR_CHECK type and contain a "raw"
- * expression.
+ * All entries in newColDefaults will be processed. Entries in newConstraints
+ * will be processed only if they are CONSTR_CHECK type.
*
* Returns a list of CookedConstraint nodes that shows the cooked form of
* the default and constraint expressions added to the relation.
@@ -1674,9 +1697,11 @@ StoreConstraints(Relation rel, TupleDesc tupdesc)
* tuples visible.
*/
List *
-AddRelationRawConstraints(Relation rel,
- List *rawColDefaults,
- List *rawConstraints)
+AddRelationNewConstraints(Relation rel,
+ List *newColDefaults,
+ List *newConstraints,
+ bool allow_merge,
+ bool is_local)
{
List *cookedConstraints = NIL;
TupleDesc tupleDesc;
@@ -1715,7 +1740,7 @@ AddRelationRawConstraints(Relation rel,
/*
* Process column default expressions.
*/
- foreach(cell, rawColDefaults)
+ foreach(cell, newColDefaults)
{
RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
Form_pg_attribute atp = rel->rd_att->attrs[colDef->attnum - 1];
@@ -1739,13 +1764,15 @@ AddRelationRawConstraints(Relation rel,
(IsA(expr, Const) &&((Const *) expr)->constisnull))
continue;
- StoreAttrDefault(rel, colDef->attnum, nodeToString(expr));
+ StoreAttrDefault(rel, colDef->attnum, expr);
cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
cooked->contype = CONSTR_DEFAULT;
cooked->name = NULL;
cooked->attnum = colDef->attnum;
cooked->expr = expr;
+ cooked->is_local = is_local;
+ cooked->inhcount = is_local ? 0 : 1;
cookedConstraints = lappend(cookedConstraints, cooked);
}
@@ -1754,45 +1781,35 @@ AddRelationRawConstraints(Relation rel,
*/
numchecks = numoldchecks;
checknames = NIL;
- foreach(cell, rawConstraints)
+ foreach(cell, newConstraints)
{
Constraint *cdef = (Constraint *) lfirst(cell);
char *ccname;
- if (cdef->contype != CONSTR_CHECK || cdef->raw_expr == NULL)
+ if (cdef->contype != CONSTR_CHECK)
continue;
- Assert(cdef->cooked_expr == NULL);
- /*
- * Transform raw parsetree to executable expression.
- */
- expr = transformExpr(pstate, cdef->raw_expr);
-
- /*
- * Make sure it yields a boolean result.
- */
- expr = coerce_to_boolean(pstate, expr, "CHECK");
+ if (cdef->raw_expr != NULL)
+ {
+ Assert(cdef->cooked_expr == NULL);
- /*
- * Make sure no outside relations are referred to.
- */
- if (list_length(pstate->p_rtable) != 1)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
- errmsg("only table \"%s\" can be referenced in check constraint",
- RelationGetRelationName(rel))));
+ /*
+ * Transform raw parsetree to executable expression, and verify
+ * it's valid as a CHECK constraint.
+ */
+ expr = cookConstraint(pstate, cdef->raw_expr,
+ RelationGetRelationName(rel));
+ }
+ else
+ {
+ Assert(cdef->cooked_expr != NULL);
- /*
- * No subplans or aggregates, either...
- */
- if (pstate->p_hasSubLinks)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot use subquery in check constraint")));
- if (pstate->p_hasAggs)
- ereport(ERROR,
- (errcode(ERRCODE_GROUPING_ERROR),
- errmsg("cannot use aggregate function in check constraint")));
+ /*
+ * Here, we assume the parser will only pass us valid CHECK
+ * expressions, so we do no particular checking.
+ */
+ expr = stringToNode(cdef->cooked_expr);
+ }
/*
* Check name uniqueness, or generate a name if none was given.
@@ -1802,15 +1819,6 @@ AddRelationRawConstraints(Relation rel,
ListCell *cell2;
ccname = cdef->name;
- /* Check against pre-existing constraints */
- if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
- RelationGetRelid(rel),
- RelationGetNamespace(rel),
- ccname))
- ereport(ERROR,
- (errcode(ERRCODE_DUPLICATE_OBJECT),
- errmsg("constraint \"%s\" for relation \"%s\" already exists",
- ccname, RelationGetRelationName(rel))));
/* Check against other new constraints */
/* Needed because we don't do CommandCounterIncrement in loop */
foreach(cell2, checknames)
@@ -1821,6 +1829,19 @@ AddRelationRawConstraints(Relation rel,
errmsg("check constraint \"%s\" already exists",
ccname)));
}
+
+ /* save name for future checks */
+ checknames = lappend(checknames, ccname);
+
+ /*
+ * Check against pre-existing constraints. If we are allowed
+ * to merge with an existing constraint, there's no more to
+ * do here. (We omit the duplicate constraint from the result,
+ * which is what ATAddCheckConstraint wants.)
+ */
+ if (MergeWithExistingConstraint(rel, ccname, expr,
+ allow_merge, is_local))
+ continue;
}
else
{
@@ -1855,15 +1876,15 @@ AddRelationRawConstraints(Relation rel,
"check",
RelationGetNamespace(rel),
checknames);
- }
- /* save name for future checks */
- checknames = lappend(checknames, ccname);
+ /* save name for future checks */
+ checknames = lappend(checknames, ccname);
+ }
/*
* OK, store it.
*/
- StoreRelCheck(rel, ccname, nodeToString(expr));
+ StoreRelCheck(rel, ccname, expr, is_local, is_local ? 0 : 1);
numchecks++;
@@ -1872,6 +1893,8 @@ AddRelationRawConstraints(Relation rel,
cooked->name = ccname;
cooked->attnum = 0;
cooked->expr = expr;
+ cooked->is_local = is_local;
+ cooked->inhcount = is_local ? 0 : 1;
cookedConstraints = lappend(cookedConstraints, cooked);
}
@@ -1888,6 +1911,90 @@ AddRelationRawConstraints(Relation rel,
}
/*
+ * Check for a pre-existing check constraint that conflicts with a proposed
+ * new one, and either adjust its conislocal/coninhcount settings or throw
+ * error as needed.
+ *
+ * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's
+ * got a so-far-unique name, or throws error if conflict.
+ */
+static bool
+MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
+ bool allow_merge, bool is_local)
+{
+ bool found;
+ Relation conDesc;
+ SysScanDesc conscan;
+ ScanKeyData skey[2];
+ HeapTuple tup;
+
+ /* Search for a pg_constraint entry with same name and relation */
+ conDesc = heap_open(ConstraintRelationId, RowExclusiveLock);
+
+ found = false;
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(ccname));
+
+ ScanKeyInit(&skey[1],
+ Anum_pg_constraint_connamespace,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetNamespace(rel)));
+
+ conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
+ SnapshotNow, 2, skey);
+
+ while (HeapTupleIsValid(tup = systable_getnext(conscan)))
+ {
+ Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
+
+ if (con->conrelid == RelationGetRelid(rel))
+ {
+ /* Found it. Conflicts if not identical check constraint */
+ if (con->contype == CONSTRAINT_CHECK)
+ {
+ Datum val;
+ bool isnull;
+
+ val = fastgetattr(tup,
+ Anum_pg_constraint_conbin,
+ conDesc->rd_att, &isnull);
+ if (isnull)
+ elog(ERROR, "null conbin for rel %s",
+ RelationGetRelationName(rel));
+ if (equal(expr, stringToNode(TextDatumGetCString(val))))
+ found = true;
+ }
+ if (!found || !allow_merge)
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("constraint \"%s\" for relation \"%s\" already exists",
+ ccname, RelationGetRelationName(rel))));
+ /* OK to update the tuple */
+ ereport(NOTICE,
+ (errmsg("merging constraint \"%s\" with inherited definition",
+ ccname)));
+ tup = heap_copytuple(tup);
+ con = (Form_pg_constraint) GETSTRUCT(tup);
+ if (is_local)
+ con->conislocal = true;
+ else
+ con->coninhcount++;
+ simple_heap_update(conDesc, &tup->t_self, tup);
+ CatalogUpdateIndexes(conDesc, tup);
+ break;
+ }
+ }
+
+ systable_endscan(conscan);
+ heap_close(conDesc, RowExclusiveLock);
+
+ return found;
+}
+
+/*
* Update the count of constraints in the relation's pg_class tuple.
*
* Caller had better hold exclusive lock on the relation.
@@ -2015,63 +2122,52 @@ cookDefault(ParseState *pstate,
return expr;
}
-
/*
- * Removes all constraints on a relation that match the given name.
- *
- * It is the responsibility of the calling function to acquire a suitable
- * lock on the relation.
+ * Take a raw CHECK constraint expression and convert it to a cooked format
+ * ready for storage.
*
- * Returns: The number of constraints removed.
+ * Parse state must be set up to recognize any vars that might appear
+ * in the expression.
*/
-int
-RemoveRelConstraints(Relation rel, const char *constrName,
- DropBehavior behavior)
+static Node *
+cookConstraint(ParseState *pstate,
+ Node *raw_constraint,
+ char *relname)
{
- int ndeleted = 0;
- Relation conrel;
- SysScanDesc conscan;
- ScanKeyData key[1];
- HeapTuple contup;
-
- /* Grab an appropriate lock on the pg_constraint relation */
- conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
-
- /* Use the index to scan only constraints of the target relation */
- ScanKeyInit(&key[0],
- Anum_pg_constraint_conrelid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(RelationGetRelid(rel)));
-
- conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
- SnapshotNow, 1, key);
+ Node *expr;
/*
- * Scan over the result set, removing any matching entries.
+ * Transform raw parsetree to executable expression.
*/
- while ((contup = systable_getnext(conscan)) != NULL)
- {
- Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(contup);
+ expr = transformExpr(pstate, raw_constraint);
- if (strcmp(NameStr(con->conname), constrName) == 0)
- {
- ObjectAddress conobj;
-
- conobj.classId = ConstraintRelationId;
- conobj.objectId = HeapTupleGetOid(contup);
- conobj.objectSubId = 0;
-
- performDeletion(&conobj, behavior);
+ /*
+ * Make sure it yields a boolean result.
+ */
+ expr = coerce_to_boolean(pstate, expr, "CHECK");
- ndeleted++;
- }
- }
+ /*
+ * Make sure no outside relations are referred to.
+ */
+ if (list_length(pstate->p_rtable) != 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("only table \"%s\" can be referenced in check constraint",
+ relname)));
- /* Clean up after the scan */
- systable_endscan(conscan);
- heap_close(conrel, RowExclusiveLock);
+ /*
+ * No subplans or aggregates, either...
+ */
+ if (pstate->p_hasSubLinks)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot use subquery in check constraint")));
+ if (pstate->p_hasAggs)
+ ereport(ERROR,
+ (errcode(ERRCODE_GROUPING_ERROR),
+ errmsg("cannot use aggregate function in check constraint")));
- return ndeleted;
+ return expr;
}