aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands')
-rw-r--r--src/backend/commands/alter.c7
-rw-r--r--src/backend/commands/indexcmds.c13
-rw-r--r--src/backend/commands/lockcmds.c136
-rw-r--r--src/backend/commands/sequence.c15
-rw-r--r--src/backend/commands/tablecmds.c13
-rw-r--r--src/backend/commands/trigger.c18
-rw-r--r--src/backend/commands/vacuum.c11
7 files changed, 121 insertions, 92 deletions
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 08ee93b8eab..e1be451e35c 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -108,7 +108,12 @@ ExecRenameStmt(RenameStmt *stmt)
CheckRelationOwnership(stmt->relation, true);
- relid = RangeVarGetRelid(stmt->relation, false);
+ /*
+ * Lock level used here should match what will be taken later,
+ * in RenameRelation, renameatt, or renametrig.
+ */
+ relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock,
+ false, false);
switch (stmt->renameType)
{
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index b7c021d943a..50248540816 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -1531,9 +1531,15 @@ ReindexIndex(RangeVar *indexRelation)
Oid indOid;
HeapTuple tuple;
- indOid = RangeVarGetRelid(indexRelation, false);
+ /*
+ * XXX: This is not safe in the presence of concurrent DDL. We should
+ * take AccessExclusiveLock here, but that would violate the rule that
+ * indexes should only be locked after their parent tables. For now,
+ * we live with it.
+ */
+ indOid = RangeVarGetRelid(indexRelation, NoLock, false, false);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indOid));
- if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
+ if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u", indOid);
if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
@@ -1562,7 +1568,8 @@ ReindexTable(RangeVar *relation)
Oid heapOid;
HeapTuple tuple;
- heapOid = RangeVarGetRelid(relation, false);
+ /* The lock level used here should match reindex_relation(). */
+ heapOid = RangeVarGetRelid(relation, ShareLock, false, false);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid));
if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
elog(ERROR, "cache lookup failed for relation %u", heapOid);
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c
index b2c6ea55593..875f868b969 100644
--- a/src/backend/commands/lockcmds.c
+++ b/src/backend/commands/lockcmds.c
@@ -24,8 +24,8 @@
#include "utils/acl.h"
#include "utils/lsyscache.h"
-static void LockTableRecurse(Oid reloid, RangeVar *rv,
- LOCKMODE lockmode, bool nowait, bool recurse);
+static void LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait,
+ bool recurse);
/*
@@ -37,99 +37,39 @@ LockTableCommand(LockStmt *lockstmt)
ListCell *p;
/*
+ * During recovery we only accept these variations: LOCK TABLE foo IN
+ * ACCESS SHARE MODE LOCK TABLE foo IN ROW SHARE MODE LOCK TABLE foo
+ * IN ROW EXCLUSIVE MODE This test must match the restrictions defined
+ * in LockAcquire()
+ */
+ if (lockstmt->mode > RowExclusiveLock)
+ PreventCommandDuringRecovery("LOCK TABLE");
+
+ /*
* Iterate over the list and process the named relations one at a time
*/
foreach(p, lockstmt->relations)
{
- RangeVar *relation = (RangeVar *) lfirst(p);
- bool recurse = interpretInhOption(relation->inhOpt);
+ RangeVar *rv = (RangeVar *) lfirst(p);
+ Relation rel;
+ bool recurse = interpretInhOption(rv->inhOpt);
Oid reloid;
- reloid = RangeVarGetRelid(relation, false);
-
- /*
- * During recovery we only accept these variations: LOCK TABLE foo IN
- * ACCESS SHARE MODE LOCK TABLE foo IN ROW SHARE MODE LOCK TABLE foo
- * IN ROW EXCLUSIVE MODE This test must match the restrictions defined
- * in LockAcquire()
- */
- if (lockstmt->mode > RowExclusiveLock)
- PreventCommandDuringRecovery("LOCK TABLE");
+ reloid = RangeVarGetRelid(rv, lockstmt->mode, false, lockstmt->nowait);
+ rel = relation_open(reloid, NoLock);
- LockTableRecurse(reloid, relation,
- lockstmt->mode, lockstmt->nowait, recurse);
+ LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse);
}
}
/*
* Apply LOCK TABLE recursively over an inheritance tree
- *
- * At top level, "rv" is the original command argument; we use it to throw
- * an appropriate error message if the relation isn't there. Below top level,
- * "rv" is NULL and we should just silently ignore any dropped child rel.
*/
static void
-LockTableRecurse(Oid reloid, RangeVar *rv,
- LOCKMODE lockmode, bool nowait, bool recurse)
+LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, bool recurse)
{
- Relation rel;
AclResult aclresult;
-
- /*
- * Acquire the lock. We must do this first to protect against concurrent
- * drops. Note that a lock against an already-dropped relation's OID
- * won't fail.
- */
- if (nowait)
- {
- if (!ConditionalLockRelationOid(reloid, lockmode))
- {
- /* try to throw error by name; relation could be deleted... */
- char *relname = rv ? rv->relname : get_rel_name(reloid);
-
- if (relname)
- ereport(ERROR,
- (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
- errmsg("could not obtain lock on relation \"%s\"",
- relname)));
- else
- ereport(ERROR,
- (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
- errmsg("could not obtain lock on relation with OID %u",
- reloid)));
- }
- }
- else
- LockRelationOid(reloid, lockmode);
-
- /*
- * Now that we have the lock, check to see if the relation really exists
- * or not.
- */
- rel = try_relation_open(reloid, NoLock);
-
- if (!rel)
- {
- /* Release useless lock */
- UnlockRelationOid(reloid, lockmode);
-
- /* At top level, throw error; otherwise, ignore this child rel */
- if (rv)
- {
- if (rv->schemaname)
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_TABLE),
- errmsg("relation \"%s.%s\" does not exist",
- rv->schemaname, rv->relname)));
- else
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_TABLE),
- errmsg("relation \"%s\" does not exist",
- rv->relname)));
- }
-
- return;
- }
+ Oid reloid = RelationGetRelid(rel);
/* Verify adequate privilege */
if (lockmode == AccessShareLock)
@@ -159,12 +99,48 @@ LockTableRecurse(Oid reloid, RangeVar *rv,
{
List *children = find_inheritance_children(reloid, NoLock);
ListCell *lc;
+ Relation childrel;
foreach(lc, children)
{
Oid childreloid = lfirst_oid(lc);
- LockTableRecurse(childreloid, NULL, lockmode, nowait, recurse);
+ /*
+ * Acquire the lock, to protect against concurrent drops. Note
+ * that a lock against an already-dropped relation's OID won't
+ * fail.
+ */
+ if (!nowait)
+ LockRelationOid(childreloid, lockmode);
+ else if (!ConditionalLockRelationOid(childreloid, lockmode))
+ {
+ /* try to throw error by name; relation could be deleted... */
+ char *relname = get_rel_name(childreloid);
+
+ if (relname)
+ ereport(ERROR,
+ (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+ errmsg("could not obtain lock on relation \"%s\"",
+ relname)));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+ errmsg("could not obtain lock on relation with OID %u",
+ reloid)));
+ }
+
+ /*
+ * Now that we have the lock, check to see if the relation really
+ * exists or not.
+ */
+ childrel = try_relation_open(childreloid, NoLock);
+ if (!childrel)
+ {
+ /* Release useless lock */
+ UnlockRelationOid(childreloid, lockmode);
+ }
+
+ LockTableRecurse(childrel, lockmode, nowait, recurse);
}
}
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index be04177a2ee..de6e2a3e33f 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -427,8 +427,8 @@ AlterSequence(AlterSeqStmt *stmt)
FormData_pg_sequence new;
List *owned_by;
- /* open and AccessShareLock sequence */
- relid = RangeVarGetRelid(stmt->sequence, false);
+ /* Open and lock sequence. */
+ relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false, false);
init_sequence(relid, &elm, &seqrel);
/* allow ALTER to sequence owner only */
@@ -507,7 +507,16 @@ nextval(PG_FUNCTION_ARGS)
Oid relid;
sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin));
- relid = RangeVarGetRelid(sequence, false);
+
+ /*
+ * XXX: This is not safe in the presence of concurrent DDL, but
+ * acquiring a lock here is more expensive than letting nextval_internal
+ * do it, since the latter maintains a cache that keeps us from hitting
+ * the lock manager more than once per transaction. It's not clear
+ * whether the performance penalty is material in practice, but for now,
+ * we do it this way.
+ */
+ relid = RangeVarGetRelid(sequence, NoLock, false, false);
PG_RETURN_INT64(nextval_internal(relid));
}
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index c1af12a5a30..295a1ff6e63 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -764,8 +764,15 @@ RemoveRelations(DropStmt *drop)
*/
AcceptInvalidationMessages();
- /* Look up the appropriate relation using namespace search */
- relOid = RangeVarGetRelid(rel, true);
+ /*
+ * Look up the appropriate relation using namespace search.
+ *
+ * XXX: Doing this without a lock is unsafe in the presence of
+ * concurrent DDL, but acquiring a lock here might violate the rule
+ * that a table must be locked before its corresponding index.
+ * So, for now, we ignore the hazard.
+ */
+ relOid = RangeVarGetRelid(rel, NoLock, true, false);
/* Not there? */
if (!OidIsValid(relOid))
@@ -2234,6 +2241,8 @@ RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
/*
* Grab an exclusive lock on the target table, index, sequence or view,
* which we will NOT release until end of transaction.
+ *
+ * Lock level used here should match ExecRenameStmt
*/
targetrelation = relation_open(myrelid, AccessExclusiveLock);
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 9cf8372e967..cadf3a15453 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -194,7 +194,17 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
RelationGetRelationName(rel))));
if (stmt->isconstraint && stmt->constrrel != NULL)
- constrrelid = RangeVarGetRelid(stmt->constrrel, false);
+ {
+ /*
+ * We must take a lock on the target relation to protect against
+ * concurrent drop. It's not clear that AccessShareLock is strong
+ * enough, but we certainly need at least that much... otherwise,
+ * we might end up creating a pg_constraint entry referencing a
+ * nonexistent table.
+ */
+ constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false,
+ false);
+ }
/* permission checks */
if (!isInternal)
@@ -1020,11 +1030,15 @@ ConvertTriggerToFK(CreateTrigStmt *stmt, Oid funcoid)
* DropTrigger - drop an individual trigger by name
*/
void
-DropTrigger(Oid relid, const char *trigname, DropBehavior behavior,
+DropTrigger(RangeVar *relation, const char *trigname, DropBehavior behavior,
bool missing_ok)
{
+ Oid relid;
ObjectAddress object;
+ /* lock level should match RemoveTriggerById */
+ relid = RangeVarGetRelid(relation, ShareRowExclusiveLock, false, false);
+
object.classId = TriggerRelationId;
object.objectId = get_trigger_oid(relid, trigname, missing_ok);
object.objectSubId = 0;
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 93039678630..889737e26e6 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -318,7 +318,16 @@ get_rel_oids(Oid relid, const RangeVar *vacrel)
/* Process a specific relation */
Oid relid;
- relid = RangeVarGetRelid(vacrel, false);
+ /*
+ * Since we don't take a lock here, the relation might be gone,
+ * or the RangeVar might no longer refer to the OID we look up
+ * here. In the former case, VACUUM will do nothing; in the
+ * latter case, it will process the OID we looked up here, rather
+ * than the new one. Neither is ideal, but there's little practical
+ * alternative, since we're going to commit this transaction and
+ * begin a new one between now and then.
+ */
+ relid = RangeVarGetRelid(vacrel, NoLock, false, false);
/* Make a relation list entry for this guy */
oldcontext = MemoryContextSwitchTo(vac_context);