diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2020-08-21 15:00:42 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2020-08-21 15:00:47 -0400 |
commit | 50289819230d8ddad510879ee4793b04a05cf13b (patch) | |
tree | 3a46042ea9648a37341a3b29518f15dca8597e87 /src/backend/parser/parse_utilcmd.c | |
parent | eabba4a3eb71b3886d0ec581155df6202b96b15a (diff) | |
download | postgresql-50289819230d8ddad510879ee4793b04a05cf13b.tar.gz postgresql-50289819230d8ddad510879ee4793b04a05cf13b.zip |
Fix handling of CREATE TABLE LIKE with inheritance.
If a CREATE TABLE command uses both LIKE and traditional inheritance,
Vars in CHECK constraints and expression indexes that are absorbed
from a LIKE parent table tended to get mis-numbered, resulting in
wrong answers and/or bizarre error messages (though probably not any
actual crashes, thanks to validation occurring in the executor).
In v12 and up, the same could happen to Vars in GENERATED expressions,
even in cases with no LIKE clause but multiple traditional-inheritance
parents.
The cause of the problem for LIKE is that parse_utilcmd.c supposed
it could renumber such Vars correctly during transformCreateStmt(),
which it cannot since we have not yet accounted for columns added via
inheritance. Fix that by postponing processing of LIKE INCLUDING
CONSTRAINTS, DEFAULTS, GENERATED, INDEXES till after we've performed
DefineRelation().
The error with GENERATED and multiple inheritance is a simple oversight
in MergeAttributes(); it knows it has to renumber Vars in inherited
CHECK constraints, but forgot to apply the same processing to inherited
GENERATED expressions (a/k/a defaults).
Per bug #16272 from Tom Gottfried. The non-GENERATED variants of the
issue are ancient, presumably dating right back to the addition of
CREATE TABLE LIKE; hence back-patch to all supported branches.
Discussion: https://postgr.es/m/16272-6e32da020e9a9381@postgresql.org
Diffstat (limited to 'src/backend/parser/parse_utilcmd.c')
-rw-r--r-- | src/backend/parser/parse_utilcmd.c | 381 |
1 files changed, 234 insertions, 147 deletions
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index 25abc544fc7..6c49554defb 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -86,7 +86,6 @@ typedef struct List *ckconstraints; /* CHECK constraints */ List *fkconstraints; /* FOREIGN KEY constraints */ List *ixconstraints; /* index-creating constraints */ - List *inh_indexes; /* cloned indexes from INCLUDING INDEXES */ List *extstats; /* cloned extended statistics */ List *blist; /* "before list" of things to do before * creating the table */ @@ -154,6 +153,9 @@ static Const *transformPartitionBoundValue(ParseState *pstate, Node *con, * Returns a List of utility commands to be done in sequence. One of these * will be the transformed CreateStmt, but there may be additional actions * to be done before and after the actual DefineRelation() call. + * In addition to normal utility commands such as AlterTableStmt and + * IndexStmt, the result list may contain TableLikeClause(s), representing + * the need to perform additional parse analysis after DefineRelation(). * * SQL allows constraints to be scattered all over, so thumb through * the columns and collect all constraints into one place. @@ -241,7 +243,6 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) cxt.ckconstraints = NIL; cxt.fkconstraints = NIL; cxt.ixconstraints = NIL; - cxt.inh_indexes = NIL; cxt.extstats = NIL; cxt.blist = NIL; cxt.alist = NIL; @@ -917,18 +918,18 @@ transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint) * transformTableLikeClause * * Change the LIKE <srctable> portion of a CREATE TABLE statement into - * column definitions which recreate the user defined column portions of - * <srctable>. + * column definitions that recreate the user defined column portions of + * <srctable>. Also, if there are any LIKE options that we can't fully + * process at this point, add the TableLikeClause to cxt->alist, which + * will cause utility.c to call expandTableLikeClause() after the new + * table has been created. */ static void transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_clause) { AttrNumber parent_attno; - AttrNumber new_attno; Relation relation; TupleDesc tupleDesc; - TupleConstr *constr; - AttrMap *attmap; AclResult aclresult; char *comment; ParseCallbackState pcbstate; @@ -942,6 +943,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("LIKE is not supported for creating foreign tables"))); + /* Open the relation referenced by the LIKE clause */ relation = relation_openrv(table_like_clause->relation, AccessShareLock); if (relation->rd_rel->relkind != RELKIND_RELATION && @@ -978,37 +980,11 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla } tupleDesc = RelationGetDescr(relation); - constr = tupleDesc->constr; - - /* - * Initialize column number map for map_variable_attnos(). We need this - * since dropped columns in the source table aren't copied, so the new - * table can have different column numbers. - */ - attmap = make_attrmap(tupleDesc->natts); - - /* - * We must fill the attmap now so that it can be used to process generated - * column default expressions in the per-column loop below. - */ - new_attno = 1; - for (parent_attno = 1; parent_attno <= tupleDesc->natts; - parent_attno++) - { - Form_pg_attribute attribute = TupleDescAttr(tupleDesc, - parent_attno - 1); - - /* - * Ignore dropped columns in the parent. attmap entry is left zero. - */ - if (attribute->attisdropped) - continue; - - attmap->attnums[parent_attno - 1] = list_length(cxt->columns) + (new_attno++); - } /* * Insert the copied attributes into the cxt for the new table definition. + * We must do this now so that they appear in the table in the relative + * position where the LIKE clause is, as required by SQL99. */ for (parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) @@ -1052,52 +1028,12 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla cxt->columns = lappend(cxt->columns, def); /* - * Copy default, if present and it should be copied. We have separate - * options for plain default expressions and GENERATED defaults. + * Although we don't transfer the column's default/generation + * expression now, we need to mark it GENERATED if appropriate. */ - if (attribute->atthasdef && - (attribute->attgenerated ? - (table_like_clause->options & CREATE_TABLE_LIKE_GENERATED) : - (table_like_clause->options & CREATE_TABLE_LIKE_DEFAULTS))) - { - Node *this_default = NULL; - AttrDefault *attrdef; - int i; - bool found_whole_row; - - /* Find default in constraint structure */ - Assert(constr != NULL); - attrdef = constr->defval; - for (i = 0; i < constr->num_defval; i++) - { - if (attrdef[i].adnum == parent_attno) - { - this_default = stringToNode(attrdef[i].adbin); - break; - } - } - Assert(this_default != NULL); - - def->cooked_default = map_variable_attnos(this_default, - 1, 0, - attmap, - InvalidOid, &found_whole_row); - - /* - * Prevent this for the same reason as for constraints below. Note - * that defaults cannot contain any vars, so it's OK that the - * error message refers to generated columns. - */ - if (found_whole_row) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot convert whole-row table reference"), - errdetail("Generation expression for column \"%s\" contains a whole-row reference to table \"%s\".", - attributeName, - RelationGetRelationName(relation)))); - + if (attribute->atthasdef && attribute->attgenerated && + (table_like_clause->options & CREATE_TABLE_LIKE_GENERATED)) def->generated = attribute->attgenerated; - } /* * Copy identity if requested @@ -1146,13 +1082,190 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla } /* + * We cannot yet deal with defaults, CHECK constraints, or indexes, since + * we don't yet know what column numbers the copied columns will have in + * the finished table. If any of those options are specified, add the + * LIKE clause to cxt->alist so that expandTableLikeClause will be called + * after we do know that. + */ + if (table_like_clause->options & + (CREATE_TABLE_LIKE_DEFAULTS | + CREATE_TABLE_LIKE_GENERATED | + CREATE_TABLE_LIKE_CONSTRAINTS | + CREATE_TABLE_LIKE_INDEXES)) + cxt->alist = lappend(cxt->alist, table_like_clause); + + /* + * We may copy extended statistics if requested, since the representation + * of CreateStatsStmt doesn't depend on column numbers. + */ + if (table_like_clause->options & CREATE_TABLE_LIKE_STATISTICS) + { + List *parent_extstats; + ListCell *l; + + parent_extstats = RelationGetStatExtList(relation); + + foreach(l, parent_extstats) + { + Oid parent_stat_oid = lfirst_oid(l); + CreateStatsStmt *stats_stmt; + + stats_stmt = generateClonedExtStatsStmt(cxt->relation, + RelationGetRelid(relation), + parent_stat_oid); + + /* Copy comment on statistics object, if requested */ + if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) + { + comment = GetComment(parent_stat_oid, StatisticExtRelationId, 0); + + /* + * We make use of CreateStatsStmt's stxcomment option, so as + * not to need to know now what name the statistics will have. + */ + stats_stmt->stxcomment = comment; + } + + cxt->extstats = lappend(cxt->extstats, stats_stmt); + } + + list_free(parent_extstats); + } + + /* + * Close the parent rel, but keep our AccessShareLock on it until xact + * commit. That will prevent someone else from deleting or ALTERing the + * parent before we can run expandTableLikeClause. + */ + table_close(relation, NoLock); +} + +/* + * expandTableLikeClause + * + * Process LIKE options that require knowing the final column numbers + * assigned to the new table's columns. This executes after we have + * run DefineRelation for the new table. It returns a list of utility + * commands that should be run to generate indexes etc. + */ +List * +expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause) +{ + List *result = NIL; + List *atsubcmds = NIL; + AttrNumber parent_attno; + Relation relation; + Relation childrel; + TupleDesc tupleDesc; + TupleConstr *constr; + AttrMap *attmap; + char *comment; + + /* + * Open the relation referenced by the LIKE clause. We should still have + * the table lock obtained by transformTableLikeClause (and this'll throw + * an assertion failure if not). Hence, no need to recheck privileges + * etc. + */ + relation = relation_openrv(table_like_clause->relation, NoLock); + + tupleDesc = RelationGetDescr(relation); + constr = tupleDesc->constr; + + /* + * Open the newly-created child relation; we have lock on that too. + */ + childrel = relation_openrv(heapRel, NoLock); + + /* + * Construct a map from the LIKE relation's attnos to the child rel's. + * This re-checks type match etc, although it shouldn't be possible to + * have a failure since both tables are locked. + */ + attmap = build_attrmap_by_name(RelationGetDescr(childrel), + tupleDesc); + + /* + * Process defaults, if required. + */ + if ((table_like_clause->options & + (CREATE_TABLE_LIKE_DEFAULTS | CREATE_TABLE_LIKE_GENERATED)) && + constr != NULL) + { + AttrDefault *attrdef = constr->defval; + + for (parent_attno = 1; parent_attno <= tupleDesc->natts; + parent_attno++) + { + Form_pg_attribute attribute = TupleDescAttr(tupleDesc, + parent_attno - 1); + + /* + * Ignore dropped columns in the parent. + */ + if (attribute->attisdropped) + continue; + + /* + * Copy default, if present and it should be copied. We have + * separate options for plain default expressions and GENERATED + * defaults. + */ + if (attribute->atthasdef && + (attribute->attgenerated ? + (table_like_clause->options & CREATE_TABLE_LIKE_GENERATED) : + (table_like_clause->options & CREATE_TABLE_LIKE_DEFAULTS))) + { + Node *this_default = NULL; + AlterTableCmd *atsubcmd; + bool found_whole_row; + + /* Find default in constraint structure */ + for (int i = 0; i < constr->num_defval; i++) + { + if (attrdef[i].adnum == parent_attno) + { + this_default = stringToNode(attrdef[i].adbin); + break; + } + } + Assert(this_default != NULL); + + atsubcmd = makeNode(AlterTableCmd); + atsubcmd->subtype = AT_CookedColumnDefault; + atsubcmd->num = attmap->attnums[parent_attno - 1]; + atsubcmd->def = map_variable_attnos(this_default, + 1, 0, + attmap, + InvalidOid, + &found_whole_row); + + /* + * Prevent this for the same reason as for constraints below. + * Note that defaults cannot contain any vars, so it's OK that + * the error message refers to generated columns. + */ + if (found_whole_row) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert whole-row table reference"), + errdetail("Generation expression for column \"%s\" contains a whole-row reference to table \"%s\".", + NameStr(attribute->attname), + RelationGetRelationName(relation)))); + + atsubcmds = lappend(atsubcmds, atsubcmd); + } + } + } + + /* * Copy CHECK constraints if requested, being careful to adjust attribute * numbers so they match the child. */ if ((table_like_clause->options & CREATE_TABLE_LIKE_CONSTRAINTS) && - tupleDesc->constr) + constr != NULL) { - TupleConstr *constr = tupleDesc->constr; int ccnum; for (ccnum = 0; ccnum < constr->num_check; ccnum++) @@ -1160,9 +1273,10 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla char *ccname = constr->check[ccnum].ccname; char *ccbin = constr->check[ccnum].ccbin; bool ccnoinherit = constr->check[ccnum].ccnoinherit; - Constraint *n = makeNode(Constraint); Node *ccbin_node; bool found_whole_row; + Constraint *n; + AlterTableCmd *atsubcmd; ccbin_node = map_variable_attnos(stringToNode(ccbin), 1, 0, @@ -1183,13 +1297,22 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla ccname, RelationGetRelationName(relation)))); + n = makeNode(Constraint); n->contype = CONSTR_CHECK; n->conname = pstrdup(ccname); n->location = -1; n->is_no_inherit = ccnoinherit; n->raw_expr = NULL; n->cooked_expr = nodeToString(ccbin_node); - cxt->ckconstraints = lappend(cxt->ckconstraints, n); + + /* We can skip validation, since the new table should be empty. */ + n->skip_validation = true; + n->initially_valid = true; + + atsubcmd = makeNode(AlterTableCmd); + atsubcmd->subtype = AT_AddConstraint; + atsubcmd->def = (Node *) n; + atsubcmds = lappend(atsubcmds, atsubcmd); /* Copy comment on constraint */ if ((table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) && @@ -1201,18 +1324,34 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla CommentStmt *stmt = makeNode(CommentStmt); stmt->objtype = OBJECT_TABCONSTRAINT; - stmt->object = (Node *) list_make3(makeString(cxt->relation->schemaname), - makeString(cxt->relation->relname), + stmt->object = (Node *) list_make3(makeString(heapRel->schemaname), + makeString(heapRel->relname), makeString(n->conname)); stmt->comment = comment; - cxt->alist = lappend(cxt->alist, stmt); + result = lappend(result, stmt); } } } /* - * Likewise, copy indexes if requested + * If we generated any ALTER TABLE actions above, wrap them into a single + * ALTER TABLE command. Stick it at the front of the result, so it runs + * before any CommentStmts we made above. + */ + if (atsubcmds) + { + AlterTableStmt *atcmd = makeNode(AlterTableStmt); + + atcmd->relation = copyObject(heapRel); + atcmd->cmds = atsubcmds; + atcmd->objtype = OBJECT_TABLE; + atcmd->missing_ok = false; + result = lcons(atcmd, result); + } + + /* + * Process indexes if required. */ if ((table_like_clause->options & CREATE_TABLE_LIKE_INDEXES) && relation->rd_rel->relhasindex) @@ -1231,7 +1370,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla parent_index = index_open(parent_index_oid, AccessShareLock); /* Build CREATE INDEX statement to recreate the parent_index */ - index_stmt = generateClonedIndexStmt(cxt->relation, + index_stmt = generateClonedIndexStmt(heapRel, parent_index, attmap, NULL); @@ -1248,49 +1387,14 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla index_stmt->idxcomment = comment; } - /* Save it in the inh_indexes list for the time being */ - cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt); + result = lappend(result, index_stmt); index_close(parent_index, AccessShareLock); } } - /* - * Likewise, copy extended statistics if requested - */ - if (table_like_clause->options & CREATE_TABLE_LIKE_STATISTICS) - { - List *parent_extstats; - ListCell *l; - - parent_extstats = RelationGetStatExtList(relation); - - foreach(l, parent_extstats) - { - Oid parent_stat_oid = lfirst_oid(l); - CreateStatsStmt *stats_stmt; - - stats_stmt = generateClonedExtStatsStmt(cxt->relation, - RelationGetRelid(relation), - parent_stat_oid); - - /* Copy comment on statistics object, if requested */ - if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) - { - comment = GetComment(parent_stat_oid, StatisticExtRelationId, 0); - - /* - * We make use of CreateStatsStmt's stxcomment option, so as - * not to need to know now what name the statistics will have. - */ - stats_stmt->stxcomment = comment; - } - - cxt->extstats = lappend(cxt->extstats, stats_stmt); - } - - list_free(parent_extstats); - } + /* Done with child rel */ + table_close(childrel, NoLock); /* * Close the parent rel, but keep our AccessShareLock on it until xact @@ -1298,6 +1402,8 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla * parent before the child is committed. */ table_close(relation, NoLock); + + return result; } static void @@ -1590,7 +1696,7 @@ generateClonedIndexStmt(RangeVar *heapRel, Relation source_idx, attmap, InvalidOid, &found_whole_row); - /* As in transformTableLikeClause, reject whole-row variables */ + /* As in expandTableLikeClause, reject whole-row variables */ if (found_whole_row) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -1699,7 +1805,7 @@ generateClonedIndexStmt(RangeVar *heapRel, Relation source_idx, attmap, InvalidOid, &found_whole_row); - /* As in transformTableLikeClause, reject whole-row variables */ + /* As in expandTableLikeClause, reject whole-row variables */ if (found_whole_row) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -1897,24 +2003,6 @@ transformIndexConstraints(CreateStmtContext *cxt) indexlist = lappend(indexlist, index); } - /* Add in any indexes defined by LIKE ... INCLUDING INDEXES */ - foreach(lc, cxt->inh_indexes) - { - index = (IndexStmt *) lfirst(lc); - - if (index->primary) - { - if (cxt->pkey != NULL) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("multiple primary keys for table \"%s\" are not allowed", - cxt->relation->relname))); - cxt->pkey = index; - } - - indexlist = lappend(indexlist, index); - } - /* * Scan the index list and remove any redundant index specifications. This * can happen if, for instance, the user writes UNIQUE PRIMARY KEY. A @@ -3115,7 +3203,6 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt, cxt.ckconstraints = NIL; cxt.fkconstraints = NIL; cxt.ixconstraints = NIL; - cxt.inh_indexes = NIL; cxt.extstats = NIL; cxt.blist = NIL; cxt.alist = NIL; |