diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/Makefile | 25 | ||||
-rw-r--r-- | src/backend/replication/basebackup.c | 17 | ||||
-rw-r--r-- | src/backend/replication/repl_gram.y | 143 | ||||
-rw-r--r-- | src/backend/replication/repl_scanner.l | 168 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 276 |
5 files changed, 507 insertions, 122 deletions
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 21fc096df3f..42c6eaf26c3 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -12,6 +12,29 @@ subdir = src/backend/replication top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o +OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \ + repl_gram.o include $(top_srcdir)/src/backend/common.mk + +# repl_scanner is compiled as part of repl_gram +repl_gram.o: repl_scanner.c + +# See notes in src/backend/parser/Makefile about the following two rules + +repl_gram.c: repl_gram.y +ifdef BISON + $(BISON) -d $(BISONFLAGS) -o $@ $< +else + @$(missing) bison $< $@ +endif + +repl_scanner.c: repl_scanner.l +ifdef FLEX + $(FLEX) $(FLEXFLAGS) -o'$@' $< +else + @$(missing) flex $< $@ +endif + +# repl_gram.c and repl_scanner.c are in the distribution tarball, so +# they are not cleaned here. diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 7929f855f6d..1ed5e2a6c9e 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -98,12 +98,10 @@ perform_base_backup(const char *backup_label, List *tablespaces) * pg_stop_backup() for the user. */ void -SendBaseBackup(const char *options) +SendBaseBackup(const char *backup_label, bool progress) { DIR *dir; struct dirent *de; - char *backup_label = strchr(options, ';'); - bool progress = false; List *tablespaces = NIL; tablespaceinfo *ti; MemoryContext backup_context; @@ -119,18 +117,7 @@ SendBaseBackup(const char *options) WalSndSetState(WALSNDSTATE_BACKUP); if (backup_label == NULL) - ereport(FATAL, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid base backup options: %s", options))); - backup_label++; /* Walk past the semicolon */ - - /* Currently the only option string supported is PROGRESS */ - if (strncmp(options, "PROGRESS", 8) == 0) - progress = true; - else if (options[0] != ';') - ereport(FATAL, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid base backup options: %s", options))); + backup_label = "base backup"; if (update_process_title) { diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y new file mode 100644 index 00000000000..0ef33ddb4f9 --- /dev/null +++ b/src/backend/replication/repl_gram.y @@ -0,0 +1,143 @@ +%{ +/*------------------------------------------------------------------------- + * + * repl_gram.y - Parser for the replication commands + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/replication/repl_gram.y + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "replication/replnodes.h" +#include "replication/walsender.h" + +/* Result of the parsing is returned here */ +Node *replication_parse_result; + +/* Location tracking support --- simpler than bison's default */ +#define YYLLOC_DEFAULT(Current, Rhs, N) \ + do { \ + if (N) \ + (Current) = (Rhs)[1]; \ + else \ + (Current) = (Rhs)[0]; \ + } while (0) + +/* + * Bison doesn't allocate anything that needs to live across parser calls, + * so we can easily have it use palloc instead of malloc. This prevents + * memory leaks if we error out during parsing. Note this only works with + * bison >= 2.0. However, in bison 1.875 the default is to use alloca() + * if possible, so there's not really much problem anyhow, at least if + * you're building with gcc. + */ +#define YYMALLOC palloc +#define YYFREE pfree + +#define parser_yyerror(msg) replication_yyerror(msg, yyscanner) +#define parser_errposition(pos) replication_scanner_errposition(pos) + +%} + +%expect 0 +%name-prefix="replication_yy" + +%union { + char *str; + bool boolval; + + XLogRecPtr recptr; + Node *node; +} + +/* Non-keyword tokens */ +%token <str> SCONST +%token <recptr> RECPTR + +/* Keyword tokens. */ +%token K_BASE_BACKUP +%token K_IDENTIFY_SYSTEM +%token K_LABEL +%token K_PROGRESS +%token K_START_REPLICATION + +%type <node> command +%type <node> base_backup start_replication identify_system +%type <boolval> opt_progress +%type <str> opt_label + +%% + +firstcmd: command opt_semicolon + { + replication_parse_result = $1; + } + ; + +opt_semicolon: ';' + | /* EMPTY */ + ; + +command: + identify_system + | base_backup + | start_replication + ; + +/* + * IDENTIFY_SYSTEM + */ +identify_system: + K_IDENTIFY_SYSTEM + { + $$ = (Node *) makeNode(IdentifySystemCmd); + } + ; + +/* + * BASE_BACKUP [LABEL <label>] [PROGRESS] + */ +base_backup: + K_BASE_BACKUP opt_label opt_progress + { + BaseBackupCmd *cmd = (BaseBackupCmd *) makeNode(BaseBackupCmd); + + cmd->label = $2; + cmd->progress = $3; + + $$ = (Node *) cmd; + } + ; + +opt_label: K_LABEL SCONST { $$ = $2; } + | /* EMPTY */ { $$ = NULL; } + ; + +opt_progress: K_PROGRESS { $$ = true; } + | /* EMPTY */ { $$ = false; } + ; + +/* + * START_REPLICATION %X/%X + */ +start_replication: + K_START_REPLICATION RECPTR + { + StartReplicationCmd *cmd; + + cmd = makeNode(StartReplicationCmd); + cmd->startpoint = $2; + + $$ = (Node *) cmd; + } + ; +%% + +#include "repl_scanner.c" diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l new file mode 100644 index 00000000000..014a72059a4 --- /dev/null +++ b/src/backend/replication/repl_scanner.l @@ -0,0 +1,168 @@ +%{ +/*------------------------------------------------------------------------- + * + * repl_scanner.l + * a lexical scanner for the replication commands + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/replication/repl_scanner.l + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +/* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */ +#undef fprintf +#define fprintf(file, fmt, msg) ereport(ERROR, (errmsg_internal("%s", msg))) + +/* Handle to the buffer that the lexer uses internally */ +static YY_BUFFER_STATE scanbufhandle; + +static StringInfoData litbuf; + +static void startlit(void); +static char *litbufdup(void); +static void addlit(char *ytext, int yleng); +static void addlitchar(unsigned char ychar); + +%} + +%option 8bit +%option never-interactive +%option nodefault +%option noinput +%option nounput +%option noyywrap +%option warn +%option prefix="replication_yy" + +%x xq + +/* Extended quote + * xqdouble implements embedded quote, '''' + */ +xqstart {quote} +xqdouble {quote}{quote} +xqinside [^']+ + +hexdigit [0-9A-Za-z]+ + +quote ' +quotestop {quote} + +%% + +BASE_BACKUP { return K_BASE_BACKUP; } +IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; } +LABEL { return K_LABEL; } +PROGRESS { return K_PROGRESS; } +START_REPLICATION { return K_START_REPLICATION; } +"," { return ','; } +";" { return ';'; } + +[\n] ; +[\t] ; +" " ; + +{hexdigit}+\/{hexdigit}+ { + if (sscanf(yytext, "%X/%X", &yylval.recptr.xlogid, &yylval.recptr.xrecoff) != 2) + yyerror("invalid streaming start location"); + return RECPTR; + } + +{xqstart} { + BEGIN(xq); + startlit(); + } +<xq>{quotestop} { + yyless(1); + BEGIN(INITIAL); + yylval.str = litbufdup(); + return SCONST; + } +<xq>{xqdouble} { + addlitchar('\''); + } +<xq>{xqinside} { + addlit(yytext, yyleng); + } + +<xq><<EOF>> { yyerror("unterminated quoted string"); } + + +<<EOF>> { + yyterminate(); + } + +. { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("syntax error: unexpected character \"%s\"", yytext))); + } +%% + + +static void +startlit(void) +{ + initStringInfo(&litbuf); +} + +static char * +litbufdup(void) +{ + return litbuf.data; +} + +static void +addlit(char *ytext, int yleng) +{ + appendBinaryStringInfo(&litbuf, ytext, yleng); +} + +static void +addlitchar(unsigned char ychar) +{ + appendStringInfoChar(&litbuf, ychar); +} + +void +yyerror(const char *message) +{ + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg_internal("%s", message))); +} + + +void +replication_scanner_init(const char *str) +{ + Size slen = strlen(str); + char *scanbuf; + + /* + * Might be left over after ereport() + */ + if (YY_CURRENT_BUFFER) + yy_delete_buffer(YY_CURRENT_BUFFER); + + /* + * Make a scan buffer with special termination needed by flex. + */ + scanbuf = (char *) palloc(slen + 2); + memcpy(scanbuf, str, slen); + scanbuf[slen] = scanbuf[slen + 1] = YY_END_OF_BUFFER_CHAR; + scanbufhandle = yy_scan_buffer(scanbuf, slen + 2); +} + +void +replication_scanner_finish() +{ + yy_delete_buffer(scanbufhandle); + scanbufhandle = NULL; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 2347567ccd6..d078501814c 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -45,6 +45,7 @@ #include "libpq/pqsignal.h" #include "miscadmin.h" #include "replication/basebackup.h" +#include "replication/replnodes.h" #include "replication/walprotocol.h" #include "replication/walsender.h" #include "storage/fd.h" @@ -99,6 +100,7 @@ static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ +static bool HandleReplicationCommand(const char *cmd_string); static int WalSndLoop(void); static void InitWalSnd(void); static void WalSndHandshake(void); @@ -106,6 +108,8 @@ static void WalSndKill(int code, Datum arg); static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); static bool XLogSend(char *msgbuf, bool *caughtup); static void CheckClosedConnection(void); +static void IdentifySystem(void); +static void StartReplication(StartReplicationCmd * cmd); /* Main entry point for walsender process */ @@ -218,118 +222,14 @@ WalSndHandshake(void) case 'Q': /* Query message */ { const char *query_string; - XLogRecPtr recptr; query_string = pq_getmsgstring(&input_message); pq_getmsgend(&input_message); - if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0) - { - StringInfoData buf; - char sysid[32]; - char tli[11]; - - /* - * Reply with a result set with one row, two columns. - * First col is system ID, and second is timeline ID - */ - - snprintf(sysid, sizeof(sysid), UINT64_FORMAT, - GetSystemIdentifier()); - snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); - - /* Send a RowDescription message */ - pq_beginmessage(&buf, 'T'); - pq_sendint(&buf, 2, 2); /* 2 fields */ - - /* first field */ - pq_sendstring(&buf, "systemid"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, TEXTOID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ - - /* second field */ - pq_sendstring(&buf, "timeline"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, INT4OID, 4); /* type oid */ - pq_sendint(&buf, 4, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ - pq_endmessage(&buf); - - /* Send a DataRow message */ - pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 2, 2); /* # of columns */ - pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ - pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); - pq_sendint(&buf, strlen(tli), 4); /* col2 len */ - pq_sendbytes(&buf, (char *) tli, strlen(tli)); - pq_endmessage(&buf); - - /* Send CommandComplete and ReadyForQuery messages */ - EndCommand("SELECT", DestRemote); - ReadyForQuery(DestRemote); - /* ReadyForQuery did pq_flush for us */ - } - else if (sscanf(query_string, "START_REPLICATION %X/%X", - &recptr.xlogid, &recptr.xrecoff) == 2) - { - StringInfoData buf; - - /* - * Check that we're logging enough information in the - * WAL for log-shipping. - * - * NOTE: This only checks the current value of - * wal_level. Even if the current setting is not - * 'minimal', there can be old WAL in the pg_xlog - * directory that was created with 'minimal'. So this - * is not bulletproof, the purpose is just to give a - * user-friendly error message that hints how to - * configure the system correctly. - */ - if (wal_level == WAL_LEVEL_MINIMAL) - ereport(FATAL, - (errcode(ERRCODE_CANNOT_CONNECT_NOW), - errmsg("standby connections not allowed because wal_level=minimal"))); - - /* Send a CopyBothResponse message, and start streaming */ - pq_beginmessage(&buf, 'W'); - pq_sendbyte(&buf, 0); - pq_sendint(&buf, 0, 2); - pq_endmessage(&buf); - pq_flush(); - - /* - * Initialize position to the received one, then the - * xlog records begin to be shipped from that position - */ - sentPtr = recptr; - - /* break out of the loop */ + if (HandleReplicationCommand(query_string)) replication_started = true; - } - else if (strncmp(query_string, "BASE_BACKUP ", 12) == 0) - { - /* Command is BASE_BACKUP <options>;<label> */ - SendBaseBackup(query_string + strlen("BASE_BACKUP ")); - /* Send CommandComplete and ReadyForQuery messages */ - EndCommand("SELECT", DestRemote); - ReadyForQuery(DestRemote); - /* ReadyForQuery did pq_flush for us */ - } - else - { - ereport(FATAL, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid standby query string: %s", query_string))); - } - break; } + break; case 'X': /* standby is closing the connection */ @@ -351,6 +251,170 @@ WalSndHandshake(void) } /* + * IDENTIFY_SYSTEM + */ +static void +IdentifySystem(void) +{ + StringInfoData buf; + char sysid[32]; + char tli[11]; + + /* + * Reply with a result set with one row, two columns. First col is system + * ID, and second is timeline ID + */ + + snprintf(sysid, sizeof(sysid), UINT64_FORMAT, + GetSystemIdentifier()); + snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); + + /* Send a RowDescription message */ + pq_beginmessage(&buf, 'T'); + pq_sendint(&buf, 2, 2); /* 2 fields */ + + /* first field */ + pq_sendstring(&buf, "systemid"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + + /* second field */ + pq_sendstring(&buf, "timeline"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, INT4OID, 4); /* type oid */ + pq_sendint(&buf, 4, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + pq_endmessage(&buf); + + /* Send a DataRow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 2, 2); /* # of columns */ + pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ + pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); + pq_sendint(&buf, strlen(tli), 4); /* col2 len */ + pq_sendbytes(&buf, (char *) tli, strlen(tli)); + pq_endmessage(&buf); + + /* Send CommandComplete and ReadyForQuery messages */ + EndCommand("SELECT", DestRemote); + ReadyForQuery(DestRemote); + /* ReadyForQuery did pq_flush for us */ +} + +/* + * START_REPLICATION + */ +static void +StartReplication(StartReplicationCmd * cmd) +{ + StringInfoData buf; + + /* + * Check that we're logging enough information in the WAL for + * log-shipping. + * + * NOTE: This only checks the current value of wal_level. Even if the + * current setting is not 'minimal', there can be old WAL in the pg_xlog + * directory that was created with 'minimal'. So this is not bulletproof, + * the purpose is just to give a user-friendly error message that hints + * how to configure the system correctly. + */ + if (wal_level == WAL_LEVEL_MINIMAL) + ereport(FATAL, + (errcode(ERRCODE_CANNOT_CONNECT_NOW), + errmsg("standby connections not allowed because wal_level=minimal"))); + + /* Send a CopyBothResponse message, and start streaming */ + pq_beginmessage(&buf, 'W'); + pq_sendbyte(&buf, 0); + pq_sendint(&buf, 0, 2); + pq_endmessage(&buf); + pq_flush(); + + /* + * Initialize position to the received one, then the xlog records begin to + * be shipped from that position + */ + sentPtr = cmd->startpoint; +} + +/* + * Execute an incoming replication command. + */ +static bool +HandleReplicationCommand(const char *cmd_string) +{ + bool replication_started = false; + int parse_rc; + Node *cmd_node; + MemoryContext cmd_context; + MemoryContext old_context; + + elog(DEBUG1, "received replication command: %s", cmd_string); + + cmd_context = AllocSetContextCreate(CurrentMemoryContext, + "Replication command context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + old_context = MemoryContextSwitchTo(cmd_context); + + replication_scanner_init(cmd_string); + parse_rc = replication_yyparse(); + if (parse_rc != 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + (errmsg_internal("replication command parser returned %d", + parse_rc)))); + + cmd_node = replication_parse_result; + + switch (cmd_node->type) + { + case T_IdentifySystemCmd: + IdentifySystem(); + break; + + case T_StartReplicationCmd: + StartReplication((StartReplicationCmd *) cmd_node); + + /* break out of the loop */ + replication_started = true; + break; + + case T_BaseBackupCmd: + { + BaseBackupCmd *cmd = (BaseBackupCmd *) cmd_node; + + SendBaseBackup(cmd->label, cmd->progress); + + /* Send CommandComplete and ReadyForQuery messages */ + EndCommand("SELECT", DestRemote); + ReadyForQuery(DestRemote); + /* ReadyForQuery did pq_flush for us */ + break; + } + + default: + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid standby query string: %s", cmd_string))); + } + + /* done */ + MemoryContextSwitchTo(old_context); + MemoryContextDelete(cmd_context); + + return replication_started; +} + +/* * Check if the remote end has closed the connection. */ static void |