aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/conflict.c64
-rw-r--r--src/backend/replication/logical/worker.c68
2 files changed, 71 insertions, 61 deletions
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 772fc83e88b..f1e92f2fc1a 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -29,11 +29,12 @@ static const char *const ConflictTypeNames[] = {
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
- [CT_DELETE_MISSING] = "delete_missing"
+ [CT_DELETE_MISSING] = "delete_missing",
+ [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
static int errcode_apply_conflict(ConflictType type);
-static int errdetail_apply_conflict(EState *estate,
+static void errdetail_apply_conflict(EState *estate,
ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -41,7 +42,7 @@ static int errdetail_apply_conflict(EState *estate,
TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
RepOriginId localorigin,
- TimestampTz localts);
+ TimestampTz localts, StringInfo err_msg);
static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
ConflictType type,
TupleTableSlot *searchslot,
@@ -90,30 +91,33 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
* 'searchslot' should contain the tuple used to search the local tuple to be
* updated or deleted.
*
- * 'localslot' should contain the existing local tuple, if any, that conflicts
- * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
- * transaction information related to this existing local tuple.
- *
* 'remoteslot' should contain the remote new tuple, if any.
*
- * The 'indexoid' represents the OID of the unique index that triggered the
- * constraint violation error. We use this to report the key values for
- * conflicting tuple.
+ * conflicttuples is a list of local tuples that caused the conflict and the
+ * conflict related information. See ConflictTupleInfo.
*
- * The caller must ensure that the index with the OID 'indexoid' is locked so
- * that we can fetch and display the conflicting key value.
+ * The caller must ensure that all the indexes passed in ConflictTupleInfo are
+ * locked so that we can fetch and display the conflicting key values.
*/
void
ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictType type, TupleTableSlot *searchslot,
- TupleTableSlot *localslot, TupleTableSlot *remoteslot,
- Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ TupleTableSlot *remoteslot, List *conflicttuples)
{
Relation localrel = relinfo->ri_RelationDesc;
+ StringInfoData err_detail;
+
+ initStringInfo(&err_detail);
- Assert(!OidIsValid(indexoid) ||
- CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+ /* Form errdetail message by combining conflicting tuples information. */
+ foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ conflicttuple->slot, remoteslot,
+ conflicttuple->indexoid,
+ conflicttuple->xmin,
+ conflicttuple->origin,
+ conflicttuple->ts,
+ &err_detail);
pgstat_report_subscription_conflict(MySubscription->oid, type);
@@ -123,9 +127,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
get_namespace_name(RelationGetNamespace(localrel)),
RelationGetRelationName(localrel),
ConflictTypeNames[type]),
- errdetail_apply_conflict(estate, relinfo, type, searchslot,
- localslot, remoteslot, indexoid,
- localxmin, localorigin, localts));
+ errdetail_internal("%s", err_detail.data));
}
/*
@@ -169,6 +171,7 @@ errcode_apply_conflict(ConflictType type)
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
return errcode(ERRCODE_UNIQUE_VIOLATION);
case CT_UPDATE_ORIGIN_DIFFERS:
case CT_UPDATE_MISSING:
@@ -191,12 +194,13 @@ errcode_apply_conflict(ConflictType type)
* replica identity columns, if any. The remote old tuple is excluded as its
* information is covered in the replica identity columns.
*/
-static int
+static void
errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
ConflictType type, TupleTableSlot *searchslot,
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts)
+ RepOriginId localorigin, TimestampTz localts,
+ StringInfo err_msg)
{
StringInfoData err_detail;
char *val_desc;
@@ -209,7 +213,9 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
- Assert(OidIsValid(indexoid));
+ case CT_MULTIPLE_UNIQUE_CONFLICTS:
+ Assert(OidIsValid(indexoid) &&
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
if (localts)
{
@@ -291,7 +297,14 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
if (val_desc)
appendStringInfo(&err_detail, "\n%s", val_desc);
- return errdetail_internal("%s", err_detail.data);
+ /*
+ * Insert a blank line to visually separate the new detail line from the
+ * existing ones.
+ */
+ if (err_msg->len > 0)
+ appendStringInfoChar(err_msg, '\n');
+
+ appendStringInfo(err_msg, "%s", err_detail.data);
}
/*
@@ -323,7 +336,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
* Report the conflicting key values in the case of a unique constraint
* violation.
*/
- if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
+ if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
+ type == CT_MULTIPLE_UNIQUE_CONFLICTS)
{
Assert(OidIsValid(indexoid) && localslot);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 31ab69ea13a..e3b2b144942 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2674,7 +2674,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
LogicalRepRelMapEntry *relmapentry = edata->targetRel;
Relation localrel = relinfo->ri_RelationDesc;
EPQState epqstate;
- TupleTableSlot *localslot;
+ TupleTableSlot *localslot = NULL;
+ ConflictTupleInfo conflicttuple = {0};
bool found;
MemoryContext oldctx;
@@ -2693,16 +2694,13 @@ apply_handle_update_internal(ApplyExecutionData *edata,
*/
if (found)
{
- RepOriginId localorigin;
- TransactionId localxmin;
- TimestampTz localts;
-
/*
* Report the conflict if the tuple was modified by a different
* origin.
*/
- if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
- localorigin != replorigin_session_origin)
+ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+ &conflicttuple.origin, &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
{
TupleTableSlot *newslot;
@@ -2710,9 +2708,11 @@ apply_handle_update_internal(ApplyExecutionData *edata,
newslot = table_slot_create(localrel, &estate->es_tupleTable);
slot_store_data(newslot, relmapentry, newtup);
+ conflicttuple.slot = localslot;
+
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
- remoteslot, localslot, newslot,
- InvalidOid, localxmin, localorigin, localts);
+ remoteslot, newslot,
+ list_make1(&conflicttuple));
}
/* Process and store remote tuple in the slot */
@@ -2741,9 +2741,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
* emitting a log message.
*/
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
- remoteslot, NULL, newslot,
- InvalidOid, InvalidTransactionId,
- InvalidRepOriginId, 0);
+ remoteslot, newslot, list_make1(&conflicttuple));
}
/* Cleanup. */
@@ -2861,6 +2859,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
EPQState epqstate;
TupleTableSlot *localslot;
+ ConflictTupleInfo conflicttuple = {0};
bool found;
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
@@ -2876,19 +2875,19 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
/* If found delete it. */
if (found)
{
- RepOriginId localorigin;
- TransactionId localxmin;
- TimestampTz localts;
-
/*
* Report the conflict if the tuple was modified by a different
* origin.
*/
- if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
- localorigin != replorigin_session_origin)
+ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+ &conflicttuple.origin, &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
+ {
+ conflicttuple.slot = localslot;
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
- remoteslot, localslot, NULL,
- InvalidOid, localxmin, localorigin, localts);
+ remoteslot, NULL,
+ list_make1(&conflicttuple));
+ }
EvalPlanQualSetSlot(&epqstate, localslot);
@@ -2903,9 +2902,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
* emitting a log message.
*/
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
- remoteslot, NULL, NULL,
- InvalidOid, InvalidTransactionId,
- InvalidRepOriginId, 0);
+ remoteslot, NULL, list_make1(&conflicttuple));
}
/* Cleanup. */
@@ -3073,9 +3070,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
Relation partrel_new;
bool found;
EPQState epqstate;
- RepOriginId localorigin;
- TransactionId localxmin;
- TimestampTz localts;
+ ConflictTupleInfo conflicttuple = {0};
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(edata, partrel,
@@ -3093,11 +3088,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* The tuple to be updated could not be found. Do nothing
* except for emitting a log message.
*/
- ReportApplyConflict(estate, partrelinfo,
- LOG, CT_UPDATE_MISSING,
- remoteslot_part, NULL, newslot,
- InvalidOid, InvalidTransactionId,
- InvalidRepOriginId, 0);
+ ReportApplyConflict(estate, partrelinfo, LOG,
+ CT_UPDATE_MISSING, remoteslot_part,
+ newslot, list_make1(&conflicttuple));
return;
}
@@ -3106,8 +3099,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* Report the conflict if the tuple was modified by a
* different origin.
*/
- if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
- localorigin != replorigin_session_origin)
+ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+ &conflicttuple.origin,
+ &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
{
TupleTableSlot *newslot;
@@ -3115,10 +3110,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
newslot = table_slot_create(partrel, &estate->es_tupleTable);
slot_store_data(newslot, part_entry, newtup);
+ conflicttuple.slot = localslot;
+
ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
- remoteslot_part, localslot, newslot,
- InvalidOid, localxmin, localorigin,
- localts);
+ remoteslot_part, newslot,
+ list_make1(&conflicttuple));
}
/*