diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/walreceiver.c | 2 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 41 |
2 files changed, 38 insertions, 5 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 685af51d5d3..feff7094351 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) /* Signal the startup process and walsender that new WAL has arrived */ WakeupRecovery(); if (AllowCascadeReplication()) - WalSndWakeup(); + WalSndWakeup(true, false); /* Report XLOG streaming progress in PS display */ if (update_process_title) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e40a9b1ba7b..5423cf0a171 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2603,6 +2603,23 @@ InitWalSenderSlot(void) walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; + + /* + * The kind assignment is done here and not in StartReplication() + * and StartLogicalReplication(). Indeed, the logical walsender + * needs to read WAL records (like snapshot of running + * transactions) during the slot creation. So it needs to be woken + * up based on its kind. + * + * The kind assignment could also be done in StartReplication(), + * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it + * seems better to set it on one place. + */ + if (MyDatabaseId == InvalidOid) + walsnd->kind = REPLICATION_KIND_PHYSICAL; + else + walsnd->kind = REPLICATION_KIND_LOGICAL; + SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3280,30 +3297,46 @@ WalSndShmemInit(void) } /* - * Wake up all walsenders + * Wake up physical, logical or both kinds of walsenders + * + * The distinction between physical and logical walsenders is done, because: + * - physical walsenders can't send data until it's been flushed + * - logical walsenders on standby can't decode and send data until it's been + * applied + * + * For cascading replication we need to wake up physical walsenders separately + * from logical walsenders (see the comment before calling WalSndWakeup() in + * ApplyWalRecord() for more details). * * This will be called inside critical sections, so throwing an error is not * advisable. */ void -WalSndWakeup(void) +WalSndWakeup(bool physical, bool logical) { int i; for (i = 0; i < max_wal_senders; i++) { Latch *latch; + ReplicationKind kind; WalSnd *walsnd = &WalSndCtl->walsnds[i]; /* * Get latch pointer with spinlock held, for the unlikely case that - * pointer reads aren't atomic (as they're 8 bytes). + * pointer reads aren't atomic (as they're 8 bytes). While at it, also + * get kind. */ SpinLockAcquire(&walsnd->mutex); latch = walsnd->latch; + kind = walsnd->kind; SpinLockRelease(&walsnd->mutex); - if (latch != NULL) + if (latch == NULL) + continue; + + if ((physical && kind == REPLICATION_KIND_PHYSICAL) || + (logical && kind == REPLICATION_KIND_LOGICAL)) SetLatch(latch); } } |