diff options
Diffstat (limited to 'src/backend/storage/buffer/localbuf.c')
-rw-r--r-- | src/backend/storage/buffer/localbuf.c | 61 |
1 file changed, 53 insertions, 8 deletions
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 3a722321533..bf89076bb10 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -18,6 +18,7 @@ #include "access/parallel.h" #include "executor/instrument.h" #include "pgstat.h" +#include "storage/aio.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/fd.h" @@ -187,7 +188,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln) * Try to start an I/O operation. There currently are no reasons for * StartLocalBufferIO to return false, so we raise an error in that case. */ - if (!StartLocalBufferIO(bufHdr, false)) + if (!StartLocalBufferIO(bufHdr, false, false)) elog(ERROR, "failed to start write IO on local buffer"); /* Find smgr relation for buffer */ @@ -211,7 +212,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln) IOOP_WRITE, io_start, 1, BLCKSZ); /* Mark not-dirty */ - TerminateLocalBufferIO(bufHdr, true, 0); + TerminateLocalBufferIO(bufHdr, true, 0, false); pgBufferUsage.local_blks_written++; } @@ -430,7 +431,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr, pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state); /* no need to loop for local buffers */ - StartLocalBufferIO(existing_hdr, true); + StartLocalBufferIO(existing_hdr, true, false); } else { @@ -446,7 +447,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr, hresult->id = victim_buf_id; - StartLocalBufferIO(victim_buf_hdr, true); + StartLocalBufferIO(victim_buf_hdr, true, false); } } @@ -515,13 +516,31 @@ MarkLocalBufferDirty(Buffer buffer) * Like StartBufferIO, but for local buffers */ bool -StartLocalBufferIO(BufferDesc *bufHdr, bool forInput) +StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait) { - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + uint32 buf_state; + + /* + * With AIO the buffer could have IO in progress, e.g. when there are two + * scans of the same relation. 
Either wait for the other IO or return + * false. + */ + if (pgaio_wref_valid(&bufHdr->io_wref)) + { + PgAioWaitRef iow = bufHdr->io_wref; + if (nowait) + return false; + + pgaio_wref_wait(&iow); + } + + /* Once we get here, there is definitely no I/O active on this buffer */ + + /* Check if someone else already did the I/O */ + buf_state = pg_atomic_read_u32(&bufHdr->state); if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) { - /* someone else already did the I/O */ return false; } @@ -536,7 +555,8 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput) * Like TerminateBufferIO, but for local buffers */ void -TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits) +TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits, + bool release_aio) { /* Only need to adjust flags */ uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); @@ -549,12 +569,22 @@ TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bit if (clear_dirty) buf_state &= ~BM_DIRTY; + if (release_aio) + { + /* release pin held by IO subsystem, see also buffer_stage_common() */ + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + buf_state -= BUF_REFCOUNT_ONE; + pgaio_wref_clear(&bufHdr->io_wref); + } + buf_state |= set_flag_bits; pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); /* local buffers don't track IO using resowners */ /* local buffers don't use the IO CV, as no other process can see buffer */ + + /* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */ } /* @@ -575,6 +605,19 @@ InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced) uint32 buf_state; LocalBufferLookupEnt *hresult; + /* + * It's possible that we started IO on this buffer before e.g. aborting + * the transaction that created a table. We need to wait for that IO to + * complete before removing / reusing the buffer. 
+ */ + if (pgaio_wref_valid(&bufHdr->io_wref)) + { + PgAioWaitRef iow = bufHdr->io_wref; + + pgaio_wref_wait(&iow); + Assert(!pgaio_wref_valid(&bufHdr->io_wref)); + } + buf_state = pg_atomic_read_u32(&bufHdr->state); /* @@ -714,6 +757,8 @@ InitLocalBuffers(void) */ buf->buf_id = -i - 2; + pgaio_wref_clear(&buf->io_wref); + /* * Intentionally do not initialize the buffer's atomic variable * (besides zeroing the underlying memory above). That way we get |