diff options
Diffstat (limited to 'src')
35 files changed, 1768 insertions, 52 deletions
diff --git a/src/backend/commands/functioncmds.c b/src/backend/commands/functioncmds.c index ea08c3237c1..df87dfeb543 100644 --- a/src/backend/commands/functioncmds.c +++ b/src/backend/commands/functioncmds.c @@ -65,6 +65,7 @@ #include "utils/fmgroids.h" #include "utils/guc.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/rel.h" #include "utils/syscache.h" #include "utils/tqual.h" @@ -2136,9 +2137,11 @@ IsThereFunctionInNamespace(const char *proname, int pronargs, /* * ExecuteDoStmt * Execute inline procedural-language code + * + * See at ExecuteCallStmt() about the atomic argument. */ void -ExecuteDoStmt(DoStmt *stmt) +ExecuteDoStmt(DoStmt *stmt, bool atomic) { InlineCodeBlock *codeblock = makeNode(InlineCodeBlock); ListCell *arg; @@ -2200,6 +2203,7 @@ ExecuteDoStmt(DoStmt *stmt) codeblock->langOid = HeapTupleGetOid(languageTuple); languageStruct = (Form_pg_language) GETSTRUCT(languageTuple); codeblock->langIsTrusted = languageStruct->lanpltrusted; + codeblock->atomic = atomic; if (languageStruct->lanpltrusted) { @@ -2236,9 +2240,28 @@ ExecuteDoStmt(DoStmt *stmt) /* * Execute CALL statement + * + * Inside a top-level CALL statement, transaction-terminating commands such as + * COMMIT or a PL-specific equivalent are allowed. The terminology in the SQL + * standard is that CALL establishes a non-atomic execution context. Most + * other commands establish an atomic execution context, in which transaction + * control actions are not allowed. If there are nested executions of CALL, + * we want to track the execution context recursively, so that the nested + * CALLs can also do transaction control. Note, however, that for example in + * CALL -> SELECT -> CALL, the second call cannot do transaction control, + * because the SELECT in between establishes an atomic execution context. + * + * So when ExecuteCallStmt() is called from the top level, we pass in atomic = + * false (recall that that means transactions = yes). We then create a + * CallContext node with content atomic = false, which is passed in the + * fcinfo->context field to the procedure invocation. The language + * implementation should then take appropriate measures to allow or prevent + * transaction commands based on that information, e.g., call + * SPI_connect_ext(SPI_OPT_NONATOMIC). The language should also pass on the + * atomic flag to any nested invocations to CALL. */ void -ExecuteCallStmt(ParseState *pstate, CallStmt *stmt) +ExecuteCallStmt(ParseState *pstate, CallStmt *stmt, bool atomic) { List *targs; ListCell *lc; @@ -2249,6 +2272,8 @@ ExecuteCallStmt(ParseState *pstate, CallStmt *stmt) AclResult aclresult; FmgrInfo flinfo; FunctionCallInfoData fcinfo; + CallContext *callcontext; + HeapTuple tp; targs = NIL; foreach(lc, stmt->funccall->args) @@ -2284,8 +2309,24 @@ ExecuteCallStmt(ParseState *pstate, CallStmt *stmt) FUNC_MAX_ARGS, FUNC_MAX_ARGS))); + callcontext = makeNode(CallContext); + callcontext->atomic = atomic; + + /* + * If proconfig is set we can't allow transaction commands because of the + * way the GUC stacking works: The transaction boundary would have to pop + * the proconfig setting off the stack. That restriction could be lifted + * by redesigning the GUC nesting mechanism a bit. + */ + tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(fexpr->funcid)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for function %u", fexpr->funcid); + if (!heap_attisnull(tp, Anum_pg_proc_proconfig)) + callcontext->atomic = true; + ReleaseSysCache(tp); + fmgr_info(fexpr->funcid, &flinfo); - InitFunctionCallInfoData(fcinfo, &flinfo, nargs, fexpr->inputcollid, NULL, NULL); + InitFunctionCallInfoData(fcinfo, &flinfo, nargs, fexpr->inputcollid, (Node *) callcontext, NULL); i = 0; foreach (lc, fexpr->args) diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 995f67d2662..9fc4431b80c 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -83,6 +83,12 @@ static bool _SPI_checktuples(void); int SPI_connect(void) { + return SPI_connect_ext(0); +} + +int +SPI_connect_ext(int options) +{ int newdepth; /* Enlarge stack if necessary */ @@ -92,7 +98,7 @@ SPI_connect(void) elog(ERROR, "SPI stack corrupted"); newdepth = 16; _SPI_stack = (_SPI_connection *) - MemoryContextAlloc(TopTransactionContext, + MemoryContextAlloc(TopMemoryContext, newdepth * sizeof(_SPI_connection)); _SPI_stack_depth = newdepth; } @@ -124,19 +130,25 @@ SPI_connect(void) _SPI_current->execCxt = NULL; _SPI_current->connectSubid = GetCurrentSubTransactionId(); _SPI_current->queryEnv = NULL; + _SPI_current->atomic = (options & SPI_OPT_NONATOMIC ? false : true); + _SPI_current->internal_xact = false; /* * Create memory contexts for this procedure * - * XXX it would be better to use PortalContext as the parent context, but - * we may not be inside a portal (consider deferred-trigger execution). - * Perhaps CurTransactionContext would do? For now it doesn't matter - * because we clean up explicitly in AtEOSubXact_SPI(). + * In atomic contexts (the normal case), we use TopTransactionContext, + * otherwise PortalContext, so that it lives across transaction + * boundaries. + * + * XXX It could be better to use PortalContext as the parent context in + * all cases, but we may not be inside a portal (consider deferred-trigger + * execution). Perhaps CurTransactionContext could be an option? For now + * it doesn't matter because we clean up explicitly in AtEOSubXact_SPI(). */ - _SPI_current->procCxt = AllocSetContextCreate(TopTransactionContext, + _SPI_current->procCxt = AllocSetContextCreate(_SPI_current->atomic ? TopTransactionContext : PortalContext, "SPI Proc", ALLOCSET_DEFAULT_SIZES); - _SPI_current->execCxt = AllocSetContextCreate(TopTransactionContext, + _SPI_current->execCxt = AllocSetContextCreate(_SPI_current->atomic ? TopTransactionContext : _SPI_current->procCxt, "SPI Exec", ALLOCSET_DEFAULT_SIZES); /* ... and switch to procedure's context */ @@ -181,6 +193,73 @@ SPI_finish(void) return SPI_OK_FINISH; } +void +SPI_start_transaction(void) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + StartTransactionCommand(); + MemoryContextSwitchTo(oldcontext); +} + +void +SPI_commit(void) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + if (_SPI_current->atomic) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("invalid transaction termination"))); + + /* + * This restriction is required by PLs implemented on top of SPI. They + * use subtransactions to establish exception blocks that are supposed to + * be rolled back together if there is an error. Terminating the + * top-level transaction in such a block violates that idea. A future PL + * implementation might have different ideas about this, in which case + * this restriction would have to be refined or the check possibly be + * moved out of SPI into the PLs. + */ + if (IsSubTransaction()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot commit while a subtransaction is active"))); + + _SPI_current->internal_xact = true; + + if (ActiveSnapshotSet()) + PopActiveSnapshot(); + CommitTransactionCommand(); + MemoryContextSwitchTo(oldcontext); + + _SPI_current->internal_xact = false; +} + +void +SPI_rollback(void) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + if (_SPI_current->atomic) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("invalid transaction termination"))); + + /* see under SPI_commit() */ + if (IsSubTransaction()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot roll back while a subtransaction is active"))); + + _SPI_current->internal_xact = true; + + AbortCurrentTransaction(); + MemoryContextSwitchTo(oldcontext); + + _SPI_current->internal_xact = false; +} + /* * Clean up SPI state at transaction commit or abort. */ @@ -188,6 +267,12 @@ void AtEOXact_SPI(bool isCommit) { /* + * Do nothing if the transaction end was initiated by SPI. + */ + if (_SPI_current && _SPI_current->internal_xact) + return; + + /* * Note that memory contexts belonging to SPI stack entries will be freed * automatically, so we can ignore them here. We just need to restore our * static variables to initial state. @@ -224,6 +309,9 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid) if (connection->connectSubid != mySubid) break; /* couldn't be any underneath it either */ + if (connection->internal_xact) + break; + found = true; /* diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 26df660f350..3abe7d6155a 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -530,7 +530,8 @@ standard_ProcessUtility(PlannedStmt *pstmt, break; case T_DoStmt: - ExecuteDoStmt((DoStmt *) parsetree); + ExecuteDoStmt((DoStmt *) parsetree, + (context != PROCESS_UTILITY_TOPLEVEL || IsTransactionBlock())); break; case T_CreateTableSpaceStmt: @@ -659,7 +660,8 @@ standard_ProcessUtility(PlannedStmt *pstmt, break; case T_CallStmt: - ExecuteCallStmt(pstate, castNode(CallStmt, parsetree)); + ExecuteCallStmt(pstate, castNode(CallStmt, parsetree), + (context != PROCESS_UTILITY_TOPLEVEL || IsTransactionBlock())); break; case T_ClusterStmt: diff --git a/src/backend/utils/mmgr/portalmem.c b/src/backend/utils/mmgr/portalmem.c index 84c68ac1895..f3f0add1d6d 100644 --- a/src/backend/utils/mmgr/portalmem.c +++ b/src/backend/utils/mmgr/portalmem.c @@ -742,11 +742,8 @@ PreCommit_Portals(bool isPrepare) /* * Abort processing for portals. * - * At this point we reset "active" status and run the cleanup hook if - * present, but we can't release the portal's memory until the cleanup call. - * - * The reason we need to reset active is so that we can replace the unnamed - * portal, else we'll fail to execute ROLLBACK when it arrives. + * At this point we run the cleanup hook if present, but we can't release the + * portal's memory until the cleanup call. */ void AtAbort_Portals(void) @@ -761,17 +758,6 @@ AtAbort_Portals(void) Portal portal = hentry->portal; /* - * See similar code in AtSubAbort_Portals(). This would fire if code - * orchestrating multiple top-level transactions within a portal, such - * as VACUUM, caught errors and continued under the same portal with a - * fresh transaction. No part of core PostgreSQL functions that way. - * XXX Such code would wish the portal to remain ACTIVE, as in - * PreCommit_Portals(). - */ - if (portal->status == PORTAL_ACTIVE) - MarkPortalFailed(portal); - - /* * Do nothing else to cursors held over from a previous transaction. */ if (portal->createSubid == InvalidSubTransactionId) @@ -810,9 +796,10 @@ AtAbort_Portals(void) * Although we can't delete the portal data structure proper, we can * release any memory in subsidiary contexts, such as executor state. * The cleanup hook was the last thing that might have needed data - * there. + * there. But leave active portals alone. */ - MemoryContextDeleteChildren(portal->portalContext); + if (portal->status != PORTAL_ACTIVE) + MemoryContextDeleteChildren(portal->portalContext); } } @@ -832,6 +819,13 @@ AtCleanup_Portals(void) { Portal portal = hentry->portal; + /* + * Do not touch active portals --- this can only happen in the case of + * a multi-transaction command. + */ + if (portal->status == PORTAL_ACTIVE) + continue; + /* Do nothing to cursors held over from a previous transaction */ if (portal->createSubid == InvalidSubTransactionId) { @@ -1161,3 +1155,22 @@ ThereAreNoReadyPortals(void) return true; } + +bool +ThereArePinnedPortals(void) +{ + HASH_SEQ_STATUS status; + PortalHashEnt *hentry; + + hash_seq_init(&status, PortalHashTable); + + while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) + { + Portal portal = hentry->portal; + + if (portal->portalPinned) + return true; + } + + return false; +} diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h index 41007162aa5..7b824c95af3 100644 --- a/src/include/commands/defrem.h +++ b/src/include/commands/defrem.h @@ -59,8 +59,8 @@ extern ObjectAddress CreateTransform(CreateTransformStmt *stmt); extern void DropTransformById(Oid transformOid); extern void IsThereFunctionInNamespace(const char *proname, int pronargs, oidvector *proargtypes, Oid nspOid); -extern void ExecuteDoStmt(DoStmt *stmt); -extern void ExecuteCallStmt(ParseState *pstate, CallStmt *stmt); +extern void ExecuteDoStmt(DoStmt *stmt, bool atomic); +extern void ExecuteCallStmt(ParseState *pstate, CallStmt *stmt, bool atomic); extern Oid get_cast_oid(Oid sourcetypeid, Oid targettypeid, bool missing_ok); extern Oid get_transform_oid(Oid type_id, Oid lang_id, bool missing_ok); extern void interpret_function_parameter_list(ParseState *pstate, diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h index 43580c51585..e5bdaecc4e3 100644 --- a/src/include/executor/spi.h +++ b/src/include/executor/spi.h @@ -65,6 +65,8 @@ typedef struct _SPI_plan *SPIPlanPtr; #define SPI_OK_REL_UNREGISTER 16 #define SPI_OK_TD_REGISTER 17 +#define SPI_OPT_NONATOMIC (1 << 0) + /* These used to be functions, now just no-ops for backwards compatibility */ #define SPI_push() ((void) 0) #define SPI_pop() ((void) 0) @@ -78,6 +80,7 @@ extern PGDLLIMPORT SPITupleTable *SPI_tuptable; extern PGDLLIMPORT int SPI_result; extern int SPI_connect(void); +extern int SPI_connect_ext(int options); extern int SPI_finish(void); extern int SPI_execute(const char *src, bool read_only, long tcount); extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls, @@ -156,6 +159,10 @@ extern int SPI_register_relation(EphemeralNamedRelation enr); extern int SPI_unregister_relation(const char *name); extern int SPI_register_trigger_data(TriggerData *tdata); +extern void SPI_start_transaction(void); +extern void SPI_commit(void); +extern void SPI_rollback(void); + extern void AtEOXact_SPI(bool isCommit); extern void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid); diff --git a/src/include/executor/spi_priv.h b/src/include/executor/spi_priv.h index 64f8a450eb5..263c8f14539 100644 --- a/src/include/executor/spi_priv.h +++ b/src/include/executor/spi_priv.h @@ -36,6 +36,10 @@ typedef struct MemoryContext savedcxt; /* context of SPI_connect's caller */ SubTransactionId connectSubid; /* ID of connecting subtransaction */ QueryEnvironment *queryEnv; /* query environment setup for SPI level */ + + /* transaction management support */ + bool atomic; /* atomic execution context, does not allow transactions */ + bool internal_xact; /* SPI-managed transaction boundary, skip cleanup */ } _SPI_connection; /* diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 2eb3d6d3711..74b094a9c39 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -500,7 +500,8 @@ typedef enum NodeTag T_FdwRoutine, /* in foreign/fdwapi.h */ T_IndexAmRoutine, /* in access/amapi.h */ T_TsmRoutine, /* in access/tsmapi.h */ - T_ForeignKeyCacheInfo /* in utils/rel.h */ + T_ForeignKeyCacheInfo, /* in utils/rel.h */ + T_CallContext /* in nodes/parsenodes.h */ } NodeTag; /* diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 93122adae85..bbacbe144c8 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -2789,6 +2789,7 @@ typedef struct InlineCodeBlock char *source_text; /* source text of anonymous code block */ Oid langOid; /* OID of selected language */ bool langIsTrusted; /* trusted property of the language */ + bool atomic; /* atomic execution context */ } InlineCodeBlock; /* ---------------------- @@ -2801,6 +2802,12 @@ typedef struct CallStmt FuncCall *funccall; } CallStmt; +typedef struct CallContext +{ + NodeTag type; + bool atomic; +} CallContext; + /* ---------------------- * Alter Object Rename Statement * ---------------------- diff --git a/src/include/utils/portal.h b/src/include/utils/portal.h index bc9d52e506e..b903cb0fbe1 100644 --- a/src/include/utils/portal.h +++ b/src/include/utils/portal.h @@ -231,5 +231,6 @@ extern PlannedStmt *PortalGetPrimaryStmt(Portal portal); extern void PortalCreateHoldStore(Portal portal); extern void PortalHashTableDeleteAll(void); extern bool ThereAreNoReadyPortals(void); +extern bool ThereArePinnedPortals(void); #endif /* PORTAL_H */ diff --git a/src/pl/plperl/GNUmakefile b/src/pl/plperl/GNUmakefile index b829027d05d..933abb47c49 100644 --- a/src/pl/plperl/GNUmakefile +++ b/src/pl/plperl/GNUmakefile @@ -55,7 +55,7 @@ endif # win32 SHLIB_LINK = $(perl_embed_ldflags) REGRESS_OPTS = --dbname=$(PL_TESTDB) --load-extension=plperl --load-extension=plperlu -REGRESS = plperl plperl_lc plperl_trigger plperl_shared plperl_elog plperl_util plperl_init plperlu plperl_array plperl_call +REGRESS = plperl plperl_lc plperl_trigger plperl_shared plperl_elog plperl_util plperl_init plperlu plperl_array plperl_call plperl_transaction # if Perl can support two interpreters in one backend, # test plperl-and-plperlu cases ifneq ($(PERL),) diff --git a/src/pl/plperl/SPI.xs b/src/pl/plperl/SPI.xs index d9e6f579d41..b98c547e8be 100644 --- a/src/pl/plperl/SPI.xs +++ b/src/pl/plperl/SPI.xs @@ -152,6 +152,15 @@ spi_spi_cursor_close(sv) plperl_spi_cursor_close(cursor); pfree(cursor); +void +spi_spi_commit() + CODE: + plperl_spi_commit(); + +void +spi_spi_rollback() + CODE: + plperl_spi_rollback(); BOOT: items = 0; /* avoid 'unused variable' warning */ diff --git a/src/pl/plperl/expected/plperl_transaction.out b/src/pl/plperl/expected/plperl_transaction.out new file mode 100644 index 00000000000..bd7b7f8660b --- /dev/null +++ b/src/pl/plperl/expected/plperl_transaction.out @@ -0,0 +1,133 @@ +CREATE TABLE test1 (a int, b text); +CREATE PROCEDURE transaction_test1() +LANGUAGE plperl +AS $$ +foreach my $i (0..9) { + spi_exec_query("INSERT INTO test1 (a) VALUES ($i)"); + if ($i % 2 == 0) { + spi_commit(); + } else { + spi_rollback(); + } +} +$$; +CALL transaction_test1(); +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +DO +LANGUAGE plperl +$$ +foreach my $i (0..9) { + spi_exec_query("INSERT INTO test1 (a) VALUES ($i)"); + if ($i % 2 == 0) { + spi_commit(); + } else { + spi_rollback(); + } +} +$$; +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plperl +AS $$ +foreach my $i (0..9) { + spi_exec_query("INSERT INTO test1 (a) VALUES ($i)"); + if ($i % 2 == 0) { + spi_commit(); + } else { + spi_rollback(); + } +} +return 1; +$$; +SELECT transaction_test2(); +ERROR: invalid transaction termination at line 5. +CONTEXT: PL/Perl function "transaction_test2" +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plperl +AS $$ +spi_exec_query("CALL transaction_test1()"); +return 1; +$$; +SELECT transaction_test3(); +ERROR: invalid transaction termination at line 5. at line 2. +CONTEXT: PL/Perl function "transaction_test3" +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plperl +AS $$ +spi_exec_query('DO LANGUAGE plperl $x$ spi_commit(); $x$'); +return 1; +$$; +SELECT transaction_test4(); +ERROR: invalid transaction termination at line 1. at line 2. +CONTEXT: PL/Perl function "transaction_test4" +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); +TRUNCATE test1; +DO LANGUAGE plperl $$ +my $sth = spi_query("SELECT * FROM test2 ORDER BY x"); +my $row; +while (defined($row = spi_fetchrow($sth))) { + spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")"); + spi_commit(); +} +$$; +ERROR: cannot commit transaction while a cursor is open at line 6. +CONTEXT: PL/Perl anonymous code block +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- rollback inside cursor loop +TRUNCATE test1; +DO LANGUAGE plperl $$ +my $sth = spi_query("SELECT * FROM test2 ORDER BY x"); +my $row; +while (defined($row = spi_fetchrow($sth))) { + spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")"); + spi_rollback(); +} +$$; +ERROR: cannot abort transaction while a cursor is open at line 6. +CONTEXT: PL/Perl anonymous code block +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +DROP TABLE test1; +DROP TABLE test2; diff --git a/src/pl/plperl/plperl.c b/src/pl/plperl/plperl.c index 10feef11cf3..77c41b28215 100644 --- a/src/pl/plperl/plperl.c +++ b/src/pl/plperl/plperl.c @@ -1929,7 +1929,7 @@ plperl_inline_handler(PG_FUNCTION_ARGS) current_call_data = &this_call_data; - if (SPI_connect() != SPI_OK_CONNECT) + if (SPI_connect_ext(codeblock->atomic ? 0 : SPI_OPT_NONATOMIC) != SPI_OK_CONNECT) elog(ERROR, "could not connect to SPI manager"); select_perl_context(desc.lanpltrusted); @@ -2396,13 +2396,18 @@ plperl_call_perl_event_trigger_func(plperl_proc_desc *desc, static Datum plperl_func_handler(PG_FUNCTION_ARGS) { + bool nonatomic; plperl_proc_desc *prodesc; SV *perlret; Datum retval = 0; ReturnSetInfo *rsi; ErrorContextCallback pl_error_context; - if (SPI_connect() != SPI_OK_CONNECT) + nonatomic = fcinfo->context && + IsA(fcinfo->context, CallContext) && + !castNode(CallContext, fcinfo->context)->atomic; + + if (SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0) != SPI_OK_CONNECT) elog(ERROR, "could not connect to SPI manager"); prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, false, false); @@ -3953,6 +3958,66 @@ plperl_spi_freeplan(char *query) SPI_freeplan(plan); } +void +plperl_spi_commit(void) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + PG_TRY(); + { + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot commit transaction while a cursor is open"))); + + SPI_commit(); + SPI_start_transaction(); + } + PG_CATCH(); + { + ErrorData *edata; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Punt the error to Perl */ + croak_cstr(edata->message); + } + PG_END_TRY(); +} + +void +plperl_spi_rollback(void) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + PG_TRY(); + { + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot abort transaction while a cursor is open"))); + + SPI_rollback(); + SPI_start_transaction(); + } + PG_CATCH(); + { + ErrorData *edata; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Punt the error to Perl */ + croak_cstr(edata->message); + } + PG_END_TRY(); +} + /* * Implementation of plperl's elog() function * diff --git a/src/pl/plperl/plperl.h b/src/pl/plperl/plperl.h index 78366aac04f..6fe7803088e 100644 --- a/src/pl/plperl/plperl.h +++ b/src/pl/plperl/plperl.h @@ -125,6 +125,8 @@ HV *plperl_spi_exec_prepared(char *, HV *, int, SV **); SV *plperl_spi_query_prepared(char *, int, SV **); void plperl_spi_freeplan(char *); void plperl_spi_cursor_close(char *); +void plperl_spi_commit(void); +void plperl_spi_rollback(void); char *plperl_sv_to_literal(SV *, char *); void plperl_util_elog(int level, SV *msg); diff --git a/src/pl/plperl/sql/plperl_transaction.sql b/src/pl/plperl/sql/plperl_transaction.sql new file mode 100644 index 00000000000..5c14d4732eb --- /dev/null +++ b/src/pl/plperl/sql/plperl_transaction.sql @@ -0,0 +1,120 @@ +CREATE TABLE test1 (a int, b text); + + +CREATE PROCEDURE transaction_test1() +LANGUAGE plperl +AS $$ +foreach my $i (0..9) { + spi_exec_query("INSERT INTO test1 (a) VALUES ($i)"); + if ($i % 2 == 0) { + spi_commit(); + } else { + spi_rollback(); + } +} +$$; + +CALL transaction_test1(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +DO +LANGUAGE plperl +$$ +foreach my $i (0..9) { + spi_exec_query("INSERT INTO test1 (a) VALUES ($i)"); + if ($i % 2 == 0) { + spi_commit(); + } else { + spi_rollback(); + } +} +$$; + +SELECT * FROM test1; + + +TRUNCATE test1; + +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plperl +AS $$ +foreach my $i (0..9) { + spi_exec_query("INSERT INTO test1 (a) VALUES ($i)"); + if ($i % 2 == 0) { + spi_commit(); + } else { + spi_rollback(); + } +} +return 1; +$$; + +SELECT transaction_test2(); + +SELECT * FROM test1; + + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plperl +AS $$ +spi_exec_query("CALL transaction_test1()"); +return 1; +$$; + +SELECT transaction_test3(); + +SELECT * FROM test1; + + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plperl +AS $$ +spi_exec_query('DO LANGUAGE plperl $x$ spi_commit(); $x$'); +return 1; +$$; + +SELECT transaction_test4(); + + +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); + +TRUNCATE test1; + +DO LANGUAGE plperl $$ +my $sth = spi_query("SELECT * FROM test2 ORDER BY x"); +my $row; +while (defined($row = spi_fetchrow($sth))) { + spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")"); + spi_commit(); +} +$$; + +SELECT * FROM test1; + + +-- rollback inside cursor loop +TRUNCATE test1; + +DO LANGUAGE plperl $$ +my $sth = spi_query("SELECT * FROM test2 ORDER BY x"); +my $row; +while (defined($row = spi_fetchrow($sth))) { + spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")"); + spi_rollback(); +} +$$; + +SELECT * FROM test1; + + +DROP TABLE test1; +DROP TABLE test2; diff --git a/src/pl/plpgsql/src/Makefile b/src/pl/plpgsql/src/Makefile index 14a4d835840..91e1ada7add 100644 --- a/src/pl/plpgsql/src/Makefile +++ b/src/pl/plpgsql/src/Makefile @@ -26,7 +26,7 @@ DATA = plpgsql.control plpgsql--1.0.sql plpgsql--unpackaged--1.0.sql REGRESS_OPTS = --dbname=$(PL_TESTDB) -REGRESS = plpgsql_call plpgsql_control +REGRESS = plpgsql_call plpgsql_control plpgsql_transaction all: all-lib diff --git a/src/pl/plpgsql/src/expected/plpgsql_transaction.out b/src/pl/plpgsql/src/expected/plpgsql_transaction.out new file mode 100644 index 00000000000..8ec22c646c2 --- /dev/null +++ b/src/pl/plpgsql/src/expected/plpgsql_transaction.out @@ -0,0 +1,241 @@ +CREATE TABLE test1 (a int, b text); +CREATE PROCEDURE transaction_test1() +LANGUAGE plpgsql +AS $$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END +$$; +CALL transaction_test1(); +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +DO +LANGUAGE plpgsql +$$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END +$$; +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +-- transaction commands not allowed when called in transaction block +START TRANSACTION; +CALL transaction_test1(); +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function transaction_test1() line 6 at COMMIT +COMMIT; +START TRANSACTION; +DO LANGUAGE plpgsql $$ BEGIN COMMIT; END $$; +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function inline_code_block line 1 at COMMIT +COMMIT; +TRUNCATE test1; +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; + RETURN 1; +END +$$; +SELECT transaction_test2(); +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function transaction_test2() line 6 at COMMIT +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + CALL transaction_test1(); + RETURN 1; +END; +$$; +SELECT transaction_test3(); +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function transaction_test1() line 6 at COMMIT +SQL statement "CALL transaction_test1()" +PL/pgSQL function transaction_test3() line 3 at SQL statement +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE 'DO LANGUAGE plpgsql $x$ BEGIN COMMIT; END $x$'; + RETURN 1; +END; +$$; +SELECT transaction_test4(); +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function inline_code_block line 1 at COMMIT +SQL statement "DO LANGUAGE plpgsql $x$ BEGIN COMMIT; END $x$" +PL/pgSQL function transaction_test4() line 3 at EXECUTE +-- proconfig settings currently disallow transaction statements +CREATE PROCEDURE transaction_test5() +LANGUAGE plpgsql +SET work_mem = 555 +AS $$ +BEGIN + COMMIT; +END; +$$; +CALL transaction_test5(); +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function transaction_test5() line 3 at COMMIT +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + COMMIT; + END LOOP; +END; +$$; +ERROR: committing inside a cursor loop is not supported +CONTEXT: PL/pgSQL function inline_code_block line 7 at COMMIT +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- rollback inside cursor loop +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + ROLLBACK; + END LOOP; +END; +$$; +ERROR: cannot abort transaction inside a cursor loop +CONTEXT: PL/pgSQL function inline_code_block line 7 at ROLLBACK +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- commit inside block with exception handler +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +BEGIN + BEGIN + INSERT INTO test1 (a) VALUES (1); + COMMIT; + INSERT INTO test1 (a) VALUES (1/0); + COMMIT; + EXCEPTION + WHEN division_by_zero THEN + RAISE NOTICE 'caught division_by_zero'; + END; +END; +$$; +ERROR: cannot commit while a subtransaction is active +CONTEXT: PL/pgSQL function inline_code_block line 5 at COMMIT +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- rollback inside block with exception handler +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +BEGIN + BEGIN + INSERT INTO test1 (a) VALUES (1); + ROLLBACK; + INSERT INTO test1 (a) VALUES (1/0); + ROLLBACK; + EXCEPTION + WHEN division_by_zero THEN + RAISE NOTICE 'caught division_by_zero'; + END; +END; +$$; +ERROR: cannot roll back while a subtransaction is active +CONTEXT: PL/pgSQL function inline_code_block line 5 at ROLLBACK +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- COMMIT failures +DO LANGUAGE plpgsql $$ +BEGIN + CREATE TABLE test3 (y int UNIQUE DEFERRABLE INITIALLY DEFERRED); + COMMIT; + INSERT INTO test3 (y) VALUES (1); + COMMIT; + INSERT INTO test3 (y) VALUES (1); + INSERT INTO test3 (y) VALUES (2); + COMMIT; + INSERT INTO test3 (y) VALUES (3); -- won't get here +END; +$$; +ERROR: duplicate key value violates unique constraint "test3_y_key" +DETAIL: Key (y)=(1) already exists. +CONTEXT: PL/pgSQL function inline_code_block line 9 at COMMIT +SELECT * FROM test3; + y +--- + 1 +(1 row) + +DROP TABLE test1; +DROP TABLE test2; +DROP TABLE test3; diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c index d096f242cdc..4478c5332e3 100644 --- a/src/pl/plpgsql/src/pl_exec.c +++ b/src/pl/plpgsql/src/pl_exec.c @@ -290,6 +290,10 @@ static int exec_stmt_dynexecute(PLpgSQL_execstate *estate, PLpgSQL_stmt_dynexecute *stmt); static int exec_stmt_dynfors(PLpgSQL_execstate *estate, PLpgSQL_stmt_dynfors *stmt); +static int exec_stmt_commit(PLpgSQL_execstate *estate, + PLpgSQL_stmt_commit *stmt); +static int exec_stmt_rollback(PLpgSQL_execstate *estate, + PLpgSQL_stmt_rollback *stmt); static void plpgsql_estate_setup(PLpgSQL_execstate *estate, PLpgSQL_function *func, @@ -1731,6 +1735,14 @@ exec_stmt(PLpgSQL_execstate *estate, PLpgSQL_stmt *stmt) rc = exec_stmt_close(estate, (PLpgSQL_stmt_close *) stmt); break; + case PLPGSQL_STMT_COMMIT: + rc = exec_stmt_commit(estate, (PLpgSQL_stmt_commit *) stmt); + break; + + case PLPGSQL_STMT_ROLLBACK: + rc = exec_stmt_rollback(estate, (PLpgSQL_stmt_rollback *) stmt); + break; + default: estate->err_stmt = save_estmt; elog(ERROR, "unrecognized cmdtype: %d", stmt->cmd_type); @@ -4264,6 +4276,57 @@ exec_stmt_close(PLpgSQL_execstate *estate, PLpgSQL_stmt_close *stmt) return PLPGSQL_RC_OK; } +/* + * exec_stmt_commit + * + * Commit the transaction. + */ +static int +exec_stmt_commit(PLpgSQL_execstate *estate, PLpgSQL_stmt_commit *stmt) +{ + /* + * XXX This could be implemented by converting the pinned portals to + * holdable ones and organizing the cleanup separately. + */ + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("committing inside a cursor loop is not supported"))); + + SPI_commit(); + SPI_start_transaction(); + + estate->simple_eval_estate = NULL; + plpgsql_create_econtext(estate); + + return PLPGSQL_RC_OK; +} + +/* + * exec_stmt_rollback + * + * Abort the transaction. + */ +static int +exec_stmt_rollback(PLpgSQL_execstate *estate, PLpgSQL_stmt_rollback *stmt) +{ + /* + * Unlike the COMMIT case above, this might not make sense at all, + * especially if the query driving the cursor loop has side effects. + */ + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot abort transaction inside a cursor loop"))); + + SPI_rollback(); + SPI_start_transaction(); + + estate->simple_eval_estate = NULL; + plpgsql_create_econtext(estate); + + return PLPGSQL_RC_OK; +} /* ---------- * exec_assign_expr Put an expression's result into a variable. @@ -6767,8 +6830,7 @@ plpgsql_xact_cb(XactEvent event, void *arg) */ if (event == XACT_EVENT_COMMIT || event == XACT_EVENT_PREPARE) { - /* Shouldn't be any econtext stack entries left at commit */ - Assert(simple_econtext_stack == NULL); + simple_econtext_stack = NULL; if (shared_simple_eval_estate) FreeExecutorState(shared_simple_eval_estate); diff --git a/src/pl/plpgsql/src/pl_funcs.c b/src/pl/plpgsql/src/pl_funcs.c index 80b8448b7fc..f0e85fcfcd9 100644 --- a/src/pl/plpgsql/src/pl_funcs.c +++ b/src/pl/plpgsql/src/pl_funcs.c @@ -284,6 +284,10 @@ plpgsql_stmt_typename(PLpgSQL_stmt *stmt) return "CLOSE"; case PLPGSQL_STMT_PERFORM: return "PERFORM"; + case PLPGSQL_STMT_COMMIT: + return "COMMIT"; + case PLPGSQL_STMT_ROLLBACK: + return "ROLLBACK"; } return "unknown"; @@ -363,6 +367,8 @@ static void free_open(PLpgSQL_stmt_open *stmt); static void free_fetch(PLpgSQL_stmt_fetch *stmt); static void free_close(PLpgSQL_stmt_close *stmt); static void free_perform(PLpgSQL_stmt_perform *stmt); +static void free_commit(PLpgSQL_stmt_commit *stmt); +static void free_rollback(PLpgSQL_stmt_rollback *stmt); static void free_expr(PLpgSQL_expr *expr); @@ -443,6 +449,12 @@ free_stmt(PLpgSQL_stmt *stmt) case PLPGSQL_STMT_PERFORM: free_perform((PLpgSQL_stmt_perform *) stmt); break; + case PLPGSQL_STMT_COMMIT: + free_commit((PLpgSQL_stmt_commit *) stmt); + break; + case PLPGSQL_STMT_ROLLBACK: + free_rollback((PLpgSQL_stmt_rollback *) stmt); + break; default: elog(ERROR, "unrecognized cmd_type: %d", stmt->cmd_type); break; @@ -591,6 +603,16 @@ free_perform(PLpgSQL_stmt_perform *stmt) } static void +free_commit(PLpgSQL_stmt_commit *stmt) +{ +} + +static void +free_rollback(PLpgSQL_stmt_rollback *stmt) +{ +} + +static void free_exit(PLpgSQL_stmt_exit *stmt) { free_expr(stmt->cond); @@ -777,6 +799,8 @@ static void dump_fetch(PLpgSQL_stmt_fetch *stmt); static void dump_cursor_direction(PLpgSQL_stmt_fetch *stmt); static void dump_close(PLpgSQL_stmt_close *stmt); static void dump_perform(PLpgSQL_stmt_perform *stmt); +static void dump_commit(PLpgSQL_stmt_commit *stmt); +static void dump_rollback(PLpgSQL_stmt_rollback *stmt); static void dump_expr(PLpgSQL_expr *expr); @@ -867,6 +891,12 @@ dump_stmt(PLpgSQL_stmt *stmt) case PLPGSQL_STMT_PERFORM: dump_perform((PLpgSQL_stmt_perform *) stmt); break; + case PLPGSQL_STMT_COMMIT: + dump_commit((PLpgSQL_stmt_commit *) stmt); + break; + case PLPGSQL_STMT_ROLLBACK: + dump_rollback((PLpgSQL_stmt_rollback *) stmt); + break; default: elog(ERROR, "unrecognized cmd_type: %d", stmt->cmd_type); break; @@ -1240,6 +1270,20 @@ dump_perform(PLpgSQL_stmt_perform *stmt) } static void +dump_commit(PLpgSQL_stmt_commit *stmt) +{ + dump_ind(); + printf("COMMIT\n"); +} + +static void +dump_rollback(PLpgSQL_stmt_rollback *stmt) +{ + dump_ind(); + printf("ROLLBACK\n"); +} + +static void dump_exit(PLpgSQL_stmt_exit *stmt) { dump_ind(); diff --git a/src/pl/plpgsql/src/pl_gram.y b/src/pl/plpgsql/src/pl_gram.y index d9cab1ad7ee..42f6a2e1618 100644 --- a/src/pl/plpgsql/src/pl_gram.y +++ b/src/pl/plpgsql/src/pl_gram.y @@ -198,6 +198,7 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt); %type <stmt> stmt_return stmt_raise stmt_assert stmt_execsql %type <stmt> stmt_dynexecute stmt_for stmt_perform stmt_getdiag %type <stmt> stmt_open stmt_fetch stmt_move stmt_close stmt_null +%type <stmt> stmt_commit stmt_rollback %type <stmt> stmt_case stmt_foreach_a %type <list> proc_exceptions @@ -260,6 +261,7 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt); %token <keyword> K_COLLATE %token <keyword> K_COLUMN %token <keyword> K_COLUMN_NAME +%token <keyword> K_COMMIT %token <keyword> K_CONSTANT %token <keyword> K_CONSTRAINT %token <keyword> K_CONSTRAINT_NAME @@ -325,6 +327,7 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt); %token <keyword> K_RETURN %token <keyword> K_RETURNED_SQLSTATE %token <keyword> K_REVERSE +%token <keyword> K_ROLLBACK %token <keyword> K_ROW_COUNT %token <keyword> K_ROWTYPE %token <keyword> K_SCHEMA @@ -897,6 +900,10 @@ proc_stmt : pl_block ';' { $$ = $1; } | stmt_null { $$ = $1; } + | stmt_commit + { $$ = $1; } + | stmt_rollback + { $$ = $1; } ; stmt_perform : K_PERFORM expr_until_semi @@ -2151,6 +2158,31 @@ stmt_null : K_NULL ';' } ; +stmt_commit : K_COMMIT ';' + { + PLpgSQL_stmt_commit *new; + + new = palloc(sizeof(PLpgSQL_stmt_commit)); + new->cmd_type = PLPGSQL_STMT_COMMIT; + new->lineno = plpgsql_location_to_lineno(@1); + + $$ = (PLpgSQL_stmt *)new; + } + ; + +stmt_rollback : K_ROLLBACK ';' + { + PLpgSQL_stmt_rollback *new; + + new = palloc(sizeof(PLpgSQL_stmt_rollback)); + new->cmd_type = PLPGSQL_STMT_ROLLBACK; + new->lineno = plpgsql_location_to_lineno(@1); + + $$ = (PLpgSQL_stmt *)new; + } + ; + + cursor_variable : T_DATUM { /* @@ -2387,6 +2419,7 @@ unreserved_keyword : | K_COLLATE | K_COLUMN | K_COLUMN_NAME + | K_COMMIT | K_CONSTANT | K_CONSTRAINT | K_CONSTRAINT_NAME @@ -2438,6 +2471,7 @@ unreserved_keyword : | K_RETURN | K_RETURNED_SQLSTATE | K_REVERSE + | K_ROLLBACK | K_ROW_COUNT | K_ROWTYPE | K_SCHEMA diff --git a/src/pl/plpgsql/src/pl_handler.c b/src/pl/plpgsql/src/pl_handler.c index 4c2ba2f734f..c49428d9233 100644 --- a/src/pl/plpgsql/src/pl_handler.c +++ b/src/pl/plpgsql/src/pl_handler.c @@ -219,15 +219,20 @@ PG_FUNCTION_INFO_V1(plpgsql_call_handler); Datum plpgsql_call_handler(PG_FUNCTION_ARGS) { + bool nonatomic; PLpgSQL_function *func; PLpgSQL_execstate *save_cur_estate; Datum retval; int rc; + nonatomic = fcinfo->context && + IsA(fcinfo->context, CallContext) && + !castNode(CallContext, fcinfo->context)->atomic; + /* * Connect to SPI manager */ - if ((rc = SPI_connect()) != SPI_OK_CONNECT) + if ((rc = SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0)) != SPI_OK_CONNECT) elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(rc)); /* Find or compile the function */ @@ -301,7 +306,7 @@ plpgsql_inline_handler(PG_FUNCTION_ARGS) /* * Connect to SPI manager */ - if ((rc = SPI_connect()) != SPI_OK_CONNECT) + if ((rc = SPI_connect_ext(codeblock->atomic ? 0 : SPI_OPT_NONATOMIC)) != SPI_OK_CONNECT) elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(rc)); /* Compile the anonymous code block */ diff --git a/src/pl/plpgsql/src/pl_scanner.c b/src/pl/plpgsql/src/pl_scanner.c index ee9aef8bbc0..12a3e6b818f 100644 --- a/src/pl/plpgsql/src/pl_scanner.c +++ b/src/pl/plpgsql/src/pl_scanner.c @@ -106,6 +106,7 @@ static const ScanKeyword unreserved_keywords[] = { PG_KEYWORD("collate", K_COLLATE, UNRESERVED_KEYWORD) PG_KEYWORD("column", K_COLUMN, UNRESERVED_KEYWORD) PG_KEYWORD("column_name", K_COLUMN_NAME, UNRESERVED_KEYWORD) + PG_KEYWORD("commit", K_COMMIT, UNRESERVED_KEYWORD) PG_KEYWORD("constant", K_CONSTANT, UNRESERVED_KEYWORD) PG_KEYWORD("constraint", K_CONSTRAINT, UNRESERVED_KEYWORD) PG_KEYWORD("constraint_name", K_CONSTRAINT_NAME, UNRESERVED_KEYWORD) @@ -158,6 +159,7 @@ static const ScanKeyword unreserved_keywords[] = { PG_KEYWORD("return", K_RETURN, UNRESERVED_KEYWORD) PG_KEYWORD("returned_sqlstate", K_RETURNED_SQLSTATE, UNRESERVED_KEYWORD) PG_KEYWORD("reverse", K_REVERSE, UNRESERVED_KEYWORD) + PG_KEYWORD("rollback", K_ROLLBACK, UNRESERVED_KEYWORD) PG_KEYWORD("row_count", K_ROW_COUNT, UNRESERVED_KEYWORD) PG_KEYWORD("rowtype", K_ROWTYPE, UNRESERVED_KEYWORD) PG_KEYWORD("schema", K_SCHEMA, UNRESERVED_KEYWORD) diff --git a/src/pl/plpgsql/src/plpgsql.h b/src/pl/plpgsql/src/plpgsql.h index c571afa34bb..a9b9d91de7a 100644 --- a/src/pl/plpgsql/src/plpgsql.h +++ b/src/pl/plpgsql/src/plpgsql.h @@ -105,7 +105,9 @@ typedef enum PLpgSQL_stmt_type PLPGSQL_STMT_OPEN, PLPGSQL_STMT_FETCH, PLPGSQL_STMT_CLOSE, - PLPGSQL_STMT_PERFORM + PLPGSQL_STMT_PERFORM, + PLPGSQL_STMT_COMMIT, + PLPGSQL_STMT_ROLLBACK } PLpgSQL_stmt_type; /* @@ -434,6 +436,24 @@ typedef struct PLpgSQL_stmt_perform } PLpgSQL_stmt_perform; /* + * COMMIT statement + */ +typedef struct PLpgSQL_stmt_commit +{ + PLpgSQL_stmt_type cmd_type; + int lineno; +} PLpgSQL_stmt_commit; + +/* + * ROLLBACK statement + */ +typedef struct PLpgSQL_stmt_rollback +{ + PLpgSQL_stmt_type cmd_type; + int lineno; +} PLpgSQL_stmt_rollback; + +/* * GET DIAGNOSTICS item */ typedef struct PLpgSQL_diag_item diff --git a/src/pl/plpgsql/src/sql/plpgsql_transaction.sql b/src/pl/plpgsql/src/sql/plpgsql_transaction.sql new file mode 100644 index 00000000000..02ee7350795 --- /dev/null +++ b/src/pl/plpgsql/src/sql/plpgsql_transaction.sql @@ -0,0 +1,215 @@ +CREATE TABLE test1 (a int, b text); + + +CREATE PROCEDURE transaction_test1() +LANGUAGE plpgsql +AS $$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END +$$; + +CALL transaction_test1(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +DO +LANGUAGE plpgsql +$$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END +$$; + +SELECT * FROM test1; + + +-- transaction commands not allowed when called in transaction block +START TRANSACTION; +CALL transaction_test1(); +COMMIT; + +START TRANSACTION; +DO LANGUAGE plpgsql $$ BEGIN COMMIT; END $$; +COMMIT; + + +TRUNCATE test1; + +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; + RETURN 1; +END +$$; + +SELECT transaction_test2(); + +SELECT * FROM test1; + + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + CALL transaction_test1(); + RETURN 1; +END; +$$; + +SELECT transaction_test3(); + +SELECT * FROM test1; + + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE 'DO LANGUAGE plpgsql $x$ BEGIN COMMIT; END $x$'; + RETURN 1; +END; +$$; + +SELECT transaction_test4(); + + +-- proconfig settings currently disallow transaction statements +CREATE PROCEDURE transaction_test5() +LANGUAGE plpgsql +SET work_mem = 555 +AS $$ +BEGIN + COMMIT; +END; +$$; + +CALL transaction_test5(); + + +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); + +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + COMMIT; + END LOOP; +END; +$$; + +SELECT * FROM test1; + + +-- rollback inside cursor loop +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + ROLLBACK; + END LOOP; +END; +$$; + +SELECT * FROM test1; + + +-- commit inside block with exception handler +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +BEGIN + BEGIN + INSERT INTO test1 (a) VALUES (1); + COMMIT; + INSERT INTO test1 (a) VALUES (1/0); + COMMIT; + EXCEPTION + WHEN division_by_zero THEN + RAISE NOTICE 'caught division_by_zero'; + END; +END; +$$; + +SELECT * FROM test1; + + +-- rollback inside block with exception handler +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +BEGIN + BEGIN + INSERT INTO test1 (a) VALUES (1); + ROLLBACK; + INSERT INTO test1 (a) VALUES (1/0); + ROLLBACK; + EXCEPTION + WHEN division_by_zero THEN + RAISE NOTICE 'caught division_by_zero'; + END; +END; +$$; + +SELECT * FROM test1; + + +-- COMMIT failures +DO LANGUAGE plpgsql $$ +BEGIN + CREATE TABLE test3 (y int UNIQUE DEFERRABLE INITIALLY DEFERRED); + COMMIT; + INSERT INTO test3 (y) VALUES (1); + COMMIT; + INSERT INTO test3 (y) VALUES (1); + INSERT INTO test3 (y) VALUES (2); + COMMIT; + INSERT INTO test3 (y) VALUES (3); -- won't get here +END; +$$; + +SELECT * FROM test3; + + +DROP TABLE test1; +DROP TABLE test2; +DROP TABLE test3; diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile index cc91afebde7..d09910835d1 100644 --- a/src/pl/plpython/Makefile +++ b/src/pl/plpython/Makefile @@ -90,6 +90,7 @@ REGRESS = \ plpython_quote \ plpython_composite \ plpython_subtransaction \ + plpython_transaction \ plpython_drop REGRESS_PLPYTHON3_MANGLE := $(REGRESS) diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out index 847e4cc412e..39b994f4468 100644 --- a/src/pl/plpython/expected/plpython_test.out +++ b/src/pl/plpython/expected/plpython_test.out @@ -48,6 +48,7 @@ select module_contents(); Error Fatal SPIError + commit cursor debug error @@ -60,10 +61,11 @@ select module_contents(); quote_ident quote_literal quote_nullable + rollback spiexceptions subtransaction warning -(18 rows) +(20 rows) CREATE FUNCTION elog_test_basic() RETURNS void AS $$ diff --git a/src/pl/plpython/expected/plpython_transaction.out b/src/pl/plpython/expected/plpython_transaction.out new file mode 100644 index 00000000000..1fadc69b636 --- /dev/null +++ b/src/pl/plpython/expected/plpython_transaction.out @@ -0,0 +1,135 @@ +CREATE TABLE test1 (a int, b text); +CREATE PROCEDURE transaction_test1() +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; +CALL transaction_test1(); +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +DO +LANGUAGE plpythonu +$$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +return 1 +$$; +SELECT transaction_test2(); +ERROR: invalid transaction termination +CONTEXT: PL/Python function "transaction_test2" +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plpythonu +AS $$ +plpy.execute("CALL transaction_test1()") +return 1 +$$; +SELECT transaction_test3(); +ERROR: spiexceptions.InvalidTransactionTermination: invalid transaction termination +CONTEXT: Traceback (most recent call last): + PL/Python function "transaction_test3", line 2, in <module> + plpy.execute("CALL transaction_test1()") +PL/Python function "transaction_test3" +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plpythonu +AS $$ +plpy.execute("DO LANGUAGE plpythonu $x$ plpy.commit() $x$") +return 1 +$$; +SELECT transaction_test4(); +ERROR: spiexceptions.InvalidTransactionTermination: invalid transaction termination +CONTEXT: Traceback (most recent call last): + PL/Python function "transaction_test4", line 2, in <module> + plpy.execute("DO LANGUAGE plpythonu $x$ plpy.commit() $x$") +PL/Python function "transaction_test4" +-- commit inside subtransaction (prohibited) +DO LANGUAGE plpythonu $$ +with plpy.subtransaction(): + plpy.commit() +$$; +WARNING: forcibly aborting a subtransaction that has not been exited +ERROR: cannot commit while a subtransaction is active +CONTEXT: PL/Python anonymous code block +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); +TRUNCATE test1; +DO LANGUAGE plpythonu $$ +for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): + plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) + plpy.commit() +$$; +ERROR: cannot commit transaction while a cursor is open +CONTEXT: PL/Python anonymous code block +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- rollback inside cursor loop +TRUNCATE test1; +DO LANGUAGE plpythonu $$ +for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): + plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) + plpy.rollback() +$$; +ERROR: cannot abort transaction while a cursor is open +CONTEXT: PL/Python anonymous code block +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +DROP TABLE test1; +DROP TABLE test2; diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c index 695de305838..5a197ce27ac 100644 --- a/src/pl/plpython/plpy_main.c +++ b/src/pl/plpython/plpy_main.c @@ -60,7 +60,7 @@ static void plpython_error_callback(void *arg); static void plpython_inline_error_callback(void *arg); static void PLy_init_interp(void); -static PLyExecutionContext *PLy_push_execution_context(void); +static PLyExecutionContext *PLy_push_execution_context(bool atomic_context); static void PLy_pop_execution_context(void); /* static state for Python library conflict detection */ @@ -219,14 +219,19 @@ plpython2_validator(PG_FUNCTION_ARGS) Datum plpython_call_handler(PG_FUNCTION_ARGS) { + bool nonatomic; Datum retval; PLyExecutionContext *exec_ctx; ErrorContextCallback plerrcontext; PLy_initialize(); + nonatomic = fcinfo->context && + IsA(fcinfo->context, CallContext) && + !castNode(CallContext, fcinfo->context)->atomic; + /* Note: SPI_finish() happens in plpy_exec.c, which is dubious design */ - if (SPI_connect() != SPI_OK_CONNECT) + if (SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0) != SPI_OK_CONNECT) elog(ERROR, "SPI_connect failed"); /* @@ -235,7 +240,7 @@ plpython_call_handler(PG_FUNCTION_ARGS) * here and the PG_TRY. (plpython_error_callback expects the stack entry * to be there, so we have to make the context first.) */ - exec_ctx = PLy_push_execution_context(); + exec_ctx = PLy_push_execution_context(!nonatomic); /* * Setup error traceback support for ereport() @@ -303,7 +308,7 @@ plpython_inline_handler(PG_FUNCTION_ARGS) PLy_initialize(); /* Note: SPI_finish() happens in plpy_exec.c, which is dubious design */ - if (SPI_connect() != SPI_OK_CONNECT) + if (SPI_connect_ext(codeblock->atomic ? 0 : SPI_OPT_NONATOMIC) != SPI_OK_CONNECT) elog(ERROR, "SPI_connect failed"); MemSet(&fake_fcinfo, 0, sizeof(fake_fcinfo)); @@ -332,7 +337,7 @@ plpython_inline_handler(PG_FUNCTION_ARGS) * need the stack entry, but for consistency with plpython_call_handler we * do it in this order.) */ - exec_ctx = PLy_push_execution_context(); + exec_ctx = PLy_push_execution_context(codeblock->atomic); /* * Setup error traceback support for ereport() @@ -430,12 +435,14 @@ PLy_get_scratch_context(PLyExecutionContext *context) } static PLyExecutionContext * -PLy_push_execution_context(void) +PLy_push_execution_context(bool atomic_context) { PLyExecutionContext *context; + /* Pick a memory context similar to what SPI uses. */ context = (PLyExecutionContext *) - MemoryContextAlloc(TopTransactionContext, sizeof(PLyExecutionContext)); + MemoryContextAlloc(atomic_context ? TopTransactionContext : PortalContext, + sizeof(PLyExecutionContext)); context->curr_proc = NULL; context->scratch_ctx = NULL; context->next = PLy_execution_contexts; diff --git a/src/pl/plpython/plpy_plpymodule.c b/src/pl/plpython/plpy_plpymodule.c index 23f99e20ca3..3d7dd13f0cf 100644 --- a/src/pl/plpython/plpy_plpymodule.c +++ b/src/pl/plpython/plpy_plpymodule.c @@ -6,8 +6,10 @@ #include "postgres.h" +#include "access/xact.h" #include "mb/pg_wchar.h" #include "utils/builtins.h" +#include "utils/snapmgr.h" #include "plpython.h" @@ -15,6 +17,7 @@ #include "plpy_cursorobject.h" #include "plpy_elog.h" +#include "plpy_main.h" #include "plpy_planobject.h" #include "plpy_resultobject.h" #include "plpy_spi.h" @@ -41,6 +44,8 @@ static PyObject *PLy_fatal(PyObject *self, PyObject *args, PyObject *kw); static PyObject *PLy_quote_literal(PyObject *self, PyObject *args); static PyObject *PLy_quote_nullable(PyObject *self, PyObject *args); static PyObject *PLy_quote_ident(PyObject *self, PyObject *args); +static PyObject *PLy_commit(PyObject *self, PyObject *args); +static PyObject *PLy_rollback(PyObject *self, PyObject *args); /* A list of all known exceptions, generated from backend/utils/errcodes.txt */ @@ -95,6 +100,12 @@ static PyMethodDef PLy_methods[] = { */ {"cursor", PLy_cursor, METH_VARARGS, NULL}, + /* + * transaction control + */ + {"commit", PLy_commit, METH_NOARGS, NULL}, + {"rollback", PLy_rollback, METH_NOARGS, NULL}, + {NULL, NULL, 0, NULL} }; @@ -577,3 +588,41 @@ PLy_output(volatile int level, PyObject *self, PyObject *args, PyObject *kw) */ Py_RETURN_NONE; } + +static PyObject * +PLy_commit(PyObject *self, PyObject *args) +{ + PLyExecutionContext *exec_ctx = PLy_current_execution_context(); + + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot commit transaction while a cursor is open"))); + + SPI_commit(); + SPI_start_transaction(); + + /* was cleared at transaction end, reset pointer */ + exec_ctx->scratch_ctx = NULL; + + Py_RETURN_NONE; +} + +static PyObject * +PLy_rollback(PyObject *self, PyObject *args) +{ + PLyExecutionContext *exec_ctx = PLy_current_execution_context(); + + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot abort transaction while a cursor is open"))); + + SPI_rollback(); + SPI_start_transaction(); + + /* was cleared at transaction end, reset pointer */ + exec_ctx->scratch_ctx = NULL; + + Py_RETURN_NONE; +} diff --git a/src/pl/plpython/sql/plpython_transaction.sql b/src/pl/plpython/sql/plpython_transaction.sql new file mode 100644 index 00000000000..36c7b2ef385 --- /dev/null +++ b/src/pl/plpython/sql/plpython_transaction.sql @@ -0,0 +1,115 @@ +CREATE TABLE test1 (a int, b text); + + +CREATE PROCEDURE transaction_test1() +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; + +CALL transaction_test1(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +DO +LANGUAGE plpythonu +$$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; + +SELECT * FROM test1; + + +TRUNCATE test1; + +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +return 1 +$$; + +SELECT transaction_test2(); + +SELECT * FROM test1; + + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plpythonu +AS $$ +plpy.execute("CALL transaction_test1()") +return 1 +$$; + +SELECT transaction_test3(); + +SELECT * FROM test1; + + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plpythonu +AS $$ +plpy.execute("DO LANGUAGE plpythonu $x$ plpy.commit() $x$") +return 1 +$$; + +SELECT transaction_test4(); + + +-- commit inside subtransaction (prohibited) +DO LANGUAGE plpythonu $$ +with plpy.subtransaction(): + plpy.commit() +$$; + + +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); + +TRUNCATE test1; + +DO LANGUAGE plpythonu $$ +for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): + plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) + plpy.commit() +$$; + +SELECT * FROM test1; + + +-- rollback inside cursor loop +TRUNCATE test1; + +DO LANGUAGE plpythonu $$ +for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): + plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) + plpy.rollback() +$$; + +SELECT * FROM test1; + + +DROP TABLE test1; +DROP TABLE test2; diff --git a/src/pl/tcl/Makefile b/src/pl/tcl/Makefile index 6a92a9b6aa7..ef61ee596e0 100644 --- a/src/pl/tcl/Makefile +++ b/src/pl/tcl/Makefile @@ -28,7 +28,7 @@ DATA = pltcl.control pltcl--1.0.sql pltcl--unpackaged--1.0.sql \ pltclu.control pltclu--1.0.sql pltclu--unpackaged--1.0.sql REGRESS_OPTS = --dbname=$(PL_TESTDB) --load-extension=pltcl -REGRESS = pltcl_setup pltcl_queries pltcl_call pltcl_start_proc pltcl_subxact pltcl_unicode +REGRESS = pltcl_setup pltcl_queries pltcl_call pltcl_start_proc pltcl_subxact pltcl_unicode pltcl_transaction # Tcl on win32 ships with import libraries only for Microsoft Visual C++, # which are not compatible with mingw gcc. Therefore we need to build a diff --git a/src/pl/tcl/expected/pltcl_transaction.out b/src/pl/tcl/expected/pltcl_transaction.out new file mode 100644 index 00000000000..007204b99ad --- /dev/null +++ b/src/pl/tcl/expected/pltcl_transaction.out @@ -0,0 +1,100 @@ +-- suppress CONTEXT so that function OIDs aren't in output +\set VERBOSITY terse +CREATE TABLE test1 (a int, b text); +CREATE PROCEDURE transaction_test1() +LANGUAGE pltcl +AS $$ +for {set i 0} {$i < 10} {incr i} { + spi_exec "INSERT INTO test1 (a) VALUES ($i)" + if {$i % 2 == 0} { + commit + } else { + rollback + } +} +$$; +CALL transaction_test1(); +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE pltcl +AS $$ +for {set i 0} {$i < 10} {incr i} { + spi_exec "INSERT INTO test1 (a) VALUES ($i)" + if {$i % 2 == 0} { + commit + } else { + rollback + } +} +return 1 +$$; +SELECT transaction_test2(); +ERROR: invalid transaction termination +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE pltcl +AS $$ +spi_exec "CALL transaction_test1()" +return 1 +$$; +SELECT transaction_test3(); +ERROR: invalid transaction termination +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); +TRUNCATE test1; +CREATE PROCEDURE transaction_test4a() +LANGUAGE pltcl +AS $$ +spi_exec -array row "SELECT * FROM test2 ORDER BY x" { + spi_exec "INSERT INTO test1 (a) VALUES ($row(x))" + commit +} +$$; +CALL transaction_test4a(); +ERROR: cannot commit while a subtransaction is active +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- rollback inside cursor loop +TRUNCATE test1; +CREATE PROCEDURE transaction_test4b() +LANGUAGE pltcl +AS $$ +spi_exec -array row "SELECT * FROM test2 ORDER BY x" { + spi_exec "INSERT INTO test1 (a) VALUES ($row(x))" + rollback +} +$$; +CALL transaction_test4b(); +ERROR: cannot roll back while a subtransaction is active +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +DROP TABLE test1; +DROP TABLE test2; diff --git a/src/pl/tcl/pltcl.c b/src/pl/tcl/pltcl.c index 8f5847c4ff2..5df4dfdf552 100644 --- a/src/pl/tcl/pltcl.c +++ b/src/pl/tcl/pltcl.c @@ -312,6 +312,10 @@ static int pltcl_SPI_lastoid(ClientData cdata, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]); static int pltcl_subtransaction(ClientData cdata, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]); +static int pltcl_commit(ClientData cdata, Tcl_Interp *interp, + int objc, Tcl_Obj *const objv[]); +static int pltcl_rollback(ClientData cdata, Tcl_Interp *interp, + int objc, Tcl_Obj *const objv[]); static void pltcl_subtrans_begin(MemoryContext oldcontext, ResourceOwner oldowner); @@ -524,6 +528,10 @@ pltcl_init_interp(pltcl_interp_desc *interp_desc, Oid prolang, bool pltrusted) pltcl_SPI_lastoid, NULL, NULL); Tcl_CreateObjCommand(interp, "subtransaction", pltcl_subtransaction, NULL, NULL); + Tcl_CreateObjCommand(interp, "commit", + pltcl_commit, NULL, NULL); + Tcl_CreateObjCommand(interp, "rollback", + pltcl_rollback, NULL, NULL); /************************************************************ * Call the appropriate start_proc, if there is one. @@ -797,6 +805,7 @@ static Datum pltcl_func_handler(PG_FUNCTION_ARGS, pltcl_call_state *call_state, bool pltrusted) { + bool nonatomic; pltcl_proc_desc *prodesc; Tcl_Interp *volatile interp; Tcl_Obj *tcl_cmd; @@ -804,8 +813,12 @@ pltcl_func_handler(PG_FUNCTION_ARGS, pltcl_call_state *call_state, int tcl_rc; Datum retval; + nonatomic = fcinfo->context && + IsA(fcinfo->context, CallContext) && + !castNode(CallContext, fcinfo->context)->atomic; + /* Connect to SPI manager */ - if (SPI_connect() != SPI_OK_CONNECT) + if (SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0) != SPI_OK_CONNECT) elog(ERROR, "could not connect to SPI manager"); /* Find or compile the function */ @@ -2937,6 +2950,86 @@ pltcl_subtransaction(ClientData cdata, Tcl_Interp *interp, /********************************************************************** + * pltcl_commit() + * + * Commit the transaction and start a new one. + **********************************************************************/ +static int +pltcl_commit(ClientData cdata, Tcl_Interp *interp, + int objc, Tcl_Obj *const objv[]) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + PG_TRY(); + { + SPI_commit(); + SPI_start_transaction(); + } + PG_CATCH(); + { + ErrorData *edata; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Pass the error data to Tcl */ + pltcl_construct_errorCode(interp, edata); + UTF_BEGIN; + Tcl_SetObjResult(interp, Tcl_NewStringObj(UTF_E2U(edata->message), -1)); + UTF_END; + FreeErrorData(edata); + + return TCL_ERROR; + } + PG_END_TRY(); + + return TCL_OK; +} + + +/********************************************************************** + * pltcl_rollback() + * + * Abort the transaction and start a new one. + **********************************************************************/ +static int +pltcl_rollback(ClientData cdata, Tcl_Interp *interp, + int objc, Tcl_Obj *const objv[]) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + PG_TRY(); + { + SPI_rollback(); + SPI_start_transaction(); + } + PG_CATCH(); + { + ErrorData *edata; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Pass the error data to Tcl */ + pltcl_construct_errorCode(interp, edata); + UTF_BEGIN; + Tcl_SetObjResult(interp, Tcl_NewStringObj(UTF_E2U(edata->message), -1)); + UTF_END; + FreeErrorData(edata); + + return TCL_ERROR; + } + PG_END_TRY(); + + return TCL_OK; +} + + +/********************************************************************** * pltcl_set_tuple_values() - Set variables for all attributes * of a given tuple * diff --git a/src/pl/tcl/sql/pltcl_transaction.sql b/src/pl/tcl/sql/pltcl_transaction.sql new file mode 100644 index 00000000000..c752faf6654 --- /dev/null +++ b/src/pl/tcl/sql/pltcl_transaction.sql @@ -0,0 +1,98 @@ +-- suppress CONTEXT so that function OIDs aren't in output +\set VERBOSITY terse + +CREATE TABLE test1 (a int, b text); + + +CREATE PROCEDURE transaction_test1() +LANGUAGE pltcl +AS $$ +for {set i 0} {$i < 10} {incr i} { + spi_exec "INSERT INTO test1 (a) VALUES ($i)" + if {$i % 2 == 0} { + commit + } else { + rollback + } +} +$$; + +CALL transaction_test1(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE pltcl +AS $$ +for {set i 0} {$i < 10} {incr i} { + spi_exec "INSERT INTO test1 (a) VALUES ($i)" + if {$i % 2 == 0} { + commit + } else { + rollback + } +} +return 1 +$$; + +SELECT transaction_test2(); + +SELECT * FROM test1; + + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE pltcl +AS $$ +spi_exec "CALL transaction_test1()" +return 1 +$$; + +SELECT transaction_test3(); + +SELECT * FROM test1; + + +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); + +TRUNCATE test1; + +CREATE PROCEDURE transaction_test4a() +LANGUAGE pltcl +AS $$ +spi_exec -array row "SELECT * FROM test2 ORDER BY x" { + spi_exec "INSERT INTO test1 (a) VALUES ($row(x))" + commit +} +$$; + +CALL transaction_test4a(); + +SELECT * FROM test1; + + +-- rollback inside cursor loop +TRUNCATE test1; + +CREATE PROCEDURE transaction_test4b() +LANGUAGE pltcl +AS $$ +spi_exec -array row "SELECT * FROM test2 ORDER BY x" { + spi_exec "INSERT INTO test1 (a) VALUES ($row(x))" + rollback +} +$$; + +CALL transaction_test4b(); + +SELECT * FROM test1; + + +DROP TABLE test1; +DROP TABLE test2; |