aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c105
1 files changed, 76 insertions, 29 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9bfb3de15e7..2ef90497bb6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -28,12 +28,13 @@
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.29 2010/07/22 13:03:11 rhaas Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.30 2010/09/11 15:48:04 heikki Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <signal.h>
#include <unistd.h>
#include "access/xlog_internal.h"
@@ -66,9 +67,6 @@ bool am_walsender = false; /* Am I a walsender process ? */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 200; /* max sleep time between some actions */
-#define NAPTIME_PER_CYCLE 100000L /* max sleep time between cycles
- * (100ms) */
-
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
* but for walsender to read the XLOG.
@@ -93,6 +91,7 @@ static volatile sig_atomic_t ready_to_stop = false;
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndShutdownHandler(SIGNAL_ARGS);
static void WalSndQuickDieHandler(SIGNAL_ARGS);
+static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
@@ -144,6 +143,16 @@ WalSenderMain(void)
/* Handle handshake messages before streaming */
WalSndHandshake();
+ /* Initialize shared memory status */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->sentPtr = sentPtr;
+ SpinLockRelease(&walsnd->mutex);
+ }
+
/* Main loop of walsender */
return WalSndLoop();
}
@@ -380,8 +389,6 @@ WalSndLoop(void)
/* Loop forever, unless we get an error */
for (;;)
{
- long remain; /* remaining time (us) */
-
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
@@ -421,32 +428,42 @@ WalSndLoop(void)
/*
* If we had sent all accumulated WAL in last round, nap for the
* configured time before retrying.
- *
- * On some platforms, signals won't interrupt the sleep. To ensure we
- * respond reasonably promptly when someone signals us, break down the
- * sleep into NAPTIME_PER_CYCLE increments, and check for interrupts
- * after each nap.
*/
if (caughtup)
{
- remain = WalSndDelay * 1000L;
- while (remain > 0)
- {
- /* Check for interrupts */
- if (got_SIGHUP || shutdown_requested || ready_to_stop)
- break;
+ /*
+ * Even if we wrote all the WAL that was available when we started
+ * sending, more might have arrived while we were sending this
+ * batch. We had the latch set while sending, so we have not
+ * received any signals from that time. Let's arm the latch
+ * again, and after that check that we're still up-to-date.
+ */
+ ResetLatch(&MyWalSnd->latch);
- /* Sleep and check that the connection is still alive */
- pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
- CheckClosedConnection();
+ if (!XLogSend(output_message, &caughtup))
+ break;
+ if (caughtup && !got_SIGHUP && !ready_to_stop && !shutdown_requested)
+ {
+ /*
+ * XXX: We don't really need the periodic wakeups anymore,
+ * WaitLatchOrSocket should reliably wake up as soon as
+ * something interesting happens.
+ */
- remain -= NAPTIME_PER_CYCLE;
+ /* Sleep */
+ WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+ WalSndDelay);
}
- }
- /* Attempt to send the log once every loop */
- if (!XLogSend(output_message, &caughtup))
- break;
+ /* Check if the connection was closed */
+ CheckClosedConnection();
+ }
+ else
+ {
+ /* Attempt to send the log once every loop */
+ if (!XLogSend(output_message, &caughtup))
+ break;
+ }
}
/*
@@ -493,10 +510,15 @@ InitWalSnd(void)
}
else
{
- /* found */
- MyWalSnd = (WalSnd *) walsnd;
+ /*
+ * Found a free slot. Take ownership of the latch and initialize
+ * the other fields.
+ */
+ OwnLatch((Latch *) &walsnd->latch);
walsnd->pid = MyProcPid;
- MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr));
+ MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+ /* Set MyWalSnd only after it's fully initialized. */
+ MyWalSnd = (WalSnd *) walsnd;
SpinLockRelease(&walsnd->mutex);
break;
}
@@ -523,6 +545,7 @@ WalSndKill(int code, Datum arg)
* for this.
*/
MyWalSnd->pid = 0;
+ DisownLatch(&MyWalSnd->latch);
/* WalSnd struct isn't mine anymore */
MyWalSnd = NULL;
@@ -787,6 +810,8 @@ static void
WalSndSigHupHandler(SIGNAL_ARGS)
{
got_SIGHUP = true;
+ if (MyWalSnd)
+ SetLatch(&MyWalSnd->latch);
}
/* SIGTERM: set flag to shut down */
@@ -794,6 +819,8 @@ static void
WalSndShutdownHandler(SIGNAL_ARGS)
{
shutdown_requested = true;
+ if (MyWalSnd)
+ SetLatch(&MyWalSnd->latch);
}
/*
@@ -828,11 +855,20 @@ WalSndQuickDieHandler(SIGNAL_ARGS)
exit(2);
}
+/* SIGUSR1: set flag to send WAL records */
+static void
+WalSndXLogSendHandler(SIGNAL_ARGS)
+{
+ latch_sigusr1_handler();
+}
+
/* SIGUSR2: set flag to do a last cycle and shut down afterwards */
static void
WalSndLastCycleHandler(SIGNAL_ARGS)
{
ready_to_stop = true;
+ if (MyWalSnd)
+ SetLatch(&MyWalSnd->latch);
}
/* Set up signal handlers */
@@ -847,7 +883,7 @@ WalSndSignals(void)
pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGUSR1, SIG_IGN); /* not used */
+ pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
* shutdown */
@@ -891,10 +927,21 @@ WalSndShmemInit(void)
WalSnd *walsnd = &WalSndCtl->walsnds[i];
SpinLockInit(&walsnd->mutex);
+ InitSharedLatch(&walsnd->latch);
}
}
}
+/* Wake up all walsenders */
+void
+WalSndWakeup(void)
+{
+ int i;
+
+ for (i = 0; i < max_wal_senders; i++)
+ SetLatch(&WalSndCtl->walsnds[i].latch);
+}
+
/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the