diff options
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/analyze.c | 13 | ||||
-rw-r--r-- | src/backend/commands/cluster.c | 43 | ||||
-rw-r--r-- | src/backend/commands/indexcmds.c | 34 | ||||
-rw-r--r-- | src/backend/commands/lockcmds.c | 2 | ||||
-rw-r--r-- | src/backend/commands/matview.c | 3 | ||||
-rw-r--r-- | src/backend/commands/tablecmds.c | 18 | ||||
-rw-r--r-- | src/backend/commands/vacuum.c | 76 |
7 files changed, 118 insertions, 71 deletions
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c index 29b6a20f1ed..28880bd2991 100644 --- a/src/backend/commands/analyze.c +++ b/src/backend/commands/analyze.c @@ -149,16 +149,15 @@ analyze_rel(Oid relid, RangeVar *relation, return; /* - * Check if relation needs to be skipped based on ownership. This check + * Check if relation needs to be skipped based on privileges. This check * happens also when building the relation list to analyze for a manual * operation, and needs to be done additionally here as ANALYZE could - * happen across multiple transactions where relation ownership could have - * changed in-between. Make sure to generate only logs for ANALYZE in - * this case. + * happen across multiple transactions where privileges could have changed + * in-between. Make sure to generate only logs for ANALYZE in this case. */ - if (!vacuum_is_relation_owner(RelationGetRelid(onerel), - onerel->rd_rel, - params->options & VACOPT_ANALYZE)) + if (!vacuum_is_permitted_for_relation(RelationGetRelid(onerel), + onerel->rd_rel, + params->options & ~VACOPT_VACUUM)) { relation_close(onerel, ShareUpdateExclusiveLock); return; diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index cea7c8ea368..c04886c4090 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -77,6 +77,7 @@ static void copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, static List *get_tables_to_cluster(MemoryContext cluster_context); static List *get_tables_to_cluster_partitioned(MemoryContext cluster_context, Oid indexOid); +static bool cluster_is_permitted_for_relation(Oid relid, Oid userid); /*--------------------------------------------------------------------------- @@ -144,7 +145,8 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) tableOid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock, 0, - RangeVarCallbackOwnsTable, NULL); + RangeVarCallbackMaintainsTable, + NULL); rel = table_open(tableOid, NoLock); /* @@ -362,8 +364,8 @@ cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params) */ if (recheck) { - /* Check that the user still owns the relation */ - if (!object_ownercheck(RelationRelationId, tableOid, save_userid)) + /* Check that the user still has privileges for the relation */ + if (!cluster_is_permitted_for_relation(tableOid, save_userid)) { relation_close(OldHeap, AccessExclusiveLock); goto out; @@ -1619,7 +1621,7 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, /* - * Get a list of tables that the current user owns and + * Get a list of tables that the current user has privileges on and * have indisclustered set. Return the list in a List * of RelToCluster * (stored in the specified memory context), each one giving the tableOid * and the indexOid on which the table is already clustered. @@ -1636,8 +1638,8 @@ get_tables_to_cluster(MemoryContext cluster_context) List *rtcs = NIL; /* - * Get all indexes that have indisclustered set and are owned by - * appropriate user. + * Get all indexes that have indisclustered set and that the current user + * has the appropriate privileges for. */ indRelation = table_open(IndexRelationId, AccessShareLock); ScanKeyInit(&entry, @@ -1651,7 +1653,7 @@ get_tables_to_cluster(MemoryContext cluster_context) index = (Form_pg_index) GETSTRUCT(indexTuple); - if (!object_ownercheck(RelationRelationId, index->indrelid, GetUserId())) + if (!cluster_is_permitted_for_relation(index->indrelid, GetUserId())) continue; /* Use a permanent memory context for the result list */ @@ -1699,10 +1701,13 @@ get_tables_to_cluster_partitioned(MemoryContext cluster_context, Oid indexOid) if (get_rel_relkind(indexrelid) != RELKIND_INDEX) continue; - /* Silently skip partitions which the user has no access to. */ - if (!object_ownercheck(RelationRelationId, relid, GetUserId()) && - (!object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) || - IsSharedRelation(relid))) + /* + * It's possible that the user does not have privileges to CLUSTER the + * leaf partition despite having such privileges on the partitioned + * table. We skip any partitions which the user is not permitted to + * CLUSTER. + */ + if (!cluster_is_permitted_for_relation(relid, GetUserId())) continue; /* Use a permanent memory context for the result list */ @@ -1718,3 +1723,19 @@ get_tables_to_cluster_partitioned(MemoryContext cluster_context, Oid indexOid) return rtcs; } + +/* + * Return whether userid has privileges to CLUSTER relid. If not, this + * function emits a WARNING. + */ +static bool +cluster_is_permitted_for_relation(Oid relid, Oid userid) +{ + if (pg_class_aclcheck(relid, userid, ACL_MAINTAIN) == ACLCHECK_OK) + return true; + + ereport(WARNING, + (errmsg("permission denied to cluster \"%s\", skipping it", + get_rel_name(relid)))); + return false; +} diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 943a48bfa71..de89be8d759 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -28,6 +28,7 @@ #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_am.h" +#include "catalog/pg_authid.h" #include "catalog/pg_constraint.h" #include "catalog/pg_database.h" #include "catalog/pg_inherits.h" @@ -2947,6 +2948,7 @@ RangeVarCallbackForReindexIndex(const RangeVar *relation, char relkind; struct ReindexIndexCallbackState *state = arg; LOCKMODE table_lockmode; + Oid table_oid; /* * Lock level here should match table lock in reindex_index() for @@ -2986,14 +2988,19 @@ RangeVarCallbackForReindexIndex(const RangeVar *relation, errmsg("\"%s\" is not an index", relation->relname))); /* Check permissions */ - if (!object_ownercheck(RelationRelationId, relId, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX, relation->relname); + table_oid = IndexGetRelation(relId, true); + if (OidIsValid(table_oid)) + { + AclResult aclresult; + + aclresult = pg_class_aclcheck(table_oid, GetUserId(), ACL_MAINTAIN); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_INDEX, relation->relname); + } /* Lock heap before index to avoid deadlock. */ if (relId != oldRelId) { - Oid table_oid = IndexGetRelation(relId, true); - /* * If the OID isn't valid, it means the index was concurrently * dropped, which is not a problem for us; just return normally. @@ -3029,7 +3036,7 @@ ReindexTable(const ReindexStmt *stmt, const ReindexParams *params, bool isTopLev (params->options & REINDEXOPT_CONCURRENTLY) != 0 ? ShareUpdateExclusiveLock : ShareLock, 0, - RangeVarCallbackOwnsTable, NULL); + RangeVarCallbackMaintainsTable, NULL); if (get_rel_relkind(heapOid) == RELKIND_PARTITIONED_TABLE) ReindexPartitions(stmt, heapOid, params, isTopLevel); @@ -3113,7 +3120,8 @@ ReindexMultipleTables(const ReindexStmt *stmt, const ReindexParams *params) { objectOid = get_namespace_oid(objectName, false); - if (!object_ownercheck(NamespaceRelationId, objectOid, GetUserId())) + if (!object_ownercheck(NamespaceRelationId, objectOid, GetUserId()) && + !has_privs_of_role(GetUserId(), ROLE_PG_MAINTAIN)) aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA, objectName); } @@ -3125,7 +3133,8 @@ ReindexMultipleTables(const ReindexStmt *stmt, const ReindexParams *params) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("can only reindex the currently open database"))); - if (!object_ownercheck(DatabaseRelationId, objectOid, GetUserId())) + if (!object_ownercheck(DatabaseRelationId, objectOid, GetUserId()) && + !has_privs_of_role(GetUserId(), ROLE_PG_MAINTAIN)) aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE, get_database_name(objectOid)); } @@ -3197,15 +3206,12 @@ ReindexMultipleTables(const ReindexStmt *stmt, const ReindexParams *params) continue; /* - * The table can be reindexed if the user is superuser, the table - * owner, or the database/schema owner (but in the latter case, only - * if it's not a shared relation). object_ownercheck includes the - * superuser case, and depending on objectKind we already know that - * the user has permission to run REINDEX on this database or schema - * per the permission checks at the beginning of this routine. + * We already checked privileges on the database or schema, but we + * further restrict reindexing shared catalogs to roles with the + * MAINTAIN privilege on the relation. */ if (classtuple->relisshared && - !object_ownercheck(RelationRelationId, relid, GetUserId())) + pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK) continue; /* diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c index 09ae09cf5cc..cd20ae76bad 100644 --- a/src/backend/commands/lockcmds.c +++ b/src/backend/commands/lockcmds.c @@ -283,7 +283,7 @@ LockTableAclCheck(Oid reloid, LOCKMODE lockmode, Oid userid) AclMode aclmask; /* any of these privileges permit any lock mode */ - aclmask = ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE; + aclmask = ACL_MAINTAIN | ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE; /* SELECT privileges also permit ACCESS SHARE and below */ if (lockmode <= AccessShareLock) diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 03373462f0e..6d09b755564 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -160,7 +160,8 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, */ matviewOid = RangeVarGetRelidExtended(stmt->relation, lockmode, 0, - RangeVarCallbackOwnsTable, NULL); + RangeVarCallbackMaintainsTable, + NULL); matviewRel = table_open(matviewOid, NoLock); relowner = matviewRel->rd_rel->relowner; diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 7014be80396..2470265561a 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -18096,15 +18096,16 @@ AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid, * This is intended as a callback for RangeVarGetRelidExtended(). It allows * the relation to be locked only if (1) it's a plain or partitioned table, * materialized view, or TOAST table and (2) the current user is the owner (or - * the superuser). This meets the permission-checking needs of CLUSTER, - * REINDEX TABLE, and REFRESH MATERIALIZED VIEW; we expose it here so that it - * can be used by all. + * the superuser) or has been granted MAINTAIN. This meets the + * permission-checking needs of CLUSTER, REINDEX TABLE, and REFRESH + * MATERIALIZED VIEW; we expose it here so that it can be used by all. */ void -RangeVarCallbackOwnsTable(const RangeVar *relation, - Oid relId, Oid oldRelId, void *arg) +RangeVarCallbackMaintainsTable(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg) { char relkind; + AclResult aclresult; /* Nothing to do if the relation was not found. */ if (!OidIsValid(relId)) @@ -18125,8 +18126,11 @@ RangeVarCallbackOwnsTable(const RangeVar *relation, errmsg("\"%s\" is not a table or materialized view", relation->relname))); /* Check permissions */ - if (!object_ownercheck(RelationRelationId, relId, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relId)), relation->relname); + aclresult = pg_class_aclcheck(relId, GetUserId(), ACL_MAINTAIN); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, + get_relkind_objtype(get_rel_relkind(relId)), + relation->relname); } /* diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 223d33728d5..e63c86cae45 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -170,6 +170,9 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel) /* By default parallel vacuum is enabled */ params.nworkers = 0; + /* Will be set later if we recurse to a TOAST table. */ + params.toast_parent = InvalidOid; + /* * Set this to an invalid value so it is clear whether or not a * BUFFER_USAGE_LIMIT was specified when making the access strategy. @@ -695,32 +698,29 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, } /* - * Check if a given relation can be safely vacuumed or analyzed. If the - * user is not the relation owner, issue a WARNING log message and return - * false to let the caller decide what to do with this relation. This - * routine is used to decide if a relation can be processed for VACUUM or - * ANALYZE. + * Check if the current user has privileges to vacuum or analyze the relation. + * If not, issue a WARNING log message and return false to let the caller + * decide what to do with this relation. This routine is used to decide if a + * relation can be processed for VACUUM or ANALYZE. */ bool -vacuum_is_relation_owner(Oid relid, Form_pg_class reltuple, bits32 options) +vacuum_is_permitted_for_relation(Oid relid, Form_pg_class reltuple, + bits32 options) { char *relname; Assert((options & (VACOPT_VACUUM | VACOPT_ANALYZE)) != 0); - /* - * Check permissions. - * - * We allow the user to vacuum or analyze a table if he is superuser, the - * table owner, or the database owner (but in the latter case, only if - * it's not a shared relation). object_ownercheck includes the superuser - * case. - * - * Note we choose to treat permissions failure as a WARNING and keep - * trying to vacuum or analyze the rest of the DB --- is this appropriate? + /*---------- + * A role has privileges to vacuum or analyze the relation if any of the + * following are true: + * - the role owns the current database and the relation is not shared + * - the role has the MAINTAIN privilege on the relation + *---------- */ - if (object_ownercheck(RelationRelationId, relid, GetUserId()) || - (object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) && !reltuple->relisshared)) + if ((object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) && + !reltuple->relisshared) || + pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) == ACLCHECK_OK) return true; relname = NameStr(reltuple->relname); @@ -936,10 +936,10 @@ expand_vacuum_rel(VacuumRelation *vrel, MemoryContext vac_context, classForm = (Form_pg_class) GETSTRUCT(tuple); /* - * Make a returnable VacuumRelation for this rel if user is a proper - * owner. + * Make a returnable VacuumRelation for this rel if the user has the + * required privileges. */ - if (vacuum_is_relation_owner(relid, classForm, options)) + if (vacuum_is_permitted_for_relation(relid, classForm, options)) { oldcontext = MemoryContextSwitchTo(vac_context); vacrels = lappend(vacrels, makeVacuumRelation(vrel->relation, @@ -1036,7 +1036,7 @@ get_all_vacuum_rels(MemoryContext vac_context, int options) continue; /* check permissions of relation */ - if (!vacuum_is_relation_owner(relid, classForm, options)) + if (!vacuum_is_permitted_for_relation(relid, classForm, options)) continue; /* @@ -1952,6 +1952,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, LOCKMODE lmode; Relation rel; LockRelId lockrelid; + Oid priv_relid; Oid toast_relid; Oid save_userid; int save_sec_context; @@ -2028,16 +2029,25 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, } /* - * Check if relation needs to be skipped based on ownership. This check + * When recursing to a TOAST table, check privileges on the parent. NB: + * This is only safe to do because we hold a session lock on the main + * relation that prevents concurrent deletion. + */ + if (OidIsValid(params->toast_parent)) + priv_relid = params->toast_parent; + else + priv_relid = RelationGetRelid(rel); + + /* + * Check if relation needs to be skipped based on privileges. This check * happens also when building the relation list to vacuum for a manual * operation, and needs to be done additionally here as VACUUM could - * happen across multiple transactions where relation ownership could have - * changed in-between. Make sure to only generate logs for VACUUM in this - * case. + * happen across multiple transactions where privileges could have changed + * in-between. Make sure to only generate logs for VACUUM in this case. */ - if (!vacuum_is_relation_owner(RelationGetRelid(rel), - rel->rd_rel, - params->options & VACOPT_VACUUM)) + if (!vacuum_is_permitted_for_relation(priv_relid, + rel->rd_rel, + params->options & ~VACOPT_ANALYZE)) { relation_close(rel, lmode); PopActiveSnapshot(); @@ -2224,9 +2234,15 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, { VacuumParams toast_vacuum_params; - /* force VACOPT_PROCESS_MAIN so vacuum_rel() processes it */ + /* + * Force VACOPT_PROCESS_MAIN so vacuum_rel() processes it. Likewise, + * set toast_parent so that the privilege checks are done on the main + * relation. NB: This is only safe to do because we hold a session + * lock on the main relation that prevents concurrent deletion. + */ memcpy(&toast_vacuum_params, params, sizeof(VacuumParams)); toast_vacuum_params.options |= VACOPT_PROCESS_MAIN; + toast_vacuum_params.toast_parent = relid; vacuum_rel(toast_relid, NULL, &toast_vacuum_params, bstrategy); } |