diff options
Diffstat (limited to 'src/backend/commands/copy.c')
-rw-r--r-- | src/backend/commands/copy.c | 174 |
1 files changed, 171 insertions, 3 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index ec5d6f15659..270be0af18e 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -161,6 +161,11 @@ typedef struct CopyStateData ExprState **defexprs; /* array of default att expressions */ bool volatile_defexprs; /* is any of defexprs volatile? */ List *range_table; + PartitionDispatch *partition_dispatch_info; + int num_dispatch; + int num_partitions; + ResultRelInfo *partitions; + TupleConversionMap **partition_tupconv_maps; /* * These variables are used to reduce overhead in textual COPY FROM. @@ -1397,6 +1402,71 @@ BeginCopy(ParseState *pstate, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("table \"%s\" does not have OIDs", RelationGetRelationName(cstate->rel)))); + + /* + * Initialize state for CopyFrom tuple routing. Watch out for + * any foreign partitions. + */ + if (is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + PartitionDispatch *pd; + List *leaf_parts; + ListCell *cell; + int i, + num_parted, + num_leaf_parts; + ResultRelInfo *leaf_part_rri; + + /* Get the tuple-routing information and lock partitions */ + pd = RelationGetPartitionDispatchInfo(rel, RowExclusiveLock, + &num_parted, &leaf_parts); + num_leaf_parts = list_length(leaf_parts); + cstate->partition_dispatch_info = pd; + cstate->num_dispatch = num_parted; + cstate->num_partitions = num_leaf_parts; + cstate->partitions = (ResultRelInfo *) palloc(num_leaf_parts * + sizeof(ResultRelInfo)); + cstate->partition_tupconv_maps = (TupleConversionMap **) + palloc0(num_leaf_parts * sizeof(TupleConversionMap *)); + + leaf_part_rri = cstate->partitions; + i = 0; + foreach(cell, leaf_parts) + { + Relation partrel; + + /* + * We locked all the partitions above including the leaf + * partitions. Note that each of the relations in + * cstate->partitions will be closed by CopyFrom() after + * it's finished with its processing. + */ + partrel = heap_open(lfirst_oid(cell), NoLock); + + /* + * Verify result relation is a valid target for the current + * operation. + */ + CheckValidResultRel(partrel, CMD_INSERT); + + InitResultRelInfo(leaf_part_rri, + partrel, + 1, /* dummy */ + false, /* no partition constraint check */ + 0); + + /* Open partition indices */ + ExecOpenIndices(leaf_part_rri, false); + + if (!equalTupleDescs(tupDesc, RelationGetDescr(partrel))) + cstate->partition_tupconv_maps[i] = + convert_tuples_by_name(tupDesc, + RelationGetDescr(partrel), + gettext_noop("could not convert row type")); + leaf_part_rri++; + i++; + } + } } else { @@ -1751,6 +1821,12 @@ BeginCopyTo(ParseState *pstate, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from sequence \"%s\"", RelationGetRelationName(rel)))); + else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy from partitioned table \"%s\"", + RelationGetRelationName(rel)), + errhint("Try the COPY (SELECT ...) TO variant."))); else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), @@ -2249,6 +2325,7 @@ CopyFrom(CopyState cstate) Datum *values; bool *nulls; ResultRelInfo *resultRelInfo; + ResultRelInfo *saved_resultRelInfo = NULL; EState *estate = CreateExecutorState(); /* for ExecConstraints() */ ExprContext *econtext; TupleTableSlot *myslot; @@ -2275,6 +2352,7 @@ CopyFrom(CopyState cstate) * only hint about them in the view case.) */ if (cstate->rel->rd_rel->relkind != RELKIND_RELATION && + cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE && !(cstate->rel->trigdesc && cstate->rel->trigdesc->trig_insert_instead_row)) { @@ -2385,6 +2463,7 @@ CopyFrom(CopyState cstate) InitResultRelInfo(resultRelInfo, cstate->rel, 1, /* dummy rangetable index */ + true, /* do load partition check expression */ 0); ExecOpenIndices(resultRelInfo, false); @@ -2407,11 +2486,13 @@ CopyFrom(CopyState cstate) * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default * expressions. Such triggers or expressions might query the table we're * inserting to, and act differently if the tuples that have already been - * processed and prepared for insertion are not there. + * processed and prepared for insertion are not there. We also can't + * do it if the table is partitioned. */ if ((resultRelInfo->ri_TrigDesc != NULL && (resultRelInfo->ri_TrigDesc->trig_insert_before_row || resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) || + cstate->partition_dispatch_info != NULL || cstate->volatile_defexprs) { useHeapMultiInsert = false; @@ -2488,6 +2569,59 @@ CopyFrom(CopyState cstate) slot = myslot; ExecStoreTuple(tuple, slot, InvalidBuffer, false); + /* Determine the partition to heap_insert the tuple into */ + if (cstate->partition_dispatch_info) + { + int leaf_part_index; + TupleConversionMap *map; + + /* + * Away we go ... If we end up not finding a partition after all, + * ExecFindPartition() does not return and errors out instead. + * Otherwise, the returned value is to be used as an index into + * arrays mt_partitions[] and mt_partition_tupconv_maps[] that + * will get us the ResultRelInfo and TupleConversionMap for the + * partition, respectively. + */ + leaf_part_index = ExecFindPartition(resultRelInfo, + cstate->partition_dispatch_info, + slot, + estate); + Assert(leaf_part_index >= 0 && + leaf_part_index < cstate->num_partitions); + + /* + * Save the old ResultRelInfo and switch to the one corresponding + * to the selected partition. + */ + saved_resultRelInfo = resultRelInfo; + resultRelInfo = cstate->partitions + leaf_part_index; + + /* We do not yet have a way to insert into a foreign partition */ + if (resultRelInfo->ri_FdwRoutine) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot route inserted tuples to a foreign table"))); + + /* + * For ExecInsertIndexTuples() to work on the partition's indexes + */ + estate->es_result_relation_info = resultRelInfo; + + /* + * We might need to convert from the parent rowtype to the + * partition rowtype. + */ + map = cstate->partition_tupconv_maps[leaf_part_index]; + if (map) + { + tuple = do_convert_tuple(tuple, map); + ExecStoreTuple(tuple, slot, InvalidBuffer, true); + } + + tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + } + skip_tuple = false; /* BEFORE ROW INSERT Triggers */ @@ -2513,7 +2647,8 @@ CopyFrom(CopyState cstate) else { /* Check the constraints of the tuple */ - if (cstate->rel->rd_att->constr) + if (cstate->rel->rd_att->constr || + resultRelInfo->ri_PartitionCheck) ExecConstraints(resultRelInfo, slot, estate); if (useHeapMultiInsert) @@ -2546,7 +2681,8 @@ CopyFrom(CopyState cstate) List *recheckIndexes = NIL; /* OK, store the tuple and create index entries for it */ - heap_insert(cstate->rel, tuple, mycid, hi_options, bistate); + heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid, + hi_options, bistate); if (resultRelInfo->ri_NumIndices > 0) recheckIndexes = ExecInsertIndexTuples(slot, @@ -2570,6 +2706,12 @@ CopyFrom(CopyState cstate) * tuples inserted by an INSERT command. */ processed++; + + if (saved_resultRelInfo) + { + resultRelInfo = saved_resultRelInfo; + estate->es_result_relation_info = resultRelInfo; + } } } @@ -2607,6 +2749,32 @@ CopyFrom(CopyState cstate) ExecCloseIndices(resultRelInfo); + /* Close all the partitioned tables, leaf partitions, and their indices */ + if (cstate->partition_dispatch_info) + { + int i; + + /* + * Remember cstate->partition_dispatch_info[0] corresponds to the root + * partitioned table, which we must not try to close, because it is + * the main target table of COPY that will be closed eventually by + * DoCopy(). + */ + for (i = 1; i < cstate->num_dispatch; i++) + { + PartitionDispatch pd = cstate->partition_dispatch_info[i]; + + heap_close(pd->reldesc, NoLock); + } + for (i = 0; i < cstate->num_partitions; i++) + { + ResultRelInfo *resultRelInfo = cstate->partitions + i; + + ExecCloseIndices(resultRelInfo); + heap_close(resultRelInfo->ri_RelationDesc, NoLock); + } + } + FreeExecutorState(estate); /* |