diff options
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r-- | src/backend/replication/logical/conflict.c | 64 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 68 |
2 files changed, 71 insertions, 61 deletions
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 772fc83e88b..f1e92f2fc1a 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -29,11 +29,12 @@ static const char *const ConflictTypeNames[] = { [CT_UPDATE_EXISTS] = "update_exists", [CT_UPDATE_MISSING] = "update_missing", [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs", - [CT_DELETE_MISSING] = "delete_missing" + [CT_DELETE_MISSING] = "delete_missing", + [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts" }; static int errcode_apply_conflict(ConflictType type); -static int errdetail_apply_conflict(EState *estate, +static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, @@ -41,7 +42,7 @@ static int errdetail_apply_conflict(EState *estate, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, RepOriginId localorigin, - TimestampTz localts); + TimestampTz localts, StringInfo err_msg); static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, @@ -90,30 +91,33 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, * 'searchslot' should contain the tuple used to search the local tuple to be * updated or deleted. * - * 'localslot' should contain the existing local tuple, if any, that conflicts - * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the - * transaction information related to this existing local tuple. - * * 'remoteslot' should contain the remote new tuple, if any. * - * The 'indexoid' represents the OID of the unique index that triggered the - * constraint violation error. We use this to report the key values for - * conflicting tuple. + * conflicttuples is a list of local tuples that caused the conflict and the + * conflict related information. See ConflictTupleInfo. * - * The caller must ensure that the index with the OID 'indexoid' is locked so - * that we can fetch and display the conflicting key value. + * The caller must ensure that all the indexes passed in ConflictTupleInfo are + * locked so that we can fetch and display the conflicting key values. */ void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, - TupleTableSlot *localslot, TupleTableSlot *remoteslot, - Oid indexoid, TransactionId localxmin, - RepOriginId localorigin, TimestampTz localts) + TupleTableSlot *remoteslot, List *conflicttuples) { Relation localrel = relinfo->ri_RelationDesc; + StringInfoData err_detail; + + initStringInfo(&err_detail); - Assert(!OidIsValid(indexoid) || - CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); + /* Form errdetail message by combining conflicting tuples information. */ + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) + errdetail_apply_conflict(estate, relinfo, type, searchslot, + conflicttuple->slot, remoteslot, + conflicttuple->indexoid, + conflicttuple->xmin, + conflicttuple->origin, + conflicttuple->ts, + &err_detail); pgstat_report_subscription_conflict(MySubscription->oid, type); @@ -123,9 +127,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, get_namespace_name(RelationGetNamespace(localrel)), RelationGetRelationName(localrel), ConflictTypeNames[type]), - errdetail_apply_conflict(estate, relinfo, type, searchslot, - localslot, remoteslot, indexoid, - localxmin, localorigin, localts)); + errdetail_internal("%s", err_detail.data)); } /* @@ -169,6 +171,7 @@ errcode_apply_conflict(ConflictType type) { case CT_INSERT_EXISTS: case CT_UPDATE_EXISTS: + case CT_MULTIPLE_UNIQUE_CONFLICTS: return errcode(ERRCODE_UNIQUE_VIOLATION); case CT_UPDATE_ORIGIN_DIFFERS: case CT_UPDATE_MISSING: @@ -191,12 +194,13 @@ errcode_apply_conflict(ConflictType type) * replica identity columns, if any. The remote old tuple is excluded as its * information is covered in the replica identity columns. */ -static int +static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, - RepOriginId localorigin, TimestampTz localts) + RepOriginId localorigin, TimestampTz localts, + StringInfo err_msg) { StringInfoData err_detail; char *val_desc; @@ -209,7 +213,9 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, { case CT_INSERT_EXISTS: case CT_UPDATE_EXISTS: - Assert(OidIsValid(indexoid)); + case CT_MULTIPLE_UNIQUE_CONFLICTS: + Assert(OidIsValid(indexoid) && + CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); if (localts) { @@ -291,7 +297,14 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, if (val_desc) appendStringInfo(&err_detail, "\n%s", val_desc); - return errdetail_internal("%s", err_detail.data); + /* + * Insert a blank line to visually separate the new detail line from the + * existing ones. + */ + if (err_msg->len > 0) + appendStringInfoChar(err_msg, '\n'); + + appendStringInfo(err_msg, "%s", err_detail.data); } /* @@ -323,7 +336,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, * Report the conflicting key values in the case of a unique constraint * violation. */ - if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS) + if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS || + type == CT_MULTIPLE_UNIQUE_CONFLICTS) { Assert(OidIsValid(indexoid) && localslot); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 31ab69ea13a..e3b2b144942 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2674,7 +2674,8 @@ apply_handle_update_internal(ApplyExecutionData *edata, LogicalRepRelMapEntry *relmapentry = edata->targetRel; Relation localrel = relinfo->ri_RelationDesc; EPQState epqstate; - TupleTableSlot *localslot; + TupleTableSlot *localslot = NULL; + ConflictTupleInfo conflicttuple = {0}; bool found; MemoryContext oldctx; @@ -2693,16 +2694,13 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (found) { - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; - /* * Report the conflict if the tuple was modified by a different * origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && - localorigin != replorigin_session_origin) + if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, + &conflicttuple.origin, &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) { TupleTableSlot *newslot; @@ -2710,9 +2708,11 @@ apply_handle_update_internal(ApplyExecutionData *edata, newslot = table_slot_create(localrel, &estate->es_tupleTable); slot_store_data(newslot, relmapentry, newtup); + conflicttuple.slot = localslot; + ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, - remoteslot, localslot, newslot, - InvalidOid, localxmin, localorigin, localts); + remoteslot, newslot, + list_make1(&conflicttuple)); } /* Process and store remote tuple in the slot */ @@ -2741,9 +2741,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, * emitting a log message. */ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, - remoteslot, NULL, newslot, - InvalidOid, InvalidTransactionId, - InvalidRepOriginId, 0); + remoteslot, newslot, list_make1(&conflicttuple)); } /* Cleanup. */ @@ -2861,6 +2859,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, LogicalRepRelation *remoterel = &edata->targetRel->remoterel; EPQState epqstate; TupleTableSlot *localslot; + ConflictTupleInfo conflicttuple = {0}; bool found; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); @@ -2876,19 +2875,19 @@ apply_handle_delete_internal(ApplyExecutionData *edata, /* If found delete it. */ if (found) { - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; - /* * Report the conflict if the tuple was modified by a different * origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && - localorigin != replorigin_session_origin) + if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, + &conflicttuple.origin, &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) + { + conflicttuple.slot = localslot; ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS, - remoteslot, localslot, NULL, - InvalidOid, localxmin, localorigin, localts); + remoteslot, NULL, + list_make1(&conflicttuple)); + } EvalPlanQualSetSlot(&epqstate, localslot); @@ -2903,9 +2902,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, * emitting a log message. */ ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING, - remoteslot, NULL, NULL, - InvalidOid, InvalidTransactionId, - InvalidRepOriginId, 0); + remoteslot, NULL, list_make1(&conflicttuple)); } /* Cleanup. */ @@ -3073,9 +3070,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, Relation partrel_new; bool found; EPQState epqstate; - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; + ConflictTupleInfo conflicttuple = {0}; /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(edata, partrel, @@ -3093,11 +3088,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * The tuple to be updated could not be found. Do nothing * except for emitting a log message. */ - ReportApplyConflict(estate, partrelinfo, - LOG, CT_UPDATE_MISSING, - remoteslot_part, NULL, newslot, - InvalidOid, InvalidTransactionId, - InvalidRepOriginId, 0); + ReportApplyConflict(estate, partrelinfo, LOG, + CT_UPDATE_MISSING, remoteslot_part, + newslot, list_make1(&conflicttuple)); return; } @@ -3106,8 +3099,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * Report the conflict if the tuple was modified by a * different origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && - localorigin != replorigin_session_origin) + if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, + &conflicttuple.origin, + &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) { TupleTableSlot *newslot; @@ -3115,10 +3110,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, newslot = table_slot_create(partrel, &estate->es_tupleTable); slot_store_data(newslot, part_entry, newtup); + conflicttuple.slot = localslot; + ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, - remoteslot_part, localslot, newslot, - InvalidOid, localxmin, localorigin, - localts); + remoteslot_part, newslot, + list_make1(&conflicttuple)); } /* |