aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_dump/pg_dump.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_dump/pg_dump.c')
-rw-r--r--src/bin/pg_dump/pg_dump.c229
1 files changed, 226 insertions, 3 deletions
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 050a8312265..8973ec715c1 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -297,6 +297,7 @@ static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
+static void dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo);
static void dumpDatabase(Archive *fout);
static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
const char *dbname, Oid dboid);
@@ -4638,6 +4639,8 @@ getSubscriptions(Archive *fout)
int i_subsynccommit;
int i_subpublications;
int i_suborigin;
+ int i_suboriginremotelsn;
+ int i_subenabled;
int i,
ntups;
@@ -4693,16 +4696,30 @@ getSubscriptions(Archive *fout)
appendPQExpBufferStr(query,
" s.subpasswordrequired,\n"
" s.subrunasowner,\n"
- " s.suborigin\n");
+ " s.suborigin,\n");
else
appendPQExpBuffer(query,
" 't' AS subpasswordrequired,\n"
" 't' AS subrunasowner,\n"
- " '%s' AS suborigin\n",
+ " '%s' AS suborigin,\n",
LOGICALREP_ORIGIN_ANY);
+ if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
+ appendPQExpBufferStr(query, " o.remote_lsn AS suboriginremotelsn,\n"
+ " s.subenabled\n");
+ else
+ appendPQExpBufferStr(query, " NULL AS suboriginremotelsn,\n"
+ " false AS subenabled\n");
+
+ appendPQExpBufferStr(query,
+ "FROM pg_subscription s\n");
+
+ if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
+ appendPQExpBufferStr(query,
+ "LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
+ " ON o.external_id = 'pg_' || s.oid::text \n");
+
appendPQExpBufferStr(query,
- "FROM pg_subscription s\n"
"WHERE s.subdbid = (SELECT oid FROM pg_database\n"
" WHERE datname = current_database())");
@@ -4729,6 +4746,8 @@ getSubscriptions(Archive *fout)
i_subsynccommit = PQfnumber(res, "subsynccommit");
i_subpublications = PQfnumber(res, "subpublications");
i_suborigin = PQfnumber(res, "suborigin");
+ i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
+ i_subenabled = PQfnumber(res, "subenabled");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@@ -4766,6 +4785,13 @@ getSubscriptions(Archive *fout)
subinfo[i].subpublications =
pg_strdup(PQgetvalue(res, i, i_subpublications));
subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+ if (PQgetisnull(res, i, i_suboriginremotelsn))
+ subinfo[i].suboriginremotelsn = NULL;
+ else
+ subinfo[i].suboriginremotelsn =
+ pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
+ subinfo[i].subenabled =
+ pg_strdup(PQgetvalue(res, i, i_subenabled));
/* Decide whether we want to dump it */
selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4776,6 +4802,162 @@ getSubscriptions(Archive *fout)
}
/*
+ * getSubscriptionTables
+ * Get information about subscription membership for dumpable tables. This
+ * will be used only in binary-upgrade mode for PG17 or later versions.
+ */
+void
+getSubscriptionTables(Archive *fout)
+{
+ DumpOptions *dopt = fout->dopt;
+ SubscriptionInfo *subinfo = NULL;
+ SubRelInfo *subrinfo;
+ PGresult *res;
+ int i_srsubid;
+ int i_srrelid;
+ int i_srsubstate;
+ int i_srsublsn;
+ int ntups;
+ Oid last_srsubid = InvalidOid;
+
+ if (dopt->no_subscriptions || !dopt->binary_upgrade ||
+ fout->remoteVersion < 170000)
+ return;
+
+ res = ExecuteSqlQuery(fout,
+ "SELECT srsubid, srrelid, srsubstate, srsublsn "
+ "FROM pg_catalog.pg_subscription_rel "
+ "ORDER BY srsubid",
+ PGRES_TUPLES_OK);
+ ntups = PQntuples(res);
+ if (ntups == 0)
+ goto cleanup;
+
+ /* Get pg_subscription_rel attributes */
+ i_srsubid = PQfnumber(res, "srsubid");
+ i_srrelid = PQfnumber(res, "srrelid");
+ i_srsubstate = PQfnumber(res, "srsubstate");
+ i_srsublsn = PQfnumber(res, "srsublsn");
+
+ subrinfo = pg_malloc(ntups * sizeof(SubRelInfo));
+ for (int i = 0; i < ntups; i++)
+ {
+ Oid cur_srsubid = atooid(PQgetvalue(res, i, i_srsubid));
+ Oid relid = atooid(PQgetvalue(res, i, i_srrelid));
+ TableInfo *tblinfo;
+
+ /*
+ * If we switched to a new subscription, check if the subscription
+ * exists.
+ */
+ if (cur_srsubid != last_srsubid)
+ {
+ subinfo = findSubscriptionByOid(cur_srsubid);
+ if (subinfo == NULL)
+ pg_fatal("subscription with OID %u does not exist", cur_srsubid);
+
+ last_srsubid = cur_srsubid;
+ }
+
+ tblinfo = findTableByOid(relid);
+ if (tblinfo == NULL)
+ pg_fatal("failed sanity check, table with OID %u not found",
+ relid);
+
+ /* OK, make a DumpableObject for this relationship */
+ subrinfo[i].dobj.objType = DO_SUBSCRIPTION_REL;
+ subrinfo[i].dobj.catId.tableoid = relid;
+ subrinfo[i].dobj.catId.oid = cur_srsubid;
+ AssignDumpId(&subrinfo[i].dobj);
+ subrinfo[i].dobj.name = pg_strdup(subinfo->dobj.name);
+ subrinfo[i].tblinfo = tblinfo;
+ subrinfo[i].srsubstate = PQgetvalue(res, i, i_srsubstate)[0];
+ if (PQgetisnull(res, i, i_srsublsn))
+ subrinfo[i].srsublsn = NULL;
+ else
+ subrinfo[i].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn));
+
+ subrinfo[i].subinfo = subinfo;
+
+ /* Decide whether we want to dump it */
+ selectDumpableObject(&(subrinfo[i].dobj), fout);
+ }
+
+cleanup:
+ PQclear(res);
+}
+
+/*
+ * dumpSubscriptionTable
+ * Dump the definition of the given subscription table mapping. This will be
+ * used only in binary-upgrade mode for PG17 or later versions.
+ */
+static void
+dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo)
+{
+ DumpOptions *dopt = fout->dopt;
+ SubscriptionInfo *subinfo = subrinfo->subinfo;
+ PQExpBuffer query;
+ char *tag;
+
+ /* Do nothing in data-only dump */
+ if (dopt->dataOnly)
+ return;
+
+ Assert(fout->dopt->binary_upgrade && fout->remoteVersion >= 170000);
+
+ tag = psprintf("%s %s", subinfo->dobj.name, subrinfo->dobj.name);
+
+ query = createPQExpBuffer();
+
+ if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+ {
+ /*
+ * binary_upgrade_add_sub_rel_state will add the subscription relation
+ * to pg_subscription_rel table. This will be used only in
+ * binary-upgrade mode.
+ */
+ appendPQExpBufferStr(query,
+ "\n-- For binary upgrade, must preserve the subscriber table.\n");
+ appendPQExpBufferStr(query,
+ "SELECT pg_catalog.binary_upgrade_add_sub_rel_state(");
+ appendStringLiteralAH(query, subrinfo->dobj.name, fout);
+ appendPQExpBuffer(query,
+ ", %u, '%c'",
+ subrinfo->tblinfo->dobj.catId.oid,
+ subrinfo->srsubstate);
+
+ if (subrinfo->srsublsn && subrinfo->srsublsn[0] != '\0')
+ appendPQExpBuffer(query, ", '%s'", subrinfo->srsublsn);
+ else
+ appendPQExpBuffer(query, ", NULL");
+
+ appendPQExpBufferStr(query, ");\n");
+ }
+
+ /*
+ * There is no point in creating a drop query as the drop is done by table
+ * drop. (If you think to change this, see also _printTocEntry().)
+ * Although this object doesn't really have ownership as such, set the
+ * owner field anyway to ensure that the command is run by the correct
+ * role at restore time.
+ */
+ if (subrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+ ArchiveEntry(fout, subrinfo->dobj.catId, subrinfo->dobj.dumpId,
+ ARCHIVE_OPTS(.tag = tag,
+ .namespace = subrinfo->tblinfo->dobj.namespace->dobj.name,
+ .owner = subinfo->rolname,
+ .description = "SUBSCRIPTION TABLE",
+ .section = SECTION_POST_DATA,
+ .createStmt = query->data));
+
+ /* These objects can't currently have comments or seclabels */
+
+ free(tag);
+ destroyPQExpBuffer(query);
+}
+
+/*
* dumpSubscription
* dump the definition of the given subscription
*/
@@ -4855,6 +5037,43 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ");\n");
+ /*
+ * In binary-upgrade mode, we allow the replication to continue after the
+ * upgrade.
+ */
+ if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
+ {
+ if (subinfo->suboriginremotelsn)
+ {
+ /*
+ * Preserve the remote_lsn for the subscriber's replication
+ * origin. This value is required to start the replication from
+ * the position before the upgrade. This value will be stale if
+ * the publisher gets upgraded before the subscriber node.
+ * However, this shouldn't be a problem as the upgrade of the
+ * publisher ensures that all the transactions were replicated
+ * before upgrading it.
+ */
+ appendPQExpBufferStr(query,
+ "\n-- For binary upgrade, must preserve the remote_lsn for the subscriber's replication origin.\n");
+ appendPQExpBufferStr(query,
+ "SELECT pg_catalog.binary_upgrade_replorigin_advance(");
+ appendStringLiteralAH(query, subinfo->dobj.name, fout);
+ appendPQExpBuffer(query, ", '%s');\n", subinfo->suboriginremotelsn);
+ }
+
+ if (strcmp(subinfo->subenabled, "t") == 0)
+ {
+ /*
+ * Enable the subscription to allow the replication to continue
+ * after the upgrade.
+ */
+ appendPQExpBufferStr(query,
+ "\n-- For binary upgrade, must preserve the subscriber's running state.\n");
+ appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s ENABLE;\n", qsubname);
+ }
+ }
+
if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
ARCHIVE_OPTS(.tag = subinfo->dobj.name,
@@ -10477,6 +10696,9 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
case DO_SUBSCRIPTION:
dumpSubscription(fout, (const SubscriptionInfo *) dobj);
break;
+ case DO_SUBSCRIPTION_REL:
+ dumpSubscriptionTable(fout, (const SubRelInfo *) dobj);
+ break;
case DO_PRE_DATA_BOUNDARY:
case DO_POST_DATA_BOUNDARY:
/* never dumped, nothing to do */
@@ -18543,6 +18765,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
case DO_PUBLICATION_REL:
case DO_PUBLICATION_TABLE_IN_SCHEMA:
case DO_SUBSCRIPTION:
+ case DO_SUBSCRIPTION_REL:
/* Post-data objects: must come after the post-data boundary */
addObjectDependency(dobj, postDataBound->dumpId);
break;