diff options
author | Amit Kapila <akapila@postgresql.org> | 2025-03-24 12:30:44 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2025-03-24 12:30:44 +0530 |
commit | 73eba5004a06a744b6b8570e42432b9e9f75997b (patch) | |
tree | da0a5610000c1b9c4fd540299dc2068231efcf82 /src/backend/replication | |
parent | 35a92b7c2520cca3df9ecddab1dcad14ff71ec0b (diff) | |
download | postgresql-73eba5004a06a744b6b8570e42432b9e9f75997b.tar.gz postgresql-73eba5004a06a744b6b8570e42432b9e9f75997b.zip |
Detect and Log multiple_unique_conflicts type conflict.
Introduce a new conflict type, multiple_unique_conflicts, to handle cases
where an incoming row during logical replication violates multiple UNIQUE
constraints.
Previously, the apply worker detected and reported only the first
encountered key conflict (insert_exists/update_exists), causing repeated
failures as each constraint violation needs to be handled one by one
making the process slow and error-prone.
With this patch, the apply worker checks all unique constraints upfront
once the first key conflict is detected and reports
multiple_unique_conflicts if multiple violations exist. This allows users
to resolve all conflicts at once by deleting all conflicting tuples rather
than dealing with them individually or skipping the transaction.
In the future, this will also allow us to specify different resolution
handlers for such a conflict type.
Add the stats for this conflict type in pg_stat_subscription_stats.
Author: Nisha Moond <nisha.moond412@gmail.com>
Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Discussion: https://postgr.es/m/CABdArM7FW-_dnthGkg2s0fy1HhUB8C3ELA0gZX1kkbs1ZZoV3Q@mail.gmail.com
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/logical/conflict.c | 64 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 68 |
2 files changed, 71 insertions, 61 deletions
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 772fc83e88b..f1e92f2fc1a 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -29,11 +29,12 @@ static const char *const ConflictTypeNames[] = { [CT_UPDATE_EXISTS] = "update_exists", [CT_UPDATE_MISSING] = "update_missing", [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs", - [CT_DELETE_MISSING] = "delete_missing" + [CT_DELETE_MISSING] = "delete_missing", + [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts" }; static int errcode_apply_conflict(ConflictType type); -static int errdetail_apply_conflict(EState *estate, +static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, @@ -41,7 +42,7 @@ static int errdetail_apply_conflict(EState *estate, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, RepOriginId localorigin, - TimestampTz localts); + TimestampTz localts, StringInfo err_msg); static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, @@ -90,30 +91,33 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, * 'searchslot' should contain the tuple used to search the local tuple to be * updated or deleted. * - * 'localslot' should contain the existing local tuple, if any, that conflicts - * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the - * transaction information related to this existing local tuple. - * * 'remoteslot' should contain the remote new tuple, if any. * - * The 'indexoid' represents the OID of the unique index that triggered the - * constraint violation error. We use this to report the key values for - * conflicting tuple. + * conflicttuples is a list of local tuples that caused the conflict and the + * conflict related information. See ConflictTupleInfo. * - * The caller must ensure that the index with the OID 'indexoid' is locked so - * that we can fetch and display the conflicting key value. + * The caller must ensure that all the indexes passed in ConflictTupleInfo are + * locked so that we can fetch and display the conflicting key values. */ void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, - TupleTableSlot *localslot, TupleTableSlot *remoteslot, - Oid indexoid, TransactionId localxmin, - RepOriginId localorigin, TimestampTz localts) + TupleTableSlot *remoteslot, List *conflicttuples) { Relation localrel = relinfo->ri_RelationDesc; + StringInfoData err_detail; + + initStringInfo(&err_detail); - Assert(!OidIsValid(indexoid) || - CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); + /* Form errdetail message by combining conflicting tuples information. */ + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) + errdetail_apply_conflict(estate, relinfo, type, searchslot, + conflicttuple->slot, remoteslot, + conflicttuple->indexoid, + conflicttuple->xmin, + conflicttuple->origin, + conflicttuple->ts, + &err_detail); pgstat_report_subscription_conflict(MySubscription->oid, type); @@ -123,9 +127,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, get_namespace_name(RelationGetNamespace(localrel)), RelationGetRelationName(localrel), ConflictTypeNames[type]), - errdetail_apply_conflict(estate, relinfo, type, searchslot, - localslot, remoteslot, indexoid, - localxmin, localorigin, localts)); + errdetail_internal("%s", err_detail.data)); } /* @@ -169,6 +171,7 @@ errcode_apply_conflict(ConflictType type) { case CT_INSERT_EXISTS: case CT_UPDATE_EXISTS: + case CT_MULTIPLE_UNIQUE_CONFLICTS: return errcode(ERRCODE_UNIQUE_VIOLATION); case CT_UPDATE_ORIGIN_DIFFERS: case CT_UPDATE_MISSING: @@ -191,12 +194,13 @@ errcode_apply_conflict(ConflictType type) * replica identity columns, if any. The remote old tuple is excluded as its * information is covered in the replica identity columns. */ -static int +static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, - RepOriginId localorigin, TimestampTz localts) + RepOriginId localorigin, TimestampTz localts, + StringInfo err_msg) { StringInfoData err_detail; char *val_desc; @@ -209,7 +213,9 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, { case CT_INSERT_EXISTS: case CT_UPDATE_EXISTS: - Assert(OidIsValid(indexoid)); + case CT_MULTIPLE_UNIQUE_CONFLICTS: + Assert(OidIsValid(indexoid) && + CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); if (localts) { @@ -291,7 +297,14 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, if (val_desc) appendStringInfo(&err_detail, "\n%s", val_desc); - return errdetail_internal("%s", err_detail.data); + /* + * Insert a blank line to visually separate the new detail line from the + * existing ones. + */ + if (err_msg->len > 0) + appendStringInfoChar(err_msg, '\n'); + + appendStringInfo(err_msg, "%s", err_detail.data); } /* @@ -323,7 +336,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, * Report the conflicting key values in the case of a unique constraint * violation. */ - if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS) + if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS || + type == CT_MULTIPLE_UNIQUE_CONFLICTS) { Assert(OidIsValid(indexoid) && localslot); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 31ab69ea13a..e3b2b144942 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2674,7 +2674,8 @@ apply_handle_update_internal(ApplyExecutionData *edata, LogicalRepRelMapEntry *relmapentry = edata->targetRel; Relation localrel = relinfo->ri_RelationDesc; EPQState epqstate; - TupleTableSlot *localslot; + TupleTableSlot *localslot = NULL; + ConflictTupleInfo conflicttuple = {0}; bool found; MemoryContext oldctx; @@ -2693,16 +2694,13 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (found) { - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; - /* * Report the conflict if the tuple was modified by a different * origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && - localorigin != replorigin_session_origin) + if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, + &conflicttuple.origin, &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) { TupleTableSlot *newslot; @@ -2710,9 +2708,11 @@ apply_handle_update_internal(ApplyExecutionData *edata, newslot = table_slot_create(localrel, &estate->es_tupleTable); slot_store_data(newslot, relmapentry, newtup); + conflicttuple.slot = localslot; + ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, - remoteslot, localslot, newslot, - InvalidOid, localxmin, localorigin, localts); + remoteslot, newslot, + list_make1(&conflicttuple)); } /* Process and store remote tuple in the slot */ @@ -2741,9 +2741,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, * emitting a log message. */ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, - remoteslot, NULL, newslot, - InvalidOid, InvalidTransactionId, - InvalidRepOriginId, 0); + remoteslot, newslot, list_make1(&conflicttuple)); } /* Cleanup. */ @@ -2861,6 +2859,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, LogicalRepRelation *remoterel = &edata->targetRel->remoterel; EPQState epqstate; TupleTableSlot *localslot; + ConflictTupleInfo conflicttuple = {0}; bool found; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); @@ -2876,19 +2875,19 @@ apply_handle_delete_internal(ApplyExecutionData *edata, /* If found delete it. */ if (found) { - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; - /* * Report the conflict if the tuple was modified by a different * origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && - localorigin != replorigin_session_origin) + if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, + &conflicttuple.origin, &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) + { + conflicttuple.slot = localslot; ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS, - remoteslot, localslot, NULL, - InvalidOid, localxmin, localorigin, localts); + remoteslot, NULL, + list_make1(&conflicttuple)); + } EvalPlanQualSetSlot(&epqstate, localslot); @@ -2903,9 +2902,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, * emitting a log message. */ ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING, - remoteslot, NULL, NULL, - InvalidOid, InvalidTransactionId, - InvalidRepOriginId, 0); + remoteslot, NULL, list_make1(&conflicttuple)); } /* Cleanup. */ @@ -3073,9 +3070,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, Relation partrel_new; bool found; EPQState epqstate; - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; + ConflictTupleInfo conflicttuple = {0}; /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(edata, partrel, @@ -3093,11 +3088,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * The tuple to be updated could not be found. Do nothing * except for emitting a log message. */ - ReportApplyConflict(estate, partrelinfo, - LOG, CT_UPDATE_MISSING, - remoteslot_part, NULL, newslot, - InvalidOid, InvalidTransactionId, - InvalidRepOriginId, 0); + ReportApplyConflict(estate, partrelinfo, LOG, + CT_UPDATE_MISSING, remoteslot_part, + newslot, list_make1(&conflicttuple)); return; } @@ -3106,8 +3099,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * Report the conflict if the tuple was modified by a * different origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && - localorigin != replorigin_session_origin) + if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, + &conflicttuple.origin, + &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) { TupleTableSlot *newslot; @@ -3115,10 +3110,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, newslot = table_slot_create(partrel, &estate->es_tupleTable); slot_store_data(newslot, part_entry, newtup); + conflicttuple.slot = localslot; + ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, - remoteslot_part, localslot, newslot, - InvalidOid, localxmin, localorigin, - localts); + remoteslot_part, newslot, + list_make1(&conflicttuple)); } /* |