aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands')
-rw-r--r--src/backend/commands/subscriptioncmds.c116
1 files changed, 111 insertions, 5 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index eaf2ec3b362..b647a81fc86 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -69,8 +69,10 @@
#define SUBOPT_DISABLE_ON_ERR 0x00000400
#define SUBOPT_PASSWORD_REQUIRED 0x00000800
#define SUBOPT_RUN_AS_OWNER 0x00001000
-#define SUBOPT_LSN 0x00002000
-#define SUBOPT_ORIGIN 0x00004000
+#define SUBOPT_FAILOVER 0x00002000
+#define SUBOPT_LSN 0x00004000
+#define SUBOPT_ORIGIN 0x00008000
+
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -95,6 +97,7 @@ typedef struct SubOpts
bool disableonerr;
bool passwordrequired;
bool runasowner;
+ bool failover;
char *origin;
XLogRecPtr lsn;
} SubOpts;
@@ -155,6 +158,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->passwordrequired = true;
if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
opts->runasowner = false;
+ if (IsSet(supported_opts, SUBOPT_FAILOVER))
+ opts->failover = false;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
@@ -303,6 +308,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
opts->runasowner = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
+ strcmp(defel->defname, "failover") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_FAILOVER;
+ opts->failover = defGetBoolean(defel);
+ }
else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
strcmp(defel->defname, "origin") == 0)
{
@@ -388,6 +402,13 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
errmsg("%s and %s are mutually exclusive options",
"connect = false", "copy_data = true")));
+ if (opts->failover &&
+ IsSet(opts->specified_opts, SUBOPT_FAILOVER))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("%s and %s are mutually exclusive options",
+ "connect = false", "failover = true")));
+
/* Change the defaults of other options. */
opts->enabled = false;
opts->create_slot = false;
@@ -591,7 +612,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
- SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
+ SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -697,6 +718,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
+ values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -807,7 +829,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
twophase_enabled = true;
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
- false, CRS_NOEXPORT_SNAPSHOT, NULL);
+ opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
if (twophase_enabled)
UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
@@ -816,6 +838,24 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
(errmsg("created replication slot \"%s\" on publisher",
opts.slot_name)));
}
+
+ /*
+ * If the slot_name is specified without the create_slot option,
+ * it is possible that the user intends to use an existing slot on
+ * the publisher, so here we alter the failover property of the
+ * slot to match the failover value in subscription.
+ *
+ * We do not need to change the failover to false if the server
+ * does not support failover (e.g. pre-PG17).
+ */
+ else if (opts.slot_name &&
+ (opts.failover || walrcv_server_version(wrconn) >= 170000))
+ {
+ walrcv_alter_slot(wrconn, opts.slot_name, opts.failover);
+ ereport(NOTICE,
+ (errmsg("changed the failover state of replication slot \"%s\" on publisher to %s",
+ opts.slot_name, opts.failover ? "true" : "false")));
+ }
}
PG_FINALLY();
{
@@ -1132,7 +1172,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
SUBOPT_PASSWORD_REQUIRED |
- SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
+ SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+ SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@@ -1211,6 +1252,31 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_subrunasowner - 1] = true;
}
+ if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
+ {
+ if (!sub->slotname)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot set %s for a subscription that does not have a slot name",
+ "failover")));
+
+ /*
+ * Do not allow changing the failover state if the
+ * subscription is enabled. This is because the failover
+ * state of the slot on the publisher cannot be modified
+ * if the slot is currently acquired by the apply worker.
+ */
+ if (sub->enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot set %s for enabled subscription",
+ "failover")));
+
+ values[Anum_pg_subscription_subfailover - 1] =
+ BoolGetDatum(opts.failover);
+ replaces[Anum_pg_subscription_subfailover - 1] = true;
+ }
+
if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
{
values[Anum_pg_subscription_suborigin - 1] =
@@ -1453,6 +1519,46 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
heap_freetuple(tup);
}
+ /*
+ * Try to acquire the connection necessary for altering slot.
+ *
+ * This has to be at the end because otherwise if there is an error while
+ * doing the database operations we won't be able to rollback altered
+ * slot.
+ */
+ if (replaces[Anum_pg_subscription_subfailover - 1])
+ {
+ bool must_use_password;
+ char *err;
+ WalReceiverConn *wrconn;
+
+ /* Load the library providing us libpq calls. */
+ load_file("libpqwalreceiver", false);
+
+ /* Try to connect to the publisher. */
+ must_use_password = sub->passwordrequired && !sub->ownersuperuser;
+ wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
+ sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher: %s", err)));
+
+ PG_TRY();
+ {
+ walrcv_alter_slot(wrconn, sub->slotname, opts.failover);
+
+ ereport(NOTICE,
+ (errmsg("changed the failover state of replication slot \"%s\" on publisher to %s",
+ sub->slotname, opts.failover ? "true" : "false")));
+ }
+ PG_FINALLY();
+ {
+ walrcv_disconnect(wrconn);
+ }
+ PG_END_TRY();
+ }
+
table_close(rel, RowExclusiveLock);
ObjectAddressSet(myself, SubscriptionRelationId, subid);