aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/nodeModifyTable.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/nodeModifyTable.c')
-rw-r--r--src/backend/executor/nodeModifyTable.c936
1 files changed, 900 insertions, 36 deletions
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 701fe052967..171575cd73b 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -24,11 +24,20 @@
* values plus row-locating info for UPDATE and MERGE cases, or just the
* row-locating info for DELETE cases.
*
+ * MERGE runs a join between the source relation and the target
+ * table; if any WHEN NOT MATCHED clauses are present, then the
+ * join is an outer join. In this case, any unmatched tuples will
+ * have NULL row-locating info, and only INSERT can be run. But for
+ * matched tuples, then row-locating info is used to determine the
+ * tuple to UPDATE or DELETE. When all clauses are WHEN MATCHED,
+ * then an inner join is used, so all tuples contain row-locating info.
+ *
* If the query specifies RETURNING, then the ModifyTable returns a
* RETURNING tuple after completing each row insert, update, or delete.
* It must be called again to continue the operation. Without RETURNING,
* we just loop within the node until all the work is done, then
- * return NULL. This avoids useless call/return overhead.
+ * return NULL. This avoids useless call/return overhead. (MERGE does
+ * not support RETURNING.)
*/
#include "postgres.h"
@@ -78,6 +87,17 @@ typedef struct ModifyTableContext
*/
TupleTableSlot *planSlot;
+ /*
+ * During EvalPlanQual, project and return the new version of the new
+ * tuple
+ */
+ TupleTableSlot *(*GetUpdateNewTuple) (ResultRelInfo *resultRelInfo,
+ TupleTableSlot *epqslot,
+ TupleTableSlot *oldSlot,
+ MergeActionState *relaction);
+
+ /* MERGE specific */
+ MergeActionState *relaction; /* MERGE action in progress */
/*
* Information about the changes that were made concurrently to a tuple
@@ -140,6 +160,28 @@ static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate,
ResultRelInfo *targetRelInfo,
TupleTableSlot *slot,
ResultRelInfo **partRelInfo);
+static TupleTableSlot *internalGetUpdateNewTuple(ResultRelInfo *relinfo,
+ TupleTableSlot *planSlot,
+ TupleTableSlot *oldSlot,
+ MergeActionState *relaction);
+
+static TupleTableSlot *ExecMerge(ModifyTableContext *context,
+ ResultRelInfo *resultRelInfo,
+ ItemPointer tupleid,
+ bool canSetTag);
+static void ExecInitMerge(ModifyTableState *mtstate, EState *estate);
+static bool ExecMergeMatched(ModifyTableContext *context,
+ ResultRelInfo *resultRelInfo,
+ ItemPointer tupleid,
+ bool canSetTag);
+static void ExecMergeNotMatched(ModifyTableContext *context,
+ ResultRelInfo *resultRelInfo,
+ bool canSetTag);
+static TupleTableSlot *mergeGetUpdateNewTuple(ResultRelInfo *relinfo,
+ TupleTableSlot *planSlot,
+ TupleTableSlot *oldSlot,
+ MergeActionState *relaction);
+
/*
* Verify that the tuples to be produced by INSERT match the
@@ -616,21 +658,32 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo,
TupleTableSlot *planSlot,
TupleTableSlot *oldSlot)
{
- ProjectionInfo *newProj = relinfo->ri_projectNew;
- ExprContext *econtext;
-
/* Use a few extra Asserts to protect against outside callers */
Assert(relinfo->ri_projectNewInfoValid);
Assert(planSlot != NULL && !TTS_EMPTY(planSlot));
Assert(oldSlot != NULL && !TTS_EMPTY(oldSlot));
+ return internalGetUpdateNewTuple(relinfo, planSlot, oldSlot, NULL);
+}
+
+/*
+ * Callback for ModifyTableState->GetUpdateNewTuple for use by regular UPDATE.
+ */
+static TupleTableSlot *
+internalGetUpdateNewTuple(ResultRelInfo *relinfo,
+ TupleTableSlot *planSlot,
+ TupleTableSlot *oldSlot,
+ MergeActionState *relaction)
+{
+ ProjectionInfo *newProj = relinfo->ri_projectNew;
+ ExprContext *econtext;
+
econtext = newProj->pi_exprContext;
econtext->ecxt_outertuple = planSlot;
econtext->ecxt_scantuple = oldSlot;
return ExecProject(newProj);
}
-
/* ----------------------------------------------------------------
* ExecInsert
*
@@ -847,9 +900,17 @@ ExecInsert(ModifyTableContext *context,
* partition, we should instead check UPDATE policies, because we are
* executing policies defined on the target table, and not those
* defined on the child partitions.
+ *
+ * If we're running MERGE, we refer to the action that we're executing
+ * to know if we're doing an INSERT or UPDATE to a partition table.
*/
- wco_kind = (mtstate->operation == CMD_UPDATE) ?
- WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK;
+ if (mtstate->operation == CMD_UPDATE)
+ wco_kind = WCO_RLS_UPDATE_CHECK;
+ else if (mtstate->operation == CMD_MERGE)
+ wco_kind = (context->relaction->mas_action->commandType == CMD_UPDATE) ?
+ WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK;
+ else
+ wco_kind = WCO_RLS_INSERT_CHECK;
/*
* ExecWithCheckOptions() will skip any WCOs which are not of the kind
@@ -1453,7 +1514,13 @@ ldelete:;
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent delete")));
- /* tuple already deleted; nothing to do */
+
+ /*
+ * tuple already deleted; nothing to do. But MERGE might want
+ * to handle it differently. We've already filled-in
+ * actionInfo with sufficient information for MERGE to look
+ * at.
+ */
return NULL;
default:
@@ -1659,7 +1726,8 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
elog(ERROR, "failed to fetch tuple being updated");
/* and project the new tuple to retry the UPDATE with */
context->cpUpdateRetrySlot =
- ExecGetUpdateNewTuple(resultRelInfo, epqslot, oldSlot);
+ context->GetUpdateNewTuple(resultRelInfo, epqslot, oldSlot,
+ context->relaction);
return false;
}
}
@@ -1718,7 +1786,8 @@ ExecUpdatePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_update_before_row)
return ExecBRUpdateTriggers(context->estate, context->epqstate,
- resultRelInfo, tupleid, oldtuple, slot);
+ resultRelInfo, tupleid, oldtuple, slot,
+ &context->tmfd);
return true;
}
@@ -1865,6 +1934,13 @@ lreplace:;
}
/*
+ * No luck, a retry is needed. If running MERGE, we do not do so
+ * here; instead let it handle that on its own rules.
+ */
+ if (context->relaction != NULL)
+ return TM_Updated;
+
+ /*
* ExecCrossPartitionUpdate installed an updated version of the new
* tuple in the retry slot; start over.
*/
@@ -2109,8 +2185,8 @@ redo_act:
/*
* If ExecUpdateAct reports that a cross-partition update was done,
- * then the returning tuple has been projected and there's nothing
- * else for us to do.
+ * then the RETURNING tuple (if any) has been projected and there's
+ * nothing else for us to do.
*/
if (updateCxt.crossPartUpdate)
return context->cpUpdateReturningSlot;
@@ -2337,9 +2413,9 @@ ExecOnConflictUpdate(ModifyTableContext *context,
* to break.
*
* It is the user's responsibility to prevent this situation from
- * occurring. These problems are why SQL-2003 similarly specifies
- * that for SQL MERGE, an exception must be raised in the event of
- * an attempt to update the same row twice.
+ * occurring. These problems are why the SQL standard similarly
+ * specifies that for SQL MERGE, an exception must be raised in
+ * the event of an attempt to update the same row twice.
*/
xminDatum = slot_getsysattr(existing,
MinTransactionIdAttributeNumber,
@@ -2350,7 +2426,9 @@ ExecOnConflictUpdate(ModifyTableContext *context,
if (TransactionIdIsCurrentTransactionId(xmin))
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
- errmsg("ON CONFLICT DO UPDATE command cannot affect row a second time"),
+ /* translator: %s is a SQL command name */
+ errmsg("%s command cannot affect row a second time",
+ "ON CONFLICT DO UPDATE"),
errhint("Ensure that no rows proposed for insertion within the same command have duplicate constrained values.")));
/* This shouldn't happen */
@@ -2490,6 +2568,705 @@ ExecOnConflictUpdate(ModifyTableContext *context,
return true;
}
+/*
+ * Perform MERGE.
+ */
+static TupleTableSlot *
+ExecMerge(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
+ ItemPointer tupleid, bool canSetTag)
+{
+ bool matched;
+
+ /*-----
+ * If we are dealing with a WHEN MATCHED case (tupleid is valid), we
+ * execute the first action for which the additional WHEN MATCHED AND
+ * quals pass. If an action without quals is found, that action is
+ * executed.
+ *
+ * Similarly, if we are dealing with WHEN NOT MATCHED case, we look at
+ * the given WHEN NOT MATCHED actions in sequence until one passes.
+ *
+ * Things get interesting in case of concurrent update/delete of the
+ * target tuple. Such concurrent update/delete is detected while we are
+ * executing a WHEN MATCHED action.
+ *
+ * A concurrent update can:
+ *
+ * 1. modify the target tuple so that it no longer satisfies the
+ * additional quals attached to the current WHEN MATCHED action
+ *
+ * In this case, we are still dealing with a WHEN MATCHED case.
+ * We recheck the list of WHEN MATCHED actions from the start and
+ * choose the first one that satisfies the new target tuple.
+ *
+ * 2. modify the target tuple so that the join quals no longer pass and
+ * hence the source tuple no longer has a match.
+ *
+ * In this case, the source tuple no longer matches the target tuple,
+ * so we now instead find a qualifying WHEN NOT MATCHED action to
+ * execute.
+ *
+ * XXX Hmmm, what if the updated tuple would now match one that was
+ * considered NOT MATCHED so far?
+ *
+ * A concurrent delete changes a WHEN MATCHED case to WHEN NOT MATCHED.
+ *
+ * ExecMergeMatched takes care of following the update chain and
+ * re-finding the qualifying WHEN MATCHED action, as long as the updated
+ * target tuple still satisfies the join quals, i.e., it remains a WHEN
+ * MATCHED case. If the tuple gets deleted or the join quals fail, it
+ * returns and we try ExecMergeNotMatched. Given that ExecMergeMatched
+ * always make progress by following the update chain and we never switch
+ * from ExecMergeNotMatched to ExecMergeMatched, there is no risk of a
+ * livelock.
+ */
+ matched = tupleid != NULL;
+ if (matched)
+ matched = ExecMergeMatched(context, resultRelInfo, tupleid, canSetTag);
+
+ /*
+ * Either we were dealing with a NOT MATCHED tuple or ExecMergeMatched()
+ * returned "false", indicating the previously MATCHED tuple no longer
+ * matches.
+ */
+ if (!matched)
+ ExecMergeNotMatched(context, resultRelInfo, canSetTag);
+
+ /* No RETURNING support yet */
+ return NULL;
+}
+
+/*
+ * Check and execute the first qualifying MATCHED action. The current target
+ * tuple is identified by tupleid.
+ *
+ * We start from the first WHEN MATCHED action and check if the WHEN quals
+ * pass, if any. If the WHEN quals for the first action do not pass, we
+ * check the second, then the third and so on. If we reach to the end, no
+ * action is taken and we return true, indicating that no further action is
+ * required for this tuple.
+ *
+ * If we do find a qualifying action, then we attempt to execute the action.
+ *
+ * If the tuple is concurrently updated, EvalPlanQual is run with the updated
+ * tuple to recheck the join quals. Note that the additional quals associated
+ * with individual actions are evaluated by this routine via ExecQual, while
+ * EvalPlanQual checks for the join quals. If EvalPlanQual tells us that the
+ * updated tuple still passes the join quals, then we restart from the first
+ * action to look for a qualifying action. Otherwise, we return false --
+ * meaning that a NOT MATCHED action must now be executed for the current
+ * source tuple.
+ */
+static bool
+ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
+ ItemPointer tupleid, bool canSetTag)
+{
+ ModifyTableState *mtstate = context->mtstate;
+ TupleTableSlot *newslot;
+ EState *estate = context->estate;
+ ExprContext *econtext = mtstate->ps.ps_ExprContext;
+ bool isNull;
+ EPQState *epqstate = &mtstate->mt_epqstate;
+ ListCell *l;
+
+ /*
+ * If there are no WHEN MATCHED actions, we are done.
+ */
+ if (resultRelInfo->ri_matchedMergeAction == NIL)
+ return true;
+
+ /*
+ * Make tuple and any needed join variables available to ExecQual and
+ * ExecProject. The target's existing tuple is installed in the scantuple.
+ * Again, this target relation's slot is required only in the case of a
+ * MATCHED tuple and UPDATE/DELETE actions.
+ */
+ econtext->ecxt_scantuple = resultRelInfo->ri_oldTupleSlot;
+ econtext->ecxt_innertuple = context->planSlot;
+ econtext->ecxt_outertuple = NULL;
+
+lmerge_matched:;
+
+ /*
+ * This routine is only invoked for matched rows, and we must have found
+ * the tupleid of the target row in that case; fetch that tuple.
+ *
+ * We use SnapshotAny for this because we might get called again after
+ * EvalPlanQual returns us a new tuple, which may not be visible to our
+ * MVCC snapshot.
+ */
+
+ if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
+ tupleid,
+ SnapshotAny,
+ resultRelInfo->ri_oldTupleSlot))
+ elog(ERROR, "failed to fetch the target tuple");
+
+ foreach(l, resultRelInfo->ri_matchedMergeAction)
+ {
+ MergeActionState *relaction = (MergeActionState *) lfirst(l);
+ CmdType commandType = relaction->mas_action->commandType;
+ List *recheckIndexes = NIL;
+ TM_Result result;
+ UpdateContext updateCxt = {0};
+
+ /*
+ * Test condition, if any.
+ *
+ * In the absence of any condition, we perform the action
+ * unconditionally (no need to check separately since ExecQual() will
+ * return true if there are no conditions to evaluate).
+ */
+ if (!ExecQual(relaction->mas_whenqual, econtext))
+ continue;
+
+ /*
+ * Check if the existing target tuple meets the USING checks of
+ * UPDATE/DELETE RLS policies. If those checks fail, we throw an
+ * error.
+ *
+ * The WITH CHECK quals are applied in ExecUpdate() and hence we need
+ * not do anything special to handle them.
+ *
+ * NOTE: We must do this after WHEN quals are evaluated, so that we
+ * check policies only when they matter.
+ */
+ if (resultRelInfo->ri_WithCheckOptions)
+ {
+ ExecWithCheckOptions(commandType == CMD_UPDATE ?
+ WCO_RLS_MERGE_UPDATE_CHECK : WCO_RLS_MERGE_DELETE_CHECK,
+ resultRelInfo,
+ resultRelInfo->ri_oldTupleSlot,
+ context->mtstate->ps.state);
+ }
+
+ /* Perform stated action */
+ switch (commandType)
+ {
+ case CMD_UPDATE:
+
+ /*
+ * Project the output tuple, and use that to update the table.
+ * We don't need to filter out junk attributes, because the
+ * UPDATE action's targetlist doesn't have any.
+ */
+ newslot = ExecProject(relaction->mas_proj);
+
+ context->relaction = relaction;
+ context->GetUpdateNewTuple = mergeGetUpdateNewTuple;
+ context->cpUpdateRetrySlot = NULL;
+
+ if (!ExecUpdatePrologue(context, resultRelInfo,
+ tupleid, NULL, newslot))
+ {
+ result = TM_Ok;
+ break;
+ }
+ ExecUpdatePrepareSlot(resultRelInfo, newslot, context->estate);
+ result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL,
+ newslot, mtstate->canSetTag, &updateCxt);
+ if (result == TM_Ok && updateCxt.updated)
+ {
+ ExecUpdateEpilogue(context, &updateCxt, resultRelInfo,
+ tupleid, NULL, newslot, recheckIndexes);
+ mtstate->mt_merge_updated += 1;
+ }
+
+ break;
+
+ case CMD_DELETE:
+ context->relaction = relaction;
+ if (!ExecDeletePrologue(context, resultRelInfo, tupleid,
+ NULL, NULL))
+ {
+ result = TM_Ok;
+ break;
+ }
+ result = ExecDeleteAct(context, resultRelInfo, tupleid, false);
+ if (result == TM_Ok)
+ {
+ ExecDeleteEpilogue(context, resultRelInfo, tupleid, NULL,
+ false);
+ mtstate->mt_merge_deleted += 1;
+ }
+ break;
+
+ case CMD_NOTHING:
+ /* Doing nothing is always OK */
+ result = TM_Ok;
+ break;
+
+ default:
+ elog(ERROR, "unknown action in MERGE WHEN MATCHED clause");
+ }
+
+ switch (result)
+ {
+ case TM_Ok:
+ /* all good; perform final actions */
+ if (canSetTag)
+ (estate->es_processed)++;
+
+ break;
+
+ case TM_SelfModified:
+
+ /*
+ * The SQL standard disallows this for MERGE.
+ */
+ if (TransactionIdIsCurrentTransactionId(context->tmfd.xmax))
+ ereport(ERROR,
+ (errcode(ERRCODE_CARDINALITY_VIOLATION),
+ /* translator: %s is a SQL command name */
+ errmsg("%s command cannot affect row a second time",
+ "MERGE"),
+ errhint("Ensure that not more than one source row matches any one target row.")));
+ /* This shouldn't happen */
+ elog(ERROR, "attempted to update or delete invisible tuple");
+ break;
+
+ case TM_Deleted:
+ if (IsolationUsesXactSnapshot())
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to concurrent delete")));
+
+ /*
+ * If the tuple was already deleted, return to let caller
+ * handle it under NOT MATCHED clauses.
+ */
+ return false;
+
+ case TM_Updated:
+ {
+ Relation resultRelationDesc;
+ TupleTableSlot *epqslot,
+ *inputslot;
+ LockTupleMode lockmode;
+
+ /*
+ * The target tuple was concurrently updated by some other
+ * transaction.
+ */
+
+ /*
+ * If cpUpdateRetrySlot is set, ExecCrossPartitionUpdate()
+ * must have detected that the tuple was concurrently
+ * updated, so we restart the search for an appropriate
+ * WHEN MATCHED clause to process the updated tuple.
+ *
+ * In this case, ExecDelete() would already have performed
+ * EvalPlanQual() on the latest version of the tuple,
+ * which in turn would already have been loaded into
+ * ri_oldTupleSlot, so no need to do either of those
+ * things.
+ *
+ * XXX why do we not check the WHEN NOT MATCHED list in
+ * this case?
+ */
+ if (!TupIsNull(context->cpUpdateRetrySlot))
+ goto lmerge_matched;
+
+ /*
+ * Otherwise, we run the EvalPlanQual() with the new
+ * version of the tuple. If EvalPlanQual() does not return
+ * a tuple, then we switch to the NOT MATCHED list of
+ * actions. If it does return a tuple and the join qual is
+ * still satisfied, then we just need to recheck the
+ * MATCHED actions, starting from the top, and execute the
+ * first qualifying action.
+ */
+ resultRelationDesc = resultRelInfo->ri_RelationDesc;
+ lockmode = ExecUpdateLockMode(estate, resultRelInfo);
+
+ inputslot = EvalPlanQualSlot(epqstate, resultRelationDesc,
+ resultRelInfo->ri_RangeTableIndex);
+
+ result = table_tuple_lock(resultRelationDesc, tupleid,
+ estate->es_snapshot,
+ inputslot, estate->es_output_cid,
+ lockmode, LockWaitBlock,
+ TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
+ &context->tmfd);
+ switch (result)
+ {
+ case TM_Ok:
+ epqslot = EvalPlanQual(epqstate,
+ resultRelationDesc,
+ resultRelInfo->ri_RangeTableIndex,
+ inputslot);
+
+ /*
+ * If we got no tuple, or the tuple we get has a
+ * NULL ctid, go back to caller: this one is not a
+ * MATCHED tuple anymore, so they can retry with
+ * NOT MATCHED actions.
+ */
+ if (TupIsNull(epqslot))
+ return false;
+
+ (void) ExecGetJunkAttribute(epqslot,
+ resultRelInfo->ri_RowIdAttNo,
+ &isNull);
+ if (isNull)
+ return false;
+
+ /*
+ * When a tuple was updated and migrated to
+ * another partition concurrently, the current
+ * MERGE implementation can't follow. There's
+ * probably a better way to handle this case, but
+ * it'd require recognizing the relation to which
+ * the tuple moved, and setting our current
+ * resultRelInfo to that.
+ */
+ if (ItemPointerIndicatesMovedPartitions(&context->tmfd.ctid))
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("tuple to be deleted was already moved to another partition due to concurrent update")));
+
+ /*
+ * A non-NULL ctid means that we are still dealing
+ * with MATCHED case. Restart the loop so that we
+ * apply all the MATCHED rules again, to ensure
+ * that the first qualifying WHEN MATCHED action
+ * is executed.
+ *
+ * Update tupleid to that of the new tuple, for
+ * the refetch we do at the top.
+ */
+ ItemPointerCopy(&context->tmfd.ctid, tupleid);
+ goto lmerge_matched;
+
+ case TM_Deleted:
+
+ /*
+ * tuple already deleted; tell caller to run NOT
+ * MATCHED actions
+ */
+ return false;
+
+ case TM_SelfModified:
+
+ /*
+ * This can be reached when following an update
+ * chain from a tuple updated by another session,
+ * reaching a tuple that was already updated in
+ * this transaction. If previously modified by
+ * this command, ignore the redundant update,
+ * otherwise error out.
+ *
+ * See also response to TM_SelfModified in
+ * ExecUpdate().
+ */
+ if (context->tmfd.cmax != estate->es_output_cid)
+ ereport(ERROR,
+ (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
+ errmsg("tuple to be updated or deleted was already modified by an operation triggered by the current command"),
+ errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
+ return false;
+
+ default:
+ /* see table_tuple_lock call in ExecDelete() */
+ elog(ERROR, "unexpected table_tuple_lock status: %u",
+ result);
+ return false;
+ }
+ }
+
+ case TM_Invisible:
+ case TM_WouldBlock:
+ case TM_BeingModified:
+ /* these should not occur */
+ elog(ERROR, "unexpected tuple operation result: %d", result);
+ break;
+ }
+
+ /*
+ * We've activated one of the WHEN clauses, so we don't search
+ * further. This is required behaviour, not an optimization.
+ */
+ break;
+ }
+
+ /*
+ * Successfully executed an action or no qualifying action was found.
+ */
+ return true;
+}
+
+/*
+ * Execute the first qualifying NOT MATCHED action.
+ */
+static void
+ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
+ bool canSetTag)
+{
+ ModifyTableState *mtstate = context->mtstate;
+ ExprContext *econtext = mtstate->ps.ps_ExprContext;
+ List *actionStates = NIL;
+ ListCell *l;
+
+ /*
+ * For INSERT actions, the root relation's merge action is OK since the
+ * INSERT's targetlist and the WHEN conditions can only refer to the
+ * source relation and hence it does not matter which result relation we
+ * work with.
+ *
+ * XXX does this mean that we can avoid creating copies of actionStates on
+ * partitioned tables, for not-matched actions?
+ */
+ actionStates = resultRelInfo->ri_notMatchedMergeAction;
+
+ /*
+ * Make source tuple available to ExecQual and ExecProject. We don't need
+ * the target tuple, since the WHEN quals and targetlist can't refer to
+ * the target columns.
+ */
+ econtext->ecxt_scantuple = NULL;
+ econtext->ecxt_innertuple = context->planSlot;
+ econtext->ecxt_outertuple = NULL;
+
+ foreach(l, actionStates)
+ {
+ MergeActionState *action = (MergeActionState *) lfirst(l);
+ CmdType commandType = action->mas_action->commandType;
+ TupleTableSlot *newslot;
+
+ /*
+ * Test condition, if any.
+ *
+ * In the absence of any condition, we perform the action
+ * unconditionally (no need to check separately since ExecQual() will
+ * return true if there are no conditions to evaluate).
+ */
+ if (!ExecQual(action->mas_whenqual, econtext))
+ continue;
+
+ /* Perform stated action */
+ switch (commandType)
+ {
+ case CMD_INSERT:
+
+ /*
+ * Project the tuple. In case of a partitioned table, the
+ * projection was already built to use the root's descriptor,
+ * so we don't need to map the tuple here.
+ */
+ newslot = ExecProject(action->mas_proj);
+ context->relaction = action;
+
+ (void) ExecInsert(context, mtstate->rootResultRelInfo, newslot,
+ canSetTag, NULL, NULL);
+ mtstate->mt_merge_inserted += 1;
+ break;
+ case CMD_NOTHING:
+ /* Do nothing */
+ break;
+ default:
+ elog(ERROR, "unknown action in MERGE WHEN NOT MATCHED clause");
+ }
+
+ /*
+ * We've activated one of the WHEN clauses, so we don't search
+ * further. This is required behaviour, not an optimization.
+ */
+ break;
+ }
+}
+
+/*
+ * Initialize state for execution of MERGE.
+ */
+void
+ExecInitMerge(ModifyTableState *mtstate, EState *estate)
+{
+ ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
+ ResultRelInfo *rootRelInfo = mtstate->rootResultRelInfo;
+ ResultRelInfo *resultRelInfo;
+ ExprContext *econtext;
+ ListCell *lc;
+ int i;
+
+ if (node->mergeActionLists == NIL)
+ return;
+
+ mtstate->mt_merge_subcommands = 0;
+
+ if (mtstate->ps.ps_ExprContext == NULL)
+ ExecAssignExprContext(estate, &mtstate->ps);
+ econtext = mtstate->ps.ps_ExprContext;
+
+ /*
+ * Create a MergeActionState for each action on the mergeActionList and
+ * add it to either a list of matched actions or not-matched actions.
+ *
+ * Similar logic appears in ExecInitPartitionInfo(), so if changing
+ * anything here, do so there too.
+ */
+ i = 0;
+ foreach(lc, node->mergeActionLists)
+ {
+ List *mergeActionList = lfirst(lc);
+ TupleDesc relationDesc;
+ ListCell *l;
+
+ resultRelInfo = mtstate->resultRelInfo + i;
+ i++;
+ relationDesc = RelationGetDescr(resultRelInfo->ri_RelationDesc);
+
+ /* initialize slots for MERGE fetches from this rel */
+ if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
+ ExecInitMergeTupleSlots(mtstate, resultRelInfo);
+
+ foreach(l, mergeActionList)
+ {
+ MergeAction *action = (MergeAction *) lfirst(l);
+ MergeActionState *action_state;
+ TupleTableSlot *tgtslot;
+ TupleDesc tgtdesc;
+ List **list;
+
+ /*
+ * Build action merge state for this rel. (For partitions,
+ * equivalent code exists in ExecInitPartitionInfo.)
+ */
+ action_state = makeNode(MergeActionState);
+ action_state->mas_action = action;
+ action_state->mas_whenqual = ExecInitQual((List *) action->qual,
+ &mtstate->ps);
+
+ /*
+ * We create two lists - one for WHEN MATCHED actions and one for
+ * WHEN NOT MATCHED actions - and stick the MergeActionState into
+ * the appropriate list.
+ */
+ if (action_state->mas_action->matched)
+ list = &resultRelInfo->ri_matchedMergeAction;
+ else
+ list = &resultRelInfo->ri_notMatchedMergeAction;
+ *list = lappend(*list, action_state);
+
+ switch (action->commandType)
+ {
+ case CMD_INSERT:
+ ExecCheckPlanOutput(rootRelInfo->ri_RelationDesc,
+ action->targetList);
+
+ /*
+ * If the MERGE targets a partitioned table, any INSERT
+ * actions must be routed through it, not the child
+ * relations. Initialize the routing struct and the root
+ * table's "new" tuple slot for that, if not already done.
+ * The projection we prepare, for all relations, uses the
+ * root relation descriptor, and targets the plan's root
+ * slot. (This is consistent with the fact that we
+ * checked the plan output to match the root relation,
+ * above.)
+ */
+ if (rootRelInfo->ri_RelationDesc->rd_rel->relkind ==
+ RELKIND_PARTITIONED_TABLE)
+ {
+ if (mtstate->mt_partition_tuple_routing == NULL)
+ {
+ /*
+ * Initialize planstate for routing if not already
+ * done.
+ *
+ * Note that the slot is managed as a standalone
+ * slot belonging to ModifyTableState, so we pass
+ * NULL for the 2nd argument.
+ */
+ mtstate->mt_root_tuple_slot =
+ table_slot_create(rootRelInfo->ri_RelationDesc,
+ NULL);
+ mtstate->mt_partition_tuple_routing =
+ ExecSetupPartitionTupleRouting(estate,
+ rootRelInfo->ri_RelationDesc);
+ }
+ tgtslot = mtstate->mt_root_tuple_slot;
+ tgtdesc = RelationGetDescr(rootRelInfo->ri_RelationDesc);
+ }
+ else
+ {
+ /* not partitioned? use the stock relation and slot */
+ tgtslot = resultRelInfo->ri_newTupleSlot;
+ tgtdesc = RelationGetDescr(resultRelInfo->ri_RelationDesc);
+ }
+
+ action_state->mas_proj =
+ ExecBuildProjectionInfo(action->targetList, econtext,
+ tgtslot,
+ &mtstate->ps,
+ tgtdesc);
+
+ mtstate->mt_merge_subcommands |= MERGE_INSERT;
+ break;
+ case CMD_UPDATE:
+ action_state->mas_proj =
+ ExecBuildUpdateProjection(action->targetList,
+ true,
+ action->updateColnos,
+ relationDesc,
+ econtext,
+ resultRelInfo->ri_newTupleSlot,
+ &mtstate->ps);
+ mtstate->mt_merge_subcommands |= MERGE_UPDATE;
+ break;
+ case CMD_DELETE:
+ mtstate->mt_merge_subcommands |= MERGE_DELETE;
+ break;
+ case CMD_NOTHING:
+ break;
+ default:
+ elog(ERROR, "unknown operation");
+ break;
+ }
+ }
+ }
+}
+
+/*
+ * Initializes the tuple slots in a ResultRelInfo for any MERGE action.
+ *
+ * We mark 'projectNewInfoValid' even though the projections themselves
+ * are not initialized here.
+ */
+void
+ExecInitMergeTupleSlots(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo)
+{
+ EState *estate = mtstate->ps.state;
+
+ Assert(!resultRelInfo->ri_projectNewInfoValid);
+
+ resultRelInfo->ri_oldTupleSlot =
+ table_slot_create(resultRelInfo->ri_RelationDesc,
+ &estate->es_tupleTable);
+ resultRelInfo->ri_newTupleSlot =
+ table_slot_create(resultRelInfo->ri_RelationDesc,
+ &estate->es_tupleTable);
+ resultRelInfo->ri_projectNewInfoValid = true;
+}
+
+/*
+ * Callback for ModifyTableContext->GetUpdateNewTuple for use by MERGE. It
+ * computes the updated tuple by projecting from the current merge action's
+ * projection.
+ */
+static TupleTableSlot *
+mergeGetUpdateNewTuple(ResultRelInfo *relinfo,
+ TupleTableSlot *planSlot,
+ TupleTableSlot *oldSlot,
+ MergeActionState *relaction)
+{
+ ExprContext *econtext = relaction->mas_proj->pi_exprContext;
+
+ econtext->ecxt_scantuple = oldSlot;
+ econtext->ecxt_innertuple = planSlot;
+
+ return ExecProject(relaction->mas_proj);
+}
/*
* Process BEFORE EACH STATEMENT triggers
@@ -2514,6 +3291,14 @@ fireBSTriggers(ModifyTableState *node)
case CMD_DELETE:
ExecBSDeleteTriggers(node->ps.state, resultRelInfo);
break;
+ case CMD_MERGE:
+ if (node->mt_merge_subcommands & MERGE_INSERT)
+ ExecBSInsertTriggers(node->ps.state, resultRelInfo);
+ if (node->mt_merge_subcommands & MERGE_UPDATE)
+ ExecBSUpdateTriggers(node->ps.state, resultRelInfo);
+ if (node->mt_merge_subcommands & MERGE_DELETE)
+ ExecBSDeleteTriggers(node->ps.state, resultRelInfo);
+ break;
default:
elog(ERROR, "unknown operation");
break;
@@ -2547,6 +3332,17 @@ fireASTriggers(ModifyTableState *node)
ExecASDeleteTriggers(node->ps.state, resultRelInfo,
node->mt_transition_capture);
break;
+ case CMD_MERGE:
+ if (node->mt_merge_subcommands & MERGE_DELETE)
+ ExecASDeleteTriggers(node->ps.state, resultRelInfo,
+ node->mt_transition_capture);
+ if (node->mt_merge_subcommands & MERGE_UPDATE)
+ ExecASUpdateTriggers(node->ps.state, resultRelInfo,
+ node->mt_transition_capture);
+ if (node->mt_merge_subcommands & MERGE_INSERT)
+ ExecASInsertTriggers(node->ps.state, resultRelInfo,
+ node->mt_transition_capture);
+ break;
default:
elog(ERROR, "unknown operation");
break;
@@ -2749,7 +3545,28 @@ ExecModifyTable(PlanState *pstate)
datum = ExecGetJunkAttribute(planSlot, node->mt_resultOidAttno,
&isNull);
if (isNull)
+ {
+ /*
+ * For commands other than MERGE, any tuples having InvalidOid
+ * for tableoid are errors. For MERGE, we may need to handle
+ * them as WHEN NOT MATCHED clauses if any, so do that.
+ *
+ * Note that we use the node's toplevel resultRelInfo, not any
+ * specific partition's.
+ */
+ if (operation == CMD_MERGE)
+ {
+ EvalPlanQualSetSlot(&node->mt_epqstate, planSlot);
+
+ context.planSlot = planSlot;
+ context.lockmode = 0;
+
+ ExecMerge(&context, node->resultRelInfo, NULL, node->canSetTag);
+ continue; /* no RETURNING support yet */
+ }
+
elog(ERROR, "tableoid is NULL");
+ }
resultoid = DatumGetObjectId(datum);
/* If it's not the same as last time, we need to locate the rel */
@@ -2784,13 +3601,14 @@ ExecModifyTable(PlanState *pstate)
oldtuple = NULL;
/*
- * For UPDATE/DELETE, fetch the row identity info for the tuple to be
- * updated/deleted. For a heap relation, that's a TID; otherwise we
- * may have a wholerow junk attr that carries the old tuple in toto.
- * Keep this in step with the part of ExecInitModifyTable that sets up
- * ri_RowIdAttNo.
+ * For UPDATE/DELETE/MERGE, fetch the row identity info for the tuple
+ * to be updated/deleted/merged. For a heap relation, that's a TID;
+ * otherwise we may have a wholerow junk attr that carries the old
+ * tuple in toto. Keep this in step with the part of
+ * ExecInitModifyTable that sets up ri_RowIdAttNo.
*/
- if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ if (operation == CMD_UPDATE || operation == CMD_DELETE ||
+ operation == CMD_MERGE)
{
char relkind;
Datum datum;
@@ -2806,9 +3624,30 @@ ExecModifyTable(PlanState *pstate)
datum = ExecGetJunkAttribute(slot,
resultRelInfo->ri_RowIdAttNo,
&isNull);
- /* shouldn't ever get a null result... */
+
+ /*
+ * For commands other than MERGE, any tuples having a null row
+ * identifier are errors. For MERGE, we may need to handle
+ * them as WHEN NOT MATCHED clauses if any, so do that.
+ *
+ * Note that we use the node's toplevel resultRelInfo, not any
+ * specific partition's.
+ */
if (isNull)
+ {
+ if (operation == CMD_MERGE)
+ {
+ EvalPlanQualSetSlot(&node->mt_epqstate, planSlot);
+
+ context.planSlot = planSlot;
+ context.lockmode = 0;
+
+ ExecMerge(&context, node->resultRelInfo, NULL, node->canSetTag);
+ continue; /* no RETURNING support yet */
+ }
+
elog(ERROR, "ctid is NULL");
+ }
tupleid = (ItemPointer) DatumGetPointer(datum);
tuple_ctid = *tupleid; /* be sure we don't free ctid!! */
@@ -2898,8 +3737,10 @@ ExecModifyTable(PlanState *pstate)
oldSlot))
elog(ERROR, "failed to fetch tuple being updated");
}
- slot = ExecGetUpdateNewTuple(resultRelInfo, planSlot,
- oldSlot);
+ slot = internalGetUpdateNewTuple(resultRelInfo, planSlot,
+ oldSlot, NULL);
+ context.GetUpdateNewTuple = internalGetUpdateNewTuple;
+ context.relaction = NULL;
/* Now apply the update. */
slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple,
@@ -2911,6 +3752,10 @@ ExecModifyTable(PlanState *pstate)
true, false, node->canSetTag, NULL, NULL);
break;
+ case CMD_MERGE:
+ slot = ExecMerge(&context, resultRelInfo, tupleid, node->canSetTag);
+ break;
+
default:
elog(ERROR, "unknown operation");
break;
@@ -3044,6 +3889,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
mtstate->resultRelInfo = (ResultRelInfo *)
palloc(nrels * sizeof(ResultRelInfo));
+ mtstate->mt_merge_inserted = 0;
+ mtstate->mt_merge_updated = 0;
+ mtstate->mt_merge_deleted = 0;
+
/*----------
* Resolve the target relation. This is the same as:
*
@@ -3147,12 +3996,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
}
/*
- * For UPDATE/DELETE, find the appropriate junk attr now, either a
- * 'ctid' or 'wholerow' attribute depending on relkind. For foreign
+ * For UPDATE/DELETE/MERGE, find the appropriate junk attr now, either
+ * a 'ctid' or 'wholerow' attribute depending on relkind. For foreign
* tables, the FDW might have created additional junk attr(s), but
* those are no concern of ours.
*/
- if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ if (operation == CMD_UPDATE || operation == CMD_DELETE ||
+ operation == CMD_MERGE)
{
char relkind;
@@ -3169,19 +4019,28 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
else if (relkind == RELKIND_FOREIGN_TABLE)
{
/*
+ * We don't support MERGE with foreign tables for now. (It's
+ * problematic because the implementation uses CTID.)
+ */
+ Assert(operation != CMD_MERGE);
+
+ /*
* When there is a row-level trigger, there should be a
* wholerow attribute. We also require it to be present in
- * UPDATE, so we can get the values of unchanged columns.
+ * UPDATE and MERGE, so we can get the values of unchanged
+ * columns.
*/
resultRelInfo->ri_RowIdAttNo =
ExecFindJunkAttributeInTlist(subplan->targetlist,
"wholerow");
- if (mtstate->operation == CMD_UPDATE &&
+ if ((mtstate->operation == CMD_UPDATE || mtstate->operation == CMD_MERGE) &&
!AttributeNumberIsValid(resultRelInfo->ri_RowIdAttNo))
elog(ERROR, "could not find junk wholerow column");
}
else
{
+ /* No support for MERGE */
+ Assert(operation != CMD_MERGE);
/* Other valid target relkinds must provide wholerow */
resultRelInfo->ri_RowIdAttNo =
ExecFindJunkAttributeInTlist(subplan->targetlist,
@@ -3193,10 +4052,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
}
/*
- * If this is an inherited update/delete, there will be a junk attribute
- * named "tableoid" present in the subplan's targetlist. It will be used
- * to identify the result relation for a given tuple to be
- * updated/deleted.
+ * If this is an inherited update/delete/merge, there will be a junk
+ * attribute named "tableoid" present in the subplan's targetlist. It
+ * will be used to identify the result relation for a given tuple to be
+ * updated/deleted/merged.
*/
mtstate->mt_resultOidAttno =
ExecFindJunkAttributeInTlist(subplan->targetlist, "tableoid");
@@ -3209,8 +4068,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
/*
* Build state for tuple routing if it's a partitioned INSERT. An UPDATE
- * might need this too, but only if it actually moves tuples between
- * partitions; in that case setup is done by ExecCrossPartitionUpdate.
+ * or MERGE might need this too, but only if it actually moves tuples
+ * between partitions; in that case setup is done by
+ * ExecCrossPartitionUpdate.
*/
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
operation == CMD_INSERT)
@@ -3379,6 +4239,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
arowmarks = lappend(arowmarks, aerm);
}
+ /* For a MERGE command, initialize its state */
+ if (mtstate->operation == CMD_MERGE)
+ ExecInitMerge(mtstate, estate);
+
EvalPlanQualSetPlan(&mtstate->mt_epqstate, subplan, arowmarks);
/*