From b663a4136331de6c7364226e3dbf7c88bfee7145 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Wed, 20 Jan 2021 23:05:46 +0100 Subject: Implement support for bulk inserts in postgres_fdw Extends the FDW API to allow batching inserts into foreign tables. That is usually much more efficient than inserting individual rows, due to high latency for each round-trip to the foreign server. It was possible to implement something similar in the regular FDW API, but it was inconvenient and there were issues with reporting the number of actually inserted rows etc. This extends the FDW API with two new functions: * GetForeignModifyBatchSize - allows the FDW picking optimal batch size * ExecForeignBatchInsert - inserts a batch of rows at once Currently, only INSERT queries support batching. Support for DELETE and UPDATE may be added in the future. This also implements batching for postgres_fdw. The batch size may be specified using "batch_size" option both at the server and table level. The initial patch version was written by me, but it was rewritten and improved in many ways by Takayuki Tsunakawa. Author: Takayuki Tsunakawa Reviewed-by: Tomas Vondra, Amit Langote Discussion: https://postgr.es/m/20200628151002.7x5laxwpgvkyiu3q@development --- src/backend/executor/nodeModifyTable.c | 160 +++++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) (limited to 'src/backend/executor/nodeModifyTable.c') diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 921e6954194..9c36860704a 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -58,6 +58,13 @@ #include "utils/rel.h" +static void ExecBatchInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int numSlots, + EState *estate, + bool canSetTag); static bool ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer conflictTid, @@ -389,6 +396,7 @@ ExecInsert(ModifyTableState *mtstate, ModifyTable *node = (ModifyTable *) mtstate->ps.plan; OnConflictAction onconflict = node->onConflictAction; PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing; + MemoryContext oldContext; /* * If the input result relation is a partitioned table, find the leaf @@ -441,6 +449,55 @@ ExecInsert(ModifyTableState *mtstate, ExecComputeStoredGenerated(resultRelInfo, estate, slot, CMD_INSERT); + /* + * If the FDW supports batching, and batching is requested, accumulate + * rows and insert them in batches. Otherwise use the per-row inserts. + */ + if (resultRelInfo->ri_BatchSize > 1) + { + /* + * If a certain number of tuples have already been accumulated, + * or a tuple has come for a different relation than that for + * the accumulated tuples, perform the batch insert + */ + if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize) + { + ExecBatchInsert(mtstate, resultRelInfo, + resultRelInfo->ri_Slots, + resultRelInfo->ri_PlanSlots, + resultRelInfo->ri_NumSlots, + estate, canSetTag); + resultRelInfo->ri_NumSlots = 0; + } + + oldContext = MemoryContextSwitchTo(estate->es_query_cxt); + + if (resultRelInfo->ri_Slots == NULL) + { + resultRelInfo->ri_Slots = palloc(sizeof(TupleTableSlot *) * + resultRelInfo->ri_BatchSize); + resultRelInfo->ri_PlanSlots = palloc(sizeof(TupleTableSlot *) * + resultRelInfo->ri_BatchSize); + } + + resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] = + MakeSingleTupleTableSlot(slot->tts_tupleDescriptor, + slot->tts_ops); + ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots], + slot); + resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] = + MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor, + planSlot->tts_ops); + ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots], + planSlot); + + resultRelInfo->ri_NumSlots++; + + MemoryContextSwitchTo(oldContext); + + return NULL; + } + /* * insert into foreign table: let the FDW do it */ @@ -698,6 +755,70 @@ ExecInsert(ModifyTableState *mtstate, return result; } +/* ---------------------------------------------------------------- + * ExecBatchInsert + * + * Insert multiple tuples in an efficient way. + * Currently, this handles inserting into a foreign table without + * RETURNING clause. + * ---------------------------------------------------------------- + */ +static void +ExecBatchInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int numSlots, + EState *estate, + bool canSetTag) +{ + int i; + int numInserted = numSlots; + TupleTableSlot *slot = NULL; + TupleTableSlot **rslots; + + /* + * insert into foreign table: let the FDW do it + */ + rslots = resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate, + resultRelInfo, + slots, + planSlots, + &numInserted); + + for (i = 0; i < numInserted; i++) + { + slot = rslots[i]; + + /* + * AFTER ROW Triggers or RETURNING expressions might reference the + * tableoid column, so (re-)initialize tts_tableOid before evaluating + * them. + */ + slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + + /* AFTER ROW INSERT Triggers */ + ExecARInsertTriggers(estate, resultRelInfo, slot, NIL, + mtstate->mt_transition_capture); + + /* + * Check any WITH CHECK OPTION constraints from parent views. See the + * comment in ExecInsert. + */ + if (resultRelInfo->ri_WithCheckOptions != NIL) + ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate); + } + + if (canSetTag && numInserted > 0) + estate->es_processed += numInserted; + + for (i = 0; i < numSlots; i++) + { + ExecDropSingleTupleTableSlot(slots[i]); + ExecDropSingleTupleTableSlot(planSlots[i]); + } +} + /* ---------------------------------------------------------------- * ExecDelete * @@ -1937,6 +2058,9 @@ ExecModifyTable(PlanState *pstate) ItemPointerData tuple_ctid; HeapTupleData oldtupdata; HeapTuple oldtuple; + PartitionTupleRouting *proute = node->mt_partition_tuple_routing; + List *relinfos = NIL; + ListCell *lc; CHECK_FOR_INTERRUPTS(); @@ -2152,6 +2276,25 @@ ExecModifyTable(PlanState *pstate) return slot; } + /* + * Insert remaining tuples for batch insert. + */ + if (proute) + relinfos = estate->es_tuple_routing_result_relations; + else + relinfos = estate->es_opened_result_relations; + + foreach(lc, relinfos) + { + resultRelInfo = lfirst(lc); + if (resultRelInfo->ri_NumSlots > 0) + ExecBatchInsert(node, resultRelInfo, + resultRelInfo->ri_Slots, + resultRelInfo->ri_PlanSlots, + resultRelInfo->ri_NumSlots, + estate, node->canSetTag); + } + /* * We're done, but fire AFTER STATEMENT triggers before exiting. */ @@ -2650,6 +2793,23 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) } } + /* + * Determine if the FDW supports batch insert and determine the batch + * size (a FDW may support batching, but it may be disabled for the + * server/table). + */ + if (!resultRelInfo->ri_usesFdwDirectModify && + operation == CMD_INSERT && + resultRelInfo->ri_FdwRoutine != NULL && + resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize && + resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert) + resultRelInfo->ri_BatchSize = + resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo); + else + resultRelInfo->ri_BatchSize = 1; + + Assert(resultRelInfo->ri_BatchSize >= 1); + /* * Lastly, if this is not the primary (canSetTag) ModifyTable node, add it * to estate->es_auxmodifytables so that it will be run to completion by -- cgit v1.2.3