aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/buffer/localbuf.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/buffer/localbuf.c')
-rw-r--r--src/backend/storage/buffer/localbuf.c61
1 files changed, 53 insertions, 8 deletions
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 3a722321533..bf89076bb10 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -18,6 +18,7 @@
#include "access/parallel.h"
#include "executor/instrument.h"
#include "pgstat.h"
+#include "storage/aio.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
@@ -187,7 +188,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
* Try to start an I/O operation. There currently are no reasons for
* StartLocalBufferIO to return false, so we raise an error in that case.
*/
- if (!StartLocalBufferIO(bufHdr, false))
+ if (!StartLocalBufferIO(bufHdr, false, false))
elog(ERROR, "failed to start write IO on local buffer");
/* Find smgr relation for buffer */
@@ -211,7 +212,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
IOOP_WRITE, io_start, 1, BLCKSZ);
/* Mark not-dirty */
- TerminateLocalBufferIO(bufHdr, true, 0);
+ TerminateLocalBufferIO(bufHdr, true, 0, false);
pgBufferUsage.local_blks_written++;
}
@@ -430,7 +431,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
/* no need to loop for local buffers */
- StartLocalBufferIO(existing_hdr, true);
+ StartLocalBufferIO(existing_hdr, true, false);
}
else
{
@@ -446,7 +447,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
hresult->id = victim_buf_id;
- StartLocalBufferIO(victim_buf_hdr, true);
+ StartLocalBufferIO(victim_buf_hdr, true, false);
}
}
@@ -515,13 +516,31 @@ MarkLocalBufferDirty(Buffer buffer)
* Like StartBufferIO, but for local buffers
*/
bool
-StartLocalBufferIO(BufferDesc *bufHdr, bool forInput)
+StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
{
- uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
+ uint32 buf_state;
+
+ /*
+ * With AIO the buffer could have IO in progress, e.g. when there are two
+ * scans of the same relation. Either wait for the other IO or return
+ * false.
+ */
+ if (pgaio_wref_valid(&bufHdr->io_wref))
+ {
+ PgAioWaitRef iow = bufHdr->io_wref;
+ if (nowait)
+ return false;
+
+ pgaio_wref_wait(&iow);
+ }
+
+ /* Once we get here, there is definitely no I/O active on this buffer */
+
+ /* Check if someone else already did the I/O */
+ buf_state = pg_atomic_read_u32(&bufHdr->state);
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
{
- /* someone else already did the I/O */
return false;
}
@@ -536,7 +555,8 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput)
* Like TerminateBufferIO, but for local buffers
*/
void
-TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits)
+TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits,
+ bool release_aio)
{
/* Only need to adjust flags */
uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
@@ -549,12 +569,22 @@ TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bit
if (clear_dirty)
buf_state &= ~BM_DIRTY;
+ if (release_aio)
+ {
+ /* release pin held by IO subsystem, see also buffer_stage_common() */
+ Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+ buf_state -= BUF_REFCOUNT_ONE;
+ pgaio_wref_clear(&bufHdr->io_wref);
+ }
+
buf_state |= set_flag_bits;
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
/* local buffers don't track IO using resowners */
/* local buffers don't use the IO CV, as no other process can see buffer */
+
+ /* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
}
/*
@@ -575,6 +605,19 @@ InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
uint32 buf_state;
LocalBufferLookupEnt *hresult;
+ /*
+ * It's possible that we started IO on this buffer before e.g. aborting
+ * the transaction that created a table. We need to wait for that IO to
+ * complete before removing / reusing the buffer.
+ */
+ if (pgaio_wref_valid(&bufHdr->io_wref))
+ {
+ PgAioWaitRef iow = bufHdr->io_wref;
+
+ pgaio_wref_wait(&iow);
+ Assert(!pgaio_wref_valid(&bufHdr->io_wref));
+ }
+
buf_state = pg_atomic_read_u32(&bufHdr->state);
/*
@@ -714,6 +757,8 @@ InitLocalBuffers(void)
*/
buf->buf_id = -i - 2;
+ pgaio_wref_clear(&buf->io_wref);
+
/*
* Intentionally do not initialize the buffer's atomic variable
* (besides zeroing the underlying memory above). That way we get