diff options
-rw-r--r-- | src/backend/commands/indexcmds.c | 42 | ||||
-rw-r--r-- | src/backend/storage/lmgr/lock.c | 100 | ||||
-rw-r--r-- | src/include/storage/lock.h | 19 |
3 files changed, 129 insertions, 32 deletions
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index e96235c6f53..81246768bc9 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.147 2006/08/25 04:06:48 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.148 2006/08/27 19:14:34 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -119,8 +119,11 @@ DefineIndex(RangeVar *heapRelation, Datum reloptions; IndexInfo *indexInfo; int numberOfAttributes; + List *old_xact_list; + ListCell *lc; uint32 ixcnt; LockRelId heaprelid; + LOCKTAG heaplocktag; Snapshot snapshot; Relation pg_index; HeapTuple indexTuple; @@ -466,33 +469,26 @@ DefineIndex(RangeVar *heapRelation, CommitTransactionCommand(); StartTransactionCommand(); - /* Establish transaction snapshot ... else GetLatestSnapshot complains */ - (void) GetTransactionSnapshot(); - /* * Now we must wait until no running transaction could have the table open - * with the old list of indexes. If we can take an exclusive lock then - * there are none now and anybody who opens it later will get the new - * index in their relcache entry. Alternatively, if our Xmin reaches our - * own (new) transaction then we know no transactions that started before - * the index was visible are left anyway. + * with the old list of indexes. To do this, inquire which xacts currently + * would conflict with ShareLock on the table -- ie, which ones have + * a lock that permits writing the table. Then wait for each of these + * xacts to commit or abort. Note we do not need to worry about xacts + * that open the table for writing after this point; they will see the + * new index when they open it. + * + * Note: GetLockConflicts() never reports our own xid, + * hence we need not check for that. */ - for (;;) - { - CHECK_FOR_INTERRUPTS(); + SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId); + old_xact_list = GetLockConflicts(&heaplocktag, ShareLock); - if (ConditionalLockRelationOid(relationId, ExclusiveLock)) - { - /* Release the lock right away to avoid blocking anyone */ - UnlockRelationOid(relationId, ExclusiveLock); - break; - } - - if (TransactionIdEquals(GetLatestSnapshot()->xmin, - GetTopTransactionId())) - break; + foreach(lc, old_xact_list) + { + TransactionId xid = lfirst_xid(lc); - pg_usleep(1000000L); /* 1 sec */ + XactLockTableWait(xid); } /* diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 7fe011a2baf..c59999e20ca 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.171 2006/08/19 01:36:28 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.172 2006/08/27 19:14:34 tgl Exp $ * * NOTES * A lock table is a shared memory hash table. When @@ -1686,6 +1686,104 @@ LockReassignCurrentOwner(void) /* + * GetLockConflicts + * Get a list of TransactionIds of xacts currently holding locks + * that would conflict with the specified lock/lockmode. + * xacts merely awaiting such a lock are NOT reported. + * + * Of course, the result could be out of date by the time it's returned, + * so use of this function has to be thought about carefully. + * + * Only top-level XIDs are reported. Note we never include the current xact + * in the result list, since an xact never blocks itself. + */ +List * +GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode) +{ + List *result = NIL; + LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; + LockMethod lockMethodTable; + LOCK *lock; + LOCKMASK conflictMask; + SHM_QUEUE *procLocks; + PROCLOCK *proclock; + uint32 hashcode; + LWLockId partitionLock; + + if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + lockMethodTable = LockMethods[lockmethodid]; + if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes) + elog(ERROR, "unrecognized lock mode: %d", lockmode); + + /* + * Look up the lock object matching the tag. + */ + hashcode = LockTagHashCode(locktag); + partitionLock = LockHashPartitionLock(hashcode); + + LWLockAcquire(partitionLock, LW_SHARED); + + lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash, + (void *) locktag, + hashcode, + HASH_FIND, + NULL); + if (!lock) + { + /* + * If the lock object doesn't exist, there is nothing holding a + * lock on this lockable object. + */ + LWLockRelease(partitionLock); + return NIL; + } + + /* + * Examine each existing holder (or awaiter) of the lock. + */ + conflictMask = lockMethodTable->conflictTab[lockmode]; + + procLocks = &(lock->procLocks); + + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, lockLink)); + + while (proclock) + { + if (conflictMask & proclock->holdMask) + { + PGPROC *proc = proclock->tag.myProc; + + /* A backend never blocks itself */ + if (proc != MyProc) + { + /* Fetch xid just once - see GetNewTransactionId */ + TransactionId xid = proc->xid; + + /* + * Race condition: during xact commit/abort we zero out + * PGPROC's xid before we mark its locks released. If we see + * zero in the xid field, assume the xact is in process of + * shutting down and act as though the lock is already + * released. + */ + if (TransactionIdIsValid(xid)) + result = lappend_xid(result, xid); + } + } + + proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, + offsetof(PROCLOCK, lockLink)); + } + + LWLockRelease(partitionLock); + + return result; +} + + +/* * AtPrepare_Locks * Do the preparatory work for a PREPARE: make 2PC state file records * for all locks currently held. diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 208b4a93ccc..120fd4f3ca4 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -7,13 +7,14 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.97 2006/07/31 20:09:05 tgl Exp $ + * $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.98 2006/08/27 19:14:34 tgl Exp $ * *------------------------------------------------------------------------- */ #ifndef LOCK_H_ #define LOCK_H_ +#include "nodes/pg_list.h" #include "storage/itemptr.h" #include "storage/lwlock.h" #include "storage/shmem.h" @@ -412,6 +413,7 @@ extern bool LockRelease(const LOCKTAG *locktag, extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks); extern void LockReleaseCurrentOwner(void); extern void LockReassignCurrentOwner(void); +extern List *GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode); extern void AtPrepare_Locks(void); extern void PostPrepare_Locks(TransactionId xid); extern int LockCheckConflicts(LockMethod lockMethodTable, @@ -421,13 +423,6 @@ extern void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode); extern void GrantAwaitedLock(void); extern void RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode); extern Size LockShmemSize(void); -extern bool DeadLockCheck(PGPROC *proc); -extern void DeadLockReport(void); -extern void RememberSimpleDeadLock(PGPROC *proc1, - LOCKMODE lockmode, - LOCK *lock, - PGPROC *proc2); -extern void InitDeadLockChecking(void); extern LockData *GetLockStatusData(void); extern const char *GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode); @@ -438,6 +433,14 @@ extern void lock_twophase_postcommit(TransactionId xid, uint16 info, extern void lock_twophase_postabort(TransactionId xid, uint16 info, void *recdata, uint32 len); +extern bool DeadLockCheck(PGPROC *proc); +extern void DeadLockReport(void); +extern void RememberSimpleDeadLock(PGPROC *proc1, + LOCKMODE lockmode, + LOCK *lock, + PGPROC *proc2); +extern void InitDeadLockChecking(void); + #ifdef LOCK_DEBUG extern void DumpLocks(PGPROC *proc); extern void DumpAllLocks(void); |