aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/launcher.c2
-rw-r--r--src/backend/replication/logical/worker.c24
-rw-r--r--src/backend/replication/walsender.c10
3 files changed, 34 insertions, 2 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 10677da56b2..1c3c051403d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1016,7 +1016,7 @@ logicalrep_launcher_attach_dshmem(void)
last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
dsa_pin_mapping(last_start_times_dsa);
last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
- LogicalRepCtx->last_start_dsh, 0);
+ LogicalRepCtx->last_start_dsh, NULL);
}
MemoryContextSwitchTo(oldcontext);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4151a4b2a96..a23262957ac 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4626,8 +4626,16 @@ run_apply_worker()
walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
StartTransactionCommand();
+
+ /*
+ * Updating pg_subscription might involve TOAST table access, so
+ * ensure we have a valid snapshot.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+ PopActiveSnapshot();
CommitTransactionCommand();
}
else
@@ -4843,7 +4851,15 @@ DisableSubscriptionAndExit(void)
/* Disable the subscription */
StartTransactionCommand();
+
+ /*
+ * Updating pg_subscription might involve TOAST table access, so ensure we
+ * have a valid snapshot.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
DisableSubscription(MySubscription->oid);
+ PopActiveSnapshot();
CommitTransactionCommand();
/* Ensure we remove no-longer-useful entry for worker's start time */
@@ -4948,6 +4964,12 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
}
/*
+ * Updating pg_subscription might involve TOAST table access, so ensure we
+ * have a valid snapshot.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ /*
* Protect subskiplsn of pg_subscription from being concurrently updated
* while clearing it.
*/
@@ -5005,6 +5027,8 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
heap_freetuple(tup);
table_close(rel, NoLock);
+ PopActiveSnapshot();
+
if (started_tx)
CommitTransactionCommand();
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103..f2c33250e8b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3449,8 +3449,16 @@ XLogSendLogical(void)
if (flushPtr == InvalidXLogRecPtr ||
logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
{
+ /*
+ * For cascading logical WAL senders, we use the replay LSN instead of
+ * the flush LSN, since logical decoding on a standby only processes
+ * WAL that has been replayed. This distinction becomes particularly
+ * important during shutdown, as new WAL is no longer replayed and the
+ * last replayed LSN marks the furthest point up to which decoding can
+ * proceed.
+ */
if (am_cascading_walsender)
- flushPtr = GetStandbyFlushRecPtr(NULL);
+ flushPtr = GetXLogReplayRecPtr(NULL);
else
flushPtr = GetFlushRecPtr(NULL);
}