diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2013-03-10 14:14:53 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2013-03-10 14:16:02 -0400 |
commit | 21734d2fb896e0ecdddd3251caa72a3576e2d415 (patch) | |
tree | aed4ee5509e618c0fd9746c8be17c5bf23a08a3f /contrib/postgres_fdw/connection.c | |
parent | 7f49a67f954db3e92fd96963169fb8302959576e (diff) | |
download | postgresql-21734d2fb896e0ecdddd3251caa72a3576e2d415.tar.gz postgresql-21734d2fb896e0ecdddd3251caa72a3576e2d415.zip |
Support writable foreign tables.
This patch adds the core-system infrastructure needed to support updates
on foreign tables, and extends contrib/postgres_fdw to allow updates
against remote Postgres servers. There's still a great deal of room for
improvement in optimization of remote updates, but at least there's basic
functionality there now.
KaiGai Kohei, reviewed by Alexander Korotkov and Laurenz Albe, and rather
heavily revised by Tom Lane.
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r-- | contrib/postgres_fdw/connection.c | 67 |
1 files changed, 65 insertions, 2 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 32a3138ce0f..22ac50e6f9f 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -47,6 +47,8 @@ typedef struct ConnCacheEntry PGconn *conn; /* connection to foreign server, or NULL */ int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 = * one level of subxact open, etc */ + bool have_prep_stmt; /* have we prepared any stmts in this xact? */ + bool have_error; /* have any subxacts aborted in this xact? */ } ConnCacheEntry; /* @@ -54,8 +56,9 @@ typedef struct ConnCacheEntry */ static HTAB *ConnectionHash = NULL; -/* for assigning cursor numbers */ +/* for assigning cursor numbers and prepared statement numbers */ static unsigned int cursor_number = 0; +static unsigned int prep_stmt_number = 0; /* tracks whether any work is needed in callback functions */ static bool xact_got_connection = false; @@ -78,6 +81,10 @@ static void pgfdw_subxact_callback(SubXactEvent event, * if we don't already have a suitable one, and a transaction is opened at * the right subtransaction nesting depth if we didn't do that already. * + * will_prep_stmt must be true if caller intends to create any prepared + * statements. Since those don't go away automatically at transaction end + * (not even on error), we need this flag to cue manual cleanup. + * * XXX Note that caching connections theoretically requires a mechanism to * detect change of FDW objects to invalidate already established connections. * We could manage that by watching for invalidation events on the relevant @@ -86,7 +93,8 @@ static void pgfdw_subxact_callback(SubXactEvent event, * mid-transaction anyway. */ PGconn * -GetConnection(ForeignServer *server, UserMapping *user) +GetConnection(ForeignServer *server, UserMapping *user, + bool will_prep_stmt) { bool found; ConnCacheEntry *entry; @@ -131,6 +139,8 @@ GetConnection(ForeignServer *server, UserMapping *user) /* initialize new hashtable entry (key is already filled in) */ entry->conn = NULL; entry->xact_depth = 0; + entry->have_prep_stmt = false; + entry->have_error = false; } /* @@ -147,6 +157,8 @@ GetConnection(ForeignServer *server, UserMapping *user) if (entry->conn == NULL) { entry->xact_depth = 0; /* just to be sure */ + entry->have_prep_stmt = false; + entry->have_error = false; entry->conn = connect_pg_server(server, user); elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"", entry->conn, server->servername); @@ -157,6 +169,9 @@ GetConnection(ForeignServer *server, UserMapping *user) */ begin_remote_xact(entry); + /* Remember if caller will prepare statements */ + entry->have_prep_stmt |= will_prep_stmt; + return entry->conn; } @@ -394,12 +409,30 @@ GetCursorNumber(PGconn *conn) } /* + * Assign a "unique" number for a prepared statement. + * + * This works much like GetCursorNumber, except that we never reset the counter + * within a session. That's because we can't be 100% sure we've gotten rid + * of all prepared statements on all connections, and it's not really worth + * increasing the risk of prepared-statement name collisions by resetting. + */ +unsigned int +GetPrepStmtNumber(PGconn *conn) +{ + return ++prep_stmt_number; +} + +/* * Report an error we got from the remote server. * * elevel: error level to use (typically ERROR, but might be less) * res: PGresult containing the error * clear: if true, PQclear the result (otherwise caller will handle it) * sql: NULL, or text of remote command we tried to execute + * + * Note: callers that choose not to throw ERROR for a remote error are + * responsible for making sure that the associated ConnCacheEntry gets + * marked with have_error = true. */ void pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql) @@ -480,6 +513,22 @@ pgfdw_xact_callback(XactEvent event, void *arg) if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, true, "COMMIT TRANSACTION"); PQclear(res); + + /* + * If there were any errors in subtransactions, and we made + * prepared statements, do a DEALLOCATE ALL to make sure we + * get rid of all prepared statements. This is annoying and + * not terribly bulletproof, but it's probably not worth + * trying harder. We intentionally ignore any errors in the + * DEALLOCATE. + */ + if (entry->have_prep_stmt && entry->have_error) + { + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + entry->have_prep_stmt = false; + entry->have_error = false; break; case XACT_EVENT_PRE_PREPARE: @@ -502,6 +551,8 @@ pgfdw_xact_callback(XactEvent event, void *arg) elog(ERROR, "missed cleaning up connection during pre-commit"); break; case XACT_EVENT_ABORT: + /* Assume we might have lost track of prepared statements */ + entry->have_error = true; /* If we're aborting, abort all remote transactions too */ res = PQexec(entry->conn, "ABORT TRANSACTION"); /* Note: can't throw ERROR, it would be infinite loop */ @@ -509,7 +560,17 @@ pgfdw_xact_callback(XactEvent event, void *arg) pgfdw_report_error(WARNING, res, true, "ABORT TRANSACTION"); else + { PQclear(res); + /* As above, make sure we've cleared any prepared stmts */ + if (entry->have_prep_stmt && entry->have_error) + { + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + entry->have_prep_stmt = false; + entry->have_error = false; + } break; } @@ -593,6 +654,8 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, } else { + /* Assume we might have lost track of prepared statements */ + entry->have_error = true; /* Rollback all remote subtransactions during abort */ snprintf(sql, sizeof(sql), "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", |