diff options
author | Andres Freund <andres@anarazel.de> | 2016-04-10 20:12:32 -0700 |
---|---|---|
committer | Andres Freund <andres@anarazel.de> | 2016-04-10 20:12:32 -0700 |
commit | 48354581a49c30f5757c203415aa8412d85b0f70 (patch) | |
tree | ca509a2c196f179e97993ac89979c361c4b5f431 /src/backend/storage/buffer/bufmgr.c | |
parent | cf223c3bf5ba16232147c66b5fef4037aafe747c (diff) | |
download | postgresql-48354581a49c30f5757c203415aa8412d85b0f70.tar.gz postgresql-48354581a49c30f5757c203415aa8412d85b0f70.zip |
Allow Pin/UnpinBuffer to operate in a lockfree manner.
Pinning/Unpinning a buffer is a very frequent operation; especially in
read-mostly cache resident workloads. Benchmarking shows that in various
scenarios the spinlock protecting a buffer header's state becomes a
significant bottleneck. The problem can be reproduced with pgbench -S on
larger machines, but can be considerably worse for queries which touch
the same buffers over and over at a high frequency (e.g. nested loops
over a small inner table).
To allow atomic operations to be used, cram BufferDesc's flags,
usage_count, buf_hdr_lock, refcount into a single 32bit atomic variable;
that allows to manipulate them together using 32bit compare-and-swap
operations. This requires reducing MAX_BACKENDS to 2^18-1 (which could
be lifted by using a 64bit field, but it's not a realistic configuration
atm).
As not all operations can easily implemented in a lockfree manner,
implement the previous buf_hdr_lock via a flag bit in the atomic
variable. That way we can continue to lock the header in places where
it's needed, but can get away without acquiring it in the more frequent
hot-paths. There's some additional operations which can be done without
the lock, but aren't in this patch; but the most important places are
covered.
As bufmgr.c now essentially re-implements spinlocks, abstract the delay
logic from s_lock.c into something more generic. It now has already two
users, and more are coming up; there's a follupw patch for lwlock.c at
least.
This patch is based on a proof-of-concept written by me, which Alexander
Korotkov made into a fully working patch; the committed version is again
revised by me. Benchmarking and testing has, amongst others, been
provided by Dilip Kumar, Alexander Korotkov, Robert Haas.
On a large x86 system improvements for readonly pgbench, with a high
client count, of a factor of 8 have been observed.
Author: Alexander Korotkov and Andres Freund
Discussion: 2400449.GjM57CE0Yg@dinodell
Diffstat (limited to 'src/backend/storage/buffer/bufmgr.c')
-rw-r--r-- | src/backend/storage/buffer/bufmgr.c | 508 |
1 files changed, 334 insertions, 174 deletions
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index c664984d0a1..29f10e59568 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -436,11 +436,12 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy); static void PinBuffer_Locked(BufferDesc *buf); static void UnpinBuffer(BufferDesc *buf, bool fixOwner); static void BufferSync(int flags); +static uint32 WaitBufHdrUnlocked(BufferDesc *buf); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *flush_context); static void WaitIO(BufferDesc *buf); static bool StartBufferIO(BufferDesc *buf, bool forInput); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, - int set_flag_bits); + uint32 set_flag_bits); static void shared_buffer_write_error_callback(void *arg); static void local_buffer_write_error_callback(void *arg); static BufferDesc *BufferAlloc(SMgrRelation smgr, @@ -816,8 +817,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (isLocalBuf) { /* Only need to adjust flags */ - Assert(bufHdr->flags & BM_VALID); - bufHdr->flags &= ~BM_VALID; + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + Assert(buf_state & BM_VALID); + buf_state &= ~BM_VALID; + pg_atomic_write_u32(&bufHdr->state, buf_state); } else { @@ -828,10 +832,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, */ do { - LockBufHdr(bufHdr); - Assert(bufHdr->flags & BM_VALID); - bufHdr->flags &= ~BM_VALID; - UnlockBufHdr(bufHdr); + uint32 buf_state = LockBufHdr(bufHdr); + + Assert(buf_state & BM_VALID); + buf_state &= ~BM_VALID; + UnlockBufHdr(bufHdr, buf_state); } while (!StartBufferIO(bufHdr, true)); } } @@ -848,7 +853,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * it's not been recycled) but come right back here to try smgrextend * again. */ - Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */ + Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); @@ -933,7 +938,10 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (isLocalBuf) { /* Only need to adjust flags */ - bufHdr->flags |= BM_VALID; + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + buf_state |= BM_VALID; + pg_atomic_write_u32(&bufHdr->state, buf_state); } else { @@ -987,10 +995,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, BufferTag oldTag; /* previous identity of selected buffer */ uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ - BufFlags oldFlags; + uint32 oldFlags; int buf_id; BufferDesc *buf; bool valid; + uint32 buf_state; /* create a tag so we can lookup the buffer */ INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); @@ -1059,12 +1068,12 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * Select a victim buffer. The buffer is returned with its header * spinlock still held! */ - buf = StrategyGetBuffer(strategy); + buf = StrategyGetBuffer(strategy, &buf_state); - Assert(buf->refcount == 0); + Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0); /* Must copy buffer flags while we still hold the spinlock */ - oldFlags = buf->flags; + oldFlags = buf_state & BUF_FLAG_MASK; /* Pin the buffer and then release the buffer spinlock */ PinBuffer_Locked(buf); @@ -1108,9 +1117,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, XLogRecPtr lsn; /* Read the LSN while holding buffer header lock */ - LockBufHdr(buf); + buf_state = LockBufHdr(buf); lsn = BufferGetLSN(buf); - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); if (XLogNeedsFlush(lsn) && StrategyRejectBuffer(strategy, buf)) @@ -1254,7 +1263,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, /* * Need to lock the buffer header too in order to change its tag. */ - LockBufHdr(buf); + buf_state = LockBufHdr(buf); /* * Somebody could have pinned or re-dirtied the buffer while we were @@ -1262,11 +1271,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * recycle this buffer; we must undo everything we've done and start * over with a new victim buffer. */ - oldFlags = buf->flags; - if (buf->refcount == 1 && !(oldFlags & BM_DIRTY)) + oldFlags = buf_state & BUF_FLAG_MASK; + if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(oldFlags & BM_DIRTY)) break; - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); BufTableDelete(&newTag, newHash); if ((oldFlags & BM_TAG_VALID) && oldPartitionLock != newPartitionLock) @@ -1284,14 +1293,15 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * 1 so that the buffer can survive one clock-sweep pass.) */ buf->tag = newTag; - buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT); + buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | + BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT | + BUF_USAGECOUNT_MASK); if (relpersistence == RELPERSISTENCE_PERMANENT) - buf->flags |= BM_TAG_VALID | BM_PERMANENT; + buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE; else - buf->flags |= BM_TAG_VALID; - buf->usage_count = 1; + buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE; - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); if (oldFlags & BM_TAG_VALID) { @@ -1338,12 +1348,15 @@ InvalidateBuffer(BufferDesc *buf) BufferTag oldTag; uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ - BufFlags oldFlags; + uint32 oldFlags; + uint32 buf_state; /* Save the original buffer tag before dropping the spinlock */ oldTag = buf->tag; - UnlockBufHdr(buf); + buf_state = pg_atomic_read_u32(&buf->state); + Assert(buf_state & BM_LOCKED); + UnlockBufHdr(buf, buf_state); /* * Need to compute the old tag's hashcode and partition lock ID. XXX is it @@ -1362,12 +1375,12 @@ retry: LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE); /* Re-lock the buffer header */ - LockBufHdr(buf); + buf_state = LockBufHdr(buf); /* If it's changed while we were waiting for lock, do nothing */ if (!BUFFERTAGS_EQUAL(buf->tag, oldTag)) { - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(oldPartitionLock); return; } @@ -1381,9 +1394,9 @@ retry: * yet done StartBufferIO, WaitIO will fall through and we'll effectively * be busy-looping here.) */ - if (buf->refcount != 0) + if (BUF_STATE_GET_REFCOUNT(buf_state) != 0) { - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(oldPartitionLock); /* safety check: should definitely not be our *own* pin */ if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0) @@ -1396,12 +1409,10 @@ retry: * Clear out the buffer's tag and flags. We must do this to ensure that * linear scans of the buffer array don't think the buffer is valid. */ - oldFlags = buf->flags; + oldFlags = buf_state & BUF_FLAG_MASK; CLEAR_BUFFERTAG(buf->tag); - buf->flags = 0; - buf->usage_count = 0; - - UnlockBufHdr(buf); + buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK); + UnlockBufHdr(buf, buf_state); /* * Remove the buffer from the lookup hashtable, if it was in there. @@ -1433,6 +1444,8 @@ void MarkBufferDirty(Buffer buffer) { BufferDesc *bufHdr; + uint32 buf_state; + uint32 old_buf_state; if (!BufferIsValid(buffer)) elog(ERROR, "bad buffer ID: %d", buffer); @@ -1449,24 +1462,32 @@ MarkBufferDirty(Buffer buffer) /* unfortunately we can't check if the lock is held exclusively */ Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr))); - LockBufHdr(bufHdr); + old_buf_state = pg_atomic_read_u32(&bufHdr->state); + for (;;) + { + if (old_buf_state & BM_LOCKED) + old_buf_state = WaitBufHdrUnlocked(bufHdr); - Assert(bufHdr->refcount > 0); + buf_state = old_buf_state; + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + buf_state |= BM_DIRTY | BM_JUST_DIRTIED; + + if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state, + buf_state)) + break; + } /* * If the buffer was not dirty already, do vacuum accounting. */ - if (!(bufHdr->flags & BM_DIRTY)) + if (!(old_buf_state & BM_DIRTY)) { VacuumPageDirty++; pgBufferUsage.shared_blks_dirtied++; if (VacuumCostActive) VacuumCostBalance += VacuumCostPageDirty; } - - bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); - - UnlockBufHdr(bufHdr); } /* @@ -1531,6 +1552,10 @@ ReleaseAndReadBuffer(Buffer buffer, * * This should be applied only to shared buffers, never local ones. * + * Since buffers are pinned/unpinned very frequently, pin buffers without + * taking the buffer header lock; instead update the state variable in loop of + * CAS operations. Hopefully it's just a single CAS. + * * Note that ResourceOwnerEnlargeBuffers must have been done already. * * Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows @@ -1547,23 +1572,34 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) if (ref == NULL) { + uint32 buf_state; + uint32 old_buf_state; + ReservePrivateRefCountEntry(); ref = NewPrivateRefCountEntry(b); - LockBufHdr(buf); - buf->refcount++; - if (strategy == NULL) - { - if (buf->usage_count < BM_MAX_USAGE_COUNT) - buf->usage_count++; - } - else + old_buf_state = pg_atomic_read_u32(&buf->state); + for (;;) { - if (buf->usage_count == 0) - buf->usage_count = 1; + if (old_buf_state & BM_LOCKED) + old_buf_state = WaitBufHdrUnlocked(buf); + + buf_state = old_buf_state; + + /* increase refcount */ + buf_state += BUF_REFCOUNT_ONE; + + /* increase usagecount unless already max */ + if (BUF_STATE_GET_USAGECOUNT(buf_state) != BM_MAX_USAGE_COUNT) + buf_state += BUF_USAGECOUNT_ONE; + + if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, + buf_state)) + { + result = (buf_state & BM_VALID) != 0; + break; + } } - result = (buf->flags & BM_VALID) != 0; - UnlockBufHdr(buf); } else { @@ -1603,6 +1639,7 @@ PinBuffer_Locked(BufferDesc *buf) { Buffer b; PrivateRefCountEntry *ref; + uint32 buf_state; /* * As explained, We don't expect any preexisting pins. That allows us to @@ -1610,8 +1647,14 @@ PinBuffer_Locked(BufferDesc *buf) */ Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL); - buf->refcount++; - UnlockBufHdr(buf); + /* + * Since we hold the buffer spinlock, we can update the buffer state and + * release the lock in one operation. + */ + buf_state = pg_atomic_read_u32(&buf->state); + Assert(buf_state & BM_LOCKED); + buf_state += BUF_REFCOUNT_ONE; + UnlockBufHdr(buf, buf_state); b = BufferDescriptorGetBuffer(buf); @@ -1646,30 +1689,59 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner) ref->refcount--; if (ref->refcount == 0) { + uint32 buf_state; + uint32 old_buf_state; + /* I'd better not still hold any locks on the buffer */ Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf))); Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf))); - LockBufHdr(buf); + /* + * Decrement the shared reference count. + * + * Since buffer spinlock holder can update status using just write, + * it's not safe to use atomic decrement here; thus use a CAS loop. + */ + old_buf_state = pg_atomic_read_u32(&buf->state); + for (;;) + { + if (old_buf_state & BM_LOCKED) + old_buf_state = WaitBufHdrUnlocked(buf); + + buf_state = old_buf_state; + + buf_state -= BUF_REFCOUNT_ONE; - /* Decrement the shared reference count */ - Assert(buf->refcount > 0); - buf->refcount--; + if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, + buf_state)) + break; + } /* Support LockBufferForCleanup() */ - if ((buf->flags & BM_PIN_COUNT_WAITER) && - buf->refcount == 1) + if (buf_state & BM_PIN_COUNT_WAITER) { - /* we just released the last pin other than the waiter's */ - int wait_backend_pid = buf->wait_backend_pid; + /* + * Acquire the buffer header lock, re-check that there's a waiter. + * Another backend could have unpinned this buffer, and already + * woken up the waiter. There's no danger of the buffer being + * replaced after we unpinned it above, as it's pinned by the + * waiter. + */ + buf_state = LockBufHdr(buf); - buf->flags &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(buf); - ProcSendSignal(wait_backend_pid); - } - else - UnlockBufHdr(buf); + if ((buf_state & BM_PIN_COUNT_WAITER) && + BUF_STATE_GET_REFCOUNT(buf_state) == 1) + { + /* we just released the last pin other than the waiter's */ + int wait_backend_pid = buf->wait_backend_pid; + buf_state &= ~BM_PIN_COUNT_WAITER; + UnlockBufHdr(buf, buf_state); + ProcSendSignal(wait_backend_pid); + } + else + UnlockBufHdr(buf, buf_state); + } ForgetPrivateRefCountEntry(ref); } } @@ -1687,6 +1759,7 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner) static void BufferSync(int flags) { + uint32 buf_state; int buf_id; int num_to_scan; int num_spaces; @@ -1736,13 +1809,13 @@ BufferSync(int flags) * Header spinlock is enough to examine BM_DIRTY, see comment in * SyncOneBuffer. */ - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); - if ((bufHdr->flags & mask) == mask) + if ((buf_state & mask) == mask) { CkptSortItem *item; - bufHdr->flags |= BM_CHECKPOINT_NEEDED; + buf_state |= BM_CHECKPOINT_NEEDED; item = &CkptBufferIds[num_to_scan++]; item->buf_id = buf_id; @@ -1752,7 +1825,7 @@ BufferSync(int flags) item->blockNum = bufHdr->tag.blockNum; } - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } if (num_to_scan == 0) @@ -1888,7 +1961,7 @@ BufferSync(int flags) * write the buffer though we didn't need to. It doesn't seem worth * guarding against this, though. */ - if (bufHdr->flags & BM_CHECKPOINT_NEEDED) + if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) { if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) { @@ -2176,8 +2249,8 @@ BgBufferSync(WritebackContext *wb_context) /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { - int buffer_state = SyncOneBuffer(next_to_clean, true, - wb_context); + int sync_state = SyncOneBuffer(next_to_clean, true, + wb_context); if (++next_to_clean >= NBuffers) { @@ -2186,7 +2259,7 @@ BgBufferSync(WritebackContext *wb_context) } num_to_scan--; - if (buffer_state & BUF_WRITTEN) + if (sync_state & BUF_WRITTEN) { reusable_buffers++; if (++num_written >= bgwriter_lru_maxpages) @@ -2195,7 +2268,7 @@ BgBufferSync(WritebackContext *wb_context) break; } } - else if (buffer_state & BUF_REUSABLE) + else if (sync_state & BUF_REUSABLE) reusable_buffers++; } @@ -2258,6 +2331,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); int result = 0; + uint32 buf_state; BufferTag tag; ReservePrivateRefCountEntry(); @@ -2271,21 +2345,24 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) * don't worry because our checkpoint.redo points before log record for * upcoming changes and so we are not required to write such dirty buffer. */ - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); - if (bufHdr->refcount == 0 && bufHdr->usage_count == 0) + if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 && + BUF_STATE_GET_USAGECOUNT(buf_state) == 0) + { result |= BUF_REUSABLE; + } else if (skip_recently_used) { /* Caller told us not to write recently-used buffers */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return result; } - if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY)) + if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) { /* It's clean, so nothing to do */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return result; } @@ -2439,6 +2516,7 @@ PrintBufferLeakWarning(Buffer buffer) int32 loccount; char *path; BackendId backend; + uint32 buf_state; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) @@ -2456,12 +2534,13 @@ PrintBufferLeakWarning(Buffer buffer) /* theoretically we should lock the bufhdr here */ path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum); + buf_state = pg_atomic_read_u32(&buf->state); elog(WARNING, "buffer refcount leak: [%03d] " "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)", buffer, path, - buf->tag.blockNum, buf->flags, - buf->refcount, loccount); + buf->tag.blockNum, buf_state & BUF_FLAG_MASK, + BUF_STATE_GET_REFCOUNT(buf_state), loccount); pfree(path); } @@ -2573,6 +2652,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) io_time; Block bufBlock; char *bufToWrite; + uint32 buf_state; /* * Acquire the buffer's io_in_progress lock. If StartBufferIO returns @@ -2598,7 +2678,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode); - LockBufHdr(buf); + buf_state = LockBufHdr(buf); /* * Run PageGetLSN while holding header lock, since we don't have the @@ -2607,8 +2687,8 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) recptr = BufferGetLSN(buf); /* To check if block content changes while flushing. - vadim 01/17/97 */ - buf->flags &= ~BM_JUST_DIRTIED; - UnlockBufHdr(buf); + buf_state &= ~BM_JUST_DIRTIED; + UnlockBufHdr(buf, buf_state); /* * Force XLOG flush up to buffer's LSN. This implements the basic WAL @@ -2627,7 +2707,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) * disastrous system-wide consequences. To make sure that can't happen, * skip the flush if the buffer isn't permanent. */ - if (buf->flags & BM_PERMANENT) + if (buf_state & BM_PERMANENT) XLogFlush(recptr); /* @@ -2716,12 +2796,12 @@ BufferIsPermanent(Buffer buffer) /* * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we * need not bother with the buffer header spinlock. Even if someone else - * changes the buffer header flags while we're doing this, we assume that - * changing an aligned 2-byte BufFlags value is atomic, so we'll read the - * old value or the new value, but not random garbage. + * changes the buffer header state while we're doing this, the state is + * changed atomically, so we'll read the old value or the new value, but + * not random garbage. */ bufHdr = GetBufferDescriptor(buffer - 1); - return (bufHdr->flags & BM_PERMANENT) != 0; + return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0; } /* @@ -2736,6 +2816,7 @@ BufferGetLSNAtomic(Buffer buffer) BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1); char *page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); XLogRecPtr lsn; + uint32 buf_state; /* * If we don't need locking for correctness, fastpath out. @@ -2747,9 +2828,9 @@ BufferGetLSNAtomic(Buffer buffer) Assert(BufferIsValid(buffer)); Assert(BufferIsPinned(buffer)); - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); lsn = PageGetLSN(page); - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return lsn; } @@ -2797,6 +2878,7 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum, for (i = 0; i < NBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); + uint32 buf_state; /* * We can make this a tad faster by prechecking the buffer tag before @@ -2817,13 +2899,13 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum, if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node)) continue; - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) InvalidateBuffer(bufHdr); /* releases spinlock */ else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -2887,6 +2969,7 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes) { RelFileNode *rnode = NULL; BufferDesc *bufHdr = GetBufferDescriptor(i); + uint32 buf_state; /* * As in DropRelFileNodeBuffers, an unlocked precheck should be safe @@ -2917,11 +3000,11 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes) if (rnode == NULL) continue; - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode))) InvalidateBuffer(bufHdr); /* releases spinlock */ else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } pfree(nodes); @@ -2951,6 +3034,7 @@ DropDatabaseBuffers(Oid dbid) for (i = 0; i < NBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); + uint32 buf_state; /* * As in DropRelFileNodeBuffers, an unlocked precheck should be safe @@ -2959,11 +3043,11 @@ DropDatabaseBuffers(Oid dbid) if (bufHdr->tag.rnode.dbNode != dbid) continue; - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid) InvalidateBuffer(bufHdr); /* releases spinlock */ else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -3055,9 +3139,12 @@ FlushRelationBuffers(Relation rel) { for (i = 0; i < NLocBuffer; i++) { + uint32 buf_state; + bufHdr = GetLocalBufferDescriptor(i); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + ((buf_state = pg_atomic_read_u32(&bufHdr->state)) & + (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { ErrorContextCallback errcallback; Page localpage; @@ -3078,7 +3165,8 @@ FlushRelationBuffers(Relation rel) localpage, false); - bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); + buf_state &= ~(BM_DIRTY | BM_JUST_DIRTIED); + pg_atomic_write_u32(&bufHdr->state, buf_state); /* Pop the error context stack */ error_context_stack = errcallback.previous; @@ -3093,6 +3181,8 @@ FlushRelationBuffers(Relation rel) for (i = 0; i < NBuffers; i++) { + uint32 buf_state; + bufHdr = GetBufferDescriptor(i); /* @@ -3104,9 +3194,9 @@ FlushRelationBuffers(Relation rel) ReservePrivateRefCountEntry(); - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); @@ -3115,7 +3205,7 @@ FlushRelationBuffers(Relation rel) UnpinBuffer(bufHdr, true); } else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -3145,6 +3235,8 @@ FlushDatabaseBuffers(Oid dbid) for (i = 0; i < NBuffers; i++) { + uint32 buf_state; + bufHdr = GetBufferDescriptor(i); /* @@ -3156,9 +3248,9 @@ FlushDatabaseBuffers(Oid dbid) ReservePrivateRefCountEntry(); - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); @@ -3167,7 +3259,7 @@ FlushDatabaseBuffers(Oid dbid) UnpinBuffer(bufHdr, true); } else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -3297,12 +3389,13 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) * is only intended to be used in cases where failing to write out the * data would be harmless anyway, it doesn't really matter. */ - if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) != + if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) != (BM_DIRTY | BM_JUST_DIRTIED)) { XLogRecPtr lsn = InvalidXLogRecPtr; bool dirtied = false; bool delayChkpt = false; + uint32 buf_state; /* * If we need to protect hint bit updates from torn writes, WAL-log a @@ -3313,7 +3406,8 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) * We don't check full_page_writes here because that logic is included * when we call XLogInsert() since the value changes dynamically. */ - if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT)) + if (XLogHintBitIsNeeded() && + (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT)) { /* * If we're in recovery we cannot dirty a page because of a hint. @@ -3352,9 +3446,11 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) lsn = XLogSaveBufferForHint(buffer, buffer_std); } - LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (!(bufHdr->flags & BM_DIRTY)) + buf_state = LockBufHdr(bufHdr); + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + + if (!(buf_state & BM_DIRTY)) { dirtied = true; /* Means "will be dirtied by this action" */ @@ -3374,8 +3470,9 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) if (!XLogRecPtrIsInvalid(lsn)) PageSetLSN(page, lsn); } - bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); - UnlockBufHdr(bufHdr); + + buf_state |= BM_DIRTY | BM_JUST_DIRTIED; + UnlockBufHdr(bufHdr, buf_state); if (delayChkpt) MyPgXact->delayChkpt = false; @@ -3406,17 +3503,19 @@ UnlockBuffers(void) if (buf) { - LockBufHdr(buf); + uint32 buf_state; + + buf_state = LockBufHdr(buf); /* * Don't complain if flag bit not set; it could have been reset but we * got a cancel/die interrupt before getting the signal. */ - if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 && + if ((buf_state & BM_PIN_COUNT_WAITER) != 0 && buf->wait_backend_pid == MyProcPid) - buf->flags &= ~BM_PIN_COUNT_WAITER; + buf_state &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); PinCountWaitBuf = NULL; } @@ -3509,27 +3608,30 @@ LockBufferForCleanup(Buffer buffer) for (;;) { + uint32 buf_state; + /* Try to acquire lock */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (bufHdr->refcount == 1) + buf_state = LockBufHdr(bufHdr); + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + if (BUF_STATE_GET_REFCOUNT(buf_state) == 1) { /* Successfully acquired exclusive lock with pincount 1 */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return; } /* Failed, so mark myself as waiting for pincount 1 */ - if (bufHdr->flags & BM_PIN_COUNT_WAITER) + if (buf_state & BM_PIN_COUNT_WAITER) { - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); elog(ERROR, "multiple backends attempting to wait for pincount 1"); } bufHdr->wait_backend_pid = MyProcPid; - bufHdr->flags |= BM_PIN_COUNT_WAITER; PinCountWaitBuf = bufHdr; - UnlockBufHdr(bufHdr); + buf_state |= BM_PIN_COUNT_WAITER; + UnlockBufHdr(bufHdr, buf_state); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); /* Report the wait */ @@ -3558,11 +3660,11 @@ LockBufferForCleanup(Buffer buffer) * impossible with the current usages due to table level locking, but * better be safe. */ - LockBufHdr(bufHdr); - if ((bufHdr->flags & BM_PIN_COUNT_WAITER) != 0 && + buf_state = LockBufHdr(bufHdr); + if ((buf_state & BM_PIN_COUNT_WAITER) != 0 && bufHdr->wait_backend_pid == MyProcPid) - bufHdr->flags &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(bufHdr); + buf_state &= ~BM_PIN_COUNT_WAITER; + UnlockBufHdr(bufHdr, buf_state); PinCountWaitBuf = NULL; /* Loop back and try again */ @@ -3603,22 +3705,26 @@ bool ConditionalLockBufferForCleanup(Buffer buffer) { BufferDesc *bufHdr; + uint32 buf_state, + refcount; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) { + refcount = LocalRefCount[-buffer - 1]; /* There should be exactly one pin */ - Assert(LocalRefCount[-buffer - 1] > 0); - if (LocalRefCount[-buffer - 1] != 1) + Assert(refcount > 0); + if (refcount != 1) return false; /* Nobody else to wait for */ return true; } /* There should be exactly one local pin */ - Assert(GetPrivateRefCount(buffer) > 0); - if (GetPrivateRefCount(buffer) != 1) + refcount = GetPrivateRefCount(buffer); + Assert(refcount); + if (refcount != 1) return false; /* Try to acquire lock */ @@ -3626,17 +3732,19 @@ ConditionalLockBufferForCleanup(Buffer buffer) return false; bufHdr = GetBufferDescriptor(buffer - 1); - LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (bufHdr->refcount == 1) + buf_state = LockBufHdr(bufHdr); + refcount = BUF_STATE_GET_REFCOUNT(buf_state); + + Assert(refcount > 0); + if (refcount == 1) { /* Successfully acquired exclusive lock with pincount 1 */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return true; } /* Failed, so release the lock */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); return false; } @@ -3666,17 +3774,17 @@ WaitIO(BufferDesc *buf) */ for (;;) { - BufFlags sv_flags; + uint32 buf_state; /* * It may not be necessary to acquire the spinlock to check the flag * here, but since this test is essential for correctness, we'd better * play it safe. */ - LockBufHdr(buf); - sv_flags = buf->flags; - UnlockBufHdr(buf); - if (!(sv_flags & BM_IO_IN_PROGRESS)) + buf_state = LockBufHdr(buf); + UnlockBufHdr(buf, buf_state); + + if (!(buf_state & BM_IO_IN_PROGRESS)) break; LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED); LWLockRelease(BufferDescriptorGetIOLock(buf)); @@ -3704,6 +3812,8 @@ WaitIO(BufferDesc *buf) static bool StartBufferIO(BufferDesc *buf, bool forInput) { + uint32 buf_state; + Assert(!InProgressBuf); for (;;) @@ -3714,9 +3824,9 @@ StartBufferIO(BufferDesc *buf, bool forInput) */ LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); - LockBufHdr(buf); + buf_state = LockBufHdr(buf); - if (!(buf->flags & BM_IO_IN_PROGRESS)) + if (!(buf_state & BM_IO_IN_PROGRESS)) break; /* @@ -3725,24 +3835,23 @@ StartBufferIO(BufferDesc *buf, bool forInput) * an error (see AbortBufferIO). If that's the case, we must wait for * him to get unwedged. */ - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(BufferDescriptorGetIOLock(buf)); WaitIO(buf); } /* Once we get here, there is definitely no I/O active on this buffer */ - if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY)) + if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) { /* someone else already did the I/O */ - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(BufferDescriptorGetIOLock(buf)); return false; } - buf->flags |= BM_IO_IN_PROGRESS; - - UnlockBufHdr(buf); + buf_state |= BM_IO_IN_PROGRESS; + UnlockBufHdr(buf, buf_state); InProgressBuf = buf; IsForInput = forInput; @@ -3768,19 +3877,22 @@ StartBufferIO(BufferDesc *buf, bool forInput) * be 0, or BM_VALID if we just finished reading in the page. */ static void -TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits) +TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits) { + uint32 buf_state; + Assert(buf == InProgressBuf); - LockBufHdr(buf); + buf_state = LockBufHdr(buf); + + Assert(buf_state & BM_IO_IN_PROGRESS); - Assert(buf->flags & BM_IO_IN_PROGRESS); - buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); - if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED)) - buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); - buf->flags |= set_flag_bits; + buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); + if (clear_dirty && !(buf_state & BM_JUST_DIRTIED)) + buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); - UnlockBufHdr(buf); + buf_state |= set_flag_bits; + UnlockBufHdr(buf, buf_state); InProgressBuf = NULL; @@ -3803,6 +3915,8 @@ AbortBufferIO(void) if (buf) { + uint32 buf_state; + /* * Since LWLockReleaseAll has already been called, we're not holding * the buffer's io_in_progress_lock. We have to re-acquire it so that @@ -3811,24 +3925,22 @@ AbortBufferIO(void) */ LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); - LockBufHdr(buf); - Assert(buf->flags & BM_IO_IN_PROGRESS); + buf_state = LockBufHdr(buf); + Assert(buf_state & BM_IO_IN_PROGRESS); if (IsForInput) { - Assert(!(buf->flags & BM_DIRTY)); + Assert(!(buf_state & BM_DIRTY)); + /* We'd better not think buffer is valid yet */ - Assert(!(buf->flags & BM_VALID)); - UnlockBufHdr(buf); + Assert(!(buf_state & BM_VALID)); + UnlockBufHdr(buf, buf_state); } else { - BufFlags sv_flags; - - sv_flags = buf->flags; - Assert(sv_flags & BM_DIRTY); - UnlockBufHdr(buf); + Assert(buf_state & BM_DIRTY); + UnlockBufHdr(buf, buf_state); /* Issue notice if this is not the first failure... */ - if (sv_flags & BM_IO_ERROR) + if (buf_state & BM_IO_ERROR) { /* Buffer is pinned, so we can read tag without spinlock */ char *path; @@ -3912,6 +4024,54 @@ rnode_comparator(const void *p1, const void *p2) } /* + * Lock buffer header - set BM_LOCKED in buffer state. + */ +uint32 +LockBufHdr(BufferDesc *desc) +{ + SpinDelayStatus delayStatus = init_spin_delay(desc); + uint32 old_buf_state; + + while (true) + { + /* set BM_LOCKED flag */ + old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED); + /* if it wasn't set before we're OK */ + if (!(old_buf_state & BM_LOCKED)) + break; + perform_spin_delay(&delayStatus); + } + finish_spin_delay(&delayStatus); + return old_buf_state | BM_LOCKED; +} + +/* + * Wait until the BM_LOCKED flag isn't set anymore and return the buffer's + * state at that point. + * + * Obviously the buffer could be locked by the time the value is returned, so + * this is primarily useful in CAS style loops. + */ +static uint32 +WaitBufHdrUnlocked(BufferDesc *buf) +{ + SpinDelayStatus delayStatus = init_spin_delay(buf); + uint32 buf_state; + + buf_state = pg_atomic_read_u32(&buf->state); + + while (buf_state & BM_LOCKED) + { + perform_spin_delay(&delayStatus); + buf_state = pg_atomic_read_u32(&buf->state); + } + + finish_spin_delay(&delayStatus); + + return buf_state; +} + +/* * BufferTag comparator. */ static int |