diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 48 |
1 files changed, 32 insertions, 16 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 5423cf0a171..45b8b3684f6 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req int count; WALReadError errinfo; XLogSegNo segno; - TimeLineID currTLI = GetWALInsertionTimeLine(); + TimeLineID currTLI; + + /* + * Make sure we have enough WAL available before retrieving the current + * timeline. This is needed to determine am_cascading_walsender accurately + * which is needed to determine the current timeline. + */ + flushptr = WalSndWaitForWal(targetPagePtr + reqLen); /* - * Since logical decoding is only permitted on a primary server, we know - * that the current timeline ID can't be changing any more. If we did this - * on a standby, we'd have to worry about the values we compute here - * becoming invalid due to a promotion or timeline change. + * Since logical decoding is also permitted on a standby server, we need + * to check if the server is in recovery to decide how to get the current + * timeline ID (so that it also cover the promotion or timeline change + * cases). */ + am_cascading_walsender = RecoveryInProgress(); + + if (am_cascading_walsender) + GetXLogReplayRecPtr(&currTLI); + else + currTLI = GetWALInsertionTimeLine(); + XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI); sendTimeLineIsHistoric = (state->currTLI != currTLI); sendTimeLine = state->currTLI; sendTimeLineValidUpto = state->currTLIValidUntil; sendTimeLineNextTLI = state->nextTLI; - /* make sure we have enough WAL available */ - flushptr = WalSndWaitForWal(targetPagePtr + reqLen); - /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) return -1; @@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req cur_page, targetPagePtr, XLOG_BLCKSZ, - state->seg.ws_tli, /* Pass the current TLI because only - * WalSndSegmentOpen controls whether new - * TLI is needed. */ + currTLI, /* Pass the current TLI because only + * WalSndSegmentOpen controls whether new TLI + * is needed. */ &errinfo)) WALReadRaiseError(&errinfo); @@ -3076,10 +3087,14 @@ XLogSendLogical(void) * If first time through in this session, initialize flushPtr. Otherwise, * we only need to update flushPtr if EndRecPtr is past it. */ - if (flushPtr == InvalidXLogRecPtr) - flushPtr = GetFlushRecPtr(NULL); - else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) - flushPtr = GetFlushRecPtr(NULL); + if (flushPtr == InvalidXLogRecPtr || + logical_decoding_ctx->reader->EndRecPtr >= flushPtr) + { + if (am_cascading_walsender) + flushPtr = GetStandbyFlushRecPtr(NULL); + else + flushPtr = GetFlushRecPtr(NULL); + } /* If EndRecPtr is still past our flushPtr, it means we caught up. */ if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) @@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli) receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI); - *tli = replayTLI; + if (tli) + *tli = replayTLI; result = replayPtr; if (receiveTLI == replayTLI && receivePtr > replayPtr) |