diff options
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
-rw-r--r-- | src/backend/storage/lmgr/lock.c | 203 |
1 files changed, 105 insertions, 98 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index b3c630b79c0..912a25ff229 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.87 2001/03/18 20:13:13 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.88 2001/03/22 03:59:46 momjian Exp $ * * NOTES * Outside modules can create a lock table and acquire/release @@ -40,10 +40,10 @@ #include "utils/memutils.h" #include "utils/ps_status.h" -static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, - LOCK *lock, HOLDER *holder); +static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, + LOCK *lock, HOLDER *holder); static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, - int *myHolding); + int *myHolding); static char *lock_mode_names[] = { @@ -65,40 +65,40 @@ static char *DeadLockMessage = "Deadlock detected.\n\tSee the lock(l) manual pag /*------ * The following configuration options are available for lock debugging: * - * TRACE_LOCKS -- give a bunch of output what's going on in this file - * TRACE_USERLOCKS -- same but for user locks - * TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid - * (use to avoid output on system tables) - * TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally - * DEBUG_DEADLOCKS -- currently dumps locks at untimely occasions ;) + * TRACE_LOCKS -- give a bunch of output what's going on in this file + * TRACE_USERLOCKS -- same but for user locks + * TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid + * (use to avoid output on system tables) + * TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally + * DEBUG_DEADLOCKS -- currently dumps locks at untimely occasions ;) * * Furthermore, but in storage/ipc/spin.c: - * TRACE_SPINLOCKS -- trace spinlocks (pretty useless) + * TRACE_SPINLOCKS -- trace spinlocks (pretty useless) * * Define LOCK_DEBUG at compile time to get all these enabled. * -------- */ -int Trace_lock_oidmin = BootstrapObjectIdData; -bool Trace_locks = false; -bool Trace_userlocks = false; -int Trace_lock_table = 0; -bool Debug_deadlocks = false; +int Trace_lock_oidmin = BootstrapObjectIdData; +bool Trace_locks = false; +bool Trace_userlocks = false; +int Trace_lock_table = 0; +bool Debug_deadlocks = false; inline static bool -LOCK_DEBUG_ENABLED(const LOCK * lock) +LOCK_DEBUG_ENABLED(const LOCK *lock) { return - (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks) - || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks)) - && (lock->tag.relId >= (Oid) Trace_lock_oidmin)) - || (Trace_lock_table && (lock->tag.relId == Trace_lock_table)); + (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks) + || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks)) + && (lock->tag.relId >= (Oid) Trace_lock_oidmin)) + || (Trace_lock_table && (lock->tag.relId == Trace_lock_table)); } inline static void -LOCK_PRINT(const char * where, const LOCK * lock, LOCKMODE type) +LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type) { if (LOCK_DEBUG_ENABLED(lock)) elog(DEBUG, @@ -119,30 +119,30 @@ LOCK_PRINT(const char * where, const LOCK * lock, LOCKMODE type) inline static void -HOLDER_PRINT(const char * where, const HOLDER * holderP) +HOLDER_PRINT(const char *where, const HOLDER *holderP) { if ( - (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks) - || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks)) - && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin)) - || (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table)) - ) + (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks) + || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks)) + && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin)) + || (Trace_lock_table && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table)) + ) elog(DEBUG, "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d", where, MAKE_OFFSET(holderP), holderP->tag.lock, HOLDER_LOCKMETHOD(*(holderP)), holderP->tag.proc, holderP->tag.xid, - holderP->holding[1], holderP->holding[2], holderP->holding[3], - holderP->holding[4], holderP->holding[5], holderP->holding[6], + holderP->holding[1], holderP->holding[2], holderP->holding[3], + holderP->holding[4], holderP->holding[5], holderP->holding[6], holderP->holding[7], holderP->nHolding); } -#else /* not LOCK_DEBUG */ +#else /* not LOCK_DEBUG */ #define LOCK_PRINT(where, lock, type) #define HOLDER_PRINT(where, holderP) -#endif /* not LOCK_DEBUG */ +#endif /* not LOCK_DEBUG */ @@ -218,7 +218,7 @@ LockingDisabled(void) LOCKMETHODTABLE * GetLocksMethodTable(LOCK *lock) { - LOCKMETHOD lockmethod = LOCK_LOCKMETHOD(*lock); + LOCKMETHOD lockmethod = LOCK_LOCKMETHOD(*lock); Assert(lockmethod > 0 && lockmethod < NumLockMethods); return LockMethodTable[lockmethod]; @@ -258,7 +258,7 @@ LockMethodInit(LOCKMETHODTABLE *lockMethodTable, * is wasteful, in this case, but not much space is involved. * * NOTE: data structures allocated here are allocated permanently, using - * TopMemoryContext and shared memory. We don't ever release them anyway, + * TopMemoryContext and shared memory. We don't ever release them anyway, * and in normal multi-backend operation the lock table structures set up * by the postmaster are inherited by each backend, so they must be in * TopMemoryContext. @@ -304,8 +304,8 @@ LockMethodTableInit(char *tabName, SpinAcquire(LockMgrLock); /* - * allocate a control structure from shared memory or attach to it - * if it already exists. + * allocate a control structure from shared memory or attach to it if + * it already exists. * */ sprintf(shmemName, "%s (ctl)", tabName); @@ -341,8 +341,8 @@ LockMethodTableInit(char *tabName, Assert(NumLockMethods <= MAX_LOCK_METHODS); /* - * allocate a hash table for LOCK structs. This is used - * to store per-locked-object information. + * allocate a hash table for LOCK structs. This is used to store + * per-locked-object information. * */ info.keysize = SHMEM_LOCKTAB_KEYSIZE; @@ -362,8 +362,8 @@ LockMethodTableInit(char *tabName, Assert(lockMethodTable->lockHash->hash == tag_hash); /* - * allocate a hash table for HOLDER structs. This is used - * to store per-lock-holder information. + * allocate a hash table for HOLDER structs. This is used to store + * per-lock-holder information. * */ info.keysize = SHMEM_HOLDERTAB_KEYSIZE; @@ -558,7 +558,8 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, * Create the hash key for the holder table. * */ - MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */ + MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, + * needed */ holdertag.lock = MAKE_OFFSET(lock); holdertag.proc = MAKE_OFFSET(MyProc); TransactionIdStore(xid, &holdertag.xid); @@ -595,6 +596,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, Assert(holder->nHolding <= lock->nGranted); #ifdef CHECK_DEADLOCK_RISK + /* * Issue warning if we already hold a lower-level lock on this * object and do not hold a lock of the requested level or higher. @@ -602,12 +604,13 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, * a deadlock if another backend were following the same code path * at about the same time). * - * This is not enabled by default, because it may generate log entries - * about user-level coding practices that are in fact safe in context. - * It can be enabled to help find system-level problems. + * This is not enabled by default, because it may generate log + * entries about user-level coding practices that are in fact safe + * in context. It can be enabled to help find system-level + * problems. * - * XXX Doing numeric comparison on the lockmodes is a hack; - * it'd be better to use a table. For now, though, this works. + * XXX Doing numeric comparison on the lockmodes is a hack; it'd be + * better to use a table. For now, though, this works. */ for (i = lockMethodTable->ctl->numLockModes; i > 0; i--) { @@ -618,17 +621,17 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, elog(DEBUG, "Deadlock risk: raising lock level" " from %s to %s on object %u/%u/%u", lock_mode_names[i], lock_mode_names[lockmode], - lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno); + lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno); break; } } -#endif /* CHECK_DEADLOCK_RISK */ +#endif /* CHECK_DEADLOCK_RISK */ } /* * lock->nRequested and lock->requested[] count the total number of - * requests, whether granted or waiting, so increment those immediately. - * The other counts don't increment till we get the lock. + * requests, whether granted or waiting, so increment those + * immediately. The other counts don't increment till we get the lock. * */ lock->nRequested++; @@ -636,8 +639,8 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); /* - * If I already hold one or more locks of the requested type, - * just grant myself another one without blocking. + * If I already hold one or more locks of the requested type, just + * grant myself another one without blocking. * */ if (holder->holding[lockmode] > 0) @@ -649,8 +652,8 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, } /* - * If this process (under any XID) is a holder of the lock, - * also grant myself another one without blocking. + * If this process (under any XID) is a holder of the lock, also grant + * myself another one without blocking. * */ LockCountMyLocks(holder->tag.lock, MyProc, myHolding); @@ -663,9 +666,9 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, } /* - * If lock requested conflicts with locks requested by waiters, - * must join wait queue. Otherwise, check for conflict with - * already-held locks. (That's last because most complex check.) + * If lock requested conflicts with locks requested by waiters, must + * join wait queue. Otherwise, check for conflict with already-held + * locks. (That's last because most complex check.) * */ if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask) @@ -711,7 +714,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, SpinRelease(masterLock); return FALSE; } -#endif /* USER_LOCKS */ +#endif /* USER_LOCKS */ /* * Construct bitmask of locks this process holds on this object. @@ -737,8 +740,9 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, /* * NOTE: do not do any material change of state between here and - * return. All required changes in locktable state must have been - * done when the lock was granted to us --- see notes in WaitOnLock. + * return. All required changes in locktable state must have been + * done when the lock was granted to us --- see notes in + * WaitOnLock. */ /* @@ -795,13 +799,13 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, int localHolding[MAX_LOCKMODES]; /* - * first check for global conflicts: If no locks conflict - * with my request, then I get the lock. + * first check for global conflicts: If no locks conflict with my + * request, then I get the lock. * * Checking for conflict: lock->grantMask represents the types of - * currently held locks. conflictTable[lockmode] has a bit - * set for each type of lock that conflicts with request. Bitwise - * compare tells if there is a conflict. + * currently held locks. conflictTable[lockmode] has a bit set for + * each type of lock that conflicts with request. Bitwise compare + * tells if there is a conflict. * */ if (!(lockctl->conflictTab[lockmode] & lock->grantMask)) @@ -811,10 +815,10 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, } /* - * Rats. Something conflicts. But it could still be my own - * lock. We have to construct a conflict mask - * that does not reflect our own locks. Locks held by the current - * process under another XID also count as "our own locks". + * Rats. Something conflicts. But it could still be my own lock. We + * have to construct a conflict mask that does not reflect our own + * locks. Locks held by the current process under another XID also + * count as "our own locks". * */ if (myHolding == NULL) @@ -834,10 +838,9 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, } /* - * now check again for conflicts. 'bitmask' describes the types - * of locks held by other processes. If one of these - * conflicts with the kind of lock that I want, there is a - * conflict and I have to sleep. + * now check again for conflicts. 'bitmask' describes the types of + * locks held by other processes. If one of these conflicts with the + * kind of lock that I want, there is a conflict and I have to sleep. * */ if (!(lockctl->conflictTab[lockmode] & bitmask)) @@ -878,9 +881,7 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding) if (lockOffset == holder->tag.lock) { for (i = 1; i < MAX_LOCKMODES; i++) - { myHolding[i] += holder->holding[i]; - } } holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink, @@ -947,8 +948,8 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, * Hence, after granting, the locktable state must fully reflect the * fact that we own the lock; we can't do additional work on return. * Contrariwise, if we fail, any cleanup must happen in xact abort - * processing, not here, to ensure it will also happen in the cancel/die - * case. + * processing, not here, to ensure it will also happen in the + * cancel/die case. */ if (ProcSleep(lockMethodTable, @@ -956,9 +957,10 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, lock, holder) != STATUS_OK) { + /* - * We failed as a result of a deadlock, see HandleDeadLock(). - * Quit now. Removal of the holder and lock objects, if no longer + * We failed as a result of a deadlock, see HandleDeadLock(). Quit + * now. Removal of the holder and lock objects, if no longer * needed, will happen in xact cleanup (see above for motivation). */ LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode); @@ -984,15 +986,15 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, * NB: this does not remove the process' holder object, nor the lock object, * even though their counts might now have gone to zero. That will happen * during a subsequent LockReleaseAll call, which we expect will happen - * during transaction cleanup. (Removal of a proc from its wait queue by + * during transaction cleanup. (Removal of a proc from its wait queue by * this routine can only happen if we are aborting the transaction.) *-------------------- */ void RemoveFromWaitQueue(PROC *proc) { - LOCK *waitLock = proc->waitLock; - LOCKMODE lockmode = proc->waitLockMode; + LOCK *waitLock = proc->waitLock; + LOCKMODE lockmode = proc->waitLockMode; /* Make sure proc is waiting */ Assert(proc->links.next != INVALID_OFFSET); @@ -1095,7 +1097,8 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, /* * Find the holder entry for this holder. */ - MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */ + MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, + * needed */ holdertag.lock = MAKE_OFFSET(lock); holdertag.proc = MAKE_OFFSET(MyProc); TransactionIdStore(xid, &holdertag.xid); @@ -1156,11 +1159,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, /* * We need only run ProcLockWakeup if the released lock conflicts with * at least one of the lock types requested by waiter(s). Otherwise - * whatever conflict made them wait must still exist. NOTE: before MVCC, - * we could skip wakeup if lock->granted[lockmode] was still positive. - * But that's not true anymore, because the remaining granted locks might - * belong to some waiter, who could now be awakened because he doesn't - * conflict with his own locks. + * whatever conflict made them wait must still exist. NOTE: before + * MVCC, we could skip wakeup if lock->granted[lockmode] was still + * positive. But that's not true anymore, because the remaining + * granted locks might belong to some waiter, who could now be + * awakened because he doesn't conflict with his own locks. * */ if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask) @@ -1168,10 +1171,10 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, if (lock->nRequested == 0) { + /* - * if there's no one waiting in the queue, - * we just released the last lock on this object. - * Delete it from the lock table. + * if there's no one waiting in the queue, we just released the + * last lock on this object. Delete it from the lock table. * */ Assert(lockMethodTable->lockHash->hash == tag_hash); @@ -1197,8 +1200,8 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0)); /* - * If this was my last hold on this lock, delete my entry in the holder - * table. + * If this was my last hold on this lock, delete my entry in the + * holder table. */ if (holder->nHolding == 0) { @@ -1316,11 +1319,12 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0); if (lock->granted[i] == 0) lock->grantMask &= BITS_OFF[i]; + /* * Read comments in LockRelease */ if (!wakeupNeeded && - lockMethodTable->ctl->conflictTab[i] & lock->waitMask) + lockMethodTable->ctl->conflictTab[i] & lock->waitMask) wakeupNeeded = true; } } @@ -1331,9 +1335,10 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, } else { + /* - * This holder accounts for all the requested locks on the object, - * so we can be lazy and just zero things out. + * This holder accounts for all the requested locks on the + * object, so we can be lazy and just zero things out. * */ lock->nRequested = 0; @@ -1371,6 +1376,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, if (lock->nRequested == 0) { + /* * We've just released the last lock, so garbage-collect the * lock object. @@ -1412,7 +1418,8 @@ LockShmemSize(int maxBackends) size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */ size += maxBackends * MAXALIGN(sizeof(PROC)); /* each MyProc */ - size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODCTL)); /* each lockMethodTable->ctl */ + size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODCTL)); /* each + * lockMethodTable->ctl */ /* lockHash table */ size += hash_estimate_size(NLOCKENTS(maxBackends), @@ -1534,4 +1541,4 @@ DumpAllLocks(void) } } -#endif /* LOCK_DEBUG */ +#endif /* LOCK_DEBUG */ |