Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/execIndexing.c    |  17
-rw-r--r-- | src/backend/executor/execMain.c        |   7
-rw-r--r-- | src/backend/executor/execReplication.c | 236
-rw-r--r-- | src/backend/executor/nodeModifyTable.c |   5
4 files changed, 192 insertions, 73 deletions
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index 9f05b3654c1..403a3f40551 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -207,8 +207,9 @@ ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
 		ii = BuildIndexInfo(indexDesc);
 
 		/*
-		 * If the indexes are to be used for speculative insertion, add extra
-		 * information required by unique index entries.
+		 * If the indexes are to be used for speculative insertion or conflict
+		 * detection in logical replication, add extra information required by
+		 * unique index entries.
 		 */
 		if (speculative && ii->ii_Unique)
 			BuildSpeculativeIndexInfo(indexDesc, ii);
@@ -519,14 +520,18 @@ ExecInsertIndexTuples(ResultRelInfo *resultRelInfo,
  *
  *		Note that this doesn't lock the values in any way, so it's
  *		possible that a conflicting tuple is inserted immediately
- *		after this returns. But this can be used for a pre-check
- *		before insertion.
+ *		after this returns. This can be used for either a pre-check
+ *		before insertion or a re-check after finding a conflict.
+ *
+ *		'tupleid' should be the TID of the tuple that has been recently
+ *		inserted (or can be invalid if we haven't inserted a new tuple yet).
+ *		This tuple will be excluded from conflict checking.
  * ----------------------------------------------------------------
  */
 bool
 ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
 						  EState *estate, ItemPointer conflictTid,
-						  List *arbiterIndexes)
+						  ItemPointer tupleid, List *arbiterIndexes)
 {
 	int			i;
 	int			numIndices;
@@ -629,7 +634,7 @@ ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
 
 		satisfiesConstraint =
 			check_exclusion_or_unique_constraint(heapRelation, indexRelation,
-												 indexInfo, &invalidItemPtr,
+												 indexInfo, tupleid,
 												 values, isnull, estate, false,
 												 CEOUC_WAIT, true,
 												 conflictTid);
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 4d7c92d63c1..29e186fa73d 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -88,11 +88,6 @@ static bool ExecCheckPermissionsModified(Oid relOid, Oid userid,
 										 Bitmapset *modifiedCols,
 										 AclMode requiredPerms);
 static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt);
-static char *ExecBuildSlotValueDescription(Oid reloid,
-										   TupleTableSlot *slot,
-										   TupleDesc tupdesc,
-										   Bitmapset *modifiedCols,
-										   int maxfieldlen);
 static void EvalPlanQualStart(EPQState *epqstate, Plan *planTree);
 
 /* end of local decls */
@@ -2210,7 +2205,7 @@ ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
  * column involved, that subset will be returned with a key identifying which
  * columns they are.
  */
-static char *
+char *
 ExecBuildSlotValueDescription(Oid reloid,
 							  TupleTableSlot *slot,
 							  TupleDesc tupdesc,
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index d0a89cd5778..1086cbc9624 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -23,6 +23,7 @@
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
+#include "replication/conflict.h"
 #include "replication/logicalrelation.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
@@ -166,6 +167,51 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 	return skey_attoff;
 }
 
+
+/*
+ * Helper function to check if it is necessary to re-fetch and lock the tuple
+ * due to concurrent modifications. This function should be called after
+ * invoking table_tuple_lock.
+ */
+static bool
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+{
+	bool		refetch = false;
+
+	switch (res)
+	{
+		case TM_Ok:
+			break;
+		case TM_Updated:
+			/* XXX: Improve handling here */
+			if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+				ereport(LOG,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+			else
+				ereport(LOG,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("concurrent update, retrying")));
+			refetch = true;
+			break;
+		case TM_Deleted:
+			/* XXX: Improve handling here */
+			ereport(LOG,
+					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+					 errmsg("concurrent delete, retrying")));
+			refetch = true;
+			break;
+		case TM_Invisible:
+			elog(ERROR, "attempted to lock invisible tuple");
+			break;
+		default:
+			elog(ERROR, "unexpected table_tuple_lock status: %u", res);
+			break;
+	}
+
+	return refetch;
+}
+
 /*
  * Search the relation 'rel' for tuple using the index.
  *
@@ -260,34 +306,8 @@ retry:
 
 		PopActiveSnapshot();
 
-		switch (res)
-		{
-			case TM_Ok:
-				break;
-			case TM_Updated:
-				/* XXX: Improve handling here */
-				if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
-					ereport(LOG,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
-				else
-					ereport(LOG,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("concurrent update, retrying")));
-				goto retry;
-			case TM_Deleted:
-				/* XXX: Improve handling here */
-				ereport(LOG,
-						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-						 errmsg("concurrent delete, retrying")));
-				goto retry;
-			case TM_Invisible:
-				elog(ERROR, "attempted to lock invisible tuple");
-				break;
-			default:
-				elog(ERROR, "unexpected table_tuple_lock status: %u", res);
-				break;
-		}
+		if (should_refetch_tuple(res, &tmfd))
+			goto retry;
 	}
 
 	index_endscan(scan);
@@ -444,34 +464,8 @@ retry:
 
 		PopActiveSnapshot();
 
-		switch (res)
-		{
-			case TM_Ok:
-				break;
-			case TM_Updated:
-				/* XXX: Improve handling here */
-				if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
-					ereport(LOG,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
-				else
-					ereport(LOG,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("concurrent update, retrying")));
-				goto retry;
-			case TM_Deleted:
-				/* XXX: Improve handling here */
-				ereport(LOG,
-						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-						 errmsg("concurrent delete, retrying")));
-				goto retry;
-			case TM_Invisible:
-				elog(ERROR, "attempted to lock invisible tuple");
-				break;
-			default:
-				elog(ERROR, "unexpected table_tuple_lock status: %u", res);
-				break;
-		}
+		if (should_refetch_tuple(res, &tmfd))
+			goto retry;
 	}
 
 	table_endscan(scan);
@@ -481,6 +475,89 @@ retry:
 }
 
 /*
+ * Find the tuple that violates the passed unique index (conflictindex).
+ *
+ * If the conflicting tuple is found return true, otherwise false.
+ *
+ * We lock the tuple to avoid getting it deleted before the caller can fetch
+ * the required information. Note that if the tuple is deleted before a lock
+ * is acquired, we will retry to find the conflicting tuple again.
+ */
+static bool
+FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
+				  Oid conflictindex, TupleTableSlot *slot,
+				  TupleTableSlot **conflictslot)
+{
+	Relation	rel = resultRelInfo->ri_RelationDesc;
+	ItemPointerData conflictTid;
+	TM_FailureData tmfd;
+	TM_Result	res;
+
+	*conflictslot = NULL;
+
+retry:
+	if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
+								  &conflictTid, &slot->tts_tid,
+								  list_make1_oid(conflictindex)))
+	{
+		if (*conflictslot)
+			ExecDropSingleTupleTableSlot(*conflictslot);
+
+		*conflictslot = NULL;
+		return false;
+	}
+
+	*conflictslot = table_slot_create(rel, NULL);
+
+	PushActiveSnapshot(GetLatestSnapshot());
+
+	res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(),
+						   *conflictslot,
+						   GetCurrentCommandId(false),
+						   LockTupleShare,
+						   LockWaitBlock,
+						   0 /* don't follow updates */ ,
+						   &tmfd);
+
+	PopActiveSnapshot();
+
+	if (should_refetch_tuple(res, &tmfd))
+		goto retry;
+
+	return true;
+}
+
+/*
+ * Check all the unique indexes in 'recheckIndexes' for conflict with the
+ * tuple in 'remoteslot' and report if found.
+ */
+static void
+CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
+					   ConflictType type, List *recheckIndexes,
+					   TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
+{
+	/* Check all the unique indexes for a conflict */
+	foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
+	{
+		TupleTableSlot *conflictslot;
+
+		if (list_member_oid(recheckIndexes, uniqueidx) &&
+			FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
+							  &conflictslot))
+		{
+			RepOriginId origin;
+			TimestampTz committs;
+			TransactionId xmin;
+
+			GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+			ReportApplyConflict(estate, resultRelInfo, ERROR, type,
+								searchslot, conflictslot, remoteslot,
+								uniqueidx, xmin, origin, committs);
+		}
+	}
+}
+
+/*
  * Insert tuple represented in the slot to the relation, update the indexes,
  * and execute any constraints and per-row triggers.
  *
@@ -509,6 +586,8 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 	if (!skip_tuple)
 	{
 		List	   *recheckIndexes = NIL;
+		List	   *conflictindexes;
+		bool		conflict = false;
 
 		/* Compute stored generated columns */
 		if (rel->rd_att->constr &&
@@ -525,10 +604,33 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 		/* OK, store the tuple and create index entries for it */
 		simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
 
+		conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
 		if (resultRelInfo->ri_NumIndices > 0)
 			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
-												   slot, estate, false, false,
-												   NULL, NIL, false);
+												   slot, estate, false,
+												   conflictindexes ? true : false,
+												   &conflict,
+												   conflictindexes, false);
+
+		/*
+		 * Checks the conflict indexes to fetch the conflicting local tuple
+		 * and reports the conflict. We perform this check here, instead of
+		 * performing an additional index scan before the actual insertion and
+		 * reporting the conflict if any conflicting tuples are found. This is
+		 * to avoid the overhead of executing the extra scan for each INSERT
+		 * operation, even when no conflict arises, which could introduce
+		 * significant overhead to replication, particularly in cases where
+		 * conflicts are rare.
+		 *
+		 * XXX OTOH, this could lead to clean-up effort for dead tuples added
+		 * in heap and index in case of conflicts. But as conflicts shouldn't
+		 * be a frequent thing so we preferred to save the performance
+		 * overhead of extra scan before each insertion.
+		 */
+		if (conflict)
+			CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
+								   recheckIndexes, NULL, slot);
 
 		/* AFTER ROW INSERT Triggers */
 		ExecARInsertTriggers(estate, resultRelInfo, slot,
@@ -577,6 +679,8 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 	{
 		List	   *recheckIndexes = NIL;
 		TU_UpdateIndexes update_indexes;
+		List	   *conflictindexes;
+		bool		conflict = false;
 
 		/* Compute stored generated columns */
 		if (rel->rd_att->constr &&
@@ -593,12 +697,24 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 		simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
 								  &update_indexes);
 
+		conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
 		if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
 			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
-												   slot, estate, true, false,
-												   NULL, NIL,
+												   slot, estate, true,
+												   conflictindexes ? true : false,
+												   &conflict, conflictindexes,
 												   (update_indexes == TU_Summarizing));
 
+		/*
+		 * Refer to the comments above the call to CheckAndReportConflict() in
+		 * ExecSimpleRelationInsert to understand why this check is done at
+		 * this point.
+		 */
+		if (conflict)
+			CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
+								   recheckIndexes, searchslot, slot);
+
 		/* AFTER ROW UPDATE Triggers */
 		ExecARUpdateTriggers(estate, resultRelInfo,
 							 NULL, NULL,
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 4913e493199..8bf4c80d4a0 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1019,9 +1019,11 @@ ExecInsert(ModifyTableContext *context,
 			/* Perform a speculative insertion. */
 			uint32		specToken;
 			ItemPointerData conflictTid;
+			ItemPointerData invalidItemPtr;
 			bool		specConflict;
 			List	   *arbiterIndexes;
 
+			ItemPointerSetInvalid(&invalidItemPtr);
 			arbiterIndexes = resultRelInfo->ri_onConflictArbiterIndexes;
 
 			/*
@@ -1041,7 +1043,8 @@ ExecInsert(ModifyTableContext *context,
 			CHECK_FOR_INTERRUPTS();
 			specConflict = false;
 			if (!ExecCheckIndexConstraints(resultRelInfo, slot, estate,
-										   &conflictTid, arbiterIndexes))
+										   &conflictTid, &invalidItemPtr,
+										   arbiterIndexes))
 			{
 				/* committed conflict tuple found */
 				if (onconflict == ONCONFLICT_UPDATE)
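For context on the signature change above: the new 'tupleid' argument lets callers of
ExecCheckIndexConstraints() run either a pre-check before insertion or a re-check after a
conflict has been detected. Below is a minimal sketch of the two call styles, drawn from the
patch itself; the wrapping function and its arguments are hypothetical illustration, assuming
the usual executor context (resultRelInfo, slot, estate, and an arbiter index list already set
up), not part of the commit:

	static void
	conflict_check_examples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
							EState *estate, List *arbiterIndexes)
	{
		ItemPointerData conflictTid;
		ItemPointerData invalidItemPtr;

		/*
		 * Pre-check before insertion (as in ExecInsert): nothing has been
		 * inserted yet, so pass an invalid TID and exclude no tuple.
		 */
		ItemPointerSetInvalid(&invalidItemPtr);
		if (!ExecCheckIndexConstraints(resultRelInfo, slot, estate,
									   &conflictTid, &invalidItemPtr,
									   arbiterIndexes))
		{
			/* a committed conflicting tuple exists at conflictTid */
		}

		/*
		 * Re-check after insertion (as in FindConflictTuple): pass the TID of
		 * the just-inserted tuple so it is excluded from conflict checking;
		 * any remaining match is a genuine conflict to fetch and report.
		 */
		if (!ExecCheckIndexConstraints(resultRelInfo, slot, estate,
									   &conflictTid, &slot->tts_tid,
									   arbiterIndexes))
		{
			/* another tuple still conflicts; lock it and report the conflict */
		}
	}

This split is what allows ExecSimpleRelationInsert/Update to skip any extra index scan on the
common no-conflict path and only re-scan (via CheckAndReportConflict) when ExecInsertIndexTuples
signals a possible conflict.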