aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils/cache
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils/cache')
-rw-r--r--src/backend/utils/cache/inval.c35
-rw-r--r--src/backend/utils/cache/relcache.c136
-rw-r--r--src/backend/utils/cache/syscache.c69
3 files changed, 227 insertions, 13 deletions
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 1f502828038..11f9218f664 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -375,11 +375,16 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
{
SharedInvalidationMessage msg;
- /* Don't add a duplicate item */
- /* We assume dbId need not be checked because it will never change */
+ /*
+ * Don't add a duplicate item.
+ * We assume dbId need not be checked because it will never change.
+ * InvalidOid for relId means all relations so we don't need to add
+ * individual ones when it is present.
+ */
ProcessMessageList(hdr->rclist,
if (msg->rc.id == SHAREDINVALRELCACHE_ID &&
- msg->rc.relId == relId)
+ (msg->rc.relId == relId ||
+ msg->rc.relId == InvalidOid))
return);
/* OK, add the item */
@@ -509,8 +514,10 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId)
/*
* If the relation being invalidated is one of those cached in the local
* relcache init file, mark that we need to zap that file at commit.
+ * Same is true when we are invalidating whole relcache.
*/
- if (OidIsValid(dbId) && RelationIdIsInInitFile(relId))
+ if (OidIsValid(dbId) &&
+ (RelationIdIsInInitFile(relId) || relId == InvalidOid))
transInvalInfo->RelcacheInitFileInval = true;
}
@@ -565,7 +572,10 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
{
int i;
- RelationCacheInvalidateEntry(msg->rc.relId);
+ if (msg->rc.relId == InvalidOid)
+ RelationCacheInvalidate();
+ else
+ RelationCacheInvalidateEntry(msg->rc.relId);
for (i = 0; i < relcache_callback_count; i++)
{
@@ -1227,6 +1237,21 @@ CacheInvalidateRelcache(Relation relation)
}
/*
+ * CacheInvalidateRelcacheAll
+ * Register invalidation of the whole relcache at the end of command.
+ *
+ * This is used by alter publication as changes in publications may affect
+ * large number of tables.
+ */
+void
+CacheInvalidateRelcacheAll(void)
+{
+ PrepareInvalidationState();
+
+ RegisterRelcacheInvalidation(InvalidOid, InvalidOid);
+}
+
+/*
* CacheInvalidateRelcacheByTuple
* As above, but relation is identified by passing its pg_class tuple.
*/
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 24678fcd487..26ff7e187a2 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -53,8 +53,10 @@
#include "catalog/pg_opclass.h"
#include "catalog/pg_partitioned_table.h"
#include "catalog/pg_proc.h"
+#include "catalog/pg_publication.h"
#include "catalog/pg_rewrite.h"
#include "catalog/pg_shseclabel.h"
+#include "catalog/pg_subscription.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
@@ -103,6 +105,7 @@ static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_
static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
static const FormData_pg_attribute Desc_pg_shseclabel[Natts_pg_shseclabel] = {Schema_pg_shseclabel};
+static const FormData_pg_attribute Desc_pg_subscription[Natts_pg_subscription] = {Schema_pg_subscription};
/*
* Hash tables that index the relation cache
@@ -2336,7 +2339,10 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc)
list_free(relation->rd_indexlist);
bms_free(relation->rd_indexattr);
bms_free(relation->rd_keyattr);
+ bms_free(relation->rd_pkattr);
bms_free(relation->rd_idattr);
+ if (relation->rd_pubactions)
+ pfree(relation->rd_pubactions);
if (relation->rd_options)
pfree(relation->rd_options);
if (relation->rd_indextuple)
@@ -3043,6 +3049,7 @@ AtEOXact_cleanup(Relation relation, bool isCommit)
list_free(relation->rd_indexlist);
relation->rd_indexlist = NIL;
relation->rd_oidindex = InvalidOid;
+ relation->rd_pkindex = InvalidOid;
relation->rd_replidindex = InvalidOid;
relation->rd_indexvalid = 0;
}
@@ -3155,6 +3162,7 @@ AtEOSubXact_cleanup(Relation relation, bool isCommit,
list_free(relation->rd_indexlist);
relation->rd_indexlist = NIL;
relation->rd_oidindex = InvalidOid;
+ relation->rd_pkindex = InvalidOid;
relation->rd_replidindex = InvalidOid;
relation->rd_indexvalid = 0;
}
@@ -3588,8 +3596,10 @@ RelationCacheInitializePhase2(void)
false, Natts_pg_auth_members, Desc_pg_auth_members);
formrdesc("pg_shseclabel", SharedSecLabelRelation_Rowtype_Id, true,
false, Natts_pg_shseclabel, Desc_pg_shseclabel);
+ formrdesc("pg_subscription", SubscriptionRelation_Rowtype_Id, true,
+ true, Natts_pg_subscription, Desc_pg_subscription);
-#define NUM_CRITICAL_SHARED_RELS 4 /* fix if you change list above */
+#define NUM_CRITICAL_SHARED_RELS 5 /* fix if you change list above */
}
MemoryContextSwitchTo(oldcxt);
@@ -4425,6 +4435,7 @@ RelationGetIndexList(Relation relation)
oldlist = relation->rd_indexlist;
relation->rd_indexlist = list_copy(result);
relation->rd_oidindex = oidIndex;
+ relation->rd_pkindex = pkeyIndex;
if (replident == REPLICA_IDENTITY_DEFAULT && OidIsValid(pkeyIndex))
relation->rd_replidindex = pkeyIndex;
else if (replident == REPLICA_IDENTITY_INDEX && OidIsValid(candidateIndex))
@@ -4492,7 +4503,7 @@ insert_ordered_oid(List *list, Oid datum)
* to ensure that a correct rd_indexattr set has been cached before first
* calling RelationSetIndexList; else a subsequent inquiry might cause a
* wrong rd_indexattr set to get computed and cached. Likewise, we do not
- * touch rd_keyattr or rd_idattr.
+ * touch rd_keyattr, rd_pkattr or rd_idattr.
*/
void
RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
@@ -4508,7 +4519,11 @@ RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
list_free(relation->rd_indexlist);
relation->rd_indexlist = indexIds;
relation->rd_oidindex = oidIndex;
- /* For the moment, assume the target rel hasn't got a replica index */
+ /*
+ * For the moment, assume the target rel hasn't got a pk or replica
+ * index. We'll load them on demand in the API that wraps access to them.
+ */
+ relation->rd_pkindex = InvalidOid;
relation->rd_replidindex = InvalidOid;
relation->rd_indexvalid = 2; /* mark list as forced */
/* Flag relation as needing eoxact cleanup (to reset the list) */
@@ -4544,6 +4559,27 @@ RelationGetOidIndex(Relation relation)
}
/*
+ * RelationGetPrimaryKeyIndex -- get OID of the relation's primary key index
+ *
+ * Returns InvalidOid if there is no such index.
+ */
+Oid
+RelationGetPrimaryKeyIndex(Relation relation)
+{
+ List *ilist;
+
+ if (relation->rd_indexvalid == 0)
+ {
+ /* RelationGetIndexList does the heavy lifting. */
+ ilist = RelationGetIndexList(relation);
+ list_free(ilist);
+ Assert(relation->rd_indexvalid != 0);
+ }
+
+ return relation->rd_pkindex;
+}
+
+/*
* RelationGetReplicaIndex -- get OID of the relation's replica identity index
*
* Returns InvalidOid if there is no such index.
@@ -4722,8 +4758,10 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
{
Bitmapset *indexattrs; /* indexed columns */
Bitmapset *uindexattrs; /* columns in unique indexes */
+ Bitmapset *pkindexattrs; /* columns in the primary index */
Bitmapset *idindexattrs; /* columns in the replica identity */
List *indexoidlist;
+ Oid relpkindex;
Oid relreplindex;
ListCell *l;
MemoryContext oldcxt;
@@ -4737,6 +4775,8 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
return bms_copy(relation->rd_indexattr);
case INDEX_ATTR_BITMAP_KEY:
return bms_copy(relation->rd_keyattr);
+ case INDEX_ATTR_BITMAP_PRIMARY_KEY:
+ return bms_copy(relation->rd_pkattr);
case INDEX_ATTR_BITMAP_IDENTITY_KEY:
return bms_copy(relation->rd_idattr);
default:
@@ -4758,12 +4798,14 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
return NULL;
/*
- * Copy the rd_replidindex value computed by RelationGetIndexList before
- * proceeding. This is needed because a relcache flush could occur inside
- * index_open below, resetting the fields managed by RelationGetIndexList.
- * (The values we're computing will still be valid, assuming that caller
- * has a sufficient lock on the relation.)
+ * Copy the rd_pkindex and rd_replidindex value computed by
+ * RelationGetIndexList before proceeding. This is needed because a
+ * relcache flush could occur inside index_open below, resetting the
+ * fields managed by RelationGetIndexList. (The values we're computing
+ * will still be valid, assuming that caller has a sufficient lock on
+ * the relation.)
*/
+ relpkindex = relation->rd_pkindex;
relreplindex = relation->rd_replidindex;
/*
@@ -4778,6 +4820,7 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
*/
indexattrs = NULL;
uindexattrs = NULL;
+ pkindexattrs = NULL;
idindexattrs = NULL;
foreach(l, indexoidlist)
{
@@ -4786,6 +4829,7 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
IndexInfo *indexInfo;
int i;
bool isKey; /* candidate key */
+ bool isPK; /* primary key */
bool isIDKey; /* replica identity index */
indexDesc = index_open(indexOid, AccessShareLock);
@@ -4798,6 +4842,9 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
indexInfo->ii_Expressions == NIL &&
indexInfo->ii_Predicate == NIL;
+ /* Is this a primary key? */
+ isPK = (indexOid == relpkindex);
+
/* Is this index the configured (or default) replica identity? */
isIDKey = (indexOid == relreplindex);
@@ -4815,6 +4862,10 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
uindexattrs = bms_add_member(uindexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber);
+ if (isPK)
+ pkindexattrs = bms_add_member(pkindexattrs,
+ attrnum - FirstLowInvalidHeapAttributeNumber);
+
if (isIDKey)
idindexattrs = bms_add_member(idindexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber);
@@ -4837,6 +4888,8 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
relation->rd_indexattr = NULL;
bms_free(relation->rd_keyattr);
relation->rd_keyattr = NULL;
+ bms_free(relation->rd_pkattr);
+ relation->rd_pkattr = NULL;
bms_free(relation->rd_idattr);
relation->rd_idattr = NULL;
@@ -4849,6 +4902,7 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
*/
oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
relation->rd_keyattr = bms_copy(uindexattrs);
+ relation->rd_pkattr = bms_copy(pkindexattrs);
relation->rd_idattr = bms_copy(idindexattrs);
relation->rd_indexattr = bms_copy(indexattrs);
MemoryContextSwitchTo(oldcxt);
@@ -4860,6 +4914,8 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
return indexattrs;
case INDEX_ATTR_BITMAP_KEY:
return uindexattrs;
+ case INDEX_ATTR_BITMAP_PRIMARY_KEY:
+ return bms_copy(relation->rd_pkattr);
case INDEX_ATTR_BITMAP_IDENTITY_KEY:
return idindexattrs;
default:
@@ -4992,6 +5048,67 @@ RelationGetExclusionInfo(Relation indexRelation,
MemoryContextSwitchTo(oldcxt);
}
+/*
+ * Get publication actions for the given relation.
+ */
+struct PublicationActions *
+GetRelationPublicationActions(Relation relation)
+{
+ List *puboids;
+ ListCell *lc;
+ MemoryContext oldcxt;
+ PublicationActions *pubactions = palloc0(sizeof(PublicationActions));
+
+ if (relation->rd_pubactions)
+ return memcpy(pubactions, relation->rd_pubactions,
+ sizeof(PublicationActions));
+
+ /* Fetch the publication membership info. */
+ puboids = GetRelationPublications(RelationGetRelid(relation));
+ puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
+
+ foreach(lc, puboids)
+ {
+ Oid pubid = lfirst_oid(lc);
+ HeapTuple tup;
+ Form_pg_publication pubform;
+
+ tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for publication %u", pubid);
+
+ pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+ pubactions->pubinsert |= pubform->pubinsert;
+ pubactions->pubupdate |= pubform->pubupdate;
+ pubactions->pubdelete |= pubform->pubdelete;
+
+ ReleaseSysCache(tup);
+
+ /*
+ * If we know everything is replicated, there is no point to check
+ * for other publications.
+ */
+ if (pubactions->pubinsert && pubactions->pubupdate &&
+ pubactions->pubdelete)
+ break;
+ }
+
+ if (relation->rd_pubactions)
+ {
+ pfree(relation->rd_pubactions);
+ relation->rd_pubactions = NULL;
+ }
+
+ /* Now save copy of the actions in the relcache entry. */
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+ relation->rd_pubactions = palloc(sizeof(PublicationActions));
+ memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions));
+ MemoryContextSwitchTo(oldcxt);
+
+ return pubactions;
+}
/*
* Routines to support ereport() reports of relation-related errors
@@ -5407,10 +5524,13 @@ load_relcache_init_file(bool shared)
rel->rd_fkeyvalid = false;
rel->rd_indexlist = NIL;
rel->rd_oidindex = InvalidOid;
+ rel->rd_pkindex = InvalidOid;
rel->rd_replidindex = InvalidOid;
rel->rd_indexattr = NULL;
rel->rd_keyattr = NULL;
+ rel->rd_pkattr = NULL;
rel->rd_idattr = NULL;
+ rel->rd_pubactions = NULL;
rel->rd_createSubid = InvalidSubTransactionId;
rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
rel->rd_amcache = NULL;
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index d634a3b6831..bdfaa0ce758 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -50,6 +50,8 @@
#include "catalog/pg_opfamily.h"
#include "catalog/pg_partitioned_table.h"
#include "catalog/pg_proc.h"
+#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
#include "catalog/pg_range.h"
#include "catalog/pg_rewrite.h"
#include "catalog/pg_seclabel.h"
@@ -59,6 +61,7 @@
#include "catalog/pg_shseclabel.h"
#include "catalog/pg_replication_origin.h"
#include "catalog/pg_statistic.h"
+#include "catalog/pg_subscription.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_transform.h"
#include "catalog/pg_ts_config.h"
@@ -658,6 +661,50 @@ static const struct cachedesc cacheinfo[] = {
},
16
},
+ {PublicationRelationId, /* PUBLICATIONOID */
+ PublicationObjectIndexId,
+ 1,
+ {
+ ObjectIdAttributeNumber,
+ 0,
+ 0,
+ 0
+ },
+ 8
+ },
+ {PublicationRelationId, /* PUBLICATIONNAME */
+ PublicationNameIndexId,
+ 1,
+ {
+ Anum_pg_publication_pubname,
+ 0,
+ 0,
+ 0
+ },
+ 8
+ },
+ {PublicationRelRelationId, /* PUBLICATIONREL */
+ PublicationRelObjectIndexId,
+ 1,
+ {
+ ObjectIdAttributeNumber,
+ 0,
+ 0,
+ 0
+ },
+ 64
+ },
+ {PublicationRelRelationId, /* PUBLICATIONRELMAP */
+ PublicationRelMapIndexId,
+ 2,
+ {
+ Anum_pg_publication_rel_prrelid,
+ Anum_pg_publication_rel_prpubid,
+ 0,
+ 0
+ },
+ 64
+ },
{RewriteRelationId, /* RULERELNAME */
RewriteRelRulenameIndexId,
2,
@@ -691,6 +738,28 @@ static const struct cachedesc cacheinfo[] = {
},
128
},
+ {SubscriptionRelationId, /* SUBSCRIPTIONOID */
+ SubscriptionObjectIndexId,
+ 1,
+ {
+ ObjectIdAttributeNumber,
+ 0,
+ 0,
+ 0
+ },
+ 4
+ },
+ {SubscriptionRelationId, /* SUBSCRIPTIONNAME */
+ SubscriptionNameIndexId,
+ 2,
+ {
+ Anum_pg_subscription_subdbid,
+ Anum_pg_subscription_subname,
+ 0,
+ 0
+ },
+ 4
+ },
{TableSpaceRelationId, /* TABLESPACEOID */
TablespaceOidIndexId,
1,