diff options
Diffstat (limited to 'src/backend/replication/walreceiverfuncs.c')
-rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 172 |
1 files changed, 69 insertions, 103 deletions
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index c1d7b558874..4fb132dcd4e 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.3 2010/01/27 15:27:51 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -18,6 +18,8 @@ #include <sys/types.h> #include <sys/stat.h> +#include <sys/time.h> +#include <time.h> #include <unistd.h> #include <signal.h> @@ -30,8 +32,11 @@ WalRcvData *WalRcv = NULL; -static bool CheckForStandbyTrigger(void); -static void ShutdownWalRcv(void); +/* + * How long to wait for walreceiver to start up after requesting + * postmaster to launch it. In seconds. + */ +#define WALRCV_STARTUP_TIMEOUT 10 /* Report shared memory space needed by WalRcvShmemInit */ Size @@ -62,7 +67,7 @@ WalRcvShmemInit(void) /* Initialize the data structures */ MemSet(WalRcv, 0, WalRcvShmemSize()); - WalRcv->walRcvState = WALRCV_NOT_STARTED; + WalRcv->walRcvState = WALRCV_STOPPED; SpinLockInit(&WalRcv->mutex); } @@ -73,90 +78,51 @@ WalRcvInProgress(void) /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; WalRcvState state; + pg_time_t startTime; SpinLockAcquire(&walrcv->mutex); - state = walrcv->walRcvState; - SpinLockRelease(&walrcv->mutex); - if (state == WALRCV_RUNNING || state == WALRCV_STOPPING) - return true; - else - return false; -} - -/* - * Wait for the XLOG record at given position to become available. - * - * 'recptr' indicates the byte position which caller wants to read the - * XLOG record up to. The byte position actually written and flushed - * by walreceiver is returned. It can be higher than the requested - * location, and the caller can safely read up to that point without - * calling WaitNextXLogAvailable() again. - * - * If WAL streaming is ended (because a trigger file is found), *finished - * is set to true and function returns immediately. The returned position - * can be lower than requested in that case. - * - * Called by the startup process during streaming recovery. - */ -XLogRecPtr -WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished) -{ - static XLogRecPtr receivedUpto = {0, 0}; - - *finished = false; + state = walrcv->walRcvState; + startTime = walrcv->startTime; - /* Quick exit if already known available */ - if (XLByteLT(recptr, receivedUpto)) - return receivedUpto; + SpinLockRelease(&walrcv->mutex); - for (;;) + /* + * If it has taken too long for walreceiver to start up, give up. + * Setting the state to STOPPED ensures that if walreceiver later + * does start up after all, it will see that it's not supposed to be + * running and die without doing anything. + */ + if (state == WALRCV_STARTING) { - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - /* Update local status */ - SpinLockAcquire(&walrcv->mutex); - receivedUpto = walrcv->receivedUpto; - SpinLockRelease(&walrcv->mutex); + pg_time_t now = (pg_time_t) time(NULL); - /* If available already, leave here */ - if (XLByteLT(recptr, receivedUpto)) - return receivedUpto; - - /* Check to see if the trigger file exists */ - if (CheckForStandbyTrigger()) + if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) { - *finished = true; - return receivedUpto; - } + SpinLockAcquire(&walrcv->mutex); - pg_usleep(100000L); /* 100ms */ - - /* - * This possibly-long loop needs to handle interrupts of startup - * process. - */ - HandleStartupProcInterrupts(); + if (walrcv->walRcvState == WALRCV_STARTING) + state = walrcv->walRcvState = WALRCV_STOPPED; - /* - * Emergency bailout if postmaster has died. This is to avoid the - * necessity for manual cleanup of all postmaster children. - */ - if (!PostmasterIsAlive(true)) - exit(1); + SpinLockRelease(&walrcv->mutex); + } } + + if (state != WALRCV_STOPPED) + return true; + else + return false; } /* - * Stop walreceiver and wait for it to die. + * Stop walreceiver (if running) and wait for it to die. */ -static void +void ShutdownWalRcv(void) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; - pid_t walrcvpid; + pid_t walrcvpid = 0; /* * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED @@ -164,15 +130,25 @@ ShutdownWalRcv(void) * restart itself. */ SpinLockAcquire(&walrcv->mutex); - Assert(walrcv->walRcvState == WALRCV_RUNNING); - walrcv->walRcvState = WALRCV_STOPPING; - walrcvpid = walrcv->pid; + switch(walrcv->walRcvState) + { + case WALRCV_STOPPED: + break; + case WALRCV_STARTING: + walrcv->walRcvState = WALRCV_STOPPED; + break; + + case WALRCV_RUNNING: + walrcv->walRcvState = WALRCV_STOPPING; + /* fall through */ + case WALRCV_STOPPING: + walrcvpid = walrcv->pid; + break; + } SpinLockRelease(&walrcv->mutex); /* - * Pid can be 0, if no walreceiver process is active right now. - * Postmaster should restart it, and when it does, it will see the - * STOPPING state. + * Signal walreceiver process if it was still running. */ if (walrcvpid != 0) kill(walrcvpid, SIGTERM); @@ -194,30 +170,6 @@ ShutdownWalRcv(void) } /* - * Check to see if the trigger file exists. If it does, request postmaster - * to shut down walreceiver and wait for it to exit, and remove the trigger - * file. - */ -static bool -CheckForStandbyTrigger(void) -{ - struct stat stat_buf; - - if (TriggerFile == NULL) - return false; - - if (stat(TriggerFile, &stat_buf) == 0) - { - ereport(LOG, - (errmsg("trigger file found: %s", TriggerFile))); - ShutdownWalRcv(); - unlink(TriggerFile); - return true; - } - return false; -} - -/* * Request postmaster to start walreceiver. * * recptr indicates the position where streaming should begin, and conninfo @@ -228,17 +180,30 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; + pg_time_t now = (pg_time_t) time(NULL); - Assert(walrcv->walRcvState == WALRCV_NOT_STARTED); + /* + * We always start at the beginning of the segment. + * That prevents a broken segment (i.e., with no records in the + * first half of a segment) from being created by XLOG streaming, + * which might cause trouble later on if the segment is e.g + * archived. + */ + if (recptr.xrecoff % XLogSegSize != 0) + recptr.xrecoff -= recptr.xrecoff % XLogSegSize; + + /* It better be stopped before we try to restart it */ + Assert(walrcv->walRcvState == WALRCV_STOPPED); - /* locking is just pro forma here; walreceiver isn't started yet */ SpinLockAcquire(&walrcv->mutex); - walrcv->receivedUpto = recptr; if (conninfo != NULL) strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO); else walrcv->conninfo[0] = '\0'; - walrcv->walRcvState = WALRCV_RUNNING; + walrcv->walRcvState = WALRCV_STARTING; + walrcv->startTime = now; + + walrcv->receivedUpto = recptr; SpinLockRelease(&walrcv->mutex); SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); @@ -260,3 +225,4 @@ GetWalRcvWriteRecPtr(void) return recptr; } + |