| field | value | date |
|---|---|---|
| author | Amit Kapila <akapila@postgresql.org> | 2024-08-20 08:35:11 +0530 |
| committer | Amit Kapila <akapila@postgresql.org> | 2024-08-20 08:35:11 +0530 |
| commit | 9758174e2e5cd278cf37e0980da76b51890e0011 (patch) | |
| tree | 9ca019972be8f6b4b20acd98cdeb12a9475851e9 /src/backend/executor/execReplication.c | |
| parent | adf97c1562380e02acd60dc859c289ed3a8352ee (diff) | |
Log the conflicts while applying changes in logical replication.
This patch provides additional logging information for the following
conflict scenarios encountered while applying changes:
insert_exists: Inserting a row that violates a NOT DEFERRABLE unique constraint.
update_differ: Updating a row that was previously modified by another origin.
update_exists: The updated row value violates a NOT DEFERRABLE unique constraint.
update_missing: The tuple to be updated is missing.
delete_differ: Deleting a row that was previously modified by another origin.
delete_missing: The tuple to be deleted is missing.
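These scenario names correspond to a ConflictType enum introduced by the new
replication/conflict.h header that the diff below starts including. Only
CT_INSERT_EXISTS and CT_UPDATE_EXISTS are visible in this file's changes; the
other enumerator names in this sketch are assumptions extrapolated from the
naming pattern, not confirmed by this diff:

```c
/* Sketch of the ConflictType enum; see the hedging note above. */
typedef enum ConflictType
{
	CT_INSERT_EXISTS,			/* insert_exists (used in the diff below) */
	CT_UPDATE_DIFFER,			/* update_differ (assumed name) */
	CT_UPDATE_EXISTS,			/* update_exists (used in the diff below) */
	CT_UPDATE_MISSING,			/* update_missing (assumed name) */
	CT_DELETE_DIFFER,			/* delete_differ (assumed name) */
	CT_DELETE_MISSING,			/* delete_missing (assumed name) */
} ConflictType;
```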
For insert_exists and update_exists conflicts, the log can include the origin
and commit timestamp details of the conflicting key when
track_commit_timestamp is enabled.
update_differ and delete_differ conflicts can only be detected when
track_commit_timestamp is enabled on the subscriber.
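To illustrate why track_commit_timestamp is needed for the differ-style
conflicts: with it enabled, the commit timestamp and replication origin of the
transaction that last wrote a tuple can be recovered from the tuple's xmin.
The diff below calls GetTupleTransactionInfo for this; that helper lives
outside this file, so the following is only a minimal sketch of the underlying
lookup using the existing commit-timestamp and origin APIs (the function name
here is hypothetical):

```c
#include "postgres.h"
#include "access/commit_ts.h"
#include "replication/origin.h"

/*
 * Hypothetical helper: returns true if the transaction 'xmin' that last
 * wrote a local tuple came from a different origin than the one this apply
 * session is replaying. Without track_commit_timestamp, no commit-timestamp
 * data is recorded and the update_differ/delete_differ cases cannot be
 * distinguished.
 */
static bool
tuple_written_by_other_origin(TransactionId xmin,
							  RepOriginId *origin, TimestampTz *committs)
{
	if (!TransactionIdGetCommitTsData(xmin, committs, origin))
		return false;			/* no commit-ts data available */

	return *origin != replorigin_session_origin;
}
```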
We do not offer additional logging for exclusion constraint violations because
these constraints can specify rules that are more complex than simple equality
checks. Resolving such conflicts won't be straightforward. This area can be
further enhanced if required.
Author: Hou Zhijie
Reviewed-by: Shveta Malik, Amit Kapila, Nisha Moond, Hayato Kuroda, Dilip Kumar
Discussion: https://postgr.es/m/OS0PR01MB5716352552DFADB8E9AD1D8994C92@OS0PR01MB5716.jpnprd01.prod.outlook.com
Diffstat (limited to 'src/backend/executor/execReplication.c')
| -rw-r--r-- | src/backend/executor/execReplication.c | 236 |

1 file changed, 176 insertions(+), 60 deletions(-)
```diff
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index d0a89cd5778..1086cbc9624 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -23,6 +23,7 @@
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
+#include "replication/conflict.h"
 #include "replication/logicalrelation.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
@@ -166,6 +167,51 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 	return skey_attoff;
 }
 
+/*
+ * Helper function to check if it is necessary to re-fetch and lock the tuple
+ * due to concurrent modifications. This function should be called after
+ * invoking table_tuple_lock.
+ */
+static bool
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+{
+	bool		refetch = false;
+
+	switch (res)
+	{
+		case TM_Ok:
+			break;
+		case TM_Updated:
+			/* XXX: Improve handling here */
+			if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+				ereport(LOG,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+			else
+				ereport(LOG,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("concurrent update, retrying")));
+			refetch = true;
+			break;
+		case TM_Deleted:
+			/* XXX: Improve handling here */
+			ereport(LOG,
+					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+					 errmsg("concurrent delete, retrying")));
+			refetch = true;
+			break;
+		case TM_Invisible:
+			elog(ERROR, "attempted to lock invisible tuple");
+			break;
+		default:
+			elog(ERROR, "unexpected table_tuple_lock status: %u", res);
+			break;
+	}
+
+	return refetch;
+}
+
 /*
  * Search the relation 'rel' for tuple using the index.
  *
@@ -260,34 +306,8 @@ retry:
 
 		PopActiveSnapshot();
 
-		switch (res)
-		{
-			case TM_Ok:
-				break;
-			case TM_Updated:
-				/* XXX: Improve handling here */
-				if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
-					ereport(LOG,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
-				else
-					ereport(LOG,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("concurrent update, retrying")));
-				goto retry;
-			case TM_Deleted:
-				/* XXX: Improve handling here */
-				ereport(LOG,
-						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-						 errmsg("concurrent delete, retrying")));
-				goto retry;
-			case TM_Invisible:
-				elog(ERROR, "attempted to lock invisible tuple");
-				break;
-			default:
-				elog(ERROR, "unexpected table_tuple_lock status: %u", res);
-				break;
-		}
+		if (should_refetch_tuple(res, &tmfd))
+			goto retry;
 	}
 
 	index_endscan(scan);
@@ -444,34 +464,8 @@ retry:
 
 		PopActiveSnapshot();
 
-		switch (res)
-		{
-			case TM_Ok:
-				break;
-			case TM_Updated:
-				/* XXX: Improve handling here */
-				if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
-					ereport(LOG,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
-				else
-					ereport(LOG,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("concurrent update, retrying")));
-				goto retry;
-			case TM_Deleted:
-				/* XXX: Improve handling here */
-				ereport(LOG,
-						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-						 errmsg("concurrent delete, retrying")));
-				goto retry;
-			case TM_Invisible:
-				elog(ERROR, "attempted to lock invisible tuple");
-				break;
-			default:
-				elog(ERROR, "unexpected table_tuple_lock status: %u", res);
-				break;
-		}
+		if (should_refetch_tuple(res, &tmfd))
+			goto retry;
 	}
 
 	table_endscan(scan);
@@ -481,6 +475,89 @@ retry:
 }
 
 /*
+ * Find the tuple that violates the passed unique index (conflictindex).
+ *
+ * If the conflicting tuple is found return true, otherwise false.
+ *
+ * We lock the tuple to avoid getting it deleted before the caller can fetch
+ * the required information. Note that if the tuple is deleted before a lock
+ * is acquired, we will retry to find the conflicting tuple again.
+ */
+static bool
+FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
+				  Oid conflictindex, TupleTableSlot *slot,
+				  TupleTableSlot **conflictslot)
+{
+	Relation	rel = resultRelInfo->ri_RelationDesc;
+	ItemPointerData conflictTid;
+	TM_FailureData tmfd;
+	TM_Result	res;
+
+	*conflictslot = NULL;
+
+retry:
+	if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
+								  &conflictTid, &slot->tts_tid,
+								  list_make1_oid(conflictindex)))
+	{
+		if (*conflictslot)
+			ExecDropSingleTupleTableSlot(*conflictslot);
+
+		*conflictslot = NULL;
+		return false;
+	}
+
+	*conflictslot = table_slot_create(rel, NULL);
+
+	PushActiveSnapshot(GetLatestSnapshot());
+
+	res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(),
+						   *conflictslot,
+						   GetCurrentCommandId(false),
+						   LockTupleShare,
+						   LockWaitBlock,
+						   0 /* don't follow updates */ ,
+						   &tmfd);
+
+	PopActiveSnapshot();
+
+	if (should_refetch_tuple(res, &tmfd))
+		goto retry;
+
+	return true;
+}
+
+/*
+ * Check all the unique indexes in 'recheckIndexes' for conflict with the
+ * tuple in 'remoteslot' and report if found.
+ */
+static void
+CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
+					   ConflictType type, List *recheckIndexes,
+					   TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
+{
+	/* Check all the unique indexes for a conflict */
+	foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
+	{
+		TupleTableSlot *conflictslot;
+
+		if (list_member_oid(recheckIndexes, uniqueidx) &&
+			FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
+							  &conflictslot))
+		{
+			RepOriginId origin;
+			TimestampTz committs;
+			TransactionId xmin;
+
+			GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
+			ReportApplyConflict(estate, resultRelInfo, ERROR, type,
+								searchslot, conflictslot, remoteslot,
+								uniqueidx, xmin, origin, committs);
+		}
+	}
+}
+
+/*
  * Insert tuple represented in the slot to the relation, update the indexes,
  * and execute any constraints and per-row triggers.
  *
@@ -509,6 +586,8 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 	if (!skip_tuple)
 	{
 		List	   *recheckIndexes = NIL;
+		List	   *conflictindexes;
+		bool		conflict = false;
 
 		/* Compute stored generated columns */
 		if (rel->rd_att->constr &&
@@ -525,10 +604,33 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 		/* OK, store the tuple and create index entries for it */
 		simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
 
+		conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
 		if (resultRelInfo->ri_NumIndices > 0)
 			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
-												   slot, estate, false, false,
-												   NULL, NIL, false);
+												   slot, estate, false,
+												   conflictindexes ? true : false,
+												   &conflict,
+												   conflictindexes, false);
+
+		/*
+		 * Checks the conflict indexes to fetch the conflicting local tuple
+		 * and reports the conflict. We perform this check here, instead of
+		 * performing an additional index scan before the actual insertion and
+		 * reporting the conflict if any conflicting tuples are found. This is
+		 * to avoid the overhead of executing the extra scan for each INSERT
+		 * operation, even when no conflict arises, which could introduce
+		 * significant overhead to replication, particularly in cases where
+		 * conflicts are rare.
+		 *
+		 * XXX OTOH, this could lead to clean-up effort for dead tuples added
+		 * in heap and index in case of conflicts. But as conflicts shouldn't
+		 * be a frequent thing so we preferred to save the performance
+		 * overhead of extra scan before each insertion.
+		 */
+		if (conflict)
+			CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
+								   recheckIndexes, NULL, slot);
 
 		/* AFTER ROW INSERT Triggers */
 		ExecARInsertTriggers(estate, resultRelInfo, slot,
@@ -577,6 +679,8 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 	{
 		List	   *recheckIndexes = NIL;
 		TU_UpdateIndexes update_indexes;
+		List	   *conflictindexes;
+		bool		conflict = false;
 
 		/* Compute stored generated columns */
 		if (rel->rd_att->constr &&
@@ -593,12 +697,24 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 		simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
 								  &update_indexes);
 
+		conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+
 		if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
 			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
-												   slot, estate, true, false,
-												   NULL, NIL,
+												   slot, estate, true,
+												   conflictindexes ? true : false,
+												   &conflict, conflictindexes,
 												   (update_indexes == TU_Summarizing));
 
+		/*
+		 * Refer to the comments above the call to CheckAndReportConflict() in
+		 * ExecSimpleRelationInsert to understand why this check is done at
+		 * this point.
+		 */
+		if (conflict)
+			CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
+								   recheckIndexes, searchslot, slot);
+
 		/* AFTER ROW UPDATE Triggers */
 		ExecARUpdateTriggers(estate, resultRelInfo,
 							 NULL, NULL,
```
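For readers unfamiliar with the executor's index-insertion API, the hand-off
that makes this post-insert detection work is the noDupErr/specConflict
contract of ExecInsertIndexTuples: when arbiter indexes are passed with
noDupErr set, a unique violation on those indexes does not raise an error
inside the index AM; instead the possibly-conflicting index OIDs come back in
recheckIndexes and the conflict flag is set. The following is a condensed,
annotated view of the insert path from the diff above, wrapped in a
hypothetical helper function for readability (the wrapper name is not from
the commit; the calls and variables are):

```c
/*
 * Condensed sketch of the insert path in ExecSimpleRelationInsert above.
 * Assumes the surrounding context: resultRelInfo, slot, estate.
 */
static void
apply_insert_with_conflict_check(ResultRelInfo *resultRelInfo,
								 TupleTableSlot *slot, EState *estate)
{
	List	   *recheckIndexes;
	List	   *conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
	bool		conflict = false;

	/*
	 * Passing noDupErr = true for the arbiter indexes defers the duplicate
	 * check: instead of erroring out, the index AM reports the suspect
	 * index OIDs via recheckIndexes and flags 'conflict'.
	 */
	recheckIndexes = ExecInsertIndexTuples(resultRelInfo, slot, estate,
										   false,	/* update */
										   conflictindexes != NIL,	/* noDupErr */
										   &conflict, conflictindexes,
										   false);	/* onlySummarizing */

	/* Only when a conflict was flagged do we pay for the extra index scan. */
	if (conflict)
		CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
							   recheckIndexes, NULL, slot);
}
```

This is the design choice the in-diff comment argues for: the common,
conflict-free insert pays no extra index scan, and the lookup of the
conflicting local tuple happens only after a violation has been flagged.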