diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 188 |
1 files changed, 75 insertions, 113 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index adfc6f67e29..c1d4415a433 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -35,7 +35,6 @@ #include "access/xlog_internal.h" #include "access/xlogarchive.h" #include "access/xloginsert.h" -#include "access/xlogprefetch.h" #include "access/xlogreader.h" #include "access/xlogutils.h" #include "catalog/catversion.h" @@ -111,7 +110,6 @@ int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ int wal_retrieve_retry_interval = 5000; int max_slot_wal_keep_size_mb = -1; -int wal_decode_buffer_size = 512 * 1024; bool track_wal_io_timing = false; #ifdef WAL_DEBUG @@ -813,13 +811,17 @@ static XLogSegNo openLogSegNo = 0; * These variables are used similarly to the ones above, but for reading * the XLOG. Note, however, that readOff generally represents the offset * of the page just read, not the seek position of the FD itself, which - * will be just past that page. readSource indicates where we got the - * currently open file from. + * will be just past that page. readLen indicates how much of the current + * page has been read into readBuf, and readSource indicates where we got + * the currently open file from. * Note: we could use Reserve/ReleaseExternalFD to track consumption of * this FD too; but it doesn't currently seem worthwhile, since the XLOG is * not read by general-purpose sessions. */ static int readFile = -1; +static XLogSegNo readSegNo = 0; +static uint32 readOff = 0; +static uint32 readLen = 0; static XLogSource readSource = XLOG_FROM_ANY; /* @@ -836,6 +838,13 @@ static XLogSource currentSource = XLOG_FROM_ANY; static bool lastSourceFailed = false; static bool pendingWalRcvRestart = false; +typedef struct XLogPageReadPrivate +{ + int emode; + bool fetching_ckpt; /* are we fetching a checkpoint record? */ + bool randAccess; +} XLogPageReadPrivate; + /* * These variables track when we last obtained some WAL data to process, * and where we got it from. (XLogReceiptSource is initially the same as @@ -911,13 +920,10 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); -static bool XLogPageRead(XLogReaderState *state, - bool fetching_ckpt, int emode, bool randAccess, - bool nowait); +static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, + int reqLen, XLogRecPtr targetRecPtr, char *readBuf); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, - XLogRecPtr tliRecPtr, - XLogSegNo readSegNo); + bool fetching_ckpt, XLogRecPtr tliRecPtr); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); static void XLogFileClose(void); static void PreallocXlogFiles(XLogRecPtr endptr); @@ -1212,7 +1218,6 @@ XLogInsertRecord(XLogRecData *rdata, StringInfoData recordBuf; char *errormsg = NULL; MemoryContext oldCxt; - DecodedXLogRecord *decoded; oldCxt = MemoryContextSwitchTo(walDebugCxt); @@ -1228,19 +1233,15 @@ XLogInsertRecord(XLogRecData *rdata, for (; rdata != NULL; rdata = rdata->next) appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); - /* How much space would it take to decode this record? */ - decoded = palloc(DecodeXLogRecordRequiredSpace(recordBuf.len)); - if (!debug_reader) - debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL); + debug_reader = XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(), NULL); if (!debug_reader) { appendStringInfoString(&buf, "error decoding record: out of memory"); } - else if (!DecodeXLogRecord(debug_reader, decoded, - (XLogRecord *) recordBuf.data, - EndPos, + else if (!DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data, &errormsg)) { appendStringInfo(&buf, "error decoding record: %s", @@ -1249,17 +1250,10 @@ XLogInsertRecord(XLogRecData *rdata, else { appendStringInfoString(&buf, " - "); - /* - * Temporarily make this decoded record the current record for - * XLogRecGetXXX() macros. - */ - debug_reader->record = decoded; xlog_outdesc(&buf, debug_reader); - debug_reader->record = NULL; } elog(LOG, "%s", buf.data); - pfree(decoded); pfree(buf.data); pfree(recordBuf.data); MemoryContextSwitchTo(oldCxt); @@ -1433,7 +1427,7 @@ checkXLogConsistency(XLogReaderState *record) Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0); - for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) + for (block_id = 0; block_id <= record->max_block_id; block_id++) { Buffer buf; Page page; @@ -1464,7 +1458,7 @@ checkXLogConsistency(XLogReaderState *record) * temporary page. */ buf = XLogReadBufferExtended(rnode, forknum, blkno, - RBM_NORMAL_NO_LOG, InvalidBuffer); + RBM_NORMAL_NO_LOG); if (!BufferIsValid(buf)) continue; @@ -3732,6 +3726,7 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, snprintf(activitymsg, sizeof(activitymsg), "waiting for %s", xlogfname); set_ps_display(activitymsg); + restoredFromArchive = RestoreArchivedFile(path, xlogfname, "RECOVERYXLOG", wal_segment_size, @@ -4378,7 +4373,12 @@ ReadRecord(XLogReaderState *xlogreader, int emode, bool fetching_ckpt) { XLogRecord *record; - bool randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); + XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; + + /* Pass through parameters to XLogPageRead */ + private->fetching_ckpt = fetching_ckpt; + private->emode = emode; + private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); /* This is the first attempt to read this page. */ lastSourceFailed = false; @@ -4386,19 +4386,10 @@ ReadRecord(XLogReaderState *xlogreader, int emode, for (;;) { char *errormsg; - XLogReadRecordResult result; - - while ((result = XLogReadRecord(xlogreader, &record, &errormsg)) - == XLREAD_NEED_DATA) - { - if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess, - false /* wait for data if streaming */)) - break; - } + record = XLogReadRecord(xlogreader, &errormsg); ReadRecPtr = xlogreader->ReadRecPtr; EndRecPtr = xlogreader->EndRecPtr; - if (record == NULL) { if (readFile >= 0) @@ -6466,6 +6457,7 @@ StartupXLOG(void) bool backupFromStandby = false; DBState dbstate_at_startup; XLogReaderState *xlogreader; + XLogPageReadPrivate private; bool promoted = false; struct stat st; @@ -6624,9 +6616,13 @@ StartupXLOG(void) OwnLatch(&XLogCtl->recoveryWakeupLatch); /* Set up XLOG reader facility */ + MemSet(&private, 0, sizeof(XLogPageReadPrivate)); xlogreader = - XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close); - + XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &XLogPageRead, + .segment_open = NULL, + .segment_close = wal_segment_close), + &private); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -6635,12 +6631,6 @@ StartupXLOG(void) xlogreader->system_identifier = ControlFile->system_identifier; /* - * Set the WAL decode buffer size. This limits how far ahead we can read - * in the WAL. - */ - XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size); - - /* * Allocate two page buffers dedicated to WAL consistency checks. We do * it this way, rather than just making static arrays, for two reasons: * (1) no need to waste the storage in most instantiations of the backend; @@ -7320,7 +7310,6 @@ StartupXLOG(void) { ErrorContextCallback errcallback; TimestampTz xtime; - XLogPrefetchState prefetch; PGRUsage ru0; pg_rusage_init(&ru0); @@ -7331,9 +7320,6 @@ StartupXLOG(void) (errmsg("redo starts at %X/%X", LSN_FORMAT_ARGS(ReadRecPtr)))); - /* Prepare to prefetch, if configured. */ - XLogPrefetchBegin(&prefetch, xlogreader); - /* * main redo apply loop */ @@ -7363,14 +7349,6 @@ StartupXLOG(void) /* Handle interrupt signals of startup process */ HandleStartupProcInterrupts(); - /* Perform WAL prefetching, if enabled. */ - while (XLogPrefetch(&prefetch, xlogreader->ReadRecPtr) == XLREAD_NEED_DATA) - { - if (!XLogPageRead(xlogreader, false, LOG, false, - true /* don't wait for streaming data */)) - break; - } - /* * Pause WAL replay, if requested by a hot-standby session via * SetRecoveryPause(). @@ -7544,9 +7522,6 @@ StartupXLOG(void) */ if (AllowCascadeReplication()) WalSndWakeup(); - - /* Reset the prefetcher. */ - XLogPrefetchReconfigure(); } /* Exit loop if we reached inclusive recovery target */ @@ -7563,7 +7538,6 @@ StartupXLOG(void) /* * end of main redo apply loop */ - XLogPrefetchEnd(&prefetch); if (reachedRecoveryTarget) { @@ -7845,8 +7819,7 @@ StartupXLOG(void) XLogRecPtr pageBeginPtr; pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ); - Assert(XLogSegmentOffset(xlogreader->readPagePtr, wal_segment_size) == - XLogSegmentOffset(pageBeginPtr, wal_segment_size)); + Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size)); firstIdx = XLogRecPtrToBufIdx(EndOfLog); @@ -10338,7 +10311,7 @@ xlog_redo(XLogReaderState *record) * XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info * code just to distinguish them for statistics purposes. */ - for (uint8 block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) + for (uint8 block_id = 0; block_id <= record->max_block_id; block_id++) { Buffer buffer; @@ -10473,7 +10446,7 @@ xlog_block_info(StringInfo buf, XLogReaderState *record) int block_id; /* decode block references */ - for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) + for (block_id = 0; block_id <= record->max_block_id; block_id++) { RelFileNode rnode; ForkNumber forknum; @@ -12133,19 +12106,14 @@ CancelBackup(void) * and call XLogPageRead() again with the same arguments. This lets * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. - * - * If nowait is true, then return false immediately if the requested data isn't - * available yet. */ -static bool -XLogPageRead(XLogReaderState *state, - bool fetching_ckpt, int emode, bool randAccess, bool nowait) -{ - char *readBuf = state->readBuf; - XLogRecPtr targetPagePtr = state->readPagePtr; - int reqLen = state->reqLen; - int readLen = 0; - XLogRecPtr targetRecPtr = state->DecodeRecPtr; +static int +XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *readBuf) +{ + XLogPageReadPrivate *private = + (XLogPageReadPrivate *) xlogreader->private_data; + int emode = private->emode; uint32 targetPageOff; XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; int r; @@ -12158,7 +12126,7 @@ XLogPageRead(XLogReaderState *state, * is not in the currently open one. */ if (readFile >= 0 && - !XLByteInSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size)) + !XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size)) { /* * Request a restartpoint if we've replayed too much xlog since the @@ -12166,10 +12134,10 @@ XLogPageRead(XLogReaderState *state, */ if (bgwriterLaunched) { - if (XLogCheckpointNeeded(state->seg.ws_segno)) + if (XLogCheckpointNeeded(readSegNo)) { (void) GetRedoRecPtr(); - if (XLogCheckpointNeeded(state->seg.ws_segno)) + if (XLogCheckpointNeeded(readSegNo)) RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); } } @@ -12179,7 +12147,7 @@ XLogPageRead(XLogReaderState *state, readSource = XLOG_FROM_ANY; } - XLByteToSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size); + XLByteToSeg(targetPagePtr, readSegNo, wal_segment_size); retry: /* See if we need to retrieve more data */ @@ -12187,22 +12155,18 @@ retry: (readSource == XLOG_FROM_STREAM && flushedUpto < targetPagePtr + reqLen)) { - if (nowait) - { - XLogReaderSetInputData(state, -1); - return false; - } - if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, - randAccess, fetching_ckpt, - targetRecPtr, state->seg.ws_segno)) + private->randAccess, + private->fetching_ckpt, + targetRecPtr)) { if (readFile >= 0) close(readFile); readFile = -1; + readLen = 0; readSource = XLOG_FROM_ANY; - XLogReaderSetInputData(state, -1); - return false; + + return -1; } } @@ -12229,36 +12193,40 @@ retry: else readLen = XLOG_BLCKSZ; + /* Read the requested page */ + readOff = targetPageOff; + pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) targetPageOff); + r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff); if (r != XLOG_BLCKSZ) { char fname[MAXFNAMELEN]; int save_errno = errno; pgstat_report_wait_end(); - XLogFileName(fname, curFileTLI, state->seg.ws_segno, wal_segment_size); + XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size); if (r < 0) { errno = save_errno; ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u: %m", - fname, targetPageOff))); + fname, readOff))); } else ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode(ERRCODE_DATA_CORRUPTED), errmsg("could not read from log segment %s, offset %u: read %d of %zu", - fname, targetPageOff, r, (Size) XLOG_BLCKSZ))); + fname, readOff, r, (Size) XLOG_BLCKSZ))); goto next_record_is_invalid; } pgstat_report_wait_end(); - Assert(targetSegNo == state->seg.ws_segno); - Assert(readLen >= reqLen); + Assert(targetSegNo == readSegNo); + Assert(targetPageOff == readOff); + Assert(reqLen <= readLen); - state->seg.ws_tli = curFileTLI; + xlogreader->seg.ws_tli = curFileTLI; /* * Check the page header immediately, so that we can retry immediately if @@ -12286,16 +12254,14 @@ retry: * Validating the page header is cheap enough that doing it twice * shouldn't be a big deal from a performance point of view. */ - if (!XLogReaderValidatePageHeader(state, targetPagePtr, readBuf)) + if (!XLogReaderValidatePageHeader(xlogreader, targetPagePtr, readBuf)) { - /* reset any error StateValidatePageHeader() might have set */ - state->errormsg_buf[0] = '\0'; + /* reset any error XLogReaderValidatePageHeader() might have set */ + xlogreader->errormsg_buf[0] = '\0'; goto next_record_is_invalid; } - Assert(state->readPagePtr == targetPagePtr); - XLogReaderSetInputData(state, readLen); - return true; + return readLen; next_record_is_invalid: lastSourceFailed = true; @@ -12303,14 +12269,14 @@ next_record_is_invalid: if (readFile >= 0) close(readFile); readFile = -1; + readLen = 0; readSource = XLOG_FROM_ANY; /* In standby-mode, keep trying */ if (StandbyMode) goto retry; - - XLogReaderSetInputData(state, -1); - return false; + else + return -1; } /* @@ -12341,8 +12307,7 @@ next_record_is_invalid: */ static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, XLogRecPtr tliRecPtr, - XLogSegNo readSegNo) + bool fetching_ckpt, XLogRecPtr tliRecPtr) { static TimestampTz last_fail_time = 0; TimestampTz now; @@ -12426,7 +12391,6 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, */ currentSource = XLOG_FROM_STREAM; startWalReceiver = true; - XLogPrefetchReconfigure(); break; case XLOG_FROM_STREAM: @@ -12661,7 +12625,6 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * be updated on each cycle. When we are behind, * XLogReceiptTime will not advance, so the grace time * allotted to conflicting queries will decrease. - * */ if (RecPtr < flushedUpto) havedata = true; @@ -12682,7 +12645,6 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, else havedata = false; } - if (havedata) { /* |