aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c25
-rw-r--r--src/test/subscription/t/100_bugs.pl65
2 files changed, 84 insertions, 6 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 77372425163..876adab38e5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1929,7 +1929,22 @@ init_rel_sync_cache(MemoryContext cachectx)
Assert(RelationSyncCache != NULL);
+ /* We must update the cache entry for a relation after a relcache flush */
CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
+
+ /*
+ * Flush all cache entries after a pg_namespace change, in case it was a
+ * schema rename affecting a relation being replicated.
+ */
+ CacheRegisterSyscacheCallback(NAMESPACEOID,
+ rel_sync_cache_publication_cb,
+ (Datum) 0);
+
+ /*
+ * Flush all cache entries after any publication changes. (We need no
+ * callback entry for pg_publication, because publication_invalidation_cb
+ * will take care of it.)
+ */
CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
rel_sync_cache_publication_cb,
(Datum) 0);
@@ -2325,8 +2340,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
/*
* Publication relation/schema map syscache invalidation callback
*
- * Called for invalidations on pg_publication, pg_publication_rel, and
- * pg_publication_namespace.
+ * Called for invalidations on pg_publication, pg_publication_rel,
+ * pg_publication_namespace, and pg_namespace.
*/
static void
rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -2337,14 +2352,14 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
/*
* We can get here if the plugin was used in SQL interface as the
* RelSchemaSyncCache is destroyed when the decoding finishes, but there
- * is no way to unregister the relcache invalidation callback.
+ * is no way to unregister the invalidation callbacks.
*/
if (RelationSyncCache == NULL)
return;
/*
- * There is no way to find which entry in our cache the hash belongs to so
- * mark the whole cache as invalid.
+ * We have no easy way to identify which cache entries this invalidation
+ * event might have affected, so just mark them all invalid.
*/
hash_seq_init(&status, RelationSyncCache);
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index 143caac792e..036576acab5 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -70,9 +70,10 @@ $node_publisher->wait_for_catchup('sub1');
pass('index predicates do not cause crash');
# We'll re-use these nodes below, so drop their replication state.
-# We don't bother to drop the tables though.
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
$node_publisher->safe_psql('postgres', "DROP PUBLICATION pub1");
+# Drop the tables too.
+$node_publisher->safe_psql('postgres', "DROP TABLE tab1");
$node_publisher->stop('fast');
$node_subscriber->stop('fast');
@@ -307,6 +308,68 @@ is( $node_subscriber->safe_psql(
qq(-1|1),
"update works with REPLICA IDENTITY");
+# Clean up
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres', "DROP TABLE tab_replidentity_index");
+$node_subscriber->safe_psql('postgres', "DROP TABLE tab_replidentity_index");
+
+# Test schema invalidation by renaming the schema
+
+# Create tables on publisher
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will cover t1 under both schema names
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher_connstr' PUBLICATION tap_pub_sch"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Check what happens to data inserted before and after schema rename
+$node_publisher->safe_psql(
+ 'postgres',
+ "begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Subscriber's sch1.t1 should receive the row inserted into the new sch1.t1,
+# but not the row inserted into the old sch1.t1 post-rename.
+my $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is( $result, qq(1
+2), 'check data in subscriber sch1.t1 after schema rename');
+
+# Subscriber's sch2.t1 won't have gotten anything yet ...
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch2.t1");
+is($result, '', 'no data yet in subscriber sch2.t1 after schema rename');
+
+# ... but it should show up after REFRESH.
+$node_subscriber->safe_psql('postgres',
+ 'ALTER SUBSCRIPTION tap_sub_sch REFRESH PUBLICATION');
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch2.t1");
+is( $result, qq(1
+3), 'check data in subscriber sch2.t1 after schema rename');
+
$node_publisher->stop('fast');
$node_subscriber->stop('fast');