aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-06-30 08:45:47 +0530
committerAmit Kapila <akapila@postgresql.org>2021-06-30 08:45:47 +0530
commitcda03cfed6b8bd5f64567bccbc9578fba035691e (patch)
tree5bfc6c435b73b5c1772d0269e5ddd4934dabd356 /src
parent17707c059cf4bf610e3b1833df5ca17cf223fe5f (diff)
downloadpostgresql-cda03cfed6b8bd5f64567bccbc9578fba035691e.tar.gz
postgresql-cda03cfed6b8bd5f64567bccbc9578fba035691e.zip
Allow enabling two-phase option via replication protocol.
Extend the replication command CREATE_REPLICATION_SLOT to support the TWO_PHASE option. This will allow decoding commands like PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED for slots created with this option. The decoding of the transaction happens at prepare command. This patch also adds support of two-phase in pg_recvlogical via a new option --two-phase. This option will also be used by future patches that allow streaming of transactions at prepare time for built-in logical replication. With this, the out-of-core logical replication solutions can enable replication of two-phase transactions via replication protocol. Author: Ajin Cherian Reviewed-By: Jeff Davis, Vignesh C, Amit Kapila Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru https://postgr.es/m/64b9f783c6e125f18f88fbc0c0234e34e71d8639.camel@j-davis.com
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/repl_gram.y6
-rw-r--r--src/backend/replication/repl_scanner.l1
-rw-r--r--src/backend/replication/walsender.c18
-rw-r--r--src/bin/pg_basebackup/pg_basebackup.c2
-rw-r--r--src/bin/pg_basebackup/pg_receivewal.c2
-rw-r--r--src/bin/pg_basebackup/pg_recvlogical.c19
-rw-r--r--src/bin/pg_basebackup/streamutil.c6
-rw-r--r--src/bin/pg_basebackup/streamutil.h2
-rw-r--r--src/bin/pg_basebackup/t/030_pg_recvlogical.pl45
9 files changed, 91 insertions, 10 deletions
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index eb283a86327..e1e8ec29cc4 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void);
%token K_SLOT
%token K_RESERVE_WAL
%token K_TEMPORARY
+%token K_TWO_PHASE
%token K_EXPORT_SNAPSHOT
%token K_NOEXPORT_SNAPSHOT
%token K_USE_SNAPSHOT
@@ -283,6 +284,11 @@ create_slot_opt:
$$ = makeDefElem("reserve_wal",
(Node *)makeInteger(true), -1);
}
+ | K_TWO_PHASE
+ {
+ $$ = makeDefElem("two_phase",
+ (Node *)makeInteger(true), -1);
+ }
;
/* DROP_REPLICATION_SLOT slot */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index dcc3c3fc515..c038a636c38 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -103,6 +103,7 @@ RESERVE_WAL { return K_RESERVE_WAL; }
LOGICAL { return K_LOGICAL; }
SLOT { return K_SLOT; }
TEMPORARY { return K_TEMPORARY; }
+TWO_PHASE { return K_TWO_PHASE; }
EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
USE_SNAPSHOT { return K_USE_SNAPSHOT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 32245363561..92c755f346e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -863,11 +863,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
bool *reserve_wal,
- CRSSnapshotAction *snapshot_action)
+ CRSSnapshotAction *snapshot_action,
+ bool *two_phase)
{
ListCell *lc;
bool snapshot_action_given = false;
bool reserve_wal_given = false;
+ bool two_phase_given = false;
/* Parse options */
foreach(lc, cmd->options)
@@ -905,6 +907,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
reserve_wal_given = true;
*reserve_wal = true;
}
+ else if (strcmp(defel->defname, "two_phase") == 0)
+ {
+ if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ two_phase_given = true;
+ *two_phase = true;
+ }
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
@@ -920,6 +931,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
char xloc[MAXFNAMELEN];
char *slot_name;
bool reserve_wal = false;
+ bool two_phase = false;
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
DestReceiver *dest;
TupOutputState *tstate;
@@ -929,7 +941,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Assert(!MyReplicationSlot);
- parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
+ parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
/* setup state for WalSndSegmentOpen */
sendTimeLineIsHistoric = false;
@@ -954,7 +966,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
- false);
+ two_phase);
}
if (cmd->kind == REPLICATION_KIND_LOGICAL)
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 16d8929b238..8bb0acf498e 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -646,7 +646,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
if (temp_replication_slot || create_slot)
{
if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
- temp_replication_slot, true, true, false))
+ temp_replication_slot, true, true, false, false))
exit(1);
if (verbose)
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 0d15012c295..c1334fad357 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -741,7 +741,7 @@ main(int argc, char **argv)
pg_log_info("creating replication slot \"%s\"", replication_slot);
if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
- slot_exists_ok))
+ slot_exists_ok, false))
exit(1);
exit(0);
}
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 5efec160e88..76bd153fac2 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -35,6 +35,7 @@
/* Global Options */
static char *outfile = NULL;
static int verbose = 0;
+static bool two_phase = false;
static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 10 * 1000; /* 10 sec = default */
@@ -93,6 +94,7 @@ usage(void)
printf(_(" -s, --status-interval=SECS\n"
" time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
+ printf(_(" -t, --two-phase enable two-phase decoding when creating a slot\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
@@ -678,6 +680,7 @@ main(int argc, char **argv)
{"fsync-interval", required_argument, NULL, 'F'},
{"no-loop", no_argument, NULL, 'n'},
{"verbose", no_argument, NULL, 'v'},
+ {"two-phase", no_argument, NULL, 't'},
{"version", no_argument, NULL, 'V'},
{"help", no_argument, NULL, '?'},
/* connection options */
@@ -726,7 +729,7 @@ main(int argc, char **argv)
}
}
- while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
+ while ((c = getopt_long(argc, argv, "E:f:F:nvtd:h:p:U:wWI:o:P:s:S:",
long_options, &option_index)) != -1)
{
switch (c)
@@ -749,6 +752,9 @@ main(int argc, char **argv)
case 'v':
verbose++;
break;
+ case 't':
+ two_phase = true;
+ break;
/* connection options */
case 'd':
dbname = pg_strdup(optarg);
@@ -920,6 +926,15 @@ main(int argc, char **argv)
exit(1);
}
+ if (two_phase && !do_create_slot)
+ {
+ pg_log_error("--two-phase may only be specified with --create-slot");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+
#ifndef WIN32
pqsignal(SIGINT, sigint_handler);
pqsignal(SIGHUP, sighup_handler);
@@ -976,7 +991,7 @@ main(int argc, char **argv)
pg_log_info("creating replication slot \"%s\"", replication_slot);
if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
- false, false, slot_exists_ok))
+ false, false, slot_exists_ok, two_phase))
exit(1);
startpos = InvalidXLogRecPtr;
}
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 99daf0e9727..f5b3b476e52 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -486,7 +486,7 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
bool
CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
bool is_temporary, bool is_physical, bool reserve_wal,
- bool slot_exists_ok)
+ bool slot_exists_ok, bool two_phase)
{
PQExpBuffer query;
PGresult *res;
@@ -495,6 +495,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
Assert((is_physical && plugin == NULL) ||
(!is_physical && plugin != NULL));
+ Assert(!(two_phase && is_physical));
Assert(slot_name != NULL);
/* Build query */
@@ -510,6 +511,9 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
else
{
appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
+ if (two_phase && PQserverVersion(conn) >= 150000)
+ appendPQExpBufferStr(query, " TWO_PHASE");
+
if (PQserverVersion(conn) >= 100000)
/* pg_recvlogical doesn't use an exported snapshot, so suppress */
appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT");
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 10f87ad0c14..504803b9763 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -34,7 +34,7 @@ extern PGconn *GetConnection(void);
extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
const char *plugin, bool is_temporary,
bool is_physical, bool reserve_wal,
- bool slot_exists_ok);
+ bool slot_exists_ok, bool two_phase);
extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
extern bool RunIdentifySystem(PGconn *conn, char **sysid,
TimeLineID *starttli,
diff --git a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
index 53f41814b0b..bbbf9e21dba 100644
--- a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
+++ b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
@@ -5,7 +5,7 @@ use strict;
use warnings;
use TestLib;
use PostgresNode;
-use Test::More tests => 15;
+use Test::More tests => 20;
program_help_ok('pg_recvlogical');
program_version_ok('pg_recvlogical');
@@ -22,6 +22,7 @@ max_replication_slots = 4
max_wal_senders = 4
log_min_messages = 'debug1'
log_error_verbosity = verbose
+max_prepared_transactions = 10
});
$node->dump_info;
$node->start;
@@ -63,3 +64,45 @@ $node->command_ok(
'--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
],
'replayed a transaction');
+
+$node->command_ok(
+ [
+ 'pg_recvlogical', '-S',
+ 'test', '-d',
+ $node->connstr('postgres'), '--drop-slot'
+ ],
+ 'slot dropped');
+
+#test with two-phase option enabled
+$node->command_ok(
+ [
+ 'pg_recvlogical', '-S',
+ 'test', '-d',
+ $node->connstr('postgres'), '--create-slot', '--two-phase'
+ ],
+ 'slot with two-phase created');
+
+$slot = $node->slot('test');
+isnt($slot->{'restart_lsn'}, '', 'restart lsn is defined for new slot');
+
+$node->safe_psql('postgres',
+ "BEGIN; INSERT INTO test_table values (11); PREPARE TRANSACTION 'test'");
+$node->safe_psql('postgres',
+ "COMMIT PREPARED 'test'");
+$nextlsn =
+ $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn()');
+chomp($nextlsn);
+
+$node->command_fails(
+ [
+ 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
+ '--start', '--endpos', "$nextlsn", '--two-phase', '--no-loop', '-f', '-'
+ ],
+ 'incorrect usage');
+
+$node->command_ok(
+ [
+ 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
+ '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
+ ],
+ 'replayed a two-phase transaction');