diff options
Diffstat (limited to 'src/backend/utils/cache')
-rw-r--r-- | src/backend/utils/cache/inval.c | 35 | ||||
-rw-r--r-- | src/backend/utils/cache/relcache.c | 136 | ||||
-rw-r--r-- | src/backend/utils/cache/syscache.c | 69 |
3 files changed, 227 insertions, 13 deletions
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 1f502828038..11f9218f664 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -375,11 +375,16 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr, { SharedInvalidationMessage msg; - /* Don't add a duplicate item */ - /* We assume dbId need not be checked because it will never change */ + /* + * Don't add a duplicate item. + * We assume dbId need not be checked because it will never change. + * InvalidOid for relId means all relations so we don't need to add + * individual ones when it is present. + */ ProcessMessageList(hdr->rclist, if (msg->rc.id == SHAREDINVALRELCACHE_ID && - msg->rc.relId == relId) + (msg->rc.relId == relId || + msg->rc.relId == InvalidOid)) return); /* OK, add the item */ @@ -509,8 +514,10 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId) /* * If the relation being invalidated is one of those cached in the local * relcache init file, mark that we need to zap that file at commit. + * Same is true when we are invalidating whole relcache. */ - if (OidIsValid(dbId) && RelationIdIsInInitFile(relId)) + if (OidIsValid(dbId) && + (RelationIdIsInInitFile(relId) || relId == InvalidOid)) transInvalInfo->RelcacheInitFileInval = true; } @@ -565,7 +572,10 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg) { int i; - RelationCacheInvalidateEntry(msg->rc.relId); + if (msg->rc.relId == InvalidOid) + RelationCacheInvalidate(); + else + RelationCacheInvalidateEntry(msg->rc.relId); for (i = 0; i < relcache_callback_count; i++) { @@ -1227,6 +1237,21 @@ CacheInvalidateRelcache(Relation relation) } /* + * CacheInvalidateRelcacheAll + * Register invalidation of the whole relcache at the end of command. + * + * This is used by alter publication as changes in publications may affect + * large number of tables. + */ +void +CacheInvalidateRelcacheAll(void) +{ + PrepareInvalidationState(); + + RegisterRelcacheInvalidation(InvalidOid, InvalidOid); +} + +/* * CacheInvalidateRelcacheByTuple * As above, but relation is identified by passing its pg_class tuple. */ diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 24678fcd487..26ff7e187a2 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -53,8 +53,10 @@ #include "catalog/pg_opclass.h" #include "catalog/pg_partitioned_table.h" #include "catalog/pg_proc.h" +#include "catalog/pg_publication.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_shseclabel.h" +#include "catalog/pg_subscription.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" @@ -103,6 +105,7 @@ static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_ static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members}; static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index}; static const FormData_pg_attribute Desc_pg_shseclabel[Natts_pg_shseclabel] = {Schema_pg_shseclabel}; +static const FormData_pg_attribute Desc_pg_subscription[Natts_pg_subscription] = {Schema_pg_subscription}; /* * Hash tables that index the relation cache @@ -2336,7 +2339,10 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc) list_free(relation->rd_indexlist); bms_free(relation->rd_indexattr); bms_free(relation->rd_keyattr); + bms_free(relation->rd_pkattr); bms_free(relation->rd_idattr); + if (relation->rd_pubactions) + pfree(relation->rd_pubactions); if (relation->rd_options) pfree(relation->rd_options); if (relation->rd_indextuple) @@ -3043,6 +3049,7 @@ AtEOXact_cleanup(Relation relation, bool isCommit) list_free(relation->rd_indexlist); relation->rd_indexlist = NIL; relation->rd_oidindex = InvalidOid; + relation->rd_pkindex = InvalidOid; relation->rd_replidindex = InvalidOid; relation->rd_indexvalid = 0; } @@ -3155,6 +3162,7 @@ AtEOSubXact_cleanup(Relation relation, bool isCommit, list_free(relation->rd_indexlist); relation->rd_indexlist = NIL; relation->rd_oidindex = InvalidOid; + relation->rd_pkindex = InvalidOid; relation->rd_replidindex = InvalidOid; relation->rd_indexvalid = 0; } @@ -3588,8 +3596,10 @@ RelationCacheInitializePhase2(void) false, Natts_pg_auth_members, Desc_pg_auth_members); formrdesc("pg_shseclabel", SharedSecLabelRelation_Rowtype_Id, true, false, Natts_pg_shseclabel, Desc_pg_shseclabel); + formrdesc("pg_subscription", SubscriptionRelation_Rowtype_Id, true, + true, Natts_pg_subscription, Desc_pg_subscription); -#define NUM_CRITICAL_SHARED_RELS 4 /* fix if you change list above */ +#define NUM_CRITICAL_SHARED_RELS 5 /* fix if you change list above */ } MemoryContextSwitchTo(oldcxt); @@ -4425,6 +4435,7 @@ RelationGetIndexList(Relation relation) oldlist = relation->rd_indexlist; relation->rd_indexlist = list_copy(result); relation->rd_oidindex = oidIndex; + relation->rd_pkindex = pkeyIndex; if (replident == REPLICA_IDENTITY_DEFAULT && OidIsValid(pkeyIndex)) relation->rd_replidindex = pkeyIndex; else if (replident == REPLICA_IDENTITY_INDEX && OidIsValid(candidateIndex)) @@ -4492,7 +4503,7 @@ insert_ordered_oid(List *list, Oid datum) * to ensure that a correct rd_indexattr set has been cached before first * calling RelationSetIndexList; else a subsequent inquiry might cause a * wrong rd_indexattr set to get computed and cached. Likewise, we do not - * touch rd_keyattr or rd_idattr. + * touch rd_keyattr, rd_pkattr or rd_idattr. */ void RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex) @@ -4508,7 +4519,11 @@ RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex) list_free(relation->rd_indexlist); relation->rd_indexlist = indexIds; relation->rd_oidindex = oidIndex; - /* For the moment, assume the target rel hasn't got a replica index */ + /* + * For the moment, assume the target rel hasn't got a pk or replica + * index. We'll load them on demand in the API that wraps access to them. + */ + relation->rd_pkindex = InvalidOid; relation->rd_replidindex = InvalidOid; relation->rd_indexvalid = 2; /* mark list as forced */ /* Flag relation as needing eoxact cleanup (to reset the list) */ @@ -4544,6 +4559,27 @@ RelationGetOidIndex(Relation relation) } /* + * RelationGetPrimaryKeyIndex -- get OID of the relation's primary key index + * + * Returns InvalidOid if there is no such index. + */ +Oid +RelationGetPrimaryKeyIndex(Relation relation) +{ + List *ilist; + + if (relation->rd_indexvalid == 0) + { + /* RelationGetIndexList does the heavy lifting. */ + ilist = RelationGetIndexList(relation); + list_free(ilist); + Assert(relation->rd_indexvalid != 0); + } + + return relation->rd_pkindex; +} + +/* * RelationGetReplicaIndex -- get OID of the relation's replica identity index * * Returns InvalidOid if there is no such index. @@ -4722,8 +4758,10 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) { Bitmapset *indexattrs; /* indexed columns */ Bitmapset *uindexattrs; /* columns in unique indexes */ + Bitmapset *pkindexattrs; /* columns in the primary index */ Bitmapset *idindexattrs; /* columns in the replica identity */ List *indexoidlist; + Oid relpkindex; Oid relreplindex; ListCell *l; MemoryContext oldcxt; @@ -4737,6 +4775,8 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) return bms_copy(relation->rd_indexattr); case INDEX_ATTR_BITMAP_KEY: return bms_copy(relation->rd_keyattr); + case INDEX_ATTR_BITMAP_PRIMARY_KEY: + return bms_copy(relation->rd_pkattr); case INDEX_ATTR_BITMAP_IDENTITY_KEY: return bms_copy(relation->rd_idattr); default: @@ -4758,12 +4798,14 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) return NULL; /* - * Copy the rd_replidindex value computed by RelationGetIndexList before - * proceeding. This is needed because a relcache flush could occur inside - * index_open below, resetting the fields managed by RelationGetIndexList. - * (The values we're computing will still be valid, assuming that caller - * has a sufficient lock on the relation.) + * Copy the rd_pkindex and rd_replidindex value computed by + * RelationGetIndexList before proceeding. This is needed because a + * relcache flush could occur inside index_open below, resetting the + * fields managed by RelationGetIndexList. (The values we're computing + * will still be valid, assuming that caller has a sufficient lock on + * the relation.) */ + relpkindex = relation->rd_pkindex; relreplindex = relation->rd_replidindex; /* @@ -4778,6 +4820,7 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) */ indexattrs = NULL; uindexattrs = NULL; + pkindexattrs = NULL; idindexattrs = NULL; foreach(l, indexoidlist) { @@ -4786,6 +4829,7 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) IndexInfo *indexInfo; int i; bool isKey; /* candidate key */ + bool isPK; /* primary key */ bool isIDKey; /* replica identity index */ indexDesc = index_open(indexOid, AccessShareLock); @@ -4798,6 +4842,9 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) indexInfo->ii_Expressions == NIL && indexInfo->ii_Predicate == NIL; + /* Is this a primary key? */ + isPK = (indexOid == relpkindex); + /* Is this index the configured (or default) replica identity? */ isIDKey = (indexOid == relreplindex); @@ -4815,6 +4862,10 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) uindexattrs = bms_add_member(uindexattrs, attrnum - FirstLowInvalidHeapAttributeNumber); + if (isPK) + pkindexattrs = bms_add_member(pkindexattrs, + attrnum - FirstLowInvalidHeapAttributeNumber); + if (isIDKey) idindexattrs = bms_add_member(idindexattrs, attrnum - FirstLowInvalidHeapAttributeNumber); @@ -4837,6 +4888,8 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) relation->rd_indexattr = NULL; bms_free(relation->rd_keyattr); relation->rd_keyattr = NULL; + bms_free(relation->rd_pkattr); + relation->rd_pkattr = NULL; bms_free(relation->rd_idattr); relation->rd_idattr = NULL; @@ -4849,6 +4902,7 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) */ oldcxt = MemoryContextSwitchTo(CacheMemoryContext); relation->rd_keyattr = bms_copy(uindexattrs); + relation->rd_pkattr = bms_copy(pkindexattrs); relation->rd_idattr = bms_copy(idindexattrs); relation->rd_indexattr = bms_copy(indexattrs); MemoryContextSwitchTo(oldcxt); @@ -4860,6 +4914,8 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) return indexattrs; case INDEX_ATTR_BITMAP_KEY: return uindexattrs; + case INDEX_ATTR_BITMAP_PRIMARY_KEY: + return bms_copy(relation->rd_pkattr); case INDEX_ATTR_BITMAP_IDENTITY_KEY: return idindexattrs; default: @@ -4992,6 +5048,67 @@ RelationGetExclusionInfo(Relation indexRelation, MemoryContextSwitchTo(oldcxt); } +/* + * Get publication actions for the given relation. + */ +struct PublicationActions * +GetRelationPublicationActions(Relation relation) +{ + List *puboids; + ListCell *lc; + MemoryContext oldcxt; + PublicationActions *pubactions = palloc0(sizeof(PublicationActions)); + + if (relation->rd_pubactions) + return memcpy(pubactions, relation->rd_pubactions, + sizeof(PublicationActions)); + + /* Fetch the publication membership info. */ + puboids = GetRelationPublications(RelationGetRelid(relation)); + puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); + + foreach(lc, puboids) + { + Oid pubid = lfirst_oid(lc); + HeapTuple tup; + Form_pg_publication pubform; + + tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for publication %u", pubid); + + pubform = (Form_pg_publication) GETSTRUCT(tup); + + pubactions->pubinsert |= pubform->pubinsert; + pubactions->pubupdate |= pubform->pubupdate; + pubactions->pubdelete |= pubform->pubdelete; + + ReleaseSysCache(tup); + + /* + * If we know everything is replicated, there is no point to check + * for other publications. + */ + if (pubactions->pubinsert && pubactions->pubupdate && + pubactions->pubdelete) + break; + } + + if (relation->rd_pubactions) + { + pfree(relation->rd_pubactions); + relation->rd_pubactions = NULL; + } + + /* Now save copy of the actions in the relcache entry. */ + oldcxt = MemoryContextSwitchTo(CacheMemoryContext); + relation->rd_pubactions = palloc(sizeof(PublicationActions)); + memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions)); + MemoryContextSwitchTo(oldcxt); + + return pubactions; +} /* * Routines to support ereport() reports of relation-related errors @@ -5407,10 +5524,13 @@ load_relcache_init_file(bool shared) rel->rd_fkeyvalid = false; rel->rd_indexlist = NIL; rel->rd_oidindex = InvalidOid; + rel->rd_pkindex = InvalidOid; rel->rd_replidindex = InvalidOid; rel->rd_indexattr = NULL; rel->rd_keyattr = NULL; + rel->rd_pkattr = NULL; rel->rd_idattr = NULL; + rel->rd_pubactions = NULL; rel->rd_createSubid = InvalidSubTransactionId; rel->rd_newRelfilenodeSubid = InvalidSubTransactionId; rel->rd_amcache = NULL; diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index d634a3b6831..bdfaa0ce758 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -50,6 +50,8 @@ #include "catalog/pg_opfamily.h" #include "catalog/pg_partitioned_table.h" #include "catalog/pg_proc.h" +#include "catalog/pg_publication.h" +#include "catalog/pg_publication_rel.h" #include "catalog/pg_range.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_seclabel.h" @@ -59,6 +61,7 @@ #include "catalog/pg_shseclabel.h" #include "catalog/pg_replication_origin.h" #include "catalog/pg_statistic.h" +#include "catalog/pg_subscription.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_transform.h" #include "catalog/pg_ts_config.h" @@ -658,6 +661,50 @@ static const struct cachedesc cacheinfo[] = { }, 16 }, + {PublicationRelationId, /* PUBLICATIONOID */ + PublicationObjectIndexId, + 1, + { + ObjectIdAttributeNumber, + 0, + 0, + 0 + }, + 8 + }, + {PublicationRelationId, /* PUBLICATIONNAME */ + PublicationNameIndexId, + 1, + { + Anum_pg_publication_pubname, + 0, + 0, + 0 + }, + 8 + }, + {PublicationRelRelationId, /* PUBLICATIONREL */ + PublicationRelObjectIndexId, + 1, + { + ObjectIdAttributeNumber, + 0, + 0, + 0 + }, + 64 + }, + {PublicationRelRelationId, /* PUBLICATIONRELMAP */ + PublicationRelMapIndexId, + 2, + { + Anum_pg_publication_rel_prrelid, + Anum_pg_publication_rel_prpubid, + 0, + 0 + }, + 64 + }, {RewriteRelationId, /* RULERELNAME */ RewriteRelRulenameIndexId, 2, @@ -691,6 +738,28 @@ static const struct cachedesc cacheinfo[] = { }, 128 }, + {SubscriptionRelationId, /* SUBSCRIPTIONOID */ + SubscriptionObjectIndexId, + 1, + { + ObjectIdAttributeNumber, + 0, + 0, + 0 + }, + 4 + }, + {SubscriptionRelationId, /* SUBSCRIPTIONNAME */ + SubscriptionNameIndexId, + 2, + { + Anum_pg_subscription_subdbid, + Anum_pg_subscription_subname, + 0, + 0 + }, + 4 + }, {TableSpaceRelationId, /* TABLESPACEOID */ TablespaceOidIndexId, 1, |