about | summary | refs | log | tree | commit | diff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- src/backend/replication/walsender.c | 300
1 files changed, 123 insertions, 177 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cbc928501af..ac9209747a4 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -248,8 +248,9 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
+static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+ TimeLineID *tli_p);
static void UpdateSpillStats(LogicalDecodingContext *ctx);
-static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
/* Initialize walsender process before entering the main command loop */
@@ -767,6 +768,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
{
XLogRecPtr flushptr;
int count;
+ WALReadError errinfo;
+ XLogSegNo segno;
XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
@@ -787,7 +790,27 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */
- XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
+ if (!WALRead(cur_page,
+ targetPagePtr,
+ XLOG_BLCKSZ,
+ sendSeg->ws_tli, /* Pass the current TLI because only
+ * WalSndSegmentOpen controls whether new
+ * TLI is needed. */
+ sendSeg,
+ sendCxt,
+ WalSndSegmentOpen,
+ &errinfo))
+ WALReadRaiseError(&errinfo);
+
+ /*
+ * After reading into the buffer, check that what we read was valid. We do
+ * this after reading, because even though the segment was present when we
+ * opened it, it might get recycled or removed while we read it. The
+ * read() succeeds in that case, but the data we tried to read might
+ * already have been overwritten with new WAL records.
+ */
+ XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
+ CheckXLogRemoved(segno, sendSeg->ws_tli);
return count;
}
@@ -2360,189 +2383,68 @@ WalSndKill(int code, Datum arg)
SpinLockRelease(&walsnd->mutex);
}
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
- *
- * Will open, and keep open, one WAL segment stored in the global file
- * descriptor sendFile. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- */
-static void
-XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
+/* walsender's openSegment callback for WALRead: opens segment nextSegNo, reports the TLI used via *tli_p, returns the fd (ERROR on failure) */
+static int
+WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+ TimeLineID *tli_p)
{
- char *p;
- XLogRecPtr recptr;
- Size nbytes;
- XLogSegNo segno;
-
-retry:
- p = buf;
- recptr = startptr;
- nbytes = count;
+ char path[MAXPGPATH];
+ int fd;
- while (nbytes > 0)
+ /*-------
+ * When reading from a historic timeline, and there is a timeline switch
+ * within this segment, read from the WAL segment belonging to the new
+ * timeline.
+ *
+ * For example, imagine that this server is currently on timeline 5, and
+ * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
+ * 0/13002088. In pg_wal, we have these files:
+ *
+ * ...
+ * 000000040000000000000012
+ * 000000040000000000000013
+ * 000000050000000000000013
+ * 000000050000000000000014
+ * ...
+ *
+ * In this situation, when requested to send the WAL from segment 0x13, on
+ * timeline 4, we read the WAL from file 000000050000000000000013. Archive
+ * recovery prefers files from newer timelines, so if the segment was
+ * restored from the archive on this server, the file belonging to the old
+ * timeline, 000000040000000000000013, might not exist. Their contents are
+ * equal up to the switchpoint, because at a timeline switch, the used
+ * portion of the old segment is copied to the new file. -------
+ */
+ *tli_p = sendTimeLine;
+ if (sendTimeLineIsHistoric)
{
- uint32 startoff;
- int segbytes;
- int readbytes;
-
- startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
-
- if (sendSeg->ws_file < 0 ||
- !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
- {
- char path[MAXPGPATH];
-
- /* Switch to another logfile segment */
- if (sendSeg->ws_file >= 0)
- close(sendSeg->ws_file);
-
- XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
-
- /*-------
- * When reading from a historic timeline, and there is a timeline
- * switch within this segment, read from the WAL segment belonging
- * to the new timeline.
- *
- * For example, imagine that this server is currently on timeline
- * 5, and we're streaming timeline 4. The switch from timeline 4
- * to 5 happened at 0/13002088. In pg_wal, we have these files:
- *
- * ...
- * 000000040000000000000012
- * 000000040000000000000013
- * 000000050000000000000013
- * 000000050000000000000014
- * ...
- *
- * In this situation, when requested to send the WAL from
- * segment 0x13, on timeline 4, we read the WAL from file
- * 000000050000000000000013. Archive recovery prefers files from
- * newer timelines, so if the segment was restored from the
- * archive on this server, the file belonging to the old timeline,
- * 000000040000000000000013, might not exist. Their contents are
- * equal up to the switchpoint, because at a timeline switch, the
- * used portion of the old segment is copied to the new file.
- *-------
- */
- sendSeg->ws_tli = sendTimeLine;
- if (sendTimeLineIsHistoric)
- {
- XLogSegNo endSegNo;
-
- XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
- if (sendSeg->ws_segno == endSegNo)
- sendSeg->ws_tli = sendTimeLineNextTLI;
- }
-
- XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
-
- sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
- if (sendSeg->ws_file < 0)
- {
- /*
- * If the file is not found, assume it's because the standby
- * asked for a too old WAL segment that has already been
- * removed or recycled.
- */
- if (errno == ENOENT)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("requested WAL segment %s has already been removed",
- XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
- else
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m",
- path)));
- }
- sendSeg->ws_off = 0;
- }
-
- /* Need to seek in the file? */
- if (sendSeg->ws_off != startoff)
- {
- if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not seek in log segment %s to offset %u: %m",
- XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
- startoff)));
- sendSeg->ws_off = startoff;
- }
-
- /* How many bytes are within this segment? */
- if (nbytes > (segcxt->ws_segsize - startoff))
- segbytes = segcxt->ws_segsize - startoff;
- else
- segbytes = nbytes;
-
- pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
- readbytes = read(sendSeg->ws_file, p, segbytes);
- pgstat_report_wait_end();
- if (readbytes < 0)
- {
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read from log segment %s, offset %u, length %zu: %m",
- XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
- sendSeg->ws_off, (Size) segbytes)));
- }
- else if (readbytes == 0)
- {
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("could not read from log segment %s, offset %u: read %d of %zu",
- XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
- sendSeg->ws_off, readbytes, (Size) segbytes)));
- }
+ XLogSegNo endSegNo;
- /* Update state for read */
- recptr += readbytes;
-
- sendSeg->ws_off += readbytes;
- nbytes -= readbytes;
- p += readbytes;
+ XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
+ if (nextSegNo == endSegNo) /* test the segment being opened; sendSeg->ws_segno may still name the previous one */
+ *tli_p = sendTimeLineNextTLI;
}
- /*
- * After reading into the buffer, check that what we read was valid. We do
- * this after reading, because even though the segment was present when we
- * opened it, it might get recycled or removed while we read it. The
- * read() succeeds in that case, but the data we tried to read might
- * already have been overwritten with new WAL records.
- */
- XLByteToSeg(startptr, segno, segcxt->ws_segsize);
- CheckXLogRemoved(segno, ThisTimeLineID);
+ XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
+ fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+ if (fd >= 0)
+ return fd;
/*
- * During recovery, the currently-open WAL file might be replaced with the
- * file of the same name retrieved from archive. So we always need to
- * check what we read was valid after reading into the buffer. If it's
- * invalid, we try to open and read the file again.
+ * If the file is not found, assume it's because the standby asked for a
+ * too old WAL segment that has already been removed or recycled.
*/
- if (am_cascading_walsender)
- {
- WalSnd *walsnd = MyWalSnd;
- bool reload;
-
- SpinLockAcquire(&walsnd->mutex);
- reload = walsnd->needreload;
- walsnd->needreload = false;
- SpinLockRelease(&walsnd->mutex);
-
- if (reload && sendSeg->ws_file >= 0)
- {
- close(sendSeg->ws_file);
- sendSeg->ws_file = -1;
-
- goto retry;
- }
- }
+ if (errno == ENOENT)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("requested WAL segment %s has already been removed",
+ XLogFileNameP(*tli_p, nextSegNo))));
+ else
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m",
+ path)));
+ return -1; /* keep compiler quiet */
}
/*
@@ -2562,6 +2464,8 @@ XLogSendPhysical(void)
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes;
+ XLogSegNo segno;
+ WALReadError errinfo;
/* If requested switch the WAL sender to the stopping state. */
if (got_STOPPING)
@@ -2777,7 +2681,49 @@ XLogSendPhysical(void)
* calls.
*/
enlargeStringInfo(&output_message, nbytes);
- XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
+
+retry:
+ if (!WALRead(&output_message.data[output_message.len],
+ startptr,
+ nbytes,
+ sendSeg->ws_tli, /* Pass the current TLI because only
+ * WalSndSegmentOpen controls whether new
+ * TLI is needed. */
+ sendSeg,
+ sendCxt,
+ WalSndSegmentOpen,
+ &errinfo))
+ WALReadRaiseError(&errinfo);
+
+ /* See logical_read_xlog_page(). */
+ XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
+ CheckXLogRemoved(segno, sendSeg->ws_tli);
+
+ /*
+ * During recovery, the currently-open WAL file might be replaced with the
+ * file of the same name retrieved from archive. So we always need to
+ * check what we read was valid after reading into the buffer. If it's
+ * invalid, we try to open and read the file again.
+ */
+ if (am_cascading_walsender)
+ {
+ WalSnd *walsnd = MyWalSnd;
+ bool reload;
+
+ SpinLockAcquire(&walsnd->mutex);
+ reload = walsnd->needreload;
+ walsnd->needreload = false;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (reload && sendSeg->ws_file >= 0)
+ {
+ close(sendSeg->ws_file);
+ sendSeg->ws_file = -1;
+
+ goto retry;
+ }
+ }
+
output_message.len += nbytes;
output_message.data[output_message.len] = '\0';