aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
authorAndres Freund <andres@anarazel.de>2023-04-08 02:20:01 -0700
committerAndres Freund <andres@anarazel.de>2023-04-08 02:20:05 -0700
commit0fdab27ad68a059a1663fa5ce48d76333f1bd74c (patch)
tree3d29650901130428712f71cbb8b57cb19f738f1a /src/backend/replication/walsender.c
parente101dfac3a53c20bfbf1ca85d30a368c2954facf (diff)
downloadpostgresql-0fdab27ad68a059a1663fa5ce48d76333f1bd74c.tar.gz
postgresql-0fdab27ad68a059a1663fa5ce48d76333f1bd74c.zip
Allow logical decoding on standbys
Unsurprisingly, this requires wal_level = logical to be set on the primary and standby. The infrastructure added in 26669757b6a ensures that slots are invalidated if the primary's wal_level is lowered. Creating a slot on a standby waits for an xl_running_xact record to be processed. If the primary is idle (and thus not emitting xl_running_xact records), that can take a while. To make that faster, this commit also introduces the pg_log_standby_snapshot() function. By executing it on the primary, completion of slot creation on the standby can be accelerated. Note that logical decoding on a standby does not itself enforce that required catalog rows are not removed. The user has to use physical replication slots + hot_standby_feedback or other measures to prevent that. If catalog rows required for a slot are removed, the slot is invalidated. See 6af1793954e for an overall design of logical decoding on a standby. Bumps catversion, for the addition of the pg_log_standby_snapshot() function. Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Author: Andres Freund <andres@anarazel.de> (in an older version) Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version) Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-By: Robert Haas <robertmhaas@gmail.com>
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c48
1 files changed, 32 insertions, 16 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5423cf0a171..45b8b3684f6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
int count;
WALReadError errinfo;
XLogSegNo segno;
- TimeLineID currTLI = GetWALInsertionTimeLine();
+ TimeLineID currTLI;
+
+ /*
+ * Make sure we have enough WAL available before retrieving the current
+ * timeline. This is needed to determine am_cascading_walsender accurately
+ * which is needed to determine the current timeline.
+ */
+ flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
/*
- * Since logical decoding is only permitted on a primary server, we know
- * that the current timeline ID can't be changing any more. If we did this
- * on a standby, we'd have to worry about the values we compute here
- * becoming invalid due to a promotion or timeline change.
+ * Since logical decoding is also permitted on a standby server, we need
+ * to check if the server is in recovery to decide how to get the current
+ * timeline ID (so that it also covers the promotion or timeline change
+ * cases).
*/
+ am_cascading_walsender = RecoveryInProgress();
+
+ if (am_cascading_walsender)
+ GetXLogReplayRecPtr(&currTLI);
+ else
+ currTLI = GetWALInsertionTimeLine();
+
XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
sendTimeLineIsHistoric = (state->currTLI != currTLI);
sendTimeLine = state->currTLI;
sendTimeLineValidUpto = state->currTLIValidUntil;
sendTimeLineNextTLI = state->nextTLI;
- /* make sure we have enough WAL available */
- flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
-
/* fail if not (implies we are going to shut down) */
if (flushptr < targetPagePtr + reqLen)
return -1;
@@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
cur_page,
targetPagePtr,
XLOG_BLCKSZ,
- state->seg.ws_tli, /* Pass the current TLI because only
- * WalSndSegmentOpen controls whether new
- * TLI is needed. */
+ currTLI, /* Pass the current TLI because only
+ * WalSndSegmentOpen controls whether new TLI
+ * is needed. */
&errinfo))
WALReadRaiseError(&errinfo);
@@ -3076,10 +3087,14 @@ XLogSendLogical(void)
* If first time through in this session, initialize flushPtr. Otherwise,
* we only need to update flushPtr if EndRecPtr is past it.
*/
- if (flushPtr == InvalidXLogRecPtr)
- flushPtr = GetFlushRecPtr(NULL);
- else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
- flushPtr = GetFlushRecPtr(NULL);
+ if (flushPtr == InvalidXLogRecPtr ||
+ logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+ {
+ if (am_cascading_walsender)
+ flushPtr = GetStandbyFlushRecPtr(NULL);
+ else
+ flushPtr = GetFlushRecPtr(NULL);
+ }
/* If EndRecPtr is still past our flushPtr, it means we caught up. */
if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
- *tli = replayTLI;
+ if (tli)
+ *tli = replayTLI;
result = replayPtr;
if (receiveTLI == replayTLI && receivePtr > replayPtr)