author		Tom Lane <tgl@sss.pgh.pa.us>	2001-09-29 04:02:27 +0000
committer	Tom Lane <tgl@sss.pgh.pa.us>	2001-09-29 04:02:27 +0000
commit		499abb0c0f21cb861c5af1d49a06469f3cfcc1eb (patch)
tree		0af6262d9b6d1159315e93e90e69047b959ea5f5 /src/backend/storage/buffer/bufmgr.c
parent		818fb55ac49b4b20e65d9899fc1784e54e86db58 (diff)
download	postgresql-499abb0c0f21cb861c5af1d49a06469f3cfcc1eb.tar.gz
		postgresql-499abb0c0f21cb861c5af1d49a06469f3cfcc1eb.zip
Implement new 'lightweight lock manager' that's intermediate between
existing lock manager and spinlocks: it understands exclusive vs shared
lock but has few other fancy features. Replace most uses of spinlocks
with lightweight locks. All remaining uses of spinlocks have very short
lock hold times (a few dozen instructions), so tweak spinlock backoff
code to work efficiently given this assumption. All per my proposal on
pghackers 26-Sep-01.
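
For orientation, a minimal sketch of the calling-convention change this patch applies throughout bufmgr.c: the BufMgrLock spinlock becomes a lightweight lock taken in LW_EXCLUSIVE mode, and the per-buffer io_in_progress_lock becomes a lightweight lock that I/O waiters touch in LW_SHARED mode. LWLockAcquire, LWLockRelease, LW_SHARED, LW_EXCLUSIVE, BufMgrLock and io_in_progress_lock are the names used in the diff below; the example functions and the exact header list are illustrative only, not part of the patch.

#include "postgres.h"
#include "storage/lwlock.h"			/* LWLockAcquire, LWLockRelease, BufMgrLock */
#include "storage/buf_internals.h"	/* BufferDesc, BM_* flag bits */

/* Marking a buffer dirty under the buffer-manager lock. */
static void
mark_dirty_sketch(BufferDesc *bufHdr)
{
	/* was: SpinAcquire(BufMgrLock); ... SpinRelease(BufMgrLock); */
	LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);	/* bufmgr state changes need exclusive mode */
	bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
	LWLockRelease(BufMgrLock);
}

/*
 * Shared mode is what WaitIO() uses on io_in_progress_lock: acquiring the
 * lock in LW_SHARED mode merely waits out the exclusive holder doing the
 * I/O, after which it is released again immediately.
 */
static void
wait_out_io_holder_sketch(BufferDesc *buf)
{
	LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
	LWLockRelease(buf->io_in_progress_lock);
}

Unlike a spinlock, a lightweight lock puts waiters to sleep on a semaphore instead of busy-waiting, which is why the potentially long-held BufMgrLock and io_in_progress_lock move to the new mechanism while only very short critical sections remain on spinlocks.
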
Diffstat (limited to 'src/backend/storage/buffer/bufmgr.c')
-rw-r--r--	src/backend/storage/buffer/bufmgr.c	327
1 file changed, 116 insertions, 211 deletions
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 89443ee160e..86c2c478f48 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.116 2001/07/06 21:04:25 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.117 2001/09/29 04:02:23 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -59,7 +59,6 @@
(*((XLogRecPtr*)MAKE_PTR((bufHdr)->data)))
-extern SPINLOCK BufMgrLock;
extern long int ReadBufferCount;
extern long int ReadLocalBufferCount;
extern long int BufferHitCount;
@@ -76,7 +75,7 @@ extern long int LocalBufferFlushCount;
*/
bool SharedBufferChanged = false;
-static void WaitIO(BufferDesc *buf, SPINLOCK spinlock);
+static void WaitIO(BufferDesc *buf);
static void StartBufferIO(BufferDesc *buf, bool forInput);
static void TerminateBufferIO(BufferDesc *buf);
static void ContinueBufferIO(BufferDesc *buf, bool forInput);
@@ -130,7 +129,7 @@ ReadBuffer(Relation reln, BlockNumber blockNum)
/*
* ReadBufferInternal -- internal version of ReadBuffer with more options
*
- * bufferLockHeld: if true, caller already acquired the bufmgr spinlock.
+ * bufferLockHeld: if true, caller already acquired the bufmgr lock.
* (This is assumed never to be true if dealing with a local buffer!)
*/
static Buffer
@@ -179,7 +178,7 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
* block is not currently in memory.
*/
if (!bufferLockHeld)
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
bufHdr = BufferAlloc(reln, blockNum, &found);
if (found)
{
@@ -188,7 +187,7 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
}
}
- /* At this point we do NOT hold the bufmgr spinlock. */
+ /* At this point we do NOT hold the bufmgr lock. */
if (!bufHdr)
return InvalidBuffer;
@@ -208,9 +207,9 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
*/
if (!isLocalBuf)
{
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
StartBufferIO(bufHdr, false);
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
}
}
@@ -243,7 +242,7 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
}
/* lock buffer manager again to update IO IN PROGRESS */
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
if (status == SM_FAIL)
{
@@ -251,7 +250,7 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
if (!BufTableDelete(bufHdr))
{
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
elog(FATAL, "BufRead: buffer table broken after IO error");
}
/* remember that BufferAlloc() pinned the buffer */
@@ -274,7 +273,7 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
/* If anyone was waiting for IO to complete, wake them up now */
TerminateBufferIO(bufHdr);
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
if (status == SM_FAIL)
return InvalidBuffer;
@@ -322,7 +321,7 @@ BufferAlloc(Relation reln,
*foundPtr = TRUE;
if (inProgress) /* confirm end of IO */
{
- WaitIO(buf, BufMgrLock);
+ WaitIO(buf);
inProgress = (buf->flags & BM_IO_IN_PROGRESS);
}
if (BUFFER_IS_BROKEN(buf))
@@ -354,7 +353,7 @@ BufferAlloc(Relation reln,
if (!(*foundPtr))
StartBufferIO(buf, true);
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
return buf;
}
@@ -364,7 +363,7 @@ BufferAlloc(Relation reln,
/*
* Didn't find it in the buffer pool. We'll have to initialize a new
* buffer. First, grab one from the free list. If it's dirty, flush
- * it to disk. Remember to unlock BufMgr spinlock while doing the IOs.
+ * it to disk. Remember to unlock BufMgrLock while doing the IOs.
*/
inProgress = FALSE;
for (buf = (BufferDesc *) NULL; buf == (BufferDesc *) NULL;)
@@ -502,7 +501,7 @@ BufferAlloc(Relation reln,
*foundPtr = TRUE;
if (inProgress)
{
- WaitIO(buf2, BufMgrLock);
+ WaitIO(buf2);
inProgress = (buf2->flags & BM_IO_IN_PROGRESS);
}
if (BUFFER_IS_BROKEN(buf2))
@@ -510,7 +509,7 @@ BufferAlloc(Relation reln,
if (!(*foundPtr))
StartBufferIO(buf2, true);
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
return buf2;
}
@@ -534,7 +533,7 @@ BufferAlloc(Relation reln,
if (!BufTableDelete(buf))
{
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
elog(FATAL, "buffer wasn't in the buffer table");
}
@@ -542,7 +541,7 @@ BufferAlloc(Relation reln,
if (!BufTableInsert(buf))
{
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
elog(FATAL, "Buffer in lookup table twice");
}
@@ -561,7 +560,7 @@ BufferAlloc(Relation reln,
_bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), RelationGetRelid(reln), blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCNOTFND);
#endif /* BMTRACE */
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
return buf;
}
@@ -595,13 +594,13 @@ WriteBuffer(Buffer buffer)
SharedBufferChanged = true;
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
Assert(bufHdr->refcount > 0);
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
UnpinBuffer(bufHdr);
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
return TRUE;
}
@@ -625,12 +624,12 @@ WriteNoReleaseBuffer(Buffer buffer)
SharedBufferChanged = true;
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
Assert(bufHdr->refcount > 0);
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
return STATUS_OK;
}
@@ -639,10 +638,10 @@ WriteNoReleaseBuffer(Buffer buffer)
#undef ReleaseAndReadBuffer
/*
* ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
- * to save a spinlock release/acquire.
+ * to save a lock release/acquire.
*
* Also, if the passed buffer is valid and already contains the desired block
- * number, we simply return it without ever acquiring the spinlock at all.
+ * number, we simply return it without ever acquiring the lock at all.
* Since the passed buffer must be pinned, it's OK to examine its block
* number without getting the lock first.
*
@@ -652,7 +651,7 @@ WriteNoReleaseBuffer(Buffer buffer)
*
* Also note: while it will work to call this routine with blockNum == P_NEW,
* it's best to avoid doing so, since that would result in calling
- * smgrnblocks() while holding the bufmgr spinlock, hence some loss of
+ * smgrnblocks() while holding the bufmgr lock, hence some loss of
* concurrency.
*/
Buffer
@@ -684,7 +683,7 @@ ReleaseAndReadBuffer(Buffer buffer,
PrivateRefCount[buffer - 1]--;
else
{
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
UnpinBuffer(bufHdr);
return ReadBufferInternal(relation, blockNum, true);
}
@@ -712,12 +711,11 @@ BufferSync()
for (i = 0, bufHdr = BufferDescriptors; i < NBuffers; i++, bufHdr++)
{
-
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
if (!(bufHdr->flags & BM_VALID))
{
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
continue;
}
@@ -731,7 +729,7 @@ BufferSync()
*/
if (!(bufHdr->flags & BM_DIRTY) && !(bufHdr->cntxDirty))
{
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
continue;
}
@@ -741,11 +739,11 @@ BufferSync()
*/
if (bufHdr->flags & BM_IO_IN_PROGRESS)
{
- WaitIO(bufHdr, BufMgrLock);
+ WaitIO(bufHdr);
if (!(bufHdr->flags & BM_VALID) ||
(!(bufHdr->flags & BM_DIRTY) && !(bufHdr->cntxDirty)))
{
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
continue;
}
}
@@ -761,7 +759,7 @@ BufferSync()
buffer = BufferDescriptorGetBuffer(bufHdr);
rnode = bufHdr->tag.rnode;
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
/*
* Try to find relation for buffer
@@ -784,10 +782,10 @@ BufferSync()
* should not be able to write it while we were busy with locking
* and log flushing because of we setted IO flag.
*/
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
Assert(bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty);
bufHdr->flags &= ~BM_JUST_DIRTIED;
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
if (reln == (Relation) NULL)
{
@@ -822,7 +820,7 @@ BufferSync()
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
BufferFlushCount++;
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
bufHdr->flags &= ~BM_IO_IN_PROGRESS; /* mark IO finished */
TerminateBufferIO(bufHdr); /* Sync IO finished */
@@ -834,7 +832,7 @@ BufferSync()
if (!(bufHdr->flags & BM_JUST_DIRTIED))
bufHdr->flags &= ~BM_DIRTY;
UnpinBuffer(bufHdr);
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
/* drop refcnt obtained by RelationNodeCacheGetRelation */
if (reln != (Relation) NULL)
@@ -846,24 +844,25 @@ BufferSync()
/*
* WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
*
- * Should be entered with buffer manager spinlock held; releases it before
+ * Should be entered with buffer manager lock held; releases it before
* waiting and re-acquires it afterwards.
*/
static void
-WaitIO(BufferDesc *buf, SPINLOCK spinlock)
+WaitIO(BufferDesc *buf)
{
-
/*
* Changed to wait until there's no IO - Inoue 01/13/2000
+ *
+ * Note this is *necessary* because an error abort in the process
+ * doing I/O could release the io_in_progress_lock prematurely.
+ * See AbortBufferIO.
*/
while ((buf->flags & BM_IO_IN_PROGRESS) != 0)
{
- SpinRelease(spinlock);
- HOLD_INTERRUPTS(); /* don't want to die() holding the lock... */
- S_LOCK(&(buf->io_in_progress_lock));
- S_UNLOCK(&(buf->io_in_progress_lock));
- RESUME_INTERRUPTS();
- SpinAcquire(spinlock);
+ LWLockRelease(BufMgrLock);
+ LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
+ LWLockRelease(buf->io_in_progress_lock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
}
}
@@ -932,9 +931,9 @@ ResetBufferPool(bool isCommit)
BufferDesc *buf = &BufferDescriptors[i];
PrivateRefCount[i] = 1; /* make sure we release shared pin */
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
UnpinBuffer(buf);
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
Assert(PrivateRefCount[i] == 0);
}
}
@@ -1039,7 +1038,7 @@ BufferReplace(BufferDesc *bufHdr)
/* To check if block content changed while flushing. - vadim 01/17/97 */
bufHdr->flags &= ~BM_JUST_DIRTIED;
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
/*
* No need to lock buffer context - no one should be able to end
@@ -1067,7 +1066,7 @@ BufferReplace(BufferDesc *bufHdr)
if (reln != (Relation) NULL)
RelationDecrementReferenceCount(reln);
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
if (status == SM_FAIL)
return FALSE;
@@ -1140,7 +1139,8 @@ DropRelationBuffers(Relation rel)
return;
}
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
+
for (i = 1; i <= NBuffers; i++)
{
bufHdr = &BufferDescriptors[i - 1];
@@ -1155,7 +1155,7 @@ recheck:
*/
if (bufHdr->flags & BM_IO_IN_PROGRESS)
{
- WaitIO(bufHdr, BufMgrLock);
+ WaitIO(bufHdr);
/*
* By now, the buffer very possibly belongs to some other
@@ -1189,7 +1189,7 @@ recheck:
}
}
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
}
/* ---------------------------------------------------------------------
@@ -1223,7 +1223,8 @@ DropRelFileNodeBuffers(RelFileNode rnode)
}
}
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
+
for (i = 1; i <= NBuffers; i++)
{
bufHdr = &BufferDescriptors[i - 1];
@@ -1238,7 +1239,7 @@ recheck:
*/
if (bufHdr->flags & BM_IO_IN_PROGRESS)
{
- WaitIO(bufHdr, BufMgrLock);
+ WaitIO(bufHdr);
/*
* By now, the buffer very possibly belongs to some other
@@ -1272,7 +1273,7 @@ recheck:
}
}
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
}
/* ---------------------------------------------------------------------
@@ -1292,7 +1293,8 @@ DropBuffers(Oid dbid)
int i;
BufferDesc *bufHdr;
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
+
for (i = 1; i <= NBuffers; i++)
{
bufHdr = &BufferDescriptors[i - 1];
@@ -1313,7 +1315,7 @@ recheck:
*/
if (bufHdr->flags & BM_IO_IN_PROGRESS)
{
- WaitIO(bufHdr, BufMgrLock);
+ WaitIO(bufHdr);
/*
* By now, the buffer very possibly belongs to some other
@@ -1337,7 +1339,8 @@ recheck:
BufTableDelete(bufHdr);
}
}
- SpinRelease(BufMgrLock);
+
+ LWLockRelease(BufMgrLock);
}
/* -----------------------------------------------------------------
@@ -1355,7 +1358,7 @@ PrintBufferDescs()
if (IsUnderPostmaster)
{
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
for (i = 0; i < NBuffers; ++i, ++buf)
{
elog(DEBUG, "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u, \
@@ -1365,7 +1368,7 @@ blockNum=%u, flags=0x%x, refcount=%d %ld)",
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]);
}
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
}
else
{
@@ -1386,7 +1389,7 @@ PrintPinnedBufs()
int i;
BufferDesc *buf = BufferDescriptors;
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
for (i = 0; i < NBuffers; ++i, ++buf)
{
if (PrivateRefCount[i] > 0)
@@ -1397,7 +1400,7 @@ blockNum=%u, flags=0x%x, refcount=%d %ld)",
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]);
}
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
}
/*
@@ -1514,7 +1517,8 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
return 0;
}
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
+
for (i = 0; i < NBuffers; i++)
{
bufHdr = &BufferDescriptors[i];
@@ -1524,8 +1528,8 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
{
PinBuffer(bufHdr);
if (bufHdr->flags & BM_IO_IN_PROGRESS)
- WaitIO(bufHdr, BufMgrLock);
- SpinRelease(BufMgrLock);
+ WaitIO(bufHdr);
+ LWLockRelease(BufMgrLock);
/*
* Force XLOG flush for buffer' LSN
@@ -1537,16 +1541,16 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
* Now it's safe to write buffer to disk
*/
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
if (bufHdr->flags & BM_IO_IN_PROGRESS)
- WaitIO(bufHdr, BufMgrLock);
+ WaitIO(bufHdr);
if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
{
bufHdr->flags &= ~BM_JUST_DIRTIED;
StartBufferIO(bufHdr, false); /* output IO start */
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
status = smgrwrite(DEFAULT_SMGR, rel,
bufHdr->tag.blockNum,
@@ -1560,7 +1564,7 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
BufferFlushCount++;
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
bufHdr->flags &= ~BM_IO_IN_PROGRESS;
TerminateBufferIO(bufHdr);
Assert(!(bufHdr->flags & BM_JUST_DIRTIED));
@@ -1578,7 +1582,7 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
}
if (!(bufHdr->flags & BM_FREE))
{
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is referenced (private %ld, global %d)",
RelationGetRelationName(rel), firstDelBlock,
bufHdr->tag.blockNum,
@@ -1589,7 +1593,7 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
BufTableDelete(bufHdr);
}
}
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
return 0;
}
@@ -1621,9 +1625,9 @@ ReleaseBuffer(Buffer buffer)
PrivateRefCount[buffer - 1]--;
else
{
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
UnpinBuffer(bufHdr);
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
}
return STATUS_OK;
@@ -1919,13 +1923,18 @@ SetBufferCommitInfoNeedsSave(Buffer buffer)
if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
(BM_DIRTY | BM_JUST_DIRTIED))
{
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
Assert(bufHdr->refcount > 0);
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
}
}
+/*
+ * Release buffer context locks for shared buffers.
+ *
+ * Used to clean up after errors.
+ */
void
UnlockBuffers(void)
{
@@ -1942,36 +1951,15 @@ UnlockBuffers(void)
Assert(BufferIsValid(i + 1));
buf = &(BufferDescriptors[i]);
- HOLD_INTERRUPTS(); /* don't want to die() holding the lock... */
-
- S_LOCK(&(buf->cntx_lock));
-
- if (buflocks & BL_R_LOCK)
- {
- Assert(buf->r_locks > 0);
- (buf->r_locks)--;
- }
- if (buflocks & BL_RI_LOCK)
- {
- /*
- * Someone else could remove our RI lock when acquiring W
- * lock. This is possible if we came here from elog(ERROR)
- * from IpcSemaphore{Lock|Unlock}(WaitCLSemId). And so we
- * don't do Assert(buf->ri_lock) here.
- */
- buf->ri_lock = false;
- }
- if (buflocks & BL_W_LOCK)
- {
- Assert(buf->w_lock);
- buf->w_lock = false;
- }
+ HOLD_INTERRUPTS(); /* don't want to die() partway through... */
- S_UNLOCK(&(buf->cntx_lock));
+ /*
+ * The buffer's cntx_lock has already been released by lwlock.c.
+ */
if (buflocks & BL_PIN_COUNT_LOCK)
{
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
/*
* Don't complain if flag bit not set; it could have been reset
* but we got a cancel/die interrupt before getting the signal.
@@ -1979,7 +1967,7 @@ UnlockBuffers(void)
if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
buf->wait_backend_id == MyBackendId)
buf->flags &= ~BM_PIN_COUNT_WAITER;
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
ProcCancelWaitForSignal();
}
@@ -1989,94 +1977,31 @@ UnlockBuffers(void)
}
}
-/* Max time to wait to acquire a buffer read or write lock */
-#define BUFFER_LOCK_TIMEOUT (10*60*1000000) /* 10 minutes */
-
+/*
+ * Acquire or release the cntx_lock for the buffer.
+ */
void
LockBuffer(Buffer buffer, int mode)
{
BufferDesc *buf;
- bits8 *buflock;
Assert(BufferIsValid(buffer));
if (BufferIsLocal(buffer))
return;
buf = &(BufferDescriptors[buffer - 1]);
- buflock = &(BufferLocks[buffer - 1]);
-
- HOLD_INTERRUPTS(); /* don't want to die() holding the lock... */
-
- S_LOCK(&(buf->cntx_lock));
if (mode == BUFFER_LOCK_UNLOCK)
{
- if (*buflock & BL_R_LOCK)
- {
- Assert(buf->r_locks > 0);
- Assert(!(buf->w_lock));
- Assert(!(*buflock & (BL_W_LOCK | BL_RI_LOCK)));
- (buf->r_locks)--;
- *buflock &= ~BL_R_LOCK;
- }
- else if (*buflock & BL_W_LOCK)
- {
- Assert(buf->w_lock);
- Assert(buf->r_locks == 0);
- Assert(!(*buflock & (BL_R_LOCK | BL_RI_LOCK)));
- buf->w_lock = false;
- *buflock &= ~BL_W_LOCK;
- }
- else
- {
- S_UNLOCK(&(buf->cntx_lock));
- RESUME_INTERRUPTS();
- elog(ERROR, "UNLockBuffer: buffer %d is not locked", buffer);
- }
+ LWLockRelease(buf->cntx_lock);
}
else if (mode == BUFFER_LOCK_SHARE)
{
- unsigned i = 0;
-
- Assert(!(*buflock & (BL_R_LOCK | BL_W_LOCK | BL_RI_LOCK)));
- while (buf->ri_lock || buf->w_lock)
- {
- S_UNLOCK(&(buf->cntx_lock));
- RESUME_INTERRUPTS();
- S_LOCK_SLEEP(&(buf->cntx_lock), i++, BUFFER_LOCK_TIMEOUT);
- HOLD_INTERRUPTS();
- S_LOCK(&(buf->cntx_lock));
- }
- (buf->r_locks)++;
- *buflock |= BL_R_LOCK;
+ LWLockAcquire(buf->cntx_lock, LW_SHARED);
}
else if (mode == BUFFER_LOCK_EXCLUSIVE)
{
- unsigned i = 0;
-
- Assert(!(*buflock & (BL_R_LOCK | BL_W_LOCK | BL_RI_LOCK)));
- while (buf->r_locks > 0 || buf->w_lock)
- {
- if (buf->r_locks > 3 || (*buflock & BL_RI_LOCK))
- {
-
- /*
- * Our RI lock might be removed by concurrent W lock
- * acquiring (see what we do with RI locks below when our
- * own W acquiring succeeded) and so we set RI lock again
- * if we already did this.
- */
- *buflock |= BL_RI_LOCK;
- buf->ri_lock = true;
- }
- S_UNLOCK(&(buf->cntx_lock));
- RESUME_INTERRUPTS();
- S_LOCK_SLEEP(&(buf->cntx_lock), i++, BUFFER_LOCK_TIMEOUT);
- HOLD_INTERRUPTS();
- S_LOCK(&(buf->cntx_lock));
- }
- buf->w_lock = true;
- *buflock |= BL_W_LOCK;
+ LWLockAcquire(buf->cntx_lock, LW_EXCLUSIVE);
/*
* This is not the best place to set cntxDirty flag (eg indices do
@@ -2085,27 +2010,11 @@ LockBuffer(Buffer buffer, int mode)
* changes with XLogInsert() - see comments in BufferSync().
*/
buf->cntxDirty = true;
-
- if (*buflock & BL_RI_LOCK)
- {
-
- /*
- * It's possible to remove RI locks acquired by another W
- * lockers here, but they'll take care about it.
- */
- buf->ri_lock = false;
- *buflock &= ~BL_RI_LOCK;
- }
}
else
{
- S_UNLOCK(&(buf->cntx_lock));
- RESUME_INTERRUPTS();
elog(ERROR, "LockBuffer: unknown lock mode %d", mode);
}
-
- S_UNLOCK(&(buf->cntx_lock));
- RESUME_INTERRUPTS();
}
/*
@@ -2152,25 +2061,25 @@ LockBufferForCleanup(Buffer buffer)
{
/* Try to acquire lock */
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- SpinAcquire(BufMgrLock);
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
Assert(bufHdr->refcount > 0);
if (bufHdr->refcount == 1)
{
/* Successfully acquired exclusive lock with pincount 1 */
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
return;
}
/* Failed, so mark myself as waiting for pincount 1 */
if (bufHdr->flags & BM_PIN_COUNT_WAITER)
{
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
elog(ERROR, "Multiple backends attempting to wait for pincount 1");
}
bufHdr->wait_backend_id = MyBackendId;
bufHdr->flags |= BM_PIN_COUNT_WAITER;
*buflock |= BL_PIN_COUNT_LOCK;
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* Wait to be signaled by UnpinBuffer() */
ProcWaitForSignal();
@@ -2183,8 +2092,7 @@ LockBufferForCleanup(Buffer buffer)
* Functions for IO error handling
*
* Note : We assume that nested buffer IO never occur.
- * i.e at most one io_in_progress spinlock is held
- * per proc.
+ * i.e at most one io_in_progress lock is held per proc.
*/
static BufferDesc *InProgressBuf = (BufferDesc *) NULL;
static bool IsForInput;
@@ -2207,18 +2115,7 @@ StartBufferIO(BufferDesc *buf, bool forInput)
Assert(!(buf->flags & BM_IO_IN_PROGRESS));
buf->flags |= BM_IO_IN_PROGRESS;
- /*
- * There used to be
- *
- * Assert(S_LOCK_FREE(&(buf->io_in_progress_lock)));
- *
- * here, but that's wrong because of the way WaitIO works: someone else
- * waiting for the I/O to complete will succeed in grabbing the lock
- * for a few instructions, and if we context-swap back to here the
- * Assert could fail. Tiny window for failure, but I've seen it
- * happen -- tgl
- */
- S_LOCK(&(buf->io_in_progress_lock));
+ LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
InProgressBuf = buf;
IsForInput = forInput;
@@ -2238,7 +2135,7 @@ static void
TerminateBufferIO(BufferDesc *buf)
{
Assert(buf == InProgressBuf);
- S_UNLOCK(&(buf->io_in_progress_lock));
+ LWLockRelease(buf->io_in_progress_lock);
InProgressBuf = (BufferDesc *) 0;
}
@@ -2271,7 +2168,6 @@ InitBufferIO(void)
/*
* Clean up any active buffer I/O after an error.
- * This function is called from ProcReleaseSpins().
* BufMgrLock isn't held when this function is called.
*
* If I/O was in progress, we always set BM_IO_ERROR.
@@ -2283,7 +2179,16 @@ AbortBufferIO(void)
if (buf)
{
- SpinAcquire(BufMgrLock);
+ /*
+ * Since LWLockReleaseAll has already been called,
+ * we're not holding the buffer's io_in_progress_lock.
+ * We have to re-acquire it so that we can use TerminateBufferIO.
+ * Anyone who's executing WaitIO on the buffer will be in a busy spin
+ * until we succeed in doing this.
+ */
+ LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
+
+ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
Assert(buf->flags & BM_IO_IN_PROGRESS);
if (IsForInput)
Assert(!(buf->flags & BM_DIRTY) && !(buf->cntxDirty));
@@ -2302,7 +2207,7 @@ AbortBufferIO(void)
buf->flags |= BM_IO_ERROR;
buf->flags &= ~BM_IO_IN_PROGRESS;
TerminateBufferIO(buf);
- SpinRelease(BufMgrLock);
+ LWLockRelease(BufMgrLock);
}
}
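
The new comments in WaitIO() and AbortBufferIO() describe a subtle interaction: an elog(ERROR) in the backend performing the I/O releases io_in_progress_lock via LWLockReleaseAll() while BM_IO_IN_PROGRESS is still set, so a waiter cannot treat a successful shared acquire as proof that the I/O completed. The following is a sketch of that recovery sequence for illustration only; it reuses the lock and flag names visible in the diff, but the function itself is hypothetical and condenses what AbortBufferIO() and TerminateBufferIO() do between them.

/*
 * Re-acquire the io_in_progress_lock that error cleanup already dropped,
 * record the failure, and clear BM_IO_IN_PROGRESS so that concurrent
 * WaitIO() loops (which re-test the flag after each shared acquire) can
 * exit instead of spinning forever.
 */
static void
abort_io_sketch(BufferDesc *buf)
{
	LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);	/* reclaim lock dropped by LWLockReleaseAll */
	LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
	Assert(buf->flags & BM_IO_IN_PROGRESS);
	buf->flags |= BM_IO_ERROR;			/* record that the I/O failed */
	buf->flags &= ~BM_IO_IN_PROGRESS;	/* lets WaitIO() callers leave their loop */
	LWLockRelease(buf->io_in_progress_lock);
	LWLockRelease(BufMgrLock);
}
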