aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/relation.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/relation.c')
-rw-r--r--src/backend/replication/logical/relation.c71
1 files changed, 44 insertions, 27 deletions
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 3d2d56295b0..9ee70a2563e 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -77,7 +77,7 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
{
if (entry->localreloid == reloid)
{
- entry->localreloid = InvalidOid;
+ entry->localrelvalid = false;
hash_seq_term(&status);
break;
}
@@ -91,7 +91,7 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
hash_seq_init(&status, LogicalRepRelMap);
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
- entry->localreloid = InvalidOid;
+ entry->localrelvalid = false;
}
}
@@ -230,15 +230,13 @@ logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
/*
* Open the local relation associated with the remote one.
*
- * Optionally rebuilds the Relcache mapping if it was invalidated
- * by local DDL.
+ * Rebuilds the Relcache mapping if it was invalidated by local DDL.
*/
LogicalRepRelMapEntry *
logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
{
LogicalRepRelMapEntry *entry;
bool found;
- Oid relid = InvalidOid;
LogicalRepRelation *remoterel;
if (LogicalRepRelMap == NULL)
@@ -254,14 +252,45 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
remoterel = &entry->remoterel;
+ /* Ensure we don't leak a relcache refcount. */
+ if (entry->localrel)
+ elog(ERROR, "remote relation ID %u is already open", remoteid);
+
/*
* When opening and locking a relation, pending invalidation messages are
- * processed which can invalidate the relation. We need to update the
- * local cache both when we are first time accessing the relation and when
- * the relation is invalidated (aka entry->localreloid is set InvalidOid).
+ * processed which can invalidate the relation. Hence, if the entry is
+ * currently considered valid, try to open the local relation by OID and
+ * see if invalidation ensues.
+ */
+ if (entry->localrelvalid)
+ {
+ entry->localrel = try_table_open(entry->localreloid, lockmode);
+ if (!entry->localrel)
+ {
+ /* Table was renamed or dropped. */
+ entry->localrelvalid = false;
+ }
+ else if (!entry->localrelvalid)
+ {
+ /* Note we release the no-longer-useful lock here. */
+ table_close(entry->localrel, lockmode);
+ entry->localrel = NULL;
+ }
+ }
+
+ /*
+ * If the entry has been marked invalid since we last had lock on it,
+ * re-open the local relation by name and rebuild all derived data.
*/
- if (!OidIsValid(entry->localreloid))
+ if (!entry->localrelvalid)
{
+ Oid relid;
+ int found;
+ Bitmapset *idkey;
+ TupleDesc desc;
+ MemoryContext oldctx;
+ int i;
+
/* Try to find and lock the relation by name. */
relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
remoterel->relname, -1),
@@ -272,21 +301,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
errmsg("logical replication target relation \"%s.%s\" does not exist",
remoterel->nspname, remoterel->relname)));
entry->localrel = table_open(relid, NoLock);
-
- }
- else
- {
- relid = entry->localreloid;
- entry->localrel = table_open(entry->localreloid, lockmode);
- }
-
- if (!OidIsValid(entry->localreloid))
- {
- int found;
- Bitmapset *idkey;
- TupleDesc desc;
- MemoryContext oldctx;
- int i;
+ entry->localreloid = relid;
/* Check for supported relkind. */
CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
@@ -380,7 +395,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
}
}
- entry->localreloid = relid;
+ entry->localrelvalid = true;
}
if (entry->state != SUBREL_STATE_READY)
@@ -523,7 +538,7 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
{
if (entry->localreloid == reloid)
{
- entry->localreloid = InvalidOid;
+ entry->localrelvalid = false;
hash_seq_term(&status);
break;
}
@@ -537,7 +552,7 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
hash_seq_init(&status, LogicalRepPartMap);
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
- entry->localreloid = InvalidOid;
+ entry->localrelvalid = false;
}
}
@@ -656,6 +671,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
entry->updatable = root->updatable;
+ entry->localrelvalid = true;
+
/* state and statelsn are left set to 0. */
MemoryContextSwitchTo(oldctx);