about | summary | refs | log | tree | commit | diff
path: root/contrib/postgres_fdw/connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r-- contrib/postgres_fdw/connection.c | 96
1 files changed, 76 insertions, 20 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 2326f391d34..dbee33b37db 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -95,6 +95,13 @@ static uint32 pgfdw_we_get_result = 0;
*/
#define CONNECTION_CLEANUP_TIMEOUT 30000
+/*
+ * Milliseconds to wait before issuing another cancel request. This covers
+ * the race condition where the remote session ignored our cancel request
+ * because it arrived while idle.
+ */
+#define RETRY_CANCEL_TIMEOUT 1000
+
/* Macro for constructing abort command to be sent */
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
do { \
@@ -145,6 +152,7 @@ static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
+ TimestampTz retrycanceltime,
bool consume_input);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors);
@@ -154,6 +162,7 @@ static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
bool consume_input,
bool ignore_errors);
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
+ TimestampTz retrycanceltime,
PGresult **result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
@@ -1322,18 +1331,25 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
static bool
pgfdw_cancel_query(PGconn *conn)
{
+ TimestampTz now = GetCurrentTimestamp();
TimestampTz endtime;
+ TimestampTz retrycanceltime;
/*
* If it takes too long to cancel the query and discard the result, assume
* the connection is dead.
*/
- endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
- CONNECTION_CLEANUP_TIMEOUT);
+ endtime = TimestampTzPlusMilliseconds(now, CONNECTION_CLEANUP_TIMEOUT);
+
+ /*
+ * Also, lose patience and re-issue the cancel request after a little bit.
+ * (This serves to close some race conditions.)
+ */
+ retrycanceltime = TimestampTzPlusMilliseconds(now, RETRY_CANCEL_TIMEOUT);
if (!pgfdw_cancel_query_begin(conn, endtime))
return false;
- return pgfdw_cancel_query_end(conn, endtime, false);
+ return pgfdw_cancel_query_end(conn, endtime, retrycanceltime, false);
}
/*
@@ -1359,9 +1375,10 @@ pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
}
static bool
-pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
+pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
+ TimestampTz retrycanceltime, bool consume_input)
{
- PGresult *result = NULL;
+ PGresult *result;
bool timed_out;
/*
@@ -1380,7 +1397,8 @@ pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
}
/* Get and discard the result of the query. */
- if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
+ if (pgfdw_get_cleanup_result(conn, endtime, retrycanceltime,
+ &result, &timed_out))
{
if (timed_out)
ereport(WARNING,
@@ -1453,7 +1471,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
TimestampTz endtime, bool consume_input,
bool ignore_errors)
{
- PGresult *result = NULL;
+ PGresult *result;
bool timed_out;
Assert(query != NULL);
@@ -1471,7 +1489,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
}
/* Get the result of the query. */
- if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
+ if (pgfdw_get_cleanup_result(conn, endtime, endtime, &result, &timed_out))
{
if (timed_out)
ereport(WARNING,
@@ -1495,28 +1513,36 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
}
/*
- * Get, during abort cleanup, the result of a query that is in progress. This
- * might be a query that is being interrupted by transaction abort, or it might
- * be a query that was initiated as part of transaction abort to get the remote
- * side back to the appropriate state.
+ * Get, during abort cleanup, the result of a query that is in progress.
+ * This might be a query that is being interrupted by a cancel request or by
+ * transaction abort, or it might be a query that was initiated as part of
+ * transaction abort to get the remote side back to the appropriate state.
+ *
+ * endtime is the time at which we should give up and assume the remote side
+ * is dead. retrycanceltime is the time at which we should issue a fresh
+ * cancel request (pass the same value as endtime if this is not wanted).
*
- * endtime is the time at which we should give up and assume the remote
- * side is dead. Returns true if the timeout expired or connection trouble
- * occurred, false otherwise. Sets *result except in case of a timeout.
- * Sets timed_out to true only when the timeout expired.
+ * Returns true if the timeout expired or connection trouble occurred,
+ * false otherwise. Sets *result except in case of a true result.
+ * Sets *timed_out to true only when the timeout expired.
*/
static bool
-pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
+pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
+ TimestampTz retrycanceltime,
+ PGresult **result,
bool *timed_out)
{
volatile bool failed = false;
PGresult *volatile last_res = NULL;
+ *result = NULL;
*timed_out = false;
/* In what follows, do not leak any PGresults on an error. */
PG_TRY();
{
+ int canceldelta = RETRY_CANCEL_TIMEOUT * 2;
+
for (;;)
{
PGresult *res;
@@ -1527,8 +1553,33 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
TimestampTz now = GetCurrentTimestamp();
long cur_timeout;
+ /* If timeout has expired, give up. */
+ if (now >= endtime)
+ {
+ *timed_out = true;
+ failed = true;
+ goto exit;
+ }
+
+ /* If we need to re-issue the cancel request, do that. */
+ if (now >= retrycanceltime)
+ {
+ /* We ignore failure to issue the repeated request. */
+ (void) libpqsrv_cancel(conn, endtime);
+
+ /* Recompute "now" in case that took measurable time. */
+ now = GetCurrentTimestamp();
+
+ /* Adjust re-cancel timeout in increasing steps. */
+ retrycanceltime = TimestampTzPlusMilliseconds(now,
+ canceldelta);
+ canceldelta += canceldelta;
+ }
+
/* If timeout has expired, give up, else get sleep time. */
- cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
+ cur_timeout = TimestampDifferenceMilliseconds(now,
+ Min(endtime,
+ retrycanceltime));
if (cur_timeout <= 0)
{
*timed_out = true;
@@ -1849,7 +1900,9 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
foreach(lc, cancel_requested)
{
ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+ TimestampTz now = GetCurrentTimestamp();
TimestampTz endtime;
+ TimestampTz retrycanceltime;
char sql[100];
Assert(entry->changing_xact_state);
@@ -1863,10 +1916,13 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
* remaining entries in the list, leading to slamming that entry's
* connection shut.
*/
- endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ endtime = TimestampTzPlusMilliseconds(now,
CONNECTION_CLEANUP_TIMEOUT);
+ retrycanceltime = TimestampTzPlusMilliseconds(now,
+ RETRY_CANCEL_TIMEOUT);
- if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
+ if (!pgfdw_cancel_query_end(entry->conn, endtime,
+ retrycanceltime, true))
{
/* Unable to cancel running query */
pgfdw_reset_xact_state(entry, toplevel);