diff options
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 115 |
1 files changed, 68 insertions, 47 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index af8d51aee99..6df705f90ff 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -108,11 +108,13 @@ typedef struct RelationSyncEntry { Oid relid; /* relation oid */ + bool replicate_valid; /* overall validity flag for entry */ + bool schema_sent; List *streamed_txns; /* streamed toplevel transactions with this * schema */ - bool replicate_valid; + /* are we publishing this rel? */ PublicationActions pubactions; /* @@ -903,7 +905,9 @@ LoadPublications(List *pubnames) } /* - * Publication cache invalidation callback. + * Publication syscache invalidation callback. + * + * Called for invalidations on pg_publication. */ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue) @@ -1130,13 +1134,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) HASH_ENTER, &found); Assert(entry != NULL); - /* Not found means schema wasn't sent */ + /* initialize entry, if it's new */ if (!found) { - /* immediately make a new entry valid enough to satisfy callbacks */ + entry->replicate_valid = false; entry->schema_sent = false; entry->streamed_txns = NIL; - entry->replicate_valid = false; entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->publish_as_relid = InvalidOid; @@ -1166,14 +1169,41 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { oldctx = MemoryContextSwitchTo(CacheMemoryContext); if (data->publications) + { list_free_deep(data->publications); - + data->publications = NIL; + } data->publications = LoadPublications(data->publication_names); MemoryContextSwitchTo(oldctx); publications_valid = true; } /* + * Reset schema_sent status as the relation definition may have + * changed. Also reset pubactions to empty in case rel was dropped + * from a publication. Also free any objects that depended on the + * earlier definition. + */ + entry->schema_sent = false; + list_free(entry->streamed_txns); + entry->streamed_txns = NIL; + entry->pubactions.pubinsert = false; + entry->pubactions.pubupdate = false; + entry->pubactions.pubdelete = false; + entry->pubactions.pubtruncate = false; + if (entry->map) + { + /* + * Must free the TupleDescs contained in the map explicitly, + * because free_conversion_map() doesn't. + */ + FreeTupleDesc(entry->map->indesc); + FreeTupleDesc(entry->map->outdesc); + free_conversion_map(entry->map); + } + entry->map = NULL; + + /* * Build publication cache. We can't use one provided by relcache as * relcache considers all publications given relation is in, but here * we only need to consider ones that the subscriber requested. @@ -1212,16 +1242,18 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) foreach(lc2, ancestors) { Oid ancestor = lfirst_oid(lc2); + List *apubids = GetRelationPublications(ancestor); + List *aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor)); - if (list_member_oid(GetRelationPublications(ancestor), - pub->oid) || - list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)), - pub->oid)) + if (list_member_oid(apubids, pub->oid) || + list_member_oid(aschemaPubids, pub->oid)) { ancestor_published = true; if (pub->pubviaroot) publish_as_relid = ancestor; } + list_free(apubids); + list_free(aschemaPubids); } } @@ -1251,6 +1283,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) } list_free(pubids); + list_free(schemaPubids); entry->publish_as_relid = publish_as_relid; entry->replicate_valid = true; @@ -1322,43 +1355,40 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) /* * Nobody keeps pointers to entries in this hash table around outside * logical decoding callback calls - but invalidation events can come in - * *during* a callback if we access the relcache in the callback. Because - * of that we must mark the cache entry as invalid but not remove it from - * the hash while it could still be referenced, then prune it at a later - * safe point. - * - * Getting invalidations for relations that aren't in the table is - * entirely normal, since there's no way to unregister for an invalidation - * event. So we don't care if it's found or not. + * *during* a callback if we do any syscache access in the callback. + * Because of that we must mark the cache entry as invalid but not damage + * any of its substructure here. The next get_rel_sync_entry() call will + * rebuild it all. */ - entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid, - HASH_FIND, NULL); - - /* - * Reset schema sent status as the relation definition may have changed. - * Also free any objects that depended on the earlier definition. - */ - if (entry != NULL) + if (OidIsValid(relid)) { - entry->schema_sent = false; - list_free(entry->streamed_txns); - entry->streamed_txns = NIL; - if (entry->map) + /* + * Getting invalidations for relations that aren't in the table is + * entirely normal. So we don't care if it's found or not. + */ + entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid, + HASH_FIND, NULL); + if (entry != NULL) + entry->replicate_valid = false; + } + else + { + /* Whole cache must be flushed. */ + HASH_SEQ_STATUS status; + + hash_seq_init(&status, RelationSyncCache); + while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) { - /* - * Must free the TupleDescs contained in the map explicitly, - * because free_conversion_map() doesn't. - */ - FreeTupleDesc(entry->map->indesc); - FreeTupleDesc(entry->map->outdesc); - free_conversion_map(entry->map); + entry->replicate_valid = false; } - entry->map = NULL; } } /* * Publication relation/schema map syscache invalidation callback + * + * Called for invalidations on pg_publication, pg_publication_rel, and + * pg_publication_namespace. */ static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) @@ -1382,15 +1412,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) { entry->replicate_valid = false; - - /* - * There might be some relations dropped from the publication so we - * don't need to publish the changes for them. - */ - entry->pubactions.pubinsert = false; - entry->pubactions.pubupdate = false; - entry->pubactions.pubdelete = false; - entry->pubactions.pubtruncate = false; } } |