aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPeter Eisentraut <peter@eisentraut.org>2024-03-25 12:30:55 +0100
committerPeter Eisentraut <peter@eisentraut.org>2024-03-25 12:42:47 +0100
commitd44032d0146306971cd5ccf232fe37269717d6f2 (patch)
tree263b004d2d8c8d7eff548ac075ca80ac74760a55 /src
parenta11f330b5584f2430371d68871e00f5c63735299 (diff)
downloadpostgresql-d44032d0146306971cd5ccf232fe37269717d6f2.tar.gz
postgresql-d44032d0146306971cd5ccf232fe37269717d6f2.zip
pg_createsubscriber: creates a new logical replica from a standby server
It must be run on the target server and should be able to connect to the source server (publisher) and the target server (subscriber). All tables in the specified database(s) are included in the logical replication setup. A pair of publication and subscription objects are created for each database. The main advantage of pg_createsubscriber over the common logical replication setup is the initial data copy. It also reduces the catchup phase. Some prerequisites must be met to successfully run it. It is basically the logical replication requirements. It starts creating a publication using FOR ALL TABLES and a replication slot for each specified database. Write recovery parameters into the target data directory and start the target server. It specifies the LSN of the last replication slot (replication start point) up to which the recovery will proceed. Wait until the target server is promoted. Create one subscription per specified database (using publication and replication slot created in a previous step) on the target server. Set the replication progress to the replication start point for each subscription. Enable the subscription for each specified database on the target server. And finally, change the system identifier on the target server. Author: Euler Taveira <euler.taveira@enterprisedb.com> Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com> Reviewed-by: Vignesh C <vignesh21@gmail.com> Reviewed-by: Shubham Khanna <khannashubham1197@gmail.com> Reviewed-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com> Reviewed-by: Peter Eisentraut <peter@eisentraut.org> Discussion: https://www.postgresql.org/message-id/flat/5ac50071-f2ed-4ace-a8fd-b892cffd33eb@www.fastmail.com
Diffstat (limited to 'src')
-rw-r--r--src/bin/pg_basebackup/.gitignore1
-rw-r--r--src/bin/pg_basebackup/Makefile11
-rw-r--r--src/bin/pg_basebackup/meson.build19
-rw-r--r--src/bin/pg_basebackup/nls.mk1
-rw-r--r--src/bin/pg_basebackup/pg_createsubscriber.c2141
-rw-r--r--src/bin/pg_basebackup/t/040_pg_createsubscriber.pl364
6 files changed, 2534 insertions, 3 deletions
diff --git a/src/bin/pg_basebackup/.gitignore b/src/bin/pg_basebackup/.gitignore
index 26048bdbd84..14d5de6c01e 100644
--- a/src/bin/pg_basebackup/.gitignore
+++ b/src/bin/pg_basebackup/.gitignore
@@ -1,4 +1,5 @@
/pg_basebackup
+/pg_createsubscriber
/pg_receivewal
/pg_recvlogical
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index abfb6440ec8..26c53e473f5 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -44,11 +44,14 @@ BBOBJS = \
bbstreamer_tar.o \
bbstreamer_zstd.o
-all: pg_basebackup pg_receivewal pg_recvlogical
+all: pg_basebackup pg_createsubscriber pg_receivewal pg_recvlogical
pg_basebackup: $(BBOBJS) $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
$(CC) $(CFLAGS) $(BBOBJS) $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+pg_createsubscriber: pg_createsubscriber.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
+ $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
pg_receivewal: pg_receivewal.o $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
$(CC) $(CFLAGS) pg_receivewal.o $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
@@ -57,6 +60,7 @@ pg_recvlogical: pg_recvlogical.o $(OBJS) | submake-libpq submake-libpgport subma
install: all installdirs
$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+ $(INSTALL_PROGRAM) pg_createsubscriber$(X) '$(DESTDIR)$(bindir)/pg_createsubscriber$(X)'
$(INSTALL_PROGRAM) pg_receivewal$(X) '$(DESTDIR)$(bindir)/pg_receivewal$(X)'
$(INSTALL_PROGRAM) pg_recvlogical$(X) '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
@@ -65,12 +69,13 @@ installdirs:
uninstall:
rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+ rm -f '$(DESTDIR)$(bindir)/pg_createsubscriber$(X)'
rm -f '$(DESTDIR)$(bindir)/pg_receivewal$(X)'
rm -f '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
clean distclean:
- rm -f pg_basebackup$(X) pg_receivewal$(X) pg_recvlogical$(X) \
- $(BBOBJS) pg_receivewal.o pg_recvlogical.o \
+ rm -f pg_basebackup$(X) pg_createsubscriber$(X) pg_receivewal$(X) pg_recvlogical$(X) \
+ $(BBOBJS) pg_createsubscriber.o pg_receivewal.o pg_recvlogical.o \
$(OBJS)
rm -rf tmp_check
diff --git a/src/bin/pg_basebackup/meson.build b/src/bin/pg_basebackup/meson.build
index f7e60e6670a..c00acd5e118 100644
--- a/src/bin/pg_basebackup/meson.build
+++ b/src/bin/pg_basebackup/meson.build
@@ -38,6 +38,24 @@ pg_basebackup = executable('pg_basebackup',
bin_targets += pg_basebackup
+pg_createsubscriber_sources = files(
+ 'pg_createsubscriber.c'
+)
+
+if host_system == 'windows'
+ pg_createsubscriber_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'pg_createsubscriber',
+ '--FILEDESC', 'pg_createsubscriber - create a new logical replica from a standby server',])
+endif
+
+pg_createsubscriber = executable('pg_createsubscriber',
+ pg_createsubscriber_sources,
+ dependencies: [frontend_code, libpq],
+ kwargs: default_bin_args,
+)
+bin_targets += pg_createsubscriber
+
+
pg_receivewal_sources = files(
'pg_receivewal.c',
)
@@ -89,6 +107,7 @@ tests += {
't/011_in_place_tablespace.pl',
't/020_pg_receivewal.pl',
't/030_pg_recvlogical.pl',
+ 't/040_pg_createsubscriber.pl',
],
},
}
diff --git a/src/bin/pg_basebackup/nls.mk b/src/bin/pg_basebackup/nls.mk
index fc475003e8e..7870cea71ce 100644
--- a/src/bin/pg_basebackup/nls.mk
+++ b/src/bin/pg_basebackup/nls.mk
@@ -8,6 +8,7 @@ GETTEXT_FILES = $(FRONTEND_COMMON_GETTEXT_FILES) \
bbstreamer_tar.c \
bbstreamer_zstd.c \
pg_basebackup.c \
+ pg_createsubscriber.c \
pg_receivewal.c \
pg_recvlogical.c \
receivelog.c \
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
new file mode 100644
index 00000000000..b8f82693405
--- /dev/null
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -0,0 +1,2141 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_createsubscriber.c
+ * Create a new logical replica from a standby server
+ *
+ * Copyright (C) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/pg_createsubscriber.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <sys/time.h>
+#include <sys/wait.h>
+#include <time.h>
+
+#include "catalog/pg_authid_d.h"
+#include "common/connect.h"
+#include "common/controldata_utils.h"
+#include "common/file_perm.h"
+#include "common/logging.h"
+#include "common/pg_prng.h"
+#include "common/restricted_token.h"
+#include "fe_utils/recovery_gen.h"
+#include "fe_utils/simple_list.h"
+#include "getopt_long.h"
+
+#define DEFAULT_SUB_PORT "50432"
+
+/* Command-line options */
+struct CreateSubscriberOptions
+{
+ char *config_file; /* configuration file */
+ char *pub_conninfo_str; /* publisher connection string */
+ char *socket_dir; /* directory for Unix-domain socket, if any */
+ char *sub_port; /* subscriber port number */
+ const char *sub_username; /* subscriber username */
+ SimpleStringList database_names; /* list of database names */
+ SimpleStringList pub_names; /* list of publication names */
+ SimpleStringList sub_names; /* list of subscription names */
+ SimpleStringList replslot_names; /* list of replication slot names */
+ int recovery_timeout; /* stop recovery after this time */
+};
+
+struct LogicalRepInfo
+{
+ Oid oid; /* database OID */
+ char *dbname; /* database name */
+ char *pubconninfo; /* publisher connection string */
+ char *subconninfo; /* subscriber connection string */
+ char *pubname; /* publication name */
+ char *subname; /* subscription name */
+ char *replslotname; /* replication slot name */
+
+ bool made_replslot; /* replication slot was created */
+ bool made_publication; /* publication was created */
+};
+
+static void cleanup_objects_atexit(void);
+static void usage();
+static char *get_base_conninfo(const char *conninfo, char **dbname);
+static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
+static char *get_exec_path(const char *argv0, const char *progname);
+static void check_data_directory(const char *datadir);
+static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
+static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
+ const char *pub_base_conninfo,
+ const char *sub_base_conninfo);
+static PGconn *connect_database(const char *conninfo, bool exit_on_error);
+static void disconnect_database(PGconn *conn, bool exit_on_error);
+static uint64 get_primary_sysid(const char *conninfo);
+static uint64 get_standby_sysid(const char *datadir);
+static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
+static bool server_is_in_recovery(PGconn *conn);
+static char *generate_object_name(PGconn *conn);
+static void check_publisher(const struct LogicalRepInfo *dbinfo);
+static char *setup_publisher(struct LogicalRepInfo *dbinfo);
+static void check_subscriber(const struct LogicalRepInfo *dbinfo);
+static void setup_subscriber(struct LogicalRepInfo *dbinfo,
+ const char *consistent_lsn);
+static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
+ const char *lsn);
+static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
+ const char *slotname);
+static char *create_logical_replication_slot(PGconn *conn,
+ struct LogicalRepInfo *dbinfo);
+static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
+ const char *slot_name);
+static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
+static void start_standby_server(const struct CreateSubscriberOptions *opt,
+ bool restricted_access);
+static void stop_standby_server(const char *datadir);
+static void wait_for_end_recovery(const char *conninfo,
+ const struct CreateSubscriberOptions *opt);
+static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
+static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
+ const char *lsn);
+static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
+
+#define USEC_PER_SEC 1000000
+#define WAIT_INTERVAL 1 /* 1 second */
+
+static const char *progname;
+
+static char *primary_slot_name = NULL;
+static bool dry_run = false;
+
+static bool success = false;
+
+static struct LogicalRepInfo *dbinfo;
+static int num_dbs = 0; /* number of specified databases */
+static int num_pubs = 0; /* number of specified publications */
+static int num_subs = 0; /* number of specified subscriptions */
+static int num_replslots = 0; /* number of specified replication slots */
+
+static pg_prng_state prng_state;
+
+static char *pg_ctl_path = NULL;
+static char *pg_resetwal_path = NULL;
+
+/* standby / subscriber data directory */
+static char *subscriber_dir = NULL;
+
+static bool recovery_ended = false;
+static bool standby_running = false;
+
+enum WaitPMResult
+{
+ POSTMASTER_READY,
+ POSTMASTER_STILL_STARTING
+};
+
+
+/*
+ * Cleanup objects that were created by pg_createsubscriber if there is an
+ * error.
+ *
+ * Publications and replication slots are created on primary. Depending on the
+ * step it failed, it should remove the already created objects if it is
+ * possible (sometimes it won't work due to a connection issue).
+ * There is no cleanup on the target server. The steps on the target server are
+ * executed *after* promotion, hence, at this point, a failure means recreate
+ * the physical replica and start again.
+ */
+static void
+cleanup_objects_atexit(void)
+{
+ if (success)
+ return;
+
+ /*
+ * If the server is promoted, there is no way to use the current setup
+ * again. Warn the user that a new replication setup should be done before
+ * trying again.
+ */
+ if (recovery_ended)
+ {
+ pg_log_warning("failed after the end of recovery");
+ pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
+ "You must recreate the physical replica before continuing.");
+ }
+
+ for (int i = 0; i < num_dbs; i++)
+ {
+ if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
+ {
+ PGconn *conn;
+
+ conn = connect_database(dbinfo[i].pubconninfo, false);
+ if (conn != NULL)
+ {
+ if (dbinfo[i].made_publication)
+ drop_publication(conn, &dbinfo[i]);
+ if (dbinfo[i].made_replslot)
+ drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
+ disconnect_database(conn, false);
+ }
+ else
+ {
+ /*
+ * If a connection could not be established, inform the user
+ * that some objects were left on primary and should be
+ * removed before trying again.
+ */
+ if (dbinfo[i].made_publication)
+ {
+ pg_log_warning("publication \"%s\" in database \"%s\" on primary might be left behind",
+ dbinfo[i].pubname, dbinfo[i].dbname);
+ pg_log_warning_hint("Consider dropping this publication before trying again.");
+ }
+ if (dbinfo[i].made_replslot)
+ {
+ pg_log_warning("replication slot \"%s\" in database \"%s\" on primary might be left behind",
+ dbinfo[i].replslotname, dbinfo[i].dbname);
+ pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
+ }
+ }
+ }
+ }
+
+ if (standby_running)
+ stop_standby_server(subscriber_dir);
+}
+
+static void
+usage(void)
+{
+ printf(_("%s creates a new logical replica from a standby server.\n\n"),
+ progname);
+ printf(_("Usage:\n"));
+ printf(_(" %s [OPTION]...\n"), progname);
+ printf(_("\nOptions:\n"));
+ printf(_(" -d, --database=DBNAME database to create a subscription\n"));
+ printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
+ printf(_(" -n, --dry-run dry run, just show what would be done\n"));
+ printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
+ printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
+ printf(_(" -s, --socket-directory=DIR socket directory to use (default current directory)\n"));
+ printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
+ printf(_(" -U, --subscriber-username=NAME subscriber username\n"));
+ printf(_(" -v, --verbose output verbose messages\n"));
+ printf(_(" --config-file=FILENAME use specified main server configuration\n"
+ " file when running target cluster\n"));
+ printf(_(" --publication=NAME publication name\n"));
+ printf(_(" --replication-slot=NAME replication slot name\n"));
+ printf(_(" --subscription=NAME subscription name\n"));
+ printf(_(" -V, --version output version information, then exit\n"));
+ printf(_(" -?, --help show this help, then exit\n"));
+ printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
+ printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
+}
+
+/*
+ * Validate a connection string. Returns a base connection string that is a
+ * connection string without a database name.
+ *
+ * Since we might process multiple databases, each database name will be
+ * appended to this base connection string to provide a final connection
+ * string. If the second argument (dbname) is not null, returns dbname if the
+ * provided connection string contains it. If option --database is not
+ * provided, uses dbname as the only database to setup the logical replica.
+ *
+ * It is the caller's responsibility to free the returned connection string and
+ * dbname.
+ */
+static char *
+get_base_conninfo(const char *conninfo, char **dbname)
+{
+ PQExpBuffer buf = createPQExpBuffer();
+ PQconninfoOption *conn_opts = NULL;
+ PQconninfoOption *conn_opt;
+ char *errmsg = NULL;
+ char *ret;
+ int i;
+
+ conn_opts = PQconninfoParse(conninfo, &errmsg);
+ if (conn_opts == NULL)
+ {
+ pg_log_error("could not parse connection string: %s", errmsg);
+ return NULL;
+ }
+
+ i = 0;
+ for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
+ {
+ if (strcmp(conn_opt->keyword, "dbname") == 0 && conn_opt->val != NULL)
+ {
+ if (dbname)
+ *dbname = pg_strdup(conn_opt->val);
+ continue;
+ }
+
+ if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
+ {
+ if (i > 0)
+ appendPQExpBufferChar(buf, ' ');
+ appendPQExpBuffer(buf, "%s=%s", conn_opt->keyword, conn_opt->val);
+ i++;
+ }
+ }
+
+ ret = pg_strdup(buf->data);
+
+ destroyPQExpBuffer(buf);
+ PQconninfoFree(conn_opts);
+
+ return ret;
+}
+
+/*
+ * Build a subscriber connection string. Only a few parameters are supported
+ * since it starts a server with restricted access.
+ */
+static char *
+get_sub_conninfo(const struct CreateSubscriberOptions *opt)
+{
+ PQExpBuffer buf = createPQExpBuffer();
+ char *ret;
+
+ appendPQExpBuffer(buf, "port=%s", opt->sub_port);
+#if !defined(WIN32)
+ appendPQExpBuffer(buf, " host=%s", opt->socket_dir);
+#endif
+ if (opt->sub_username != NULL)
+ appendPQExpBuffer(buf, " user=%s", opt->sub_username);
+ appendPQExpBuffer(buf, " fallback_application_name=%s", progname);
+
+ ret = pg_strdup(buf->data);
+
+ destroyPQExpBuffer(buf);
+
+ return ret;
+}
+
+/*
+ * Verify if a PostgreSQL binary (progname) is available in the same directory as
+ * pg_createsubscriber and it has the same version. It returns the absolute
+ * path of the progname.
+ */
+static char *
+get_exec_path(const char *argv0, const char *progname)
+{
+ char *versionstr;
+ char *exec_path;
+ int ret;
+
+ versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
+ exec_path = pg_malloc(MAXPGPATH);
+ ret = find_other_exec(argv0, progname, versionstr, exec_path);
+
+ if (ret < 0)
+ {
+ char full_path[MAXPGPATH];
+
+ if (find_my_exec(argv0, full_path) < 0)
+ strlcpy(full_path, progname, sizeof(full_path));
+
+ if (ret == -1)
+ pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
+ progname, "pg_createsubscriber", full_path);
+ else
+ pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
+ progname, full_path, "pg_createsubscriber");
+ }
+
+ pg_log_debug("%s path is: %s", progname, exec_path);
+
+ return exec_path;
+}
+
+/*
+ * Is it a cluster directory? These are preliminary checks. It is far from
+ * making an accurate check. If it is not a clone from the publisher, it will
+ * eventually fail in a future step.
+ */
+static void
+check_data_directory(const char *datadir)
+{
+ struct stat statbuf;
+ char versionfile[MAXPGPATH];
+
+ pg_log_info("checking if directory \"%s\" is a cluster data directory",
+ datadir);
+
+ if (stat(datadir, &statbuf) != 0)
+ {
+ if (errno == ENOENT)
+ pg_fatal("data directory \"%s\" does not exist", datadir);
+ else
+ pg_fatal("could not access directory \"%s\": %s", datadir,
+ strerror(errno));
+ }
+
+ snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
+ if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
+ {
+ pg_fatal("directory \"%s\" is not a database cluster directory",
+ datadir);
+ }
+}
+
+/*
+ * Append database name into a base connection string.
+ *
+ * dbname is the only parameter that changes so it is not included in the base
+ * connection string. This function concatenates dbname to build a "real"
+ * connection string.
+ */
+static char *
+concat_conninfo_dbname(const char *conninfo, const char *dbname)
+{
+ PQExpBuffer buf = createPQExpBuffer();
+ char *ret;
+
+ Assert(conninfo != NULL);
+
+ appendPQExpBufferStr(buf, conninfo);
+ appendPQExpBuffer(buf, " dbname=%s", dbname);
+
+ ret = pg_strdup(buf->data);
+ destroyPQExpBuffer(buf);
+
+ return ret;
+}
+
+/*
+ * Store publication and subscription information.
+ *
+ * If publication, replication slot and subscription names were specified,
+ * store it here. Otherwise, a generated name will be assigned to the object in
+ * setup_publisher().
+ */
+static struct LogicalRepInfo *
+store_pub_sub_info(const struct CreateSubscriberOptions *opt,
+ const char *pub_base_conninfo,
+ const char *sub_base_conninfo)
+{
+ struct LogicalRepInfo *dbinfo;
+ SimpleStringListCell *pubcell = NULL;
+ SimpleStringListCell *subcell = NULL;
+ SimpleStringListCell *replslotcell = NULL;
+ int i = 0;
+
+ dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
+
+ if (num_pubs > 0)
+ pubcell = opt->pub_names.head;
+ if (num_subs > 0)
+ subcell = opt->sub_names.head;
+ if (num_replslots > 0)
+ replslotcell = opt->replslot_names.head;
+
+ for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
+ {
+ char *conninfo;
+
+ /* Fill publisher attributes */
+ conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
+ dbinfo[i].pubconninfo = conninfo;
+ dbinfo[i].dbname = cell->val;
+ if (num_pubs > 0)
+ dbinfo[i].pubname = pubcell->val;
+ else
+ dbinfo[i].pubname = NULL;
+ if (num_replslots > 0)
+ dbinfo[i].replslotname = replslotcell->val;
+ else
+ dbinfo[i].replslotname = NULL;
+ dbinfo[i].made_replslot = false;
+ dbinfo[i].made_publication = false;
+ /* Fill subscriber attributes */
+ conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
+ dbinfo[i].subconninfo = conninfo;
+ if (num_subs > 0)
+ dbinfo[i].subname = subcell->val;
+ else
+ dbinfo[i].subname = NULL;
+ /* Other fields will be filled later */
+
+ pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
+ dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
+ dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
+ dbinfo[i].pubconninfo);
+ pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
+ dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
+ dbinfo[i].subconninfo);
+
+ if (num_pubs > 0)
+ pubcell = pubcell->next;
+ if (num_subs > 0)
+ subcell = subcell->next;
+ if (num_replslots > 0)
+ replslotcell = replslotcell->next;
+
+ i++;
+ }
+
+ return dbinfo;
+}
+
+/*
+ * Open a new connection. If exit_on_error is true, it has an undesired
+ * condition and it should exit immediately.
+ */
+static PGconn *
+connect_database(const char *conninfo, bool exit_on_error)
+{
+ PGconn *conn;
+ PGresult *res;
+
+ conn = PQconnectdb(conninfo);
+ if (PQstatus(conn) != CONNECTION_OK)
+ {
+ pg_log_error("connection to database failed: %s",
+ PQerrorMessage(conn));
+ if (exit_on_error)
+ exit(1);
+
+ return NULL;
+ }
+
+ /* Secure search_path */
+ res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not clear search_path: %s",
+ PQresultErrorMessage(res));
+ if (exit_on_error)
+ exit(1);
+
+ return NULL;
+ }
+ PQclear(res);
+
+ return conn;
+}
+
+/*
+ * Close the connection. If exit_on_error is true, it has an undesired
+ * condition and it should exit immediately.
+ */
+static void
+disconnect_database(PGconn *conn, bool exit_on_error)
+{
+ Assert(conn != NULL);
+
+ PQfinish(conn);
+
+ if (exit_on_error)
+ exit(1);
+}
+
+/*
+ * Obtain the system identifier using the provided connection. It will be used
+ * to compare if a data directory is a clone of another one.
+ */
+static uint64
+get_primary_sysid(const char *conninfo)
+{
+ PGconn *conn;
+ PGresult *res;
+ uint64 sysid;
+
+ pg_log_info("getting system identifier from publisher");
+
+ conn = connect_database(conninfo, true);
+
+ res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not get system identifier: %s",
+ PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+ if (PQntuples(res) != 1)
+ {
+ pg_log_error("could not get system identifier: got %d rows, expected %d row",
+ PQntuples(res), 1);
+ disconnect_database(conn, true);
+ }
+
+ sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
+
+ pg_log_info("system identifier is %llu on publisher",
+ (unsigned long long) sysid);
+
+ PQclear(res);
+ disconnect_database(conn, false);
+
+ return sysid;
+}
+
+/*
+ * Obtain the system identifier from control file. It will be used to compare
+ * if a data directory is a clone of another one. This routine is used locally
+ * and avoids a connection.
+ */
+static uint64
+get_standby_sysid(const char *datadir)
+{
+ ControlFileData *cf;
+ bool crc_ok;
+ uint64 sysid;
+
+ pg_log_info("getting system identifier from subscriber");
+
+ cf = get_controlfile(datadir, &crc_ok);
+ if (!crc_ok)
+ pg_fatal("control file appears to be corrupt");
+
+ sysid = cf->system_identifier;
+
+ pg_log_info("system identifier is %llu on subscriber",
+ (unsigned long long) sysid);
+
+ pg_free(cf);
+
+ return sysid;
+}
+
+/*
+ * Modify the system identifier. Since a standby server preserves the system
+ * identifier, it makes sense to change it to avoid situations in which WAL
+ * files from one of the systems might be used in the other one.
+ */
+static void
+modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
+{
+ ControlFileData *cf;
+ bool crc_ok;
+ struct timeval tv;
+
+ char *cmd_str;
+
+ pg_log_info("modifying system identifier of subscriber");
+
+ cf = get_controlfile(subscriber_dir, &crc_ok);
+ if (!crc_ok)
+ pg_fatal("control file appears to be corrupt");
+
+ /*
+ * Select a new system identifier.
+ *
+ * XXX this code was extracted from BootStrapXLOG().
+ */
+ gettimeofday(&tv, NULL);
+ cf->system_identifier = ((uint64) tv.tv_sec) << 32;
+ cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
+ cf->system_identifier |= getpid() & 0xFFF;
+
+ if (!dry_run)
+ update_controlfile(subscriber_dir, cf, true);
+
+ pg_log_info("system identifier is %llu on subscriber",
+ (unsigned long long) cf->system_identifier);
+
+ pg_log_info("running pg_resetwal on the subscriber");
+
+ cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
+ subscriber_dir, DEVNULL);
+
+ pg_log_debug("pg_resetwal command is: %s", cmd_str);
+
+ if (!dry_run)
+ {
+ int rc = system(cmd_str);
+
+ if (rc == 0)
+ pg_log_info("subscriber successfully changed the system identifier");
+ else
+ pg_fatal("subscriber failed to change system identifier: exit code: %d", rc);
+ }
+
+ pg_free(cf);
+}
+
+/*
+ * Generate an object name using a prefix, database oid and a random integer.
+ * It is used in case the user does not specify an object name (publication,
+ * subscription, replication slot).
+ */
+static char *
+generate_object_name(PGconn *conn)
+{
+ PGresult *res;
+ Oid oid;
+ uint32 rand;
+ char *objname;
+
+ res = PQexec(conn,
+ "SELECT oid FROM pg_catalog.pg_database "
+ "WHERE datname = pg_catalog.current_database()");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not obtain database OID: %s",
+ PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+ if (PQntuples(res) != 1)
+ {
+ pg_log_error("could not obtain database OID: got %d rows, expected %d rows",
+ PQntuples(res), 1);
+ disconnect_database(conn, true);
+ }
+
+ /* Database OID */
+ oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+
+ PQclear(res);
+
+ /* Random unsigned integer */
+ rand = pg_prng_uint32(&prng_state);
+
+ /*
+ * Build the object name. The name must not exceed NAMEDATALEN - 1. This
+ * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
+ * '\0').
+ */
+ objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
+
+ return objname;
+}
+
+/*
+ * Create the publications and replication slots in preparation for logical
+ * replication. Returns the LSN from latest replication slot. It will be the
+ * replication start point that is used to adjust the subscriptions (see
+ * set_replication_progress).
+ */
+static char *
+setup_publisher(struct LogicalRepInfo *dbinfo)
+{
+ char *lsn = NULL;
+
+ pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
+
+ for (int i = 0; i < num_dbs; i++)
+ {
+ PGconn *conn;
+ char *genname = NULL;
+
+ conn = connect_database(dbinfo[i].pubconninfo, true);
+
+ /*
+ * If an object name was not specified as command-line options, assign
+ * a generated object name. The replication slot has a different rule.
+ * The subscription name is assigned to the replication slot name if
+ * no replication slot is specified. It follows the same rule as
+ * CREATE SUBSCRIPTION.
+ */
+ if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
+ genname = generate_object_name(conn);
+ if (num_pubs == 0)
+ dbinfo[i].pubname = pg_strdup(genname);
+ if (num_subs == 0)
+ dbinfo[i].subname = pg_strdup(genname);
+ if (num_replslots == 0)
+ dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
+
+ /*
+ * Create publication on publisher. This step should be executed
+ * *before* promoting the subscriber to avoid any transactions between
+ * consistent LSN and the new publication rows (such transactions
+ * wouldn't see the new publication rows resulting in an error).
+ */
+ create_publication(conn, &dbinfo[i]);
+
+ /* Create replication slot on publisher */
+ if (lsn)
+ pg_free(lsn);
+ lsn = create_logical_replication_slot(conn, &dbinfo[i]);
+ if (lsn != NULL || dry_run)
+ pg_log_info("create replication slot \"%s\" on publisher",
+ dbinfo[i].replslotname);
+ else
+ exit(1);
+
+ disconnect_database(conn, false);
+ }
+
+ return lsn;
+}
+
+/*
+ * Is recovery still in progress?
+ */
+static bool
+server_is_in_recovery(PGconn *conn)
+{
+ PGresult *res;
+ int ret;
+
+ res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not obtain recovery progress: %s",
+ PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+
+ ret = strcmp("t", PQgetvalue(res, 0, 0));
+
+ PQclear(res);
+
+ return ret == 0;
+}
+
+/*
+ * Is the primary server ready for logical replication?
+ *
+ * XXX Does it not allow a synchronous replica?
+ */
+static void
+check_publisher(const struct LogicalRepInfo *dbinfo)
+{
+ PGconn *conn;
+ PGresult *res;
+ bool failed = false;
+
+ char *wal_level;
+ int max_repslots;
+ int cur_repslots;
+ int max_walsenders;
+ int cur_walsenders;
+
+ pg_log_info("checking settings on publisher");
+
+ conn = connect_database(dbinfo[0].pubconninfo, true);
+
+ /*
+ * If the primary server is in recovery (i.e. cascading replication),
+ * objects (publication) cannot be created because it is read only.
+ */
+ if (server_is_in_recovery(conn))
+ {
+ pg_log_error("primary server cannot be in recovery");
+ disconnect_database(conn, true);
+ }
+
+ /*------------------------------------------------------------------------
+ * Logical replication requires a few parameters to be set on publisher.
+ * Since these parameters are not a requirement for physical replication,
+ * we should check it to make sure it won't fail.
+ *
+ * - wal_level = logical
+ * - max_replication_slots >= current + number of dbs to be converted
+ * - max_wal_senders >= current + number of dbs to be converted
+ * -----------------------------------------------------------------------
+ */
+ res = PQexec(conn,
+ "WITH wl AS "
+ "(SELECT setting AS wallevel FROM pg_catalog.pg_settings "
+ "WHERE name = 'wal_level'), "
+ "total_mrs AS "
+ "(SELECT setting AS tmrs FROM pg_catalog.pg_settings "
+ "WHERE name = 'max_replication_slots'), "
+ "cur_mrs AS "
+ "(SELECT count(*) AS cmrs "
+ "FROM pg_catalog.pg_replication_slots), "
+ "total_mws AS "
+ "(SELECT setting AS tmws FROM pg_catalog.pg_settings "
+ "WHERE name = 'max_wal_senders'), "
+ "cur_mws AS "
+ "(SELECT count(*) AS cmws FROM pg_catalog.pg_stat_activity "
+ "WHERE backend_type = 'walsender') "
+ "SELECT wallevel, tmrs, cmrs, tmws, cmws "
+ "FROM wl, total_mrs, cur_mrs, total_mws, cur_mws");
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not obtain publisher settings: %s",
+ PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+ wal_level = pg_strdup(PQgetvalue(res, 0, 0));
+ max_repslots = atoi(PQgetvalue(res, 0, 1));
+ cur_repslots = atoi(PQgetvalue(res, 0, 2));
+ max_walsenders = atoi(PQgetvalue(res, 0, 3));
+ cur_walsenders = atoi(PQgetvalue(res, 0, 4));
+
+ PQclear(res);
+
+ pg_log_debug("publisher: wal_level: %s", wal_level);
+ pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
+ pg_log_debug("publisher: current replication slots: %d", cur_repslots);
+ pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
+ pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
+
+ /*
+ * If standby sets primary_slot_name, check if this replication slot is in
+ * use on primary for WAL retention purposes. This replication slot has no
+ * use after the transformation, hence, it will be removed at the end of
+ * this process.
+ */
+ if (primary_slot_name)
+ {
+ PQExpBuffer str = createPQExpBuffer();
+ char *psn_esc = PQescapeLiteral(conn, primary_slot_name, strlen(primary_slot_name));
+
+ appendPQExpBuffer(str,
+ "SELECT 1 FROM pg_catalog.pg_replication_slots "
+ "WHERE active AND slot_name = %s",
+ psn_esc);
+
+ pg_free(psn_esc);
+
+ pg_log_debug("command is: %s", str->data);
+
+ res = PQexec(conn, str->data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not obtain replication slot information: %s",
+ PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+ if (PQntuples(res) != 1)
+ {
+ pg_log_error("could not obtain replication slot information: got %d rows, expected %d row",
+ PQntuples(res), 1);
+ disconnect_database(conn, true);
+ }
+ else
+ pg_log_info("primary has replication slot \"%s\"",
+ primary_slot_name);
+
+ PQclear(res);
+ }
+
+ disconnect_database(conn, false);
+
+ if (strcmp(wal_level, "logical") != 0)
+ {
+ pg_log_error("publisher requires wal_level >= logical");
+ failed = true;
+ }
+
+ if (max_repslots - cur_repslots < num_dbs)
+ {
+ pg_log_error("publisher requires %d replication slots, but only %d remain",
+ num_dbs, max_repslots - cur_repslots);
+ pg_log_error_hint("Consider increasing max_replication_slots to at least %d.",
+ cur_repslots + num_dbs);
+ failed = true;
+ }
+
+ if (max_walsenders - cur_walsenders < num_dbs)
+ {
+ pg_log_error("publisher requires %d wal sender processes, but only %d remain",
+ num_dbs, max_walsenders - cur_walsenders);
+ pg_log_error_hint("Consider increasing max_wal_senders to at least %d.",
+ cur_walsenders + num_dbs);
+ failed = true;
+ }
+
+ if (failed)
+ exit(1);
+}
+
+/*
+ * Is the standby server ready for logical replication?
+ *
+ * XXX Does it not allow a time-delayed replica?
+ *
+ * XXX In a cascaded replication scenario (P -> S -> C), if the target server
+ * is S, it cannot detect there is a replica (server C) because server S starts
+ * accepting only local connections and server C cannot connect to it. Hence,
+ * there is not a reliable way to provide a suitable error saying the server C
+ * will be broken at the end of this process (due to pg_resetwal).
+ */
+static void
+check_subscriber(const struct LogicalRepInfo *dbinfo)
+{
+ PGconn *conn;
+ PGresult *res;
+ bool failed = false;
+
+ int max_lrworkers;
+ int max_repslots;
+ int max_wprocs;
+
+ pg_log_info("checking settings on subscriber");
+
+ conn = connect_database(dbinfo[0].subconninfo, true);
+
+ /* The target server must be a standby */
+ if (!server_is_in_recovery(conn))
+ {
+ pg_log_error("target server must be a standby");
+ disconnect_database(conn, true);
+ }
+
+ /*------------------------------------------------------------------------
+ * Logical replication requires a few parameters to be set on subscriber.
+ * Since these parameters are not a requirement for physical replication,
+ * we should check it to make sure it won't fail.
+ *
+ * - max_replication_slots >= number of dbs to be converted
+ * - max_logical_replication_workers >= number of dbs to be converted
+ * - max_worker_processes >= 1 + number of dbs to be converted
+ *------------------------------------------------------------------------
+ */
+ res = PQexec(conn,
+ "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
+ "'max_logical_replication_workers', "
+ "'max_replication_slots', "
+ "'max_worker_processes', "
+ "'primary_slot_name') "
+ "ORDER BY name");
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not obtain subscriber settings: %s",
+ PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+ max_lrworkers = atoi(PQgetvalue(res, 0, 0));
+ max_repslots = atoi(PQgetvalue(res, 1, 0));
+ max_wprocs = atoi(PQgetvalue(res, 2, 0));
+ if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
+ primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
+
+ pg_log_debug("subscriber: max_logical_replication_workers: %d",
+ max_lrworkers);
+ pg_log_debug("subscriber: max_replication_slots: %d", max_repslots);
+ pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
+ if (primary_slot_name)
+ pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
+
+ PQclear(res);
+
+ disconnect_database(conn, false);
+
+ if (max_repslots < num_dbs)
+ {
+ pg_log_error("subscriber requires %d replication slots, but only %d remain",
+ num_dbs, max_repslots);
+ pg_log_error_hint("Consider increasing max_replication_slots to at least %d.",
+ num_dbs);
+ failed = true;
+ }
+
+ if (max_lrworkers < num_dbs)
+ {
+ pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
+ num_dbs, max_lrworkers);
+ pg_log_error_hint("Consider increasing max_logical_replication_workers to at least %d.",
+ num_dbs);
+ failed = true;
+ }
+
+ if (max_wprocs < num_dbs + 1)
+ {
+ pg_log_error("subscriber requires %d worker processes, but only %d remain",
+ num_dbs + 1, max_wprocs);
+ pg_log_error_hint("Consider increasing max_worker_processes to at least %d.",
+ num_dbs + 1);
+ failed = true;
+ }
+
+ if (failed)
+ exit(1);
+}
+
+/*
+ * Create the subscriptions, adjust the initial location for logical
+ * replication and enable the subscriptions. That's the last step for logical
+ * replication setup.
+ */
+static void
+setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
+{
+ for (int i = 0; i < num_dbs; i++)
+ {
+ PGconn *conn;
+
+ /* Connect to subscriber. */
+ conn = connect_database(dbinfo[i].subconninfo, true);
+
+ /*
+ * Since the publication was created before the consistent LSN, it is
+ * available on the subscriber when the physical replica is promoted.
+ * Remove publications from the subscriber because it has no use.
+ */
+ drop_publication(conn, &dbinfo[i]);
+
+ create_subscription(conn, &dbinfo[i]);
+
+ /* Set the replication progress to the correct LSN */
+ set_replication_progress(conn, &dbinfo[i], consistent_lsn);
+
+ /* Enable subscription */
+ enable_subscription(conn, &dbinfo[i]);
+
+ disconnect_database(conn, false);
+ }
+}
+
+/*
+ * Write the required recovery parameters.
+ */
+static void
+setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
+{
+ PGconn *conn;
+ PQExpBuffer recoveryconfcontents;
+
+ /*
+ * Despite of the recovery parameters will be written to the subscriber,
+ * use a publisher connection. The primary_conninfo is generated using the
+ * connection settings.
+ */
+ conn = connect_database(dbinfo[0].pubconninfo, true);
+
+ /*
+ * Write recovery parameters.
+ *
+ * The subscriber is not running yet. In dry run mode, the recovery
+ * parameters *won't* be written. An invalid LSN is used for printing
+ * purposes. Additional recovery parameters are added here. It avoids
+ * unexpected behavior such as end of recovery as soon as a consistent
+ * state is reached (recovery_target) and failure due to multiple recovery
+ * targets (name, time, xid, LSN).
+ */
+ recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
+ appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
+ appendPQExpBuffer(recoveryconfcontents,
+ "recovery_target_timeline = 'latest'\n");
+ appendPQExpBuffer(recoveryconfcontents,
+ "recovery_target_inclusive = true\n");
+ appendPQExpBuffer(recoveryconfcontents,
+ "recovery_target_action = promote\n");
+ appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
+ appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
+ appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
+
+ if (dry_run)
+ {
+ appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
+ appendPQExpBuffer(recoveryconfcontents,
+ "recovery_target_lsn = '%X/%X'\n",
+ LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
+ }
+ else
+ {
+ appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
+ lsn);
+ WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
+ }
+ disconnect_database(conn, false);
+
+ pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
+}
+
+/*
+ * Drop physical replication slot on primary if the standby was using it. After
+ * the transformation, it has no use.
+ *
+ * XXX we might not fail here. Instead, we provide a warning so the user
+ * eventually drops this replication slot later.
+ */
+static void
+drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
+{
+ PGconn *conn;
+
+ /* Replication slot does not exist, do nothing */
+ if (!primary_slot_name)
+ return;
+
+ conn = connect_database(dbinfo[0].pubconninfo, false);
+ if (conn != NULL)
+ {
+ drop_replication_slot(conn, &dbinfo[0], slotname);
+ disconnect_database(conn, false);
+ }
+ else
+ {
+ pg_log_warning("could not drop replication slot \"%s\" on primary",
+ slotname);
+ pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
+ }
+}
+
+/*
+ * Create a logical replication slot and returns a LSN.
+ *
+ * CreateReplicationSlot() is not used because it does not provide the one-row
+ * result set that contains the LSN.
+ */
+static char *
+create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer str = createPQExpBuffer();
+ PGresult *res = NULL;
+ const char *slot_name = dbinfo->replslotname;
+ char *slot_name_esc;
+ char *lsn = NULL;
+
+ Assert(conn != NULL);
+
+ pg_log_info("creating the replication slot \"%s\" on database \"%s\"",
+ slot_name, dbinfo->dbname);
+
+ slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
+
+ appendPQExpBuffer(str,
+ "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
+ slot_name_esc);
+
+ pg_free(slot_name_esc);
+
+ pg_log_debug("command is: %s", str->data);
+
+ if (!dry_run)
+ {
+ res = PQexec(conn, str->data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not create replication slot \"%s\" on database \"%s\": %s",
+ slot_name, dbinfo->dbname,
+ PQresultErrorMessage(res));
+ return NULL;
+ }
+
+ lsn = pg_strdup(PQgetvalue(res, 0, 0));
+ PQclear(res);
+ }
+
+ /* For cleanup purposes */
+ dbinfo->made_replslot = true;
+
+ destroyPQExpBuffer(str);
+
+ return lsn;
+}
+
+static void
+drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
+ const char *slot_name)
+{
+ PQExpBuffer str = createPQExpBuffer();
+ char *slot_name_esc;
+ PGresult *res;
+
+ Assert(conn != NULL);
+
+ pg_log_info("dropping the replication slot \"%s\" on database \"%s\"",
+ slot_name, dbinfo->dbname);
+
+ slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
+
+ appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
+
+ pg_free(slot_name_esc);
+
+ pg_log_debug("command is: %s", str->data);
+
+ if (!dry_run)
+ {
+ res = PQexec(conn, str->data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not drop replication slot \"%s\" on database \"%s\": %s",
+ slot_name, dbinfo->dbname, PQresultErrorMessage(res));
+ dbinfo->made_replslot = false; /* don't try again. */
+ }
+
+ PQclear(res);
+ }
+
+ destroyPQExpBuffer(str);
+}
+
+/*
+ * Reports a suitable message if pg_ctl fails.
+ */
+static void
+pg_ctl_status(const char *pg_ctl_cmd, int rc)
+{
+ if (rc != 0)
+ {
+ if (WIFEXITED(rc))
+ {
+ pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
+ }
+ else if (WIFSIGNALED(rc))
+ {
+#if defined(WIN32)
+ pg_log_error("pg_ctl was terminated by exception 0x%X",
+ WTERMSIG(rc));
+ pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
+#else
+ pg_log_error("pg_ctl was terminated by signal %d: %s",
+ WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
+#endif
+ }
+ else
+ {
+ pg_log_error("pg_ctl exited with unrecognized status %d", rc);
+ }
+
+ pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
+ exit(1);
+ }
+}
+
+static void
+start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access)
+{
+ PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
+ int rc;
+
+ appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D \"%s\" -s",
+ pg_ctl_path, subscriber_dir);
+ if (restricted_access)
+ {
+ appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
+#if !defined(WIN32)
+
+ /*
+ * An empty listen_addresses list means the server does not listen on
+ * any IP interfaces; only Unix-domain sockets can be used to connect
+ * to the server. Prevent external connections to minimize the chance
+ * of failure.
+ */
+ appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
+ if (opt->socket_dir)
+ appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
+ opt->socket_dir);
+ appendPQExpBufferChar(pg_ctl_cmd, '"');
+#endif
+ }
+ if (opt->config_file != NULL)
+ appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
+ opt->config_file);
+ pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
+ rc = system(pg_ctl_cmd->data);
+ pg_ctl_status(pg_ctl_cmd->data, rc);
+ standby_running = true;
+ destroyPQExpBuffer(pg_ctl_cmd);
+ pg_log_info("server was started");
+}
+
+static void
+stop_standby_server(const char *datadir)
+{
+ char *pg_ctl_cmd;
+ int rc;
+
+ pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
+ datadir);
+ pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
+ rc = system(pg_ctl_cmd);
+ pg_ctl_status(pg_ctl_cmd, rc);
+ standby_running = false;
+ pg_log_info("server was stopped");
+}
+
+/*
+ * Returns after the server finishes the recovery process.
+ *
+ * If recovery_timeout option is set, terminate abnormally without finishing
+ * the recovery process. By default, it waits forever.
+ */
+static void
+wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
+{
+ PGconn *conn;
+ int status = POSTMASTER_STILL_STARTING;
+ int timer = 0;
+ int count = 0; /* number of consecutive connection attempts */
+
+#define NUM_CONN_ATTEMPTS 10
+
+ pg_log_info("waiting for the target server to reach the consistent state");
+
+ conn = connect_database(conninfo, true);
+
+ for (;;)
+ {
+ PGresult *res;
+ bool in_recovery = server_is_in_recovery(conn);
+
+ /*
+ * Does the recovery process finish? In dry run mode, there is no
+ * recovery mode. Bail out as the recovery process has ended.
+ */
+ if (!in_recovery || dry_run)
+ {
+ status = POSTMASTER_READY;
+ recovery_ended = true;
+ break;
+ }
+
+ /*
+ * If it is still in recovery, make sure the target server is
+ * connected to the primary so it can receive the required WAL to
+ * finish the recovery process. If it is disconnected try
+ * NUM_CONN_ATTEMPTS in a row and bail out if not succeed.
+ */
+ res = PQexec(conn,
+ "SELECT 1 FROM pg_catalog.pg_stat_wal_receiver");
+ if (PQntuples(res) == 0)
+ {
+ if (++count > NUM_CONN_ATTEMPTS)
+ {
+ stop_standby_server(subscriber_dir);
+ pg_log_error("standby server disconnected from the primary");
+ break;
+ }
+ }
+ else
+ count = 0; /* reset counter if it connects again */
+
+ PQclear(res);
+
+ /* Bail out after recovery_timeout seconds if this option is set */
+ if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
+ {
+ stop_standby_server(subscriber_dir);
+ pg_log_error("recovery timed out");
+ disconnect_database(conn, true);
+ }
+
+ /* Keep waiting */
+ pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
+
+ timer += WAIT_INTERVAL;
+ }
+
+ disconnect_database(conn, false);
+
+ if (status == POSTMASTER_STILL_STARTING)
+ pg_fatal("server did not end recovery");
+
+ pg_log_info("target server reached the consistent state");
+ pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
+}
+
+/*
+ * Create a publication that includes all tables in the database.
+ */
+static void
+create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer str = createPQExpBuffer();
+ PGresult *res;
+ char *ipubname_esc;
+ char *spubname_esc;
+
+ Assert(conn != NULL);
+
+ ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
+ spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
+
+ /* Check if the publication already exists */
+ appendPQExpBuffer(str,
+ "SELECT 1 FROM pg_catalog.pg_publication "
+ "WHERE pubname = %s",
+ spubname_esc);
+ res = PQexec(conn, str->data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not obtain publication information: %s",
+ PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+ if (PQntuples(res) == 1)
+ {
+ /*
+ * Unfortunately, if it reaches this code path, it will always fail
+ * (unless you decide to change the existing publication name). That's
+ * bad but it is very unlikely that the user will choose a name with
+ * pg_createsubscriber_ prefix followed by the exact database oid and
+ * a random number.
+ */
+ pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
+ pg_log_error_hint("Consider renaming this publication before continuing.");
+ disconnect_database(conn, true);
+ }
+
+ PQclear(res);
+ resetPQExpBuffer(str);
+
+ pg_log_info("creating publication \"%s\" on database \"%s\"",
+ dbinfo->pubname, dbinfo->dbname);
+
+ appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
+ ipubname_esc);
+
+ pg_log_debug("command is: %s", str->data);
+
+ if (!dry_run)
+ {
+ res = PQexec(conn, str->data);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ pg_log_error("could not create publication \"%s\" on database \"%s\": %s",
+ dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+ PQclear(res);
+ }
+
+ /* For cleanup purposes */
+ dbinfo->made_publication = true;
+
+ pg_free(ipubname_esc);
+ pg_free(spubname_esc);
+ destroyPQExpBuffer(str);
+}
+
+/*
+ * Remove publication if it couldn't finish all steps.
+ */
+static void
+drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer str = createPQExpBuffer();
+ PGresult *res;
+ char *pubname_esc;
+
+ Assert(conn != NULL);
+
+ pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
+
+ pg_log_info("dropping publication \"%s\" on database \"%s\"",
+ dbinfo->pubname, dbinfo->dbname);
+
+ appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
+
+ pg_free(pubname_esc);
+
+ pg_log_debug("command is: %s", str->data);
+
+ if (!dry_run)
+ {
+ res = PQexec(conn, str->data);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ pg_log_error("could not drop publication \"%s\" on database \"%s\": %s",
+ dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
+ dbinfo->made_publication = false; /* don't try again. */
+
+ /*
+ * Don't disconnect and exit here. This routine is used by primary
+ * (cleanup publication / replication slot due to an error) and
+ * subscriber (remove the replicated publications). In both cases,
+ * it can continue and provide instructions for the user to remove
+ * it later if cleanup fails.
+ */
+ }
+ PQclear(res);
+ }
+
+ destroyPQExpBuffer(str);
+}
+
+/*
+ * Create a subscription with some predefined options.
+ *
+ * A replication slot was already created in a previous step. Let's use it. It
+ * is not required to copy data. The subscription will be created but it will
+ * not be enabled now. That's because the replication progress must be set and
+ * the replication origin name (one of the function arguments) contains the
+ * subscription OID in its name. Once the subscription is created,
+ * set_replication_progress() can obtain the chosen origin name and set up its
+ * initial location.
+ */
+static void
+create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer str = createPQExpBuffer();
+ PGresult *res;
+ char *pubname_esc;
+ char *subname_esc;
+ char *pubconninfo_esc;
+ char *replslotname_esc;
+
+ Assert(conn != NULL);
+
+ pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
+ subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
+ pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
+ replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
+
+ pg_log_info("creating subscription \"%s\" on database \"%s\"",
+ dbinfo->subname, dbinfo->dbname);
+
+ appendPQExpBuffer(str,
+ "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
+ "WITH (create_slot = false, enabled = false, "
+ "slot_name = %s, copy_data = false)",
+ subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
+
+ pg_free(pubname_esc);
+ pg_free(subname_esc);
+ pg_free(pubconninfo_esc);
+ pg_free(replslotname_esc);
+
+ pg_log_debug("command is: %s", str->data);
+
+ if (!dry_run)
+ {
+ res = PQexec(conn, str->data);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ pg_log_error("could not create subscription \"%s\" on database \"%s\": %s",
+ dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+ PQclear(res);
+ }
+
+ destroyPQExpBuffer(str);
+}
+
+/*
+ * Sets the replication progress to the consistent LSN.
+ *
+ * The subscriber caught up to the consistent LSN provided by the last
+ * replication slot that was created. The goal is to set up the initial
+ * location for the logical replication that is the exact LSN that the
+ * subscriber was promoted. Once the subscription is enabled it will start
+ * streaming from that location onwards. In dry run mode, the subscription OID
+ * and LSN are set to invalid values for printing purposes.
+ */
+static void
+set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
+{
+ PQExpBuffer str = createPQExpBuffer();
+ PGresult *res;
+ Oid suboid;
+ char *subname;
+ char *dbname;
+ char *originname;
+ char *lsnstr;
+
+ Assert(conn != NULL);
+
+ subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
+ dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
+
+ appendPQExpBuffer(str,
+ "SELECT s.oid FROM pg_catalog.pg_subscription s "
+ "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
+ "WHERE s.subname = %s AND d.datname = %s",
+ subname, dbname);
+
+ res = PQexec(conn, str->data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not obtain subscription OID: %s",
+ PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+ if (PQntuples(res) != 1 && !dry_run)
+ {
+ pg_log_error("could not obtain subscription OID: got %d rows, expected %d rows",
+ PQntuples(res), 1);
+ disconnect_database(conn, true);
+ }
+
+ if (dry_run)
+ {
+ suboid = InvalidOid;
+ lsnstr = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
+ }
+ else
+ {
+ suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+ lsnstr = psprintf("%s", lsn);
+ }
+
+ PQclear(res);
+
+ /*
+ * The origin name is defined as pg_%u. %u is the subscription OID. See
+ * ApplyWorkerMain().
+ */
+ originname = psprintf("pg_%u", suboid);
+
+ pg_log_info("setting the replication progress (node name \"%s\" ; LSN %s) on database \"%s\"",
+ originname, lsnstr, dbinfo->dbname);
+
+ resetPQExpBuffer(str);
+ appendPQExpBuffer(str,
+ "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
+ originname, lsnstr);
+
+ pg_log_debug("command is: %s", str->data);
+
+ if (!dry_run)
+ {
+ res = PQexec(conn, str->data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not set replication progress for the subscription \"%s\": %s",
+ dbinfo->subname, PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+ PQclear(res);
+ }
+
+ pg_free(subname);
+ pg_free(dbname);
+ pg_free(originname);
+ pg_free(lsnstr);
+ destroyPQExpBuffer(str);
+}
+
+/*
+ * Enables the subscription.
+ *
+ * The subscription was created in a previous step but it was disabled. After
+ * adjusting the initial logical replication location, enable the subscription.
+ */
+static void
+enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer str = createPQExpBuffer();
+ PGresult *res;
+ char *subname;
+
+ Assert(conn != NULL);
+
+ subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
+
+ pg_log_info("enabling subscription \"%s\" on database \"%s\"",
+ dbinfo->subname, dbinfo->dbname);
+
+ appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
+
+ pg_log_debug("command is: %s", str->data);
+
+ if (!dry_run)
+ {
+ res = PQexec(conn, str->data);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ pg_log_error("could not enable subscription \"%s\": %s",
+ dbinfo->subname, PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+ PQclear(res);
+ }
+
+ pg_free(subname);
+ destroyPQExpBuffer(str);
+}
+
+int
+main(int argc, char **argv)
+{
+ static struct option long_options[] =
+ {
+ {"database", required_argument, NULL, 'd'},
+ {"pgdata", required_argument, NULL, 'D'},
+ {"dry-run", no_argument, NULL, 'n'},
+ {"subscriber-port", required_argument, NULL, 'p'},
+ {"publisher-server", required_argument, NULL, 'P'},
+ {"socket-directory", required_argument, NULL, 's'},
+ {"recovery-timeout", required_argument, NULL, 't'},
+ {"subscriber-username", required_argument, NULL, 'U'},
+ {"verbose", no_argument, NULL, 'v'},
+ {"version", no_argument, NULL, 'V'},
+ {"help", no_argument, NULL, '?'},
+ {"config-file", required_argument, NULL, 1},
+ {"publication", required_argument, NULL, 2},
+ {"replication-slot", required_argument, NULL, 3},
+ {"subscription", required_argument, NULL, 4},
+ {NULL, 0, NULL, 0}
+ };
+
+ struct CreateSubscriberOptions opt = {0};
+
+ int c;
+ int option_index;
+
+ char *pub_base_conninfo;
+ char *sub_base_conninfo;
+ char *dbname_conninfo = NULL;
+
+ uint64 pub_sysid;
+ uint64 sub_sysid;
+ struct stat statbuf;
+
+ char *consistent_lsn;
+
+ char pidfile[MAXPGPATH];
+
+ pg_logging_init(argv[0]);
+ pg_logging_set_level(PG_LOG_WARNING);
+ progname = get_progname(argv[0]);
+ set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_createsubscriber"));
+
+ if (argc > 1)
+ {
+ if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+ {
+ usage();
+ exit(0);
+ }
+ else if (strcmp(argv[1], "-V") == 0
+ || strcmp(argv[1], "--version") == 0)
+ {
+ puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
+ exit(0);
+ }
+ }
+
+ /* Default settings */
+ subscriber_dir = NULL;
+ opt.config_file = NULL;
+ opt.pub_conninfo_str = NULL;
+ opt.socket_dir = NULL;
+ opt.sub_port = DEFAULT_SUB_PORT;
+ opt.sub_username = NULL;
+ opt.database_names = (SimpleStringList)
+ {
+ 0
+ };
+ opt.recovery_timeout = 0;
+
+ /*
+ * Don't allow it to be run as root. It uses pg_ctl which does not allow
+ * it either.
+ */
+#ifndef WIN32
+ if (geteuid() == 0)
+ {
+ pg_log_error("cannot be executed by \"root\"");
+ pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
+ progname);
+ exit(1);
+ }
+#endif
+
+ get_restricted_token();
+
+ while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+ long_options, &option_index)) != -1)
+ {
+ switch (c)
+ {
+ case 'd':
+ if (!simple_string_list_member(&opt.database_names, optarg))
+ {
+ simple_string_list_append(&opt.database_names, optarg);
+ num_dbs++;
+ }
+ else
+ {
+ pg_log_error("duplicate database \"%s\"", optarg);
+ exit(1);
+ }
+ break;
+ case 'D':
+ subscriber_dir = pg_strdup(optarg);
+ canonicalize_path(subscriber_dir);
+ break;
+ case 'n':
+ dry_run = true;
+ break;
+ case 'p':
+ opt.sub_port = pg_strdup(optarg);
+ break;
+ case 'P':
+ opt.pub_conninfo_str = pg_strdup(optarg);
+ break;
+ case 's':
+ opt.socket_dir = pg_strdup(optarg);
+ canonicalize_path(opt.socket_dir);
+ break;
+ case 't':
+ opt.recovery_timeout = atoi(optarg);
+ break;
+ case 'U':
+ opt.sub_username = pg_strdup(optarg);
+ break;
+ case 'v':
+ pg_logging_increase_verbosity();
+ break;
+ case 1:
+ opt.config_file = pg_strdup(optarg);
+ break;
+ case 2:
+ if (!simple_string_list_member(&opt.pub_names, optarg))
+ {
+ simple_string_list_append(&opt.pub_names, optarg);
+ num_pubs++;
+ }
+ else
+ {
+ pg_log_error("duplicate publication \"%s\"", optarg);
+ exit(1);
+ }
+ break;
+ case 3:
+ if (!simple_string_list_member(&opt.replslot_names, optarg))
+ {
+ simple_string_list_append(&opt.replslot_names, optarg);
+ num_replslots++;
+ }
+ else
+ {
+ pg_log_error("duplicate replication slot \"%s\"", optarg);
+ exit(1);
+ }
+ break;
+ case 4:
+ if (!simple_string_list_member(&opt.sub_names, optarg))
+ {
+ simple_string_list_append(&opt.sub_names, optarg);
+ num_subs++;
+ }
+ else
+ {
+ pg_log_error("duplicate subscription \"%s\"", optarg);
+ exit(1);
+ }
+ break;
+ default:
+ /* getopt_long already emitted a complaint */
+ pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+ exit(1);
+ }
+ }
+
+ /* Any non-option arguments? */
+ if (optind < argc)
+ {
+ pg_log_error("too many command-line arguments (first is \"%s\")",
+ argv[optind]);
+ pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+ exit(1);
+ }
+
+ /* Required arguments */
+ if (subscriber_dir == NULL)
+ {
+ pg_log_error("no subscriber data directory specified");
+ pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+ exit(1);
+ }
+
+ /* If socket directory is not provided, use the current directory */
+ if (opt.socket_dir == NULL)
+ {
+ char cwd[MAXPGPATH];
+
+ if (!getcwd(cwd, MAXPGPATH))
+ pg_fatal("could not determine current directory");
+ opt.socket_dir = pg_strdup(cwd);
+ canonicalize_path(opt.socket_dir);
+ }
+
+ /*
+ * Parse connection string. Build a base connection string that might be
+ * reused by multiple databases.
+ */
+ if (opt.pub_conninfo_str == NULL)
+ {
+ /*
+ * TODO use primary_conninfo (if available) from subscriber and
+ * extract publisher connection string. Assume that there are
+ * identical entries for physical and logical replication. If there is
+ * not, we would fail anyway.
+ */
+ pg_log_error("no publisher connection string specified");
+ pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+ exit(1);
+ }
+ pg_log_info("validating connection string on publisher");
+ pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
+ &dbname_conninfo);
+ if (pub_base_conninfo == NULL)
+ exit(1);
+
+ pg_log_info("validating connection string on subscriber");
+ sub_base_conninfo = get_sub_conninfo(&opt);
+
+ if (opt.database_names.head == NULL)
+ {
+ pg_log_info("no database was specified");
+
+ /*
+ * If --database option is not provided, try to obtain the dbname from
+ * the publisher conninfo. If dbname parameter is not available, error
+ * out.
+ */
+ if (dbname_conninfo)
+ {
+ simple_string_list_append(&opt.database_names, dbname_conninfo);
+ num_dbs++;
+
+ pg_log_info("database \"%s\" was extracted from the publisher connection string",
+ dbname_conninfo);
+ }
+ else
+ {
+ pg_log_error("no database name specified");
+ pg_log_error_hint("Try \"%s --help\" for more information.",
+ progname);
+ exit(1);
+ }
+ }
+
+ /* Number of object names must match number of databases */
+ if (num_pubs > 0 && num_pubs != num_dbs)
+ {
+ pg_log_error("wrong number of publication names");
+ pg_log_error_hint("Number of publication names (%d) must match number of database names (%d).",
+ num_pubs, num_dbs);
+ exit(1);
+ }
+ if (num_subs > 0 && num_subs != num_dbs)
+ {
+ pg_log_error("wrong number of subscription names");
+ pg_log_error_hint("Number of subscription names (%d) must match number of database names (%d).",
+ num_subs, num_dbs);
+ exit(1);
+ }
+ if (num_replslots > 0 && num_replslots != num_dbs)
+ {
+ pg_log_error("wrong number of replication slot names");
+ pg_log_error_hint("Number of replication slot names (%d) must match number of database names (%d).",
+ num_replslots, num_dbs);
+ exit(1);
+ }
+
+ /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
+ pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
+ pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
+
+ /* Rudimentary check for a data directory */
+ check_data_directory(subscriber_dir);
+
+ /*
+ * Store database information for publisher and subscriber. It should be
+ * called before atexit() because its return is used in the
+ * cleanup_objects_atexit().
+ */
+ dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
+
+ /* Register a function to clean up objects in case of failure */
+ atexit(cleanup_objects_atexit);
+
+ /*
+ * Check if the subscriber data directory has the same system identifier
+ * than the publisher data directory.
+ */
+ pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
+ sub_sysid = get_standby_sysid(subscriber_dir);
+ if (pub_sysid != sub_sysid)
+ pg_fatal("subscriber data directory is not a copy of the source database cluster");
+
+ /* Subscriber PID file */
+ snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
+
+ /*
+ * The standby server must not be running. If the server is started under
+ * service manager and pg_createsubscriber stops it, the service manager
+ * might react to this action and start the server again. Therefore,
+ * refuse to proceed if the server is running to avoid possible failures.
+ */
+ if (stat(pidfile, &statbuf) == 0)
+ {
+ pg_log_error("standby is up and running");
+ pg_log_error_hint("Stop the standby and try again.");
+ exit(1);
+ }
+
+ /*
+ * Start a short-lived standby server with temporary parameters (provided
+ * by command-line options). The goal is to avoid connections during the
+ * transformation steps.
+ */
+ pg_log_info("starting the standby with command-line options");
+ start_standby_server(&opt, true);
+
+ /* Check if the standby server is ready for logical replication */
+ check_subscriber(dbinfo);
+
+ /*
+ * Check if the primary server is ready for logical replication. This
+ * routine checks if a replication slot is in use on primary so it relies
+ * on check_subscriber() to obtain the primary_slot_name. That's why it is
+ * called after it.
+ */
+ check_publisher(dbinfo);
+
+ /*
+ * Stop the target server. The recovery process requires that the server
+ * reaches a consistent state before targeting the recovery stop point.
+ * Make sure a consistent state is reached (stop the target server
+ * guarantees it) *before* creating the replication slots in
+ * setup_publisher().
+ */
+ pg_log_info("stopping the subscriber");
+ stop_standby_server(subscriber_dir);
+
+ /*
+ * Create the required objects for each database on publisher. This step
+ * is here mainly because if we stop the standby we cannot verify if the
+ * primary slot is in use. We could use an extra connection for it but it
+ * doesn't seem worth.
+ */
+ consistent_lsn = setup_publisher(dbinfo);
+
+ /* Write the required recovery parameters */
+ setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
+
+ /*
+ * Start subscriber so the recovery parameters will take effect. Wait
+ * until accepting connections.
+ */
+ pg_log_info("starting the subscriber");
+ start_standby_server(&opt, true);
+
+ /* Waiting the subscriber to be promoted */
+ wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
+
+ /*
+ * Create the subscription for each database on subscriber. It does not
+ * enable it immediately because it needs to adjust the replication start
+ * point to the LSN reported by setup_publisher(). It also cleans up
+ * publications created by this tool and replication to the standby.
+ */
+ setup_subscriber(dbinfo, consistent_lsn);
+
+ /* Remove primary_slot_name if it exists on primary */
+ drop_primary_replication_slot(dbinfo, primary_slot_name);
+
+ /* Stop the subscriber */
+ pg_log_info("stopping the subscriber");
+ stop_standby_server(subscriber_dir);
+
+ /* Change system identifier from subscriber */
+ modify_subscriber_sysid(&opt);
+
+ success = true;
+
+ pg_log_info("Done!");
+
+ return 0;
+}
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
new file mode 100644
index 00000000000..63ae6fdfc67
--- /dev/null
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -0,0 +1,364 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+#
+# Test using a standby server as the subscriber.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+program_help_ok('pg_createsubscriber');
+program_version_ok('pg_createsubscriber');
+program_options_handling_ok('pg_createsubscriber');
+
+my $datadir = PostgreSQL::Test::Utils::tempdir;
+
+#
+# Test mandatory options
+command_fails(['pg_createsubscriber'],
+ 'no subscriber data directory specified');
+command_fails(
+ [ 'pg_createsubscriber', '--pgdata', $datadir ],
+ 'no publisher connection string specified');
+command_fails(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--pgdata', $datadir,
+ '--publisher-server', 'port=5432'
+ ],
+ 'no database name specified');
+command_fails(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--pgdata', $datadir,
+ '--publisher-server', 'port=5432',
+ '--database', 'pg1',
+ '--database', 'pg1'
+ ],
+ 'duplicate database name');
+command_fails(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--pgdata', $datadir,
+ '--publisher-server', 'port=5432',
+ '--publication', 'foo1',
+ '--publication', 'foo1',
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'duplicate publication name');
+command_fails(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--pgdata', $datadir,
+ '--publisher-server', 'port=5432',
+ '--publication', 'foo1',
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'wrong number of publication names');
+command_fails(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--pgdata', $datadir,
+ '--publisher-server', 'port=5432',
+ '--publication', 'foo1',
+ '--publication', 'foo2',
+ '--subscription', 'bar1',
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'wrong number of subscription names');
+command_fails(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--pgdata', $datadir,
+ '--publisher-server', 'port=5432',
+ '--publication', 'foo1',
+ '--publication', 'foo2',
+ '--subscription', 'bar1',
+ '--subscription', 'bar2',
+ '--replication-slot', 'baz1',
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'wrong number of replication slot names');
+
+# Set up node P as primary
+my $node_p = PostgreSQL::Test::Cluster->new('node_p');
+$node_p->init(allows_streaming => 'logical');
+$node_p->start;
+
+# Set up node F as about-to-fail node
+# Force it to initialize a new cluster instead of copying a
+# previously initdb'd cluster. New cluster has a different system identifier so
+# we can test if the target cluster is a copy of the source cluster.
+my $node_f = PostgreSQL::Test::Cluster->new('node_f');
+$node_f->init(force_initdb => 1, allows_streaming => 'logical');
+
+# On node P
+# - create databases
+# - create test tables
+# - insert a row
+# - create a physical replication slot
+$node_p->safe_psql(
+ 'postgres', q(
+ CREATE DATABASE pg1;
+ CREATE DATABASE pg2;
+));
+$node_p->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('first row')");
+$node_p->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+my $slotname = 'physical_slot';
+$node_p->safe_psql('pg2',
+ "SELECT pg_create_physical_replication_slot('$slotname')");
+
+# Set up node S as standby linking to node P
+$node_p->backup('backup_1');
+my $node_s = PostgreSQL::Test::Cluster->new('node_s');
+$node_s->init_from_backup($node_p, 'backup_1', has_streaming => 1);
+$node_s->append_conf(
+ 'postgresql.conf', qq[
+primary_slot_name = '$slotname'
+]);
+$node_s->set_standby_mode();
+$node_s->start;
+
+# Set up node T as standby linking to node P then promote it
+my $node_t = PostgreSQL::Test::Cluster->new('node_t');
+$node_t->init_from_backup($node_p, 'backup_1', has_streaming => 1);
+$node_t->set_standby_mode();
+$node_t->start;
+$node_t->promote;
+$node_t->stop;
+
+# Run pg_createsubscriber on a promoted server
+command_fails(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--dry-run', '--pgdata',
+ $node_t->data_dir, '--publisher-server',
+ $node_p->connstr('pg1'),
+ '--socket-directory', $node_t->host,
+ '--subscriber-port', $node_t->port,
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'target server is not in recovery');
+
+# Run pg_createsubscriber when standby is running
+command_fails(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--dry-run', '--pgdata',
+ $node_s->data_dir, '--publisher-server',
+ $node_p->connstr('pg1'),
+ '--socket-directory', $node_s->host,
+ '--subscriber-port', $node_s->port,
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'standby is up and running');
+
+# Run pg_createsubscriber on about-to-fail node F
+command_fails(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--pgdata', $node_f->data_dir,
+ '--publisher-server', $node_p->connstr('pg1'),
+ '--socket-directory', $node_f->host,
+ '--subscriber-port', $node_f->port,
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'subscriber data directory is not a copy of the source database cluster');
+
+# Set up node C as standby linking to node S
+$node_s->backup('backup_2');
+my $node_c = PostgreSQL::Test::Cluster->new('node_c');
+$node_c->init_from_backup($node_s, 'backup_2', has_streaming => 1);
+$node_c->adjust_conf('postgresql.conf', 'primary_slot_name', undef);
+$node_c->set_standby_mode();
+
+# Run pg_createsubscriber on node C (P -> S -> C)
+command_fails(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--dry-run', '--pgdata',
+ $node_c->data_dir, '--publisher-server',
+ $node_s->connstr('pg1'),
+ '--socket-directory', $node_c->host,
+ '--subscriber-port', $node_c->port,
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'primary server is in recovery');
+
+# Insert another row on node P and wait node S to catch up
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('second row')");
+$node_p->wait_for_replay_catchup($node_s);
+
+# Check some unmet conditions on node P
+$node_p->append_conf('postgresql.conf', q{
+wal_level = replica
+max_replication_slots = 1
+max_wal_senders = 1
+max_worker_processes = 2
+});
+$node_p->restart;
+$node_s->stop;
+command_fails(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--dry-run', '--pgdata',
+ $node_s->data_dir, '--publisher-server',
+ $node_p->connstr('pg1'),
+ '--socket-directory', $node_s->host,
+ '--subscriber-port', $node_s->port,
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'primary contains unmet conditions on node P');
+# Restore default settings here but only apply it after testing standby. Some
+# standby settings should not be a lower setting than on the primary.
+$node_p->append_conf('postgresql.conf', q{
+wal_level = logical
+max_replication_slots = 10
+max_wal_senders = 10
+max_worker_processes = 8
+});
+
+# Check some unmet conditions on node S
+$node_s->append_conf('postgresql.conf', q{
+max_replication_slots = 1
+max_logical_replication_workers = 1
+max_worker_processes = 2
+});
+command_fails(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--dry-run', '--pgdata',
+ $node_s->data_dir, '--publisher-server',
+ $node_p->connstr('pg1'),
+ '--socket-directory', $node_s->host,
+ '--subscriber-port', $node_s->port,
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'standby contains unmet conditions on node S');
+$node_s->append_conf('postgresql.conf', q{
+max_replication_slots = 10
+max_logical_replication_workers = 4
+max_worker_processes = 8
+});
+# Restore default settings on both servers
+$node_p->restart;
+
+# dry run mode on node S
+command_ok(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--dry-run', '--pgdata',
+ $node_s->data_dir, '--publisher-server',
+ $node_p->connstr('pg1'),
+ '--socket-directory', $node_s->host,
+ '--subscriber-port', $node_s->port,
+ '--publication', 'pub1',
+ '--publication', 'pub2',
+ '--subscription', 'sub1',
+ '--subscription', 'sub2',
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'run pg_createsubscriber --dry-run on node S');
+
+# Check if node S is still a standby
+$node_s->start;
+is($node_s->safe_psql('postgres', 'SELECT pg_catalog.pg_is_in_recovery()'),
+ 't', 'standby is in recovery');
+$node_s->stop;
+
+# pg_createsubscriber can run without --databases option
+command_ok(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--dry-run', '--pgdata',
+ $node_s->data_dir, '--publisher-server',
+ $node_p->connstr('pg1'),
+ '--socket-directory', $node_s->host,
+ '--subscriber-port', $node_s->port,
+ '--replication-slot', 'replslot1'
+ ],
+ 'run pg_createsubscriber without --databases');
+
+# Run pg_createsubscriber on node S
+command_ok(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--verbose', '--pgdata',
+ $node_s->data_dir, '--publisher-server',
+ $node_p->connstr('pg1'),
+ '--socket-directory', $node_s->host,
+ '--subscriber-port', $node_s->port,
+ '--publication', 'pub1',
+ '--publication', 'Pub2',
+ '--replication-slot', 'replslot1',
+ '--replication-slot', 'replslot2',
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'run pg_createsubscriber on node S');
+
+# Confirm the physical replication slot has been removed
+my $result = $node_p->safe_psql('pg1',
+ "SELECT count(*) FROM pg_replication_slots WHERE slot_name = '$slotname'"
+);
+is($result, qq(0),
+ 'the physical replication slot used as primary_slot_name has been removed'
+);
+
+# Insert rows on P
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('third row')");
+$node_p->safe_psql('pg2', "INSERT INTO tbl2 VALUES('row 1')");
+
+# Start subscriber
+$node_s->start;
+
+# Get subscription names
+$result = $node_s->safe_psql(
+ 'postgres', qq(
+ SELECT subname FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
+));
+my @subnames = split("\n", $result);
+
+# Wait subscriber to catch up
+$node_s->wait_for_subscription_sync($node_p, $subnames[0]);
+$node_s->wait_for_subscription_sync($node_p, $subnames[1]);
+
+# Check result on database pg1
+$result = $node_s->safe_psql('pg1', 'SELECT * FROM tbl1');
+is( $result, qq(first row
+second row
+third row),
+ 'logical replication works on database pg1');
+
+# Check result on database pg2
+$result = $node_s->safe_psql('pg2', 'SELECT * FROM tbl2');
+is($result, qq(row 1), 'logical replication works on database pg2');
+
+# Different system identifier?
+my $sysid_p = $node_p->safe_psql('postgres',
+ 'SELECT system_identifier FROM pg_control_system()');
+my $sysid_s = $node_s->safe_psql('postgres',
+ 'SELECT system_identifier FROM pg_control_system()');
+ok($sysid_p != $sysid_s, 'system identifier was changed');
+
+# clean up
+$node_p->teardown_node;
+$node_s->teardown_node;
+$node_t->teardown_node;
+$node_f->teardown_node;
+
+done_testing();