aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c126
1 files changed, 57 insertions, 69 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c1d4415a433..7faac01bf24 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -811,17 +811,13 @@ static XLogSegNo openLogSegNo = 0;
* These variables are used similarly to the ones above, but for reading
* the XLOG. Note, however, that readOff generally represents the offset
* of the page just read, not the seek position of the FD itself, which
- * will be just past that page. readLen indicates how much of the current
- * page has been read into readBuf, and readSource indicates where we got
- * the currently open file from.
+ * will be just past that page. readSource indicates where we got the
+ * currently open file from.
* Note: we could use Reserve/ReleaseExternalFD to track consumption of
* this FD too; but it doesn't currently seem worthwhile, since the XLOG is
* not read by general-purpose sessions.
*/
static int readFile = -1;
-static XLogSegNo readSegNo = 0;
-static uint32 readOff = 0;
-static uint32 readLen = 0;
static XLogSource readSource = XLOG_FROM_ANY;
/*
@@ -838,13 +834,6 @@ static XLogSource currentSource = XLOG_FROM_ANY;
static bool lastSourceFailed = false;
static bool pendingWalRcvRestart = false;
-typedef struct XLogPageReadPrivate
-{
- int emode;
- bool fetching_ckpt; /* are we fetching a checkpoint record? */
- bool randAccess;
-} XLogPageReadPrivate;
-
/*
* These variables track when we last obtained some WAL data to process,
* and where we got it from. (XLogReceiptSource is initially the same as
@@ -920,10 +909,12 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
XLogSource source, bool notfoundOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
-static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
+static bool XLogPageRead(XLogReaderState *state,
+ bool fetching_ckpt, int emode, bool randAccess);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
- bool fetching_ckpt, XLogRecPtr tliRecPtr);
+ bool fetching_ckpt,
+ XLogRecPtr tliRecPtr,
+ XLogSegNo readSegNo);
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
static void XLogFileClose(void);
static void PreallocXlogFiles(XLogRecPtr endptr);
@@ -1234,8 +1225,7 @@ XLogInsertRecord(XLogRecData *rdata,
appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
if (!debug_reader)
- debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
- XL_ROUTINE(), NULL);
+ debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
if (!debug_reader)
{
@@ -4373,12 +4363,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
bool fetching_ckpt)
{
XLogRecord *record;
- XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
-
- /* Pass through parameters to XLogPageRead */
- private->fetching_ckpt = fetching_ckpt;
- private->emode = emode;
- private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr);
+ bool randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr);
/* This is the first attempt to read this page. */
lastSourceFailed = false;
@@ -4386,8 +4371,16 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
for (;;)
{
char *errormsg;
+ XLogReadRecordResult result;
+
+ while ((result = XLogReadRecord(xlogreader, &record, &errormsg))
+ == XLREAD_NEED_DATA)
+ {
+ if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess))
+ break;
+
+ }
- record = XLogReadRecord(xlogreader, &errormsg);
ReadRecPtr = xlogreader->ReadRecPtr;
EndRecPtr = xlogreader->EndRecPtr;
if (record == NULL)
@@ -6457,7 +6450,6 @@ StartupXLOG(void)
bool backupFromStandby = false;
DBState dbstate_at_startup;
XLogReaderState *xlogreader;
- XLogPageReadPrivate private;
bool promoted = false;
struct stat st;
@@ -6616,13 +6608,9 @@ StartupXLOG(void)
OwnLatch(&XLogCtl->recoveryWakeupLatch);
/* Set up XLOG reader facility */
- MemSet(&private, 0, sizeof(XLogPageReadPrivate));
xlogreader =
- XLogReaderAllocate(wal_segment_size, NULL,
- XL_ROUTINE(.page_read = &XLogPageRead,
- .segment_open = NULL,
- .segment_close = wal_segment_close),
- &private);
+ XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close);
+
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -7819,7 +7807,8 @@ StartupXLOG(void)
XLogRecPtr pageBeginPtr;
pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ);
- Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size));
+ Assert(XLogSegmentOffset(xlogreader->readPagePtr, wal_segment_size) ==
+ XLogSegmentOffset(pageBeginPtr, wal_segment_size));
firstIdx = XLogRecPtrToBufIdx(EndOfLog);
@@ -12107,13 +12096,15 @@ CancelBackup(void)
* XLogPageRead() to try fetching the record from another source, or to
* sleep and retry.
*/
-static int
-XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetRecPtr, char *readBuf)
-{
- XLogPageReadPrivate *private =
- (XLogPageReadPrivate *) xlogreader->private_data;
- int emode = private->emode;
+static bool
+XLogPageRead(XLogReaderState *state,
+ bool fetching_ckpt, int emode, bool randAccess)
+{
+ char *readBuf = state->readBuf;
+ XLogRecPtr targetPagePtr = state->readPagePtr;
+ int reqLen = state->reqLen;
+ int readLen = 0;
+ XLogRecPtr targetRecPtr = state->ReadRecPtr;
uint32 targetPageOff;
XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
int r;
@@ -12126,7 +12117,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
* is not in the currently open one.
*/
if (readFile >= 0 &&
- !XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size))
+ !XLByteInSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size))
{
/*
* Request a restartpoint if we've replayed too much xlog since the
@@ -12134,10 +12125,10 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
*/
if (bgwriterLaunched)
{
- if (XLogCheckpointNeeded(readSegNo))
+ if (XLogCheckpointNeeded(state->seg.ws_segno))
{
(void) GetRedoRecPtr();
- if (XLogCheckpointNeeded(readSegNo))
+ if (XLogCheckpointNeeded(state->seg.ws_segno))
RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
}
}
@@ -12147,7 +12138,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
readSource = XLOG_FROM_ANY;
}
- XLByteToSeg(targetPagePtr, readSegNo, wal_segment_size);
+ XLByteToSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size);
retry:
/* See if we need to retrieve more data */
@@ -12156,17 +12147,15 @@ retry:
flushedUpto < targetPagePtr + reqLen))
{
if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
- private->randAccess,
- private->fetching_ckpt,
- targetRecPtr))
+ randAccess, fetching_ckpt,
+ targetRecPtr, state->seg.ws_segno))
{
if (readFile >= 0)
close(readFile);
readFile = -1;
- readLen = 0;
readSource = XLOG_FROM_ANY;
-
- return -1;
+ XLogReaderSetInputData(state, -1);
+ return false;
}
}
@@ -12193,40 +12182,36 @@ retry:
else
readLen = XLOG_BLCKSZ;
- /* Read the requested page */
- readOff = targetPageOff;
-
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
- r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff);
+ r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) targetPageOff);
if (r != XLOG_BLCKSZ)
{
char fname[MAXFNAMELEN];
int save_errno = errno;
pgstat_report_wait_end();
- XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size);
+ XLogFileName(fname, curFileTLI, state->seg.ws_segno, wal_segment_size);
if (r < 0)
{
errno = save_errno;
ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
(errcode_for_file_access(),
errmsg("could not read from log segment %s, offset %u: %m",
- fname, readOff)));
+ fname, targetPageOff)));
}
else
ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("could not read from log segment %s, offset %u: read %d of %zu",
- fname, readOff, r, (Size) XLOG_BLCKSZ)));
+ fname, targetPageOff, r, (Size) XLOG_BLCKSZ)));
goto next_record_is_invalid;
}
pgstat_report_wait_end();
- Assert(targetSegNo == readSegNo);
- Assert(targetPageOff == readOff);
- Assert(reqLen <= readLen);
+ Assert(targetSegNo == state->seg.ws_segno);
+ Assert(readLen >= reqLen);
- xlogreader->seg.ws_tli = curFileTLI;
+ state->seg.ws_tli = curFileTLI;
/*
* Check the page header immediately, so that we can retry immediately if
@@ -12254,14 +12239,16 @@ retry:
* Validating the page header is cheap enough that doing it twice
* shouldn't be a big deal from a performance point of view.
*/
- if (!XLogReaderValidatePageHeader(xlogreader, targetPagePtr, readBuf))
+ if (!XLogReaderValidatePageHeader(state, targetPagePtr, readBuf))
{
- /* reset any error XLogReaderValidatePageHeader() might have set */
- xlogreader->errormsg_buf[0] = '\0';
+ /* reset any error StateValidatePageHeader() might have set */
+ state->errormsg_buf[0] = '\0';
goto next_record_is_invalid;
}
- return readLen;
+ Assert(state->readPagePtr == targetPagePtr);
+ XLogReaderSetInputData(state, readLen);
+ return true;
next_record_is_invalid:
lastSourceFailed = true;
@@ -12269,14 +12256,14 @@ next_record_is_invalid:
if (readFile >= 0)
close(readFile);
readFile = -1;
- readLen = 0;
readSource = XLOG_FROM_ANY;
/* In standby-mode, keep trying */
if (StandbyMode)
goto retry;
- else
- return -1;
+
+ XLogReaderSetInputData(state, -1);
+ return false;
}
/*
@@ -12307,7 +12294,8 @@ next_record_is_invalid:
*/
static bool
WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
- bool fetching_ckpt, XLogRecPtr tliRecPtr)
+ bool fetching_ckpt, XLogRecPtr tliRecPtr,
+ XLogSegNo readSegNo)
{
static TimestampTz last_fail_time = 0;
TimestampTz now;