aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2006-07-31 20:09:10 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2006-07-31 20:09:10 +0000
commit09d3670df3e4593be1d2299a62d982829016b847 (patch)
tree7a62e91c1cf595d0334dd2c196d1c79835cc267b /src/backend/storage
parent4cd72b53b9975bab5f4ca0792cf4f54c84829404 (diff)
downloadpostgresql-09d3670df3e4593be1d2299a62d982829016b847.tar.gz
postgresql-09d3670df3e4593be1d2299a62d982829016b847.zip
Change the relation_open protocol so that we obtain lock on a relation
(table or index) before trying to open its relcache entry. This fixes race conditions in which someone else commits a change to the relation's catalog entries while we are in process of doing relcache load. Problems of that ilk have been reported sporadically for years, but it was not really practical to fix until recently --- for instance, the recent addition of WAL-log support for in-place updates helped. Along the way, remove pg_am.amconcurrent: all AMs are now expected to support concurrent update.
Diffstat (limited to 'src/backend/storage')
-rw-r--r--src/backend/storage/large_object/inv_api.c12
-rw-r--r--src/backend/storage/lmgr/lmgr.c215
-rw-r--r--src/backend/storage/lmgr/lock.c11
3 files changed, 176 insertions, 62 deletions
diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c
index fd6ae5abc32..582eabdb2ad 100644
--- a/src/backend/storage/large_object/inv_api.c
+++ b/src/backend/storage/large_object/inv_api.c
@@ -17,7 +17,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.118 2006/07/14 14:52:23 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/large_object/inv_api.c,v 1.119 2006/07/31 20:09:05 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -69,7 +69,7 @@ open_lo_relation(void)
if (lo_heap_r == NULL)
lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
if (lo_index_r == NULL)
- lo_index_r = index_open(LargeObjectLOidPNIndexId);
+ lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
}
PG_CATCH();
{
@@ -103,7 +103,7 @@ close_lo_relation(bool isCommit)
CurrentResourceOwner = TopTransactionResourceOwner;
if (lo_index_r)
- index_close(lo_index_r);
+ index_close(lo_index_r, NoLock);
if (lo_heap_r)
heap_close(lo_heap_r, NoLock);
}
@@ -314,7 +314,7 @@ inv_getsize(LargeObjectDesc *obj_desc)
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(obj_desc->id));
- sd = index_beginscan(lo_heap_r, lo_index_r, true,
+ sd = index_beginscan(lo_heap_r, lo_index_r,
obj_desc->snapshot, 1, skey);
/*
@@ -425,7 +425,7 @@ inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
BTGreaterEqualStrategyNumber, F_INT4GE,
Int32GetDatum(pageno));
- sd = index_beginscan(lo_heap_r, lo_index_r, true,
+ sd = index_beginscan(lo_heap_r, lo_index_r,
obj_desc->snapshot, 2, skey);
while ((tuple = index_getnext(sd, ForwardScanDirection)) != NULL)
@@ -541,7 +541,7 @@ inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
BTGreaterEqualStrategyNumber, F_INT4GE,
Int32GetDatum(pageno));
- sd = index_beginscan(lo_heap_r, lo_index_r, false /* got lock */,
+ sd = index_beginscan(lo_heap_r, lo_index_r,
obj_desc->snapshot, 2, skey);
oldtuple = NULL;
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 8e99d4be48f..ad7ee3e6013 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.85 2006/07/14 16:59:19 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.86 2006/07/31 20:09:05 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -18,10 +18,13 @@
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
+#include "catalog/catalog.h"
+#include "catalog/namespace.h"
#include "miscadmin.h"
#include "storage/lmgr.h"
#include "storage/procarray.h"
#include "utils/inval.h"
+#include "utils/lsyscache.h"
/*
@@ -45,7 +48,105 @@ RelationInitLockInfo(Relation relation)
}
/*
+ * SetLocktagRelationOid
+ * Set up a locktag for a relation, given only relation OID
+ */
+static inline void
+SetLocktagRelationOid(LOCKTAG *tag, Oid relid)
+{
+ Oid dbid;
+
+ if (IsSharedRelation(relid))
+ dbid = InvalidOid;
+ else
+ dbid = MyDatabaseId;
+
+ SET_LOCKTAG_RELATION(*tag, dbid, relid);
+}
+
+/*
+ * LockRelationOid
+ *
+ * Lock a relation given only its OID. This should generally be used
+ * before attempting to open the relation's relcache entry.
+ */
+void
+LockRelationOid(Oid relid, LOCKMODE lockmode)
+{
+ LOCKTAG tag;
+ LockAcquireResult res;
+
+ SetLocktagRelationOid(&tag, relid);
+
+ res = LockAcquire(&tag, lockmode, false, false);
+
+ /*
+ * Now that we have the lock, check for invalidation messages, so that
+ * we will update or flush any stale relcache entry before we try to use
+ * it. We can skip this in the not-uncommon case that we already had
+ * the same type of lock being requested, since then no one else could
+ * have modified the relcache entry in an undesirable way. (In the
+ * case where our own xact modifies the rel, the relcache update happens
+ * via CommandCounterIncrement, not here.)
+ */
+ if (res != LOCKACQUIRE_ALREADY_HELD)
+ AcceptInvalidationMessages();
+}
+
+/*
+ * ConditionalLockRelationOid
+ *
+ * As above, but only lock if we can get the lock without blocking.
+ * Returns TRUE iff the lock was acquired.
+ *
+ * NOTE: we do not currently need conditional versions of all the
+ * LockXXX routines in this file, but they could easily be added if needed.
+ */
+bool
+ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
+{
+ LOCKTAG tag;
+ LockAcquireResult res;
+
+ SetLocktagRelationOid(&tag, relid);
+
+ res = LockAcquire(&tag, lockmode, false, true);
+
+ if (res == LOCKACQUIRE_NOT_AVAIL)
+ return false;
+
+ /*
+ * Now that we have the lock, check for invalidation messages; see
+ * notes in LockRelationOid.
+ */
+ if (res != LOCKACQUIRE_ALREADY_HELD)
+ AcceptInvalidationMessages();
+
+ return true;
+}
+
+/*
+ * UnlockRelationId
+ *
+ * Note: we don't supply UnlockRelationOid since it's normally easy for
+ * callers to provide the LockRelId info from a relcache entry.
+ */
+void
+UnlockRelationId(LockRelId *relid, LOCKMODE lockmode)
+{
+ LOCKTAG tag;
+
+ SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
+
+ LockRelease(&tag, lockmode, false);
+}
+
+/*
* LockRelation
+ *
+ * This is a convenience routine for acquiring an additional lock on an
+ * already-open relation. Never try to do "relation_open(foo, NoLock)"
+ * and then lock with this.
*/
void
LockRelation(Relation relation, LOCKMODE lockmode)
@@ -57,31 +158,22 @@ LockRelation(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- res = LockAcquire(&tag, relation->rd_istemp, lockmode, false, false);
+ res = LockAcquire(&tag, lockmode, false, false);
/*
- * Check to see if the relcache entry has been invalidated while we were
- * waiting to lock it. If so, rebuild it, or ereport() trying. Increment
- * the refcount to ensure that RelationFlushRelation will rebuild it and
- * not just delete it. We can skip this if the lock was already held,
- * however.
+ * Now that we have the lock, check for invalidation messages; see
+ * notes in LockRelationOid.
*/
if (res != LOCKACQUIRE_ALREADY_HELD)
- {
- RelationIncrementReferenceCount(relation);
AcceptInvalidationMessages();
- RelationDecrementReferenceCount(relation);
- }
}
/*
* ConditionalLockRelation
*
- * As above, but only lock if we can get the lock without blocking.
- * Returns TRUE iff the lock was acquired.
- *
- * NOTE: we do not currently need conditional versions of all the
- * LockXXX routines in this file, but they could easily be added if needed.
+ * This is a convenience routine for acquiring an additional lock on an
+ * already-open relation. Never try to do "relation_open(foo, NoLock)"
+ * and then lock with this.
*/
bool
ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
@@ -93,30 +185,26 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- res = LockAcquire(&tag, relation->rd_istemp, lockmode, false, true);
+ res = LockAcquire(&tag, lockmode, false, true);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false;
/*
- * Check to see if the relcache entry has been invalidated while we were
- * waiting to lock it. If so, rebuild it, or ereport() trying. Increment
- * the refcount to ensure that RelationFlushRelation will rebuild it and
- * not just delete it. We can skip this if the lock was already held,
- * however.
+ * Now that we have the lock, check for invalidation messages; see
+ * notes in LockRelationOid.
*/
if (res != LOCKACQUIRE_ALREADY_HELD)
- {
- RelationIncrementReferenceCount(relation);
AcceptInvalidationMessages();
- RelationDecrementReferenceCount(relation);
- }
return true;
}
/*
* UnlockRelation
+ *
+ * This is a convenience routine for unlocking a relation without also
+ * closing it.
*/
void
UnlockRelation(Relation relation, LOCKMODE lockmode)
@@ -131,11 +219,11 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
}
/*
- * LockRelationForSession
+ * LockRelationIdForSession
*
* This routine grabs a session-level lock on the target relation. The
* session lock persists across transaction boundaries. It will be removed
- * when UnlockRelationForSession() is called, or if an ereport(ERROR) occurs,
+ * when UnlockRelationIdForSession() is called, or if an ereport(ERROR) occurs,
* or if the backend exits.
*
* Note that one should also grab a transaction-level lock on the rel
@@ -143,20 +231,20 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
* relcache entry is up to date.
*/
void
-LockRelationForSession(LockRelId *relid, bool istemprel, LOCKMODE lockmode)
+LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
{
LOCKTAG tag;
SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
- (void) LockAcquire(&tag, istemprel, lockmode, true, false);
+ (void) LockAcquire(&tag, lockmode, true, false);
}
/*
- * UnlockRelationForSession
+ * UnlockRelationIdForSession
*/
void
-UnlockRelationForSession(LockRelId *relid, LOCKMODE lockmode)
+UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
{
LOCKTAG tag;
@@ -184,7 +272,7 @@ LockRelationForExtension(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- (void) LockAcquire(&tag, relation->rd_istemp, lockmode, false, false);
+ (void) LockAcquire(&tag, lockmode, false, false);
}
/*
@@ -218,7 +306,7 @@ LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.relId,
blkno);
- (void) LockAcquire(&tag, relation->rd_istemp, lockmode, false, false);
+ (void) LockAcquire(&tag, lockmode, false, false);
}
/*
@@ -237,8 +325,7 @@ ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.relId,
blkno);
- return (LockAcquire(&tag, relation->rd_istemp,
- lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
+ return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
}
/*
@@ -275,7 +362,7 @@ LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
ItemPointerGetBlockNumber(tid),
ItemPointerGetOffsetNumber(tid));
- (void) LockAcquire(&tag, relation->rd_istemp, lockmode, false, false);
+ (void) LockAcquire(&tag, lockmode, false, false);
}
/*
@@ -295,8 +382,7 @@ ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
ItemPointerGetBlockNumber(tid),
ItemPointerGetOffsetNumber(tid));
- return (LockAcquire(&tag, relation->rd_istemp,
- lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
+ return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
}
/*
@@ -330,7 +416,7 @@ XactLockTableInsert(TransactionId xid)
SET_LOCKTAG_TRANSACTION(tag, xid);
- (void) LockAcquire(&tag, false, ExclusiveLock, false, false);
+ (void) LockAcquire(&tag, ExclusiveLock, false, false);
}
/*
@@ -375,7 +461,7 @@ XactLockTableWait(TransactionId xid)
SET_LOCKTAG_TRANSACTION(tag, xid);
- (void) LockAcquire(&tag, false, ShareLock, false, false);
+ (void) LockAcquire(&tag, ShareLock, false, false);
LockRelease(&tag, ShareLock, false);
@@ -403,8 +489,7 @@ ConditionalXactLockTableWait(TransactionId xid)
SET_LOCKTAG_TRANSACTION(tag, xid);
- if (LockAcquire(&tag, false,
- ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
+ if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
return false;
LockRelease(&tag, ShareLock, false);
@@ -423,9 +508,7 @@ ConditionalXactLockTableWait(TransactionId xid)
* Obtain a lock on a general object of the current database. Don't use
* this for shared objects (such as tablespaces). It's unwise to apply it
* to relations, also, since a lock taken this way will NOT conflict with
- * LockRelation, and also may be wrongly marked if the relation is temp.
- * (If we ever invent temp objects that aren't tables, we'll want to extend
- * the API of this routine to include an isTempObject flag.)
+ * locks taken via LockRelation and friends.
*/
void
LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
@@ -439,7 +522,7 @@ LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
objid,
objsubid);
- (void) LockAcquire(&tag, false, lockmode, false, false);
+ (void) LockAcquire(&tag, lockmode, false, false);
}
/*
@@ -477,7 +560,7 @@ LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
objid,
objsubid);
- (void) LockAcquire(&tag, false, lockmode, false, false);
+ (void) LockAcquire(&tag, lockmode, false, false);
/* Make sure syscaches are up-to-date with any changes we waited for */
AcceptInvalidationMessages();
@@ -500,3 +583,39 @@ UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid,
LockRelease(&tag, lockmode, false);
}
+
+
+/*
+ * LockTagIsTemp
+ * Determine whether a locktag is for a lock on a temporary object
+ *
+ * We need this because 2PC cannot deal with temp objects
+ */
+bool
+LockTagIsTemp(const LOCKTAG *tag)
+{
+ switch (tag->locktag_type)
+ {
+ case LOCKTAG_RELATION:
+ case LOCKTAG_RELATION_EXTEND:
+ case LOCKTAG_PAGE:
+ case LOCKTAG_TUPLE:
+ /* check for lock on a temp relation */
+ /* field1 is dboid, field2 is reloid for all of these */
+ if ((Oid) tag->locktag_field1 == InvalidOid)
+ return false; /* shared, so not temp */
+ if (isTempNamespace(get_rel_namespace((Oid) tag->locktag_field2)))
+ return true;
+ break;
+ case LOCKTAG_TRANSACTION:
+ /* there are no temp transactions */
+ break;
+ case LOCKTAG_OBJECT:
+ /* there are currently no non-table temp objects */
+ break;
+ case LOCKTAG_USERLOCK:
+ /* assume these aren't temp */
+ break;
+ }
+ return false; /* default case */
+}
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index a0bc2869c00..10049d593a0 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.169 2006/07/24 16:32:45 petere Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.170 2006/07/31 20:09:05 tgl Exp $
*
* NOTES
* A lock table is a shared memory hash table. When
@@ -36,6 +36,7 @@
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "miscadmin.h"
+#include "storage/lmgr.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
@@ -449,8 +450,6 @@ ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
*
* Inputs:
* locktag: unique identifier for the lockable object
- * isTempObject: is the lockable object a temporary object? (Under 2PC,
- * such locks cannot be persisted)
* lockmode: lock mode to acquire
* sessionLock: if true, acquire lock for session not current transaction
* dontWait: if true, don't wait to acquire lock
@@ -471,7 +470,6 @@ ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
*/
LockAcquireResult
LockAcquire(const LOCKTAG *locktag,
- bool isTempObject,
LOCKMODE lockmode,
bool sessionLock,
bool dontWait)
@@ -528,7 +526,6 @@ LockAcquire(const LOCKTAG *locktag,
{
locallock->lock = NULL;
locallock->proclock = NULL;
- locallock->isTempObject = isTempObject;
locallock->hashcode = LockTagHashCode(&(localtag.lock));
locallock->nLocks = 0;
locallock->numLockOwners = 0;
@@ -540,8 +537,6 @@ LockAcquire(const LOCKTAG *locktag,
}
else
{
- Assert(locallock->isTempObject == isTempObject);
-
/* Make sure there will be room to remember the lock */
if (locallock->numLockOwners >= locallock->maxLockOwners)
{
@@ -1733,7 +1728,7 @@ AtPrepare_Locks(void)
}
/* Can't handle it if the lock is on a temporary object */
- if (locallock->isTempObject)
+ if (LockTagIsTemp(&locallock->tag.lock))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE a transaction that has operated on temporary tables")));