aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c41
1 files changed, 37 insertions, 4 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e40a9b1ba7b..5423cf0a171 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2603,6 +2603,23 @@ InitWalSenderSlot(void)
walsnd->sync_standby_priority = 0;
walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0;
+
+ /*
+ * The kind assignment is done here and not in StartReplication()
+ * and StartLogicalReplication(). Indeed, the logical walsender
+ * needs to read WAL records (like snapshot of running
+ * transactions) during the slot creation. So it needs to be woken
+ * up based on its kind.
+ *
+ * The kind assignment could also be done in StartReplication(),
+ * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
+ * seems better to set it on one place.
+ */
+ if (MyDatabaseId == InvalidOid)
+ walsnd->kind = REPLICATION_KIND_PHYSICAL;
+ else
+ walsnd->kind = REPLICATION_KIND_LOGICAL;
+
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
MyWalSnd = (WalSnd *) walsnd;
@@ -3280,30 +3297,46 @@ WalSndShmemInit(void)
}
/*
- * Wake up all walsenders
+ * Wake up physical, logical or both kinds of walsenders
+ *
+ * The distinction between physical and logical walsenders is done, because:
+ * - physical walsenders can't send data until it's been flushed
+ * - logical walsenders on standby can't decode and send data until it's been
+ * applied
+ *
+ * For cascading replication we need to wake up physical walsenders separately
+ * from logical walsenders (see the comment before calling WalSndWakeup() in
+ * ApplyWalRecord() for more details).
*
* This will be called inside critical sections, so throwing an error is not
* advisable.
*/
void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
{
int i;
for (i = 0; i < max_wal_senders; i++)
{
Latch *latch;
+ ReplicationKind kind;
WalSnd *walsnd = &WalSndCtl->walsnds[i];
/*
* Get latch pointer with spinlock held, for the unlikely case that
- * pointer reads aren't atomic (as they're 8 bytes).
+ * pointer reads aren't atomic (as they're 8 bytes). While at it, also
+ * get kind.
*/
SpinLockAcquire(&walsnd->mutex);
latch = walsnd->latch;
+ kind = walsnd->kind;
SpinLockRelease(&walsnd->mutex);
- if (latch != NULL)
+ if (latch == NULL)
+ continue;
+
+ if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
+ (logical && kind == REPLICATION_KIND_LOGICAL))
SetLatch(latch);
}
}