diff options
author | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2019-11-25 15:04:54 -0300 |
---|---|---|
committer | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2019-11-25 15:04:54 -0300 |
commit | 0dc8ead46363fec6f621a12c7e1f889ba73b55a9 (patch) | |
tree | ae1eb1d20970c69eeac44e57a44f4315b5223ea0 /src/backend/access/transam | |
parent | 5883f5fe27d7b52c812dd0f8cbda67373a14c451 (diff) | |
download | postgresql-0dc8ead46363fec6f621a12c7e1f889ba73b55a9.tar.gz postgresql-0dc8ead46363fec6f621a12c7e1f889ba73b55a9.zip |
Refactor WAL file-reading code into WALRead()
XLogReader, walsender and pg_waldump all had their own routines to read
data from WAL files to memory, with slightly different approaches
according to the particular conditions of each environment. There's a
lot of commonality, so we can refactor that into a single routine
WALRead() in XLogReader, and move the differences to a separate (simpler)
callback that just opens the next WAL segment. This results in a
clearer (ahem) code flow.
The error reporting needs are covered by filling in a new error-info
struct, WALReadError, and it's the caller's responsibility to act on it.
The backend has WALReadRaiseError() to do so.
We no longer ever need to seek in this interface; switch to using
pg_pread().
Author: Antonin Houska, with contributions from Álvaro Herrera
Reviewed-by: Michaël Paquier, Kyotaro Horiguchi
Discussion: https://postgr.es/m/14984.1554998742@spoje.net
Diffstat (limited to 'src/backend/access/transam')
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 106 | ||||
-rw-r--r-- | src/backend/access/transam/xlogutils.c | 205 |
2 files changed, 171 insertions, 140 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 7f24f0cb95f..67418b05f15 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -17,6 +17,8 @@ */ #include "postgres.h" +#include <unistd.h> + #include "access/transam.h" #include "access/xlog_internal.h" #include "access/xlogreader.h" @@ -27,6 +29,7 @@ #ifndef FRONTEND #include "miscadmin.h" +#include "pgstat.h" #include "utils/memutils.h" #endif @@ -208,7 +211,6 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, { seg->ws_file = -1; seg->ws_segno = 0; - seg->ws_off = 0; seg->ws_tli = 0; segcxt->ws_segsize = segsize; @@ -295,8 +297,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) * byte to cover the whole record header, or at least the part of it that * fits on the same page. */ - readOff = ReadPageInternal(state, - targetPagePtr, + readOff = ReadPageInternal(state, targetPagePtr, Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); if (readOff < 0) goto err; @@ -556,7 +557,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) /* check whether we have all the requested data already */ if (targetSegNo == state->seg.ws_segno && - targetPageOff == state->seg.ws_off && reqLen <= state->readLen) + targetPageOff == state->segoff && reqLen <= state->readLen) return state->readLen; /* @@ -627,7 +628,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) /* update read state information */ state->seg.ws_segno = targetSegNo; - state->seg.ws_off = targetPageOff; + state->segoff = targetPageOff; state->readLen = readLen; return readLen; @@ -644,7 +645,7 @@ static void XLogReaderInvalReadState(XLogReaderState *state) { state->seg.ws_segno = 0; - state->seg.ws_off = 0; + state->segoff = 0; state->readLen = 0; } @@ -1015,6 +1016,99 @@ out: #endif /* FRONTEND */ +/* + * Read 'count' bytes into 'buf', starting at location 'startptr', from 
WAL + * fetched from timeline 'tli'. + * + * 'seg/segcxt' identify the last segment used. 'openSegment' is a callback + * to open the next segment, if necessary. + * + * Returns true if succeeded, false if an error occurs, in which case + * 'errinfo' receives error details. + * + * XXX probably this should be improved to suck data directly from the + * WAL buffers when possible. + */ +bool +WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, + WALOpenSegment *seg, WALSegmentContext *segcxt, + WALSegmentOpen openSegment, WALReadError *errinfo) +{ + char *p; + XLogRecPtr recptr; + Size nbytes; + + p = buf; + recptr = startptr; + nbytes = count; + + while (nbytes > 0) + { + uint32 startoff; + int segbytes; + int readbytes; + + startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize); + + /* + * If the data we want is not in a segment we have open, close what we + * have (if anything) and open the next one, using the caller's + * provided openSegment callback. + */ + if (seg->ws_file < 0 || + !XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) || + tli != seg->ws_tli) + { + XLogSegNo nextSegNo; + + if (seg->ws_file >= 0) + close(seg->ws_file); + + XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize); + seg->ws_file = openSegment(nextSegNo, segcxt, &tli); + + /* Update the current segment info. */ + seg->ws_tli = tli; + seg->ws_segno = nextSegNo; + } + + /* How many bytes are within this segment? 
*/ + if (nbytes > (segcxt->ws_segsize - startoff)) + segbytes = segcxt->ws_segsize - startoff; + else + segbytes = nbytes; + +#ifndef FRONTEND + pgstat_report_wait_start(WAIT_EVENT_WAL_READ); +#endif + + /* Reset errno first; eases reporting non-errno-affecting errors */ + errno = 0; + readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff); + +#ifndef FRONTEND + pgstat_report_wait_end(); +#endif + + if (readbytes <= 0) + { + errinfo->wre_errno = errno; + errinfo->wre_req = segbytes; + errinfo->wre_read = readbytes; + errinfo->wre_off = startoff; + errinfo->wre_seg = *seg; + return false; + } + + /* Update state for read */ + recptr += readbytes; + nbytes -= readbytes; + p += readbytes; + } + + return true; +} + /* ---------------------------------------- * Functions for decoding the data and block references in a record. * ---------------------------------------- diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 5f1e5ba75d5..446760ed6e7 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -640,128 +640,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, } /* - * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' - * in timeline 'tli'. - * - * Will open, and keep open, one WAL segment stored in the static file - * descriptor 'sendFile'. This means if XLogRead is used once, there will - * always be one descriptor left open until the process ends, but never - * more than one. - * - * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead - * in walsender.c but for small differences (such as lack of elog() in - * frontend). Probably these should be merged at some point. 
- */ -static void -XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, - Size count) -{ - char *p; - XLogRecPtr recptr; - Size nbytes; - - /* state maintained across calls */ - static int sendFile = -1; - static XLogSegNo sendSegNo = 0; - static TimeLineID sendTLI = 0; - static uint32 sendOff = 0; - - Assert(segsize == wal_segment_size); - - p = buf; - recptr = startptr; - nbytes = count; - - while (nbytes > 0) - { - uint32 startoff; - int segbytes; - int readbytes; - - startoff = XLogSegmentOffset(recptr, segsize); - - /* Do we need to switch to a different xlog segment? */ - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) || - sendTLI != tli) - { - char path[MAXPGPATH]; - - if (sendFile >= 0) - close(sendFile); - - XLByteToSeg(recptr, sendSegNo, segsize); - - XLogFilePath(path, tli, sendSegNo, segsize); - - sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY); - - if (sendFile < 0) - { - if (errno == ENOENT) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("requested WAL segment %s has already been removed", - path))); - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); - } - sendOff = 0; - sendTLI = tli; - } - - /* Need to seek in the file? */ - if (sendOff != startoff) - { - if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) - { - char path[MAXPGPATH]; - int save_errno = errno; - - XLogFilePath(path, tli, sendSegNo, segsize); - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - path, startoff))); - } - sendOff = startoff; - } - - /* How many bytes are within this segment? 
*/ - if (nbytes > (segsize - startoff)) - segbytes = segsize - startoff; - else - segbytes = nbytes; - - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - readbytes = read(sendFile, p, segbytes); - pgstat_report_wait_end(); - if (readbytes <= 0) - { - char path[MAXPGPATH]; - int save_errno = errno; - - XLogFilePath(path, tli, sendSegNo, segsize); - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u, length %lu: %m", - path, sendOff, (unsigned long) segbytes))); - } - - /* Update state for read */ - recptr += readbytes; - - sendOff += readbytes; - nbytes -= readbytes; - p += readbytes; - } -} - -/* * Determine which timeline to read an xlog page from and set the * XLogReaderState's currTLI to that timeline ID. * @@ -802,8 +680,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) { - const XLogRecPtr lastReadPage = state->seg.ws_segno * - state->segcxt.ws_segsize + state->seg.ws_off; + const XLogRecPtr lastReadPage = (state->seg.ws_segno * + state->segcxt.ws_segsize + state->segoff); Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); @@ -896,6 +774,34 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa } } +/* openSegment callback for WALRead */ +static int +wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext *segcxt, + TimeLineID *tli_p) +{ + TimeLineID tli = *tli_p; + char path[MAXPGPATH]; + int fd; + + XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize); + fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (fd >= 0) + return fd; + + if (errno == ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + path))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + + 
return -1; /* keep compiler quiet */ +} + /* * read_page callback for reading local xlog files * @@ -913,7 +819,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, { XLogRecPtr read_upto, loc; + TimeLineID tli; int count; + WALReadError errinfo; loc = targetPagePtr + reqLen; @@ -932,7 +840,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, read_upto = GetFlushRecPtr(); else read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); - state->seg.ws_tli = ThisTimeLineID; + tli = ThisTimeLineID; /* * Check which timeline to get the record from. @@ -982,14 +890,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, read_upto = state->currTLIValidUntil; /* - * Setting ws_tli to our wanted record's TLI is slightly wrong; - * the page might begin on an older timeline if it contains a - * timeline switch, since its xlog segment will have been copied - * from the prior timeline. This is pretty harmless though, as - * nothing cares so long as the timeline doesn't go backwards. We - * should read the page header instead; FIXME someday. + * Setting tli to our wanted record's TLI is slightly wrong; the + * page might begin on an older timeline if it contains a timeline + * switch, since its xlog segment will have been copied from the + * prior timeline. This is pretty harmless though, as nothing + * cares so long as the timeline doesn't go backwards. We should + * read the page header instead; FIXME someday. */ - state->seg.ws_tli = state->currTLI; + tli = state->currTLI; /* No need to wait on a historical timeline */ break; @@ -1020,9 +928,38 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. 
*/ - XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr, - XLOG_BLCKSZ); + if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg, + &state->segcxt, wal_segment_open, &errinfo)) + WALReadRaiseError(&errinfo); /* number of valid bytes in the buffer */ return count; } + +/* + * Backend-specific convenience code to handle read errors encountered by + * WALRead(). + */ +void +WALReadRaiseError(WALReadError *errinfo) +{ + WALOpenSegment *seg = &errinfo->wre_seg; + char *fname = XLogFileNameP(seg->ws_tli, seg->ws_segno); + + if (errinfo->wre_read < 0) + { + errno = errinfo->wre_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from log segment %s, offset %u: %m", + fname, errinfo->wre_off))); + } + else if (errinfo->wre_read == 0) + { + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read from log segment %s, offset %u: read %d of %zu", + fname, errinfo->wre_off, errinfo->wre_read, + (Size) errinfo->wre_req))); + } +} |