Diffstat (limited to 'src/backend/storage/buffer/bufmgr.c')
-rw-r--r-- | src/backend/storage/buffer/bufmgr.c | 1551
1 file changed, 811 insertions, 740 deletions
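Reader orientation: this commit retires the single global BufMgrLock in favor of (a) a spinlock in each buffer header protecting that buffer's flags, refcount, and usage_count, (b) BufMappingLock protecting the tag-to-buffer hash table, and (c) BufFreelistLock protecting the replacement strategy. The header-lock macros come from buf_internals.h and are not shown in this diff; a minimal sketch of what they presumably expand to, assuming a slock_t field named buf_hdr_lock in BufferDesc:

    /* Sketch only -- the real definitions live in buf_internals.h. */
    #define LockBufHdr(bufHdr) \
        SpinLockAcquire(&(bufHdr)->buf_hdr_lock)
    #define UnlockBufHdr(bufHdr) \
        SpinLockRelease(&(bufHdr)->buf_hdr_lock)

    /* The _NoHoldoff variants skip the interrupt holdoff, for callers
     * that are already inside an LWLock's holdoff window. */
    #define LockBufHdr_NoHoldoff(bufHdr) \
        SpinLockAcquire_NoHoldoff(&(bufHdr)->buf_hdr_lock)
    #define UnlockBufHdr_NoHoldoff(bufHdr) \
        SpinLockRelease_NoHoldoff(&(bufHdr)->buf_hdr_lock)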
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 457d23b0e02..59dec8f9ead 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.185 2005/01/10 20:02:21 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.186 2005/03/04 20:21:06 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -25,7 +25,9 @@ * * WriteBuffer() -- WriteNoReleaseBuffer() + ReleaseBuffer() * - * BufferSync() -- flush all (or some) dirty buffers in the buffer pool. + * BufferSync() -- flush all dirty buffers in the buffer pool. + * + * BgBufferSync() -- flush some dirty buffers in the buffer pool. * * InitBufferPool() -- Init the buffer module. * @@ -50,16 +52,22 @@ #include "pgstat.h" -#define BufferGetLSN(bufHdr) \ - (*((XLogRecPtr*) MAKE_PTR((bufHdr)->data))) +/* Note: these two macros only work on shared buffers, not local ones! */ +#define BufHdrGetBlock(bufHdr) BufferBlockPointers[(bufHdr)->buf_id] +#define BufferGetLSN(bufHdr) (*((XLogRecPtr*) BufHdrGetBlock(bufHdr))) + +/* Note: this macro only works on local buffers, not shared ones! */ +#define LocalBufHdrGetBlock(bufHdr) \ + LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)] -/* GUC variable */ +/* GUC variables */ bool zero_damaged_pages = false; +double bgwriter_lru_percent = 1.0; +double bgwriter_all_percent = 0.333; +int bgwriter_lru_maxpages = 5; +int bgwriter_all_maxpages = 5; -#ifdef NOT_USED -bool ShowPinTrace = false; -#endif long NDirectFileRead; /* some I/O's are direct file access. * bypass bufmgr */ @@ -73,18 +81,18 @@ static bool IsForInput; static BufferDesc *PinCountWaitBuf = NULL; -static void PinBuffer(BufferDesc *buf, bool fixOwner); -static void UnpinBuffer(BufferDesc *buf, bool fixOwner); +static bool PinBuffer(BufferDesc *buf); +static void PinBuffer_Locked(BufferDesc *buf); +static void UnpinBuffer(BufferDesc *buf, bool fixOwner, bool trashOK); +static bool SyncOneBuffer(int buf_id, bool skip_pinned); static void WaitIO(BufferDesc *buf); -static void StartBufferIO(BufferDesc *buf, bool forInput); -static void TerminateBufferIO(BufferDesc *buf, int err_flag); -static void ContinueBufferIO(BufferDesc *buf, bool forInput); +static bool StartBufferIO(BufferDesc *buf, bool forInput); +static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, + int set_flag_bits); static void buffer_write_error_callback(void *arg); -static Buffer ReadBufferInternal(Relation reln, BlockNumber blockNum, - bool bufferLockHeld); static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr); -static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, bool earlylock); +static void FlushBuffer(BufferDesc *buf, SMgrRelation reln); static void write_buffer(Buffer buffer, bool unpin); @@ -106,27 +114,15 @@ static void write_buffer(Buffer buffer, bool unpin); Buffer ReadBuffer(Relation reln, BlockNumber blockNum) { - ResourceOwnerEnlargeBuffers(CurrentResourceOwner); - return ReadBufferInternal(reln, blockNum, false); -} - -/* - * ReadBufferInternal -- internal version of ReadBuffer with more options - * - * bufferLockHeld: if true, caller already acquired the bufmgr lock. - * (This is assumed never to be true if dealing with a local buffer!) 
- * - * The caller must have done ResourceOwnerEnlargeBuffers(CurrentResourceOwner) - */ -static Buffer -ReadBufferInternal(Relation reln, BlockNumber blockNum, - bool bufferLockHeld) -{ BufferDesc *bufHdr; + Block bufBlock; bool found; bool isExtend; bool isLocalBuf; + /* Make sure we will have room to remember the buffer pin */ + ResourceOwnerEnlargeBuffers(CurrentResourceOwner); + isExtend = (blockNum == P_NEW); isLocalBuf = reln->rd_istemp; @@ -137,10 +133,11 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum, if (isExtend) blockNum = smgrnblocks(reln->rd_smgr); + pgstat_count_buffer_read(&reln->pgstat_info, reln); + if (isLocalBuf) { ReadLocalBufferCount++; - pgstat_count_buffer_read(&reln->pgstat_info, reln); bufHdr = LocalBufferAlloc(reln, blockNum, &found); if (found) LocalBufferHitCount++; @@ -148,20 +145,17 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum, else { ReadBufferCount++; - pgstat_count_buffer_read(&reln->pgstat_info, reln); /* * lookup the buffer. IO_IN_PROGRESS is set if the requested * block is not currently in memory. */ - if (!bufferLockHeld) - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); bufHdr = BufferAlloc(reln, blockNum, &found); if (found) BufferHitCount++; } - /* At this point we do NOT hold the bufmgr lock. */ + /* At this point we do NOT hold any locks. */ /* if it was already in the buffer pool, we're done */ if (found) @@ -187,20 +181,22 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum, * same buffer (if it's not been recycled) but come right back here to * try smgrextend again. */ - Assert(!(bufHdr->flags & BM_VALID)); + Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */ + + bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); if (isExtend) { /* new buffers are zero-filled */ - MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ); - smgrextend(reln->rd_smgr, blockNum, (char *) MAKE_PTR(bufHdr->data), + MemSet((char *) bufBlock, 0, BLCKSZ); + smgrextend(reln->rd_smgr, blockNum, (char *) bufBlock, reln->rd_istemp); } else { - smgrread(reln->rd_smgr, blockNum, (char *) MAKE_PTR(bufHdr->data)); + smgrread(reln->rd_smgr, blockNum, (char *) bufBlock); /* check for garbage data */ - if (!PageHeaderIsValid((PageHeader) MAKE_PTR(bufHdr->data))) + if (!PageHeaderIsValid((PageHeader) bufBlock)) { /* * During WAL recovery, the first access to any data page @@ -215,7 +211,7 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("invalid page header in block %u of relation \"%s\"; zeroing out page", blockNum, RelationGetRelationName(reln)))); - MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ); + MemSet((char *) bufBlock, 0, BLCKSZ); } else ereport(ERROR, @@ -232,16 +228,8 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum, } else { - /* lock buffer manager again to update IO IN PROGRESS */ - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); - - /* IO Succeeded, so mark data valid */ - bufHdr->flags |= BM_VALID; - - /* If anyone was waiting for IO to complete, wake them up now */ - TerminateBufferIO(bufHdr, 0); - - LWLockRelease(BufMgrLock); + /* Set BM_VALID, terminate IO, and wake up any waiters */ + TerminateBufferIO(bufHdr, false, BM_VALID); } if (VacuumCostActive) @@ -263,8 +251,7 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum, * *foundPtr is actually redundant with the buffer's BM_VALID flag, but * we keep it for simplicity in ReadBuffer. * - * BufMgrLock must be held at entry. When this routine returns, - * the BufMgrLock is guaranteed NOT to be held. 
+ * No locks are held either at entry or exit. */ static BufferDesc * BufferAlloc(Relation reln, @@ -272,229 +259,343 @@ BufferAlloc(Relation reln, bool *foundPtr) { BufferTag newTag; /* identity of requested block */ - BufferDesc *buf, - *buf2; - int cdb_found_index, - cdb_replace_index; - bool inProgress; /* did we already do StartBufferIO? */ + BufferTag oldTag; + BufFlags oldFlags; + int buf_id; + BufferDesc *buf; + bool valid; /* create a tag so we can lookup the buffer */ INIT_BUFFERTAG(newTag, reln, blockNum); /* see if the block is in the buffer pool already */ - buf = StrategyBufferLookup(&newTag, false, &cdb_found_index); - if (buf != NULL) + LWLockAcquire(BufMappingLock, LW_SHARED); + buf_id = BufTableLookup(&newTag); + if (buf_id >= 0) { /* * Found it. Now, pin the buffer so no one can steal it from the - * buffer pool, and check to see if someone else is still reading - * data into the buffer. (Formerly, we'd always block here if - * IO_IN_PROGRESS is set, but there's no need to wait when someone - * is writing rather than reading.) + * buffer pool, and check to see if the correct data has been + * loaded into the buffer. */ - *foundPtr = TRUE; + buf = &BufferDescriptors[buf_id]; + + valid = PinBuffer(buf); - PinBuffer(buf, true); + /* Can release the mapping lock as soon as we've pinned it */ + LWLockRelease(BufMappingLock); + + *foundPtr = TRUE; - if (!(buf->flags & BM_VALID)) + if (!valid) { - if (buf->flags & BM_IO_IN_PROGRESS) - { - /* someone else is reading it, wait for them */ - WaitIO(buf); - } - if (!(buf->flags & BM_VALID)) + /* + * We can only get here if (a) someone else is still reading + * in the page, or (b) a previous read attempt failed. We + * have to wait for any active read attempt to finish, and + * then set up our own read attempt if the page is still not + * BM_VALID. StartBufferIO does it all. + */ + if (StartBufferIO(buf, true)) { /* * If we get here, previous attempts to read the buffer * must have failed ... but we shall bravely try again. */ *foundPtr = FALSE; - StartBufferIO(buf, true); } } - LWLockRelease(BufMgrLock); - return buf; } - *foundPtr = FALSE; - /* * Didn't find it in the buffer pool. We'll have to initialize a new - * buffer. First, grab one from the free list. If it's dirty, flush - * it to disk. Remember to unlock BufMgrLock while doing the IO. + * buffer. Remember to unlock BufMappingLock while doing the work. */ - inProgress = FALSE; - do - { - buf = StrategyGetBuffer(&cdb_replace_index); - - /* StrategyGetBuffer will elog if it can't find a free buffer */ - Assert(buf); + LWLockRelease(BufMappingLock); + /* Loop here in case we have to try another victim buffer */ + for (;;) + { /* - * There should be exactly one pin on the buffer after it is - * allocated -- ours. If it had a pin it wouldn't have been on - * the free list. No one else could have pinned it between - * StrategyGetBuffer and here because we have the BufMgrLock. - * - * (We must pin the buffer before releasing BufMgrLock ourselves, - * to ensure StrategyGetBuffer won't give the same buffer to someone - * else.) + * Select a victim buffer. The buffer is returned with its + * header spinlock still held! Also the BufFreelistLock is + * still held, since it would be bad to hold the spinlock + * while possibly waking up other processes. 
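StrategyGetBuffer() itself lives in freelist.c and is outside this diff. Under the contract just described, once the free list is empty its core is presumably a clock sweep over the pool; a sketch, in which StrategyControl and nextVictimBuffer are assumed names:

    /* Illustrative clock sweep; returns the victim with its header
     * spinlock still held, per the contract described above. */
    static BufferDesc *
    clock_sweep_sketch(void)
    {
        for (;;)
        {
            BufferDesc *buf = &BufferDescriptors[StrategyControl->nextVictimBuffer];

            if (++StrategyControl->nextVictimBuffer >= NBuffers)
                StrategyControl->nextVictimBuffer = 0;

            LockBufHdr(buf);
            if (buf->refcount == 0)
            {
                if (buf->usage_count > 0)
                    buf->usage_count--;     /* used recently: spare it this lap */
                else
                    return buf;             /* victim; spinlock still held */
            }
            UnlockBufHdr(buf);
        }
    }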
*/ + buf = StrategyGetBuffer(); + Assert(buf->refcount == 0); - buf->refcount = 1; - PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 1; - ResourceOwnerRememberBuffer(CurrentResourceOwner, - BufferDescriptorGetBuffer(buf)); + /* Must copy buffer flags while we still hold the spinlock */ + oldFlags = buf->flags; - if ((buf->flags & BM_VALID) && - (buf->flags & BM_DIRTY || buf->cntxDirty)) - { - /* - * Set BM_IO_IN_PROGRESS to show the buffer is being written. - * It cannot already be set because the buffer would be pinned - * if someone were writing it. - * - * Note: it's okay to grab the io_in_progress lock while holding - * BufMgrLock. All code paths that acquire this lock pin the - * buffer first; since no one had it pinned (it just came off - * the free list), no one else can have the lock. - */ - StartBufferIO(buf, false); + /* Pin the buffer and then release the buffer spinlock */ + PinBuffer_Locked(buf); - inProgress = TRUE; - - /* - * Write the buffer out, being careful to release BufMgrLock - * while doing the I/O. We also tell FlushBuffer to share-lock - * the buffer before releasing BufMgrLock. This is safe because - * we know no other backend currently has the buffer pinned, - * therefore no one can have it locked either, so we can always - * get the lock without blocking. It is necessary because if - * we release BufMgrLock first, it's possible for someone else - * to pin and exclusive-lock the buffer before we get to the - * share-lock, causing us to block. If the someone else then - * blocks on a lock we hold, deadlock ensues. This has been - * observed to happen when two backends are both trying to split - * btree index pages, and the second one just happens to be - * trying to split the page the first one got from the freelist. - */ - FlushBuffer(buf, NULL, true); + /* Now it's safe to release the freelist lock */ + LWLockRelease(BufFreelistLock); + /* + * If the buffer was dirty, try to write it out. There is a race + * condition here, in that someone might dirty it after we released + * it above, or even while we are writing it out (since our share-lock + * won't prevent hint-bit updates). We will recheck the dirty bit + * after re-locking the buffer header. + */ + if (oldFlags & BM_DIRTY) + { /* - * Somebody could have allocated another buffer for the same - * block we are about to read in. While we flush out the dirty - * buffer, we don't hold the lock and someone could have - * allocated another buffer for the same block. The problem is - * we haven't yet inserted the new tag into the buffer table. - * So we need to check here. -ay 3/95 - * - * Another reason we have to do this is to update - * cdb_found_index, since the CDB could have disappeared from - * B1/B2 list while we were writing. + * We need a share-lock on the buffer contents to write it out + * (else we might write invalid data, eg because someone else + * is compacting the page contents while we write). We must use + * a conditional lock acquisition here to avoid deadlock. Even + * though the buffer was not pinned (and therefore surely not + * locked) when StrategyGetBuffer returned it, someone else could + * have pinned and exclusive-locked it by the time we get here. + * If we try to get the lock unconditionally, we'd block waiting + * for them; if they later block waiting for us, deadlock ensues. 
+ * (This has been observed to happen when two backends are both + * trying to split btree index pages, and the second one just + * happens to be trying to split the page the first one got from + * StrategyGetBuffer.) */ - buf2 = StrategyBufferLookup(&newTag, true, &cdb_found_index); - if (buf2 != NULL) + if (LWLockConditionalAcquire(buf->content_lock, LW_SHARED)) + { + FlushBuffer(buf, NULL); + LWLockRelease(buf->content_lock); + } + else { /* - * Found it. Someone has already done what we were about - * to do. We'll just handle this as if it were found in - * the buffer pool in the first place. First, give up the - * buffer we were planning to use. + * Someone else has pinned the buffer, so give it up and + * loop back to get another one. */ - TerminateBufferIO(buf, 0); - UnpinBuffer(buf, true); + UnpinBuffer(buf, true, false /* evidently recently used */ ); + continue; + } + } - buf = buf2; + /* + * Acquire exclusive mapping lock in preparation for changing + * the buffer's association. + */ + LWLockAcquire(BufMappingLock, LW_EXCLUSIVE); - /* remaining code should match code at top of routine */ + /* + * Try to make a hashtable entry for the buffer under its new tag. + * This could fail because while we were writing someone else + * allocated another buffer for the same block we want to read in. + * Note that we have not yet removed the hashtable entry for the + * old tag. + */ + buf_id = BufTableInsert(&newTag, buf->buf_id); - *foundPtr = TRUE; + if (buf_id >= 0) + { + /* + * Got a collision. Someone has already done what we were about + * to do. We'll just handle this as if it were found in + * the buffer pool in the first place. First, give up the + * buffer we were planning to use. Don't allow it to be + * thrown in the free list (we don't want to hold both + * global locks at once). + */ + UnpinBuffer(buf, true, false); - PinBuffer(buf, true); + /* remaining code should match code at top of routine */ - if (!(buf->flags & BM_VALID)) - { - if (buf->flags & BM_IO_IN_PROGRESS) - { - /* someone else is reading it, wait for them */ - WaitIO(buf); - } - if (!(buf->flags & BM_VALID)) - { - /* - * If we get here, previous attempts to read the - * buffer must have failed ... but we shall - * bravely try again. - */ - *foundPtr = FALSE; - StartBufferIO(buf, true); - } - } + buf = &BufferDescriptors[buf_id]; - LWLockRelease(BufMgrLock); + valid = PinBuffer(buf); - return buf; - } + /* Can release the mapping lock as soon as we've pinned it */ + LWLockRelease(BufMappingLock); - /* - * Somebody could have pinned the buffer while we were doing - * the I/O and had given up the BufMgrLock. If so, we can't - * recycle this buffer --- we need to clear the I/O flags, - * remove our pin and choose a new victim buffer. Similarly, - * we have to start over if somebody re-dirtied the buffer. - */ - if (buf->refcount > 1 || buf->flags & BM_DIRTY || buf->cntxDirty) + *foundPtr = TRUE; + + if (!valid) { - TerminateBufferIO(buf, 0); - UnpinBuffer(buf, true); - inProgress = FALSE; - buf = NULL; + /* + * We can only get here if (a) someone else is still reading + * in the page, or (b) a previous read attempt failed. We + * have to wait for any active read attempt to finish, and + * then set up our own read attempt if the page is still not + * BM_VALID. StartBufferIO does it all. + */ + if (StartBufferIO(buf, true)) + { + /* + * If we get here, previous attempts to read the buffer + * must have failed ... but we shall bravely try again. 
+ */ + *foundPtr = FALSE; + } } + + return buf; } - } while (buf == NULL); - /* - * At this point we should have the sole pin on a non-dirty buffer and - * we may or may not already have the BM_IO_IN_PROGRESS flag set. - */ + /* + * Need to lock the buffer header too in order to change its tag. + */ + LockBufHdr_NoHoldoff(buf); + + /* + * Somebody could have pinned or re-dirtied the buffer while we were + * doing the I/O and making the new hashtable entry. If so, we + * can't recycle this buffer; we must undo everything we've done and + * start over with a new victim buffer. + */ + if (buf->refcount == 1 && !(buf->flags & BM_DIRTY)) + break; + + UnlockBufHdr_NoHoldoff(buf); + BufTableDelete(&newTag); + LWLockRelease(BufMappingLock); + UnpinBuffer(buf, true, false /* evidently recently used */ ); + } /* - * Tell the buffer replacement strategy that we are replacing the - * buffer content. Then rename the buffer. Clearing BM_VALID here is - * necessary, clearing the dirtybits is just paranoia. + * Okay, it's finally safe to rename the buffer. + * + * Clearing BM_VALID here is necessary, clearing the dirtybits + * is just paranoia. We also clear the usage_count since any + * recency of use of the old content is no longer relevant. */ - StrategyReplaceBuffer(buf, &newTag, cdb_found_index, cdb_replace_index); + oldTag = buf->tag; + oldFlags = buf->flags; buf->tag = newTag; buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); - buf->cntxDirty = false; + buf->flags |= BM_TAG_VALID; + buf->usage_count = 0; + + UnlockBufHdr_NoHoldoff(buf); + + if (oldFlags & BM_TAG_VALID) + BufTableDelete(&oldTag); + + LWLockRelease(BufMappingLock); /* - * Buffer contents are currently invalid. Have to mark IO IN PROGRESS - * so no one fiddles with them until the read completes. We may have - * already marked it, in which case we just flip from write to read - * status. + * Buffer contents are currently invalid. Try to get the io_in_progress + * lock. If StartBufferIO returns false, then someone else managed + * to read it before we did, so there's nothing left for BufferAlloc() + * to do. */ - if (!inProgress) - StartBufferIO(buf, true); + if (StartBufferIO(buf, true)) + *foundPtr = FALSE; else - ContinueBufferIO(buf, true); - - LWLockRelease(BufMgrLock); + *foundPtr = TRUE; return buf; } /* + * InvalidateBuffer -- mark a shared buffer invalid and return it to the + * freelist. + * + * The buffer header spinlock must be held at entry. We drop it before + * returning. (This is sane because the caller must have locked the + * buffer in order to be sure it should be dropped.) + * + * This is used only in contexts such as dropping a relation. We assume + * that no other backend could possibly be interested in using the page, + * so the only reason the buffer might be pinned is if someone else is + * trying to write it out. We have to let them finish before we can + * reclaim the buffer. + * + * The buffer could get reclaimed by someone else while we are waiting + * to acquire the necessary locks; if so, don't mess it up. + */ +static void +InvalidateBuffer(BufferDesc *buf) +{ + BufferTag oldTag; + BufFlags oldFlags; + + /* Save the original buffer tag before dropping the spinlock */ + oldTag = buf->tag; + + UnlockBufHdr(buf); + +retry: + /* + * Acquire exclusive mapping lock in preparation for changing + * the buffer's association. 
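Both BufferAlloc() above and InvalidateBuffer() here lean on the buffer-tag helpers from buf_internals.h, which this diff does not show. Their likely shape, as a sketch rather than the canonical definitions:

    #define INIT_BUFFERTAG(a, reln, blockNum) \
        ((a).rnode = (reln)->rd_node, (a).blockNum = (blockNum))

    #define CLEAR_BUFFERTAG(a) \
        ((a).rnode.spcNode = InvalidOid, \
         (a).rnode.dbNode = InvalidOid, \
         (a).rnode.relNode = InvalidOid, \
         (a).blockNum = InvalidBlockNumber)

    #define BUFFERTAGS_EQUAL(a, b) \
        (RelFileNodeEquals((a).rnode, (b).rnode) && \
         (a).blockNum == (b).blockNum)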
+ */ + LWLockAcquire(BufMappingLock, LW_EXCLUSIVE); + + /* Re-lock the buffer header (NoHoldoff since we have an LWLock) */ + LockBufHdr_NoHoldoff(buf); + + /* If it's changed while we were waiting for lock, do nothing */ + if (!BUFFERTAGS_EQUAL(buf->tag, oldTag)) + { + UnlockBufHdr_NoHoldoff(buf); + LWLockRelease(BufMappingLock); + return; + } + + /* + * We assume the only reason for it to be pinned is that someone else + * is flushing the page out. Wait for them to finish. (This could be + * an infinite loop if the refcount is messed up... it would be nice + * to time out after awhile, but there seems no way to be sure how + * many loops may be needed. Note that if the other guy has pinned + * the buffer but not yet done StartBufferIO, WaitIO will fall through + * and we'll effectively be busy-looping here.) + */ + if (buf->refcount != 0) + { + UnlockBufHdr_NoHoldoff(buf); + LWLockRelease(BufMappingLock); + WaitIO(buf); + goto retry; + } + + /* + * Clear out the buffer's tag and flags. We must do this to ensure + * that linear scans of the buffer array don't think the buffer is valid. + */ + oldFlags = buf->flags; + CLEAR_BUFFERTAG(buf->tag); + buf->flags = 0; + buf->usage_count = 0; + + UnlockBufHdr_NoHoldoff(buf); + + /* + * Remove the buffer from the lookup hashtable, if it was in there. + */ + if (oldFlags & BM_TAG_VALID) + BufTableDelete(&oldTag); + + /* + * Avoid accepting a cancel interrupt when we release the mapping lock; + * that would leave the buffer free but not on the freelist. (Which would + * not be fatal, since it'd get picked up again by the clock scanning + * code, but we'd rather be sure it gets to the freelist.) + */ + HOLD_INTERRUPTS(); + + LWLockRelease(BufMappingLock); + + /* + * Insert the buffer at the head of the list of free buffers. + */ + StrategyFreeBuffer(buf, true); + + RESUME_INTERRUPTS(); +} + +/* * write_buffer -- common functionality for * WriteBuffer and WriteNoReleaseBuffer */ static void -write_buffer(Buffer buffer, bool release) +write_buffer(Buffer buffer, bool unpin) { BufferDesc *bufHdr; @@ -503,7 +604,7 @@ write_buffer(Buffer buffer, bool release) if (BufferIsLocal(buffer)) { - WriteLocalBuffer(buffer, release); + WriteLocalBuffer(buffer, unpin); return; } @@ -511,7 +612,8 @@ write_buffer(Buffer buffer, bool release) Assert(PrivateRefCount[buffer - 1] > 0); - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); + LockBufHdr(bufHdr); + Assert(bufHdr->refcount > 0); /* @@ -522,9 +624,10 @@ write_buffer(Buffer buffer, bool release) bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); - if (release) - UnpinBuffer(bufHdr, true); - LWLockRelease(BufMgrLock); + UnlockBufHdr(bufHdr); + + if (unpin) + UnpinBuffer(bufHdr, true, true); } /* @@ -555,21 +658,16 @@ WriteNoReleaseBuffer(Buffer buffer) /* * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer() - * to save a lock release/acquire. * - * Also, if the passed buffer is valid and already contains the desired block - * number, we simply return it without ever acquiring the lock at all. - * Since the passed buffer must be pinned, it's OK to examine its block - * number without getting the lock first. + * Formerly, this saved one cycle of acquiring/releasing the BufMgrLock + * compared to calling the two routines separately. Now it's mainly just + * a convenience function. However, if the passed buffer is valid and + * already contains the desired block, we just return it as-is; and that + * does save considerable work compared to a full release and reacquire. 
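The calling pattern this function is designed for is a sequential walk where consecutive requests often land on the same page. A sketch, in which rel and nblocks stand for the caller's own state:

    Buffer      buf = InvalidBuffer;
    BlockNumber blkno;

    for (blkno = 0; blkno < nblocks; blkno++)
    {
        buf = ReleaseAndReadBuffer(buf, rel, blkno);
        LockBuffer(buf, BUFFER_LOCK_SHARE);
        /* ... examine the page ... */
        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
    }
    if (BufferIsValid(buf))
        ReleaseBuffer(buf);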
* * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old * buffer actually needs to be released. This case is the same as ReadBuffer, * but can save some tests in the caller. - * - * Also note: while it will work to call this routine with blockNum == P_NEW, - * it's best to avoid doing so, since that would result in calling - * smgrnblocks() while holding the bufmgr lock, hence some loss of - * concurrency. */ Buffer ReleaseAndReadBuffer(Buffer buffer, @@ -588,235 +686,313 @@ ReleaseAndReadBuffer(Buffer buffer, RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node)) return buffer; ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer); - /* owner now has a free slot, so no need for Enlarge() */ LocalRefCount[-buffer - 1]--; + if (LocalRefCount[-buffer - 1] == 0 && + bufHdr->usage_count < BM_MAX_USAGE_COUNT) + bufHdr->usage_count++; } else { Assert(PrivateRefCount[buffer - 1] > 0); bufHdr = &BufferDescriptors[buffer - 1]; + /* we have pin, so it's ok to examine tag without spinlock */ if (bufHdr->tag.blockNum == blockNum && RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node)) return buffer; - ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer); - /* owner now has a free slot, so no need for Enlarge() */ - if (PrivateRefCount[buffer - 1] > 1) - PrivateRefCount[buffer - 1]--; - else - { - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); - UnpinBuffer(bufHdr, false); - return ReadBufferInternal(relation, blockNum, true); - } + UnpinBuffer(bufHdr, true, true); } } - else - ResourceOwnerEnlargeBuffers(CurrentResourceOwner); - return ReadBufferInternal(relation, blockNum, false); + return ReadBuffer(relation, blockNum); } /* * PinBuffer -- make buffer unavailable for replacement. * * This should be applied only to shared buffers, never local ones. - * Bufmgr lock must be held by caller. * - * Most but not all callers want CurrentResourceOwner to be adjusted. * Note that ResourceOwnerEnlargeBuffers must have been done already. + * + * Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows + * some callers to avoid an extra spinlock cycle. + */ +static bool +PinBuffer(BufferDesc *buf) +{ + int b = buf->buf_id; + bool result; + + if (PrivateRefCount[b] == 0) + { + /* + * Use NoHoldoff here because we don't want the unlock to be a + * potential place to honor a QueryCancel request. + * (The caller should be holding off interrupts anyway.) + */ + LockBufHdr_NoHoldoff(buf); + buf->refcount++; + result = (buf->flags & BM_VALID) != 0; + UnlockBufHdr_NoHoldoff(buf); + } + else + { + /* If we previously pinned the buffer, it must surely be valid */ + result = true; + } + PrivateRefCount[b]++; + Assert(PrivateRefCount[b] > 0); + ResourceOwnerRememberBuffer(CurrentResourceOwner, + BufferDescriptorGetBuffer(buf)); + return result; +} + +/* + * PinBuffer_Locked -- as above, but caller already locked the buffer header. + * The spinlock is released before return. + * + * Note: use of this routine is frequently mandatory, not just an optimization + * to save a spin lock/unlock cycle, because we need to pin a buffer before + * its state can change under us. 
*/ static void -PinBuffer(BufferDesc *buf, bool fixOwner) +PinBuffer_Locked(BufferDesc *buf) { - int b = BufferDescriptorGetBuffer(buf) - 1; + int b = buf->buf_id; if (PrivateRefCount[b] == 0) buf->refcount++; + /* NoHoldoff since we mustn't accept cancel interrupt here */ + UnlockBufHdr_NoHoldoff(buf); PrivateRefCount[b]++; Assert(PrivateRefCount[b] > 0); - if (fixOwner) - ResourceOwnerRememberBuffer(CurrentResourceOwner, - BufferDescriptorGetBuffer(buf)); + ResourceOwnerRememberBuffer(CurrentResourceOwner, + BufferDescriptorGetBuffer(buf)); + /* Now we can accept cancel */ + RESUME_INTERRUPTS(); } /* * UnpinBuffer -- make buffer available for replacement. * * This should be applied only to shared buffers, never local ones. - * Bufmgr lock must be held by caller. * * Most but not all callers want CurrentResourceOwner to be adjusted. + * + * If we are releasing a buffer during VACUUM, and it's not been otherwise + * used recently, and trashOK is true, send the buffer to the freelist. */ static void -UnpinBuffer(BufferDesc *buf, bool fixOwner) +UnpinBuffer(BufferDesc *buf, bool fixOwner, bool trashOK) { - int b = BufferDescriptorGetBuffer(buf) - 1; + int b = buf->buf_id; if (fixOwner) ResourceOwnerForgetBuffer(CurrentResourceOwner, BufferDescriptorGetBuffer(buf)); - Assert(buf->refcount > 0); Assert(PrivateRefCount[b] > 0); PrivateRefCount[b]--; if (PrivateRefCount[b] == 0) { - buf->refcount--; + bool trash_buffer = false; + /* I'd better not still hold any locks on the buffer */ - Assert(!LWLockHeldByMe(buf->cntx_lock)); + Assert(!LWLockHeldByMe(buf->content_lock)); Assert(!LWLockHeldByMe(buf->io_in_progress_lock)); - } - if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 && - buf->refcount == 1) - { - /* we just released the last pin other than the waiter's */ - buf->flags &= ~BM_PIN_COUNT_WAITER; - ProcSendSignal(buf->wait_backend_id); - } - else - { - /* do nothing */ + /* NoHoldoff ensures we don't lose control before sending signal */ + LockBufHdr_NoHoldoff(buf); + + /* Decrement the shared reference count */ + Assert(buf->refcount > 0); + buf->refcount--; + + /* Mark the buffer recently used, unless we are in VACUUM */ + if (!strategy_hint_vacuum) + { + if (buf->usage_count < BM_MAX_USAGE_COUNT) + buf->usage_count++; + } + else if (trashOK && + buf->refcount == 0 && + buf->usage_count == 0) + trash_buffer = true; + + if ((buf->flags & BM_PIN_COUNT_WAITER) && + buf->refcount == 1) + { + /* we just released the last pin other than the waiter's */ + BackendId wait_backend_id = buf->wait_backend_id; + + buf->flags &= ~BM_PIN_COUNT_WAITER; + UnlockBufHdr_NoHoldoff(buf); + ProcSendSignal(wait_backend_id); + } + else + UnlockBufHdr_NoHoldoff(buf); + + /* + * If VACUUM is releasing an otherwise-unused buffer, send it to + * the freelist for near-term reuse. We put it at the tail so that + * it won't be used before any invalid buffers that may exist. + */ + if (trash_buffer) + StrategyFreeBuffer(buf, false); } } /* - * BufferSync -- Write out dirty buffers in the pool. + * BufferSync -- Write out all dirty buffers in the pool. * - * This is called at checkpoint time to write out all dirty shared buffers, - * and by the background writer process to write out some of the dirty blocks. - * percent/maxpages should be -1 in the former case, and limit values (>= 0) - * in the latter. - * - * Returns the number of buffers written. + * This is called at checkpoint time to write out all dirty shared buffers. 
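How this fits into a checkpoint, per FlushBufferPool() later in this file: every dirty shared buffer is written first, and only then are the underlying files forced to disk:

    BufferSync();       /* write out all dirty shared buffers */
    smgrsync();         /* then fsync the underlying files */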
*/ -int -BufferSync(int percent, int maxpages) +void +BufferSync(void) { - BufferDesc **dirty_buffers; - BufferTag *buftags; - int num_buffer_dirty; - int i; - - /* If either limit is zero then we are disabled from doing anything... */ - if (percent == 0 || maxpages == 0) - return 0; + int buf_id; + int num_to_scan; /* - * Get a list of all currently dirty buffers and how many there are. - * We do not flush buffers that get dirtied after we started. They - * have to wait until the next checkpoint. + * Find out where to start the circular scan. */ - dirty_buffers = (BufferDesc **) palloc(NBuffers * sizeof(BufferDesc *)); - buftags = (BufferTag *) palloc(NBuffers * sizeof(BufferTag)); + buf_id = StrategySyncStart(); - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); - num_buffer_dirty = StrategyDirtyBufferList(dirty_buffers, buftags, - NBuffers); + /* Make sure we can handle the pin inside SyncOneBuffer */ + ResourceOwnerEnlargeBuffers(CurrentResourceOwner); /* - * If called by the background writer, we are usually asked to only - * write out some portion of dirty buffers now, to prevent the IO - * storm at checkpoint time. + * Loop over all buffers. */ - if (percent > 0) + num_to_scan = NBuffers; + while (num_to_scan-- > 0) { - Assert(percent <= 100); - num_buffer_dirty = (num_buffer_dirty * percent + 99) / 100; + (void) SyncOneBuffer(buf_id, false); + if (++buf_id >= NBuffers) + buf_id = 0; } - if (maxpages > 0 && num_buffer_dirty > maxpages) - num_buffer_dirty = maxpages; +} - /* Make sure we can handle the pin inside the loop */ +/* + * BgBufferSync -- Write out some dirty buffers in the pool. + * + * This is called periodically by the background writer process. + */ +void +BgBufferSync(void) +{ + static int buf_id1 = 0; + int buf_id2; + int num_to_scan; + int num_written; + + /* Make sure we can handle the pin inside SyncOneBuffer */ ResourceOwnerEnlargeBuffers(CurrentResourceOwner); /* - * Loop over buffers to be written. Note the BufMgrLock is held at - * loop top, but is released and reacquired within FlushBuffer, so we - * aren't holding it long. + * To minimize work at checkpoint time, we want to try to keep all the + * buffers clean; this motivates a scan that proceeds sequentially through + * all buffers. But we are also charged with ensuring that buffers that + * will be recycled soon are clean when needed; these buffers are the + * ones just ahead of the StrategySyncStart point. We make a separate + * scan through those. */ - for (i = 0; i < num_buffer_dirty; i++) - { - BufferDesc *bufHdr = dirty_buffers[i]; - /* - * Check it is still the same page and still needs writing. - * - * We can check bufHdr->cntxDirty here *without* holding any lock on - * buffer context as long as we set this flag in access methods - * *before* logging changes with XLogInsert(): if someone will set - * cntxDirty just after our check we don't worry because of our - * checkpoint.redo points before log record for upcoming changes - * and so we are not required to write such dirty buffer. - */ - if (!(bufHdr->flags & BM_VALID)) - continue; - if (!BUFFERTAGS_EQUAL(bufHdr->tag, buftags[i])) - continue; - if (!(bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)) - continue; + /* + * This loop runs over all buffers, including pinned ones. The + * starting point advances through the buffer pool on successive calls. + */ + if (bgwriter_all_percent > 0.0 && bgwriter_all_maxpages > 0) + { + num_to_scan = (int) ((NBuffers * bgwriter_all_percent + 99) / 100); + num_written = 0; - /* - * IO synchronization. 
Note that we do it with unpinned buffer to - * avoid conflicts with FlushRelationBuffers. - */ - if (bufHdr->flags & BM_IO_IN_PROGRESS) + while (num_to_scan-- > 0) { - WaitIO(bufHdr); - /* Still need writing? */ - if (!(bufHdr->flags & BM_VALID)) - continue; - if (!BUFFERTAGS_EQUAL(bufHdr->tag, buftags[i])) - continue; - if (!(bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)) - continue; + if (SyncOneBuffer(buf_id1, false)) + num_written++; + if (++buf_id1 >= NBuffers) + buf_id1 = 0; + if (num_written >= bgwriter_all_maxpages) + break; } - - /* - * Here: no one doing IO for this buffer and it's dirty. Pin - * buffer now and set IO state for it *before* acquiring shlock to - * avoid conflicts with FlushRelationBuffers. - */ - PinBuffer(bufHdr, true); - StartBufferIO(bufHdr, false); - - FlushBuffer(bufHdr, NULL, false); - - TerminateBufferIO(bufHdr, 0); - UnpinBuffer(bufHdr, true); } - LWLockRelease(BufMgrLock); + /* + * This loop considers only unpinned buffers close to the clock sweep + * point. + */ + if (bgwriter_lru_percent > 0.0 && bgwriter_lru_maxpages > 0) + { + num_to_scan = (int) ((NBuffers * bgwriter_lru_percent + 99) / 100); + num_written = 0; - pfree(dirty_buffers); - pfree(buftags); + buf_id2 = StrategySyncStart(); - return num_buffer_dirty; + while (num_to_scan-- > 0) + { + if (SyncOneBuffer(buf_id2, true)) + num_written++; + if (++buf_id2 >= NBuffers) + buf_id2 = 0; + if (num_written >= bgwriter_lru_maxpages) + break; + } + } } /* - * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared. + * SyncOneBuffer -- process a single buffer during syncing. + * + * If skip_pinned is true, we don't write currently-pinned buffers, nor + * buffers marked recently used, as these are not replacement candidates. * - * Should be entered with buffer manager lock held; releases it before - * waiting and re-acquires it afterwards. + * Returns true if buffer was written, else false. (This could be in error + * if FlushBuffers finds the buffer clean after locking it, but we don't + * care all that much.) + * + * Note: caller must have done ResourceOwnerEnlargeBuffers. */ -static void -WaitIO(BufferDesc *buf) +static bool +SyncOneBuffer(int buf_id, bool skip_pinned) { + BufferDesc *bufHdr = &BufferDescriptors[buf_id]; + /* - * Changed to wait until there's no IO - Inoue 01/13/2000 + * Check whether buffer needs writing. * - * Note this is *necessary* because an error abort in the process doing - * I/O could release the io_in_progress_lock prematurely. See - * AbortBufferIO. + * We can make this check without taking the buffer content lock + * so long as we mark pages dirty in access methods *before* logging + * changes with XLogInsert(): if someone marks the buffer dirty + * just after our check we don't worry because our checkpoint.redo + * points before log record for upcoming changes and so we are not + * required to write such dirty buffer. */ - while ((buf->flags & BM_IO_IN_PROGRESS) != 0) + LockBufHdr(bufHdr); + if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY)) { - LWLockRelease(BufMgrLock); - LWLockAcquire(buf->io_in_progress_lock, LW_SHARED); - LWLockRelease(buf->io_in_progress_lock); - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); + UnlockBufHdr(bufHdr); + return false; } + if (skip_pinned && + (bufHdr->refcount != 0 || bufHdr->usage_count != 0)) + { + UnlockBufHdr(bufHdr); + return false; + } + + /* + * Pin it, share-lock it, write it. (FlushBuffer will do nothing + * if the buffer is clean by the time we've locked it.) 
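For a feel of the scan sizes computed in BgBufferSync() above, the rounding can be checked standalone (the NBuffers value here is only an example):

    #include <stdio.h>

    int
    main(void)
    {
        int     NBuffers = 1000;        /* example pool size */
        double  all_percent = 0.333;    /* default bgwriter_all_percent */
        double  lru_percent = 1.0;      /* default bgwriter_lru_percent */

        /* same expression as BgBufferSync: percent of pool, rounded up */
        printf("all-scan: %d buffers\n",
               (int) ((NBuffers * all_percent + 99) / 100));    /* 4 */
        printf("lru-scan: %d buffers\n",
               (int) ((NBuffers * lru_percent + 99) / 100));    /* 10 */
        return 0;
    }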
+ */ + PinBuffer_Locked(bufHdr); + LWLockAcquire(bufHdr->content_lock, LW_SHARED); + + FlushBuffer(bufHdr, NULL); + + LWLockRelease(bufHdr->content_lock); + UnpinBuffer(bufHdr, true, false /* don't change freelist */ ); + + return true; } @@ -888,6 +1064,9 @@ AtEOXact_Buffers(bool isCommit) AtEOXact_LocalBuffers(isCommit); #endif + + /* Make sure we reset the strategy hint in case VACUUM errored out */ + StrategyHintVacuum(false); } /* @@ -912,9 +1091,7 @@ AtProcExit_Buffers(void) * here, it suggests that ResourceOwners are messed up. */ PrivateRefCount[i] = 1; /* make sure we release shared pin */ - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); - UnpinBuffer(buf, false); - LWLockRelease(BufMgrLock); + UnpinBuffer(buf, false, false /* don't change freelist */ ); Assert(PrivateRefCount[i] == 0); } } @@ -941,6 +1118,7 @@ PrintBufferLeakWarning(Buffer buffer) loccount = PrivateRefCount[buffer - 1]; } + /* theoretically we should lock the bufhdr here */ elog(WARNING, "buffer refcount leak: [%03d] " "(rel=%u/%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d)", @@ -961,7 +1139,7 @@ PrintBufferLeakWarning(Buffer buffer) void FlushBufferPool(void) { - BufferSync(-1, -1); + BufferSync(); smgrsync(); } @@ -988,12 +1166,17 @@ BufmgrCommit(void) BlockNumber BufferGetBlockNumber(Buffer buffer) { + BufferDesc *bufHdr; + Assert(BufferIsPinned(buffer)); if (BufferIsLocal(buffer)) - return LocalBufferDescriptors[-buffer - 1].tag.blockNum; + bufHdr = &(LocalBufferDescriptors[-buffer - 1]); else - return BufferDescriptors[buffer - 1].tag.blockNum; + bufHdr = &BufferDescriptors[buffer - 1]; + + /* pinned, so OK to read tag without spinlock */ + return bufHdr->tag.blockNum; } /* @@ -1013,7 +1196,7 @@ BufferGetFileNode(Buffer buffer) else bufHdr = &BufferDescriptors[buffer - 1]; - return (bufHdr->tag.rnode); + return bufHdr->tag.rnode; } /* @@ -1026,41 +1209,28 @@ BufferGetFileNode(Buffer buffer) * However, we will need to force the changes to disk via fsync before * we can checkpoint WAL. * - * BufMgrLock must be held at entry, and the buffer must be pinned. The - * caller is also responsible for doing StartBufferIO/TerminateBufferIO. + * The caller must hold a pin on the buffer and have share-locked the + * buffer contents. (Note: a share-lock does not prevent updates of + * hint bits in the buffer, so the page could change while the write + * is in progress, but we assume that that will not invalidate the data + * written.) * * If the caller has an smgr reference for the buffer's relation, pass it - * as the second parameter. If not, pass NULL. (Do not open relation - * while holding BufMgrLock!) - * - * When earlylock is TRUE, we grab the per-buffer sharelock before releasing - * BufMgrLock, rather than after. Normally this would be a bad idea since - * we might deadlock, but it is safe and necessary when called from - * BufferAlloc() --- see comments therein. + * as the second parameter. If not, pass NULL. */ static void -FlushBuffer(BufferDesc *buf, SMgrRelation reln, bool earlylock) +FlushBuffer(BufferDesc *buf, SMgrRelation reln) { - Buffer buffer = BufferDescriptorGetBuffer(buf); XLogRecPtr recptr; ErrorContextCallback errcontext; - /* Transpose cntxDirty into flags while holding BufMgrLock */ - buf->cntxDirty = false; - buf->flags |= BM_DIRTY; - - /* To check if block content changed while flushing. - vadim 01/17/97 */ - buf->flags &= ~BM_JUST_DIRTIED; - /* - * If earlylock, grab buffer sharelock before anyone else could re-lock - * the buffer. + * Acquire the buffer's io_in_progress lock. 
If StartBufferIO returns + * false, then someone else flushed the buffer before we could, so + * we need not do anything. */ - if (earlylock) - LockBuffer(buffer, BUFFER_LOCK_SHARE); - - /* Release BufMgrLock while doing xlog work */ - LWLockRelease(BufMgrLock); + if (!StartBufferIO(buf, false)) + return; /* Setup error traceback support for ereport() */ errcontext.callback = buffer_write_error_callback; @@ -1068,20 +1238,12 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, bool earlylock) errcontext.previous = error_context_stack; error_context_stack = &errcontext; - /* Find smgr relation for buffer while holding minimal locks */ + /* Find smgr relation for buffer */ if (reln == NULL) reln = smgropen(buf->tag.rnode); /* - * Protect buffer content against concurrent update. (Note that - * hint-bit updates can still occur while the write is in progress, - * but we assume that that will not invalidate the data written.) - */ - if (!earlylock) - LockBuffer(buffer, BUFFER_LOCK_SHARE); - - /* - * Force XLOG flush for buffer' LSN. This implements the basic WAL + * Force XLOG flush up to buffer's LSN. This implements the basic WAL * rule that log updates must hit disk before any of the data-file * changes they describe do. */ @@ -1090,35 +1252,30 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, bool earlylock) /* * Now it's safe to write buffer to disk. Note that no one else should - * have been able to write it while we were busy with locking and log - * flushing because caller has set the IO flag. - * - * It would be better to clear BM_JUST_DIRTIED right here, but we'd have - * to reacquire the BufMgrLock and it doesn't seem worth it. + * have been able to write it while we were busy with log flushing + * because we have the io_in_progress lock. */ + + /* To check if block content changes while flushing. - vadim 01/17/97 */ + LockBufHdr_NoHoldoff(buf); + buf->flags &= ~BM_JUST_DIRTIED; + UnlockBufHdr_NoHoldoff(buf); + smgrwrite(reln, buf->tag.blockNum, - (char *) MAKE_PTR(buf->data), + (char *) BufHdrGetBlock(buf), false); - /* Pop the error context stack */ - error_context_stack = errcontext.previous; - - /* - * Release the per-buffer readlock, reacquire BufMgrLock. - */ - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); - BufferFlushCount++; /* - * If this buffer was marked by someone as DIRTY while we were - * flushing it out we must not clear DIRTY flag - vadim 01/17/97 + * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) + * and end the io_in_progress state. 
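Condensed, the ordering that matters in FlushBuffer() is as follows (a restatement of the code above, assuming the caller holds a pin and the content share-lock):

    XLogRecPtr  recptr;

    if (!StartBufferIO(buf, false))
        return;                         /* someone else already flushed it */
    recptr = BufferGetLSN(buf);
    XLogFlush(recptr);                  /* WAL rule: log hits disk first */
    LockBufHdr_NoHoldoff(buf);
    buf->flags &= ~BM_JUST_DIRTIED;     /* detect re-dirtying during write */
    UnlockBufHdr_NoHoldoff(buf);
    smgrwrite(reln, buf->tag.blockNum,
              (char *) BufHdrGetBlock(buf), false);
    TerminateBufferIO(buf, true, 0);    /* clear BM_DIRTY unless re-dirtied */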
*/ - if (!(buf->flags & BM_JUST_DIRTIED)) - buf->flags &= ~BM_DIRTY; + TerminateBufferIO(buf, true, 0); + + /* Pop the error context stack */ + error_context_stack = errcontext.previous; } /* @@ -1210,62 +1367,24 @@ DropRelFileNodeBuffers(RelFileNode rnode, bool istemp, bufHdr->tag.rnode.dbNode, bufHdr->tag.rnode.relNode, LocalRefCount[i]); - bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); - bufHdr->cntxDirty = false; - bufHdr->tag.rnode.relNode = InvalidOid; + CLEAR_BUFFERTAG(bufHdr->tag); + bufHdr->flags = 0; + bufHdr->usage_count = 0; } } return; } - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); - - for (i = 1; i <= NBuffers; i++) + for (i = 0; i < NBuffers; i++) { - bufHdr = &BufferDescriptors[i - 1]; -recheck: + bufHdr = &BufferDescriptors[i]; + LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) && bufHdr->tag.blockNum >= firstDelBlock) - { - /* - * If there is I/O in progress, better wait till it's done; - * don't want to delete the relation out from under someone - * who's just trying to flush the buffer! - */ - if (bufHdr->flags & BM_IO_IN_PROGRESS) - { - WaitIO(bufHdr); - - /* - * By now, the buffer very possibly belongs to some other - * rel, so check again before proceeding. - */ - goto recheck; - } - - /* - * There should be no pin on the buffer. - */ - if (bufHdr->refcount != 0) - elog(ERROR, "block %u of %u/%u/%u is still referenced (private %d, global %u)", - bufHdr->tag.blockNum, - bufHdr->tag.rnode.spcNode, - bufHdr->tag.rnode.dbNode, - bufHdr->tag.rnode.relNode, - PrivateRefCount[i - 1], bufHdr->refcount); - - /* Now we can do what we came for */ - bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); - bufHdr->cntxDirty = false; - - /* - * And mark the buffer as no longer occupied by this rel. - */ - StrategyInvalidateBuffer(bufHdr); - } + InvalidateBuffer(bufHdr); /* releases spinlock */ + else + UnlockBufHdr(bufHdr); } - - LWLockRelease(BufMgrLock); } /* --------------------------------------------------------------------- @@ -1285,47 +1404,20 @@ DropBuffers(Oid dbid) int i; BufferDesc *bufHdr; - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); + /* + * We needn't consider local buffers, since by assumption the target + * database isn't our own. + */ - for (i = 1; i <= NBuffers; i++) + for (i = 0; i < NBuffers; i++) { - bufHdr = &BufferDescriptors[i - 1]; -recheck: + bufHdr = &BufferDescriptors[i]; + LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid) - { - /* - * If there is I/O in progress, better wait till it's done; - * don't want to delete the database out from under someone - * who's just trying to flush the buffer! - */ - if (bufHdr->flags & BM_IO_IN_PROGRESS) - { - WaitIO(bufHdr); - - /* - * By now, the buffer very possibly belongs to some other - * DB, so check again before proceeding. - */ - goto recheck; - } - /* Now we can do what we came for */ - bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); - bufHdr->cntxDirty = false; - - /* - * The thing should be free, if caller has checked that no - * backends are running in that database. - */ - Assert(bufHdr->refcount == 0); - - /* - * And mark the buffer as no longer occupied by this page. 
- */ - StrategyInvalidateBuffer(bufHdr); - } + InvalidateBuffer(bufHdr); /* releases spinlock */ + else + UnlockBufHdr(bufHdr); } - - LWLockRelease(BufMgrLock); } /* ----------------------------------------------------------------- @@ -1342,32 +1434,17 @@ PrintBufferDescs(void) int i; BufferDesc *buf = BufferDescriptors; - if (IsUnderPostmaster) - { - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); - for (i = 0; i < NBuffers; ++i, ++buf) - { - elog(LOG, - "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u/%u, " - "blockNum=%u, flags=0x%x, refcount=%u %d)", - i, buf->freeNext, buf->freePrev, - buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, - buf->tag.rnode.relNode, - buf->tag.blockNum, buf->flags, - buf->refcount, PrivateRefCount[i]); - } - LWLockRelease(BufMgrLock); - } - else + for (i = 0; i < NBuffers; ++i, ++buf) { - /* interactive backend */ - for (i = 0; i < NBuffers; ++i, ++buf) - { - printf("[%-2d] (%u/%u/%u, %u) flags=0x%x, refcount=%u %d)\n", - i, buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, - buf->tag.rnode.relNode, buf->tag.blockNum, - buf->flags, buf->refcount, PrivateRefCount[i]); - } + /* theoretically we should lock the bufhdr here */ + elog(LOG, + "[%02d] (freeNext=%d, rel=%u/%u/%u, " + "blockNum=%u, flags=0x%x, refcount=%u %d)", + i, buf->freeNext, + buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, + buf->tag.rnode.relNode, + buf->tag.blockNum, buf->flags, + buf->refcount, PrivateRefCount[i]); } } #endif @@ -1379,20 +1456,21 @@ PrintPinnedBufs(void) int i; BufferDesc *buf = BufferDescriptors; - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); for (i = 0; i < NBuffers; ++i, ++buf) { if (PrivateRefCount[i] > 0) - elog(NOTICE, - "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u/%u, " + { + /* theoretically we should lock the bufhdr here */ + elog(LOG, + "[%02d] (freeNext=%d, rel=%u/%u/%u, " "blockNum=%u, flags=0x%x, refcount=%u %d)", - i, buf->freeNext, buf->freePrev, + i, buf->freeNext, buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, buf->tag.rnode.relNode, buf->tag.blockNum, buf->flags, buf->refcount, PrivateRefCount[i]); + } } - LWLockRelease(BufMgrLock); } #endif @@ -1451,8 +1529,7 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock) bufHdr = &LocalBufferDescriptors[i]; if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node)) { - if ((bufHdr->flags & BM_VALID) && - (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)) + if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { ErrorContextCallback errcontext; @@ -1464,11 +1541,10 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock) smgrwrite(rel->rd_smgr, bufHdr->tag.blockNum, - (char *) MAKE_PTR(bufHdr->data), + (char *) LocalBufHdrGetBlock(bufHdr), true); bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); - bufHdr->cntxDirty = false; /* Pop the error context stack */ error_context_stack = errcontext.previous; @@ -1478,7 +1554,11 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock) RelationGetRelationName(rel), firstDelBlock, bufHdr->tag.blockNum, LocalRefCount[i]); if (bufHdr->tag.blockNum >= firstDelBlock) - bufHdr->tag.rnode.relNode = InvalidOid; + { + CLEAR_BUFFERTAG(bufHdr->tag); + bufHdr->flags = 0; + bufHdr->usage_count = 0; + } } } @@ -1488,46 +1568,40 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock) /* Make sure we can handle the pin inside the loop */ ResourceOwnerEnlargeBuffers(CurrentResourceOwner); - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); - for (i = 0; i < NBuffers; i++) { bufHdr = &BufferDescriptors[i]; + recheck: + LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, 
rel->rd_node)) { - if ((bufHdr->flags & BM_VALID) && - (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)) + if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { - PinBuffer(bufHdr, true); - /* Someone else might be flushing buffer */ - if (bufHdr->flags & BM_IO_IN_PROGRESS) - WaitIO(bufHdr); - /* Still dirty? */ - if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty) - { - StartBufferIO(bufHdr, false); - - FlushBuffer(bufHdr, rel->rd_smgr, false); - - TerminateBufferIO(bufHdr, 0); - } - UnpinBuffer(bufHdr, true); - if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty) - elog(ERROR, "FlushRelationBuffers(\"%s\", %u): block %u was re-dirtied", - RelationGetRelationName(rel), firstDelBlock, - bufHdr->tag.blockNum); + PinBuffer_Locked(bufHdr); + LWLockAcquire(bufHdr->content_lock, LW_SHARED); + FlushBuffer(bufHdr, rel->rd_smgr); + LWLockRelease(bufHdr->content_lock); + UnpinBuffer(bufHdr, true, false /* no freelist change */ ); + /* + * As soon as we unpin, it's possible for someone to take + * the buffer away from us; so loop back to re-lock and + * re-check if it still belongs to the target relation. + */ + goto recheck; } - if (bufHdr->refcount != 0) - elog(ERROR, "FlushRelationBuffers(\"%s\", %u): block %u is referenced (private %d, global %u)", - RelationGetRelationName(rel), firstDelBlock, - bufHdr->tag.blockNum, - PrivateRefCount[i], bufHdr->refcount); + /* + * Even though it's not dirty, it could still be pinned because + * TerminateIO and UnpinBuffer are separate actions. Hence, + * we can't error out on nonzero reference count here. + */ if (bufHdr->tag.blockNum >= firstDelBlock) - StrategyInvalidateBuffer(bufHdr); + InvalidateBuffer(bufHdr); /* releases spinlock */ + else + UnlockBufHdr(bufHdr); } + else + UnlockBufHdr(bufHdr); } - - LWLockRelease(BufMgrLock); } /* @@ -1547,7 +1621,11 @@ ReleaseBuffer(Buffer buffer) if (BufferIsLocal(buffer)) { Assert(LocalRefCount[-buffer - 1] > 0); + bufHdr = &LocalBufferDescriptors[-buffer - 1]; LocalRefCount[-buffer - 1]--; + if (LocalRefCount[-buffer - 1] == 0 && + bufHdr->usage_count < BM_MAX_USAGE_COUNT) + bufHdr->usage_count++; return; } @@ -1558,11 +1636,7 @@ ReleaseBuffer(Buffer buffer) if (PrivateRefCount[buffer - 1] > 1) PrivateRefCount[buffer - 1]--; else - { - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); - UnpinBuffer(bufHdr, false); - LWLockRelease(BufMgrLock); - } + UnpinBuffer(bufHdr, false, true); } /* @@ -1585,88 +1659,6 @@ IncrBufferRefCount(Buffer buffer) PrivateRefCount[buffer - 1]++; } -#ifdef NOT_USED -void -IncrBufferRefCount_Debug(char *file, int line, Buffer buffer) -{ - IncrBufferRefCount(buffer); - if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer)) - { - BufferDesc *buf = &BufferDescriptors[buffer - 1]; - - fprintf(stderr, - "PIN(Incr) %d rel = %u/%u/%u, blockNum = %u, " - "refcount = %d, file: %s, line: %d\n", - buffer, - buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, - buf->tag.rnode.relNode, buf->tag.blockNum, - PrivateRefCount[buffer - 1], file, line); - } -} -#endif - -#ifdef NOT_USED -void -ReleaseBuffer_Debug(char *file, int line, Buffer buffer) -{ - ReleaseBuffer(buffer); - if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer)) - { - BufferDesc *buf = &BufferDescriptors[buffer - 1]; - - fprintf(stderr, - "UNPIN(Rel) %d rel = %u/%u/%u, blockNum = %u, " - "refcount = %d, file: %s, line: %d\n", - buffer, - buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, - buf->tag.rnode.relNode, buf->tag.blockNum, - PrivateRefCount[buffer - 1], file, line); - } -} -#endif - -#ifdef NOT_USED -Buffer 
-ReleaseAndReadBuffer_Debug(char *file, - int line, - Buffer buffer, - Relation relation, - BlockNumber blockNum) -{ - bool bufferValid; - Buffer b; - - bufferValid = BufferIsValid(buffer); - b = ReleaseAndReadBuffer(buffer, relation, blockNum); - if (ShowPinTrace && bufferValid && BufferIsLocal(buffer) - && is_userbuffer(buffer)) - { - BufferDesc *buf = &BufferDescriptors[buffer - 1]; - - fprintf(stderr, - "UNPIN(Rel&Rd) %d rel = %u/%u/%u, blockNum = %u, " - "refcount = %d, file: %s, line: %d\n", - buffer, - buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, - buf->tag.rnode.relNode, buf->tag.blockNum, - PrivateRefCount[buffer - 1], file, line); - } - if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer)) - { - BufferDesc *buf = &BufferDescriptors[b - 1]; - - fprintf(stderr, - "PIN(Rel&Rd) %d rel = %u/%u/%u, blockNum = %u, " - "refcount = %d, file: %s, line: %d\n", - b, - buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, - buf->tag.rnode.relNode, buf->tag.blockNum, - PrivateRefCount[b - 1], file, line); - } - return b; -} -#endif - /* * SetBufferCommitInfoNeedsSave * @@ -1682,7 +1674,7 @@ ReleaseAndReadBuffer_Debug(char *file, * This routine might get called many times on the same page, if we are making * the first scan after commit of an xact that added/deleted many tuples. * So, be as quick as we can if the buffer is already dirty. We do this by - * not acquiring BufMgrLock if it looks like the status bits are already OK. + * not acquiring spinlock if it looks like the status bits are already OK. * (Note it is okay if someone else clears BM_JUST_DIRTIED immediately after * we look, because the buffer content update is already done and will be * reflected in the I/O.) @@ -1703,23 +1695,25 @@ SetBufferCommitInfoNeedsSave(Buffer buffer) bufHdr = &BufferDescriptors[buffer - 1]; + Assert(PrivateRefCount[buffer - 1] > 0); + if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) != (BM_DIRTY | BM_JUST_DIRTIED)) { - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); + LockBufHdr(bufHdr); Assert(bufHdr->refcount > 0); bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); - LWLockRelease(BufMgrLock); + UnlockBufHdr(bufHdr); } } /* - * Release buffer context locks for shared buffers. + * Release buffer content locks for shared buffers. * * Used to clean up after errors. * * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care - * of releasing buffer context locks per se; the only thing we need to deal + * of releasing buffer content locks per se; the only thing we need to deal * with here is clearing any PIN_COUNT request that was in progress. */ void @@ -1731,7 +1725,7 @@ UnlockBuffers(void) { HOLD_INTERRUPTS(); /* don't want to die() partway through... */ - LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); + LockBufHdr_NoHoldoff(buf); /* * Don't complain if flag bit not set; it could have been @@ -1741,18 +1735,19 @@ UnlockBuffers(void) if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 && buf->wait_backend_id == MyBackendId) buf->flags &= ~BM_PIN_COUNT_WAITER; - LWLockRelease(BufMgrLock); + + UnlockBufHdr_NoHoldoff(buf); ProcCancelWaitForSignal(); + PinCountWaitBuf = NULL; + RESUME_INTERRUPTS(); } - - PinCountWaitBuf = NULL; } /* - * Acquire or release the cntx_lock for the buffer. + * Acquire or release the content_lock for the buffer. 
+ * Acquire or release the content_lock for the buffer.

 /*
- * Acquire or release the cntx_lock for the buffer.
+ * Acquire or release the content_lock for the buffer.
 */
 void
 LockBuffer(Buffer buffer, int mode)
@@ -1766,27 +1761,29 @@ LockBuffer(Buffer buffer, int mode)
 	buf = &(BufferDescriptors[buffer - 1]);
 
 	if (mode == BUFFER_LOCK_UNLOCK)
-		LWLockRelease(buf->cntx_lock);
+		LWLockRelease(buf->content_lock);
 	else if (mode == BUFFER_LOCK_SHARE)
-		LWLockAcquire(buf->cntx_lock, LW_SHARED);
+		LWLockAcquire(buf->content_lock, LW_SHARED);
 	else if (mode == BUFFER_LOCK_EXCLUSIVE)
 	{
-		LWLockAcquire(buf->cntx_lock, LW_EXCLUSIVE);
+		LWLockAcquire(buf->content_lock, LW_EXCLUSIVE);
 
 		/*
-		 * This is not the best place to set cntxDirty flag (eg indices do
+		 * This is not the best place to mark the buffer dirty (e.g., indexes do
 		 * not always change the buffer they lock in exclusive mode). But please
-		 * remember that it's critical to set cntxDirty *before* logging
-		 * changes with XLogInsert() - see comments in BufferSync().
+		 * remember that it's critical to set the dirty bit *before* logging
+		 * changes with XLogInsert() - see comments in SyncOneBuffer().
 		 */
-		buf->cntxDirty = true;
+		LockBufHdr_NoHoldoff(buf);
+		buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+		UnlockBufHdr_NoHoldoff(buf);
 	}
 	else
 		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
 }
 
 /*
- * Acquire the cntx_lock for the buffer, but only if we don't have to wait.
+ * Acquire the content_lock for the buffer, but only if we don't have to wait.
 *
 * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
 */
@@ -1801,15 +1798,17 @@ ConditionalLockBuffer(Buffer buffer)
 	buf = &(BufferDescriptors[buffer - 1]);
 
-	if (LWLockConditionalAcquire(buf->cntx_lock, LW_EXCLUSIVE))
+	if (LWLockConditionalAcquire(buf->content_lock, LW_EXCLUSIVE))
 	{
 		/*
-		 * This is not the best place to set cntxDirty flag (eg indices do
+		 * This is not the best place to mark the buffer dirty (e.g., indexes do
 		 * not always change the buffer they lock in exclusive mode). But please
-		 * remember that it's critical to set cntxDirty *before* logging
-		 * changes with XLogInsert() - see comments in BufferSync().
+		 * remember that it's critical to set the dirty bit *before* logging
+		 * changes with XLogInsert() - see comments in SyncOneBuffer().
 		 */
-		buf->cntxDirty = true;
+		LockBufHdr_NoHoldoff(buf);
+		buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+		UnlockBufHdr_NoHoldoff(buf);
 
 		return true;
 	}
 
@@ -1861,25 +1860,25 @@ LockBufferForCleanup(Buffer buffer)
 	{
 		/* Try to acquire lock */
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
+		LockBufHdr_NoHoldoff(bufHdr);
 		Assert(bufHdr->refcount > 0);
 		if (bufHdr->refcount == 1)
 		{
 			/* Successfully acquired exclusive lock with pincount 1 */
-			LWLockRelease(BufMgrLock);
+			UnlockBufHdr_NoHoldoff(bufHdr);
 			return;
 		}
 		/* Failed, so mark myself as waiting for pincount 1 */
 		if (bufHdr->flags & BM_PIN_COUNT_WAITER)
 		{
-			LWLockRelease(BufMgrLock);
+			UnlockBufHdr_NoHoldoff(bufHdr);
 			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 			elog(ERROR, "multiple backends attempting to wait for pincount 1");
 		}
 		bufHdr->wait_backend_id = MyBackendId;
 		bufHdr->flags |= BM_PIN_COUNT_WAITER;
 		PinCountWaitBuf = bufHdr;
-		LWLockRelease(BufMgrLock);
+		UnlockBufHdr_NoHoldoff(bufHdr);
 		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 		/* Wait to be signaled by UnpinBuffer() */
 		ProcWaitForSignal();
@@ -1889,94 +1888,160 @@
 }
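
Since both LockBuffer(..., BUFFER_LOCK_EXCLUSIVE) and a successful ConditionalLockBuffer() now set BM_DIRTY | BM_JUST_DIRTIED before returning, the WAL ordering rule (dirty bit set before XLogInsert() runs) holds automatically for every exclusive locker. A typical caller shape for the conditional variant, assuming a pinned buffer; update_page_if_free is a hypothetical example, not a function from the tree:

	static bool
	update_page_if_free(Buffer buffer)
	{
		/* Back off rather than risk deadlock against locks we already hold */
		if (!ConditionalLockBuffer(buffer))
			return false;

		/* ... modify the page here, then WAL-log it with XLogInsert() ... */

		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		return true;
	}

The price of setting the dirty bit eagerly is a possible spurious page write when an exclusive locker ends up changing nothing, which is the trade-off the "not the best place" comments above acknowledge.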

 /*
- * Functions for IO error handling
+ * Functions for buffer I/O handling
 *
- * Note: We assume that nested buffer IO never occurs.
+ * Note: We assume that nested buffer I/O never occurs.
 * i.e., at most one io_in_progress lock is held per process.
+ *
+ * Also note that these are used only for shared buffers, not local ones.
+ */
+
+/*
+ * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
+ */
+static void
+WaitIO(BufferDesc *buf)
+{
+	/*
+	 * Changed to wait until there's no IO - Inoue 01/13/2000
+	 *
+	 * Note this is *necessary* because an error abort in the process doing
+	 * I/O could release the io_in_progress_lock prematurely.  See
+	 * AbortBufferIO.
+	 */
+	for (;;)
+	{
+		BufFlags	sv_flags;
+
+		/*
+		 * It may not be necessary to acquire the spinlock to check the
+		 * flag here, but since this test is essential for correctness,
+		 * we'd better play it safe.
+		 */
+		LockBufHdr(buf);
+		sv_flags = buf->flags;
+		UnlockBufHdr(buf);
+		if (!(sv_flags & BM_IO_IN_PROGRESS))
+			break;
+		LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
+		LWLockRelease(buf->io_in_progress_lock);
+	}
+}
 
 /*
- * Function:StartBufferIO
+ * StartBufferIO: begin I/O on this buffer
 * (Assumptions)
 *	My process is executing no IO
- *	BufMgrLock is held
- *	BM_IO_IN_PROGRESS mask is not set for the buffer
 *	The buffer is Pinned
 *
- * Because BufMgrLock is held, we are already in an interrupt holdoff here,
- * and do not need another.
+ * In some scenarios there are race conditions in which multiple backends
+ * could attempt the same I/O operation concurrently.  If someone else
+ * has already started I/O on this buffer then we will block on the
+ * io_in_progress lock until it is done.
+ *
+ * Input operations are attempted only on buffers that are not BM_VALID,
+ * and output operations only on buffers that are BM_VALID and BM_DIRTY,
+ * so we can always tell whether the work is already done.
+ *
+ * Returns TRUE if we successfully marked the buffer as I/O busy,
+ * FALSE if someone else already did the work.
 */
-static void
+static bool
 StartBufferIO(BufferDesc *buf, bool forInput)
 {
 	Assert(!InProgressBuf);
-	Assert(!(buf->flags & BM_IO_IN_PROGRESS));
+
+	for (;;)
+	{
+		/*
+		 * Grab the io_in_progress lock so that other processes can wait for
+		 * me to finish the I/O.
+		 */
+		LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
+
+		/* NoHoldoff is OK since we now have an LWLock */
+		LockBufHdr_NoHoldoff(buf);
+
+		if (!(buf->flags & BM_IO_IN_PROGRESS))
+			break;
+
+		/*
+		 * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
+		 * lock isn't held is if the process doing the I/O is recovering from
+		 * an error (see AbortBufferIO).  If that's the case, we must wait for
+		 * it to get unwedged.
+		 */
+		UnlockBufHdr_NoHoldoff(buf);
+		LWLockRelease(buf->io_in_progress_lock);
+		WaitIO(buf);
+	}
+
+	/* Once we get here, there is definitely no I/O active on this buffer */
+
+	if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
+	{
+		/* someone else already did the I/O */
+		UnlockBufHdr_NoHoldoff(buf);
+		LWLockRelease(buf->io_in_progress_lock);
+		return false;
+	}
 
 	buf->flags |= BM_IO_IN_PROGRESS;
 
-	LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
+	UnlockBufHdr_NoHoldoff(buf);
 
 	InProgressBuf = buf;
 	IsForInput = forInput;
+
+	return true;
 }
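
With the TRUE/FALSE protocol, a read path needs no separate "is somebody else reading this?" test: a false return from StartBufferIO() already means another backend completed the input while we waited on the io_in_progress lock. A sketch of the caller side, assuming a pinned shared buffer with bufHdr, reln, and blockNum in scope and the 8.0-era smgrread() signature; illustrative only, the real call site is in ReadBuffer()/BufferAlloc():

	if (StartBufferIO(bufHdr, true))	/* forInput */
	{
		/* we won the race: do the read, then mark the page usable */
		smgrread(reln->rd_smgr, blockNum, (char *) BufHdrGetBlock(bufHdr));
		TerminateBufferIO(bufHdr, false, BM_VALID);
	}
	/* on false: the buffer is already BM_VALID; just use it */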

 /*
- * Function:TerminateBufferIO
+ * TerminateBufferIO: release a buffer we were doing I/O on
 * (Assumptions)
 *	My process is executing IO for the buffer
- *	BufMgrLock is held
- *	BM_IO_IN_PROGRESS mask is set for the buffer
+ *	BM_IO_IN_PROGRESS bit is set for the buffer
+ *	We hold the buffer's io_in_progress lock
 *	The buffer is Pinned
 *
- * err_flag must be 0 for successful completion and BM_IO_ERROR for failure.
+ * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
+ * buffer's BM_DIRTY flag.  This is appropriate when terminating a
+ * successful write.  The check on BM_JUST_DIRTIED is necessary to avoid
+ * marking the buffer clean if it was re-dirtied while we were writing.
 *
- * Because BufMgrLock is held, we are already in an interrupt holdoff here,
- * and do not need another.
+ * set_flag_bits gets ORed into the buffer's flags.  It must include
+ * BM_IO_ERROR in a failure case.  For successful completion it could
+ * be 0, or BM_VALID if we just finished reading in the page.
 */
 static void
-TerminateBufferIO(BufferDesc *buf, int err_flag)
+TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits)
 {
 	Assert(buf == InProgressBuf);
+
+	/* NoHoldoff is OK since we must have an LWLock */
+	LockBufHdr_NoHoldoff(buf);
+
 	Assert(buf->flags & BM_IO_IN_PROGRESS);
 	buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
-	buf->flags |= err_flag;
+	if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
+		buf->flags &= ~BM_DIRTY;
+	buf->flags |= set_flag_bits;
 
-	LWLockRelease(buf->io_in_progress_lock);
+	UnlockBufHdr_NoHoldoff(buf);
 
 	InProgressBuf = NULL;
-}
 
-/*
- * Function:ContinueBufferIO
- * (Assumptions)
- *	My process is executing IO for the buffer
- *	BufMgrLock is held
- *	The buffer is Pinned
- *
- * Because BufMgrLock is held, we are already in an interrupt holdoff here,
- * and do not need another.
- */
-static void
-ContinueBufferIO(BufferDesc *buf, bool forInput)
-{
-	Assert(buf == InProgressBuf);
-	Assert(buf->flags & BM_IO_IN_PROGRESS);
-	IsForInput = forInput;
-}
-
-#ifdef NOT_USED
-void
-InitBufferIO(void)
-{
-	InProgressBuf = NULL;
+	LWLockRelease(buf->io_in_progress_lock);
 }
-#endif
 
 /*
- * Clean up any active buffer I/O after an error.
- * BufMgrLock isn't held when this function is called,
+ * AbortBufferIO: Clean up any active buffer I/O after an error.
+ *
+ * All LWLocks we might have held have been released,
 * but we haven't yet released buffer pins, so the buffer is still pinned.
 *
- * If I/O was in progress, we always set BM_IO_ERROR.
+ * If I/O was in progress, we always set BM_IO_ERROR, even though it's
+ * possible the error condition wasn't related to the I/O.
 */
 void
 AbortBufferIO(void)
@@ -1994,20 +2059,27 @@ AbortBufferIO(void)
 		 */
 		LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
 
-		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
+		/* NoHoldoff is OK since we now have an LWLock */
+		LockBufHdr_NoHoldoff(buf);
 		Assert(buf->flags & BM_IO_IN_PROGRESS);
 		if (IsForInput)
 		{
-			Assert(!(buf->flags & BM_DIRTY || buf->cntxDirty));
+			Assert(!(buf->flags & BM_DIRTY));
 			/* We'd better not think buffer is valid yet */
 			Assert(!(buf->flags & BM_VALID));
+			UnlockBufHdr_NoHoldoff(buf);
 		}
 		else
 		{
-			Assert(buf->flags & BM_DIRTY || buf->cntxDirty);
+			BufFlags	sv_flags;
+
+			sv_flags = buf->flags;
+			Assert(sv_flags & BM_DIRTY);
+			UnlockBufHdr_NoHoldoff(buf);
 			/* Issue notice if this is not the first failure...
 			 */
-			if (buf->flags & BM_IO_ERROR)
+			if (sv_flags & BM_IO_ERROR)
 			{
+				/* Buffer is pinned, so we can read the tag without the spinlock */
 				ereport(WARNING,
 						(errcode(ERRCODE_IO_ERROR),
 						 errmsg("could not write block %u of %u/%u/%u",
@@ -2017,10 +2089,8 @@ AbortBufferIO(void)
 								buf->tag.rnode.relNode),
 						 errdetail("Multiple failures --- write error may be permanent.")));
 			}
-			buf->flags |= BM_DIRTY;
 		}
-		TerminateBufferIO(buf, BM_IO_ERROR);
-		LWLockRelease(BufMgrLock);
+		TerminateBufferIO(buf, false, BM_IO_ERROR);
 	}
 }
 
@@ -2032,6 +2102,7 @@ buffer_write_error_callback(void *arg)
 {
 	BufferDesc *bufHdr = (BufferDesc *) arg;
 
+	/* Buffer is pinned, so we can read the tag without locking the spinlock */
 	if (bufHdr != NULL)
 		errcontext("writing block %u of relation %u/%u/%u",
 				   bufHdr->tag.blockNum,
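
buffer_write_error_callback() adds its context line only if it is linked into the error-context stack around the write. The standard elog.h protocol is push, do the risky work, pop, which is how FlushBuffer() is expected to arrange it. A sketch using the real ErrorContextCallback fields, shown outside its actual surroundings and assuming a pinned buffer 'buf' in scope:

	ErrorContextCallback errcontext_callback;

	/* push: any ereport() below this point gets our context line */
	errcontext_callback.callback = buffer_write_error_callback;
	errcontext_callback.arg = (void *) buf;
	errcontext_callback.previous = error_context_stack;
	error_context_stack = &errcontext_callback;

	/* ... XLogFlush() and smgrwrite() the page image here ... */

	/* pop: restore the previous context chain before returning */
	error_context_stack = errcontext_callback.previous;

Because the callback runs at error-reporting time, it must not assume any locks are held; reading the buffer tag is safe only because the buffer is still pinned, as the comment above notes.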