diff options
Diffstat (limited to 'contrib/postgres_fdw')
-rw-r--r-- | contrib/postgres_fdw/connection.c | 99 | ||||
-rw-r--r-- | contrib/postgres_fdw/expected/postgres_fdw.out | 134 | ||||
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 62 | ||||
-rw-r--r-- | contrib/postgres_fdw/sql/postgres_fdw.sql | 78 |
4 files changed, 338 insertions, 35 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 304f3c20f83..caf14462696 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -58,6 +58,7 @@ typedef struct ConnCacheEntry /* Remaining fields are invalid when conn is NULL: */ int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 = * one level of subxact open, etc */ + bool xact_read_only; /* xact r/o state */ bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state change in process */ @@ -84,6 +85,12 @@ static unsigned int prep_stmt_number = 0; /* tracks whether any work is needed in callback functions */ static bool xact_got_connection = false; +/* + * tracks the nesting level of the topmost read-only transaction determined + * by GetTopReadOnlyTransactionNestLevel() + */ +static int top_read_only_level = 0; + /* custom wait event values, retrieved from shared memory */ static uint32 pgfdw_we_cleanup_result = 0; static uint32 pgfdw_we_connect = 0; @@ -372,6 +379,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) /* Reset all transient state fields, to be sure all are clean */ entry->xact_depth = 0; + entry->xact_read_only = false; entry->have_prep_stmt = false; entry->have_error = false; entry->changing_xact_state = false; @@ -843,29 +851,81 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input) * those scans. A disadvantage is that we can't provide sane emulation of * READ COMMITTED behavior --- it would be nice if we had some other way to * control which remote queries share a snapshot. + * + * Note also that we always start the remote transaction with the same + * read/write and deferrable properties as the local transaction, and start + * the remote subtransaction with the same read/write property as the local + * subtransaction. */ static void begin_remote_xact(ConnCacheEntry *entry) { int curlevel = GetCurrentTransactionNestLevel(); - /* Start main transaction if we haven't yet */ + /* + * Set the nesting level of the topmost read-only transaction if the + * current transaction is read-only and we haven't yet. Once it's set, + * it's retained until that transaction is committed/aborted, and then + * reset (see pgfdw_xact_callback and pgfdw_subxact_callback). + */ + if (XactReadOnly) + { + if (top_read_only_level == 0) + top_read_only_level = GetTopReadOnlyTransactionNestLevel(); + Assert(top_read_only_level > 0); + } + else + Assert(top_read_only_level == 0); + + /* + * Start main transaction if we haven't yet; otherwise, change the + * already-started remote transaction/subtransaction to read-only if the + * local transaction/subtransaction have been done so after starting them + * and we haven't yet. + */ if (entry->xact_depth <= 0) { - const char *sql; + StringInfoData sql; + bool ro = (top_read_only_level == 1); elog(DEBUG3, "starting remote transaction on connection %p", entry->conn); + initStringInfo(&sql); + appendStringInfoString(&sql, "START TRANSACTION ISOLATION LEVEL "); if (IsolationIsSerializable()) - sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; + appendStringInfoString(&sql, "SERIALIZABLE"); else - sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ"; + appendStringInfoString(&sql, "REPEATABLE READ"); + if (ro) + appendStringInfoString(&sql, " READ ONLY"); + if (XactDeferrable) + appendStringInfoString(&sql, " DEFERRABLE"); entry->changing_xact_state = true; - do_sql_command(entry->conn, sql); + do_sql_command(entry->conn, sql.data); entry->xact_depth = 1; + if (ro) + { + Assert(!entry->xact_read_only); + entry->xact_read_only = true; + } entry->changing_xact_state = false; } + else if (!entry->xact_read_only) + { + Assert(top_read_only_level == 0 || + entry->xact_depth <= top_read_only_level); + if (entry->xact_depth == top_read_only_level) + { + entry->changing_xact_state = true; + do_sql_command(entry->conn, "SET transaction_read_only = on"); + entry->xact_read_only = true; + entry->changing_xact_state = false; + } + } + else + Assert(top_read_only_level > 0 && + entry->xact_depth >= top_read_only_level); /* * If we're in a subtransaction, stack up savepoints to match our level. @@ -874,12 +934,21 @@ begin_remote_xact(ConnCacheEntry *entry) */ while (entry->xact_depth < curlevel) { - char sql[64]; + StringInfoData sql; + bool ro = (entry->xact_depth + 1 == top_read_only_level); - snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1); + initStringInfo(&sql); + appendStringInfo(&sql, "SAVEPOINT s%d", entry->xact_depth + 1); + if (ro) + appendStringInfoString(&sql, "; SET transaction_read_only = on"); entry->changing_xact_state = true; - do_sql_command(entry->conn, sql); + do_sql_command(entry->conn, sql.data); entry->xact_depth++; + if (ro) + { + Assert(!entry->xact_read_only); + entry->xact_read_only = true; + } entry->changing_xact_state = false; } } @@ -1174,6 +1243,9 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Also reset cursor numbering for next transaction */ cursor_number = 0; + + /* Likewise for top_read_only_level */ + top_read_only_level = 0; } /* @@ -1272,6 +1344,10 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, false); } } + + /* If in the topmost read-only transaction, reset top_read_only_level */ + if (curlevel == top_read_only_level) + top_read_only_level = 0; } /* @@ -1374,6 +1450,9 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel) /* Reset state to show we're out of a transaction */ entry->xact_depth = 0; + /* Reset xact r/o state */ + entry->xact_read_only = false; + /* * If the connection isn't in a good idle state, it is marked as * invalid or keep_connections option of its server is disabled, then @@ -1394,6 +1473,10 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel) { /* Reset state to show we're out of a subtransaction */ entry->xact_depth--; + + /* If in the topmost read-only transaction, reset xact r/o state */ + if (entry->xact_depth + 1 == top_read_only_level) + entry->xact_read_only = false; } } diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 2185b42bb4f..eb4716bed81 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -12384,6 +12384,140 @@ SELECT count(*) FROM remote_application_name DROP FOREIGN TABLE remote_application_name; DROP VIEW my_application_name; -- =================================================================== +-- test read-only and/or deferrable transactions +-- =================================================================== +CREATE TABLE loct (f1 int, f2 text); +CREATE FUNCTION locf() RETURNS SETOF loct LANGUAGE SQL AS + 'UPDATE public.loct SET f2 = f2 || f2 RETURNING *'; +CREATE VIEW locv AS SELECT t.* FROM locf() t; +CREATE FOREIGN TABLE remt (f1 int, f2 text) + SERVER loopback OPTIONS (table_name 'locv'); +CREATE FOREIGN TABLE remt2 (f1 int, f2 text) + SERVER loopback2 OPTIONS (table_name 'locv'); +INSERT INTO loct VALUES (1, 'foo'), (2, 'bar'); +START TRANSACTION READ ONLY; +SAVEPOINT s; +SELECT * FROM remt; -- should fail +ERROR: cannot execute UPDATE in a read-only transaction +CONTEXT: SQL function "locf" statement 1 +remote SQL command: SELECT f1, f2 FROM public.locv +ROLLBACK TO s; +RELEASE SAVEPOINT s; +SELECT * FROM remt; -- should fail +ERROR: cannot execute UPDATE in a read-only transaction +CONTEXT: SQL function "locf" statement 1 +remote SQL command: SELECT f1, f2 FROM public.locv +ROLLBACK; +START TRANSACTION; +SAVEPOINT s; +SET transaction_read_only = on; +SELECT * FROM remt; -- should fail +ERROR: cannot execute UPDATE in a read-only transaction +CONTEXT: SQL function "locf" statement 1 +remote SQL command: SELECT f1, f2 FROM public.locv +ROLLBACK TO s; +RELEASE SAVEPOINT s; +SET transaction_read_only = on; +SELECT * FROM remt; -- should fail +ERROR: cannot execute UPDATE in a read-only transaction +CONTEXT: SQL function "locf" statement 1 +remote SQL command: SELECT f1, f2 FROM public.locv +ROLLBACK; +START TRANSACTION; +SAVEPOINT s; +SELECT * FROM remt; -- should work + f1 | f2 +----+-------- + 1 | foofoo + 2 | barbar +(2 rows) + +SET transaction_read_only = on; +SELECT * FROM remt; -- should fail +ERROR: cannot execute UPDATE in a read-only transaction +CONTEXT: SQL function "locf" statement 1 +remote SQL command: SELECT f1, f2 FROM public.locv +ROLLBACK TO s; +RELEASE SAVEPOINT s; +SELECT * FROM remt; -- should work + f1 | f2 +----+-------- + 1 | foofoo + 2 | barbar +(2 rows) + +SET transaction_read_only = on; +SELECT * FROM remt; -- should fail +ERROR: cannot execute UPDATE in a read-only transaction +CONTEXT: SQL function "locf" statement 1 +remote SQL command: SELECT f1, f2 FROM public.locv +ROLLBACK; +START TRANSACTION; +SAVEPOINT s; +SELECT * FROM remt; -- should work + f1 | f2 +----+-------- + 1 | foofoo + 2 | barbar +(2 rows) + +SET transaction_read_only = on; +SELECT * FROM remt2; -- should fail +ERROR: cannot execute UPDATE in a read-only transaction +CONTEXT: SQL function "locf" statement 1 +remote SQL command: SELECT f1, f2 FROM public.locv +ROLLBACK TO s; +RELEASE SAVEPOINT s; +SELECT * FROM remt; -- should work + f1 | f2 +----+-------- + 1 | foofoo + 2 | barbar +(2 rows) + +SET transaction_read_only = on; +SELECT * FROM remt2; -- should fail +ERROR: cannot execute UPDATE in a read-only transaction +CONTEXT: SQL function "locf" statement 1 +remote SQL command: SELECT f1, f2 FROM public.locv +ROLLBACK; +DROP FOREIGN TABLE remt; +CREATE FOREIGN TABLE remt (f1 int, f2 text) + SERVER loopback OPTIONS (table_name 'loct'); +START TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY; +SELECT * FROM remt; + f1 | f2 +----+----- + 1 | foo + 2 | bar +(2 rows) + +COMMIT; +START TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE; +SELECT * FROM remt; + f1 | f2 +----+----- + 1 | foo + 2 | bar +(2 rows) + +COMMIT; +START TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE; +SELECT * FROM remt; + f1 | f2 +----+----- + 1 | foo + 2 | bar +(2 rows) + +COMMIT; +-- Clean up +DROP FOREIGN TABLE remt; +DROP FOREIGN TABLE remt2; +DROP VIEW locv; +DROP FUNCTION locf(); +DROP TABLE loct; +-- =================================================================== -- test parallel commit and parallel abort -- =================================================================== ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true'); diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 331f3fc088d..4283ce9f962 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -240,6 +240,7 @@ typedef struct PgFdwDirectModifyState PGresult *result; /* result for query */ int num_tuples; /* # of result tuples */ int next_tuple; /* index of next one to return */ + MemoryContextCallback result_cb; /* ensures result will get freed */ Relation resultRel; /* relcache entry for the target relation */ AttrNumber *attnoMap; /* array of attnums of input user columns */ AttrNumber ctidAttno; /* attnum of input ctid column */ @@ -2671,6 +2672,17 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) node->fdw_state = dmstate; /* + * We use a memory context callback to ensure that the dmstate's PGresult + * (if any) will be released, even if the query fails somewhere that's + * outside our control. The callback is always armed for the duration of + * the query; this relies on PQclear(NULL) being a no-op. + */ + dmstate->result_cb.func = (MemoryContextCallbackFunction) PQclear; + dmstate->result_cb.arg = NULL; + MemoryContextRegisterResetCallback(CurrentMemoryContext, + &dmstate->result_cb); + + /* * Identify which user to do the remote access as. This should match what * ExecCheckPermissions() does. */ @@ -2817,7 +2829,13 @@ postgresEndDirectModify(ForeignScanState *node) return; /* Release PGresult */ - PQclear(dmstate->result); + if (dmstate->result) + { + PQclear(dmstate->result); + dmstate->result = NULL; + /* ... and don't forget to disable the callback */ + dmstate->result_cb.arg = NULL; + } /* Release remote connection */ ReleaseConnection(dmstate->conn); @@ -4591,13 +4609,17 @@ execute_dml_stmt(ForeignScanState *node) /* * Get the result, and check for success. * - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. + * We use a memory context callback to ensure that the PGresult will be + * released, even if the query fails somewhere that's outside our control. + * The callback is already registered, just need to fill in its arg. */ + Assert(dmstate->result == NULL); dmstate->result = pgfdw_get_result(dmstate->conn); + dmstate->result_cb.arg = dmstate->result; + if (PQresultStatus(dmstate->result) != (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, + pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, false, dmstate->query); /* Get the number of rows affected. */ @@ -4641,30 +4663,16 @@ get_returning_data(ForeignScanState *node) } else { - /* - * On error, be sure to release the PGresult on the way out. Callers - * do not have PG_TRY blocks to ensure this happens. - */ - PG_TRY(); - { - HeapTuple newtup; - - newtup = make_tuple_from_result_row(dmstate->result, - dmstate->next_tuple, - dmstate->rel, - dmstate->attinmeta, - dmstate->retrieved_attrs, - node, - dmstate->temp_cxt); - ExecStoreHeapTuple(newtup, slot, false); - } - PG_CATCH(); - { - PQclear(dmstate->result); - PG_RE_THROW(); - } - PG_END_TRY(); + HeapTuple newtup; + newtup = make_tuple_from_result_row(dmstate->result, + dmstate->next_tuple, + dmstate->rel, + dmstate->attinmeta, + dmstate->retrieved_attrs, + node, + dmstate->temp_cxt); + ExecStoreHeapTuple(newtup, slot, false); /* Get the updated/deleted tuple. */ if (dmstate->rel) resultSlot = slot; diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index e534b40de3c..20a535b99d8 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -4201,6 +4201,84 @@ DROP FOREIGN TABLE remote_application_name; DROP VIEW my_application_name; -- =================================================================== +-- test read-only and/or deferrable transactions +-- =================================================================== +CREATE TABLE loct (f1 int, f2 text); +CREATE FUNCTION locf() RETURNS SETOF loct LANGUAGE SQL AS + 'UPDATE public.loct SET f2 = f2 || f2 RETURNING *'; +CREATE VIEW locv AS SELECT t.* FROM locf() t; +CREATE FOREIGN TABLE remt (f1 int, f2 text) + SERVER loopback OPTIONS (table_name 'locv'); +CREATE FOREIGN TABLE remt2 (f1 int, f2 text) + SERVER loopback2 OPTIONS (table_name 'locv'); +INSERT INTO loct VALUES (1, 'foo'), (2, 'bar'); + +START TRANSACTION READ ONLY; +SAVEPOINT s; +SELECT * FROM remt; -- should fail +ROLLBACK TO s; +RELEASE SAVEPOINT s; +SELECT * FROM remt; -- should fail +ROLLBACK; + +START TRANSACTION; +SAVEPOINT s; +SET transaction_read_only = on; +SELECT * FROM remt; -- should fail +ROLLBACK TO s; +RELEASE SAVEPOINT s; +SET transaction_read_only = on; +SELECT * FROM remt; -- should fail +ROLLBACK; + +START TRANSACTION; +SAVEPOINT s; +SELECT * FROM remt; -- should work +SET transaction_read_only = on; +SELECT * FROM remt; -- should fail +ROLLBACK TO s; +RELEASE SAVEPOINT s; +SELECT * FROM remt; -- should work +SET transaction_read_only = on; +SELECT * FROM remt; -- should fail +ROLLBACK; + +START TRANSACTION; +SAVEPOINT s; +SELECT * FROM remt; -- should work +SET transaction_read_only = on; +SELECT * FROM remt2; -- should fail +ROLLBACK TO s; +RELEASE SAVEPOINT s; +SELECT * FROM remt; -- should work +SET transaction_read_only = on; +SELECT * FROM remt2; -- should fail +ROLLBACK; + +DROP FOREIGN TABLE remt; +CREATE FOREIGN TABLE remt (f1 int, f2 text) + SERVER loopback OPTIONS (table_name 'loct'); + +START TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY; +SELECT * FROM remt; +COMMIT; + +START TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE; +SELECT * FROM remt; +COMMIT; + +START TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE; +SELECT * FROM remt; +COMMIT; + +-- Clean up +DROP FOREIGN TABLE remt; +DROP FOREIGN TABLE remt2; +DROP VIEW locv; +DROP FUNCTION locf(); +DROP TABLE loct; + +-- =================================================================== -- test parallel commit and parallel abort -- =================================================================== ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true'); |