diff options
Diffstat (limited to 'src/backend/parser/parse_utilcmd.c')
-rw-r--r-- | src/backend/parser/parse_utilcmd.c | 257 |
1 files changed, 132 insertions, 125 deletions
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index 2ff6f9274d7..a53f01c32ab 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -19,7 +19,7 @@ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.6 2007/11/15 21:14:37 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.7 2007/12/01 23:44:44 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -28,6 +28,8 @@ #include "access/genam.h" #include "access/heapam.h" +#include "access/reloptions.h" +#include "catalog/dependency.h" #include "catalog/heap.h" #include "catalog/index.h" #include "catalog/namespace.h" @@ -675,13 +677,15 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, } } + /* + * Likewise, copy indexes if requested + */ if (including_indexes && relation->rd_rel->relhasindex) { - AttrNumber *attmap; + AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns); List *parent_indexes; ListCell *l; - attmap = varattnos_map_schema(tupleDesc, cxt->columns); parent_indexes = RelationGetIndexList(relation); foreach(l, parent_indexes) @@ -693,14 +697,12 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, parent_index = index_open(parent_index_oid, AccessShareLock); /* Build CREATE INDEX statement to recreate the parent_index */ - index_stmt = generateClonedIndexStmt(cxt, parent_index, - attmap); + index_stmt = generateClonedIndexStmt(cxt, parent_index, attmap); - /* Add the new IndexStmt to the create context */ + /* Save it in the inh_indexes list for the time being */ cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt); - /* Keep our lock on the index till xact commit */ - index_close(parent_index, NoLock); + index_close(parent_index, AccessShareLock); } } @@ -713,54 +715,62 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, } /* - * Generate an IndexStmt entry using information from an already - * existing index "source_idx". - * - * Note: Much of this functionality is cribbed from pg_get_indexdef. + * Generate an IndexStmt node using information from an already existing index + * "source_idx". Attribute numbers should be adjusted according to attmap. */ static IndexStmt * generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, AttrNumber *attmap) { - HeapTuple ht_idx; + Oid source_relid = RelationGetRelid(source_idx); HeapTuple ht_idxrel; - HeapTuple ht_am; - Form_pg_index idxrec; + HeapTuple ht_idx; Form_pg_class idxrelrec; + Form_pg_index idxrec; Form_pg_am amrec; - List *indexprs = NIL; + oidvector *indclass; + IndexStmt *index; + List *indexprs; ListCell *indexpr_item; Oid indrelid; - Oid source_relid; int keyno; Oid keycoltype; - Datum indclassDatum; - Datum indoptionDatum; + Datum datum; bool isnull; - oidvector *indclass; - int2vector *indoption; - IndexStmt *index; - Datum reloptions; - source_relid = RelationGetRelid(source_idx); + /* + * Fetch pg_class tuple of source index. We can't use the copy in the + * relcache entry because it doesn't include optional fields. + */ + ht_idxrel = SearchSysCache(RELOID, + ObjectIdGetDatum(source_relid), + 0, 0, 0); + if (!HeapTupleIsValid(ht_idxrel)) + elog(ERROR, "cache lookup failed for relation %u", source_relid); + idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel); - /* Fetch pg_index tuple for source index */ - ht_idx = SearchSysCache(INDEXRELID, - ObjectIdGetDatum(source_relid), - 0, 0, 0); - if (!HeapTupleIsValid(ht_idx)) - elog(ERROR, "cache lookup failed for index %u", source_relid); + /* Fetch pg_index tuple for source index from relcache entry */ + ht_idx = source_idx->rd_indextuple; idxrec = (Form_pg_index) GETSTRUCT(ht_idx); - - Assert(source_relid == idxrec->indexrelid); indrelid = idxrec->indrelid; + /* Fetch pg_am tuple for source index from relcache entry */ + amrec = source_idx->rd_am; + + /* Must get indclass the hard way, since it's not stored in relcache */ + datum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indclass, &isnull); + Assert(!isnull); + indclass = (oidvector *) DatumGetPointer(datum); + + /* Begin building the IndexStmt */ index = makeNode(IndexStmt); + index->relation = cxt->relation; + index->accessMethod = pstrdup(NameStr(amrec->amname)); + index->tableSpace = get_tablespace_name(source_idx->rd_node.spcNode); index->unique = idxrec->indisunique; - index->concurrent = false; index->primary = idxrec->indisprimary; - index->relation = cxt->relation; - index->isconstraint = false; + index->concurrent = false; /* * We don't try to preserve the name of the source index; instead, just @@ -768,65 +778,40 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, */ index->idxname = NULL; - /* Must get indclass and indoption the hard way */ - indclassDatum = SysCacheGetAttr(INDEXRELID, ht_idx, - Anum_pg_index_indclass, &isnull); - Assert(!isnull); - indclass = (oidvector *) DatumGetPointer(indclassDatum); - indoptionDatum = SysCacheGetAttr(INDEXRELID, ht_idx, - Anum_pg_index_indoption, &isnull); - Assert(!isnull); - indoption = (int2vector *) DatumGetPointer(indoptionDatum); - - /* Fetch pg_class tuple of source index */ - ht_idxrel = SearchSysCache(RELOID, - ObjectIdGetDatum(source_relid), - 0, 0, 0); - if (!HeapTupleIsValid(ht_idxrel)) - elog(ERROR, "cache lookup failed for relation %u", source_relid); - /* - * Store the reloptions for later use by this new index + * If the index is marked PRIMARY, it's certainly from a constraint; + * else, if it's not marked UNIQUE, it certainly isn't; else, we have + * to search pg_depend to see if there's an associated unique constraint. */ - reloptions = SysCacheGetAttr(RELOID, ht_idxrel, - Anum_pg_class_reloptions, &isnull); - if (!isnull) - index->src_options = flatten_reloptions(source_relid); - - idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel); - - /* Fetch pg_am tuple for the index's access method */ - ht_am = SearchSysCache(AMOID, - ObjectIdGetDatum(idxrelrec->relam), - 0, 0, 0); - if (!HeapTupleIsValid(ht_am)) - elog(ERROR, "cache lookup failed for access method %u", - idxrelrec->relam); - amrec = (Form_pg_am) GETSTRUCT(ht_am); - index->accessMethod = pstrdup(NameStr(amrec->amname)); + if (index->primary) + index->isconstraint = true; + else if (!index->unique) + index->isconstraint = false; + else + index->isconstraint = OidIsValid(get_index_constraint(source_relid)); /* Get the index expressions, if any */ - if (!heap_attisnull(ht_idx, Anum_pg_index_indexprs)) + datum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indexprs, &isnull); + if (!isnull) { - Datum exprsDatum; - bool isnull; char *exprsString; - exprsDatum = SysCacheGetAttr(INDEXRELID, ht_idx, - Anum_pg_index_indexprs, &isnull); - exprsString = DatumGetCString(DirectFunctionCall1(textout, - exprsDatum)); - Assert(!isnull); + exprsString = DatumGetCString(DirectFunctionCall1(textout, datum)); indexprs = (List *) stringToNode(exprsString); } + else + indexprs = NIL; - indexpr_item = list_head(indexprs); + /* Build the list of IndexElem */ + index->indexParams = NIL; + indexpr_item = list_head(indexprs); for (keyno = 0; keyno < idxrec->indnatts; keyno++) { IndexElem *iparam; AttrNumber attnum = idxrec->indkey.values[keyno]; - int16 opt = indoption->values[keyno]; + int16 opt = source_idx->rd_indoption[keyno]; iparam = makeNode(IndexElem); @@ -849,11 +834,14 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, if (indexpr_item == NULL) elog(ERROR, "too few entries in indexprs list"); indexkey = (Node *) lfirst(indexpr_item); + indexpr_item = lnext(indexpr_item); + + /* OK to modify indexkey since we are working on a private copy */ change_varattnos_of_a_node(indexkey, attmap); + iparam->name = NULL; iparam->expr = indexkey; - indexpr_item = lnext(indexpr_item); keycoltype = exprType(indexkey); } @@ -866,40 +854,50 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, /* Adjust options if necessary */ if (amrec->amcanorder) { - /* If it supports sort ordering, report DESC and NULLS opts */ + /* + * If it supports sort ordering, copy DESC and NULLS opts. + * Don't set non-default settings unnecessarily, though, + * so as to improve the chance of recognizing equivalence + * to constraint indexes. + */ if (opt & INDOPTION_DESC) + { iparam->ordering = SORTBY_DESC; - if (opt & INDOPTION_NULLS_FIRST) - iparam->nulls_ordering = SORTBY_NULLS_FIRST; + if ((opt & INDOPTION_NULLS_FIRST) == 0) + iparam->nulls_ordering = SORTBY_NULLS_LAST; + } + else + { + if (opt & INDOPTION_NULLS_FIRST) + iparam->nulls_ordering = SORTBY_NULLS_FIRST; + } } index->indexParams = lappend(index->indexParams, iparam); } - /* Use the same tablespace as the source index */ - index->tableSpace = get_tablespace_name(source_idx->rd_node.spcNode); + /* Copy reloptions if any */ + datum = SysCacheGetAttr(RELOID, ht_idxrel, + Anum_pg_class_reloptions, &isnull); + if (!isnull) + index->options = untransformRelOptions(datum); /* If it's a partial index, decompile and append the predicate */ - if (!heap_attisnull(ht_idx, Anum_pg_index_indpred)) + datum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indpred, &isnull); + if (!isnull) { - Datum pred_datum; - bool isnull; char *pred_str; /* Convert text string to node tree */ - pred_datum = SysCacheGetAttr(INDEXRELID, ht_idx, - Anum_pg_index_indpred, &isnull); - Assert(!isnull); - pred_str = DatumGetCString(DirectFunctionCall1(textout, - pred_datum)); + pred_str = DatumGetCString(DirectFunctionCall1(textout, datum)); index->whereClause = (Node *) stringToNode(pred_str); + /* Adjust attribute numbers */ change_varattnos_of_a_node(index->whereClause, attmap); } /* Clean up */ - ReleaseSysCache(ht_idx); ReleaseSysCache(ht_idxrel); - ReleaseSysCache(ht_am); return index; } @@ -924,11 +922,11 @@ get_opclass(Oid opclass, Oid actual_datatype) elog(ERROR, "cache lookup failed for opclass %u", opclass); opc_rec = (Form_pg_opclass) GETSTRUCT(ht_opc); - if (!OidIsValid(actual_datatype) || - GetDefaultOpClass(actual_datatype, opc_rec->opcmethod) != opclass) + if (GetDefaultOpClass(actual_datatype, opc_rec->opcmethod) != opclass) { + /* For simplicity, we always schema-qualify the name */ char *nsp_name = get_namespace_name(opc_rec->opcnamespace); - char *opc_name = NameStr(opc_rec->opcname); + char *opc_name = pstrdup(NameStr(opc_rec->opcname)); result = list_make2(makeString(nsp_name), makeString(opc_name)); } @@ -940,8 +938,8 @@ get_opclass(Oid opclass, Oid actual_datatype) /* * transformIndexConstraints - * Handle UNIQUE and PRIMARY KEY constraints, which create - * indexes. We also merge index definitions arising from + * Handle UNIQUE and PRIMARY KEY constraints, which create indexes. + * We also merge in any index definitions arising from * LIKE ... INCLUDING INDEXES. */ static void @@ -960,7 +958,30 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) { Constraint *constraint = (Constraint *) lfirst(lc); + Assert(IsA(constraint, Constraint)); + Assert(constraint->contype == CONSTR_PRIMARY || + constraint->contype == CONSTR_UNIQUE); + index = transformIndexConstraint(constraint, cxt); + + indexlist = lappend(indexlist, index); + } + + /* Add in any indexes defined by LIKE ... INCLUDING INDEXES */ + foreach(lc, cxt->inh_indexes) + { + index = (IndexStmt *) lfirst(lc); + + if (index->primary) + { + if (cxt->pkey != NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("multiple primary keys for table \"%s\" are not allowed", + cxt->relation->relname))); + cxt->pkey = index; + } + indexlist = lappend(indexlist, index); } @@ -995,8 +1016,11 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) { IndexStmt *priorindex = lfirst(k); - if (equal(index->indexParams, priorindex->indexParams)) + if (equal(index->indexParams, priorindex->indexParams) && + equal(index->whereClause, priorindex->whereClause) && + strcmp(index->accessMethod, priorindex->accessMethod) == 0) { + priorindex->unique |= index->unique; /* * If the prior index is as yet unnamed, and this one is * named, then transfer the name to the prior index. This @@ -1013,27 +1037,13 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) if (keep) cxt->alist = lappend(cxt->alist, index); } - - /* Copy indexes defined by LIKE ... INCLUDING INDEXES */ - foreach(lc, cxt->inh_indexes) - { - index = (IndexStmt *) lfirst(lc); - - if (index->primary) - { - if (cxt->pkey) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("multiple primary keys for table \"%s\" are not allowed", - cxt->relation->relname))); - - cxt->pkey = index; - } - - cxt->alist = lappend(cxt->alist, index); - } } +/* + * transformIndexConstraint + * Transform one UNIQUE or PRIMARY KEY constraint for + * transformIndexConstraints. + */ static IndexStmt * transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) { @@ -1041,13 +1051,10 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) ListCell *keys; IndexElem *iparam; - Assert(constraint->contype == CONSTR_PRIMARY || - constraint->contype == CONSTR_UNIQUE); - index = makeNode(IndexStmt); + index->unique = true; index->primary = (constraint->contype == CONSTR_PRIMARY); - if (index->primary) { if (cxt->pkey != NULL) @@ -1771,7 +1778,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) /* * transformIndexConstraints wants cxt.alist to contain only index - * statements, so transfer anything we already have into save_alist. + * statements, so transfer anything we already have into save_alist * immediately. */ save_alist = cxt.alist; |