diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 105 |
1 files changed, 76 insertions, 29 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 9bfb3de15e7..2ef90497bb6 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -28,12 +28,13 @@ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.29 2010/07/22 13:03:11 rhaas Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.30 2010/09/11 15:48:04 heikki Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" +#include <signal.h> #include <unistd.h> #include "access/xlog_internal.h" @@ -66,9 +67,6 @@ bool am_walsender = false; /* Am I a walsender process ? */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int WalSndDelay = 200; /* max sleep time between some actions */ -#define NAPTIME_PER_CYCLE 100000L /* max sleep time between cycles - * (100ms) */ - /* * These variables are used similarly to openLogFile/Id/Seg/Off, * but for walsender to read the XLOG. @@ -93,6 +91,7 @@ static volatile sig_atomic_t ready_to_stop = false; static void WalSndSigHupHandler(SIGNAL_ARGS); static void WalSndShutdownHandler(SIGNAL_ARGS); static void WalSndQuickDieHandler(SIGNAL_ARGS); +static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ @@ -144,6 +143,16 @@ WalSenderMain(void) /* Handle handshake messages before streaming */ WalSndHandshake(); + /* Initialize shared memory status */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + } + /* Main loop of walsender */ return WalSndLoop(); } @@ -380,8 +389,6 @@ WalSndLoop(void) /* Loop forever, unless we get an error */ for (;;) { - long remain; /* remaining time (us) */ - /* * Emergency bailout if postmaster has died. This is to avoid the * necessity for manual cleanup of all postmaster children. @@ -421,32 +428,42 @@ WalSndLoop(void) /* * If we had sent all accumulated WAL in last round, nap for the * configured time before retrying. - * - * On some platforms, signals won't interrupt the sleep. To ensure we - * respond reasonably promptly when someone signals us, break down the - * sleep into NAPTIME_PER_CYCLE increments, and check for interrupts - * after each nap. */ if (caughtup) { - remain = WalSndDelay * 1000L; - while (remain > 0) - { - /* Check for interrupts */ - if (got_SIGHUP || shutdown_requested || ready_to_stop) - break; + /* + * Even if we wrote all the WAL that was available when we started + * sending, more might have arrived while we were sending this + * batch. We had the latch set while sending, so we have not + * received any signals from that time. Let's arm the latch + * again, and after that check that we're still up-to-date. + */ + ResetLatch(&MyWalSnd->latch); - /* Sleep and check that the connection is still alive */ - pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain); - CheckClosedConnection(); + if (!XLogSend(output_message, &caughtup)) + break; + if (caughtup && !got_SIGHUP && !ready_to_stop && !shutdown_requested) + { + /* + * XXX: We don't really need the periodic wakeups anymore, + * WaitLatchOrSocket should reliably wake up as soon as + * something interesting happens. + */ - remain -= NAPTIME_PER_CYCLE; + /* Sleep */ + WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock, + WalSndDelay); } - } - /* Attempt to send the log once every loop */ - if (!XLogSend(output_message, &caughtup)) - break; + /* Check if the connection was closed */ + CheckClosedConnection(); + } + else + { + /* Attempt to send the log once every loop */ + if (!XLogSend(output_message, &caughtup)) + break; + } } /* @@ -493,10 +510,15 @@ InitWalSnd(void) } else { - /* found */ - MyWalSnd = (WalSnd *) walsnd; + /* + * Found a free slot. Take ownership of the latch and initialize + * the other fields. + */ + OwnLatch((Latch *) &walsnd->latch); walsnd->pid = MyProcPid; - MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr)); + MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr)); + /* Set MyWalSnd only after it's fully initialized. */ + MyWalSnd = (WalSnd *) walsnd; SpinLockRelease(&walsnd->mutex); break; } @@ -523,6 +545,7 @@ WalSndKill(int code, Datum arg) * for this. */ MyWalSnd->pid = 0; + DisownLatch(&MyWalSnd->latch); /* WalSnd struct isn't mine anymore */ MyWalSnd = NULL; @@ -787,6 +810,8 @@ static void WalSndSigHupHandler(SIGNAL_ARGS) { got_SIGHUP = true; + if (MyWalSnd) + SetLatch(&MyWalSnd->latch); } /* SIGTERM: set flag to shut down */ @@ -794,6 +819,8 @@ static void WalSndShutdownHandler(SIGNAL_ARGS) { shutdown_requested = true; + if (MyWalSnd) + SetLatch(&MyWalSnd->latch); } /* @@ -828,11 +855,20 @@ WalSndQuickDieHandler(SIGNAL_ARGS) exit(2); } +/* SIGUSR1: set flag to send WAL records */ +static void +WalSndXLogSendHandler(SIGNAL_ARGS) +{ + latch_sigusr1_handler(); +} + /* SIGUSR2: set flag to do a last cycle and shut down afterwards */ static void WalSndLastCycleHandler(SIGNAL_ARGS) { ready_to_stop = true; + if (MyWalSnd) + SetLatch(&MyWalSnd->latch); } /* Set up signal handlers */ @@ -847,7 +883,7 @@ WalSndSignals(void) pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */ pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, SIG_IGN); /* not used */ + pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */ pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and * shutdown */ @@ -891,10 +927,21 @@ WalSndShmemInit(void) WalSnd *walsnd = &WalSndCtl->walsnds[i]; SpinLockInit(&walsnd->mutex); + InitSharedLatch(&walsnd->latch); } } } +/* Wake up all walsenders */ +void +WalSndWakeup(void) +{ + int i; + + for (i = 0; i < max_wal_senders; i++) + SetLatch(&WalSndCtl->walsnds[i].latch); +} + /* * This isn't currently used for anything. Monitoring tools might be * interested in the future, and we'll need something like this in the |