diff options
Diffstat (limited to 'src/backend/utils/cache/relcache.c')
-rw-r--r-- | src/backend/utils/cache/relcache.c | 57 |
1 files changed, 52 insertions, 5 deletions
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 2810b35eea1..32313244adb 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -73,6 +73,7 @@ #include "utils/memutils.h" #include "utils/relmapper.h" #include "utils/resowner_private.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/tqual.h" @@ -235,7 +236,7 @@ static void formrdesc(const char *relationName, Oid relationReltype, bool isshared, bool hasoids, int natts, const FormData_pg_attribute *attrs); -static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK); +static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic); static Relation AllocateRelationDesc(Form_pg_class relp); static void RelationParseRelOptions(Relation relation, HeapTuple tuple); static void RelationBuildTupleDesc(Relation relation); @@ -274,12 +275,13 @@ static void unlink_initfile(const char *initfilename); * and must eventually be freed with heap_freetuple. */ static HeapTuple -ScanPgRelation(Oid targetRelId, bool indexOK) +ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic) { HeapTuple pg_class_tuple; Relation pg_class_desc; SysScanDesc pg_class_scan; ScanKeyData key[1]; + Snapshot snapshot; /* * If something goes wrong during backend startup, we might find ourselves @@ -305,9 +307,20 @@ ScanPgRelation(Oid targetRelId, bool indexOK) * scan by setting indexOK == false. */ pg_class_desc = heap_open(RelationRelationId, AccessShareLock); + + /* + * The caller might need a tuple that's newer than the one the historic + * snapshot; currently the only case requiring to do so is looking up the + * relfilenode of non mapped system relations during decoding. + */ + if (force_non_historic) + snapshot = GetNonHistoricCatalogSnapshot(RelationRelationId); + else + snapshot = GetCatalogSnapshot(RelationRelationId); + pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId, indexOK && criticalRelcachesBuilt, - NULL, + snapshot, 1, key); pg_class_tuple = systable_getnext(pg_class_scan); @@ -836,7 +849,7 @@ RelationBuildDesc(Oid targetRelId, bool insertIt) /* * find the tuple in pg_class corresponding to the given relation id */ - pg_class_tuple = ScanPgRelation(targetRelId, true); + pg_class_tuple = ScanPgRelation(targetRelId, true, false); /* * if no such tuple exists, return NULL @@ -989,8 +1002,42 @@ RelationInitPhysicalAddr(Relation relation) relation->rd_node.dbNode = InvalidOid; else relation->rd_node.dbNode = MyDatabaseId; + if (relation->rd_rel->relfilenode) + { + /* + * Even if we are using a decoding snapshot that doesn't represent + * the current state of the catalog we need to make sure the + * filenode points to the current file since the older file will + * be gone (or truncated). The new file will still contain older + * rows so lookups in them will work correctly. This wouldn't work + * correctly if rewrites were allowed to change the schema in a + * noncompatible way, but those are prevented both on catalog + * tables and on user tables declared as additional catalog + * tables. + */ + if (HistoricSnapshotActive() + && RelationIsAccessibleInLogicalDecoding(relation) + && IsTransactionState()) + { + HeapTuple phys_tuple; + Form_pg_class physrel; + + phys_tuple = ScanPgRelation(RelationGetRelid(relation), + RelationGetRelid(relation) != ClassOidIndexId, + true); + if (!HeapTupleIsValid(phys_tuple)) + elog(ERROR, "could not find pg_class entry for %u", + RelationGetRelid(relation)); + physrel = (Form_pg_class) GETSTRUCT(phys_tuple); + + relation->rd_rel->reltablespace = physrel->reltablespace; + relation->rd_rel->relfilenode = physrel->relfilenode; + heap_freetuple(phys_tuple); + } + relation->rd_node.relNode = relation->rd_rel->relfilenode; + } else { /* Consult the relation mapper */ @@ -1742,7 +1789,7 @@ RelationReloadIndexInfo(Relation relation) * for pg_class_oid_index ... */ indexOK = (RelationGetRelid(relation) != ClassOidIndexId); - pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK); + pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, false); if (!HeapTupleIsValid(pg_class_tuple)) elog(ERROR, "could not find pg_class tuple for index %u", RelationGetRelid(relation)); |