diff options
-rw-r--r-- | doc/src/sgml/protocol.sgml | 24 | ||||
-rw-r--r-- | src/backend/postmaster/postmaster.c | 28 | ||||
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 6 | ||||
-rw-r--r-- | src/backend/replication/repl_gram.y | 81 | ||||
-rw-r--r-- | src/backend/replication/repl_scanner.l | 1 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 913 | ||||
-rw-r--r-- | src/backend/utils/init/postinit.c | 15 | ||||
-rw-r--r-- | src/bin/pg_basebackup/pg_basebackup.c | 4 | ||||
-rw-r--r-- | src/bin/pg_basebackup/pg_receivexlog.c | 4 | ||||
-rw-r--r-- | src/bin/pg_basebackup/receivelog.c | 4 | ||||
-rw-r--r-- | src/include/replication/walsender.h | 1 | ||||
-rw-r--r-- | src/tools/pgindent/typedefs.list | 1 |
12 files changed, 915 insertions, 167 deletions
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index d36f2f3af1f..cb2dfb2ebc0 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1302,10 +1302,13 @@ <para> To initiate streaming replication, the frontend sends the -<literal>replication</> parameter in the startup message. This tells the -backend to go into walsender mode, wherein a small set of replication commands -can be issued instead of SQL statements. Only the simple query protocol can be -used in walsender mode. +<literal>replication</> parameter in the startup message. A boolean value +of <literal>true</> tells the backend to go into walsender mode, wherein a +small set of replication commands can be issued instead of SQL statements. Only +the simple query protocol can be used in walsender mode. +Passing <literal>database</> as the value instructs walsender to connect to +the database specified in the <literal>dbname</> parameter, which will allow +the connection to be used for logical replication from that database. The commands accepted in walsender mode are: @@ -1315,7 +1318,7 @@ The commands accepted in walsender mode are: <listitem> <para> Requests the server to identify itself. Server replies with a result - set of a single row, containing three fields: + set of a single row, containing four fields: </para> <para> @@ -1357,6 +1360,17 @@ The commands accepted in walsender mode are: </listitem> </varlistentry> + <varlistentry> + <term> + dbname + </term> + <listitem> + <para> + Database connected to or NULL. 
+ </para> + </listitem> + </varlistentry> + </variablelist> </para> </listitem> diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index b7f99fc18d3..5db01d104a1 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -1884,10 +1884,23 @@ retry1: port->cmdline_options = pstrdup(valptr); else if (strcmp(nameptr, "replication") == 0) { - if (!parse_bool(valptr, &am_walsender)) + /* + * Due to backward compatibility concerns the replication + * parameter is a hybrid beast which allows the value to be + * either boolean or the string 'database'. The latter + * connects to a specific database which is e.g. required for + * logical decoding. + */ + if (strcmp(valptr, "database") == 0) + { + am_walsender = true; + am_db_walsender = true; + } + else if (!parse_bool(valptr, &am_walsender)) ereport(FATAL, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid value for boolean option \"replication\""))); + errmsg("invalid value for parameter \"replication\""), + errhint("Valid values are: false, 0, true, 1, database."))); } else { @@ -1968,8 +1981,15 @@ retry1: if (strlen(port->user_name) >= NAMEDATALEN) port->user_name[NAMEDATALEN - 1] = '\0'; - /* Walsender is not related to a particular database */ - if (am_walsender) + /* + * Normal walsender backends, e.g. for streaming replication, are not + * connected to a particular database. But walsenders used for logical + * replication need to connect to a specific database. We allow streaming + * replication commands to be issued even if connected to a database as it + * can make sense to first make a basebackup and then stream changes + * starting from that.
+ */ + if (am_walsender && !am_db_walsender) port->database_name[0] = '\0'; /* diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index c10374cdbca..96f31c4c55b 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -131,7 +131,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli) "the primary server: %s", PQerrorMessage(streamConn)))); } - if (PQnfields(res) != 3 || PQntuples(res) != 1) + if (PQnfields(res) < 3 || PQntuples(res) != 1) { int ntuples = PQntuples(res); int nfields = PQnfields(res); @@ -139,8 +139,8 @@ libpqrcv_identify_system(TimeLineID *primary_tli) PQclear(res); ereport(ERROR, (errmsg("invalid response from primary server"), - errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.", - ntuples, nfields))); + errdetail("Could not identify system: Got %d rows and %d fields, expected %d rows and %d or more fields.", + ntuples, nfields, 1, 3))); } primary_sysid = PQgetvalue(res, 0, 0); *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 308889b5c9a..154aaace9f5 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -73,13 +73,17 @@ Node *replication_parse_result; %token K_WAL %token K_TIMELINE %token K_PHYSICAL +%token K_LOGICAL %token K_SLOT %type <node> command -%type <node> base_backup start_replication create_replication_slot drop_replication_slot identify_system timeline_history +%type <node> base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system timeline_history %type <list> base_backup_opt_list %type <defelt> base_backup_opt %type <uintval> opt_timeline +%type <list> plugin_options plugin_opt_list +%type <defelt> plugin_opt_elem +%type <node> plugin_opt_arg %type
<str> opt_slot %% @@ -98,6 +102,7 @@ command: identify_system | base_backup | start_replication + | start_logical_replication | create_replication_slot | drop_replication_slot | timeline_history @@ -165,8 +170,8 @@ base_backup_opt: } ; -/* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL */ create_replication_slot: + /* CREATE_REPLICATION_SLOT slot PHYSICAL */ K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL { CreateReplicationSlotCmd *cmd; @@ -175,9 +180,19 @@ create_replication_slot: cmd->slotname = $2; $$ = (Node *) cmd; } + /* CREATE_REPLICATION_SLOT slot LOGICAL plugin */ + | K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT + { + CreateReplicationSlotCmd *cmd; + cmd = makeNode(CreateReplicationSlotCmd); + cmd->kind = REPLICATION_KIND_LOGICAL; + cmd->slotname = $2; + cmd->plugin = $4; + $$ = (Node *) cmd; + } ; -/* DROP_REPLICATION_SLOT SLOT slot */ +/* DROP_REPLICATION_SLOT slot */ drop_replication_slot: K_DROP_REPLICATION_SLOT IDENT { @@ -205,19 +220,19 @@ start_replication: } ; -opt_timeline: - K_TIMELINE UCONST +/* START_REPLICATION SLOT slot LOGICAL %X/%X options */ +start_logical_replication: + K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR plugin_options { - if ($2 <= 0) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - (errmsg("invalid timeline %u", $2)))); - $$ = $2; + StartReplicationCmd *cmd; + cmd = makeNode(StartReplicationCmd); + cmd->kind = REPLICATION_KIND_LOGICAL;; + cmd->slotname = $3; + cmd->startpoint = $5; + cmd->options = $6; + $$ = (Node *) cmd; } - | /* EMPTY */ - { $$ = 0; } ; - /* * TIMELINE_HISTORY %d */ @@ -250,6 +265,46 @@ opt_slot: { $$ = NULL; } ; +opt_timeline: + K_TIMELINE UCONST + { + if ($2 <= 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + (errmsg("invalid timeline %u", $2)))); + $$ = $2; + } + | /* EMPTY */ { $$ = 0; } + ; + + +plugin_options: + '(' plugin_opt_list ')' { $$ = $2; } + | /* EMPTY */ { $$ = NIL; } + ; + +plugin_opt_list: + plugin_opt_elem + { + $$ = list_make1($1); + } + | plugin_opt_list ',' plugin_opt_elem + 
{ + $$ = lappend($1, $3); + } + ; + +plugin_opt_elem: + IDENT plugin_opt_arg + { + $$ = makeDefElem($1, $2); + } + ; + +plugin_opt_arg: + SCONST { $$ = (Node *) makeString($1); } + | /* EMPTY */ { $$ = NULL; } + ; %% #include "repl_scanner.c" diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index ca32aa67ff1..a2571244229 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -94,6 +94,7 @@ CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; } DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } PHYSICAL { return K_PHYSICAL; } +LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } "," { return ','; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 003c797e0ea..09854112062 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -45,15 +45,22 @@ #include "access/timeline.h" #include "access/transam.h" +#include "access/xact.h" #include "access/xlog_internal.h" + #include "catalog/pg_type.h" +#include "commands/dbcommands.h" #include "funcapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "miscadmin.h" #include "nodes/replnodes.h" #include "replication/basebackup.h" +#include "replication/decode.h" +#include "replication/logical.h" +#include "replication/logicalfuncs.h" #include "replication/slot.h" +#include "replication/snapbuild.h" #include "replication/syncrep.h" #include "replication/slot.h" #include "replication/walreceiver.h" @@ -92,9 +99,10 @@ WalSndCtlData *WalSndCtl = NULL; WalSnd *MyWalSnd = NULL; /* Global state */ -bool am_walsender = false; /* Am I a walsender process ? */ +bool am_walsender = false; /* Am I a walsender process? */ bool am_cascading_walsender = false; /* Am I cascading WAL to - * another standby ? */ + * another standby? */ +bool am_db_walsender = false; /* Connected to a database? 
*/ /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ @@ -145,7 +153,7 @@ static StringInfoData tmpbuf; static TimestampTz last_reply_timestamp; /* Have we sent a heartbeat message asking for reply, since last reply? */ -static bool ping_sent = false; +static bool waiting_for_ping_response = false; /* * While streaming WAL in Copy mode, streamingDoneSending is set to true @@ -156,6 +164,9 @@ static bool ping_sent = false; static bool streamingDoneSending; static bool streamingDoneReceiving; +/* Are we there yet? */ +static bool WalSndCaughtUp = false; + /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t walsender_ready_to_stop = false; @@ -168,24 +179,42 @@ static volatile sig_atomic_t walsender_ready_to_stop = false; */ static volatile sig_atomic_t replication_active = false; +static LogicalDecodingContext *logical_decoding_ctx = NULL; +static XLogRecPtr logical_startptr = InvalidXLogRecPtr; + /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ -static void WalSndLoop(void); +typedef void (*WalSndSendDataCallback)(void); +static void WalSndLoop(WalSndSendDataCallback send_data); static void InitWalSenderSlot(void); static void WalSndKill(int code, Datum arg); -static void XLogSend(bool *caughtup); +static void WalSndShutdown(void) __attribute__((noreturn)); +static void XLogSendPhysical(void); +static void XLogSendLogical(void); +static void WalSndDone(WalSndSendDataCallback send_data); static XLogRecPtr GetStandbyFlushRecPtr(void); static void IdentifySystem(void); +static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd); +static void DropReplicationSlot(DropReplicationSlotCmd *cmd); static void StartReplication(StartReplicationCmd *cmd); +static 
void StartLogicalReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); static void WalSndKeepalive(bool requestReply); +static void WalSndKeepaliveIfNecessary(TimestampTz now); +static void WalSndCheckTimeOut(TimestampTz now); +static long WalSndComputeSleeptime(TimestampTz now); +static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); +static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); +static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); + +static void XLogRead(char *buf, XLogRecPtr startptr, Size count); /* Initialize walsender process before entering the main command loop */ @@ -241,6 +270,23 @@ WalSndErrorCleanup() } /* + * Handle a client's connection abort in an orderly manner. + */ +static void +WalSndShutdown(void) +{ + /* + * Reset whereToSendOutput to prevent ereport from attempting to send any + * more messages to the standby. + */ + if (whereToSendOutput == DestRemote) + whereToSendOutput = DestNone; + + proc_exit(0); + abort(); /* keep the compiler quiet */ +} + +/* * Handle the IDENTIFY_SYSTEM command. */ static void @@ -251,10 +297,12 @@ IdentifySystem(void) char tli[11]; char xpos[MAXFNAMELEN]; XLogRecPtr logptr; + char *dbname = NULL; /* - * Reply with a result set with one row, three columns. First col is - * system ID, second is timeline ID, and third is current xlog location. + * Reply with a result set with one row, four columns. First col is system + * ID, second is timeline ID, third is current xlog location and the fourth + * contains the database name if we are connected to one. 
*/ snprintf(sysid, sizeof(sysid), UINT64_FORMAT, @@ -273,9 +321,23 @@ IdentifySystem(void) snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr); + if (MyDatabaseId != InvalidOid) + { + MemoryContext cur = CurrentMemoryContext; + + /* syscache access needs a transaction env. */ + StartTransactionCommand(); + /* make dbname live outside TX context */ + MemoryContextSwitchTo(cur); + dbname = get_database_name(MyDatabaseId); + CommitTransactionCommand(); + /* CommitTransactionCommand switches to TopMemoryContext */ + MemoryContextSwitchTo(cur); + } + /* Send a RowDescription message */ pq_beginmessage(&buf, 'T'); - pq_sendint(&buf, 3, 2); /* 3 fields */ + pq_sendint(&buf, 4, 2); /* 4 fields */ /* first field */ pq_sendstring(&buf, "systemid"); /* col name */ @@ -296,24 +358,43 @@ IdentifySystem(void) pq_sendint(&buf, 0, 2); /* format code */ /* third field */ - pq_sendstring(&buf, "xlogpos"); - pq_sendint(&buf, 0, 4); - pq_sendint(&buf, 0, 2); - pq_sendint(&buf, TEXTOID, 4); - pq_sendint(&buf, -1, 2); - pq_sendint(&buf, 0, 4); - pq_sendint(&buf, 0, 2); + pq_sendstring(&buf, "xlogpos"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + + /* fourth field */ + pq_sendstring(&buf, "dbname"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ pq_endmessage(&buf); /* Send a DataRow message */ pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 3, 2); /* # of columns */ + pq_sendint(&buf, 4, 2); /* # of columns */ pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); 
pq_sendint(&buf, strlen(tli), 4); /* col2 len */ pq_sendbytes(&buf, (char *) tli, strlen(tli)); pq_sendint(&buf, strlen(xpos), 4); /* col3 len */ pq_sendbytes(&buf, (char *) xpos, strlen(xpos)); + /* send NULL if not connected to a database */ + if (dbname) + { + pq_sendint(&buf, strlen(dbname), 4); /* col4 len */ + pq_sendbytes(&buf, (char *) dbname, strlen(dbname)); + } + else + { + pq_sendint(&buf, -1, 4); /* col4 len, NULL */ + } pq_endmessage(&buf); } @@ -572,7 +653,7 @@ StartReplication(StartReplicationCmd *cmd) /* Main loop of walsender */ replication_active = true; - WalSndLoop(); + WalSndLoop(XLogSendPhysical); replication_active = false; if (walsender_ready_to_stop) @@ -643,12 +724,47 @@ StartReplication(StartReplicationCmd *cmd) } /* + * read_page callback for logical decoding contexts, as a walsender process. + * + * Inside the walsender we can do better than logical_read_local_xlog_page, + * which has to do a plain sleep/busy loop, because the walsender's latch gets + * set every time WAL is flushed. + */ +static int +logical_read_xlog_page(XLogReaderState* state, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char* cur_page, TimeLineID *pageTLI) +{ + XLogRecPtr flushptr; + int count; + + /* make sure we have enough WAL available */ + flushptr = WalSndWaitForWal(targetPagePtr + reqLen); + + /* more than one block available */ + if (targetPagePtr + XLOG_BLCKSZ <= flushptr) + count = XLOG_BLCKSZ; + /* not enough WAL synced, that can happen during shutdown */ + else if (targetPagePtr + reqLen > flushptr) + return -1; + /* part of the page available */ + else + count = flushptr - targetPagePtr; + + /* now actually read the data, we know it's there */ + XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ); + + return count; +} + +/* * Create a new replication slot.
*/ static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { const char *slot_name; + const char *snapshot_name = NULL; + char xpos[MAXFNAMELEN]; StringInfoData buf; Assert(!MyReplicationSlot); @@ -657,24 +773,51 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) sendTimeLineIsHistoric = false; sendTimeLine = ThisTimeLineID; - ReplicationSlotCreate(cmd->slotname, - cmd->kind == REPLICATION_KIND_LOGICAL, - RS_PERSISTENT); + if (cmd->kind == REPLICATION_KIND_PHYSICAL) + { + ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT); + } + else + { + CheckLogicalDecodingRequirements(); + ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL); + } initStringInfo(&output_message); slot_name = NameStr(MyReplicationSlot->data.name); - /* - * It may seem somewhat pointless to send back the same slot name the - * client just requested and nothing else, but logical replication - * will add more fields here. (We could consider removing the slot - * name from what's sent back, though, since the client has specified - * that.) - */ + if (cmd->kind == REPLICATION_KIND_LOGICAL) + { + LogicalDecodingContext *ctx; + + ctx = CreateInitDecodingContext( + cmd->plugin, NIL, + logical_read_xlog_page, + WalSndPrepareWrite, WalSndWriteData); + + /* build initial snapshot, might take a while */ + DecodingContextFindStartpoint(ctx); + + /* + * Export a plain (not of the snapbuild.c type) snapshot to the user + * that can be imported into another session. 
+ */ + snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder); + + /* don't need the decoding context anymore */ + FreeDecodingContext(ctx); + + ReplicationSlotPersist(); + } + + slot_name = NameStr(MyReplicationSlot->data.name); + snprintf(xpos, sizeof(xpos), "%X/%X", + (uint32) (MyReplicationSlot->data.confirmed_flush >> 32), + (uint32) MyReplicationSlot->data.confirmed_flush); pq_beginmessage(&buf, 'T'); - pq_sendint(&buf, 1, 2); /* 1 field */ + pq_sendint(&buf, 4, 2); /* 4 fields */ /* first field: slot name */ pq_sendstring(&buf, "slot_name"); /* col name */ @@ -685,16 +828,65 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) pq_sendint(&buf, 0, 4); /* typmod */ pq_sendint(&buf, 0, 2); /* format code */ + /* second field: LSN at which we became consistent */ + pq_sendstring(&buf, "consistent_point"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + + /* third field: exported snapshot's name */ + pq_sendstring(&buf, "snapshot_name"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + + /* fourth field: output plugin */ + pq_sendstring(&buf, "output_plugin"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + pq_endmessage(&buf); /* Send a DataRow message */ pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 1, 2); /* # of columns */ + pq_sendint(&buf, 4, 2); /* # of columns */ /* slot_name */ pq_sendint(&buf, 
strlen(slot_name), 4); /* col1 len */ pq_sendbytes(&buf, slot_name, strlen(slot_name)); + /* consistent wal location */ + pq_sendint(&buf, strlen(xpos), 4); /* col2 len */ + pq_sendbytes(&buf, xpos, strlen(xpos)); + + /* snapshot name */ + if (snapshot_name != NULL) + { + pq_sendint(&buf, strlen(snapshot_name), 4); /* col3 len */ + pq_sendbytes(&buf, snapshot_name, strlen(snapshot_name)); + } + else + pq_sendint(&buf, -1, 4); /* col3 len, NULL */ + + /* plugin */ + if (cmd->plugin != NULL) + { + pq_sendint(&buf, strlen(cmd->plugin), 4); /* col4 len */ + pq_sendbytes(&buf, cmd->plugin, strlen(cmd->plugin)); + } + else + pq_sendint(&buf, -1, 4); /* col4 len, NULL */ + pq_endmessage(&buf); /* @@ -714,6 +906,339 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) } /* + * Load previously initiated logical slot and prepare for sending data (via + * WalSndLoop). + */ +static void +StartLogicalReplication(StartReplicationCmd *cmd) +{ + StringInfoData buf; + + /* make sure that our requirements are still fulfilled */ + CheckLogicalDecodingRequirements(); + + Assert(!MyReplicationSlot); + + ReplicationSlotAcquire(cmd->slotname); + + /* + * Force a disconnect, so that the decoding code doesn't need to care + * about an eventual switch from running in recovery, to running in a + * normal environment. Client code is expected to handle reconnects. + */ + if (am_cascading_walsender && !RecoveryInProgress()) + { + ereport(LOG, + (errmsg("terminating walsender process after promotion"))); + walsender_ready_to_stop = true; + } + + WalSndSetState(WALSNDSTATE_CATCHUP); + + /* Send a CopyBothResponse message, and start streaming */ + pq_beginmessage(&buf, 'W'); + pq_sendbyte(&buf, 0); + pq_sendint(&buf, 0, 2); + pq_endmessage(&buf); + pq_flush(); + + /* setup state for XLogReadPage */ + sendTimeLineIsHistoric = false; + sendTimeLine = ThisTimeLineID; + + /* + * Initialize position to the last ack'ed one, then the xlog records begin + * to be shipped from that position.
+ */ + logical_decoding_ctx = CreateDecodingContext( + cmd->startpoint, cmd->options, + logical_read_xlog_page, + WalSndPrepareWrite, WalSndWriteData); + + /* Start reading WAL from the oldest required WAL. */ + logical_startptr = MyReplicationSlot->data.restart_lsn; + + /* + * Report the location after which we'll send out further commits as the + * current sentPtr. + */ + sentPtr = MyReplicationSlot->data.confirmed_flush; + + /* Also update the sent position status in shared memory */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = MyReplicationSlot->data.restart_lsn; + SpinLockRelease(&walsnd->mutex); + } + + replication_active = true; + + SyncRepInitConfig(); + + /* Main loop of walsender */ + WalSndLoop(XLogSendLogical); + + FreeDecodingContext(logical_decoding_ctx); + ReplicationSlotRelease(); + + replication_active = false; + if (walsender_ready_to_stop) + proc_exit(0); + WalSndSetState(WALSNDSTATE_STARTUP); + + /* Get out of COPY mode (CommandComplete). */ + EndCommand("COPY 0", DestRemote); +} + +/* + * LogicalDecodingContext 'prepare_write' callback. + * + * Prepare a write into a StringInfo. + * + * Don't do anything lasting in here, it's quite possible that nothing will be done + * with the data. + */ +static void +WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write) +{ + /* can't have sync rep confused by sending the same LSN several times */ + if (!last_write) + lsn = InvalidXLogRecPtr; + + resetStringInfo(ctx->out); + + pq_sendbyte(ctx->out, 'w'); + pq_sendint64(ctx->out, lsn); /* dataStart */ + pq_sendint64(ctx->out, lsn); /* walEnd */ + /* + * Fill out the sendtime later, just as it's done in XLogSendPhysical, but + * reserve space here. + */ + pq_sendint64(ctx->out, 0); /* sendtime */ +} + +/* + * LogicalDecodingContext 'write' callback.
+ * + * Actually write out data previously prepared by WalSndPrepareWrite out to + * the network. Take as long as needed, but process replies from the other + * side and check timeouts during that. + */ +static void +WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, + bool last_write) +{ + /* + * Fill the send timestamp last, so that it is taken as late as + * possible. This is somewhat ugly, but the protocol's set as it's already + * used for several releases by streaming physical replication. + */ + resetStringInfo(&tmpbuf); + pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp()); + memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)], + tmpbuf.data, sizeof(int64)); + + /* output previously gathered data in a CopyData packet, timestamp included */ + pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); + + /* fast path */ + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + + if (!pq_is_send_pending()) + return; + + for (;;) + { + int wakeEvents; + long sleeptime; + TimestampTz now; + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive()) + exit(1); + + /* Process any requests or signals received recently */ + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); + } + + /* Check for input from the client */ + ProcessRepliesIfAny(); + + /* Clear any already-pending wakeups */ + ResetLatch(&MyWalSnd->latch); + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + + /* If we finished clearing the buffered data, we're done here.
*/ + if (!pq_is_send_pending()) + break; + + now = GetCurrentTimestamp(); + + /* die if timeout was reached */ + WalSndCheckTimeOut(now); + + /* Send keepalive if the time has come */ + WalSndKeepaliveIfNecessary(now); + + sleeptime = WalSndComputeSleeptime(now); + + wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | + WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT; + + /* Sleep until something happens or we time out */ + ImmediateInterruptOK = true; + CHECK_FOR_INTERRUPTS(); + WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents, + MyProcPort->sock, sleeptime); + ImmediateInterruptOK = false; + } + + /* reactivate latch so WalSndLoop knows to continue */ + SetLatch(&MyWalSnd->latch); +} + +/* + * Wait till WAL < loc is flushed to disk so it can be safely read. + */ +static XLogRecPtr +WalSndWaitForWal(XLogRecPtr loc) +{ + int wakeEvents; + static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + + + /* + * Fast path to avoid acquiring the spinlock in the case we already know we + * have enough WAL available. This is particularly interesting if we're + * far behind. + */ + if (RecentFlushPtr != InvalidXLogRecPtr && + loc <= RecentFlushPtr) + return RecentFlushPtr; + + /* Get a more recent flush pointer. */ + if (!RecoveryInProgress()) + RecentFlushPtr = GetFlushRecPtr(); + else + RecentFlushPtr = GetXLogReplayRecPtr(NULL); + + for (;;) + { + long sleeptime; + TimestampTz now; + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive()) + exit(1); + + /* Process any requests or signals received recently */ + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); + } + + /* Check for input from the client */ + ProcessRepliesIfAny(); + + /* Clear any already-pending wakeups */ + ResetLatch(&MyWalSnd->latch); + + /* Update our idea of the currently flushed position.
*/ + if (!RecoveryInProgress()) + RecentFlushPtr = GetFlushRecPtr(); + else + RecentFlushPtr = GetXLogReplayRecPtr(NULL); + + /* + * If postmaster asked us to stop, don't wait here anymore. This will + * cause the xlogreader to return without reading a full record, which + * is the fastest way to reach the mainloop which then can quit. + * + * It's important to do this check after the recomputation of + * RecentFlushPtr, so we can send all remaining data before shutting + * down. + */ + if (walsender_ready_to_stop) + break; + + /* + * We only send regular messages to the client for full decoded + * transactions, but a synchronous replication and walsender shutdown + * possibly are waiting for a later location. So we send pings + * containing the flush location every now and then. + */ + if (MyWalSnd->flush < sentPtr && !waiting_for_ping_response) + { + WalSndKeepalive(true); + waiting_for_ping_response = true; + } + + /* check whether we're done */ + if (loc <= RecentFlushPtr) + break; + + /* Waiting for new WAL. Since we need to wait, we're now caught up. */ + WalSndCaughtUp = true; + + /* + * Try to flush pending output to the client. Also wait for the socket + * becoming writable, if there's still pending output after an attempt + * to flush. Otherwise we might just sit on output data while waiting + * for new WAL being generated. 
+ */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + + now = GetCurrentTimestamp(); + + /* die if timeout was reached */ + WalSndCheckTimeOut(now); + + /* Send keepalive if the time has come */ + WalSndKeepaliveIfNecessary(now); + + sleeptime = WalSndComputeSleeptime(now); + + wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | + WL_SOCKET_READABLE | WL_TIMEOUT; + + if (pq_is_send_pending()) + wakeEvents |= WL_SOCKET_WRITEABLE; + + /* Sleep until something happens or we time out */ + ImmediateInterruptOK = true; + CHECK_FOR_INTERRUPTS(); + WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents, + MyProcPort->sock, sleeptime); + ImmediateInterruptOK = false; + } + + /* reactivate latch so WalSndLoop knows to continue */ + SetLatch(&MyWalSnd->latch); + return RecentFlushPtr; +} + +/* * Execute an incoming replication command. */ void @@ -724,6 +1249,12 @@ exec_replication_command(const char *cmd_string) MemoryContext cmd_context; MemoryContext old_context; + /* + * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next + * command arrives. Clean up the old stuff if there's anything. 
+ */ + SnapBuildClearExportedSnapshot(); + elog(DEBUG1, "received replication command: %s", cmd_string); CHECK_FOR_INTERRUPTS(); @@ -769,7 +1300,7 @@ exec_replication_command(const char *cmd_string) if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); else - elog(ERROR, "cannot handle logical decoding yet"); + StartLogicalReplication(cmd); break; } @@ -887,7 +1418,7 @@ ProcessRepliesIfAny(void) if (received) { last_reply_timestamp = GetCurrentTimestamp(); - ping_sent = false; + waiting_for_ping_response = false; } } @@ -1020,7 +1551,7 @@ ProcessStandbyReplyMessage(void) if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr) { if (MyReplicationSlot->data.database != InvalidOid) - elog(ERROR, "cannot handle logical decoding yet"); + LogicalConfirmReceivedLocation(flushPtr); else PhysicalConfirmReceivedLocation(flushPtr); } @@ -1146,12 +1677,81 @@ ProcessStandbyHSFeedbackMessage(void) MyPgXact->xmin = feedbackXmin; } -/* Main loop of walsender process that streams the WAL over Copy messages. */ +/* + * Compute how long send/receive loops should sleep. + * + * If wal_sender_timeout is enabled we want to wake up in time to send + * keepalives and to abort the connection if wal_sender_timeout has been + * reached. + */ +static long +WalSndComputeSleeptime(TimestampTz now) +{ + long sleeptime = 10000; /* 10 s */ + + if (wal_sender_timeout > 0) + { + TimestampTz wakeup_time; + long sec_to_timeout; + int microsec_to_timeout; + + /* + * At the latest stop sleeping once wal_sender_timeout has been + * reached. + */ + wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout); + + /* + * If no ping has been sent yet, wakeup when it's time to do + * so. WalSndKeepaliveIfNecessary() wants to send a keepalive once + * half of the timeout passed without a response. + */ + if (!waiting_for_ping_response) + wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); + + /* Compute relative time until wakeup. 
*/ + TimestampDifference(now, wakeup_time, + &sec_to_timeout, &microsec_to_timeout); + + sleeptime = sec_to_timeout * 1000 + + microsec_to_timeout / 1000; + } + + return sleeptime; +} + +/* + * Check whether there have been responses by the client within + * wal_sender_timeout and shutdown if not. + */ static void -WalSndLoop(void) +WalSndCheckTimeOut(TimestampTz now) { - bool caughtup = false; + TimestampTz timeout; + + timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout); + if (wal_sender_timeout > 0 && now >= timeout) + { + /* + * Since typically expiration of replication timeout means + * communication problem, we don't send the error message to + * the standby. + */ + ereport(COMMERROR, + (errmsg("terminating walsender process due to replication timeout"))); + + WalSndShutdown(); + } +} + +/* Main loop of walsender process that streams the WAL over Copy messages. */ +static void +WalSndLoop(WalSndSendDataCallback send_data) +{ /* * Allocate buffers that will be used for each outgoing and incoming * message. We do this just once to reduce palloc overhead. @@ -1162,7 +1762,7 @@ WalSndLoop(void) /* Initialize the last reply timestamp */ last_reply_timestamp = GetCurrentTimestamp(); - ping_sent = false; + waiting_for_ping_response = false; /* * Loop until we reach the end of this timeline or the client requests to @@ -1170,8 +1770,7 @@ WalSndLoop(void) */ for (;;) { - /* Clear any already-pending wakeups */ - ResetLatch(&MyWalSnd->latch); + TimestampTz now; /* * Emergency bailout if postmaster has died. 
This is to avoid the @@ -1193,6 +1792,9 @@ WalSndLoop(void) /* Check for input from the client */ ProcessRepliesIfAny(); + /* Clear any already-pending wakeups */ + ResetLatch(&MyWalSnd->latch); + /* * If we have received CopyDone from the client, sent CopyDone * ourselves, and the output buffer is empty, it's time to exit @@ -1203,21 +1805,21 @@ WalSndLoop(void) /* * If we don't have any pending data in the output buffer, try to send - * some more. If there is some, we don't bother to call XLogSend + * some more. If there is some, we don't bother to call send_data * again until we've flushed it ... but we'd better assume we are not * caught up. */ if (!pq_is_send_pending()) - XLogSend(&caughtup); + send_data(); else - caughtup = false; + WalSndCaughtUp = false; /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) - goto send_failure; + WalSndShutdown(); /* If nothing remains to be sent right now ... */ - if (caughtup && !pq_is_send_pending()) + if (WalSndCaughtUp && !pq_is_send_pending()) { /* * If we're in catchup state, move to streaming. This is an @@ -1243,111 +1845,47 @@ WalSndLoop(void) * the walsender is not sure which. */ if (walsender_ready_to_stop) - { - /* ... let's just be real sure we're caught up ... */ - XLogSend(&caughtup); - if (caughtup && sentPtr == MyWalSnd->flush && - !pq_is_send_pending()) - { - /* Inform the standby that XLOG streaming is done */ - EndCommand("COPY 0", DestRemote); - pq_flush(); - - proc_exit(0); - } - } + WalSndDone(send_data); } - /* - * If half of wal_sender_timeout has elapsed without receiving any - * reply from standby, send a keep-alive message requesting an - * immediate reply. 
- */ - if (wal_sender_timeout > 0 && !ping_sent) - { - TimestampTz timeout; + now = GetCurrentTimestamp(); - timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout / 2); - if (GetCurrentTimestamp() >= timeout) - { - WalSndKeepalive(true); - ping_sent = true; - /* Try to flush pending output to the client */ - if (pq_flush_if_writable() != 0) - goto send_failure; - } - } + /* Check for replication timeout. */ + WalSndCheckTimeOut(now); + + /* Send keepalive if the time has come */ + WalSndKeepaliveIfNecessary(now); /* * We don't block if not caught up, unless there is unsent data * pending in which case we'd better block until the socket is - * write-ready. This test is only needed for the case where XLogSend - * loaded a subset of the available data but then pq_flush_if_writable - * flushed it all --- we should immediately try to send more. + * write-ready. This test is only needed for the case where the + * send_data callback handled a subset of the available data but then + * pq_flush_if_writable flushed it all --- we should immediately try + * to send more. */ - if ((caughtup && !streamingDoneSending) || pq_is_send_pending()) + if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending()) { - TimestampTz timeout; - long sleeptime = 10000; /* 10 s */ + long sleeptime; int wakeEvents; wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT | WL_SOCKET_READABLE; + sleeptime = WalSndComputeSleeptime(now); + if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - /* - * If wal_sender_timeout is active, sleep in smaller increments - * to not go over the timeout too much. XXX: Why not just sleep - * until the timeout has elapsed? 
- */ - if (wal_sender_timeout > 0) - sleeptime = 1 + (wal_sender_timeout / 10); - /* Sleep until something happens or we time out */ ImmediateInterruptOK = true; CHECK_FOR_INTERRUPTS(); WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents, MyProcPort->sock, sleeptime); ImmediateInterruptOK = false; - - /* - * Check for replication timeout. Note we ignore the corner case - * possibility that the client replied just as we reached the - * timeout ... he's supposed to reply *before* that. - */ - timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout); - if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout) - { - /* - * Since typically expiration of replication timeout means - * communication problem, we don't send the error message to - * the standby. - */ - ereport(COMMERROR, - (errmsg("terminating walsender process due to replication timeout"))); - goto send_failure; - } } } return; - -send_failure: - - /* - * Get here on send failure. Clean up and exit. - * - * Reset whereToSendOutput to prevent ereport from attempting to send any - * more messages to the standby. - */ - if (whereToSendOutput == DestRemote) - whereToSendOutput = DestNone; - - proc_exit(0); - abort(); /* keep the compiler quiet */ } /* Initialize a per-walsender data structure for this walsender process */ @@ -1605,15 +2143,17 @@ retry: } /* + * Send out the WAL in its normal physical/stored form. + * * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk, * but not yet sent to the client, and buffer it in the libpq output * buffer. * - * If there is no unsent WAL remaining, *caughtup is set to true, otherwise - * *caughtup is set to false. + * If there is no unsent WAL remaining, WalSndCaughtUp is set to true, + * otherwise WalSndCaughtUp is set to false. 
*/ static void -XLogSend(bool *caughtup) +XLogSendPhysical(void) { XLogRecPtr SendRqstPtr; XLogRecPtr startptr; @@ -1622,7 +2162,7 @@ XLogSend(bool *caughtup) if (streamingDoneSending) { - *caughtup = true; + WalSndCaughtUp = true; return; } @@ -1739,7 +2279,7 @@ XLogSend(bool *caughtup) pq_putmessage_noblock('c', NULL, 0); streamingDoneSending = true; - *caughtup = true; + WalSndCaughtUp = true; elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)", (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto, @@ -1751,7 +2291,7 @@ XLogSend(bool *caughtup) Assert(sentPtr <= SendRqstPtr); if (SendRqstPtr <= sentPtr) { - *caughtup = true; + WalSndCaughtUp = true; return; } @@ -1775,15 +2315,15 @@ XLogSend(bool *caughtup) { endptr = SendRqstPtr; if (sendTimeLineIsHistoric) - *caughtup = false; + WalSndCaughtUp = false; else - *caughtup = true; + WalSndCaughtUp = true; } else { /* round down to page boundary. */ endptr -= (endptr % XLOG_BLCKSZ); - *caughtup = false; + WalSndCaughtUp = false; } nbytes = endptr - startptr; @@ -1844,6 +2384,85 @@ XLogSend(bool *caughtup) } /* + * Stream out logically decoded data. + */ +static void +XLogSendLogical(void) +{ + XLogRecord *record; + char *errm; + + /* + * Don't know whether we've caught up yet. We'll set it to true in + * WalSndWaitForWal, if we're actually waiting. We also set to true if + * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait - + * i.e. when we're shutting down. + */ + WalSndCaughtUp = false; + + record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm); + logical_startptr = InvalidXLogRecPtr; + + /* xlog record was invalid */ + if (errm != NULL) + elog(ERROR, "%s", errm); + + if (record != NULL) + { + LogicalDecodingProcessRecord(logical_decoding_ctx, record); + + sentPtr = logical_decoding_ctx->reader->EndRecPtr; + } + else + { + /* + * If the record we just wanted read is at or beyond the flushed point, + * then we're caught up. 
+ */ + if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr()) + WalSndCaughtUp = true; + } + + /* Update shared memory status */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + } +} + +/* + * Shutdown if the sender is caught up. + * + * NB: This should only be called when the shutdown signal has been received + * from postmaster. + * + * Note that if we determine that there's still more data to send, this + * function will return control to the caller. + */ +static void +WalSndDone(WalSndSendDataCallback send_data) +{ + /* ... let's just be real sure we're caught up ... */ + send_data(); + + if (WalSndCaughtUp && sentPtr == MyWalSnd->flush && + !pq_is_send_pending()) + { + /* Inform the standby that XLOG streaming is done */ + EndCommand("COPY 0", DestRemote); + pq_flush(); + + proc_exit(0); + } + if (!waiting_for_ping_response) + WalSndKeepalive(true); +} + +/* * Returns the latest point in WAL that has been safely flushed to disk, and * can be sent to the standby. This should only be called when in recovery, * ie. we're streaming to a cascaded standby. @@ -2239,6 +2858,38 @@ WalSndKeepalive(bool requestReply) } /* + * Send keepalive message if too much time has elapsed. + */ +static void +WalSndKeepaliveIfNecessary(TimestampTz now) +{ + TimestampTz ping_time; + + if (wal_sender_timeout <= 0) + return; + + if (waiting_for_ping_response) + return; + + /* + * If half of wal_sender_timeout has lapsed without receiving any reply + * from the standby, send a keep-alive message to the standby requesting + * an immediate reply. 
+ */ + ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); + if (now >= ping_time) + { + WalSndKeepalive(true); + waiting_for_ping_response = true; + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + } +} + +/* * This isn't currently used for anything. Monitoring tools might be * interested in the future, and we'll need something like this in the * future for synchronous replication. diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 3ecc4d3ae0d..89a7c9e15c2 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -729,11 +729,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, (errcode(ERRCODE_TOO_MANY_CONNECTIONS), errmsg("remaining connection slots are reserved for non-replication superuser connections"))); - /* - * If walsender, we don't want to connect to any particular database. Just - * finish the backend startup by processing any options from the startup - * packet, and we're done. - */ + /* Check replication permissions needed for walsender processes. */ if (am_walsender) { Assert(!bootstrap); @@ -742,7 +738,16 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, ereport(FATAL, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser or replication role to start walsender"))); + } + /* + * If this is a plain walsender only supporting physical replication, we + * don't want to connect to any particular database. Just finish the + * backend startup by processing any options from the startup packet, and + * we're done. 
+ */ + if (am_walsender && !am_db_walsender) + { /* process any options passed in the startup packet */ if (MyProcPort != NULL) process_startup_options(MyProcPort, am_superuser); diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 919805f5cfa..1a468fa1b74 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -1639,10 +1639,10 @@ BaseBackup(void) progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); disconnect_and_exit(1); } - if (PQntuples(res) != 1 || PQnfields(res) != 3) + if (PQntuples(res) != 1 || PQnfields(res) < 3) { fprintf(stderr, - _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"), + _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), progname, PQntuples(res), PQnfields(res), 1, 3); disconnect_and_exit(1); } diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index 0f191ce6bb3..2478789ca21 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -275,10 +275,10 @@ StreamLog(void) progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); disconnect_and_exit(1); } - if (PQntuples(res) != 1 || PQnfields(res) != 3) + if (PQntuples(res) != 1 || PQnfields(res) < 3) { fprintf(stderr, - _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"), + _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), progname, PQntuples(res), PQnfields(res), 1, 3); disconnect_and_exit(1); } diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index ef73b4b166c..febe3d1a2b7 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -563,10 +563,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); return false; } - if (PQnfields(res) != 
3 || PQntuples(res) != 1) + if (PQntuples(res) != 1 || PQnfields(res) < 3) { fprintf(stderr, - _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"), + _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), progname, PQntuples(res), PQnfields(res), 1, 3); PQclear(res); return false; diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index b67cf63d766..cff2be6d8f6 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -19,6 +19,7 @@ /* global state */ extern bool am_walsender; extern bool am_cascading_walsender; +extern bool am_db_walsender; extern bool wake_wal_senders; /* user-settable parameters */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index f9604541c7a..62a892be3ba 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1909,6 +1909,7 @@ WalRcvData WalRcvState WalSnd WalSndCtlData +WalSndSendDataCallback WalSndState WholeRowVarExprState WindowAgg |