diff options
Diffstat (limited to 'src/backend/executor/nodeModifyTable.c')
-rw-r--r-- | src/backend/executor/nodeModifyTable.c | 936 |
1 files changed, 900 insertions, 36 deletions
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 701fe052967..171575cd73b 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -24,11 +24,20 @@ * values plus row-locating info for UPDATE and MERGE cases, or just the * row-locating info for DELETE cases. * + * MERGE runs a join between the source relation and the target + * table; if any WHEN NOT MATCHED clauses are present, then the + * join is an outer join. In this case, any unmatched tuples will + * have NULL row-locating info, and only INSERT can be run. But for + * matched tuples, then row-locating info is used to determine the + * tuple to UPDATE or DELETE. When all clauses are WHEN MATCHED, + * then an inner join is used, so all tuples contain row-locating info. + * * If the query specifies RETURNING, then the ModifyTable returns a * RETURNING tuple after completing each row insert, update, or delete. * It must be called again to continue the operation. Without RETURNING, * we just loop within the node until all the work is done, then - * return NULL. This avoids useless call/return overhead. + * return NULL. This avoids useless call/return overhead. (MERGE does + * not support RETURNING.) */ #include "postgres.h" @@ -78,6 +87,17 @@ typedef struct ModifyTableContext */ TupleTableSlot *planSlot; + /* + * During EvalPlanQual, project and return the new version of the new + * tuple + */ + TupleTableSlot *(*GetUpdateNewTuple) (ResultRelInfo *resultRelInfo, + TupleTableSlot *epqslot, + TupleTableSlot *oldSlot, + MergeActionState *relaction); + + /* MERGE specific */ + MergeActionState *relaction; /* MERGE action in progress */ /* * Information about the changes that were made concurrently to a tuple @@ -140,6 +160,28 @@ static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate, ResultRelInfo *targetRelInfo, TupleTableSlot *slot, ResultRelInfo **partRelInfo); +static TupleTableSlot *internalGetUpdateNewTuple(ResultRelInfo *relinfo, + TupleTableSlot *planSlot, + TupleTableSlot *oldSlot, + MergeActionState *relaction); + +static TupleTableSlot *ExecMerge(ModifyTableContext *context, + ResultRelInfo *resultRelInfo, + ItemPointer tupleid, + bool canSetTag); +static void ExecInitMerge(ModifyTableState *mtstate, EState *estate); +static bool ExecMergeMatched(ModifyTableContext *context, + ResultRelInfo *resultRelInfo, + ItemPointer tupleid, + bool canSetTag); +static void ExecMergeNotMatched(ModifyTableContext *context, + ResultRelInfo *resultRelInfo, + bool canSetTag); +static TupleTableSlot *mergeGetUpdateNewTuple(ResultRelInfo *relinfo, + TupleTableSlot *planSlot, + TupleTableSlot *oldSlot, + MergeActionState *relaction); + /* * Verify that the tuples to be produced by INSERT match the @@ -616,21 +658,32 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo, TupleTableSlot *planSlot, TupleTableSlot *oldSlot) { - ProjectionInfo *newProj = relinfo->ri_projectNew; - ExprContext *econtext; - /* Use a few extra Asserts to protect against outside callers */ Assert(relinfo->ri_projectNewInfoValid); Assert(planSlot != NULL && !TTS_EMPTY(planSlot)); Assert(oldSlot != NULL && !TTS_EMPTY(oldSlot)); + return internalGetUpdateNewTuple(relinfo, planSlot, oldSlot, NULL); +} + +/* + * Callback for ModifyTableState->GetUpdateNewTuple for use by regular UPDATE. + */ +static TupleTableSlot * +internalGetUpdateNewTuple(ResultRelInfo *relinfo, + TupleTableSlot *planSlot, + TupleTableSlot *oldSlot, + MergeActionState *relaction) +{ + ProjectionInfo *newProj = relinfo->ri_projectNew; + ExprContext *econtext; + econtext = newProj->pi_exprContext; econtext->ecxt_outertuple = planSlot; econtext->ecxt_scantuple = oldSlot; return ExecProject(newProj); } - /* ---------------------------------------------------------------- * ExecInsert * @@ -847,9 +900,17 @@ ExecInsert(ModifyTableContext *context, * partition, we should instead check UPDATE policies, because we are * executing policies defined on the target table, and not those * defined on the child partitions. + * + * If we're running MERGE, we refer to the action that we're executing + * to know if we're doing an INSERT or UPDATE to a partition table. */ - wco_kind = (mtstate->operation == CMD_UPDATE) ? - WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK; + if (mtstate->operation == CMD_UPDATE) + wco_kind = WCO_RLS_UPDATE_CHECK; + else if (mtstate->operation == CMD_MERGE) + wco_kind = (context->relaction->mas_action->commandType == CMD_UPDATE) ? + WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK; + else + wco_kind = WCO_RLS_INSERT_CHECK; /* * ExecWithCheckOptions() will skip any WCOs which are not of the kind @@ -1453,7 +1514,13 @@ ldelete:; ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to concurrent delete"))); - /* tuple already deleted; nothing to do */ + + /* + * tuple already deleted; nothing to do. But MERGE might want + * to handle it differently. We've already filled-in + * actionInfo with sufficient information for MERGE to look + * at. + */ return NULL; default: @@ -1659,7 +1726,8 @@ ExecCrossPartitionUpdate(ModifyTableContext *context, elog(ERROR, "failed to fetch tuple being updated"); /* and project the new tuple to retry the UPDATE with */ context->cpUpdateRetrySlot = - ExecGetUpdateNewTuple(resultRelInfo, epqslot, oldSlot); + context->GetUpdateNewTuple(resultRelInfo, epqslot, oldSlot, + context->relaction); return false; } } @@ -1718,7 +1786,8 @@ ExecUpdatePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo, if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_before_row) return ExecBRUpdateTriggers(context->estate, context->epqstate, - resultRelInfo, tupleid, oldtuple, slot); + resultRelInfo, tupleid, oldtuple, slot, + &context->tmfd); return true; } @@ -1865,6 +1934,13 @@ lreplace:; } /* + * No luck, a retry is needed. If running MERGE, we do not do so + * here; instead let it handle that on its own rules. + */ + if (context->relaction != NULL) + return TM_Updated; + + /* * ExecCrossPartitionUpdate installed an updated version of the new * tuple in the retry slot; start over. */ @@ -2109,8 +2185,8 @@ redo_act: /* * If ExecUpdateAct reports that a cross-partition update was done, - * then the returning tuple has been projected and there's nothing - * else for us to do. + * then the RETURNING tuple (if any) has been projected and there's + * nothing else for us to do. */ if (updateCxt.crossPartUpdate) return context->cpUpdateReturningSlot; @@ -2337,9 +2413,9 @@ ExecOnConflictUpdate(ModifyTableContext *context, * to break. * * It is the user's responsibility to prevent this situation from - * occurring. These problems are why SQL-2003 similarly specifies - * that for SQL MERGE, an exception must be raised in the event of - * an attempt to update the same row twice. + * occurring. These problems are why the SQL standard similarly + * specifies that for SQL MERGE, an exception must be raised in + * the event of an attempt to update the same row twice. */ xminDatum = slot_getsysattr(existing, MinTransactionIdAttributeNumber, @@ -2350,7 +2426,9 @@ ExecOnConflictUpdate(ModifyTableContext *context, if (TransactionIdIsCurrentTransactionId(xmin)) ereport(ERROR, (errcode(ERRCODE_CARDINALITY_VIOLATION), - errmsg("ON CONFLICT DO UPDATE command cannot affect row a second time"), + /* translator: %s is a SQL command name */ + errmsg("%s command cannot affect row a second time", + "ON CONFLICT DO UPDATE"), errhint("Ensure that no rows proposed for insertion within the same command have duplicate constrained values."))); /* This shouldn't happen */ @@ -2490,6 +2568,705 @@ ExecOnConflictUpdate(ModifyTableContext *context, return true; } +/* + * Perform MERGE. + */ +static TupleTableSlot * +ExecMerge(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + ItemPointer tupleid, bool canSetTag) +{ + bool matched; + + /*----- + * If we are dealing with a WHEN MATCHED case (tupleid is valid), we + * execute the first action for which the additional WHEN MATCHED AND + * quals pass. If an action without quals is found, that action is + * executed. + * + * Similarly, if we are dealing with WHEN NOT MATCHED case, we look at + * the given WHEN NOT MATCHED actions in sequence until one passes. + * + * Things get interesting in case of concurrent update/delete of the + * target tuple. Such concurrent update/delete is detected while we are + * executing a WHEN MATCHED action. + * + * A concurrent update can: + * + * 1. modify the target tuple so that it no longer satisfies the + * additional quals attached to the current WHEN MATCHED action + * + * In this case, we are still dealing with a WHEN MATCHED case. + * We recheck the list of WHEN MATCHED actions from the start and + * choose the first one that satisfies the new target tuple. + * + * 2. modify the target tuple so that the join quals no longer pass and + * hence the source tuple no longer has a match. + * + * In this case, the source tuple no longer matches the target tuple, + * so we now instead find a qualifying WHEN NOT MATCHED action to + * execute. + * + * XXX Hmmm, what if the updated tuple would now match one that was + * considered NOT MATCHED so far? + * + * A concurrent delete changes a WHEN MATCHED case to WHEN NOT MATCHED. + * + * ExecMergeMatched takes care of following the update chain and + * re-finding the qualifying WHEN MATCHED action, as long as the updated + * target tuple still satisfies the join quals, i.e., it remains a WHEN + * MATCHED case. If the tuple gets deleted or the join quals fail, it + * returns and we try ExecMergeNotMatched. Given that ExecMergeMatched + * always make progress by following the update chain and we never switch + * from ExecMergeNotMatched to ExecMergeMatched, there is no risk of a + * livelock. + */ + matched = tupleid != NULL; + if (matched) + matched = ExecMergeMatched(context, resultRelInfo, tupleid, canSetTag); + + /* + * Either we were dealing with a NOT MATCHED tuple or ExecMergeMatched() + * returned "false", indicating the previously MATCHED tuple no longer + * matches. + */ + if (!matched) + ExecMergeNotMatched(context, resultRelInfo, canSetTag); + + /* No RETURNING support yet */ + return NULL; +} + +/* + * Check and execute the first qualifying MATCHED action. The current target + * tuple is identified by tupleid. + * + * We start from the first WHEN MATCHED action and check if the WHEN quals + * pass, if any. If the WHEN quals for the first action do not pass, we + * check the second, then the third and so on. If we reach to the end, no + * action is taken and we return true, indicating that no further action is + * required for this tuple. + * + * If we do find a qualifying action, then we attempt to execute the action. + * + * If the tuple is concurrently updated, EvalPlanQual is run with the updated + * tuple to recheck the join quals. Note that the additional quals associated + * with individual actions are evaluated by this routine via ExecQual, while + * EvalPlanQual checks for the join quals. If EvalPlanQual tells us that the + * updated tuple still passes the join quals, then we restart from the first + * action to look for a qualifying action. Otherwise, we return false -- + * meaning that a NOT MATCHED action must now be executed for the current + * source tuple. + */ +static bool +ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + ItemPointer tupleid, bool canSetTag) +{ + ModifyTableState *mtstate = context->mtstate; + TupleTableSlot *newslot; + EState *estate = context->estate; + ExprContext *econtext = mtstate->ps.ps_ExprContext; + bool isNull; + EPQState *epqstate = &mtstate->mt_epqstate; + ListCell *l; + + /* + * If there are no WHEN MATCHED actions, we are done. + */ + if (resultRelInfo->ri_matchedMergeAction == NIL) + return true; + + /* + * Make tuple and any needed join variables available to ExecQual and + * ExecProject. The target's existing tuple is installed in the scantuple. + * Again, this target relation's slot is required only in the case of a + * MATCHED tuple and UPDATE/DELETE actions. + */ + econtext->ecxt_scantuple = resultRelInfo->ri_oldTupleSlot; + econtext->ecxt_innertuple = context->planSlot; + econtext->ecxt_outertuple = NULL; + +lmerge_matched:; + + /* + * This routine is only invoked for matched rows, and we must have found + * the tupleid of the target row in that case; fetch that tuple. + * + * We use SnapshotAny for this because we might get called again after + * EvalPlanQual returns us a new tuple, which may not be visible to our + * MVCC snapshot. + */ + + if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc, + tupleid, + SnapshotAny, + resultRelInfo->ri_oldTupleSlot)) + elog(ERROR, "failed to fetch the target tuple"); + + foreach(l, resultRelInfo->ri_matchedMergeAction) + { + MergeActionState *relaction = (MergeActionState *) lfirst(l); + CmdType commandType = relaction->mas_action->commandType; + List *recheckIndexes = NIL; + TM_Result result; + UpdateContext updateCxt = {0}; + + /* + * Test condition, if any. + * + * In the absence of any condition, we perform the action + * unconditionally (no need to check separately since ExecQual() will + * return true if there are no conditions to evaluate). + */ + if (!ExecQual(relaction->mas_whenqual, econtext)) + continue; + + /* + * Check if the existing target tuple meets the USING checks of + * UPDATE/DELETE RLS policies. If those checks fail, we throw an + * error. + * + * The WITH CHECK quals are applied in ExecUpdate() and hence we need + * not do anything special to handle them. + * + * NOTE: We must do this after WHEN quals are evaluated, so that we + * check policies only when they matter. + */ + if (resultRelInfo->ri_WithCheckOptions) + { + ExecWithCheckOptions(commandType == CMD_UPDATE ? + WCO_RLS_MERGE_UPDATE_CHECK : WCO_RLS_MERGE_DELETE_CHECK, + resultRelInfo, + resultRelInfo->ri_oldTupleSlot, + context->mtstate->ps.state); + } + + /* Perform stated action */ + switch (commandType) + { + case CMD_UPDATE: + + /* + * Project the output tuple, and use that to update the table. + * We don't need to filter out junk attributes, because the + * UPDATE action's targetlist doesn't have any. + */ + newslot = ExecProject(relaction->mas_proj); + + context->relaction = relaction; + context->GetUpdateNewTuple = mergeGetUpdateNewTuple; + context->cpUpdateRetrySlot = NULL; + + if (!ExecUpdatePrologue(context, resultRelInfo, + tupleid, NULL, newslot)) + { + result = TM_Ok; + break; + } + ExecUpdatePrepareSlot(resultRelInfo, newslot, context->estate); + result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL, + newslot, mtstate->canSetTag, &updateCxt); + if (result == TM_Ok && updateCxt.updated) + { + ExecUpdateEpilogue(context, &updateCxt, resultRelInfo, + tupleid, NULL, newslot, recheckIndexes); + mtstate->mt_merge_updated += 1; + } + + break; + + case CMD_DELETE: + context->relaction = relaction; + if (!ExecDeletePrologue(context, resultRelInfo, tupleid, + NULL, NULL)) + { + result = TM_Ok; + break; + } + result = ExecDeleteAct(context, resultRelInfo, tupleid, false); + if (result == TM_Ok) + { + ExecDeleteEpilogue(context, resultRelInfo, tupleid, NULL, + false); + mtstate->mt_merge_deleted += 1; + } + break; + + case CMD_NOTHING: + /* Doing nothing is always OK */ + result = TM_Ok; + break; + + default: + elog(ERROR, "unknown action in MERGE WHEN MATCHED clause"); + } + + switch (result) + { + case TM_Ok: + /* all good; perform final actions */ + if (canSetTag) + (estate->es_processed)++; + + break; + + case TM_SelfModified: + + /* + * The SQL standard disallows this for MERGE. + */ + if (TransactionIdIsCurrentTransactionId(context->tmfd.xmax)) + ereport(ERROR, + (errcode(ERRCODE_CARDINALITY_VIOLATION), + /* translator: %s is a SQL command name */ + errmsg("%s command cannot affect row a second time", + "MERGE"), + errhint("Ensure that not more than one source row matches any one target row."))); + /* This shouldn't happen */ + elog(ERROR, "attempted to update or delete invisible tuple"); + break; + + case TM_Deleted: + if (IsolationUsesXactSnapshot()) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("could not serialize access due to concurrent delete"))); + + /* + * If the tuple was already deleted, return to let caller + * handle it under NOT MATCHED clauses. + */ + return false; + + case TM_Updated: + { + Relation resultRelationDesc; + TupleTableSlot *epqslot, + *inputslot; + LockTupleMode lockmode; + + /* + * The target tuple was concurrently updated by some other + * transaction. + */ + + /* + * If cpUpdateRetrySlot is set, ExecCrossPartitionUpdate() + * must have detected that the tuple was concurrently + * updated, so we restart the search for an appropriate + * WHEN MATCHED clause to process the updated tuple. + * + * In this case, ExecDelete() would already have performed + * EvalPlanQual() on the latest version of the tuple, + * which in turn would already have been loaded into + * ri_oldTupleSlot, so no need to do either of those + * things. + * + * XXX why do we not check the WHEN NOT MATCHED list in + * this case? + */ + if (!TupIsNull(context->cpUpdateRetrySlot)) + goto lmerge_matched; + + /* + * Otherwise, we run the EvalPlanQual() with the new + * version of the tuple. If EvalPlanQual() does not return + * a tuple, then we switch to the NOT MATCHED list of + * actions. If it does return a tuple and the join qual is + * still satisfied, then we just need to recheck the + * MATCHED actions, starting from the top, and execute the + * first qualifying action. + */ + resultRelationDesc = resultRelInfo->ri_RelationDesc; + lockmode = ExecUpdateLockMode(estate, resultRelInfo); + + inputslot = EvalPlanQualSlot(epqstate, resultRelationDesc, + resultRelInfo->ri_RangeTableIndex); + + result = table_tuple_lock(resultRelationDesc, tupleid, + estate->es_snapshot, + inputslot, estate->es_output_cid, + lockmode, LockWaitBlock, + TUPLE_LOCK_FLAG_FIND_LAST_VERSION, + &context->tmfd); + switch (result) + { + case TM_Ok: + epqslot = EvalPlanQual(epqstate, + resultRelationDesc, + resultRelInfo->ri_RangeTableIndex, + inputslot); + + /* + * If we got no tuple, or the tuple we get has a + * NULL ctid, go back to caller: this one is not a + * MATCHED tuple anymore, so they can retry with + * NOT MATCHED actions. + */ + if (TupIsNull(epqslot)) + return false; + + (void) ExecGetJunkAttribute(epqslot, + resultRelInfo->ri_RowIdAttNo, + &isNull); + if (isNull) + return false; + + /* + * When a tuple was updated and migrated to + * another partition concurrently, the current + * MERGE implementation can't follow. There's + * probably a better way to handle this case, but + * it'd require recognizing the relation to which + * the tuple moved, and setting our current + * resultRelInfo to that. + */ + if (ItemPointerIndicatesMovedPartitions(&context->tmfd.ctid)) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("tuple to be deleted was already moved to another partition due to concurrent update"))); + + /* + * A non-NULL ctid means that we are still dealing + * with MATCHED case. Restart the loop so that we + * apply all the MATCHED rules again, to ensure + * that the first qualifying WHEN MATCHED action + * is executed. + * + * Update tupleid to that of the new tuple, for + * the refetch we do at the top. + */ + ItemPointerCopy(&context->tmfd.ctid, tupleid); + goto lmerge_matched; + + case TM_Deleted: + + /* + * tuple already deleted; tell caller to run NOT + * MATCHED actions + */ + return false; + + case TM_SelfModified: + + /* + * This can be reached when following an update + * chain from a tuple updated by another session, + * reaching a tuple that was already updated in + * this transaction. If previously modified by + * this command, ignore the redundant update, + * otherwise error out. + * + * See also response to TM_SelfModified in + * ExecUpdate(). + */ + if (context->tmfd.cmax != estate->es_output_cid) + ereport(ERROR, + (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), + errmsg("tuple to be updated or deleted was already modified by an operation triggered by the current command"), + errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows."))); + return false; + + default: + /* see table_tuple_lock call in ExecDelete() */ + elog(ERROR, "unexpected table_tuple_lock status: %u", + result); + return false; + } + } + + case TM_Invisible: + case TM_WouldBlock: + case TM_BeingModified: + /* these should not occur */ + elog(ERROR, "unexpected tuple operation result: %d", result); + break; + } + + /* + * We've activated one of the WHEN clauses, so we don't search + * further. This is required behaviour, not an optimization. + */ + break; + } + + /* + * Successfully executed an action or no qualifying action was found. + */ + return true; +} + +/* + * Execute the first qualifying NOT MATCHED action. + */ +static void +ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + bool canSetTag) +{ + ModifyTableState *mtstate = context->mtstate; + ExprContext *econtext = mtstate->ps.ps_ExprContext; + List *actionStates = NIL; + ListCell *l; + + /* + * For INSERT actions, the root relation's merge action is OK since the + * INSERT's targetlist and the WHEN conditions can only refer to the + * source relation and hence it does not matter which result relation we + * work with. + * + * XXX does this mean that we can avoid creating copies of actionStates on + * partitioned tables, for not-matched actions? + */ + actionStates = resultRelInfo->ri_notMatchedMergeAction; + + /* + * Make source tuple available to ExecQual and ExecProject. We don't need + * the target tuple, since the WHEN quals and targetlist can't refer to + * the target columns. + */ + econtext->ecxt_scantuple = NULL; + econtext->ecxt_innertuple = context->planSlot; + econtext->ecxt_outertuple = NULL; + + foreach(l, actionStates) + { + MergeActionState *action = (MergeActionState *) lfirst(l); + CmdType commandType = action->mas_action->commandType; + TupleTableSlot *newslot; + + /* + * Test condition, if any. + * + * In the absence of any condition, we perform the action + * unconditionally (no need to check separately since ExecQual() will + * return true if there are no conditions to evaluate). + */ + if (!ExecQual(action->mas_whenqual, econtext)) + continue; + + /* Perform stated action */ + switch (commandType) + { + case CMD_INSERT: + + /* + * Project the tuple. In case of a partitioned table, the + * projection was already built to use the root's descriptor, + * so we don't need to map the tuple here. + */ + newslot = ExecProject(action->mas_proj); + context->relaction = action; + + (void) ExecInsert(context, mtstate->rootResultRelInfo, newslot, + canSetTag, NULL, NULL); + mtstate->mt_merge_inserted += 1; + break; + case CMD_NOTHING: + /* Do nothing */ + break; + default: + elog(ERROR, "unknown action in MERGE WHEN NOT MATCHED clause"); + } + + /* + * We've activated one of the WHEN clauses, so we don't search + * further. This is required behaviour, not an optimization. + */ + break; + } +} + +/* + * Initialize state for execution of MERGE. + */ +void +ExecInitMerge(ModifyTableState *mtstate, EState *estate) +{ + ModifyTable *node = (ModifyTable *) mtstate->ps.plan; + ResultRelInfo *rootRelInfo = mtstate->rootResultRelInfo; + ResultRelInfo *resultRelInfo; + ExprContext *econtext; + ListCell *lc; + int i; + + if (node->mergeActionLists == NIL) + return; + + mtstate->mt_merge_subcommands = 0; + + if (mtstate->ps.ps_ExprContext == NULL) + ExecAssignExprContext(estate, &mtstate->ps); + econtext = mtstate->ps.ps_ExprContext; + + /* + * Create a MergeActionState for each action on the mergeActionList and + * add it to either a list of matched actions or not-matched actions. + * + * Similar logic appears in ExecInitPartitionInfo(), so if changing + * anything here, do so there too. + */ + i = 0; + foreach(lc, node->mergeActionLists) + { + List *mergeActionList = lfirst(lc); + TupleDesc relationDesc; + ListCell *l; + + resultRelInfo = mtstate->resultRelInfo + i; + i++; + relationDesc = RelationGetDescr(resultRelInfo->ri_RelationDesc); + + /* initialize slots for MERGE fetches from this rel */ + if (unlikely(!resultRelInfo->ri_projectNewInfoValid)) + ExecInitMergeTupleSlots(mtstate, resultRelInfo); + + foreach(l, mergeActionList) + { + MergeAction *action = (MergeAction *) lfirst(l); + MergeActionState *action_state; + TupleTableSlot *tgtslot; + TupleDesc tgtdesc; + List **list; + + /* + * Build action merge state for this rel. (For partitions, + * equivalent code exists in ExecInitPartitionInfo.) + */ + action_state = makeNode(MergeActionState); + action_state->mas_action = action; + action_state->mas_whenqual = ExecInitQual((List *) action->qual, + &mtstate->ps); + + /* + * We create two lists - one for WHEN MATCHED actions and one for + * WHEN NOT MATCHED actions - and stick the MergeActionState into + * the appropriate list. + */ + if (action_state->mas_action->matched) + list = &resultRelInfo->ri_matchedMergeAction; + else + list = &resultRelInfo->ri_notMatchedMergeAction; + *list = lappend(*list, action_state); + + switch (action->commandType) + { + case CMD_INSERT: + ExecCheckPlanOutput(rootRelInfo->ri_RelationDesc, + action->targetList); + + /* + * If the MERGE targets a partitioned table, any INSERT + * actions must be routed through it, not the child + * relations. Initialize the routing struct and the root + * table's "new" tuple slot for that, if not already done. + * The projection we prepare, for all relations, uses the + * root relation descriptor, and targets the plan's root + * slot. (This is consistent with the fact that we + * checked the plan output to match the root relation, + * above.) + */ + if (rootRelInfo->ri_RelationDesc->rd_rel->relkind == + RELKIND_PARTITIONED_TABLE) + { + if (mtstate->mt_partition_tuple_routing == NULL) + { + /* + * Initialize planstate for routing if not already + * done. + * + * Note that the slot is managed as a standalone + * slot belonging to ModifyTableState, so we pass + * NULL for the 2nd argument. + */ + mtstate->mt_root_tuple_slot = + table_slot_create(rootRelInfo->ri_RelationDesc, + NULL); + mtstate->mt_partition_tuple_routing = + ExecSetupPartitionTupleRouting(estate, + rootRelInfo->ri_RelationDesc); + } + tgtslot = mtstate->mt_root_tuple_slot; + tgtdesc = RelationGetDescr(rootRelInfo->ri_RelationDesc); + } + else + { + /* not partitioned? use the stock relation and slot */ + tgtslot = resultRelInfo->ri_newTupleSlot; + tgtdesc = RelationGetDescr(resultRelInfo->ri_RelationDesc); + } + + action_state->mas_proj = + ExecBuildProjectionInfo(action->targetList, econtext, + tgtslot, + &mtstate->ps, + tgtdesc); + + mtstate->mt_merge_subcommands |= MERGE_INSERT; + break; + case CMD_UPDATE: + action_state->mas_proj = + ExecBuildUpdateProjection(action->targetList, + true, + action->updateColnos, + relationDesc, + econtext, + resultRelInfo->ri_newTupleSlot, + &mtstate->ps); + mtstate->mt_merge_subcommands |= MERGE_UPDATE; + break; + case CMD_DELETE: + mtstate->mt_merge_subcommands |= MERGE_DELETE; + break; + case CMD_NOTHING: + break; + default: + elog(ERROR, "unknown operation"); + break; + } + } + } +} + +/* + * Initializes the tuple slots in a ResultRelInfo for any MERGE action. + * + * We mark 'projectNewInfoValid' even though the projections themselves + * are not initialized here. + */ +void +ExecInitMergeTupleSlots(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo) +{ + EState *estate = mtstate->ps.state; + + Assert(!resultRelInfo->ri_projectNewInfoValid); + + resultRelInfo->ri_oldTupleSlot = + table_slot_create(resultRelInfo->ri_RelationDesc, + &estate->es_tupleTable); + resultRelInfo->ri_newTupleSlot = + table_slot_create(resultRelInfo->ri_RelationDesc, + &estate->es_tupleTable); + resultRelInfo->ri_projectNewInfoValid = true; +} + +/* + * Callback for ModifyTableContext->GetUpdateNewTuple for use by MERGE. It + * computes the updated tuple by projecting from the current merge action's + * projection. + */ +static TupleTableSlot * +mergeGetUpdateNewTuple(ResultRelInfo *relinfo, + TupleTableSlot *planSlot, + TupleTableSlot *oldSlot, + MergeActionState *relaction) +{ + ExprContext *econtext = relaction->mas_proj->pi_exprContext; + + econtext->ecxt_scantuple = oldSlot; + econtext->ecxt_innertuple = planSlot; + + return ExecProject(relaction->mas_proj); +} /* * Process BEFORE EACH STATEMENT triggers @@ -2514,6 +3291,14 @@ fireBSTriggers(ModifyTableState *node) case CMD_DELETE: ExecBSDeleteTriggers(node->ps.state, resultRelInfo); break; + case CMD_MERGE: + if (node->mt_merge_subcommands & MERGE_INSERT) + ExecBSInsertTriggers(node->ps.state, resultRelInfo); + if (node->mt_merge_subcommands & MERGE_UPDATE) + ExecBSUpdateTriggers(node->ps.state, resultRelInfo); + if (node->mt_merge_subcommands & MERGE_DELETE) + ExecBSDeleteTriggers(node->ps.state, resultRelInfo); + break; default: elog(ERROR, "unknown operation"); break; @@ -2547,6 +3332,17 @@ fireASTriggers(ModifyTableState *node) ExecASDeleteTriggers(node->ps.state, resultRelInfo, node->mt_transition_capture); break; + case CMD_MERGE: + if (node->mt_merge_subcommands & MERGE_DELETE) + ExecASDeleteTriggers(node->ps.state, resultRelInfo, + node->mt_transition_capture); + if (node->mt_merge_subcommands & MERGE_UPDATE) + ExecASUpdateTriggers(node->ps.state, resultRelInfo, + node->mt_transition_capture); + if (node->mt_merge_subcommands & MERGE_INSERT) + ExecASInsertTriggers(node->ps.state, resultRelInfo, + node->mt_transition_capture); + break; default: elog(ERROR, "unknown operation"); break; @@ -2749,7 +3545,28 @@ ExecModifyTable(PlanState *pstate) datum = ExecGetJunkAttribute(planSlot, node->mt_resultOidAttno, &isNull); if (isNull) + { + /* + * For commands other than MERGE, any tuples having InvalidOid + * for tableoid are errors. For MERGE, we may need to handle + * them as WHEN NOT MATCHED clauses if any, so do that. + * + * Note that we use the node's toplevel resultRelInfo, not any + * specific partition's. + */ + if (operation == CMD_MERGE) + { + EvalPlanQualSetSlot(&node->mt_epqstate, planSlot); + + context.planSlot = planSlot; + context.lockmode = 0; + + ExecMerge(&context, node->resultRelInfo, NULL, node->canSetTag); + continue; /* no RETURNING support yet */ + } + elog(ERROR, "tableoid is NULL"); + } resultoid = DatumGetObjectId(datum); /* If it's not the same as last time, we need to locate the rel */ @@ -2784,13 +3601,14 @@ ExecModifyTable(PlanState *pstate) oldtuple = NULL; /* - * For UPDATE/DELETE, fetch the row identity info for the tuple to be - * updated/deleted. For a heap relation, that's a TID; otherwise we - * may have a wholerow junk attr that carries the old tuple in toto. - * Keep this in step with the part of ExecInitModifyTable that sets up - * ri_RowIdAttNo. + * For UPDATE/DELETE/MERGE, fetch the row identity info for the tuple + * to be updated/deleted/merged. For a heap relation, that's a TID; + * otherwise we may have a wholerow junk attr that carries the old + * tuple in toto. Keep this in step with the part of + * ExecInitModifyTable that sets up ri_RowIdAttNo. */ - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || + operation == CMD_MERGE) { char relkind; Datum datum; @@ -2806,9 +3624,30 @@ ExecModifyTable(PlanState *pstate) datum = ExecGetJunkAttribute(slot, resultRelInfo->ri_RowIdAttNo, &isNull); - /* shouldn't ever get a null result... */ + + /* + * For commands other than MERGE, any tuples having a null row + * identifier are errors. For MERGE, we may need to handle + * them as WHEN NOT MATCHED clauses if any, so do that. + * + * Note that we use the node's toplevel resultRelInfo, not any + * specific partition's. + */ if (isNull) + { + if (operation == CMD_MERGE) + { + EvalPlanQualSetSlot(&node->mt_epqstate, planSlot); + + context.planSlot = planSlot; + context.lockmode = 0; + + ExecMerge(&context, node->resultRelInfo, NULL, node->canSetTag); + continue; /* no RETURNING support yet */ + } + elog(ERROR, "ctid is NULL"); + } tupleid = (ItemPointer) DatumGetPointer(datum); tuple_ctid = *tupleid; /* be sure we don't free ctid!! */ @@ -2898,8 +3737,10 @@ ExecModifyTable(PlanState *pstate) oldSlot)) elog(ERROR, "failed to fetch tuple being updated"); } - slot = ExecGetUpdateNewTuple(resultRelInfo, planSlot, - oldSlot); + slot = internalGetUpdateNewTuple(resultRelInfo, planSlot, + oldSlot, NULL); + context.GetUpdateNewTuple = internalGetUpdateNewTuple; + context.relaction = NULL; /* Now apply the update. */ slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple, @@ -2911,6 +3752,10 @@ ExecModifyTable(PlanState *pstate) true, false, node->canSetTag, NULL, NULL); break; + case CMD_MERGE: + slot = ExecMerge(&context, resultRelInfo, tupleid, node->canSetTag); + break; + default: elog(ERROR, "unknown operation"); break; @@ -3044,6 +3889,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) mtstate->resultRelInfo = (ResultRelInfo *) palloc(nrels * sizeof(ResultRelInfo)); + mtstate->mt_merge_inserted = 0; + mtstate->mt_merge_updated = 0; + mtstate->mt_merge_deleted = 0; + /*---------- * Resolve the target relation. This is the same as: * @@ -3147,12 +3996,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) } /* - * For UPDATE/DELETE, find the appropriate junk attr now, either a - * 'ctid' or 'wholerow' attribute depending on relkind. For foreign + * For UPDATE/DELETE/MERGE, find the appropriate junk attr now, either + * a 'ctid' or 'wholerow' attribute depending on relkind. For foreign * tables, the FDW might have created additional junk attr(s), but * those are no concern of ours. */ - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || + operation == CMD_MERGE) { char relkind; @@ -3169,19 +4019,28 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) else if (relkind == RELKIND_FOREIGN_TABLE) { /* + * We don't support MERGE with foreign tables for now. (It's + * problematic because the implementation uses CTID.) + */ + Assert(operation != CMD_MERGE); + + /* * When there is a row-level trigger, there should be a * wholerow attribute. We also require it to be present in - * UPDATE, so we can get the values of unchanged columns. + * UPDATE and MERGE, so we can get the values of unchanged + * columns. */ resultRelInfo->ri_RowIdAttNo = ExecFindJunkAttributeInTlist(subplan->targetlist, "wholerow"); - if (mtstate->operation == CMD_UPDATE && + if ((mtstate->operation == CMD_UPDATE || mtstate->operation == CMD_MERGE) && !AttributeNumberIsValid(resultRelInfo->ri_RowIdAttNo)) elog(ERROR, "could not find junk wholerow column"); } else { + /* No support for MERGE */ + Assert(operation != CMD_MERGE); /* Other valid target relkinds must provide wholerow */ resultRelInfo->ri_RowIdAttNo = ExecFindJunkAttributeInTlist(subplan->targetlist, @@ -3193,10 +4052,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) } /* - * If this is an inherited update/delete, there will be a junk attribute - * named "tableoid" present in the subplan's targetlist. It will be used - * to identify the result relation for a given tuple to be - * updated/deleted. + * If this is an inherited update/delete/merge, there will be a junk + * attribute named "tableoid" present in the subplan's targetlist. It + * will be used to identify the result relation for a given tuple to be + * updated/deleted/merged. */ mtstate->mt_resultOidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, "tableoid"); @@ -3209,8 +4068,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) /* * Build state for tuple routing if it's a partitioned INSERT. An UPDATE - * might need this too, but only if it actually moves tuples between - * partitions; in that case setup is done by ExecCrossPartitionUpdate. + * or MERGE might need this too, but only if it actually moves tuples + * between partitions; in that case setup is done by + * ExecCrossPartitionUpdate. */ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && operation == CMD_INSERT) @@ -3379,6 +4239,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) arowmarks = lappend(arowmarks, aerm); } + /* For a MERGE command, initialize its state */ + if (mtstate->operation == CMD_MERGE) + ExecInitMerge(mtstate, estate); + EvalPlanQualSetPlan(&mtstate->mt_epqstate, subplan, arowmarks); /* |