diff options
author | Thomas Munro <tmunro@postgresql.org> | 2021-05-10 16:00:53 +1200 |
---|---|---|
committer | Thomas Munro <tmunro@postgresql.org> | 2021-05-10 16:06:09 +1200 |
commit | c2dc19342e05e081dc13b296787baa38352681ef (patch) | |
tree | 10ba15831ecc5e9795912cac612c871ffad63a82 /src/backend/access/transam/xlog.c | |
parent | 63db0ac3f9e6bae313da67f640c95c0045b7f0ee (diff) | |
download | postgresql-c2dc19342e05e081dc13b296787baa38352681ef.tar.gz postgresql-c2dc19342e05e081dc13b296787baa38352681ef.zip |
Revert recovery prefetching feature.
This set of commits has some bugs with known fixes, but at this late
stage in the release cycle it seems best to revert and resubmit next
time, along with some new automated test coverage for this whole area.
Commits reverted:
dc88460c: Doc: Review for "Optionally prefetch referenced data in recovery."
1d257577: Optionally prefetch referenced data in recovery.
f003d9f8: Add circular WAL decoding buffer.
323cbe7c: Remove read_page callback from XLogReader.
Remove the new GUC group WAL_RECOVERY recently added by a55a9847, as the
corresponding section of config.sgml is now reverted.
Discussion: https://postgr.es/m/CAOuzzgrn7iKnFRsB4MHp3UisEQAGgZMbk_ViTN4HV4-Ksq8zCg%40mail.gmail.com
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 188 |
1 files changed, 75 insertions, 113 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index adfc6f67e29..c1d4415a433 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -35,7 +35,6 @@ #include "access/xlog_internal.h" #include "access/xlogarchive.h" #include "access/xloginsert.h" -#include "access/xlogprefetch.h" #include "access/xlogreader.h" #include "access/xlogutils.h" #include "catalog/catversion.h" @@ -111,7 +110,6 @@ int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ int wal_retrieve_retry_interval = 5000; int max_slot_wal_keep_size_mb = -1; -int wal_decode_buffer_size = 512 * 1024; bool track_wal_io_timing = false; #ifdef WAL_DEBUG @@ -813,13 +811,17 @@ static XLogSegNo openLogSegNo = 0; * These variables are used similarly to the ones above, but for reading * the XLOG. Note, however, that readOff generally represents the offset * of the page just read, not the seek position of the FD itself, which - * will be just past that page. readSource indicates where we got the - * currently open file from. + * will be just past that page. readLen indicates how much of the current + * page has been read into readBuf, and readSource indicates where we got + * the currently open file from. * Note: we could use Reserve/ReleaseExternalFD to track consumption of * this FD too; but it doesn't currently seem worthwhile, since the XLOG is * not read by general-purpose sessions. */ static int readFile = -1; +static XLogSegNo readSegNo = 0; +static uint32 readOff = 0; +static uint32 readLen = 0; static XLogSource readSource = XLOG_FROM_ANY; /* @@ -836,6 +838,13 @@ static XLogSource currentSource = XLOG_FROM_ANY; static bool lastSourceFailed = false; static bool pendingWalRcvRestart = false; +typedef struct XLogPageReadPrivate +{ + int emode; + bool fetching_ckpt; /* are we fetching a checkpoint record? */ + bool randAccess; +} XLogPageReadPrivate; + /* * These variables track when we last obtained some WAL data to process, * and where we got it from. (XLogReceiptSource is initially the same as @@ -911,13 +920,10 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); -static bool XLogPageRead(XLogReaderState *state, - bool fetching_ckpt, int emode, bool randAccess, - bool nowait); +static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, + int reqLen, XLogRecPtr targetRecPtr, char *readBuf); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, - XLogRecPtr tliRecPtr, - XLogSegNo readSegNo); + bool fetching_ckpt, XLogRecPtr tliRecPtr); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); static void XLogFileClose(void); static void PreallocXlogFiles(XLogRecPtr endptr); @@ -1212,7 +1218,6 @@ XLogInsertRecord(XLogRecData *rdata, StringInfoData recordBuf; char *errormsg = NULL; MemoryContext oldCxt; - DecodedXLogRecord *decoded; oldCxt = MemoryContextSwitchTo(walDebugCxt); @@ -1228,19 +1233,15 @@ XLogInsertRecord(XLogRecData *rdata, for (; rdata != NULL; rdata = rdata->next) appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); - /* How much space would it take to decode this record? */ - decoded = palloc(DecodeXLogRecordRequiredSpace(recordBuf.len)); - if (!debug_reader) - debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL); + debug_reader = XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(), NULL); if (!debug_reader) { appendStringInfoString(&buf, "error decoding record: out of memory"); } - else if (!DecodeXLogRecord(debug_reader, decoded, - (XLogRecord *) recordBuf.data, - EndPos, + else if (!DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data, &errormsg)) { appendStringInfo(&buf, "error decoding record: %s", @@ -1249,17 +1250,10 @@ XLogInsertRecord(XLogRecData *rdata, else { appendStringInfoString(&buf, " - "); - /* - * Temporarily make this decoded record the current record for - * XLogRecGetXXX() macros. - */ - debug_reader->record = decoded; xlog_outdesc(&buf, debug_reader); - debug_reader->record = NULL; } elog(LOG, "%s", buf.data); - pfree(decoded); pfree(buf.data); pfree(recordBuf.data); MemoryContextSwitchTo(oldCxt); @@ -1433,7 +1427,7 @@ checkXLogConsistency(XLogReaderState *record) Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0); - for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) + for (block_id = 0; block_id <= record->max_block_id; block_id++) { Buffer buf; Page page; @@ -1464,7 +1458,7 @@ checkXLogConsistency(XLogReaderState *record) * temporary page. */ buf = XLogReadBufferExtended(rnode, forknum, blkno, - RBM_NORMAL_NO_LOG, InvalidBuffer); + RBM_NORMAL_NO_LOG); if (!BufferIsValid(buf)) continue; @@ -3732,6 +3726,7 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, snprintf(activitymsg, sizeof(activitymsg), "waiting for %s", xlogfname); set_ps_display(activitymsg); + restoredFromArchive = RestoreArchivedFile(path, xlogfname, "RECOVERYXLOG", wal_segment_size, @@ -4378,7 +4373,12 @@ ReadRecord(XLogReaderState *xlogreader, int emode, bool fetching_ckpt) { XLogRecord *record; - bool randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); + XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; + + /* Pass through parameters to XLogPageRead */ + private->fetching_ckpt = fetching_ckpt; + private->emode = emode; + private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); /* This is the first attempt to read this page. */ lastSourceFailed = false; @@ -4386,19 +4386,10 @@ ReadRecord(XLogReaderState *xlogreader, int emode, for (;;) { char *errormsg; - XLogReadRecordResult result; - - while ((result = XLogReadRecord(xlogreader, &record, &errormsg)) - == XLREAD_NEED_DATA) - { - if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess, - false /* wait for data if streaming */)) - break; - } + record = XLogReadRecord(xlogreader, &errormsg); ReadRecPtr = xlogreader->ReadRecPtr; EndRecPtr = xlogreader->EndRecPtr; - if (record == NULL) { if (readFile >= 0) @@ -6466,6 +6457,7 @@ StartupXLOG(void) bool backupFromStandby = false; DBState dbstate_at_startup; XLogReaderState *xlogreader; + XLogPageReadPrivate private; bool promoted = false; struct stat st; @@ -6624,9 +6616,13 @@ StartupXLOG(void) OwnLatch(&XLogCtl->recoveryWakeupLatch); /* Set up XLOG reader facility */ + MemSet(&private, 0, sizeof(XLogPageReadPrivate)); xlogreader = - XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close); - + XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &XLogPageRead, + .segment_open = NULL, + .segment_close = wal_segment_close), + &private); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -6635,12 +6631,6 @@ StartupXLOG(void) xlogreader->system_identifier = ControlFile->system_identifier; /* - * Set the WAL decode buffer size. This limits how far ahead we can read - * in the WAL. - */ - XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size); - - /* * Allocate two page buffers dedicated to WAL consistency checks. We do * it this way, rather than just making static arrays, for two reasons: * (1) no need to waste the storage in most instantiations of the backend; @@ -7320,7 +7310,6 @@ StartupXLOG(void) { ErrorContextCallback errcallback; TimestampTz xtime; - XLogPrefetchState prefetch; PGRUsage ru0; pg_rusage_init(&ru0); @@ -7331,9 +7320,6 @@ StartupXLOG(void) (errmsg("redo starts at %X/%X", LSN_FORMAT_ARGS(ReadRecPtr)))); - /* Prepare to prefetch, if configured. */ - XLogPrefetchBegin(&prefetch, xlogreader); - /* * main redo apply loop */ @@ -7363,14 +7349,6 @@ StartupXLOG(void) /* Handle interrupt signals of startup process */ HandleStartupProcInterrupts(); - /* Perform WAL prefetching, if enabled. */ - while (XLogPrefetch(&prefetch, xlogreader->ReadRecPtr) == XLREAD_NEED_DATA) - { - if (!XLogPageRead(xlogreader, false, LOG, false, - true /* don't wait for streaming data */)) - break; - } - /* * Pause WAL replay, if requested by a hot-standby session via * SetRecoveryPause(). @@ -7544,9 +7522,6 @@ StartupXLOG(void) */ if (AllowCascadeReplication()) WalSndWakeup(); - - /* Reset the prefetcher. */ - XLogPrefetchReconfigure(); } /* Exit loop if we reached inclusive recovery target */ @@ -7563,7 +7538,6 @@ StartupXLOG(void) /* * end of main redo apply loop */ - XLogPrefetchEnd(&prefetch); if (reachedRecoveryTarget) { @@ -7845,8 +7819,7 @@ StartupXLOG(void) XLogRecPtr pageBeginPtr; pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ); - Assert(XLogSegmentOffset(xlogreader->readPagePtr, wal_segment_size) == - XLogSegmentOffset(pageBeginPtr, wal_segment_size)); + Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size)); firstIdx = XLogRecPtrToBufIdx(EndOfLog); @@ -10338,7 +10311,7 @@ xlog_redo(XLogReaderState *record) * XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info * code just to distinguish them for statistics purposes. */ - for (uint8 block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) + for (uint8 block_id = 0; block_id <= record->max_block_id; block_id++) { Buffer buffer; @@ -10473,7 +10446,7 @@ xlog_block_info(StringInfo buf, XLogReaderState *record) int block_id; /* decode block references */ - for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) + for (block_id = 0; block_id <= record->max_block_id; block_id++) { RelFileNode rnode; ForkNumber forknum; @@ -12133,19 +12106,14 @@ CancelBackup(void) * and call XLogPageRead() again with the same arguments. This lets * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. - * - * If nowait is true, then return false immediately if the requested data isn't - * available yet. */ -static bool -XLogPageRead(XLogReaderState *state, - bool fetching_ckpt, int emode, bool randAccess, bool nowait) -{ - char *readBuf = state->readBuf; - XLogRecPtr targetPagePtr = state->readPagePtr; - int reqLen = state->reqLen; - int readLen = 0; - XLogRecPtr targetRecPtr = state->DecodeRecPtr; +static int +XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *readBuf) +{ + XLogPageReadPrivate *private = + (XLogPageReadPrivate *) xlogreader->private_data; + int emode = private->emode; uint32 targetPageOff; XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; int r; @@ -12158,7 +12126,7 @@ XLogPageRead(XLogReaderState *state, * is not in the currently open one. */ if (readFile >= 0 && - !XLByteInSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size)) + !XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size)) { /* * Request a restartpoint if we've replayed too much xlog since the @@ -12166,10 +12134,10 @@ XLogPageRead(XLogReaderState *state, */ if (bgwriterLaunched) { - if (XLogCheckpointNeeded(state->seg.ws_segno)) + if (XLogCheckpointNeeded(readSegNo)) { (void) GetRedoRecPtr(); - if (XLogCheckpointNeeded(state->seg.ws_segno)) + if (XLogCheckpointNeeded(readSegNo)) RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); } } @@ -12179,7 +12147,7 @@ XLogPageRead(XLogReaderState *state, readSource = XLOG_FROM_ANY; } - XLByteToSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size); + XLByteToSeg(targetPagePtr, readSegNo, wal_segment_size); retry: /* See if we need to retrieve more data */ @@ -12187,22 +12155,18 @@ retry: (readSource == XLOG_FROM_STREAM && flushedUpto < targetPagePtr + reqLen)) { - if (nowait) - { - XLogReaderSetInputData(state, -1); - return false; - } - if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, - randAccess, fetching_ckpt, - targetRecPtr, state->seg.ws_segno)) + private->randAccess, + private->fetching_ckpt, + targetRecPtr)) { if (readFile >= 0) close(readFile); readFile = -1; + readLen = 0; readSource = XLOG_FROM_ANY; - XLogReaderSetInputData(state, -1); - return false; + + return -1; } } @@ -12229,36 +12193,40 @@ retry: else readLen = XLOG_BLCKSZ; + /* Read the requested page */ + readOff = targetPageOff; + pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) targetPageOff); + r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff); if (r != XLOG_BLCKSZ) { char fname[MAXFNAMELEN]; int save_errno = errno; pgstat_report_wait_end(); - XLogFileName(fname, curFileTLI, state->seg.ws_segno, wal_segment_size); + XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size); if (r < 0) { errno = save_errno; ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u: %m", - fname, targetPageOff))); + fname, readOff))); } else ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode(ERRCODE_DATA_CORRUPTED), errmsg("could not read from log segment %s, offset %u: read %d of %zu", - fname, targetPageOff, r, (Size) XLOG_BLCKSZ))); + fname, readOff, r, (Size) XLOG_BLCKSZ))); goto next_record_is_invalid; } pgstat_report_wait_end(); - Assert(targetSegNo == state->seg.ws_segno); - Assert(readLen >= reqLen); + Assert(targetSegNo == readSegNo); + Assert(targetPageOff == readOff); + Assert(reqLen <= readLen); - state->seg.ws_tli = curFileTLI; + xlogreader->seg.ws_tli = curFileTLI; /* * Check the page header immediately, so that we can retry immediately if @@ -12286,16 +12254,14 @@ retry: * Validating the page header is cheap enough that doing it twice * shouldn't be a big deal from a performance point of view. */ - if (!XLogReaderValidatePageHeader(state, targetPagePtr, readBuf)) + if (!XLogReaderValidatePageHeader(xlogreader, targetPagePtr, readBuf)) { - /* reset any error StateValidatePageHeader() might have set */ - state->errormsg_buf[0] = '\0'; + /* reset any error XLogReaderValidatePageHeader() might have set */ + xlogreader->errormsg_buf[0] = '\0'; goto next_record_is_invalid; } - Assert(state->readPagePtr == targetPagePtr); - XLogReaderSetInputData(state, readLen); - return true; + return readLen; next_record_is_invalid: lastSourceFailed = true; @@ -12303,14 +12269,14 @@ next_record_is_invalid: if (readFile >= 0) close(readFile); readFile = -1; + readLen = 0; readSource = XLOG_FROM_ANY; /* In standby-mode, keep trying */ if (StandbyMode) goto retry; - - XLogReaderSetInputData(state, -1); - return false; + else + return -1; } /* @@ -12341,8 +12307,7 @@ next_record_is_invalid: */ static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, XLogRecPtr tliRecPtr, - XLogSegNo readSegNo) + bool fetching_ckpt, XLogRecPtr tliRecPtr) { static TimestampTz last_fail_time = 0; TimestampTz now; @@ -12426,7 +12391,6 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, */ currentSource = XLOG_FROM_STREAM; startWalReceiver = true; - XLogPrefetchReconfigure(); break; case XLOG_FROM_STREAM: @@ -12661,7 +12625,6 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * be updated on each cycle. When we are behind, * XLogReceiptTime will not advance, so the grace time * allotted to conflicting queries will decrease. - * */ if (RecPtr < flushedUpto) havedata = true; @@ -12682,7 +12645,6 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, else havedata = false; } - if (havedata) { /* |