aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/buffer/bufmgr.c
diff options
context:
space:
mode:
authorAndres Freund <andres@anarazel.de>2025-03-30 17:28:03 -0400
committerAndres Freund <andres@anarazel.de>2025-03-30 17:28:03 -0400
commit047cba7fa0f8c6930b0dd1d93d98c736ef1e4a5a (patch)
tree29feb0de9ab74851134110b04486700444fa3852 /src/backend/storage/buffer/bufmgr.c
parentef64fe26bad92a7b8425767cdbbe8b946d4637f0 (diff)
downloadpostgresql-047cba7fa0f8c6930b0dd1d93d98c736ef1e4a5a.tar.gz
postgresql-047cba7fa0f8c6930b0dd1d93d98c736ef1e4a5a.zip
bufmgr: Implement AIO read support
This commit implements the infrastructure to perform asynchronous reads into the buffer pool. To do so, it: - Adds readv AIO callbacks for shared and local buffers It may be worth calling out that shared buffer completions may be run in a different backend than where the IO started. - Adds an AIO wait reference to BufferDesc, to allow backends to wait for in-progress asynchronous IOs - Adapts StartBufferIO(), WaitIO(), TerminateBufferIO(), and their localbuf.c equivalents, to be able to deal with AIO - Moves the code to handle BM_PIN_COUNT_WAITER into a helper function, as it now also needs to be called on IO completion As of this commit, nothing issues AIO on shared/local buffers. A future commit will update StartReadBuffers() to do so. Buffer reads executed through this infrastructure will report invalid page / checksum errors / warnings differently than before: In the error case the error message will cover all the blocks that were included in the read, rather than just the reporting the first invalid block. If more than one block is invalid, the error will include information about the range of the read, the first invalid block and the number of invalid pages, with a HINT towards the server log for per-block details. For the warning case (i.e. zero_damaged_buffers) we would previously emit one warning message for each buffer in a multi-block read. Now there is only a single warning message for the entire read, again referring to the server log for more details in case of multiple checksum failures within a single larger read. Reviewed-by: Noah Misch <noah@leadboat.com> Reviewed-by: Melanie Plageman <melanieplageman@gmail.com> Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com> Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m
Diffstat (limited to 'src/backend/storage/buffer/bufmgr.c')
-rw-r--r--src/backend/storage/buffer/bufmgr.c841
1 files changed, 795 insertions, 46 deletions
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 3b66c5c6b4c..4fea8ebf482 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -48,6 +48,7 @@
#include "pg_trace.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
+#include "storage/aio.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
@@ -519,7 +520,8 @@ static int SyncOneBuffer(int buf_id, bool skip_recently_used,
static void WaitIO(BufferDesc *buf);
static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
- uint32 set_flag_bits, bool forget_owner);
+ uint32 set_flag_bits, bool forget_owner,
+ bool release_aio);
static void AbortBufferIO(Buffer buffer);
static void shared_buffer_write_error_callback(void *arg);
static void local_buffer_write_error_callback(void *arg);
@@ -1041,7 +1043,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
{
/* Simple case for non-shared buffers. */
bufHdr = GetLocalBufferDescriptor(-buffer - 1);
- need_to_zero = StartLocalBufferIO(bufHdr, true);
+ need_to_zero = StartLocalBufferIO(bufHdr, true, false);
}
else
{
@@ -1077,9 +1079,9 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
/* Set BM_VALID, terminate IO, and wake up any waiters */
if (isLocalBuf)
- TerminateLocalBufferIO(bufHdr, false, BM_VALID);
+ TerminateLocalBufferIO(bufHdr, false, BM_VALID, false);
else
- TerminateBufferIO(bufHdr, false, BM_VALID, true);
+ TerminateBufferIO(bufHdr, false, BM_VALID, true, false);
}
else if (!isLocalBuf)
{
@@ -1454,7 +1456,8 @@ static inline bool
WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
{
if (BufferIsLocal(buffer))
- return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), true);
+ return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
+ true, nowait);
else
return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
}
@@ -1619,9 +1622,9 @@ WaitReadBuffers(ReadBuffersOperation *operation)
/* Set BM_VALID, terminate IO, and wake up any waiters */
if (persistence == RELPERSISTENCE_TEMP)
- TerminateLocalBufferIO(bufHdr, false, BM_VALID);
+ TerminateLocalBufferIO(bufHdr, false, BM_VALID, false);
else
- TerminateBufferIO(bufHdr, false, BM_VALID, true);
+ TerminateBufferIO(bufHdr, false, BM_VALID, true, false);
/* Report I/Os as completing individually. */
TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
@@ -1883,13 +1886,14 @@ retry:
}
/*
- * We assume the only reason for it to be pinned is that someone else is
- * flushing the page out. Wait for them to finish. (This could be an
- * infinite loop if the refcount is messed up... it would be nice to time
- * out after awhile, but there seems no way to be sure how many loops may
- * be needed. Note that if the other guy has pinned the buffer but not
- * yet done StartBufferIO, WaitIO will fall through and we'll effectively
- * be busy-looping here.)
+ * We assume the reason for it to be pinned is that either we were
+ * asynchronously reading the page in before erroring out or someone else
+ * is flushing the page out. Wait for the IO to finish. (This could be
+ * an infinite loop if the refcount is messed up... it would be nice to
+ * time out after awhile, but there seems no way to be sure how many loops
+ * may be needed. Note that if the other guy has pinned the buffer but
+ * not yet done StartBufferIO, WaitIO will fall through and we'll
+ * effectively be busy-looping here.)
*/
if (BUF_STATE_GET_REFCOUNT(buf_state) != 0)
{
@@ -2529,7 +2533,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
if (lock)
LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_EXCLUSIVE);
- TerminateBufferIO(buf_hdr, false, BM_VALID, true);
+ TerminateBufferIO(buf_hdr, false, BM_VALID, true, false);
}
pgBufferUsage.shared_blks_written += extend_by;
@@ -2876,6 +2880,44 @@ PinBuffer_Locked(BufferDesc *buf)
}
/*
+ * Support for waking up another backend that is waiting for the cleanup lock
+ * to be released using BM_PIN_COUNT_WAITER.
+ *
+ * See LockBufferForCleanup().
+ *
+ * Expected to be called just after releasing a buffer pin (in a BufferDesc,
+ * not just reducing the backend-local pincount for the buffer).
+ */
+static void
+WakePinCountWaiter(BufferDesc *buf)
+{
+ /*
+ * Acquire the buffer header lock, re-check that there's a waiter. Another
+ * backend could have unpinned this buffer, and already woken up the
+ * waiter.
+ *
+ * There's no danger of the buffer being replaced after we unpinned it
+ * above, as it's pinned by the waiter. The waiter removes
+ * BM_PIN_COUNT_WAITER if it stops waiting for a reason other than this
+ * backend waking it up.
+ */
+ uint32 buf_state = LockBufHdr(buf);
+
+ if ((buf_state & BM_PIN_COUNT_WAITER) &&
+ BUF_STATE_GET_REFCOUNT(buf_state) == 1)
+ {
+ /* we just released the last pin other than the waiter's */
+ int wait_backend_pgprocno = buf->wait_backend_pgprocno;
+
+ buf_state &= ~BM_PIN_COUNT_WAITER;
+ UnlockBufHdr(buf, buf_state);
+ ProcSendSignal(wait_backend_pgprocno);
+ }
+ else
+ UnlockBufHdr(buf, buf_state);
+}
+
+/*
* UnpinBuffer -- make buffer available for replacement.
*
* This should be applied only to shared buffers, never local ones. This
@@ -2943,29 +2985,8 @@ UnpinBufferNoOwner(BufferDesc *buf)
/* Support LockBufferForCleanup() */
if (buf_state & BM_PIN_COUNT_WAITER)
- {
- /*
- * Acquire the buffer header lock, re-check that there's a waiter.
- * Another backend could have unpinned this buffer, and already
- * woken up the waiter. There's no danger of the buffer being
- * replaced after we unpinned it above, as it's pinned by the
- * waiter.
- */
- buf_state = LockBufHdr(buf);
+ WakePinCountWaiter(buf);
- if ((buf_state & BM_PIN_COUNT_WAITER) &&
- BUF_STATE_GET_REFCOUNT(buf_state) == 1)
- {
- /* we just released the last pin other than the waiter's */
- int wait_backend_pgprocno = buf->wait_backend_pgprocno;
-
- buf_state &= ~BM_PIN_COUNT_WAITER;
- UnlockBufHdr(buf, buf_state);
- ProcSendSignal(wait_backend_pgprocno);
- }
- else
- UnlockBufHdr(buf, buf_state);
- }
ForgetPrivateRefCountEntry(ref);
}
}
@@ -3986,7 +4007,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
* Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
* end the BM_IO_IN_PROGRESS state.
*/
- TerminateBufferIO(buf, true, 0, true);
+ TerminateBufferIO(buf, true, 0, true, false);
TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&buf->tag),
buf->tag.blockNum,
@@ -5256,6 +5277,13 @@ LockBufferForCleanup(Buffer buffer)
CheckBufferIsPinnedOnce(buffer);
+ /*
+ * We do not yet need to be worried about in-progress AIOs holding a pin,
+ * as we, so far, only support doing reads via AIO and this function can
+ * only be called once the buffer is valid (i.e. no read can be in
+ * flight).
+ */
+
/* Nobody else to wait for */
if (BufferIsLocal(buffer))
return;
@@ -5413,6 +5441,8 @@ ConditionalLockBufferForCleanup(Buffer buffer)
Assert(BufferIsValid(buffer));
+ /* see AIO related comment in LockBufferForCleanup() */
+
if (BufferIsLocal(buffer))
{
refcount = LocalRefCount[-buffer - 1];
@@ -5468,6 +5498,8 @@ IsBufferCleanupOK(Buffer buffer)
Assert(BufferIsValid(buffer));
+ /* see AIO related comment in LockBufferForCleanup() */
+
if (BufferIsLocal(buffer))
{
/* There should be exactly one pin */
@@ -5520,6 +5552,7 @@ WaitIO(BufferDesc *buf)
for (;;)
{
uint32 buf_state;
+ PgAioWaitRef iow;
/*
* It may not be necessary to acquire the spinlock to check the flag
@@ -5527,10 +5560,40 @@ WaitIO(BufferDesc *buf)
* play it safe.
*/
buf_state = LockBufHdr(buf);
+
+ /*
+ * Copy the wait reference while holding the spinlock. This protects
+ * against a concurrent TerminateBufferIO() in another backend from
+ * clearing the wref while it's being read.
+ */
+ iow = buf->io_wref;
UnlockBufHdr(buf, buf_state);
+ /* no IO in progress, we don't need to wait */
if (!(buf_state & BM_IO_IN_PROGRESS))
break;
+
+ /*
+ * The buffer has asynchronous IO in progress, wait for it to
+ * complete.
+ */
+ if (pgaio_wref_valid(&iow))
+ {
+ pgaio_wref_wait(&iow);
+
+ /*
+ * The AIO subsystem internally uses condition variables and thus
+ * might remove this backend from the BufferDesc's CV. While that
+ * wouldn't cause a correctness issue (the first CV sleep just
+ * immediately returns if not already registered), it seems worth
+ * avoiding unnecessary loop iterations, given that we take care
+ * to do so at the start of the function.
+ */
+ ConditionVariablePrepareToSleep(cv);
+ continue;
+ }
+
+ /* wait on BufferDesc->cv, e.g. for concurrent synchronous IO */
ConditionVariableSleep(cv, WAIT_EVENT_BUFFER_IO);
}
ConditionVariableCancelSleep();
@@ -5539,13 +5602,12 @@ WaitIO(BufferDesc *buf)
/*
* StartBufferIO: begin I/O on this buffer
* (Assumptions)
- * My process is executing no IO
+ * My process is executing no IO on this buffer
* The buffer is Pinned
*
- * In some scenarios there are race conditions in which multiple backends
- * could attempt the same I/O operation concurrently. If someone else
- * has already started I/O on this buffer then we will block on the
- * I/O condition variable until he's done.
+ * In some scenarios multiple backends could attempt the same I/O operation
+ * concurrently. If someone else has already started I/O on this buffer then
+ * we will wait for completion of the IO using WaitIO().
*
* Input operations are only attempted on buffers that are not BM_VALID,
* and output operations only on buffers that are BM_VALID and BM_DIRTY,
@@ -5581,9 +5643,9 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
/* Once we get here, there is definitely no I/O active on this buffer */
+ /* Check if someone else already did the I/O */
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
{
- /* someone else already did the I/O */
UnlockBufHdr(buf, buf_state);
return false;
}
@@ -5619,7 +5681,7 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
*/
static void
TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
- bool forget_owner)
+ bool forget_owner, bool release_aio)
{
uint32 buf_state;
@@ -5634,6 +5696,14 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
+ if (release_aio)
+ {
+ /* release ownership by the AIO subsystem */
+ Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+ buf_state -= BUF_REFCOUNT_ONE;
+ pgaio_wref_clear(&buf->io_wref);
+ }
+
buf_state |= set_flag_bits;
UnlockBufHdr(buf, buf_state);
@@ -5642,6 +5712,17 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
BufferDescriptorGetBuffer(buf));
ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
+
+ /*
+ * Support LockBufferForCleanup()
+ *
+ * We may have just released the last pin other than the waiter's. In most
+ * cases, this backend holds another pin on the buffer. But, if, for
+ * example, this backend is completing an IO issued by another backend, it
+ * may be time to wake the waiter.
+ */
+ if (release_aio && (buf_state & BM_PIN_COUNT_WAITER))
+ WakePinCountWaiter(buf);
}
/*
@@ -5690,7 +5771,7 @@ AbortBufferIO(Buffer buffer)
}
}
- TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false);
+ TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false, false);
}
/*
@@ -6141,3 +6222,671 @@ EvictUnpinnedBuffer(Buffer buf)
return result;
}
+
+/*
+ * Generic implementation of the AIO handle staging callback for readv/writev
+ * on local/shared buffers.
+ *
+ * Each readv/writev can target multiple buffers. The buffers have already
+ * been registered with the IO handle.
+ *
+ * To make the IO ready for execution ("staging"), we need to ensure that the
+ * targeted buffers are in an appropriate state while the IO is ongoing. For
+ * that the AIO subsystem needs to have its own buffer pin, otherwise an error
+ * in this backend could lead to this backend's buffer pin being released as
+ * part of error handling, which in turn could lead to the buffer being
+ * replaced while IO is ongoing.
+ */
+static pg_attribute_always_inline void
+buffer_stage_common(PgAioHandle *ioh, bool is_write, bool is_temp)
+{
+ uint64 *io_data;
+ uint8 handle_data_len;
+ PgAioWaitRef io_ref;
+ BufferTag first PG_USED_FOR_ASSERTS_ONLY = {0};
+
+ io_data = pgaio_io_get_handle_data(ioh, &handle_data_len);
+
+ pgaio_io_get_wref(ioh, &io_ref);
+
+ /* iterate over all buffers affected by the vectored readv/writev */
+ for (int i = 0; i < handle_data_len; i++)
+ {
+ Buffer buffer = (Buffer) io_data[i];
+ BufferDesc *buf_hdr = is_temp ?
+ GetLocalBufferDescriptor(-buffer - 1)
+ : GetBufferDescriptor(buffer - 1);
+ uint32 buf_state;
+
+ /*
+ * Check that all the buffers are actually ones that could conceivably
+ * be done in one IO, i.e. are sequential. This is the last
+ * buffer-aware code before IO is actually executed and confusion
+ * about which buffers are targeted by IO can be hard to debug, making
+ * it worth doing extra-paranoid checks.
+ */
+ if (i == 0)
+ first = buf_hdr->tag;
+ else
+ {
+ Assert(buf_hdr->tag.relNumber == first.relNumber);
+ Assert(buf_hdr->tag.blockNum == first.blockNum + i);
+ }
+
+ if (is_temp)
+ buf_state = pg_atomic_read_u32(&buf_hdr->state);
+ else
+ buf_state = LockBufHdr(buf_hdr);
+
+ /* verify the buffer is in the expected state */
+ Assert(buf_state & BM_TAG_VALID);
+ if (is_write)
+ {
+ Assert(buf_state & BM_VALID);
+ Assert(buf_state & BM_DIRTY);
+ }
+ else
+ {
+ Assert(!(buf_state & BM_VALID));
+ Assert(!(buf_state & BM_DIRTY));
+ }
+
+ /* temp buffers don't use BM_IO_IN_PROGRESS */
+ if (!is_temp)
+ Assert(buf_state & BM_IO_IN_PROGRESS);
+
+ Assert(BUF_STATE_GET_REFCOUNT(buf_state) >= 1);
+
+ /*
+ * Reflect that the buffer is now owned by the AIO subsystem.
+ *
+ * For local buffers: This can't be done just via LocalRefCount, as
+ * one might initially think, as this backend could error out while
+ * AIO is still in progress, releasing all the pins by the backend
+ * itself.
+ *
+ * This pin is released again in TerminateBufferIO().
+ */
+ buf_state += BUF_REFCOUNT_ONE;
+ buf_hdr->io_wref = io_ref;
+
+ if (is_temp)
+ pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+ else
+ UnlockBufHdr(buf_hdr, buf_state);
+
+ /*
+ * Ensure the content lock that prevents buffer modifications while
+ * the buffer is being written out is not released early due to an
+ * error.
+ */
+ if (is_write && !is_temp)
+ {
+ LWLock *content_lock;
+
+ content_lock = BufferDescriptorGetContentLock(buf_hdr);
+
+ Assert(LWLockHeldByMe(content_lock));
+
+ /*
+ * Lock is now owned by AIO subsystem.
+ */
+ LWLockDisown(content_lock);
+ }
+
+ /*
+ * Stop tracking this buffer via the resowner - the AIO system now
+ * keeps track.
+ */
+ if (!is_temp)
+ ResourceOwnerForgetBufferIO(CurrentResourceOwner, buffer);
+ }
+}
+
+/*
+ * Decode readv errors as encoded by buffer_readv_encode_error().
+ */
+static inline void
+buffer_readv_decode_error(PgAioResult result,
+ bool *zeroed_any,
+ bool *ignored_any,
+ uint8 *zeroed_or_error_count,
+ uint8 *checkfail_count,
+ uint8 *first_off)
+{
+ uint32 rem_error = result.error_data;
+
+ /* see static asserts in buffer_readv_encode_error */
+#define READV_COUNT_BITS 7
+#define READV_COUNT_MASK ((1 << READV_COUNT_BITS) - 1)
+
+ *zeroed_any = rem_error & 1;
+ rem_error >>= 1;
+
+ *ignored_any = rem_error & 1;
+ rem_error >>= 1;
+
+ *zeroed_or_error_count = rem_error & READV_COUNT_MASK;
+ rem_error >>= READV_COUNT_BITS;
+
+ *checkfail_count = rem_error & READV_COUNT_MASK;
+ rem_error >>= READV_COUNT_BITS;
+
+ *first_off = rem_error & READV_COUNT_MASK;
+ rem_error >>= READV_COUNT_BITS;
+}
+
+/*
+ * Helper to encode errors for buffer_readv_complete()
+ *
+ * Errors are encoded as follows:
+ * - bit 0 indicates whether any page was zeroed (1) or not (0)
+ * - bit 1 indicates whether any checksum failure was ignored (1) or not (0)
+ * - next READV_COUNT_BITS bits indicate the number of errored or zeroed pages
+ * - next READV_COUNT_BITS bits indicate the number of checksum failures
+ * - next READV_COUNT_BITS bits indicate the first offset of the first page
+ * that was errored or zeroed or, if no errors/zeroes, the first ignored
+ * checksum
+ */
+static inline void
+buffer_readv_encode_error(PgAioResult *result,
+ bool is_temp,
+ bool zeroed_any,
+ bool ignored_any,
+ uint8 error_count,
+ uint8 zeroed_count,
+ uint8 checkfail_count,
+ uint8 first_error_off,
+ uint8 first_zeroed_off,
+ uint8 first_ignored_off)
+{
+
+ uint8 shift = 0;
+ uint8 zeroed_or_error_count =
+ error_count > 0 ? error_count : zeroed_count;
+ uint8 first_off;
+
+ StaticAssertStmt(PG_IOV_MAX <= 1 << READV_COUNT_BITS,
+ "PG_IOV_MAX is bigger than reserved space for error data");
+ StaticAssertStmt((1 + 1 + 3 * READV_COUNT_BITS) <= PGAIO_RESULT_ERROR_BITS,
+ "PGAIO_RESULT_ERROR_BITS is insufficient for buffer_readv");
+
+ /*
+ * We only have space to encode one offset - but luckily that's good
+ * enough. If there is an error, the error is the interesting offset, same
+ * with a zeroed buffer vs an ignored buffer.
+ */
+ if (error_count > 0)
+ first_off = first_error_off;
+ else if (zeroed_count > 0)
+ first_off = first_zeroed_off;
+ else
+ first_off = first_ignored_off;
+
+ Assert(!zeroed_any || error_count == 0);
+
+ result->error_data = 0;
+
+ result->error_data |= zeroed_any << shift;
+ shift += 1;
+
+ result->error_data |= ignored_any << shift;
+ shift += 1;
+
+ result->error_data |= ((uint32) zeroed_or_error_count) << shift;
+ shift += READV_COUNT_BITS;
+
+ result->error_data |= ((uint32) checkfail_count) << shift;
+ shift += READV_COUNT_BITS;
+
+ result->error_data |= ((uint32) first_off) << shift;
+ shift += READV_COUNT_BITS;
+
+ result->id = is_temp ? PGAIO_HCB_LOCAL_BUFFER_READV :
+ PGAIO_HCB_SHARED_BUFFER_READV;
+
+ if (error_count > 0)
+ result->status = PGAIO_RS_ERROR;
+ else
+ result->status = PGAIO_RS_WARNING;
+
+ /*
+ * The encoding is complicated enough to warrant cross-checking it against
+ * the decode function.
+ */
+#ifdef USE_ASSERT_CHECKING
+ {
+ bool zeroed_any_2,
+ ignored_any_2;
+ uint8 zeroed_or_error_count_2,
+ checkfail_count_2,
+ first_off_2;
+
+ buffer_readv_decode_error(*result,
+ &zeroed_any_2, &ignored_any_2,
+ &zeroed_or_error_count_2,
+ &checkfail_count_2,
+ &first_off_2);
+ Assert(zeroed_any == zeroed_any_2);
+ Assert(ignored_any == ignored_any_2);
+ Assert(zeroed_or_error_count == zeroed_or_error_count_2);
+ Assert(checkfail_count == checkfail_count_2);
+ Assert(first_off == first_off_2);
+ }
+#endif
+
+#undef READV_COUNT_BITS
+#undef READV_COUNT_MASK
+}
+
+/*
+ * Helper for AIO readv completion callbacks, supporting both shared and temp
+ * buffers. Gets called once for each buffer in a multi-page read.
+ */
+static pg_attribute_always_inline void
+buffer_readv_complete_one(PgAioTargetData *td, uint8 buf_off, Buffer buffer,
+ uint8 flags, bool failed, bool is_temp,
+ bool *buffer_invalid,
+ bool *failed_checksum,
+ bool *ignored_checksum,
+ bool *zeroed_buffer)
+{
+ BufferDesc *buf_hdr = is_temp ?
+ GetLocalBufferDescriptor(-buffer - 1)
+ : GetBufferDescriptor(buffer - 1);
+ BufferTag tag = buf_hdr->tag;
+ char *bufdata = BufferGetBlock(buffer);
+ uint32 set_flag_bits;
+ int piv_flags;
+
+ /* check that the buffer is in the expected state for a read */
+#ifdef USE_ASSERT_CHECKING
+ {
+ uint32 buf_state = pg_atomic_read_u32(&buf_hdr->state);
+
+ Assert(buf_state & BM_TAG_VALID);
+ Assert(!(buf_state & BM_VALID));
+ /* temp buffers don't use BM_IO_IN_PROGRESS */
+ if (!is_temp)
+ Assert(buf_state & BM_IO_IN_PROGRESS);
+ Assert(!(buf_state & BM_DIRTY));
+ }
+#endif
+
+ *buffer_invalid = false;
+ *failed_checksum = false;
+ *ignored_checksum = false;
+ *zeroed_buffer = false;
+
+ /*
+ * We ask PageIsVerified() to only log the message about checksum errors,
+ * as the completion might be run in any backend (or IO workers). We will
+ * report checksum errors in buffer_readv_report().
+ */
+ piv_flags = PIV_LOG_LOG;
+
+ /* the local zero_damaged_pages may differ from the definer's */
+ if (flags & READ_BUFFERS_IGNORE_CHECKSUM_FAILURES)
+ piv_flags |= PIV_IGNORE_CHECKSUM_FAILURE;
+
+ /* Check for garbage data. */
+ if (!failed)
+ {
+ PgAioResult result_one;
+
+ if (!PageIsVerified((Page) bufdata, tag.blockNum, piv_flags,
+ failed_checksum))
+ {
+ if (flags & READ_BUFFERS_ZERO_ON_ERROR)
+ {
+ memset(bufdata, 0, BLCKSZ);
+ *zeroed_buffer = true;
+ }
+ else
+ {
+ *buffer_invalid = true;
+ /* mark buffer as having failed */
+ failed = true;
+ }
+ }
+ else if (*failed_checksum)
+ *ignored_checksum = true;
+
+ /*
+ * Immediately log a message about the invalid page, but only to the
+ * server log. The reason to do so immediately is that this may be
+ * executed in a different backend than the one that originated the
+ * request. The reason to do so immediately is that the originator
+ * might not process the query result immediately (because it is busy
+ * doing another part of query processing) or at all (e.g. if it was
+ * cancelled or errored out due to another IO also failing). The
+ * definer of the IO will emit an ERROR or WARNING when processing the
+ * IO's results
+ *
+ * To avoid duplicating the code to emit these log messages, we reuse
+ * buffer_readv_report().
+ */
+ if (*buffer_invalid || *failed_checksum || *zeroed_buffer)
+ {
+ buffer_readv_encode_error(&result_one, is_temp,
+ *zeroed_buffer,
+ *ignored_checksum,
+ *buffer_invalid,
+ *zeroed_buffer ? 1 : 0,
+ *failed_checksum ? 1 : 0,
+ buf_off, buf_off, buf_off);
+ pgaio_result_report(result_one, td, LOG_SERVER_ONLY);
+ }
+ }
+
+ /* Terminate I/O and set BM_VALID. */
+ set_flag_bits = failed ? BM_IO_ERROR : BM_VALID;
+ if (is_temp)
+ TerminateLocalBufferIO(buf_hdr, false, set_flag_bits, true);
+ else
+ TerminateBufferIO(buf_hdr, false, set_flag_bits, false, true);
+
+ /*
+ * Call the BUFFER_READ_DONE tracepoint in the callback, even though the
+ * callback may not be executed in the same backend that called
+ * BUFFER_READ_START. The alternative would be to defer calling the
+ * tracepoint to a later point (e.g. the local completion callback for
+ * shared buffer reads), which seems even less helpful.
+ */
+ TRACE_POSTGRESQL_BUFFER_READ_DONE(tag.forkNum,
+ tag.blockNum,
+ tag.spcOid,
+ tag.dbOid,
+ tag.relNumber,
+ is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
+ false);
+}
+
+/*
+ * Perform completion handling of a single AIO read. This read may cover
+ * multiple blocks / buffers.
+ *
+ * Shared between shared and local buffers, to reduce code duplication.
+ */
+static pg_attribute_always_inline PgAioResult
+buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
+ uint8 cb_data, bool is_temp)
+{
+ PgAioResult result = prior_result;
+ PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+ uint8 first_error_off = 0;
+ uint8 first_zeroed_off = 0;
+ uint8 first_ignored_off = 0;
+ uint8 error_count = 0;
+ uint8 zeroed_count = 0;
+ uint8 ignored_count = 0;
+ uint8 checkfail_count = 0;
+ uint64 *io_data;
+ uint8 handle_data_len;
+
+ if (is_temp)
+ {
+ Assert(td->smgr.is_temp);
+ Assert(pgaio_io_get_owner(ioh) == MyProcNumber);
+ }
+ else
+ Assert(!td->smgr.is_temp);
+
+ /*
+ * Iterate over all the buffers affected by this IO and call the
+ * per-buffer completion function for each buffer.
+ */
+ io_data = pgaio_io_get_handle_data(ioh, &handle_data_len);
+ for (uint8 buf_off = 0; buf_off < handle_data_len; buf_off++)
+ {
+ Buffer buf = io_data[buf_off];
+ bool failed;
+ bool failed_verification = false;
+ bool failed_checksum = false;
+ bool zeroed_buffer = false;
+ bool ignored_checksum = false;
+
+ Assert(BufferIsValid(buf));
+
+ /*
+ * If the entire I/O failed on a lower-level, each buffer needs to be
+ * marked as failed. In case of a partial read, the first few buffers
+ * may be ok.
+ */
+ failed =
+ prior_result.status == PGAIO_RS_ERROR
+ || prior_result.result <= buf_off;
+
+ buffer_readv_complete_one(td, buf_off, buf, cb_data, failed, is_temp,
+ &failed_verification,
+ &failed_checksum,
+ &ignored_checksum,
+ &zeroed_buffer);
+
+ /*
+ * Track information about the number of different kinds of error
+ * conditions across all pages, as there can be multiple pages failing
+ * verification as part of one IO.
+ */
+ if (failed_verification && !zeroed_buffer && error_count++ == 0)
+ first_error_off = buf_off;
+ if (zeroed_buffer && zeroed_count++ == 0)
+ first_zeroed_off = buf_off;
+ if (ignored_checksum && ignored_count++ == 0)
+ first_ignored_off = buf_off;
+ if (failed_checksum)
+ checkfail_count++;
+ }
+
+ /*
+ * If the smgr read succeeded [partially] and page verification failed for
+ * some of the pages, adjust the IO's result state appropriately.
+ */
+ if (prior_result.status != PGAIO_RS_ERROR &&
+ (error_count > 0 || ignored_count > 0 || zeroed_count > 0))
+ {
+ buffer_readv_encode_error(&result, is_temp,
+ zeroed_count > 0, ignored_count > 0,
+ error_count, zeroed_count, checkfail_count,
+ first_error_off, first_zeroed_off,
+ first_ignored_off);
+ pgaio_result_report(result, td, DEBUG1);
+ }
+
+ /*
+ * For shared relations this reporting is done in
+ * shared_buffer_readv_complete_local().
+ */
+ if (is_temp && checkfail_count > 0)
+ pgstat_report_checksum_failures_in_db(td->smgr.rlocator.dbOid,
+ checkfail_count);
+
+ return result;
+}
+
+/*
+ * AIO error reporting callback for aio_shared_buffer_readv_cb and
+ * aio_local_buffer_readv_cb.
+ *
+ * The error is encoded / decoded in buffer_readv_encode_error() /
+ * buffer_readv_decode_error().
+ */
+static void
+buffer_readv_report(PgAioResult result, const PgAioTargetData *td,
+ int elevel)
+{
+ int nblocks = td->smgr.nblocks;
+ BlockNumber first = td->smgr.blockNum;
+ BlockNumber last = first + nblocks - 1;
+ ProcNumber errProc =
+ td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER;
+ RelPathStr rpath =
+ relpathbackend(td->smgr.rlocator, errProc, td->smgr.forkNum);
+ bool zeroed_any,
+ ignored_any;
+ uint8 zeroed_or_error_count,
+ checkfail_count,
+ first_off;
+ uint8 affected_count;
+ const char *msg_one,
+ *msg_mult,
+ *det_mult,
+ *hint_mult;
+
+ buffer_readv_decode_error(result, &zeroed_any, &ignored_any,
+ &zeroed_or_error_count,
+ &checkfail_count,
+ &first_off);
+
+ /*
+ * Treat a read that had both zeroed buffers *and* ignored checksums as a
+ * special case, it's too irregular to be emitted the same way as the
+ * other cases.
+ */
+ if (zeroed_any && ignored_any)
+ {
+ Assert(zeroed_any && ignored_any);
+ Assert(nblocks > 1); /* same block can't be both zeroed and ignored */
+ Assert(result.status != PGAIO_RS_ERROR);
+ affected_count = zeroed_or_error_count;
+
+ ereport(elevel,
+ errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("zeroing %u page(s) and ignoring %u checksum failure(s) among blocks %u..%u of relation %s",
+ affected_count, checkfail_count, first, last, rpath.str),
+ affected_count > 1 ?
+ errdetail("Block %u held first zeroed page.",
+ first + first_off) : 0,
+ errhint("See server log for details about the other %u invalid block(s).",
+ affected_count + checkfail_count - 1));
+ return;
+ }
+
+ /*
+ * The other messages are highly repetitive. To avoid duplicating a long
+ * and complicated ereport(), gather the translated format strings
+ * separately and then do one common ereport.
+ */
+ if (result.status == PGAIO_RS_ERROR)
+ {
+ Assert(!zeroed_any); /* can't have invalid pages when zeroing them */
+ affected_count = zeroed_or_error_count;
+ msg_one = _("invalid page in block %u of relation %s");
+ msg_mult = _("%u invalid pages among blocks %u..%u of relation %s");
+ det_mult = _("Block %u held first invalid page.");
+ hint_mult = _("See server log for the other %u invalid block(s).");
+ }
+ else if (zeroed_any && !ignored_any)
+ {
+ affected_count = zeroed_or_error_count;
+ msg_one = _("invalid page in block %u of relation %s; zeroing out page");
+ msg_mult = _("zeroing out %u invalid pages among blocks %u..%u of relation %s");
+ det_mult = _("Block %u held first zeroed page.");
+ hint_mult = _("See server log for the other %u zeroed block(s).");
+ }
+ else if (!zeroed_any && ignored_any)
+ {
+ affected_count = checkfail_count;
+ msg_one = _("ignoring checksum failure in block %u of relation %s");
+ msg_mult = _("ignoring %u checksum failures among blocks %u..%u of relation %s");
+ det_mult = _("Block %u held first ignored page.");
+ hint_mult = _("See server log for the other %u ignored block(s).");
+ }
+ else
+ pg_unreachable();
+
+ ereport(elevel,
+ errcode(ERRCODE_DATA_CORRUPTED),
+ affected_count == 1 ?
+ errmsg_internal(msg_one, first + first_off, rpath.str) :
+ errmsg_internal(msg_mult, affected_count, first, last, rpath.str),
+ affected_count > 1 ? errdetail_internal(det_mult, first + first_off) : 0,
+ affected_count > 1 ? errhint_internal(hint_mult, affected_count - 1) : 0);
+}
+
+static void
+shared_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
+{
+ buffer_stage_common(ioh, false, false);
+}
+
+static PgAioResult
+shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
+ uint8 cb_data)
+{
+ return buffer_readv_complete(ioh, prior_result, cb_data, false);
+}
+
+/*
+ * We need a backend-local completion callback for shared buffers, to be able
+ * to report checksum errors correctly. Unfortunately that can only safely
+ * happen if the reporting backend has previously called
+ * pgstat_prepare_report_checksum_failure(), which we can only guarantee in
+ * the backend that started the IO. Hence this callback.
+ */
+static PgAioResult
+shared_buffer_readv_complete_local(PgAioHandle *ioh, PgAioResult prior_result,
+ uint8 cb_data)
+{
+ bool zeroed_any,
+ ignored_any;
+ uint8 zeroed_or_error_count,
+ checkfail_count,
+ first_off;
+
+ if (prior_result.status == PGAIO_RS_OK)
+ return prior_result;
+
+ buffer_readv_decode_error(prior_result,
+ &zeroed_any,
+ &ignored_any,
+ &zeroed_or_error_count,
+ &checkfail_count,
+ &first_off);
+
+ if (checkfail_count)
+ {
+ PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+
+ pgstat_report_checksum_failures_in_db(td->smgr.rlocator.dbOid,
+ checkfail_count);
+ }
+
+ return prior_result;
+}
+
+static void
+local_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
+{
+ buffer_stage_common(ioh, false, true);
+}
+
+static PgAioResult
+local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
+ uint8 cb_data)
+{
+ return buffer_readv_complete(ioh, prior_result, cb_data, true);
+}
+
+/* readv callback is passed READ_BUFFERS_* flags as callback data */
+const PgAioHandleCallbacks aio_shared_buffer_readv_cb = {
+ .stage = shared_buffer_readv_stage,
+ .complete_shared = shared_buffer_readv_complete,
+ /* need a local callback to report checksum failures */
+ .complete_local = shared_buffer_readv_complete_local,
+ .report = buffer_readv_report,
+};
+
+/* readv callback is passed READ_BUFFERS_* flags as callback data */
+const PgAioHandleCallbacks aio_local_buffer_readv_cb = {
+ .stage = local_buffer_readv_stage,
+
+ /*
+ * Note that this, in contrast to the shared_buffers case, uses
+ * complete_local, as only the issuing backend has access to the required
+ * datastructures. This is important in case the IO completion may be
+ * consumed incidentally by another backend.
+ */
+ .complete_local = local_buffer_readv_complete,
+ .report = buffer_readv_report,
+};