diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/access/transam/twophase.c | 5 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 17 | ||||
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 73 | ||||
-rw-r--r-- | src/backend/access/transam/xlogutils.c | 30 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 2 | ||||
-rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 4 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 102 |
7 files changed, 125 insertions, 108 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 477709bbc23..546bd43ce8b 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1377,7 +1377,6 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed) * * Note clearly that this function can access WAL during normal operation, * similarly to the way WALSender or Logical Decoding would do. - * */ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) @@ -1386,8 +1385,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogReaderState *xlogreader; char *errormsg; - xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page, - NULL); + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + &read_local_xlog_page, NULL); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 501f46fd52d..6c69eb6dd76 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -885,8 +885,7 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, int source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source); static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf, - TimeLineID *readTLI); + int reqLen, XLogRecPtr targetRecPtr, char *readBuf); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, XLogRecPtr tliRecPtr); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); @@ -1195,7 +1194,8 @@ XLogInsertRecord(XLogRecData *rdata, appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); if (!debug_reader) - debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL); + debug_reader = XLogReaderAllocate(wal_segment_size, NULL, + NULL, NULL); if (!debug_reader) { @@ -4296,7 +4296,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, XLByteToSeg(xlogreader->latestPagePtr, segno, wal_segment_size); offset = XLogSegmentOffset(xlogreader->latestPagePtr, wal_segment_size); - XLogFileName(fname, xlogreader->readPageTLI, segno, + XLogFileName(fname, xlogreader->seg.ws_tli, segno, wal_segment_size); ereport(emode_for_corrupt_record(emode, RecPtr ? RecPtr : EndRecPtr), @@ -6353,7 +6353,8 @@ StartupXLOG(void) /* Set up XLOG reader facility */ MemSet(&private, 0, sizeof(XLogPageReadPrivate)); - xlogreader = XLogReaderAllocate(wal_segment_size, &XLogPageRead, &private); + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + &XLogPageRead, &private); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -7355,7 +7356,7 @@ StartupXLOG(void) * and we were reading the old WAL from a segment belonging to a higher * timeline. */ - EndOfLogTLI = xlogreader->readPageTLI; + EndOfLogTLI = xlogreader->seg.ws_tli; /* * Complain if we did not roll forward far enough to render the backup @@ -11523,7 +11524,7 @@ CancelBackup(void) */ static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI) + XLogRecPtr targetRecPtr, char *readBuf) { XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; @@ -11640,7 +11641,7 @@ retry: Assert(targetPageOff == readOff); Assert(reqLen <= readLen); - *readTLI = curFileTLI; + xlogreader->seg.ws_tli = curFileTLI; /* * Check the page header immediately, so that we can retry immediately if diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index a66e3324b11..27c27303d6c 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -68,8 +68,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) * Returns NULL if the xlogreader couldn't be allocated. */ XLogReaderState * -XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc, - void *private_data) +XLogReaderAllocate(int wal_segment_size, const char *waldir, + XLogPageReadCB pagereadfunc, void *private_data) { XLogReaderState *state; @@ -96,7 +96,10 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc, return NULL; } - state->wal_segment_size = wal_segment_size; + /* Initialize segment info. */ + WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, + waldir); + state->read_page = pagereadfunc; /* system_identifier initialized to zeroes above */ state->private_data = private_data; @@ -199,6 +202,23 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength) } /* + * Initialize the passed segment structs. + */ +void +WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, + int segsize, const char *waldir) +{ + seg->ws_file = -1; + seg->ws_segno = 0; + seg->ws_off = 0; + seg->ws_tli = 0; + + segcxt->ws_segsize = segsize; + if (waldir) + snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir); +} + +/* * Attempt to read an XLOG record. * * If RecPtr is valid, try to read a record at that position. Otherwise @@ -490,8 +510,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ - state->EndRecPtr += state->wal_segment_size - 1; - state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size); + state->EndRecPtr += state->segcxt.ws_segsize - 1; + state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize); } if (DecodeXLogRecord(state, record, errormsg)) @@ -533,12 +553,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) Assert((pageptr % XLOG_BLCKSZ) == 0); - XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size); - targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size); + XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize); + targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize); /* check whether we have all the requested data already */ - if (targetSegNo == state->readSegNo && targetPageOff == state->readOff && - reqLen <= state->readLen) + if (targetSegNo == state->seg.ws_segno && + targetPageOff == state->seg.ws_off && reqLen <= state->readLen) return state->readLen; /* @@ -553,13 +573,13 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * record is. This is so that we can check the additional identification * info that is present in the first page's "long" header. */ - if (targetSegNo != state->readSegNo && targetPageOff != 0) + if (targetSegNo != state->seg.ws_segno && targetPageOff != 0) { XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ, state->currRecPtr, - state->readBuf, &state->readPageTLI); + state->readBuf); if (readLen < 0) goto err; @@ -577,7 +597,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) */ readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), state->currRecPtr, - state->readBuf, &state->readPageTLI); + state->readBuf); if (readLen < 0) goto err; @@ -596,7 +616,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) { readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr), state->currRecPtr, - state->readBuf, &state->readPageTLI); + state->readBuf); if (readLen < 0) goto err; } @@ -608,8 +628,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) goto err; /* update read state information */ - state->readSegNo = targetSegNo; - state->readOff = targetPageOff; + state->seg.ws_segno = targetSegNo; + state->seg.ws_off = targetPageOff; state->readLen = readLen; return readLen; @@ -625,8 +645,8 @@ err: static void XLogReaderInvalReadState(XLogReaderState *state) { - state->readSegNo = 0; - state->readOff = 0; + state->seg.ws_segno = 0; + state->seg.ws_off = 0; state->readLen = 0; } @@ -745,16 +765,16 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, Assert((recptr % XLOG_BLCKSZ) == 0); - XLByteToSeg(recptr, segno, state->wal_segment_size); - offset = XLogSegmentOffset(recptr, state->wal_segment_size); + XLByteToSeg(recptr, segno, state->segcxt.ws_segsize); + offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize); - XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr); + XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr); if (hdr->xlp_magic != XLOG_PAGE_MAGIC) { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, "invalid magic number %04X in log segment %s, offset %u", @@ -768,7 +788,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, "invalid info bits %04X in log segment %s, offset %u", @@ -791,7 +811,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, (unsigned long long) state->system_identifier); return false; } - else if (longhdr->xlp_seg_size != state->wal_segment_size) + else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize) { report_invalid_record(state, "WAL file is from different database system: incorrect segment size in page header"); @@ -808,7 +828,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); /* hmm, first page of file doesn't have a long header? */ report_invalid_record(state, @@ -828,7 +848,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, "unexpected pageaddr %X/%X in log segment %s, offset %u", @@ -853,7 +873,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u", @@ -997,7 +1017,6 @@ out: #endif /* FRONTEND */ - /* ---------------------------------------- * Functions for decoding the data and block references in a record. * ---------------------------------------- diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 1fc39333f15..5f1e5ba75d5 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -802,8 +802,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) { - const XLogRecPtr lastReadPage = state->readSegNo * - state->wal_segment_size + state->readOff; + const XLogRecPtr lastReadPage = state->seg.ws_segno * + state->segcxt.ws_segsize + state->seg.ws_off; Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); @@ -847,8 +847,8 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa if (state->currTLIValidUntil != InvalidXLogRecPtr && state->currTLI != ThisTimeLineID && state->currTLI != 0 && - ((wantPage + wantLength) / state->wal_segment_size) < - (state->currTLIValidUntil / state->wal_segment_size)) + ((wantPage + wantLength) / state->segcxt.ws_segsize) < + (state->currTLIValidUntil / state->segcxt.ws_segsize)) return; /* @@ -869,12 +869,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * by a promotion or replay from a cascaded replica. */ List *timelineHistory = readTimeLineHistory(ThisTimeLineID); + XLogRecPtr endOfSegment; - XLogRecPtr endOfSegment = (((wantPage / state->wal_segment_size) + 1) - * state->wal_segment_size) - 1; - - Assert(wantPage / state->wal_segment_size == - endOfSegment / state->wal_segment_size); + endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) * + state->segcxt.ws_segsize - 1; + Assert(wantPage / state->segcxt.ws_segsize == + endOfSegment / state->segcxt.ws_segsize); /* * Find the timeline of the last LSN on the segment containing @@ -909,8 +909,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa */ int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *cur_page, - TimeLineID *pageTLI) + int reqLen, XLogRecPtr targetRecPtr, char *cur_page) { XLogRecPtr read_upto, loc; @@ -933,8 +932,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, read_upto = GetFlushRecPtr(); else read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); - - *pageTLI = ThisTimeLineID; + state->seg.ws_tli = ThisTimeLineID; /* * Check which timeline to get the record from. @@ -984,14 +982,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, read_upto = state->currTLIValidUntil; /* - * Setting pageTLI to our wanted record's TLI is slightly wrong; + * Setting ws_tli to our wanted record's TLI is slightly wrong; * the page might begin on an older timeline if it contains a * timeline switch, since its xlog segment will have been copied * from the prior timeline. This is pretty harmless though, as * nothing cares so long as the timeline doesn't go backwards. We * should read the page header instead; FIXME someday. */ - *pageTLI = state->currTLI; + state->seg.ws_tli = state->currTLI; /* No need to wait on a historical timeline */ break; @@ -1022,7 +1020,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. */ - XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr, + XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr, XLOG_BLCKSZ); /* number of valid bytes in the buffer */ diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index f8b9020081e..da265f52940 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -173,7 +173,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx); + ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index d974400d6ef..d1cf80d4417 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -116,10 +116,10 @@ check_permissions(void) int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) + int reqLen, XLogRecPtr targetRecPtr, char *cur_page) { return read_local_xlog_page(state, targetPagePtr, reqLen, - targetRecPtr, cur_page, pageTLI); + targetRecPtr, cur_page); } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 23870a25a56..eb4a98cc912 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -128,16 +128,8 @@ bool log_replication_commands = false; */ bool wake_wal_senders = false; -/* - * These variables are used similarly to openLogFile/SegNo/Off, - * but for walsender to read the XLOG. - */ -static int sendFile = -1; -static XLogSegNo sendSegNo = 0; -static uint32 sendOff = 0; - -/* Timeline ID of the currently open file */ -static TimeLineID curFileTimeLine = 0; +static WALOpenSegment *sendSeg = NULL; +static WALSegmentContext *sendCxt = NULL; /* * These variables keep track of the state of the timeline we're currently @@ -256,7 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); -static void XLogRead(char *buf, XLogRecPtr startptr, Size count); +static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count); /* Initialize walsender process before entering the main command loop */ @@ -285,6 +277,13 @@ InitWalSender(void) /* Initialize empty timestamp buffer for lag tracking. */ lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); + + /* Make sure we can remember the current read position in XLOG. */ + sendSeg = (WALOpenSegment *) + MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment)); + sendCxt = (WALSegmentContext *) + MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext)); + WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL); } /* @@ -301,10 +300,10 @@ WalSndErrorCleanup(void) ConditionVariableCancelSleep(); pgstat_report_wait_end(); - if (sendFile >= 0) + if (sendSeg->ws_file >= 0) { - close(sendFile); - sendFile = -1; + close(sendSeg->ws_file); + sendSeg->ws_file = -1; } if (MyReplicationSlot != NULL) @@ -763,7 +762,7 @@ StartReplication(StartReplicationCmd *cmd) */ static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) + XLogRecPtr targetRecPtr, char *cur_page) { XLogRecPtr flushptr; int count; @@ -787,7 +786,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ - XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ); + XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ); return count; } @@ -2364,7 +2363,7 @@ WalSndKill(int code, Datum arg) * more than one. */ static void -XLogRead(char *buf, XLogRecPtr startptr, Size count) +XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count) { char *p; XLogRecPtr recptr; @@ -2382,17 +2381,18 @@ retry: int segbytes; int readbytes; - startoff = XLogSegmentOffset(recptr, wal_segment_size); + startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize); - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size)) + if (sendSeg->ws_file < 0 || + !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize)) { char path[MAXPGPATH]; /* Switch to another logfile segment */ - if (sendFile >= 0) - close(sendFile); + if (sendSeg->ws_file >= 0) + close(sendSeg->ws_file); - XLByteToSeg(recptr, sendSegNo, wal_segment_size); + XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize); /*------- * When reading from a historic timeline, and there is a timeline @@ -2420,20 +2420,20 @@ retry: * used portion of the old segment is copied to the new file. *------- */ - curFileTimeLine = sendTimeLine; + sendSeg->ws_tli = sendTimeLine; if (sendTimeLineIsHistoric) { XLogSegNo endSegNo; - XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size); - if (sendSegNo == endSegNo) - curFileTimeLine = sendTimeLineNextTLI; + XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize); + if (sendSeg->ws_segno == endSegNo) + sendSeg->ws_tli = sendTimeLineNextTLI; } - XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size); + XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize); - sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (sendFile < 0) + sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (sendSeg->ws_file < 0) { /* * If the file is not found, assume it's because the standby @@ -2444,58 +2444,58 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(curFileTimeLine, sendSegNo)))); + XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno)))); else ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", path))); } - sendOff = 0; + sendSeg->ws_off = 0; } /* Need to seek in the file? */ - if (sendOff != startoff) + if (sendSeg->ws_off != startoff) { - if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) + if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(curFileTimeLine, sendSegNo), + XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), startoff))); - sendOff = startoff; + sendSeg->ws_off = startoff; } /* How many bytes are within this segment? */ - if (nbytes > (wal_segment_size - startoff)) - segbytes = wal_segment_size - startoff; + if (nbytes > (segcxt->ws_segsize - startoff)) + segbytes = segcxt->ws_segsize - startoff; else segbytes = nbytes; pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - readbytes = read(sendFile, p, segbytes); + readbytes = read(sendSeg->ws_file, p, segbytes); pgstat_report_wait_end(); if (readbytes < 0) { ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u, length %zu: %m", - XLogFileNameP(curFileTimeLine, sendSegNo), - sendOff, (Size) segbytes))); + XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), + sendSeg->ws_off, (Size) segbytes))); } else if (readbytes == 0) { ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("could not read from log segment %s, offset %u: read %d of %zu", - XLogFileNameP(curFileTimeLine, sendSegNo), - sendOff, readbytes, (Size) segbytes))); + XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), + sendSeg->ws_off, readbytes, (Size) segbytes))); } /* Update state for read */ recptr += readbytes; - sendOff += readbytes; + sendSeg->ws_off += readbytes; nbytes -= readbytes; p += readbytes; } @@ -2507,7 +2507,7 @@ retry: * read() succeeds in that case, but the data we tried to read might * already have been overwritten with new WAL records. */ - XLByteToSeg(startptr, segno, wal_segment_size); + XLByteToSeg(startptr, segno, segcxt->ws_segsize); CheckXLogRemoved(segno, ThisTimeLineID); /* @@ -2526,10 +2526,10 @@ retry: walsnd->needreload = false; SpinLockRelease(&walsnd->mutex); - if (reload && sendFile >= 0) + if (reload && sendSeg->ws_file >= 0) { - close(sendFile); - sendFile = -1; + close(sendSeg->ws_file); + sendSeg->ws_file = -1; goto retry; } @@ -2695,9 +2695,9 @@ XLogSendPhysical(void) if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) { /* close the current file. */ - if (sendFile >= 0) - close(sendFile); - sendFile = -1; + if (sendSeg->ws_file >= 0) + close(sendSeg->ws_file); + sendSeg->ws_file = -1; /* Send CopyDone */ pq_putmessage_noblock('c', NULL, 0); @@ -2768,7 +2768,7 @@ XLogSendPhysical(void) * calls. */ enlargeStringInfo(&output_message, nbytes); - XLogRead(&output_message.data[output_message.len], startptr, nbytes); + XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes); output_message.len += nbytes; output_message.data[output_message.len] = '\0'; |