diff options
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 116 |
1 files changed, 111 insertions, 5 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index eaf2ec3b362..b647a81fc86 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -69,8 +69,10 @@ #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_PASSWORD_REQUIRED 0x00000800 #define SUBOPT_RUN_AS_OWNER 0x00001000 -#define SUBOPT_LSN 0x00002000 -#define SUBOPT_ORIGIN 0x00004000 +#define SUBOPT_FAILOVER 0x00002000 +#define SUBOPT_LSN 0x00004000 +#define SUBOPT_ORIGIN 0x00008000 + /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -95,6 +97,7 @@ typedef struct SubOpts bool disableonerr; bool passwordrequired; bool runasowner; + bool failover; char *origin; XLogRecPtr lsn; } SubOpts; @@ -155,6 +158,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->passwordrequired = true; if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER)) opts->runasowner = false; + if (IsSet(supported_opts, SUBOPT_FAILOVER)) + opts->failover = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -303,6 +308,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_RUN_AS_OWNER; opts->runasowner = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_FAILOVER) && + strcmp(defel->defname, "failover") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_FAILOVER)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_FAILOVER; + opts->failover = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -388,6 +402,13 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errmsg("%s and %s are mutually exclusive options", "connect = false", "copy_data = true"))); + if (opts->failover && + IsSet(opts->specified_opts, SUBOPT_FAILOVER)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s and %s are mutually exclusive options", + "connect = false", "failover = true"))); + /* Change the defaults of other options. */ opts->enabled = false; opts->create_slot = false; @@ -591,7 +612,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -697,6 +718,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); + values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -807,7 +829,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, - false, CRS_NOEXPORT_SNAPSHOT, NULL); + opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL); if (twophase_enabled) UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); @@ -816,6 +838,24 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, (errmsg("created replication slot \"%s\" on publisher", opts.slot_name))); } + + /* + * If the slot_name is specified without the create_slot option, + * it is possible that the user intends to use an existing slot on + * the publisher, so here we alter the failover property of the + * slot to match the failover value in subscription. + * + * We do not need to change the failover to false if the server + * does not support failover (e.g. pre-PG17). + */ + else if (opts.slot_name && + (opts.failover || walrcv_server_version(wrconn) >= 170000)) + { + walrcv_alter_slot(wrconn, opts.slot_name, opts.failover); + ereport(NOTICE, + (errmsg("changed the failover state of replication slot \"%s\" on publisher to %s", + opts.slot_name, opts.failover ? "true" : "false"))); + } } PG_FINALLY(); { @@ -1132,7 +1172,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1211,6 +1252,31 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subrunasowner - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_FAILOVER)) + { + if (!sub->slotname) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for a subscription that does not have a slot name", + "failover"))); + + /* + * Do not allow changing the failover state if the + * subscription is enabled. This is because the failover + * state of the slot on the publisher cannot be modified + * if the slot is currently acquired by the apply worker. + */ + if (sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for enabled subscription", + "failover"))); + + values[Anum_pg_subscription_subfailover - 1] = + BoolGetDatum(opts.failover); + replaces[Anum_pg_subscription_subfailover - 1] = true; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = @@ -1453,6 +1519,46 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, heap_freetuple(tup); } + /* + * Try to acquire the connection necessary for altering slot. + * + * This has to be at the end because otherwise if there is an error while + * doing the database operations we won't be able to rollback altered + * slot. + */ + if (replaces[Anum_pg_subscription_subfailover - 1]) + { + bool must_use_password; + char *err; + WalReceiverConn *wrconn; + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + must_use_password = sub->passwordrequired && !sub->ownersuperuser; + wrconn = walrcv_connect(sub->conninfo, true, must_use_password, + sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + PG_TRY(); + { + walrcv_alter_slot(wrconn, sub->slotname, opts.failover); + + ereport(NOTICE, + (errmsg("changed the failover state of replication slot \"%s\" on publisher to %s", + sub->slotname, opts.failover ? "true" : "false"))); + } + PG_FINALLY(); + { + walrcv_disconnect(wrconn); + } + PG_END_TRY(); + } + table_close(rel, RowExclusiveLock); ObjectAddressSet(myself, SubscriptionRelationId, subid); |