diff options
author | Andres Freund <andres@anarazel.de> | 2023-04-08 02:20:01 -0700 |
---|---|---|
committer | Andres Freund <andres@anarazel.de> | 2023-04-08 02:20:05 -0700 |
commit | 0fdab27ad68a059a1663fa5ce48d76333f1bd74c (patch) | |
tree | 3d29650901130428712f71cbb8b57cb19f738f1a /src/backend/replication/walsender.c | |
parent | e101dfac3a53c20bfbf1ca85d30a368c2954facf (diff) | |
download | postgresql-0fdab27ad68a059a1663fa5ce48d76333f1bd74c.tar.gz postgresql-0fdab27ad68a059a1663fa5ce48d76333f1bd74c.zip |
Allow logical decoding on standbys
Unsurprisingly, this requires wal_level = logical to be set on the primary and
standby. The infrastructure added in 26669757b6a ensures that slots are
invalidated if the primary's wal_level is lowered.
Creating a slot on a standby waits for a xl_running_xact record to be
processed. If the primary is idle (and thus not emitting xl_running_xact
records), that can take a while. To make that faster, this commit also
introduces the pg_log_standby_snapshot() function. By executing it on the
primary, completion of slot creation on the standby can be accelerated.
Note that logical decoding on a standby does not itself enforce that required
catalog rows are not removed. The user has to use physical replication slots +
hot_standby_feedback or other measures to prevent that. If catalog rows
required for a slot are removed, the slot is invalidated.
See 6af1793954e for an overall design of logical decoding on a standby.
Bumps catversion, for the addition of the pg_log_standby_snapshot() function.
Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Author: Andres Freund <andres@anarazel.de> (in an older version)
Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version)
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: FabrÃŒzio de Royes Mello <fabriziomello@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-By: Robert Haas <robertmhaas@gmail.com>
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 48 |
1 files changed, 32 insertions, 16 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 5423cf0a171..45b8b3684f6 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req int count; WALReadError errinfo; XLogSegNo segno; - TimeLineID currTLI = GetWALInsertionTimeLine(); + TimeLineID currTLI; + + /* + * Make sure we have enough WAL available before retrieving the current + * timeline. This is needed to determine am_cascading_walsender accurately + * which is needed to determine the current timeline. + */ + flushptr = WalSndWaitForWal(targetPagePtr + reqLen); /* - * Since logical decoding is only permitted on a primary server, we know - * that the current timeline ID can't be changing any more. If we did this - * on a standby, we'd have to worry about the values we compute here - * becoming invalid due to a promotion or timeline change. + * Since logical decoding is also permitted on a standby server, we need + * to check if the server is in recovery to decide how to get the current + * timeline ID (so that it also cover the promotion or timeline change + * cases). */ + am_cascading_walsender = RecoveryInProgress(); + + if (am_cascading_walsender) + GetXLogReplayRecPtr(&currTLI); + else + currTLI = GetWALInsertionTimeLine(); + XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI); sendTimeLineIsHistoric = (state->currTLI != currTLI); sendTimeLine = state->currTLI; sendTimeLineValidUpto = state->currTLIValidUntil; sendTimeLineNextTLI = state->nextTLI; - /* make sure we have enough WAL available */ - flushptr = WalSndWaitForWal(targetPagePtr + reqLen); - /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) return -1; @@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req cur_page, targetPagePtr, XLOG_BLCKSZ, - state->seg.ws_tli, /* Pass the current TLI because only - * WalSndSegmentOpen controls whether new - * TLI is needed. */ + currTLI, /* Pass the current TLI because only + * WalSndSegmentOpen controls whether new TLI + * is needed. */ &errinfo)) WALReadRaiseError(&errinfo); @@ -3076,10 +3087,14 @@ XLogSendLogical(void) * If first time through in this session, initialize flushPtr. Otherwise, * we only need to update flushPtr if EndRecPtr is past it. */ - if (flushPtr == InvalidXLogRecPtr) - flushPtr = GetFlushRecPtr(NULL); - else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) - flushPtr = GetFlushRecPtr(NULL); + if (flushPtr == InvalidXLogRecPtr || + logical_decoding_ctx->reader->EndRecPtr >= flushPtr) + { + if (am_cascading_walsender) + flushPtr = GetStandbyFlushRecPtr(NULL); + else + flushPtr = GetFlushRecPtr(NULL); + } /* If EndRecPtr is still past our flushPtr, it means we caught up. */ if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) @@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli) receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI); - *tli = replayTLI; + if (tli) + *tli = replayTLI; result = replayPtr; if (receiveTLI == replayTLI && receivePtr > replayPtr) |