aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/postgres_fdw.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2016-04-21 10:46:09 -0400
committerRobert Haas <rhaas@postgresql.org>2016-04-21 10:49:09 -0400
commitf039eaac7131ef2a4cf63a10cf98486f8bcd09d2 (patch)
tree7cf01d74aa16537ce41f72f9c796cc0e896270c6 /contrib/postgres_fdw/postgres_fdw.c
parent11e178d0dc4bc2328ae4759090b3c48b07023fab (diff)
downloadpostgresql-f039eaac7131ef2a4cf63a10cf98486f8bcd09d2.tar.gz
postgresql-f039eaac7131ef2a4cf63a10cf98486f8bcd09d2.zip
Allow queries submitted by postgres_fdw to be canceled.
This fixes a problem which is not new, but with the advent of direct foreign table modification in 0bf3ae88af330496517722e391e7c975e6bad219, it's somewhat more likely to be annoying than previously. So, arrange for a local query cancelation to propagate to the remote side. Michael Paquier, reviewed by Etsuro Fujita. Original report by Thom Brown.
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c123
1 files changed, 79 insertions, 44 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 28093e54562..2f492683a86 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1421,7 +1421,7 @@ postgresReScanForeignScan(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(fsstate->conn, sql);
+ res = pgfdw_exec_query(fsstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
PQclear(res);
@@ -1749,18 +1749,24 @@ postgresExecForeignInsert(EState *estate,
p_values = convert_prep_stmt_params(fmstate, NULL, slot);
/*
- * Execute the prepared statement, and check for success.
+ * Execute the prepared statement.
+ */
+ if (!PQsendQueryPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0))
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecPrepared(fmstate->conn,
- fmstate->p_name,
- fmstate->p_nums,
- p_values,
- NULL,
- NULL,
- 0);
+ res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1819,18 +1825,24 @@ postgresExecForeignUpdate(EState *estate,
slot);
/*
- * Execute the prepared statement, and check for success.
+ * Execute the prepared statement.
+ */
+ if (!PQsendQueryPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0))
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecPrepared(fmstate->conn,
- fmstate->p_name,
- fmstate->p_nums,
- p_values,
- NULL,
- NULL,
- 0);
+ res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1889,18 +1901,24 @@ postgresExecForeignDelete(EState *estate,
NULL);
/*
- * Execute the prepared statement, and check for success.
+ * Execute the prepared statement.
+ */
+ if (!PQsendQueryPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0))
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecPrepared(fmstate->conn,
- fmstate->p_name,
- fmstate->p_nums,
- p_values,
- NULL,
- NULL,
- 0);
+ res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1950,7 +1968,7 @@ postgresEndForeignModify(EState *estate,
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(fmstate->conn, sql);
+ res = pgfdw_exec_query(fmstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
PQclear(res);
@@ -2712,7 +2730,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
/*
* Execute EXPLAIN remotely.
*/
- res = PQexec(conn, sql);
+ res = pgfdw_exec_query(conn, sql);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql);
@@ -2817,12 +2835,18 @@ create_cursor(ForeignScanState *node)
* parameter (see deparse.c), the "inference" is trivial and will produce
* the desired result. This allows us to avoid assuming that the remote
* server has the same OIDs we do for the parameters' types.
+ */
+ if (!PQsendQueryParams(conn, buf.data, numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecParams(conn, buf.data, numParams, NULL, values,
- NULL, NULL, 0);
+ res = pgfdw_get_result(conn, buf.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
PQclear(res);
@@ -2868,7 +2892,7 @@ fetch_more_data(ForeignScanState *node)
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
fsstate->fetch_size, fsstate->cursor_number);
- res = PQexec(conn, sql);
+ res = pgfdw_exec_query(conn, sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -2978,7 +3002,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(conn, sql);
+ res = pgfdw_exec_query(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
@@ -3006,16 +3030,21 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
* with the remote server using different type OIDs than we do. All of
* the prepared statements we use in this module are simple enough that
* the remote server will make the right choices.
+ */
+ if (!PQsendPrepare(fmstate->conn,
+ p_name,
+ fmstate->query,
+ 0,
+ NULL))
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQprepare(fmstate->conn,
- p_name,
- fmstate->query,
- 0,
- NULL);
-
+ res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
PQclear(res);
@@ -3147,12 +3176,18 @@ execute_dml_stmt(ForeignScanState *node)
* parameter (see deparse.c), the "inference" is trivial and will produce
* the desired result. This allows us to avoid assuming that the remote
* server has the same OIDs we do for the parameters' types.
+ */
+ if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
- numParams, NULL, values, NULL, NULL, 0);
+ dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
if (PQresultStatus(dmstate->result) !=
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
@@ -3355,7 +3390,7 @@ postgresAnalyzeForeignTable(Relation relation,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
- res = PQexec(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -3449,7 +3484,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
- res = PQexec(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
PQclear(res);
@@ -3500,7 +3535,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
fetch_size, cursor_number);
- res = PQexec(conn, fetch_sql);
+ res = pgfdw_exec_query(conn, fetch_sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -3675,7 +3710,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
deparseStringLiteral(&buf, stmt->remote_schema);
- res = PQexec(conn, buf.data);
+ res = pgfdw_exec_query(conn, buf.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
@@ -3774,7 +3809,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
/* Fetch the data */
- res = PQexec(conn, buf.data);
+ res = pgfdw_exec_query(conn, buf.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);