aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2024-01-29 09:10:00 +0530
committerAmit Kapila <akapila@postgresql.org>2024-01-29 09:37:23 +0530
commit73292404370c9900a96e2bebdc7144f7010339cf (patch)
tree4a68dc7571398d93f8ea5c7e6a865269cef4b5f5
parent08e6344fd6423210b339e92c069bb979ba4e7cd6 (diff)
downloadpostgresql-73292404370c9900a96e2bebdc7144f7010339cf.tar.gz
postgresql-73292404370c9900a96e2bebdc7144f7010339cf.zip
Allow setting failover property in the replication command.
This commit implements a new replication command called ALTER_REPLICATION_SLOT and a corresponding walreceiver API function named walrcv_alter_slot. Additionally, the CREATE_REPLICATION_SLOT command has been extended to support the failover option. These new additions allow the modification of the failover property of a replication slot on the publisher. A subsequent commit will make use of these commands in subscription commands and will add the tests as well to cover the functionality added/changed by this commit. Author: Hou Zhijie, Shveta Malik Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda, Hayato, Amit Kapila Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
-rw-r--r--doc/src/sgml/protocol.sgml50
-rw-r--r--src/backend/commands/subscriptioncmds.c2
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c44
-rw-r--r--src/backend/replication/logical/tablesync.c1
-rw-r--r--src/backend/replication/repl_gram.y20
-rw-r--r--src/backend/replication/repl_scanner.l2
-rw-r--r--src/backend/replication/slot.c25
-rw-r--r--src/backend/replication/walreceiver.c2
-rw-r--r--src/backend/replication/walsender.c62
-rw-r--r--src/include/nodes/replnodes.h12
-rw-r--r--src/include/replication/slot.h1
-rw-r--r--src/include/replication/walreceiver.h18
-rw-r--r--src/tools/pgindent/typedefs.list2
13 files changed, 230 insertions, 11 deletions
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 6c3e8a631d7..bb4fef1f519 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2060,6 +2060,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
+ <listitem>
+ <para>
+ If true, the slot is enabled to be synced to the standbys.
+ The default is false.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
<para>
@@ -2124,6 +2134,46 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
</listitem>
</varlistentry>
+ <varlistentry id="protocol-replication-alter-replication-slot" xreflabel="ALTER_REPLICATION_SLOT">
+ <term><literal>ALTER_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ( <replaceable class="parameter">option</replaceable> [, ...] )
+ <indexterm><primary>ALTER_REPLICATION_SLOT</primary></indexterm>
+ </term>
+ <listitem>
+ <para>
+ Change the definition of a replication slot.
+ See <xref linkend="streaming-replication-slots"/> for more about
+ replication slots. This command is currently only supported for logical
+ replication slots.
+ </para>
+
+ <variablelist>
+ <varlistentry>
+ <term><replaceable class="parameter">slot_name</replaceable></term>
+ <listitem>
+ <para>
+ The name of the slot to alter. Must be a valid replication slot
+ name (see <xref linkend="streaming-replication-slots-manipulation"/>).
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <para>The following option is supported:</para>
+
+ <variablelist>
+ <varlistentry>
+ <term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
+ <listitem>
+ <para>
+ If true, the slot is enabled to be synced to the standbys.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ </listitem>
+ </varlistentry>
+
<varlistentry id="protocol-replication-read-replication-slot">
<term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
<indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 75e6cd8ae3c..eaf2ec3b362 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -807,7 +807,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
twophase_enabled = true;
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
- CRS_NOEXPORT_SNAPSHOT, NULL);
+ false, CRS_NOEXPORT_SNAPSHOT, NULL);
if (twophase_enabled)
UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 77669074e82..2439733b55b 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -73,8 +73,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
const char *slotname,
bool temporary,
bool two_phase,
+ bool failover,
CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn);
+static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
+ bool failover);
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
const char *query,
@@ -95,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
.walrcv_receive = libpqrcv_receive,
.walrcv_send = libpqrcv_send,
.walrcv_create_slot = libpqrcv_create_slot,
+ .walrcv_alter_slot = libpqrcv_alter_slot,
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
.walrcv_exec = libpqrcv_exec,
.walrcv_disconnect = libpqrcv_disconnect
@@ -938,8 +942,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
*/
static char *
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
- bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
- XLogRecPtr *lsn)
+ bool temporary, bool two_phase, bool failover,
+ CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
{
PGresult *res;
StringInfoData cmd;
@@ -969,6 +973,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
appendStringInfoChar(&cmd, ' ');
}
+ if (failover)
+ {
+ appendStringInfoString(&cmd, "FAILOVER");
+ if (use_new_options_syntax)
+ appendStringInfoString(&cmd, ", ");
+ else
+ appendStringInfoChar(&cmd, ' ');
+ }
+
if (use_new_options_syntax)
{
switch (snapshot_action)
@@ -1038,6 +1051,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
}
/*
+ * Change the definition of the replication slot.
+ */
+static void
+libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
+ bool failover)
+{
+ StringInfoData cmd;
+ PGresult *res;
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
+ quote_identifier(slotname),
+ failover ? "true" : "false");
+
+ res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+ pfree(cmd.data);
+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("could not alter replication slot \"%s\": %s",
+ slotname, pchomp(PQerrorMessage(conn->streamConn)))));
+
+ PQclear(res);
+}
+
+/*
* Return PID of remote backend process.
*/
static pid_t
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 06d5b3df33a..4207b9356c5 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1430,6 +1430,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
*/
walrcv_create_slot(LogRepWorkerWalRcvConn,
slotname, false /* permanent */ , false /* two_phase */ ,
+ false,
CRS_USE_SNAPSHOT, origin_startpos);
/*
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 95e126eb4dc..7474f5bd671 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ Node *replication_parse_result;
%token K_START_REPLICATION
%token K_CREATE_REPLICATION_SLOT
%token K_DROP_REPLICATION_SLOT
+%token K_ALTER_REPLICATION_SLOT
%token K_TIMELINE_HISTORY
%token K_WAIT
%token K_TIMELINE
@@ -80,8 +81,9 @@ Node *replication_parse_result;
%type <node> command
%type <node> base_backup start_replication start_logical_replication
- create_replication_slot drop_replication_slot identify_system
- read_replication_slot timeline_history show upload_manifest
+ create_replication_slot drop_replication_slot
+ alter_replication_slot identify_system read_replication_slot
+ timeline_history show upload_manifest
%type <list> generic_option_list
%type <defelt> generic_option
%type <uintval> opt_timeline
@@ -112,6 +114,7 @@ command:
| start_logical_replication
| create_replication_slot
| drop_replication_slot
+ | alter_replication_slot
| read_replication_slot
| timeline_history
| show
@@ -259,6 +262,18 @@ drop_replication_slot:
}
;
+/* ALTER_REPLICATION_SLOT slot (options) */
+alter_replication_slot:
+ K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')'
+ {
+ AlterReplicationSlotCmd *cmd;
+ cmd = makeNode(AlterReplicationSlotCmd);
+ cmd->slotname = $2;
+ cmd->options = $4;
+ $$ = (Node *) cmd;
+ }
+ ;
+
/*
* START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
*/
@@ -410,6 +425,7 @@ ident_or_keyword:
| K_START_REPLICATION { $$ = "start_replication"; }
| K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; }
| K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; }
+ | K_ALTER_REPLICATION_SLOT { $$ = "alter_replication_slot"; }
| K_TIMELINE_HISTORY { $$ = "timeline_history"; }
| K_WAIT { $$ = "wait"; }
| K_TIMELINE { $$ = "timeline"; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 6fa625617bd..e7def800655 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -125,6 +125,7 @@ TIMELINE { return K_TIMELINE; }
START_REPLICATION { return K_START_REPLICATION; }
CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
+ALTER_REPLICATION_SLOT { return K_ALTER_REPLICATION_SLOT; }
TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
PHYSICAL { return K_PHYSICAL; }
RESERVE_WAL { return K_RESERVE_WAL; }
@@ -302,6 +303,7 @@ replication_scanner_is_replication_command(void)
case K_START_REPLICATION:
case K_CREATE_REPLICATION_SLOT:
case K_DROP_REPLICATION_SLOT:
+ case K_ALTER_REPLICATION_SLOT:
case K_READ_REPLICATION_SLOT:
case K_TIMELINE_HISTORY:
case K_UPLOAD_MANIFEST:
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 02a14ec210e..f2781d0455a 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -684,6 +684,31 @@ ReplicationSlotDrop(const char *name, bool nowait)
}
/*
+ * Change the definition of the slot identified by the specified name.
+ */
+void
+ReplicationSlotAlter(const char *name, bool failover)
+{
+ Assert(MyReplicationSlot == NULL);
+
+ ReplicationSlotAcquire(name, false);
+
+ if (SlotIsPhysical(MyReplicationSlot))
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot use %s with a physical replication slot",
+ "ALTER_REPLICATION_SLOT"));
+
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->data.failover = failover;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ ReplicationSlotRelease();
+}
+
+/*
* Permanently drop the currently acquired replication slot.
*/
static void
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 728059518e1..e29a6196a3e 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -387,7 +387,7 @@ WalReceiverMain(void)
"pg_walreceiver_%lld",
(long long int) walrcv_get_backend_pid(wrconn));
- walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
+ walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
SpinLockAcquire(&walrcv->mutex);
strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index aa80f3de20f..77c8baa32a4 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1126,12 +1126,13 @@ static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
bool *reserve_wal,
CRSSnapshotAction *snapshot_action,
- bool *two_phase)
+ bool *two_phase, bool *failover)
{
ListCell *lc;
bool snapshot_action_given = false;
bool reserve_wal_given = false;
bool two_phase_given = false;
+ bool failover_given = false;
/* Parse options */
foreach(lc, cmd->options)
@@ -1181,6 +1182,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
two_phase_given = true;
*two_phase = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "failover") == 0)
+ {
+ if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ failover_given = true;
+ *failover = defGetBoolean(defel);
+ }
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
@@ -1197,6 +1207,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
char *slot_name;
bool reserve_wal = false;
bool two_phase = false;
+ bool failover = false;
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
DestReceiver *dest;
TupOutputState *tstate;
@@ -1206,7 +1217,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Assert(!MyReplicationSlot);
- parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
+ parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
+ &failover);
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
{
@@ -1243,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
- two_phase, false);
+ two_phase, failover);
/*
* Do options check early so that we can bail before calling the
@@ -1399,6 +1411,43 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
}
/*
+ * Process extra options given to ALTER_REPLICATION_SLOT.
+ */
+static void
+ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
+{
+ bool failover_given = false;
+
+ /* Parse options */
+ foreach_ptr(DefElem, defel, cmd->options)
+ {
+ if (strcmp(defel->defname, "failover") == 0)
+ {
+ if (failover_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ failover_given = true;
+ *failover = defGetBoolean(defel);
+ }
+ else
+ elog(ERROR, "unrecognized option: %s", defel->defname);
+ }
+}
+
+/*
+ * Change the definition of a replication slot.
+ */
+static void
+AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
+{
+ bool failover = false;
+
+ ParseAlterReplSlotOptions(cmd, &failover);
+ ReplicationSlotAlter(cmd->slotname, failover);
+}
+
+/*
* Load previously initiated logical slot and prepare for sending data (via
* WalSndLoop).
*/
@@ -1971,6 +2020,13 @@ exec_replication_command(const char *cmd_string)
EndReplicationCommand(cmdtag);
break;
+ case T_AlterReplicationSlotCmd:
+ cmdtag = "ALTER_REPLICATION_SLOT";
+ set_ps_display(cmdtag);
+ AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
+ break;
+
case T_StartReplicationCmd:
{
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index af0a333f1a8..ed23333e928 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -73,6 +73,18 @@ typedef struct DropReplicationSlotCmd
/* ----------------------
+ * ALTER_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct AlterReplicationSlotCmd
+{
+ NodeTag type;
+ char *slotname;
+ List *options;
+} AlterReplicationSlotCmd;
+
+
+/* ----------------------
* START_REPLICATION command
* ----------------------
*/
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index db9bb222661..da4c7764921 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -227,6 +227,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
bool two_phase, bool failover);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
+extern void ReplicationSlotAlter(const char *name, bool failover);
extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void ReplicationSlotRelease(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 0899891cdb8..f566a99ba16 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -355,10 +355,21 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
const char *slotname,
bool temporary,
bool two_phase,
+ bool failover,
CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn);
/*
+ * walrcv_alter_slot_fn
+ *
+ * Change the definition of a replication slot. Currently, it only supports
+ * changing the failover property of the slot.
+ */
+typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
+ const char *slotname,
+ bool failover);
+
+/*
* walrcv_get_backend_pid_fn
*
* Returns the PID of the remote backend process.
@@ -399,6 +410,7 @@ typedef struct WalReceiverFunctionsType
walrcv_receive_fn walrcv_receive;
walrcv_send_fn walrcv_send;
walrcv_create_slot_fn walrcv_create_slot;
+ walrcv_alter_slot_fn walrcv_alter_slot;
walrcv_get_backend_pid_fn walrcv_get_backend_pid;
walrcv_exec_fn walrcv_exec;
walrcv_disconnect_fn walrcv_disconnect;
@@ -428,8 +440,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
- WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
+#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
+ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
+#define walrcv_alter_slot(conn, slotname, failover) \
+ WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover)
#define walrcv_get_backend_pid(conn) \
WalReceiverFunctions->walrcv_get_backend_pid(conn)
#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index dc3b0ef8710..90b37b919c2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -85,6 +85,7 @@ AlterOwnerStmt
AlterPolicyStmt
AlterPublicationAction
AlterPublicationStmt
+AlterReplicationSlotCmd
AlterRoleSetStmt
AlterRoleStmt
AlterSeqStmt
@@ -3879,6 +3880,7 @@ varattrib_1b_e
varattrib_4b
vbits
verifier_context
+walrcv_alter_slot_fn
walrcv_check_conninfo_fn
walrcv_connect_fn
walrcv_create_slot_fn