diff options
-rw-r--r-- | src/backend/replication/logical/relation.c | 71 | ||||
-rw-r--r-- | src/include/replication/logicalrelation.h | 11 |
2 files changed, 53 insertions, 29 deletions
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 3d2d56295b0..9ee70a2563e 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -77,7 +77,7 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) { if (entry->localreloid == reloid) { - entry->localreloid = InvalidOid; + entry->localrelvalid = false; hash_seq_term(&status); break; } @@ -91,7 +91,7 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) hash_seq_init(&status, LogicalRepRelMap); while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) - entry->localreloid = InvalidOid; + entry->localrelvalid = false; } } @@ -230,15 +230,13 @@ logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname) /* * Open the local relation associated with the remote one. * - * Optionally rebuilds the Relcache mapping if it was invalidated - * by local DDL. + * Rebuilds the Relcache mapping if it was invalidated by local DDL. */ LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) { LogicalRepRelMapEntry *entry; bool found; - Oid relid = InvalidOid; LogicalRepRelation *remoterel; if (LogicalRepRelMap == NULL) @@ -254,14 +252,45 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) remoterel = &entry->remoterel; + /* Ensure we don't leak a relcache refcount. */ + if (entry->localrel) + elog(ERROR, "remote relation ID %u is already open", remoteid); + /* * When opening and locking a relation, pending invalidation messages are - * processed which can invalidate the relation. We need to update the - * local cache both when we are first time accessing the relation and when - * the relation is invalidated (aka entry->localreloid is set InvalidOid). + * processed which can invalidate the relation. Hence, if the entry is + * currently considered valid, try to open the local relation by OID and + * see if invalidation ensues. + */ + if (entry->localrelvalid) + { + entry->localrel = try_table_open(entry->localreloid, lockmode); + if (!entry->localrel) + { + /* Table was renamed or dropped. */ + entry->localrelvalid = false; + } + else if (!entry->localrelvalid) + { + /* Note we release the no-longer-useful lock here. */ + table_close(entry->localrel, lockmode); + entry->localrel = NULL; + } + } + + /* + * If the entry has been marked invalid since we last had lock on it, + * re-open the local relation by name and rebuild all derived data. */ - if (!OidIsValid(entry->localreloid)) + if (!entry->localrelvalid) { + Oid relid; + int found; + Bitmapset *idkey; + TupleDesc desc; + MemoryContext oldctx; + int i; + /* Try to find and lock the relation by name. */ relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname, remoterel->relname, -1), @@ -272,21 +301,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) errmsg("logical replication target relation \"%s.%s\" does not exist", remoterel->nspname, remoterel->relname))); entry->localrel = table_open(relid, NoLock); - - } - else - { - relid = entry->localreloid; - entry->localrel = table_open(entry->localreloid, lockmode); - } - - if (!OidIsValid(entry->localreloid)) - { - int found; - Bitmapset *idkey; - TupleDesc desc; - MemoryContext oldctx; - int i; + entry->localreloid = relid; /* Check for supported relkind. */ CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind, @@ -380,7 +395,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) } } - entry->localreloid = relid; + entry->localrelvalid = true; } if (entry->state != SUBREL_STATE_READY) @@ -523,7 +538,7 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) { if (entry->localreloid == reloid) { - entry->localreloid = InvalidOid; + entry->localrelvalid = false; hash_seq_term(&status); break; } @@ -537,7 +552,7 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) hash_seq_init(&status, LogicalRepPartMap); while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) - entry->localreloid = InvalidOid; + entry->localrelvalid = false; } } @@ -656,6 +671,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, entry->updatable = root->updatable; + entry->localrelvalid = true; + /* state and statelsn are left set to 0. */ MemoryContextSwitchTo(oldctx); diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index a6b44b12bd1..62ddd3c7a2a 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -19,9 +19,16 @@ typedef struct LogicalRepRelMapEntry { LogicalRepRelation remoterel; /* key is remoterel.remoteid */ - /* Mapping to local relation, filled as needed. */ + /* + * Validity flag -- when false, revalidate all derived info at next + * logicalrep_rel_open. (While the localrel is open, we assume our lock + * on that rel ensures the info remains good.) + */ + bool localrelvalid; + + /* Mapping to local relation. */ Oid localreloid; /* local relation id */ - Relation localrel; /* relcache entry */ + Relation localrel; /* relcache entry (NULL when closed) */ AttrMap *attrmap; /* map of local attributes to remote ones */ bool updatable; /* Can apply updates/deletes? */ |