diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/commands/copyfrom.c | 313 | ||||
-rw-r--r-- | src/include/commands/copyfrom_internal.h | 12 |
2 files changed, 237 insertions, 88 deletions
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 175aa837f2e..a079c70152f 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -78,7 +78,8 @@ typedef struct CopyMultiInsertBuffer { TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ - BulkInsertState bistate; /* BulkInsertState for this rel */ + BulkInsertState bistate; /* BulkInsertState for this rel if plain + * table; NULL if foreign table */ int nused; /* number of 'slots' containing tuples */ uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy * stream */ @@ -116,6 +117,12 @@ CopyFromErrorCallback(void *arg) { CopyFromState cstate = (CopyFromState) arg; + if (cstate->relname_only) + { + errcontext("COPY %s", + cstate->cur_relname); + return; + } if (cstate->opts.binary) { /* can't usefully display the data */ @@ -222,7 +229,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri) buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); buffer->resultRelInfo = rri; - buffer->bistate = GetBulkInsertState(); + buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL; buffer->nused = 0; return buffer; @@ -299,83 +306,171 @@ CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo) */ static inline void CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, - CopyMultiInsertBuffer *buffer) + CopyMultiInsertBuffer *buffer, + int64 *processed) { - MemoryContext oldcontext; - int i; - uint64 save_cur_lineno; CopyFromState cstate = miinfo->cstate; EState *estate = miinfo->estate; - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; - bool line_buf_valid = cstate->line_buf_valid; int nused = buffer->nused; ResultRelInfo *resultRelInfo = buffer->resultRelInfo; TupleTableSlot **slots = buffer->slots; + int i; - /* - * Print error context information correctly, if one of the operations - * below fails. - */ - cstate->line_buf_valid = false; - save_cur_lineno = cstate->cur_lineno; + if (resultRelInfo->ri_FdwRoutine) + { + int batch_size = resultRelInfo->ri_BatchSize; + int sent = 0; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. - */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); + Assert(buffer->bistate == NULL); + + /* Ensure that the FDW supports batching and it's enabled */ + Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert); + Assert(batch_size > 1); - for (i = 0; i < nused; i++) - { /* - * If there are any indexes, update them for all the inserted tuples, - * and run AFTER ROW INSERT triggers. + * We suppress error context information other than the relation name, + * if one of the operations below fails. */ - if (resultRelInfo->ri_NumIndices > 0) + Assert(!cstate->relname_only); + cstate->relname_only = true; + + while (sent < nused) { - List *recheckIndexes; - - cstate->cur_lineno = buffer->linenos[i]; - recheckIndexes = - ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, false, - NULL, NIL); - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], recheckIndexes, - cstate->transition_capture); - list_free(recheckIndexes); + int size = (batch_size < nused - sent) ? batch_size : (nused - sent); + int inserted = size; + TupleTableSlot **rslots; + + /* insert into foreign table: let the FDW do it */ + rslots = + resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate, + resultRelInfo, + &slots[sent], + NULL, + &inserted); + + sent += size; + + /* No need to do anything if there are no inserted rows */ + if (inserted <= 0) + continue; + + /* Triggers on foreign tables should not have transition tables */ + Assert(resultRelInfo->ri_TrigDesc == NULL || + resultRelInfo->ri_TrigDesc->trig_insert_new_table == false); + + /* Run AFTER ROW INSERT triggers */ + if (resultRelInfo->ri_TrigDesc != NULL && + resultRelInfo->ri_TrigDesc->trig_insert_after_row) + { + Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + + for (i = 0; i < inserted; i++) + { + TupleTableSlot *slot = rslots[i]; + + /* + * AFTER ROW Triggers might reference the tableoid column, + * so (re-)initialize tts_tableOid before evaluating them. + */ + slot->tts_tableOid = relid; + + ExecARInsertTriggers(estate, resultRelInfo, + slot, NIL, + cstate->transition_capture); + } + } + + /* Update the row counter and progress of the COPY command */ + *processed += inserted; + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, + *processed); } + for (i = 0; i < nused; i++) + ExecClearTuple(slots[i]); + + /* reset relname_only */ + cstate->relname_only = false; + } + else + { + CommandId mycid = miinfo->mycid; + int ti_options = miinfo->ti_options; + bool line_buf_valid = cstate->line_buf_valid; + uint64 save_cur_lineno = cstate->cur_lineno; + MemoryContext oldcontext; + + Assert(buffer->bistate != NULL); + /* - * There's no indexes, but see if we need to run AFTER ROW INSERT - * triggers anyway. + * Print error context information correctly, if one of the operations + * below fails. */ - else if (resultRelInfo->ri_TrigDesc != NULL && - (resultRelInfo->ri_TrigDesc->trig_insert_after_row || - resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + cstate->line_buf_valid = false; + + /* + * table_multi_insert may leak memory, so switch to short-lived memory + * context before calling it. + */ + oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + table_multi_insert(resultRelInfo->ri_RelationDesc, + slots, + nused, + mycid, + ti_options, + buffer->bistate); + MemoryContextSwitchTo(oldcontext); + + for (i = 0; i < nused; i++) { - cstate->cur_lineno = buffer->linenos[i]; - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], NIL, cstate->transition_capture); + /* + * If there are any indexes, update them for all the inserted + * tuples, and run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + cstate->cur_lineno = buffer->linenos[i]; + recheckIndexes = + ExecInsertIndexTuples(resultRelInfo, + buffer->slots[i], estate, false, + false, NULL, NIL); + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], recheckIndexes, + cstate->transition_capture); + list_free(recheckIndexes); + } + + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT + * triggers anyway. + */ + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + cstate->cur_lineno = buffer->linenos[i]; + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], NIL, + cstate->transition_capture); + } + + ExecClearTuple(slots[i]); } - ExecClearTuple(slots[i]); + /* Update the row counter and progress of the COPY command */ + *processed += nused; + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, + *processed); + + /* reset cur_lineno and line_buf_valid to what they were */ + cstate->line_buf_valid = line_buf_valid; + cstate->cur_lineno = save_cur_lineno; } /* Mark that all slots are free */ buffer->nused = 0; - - /* reset cur_lineno and line_buf_valid to what they were */ - cstate->line_buf_valid = line_buf_valid; - cstate->cur_lineno = save_cur_lineno; } /* @@ -387,22 +482,30 @@ static inline void CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer) { + ResultRelInfo *resultRelInfo = buffer->resultRelInfo; int i; /* Ensure buffer was flushed */ Assert(buffer->nused == 0); /* Remove back-link to ourself */ - buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL; + resultRelInfo->ri_CopyMultiInsertBuffer = NULL; - FreeBulkInsertState(buffer->bistate); + if (resultRelInfo->ri_FdwRoutine == NULL) + { + Assert(buffer->bistate != NULL); + FreeBulkInsertState(buffer->bistate); + } + else + Assert(buffer->bistate == NULL); /* Since we only create slots on demand, just drop the non-null ones. */ for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) ExecDropSingleTupleTableSlot(buffer->slots[i]); - table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc, - miinfo->ti_options); + if (resultRelInfo->ri_FdwRoutine == NULL) + table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, + miinfo->ti_options); pfree(buffer); } @@ -418,7 +521,8 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, * 'curr_rri'. */ static inline void -CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri) +CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri, + int64 *processed) { ListCell *lc; @@ -426,7 +530,7 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri) { CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc); - CopyMultiInsertBufferFlush(miinfo, buffer); + CopyMultiInsertBufferFlush(miinfo, buffer, processed); } miinfo->bufferedTuples = 0; @@ -679,6 +783,23 @@ CopyFrom(CopyFromState cstate) resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, resultRelInfo); + /* + * Also, if the named relation is a foreign table, determine if the FDW + * supports batch insert and determine the batch size (a FDW may support + * batching, but it may be disabled for the server/table). + * + * If the FDW does not support batching, we set the batch size to 1. + */ + if (resultRelInfo->ri_FdwRoutine != NULL && + resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize && + resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert) + resultRelInfo->ri_BatchSize = + resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo); + else + resultRelInfo->ri_BatchSize = 1; + + Assert(resultRelInfo->ri_BatchSize >= 1); + /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); @@ -708,10 +829,11 @@ CopyFrom(CopyFromState cstate) /* * It's generally more efficient to prepare a bunch of tuples for - * insertion, and insert them in one table_multi_insert() call, than call - * table_tuple_insert() separately for every tuple. However, there are a - * number of reasons why we might not be able to do this. These are - * explained below. + * insertion, and insert them in one + * table_multi_insert()/ExecForeignBatchInsert() call, than call + * table_tuple_insert()/ExecForeignInsert() separately for every tuple. + * However, there are a number of reasons why we might not be able to do + * this. These are explained below. */ if (resultRelInfo->ri_TrigDesc != NULL && (resultRelInfo->ri_TrigDesc->trig_insert_before_row || @@ -725,6 +847,15 @@ CopyFrom(CopyFromState cstate) */ insertMethod = CIM_SINGLE; } + else if (resultRelInfo->ri_FdwRoutine != NULL && + resultRelInfo->ri_BatchSize == 1) + { + /* + * Can't support multi-inserts to a foreign table if the FDW does not + * support batching, or it's disabled for the server or foreign table. + */ + insertMethod = CIM_SINGLE; + } else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL && resultRelInfo->ri_TrigDesc->trig_insert_new_table) { @@ -737,14 +868,12 @@ CopyFrom(CopyFromState cstate) */ insertMethod = CIM_SINGLE; } - else if (resultRelInfo->ri_FdwRoutine != NULL || - cstate->volatile_defexprs) + else if (cstate->volatile_defexprs) { /* - * Can't support multi-inserts to foreign tables or if there are any - * volatile default expressions in the table. Similarly to the - * trigger case above, such expressions may query the table we're - * inserting into. + * Can't support multi-inserts if there are any volatile default + * expressions in the table. Similarly to the trigger case above, + * such expressions may query the table we're inserting into. * * Note: It does not matter if any partitions have any volatile * default expressions as we use the defaults from the target of the @@ -767,13 +896,14 @@ CopyFrom(CopyFromState cstate) * For partitioned tables, we may still be able to perform bulk * inserts. However, the possibility of this depends on which types * of triggers exist on the partition. We must disable bulk inserts - * if the partition is a foreign table or it has any before row insert - * or insert instead triggers (same as we checked above for the parent - * table). Since the partition's resultRelInfos are initialized only - * when we actually need to insert the first tuple into them, we must - * have the intermediate insert method of CIM_MULTI_CONDITIONAL to - * flag that we must later determine if we can use bulk-inserts for - * the partition being inserted into. + * if the partition is a foreign table that can't use batching or it + * has any before row insert or insert instead triggers (same as we + * checked above for the parent table). Since the partition's + * resultRelInfos are initialized only when we actually need to insert + * the first tuple into them, we must have the intermediate insert + * method of CIM_MULTI_CONDITIONAL to flag that we must later + * determine if we can use bulk-inserts for the partition being + * inserted into. */ if (proute) insertMethod = CIM_MULTI_CONDITIONAL; @@ -910,12 +1040,14 @@ CopyFrom(CopyFromState cstate) /* * Disable multi-inserts when the partition has BEFORE/INSTEAD - * OF triggers, or if the partition is a foreign partition. + * OF triggers, or if the partition is a foreign table that + * can't use batching. */ leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && !has_before_insert_row_trig && !has_instead_insert_row_trig && - resultRelInfo->ri_FdwRoutine == NULL; + (resultRelInfo->ri_FdwRoutine == NULL || + resultRelInfo->ri_BatchSize > 1); /* Set the multi-insert buffer to use for this partition. */ if (leafpart_use_multi_insert) @@ -931,7 +1063,9 @@ CopyFrom(CopyFromState cstate) * Flush pending inserts if this partition can't use * batching, so rows are visible to triggers etc. */ - CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); + CopyMultiInsertInfoFlush(&multiInsertInfo, + resultRelInfo, + &processed); } if (bistate != NULL) @@ -1067,7 +1201,17 @@ CopyFrom(CopyFromState cstate) * buffers out to their tables. */ if (CopyMultiInsertInfoIsFull(&multiInsertInfo)) - CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); + CopyMultiInsertInfoFlush(&multiInsertInfo, + resultRelInfo, + &processed); + + /* + * We delay updating the row counter and progress of the + * COPY command until after writing the tuples stored in + * the buffer out to the table, as in single insert mode. + * See CopyMultiInsertBufferFlush(). + */ + continue; /* next tuple please */ } else { @@ -1130,7 +1274,7 @@ CopyFrom(CopyFromState cstate) if (insertMethod != CIM_SINGLE) { if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) - CopyMultiInsertInfoFlush(&multiInsertInfo, NULL); + CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed); } /* Done, clean up */ @@ -1348,6 +1492,7 @@ BeginCopyFrom(ParseState *pstate, cstate->cur_lineno = 0; cstate->cur_attname = NULL; cstate->cur_attval = NULL; + cstate->relname_only = false; /* * Allocate buffers for the input pipeline. diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index e37c6032ae6..8d9cc5accdb 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -40,13 +40,16 @@ typedef enum EolType } EolType; /* - * Represents the heap insert method to be used during COPY FROM. + * Represents the insert method to be used during COPY FROM. */ typedef enum CopyInsertMethod { - CIM_SINGLE, /* use table_tuple_insert or fdw routine */ - CIM_MULTI, /* always use table_multi_insert */ - CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ + CIM_SINGLE, /* use table_tuple_insert or + * ExecForeignInsert */ + CIM_MULTI, /* always use table_multi_insert or + * ExecForeignBatchInsert */ + CIM_MULTI_CONDITIONAL /* use table_multi_insert or + * ExecForeignBatchInsert only if valid */ } CopyInsertMethod; /* @@ -81,6 +84,7 @@ typedef struct CopyFromStateData uint64 cur_lineno; /* line number for error messages */ const char *cur_attname; /* current att for error messages */ const char *cur_attval; /* current att value for error messages */ + bool relname_only; /* don't output line number, att, etc. */ /* * Working state |