aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/connection.c
diff options
context:
space:
mode:
authorNoah Misch <noah@leadboat.com>2024-01-08 11:39:56 -0800
committerNoah Misch <noah@leadboat.com>2024-01-08 11:39:56 -0800
commitd3c5f37dd543498cc7c678815d3921823beec9e9 (patch)
tree9c430f23ec95036ded0b8bb16446b30528159c9e /contrib/postgres_fdw/connection.c
parent0efc8318477714600567d15812dc8d15841e269e (diff)
downloadpostgresql-d3c5f37dd543498cc7c678815d3921823beec9e9.tar.gz
postgresql-d3c5f37dd543498cc7c678815d3921823beec9e9.zip
Make dblink interruptible, via new libpqsrv APIs.
This replaces dblink's blocking libpq calls, allowing cancellation and allowing DROP DATABASE (of a database not involved in the query). Apart from explicit dblink_cancel_query() calls, dblink still doesn't cancel the remote side. The replacement for the blocking calls consists of new, general-purpose query execution wrappers in the libpqsrv facility. Out-of-tree extensions should adopt these. Use them in postgres_fdw, replacing a local implementation from which the libpqsrv implementation derives. This is a bug fix for dblink. Code inspection identified the bug at least thirteen years ago, but user complaints have not appeared. Hence, no back-patch for now. Discussion: https://postgr.es/m/20231122012945.74@rfd.leadboat.com
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r--contrib/postgres_fdw/connection.c88
1 files changed, 17 insertions, 71 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index fc69e189d5b..4931ebf5915 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -187,6 +187,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
{
HASHCTL ctl;
+ if (pgfdw_we_get_result == 0)
+ pgfdw_we_get_result =
+ WaitEventExtensionNew("PostgresFdwGetResult");
+
ctl.keysize = sizeof(ConnCacheKey);
ctl.entrysize = sizeof(ConnCacheEntry);
ConnectionHash = hash_create("postgres_fdw connections", 8,
@@ -716,7 +720,7 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
*/
if (consume_input && !PQconsumeInput(conn))
pgfdw_report_error(ERROR, NULL, conn, false, sql);
- res = pgfdw_get_result(conn, sql);
+ res = pgfdw_get_result(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
@@ -819,7 +823,9 @@ GetPrepStmtNumber(PGconn *conn)
/*
* Submit a query and wait for the result.
*
- * This function is interruptible by signals.
+ * Since we don't use non-blocking mode, this can't process interrupts while
+ * pushing the query text to the server. That risk is relatively small, so we
+ * ignore that for now.
*
* Caller is responsible for the error handling on the result.
*/
@@ -830,81 +836,20 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
if (state && state->pendingAreq)
process_pending_request(state->pendingAreq);
- /*
- * Submit a query. Since we don't use non-blocking mode, this also can
- * block. But its risk is relatively small, so we ignore that for now.
- */
if (!PQsendQuery(conn, query))
- pgfdw_report_error(ERROR, NULL, conn, false, query);
-
- /* Wait for the result. */
- return pgfdw_get_result(conn, query);
+ return NULL;
+ return pgfdw_get_result(conn);
}
/*
- * Wait for the result from a prior asynchronous execution function call.
- *
- * This function offers quick responsiveness by checking for any interruptions.
- *
- * This function emulates PQexec()'s behavior of returning the last result
- * when there are many.
+ * Wrap libpqsrv_get_result_last(), adding wait event.
*
* Caller is responsible for the error handling on the result.
*/
PGresult *
-pgfdw_get_result(PGconn *conn, const char *query)
+pgfdw_get_result(PGconn *conn)
{
- PGresult *volatile last_res = NULL;
-
- /* In what follows, do not leak any PGresults on an error. */
- PG_TRY();
- {
- for (;;)
- {
- PGresult *res;
-
- while (PQisBusy(conn))
- {
- int wc;
-
- /* first time, allocate or get the custom wait event */
- if (pgfdw_we_get_result == 0)
- pgfdw_we_get_result = WaitEventExtensionNew("PostgresFdwGetResult");
-
- /* Sleep until there's something to do */
- wc = WaitLatchOrSocket(MyLatch,
- WL_LATCH_SET | WL_SOCKET_READABLE |
- WL_EXIT_ON_PM_DEATH,
- PQsocket(conn),
- -1L, pgfdw_we_get_result);
- ResetLatch(MyLatch);
-
- CHECK_FOR_INTERRUPTS();
-
- /* Data available in socket? */
- if (wc & WL_SOCKET_READABLE)
- {
- if (!PQconsumeInput(conn))
- pgfdw_report_error(ERROR, NULL, conn, false, query);
- }
- }
-
- res = PQgetResult(conn);
- if (res == NULL)
- break; /* query is complete */
-
- PQclear(last_res);
- last_res = res;
- }
- }
- PG_CATCH();
- {
- PQclear(last_res);
- PG_RE_THROW();
- }
- PG_END_TRY();
-
- return last_res;
+ return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
}
/*
@@ -945,8 +890,8 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
/*
* If we don't get a message from the PGresult, try the PGconn. This
- * is needed because for connection-level failures, PQexec may just
- * return NULL, not a PGresult at all.
+ * is needed because for connection-level failures, PQgetResult may
+ * just return NULL, not a PGresult at all.
*/
if (message_primary == NULL)
message_primary = pchomp(PQerrorMessage(conn));
@@ -1046,7 +991,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
*/
if (entry->have_prep_stmt && entry->have_error)
{
- res = PQexec(entry->conn, "DEALLOCATE ALL");
+ res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
+ NULL);
PQclear(res);
}
entry->have_prep_stmt = false;