Diffstat (limited to 'src/backend/storage/buffer/bufmgr.c')
-rw-r--r--  src/backend/storage/buffer/bufmgr.c | 1551
1 file changed, 811 insertions, 740 deletions
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 457d23b0e02..59dec8f9ead 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.185 2005/01/10 20:02:21 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.186 2005/03/04 20:21:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -25,7 +25,9 @@
*
* WriteBuffer() -- WriteNoReleaseBuffer() + ReleaseBuffer()
*
- * BufferSync() -- flush all (or some) dirty buffers in the buffer pool.
+ * BufferSync() -- flush all dirty buffers in the buffer pool.
+ *
+ * BgBufferSync() -- flush some dirty buffers in the buffer pool.
*
* InitBufferPool() -- Init the buffer module.
*
@@ -50,16 +52,22 @@
#include "pgstat.h"
-#define BufferGetLSN(bufHdr) \
- (*((XLogRecPtr*) MAKE_PTR((bufHdr)->data)))
+/* Note: these two macros only work on shared buffers, not local ones! */
+#define BufHdrGetBlock(bufHdr) BufferBlockPointers[(bufHdr)->buf_id]
+#define BufferGetLSN(bufHdr) (*((XLogRecPtr*) BufHdrGetBlock(bufHdr)))
+
+/* Note: this macro only works on local buffers, not shared ones! */
+#define LocalBufHdrGetBlock(bufHdr) \
+ LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
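Aside: both macros lean on the convention that a descriptor's buf_id equals its Buffer number minus one; shared descriptor i carries buf_id = i, while local descriptor i carries buf_id = -(i + 2). A minimal dispatch sketch, illustrative only and not part of the patch:

/* Illustrative only: pick the data block from the sign of buf_id. */
static Block
SketchGetBlock(BufferDesc *bufHdr)
{
	if (bufHdr->buf_id >= 0)
		return BufferBlockPointers[bufHdr->buf_id];				/* shared */
	else
		return LocalBufferBlockPointers[-(bufHdr->buf_id + 2)];	/* local */
}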
-/* GUC variable */
+/* GUC variables */
bool zero_damaged_pages = false;
+double bgwriter_lru_percent = 1.0;
+double bgwriter_all_percent = 0.333;
+int bgwriter_lru_maxpages = 5;
+int bgwriter_all_maxpages = 5;
-#ifdef NOT_USED
-bool ShowPinTrace = false;
-#endif
long NDirectFileRead; /* some I/O's are direct file access.
* bypass bufmgr */
@@ -73,18 +81,18 @@ static bool IsForInput;
static BufferDesc *PinCountWaitBuf = NULL;
-static void PinBuffer(BufferDesc *buf, bool fixOwner);
-static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
+static bool PinBuffer(BufferDesc *buf);
+static void PinBuffer_Locked(BufferDesc *buf);
+static void UnpinBuffer(BufferDesc *buf, bool fixOwner, bool trashOK);
+static bool SyncOneBuffer(int buf_id, bool skip_pinned);
static void WaitIO(BufferDesc *buf);
-static void StartBufferIO(BufferDesc *buf, bool forInput);
-static void TerminateBufferIO(BufferDesc *buf, int err_flag);
-static void ContinueBufferIO(BufferDesc *buf, bool forInput);
+static bool StartBufferIO(BufferDesc *buf, bool forInput);
+static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
+ int set_flag_bits);
static void buffer_write_error_callback(void *arg);
-static Buffer ReadBufferInternal(Relation reln, BlockNumber blockNum,
- bool bufferLockHeld);
static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
bool *foundPtr);
-static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, bool earlylock);
+static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
static void write_buffer(Buffer buffer, bool unpin);
@@ -106,27 +114,15 @@ static void write_buffer(Buffer buffer, bool unpin);
Buffer
ReadBuffer(Relation reln, BlockNumber blockNum)
{
- ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
- return ReadBufferInternal(reln, blockNum, false);
-}
-
-/*
- * ReadBufferInternal -- internal version of ReadBuffer with more options
- *
- * bufferLockHeld: if true, caller already acquired the bufmgr lock.
- * (This is assumed never to be true if dealing with a local buffer!)
- *
- * The caller must have done ResourceOwnerEnlargeBuffers(CurrentResourceOwner)
- */
-static Buffer
-ReadBufferInternal(Relation reln, BlockNumber blockNum,
- bool bufferLockHeld)
-{
BufferDesc *bufHdr;
+ Block bufBlock;
bool found;
bool isExtend;
bool isLocalBuf;
+ /* Make sure we will have room to remember the buffer pin */
+ ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
isExtend = (blockNum == P_NEW);
isLocalBuf = reln->rd_istemp;
@@ -137,10 +133,11 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
if (isExtend)
blockNum = smgrnblocks(reln->rd_smgr);
+ pgstat_count_buffer_read(&reln->pgstat_info, reln);
+
if (isLocalBuf)
{
ReadLocalBufferCount++;
- pgstat_count_buffer_read(&reln->pgstat_info, reln);
bufHdr = LocalBufferAlloc(reln, blockNum, &found);
if (found)
LocalBufferHitCount++;
@@ -148,20 +145,17 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
else
{
ReadBufferCount++;
- pgstat_count_buffer_read(&reln->pgstat_info, reln);
/*
* lookup the buffer. IO_IN_PROGRESS is set if the requested
* block is not currently in memory.
*/
- if (!bufferLockHeld)
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
bufHdr = BufferAlloc(reln, blockNum, &found);
if (found)
BufferHitCount++;
}
- /* At this point we do NOT hold the bufmgr lock. */
+ /* At this point we do NOT hold any locks. */
/* if it was already in the buffer pool, we're done */
if (found)
@@ -187,20 +181,22 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
* same buffer (if it's not been recycled) but come right back here to
* try smgrextend again.
*/
- Assert(!(bufHdr->flags & BM_VALID));
+ Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */
+
+ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
if (isExtend)
{
/* new buffers are zero-filled */
- MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
- smgrextend(reln->rd_smgr, blockNum, (char *) MAKE_PTR(bufHdr->data),
+ MemSet((char *) bufBlock, 0, BLCKSZ);
+ smgrextend(reln->rd_smgr, blockNum, (char *) bufBlock,
reln->rd_istemp);
}
else
{
- smgrread(reln->rd_smgr, blockNum, (char *) MAKE_PTR(bufHdr->data));
+ smgrread(reln->rd_smgr, blockNum, (char *) bufBlock);
/* check for garbage data */
- if (!PageHeaderIsValid((PageHeader) MAKE_PTR(bufHdr->data)))
+ if (!PageHeaderIsValid((PageHeader) bufBlock))
{
/*
* During WAL recovery, the first access to any data page
@@ -215,7 +211,7 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("invalid page header in block %u of relation \"%s\"; zeroing out page",
blockNum, RelationGetRelationName(reln))));
- MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
+ MemSet((char *) bufBlock, 0, BLCKSZ);
}
else
ereport(ERROR,
@@ -232,16 +228,8 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
}
else
{
- /* lock buffer manager again to update IO IN PROGRESS */
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
-
- /* IO Succeeded, so mark data valid */
- bufHdr->flags |= BM_VALID;
-
- /* If anyone was waiting for IO to complete, wake them up now */
- TerminateBufferIO(bufHdr, 0);
-
- LWLockRelease(BufMgrLock);
+ /* Set BM_VALID, terminate IO, and wake up any waiters */
+ TerminateBufferIO(bufHdr, false, BM_VALID);
}
if (VacuumCostActive)
@@ -263,8 +251,7 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
* *foundPtr is actually redundant with the buffer's BM_VALID flag, but
* we keep it for simplicity in ReadBuffer.
*
- * BufMgrLock must be held at entry. When this routine returns,
- * the BufMgrLock is guaranteed NOT to be held.
+ * No locks are held either at entry or exit.
*/
static BufferDesc *
BufferAlloc(Relation reln,
@@ -272,229 +259,343 @@ BufferAlloc(Relation reln,
bool *foundPtr)
{
BufferTag newTag; /* identity of requested block */
- BufferDesc *buf,
- *buf2;
- int cdb_found_index,
- cdb_replace_index;
- bool inProgress; /* did we already do StartBufferIO? */
+ BufferTag oldTag;
+ BufFlags oldFlags;
+ int buf_id;
+ BufferDesc *buf;
+ bool valid;
/* create a tag so we can lookup the buffer */
INIT_BUFFERTAG(newTag, reln, blockNum);
/* see if the block is in the buffer pool already */
- buf = StrategyBufferLookup(&newTag, false, &cdb_found_index);
- if (buf != NULL)
+ LWLockAcquire(BufMappingLock, LW_SHARED);
+ buf_id = BufTableLookup(&newTag);
+ if (buf_id >= 0)
{
/*
* Found it. Now, pin the buffer so no one can steal it from the
- * buffer pool, and check to see if someone else is still reading
- * data into the buffer. (Formerly, we'd always block here if
- * IO_IN_PROGRESS is set, but there's no need to wait when someone
- * is writing rather than reading.)
+ * buffer pool, and check to see if the correct data has been
+ * loaded into the buffer.
*/
- *foundPtr = TRUE;
+ buf = &BufferDescriptors[buf_id];
+
+ valid = PinBuffer(buf);
- PinBuffer(buf, true);
+ /* Can release the mapping lock as soon as we've pinned it */
+ LWLockRelease(BufMappingLock);
+
+ *foundPtr = TRUE;
- if (!(buf->flags & BM_VALID))
+ if (!valid)
{
- if (buf->flags & BM_IO_IN_PROGRESS)
- {
- /* someone else is reading it, wait for them */
- WaitIO(buf);
- }
- if (!(buf->flags & BM_VALID))
+ /*
+ * We can only get here if (a) someone else is still reading
+ * in the page, or (b) a previous read attempt failed. We
+ * have to wait for any active read attempt to finish, and
+ * then set up our own read attempt if the page is still not
+ * BM_VALID. StartBufferIO does it all.
+ */
+ if (StartBufferIO(buf, true))
{
/*
* If we get here, previous attempts to read the buffer
* must have failed ... but we shall bravely try again.
*/
*foundPtr = FALSE;
- StartBufferIO(buf, true);
}
}
- LWLockRelease(BufMgrLock);
-
return buf;
}
- *foundPtr = FALSE;
-
/*
* Didn't find it in the buffer pool. We'll have to initialize a new
- * buffer. First, grab one from the free list. If it's dirty, flush
- * it to disk. Remember to unlock BufMgrLock while doing the IO.
+ * buffer. Remember to unlock BufMappingLock while doing the work.
*/
- inProgress = FALSE;
- do
- {
- buf = StrategyGetBuffer(&cdb_replace_index);
-
- /* StrategyGetBuffer will elog if it can't find a free buffer */
- Assert(buf);
+ LWLockRelease(BufMappingLock);
+ /* Loop here in case we have to try another victim buffer */
+ for (;;)
+ {
/*
- * There should be exactly one pin on the buffer after it is
- * allocated -- ours. If it had a pin it wouldn't have been on
- * the free list. No one else could have pinned it between
- * StrategyGetBuffer and here because we have the BufMgrLock.
- *
- * (We must pin the buffer before releasing BufMgrLock ourselves,
- * to ensure StrategyGetBuffer won't give the same buffer to someone
- * else.)
+ * Select a victim buffer. The buffer is returned with its
+ * header spinlock still held! Also the BufFreelistLock is
+ * still held, since it would be bad to hold the spinlock
+ * while possibly waking up other processes.
*/
+ buf = StrategyGetBuffer();
+
Assert(buf->refcount == 0);
- buf->refcount = 1;
- PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 1;
- ResourceOwnerRememberBuffer(CurrentResourceOwner,
- BufferDescriptorGetBuffer(buf));
+ /* Must copy buffer flags while we still hold the spinlock */
+ oldFlags = buf->flags;
- if ((buf->flags & BM_VALID) &&
- (buf->flags & BM_DIRTY || buf->cntxDirty))
- {
- /*
- * Set BM_IO_IN_PROGRESS to show the buffer is being written.
- * It cannot already be set because the buffer would be pinned
- * if someone were writing it.
- *
- * Note: it's okay to grab the io_in_progress lock while holding
- * BufMgrLock. All code paths that acquire this lock pin the
- * buffer first; since no one had it pinned (it just came off
- * the free list), no one else can have the lock.
- */
- StartBufferIO(buf, false);
+ /* Pin the buffer and then release the buffer spinlock */
+ PinBuffer_Locked(buf);
- inProgress = TRUE;
-
- /*
- * Write the buffer out, being careful to release BufMgrLock
- * while doing the I/O. We also tell FlushBuffer to share-lock
- * the buffer before releasing BufMgrLock. This is safe because
- * we know no other backend currently has the buffer pinned,
- * therefore no one can have it locked either, so we can always
- * get the lock without blocking. It is necessary because if
- * we release BufMgrLock first, it's possible for someone else
- * to pin and exclusive-lock the buffer before we get to the
- * share-lock, causing us to block. If the someone else then
- * blocks on a lock we hold, deadlock ensues. This has been
- * observed to happen when two backends are both trying to split
- * btree index pages, and the second one just happens to be
- * trying to split the page the first one got from the freelist.
- */
- FlushBuffer(buf, NULL, true);
+ /* Now it's safe to release the freelist lock */
+ LWLockRelease(BufFreelistLock);
+ /*
+ * If the buffer was dirty, try to write it out. There is a race
+ * condition here, in that someone might dirty it after we released
+ * it above, or even while we are writing it out (since our share-lock
+ * won't prevent hint-bit updates). We will recheck the dirty bit
+ * after re-locking the buffer header.
+ */
+ if (oldFlags & BM_DIRTY)
+ {
/*
- * Somebody could have allocated another buffer for the same
- * block we are about to read in. While we flush out the dirty
- * buffer, we don't hold the lock and someone could have
- * allocated another buffer for the same block. The problem is
- * we haven't yet inserted the new tag into the buffer table.
- * So we need to check here. -ay 3/95
- *
- * Another reason we have to do this is to update
- * cdb_found_index, since the CDB could have disappeared from
- * B1/B2 list while we were writing.
+ * We need a share-lock on the buffer contents to write it out
+ * (else we might write invalid data, eg because someone else
+ * is compacting the page contents while we write). We must use
+ * a conditional lock acquisition here to avoid deadlock. Even
+ * though the buffer was not pinned (and therefore surely not
+ * locked) when StrategyGetBuffer returned it, someone else could
+ * have pinned and exclusive-locked it by the time we get here.
+ * If we try to get the lock unconditionally, we'd block waiting
+ * for them; if they later block waiting for us, deadlock ensues.
+ * (This has been observed to happen when two backends are both
+ * trying to split btree index pages, and the second one just
+ * happens to be trying to split the page the first one got from
+ * StrategyGetBuffer.)
*/
- buf2 = StrategyBufferLookup(&newTag, true, &cdb_found_index);
- if (buf2 != NULL)
+ if (LWLockConditionalAcquire(buf->content_lock, LW_SHARED))
+ {
+ FlushBuffer(buf, NULL);
+ LWLockRelease(buf->content_lock);
+ }
+ else
{
/*
- * Found it. Someone has already done what we were about
- * to do. We'll just handle this as if it were found in
- * the buffer pool in the first place. First, give up the
- * buffer we were planning to use.
+ * Someone else has pinned the buffer, so give it up and
+ * loop back to get another one.
*/
- TerminateBufferIO(buf, 0);
- UnpinBuffer(buf, true);
+ UnpinBuffer(buf, true, false /* evidently recently used */ );
+ continue;
+ }
+ }
- buf = buf2;
+ /*
+ * Acquire exclusive mapping lock in preparation for changing
+ * the buffer's association.
+ */
+ LWLockAcquire(BufMappingLock, LW_EXCLUSIVE);
- /* remaining code should match code at top of routine */
+ /*
+ * Try to make a hashtable entry for the buffer under its new tag.
+ * This could fail because while we were writing someone else
+ * allocated another buffer for the same block we want to read in.
+ * Note that we have not yet removed the hashtable entry for the
+ * old tag.
+ */
+ buf_id = BufTableInsert(&newTag, buf->buf_id);
- *foundPtr = TRUE;
+ if (buf_id >= 0)
+ {
+ /*
+ * Got a collision. Someone has already done what we were about
+ * to do. We'll just handle this as if it were found in
+ * the buffer pool in the first place. First, give up the
+ * buffer we were planning to use. Don't allow it to be
+ * thrown in the free list (we don't want to hold both
+ * global locks at once).
+ */
+ UnpinBuffer(buf, true, false);
- PinBuffer(buf, true);
+ /* remaining code should match code at top of routine */
- if (!(buf->flags & BM_VALID))
- {
- if (buf->flags & BM_IO_IN_PROGRESS)
- {
- /* someone else is reading it, wait for them */
- WaitIO(buf);
- }
- if (!(buf->flags & BM_VALID))
- {
- /*
- * If we get here, previous attempts to read the
- * buffer must have failed ... but we shall
- * bravely try again.
- */
- *foundPtr = FALSE;
- StartBufferIO(buf, true);
- }
- }
+ buf = &BufferDescriptors[buf_id];
- LWLockRelease(BufMgrLock);
+ valid = PinBuffer(buf);
- return buf;
- }
+ /* Can release the mapping lock as soon as we've pinned it */
+ LWLockRelease(BufMappingLock);
- /*
- * Somebody could have pinned the buffer while we were doing
- * the I/O and had given up the BufMgrLock. If so, we can't
- * recycle this buffer --- we need to clear the I/O flags,
- * remove our pin and choose a new victim buffer. Similarly,
- * we have to start over if somebody re-dirtied the buffer.
- */
- if (buf->refcount > 1 || buf->flags & BM_DIRTY || buf->cntxDirty)
+ *foundPtr = TRUE;
+
+ if (!valid)
{
- TerminateBufferIO(buf, 0);
- UnpinBuffer(buf, true);
- inProgress = FALSE;
- buf = NULL;
+ /*
+ * We can only get here if (a) someone else is still reading
+ * in the page, or (b) a previous read attempt failed. We
+ * have to wait for any active read attempt to finish, and
+ * then set up our own read attempt if the page is still not
+ * BM_VALID. StartBufferIO does it all.
+ */
+ if (StartBufferIO(buf, true))
+ {
+ /*
+ * If we get here, previous attempts to read the buffer
+ * must have failed ... but we shall bravely try again.
+ */
+ *foundPtr = FALSE;
+ }
}
+
+ return buf;
}
- } while (buf == NULL);
- /*
- * At this point we should have the sole pin on a non-dirty buffer and
- * we may or may not already have the BM_IO_IN_PROGRESS flag set.
- */
+ /*
+ * Need to lock the buffer header too in order to change its tag.
+ */
+ LockBufHdr_NoHoldoff(buf);
+
+ /*
+ * Somebody could have pinned or re-dirtied the buffer while we were
+ * doing the I/O and making the new hashtable entry. If so, we
+ * can't recycle this buffer; we must undo everything we've done and
+ * start over with a new victim buffer.
+ */
+ if (buf->refcount == 1 && !(buf->flags & BM_DIRTY))
+ break;
+
+ UnlockBufHdr_NoHoldoff(buf);
+ BufTableDelete(&newTag);
+ LWLockRelease(BufMappingLock);
+ UnpinBuffer(buf, true, false /* evidently recently used */ );
+ }
/*
- * Tell the buffer replacement strategy that we are replacing the
- * buffer content. Then rename the buffer. Clearing BM_VALID here is
- * necessary, clearing the dirtybits is just paranoia.
+ * Okay, it's finally safe to rename the buffer.
+ *
+ * Clearing BM_VALID here is necessary, clearing the dirtybits
+ * is just paranoia. We also clear the usage_count since any
+ * recency of use of the old content is no longer relevant.
*/
- StrategyReplaceBuffer(buf, &newTag, cdb_found_index, cdb_replace_index);
+ oldTag = buf->tag;
+ oldFlags = buf->flags;
buf->tag = newTag;
buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
- buf->cntxDirty = false;
+ buf->flags |= BM_TAG_VALID;
+ buf->usage_count = 0;
+
+ UnlockBufHdr_NoHoldoff(buf);
+
+ if (oldFlags & BM_TAG_VALID)
+ BufTableDelete(&oldTag);
+
+ LWLockRelease(BufMappingLock);
/*
- * Buffer contents are currently invalid. Have to mark IO IN PROGRESS
- * so no one fiddles with them until the read completes. We may have
- * already marked it, in which case we just flip from write to read
- * status.
+ * Buffer contents are currently invalid. Try to get the io_in_progress
+ * lock. If StartBufferIO returns false, then someone else managed
+ * to read it before we did, so there's nothing left for BufferAlloc()
+ * to do.
*/
- if (!inProgress)
- StartBufferIO(buf, true);
+ if (StartBufferIO(buf, true))
+ *foundPtr = FALSE;
else
- ContinueBufferIO(buf, true);
-
- LWLockRelease(BufMgrLock);
+ *foundPtr = TRUE;
return buf;
}
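Since the old single-BufMgrLock narrative is gone, it may help to see the new BufferAlloc protocol condensed. The following is an illustrative summary, not code from the patch:

/*
 * Sketch of the BufferAlloc locking protocol above (illustrative):
 *
 *   1. BufMappingLock (shared) -> BufTableLookup; on a hit, PinBuffer,
 *      drop the mapping lock, and return.
 *   2. Otherwise loop:
 *        a. StrategyGetBuffer returns a victim with its header spinlock
 *           held; PinBuffer_Locked pins it and drops the spinlock.
 *        b. If dirty, LWLockConditionalAcquire(content_lock) and
 *           FlushBuffer; if the lock is refused, unpin and retry.
 *        c. BufMappingLock (exclusive) -> BufTableInsert(newTag);
 *           a collision means someone beat us: unpin ours, adopt theirs.
 *        d. LockBufHdr; if still unpinned and clean, break; else undo
 *           the hash entry and retry with a fresh victim.
 *   3. Retag the buffer, delete the old hash entry, release the locks;
 *      StartBufferIO then decides who actually reads the page in.
 */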
/*
+ * InvalidateBuffer -- mark a shared buffer invalid and return it to the
+ * freelist.
+ *
+ * The buffer header spinlock must be held at entry. We drop it before
+ * returning. (This is sane because the caller must have locked the
+ * buffer in order to be sure it should be dropped.)
+ *
+ * This is used only in contexts such as dropping a relation. We assume
+ * that no other backend could possibly be interested in using the page,
+ * so the only reason the buffer might be pinned is if someone else is
+ * trying to write it out. We have to let them finish before we can
+ * reclaim the buffer.
+ *
+ * The buffer could get reclaimed by someone else while we are waiting
+ * to acquire the necessary locks; if so, don't mess it up.
+ */
+static void
+InvalidateBuffer(BufferDesc *buf)
+{
+ BufferTag oldTag;
+ BufFlags oldFlags;
+
+ /* Save the original buffer tag before dropping the spinlock */
+ oldTag = buf->tag;
+
+ UnlockBufHdr(buf);
+
+retry:
+ /*
+ * Acquire exclusive mapping lock in preparation for changing
+ * the buffer's association.
+ */
+ LWLockAcquire(BufMappingLock, LW_EXCLUSIVE);
+
+ /* Re-lock the buffer header (NoHoldoff since we have an LWLock) */
+ LockBufHdr_NoHoldoff(buf);
+
+ /* If it's changed while we were waiting for lock, do nothing */
+ if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
+ {
+ UnlockBufHdr_NoHoldoff(buf);
+ LWLockRelease(BufMappingLock);
+ return;
+ }
+
+ /*
+ * We assume the only reason for it to be pinned is that someone else
+ * is flushing the page out. Wait for them to finish. (This could be
+ * an infinite loop if the refcount is messed up... it would be nice
+ * to time out after awhile, but there seems no way to be sure how
+ * many loops may be needed. Note that if the other guy has pinned
+ * the buffer but not yet done StartBufferIO, WaitIO will fall through
+ * and we'll effectively be busy-looping here.)
+ */
+ if (buf->refcount != 0)
+ {
+ UnlockBufHdr_NoHoldoff(buf);
+ LWLockRelease(BufMappingLock);
+ WaitIO(buf);
+ goto retry;
+ }
+
+ /*
+ * Clear out the buffer's tag and flags. We must do this to ensure
+ * that linear scans of the buffer array don't think the buffer is valid.
+ */
+ oldFlags = buf->flags;
+ CLEAR_BUFFERTAG(buf->tag);
+ buf->flags = 0;
+ buf->usage_count = 0;
+
+ UnlockBufHdr_NoHoldoff(buf);
+
+ /*
+ * Remove the buffer from the lookup hashtable, if it was in there.
+ */
+ if (oldFlags & BM_TAG_VALID)
+ BufTableDelete(&oldTag);
+
+ /*
+ * Avoid accepting a cancel interrupt when we release the mapping lock;
+ * that would leave the buffer free but not on the freelist. (Which would
+ * not be fatal, since it'd get picked up again by the clock scanning
+ * code, but we'd rather be sure it gets to the freelist.)
+ */
+ HOLD_INTERRUPTS();
+
+ LWLockRelease(BufMappingLock);
+
+ /*
+ * Insert the buffer at the head of the list of free buffers.
+ */
+ StrategyFreeBuffer(buf, true);
+
+ RESUME_INTERRUPTS();
+}
+
+/*
* write_buffer -- common functionality for
* WriteBuffer and WriteNoReleaseBuffer
*/
static void
-write_buffer(Buffer buffer, bool release)
+write_buffer(Buffer buffer, bool unpin)
{
BufferDesc *bufHdr;
@@ -503,7 +604,7 @@ write_buffer(Buffer buffer, bool release)
if (BufferIsLocal(buffer))
{
- WriteLocalBuffer(buffer, release);
+ WriteLocalBuffer(buffer, unpin);
return;
}
@@ -511,7 +612,8 @@ write_buffer(Buffer buffer, bool release)
Assert(PrivateRefCount[buffer - 1] > 0);
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
+ LockBufHdr(bufHdr);
+
Assert(bufHdr->refcount > 0);
/*
@@ -522,9 +624,10 @@ write_buffer(Buffer buffer, bool release)
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
- if (release)
- UnpinBuffer(bufHdr, true);
- LWLockRelease(BufMgrLock);
+ UnlockBufHdr(bufHdr);
+
+ if (unpin)
+ UnpinBuffer(bufHdr, true, true);
}
/*
@@ -555,21 +658,16 @@ WriteNoReleaseBuffer(Buffer buffer)
/*
* ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
- * to save a lock release/acquire.
*
- * Also, if the passed buffer is valid and already contains the desired block
- * number, we simply return it without ever acquiring the lock at all.
- * Since the passed buffer must be pinned, it's OK to examine its block
- * number without getting the lock first.
+ * Formerly, this saved one cycle of acquiring/releasing the BufMgrLock
+ * compared to calling the two routines separately. Now it's mainly just
+ * a convenience function. However, if the passed buffer is valid and
+ * already contains the desired block, we just return it as-is; and that
+ * does save considerable work compared to a full release and reacquire.
*
* Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
* buffer actually needs to be released. This case is the same as ReadBuffer,
* but can save some tests in the caller.
- *
- * Also note: while it will work to call this routine with blockNum == P_NEW,
- * it's best to avoid doing so, since that would result in calling
- * smgrnblocks() while holding the bufmgr lock, hence some loss of
- * concurrency.
*/
Buffer
ReleaseAndReadBuffer(Buffer buffer,
@@ -588,235 +686,313 @@ ReleaseAndReadBuffer(Buffer buffer,
RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
return buffer;
ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
- /* owner now has a free slot, so no need for Enlarge() */
LocalRefCount[-buffer - 1]--;
+ if (LocalRefCount[-buffer - 1] == 0 &&
+ bufHdr->usage_count < BM_MAX_USAGE_COUNT)
+ bufHdr->usage_count++;
}
else
{
Assert(PrivateRefCount[buffer - 1] > 0);
bufHdr = &BufferDescriptors[buffer - 1];
+ /* we have pin, so it's ok to examine tag without spinlock */
if (bufHdr->tag.blockNum == blockNum &&
RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
return buffer;
- ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
- /* owner now has a free slot, so no need for Enlarge() */
- if (PrivateRefCount[buffer - 1] > 1)
- PrivateRefCount[buffer - 1]--;
- else
- {
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
- UnpinBuffer(bufHdr, false);
- return ReadBufferInternal(relation, blockNum, true);
- }
+ UnpinBuffer(bufHdr, true, true);
}
}
- else
- ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
- return ReadBufferInternal(relation, blockNum, false);
+ return ReadBuffer(relation, blockNum);
}
/*
* PinBuffer -- make buffer unavailable for replacement.
*
* This should be applied only to shared buffers, never local ones.
- * Bufmgr lock must be held by caller.
*
- * Most but not all callers want CurrentResourceOwner to be adjusted.
* Note that ResourceOwnerEnlargeBuffers must have been done already.
+ *
+ * Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows
+ * some callers to avoid an extra spinlock cycle.
+ */
+static bool
+PinBuffer(BufferDesc *buf)
+{
+ int b = buf->buf_id;
+ bool result;
+
+ if (PrivateRefCount[b] == 0)
+ {
+ /*
+ * Use NoHoldoff here because we don't want the unlock to be a
+ * potential place to honor a QueryCancel request.
+ * (The caller should be holding off interrupts anyway.)
+ */
+ LockBufHdr_NoHoldoff(buf);
+ buf->refcount++;
+ result = (buf->flags & BM_VALID) != 0;
+ UnlockBufHdr_NoHoldoff(buf);
+ }
+ else
+ {
+ /* If we previously pinned the buffer, it must surely be valid */
+ result = true;
+ }
+ PrivateRefCount[b]++;
+ Assert(PrivateRefCount[b] > 0);
+ ResourceOwnerRememberBuffer(CurrentResourceOwner,
+ BufferDescriptorGetBuffer(buf));
+ return result;
+}
+
+/*
+ * PinBuffer_Locked -- as above, but caller already locked the buffer header.
+ * The spinlock is released before return.
+ *
+ * Note: use of this routine is frequently mandatory, not just an optimization
+ * to save a spin lock/unlock cycle, because we need to pin a buffer before
+ * its state can change under us.
*/
static void
-PinBuffer(BufferDesc *buf, bool fixOwner)
+PinBuffer_Locked(BufferDesc *buf)
{
- int b = BufferDescriptorGetBuffer(buf) - 1;
+ int b = buf->buf_id;
if (PrivateRefCount[b] == 0)
buf->refcount++;
+ /* NoHoldoff since we mustn't accept cancel interrupt here */
+ UnlockBufHdr_NoHoldoff(buf);
PrivateRefCount[b]++;
Assert(PrivateRefCount[b] > 0);
- if (fixOwner)
- ResourceOwnerRememberBuffer(CurrentResourceOwner,
- BufferDescriptorGetBuffer(buf));
+ ResourceOwnerRememberBuffer(CurrentResourceOwner,
+ BufferDescriptorGetBuffer(buf));
+ /* Now we can accept cancel */
+ RESUME_INTERRUPTS();
}
/*
* UnpinBuffer -- make buffer available for replacement.
*
* This should be applied only to shared buffers, never local ones.
- * Bufmgr lock must be held by caller.
*
* Most but not all callers want CurrentResourceOwner to be adjusted.
+ *
+ * If we are releasing a buffer during VACUUM, and it's not been otherwise
+ * used recently, and trashOK is true, send the buffer to the freelist.
*/
static void
-UnpinBuffer(BufferDesc *buf, bool fixOwner)
+UnpinBuffer(BufferDesc *buf, bool fixOwner, bool trashOK)
{
- int b = BufferDescriptorGetBuffer(buf) - 1;
+ int b = buf->buf_id;
if (fixOwner)
ResourceOwnerForgetBuffer(CurrentResourceOwner,
BufferDescriptorGetBuffer(buf));
- Assert(buf->refcount > 0);
Assert(PrivateRefCount[b] > 0);
PrivateRefCount[b]--;
if (PrivateRefCount[b] == 0)
{
- buf->refcount--;
+ bool trash_buffer = false;
+
/* I'd better not still hold any locks on the buffer */
- Assert(!LWLockHeldByMe(buf->cntx_lock));
+ Assert(!LWLockHeldByMe(buf->content_lock));
Assert(!LWLockHeldByMe(buf->io_in_progress_lock));
- }
- if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
- buf->refcount == 1)
- {
- /* we just released the last pin other than the waiter's */
- buf->flags &= ~BM_PIN_COUNT_WAITER;
- ProcSendSignal(buf->wait_backend_id);
- }
- else
- {
- /* do nothing */
+ /* NoHoldoff ensures we don't lose control before sending signal */
+ LockBufHdr_NoHoldoff(buf);
+
+ /* Decrement the shared reference count */
+ Assert(buf->refcount > 0);
+ buf->refcount--;
+
+ /* Mark the buffer recently used, unless we are in VACUUM */
+ if (!strategy_hint_vacuum)
+ {
+ if (buf->usage_count < BM_MAX_USAGE_COUNT)
+ buf->usage_count++;
+ }
+ else if (trashOK &&
+ buf->refcount == 0 &&
+ buf->usage_count == 0)
+ trash_buffer = true;
+
+ if ((buf->flags & BM_PIN_COUNT_WAITER) &&
+ buf->refcount == 1)
+ {
+ /* we just released the last pin other than the waiter's */
+ BackendId wait_backend_id = buf->wait_backend_id;
+
+ buf->flags &= ~BM_PIN_COUNT_WAITER;
+ UnlockBufHdr_NoHoldoff(buf);
+ ProcSendSignal(wait_backend_id);
+ }
+ else
+ UnlockBufHdr_NoHoldoff(buf);
+
+ /*
+ * If VACUUM is releasing an otherwise-unused buffer, send it to
+ * the freelist for near-term reuse. We put it at the tail so that
+ * it won't be used before any invalid buffers that may exist.
+ */
+ if (trash_buffer)
+ StrategyFreeBuffer(buf, false);
}
}
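The usage_count bumped here is what the clock sweep in freelist.c consumes when hunting for a victim. A hedged sketch of that consumer side, with nextVictimBuffer as an illustrative stand-in for the strategy's scan pointer:

/* Illustrative clock-sweep step (a sketch of the freelist.c side): */
for (;;)
{
	buf = &BufferDescriptors[nextVictimBuffer];
	nextVictimBuffer = (nextVictimBuffer + 1) % NBuffers;
	LockBufHdr(buf);
	if (buf->refcount == 0 && buf->usage_count == 0)
		return buf;				/* returned with header spinlock held */
	if (buf->refcount == 0)
		buf->usage_count--;		/* costs it one trip around the clock */
	UnlockBufHdr(buf);
}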
/*
- * BufferSync -- Write out dirty buffers in the pool.
+ * BufferSync -- Write out all dirty buffers in the pool.
*
- * This is called at checkpoint time to write out all dirty shared buffers,
- * and by the background writer process to write out some of the dirty blocks.
- * percent/maxpages should be -1 in the former case, and limit values (>= 0)
- * in the latter.
- *
- * Returns the number of buffers written.
+ * This is called at checkpoint time to write out all dirty shared buffers.
*/
-int
-BufferSync(int percent, int maxpages)
+void
+BufferSync(void)
{
- BufferDesc **dirty_buffers;
- BufferTag *buftags;
- int num_buffer_dirty;
- int i;
-
- /* If either limit is zero then we are disabled from doing anything... */
- if (percent == 0 || maxpages == 0)
- return 0;
+ int buf_id;
+ int num_to_scan;
/*
- * Get a list of all currently dirty buffers and how many there are.
- * We do not flush buffers that get dirtied after we started. They
- * have to wait until the next checkpoint.
+ * Find out where to start the circular scan.
*/
- dirty_buffers = (BufferDesc **) palloc(NBuffers * sizeof(BufferDesc *));
- buftags = (BufferTag *) palloc(NBuffers * sizeof(BufferTag));
+ buf_id = StrategySyncStart();
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
- num_buffer_dirty = StrategyDirtyBufferList(dirty_buffers, buftags,
- NBuffers);
+ /* Make sure we can handle the pin inside SyncOneBuffer */
+ ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
/*
- * If called by the background writer, we are usually asked to only
- * write out some portion of dirty buffers now, to prevent the IO
- * storm at checkpoint time.
+ * Loop over all buffers.
*/
- if (percent > 0)
+ num_to_scan = NBuffers;
+ while (num_to_scan-- > 0)
{
- Assert(percent <= 100);
- num_buffer_dirty = (num_buffer_dirty * percent + 99) / 100;
+ (void) SyncOneBuffer(buf_id, false);
+ if (++buf_id >= NBuffers)
+ buf_id = 0;
}
- if (maxpages > 0 && num_buffer_dirty > maxpages)
- num_buffer_dirty = maxpages;
+}
- /* Make sure we can handle the pin inside the loop */
+/*
+ * BgBufferSync -- Write out some dirty buffers in the pool.
+ *
+ * This is called periodically by the background writer process.
+ */
+void
+BgBufferSync(void)
+{
+ static int buf_id1 = 0;
+ int buf_id2;
+ int num_to_scan;
+ int num_written;
+
+ /* Make sure we can handle the pin inside SyncOneBuffer */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
/*
- * Loop over buffers to be written. Note the BufMgrLock is held at
- * loop top, but is released and reacquired within FlushBuffer, so we
- * aren't holding it long.
+ * To minimize work at checkpoint time, we want to try to keep all the
+ * buffers clean; this motivates a scan that proceeds sequentially through
+ * all buffers. But we are also charged with ensuring that buffers that
+ * will be recycled soon are clean when needed; these buffers are the
+ * ones just ahead of the StrategySyncStart point. We make a separate
+ * scan through those.
*/
- for (i = 0; i < num_buffer_dirty; i++)
- {
- BufferDesc *bufHdr = dirty_buffers[i];
- /*
- * Check it is still the same page and still needs writing.
- *
- * We can check bufHdr->cntxDirty here *without* holding any lock on
- * buffer context as long as we set this flag in access methods
- * *before* logging changes with XLogInsert(): if someone will set
- * cntxDirty just after our check we don't worry because of our
- * checkpoint.redo points before log record for upcoming changes
- * and so we are not required to write such dirty buffer.
- */
- if (!(bufHdr->flags & BM_VALID))
- continue;
- if (!BUFFERTAGS_EQUAL(bufHdr->tag, buftags[i]))
- continue;
- if (!(bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty))
- continue;
+ /*
+ * This loop runs over all buffers, including pinned ones. The
+ * starting point advances through the buffer pool on successive calls.
+ */
+ if (bgwriter_all_percent > 0.0 && bgwriter_all_maxpages > 0)
+ {
+ num_to_scan = (int) ((NBuffers * bgwriter_all_percent + 99) / 100);
+ num_written = 0;
- /*
- * IO synchronization. Note that we do it with unpinned buffer to
- * avoid conflicts with FlushRelationBuffers.
- */
- if (bufHdr->flags & BM_IO_IN_PROGRESS)
+ while (num_to_scan-- > 0)
{
- WaitIO(bufHdr);
- /* Still need writing? */
- if (!(bufHdr->flags & BM_VALID))
- continue;
- if (!BUFFERTAGS_EQUAL(bufHdr->tag, buftags[i]))
- continue;
- if (!(bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty))
- continue;
+ if (SyncOneBuffer(buf_id1, false))
+ num_written++;
+ if (++buf_id1 >= NBuffers)
+ buf_id1 = 0;
+ if (num_written >= bgwriter_all_maxpages)
+ break;
}
-
- /*
- * Here: no one doing IO for this buffer and it's dirty. Pin
- * buffer now and set IO state for it *before* acquiring shlock to
- * avoid conflicts with FlushRelationBuffers.
- */
- PinBuffer(bufHdr, true);
- StartBufferIO(bufHdr, false);
-
- FlushBuffer(bufHdr, NULL, false);
-
- TerminateBufferIO(bufHdr, 0);
- UnpinBuffer(bufHdr, true);
}
- LWLockRelease(BufMgrLock);
+ /*
+ * This loop considers only unpinned buffers close to the clock sweep
+ * point.
+ */
+ if (bgwriter_lru_percent > 0.0 && bgwriter_lru_maxpages > 0)
+ {
+ num_to_scan = (int) ((NBuffers * bgwriter_lru_percent + 99) / 100);
+ num_written = 0;
- pfree(dirty_buffers);
- pfree(buftags);
+ buf_id2 = StrategySyncStart();
- return num_buffer_dirty;
+ while (num_to_scan-- > 0)
+ {
+ if (SyncOneBuffer(buf_id2, true))
+ num_written++;
+ if (++buf_id2 >= NBuffers)
+ buf_id2 = 0;
+ if (num_written >= bgwriter_lru_maxpages)
+ break;
+ }
+ }
}
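To make the budget arithmetic concrete: with NBuffers = 1000 and the default bgwriter_all_percent = 0.333, num_to_scan = (int)((1000 * 0.333 + 99) / 100) = (int) 4.32 = 4, so at most 4 buffers are examined (and never more than bgwriter_all_maxpages written) per call; the +99 biases the integer division upward so that small nonzero settings still scan a few buffers each round.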
/*
- * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
+ * SyncOneBuffer -- process a single buffer during syncing.
+ *
+ * If skip_pinned is true, we don't write currently-pinned buffers, nor
+ * buffers marked recently used, as these are not replacement candidates.
*
- * Should be entered with buffer manager lock held; releases it before
- * waiting and re-acquires it afterwards.
+ * Returns true if buffer was written, else false. (This could be in error
+ * if FlushBuffer finds the buffer clean after locking it, but we don't
+ * care all that much.)
+ *
+ * Note: caller must have done ResourceOwnerEnlargeBuffers.
*/
-static void
-WaitIO(BufferDesc *buf)
+static bool
+SyncOneBuffer(int buf_id, bool skip_pinned)
{
+ BufferDesc *bufHdr = &BufferDescriptors[buf_id];
+
/*
- * Changed to wait until there's no IO - Inoue 01/13/2000
+ * Check whether buffer needs writing.
*
- * Note this is *necessary* because an error abort in the process doing
- * I/O could release the io_in_progress_lock prematurely. See
- * AbortBufferIO.
+ * We can make this check without taking the buffer content lock
+ * so long as we mark pages dirty in access methods *before* logging
+ * changes with XLogInsert(): if someone marks the buffer dirty
+ * just after our check we don't worry, because our checkpoint.redo
+ * pointer precedes the log record for the upcoming changes, so we are
+ * not required to write such a dirty buffer.
*/
- while ((buf->flags & BM_IO_IN_PROGRESS) != 0)
+ LockBufHdr(bufHdr);
+ if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
{
- LWLockRelease(BufMgrLock);
- LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
- LWLockRelease(buf->io_in_progress_lock);
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
+ UnlockBufHdr(bufHdr);
+ return false;
}
+ if (skip_pinned &&
+ (bufHdr->refcount != 0 || bufHdr->usage_count != 0))
+ {
+ UnlockBufHdr(bufHdr);
+ return false;
+ }
+
+ /*
+ * Pin it, share-lock it, write it. (FlushBuffer will do nothing
+ * if the buffer is clean by the time we've locked it.)
+ */
+ PinBuffer_Locked(bufHdr);
+ LWLockAcquire(bufHdr->content_lock, LW_SHARED);
+
+ FlushBuffer(bufHdr, NULL);
+
+ LWLockRelease(bufHdr->content_lock);
+ UnpinBuffer(bufHdr, true, false /* don't change freelist */ );
+
+ return true;
}
@@ -888,6 +1064,9 @@ AtEOXact_Buffers(bool isCommit)
AtEOXact_LocalBuffers(isCommit);
#endif
+
+ /* Make sure we reset the strategy hint in case VACUUM errored out */
+ StrategyHintVacuum(false);
}
/*
@@ -912,9 +1091,7 @@ AtProcExit_Buffers(void)
* here, it suggests that ResourceOwners are messed up.
*/
PrivateRefCount[i] = 1; /* make sure we release shared pin */
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
- UnpinBuffer(buf, false);
- LWLockRelease(BufMgrLock);
+ UnpinBuffer(buf, false, false /* don't change freelist */ );
Assert(PrivateRefCount[i] == 0);
}
}
@@ -941,6 +1118,7 @@ PrintBufferLeakWarning(Buffer buffer)
loccount = PrivateRefCount[buffer - 1];
}
+ /* theoretically we should lock the bufhdr here */
elog(WARNING,
"buffer refcount leak: [%03d] "
"(rel=%u/%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d)",
@@ -961,7 +1139,7 @@ PrintBufferLeakWarning(Buffer buffer)
void
FlushBufferPool(void)
{
- BufferSync(-1, -1);
+ BufferSync();
smgrsync();
}
@@ -988,12 +1166,17 @@ BufmgrCommit(void)
BlockNumber
BufferGetBlockNumber(Buffer buffer)
{
+ BufferDesc *bufHdr;
+
Assert(BufferIsPinned(buffer));
if (BufferIsLocal(buffer))
- return LocalBufferDescriptors[-buffer - 1].tag.blockNum;
+ bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
else
- return BufferDescriptors[buffer - 1].tag.blockNum;
+ bufHdr = &BufferDescriptors[buffer - 1];
+
+ /* pinned, so OK to read tag without spinlock */
+ return bufHdr->tag.blockNum;
}
/*
@@ -1013,7 +1196,7 @@ BufferGetFileNode(Buffer buffer)
else
bufHdr = &BufferDescriptors[buffer - 1];
- return (bufHdr->tag.rnode);
+ return bufHdr->tag.rnode;
}
/*
@@ -1026,41 +1209,28 @@ BufferGetFileNode(Buffer buffer)
* However, we will need to force the changes to disk via fsync before
* we can checkpoint WAL.
*
- * BufMgrLock must be held at entry, and the buffer must be pinned. The
- * caller is also responsible for doing StartBufferIO/TerminateBufferIO.
+ * The caller must hold a pin on the buffer and have share-locked the
+ * buffer contents. (Note: a share-lock does not prevent updates of
+ * hint bits in the buffer, so the page could change while the write
+ * is in progress, but we assume that that will not invalidate the data
+ * written.)
*
* If the caller has an smgr reference for the buffer's relation, pass it
- * as the second parameter. If not, pass NULL. (Do not open relation
- * while holding BufMgrLock!)
- *
- * When earlylock is TRUE, we grab the per-buffer sharelock before releasing
- * BufMgrLock, rather than after. Normally this would be a bad idea since
- * we might deadlock, but it is safe and necessary when called from
- * BufferAlloc() --- see comments therein.
+ * as the second parameter. If not, pass NULL.
*/
static void
-FlushBuffer(BufferDesc *buf, SMgrRelation reln, bool earlylock)
+FlushBuffer(BufferDesc *buf, SMgrRelation reln)
{
- Buffer buffer = BufferDescriptorGetBuffer(buf);
XLogRecPtr recptr;
ErrorContextCallback errcontext;
- /* Transpose cntxDirty into flags while holding BufMgrLock */
- buf->cntxDirty = false;
- buf->flags |= BM_DIRTY;
-
- /* To check if block content changed while flushing. - vadim 01/17/97 */
- buf->flags &= ~BM_JUST_DIRTIED;
-
/*
- * If earlylock, grab buffer sharelock before anyone else could re-lock
- * the buffer.
+ * Acquire the buffer's io_in_progress lock. If StartBufferIO returns
+ * false, then someone else flushed the buffer before we could, so
+ * we need not do anything.
*/
- if (earlylock)
- LockBuffer(buffer, BUFFER_LOCK_SHARE);
-
- /* Release BufMgrLock while doing xlog work */
- LWLockRelease(BufMgrLock);
+ if (!StartBufferIO(buf, false))
+ return;
/* Setup error traceback support for ereport() */
errcontext.callback = buffer_write_error_callback;
@@ -1068,20 +1238,12 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, bool earlylock)
errcontext.previous = error_context_stack;
error_context_stack = &errcontext;
- /* Find smgr relation for buffer while holding minimal locks */
+ /* Find smgr relation for buffer */
if (reln == NULL)
reln = smgropen(buf->tag.rnode);
/*
- * Protect buffer content against concurrent update. (Note that
- * hint-bit updates can still occur while the write is in progress,
- * but we assume that that will not invalidate the data written.)
- */
- if (!earlylock)
- LockBuffer(buffer, BUFFER_LOCK_SHARE);
-
- /*
- * Force XLOG flush for buffer' LSN. This implements the basic WAL
+ * Force XLOG flush up to buffer's LSN. This implements the basic WAL
* rule that log updates must hit disk before any of the data-file
* changes they describe do.
*/
@@ -1090,35 +1252,30 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, bool earlylock)
/*
* Now it's safe to write buffer to disk. Note that no one else should
- * have been able to write it while we were busy with locking and log
- * flushing because caller has set the IO flag.
- *
- * It would be better to clear BM_JUST_DIRTIED right here, but we'd have
- * to reacquire the BufMgrLock and it doesn't seem worth it.
+ * have been able to write it while we were busy with log flushing
+ * because we have the io_in_progress lock.
*/
+
+ /* To check if block content changes while flushing. - vadim 01/17/97 */
+ LockBufHdr_NoHoldoff(buf);
+ buf->flags &= ~BM_JUST_DIRTIED;
+ UnlockBufHdr_NoHoldoff(buf);
+
smgrwrite(reln,
buf->tag.blockNum,
- (char *) MAKE_PTR(buf->data),
+ (char *) BufHdrGetBlock(buf),
false);
- /* Pop the error context stack */
- error_context_stack = errcontext.previous;
-
- /*
- * Release the per-buffer readlock, reacquire BufMgrLock.
- */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
-
BufferFlushCount++;
/*
- * If this buffer was marked by someone as DIRTY while we were
- * flushing it out we must not clear DIRTY flag - vadim 01/17/97
+ * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set)
+ * and end the io_in_progress state.
*/
- if (!(buf->flags & BM_JUST_DIRTIED))
- buf->flags &= ~BM_DIRTY;
+ TerminateBufferIO(buf, true, 0);
+
+ /* Pop the error context stack */
+ error_context_stack = errcontext.previous;
}
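For reference, the XLogFlush call mentioned above lives on unchanged lines of this function; the write-ahead invariant it enforces looks like this (a sketch using the file's own names):

/* WAL rule, illustratively: the page's log must be durable first. */
recptr = BufferGetLSN(buf);		/* LSN of last WAL record touching page */
XLogFlush(recptr);				/* force WAL out to disk up to that LSN */
/* ... only then may smgrwrite() push the page itself to disk ... */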
/*
@@ -1210,62 +1367,24 @@ DropRelFileNodeBuffers(RelFileNode rnode, bool istemp,
bufHdr->tag.rnode.dbNode,
bufHdr->tag.rnode.relNode,
LocalRefCount[i]);
- bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
- bufHdr->cntxDirty = false;
- bufHdr->tag.rnode.relNode = InvalidOid;
+ CLEAR_BUFFERTAG(bufHdr->tag);
+ bufHdr->flags = 0;
+ bufHdr->usage_count = 0;
}
}
return;
}
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
-
- for (i = 1; i <= NBuffers; i++)
+ for (i = 0; i < NBuffers; i++)
{
- bufHdr = &BufferDescriptors[i - 1];
-recheck:
+ bufHdr = &BufferDescriptors[i];
+ LockBufHdr(bufHdr);
if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
bufHdr->tag.blockNum >= firstDelBlock)
- {
- /*
- * If there is I/O in progress, better wait till it's done;
- * don't want to delete the relation out from under someone
- * who's just trying to flush the buffer!
- */
- if (bufHdr->flags & BM_IO_IN_PROGRESS)
- {
- WaitIO(bufHdr);
-
- /*
- * By now, the buffer very possibly belongs to some other
- * rel, so check again before proceeding.
- */
- goto recheck;
- }
-
- /*
- * There should be no pin on the buffer.
- */
- if (bufHdr->refcount != 0)
- elog(ERROR, "block %u of %u/%u/%u is still referenced (private %d, global %u)",
- bufHdr->tag.blockNum,
- bufHdr->tag.rnode.spcNode,
- bufHdr->tag.rnode.dbNode,
- bufHdr->tag.rnode.relNode,
- PrivateRefCount[i - 1], bufHdr->refcount);
-
- /* Now we can do what we came for */
- bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
- bufHdr->cntxDirty = false;
-
- /*
- * And mark the buffer as no longer occupied by this rel.
- */
- StrategyInvalidateBuffer(bufHdr);
- }
+ InvalidateBuffer(bufHdr); /* releases spinlock */
+ else
+ UnlockBufHdr(bufHdr);
}
-
- LWLockRelease(BufMgrLock);
}
/* ---------------------------------------------------------------------
@@ -1285,47 +1404,20 @@ DropBuffers(Oid dbid)
int i;
BufferDesc *bufHdr;
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
+ /*
+ * We needn't consider local buffers, since by assumption the target
+ * database isn't our own.
+ */
- for (i = 1; i <= NBuffers; i++)
+ for (i = 0; i < NBuffers; i++)
{
- bufHdr = &BufferDescriptors[i - 1];
-recheck:
+ bufHdr = &BufferDescriptors[i];
+ LockBufHdr(bufHdr);
if (bufHdr->tag.rnode.dbNode == dbid)
- {
- /*
- * If there is I/O in progress, better wait till it's done;
- * don't want to delete the database out from under someone
- * who's just trying to flush the buffer!
- */
- if (bufHdr->flags & BM_IO_IN_PROGRESS)
- {
- WaitIO(bufHdr);
-
- /*
- * By now, the buffer very possibly belongs to some other
- * DB, so check again before proceeding.
- */
- goto recheck;
- }
- /* Now we can do what we came for */
- bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
- bufHdr->cntxDirty = false;
-
- /*
- * The thing should be free, if caller has checked that no
- * backends are running in that database.
- */
- Assert(bufHdr->refcount == 0);
-
- /*
- * And mark the buffer as no longer occupied by this page.
- */
- StrategyInvalidateBuffer(bufHdr);
- }
+ InvalidateBuffer(bufHdr); /* releases spinlock */
+ else
+ UnlockBufHdr(bufHdr);
}
-
- LWLockRelease(BufMgrLock);
}
/* -----------------------------------------------------------------
@@ -1342,32 +1434,17 @@ PrintBufferDescs(void)
int i;
BufferDesc *buf = BufferDescriptors;
- if (IsUnderPostmaster)
- {
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
- for (i = 0; i < NBuffers; ++i, ++buf)
- {
- elog(LOG,
- "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u/%u, "
- "blockNum=%u, flags=0x%x, refcount=%u %d)",
- i, buf->freeNext, buf->freePrev,
- buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
- buf->tag.rnode.relNode,
- buf->tag.blockNum, buf->flags,
- buf->refcount, PrivateRefCount[i]);
- }
- LWLockRelease(BufMgrLock);
- }
- else
+ for (i = 0; i < NBuffers; ++i, ++buf)
{
- /* interactive backend */
- for (i = 0; i < NBuffers; ++i, ++buf)
- {
- printf("[%-2d] (%u/%u/%u, %u) flags=0x%x, refcount=%u %d)\n",
- i, buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
- buf->tag.rnode.relNode, buf->tag.blockNum,
- buf->flags, buf->refcount, PrivateRefCount[i]);
- }
+ /* theoretically we should lock the bufhdr here */
+ elog(LOG,
+ "[%02d] (freeNext=%d, rel=%u/%u/%u, "
+ "blockNum=%u, flags=0x%x, refcount=%u %d)",
+ i, buf->freeNext,
+ buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
+ buf->tag.rnode.relNode,
+ buf->tag.blockNum, buf->flags,
+ buf->refcount, PrivateRefCount[i]);
}
}
#endif
@@ -1379,20 +1456,21 @@ PrintPinnedBufs(void)
int i;
BufferDesc *buf = BufferDescriptors;
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
for (i = 0; i < NBuffers; ++i, ++buf)
{
if (PrivateRefCount[i] > 0)
- elog(NOTICE,
- "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u/%u, "
+ {
+ /* theoretically we should lock the bufhdr here */
+ elog(LOG,
+ "[%02d] (freeNext=%d, rel=%u/%u/%u, "
"blockNum=%u, flags=0x%x, refcount=%u %d)",
- i, buf->freeNext, buf->freePrev,
+ i, buf->freeNext,
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
buf->tag.rnode.relNode,
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]);
+ }
}
- LWLockRelease(BufMgrLock);
}
#endif
@@ -1451,8 +1529,7 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
bufHdr = &LocalBufferDescriptors[i];
if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
{
- if ((bufHdr->flags & BM_VALID) &&
- (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty))
+ if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
{
ErrorContextCallback errcontext;
@@ -1464,11 +1541,10 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
smgrwrite(rel->rd_smgr,
bufHdr->tag.blockNum,
- (char *) MAKE_PTR(bufHdr->data),
+ (char *) LocalBufHdrGetBlock(bufHdr),
true);
bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
- bufHdr->cntxDirty = false;
/* Pop the error context stack */
error_context_stack = errcontext.previous;
@@ -1478,7 +1554,11 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
RelationGetRelationName(rel), firstDelBlock,
bufHdr->tag.blockNum, LocalRefCount[i]);
if (bufHdr->tag.blockNum >= firstDelBlock)
- bufHdr->tag.rnode.relNode = InvalidOid;
+ {
+ CLEAR_BUFFERTAG(bufHdr->tag);
+ bufHdr->flags = 0;
+ bufHdr->usage_count = 0;
+ }
}
}
@@ -1488,46 +1568,40 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
/* Make sure we can handle the pin inside the loop */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
-
for (i = 0; i < NBuffers; i++)
{
bufHdr = &BufferDescriptors[i];
+ recheck:
+ LockBufHdr(bufHdr);
if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
{
- if ((bufHdr->flags & BM_VALID) &&
- (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty))
+ if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
{
- PinBuffer(bufHdr, true);
- /* Someone else might be flushing buffer */
- if (bufHdr->flags & BM_IO_IN_PROGRESS)
- WaitIO(bufHdr);
- /* Still dirty? */
- if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
- {
- StartBufferIO(bufHdr, false);
-
- FlushBuffer(bufHdr, rel->rd_smgr, false);
-
- TerminateBufferIO(bufHdr, 0);
- }
- UnpinBuffer(bufHdr, true);
- if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
- elog(ERROR, "FlushRelationBuffers(\"%s\", %u): block %u was re-dirtied",
- RelationGetRelationName(rel), firstDelBlock,
- bufHdr->tag.blockNum);
+ PinBuffer_Locked(bufHdr);
+ LWLockAcquire(bufHdr->content_lock, LW_SHARED);
+ FlushBuffer(bufHdr, rel->rd_smgr);
+ LWLockRelease(bufHdr->content_lock);
+ UnpinBuffer(bufHdr, true, false /* no freelist change */ );
+ /*
+ * As soon as we unpin, it's possible for someone to take
+ * the buffer away from us; so loop back to re-lock and
+ * re-check if it still belongs to the target relation.
+ */
+ goto recheck;
}
- if (bufHdr->refcount != 0)
- elog(ERROR, "FlushRelationBuffers(\"%s\", %u): block %u is referenced (private %d, global %u)",
- RelationGetRelationName(rel), firstDelBlock,
- bufHdr->tag.blockNum,
- PrivateRefCount[i], bufHdr->refcount);
+ /*
+ * Even though it's not dirty, it could still be pinned because
+ * TerminateBufferIO and UnpinBuffer are separate actions. Hence,
+ * we can't error out on nonzero reference count here.
+ */
if (bufHdr->tag.blockNum >= firstDelBlock)
- StrategyInvalidateBuffer(bufHdr);
+ InvalidateBuffer(bufHdr); /* releases spinlock */
+ else
+ UnlockBufHdr(bufHdr);
}
+ else
+ UnlockBufHdr(bufHdr);
}
-
- LWLockRelease(BufMgrLock);
}
/*
@@ -1547,7 +1621,11 @@ ReleaseBuffer(Buffer buffer)
if (BufferIsLocal(buffer))
{
Assert(LocalRefCount[-buffer - 1] > 0);
+ bufHdr = &LocalBufferDescriptors[-buffer - 1];
LocalRefCount[-buffer - 1]--;
+ if (LocalRefCount[-buffer - 1] == 0 &&
+ bufHdr->usage_count < BM_MAX_USAGE_COUNT)
+ bufHdr->usage_count++;
return;
}
@@ -1558,11 +1636,7 @@ ReleaseBuffer(Buffer buffer)
if (PrivateRefCount[buffer - 1] > 1)
PrivateRefCount[buffer - 1]--;
else
- {
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
- UnpinBuffer(bufHdr, false);
- LWLockRelease(BufMgrLock);
- }
+ UnpinBuffer(bufHdr, false, true);
}
/*
@@ -1585,88 +1659,6 @@ IncrBufferRefCount(Buffer buffer)
PrivateRefCount[buffer - 1]++;
}
-#ifdef NOT_USED
-void
-IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)
-{
- IncrBufferRefCount(buffer);
- if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
- {
- BufferDesc *buf = &BufferDescriptors[buffer - 1];
-
- fprintf(stderr,
- "PIN(Incr) %d rel = %u/%u/%u, blockNum = %u, "
- "refcount = %d, file: %s, line: %d\n",
- buffer,
- buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
- buf->tag.rnode.relNode, buf->tag.blockNum,
- PrivateRefCount[buffer - 1], file, line);
- }
-}
-#endif
-
-#ifdef NOT_USED
-void
-ReleaseBuffer_Debug(char *file, int line, Buffer buffer)
-{
- ReleaseBuffer(buffer);
- if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
- {
- BufferDesc *buf = &BufferDescriptors[buffer - 1];
-
- fprintf(stderr,
- "UNPIN(Rel) %d rel = %u/%u/%u, blockNum = %u, "
- "refcount = %d, file: %s, line: %d\n",
- buffer,
- buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
- buf->tag.rnode.relNode, buf->tag.blockNum,
- PrivateRefCount[buffer - 1], file, line);
- }
-}
-#endif
-
-#ifdef NOT_USED
-Buffer
-ReleaseAndReadBuffer_Debug(char *file,
- int line,
- Buffer buffer,
- Relation relation,
- BlockNumber blockNum)
-{
- bool bufferValid;
- Buffer b;
-
- bufferValid = BufferIsValid(buffer);
- b = ReleaseAndReadBuffer(buffer, relation, blockNum);
- if (ShowPinTrace && bufferValid && BufferIsLocal(buffer)
- && is_userbuffer(buffer))
- {
- BufferDesc *buf = &BufferDescriptors[buffer - 1];
-
- fprintf(stderr,
- "UNPIN(Rel&Rd) %d rel = %u/%u/%u, blockNum = %u, "
- "refcount = %d, file: %s, line: %d\n",
- buffer,
- buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
- buf->tag.rnode.relNode, buf->tag.blockNum,
- PrivateRefCount[buffer - 1], file, line);
- }
- if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer))
- {
- BufferDesc *buf = &BufferDescriptors[b - 1];
-
- fprintf(stderr,
- "PIN(Rel&Rd) %d rel = %u/%u/%u, blockNum = %u, "
- "refcount = %d, file: %s, line: %d\n",
- b,
- buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
- buf->tag.rnode.relNode, buf->tag.blockNum,
- PrivateRefCount[b - 1], file, line);
- }
- return b;
-}
-#endif
-
/*
* SetBufferCommitInfoNeedsSave
*
@@ -1682,7 +1674,7 @@ ReleaseAndReadBuffer_Debug(char *file,
* This routine might get called many times on the same page, if we are making
* the first scan after commit of an xact that added/deleted many tuples.
* So, be as quick as we can if the buffer is already dirty. We do this by
- * not acquiring BufMgrLock if it looks like the status bits are already OK.
+ * not acquiring spinlock if it looks like the status bits are already OK.
* (Note it is okay if someone else clears BM_JUST_DIRTIED immediately after
* we look, because the buffer content update is already done and will be
* reflected in the I/O.)
@@ -1703,23 +1695,25 @@ SetBufferCommitInfoNeedsSave(Buffer buffer)
bufHdr = &BufferDescriptors[buffer - 1];
+ Assert(PrivateRefCount[buffer - 1] > 0);
+
if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
(BM_DIRTY | BM_JUST_DIRTIED))
{
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
+ LockBufHdr(bufHdr);
Assert(bufHdr->refcount > 0);
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
- LWLockRelease(BufMgrLock);
+ UnlockBufHdr(bufHdr);
}
}
/*
- * Release buffer context locks for shared buffers.
+ * Release buffer content locks for shared buffers.
*
* Used to clean up after errors.
*
* Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
- * of releasing buffer context locks per se; the only thing we need to deal
+ * of releasing buffer content locks per se; the only thing we need to deal
* with here is clearing any PIN_COUNT request that was in progress.
*/
void
@@ -1731,7 +1725,7 @@ UnlockBuffers(void)
{
HOLD_INTERRUPTS(); /* don't want to die() partway through... */
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
+ LockBufHdr_NoHoldoff(buf);
/*
* Don't complain if flag bit not set; it could have been
@@ -1741,18 +1735,19 @@ UnlockBuffers(void)
if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
buf->wait_backend_id == MyBackendId)
buf->flags &= ~BM_PIN_COUNT_WAITER;
- LWLockRelease(BufMgrLock);
+
+ UnlockBufHdr_NoHoldoff(buf);
ProcCancelWaitForSignal();
+ PinCountWaitBuf = NULL;
+
RESUME_INTERRUPTS();
}
-
- PinCountWaitBuf = NULL;
}
/*
- * Acquire or release the cntx_lock for the buffer.
+ * Acquire or release the content_lock for the buffer.
*/
void
LockBuffer(Buffer buffer, int mode)
@@ -1766,27 +1761,29 @@ LockBuffer(Buffer buffer, int mode)
buf = &(BufferDescriptors[buffer - 1]);
if (mode == BUFFER_LOCK_UNLOCK)
- LWLockRelease(buf->cntx_lock);
+ LWLockRelease(buf->content_lock);
else if (mode == BUFFER_LOCK_SHARE)
- LWLockAcquire(buf->cntx_lock, LW_SHARED);
+ LWLockAcquire(buf->content_lock, LW_SHARED);
else if (mode == BUFFER_LOCK_EXCLUSIVE)
{
- LWLockAcquire(buf->cntx_lock, LW_EXCLUSIVE);
+ LWLockAcquire(buf->content_lock, LW_EXCLUSIVE);
/*
- * This is not the best place to set cntxDirty flag (eg indices do
+ * This is not the best place to mark the buffer dirty (e.g., indices do
* not always change buffer they lock in excl mode). But please
- * remember that it's critical to set cntxDirty *before* logging
- * changes with XLogInsert() - see comments in BufferSync().
+ * remember that it's critical to set the dirty bit *before* logging
+ * changes with XLogInsert() - see comments in SyncOneBuffer().
*/
- buf->cntxDirty = true;
+ LockBufHdr_NoHoldoff(buf);
+ buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+ UnlockBufHdr_NoHoldoff(buf);
}
else
elog(ERROR, "unrecognized buffer lock mode: %d", mode);
}
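
[Editorial sketch, not part of the patch: the dirty-before-WAL ordering above
protects a caller-side sequence like the following. This is assumed caller
code; rmid, info, and rdata stand for a WAL record prepared elsewhere.]

	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);	/* now also sets BM_DIRTY */
	/* ... modify the page contents ... */
	recptr = XLogInsert(rmid, info, rdata);		/* log the change */
	PageSetLSN(BufferGetPage(buffer), recptr);	/* WAL must flush first */
	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

[Because BM_DIRTY is set before the WAL record exists, checkpoints and the
background writer can reason correctly about which pages still need flushing;
the comments in SyncOneBuffer() give the full argument.]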
/*
- * Acquire the cntx_lock for the buffer, but only if we don't have to wait.
+ * Acquire the content_lock for the buffer, but only if we don't have to wait.
*
* This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
*/
@@ -1801,15 +1798,17 @@ ConditionalLockBuffer(Buffer buffer)
buf = &(BufferDescriptors[buffer - 1]);
- if (LWLockConditionalAcquire(buf->cntx_lock, LW_EXCLUSIVE))
+ if (LWLockConditionalAcquire(buf->content_lock, LW_EXCLUSIVE))
{
/*
- * This is not the best place to set cntxDirty flag (eg indices do
+ * This is not the best place to mark the buffer dirty (e.g., indices do
* not always change buffer they lock in excl mode). But please
- * remember that it's critical to set cntxDirty *before* logging
- * changes with XLogInsert() - see comments in BufferSync().
+ * remember that it's critical to set the dirty bit *before* logging
+ * changes with XLogInsert() - see comments in SyncOneBuffer().
*/
- buf->cntxDirty = true;
+ LockBufHdr_NoHoldoff(buf);
+ buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+ UnlockBufHdr_NoHoldoff(buf);
return true;
}
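
[Editorial sketch of assumed usage, not from the patch: the conditional form
suits optional work a caller would rather skip than block for.]

	if (ConditionalLockBuffer(buffer))
	{
		/*
		 * Got the exclusive lock without waiting; the code above has
		 * already marked the buffer dirty.
		 */
		/* ... do the optional page maintenance ... */
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	}
	/* else: lock was busy, skip the maintenance this time */

[Note the cost the comment alludes to: the buffer is dirtied even if the
caller ends up changing nothing.]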
@@ -1861,25 +1860,25 @@ LockBufferForCleanup(Buffer buffer)
{
/* Try to acquire lock */
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
+ LockBufHdr_NoHoldoff(bufHdr);
Assert(bufHdr->refcount > 0);
if (bufHdr->refcount == 1)
{
/* Successfully acquired exclusive lock with pincount 1 */
- LWLockRelease(BufMgrLock);
+ UnlockBufHdr_NoHoldoff(bufHdr);
return;
}
/* Failed, so mark myself as waiting for pincount 1 */
if (bufHdr->flags & BM_PIN_COUNT_WAITER)
{
- LWLockRelease(BufMgrLock);
+ UnlockBufHdr_NoHoldoff(bufHdr);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
elog(ERROR, "multiple backends attempting to wait for pincount 1");
}
bufHdr->wait_backend_id = MyBackendId;
bufHdr->flags |= BM_PIN_COUNT_WAITER;
PinCountWaitBuf = bufHdr;
- LWLockRelease(BufMgrLock);
+ UnlockBufHdr_NoHoldoff(bufHdr);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* Wait to be signaled by UnpinBuffer() */
ProcWaitForSignal();
@@ -1889,94 +1888,160 @@ LockBufferForCleanup(Buffer buffer)
}
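
[Editorial sketch of a hypothetical VACUUM-style caller, for context; onerel
and blkno are assumed locals. Cleanup needs both the exclusive content lock
and the guarantee of a sole pin, because compacting a page invalidates
pointers that other pinned backends may hold into it.]

	buf = ReadBuffer(onerel, blkno);
	LockBufferForCleanup(buf);		/* may sleep until ours is the only pin */
	/* ... safe to remove dead tuples and compact the page ... */
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	WriteBuffer(buf);			/* mark dirty and release the pin */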
/*
- * Functions for IO error handling
+ * Functions for buffer I/O handling
*
- * Note: We assume that nested buffer IO never occurs.
+ * Note: We assume that nested buffer I/O never occurs.
 * i.e., at most one io_in_progress lock is held per process.
+ *
+ * Also note that these are used only for shared buffers, not local ones.
+ */
+
+/*
+ * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
*/
+static void
+WaitIO(BufferDesc *buf)
+{
+ /*
+ * Changed to wait until there's no I/O - Inoue 01/13/2000
+ *
+ * Note this is *necessary* because an error abort in the process doing
+ * I/O could release the io_in_progress_lock prematurely. See
+ * AbortBufferIO.
+ */
+ for (;;)
+ {
+ BufFlags sv_flags;
+
+ /*
+ * It may not be necessary to acquire the spinlock to check the
+ * flag here, but since this test is essential for correctness,
+ * we'd better play it safe.
+ */
+ LockBufHdr(buf);
+ sv_flags = buf->flags;
+ UnlockBufHdr(buf);
+ if (!(sv_flags & BM_IO_IN_PROGRESS))
+ break;
+ LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
+ LWLockRelease(buf->io_in_progress_lock);
+ }
+}
/*
- * Function:StartBufferIO
+ * StartBufferIO: begin I/O on this buffer
* (Assumptions)
* My process is executing no IO
- * BufMgrLock is held
- * BM_IO_IN_PROGRESS mask is not set for the buffer
* The buffer is Pinned
*
- * Because BufMgrLock is held, we are already in an interrupt holdoff here,
- * and do not need another.
+ * In some scenarios there are race conditions in which multiple backends
+ * could attempt the same I/O operation concurrently. If someone else
+ * has already started I/O on this buffer then we will block on the
+ * io_in_progress lock until that I/O completes.
+ *
+ * Input operations are only attempted on buffers that are not BM_VALID,
+ * and output operations only on buffers that are BM_VALID and BM_DIRTY,
+ * so we can always tell if the work is already done.
+ *
+ * Returns TRUE if we successfully marked the buffer as I/O busy,
+ * FALSE if someone else already did the work.
*/
-static void
+static bool
StartBufferIO(BufferDesc *buf, bool forInput)
{
Assert(!InProgressBuf);
- Assert(!(buf->flags & BM_IO_IN_PROGRESS));
+
+ for (;;)
+ {
+ /*
+ * Grab the io_in_progress lock so that other processes can wait for
+ * me to finish the I/O.
+ */
+ LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
+
+ /* NoHoldoff is OK since we now have an LWLock */
+ LockBufHdr_NoHoldoff(buf);
+
+ if (!(buf->flags & BM_IO_IN_PROGRESS))
+ break;
+
+ /*
+ * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
+ * lock isn't held is if the process doing the I/O is recovering from
+ * an error (see AbortBufferIO). If that's the case, we must wait for
+ * it to get unwedged.
+ */
+ UnlockBufHdr_NoHoldoff(buf);
+ LWLockRelease(buf->io_in_progress_lock);
+ WaitIO(buf);
+ }
+
+ /* Once we get here, there is definitely no I/O active on this buffer */
+
+ if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
+ {
+ /* someone else already did the I/O */
+ UnlockBufHdr_NoHoldoff(buf);
+ LWLockRelease(buf->io_in_progress_lock);
+ return false;
+ }
+
buf->flags |= BM_IO_IN_PROGRESS;
- LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
+ UnlockBufHdr_NoHoldoff(buf);
InProgressBuf = buf;
IsForInput = forInput;
+
+ return true;
}
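
[Editorial sketch, not the patch verbatim: how the new TRUE/FALSE contract is
consumed on the read side, assuming the surrounding ReadBuffer machinery;
smgrread() and BufHdrGetBlock() are as used elsewhere in this file.]

	if (StartBufferIO(buf, true))	/* true = input */
	{
		/* we won the race and must do the read ourselves */
		smgrread(reln, buf->tag.blockNum, (char *) BufHdrGetBlock(buf));
		TerminateBufferIO(buf, false, BM_VALID);	/* page is now valid */
	}
	else
	{
		/* another backend completed the read while we waited in WaitIO */
	}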
/*
- * Function:TerminateBufferIO
+ * TerminateBufferIO: release a buffer we were doing I/O on
* (Assumptions)
* My process is executing IO for the buffer
- * BufMgrLock is held
- * BM_IO_IN_PROGRESS mask is set for the buffer
+ * BM_IO_IN_PROGRESS bit is set for the buffer
+ * We hold the buffer's io_in_progress lock
* The buffer is Pinned
*
- * err_flag must be 0 for successful completion and BM_IO_ERROR for failure.
+ * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
+ * buffer's BM_DIRTY flag. This is appropriate when terminating a
+ * successful write. The check on BM_JUST_DIRTIED is necessary to avoid
+ * marking the buffer clean if it was re-dirtied while we were writing.
*
- * Because BufMgrLock is held, we are already in an interrupt holdoff here,
- * and do not need another.
+ * set_flag_bits gets ORed into the buffer's flags. It must include
+ * BM_IO_ERROR in a failure case. For successful completion it could
+ * be 0, or BM_VALID if we just finished reading in the page.
*/
static void
-TerminateBufferIO(BufferDesc *buf, int err_flag)
+TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits)
{
Assert(buf == InProgressBuf);
+
+ /* NoHoldoff is OK since we must have an LWLock */
+ LockBufHdr_NoHoldoff(buf);
+
Assert(buf->flags & BM_IO_IN_PROGRESS);
buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
- buf->flags |= err_flag;
+ if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
+ buf->flags &= ~BM_DIRTY;
+ buf->flags |= set_flag_bits;
- LWLockRelease(buf->io_in_progress_lock);
+ UnlockBufHdr_NoHoldoff(buf);
InProgressBuf = NULL;
-}
-/*
- * Function:ContinueBufferIO
- * (Assumptions)
- * My process is executing IO for the buffer
- * BufMgrLock is held
- * The buffer is Pinned
- *
- * Because BufMgrLock is held, we are already in an interrupt holdoff here,
- * and do not need another.
- */
-static void
-ContinueBufferIO(BufferDesc *buf, bool forInput)
-{
- Assert(buf == InProgressBuf);
- Assert(buf->flags & BM_IO_IN_PROGRESS);
- IsForInput = forInput;
-}
-
-#ifdef NOT_USED
-void
-InitBufferIO(void)
-{
- InProgressBuf = NULL;
+ LWLockRelease(buf->io_in_progress_lock);
}
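
[Editorial sketch of the write side, a FlushBuffer-style sequence rather than
the patch verbatim: BM_JUST_DIRTIED is cleared before the write starts, so any
backend that re-dirties the page during smgrwrite() sets it again, and
TerminateBufferIO() then leaves BM_DIRTY in place.]

	if (StartBufferIO(buf, false))	/* false = output */
	{
		LockBufHdr_NoHoldoff(buf);
		buf->flags &= ~BM_JUST_DIRTIED;	/* arm re-dirty detection */
		UnlockBufHdr_NoHoldoff(buf);

		smgrwrite(reln, buf->tag.blockNum,
				  (char *) BufHdrGetBlock(buf), false);

		/* clears BM_DIRTY only if no one re-dirtied the page meanwhile */
		TerminateBufferIO(buf, true, 0);
	}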
-#endif
/*
- * Clean up any active buffer I/O after an error.
- * BufMgrLock isn't held when this function is called,
+ * AbortBufferIO: Clean up any active buffer I/O after an error.
+ *
+ * All LWLocks we might have held have been released,
* but we haven't yet released buffer pins, so the buffer is still pinned.
*
- * If I/O was in progress, we always set BM_IO_ERROR.
+ * If I/O was in progress, we always set BM_IO_ERROR, even though it's
+ * possible the error condition wasn't related to the I/O.
*/
void
AbortBufferIO(void)
@@ -1994,20 +2059,27 @@ AbortBufferIO(void)
*/
LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
- LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
+ /* NoHoldoff is OK since we now have an LWLock */
+ LockBufHdr_NoHoldoff(buf);
Assert(buf->flags & BM_IO_IN_PROGRESS);
if (IsForInput)
{
- Assert(!(buf->flags & BM_DIRTY || buf->cntxDirty));
+ Assert(!(buf->flags & BM_DIRTY));
/* We'd better not think buffer is valid yet */
Assert(!(buf->flags & BM_VALID));
+ UnlockBufHdr_NoHoldoff(buf);
}
else
{
- Assert(buf->flags & BM_DIRTY || buf->cntxDirty);
+ BufFlags sv_flags;
+
+ sv_flags = buf->flags;
+ Assert(sv_flags & BM_DIRTY);
+ UnlockBufHdr_NoHoldoff(buf);
/* Issue notice if this is not the first failure... */
- if (buf->flags & BM_IO_ERROR)
+ if (sv_flags & BM_IO_ERROR)
{
+ /* Buffer is pinned, so we can read the tag without the spinlock */
ereport(WARNING,
(errcode(ERRCODE_IO_ERROR),
errmsg("could not write block %u of %u/%u/%u",
@@ -2017,10 +2089,8 @@ AbortBufferIO(void)
buf->tag.rnode.relNode),
errdetail("Multiple failures --- write error may be permanent.")));
}
- buf->flags |= BM_DIRTY;
}
- TerminateBufferIO(buf, BM_IO_ERROR);
- LWLockRelease(BufMgrLock);
+ TerminateBufferIO(buf, false, BM_IO_ERROR);
}
}
@@ -2032,6 +2102,7 @@ buffer_write_error_callback(void *arg)
{
BufferDesc *bufHdr = (BufferDesc *) arg;
+ /* Buffer is pinned, so we can read the tag without taking the spinlock */
if (bufHdr != NULL)
errcontext("writing block %u of relation %u/%u/%u",
bufHdr->tag.blockNum,