aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/nodeModifyTable.c
diff options
context:
space:
mode:
authorEtsuro Fujita <efujita@postgresql.org>2022-11-25 17:45:03 +0900
committerEtsuro Fujita <efujita@postgresql.org>2022-11-25 17:45:03 +0900
commite52245228ecf3a58b031c820332bc56478bb1ba2 (patch)
tree67578250905f61346eb46ed4864a247272f8a85c /src/backend/executor/nodeModifyTable.c
parent9f2cc1a73ccf84c24dc4c386721b3bb73757dc24 (diff)
downloadpostgresql-e52245228ecf3a58b031c820332bc56478bb1ba2.tar.gz
postgresql-e52245228ecf3a58b031c820332bc56478bb1ba2.zip
Fix handling of pending inserts in nodeModifyTable.c.
Commit b663a4136, which allowed FDWs to INSERT rows in bulk, added to nodeModifyTable.c code to flush pending inserts to the foreign-table result relation(s) before completing processing of the ModifyTable node, but the code failed to take into account the case where the INSERT query has modifying CTEs, leading to incorrect results. Also, that commit failed to flush pending inserts before firing BEFORE ROW triggers so that rows are visible to such triggers. In that commit we scanned through EState's es_tuple_routing_result_relations or es_opened_result_relations list to find the foreign-table result relations to which pending inserts are flushed, but that would be inefficient in some cases. So to fix, 1) add a List member to EState to record the insert-pending result relations, and 2) modify nodeModifyTable.c so that it adds the foreign-table result relation to the list in ExecInsert() if appropriate, and flushes pending inserts properly using the list where needed. While here, fix a copy-and-pasteo in a comment in ExecBatchInsert(), which was added by that commit. Back-patch to v14 where that commit appeared. Discussion: https://postgr.es/m/CAPmGK16qutyCmyJJzgQOhfBq%3DNoGDqTB6O0QBZTihrbqre%2BoxA%40mail.gmail.com
Diffstat (limited to 'src/backend/executor/nodeModifyTable.c')
-rw-r--r--src/backend/executor/nodeModifyTable.c92
1 files changed, 71 insertions, 21 deletions
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 37ba4755cbc..ee0f0422040 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -67,6 +67,7 @@ static void ExecBatchInsert(ModifyTableState *mtstate,
int numSlots,
EState *estate,
bool canSetTag);
+static void ExecPendingInserts(EState *estate);
static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo,
ItemPointer conflictTid,
@@ -645,6 +646,10 @@ ExecInsert(ModifyTableState *mtstate,
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_before_row)
{
+ /* Flush any pending inserts, so rows are visible to the triggers */
+ if (estate->es_insert_pending_result_relations != NIL)
+ ExecPendingInserts(estate);
+
if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
return NULL; /* "do nothing" */
}
@@ -678,6 +683,8 @@ ExecInsert(ModifyTableState *mtstate,
*/
if (resultRelInfo->ri_BatchSize > 1)
{
+ bool flushed = false;
+
/*
* When we've reached the desired batch size, perform the
* insertion.
@@ -690,6 +697,7 @@ ExecInsert(ModifyTableState *mtstate,
resultRelInfo->ri_NumSlots,
estate, canSetTag);
resultRelInfo->ri_NumSlots = 0;
+ flushed = true;
}
oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
@@ -732,6 +740,24 @@ ExecInsert(ModifyTableState *mtstate,
ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
planSlot);
+ /*
+ * If these are the first tuples stored in the buffers, add the
+ * target rel to the es_insert_pending_result_relations list,
+ * except in the case where flushing was done above, in which case
+ * the target rel would already have been added to the list, so no
+ * need to do this.
+ */
+ if (resultRelInfo->ri_NumSlots == 0 && !flushed)
+ {
+ Assert(!list_member_ptr(estate->es_insert_pending_result_relations,
+ resultRelInfo));
+ estate->es_insert_pending_result_relations =
+ lappend(estate->es_insert_pending_result_relations,
+ resultRelInfo);
+ }
+ Assert(list_member_ptr(estate->es_insert_pending_result_relations,
+ resultRelInfo));
+
resultRelInfo->ri_NumSlots++;
MemoryContextSwitchTo(oldContext);
@@ -1034,9 +1060,8 @@ ExecBatchInsert(ModifyTableState *mtstate,
slot = rslots[i];
/*
- * AFTER ROW Triggers or RETURNING expressions might reference the
- * tableoid column, so (re-)initialize tts_tableOid before evaluating
- * them.
+ * AFTER ROW Triggers might reference the tableoid column, so
+ * (re-)initialize tts_tableOid before evaluating them.
*/
slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
@@ -1107,6 +1132,10 @@ ExecDelete(ModifyTableState *mtstate,
{
bool dodelete;
+ /* Flush any pending inserts, so rows are visible to the triggers */
+ if (estate->es_insert_pending_result_relations != NIL)
+ ExecPendingInserts(estate);
+
dodelete = ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
tupleid, oldtuple, epqreturnslot);
@@ -1411,6 +1440,32 @@ ldelete:;
}
/*
+ * ExecPendingInserts -- flushes all pending inserts to the foreign tables
+ */
+static void
+ExecPendingInserts(EState *estate)
+{
+ ListCell *lc;
+
+ foreach(lc, estate->es_insert_pending_result_relations)
+ {
+ ResultRelInfo *resultRelInfo = (ResultRelInfo *) lfirst(lc);
+ ModifyTableState *mtstate = resultRelInfo->ri_ModifyTableState;
+
+ Assert(mtstate);
+ ExecBatchInsert(mtstate, resultRelInfo,
+ resultRelInfo->ri_Slots,
+ resultRelInfo->ri_PlanSlots,
+ resultRelInfo->ri_NumSlots,
+ estate, mtstate->canSetTag);
+ resultRelInfo->ri_NumSlots = 0;
+ }
+
+ list_free(estate->es_insert_pending_result_relations);
+ estate->es_insert_pending_result_relations = NIL;
+}
+
+/*
* ExecCrossPartitionUpdate --- Move an updated tuple to another partition.
*
* This works by first deleting the old tuple from the current partition,
@@ -1634,6 +1689,10 @@ ExecUpdate(ModifyTableState *mtstate,
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_update_before_row)
{
+ /* Flush any pending inserts, so rows are visible to the triggers */
+ if (estate->es_insert_pending_result_relations != NIL)
+ ExecPendingInserts(estate);
+
if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
tupleid, oldtuple, slot))
return NULL; /* "do nothing" */
@@ -2361,9 +2420,6 @@ ExecModifyTable(PlanState *pstate)
ItemPointerData tuple_ctid;
HeapTupleData oldtupdata;
HeapTuple oldtuple;
- PartitionTupleRouting *proute = node->mt_partition_tuple_routing;
- List *relinfos = NIL;
- ListCell *lc;
CHECK_FOR_INTERRUPTS();
@@ -2620,21 +2676,8 @@ ExecModifyTable(PlanState *pstate)
/*
* Insert remaining tuples for batch insert.
*/
- if (proute)
- relinfos = estate->es_tuple_routing_result_relations;
- else
- relinfos = estate->es_opened_result_relations;
-
- foreach(lc, relinfos)
- {
- resultRelInfo = lfirst(lc);
- if (resultRelInfo->ri_NumSlots > 0)
- ExecBatchInsert(node, resultRelInfo,
- resultRelInfo->ri_Slots,
- resultRelInfo->ri_PlanSlots,
- resultRelInfo->ri_NumSlots,
- estate, node->canSetTag);
- }
+ if (estate->es_insert_pending_result_relations != NIL)
+ ExecPendingInserts(estate);
/*
* We're done, but fire AFTER STATEMENT triggers before exiting.
@@ -3140,6 +3183,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
}
else
resultRelInfo->ri_BatchSize = 1;
+
+ /*
+ * If doing batch insert, setup back-link so we can easily find the
+ * mtstate again.
+ */
+ if (resultRelInfo->ri_BatchSize > 1)
+ resultRelInfo->ri_ModifyTableState = mtstate;
}
/*