aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/src/sgml/protocol.sgml37
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c61
-rw-r--r--src/backend/replication/repl_gram.y35
-rw-r--r--src/backend/replication/walsender.c40
-rw-r--r--src/bin/pg_basebackup/streamutil.c40
5 files changed, 150 insertions, 63 deletions
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 8ed88334442..b95cc88599a 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1914,7 +1914,7 @@ The commands accepted in replication mode are:
</varlistentry>
<varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
- <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> | <literal>TWO_PHASE</literal> ] }
+ <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> | <literal>LOGICAL</literal> } [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
<indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
</term>
<listitem>
@@ -1954,46 +1954,50 @@ The commands accepted in replication mode are:
</para>
</listitem>
</varlistentry>
+ </variablelist>
+
+ <para>The following options are supported:</para>
+ <variablelist>
<varlistentry>
- <term><literal>TWO_PHASE</literal></term>
+ <term><literal>TWO_PHASE [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem>
<para>
- Specify that this logical replication slot supports decoding of two-phase
+ If true, this logical replication slot supports decoding of two-phase
transactions. With this option, two-phase commands like
<literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
The transaction will be decoded and transmitted at
<literal>PREPARE TRANSACTION</literal> time.
+ The default is false.
</para>
</listitem>
</varlistentry>
<varlistentry>
- <term><literal>RESERVE_WAL</literal></term>
+ <term><literal>RESERVE_WAL [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem>
<para>
- Specify that this physical replication slot reserves <acronym>WAL</acronym>
+ If true, this physical replication slot reserves <acronym>WAL</acronym>
immediately. Otherwise, <acronym>WAL</acronym> is only reserved upon
connection from a streaming replication client.
+ The default is false.
</para>
</listitem>
</varlistentry>
<varlistentry>
- <term><literal>EXPORT_SNAPSHOT</literal></term>
- <term><literal>NOEXPORT_SNAPSHOT</literal></term>
- <term><literal>USE_SNAPSHOT</literal></term>
+ <term><literal>SNAPSHOT { 'export' | 'use' | 'nothing' }</literal></term>
<listitem>
<para>
Decides what to do with the snapshot created during logical slot
- initialization. <literal>EXPORT_SNAPSHOT</literal>, which is the default,
+ initialization. <literal>'export'</literal>, which is the default,
will export the snapshot for use in other sessions. This option can't
- be used inside a transaction. <literal>USE_SNAPSHOT</literal> will use the
+ be used inside a transaction. <literal>'use'</literal> will use the
snapshot for the current transaction executing the command. This
option must be used in a transaction, and
<literal>CREATE_REPLICATION_SLOT</literal> must be the first command
- run in that transaction. Finally, <literal>NOEXPORT_SNAPSHOT</literal> will
+ run in that transaction. Finally, <literal>'nothing'</literal> will
just use the snapshot for logical decoding as normal but won't do
anything else with it.
</para>
@@ -2053,6 +2057,17 @@ The commands accepted in replication mode are:
</varlistentry>
<varlistentry>
+ <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> | <literal>TWO_PHASE</literal> ] }
+ </term>
+ <listitem>
+ <para>
+ For compatibility with older releases, this alternative syntax for
+ the <literal>CREATE_REPLICATION_SLOT</literal> command is still supported.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
<indexterm><primary>START_REPLICATION</primary></indexterm>
</term>
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 19ea159af4f..5c6e56a5b24 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -862,6 +862,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
PGresult *res;
StringInfoData cmd;
char *snapshot;
+ int use_new_options_syntax;
+
+ use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
initStringInfo(&cmd);
@@ -872,26 +875,58 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
if (conn->logical)
{
- appendStringInfoString(&cmd, " LOGICAL pgoutput");
+ appendStringInfoString(&cmd, " LOGICAL pgoutput ");
+ if (use_new_options_syntax)
+ appendStringInfoChar(&cmd, '(');
if (two_phase)
- appendStringInfoString(&cmd, " TWO_PHASE");
+ {
+ appendStringInfoString(&cmd, "TWO_PHASE");
+ if (use_new_options_syntax)
+ appendStringInfoString(&cmd, ", ");
+ else
+ appendStringInfoChar(&cmd, ' ');
+ }
- switch (snapshot_action)
+ if (use_new_options_syntax)
{
- case CRS_EXPORT_SNAPSHOT:
- appendStringInfoString(&cmd, " EXPORT_SNAPSHOT");
- break;
- case CRS_NOEXPORT_SNAPSHOT:
- appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT");
- break;
- case CRS_USE_SNAPSHOT:
- appendStringInfoString(&cmd, " USE_SNAPSHOT");
- break;
+ switch (snapshot_action)
+ {
+ case CRS_EXPORT_SNAPSHOT:
+ appendStringInfoString(&cmd, "SNAPSHOT 'export'");
+ break;
+ case CRS_NOEXPORT_SNAPSHOT:
+ appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
+ break;
+ case CRS_USE_SNAPSHOT:
+ appendStringInfoString(&cmd, "SNAPSHOT 'use'");
+ break;
+ }
}
+ else
+ {
+ switch (snapshot_action)
+ {
+ case CRS_EXPORT_SNAPSHOT:
+ appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
+ break;
+ case CRS_NOEXPORT_SNAPSHOT:
+ appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
+ break;
+ case CRS_USE_SNAPSHOT:
+ appendStringInfoString(&cmd, "USE_SNAPSHOT");
+ break;
+ }
+ }
+
+ if (use_new_options_syntax)
+ appendStringInfoChar(&cmd, ')');
}
else
{
- appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
+ if (use_new_options_syntax)
+ appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
+ else
+ appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
}
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 3b59d62ed86..126380e2df7 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -103,8 +103,8 @@ static SQLCmd *make_sqlcmd(void);
%type <node> plugin_opt_arg
%type <str> opt_slot var_name ident_or_keyword
%type <boolval> opt_temporary
-%type <list> create_slot_opt_list
-%type <defelt> create_slot_opt
+%type <list> create_slot_options create_slot_legacy_opt_list
+%type <defelt> create_slot_legacy_opt
%%
@@ -243,8 +243,8 @@ base_backup_legacy_opt:
;
create_replication_slot:
- /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
- K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
+ /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL [options] */
+ K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_options
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
@@ -254,8 +254,8 @@ create_replication_slot:
cmd->options = $5;
$$ = (Node *) cmd;
}
- /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
- | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
+ /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin [options] */
+ | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_options
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
@@ -268,28 +268,33 @@ create_replication_slot:
}
;
-create_slot_opt_list:
- create_slot_opt_list create_slot_opt
+create_slot_options:
+ '(' generic_option_list ')' { $$ = $2; }
+ | create_slot_legacy_opt_list { $$ = $1; }
+ ;
+
+create_slot_legacy_opt_list:
+ create_slot_legacy_opt_list create_slot_legacy_opt
{ $$ = lappend($1, $2); }
| /* EMPTY */
{ $$ = NIL; }
;
-create_slot_opt:
+create_slot_legacy_opt:
K_EXPORT_SNAPSHOT
{
- $$ = makeDefElem("export_snapshot",
- (Node *)makeInteger(true), -1);
+ $$ = makeDefElem("snapshot",
+ (Node *)makeString("export"), -1);
}
| K_NOEXPORT_SNAPSHOT
{
- $$ = makeDefElem("export_snapshot",
- (Node *)makeInteger(false), -1);
+ $$ = makeDefElem("snapshot",
+ (Node *)makeString("nothing"), -1);
}
| K_USE_SNAPSHOT
{
- $$ = makeDefElem("use_snapshot",
- (Node *)makeInteger(true), -1);
+ $$ = makeDefElem("snapshot",
+ (Node *)makeString("use"), -1);
}
| K_RESERVE_WAL
{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389d..b811a5c0ef2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -872,26 +872,30 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
{
DefElem *defel = (DefElem *) lfirst(lc);
- if (strcmp(defel->defname, "export_snapshot") == 0)
+ if (strcmp(defel->defname, "snapshot") == 0)
{
+ char *action;
+
if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
+ action = defGetString(defel);
snapshot_action_given = true;
- *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
- CRS_NOEXPORT_SNAPSHOT;
- }
- else if (strcmp(defel->defname, "use_snapshot") == 0)
- {
- if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+
+ if (strcmp(action, "export") == 0)
+ *snapshot_action = CRS_EXPORT_SNAPSHOT;
+ else if (strcmp(action, "nothing") == 0)
+ *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
+ else if (strcmp(action, "use") == 0)
+ *snapshot_action = CRS_USE_SNAPSHOT;
+ else
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
+ defel->defname, action)));
- snapshot_action_given = true;
- *snapshot_action = CRS_USE_SNAPSHOT;
}
else if (strcmp(defel->defname, "reserve_wal") == 0)
{
@@ -901,7 +905,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
errmsg("conflicting or redundant options")));
reserve_wal_given = true;
- *reserve_wal = true;
+ *reserve_wal = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "two_phase") == 0)
{
@@ -910,7 +914,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
two_phase_given = true;
- *two_phase = true;
+ *two_phase = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized option: %s", defel->defname);
@@ -980,7 +984,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ereport(ERROR,
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
(errmsg("%s must not be called inside a transaction",
- "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
+ "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
need_full_snapshot = true;
}
@@ -990,25 +994,25 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ereport(ERROR,
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
(errmsg("%s must be called inside a transaction",
- "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
+ "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
if (XactIsoLevel != XACT_REPEATABLE_READ)
ereport(ERROR,
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
(errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
- "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
+ "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
if (FirstSnapshotSet)
ereport(ERROR,
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
(errmsg("%s must be called before any query",
- "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
+ "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
if (IsSubTransaction())
ereport(ERROR,
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
(errmsg("%s must not be called in a subtransaction",
- "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
+ "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
need_full_snapshot = true;
}
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index d782b81adc6..37237cd5d95 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -490,6 +490,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
{
PQExpBuffer query;
PGresult *res;
+ bool use_new_option_syntax = (PQserverVersion(conn) >= 150000);
query = createPQExpBuffer();
@@ -498,27 +499,54 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
Assert(!(two_phase && is_physical));
Assert(slot_name != NULL);
- /* Build query */
+ /* Build base portion of query */
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
if (is_temporary)
appendPQExpBufferStr(query, " TEMPORARY");
if (is_physical)
- {
appendPQExpBufferStr(query, " PHYSICAL");
+ else
+ appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
+
+ /* Add any requested options */
+ if (use_new_option_syntax)
+ appendPQExpBufferStr(query, " (");
+ if (is_physical)
+ {
if (reserve_wal)
- appendPQExpBufferStr(query, " RESERVE_WAL");
+ AppendPlainCommandOption(query, use_new_option_syntax,
+ "RESERVE_WAL");
}
else
{
- appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
if (two_phase && PQserverVersion(conn) >= 150000)
- appendPQExpBufferStr(query, " TWO_PHASE");
+ AppendPlainCommandOption(query, use_new_option_syntax,
+ "TWO_PHASE");
if (PQserverVersion(conn) >= 100000)
+ {
/* pg_recvlogical doesn't use an exported snapshot, so suppress */
- appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT");
+ if (use_new_option_syntax)
+ AppendStringCommandOption(query, use_new_option_syntax,
+ "SNAPSHOT", "nothing");
+ else
+ AppendPlainCommandOption(query, use_new_option_syntax,
+ "NOEXPORT_SNAPSHOT");
+ }
+ }
+ if (use_new_option_syntax)
+ {
+ /* Suppress option list if it would be empty, otherwise terminate */
+ if (query->data[query->len - 1] == '(')
+ {
+ query->len -= 2;
+ query->data[query->len] = '\0';
+ }
+ else
+ appendPQExpBufferChar(query, ')');
}
+ /* Now run the query */
res = PQexec(conn, query->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{