aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlogprefetcher.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlogprefetcher.c')
-rw-r--r--src/backend/access/transam/xlogprefetcher.c54
1 files changed, 36 insertions, 18 deletions
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
index 9aa56411d55..368aa73ce20 100644
--- a/src/backend/access/transam/xlogprefetcher.c
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -72,7 +72,9 @@
int recovery_prefetch = RECOVERY_PREFETCH_TRY;
#ifdef USE_PREFETCH
-#define RecoveryPrefetchEnabled() (recovery_prefetch != RECOVERY_PREFETCH_OFF)
+#define RecoveryPrefetchEnabled() \
+ (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
+ maintenance_io_concurrency > 0)
#else
#define RecoveryPrefetchEnabled() false
#endif
@@ -985,6 +987,7 @@ XLogRecord *
XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
{
DecodedXLogRecord *record;
+ XLogRecPtr replayed_up_to;
/*
* See if it's time to reset the prefetching machinery, because a relevant
@@ -1000,7 +1003,8 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
if (RecoveryPrefetchEnabled())
{
- max_inflight = Max(maintenance_io_concurrency, 2);
+ Assert(maintenance_io_concurrency > 0);
+ max_inflight = maintenance_io_concurrency;
max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
}
else
@@ -1018,14 +1022,34 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
}
/*
- * Release last returned record, if there is one. We need to do this so
- * that we can check for empty decode queue accurately.
+ * Release last returned record, if there is one, as it's now been
+ * replayed.
*/
- XLogReleasePreviousRecord(prefetcher->reader);
+ replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
- /* If there's nothing queued yet, then start prefetching. */
+ /*
+ * Can we drop any filters yet? If we were waiting for a relation to be
+ * created or extended, it is now OK to access blocks in the covered
+ * range.
+ */
+ XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
+
+ /*
+ * All IO initiated by earlier WAL is now completed. This might trigger
+ * further prefetching.
+ */
+ lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
+
+ /*
+ * If there's nothing queued yet, then start prefetching to cause at least
+ * one record to be queued.
+ */
if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
+ {
+ Assert(lrq_inflight(prefetcher->streaming_read) == 0);
+ Assert(lrq_completed(prefetcher->streaming_read) == 0);
lrq_prefetch(prefetcher->streaming_read);
+ }
/* Read the next record. */
record = XLogNextRecord(prefetcher->reader, errmsg);
@@ -1039,12 +1063,13 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
Assert(record == prefetcher->reader->record);
/*
- * Can we drop any prefetch filters yet, given the record we're about to
- * return? This assumes that any records with earlier LSNs have been
- * replayed, so if we were waiting for a relation to be created or
- * extended, it is now OK to access blocks in the covered range.
+ * If maintenance_io_concurrency is set very low, we might have started
+ * prefetching some but not all of the blocks referenced in the record
+ * we're about to return. Forget about the rest of the blocks in this
+ * record by dropping the prefetcher's reference to it.
*/
- XLogPrefetcherCompleteFilters(prefetcher, record->lsn);
+ if (record == prefetcher->record)
+ prefetcher->record = NULL;
/*
* See if it's time to compute some statistics, because enough WAL has
@@ -1053,13 +1078,6 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
XLogPrefetcherComputeStats(prefetcher);
- /*
- * The caller is about to replay this record, so we can now report that
- * all IO initiated because of early WAL must be finished. This may
- * trigger more readahead.
- */
- lrq_complete_lsn(prefetcher->streaming_read, record->lsn);
-
Assert(record == prefetcher->reader->record);
return &record->header;