diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 50 |
1 files changed, 41 insertions, 9 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 729fc5ff13c..adfc6f67e29 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -35,6 +35,7 @@ #include "access/xlog_internal.h" #include "access/xlogarchive.h" #include "access/xloginsert.h" +#include "access/xlogprefetch.h" #include "access/xlogreader.h" #include "access/xlogutils.h" #include "catalog/catversion.h" @@ -110,6 +111,7 @@ int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ int wal_retrieve_retry_interval = 5000; int max_slot_wal_keep_size_mb = -1; +int wal_decode_buffer_size = 512 * 1024; bool track_wal_io_timing = false; #ifdef WAL_DEBUG @@ -910,7 +912,8 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); static bool XLogPageRead(XLogReaderState *state, - bool fetching_ckpt, int emode, bool randAccess); + bool fetching_ckpt, int emode, bool randAccess, + bool nowait); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, XLogRecPtr tliRecPtr, @@ -1461,7 +1464,7 @@ checkXLogConsistency(XLogReaderState *record) * temporary page. */ buf = XLogReadBufferExtended(rnode, forknum, blkno, - RBM_NORMAL_NO_LOG); + RBM_NORMAL_NO_LOG, InvalidBuffer); if (!BufferIsValid(buf)) continue; @@ -3729,7 +3732,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, snprintf(activitymsg, sizeof(activitymsg), "waiting for %s", xlogfname); set_ps_display(activitymsg); - restoredFromArchive = RestoreArchivedFile(path, xlogfname, "RECOVERYXLOG", wal_segment_size, @@ -4389,9 +4391,9 @@ ReadRecord(XLogReaderState *xlogreader, int emode, while ((result = XLogReadRecord(xlogreader, &record, &errormsg)) == XLREAD_NEED_DATA) { - if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess)) + if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess, + false /* wait for data if streaming */)) break; - } ReadRecPtr = xlogreader->ReadRecPtr; @@ -6633,6 +6635,12 @@ StartupXLOG(void) xlogreader->system_identifier = ControlFile->system_identifier; /* + * Set the WAL decode buffer size. This limits how far ahead we can read + * in the WAL. + */ + XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size); + + /* * Allocate two page buffers dedicated to WAL consistency checks. We do * it this way, rather than just making static arrays, for two reasons: * (1) no need to waste the storage in most instantiations of the backend; @@ -7312,6 +7320,7 @@ StartupXLOG(void) { ErrorContextCallback errcallback; TimestampTz xtime; + XLogPrefetchState prefetch; PGRUsage ru0; pg_rusage_init(&ru0); @@ -7322,6 +7331,9 @@ StartupXLOG(void) (errmsg("redo starts at %X/%X", LSN_FORMAT_ARGS(ReadRecPtr)))); + /* Prepare to prefetch, if configured. */ + XLogPrefetchBegin(&prefetch, xlogreader); + /* * main redo apply loop */ @@ -7351,6 +7363,14 @@ StartupXLOG(void) /* Handle interrupt signals of startup process */ HandleStartupProcInterrupts(); + /* Perform WAL prefetching, if enabled. */ + while (XLogPrefetch(&prefetch, xlogreader->ReadRecPtr) == XLREAD_NEED_DATA) + { + if (!XLogPageRead(xlogreader, false, LOG, false, + true /* don't wait for streaming data */)) + break; + } + /* * Pause WAL replay, if requested by a hot-standby session via * SetRecoveryPause(). @@ -7524,6 +7544,9 @@ StartupXLOG(void) */ if (AllowCascadeReplication()) WalSndWakeup(); + + /* Reset the prefetcher. */ + XLogPrefetchReconfigure(); } /* Exit loop if we reached inclusive recovery target */ @@ -7540,6 +7563,7 @@ StartupXLOG(void) /* * end of main redo apply loop */ + XLogPrefetchEnd(&prefetch); if (reachedRecoveryTarget) { @@ -12109,10 +12133,13 @@ CancelBackup(void) * and call XLogPageRead() again with the same arguments. This lets * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. + * + * If nowait is true, then return false immediately if the requested data isn't + * available yet. */ static bool XLogPageRead(XLogReaderState *state, - bool fetching_ckpt, int emode, bool randAccess) + bool fetching_ckpt, int emode, bool randAccess, bool nowait) { char *readBuf = state->readBuf; XLogRecPtr targetPagePtr = state->readPagePtr; @@ -12136,9 +12163,6 @@ XLogPageRead(XLogReaderState *state, /* * Request a restartpoint if we've replayed too much xlog since the * last one. - * - * XXX Why is this here? Move it to recovery loop, since it's based - * on replay position, not read position? */ if (bgwriterLaunched) { @@ -12163,6 +12187,12 @@ retry: (readSource == XLOG_FROM_STREAM && flushedUpto < targetPagePtr + reqLen)) { + if (nowait) + { + XLogReaderSetInputData(state, -1); + return false; + } + if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, randAccess, fetching_ckpt, targetRecPtr, state->seg.ws_segno)) @@ -12396,6 +12426,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, */ currentSource = XLOG_FROM_STREAM; startWalReceiver = true; + XLogPrefetchReconfigure(); break; case XLOG_FROM_STREAM: @@ -12651,6 +12682,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, else havedata = false; } + if (havedata) { /* |