diff options
author | Robert Haas <rhaas@postgresql.org> | 2016-04-21 10:46:09 -0400 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2016-04-21 10:49:09 -0400 |
commit | f039eaac7131ef2a4cf63a10cf98486f8bcd09d2 (patch) | |
tree | 7cf01d74aa16537ce41f72f9c796cc0e896270c6 /contrib/postgres_fdw/postgres_fdw.c | |
parent | 11e178d0dc4bc2328ae4759090b3c48b07023fab (diff) | |
download | postgresql-f039eaac7131ef2a4cf63a10cf98486f8bcd09d2.tar.gz postgresql-f039eaac7131ef2a4cf63a10cf98486f8bcd09d2.zip |
Allow queries submitted by postgres_fdw to be canceled.
This fixes a problem which is not new, but with the advent of direct
foreign table modification in 0bf3ae88af330496517722e391e7c975e6bad219,
it's somewhat more likely to be annoying than previously. So,
arrange for a local query cancelation to propagate to the remote side.
Michael Paquier, reviewed by Etsuro Fujita. Original report by
Thom Brown.
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 123 |
1 files changed, 79 insertions, 44 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 28093e54562..2f492683a86 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -1421,7 +1421,7 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(fsstate->conn, sql); + res = pgfdw_exec_query(fsstate->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); PQclear(res); @@ -1749,18 +1749,24 @@ postgresExecForeignInsert(EState *estate, p_values = convert_prep_stmt_params(fmstate, NULL, slot); /* - * Execute the prepared statement, and check for success. + * Execute the prepared statement. + */ + if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + + /* + * Get the result, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums, - p_values, - NULL, - NULL, - 0); + res = pgfdw_get_result(fmstate->conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); @@ -1819,18 +1825,24 @@ postgresExecForeignUpdate(EState *estate, slot); /* - * Execute the prepared statement, and check for success. + * Execute the prepared statement. + */ + if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + + /* + * Get the result, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums, - p_values, - NULL, - NULL, - 0); + res = pgfdw_get_result(fmstate->conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); @@ -1889,18 +1901,24 @@ postgresExecForeignDelete(EState *estate, NULL); /* - * Execute the prepared statement, and check for success. + * Execute the prepared statement. + */ + if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + + /* + * Get the result, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums, - p_values, - NULL, - NULL, - 0); + res = pgfdw_get_result(fmstate->conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); @@ -1950,7 +1968,7 @@ postgresEndForeignModify(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(fmstate->conn, sql); + res = pgfdw_exec_query(fmstate->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); PQclear(res); @@ -2712,7 +2730,7 @@ get_remote_estimate(const char *sql, PGconn *conn, /* * Execute EXPLAIN remotely. */ - res = PQexec(conn, sql); + res = pgfdw_exec_query(conn, sql); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql); @@ -2817,12 +2835,18 @@ create_cursor(ForeignScanState *node) * parameter (see deparse.c), the "inference" is trivial and will produce * the desired result. This allows us to avoid assuming that the remote * server has the same OIDs we do for the parameters' types. + */ + if (!PQsendQueryParams(conn, buf.data, numParams, + NULL, values, NULL, NULL, 0)) + pgfdw_report_error(ERROR, NULL, conn, false, buf.data); + + /* + * Get the result, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecParams(conn, buf.data, numParams, NULL, values, - NULL, NULL, 0); + res = pgfdw_get_result(conn, buf.data); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, fsstate->query); PQclear(res); @@ -2868,7 +2892,7 @@ fetch_more_data(ForeignScanState *node) snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", fsstate->fetch_size, fsstate->cursor_number); - res = PQexec(conn, sql); + res = pgfdw_exec_query(conn, sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); @@ -2978,7 +3002,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(conn, sql); + res = pgfdw_exec_query(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -3006,16 +3030,21 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * with the remote server using different type OIDs than we do. All of * the prepared statements we use in this module are simple enough that * the remote server will make the right choices. + */ + if (!PQsendPrepare(fmstate->conn, + p_name, + fmstate->query, + 0, + NULL)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + + /* + * Get the result, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQprepare(fmstate->conn, - p_name, - fmstate->query, - 0, - NULL); - + res = pgfdw_get_result(fmstate->conn, fmstate->query); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); PQclear(res); @@ -3147,12 +3176,18 @@ execute_dml_stmt(ForeignScanState *node) * parameter (see deparse.c), the "inference" is trivial and will produce * the desired result. This allows us to avoid assuming that the remote * server has the same OIDs we do for the parameters' types. + */ + if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams, + NULL, values, NULL, NULL, 0)) + pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query); + + /* + * Get the result, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - dmstate->result = PQexecParams(dmstate->conn, dmstate->query, - numParams, NULL, values, NULL, NULL, 0); + dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query); if (PQresultStatus(dmstate->result) != (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, @@ -3355,7 +3390,7 @@ postgresAnalyzeForeignTable(Relation relation, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = PQexec(conn, sql.data); + res = pgfdw_exec_query(conn, sql.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -3449,7 +3484,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = PQexec(conn, sql.data); + res = pgfdw_exec_query(conn, sql.data); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); PQclear(res); @@ -3500,7 +3535,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u", fetch_size, cursor_number); - res = PQexec(conn, fetch_sql); + res = pgfdw_exec_query(conn, fetch_sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -3675,7 +3710,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); deparseStringLiteral(&buf, stmt->remote_schema); - res = PQexec(conn, buf.data); + res = pgfdw_exec_query(conn, buf.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -3774,7 +3809,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum"); /* Fetch the data */ - res = PQexec(conn, buf.data); + res = pgfdw_exec_query(conn, buf.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); |