aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2011-07-08 22:19:30 -0400
committerRobert Haas <rhaas@postgresql.org>2011-07-08 22:19:30 -0400
commit4240e429d0c2d889d0cda23c618f94e12c13ade7 (patch)
tree306e1fac6085c29b55287f383ce8831d947e1312 /src/backend/commands
parent9d522cb35d8b4f266abadd0d019f68eb8802ae05 (diff)
downloadpostgresql-4240e429d0c2d889d0cda23c618f94e12c13ade7.tar.gz
postgresql-4240e429d0c2d889d0cda23c618f94e12c13ade7.zip
Try to acquire relation locks in RangeVarGetRelid.
In the previous coding, we would look up a relation in RangeVarGetRelid, lock the resulting OID, and then AcceptInvalidationMessages(). While this was sufficient to ensure that we noticed any changes to the relation definition before building the relcache entry, it didn't handle the possibility that the name we looked up no longer referenced the same OID. This was particularly problematic in the case where a table had been dropped and recreated: we'd latch on to the entry for the old relation and fail later on. Now, we acquire the relation lock inside RangeVarGetRelid, and retry the name lookup if we notice that invalidation messages have been processed meanwhile. Many operations that would previously have failed with an error in the presence of concurrent DDL will now succeed. There is a good deal of work remaining to be done here: many callers of RangeVarGetRelid still pass NoLock for one reason or another. In addition, nothing in this patch guards against the possibility that the meaning of an unqualified name might change due to the creation of a relation in a schema earlier in the user's search path than the one where it was previously found. Furthermore, there's nothing at all here to guard against similar race conditions for non-relations. For all that, it's a start. Noah Misch and Robert Haas
Diffstat (limited to 'src/backend/commands')
-rw-r--r--src/backend/commands/alter.c7
-rw-r--r--src/backend/commands/indexcmds.c13
-rw-r--r--src/backend/commands/lockcmds.c136
-rw-r--r--src/backend/commands/sequence.c15
-rw-r--r--src/backend/commands/tablecmds.c13
-rw-r--r--src/backend/commands/trigger.c18
-rw-r--r--src/backend/commands/vacuum.c11
7 files changed, 121 insertions, 92 deletions
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 08ee93b8eab..e1be451e35c 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -108,7 +108,12 @@ ExecRenameStmt(RenameStmt *stmt)
CheckRelationOwnership(stmt->relation, true);
- relid = RangeVarGetRelid(stmt->relation, false);
+ /*
+ * Lock level used here should match what will be taken later,
+ * in RenameRelation, renameatt, or renametrig.
+ */
+ relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock,
+ false, false);
switch (stmt->renameType)
{
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index b7c021d943a..50248540816 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -1531,9 +1531,15 @@ ReindexIndex(RangeVar *indexRelation)
Oid indOid;
HeapTuple tuple;
- indOid = RangeVarGetRelid(indexRelation, false);
+ /*
+ * XXX: This is not safe in the presence of concurrent DDL. We should
+ * take AccessExclusiveLock here, but that would violate the rule that
+ * indexes should only be locked after their parent tables. For now,
+ * we live with it.
+ */
+ indOid = RangeVarGetRelid(indexRelation, NoLock, false, false);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indOid));
- if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
+ if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", indOid);
if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
@@ -1562,7 +1568,8 @@ ReindexTable(RangeVar *relation)
Oid heapOid;
HeapTuple tuple;
- heapOid = RangeVarGetRelid(relation, false);
+ /* The lock level used here should match reindex_relation(). */
+ heapOid = RangeVarGetRelid(relation, ShareLock, false, false);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid));
if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
elog(ERROR, "cache lookup failed for relation %u", heapOid);
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c
index b2c6ea55593..875f868b969 100644
--- a/src/backend/commands/lockcmds.c
+++ b/src/backend/commands/lockcmds.c
@@ -24,8 +24,8 @@
#include "utils/acl.h"
#include "utils/lsyscache.h"
-static void LockTableRecurse(Oid reloid, RangeVar *rv,
- LOCKMODE lockmode, bool nowait, bool recurse);
+static void LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait,
+ bool recurse);
/*
@@ -37,99 +37,39 @@ LockTableCommand(LockStmt *lockstmt)
ListCell *p;
/*
+ * During recovery we only accept these variations: LOCK TABLE foo IN
+ * ACCESS SHARE MODE LOCK TABLE foo IN ROW SHARE MODE LOCK TABLE foo
+ * IN ROW EXCLUSIVE MODE This test must match the restrictions defined
+ * in LockAcquire()
+ */
+ if (lockstmt->mode > RowExclusiveLock)
+ PreventCommandDuringRecovery("LOCK TABLE");
+
+ /*
* Iterate over the list and process the named relations one at a time
*/
foreach(p, lockstmt->relations)
{
- RangeVar *relation = (RangeVar *) lfirst(p);
- bool recurse = interpretInhOption(relation->inhOpt);
+ RangeVar *rv = (RangeVar *) lfirst(p);
+ Relation rel;
+ bool recurse = interpretInhOption(rv->inhOpt);
Oid reloid;
- reloid = RangeVarGetRelid(relation, false);
-
- /*
- * During recovery we only accept these variations: LOCK TABLE foo IN
- * ACCESS SHARE MODE LOCK TABLE foo IN ROW SHARE MODE LOCK TABLE foo
- * IN ROW EXCLUSIVE MODE This test must match the restrictions defined
- * in LockAcquire()
- */
- if (lockstmt->mode > RowExclusiveLock)
- PreventCommandDuringRecovery("LOCK TABLE");
+ reloid = RangeVarGetRelid(rv, lockstmt->mode, false, lockstmt->nowait);
+ rel = relation_open(reloid, NoLock);
- LockTableRecurse(reloid, relation,
- lockstmt->mode, lockstmt->nowait, recurse);
+ LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse);
}
}
/*
* Apply LOCK TABLE recursively over an inheritance tree
- *
- * At top level, "rv" is the original command argument; we use it to throw
- * an appropriate error message if the relation isn't there. Below top level,
- * "rv" is NULL and we should just silently ignore any dropped child rel.
*/
static void
-LockTableRecurse(Oid reloid, RangeVar *rv,
- LOCKMODE lockmode, bool nowait, bool recurse)
+LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, bool recurse)
{
- Relation rel;
AclResult aclresult;
-
- /*
- * Acquire the lock. We must do this first to protect against concurrent
- * drops. Note that a lock against an already-dropped relation's OID
- * won't fail.
- */
- if (nowait)
- {
- if (!ConditionalLockRelationOid(reloid, lockmode))
- {
- /* try to throw error by name; relation could be deleted... */
- char *relname = rv ? rv->relname : get_rel_name(reloid);
-
- if (relname)
- ereport(ERROR,
- (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
- errmsg("could not obtain lock on relation \"%s\"",
- relname)));
- else
- ereport(ERROR,
- (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
- errmsg("could not obtain lock on relation with OID %u",
- reloid)));
- }
- }
- else
- LockRelationOid(reloid, lockmode);
-
- /*
- * Now that we have the lock, check to see if the relation really exists
- * or not.
- */
- rel = try_relation_open(reloid, NoLock);
-
- if (!rel)
- {
- /* Release useless lock */
- UnlockRelationOid(reloid, lockmode);
-
- /* At top level, throw error; otherwise, ignore this child rel */
- if (rv)
- {
- if (rv->schemaname)
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_TABLE),
- errmsg("relation \"%s.%s\" does not exist",
- rv->schemaname, rv->relname)));
- else
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_TABLE),
- errmsg("relation \"%s\" does not exist",
- rv->relname)));
- }
-
- return;
- }
+ Oid reloid = RelationGetRelid(rel);
/* Verify adequate privilege */
if (lockmode == AccessShareLock)
@@ -159,12 +99,48 @@ LockTableRecurse(Oid reloid, RangeVar *rv,
{
List *children = find_inheritance_children(reloid, NoLock);
ListCell *lc;
+ Relation childrel;
foreach(lc, children)
{
Oid childreloid = lfirst_oid(lc);
- LockTableRecurse(childreloid, NULL, lockmode, nowait, recurse);
+ /*
+ * Acquire the lock, to protect against concurrent drops. Note
+ * that a lock against an already-dropped relation's OID won't
+ * fail.
+ */
+ if (!nowait)
+ LockRelationOid(childreloid, lockmode);
+ else if (!ConditionalLockRelationOid(childreloid, lockmode))
+ {
+ /* try to throw error by name; relation could be deleted... */
+ char *relname = get_rel_name(childreloid);
+
+ if (relname)
+ ereport(ERROR,
+ (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+ errmsg("could not obtain lock on relation \"%s\"",
+ relname)));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+ errmsg("could not obtain lock on relation with OID %u",
+ reloid)));
+ }
+
+ /*
+ * Now that we have the lock, check to see if the relation really
+ * exists or not.
+ */
+ childrel = try_relation_open(childreloid, NoLock);
+ if (!childrel)
+ {
+ /* Release useless lock */
+ UnlockRelationOid(childreloid, lockmode);
+ }
+
+ LockTableRecurse(childrel, lockmode, nowait, recurse);
}
}
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index be04177a2ee..de6e2a3e33f 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -427,8 +427,8 @@ AlterSequence(AlterSeqStmt *stmt)
FormData_pg_sequence new;
List *owned_by;
- /* open and AccessShareLock sequence */
- relid = RangeVarGetRelid(stmt->sequence, false);
+ /* Open and lock sequence. */
+ relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false, false);
init_sequence(relid, &elm, &seqrel);
/* allow ALTER to sequence owner only */
@@ -507,7 +507,16 @@ nextval(PG_FUNCTION_ARGS)
Oid relid;
sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin));
- relid = RangeVarGetRelid(sequence, false);
+
+ /*
+ * XXX: This is not safe in the presence of concurrent DDL, but
+ * acquiring a lock here is more expensive than letting nextval_internal
+ * do it, since the latter maintains a cache that keeps us from hitting
+ * the lock manager more than once per transaction. It's not clear
+ * whether the performance penalty is material in practice, but for now,
+ * we do it this way.
+ */
+ relid = RangeVarGetRelid(sequence, NoLock, false, false);
PG_RETURN_INT64(nextval_internal(relid));
}
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index c1af12a5a30..295a1ff6e63 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -764,8 +764,15 @@ RemoveRelations(DropStmt *drop)
*/
AcceptInvalidationMessages();
- /* Look up the appropriate relation using namespace search */
- relOid = RangeVarGetRelid(rel, true);
+ /*
+ * Look up the appropriate relation using namespace search.
+ *
+ * XXX: Doing this without a lock is unsafe in the presence of
+ * concurrent DDL, but acquiring a lock here might violate the rule
+ * that a table must be locked before its corresponding index.
+ * So, for now, we ignore the hazard.
+ */
+ relOid = RangeVarGetRelid(rel, NoLock, true, false);
/* Not there? */
if (!OidIsValid(relOid))
@@ -2234,6 +2241,8 @@ RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
/*
* Grab an exclusive lock on the target table, index, sequence or view,
* which we will NOT release until end of transaction.
+ *
+ * Lock level used here should match ExecRenameStmt
*/
targetrelation = relation_open(myrelid, AccessExclusiveLock);
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 9cf8372e967..cadf3a15453 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -194,7 +194,17 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
RelationGetRelationName(rel))));
if (stmt->isconstraint && stmt->constrrel != NULL)
- constrrelid = RangeVarGetRelid(stmt->constrrel, false);
+ {
+ /*
+ * We must take a lock on the target relation to protect against
+ * concurrent drop. It's not clear that AccessShareLock is strong
+ * enough, but we certainly need at least that much... otherwise,
+ * we might end up creating a pg_constraint entry referencing a
+ * nonexistent table.
+ */
+ constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false,
+ false);
+ }
/* permission checks */
if (!isInternal)
@@ -1020,11 +1030,15 @@ ConvertTriggerToFK(CreateTrigStmt *stmt, Oid funcoid)
* DropTrigger - drop an individual trigger by name
*/
void
-DropTrigger(Oid relid, const char *trigname, DropBehavior behavior,
+DropTrigger(RangeVar *relation, const char *trigname, DropBehavior behavior,
bool missing_ok)
{
+ Oid relid;
ObjectAddress object;
+ /* lock level should match RemoveTriggerById */
+ relid = RangeVarGetRelid(relation, ShareRowExclusiveLock, false, false);
+
object.classId = TriggerRelationId;
object.objectId = get_trigger_oid(relid, trigname, missing_ok);
object.objectSubId = 0;
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 93039678630..889737e26e6 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -318,7 +318,16 @@ get_rel_oids(Oid relid, const RangeVar *vacrel)
/* Process a specific relation */
Oid relid;
- relid = RangeVarGetRelid(vacrel, false);
+ /*
+ * Since we don't take a lock here, the relation might be gone,
+ * or the RangeVar might no longer refer to the OID we look up
+ * here. In the former case, VACUUM will do nothing; in the
+ * latter case, it will process the OID we looked up here, rather
+ * than the new one. Neither is ideal, but there's little practical
+ * alternative, since we're going to commit this transaction and
+ * begin a new one between now and then.
+ */
+ relid = RangeVarGetRelid(vacrel, NoLock, false, false);
/* Make a relation list entry for this guy */
oldcontext = MemoryContextSwitchTo(vac_context);