diff options
Diffstat (limited to 'src/bin/pg_dump/pg_dump.c')
-rw-r--r-- | src/bin/pg_dump/pg_dump.c | 464 |
1 files changed, 464 insertions, 0 deletions
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 883fde1e5aa..0bb363957a4 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -246,6 +246,9 @@ static void getBlobs(Archive *fout); static void dumpBlob(Archive *fout, BlobInfo *binfo); static int dumpBlobs(Archive *fout, void *arg); static void dumpPolicy(Archive *fout, PolicyInfo *polinfo); +static void dumpPublication(Archive *fout, PublicationInfo *pubinfo); +static void dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo); +static void dumpSubscription(Archive *fout, SubscriptionInfo *subinfo); static void dumpDatabase(Archive *AH); static void dumpEncoding(Archive *AH); static void dumpStdStrings(Archive *AH); @@ -338,6 +341,7 @@ main(int argc, char **argv) {"enable-row-security", no_argument, &dopt.enable_row_security, 1}, {"exclude-table-data", required_argument, NULL, 4}, {"if-exists", no_argument, &dopt.if_exists, 1}, + {"include-subscriptions", no_argument, &dopt.include_subscriptions, 1}, {"inserts", no_argument, &dopt.dump_inserts, 1}, {"lock-wait-timeout", required_argument, NULL, 2}, {"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1}, @@ -348,6 +352,7 @@ main(int argc, char **argv) {"snapshot", required_argument, NULL, 6}, {"strict-names", no_argument, &strict_names, 1}, {"use-set-session-authorization", no_argument, &dopt.use_setsessauth, 1}, + {"no-create-subscription-slots", no_argument, &dopt.no_create_subscription_slots, 1}, {"no-security-labels", no_argument, &dopt.no_security_labels, 1}, {"no-synchronized-snapshots", no_argument, &dopt.no_synchronized_snapshots, 1}, {"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1}, @@ -849,6 +854,7 @@ main(int argc, char **argv) ropt->include_everything = dopt.include_everything; ropt->enable_row_security = dopt.enable_row_security; ropt->sequence_data = dopt.sequence_data; + ropt->include_subscriptions = dopt.include_subscriptions; if (compressLevel == -1) ropt->compression = 0; @@ -929,7 +935,10 @@ help(const char *progname) " access to)\n")); printf(_(" --exclude-table-data=TABLE do NOT dump data for the named table(s)\n")); printf(_(" --if-exists use IF EXISTS when dropping objects\n")); + printf(_(" --include-subscriptions dump logical replication subscriptions\n")); printf(_(" --inserts dump data as INSERT commands, rather than COPY\n")); + printf(_(" --no-create-subscription-slots\n" + " do not create replication slots for subscriptions\n")); printf(_(" --no-security-labels do not dump security label assignments\n")); printf(_(" --no-synchronized-snapshots do not use synchronized snapshots in parallel jobs\n")); printf(_(" --no-tablespaces do not dump tablespace assignments\n")); @@ -3311,6 +3320,449 @@ dumpPolicy(Archive *fout, PolicyInfo *polinfo) destroyPQExpBuffer(delqry); } +/* + * getPublications + * get information about publications + */ +void +getPublications(Archive *fout) +{ + PQExpBuffer query; + PGresult *res; + PublicationInfo *pubinfo; + int i_tableoid; + int i_oid; + int i_pubname; + int i_rolname; + int i_puballtables; + int i_pubinsert; + int i_pubupdate; + int i_pubdelete; + int i, + ntups; + + if (fout->remoteVersion < 100000) + return; + + query = createPQExpBuffer(); + + resetPQExpBuffer(query); + + /* Get the publications. */ + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete " + "FROM pg_catalog.pg_publication p", + username_subquery); + + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + + if (ntups == 0) + { + /* + * There are no publications defined. Clean up and return. + */ + PQclear(res); + return; + } + + i_tableoid = PQfnumber(res, "tableoid"); + i_oid = PQfnumber(res, "oid"); + i_pubname = PQfnumber(res, "pubname"); + i_rolname = PQfnumber(res, "rolname"); + i_puballtables = PQfnumber(res, "puballtables"); + i_pubinsert = PQfnumber(res, "pubinsert"); + i_pubupdate = PQfnumber(res, "pubupdate"); + i_pubdelete = PQfnumber(res, "pubdelete"); + + pubinfo = pg_malloc(ntups * sizeof(PublicationInfo)); + + for (i = 0; i < ntups; i++) + { + pubinfo[i].dobj.objType = DO_PUBLICATION; + pubinfo[i].dobj.catId.tableoid = + atooid(PQgetvalue(res, i, i_tableoid)); + pubinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid)); + AssignDumpId(&pubinfo[i].dobj); + pubinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_pubname)); + pubinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname)); + pubinfo[i].puballtables = + (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0); + pubinfo[i].pubinsert = + (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0); + pubinfo[i].pubupdate = + (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0); + pubinfo[i].pubdelete = + (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0); + + if (strlen(pubinfo[i].rolname) == 0) + write_msg(NULL, "WARNING: owner of publication \"%s\" appears to be invalid\n", + pubinfo[i].dobj.name); + } + PQclear(res); + + destroyPQExpBuffer(query); +} + +/* + * dumpPublication + * dump the definition of the given publication + */ +static void +dumpPublication(Archive *fout, PublicationInfo *pubinfo) +{ + DumpOptions *dopt = fout->dopt; + PQExpBuffer delq; + PQExpBuffer query; + + if (dopt->dataOnly) + return; + + delq = createPQExpBuffer(); + query = createPQExpBuffer(); + + appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n", + fmtId(pubinfo->dobj.name)); + + appendPQExpBuffer(query, "CREATE PUBLICATION %s", + fmtId(pubinfo->dobj.name)); + + if (pubinfo->puballtables) + appendPQExpBufferStr(query, " FOR ALL TABLES"); + + appendPQExpBufferStr(query, " WITH ("); + if (pubinfo->pubinsert) + appendPQExpBufferStr(query, "PUBLISH INSERT"); + else + appendPQExpBufferStr(query, "NOPUBLISH INSERT"); + + if (pubinfo->pubupdate) + appendPQExpBufferStr(query, ", PUBLISH UPDATE"); + else + appendPQExpBufferStr(query, ", NOPUBLISH UPDATE"); + + if (pubinfo->pubdelete) + appendPQExpBufferStr(query, ", PUBLISH DELETE"); + else + appendPQExpBufferStr(query, ", NOPUBLISH DELETE"); + + appendPQExpBufferStr(query, ");\n"); + + ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId, + pubinfo->dobj.name, + NULL, + NULL, + pubinfo->rolname, false, + "PUBLICATION", SECTION_POST_DATA, + query->data, delq->data, NULL, + NULL, 0, + NULL, NULL); + + destroyPQExpBuffer(delq); + destroyPQExpBuffer(query); +} + +/* + * getPublicationTables + * get information about publication membership for dumpable tables. + */ +void +getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) +{ + PQExpBuffer query; + PGresult *res; + PublicationRelInfo *pubrinfo; + int i_tableoid; + int i_oid; + int i_pubname; + int i, + j, + ntups; + + if (fout->remoteVersion < 100000) + return; + + query = createPQExpBuffer(); + + for (i = 0; i < numTables; i++) + { + TableInfo *tbinfo = &tblinfo[i]; + + /* Only plain tables can be aded to publications. */ + if (tbinfo->relkind != RELKIND_RELATION) + continue; + + /* + * Ignore publication membership of tables whose definitions are + * not to be dumped. + */ + if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) + continue; + + if (g_verbose) + write_msg(NULL, "reading publication membership for table \"%s.%s\"\n", + tbinfo->dobj.namespace->dobj.name, + tbinfo->dobj.name); + + resetPQExpBuffer(query); + + /* Get the publication memebership for the table. */ + appendPQExpBuffer(query, + "SELECT pr.tableoid, pr.oid, p.pubname " + "FROM pg_catalog.pg_publication_rel pr," + " pg_catalog.pg_publication p " + "WHERE pr.prrelid = '%u'" + " AND p.oid = pr.prpubid", + tbinfo->dobj.catId.oid); + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + + if (ntups == 0) + { + /* + * Table is not member of any publications. Clean up and return. + */ + PQclear(res); + continue; + } + + i_tableoid = PQfnumber(res, "tableoid"); + i_oid = PQfnumber(res, "oid"); + i_pubname = PQfnumber(res, "pubname"); + + pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); + + for (j = 0; j < ntups; j++) + { + pubrinfo[j].dobj.objType = DO_PUBLICATION_REL; + pubrinfo[j].dobj.catId.tableoid = + atooid(PQgetvalue(res, j, i_tableoid)); + pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid)); + AssignDumpId(&pubrinfo[j].dobj); + pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace; + pubrinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname)); + pubrinfo[j].pubtable = tbinfo; + } + PQclear(res); + } + destroyPQExpBuffer(query); +} + +/* + * dumpPublicationTable + * dump the definition of the given publication table mapping + */ +static void +dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo) +{ + DumpOptions *dopt = fout->dopt; + TableInfo *tbinfo = pubrinfo->pubtable; + PQExpBuffer query; + char *tag; + + if (dopt->dataOnly) + return; + + tag = psprintf("%s %s", pubrinfo->pubname, tbinfo->dobj.name); + + query = createPQExpBuffer(); + + appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE", + fmtId(pubrinfo->pubname)); + appendPQExpBuffer(query, " %s;", + fmtId(tbinfo->dobj.name)); + + /* + * There is no point in creating drop query as drop query as the drop + * is done by table drop. + */ + ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId, + tag, + tbinfo->dobj.namespace->dobj.name, + NULL, + "", false, + "PUBLICATION TABLE", SECTION_POST_DATA, + query->data, "", NULL, + NULL, 0, + NULL, NULL); + + free(tag); + destroyPQExpBuffer(query); +} + + +/* + * getSubscriptions + * get information about subscriptions + */ +void +getSubscriptions(Archive *fout) +{ + DumpOptions *dopt = fout->dopt; + PQExpBuffer query; + PGresult *res; + SubscriptionInfo *subinfo; + int i_tableoid; + int i_oid; + int i_subname; + int i_rolname; + int i_subenabled; + int i_subconninfo; + int i_subslotname; + int i_subpublications; + int i, + ntups; + + if (!dopt->include_subscriptions || fout->remoteVersion < 100000) + return; + + query = createPQExpBuffer(); + + resetPQExpBuffer(query); + + /* Get the subscriptions in current database. */ + appendPQExpBuffer(query, + "SELECT s.tableoid, s.oid, s.subname," + "(%s s.subowner) AS rolname, s.subenabled, " + " s.subconninfo, s.subslotname, s.subpublications " + "FROM pg_catalog.pg_subscription s " + "WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database" + " WHERE datname = current_database())", + username_subquery); + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + + if (ntups == 0) + { + /* + * There are no subscriptions defined. Clean up and return. + */ + PQclear(res); + return; + } + + i_tableoid = PQfnumber(res, "tableoid"); + i_oid = PQfnumber(res, "oid"); + i_subname = PQfnumber(res, "subname"); + i_rolname = PQfnumber(res, "rolname"); + i_subenabled = PQfnumber(res, "subenabled"); + i_subconninfo = PQfnumber(res, "subconninfo"); + i_subslotname = PQfnumber(res, "subslotname"); + i_subpublications = PQfnumber(res, "subpublications"); + + subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); + + for (i = 0; i < ntups; i++) + { + subinfo[i].dobj.objType = DO_SUBSCRIPTION; + subinfo[i].dobj.catId.tableoid = + atooid(PQgetvalue(res, i, i_tableoid)); + subinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid)); + AssignDumpId(&subinfo[i].dobj); + subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname)); + subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname)); + subinfo[i].subenabled = + (strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0); + subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); + subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname)); + subinfo[i].subpublications = + pg_strdup(PQgetvalue(res, i, i_subpublications)); + + if (strlen(subinfo[i].rolname) == 0) + write_msg(NULL, "WARNING: owner of subscription \"%s\" appears to be invalid\n", + subinfo[i].dobj.name); + } + PQclear(res); + + destroyPQExpBuffer(query); +} + +/* + * dumpSubscription + * dump the definition of the given subscription + */ +static void +dumpSubscription(Archive *fout, SubscriptionInfo *subinfo) +{ + DumpOptions *dopt = fout->dopt; + PQExpBuffer delq; + PQExpBuffer query; + PQExpBuffer publications; + char **pubnames = NULL; + int npubnames = 0; + int i; + + if (dopt->dataOnly) + return; + + delq = createPQExpBuffer(); + query = createPQExpBuffer(); + + appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n", + fmtId(subinfo->dobj.name)); + + appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ", + fmtId(subinfo->dobj.name)); + appendStringLiteralAH(query, subinfo->subconninfo, fout); + + /* Build list of quoted publications and append them to query. */ + if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames)) + { + write_msg(NULL, + "WARNING: could not parse subpublications array\n"); + if (pubnames) + free(pubnames); + pubnames = NULL; + npubnames = 0; + } + + publications = createPQExpBuffer(); + for (i = 0; i < npubnames; i++) + { + if (i > 0) + appendPQExpBufferStr(publications, ", "); + + appendPQExpBufferStr(publications, fmtId(pubnames[i])); + } + + appendPQExpBuffer(query, " PUBLICATION %s WITH (", publications->data); + + if (subinfo->subenabled) + appendPQExpBufferStr(query, "ENABLED"); + else + appendPQExpBufferStr(query, "DISABLED"); + + appendPQExpBufferStr(query, ", SLOT NAME = "); + appendStringLiteralAH(query, subinfo->subslotname, fout); + + if (dopt->no_create_subscription_slots) + appendPQExpBufferStr(query, ", NOCREATE SLOT"); + + appendPQExpBufferStr(query, ");\n"); + + ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId, + subinfo->dobj.name, + NULL, + NULL, + subinfo->rolname, false, + "SUBSCRIPTION", SECTION_POST_DATA, + query->data, delq->data, NULL, + NULL, 0, + NULL, NULL); + + destroyPQExpBuffer(publications); + if (pubnames) + free(pubnames); + + destroyPQExpBuffer(delq); + destroyPQExpBuffer(query); +} + static void binary_upgrade_set_type_oids_by_type_oid(Archive *fout, PQExpBuffer upgrade_buffer, @@ -8752,6 +9204,15 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj) case DO_POLICY: dumpPolicy(fout, (PolicyInfo *) dobj); break; + case DO_PUBLICATION: + dumpPublication(fout, (PublicationInfo *) dobj); + break; + case DO_PUBLICATION_REL: + dumpPublicationTable(fout, (PublicationRelInfo *) dobj); + break; + case DO_SUBSCRIPTION: + dumpSubscription(fout, (SubscriptionInfo *) dobj); + break; case DO_PRE_DATA_BOUNDARY: case DO_POST_DATA_BOUNDARY: /* never dumped, nothing to do */ @@ -16627,6 +17088,9 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs, case DO_EVENT_TRIGGER: case DO_DEFAULT_ACL: case DO_POLICY: + case DO_PUBLICATION: + case DO_PUBLICATION_REL: + case DO_SUBSCRIPTION: /* Post-data objects: must come after the post-data boundary */ addObjectDependency(dobj, postDataBound->dumpId); break; |