diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/Makefile | 17 | ||||
-rw-r--r-- | src/backend/replication/README | 58 | ||||
-rw-r--r-- | src/backend/replication/walreceiver/Makefile | 32 | ||||
-rw-r--r-- | src/backend/replication/walreceiver/walreceiver.c | 796 | ||||
-rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 262 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 851 |
6 files changed, 2016 insertions, 0 deletions
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile new file mode 100644 index 00000000000..7903c1ac5e4 --- /dev/null +++ b/src/backend/replication/Makefile @@ -0,0 +1,17 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for src/backend/replication +# +# IDENTIFICATION +# $PostgreSQL: pgsql/src/backend/replication/Makefile,v 1.1 2010/01/15 09:19:03 heikki Exp $ +# +#------------------------------------------------------------------------- + +subdir = src/backend/replication +top_builddir = ../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = walsender.o walreceiverfuncs.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/README b/src/backend/replication/README new file mode 100644 index 00000000000..0f40dc79e90 --- /dev/null +++ b/src/backend/replication/README @@ -0,0 +1,58 @@ +$PostgreSQL: pgsql/src/backend/replication/README,v 1.1 2010/01/15 09:19:03 heikki Exp $ + +Walreceiver IPC +--------------- + +When the WAL replay in startup process has reached the end of archived WAL, +recoverable using recovery_command, it starts up the walreceiver process +to fetch more WAL (if streaming replication is configured). + +Walreceiver is a postmaster subprocess, so the startup process can't fork it +directly. Instead, it sends a signal to postmaster, asking postmaster to launch +it. Before that, however, startup process fills in WalRcvData->conninfo, +and initializes the starting point in WalRcvData->receivedUpTo. + +As walreceiver receives WAL from the master server, and writes and flushes +it to disk (in pg_xlog), it updates WalRcvData->receivedUpTo. Startup process +polls that to know how far it can proceed with WAL replay. + +Walsender IPC +------------- + +At shutdown, postmaster handles walsender processes differently from regular +backends. It waits for regular backends to die before writing the +shutdown checkpoint and terminating pgarch and other auxiliary processes, but +that's not desirable for walsenders, because we want the standby servers to +receive all the WAL, including the shutdown checkpoint, before the master +is shut down. Therefore postmaster treats walsenders like the pgarch process, +and instructs them to terminate at PM_SHUTDOWN_2 phase, after all regular +backends have died and bgwriter has written the shutdown checkpoint. + +When postmaster accepts a connection, it immediately forks a new process +to handle the handshake and authentication, and the process initializes to +become a backend. Postmaster doesn't know if the process becomes a regular +backend or a walsender process at that time - that's indicated in the +connection handshake - so we need some extra signaling to let postmaster +identify walsender processes. + +When walsender process starts up, it marks itself as a walsender process in +the PMSignal array. That way postmaster can tell it apart from regular +backends. + +Note that no big harm is done if postmaster thinks that a walsender is a +regular backend; it will just terminate the walsender earlier in the shutdown +phase. A walsenders will look like a regular backends until it's done with the +initialization and has marked itself in PMSignal array, and at process +termination, after unmarking the PMSignal slot. + +Each walsender allocates an entry from the WalSndCtl array, and advertises +there how far it has streamed WAL already. This is used at checkpoints, to +avoid recycling WAL that hasn't been streamed to a slave yet. However, +that doesn't stop such WAL from being recycled when the connection is not +established. + + +Walsender - walreceiver protocol +-------------------------------- + +See manual. diff --git a/src/backend/replication/walreceiver/Makefile b/src/backend/replication/walreceiver/Makefile new file mode 100644 index 00000000000..2b26a95b4de --- /dev/null +++ b/src/backend/replication/walreceiver/Makefile @@ -0,0 +1,32 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for src/backend/replication/walreceiver +# +# IDENTIFICATION +# $PostgreSQL: pgsql/src/backend/replication/walreceiver/Makefile,v 1.1 2010/01/15 09:19:03 heikki Exp $ +# +#------------------------------------------------------------------------- + +subdir = src/backend/postmaster/walreceiver +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS) + +OBJS = walreceiver.o +SHLIB_LINK = $(libpq) +NAME = walreceiver + +all: submake-libpq all-shared-lib + +include $(top_srcdir)/src/Makefile.shlib + +install: all installdirs install-lib + +installdirs: installdirs-lib + +uninstall: uninstall-lib + +clean distclean maintainer-clean: clean-lib + rm -f $(OBJS) diff --git a/src/backend/replication/walreceiver/walreceiver.c b/src/backend/replication/walreceiver/walreceiver.c new file mode 100644 index 00000000000..1d0d6404b90 --- /dev/null +++ b/src/backend/replication/walreceiver/walreceiver.c @@ -0,0 +1,796 @@ +/*------------------------------------------------------------------------- + * + * walreceiver.c + * + * The WAL receiver process (walreceiver) is new as of Postgres 8.5. It + * is the process in the standby server that takes charge of receiving + * XLOG records from a primary server during streaming replication. + * + * When the startup process determines that it's time to start streaming, + * it instructs postmaster to start walreceiver. Walreceiver first connects + * connects to the primary server (it will be served by a walsender process + * in the primary server), and then keeps receiving XLOG records and + * writing them to the disk as long as the connection is alive. As XLOG + * records are received and flushed to disk, it updates the + * WalRcv->receivedUpTo variable in shared memory, to inform the startup + * process of how far it can proceed with XLOG replay. + * + * Normal termination is by SIGTERM, which instructs the walreceiver to + * exit(0). Emergency termination is by SIGQUIT; like any postmaster child + * process, the walreceiver will simply abort and exit on SIGQUIT. A close + * of the connection and a FATAL error are treated not as a crash but as + * normal operation. + * + * Walreceiver is a postmaster child process like others, but it's compiled + * as a dynamic module to avoid linking libpq with the main server binary. + * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/replication/walreceiver/walreceiver.c,v 1.1 2010/01/15 09:19:03 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <unistd.h> + +#include "access/xlog_internal.h" +#include "libpq-fe.h" +#include "libpq/pqsignal.h" +#include "miscadmin.h" +#include "replication/walreceiver.h" +#include "storage/ipc.h" +#include "storage/pmsignal.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/ps_status.h" +#include "utils/resowner.h" + +#ifdef HAVE_POLL_H +#include <poll.h> +#endif +#ifdef HAVE_SYS_POLL_H +#include <sys/poll.h> +#endif +#ifdef HAVE_SYS_SELECT_H +#include <sys/select.h> +#endif + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(WalReceiverMain); +Datum WalReceiverMain(PG_FUNCTION_ARGS); + +/* streamConn is a PGconn object of a connection to walsender from walreceiver */ +static PGconn *streamConn = NULL; + +#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ + +/* + * These variables are used similarly to openLogFile/Id/Seg/Off, + * but for walreceiver to write the XLOG. + */ +static int recvFile = -1; +static uint32 recvId = 0; +static uint32 recvSeg = 0; +static uint32 recvOff = 0; + +/* Buffer for currently read records */ +static char *recvBuf = NULL; + +/* Flags set by interrupt handlers of walreceiver for later service in the main loop */ +static volatile sig_atomic_t got_SIGHUP = false; +static volatile sig_atomic_t got_SIGTERM = false; + +static void ProcessWalRcvInterrupts(void); +static void EnableImmediateExit(void); +static void DisableImmediateExit(void); + +/* + * About SIGTERM handling: + * + * We can't just exit(1) within SIGTERM signal handler, because the signal + * might arrive in the middle of some critical operation, like while we're + * holding a spinlock. We also can't just set a flag in signal handler and + * check it in the main loop, because we perform some blocking libpq + * operations like PQexec(), which can take a long time to finish. + * + * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's + * safe for the signal handler to elog(FATAL) immediately. Otherwise it just + * sets got_SIGTERM flag, which is checked in the main loop when convenient. + * + * This is very much like what regular backends do with ImmediateInterruptOK, + * ProcessInterrupts() etc. + */ +static volatile bool WalRcvImmediateInterruptOK = false; + +static void +ProcessWalRcvInterrupts(void) +{ + /* + * Although walreceiver interrupt handling doesn't use the same scheme + * as regular backends, call CHECK_FOR_INTERRUPTS() to make sure we + * receive any incoming signals on Win32. + */ + CHECK_FOR_INTERRUPTS(); + + if (got_SIGTERM) + { + WalRcvImmediateInterruptOK = false; + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating walreceiver process due to administrator command"))); + } +} + +static void +EnableImmediateExit() +{ + WalRcvImmediateInterruptOK = true; + ProcessWalRcvInterrupts(); +} + +static void +DisableImmediateExit() +{ + WalRcvImmediateInterruptOK = false; + ProcessWalRcvInterrupts(); +} + +/* Signal handlers */ +static void WalRcvSigHupHandler(SIGNAL_ARGS); +static void WalRcvShutdownHandler(SIGNAL_ARGS); +static void WalRcvQuickDieHandler(SIGNAL_ARGS); + +/* Prototypes for private functions */ +static void WalRcvLoop(void); +static void InitWalRcv(void); +static void WalRcvConnect(void); +static bool WalRcvWait(int timeout_ms); +static void WalRcvKill(int code, Datum arg); +static void XLogRecv(void); +static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); +static void XLogWalRcvFlush(void); + +/* + * LogstreamResult indicates the byte positions that we have already + * written/fsynced. + */ +static struct +{ + XLogRecPtr Write; /* last byte + 1 written out in the standby */ + XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ +} LogstreamResult; + +/* Main entry point for walreceiver process */ +Datum +WalReceiverMain(PG_FUNCTION_ARGS) +{ + sigjmp_buf local_sigjmp_buf; + MemoryContext walrcv_context; + + /* Mark walreceiver in progress */ + InitWalRcv(); + + /* + * If possible, make this process a group leader, so that the postmaster + * can signal any child processes too. (walreceiver probably never has + * any child processes, but for consistency we make all postmaster child + * processes do this.) + */ +#ifdef HAVE_SETSID + if (setsid() < 0) + elog(FATAL, "setsid() failed: %m"); +#endif + + /* Properly accept or ignore signals the postmaster might send us */ + pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */ + pqsignal(SIGINT, SIG_IGN); + pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */ + pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */ + pqsignal(SIGALRM, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGUSR1, SIG_IGN); + pqsignal(SIGUSR2, SIG_IGN); + + /* Reset some signals that are accepted by postmaster but not here */ + pqsignal(SIGCHLD, SIG_DFL); + pqsignal(SIGTTIN, SIG_DFL); + pqsignal(SIGTTOU, SIG_DFL); + pqsignal(SIGCONT, SIG_DFL); + pqsignal(SIGWINCH, SIG_DFL); + + /* We allow SIGQUIT (quickdie) at all times */ + sigdelset(&BlockSig, SIGQUIT); + + /* + * Create a resource owner to keep track of our resources (not clear that + * we need this, but may as well have one). + */ + CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver"); + + /* + * Create a memory context that we will do all our work in. We do this so + * that we can reset the context during error recovery and thereby avoid + * possible memory leaks. + */ + walrcv_context = AllocSetContextCreate(TopMemoryContext, + "Wal Receiver", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContextSwitchTo(walrcv_context); + + /* + * If an exception is encountered, processing resumes here. + * + * This code is heavily based on bgwriter.c, q.v. + */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + /* Since not using PG_TRY, must reset error stack by hand */ + error_context_stack = NULL; + + /* Reset WalRcvImmediateInterruptOK */ + DisableImmediateExit(); + + /* Prevent interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + /* Report the error to the server log */ + EmitErrorReport(); + + /* Free the data structure related to a connection */ + PQfinish(streamConn); + streamConn = NULL; + if (recvBuf != NULL) + PQfreemem(recvBuf); + recvBuf = NULL; + + /* + * Now return to normal top-level context and clear ErrorContext for + * next time. + */ + MemoryContextSwitchTo(walrcv_context); + FlushErrorState(); + + /* Flush any leaked data in the top-level context */ + MemoryContextResetAndDeleteChildren(walrcv_context); + + /* Now we can allow interrupts again */ + RESUME_INTERRUPTS(); + + /* + * Sleep at least 1 second after any error. A write error is likely + * to be repeated, and we don't want to be filling the error logs as + * fast as we can. + */ + pg_usleep(1000000L); + } + + /* We can now handle ereport(ERROR) */ + PG_exception_stack = &local_sigjmp_buf; + + /* Unblock signals (they were blocked when the postmaster forked us) */ + PG_SETMASK(&UnBlockSig); + + /* Establish the connection to the primary for XLOG streaming */ + WalRcvConnect(); + + /* Main loop of walreceiver */ + WalRcvLoop(); + + PG_RETURN_VOID(); /* WalRcvLoop() never returns, but keep compiler quiet */ +} + +/* Main loop of walreceiver process */ +static void +WalRcvLoop(void) +{ + /* Loop until end-of-streaming or error */ + for (;;) + { + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive(true)) + exit(1); + + /* + * Exit walreceiver if we're not in recovery. This should not happen, + * but cross-check the status here. + */ + if (!RecoveryInProgress()) + ereport(FATAL, + (errmsg("cannot continue XLOG streaming, recovery has already ended"))); + + /* Process any requests or signals received recently */ + ProcessWalRcvInterrupts(); + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* Wait a while for data to arrive */ + if (WalRcvWait(NAPTIME_PER_CYCLE)) + { + /* data has arrived. Process it */ + if (PQconsumeInput(streamConn) == 0) + ereport(ERROR, + (errmsg("could not read xlog records: %s", + PQerrorMessage(streamConn)))); + XLogRecv(); + } + } +} + +/* Advertise our pid in shared memory, so that startup process can kill us. */ +static void +InitWalRcv(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + /* + * WalRcv should be set up already (if we are a backend, we inherit + * this by fork() or EXEC_BACKEND mechanism from the postmaster). + */ + if (walrcv == NULL) + elog(PANIC, "walreceiver control data uninitialized"); + + /* If we've already been requested to stop, don't start up */ + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->pid == 0); + if (walrcv->walRcvState == WALRCV_STOPPED || + walrcv->walRcvState == WALRCV_STOPPING) + { + walrcv->walRcvState = WALRCV_STOPPED; + SpinLockRelease(&walrcv->mutex); + proc_exit(1); + } + walrcv->pid = MyProcPid; + SpinLockRelease(&walrcv->mutex); + + /* Arrange to clean up at walreceiver exit */ + on_shmem_exit(WalRcvKill, 0); +} + +/* + * Establish the connection to the primary server for XLOG streaming + */ +static void +WalRcvConnect(void) +{ + char conninfo[MAXCONNINFO + 14]; + char *primary_sysid; + char standby_sysid[32]; + TimeLineID primary_tli; + TimeLineID standby_tli; + PGresult *res; + XLogRecPtr recptr; + char cmd[64]; + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + /* + * Set up a connection for XLOG streaming + */ + SpinLockAcquire(&walrcv->mutex); + snprintf(conninfo, sizeof(conninfo), "%s replication=true", walrcv->conninfo); + recptr = walrcv->receivedUpto; + SpinLockRelease(&walrcv->mutex); + + /* initialize local XLOG pointers */ + LogstreamResult.Write = LogstreamResult.Flush = recptr; + + Assert(recptr.xlogid != 0 || recptr.xrecoff != 0); + + EnableImmediateExit(); + streamConn = PQconnectdb(conninfo); + DisableImmediateExit(); + if (PQstatus(streamConn) != CONNECTION_OK) + ereport(ERROR, + (errmsg("could not connect to the primary server : %s", + PQerrorMessage(streamConn)))); + + /* + * Get the system identifier and timeline ID as a DataRow message + * from the primary server. + */ + EnableImmediateExit(); + res = PQexec(streamConn, "IDENTIFY_SYSTEM"); + DisableImmediateExit(); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive the SYSID and timeline ID from " + "the primary server: %s", + PQerrorMessage(streamConn)))); + } + if (PQnfields(res) != 2 || PQntuples(res) != 1) + { + int ntuples = PQntuples(res); + int nfields = PQnfields(res); + PQclear(res); + ereport(ERROR, + (errmsg("invalid response from primary server"), + errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields", + ntuples, nfields))); + } + primary_sysid = PQgetvalue(res, 0, 0); + primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); + + /* + * Confirm that the system identifier of the primary is the same + * as ours. + */ + snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, + GetSystemIdentifier()); + if (strcmp(primary_sysid, standby_sysid) != 0) + { + PQclear(res); + ereport(ERROR, + (errmsg("system differs between the primary and standby"), + errdetail("the primary SYSID is %s, standby SYSID is %s", + primary_sysid, standby_sysid))); + } + + /* + * Confirm that the current timeline of the primary is the same + * as the recovery target timeline. + */ + standby_tli = GetRecoveryTargetTLI(); + PQclear(res); + if (primary_tli != standby_tli) + ereport(ERROR, + (errmsg("timeline %u of the primary does not match recovery target timeline %u", + primary_tli, standby_tli))); + ThisTimeLineID = primary_tli; + + /* Start streaming from the point requested by startup process */ + snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", recptr.xlogid, recptr.xrecoff); + EnableImmediateExit(); + res = PQexec(streamConn, cmd); + DisableImmediateExit(); + if (PQresultStatus(res) != PGRES_COPY_OUT) + ereport(ERROR, + (errmsg("could not start XLOG streaming: %s", + PQerrorMessage(streamConn)))); + PQclear(res); + + /* + * Process the outstanding messages before beginning to wait for + * new message to arrive. + */ + XLogRecv(); +} + +/* + * Wait until we can read WAL stream, or timeout. + * + * Returns true if data has become available for reading, false if timed out + * or interrupted by signal. + * + * This is based on pqSocketCheck. + */ +static bool +WalRcvWait(int timeout_ms) +{ + int ret; + + Assert(streamConn != NULL); + if (PQsocket(streamConn) < 0) + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("socket not open"))); + + /* We use poll(2) if available, otherwise select(2) */ + { +#ifdef HAVE_POLL + struct pollfd input_fd; + + input_fd.fd = PQsocket(streamConn); + input_fd.events = POLLIN | POLLERR; + input_fd.revents = 0; + + ret = poll(&input_fd, 1, timeout_ms); +#else /* !HAVE_POLL */ + + fd_set input_mask; + struct timeval timeout; + struct timeval *ptr_timeout; + + FD_ZERO(&input_mask); + FD_SET(PQsocket(streamConn), &input_mask); + + if (timeout_ms < 0) + ptr_timeout = NULL; + else + { + timeout.tv_sec = timeout_ms / 1000; + timeout.tv_usec = (timeout_ms % 1000) * 1000; + ptr_timeout = &timeout; + } + + ret = select(PQsocket(streamConn) + 1, &input_mask, + NULL, NULL, ptr_timeout); +#endif /* HAVE_POLL */ + } + + if (ret == 0 || (ret < 0 && errno == EINTR)) + return false; + if (ret < 0) + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("select() failed: %m"))); + return true; +} + +/* + * Clear our pid from shared memory at exit. + */ +static void +WalRcvKill(int code, Datum arg) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + bool stopped = false; + + SpinLockAcquire(&walrcv->mutex); + if (walrcv->walRcvState == WALRCV_STOPPING || + walrcv->walRcvState == WALRCV_STOPPED) + { + walrcv->walRcvState = WALRCV_STOPPED; + stopped = true; + elog(LOG, "walreceiver stopped"); + } + walrcv->pid = 0; + SpinLockRelease(&walrcv->mutex); + + PQfinish(streamConn); + + /* If requested to stop, tell postmaster to not restart us. */ + if (stopped) + SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER); +} + +/* SIGHUP: set flag to re-read config file at next convenient time */ +static void +WalRcvSigHupHandler(SIGNAL_ARGS) +{ + got_SIGHUP = true; +} + +/* SIGTERM: set flag for main loop, or shutdown immediately if safe */ +static void +WalRcvShutdownHandler(SIGNAL_ARGS) +{ + got_SIGTERM = true; + + /* Don't joggle the elbow of proc_exit */ + if (!proc_exit_inprogress && WalRcvImmediateInterruptOK) + ProcessWalRcvInterrupts(); +} + +/* + * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster. + * + * Some backend has bought the farm, so we need to stop what we're doing and + * exit. + */ +static void +WalRcvQuickDieHandler(SIGNAL_ARGS) +{ + PG_SETMASK(&BlockSig); + + /* + * We DO NOT want to run proc_exit() callbacks -- we're here because + * shared memory may be corrupted, so we don't want to try to clean up our + * transaction. Just nail the windows shut and get out of town. Now that + * there's an atexit callback to prevent third-party code from breaking + * things by calling exit() directly, we have to reset the callbacks + * explicitly to make this work as intended. + */ + on_exit_reset(); + + /* + * Note we do exit(2) not exit(0). This is to force the postmaster into a + * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random + * backend. This is necessary precisely because we don't clean up our + * shared memory state. (The "dead man switch" mechanism in pmsignal.c + * should ensure the postmaster sees this as a crash, too, but no harm + * in being doubly sure.) + */ + exit(2); +} + +/* + * Receive any WAL records available without blocking from XLOG stream and + * write it to the disk. + */ +static void +XLogRecv(void) +{ + XLogRecPtr *recptr; + int len; + + for (;;) + { + /* Receive CopyData message */ + len = PQgetCopyData(streamConn, &recvBuf, 1); + if (len == 0) /* no records available yet, then return */ + break; + if (len == -1) /* end-of-streaming or error */ + { + PGresult *res; + + res = PQgetResult(streamConn); + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("replication terminated by primary server"))); + } + PQclear(res); + ereport(ERROR, + (errmsg("could not read xlog records: %s", + PQerrorMessage(streamConn)))); + } + if (len < -1) + ereport(ERROR, + (errmsg("could not read xlog records: %s", + PQerrorMessage(streamConn)))); + + if (len < sizeof(XLogRecPtr)) + ereport(ERROR, + (errmsg("invalid WAL message received from primary"))); + + /* Write received WAL records to disk */ + recptr = (XLogRecPtr *) recvBuf; + XLogWalRcvWrite(recvBuf + sizeof(XLogRecPtr), + len - sizeof(XLogRecPtr), *recptr); + + if (recvBuf != NULL) + PQfreemem(recvBuf); + recvBuf = NULL; + } + + /* + * Now that we've written some records, flush them to disk and let the + * startup process know about them. + */ + XLogWalRcvFlush(); +} + +/* + * Write XLOG data to disk. + */ +static void +XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) +{ + int startoff; + int byteswritten; + + while (nbytes > 0) + { + int segbytes; + + if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg)) + { + bool use_existent; + + /* + * XLOG segment files will be re-read in recovery operation soon, + * so we don't need to advise the OS to release any cache page. + */ + if (recvFile >= 0) + { + /* + * fsync() before we switch to next file. We would otherwise + * have to reopen this file to fsync it later + */ + XLogWalRcvFlush(); + if (close(recvFile) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close log file %u, segment %u: %m", + recvId, recvSeg))); + } + recvFile = -1; + + /* Create/use new log file */ + XLByteToSeg(recptr, recvId, recvSeg); + use_existent = true; + recvFile = XLogFileInit(recvId, recvSeg, + &use_existent, true); + recvOff = 0; + } + + /* Calculate the start offset of the received logs */ + startoff = recptr.xrecoff % XLogSegSize; + + if (startoff + nbytes > XLogSegSize) + segbytes = XLogSegSize - startoff; + else + segbytes = nbytes; + + /* Need to seek in the file? */ + if (recvOff != startoff) + { + if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not seek in log file %u, " + "segment %u to offset %u: %m", + recvId, recvSeg, startoff))); + recvOff = startoff; + } + + /* OK to write the logs */ + errno = 0; + + byteswritten = write(recvFile, buf, segbytes); + if (byteswritten <= 0) + { + /* if write didn't set errno, assume no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to log file %u, segment %u " + "at offset %u, length %lu: %m", + recvId, recvSeg, + recvOff, (unsigned long) segbytes))); + } + + /* Update state for write */ + XLByteAdvance(recptr, byteswritten); + + recvOff += byteswritten; + nbytes -= byteswritten; + buf += byteswritten; + + LogstreamResult.Write = recptr; + + /* + * XXX: Should we signal bgwriter to start a restartpoint + * if we've consumed too much xlog since the last one, like + * in normal processing? But this is not worth doing unless + * a restartpoint can be created independently from a + * checkpoint record. + */ + } +} + +/* Flush the log to disk */ +static void +XLogWalRcvFlush(void) +{ + if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write)) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + char activitymsg[50]; + + issue_xlog_fsync(recvFile, recvId, recvSeg); + + LogstreamResult.Flush = LogstreamResult.Write; + + /* Update shared-memory status */ + SpinLockAcquire(&walrcv->mutex); + walrcv->receivedUpto = LogstreamResult.Flush; + SpinLockRelease(&walrcv->mutex); + + /* Report XLOG streaming progress in PS display */ + snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", + LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff); + set_ps_display(activitymsg, false); + } +} diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c new file mode 100644 index 00000000000..4342e252d65 --- /dev/null +++ b/src/backend/replication/walreceiverfuncs.c @@ -0,0 +1,262 @@ +/*------------------------------------------------------------------------- + * + * walreceiverfuncs.c + * + * This file contains functions used by the startup process to communicate + * with the walreceiver process. Functions implementing walreceiver itself + * are in src/backend/replication/walreceiver subdirectory. + * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.1 2010/01/15 09:19:03 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <signal.h> + +#include "access/xlog_internal.h" +#include "replication/walreceiver.h" +#include "storage/fd.h" +#include "storage/pmsignal.h" +#include "storage/shmem.h" +#include "utils/guc.h" + +WalRcvData *WalRcv = NULL; + +static bool CheckForStandbyTrigger(void); +static void ShutdownWalRcv(void); + +/* Report shared memory space needed by WalRcvShmemInit */ +Size +WalRcvShmemSize(void) +{ + Size size = 0; + + size = add_size(size, sizeof(WalRcvData)); + + return size; +} + +/* Allocate and initialize walreceiver-related shared memory */ +void +WalRcvShmemInit(void) +{ + bool found; + + WalRcv = (WalRcvData *) + ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found); + + if (WalRcv == NULL) + ereport(FATAL, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("not enough shared memory for walreceiver"))); + if (found) + return; /* already initialized */ + + /* Initialize the data structures */ + MemSet(WalRcv, 0, WalRcvShmemSize()); + WalRcv->walRcvState = WALRCV_NOT_STARTED; + SpinLockInit(&WalRcv->mutex); +} + +/* Is walreceiver in progress (or starting up)? */ +bool +WalRcvInProgress(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + WalRcvState state; + + SpinLockAcquire(&walrcv->mutex); + state = walrcv->walRcvState; + SpinLockRelease(&walrcv->mutex); + + if (state == WALRCV_RUNNING || state == WALRCV_STOPPING) + return true; + else + return false; +} + +/* + * Wait for the XLOG record at given position to become available. + * + * 'recptr' indicates the byte position which caller wants to read the + * XLOG record up to. The byte position actually written and flushed + * by walreceiver is returned. It can be higher than the requested + * location, and the caller can safely read up to that point without + * calling WaitNextXLogAvailable() again. + * + * If WAL streaming is ended (because a trigger file is found), *finished + * is set to true and function returns immediately. The returned position + * can be lower than requested in that case. + * + * Called by the startup process during streaming recovery. + */ +XLogRecPtr +WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished) +{ + static XLogRecPtr receivedUpto = {0, 0}; + + *finished = false; + + /* Quick exit if already known available */ + if (XLByteLT(recptr, receivedUpto)) + return receivedUpto; + + for (;;) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + /* Update local status */ + SpinLockAcquire(&walrcv->mutex); + receivedUpto = walrcv->receivedUpto; + SpinLockRelease(&walrcv->mutex); + + /* If available already, leave here */ + if (XLByteLT(recptr, receivedUpto)) + return receivedUpto; + + /* Check to see if the trigger file exists */ + if (CheckForStandbyTrigger()) + { + *finished = true; + return receivedUpto; + } + + pg_usleep(100000L); /* 100ms */ + + /* + * This possibly-long loop needs to handle interrupts of startup + * process. + */ + HandleStartupProcInterrupts(); + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive(true)) + exit(1); + } +} + +/* + * Stop walreceiver and wait for it to die. + */ +static void +ShutdownWalRcv(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + pid_t walrcvpid; + + /* + * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED + * mode once it's finished, and will also request postmaster to not + * restart itself. + */ + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->walRcvState == WALRCV_RUNNING); + walrcv->walRcvState = WALRCV_STOPPING; + walrcvpid = walrcv->pid; + SpinLockRelease(&walrcv->mutex); + + /* + * Pid can be 0, if no walreceiver process is active right now. + * Postmaster should restart it, and when it does, it will see the + * STOPPING state. + */ + if (walrcvpid != 0) + kill(walrcvpid, SIGTERM); + + /* + * Wait for walreceiver to acknowledge its death by setting state to + * WALRCV_STOPPED. + */ + while (WalRcvInProgress()) + { + /* + * This possibly-long loop needs to handle interrupts of startup + * process. + */ + HandleStartupProcInterrupts(); + + pg_usleep(100000); /* 100ms */ + } +} + +/* + * Check to see if the trigger file exists. If it does, request postmaster + * to shut down walreceiver and wait for it to exit, and remove the trigger + * file. + */ +static bool +CheckForStandbyTrigger(void) +{ + struct stat stat_buf; + + if (TriggerFile == NULL) + return false; + + if (stat(TriggerFile, &stat_buf) == 0) + { + ereport(LOG, + (errmsg("trigger file found: %s", TriggerFile))); + ShutdownWalRcv(); + unlink(TriggerFile); + return true; + } + return false; +} + +/* + * Request postmaster to start walreceiver. + * + * recptr indicates the position where streaming should begin, and conninfo + * is a libpq connection string to use. + */ +void +RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + Assert(walrcv->walRcvState == WALRCV_NOT_STARTED); + + /* locking is just pro forma here; walreceiver isn't started yet */ + SpinLockAcquire(&walrcv->mutex); + walrcv->receivedUpto = recptr; + if (conninfo != NULL) + strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO); + else + walrcv->conninfo[0] = '\0'; + walrcv->walRcvState = WALRCV_RUNNING; + SpinLockRelease(&walrcv->mutex); + + SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); +} + +/* + * Returns the byte position that walreceiver has written + */ +XLogRecPtr +GetWalRcvWriteRecPtr(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + XLogRecPtr recptr; + + SpinLockAcquire(&walrcv->mutex); + recptr = walrcv->receivedUpto; + SpinLockRelease(&walrcv->mutex); + + return recptr; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c new file mode 100644 index 00000000000..8a3352e0755 --- /dev/null +++ b/src/backend/replication/walsender.c @@ -0,0 +1,851 @@ +/*------------------------------------------------------------------------- + * + * walsender.c + * + * The WAL sender process (walsender) is new as of Postgres 8.5. It takes + * charge of XLOG streaming sender in the primary server. At first, it is + * started by the postmaster when the walreceiver in the standby server + * connects to the primary server and requests XLOG streaming replication, + * i.e., unlike any auxiliary process, it is not an always-running process. + * It attempts to keep reading XLOG records from the disk and sending them + * to the standby server, as long as the connection is alive (i.e., like + * any backend, there is an one to one relationship between a connection + * and a walsender process). + * + * Normal termination is by SIGTERM, which instructs the walsender to + * close the connection and exit(0) at next convenient moment. Emergency + * termination is by SIGQUIT; like any backend, the walsender will simply + * abort and exit on SIGQUIT. A close of the connection and a FATAL error + * are treated as not a crash but approximately normal termination; + * the walsender will exit quickly without sending any more XLOG records. + * + * If the server is shut down, postmaster sends us SIGUSR2 after all + * regular backends have exited and the shutdown checkpoint has been written. + * This instruct walsender to send any outstanding WAL, including the + * shutdown checkpoint record, and then exit. + * + * Note that there can be more than one walsender process concurrently. + * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.1 2010/01/15 09:19:03 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <unistd.h> + +#include "access/xlog_internal.h" +#include "catalog/pg_type.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "libpq/pqsignal.h" +#include "miscadmin.h" +#include "replication/walsender.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/lock.h" +#include "storage/pmsignal.h" +#include "tcop/tcopprot.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/ps_status.h" + +/* Array of WalSnds in shared memory */ +WalSndCtlData *WalSndCtl = NULL; + +/* My slot in the shared memory array */ +static WalSnd *MyWalSnd = NULL; + +/* Global state */ +bool am_walsender = false; /* Am I a walsender process ? */ + +/* User-settable parameters for walsender */ +int MaxWalSenders = 0; /* the maximum number of concurrent walsenders */ +int WalSndDelay = 200; /* max sleep time between some actions */ + +#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ + +/* + * These variables are used similarly to openLogFile/Id/Seg/Off, + * but for walsender to read the XLOG. + */ +static int sendFile = -1; +static uint32 sendId = 0; +static uint32 sendSeg = 0; +static uint32 sendOff = 0; + +/* + * How far have we sent WAL already? This is also advertised in + * MyWalSnd->sentPtr. + */ +static XLogRecPtr sentPtr = {0, 0}; + +/* Flags set by signal handlers for later service in main loop */ +static volatile sig_atomic_t got_SIGHUP = false; +static volatile sig_atomic_t shutdown_requested = false; +static volatile sig_atomic_t ready_to_stop = false; + +/* Signal handlers */ +static void WalSndSigHupHandler(SIGNAL_ARGS); +static void WalSndShutdownHandler(SIGNAL_ARGS); +static void WalSndQuickDieHandler(SIGNAL_ARGS); + +/* Prototypes for private functions */ +static int WalSndLoop(void); +static void InitWalSnd(void); +static void WalSndHandshake(void); +static void WalSndKill(int code, Datum arg); +static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); +static bool XLogSend(StringInfo outMsg); +static void CheckClosedConnection(void); + +/* + * How much WAL to send in one message? Must be >= XLOG_BLCKSZ. + */ +#define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2) + +/* Main entry point for walsender process */ +int +WalSenderMain(void) +{ + MemoryContext walsnd_context; + + if (!superuser()) + ereport(FATAL, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to start walsender"))); + + /* Create a per-walsender data structure in shared memory */ + InitWalSnd(); + + /* + * Create a memory context that we will do all our work in. We do this so + * that we can reset the context during error recovery and thereby avoid + * possible memory leaks. Formerly this code just ran in + * TopMemoryContext, but resetting that would be a really bad idea. + * + * XXX: we don't actually attempt error recovery in walsender, we just + * close the connection and exit. + */ + walsnd_context = AllocSetContextCreate(TopMemoryContext, + "Wal Sender", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContextSwitchTo(walsnd_context); + + /* Unblock signals (they were blocked when the postmaster forked us) */ + PG_SETMASK(&UnBlockSig); + + /* Tell the standby that walsender is ready for receiving commands */ + ReadyForQuery(DestRemote); + + /* Handle handshake messages before streaming */ + WalSndHandshake(); + + /* Main loop of walsender */ + return WalSndLoop(); +} + +static void +WalSndHandshake(void) +{ + StringInfoData input_message; + bool replication_started = false; + + initStringInfo(&input_message); + + while (!replication_started) + { + int firstchar; + + /* Wait for a command to arrive */ + firstchar = pq_getbyte(); + + /* + * Check for any other interesting events that happened while we + * slept. + */ + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + + if (firstchar == EOF) + { + /* standby disconnected */ + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected EOF on standby connection"))); + } + else + { + /* + * Read the message contents. This is expected to be done without + * blocking because we've been able to get message type code. + */ + if (pq_getmessage(&input_message, 0)) + firstchar = EOF; /* suitable message already logged */ + } + + + /* Handle the very limited subset of commands expected in this phase */ + + switch (firstchar) + { + case 'Q': /* Query message */ + { + const char *query_string; + XLogRecPtr recptr; + + query_string = pq_getmsgstring(&input_message); + pq_getmsgend(&input_message); + + if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0) + { + StringInfoData buf; + char sysid[32]; + char tli[11]; + + /* + * Reply with a result set with one row, two columns. + * First col is system ID, and second if timeline ID + */ + + snprintf(sysid, sizeof(sysid), UINT64_FORMAT, + GetSystemIdentifier()); + snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); + + /* Send a RowDescription message */ + pq_beginmessage(&buf, 'T'); + pq_sendint(&buf, 2, 2); /* 2 fields */ + + /* first field */ + pq_sendstring(&buf, "systemid"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + + /* second field */ + pq_sendstring(&buf, "timeline"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, INT4OID, 4); /* type oid */ + pq_sendint(&buf, 4, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + pq_endmessage(&buf); + + /* Send a DataRow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 2, 2); /* # of columns */ + pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ + pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); + pq_sendint(&buf, strlen(tli), 4); /* col2 len */ + pq_sendbytes(&buf, (char *) tli, strlen(tli)); + pq_endmessage(&buf); + + /* Send CommandComplete and ReadyForQuery messages */ + EndCommand("SELECT", DestRemote); + ReadyForQuery(DestRemote); + } + else if (sscanf(query_string, "START_REPLICATION %X/%X", + &recptr.xlogid, &recptr.xrecoff) == 2) + { + StringInfoData buf; + + /* Send a CopyOutResponse message, and start streaming */ + pq_beginmessage(&buf, 'H'); + pq_sendbyte(&buf, 0); + pq_sendint(&buf, 0, 2); + pq_endmessage(&buf); + + /* + * Initialize position to the received one, then + * the xlog records begin to be shipped from that position + */ + sentPtr = recptr; + + /* break out of the loop */ + replication_started = true; + } + else + { + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid standby query string: %s", query_string))); + } + break; + } + + /* 'X' means that the standby is closing the connection */ + case 'X': + proc_exit(0); + + case EOF: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected EOF on standby connection"))); + + default: + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid standby handshake message type %d", firstchar))); + } + } +} + +/* + * Check if the remote end has closed the connection. + */ +static void +CheckClosedConnection(void) +{ + unsigned char firstchar; + int r; + + r = pq_getbyte_if_available(&firstchar); + if (r < 0) + { + /* no data available */ + if (errno == EAGAIN || errno == EWOULDBLOCK) + return; + + /* + * Ok if interrupted, though it shouldn't really happen with + * a non-blocking operation. + */ + if (errno == EINTR) + return; + + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("could not receive data from client: %m"))); + } + if (r == 0) + { + /* standby disconnected unexpectedly */ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected EOF on standby connection"))); + } + + /* Handle the very limited subset of commands expected in this phase */ + switch (firstchar) + { + /* + * 'X' means that the standby is closing down the socket. EOF means + * unexpected loss of standby connection. Either way, perform normal + * shutdown. + */ + case 'X': + proc_exit(0); + + default: + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid standby closing message type %d", + firstchar))); + } +} + +/* Main loop of walsender process */ +static int +WalSndLoop(void) +{ + StringInfoData output_message; + + initStringInfo(&output_message); + + /* Loop forever */ + for (;;) + { + int remain; /* remaining time (ms) */ + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive(true)) + exit(1); + /* Process any requests or signals received recently */ + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* + * When SIGUSR2 arrives, we send all outstanding logs up to the + * shutdown checkpoint record (i.e., the latest record) and exit. + */ + if (ready_to_stop) + { + XLogSend(&output_message); + shutdown_requested = true; + } + + /* Normal exit from the walsender is here */ + if (shutdown_requested) + { + /* Inform the standby that XLOG streaming was done */ + pq_puttextmessage('C', "COPY 0"); + pq_flush(); + + proc_exit(0); + } + + /* + * Nap for the configured time or until a message arrives. + * + * On some platforms, signals won't interrupt the sleep. To ensure we + * respond reasonably promptly when someone signals us, break down the + * sleep into NAPTIME_PER_CYCLE (ms) increments, and check for + * interrupts after each nap. + */ + remain = WalSndDelay; + while (remain > 0) + { + if (got_SIGHUP || shutdown_requested || ready_to_stop) + break; + + /* + * Check to see whether a message from the standby or an interrupt + * from other processes has arrived. + */ + pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain); + CheckClosedConnection(); + + remain -= NAPTIME_PER_CYCLE; + } + + /* Attempt to send the log once every loop */ + if (!XLogSend(&output_message)) + goto eof; + } + + /* can't get here because the above loop never exits */ + return 1; + +eof: + /* + * Reset whereToSendOutput to prevent ereport from attempting + * to send any more messages to the standby. + */ + if (whereToSendOutput == DestRemote) + whereToSendOutput = DestNone; + + proc_exit(0); + return 1; /* keep the compiler quiet */ +} + +/* Initialize a per-walsender data structure for this walsender process */ +static void +InitWalSnd(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalSndCtlData *walsndctl = WalSndCtl; + int i; + + /* + * WalSndCtl should be set up already (we inherit this by fork() or + * EXEC_BACKEND mechanism from the postmaster). + */ + Assert(walsndctl != NULL); + Assert(MyWalSnd == NULL); + + /* + * Find a free walsender slot and reserve it. If this fails, we must be + * out of WalSnd structures. + */ + for (i = 0; i < MaxWalSenders; i++) + { + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + SpinLockAcquire(&walsnd->mutex); + + if (walsnd->pid != 0) + { + SpinLockRelease(&walsnd->mutex); + continue; + } + else + { + /* found */ + MyWalSnd = (WalSnd *) walsnd; + walsnd->pid = MyProcPid; + MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr)); + SpinLockRelease(&walsnd->mutex); + break; + } + } + if (MyWalSnd == NULL) + ereport(FATAL, + (errcode(ERRCODE_TOO_MANY_CONNECTIONS), + errmsg("sorry, too many standbys already"))); + + /* Arrange to clean up at walsender exit */ + on_shmem_exit(WalSndKill, 0); +} + +/* Destroy the per-walsender data structure for this walsender process */ +static void +WalSndKill(int code, Datum arg) +{ + Assert(MyWalSnd != NULL); + + /* + * Mark WalSnd struct no longer in use. Assume that no lock is required + * for this. + */ + MyWalSnd->pid = 0; + + /* WalSnd struct isn't mine anymore */ + MyWalSnd = NULL; +} + +/* + * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr' + */ +void +XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) +{ + char path[MAXPGPATH]; + uint32 startoff; + + while (nbytes > 0) + { + int segbytes; + int readbytes; + + startoff = recptr.xrecoff % XLogSegSize; + + if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg)) + { + /* Switch to another logfile segment */ + if (sendFile >= 0) + close(sendFile); + + XLByteToSeg(recptr, sendId, sendSeg); + XLogFilePath(path, ThisTimeLineID, sendId, sendSeg); + + sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); + if (sendFile < 0) + ereport(FATAL, /* XXX: Why FATAL? */ + (errcode_for_file_access(), + errmsg("could not open file \"%s\" (log file %u, segment %u): %m", + path, sendId, sendSeg))); + sendOff = 0; + } + + /* Need to seek in the file? */ + if (sendOff != startoff) + { + if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not seek in log file %u, segment %u to offset %u: %m", + sendId, sendSeg, startoff))); + sendOff = startoff; + } + + /* How many bytes are within this segment? */ + if (nbytes > (XLogSegSize - startoff)) + segbytes = XLogSegSize - startoff; + else + segbytes = nbytes; + + readbytes = read(sendFile, buf, segbytes); + if (readbytes <= 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not read from log file %u, segment %u, offset %u, " + "length %lu: %m", + sendId, sendSeg, sendOff, (unsigned long) segbytes))); + + /* Update state for read */ + XLByteAdvance(recptr, readbytes); + + sendOff += readbytes; + nbytes -= readbytes; + buf += readbytes; + } +} + +/* + * Read all WAL that's been written (and flushed) since last cycle, and send + * it to client. + * + * Returns true if OK, false if trouble. + */ +static bool +XLogSend(StringInfo outMsg) +{ + XLogRecPtr SendRqstPtr; + char activitymsg[50]; + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + /* + * Invalid position means that we have not yet received the initial + * CopyData message from the slave that indicates where to start the + * streaming. + */ + if (sentPtr.xlogid == 0 && + sentPtr.xrecoff == 0) + return true; + + /* Attempt to send all the records which were written to the disk */ + SendRqstPtr = GetWriteRecPtr(); + + /* Quick exit if nothing to do */ + if (!XLByteLT(sentPtr, SendRqstPtr)) + return true; + + /* + * Since successive pages in a segment are consecutively written, + * we can gather multiple records together by issuing just one + * read() call, and send them as one CopyData message at one time; + * nmsgs is the number of CopyData messages sent in this XLogSend; + * npages is the number of pages we have determined can be read and + * sent together; startpos is the starting position of reading and + * sending in the first page, startoff is the file offset at which + * it should go and endpos is the end position of reading and + * sending in the last page. We must initialize all of them to + * keep the compiler quiet. + */ + + while (XLByteLT(sentPtr, SendRqstPtr)) + { + XLogRecPtr startptr; + XLogRecPtr endptr; + Size nbytes; + + /* + * Figure out how much to send in one message. If there's less than + * MAX_SEND_SIZE bytes to send, send everything. Otherwise send + * MAX_SEND_SIZE bytes, but round to page boundary for efficiency. + */ + startptr = sentPtr; + endptr = startptr; + XLByteAdvance(endptr, MAX_SEND_SIZE); + + /* + * Round down to page boundary. This is not only for performance + * reasons, walreceiver relies on the fact that we never split a WAL + * record across two messages. Since a long WAL record is split at + * page boundary into continuation records, page boundary is always + * safe cut-off point. We also assume that SendRqstPtr never points + * in the middle of a WAL record. + */ + endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ); + + if (XLByteLT(SendRqstPtr, endptr)) + endptr = SendRqstPtr; + + /* + * OK to read and send the log. + * + * We don't need to convert the xlogid/xrecoff from host byte order + * to network byte order because the both server can be expected to + * have the same byte order. If they have the different order, we + * don't reach here. + */ + pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr)); + + if (endptr.xlogid != startptr.xlogid) + { + Assert(endptr.xlogid == startptr.xlogid + 1); + nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff; + } + else + nbytes = endptr.xrecoff - startptr.xrecoff; + + sentPtr = endptr; + + /* + * Read the log into the output buffer directly to prevent + * extra memcpy calls. + */ + enlargeStringInfo(outMsg, nbytes); + + XLogRead(&outMsg->data[outMsg->len], startptr, nbytes); + outMsg->len += nbytes; + outMsg->data[outMsg->len] = '\0'; + + pq_putmessage('d', outMsg->data, outMsg->len); + resetStringInfo(outMsg); + } + + /* Update shared memory status */ + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + + /* Flush pending output */ + if (pq_flush()) + return false; + + /* Report progress of XLOG streaming in PS display */ + snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", + sentPtr.xlogid, sentPtr.xrecoff); + set_ps_display(activitymsg, false); + + return true; +} + +/* SIGHUP: set flag to re-read config file at next convenient time */ +static void +WalSndSigHupHandler(SIGNAL_ARGS) +{ + got_SIGHUP = true; +} + +/* SIGTERM: set flag to shut down */ +static void +WalSndShutdownHandler(SIGNAL_ARGS) +{ + shutdown_requested = true; +} + +/* + * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster. + * + * Some backend has bought the farm, + * so we need to stop what we're doing and exit. + */ +static void +WalSndQuickDieHandler(SIGNAL_ARGS) +{ + PG_SETMASK(&BlockSig); + + /* + * We DO NOT want to run proc_exit() callbacks -- we're here because + * shared memory may be corrupted, so we don't want to try to clean up our + * transaction. Just nail the windows shut and get out of town. Now that + * there's an atexit callback to prevent third-party code from breaking + * things by calling exit() directly, we have to reset the callbacks + * explicitly to make this work as intended. + */ + on_exit_reset(); + + /* + * Note we do exit(2) not exit(0). This is to force the postmaster into a + * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random + * backend. This is necessary precisely because we don't clean up our + * shared memory state. (The "dead man switch" mechanism in pmsignal.c + * should ensure the postmaster sees this as a crash, too, but no harm + * in being doubly sure.) + */ + exit(2); +} + +/* SIGUSR2: set flag to do a last cycle and shut down afterwards */ +static void +WalSndLastCycleHandler(SIGNAL_ARGS) +{ + ready_to_stop = true; +} + +/* Set up signal handlers */ +void +WalSndSignals(void) +{ + /* Set up signal handlers */ + pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config file */ + pqsignal(SIGINT, SIG_IGN); /* not used */ + pqsignal(SIGTERM, WalSndShutdownHandler); /* request shutdown */ + pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */ + pqsignal(SIGALRM, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGUSR1, SIG_IGN); /* not used */ + pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and shutdown */ + + /* Reset some signals that are accepted by postmaster but not here */ + pqsignal(SIGCHLD, SIG_DFL); + pqsignal(SIGTTIN, SIG_DFL); + pqsignal(SIGTTOU, SIG_DFL); + pqsignal(SIGCONT, SIG_DFL); + pqsignal(SIGWINCH, SIG_DFL); +} + +/* Report shared-memory space needed by WalSndShmemInit */ +Size +WalSndShmemSize(void) +{ + Size size = 0; + + size = offsetof(WalSndCtlData, walsnds); + size = add_size(size, mul_size(MaxWalSenders, sizeof(WalSnd))); + + return size; +} + +/* Allocate and initialize walsender-related shared memory */ +void +WalSndShmemInit(void) +{ + bool found; + int i; + + WalSndCtl = (WalSndCtlData *) + ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found); + + if (WalSndCtl == NULL) + ereport(FATAL, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("not enough shared memory for walsender"))); + if (found) + return; /* already initialized */ + + /* Initialize the data structures */ + MemSet(WalSndCtl, 0, WalSndShmemSize()); + + for (i = 0; i < MaxWalSenders; i++) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + SpinLockInit(&walsnd->mutex); + } +} + +/* + * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr + * if none. + */ +XLogRecPtr +GetOldestWALSendPointer(void) +{ + XLogRecPtr oldest = {0, 0}; + int i; + bool found = false; + + for (i = 0; i < MaxWalSenders; i++) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + XLogRecPtr recptr; + + if (walsnd->pid == 0) + continue; + + SpinLockAcquire(&walsnd->mutex); + recptr = walsnd->sentPtr; + SpinLockRelease(&walsnd->mutex); + + if (recptr.xlogid == 0 && recptr.xrecoff == 0) + continue; + + if (!found || XLByteLT(recptr, oldest)) + oldest = recptr; + found = true; + } + return oldest; +} |