aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2010-02-03 09:47:19 +0000
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2010-02-03 09:47:19 +0000
commit808969d0e7ac2d2fdbd915c6a6ac9ec68b6f63f9 (patch)
tree0215738e25ef02df0eedee365a6a38ca57f7df37
parent47c5b8f5588da67a95dca8cb14b2bc1b7f291e15 (diff)
downloadpostgresql-808969d0e7ac2d2fdbd915c6a6ac9ec68b6f63f9.tar.gz
postgresql-808969d0e7ac2d2fdbd915c6a6ac9ec68b6f63f9.zip
Add a message type header to the CopyData messages sent from primary
to standby in streaming replication. While we only have one message type at the moment, adding a message type header makes this easier to extend.
-rw-r--r--doc/src/sgml/protocol.sgml61
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c34
-rw-r--r--src/backend/replication/walreceiver.c49
-rw-r--r--src/backend/replication/walsender.c3
-rw-r--r--src/include/replication/walreceiver.h5
5 files changed, 117 insertions, 35 deletions
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 54e03998ff9..845e4181307 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1,4 +1,4 @@
-<!-- $PostgreSQL: pgsql/doc/src/sgml/protocol.sgml,v 1.77 2010/01/15 09:18:59 heikki Exp $ -->
+<!-- $PostgreSQL: pgsql/doc/src/sgml/protocol.sgml,v 1.78 2010/02/03 09:47:19 heikki Exp $ -->
<chapter id="protocol">
<title>Frontend/Backend Protocol</title>
@@ -4179,12 +4179,65 @@ The commands accepted in walsender mode are:
already been recycled. On success, server responds with a
CopyOutResponse message, and backend starts to stream WAL as CopyData
messages.
+ The payload in CopyData message consists of the following format.
</para>
<para>
- The payload in each CopyData message consists of an XLogRecPtr,
- indicating the starting point of the WAL in the message, immediately
- followed by the WAL data itself.
+ <variablelist>
+ <varlistentry>
+ <term>
+ XLogData (B)
+ </term>
+ <listitem>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term>
+ Byte1('w')
+ </term>
+ <listitem>
+ <para>
+ Identifies the message as WAL data.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+ Int32
+ </term>
+ <listitem>
+ <para>
+ The log file number of the LSN, indicating the starting point of
+ the WAL in the message.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+ Int32
+ </term>
+ <listitem>
+ <para>
+ The byte offset of the LSN, indicating the starting point of
+ the WAL in the message.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+ Byte<replaceable>n</replaceable>
+ </term>
+ <listitem>
+ <para>
+ Data that forms part of WAL data stream.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
</para>
<para>
A single WAL record is never split across two CopyData messages. When
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index b7a24e56f56..039370a8515 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.2 2010/01/20 11:58:44 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.3 2010/02/03 09:47:19 heikki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -48,8 +48,8 @@ static char *recvBuf = NULL;
/* Prototypes for interface functions */
static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
-static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer,
- int *len);
+static bool libpqrcv_receive(int timeout, unsigned char *type,
+ char **buffer, int *len);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
@@ -236,13 +236,13 @@ libpqrcv_disconnect(void)
}
/*
- * Receive any WAL records available from XLOG stream, blocking for
+ * Receive a message available from XLOG stream, blocking for
* maximum of 'timeout' ms.
*
* Returns:
*
- * True if data was received. *recptr, *buffer and *len are set to
- * the WAL location of the received data, buffer holding it, and length,
+ * True if data was received. *type, *buffer and *len are set to
+ * the type of the received data, buffer holding it, and length,
* respectively.
*
* False if no data was available within timeout, or wait was interrupted
@@ -254,7 +254,7 @@ libpqrcv_disconnect(void)
* ereports on error.
*/
static bool
-libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
+libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
{
int rawlen;
@@ -275,14 +275,14 @@ libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
- (errmsg("could not read xlog records: %s",
+ (errmsg("could not receive data from XLOG stream: %s",
PQerrorMessage(streamConn))));
}
justconnected = false;
/* Receive CopyData message */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
- if (rawlen == 0) /* no records available yet, then return */
+ if (rawlen == 0) /* no data available yet, then return */
return false;
if (rawlen == -1) /* end-of-streaming or error */
{
@@ -297,22 +297,18 @@ libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
}
PQclear(res);
ereport(ERROR,
- (errmsg("could not read xlog records: %s",
+ (errmsg("could not receive data from XLOG stream: %s",
PQerrorMessage(streamConn))));
}
if (rawlen < -1)
ereport(ERROR,
- (errmsg("could not read xlog records: %s",
+ (errmsg("could not receive data from XLOG stream: %s",
PQerrorMessage(streamConn))));
- if (rawlen < sizeof(XLogRecPtr))
- ereport(ERROR,
- (errmsg("invalid WAL message received from primary")));
-
- /* Return received WAL records to caller */
- *recptr = *((XLogRecPtr *) recvBuf);
- *buffer = recvBuf + sizeof(XLogRecPtr);
- *len = rawlen - sizeof(XLogRecPtr);
+ /* Return received messages to caller */
+ *type = *((unsigned char *) recvBuf);
+ *buffer = recvBuf + sizeof(*type);
+ *len = rawlen - sizeof(*type);
return true;
}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 4a5ba5b4263..a2f15a9a03e 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -29,7 +29,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.3 2010/02/03 09:47:19 heikki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -135,6 +135,7 @@ static void WalRcvQuickDieHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
static void WalRcvDie(int code, Datum arg);
+static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
@@ -258,7 +259,7 @@ WalReceiverMain(void)
/* Loop until end-of-streaming or error */
for (;;)
{
- XLogRecPtr recptr;
+ unsigned char type;
char *buf;
int len;
@@ -287,17 +288,17 @@ WalReceiverMain(void)
}
/* Wait a while for data to arrive */
- if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len))
+ if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
{
- /* Write received WAL records to disk */
- XLogWalRcvWrite(buf, len, recptr);
+ /* Accept the received data, and process it */
+ XLogWalRcvProcessMsg(type, buf, len);
- /* Receive any more WAL records we can without sleeping */
- while(walrcv_receive(0, &recptr, &buf, &len))
- XLogWalRcvWrite(buf, len, recptr);
+ /* Receive any more data we can without sleeping */
+ while(walrcv_receive(0, &type, &buf, &len))
+ XLogWalRcvProcessMsg(type, buf, len);
/*
- * Now that we've written some records, flush them to disk and
+ * If we've written some records, flush them to disk and
* let the startup process know about them.
*/
XLogWalRcvFlush();
@@ -376,6 +377,36 @@ WalRcvQuickDieHandler(SIGNAL_ARGS)
}
/*
+ * Accept the message from XLOG stream, and process it.
+ */
+static void
+XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
+{
+ switch (type)
+ {
+ case 'w': /* WAL records */
+ {
+ XLogRecPtr recptr;
+
+ if (len < sizeof(XLogRecPtr))
+ ereport(ERROR,
+ (errmsg("invalid WAL message received from primary")));
+
+ recptr = *((XLogRecPtr *) buf);
+ buf += sizeof(XLogRecPtr);
+ len -= sizeof(XLogRecPtr);
+ XLogWalRcvWrite(buf, len, recptr);
+ break;
+ }
+ default:
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid replication message type %d",
+ type)));
+ }
+}
+
+/*
* Write XLOG data to disk.
*/
static void
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0eb074ffe73..0115b70fa2a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -30,7 +30,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.4 2010/01/27 16:41:09 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.5 2010/02/03 09:47:19 heikki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -659,6 +659,7 @@ XLogSend(StringInfo outMsg)
* have the same byte order. If they have different byte order, we
* don't reach here.
*/
+ pq_sendbyte(outMsg, 'w');
pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
if (endptr.xlogid != startptr.xlogid)
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 083eb4f07fb..bf7ad41b068 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -5,7 +5,7 @@
*
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
*
- * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.5 2010/01/27 15:27:51 heikki Exp $
+ * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.6 2010/02/03 09:47:19 heikki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -66,7 +66,8 @@ extern WalRcvData *WalRcv;
typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
-typedef bool (*walrcv_receive_type) (int timeout, XLogRecPtr *recptr, char **buffer, int *len);
+typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
+ char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_disconnect_type) (void);