diff options
author | Tomas Vondra <tomas.vondra@postgresql.org> | 2021-01-20 23:05:46 +0100 |
---|---|---|
committer | Tomas Vondra <tomas.vondra@postgresql.org> | 2021-01-20 23:57:27 +0100 |
commit | b663a4136331de6c7364226e3dbf7c88bfee7145 (patch) | |
tree | 3a14fac68bcfc27a42e365501ce1cdbc1ddfdc00 /src | |
parent | ad600bba0422dde4b73fbd61049ff2a3847b068a (diff) | |
download | postgresql-b663a4136331de6c7364226e3dbf7c88bfee7145.tar.gz postgresql-b663a4136331de6c7364226e3dbf7c88bfee7145.zip |
Implement support for bulk inserts in postgres_fdw
Extends the FDW API to allow batching inserts into foreign tables. That
is usually much more efficient than inserting individual rows, due to
high latency for each round-trip to the foreign server.
It was possible to implement something similar in the regular FDW API,
but it was inconvenient and there were issues with reporting the number
of actually inserted rows etc. This extends the FDW API with two new
functions:
* GetForeignModifyBatchSize - allows the FDW picking optimal batch size
* ExecForeignBatchInsert - inserts a batch of rows at once
Currently, only INSERT queries support batching. Support for DELETE and
UPDATE may be added in the future.
This also implements batching for postgres_fdw. The batch size may be
specified using "batch_size" option both at the server and table level.
The initial patch version was written by me, but it was rewritten and
improved in many ways by Takayuki Tsunakawa.
Author: Takayuki Tsunakawa
Reviewed-by: Tomas Vondra, Amit Langote
Discussion: https://postgr.es/m/20200628151002.7x5laxwpgvkyiu3q@development
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/executor/execPartition.c | 17 | ||||
-rw-r--r-- | src/backend/executor/nodeModifyTable.c | 160 | ||||
-rw-r--r-- | src/backend/nodes/list.c | 15 | ||||
-rw-r--r-- | src/include/foreign/fdwapi.h | 10 | ||||
-rw-r--r-- | src/include/nodes/execnodes.h | 6 | ||||
-rw-r--r-- | src/include/nodes/pg_list.h | 15 |
6 files changed, 223 insertions, 0 deletions
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index 941731a0a9b..1746cb87936 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -993,6 +993,23 @@ ExecInitRoutingInfo(ModifyTableState *mtstate, partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo); + /* + * Determine if the FDW supports batch insert and determine the batch + * size (a FDW may support batching, but it may be disabled for the + * server/table or for this particular query). + * + * If the FDW does not support batching, we set the batch size to 1. + */ + if (partRelInfo->ri_FdwRoutine != NULL && + partRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize && + partRelInfo->ri_FdwRoutine->ExecForeignBatchInsert) + partRelInfo->ri_BatchSize = + partRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(partRelInfo); + else + partRelInfo->ri_BatchSize = 1; + + Assert(partRelInfo->ri_BatchSize >= 1); + partRelInfo->ri_CopyMultiInsertBuffer = NULL; /* diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 921e6954194..9c36860704a 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -58,6 +58,13 @@ #include "utils/rel.h" +static void ExecBatchInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int numSlots, + EState *estate, + bool canSetTag); static bool ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer conflictTid, @@ -389,6 +396,7 @@ ExecInsert(ModifyTableState *mtstate, ModifyTable *node = (ModifyTable *) mtstate->ps.plan; OnConflictAction onconflict = node->onConflictAction; PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing; + MemoryContext oldContext; /* * If the input result relation is a partitioned table, find the leaf @@ -442,6 +450,55 @@ ExecInsert(ModifyTableState *mtstate, CMD_INSERT); /* + * If the FDW supports batching, and batching is requested, accumulate + * rows and insert them in batches. Otherwise use the per-row inserts. + */ + if (resultRelInfo->ri_BatchSize > 1) + { + /* + * If a certain number of tuples have already been accumulated, + * or a tuple has come for a different relation than that for + * the accumulated tuples, perform the batch insert + */ + if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize) + { + ExecBatchInsert(mtstate, resultRelInfo, + resultRelInfo->ri_Slots, + resultRelInfo->ri_PlanSlots, + resultRelInfo->ri_NumSlots, + estate, canSetTag); + resultRelInfo->ri_NumSlots = 0; + } + + oldContext = MemoryContextSwitchTo(estate->es_query_cxt); + + if (resultRelInfo->ri_Slots == NULL) + { + resultRelInfo->ri_Slots = palloc(sizeof(TupleTableSlot *) * + resultRelInfo->ri_BatchSize); + resultRelInfo->ri_PlanSlots = palloc(sizeof(TupleTableSlot *) * + resultRelInfo->ri_BatchSize); + } + + resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] = + MakeSingleTupleTableSlot(slot->tts_tupleDescriptor, + slot->tts_ops); + ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots], + slot); + resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] = + MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor, + planSlot->tts_ops); + ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots], + planSlot); + + resultRelInfo->ri_NumSlots++; + + MemoryContextSwitchTo(oldContext); + + return NULL; + } + + /* * insert into foreign table: let the FDW do it */ slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate, @@ -699,6 +756,70 @@ ExecInsert(ModifyTableState *mtstate, } /* ---------------------------------------------------------------- + * ExecBatchInsert + * + * Insert multiple tuples in an efficient way. + * Currently, this handles inserting into a foreign table without + * RETURNING clause. + * ---------------------------------------------------------------- + */ +static void +ExecBatchInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int numSlots, + EState *estate, + bool canSetTag) +{ + int i; + int numInserted = numSlots; + TupleTableSlot *slot = NULL; + TupleTableSlot **rslots; + + /* + * insert into foreign table: let the FDW do it + */ + rslots = resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate, + resultRelInfo, + slots, + planSlots, + &numInserted); + + for (i = 0; i < numInserted; i++) + { + slot = rslots[i]; + + /* + * AFTER ROW Triggers or RETURNING expressions might reference the + * tableoid column, so (re-)initialize tts_tableOid before evaluating + * them. + */ + slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + + /* AFTER ROW INSERT Triggers */ + ExecARInsertTriggers(estate, resultRelInfo, slot, NIL, + mtstate->mt_transition_capture); + + /* + * Check any WITH CHECK OPTION constraints from parent views. See the + * comment in ExecInsert. + */ + if (resultRelInfo->ri_WithCheckOptions != NIL) + ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate); + } + + if (canSetTag && numInserted > 0) + estate->es_processed += numInserted; + + for (i = 0; i < numSlots; i++) + { + ExecDropSingleTupleTableSlot(slots[i]); + ExecDropSingleTupleTableSlot(planSlots[i]); + } +} + +/* ---------------------------------------------------------------- * ExecDelete * * DELETE is like UPDATE, except that we delete the tuple and no @@ -1937,6 +2058,9 @@ ExecModifyTable(PlanState *pstate) ItemPointerData tuple_ctid; HeapTupleData oldtupdata; HeapTuple oldtuple; + PartitionTupleRouting *proute = node->mt_partition_tuple_routing; + List *relinfos = NIL; + ListCell *lc; CHECK_FOR_INTERRUPTS(); @@ -2153,6 +2277,25 @@ ExecModifyTable(PlanState *pstate) } /* + * Insert remaining tuples for batch insert. + */ + if (proute) + relinfos = estate->es_tuple_routing_result_relations; + else + relinfos = estate->es_opened_result_relations; + + foreach(lc, relinfos) + { + resultRelInfo = lfirst(lc); + if (resultRelInfo->ri_NumSlots > 0) + ExecBatchInsert(node, resultRelInfo, + resultRelInfo->ri_Slots, + resultRelInfo->ri_PlanSlots, + resultRelInfo->ri_NumSlots, + estate, node->canSetTag); + } + + /* * We're done, but fire AFTER STATEMENT triggers before exiting. */ fireASTriggers(node); @@ -2651,6 +2794,23 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) } /* + * Determine if the FDW supports batch insert and determine the batch + * size (a FDW may support batching, but it may be disabled for the + * server/table). + */ + if (!resultRelInfo->ri_usesFdwDirectModify && + operation == CMD_INSERT && + resultRelInfo->ri_FdwRoutine != NULL && + resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize && + resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert) + resultRelInfo->ri_BatchSize = + resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo); + else + resultRelInfo->ri_BatchSize = 1; + + Assert(resultRelInfo->ri_BatchSize >= 1); + + /* * Lastly, if this is not the primary (canSetTag) ModifyTable node, add it * to estate->es_auxmodifytables so that it will be run to completion by * ExecPostprocessPlan. (It'd actually work fine to add the primary diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c index c4eba6b053f..dbf6b30233a 100644 --- a/src/backend/nodes/list.c +++ b/src/backend/nodes/list.c @@ -277,6 +277,21 @@ list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2, return list; } +List * +list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2, + ListCell datum3, ListCell datum4, ListCell datum5) +{ + List *list = new_list(t, 5); + + list->elements[0] = datum1; + list->elements[1] = datum2; + list->elements[2] = datum3; + list->elements[3] = datum4; + list->elements[4] = datum5; + check_list_invariants(list); + return list; +} + /* * Make room for a new head cell in the given (non-NIL) list. * diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 2953499fb10..248f78da452 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -85,6 +85,14 @@ typedef TupleTableSlot *(*ExecForeignInsert_function) (EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot); +typedef TupleTableSlot **(*ExecForeignBatchInsert_function) (EState *estate, + ResultRelInfo *rinfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); + +typedef int (*GetForeignModifyBatchSize_function) (ResultRelInfo *rinfo); + typedef TupleTableSlot *(*ExecForeignUpdate_function) (EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slot, @@ -209,6 +217,8 @@ typedef struct FdwRoutine PlanForeignModify_function PlanForeignModify; BeginForeignModify_function BeginForeignModify; ExecForeignInsert_function ExecForeignInsert; + ExecForeignBatchInsert_function ExecForeignBatchInsert; + GetForeignModifyBatchSize_function GetForeignModifyBatchSize; ExecForeignUpdate_function ExecForeignUpdate; ExecForeignDelete_function ExecForeignDelete; EndForeignModify_function EndForeignModify; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 48c3f570fa9..d65099c94aa 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -446,6 +446,12 @@ typedef struct ResultRelInfo /* true when modifying foreign table directly */ bool ri_usesFdwDirectModify; + /* batch insert stuff */ + int ri_NumSlots; /* number of slots in the array */ + int ri_BatchSize; /* max slots inserted in a single batch */ + TupleTableSlot **ri_Slots; /* input tuples for batch insert */ + TupleTableSlot **ri_PlanSlots; + /* list of WithCheckOption's to be checked */ List *ri_WithCheckOptions; diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h index 710dcd37ef4..404e03f132d 100644 --- a/src/include/nodes/pg_list.h +++ b/src/include/nodes/pg_list.h @@ -213,6 +213,10 @@ list_length(const List *l) #define list_make4(x1,x2,x3,x4) \ list_make4_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \ list_make_ptr_cell(x3), list_make_ptr_cell(x4)) +#define list_make5(x1,x2,x3,x4,x5) \ + list_make5_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \ + list_make_ptr_cell(x3), list_make_ptr_cell(x4), \ + list_make_ptr_cell(x5)) #define list_make1_int(x1) \ list_make1_impl(T_IntList, list_make_int_cell(x1)) @@ -224,6 +228,10 @@ list_length(const List *l) #define list_make4_int(x1,x2,x3,x4) \ list_make4_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \ list_make_int_cell(x3), list_make_int_cell(x4)) +#define list_make5_int(x1,x2,x3,x4,x5) \ + list_make5_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \ + list_make_int_cell(x3), list_make_int_cell(x4), \ + list_make_int_cell(x5)) #define list_make1_oid(x1) \ list_make1_impl(T_OidList, list_make_oid_cell(x1)) @@ -235,6 +243,10 @@ list_length(const List *l) #define list_make4_oid(x1,x2,x3,x4) \ list_make4_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \ list_make_oid_cell(x3), list_make_oid_cell(x4)) +#define list_make5_oid(x1,x2,x3,x4,x5) \ + list_make5_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \ + list_make_oid_cell(x3), list_make_oid_cell(x4), \ + list_make_oid_cell(x5)) /* * Locate the n'th cell (counting from 0) of the list. @@ -520,6 +532,9 @@ extern List *list_make3_impl(NodeTag t, ListCell datum1, ListCell datum2, ListCell datum3); extern List *list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2, ListCell datum3, ListCell datum4); +extern List *list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2, + ListCell datum3, ListCell datum4, + ListCell datum5); extern pg_nodiscard List *lappend(List *list, void *datum); extern pg_nodiscard List *lappend_int(List *list, int datum); |