diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 41 |
1 files changed, 37 insertions, 4 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e40a9b1ba7b..5423cf0a171 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2603,6 +2603,23 @@ InitWalSenderSlot(void) walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; + + /* + * The kind assignment is done here and not in StartReplication() + * and StartLogicalReplication(). Indeed, the logical walsender + * needs to read WAL records (like snapshot of running + * transactions) during the slot creation. So it needs to be woken + * up based on its kind. + * + * The kind assignment could also be done in StartReplication(), + * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it + * seems better to set it on one place. + */ + if (MyDatabaseId == InvalidOid) + walsnd->kind = REPLICATION_KIND_PHYSICAL; + else + walsnd->kind = REPLICATION_KIND_LOGICAL; + SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3280,30 +3297,46 @@ WalSndShmemInit(void) } /* - * Wake up all walsenders + * Wake up physical, logical or both kinds of walsenders + * + * The distinction between physical and logical walsenders is done, because: + * - physical walsenders can't send data until it's been flushed + * - logical walsenders on standby can't decode and send data until it's been + * applied + * + * For cascading replication we need to wake up physical walsenders separately + * from logical walsenders (see the comment before calling WalSndWakeup() in + * ApplyWalRecord() for more details). * * This will be called inside critical sections, so throwing an error is not * advisable. */ void -WalSndWakeup(void) +WalSndWakeup(bool physical, bool logical) { int i; for (i = 0; i < max_wal_senders; i++) { Latch *latch; + ReplicationKind kind; WalSnd *walsnd = &WalSndCtl->walsnds[i]; /* * Get latch pointer with spinlock held, for the unlikely case that - * pointer reads aren't atomic (as they're 8 bytes). + * pointer reads aren't atomic (as they're 8 bytes). While at it, also + * get kind. */ SpinLockAcquire(&walsnd->mutex); latch = walsnd->latch; + kind = walsnd->kind; SpinLockRelease(&walsnd->mutex); - if (latch != NULL) + if (latch == NULL) + continue; + + if ((physical && kind == REPLICATION_KIND_PHYSICAL) || + (logical && kind == REPLICATION_KIND_LOGICAL)) SetLatch(latch); } } |