diff options
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r-- | contrib/postgres_fdw/connection.c | 67 |
1 files changed, 65 insertions, 2 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 32a3138ce0f..22ac50e6f9f 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -47,6 +47,8 @@ typedef struct ConnCacheEntry PGconn *conn; /* connection to foreign server, or NULL */ int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 = * one level of subxact open, etc */ + bool have_prep_stmt; /* have we prepared any stmts in this xact? */ + bool have_error; /* have any subxacts aborted in this xact? */ } ConnCacheEntry; /* @@ -54,8 +56,9 @@ typedef struct ConnCacheEntry */ static HTAB *ConnectionHash = NULL; -/* for assigning cursor numbers */ +/* for assigning cursor numbers and prepared statement numbers */ static unsigned int cursor_number = 0; +static unsigned int prep_stmt_number = 0; /* tracks whether any work is needed in callback functions */ static bool xact_got_connection = false; @@ -78,6 +81,10 @@ static void pgfdw_subxact_callback(SubXactEvent event, * if we don't already have a suitable one, and a transaction is opened at * the right subtransaction nesting depth if we didn't do that already. * + * will_prep_stmt must be true if caller intends to create any prepared + * statements. Since those don't go away automatically at transaction end + * (not even on error), we need this flag to cue manual cleanup. + * * XXX Note that caching connections theoretically requires a mechanism to * detect change of FDW objects to invalidate already established connections. * We could manage that by watching for invalidation events on the relevant @@ -86,7 +93,8 @@ static void pgfdw_subxact_callback(SubXactEvent event, * mid-transaction anyway. */ PGconn * -GetConnection(ForeignServer *server, UserMapping *user) +GetConnection(ForeignServer *server, UserMapping *user, + bool will_prep_stmt) { bool found; ConnCacheEntry *entry; @@ -131,6 +139,8 @@ GetConnection(ForeignServer *server, UserMapping *user) /* initialize new hashtable entry (key is already filled in) */ entry->conn = NULL; entry->xact_depth = 0; + entry->have_prep_stmt = false; + entry->have_error = false; } /* @@ -147,6 +157,8 @@ GetConnection(ForeignServer *server, UserMapping *user) if (entry->conn == NULL) { entry->xact_depth = 0; /* just to be sure */ + entry->have_prep_stmt = false; + entry->have_error = false; entry->conn = connect_pg_server(server, user); elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"", entry->conn, server->servername); @@ -157,6 +169,9 @@ GetConnection(ForeignServer *server, UserMapping *user) */ begin_remote_xact(entry); + /* Remember if caller will prepare statements */ + entry->have_prep_stmt |= will_prep_stmt; + return entry->conn; } @@ -394,12 +409,30 @@ GetCursorNumber(PGconn *conn) } /* + * Assign a "unique" number for a prepared statement. + * + * This works much like GetCursorNumber, except that we never reset the counter + * within a session. That's because we can't be 100% sure we've gotten rid + * of all prepared statements on all connections, and it's not really worth + * increasing the risk of prepared-statement name collisions by resetting. + */ +unsigned int +GetPrepStmtNumber(PGconn *conn) +{ + return ++prep_stmt_number; +} + +/* * Report an error we got from the remote server. * * elevel: error level to use (typically ERROR, but might be less) * res: PGresult containing the error * clear: if true, PQclear the result (otherwise caller will handle it) * sql: NULL, or text of remote command we tried to execute + * + * Note: callers that choose not to throw ERROR for a remote error are + * responsible for making sure that the associated ConnCacheEntry gets + * marked with have_error = true. */ void pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql) @@ -480,6 +513,22 @@ pgfdw_xact_callback(XactEvent event, void *arg) if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, true, "COMMIT TRANSACTION"); PQclear(res); + + /* + * If there were any errors in subtransactions, and we made + * prepared statements, do a DEALLOCATE ALL to make sure we + * get rid of all prepared statements. This is annoying and + * not terribly bulletproof, but it's probably not worth + * trying harder. We intentionally ignore any errors in the + * DEALLOCATE. + */ + if (entry->have_prep_stmt && entry->have_error) + { + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + entry->have_prep_stmt = false; + entry->have_error = false; break; case XACT_EVENT_PRE_PREPARE: @@ -502,6 +551,8 @@ pgfdw_xact_callback(XactEvent event, void *arg) elog(ERROR, "missed cleaning up connection during pre-commit"); break; case XACT_EVENT_ABORT: + /* Assume we might have lost track of prepared statements */ + entry->have_error = true; /* If we're aborting, abort all remote transactions too */ res = PQexec(entry->conn, "ABORT TRANSACTION"); /* Note: can't throw ERROR, it would be infinite loop */ @@ -509,7 +560,17 @@ pgfdw_xact_callback(XactEvent event, void *arg) pgfdw_report_error(WARNING, res, true, "ABORT TRANSACTION"); else + { PQclear(res); + /* As above, make sure we've cleared any prepared stmts */ + if (entry->have_prep_stmt && entry->have_error) + { + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + entry->have_prep_stmt = false; + entry->have_error = false; + } break; } @@ -593,6 +654,8 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, } else { + /* Assume we might have lost track of prepared statements */ + entry->have_error = true; /* Rollback all remote subtransactions during abort */ snprintf(sql, sizeof(sql), "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", |