aboutsummaryrefslogtreecommitdiff
path: root/src/backend/parser/parse_utilcmd.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2009-12-07 05:22:23 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2009-12-07 05:22:23 +0000
commit0cb65564e5f855b1e9aa145fd645352130f74646 (patch)
treebadcc3ee73a16d472f9e637246589d6b803e620f /src/backend/parser/parse_utilcmd.c
parent8de7472b45859108761223fb19b396efaa8f0a4d (diff)
downloadpostgresql-0cb65564e5f855b1e9aa145fd645352130f74646.tar.gz
postgresql-0cb65564e5f855b1e9aa145fd645352130f74646.zip
Add exclusion constraints, which generalize the concept of uniqueness to
support any indexable commutative operator, not just equality. Two rows violate the exclusion constraint if "row1.col OP row2.col" is TRUE for each of the columns in the constraint. Jeff Davis, reviewed by Robert Haas
Diffstat (limited to 'src/backend/parser/parse_utilcmd.c')
-rw-r--r--src/backend/parser/parse_utilcmd.c138
1 files changed, 116 insertions, 22 deletions
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index 18f7e5c556e..f21fcb61044 100644
--- a/src/backend/parser/parse_utilcmd.c
+++ b/src/backend/parser/parse_utilcmd.c
@@ -19,7 +19,7 @@
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.30 2009/11/13 23:49:23 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.31 2009/12/07 05:22:22 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -35,6 +35,7 @@
#include "catalog/namespace.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_opclass.h"
+#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
#include "commands/comment.h"
#include "commands/defrem.h"
@@ -456,6 +457,10 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
saw_default = true;
break;
+ case CONSTR_CHECK:
+ cxt->ckconstraints = lappend(cxt->ckconstraints, constraint);
+ break;
+
case CONSTR_PRIMARY:
case CONSTR_UNIQUE:
if (constraint->keys == NIL)
@@ -463,8 +468,9 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
cxt->ixconstraints = lappend(cxt->ixconstraints, constraint);
break;
- case CONSTR_CHECK:
- cxt->ckconstraints = lappend(cxt->ckconstraints, constraint);
+ case CONSTR_EXCLUSION:
+ /* grammar does not allow EXCLUDE as a column constraint */
+ elog(ERROR, "column exclusion constraints are not supported");
break;
case CONSTR_FOREIGN:
@@ -503,6 +509,7 @@ transformTableConstraint(ParseState *pstate, CreateStmtContext *cxt,
{
case CONSTR_PRIMARY:
case CONSTR_UNIQUE:
+ case CONSTR_EXCLUSION:
cxt->ixconstraints = lappend(cxt->ixconstraints, constraint);
break;
@@ -814,7 +821,7 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
/*
* chooseIndexName
*
- * Set name to unnamed index. See also the same logic in DefineIndex.
+ * Set name for unnamed index. See also the same logic in DefineIndex.
*/
static char *
chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt)
@@ -828,6 +835,13 @@ chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt)
return ChooseRelationName(relation->relname, NULL,
"pkey", namespaceId);
}
+ else if (index_stmt->excludeOpNames != NIL)
+ {
+ IndexElem *iparam = (IndexElem *) linitial(index_stmt->indexParams);
+
+ return ChooseRelationName(relation->relname, iparam->name,
+ "exclusion", namespaceId);
+ }
else
{
IndexElem *iparam = (IndexElem *) linitial(index_stmt->indexParams);
@@ -880,7 +894,7 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
/* Fetch pg_am tuple for source index from relcache entry */
amrec = source_idx->rd_am;
- /* Must get indclass the hard way, since it's not stored in relcache */
+ /* Extract indclass from the pg_index tuple */
datum = SysCacheGetAttr(INDEXRELID, ht_idx,
Anum_pg_index_indclass, &isnull);
Assert(!isnull);
@@ -905,12 +919,12 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
index->idxname = NULL;
/*
- * If the index is marked PRIMARY, it's certainly from a constraint; else,
- * if it's not marked UNIQUE, it certainly isn't. If it is or might be
- * from a constraint, we have to fetch the constraint to check for
- * deferrability attributes.
+ * If the index is marked PRIMARY or has an exclusion condition, it's
+ * certainly from a constraint; else, if it's not marked UNIQUE, it
+ * certainly isn't. If it is or might be from a constraint, we have to
+ * fetch the pg_constraint record.
*/
- if (index->primary || index->unique)
+ if (index->primary || index->unique || idxrelrec->relhasexclusion)
{
Oid constraintId = get_index_constraint(source_relid);
@@ -931,6 +945,53 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
index->deferrable = conrec->condeferrable;
index->initdeferred = conrec->condeferred;
+ /* If it's an exclusion constraint, we need the operator names */
+ if (idxrelrec->relhasexclusion)
+ {
+ Datum *elems;
+ int nElems;
+ int i;
+
+ Assert(conrec->contype == CONSTRAINT_EXCLUSION);
+ /* Extract operator OIDs from the pg_constraint tuple */
+ datum = SysCacheGetAttr(CONSTROID, ht_constr,
+ Anum_pg_constraint_conexclop,
+ &isnull);
+ if (isnull)
+ elog(ERROR, "null conexclop for constraint %u",
+ constraintId);
+
+ deconstruct_array(DatumGetArrayTypeP(datum),
+ OIDOID, sizeof(Oid), true, 'i',
+ &elems, NULL, &nElems);
+
+ for (i = 0; i < nElems; i++)
+ {
+ Oid operid = DatumGetObjectId(elems[i]);
+ HeapTuple opertup;
+ Form_pg_operator operform;
+ char *oprname;
+ char *nspname;
+ List *namelist;
+
+ opertup = SearchSysCache(OPEROID,
+ ObjectIdGetDatum(operid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(opertup))
+ elog(ERROR, "cache lookup failed for operator %u",
+ operid);
+ operform = (Form_pg_operator) GETSTRUCT(opertup);
+ oprname = pstrdup(NameStr(operform->oprname));
+ /* For simplicity we always schema-qualify the op name */
+ nspname = get_namespace_name(operform->oprnamespace);
+ namelist = list_make2(makeString(nspname),
+ makeString(oprname));
+ index->excludeOpNames = lappend(index->excludeOpNames,
+ namelist);
+ ReleaseSysCache(opertup);
+ }
+ }
+
ReleaseSysCache(ht_constr);
}
else
@@ -1087,7 +1148,7 @@ get_opclass(Oid opclass, Oid actual_datatype)
/*
* transformIndexConstraints
- * Handle UNIQUE and PRIMARY KEY constraints, which create indexes.
+ * Handle UNIQUE, PRIMARY KEY, EXCLUDE constraints, which create indexes.
* We also merge in any index definitions arising from
* LIKE ... INCLUDING INDEXES.
*/
@@ -1100,8 +1161,9 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
/*
* Run through the constraints that need to generate an index. For PRIMARY
- * KEY, mark each column as NOT NULL and create an index. For UNIQUE,
- * create an index as for PRIMARY KEY, but do not insist on NOT NULL.
+ * KEY, mark each column as NOT NULL and create an index. For UNIQUE or
+ * EXCLUDE, create an index as for PRIMARY KEY, but do not insist on NOT
+ * NULL.
*/
foreach(lc, cxt->ixconstraints)
{
@@ -1109,7 +1171,8 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
Assert(IsA(constraint, Constraint));
Assert(constraint->contype == CONSTR_PRIMARY ||
- constraint->contype == CONSTR_UNIQUE);
+ constraint->contype == CONSTR_UNIQUE ||
+ constraint->contype == CONSTR_EXCLUSION);
index = transformIndexConstraint(constraint, cxt);
@@ -1167,6 +1230,7 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
if (equal(index->indexParams, priorindex->indexParams) &&
equal(index->whereClause, priorindex->whereClause) &&
+ equal(index->excludeOpNames, priorindex->excludeOpNames) &&
strcmp(index->accessMethod, priorindex->accessMethod) == 0 &&
index->deferrable == priorindex->deferrable &&
index->initdeferred == priorindex->initdeferred)
@@ -1193,19 +1257,18 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
/*
* transformIndexConstraint
- * Transform one UNIQUE or PRIMARY KEY constraint for
+ * Transform one UNIQUE, PRIMARY KEY, or EXCLUDE constraint for
* transformIndexConstraints.
*/
static IndexStmt *
transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
{
IndexStmt *index;
- ListCell *keys;
- IndexElem *iparam;
+ ListCell *lc;
index = makeNode(IndexStmt);
- index->unique = true;
+ index->unique = (constraint->contype != CONSTR_EXCLUSION);
index->primary = (constraint->contype == CONSTR_PRIMARY);
if (index->primary)
{
@@ -1231,25 +1294,55 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
index->idxname = NULL; /* DefineIndex will choose name */
index->relation = cxt->relation;
- index->accessMethod = DEFAULT_INDEX_TYPE;
+ index->accessMethod = constraint->access_method ? constraint->access_method : DEFAULT_INDEX_TYPE;
index->options = constraint->options;
index->tableSpace = constraint->indexspace;
+ index->whereClause = constraint->where_clause;
index->indexParams = NIL;
- index->whereClause = NULL;
+ index->excludeOpNames = NIL;
index->concurrent = false;
/*
+ * If it's an EXCLUDE constraint, the grammar returns a list of pairs
+ * of IndexElems and operator names. We have to break that apart into
+ * separate lists.
+ */
+ if (constraint->contype == CONSTR_EXCLUSION)
+ {
+ foreach(lc, constraint->exclusions)
+ {
+ List *pair = (List *) lfirst(lc);
+ IndexElem *elem;
+ List *opname;
+
+ Assert(list_length(pair) == 2);
+ elem = (IndexElem *) linitial(pair);
+ Assert(IsA(elem, IndexElem));
+ opname = (List *) lsecond(pair);
+ Assert(IsA(opname, List));
+
+ index->indexParams = lappend(index->indexParams, elem);
+ index->excludeOpNames = lappend(index->excludeOpNames, opname);
+ }
+
+ return index;
+ }
+
+ /*
+ * For UNIQUE and PRIMARY KEY, we just have a list of column names.
+ *
* Make sure referenced keys exist. If we are making a PRIMARY KEY index,
* also make sure they are NOT NULL, if possible. (Although we could leave
* it to DefineIndex to mark the columns NOT NULL, it's more efficient to
* get it right the first time.)
*/
- foreach(keys, constraint->keys)
+ foreach(lc, constraint->keys)
{
- char *key = strVal(lfirst(keys));
+ char *key = strVal(lfirst(lc));
bool found = false;
ColumnDef *column = NULL;
ListCell *columns;
+ IndexElem *iparam;
foreach(columns, cxt->columns)
{
@@ -2000,6 +2093,7 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList)
((node) != NULL && \
((node)->contype == CONSTR_PRIMARY || \
(node)->contype == CONSTR_UNIQUE || \
+ (node)->contype == CONSTR_EXCLUSION || \
(node)->contype == CONSTR_FOREIGN))
foreach(clist, constraintList)