diff options
Diffstat (limited to 'src/backend/storage/lmgr/proc.c')
-rw-r--r-- | src/backend/storage/lmgr/proc.c | 128 |
1 files changed, 73 insertions, 55 deletions
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 229a78587cb..b80d32e1b44 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.53 1999/04/30 02:04:51 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -46,7 +46,7 @@ * This is so that we can support more backends. (system-wide semaphore * sets run out pretty fast.) -ay 4/95 * - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.53 1999/04/30 02:04:51 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $ */ #include <sys/time.h> #include <unistd.h> @@ -106,6 +106,8 @@ static void ProcKill(int exitStatus, int pid); static void ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum); static void ProcFreeSem(IpcSemaphoreKey semKey, int semNum); +static char *DeadLockMessage = "Deadlock detected -- See the lock(l) manual page for a possible cause."; + /* * InitProcGlobal - * initializes the global process table. We put it here so that @@ -488,68 +490,80 @@ ProcQueueInit(PROC_QUEUE *queue) */ int ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */ - SPINLOCK spinlock, + LOCKMETHODCTL *lockctl, int token, /* lockmode */ - int prio, - LOCK *lock, - TransactionId xid) /* needed by user locks, see below */ + LOCK *lock) { int i; + SPINLOCK spinlock = lockctl->masterLock; PROC *proc; + int myMask = (1 << token); + int waitMask = lock->waitMask; + int aheadHolders[MAX_LOCKMODES]; + bool selfConflict = (lockctl->conflictTab[token] & myMask), + prevSame = false; bool deadlock_checked = false; struct itimerval timeval, dummy; - /* - * If the first entries in the waitQueue have a greater priority than - * we have, we must be a reader, and they must be a writers, and we - * must be here because the current holder is a writer or a reader but - * we don't share shared locks if a writer is waiting. We put - * ourselves after the writers. This way, we have a FIFO, but keep - * the readers together to give them decent priority, and no one - * starves. Because we group all readers together, a non-empty queue - * only has a few possible configurations: - * - * [readers] [writers] [readers][writers] [writers][readers] - * [writers][readers][writers] - * - * In a full queue, we would have a reader holding a lock, then a writer - * gets the lock, then a bunch of readers, made up of readers who - * could not share the first readlock because a writer was waiting, - * and new readers arriving while the writer had the lock. bjm - */ + MyProc->token = token; + MyProc->waitLock = lock; + proc = (PROC *) MAKE_PTR(waitQueue->links.prev); - /* If we are a reader, and they are writers, skip past them */ - for (i = 0; i < waitQueue->size && proc->prio > prio; i++) - proc = (PROC *) MAKE_PTR(proc->links.prev); + /* if we don't conflict with any waiter - be first in queue */ + if (!(lockctl->conflictTab[token] & waitMask)) + goto ins; - /* The rest of the queue is FIFO, with readers first, writers last */ - for (; i < waitQueue->size && proc->prio <= prio; i++) - proc = (PROC *) MAKE_PTR(proc->links.prev); + for (i = 1; i < MAX_LOCKMODES; i++) + aheadHolders[i] = lock->activeHolders[i]; + (aheadHolders[token])++; - MyProc->prio = prio; - MyProc->token = token; - MyProc->waitLock = lock; + for (i = 0; i < waitQueue->size; i++) + { + /* am I waiting for him ? */ + if (lockctl->conflictTab[token] & proc->holdLock) + { + /* is he waiting for me ? */ + if (lockctl->conflictTab[proc->token] & MyProc->holdLock) + { + MyProc->errType = STATUS_ERROR; + elog(NOTICE, DeadLockMessage); + goto rt; + } + /* being waiting for him - go past */ + } + /* if he waits for me */ + else if (lockctl->conflictTab[proc->token] & MyProc->holdLock) + { + break; + } + /* if conflicting locks requested */ + else if (lockctl->conflictTab[proc->token] & myMask) + { + /* + * If I request non self-conflicting lock and there + * are others requesting the same lock just before me - + * stay here. + */ + if (!selfConflict && prevSame) + break; + } + /* + * Last attempt to don't move any more: if we don't conflict + * with rest waiters in queue. + */ + else if (!(lockctl->conflictTab[token] & waitMask)) + break; -#ifdef USER_LOCKS - /* ------------------- - * Currently, we only need this for the ProcWakeup routines. - * This must be 0 for user lock, so we can't just use the value - * from GetCurrentTransactionId(). - * ------------------- - */ - TransactionIdStore(xid, &MyProc->xid); -#else -#ifndef LowLevelLocking - /* ------------------- - * currently, we only need this for the ProcWakeup routines - * ------------------- - */ - TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid); -#endif -#endif + prevSame = (proc->token == token); + (aheadHolders[proc->token])++; + if (aheadHolders[proc->token] == lock->holders[proc->token]) + waitMask &= ~ (1 << proc->token); + proc = (PROC *) MAKE_PTR(proc->links.prev); + } +ins:; /* ------------------- * assume that these two operations are atomic (because * of the spinlock). @@ -558,6 +572,7 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */ SHMQueueInsertTL(&(proc->links), &(MyProc->links)); waitQueue->size++; + lock->waitMask |= myMask; SpinRelease(spinlock); /* -------------- @@ -608,6 +623,8 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */ */ SpinAcquire(spinlock); +rt:; + #ifdef LOCK_MGR_DEBUG /* Just to get meaningful debug messages from DumpLocks() */ MyProc->waitLock = (LOCK *) NULL; @@ -655,9 +672,9 @@ int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) { PROC *proc; - int count; + int count = 0; int trace_flag; - int last_locktype = -1; + int last_locktype = 0; int queue_size = queue->size; Assert(queue->size >= 0); @@ -666,7 +683,6 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) return STATUS_NOT_FOUND; proc = (PROC *) MAKE_PTR(queue->links.prev); - count = 0; while ((queue_size--) && (proc)) { @@ -678,7 +694,7 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) continue; /* - * This proc conflicts with locks held by others, ignored. + * Does this proc conflict with locks held by others ? */ if (LockResolveConflicts(lockmethod, lock, @@ -686,6 +702,8 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) proc->xid, (XIDLookupEnt *) NULL) != STATUS_OK) { + if (count != 0) + break; last_locktype = proc->token; continue; } @@ -828,7 +846,7 @@ HandleDeadLock(int sig) */ UnlockLockTable(); - elog(NOTICE, "Deadlock detected -- See the lock(l) manual page for a possible cause."); + elog(NOTICE, DeadLockMessage); return; } |