author     Andres Freund <andres@anarazel.de>    2023-04-08 00:24:24 -0700
committer  Andres Freund <andres@anarazel.de>    2023-04-08 01:06:00 -0700
commit     e101dfac3a53c20bfbf1ca85d30a368c2954facf (patch)
tree       1a5c755d9eb0a2f6f1d6251d9dd9f185654534cb /src/backend/replication/walsender.c
parent     26669757b6a7665c1069e77e6472bd8550193ca6 (diff)
download   postgresql-e101dfac3a53c20bfbf1ca85d30a368c2954facf.tar.gz
           postgresql-e101dfac3a53c20bfbf1ca85d30a368c2954facf.zip
For cascading replication, wake physical and logical walsenders separately
Physical walsenders can't send data until it's been flushed; logical walsenders
can't decode and send data until it's been applied. On the standby, the WAL is
flushed first, which will only wake up physical walsenders; and then applied,
which will only wake up logical walsenders.

Previously, all walsenders were awakened when the WAL was flushed. That was
fine for logical walsenders on the primary; but on the standby the flushed WAL
would have been not applied yet, so logical walsenders were awakened too early.

Per idea from Jeff Davis and Amit Kapila.

Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-By: Jeff Davis <pgsql@j-davis.com>
Reviewed-By: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1+zO5LUeisabX10c81LU-fWMKO4M9Wyg1cdkbW7Hqh6vQ@mail.gmail.com
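The mechanism is easiest to see from the caller's side. The two calls below are
illustrative fragments only, not part of this diff (which touches walsender.c
alone); the argument choices are inferred from the commit message:

/* The standby has flushed newly received WAL: only physical walsenders
 * can send it onward, so logical walsenders are left sleeping. */
WalSndWakeup(/* physical */ true, /* logical */ false);

/* The standby has applied that WAL: now logical walsenders can decode
 * up to the new replay position. */
WalSndWakeup(/* physical */ false, /* logical */ true);

Before this commit there was only the single WalSndWakeup(void), used for both
events, which is why logical walsenders on a standby could be woken before the
WAL they needed had been applied.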
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--   src/backend/replication/walsender.c   41
1 file changed, 37 insertions, 4 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e40a9b1ba7b..5423cf0a171 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2603,6 +2603,23 @@ InitWalSenderSlot(void)
             walsnd->sync_standby_priority = 0;
             walsnd->latch = &MyProc->procLatch;
             walsnd->replyTime = 0;
+
+            /*
+             * The kind assignment is done here and not in StartReplication()
+             * and StartLogicalReplication(). Indeed, the logical walsender
+             * needs to read WAL records (like snapshot of running
+             * transactions) during the slot creation. So it needs to be woken
+             * up based on its kind.
+             *
+             * The kind assignment could also be done in StartReplication(),
+             * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
+             * seems better to set it on one place.
+             */
+            if (MyDatabaseId == InvalidOid)
+                walsnd->kind = REPLICATION_KIND_PHYSICAL;
+            else
+                walsnd->kind = REPLICATION_KIND_LOGICAL;
+
             SpinLockRelease(&walsnd->mutex);
             /* don't need the lock anymore */
             MyWalSnd = (WalSnd *) walsnd;
@@ -3280,30 +3297,46 @@ WalSndShmemInit(void)
 }
 
 /*
- * Wake up all walsenders
+ * Wake up physical, logical or both kinds of walsenders
+ *
+ * The distinction between physical and logical walsenders is done, because:
+ * - physical walsenders can't send data until it's been flushed
+ * - logical walsenders on standby can't decode and send data until it's been
+ *   applied
+ *
+ * For cascading replication we need to wake up physical walsenders separately
+ * from logical walsenders (see the comment before calling WalSndWakeup() in
+ * ApplyWalRecord() for more details).
  *
  * This will be called inside critical sections, so throwing an error is not
  * advisable.
  */
 void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
 {
     int         i;
 
     for (i = 0; i < max_wal_senders; i++)
     {
         Latch      *latch;
+        ReplicationKind kind;
         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
 
         /*
          * Get latch pointer with spinlock held, for the unlikely case that
-         * pointer reads aren't atomic (as they're 8 bytes).
+         * pointer reads aren't atomic (as they're 8 bytes). While at it, also
+         * get kind.
          */
         SpinLockAcquire(&walsnd->mutex);
         latch = walsnd->latch;
+        kind = walsnd->kind;
         SpinLockRelease(&walsnd->mutex);
 
-        if (latch != NULL)
+        if (latch == NULL)
+            continue;
+
+        if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
+            (logical && kind == REPLICATION_KIND_LOGICAL))
             SetLatch(latch);
     }
 }
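For readers who want to experiment with the dispatch rule outside a PostgreSQL
tree, here is a minimal standalone sketch. Every Toy* name and
toy_walsnd_wakeup() are invented for this example; the real function walks the
shared-memory WalSnd array and takes each slot's spinlock before reading the
latch pointer and kind, which the sketch glosses over with a simple in_use flag.

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Stand-ins for ReplicationKind and the shared-memory WalSnd slots. */
typedef enum
{
    TOY_KIND_PHYSICAL,
    TOY_KIND_LOGICAL
} ToyReplicationKind;

typedef struct
{
    const char *name;
    bool        in_use;         /* plays the role of walsnd->latch != NULL */
    ToyReplicationKind kind;
} ToyWalSnd;

/*
 * Mirrors the shape of the new WalSndWakeup(bool physical, bool logical):
 * walk every slot and "wake" only the kinds the caller asked for.
 */
static void
toy_walsnd_wakeup(ToyWalSnd *slots, size_t nslots, bool physical, bool logical)
{
    for (size_t i = 0; i < nslots; i++)
    {
        ToyWalSnd  *walsnd = &slots[i];

        if (!walsnd->in_use)
            continue;

        if ((physical && walsnd->kind == TOY_KIND_PHYSICAL) ||
            (logical && walsnd->kind == TOY_KIND_LOGICAL))
            printf("  waking %s\n", walsnd->name);
    }
}

int
main(void)
{
    /* The kind is fixed at slot initialization, as in InitWalSenderSlot(). */
    ToyWalSnd   slots[] = {
        {"cascading physical standby", true, TOY_KIND_PHYSICAL},
        {"logical decoding client", true, TOY_KIND_LOGICAL},
        {"free slot", false, TOY_KIND_PHYSICAL},
    };
    size_t      nslots = sizeof(slots) / sizeof(slots[0]);

    puts("WAL flushed on the standby:");
    toy_walsnd_wakeup(slots, nslots, true, false);

    puts("WAL applied on the standby:");
    toy_walsnd_wakeup(slots, nslots, false, true);

    return 0;
}

In the real code the kind never changes after slot initialization:
InitWalSenderSlot() records REPLICATION_KIND_PHYSICAL when the walsender is not
connected to a database (MyDatabaseId == InvalidOid) and
REPLICATION_KIND_LOGICAL otherwise, so WalSndWakeup() can filter on it without
any further checks.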