aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/lmgr/proc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/lmgr/proc.c')
-rw-r--r--src/backend/storage/lmgr/proc.c128
1 files changed, 73 insertions, 55 deletions
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 229a78587cb..b80d32e1b44 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.53 1999/04/30 02:04:51 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -46,7 +46,7 @@
* This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95
*
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.53 1999/04/30 02:04:51 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
*/
#include <sys/time.h>
#include <unistd.h>
@@ -106,6 +106,8 @@ static void ProcKill(int exitStatus, int pid);
static void ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum);
static void ProcFreeSem(IpcSemaphoreKey semKey, int semNum);
+static char *DeadLockMessage = "Deadlock detected -- See the lock(l) manual page for a possible cause.";
+
/*
* InitProcGlobal -
* initializes the global process table. We put it here so that
@@ -488,68 +490,80 @@ ProcQueueInit(PROC_QUEUE *queue)
*/
int
ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
- SPINLOCK spinlock,
+ LOCKMETHODCTL *lockctl,
int token, /* lockmode */
- int prio,
- LOCK *lock,
- TransactionId xid) /* needed by user locks, see below */
+ LOCK *lock)
{
int i;
+ SPINLOCK spinlock = lockctl->masterLock;
PROC *proc;
+ int myMask = (1 << token);
+ int waitMask = lock->waitMask;
+ int aheadHolders[MAX_LOCKMODES];
+ bool selfConflict = (lockctl->conflictTab[token] & myMask),
+ prevSame = false;
bool deadlock_checked = false;
struct itimerval timeval,
dummy;
- /*
- * If the first entries in the waitQueue have a greater priority than
- * we have, we must be a reader, and they must be a writers, and we
- * must be here because the current holder is a writer or a reader but
- * we don't share shared locks if a writer is waiting. We put
- * ourselves after the writers. This way, we have a FIFO, but keep
- * the readers together to give them decent priority, and no one
- * starves. Because we group all readers together, a non-empty queue
- * only has a few possible configurations:
- *
- * [readers] [writers] [readers][writers] [writers][readers]
- * [writers][readers][writers]
- *
- * In a full queue, we would have a reader holding a lock, then a writer
- * gets the lock, then a bunch of readers, made up of readers who
- * could not share the first readlock because a writer was waiting,
- * and new readers arriving while the writer had the lock. bjm
- */
+ MyProc->token = token;
+ MyProc->waitLock = lock;
+
proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
- /* If we are a reader, and they are writers, skip past them */
- for (i = 0; i < waitQueue->size && proc->prio > prio; i++)
- proc = (PROC *) MAKE_PTR(proc->links.prev);
+ /* if we don't conflict with any waiter - be first in queue */
+ if (!(lockctl->conflictTab[token] & waitMask))
+ goto ins;
- /* The rest of the queue is FIFO, with readers first, writers last */
- for (; i < waitQueue->size && proc->prio <= prio; i++)
- proc = (PROC *) MAKE_PTR(proc->links.prev);
+ for (i = 1; i < MAX_LOCKMODES; i++)
+ aheadHolders[i] = lock->activeHolders[i];
+ (aheadHolders[token])++;
- MyProc->prio = prio;
- MyProc->token = token;
- MyProc->waitLock = lock;
+ for (i = 0; i < waitQueue->size; i++)
+ {
+ /* am I waiting for him ? */
+ if (lockctl->conflictTab[token] & proc->holdLock)
+ {
+ /* is he waiting for me ? */
+ if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+ {
+ MyProc->errType = STATUS_ERROR;
+ elog(NOTICE, DeadLockMessage);
+ goto rt;
+ }
+ /* being waiting for him - go past */
+ }
+ /* if he waits for me */
+ else if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+ {
+ break;
+ }
+ /* if conflicting locks requested */
+ else if (lockctl->conflictTab[proc->token] & myMask)
+ {
+ /*
+ * If I request non self-conflicting lock and there
+ * are others requesting the same lock just before me -
+ * stay here.
+ */
+ if (!selfConflict && prevSame)
+ break;
+ }
+ /*
+ * Last attempt to don't move any more: if we don't conflict
+ * with rest waiters in queue.
+ */
+ else if (!(lockctl->conflictTab[token] & waitMask))
+ break;
-#ifdef USER_LOCKS
- /* -------------------
- * Currently, we only need this for the ProcWakeup routines.
- * This must be 0 for user lock, so we can't just use the value
- * from GetCurrentTransactionId().
- * -------------------
- */
- TransactionIdStore(xid, &MyProc->xid);
-#else
-#ifndef LowLevelLocking
- /* -------------------
- * currently, we only need this for the ProcWakeup routines
- * -------------------
- */
- TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
-#endif
-#endif
+ prevSame = (proc->token == token);
+ (aheadHolders[proc->token])++;
+ if (aheadHolders[proc->token] == lock->holders[proc->token])
+ waitMask &= ~ (1 << proc->token);
+ proc = (PROC *) MAKE_PTR(proc->links.prev);
+ }
+ins:;
/* -------------------
* assume that these two operations are atomic (because
* of the spinlock).
@@ -558,6 +572,7 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
SHMQueueInsertTL(&(proc->links), &(MyProc->links));
waitQueue->size++;
+ lock->waitMask |= myMask;
SpinRelease(spinlock);
/* --------------
@@ -608,6 +623,8 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
*/
SpinAcquire(spinlock);
+rt:;
+
#ifdef LOCK_MGR_DEBUG
/* Just to get meaningful debug messages from DumpLocks() */
MyProc->waitLock = (LOCK *) NULL;
@@ -655,9 +672,9 @@ int
ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
{
PROC *proc;
- int count;
+ int count = 0;
int trace_flag;
- int last_locktype = -1;
+ int last_locktype = 0;
int queue_size = queue->size;
Assert(queue->size >= 0);
@@ -666,7 +683,6 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
return STATUS_NOT_FOUND;
proc = (PROC *) MAKE_PTR(queue->links.prev);
- count = 0;
while ((queue_size--) && (proc))
{
@@ -678,7 +694,7 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
continue;
/*
- * This proc conflicts with locks held by others, ignored.
+ * Does this proc conflict with locks held by others ?
*/
if (LockResolveConflicts(lockmethod,
lock,
@@ -686,6 +702,8 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
proc->xid,
(XIDLookupEnt *) NULL) != STATUS_OK)
{
+ if (count != 0)
+ break;
last_locktype = proc->token;
continue;
}
@@ -828,7 +846,7 @@ HandleDeadLock(int sig)
*/
UnlockLockTable();
- elog(NOTICE, "Deadlock detected -- See the lock(l) manual page for a possible cause.");
+ elog(NOTICE, DeadLockMessage);
return;
}