aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/relation.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/relation.c')
-rw-r--r--src/backend/replication/logical/relation.c489
1 files changed, 489 insertions, 0 deletions
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
new file mode 100644
index 00000000000..383c6ebe76a
--- /dev/null
+++ b/src/backend/replication/logical/relation.c
@@ -0,0 +1,489 @@
+/*-------------------------------------------------------------------------
+ * relation.c
+ * PostgreSQL logical replication
+ *
+ * Copyright (c) 2012-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/relation.c
+ *
+ * NOTES
+ * This file contains helper functions for logical replication relation
+ * mapping cache.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/sysattr.h"
+#include "catalog/namespace.h"
+#include "nodes/makefuncs.h"
+#include "replication/logicalrelation.h"
+#include "replication/worker_internal.h"
+#include "utils/builtins.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+
+static MemoryContext LogicalRepRelMapContext = NULL;
+
+static HTAB *LogicalRepRelMap = NULL;
+static HTAB *LogicalRepTypMap = NULL;
+
+static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
+ uint32 hashvalue);
+
+/*
+ * Relcache invalidation callback for our relation map cache.
+ */
+static void
+logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
+{
+ LogicalRepRelMapEntry *entry;
+
+ /* Just to be sure. */
+ if (LogicalRepRelMap == NULL)
+ return;
+
+ if (reloid != InvalidOid)
+ {
+ HASH_SEQ_STATUS status;
+
+ hash_seq_init(&status, LogicalRepRelMap);
+
+ /* TODO, use inverse lookup hashtable? */
+ while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+ {
+ if (entry->localreloid == reloid)
+ {
+ entry->localreloid = InvalidOid;
+ hash_seq_term(&status);
+ break;
+ }
+ }
+ }
+ else
+ {
+ /* invalidate all cache entries */
+ HASH_SEQ_STATUS status;
+
+ hash_seq_init(&status, LogicalRepRelMap);
+
+ while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+ entry->localreloid = InvalidOid;
+ }
+}
+
+/*
+ * Initialize the relation map cache.
+ */
+static void
+logicalrep_relmap_init()
+{
+ HASHCTL ctl;
+
+ if (!LogicalRepRelMapContext)
+ LogicalRepRelMapContext =
+ AllocSetContextCreate(CacheMemoryContext,
+ "LogicalRepRelMapContext",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /* Initialize the relation hash table. */
+ MemSet(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(LogicalRepRelId);
+ ctl.entrysize = sizeof(LogicalRepRelMapEntry);
+ ctl.hcxt = LogicalRepRelMapContext;
+
+ LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+ /* Initialize the type hash table. */
+ MemSet(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(Oid);
+ ctl.entrysize = sizeof(LogicalRepTyp);
+ ctl.hcxt = LogicalRepRelMapContext;
+
+ /* This will usually be small. */
+ LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl,
+ HASH_ELEM | HASH_BLOBS |HASH_CONTEXT);
+
+ /* Watch for invalidation events. */
+ CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
+ (Datum) 0);
+ CacheRegisterSyscacheCallback(TYPEOID, logicalrep_typmap_invalidate_cb,
+ (Datum) 0);
+}
+
+/*
+ * Free the entry of a relation map cache.
+ */
+static void
+logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
+{
+ LogicalRepRelation *remoterel;
+
+ remoterel = &entry->remoterel;
+
+ pfree(remoterel->nspname);
+ pfree(remoterel->relname);
+
+ if (remoterel->natts > 0)
+ {
+ int i;
+
+ for (i = 0; i < remoterel->natts; i++)
+ pfree(remoterel->attnames[i]);
+
+ pfree(remoterel->attnames);
+ pfree(remoterel->atttyps);
+ }
+ remoterel->attnames = NULL;
+ remoterel->atttyps = NULL;
+
+ bms_free(remoterel->attkeys);
+ remoterel->attkeys = NULL;
+
+ if (entry->attrmap)
+ pfree(entry->attrmap);
+
+ entry->attrmap = NULL;
+ remoterel->natts = 0;
+ entry->localreloid = InvalidOid;
+ entry->localrel = NULL;
+}
+
+/*
+ * Add new entry or update existing entry in the relation map cache.
+ *
+ * Called when new relation mapping is sent by the publisher to update
+ * our expected view of incoming data from said publisher.
+ */
+void
+logicalrep_relmap_update(LogicalRepRelation *remoterel)
+{
+ MemoryContext oldctx;
+ LogicalRepRelMapEntry *entry;
+ bool found;
+ int i;
+
+ if (LogicalRepRelMap == NULL)
+ logicalrep_relmap_init();
+
+ /*
+ * HASH_ENTER returns the existing entry if present or creates a new one.
+ */
+ entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
+ HASH_ENTER, &found);
+
+ if (found)
+ logicalrep_relmap_free_entry(entry);
+
+ /* Make cached copy of the data */
+ oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
+ entry->remoterel.remoteid = remoterel->remoteid;
+ entry->remoterel.nspname = pstrdup(remoterel->nspname);
+ entry->remoterel.relname = pstrdup(remoterel->relname);
+ entry->remoterel.natts = remoterel->natts;
+ entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
+ entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
+ for (i = 0; i < remoterel->natts; i++)
+ {
+ entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
+ entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+ }
+ entry->remoterel.replident = remoterel->replident;
+ entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+ entry->attrmap = NULL;
+ entry->localreloid = InvalidOid;
+ MemoryContextSwitchTo(oldctx);
+}
+
+/*
+ * Find attribute index in TupleDesc struct by attribute name.
+ *
+ * Returns -1 if not found.
+ */
+static int
+logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
+{
+ int i;
+
+ for (i = 0; i < remoterel->natts; i++)
+ {
+ if (strcmp(remoterel->attnames[i], attname) == 0)
+ return i;
+ }
+
+ return -1;
+}
+
+/*
+ * Open the local relation associated with the remote one.
+ *
+ * Optionally rebuilds the Relcache mapping if it was invalidated
+ * by local DDL.
+ */
+LogicalRepRelMapEntry *
+logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
+{
+ LogicalRepRelMapEntry *entry;
+ bool found;
+
+ if (LogicalRepRelMap == NULL)
+ logicalrep_relmap_init();
+
+ /* Search for existing entry. */
+ entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
+ HASH_FIND, &found);
+
+ if (!found)
+ elog(ERROR, "no relation map entry for remote relation ID %u",
+ remoteid);
+
+ /* Need to update the local cache? */
+ if (!OidIsValid(entry->localreloid))
+ {
+ Oid relid;
+ int i;
+ int found;
+ Bitmapset *idkey;
+ TupleDesc desc;
+ LogicalRepRelation *remoterel;
+ MemoryContext oldctx;
+ remoterel = &entry->remoterel;
+
+ /* Try to find and lock the relation by name. */
+ relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
+ remoterel->relname, -1),
+ lockmode, true);
+ if (!OidIsValid(relid))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication target relation \"%s.%s\" does not exist",
+ remoterel->nspname, remoterel->relname)));
+ entry->localrel = heap_open(relid, NoLock);
+
+ /*
+ * We currently only support writing to regular and partitioned
+ * tables.
+ */
+ if (entry->localrel->rd_rel->relkind != RELKIND_RELATION)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("logical replication target relation \"%s.%s\" is not a table",
+ remoterel->nspname, remoterel->relname)));
+
+ /*
+ * Build the mapping of local attribute numbers to remote attribute
+ * numbers and validate that we don't miss any replicated columns
+ * as that would result in potentially unwanted data loss.
+ */
+ desc = RelationGetDescr(entry->localrel);
+ oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
+ entry->attrmap = palloc(desc->natts * sizeof(int));
+ MemoryContextSwitchTo(oldctx);
+
+ found = 0;
+ for (i = 0; i < desc->natts; i++)
+ {
+ int attnum = logicalrep_rel_att_by_name(remoterel,
+ NameStr(desc->attrs[i]->attname));
+ entry->attrmap[i] = attnum;
+ if (attnum >= 0)
+ found++;
+ }
+
+ /* TODO, detail message with names of missing columns */
+ if (found < remoterel->natts)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication target relation \"%s.%s\" is missing "
+ "some replicated columns",
+ remoterel->nspname, remoterel->relname)));
+
+ /*
+ * Check that replica identity matches. We allow for stricter replica
+ * identity (fewer columns) on subscriber as that will not stop us
+ * from finding unique tuple. IE, if publisher has identity
+ * (id,timestamp) and subscriber just (id) this will not be a problem,
+ * but in the opposite scenario it will.
+ *
+ * Don't throw any error here just mark the relation entry as not
+ * updatable, as replica identity is only for updates and deletes
+ * but inserts can be replicated even without it.
+ */
+ entry->updatable = true;
+ idkey = RelationGetIndexAttrBitmap(entry->localrel,
+ INDEX_ATTR_BITMAP_IDENTITY_KEY);
+ /* fallback to PK if no replica identity */
+ if (idkey == NULL)
+ {
+ idkey = RelationGetIndexAttrBitmap(entry->localrel,
+ INDEX_ATTR_BITMAP_PRIMARY_KEY);
+ /*
+ * If no replica identity index and no PK, the published table
+ * must have replica identity FULL.
+ */
+ if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
+ entry->updatable = false;
+ }
+
+ i = -1;
+ while ((i = bms_next_member(idkey, i)) >= 0)
+ {
+ int attnum = i + FirstLowInvalidHeapAttributeNumber;
+
+ if (!AttrNumberIsForUserDefinedAttr(attnum))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication target relation \"%s.%s\" uses "
+ "system columns in REPLICA IDENTITY index",
+ remoterel->nspname, remoterel->relname)));
+
+ attnum = AttrNumberGetAttrOffset(attnum);
+
+ if (!bms_is_member(entry->attrmap[attnum], remoterel->attkeys))
+ {
+ entry->updatable = false;
+ break;
+ }
+ }
+
+ entry->localreloid = relid;
+ }
+ else
+ entry->localrel = heap_open(entry->localreloid, lockmode);
+
+ return entry;
+}
+
+/*
+ * Close the previously opened logical relation.
+ */
+void
+logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
+{
+ heap_close(rel->localrel, lockmode);
+ rel->localrel = NULL;
+}
+
+
+/*
+ * Type cache invalidation callback for our type map cache.
+ */
+static void
+logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+ HASH_SEQ_STATUS status;
+ LogicalRepTyp *entry;
+
+ /* Just to be sure. */
+ if (LogicalRepTypMap == NULL)
+ return;
+
+ /* invalidate all cache entries */
+ hash_seq_init(&status, LogicalRepTypMap);
+
+ while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
+ entry->typoid = InvalidOid;
+}
+
+/*
+ * Free the type map cache entry data.
+ */
+static void
+logicalrep_typmap_free_entry(LogicalRepTyp *entry)
+{
+ pfree(entry->nspname);
+ pfree(entry->typname);
+
+ entry->typoid = InvalidOid;
+}
+
+/*
+ * Add new entry or update existing entry in the type map cache.
+ */
+void
+logicalrep_typmap_update(LogicalRepTyp *remotetyp)
+{
+ MemoryContext oldctx;
+ LogicalRepTyp *entry;
+ bool found;
+
+ if (LogicalRepTypMap == NULL)
+ logicalrep_relmap_init();
+
+ /*
+ * HASH_ENTER returns the existing entry if present or creates a new one.
+ */
+ entry = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid,
+ HASH_ENTER, &found);
+
+ if (found)
+ logicalrep_typmap_free_entry(entry);
+
+ /* Make cached copy of the data */
+ entry->remoteid = remotetyp->remoteid;
+ oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
+ entry->nspname = pstrdup(remotetyp->nspname);
+ entry->typname = pstrdup(remotetyp->typname);
+ MemoryContextSwitchTo(oldctx);
+ entry->typoid = InvalidOid;
+}
+
+/*
+ * Fetch type info from the cache.
+ */
+Oid
+logicalrep_typmap_getid(Oid remoteid)
+{
+ LogicalRepTyp *entry;
+ bool found;
+ Oid nspoid;
+
+ /* Internal types are mapped directly. */
+ if (remoteid < FirstNormalObjectId)
+ {
+ if (!get_typisdefined(remoteid))
+ ereport(ERROR,
+ (errmsg("builtin type %u not found", remoteid),
+ errhint("This can be caused by having publisher with "
+ "higher major version than subscriber")));
+ return remoteid;
+ }
+
+ if (LogicalRepTypMap == NULL)
+ logicalrep_relmap_init();
+
+ /* Try finding the mapping. */
+ entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
+ HASH_FIND, &found);
+
+ if (!found)
+ elog(ERROR, "no type map entry for remote type %u",
+ remoteid);
+
+ /* Found and mapped, return the oid. */
+ if (OidIsValid(entry->typoid))
+ return entry->typoid;
+
+ /* Otherwise, try to map to local type. */
+ nspoid = LookupExplicitNamespace(entry->nspname, true);
+ if (OidIsValid(nspoid))
+ entry->typoid = GetSysCacheOid2(TYPENAMENSP,
+ PointerGetDatum(entry->typname),
+ ObjectIdGetDatum(nspoid));
+ else
+ entry->typoid = InvalidOid;
+
+ if (!OidIsValid(entry->typoid))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("data type \"%s.%s\" required for logical replication does not exist",
+ entry->nspname, entry->typname)));
+
+ return entry->typoid;
+}