aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/basebackup.c10
-rw-r--r--src/backend/replication/walsender.c86
-rw-r--r--src/include/replication/walsender_private.h6
3 files changed, 55 insertions, 47 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 07030a2ef08..3058ce921b0 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -1294,15 +1294,21 @@ throttle(size_t increment)
/* Only sleep if the transfer is faster than it should be. */
if (sleep > 0)
{
- ResetLatch(&MyWalSnd->latch);
+ ResetLatch(MyLatch);
+
+ /* We're eating a potentially set latch, so check for interrupts */
+ CHECK_FOR_INTERRUPTS();
/*
* (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
* the maximum time to sleep. Thus the cast to long is safe.
*/
- wait_result = WaitLatch(&MyWalSnd->latch,
+ wait_result = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
(long) (sleep / 1000));
+
+ if (wait_result & WL_LATCH_SET)
+ CHECK_FOR_INTERRUPTS();
}
else
{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 86c36bf502c..05d2339b150 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1081,6 +1081,11 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
if (!PostmasterIsAlive())
exit(1);
+ /* Clear any already-pending wakeups */
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+
/* Process any requests or signals received recently */
if (got_SIGHUP)
{
@@ -1092,9 +1097,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
/* Check for input from the client */
ProcessRepliesIfAny();
- /* Clear any already-pending wakeups */
- ResetLatch(&MyWalSnd->latch);
-
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
WalSndShutdown();
@@ -1117,15 +1119,12 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
/* Sleep until something happens or we time out */
- ImmediateInterruptOK = true;
- CHECK_FOR_INTERRUPTS();
- WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+ WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime);
- ImmediateInterruptOK = false;
}
/* reactivate latch so WalSndLoop knows to continue */
- SetLatch(&MyWalSnd->latch);
+ SetLatch(MyLatch);
}
/*
@@ -1165,6 +1164,11 @@ WalSndWaitForWal(XLogRecPtr loc)
if (!PostmasterIsAlive())
exit(1);
+ /* Clear any already-pending wakeups */
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+
/* Process any requests or signals received recently */
if (got_SIGHUP)
{
@@ -1176,9 +1180,6 @@ WalSndWaitForWal(XLogRecPtr loc)
/* Check for input from the client */
ProcessRepliesIfAny();
- /* Clear any already-pending wakeups */
- ResetLatch(&MyWalSnd->latch);
-
/* Update our idea of the currently flushed position. */
if (!RecoveryInProgress())
RecentFlushPtr = GetFlushRecPtr();
@@ -1244,15 +1245,12 @@ WalSndWaitForWal(XLogRecPtr loc)
wakeEvents |= WL_SOCKET_WRITEABLE;
/* Sleep until something happens or we time out */
- ImmediateInterruptOK = true;
- CHECK_FOR_INTERRUPTS();
- WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+ WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime);
- ImmediateInterruptOK = false;
}
/* reactivate latch so WalSndLoop knows to continue */
- SetLatch(&MyWalSnd->latch);
+ SetLatch(MyLatch);
return RecentFlushPtr;
}
@@ -1813,6 +1811,11 @@ WalSndLoop(WalSndSendDataCallback send_data)
if (!PostmasterIsAlive())
exit(1);
+ /* Clear any already-pending wakeups */
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+
/* Process any requests or signals received recently */
if (got_SIGHUP)
{
@@ -1821,14 +1824,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
SyncRepInitConfig();
}
- CHECK_FOR_INTERRUPTS();
-
/* Check for input from the client */
ProcessRepliesIfAny();
- /* Clear any already-pending wakeups */
- ResetLatch(&MyWalSnd->latch);
-
/*
* If we have received CopyDone from the client, sent CopyDone
* ourselves, and the output buffer is empty, it's time to exit
@@ -1912,11 +1910,8 @@ WalSndLoop(WalSndSendDataCallback send_data)
wakeEvents |= WL_SOCKET_WRITEABLE;
/* Sleep until something happens or we time out */
- ImmediateInterruptOK = true;
- CHECK_FOR_INTERRUPTS();
- WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+ WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime);
- ImmediateInterruptOK = false;
}
}
return;
@@ -1959,9 +1954,9 @@ InitWalSenderSlot(void)
walsnd->pid = MyProcPid;
walsnd->sentPtr = InvalidXLogRecPtr;
walsnd->state = WALSNDSTATE_STARTUP;
+ walsnd->latch = &MyProc->procLatch;
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
- OwnLatch((Latch *) &walsnd->latch);
MyWalSnd = (WalSnd *) walsnd;
break;
@@ -1986,19 +1981,14 @@ WalSndKill(int code, Datum arg)
Assert(walsnd != NULL);
- /*
- * Clear MyWalSnd first; then disown the latch. This is so that signal
- * handlers won't try to touch the latch after it's no longer ours.
- */
MyWalSnd = NULL;
- DisownLatch(&walsnd->latch);
-
- /*
- * Mark WalSnd struct no longer in use. Assume that no lock is required
- * for this.
- */
+ SpinLockAcquire(&walsnd->mutex);
+ /* clear latch while holding the spinlock, so it can safely be read */
+ walsnd->latch = NULL;
+ /* Mark WalSnd struct as no longer being in use. */
walsnd->pid = 0;
+ SpinLockRelease(&walsnd->mutex);
}
/*
@@ -2570,8 +2560,8 @@ WalSndSigHupHandler(SIGNAL_ARGS)
int save_errno = errno;
got_SIGHUP = true;
- if (MyWalSnd)
- SetLatch(&MyWalSnd->latch);
+
+ SetLatch(MyLatch);
errno = save_errno;
}
@@ -2603,8 +2593,7 @@ WalSndLastCycleHandler(SIGNAL_ARGS)
kill(MyProcPid, SIGTERM);
walsender_ready_to_stop = true;
- if (MyWalSnd)
- SetLatch(&MyWalSnd->latch);
+ SetLatch(MyLatch);
errno = save_errno;
}
@@ -2668,7 +2657,6 @@ WalSndShmemInit(void)
WalSnd *walsnd = &WalSndCtl->walsnds[i];
SpinLockInit(&walsnd->mutex);
- InitSharedLatch(&walsnd->latch);
}
}
}
@@ -2685,7 +2673,21 @@ WalSndWakeup(void)
int i;
for (i = 0; i < max_wal_senders; i++)
- SetLatch(&WalSndCtl->walsnds[i].latch);
+ {
+ Latch *latch;
+ WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ /*
+ * Get latch pointer with spinlock held, for the unlikely case that
+ * pointer reads aren't atomic (as they're 8 bytes).
+ */
+ SpinLockAcquire(&walsnd->mutex);
+ latch = walsnd->latch;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (latch != NULL)
+ SetLatch(latch);
+ }
}
/* Set state for current walsender (only called in walsender) */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index cc351d6f673..88677506f34 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -51,10 +51,10 @@ typedef struct WalSnd
slock_t mutex;
/*
- * Latch used by backends to wake up this walsender when it has work to
- * do.
+ * Pointer to the walsender's latch. Used by backends to wake up this
+ * walsender when it has work to do. NULL if the walsender isn't active.
*/
- Latch latch;
+ Latch *latch;
/*
* The priority order of the standby managed by this WALSender, as listed