aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walreceiverfuncs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walreceiverfuncs.c')
-rw-r--r-- src/backend/replication/walreceiverfuncs.c 172
1 files changed, 69 insertions, 103 deletions
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index c1d7b558874..4fb132dcd4e 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.3 2010/01/27 15:27:51 heikki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -18,6 +18,8 @@
#include <sys/types.h>
#include <sys/stat.h>
+#include <sys/time.h>
+#include <time.h>
#include <unistd.h>
#include <signal.h>
@@ -30,8 +32,11 @@
WalRcvData *WalRcv = NULL;
-static bool CheckForStandbyTrigger(void);
-static void ShutdownWalRcv(void);
+/*
+ * How long to wait for walreceiver to start up after requesting
+ * postmaster to launch it. In seconds.
+ */
+#define WALRCV_STARTUP_TIMEOUT 10
/* Report shared memory space needed by WalRcvShmemInit */
Size
@@ -62,7 +67,7 @@ WalRcvShmemInit(void)
/* Initialize the data structures */
MemSet(WalRcv, 0, WalRcvShmemSize());
- WalRcv->walRcvState = WALRCV_NOT_STARTED;
+ WalRcv->walRcvState = WALRCV_STOPPED;
SpinLockInit(&WalRcv->mutex);
}
@@ -73,90 +78,51 @@ WalRcvInProgress(void)
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
WalRcvState state;
+ pg_time_t startTime;
SpinLockAcquire(&walrcv->mutex);
- state = walrcv->walRcvState;
- SpinLockRelease(&walrcv->mutex);
- if (state == WALRCV_RUNNING || state == WALRCV_STOPPING)
- return true;
- else
- return false;
-}
-
-/*
- * Wait for the XLOG record at given position to become available.
- *
- * 'recptr' indicates the byte position which caller wants to read the
- * XLOG record up to. The byte position actually written and flushed
- * by walreceiver is returned. It can be higher than the requested
- * location, and the caller can safely read up to that point without
- * calling WaitNextXLogAvailable() again.
- *
- * If WAL streaming is ended (because a trigger file is found), *finished
- * is set to true and function returns immediately. The returned position
- * can be lower than requested in that case.
- *
- * Called by the startup process during streaming recovery.
- */
-XLogRecPtr
-WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished)
-{
- static XLogRecPtr receivedUpto = {0, 0};
-
- *finished = false;
+ state = walrcv->walRcvState;
+ startTime = walrcv->startTime;
- /* Quick exit if already known available */
- if (XLByteLT(recptr, receivedUpto))
- return receivedUpto;
+ SpinLockRelease(&walrcv->mutex);
- for (;;)
+ /*
+ * If it has taken too long for walreceiver to start up, give up.
+ * Setting the state to STOPPED ensures that if walreceiver later
+ * does start up after all, it will see that it's not supposed to be
+ * running and die without doing anything.
+ */
+ if (state == WALRCV_STARTING)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile WalRcvData *walrcv = WalRcv;
-
- /* Update local status */
- SpinLockAcquire(&walrcv->mutex);
- receivedUpto = walrcv->receivedUpto;
- SpinLockRelease(&walrcv->mutex);
+ pg_time_t now = (pg_time_t) time(NULL);
- /* If available already, leave here */
- if (XLByteLT(recptr, receivedUpto))
- return receivedUpto;
-
- /* Check to see if the trigger file exists */
- if (CheckForStandbyTrigger())
+ if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
{
- *finished = true;
- return receivedUpto;
- }
+ SpinLockAcquire(&walrcv->mutex);
- pg_usleep(100000L); /* 100ms */
-
- /*
- * This possibly-long loop needs to handle interrupts of startup
- * process.
- */
- HandleStartupProcInterrupts();
+ if (walrcv->walRcvState == WALRCV_STARTING)
+ state = walrcv->walRcvState = WALRCV_STOPPED;
- /*
- * Emergency bailout if postmaster has died. This is to avoid the
- * necessity for manual cleanup of all postmaster children.
- */
- if (!PostmasterIsAlive(true))
- exit(1);
+ SpinLockRelease(&walrcv->mutex);
+ }
}
+
+ if (state != WALRCV_STOPPED)
+ return true;
+ else
+ return false;
}
/*
- * Stop walreceiver and wait for it to die.
+ * Stop walreceiver (if running) and wait for it to die.
*/
-static void
+void
ShutdownWalRcv(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
- pid_t walrcvpid;
+ pid_t walrcvpid = 0;
/*
* Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
@@ -164,15 +130,25 @@ ShutdownWalRcv(void)
* restart itself.
*/
SpinLockAcquire(&walrcv->mutex);
- Assert(walrcv->walRcvState == WALRCV_RUNNING);
- walrcv->walRcvState = WALRCV_STOPPING;
- walrcvpid = walrcv->pid;
+ switch(walrcv->walRcvState)
+ {
+ case WALRCV_STOPPED:
+ break;
+ case WALRCV_STARTING:
+ walrcv->walRcvState = WALRCV_STOPPED;
+ break;
+
+ case WALRCV_RUNNING:
+ walrcv->walRcvState = WALRCV_STOPPING;
+ /* fall through */
+ case WALRCV_STOPPING:
+ walrcvpid = walrcv->pid;
+ break;
+ }
SpinLockRelease(&walrcv->mutex);
/*
- * Pid can be 0, if no walreceiver process is active right now.
- * Postmaster should restart it, and when it does, it will see the
- * STOPPING state.
+ * Signal walreceiver process if it was still running.
*/
if (walrcvpid != 0)
kill(walrcvpid, SIGTERM);
@@ -194,30 +170,6 @@ ShutdownWalRcv(void)
}
/*
- * Check to see if the trigger file exists. If it does, request postmaster
- * to shut down walreceiver and wait for it to exit, and remove the trigger
- * file.
- */
-static bool
-CheckForStandbyTrigger(void)
-{
- struct stat stat_buf;
-
- if (TriggerFile == NULL)
- return false;
-
- if (stat(TriggerFile, &stat_buf) == 0)
- {
- ereport(LOG,
- (errmsg("trigger file found: %s", TriggerFile)));
- ShutdownWalRcv();
- unlink(TriggerFile);
- return true;
- }
- return false;
-}
-
-/*
* Request postmaster to start walreceiver.
*
* recptr indicates the position where streaming should begin, and conninfo
@@ -228,17 +180,30 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
+ pg_time_t now = (pg_time_t) time(NULL);
- Assert(walrcv->walRcvState == WALRCV_NOT_STARTED);
+ /*
+ * We always start at the beginning of the segment.
+ * That prevents a broken segment (i.e., with no records in the
+ * first half of a segment) from being created by XLOG streaming,
+ * which might cause trouble later on if the segment is e.g.
+ * archived.
+ */
+ if (recptr.xrecoff % XLogSegSize != 0)
+ recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
+
+ /* It had better be stopped before we try to restart it */
+ Assert(walrcv->walRcvState == WALRCV_STOPPED);
- /* locking is just pro forma here; walreceiver isn't started yet */
SpinLockAcquire(&walrcv->mutex);
- walrcv->receivedUpto = recptr;
if (conninfo != NULL)
strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
else
walrcv->conninfo[0] = '\0';
- walrcv->walRcvState = WALRCV_RUNNING;
+ walrcv->walRcvState = WALRCV_STARTING;
+ walrcv->startTime = now;
+
+ walrcv->receivedUpto = recptr;
SpinLockRelease(&walrcv->mutex);
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
@@ -260,3 +225,4 @@ GetWalRcvWriteRecPtr(void)
return recptr;
}
+