diff options
-rw-r--r-- | doc/src/sgml/logicaldecoding.sgml | 23 | ||||
-rw-r--r-- | doc/src/sgml/protocol.sgml | 16 | ||||
-rw-r--r-- | doc/src/sgml/ref/pg_recvlogical.sgml | 16 | ||||
-rw-r--r-- | src/backend/replication/repl_gram.y | 6 | ||||
-rw-r--r-- | src/backend/replication/repl_scanner.l | 1 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 18 | ||||
-rw-r--r-- | src/bin/pg_basebackup/pg_basebackup.c | 2 | ||||
-rw-r--r-- | src/bin/pg_basebackup/pg_receivewal.c | 2 | ||||
-rw-r--r-- | src/bin/pg_basebackup/pg_recvlogical.c | 19 | ||||
-rw-r--r-- | src/bin/pg_basebackup/streamutil.c | 6 | ||||
-rw-r--r-- | src/bin/pg_basebackup/streamutil.h | 2 | ||||
-rw-r--r-- | src/bin/pg_basebackup/t/030_pg_recvlogical.pl | 45 |
12 files changed, 143 insertions, 13 deletions
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 5b8065901a4..985db5ca11e 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -144,16 +144,19 @@ postgres=# SELECT pg_drop_replication_slot('regression_slot'); </programlisting> <para> - The following example shows how logical decoding is controlled over the + The following examples shows how logical decoding is controlled over the streaming replication protocol, using the program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL distribution. This requires that client authentication is set up to allow replication connections (see <xref linkend="streaming-replication-authentication"/>) and that <varname>max_wal_senders</varname> is set sufficiently high to allow - an additional connection. + an additional connection. The second example shows how to stream two-phase + transactions. Before you use two-phase commands, you must set + <xref linkend="guc-max-prepared-transactions"/> to atleast 1. </para> <programlisting> +Example 1: $ pg_recvlogical -d postgres --slot=test --create-slot $ pg_recvlogical -d postgres --slot=test --start -f - <keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo> @@ -164,6 +167,22 @@ table public.data: INSERT: id[integer]:4 data[text]:'4' COMMIT 693 <keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo> $ pg_recvlogical -d postgres --slot=test --drop-slot + +Example 2: +$ pg_recvlogical -d postgres --slot=test --create-slot --two-phase +$ pg_recvlogical -d postgres --slot=test --start -f - +<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo> +$ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';" +$ fg +BEGIN 694 +table public.data: INSERT: id[integer]:5 data[text]:'5' +PREPARE TRANSACTION 'test', txid 694 +<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo> +$ psql -d postgres -c "COMMIT PREPARED 'test';" +$ fg +COMMIT PREPARED 'test', txid 694 +<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo> +$ pg_recvlogical -d postgres --slot=test --drop-slot </programlisting> <para> diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 01e87617f40..a3562f3d089 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1914,7 +1914,7 @@ The commands accepted in replication mode are: </varlistentry> <varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT"> - <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> ] } + <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> | <literal>TWO_PHASE</literal> ] } <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm> </term> <listitem> @@ -1956,6 +1956,20 @@ The commands accepted in replication mode are: </varlistentry> <varlistentry> + <term><literal>TWO_PHASE</literal></term> + <listitem> + <para> + Specify that this logical replication slot supports decoding of two-phase + transactions. With this option, two-phase commands like + <literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal> + and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted. + The transaction will be decoded and transmitted at + <literal>PREPARE TRANSACTION</literal> time. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><literal>RESERVE_WAL</literal></term> <listitem> <para> diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml index 6b1d98d06ef..1a882254095 100644 --- a/doc/src/sgml/ref/pg_recvlogical.sgml +++ b/doc/src/sgml/ref/pg_recvlogical.sgml @@ -65,6 +65,11 @@ PostgreSQL documentation <option>--plugin</option>, for the database specified by <option>--dbname</option>. </para> + + <para> + The <option>--two-phase</option> can be specified with + <option>--create-slot</option> to enable two-phase decoding. + </para> </listitem> </varlistentry> @@ -257,6 +262,17 @@ PostgreSQL documentation </varlistentry> <varlistentry> + <term><option>-t</option></term> + <term><option>--two-phase</option></term> + <listitem> + <para> + Enables two-phase decoding. This option should only be specified with + <option>--create-slot</option> + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><option>-v</option></term> <term><option>--verbose</option></term> <listitem> diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index eb283a86327..e1e8ec29cc4 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void); %token K_SLOT %token K_RESERVE_WAL %token K_TEMPORARY +%token K_TWO_PHASE %token K_EXPORT_SNAPSHOT %token K_NOEXPORT_SNAPSHOT %token K_USE_SNAPSHOT @@ -283,6 +284,11 @@ create_slot_opt: $$ = makeDefElem("reserve_wal", (Node *)makeInteger(true), -1); } + | K_TWO_PHASE + { + $$ = makeDefElem("two_phase", + (Node *)makeInteger(true), -1); + } ; /* DROP_REPLICATION_SLOT slot */ diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index dcc3c3fc515..c038a636c38 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -103,6 +103,7 @@ RESERVE_WAL { return K_RESERVE_WAL; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } TEMPORARY { return K_TEMPORARY; } +TWO_PHASE { return K_TWO_PHASE; } EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; } NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; } USE_SNAPSHOT { return K_USE_SNAPSHOT; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 32245363561..92c755f346e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -863,11 +863,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, - CRSSnapshotAction *snapshot_action) + CRSSnapshotAction *snapshot_action, + bool *two_phase) { ListCell *lc; bool snapshot_action_given = false; bool reserve_wal_given = false; + bool two_phase_given = false; /* Parse options */ foreach(lc, cmd->options) @@ -905,6 +907,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, reserve_wal_given = true; *reserve_wal = true; } + else if (strcmp(defel->defname, "two_phase") == 0) + { + if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + two_phase_given = true; + *two_phase = true; + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -920,6 +931,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) char xloc[MAXFNAMELEN]; char *slot_name; bool reserve_wal = false; + bool two_phase = false; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; DestReceiver *dest; TupOutputState *tstate; @@ -929,7 +941,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); - parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action); + parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase); /* setup state for WalSndSegmentOpen */ sendTimeLineIsHistoric = false; @@ -954,7 +966,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - false); + two_phase); } if (cmd->kind == REPLICATION_KIND_LOGICAL) diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 16d8929b238..8bb0acf498e 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -646,7 +646,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) if (temp_replication_slot || create_slot) { if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL, - temp_replication_slot, true, true, false)) + temp_replication_slot, true, true, false, false)) exit(1); if (verbose) diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index 0d15012c295..c1334fad357 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -741,7 +741,7 @@ main(int argc, char **argv) pg_log_info("creating replication slot \"%s\"", replication_slot); if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false, - slot_exists_ok)) + slot_exists_ok, false)) exit(1); exit(0); } diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index 5efec160e88..76bd153fac2 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -35,6 +35,7 @@ /* Global Options */ static char *outfile = NULL; static int verbose = 0; +static bool two_phase = false; static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int fsync_interval = 10 * 1000; /* 10 sec = default */ @@ -93,6 +94,7 @@ usage(void) printf(_(" -s, --status-interval=SECS\n" " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000)); printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n")); + printf(_(" -t, --two-phase enable two-phase decoding when creating a slot\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -?, --help show this help, then exit\n")); @@ -678,6 +680,7 @@ main(int argc, char **argv) {"fsync-interval", required_argument, NULL, 'F'}, {"no-loop", no_argument, NULL, 'n'}, {"verbose", no_argument, NULL, 'v'}, + {"two-phase", no_argument, NULL, 't'}, {"version", no_argument, NULL, 'V'}, {"help", no_argument, NULL, '?'}, /* connection options */ @@ -726,7 +729,7 @@ main(int argc, char **argv) } } - while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:", + while ((c = getopt_long(argc, argv, "E:f:F:nvtd:h:p:U:wWI:o:P:s:S:", long_options, &option_index)) != -1) { switch (c) @@ -749,6 +752,9 @@ main(int argc, char **argv) case 'v': verbose++; break; + case 't': + two_phase = true; + break; /* connection options */ case 'd': dbname = pg_strdup(optarg); @@ -920,6 +926,15 @@ main(int argc, char **argv) exit(1); } + if (two_phase && !do_create_slot) + { + pg_log_error("--two-phase may only be specified with --create-slot"); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + #ifndef WIN32 pqsignal(SIGINT, sigint_handler); pqsignal(SIGHUP, sighup_handler); @@ -976,7 +991,7 @@ main(int argc, char **argv) pg_log_info("creating replication slot \"%s\"", replication_slot); if (!CreateReplicationSlot(conn, replication_slot, plugin, false, - false, false, slot_exists_ok)) + false, false, slot_exists_ok, two_phase)) exit(1); startpos = InvalidXLogRecPtr; } diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index 99daf0e9727..f5b3b476e52 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -486,7 +486,7 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, bool CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, bool is_temporary, bool is_physical, bool reserve_wal, - bool slot_exists_ok) + bool slot_exists_ok, bool two_phase) { PQExpBuffer query; PGresult *res; @@ -495,6 +495,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, Assert((is_physical && plugin == NULL) || (!is_physical && plugin != NULL)); + Assert(!(two_phase && is_physical)); Assert(slot_name != NULL); /* Build query */ @@ -510,6 +511,9 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, else { appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin); + if (two_phase && PQserverVersion(conn) >= 150000) + appendPQExpBufferStr(query, " TWO_PHASE"); + if (PQserverVersion(conn) >= 100000) /* pg_recvlogical doesn't use an exported snapshot, so suppress */ appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT"); diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h index 10f87ad0c14..504803b9763 100644 --- a/src/bin/pg_basebackup/streamutil.h +++ b/src/bin/pg_basebackup/streamutil.h @@ -34,7 +34,7 @@ extern PGconn *GetConnection(void); extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, bool is_temporary, bool is_physical, bool reserve_wal, - bool slot_exists_ok); + bool slot_exists_ok, bool two_phase); extern bool DropReplicationSlot(PGconn *conn, const char *slot_name); extern bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, diff --git a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl index 53f41814b0b..bbbf9e21dba 100644 --- a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl +++ b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl @@ -5,7 +5,7 @@ use strict; use warnings; use TestLib; use PostgresNode; -use Test::More tests => 15; +use Test::More tests => 20; program_help_ok('pg_recvlogical'); program_version_ok('pg_recvlogical'); @@ -22,6 +22,7 @@ max_replication_slots = 4 max_wal_senders = 4 log_min_messages = 'debug1' log_error_verbosity = verbose +max_prepared_transactions = 10 }); $node->dump_info; $node->start; @@ -63,3 +64,45 @@ $node->command_ok( '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-' ], 'replayed a transaction'); + +$node->command_ok( + [ + 'pg_recvlogical', '-S', + 'test', '-d', + $node->connstr('postgres'), '--drop-slot' + ], + 'slot dropped'); + +#test with two-phase option enabled +$node->command_ok( + [ + 'pg_recvlogical', '-S', + 'test', '-d', + $node->connstr('postgres'), '--create-slot', '--two-phase' + ], + 'slot with two-phase created'); + +$slot = $node->slot('test'); +isnt($slot->{'restart_lsn'}, '', 'restart lsn is defined for new slot'); + +$node->safe_psql('postgres', + "BEGIN; INSERT INTO test_table values (11); PREPARE TRANSACTION 'test'"); +$node->safe_psql('postgres', + "COMMIT PREPARED 'test'"); +$nextlsn = + $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn()'); +chomp($nextlsn); + +$node->command_fails( + [ + 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), + '--start', '--endpos', "$nextlsn", '--two-phase', '--no-loop', '-f', '-' + ], + 'incorrect usage'); + +$node->command_ok( + [ + 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), + '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-' + ], + 'replayed a two-phase transaction'); |