diff options
-rw-r--r-- | src/backend/storage/aio/aio_callback.c | 5 | ||||
-rw-r--r-- | src/backend/storage/buffer/README | 9 | ||||
-rw-r--r-- | src/backend/storage/buffer/buf_init.c | 3 | ||||
-rw-r--r-- | src/backend/storage/buffer/bufmgr.c | 841 | ||||
-rw-r--r-- | src/backend/storage/buffer/localbuf.c | 61 | ||||
-rw-r--r-- | src/backend/storage/page/bufpage.c | 12 | ||||
-rw-r--r-- | src/include/storage/aio.h | 6 | ||||
-rw-r--r-- | src/include/storage/buf_internals.h | 7 | ||||
-rw-r--r-- | src/include/storage/bufmgr.h | 6 | ||||
-rw-r--r-- | src/include/storage/bufpage.h | 1 |
10 files changed, 885 insertions, 66 deletions
diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c index b00b6bc1025..bf42778a48c 100644 --- a/src/backend/storage/aio/aio_callback.c +++ b/src/backend/storage/aio/aio_callback.c @@ -18,6 +18,7 @@ #include "miscadmin.h" #include "storage/aio.h" #include "storage/aio_internal.h" +#include "storage/bufmgr.h" #include "storage/md.h" @@ -40,6 +41,10 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = { CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb), CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb), + + CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb), + + CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_READV, aio_local_buffer_readv_cb), #undef CALLBACK_ENTRY }; diff --git a/src/backend/storage/buffer/README b/src/backend/storage/buffer/README index 011af7aff3e..a182fcd660c 100644 --- a/src/backend/storage/buffer/README +++ b/src/backend/storage/buffer/README @@ -147,9 +147,12 @@ in the buffer. It is used per the rules above. * The BM_IO_IN_PROGRESS flag acts as a kind of lock, used to wait for I/O on a buffer to complete (and in releases before 14, it was accompanied by a -per-buffer LWLock). The process doing a read or write sets the flag for the -duration, and processes that need to wait for it to be cleared sleep on a -condition variable. +per-buffer LWLock). The process starting a read or write sets the flag. When +the I/O is completed, be it by the process that initiated the I/O or by +another process, the flag is removed and the Buffer's condition variable is +signalled. Processes that need to wait for the I/O to complete can wait for +asynchronous I/O by using BufferDesc->io_wref and for BM_IO_IN_PROGRESS to be +unset by sleeping on the buffer's condition variable. 
Normal Buffer Replacement Strategy diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c index ed1f8e03190..ed1dc488a42 100644 --- a/src/backend/storage/buffer/buf_init.c +++ b/src/backend/storage/buffer/buf_init.c @@ -14,6 +14,7 @@ */ #include "postgres.h" +#include "storage/aio.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" @@ -125,6 +126,8 @@ BufferManagerShmemInit(void) buf->buf_id = i; + pgaio_wref_clear(&buf->io_wref); + /* * Initially link all the buffers together as unused. Subsequent * management of this list is done by freelist.c. diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 3b66c5c6b4c..4fea8ebf482 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -48,6 +48,7 @@ #include "pg_trace.h" #include "pgstat.h" #include "postmaster/bgwriter.h" +#include "storage/aio.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/fd.h" @@ -519,7 +520,8 @@ static int SyncOneBuffer(int buf_id, bool skip_recently_used, static void WaitIO(BufferDesc *buf); static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, - uint32 set_flag_bits, bool forget_owner); + uint32 set_flag_bits, bool forget_owner, + bool release_aio); static void AbortBufferIO(Buffer buffer); static void shared_buffer_write_error_callback(void *arg); static void local_buffer_write_error_callback(void *arg); @@ -1041,7 +1043,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid) { /* Simple case for non-shared buffers. 
*/ bufHdr = GetLocalBufferDescriptor(-buffer - 1); - need_to_zero = StartLocalBufferIO(bufHdr, true); + need_to_zero = StartLocalBufferIO(bufHdr, true, false); } else { @@ -1077,9 +1079,9 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid) /* Set BM_VALID, terminate IO, and wake up any waiters */ if (isLocalBuf) - TerminateLocalBufferIO(bufHdr, false, BM_VALID); + TerminateLocalBufferIO(bufHdr, false, BM_VALID, false); else - TerminateBufferIO(bufHdr, false, BM_VALID, true); + TerminateBufferIO(bufHdr, false, BM_VALID, true, false); } else if (!isLocalBuf) { @@ -1454,7 +1456,8 @@ static inline bool WaitReadBuffersCanStartIO(Buffer buffer, bool nowait) { if (BufferIsLocal(buffer)) - return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), true); + return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), + true, nowait); else return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); } @@ -1619,9 +1622,9 @@ WaitReadBuffers(ReadBuffersOperation *operation) /* Set BM_VALID, terminate IO, and wake up any waiters */ if (persistence == RELPERSISTENCE_TEMP) - TerminateLocalBufferIO(bufHdr, false, BM_VALID); + TerminateLocalBufferIO(bufHdr, false, BM_VALID, false); else - TerminateBufferIO(bufHdr, false, BM_VALID, true); + TerminateBufferIO(bufHdr, false, BM_VALID, true, false); /* Report I/Os as completing individually. */ TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j, @@ -1883,13 +1886,14 @@ retry: } /* - * We assume the only reason for it to be pinned is that someone else is - * flushing the page out. Wait for them to finish. (This could be an - * infinite loop if the refcount is messed up... it would be nice to time - * out after awhile, but there seems no way to be sure how many loops may - * be needed. Note that if the other guy has pinned the buffer but not - * yet done StartBufferIO, WaitIO will fall through and we'll effectively - * be busy-looping here.) 
+ * We assume the reason for it to be pinned is that either we were + * asynchronously reading the page in before erroring out or someone else + * is flushing the page out. Wait for the IO to finish. (This could be + * an infinite loop if the refcount is messed up... it would be nice to + * time out after awhile, but there seems no way to be sure how many loops + * may be needed. Note that if the other guy has pinned the buffer but + * not yet done StartBufferIO, WaitIO will fall through and we'll + * effectively be busy-looping here.) */ if (BUF_STATE_GET_REFCOUNT(buf_state) != 0) { @@ -2529,7 +2533,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, if (lock) LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_EXCLUSIVE); - TerminateBufferIO(buf_hdr, false, BM_VALID, true); + TerminateBufferIO(buf_hdr, false, BM_VALID, true, false); } pgBufferUsage.shared_blks_written += extend_by; @@ -2876,6 +2880,44 @@ PinBuffer_Locked(BufferDesc *buf) } /* + * Support for waking up another backend that is waiting for the cleanup lock + * to be released using BM_PIN_COUNT_WAITER. + * + * See LockBufferForCleanup(). + * + * Expected to be called just after releasing a buffer pin (in a BufferDesc, + * not just reducing the backend-local pincount for the buffer). + */ +static void +WakePinCountWaiter(BufferDesc *buf) +{ + /* + * Acquire the buffer header lock, re-check that there's a waiter. Another + * backend could have unpinned this buffer, and already woken up the + * waiter. + * + * There's no danger of the buffer being replaced after we unpinned it + * above, as it's pinned by the waiter. The waiter removes + * BM_PIN_COUNT_WAITER if it stops waiting for a reason other than this + * backend waking it up. 
+ */ + uint32 buf_state = LockBufHdr(buf); + + if ((buf_state & BM_PIN_COUNT_WAITER) && + BUF_STATE_GET_REFCOUNT(buf_state) == 1) + { + /* we just released the last pin other than the waiter's */ + int wait_backend_pgprocno = buf->wait_backend_pgprocno; + + buf_state &= ~BM_PIN_COUNT_WAITER; + UnlockBufHdr(buf, buf_state); + ProcSendSignal(wait_backend_pgprocno); + } + else + UnlockBufHdr(buf, buf_state); +} + +/* * UnpinBuffer -- make buffer available for replacement. * * This should be applied only to shared buffers, never local ones. This @@ -2943,29 +2985,8 @@ UnpinBufferNoOwner(BufferDesc *buf) /* Support LockBufferForCleanup() */ if (buf_state & BM_PIN_COUNT_WAITER) - { - /* - * Acquire the buffer header lock, re-check that there's a waiter. - * Another backend could have unpinned this buffer, and already - * woken up the waiter. There's no danger of the buffer being - * replaced after we unpinned it above, as it's pinned by the - * waiter. - */ - buf_state = LockBufHdr(buf); + WakePinCountWaiter(buf); - if ((buf_state & BM_PIN_COUNT_WAITER) && - BUF_STATE_GET_REFCOUNT(buf_state) == 1) - { - /* we just released the last pin other than the waiter's */ - int wait_backend_pgprocno = buf->wait_backend_pgprocno; - - buf_state &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(buf, buf_state); - ProcSendSignal(wait_backend_pgprocno); - } - else - UnlockBufHdr(buf, buf_state); - } ForgetPrivateRefCountEntry(ref); } } @@ -3986,7 +4007,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and * end the BM_IO_IN_PROGRESS state. 
*/ - TerminateBufferIO(buf, true, 0, true); + TerminateBufferIO(buf, true, 0, true, false); TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&buf->tag), buf->tag.blockNum, @@ -5256,6 +5277,13 @@ LockBufferForCleanup(Buffer buffer) CheckBufferIsPinnedOnce(buffer); + /* + * We do not yet need to be worried about in-progress AIOs holding a pin, + * as we, so far, only support doing reads via AIO and this function can + * only be called once the buffer is valid (i.e. no read can be in + * flight). + */ + /* Nobody else to wait for */ if (BufferIsLocal(buffer)) return; @@ -5413,6 +5441,8 @@ ConditionalLockBufferForCleanup(Buffer buffer) Assert(BufferIsValid(buffer)); + /* see AIO related comment in LockBufferForCleanup() */ + if (BufferIsLocal(buffer)) { refcount = LocalRefCount[-buffer - 1]; @@ -5468,6 +5498,8 @@ IsBufferCleanupOK(Buffer buffer) Assert(BufferIsValid(buffer)); + /* see AIO related comment in LockBufferForCleanup() */ + if (BufferIsLocal(buffer)) { /* There should be exactly one pin */ @@ -5520,6 +5552,7 @@ WaitIO(BufferDesc *buf) for (;;) { uint32 buf_state; + PgAioWaitRef iow; /* * It may not be necessary to acquire the spinlock to check the flag @@ -5527,10 +5560,40 @@ WaitIO(BufferDesc *buf) * play it safe. */ buf_state = LockBufHdr(buf); + + /* + * Copy the wait reference while holding the spinlock. This protects + * against a concurrent TerminateBufferIO() in another backend from + * clearing the wref while it's being read. + */ + iow = buf->io_wref; UnlockBufHdr(buf, buf_state); + /* no IO in progress, we don't need to wait */ if (!(buf_state & BM_IO_IN_PROGRESS)) break; + + /* + * The buffer has asynchronous IO in progress, wait for it to + * complete. + */ + if (pgaio_wref_valid(&iow)) + { + pgaio_wref_wait(&iow); + + /* + * The AIO subsystem internally uses condition variables and thus + * might remove this backend from the BufferDesc's CV. 
While that + * wouldn't cause a correctness issue (the first CV sleep just + * immediately returns if not already registered), it seems worth + * avoiding unnecessary loop iterations, given that we take care + * to do so at the start of the function. + */ + ConditionVariablePrepareToSleep(cv); + continue; + } + + /* wait on BufferDesc->cv, e.g. for concurrent synchronous IO */ ConditionVariableSleep(cv, WAIT_EVENT_BUFFER_IO); } ConditionVariableCancelSleep(); @@ -5539,13 +5602,12 @@ WaitIO(BufferDesc *buf) /* * StartBufferIO: begin I/O on this buffer * (Assumptions) - * My process is executing no IO + * My process is executing no IO on this buffer * The buffer is Pinned * - * In some scenarios there are race conditions in which multiple backends - * could attempt the same I/O operation concurrently. If someone else - * has already started I/O on this buffer then we will block on the - * I/O condition variable until he's done. + * In some scenarios multiple backends could attempt the same I/O operation + * concurrently. If someone else has already started I/O on this buffer then + * we will wait for completion of the IO using WaitIO(). * * Input operations are only attempted on buffers that are not BM_VALID, * and output operations only on buffers that are BM_VALID and BM_DIRTY, @@ -5581,9 +5643,9 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) /* Once we get here, there is definitely no I/O active on this buffer */ + /* Check if someone else already did the I/O */ if (forInput ? 
(buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) { - /* someone else already did the I/O */ UnlockBufHdr(buf, buf_state); return false; } @@ -5619,7 +5681,7 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) */ static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits, - bool forget_owner) + bool forget_owner, bool release_aio) { uint32 buf_state; @@ -5634,6 +5696,14 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits, if (clear_dirty && !(buf_state & BM_JUST_DIRTIED)) buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); + if (release_aio) + { + /* release ownership by the AIO subsystem */ + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + buf_state -= BUF_REFCOUNT_ONE; + pgaio_wref_clear(&buf->io_wref); + } + buf_state |= set_flag_bits; UnlockBufHdr(buf, buf_state); @@ -5642,6 +5712,17 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits, BufferDescriptorGetBuffer(buf)); ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf)); + + /* + * Support LockBufferForCleanup() + * + * We may have just released the last pin other than the waiter's. In most + * cases, this backend holds another pin on the buffer. But, if, for + * example, this backend is completing an IO issued by another backend, it + * may be time to wake the waiter. + */ + if (release_aio && (buf_state & BM_PIN_COUNT_WAITER)) + WakePinCountWaiter(buf); } /* @@ -5690,7 +5771,7 @@ AbortBufferIO(Buffer buffer) } } - TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false); + TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false, false); } /* @@ -6141,3 +6222,671 @@ EvictUnpinnedBuffer(Buffer buf) return result; } + +/* + * Generic implementation of the AIO handle staging callback for readv/writev + * on local/shared buffers. + * + * Each readv/writev can target multiple buffers. The buffers have already + * been registered with the IO handle. 
+ * + * To make the IO ready for execution ("staging"), we need to ensure that the + * targeted buffers are in an appropriate state while the IO is ongoing. For + * that the AIO subsystem needs to have its own buffer pin, otherwise an error + * in this backend could lead to this backend's buffer pin being released as + * part of error handling, which in turn could lead to the buffer being + * replaced while IO is ongoing. + */ +static pg_attribute_always_inline void +buffer_stage_common(PgAioHandle *ioh, bool is_write, bool is_temp) +{ + uint64 *io_data; + uint8 handle_data_len; + PgAioWaitRef io_ref; + BufferTag first PG_USED_FOR_ASSERTS_ONLY = {0}; + + io_data = pgaio_io_get_handle_data(ioh, &handle_data_len); + + pgaio_io_get_wref(ioh, &io_ref); + + /* iterate over all buffers affected by the vectored readv/writev */ + for (int i = 0; i < handle_data_len; i++) + { + Buffer buffer = (Buffer) io_data[i]; + BufferDesc *buf_hdr = is_temp ? + GetLocalBufferDescriptor(-buffer - 1) + : GetBufferDescriptor(buffer - 1); + uint32 buf_state; + + /* + * Check that all the buffers are actually ones that could conceivably + * be done in one IO, i.e. are sequential. This is the last + * buffer-aware code before IO is actually executed and confusion + * about which buffers are targeted by IO can be hard to debug, making + * it worth doing extra-paranoid checks. 
+ */ + if (i == 0) + first = buf_hdr->tag; + else + { + Assert(buf_hdr->tag.relNumber == first.relNumber); + Assert(buf_hdr->tag.blockNum == first.blockNum + i); + } + + if (is_temp) + buf_state = pg_atomic_read_u32(&buf_hdr->state); + else + buf_state = LockBufHdr(buf_hdr); + + /* verify the buffer is in the expected state */ + Assert(buf_state & BM_TAG_VALID); + if (is_write) + { + Assert(buf_state & BM_VALID); + Assert(buf_state & BM_DIRTY); + } + else + { + Assert(!(buf_state & BM_VALID)); + Assert(!(buf_state & BM_DIRTY)); + } + + /* temp buffers don't use BM_IO_IN_PROGRESS */ + if (!is_temp) + Assert(buf_state & BM_IO_IN_PROGRESS); + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) >= 1); + + /* + * Reflect that the buffer is now owned by the AIO subsystem. + * + * For local buffers: This can't be done just via LocalRefCount, as + * one might initially think, as this backend could error out while + * AIO is still in progress, releasing all the pins by the backend + * itself. + * + * This pin is released again in TerminateBufferIO(). + */ + buf_state += BUF_REFCOUNT_ONE; + buf_hdr->io_wref = io_ref; + + if (is_temp) + pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state); + else + UnlockBufHdr(buf_hdr, buf_state); + + /* + * Ensure the content lock that prevents buffer modifications while + * the buffer is being written out is not released early due to an + * error. + */ + if (is_write && !is_temp) + { + LWLock *content_lock; + + content_lock = BufferDescriptorGetContentLock(buf_hdr); + + Assert(LWLockHeldByMe(content_lock)); + + /* + * Lock is now owned by AIO subsystem. + */ + LWLockDisown(content_lock); + } + + /* + * Stop tracking this buffer via the resowner - the AIO system now + * keeps track. + */ + if (!is_temp) + ResourceOwnerForgetBufferIO(CurrentResourceOwner, buffer); + } +} + +/* + * Decode readv errors as encoded by buffer_readv_encode_error(). 
+ */ +static inline void +buffer_readv_decode_error(PgAioResult result, + bool *zeroed_any, + bool *ignored_any, + uint8 *zeroed_or_error_count, + uint8 *checkfail_count, + uint8 *first_off) +{ + uint32 rem_error = result.error_data; + + /* see static asserts in buffer_readv_encode_error */ +#define READV_COUNT_BITS 7 +#define READV_COUNT_MASK ((1 << READV_COUNT_BITS) - 1) + + *zeroed_any = rem_error & 1; + rem_error >>= 1; + + *ignored_any = rem_error & 1; + rem_error >>= 1; + + *zeroed_or_error_count = rem_error & READV_COUNT_MASK; + rem_error >>= READV_COUNT_BITS; + + *checkfail_count = rem_error & READV_COUNT_MASK; + rem_error >>= READV_COUNT_BITS; + + *first_off = rem_error & READV_COUNT_MASK; + rem_error >>= READV_COUNT_BITS; +} + +/* + * Helper to encode errors for buffer_readv_complete() + * + * Errors are encoded as follows: + * - bit 0 indicates whether any page was zeroed (1) or not (0) + * - bit 1 indicates whether any checksum failure was ignored (1) or not (0) + * - next READV_COUNT_BITS bits indicate the number of errored or zeroed pages + * - next READV_COUNT_BITS bits indicate the number of checksum failures + * - next READV_COUNT_BITS bits indicate the first offset of the first page + * that was errored or zeroed or, if no errors/zeroes, the first ignored + * checksum + */ +static inline void +buffer_readv_encode_error(PgAioResult *result, + bool is_temp, + bool zeroed_any, + bool ignored_any, + uint8 error_count, + uint8 zeroed_count, + uint8 checkfail_count, + uint8 first_error_off, + uint8 first_zeroed_off, + uint8 first_ignored_off) +{ + + uint8 shift = 0; + uint8 zeroed_or_error_count = + error_count > 0 ? 
error_count : zeroed_count; + uint8 first_off; + + StaticAssertStmt(PG_IOV_MAX <= 1 << READV_COUNT_BITS, + "PG_IOV_MAX is bigger than reserved space for error data"); + StaticAssertStmt((1 + 1 + 3 * READV_COUNT_BITS) <= PGAIO_RESULT_ERROR_BITS, + "PGAIO_RESULT_ERROR_BITS is insufficient for buffer_readv"); + + /* + * We only have space to encode one offset - but luckily that's good + * enough. If there is an error, the error is the interesting offset, same + * with a zeroed buffer vs an ignored buffer. + */ + if (error_count > 0) + first_off = first_error_off; + else if (zeroed_count > 0) + first_off = first_zeroed_off; + else + first_off = first_ignored_off; + + Assert(!zeroed_any || error_count == 0); + + result->error_data = 0; + + result->error_data |= zeroed_any << shift; + shift += 1; + + result->error_data |= ignored_any << shift; + shift += 1; + + result->error_data |= ((uint32) zeroed_or_error_count) << shift; + shift += READV_COUNT_BITS; + + result->error_data |= ((uint32) checkfail_count) << shift; + shift += READV_COUNT_BITS; + + result->error_data |= ((uint32) first_off) << shift; + shift += READV_COUNT_BITS; + + result->id = is_temp ? PGAIO_HCB_LOCAL_BUFFER_READV : + PGAIO_HCB_SHARED_BUFFER_READV; + + if (error_count > 0) + result->status = PGAIO_RS_ERROR; + else + result->status = PGAIO_RS_WARNING; + + /* + * The encoding is complicated enough to warrant cross-checking it against + * the decode function. 
+ */ +#ifdef USE_ASSERT_CHECKING + { + bool zeroed_any_2, + ignored_any_2; + uint8 zeroed_or_error_count_2, + checkfail_count_2, + first_off_2; + + buffer_readv_decode_error(*result, + &zeroed_any_2, &ignored_any_2, + &zeroed_or_error_count_2, + &checkfail_count_2, + &first_off_2); + Assert(zeroed_any == zeroed_any_2); + Assert(ignored_any == ignored_any_2); + Assert(zeroed_or_error_count == zeroed_or_error_count_2); + Assert(checkfail_count == checkfail_count_2); + Assert(first_off == first_off_2); + } +#endif + +#undef READV_COUNT_BITS +#undef READV_COUNT_MASK +} + +/* + * Helper for AIO readv completion callbacks, supporting both shared and temp + * buffers. Gets called once for each buffer in a multi-page read. + */ +static pg_attribute_always_inline void +buffer_readv_complete_one(PgAioTargetData *td, uint8 buf_off, Buffer buffer, + uint8 flags, bool failed, bool is_temp, + bool *buffer_invalid, + bool *failed_checksum, + bool *ignored_checksum, + bool *zeroed_buffer) +{ + BufferDesc *buf_hdr = is_temp ? + GetLocalBufferDescriptor(-buffer - 1) + : GetBufferDescriptor(buffer - 1); + BufferTag tag = buf_hdr->tag; + char *bufdata = BufferGetBlock(buffer); + uint32 set_flag_bits; + int piv_flags; + + /* check that the buffer is in the expected state for a read */ +#ifdef USE_ASSERT_CHECKING + { + uint32 buf_state = pg_atomic_read_u32(&buf_hdr->state); + + Assert(buf_state & BM_TAG_VALID); + Assert(!(buf_state & BM_VALID)); + /* temp buffers don't use BM_IO_IN_PROGRESS */ + if (!is_temp) + Assert(buf_state & BM_IO_IN_PROGRESS); + Assert(!(buf_state & BM_DIRTY)); + } +#endif + + *buffer_invalid = false; + *failed_checksum = false; + *ignored_checksum = false; + *zeroed_buffer = false; + + /* + * We ask PageIsVerified() to only log the message about checksum errors, + * as the completion might be run in any backend (or IO workers). We will + * report checksum errors in buffer_readv_report(). 
+ */ + piv_flags = PIV_LOG_LOG; + + /* the local zero_damaged_pages may differ from the definer's */ + if (flags & READ_BUFFERS_IGNORE_CHECKSUM_FAILURES) + piv_flags |= PIV_IGNORE_CHECKSUM_FAILURE; + + /* Check for garbage data. */ + if (!failed) + { + PgAioResult result_one; + + if (!PageIsVerified((Page) bufdata, tag.blockNum, piv_flags, + failed_checksum)) + { + if (flags & READ_BUFFERS_ZERO_ON_ERROR) + { + memset(bufdata, 0, BLCKSZ); + *zeroed_buffer = true; + } + else + { + *buffer_invalid = true; + /* mark buffer as having failed */ + failed = true; + } + } + else if (*failed_checksum) + *ignored_checksum = true; + + /* + * Immediately log a message about the invalid page, but only to the + * server log. The reason for logging only there is that this may be + * executed in a different backend than the one that originated the + * request. The reason to do so immediately is that the originator + * might not process the query result immediately (because it is busy + * doing another part of query processing) or at all (e.g. if it was + * cancelled or errored out due to another IO also failing). The + * definer of the IO will emit an ERROR or WARNING when processing the + * IO's results. + * + * To avoid duplicating the code to emit these log messages, we reuse + * buffer_readv_report(). + */ + if (*buffer_invalid || *failed_checksum || *zeroed_buffer) + { + buffer_readv_encode_error(&result_one, is_temp, + *zeroed_buffer, + *ignored_checksum, + *buffer_invalid, + *zeroed_buffer ? 1 : 0, + *failed_checksum ? 1 : 0, + buf_off, buf_off, buf_off); + pgaio_result_report(result_one, td, LOG_SERVER_ONLY); + } + } + + /* Terminate I/O and set BM_VALID. */ + set_flag_bits = failed ? 
BM_IO_ERROR : BM_VALID; + if (is_temp) + TerminateLocalBufferIO(buf_hdr, false, set_flag_bits, true); + else + TerminateBufferIO(buf_hdr, false, set_flag_bits, false, true); + + /* + * Call the BUFFER_READ_DONE tracepoint in the callback, even though the + * callback may not be executed in the same backend that called + * BUFFER_READ_START. The alternative would be to defer calling the + * tracepoint to a later point (e.g. the local completion callback for + * shared buffer reads), which seems even less helpful. + */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(tag.forkNum, + tag.blockNum, + tag.spcOid, + tag.dbOid, + tag.relNumber, + is_temp ? MyProcNumber : INVALID_PROC_NUMBER, + false); +} + +/* + * Perform completion handling of a single AIO read. This read may cover + * multiple blocks / buffers. + * + * Shared between shared and local buffers, to reduce code duplication. + */ +static pg_attribute_always_inline PgAioResult +buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, + uint8 cb_data, bool is_temp) +{ + PgAioResult result = prior_result; + PgAioTargetData *td = pgaio_io_get_target_data(ioh); + uint8 first_error_off = 0; + uint8 first_zeroed_off = 0; + uint8 first_ignored_off = 0; + uint8 error_count = 0; + uint8 zeroed_count = 0; + uint8 ignored_count = 0; + uint8 checkfail_count = 0; + uint64 *io_data; + uint8 handle_data_len; + + if (is_temp) + { + Assert(td->smgr.is_temp); + Assert(pgaio_io_get_owner(ioh) == MyProcNumber); + } + else + Assert(!td->smgr.is_temp); + + /* + * Iterate over all the buffers affected by this IO and call the + * per-buffer completion function for each buffer. 
+ */ + io_data = pgaio_io_get_handle_data(ioh, &handle_data_len); + for (uint8 buf_off = 0; buf_off < handle_data_len; buf_off++) + { + Buffer buf = io_data[buf_off]; + bool failed; + bool failed_verification = false; + bool failed_checksum = false; + bool zeroed_buffer = false; + bool ignored_checksum = false; + + Assert(BufferIsValid(buf)); + + /* + * If the entire I/O failed on a lower-level, each buffer needs to be + * marked as failed. In case of a partial read, the first few buffers + * may be ok. + */ + failed = + prior_result.status == PGAIO_RS_ERROR + || prior_result.result <= buf_off; + + buffer_readv_complete_one(td, buf_off, buf, cb_data, failed, is_temp, + &failed_verification, + &failed_checksum, + &ignored_checksum, + &zeroed_buffer); + + /* + * Track information about the number of different kinds of error + * conditions across all pages, as there can be multiple pages failing + * verification as part of one IO. + */ + if (failed_verification && !zeroed_buffer && error_count++ == 0) + first_error_off = buf_off; + if (zeroed_buffer && zeroed_count++ == 0) + first_zeroed_off = buf_off; + if (ignored_checksum && ignored_count++ == 0) + first_ignored_off = buf_off; + if (failed_checksum) + checkfail_count++; + } + + /* + * If the smgr read succeeded [partially] and page verification failed for + * some of the pages, adjust the IO's result state appropriately. + */ + if (prior_result.status != PGAIO_RS_ERROR && + (error_count > 0 || ignored_count > 0 || zeroed_count > 0)) + { + buffer_readv_encode_error(&result, is_temp, + zeroed_count > 0, ignored_count > 0, + error_count, zeroed_count, checkfail_count, + first_error_off, first_zeroed_off, + first_ignored_off); + pgaio_result_report(result, td, DEBUG1); + } + + /* + * For shared relations this reporting is done in + * shared_buffer_readv_complete_local(). 
+ */ + if (is_temp && checkfail_count > 0) + pgstat_report_checksum_failures_in_db(td->smgr.rlocator.dbOid, + checkfail_count); + + return result; +} + +/* + * AIO error reporting callback for aio_shared_buffer_readv_cb and + * aio_local_buffer_readv_cb. + * + * The error is encoded / decoded in buffer_readv_encode_error() / + * buffer_readv_decode_error(). + */ +static void +buffer_readv_report(PgAioResult result, const PgAioTargetData *td, + int elevel) +{ + int nblocks = td->smgr.nblocks; + BlockNumber first = td->smgr.blockNum; + BlockNumber last = first + nblocks - 1; + ProcNumber errProc = + td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER; + RelPathStr rpath = + relpathbackend(td->smgr.rlocator, errProc, td->smgr.forkNum); + bool zeroed_any, + ignored_any; + uint8 zeroed_or_error_count, + checkfail_count, + first_off; + uint8 affected_count; + const char *msg_one, + *msg_mult, + *det_mult, + *hint_mult; + + buffer_readv_decode_error(result, &zeroed_any, &ignored_any, + &zeroed_or_error_count, + &checkfail_count, + &first_off); + + /* + * Treat a read that had both zeroed buffers *and* ignored checksums as a + * special case, it's too irregular to be emitted the same way as the + * other cases. + */ + if (zeroed_any && ignored_any) + { + Assert(zeroed_any && ignored_any); + Assert(nblocks > 1); /* same block can't be both zeroed and ignored */ + Assert(result.status != PGAIO_RS_ERROR); + affected_count = zeroed_or_error_count; + + ereport(elevel, + errcode(ERRCODE_DATA_CORRUPTED), + errmsg("zeroing %u page(s) and ignoring %u checksum failure(s) among blocks %u..%u of relation %s", + affected_count, checkfail_count, first, last, rpath.str), + affected_count > 1 ? + errdetail("Block %u held first zeroed page.", + first + first_off) : 0, + errhint("See server log for details about the other %u invalid block(s).", + affected_count + checkfail_count - 1)); + return; + } + + /* + * The other messages are highly repetitive. 
To avoid duplicating a long + * and complicated ereport(), gather the translated format strings + * separately and then do one common ereport. + */ + if (result.status == PGAIO_RS_ERROR) + { + Assert(!zeroed_any); /* can't have invalid pages when zeroing them */ + affected_count = zeroed_or_error_count; + msg_one = _("invalid page in block %u of relation %s"); + msg_mult = _("%u invalid pages among blocks %u..%u of relation %s"); + det_mult = _("Block %u held first invalid page."); + hint_mult = _("See server log for the other %u invalid block(s)."); + } + else if (zeroed_any && !ignored_any) + { + affected_count = zeroed_or_error_count; + msg_one = _("invalid page in block %u of relation %s; zeroing out page"); + msg_mult = _("zeroing out %u invalid pages among blocks %u..%u of relation %s"); + det_mult = _("Block %u held first zeroed page."); + hint_mult = _("See server log for the other %u zeroed block(s)."); + } + else if (!zeroed_any && ignored_any) + { + affected_count = checkfail_count; + msg_one = _("ignoring checksum failure in block %u of relation %s"); + msg_mult = _("ignoring %u checksum failures among blocks %u..%u of relation %s"); + det_mult = _("Block %u held first ignored page."); + hint_mult = _("See server log for the other %u ignored block(s)."); + } + else + pg_unreachable(); + + ereport(elevel, + errcode(ERRCODE_DATA_CORRUPTED), + affected_count == 1 ? + errmsg_internal(msg_one, first + first_off, rpath.str) : + errmsg_internal(msg_mult, affected_count, first, last, rpath.str), + affected_count > 1 ? errdetail_internal(det_mult, first + first_off) : 0, + affected_count > 1 ? 
errhint_internal(hint_mult, affected_count - 1) : 0); +} + +static void +shared_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data) +{ + buffer_stage_common(ioh, false, false); +} + +static PgAioResult +shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, + uint8 cb_data) +{ + return buffer_readv_complete(ioh, prior_result, cb_data, false); +} + +/* + * We need a backend-local completion callback for shared buffers, to be able + * to report checksum errors correctly. Unfortunately that can only safely + * happen if the reporting backend has previously called + * pgstat_prepare_report_checksum_failure(), which we can only guarantee in + * the backend that started the IO. Hence this callback. + */ +static PgAioResult +shared_buffer_readv_complete_local(PgAioHandle *ioh, PgAioResult prior_result, + uint8 cb_data) +{ + bool zeroed_any, + ignored_any; + uint8 zeroed_or_error_count, + checkfail_count, + first_off; + + if (prior_result.status == PGAIO_RS_OK) + return prior_result; + + buffer_readv_decode_error(prior_result, + &zeroed_any, + &ignored_any, + &zeroed_or_error_count, + &checkfail_count, + &first_off); + + if (checkfail_count) + { + PgAioTargetData *td = pgaio_io_get_target_data(ioh); + + pgstat_report_checksum_failures_in_db(td->smgr.rlocator.dbOid, + checkfail_count); + } + + return prior_result; +} + +static void +local_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data) +{ + buffer_stage_common(ioh, false, true); +} + +static PgAioResult +local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, + uint8 cb_data) +{ + return buffer_readv_complete(ioh, prior_result, cb_data, true); +} + +/* readv callback is passed READ_BUFFERS_* flags as callback data */ +const PgAioHandleCallbacks aio_shared_buffer_readv_cb = { + .stage = shared_buffer_readv_stage, + .complete_shared = shared_buffer_readv_complete, + /* need a local callback to report checksum failures */ + .complete_local = shared_buffer_readv_complete_local, + .report = 
buffer_readv_report, +}; + +/* readv callback is passed READ_BUFFERS_* flags as callback data */ +const PgAioHandleCallbacks aio_local_buffer_readv_cb = { + .stage = local_buffer_readv_stage, + + /* + * Note that this, in contrast to the shared_buffers case, uses + * complete_local, as only the issuing backend has access to the required + * datastructures. This is important in case the IO completion may be + * consumed incidentally by another backend. + */ + .complete_local = local_buffer_readv_complete, + .report = buffer_readv_report, +}; diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 3a722321533..bf89076bb10 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -18,6 +18,7 @@ #include "access/parallel.h" #include "executor/instrument.h" #include "pgstat.h" +#include "storage/aio.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/fd.h" @@ -187,7 +188,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln) * Try to start an I/O operation. There currently are no reasons for * StartLocalBufferIO to return false, so we raise an error in that case. 
*/ - if (!StartLocalBufferIO(bufHdr, false)) + if (!StartLocalBufferIO(bufHdr, false, false)) elog(ERROR, "failed to start write IO on local buffer"); /* Find smgr relation for buffer */ @@ -211,7 +212,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln) IOOP_WRITE, io_start, 1, BLCKSZ); /* Mark not-dirty */ - TerminateLocalBufferIO(bufHdr, true, 0); + TerminateLocalBufferIO(bufHdr, true, 0, false); pgBufferUsage.local_blks_written++; } @@ -430,7 +431,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr, pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state); /* no need to loop for local buffers */ - StartLocalBufferIO(existing_hdr, true); + StartLocalBufferIO(existing_hdr, true, false); } else { @@ -446,7 +447,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr, hresult->id = victim_buf_id; - StartLocalBufferIO(victim_buf_hdr, true); + StartLocalBufferIO(victim_buf_hdr, true, false); } } @@ -515,13 +516,31 @@ MarkLocalBufferDirty(Buffer buffer) * Like StartBufferIO, but for local buffers */ bool -StartLocalBufferIO(BufferDesc *bufHdr, bool forInput) +StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait) { - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + uint32 buf_state; + + /* + * With AIO the buffer could have IO in progress, e.g. when there are two + * scans of the same relation. Either wait for the other IO or return + * false. + */ + if (pgaio_wref_valid(&bufHdr->io_wref)) + { + PgAioWaitRef iow = bufHdr->io_wref; + if (nowait) + return false; + + pgaio_wref_wait(&iow); + } + + /* Once we get here, there is definitely no I/O active on this buffer */ + + /* Check if someone else already did the I/O */ + buf_state = pg_atomic_read_u32(&bufHdr->state); if (forInput ? 
(buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) { - /* someone else already did the I/O */ return false; } @@ -536,7 +555,8 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput) * Like TerminateBufferIO, but for local buffers */ void -TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits) +TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits, + bool release_aio) { /* Only need to adjust flags */ uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); @@ -549,12 +569,22 @@ TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bit if (clear_dirty) buf_state &= ~BM_DIRTY; + if (release_aio) + { + /* release pin held by IO subsystem, see also buffer_stage_common() */ + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + buf_state -= BUF_REFCOUNT_ONE; + pgaio_wref_clear(&bufHdr->io_wref); + } + buf_state |= set_flag_bits; pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); /* local buffers don't track IO using resowners */ /* local buffers don't use the IO CV, as no other process can see buffer */ + + /* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */ } /* @@ -575,6 +605,19 @@ InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced) uint32 buf_state; LocalBufferLookupEnt *hresult; + /* + * It's possible that we started IO on this buffer before e.g. aborting + * the transaction that created a table. We need to wait for that IO to + * complete before removing / reusing the buffer. + */ + if (pgaio_wref_valid(&bufHdr->io_wref)) + { + PgAioWaitRef iow = bufHdr->io_wref; + + pgaio_wref_wait(&iow); + Assert(!pgaio_wref_valid(&bufHdr->io_wref)); + } + buf_state = pg_atomic_read_u32(&bufHdr->state); /* @@ -714,6 +757,8 @@ InitLocalBuffers(void) */ buf->buf_id = -i - 2; + pgaio_wref_clear(&buf->io_wref); + /* * Intentionally do not initialize the buffer's atomic variable * (besides zeroing the underlying memory above). 
That way we get diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c index 0afeab5140c..82457bacc62 100644 --- a/src/backend/storage/page/bufpage.c +++ b/src/backend/storage/page/bufpage.c @@ -78,8 +78,8 @@ PageInit(Page page, Size pageSize, Size specialSize) * treat such a page as empty and without free space. Eventually, VACUUM * will clean up such a page and make it usable. * - * If flag PIV_LOG_WARNING is set, a WARNING is logged in the event of - * a checksum failure. + * If flag PIV_LOG_WARNING/PIV_LOG_LOG is set, a WARNING/LOG message is logged + * in the event of a checksum failure. * * If flag PIV_IGNORE_CHECKSUM_FAILURE is set, checksum failures will cause a * message about the failure to be emitted, but will not cause @@ -143,13 +143,13 @@ PageIsVerified(PageData *page, BlockNumber blkno, int flags, bool *checksum_fail return true; /* - * Throw a WARNING if the checksum fails, but only after we've checked for - * the all-zeroes case. + * Throw a WARNING/LOG, as instructed by PIV_LOG_*, if the checksum fails, + * but only after we've checked for the all-zeroes case. */ if (checksum_failure) { - if ((flags & PIV_LOG_WARNING) != 0) - ereport(WARNING, + if ((flags & (PIV_LOG_WARNING | PIV_LOG_LOG)) != 0) + ereport(flags & PIV_LOG_WARNING ? 
WARNING : LOG, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("page verification failed, calculated checksum %u but expected %u", checksum, p->pd_checksum))); diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index 4ab4b05145a..9fe9d9ad9fa 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -194,9 +194,13 @@ typedef enum PgAioHandleCallbackID PGAIO_HCB_INVALID = 0, PGAIO_HCB_MD_READV, + + PGAIO_HCB_SHARED_BUFFER_READV, + + PGAIO_HCB_LOCAL_BUFFER_READV, } PgAioHandleCallbackID; -#define PGAIO_HCB_MAX PGAIO_HCB_MD_READV +#define PGAIO_HCB_MAX PGAIO_HCB_LOCAL_BUFFER_READV StaticAssertDecl(PGAIO_HCB_MAX <= (1 << PGAIO_RESULT_ID_BITS), "PGAIO_HCB_MAX is too big for PGAIO_RESULT_ID_BITS"); diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 9327f60c44c..72b36a4af26 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -17,6 +17,7 @@ #include "pgstat.h" #include "port/atomics.h" +#include "storage/aio_types.h" #include "storage/buf.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" @@ -264,6 +265,8 @@ typedef struct BufferDesc int wait_backend_pgprocno; /* backend of pin-count waiter */ int freeNext; /* link in freelist chain */ + + PgAioWaitRef io_wref; /* set iff AIO is in progress */ LWLock content_lock; /* to lock access to buffer contents */ } BufferDesc; @@ -472,8 +475,8 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr, uint32 *extended_by); extern void MarkLocalBufferDirty(Buffer buffer); extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, - uint32 set_flag_bits); -extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput); + uint32 set_flag_bits, bool release_aio); +extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait); extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln); extern void DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber 
forkNum, diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 538b890a51d..11f8508a90b 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -15,6 +15,7 @@ #define BUFMGR_H #include "port/pg_iovec.h" +#include "storage/aio_types.h" #include "storage/block.h" #include "storage/buf.h" #include "storage/bufpage.h" @@ -111,6 +112,8 @@ typedef struct BufferManagerRelation #define READ_BUFFERS_ZERO_ON_ERROR (1 << 0) /* Call smgrprefetch() if I/O necessary. */ #define READ_BUFFERS_ISSUE_ADVICE (1 << 1) +/* Don't treat page as invalid due to checksum failures. */ +#define READ_BUFFERS_IGNORE_CHECKSUM_FAILURES (1 << 2) struct ReadBuffersOperation { @@ -170,6 +173,9 @@ extern PGDLLIMPORT int checkpoint_flush_after; extern PGDLLIMPORT int backend_flush_after; extern PGDLLIMPORT int bgwriter_flush_after; +extern const PgAioHandleCallbacks aio_shared_buffer_readv_cb; +extern const PgAioHandleCallbacks aio_local_buffer_readv_cb; + /* in buf_init.c */ extern PGDLLIMPORT char *BufferBlocks; diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h index 26d0a551fc9..aeb67c498c5 100644 --- a/src/include/storage/bufpage.h +++ b/src/include/storage/bufpage.h @@ -467,6 +467,7 @@ do { \ /* flags for PageIsVerified() */ #define PIV_LOG_WARNING (1 << 0) +#define PIV_LOG_LOG (1 << 1) #define PIV_IGNORE_CHECKSUM_FAILURE (1 << 2) #define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap) \ |