diff options
Diffstat (limited to 'src/backend/replication/logical/relation.c')
-rw-r--r-- | src/backend/replication/logical/relation.c | 189 |
1 files changed, 189 insertions, 0 deletions
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 3d7291b9703..2e7b755aebb 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -35,6 +35,24 @@ static MemoryContext LogicalRepRelMapContext = NULL; static HTAB *LogicalRepRelMap = NULL; static HTAB *LogicalRepTypMap = NULL; +/* + * Partition map (LogicalRepPartMap) + * + * When a partitioned table is used as replication target, replicated + * operations are actually performed on its leaf partitions, which requires + * the partitions to also be mapped to the remote relation. Parent's entry + * (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because + * individual partitions may have different attribute numbers, which means + * attribute mappings to remote relation's attributes must be maintained + * separately for each partition. + */ +static MemoryContext LogicalRepPartMapContext = NULL; +static HTAB *LogicalRepPartMap = NULL; +typedef struct LogicalRepPartMapEntry +{ + Oid partoid; /* LogicalRepPartMap's key */ + LogicalRepRelMapEntry relmapentry; +} LogicalRepPartMapEntry; /* * Relcache invalidation callback for our relation map cache. @@ -472,3 +490,174 @@ logicalrep_typmap_gettypname(Oid remoteid) Assert(OidIsValid(entry->remoteid)); return psprintf("%s.%s", entry->nspname, entry->typname); } + +/* + * Partition cache: look up partition LogicalRepRelMapEntry's + * + * Unlike relation map cache, this is keyed by partition OID, not remote + * relation OID, because we only have to use this cache in the case where + * partitions are not directly mapped to any remote relation, such as when + * replication is occurring with one of their ancestors as target. + */ + +/* + * Relcache invalidation callback + */ +static void +logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) +{ + LogicalRepRelMapEntry *entry; + + /* Just to be sure. */ + if (LogicalRepPartMap == NULL) + return; + + if (reloid != InvalidOid) + { + HASH_SEQ_STATUS status; + + hash_seq_init(&status, LogicalRepPartMap); + + /* TODO, use inverse lookup hashtable? */ + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + { + if (entry->localreloid == reloid) + { + entry->localreloid = InvalidOid; + hash_seq_term(&status); + break; + } + } + } + else + { + /* invalidate all cache entries */ + HASH_SEQ_STATUS status; + + hash_seq_init(&status, LogicalRepPartMap); + + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + entry->localreloid = InvalidOid; + } +} + +/* + * Initialize the partition map cache. + */ +static void +logicalrep_partmap_init(void) +{ + HASHCTL ctl; + + if (!LogicalRepPartMapContext) + LogicalRepPartMapContext = + AllocSetContextCreate(CacheMemoryContext, + "LogicalRepPartMapContext", + ALLOCSET_DEFAULT_SIZES); + + /* Initialize the relation hash table. */ + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); /* partition OID */ + ctl.entrysize = sizeof(LogicalRepPartMapEntry); + ctl.hcxt = LogicalRepPartMapContext; + + LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + /* Watch for invalidation events. */ + CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb, + (Datum) 0); +} + +/* + * logicalrep_partition_open + * + * Returned entry reuses most of the values of the root table's entry, save + * the attribute map, which can be different for the partition. + * + * Note there's no logialrep_partition_close, because the caller closes the + * the component relation. + */ +LogicalRepRelMapEntry * +logicalrep_partition_open(LogicalRepRelMapEntry *root, + Relation partrel, AttrMap *map) +{ + LogicalRepRelMapEntry *entry; + LogicalRepPartMapEntry *part_entry; + LogicalRepRelation *remoterel = &root->remoterel; + Oid partOid = RelationGetRelid(partrel); + AttrMap *attrmap = root->attrmap; + bool found; + int i; + MemoryContext oldctx; + + if (LogicalRepPartMap == NULL) + logicalrep_partmap_init(); + + /* Search for existing entry. */ + part_entry = (LogicalRepPartMapEntry *) hash_search(LogicalRepPartMap, + (void *) &partOid, + HASH_ENTER, &found); + + if (found) + return &part_entry->relmapentry; + + memset(part_entry, 0, sizeof(LogicalRepPartMapEntry)); + + /* Switch to longer-lived context. */ + oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext); + + part_entry->partoid = partOid; + + /* Remote relation is used as-is from the root entry. */ + entry = &part_entry->relmapentry; + entry->remoterel.remoteid = remoterel->remoteid; + entry->remoterel.nspname = pstrdup(remoterel->nspname); + entry->remoterel.relname = pstrdup(remoterel->relname); + entry->remoterel.natts = remoterel->natts; + entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); + entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); + for (i = 0; i < remoterel->natts; i++) + { + entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); + entry->remoterel.atttyps[i] = remoterel->atttyps[i]; + } + entry->remoterel.replident = remoterel->replident; + entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + + entry->localrel = partrel; + entry->localreloid = partOid; + + /* + * If the partition's attributes don't match the root relation's, we'll + * need to make a new attrmap which maps partition attribute numbers to + * remoterel's, instead the original which maps root relation's attribute + * numbers to remoterel's. + * + * Note that 'map' which comes from the tuple routing data structure + * contains 1-based attribute numbers (of the parent relation). However, + * the map in 'entry', a logical replication data structure, contains + * 0-based attribute numbers (of the remote relation). + */ + if (map) + { + AttrNumber attno; + + entry->attrmap = make_attrmap(map->maplen); + for (attno = 0; attno < entry->attrmap->maplen; attno++) + { + AttrNumber root_attno = map->attnums[attno]; + + entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1]; + } + } + else + entry->attrmap = attrmap; + + entry->updatable = root->updatable; + + /* state and statelsn are left set to 0. */ + MemoryContextSwitchTo(oldctx); + + return entry; +} |