aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEtsuro Fujita <efujita@postgresql.org>2023-04-06 17:30:00 +0900
committerEtsuro Fujita <efujita@postgresql.org>2023-04-06 17:30:00 +0900
commit983ec23007bd83a649af9bc823f13feb0da27e0e (patch)
treed16c129d9a721b8ca9001390dc67bd58d072f451
parentb9b125b9c14381c4d04a446e335bb2da5f602596 (diff)
downloadpostgresql-983ec23007bd83a649af9bc823f13feb0da27e0e.tar.gz
postgresql-983ec23007bd83a649af9bc823f13feb0da27e0e.zip
postgres_fdw: Add support for parallel abort.
postgres_fdw aborts remote (sub)transactions opened on remote server(s) in a local (sub)transaction one by one when the local (sub)transaction aborts. This patch allows it to abort the remote (sub)transactions in parallel to improve performance. This is enabled by the server option "parallel_abort". The default is false. Etsuro Fujita, reviewed by David Zhang. Discussion: http://postgr.es/m/CAPmGK15FuPVGx3TGHKShsbPKKtF1y58-ZLcKoxfN-nqLj1dZ%3Dg%40mail.gmail.com
-rw-r--r--contrib/postgres_fdw/connection.c403
-rw-r--r--contrib/postgres_fdw/expected/postgres_fdw.out53
-rw-r--r--contrib/postgres_fdw/option.c2
-rw-r--r--contrib/postgres_fdw/sql/postgres_fdw.sql27
-rw-r--r--doc/src/sgml/postgres-fdw.sgml47
5 files changed, 489 insertions, 43 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 8eb9194506c..2969351e9a9 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -60,6 +60,7 @@ typedef struct ConnCacheEntry
bool have_error; /* have any subxacts aborted in this xact? */
bool changing_xact_state; /* xact state change in process */
bool parallel_commit; /* do we commit (sub)xacts in parallel? */
+ bool parallel_abort; /* do we abort (sub)xacts in parallel? */
bool invalidated; /* true if reconnect is pending */
bool keep_connections; /* setting value of keep_connections
* server option */
@@ -82,6 +83,25 @@ static unsigned int prep_stmt_number = 0;
static bool xact_got_connection = false;
/*
+ * Milliseconds to wait to cancel an in-progress query or execute a cleanup
+ * query; if it takes longer than 30 seconds to do these, we assume the
+ * connection is dead.
+ */
+#define CONNECTION_CLEANUP_TIMEOUT 30000
+
+/* Macro for constructing abort command to be sent */
+#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
+ do { \
+ if (toplevel) \
+ snprintf((sql), sizeof(sql), \
+ "ABORT TRANSACTION"); \
+ else \
+ snprintf((sql), sizeof(sql), \
+ "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
+ (entry)->xact_depth, (entry)->xact_depth); \
+ } while(0)
+
+/*
* SQL functions
*/
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
@@ -107,14 +127,28 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn);
+static bool pgfdw_cancel_query_begin(PGconn *conn);
+static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
+ bool consume_input);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors);
+static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
+static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
+ TimestampTz endtime,
+ bool consume_input,
+ bool ignore_errors);
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
PGresult **result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
+static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
+ List **pending_entries,
+ List **cancel_requested);
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
int curlevel);
+static void pgfdw_finish_abort_cleanup(List *pending_entries,
+ List *cancel_requested,
+ bool toplevel);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
@@ -320,8 +354,8 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
*
* By default, all the connections to any foreign servers are kept open.
*
- * Also determine whether to commit (sub)transactions opened on the remote
- * server in parallel at (sub)transaction end, which is disabled by
+ * Also determine whether to commit/abort (sub)transactions opened on the
+ * remote server in parallel at (sub)transaction end, which is disabled by
* default.
*
* Note: it's enough to determine these only when making a new connection
@@ -330,6 +364,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
*/
entry->keep_connections = true;
entry->parallel_commit = false;
+ entry->parallel_abort = false;
foreach(lc, server->options)
{
DefElem *def = (DefElem *) lfirst(lc);
@@ -338,6 +373,8 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
entry->keep_connections = defGetBoolean(def);
else if (strcmp(def->defname, "parallel_commit") == 0)
entry->parallel_commit = defGetBoolean(def);
+ else if (strcmp(def->defname, "parallel_abort") == 0)
+ entry->parallel_abort = defGetBoolean(def);
}
/* Now try to make the connection */
@@ -892,6 +929,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
HASH_SEQ_STATUS scan;
ConnCacheEntry *entry;
List *pending_entries = NIL;
+ List *cancel_requested = NIL;
/* Quick exit if no connections were touched in this transaction. */
if (!xact_got_connection)
@@ -985,7 +1023,15 @@ pgfdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_ABORT:
/* Rollback all remote transactions during abort */
- pgfdw_abort_cleanup(entry, true);
+ if (entry->parallel_abort)
+ {
+ if (pgfdw_abort_cleanup_begin(entry, true,
+ &pending_entries,
+ &cancel_requested))
+ continue;
+ }
+ else
+ pgfdw_abort_cleanup(entry, true);
break;
}
}
@@ -995,11 +1041,21 @@ pgfdw_xact_callback(XactEvent event, void *arg)
}
/* If there are any pending connections, finish cleaning them up */
- if (pending_entries)
+ if (pending_entries || cancel_requested)
{
- Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
- event == XACT_EVENT_PRE_COMMIT);
- pgfdw_finish_pre_commit_cleanup(pending_entries);
+ if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
+ event == XACT_EVENT_PRE_COMMIT)
+ {
+ Assert(cancel_requested == NIL);
+ pgfdw_finish_pre_commit_cleanup(pending_entries);
+ }
+ else
+ {
+ Assert(event == XACT_EVENT_PARALLEL_ABORT ||
+ event == XACT_EVENT_ABORT);
+ pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
+ true);
+ }
}
/*
@@ -1024,6 +1080,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
ConnCacheEntry *entry;
int curlevel;
List *pending_entries = NIL;
+ List *cancel_requested = NIL;
/* Nothing to do at subxact start, nor after commit. */
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@@ -1078,7 +1135,15 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
else
{
/* Rollback all remote subtransactions during abort */
- pgfdw_abort_cleanup(entry, false);
+ if (entry->parallel_abort)
+ {
+ if (pgfdw_abort_cleanup_begin(entry, false,
+ &pending_entries,
+ &cancel_requested))
+ continue;
+ }
+ else
+ pgfdw_abort_cleanup(entry, false);
}
/* OK, we're outta that level of subtransaction */
@@ -1086,10 +1151,19 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
}
/* If there are any pending connections, finish cleaning them up */
- if (pending_entries)
+ if (pending_entries || cancel_requested)
{
- Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
- pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+ if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
+ {
+ Assert(cancel_requested == NIL);
+ pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+ }
+ else
+ {
+ Assert(event == SUBXACT_EVENT_ABORT_SUB);
+ pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
+ false);
+ }
}
}
@@ -1233,17 +1307,25 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
static bool
pgfdw_cancel_query(PGconn *conn)
{
- PGcancel *cancel;
- char errbuf[256];
- PGresult *result = NULL;
TimestampTz endtime;
- bool timed_out;
/*
* If it takes too long to cancel the query and discard the result, assume
* the connection is dead.
*/
- endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_cancel_query_begin(conn))
+ return false;
+ return pgfdw_cancel_query_end(conn, endtime, false);
+}
+
+static bool
+pgfdw_cancel_query_begin(PGconn *conn)
+{
+ PGcancel *cancel;
+ char errbuf[256];
/*
* Issue cancel request. Unfortunately, there's no good way to limit the
@@ -1263,6 +1345,30 @@ pgfdw_cancel_query(PGconn *conn)
PQfreeCancel(cancel);
}
+ return true;
+}
+
+static bool
+pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
+{
+ PGresult *result = NULL;
+ bool timed_out;
+
+ /*
+ * If requested, consume whatever data is available from the socket. (Note
+ * that if all data is available, this allows pgfdw_get_cleanup_result to
+ * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
+ * which would be large compared to the overhead of PQconsumeInput.)
+ */
+ if (consume_input && !PQconsumeInput(conn))
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not get result of cancel request: %s",
+ pchomp(PQerrorMessage(conn)))));
+ return false;
+ }
+
/* Get and discard the result of the query. */
if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
{
@@ -1297,9 +1403,7 @@ pgfdw_cancel_query(PGconn *conn)
static bool
pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
{
- PGresult *result = NULL;
TimestampTz endtime;
- bool timed_out;
/*
* If it takes too long to execute a cleanup query, assume the connection
@@ -1307,8 +1411,18 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
* place (e.g. statement timeout, user cancel), so the timeout shouldn't
* be too long.
*/
- endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_exec_cleanup_query_begin(conn, query))
+ return false;
+ return pgfdw_exec_cleanup_query_end(conn, query, endtime,
+ false, ignore_errors);
+}
+static bool
+pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
+{
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
@@ -1319,6 +1433,29 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
return false;
}
+ return true;
+}
+
+static bool
+pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
+ TimestampTz endtime, bool consume_input,
+ bool ignore_errors)
+{
+ PGresult *result = NULL;
+ bool timed_out;
+
+ /*
+ * If requested, consume whatever data is available from the socket. (Note
+ * that if all data is available, this allows pgfdw_get_cleanup_result to
+ * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
+ * which would be large compared to the overhead of PQconsumeInput.)
+ */
+ if (consume_input && !PQconsumeInput(conn))
+ {
+ pgfdw_report_error(WARNING, NULL, conn, false, query);
+ return false;
+ }
+
/* Get the result of the query. */
if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
{
@@ -1474,12 +1611,7 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
!pgfdw_cancel_query(entry->conn))
return; /* Unable to cancel running query */
- if (toplevel)
- snprintf(sql, sizeof(sql), "ABORT TRANSACTION");
- else
- snprintf(sql, sizeof(sql),
- "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
- entry->xact_depth, entry->xact_depth);
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
return; /* Unable to abort remote (sub)transaction */
@@ -1509,6 +1641,65 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
}
/*
+ * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
+ * don't wait for the result.
+ *
+ * Returns true if the abort command or cancel request is successfully issued,
+ * false otherwise. If the abort command is successfully issued, the given
+ * connection cache entry is appended to *pending_entries. Othewise, if the
+ * cancel request is successfully issued, it is appended to *cancel_requested.
+ */
+static bool
+pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
+ List **pending_entries, List **cancel_requested)
+{
+ /*
+ * Don't try to clean up the connection if we're already in error
+ * recursion trouble.
+ */
+ if (in_error_recursion_trouble())
+ entry->changing_xact_state = true;
+
+ /*
+ * If connection is already unsalvageable, don't touch it further.
+ */
+ if (entry->changing_xact_state)
+ return false;
+
+ /*
+ * Mark this connection as in the process of changing transaction state.
+ */
+ entry->changing_xact_state = true;
+
+ /* Assume we might have lost track of prepared statements */
+ entry->have_error = true;
+
+ /*
+ * If a command has been submitted to the remote server by using an
+ * asynchronous execution function, the command might not have yet
+ * completed. Check to see if a command is still being processed by the
+ * remote server, and if so, request cancellation of the command.
+ */
+ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+ {
+ if (!pgfdw_cancel_query_begin(entry->conn))
+ return false; /* Unable to cancel running query */
+ *cancel_requested = lappend(*cancel_requested, entry);
+ }
+ else
+ {
+ char sql[100];
+
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+ if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
+ return false; /* Unable to abort remote transaction */
+ *pending_entries = lappend(*pending_entries, entry);
+ }
+
+ return true;
+}
+
+/*
* Finish pre-commit cleanup of connections on each of which we've sent a
* COMMIT command to the remote server.
*/
@@ -1617,6 +1808,168 @@ pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
}
/*
+ * Finish abort cleanup of connections on each of which we've sent an abort
+ * command or cancel request to the remote server.
+ */
+static void
+pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
+ bool toplevel)
+{
+ List *pending_deallocs = NIL;
+ ListCell *lc;
+
+ /*
+ * For each of the pending cancel requests (if any), get and discard the
+ * result of the query, and submit an abort command to the remote server.
+ */
+ if (cancel_requested)
+ {
+ foreach(lc, cancel_requested)
+ {
+ ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+ TimestampTz endtime;
+ char sql[100];
+
+ Assert(entry->changing_xact_state);
+
+ /*
+ * Set end time. You might think we should do this before issuing
+ * cancel request like in normal mode, but that is problematic,
+ * because if, for example, it took longer than 30 seconds to
+ * process the first few entries in the cancel_requested list, it
+ * would cause a timeout error when processing each of the
+ * remaining entries in the list, leading to slamming that entry's
+ * connection shut.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
+ {
+ /* Unable to cancel running query */
+ pgfdw_reset_xact_state(entry, toplevel);
+ continue;
+ }
+
+ /* Send an abort command in parallel if needed */
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+ if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
+ {
+ /* Unable to abort remote (sub)transaction */
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+ else
+ pending_entries = lappend(pending_entries, entry);
+ }
+ }
+
+ /* No further work if no pending entries */
+ if (!pending_entries)
+ return;
+
+ /*
+ * Get the result of the abort command for each of the pending entries
+ */
+ foreach(lc, pending_entries)
+ {
+ ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+ TimestampTz endtime;
+ char sql[100];
+
+ Assert(entry->changing_xact_state);
+
+ /*
+ * Set end time. We do this now, not before issuing the command like
+ * in normal mode, for the same reason as for the cancel_requested
+ * entries.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+ if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
+ true, false))
+ {
+ /* Unable to abort remote (sub)transaction */
+ pgfdw_reset_xact_state(entry, toplevel);
+ continue;
+ }
+
+ if (toplevel)
+ {
+ /* Do a DEALLOCATE ALL in parallel if needed */
+ if (entry->have_prep_stmt && entry->have_error)
+ {
+ if (!pgfdw_exec_cleanup_query_begin(entry->conn,
+ "DEALLOCATE ALL"))
+ {
+ /* Trouble clearing prepared statements */
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+ else
+ pending_deallocs = lappend(pending_deallocs, entry);
+ continue;
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+ }
+
+ /* Reset the per-connection state if needed */
+ if (entry->state.pendingAreq)
+ memset(&entry->state, 0, sizeof(entry->state));
+
+ /* We're done with this entry; unset the changing_xact_state flag */
+ entry->changing_xact_state = false;
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+
+ /* No further work if no pending entries */
+ if (!pending_deallocs)
+ return;
+ Assert(toplevel);
+
+ /*
+ * Get the result of the DEALLOCATE command for each of the pending
+ * entries
+ */
+ foreach(lc, pending_deallocs)
+ {
+ ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+ TimestampTz endtime;
+
+ Assert(entry->changing_xact_state);
+ Assert(entry->have_prep_stmt);
+ Assert(entry->have_error);
+
+ /*
+ * Set end time. We do this now, not before issuing the command like
+ * in normal mode, for the same reason as for the cancel_requested
+ * entries.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
+ endtime, true, true))
+ {
+ /* Trouble clearing prepared statements */
+ pgfdw_reset_xact_state(entry, toplevel);
+ continue;
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+
+ /* Reset the per-connection state if needed */
+ if (entry->state.pendingAreq)
+ memset(&entry->state, 0, sizeof(entry->state));
+
+ /* We're done with this entry; unset the changing_xact_state flag */
+ entry->changing_xact_state = false;
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+}
+
+/*
* List active foreign server connections.
*
* This function takes no input parameter and returns setof record made of
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 04a3ef450cf..8f6a04f71b6 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -11683,10 +11683,12 @@ SELECT count(*) FROM remote_application_name
DROP FOREIGN TABLE remote_application_name;
DROP VIEW my_application_name;
-- ===================================================================
--- test parallel commit
+-- test parallel commit and parallel abort
-- ===================================================================
ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true');
+ALTER SERVER loopback OPTIONS (ADD parallel_abort 'true');
ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true');
+ALTER SERVER loopback2 OPTIONS (ADD parallel_abort 'true');
CREATE TABLE ploc1 (f1 int, f2 text);
CREATE FOREIGN TABLE prem1 (f1 int, f2 text)
SERVER loopback OPTIONS (table_name 'ploc1');
@@ -11756,8 +11758,57 @@ SELECT * FROM prem2;
204 | quxqux
(3 rows)
+BEGIN;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ABORT;
+SELECT * FROM prem1;
+ f1 | f2
+-----+--------
+ 101 | foo
+ 102 | foofoo
+ 104 | bazbaz
+(3 rows)
+
+SELECT * FROM prem2;
+ f1 | f2
+-----+--------
+ 201 | bar
+ 202 | barbar
+ 204 | quxqux
+(3 rows)
+
+-- This tests executing DEALLOCATE ALL against foreign servers in parallel
+-- during post-abort
+BEGIN;
+SAVEPOINT s;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ROLLBACK TO SAVEPOINT s;
+RELEASE SAVEPOINT s;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ABORT;
+SELECT * FROM prem1;
+ f1 | f2
+-----+--------
+ 101 | foo
+ 102 | foofoo
+ 104 | bazbaz
+(3 rows)
+
+SELECT * FROM prem2;
+ f1 | f2
+-----+--------
+ 201 | bar
+ 202 | barbar
+ 204 | quxqux
+(3 rows)
+
ALTER SERVER loopback OPTIONS (DROP parallel_commit);
+ALTER SERVER loopback OPTIONS (DROP parallel_abort);
ALTER SERVER loopback2 OPTIONS (DROP parallel_commit);
+ALTER SERVER loopback2 OPTIONS (DROP parallel_abort);
-- ===================================================================
-- test for ANALYZE sampling
-- ===================================================================
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index d530f7d0860..4229d2048c3 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -125,6 +125,7 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
strcmp(def->defname, "truncatable") == 0 ||
strcmp(def->defname, "async_capable") == 0 ||
strcmp(def->defname, "parallel_commit") == 0 ||
+ strcmp(def->defname, "parallel_abort") == 0 ||
strcmp(def->defname, "keep_connections") == 0)
{
/* these accept only boolean values */
@@ -271,6 +272,7 @@ InitPgFdwOptions(void)
{"async_capable", ForeignServerRelationId, false},
{"async_capable", ForeignTableRelationId, false},
{"parallel_commit", ForeignServerRelationId, false},
+ {"parallel_abort", ForeignServerRelationId, false},
{"keep_connections", ForeignServerRelationId, false},
{"password_required", UserMappingRelationId, false},
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 4f3088c03ea..5bd69339dfc 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -3899,10 +3899,12 @@ DROP FOREIGN TABLE remote_application_name;
DROP VIEW my_application_name;
-- ===================================================================
--- test parallel commit
+-- test parallel commit and parallel abort
-- ===================================================================
ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true');
+ALTER SERVER loopback OPTIONS (ADD parallel_abort 'true');
ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true');
+ALTER SERVER loopback2 OPTIONS (ADD parallel_abort 'true');
CREATE TABLE ploc1 (f1 int, f2 text);
CREATE FOREIGN TABLE prem1 (f1 int, f2 text)
@@ -3941,8 +3943,31 @@ COMMIT;
SELECT * FROM prem1;
SELECT * FROM prem2;
+BEGIN;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ABORT;
+SELECT * FROM prem1;
+SELECT * FROM prem2;
+
+-- This tests executing DEALLOCATE ALL against foreign servers in parallel
+-- during post-abort
+BEGIN;
+SAVEPOINT s;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ROLLBACK TO SAVEPOINT s;
+RELEASE SAVEPOINT s;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ABORT;
+SELECT * FROM prem1;
+SELECT * FROM prem2;
+
ALTER SERVER loopback OPTIONS (DROP parallel_commit);
+ALTER SERVER loopback OPTIONS (DROP parallel_abort);
ALTER SERVER loopback2 OPTIONS (DROP parallel_commit);
+ALTER SERVER loopback2 OPTIONS (DROP parallel_abort);
-- ===================================================================
-- test for ANALYZE sampling
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index d43ea71407f..9e66987cf7f 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -510,12 +510,13 @@ OPTIONS (ADD password_required 'false');
corresponding remote transactions, and subtransactions are managed by
creating corresponding remote subtransactions. When multiple remote
transactions are involved in the current local transaction, by default
- <filename>postgres_fdw</filename> commits those remote transactions
- serially when the local transaction is committed. When multiple remote
- subtransactions are involved in the current local subtransaction, by
- default <filename>postgres_fdw</filename> commits those remote
- subtransactions serially when the local subtransaction is committed.
- Performance can be improved with the following option:
+ <filename>postgres_fdw</filename> commits or aborts those remote
+ transactions serially when the local transaction is committed or aborted.
+ When multiple remote subtransactions are involved in the current local
+ subtransaction, by default <filename>postgres_fdw</filename> commits or
+ aborts those remote subtransactions serially when the local subtransaction
+ is committed or abortd.
+ Performance can be improved with the following options:
</para>
<variablelist>
@@ -531,24 +532,38 @@ OPTIONS (ADD password_required 'false');
specified for foreign servers, not per-table. The default is
<literal>false</literal>.
</para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><literal>parallel_abort</literal> (<type>boolean</type>)</term>
+ <listitem>
<para>
- If multiple foreign servers with this option enabled are involved in a
- local transaction, multiple remote transactions on those foreign
- servers are committed in parallel across those foreign servers when
- the local transaction is committed.
- </para>
-
- <para>
- When this option is enabled, a foreign server with many remote
- transactions may see a negative performance impact when the local
- transaction is committed.
+ This option controls whether <filename>postgres_fdw</filename> aborts
+ in parallel remote transactions opened on a foreign server in a local
+ transaction when the local transaction is aborted. This setting also
+ applies to remote and local subtransactions. This option can only be
+ specified for foreign servers, not per-table. The default is
+ <literal>false</literal>.
</para>
</listitem>
</varlistentry>
</variablelist>
+ <para>
+ If multiple foreign servers with these options enabled are involved in a
+ local transaction, multiple remote transactions on those foreign servers
+ are committed or aborted in parallel across those foreign servers when
+ the local transaction is committed or aborted.
+ </para>
+
+ <para>
+ When these options are enabled, a foreign server with many remote
+ transactions may see a negative performance impact when the local
+ transaction is committed or aborted.
+ </para>
+
</sect3>
<sect3 id="postgres-fdw-options-updatability">