aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c276
1 files changed, 170 insertions, 106 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 2347567ccd6..d078501814c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -45,6 +45,7 @@
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "replication/basebackup.h"
+#include "replication/replnodes.h"
#include "replication/walprotocol.h"
#include "replication/walsender.h"
#include "storage/fd.h"
@@ -99,6 +100,7 @@ static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
+static bool HandleReplicationCommand(const char *cmd_string);
static int WalSndLoop(void);
static void InitWalSnd(void);
static void WalSndHandshake(void);
@@ -106,6 +108,8 @@ static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
static bool XLogSend(char *msgbuf, bool *caughtup);
static void CheckClosedConnection(void);
+static void IdentifySystem(void);
+static void StartReplication(StartReplicationCmd * cmd);
/* Main entry point for walsender process */
@@ -218,118 +222,14 @@ WalSndHandshake(void)
case 'Q': /* Query message */
{
const char *query_string;
- XLogRecPtr recptr;
query_string = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
- if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0)
- {
- StringInfoData buf;
- char sysid[32];
- char tli[11];
-
- /*
- * Reply with a result set with one row, two columns.
- * First col is system ID, and second is timeline ID
- */
-
- snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
- GetSystemIdentifier());
- snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
-
- /* Send a RowDescription message */
- pq_beginmessage(&buf, 'T');
- pq_sendint(&buf, 2, 2); /* 2 fields */
-
- /* first field */
- pq_sendstring(&buf, "systemid"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
-
- /* second field */
- pq_sendstring(&buf, "timeline"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, INT4OID, 4); /* type oid */
- pq_sendint(&buf, 4, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
- pq_endmessage(&buf);
-
- /* Send a DataRow message */
- pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 2, 2); /* # of columns */
- pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
- pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
- pq_sendint(&buf, strlen(tli), 4); /* col2 len */
- pq_sendbytes(&buf, (char *) tli, strlen(tli));
- pq_endmessage(&buf);
-
- /* Send CommandComplete and ReadyForQuery messages */
- EndCommand("SELECT", DestRemote);
- ReadyForQuery(DestRemote);
- /* ReadyForQuery did pq_flush for us */
- }
- else if (sscanf(query_string, "START_REPLICATION %X/%X",
- &recptr.xlogid, &recptr.xrecoff) == 2)
- {
- StringInfoData buf;
-
- /*
- * Check that we're logging enough information in the
- * WAL for log-shipping.
- *
- * NOTE: This only checks the current value of
- * wal_level. Even if the current setting is not
- * 'minimal', there can be old WAL in the pg_xlog
- * directory that was created with 'minimal'. So this
- * is not bulletproof, the purpose is just to give a
- * user-friendly error message that hints how to
- * configure the system correctly.
- */
- if (wal_level == WAL_LEVEL_MINIMAL)
- ereport(FATAL,
- (errcode(ERRCODE_CANNOT_CONNECT_NOW),
- errmsg("standby connections not allowed because wal_level=minimal")));
-
- /* Send a CopyBothResponse message, and start streaming */
- pq_beginmessage(&buf, 'W');
- pq_sendbyte(&buf, 0);
- pq_sendint(&buf, 0, 2);
- pq_endmessage(&buf);
- pq_flush();
-
- /*
- * Initialize position to the received one, then the
- * xlog records begin to be shipped from that position
- */
- sentPtr = recptr;
-
- /* break out of the loop */
+ if (HandleReplicationCommand(query_string))
replication_started = true;
- }
- else if (strncmp(query_string, "BASE_BACKUP ", 12) == 0)
- {
- /* Command is BASE_BACKUP <options>;<label> */
- SendBaseBackup(query_string + strlen("BASE_BACKUP "));
- /* Send CommandComplete and ReadyForQuery messages */
- EndCommand("SELECT", DestRemote);
- ReadyForQuery(DestRemote);
- /* ReadyForQuery did pq_flush for us */
- }
- else
- {
- ereport(FATAL,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid standby query string: %s", query_string)));
- }
- break;
}
+ break;
case 'X':
/* standby is closing the connection */
@@ -351,6 +251,170 @@ WalSndHandshake(void)
}
/*
+ * IDENTIFY_SYSTEM
+ */
+static void
+IdentifySystem(void)
+{
+ StringInfoData buf;
+ char sysid[32];
+ char tli[11];
+
+ /*
+ * Reply with a result set with one row, two columns. First col is system
+ * ID, and second is timeline ID
+ */
+
+ snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
+ GetSystemIdentifier());
+ snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
+
+ /* Send a RowDescription message */
+ pq_beginmessage(&buf, 'T');
+ pq_sendint(&buf, 2, 2); /* 2 fields */
+
+ /* first field */
+ pq_sendstring(&buf, "systemid"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* second field */
+ pq_sendstring(&buf, "timeline"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, INT4OID, 4); /* type oid */
+ pq_sendint(&buf, 4, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+ pq_endmessage(&buf);
+
+ /* Send a DataRow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint(&buf, 2, 2); /* # of columns */
+ pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
+ pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
+ pq_sendint(&buf, strlen(tli), 4); /* col2 len */
+ pq_sendbytes(&buf, (char *) tli, strlen(tli));
+ pq_endmessage(&buf);
+
+ /* Send CommandComplete and ReadyForQuery messages */
+ EndCommand("SELECT", DestRemote);
+ ReadyForQuery(DestRemote);
+ /* ReadyForQuery did pq_flush for us */
+}
+
+/*
+ * START_REPLICATION
+ */
+static void
+StartReplication(StartReplicationCmd * cmd)
+{
+ StringInfoData buf;
+
+ /*
+ * Check that we're logging enough information in the WAL for
+ * log-shipping.
+ *
+ * NOTE: This only checks the current value of wal_level. Even if the
+ * current setting is not 'minimal', there can be old WAL in the pg_xlog
+ * directory that was created with 'minimal'. So this is not bulletproof,
+ * the purpose is just to give a user-friendly error message that hints
+ * how to configure the system correctly.
+ */
+ if (wal_level == WAL_LEVEL_MINIMAL)
+ ereport(FATAL,
+ (errcode(ERRCODE_CANNOT_CONNECT_NOW),
+ errmsg("standby connections not allowed because wal_level=minimal")));
+
+ /* Send a CopyBothResponse message, and start streaming */
+ pq_beginmessage(&buf, 'W');
+ pq_sendbyte(&buf, 0);
+ pq_sendint(&buf, 0, 2);
+ pq_endmessage(&buf);
+ pq_flush();
+
+ /*
+ * Initialize position to the received one, then the xlog records begin to
+ * be shipped from that position
+ */
+ sentPtr = cmd->startpoint;
+}
+
+/*
+ * Execute an incoming replication command.
+ */
+static bool
+HandleReplicationCommand(const char *cmd_string)
+{
+ bool replication_started = false;
+ int parse_rc;
+ Node *cmd_node;
+ MemoryContext cmd_context;
+ MemoryContext old_context;
+
+ elog(DEBUG1, "received replication command: %s", cmd_string);
+
+ cmd_context = AllocSetContextCreate(CurrentMemoryContext,
+ "Replication command context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ old_context = MemoryContextSwitchTo(cmd_context);
+
+ replication_scanner_init(cmd_string);
+ parse_rc = replication_yyparse();
+ if (parse_rc != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ (errmsg_internal("replication command parser returned %d",
+ parse_rc))));
+
+ cmd_node = replication_parse_result;
+
+ switch (cmd_node->type)
+ {
+ case T_IdentifySystemCmd:
+ IdentifySystem();
+ break;
+
+ case T_StartReplicationCmd:
+ StartReplication((StartReplicationCmd *) cmd_node);
+
+ /* break out of the loop */
+ replication_started = true;
+ break;
+
+ case T_BaseBackupCmd:
+ {
+ BaseBackupCmd *cmd = (BaseBackupCmd *) cmd_node;
+
+ SendBaseBackup(cmd->label, cmd->progress);
+
+ /* Send CommandComplete and ReadyForQuery messages */
+ EndCommand("SELECT", DestRemote);
+ ReadyForQuery(DestRemote);
+ /* ReadyForQuery did pq_flush for us */
+ break;
+ }
+
+ default:
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid standby query string: %s", cmd_string)));
+ }
+
+ /* done */
+ MemoryContextSwitchTo(old_context);
+ MemoryContextDelete(cmd_context);
+
+ return replication_started;
+}
+
+/*
* Check if the remote end has closed the connection.
*/
static void