aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/relation.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/relation.c')
-rw-r--r--src/backend/replication/logical/relation.c189
1 files changed, 189 insertions, 0 deletions
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 3d7291b9703..2e7b755aebb 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -35,6 +35,24 @@ static MemoryContext LogicalRepRelMapContext = NULL;
static HTAB *LogicalRepRelMap = NULL;
static HTAB *LogicalRepTypMap = NULL;
+/*
+ * Partition map (LogicalRepPartMap)
+ *
+ * When a partitioned table is used as replication target, replicated
+ * operations are actually performed on its leaf partitions, which requires
+ * the partitions to also be mapped to the remote relation. Parent's entry
+ * (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because
+ * individual partitions may have different attribute numbers, which means
+ * attribute mappings to remote relation's attributes must be maintained
+ * separately for each partition.
+ */
+static MemoryContext LogicalRepPartMapContext = NULL;
+static HTAB *LogicalRepPartMap = NULL;
+typedef struct LogicalRepPartMapEntry
+{
+ Oid partoid; /* LogicalRepPartMap's key */
+ LogicalRepRelMapEntry relmapentry;
+} LogicalRepPartMapEntry;
/*
* Relcache invalidation callback for our relation map cache.
@@ -472,3 +490,174 @@ logicalrep_typmap_gettypname(Oid remoteid)
Assert(OidIsValid(entry->remoteid));
return psprintf("%s.%s", entry->nspname, entry->typname);
}
+
+/*
+ * Partition cache: look up partition LogicalRepRelMapEntry's
+ *
+ * Unlike relation map cache, this is keyed by partition OID, not remote
+ * relation OID, because we only have to use this cache in the case where
+ * partitions are not directly mapped to any remote relation, such as when
+ * replication is occurring with one of their ancestors as target.
+ */
+
+/*
+ * Relcache invalidation callback
+ */
+static void
+logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
+{
+ LogicalRepRelMapEntry *entry;
+
+ /* Just to be sure. */
+ if (LogicalRepPartMap == NULL)
+ return;
+
+ if (reloid != InvalidOid)
+ {
+ HASH_SEQ_STATUS status;
+
+ hash_seq_init(&status, LogicalRepPartMap);
+
+ /* TODO, use inverse lookup hashtable? */
+ while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+ {
+ if (entry->localreloid == reloid)
+ {
+ entry->localreloid = InvalidOid;
+ hash_seq_term(&status);
+ break;
+ }
+ }
+ }
+ else
+ {
+ /* invalidate all cache entries */
+ HASH_SEQ_STATUS status;
+
+ hash_seq_init(&status, LogicalRepPartMap);
+
+ while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+ entry->localreloid = InvalidOid;
+ }
+}
+
+/*
+ * Initialize the partition map cache.
+ */
+static void
+logicalrep_partmap_init(void)
+{
+ HASHCTL ctl;
+
+ if (!LogicalRepPartMapContext)
+ LogicalRepPartMapContext =
+ AllocSetContextCreate(CacheMemoryContext,
+ "LogicalRepPartMapContext",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /* Initialize the relation hash table. */
+ MemSet(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(Oid); /* partition OID */
+ ctl.entrysize = sizeof(LogicalRepPartMapEntry);
+ ctl.hcxt = LogicalRepPartMapContext;
+
+ LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+ /* Watch for invalidation events. */
+ CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb,
+ (Datum) 0);
+}
+
+/*
+ * logicalrep_partition_open
+ *
+ * Returned entry reuses most of the values of the root table's entry, save
+ * the attribute map, which can be different for the partition.
+ *
+ * Note there's no logialrep_partition_close, because the caller closes the
+ * the component relation.
+ */
+LogicalRepRelMapEntry *
+logicalrep_partition_open(LogicalRepRelMapEntry *root,
+ Relation partrel, AttrMap *map)
+{
+ LogicalRepRelMapEntry *entry;
+ LogicalRepPartMapEntry *part_entry;
+ LogicalRepRelation *remoterel = &root->remoterel;
+ Oid partOid = RelationGetRelid(partrel);
+ AttrMap *attrmap = root->attrmap;
+ bool found;
+ int i;
+ MemoryContext oldctx;
+
+ if (LogicalRepPartMap == NULL)
+ logicalrep_partmap_init();
+
+ /* Search for existing entry. */
+ part_entry = (LogicalRepPartMapEntry *) hash_search(LogicalRepPartMap,
+ (void *) &partOid,
+ HASH_ENTER, &found);
+
+ if (found)
+ return &part_entry->relmapentry;
+
+ memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
+
+ /* Switch to longer-lived context. */
+ oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
+
+ part_entry->partoid = partOid;
+
+ /* Remote relation is used as-is from the root entry. */
+ entry = &part_entry->relmapentry;
+ entry->remoterel.remoteid = remoterel->remoteid;
+ entry->remoterel.nspname = pstrdup(remoterel->nspname);
+ entry->remoterel.relname = pstrdup(remoterel->relname);
+ entry->remoterel.natts = remoterel->natts;
+ entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
+ entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
+ for (i = 0; i < remoterel->natts; i++)
+ {
+ entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
+ entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+ }
+ entry->remoterel.replident = remoterel->replident;
+ entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+
+ entry->localrel = partrel;
+ entry->localreloid = partOid;
+
+ /*
+ * If the partition's attributes don't match the root relation's, we'll
+ * need to make a new attrmap which maps partition attribute numbers to
+ * remoterel's, instead the original which maps root relation's attribute
+ * numbers to remoterel's.
+ *
+ * Note that 'map' which comes from the tuple routing data structure
+ * contains 1-based attribute numbers (of the parent relation). However,
+ * the map in 'entry', a logical replication data structure, contains
+ * 0-based attribute numbers (of the remote relation).
+ */
+ if (map)
+ {
+ AttrNumber attno;
+
+ entry->attrmap = make_attrmap(map->maplen);
+ for (attno = 0; attno < entry->attrmap->maplen; attno++)
+ {
+ AttrNumber root_attno = map->attnums[attno];
+
+ entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
+ }
+ }
+ else
+ entry->attrmap = attrmap;
+
+ entry->updatable = root->updatable;
+
+ /* state and statelsn are left set to 0. */
+ MemoryContextSwitchTo(oldctx);
+
+ return entry;
+}