aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/nodeModifyTable.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/nodeModifyTable.c')
-rw-r--r--src/backend/executor/nodeModifyTable.c160
1 files changed, 160 insertions, 0 deletions
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 921e6954194..9c36860704a 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -58,6 +58,13 @@
#include "utils/rel.h"
+static void ExecBatchInsert(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int numSlots,
+ EState *estate,
+ bool canSetTag);
static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo,
ItemPointer conflictTid,
@@ -389,6 +396,7 @@ ExecInsert(ModifyTableState *mtstate,
ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
OnConflictAction onconflict = node->onConflictAction;
PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
+ MemoryContext oldContext;
/*
* If the input result relation is a partitioned table, find the leaf
@@ -442,6 +450,55 @@ ExecInsert(ModifyTableState *mtstate,
CMD_INSERT);
/*
+ * If the FDW supports batching, and batching is requested, accumulate
+ * rows and insert them in batches. Otherwise use the per-row inserts.
+ */
+ if (resultRelInfo->ri_BatchSize > 1)
+ {
+ /*
+ * If a certain number of tuples have already been accumulated,
+ * or a tuple has come for a different relation than that for
+ * the accumulated tuples, perform the batch insert
+ */
+ if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize)
+ {
+ ExecBatchInsert(mtstate, resultRelInfo,
+ resultRelInfo->ri_Slots,
+ resultRelInfo->ri_PlanSlots,
+ resultRelInfo->ri_NumSlots,
+ estate, canSetTag);
+ resultRelInfo->ri_NumSlots = 0;
+ }
+
+ oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
+
+ if (resultRelInfo->ri_Slots == NULL)
+ {
+ resultRelInfo->ri_Slots = palloc(sizeof(TupleTableSlot *) *
+ resultRelInfo->ri_BatchSize);
+ resultRelInfo->ri_PlanSlots = palloc(sizeof(TupleTableSlot *) *
+ resultRelInfo->ri_BatchSize);
+ }
+
+ resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
+ MakeSingleTupleTableSlot(slot->tts_tupleDescriptor,
+ slot->tts_ops);
+ ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
+ slot);
+ resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
+ MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor,
+ planSlot->tts_ops);
+ ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
+ planSlot);
+
+ resultRelInfo->ri_NumSlots++;
+
+ MemoryContextSwitchTo(oldContext);
+
+ return NULL;
+ }
+
+ /*
* insert into foreign table: let the FDW do it
*/
slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
@@ -699,6 +756,70 @@ ExecInsert(ModifyTableState *mtstate,
}
/* ----------------------------------------------------------------
+ * ExecBatchInsert
+ *
+ * Insert multiple tuples in an efficient way.
+ * Currently, this handles inserting into a foreign table without
+ * RETURNING clause.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecBatchInsert(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int numSlots,
+ EState *estate,
+ bool canSetTag)
+{
+ int i;
+ int numInserted = numSlots;
+ TupleTableSlot *slot = NULL;
+ TupleTableSlot **rslots;
+
+ /*
+ * insert into foreign table: let the FDW do it
+ */
+ rslots = resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
+ resultRelInfo,
+ slots,
+ planSlots,
+ &numInserted);
+
+ for (i = 0; i < numInserted; i++)
+ {
+ slot = rslots[i];
+
+ /*
+ * AFTER ROW Triggers or RETURNING expressions might reference the
+ * tableoid column, so (re-)initialize tts_tableOid before evaluating
+ * them.
+ */
+ slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+ /* AFTER ROW INSERT Triggers */
+ ExecARInsertTriggers(estate, resultRelInfo, slot, NIL,
+ mtstate->mt_transition_capture);
+
+ /*
+ * Check any WITH CHECK OPTION constraints from parent views. See the
+ * comment in ExecInsert.
+ */
+ if (resultRelInfo->ri_WithCheckOptions != NIL)
+ ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate);
+ }
+
+ if (canSetTag && numInserted > 0)
+ estate->es_processed += numInserted;
+
+ for (i = 0; i < numSlots; i++)
+ {
+ ExecDropSingleTupleTableSlot(slots[i]);
+ ExecDropSingleTupleTableSlot(planSlots[i]);
+ }
+}
+
+/* ----------------------------------------------------------------
* ExecDelete
*
* DELETE is like UPDATE, except that we delete the tuple and no
@@ -1937,6 +2058,9 @@ ExecModifyTable(PlanState *pstate)
ItemPointerData tuple_ctid;
HeapTupleData oldtupdata;
HeapTuple oldtuple;
+ PartitionTupleRouting *proute = node->mt_partition_tuple_routing;
+ List *relinfos = NIL;
+ ListCell *lc;
CHECK_FOR_INTERRUPTS();
@@ -2153,6 +2277,25 @@ ExecModifyTable(PlanState *pstate)
}
/*
+ * Insert remaining tuples for batch insert.
+ */
+ if (proute)
+ relinfos = estate->es_tuple_routing_result_relations;
+ else
+ relinfos = estate->es_opened_result_relations;
+
+ foreach(lc, relinfos)
+ {
+ resultRelInfo = lfirst(lc);
+ if (resultRelInfo->ri_NumSlots > 0)
+ ExecBatchInsert(node, resultRelInfo,
+ resultRelInfo->ri_Slots,
+ resultRelInfo->ri_PlanSlots,
+ resultRelInfo->ri_NumSlots,
+ estate, node->canSetTag);
+ }
+
+ /*
* We're done, but fire AFTER STATEMENT triggers before exiting.
*/
fireASTriggers(node);
@@ -2651,6 +2794,23 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
}
/*
+ * Determine if the FDW supports batch insert and determine the batch
+ * size (a FDW may support batching, but it may be disabled for the
+ * server/table).
+ */
+ if (!resultRelInfo->ri_usesFdwDirectModify &&
+ operation == CMD_INSERT &&
+ resultRelInfo->ri_FdwRoutine != NULL &&
+ resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
+ resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
+ resultRelInfo->ri_BatchSize =
+ resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
+ else
+ resultRelInfo->ri_BatchSize = 1;
+
+ Assert(resultRelInfo->ri_BatchSize >= 1);
+
+ /*
* Lastly, if this is not the primary (canSetTag) ModifyTable node, add it
* to estate->es_auxmodifytables so that it will be run to completion by
* ExecPostprocessPlan. (It'd actually work fine to add the primary