aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_dump/pg_dump.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_dump/pg_dump.c')
-rw-r--r--src/bin/pg_dump/pg_dump.c464
1 files changed, 464 insertions, 0 deletions
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 883fde1e5aa..0bb363957a4 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -246,6 +246,9 @@ static void getBlobs(Archive *fout);
static void dumpBlob(Archive *fout, BlobInfo *binfo);
static int dumpBlobs(Archive *fout, void *arg);
static void dumpPolicy(Archive *fout, PolicyInfo *polinfo);
+static void dumpPublication(Archive *fout, PublicationInfo *pubinfo);
+static void dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo);
+static void dumpSubscription(Archive *fout, SubscriptionInfo *subinfo);
static void dumpDatabase(Archive *AH);
static void dumpEncoding(Archive *AH);
static void dumpStdStrings(Archive *AH);
@@ -338,6 +341,7 @@ main(int argc, char **argv)
{"enable-row-security", no_argument, &dopt.enable_row_security, 1},
{"exclude-table-data", required_argument, NULL, 4},
{"if-exists", no_argument, &dopt.if_exists, 1},
+ {"include-subscriptions", no_argument, &dopt.include_subscriptions, 1},
{"inserts", no_argument, &dopt.dump_inserts, 1},
{"lock-wait-timeout", required_argument, NULL, 2},
{"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1},
@@ -348,6 +352,7 @@ main(int argc, char **argv)
{"snapshot", required_argument, NULL, 6},
{"strict-names", no_argument, &strict_names, 1},
{"use-set-session-authorization", no_argument, &dopt.use_setsessauth, 1},
+ {"no-create-subscription-slots", no_argument, &dopt.no_create_subscription_slots, 1},
{"no-security-labels", no_argument, &dopt.no_security_labels, 1},
{"no-synchronized-snapshots", no_argument, &dopt.no_synchronized_snapshots, 1},
{"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1},
@@ -849,6 +854,7 @@ main(int argc, char **argv)
ropt->include_everything = dopt.include_everything;
ropt->enable_row_security = dopt.enable_row_security;
ropt->sequence_data = dopt.sequence_data;
+ ropt->include_subscriptions = dopt.include_subscriptions;
if (compressLevel == -1)
ropt->compression = 0;
@@ -929,7 +935,10 @@ help(const char *progname)
" access to)\n"));
printf(_(" --exclude-table-data=TABLE do NOT dump data for the named table(s)\n"));
printf(_(" --if-exists use IF EXISTS when dropping objects\n"));
+ printf(_(" --include-subscriptions dump logical replication subscriptions\n"));
printf(_(" --inserts dump data as INSERT commands, rather than COPY\n"));
+ printf(_(" --no-create-subscription-slots\n"
+ " do not create replication slots for subscriptions\n"));
printf(_(" --no-security-labels do not dump security label assignments\n"));
printf(_(" --no-synchronized-snapshots do not use synchronized snapshots in parallel jobs\n"));
printf(_(" --no-tablespaces do not dump tablespace assignments\n"));
@@ -3311,6 +3320,449 @@ dumpPolicy(Archive *fout, PolicyInfo *polinfo)
destroyPQExpBuffer(delqry);
}
+/*
+ * getPublications
+ * get information about publications
+ */
+void
+getPublications(Archive *fout)
+{
+ PQExpBuffer query;
+ PGresult *res;
+ PublicationInfo *pubinfo;
+ int i_tableoid;
+ int i_oid;
+ int i_pubname;
+ int i_rolname;
+ int i_puballtables;
+ int i_pubinsert;
+ int i_pubupdate;
+ int i_pubdelete;
+ int i,
+ ntups;
+
+ if (fout->remoteVersion < 100000)
+ return;
+
+ query = createPQExpBuffer();
+
+ resetPQExpBuffer(query);
+
+ /* Get the publications. */
+ appendPQExpBuffer(query,
+ "SELECT p.tableoid, p.oid, p.pubname, "
+ "(%s p.pubowner) AS rolname, "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete "
+ "FROM pg_catalog.pg_publication p",
+ username_subquery);
+
+ res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+ ntups = PQntuples(res);
+
+ if (ntups == 0)
+ {
+ /*
+ * There are no publications defined. Clean up and return.
+ */
+ PQclear(res);
+ return;
+ }
+
+ i_tableoid = PQfnumber(res, "tableoid");
+ i_oid = PQfnumber(res, "oid");
+ i_pubname = PQfnumber(res, "pubname");
+ i_rolname = PQfnumber(res, "rolname");
+ i_puballtables = PQfnumber(res, "puballtables");
+ i_pubinsert = PQfnumber(res, "pubinsert");
+ i_pubupdate = PQfnumber(res, "pubupdate");
+ i_pubdelete = PQfnumber(res, "pubdelete");
+
+ pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
+
+ for (i = 0; i < ntups; i++)
+ {
+ pubinfo[i].dobj.objType = DO_PUBLICATION;
+ pubinfo[i].dobj.catId.tableoid =
+ atooid(PQgetvalue(res, i, i_tableoid));
+ pubinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
+ AssignDumpId(&pubinfo[i].dobj);
+ pubinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_pubname));
+ pubinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
+ pubinfo[i].puballtables =
+ (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0);
+ pubinfo[i].pubinsert =
+ (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0);
+ pubinfo[i].pubupdate =
+ (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
+ pubinfo[i].pubdelete =
+ (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
+
+ if (strlen(pubinfo[i].rolname) == 0)
+ write_msg(NULL, "WARNING: owner of publication \"%s\" appears to be invalid\n",
+ pubinfo[i].dobj.name);
+ }
+ PQclear(res);
+
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * dumpPublication
+ * dump the definition of the given publication
+ */
+static void
+dumpPublication(Archive *fout, PublicationInfo *pubinfo)
+{
+ DumpOptions *dopt = fout->dopt;
+ PQExpBuffer delq;
+ PQExpBuffer query;
+
+ if (dopt->dataOnly)
+ return;
+
+ delq = createPQExpBuffer();
+ query = createPQExpBuffer();
+
+ appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n",
+ fmtId(pubinfo->dobj.name));
+
+ appendPQExpBuffer(query, "CREATE PUBLICATION %s",
+ fmtId(pubinfo->dobj.name));
+
+ if (pubinfo->puballtables)
+ appendPQExpBufferStr(query, " FOR ALL TABLES");
+
+ appendPQExpBufferStr(query, " WITH (");
+ if (pubinfo->pubinsert)
+ appendPQExpBufferStr(query, "PUBLISH INSERT");
+ else
+ appendPQExpBufferStr(query, "NOPUBLISH INSERT");
+
+ if (pubinfo->pubupdate)
+ appendPQExpBufferStr(query, ", PUBLISH UPDATE");
+ else
+ appendPQExpBufferStr(query, ", NOPUBLISH UPDATE");
+
+ if (pubinfo->pubdelete)
+ appendPQExpBufferStr(query, ", PUBLISH DELETE");
+ else
+ appendPQExpBufferStr(query, ", NOPUBLISH DELETE");
+
+ appendPQExpBufferStr(query, ");\n");
+
+ ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
+ pubinfo->dobj.name,
+ NULL,
+ NULL,
+ pubinfo->rolname, false,
+ "PUBLICATION", SECTION_POST_DATA,
+ query->data, delq->data, NULL,
+ NULL, 0,
+ NULL, NULL);
+
+ destroyPQExpBuffer(delq);
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * getPublicationTables
+ * get information about publication membership for dumpable tables.
+ */
+void
+getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
+{
+ PQExpBuffer query;
+ PGresult *res;
+ PublicationRelInfo *pubrinfo;
+ int i_tableoid;
+ int i_oid;
+ int i_pubname;
+ int i,
+ j,
+ ntups;
+
+ if (fout->remoteVersion < 100000)
+ return;
+
+ query = createPQExpBuffer();
+
+ for (i = 0; i < numTables; i++)
+ {
+ TableInfo *tbinfo = &tblinfo[i];
+
+ /* Only plain tables can be aded to publications. */
+ if (tbinfo->relkind != RELKIND_RELATION)
+ continue;
+
+ /*
+ * Ignore publication membership of tables whose definitions are
+ * not to be dumped.
+ */
+ if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
+ continue;
+
+ if (g_verbose)
+ write_msg(NULL, "reading publication membership for table \"%s.%s\"\n",
+ tbinfo->dobj.namespace->dobj.name,
+ tbinfo->dobj.name);
+
+ resetPQExpBuffer(query);
+
+ /* Get the publication memebership for the table. */
+ appendPQExpBuffer(query,
+ "SELECT pr.tableoid, pr.oid, p.pubname "
+ "FROM pg_catalog.pg_publication_rel pr,"
+ " pg_catalog.pg_publication p "
+ "WHERE pr.prrelid = '%u'"
+ " AND p.oid = pr.prpubid",
+ tbinfo->dobj.catId.oid);
+ res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+ ntups = PQntuples(res);
+
+ if (ntups == 0)
+ {
+ /*
+ * Table is not member of any publications. Clean up and return.
+ */
+ PQclear(res);
+ continue;
+ }
+
+ i_tableoid = PQfnumber(res, "tableoid");
+ i_oid = PQfnumber(res, "oid");
+ i_pubname = PQfnumber(res, "pubname");
+
+ pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
+
+ for (j = 0; j < ntups; j++)
+ {
+ pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
+ pubrinfo[j].dobj.catId.tableoid =
+ atooid(PQgetvalue(res, j, i_tableoid));
+ pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
+ AssignDumpId(&pubrinfo[j].dobj);
+ pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
+ pubrinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname));
+ pubrinfo[j].pubtable = tbinfo;
+ }
+ PQclear(res);
+ }
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * dumpPublicationTable
+ * dump the definition of the given publication table mapping
+ */
+static void
+dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo)
+{
+ DumpOptions *dopt = fout->dopt;
+ TableInfo *tbinfo = pubrinfo->pubtable;
+ PQExpBuffer query;
+ char *tag;
+
+ if (dopt->dataOnly)
+ return;
+
+ tag = psprintf("%s %s", pubrinfo->pubname, tbinfo->dobj.name);
+
+ query = createPQExpBuffer();
+
+ appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE",
+ fmtId(pubrinfo->pubname));
+ appendPQExpBuffer(query, " %s;",
+ fmtId(tbinfo->dobj.name));
+
+ /*
+ * There is no point in creating drop query as drop query as the drop
+ * is done by table drop.
+ */
+ ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
+ tag,
+ tbinfo->dobj.namespace->dobj.name,
+ NULL,
+ "", false,
+ "PUBLICATION TABLE", SECTION_POST_DATA,
+ query->data, "", NULL,
+ NULL, 0,
+ NULL, NULL);
+
+ free(tag);
+ destroyPQExpBuffer(query);
+}
+
+
+/*
+ * getSubscriptions
+ * get information about subscriptions
+ */
+void
+getSubscriptions(Archive *fout)
+{
+ DumpOptions *dopt = fout->dopt;
+ PQExpBuffer query;
+ PGresult *res;
+ SubscriptionInfo *subinfo;
+ int i_tableoid;
+ int i_oid;
+ int i_subname;
+ int i_rolname;
+ int i_subenabled;
+ int i_subconninfo;
+ int i_subslotname;
+ int i_subpublications;
+ int i,
+ ntups;
+
+ if (!dopt->include_subscriptions || fout->remoteVersion < 100000)
+ return;
+
+ query = createPQExpBuffer();
+
+ resetPQExpBuffer(query);
+
+ /* Get the subscriptions in current database. */
+ appendPQExpBuffer(query,
+ "SELECT s.tableoid, s.oid, s.subname,"
+ "(%s s.subowner) AS rolname, s.subenabled, "
+ " s.subconninfo, s.subslotname, s.subpublications "
+ "FROM pg_catalog.pg_subscription s "
+ "WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database"
+ " WHERE datname = current_database())",
+ username_subquery);
+ res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+ ntups = PQntuples(res);
+
+ if (ntups == 0)
+ {
+ /*
+ * There are no subscriptions defined. Clean up and return.
+ */
+ PQclear(res);
+ return;
+ }
+
+ i_tableoid = PQfnumber(res, "tableoid");
+ i_oid = PQfnumber(res, "oid");
+ i_subname = PQfnumber(res, "subname");
+ i_rolname = PQfnumber(res, "rolname");
+ i_subenabled = PQfnumber(res, "subenabled");
+ i_subconninfo = PQfnumber(res, "subconninfo");
+ i_subslotname = PQfnumber(res, "subslotname");
+ i_subpublications = PQfnumber(res, "subpublications");
+
+ subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
+
+ for (i = 0; i < ntups; i++)
+ {
+ subinfo[i].dobj.objType = DO_SUBSCRIPTION;
+ subinfo[i].dobj.catId.tableoid =
+ atooid(PQgetvalue(res, i, i_tableoid));
+ subinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
+ AssignDumpId(&subinfo[i].dobj);
+ subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
+ subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
+ subinfo[i].subenabled =
+ (strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0);
+ subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
+ subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
+ subinfo[i].subpublications =
+ pg_strdup(PQgetvalue(res, i, i_subpublications));
+
+ if (strlen(subinfo[i].rolname) == 0)
+ write_msg(NULL, "WARNING: owner of subscription \"%s\" appears to be invalid\n",
+ subinfo[i].dobj.name);
+ }
+ PQclear(res);
+
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * dumpSubscription
+ * dump the definition of the given subscription
+ */
+static void
+dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
+{
+ DumpOptions *dopt = fout->dopt;
+ PQExpBuffer delq;
+ PQExpBuffer query;
+ PQExpBuffer publications;
+ char **pubnames = NULL;
+ int npubnames = 0;
+ int i;
+
+ if (dopt->dataOnly)
+ return;
+
+ delq = createPQExpBuffer();
+ query = createPQExpBuffer();
+
+ appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
+ fmtId(subinfo->dobj.name));
+
+ appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
+ fmtId(subinfo->dobj.name));
+ appendStringLiteralAH(query, subinfo->subconninfo, fout);
+
+ /* Build list of quoted publications and append them to query. */
+ if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
+ {
+ write_msg(NULL,
+ "WARNING: could not parse subpublications array\n");
+ if (pubnames)
+ free(pubnames);
+ pubnames = NULL;
+ npubnames = 0;
+ }
+
+ publications = createPQExpBuffer();
+ for (i = 0; i < npubnames; i++)
+ {
+ if (i > 0)
+ appendPQExpBufferStr(publications, ", ");
+
+ appendPQExpBufferStr(publications, fmtId(pubnames[i]));
+ }
+
+ appendPQExpBuffer(query, " PUBLICATION %s WITH (", publications->data);
+
+ if (subinfo->subenabled)
+ appendPQExpBufferStr(query, "ENABLED");
+ else
+ appendPQExpBufferStr(query, "DISABLED");
+
+ appendPQExpBufferStr(query, ", SLOT NAME = ");
+ appendStringLiteralAH(query, subinfo->subslotname, fout);
+
+ if (dopt->no_create_subscription_slots)
+ appendPQExpBufferStr(query, ", NOCREATE SLOT");
+
+ appendPQExpBufferStr(query, ");\n");
+
+ ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
+ subinfo->dobj.name,
+ NULL,
+ NULL,
+ subinfo->rolname, false,
+ "SUBSCRIPTION", SECTION_POST_DATA,
+ query->data, delq->data, NULL,
+ NULL, 0,
+ NULL, NULL);
+
+ destroyPQExpBuffer(publications);
+ if (pubnames)
+ free(pubnames);
+
+ destroyPQExpBuffer(delq);
+ destroyPQExpBuffer(query);
+}
+
static void
binary_upgrade_set_type_oids_by_type_oid(Archive *fout,
PQExpBuffer upgrade_buffer,
@@ -8752,6 +9204,15 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
case DO_POLICY:
dumpPolicy(fout, (PolicyInfo *) dobj);
break;
+ case DO_PUBLICATION:
+ dumpPublication(fout, (PublicationInfo *) dobj);
+ break;
+ case DO_PUBLICATION_REL:
+ dumpPublicationTable(fout, (PublicationRelInfo *) dobj);
+ break;
+ case DO_SUBSCRIPTION:
+ dumpSubscription(fout, (SubscriptionInfo *) dobj);
+ break;
case DO_PRE_DATA_BOUNDARY:
case DO_POST_DATA_BOUNDARY:
/* never dumped, nothing to do */
@@ -16627,6 +17088,9 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
case DO_EVENT_TRIGGER:
case DO_DEFAULT_ACL:
case DO_POLICY:
+ case DO_PUBLICATION:
+ case DO_PUBLICATION_REL:
+ case DO_SUBSCRIPTION:
/* Post-data objects: must come after the post-data boundary */
addObjectDependency(dobj, postDataBound->dumpId);
break;