Diffstat (limited to 'src/backend/access/transam/xlogprefetcher.c')
-rw-r--r--   src/backend/access/transam/xlogprefetcher.c   54
1 file changed, 36 insertions(+), 18 deletions(-)
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
index 9aa56411d55..368aa73ce20 100644
--- a/src/backend/access/transam/xlogprefetcher.c
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -72,7 +72,9 @@
 int			recovery_prefetch = RECOVERY_PREFETCH_TRY;
 
 #ifdef USE_PREFETCH
-#define RecoveryPrefetchEnabled() (recovery_prefetch != RECOVERY_PREFETCH_OFF)
+#define RecoveryPrefetchEnabled() \
+		(recovery_prefetch != RECOVERY_PREFETCH_OFF && \
+		 maintenance_io_concurrency > 0)
 #else
 #define RecoveryPrefetchEnabled() false
 #endif
@@ -985,6 +987,7 @@ XLogRecord *
 XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 {
 	DecodedXLogRecord *record;
+	XLogRecPtr	replayed_up_to;
 
 	/*
 	 * See if it's time to reset the prefetching machinery, because a relevant
@@ -1000,7 +1003,8 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 
 		if (RecoveryPrefetchEnabled())
 		{
-			max_inflight = Max(maintenance_io_concurrency, 2);
+			Assert(maintenance_io_concurrency > 0);
+			max_inflight = maintenance_io_concurrency;
 			max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
 		}
 		else
@@ -1018,14 +1022,34 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 	}
 
 	/*
-	 * Release last returned record, if there is one.  We need to do this so
-	 * that we can check for empty decode queue accurately.
+	 * Release last returned record, if there is one, as it's now been
+	 * replayed.
 	 */
-	XLogReleasePreviousRecord(prefetcher->reader);
+	replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
 
-	/* If there's nothing queued yet, then start prefetching. */
+	/*
+	 * Can we drop any filters yet?  If we were waiting for a relation to be
+	 * created or extended, it is now OK to access blocks in the covered
+	 * range.
+	 */
+	XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
+
+	/*
+	 * All IO initiated by earlier WAL is now completed.  This might trigger
+	 * further prefetching.
+	 */
+	lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
+
+	/*
+	 * If there's nothing queued yet, then start prefetching to cause at least
+	 * one record to be queued.
+	 */
 	if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
+	{
+		Assert(lrq_inflight(prefetcher->streaming_read) == 0);
+		Assert(lrq_completed(prefetcher->streaming_read) == 0);
 		lrq_prefetch(prefetcher->streaming_read);
+	}
 
 	/* Read the next record. */
 	record = XLogNextRecord(prefetcher->reader, errmsg);
@@ -1039,12 +1063,13 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 	Assert(record == prefetcher->reader->record);
 
 	/*
-	 * Can we drop any prefetch filters yet, given the record we're about to
-	 * return?  This assumes that any records with earlier LSNs have been
-	 * replayed, so if we were waiting for a relation to be created or
-	 * extended, it is now OK to access blocks in the covered range.
+	 * If maintenance_io_concurrency is set very low, we might have started
+	 * prefetching some but not all of the blocks referenced in the record
+	 * we're about to return.  Forget about the rest of the blocks in this
+	 * record by dropping the prefetcher's reference to it.
 	 */
-	XLogPrefetcherCompleteFilters(prefetcher, record->lsn);
+	if (record == prefetcher->record)
+		prefetcher->record = NULL;
 
 	/*
 	 * See if it's time to compute some statistics, because enough WAL has
@@ -1053,13 +1078,6 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 	if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
 		XLogPrefetcherComputeStats(prefetcher);
 
-	/*
-	 * The caller is about to replay this record, so we can now report that
-	 * all IO initiated because of early WAL must be finished.  This may
-	 * trigger more readahead.
-	 */
-	lrq_complete_lsn(prefetcher->streaming_read, record->lsn);
-
 	Assert(record == prefetcher->reader->record);
 
 	return &record->header;
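
For context on the behavioural change above, here is a minimal, self-contained C sketch (not part of the patch) that mimics the queue-sizing logic before and after the change when maintenance_io_concurrency is set to 0. Only the two RecoveryPrefetchEnabled() definitions are taken from the diff; the distance multiplier value and the "prefetching off" fallback sizes are assumptions for illustration.

#include <assert.h>
#include <stdio.h>

#define RECOVERY_PREFETCH_OFF	0
#define RECOVERY_PREFETCH_TRY	2
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4	/* assumed value for this sketch */
#define Max(a, b) ((a) > (b) ? (a) : (b))

static int	recovery_prefetch = RECOVERY_PREFETCH_TRY;
static int	maintenance_io_concurrency = 0; /* user disabled I/O concurrency */

/* Old macro: ignored maintenance_io_concurrency entirely. */
#define RecoveryPrefetchEnabledOld() (recovery_prefetch != RECOVERY_PREFETCH_OFF)

/* New macro from the diff: prefetching counts as disabled when the GUC is 0. */
#define RecoveryPrefetchEnabledNew() \
	(recovery_prefetch != RECOVERY_PREFETCH_OFF && \
	 maintenance_io_concurrency > 0)

static void
size_queue(int enabled, int use_old_sizing, int *max_inflight, int *max_distance)
{
	if (enabled)
	{
		if (use_old_sizing)
			*max_inflight = Max(maintenance_io_concurrency, 2); /* old floor of 2 */
		else
		{
			assert(maintenance_io_concurrency > 0);
			*max_inflight = maintenance_io_concurrency;
		}
		*max_distance = *max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
	}
	else
	{
		/* assumed "prefetching off" fallback, not shown in the hunks above */
		*max_inflight = 1;
		*max_distance = 1;
	}
}

int
main(void)
{
	int			inflight;
	int			distance;

	size_queue(RecoveryPrefetchEnabledOld(), 1, &inflight, &distance);
	printf("old: inflight=%d distance=%d\n", inflight, distance);	/* 2, 8: still prefetches */

	size_queue(RecoveryPrefetchEnabledNew(), 0, &inflight, &distance);
	printf("new: inflight=%d distance=%d\n", inflight, distance);	/* 1, 1: effectively off */

	return 0;
}

The point of the new macro is that setting maintenance_io_concurrency to 0 now genuinely disables recovery prefetching instead of silently prefetching with a floor of two concurrent I/Os, which is why the patched XLogPrefetcherReadRecord() can simply assert that the GUC is positive when prefetching is enabled.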