aboutsummaryrefslogtreecommitdiff
path: root/src/pl/plpython/plpython.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pl/plpython/plpython.c')
-rw-r--r--src/pl/plpython/plpython.c283
1 files changed, 279 insertions, 4 deletions
diff --git a/src/pl/plpython/plpython.c b/src/pl/plpython/plpython.c
index e5832782fb3..4744ee7bebd 100644
--- a/src/pl/plpython/plpython.c
+++ b/src/pl/plpython/plpython.c
@@ -233,6 +233,13 @@ typedef struct PLyProcedureEntry
PLyProcedure *proc;
} PLyProcedureEntry;
+/* explicit subtransaction data */
+typedef struct PLySubtransactionData
+{
+ MemoryContext oldcontext;
+ ResourceOwner oldowner;
+} PLySubtransactionData;
+
/* Python objects */
typedef struct PLyPlanObject
@@ -254,6 +261,13 @@ typedef struct PLyResultObject
PyObject *status; /* query status, SPI_OK_*, or SPI_ERR_* */
} PLyResultObject;
+typedef struct PLySubtransactionObject
+{
+ PyObject_HEAD
+ bool started;
+ bool exited;
+} PLySubtransactionObject;
+
/* function declarations */
@@ -382,6 +396,9 @@ static HeapTuple PLyGenericObject_ToTuple(PLyTypeInfo *, TupleDesc, PyObject *);
*/
static PLyProcedure *PLy_curr_procedure = NULL;
+/* list of explicit subtransaction data */
+static List *explicit_subtransactions = NIL;
+
static PyObject *PLy_interp_globals = NULL;
static PyObject *PLy_interp_safe_globals = NULL;
static HTAB *PLy_procedure_cache = NULL;
@@ -401,6 +418,10 @@ static char PLy_result_doc[] = {
"Results of a PostgreSQL query"
};
+static char PLy_subtransaction_doc[] = {
+ "PostgreSQL subtransaction context manager"
+};
+
/*
* the function definitions
@@ -1226,20 +1247,71 @@ PLy_function_handler(FunctionCallInfo fcinfo, PLyProcedure *proc)
return rv;
}
+/*
+ * Abort lingering subtransactions that have been explicitly started
+ * by plpy.subtransaction().start() and not properly closed.
+ */
+static void
+PLy_abort_open_subtransactions(int save_subxact_level)
+{
+ Assert(save_subxact_level >= 0);
+
+ while (list_length(explicit_subtransactions) > save_subxact_level)
+ {
+ PLySubtransactionData *subtransactiondata;
+
+ Assert(explicit_subtransactions != NIL);
+
+ ereport(WARNING,
+ (errmsg("forcibly aborting a subtransaction that has not been exited")));
+
+ RollbackAndReleaseCurrentSubTransaction();
+
+ SPI_restore_connection();
+
+ subtransactiondata = (PLySubtransactionData *) linitial(explicit_subtransactions);
+ explicit_subtransactions = list_delete_first(explicit_subtransactions);
+
+ MemoryContextSwitchTo(subtransactiondata->oldcontext);
+ CurrentResourceOwner = subtransactiondata->oldowner;
+ PLy_free(subtransactiondata);
+ }
+}
+
static PyObject *
PLy_procedure_call(PLyProcedure *proc, char *kargs, PyObject *vargs)
{
PyObject *rv;
+ int volatile save_subxact_level = list_length(explicit_subtransactions);
PyDict_SetItemString(proc->globals, kargs, vargs);
+
+ PG_TRY();
+ {
#if PY_VERSION_HEX >= 0x03020000
- rv = PyEval_EvalCode(proc->code,
- proc->globals, proc->globals);
+ rv = PyEval_EvalCode(proc->code,
+ proc->globals, proc->globals);
#else
- rv = PyEval_EvalCode((PyCodeObject *) proc->code,
- proc->globals, proc->globals);
+ rv = PyEval_EvalCode((PyCodeObject *) proc->code,
+ proc->globals, proc->globals);
#endif
+ /*
+ * Since plpy will only let you close subtransactions that
+ * you started, you cannot *unnest* subtransactions, only
+ * *nest* them without closing.
+ */
+ Assert(list_length(explicit_subtransactions) >= save_subxact_level);
+ }
+ PG_CATCH();
+ {
+ PLy_abort_open_subtransactions(save_subxact_level);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ PLy_abort_open_subtransactions(save_subxact_level);
+
/* If the Python code returned an error, propagate it */
if (rv == NULL)
PLy_elog(ERROR, NULL);
@@ -2762,6 +2834,12 @@ static PyObject *PLy_quote_literal(PyObject *self, PyObject *args);
static PyObject *PLy_quote_nullable(PyObject *self, PyObject *args);
static PyObject *PLy_quote_ident(PyObject *self, PyObject *args);
+static PyObject *PLy_subtransaction(PyObject *, PyObject *);
+static PyObject *PLy_subtransaction_new(void);
+static void PLy_subtransaction_dealloc(PyObject *);
+static PyObject *PLy_subtransaction_enter(PyObject *, PyObject *);
+static PyObject *PLy_subtransaction_exit(PyObject *, PyObject *);
+
static PyMethodDef PLy_plan_methods[] = {
{"status", PLy_plan_status, METH_VARARGS, NULL},
@@ -2854,6 +2932,50 @@ static PyTypeObject PLy_ResultType = {
PLy_result_methods, /* tp_tpmethods */
};
+static PyMethodDef PLy_subtransaction_methods[] = {
+ {"__enter__", PLy_subtransaction_enter, METH_VARARGS, NULL},
+ {"__exit__", PLy_subtransaction_exit, METH_VARARGS, NULL},
+ /* user-friendly names for Python <2.6 */
+ {"enter", PLy_subtransaction_enter, METH_VARARGS, NULL},
+ {"exit", PLy_subtransaction_exit, METH_VARARGS, NULL},
+ {NULL, NULL, 0, NULL}
+};
+
+static PyTypeObject PLy_SubtransactionType = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ "PLySubtransaction", /* tp_name */
+ sizeof(PLySubtransactionObject), /* tp_size */
+ 0, /* tp_itemsize */
+
+ /*
+ * methods
+ */
+ PLy_subtransaction_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
+ PLy_subtransaction_doc, /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ PLy_subtransaction_methods, /* tp_tpmethods */
+};
+
static PyMethodDef PLy_methods[] = {
/*
* logging methods
@@ -2883,6 +3005,11 @@ static PyMethodDef PLy_methods[] = {
{"quote_nullable", PLy_quote_nullable, METH_VARARGS, NULL},
{"quote_ident", PLy_quote_ident, METH_VARARGS, NULL},
+ /*
+ * create the subtransaction context manager
+ */
+ {"subtransaction", PLy_subtransaction, METH_NOARGS, NULL},
+
{NULL, NULL, 0, NULL}
};
@@ -3553,6 +3680,150 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status)
return (PyObject *) result;
}
+/* s = plpy.subtransaction() */
+static PyObject *
+PLy_subtransaction(PyObject *self, PyObject *unused)
+{
+ return PLy_subtransaction_new();
+}
+
+/* Allocate and initialize a PLySubtransactionObject */
+static PyObject *
+PLy_subtransaction_new(void)
+{
+ PLySubtransactionObject *ob;
+
+ ob = PyObject_New(PLySubtransactionObject, &PLy_SubtransactionType);
+
+ if (ob == NULL)
+ return NULL;
+
+ ob->started = false;
+ ob->exited = false;
+
+ return (PyObject *) ob;
+}
+
+/* Python requires a dealloc function to be defined */
+static void
+PLy_subtransaction_dealloc(PyObject *subxact)
+{
+}
+
+/*
+ * subxact.__enter__() or subxact.enter()
+ *
+ * Start an explicit subtransaction. SPI calls within an explicit
+ * subtransaction will not start another one, so you can atomically
+ * execute many SPI calls and still get a controllable exception if
+ * one of them fails.
+ */
+static PyObject *
+PLy_subtransaction_enter(PyObject *self, PyObject *unused)
+{
+ PLySubtransactionData *subxactdata;
+ MemoryContext oldcontext;
+ PLySubtransactionObject *subxact = (PLySubtransactionObject *) self;
+
+ if (subxact->started)
+ {
+ PLy_exception_set(PyExc_ValueError, "this subtransaction has already been entered");
+ return NULL;
+ }
+
+ if (subxact->exited)
+ {
+ PLy_exception_set(PyExc_ValueError, "this subtransaction has already been exited");
+ return NULL;
+ }
+
+ subxact->started = true;
+ oldcontext = CurrentMemoryContext;
+
+ subxactdata = PLy_malloc(sizeof(*subxactdata));
+ subxactdata->oldcontext = oldcontext;
+ subxactdata->oldowner = CurrentResourceOwner;
+
+ BeginInternalSubTransaction(NULL);
+ /* Do not want to leave the previous memory context */
+ MemoryContextSwitchTo(oldcontext);
+
+ explicit_subtransactions = lcons(subxactdata, explicit_subtransactions);
+
+ Py_INCREF(self);
+ return self;
+}
+
+/*
+ * subxact.__exit__(exc_type, exc, tb) or subxact.exit(exc_type, exc, tb)
+ *
+ * Exit an explicit subtransaction. exc_type is an exception type, exc
+ * is the exception object, tb is the traceback. If exc_type is None,
+ * commit the subtransactiony, if not abort it.
+ *
+ * The method signature is chosen to allow subtransaction objects to
+ * be used as context managers as described in
+ * <http://www.python.org/dev/peps/pep-0343/>.
+ */
+static PyObject *
+PLy_subtransaction_exit(PyObject *self, PyObject *args)
+{
+ PyObject *type;
+ PyObject *value;
+ PyObject *traceback;
+ PLySubtransactionData *subxactdata;
+ PLySubtransactionObject *subxact = (PLySubtransactionObject *) self;
+
+ if (!PyArg_ParseTuple(args, "OOO", &type, &value, &traceback))
+ return NULL;
+
+ if (!subxact->started)
+ {
+ PLy_exception_set(PyExc_ValueError, "this subtransaction has not been entered");
+ return NULL;
+ }
+
+ if (subxact->exited)
+ {
+ PLy_exception_set(PyExc_ValueError, "this subtransaction has already been exited");
+ return NULL;
+ }
+
+ if (explicit_subtransactions == NIL)
+ {
+ PLy_exception_set(PyExc_ValueError, "there is no subtransaction to exit from");
+ return NULL;
+ }
+
+ subxact->exited = true;
+
+ if (type != Py_None)
+ {
+ /* Abort the inner transaction */
+ RollbackAndReleaseCurrentSubTransaction();
+ }
+ else
+ {
+ ReleaseCurrentSubTransaction();
+ }
+
+ subxactdata = (PLySubtransactionData *) linitial(explicit_subtransactions);
+ explicit_subtransactions = list_delete_first(explicit_subtransactions);
+
+ MemoryContextSwitchTo(subxactdata->oldcontext);
+ CurrentResourceOwner = subxactdata->oldowner;
+ PLy_free(subxactdata);
+
+ /*
+ * AtEOSubXact_SPI() should not have popped any SPI context, but
+ * just in case it did, make sure we remain connected.
+ */
+ SPI_restore_connection();
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
/*
* language handler and interpreter initialization
@@ -3653,6 +3924,8 @@ _PG_init(void)
PLy_trigger_cache = hash_create("PL/Python triggers", 32, &hash_ctl,
HASH_ELEM | HASH_FUNCTION);
+ explicit_subtransactions = NIL;
+
inited = true;
}
@@ -3688,6 +3961,8 @@ PLy_init_plpy(void)
elog(ERROR, "could not initialize PLy_PlanType");
if (PyType_Ready(&PLy_ResultType) < 0)
elog(ERROR, "could not initialize PLy_ResultType");
+ if (PyType_Ready(&PLy_SubtransactionType) < 0)
+ elog(ERROR, "could not initialize PLy_SubtransactionType");
#if PY_MAJOR_VERSION >= 3
plpy = PyModule_Create(&PLy_module);