aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor
diff options
context:
space:
mode:
authorJan Wieck <JanWieck@Yahoo.com>2001-05-21 14:22:19 +0000
committerJan Wieck <JanWieck@Yahoo.com>2001-05-21 14:22:19 +0000
commitd27f363e3fceb7612997ae89a8fc84de6754a213 (patch)
tree844989e8d2c1b8f4ab444a5d439eab50142abbf3 /src/backend/executor
parentbe03eb25f34c9c95c400504ef76c8abe0081d09f (diff)
downloadpostgresql-d27f363e3fceb7612997ae89a8fc84de6754a213.tar.gz
postgresql-d27f363e3fceb7612997ae89a8fc84de6754a213.zip
Enhancement of SPI to get access to portals
- New functions to create a portal using a prepared/saved SPI plan or lookup an existing portal by name. - Functions to fetch/move from/in portals. Results are placed in the usual SPI_processed and SPI_tuptable, so the entire set of utility functions can be used to gain attribute access. - Prepared/saved SPI plans now use their own memory context and SPI_freeplan(plan) can remove them. - Tuple result sets (SPI_tuptable) now uses it's own memory context and can be free'd by SPI_freetuptable(tuptab). Enhancement of PL/pgSQL - Uses generic named portals internally in FOR ... SELECT loops to avoid running out of memory on huge result sets. - Support for CURSOR and REFCURSOR syntax using the new SPI functionality. Cursors used internally only need no explicit transaction block. Refcursor variables can be used inside of explicit transaction block to pass cursors between main application and functions. Jan
Diffstat (limited to 'src/backend/executor')
-rw-r--r--src/backend/executor/spi.c321
1 files changed, 312 insertions, 9 deletions
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 4aa8c475c30..d3698277427 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -3,12 +3,13 @@
* spi.c
* Server Programming Interface
*
- * $Id: spi.c,v 1.53 2001/03/22 03:59:29 momjian Exp $
+ * $Id: spi.c,v 1.54 2001/05/21 14:22:17 wieck Exp $
*
*-------------------------------------------------------------------------
*/
#include "executor/spi_priv.h"
#include "access/printtup.h"
+#include "commands/command.h"
uint32 SPI_processed = 0;
Oid SPI_lastoid = InvalidOid;
@@ -26,6 +27,9 @@ static int _SPI_pquery(QueryDesc *queryDesc, EState *state, int tcount);
static int _SPI_execute_plan(_SPI_plan *plan,
Datum *Values, char *Nulls, int tcount);
+static void _SPI_cursor_operation(Portal portal, bool forward, int count,
+ CommandDest dest);
+
static _SPI_plan *_SPI_copy_plan(_SPI_plan *plan, int location);
static int _SPI_begin_call(bool execmem);
@@ -272,6 +276,18 @@ SPI_saveplan(void *plan)
}
+int
+SPI_freeplan(void *plan)
+{
+ _SPI_plan *spiplan = (_SPI_plan *)plan;
+
+ if (plan == NULL)
+ return SPI_ERROR_ARGUMENT;
+
+ MemoryContextDelete(spiplan->plancxt);
+ return 0;
+}
+
HeapTuple
SPI_copytuple(HeapTuple tuple)
{
@@ -555,6 +571,181 @@ SPI_freetuple(HeapTuple tuple)
heap_freetuple(tuple);
}
+void
+SPI_freetuptable(SPITupleTable *tuptable)
+{
+ if (tuptable != NULL)
+ MemoryContextDelete(tuptable->tuptabcxt);
+}
+
+
+
+/*
+ * SPI_cursor_open()
+ *
+ * Open a prepared SPI plan as a portal
+ */
+Portal
+SPI_cursor_open(char *name, void *plan, Datum *Values, char *Nulls)
+{
+ static int unnamed_portal_count = 0;
+
+ _SPI_plan *spiplan = (_SPI_plan *)plan;
+ List *qtlist = spiplan->qtlist;
+ List *ptlist = spiplan->ptlist;
+ Query *queryTree;
+ Plan *planTree;
+ QueryDesc *queryDesc;
+ EState *eState;
+ TupleDesc attinfo;
+ MemoryContext oldcontext;
+ Portal portal;
+ char portalname[64];
+ int k;
+
+ /* Ensure that the plan contains only one regular SELECT query */
+ if (length(ptlist) != 1)
+ elog(ERROR, "cannot open multi-query plan as cursor");
+ queryTree = (Query *)lfirst(qtlist);
+ planTree = (Plan *)lfirst(ptlist);
+
+ if (queryTree->commandType != CMD_SELECT)
+ elog(ERROR, "plan in SPI_cursor_open() is not a SELECT");
+ if (queryTree->isPortal)
+ elog(ERROR, "plan in SPI_cursor_open() must NOT be a DECLARE already");
+ else if (queryTree->into != NULL)
+ elog(ERROR, "plan in SPI_cursor_open() must NOT be a SELECT INTO");
+
+ /* Reset SPI result */
+ SPI_processed = 0;
+ SPI_tuptable = NULL;
+ _SPI_current->processed = 0;
+ _SPI_current->tuptable = NULL;
+
+ /* Make up a portal name if none given */
+ if (name == NULL)
+ {
+ for (;;)
+ {
+ unnamed_portal_count++;
+ if (unnamed_portal_count < 0)
+ unnamed_portal_count = 0;
+ sprintf(portalname, "<unnamed cursor %d>", unnamed_portal_count);
+ if (GetPortalByName(portalname) == NULL)
+ break;
+ }
+
+ name = portalname;
+ }
+
+ /* Ensure the portal doesn't exist already */
+ portal = GetPortalByName(name);
+ if (portal != NULL)
+ elog(ERROR, "cursor \"%s\" already in use", name);
+
+ /* Create the portal */
+ portal = CreatePortal(name);
+ if (portal == NULL)
+ elog(ERROR, "failed to create portal \"%s\"", name);
+
+ /* Switch to portals memory and copy the parsetree and plan to there */
+ oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
+ queryTree = copyObject(queryTree);
+ planTree = copyObject(planTree);
+
+ /* Modify the parsetree to be a cursor */
+ queryTree->isPortal = true;
+ queryTree->into = pstrdup(name);
+ queryTree->isBinary = false;
+
+ /* Create the QueryDesc object and the executor state */
+ queryDesc = CreateQueryDesc(queryTree, planTree, SPI);
+ eState = CreateExecutorState();
+
+ /* If the plan has parameters, put them into the executor state */
+ if (spiplan->nargs > 0)
+ {
+ ParamListInfo paramLI = (ParamListInfo) palloc((spiplan->nargs + 1) *
+ sizeof(ParamListInfoData));
+ eState->es_param_list_info = paramLI;
+ for (k = 0; k < spiplan->nargs; paramLI++, k++)
+ {
+ paramLI->kind = PARAM_NUM;
+ paramLI->id = k + 1;
+ paramLI->isnull = (Nulls && Nulls[k] == 'n');
+ paramLI->value = Values[k];
+ }
+ paramLI->kind = PARAM_INVALID;
+ }
+ else
+ eState->es_param_list_info = NULL;
+
+ /* Start the executor */
+ attinfo = ExecutorStart(queryDesc, eState);
+
+ /* Put all the objects into the portal */
+ PortalSetQuery(portal, queryDesc, attinfo, eState, PortalCleanup);
+
+ /* Switch back to the callers memory context */
+ MemoryContextSwitchTo(oldcontext);
+
+ /* Return the created portal */
+ return portal;
+}
+
+
+/*
+ * SPI_cursor_find()
+ *
+ * Find the portal of an existing open cursor
+ */
+Portal
+SPI_cursor_find(char *name)
+{
+ return GetPortalByName(name);
+}
+
+
+/*
+ * SPI_cursor_fetch()
+ *
+ * Fetch rows in a cursor
+ */
+void
+SPI_cursor_fetch(Portal portal, bool forward, int count)
+{
+ _SPI_cursor_operation(portal, forward, count, SPI);
+}
+
+
+/*
+ * SPI_cursor_move()
+ *
+ * Move in a cursor
+ */
+void
+SPI_cursor_move(Portal portal, bool forward, int count)
+{
+ _SPI_cursor_operation(portal, forward, count, None);
+}
+
+
+/*
+ * SPI_cursor_close()
+ *
+ * Close a cursor
+ */
+void
+SPI_cursor_close(Portal portal)
+{
+ Portal my_portal = portal;
+
+ if (!PortalIsValid(my_portal))
+ elog(ERROR, "invalid portal in SPI cursor operation");
+
+ PortalDrop(&my_portal);
+}
+
/* =================== private functions =================== */
/*
@@ -568,6 +759,7 @@ spi_printtup(HeapTuple tuple, TupleDesc tupdesc, DestReceiver *self)
{
SPITupleTable *tuptable;
MemoryContext oldcxt;
+ MemoryContext tuptabcxt;
/*
* When called by Executor _SPI_curid expected to be equal to
@@ -583,18 +775,31 @@ spi_printtup(HeapTuple tuple, TupleDesc tupdesc, DestReceiver *self)
tuptable = _SPI_current->tuptable;
if (tuptable == NULL)
{
+ tuptabcxt = AllocSetContextCreate(CurrentMemoryContext,
+ "SPI TupTable",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ MemoryContextSwitchTo(tuptabcxt);
+
_SPI_current->tuptable = tuptable = (SPITupleTable *)
palloc(sizeof(SPITupleTable));
+ tuptable->tuptabcxt = tuptabcxt;
tuptable->alloced = tuptable->free = 128;
tuptable->vals = (HeapTuple *) palloc(tuptable->alloced * sizeof(HeapTuple));
tuptable->tupdesc = CreateTupleDescCopy(tupdesc);
}
- else if (tuptable->free == 0)
+ else
{
- tuptable->free = 256;
- tuptable->alloced += tuptable->free;
- tuptable->vals = (HeapTuple *) repalloc(tuptable->vals,
- tuptable->alloced * sizeof(HeapTuple));
+ MemoryContextSwitchTo(tuptable->tuptabcxt);
+
+ if (tuptable->free == 0)
+ {
+ tuptable->free = 256;
+ tuptable->alloced += tuptable->free;
+ tuptable->vals = (HeapTuple *) repalloc(tuptable->vals,
+ tuptable->alloced * sizeof(HeapTuple));
+ }
}
tuptable->vals[tuptable->alloced - tuptable->free] = heap_copytuple(tuple);
@@ -876,6 +1081,86 @@ _SPI_pquery(QueryDesc *queryDesc, EState *state, int tcount)
}
+/*
+ * _SPI_cursor_operation()
+ *
+ * Do a FETCH or MOVE in a cursor
+ */
+static void
+_SPI_cursor_operation(Portal portal, bool forward, int count,
+ CommandDest dest)
+{
+ QueryDesc *querydesc;
+ EState *estate;
+ MemoryContext oldcontext;
+ CommandDest olddest;
+
+ /* Check that the portal is valid */
+ if (!PortalIsValid(portal))
+ elog(ERROR, "invalid portal in SPI cursor operation");
+
+ /* Push the SPI stack */
+ _SPI_begin_call(true);
+
+ /* Reset the SPI result */
+ SPI_processed = 0;
+ SPI_tuptable = NULL;
+ _SPI_current->processed = 0;
+ _SPI_current->tuptable = NULL;
+
+ /* Switch to the portals memory context */
+ oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
+ querydesc = PortalGetQueryDesc(portal);
+ estate = PortalGetState(portal);
+
+ /* Save the queries command destination and set it to SPI (for fetch) */
+ /* or None (for move) */
+ olddest = querydesc->dest;
+ querydesc->dest = dest;
+
+ /* Run the executor like PerformPortalFetch and remember states */
+ if (forward)
+ {
+ if (!portal->atEnd)
+ {
+ ExecutorRun(querydesc, estate, EXEC_FOR, (long)count);
+ _SPI_current->processed = estate->es_processed;
+ if (estate->es_processed > 0)
+ portal->atStart = false;
+ if (count <= 0 || (int) estate->es_processed < count)
+ portal->atEnd = true;
+ }
+ }
+ else
+ {
+ if (!portal->atStart)
+ {
+ ExecutorRun(querydesc, estate, EXEC_BACK, (long) count);
+ _SPI_current->processed = estate->es_processed;
+ if (estate->es_processed > 0)
+ portal->atEnd = false;
+ if (count <= 0 || estate->es_processed < count)
+ portal->atStart = true;
+ }
+ }
+
+ /* Restore the old command destination and switch back to callers */
+ /* memory context */
+ querydesc->dest = olddest;
+ MemoryContextSwitchTo(oldcontext);
+
+ if (dest == SPI && _SPI_checktuples())
+ elog(FATAL, "SPI_fetch: # of processed tuples check failed");
+
+ /* Put the result into place for access by caller */
+ SPI_processed = _SPI_current->processed;
+ SPI_tuptable = _SPI_current->tuptable;
+
+ /* Pop the SPI stack */
+ _SPI_end_call(true);
+}
+
+
static MemoryContext
_SPI_execmem()
{
@@ -956,14 +1241,33 @@ static _SPI_plan *
_SPI_copy_plan(_SPI_plan *plan, int location)
{
_SPI_plan *newplan;
- MemoryContext oldcxt = NULL;
+ MemoryContext oldcxt;
+ MemoryContext plancxt;
+ MemoryContext parentcxt = CurrentMemoryContext;
+ /* Determine correct parent for the plans memory context */
if (location == _SPI_CPLAN_PROCXT)
+ parentcxt = _SPI_current->procCxt;
+ /*
oldcxt = MemoryContextSwitchTo(_SPI_current->procCxt);
+ */
else if (location == _SPI_CPLAN_TOPCXT)
+ parentcxt = TopMemoryContext;
+ /*
oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+ */
+ /* Create a memory context for the plan */
+ plancxt = AllocSetContextCreate(parentcxt,
+ "SPI Plan",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcxt = MemoryContextSwitchTo(plancxt);
+
+ /* Copy the SPI plan into it's own context */
newplan = (_SPI_plan *) palloc(sizeof(_SPI_plan));
+ newplan->plancxt = plancxt;
newplan->qtlist = (List *) copyObject(plan->qtlist);
newplan->ptlist = (List *) copyObject(plan->ptlist);
newplan->nargs = plan->nargs;
@@ -975,8 +1279,7 @@ _SPI_copy_plan(_SPI_plan *plan, int location)
else
newplan->argtypes = NULL;
- if (oldcxt != NULL)
- MemoryContextSwitchTo(oldcxt);
+ MemoryContextSwitchTo(oldcxt);
return newplan;
}