aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/postgres_fdw')
-rw-r--r--contrib/postgres_fdw/connection.c99
-rw-r--r--contrib/postgres_fdw/expected/postgres_fdw.out134
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c62
-rw-r--r--contrib/postgres_fdw/sql/postgres_fdw.sql78
4 files changed, 338 insertions, 35 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 304f3c20f83..caf14462696 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
/* Remaining fields are invalid when conn is NULL: */
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
* one level of subxact open, etc */
+ bool xact_read_only; /* xact r/o state */
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
bool have_error; /* have any subxacts aborted in this xact? */
bool changing_xact_state; /* xact state change in process */
@@ -84,6 +85,12 @@ static unsigned int prep_stmt_number = 0;
/* tracks whether any work is needed in callback functions */
static bool xact_got_connection = false;
+/*
+ * tracks the nesting level of the topmost read-only transaction determined
+ * by GetTopReadOnlyTransactionNestLevel()
+ */
+static int top_read_only_level = 0;
+
/* custom wait event values, retrieved from shared memory */
static uint32 pgfdw_we_cleanup_result = 0;
static uint32 pgfdw_we_connect = 0;
@@ -372,6 +379,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
/* Reset all transient state fields, to be sure all are clean */
entry->xact_depth = 0;
+ entry->xact_read_only = false;
entry->have_prep_stmt = false;
entry->have_error = false;
entry->changing_xact_state = false;
@@ -843,29 +851,81 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
* those scans. A disadvantage is that we can't provide sane emulation of
* READ COMMITTED behavior --- it would be nice if we had some other way to
* control which remote queries share a snapshot.
+ *
+ * Note also that we always start the remote transaction with the same
+ * read/write and deferrable properties as the local transaction, and start
+ * the remote subtransaction with the same read/write property as the local
+ * subtransaction.
*/
static void
begin_remote_xact(ConnCacheEntry *entry)
{
int curlevel = GetCurrentTransactionNestLevel();
- /* Start main transaction if we haven't yet */
+ /*
+ * Set the nesting level of the topmost read-only transaction if the
+ * current transaction is read-only and we haven't yet. Once it's set,
+ * it's retained until that transaction is committed/aborted, and then
+ * reset (see pgfdw_xact_callback and pgfdw_subxact_callback).
+ */
+ if (XactReadOnly)
+ {
+ if (top_read_only_level == 0)
+ top_read_only_level = GetTopReadOnlyTransactionNestLevel();
+ Assert(top_read_only_level > 0);
+ }
+ else
+ Assert(top_read_only_level == 0);
+
+ /*
+ * Start main transaction if we haven't yet; otherwise, change the
+ * already-started remote transaction/subtransaction to read-only if the
+ * local transaction/subtransaction have been done so after starting them
+ * and we haven't yet.
+ */
if (entry->xact_depth <= 0)
{
- const char *sql;
+ StringInfoData sql;
+ bool ro = (top_read_only_level == 1);
elog(DEBUG3, "starting remote transaction on connection %p",
entry->conn);
+ initStringInfo(&sql);
+ appendStringInfoString(&sql, "START TRANSACTION ISOLATION LEVEL ");
if (IsolationIsSerializable())
- sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
+ appendStringInfoString(&sql, "SERIALIZABLE");
else
- sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
+ appendStringInfoString(&sql, "REPEATABLE READ");
+ if (ro)
+ appendStringInfoString(&sql, " READ ONLY");
+ if (XactDeferrable)
+ appendStringInfoString(&sql, " DEFERRABLE");
entry->changing_xact_state = true;
- do_sql_command(entry->conn, sql);
+ do_sql_command(entry->conn, sql.data);
entry->xact_depth = 1;
+ if (ro)
+ {
+ Assert(!entry->xact_read_only);
+ entry->xact_read_only = true;
+ }
entry->changing_xact_state = false;
}
+ else if (!entry->xact_read_only)
+ {
+ Assert(top_read_only_level == 0 ||
+ entry->xact_depth <= top_read_only_level);
+ if (entry->xact_depth == top_read_only_level)
+ {
+ entry->changing_xact_state = true;
+ do_sql_command(entry->conn, "SET transaction_read_only = on");
+ entry->xact_read_only = true;
+ entry->changing_xact_state = false;
+ }
+ }
+ else
+ Assert(top_read_only_level > 0 &&
+ entry->xact_depth >= top_read_only_level);
/*
* If we're in a subtransaction, stack up savepoints to match our level.
@@ -874,12 +934,21 @@ begin_remote_xact(ConnCacheEntry *entry)
*/
while (entry->xact_depth < curlevel)
{
- char sql[64];
+ StringInfoData sql;
+ bool ro = (entry->xact_depth + 1 == top_read_only_level);
- snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
+ initStringInfo(&sql);
+ appendStringInfo(&sql, "SAVEPOINT s%d", entry->xact_depth + 1);
+ if (ro)
+ appendStringInfoString(&sql, "; SET transaction_read_only = on");
entry->changing_xact_state = true;
- do_sql_command(entry->conn, sql);
+ do_sql_command(entry->conn, sql.data);
entry->xact_depth++;
+ if (ro)
+ {
+ Assert(!entry->xact_read_only);
+ entry->xact_read_only = true;
+ }
entry->changing_xact_state = false;
}
}
@@ -1174,6 +1243,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* Also reset cursor numbering for next transaction */
cursor_number = 0;
+
+ /* Likewise for top_read_only_level */
+ top_read_only_level = 0;
}
/*
@@ -1272,6 +1344,10 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
false);
}
}
+
+ /* If in the topmost read-only transaction, reset top_read_only_level */
+ if (curlevel == top_read_only_level)
+ top_read_only_level = 0;
}
/*
@@ -1374,6 +1450,9 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
/* Reset state to show we're out of a transaction */
entry->xact_depth = 0;
+ /* Reset xact r/o state */
+ entry->xact_read_only = false;
+
/*
* If the connection isn't in a good idle state, it is marked as
* invalid or keep_connections option of its server is disabled, then
@@ -1394,6 +1473,10 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
{
/* Reset state to show we're out of a subtransaction */
entry->xact_depth--;
+
+ /* If in the topmost read-only transaction, reset xact r/o state */
+ if (entry->xact_depth + 1 == top_read_only_level)
+ entry->xact_read_only = false;
}
}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 2185b42bb4f..eb4716bed81 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -12384,6 +12384,140 @@ SELECT count(*) FROM remote_application_name
DROP FOREIGN TABLE remote_application_name;
DROP VIEW my_application_name;
-- ===================================================================
+-- test read-only and/or deferrable transactions
+-- ===================================================================
+CREATE TABLE loct (f1 int, f2 text);
+CREATE FUNCTION locf() RETURNS SETOF loct LANGUAGE SQL AS
+ 'UPDATE public.loct SET f2 = f2 || f2 RETURNING *';
+CREATE VIEW locv AS SELECT t.* FROM locf() t;
+CREATE FOREIGN TABLE remt (f1 int, f2 text)
+ SERVER loopback OPTIONS (table_name 'locv');
+CREATE FOREIGN TABLE remt2 (f1 int, f2 text)
+ SERVER loopback2 OPTIONS (table_name 'locv');
+INSERT INTO loct VALUES (1, 'foo'), (2, 'bar');
+START TRANSACTION READ ONLY;
+SAVEPOINT s;
+SELECT * FROM remt; -- should fail
+ERROR: cannot execute UPDATE in a read-only transaction
+CONTEXT: SQL function "locf" statement 1
+remote SQL command: SELECT f1, f2 FROM public.locv
+ROLLBACK TO s;
+RELEASE SAVEPOINT s;
+SELECT * FROM remt; -- should fail
+ERROR: cannot execute UPDATE in a read-only transaction
+CONTEXT: SQL function "locf" statement 1
+remote SQL command: SELECT f1, f2 FROM public.locv
+ROLLBACK;
+START TRANSACTION;
+SAVEPOINT s;
+SET transaction_read_only = on;
+SELECT * FROM remt; -- should fail
+ERROR: cannot execute UPDATE in a read-only transaction
+CONTEXT: SQL function "locf" statement 1
+remote SQL command: SELECT f1, f2 FROM public.locv
+ROLLBACK TO s;
+RELEASE SAVEPOINT s;
+SET transaction_read_only = on;
+SELECT * FROM remt; -- should fail
+ERROR: cannot execute UPDATE in a read-only transaction
+CONTEXT: SQL function "locf" statement 1
+remote SQL command: SELECT f1, f2 FROM public.locv
+ROLLBACK;
+START TRANSACTION;
+SAVEPOINT s;
+SELECT * FROM remt; -- should work
+ f1 | f2
+----+--------
+ 1 | foofoo
+ 2 | barbar
+(2 rows)
+
+SET transaction_read_only = on;
+SELECT * FROM remt; -- should fail
+ERROR: cannot execute UPDATE in a read-only transaction
+CONTEXT: SQL function "locf" statement 1
+remote SQL command: SELECT f1, f2 FROM public.locv
+ROLLBACK TO s;
+RELEASE SAVEPOINT s;
+SELECT * FROM remt; -- should work
+ f1 | f2
+----+--------
+ 1 | foofoo
+ 2 | barbar
+(2 rows)
+
+SET transaction_read_only = on;
+SELECT * FROM remt; -- should fail
+ERROR: cannot execute UPDATE in a read-only transaction
+CONTEXT: SQL function "locf" statement 1
+remote SQL command: SELECT f1, f2 FROM public.locv
+ROLLBACK;
+START TRANSACTION;
+SAVEPOINT s;
+SELECT * FROM remt; -- should work
+ f1 | f2
+----+--------
+ 1 | foofoo
+ 2 | barbar
+(2 rows)
+
+SET transaction_read_only = on;
+SELECT * FROM remt2; -- should fail
+ERROR: cannot execute UPDATE in a read-only transaction
+CONTEXT: SQL function "locf" statement 1
+remote SQL command: SELECT f1, f2 FROM public.locv
+ROLLBACK TO s;
+RELEASE SAVEPOINT s;
+SELECT * FROM remt; -- should work
+ f1 | f2
+----+--------
+ 1 | foofoo
+ 2 | barbar
+(2 rows)
+
+SET transaction_read_only = on;
+SELECT * FROM remt2; -- should fail
+ERROR: cannot execute UPDATE in a read-only transaction
+CONTEXT: SQL function "locf" statement 1
+remote SQL command: SELECT f1, f2 FROM public.locv
+ROLLBACK;
+DROP FOREIGN TABLE remt;
+CREATE FOREIGN TABLE remt (f1 int, f2 text)
+ SERVER loopback OPTIONS (table_name 'loct');
+START TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY;
+SELECT * FROM remt;
+ f1 | f2
+----+-----
+ 1 | foo
+ 2 | bar
+(2 rows)
+
+COMMIT;
+START TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE;
+SELECT * FROM remt;
+ f1 | f2
+----+-----
+ 1 | foo
+ 2 | bar
+(2 rows)
+
+COMMIT;
+START TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE;
+SELECT * FROM remt;
+ f1 | f2
+----+-----
+ 1 | foo
+ 2 | bar
+(2 rows)
+
+COMMIT;
+-- Clean up
+DROP FOREIGN TABLE remt;
+DROP FOREIGN TABLE remt2;
+DROP VIEW locv;
+DROP FUNCTION locf();
+DROP TABLE loct;
+-- ===================================================================
-- test parallel commit and parallel abort
-- ===================================================================
ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true');
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 331f3fc088d..4283ce9f962 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -240,6 +240,7 @@ typedef struct PgFdwDirectModifyState
PGresult *result; /* result for query */
int num_tuples; /* # of result tuples */
int next_tuple; /* index of next one to return */
+ MemoryContextCallback result_cb; /* ensures result will get freed */
Relation resultRel; /* relcache entry for the target relation */
AttrNumber *attnoMap; /* array of attnums of input user columns */
AttrNumber ctidAttno; /* attnum of input ctid column */
@@ -2671,6 +2672,17 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
node->fdw_state = dmstate;
/*
+ * We use a memory context callback to ensure that the dmstate's PGresult
+ * (if any) will be released, even if the query fails somewhere that's
+ * outside our control. The callback is always armed for the duration of
+ * the query; this relies on PQclear(NULL) being a no-op.
+ */
+ dmstate->result_cb.func = (MemoryContextCallbackFunction) PQclear;
+ dmstate->result_cb.arg = NULL;
+ MemoryContextRegisterResetCallback(CurrentMemoryContext,
+ &dmstate->result_cb);
+
+ /*
* Identify which user to do the remote access as. This should match what
* ExecCheckPermissions() does.
*/
@@ -2817,7 +2829,13 @@ postgresEndDirectModify(ForeignScanState *node)
return;
/* Release PGresult */
- PQclear(dmstate->result);
+ if (dmstate->result)
+ {
+ PQclear(dmstate->result);
+ dmstate->result = NULL;
+ /* ... and don't forget to disable the callback */
+ dmstate->result_cb.arg = NULL;
+ }
/* Release remote connection */
ReleaseConnection(dmstate->conn);
@@ -4591,13 +4609,17 @@ execute_dml_stmt(ForeignScanState *node)
/*
* Get the result, and check for success.
*
- * We don't use a PG_TRY block here, so be careful not to throw error
- * without releasing the PGresult.
+ * We use a memory context callback to ensure that the PGresult will be
+ * released, even if the query fails somewhere that's outside our control.
+ * The callback is already registered, just need to fill in its arg.
*/
+ Assert(dmstate->result == NULL);
dmstate->result = pgfdw_get_result(dmstate->conn);
+ dmstate->result_cb.arg = dmstate->result;
+
if (PQresultStatus(dmstate->result) !=
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
+ pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, false,
dmstate->query);
/* Get the number of rows affected. */
@@ -4641,30 +4663,16 @@ get_returning_data(ForeignScanState *node)
}
else
{
- /*
- * On error, be sure to release the PGresult on the way out. Callers
- * do not have PG_TRY blocks to ensure this happens.
- */
- PG_TRY();
- {
- HeapTuple newtup;
-
- newtup = make_tuple_from_result_row(dmstate->result,
- dmstate->next_tuple,
- dmstate->rel,
- dmstate->attinmeta,
- dmstate->retrieved_attrs,
- node,
- dmstate->temp_cxt);
- ExecStoreHeapTuple(newtup, slot, false);
- }
- PG_CATCH();
- {
- PQclear(dmstate->result);
- PG_RE_THROW();
- }
- PG_END_TRY();
+ HeapTuple newtup;
+ newtup = make_tuple_from_result_row(dmstate->result,
+ dmstate->next_tuple,
+ dmstate->rel,
+ dmstate->attinmeta,
+ dmstate->retrieved_attrs,
+ node,
+ dmstate->temp_cxt);
+ ExecStoreHeapTuple(newtup, slot, false);
/* Get the updated/deleted tuple. */
if (dmstate->rel)
resultSlot = slot;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index e534b40de3c..20a535b99d8 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -4201,6 +4201,84 @@ DROP FOREIGN TABLE remote_application_name;
DROP VIEW my_application_name;
-- ===================================================================
+-- test read-only and/or deferrable transactions
+-- ===================================================================
+CREATE TABLE loct (f1 int, f2 text);
+CREATE FUNCTION locf() RETURNS SETOF loct LANGUAGE SQL AS
+ 'UPDATE public.loct SET f2 = f2 || f2 RETURNING *';
+CREATE VIEW locv AS SELECT t.* FROM locf() t;
+CREATE FOREIGN TABLE remt (f1 int, f2 text)
+ SERVER loopback OPTIONS (table_name 'locv');
+CREATE FOREIGN TABLE remt2 (f1 int, f2 text)
+ SERVER loopback2 OPTIONS (table_name 'locv');
+INSERT INTO loct VALUES (1, 'foo'), (2, 'bar');
+
+START TRANSACTION READ ONLY;
+SAVEPOINT s;
+SELECT * FROM remt; -- should fail
+ROLLBACK TO s;
+RELEASE SAVEPOINT s;
+SELECT * FROM remt; -- should fail
+ROLLBACK;
+
+START TRANSACTION;
+SAVEPOINT s;
+SET transaction_read_only = on;
+SELECT * FROM remt; -- should fail
+ROLLBACK TO s;
+RELEASE SAVEPOINT s;
+SET transaction_read_only = on;
+SELECT * FROM remt; -- should fail
+ROLLBACK;
+
+START TRANSACTION;
+SAVEPOINT s;
+SELECT * FROM remt; -- should work
+SET transaction_read_only = on;
+SELECT * FROM remt; -- should fail
+ROLLBACK TO s;
+RELEASE SAVEPOINT s;
+SELECT * FROM remt; -- should work
+SET transaction_read_only = on;
+SELECT * FROM remt; -- should fail
+ROLLBACK;
+
+START TRANSACTION;
+SAVEPOINT s;
+SELECT * FROM remt; -- should work
+SET transaction_read_only = on;
+SELECT * FROM remt2; -- should fail
+ROLLBACK TO s;
+RELEASE SAVEPOINT s;
+SELECT * FROM remt; -- should work
+SET transaction_read_only = on;
+SELECT * FROM remt2; -- should fail
+ROLLBACK;
+
+DROP FOREIGN TABLE remt;
+CREATE FOREIGN TABLE remt (f1 int, f2 text)
+ SERVER loopback OPTIONS (table_name 'loct');
+
+START TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY;
+SELECT * FROM remt;
+COMMIT;
+
+START TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE;
+SELECT * FROM remt;
+COMMIT;
+
+START TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE;
+SELECT * FROM remt;
+COMMIT;
+
+-- Clean up
+DROP FOREIGN TABLE remt;
+DROP FOREIGN TABLE remt2;
+DROP VIEW locv;
+DROP FUNCTION locf();
+DROP TABLE loct;
+
+-- ===================================================================
-- test parallel commit and parallel abort
-- ===================================================================
ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true');