Diffstat (limited to 'src/backend/access/transam/xlogprefetch.c')
-rw-r--r--  src/backend/access/transam/xlogprefetch.c | 923
1 file changed, 0 insertions(+), 923 deletions(-)
diff --git a/src/backend/access/transam/xlogprefetch.c b/src/backend/access/transam/xlogprefetch.c
deleted file mode 100644
index ae4585232be..00000000000
--- a/src/backend/access/transam/xlogprefetch.c
+++ /dev/null
@@ -1,923 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * xlogprefetch.c
- * Prefetching support for recovery.
- *
- * Portions Copyright (c) 2021, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- * src/backend/access/transam/xlogprefetch.c
- *
- * The goal of this module is to read future WAL records and issue
- * PrefetchSharedBuffer() calls for referenced blocks, so that we avoid I/O
- * stalls in the main recovery loop.
- *
- * When examining a WAL record from the future, we need to consider that a
- * referenced block or segment file might not exist on disk until this record
- * or some earlier record has been replayed. After a crash, a file might also
- * be missing because it was dropped by a later WAL record; in that case, it
- * will be recreated when this record is replayed. These cases are handled by
- * recognizing them and adding a "filter" that prevents all prefetching of a
- * certain block range until the present WAL record has been replayed. Blocks
- * skipped for these reasons are counted as "skip_new" (that is, cases where we
- * didn't try to prefetch "new" blocks).
- *
- * Blocks found in the buffer pool already are counted as "skip_hit".
- * Repeated access to the same buffer is detected and skipped, and this is
- * counted with "skip_seq". Blocks that were logged with FPWs are skipped if
- * recovery_prefetch_fpw is off, since on most systems there will be no I/O
- * stall; this is counted with "skip_fpw".
- *
- * The only way we currently have to know that an I/O initiated with
- * PrefetchSharedBuffer() has completed is to wait for the corresponding call
- * to XLogReadBufferInRedo() to return. Therefore, we track the number of
- * potentially in-flight I/Os by using a circular buffer of LSNs. When it's
- * full, we have to wait for recovery to replay enough records to remove some
- * LSNs, and only then can we initiate more prefetching. Ideally, this keeps
- * us just the right distance ahead to respect maintenance_io_concurrency,
- * though in practice it errs on the side of being too conservative because
- * many I/Os complete sooner than we know.
- *
- *-------------------------------------------------------------------------
- */
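For orientation, here is a minimal sketch of how a recovery loop would drive
this module, assuming the XLogPrefetchState/XLogPrefetch interface declared in
the companion header xlogprefetch.h; the loop body is a simplification for
illustration, not the real xlog.c code:

    /* Hypothetical, simplified driving loop; not the real xlog.c code. */
    XLogPrefetchState prefetch;
    XLogRecord *record;

    XLogPrefetchBegin(&prefetch, xlogreader);
    while ((record = ReadRecord(xlogreader, LOG, false)) != NULL)
    {
        /* Read ahead of the record being replayed and start I/Os. */
        XLogPrefetch(&prefetch, xlogreader->ReadRecPtr);

        /* Replay; XLogReadBufferInRedo() may find its pages already coming. */
        RmgrTable[record->xl_rmid].rm_redo(xlogreader);
    }
    XLogPrefetchEnd(&prefetch);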
-
-#include "postgres.h"
-
-#include "access/xlog.h"
-#include "access/xlogprefetch.h"
-#include "access/xlogreader.h"
-#include "access/xlogutils.h"
-#include "catalog/storage_xlog.h"
-#include "utils/fmgrprotos.h"
-#include "utils/timestamp.h"
-#include "funcapi.h"
-#include "pgstat.h"
-#include "miscadmin.h"
-#include "port/atomics.h"
-#include "storage/bufmgr.h"
-#include "storage/shmem.h"
-#include "storage/smgr.h"
-#include "utils/guc.h"
-#include "utils/hsearch.h"
-
-/*
- * Sample the queue depth and distance every time we replay this much WAL.
- * This is used to compute avg_queue_depth and avg_distance for the log
- * message that appears at the end of crash recovery. It's also used to send
- * messages periodically to the stats collector, to save the counters on disk.
- */
-#define XLOGPREFETCHER_SAMPLE_DISTANCE 0x40000
-
-/* GUCs */
-bool recovery_prefetch = false;
-bool recovery_prefetch_fpw = false;
-
-int XLogPrefetchReconfigureCount;
-
-/*
- * A prefetcher object. There is at most one of these in existence at a time,
- * recreated whenever there is a configuration change.
- */
-struct XLogPrefetcher
-{
- /* Reader and current reading state. */
- XLogReaderState *reader;
- DecodedXLogRecord *record;
- int next_block_id;
- bool shutdown;
-
- /* Details of last prefetch to skip repeats and seq scans. */
- SMgrRelation last_reln;
- RelFileNode last_rnode;
- BlockNumber last_blkno;
-
- /* Online averages. */
- uint64 samples;
- double avg_queue_depth;
- double avg_distance;
- XLogRecPtr next_sample_lsn;
-
- /* Book-keeping required to avoid accessing non-existing blocks. */
- HTAB *filter_table;
- dlist_head filter_queue;
-
- /* Book-keeping required to limit concurrent prefetches. */
- int prefetch_head;
- int prefetch_tail;
- int prefetch_queue_size;
- XLogRecPtr prefetch_queue[MAX_IO_CONCURRENCY + 1];
-};
-
-/*
- * A temporary filter used to track block ranges that haven't been created
- * yet, whole relations that haven't been created yet, and whole relations
- * that we must assume have already been dropped.
- */
-typedef struct XLogPrefetcherFilter
-{
- RelFileNode rnode;
- XLogRecPtr filter_until_replayed;
- BlockNumber filter_from_block;
- dlist_node link;
-} XLogPrefetcherFilter;
-
-/*
- * Counters exposed in shared memory for pg_stat_prefetch_recovery.
- */
-typedef struct XLogPrefetchStats
-{
- pg_atomic_uint64 reset_time; /* Time of last reset. */
- pg_atomic_uint64 prefetch; /* Prefetches initiated. */
- pg_atomic_uint64 skip_hit; /* Blocks already buffered. */
- pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
- pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
- pg_atomic_uint64 skip_seq; /* Repeat blocks skipped. */
- float avg_distance;
- float avg_queue_depth;
-
- /* Reset counters */
- pg_atomic_uint32 reset_request;
- uint32 reset_handled;
-
- /* Dynamic values */
- int distance; /* Number of bytes ahead in the WAL. */
- int queue_depth; /* Number of I/Os possibly in progress. */
-} XLogPrefetchStats;
-
-static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
- RelFileNode rnode,
- BlockNumber blockno,
- XLogRecPtr lsn);
-static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
- RelFileNode rnode,
- BlockNumber blockno);
-static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
- XLogRecPtr replaying_lsn);
-static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
- XLogRecPtr prefetching_lsn);
-static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher,
- XLogRecPtr replaying_lsn);
-static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher);
-static bool XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher,
- XLogRecPtr replaying_lsn);
-static bool XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher);
-static void XLogPrefetchSaveStats(void);
-static void XLogPrefetchRestoreStats(void);
-
-static XLogPrefetchStats *SharedStats;
-
-size_t
-XLogPrefetchShmemSize(void)
-{
- return sizeof(XLogPrefetchStats);
-}
-
-static void
-XLogPrefetchResetStats(void)
-{
- pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
- pg_atomic_write_u64(&SharedStats->prefetch, 0);
- pg_atomic_write_u64(&SharedStats->skip_hit, 0);
- pg_atomic_write_u64(&SharedStats->skip_new, 0);
- pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
- pg_atomic_write_u64(&SharedStats->skip_seq, 0);
- SharedStats->avg_distance = 0;
- SharedStats->avg_queue_depth = 0;
-}
-
-void
-XLogPrefetchShmemInit(void)
-{
- bool found;
-
- SharedStats = (XLogPrefetchStats *)
- ShmemInitStruct("XLogPrefetchStats",
- sizeof(XLogPrefetchStats),
- &found);
-
- if (!found)
- {
- pg_atomic_init_u32(&SharedStats->reset_request, 0);
- SharedStats->reset_handled = 0;
-
- pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
- pg_atomic_init_u64(&SharedStats->prefetch, 0);
- pg_atomic_init_u64(&SharedStats->skip_hit, 0);
- pg_atomic_init_u64(&SharedStats->skip_new, 0);
- pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
- pg_atomic_init_u64(&SharedStats->skip_seq, 0);
- SharedStats->avg_distance = 0;
- SharedStats->avg_queue_depth = 0;
- SharedStats->distance = 0;
- SharedStats->queue_depth = 0;
- }
-}
-
-/*
- * Called when any GUC is changed that affects prefetching.
- */
-void
-XLogPrefetchReconfigure(void)
-{
- XLogPrefetchReconfigureCount++;
-}
-
-/*
- * Called by any backend to request that the stats be reset.
- */
-void
-XLogPrefetchRequestResetStats(void)
-{
- pg_atomic_fetch_add_u32(&SharedStats->reset_request, 1);
-}
-
-/*
- * Tell the stats collector to serialize the shared memory counters into the
- * stats file.
- */
-static void
-XLogPrefetchSaveStats(void)
-{
- PgStat_RecoveryPrefetchStats serialized = {
- .prefetch = pg_atomic_read_u64(&SharedStats->prefetch),
- .skip_hit = pg_atomic_read_u64(&SharedStats->skip_hit),
- .skip_new = pg_atomic_read_u64(&SharedStats->skip_new),
- .skip_fpw = pg_atomic_read_u64(&SharedStats->skip_fpw),
- .skip_seq = pg_atomic_read_u64(&SharedStats->skip_seq),
- .stat_reset_timestamp = pg_atomic_read_u64(&SharedStats->reset_time)
- };
-
- pgstat_send_recoveryprefetch(&serialized);
-}
-
-/*
- * Try to restore the shared memory counters from the stats file.
- */
-static void
-XLogPrefetchRestoreStats(void)
-{
- PgStat_RecoveryPrefetchStats *serialized = pgstat_fetch_recoveryprefetch();
-
- if (serialized->stat_reset_timestamp != 0)
- {
- pg_atomic_write_u64(&SharedStats->prefetch, serialized->prefetch);
- pg_atomic_write_u64(&SharedStats->skip_hit, serialized->skip_hit);
- pg_atomic_write_u64(&SharedStats->skip_new, serialized->skip_new);
- pg_atomic_write_u64(&SharedStats->skip_fpw, serialized->skip_fpw);
- pg_atomic_write_u64(&SharedStats->skip_seq, serialized->skip_seq);
- pg_atomic_write_u64(&SharedStats->reset_time, serialized->stat_reset_timestamp);
- }
-}
-
-/*
- * Increment a counter in shared memory. Only the startup process writes
- * these counters, so no lock or memory barrier is needed; this is equivalent
- * to (*counter)++ on a plain uint64, except that the atomic write keeps
- * readers on platforms without atomic 64-bit reads from seeing a torn value.
- */
-static inline void
-XLogPrefetchIncrement(pg_atomic_uint64 *counter)
-{
- Assert(AmStartupProcess() || !IsUnderPostmaster);
- pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
-}
-
-/*
- * Initialize an XLogPrefetchState object and restore the last saved
- * statistics from disk.
- */
-void
-XLogPrefetchBegin(XLogPrefetchState *state, XLogReaderState *reader)
-{
- XLogPrefetchRestoreStats();
-
- /* We'll reconfigure on the first call to XLogPrefetch(). */
- state->reader = reader;
- state->prefetcher = NULL;
- state->reconfigure_count = XLogPrefetchReconfigureCount - 1;
-}
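The deliberate mismatch set up above (reconfigure_count is one behind) forces
the first call to build a prefetcher. A sketch of the lazy check that the
caller side presumably performs, reconstructed from this file's use of
XLogPrefetchReconfigureCount and recovery_prefetch rather than copied from the
header:

    /* Sketch of the lazy (re)configuration check in the caller. */
    if (unlikely(state->reconfigure_count != XLogPrefetchReconfigureCount))
    {
        /* Tear down the old prefetcher, if any. */
        if (state->prefetcher)
            XLogPrefetcherFree(state->prefetcher);
        state->prefetcher = NULL;

        /* Build a new one only if prefetching is currently enabled. */
        if (recovery_prefetch)
            state->prefetcher = XLogPrefetcherAllocate(state->reader);
        state->reconfigure_count = XLogPrefetchReconfigureCount;
    }
    if (state->prefetcher)
        XLogPrefetcherReadAhead(state->prefetcher, replaying_lsn);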
-
-/*
- * Shut down the prefetching infrastructure, if configured.
- */
-void
-XLogPrefetchEnd(XLogPrefetchState *state)
-{
- XLogPrefetchSaveStats();
-
- if (state->prefetcher)
- XLogPrefetcherFree(state->prefetcher);
- state->prefetcher = NULL;
-
- SharedStats->queue_depth = 0;
- SharedStats->distance = 0;
-}
-
-/*
- * Create a prefetcher that is ready to begin prefetching blocks referenced by
- * WAL records.
- */
-XLogPrefetcher *
-XLogPrefetcherAllocate(XLogReaderState *reader)
-{
- XLogPrefetcher *prefetcher;
- static HASHCTL hash_table_ctl = {
- .keysize = sizeof(RelFileNode),
- .entrysize = sizeof(XLogPrefetcherFilter)
- };
-
- /*
- * The size of the queue is based on the maintenance_io_concurrency
- * setting. In theory we might have a separate queue for each tablespace,
- * but it's not clear how that should work, so for now we'll just use the
- * general GUC to rate-limit all prefetching. The queue has space for up to
- * the highest possible value of the GUC + 1, because our circular buffer
- * has a gap between head and tail when full.
- */
- prefetcher = palloc0(sizeof(XLogPrefetcher));
- prefetcher->prefetch_queue_size = maintenance_io_concurrency + 1;
- prefetcher->reader = reader;
- prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
- &hash_table_ctl,
- HASH_ELEM | HASH_BLOBS);
- dlist_init(&prefetcher->filter_queue);
-
- SharedStats->queue_depth = 0;
- SharedStats->distance = 0;
-
- return prefetcher;
-}
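As a concrete sizing example: with maintenance_io_concurrency = 10 the queue
holds 11 LSN slots, of which at most 10 are ever occupied, because
XLogPrefetcherSaturated() (below) treats the queue as full when advancing the
head would make it collide with the tail; the permanently empty slot is what
distinguishes a full queue from an empty one (head == tail).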
-
-/*
- * Destroy a prefetcher and release all resources.
- */
-void
-XLogPrefetcherFree(XLogPrefetcher *prefetcher)
-{
- /* Log final statistics. */
- ereport(LOG,
- (errmsg("recovery finished prefetching at %X/%X; "
- "prefetch = %llu, "
- "skip_hit = %llu, "
- "skip_new = %llu, "
- "skip_fpw = %llu, "
- "skip_seq = %llu, "
- "avg_distance = %f, "
- "avg_queue_depth = %f",
- LSN_FORMAT_ARGS(prefetcher->reader->EndRecPtr),
- (unsigned long long) pg_atomic_read_u64(&SharedStats->prefetch),
- (unsigned long long) pg_atomic_read_u64(&SharedStats->skip_hit),
- (unsigned long long) pg_atomic_read_u64(&SharedStats->skip_new),
- (unsigned long long) pg_atomic_read_u64(&SharedStats->skip_fpw),
- (unsigned long long) pg_atomic_read_u64(&SharedStats->skip_seq),
- SharedStats->avg_distance,
- SharedStats->avg_queue_depth)));
- hash_destroy(prefetcher->filter_table);
- pfree(prefetcher);
-}
-
-/*
- * Called when recovery is replaying a new LSN, to check if we can read ahead.
- */
-bool
-XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
-{
- uint32 reset_request;
-
- /* If an error has occurred or we've hit the end of the WAL, do nothing. */
- if (prefetcher->shutdown)
- return false;
-
- /*
- * Have any in-flight prefetches definitely completed, judging by the LSN
- * that is currently being replayed?
- */
- XLogPrefetcherCompletedIO(prefetcher, replaying_lsn);
-
- /*
- * Do we already have the maximum permitted number of I/Os running
- * (according to the information we have)? If so, we have to wait for at
- * least one to complete, so give up early and let recovery catch up.
- */
- if (XLogPrefetcherSaturated(prefetcher))
- return false;
-
- /*
- * Can we drop any filters yet? This happens when the LSN that is
- * currently being replayed has moved past a record that prevents
- * prefetching of a block range, such as relation extension.
- */
- XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn);
-
- /*
- * Have we been asked to reset our stats counters? This is checked with
- * an unsynchronized memory read, but we'll see it eventually and we'll be
- * accessing that cache line anyway.
- */
- reset_request = pg_atomic_read_u32(&SharedStats->reset_request);
- if (reset_request != SharedStats->reset_handled)
- {
- XLogPrefetchResetStats();
- SharedStats->reset_handled = reset_request;
-
- prefetcher->avg_distance = 0;
- prefetcher->avg_queue_depth = 0;
- prefetcher->samples = 0;
- }
-
- /* OK, we can now try reading ahead. */
- return XLogPrefetcherScanRecords(prefetcher, replaying_lsn);
-}
-
-/*
- * Read ahead as far as we are allowed to, considering the LSN that recovery
- * is currently replaying.
- *
- * Return true if the xlogreader would like more data.
- */
-static bool
-XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
-{
- XLogReaderState *reader = prefetcher->reader;
- DecodedXLogRecord *record;
-
- Assert(!XLogPrefetcherSaturated(prefetcher));
-
- for (;;)
- {
- char *error;
- int64 distance;
-
- /* If we don't already have a record, then try to read one. */
- if (prefetcher->record == NULL)
- {
- switch (XLogReadAhead(reader, &record, &error))
- {
- case XLREAD_NEED_DATA:
- return true;
- case XLREAD_FAIL:
- if (error)
- ereport(LOG,
- (errmsg("recovery no longer prefetching: %s",
- error)));
- else
- ereport(LOG,
- (errmsg("recovery no longer prefetching")));
- prefetcher->shutdown = true;
- SharedStats->queue_depth = 0;
- SharedStats->distance = 0;
-
- return false;
- case XLREAD_FULL:
- return false;
- case XLREAD_SUCCESS:
- prefetcher->record = record;
- prefetcher->next_block_id = 0;
- break;
- }
- }
- else
- {
- /*
-			 * We ran out of I/O queue while partway through a record.  We'll
- * carry on where we left off, according to next_block_id.
- */
- record = prefetcher->record;
- }
-
- /* How far ahead of replay are we now? */
- distance = record->lsn - replaying_lsn;
-
- /* Update distance shown in shm. */
- SharedStats->distance = distance;
-
- /* Periodically recompute some statistics. */
- if (unlikely(replaying_lsn >= prefetcher->next_sample_lsn))
- {
- /* Compute online averages. */
- prefetcher->samples++;
- if (prefetcher->samples == 1)
- {
- prefetcher->avg_distance = SharedStats->distance;
- prefetcher->avg_queue_depth = SharedStats->queue_depth;
- }
- else
- {
- prefetcher->avg_distance +=
- (SharedStats->distance - prefetcher->avg_distance) /
- prefetcher->samples;
- prefetcher->avg_queue_depth +=
- (SharedStats->queue_depth - prefetcher->avg_queue_depth) /
- prefetcher->samples;
- }
-
- /* Expose it in shared memory. */
- SharedStats->avg_distance = prefetcher->avg_distance;
- SharedStats->avg_queue_depth = prefetcher->avg_queue_depth;
-
- /* Also periodically save the simple counters. */
- XLogPrefetchSaveStats();
-
- prefetcher->next_sample_lsn =
- replaying_lsn + XLOGPREFETCHER_SAMPLE_DISTANCE;
- }
-
- /* Are we not far enough ahead? */
- if (distance <= 0)
- {
- /* XXX Is this still possible? */
- prefetcher->record = NULL; /* skip this record */
- continue;
- }
-
- /*
- * If this is a record that creates a new SMGR relation, we'll avoid
- * prefetching anything from that rnode until it has been replayed.
- */
- if (replaying_lsn < record->lsn &&
- record->header.xl_rmid == RM_SMGR_ID &&
- (record->header.xl_info & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE)
- {
- xl_smgr_create *xlrec = (xl_smgr_create *) record->main_data;
-
- XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0, record->lsn);
- }
-
- /* Scan the record's block references. */
- if (!XLogPrefetcherScanBlocks(prefetcher))
- return false;
-
- /* Advance to the next record. */
- prefetcher->record = NULL;
- }
-}
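The sampling code above maintains its averages with the standard
incremental-mean recurrence avg += (x - avg) / n, which needs neither stored
samples nor a running sum that could overflow. A standalone toy (not
PostgreSQL code) verifying the recurrence:

    #include <stdio.h>

    int main(void)
    {
        double xs[] = {10, 20, 60};
        double avg = 0;

        for (int n = 1; n <= 3; n++)
            avg += (xs[n - 1] - avg) / n;

        printf("%.1f\n", avg);  /* prints 30.0, i.e. (10 + 20 + 60) / 3 */
        return 0;
    }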
-
-/*
- * Scan the current record for block references, and consider prefetching.
- *
- * Return true if we processed the current record to completion and still have
- * queue space to process a new record, and false if we saturated the I/O
- * queue and need to wait for recovery to advance before we continue.
- */
-static bool
-XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher)
-{
- DecodedXLogRecord *record = prefetcher->record;
-
- Assert(!XLogPrefetcherSaturated(prefetcher));
-
- /*
- * We might already have been partway through processing this record when
- * our queue became saturated, so we need to start where we left off.
- */
- for (int block_id = prefetcher->next_block_id;
- block_id <= record->max_block_id;
- ++block_id)
- {
- DecodedBkpBlock *block = &record->blocks[block_id];
- PrefetchBufferResult prefetch;
- SMgrRelation reln;
-
- /* Ignore everything but the main fork for now. */
- if (block->forknum != MAIN_FORKNUM)
- continue;
-
- /*
- * If there is a full page image attached, we won't be reading the
- * page, so you might think we should skip it. However, if the
- * underlying filesystem uses larger logical blocks than us, it might
- * still need to perform a read-before-write some time later.
- * Therefore, only prefetch if configured to do so.
- */
- if (block->has_image && !recovery_prefetch_fpw)
- {
- XLogPrefetchIncrement(&SharedStats->skip_fpw);
- continue;
- }
-
- /*
- * If this block will initialize a new page then it's probably a
- * relation extension. Since that might create a new segment, we
- * can't try to prefetch this block until the record has been
- * replayed, or we might try to open a file that doesn't exist yet.
- */
- if (block->flags & BKPBLOCK_WILL_INIT)
- {
- XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno,
- record->lsn);
- XLogPrefetchIncrement(&SharedStats->skip_new);
- continue;
- }
-
- /* Should we skip this block due to a filter? */
- if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, block->blkno))
- {
- XLogPrefetchIncrement(&SharedStats->skip_new);
- continue;
- }
-
- /* Fast path for repeated references to the same relation. */
- if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode))
- {
- /*
- * If this is a repeat access to the same block, then skip it.
- *
-			 * XXX We could also check for last_blkno + 1, and also update
- * last_blkno; it's not clear if the kernel would do a better job
- * of sequential prefetching.
- */
- if (block->blkno == prefetcher->last_blkno)
- {
- XLogPrefetchIncrement(&SharedStats->skip_seq);
- continue;
- }
-
- /* We can avoid calling smgropen(). */
- reln = prefetcher->last_reln;
- }
- else
- {
- /* Otherwise we have to open it. */
- reln = smgropen(block->rnode, InvalidBackendId);
- prefetcher->last_rnode = block->rnode;
- prefetcher->last_reln = reln;
- }
- prefetcher->last_blkno = block->blkno;
-
- /* Try to prefetch this block! */
- prefetch = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
- if (BufferIsValid(prefetch.recent_buffer))
- {
- /*
- * It was already cached, so do nothing. We'll remember the
- * buffer, so that recovery can try to avoid looking it up again.
- */
- block->recent_buffer = prefetch.recent_buffer;
- XLogPrefetchIncrement(&SharedStats->skip_hit);
- }
- else if (prefetch.initiated_io)
- {
- /*
-			 * I/O has possibly been initiated (we don't know whether the
-			 * kernel already had the block cached, so for lack of better
-			 * information we assume a read was started).  Record this as an
-			 * I/O in progress until eventually we replay this LSN.
- */
- XLogPrefetchIncrement(&SharedStats->prefetch);
- XLogPrefetcherInitiatedIO(prefetcher, record->lsn);
-
- /*
- * If the queue is now full, we'll have to wait before processing
- * any more blocks from this record, or move to a new record if
- * that was the last block.
- */
- if (XLogPrefetcherSaturated(prefetcher))
- {
- prefetcher->next_block_id = block_id + 1;
- return false;
- }
- }
- else
- {
- /*
- * Neither cached nor initiated. The underlying segment file
- * doesn't exist. Presumably it will be unlinked by a later WAL
- * record. When recovery reads this block, it will use the
- * EXTENSION_CREATE_RECOVERY flag. We certainly don't want to do
- * that sort of thing while merely prefetching, so let's just
- * ignore references to this relation until this record is
- * replayed, and let recovery create the dummy file or complain if
- * something is wrong.
- */
- XLogPrefetcherAddFilter(prefetcher, block->rnode, 0,
- record->lsn);
- XLogPrefetchIncrement(&SharedStats->skip_new);
- }
- }
-
- return true;
-}
-
-/*
- * Expose statistics about recovery prefetching.
- */
-Datum
-pg_stat_get_prefetch_recovery(PG_FUNCTION_ARGS)
-{
-#define PG_STAT_GET_PREFETCH_RECOVERY_COLS 10
- ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
- TupleDesc tupdesc;
- Tuplestorestate *tupstore;
- MemoryContext per_query_ctx;
- MemoryContext oldcontext;
- Datum values[PG_STAT_GET_PREFETCH_RECOVERY_COLS];
- bool nulls[PG_STAT_GET_PREFETCH_RECOVERY_COLS];
-
- if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("set-valued function called in context that cannot accept a set")));
- if (!(rsinfo->allowedModes & SFRM_Materialize))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("materialize mode required, but it is not allowed in this context")));
-
- if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
- elog(ERROR, "return type must be a row type");
-
- per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
- oldcontext = MemoryContextSwitchTo(per_query_ctx);
-
- tupstore = tuplestore_begin_heap(true, false, work_mem);
- rsinfo->returnMode = SFRM_Materialize;
- rsinfo->setResult = tupstore;
- rsinfo->setDesc = tupdesc;
-
- MemoryContextSwitchTo(oldcontext);
-
- if (pg_atomic_read_u32(&SharedStats->reset_request) != SharedStats->reset_handled)
- {
- /* There's an unhandled reset request, so just show NULLs */
- for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i)
- nulls[i] = true;
- }
- else
- {
- for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i)
- nulls[i] = false;
- }
-
- values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
- values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
- values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_hit));
- values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
- values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
- values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_seq));
- values[6] = Int32GetDatum(SharedStats->distance);
- values[7] = Int32GetDatum(SharedStats->queue_depth);
- values[8] = Float4GetDatum(SharedStats->avg_distance);
- values[9] = Float4GetDatum(SharedStats->avg_queue_depth);
- tuplestore_putvalues(tupstore, tupdesc, values, nulls);
- tuplestore_donestoring(tupstore);
-
- return (Datum) 0;
-}
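This function backs the pg_stat_prefetch_recovery view named earlier, so the
counters can be watched during recovery with a plain
SELECT * FROM pg_stat_prefetch_recovery; the first column is the reset
timestamp, the next five are the cumulative counters, then come the
instantaneous distance and queue depth, and finally the two online averages.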
-
-/*
- * Compute (n + 1) % prefetch_queue_size, assuming n < prefetch_queue_size,
- * without using division.
- */
-static inline int
-XLogPrefetcherNext(XLogPrefetcher *prefetcher, int n)
-{
- int next = n + 1;
-
- return next == prefetcher->prefetch_queue_size ? 0 : next;
-}
-
-/*
- * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn'
- * has been replayed.
- */
-static inline void
-XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode,
- BlockNumber blockno, XLogRecPtr lsn)
-{
- XLogPrefetcherFilter *filter;
- bool found;
-
- filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found);
- if (!found)
- {
- /*
- * Don't allow any prefetching of this block or higher until replayed.
- */
- filter->filter_until_replayed = lsn;
- filter->filter_from_block = blockno;
- dlist_push_head(&prefetcher->filter_queue, &filter->link);
- }
- else
- {
- /*
- * We were already filtering this rnode. Extend the filter's lifetime
- * to cover this WAL record, but leave the (presumably lower) block
- * number there because we don't want to have to track individual
- * blocks.
- */
- filter->filter_until_replayed = lsn;
- dlist_delete(&filter->link);
- dlist_push_head(&prefetcher->filter_queue, &filter->link);
- }
-}
-
-/*
- * Have we replayed the records that caused us to begin filtering a block
- * range? That means that relations should have been created, extended or
- * dropped as required, so we can drop relevant filters.
- */
-static inline void
-XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
-{
- while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
- {
- XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
- link,
- &prefetcher->filter_queue);
-
- if (filter->filter_until_replayed >= replaying_lsn)
- break;
- dlist_delete(&filter->link);
- hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
- }
-}
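To make the filter lifecycle concrete: when the record scan sees an
XLOG_SMGR_CREATE record for relation rnode R at LSN L, it calls
XLogPrefetcherAddFilter(prefetcher, R, 0, L); every subsequent block reference
to R is counted as skip_new until replay has passed L, at which point the loop
above pops the filter off the tail of the queue, removes its hash table entry,
and prefetching of R resumes.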
-
-/*
- * Check if a given block should be skipped due to a filter.
- */
-static inline bool
-XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode,
- BlockNumber blockno)
-{
- /*
- * Test for empty queue first, because we expect it to be empty most of
- * the time and we can avoid the hash table lookup in that case.
- */
- if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
- {
- XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode,
- HASH_FIND, NULL);
-
- if (filter && filter->filter_from_block <= blockno)
- return true;
- }
-
- return false;
-}
-
-/*
- * Insert an LSN into the queue. The queue must not be full already. This
- * tracks the fact that we have (to the best of our knowledge) initiated an
- * I/O, so that we can impose a cap on concurrent prefetching.
- */
-static inline void
-XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
- XLogRecPtr prefetching_lsn)
-{
- Assert(!XLogPrefetcherSaturated(prefetcher));
- prefetcher->prefetch_queue[prefetcher->prefetch_head] = prefetching_lsn;
- prefetcher->prefetch_head =
- XLogPrefetcherNext(prefetcher, prefetcher->prefetch_head);
- SharedStats->queue_depth++;
-
- Assert(SharedStats->queue_depth <= prefetcher->prefetch_queue_size);
-}
-
-/*
- * Have we replayed the records that caused us to initiate the oldest
- * prefetches yet?  That means that they're definitely finished, so we can
- * forget about them and allow ourselves to initiate more prefetches. For now
- * we don't have any awareness of when I/O really completes.
- */
-static inline void
-XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
-{
- while (prefetcher->prefetch_head != prefetcher->prefetch_tail &&
- prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn)
- {
- prefetcher->prefetch_tail =
- XLogPrefetcherNext(prefetcher, prefetcher->prefetch_tail);
- SharedStats->queue_depth--;
-
- Assert(SharedStats->queue_depth >= 0);
- }
-}
-
-/*
- * Check if the maximum allowed number of I/Os is already in flight.
- */
-static inline bool
-XLogPrefetcherSaturated(XLogPrefetcher *prefetcher)
-{
- int next = XLogPrefetcherNext(prefetcher, prefetcher->prefetch_head);
-
- return next == prefetcher->prefetch_tail;
-}
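A standalone toy (not PostgreSQL code) exercising the same head/tail
discipline as the three functions above, with the queue sized at
concurrency + 1 so that the full and empty states remain distinguishable:

    #include <assert.h>
    #include <stdint.h>
    #include <stdio.h>

    #define CONCURRENCY 2   /* stands in for maintenance_io_concurrency */
    #define QUEUE_SIZE (CONCURRENCY + 1)

    static uint64_t queue[QUEUE_SIZE];
    static int head, tail;

    static int next_pos(int n) { int nx = n + 1; return nx == QUEUE_SIZE ? 0 : nx; }
    static int saturated(void) { return next_pos(head) == tail; }

    static void initiated_io(uint64_t lsn)
    {
        assert(!saturated());
        queue[head] = lsn;
        head = next_pos(head);
    }

    static void completed_io(uint64_t replaying_lsn)
    {
        /* Anything with an LSN below the replay point must be done. */
        while (head != tail && queue[tail] < replaying_lsn)
            tail = next_pos(tail);
    }

    int main(void)
    {
        initiated_io(100);
        initiated_io(200);
        assert(saturated());    /* two I/Os in flight == the cap */
        completed_io(150);      /* replay has passed LSN 100 */
        assert(!saturated());   /* one slot freed again */
        printf("ok\n");
        return 0;
    }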
-
-void
-assign_recovery_prefetch(bool new_value, void *extra)
-{
- /* Reconfigure prefetching, because a setting it depends on changed. */
- recovery_prefetch = new_value;
- if (AmStartupProcess())
- XLogPrefetchReconfigure();
-}
-
-void
-assign_recovery_prefetch_fpw(bool new_value, void *extra)
-{
- /* Reconfigure prefetching, because a setting it depends on changed. */
- recovery_prefetch_fpw = new_value;
- if (AmStartupProcess())
- XLogPrefetchReconfigure();
-}