aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c48
1 files changed, 32 insertions, 16 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5423cf0a171..45b8b3684f6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
int count;
WALReadError errinfo;
XLogSegNo segno;
- TimeLineID currTLI = GetWALInsertionTimeLine();
+ TimeLineID currTLI;
+
+ /*
+ * Make sure we have enough WAL available before retrieving the current
+ * timeline. This is needed to determine am_cascading_walsender accurately
+ * which is needed to determine the current timeline.
+ */
+ flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
/*
- * Since logical decoding is only permitted on a primary server, we know
- * that the current timeline ID can't be changing any more. If we did this
- * on a standby, we'd have to worry about the values we compute here
- * becoming invalid due to a promotion or timeline change.
+ * Since logical decoding is also permitted on a standby server, we need
+ * to check if the server is in recovery to decide how to get the current
+ * timeline ID (so that it also cover the promotion or timeline change
+ * cases).
*/
+ am_cascading_walsender = RecoveryInProgress();
+
+ if (am_cascading_walsender)
+ GetXLogReplayRecPtr(&currTLI);
+ else
+ currTLI = GetWALInsertionTimeLine();
+
XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
sendTimeLineIsHistoric = (state->currTLI != currTLI);
sendTimeLine = state->currTLI;
sendTimeLineValidUpto = state->currTLIValidUntil;
sendTimeLineNextTLI = state->nextTLI;
- /* make sure we have enough WAL available */
- flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
-
/* fail if not (implies we are going to shut down) */
if (flushptr < targetPagePtr + reqLen)
return -1;
@@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
cur_page,
targetPagePtr,
XLOG_BLCKSZ,
- state->seg.ws_tli, /* Pass the current TLI because only
- * WalSndSegmentOpen controls whether new
- * TLI is needed. */
+ currTLI, /* Pass the current TLI because only
+ * WalSndSegmentOpen controls whether new TLI
+ * is needed. */
&errinfo))
WALReadRaiseError(&errinfo);
@@ -3076,10 +3087,14 @@ XLogSendLogical(void)
* If first time through in this session, initialize flushPtr. Otherwise,
* we only need to update flushPtr if EndRecPtr is past it.
*/
- if (flushPtr == InvalidXLogRecPtr)
- flushPtr = GetFlushRecPtr(NULL);
- else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
- flushPtr = GetFlushRecPtr(NULL);
+ if (flushPtr == InvalidXLogRecPtr ||
+ logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+ {
+ if (am_cascading_walsender)
+ flushPtr = GetStandbyFlushRecPtr(NULL);
+ else
+ flushPtr = GetFlushRecPtr(NULL);
+ }
/* If EndRecPtr is still past our flushPtr, it means we caught up. */
if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
- *tli = replayTLI;
+ if (tli)
+ *tli = replayTLI;
result = replayPtr;
if (receiveTLI == replayTLI && receivePtr > replayPtr)