aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/rmgrdesc/standbydesc.c2
-rw-r--r--src/backend/commands/alter.c16
-rw-r--r--src/backend/commands/publicationcmds.c39
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c8
-rw-r--r--src/backend/utils/cache/inval.c125
-rw-r--r--src/include/commands/publicationcmds.h1
-rw-r--r--src/include/pg_config_manual.h8
-rw-r--r--src/include/storage/sinval.h24
-rw-r--r--src/include/utils/inval.h10
-rw-r--r--src/test/subscription/t/007_ddl.pl85
10 files changed, 302 insertions, 16 deletions
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index d849f8e54ba..81eff5f31c4 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
else if (msg->id == SHAREDINVALSNAPSHOT_ID)
appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+ else if (msg->id == SHAREDINVALRELSYNC_ID)
+ appendStringInfo(buf, " relsync %u", msg->rs.relid);
else
appendStringInfo(buf, " unrecognized id %d", msg->id);
}
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b84..c801c869c1c 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -338,6 +338,22 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
InvokeObjectPostAlterHook(classId, objectId, 0);
+ /* Do post catalog-update tasks */
+ if (classId == PublicationRelationId)
+ {
+ Form_pg_publication pub = (Form_pg_publication) GETSTRUCT(oldtup);
+
+ /*
+ * Invalidate relsynccache entries.
+ *
+ * Unlike ALTER PUBLICATION ADD/SET/DROP commands, renaming a
+ * publication does not impact the publication status of tables. So,
+ * we don't need to invalidate relcache to rebuild the rd_pubdesc.
+ * Instead, we invalidate only the relsyncache.
+ */
+ InvalidatePubRelSyncCache(pub->oid, pub->puballtables);
+ }
+
/* Release memory */
pfree(values);
pfree(nulls);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16f..3091d36ce98 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,45 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
return *invalid_column_list || *invalid_gen_col;
}
+/*
+ * Invalidate entries in the RelationSyncCache for relations included in the
+ * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
+ *
+ * If 'puballtables' is true, invalidate all cache entries.
+ */
+void
+InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
+{
+ if (puballtables)
+ {
+ CacheInvalidateRelSyncAll();
+ }
+ else
+ {
+ List *relids = NIL;
+ List *schemarelids = NIL;
+
+ /*
+ * For partitioned tables, we must invalidate all partitions and
+ * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
+ * a target. However, WAL records for TRUNCATE specify both a root and
+ * its leaves.
+ */
+ relids = GetPublicationRelations(pubid,
+ PUBLICATION_PART_ALL);
+ schemarelids = GetAllSchemaPublicationRelations(pubid,
+ PUBLICATION_PART_ALL);
+
+ relids = list_concat_unique_oid(relids, schemarelids);
+
+ /* Invalidate the relsyncache */
+ foreach_oid(relid, relids)
+ CacheInvalidateRelSync(relid);
+ }
+
+ return;
+}
+
/* check_functions_in_node callback */
static bool
contain_mutable_or_user_functions_checker(Oid func_id, void *context)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1df..ed806c54300 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
CacheRegisterSyscacheCallback(PUBLICATIONOID,
publication_invalidation_cb,
(Datum) 0);
+ CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+ (Datum) 0);
publication_callback_registered = true;
}
@@ -1789,12 +1791,6 @@ static void
publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
{
publications_valid = false;
-
- /*
- * Also invalidate per-relation cache so that next time the filtering info
- * is checked it will be updated with the new publication settings.
- */
- rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
}
/*
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9b..4eb67720737 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int debug_discard_caches = 0;
#define MAX_SYSCACHE_CALLBACKS 64
#define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
static struct SYSCACHECALLBACK
{
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
static int relcache_callback_count = 0;
+static struct RELSYNCCALLBACK
+{
+ RelSyncCallbackFunction function;
+ Datum arg;
+} relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int relsync_callback_count = 0;
+
+
/* ----------------------------------------------------------------
* Invalidation subgroup support functions
* ----------------------------------------------------------------
@@ -485,6 +495,36 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
}
/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity. This message is the
+ * same as AddRelcacheInvalidationMessage() except that it is for
+ * RelationSyncCache maintained by decoding plugin pgoutput.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+ Oid dbId, Oid relId)
+{
+ SharedInvalidationMessage msg;
+
+ /* Don't add a duplicate item. */
+ ProcessMessageSubGroup(group, RelCacheMsgs,
+ if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+ (msg->rc.relId == relId ||
+ msg->rc.relId == InvalidOid))
+ return);
+
+ /* OK, add the item */
+ msg.rc.id = SHAREDINVALRELSYNC_ID;
+ msg.rc.dbId = dbId;
+ msg.rc.relId = relId;
+ /* check AddCatcacheInvalidationMessage() for an explanation */
+ VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+ AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
+/*
* Add a snapshot inval entry
*
* We put these into the relcache subgroup for simplicity.
@@ -612,6 +652,17 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
}
/*
+ * RegisterRelsyncInvalidation
+ *
+ * As above, but register a relsynccache invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+ AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
+/*
* RegisterSnapshotInvalidation
*
* Register an invalidation event for MVCC scans against a given catalog.
@@ -751,6 +802,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
ccitem->function(ccitem->arg, InvalidOid);
}
+
+ for (i = 0; i < relsync_callback_count; i++)
+ {
+ struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+ ccitem->function(ccitem->arg, InvalidOid);
+ }
}
/*
@@ -832,6 +890,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
else if (msg->sn.dbId == MyDatabaseId)
InvalidateCatalogSnapshot();
}
+ else if (msg->id == SHAREDINVALRELSYNC_ID)
+ {
+ /* We only care about our own database */
+ if (msg->rs.dbId == MyDatabaseId)
+ CallRelSyncCallbacks(msg->rs.relid);
+ }
else
elog(FATAL, "unrecognized SI message ID: %d", msg->id);
}
@@ -1621,6 +1685,32 @@ CacheInvalidateRelcacheByRelid(Oid relid)
ReleaseSysCache(tup);
}
+/*
+ * CacheInvalidateRelSync
+ * Register invalidation of the cache in logical decoding output plugin
+ * for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WALs would do nothing even when it
+ * receives the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+ RegisterRelsyncInvalidation(PrepareInvalidationState(),
+ MyDatabaseId, relid);
+}
+
+/*
+ * CacheInvalidateRelSyncAll
+ * Register invalidation of the whole cache in logical decoding output
+ * plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+ CacheInvalidateRelSync(InvalidOid);
+}
/*
* CacheInvalidateSmgr
@@ -1764,6 +1854,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
}
/*
+ * CacheRegisterRelSyncCallback
+ * Register the specified function to be called for all future
+ * relsynccache invalidation events.
+ *
+ * This function is intended to be call from the logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+ Datum arg)
+{
+ if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+ elog(FATAL, "out of relsync_callback_list slots");
+
+ relsync_callback_list[relsync_callback_count].function = func;
+ relsync_callback_list[relsync_callback_count].arg = arg;
+
+ ++relsync_callback_count;
+}
+
+/*
* CallSyscacheCallbacks
*
* This is exported so that CatalogCacheFlushCatalog can call it, saving
@@ -1789,6 +1900,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
}
/*
+ * CallSyscacheCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+ for (int i = 0; i < relsync_callback_count; i++)
+ {
+ struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+ ccitem->function(ccitem->arg, relid);
+ }
+}
+
+/*
* LogLogicalInvalidations
*
* Emit WAL for invalidations caused by the current command.
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e11a942ea0f..e41df6db038 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,5 +38,6 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
char pubgencols_type,
bool *invalid_column_list,
bool *invalid_gen_col);
+extern void InvalidatePubRelSyncCache(Oid pubid, bool puballtables);
#endif /* PUBLICATIONCMDS_H */
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78c..125d3eb5fff 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -282,10 +282,10 @@
/*
* For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable
- * use of the debug_discard_caches GUC to aggressively flush syscache/relcache
- * entries whenever it's possible to deliver invalidations. See
- * AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for
- * details.
+ * use of the debug_discard_caches GUC to aggressively flush
+ * syscache/relcache/relsynccache entries whenever it's possible to deliver
+ * invalidations. See AcceptInvalidationMessages() in
+ * src/backend/utils/cache/inval.c for details.
*
* USE_ASSERT_CHECKING builds default to enabling this. It's possible to use
* DISCARD_CACHES_ENABLED without a cassert build and the implied
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fac..5dc5aafe5c9 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
* * invalidate an smgr cache entry for a specific physical relation
* * invalidate the mapped-relation mapping for a given database
* * invalidate any saved snapshot that might be used to scan a given relation
+ * * invalidate a RelationSyncCache entry for a specific relation
* More types could be added if needed. The message type is identified by
* the first "int8" field of the message struct. Zero or positive means a
* specific-catcache inval message (and also serves as the catcache ID field).
@@ -46,12 +47,12 @@
* catcache inval messages must be generated for each of its caches, since
* the hash keys will generally be different.
*
- * Catcache, relcache, and snapshot invalidations are transactional, and so
- * are sent to other backends upon commit. Internally to the generating
- * backend, they are also processed at CommandCounterIncrement so that later
- * commands in the same transaction see the new state. The generating backend
- * also has to process them at abort, to flush out any cache state it's loaded
- * from no-longer-valid entries.
+ * Catcache, relcache, relsynccache, and snapshot invalidations are
+ * transactional, and so are sent to other backends upon commit. Internally
+ * to the generating backend, they are also processed at
+ * CommandCounterIncrement so that later commands in the same transaction see
+ * the new state. The generating backend also has to process them at abort,
+ * to flush out any cache state it's loaded from no-longer-valid entries.
*
* smgr and relation mapping invalidations are non-transactional: they are
* sent immediately when the underlying file change is made.
@@ -110,6 +111,16 @@ typedef struct
Oid relId; /* relation ID */
} SharedInvalSnapshotMsg;
+#define SHAREDINVALRELSYNC_ID (-6)
+
+typedef struct
+{
+ int8 id; /* type field --- must be first */
+ Oid dbId; /* database ID */
+ Oid relid; /* relation ID, or 0 if whole
+ * RelationSyncCache */
+} SharedInvalRelSyncMsg;
+
typedef union
{
int8 id; /* type field --- must be first */
@@ -119,6 +130,7 @@ typedef union
SharedInvalSmgrMsg sm;
SharedInvalRelmapMsg rm;
SharedInvalSnapshotMsg sn;
+ SharedInvalRelSyncMsg rs;
} SharedInvalidationMessage;
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ffc..9b871caef62 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
extern void AcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
extern void CacheInvalidateRelcacheByRelid(Oid relid);
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
extern void CacheInvalidateRelmap(Oid databaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
Datum arg);
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+ Datum arg);
+
extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
+extern void CallRelSyncCallbacks(Oid relid);
+
extern void InvalidateSystemCaches(void);
extern void InvalidateSystemCachesExtended(bool debug_discard);
diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl
index 4d3b917ac04..7d4c2d51c3a 100644
--- a/src/test/subscription/t/007_ddl.pl
+++ b/src/test/subscription/t/007_ddl.pl
@@ -69,6 +69,91 @@ ok( $stderr =~
"Alter subscription set publication throws warning for non-existent publication"
);
+# Cleanup
+$node_publisher->safe_psql('postgres', qq[
+ DROP PUBLICATION mypub;
+ SELECT pg_drop_replication_slot('mysub');
+]);
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION mysub1");
+
+#
+# Test ALTER PUBLICATION RENAME command during the replication
+#
+
+# Test function for swaping name of publications
+sub test_swap
+{
+ my ($table_name, $pubname, $appname) = @_;
+
+ # Confirms tuples can be replicated
+ $node_publisher->safe_psql('postgres', "INSERT INTO $table_name VALUES (1);");
+ $node_publisher->wait_for_catchup($appname);
+ my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT a FROM $table_name");
+ is($result, qq(1), 'check replication worked well before renaming a publication');
+
+ # Swap the name of publications; $pubname <-> pub_empty
+ $node_publisher->safe_psql('postgres', qq[
+ ALTER PUBLICATION $pubname RENAME TO tap_pub_tmp;
+ ALTER PUBLICATION pub_empty RENAME TO $pubname;
+ ALTER PUBLICATION tap_pub_tmp RENAME TO pub_empty;
+ ]);
+
+ # Insert the data again
+ $node_publisher->safe_psql('postgres', "INSERT INTO $table_name VALUES (2);");
+ $node_publisher->wait_for_catchup($appname);
+
+ # Confirms the second tuple won't be replicated because $pubname does not
+ # contains relations anymore.
+ $result =
+ $node_subscriber->safe_psql('postgres', "SELECT a FROM $table_name ORDER BY a");
+ is($result, qq(1),
+ 'check the tuple inserted after the RENAME was not replicated');
+
+ # Restore the name of publications because it can be called several times
+ $node_publisher->safe_psql('postgres', qq[
+ ALTER PUBLICATION $pubname RENAME TO tap_pub_tmp;
+ ALTER PUBLICATION pub_empty RENAME TO $pubname;
+ ALTER PUBLICATION tap_pub_tmp RENAME TO pub_empty;
+ ]);
+}
+
+# Create another table
+$ddl = "CREATE TABLE test2 (a int, b text);";
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Create publications and a subscription
+$node_publisher->safe_psql('postgres', qq[
+ CREATE PUBLICATION pub_empty;
+ CREATE PUBLICATION pub_for_tab FOR TABLE test1;
+ CREATE PUBLICATION pub_for_all_tables FOR ALL TABLES;
+]);
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION pub_for_tab WITH (copy_data = off)"
+);
+
+# Confirms RENAME command works well for a publication
+test_swap('test1', 'pub_for_tab', 'tap_sub');
+
+# Switches a publication which includes all tables
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub SET PUBLICATION pub_for_all_tables WITH (refresh = true, copy_data = false);"
+);
+
+# Confirms RENAME command works well for ALL TABLES publication
+test_swap('test2', 'pub_for_all_tables', 'tap_sub');
+
+# Cleanup
+$node_publisher->safe_psql('postgres', qq[
+ DROP PUBLICATION pub_empty, pub_for_tab, pub_for_all_tables;
+ DROP TABLE test1, test2;
+]);
+$node_subscriber->safe_psql('postgres', qq[
+ DROP SUBSCRIPTION tap_sub;
+ DROP TABLE test1, test2;
+]);
+
$node_subscriber->stop;
$node_publisher->stop;