diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 276 |
1 files changed, 170 insertions, 106 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 2347567ccd6..d078501814c 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -45,6 +45,7 @@ #include "libpq/pqsignal.h" #include "miscadmin.h" #include "replication/basebackup.h" +#include "replication/replnodes.h" #include "replication/walprotocol.h" #include "replication/walsender.h" #include "storage/fd.h" @@ -99,6 +100,7 @@ static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ +static bool HandleReplicationCommand(const char *cmd_string); static int WalSndLoop(void); static void InitWalSnd(void); static void WalSndHandshake(void); @@ -106,6 +108,8 @@ static void WalSndKill(int code, Datum arg); static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); static bool XLogSend(char *msgbuf, bool *caughtup); static void CheckClosedConnection(void); +static void IdentifySystem(void); +static void StartReplication(StartReplicationCmd * cmd); /* Main entry point for walsender process */ @@ -218,118 +222,14 @@ WalSndHandshake(void) case 'Q': /* Query message */ { const char *query_string; - XLogRecPtr recptr; query_string = pq_getmsgstring(&input_message); pq_getmsgend(&input_message); - if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0) - { - StringInfoData buf; - char sysid[32]; - char tli[11]; - - /* - * Reply with a result set with one row, two columns. - * First col is system ID, and second is timeline ID - */ - - snprintf(sysid, sizeof(sysid), UINT64_FORMAT, - GetSystemIdentifier()); - snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); - - /* Send a RowDescription message */ - pq_beginmessage(&buf, 'T'); - pq_sendint(&buf, 2, 2); /* 2 fields */ - - /* first field */ - pq_sendstring(&buf, "systemid"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, TEXTOID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ - - /* second field */ - pq_sendstring(&buf, "timeline"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, INT4OID, 4); /* type oid */ - pq_sendint(&buf, 4, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ - pq_endmessage(&buf); - - /* Send a DataRow message */ - pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 2, 2); /* # of columns */ - pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ - pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); - pq_sendint(&buf, strlen(tli), 4); /* col2 len */ - pq_sendbytes(&buf, (char *) tli, strlen(tli)); - pq_endmessage(&buf); - - /* Send CommandComplete and ReadyForQuery messages */ - EndCommand("SELECT", DestRemote); - ReadyForQuery(DestRemote); - /* ReadyForQuery did pq_flush for us */ - } - else if (sscanf(query_string, "START_REPLICATION %X/%X", - &recptr.xlogid, &recptr.xrecoff) == 2) - { - StringInfoData buf; - - /* - * Check that we're logging enough information in the - * WAL for log-shipping. - * - * NOTE: This only checks the current value of - * wal_level. Even if the current setting is not - * 'minimal', there can be old WAL in the pg_xlog - * directory that was created with 'minimal'. So this - * is not bulletproof, the purpose is just to give a - * user-friendly error message that hints how to - * configure the system correctly. - */ - if (wal_level == WAL_LEVEL_MINIMAL) - ereport(FATAL, - (errcode(ERRCODE_CANNOT_CONNECT_NOW), - errmsg("standby connections not allowed because wal_level=minimal"))); - - /* Send a CopyBothResponse message, and start streaming */ - pq_beginmessage(&buf, 'W'); - pq_sendbyte(&buf, 0); - pq_sendint(&buf, 0, 2); - pq_endmessage(&buf); - pq_flush(); - - /* - * Initialize position to the received one, then the - * xlog records begin to be shipped from that position - */ - sentPtr = recptr; - - /* break out of the loop */ + if (HandleReplicationCommand(query_string)) replication_started = true; - } - else if (strncmp(query_string, "BASE_BACKUP ", 12) == 0) - { - /* Command is BASE_BACKUP <options>;<label> */ - SendBaseBackup(query_string + strlen("BASE_BACKUP ")); - /* Send CommandComplete and ReadyForQuery messages */ - EndCommand("SELECT", DestRemote); - ReadyForQuery(DestRemote); - /* ReadyForQuery did pq_flush for us */ - } - else - { - ereport(FATAL, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid standby query string: %s", query_string))); - } - break; } + break; case 'X': /* standby is closing the connection */ @@ -351,6 +251,170 @@ WalSndHandshake(void) } /* + * IDENTIFY_SYSTEM + */ +static void +IdentifySystem(void) +{ + StringInfoData buf; + char sysid[32]; + char tli[11]; + + /* + * Reply with a result set with one row, two columns. First col is system + * ID, and second is timeline ID + */ + + snprintf(sysid, sizeof(sysid), UINT64_FORMAT, + GetSystemIdentifier()); + snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); + + /* Send a RowDescription message */ + pq_beginmessage(&buf, 'T'); + pq_sendint(&buf, 2, 2); /* 2 fields */ + + /* first field */ + pq_sendstring(&buf, "systemid"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + + /* second field */ + pq_sendstring(&buf, "timeline"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, INT4OID, 4); /* type oid */ + pq_sendint(&buf, 4, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + pq_endmessage(&buf); + + /* Send a DataRow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 2, 2); /* # of columns */ + pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ + pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); + pq_sendint(&buf, strlen(tli), 4); /* col2 len */ + pq_sendbytes(&buf, (char *) tli, strlen(tli)); + pq_endmessage(&buf); + + /* Send CommandComplete and ReadyForQuery messages */ + EndCommand("SELECT", DestRemote); + ReadyForQuery(DestRemote); + /* ReadyForQuery did pq_flush for us */ +} + +/* + * START_REPLICATION + */ +static void +StartReplication(StartReplicationCmd * cmd) +{ + StringInfoData buf; + + /* + * Check that we're logging enough information in the WAL for + * log-shipping. + * + * NOTE: This only checks the current value of wal_level. Even if the + * current setting is not 'minimal', there can be old WAL in the pg_xlog + * directory that was created with 'minimal'. So this is not bulletproof, + * the purpose is just to give a user-friendly error message that hints + * how to configure the system correctly. + */ + if (wal_level == WAL_LEVEL_MINIMAL) + ereport(FATAL, + (errcode(ERRCODE_CANNOT_CONNECT_NOW), + errmsg("standby connections not allowed because wal_level=minimal"))); + + /* Send a CopyBothResponse message, and start streaming */ + pq_beginmessage(&buf, 'W'); + pq_sendbyte(&buf, 0); + pq_sendint(&buf, 0, 2); + pq_endmessage(&buf); + pq_flush(); + + /* + * Initialize position to the received one, then the xlog records begin to + * be shipped from that position + */ + sentPtr = cmd->startpoint; +} + +/* + * Execute an incoming replication command. + */ +static bool +HandleReplicationCommand(const char *cmd_string) +{ + bool replication_started = false; + int parse_rc; + Node *cmd_node; + MemoryContext cmd_context; + MemoryContext old_context; + + elog(DEBUG1, "received replication command: %s", cmd_string); + + cmd_context = AllocSetContextCreate(CurrentMemoryContext, + "Replication command context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + old_context = MemoryContextSwitchTo(cmd_context); + + replication_scanner_init(cmd_string); + parse_rc = replication_yyparse(); + if (parse_rc != 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + (errmsg_internal("replication command parser returned %d", + parse_rc)))); + + cmd_node = replication_parse_result; + + switch (cmd_node->type) + { + case T_IdentifySystemCmd: + IdentifySystem(); + break; + + case T_StartReplicationCmd: + StartReplication((StartReplicationCmd *) cmd_node); + + /* break out of the loop */ + replication_started = true; + break; + + case T_BaseBackupCmd: + { + BaseBackupCmd *cmd = (BaseBackupCmd *) cmd_node; + + SendBaseBackup(cmd->label, cmd->progress); + + /* Send CommandComplete and ReadyForQuery messages */ + EndCommand("SELECT", DestRemote); + ReadyForQuery(DestRemote); + /* ReadyForQuery did pq_flush for us */ + break; + } + + default: + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid standby query string: %s", cmd_string))); + } + + /* done */ + MemoryContextSwitchTo(old_context); + MemoryContextDelete(cmd_context); + + return replication_started; +} + +/* * Check if the remote end has closed the connection. */ static void |