aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/dblink/dblink.c21
-rw-r--r--contrib/postgres_fdw/connection.c47
-rw-r--r--contrib/postgres_fdw/expected/postgres_fdw.out15
-rw-r--r--contrib/postgres_fdw/sql/postgres_fdw.sql7
-rw-r--r--src/include/libpq/libpq-be-fe-helpers.h89
5 files changed, 140 insertions, 39 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index edbc9ab02ac..de858e165ab 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -1347,25 +1347,16 @@ Datum
dblink_cancel_query(PG_FUNCTION_ARGS)
{
PGconn *conn;
- PGcancelConn *cancelConn;
char *msg;
+ TimestampTz endtime;
dblink_init();
conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
- cancelConn = PQcancelCreate(conn);
-
- PG_TRY();
- {
- if (!PQcancelBlocking(cancelConn))
- msg = pchomp(PQcancelErrorMessage(cancelConn));
- else
- msg = "OK";
- }
- PG_FINALLY();
- {
- PQcancelFinish(cancelConn);
- }
- PG_END_TRY();
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ 30000);
+ msg = libpqsrv_cancel(conn, endtime);
+ if (msg == NULL)
+ msg = "OK";
PG_RETURN_TEXT_P(cstring_to_text(msg));
}
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4931ebf5915..2532e453c4e 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn);
-static bool pgfdw_cancel_query_begin(PGconn *conn);
+static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
bool consume_input);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
@@ -1315,36 +1315,31 @@ pgfdw_cancel_query(PGconn *conn)
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
CONNECTION_CLEANUP_TIMEOUT);
- if (!pgfdw_cancel_query_begin(conn))
+ if (!pgfdw_cancel_query_begin(conn, endtime))
return false;
return pgfdw_cancel_query_end(conn, endtime, false);
}
+/*
+ * Submit a cancel request to the given connection, waiting only until
+ * the given time.
+ *
+ * We sleep interruptibly until we receive confirmation that the cancel
+ * request has been accepted, and if it is, return true; if the timeout
+ * lapses without that, or the request fails for whatever reason, return
+ * false.
+ */
static bool
-pgfdw_cancel_query_begin(PGconn *conn)
+pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
{
- PGcancel *cancel;
- char errbuf[256];
+ char *errormsg = libpqsrv_cancel(conn, endtime);
- /*
- * Issue cancel request. Unfortunately, there's no good way to limit the
- * amount of time that we might block inside PQgetCancel().
- */
- if ((cancel = PQgetCancel(conn)))
- {
- if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
- {
- ereport(WARNING,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("could not send cancel request: %s",
- errbuf)));
- PQfreeCancel(cancel);
- return false;
- }
- PQfreeCancel(cancel);
- }
+ if (errormsg != NULL)
+ ereport(WARNING,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not send cancel request: %s", errormsg));
- return true;
+ return errormsg == NULL;
}
static bool
@@ -1685,7 +1680,11 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
*/
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
{
- if (!pgfdw_cancel_query_begin(entry->conn))
+ TimestampTz endtime;
+
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+ if (!pgfdw_cancel_query_begin(entry->conn, endtime))
return false; /* Unable to cancel running query */
*cancel_requested = lappend(*cancel_requested, entry);
}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 3f0110c52b9..b7af86d3511 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -2739,6 +2739,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
(10 rows)
ALTER VIEW v4 OWNER TO regress_view_owner;
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+ QUERY PLAN
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Foreign Scan
+ Output: (count(*))
+ Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5))
+ Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE))
+(4 rows)
+
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+ERROR: canceling statement due to statement timeout
+RESET statement_timeout;
-- ====================================================================
-- Check that userid to use when querying the remote table is correctly
-- propagated into foreign rels present in subqueries under an UNION ALL
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 5fffc4c53bd..6e1c819159c 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -737,6 +737,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
ALTER VIEW v4 OWNER TO regress_view_owner;
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+RESET statement_timeout;
+
-- ====================================================================
-- Check that userid to use when querying the remote table is correctly
-- propagated into foreign rels present in subqueries under an UNION ALL
diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h
index 5d33bcf32f7..2adf92030af 100644
--- a/src/include/libpq/libpq-be-fe-helpers.h
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -44,6 +44,8 @@
#include "miscadmin.h"
#include "storage/fd.h"
#include "storage/latch.h"
+#include "utils/timestamp.h"
+#include "utils/wait_event.h"
static inline void libpqsrv_connect_prepare(void);
@@ -365,4 +367,91 @@ libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
return PQgetResult(conn);
}
+/*
+ * Submit a cancel request to the given connection, waiting only until
+ * the given time.
+ *
+ * We sleep interruptibly until we receive confirmation that the cancel
+ * request has been accepted, and if it is, return NULL; if the cancel
+ * request fails, return an error message string (which is not to be
+ * freed).
+ *
+ * For other problems (to wit: OOM when strdup'ing an error message from
+ * libpq), this function can ereport(ERROR).
+ *
+ * Note: this function leaks a string's worth of memory when reporting
+ * libpq errors. Make sure to call it in a transient memory context.
+ */
+static inline char *
+libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
+{
+ PGcancelConn *cancel_conn;
+ char *error = NULL;
+
+ cancel_conn = PQcancelCreate(conn);
+ if (cancel_conn == NULL)
+ return _("out of memory");
+
+ /* In what follows, do not leak any PGcancelConn on any errors. */
+
+ PG_TRY();
+ {
+ if (!PQcancelStart(cancel_conn))
+ {
+ error = pchomp(PQcancelErrorMessage(cancel_conn));
+ goto exit;
+ }
+
+ for (;;)
+ {
+ PostgresPollingStatusType pollres;
+ TimestampTz now;
+ long cur_timeout;
+ int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+ pollres = PQcancelPoll(cancel_conn);
+ if (pollres == PGRES_POLLING_OK)
+ break; /* success! */
+
+ /* If timeout has expired, give up, else get sleep time. */
+ now = GetCurrentTimestamp();
+ cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
+ if (cur_timeout <= 0)
+ {
+ error = _("cancel request timed out");
+ break;
+ }
+
+ switch (pollres)
+ {
+ case PGRES_POLLING_READING:
+ waitEvents |= WL_SOCKET_READABLE;
+ break;
+ case PGRES_POLLING_WRITING:
+ waitEvents |= WL_SOCKET_WRITEABLE;
+ break;
+ default:
+ error = pchomp(PQcancelErrorMessage(cancel_conn));
+ goto exit;
+ }
+
+ /* Sleep until there's something to do */
+ WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
+ cur_timeout, PG_WAIT_CLIENT);
+
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+exit: ;
+ }
+ PG_FINALLY();
+ {
+ PQcancelFinish(cancel_conn);
+ }
+ PG_END_TRY();
+
+ return error;
+}
+
#endif /* LIBPQ_BE_FE_HELPERS_H */