commit     55432fedd2b3383c0cd0724a70ad0ae5134710f3 (patch)
author     Tom Lane <tgl@sss.pgh.pa.us>    2001-07-06 21:04:26 +0000
committer  Tom Lane <tgl@sss.pgh.pa.us>    2001-07-06 21:04:26 +0000
tree       d6aa387a59107c56fd2d4fdfa6c7b12320bd0d70 /src/backend/storage/buffer
parent     1e9e5defc256708ca40009640d337baeca5698ec (diff)
Implement LockBufferForCleanup(), which will allow concurrent VACUUM
to wait until it's safe to remove tuples and compact free space in a
shared buffer page. Miscellaneous small code cleanups in bufmgr, too.
Diffstat (limited to 'src/backend/storage/buffer')
-rw-r--r--  src/backend/storage/buffer/README     | 100
-rw-r--r--  src/backend/storage/buffer/buf_init.c |   4
-rw-r--r--  src/backend/storage/buffer/bufmgr.c   | 335
-rw-r--r--  src/backend/storage/buffer/freelist.c |  63
4 files changed, 298 insertions, 204 deletions
diff --git a/src/backend/storage/buffer/README b/src/backend/storage/buffer/README
new file mode 100644
index 00000000000..519c9c9ebc0
--- /dev/null
+++ b/src/backend/storage/buffer/README
@@ -0,0 +1,100 @@
+$Header: /cvsroot/pgsql/src/backend/storage/buffer/README,v 1.1 2001/07/06 21:04:25 tgl Exp $
+
+Notes about shared buffer access rules
+--------------------------------------
+
+There are two separate access control mechanisms for shared disk buffers:
+reference counts (a/k/a pin counts) and buffer locks.  (Actually, there's
+a third level of access control: one must hold the appropriate kind of
+lock on a relation before one can legally access any page belonging to
+the relation.  Relation-level locks are not discussed here.)
+
+Pins: one must "hold a pin on" a buffer (increment its reference count)
+before being allowed to do anything at all with it.  An unpinned buffer is
+subject to being reclaimed and reused for a different page at any instant,
+so touching it is unsafe.  Typically a pin is acquired via ReadBuffer and
+released via WriteBuffer (if one modified the page) or ReleaseBuffer (if not).
+It is OK and indeed common for a single backend to pin a page more than
+once concurrently; the buffer manager handles this efficiently.  It is
+considered OK to hold a pin for long intervals --- for example, sequential
+scans hold a pin on the current page until done processing all the tuples
+on the page, which could be quite a while if the scan is the outer scan of
+a join.  Similarly, btree index scans hold a pin on the current index page.
+This is OK because normal operations never wait for a page's pin count to
+drop to zero.  (Anything that might need to do such a wait is instead
+handled by waiting to obtain the relation-level lock, which is why you'd
+better hold one first.)  Pins may not be held across transaction
+boundaries, however.
+
+Buffer locks: there are two kinds of buffer locks, shared and exclusive,
+which act just as you'd expect: multiple backends can hold shared locks on
+the same buffer, but an exclusive lock prevents anyone else from holding
+either shared or exclusive lock.  (These can alternatively be called READ
+and WRITE locks.)  These locks are short-term: they should not be held for
+long.  They are implemented as per-buffer spinlocks, so another backend
+trying to acquire a competing lock will spin as long as you hold yours!
+Buffer locks are acquired and released by LockBuffer().  It will *not* work
+for a single backend to try to acquire multiple locks on the same buffer.
+One must pin a buffer before trying to lock it.
+
+Buffer access rules:
+
+1. To scan a page for tuples, one must hold a pin and either shared or
+exclusive lock.  To examine the commit status (XIDs and status bits) of
+a tuple in a shared buffer, one must likewise hold a pin and either shared
+or exclusive lock.
+
+2. Once one has determined that a tuple is interesting (visible to the
+current transaction) one may drop the buffer lock, yet continue to access
+the tuple's data for as long as one holds the buffer pin.  This is what is
+typically done by heap scans, since the tuple returned by heap_fetch
+contains a pointer to tuple data in the shared buffer.  Therefore the
+tuple cannot go away while the pin is held (see rule #5).  Its state could
+change, but that is assumed not to matter after the initial determination
+of visibility is made.
+
+3. To add a tuple or change the xmin/xmax fields of an existing tuple,
+one must hold a pin and an exclusive lock on the containing buffer.
+This ensures that no one else might see a partially-updated state of the
+tuple.
+
+4. It is considered OK to update tuple commit status bits (ie, OR the
+values HEAP_XMIN_COMMITTED, HEAP_XMIN_INVALID, HEAP_XMAX_COMMITTED, or
+HEAP_XMAX_INVALID into t_infomask) while holding only a shared lock and
+pin on a buffer.  This is OK because another backend looking at the tuple
+at about the same time would OR the same bits into the field, so there
+is little or no risk of conflicting update; what's more, if there did
+manage to be a conflict it would merely mean that one bit-update would
+be lost and need to be done again later.  These four bits are only hints
+(they cache the results of transaction status lookups in pg_log), so no
+great harm is done if they get reset to zero by conflicting updates.
+
+5. To physically remove a tuple or compact free space on a page, one
+must hold a pin and an exclusive lock, *and* observe while holding the
+exclusive lock that the buffer's shared reference count is one (ie,
+no other backend holds a pin).  If these conditions are met then no other
+backend can perform a page scan until the exclusive lock is dropped, and
+no other backend can be holding a reference to an existing tuple that it
+might expect to examine again.  Note that another backend might pin the
+buffer (increment the refcount) while one is performing the cleanup, but
+it won't be able to actually examine the page until it acquires shared
+or exclusive lock.
+
+
+As of 7.1, the only operation that removes tuples or compacts free space is
+(oldstyle) VACUUM.  It does not have to implement rule #5 directly, because
+it instead acquires exclusive lock at the relation level, which ensures
+indirectly that no one else is accessing pages of the relation at all.
+
+To implement concurrent VACUUM we will need to make it obey rule #5 fully.
+To do this, we'll create a new buffer manager operation
+LockBufferForCleanup() that gets an exclusive lock and then checks to see
+if the shared pin count is currently 1.  If not, it releases the exclusive
+lock (but not the caller's pin) and waits until signaled by another backend,
+whereupon it tries again.  The signal will occur when UnpinBuffer
+decrements the shared pin count to 1.  As indicated above, this operation
+might have to wait a good while before it acquires lock, but that shouldn't
+matter much for concurrent VACUUM.  The current implementation only
+supports a single waiter for pin-count-1 on any particular shared buffer.
+This is enough for VACUUM's use, since we don't allow multiple VACUUMs
+concurrently on a single relation anyway.
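The access rules above map directly onto the bufmgr API.  The following is
an illustrative sketch only, not code from this commit: a reader following
rules 1 and 2, assuming the caller already holds the needed relation-level
lock.  scan_page_example is a hypothetical function name.

    #include "postgres.h"
    #include "storage/bufmgr.h"
    #include "storage/bufpage.h"
    #include "utils/rel.h"

    /* Hypothetical reader obeying README rules 1-2 (sketch only). */
    static void
    scan_page_example(Relation rel, BlockNumber blkno)
    {
        Buffer        buf;
        Page          page;
        OffsetNumber  off,
                      maxoff;

        buf = ReadBuffer(rel, blkno);        /* pin first: required for any access */
        LockBuffer(buf, BUFFER_LOCK_SHARE);  /* rule 1: pin plus (at least) shared lock */

        page = BufferGetPage(buf);
        maxoff = PageGetMaxOffsetNumber(page);
        for (off = FirstOffsetNumber; off <= maxoff; off = OffsetNumberNext(off))
        {
            /* examine tuples and their commit status here (rule 1) */
        }

        /*
         * Rule 2: the short-term lock may be dropped while tuple data is
         * still in use, as long as the pin is kept; rule 5 guarantees the
         * tuples cannot be physically removed meanwhile.
         */
        LockBuffer(buf, BUFFER_LOCK_UNLOCK);

        /* ... keep using pointers into the page here, pin still held ... */

        ReleaseBuffer(buf);                  /* finally drop the pin */
    }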
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index 03d6504db86..819fe7e206c 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/storage/buffer/buf_init.c,v 1.42 2001/03/22 03:59:44 momjian Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/storage/buffer/buf_init.c,v 1.43 2001/07/06 21:04:25 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -63,7 +63,6 @@
 long       *PrivateRefCount;    /* also used in freelist.c */
 bits8      *BufferLocks;        /* flag bits showing locks I have set */
 BufferTag  *BufferTagLastDirtied;   /* tag buffer had when last
                                      * dirtied by me */
-BufferBlindId *BufferBlindLastDirtied;
 bool       *BufferDirtiedByMe;  /* T if buf has been dirtied in cur xact */
 
@@ -237,7 +236,6 @@ InitBufferPoolAccess(void)
     PrivateRefCount = (long *) calloc(NBuffers, sizeof(long));
     BufferLocks = (bits8 *) calloc(NBuffers, sizeof(bits8));
     BufferTagLastDirtied = (BufferTag *) calloc(NBuffers, sizeof(BufferTag));
-    BufferBlindLastDirtied = (BufferBlindId *) calloc(NBuffers, sizeof(BufferBlindId));
     BufferDirtiedByMe = (bool *) calloc(NBuffers, sizeof(bool));
 
     /*
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 6090e729c04..89443ee160e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.115 2001/07/02 18:47:18 tgl Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.116 2001/07/06 21:04:25 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,14 +46,12 @@
 #include <math.h>
 #include <signal.h>
 
-#include "executor/execdebug.h"
 #include "miscadmin.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
-#include "storage/s_lock.h"
+#include "storage/proc.h"
 #include "storage/smgr.h"
 #include "utils/relcache.h"
-#include "catalog/pg_database.h"
 #include "pgstat.h"
 
@@ -254,7 +252,7 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
         if (!BufTableDelete(bufHdr))
         {
             SpinRelease(BufMgrLock);
-            elog(FATAL, "BufRead: buffer table broken after IO error\n");
+            elog(FATAL, "BufRead: buffer table broken after IO error");
         }
         /* remember that BufferAlloc() pinned the buffer */
         UnpinBuffer(bufHdr);
@@ -426,33 +424,27 @@ BufferAlloc(Relation reln,
             if (smok == FALSE)
             {
-                elog(NOTICE, "BufferAlloc: cannot write block %u for %s/%s",
-                     buf->tag.blockNum, buf->blind.dbname, buf->blind.relname);
+                elog(NOTICE, "BufferAlloc: cannot write block %u for %u/%u",
+                     buf->tag.blockNum,
+                     buf->tag.rnode.tblNode, buf->tag.rnode.relNode);
                 inProgress = FALSE;
                 buf->flags |= BM_IO_ERROR;
                 buf->flags &= ~BM_IO_IN_PROGRESS;
                 TerminateBufferIO(buf);
-                PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
-                Assert(buf->refcount > 0);
-                buf->refcount--;
-                if (buf->refcount == 0)
-                {
-                    AddBufferToFreelist(buf);
-                    buf->flags |= BM_FREE;
-                }
+                UnpinBuffer(buf);
                 buf = (BufferDesc *) NULL;
             }
             else
             {
-
                 /*
                  * BM_JUST_DIRTIED cleared by BufferReplace and shouldn't
                  * be setted by anyone.        - vadim 01/17/97
                  */
                 if (buf->flags & BM_JUST_DIRTIED)
                 {
-                    elog(STOP, "BufferAlloc: content of block %u (%s) changed while flushing",
-                         buf->tag.blockNum, buf->blind.relname);
+                    elog(STOP, "BufferAlloc: content of block %u (%u/%u) changed while flushing",
+                         buf->tag.blockNum,
+                         buf->tag.rnode.tblNode, buf->tag.rnode.relNode);
                 }
                 else
                     buf->flags &= ~BM_DIRTY;
@@ -475,8 +467,7 @@ BufferAlloc(Relation reln,
             inProgress = FALSE;
             buf->flags &= ~BM_IO_IN_PROGRESS;
             TerminateBufferIO(buf);
-            PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
-            buf->refcount--;
+            UnpinBuffer(buf);
             buf = (BufferDesc *) NULL;
         }
 
@@ -501,15 +492,8 @@ BufferAlloc(Relation reln,
             {
                 buf->flags &= ~BM_IO_IN_PROGRESS;
                 TerminateBufferIO(buf);
-                /* give up the buffer since we don't need it any more */
-                PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
-                Assert(buf->refcount > 0);
-                buf->refcount--;
-                if (buf->refcount == 0)
-                {
-                    AddBufferToFreelist(buf);
-                    buf->flags |= BM_FREE;
-                }
+                /* give up old buffer since we don't need it any more */
+                UnpinBuffer(buf);
             }
 
             PinBuffer(buf2);
@@ -551,18 +535,15 @@ BufferAlloc(Relation reln,
     if (!BufTableDelete(buf))
     {
         SpinRelease(BufMgrLock);
-        elog(FATAL, "buffer wasn't in the buffer table\n");
+        elog(FATAL, "buffer wasn't in the buffer table");
     }
 
-    /* record the database name and relation name for this buffer */
-    strcpy(buf->blind.dbname, (DatabaseName) ? DatabaseName : "Recovery");
-    strcpy(buf->blind.relname, RelationGetPhysicalRelationName(reln));
-
     INIT_BUFFERTAG(&(buf->tag), reln, blockNum);
+
     if (!BufTableInsert(buf))
     {
         SpinRelease(BufMgrLock);
-        elog(FATAL, "Buffer in lookup table twice \n");
+        elog(FATAL, "Buffer in lookup table twice");
     }
 
     /*
@@ -704,14 +685,7 @@ ReleaseAndReadBuffer(Buffer buffer,
         else
         {
             SpinAcquire(BufMgrLock);
-            PrivateRefCount[buffer - 1] = 0;
-            Assert(bufHdr->refcount > 0);
-            bufHdr->refcount--;
-            if (bufHdr->refcount == 0)
-            {
-                AddBufferToFreelist(bufHdr);
-                bufHdr->flags |= BM_FREE;
-            }
+            UnpinBuffer(bufHdr);
             return ReadBufferInternal(relation, blockNum, true);
         }
     }
@@ -831,8 +805,9 @@ BufferSync()
             }
             if (status == SM_FAIL)      /* disk failure ?! */
-                elog(STOP, "BufferSync: cannot write %u for %s",
-                     bufHdr->tag.blockNum, bufHdr->blind.relname);
+                elog(STOP, "BufferSync: cannot write %u for %u/%u",
+                     bufHdr->tag.blockNum,
+                     bufHdr->tag.rnode.tblNode, bufHdr->tag.rnode.relNode);
 
             /*
              * Note that it's safe to change cntxDirty here because of we
@@ -956,16 +931,11 @@ ResetBufferPool(bool isCommit)
         {
             BufferDesc *buf = &BufferDescriptors[i];
 
+            PrivateRefCount[i] = 1;     /* make sure we release shared pin */
             SpinAcquire(BufMgrLock);
-            PrivateRefCount[i] = 0;
-            Assert(buf->refcount > 0);
-            buf->refcount--;
-            if (buf->refcount == 0)
-            {
-                AddBufferToFreelist(buf);
-                buf->flags |= BM_FREE;
-            }
+            UnpinBuffer(buf);
             SpinRelease(BufMgrLock);
+            Assert(PrivateRefCount[i] == 0);
         }
     }
 
@@ -975,32 +945,31 @@ ResetBufferPool(bool isCommit)
         smgrabort();
 }
 
-/* -----------------------------------------------
- * BufferPoolCheckLeak
+/*
+ * BufferPoolCheckLeak
  *
  * check if there is buffer leak
- *
- * -----------------------------------------------
  */
-int
-BufferPoolCheckLeak()
+bool
+BufferPoolCheckLeak(void)
 {
     int         i;
-    int         result = 0;
+    bool        result = false;
 
-    for (i = 1; i <= NBuffers; i++)
+    for (i = 0; i < NBuffers; i++)
     {
-        if (PrivateRefCount[i - 1] != 0)
+        if (PrivateRefCount[i] != 0)
         {
-            BufferDesc *buf = &(BufferDescriptors[i - 1]);
+            BufferDesc *buf = &(BufferDescriptors[i]);
 
             elog(NOTICE, "Buffer Leak: [%03d] (freeNext=%d, freePrev=%d, \
-relname=%s, blockNum=%d, flags=0x%x, refcount=%d %ld)",
-                 i - 1, buf->freeNext, buf->freePrev,
-                 buf->blind.relname, buf->tag.blockNum, buf->flags,
-                 buf->refcount, PrivateRefCount[i - 1]);
-            result = 1;
+rel=%u/%u, blockNum=%u, flags=0x%x, refcount=%d %ld)",
+                 i, buf->freeNext, buf->freePrev,
+                 buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
+                 buf->tag.blockNum, buf->flags,
+                 buf->refcount, PrivateRefCount[i]);
+            result = true;
         }
     }
     return result;
@@ -1389,10 +1358,11 @@ PrintBufferDescs()
         SpinAcquire(BufMgrLock);
         for (i = 0; i < NBuffers; ++i, ++buf)
         {
-            elog(DEBUG, "[%02d] (freeNext=%d, freePrev=%d, relname=%s, \
-blockNum=%d, flags=0x%x, refcount=%d %ld)",
+            elog(DEBUG, "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u, \
+blockNum=%u, flags=0x%x, refcount=%d %ld)",
                  i, buf->freeNext, buf->freePrev,
-                 buf->blind.relname, buf->tag.blockNum, buf->flags,
+                 buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
+                 buf->tag.blockNum, buf->flags,
                  buf->refcount, PrivateRefCount[i]);
         }
         SpinRelease(BufMgrLock);
@@ -1402,8 +1372,9 @@ blockNum=%d, flags=0x%x, refcount=%d %ld)",
         /* interactive backend */
         for (i = 0; i < NBuffers; ++i, ++buf)
         {
-            printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld)\n",
-                   i, buf->blind.relname, buf->tag.blockNum,
+            printf("[%-2d] (%u/%u, %u) flags=0x%x, refcnt=%d %ld)\n",
+                   i, buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
+                   buf->tag.blockNum,
                    buf->flags, buf->refcount, PrivateRefCount[i]);
         }
     }
@@ -1419,9 +1390,10 @@ PrintPinnedBufs()
     for (i = 0; i < NBuffers; ++i, ++buf)
     {
         if (PrivateRefCount[i] > 0)
-            elog(NOTICE, "[%02d] (freeNext=%d, freePrev=%d, relname=%s, \
-blockNum=%d, flags=0x%x, refcount=%d %ld)\n",
-                 i, buf->freeNext, buf->freePrev, buf->blind.relname,
+            elog(NOTICE, "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u, \
+blockNum=%u, flags=0x%x, refcount=%d %ld)",
+                 i, buf->freeNext, buf->freePrev,
+                 buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
                  buf->tag.blockNum, buf->flags,
                  buf->refcount, PrivateRefCount[i]);
     }
@@ -1581,8 +1553,10 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
                                   (char *) MAKE_PTR(bufHdr->data));
                 if (status == SM_FAIL)  /* disk failure ?! */
-                    elog(STOP, "FlushRelationBuffers: cannot write %u for %s",
-                         bufHdr->tag.blockNum, bufHdr->blind.relname);
+                    elog(STOP, "FlushRelationBuffers: cannot write %u for %u/%u",
+                         bufHdr->tag.blockNum,
+                         bufHdr->tag.rnode.tblNode,
+                         bufHdr->tag.rnode.relNode);
 
                 BufferFlushCount++;
 
@@ -1624,7 +1598,6 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
 /*
  * ReleaseBuffer -- remove the pin on a buffer without
  *      marking it dirty.
- *
  */
 int
 ReleaseBuffer(Buffer buffer)
@@ -1649,14 +1622,7 @@ ReleaseBuffer(Buffer buffer)
     else
     {
         SpinAcquire(BufMgrLock);
-        PrivateRefCount[buffer - 1] = 0;
-        Assert(bufHdr->refcount > 0);
-        bufHdr->refcount--;
-        if (bufHdr->refcount == 0)
-        {
-            AddBufferToFreelist(bufHdr);
-            bufHdr->flags |= BM_FREE;
-        }
+        UnpinBuffer(bufHdr);
         SpinRelease(BufMgrLock);
     }
 
@@ -1665,7 +1631,7 @@
 
 /*
  * ReleaseBufferWithBufferLock
- *      Same as ReleaseBuffer except we hold the lock
+ *      Same as ReleaseBuffer except we hold the bufmgr lock
  */
 static int
 ReleaseBufferWithBufferLock(Buffer buffer)
@@ -1688,16 +1654,7 @@ ReleaseBufferWithBufferLock(Buffer buffer)
     if (PrivateRefCount[buffer - 1] > 1)
         PrivateRefCount[buffer - 1]--;
     else
-    {
-        PrivateRefCount[buffer - 1] = 0;
-        Assert(bufHdr->refcount > 0);
-        bufHdr->refcount--;
-        if (bufHdr->refcount == 0)
-        {
-            AddBufferToFreelist(bufHdr);
-            bufHdr->flags |= BM_FREE;
-        }
-    }
+        UnpinBuffer(bufHdr);
 
     return STATUS_OK;
 }
@@ -1712,9 +1669,11 @@ IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)
     {
         BufferDesc *buf = &BufferDescriptors[buffer - 1];
 
-        fprintf(stderr, "PIN(Incr) %d relname = %s, blockNum = %d, \
+        fprintf(stderr, "PIN(Incr) %d rel = %u/%u, blockNum = %u, \
 refcount = %ld, file: %s, line: %d\n",
-                buffer, buf->blind.relname, buf->tag.blockNum,
+                buffer,
+                buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
+                buf->tag.blockNum,
                 PrivateRefCount[buffer - 1], file, line);
     }
 }
@@ -1730,9 +1689,11 @@ ReleaseBuffer_Debug(char *file, int line, Buffer buffer)
     {
         BufferDesc *buf = &BufferDescriptors[buffer - 1];
 
-        fprintf(stderr, "UNPIN(Rel) %d relname = %s, blockNum = %d, \
+        fprintf(stderr, "UNPIN(Rel) %d rel = %u/%u, blockNum = %u, \
 refcount = %ld, file: %s, line: %d\n",
-                buffer, buf->blind.relname, buf->tag.blockNum,
+                buffer,
+                buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
+                buf->tag.blockNum,
                 PrivateRefCount[buffer - 1], file, line);
     }
 }
@@ -1757,18 +1718,22 @@ ReleaseAndReadBuffer_Debug(char *file,
     {
         BufferDesc *buf = &BufferDescriptors[buffer - 1];
 
-        fprintf(stderr, "UNPIN(Rel&Rd) %d relname = %s, blockNum = %d, \
+        fprintf(stderr, "UNPIN(Rel&Rd) %d rel = %u/%u, blockNum = %u, \
 refcount = %ld, file: %s, line: %d\n",
-                buffer, buf->blind.relname, buf->tag.blockNum,
+                buffer,
+                buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
+                buf->tag.blockNum,
                 PrivateRefCount[buffer - 1], file, line);
     }
     if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer))
     {
         BufferDesc *buf = &BufferDescriptors[b - 1];
 
-        fprintf(stderr, "PIN(Rel&Rd) %d relname = %s, blockNum = %d, \
+        fprintf(stderr, "PIN(Rel&Rd) %d rel = %u/%u, blockNum = %u, \
 refcount = %ld, file: %s, line: %d\n",
-                b, buf->blind.relname, buf->tag.blockNum,
+                b,
+                buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
+                buf->tag.blockNum,
                 PrivateRefCount[b - 1], file, line);
     }
     return b;
@@ -1784,6 +1749,7 @@ refcount = %ld, file: %s, line: %d\n",
  *      and die if there's anything fishy.
  */
+void
 _bm_trace(Oid dbId, Oid relId, int blkNo, int bufNo, int allocType)
 {
     long        start,
@@ -1835,6 +1801,7 @@ okay:
     *CurTraceBuf = (start + 1) % BMT_LIMIT;
 }
 
+void
 _bm_die(Oid dbId, Oid relId, int blkNo, int bufNo,
         int allocType, long start, long cur)
 {
@@ -1860,7 +1827,7 @@ _bm_die(Oid dbId, Oid relId, int blkNo, int bufNo,
         tb = &TraceBuf[i];
         if (tb->bmt_op != BMT_NOTUSED)
         {
-            fprintf(fp, "    [%3d]%spid %d buf %2d for <%d,%u,%d> ",
+            fprintf(fp, "    [%3d]%spid %d buf %2d for <%u,%u,%u> ",
                     i, (i == cur ? " ---> " : "\t"),
                     tb->bmt_pid, tb->bmt_buf,
                     tb->bmt_dbid, tb->bmt_relid, tb->bmt_blkno);
@@ -1967,7 +1934,9 @@ UnlockBuffers(void)
     for (i = 0; i < NBuffers; i++)
     {
-        if (BufferLocks[i] == 0)
+        bits8       buflocks = BufferLocks[i];
+
+        if (buflocks == 0)
             continue;
 
         Assert(BufferIsValid(i + 1));
@@ -1977,14 +1946,13 @@ UnlockBuffers(void)
 
         S_LOCK(&(buf->cntx_lock));
 
-        if (BufferLocks[i] & BL_R_LOCK)
+        if (buflocks & BL_R_LOCK)
         {
             Assert(buf->r_locks > 0);
             (buf->r_locks)--;
         }
-        if (BufferLocks[i] & BL_RI_LOCK)
+        if (buflocks & BL_RI_LOCK)
         {
-
             /*
              * Someone else could remove our RI lock when acquiring W
              * lock. This is possible if we came here from elog(ERROR)
@@ -1993,7 +1961,7 @@ UnlockBuffers(void)
              */
             buf->ri_lock = false;
         }
-        if (BufferLocks[i] & BL_W_LOCK)
+        if (buflocks & BL_W_LOCK)
         {
             Assert(buf->w_lock);
             buf->w_lock = false;
@@ -2001,6 +1969,20 @@ UnlockBuffers(void)
 
         S_UNLOCK(&(buf->cntx_lock));
 
+        if (buflocks & BL_PIN_COUNT_LOCK)
+        {
+            SpinAcquire(BufMgrLock);
+            /*
+             * Don't complain if flag bit not set; it could have been reset
+             * but we got a cancel/die interrupt before getting the signal.
+             */
+            if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
+                buf->wait_backend_id == MyBackendId)
+                buf->flags &= ~BM_PIN_COUNT_WAITER;
+            SpinRelease(BufMgrLock);
+            ProcCancelWaitForSignal();
+        }
+
         BufferLocks[i] = 0;
 
         RESUME_INTERRUPTS();
@@ -2127,6 +2109,77 @@ LockBuffer(Buffer buffer, int mode)
 }
 
 /*
+ * LockBufferForCleanup - lock a buffer in preparation for deleting items
+ *
+ * Items may be deleted from a disk page only when the caller (a) holds an
+ * exclusive lock on the buffer and (b) has observed that no other backend
+ * holds a pin on the buffer.  If there is a pin, then the other backend
+ * might have a pointer into the buffer (for example, a heapscan reference
+ * to an item --- see README for more details).  It's OK if a pin is added
+ * after the cleanup starts, however; the newly-arrived backend will be
+ * unable to look at the page until we release the exclusive lock.
+ *
+ * To implement this protocol, a would-be deleter must pin the buffer and
+ * then call LockBufferForCleanup().  LockBufferForCleanup() is similar to
+ * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
+ * it has successfully observed pin count = 1.
+ */
+void
+LockBufferForCleanup(Buffer buffer)
+{
+    BufferDesc *bufHdr;
+    bits8      *buflock;
+
+    Assert(BufferIsValid(buffer));
+
+    if (BufferIsLocal(buffer))
+    {
+        /* There should be exactly one pin */
+        if (LocalRefCount[-buffer - 1] != 1)
+            elog(ERROR, "LockBufferForCleanup: wrong local pin count");
+        /* Nobody else to wait for */
+        return;
+    }
+
+    /* There should be exactly one local pin */
+    if (PrivateRefCount[buffer - 1] != 1)
+        elog(ERROR, "LockBufferForCleanup: wrong local pin count");
+
+    bufHdr = &BufferDescriptors[buffer - 1];
+    buflock = &(BufferLocks[buffer - 1]);
+
+    for (;;)
+    {
+        /* Try to acquire lock */
+        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+        SpinAcquire(BufMgrLock);
+        Assert(bufHdr->refcount > 0);
+        if (bufHdr->refcount == 1)
+        {
+            /* Successfully acquired exclusive lock with pincount 1 */
+            SpinRelease(BufMgrLock);
+            return;
+        }
+        /* Failed, so mark myself as waiting for pincount 1 */
+        if (bufHdr->flags & BM_PIN_COUNT_WAITER)
+        {
+            SpinRelease(BufMgrLock);
+            LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+            elog(ERROR, "Multiple backends attempting to wait for pincount 1");
+        }
+        bufHdr->wait_backend_id = MyBackendId;
+        bufHdr->flags |= BM_PIN_COUNT_WAITER;
+        *buflock |= BL_PIN_COUNT_LOCK;
+        SpinRelease(BufMgrLock);
+        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+        /* Wait to be signaled by UnpinBuffer() */
+        ProcWaitForSignal();
+        *buflock &= ~BL_PIN_COUNT_LOCK;
+        /* Loop back and try again */
+    }
+}
+
+/*
  *  Functions for IO error handling
  *
  *  Note : We assume that nested buffer IO never occur.
@@ -2240,8 +2293,9 @@ AbortBufferIO(void)
             /* Issue notice if this is not the first failure... */
             if (buf->flags & BM_IO_ERROR)
             {
-                elog(NOTICE, "write error may be permanent: cannot write block %u for %s/%s",
-                     buf->tag.blockNum, buf->blind.dbname, buf->blind.relname);
+                elog(NOTICE, "write error may be permanent: cannot write block %u for %u/%u",
+                     buf->tag.blockNum,
+                     buf->tag.rnode.tblNode, buf->tag.rnode.relNode);
             }
             buf->flags |= BM_DIRTY;
         }
@@ -2252,59 +2306,6 @@ AbortBufferIO(void)
     }
 }
 
-/*
- * Cleanup buffer or mark it for cleanup.  Buffer may be cleaned
- * up if it's pinned only once.
- *
- * NOTE: buffer must be excl locked.
- */
-void
-MarkBufferForCleanup(Buffer buffer, void (*CleanupFunc) (Buffer))
-{
-    BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];
-
-    Assert(PrivateRefCount[buffer - 1] > 0);
-
-    if (PrivateRefCount[buffer - 1] > 1)
-    {
-        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-        PrivateRefCount[buffer - 1]--;
-        SpinAcquire(BufMgrLock);
-        Assert(bufHdr->refcount > 0);
-        bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
-        bufHdr->CleanupFunc = CleanupFunc;
-        SpinRelease(BufMgrLock);
-        return;
-    }
-
-    SpinAcquire(BufMgrLock);
-    Assert(bufHdr->refcount > 0);
-    if (bufHdr->refcount == 1)
-    {
-        SpinRelease(BufMgrLock);
-        CleanupFunc(buffer);
-        CleanupFunc = NULL;
-    }
-    else
-        SpinRelease(BufMgrLock);
-
-    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-
-    SpinAcquire(BufMgrLock);
-    PrivateRefCount[buffer - 1] = 0;
-    Assert(bufHdr->refcount > 0);
-    bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
-    bufHdr->CleanupFunc = CleanupFunc;
-    bufHdr->refcount--;
-    if (bufHdr->refcount == 0)
-    {
-        AddBufferToFreelist(bufHdr);
-        bufHdr->flags |= BM_FREE;
-    }
-    SpinRelease(BufMgrLock);
-    return;
-}
-
 RelFileNode
 BufferGetFileNode(Buffer buffer)
 {
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index b88e1bbf715..84eded79509 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/storage/buffer/freelist.c,v 1.23 2001/01/24 19:43:06 momjian Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/storage/buffer/freelist.c,v 1.24 2001/07/06 21:04:26 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -29,14 +29,14 @@
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
+#include "storage/proc.h"
 
 static BufferDesc *SharedFreeList;
 
-/* only actually used in debugging.  The lock
- * should be acquired before calling the freelist manager.
+/*
+ * State-checking macros
  */
-extern SPINLOCK BufMgrLock;
 
 #define IsInQueue(bf) \
 ( \
@@ -45,7 +45,7 @@
     AssertMacro((bf->flags & BM_FREE)) \
 )
 
-#define NotInQueue(bf) \
+#define IsNotInQueue(bf) \
 ( \
     AssertMacro((bf->freeNext == INVALID_DESCRIPTOR)), \
     AssertMacro((bf->freePrev == INVALID_DESCRIPTOR)), \
@@ -61,14 +61,14 @@
  *      the manner in which buffers are added to the freelist queue.
  *      Currently, they are added on an LRU basis.
  */
-void
+static void
AddBufferToFreelist(BufferDesc *bf)
 {
 #ifdef BMTRACE
     _bm_trace(bf->tag.relId.dbId, bf->tag.relId.relId, bf->tag.blockNum,
               BufferDescriptorGetBuffer(bf), BMT_DEALLOC);
 #endif   /* BMTRACE */
-    NotInQueue(bf);
+    IsNotInQueue(bf);
 
     /* change bf so it points to inFrontOfNew and its successor */
     bf->freePrev = SharedFreeList->freePrev;
@@ -83,13 +83,14 @@ AddBufferToFreelist(BufferDesc *bf)
 
 /*
  * PinBuffer -- make buffer unavailable for replacement.
+ *
+ * This should be applied only to shared buffers, never local ones.
+ * Bufmgr lock must be held by caller.
  */
 void
 PinBuffer(BufferDesc *buf)
 {
-    long        b;
-
-    /* Assert (buf->refcount < 25); */
+    int         b = BufferDescriptorGetBuffer(buf) - 1;
 
     if (buf->refcount == 0)
     {
@@ -104,13 +105,12 @@ PinBuffer(BufferDesc *buf)
         buf->flags &= ~BM_FREE;
     }
     else
-        NotInQueue(buf);
+        IsNotInQueue(buf);
 
-    b = BufferDescriptorGetBuffer(buf) - 1;
-    Assert(PrivateRefCount[b] >= 0);
     if (PrivateRefCount[b] == 0)
         buf->refcount++;
     PrivateRefCount[b]++;
+    Assert(PrivateRefCount[b] > 0);
 }
 
 #ifdef NOT_USED
@@ -135,24 +135,35 @@ refcount = %ld, file: %s, line: %d\n",
 
 /*
  * UnpinBuffer -- make buffer available for replacement.
+ *
+ * This should be applied only to shared buffers, never local ones.
+ * Bufmgr lock must be held by caller.
  */
 void
 UnpinBuffer(BufferDesc *buf)
 {
-    long        b = BufferDescriptorGetBuffer(buf) - 1;
+    int         b = BufferDescriptorGetBuffer(buf) - 1;
 
+    IsNotInQueue(buf);
     Assert(buf->refcount > 0);
     Assert(PrivateRefCount[b] > 0);
     PrivateRefCount[b]--;
     if (PrivateRefCount[b] == 0)
         buf->refcount--;
-    NotInQueue(buf);
 
     if (buf->refcount == 0)
     {
+        /* buffer is now unpinned */
         AddBufferToFreelist(buf);
         buf->flags |= BM_FREE;
     }
+    else if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
+             buf->refcount == 1)
+    {
+        /* we just released the last pin other than the waiter's */
+        buf->flags &= ~BM_PIN_COUNT_WAITER;
+        ProcSendSignal(buf->wait_backend_id);
+    }
     else
     {
         /* do nothing */
     }
 }
@@ -179,18 +190,16 @@ refcount = %ld, file: %s, line: %d\n",
 
 /*
  * GetFreeBuffer() -- get the 'next' buffer from the freelist.
- *
 */
 BufferDesc *
-GetFreeBuffer()
+GetFreeBuffer(void)
 {
     BufferDesc *buf;
 
     if (Free_List_Descriptor == SharedFreeList->freeNext)
     {
-        /* queue is empty. All buffers in the buffer pool are pinned. */
-        elog(ERROR, "out of free buffers: time to abort !\n");
+        elog(ERROR, "out of free buffers: time to abort!");
         return NULL;
     }
     buf = &(BufferDescriptors[SharedFreeList->freeNext]);
@@ -220,7 +229,7 @@ InitFreeList(bool init)
 
     if (init)
     {
-        /* we only do this once, normally the postmaster */
+        /* we only do this once, normally in the postmaster */
        SharedFreeList->data = INVALID_OFFSET;
        SharedFreeList->flags = 0;
        SharedFreeList->flags &= ~(BM_VALID | BM_DELETED | BM_FREE);
@@ -249,37 +258,23 @@ DBG_FreeListCheck(int nfree)
     buf = &(BufferDescriptors[SharedFreeList->freeNext]);
     for (i = 0; i < nfree; i++, buf = &(BufferDescriptors[buf->freeNext]))
     {
-
         if (!(buf->flags & (BM_FREE)))
         {
             if (buf != SharedFreeList)
-            {
                 printf("\tfree list corrupted: %d flags %x\n",
                        buf->buf_id, buf->flags);
-            }
             else
-            {
                 printf("\tfree list corrupted: too short -- %d not %d\n",
                        i, nfree);
-
-            }
-
         }
         if ((BufferDescriptors[buf->freeNext].freePrev != buf->buf_id) ||
             (BufferDescriptors[buf->freePrev].freeNext != buf->buf_id))
-        {
             printf("\tfree list links corrupted: %d %ld %ld\n",
                    buf->buf_id, buf->freePrev, buf->freeNext);
-        }
-
     }
     if (buf != SharedFreeList)
-    {
         printf("\tfree list corrupted: %d-th buffer is %d\n",
                nfree, buf->buf_id);
-
-    }
 }
 #endif
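For context, here is how a cleanup operation such as concurrent VACUUM would
be expected to drive the new entry point.  This is an illustrative sketch
only, not code from this commit; vacuum_page_example and
compact_page_contents are hypothetical names standing in for VACUUM's real
page-cleanup logic.

    #include "postgres.h"
    #include "storage/bufmgr.h"
    #include "utils/rel.h"

    /* Hypothetical page-cleanup routine; real compaction logic omitted. */
    static void
    compact_page_contents(Buffer buf)
    {
        /* remove dead tuples and defragment free space here */
    }

    /* Hypothetical caller obeying README rule 5 via LockBufferForCleanup(). */
    static void
    vacuum_page_example(Relation rel, BlockNumber blkno)
    {
        Buffer      buf;

        /* Take our own pin; it must be the only pin we hold on this page. */
        buf = ReadBuffer(rel, blkno);

        /*
         * Sleeps until we hold the exclusive lock *and* have observed a
         * shared refcount of 1, i.e. no other backend can still hold
         * pointers into this page (README rule 5).  UnpinBuffer() sends
         * the wakeup signal when the count drops back to 1.
         */
        LockBufferForCleanup(buf);

        compact_page_contents(buf);     /* now safe to delete tuples / move data */

        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
        WriteBuffer(buf);               /* mark dirty and release our pin */
    }

Note the single-waiter limitation visible in the code above: a second such
caller on the same buffer would hit the "Multiple backends attempting to
wait for pincount 1" error.  Per the README, that is acceptable because
multiple concurrent VACUUMs are not allowed on a single relation anyway.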