aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/lmgr/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
-rw-r--r--src/backend/storage/lmgr/lock.c122
1 files changed, 102 insertions, 20 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 269fe143019..e3e9599fc98 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -35,6 +35,7 @@
#include "access/transam.h"
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
+#include "access/xact.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "pg_trace.h"
@@ -1136,6 +1137,18 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
{
uint32 partition = LockHashPartition(hashcode);
+ /*
+ * It might seem unsafe to access proclock->groupLeader without a lock,
+ * but it's not really. Either we are initializing a proclock on our
+ * own behalf, in which case our group leader isn't changing because
+ * the group leader for a process can only ever be changed by the
+ * process itself; or else we are transferring a fast-path lock to the
+ * main lock table, in which case that process can't change it's lock
+ * group leader without first releasing all of its locks (and in
+ * particular the one we are currently transferring).
+ */
+ proclock->groupLeader = proc->lockGroupLeader != NULL ?
+ proc->lockGroupLeader : proc;
proclock->holdMask = 0;
proclock->releaseMask = 0;
/* Add proclock to appropriate lists */
@@ -1255,9 +1268,10 @@ RemoveLocalLock(LOCALLOCK *locallock)
* NOTES:
* Here's what makes this complicated: one process's locks don't
* conflict with one another, no matter what purpose they are held for
- * (eg, session and transaction locks do not conflict).
- * So, we must subtract off our own locks when determining whether the
- * requested new lock conflicts with those already held.
+ * (eg, session and transaction locks do not conflict). Nor do the locks
+ * of one process in a lock group conflict with those of another process in
+ * the same group. So, we must subtract off these locks when determining
+ * whether the requested new lock conflicts with those already held.
*/
int
LockCheckConflicts(LockMethod lockMethodTable,
@@ -1267,8 +1281,12 @@ LockCheckConflicts(LockMethod lockMethodTable,
{
int numLockModes = lockMethodTable->numLockModes;
LOCKMASK myLocks;
- LOCKMASK otherLocks;
+ int conflictMask = lockMethodTable->conflictTab[lockmode];
+ int conflictsRemaining[MAX_LOCKMODES];
+ int totalConflictsRemaining = 0;
int i;
+ SHM_QUEUE *procLocks;
+ PROCLOCK *otherproclock;
/*
* first check for global conflicts: If no locks conflict with my request,
@@ -1279,40 +1297,91 @@ LockCheckConflicts(LockMethod lockMethodTable,
* type of lock that conflicts with request. Bitwise compare tells if
* there is a conflict.
*/
- if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
+ if (!(conflictMask & lock->grantMask))
{
PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
return STATUS_OK;
}
/*
- * Rats. Something conflicts. But it could still be my own lock. We have
- * to construct a conflict mask that does not reflect our own locks, but
- * only lock types held by other processes.
+ * Rats. Something conflicts. But it could still be my own lock, or
+ * a lock held by another member of my locking group. First, figure out
+ * how many conflicts remain after subtracting out any locks I hold
+ * myself.
*/
myLocks = proclock->holdMask;
- otherLocks = 0;
for (i = 1; i <= numLockModes; i++)
{
- int myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0;
+ if ((conflictMask & LOCKBIT_ON(i)) == 0)
+ {
+ conflictsRemaining[i] = 0;
+ continue;
+ }
+ conflictsRemaining[i] = lock->granted[i];
+ if (myLocks & LOCKBIT_ON(i))
+ --conflictsRemaining[i];
+ totalConflictsRemaining += conflictsRemaining[i];
+ }
- if (lock->granted[i] > myHolding)
- otherLocks |= LOCKBIT_ON(i);
+ /* If no conflicts remain, we get the lock. */
+ if (totalConflictsRemaining == 0)
+ {
+ PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock);
+ return STATUS_OK;
+ }
+
+ /* If no group locking, it's definitely a conflict. */
+ if (proclock->groupLeader == MyProc && MyProc->lockGroupLeader == NULL)
+ {
+ Assert(proclock->tag.myProc == MyProc);
+ PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)",
+ proclock);
+ return STATUS_FOUND;
}
/*
- * now check again for conflicts. 'otherLocks' describes the types of
- * locks held by other processes. If one of these conflicts with the kind
- * of lock that I want, there is a conflict and I have to sleep.
+ * Locks held in conflicting modes by members of our own lock group are
+ * not real conflicts; we can subtract those out and see if we still have
+ * a conflict. This is O(N) in the number of processes holding or awaiting
+ * locks on this object. We could improve that by making the shared memory
+ * state more complex (and larger) but it doesn't seem worth it.
*/
- if (!(lockMethodTable->conflictTab[lockmode] & otherLocks))
+ procLocks = &(lock->procLocks);
+ otherproclock = (PROCLOCK *)
+ SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
+ while (otherproclock != NULL)
{
- /* no conflict. OK to get the lock */
- PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
- return STATUS_OK;
+ if (proclock != otherproclock &&
+ proclock->groupLeader == otherproclock->groupLeader &&
+ (otherproclock->holdMask & conflictMask) != 0)
+ {
+ int intersectMask = otherproclock->holdMask & conflictMask;
+
+ for (i = 1; i <= numLockModes; i++)
+ {
+ if ((intersectMask & LOCKBIT_ON(i)) != 0)
+ {
+ if (conflictsRemaining[i] <= 0)
+ elog(PANIC, "proclocks held do not match lock");
+ conflictsRemaining[i]--;
+ totalConflictsRemaining--;
+ }
+ }
+
+ if (totalConflictsRemaining == 0)
+ {
+ PROCLOCK_PRINT("LockCheckConflicts: resolved (group)",
+ proclock);
+ return STATUS_OK;
+ }
+ }
+ otherproclock = (PROCLOCK *)
+ SHMQueueNext(procLocks, &otherproclock->lockLink,
+ offsetof(PROCLOCK, lockLink));
}
- PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
+ /* Nope, it's a real conflict. */
+ PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock);
return STATUS_FOUND;
}
@@ -3095,6 +3164,10 @@ PostPrepare_Locks(TransactionId xid)
PROCLOCKTAG proclocktag;
int partition;
+ /* Can't prepare a lock group follower. */
+ Assert(MyProc->lockGroupLeader == NULL ||
+ MyProc->lockGroupLeader == MyProc);
+
/* This is a critical section: any error means big trouble */
START_CRIT_SECTION();
@@ -3239,6 +3312,13 @@ PostPrepare_Locks(TransactionId xid)
proclocktag.myProc = newproc;
/*
+ * Update groupLeader pointer to point to the new proc. (We'd
+ * better not be a member of somebody else's lock group!)
+ */
+ Assert(proclock->groupLeader == proclock->tag.myProc);
+ proclock->groupLeader = newproc;
+
+ /*
* Update the proclock. We should not find any existing entry for
* the same hash key, since there can be only one entry for any
* given lock with my own proc.
@@ -3785,6 +3865,8 @@ lock_twophase_recover(TransactionId xid, uint16 info,
*/
if (!found)
{
+ Assert(proc->lockGroupLeader == NULL);
+ proclock->groupLeader = proc;
proclock->holdMask = 0;
proclock->releaseMask = 0;
/* Add proclock to appropriate lists */