aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c50
1 files changed, 41 insertions, 9 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 729fc5ff13c..adfc6f67e29 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -35,6 +35,7 @@
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xloginsert.h"
+#include "access/xlogprefetch.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/catversion.h"
@@ -110,6 +111,7 @@ int CommitDelay = 0; /* precommit delay in microseconds */
int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
int wal_retrieve_retry_interval = 5000;
int max_slot_wal_keep_size_mb = -1;
+int wal_decode_buffer_size = 512 * 1024;
bool track_wal_io_timing = false;
#ifdef WAL_DEBUG
@@ -910,7 +912,8 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
XLogSource source, bool notfoundOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
static bool XLogPageRead(XLogReaderState *state,
- bool fetching_ckpt, int emode, bool randAccess);
+ bool fetching_ckpt, int emode, bool randAccess,
+ bool nowait);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt,
XLogRecPtr tliRecPtr,
@@ -1461,7 +1464,7 @@ checkXLogConsistency(XLogReaderState *record)
* temporary page.
*/
buf = XLogReadBufferExtended(rnode, forknum, blkno,
- RBM_NORMAL_NO_LOG);
+ RBM_NORMAL_NO_LOG, InvalidBuffer);
if (!BufferIsValid(buf))
continue;
@@ -3729,7 +3732,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
xlogfname);
set_ps_display(activitymsg);
-
restoredFromArchive = RestoreArchivedFile(path, xlogfname,
"RECOVERYXLOG",
wal_segment_size,
@@ -4389,9 +4391,9 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
while ((result = XLogReadRecord(xlogreader, &record, &errormsg))
== XLREAD_NEED_DATA)
{
- if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess))
+ if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess,
+ false /* wait for data if streaming */))
break;
-
}
ReadRecPtr = xlogreader->ReadRecPtr;
@@ -6633,6 +6635,12 @@ StartupXLOG(void)
xlogreader->system_identifier = ControlFile->system_identifier;
/*
+ * Set the WAL decode buffer size. This limits how far ahead we can read
+ * in the WAL.
+ */
+ XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size);
+
+ /*
* Allocate two page buffers dedicated to WAL consistency checks. We do
* it this way, rather than just making static arrays, for two reasons:
* (1) no need to waste the storage in most instantiations of the backend;
@@ -7312,6 +7320,7 @@ StartupXLOG(void)
{
ErrorContextCallback errcallback;
TimestampTz xtime;
+ XLogPrefetchState prefetch;
PGRUsage ru0;
pg_rusage_init(&ru0);
@@ -7322,6 +7331,9 @@ StartupXLOG(void)
(errmsg("redo starts at %X/%X",
LSN_FORMAT_ARGS(ReadRecPtr))));
+ /* Prepare to prefetch, if configured. */
+ XLogPrefetchBegin(&prefetch, xlogreader);
+
/*
* main redo apply loop
*/
@@ -7351,6 +7363,14 @@ StartupXLOG(void)
/* Handle interrupt signals of startup process */
HandleStartupProcInterrupts();
+ /* Perform WAL prefetching, if enabled. */
+ while (XLogPrefetch(&prefetch, xlogreader->ReadRecPtr) == XLREAD_NEED_DATA)
+ {
+ if (!XLogPageRead(xlogreader, false, LOG, false,
+ true /* don't wait for streaming data */))
+ break;
+ }
+
/*
* Pause WAL replay, if requested by a hot-standby session via
* SetRecoveryPause().
@@ -7524,6 +7544,9 @@ StartupXLOG(void)
*/
if (AllowCascadeReplication())
WalSndWakeup();
+
+ /* Reset the prefetcher. */
+ XLogPrefetchReconfigure();
}
/* Exit loop if we reached inclusive recovery target */
@@ -7540,6 +7563,7 @@ StartupXLOG(void)
/*
* end of main redo apply loop
*/
+ XLogPrefetchEnd(&prefetch);
if (reachedRecoveryTarget)
{
@@ -12109,10 +12133,13 @@ CancelBackup(void)
* and call XLogPageRead() again with the same arguments. This lets
* XLogPageRead() to try fetching the record from another source, or to
* sleep and retry.
+ *
+ * If nowait is true, then return false immediately if the requested data isn't
+ * available yet.
*/
static bool
XLogPageRead(XLogReaderState *state,
- bool fetching_ckpt, int emode, bool randAccess)
+ bool fetching_ckpt, int emode, bool randAccess, bool nowait)
{
char *readBuf = state->readBuf;
XLogRecPtr targetPagePtr = state->readPagePtr;
@@ -12136,9 +12163,6 @@ XLogPageRead(XLogReaderState *state,
/*
* Request a restartpoint if we've replayed too much xlog since the
* last one.
- *
- * XXX Why is this here? Move it to recovery loop, since it's based
- * on replay position, not read position?
*/
if (bgwriterLaunched)
{
@@ -12163,6 +12187,12 @@ retry:
(readSource == XLOG_FROM_STREAM &&
flushedUpto < targetPagePtr + reqLen))
{
+ if (nowait)
+ {
+ XLogReaderSetInputData(state, -1);
+ return false;
+ }
+
if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
randAccess, fetching_ckpt,
targetRecPtr, state->seg.ws_segno))
@@ -12396,6 +12426,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
*/
currentSource = XLOG_FROM_STREAM;
startWalReceiver = true;
+ XLogPrefetchReconfigure();
break;
case XLOG_FROM_STREAM:
@@ -12651,6 +12682,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
else
havedata = false;
}
+
if (havedata)
{
/*