diff options
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/alter.c | 7 | ||||
-rw-r--r-- | src/backend/commands/indexcmds.c | 13 | ||||
-rw-r--r-- | src/backend/commands/lockcmds.c | 136 | ||||
-rw-r--r-- | src/backend/commands/sequence.c | 15 | ||||
-rw-r--r-- | src/backend/commands/tablecmds.c | 13 | ||||
-rw-r--r-- | src/backend/commands/trigger.c | 18 | ||||
-rw-r--r-- | src/backend/commands/vacuum.c | 11 |
7 files changed, 121 insertions, 92 deletions
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 08ee93b8eab..e1be451e35c 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -108,7 +108,12 @@ ExecRenameStmt(RenameStmt *stmt) CheckRelationOwnership(stmt->relation, true); - relid = RangeVarGetRelid(stmt->relation, false); + /* + * Lock level used here should match what will be taken later, + * in RenameRelation, renameatt, or renametrig. + */ + relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, + false, false); switch (stmt->renameType) { diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index b7c021d943a..50248540816 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -1531,9 +1531,15 @@ ReindexIndex(RangeVar *indexRelation) Oid indOid; HeapTuple tuple; - indOid = RangeVarGetRelid(indexRelation, false); + /* + * XXX: This is not safe in the presence of concurrent DDL. We should + * take AccessExclusiveLock here, but that would violate the rule that + * indexes should only be locked after their parent tables. For now, + * we live with it. + */ + indOid = RangeVarGetRelid(indexRelation, NoLock, false, false); tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indOid)); - if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ + if (!HeapTupleIsValid(tuple)) elog(ERROR, "cache lookup failed for relation %u", indOid); if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX) @@ -1562,7 +1568,8 @@ ReindexTable(RangeVar *relation) Oid heapOid; HeapTuple tuple; - heapOid = RangeVarGetRelid(relation, false); + /* The lock level used here should match reindex_relation(). */ + heapOid = RangeVarGetRelid(relation, ShareLock, false, false); tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid)); if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ elog(ERROR, "cache lookup failed for relation %u", heapOid); diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c index b2c6ea55593..875f868b969 100644 --- a/src/backend/commands/lockcmds.c +++ b/src/backend/commands/lockcmds.c @@ -24,8 +24,8 @@ #include "utils/acl.h" #include "utils/lsyscache.h" -static void LockTableRecurse(Oid reloid, RangeVar *rv, - LOCKMODE lockmode, bool nowait, bool recurse); +static void LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, + bool recurse); /* @@ -37,99 +37,39 @@ LockTableCommand(LockStmt *lockstmt) ListCell *p; /* + * During recovery we only accept these variations: LOCK TABLE foo IN + * ACCESS SHARE MODE LOCK TABLE foo IN ROW SHARE MODE LOCK TABLE foo + * IN ROW EXCLUSIVE MODE This test must match the restrictions defined + * in LockAcquire() + */ + if (lockstmt->mode > RowExclusiveLock) + PreventCommandDuringRecovery("LOCK TABLE"); + + /* * Iterate over the list and process the named relations one at a time */ foreach(p, lockstmt->relations) { - RangeVar *relation = (RangeVar *) lfirst(p); - bool recurse = interpretInhOption(relation->inhOpt); + RangeVar *rv = (RangeVar *) lfirst(p); + Relation rel; + bool recurse = interpretInhOption(rv->inhOpt); Oid reloid; - reloid = RangeVarGetRelid(relation, false); - - /* - * During recovery we only accept these variations: LOCK TABLE foo IN - * ACCESS SHARE MODE LOCK TABLE foo IN ROW SHARE MODE LOCK TABLE foo - * IN ROW EXCLUSIVE MODE This test must match the restrictions defined - * in LockAcquire() - */ - if (lockstmt->mode > RowExclusiveLock) - PreventCommandDuringRecovery("LOCK TABLE"); + reloid = RangeVarGetRelid(rv, lockstmt->mode, false, lockstmt->nowait); + rel = relation_open(reloid, NoLock); - LockTableRecurse(reloid, relation, - lockstmt->mode, lockstmt->nowait, recurse); + LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse); } } /* * Apply LOCK TABLE recursively over an inheritance tree - * - * At top level, "rv" is the original command argument; we use it to throw - * an appropriate error message if the relation isn't there. Below top level, - * "rv" is NULL and we should just silently ignore any dropped child rel. */ static void -LockTableRecurse(Oid reloid, RangeVar *rv, - LOCKMODE lockmode, bool nowait, bool recurse) +LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, bool recurse) { - Relation rel; AclResult aclresult; - - /* - * Acquire the lock. We must do this first to protect against concurrent - * drops. Note that a lock against an already-dropped relation's OID - * won't fail. - */ - if (nowait) - { - if (!ConditionalLockRelationOid(reloid, lockmode)) - { - /* try to throw error by name; relation could be deleted... */ - char *relname = rv ? rv->relname : get_rel_name(reloid); - - if (relname) - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on relation \"%s\"", - relname))); - else - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on relation with OID %u", - reloid))); - } - } - else - LockRelationOid(reloid, lockmode); - - /* - * Now that we have the lock, check to see if the relation really exists - * or not. - */ - rel = try_relation_open(reloid, NoLock); - - if (!rel) - { - /* Release useless lock */ - UnlockRelationOid(reloid, lockmode); - - /* At top level, throw error; otherwise, ignore this child rel */ - if (rv) - { - if (rv->schemaname) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_TABLE), - errmsg("relation \"%s.%s\" does not exist", - rv->schemaname, rv->relname))); - else - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_TABLE), - errmsg("relation \"%s\" does not exist", - rv->relname))); - } - - return; - } + Oid reloid = RelationGetRelid(rel); /* Verify adequate privilege */ if (lockmode == AccessShareLock) @@ -159,12 +99,48 @@ LockTableRecurse(Oid reloid, RangeVar *rv, { List *children = find_inheritance_children(reloid, NoLock); ListCell *lc; + Relation childrel; foreach(lc, children) { Oid childreloid = lfirst_oid(lc); - LockTableRecurse(childreloid, NULL, lockmode, nowait, recurse); + /* + * Acquire the lock, to protect against concurrent drops. Note + * that a lock against an already-dropped relation's OID won't + * fail. + */ + if (!nowait) + LockRelationOid(childreloid, lockmode); + else if (!ConditionalLockRelationOid(childreloid, lockmode)) + { + /* try to throw error by name; relation could be deleted... */ + char *relname = get_rel_name(childreloid); + + if (relname) + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation \"%s\"", + relname))); + else + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation with OID %u", + reloid))); + } + + /* + * Now that we have the lock, check to see if the relation really + * exists or not. + */ + childrel = try_relation_open(childreloid, NoLock); + if (!childrel) + { + /* Release useless lock */ + UnlockRelationOid(childreloid, lockmode); + } + + LockTableRecurse(childrel, lockmode, nowait, recurse); } } diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index be04177a2ee..de6e2a3e33f 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -427,8 +427,8 @@ AlterSequence(AlterSeqStmt *stmt) FormData_pg_sequence new; List *owned_by; - /* open and AccessShareLock sequence */ - relid = RangeVarGetRelid(stmt->sequence, false); + /* Open and lock sequence. */ + relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false, false); init_sequence(relid, &elm, &seqrel); /* allow ALTER to sequence owner only */ @@ -507,7 +507,16 @@ nextval(PG_FUNCTION_ARGS) Oid relid; sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin)); - relid = RangeVarGetRelid(sequence, false); + + /* + * XXX: This is not safe in the presence of concurrent DDL, but + * acquiring a lock here is more expensive than letting nextval_internal + * do it, since the latter maintains a cache that keeps us from hitting + * the lock manager more than once per transaction. It's not clear + * whether the performance penalty is material in practice, but for now, + * we do it this way. + */ + relid = RangeVarGetRelid(sequence, NoLock, false, false); PG_RETURN_INT64(nextval_internal(relid)); } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c1af12a5a30..295a1ff6e63 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -764,8 +764,15 @@ RemoveRelations(DropStmt *drop) */ AcceptInvalidationMessages(); - /* Look up the appropriate relation using namespace search */ - relOid = RangeVarGetRelid(rel, true); + /* + * Look up the appropriate relation using namespace search. + * + * XXX: Doing this without a lock is unsafe in the presence of + * concurrent DDL, but acquiring a lock here might violate the rule + * that a table must be locked before its corresponding index. + * So, for now, we ignore the hazard. + */ + relOid = RangeVarGetRelid(rel, NoLock, true, false); /* Not there? */ if (!OidIsValid(relOid)) @@ -2234,6 +2241,8 @@ RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype) /* * Grab an exclusive lock on the target table, index, sequence or view, * which we will NOT release until end of transaction. + * + * Lock level used here should match ExecRenameStmt */ targetrelation = relation_open(myrelid, AccessExclusiveLock); diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 9cf8372e967..cadf3a15453 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -194,7 +194,17 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString, RelationGetRelationName(rel)))); if (stmt->isconstraint && stmt->constrrel != NULL) - constrrelid = RangeVarGetRelid(stmt->constrrel, false); + { + /* + * We must take a lock on the target relation to protect against + * concurrent drop. It's not clear that AccessShareLock is strong + * enough, but we certainly need at least that much... otherwise, + * we might end up creating a pg_constraint entry referencing a + * nonexistent table. + */ + constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false, + false); + } /* permission checks */ if (!isInternal) @@ -1020,11 +1030,15 @@ ConvertTriggerToFK(CreateTrigStmt *stmt, Oid funcoid) * DropTrigger - drop an individual trigger by name */ void -DropTrigger(Oid relid, const char *trigname, DropBehavior behavior, +DropTrigger(RangeVar *relation, const char *trigname, DropBehavior behavior, bool missing_ok) { + Oid relid; ObjectAddress object; + /* lock level should match RemoveTriggerById */ + relid = RangeVarGetRelid(relation, ShareRowExclusiveLock, false, false); + object.classId = TriggerRelationId; object.objectId = get_trigger_oid(relid, trigname, missing_ok); object.objectSubId = 0; diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 93039678630..889737e26e6 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -318,7 +318,16 @@ get_rel_oids(Oid relid, const RangeVar *vacrel) /* Process a specific relation */ Oid relid; - relid = RangeVarGetRelid(vacrel, false); + /* + * Since we don't take a lock here, the relation might be gone, + * or the RangeVar might no longer refer to the OID we look up + * here. In the former case, VACUUM will do nothing; in the + * latter case, it will process the OID we looked up here, rather + * than the new one. Neither is ideal, but there's little practical + * alternative, since we're going to commit this transaction and + * begin a new one between now and then. + */ + relid = RangeVarGetRelid(vacrel, NoLock, false, false); /* Make a relation list entry for this guy */ oldcontext = MemoryContextSwitchTo(vac_context); |