aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNathan Bossart <nathan@postgresql.org>2024-09-16 16:10:33 -0500
committerNathan Bossart <nathan@postgresql.org>2024-09-16 16:10:33 -0500
commitcf2f82a37cc35895b67c83dd2b33d2fcf4688a55 (patch)
tree28ad4b53ed6cac06e1f5966e5b07ce776fffcaa7 /src
parentc34eabfbbfd3d3799bc7bc61f22b1fe730c53fe8 (diff)
downloadpostgresql-cf2f82a37cc35895b67c83dd2b33d2fcf4688a55.tar.gz
postgresql-cf2f82a37cc35895b67c83dd2b33d2fcf4688a55.zip
pg_upgrade: Parallelize incompatible polymorphics check.
This commit makes use of the new task framework in pg_upgrade to parallelize the check for usage of incompatible polymorphic functions, i.e., those with arguments of type anyarray/anyelement rather than the newer anycompatible variants. This step will now process multiple databases concurrently when pg_upgrade's --jobs option is provided a value greater than 1. Reviewed-by: Daniel Gustafsson, Ilya Gladyshev Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
Diffstat (limited to 'src')
-rw-r--r--src/bin/pg_upgrade/check.c159
1 files changed, 83 insertions, 76 deletions
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 28c4ddbca37..c5fa1a5bedf 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1414,6 +1414,40 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
}
/*
+ * Callback function for processing results of query for
+ * check_for_incompatible_polymorphics()'s UpgradeTask. If the query returned
+ * any rows (i.e., the check failed), write the details to the report file.
+ */
+static void
+process_incompat_polymorphics(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+ bool db_used = false;
+ int ntups = PQntuples(res);
+ int i_objkind = PQfnumber(res, "objkind");
+ int i_objname = PQfnumber(res, "objname");
+
+ AssertVariableIsOfType(&process_incompat_polymorphics,
+ UpgradeTaskProcessCB);
+
+ for (int rowno = 0; rowno < ntups; rowno++)
+ {
+ if (report->file == NULL &&
+ (report->file = fopen_priv(report->path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", report->path);
+ if (!db_used)
+ {
+ fprintf(report->file, "In database: %s\n", dbinfo->db_name);
+ db_used = true;
+ }
+
+ fprintf(report->file, " %s: %s\n",
+ PQgetvalue(res, rowno, i_objkind),
+ PQgetvalue(res, rowno, i_objname));
+ }
+}
+
+/*
* check_for_incompatible_polymorphics()
*
* Make sure nothing is using old polymorphic functions with
@@ -1422,14 +1456,15 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
static void
check_for_incompatible_polymorphics(ClusterInfo *cluster)
{
- PGresult *res;
- FILE *script = NULL;
- char output_path[MAXPGPATH];
PQExpBufferData old_polymorphics;
+ UpgradeTask *task = upgrade_task_create();
+ UpgradeTaskReport report;
+ char *query;
prep_status("Checking for incompatible polymorphic functions");
- snprintf(output_path, sizeof(output_path), "%s/%s",
+ report.file = NULL;
+ snprintf(report.path, sizeof(report.path), "%s/%s",
log_opts.basedir,
"incompatible_polymorphics.txt");
@@ -1453,80 +1488,51 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
", 'array_positions(anyarray,anyelement)'"
", 'width_bucket(anyelement,anyarray)'");
- for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
- {
- bool db_used = false;
- DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
- PGconn *conn = connectToServer(cluster, active_db->db_name);
- int ntups;
- int i_objkind,
- i_objname;
-
- /*
- * The query below hardcodes FirstNormalObjectId as 16384 rather than
- * interpolating that C #define into the query because, if that
- * #define is ever changed, the cutoff we want to use is the value
- * used by pre-version 14 servers, not that of some future version.
- */
- res = executeQueryOrDie(conn,
- /* Aggregate transition functions */
- "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
- "FROM pg_proc AS p "
- "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
- "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
- "WHERE p.oid >= 16384 "
- "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
- "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
- /* Aggregate final functions */
- "UNION ALL "
- "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
- "FROM pg_proc AS p "
- "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
- "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
- "WHERE p.oid >= 16384 "
- "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
- "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
- /* Operators */
- "UNION ALL "
- "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
- "FROM pg_operator AS op "
- "WHERE op.oid >= 16384 "
- "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
- "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
- old_polymorphics.data,
- old_polymorphics.data,
- old_polymorphics.data);
-
- ntups = PQntuples(res);
-
- i_objkind = PQfnumber(res, "objkind");
- i_objname = PQfnumber(res, "objname");
-
- for (int rowno = 0; rowno < ntups; rowno++)
- {
- if (script == NULL &&
- (script = fopen_priv(output_path, "w")) == NULL)
- pg_fatal("could not open file \"%s\": %m", output_path);
- if (!db_used)
- {
- fprintf(script, "In database: %s\n", active_db->db_name);
- db_used = true;
- }
-
- fprintf(script, " %s: %s\n",
- PQgetvalue(res, rowno, i_objkind),
- PQgetvalue(res, rowno, i_objname));
- }
+ /*
+ * The query below hardcodes FirstNormalObjectId as 16384 rather than
+ * interpolating that C #define into the query because, if that #define is
+ * ever changed, the cutoff we want to use is the value used by
+ * pre-version 14 servers, not that of some future version.
+ */
- PQclear(res);
- PQfinish(conn);
- }
+ /* Aggregate transition functions */
+ query = psprintf("SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+ "FROM pg_proc AS p "
+ "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+ "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
+ "WHERE p.oid >= 16384 "
+ "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
+ "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+ /* Aggregate final functions */
+ "UNION ALL "
+ "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+ "FROM pg_proc AS p "
+ "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+ "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
+ "WHERE p.oid >= 16384 "
+ "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
+ "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+ /* Operators */
+ "UNION ALL "
+ "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
+ "FROM pg_operator AS op "
+ "WHERE op.oid >= 16384 "
+ "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
+ "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[])",
+ old_polymorphics.data,
+ old_polymorphics.data,
+ old_polymorphics.data);
+
+ upgrade_task_add_step(task, query, process_incompat_polymorphics,
+ true, &report);
+ upgrade_task_run(task, cluster);
+ upgrade_task_free(task);
- if (script)
+ if (report.file)
{
- fclose(script);
+ fclose(report.file);
pg_log(PG_REPORT, "fatal");
pg_fatal("Your installation contains user-defined objects that refer to internal\n"
"polymorphic functions with arguments of type \"anyarray\" or \"anyelement\".\n"
@@ -1534,12 +1540,13 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
"afterwards, changing them to refer to the new corresponding functions with\n"
"arguments of type \"anycompatiblearray\" and \"anycompatible\".\n"
"A list of the problematic objects is in the file:\n"
- " %s", output_path);
+ " %s", report.path);
}
else
check_ok();
termPQExpBuffer(&old_polymorphics);
+ pg_free(query);
}
/*