aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c56
1 files changed, 23 insertions, 33 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c7d1734a174..10f97119727 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -400,12 +400,15 @@ static void apply_handle_insert_internal(ApplyExecutionData *edata,
static void apply_handle_update_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
- LogicalRepTupleData *newtup);
+ LogicalRepTupleData *newtup,
+ Oid localindexoid);
static void apply_handle_delete_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
- TupleTableSlot *remoteslot);
+ TupleTableSlot *remoteslot,
+ Oid localindexoid);
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
+ Oid localidxoid,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot);
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
@@ -2351,24 +2354,6 @@ apply_handle_type(StringInfo s)
}
/*
- * Get replica identity index or if it is not defined a primary key.
- *
- * If neither is defined, returns InvalidOid
- */
-static Oid
-GetRelationIdentityOrPK(Relation rel)
-{
- Oid idxoid;
-
- idxoid = RelationGetReplicaIndex(rel);
-
- if (!OidIsValid(idxoid))
- idxoid = RelationGetPrimaryKeyIndex(rel);
-
- return idxoid;
-}
-
-/*
* Check that we (the subscription owner) have sufficient privileges on the
* target relation to perform the given operation.
*/
@@ -2627,7 +2612,7 @@ apply_handle_update(StringInfo s)
remoteslot, &newtup, CMD_UPDATE);
else
apply_handle_update_internal(edata, edata->targetRelInfo,
- remoteslot, &newtup);
+ remoteslot, &newtup, rel->localindexoid);
finish_edata(edata);
@@ -2648,7 +2633,8 @@ static void
apply_handle_update_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
- LogicalRepTupleData *newtup)
+ LogicalRepTupleData *newtup,
+ Oid localindexoid)
{
EState *estate = edata->estate;
LogicalRepRelMapEntry *relmapentry = edata->targetRel;
@@ -2663,6 +2649,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
found = FindReplTupleInLocalRel(estate, localrel,
&relmapentry->remoterel,
+ localindexoid,
remoteslot, &localslot);
ExecClearTuple(remoteslot);
@@ -2767,7 +2754,7 @@ apply_handle_delete(StringInfo s)
remoteslot, NULL, CMD_DELETE);
else
apply_handle_delete_internal(edata, edata->targetRelInfo,
- remoteslot);
+ remoteslot, rel->localindexoid);
finish_edata(edata);
@@ -2787,7 +2774,8 @@ apply_handle_delete(StringInfo s)
static void
apply_handle_delete_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
- TupleTableSlot *remoteslot)
+ TupleTableSlot *remoteslot,
+ Oid localindexoid)
{
EState *estate = edata->estate;
Relation localrel = relinfo->ri_RelationDesc;
@@ -2799,7 +2787,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
ExecOpenIndices(relinfo, false);
- found = FindReplTupleInLocalRel(estate, localrel, remoterel,
+ found = FindReplTupleInLocalRel(estate, localrel, remoterel, localindexoid,
remoteslot, &localslot);
/* If found delete it. */
@@ -2833,17 +2821,17 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
/*
* Try to find a tuple received from the publication side (in 'remoteslot') in
* the corresponding local relation using either replica identity index,
- * primary key or if needed, sequential scan.
+ * primary key, index or if needed, sequential scan.
*
* Local tuple, if found, is returned in '*localslot'.
*/
static bool
FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
+ Oid localidxoid,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot)
{
- Oid idxoid;
bool found;
/*
@@ -2854,12 +2842,11 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
*localslot = table_slot_create(localrel, &estate->es_tupleTable);
- idxoid = GetRelationIdentityOrPK(localrel);
- Assert(OidIsValid(idxoid) ||
+ Assert(OidIsValid(localidxoid) ||
(remoterel->replident == REPLICA_IDENTITY_FULL));
- if (OidIsValid(idxoid))
- found = RelationFindReplTupleByIndex(localrel, idxoid,
+ if (OidIsValid(localidxoid))
+ found = RelationFindReplTupleByIndex(localrel, localidxoid,
LockTupleExclusive,
remoteslot, *localslot);
else
@@ -2960,7 +2947,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
case CMD_DELETE:
apply_handle_delete_internal(edata, partrelinfo,
- remoteslot_part);
+ remoteslot_part,
+ part_entry->localindexoid);
break;
case CMD_UPDATE:
@@ -2980,6 +2968,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(estate, partrel,
&part_entry->remoterel,
+ part_entry->localindexoid,
remoteslot_part, &localslot);
if (!found)
{
@@ -3076,7 +3065,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
/* DELETE old tuple found in the old partition. */
apply_handle_delete_internal(edata, partrelinfo,
- localslot);
+ localslot,
+ part_entry->localindexoid);
/* INSERT new tuple into the new partition. */