diff options
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/spi.c | 321 |
1 files changed, 312 insertions, 9 deletions
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 4aa8c475c30..d3698277427 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -3,12 +3,13 @@ * spi.c * Server Programming Interface * - * $Id: spi.c,v 1.53 2001/03/22 03:59:29 momjian Exp $ + * $Id: spi.c,v 1.54 2001/05/21 14:22:17 wieck Exp $ * *------------------------------------------------------------------------- */ #include "executor/spi_priv.h" #include "access/printtup.h" +#include "commands/command.h" uint32 SPI_processed = 0; Oid SPI_lastoid = InvalidOid; @@ -26,6 +27,9 @@ static int _SPI_pquery(QueryDesc *queryDesc, EState *state, int tcount); static int _SPI_execute_plan(_SPI_plan *plan, Datum *Values, char *Nulls, int tcount); +static void _SPI_cursor_operation(Portal portal, bool forward, int count, + CommandDest dest); + static _SPI_plan *_SPI_copy_plan(_SPI_plan *plan, int location); static int _SPI_begin_call(bool execmem); @@ -272,6 +276,18 @@ SPI_saveplan(void *plan) } +int +SPI_freeplan(void *plan) +{ + _SPI_plan *spiplan = (_SPI_plan *)plan; + + if (plan == NULL) + return SPI_ERROR_ARGUMENT; + + MemoryContextDelete(spiplan->plancxt); + return 0; +} + HeapTuple SPI_copytuple(HeapTuple tuple) { @@ -555,6 +571,181 @@ SPI_freetuple(HeapTuple tuple) heap_freetuple(tuple); } +void +SPI_freetuptable(SPITupleTable *tuptable) +{ + if (tuptable != NULL) + MemoryContextDelete(tuptable->tuptabcxt); +} + + + +/* + * SPI_cursor_open() + * + * Open a prepared SPI plan as a portal + */ +Portal +SPI_cursor_open(char *name, void *plan, Datum *Values, char *Nulls) +{ + static int unnamed_portal_count = 0; + + _SPI_plan *spiplan = (_SPI_plan *)plan; + List *qtlist = spiplan->qtlist; + List *ptlist = spiplan->ptlist; + Query *queryTree; + Plan *planTree; + QueryDesc *queryDesc; + EState *eState; + TupleDesc attinfo; + MemoryContext oldcontext; + Portal portal; + char portalname[64]; + int k; + + /* Ensure that the plan contains only one regular SELECT query */ + if (length(ptlist) != 1) + elog(ERROR, "cannot open multi-query plan as cursor"); + queryTree = (Query *)lfirst(qtlist); + planTree = (Plan *)lfirst(ptlist); + + if (queryTree->commandType != CMD_SELECT) + elog(ERROR, "plan in SPI_cursor_open() is not a SELECT"); + if (queryTree->isPortal) + elog(ERROR, "plan in SPI_cursor_open() must NOT be a DECLARE already"); + else if (queryTree->into != NULL) + elog(ERROR, "plan in SPI_cursor_open() must NOT be a SELECT INTO"); + + /* Reset SPI result */ + SPI_processed = 0; + SPI_tuptable = NULL; + _SPI_current->processed = 0; + _SPI_current->tuptable = NULL; + + /* Make up a portal name if none given */ + if (name == NULL) + { + for (;;) + { + unnamed_portal_count++; + if (unnamed_portal_count < 0) + unnamed_portal_count = 0; + sprintf(portalname, "<unnamed cursor %d>", unnamed_portal_count); + if (GetPortalByName(portalname) == NULL) + break; + } + + name = portalname; + } + + /* Ensure the portal doesn't exist already */ + portal = GetPortalByName(name); + if (portal != NULL) + elog(ERROR, "cursor \"%s\" already in use", name); + + /* Create the portal */ + portal = CreatePortal(name); + if (portal == NULL) + elog(ERROR, "failed to create portal \"%s\"", name); + + /* Switch to portals memory and copy the parsetree and plan to there */ + oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); + queryTree = copyObject(queryTree); + planTree = copyObject(planTree); + + /* Modify the parsetree to be a cursor */ + queryTree->isPortal = true; + queryTree->into = pstrdup(name); + queryTree->isBinary = false; + + /* Create the QueryDesc object and the executor state */ + queryDesc = CreateQueryDesc(queryTree, planTree, SPI); + eState = CreateExecutorState(); + + /* If the plan has parameters, put them into the executor state */ + if (spiplan->nargs > 0) + { + ParamListInfo paramLI = (ParamListInfo) palloc((spiplan->nargs + 1) * + sizeof(ParamListInfoData)); + eState->es_param_list_info = paramLI; + for (k = 0; k < spiplan->nargs; paramLI++, k++) + { + paramLI->kind = PARAM_NUM; + paramLI->id = k + 1; + paramLI->isnull = (Nulls && Nulls[k] == 'n'); + paramLI->value = Values[k]; + } + paramLI->kind = PARAM_INVALID; + } + else + eState->es_param_list_info = NULL; + + /* Start the executor */ + attinfo = ExecutorStart(queryDesc, eState); + + /* Put all the objects into the portal */ + PortalSetQuery(portal, queryDesc, attinfo, eState, PortalCleanup); + + /* Switch back to the callers memory context */ + MemoryContextSwitchTo(oldcontext); + + /* Return the created portal */ + return portal; +} + + +/* + * SPI_cursor_find() + * + * Find the portal of an existing open cursor + */ +Portal +SPI_cursor_find(char *name) +{ + return GetPortalByName(name); +} + + +/* + * SPI_cursor_fetch() + * + * Fetch rows in a cursor + */ +void +SPI_cursor_fetch(Portal portal, bool forward, int count) +{ + _SPI_cursor_operation(portal, forward, count, SPI); +} + + +/* + * SPI_cursor_move() + * + * Move in a cursor + */ +void +SPI_cursor_move(Portal portal, bool forward, int count) +{ + _SPI_cursor_operation(portal, forward, count, None); +} + + +/* + * SPI_cursor_close() + * + * Close a cursor + */ +void +SPI_cursor_close(Portal portal) +{ + Portal my_portal = portal; + + if (!PortalIsValid(my_portal)) + elog(ERROR, "invalid portal in SPI cursor operation"); + + PortalDrop(&my_portal); +} + /* =================== private functions =================== */ /* @@ -568,6 +759,7 @@ spi_printtup(HeapTuple tuple, TupleDesc tupdesc, DestReceiver *self) { SPITupleTable *tuptable; MemoryContext oldcxt; + MemoryContext tuptabcxt; /* * When called by Executor _SPI_curid expected to be equal to @@ -583,18 +775,31 @@ spi_printtup(HeapTuple tuple, TupleDesc tupdesc, DestReceiver *self) tuptable = _SPI_current->tuptable; if (tuptable == NULL) { + tuptabcxt = AllocSetContextCreate(CurrentMemoryContext, + "SPI TupTable", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContextSwitchTo(tuptabcxt); + _SPI_current->tuptable = tuptable = (SPITupleTable *) palloc(sizeof(SPITupleTable)); + tuptable->tuptabcxt = tuptabcxt; tuptable->alloced = tuptable->free = 128; tuptable->vals = (HeapTuple *) palloc(tuptable->alloced * sizeof(HeapTuple)); tuptable->tupdesc = CreateTupleDescCopy(tupdesc); } - else if (tuptable->free == 0) + else { - tuptable->free = 256; - tuptable->alloced += tuptable->free; - tuptable->vals = (HeapTuple *) repalloc(tuptable->vals, - tuptable->alloced * sizeof(HeapTuple)); + MemoryContextSwitchTo(tuptable->tuptabcxt); + + if (tuptable->free == 0) + { + tuptable->free = 256; + tuptable->alloced += tuptable->free; + tuptable->vals = (HeapTuple *) repalloc(tuptable->vals, + tuptable->alloced * sizeof(HeapTuple)); + } } tuptable->vals[tuptable->alloced - tuptable->free] = heap_copytuple(tuple); @@ -876,6 +1081,86 @@ _SPI_pquery(QueryDesc *queryDesc, EState *state, int tcount) } +/* + * _SPI_cursor_operation() + * + * Do a FETCH or MOVE in a cursor + */ +static void +_SPI_cursor_operation(Portal portal, bool forward, int count, + CommandDest dest) +{ + QueryDesc *querydesc; + EState *estate; + MemoryContext oldcontext; + CommandDest olddest; + + /* Check that the portal is valid */ + if (!PortalIsValid(portal)) + elog(ERROR, "invalid portal in SPI cursor operation"); + + /* Push the SPI stack */ + _SPI_begin_call(true); + + /* Reset the SPI result */ + SPI_processed = 0; + SPI_tuptable = NULL; + _SPI_current->processed = 0; + _SPI_current->tuptable = NULL; + + /* Switch to the portals memory context */ + oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); + querydesc = PortalGetQueryDesc(portal); + estate = PortalGetState(portal); + + /* Save the queries command destination and set it to SPI (for fetch) */ + /* or None (for move) */ + olddest = querydesc->dest; + querydesc->dest = dest; + + /* Run the executor like PerformPortalFetch and remember states */ + if (forward) + { + if (!portal->atEnd) + { + ExecutorRun(querydesc, estate, EXEC_FOR, (long)count); + _SPI_current->processed = estate->es_processed; + if (estate->es_processed > 0) + portal->atStart = false; + if (count <= 0 || (int) estate->es_processed < count) + portal->atEnd = true; + } + } + else + { + if (!portal->atStart) + { + ExecutorRun(querydesc, estate, EXEC_BACK, (long) count); + _SPI_current->processed = estate->es_processed; + if (estate->es_processed > 0) + portal->atEnd = false; + if (count <= 0 || estate->es_processed < count) + portal->atStart = true; + } + } + + /* Restore the old command destination and switch back to callers */ + /* memory context */ + querydesc->dest = olddest; + MemoryContextSwitchTo(oldcontext); + + if (dest == SPI && _SPI_checktuples()) + elog(FATAL, "SPI_fetch: # of processed tuples check failed"); + + /* Put the result into place for access by caller */ + SPI_processed = _SPI_current->processed; + SPI_tuptable = _SPI_current->tuptable; + + /* Pop the SPI stack */ + _SPI_end_call(true); +} + + static MemoryContext _SPI_execmem() { @@ -956,14 +1241,33 @@ static _SPI_plan * _SPI_copy_plan(_SPI_plan *plan, int location) { _SPI_plan *newplan; - MemoryContext oldcxt = NULL; + MemoryContext oldcxt; + MemoryContext plancxt; + MemoryContext parentcxt = CurrentMemoryContext; + /* Determine correct parent for the plans memory context */ if (location == _SPI_CPLAN_PROCXT) + parentcxt = _SPI_current->procCxt; + /* oldcxt = MemoryContextSwitchTo(_SPI_current->procCxt); + */ else if (location == _SPI_CPLAN_TOPCXT) + parentcxt = TopMemoryContext; + /* oldcxt = MemoryContextSwitchTo(TopMemoryContext); + */ + /* Create a memory context for the plan */ + plancxt = AllocSetContextCreate(parentcxt, + "SPI Plan", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + oldcxt = MemoryContextSwitchTo(plancxt); + + /* Copy the SPI plan into it's own context */ newplan = (_SPI_plan *) palloc(sizeof(_SPI_plan)); + newplan->plancxt = plancxt; newplan->qtlist = (List *) copyObject(plan->qtlist); newplan->ptlist = (List *) copyObject(plan->ptlist); newplan->nargs = plan->nargs; @@ -975,8 +1279,7 @@ _SPI_copy_plan(_SPI_plan *plan, int location) else newplan->argtypes = NULL; - if (oldcxt != NULL) - MemoryContextSwitchTo(oldcxt); + MemoryContextSwitchTo(oldcxt); return newplan; } |