aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/buffer/bufmgr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/buffer/bufmgr.c')
-rw-r--r--src/backend/storage/buffer/bufmgr.c841
1 files changed, 795 insertions, 46 deletions
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 3b66c5c6b4c..4fea8ebf482 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -48,6 +48,7 @@
#include "pg_trace.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
+#include "storage/aio.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
@@ -519,7 +520,8 @@ static int SyncOneBuffer(int buf_id, bool skip_recently_used,
static void WaitIO(BufferDesc *buf);
static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
- uint32 set_flag_bits, bool forget_owner);
+ uint32 set_flag_bits, bool forget_owner,
+ bool release_aio);
static void AbortBufferIO(Buffer buffer);
static void shared_buffer_write_error_callback(void *arg);
static void local_buffer_write_error_callback(void *arg);
@@ -1041,7 +1043,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
{
/* Simple case for non-shared buffers. */
bufHdr = GetLocalBufferDescriptor(-buffer - 1);
- need_to_zero = StartLocalBufferIO(bufHdr, true);
+ need_to_zero = StartLocalBufferIO(bufHdr, true, false);
}
else
{
@@ -1077,9 +1079,9 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
/* Set BM_VALID, terminate IO, and wake up any waiters */
if (isLocalBuf)
- TerminateLocalBufferIO(bufHdr, false, BM_VALID);
+ TerminateLocalBufferIO(bufHdr, false, BM_VALID, false);
else
- TerminateBufferIO(bufHdr, false, BM_VALID, true);
+ TerminateBufferIO(bufHdr, false, BM_VALID, true, false);
}
else if (!isLocalBuf)
{
@@ -1454,7 +1456,8 @@ static inline bool
WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
{
if (BufferIsLocal(buffer))
- return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), true);
+ return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
+ true, nowait);
else
return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
}
@@ -1619,9 +1622,9 @@ WaitReadBuffers(ReadBuffersOperation *operation)
/* Set BM_VALID, terminate IO, and wake up any waiters */
if (persistence == RELPERSISTENCE_TEMP)
- TerminateLocalBufferIO(bufHdr, false, BM_VALID);
+ TerminateLocalBufferIO(bufHdr, false, BM_VALID, false);
else
- TerminateBufferIO(bufHdr, false, BM_VALID, true);
+ TerminateBufferIO(bufHdr, false, BM_VALID, true, false);
/* Report I/Os as completing individually. */
TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
@@ -1883,13 +1886,14 @@ retry:
}
/*
- * We assume the only reason for it to be pinned is that someone else is
- * flushing the page out. Wait for them to finish. (This could be an
- * infinite loop if the refcount is messed up... it would be nice to time
- * out after awhile, but there seems no way to be sure how many loops may
- * be needed. Note that if the other guy has pinned the buffer but not
- * yet done StartBufferIO, WaitIO will fall through and we'll effectively
- * be busy-looping here.)
+ * We assume the reason for it to be pinned is that either we were
+ * asynchronously reading the page in before erroring out or someone else
+ * is flushing the page out. Wait for the IO to finish. (This could be
+ * an infinite loop if the refcount is messed up... it would be nice to
+ * time out after awhile, but there seems no way to be sure how many loops
+ * may be needed. Note that if the other guy has pinned the buffer but
+ * not yet done StartBufferIO, WaitIO will fall through and we'll
+ * effectively be busy-looping here.)
*/
if (BUF_STATE_GET_REFCOUNT(buf_state) != 0)
{
@@ -2529,7 +2533,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
if (lock)
LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_EXCLUSIVE);
- TerminateBufferIO(buf_hdr, false, BM_VALID, true);
+ TerminateBufferIO(buf_hdr, false, BM_VALID, true, false);
}
pgBufferUsage.shared_blks_written += extend_by;
@@ -2876,6 +2880,44 @@ PinBuffer_Locked(BufferDesc *buf)
}
/*
+ * Support for waking up another backend that is waiting for the cleanup lock
+ * to be released using BM_PIN_COUNT_WAITER.
+ *
+ * See LockBufferForCleanup().
+ *
+ * Expected to be called just after releasing a buffer pin (in a BufferDesc,
+ * not just reducing the backend-local pincount for the buffer).
+ */
+static void
+WakePinCountWaiter(BufferDesc *buf)
+{
+ /*
+ * Acquire the buffer header lock, re-check that there's a waiter. Another
+ * backend could have unpinned this buffer, and already woken up the
+ * waiter.
+ *
+ * There's no danger of the buffer being replaced after we unpinned it
+ * above, as it's pinned by the waiter. The waiter removes
+ * BM_PIN_COUNT_WAITER if it stops waiting for a reason other than this
+ * backend waking it up.
+ */
+ uint32 buf_state = LockBufHdr(buf);
+
+ if ((buf_state & BM_PIN_COUNT_WAITER) &&
+ BUF_STATE_GET_REFCOUNT(buf_state) == 1)
+ {
+ /* we just released the last pin other than the waiter's */
+ int wait_backend_pgprocno = buf->wait_backend_pgprocno;
+
+ buf_state &= ~BM_PIN_COUNT_WAITER;
+ UnlockBufHdr(buf, buf_state);
+ ProcSendSignal(wait_backend_pgprocno);
+ }
+ else
+ UnlockBufHdr(buf, buf_state);
+}
+
+/*
* UnpinBuffer -- make buffer available for replacement.
*
* This should be applied only to shared buffers, never local ones. This
@@ -2943,29 +2985,8 @@ UnpinBufferNoOwner(BufferDesc *buf)
/* Support LockBufferForCleanup() */
if (buf_state & BM_PIN_COUNT_WAITER)
- {
- /*
- * Acquire the buffer header lock, re-check that there's a waiter.
- * Another backend could have unpinned this buffer, and already
- * woken up the waiter. There's no danger of the buffer being
- * replaced after we unpinned it above, as it's pinned by the
- * waiter.
- */
- buf_state = LockBufHdr(buf);
+ WakePinCountWaiter(buf);
- if ((buf_state & BM_PIN_COUNT_WAITER) &&
- BUF_STATE_GET_REFCOUNT(buf_state) == 1)
- {
- /* we just released the last pin other than the waiter's */
- int wait_backend_pgprocno = buf->wait_backend_pgprocno;
-
- buf_state &= ~BM_PIN_COUNT_WAITER;
- UnlockBufHdr(buf, buf_state);
- ProcSendSignal(wait_backend_pgprocno);
- }
- else
- UnlockBufHdr(buf, buf_state);
- }
ForgetPrivateRefCountEntry(ref);
}
}
@@ -3986,7 +4007,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
* Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
* end the BM_IO_IN_PROGRESS state.
*/
- TerminateBufferIO(buf, true, 0, true);
+ TerminateBufferIO(buf, true, 0, true, false);
TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&buf->tag),
buf->tag.blockNum,
@@ -5256,6 +5277,13 @@ LockBufferForCleanup(Buffer buffer)
CheckBufferIsPinnedOnce(buffer);
+ /*
+ * We do not yet need to be worried about in-progress AIOs holding a pin,
+ * as we, so far, only support doing reads via AIO and this function can
+ * only be called once the buffer is valid (i.e. no read can be in
+ * flight).
+ */
+
/* Nobody else to wait for */
if (BufferIsLocal(buffer))
return;
@@ -5413,6 +5441,8 @@ ConditionalLockBufferForCleanup(Buffer buffer)
Assert(BufferIsValid(buffer));
+ /* see AIO related comment in LockBufferForCleanup() */
+
if (BufferIsLocal(buffer))
{
refcount = LocalRefCount[-buffer - 1];
@@ -5468,6 +5498,8 @@ IsBufferCleanupOK(Buffer buffer)
Assert(BufferIsValid(buffer));
+ /* see AIO related comment in LockBufferForCleanup() */
+
if (BufferIsLocal(buffer))
{
/* There should be exactly one pin */
@@ -5520,6 +5552,7 @@ WaitIO(BufferDesc *buf)
for (;;)
{
uint32 buf_state;
+ PgAioWaitRef iow;
/*
* It may not be necessary to acquire the spinlock to check the flag
@@ -5527,10 +5560,40 @@ WaitIO(BufferDesc *buf)
* play it safe.
*/
buf_state = LockBufHdr(buf);
+
+ /*
+ * Copy the wait reference while holding the spinlock. This protects
+ * against a concurrent TerminateBufferIO() in another backend from
+ * clearing the wref while it's being read.
+ */
+ iow = buf->io_wref;
UnlockBufHdr(buf, buf_state);
+ /* no IO in progress, we don't need to wait */
if (!(buf_state & BM_IO_IN_PROGRESS))
break;
+
+ /*
+ * The buffer has asynchronous IO in progress, wait for it to
+ * complete.
+ */
+ if (pgaio_wref_valid(&iow))
+ {
+ pgaio_wref_wait(&iow);
+
+ /*
+ * The AIO subsystem internally uses condition variables and thus
+ * might remove this backend from the BufferDesc's CV. While that
+ * wouldn't cause a correctness issue (the first CV sleep just
+ * immediately returns if not already registered), it seems worth
+ * avoiding unnecessary loop iterations, given that we take care
+ * to do so at the start of the function.
+ */
+ ConditionVariablePrepareToSleep(cv);
+ continue;
+ }
+
+ /* wait on BufferDesc->cv, e.g. for concurrent synchronous IO */
ConditionVariableSleep(cv, WAIT_EVENT_BUFFER_IO);
}
ConditionVariableCancelSleep();
@@ -5539,13 +5602,12 @@ WaitIO(BufferDesc *buf)
/*
* StartBufferIO: begin I/O on this buffer
* (Assumptions)
- * My process is executing no IO
+ * My process is executing no IO on this buffer
* The buffer is Pinned
*
- * In some scenarios there are race conditions in which multiple backends
- * could attempt the same I/O operation concurrently. If someone else
- * has already started I/O on this buffer then we will block on the
- * I/O condition variable until he's done.
+ * In some scenarios multiple backends could attempt the same I/O operation
+ * concurrently. If someone else has already started I/O on this buffer then
+ * we will wait for completion of the IO using WaitIO().
*
* Input operations are only attempted on buffers that are not BM_VALID,
* and output operations only on buffers that are BM_VALID and BM_DIRTY,
@@ -5581,9 +5643,9 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
/* Once we get here, there is definitely no I/O active on this buffer */
+ /* Check if someone else already did the I/O */
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
{
- /* someone else already did the I/O */
UnlockBufHdr(buf, buf_state);
return false;
}
@@ -5619,7 +5681,7 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
*/
static void
TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
- bool forget_owner)
+ bool forget_owner, bool release_aio)
{
uint32 buf_state;
@@ -5634,6 +5696,14 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
+ if (release_aio)
+ {
+ /* release ownership by the AIO subsystem */
+ Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+ buf_state -= BUF_REFCOUNT_ONE;
+ pgaio_wref_clear(&buf->io_wref);
+ }
+
buf_state |= set_flag_bits;
UnlockBufHdr(buf, buf_state);
@@ -5642,6 +5712,17 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
BufferDescriptorGetBuffer(buf));
ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
+
+ /*
+ * Support LockBufferForCleanup()
+ *
+ * We may have just released the last pin other than the waiter's. In most
+ * cases, this backend holds another pin on the buffer. But, if, for
+ * example, this backend is completing an IO issued by another backend, it
+ * may be time to wake the waiter.
+ */
+ if (release_aio && (buf_state & BM_PIN_COUNT_WAITER))
+ WakePinCountWaiter(buf);
}
/*
@@ -5690,7 +5771,7 @@ AbortBufferIO(Buffer buffer)
}
}
- TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false);
+ TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false, false);
}
/*
@@ -6141,3 +6222,671 @@ EvictUnpinnedBuffer(Buffer buf)
return result;
}
+
+/*
+ * Generic implementation of the AIO handle staging callback for readv/writev
+ * on local/shared buffers.
+ *
+ * Each readv/writev can target multiple buffers. The buffers have already
+ * been registered with the IO handle.
+ *
+ * To make the IO ready for execution ("staging"), we need to ensure that the
+ * targeted buffers are in an appropriate state while the IO is ongoing. For
+ * that the AIO subsystem needs to have its own buffer pin, otherwise an error
+ * in this backend could lead to this backend's buffer pin being released as
+ * part of error handling, which in turn could lead to the buffer being
+ * replaced while IO is ongoing.
+ */
+static pg_attribute_always_inline void
+buffer_stage_common(PgAioHandle *ioh, bool is_write, bool is_temp)
+{
+ uint64 *io_data;
+ uint8 handle_data_len;
+ PgAioWaitRef io_ref;
+ BufferTag first PG_USED_FOR_ASSERTS_ONLY = {0};
+
+ io_data = pgaio_io_get_handle_data(ioh, &handle_data_len);
+
+ pgaio_io_get_wref(ioh, &io_ref);
+
+ /* iterate over all buffers affected by the vectored readv/writev */
+ for (int i = 0; i < handle_data_len; i++)
+ {
+ Buffer buffer = (Buffer) io_data[i];
+ BufferDesc *buf_hdr = is_temp ?
+ GetLocalBufferDescriptor(-buffer - 1)
+ : GetBufferDescriptor(buffer - 1);
+ uint32 buf_state;
+
+ /*
+ * Check that all the buffers are actually ones that could conceivably
+ * be done in one IO, i.e. are sequential. This is the last
+ * buffer-aware code before IO is actually executed and confusion
+ * about which buffers are targeted by IO can be hard to debug, making
+ * it worth doing extra-paranoid checks.
+ */
+ if (i == 0)
+ first = buf_hdr->tag;
+ else
+ {
+ Assert(buf_hdr->tag.relNumber == first.relNumber);
+ Assert(buf_hdr->tag.blockNum == first.blockNum + i);
+ }
+
+ if (is_temp)
+ buf_state = pg_atomic_read_u32(&buf_hdr->state);
+ else
+ buf_state = LockBufHdr(buf_hdr);
+
+ /* verify the buffer is in the expected state */
+ Assert(buf_state & BM_TAG_VALID);
+ if (is_write)
+ {
+ Assert(buf_state & BM_VALID);
+ Assert(buf_state & BM_DIRTY);
+ }
+ else
+ {
+ Assert(!(buf_state & BM_VALID));
+ Assert(!(buf_state & BM_DIRTY));
+ }
+
+ /* temp buffers don't use BM_IO_IN_PROGRESS */
+ if (!is_temp)
+ Assert(buf_state & BM_IO_IN_PROGRESS);
+
+ Assert(BUF_STATE_GET_REFCOUNT(buf_state) >= 1);
+
+ /*
+ * Reflect that the buffer is now owned by the AIO subsystem.
+ *
+ * For local buffers: This can't be done just via LocalRefCount, as
+ * one might initially think, as this backend could error out while
+ * AIO is still in progress, releasing all the pins by the backend
+ * itself.
+ *
+ * This pin is released again in TerminateBufferIO().
+ */
+ buf_state += BUF_REFCOUNT_ONE;
+ buf_hdr->io_wref = io_ref;
+
+ if (is_temp)
+ pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+ else
+ UnlockBufHdr(buf_hdr, buf_state);
+
+ /*
+ * Ensure the content lock that prevents buffer modifications while
+ * the buffer is being written out is not released early due to an
+ * error.
+ */
+ if (is_write && !is_temp)
+ {
+ LWLock *content_lock;
+
+ content_lock = BufferDescriptorGetContentLock(buf_hdr);
+
+ Assert(LWLockHeldByMe(content_lock));
+
+ /*
+ * Lock is now owned by AIO subsystem.
+ */
+ LWLockDisown(content_lock);
+ }
+
+ /*
+ * Stop tracking this buffer via the resowner - the AIO system now
+ * keeps track.
+ */
+ if (!is_temp)
+ ResourceOwnerForgetBufferIO(CurrentResourceOwner, buffer);
+ }
+}
+
+/*
+ * Decode readv errors as encoded by buffer_readv_encode_error().
+ */
+static inline void
+buffer_readv_decode_error(PgAioResult result,
+ bool *zeroed_any,
+ bool *ignored_any,
+ uint8 *zeroed_or_error_count,
+ uint8 *checkfail_count,
+ uint8 *first_off)
+{
+ uint32 rem_error = result.error_data;
+
+ /* see static asserts in buffer_readv_encode_error */
+#define READV_COUNT_BITS 7
+#define READV_COUNT_MASK ((1 << READV_COUNT_BITS) - 1)
+
+ *zeroed_any = rem_error & 1;
+ rem_error >>= 1;
+
+ *ignored_any = rem_error & 1;
+ rem_error >>= 1;
+
+ *zeroed_or_error_count = rem_error & READV_COUNT_MASK;
+ rem_error >>= READV_COUNT_BITS;
+
+ *checkfail_count = rem_error & READV_COUNT_MASK;
+ rem_error >>= READV_COUNT_BITS;
+
+ *first_off = rem_error & READV_COUNT_MASK;
+ rem_error >>= READV_COUNT_BITS;
+}
+
+/*
+ * Helper to encode errors for buffer_readv_complete()
+ *
+ * Errors are encoded as follows:
+ * - bit 0 indicates whether any page was zeroed (1) or not (0)
+ * - bit 1 indicates whether any checksum failure was ignored (1) or not (0)
+ * - next READV_COUNT_BITS bits indicate the number of errored or zeroed pages
+ * - next READV_COUNT_BITS bits indicate the number of checksum failures
+ * - next READV_COUNT_BITS bits indicate the first offset of the first page
+ * that was errored or zeroed or, if no errors/zeroes, the first ignored
+ * checksum
+ */
+static inline void
+buffer_readv_encode_error(PgAioResult *result,
+ bool is_temp,
+ bool zeroed_any,
+ bool ignored_any,
+ uint8 error_count,
+ uint8 zeroed_count,
+ uint8 checkfail_count,
+ uint8 first_error_off,
+ uint8 first_zeroed_off,
+ uint8 first_ignored_off)
+{
+
+ uint8 shift = 0;
+ uint8 zeroed_or_error_count =
+ error_count > 0 ? error_count : zeroed_count;
+ uint8 first_off;
+
+ StaticAssertStmt(PG_IOV_MAX <= 1 << READV_COUNT_BITS,
+ "PG_IOV_MAX is bigger than reserved space for error data");
+ StaticAssertStmt((1 + 1 + 3 * READV_COUNT_BITS) <= PGAIO_RESULT_ERROR_BITS,
+ "PGAIO_RESULT_ERROR_BITS is insufficient for buffer_readv");
+
+ /*
+ * We only have space to encode one offset - but luckily that's good
+ * enough. If there is an error, the error is the interesting offset, same
+ * with a zeroed buffer vs an ignored buffer.
+ */
+ if (error_count > 0)
+ first_off = first_error_off;
+ else if (zeroed_count > 0)
+ first_off = first_zeroed_off;
+ else
+ first_off = first_ignored_off;
+
+ Assert(!zeroed_any || error_count == 0);
+
+ result->error_data = 0;
+
+ result->error_data |= zeroed_any << shift;
+ shift += 1;
+
+ result->error_data |= ignored_any << shift;
+ shift += 1;
+
+ result->error_data |= ((uint32) zeroed_or_error_count) << shift;
+ shift += READV_COUNT_BITS;
+
+ result->error_data |= ((uint32) checkfail_count) << shift;
+ shift += READV_COUNT_BITS;
+
+ result->error_data |= ((uint32) first_off) << shift;
+ shift += READV_COUNT_BITS;
+
+ result->id = is_temp ? PGAIO_HCB_LOCAL_BUFFER_READV :
+ PGAIO_HCB_SHARED_BUFFER_READV;
+
+ if (error_count > 0)
+ result->status = PGAIO_RS_ERROR;
+ else
+ result->status = PGAIO_RS_WARNING;
+
+ /*
+ * The encoding is complicated enough to warrant cross-checking it against
+ * the decode function.
+ */
+#ifdef USE_ASSERT_CHECKING
+ {
+ bool zeroed_any_2,
+ ignored_any_2;
+ uint8 zeroed_or_error_count_2,
+ checkfail_count_2,
+ first_off_2;
+
+ buffer_readv_decode_error(*result,
+ &zeroed_any_2, &ignored_any_2,
+ &zeroed_or_error_count_2,
+ &checkfail_count_2,
+ &first_off_2);
+ Assert(zeroed_any == zeroed_any_2);
+ Assert(ignored_any == ignored_any_2);
+ Assert(zeroed_or_error_count == zeroed_or_error_count_2);
+ Assert(checkfail_count == checkfail_count_2);
+ Assert(first_off == first_off_2);
+ }
+#endif
+
+#undef READV_COUNT_BITS
+#undef READV_COUNT_MASK
+}
+
+/*
+ * Helper for AIO readv completion callbacks, supporting both shared and temp
+ * buffers. Gets called once for each buffer in a multi-page read.
+ */
+static pg_attribute_always_inline void
+buffer_readv_complete_one(PgAioTargetData *td, uint8 buf_off, Buffer buffer,
+ uint8 flags, bool failed, bool is_temp,
+ bool *buffer_invalid,
+ bool *failed_checksum,
+ bool *ignored_checksum,
+ bool *zeroed_buffer)
+{
+ BufferDesc *buf_hdr = is_temp ?
+ GetLocalBufferDescriptor(-buffer - 1)
+ : GetBufferDescriptor(buffer - 1);
+ BufferTag tag = buf_hdr->tag;
+ char *bufdata = BufferGetBlock(buffer);
+ uint32 set_flag_bits;
+ int piv_flags;
+
+ /* check that the buffer is in the expected state for a read */
+#ifdef USE_ASSERT_CHECKING
+ {
+ uint32 buf_state = pg_atomic_read_u32(&buf_hdr->state);
+
+ Assert(buf_state & BM_TAG_VALID);
+ Assert(!(buf_state & BM_VALID));
+ /* temp buffers don't use BM_IO_IN_PROGRESS */
+ if (!is_temp)
+ Assert(buf_state & BM_IO_IN_PROGRESS);
+ Assert(!(buf_state & BM_DIRTY));
+ }
+#endif
+
+ *buffer_invalid = false;
+ *failed_checksum = false;
+ *ignored_checksum = false;
+ *zeroed_buffer = false;
+
+ /*
+ * We ask PageIsVerified() to only log the message about checksum errors,
+ * as the completion might be run in any backend (or IO workers). We will
+ * report checksum errors in buffer_readv_report().
+ */
+ piv_flags = PIV_LOG_LOG;
+
+ /* the local zero_damaged_pages may differ from the definer's */
+ if (flags & READ_BUFFERS_IGNORE_CHECKSUM_FAILURES)
+ piv_flags |= PIV_IGNORE_CHECKSUM_FAILURE;
+
+ /* Check for garbage data. */
+ if (!failed)
+ {
+ PgAioResult result_one;
+
+ if (!PageIsVerified((Page) bufdata, tag.blockNum, piv_flags,
+ failed_checksum))
+ {
+ if (flags & READ_BUFFERS_ZERO_ON_ERROR)
+ {
+ memset(bufdata, 0, BLCKSZ);
+ *zeroed_buffer = true;
+ }
+ else
+ {
+ *buffer_invalid = true;
+ /* mark buffer as having failed */
+ failed = true;
+ }
+ }
+ else if (*failed_checksum)
+ *ignored_checksum = true;
+
+ /*
+ * Immediately log a message about the invalid page, but only to the
+ * server log. The reason to do so immediately is that this may be
+ * executed in a different backend than the one that originated the
+ * request. The reason to do so immediately is that the originator
+ * might not process the query result immediately (because it is busy
+ * doing another part of query processing) or at all (e.g. if it was
+ * cancelled or errored out due to another IO also failing). The
+ * definer of the IO will emit an ERROR or WARNING when processing the
+ * IO's results
+ *
+ * To avoid duplicating the code to emit these log messages, we reuse
+ * buffer_readv_report().
+ */
+ if (*buffer_invalid || *failed_checksum || *zeroed_buffer)
+ {
+ buffer_readv_encode_error(&result_one, is_temp,
+ *zeroed_buffer,
+ *ignored_checksum,
+ *buffer_invalid,
+ *zeroed_buffer ? 1 : 0,
+ *failed_checksum ? 1 : 0,
+ buf_off, buf_off, buf_off);
+ pgaio_result_report(result_one, td, LOG_SERVER_ONLY);
+ }
+ }
+
+ /* Terminate I/O and set BM_VALID. */
+ set_flag_bits = failed ? BM_IO_ERROR : BM_VALID;
+ if (is_temp)
+ TerminateLocalBufferIO(buf_hdr, false, set_flag_bits, true);
+ else
+ TerminateBufferIO(buf_hdr, false, set_flag_bits, false, true);
+
+ /*
+ * Call the BUFFER_READ_DONE tracepoint in the callback, even though the
+ * callback may not be executed in the same backend that called
+ * BUFFER_READ_START. The alternative would be to defer calling the
+ * tracepoint to a later point (e.g. the local completion callback for
+ * shared buffer reads), which seems even less helpful.
+ */
+ TRACE_POSTGRESQL_BUFFER_READ_DONE(tag.forkNum,
+ tag.blockNum,
+ tag.spcOid,
+ tag.dbOid,
+ tag.relNumber,
+ is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
+ false);
+}
+
+/*
+ * Perform completion handling of a single AIO read. This read may cover
+ * multiple blocks / buffers.
+ *
+ * Shared between shared and local buffers, to reduce code duplication.
+ */
+static pg_attribute_always_inline PgAioResult
+buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
+ uint8 cb_data, bool is_temp)
+{
+ PgAioResult result = prior_result;
+ PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+ uint8 first_error_off = 0;
+ uint8 first_zeroed_off = 0;
+ uint8 first_ignored_off = 0;
+ uint8 error_count = 0;
+ uint8 zeroed_count = 0;
+ uint8 ignored_count = 0;
+ uint8 checkfail_count = 0;
+ uint64 *io_data;
+ uint8 handle_data_len;
+
+ if (is_temp)
+ {
+ Assert(td->smgr.is_temp);
+ Assert(pgaio_io_get_owner(ioh) == MyProcNumber);
+ }
+ else
+ Assert(!td->smgr.is_temp);
+
+ /*
+ * Iterate over all the buffers affected by this IO and call the
+ * per-buffer completion function for each buffer.
+ */
+ io_data = pgaio_io_get_handle_data(ioh, &handle_data_len);
+ for (uint8 buf_off = 0; buf_off < handle_data_len; buf_off++)
+ {
+ Buffer buf = io_data[buf_off];
+ bool failed;
+ bool failed_verification = false;
+ bool failed_checksum = false;
+ bool zeroed_buffer = false;
+ bool ignored_checksum = false;
+
+ Assert(BufferIsValid(buf));
+
+ /*
+ * If the entire I/O failed on a lower-level, each buffer needs to be
+ * marked as failed. In case of a partial read, the first few buffers
+ * may be ok.
+ */
+ failed =
+ prior_result.status == PGAIO_RS_ERROR
+ || prior_result.result <= buf_off;
+
+ buffer_readv_complete_one(td, buf_off, buf, cb_data, failed, is_temp,
+ &failed_verification,
+ &failed_checksum,
+ &ignored_checksum,
+ &zeroed_buffer);
+
+ /*
+ * Track information about the number of different kinds of error
+ * conditions across all pages, as there can be multiple pages failing
+ * verification as part of one IO.
+ */
+ if (failed_verification && !zeroed_buffer && error_count++ == 0)
+ first_error_off = buf_off;
+ if (zeroed_buffer && zeroed_count++ == 0)
+ first_zeroed_off = buf_off;
+ if (ignored_checksum && ignored_count++ == 0)
+ first_ignored_off = buf_off;
+ if (failed_checksum)
+ checkfail_count++;
+ }
+
+ /*
+ * If the smgr read succeeded [partially] and page verification failed for
+ * some of the pages, adjust the IO's result state appropriately.
+ */
+ if (prior_result.status != PGAIO_RS_ERROR &&
+ (error_count > 0 || ignored_count > 0 || zeroed_count > 0))
+ {
+ buffer_readv_encode_error(&result, is_temp,
+ zeroed_count > 0, ignored_count > 0,
+ error_count, zeroed_count, checkfail_count,
+ first_error_off, first_zeroed_off,
+ first_ignored_off);
+ pgaio_result_report(result, td, DEBUG1);
+ }
+
+ /*
+ * For shared relations this reporting is done in
+ * shared_buffer_readv_complete_local().
+ */
+ if (is_temp && checkfail_count > 0)
+ pgstat_report_checksum_failures_in_db(td->smgr.rlocator.dbOid,
+ checkfail_count);
+
+ return result;
+}
+
+/*
+ * AIO error reporting callback for aio_shared_buffer_readv_cb and
+ * aio_local_buffer_readv_cb.
+ *
+ * The error is encoded / decoded in buffer_readv_encode_error() /
+ * buffer_readv_decode_error().
+ */
+static void
+buffer_readv_report(PgAioResult result, const PgAioTargetData *td,
+ int elevel)
+{
+ int nblocks = td->smgr.nblocks;
+ BlockNumber first = td->smgr.blockNum;
+ BlockNumber last = first + nblocks - 1;
+ ProcNumber errProc =
+ td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER;
+ RelPathStr rpath =
+ relpathbackend(td->smgr.rlocator, errProc, td->smgr.forkNum);
+ bool zeroed_any,
+ ignored_any;
+ uint8 zeroed_or_error_count,
+ checkfail_count,
+ first_off;
+ uint8 affected_count;
+ const char *msg_one,
+ *msg_mult,
+ *det_mult,
+ *hint_mult;
+
+ buffer_readv_decode_error(result, &zeroed_any, &ignored_any,
+ &zeroed_or_error_count,
+ &checkfail_count,
+ &first_off);
+
+ /*
+ * Treat a read that had both zeroed buffers *and* ignored checksums as a
+ * special case, it's too irregular to be emitted the same way as the
+ * other cases.
+ */
+ if (zeroed_any && ignored_any)
+ {
+ Assert(zeroed_any && ignored_any);
+ Assert(nblocks > 1); /* same block can't be both zeroed and ignored */
+ Assert(result.status != PGAIO_RS_ERROR);
+ affected_count = zeroed_or_error_count;
+
+ ereport(elevel,
+ errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("zeroing %u page(s) and ignoring %u checksum failure(s) among blocks %u..%u of relation %s",
+ affected_count, checkfail_count, first, last, rpath.str),
+ affected_count > 1 ?
+ errdetail("Block %u held first zeroed page.",
+ first + first_off) : 0,
+ errhint("See server log for details about the other %u invalid block(s).",
+ affected_count + checkfail_count - 1));
+ return;
+ }
+
+ /*
+ * The other messages are highly repetitive. To avoid duplicating a long
+ * and complicated ereport(), gather the translated format strings
+ * separately and then do one common ereport.
+ */
+ if (result.status == PGAIO_RS_ERROR)
+ {
+ Assert(!zeroed_any); /* can't have invalid pages when zeroing them */
+ affected_count = zeroed_or_error_count;
+ msg_one = _("invalid page in block %u of relation %s");
+ msg_mult = _("%u invalid pages among blocks %u..%u of relation %s");
+ det_mult = _("Block %u held first invalid page.");
+ hint_mult = _("See server log for the other %u invalid block(s).");
+ }
+ else if (zeroed_any && !ignored_any)
+ {
+ affected_count = zeroed_or_error_count;
+ msg_one = _("invalid page in block %u of relation %s; zeroing out page");
+ msg_mult = _("zeroing out %u invalid pages among blocks %u..%u of relation %s");
+ det_mult = _("Block %u held first zeroed page.");
+ hint_mult = _("See server log for the other %u zeroed block(s).");
+ }
+ else if (!zeroed_any && ignored_any)
+ {
+ affected_count = checkfail_count;
+ msg_one = _("ignoring checksum failure in block %u of relation %s");
+ msg_mult = _("ignoring %u checksum failures among blocks %u..%u of relation %s");
+ det_mult = _("Block %u held first ignored page.");
+ hint_mult = _("See server log for the other %u ignored block(s).");
+ }
+ else
+ pg_unreachable();
+
+ ereport(elevel,
+ errcode(ERRCODE_DATA_CORRUPTED),
+ affected_count == 1 ?
+ errmsg_internal(msg_one, first + first_off, rpath.str) :
+ errmsg_internal(msg_mult, affected_count, first, last, rpath.str),
+ affected_count > 1 ? errdetail_internal(det_mult, first + first_off) : 0,
+ affected_count > 1 ? errhint_internal(hint_mult, affected_count - 1) : 0);
+}
+
+static void
+shared_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
+{
+ buffer_stage_common(ioh, false, false);
+}
+
+static PgAioResult
+shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
+ uint8 cb_data)
+{
+ return buffer_readv_complete(ioh, prior_result, cb_data, false);
+}
+
+/*
+ * We need a backend-local completion callback for shared buffers, to be able
+ * to report checksum errors correctly. Unfortunately that can only safely
+ * happen if the reporting backend has previously called
+ * pgstat_prepare_report_checksum_failure(), which we can only guarantee in
+ * the backend that started the IO. Hence this callback.
+ */
+static PgAioResult
+shared_buffer_readv_complete_local(PgAioHandle *ioh, PgAioResult prior_result,
+ uint8 cb_data)
+{
+ bool zeroed_any,
+ ignored_any;
+ uint8 zeroed_or_error_count,
+ checkfail_count,
+ first_off;
+
+ if (prior_result.status == PGAIO_RS_OK)
+ return prior_result;
+
+ buffer_readv_decode_error(prior_result,
+ &zeroed_any,
+ &ignored_any,
+ &zeroed_or_error_count,
+ &checkfail_count,
+ &first_off);
+
+ if (checkfail_count)
+ {
+ PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+
+ pgstat_report_checksum_failures_in_db(td->smgr.rlocator.dbOid,
+ checkfail_count);
+ }
+
+ return prior_result;
+}
+
+static void
+local_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
+{
+ buffer_stage_common(ioh, false, true);
+}
+
+static PgAioResult
+local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
+ uint8 cb_data)
+{
+ return buffer_readv_complete(ioh, prior_result, cb_data, true);
+}
+
+/* readv callback is passed READ_BUFFERS_* flags as callback data */
+const PgAioHandleCallbacks aio_shared_buffer_readv_cb = {
+ .stage = shared_buffer_readv_stage,
+ .complete_shared = shared_buffer_readv_complete,
+ /* need a local callback to report checksum failures */
+ .complete_local = shared_buffer_readv_complete_local,
+ .report = buffer_readv_report,
+};
+
+/* readv callback is passed READ_BUFFERS_* flags as callback data */
+const PgAioHandleCallbacks aio_local_buffer_readv_cb = {
+ .stage = local_buffer_readv_stage,
+
+ /*
+ * Note that this, in contrast to the shared_buffers case, uses
+ * complete_local, as only the issuing backend has access to the required
+ * datastructures. This is important in case the IO completion may be
+ * consumed incidentally by another backend.
+ */
+ .complete_local = local_buffer_readv_complete,
+ .report = buffer_readv_report,
+};