From 24c5f1a103ce6656a5cb430d9a996c34e61ab2a5 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Wed, 30 Mar 2016 20:07:05 -0300 Subject: Enable logical slots to follow timeline switches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When decoding from a logical slot, it's necessary for xlog reading to be able to read xlog from historical (i.e. not current) timelines; otherwise, decoding fails after failover, because the archives are in the historical timeline. This is required to make "failover logical slots" possible; it currently has no other use, although theoretically it could be used by an extension that creates a slot on a standby and continues to replay from the slot when the standby is promoted. This commit includes a module in src/test/modules with functions to manipulate the slots (which is not otherwise possible in SQL code) in order to enable testing, and a new test in src/test/recovery to ensure that the behavior is as expected. Author: Craig Ringer Reviewed-By: Oleksii Kliukin, Andres Freund, Petr JelĂ­nek --- src/backend/access/transam/xlogutils.c | 244 +++++++++++++++++++++++++++++---- 1 file changed, 221 insertions(+), 23 deletions(-) (limited to 'src/backend/access/transam/xlogutils.c') diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 2635d80dc0c..f6ca2b95e51 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -19,6 +19,7 @@ #include +#include "access/timeline.h" #include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" @@ -659,6 +660,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) /* state maintained across calls */ static int sendFile = -1; static XLogSegNo sendSegNo = 0; + static TimeLineID sendTLI = 0; static uint32 sendOff = 0; p = buf; @@ -674,7 +676,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) startoff = recptr % XLogSegSize; /* Do we need to switch to a different xlog segment? */ - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo)) + if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || + sendTLI != tli) { char path[MAXPGPATH]; @@ -701,6 +704,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) path))); } sendOff = 0; + sendTLI = tli; } /* Need to seek in the file? */ @@ -748,6 +752,147 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) } } +/* + * Determine XLogReaderState->currTLI and ->currTLIValidUntil; + * XLogReaderState->EndRecPtr, ->currRecPtr and ThisTimeLineID affect the + * decision. This may later be used to determine which xlog segment file to + * open, etc. + * + * We switch to an xlog segment from the new timeline eagerly when on a + * historical timeline, as soon as we reach the start of the xlog segment + * containing the timeline switch. The server copied the segment to the new + * timeline so all the data up to the switch point is the same, but there's no + * guarantee the old segment will still exist. It may have been deleted or + * renamed with a .partial suffix so we can't necessarily keep reading from + * the old TLI even though tliSwitchPoint says it's OK. + * + * Because of this, callers MAY NOT assume that currTLI is the timeline that + * will be in a page's xlp_tli; the page may begin on an older timeline or we + * might be reading from historical timeline data on a segment that's been + * copied to a new timeline. + */ +static void +XLogReadDetermineTimeline(XLogReaderState *state) +{ + /* Read the history on first time through */ + if (state->timelineHistory == NIL) + state->timelineHistory = readTimeLineHistory(ThisTimeLineID); + + /* + * Are we reading the record immediately following the one we read last + * time? If not, then don't use the cached timeline info. + */ + if (state->currRecPtr != state->EndRecPtr) + { + state->currTLI = 0; + state->currTLIValidUntil = InvalidXLogRecPtr; + } + + /* + * Are we reading a timeline that used to be the latest one, but became + * historical? This can happen in a replica that gets promoted, and in a + * cascading replica whose upstream gets promoted. In either case, + * re-read the timeline history data. We cannot read past the timeline + * switch point, because either the records in the old timeline might be + * invalid, or worse, they may valid but *different* from the ones we + * should be reading. + */ + if (state->currTLIValidUntil == InvalidXLogRecPtr && + state->currTLI != ThisTimeLineID && + state->currTLI != 0) + { + /* re-read timeline history */ + list_free_deep(state->timelineHistory); + state->timelineHistory = readTimeLineHistory(ThisTimeLineID); + + elog(DEBUG2, "timeline %u became historical during decoding", + state->currTLI); + + /* then invalidate the cached timeline info */ + state->currTLI = 0; + state->currTLIValidUntil = InvalidXLogRecPtr; + } + + /* + * Are we reading a record immediately following a timeline switch? If + * so, we must follow the switch too. + */ + if (state->currRecPtr == state->EndRecPtr && + state->currTLI != 0 && + state->currTLIValidUntil != InvalidXLogRecPtr && + state->currRecPtr >= state->currTLIValidUntil) + { + elog(DEBUG2, + "requested record %X/%X is on segment containing end of timeline %u valid until %X/%X, switching to next timeline", + (uint32) (state->currRecPtr >> 32), + (uint32) state->currRecPtr, + state->currTLI, + (uint32) (state->currTLIValidUntil >> 32), + (uint32) (state->currTLIValidUntil)); + + /* invalidate TLI info so we look up the next TLI */ + state->currTLI = 0; + state->currTLIValidUntil = InvalidXLogRecPtr; + } + + if (state->currTLI == 0) + { + /* + * Something changed; work out what timeline this record is on. We + * might read it from the segment on this TLI or, if the segment is + * also contained by newer timelines, the copy from a newer TLI. + */ + state->currTLI = tliOfPointInHistory(state->currRecPtr, + state->timelineHistory); + + /* + * Look for the most recent timeline that's on the same xlog segment + * as this record, since that's the only one we can assume is still + * readable. + */ + while (state->currTLI != ThisTimeLineID && + state->currTLIValidUntil == InvalidXLogRecPtr) + { + XLogRecPtr tliSwitch; + TimeLineID nextTLI; + + CHECK_FOR_INTERRUPTS(); + + tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory, + &nextTLI); + + /* round ValidUntil down to start of seg containing the switch */ + state->currTLIValidUntil = + ((tliSwitch / XLogSegSize) * XLogSegSize); + + if (state->currRecPtr >= state->currTLIValidUntil) + { + /* + * The new currTLI ends on this WAL segment so check the next + * TLI to see if it's the last one on the segment. + * + * If that's the current TLI we'll stop searching. + */ + state->currTLI = nextTLI; + state->currTLIValidUntil = InvalidXLogRecPtr; + } + } + + /* + * We're now either reading from the first xlog segment in the current + * server's timeline or the most recent historical timeline that + * exists on the target segment. + */ + elog(DEBUG2, "XLog read ptr %X/%X is on segment with TLI %u valid until %X/%X, server current TLI is %u", + (uint32) (state->currRecPtr >> 32), + (uint32) state->currRecPtr, + state->currTLI, + (uint32) (state->currTLIValidUntil >> 32), + (uint32) (state->currTLIValidUntil), + ThisTimeLineID); + } +} + /* * read_page callback for reading local xlog files * @@ -761,48 +906,101 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) */ int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) + int reqLen, XLogRecPtr targetRecPtr, char *cur_page, + TimeLineID *pageTLI) { - XLogRecPtr flushptr, + XLogRecPtr read_upto, loc; int count; loc = targetPagePtr + reqLen; + + /* Make sure enough xlog is available... */ while (1) { /* - * TODO: we're going to have to do something more intelligent about - * timelines on standbys. Use readTimeLineHistory() and - * tliOfPointInHistory() to get the proper LSN? For now we'll catch - * that case earlier, but the code and TODO is left in here for when - * that changes. + * Check which timeline to get the record from. + * + * We have to do it each time through the loop because if we're in + * recovery as a cascading standby, the current timeline might've + * become historical. */ - if (!RecoveryInProgress()) + XLogReadDetermineTimeline(state); + + if (state->currTLI == ThisTimeLineID) { - *pageTLI = ThisTimeLineID; - flushptr = GetFlushRecPtr(); + /* + * We're reading from the current timeline so we might have to + * wait for the desired record to be generated (or, for a standby, + * received & replayed) + */ + if (!RecoveryInProgress()) + { + *pageTLI = ThisTimeLineID; + read_upto = GetFlushRecPtr(); + } + else + read_upto = GetXLogReplayRecPtr(pageTLI); + + if (loc <= read_upto) + break; + + CHECK_FOR_INTERRUPTS(); + pg_usleep(1000L); } else - flushptr = GetXLogReplayRecPtr(pageTLI); - - if (loc <= flushptr) + { + /* + * We're on a historical timeline, so limit reading to the switch + * point where we moved to the next timeline. + * + * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know + * about the new timeline, so we must've received past the end of + * it. + */ + read_upto = state->currTLIValidUntil; + + /* + * Setting pageTLI to our wanted record's TLI is slightly wrong; + * the page might begin on an older timeline if it contains a + * timeline switch, since its xlog segment will have been copied + * from the prior timeline. This is pretty harmless though, as + * nothing cares so long as the timeline doesn't go backwards. We + * should read the page header instead; FIXME someday. + */ + *pageTLI = state->currTLI; + + /* No need to wait on a historical timeline */ break; - - CHECK_FOR_INTERRUPTS(); - pg_usleep(1000L); + } } - /* more than one block available */ - if (targetPagePtr + XLOG_BLCKSZ <= flushptr) + if (targetPagePtr + XLOG_BLCKSZ <= read_upto) + { + /* + * more than one block available; read only that block, have caller + * come back if they need more. + */ count = XLOG_BLCKSZ; - /* not enough data there */ - else if (targetPagePtr + reqLen > flushptr) + } + else if (targetPagePtr + reqLen > read_upto) + { + /* not enough data there */ return -1; - /* part of the page available */ + } else - count = flushptr - targetPagePtr; + { + /* enough bytes available to satisfy the request */ + count = read_upto - targetPagePtr; + } + /* + * Even though we just determined how much of the page can be validly read + * as 'count', read the whole page anyway. It's guaranteed to be + * zero-padded up to the page boundary if it's incomplete. + */ XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ); + /* number of valid bytes in the buffer */ return count; } -- cgit v1.2.3