aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/proto.c1
-rw-r--r--src/backend/replication/logical/worker.c18
-rw-r--r--src/include/replication/logicalproto.h7
-rw-r--r--src/test/subscription/t/003_constraints.pl12
4 files changed, 29 insertions, 9 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 2b1356ee249..04b4f494bb9 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -548,6 +548,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
/* Allocate space for per-column values; zero out unused StringInfoDatas */
tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
tuple->colstatus = (char *) palloc(natts * sizeof(char));
+ tuple->ncols = natts;
/* Read the data */
for (i = 0; i < natts; i++)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 407eee3c0bc..2fcf2e61bc3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -354,6 +354,8 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
{
StringInfo colvalue = &tupleData->colvalues[remoteattnum];
+ Assert(remoteattnum < tupleData->ncols);
+
errarg.local_attnum = i;
errarg.remote_attnum = remoteattnum;
@@ -477,6 +479,8 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
if (remoteattnum < 0)
continue;
+ Assert(remoteattnum < tupleData->ncols);
+
if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
{
StringInfo colvalue = &tupleData->colvalues[remoteattnum];
@@ -831,9 +835,17 @@ apply_handle_update(StringInfo s)
target_rte = list_nth(estate->es_range_table, 0);
for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
{
- if (newtup.colstatus[i] != LOGICALREP_COLUMN_UNCHANGED)
- target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
- i + 1 - FirstLowInvalidHeapAttributeNumber);
+ Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
+ int remoteattnum = rel->attrmap->attnums[i];
+
+ if (!att->attisdropped && remoteattnum >= 0)
+ {
+ Assert(remoteattnum < newtup.ncols);
+ if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
+ target_rte->updatedCols =
+ bms_add_member(target_rte->updatedCols,
+ i + 1 - FirstLowInvalidHeapAttributeNumber);
+ }
}
fill_extraUpdatedCols(target_rte, RelationGetDescr(rel->localrel));
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 287288ab415..60a76bc85cf 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -27,13 +27,18 @@
#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
#define LOGICALREP_PROTO_VERSION_NUM 1
-/* Tuple coming via logical replication. */
+/*
+ * This struct stores a tuple received via logical replication.
+ * Keep in mind that the columns correspond to the *remote* table.
+ */
typedef struct LogicalRepTupleData
{
/* Array of StringInfos, one per column; some may be unused */
StringInfoData *colvalues;
/* Array of markers for null/unchanged/text/binary, one per column */
char *colstatus;
+ /* Length of above arrays */
+ int ncols;
} LogicalRepTupleData;
/* Possible values for LogicalRepTupleData.colstatus[colnum] */
diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl
index 3a590f871a5..9f140b552b4 100644
--- a/src/test/subscription/t/003_constraints.pl
+++ b/src/test/subscription/t/003_constraints.pl
@@ -19,14 +19,14 @@ $node_subscriber->start;
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_fk (bid int PRIMARY KEY);");
$node_publisher->safe_psql('postgres',
- "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));"
+ "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, junk text, bid int REFERENCES tab_fk (bid));"
);
-# Setup structure on subscriber
+# Setup structure on subscriber; column order intentionally different
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_fk (bid int PRIMARY KEY);");
$node_subscriber->safe_psql('postgres',
- "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));"
+ "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid), junk text);"
);
# Setup logical replication
@@ -42,8 +42,10 @@ $node_publisher->wait_for_catchup('tap_sub');
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_fk (bid) VALUES (1);");
+# "junk" value is meant to be large enough to force out-of-line storage
$node_publisher->safe_psql('postgres',
- "INSERT INTO tab_fk_ref (id, bid) VALUES (1, 1);");
+ "INSERT INTO tab_fk_ref (id, bid, junk) VALUES (1, 1, repeat(pi()::text,20000));"
+);
$node_publisher->wait_for_catchup('tap_sub');
@@ -128,7 +130,7 @@ $node_publisher->wait_for_catchup('tap_sub');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(id), max(id) FROM tab_fk_ref;");
is($result, qq(2|1|2),
- 'check column trigger applied on even for other column');
+ 'check column trigger applied even on update for other column');
$node_subscriber->stop('fast');
$node_publisher->stop('fast');