diff options
Diffstat (limited to 'src/bin/pg_dump/pg_dump.c')
-rw-r--r-- | src/bin/pg_dump/pg_dump.c | 229 |
1 files changed, 226 insertions, 3 deletions
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 050a8312265..8973ec715c1 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -297,6 +297,7 @@ static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo); static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo); static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo); static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo); +static void dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo); static void dumpDatabase(Archive *fout); static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf, const char *dbname, Oid dboid); @@ -4638,6 +4639,8 @@ getSubscriptions(Archive *fout) int i_subsynccommit; int i_subpublications; int i_suborigin; + int i_suboriginremotelsn; + int i_subenabled; int i, ntups; @@ -4693,16 +4696,30 @@ getSubscriptions(Archive *fout) appendPQExpBufferStr(query, " s.subpasswordrequired,\n" " s.subrunasowner,\n" - " s.suborigin\n"); + " s.suborigin,\n"); else appendPQExpBuffer(query, " 't' AS subpasswordrequired,\n" " 't' AS subrunasowner,\n" - " '%s' AS suborigin\n", + " '%s' AS suborigin,\n", LOGICALREP_ORIGIN_ANY); + if (dopt->binary_upgrade && fout->remoteVersion >= 170000) + appendPQExpBufferStr(query, " o.remote_lsn AS suboriginremotelsn,\n" + " s.subenabled\n"); + else + appendPQExpBufferStr(query, " NULL AS suboriginremotelsn,\n" + " false AS subenabled\n"); + + appendPQExpBufferStr(query, + "FROM pg_subscription s\n"); + + if (dopt->binary_upgrade && fout->remoteVersion >= 170000) + appendPQExpBufferStr(query, + "LEFT JOIN pg_catalog.pg_replication_origin_status o \n" + " ON o.external_id = 'pg_' || s.oid::text \n"); + appendPQExpBufferStr(query, - "FROM pg_subscription s\n" "WHERE s.subdbid = (SELECT oid FROM pg_database\n" " WHERE datname = current_database())"); @@ -4729,6 +4746,8 @@ getSubscriptions(Archive *fout) i_subsynccommit = PQfnumber(res, "subsynccommit"); i_subpublications = PQfnumber(res, "subpublications"); i_suborigin = PQfnumber(res, "suborigin"); + i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn"); + i_subenabled = PQfnumber(res, "subenabled"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4766,6 +4785,13 @@ getSubscriptions(Archive *fout) subinfo[i].subpublications = pg_strdup(PQgetvalue(res, i, i_subpublications)); subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin)); + if (PQgetisnull(res, i, i_suboriginremotelsn)) + subinfo[i].suboriginremotelsn = NULL; + else + subinfo[i].suboriginremotelsn = + pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn)); + subinfo[i].subenabled = + pg_strdup(PQgetvalue(res, i, i_subenabled)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4776,6 +4802,162 @@ getSubscriptions(Archive *fout) } /* + * getSubscriptionTables + * Get information about subscription membership for dumpable tables. This + * will be used only in binary-upgrade mode for PG17 or later versions. + */ +void +getSubscriptionTables(Archive *fout) +{ + DumpOptions *dopt = fout->dopt; + SubscriptionInfo *subinfo = NULL; + SubRelInfo *subrinfo; + PGresult *res; + int i_srsubid; + int i_srrelid; + int i_srsubstate; + int i_srsublsn; + int ntups; + Oid last_srsubid = InvalidOid; + + if (dopt->no_subscriptions || !dopt->binary_upgrade || + fout->remoteVersion < 170000) + return; + + res = ExecuteSqlQuery(fout, + "SELECT srsubid, srrelid, srsubstate, srsublsn " + "FROM pg_catalog.pg_subscription_rel " + "ORDER BY srsubid", + PGRES_TUPLES_OK); + ntups = PQntuples(res); + if (ntups == 0) + goto cleanup; + + /* Get pg_subscription_rel attributes */ + i_srsubid = PQfnumber(res, "srsubid"); + i_srrelid = PQfnumber(res, "srrelid"); + i_srsubstate = PQfnumber(res, "srsubstate"); + i_srsublsn = PQfnumber(res, "srsublsn"); + + subrinfo = pg_malloc(ntups * sizeof(SubRelInfo)); + for (int i = 0; i < ntups; i++) + { + Oid cur_srsubid = atooid(PQgetvalue(res, i, i_srsubid)); + Oid relid = atooid(PQgetvalue(res, i, i_srrelid)); + TableInfo *tblinfo; + + /* + * If we switched to a new subscription, check if the subscription + * exists. + */ + if (cur_srsubid != last_srsubid) + { + subinfo = findSubscriptionByOid(cur_srsubid); + if (subinfo == NULL) + pg_fatal("subscription with OID %u does not exist", cur_srsubid); + + last_srsubid = cur_srsubid; + } + + tblinfo = findTableByOid(relid); + if (tblinfo == NULL) + pg_fatal("failed sanity check, table with OID %u not found", + relid); + + /* OK, make a DumpableObject for this relationship */ + subrinfo[i].dobj.objType = DO_SUBSCRIPTION_REL; + subrinfo[i].dobj.catId.tableoid = relid; + subrinfo[i].dobj.catId.oid = cur_srsubid; + AssignDumpId(&subrinfo[i].dobj); + subrinfo[i].dobj.name = pg_strdup(subinfo->dobj.name); + subrinfo[i].tblinfo = tblinfo; + subrinfo[i].srsubstate = PQgetvalue(res, i, i_srsubstate)[0]; + if (PQgetisnull(res, i, i_srsublsn)) + subrinfo[i].srsublsn = NULL; + else + subrinfo[i].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn)); + + subrinfo[i].subinfo = subinfo; + + /* Decide whether we want to dump it */ + selectDumpableObject(&(subrinfo[i].dobj), fout); + } + +cleanup: + PQclear(res); +} + +/* + * dumpSubscriptionTable + * Dump the definition of the given subscription table mapping. This will be + * used only in binary-upgrade mode for PG17 or later versions. + */ +static void +dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo) +{ + DumpOptions *dopt = fout->dopt; + SubscriptionInfo *subinfo = subrinfo->subinfo; + PQExpBuffer query; + char *tag; + + /* Do nothing in data-only dump */ + if (dopt->dataOnly) + return; + + Assert(fout->dopt->binary_upgrade && fout->remoteVersion >= 170000); + + tag = psprintf("%s %s", subinfo->dobj.name, subrinfo->dobj.name); + + query = createPQExpBuffer(); + + if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) + { + /* + * binary_upgrade_add_sub_rel_state will add the subscription relation + * to pg_subscription_rel table. This will be used only in + * binary-upgrade mode. + */ + appendPQExpBufferStr(query, + "\n-- For binary upgrade, must preserve the subscriber table.\n"); + appendPQExpBufferStr(query, + "SELECT pg_catalog.binary_upgrade_add_sub_rel_state("); + appendStringLiteralAH(query, subrinfo->dobj.name, fout); + appendPQExpBuffer(query, + ", %u, '%c'", + subrinfo->tblinfo->dobj.catId.oid, + subrinfo->srsubstate); + + if (subrinfo->srsublsn && subrinfo->srsublsn[0] != '\0') + appendPQExpBuffer(query, ", '%s'", subrinfo->srsublsn); + else + appendPQExpBuffer(query, ", NULL"); + + appendPQExpBufferStr(query, ");\n"); + } + + /* + * There is no point in creating a drop query as the drop is done by table + * drop. (If you think to change this, see also _printTocEntry().) + * Although this object doesn't really have ownership as such, set the + * owner field anyway to ensure that the command is run by the correct + * role at restore time. + */ + if (subrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) + ArchiveEntry(fout, subrinfo->dobj.catId, subrinfo->dobj.dumpId, + ARCHIVE_OPTS(.tag = tag, + .namespace = subrinfo->tblinfo->dobj.namespace->dobj.name, + .owner = subinfo->rolname, + .description = "SUBSCRIPTION TABLE", + .section = SECTION_POST_DATA, + .createStmt = query->data)); + + /* These objects can't currently have comments or seclabels */ + + free(tag); + destroyPQExpBuffer(query); +} + +/* * dumpSubscription * dump the definition of the given subscription */ @@ -4855,6 +5037,43 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) appendPQExpBufferStr(query, ");\n"); + /* + * In binary-upgrade mode, we allow the replication to continue after the + * upgrade. + */ + if (dopt->binary_upgrade && fout->remoteVersion >= 170000) + { + if (subinfo->suboriginremotelsn) + { + /* + * Preserve the remote_lsn for the subscriber's replication + * origin. This value is required to start the replication from + * the position before the upgrade. This value will be stale if + * the publisher gets upgraded before the subscriber node. + * However, this shouldn't be a problem as the upgrade of the + * publisher ensures that all the transactions were replicated + * before upgrading it. + */ + appendPQExpBufferStr(query, + "\n-- For binary upgrade, must preserve the remote_lsn for the subscriber's replication origin.\n"); + appendPQExpBufferStr(query, + "SELECT pg_catalog.binary_upgrade_replorigin_advance("); + appendStringLiteralAH(query, subinfo->dobj.name, fout); + appendPQExpBuffer(query, ", '%s');\n", subinfo->suboriginremotelsn); + } + + if (strcmp(subinfo->subenabled, "t") == 0) + { + /* + * Enable the subscription to allow the replication to continue + * after the upgrade. + */ + appendPQExpBufferStr(query, + "\n-- For binary upgrade, must preserve the subscriber's running state.\n"); + appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s ENABLE;\n", qsubname); + } + } + if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId, ARCHIVE_OPTS(.tag = subinfo->dobj.name, @@ -10477,6 +10696,9 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj) case DO_SUBSCRIPTION: dumpSubscription(fout, (const SubscriptionInfo *) dobj); break; + case DO_SUBSCRIPTION_REL: + dumpSubscriptionTable(fout, (const SubRelInfo *) dobj); + break; case DO_PRE_DATA_BOUNDARY: case DO_POST_DATA_BOUNDARY: /* never dumped, nothing to do */ @@ -18543,6 +18765,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs, case DO_PUBLICATION_REL: case DO_PUBLICATION_TABLE_IN_SCHEMA: case DO_SUBSCRIPTION: + case DO_SUBSCRIPTION_REL: /* Post-data objects: must come after the post-data boundary */ addObjectDependency(dobj, postDataBound->dumpId); break; |