diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 126 |
1 files changed, 57 insertions, 69 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c1d4415a433..7faac01bf24 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -811,17 +811,13 @@ static XLogSegNo openLogSegNo = 0; * These variables are used similarly to the ones above, but for reading * the XLOG. Note, however, that readOff generally represents the offset * of the page just read, not the seek position of the FD itself, which - * will be just past that page. readLen indicates how much of the current - * page has been read into readBuf, and readSource indicates where we got - * the currently open file from. + * will be just past that page. readSource indicates where we got the + * currently open file from. * Note: we could use Reserve/ReleaseExternalFD to track consumption of * this FD too; but it doesn't currently seem worthwhile, since the XLOG is * not read by general-purpose sessions. */ static int readFile = -1; -static XLogSegNo readSegNo = 0; -static uint32 readOff = 0; -static uint32 readLen = 0; static XLogSource readSource = XLOG_FROM_ANY; /* @@ -838,13 +834,6 @@ static XLogSource currentSource = XLOG_FROM_ANY; static bool lastSourceFailed = false; static bool pendingWalRcvRestart = false; -typedef struct XLogPageReadPrivate -{ - int emode; - bool fetching_ckpt; /* are we fetching a checkpoint record? */ - bool randAccess; -} XLogPageReadPrivate; - /* * These variables track when we last obtained some WAL data to process, * and where we got it from. (XLogReceiptSource is initially the same as @@ -920,10 +909,12 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); -static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf); +static bool XLogPageRead(XLogReaderState *state, + bool fetching_ckpt, int emode, bool randAccess); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, XLogRecPtr tliRecPtr); + bool fetching_ckpt, + XLogRecPtr tliRecPtr, + XLogSegNo readSegNo); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); static void XLogFileClose(void); static void PreallocXlogFiles(XLogRecPtr endptr); @@ -1234,8 +1225,7 @@ XLogInsertRecord(XLogRecData *rdata, appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); if (!debug_reader) - debug_reader = XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(), NULL); + debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL); if (!debug_reader) { @@ -4373,12 +4363,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode, bool fetching_ckpt) { XLogRecord *record; - XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; - - /* Pass through parameters to XLogPageRead */ - private->fetching_ckpt = fetching_ckpt; - private->emode = emode; - private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); + bool randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); /* This is the first attempt to read this page. */ lastSourceFailed = false; @@ -4386,8 +4371,16 @@ ReadRecord(XLogReaderState *xlogreader, int emode, for (;;) { char *errormsg; + XLogReadRecordResult result; + + while ((result = XLogReadRecord(xlogreader, &record, &errormsg)) + == XLREAD_NEED_DATA) + { + if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess)) + break; + + } - record = XLogReadRecord(xlogreader, &errormsg); ReadRecPtr = xlogreader->ReadRecPtr; EndRecPtr = xlogreader->EndRecPtr; if (record == NULL) @@ -6457,7 +6450,6 @@ StartupXLOG(void) bool backupFromStandby = false; DBState dbstate_at_startup; XLogReaderState *xlogreader; - XLogPageReadPrivate private; bool promoted = false; struct stat st; @@ -6616,13 +6608,9 @@ StartupXLOG(void) OwnLatch(&XLogCtl->recoveryWakeupLatch); /* Set up XLOG reader facility */ - MemSet(&private, 0, sizeof(XLogPageReadPrivate)); xlogreader = - XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(.page_read = &XLogPageRead, - .segment_open = NULL, - .segment_close = wal_segment_close), - &private); + XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close); + if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -7819,7 +7807,8 @@ StartupXLOG(void) XLogRecPtr pageBeginPtr; pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ); - Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size)); + Assert(XLogSegmentOffset(xlogreader->readPagePtr, wal_segment_size) == + XLogSegmentOffset(pageBeginPtr, wal_segment_size)); firstIdx = XLogRecPtrToBufIdx(EndOfLog); @@ -12107,13 +12096,15 @@ CancelBackup(void) * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. */ -static int -XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *readBuf) -{ - XLogPageReadPrivate *private = - (XLogPageReadPrivate *) xlogreader->private_data; - int emode = private->emode; +static bool +XLogPageRead(XLogReaderState *state, + bool fetching_ckpt, int emode, bool randAccess) +{ + char *readBuf = state->readBuf; + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->reqLen; + int readLen = 0; + XLogRecPtr targetRecPtr = state->ReadRecPtr; uint32 targetPageOff; XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; int r; @@ -12126,7 +12117,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, * is not in the currently open one. */ if (readFile >= 0 && - !XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size)) + !XLByteInSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size)) { /* * Request a restartpoint if we've replayed too much xlog since the @@ -12134,10 +12125,10 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, */ if (bgwriterLaunched) { - if (XLogCheckpointNeeded(readSegNo)) + if (XLogCheckpointNeeded(state->seg.ws_segno)) { (void) GetRedoRecPtr(); - if (XLogCheckpointNeeded(readSegNo)) + if (XLogCheckpointNeeded(state->seg.ws_segno)) RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); } } @@ -12147,7 +12138,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, readSource = XLOG_FROM_ANY; } - XLByteToSeg(targetPagePtr, readSegNo, wal_segment_size); + XLByteToSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size); retry: /* See if we need to retrieve more data */ @@ -12156,17 +12147,15 @@ retry: flushedUpto < targetPagePtr + reqLen)) { if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, - private->randAccess, - private->fetching_ckpt, - targetRecPtr)) + randAccess, fetching_ckpt, + targetRecPtr, state->seg.ws_segno)) { if (readFile >= 0) close(readFile); readFile = -1; - readLen = 0; readSource = XLOG_FROM_ANY; - - return -1; + XLogReaderSetInputData(state, -1); + return false; } } @@ -12193,40 +12182,36 @@ retry: else readLen = XLOG_BLCKSZ; - /* Read the requested page */ - readOff = targetPageOff; - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff); + r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) targetPageOff); if (r != XLOG_BLCKSZ) { char fname[MAXFNAMELEN]; int save_errno = errno; pgstat_report_wait_end(); - XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size); + XLogFileName(fname, curFileTLI, state->seg.ws_segno, wal_segment_size); if (r < 0) { errno = save_errno; ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u: %m", - fname, readOff))); + fname, targetPageOff))); } else ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode(ERRCODE_DATA_CORRUPTED), errmsg("could not read from log segment %s, offset %u: read %d of %zu", - fname, readOff, r, (Size) XLOG_BLCKSZ))); + fname, targetPageOff, r, (Size) XLOG_BLCKSZ))); goto next_record_is_invalid; } pgstat_report_wait_end(); - Assert(targetSegNo == readSegNo); - Assert(targetPageOff == readOff); - Assert(reqLen <= readLen); + Assert(targetSegNo == state->seg.ws_segno); + Assert(readLen >= reqLen); - xlogreader->seg.ws_tli = curFileTLI; + state->seg.ws_tli = curFileTLI; /* * Check the page header immediately, so that we can retry immediately if @@ -12254,14 +12239,16 @@ retry: * Validating the page header is cheap enough that doing it twice * shouldn't be a big deal from a performance point of view. */ - if (!XLogReaderValidatePageHeader(xlogreader, targetPagePtr, readBuf)) + if (!XLogReaderValidatePageHeader(state, targetPagePtr, readBuf)) { - /* reset any error XLogReaderValidatePageHeader() might have set */ - xlogreader->errormsg_buf[0] = '\0'; + /* reset any error StateValidatePageHeader() might have set */ + state->errormsg_buf[0] = '\0'; goto next_record_is_invalid; } - return readLen; + Assert(state->readPagePtr == targetPagePtr); + XLogReaderSetInputData(state, readLen); + return true; next_record_is_invalid: lastSourceFailed = true; @@ -12269,14 +12256,14 @@ next_record_is_invalid: if (readFile >= 0) close(readFile); readFile = -1; - readLen = 0; readSource = XLOG_FROM_ANY; /* In standby-mode, keep trying */ if (StandbyMode) goto retry; - else - return -1; + + XLogReaderSetInputData(state, -1); + return false; } /* @@ -12307,7 +12294,8 @@ next_record_is_invalid: */ static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, XLogRecPtr tliRecPtr) + bool fetching_ckpt, XLogRecPtr tliRecPtr, + XLogSegNo readSegNo) { static TimestampTz last_fail_time = 0; TimestampTz now; |