aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/Makefile17
-rw-r--r--src/backend/replication/README58
-rw-r--r--src/backend/replication/walreceiver/Makefile32
-rw-r--r--src/backend/replication/walreceiver/walreceiver.c796
-rw-r--r--src/backend/replication/walreceiverfuncs.c262
-rw-r--r--src/backend/replication/walsender.c851
6 files changed, 2016 insertions, 0 deletions
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
new file mode 100644
index 00000000000..7903c1ac5e4
--- /dev/null
+++ b/src/backend/replication/Makefile
@@ -0,0 +1,17 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for src/backend/replication
+#
+# IDENTIFICATION
+# $PostgreSQL: pgsql/src/backend/replication/Makefile,v 1.1 2010/01/15 09:19:03 heikki Exp $
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/replication
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = walsender.o walreceiverfuncs.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/README b/src/backend/replication/README
new file mode 100644
index 00000000000..0f40dc79e90
--- /dev/null
+++ b/src/backend/replication/README
@@ -0,0 +1,58 @@
+$PostgreSQL: pgsql/src/backend/replication/README,v 1.1 2010/01/15 09:19:03 heikki Exp $
+
+Walreceiver IPC
+---------------
+
+When the WAL replay in startup process has reached the end of archived WAL,
+recoverable using recovery_command, it starts up the walreceiver process
+to fetch more WAL (if streaming replication is configured).
+
+Walreceiver is a postmaster subprocess, so the startup process can't fork it
+directly. Instead, it sends a signal to postmaster, asking postmaster to launch
+it. Before that, however, startup process fills in WalRcvData->conninfo,
+and initializes the starting point in WalRcvData->receivedUpTo.
+
+As walreceiver receives WAL from the master server, and writes and flushes
+it to disk (in pg_xlog), it updates WalRcvData->receivedUpTo. Startup process
+polls that to know how far it can proceed with WAL replay.
+
+Walsender IPC
+-------------
+
+At shutdown, postmaster handles walsender processes differently from regular
+backends. It waits for regular backends to die before writing the
+shutdown checkpoint and terminating pgarch and other auxiliary processes, but
+that's not desirable for walsenders, because we want the standby servers to
+receive all the WAL, including the shutdown checkpoint, before the master
+is shut down. Therefore postmaster treats walsenders like the pgarch process,
+and instructs them to terminate at PM_SHUTDOWN_2 phase, after all regular
+backends have died and bgwriter has written the shutdown checkpoint.
+
+When postmaster accepts a connection, it immediately forks a new process
+to handle the handshake and authentication, and the process initializes to
+become a backend. Postmaster doesn't know if the process becomes a regular
+backend or a walsender process at that time - that's indicated in the
+connection handshake - so we need some extra signaling to let postmaster
+identify walsender processes.
+
+When walsender process starts up, it marks itself as a walsender process in
+the PMSignal array. That way postmaster can tell it apart from regular
+backends.
+
+Note that no big harm is done if postmaster thinks that a walsender is a
+regular backend; it will just terminate the walsender earlier in the shutdown
+phase. A walsenders will look like a regular backends until it's done with the
+initialization and has marked itself in PMSignal array, and at process
+termination, after unmarking the PMSignal slot.
+
+Each walsender allocates an entry from the WalSndCtl array, and advertises
+there how far it has streamed WAL already. This is used at checkpoints, to
+avoid recycling WAL that hasn't been streamed to a slave yet. However,
+that doesn't stop such WAL from being recycled when the connection is not
+established.
+
+
+Walsender - walreceiver protocol
+--------------------------------
+
+See manual.
diff --git a/src/backend/replication/walreceiver/Makefile b/src/backend/replication/walreceiver/Makefile
new file mode 100644
index 00000000000..2b26a95b4de
--- /dev/null
+++ b/src/backend/replication/walreceiver/Makefile
@@ -0,0 +1,32 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for src/backend/replication/walreceiver
+#
+# IDENTIFICATION
+# $PostgreSQL: pgsql/src/backend/replication/walreceiver/Makefile,v 1.1 2010/01/15 09:19:03 heikki Exp $
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/postmaster/walreceiver
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS = walreceiver.o
+SHLIB_LINK = $(libpq)
+NAME = walreceiver
+
+all: submake-libpq all-shared-lib
+
+include $(top_srcdir)/src/Makefile.shlib
+
+install: all installdirs install-lib
+
+installdirs: installdirs-lib
+
+uninstall: uninstall-lib
+
+clean distclean maintainer-clean: clean-lib
+ rm -f $(OBJS)
diff --git a/src/backend/replication/walreceiver/walreceiver.c b/src/backend/replication/walreceiver/walreceiver.c
new file mode 100644
index 00000000000..1d0d6404b90
--- /dev/null
+++ b/src/backend/replication/walreceiver/walreceiver.c
@@ -0,0 +1,796 @@
+/*-------------------------------------------------------------------------
+ *
+ * walreceiver.c
+ *
+ * The WAL receiver process (walreceiver) is new as of Postgres 8.5. It
+ * is the process in the standby server that takes charge of receiving
+ * XLOG records from a primary server during streaming replication.
+ *
+ * When the startup process determines that it's time to start streaming,
+ * it instructs postmaster to start walreceiver. Walreceiver first connects
+ * connects to the primary server (it will be served by a walsender process
+ * in the primary server), and then keeps receiving XLOG records and
+ * writing them to the disk as long as the connection is alive. As XLOG
+ * records are received and flushed to disk, it updates the
+ * WalRcv->receivedUpTo variable in shared memory, to inform the startup
+ * process of how far it can proceed with XLOG replay.
+ *
+ * Normal termination is by SIGTERM, which instructs the walreceiver to
+ * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
+ * process, the walreceiver will simply abort and exit on SIGQUIT. A close
+ * of the connection and a FATAL error are treated not as a crash but as
+ * normal operation.
+ *
+ * Walreceiver is a postmaster child process like others, but it's compiled
+ * as a dynamic module to avoid linking libpq with the main server binary.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver/walreceiver.c,v 1.1 2010/01/15 09:19:03 heikki Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <unistd.h>
+
+#include "access/xlog_internal.h"
+#include "libpq-fe.h"
+#include "libpq/pqsignal.h"
+#include "miscadmin.h"
+#include "replication/walreceiver.h"
+#include "storage/ipc.h"
+#include "storage/pmsignal.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/ps_status.h"
+#include "utils/resowner.h"
+
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
+#ifdef HAVE_SYS_POLL_H
+#include <sys/poll.h>
+#endif
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(WalReceiverMain);
+Datum WalReceiverMain(PG_FUNCTION_ARGS);
+
+/* streamConn is a PGconn object of a connection to walsender from walreceiver */
+static PGconn *streamConn = NULL;
+
+#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
+
+/*
+ * These variables are used similarly to openLogFile/Id/Seg/Off,
+ * but for walreceiver to write the XLOG.
+ */
+static int recvFile = -1;
+static uint32 recvId = 0;
+static uint32 recvSeg = 0;
+static uint32 recvOff = 0;
+
+/* Buffer for currently read records */
+static char *recvBuf = NULL;
+
+/* Flags set by interrupt handlers of walreceiver for later service in the main loop */
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t got_SIGTERM = false;
+
+static void ProcessWalRcvInterrupts(void);
+static void EnableImmediateExit(void);
+static void DisableImmediateExit(void);
+
+/*
+ * About SIGTERM handling:
+ *
+ * We can't just exit(1) within SIGTERM signal handler, because the signal
+ * might arrive in the middle of some critical operation, like while we're
+ * holding a spinlock. We also can't just set a flag in signal handler and
+ * check it in the main loop, because we perform some blocking libpq
+ * operations like PQexec(), which can take a long time to finish.
+ *
+ * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
+ * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
+ * sets got_SIGTERM flag, which is checked in the main loop when convenient.
+ *
+ * This is very much like what regular backends do with ImmediateInterruptOK,
+ * ProcessInterrupts() etc.
+ */
+static volatile bool WalRcvImmediateInterruptOK = false;
+
+static void
+ProcessWalRcvInterrupts(void)
+{
+ /*
+ * Although walreceiver interrupt handling doesn't use the same scheme
+ * as regular backends, call CHECK_FOR_INTERRUPTS() to make sure we
+ * receive any incoming signals on Win32.
+ */
+ CHECK_FOR_INTERRUPTS();
+
+ if (got_SIGTERM)
+ {
+ WalRcvImmediateInterruptOK = false;
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating walreceiver process due to administrator command")));
+ }
+}
+
+static void
+EnableImmediateExit()
+{
+ WalRcvImmediateInterruptOK = true;
+ ProcessWalRcvInterrupts();
+}
+
+static void
+DisableImmediateExit()
+{
+ WalRcvImmediateInterruptOK = false;
+ ProcessWalRcvInterrupts();
+}
+
+/* Signal handlers */
+static void WalRcvSigHupHandler(SIGNAL_ARGS);
+static void WalRcvShutdownHandler(SIGNAL_ARGS);
+static void WalRcvQuickDieHandler(SIGNAL_ARGS);
+
+/* Prototypes for private functions */
+static void WalRcvLoop(void);
+static void InitWalRcv(void);
+static void WalRcvConnect(void);
+static bool WalRcvWait(int timeout_ms);
+static void WalRcvKill(int code, Datum arg);
+static void XLogRecv(void);
+static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
+static void XLogWalRcvFlush(void);
+
+/*
+ * LogstreamResult indicates the byte positions that we have already
+ * written/fsynced.
+ */
+static struct
+{
+ XLogRecPtr Write; /* last byte + 1 written out in the standby */
+ XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
+} LogstreamResult;
+
+/* Main entry point for walreceiver process */
+Datum
+WalReceiverMain(PG_FUNCTION_ARGS)
+{
+ sigjmp_buf local_sigjmp_buf;
+ MemoryContext walrcv_context;
+
+ /* Mark walreceiver in progress */
+ InitWalRcv();
+
+ /*
+ * If possible, make this process a group leader, so that the postmaster
+ * can signal any child processes too. (walreceiver probably never has
+ * any child processes, but for consistency we make all postmaster child
+ * processes do this.)
+ */
+#ifdef HAVE_SETSID
+ if (setsid() < 0)
+ elog(FATAL, "setsid() failed: %m");
+#endif
+
+ /* Properly accept or ignore signals the postmaster might send us */
+ pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
+ pqsignal(SIGINT, SIG_IGN);
+ pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
+ pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
+ pqsignal(SIGALRM, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGUSR1, SIG_IGN);
+ pqsignal(SIGUSR2, SIG_IGN);
+
+ /* Reset some signals that are accepted by postmaster but not here */
+ pqsignal(SIGCHLD, SIG_DFL);
+ pqsignal(SIGTTIN, SIG_DFL);
+ pqsignal(SIGTTOU, SIG_DFL);
+ pqsignal(SIGCONT, SIG_DFL);
+ pqsignal(SIGWINCH, SIG_DFL);
+
+ /* We allow SIGQUIT (quickdie) at all times */
+ sigdelset(&BlockSig, SIGQUIT);
+
+ /*
+ * Create a resource owner to keep track of our resources (not clear that
+ * we need this, but may as well have one).
+ */
+ CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
+
+ /*
+ * Create a memory context that we will do all our work in. We do this so
+ * that we can reset the context during error recovery and thereby avoid
+ * possible memory leaks.
+ */
+ walrcv_context = AllocSetContextCreate(TopMemoryContext,
+ "Wal Receiver",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ MemoryContextSwitchTo(walrcv_context);
+
+ /*
+ * If an exception is encountered, processing resumes here.
+ *
+ * This code is heavily based on bgwriter.c, q.v.
+ */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* Since not using PG_TRY, must reset error stack by hand */
+ error_context_stack = NULL;
+
+ /* Reset WalRcvImmediateInterruptOK */
+ DisableImmediateExit();
+
+ /* Prevent interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Report the error to the server log */
+ EmitErrorReport();
+
+ /* Free the data structure related to a connection */
+ PQfinish(streamConn);
+ streamConn = NULL;
+ if (recvBuf != NULL)
+ PQfreemem(recvBuf);
+ recvBuf = NULL;
+
+ /*
+ * Now return to normal top-level context and clear ErrorContext for
+ * next time.
+ */
+ MemoryContextSwitchTo(walrcv_context);
+ FlushErrorState();
+
+ /* Flush any leaked data in the top-level context */
+ MemoryContextResetAndDeleteChildren(walrcv_context);
+
+ /* Now we can allow interrupts again */
+ RESUME_INTERRUPTS();
+
+ /*
+ * Sleep at least 1 second after any error. A write error is likely
+ * to be repeated, and we don't want to be filling the error logs as
+ * fast as we can.
+ */
+ pg_usleep(1000000L);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ /* Unblock signals (they were blocked when the postmaster forked us) */
+ PG_SETMASK(&UnBlockSig);
+
+ /* Establish the connection to the primary for XLOG streaming */
+ WalRcvConnect();
+
+ /* Main loop of walreceiver */
+ WalRcvLoop();
+
+ PG_RETURN_VOID(); /* WalRcvLoop() never returns, but keep compiler quiet */
+}
+
+/* Main loop of walreceiver process */
+static void
+WalRcvLoop(void)
+{
+ /* Loop until end-of-streaming or error */
+ for (;;)
+ {
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive(true))
+ exit(1);
+
+ /*
+ * Exit walreceiver if we're not in recovery. This should not happen,
+ * but cross-check the status here.
+ */
+ if (!RecoveryInProgress())
+ ereport(FATAL,
+ (errmsg("cannot continue XLOG streaming, recovery has already ended")));
+
+ /* Process any requests or signals received recently */
+ ProcessWalRcvInterrupts();
+
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ /* Wait a while for data to arrive */
+ if (WalRcvWait(NAPTIME_PER_CYCLE))
+ {
+ /* data has arrived. Process it */
+ if (PQconsumeInput(streamConn) == 0)
+ ereport(ERROR,
+ (errmsg("could not read xlog records: %s",
+ PQerrorMessage(streamConn))));
+ XLogRecv();
+ }
+ }
+}
+
+/* Advertise our pid in shared memory, so that startup process can kill us. */
+static void
+InitWalRcv(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ /*
+ * WalRcv should be set up already (if we are a backend, we inherit
+ * this by fork() or EXEC_BACKEND mechanism from the postmaster).
+ */
+ if (walrcv == NULL)
+ elog(PANIC, "walreceiver control data uninitialized");
+
+ /* If we've already been requested to stop, don't start up */
+ SpinLockAcquire(&walrcv->mutex);
+ Assert(walrcv->pid == 0);
+ if (walrcv->walRcvState == WALRCV_STOPPED ||
+ walrcv->walRcvState == WALRCV_STOPPING)
+ {
+ walrcv->walRcvState = WALRCV_STOPPED;
+ SpinLockRelease(&walrcv->mutex);
+ proc_exit(1);
+ }
+ walrcv->pid = MyProcPid;
+ SpinLockRelease(&walrcv->mutex);
+
+ /* Arrange to clean up at walreceiver exit */
+ on_shmem_exit(WalRcvKill, 0);
+}
+
+/*
+ * Establish the connection to the primary server for XLOG streaming
+ */
+static void
+WalRcvConnect(void)
+{
+ char conninfo[MAXCONNINFO + 14];
+ char *primary_sysid;
+ char standby_sysid[32];
+ TimeLineID primary_tli;
+ TimeLineID standby_tli;
+ PGresult *res;
+ XLogRecPtr recptr;
+ char cmd[64];
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ /*
+ * Set up a connection for XLOG streaming
+ */
+ SpinLockAcquire(&walrcv->mutex);
+ snprintf(conninfo, sizeof(conninfo), "%s replication=true", walrcv->conninfo);
+ recptr = walrcv->receivedUpto;
+ SpinLockRelease(&walrcv->mutex);
+
+ /* initialize local XLOG pointers */
+ LogstreamResult.Write = LogstreamResult.Flush = recptr;
+
+ Assert(recptr.xlogid != 0 || recptr.xrecoff != 0);
+
+ EnableImmediateExit();
+ streamConn = PQconnectdb(conninfo);
+ DisableImmediateExit();
+ if (PQstatus(streamConn) != CONNECTION_OK)
+ ereport(ERROR,
+ (errmsg("could not connect to the primary server : %s",
+ PQerrorMessage(streamConn))));
+
+ /*
+ * Get the system identifier and timeline ID as a DataRow message
+ * from the primary server.
+ */
+ EnableImmediateExit();
+ res = PQexec(streamConn, "IDENTIFY_SYSTEM");
+ DisableImmediateExit();
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("could not receive the SYSID and timeline ID from "
+ "the primary server: %s",
+ PQerrorMessage(streamConn))));
+ }
+ if (PQnfields(res) != 2 || PQntuples(res) != 1)
+ {
+ int ntuples = PQntuples(res);
+ int nfields = PQnfields(res);
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("invalid response from primary server"),
+ errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields",
+ ntuples, nfields)));
+ }
+ primary_sysid = PQgetvalue(res, 0, 0);
+ primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
+
+ /*
+ * Confirm that the system identifier of the primary is the same
+ * as ours.
+ */
+ snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
+ GetSystemIdentifier());
+ if (strcmp(primary_sysid, standby_sysid) != 0)
+ {
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("system differs between the primary and standby"),
+ errdetail("the primary SYSID is %s, standby SYSID is %s",
+ primary_sysid, standby_sysid)));
+ }
+
+ /*
+ * Confirm that the current timeline of the primary is the same
+ * as the recovery target timeline.
+ */
+ standby_tli = GetRecoveryTargetTLI();
+ PQclear(res);
+ if (primary_tli != standby_tli)
+ ereport(ERROR,
+ (errmsg("timeline %u of the primary does not match recovery target timeline %u",
+ primary_tli, standby_tli)));
+ ThisTimeLineID = primary_tli;
+
+ /* Start streaming from the point requested by startup process */
+ snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", recptr.xlogid, recptr.xrecoff);
+ EnableImmediateExit();
+ res = PQexec(streamConn, cmd);
+ DisableImmediateExit();
+ if (PQresultStatus(res) != PGRES_COPY_OUT)
+ ereport(ERROR,
+ (errmsg("could not start XLOG streaming: %s",
+ PQerrorMessage(streamConn))));
+ PQclear(res);
+
+ /*
+ * Process the outstanding messages before beginning to wait for
+ * new message to arrive.
+ */
+ XLogRecv();
+}
+
+/*
+ * Wait until we can read WAL stream, or timeout.
+ *
+ * Returns true if data has become available for reading, false if timed out
+ * or interrupted by signal.
+ *
+ * This is based on pqSocketCheck.
+ */
+static bool
+WalRcvWait(int timeout_ms)
+{
+ int ret;
+
+ Assert(streamConn != NULL);
+ if (PQsocket(streamConn) < 0)
+ ereport(ERROR,
+ (errcode_for_socket_access(),
+ errmsg("socket not open")));
+
+ /* We use poll(2) if available, otherwise select(2) */
+ {
+#ifdef HAVE_POLL
+ struct pollfd input_fd;
+
+ input_fd.fd = PQsocket(streamConn);
+ input_fd.events = POLLIN | POLLERR;
+ input_fd.revents = 0;
+
+ ret = poll(&input_fd, 1, timeout_ms);
+#else /* !HAVE_POLL */
+
+ fd_set input_mask;
+ struct timeval timeout;
+ struct timeval *ptr_timeout;
+
+ FD_ZERO(&input_mask);
+ FD_SET(PQsocket(streamConn), &input_mask);
+
+ if (timeout_ms < 0)
+ ptr_timeout = NULL;
+ else
+ {
+ timeout.tv_sec = timeout_ms / 1000;
+ timeout.tv_usec = (timeout_ms % 1000) * 1000;
+ ptr_timeout = &timeout;
+ }
+
+ ret = select(PQsocket(streamConn) + 1, &input_mask,
+ NULL, NULL, ptr_timeout);
+#endif /* HAVE_POLL */
+ }
+
+ if (ret == 0 || (ret < 0 && errno == EINTR))
+ return false;
+ if (ret < 0)
+ ereport(ERROR,
+ (errcode_for_socket_access(),
+ errmsg("select() failed: %m")));
+ return true;
+}
+
+/*
+ * Clear our pid from shared memory at exit.
+ */
+static void
+WalRcvKill(int code, Datum arg)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+ bool stopped = false;
+
+ SpinLockAcquire(&walrcv->mutex);
+ if (walrcv->walRcvState == WALRCV_STOPPING ||
+ walrcv->walRcvState == WALRCV_STOPPED)
+ {
+ walrcv->walRcvState = WALRCV_STOPPED;
+ stopped = true;
+ elog(LOG, "walreceiver stopped");
+ }
+ walrcv->pid = 0;
+ SpinLockRelease(&walrcv->mutex);
+
+ PQfinish(streamConn);
+
+ /* If requested to stop, tell postmaster to not restart us. */
+ if (stopped)
+ SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER);
+}
+
+/* SIGHUP: set flag to re-read config file at next convenient time */
+static void
+WalRcvSigHupHandler(SIGNAL_ARGS)
+{
+ got_SIGHUP = true;
+}
+
+/* SIGTERM: set flag for main loop, or shutdown immediately if safe */
+static void
+WalRcvShutdownHandler(SIGNAL_ARGS)
+{
+ got_SIGTERM = true;
+
+ /* Don't joggle the elbow of proc_exit */
+ if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
+ ProcessWalRcvInterrupts();
+}
+
+/*
+ * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
+ *
+ * Some backend has bought the farm, so we need to stop what we're doing and
+ * exit.
+ */
+static void
+WalRcvQuickDieHandler(SIGNAL_ARGS)
+{
+ PG_SETMASK(&BlockSig);
+
+ /*
+ * We DO NOT want to run proc_exit() callbacks -- we're here because
+ * shared memory may be corrupted, so we don't want to try to clean up our
+ * transaction. Just nail the windows shut and get out of town. Now that
+ * there's an atexit callback to prevent third-party code from breaking
+ * things by calling exit() directly, we have to reset the callbacks
+ * explicitly to make this work as intended.
+ */
+ on_exit_reset();
+
+ /*
+ * Note we do exit(2) not exit(0). This is to force the postmaster into a
+ * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
+ * backend. This is necessary precisely because we don't clean up our
+ * shared memory state. (The "dead man switch" mechanism in pmsignal.c
+ * should ensure the postmaster sees this as a crash, too, but no harm
+ * in being doubly sure.)
+ */
+ exit(2);
+}
+
+/*
+ * Receive any WAL records available without blocking from XLOG stream and
+ * write it to the disk.
+ */
+static void
+XLogRecv(void)
+{
+ XLogRecPtr *recptr;
+ int len;
+
+ for (;;)
+ {
+ /* Receive CopyData message */
+ len = PQgetCopyData(streamConn, &recvBuf, 1);
+ if (len == 0) /* no records available yet, then return */
+ break;
+ if (len == -1) /* end-of-streaming or error */
+ {
+ PGresult *res;
+
+ res = PQgetResult(streamConn);
+ if (PQresultStatus(res) == PGRES_COMMAND_OK)
+ {
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("replication terminated by primary server")));
+ }
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("could not read xlog records: %s",
+ PQerrorMessage(streamConn))));
+ }
+ if (len < -1)
+ ereport(ERROR,
+ (errmsg("could not read xlog records: %s",
+ PQerrorMessage(streamConn))));
+
+ if (len < sizeof(XLogRecPtr))
+ ereport(ERROR,
+ (errmsg("invalid WAL message received from primary")));
+
+ /* Write received WAL records to disk */
+ recptr = (XLogRecPtr *) recvBuf;
+ XLogWalRcvWrite(recvBuf + sizeof(XLogRecPtr),
+ len - sizeof(XLogRecPtr), *recptr);
+
+ if (recvBuf != NULL)
+ PQfreemem(recvBuf);
+ recvBuf = NULL;
+ }
+
+ /*
+ * Now that we've written some records, flush them to disk and let the
+ * startup process know about them.
+ */
+ XLogWalRcvFlush();
+}
+
+/*
+ * Write XLOG data to disk.
+ */
+static void
+XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
+{
+ int startoff;
+ int byteswritten;
+
+ while (nbytes > 0)
+ {
+ int segbytes;
+
+ if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg))
+ {
+ bool use_existent;
+
+ /*
+ * XLOG segment files will be re-read in recovery operation soon,
+ * so we don't need to advise the OS to release any cache page.
+ */
+ if (recvFile >= 0)
+ {
+ /*
+ * fsync() before we switch to next file. We would otherwise
+ * have to reopen this file to fsync it later
+ */
+ XLogWalRcvFlush();
+ if (close(recvFile) != 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not close log file %u, segment %u: %m",
+ recvId, recvSeg)));
+ }
+ recvFile = -1;
+
+ /* Create/use new log file */
+ XLByteToSeg(recptr, recvId, recvSeg);
+ use_existent = true;
+ recvFile = XLogFileInit(recvId, recvSeg,
+ &use_existent, true);
+ recvOff = 0;
+ }
+
+ /* Calculate the start offset of the received logs */
+ startoff = recptr.xrecoff % XLogSegSize;
+
+ if (startoff + nbytes > XLogSegSize)
+ segbytes = XLogSegSize - startoff;
+ else
+ segbytes = nbytes;
+
+ /* Need to seek in the file? */
+ if (recvOff != startoff)
+ {
+ if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not seek in log file %u, "
+ "segment %u to offset %u: %m",
+ recvId, recvSeg, startoff)));
+ recvOff = startoff;
+ }
+
+ /* OK to write the logs */
+ errno = 0;
+
+ byteswritten = write(recvFile, buf, segbytes);
+ if (byteswritten <= 0)
+ {
+ /* if write didn't set errno, assume no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write to log file %u, segment %u "
+ "at offset %u, length %lu: %m",
+ recvId, recvSeg,
+ recvOff, (unsigned long) segbytes)));
+ }
+
+ /* Update state for write */
+ XLByteAdvance(recptr, byteswritten);
+
+ recvOff += byteswritten;
+ nbytes -= byteswritten;
+ buf += byteswritten;
+
+ LogstreamResult.Write = recptr;
+
+ /*
+ * XXX: Should we signal bgwriter to start a restartpoint
+ * if we've consumed too much xlog since the last one, like
+ * in normal processing? But this is not worth doing unless
+ * a restartpoint can be created independently from a
+ * checkpoint record.
+ */
+ }
+}
+
+/* Flush the log to disk */
+static void
+XLogWalRcvFlush(void)
+{
+ if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write))
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+ char activitymsg[50];
+
+ issue_xlog_fsync(recvFile, recvId, recvSeg);
+
+ LogstreamResult.Flush = LogstreamResult.Write;
+
+ /* Update shared-memory status */
+ SpinLockAcquire(&walrcv->mutex);
+ walrcv->receivedUpto = LogstreamResult.Flush;
+ SpinLockRelease(&walrcv->mutex);
+
+ /* Report XLOG streaming progress in PS display */
+ snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
+ LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
+ set_ps_display(activitymsg, false);
+ }
+}
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
new file mode 100644
index 00000000000..4342e252d65
--- /dev/null
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -0,0 +1,262 @@
+/*-------------------------------------------------------------------------
+ *
+ * walreceiverfuncs.c
+ *
+ * This file contains functions used by the startup process to communicate
+ * with the walreceiver process. Functions implementing walreceiver itself
+ * are in src/backend/replication/walreceiver subdirectory.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.1 2010/01/15 09:19:03 heikki Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <signal.h>
+
+#include "access/xlog_internal.h"
+#include "replication/walreceiver.h"
+#include "storage/fd.h"
+#include "storage/pmsignal.h"
+#include "storage/shmem.h"
+#include "utils/guc.h"
+
+WalRcvData *WalRcv = NULL;
+
+static bool CheckForStandbyTrigger(void);
+static void ShutdownWalRcv(void);
+
+/* Report shared memory space needed by WalRcvShmemInit */
+Size
+WalRcvShmemSize(void)
+{
+ Size size = 0;
+
+ size = add_size(size, sizeof(WalRcvData));
+
+ return size;
+}
+
+/* Allocate and initialize walreceiver-related shared memory */
+void
+WalRcvShmemInit(void)
+{
+ bool found;
+
+ WalRcv = (WalRcvData *)
+ ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
+
+ if (WalRcv == NULL)
+ ereport(FATAL,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough shared memory for walreceiver")));
+ if (found)
+ return; /* already initialized */
+
+ /* Initialize the data structures */
+ MemSet(WalRcv, 0, WalRcvShmemSize());
+ WalRcv->walRcvState = WALRCV_NOT_STARTED;
+ SpinLockInit(&WalRcv->mutex);
+}
+
+/* Is walreceiver in progress (or starting up)? */
+bool
+WalRcvInProgress(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+ WalRcvState state;
+
+ SpinLockAcquire(&walrcv->mutex);
+ state = walrcv->walRcvState;
+ SpinLockRelease(&walrcv->mutex);
+
+ if (state == WALRCV_RUNNING || state == WALRCV_STOPPING)
+ return true;
+ else
+ return false;
+}
+
+/*
+ * Wait for the XLOG record at given position to become available.
+ *
+ * 'recptr' indicates the byte position which caller wants to read the
+ * XLOG record up to. The byte position actually written and flushed
+ * by walreceiver is returned. It can be higher than the requested
+ * location, and the caller can safely read up to that point without
+ * calling WaitNextXLogAvailable() again.
+ *
+ * If WAL streaming is ended (because a trigger file is found), *finished
+ * is set to true and function returns immediately. The returned position
+ * can be lower than requested in that case.
+ *
+ * Called by the startup process during streaming recovery.
+ */
+XLogRecPtr
+WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished)
+{
+ static XLogRecPtr receivedUpto = {0, 0};
+
+ *finished = false;
+
+ /* Quick exit if already known available */
+ if (XLByteLT(recptr, receivedUpto))
+ return receivedUpto;
+
+ for (;;)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ /* Update local status */
+ SpinLockAcquire(&walrcv->mutex);
+ receivedUpto = walrcv->receivedUpto;
+ SpinLockRelease(&walrcv->mutex);
+
+ /* If available already, leave here */
+ if (XLByteLT(recptr, receivedUpto))
+ return receivedUpto;
+
+ /* Check to see if the trigger file exists */
+ if (CheckForStandbyTrigger())
+ {
+ *finished = true;
+ return receivedUpto;
+ }
+
+ pg_usleep(100000L); /* 100ms */
+
+ /*
+ * This possibly-long loop needs to handle interrupts of startup
+ * process.
+ */
+ HandleStartupProcInterrupts();
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive(true))
+ exit(1);
+ }
+}
+
+/*
+ * Stop walreceiver and wait for it to die.
+ */
+static void
+ShutdownWalRcv(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+ pid_t walrcvpid;
+
+ /*
+ * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
+ * mode once it's finished, and will also request postmaster to not
+ * restart itself.
+ */
+ SpinLockAcquire(&walrcv->mutex);
+ Assert(walrcv->walRcvState == WALRCV_RUNNING);
+ walrcv->walRcvState = WALRCV_STOPPING;
+ walrcvpid = walrcv->pid;
+ SpinLockRelease(&walrcv->mutex);
+
+ /*
+ * Pid can be 0, if no walreceiver process is active right now.
+ * Postmaster should restart it, and when it does, it will see the
+ * STOPPING state.
+ */
+ if (walrcvpid != 0)
+ kill(walrcvpid, SIGTERM);
+
+ /*
+ * Wait for walreceiver to acknowledge its death by setting state to
+ * WALRCV_STOPPED.
+ */
+ while (WalRcvInProgress())
+ {
+ /*
+ * This possibly-long loop needs to handle interrupts of startup
+ * process.
+ */
+ HandleStartupProcInterrupts();
+
+ pg_usleep(100000); /* 100ms */
+ }
+}
+
+/*
+ * Check to see if the trigger file exists. If it does, request postmaster
+ * to shut down walreceiver and wait for it to exit, and remove the trigger
+ * file.
+ */
+static bool
+CheckForStandbyTrigger(void)
+{
+ struct stat stat_buf;
+
+ if (TriggerFile == NULL)
+ return false;
+
+ if (stat(TriggerFile, &stat_buf) == 0)
+ {
+ ereport(LOG,
+ (errmsg("trigger file found: %s", TriggerFile)));
+ ShutdownWalRcv();
+ unlink(TriggerFile);
+ return true;
+ }
+ return false;
+}
+
+/*
+ * Request postmaster to start walreceiver.
+ *
+ * recptr indicates the position where streaming should begin, and conninfo
+ * is a libpq connection string to use.
+ */
+void
+RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ Assert(walrcv->walRcvState == WALRCV_NOT_STARTED);
+
+ /* locking is just pro forma here; walreceiver isn't started yet */
+ SpinLockAcquire(&walrcv->mutex);
+ walrcv->receivedUpto = recptr;
+ if (conninfo != NULL)
+ strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
+ else
+ walrcv->conninfo[0] = '\0';
+ walrcv->walRcvState = WALRCV_RUNNING;
+ SpinLockRelease(&walrcv->mutex);
+
+ SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
+}
+
+/*
+ * Returns the byte position that walreceiver has written
+ */
+XLogRecPtr
+GetWalRcvWriteRecPtr(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+ XLogRecPtr recptr;
+
+ SpinLockAcquire(&walrcv->mutex);
+ recptr = walrcv->receivedUpto;
+ SpinLockRelease(&walrcv->mutex);
+
+ return recptr;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
new file mode 100644
index 00000000000..8a3352e0755
--- /dev/null
+++ b/src/backend/replication/walsender.c
@@ -0,0 +1,851 @@
+/*-------------------------------------------------------------------------
+ *
+ * walsender.c
+ *
+ * The WAL sender process (walsender) is new as of Postgres 8.5. It takes
+ * charge of XLOG streaming sender in the primary server. At first, it is
+ * started by the postmaster when the walreceiver in the standby server
+ * connects to the primary server and requests XLOG streaming replication,
+ * i.e., unlike any auxiliary process, it is not an always-running process.
+ * It attempts to keep reading XLOG records from the disk and sending them
+ * to the standby server, as long as the connection is alive (i.e., like
+ * any backend, there is an one to one relationship between a connection
+ * and a walsender process).
+ *
+ * Normal termination is by SIGTERM, which instructs the walsender to
+ * close the connection and exit(0) at next convenient moment. Emergency
+ * termination is by SIGQUIT; like any backend, the walsender will simply
+ * abort and exit on SIGQUIT. A close of the connection and a FATAL error
+ * are treated as not a crash but approximately normal termination;
+ * the walsender will exit quickly without sending any more XLOG records.
+ *
+ * If the server is shut down, postmaster sends us SIGUSR2 after all
+ * regular backends have exited and the shutdown checkpoint has been written.
+ * This instruct walsender to send any outstanding WAL, including the
+ * shutdown checkpoint record, and then exit.
+ *
+ * Note that there can be more than one walsender process concurrently.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.1 2010/01/15 09:19:03 heikki Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <unistd.h>
+
+#include "access/xlog_internal.h"
+#include "catalog/pg_type.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqsignal.h"
+#include "miscadmin.h"
+#include "replication/walsender.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/lock.h"
+#include "storage/pmsignal.h"
+#include "tcop/tcopprot.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/ps_status.h"
+
+/* Array of WalSnds in shared memory */
+WalSndCtlData *WalSndCtl = NULL;
+
+/* My slot in the shared memory array */
+static WalSnd *MyWalSnd = NULL;
+
+/* Global state */
+bool am_walsender = false; /* Am I a walsender process ? */
+
+/* User-settable parameters for walsender */
+int MaxWalSenders = 0; /* the maximum number of concurrent walsenders */
+int WalSndDelay = 200; /* max sleep time between some actions */
+
+#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
+
+/*
+ * These variables are used similarly to openLogFile/Id/Seg/Off,
+ * but for walsender to read the XLOG.
+ */
+static int sendFile = -1;
+static uint32 sendId = 0;
+static uint32 sendSeg = 0;
+static uint32 sendOff = 0;
+
+/*
+ * How far have we sent WAL already? This is also advertised in
+ * MyWalSnd->sentPtr.
+ */
+static XLogRecPtr sentPtr = {0, 0};
+
+/* Flags set by signal handlers for later service in main loop */
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t shutdown_requested = false;
+static volatile sig_atomic_t ready_to_stop = false;
+
+/* Signal handlers */
+static void WalSndSigHupHandler(SIGNAL_ARGS);
+static void WalSndShutdownHandler(SIGNAL_ARGS);
+static void WalSndQuickDieHandler(SIGNAL_ARGS);
+
+/* Prototypes for private functions */
+static int WalSndLoop(void);
+static void InitWalSnd(void);
+static void WalSndHandshake(void);
+static void WalSndKill(int code, Datum arg);
+static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
+static bool XLogSend(StringInfo outMsg);
+static void CheckClosedConnection(void);
+
+/*
+ * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
+ */
+#define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2)
+
+/* Main entry point for walsender process */
+int
+WalSenderMain(void)
+{
+ MemoryContext walsnd_context;
+
+ if (!superuser())
+ ereport(FATAL,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to start walsender")));
+
+ /* Create a per-walsender data structure in shared memory */
+ InitWalSnd();
+
+ /*
+ * Create a memory context that we will do all our work in. We do this so
+ * that we can reset the context during error recovery and thereby avoid
+ * possible memory leaks. Formerly this code just ran in
+ * TopMemoryContext, but resetting that would be a really bad idea.
+ *
+ * XXX: we don't actually attempt error recovery in walsender, we just
+ * close the connection and exit.
+ */
+ walsnd_context = AllocSetContextCreate(TopMemoryContext,
+ "Wal Sender",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ MemoryContextSwitchTo(walsnd_context);
+
+ /* Unblock signals (they were blocked when the postmaster forked us) */
+ PG_SETMASK(&UnBlockSig);
+
+ /* Tell the standby that walsender is ready for receiving commands */
+ ReadyForQuery(DestRemote);
+
+ /* Handle handshake messages before streaming */
+ WalSndHandshake();
+
+ /* Main loop of walsender */
+ return WalSndLoop();
+}
+
+static void
+WalSndHandshake(void)
+{
+ StringInfoData input_message;
+ bool replication_started = false;
+
+ initStringInfo(&input_message);
+
+ while (!replication_started)
+ {
+ int firstchar;
+
+ /* Wait for a command to arrive */
+ firstchar = pq_getbyte();
+
+ /*
+ * Check for any other interesting events that happened while we
+ * slept.
+ */
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ if (firstchar == EOF)
+ {
+ /* standby disconnected */
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected EOF on standby connection")));
+ }
+ else
+ {
+ /*
+ * Read the message contents. This is expected to be done without
+ * blocking because we've been able to get message type code.
+ */
+ if (pq_getmessage(&input_message, 0))
+ firstchar = EOF; /* suitable message already logged */
+ }
+
+
+ /* Handle the very limited subset of commands expected in this phase */
+
+ switch (firstchar)
+ {
+ case 'Q': /* Query message */
+ {
+ const char *query_string;
+ XLogRecPtr recptr;
+
+ query_string = pq_getmsgstring(&input_message);
+ pq_getmsgend(&input_message);
+
+ if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0)
+ {
+ StringInfoData buf;
+ char sysid[32];
+ char tli[11];
+
+ /*
+ * Reply with a result set with one row, two columns.
+ * First col is system ID, and second if timeline ID
+ */
+
+ snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
+ GetSystemIdentifier());
+ snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
+
+ /* Send a RowDescription message */
+ pq_beginmessage(&buf, 'T');
+ pq_sendint(&buf, 2, 2); /* 2 fields */
+
+ /* first field */
+ pq_sendstring(&buf, "systemid"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* second field */
+ pq_sendstring(&buf, "timeline"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, INT4OID, 4); /* type oid */
+ pq_sendint(&buf, 4, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+ pq_endmessage(&buf);
+
+ /* Send a DataRow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint(&buf, 2, 2); /* # of columns */
+ pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
+ pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
+ pq_sendint(&buf, strlen(tli), 4); /* col2 len */
+ pq_sendbytes(&buf, (char *) tli, strlen(tli));
+ pq_endmessage(&buf);
+
+ /* Send CommandComplete and ReadyForQuery messages */
+ EndCommand("SELECT", DestRemote);
+ ReadyForQuery(DestRemote);
+ }
+ else if (sscanf(query_string, "START_REPLICATION %X/%X",
+ &recptr.xlogid, &recptr.xrecoff) == 2)
+ {
+ StringInfoData buf;
+
+ /* Send a CopyOutResponse message, and start streaming */
+ pq_beginmessage(&buf, 'H');
+ pq_sendbyte(&buf, 0);
+ pq_sendint(&buf, 0, 2);
+ pq_endmessage(&buf);
+
+ /*
+ * Initialize position to the received one, then
+ * the xlog records begin to be shipped from that position
+ */
+ sentPtr = recptr;
+
+ /* break out of the loop */
+ replication_started = true;
+ }
+ else
+ {
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid standby query string: %s", query_string)));
+ }
+ break;
+ }
+
+ /* 'X' means that the standby is closing the connection */
+ case 'X':
+ proc_exit(0);
+
+ case EOF:
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected EOF on standby connection")));
+
+ default:
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid standby handshake message type %d", firstchar)));
+ }
+ }
+}
+
+/*
+ * Check if the remote end has closed the connection.
+ */
+static void
+CheckClosedConnection(void)
+{
+ unsigned char firstchar;
+ int r;
+
+ r = pq_getbyte_if_available(&firstchar);
+ if (r < 0)
+ {
+ /* no data available */
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return;
+
+ /*
+ * Ok if interrupted, though it shouldn't really happen with
+ * a non-blocking operation.
+ */
+ if (errno == EINTR)
+ return;
+
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("could not receive data from client: %m")));
+ }
+ if (r == 0)
+ {
+ /* standby disconnected unexpectedly */
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected EOF on standby connection")));
+ }
+
+ /* Handle the very limited subset of commands expected in this phase */
+ switch (firstchar)
+ {
+ /*
+ * 'X' means that the standby is closing down the socket. EOF means
+ * unexpected loss of standby connection. Either way, perform normal
+ * shutdown.
+ */
+ case 'X':
+ proc_exit(0);
+
+ default:
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid standby closing message type %d",
+ firstchar)));
+ }
+}
+
+/* Main loop of walsender process */
+static int
+WalSndLoop(void)
+{
+ StringInfoData output_message;
+
+ initStringInfo(&output_message);
+
+ /* Loop forever */
+ for (;;)
+ {
+ int remain; /* remaining time (ms) */
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive(true))
+ exit(1);
+ /* Process any requests or signals received recently */
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ /*
+ * When SIGUSR2 arrives, we send all outstanding logs up to the
+ * shutdown checkpoint record (i.e., the latest record) and exit.
+ */
+ if (ready_to_stop)
+ {
+ XLogSend(&output_message);
+ shutdown_requested = true;
+ }
+
+ /* Normal exit from the walsender is here */
+ if (shutdown_requested)
+ {
+ /* Inform the standby that XLOG streaming was done */
+ pq_puttextmessage('C', "COPY 0");
+ pq_flush();
+
+ proc_exit(0);
+ }
+
+ /*
+ * Nap for the configured time or until a message arrives.
+ *
+ * On some platforms, signals won't interrupt the sleep. To ensure we
+ * respond reasonably promptly when someone signals us, break down the
+ * sleep into NAPTIME_PER_CYCLE (ms) increments, and check for
+ * interrupts after each nap.
+ */
+ remain = WalSndDelay;
+ while (remain > 0)
+ {
+ if (got_SIGHUP || shutdown_requested || ready_to_stop)
+ break;
+
+ /*
+ * Check to see whether a message from the standby or an interrupt
+ * from other processes has arrived.
+ */
+ pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
+ CheckClosedConnection();
+
+ remain -= NAPTIME_PER_CYCLE;
+ }
+
+ /* Attempt to send the log once every loop */
+ if (!XLogSend(&output_message))
+ goto eof;
+ }
+
+ /* can't get here because the above loop never exits */
+ return 1;
+
+eof:
+ /*
+ * Reset whereToSendOutput to prevent ereport from attempting
+ * to send any more messages to the standby.
+ */
+ if (whereToSendOutput == DestRemote)
+ whereToSendOutput = DestNone;
+
+ proc_exit(0);
+ return 1; /* keep the compiler quiet */
+}
+
+/* Initialize a per-walsender data structure for this walsender process */
+static void
+InitWalSnd(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ int i;
+
+ /*
+ * WalSndCtl should be set up already (we inherit this by fork() or
+ * EXEC_BACKEND mechanism from the postmaster).
+ */
+ Assert(walsndctl != NULL);
+ Assert(MyWalSnd == NULL);
+
+ /*
+ * Find a free walsender slot and reserve it. If this fails, we must be
+ * out of WalSnd structures.
+ */
+ for (i = 0; i < MaxWalSenders; i++)
+ {
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ SpinLockAcquire(&walsnd->mutex);
+
+ if (walsnd->pid != 0)
+ {
+ SpinLockRelease(&walsnd->mutex);
+ continue;
+ }
+ else
+ {
+ /* found */
+ MyWalSnd = (WalSnd *) walsnd;
+ walsnd->pid = MyProcPid;
+ MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr));
+ SpinLockRelease(&walsnd->mutex);
+ break;
+ }
+ }
+ if (MyWalSnd == NULL)
+ ereport(FATAL,
+ (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
+ errmsg("sorry, too many standbys already")));
+
+ /* Arrange to clean up at walsender exit */
+ on_shmem_exit(WalSndKill, 0);
+}
+
+/* Destroy the per-walsender data structure for this walsender process */
+static void
+WalSndKill(int code, Datum arg)
+{
+ Assert(MyWalSnd != NULL);
+
+ /*
+ * Mark WalSnd struct no longer in use. Assume that no lock is required
+ * for this.
+ */
+ MyWalSnd->pid = 0;
+
+ /* WalSnd struct isn't mine anymore */
+ MyWalSnd = NULL;
+}
+
+/*
+ * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ */
+void
+XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
+{
+ char path[MAXPGPATH];
+ uint32 startoff;
+
+ while (nbytes > 0)
+ {
+ int segbytes;
+ int readbytes;
+
+ startoff = recptr.xrecoff % XLogSegSize;
+
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
+ {
+ /* Switch to another logfile segment */
+ if (sendFile >= 0)
+ close(sendFile);
+
+ XLByteToSeg(recptr, sendId, sendSeg);
+ XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
+
+ sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+ if (sendFile < 0)
+ ereport(FATAL, /* XXX: Why FATAL? */
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
+ path, sendId, sendSeg)));
+ sendOff = 0;
+ }
+
+ /* Need to seek in the file? */
+ if (sendOff != startoff)
+ {
+ if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg("could not seek in log file %u, segment %u to offset %u: %m",
+ sendId, sendSeg, startoff)));
+ sendOff = startoff;
+ }
+
+ /* How many bytes are within this segment? */
+ if (nbytes > (XLogSegSize - startoff))
+ segbytes = XLogSegSize - startoff;
+ else
+ segbytes = nbytes;
+
+ readbytes = read(sendFile, buf, segbytes);
+ if (readbytes <= 0)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg("could not read from log file %u, segment %u, offset %u, "
+ "length %lu: %m",
+ sendId, sendSeg, sendOff, (unsigned long) segbytes)));
+
+ /* Update state for read */
+ XLByteAdvance(recptr, readbytes);
+
+ sendOff += readbytes;
+ nbytes -= readbytes;
+ buf += readbytes;
+ }
+}
+
+/*
+ * Read all WAL that's been written (and flushed) since last cycle, and send
+ * it to client.
+ *
+ * Returns true if OK, false if trouble.
+ */
+static bool
+XLogSend(StringInfo outMsg)
+{
+ XLogRecPtr SendRqstPtr;
+ char activitymsg[50];
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ /*
+ * Invalid position means that we have not yet received the initial
+ * CopyData message from the slave that indicates where to start the
+ * streaming.
+ */
+ if (sentPtr.xlogid == 0 &&
+ sentPtr.xrecoff == 0)
+ return true;
+
+ /* Attempt to send all the records which were written to the disk */
+ SendRqstPtr = GetWriteRecPtr();
+
+ /* Quick exit if nothing to do */
+ if (!XLByteLT(sentPtr, SendRqstPtr))
+ return true;
+
+ /*
+ * Since successive pages in a segment are consecutively written,
+ * we can gather multiple records together by issuing just one
+ * read() call, and send them as one CopyData message at one time;
+ * nmsgs is the number of CopyData messages sent in this XLogSend;
+ * npages is the number of pages we have determined can be read and
+ * sent together; startpos is the starting position of reading and
+ * sending in the first page, startoff is the file offset at which
+ * it should go and endpos is the end position of reading and
+ * sending in the last page. We must initialize all of them to
+ * keep the compiler quiet.
+ */
+
+ while (XLByteLT(sentPtr, SendRqstPtr))
+ {
+ XLogRecPtr startptr;
+ XLogRecPtr endptr;
+ Size nbytes;
+
+ /*
+ * Figure out how much to send in one message. If there's less than
+ * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
+ * MAX_SEND_SIZE bytes, but round to page boundary for efficiency.
+ */
+ startptr = sentPtr;
+ endptr = startptr;
+ XLByteAdvance(endptr, MAX_SEND_SIZE);
+
+ /*
+ * Round down to page boundary. This is not only for performance
+ * reasons, walreceiver relies on the fact that we never split a WAL
+ * record across two messages. Since a long WAL record is split at
+ * page boundary into continuation records, page boundary is always
+ * safe cut-off point. We also assume that SendRqstPtr never points
+ * in the middle of a WAL record.
+ */
+ endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+
+ if (XLByteLT(SendRqstPtr, endptr))
+ endptr = SendRqstPtr;
+
+ /*
+ * OK to read and send the log.
+ *
+ * We don't need to convert the xlogid/xrecoff from host byte order
+ * to network byte order because the both server can be expected to
+ * have the same byte order. If they have the different order, we
+ * don't reach here.
+ */
+ pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
+
+ if (endptr.xlogid != startptr.xlogid)
+ {
+ Assert(endptr.xlogid == startptr.xlogid + 1);
+ nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
+ }
+ else
+ nbytes = endptr.xrecoff - startptr.xrecoff;
+
+ sentPtr = endptr;
+
+ /*
+ * Read the log into the output buffer directly to prevent
+ * extra memcpy calls.
+ */
+ enlargeStringInfo(outMsg, nbytes);
+
+ XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
+ outMsg->len += nbytes;
+ outMsg->data[outMsg->len] = '\0';
+
+ pq_putmessage('d', outMsg->data, outMsg->len);
+ resetStringInfo(outMsg);
+ }
+
+ /* Update shared memory status */
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->sentPtr = sentPtr;
+ SpinLockRelease(&walsnd->mutex);
+
+ /* Flush pending output */
+ if (pq_flush())
+ return false;
+
+ /* Report progress of XLOG streaming in PS display */
+ snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
+ sentPtr.xlogid, sentPtr.xrecoff);
+ set_ps_display(activitymsg, false);
+
+ return true;
+}
+
+/* SIGHUP: set flag to re-read config file at next convenient time */
+static void
+WalSndSigHupHandler(SIGNAL_ARGS)
+{
+ got_SIGHUP = true;
+}
+
+/* SIGTERM: set flag to shut down */
+static void
+WalSndShutdownHandler(SIGNAL_ARGS)
+{
+ shutdown_requested = true;
+}
+
+/*
+ * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
+ *
+ * Some backend has bought the farm,
+ * so we need to stop what we're doing and exit.
+ */
+static void
+WalSndQuickDieHandler(SIGNAL_ARGS)
+{
+ PG_SETMASK(&BlockSig);
+
+ /*
+ * We DO NOT want to run proc_exit() callbacks -- we're here because
+ * shared memory may be corrupted, so we don't want to try to clean up our
+ * transaction. Just nail the windows shut and get out of town. Now that
+ * there's an atexit callback to prevent third-party code from breaking
+ * things by calling exit() directly, we have to reset the callbacks
+ * explicitly to make this work as intended.
+ */
+ on_exit_reset();
+
+ /*
+ * Note we do exit(2) not exit(0). This is to force the postmaster into a
+ * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
+ * backend. This is necessary precisely because we don't clean up our
+ * shared memory state. (The "dead man switch" mechanism in pmsignal.c
+ * should ensure the postmaster sees this as a crash, too, but no harm
+ * in being doubly sure.)
+ */
+ exit(2);
+}
+
+/* SIGUSR2: set flag to do a last cycle and shut down afterwards */
+static void
+WalSndLastCycleHandler(SIGNAL_ARGS)
+{
+ ready_to_stop = true;
+}
+
+/* Set up signal handlers */
+void
+WalSndSignals(void)
+{
+ /* Set up signal handlers */
+ pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config file */
+ pqsignal(SIGINT, SIG_IGN); /* not used */
+ pqsignal(SIGTERM, WalSndShutdownHandler); /* request shutdown */
+ pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
+ pqsignal(SIGALRM, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGUSR1, SIG_IGN); /* not used */
+ pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and shutdown */
+
+ /* Reset some signals that are accepted by postmaster but not here */
+ pqsignal(SIGCHLD, SIG_DFL);
+ pqsignal(SIGTTIN, SIG_DFL);
+ pqsignal(SIGTTOU, SIG_DFL);
+ pqsignal(SIGCONT, SIG_DFL);
+ pqsignal(SIGWINCH, SIG_DFL);
+}
+
+/* Report shared-memory space needed by WalSndShmemInit */
+Size
+WalSndShmemSize(void)
+{
+ Size size = 0;
+
+ size = offsetof(WalSndCtlData, walsnds);
+ size = add_size(size, mul_size(MaxWalSenders, sizeof(WalSnd)));
+
+ return size;
+}
+
+/* Allocate and initialize walsender-related shared memory */
+void
+WalSndShmemInit(void)
+{
+ bool found;
+ int i;
+
+ WalSndCtl = (WalSndCtlData *)
+ ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
+
+ if (WalSndCtl == NULL)
+ ereport(FATAL,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("not enough shared memory for walsender")));
+ if (found)
+ return; /* already initialized */
+
+ /* Initialize the data structures */
+ MemSet(WalSndCtl, 0, WalSndShmemSize());
+
+ for (i = 0; i < MaxWalSenders; i++)
+ {
+ WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ SpinLockInit(&walsnd->mutex);
+ }
+}
+
+/*
+ * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
+ * if none.
+ */
+XLogRecPtr
+GetOldestWALSendPointer(void)
+{
+ XLogRecPtr oldest = {0, 0};
+ int i;
+ bool found = false;
+
+ for (i = 0; i < MaxWalSenders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ XLogRecPtr recptr;
+
+ if (walsnd->pid == 0)
+ continue;
+
+ SpinLockAcquire(&walsnd->mutex);
+ recptr = walsnd->sentPtr;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (recptr.xlogid == 0 && recptr.xrecoff == 0)
+ continue;
+
+ if (!found || XLByteLT(recptr, oldest))
+ oldest = recptr;
+ found = true;
+ }
+ return oldest;
+}