diff options
author | Amit Kapila <akapila@postgresql.org> | 2024-01-29 09:10:00 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2024-01-29 09:37:23 +0530 |
commit | 73292404370c9900a96e2bebdc7144f7010339cf (patch) | |
tree | 4a68dc7571398d93f8ea5c7e6a865269cef4b5f5 | |
parent | 08e6344fd6423210b339e92c069bb979ba4e7cd6 (diff) | |
download | postgresql-73292404370c9900a96e2bebdc7144f7010339cf.tar.gz postgresql-73292404370c9900a96e2bebdc7144f7010339cf.zip |
Allow setting failover property in the replication command.
This commit implements a new replication command called
ALTER_REPLICATION_SLOT and a corresponding walreceiver API function named
walrcv_alter_slot. Additionally, the CREATE_REPLICATION_SLOT command has
been extended to support the failover option.
These new additions allow the modification of the failover property of a
replication slot on the publisher. A subsequent commit will make use of
these commands in subscription commands and will add the tests as well to
cover the functionality added/changed by this commit.
Author: Hou Zhijie, Shveta Malik
Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda, Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
-rw-r--r-- | doc/src/sgml/protocol.sgml | 50 | ||||
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 2 | ||||
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 44 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 1 | ||||
-rw-r--r-- | src/backend/replication/repl_gram.y | 20 | ||||
-rw-r--r-- | src/backend/replication/repl_scanner.l | 2 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 25 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 2 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 62 | ||||
-rw-r--r-- | src/include/nodes/replnodes.h | 12 | ||||
-rw-r--r-- | src/include/replication/slot.h | 1 | ||||
-rw-r--r-- | src/include/replication/walreceiver.h | 18 | ||||
-rw-r--r-- | src/tools/pgindent/typedefs.list | 2 |
13 files changed, 230 insertions, 11 deletions
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 6c3e8a631d7..bb4fef1f519 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2060,6 +2060,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" </para> </listitem> </varlistentry> + + <varlistentry> + <term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term> + <listitem> + <para> + If true, the slot is enabled to be synced to the standbys. + The default is false. + </para> + </listitem> + </varlistentry> </variablelist> <para> @@ -2124,6 +2134,46 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" </listitem> </varlistentry> + <varlistentry id="protocol-replication-alter-replication-slot" xreflabel="ALTER_REPLICATION_SLOT"> + <term><literal>ALTER_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ( <replaceable class="parameter">option</replaceable> [, ...] ) + <indexterm><primary>ALTER_REPLICATION_SLOT</primary></indexterm> + </term> + <listitem> + <para> + Change the definition of a replication slot. + See <xref linkend="streaming-replication-slots"/> for more about + replication slots. This command is currently only supported for logical + replication slots. + </para> + + <variablelist> + <varlistentry> + <term><replaceable class="parameter">slot_name</replaceable></term> + <listitem> + <para> + The name of the slot to alter. Must be a valid replication slot + name (see <xref linkend="streaming-replication-slots-manipulation"/>). + </para> + </listitem> + </varlistentry> + </variablelist> + + <para>The following option is supported:</para> + + <variablelist> + <varlistentry> + <term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term> + <listitem> + <para> + If true, the slot is enabled to be synced to the standbys. + </para> + </listitem> + </varlistentry> + </variablelist> + + </listitem> + </varlistentry> + <varlistentry id="protocol-replication-read-replication-slot"> <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm> diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 75e6cd8ae3c..eaf2ec3b362 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -807,7 +807,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, - CRS_NOEXPORT_SNAPSHOT, NULL); + false, CRS_NOEXPORT_SNAPSHOT, NULL); if (twophase_enabled) UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 77669074e82..2439733b55b 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -73,8 +73,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, + bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); +static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, + bool failover); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -95,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_receive = libpqrcv_receive, .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, + .walrcv_alter_slot = libpqrcv_alter_slot, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -938,8 +942,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) */ static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, - bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, - XLogRecPtr *lsn) + bool temporary, bool two_phase, bool failover, + CRSSnapshotAction snapshot_action, XLogRecPtr *lsn) { PGresult *res; StringInfoData cmd; @@ -969,6 +973,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, appendStringInfoChar(&cmd, ' '); } + if (failover) + { + appendStringInfoString(&cmd, "FAILOVER"); + if (use_new_options_syntax) + appendStringInfoString(&cmd, ", "); + else + appendStringInfoChar(&cmd, ' '); + } + if (use_new_options_syntax) { switch (snapshot_action) @@ -1038,6 +1051,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, } /* + * Change the definition of the replication slot. + */ +static void +libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, + bool failover) +{ + StringInfoData cmd; + PGresult *res; + + initStringInfo(&cmd); + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )", + quote_identifier(slotname), + failover ? "true" : "false"); + + res = libpqrcv_PQexec(conn->streamConn, cmd.data); + pfree(cmd.data); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not alter replication slot \"%s\": %s", + slotname, pchomp(PQerrorMessage(conn->streamConn))))); + + PQclear(res); +} + +/* * Return PID of remote backend process. */ static pid_t diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 06d5b3df33a..4207b9356c5 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1430,6 +1430,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) */ walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ , false /* two_phase */ , + false, CRS_USE_SNAPSHOT, origin_startpos); /* diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 95e126eb4dc..7474f5bd671 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -64,6 +64,7 @@ Node *replication_parse_result; %token K_START_REPLICATION %token K_CREATE_REPLICATION_SLOT %token K_DROP_REPLICATION_SLOT +%token K_ALTER_REPLICATION_SLOT %token K_TIMELINE_HISTORY %token K_WAIT %token K_TIMELINE @@ -80,8 +81,9 @@ Node *replication_parse_result; %type <node> command %type <node> base_backup start_replication start_logical_replication - create_replication_slot drop_replication_slot identify_system - read_replication_slot timeline_history show upload_manifest + create_replication_slot drop_replication_slot + alter_replication_slot identify_system read_replication_slot + timeline_history show upload_manifest %type <list> generic_option_list %type <defelt> generic_option %type <uintval> opt_timeline @@ -112,6 +114,7 @@ command: | start_logical_replication | create_replication_slot | drop_replication_slot + | alter_replication_slot | read_replication_slot | timeline_history | show @@ -259,6 +262,18 @@ drop_replication_slot: } ; +/* ALTER_REPLICATION_SLOT slot (options) */ +alter_replication_slot: + K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')' + { + AlterReplicationSlotCmd *cmd; + cmd = makeNode(AlterReplicationSlotCmd); + cmd->slotname = $2; + cmd->options = $4; + $$ = (Node *) cmd; + } + ; + /* * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d] */ @@ -410,6 +425,7 @@ ident_or_keyword: | K_START_REPLICATION { $$ = "start_replication"; } | K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; } | K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; } + | K_ALTER_REPLICATION_SLOT { $$ = "alter_replication_slot"; } | K_TIMELINE_HISTORY { $$ = "timeline_history"; } | K_WAIT { $$ = "wait"; } | K_TIMELINE { $$ = "timeline"; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 6fa625617bd..e7def800655 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -125,6 +125,7 @@ TIMELINE { return K_TIMELINE; } START_REPLICATION { return K_START_REPLICATION; } CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; } DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } +ALTER_REPLICATION_SLOT { return K_ALTER_REPLICATION_SLOT; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } PHYSICAL { return K_PHYSICAL; } RESERVE_WAL { return K_RESERVE_WAL; } @@ -302,6 +303,7 @@ replication_scanner_is_replication_command(void) case K_START_REPLICATION: case K_CREATE_REPLICATION_SLOT: case K_DROP_REPLICATION_SLOT: + case K_ALTER_REPLICATION_SLOT: case K_READ_REPLICATION_SLOT: case K_TIMELINE_HISTORY: case K_UPLOAD_MANIFEST: diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 02a14ec210e..f2781d0455a 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -684,6 +684,31 @@ ReplicationSlotDrop(const char *name, bool nowait) } /* + * Change the definition of the slot identified by the specified name. + */ +void +ReplicationSlotAlter(const char *name, bool failover) +{ + Assert(MyReplicationSlot == NULL); + + ReplicationSlotAcquire(name, false); + + if (SlotIsPhysical(MyReplicationSlot)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use %s with a physical replication slot", + "ALTER_REPLICATION_SLOT")); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.failover = failover; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + ReplicationSlotRelease(); +} + +/* * Permanently drop the currently acquired replication slot. */ static void diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 728059518e1..e29a6196a3e 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -387,7 +387,7 @@ WalReceiverMain(void) "pg_walreceiver_%lld", (long long int) walrcv_get_backend_pid(wrconn)); - walrcv_create_slot(wrconn, slotname, true, false, 0, NULL); + walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL); SpinLockAcquire(&walrcv->mutex); strlcpy(walrcv->slotname, slotname, NAMEDATALEN); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index aa80f3de20f..77c8baa32a4 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1126,12 +1126,13 @@ static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, - bool *two_phase) + bool *two_phase, bool *failover) { ListCell *lc; bool snapshot_action_given = false; bool reserve_wal_given = false; bool two_phase_given = false; + bool failover_given = false; /* Parse options */ foreach(lc, cmd->options) @@ -1181,6 +1182,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, two_phase_given = true; *two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "failover") == 0) + { + if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + failover_given = true; + *failover = defGetBoolean(defel); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -1197,6 +1207,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) char *slot_name; bool reserve_wal = false; bool two_phase = false; + bool failover = false; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; DestReceiver *dest; TupOutputState *tstate; @@ -1206,7 +1217,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); - parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase); + parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase, + &failover); if (cmd->kind == REPLICATION_KIND_PHYSICAL) { @@ -1243,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase, false); + two_phase, failover); /* * Do options check early so that we can bail before calling the @@ -1399,6 +1411,43 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) } /* + * Process extra options given to ALTER_REPLICATION_SLOT. + */ +static void +ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover) +{ + bool failover_given = false; + + /* Parse options */ + foreach_ptr(DefElem, defel, cmd->options) + { + if (strcmp(defel->defname, "failover") == 0) + { + if (failover_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + failover_given = true; + *failover = defGetBoolean(defel); + } + else + elog(ERROR, "unrecognized option: %s", defel->defname); + } +} + +/* + * Change the definition of a replication slot. + */ +static void +AlterReplicationSlot(AlterReplicationSlotCmd *cmd) +{ + bool failover = false; + + ParseAlterReplSlotOptions(cmd, &failover); + ReplicationSlotAlter(cmd->slotname, failover); +} + +/* * Load previously initiated logical slot and prepare for sending data (via * WalSndLoop). */ @@ -1971,6 +2020,13 @@ exec_replication_command(const char *cmd_string) EndReplicationCommand(cmdtag); break; + case T_AlterReplicationSlotCmd: + cmdtag = "ALTER_REPLICATION_SLOT"; + set_ps_display(cmdtag); + AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index af0a333f1a8..ed23333e928 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -73,6 +73,18 @@ typedef struct DropReplicationSlotCmd /* ---------------------- + * ALTER_REPLICATION_SLOT command + * ---------------------- + */ +typedef struct AlterReplicationSlotCmd +{ + NodeTag type; + char *slotname; + List *options; +} AlterReplicationSlotCmd; + + +/* ---------------------- * START_REPLICATION command * ---------------------- */ diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index db9bb222661..da4c7764921 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -227,6 +227,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, bool two_phase, bool failover); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); +extern void ReplicationSlotAlter(const char *name, bool failover); extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 0899891cdb8..f566a99ba16 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -355,10 +355,21 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, + bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); /* + * walrcv_alter_slot_fn + * + * Change the definition of a replication slot. Currently, it only supports + * changing the failover property of the slot. + */ +typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, + const char *slotname, + bool failover); + +/* * walrcv_get_backend_pid_fn * * Returns the PID of the remote backend process. @@ -399,6 +410,7 @@ typedef struct WalReceiverFunctionsType walrcv_receive_fn walrcv_receive; walrcv_send_fn walrcv_send; walrcv_create_slot_fn walrcv_create_slot; + walrcv_alter_slot_fn walrcv_alter_slot; walrcv_get_backend_pid_fn walrcv_get_backend_pid; walrcv_exec_fn walrcv_exec; walrcv_disconnect_fn walrcv_disconnect; @@ -428,8 +440,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) +#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) +#define walrcv_alter_slot(conn, slotname, failover) \ + WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index dc3b0ef8710..90b37b919c2 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -85,6 +85,7 @@ AlterOwnerStmt AlterPolicyStmt AlterPublicationAction AlterPublicationStmt +AlterReplicationSlotCmd AlterRoleSetStmt AlterRoleStmt AlterSeqStmt @@ -3879,6 +3880,7 @@ varattrib_1b_e varattrib_4b vbits verifier_context +walrcv_alter_slot_fn walrcv_check_conninfo_fn walrcv_connect_fn walrcv_create_slot_fn |