diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bin/pg_basebackup/pg_createsubscriber.c | 111 | ||||
-rw-r--r-- | src/bin/pg_basebackup/t/040_pg_createsubscriber.pl | 14 |
2 files changed, 120 insertions, 5 deletions
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index fb57737f7cd..21dd50f8089 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -92,7 +92,8 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name); static void pg_ctl_status(const char *pg_ctl_cmd, int rc); static void start_standby_server(const struct CreateSubscriberOptions *opt, - bool restricted_access); + bool restricted_access, + bool restrict_logical_worker); static void stop_standby_server(const char *datadir); static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt); @@ -102,6 +103,10 @@ static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinf static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn); static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo); +static void check_and_drop_existing_subscriptions(PGconn *conn, + const struct LogicalRepInfo *dbinfo); +static void drop_existing_subscriptions(PGconn *conn, const char *subname, + const char *dbname); #define USEC_PER_SEC 1000000 #define WAIT_INTERVAL 1 /* 1 second */ @@ -1026,6 +1031,87 @@ check_subscriber(const struct LogicalRepInfo *dbinfo) } /* + * Drop a specified subscription. This is to avoid duplicate subscriptions on + * the primary (publisher node) and the newly created subscriber. We + * shouldn't drop the associated slot as that would be used by the publisher + * node. + */ +static void +drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname) +{ + PQExpBuffer query = createPQExpBuffer(); + PGresult *res; + + Assert(conn != NULL); + + /* + * Construct a query string. These commands are allowed to be executed + * within a transaction. + */ + appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;", + subname); + appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);", + subname); + appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname); + + pg_log_info("dropping subscription \"%s\" on database \"%s\"", + subname, dbname); + + if (!dry_run) + { + res = PQexec(conn, query->data); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not drop a subscription \"%s\" settings: %s", + subname, PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + + PQclear(res); + } + + destroyPQExpBuffer(query); +} + +/* + * Retrieve and drop the pre-existing subscriptions. + */ +static void +check_and_drop_existing_subscriptions(PGconn *conn, + const struct LogicalRepInfo *dbinfo) +{ + PQExpBuffer query = createPQExpBuffer(); + char *dbname; + PGresult *res; + + Assert(conn != NULL); + + dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname)); + + appendPQExpBuffer(query, + "SELECT s.subname FROM pg_catalog.pg_subscription s " + "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) " + "WHERE d.datname = %s", + dbname); + res = PQexec(conn, query->data); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not obtain pre-existing subscriptions: %s", + PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + + for (int i = 0; i < PQntuples(res); i++) + drop_existing_subscriptions(conn, PQgetvalue(res, i, 0), + dbinfo->dbname); + + PQclear(res); + destroyPQExpBuffer(query); +} + +/* * Create the subscriptions, adjust the initial location for logical * replication and enable the subscriptions. That's the last step for logical * replication setup. @@ -1041,6 +1127,14 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn) conn = connect_database(dbinfo[i].subconninfo, true); /* + * We don't need the pre-existing subscriptions on the newly formed + * subscriber. They can connect to other publisher nodes and either + * get some unwarranted data or can lead to ERRORs in connecting to + * such nodes. + */ + check_and_drop_existing_subscriptions(conn, &dbinfo[i]); + + /* * Since the publication was created before the consistent LSN, it is * available on the subscriber when the physical replica is promoted. * Remove publications from the subscriber because it has no use. @@ -1314,7 +1408,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc) } static void -start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access) +start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, + bool restrict_logical_worker) { PQExpBuffer pg_ctl_cmd = createPQExpBuffer(); int rc; @@ -1343,6 +1438,11 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_ if (opt->config_file != NULL) appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"", opt->config_file); + + /* Suppress to start logical replication if requested */ + if (restrict_logical_worker) + appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\""); + pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data); rc = system(pg_ctl_cmd->data); pg_ctl_status(pg_ctl_cmd->data, rc); @@ -2067,7 +2167,7 @@ main(int argc, char **argv) * transformation steps. */ pg_log_info("starting the standby with command-line options"); - start_standby_server(&opt, true); + start_standby_server(&opt, true, false); /* Check if the standby server is ready for logical replication */ check_subscriber(dbinfo); @@ -2098,10 +2198,11 @@ main(int argc, char **argv) /* * Start subscriber so the recovery parameters will take effect. Wait - * until accepting connections. + * until accepting connections. We don't want to start logical replication + * during setup. */ pg_log_info("starting the subscriber"); - start_standby_server(&opt, true); + start_standby_server(&opt, true, true); /* Waiting the subscriber to be promoted */ wait_for_end_recovery(dbinfo[0].subconninfo, &opt); diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl index 1241bf6c6a7..80002c5a17f 100644 --- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -298,6 +298,13 @@ my $result = $node_s->safe_psql('postgres', "SELECT slot_name FROM pg_replication_slots WHERE slot_name = '$fslotname' AND synced AND NOT temporary" ); is($result, 'failover_slot', 'failover slot is synced'); + +# Create subscription to test its removal +my $dummy_sub = 'regress_sub_dummy'; +$node_p->safe_psql($db1, + "CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)" +); +$node_p->wait_for_replay_catchup($node_s); $node_s->stop; # dry run mode on node S @@ -372,6 +379,13 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')"); # Start subscriber $node_s->start; +# Confirm the pre-existing subscription has been removed +$result = $node_s->safe_psql( + 'postgres', qq( + SELECT count(*) FROM pg_subscription WHERE subname = '$dummy_sub' +)); +is($result, qq(0), 'pre-existing subscription was dropped'); + # Get subscription names $result = $node_s->safe_psql( 'postgres', qq( |