aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r--contrib/postgres_fdw/connection.c67
1 files changed, 65 insertions, 2 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 32a3138ce0f..22ac50e6f9f 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -47,6 +47,8 @@ typedef struct ConnCacheEntry
PGconn *conn; /* connection to foreign server, or NULL */
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
* one level of subxact open, etc */
+ bool have_prep_stmt; /* have we prepared any stmts in this xact? */
+ bool have_error; /* have any subxacts aborted in this xact? */
} ConnCacheEntry;
/*
@@ -54,8 +56,9 @@ typedef struct ConnCacheEntry
*/
static HTAB *ConnectionHash = NULL;
-/* for assigning cursor numbers */
+/* for assigning cursor numbers and prepared statement numbers */
static unsigned int cursor_number = 0;
+static unsigned int prep_stmt_number = 0;
/* tracks whether any work is needed in callback functions */
static bool xact_got_connection = false;
@@ -78,6 +81,10 @@ static void pgfdw_subxact_callback(SubXactEvent event,
* if we don't already have a suitable one, and a transaction is opened at
* the right subtransaction nesting depth if we didn't do that already.
*
+ * will_prep_stmt must be true if caller intends to create any prepared
+ * statements. Since those don't go away automatically at transaction end
+ * (not even on error), we need this flag to cue manual cleanup.
+ *
* XXX Note that caching connections theoretically requires a mechanism to
* detect change of FDW objects to invalidate already established connections.
* We could manage that by watching for invalidation events on the relevant
@@ -86,7 +93,8 @@ static void pgfdw_subxact_callback(SubXactEvent event,
* mid-transaction anyway.
*/
PGconn *
-GetConnection(ForeignServer *server, UserMapping *user)
+GetConnection(ForeignServer *server, UserMapping *user,
+ bool will_prep_stmt)
{
bool found;
ConnCacheEntry *entry;
@@ -131,6 +139,8 @@ GetConnection(ForeignServer *server, UserMapping *user)
/* initialize new hashtable entry (key is already filled in) */
entry->conn = NULL;
entry->xact_depth = 0;
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
}
/*
@@ -147,6 +157,8 @@ GetConnection(ForeignServer *server, UserMapping *user)
if (entry->conn == NULL)
{
entry->xact_depth = 0; /* just to be sure */
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
entry->conn = connect_pg_server(server, user);
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
entry->conn, server->servername);
@@ -157,6 +169,9 @@ GetConnection(ForeignServer *server, UserMapping *user)
*/
begin_remote_xact(entry);
+ /* Remember if caller will prepare statements */
+ entry->have_prep_stmt |= will_prep_stmt;
+
return entry->conn;
}
@@ -394,12 +409,30 @@ GetCursorNumber(PGconn *conn)
}
/*
+ * Assign a "unique" number for a prepared statement.
+ *
+ * This works much like GetCursorNumber, except that we never reset the counter
+ * within a session. That's because we can't be 100% sure we've gotten rid
+ * of all prepared statements on all connections, and it's not really worth
+ * increasing the risk of prepared-statement name collisions by resetting.
+ */
+unsigned int
+GetPrepStmtNumber(PGconn *conn)
+{
+ return ++prep_stmt_number;
+}
+
+/*
* Report an error we got from the remote server.
*
* elevel: error level to use (typically ERROR, but might be less)
* res: PGresult containing the error
* clear: if true, PQclear the result (otherwise caller will handle it)
* sql: NULL, or text of remote command we tried to execute
+ *
+ * Note: callers that choose not to throw ERROR for a remote error are
+ * responsible for making sure that the associated ConnCacheEntry gets
+ * marked with have_error = true.
*/
void
pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql)
@@ -480,6 +513,22 @@ pgfdw_xact_callback(XactEvent event, void *arg)
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, true, "COMMIT TRANSACTION");
PQclear(res);
+
+ /*
+ * If there were any errors in subtransactions, and we made
+ * prepared statements, do a DEALLOCATE ALL to make sure we
+ * get rid of all prepared statements. This is annoying and
+ * not terribly bulletproof, but it's probably not worth
+ * trying harder. We intentionally ignore any errors in the
+ * DEALLOCATE.
+ */
+ if (entry->have_prep_stmt && entry->have_error)
+ {
+ res = PQexec(entry->conn, "DEALLOCATE ALL");
+ PQclear(res);
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
break;
case XACT_EVENT_PRE_PREPARE:
@@ -502,6 +551,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
elog(ERROR, "missed cleaning up connection during pre-commit");
break;
case XACT_EVENT_ABORT:
+ /* Assume we might have lost track of prepared statements */
+ entry->have_error = true;
/* If we're aborting, abort all remote transactions too */
res = PQexec(entry->conn, "ABORT TRANSACTION");
/* Note: can't throw ERROR, it would be infinite loop */
@@ -509,7 +560,17 @@ pgfdw_xact_callback(XactEvent event, void *arg)
pgfdw_report_error(WARNING, res, true,
"ABORT TRANSACTION");
else
+ {
PQclear(res);
+ /* As above, make sure we've cleared any prepared stmts */
+ if (entry->have_prep_stmt && entry->have_error)
+ {
+ res = PQexec(entry->conn, "DEALLOCATE ALL");
+ PQclear(res);
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+ }
break;
}
@@ -593,6 +654,8 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
}
else
{
+ /* Assume we might have lost track of prepared statements */
+ entry->have_error = true;
/* Rollback all remote subtransactions during abort */
snprintf(sql, sizeof(sql),
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",