aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c188
1 files changed, 75 insertions, 113 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index adfc6f67e29..c1d4415a433 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -35,7 +35,6 @@
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xloginsert.h"
-#include "access/xlogprefetch.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/catversion.h"
@@ -111,7 +110,6 @@ int CommitDelay = 0; /* precommit delay in microseconds */
int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
int wal_retrieve_retry_interval = 5000;
int max_slot_wal_keep_size_mb = -1;
-int wal_decode_buffer_size = 512 * 1024;
bool track_wal_io_timing = false;
#ifdef WAL_DEBUG
@@ -813,13 +811,17 @@ static XLogSegNo openLogSegNo = 0;
* These variables are used similarly to the ones above, but for reading
* the XLOG. Note, however, that readOff generally represents the offset
* of the page just read, not the seek position of the FD itself, which
- * will be just past that page. readSource indicates where we got the
- * currently open file from.
+ * will be just past that page. readLen indicates how much of the current
+ * page has been read into readBuf, and readSource indicates where we got
+ * the currently open file from.
* Note: we could use Reserve/ReleaseExternalFD to track consumption of
* this FD too; but it doesn't currently seem worthwhile, since the XLOG is
* not read by general-purpose sessions.
*/
static int readFile = -1;
+static XLogSegNo readSegNo = 0;
+static uint32 readOff = 0;
+static uint32 readLen = 0;
static XLogSource readSource = XLOG_FROM_ANY;
/*
@@ -836,6 +838,13 @@ static XLogSource currentSource = XLOG_FROM_ANY;
static bool lastSourceFailed = false;
static bool pendingWalRcvRestart = false;
+typedef struct XLogPageReadPrivate
+{
+ int emode;
+ bool fetching_ckpt; /* are we fetching a checkpoint record? */
+ bool randAccess;
+} XLogPageReadPrivate;
+
/*
* These variables track when we last obtained some WAL data to process,
* and where we got it from. (XLogReceiptSource is initially the same as
@@ -911,13 +920,10 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
XLogSource source, bool notfoundOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
-static bool XLogPageRead(XLogReaderState *state,
- bool fetching_ckpt, int emode, bool randAccess,
- bool nowait);
+static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
+ int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
- bool fetching_ckpt,
- XLogRecPtr tliRecPtr,
- XLogSegNo readSegNo);
+ bool fetching_ckpt, XLogRecPtr tliRecPtr);
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
static void XLogFileClose(void);
static void PreallocXlogFiles(XLogRecPtr endptr);
@@ -1212,7 +1218,6 @@ XLogInsertRecord(XLogRecData *rdata,
StringInfoData recordBuf;
char *errormsg = NULL;
MemoryContext oldCxt;
- DecodedXLogRecord *decoded;
oldCxt = MemoryContextSwitchTo(walDebugCxt);
@@ -1228,19 +1233,15 @@ XLogInsertRecord(XLogRecData *rdata,
for (; rdata != NULL; rdata = rdata->next)
appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
- /* How much space would it take to decode this record? */
- decoded = palloc(DecodeXLogRecordRequiredSpace(recordBuf.len));
-
if (!debug_reader)
- debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
+ debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
+ XL_ROUTINE(), NULL);
if (!debug_reader)
{
appendStringInfoString(&buf, "error decoding record: out of memory");
}
- else if (!DecodeXLogRecord(debug_reader, decoded,
- (XLogRecord *) recordBuf.data,
- EndPos,
+ else if (!DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data,
&errormsg))
{
appendStringInfo(&buf, "error decoding record: %s",
@@ -1249,17 +1250,10 @@ XLogInsertRecord(XLogRecData *rdata,
else
{
appendStringInfoString(&buf, " - ");
- /*
- * Temporarily make this decoded record the current record for
- * XLogRecGetXXX() macros.
- */
- debug_reader->record = decoded;
xlog_outdesc(&buf, debug_reader);
- debug_reader->record = NULL;
}
elog(LOG, "%s", buf.data);
- pfree(decoded);
pfree(buf.data);
pfree(recordBuf.data);
MemoryContextSwitchTo(oldCxt);
@@ -1433,7 +1427,7 @@ checkXLogConsistency(XLogReaderState *record)
Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0);
- for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
+ for (block_id = 0; block_id <= record->max_block_id; block_id++)
{
Buffer buf;
Page page;
@@ -1464,7 +1458,7 @@ checkXLogConsistency(XLogReaderState *record)
* temporary page.
*/
buf = XLogReadBufferExtended(rnode, forknum, blkno,
- RBM_NORMAL_NO_LOG, InvalidBuffer);
+ RBM_NORMAL_NO_LOG);
if (!BufferIsValid(buf))
continue;
@@ -3732,6 +3726,7 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
xlogfname);
set_ps_display(activitymsg);
+
restoredFromArchive = RestoreArchivedFile(path, xlogfname,
"RECOVERYXLOG",
wal_segment_size,
@@ -4378,7 +4373,12 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
bool fetching_ckpt)
{
XLogRecord *record;
- bool randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr);
+ XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
+
+ /* Pass through parameters to XLogPageRead */
+ private->fetching_ckpt = fetching_ckpt;
+ private->emode = emode;
+ private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr);
/* This is the first attempt to read this page. */
lastSourceFailed = false;
@@ -4386,19 +4386,10 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
for (;;)
{
char *errormsg;
- XLogReadRecordResult result;
-
- while ((result = XLogReadRecord(xlogreader, &record, &errormsg))
- == XLREAD_NEED_DATA)
- {
- if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess,
- false /* wait for data if streaming */))
- break;
- }
+ record = XLogReadRecord(xlogreader, &errormsg);
ReadRecPtr = xlogreader->ReadRecPtr;
EndRecPtr = xlogreader->EndRecPtr;
-
if (record == NULL)
{
if (readFile >= 0)
@@ -6466,6 +6457,7 @@ StartupXLOG(void)
bool backupFromStandby = false;
DBState dbstate_at_startup;
XLogReaderState *xlogreader;
+ XLogPageReadPrivate private;
bool promoted = false;
struct stat st;
@@ -6624,9 +6616,13 @@ StartupXLOG(void)
OwnLatch(&XLogCtl->recoveryWakeupLatch);
/* Set up XLOG reader facility */
+ MemSet(&private, 0, sizeof(XLogPageReadPrivate));
xlogreader =
- XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close);
-
+ XLogReaderAllocate(wal_segment_size, NULL,
+ XL_ROUTINE(.page_read = &XLogPageRead,
+ .segment_open = NULL,
+ .segment_close = wal_segment_close),
+ &private);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -6635,12 +6631,6 @@ StartupXLOG(void)
xlogreader->system_identifier = ControlFile->system_identifier;
/*
- * Set the WAL decode buffer size. This limits how far ahead we can read
- * in the WAL.
- */
- XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size);
-
- /*
* Allocate two page buffers dedicated to WAL consistency checks. We do
* it this way, rather than just making static arrays, for two reasons:
* (1) no need to waste the storage in most instantiations of the backend;
@@ -7320,7 +7310,6 @@ StartupXLOG(void)
{
ErrorContextCallback errcallback;
TimestampTz xtime;
- XLogPrefetchState prefetch;
PGRUsage ru0;
pg_rusage_init(&ru0);
@@ -7331,9 +7320,6 @@ StartupXLOG(void)
(errmsg("redo starts at %X/%X",
LSN_FORMAT_ARGS(ReadRecPtr))));
- /* Prepare to prefetch, if configured. */
- XLogPrefetchBegin(&prefetch, xlogreader);
-
/*
* main redo apply loop
*/
@@ -7363,14 +7349,6 @@ StartupXLOG(void)
/* Handle interrupt signals of startup process */
HandleStartupProcInterrupts();
- /* Perform WAL prefetching, if enabled. */
- while (XLogPrefetch(&prefetch, xlogreader->ReadRecPtr) == XLREAD_NEED_DATA)
- {
- if (!XLogPageRead(xlogreader, false, LOG, false,
- true /* don't wait for streaming data */))
- break;
- }
-
/*
* Pause WAL replay, if requested by a hot-standby session via
* SetRecoveryPause().
@@ -7544,9 +7522,6 @@ StartupXLOG(void)
*/
if (AllowCascadeReplication())
WalSndWakeup();
-
- /* Reset the prefetcher. */
- XLogPrefetchReconfigure();
}
/* Exit loop if we reached inclusive recovery target */
@@ -7563,7 +7538,6 @@ StartupXLOG(void)
/*
* end of main redo apply loop
*/
- XLogPrefetchEnd(&prefetch);
if (reachedRecoveryTarget)
{
@@ -7845,8 +7819,7 @@ StartupXLOG(void)
XLogRecPtr pageBeginPtr;
pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ);
- Assert(XLogSegmentOffset(xlogreader->readPagePtr, wal_segment_size) ==
- XLogSegmentOffset(pageBeginPtr, wal_segment_size));
+ Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size));
firstIdx = XLogRecPtrToBufIdx(EndOfLog);
@@ -10338,7 +10311,7 @@ xlog_redo(XLogReaderState *record)
* XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info
* code just to distinguish them for statistics purposes.
*/
- for (uint8 block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
+ for (uint8 block_id = 0; block_id <= record->max_block_id; block_id++)
{
Buffer buffer;
@@ -10473,7 +10446,7 @@ xlog_block_info(StringInfo buf, XLogReaderState *record)
int block_id;
/* decode block references */
- for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
+ for (block_id = 0; block_id <= record->max_block_id; block_id++)
{
RelFileNode rnode;
ForkNumber forknum;
@@ -12133,19 +12106,14 @@ CancelBackup(void)
* and call XLogPageRead() again with the same arguments. This lets
* XLogPageRead() to try fetching the record from another source, or to
* sleep and retry.
- *
- * If nowait is true, then return false immediately if the requested data isn't
- * available yet.
*/
-static bool
-XLogPageRead(XLogReaderState *state,
- bool fetching_ckpt, int emode, bool randAccess, bool nowait)
-{
- char *readBuf = state->readBuf;
- XLogRecPtr targetPagePtr = state->readPagePtr;
- int reqLen = state->reqLen;
- int readLen = 0;
- XLogRecPtr targetRecPtr = state->DecodeRecPtr;
+static int
+XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
+ XLogRecPtr targetRecPtr, char *readBuf)
+{
+ XLogPageReadPrivate *private =
+ (XLogPageReadPrivate *) xlogreader->private_data;
+ int emode = private->emode;
uint32 targetPageOff;
XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
int r;
@@ -12158,7 +12126,7 @@ XLogPageRead(XLogReaderState *state,
* is not in the currently open one.
*/
if (readFile >= 0 &&
- !XLByteInSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size))
+ !XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size))
{
/*
* Request a restartpoint if we've replayed too much xlog since the
@@ -12166,10 +12134,10 @@ XLogPageRead(XLogReaderState *state,
*/
if (bgwriterLaunched)
{
- if (XLogCheckpointNeeded(state->seg.ws_segno))
+ if (XLogCheckpointNeeded(readSegNo))
{
(void) GetRedoRecPtr();
- if (XLogCheckpointNeeded(state->seg.ws_segno))
+ if (XLogCheckpointNeeded(readSegNo))
RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
}
}
@@ -12179,7 +12147,7 @@ XLogPageRead(XLogReaderState *state,
readSource = XLOG_FROM_ANY;
}
- XLByteToSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size);
+ XLByteToSeg(targetPagePtr, readSegNo, wal_segment_size);
retry:
/* See if we need to retrieve more data */
@@ -12187,22 +12155,18 @@ retry:
(readSource == XLOG_FROM_STREAM &&
flushedUpto < targetPagePtr + reqLen))
{
- if (nowait)
- {
- XLogReaderSetInputData(state, -1);
- return false;
- }
-
if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
- randAccess, fetching_ckpt,
- targetRecPtr, state->seg.ws_segno))
+ private->randAccess,
+ private->fetching_ckpt,
+ targetRecPtr))
{
if (readFile >= 0)
close(readFile);
readFile = -1;
+ readLen = 0;
readSource = XLOG_FROM_ANY;
- XLogReaderSetInputData(state, -1);
- return false;
+
+ return -1;
}
}
@@ -12229,36 +12193,40 @@ retry:
else
readLen = XLOG_BLCKSZ;
+ /* Read the requested page */
+ readOff = targetPageOff;
+
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
- r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) targetPageOff);
+ r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff);
if (r != XLOG_BLCKSZ)
{
char fname[MAXFNAMELEN];
int save_errno = errno;
pgstat_report_wait_end();
- XLogFileName(fname, curFileTLI, state->seg.ws_segno, wal_segment_size);
+ XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size);
if (r < 0)
{
errno = save_errno;
ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
(errcode_for_file_access(),
errmsg("could not read from log segment %s, offset %u: %m",
- fname, targetPageOff)));
+ fname, readOff)));
}
else
ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("could not read from log segment %s, offset %u: read %d of %zu",
- fname, targetPageOff, r, (Size) XLOG_BLCKSZ)));
+ fname, readOff, r, (Size) XLOG_BLCKSZ)));
goto next_record_is_invalid;
}
pgstat_report_wait_end();
- Assert(targetSegNo == state->seg.ws_segno);
- Assert(readLen >= reqLen);
+ Assert(targetSegNo == readSegNo);
+ Assert(targetPageOff == readOff);
+ Assert(reqLen <= readLen);
- state->seg.ws_tli = curFileTLI;
+ xlogreader->seg.ws_tli = curFileTLI;
/*
* Check the page header immediately, so that we can retry immediately if
@@ -12286,16 +12254,14 @@ retry:
* Validating the page header is cheap enough that doing it twice
* shouldn't be a big deal from a performance point of view.
*/
- if (!XLogReaderValidatePageHeader(state, targetPagePtr, readBuf))
+ if (!XLogReaderValidatePageHeader(xlogreader, targetPagePtr, readBuf))
{
- /* reset any error StateValidatePageHeader() might have set */
- state->errormsg_buf[0] = '\0';
+ /* reset any error XLogReaderValidatePageHeader() might have set */
+ xlogreader->errormsg_buf[0] = '\0';
goto next_record_is_invalid;
}
- Assert(state->readPagePtr == targetPagePtr);
- XLogReaderSetInputData(state, readLen);
- return true;
+ return readLen;
next_record_is_invalid:
lastSourceFailed = true;
@@ -12303,14 +12269,14 @@ next_record_is_invalid:
if (readFile >= 0)
close(readFile);
readFile = -1;
+ readLen = 0;
readSource = XLOG_FROM_ANY;
/* In standby-mode, keep trying */
if (StandbyMode)
goto retry;
-
- XLogReaderSetInputData(state, -1);
- return false;
+ else
+ return -1;
}
/*
@@ -12341,8 +12307,7 @@ next_record_is_invalid:
*/
static bool
WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
- bool fetching_ckpt, XLogRecPtr tliRecPtr,
- XLogSegNo readSegNo)
+ bool fetching_ckpt, XLogRecPtr tliRecPtr)
{
static TimestampTz last_fail_time = 0;
TimestampTz now;
@@ -12426,7 +12391,6 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
*/
currentSource = XLOG_FROM_STREAM;
startWalReceiver = true;
- XLogPrefetchReconfigure();
break;
case XLOG_FROM_STREAM:
@@ -12661,7 +12625,6 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
* be updated on each cycle. When we are behind,
* XLogReceiptTime will not advance, so the grace time
* allotted to conflicting queries will decrease.
- *
*/
if (RecPtr < flushedUpto)
havedata = true;
@@ -12682,7 +12645,6 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
else
havedata = false;
}
-
if (havedata)
{
/*