diff options
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
-rw-r--r-- | src/backend/storage/lmgr/lock.c | 122 |
1 files changed, 102 insertions, 20 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 269fe143019..e3e9599fc98 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -35,6 +35,7 @@ #include "access/transam.h" #include "access/twophase.h" #include "access/twophase_rmgr.h" +#include "access/xact.h" #include "access/xlog.h" #include "miscadmin.h" #include "pg_trace.h" @@ -1136,6 +1137,18 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, { uint32 partition = LockHashPartition(hashcode); + /* + * It might seem unsafe to access proclock->groupLeader without a lock, + * but it's not really. Either we are initializing a proclock on our + * own behalf, in which case our group leader isn't changing because + * the group leader for a process can only ever be changed by the + * process itself; or else we are transferring a fast-path lock to the + * main lock table, in which case that process can't change it's lock + * group leader without first releasing all of its locks (and in + * particular the one we are currently transferring). + */ + proclock->groupLeader = proc->lockGroupLeader != NULL ? + proc->lockGroupLeader : proc; proclock->holdMask = 0; proclock->releaseMask = 0; /* Add proclock to appropriate lists */ @@ -1255,9 +1268,10 @@ RemoveLocalLock(LOCALLOCK *locallock) * NOTES: * Here's what makes this complicated: one process's locks don't * conflict with one another, no matter what purpose they are held for - * (eg, session and transaction locks do not conflict). - * So, we must subtract off our own locks when determining whether the - * requested new lock conflicts with those already held. + * (eg, session and transaction locks do not conflict). Nor do the locks + * of one process in a lock group conflict with those of another process in + * the same group. So, we must subtract off these locks when determining + * whether the requested new lock conflicts with those already held. */ int LockCheckConflicts(LockMethod lockMethodTable, @@ -1267,8 +1281,12 @@ LockCheckConflicts(LockMethod lockMethodTable, { int numLockModes = lockMethodTable->numLockModes; LOCKMASK myLocks; - LOCKMASK otherLocks; + int conflictMask = lockMethodTable->conflictTab[lockmode]; + int conflictsRemaining[MAX_LOCKMODES]; + int totalConflictsRemaining = 0; int i; + SHM_QUEUE *procLocks; + PROCLOCK *otherproclock; /* * first check for global conflicts: If no locks conflict with my request, @@ -1279,40 +1297,91 @@ LockCheckConflicts(LockMethod lockMethodTable, * type of lock that conflicts with request. Bitwise compare tells if * there is a conflict. */ - if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask)) + if (!(conflictMask & lock->grantMask)) { PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock); return STATUS_OK; } /* - * Rats. Something conflicts. But it could still be my own lock. We have - * to construct a conflict mask that does not reflect our own locks, but - * only lock types held by other processes. + * Rats. Something conflicts. But it could still be my own lock, or + * a lock held by another member of my locking group. First, figure out + * how many conflicts remain after subtracting out any locks I hold + * myself. */ myLocks = proclock->holdMask; - otherLocks = 0; for (i = 1; i <= numLockModes; i++) { - int myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0; + if ((conflictMask & LOCKBIT_ON(i)) == 0) + { + conflictsRemaining[i] = 0; + continue; + } + conflictsRemaining[i] = lock->granted[i]; + if (myLocks & LOCKBIT_ON(i)) + --conflictsRemaining[i]; + totalConflictsRemaining += conflictsRemaining[i]; + } - if (lock->granted[i] > myHolding) - otherLocks |= LOCKBIT_ON(i); + /* If no conflicts remain, we get the lock. */ + if (totalConflictsRemaining == 0) + { + PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock); + return STATUS_OK; + } + + /* If no group locking, it's definitely a conflict. */ + if (proclock->groupLeader == MyProc && MyProc->lockGroupLeader == NULL) + { + Assert(proclock->tag.myProc == MyProc); + PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)", + proclock); + return STATUS_FOUND; } /* - * now check again for conflicts. 'otherLocks' describes the types of - * locks held by other processes. If one of these conflicts with the kind - * of lock that I want, there is a conflict and I have to sleep. + * Locks held in conflicting modes by members of our own lock group are + * not real conflicts; we can subtract those out and see if we still have + * a conflict. This is O(N) in the number of processes holding or awaiting + * locks on this object. We could improve that by making the shared memory + * state more complex (and larger) but it doesn't seem worth it. */ - if (!(lockMethodTable->conflictTab[lockmode] & otherLocks)) + procLocks = &(lock->procLocks); + otherproclock = (PROCLOCK *) + SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); + while (otherproclock != NULL) { - /* no conflict. OK to get the lock */ - PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock); - return STATUS_OK; + if (proclock != otherproclock && + proclock->groupLeader == otherproclock->groupLeader && + (otherproclock->holdMask & conflictMask) != 0) + { + int intersectMask = otherproclock->holdMask & conflictMask; + + for (i = 1; i <= numLockModes; i++) + { + if ((intersectMask & LOCKBIT_ON(i)) != 0) + { + if (conflictsRemaining[i] <= 0) + elog(PANIC, "proclocks held do not match lock"); + conflictsRemaining[i]--; + totalConflictsRemaining--; + } + } + + if (totalConflictsRemaining == 0) + { + PROCLOCK_PRINT("LockCheckConflicts: resolved (group)", + proclock); + return STATUS_OK; + } + } + otherproclock = (PROCLOCK *) + SHMQueueNext(procLocks, &otherproclock->lockLink, + offsetof(PROCLOCK, lockLink)); } - PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock); + /* Nope, it's a real conflict. */ + PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock); return STATUS_FOUND; } @@ -3095,6 +3164,10 @@ PostPrepare_Locks(TransactionId xid) PROCLOCKTAG proclocktag; int partition; + /* Can't prepare a lock group follower. */ + Assert(MyProc->lockGroupLeader == NULL || + MyProc->lockGroupLeader == MyProc); + /* This is a critical section: any error means big trouble */ START_CRIT_SECTION(); @@ -3239,6 +3312,13 @@ PostPrepare_Locks(TransactionId xid) proclocktag.myProc = newproc; /* + * Update groupLeader pointer to point to the new proc. (We'd + * better not be a member of somebody else's lock group!) + */ + Assert(proclock->groupLeader == proclock->tag.myProc); + proclock->groupLeader = newproc; + + /* * Update the proclock. We should not find any existing entry for * the same hash key, since there can be only one entry for any * given lock with my own proc. @@ -3785,6 +3865,8 @@ lock_twophase_recover(TransactionId xid, uint16 info, */ if (!found) { + Assert(proc->lockGroupLeader == NULL); + proclock->groupLeader = proc; proclock->holdMask = 0; proclock->releaseMask = 0; /* Add proclock to appropriate lists */ |