aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/catalog/pg_publication.c82
-rw-r--r--src/backend/commands/publicationcmds.c57
-rw-r--r--src/include/catalog/pg_publication.h3
-rw-r--r--src/include/commands/publicationcmds.h5
-rw-r--r--src/test/regress/expected/publication.out13
-rw-r--r--src/test/regress/sql/publication.sql11
6 files changed, 116 insertions, 55 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index d6fddd6efe7..9cd0c82f93c 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -31,6 +31,7 @@
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
+#include "commands/publicationcmds.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "utils/array.h"
@@ -136,6 +137,42 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(result);
}
+/*
+ * Gets the relations based on the publication partition option for a specified
+ * relation.
+ */
+List *
+GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
+ Oid relid)
+{
+ if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
+ pub_partopt != PUBLICATION_PART_ROOT)
+ {
+ List *all_parts = find_all_inheritors(relid, NoLock,
+ NULL);
+
+ if (pub_partopt == PUBLICATION_PART_ALL)
+ result = list_concat(result, all_parts);
+ else if (pub_partopt == PUBLICATION_PART_LEAF)
+ {
+ ListCell *lc;
+
+ foreach(lc, all_parts)
+ {
+ Oid partOid = lfirst_oid(lc);
+
+ if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
+ result = lappend_oid(result, partOid);
+ }
+ }
+ else
+ Assert(false);
+ }
+ else
+ result = lappend_oid(result, relid);
+
+ return result;
+}
/*
* Insert new publication / relation mapping.
@@ -153,6 +190,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
Publication *pub = GetPublication(pubid);
ObjectAddress myself,
referenced;
+ List *relids = NIL;
rel = table_open(PublicationRelRelationId, RowExclusiveLock);
@@ -208,8 +246,18 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
/* Close the table. */
table_close(rel, RowExclusiveLock);
- /* Invalidate relcache so that publication info is rebuilt. */
- CacheInvalidateRelcache(targetrel->relation);
+ /*
+ * Invalidate relcache so that publication info is rebuilt.
+ *
+ * For the partitioned tables, we must invalidate all partitions contained
+ * in the respective partition hierarchies, not just the one explicitly
+ * mentioned in the publication. This is required because we implicitly
+ * publish the child tables when the parent table is published.
+ */
+ relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
+ relid);
+
+ InvalidatePublicationRels(relids);
return myself;
}
@@ -241,7 +289,7 @@ GetRelationPublications(Oid relid)
/*
* Gets list of relation oids for a publication.
*
- * This should only be used for normal publications, the FOR ALL TABLES
+ * This should only be used FOR TABLE publications, the FOR ALL TABLES
* should use GetAllTablesPublicationRelations().
*/
List *
@@ -270,32 +318,8 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Form_pg_publication_rel pubrel;
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
-
- if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE &&
- pub_partopt != PUBLICATION_PART_ROOT)
- {
- List *all_parts = find_all_inheritors(pubrel->prrelid, NoLock,
- NULL);
-
- if (pub_partopt == PUBLICATION_PART_ALL)
- result = list_concat(result, all_parts);
- else if (pub_partopt == PUBLICATION_PART_LEAF)
- {
- ListCell *lc;
-
- foreach(lc, all_parts)
- {
- Oid partOid = lfirst_oid(lc);
-
- if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
- result = lappend_oid(result, partOid);
- }
- }
- else
- Assert(false);
- }
- else
- result = lappend_oid(result, pubrel->prrelid);
+ result = GetPubPartitionOptionRelations(result, pub_partopt,
+ pubrel->prrelid);
}
systable_endscan(scan);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 30929da1f57..9c7f91611dc 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -45,9 +45,6 @@
#include "utils/syscache.h"
#include "utils/varlena.h"
-/* Same as MAXNUMMESSAGES in sinvaladt.c */
-#define MAX_RELCACHE_INVAL_MSGS 4096
-
static List *OpenTableList(List *tables);
static void CloseTableList(List *rels);
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
@@ -329,23 +326,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
List *relids = GetPublicationRelations(pubform->oid,
PUBLICATION_PART_ALL);
- /*
- * We don't want to send too many individual messages, at some point
- * it's cheaper to just reset whole relcache.
- */
- if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
- {
- ListCell *lc;
-
- foreach(lc, relids)
- {
- Oid relid = lfirst_oid(lc);
-
- CacheInvalidateRelcacheByRelid(relid);
- }
- }
- else
- CacheInvalidateRelcacheAll();
+ InvalidatePublicationRels(relids);
}
ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
@@ -356,6 +337,27 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
}
/*
+ * Invalidate the relations.
+ */
+void
+InvalidatePublicationRels(List *relids)
+{
+ /*
+ * We don't want to send too many individual messages, at some point it's
+ * cheaper to just reset whole relcache.
+ */
+ if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
+ {
+ ListCell *lc;
+
+ foreach(lc, relids)
+ CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
+ }
+ else
+ CacheInvalidateRelcacheAll();
+}
+
+/*
* Add or remove table to/from publication.
*/
static void
@@ -488,6 +490,7 @@ RemovePublicationRelById(Oid proid)
Relation rel;
HeapTuple tup;
Form_pg_publication_rel pubrel;
+ List *relids = NIL;
rel = table_open(PublicationRelRelationId, RowExclusiveLock);
@@ -499,8 +502,18 @@ RemovePublicationRelById(Oid proid)
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
- /* Invalidate relcache so that publication info is rebuilt. */
- CacheInvalidateRelcacheByRelid(pubrel->prrelid);
+ /*
+ * Invalidate relcache so that publication info is rebuilt.
+ *
+ * For the partitioned tables, we must invalidate all partitions contained
+ * in the respective partition hierarchies, not just the one explicitly
+ * mentioned in the publication. This is required because we implicitly
+ * publish the child tables when the parent table is published.
+ */
+ relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
+ pubrel->prrelid);
+
+ InvalidatePublicationRels(relids);
CatalogTupleDelete(rel, &tup->t_self);
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 561266aa3ee..82f2536c657 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -111,6 +111,9 @@ typedef enum PublicationPartOpt
extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
extern List *GetAllTablesPublications(void);
extern List *GetAllTablesPublicationRelations(bool pubviaroot);
+extern List *GetPubPartitionOptionRelations(List *result,
+ PublicationPartOpt pub_partopt,
+ Oid relid);
extern bool is_publishable_relation(Relation rel);
extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index c98d519b29d..77a299bb187 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -17,6 +17,10 @@
#include "catalog/objectaddress.h"
#include "parser/parse_node.h"
+#include "utils/inval.h"
+
+/* Same as MAXNUMMESSAGES in sinvaladt.c */
+#define MAX_RELCACHE_INVAL_MSGS 4096
extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt);
extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt);
@@ -25,5 +29,6 @@ extern void RemovePublicationRelById(Oid proid);
extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId);
+extern void InvalidatePublicationRels(List *relids);
#endif /* PUBLICATIONCMDS_H */
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index cad1b374be2..82bce9be097 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -126,10 +126,12 @@ CREATE PUBLICATION testpub_forparted;
CREATE PUBLICATION testpub_forparted1;
RESET client_min_messages;
CREATE TABLE testpub_parted1 (LIKE testpub_parted);
+CREATE TABLE testpub_parted2 (LIKE testpub_parted);
ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2);
-- works despite missing REPLICA IDENTITY, because updates are not replicated
UPDATE testpub_parted1 SET a = 1;
-ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
-- only parent is listed as being in publication, not the partition
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
\dRp+ testpub_forparted
@@ -156,7 +158,14 @@ ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
Tables:
"public.testpub_parted"
-DROP TABLE testpub_parted1;
+-- still fail, because parent's publication replicates updates
+UPDATE testpub_parted2 SET a = 2;
+ERROR: cannot update table "testpub_parted2" because it does not have a replica identity and publishes updates
+HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
+ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
+-- works again, because update is no longer replicated
+UPDATE testpub_parted2 SET a = 2;
+DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
-- Test cache invalidation FOR ALL TABLES publication
SET client_min_messages = 'ERROR';
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 04b34ee2998..e5745d575b0 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -76,10 +76,12 @@ CREATE PUBLICATION testpub_forparted;
CREATE PUBLICATION testpub_forparted1;
RESET client_min_messages;
CREATE TABLE testpub_parted1 (LIKE testpub_parted);
+CREATE TABLE testpub_parted2 (LIKE testpub_parted);
ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2);
-- works despite missing REPLICA IDENTITY, because updates are not replicated
UPDATE testpub_parted1 SET a = 1;
-ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
-- only parent is listed as being in publication, not the partition
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
\dRp+ testpub_forparted
@@ -90,7 +92,12 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
UPDATE testpub_parted1 SET a = 1;
ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
\dRp+ testpub_forparted
-DROP TABLE testpub_parted1;
+-- still fail, because parent's publication replicates updates
+UPDATE testpub_parted2 SET a = 2;
+ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
+-- works again, because update is no longer replicated
+UPDATE testpub_parted2 SET a = 2;
+DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
-- Test cache invalidation FOR ALL TABLES publication