diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 300 |
1 files changed, 123 insertions, 177 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index cbc928501af..ac9209747a4 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -248,8 +248,9 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); +static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, + TimeLineID *tli_p); static void UpdateSpillStats(LogicalDecodingContext *ctx); -static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count); /* Initialize walsender process before entering the main command loop */ @@ -767,6 +768,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req { XLogRecPtr flushptr; int count; + WALReadError errinfo; + XLogSegNo segno; XLogReadDetermineTimeline(state, targetPagePtr, reqLen); sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID); @@ -787,7 +790,27 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ - XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ); + if (!WALRead(cur_page, + targetPagePtr, + XLOG_BLCKSZ, + sendSeg->ws_tli, /* Pass the current TLI because only + * WalSndSegmentOpen controls whether new + * TLI is needed. */ + sendSeg, + sendCxt, + WalSndSegmentOpen, + &errinfo)) + WALReadRaiseError(&errinfo); + + /* + * After reading into the buffer, check that what we read was valid. We do + * this after reading, because even though the segment was present when we + * opened it, it might get recycled or removed while we read it. The + * read() succeeds in that case, but the data we tried to read might + * already have been overwritten with new WAL records. + */ + XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize); + CheckXLogRemoved(segno, sendSeg->ws_tli); return count; } @@ -2360,189 +2383,68 @@ WalSndKill(int code, Datum arg) SpinLockRelease(&walsnd->mutex); } -/* - * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' - * - * XXX probably this should be improved to suck data directly from the - * WAL buffers when possible. - * - * Will open, and keep open, one WAL segment stored in the global file - * descriptor sendFile. This means if XLogRead is used once, there will - * always be one descriptor left open until the process ends, but never - * more than one. - */ -static void -XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count) +/* walsender's openSegment callback for WALRead */ +static int +WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, + TimeLineID *tli_p) { - char *p; - XLogRecPtr recptr; - Size nbytes; - XLogSegNo segno; - -retry: - p = buf; - recptr = startptr; - nbytes = count; + char path[MAXPGPATH]; + int fd; - while (nbytes > 0) + /*------- + * When reading from a historic timeline, and there is a timeline switch + * within this segment, read from the WAL segment belonging to the new + * timeline. + * + * For example, imagine that this server is currently on timeline 5, and + * we're streaming timeline 4. The switch from timeline 4 to 5 happened at + * 0/13002088. In pg_wal, we have these files: + * + * ... + * 000000040000000000000012 + * 000000040000000000000013 + * 000000050000000000000013 + * 000000050000000000000014 + * ... + * + * In this situation, when requested to send the WAL from segment 0x13, on + * timeline 4, we read the WAL from file 000000050000000000000013. Archive + * recovery prefers files from newer timelines, so if the segment was + * restored from the archive on this server, the file belonging to the old + * timeline, 000000040000000000000013, might not exist. Their contents are + * equal up to the switchpoint, because at a timeline switch, the used + * portion of the old segment is copied to the new file. ------- + */ + *tli_p = sendTimeLine; + if (sendTimeLineIsHistoric) { - uint32 startoff; - int segbytes; - int readbytes; - - startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize); - - if (sendSeg->ws_file < 0 || - !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize)) - { - char path[MAXPGPATH]; - - /* Switch to another logfile segment */ - if (sendSeg->ws_file >= 0) - close(sendSeg->ws_file); - - XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize); - - /*------- - * When reading from a historic timeline, and there is a timeline - * switch within this segment, read from the WAL segment belonging - * to the new timeline. - * - * For example, imagine that this server is currently on timeline - * 5, and we're streaming timeline 4. The switch from timeline 4 - * to 5 happened at 0/13002088. In pg_wal, we have these files: - * - * ... - * 000000040000000000000012 - * 000000040000000000000013 - * 000000050000000000000013 - * 000000050000000000000014 - * ... - * - * In this situation, when requested to send the WAL from - * segment 0x13, on timeline 4, we read the WAL from file - * 000000050000000000000013. Archive recovery prefers files from - * newer timelines, so if the segment was restored from the - * archive on this server, the file belonging to the old timeline, - * 000000040000000000000013, might not exist. Their contents are - * equal up to the switchpoint, because at a timeline switch, the - * used portion of the old segment is copied to the new file. - *------- - */ - sendSeg->ws_tli = sendTimeLine; - if (sendTimeLineIsHistoric) - { - XLogSegNo endSegNo; - - XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize); - if (sendSeg->ws_segno == endSegNo) - sendSeg->ws_tli = sendTimeLineNextTLI; - } - - XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize); - - sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (sendSeg->ws_file < 0) - { - /* - * If the file is not found, assume it's because the standby - * asked for a too old WAL segment that has already been - * removed or recycled. - */ - if (errno == ENOENT) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno)))); - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); - } - sendSeg->ws_off = 0; - } - - /* Need to seek in the file? */ - if (sendSeg->ws_off != startoff) - { - if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), - startoff))); - sendSeg->ws_off = startoff; - } - - /* How many bytes are within this segment? */ - if (nbytes > (segcxt->ws_segsize - startoff)) - segbytes = segcxt->ws_segsize - startoff; - else - segbytes = nbytes; - - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - readbytes = read(sendSeg->ws_file, p, segbytes); - pgstat_report_wait_end(); - if (readbytes < 0) - { - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u, length %zu: %m", - XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), - sendSeg->ws_off, (Size) segbytes))); - } - else if (readbytes == 0) - { - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read from log segment %s, offset %u: read %d of %zu", - XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), - sendSeg->ws_off, readbytes, (Size) segbytes))); - } + XLogSegNo endSegNo; - /* Update state for read */ - recptr += readbytes; - - sendSeg->ws_off += readbytes; - nbytes -= readbytes; - p += readbytes; + XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize); + if (sendSeg->ws_segno == endSegNo) + *tli_p = sendTimeLineNextTLI; } - /* - * After reading into the buffer, check that what we read was valid. We do - * this after reading, because even though the segment was present when we - * opened it, it might get recycled or removed while we read it. The - * read() succeeds in that case, but the data we tried to read might - * already have been overwritten with new WAL records. - */ - XLByteToSeg(startptr, segno, segcxt->ws_segsize); - CheckXLogRemoved(segno, ThisTimeLineID); + XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize); + fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (fd >= 0) + return fd; /* - * During recovery, the currently-open WAL file might be replaced with the - * file of the same name retrieved from archive. So we always need to - * check what we read was valid after reading into the buffer. If it's - * invalid, we try to open and read the file again. + * If the file is not found, assume it's because the standby asked for a + * too old WAL segment that has already been removed or recycled. */ - if (am_cascading_walsender) - { - WalSnd *walsnd = MyWalSnd; - bool reload; - - SpinLockAcquire(&walsnd->mutex); - reload = walsnd->needreload; - walsnd->needreload = false; - SpinLockRelease(&walsnd->mutex); - - if (reload && sendSeg->ws_file >= 0) - { - close(sendSeg->ws_file); - sendSeg->ws_file = -1; - - goto retry; - } - } + if (errno == ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + XLogFileNameP(*tli_p, nextSegNo)))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + return -1; /* keep compiler quiet */ } /* @@ -2562,6 +2464,8 @@ XLogSendPhysical(void) XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; + XLogSegNo segno; + WALReadError errinfo; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) @@ -2777,7 +2681,49 @@ XLogSendPhysical(void) * calls. */ enlargeStringInfo(&output_message, nbytes); - XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes); + +retry: + if (!WALRead(&output_message.data[output_message.len], + startptr, + nbytes, + sendSeg->ws_tli, /* Pass the current TLI because only + * WalSndSegmentOpen controls whether new + * TLI is needed. */ + sendSeg, + sendCxt, + WalSndSegmentOpen, + &errinfo)) + WALReadRaiseError(&errinfo); + + /* See logical_read_xlog_page(). */ + XLByteToSeg(startptr, segno, sendCxt->ws_segsize); + CheckXLogRemoved(segno, sendSeg->ws_tli); + + /* + * During recovery, the currently-open WAL file might be replaced with the + * file of the same name retrieved from archive. So we always need to + * check what we read was valid after reading into the buffer. If it's + * invalid, we try to open and read the file again. + */ + if (am_cascading_walsender) + { + WalSnd *walsnd = MyWalSnd; + bool reload; + + SpinLockAcquire(&walsnd->mutex); + reload = walsnd->needreload; + walsnd->needreload = false; + SpinLockRelease(&walsnd->mutex); + + if (reload && sendSeg->ws_file >= 0) + { + close(sendSeg->ws_file); + sendSeg->ws_file = -1; + + goto retry; + } + } + output_message.len += nbytes; output_message.data[output_message.len] = '\0'; |