aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils/cache/relcache.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils/cache/relcache.c')
-rw-r--r--src/backend/utils/cache/relcache.c57
1 files changed, 52 insertions, 5 deletions
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 2810b35eea1..32313244adb 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -73,6 +73,7 @@
#include "utils/memutils.h"
#include "utils/relmapper.h"
#include "utils/resowner_private.h"
+#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
@@ -235,7 +236,7 @@ static void formrdesc(const char *relationName, Oid relationReltype,
bool isshared, bool hasoids,
int natts, const FormData_pg_attribute *attrs);
-static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK);
+static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic);
static Relation AllocateRelationDesc(Form_pg_class relp);
static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
static void RelationBuildTupleDesc(Relation relation);
@@ -274,12 +275,13 @@ static void unlink_initfile(const char *initfilename);
* and must eventually be freed with heap_freetuple.
*/
static HeapTuple
-ScanPgRelation(Oid targetRelId, bool indexOK)
+ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic)
{
HeapTuple pg_class_tuple;
Relation pg_class_desc;
SysScanDesc pg_class_scan;
ScanKeyData key[1];
+ Snapshot snapshot;
/*
* If something goes wrong during backend startup, we might find ourselves
@@ -305,9 +307,20 @@ ScanPgRelation(Oid targetRelId, bool indexOK)
* scan by setting indexOK == false.
*/
pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
+
+ /*
+ * The caller might need a tuple that's newer than the one the historic
+ * snapshot; currently the only case requiring to do so is looking up the
+ * relfilenode of non mapped system relations during decoding.
+ */
+ if (force_non_historic)
+ snapshot = GetNonHistoricCatalogSnapshot(RelationRelationId);
+ else
+ snapshot = GetCatalogSnapshot(RelationRelationId);
+
pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
indexOK && criticalRelcachesBuilt,
- NULL,
+ snapshot,
1, key);
pg_class_tuple = systable_getnext(pg_class_scan);
@@ -836,7 +849,7 @@ RelationBuildDesc(Oid targetRelId, bool insertIt)
/*
* find the tuple in pg_class corresponding to the given relation id
*/
- pg_class_tuple = ScanPgRelation(targetRelId, true);
+ pg_class_tuple = ScanPgRelation(targetRelId, true, false);
/*
* if no such tuple exists, return NULL
@@ -989,8 +1002,42 @@ RelationInitPhysicalAddr(Relation relation)
relation->rd_node.dbNode = InvalidOid;
else
relation->rd_node.dbNode = MyDatabaseId;
+
if (relation->rd_rel->relfilenode)
+ {
+ /*
+ * Even if we are using a decoding snapshot that doesn't represent
+ * the current state of the catalog we need to make sure the
+ * filenode points to the current file since the older file will
+ * be gone (or truncated). The new file will still contain older
+ * rows so lookups in them will work correctly. This wouldn't work
+ * correctly if rewrites were allowed to change the schema in a
+ * noncompatible way, but those are prevented both on catalog
+ * tables and on user tables declared as additional catalog
+ * tables.
+ */
+ if (HistoricSnapshotActive()
+ && RelationIsAccessibleInLogicalDecoding(relation)
+ && IsTransactionState())
+ {
+ HeapTuple phys_tuple;
+ Form_pg_class physrel;
+
+ phys_tuple = ScanPgRelation(RelationGetRelid(relation),
+ RelationGetRelid(relation) != ClassOidIndexId,
+ true);
+ if (!HeapTupleIsValid(phys_tuple))
+ elog(ERROR, "could not find pg_class entry for %u",
+ RelationGetRelid(relation));
+ physrel = (Form_pg_class) GETSTRUCT(phys_tuple);
+
+ relation->rd_rel->reltablespace = physrel->reltablespace;
+ relation->rd_rel->relfilenode = physrel->relfilenode;
+ heap_freetuple(phys_tuple);
+ }
+
relation->rd_node.relNode = relation->rd_rel->relfilenode;
+ }
else
{
/* Consult the relation mapper */
@@ -1742,7 +1789,7 @@ RelationReloadIndexInfo(Relation relation)
* for pg_class_oid_index ...
*/
indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
- pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK);
+ pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, false);
if (!HeapTupleIsValid(pg_class_tuple))
elog(ERROR, "could not find pg_class tuple for index %u",
RelationGetRelid(relation));