diff options
Diffstat (limited to 'src/backend/commands/portalcmds.c')
-rw-r--r-- | src/backend/commands/portalcmds.c | 262 |
1 files changed, 237 insertions, 25 deletions
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index 1ba72437ad7..7eabc58d495 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.10 2003/03/11 19:40:22 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.11 2003/03/27 16:51:27 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -17,18 +17,23 @@ #include <limits.h> +#include "miscadmin.h" #include "commands/portalcmds.h" #include "executor/executor.h" #include "optimizer/planner.h" #include "rewrite/rewriteHandler.h" - +#include "utils/memutils.h" static long DoRelativeFetch(Portal portal, bool forward, long count, CommandDest dest); +static long DoRelativeStoreFetch(Portal portal, + bool forward, + long count, + CommandDest dest); static void DoPortalRewind(Portal portal); -static Portal PreparePortal(char *portalName); +static Portal PreparePortal(DeclareCursorStmt *stmt); /* @@ -46,8 +51,15 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest) char *cursorName; QueryDesc *queryDesc; - /* Check for invalid context (must be in transaction block) */ - RequireTransactionChain((void *) stmt, "DECLARE CURSOR"); + /* + * If this is a non-holdable cursor, we ensure that this statement + * has been executed inside a transaction block (or else, it would + * have no user-visible effect). + * + * XXX: surely there is a better way to check this? + */ + if (!(stmt->options & CURSOR_OPT_HOLD)) + RequireTransactionChain((void *) stmt, "DECLARE CURSOR"); /* * The query has been through parse analysis, but not rewriting or @@ -76,7 +88,7 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest) /* * Create a portal and copy the query and plan into its memory context. */ - portal = PreparePortal(stmt->portalname); + portal = PreparePortal(stmt); oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); query = copyObject(query); @@ -130,6 +142,7 @@ PerformPortalFetch(FetchStmt *stmt, portal = GetPortalByName(stmt->portalname); if (!PortalIsValid(portal)) { + /* FIXME: shouldn't this be an ERROR? */ elog(WARNING, "PerformPortalFetch: portal \"%s\" not found", stmt->portalname); return; @@ -343,6 +356,9 @@ DoRelativeFetch(Portal portal, ScanDirection direction; QueryDesc temp_queryDesc; + if (portal->holdStore) + return DoRelativeStoreFetch(portal, forward, count, dest); + queryDesc = PortalGetQueryDesc(portal); estate = queryDesc->estate; @@ -407,7 +423,7 @@ DoRelativeFetch(Portal portal, } else { - if (!portal->backwardOK) + if (portal->scrollType == DISABLE_SCROLL) elog(ERROR, "Cursor can only scan forward" "\n\tDeclare it with SCROLL option to enable backward scan"); @@ -453,16 +469,84 @@ DoRelativeFetch(Portal portal, } /* + * DoRelativeStoreFetch + * Do fetch for a simple N-rows-forward-or-backward case, getting + * the results from the portal's tuple store. + */ +static long +DoRelativeStoreFetch(Portal portal, + bool forward, + long count, + CommandDest dest) +{ + DestReceiver *destfunc; + QueryDesc *queryDesc = portal->queryDesc; + long rows_fetched = 0; + + if (!forward && portal->scrollType == DISABLE_SCROLL) + elog(ERROR, "Cursor can only scan forward" + "\n\tDeclare it with SCROLL option to enable backward scan"); + + destfunc = DestToFunction(dest); + (*destfunc->setup) (destfunc, queryDesc->operation, + portal->name, queryDesc->tupDesc); + + for (;;) + { + HeapTuple tup; + bool should_free; + + if (rows_fetched >= count) + break; + if (portal->atEnd && forward) + break; + if (portal->atStart && !forward) + break; + + tup = tuplestore_getheaptuple(portal->holdStore, forward, &should_free); + + if (tup == NULL) + { + if (forward) + portal->atEnd = true; + else + portal->atStart = true; + + break; + } + + (*destfunc->receiveTuple) (tup, queryDesc->tupDesc, destfunc); + + rows_fetched++; + if (forward) + portal->portalPos++; + else + portal->portalPos--; + + if (forward && portal->atStart) + portal->atStart = false; + if (!forward && portal->atEnd) + portal->atEnd = false; + + if (should_free) + pfree(tup); + } + + (*destfunc->cleanup) (destfunc); + + return rows_fetched; +} + +/* * DoPortalRewind - rewind a Portal to starting point */ static void DoPortalRewind(Portal portal) { - QueryDesc *queryDesc; - - queryDesc = PortalGetQueryDesc(portal); - - ExecutorRewind(queryDesc); + if (portal->holdStore) + tuplestore_rescan(portal->holdStore); + else + ExecutorRewind(PortalGetQueryDesc(portal)); portal->atStart = true; portal->atEnd = false; @@ -493,22 +577,25 @@ PerformPortalClose(char *name) /* * Note: PortalCleanup is called as a side-effect */ - PortalDrop(portal); + PortalDrop(portal, false); } - /* * PreparePortal + * Given a DECLARE CURSOR statement, returns the Portal data + * structure based on that statement that is used to manage the + * Portal internally. If a portal with specified name already + * exists, it is replaced. */ static Portal -PreparePortal(char *portalName) +PreparePortal(DeclareCursorStmt *stmt) { Portal portal; /* * Check for already-in-use portal name. */ - portal = GetPortalByName(portalName); + portal = GetPortalByName(stmt->portalname); if (PortalIsValid(portal)) { /* @@ -516,19 +603,30 @@ PreparePortal(char *portalName) * portal? */ elog(WARNING, "Closing pre-existing portal \"%s\"", - portalName); - PortalDrop(portal); + stmt->portalname); + PortalDrop(portal, false); } /* * Create the new portal. */ - portal = CreatePortal(portalName); + portal = CreatePortal(stmt->portalname); + + /* + * Modify the newly created portal based on the options specified in + * the DECLARE CURSOR statement. + */ + if (stmt->options & CURSOR_OPT_SCROLL) + portal->scrollType = ENABLE_SCROLL; + else if (stmt->options & CURSOR_OPT_NO_SCROLL) + portal->scrollType = DISABLE_SCROLL; + + if (stmt->options & CURSOR_OPT_HOLD) + portal->holdOpen = true; return portal; } - /* * PortalCleanup * @@ -545,14 +643,128 @@ PortalCleanup(Portal portal) AssertArg(PortalIsValid(portal)); AssertArg(portal->cleanup == PortalCleanup); + if (portal->holdStore) + tuplestore_end(portal->holdStore); + else + ExecutorEnd(PortalGetQueryDesc(portal)); + +} + +/* + * PersistHoldablePortal + * + * Prepare the specified Portal for access outside of the current + * transaction. When this function returns, all future accesses to the + * portal must be done via the Tuplestore (not by invoking the + * executor). + */ +void +PersistHoldablePortal(Portal portal) +{ + MemoryContext oldcxt; + QueryDesc *queryDesc = PortalGetQueryDesc(portal); + + /* + * If we're preserving a holdable portal, we had better be + * inside the transaction that originally created it. + */ + Assert(portal->createXact == GetCurrentTransactionId()); + Assert(portal->holdStore == NULL); + + /* + * This context is used to store portal data that needs to persist + * between transactions. + */ + oldcxt = MemoryContextSwitchTo(portal->holdContext); + + /* XXX: Should SortMem be used for this? */ + portal->holdStore = tuplestore_begin_heap(true, true, SortMem); + + /* Set the destination to output to the tuplestore */ + queryDesc->dest = Tuplestore; + + /* + * Rewind the executor: we need to store the entire result set in + * the tuplestore, so that subsequent backward FETCHs can be + * processed. + */ + ExecutorRewind(queryDesc); + + /* Fetch the result set into the tuplestore */ + ExecutorRun(queryDesc, ForwardScanDirection, 0); + + /* + * Reset the position in the result set: ideally, this could be + * implemented by just skipping straight to the tuple # that we need + * to be at, but the tuplestore API doesn't support that. So we + * start at the beginning of the tuplestore and iterate through it + * until we reach where we need to be. + */ + if (!portal->atEnd) + { + int store_pos = 0; + bool should_free; + + tuplestore_rescan(portal->holdStore); + + while (store_pos < portal->portalPos) + { + HeapTuple tmp = tuplestore_gettuple(portal->holdStore, + true, &should_free); + + if (tmp == NULL) + elog(ERROR, + "PersistHoldablePortal: unexpected end of tuple stream"); + + store_pos++; + + /* + * This could probably be optimized by creating and then + * deleting a separate memory context for this series of + * operations. + */ + if (should_free) + pfree(tmp); + } + } + /* - * tell the executor to shutdown the query + * The current Portal structure contains some data that will be + * needed by the holdable cursor, but it has been allocated in a + * memory context that is not sufficiently long-lived: we need to + * copy it into the portal's long-term memory context. */ - ExecutorEnd(PortalGetQueryDesc(portal)); + { + TupleDesc tupDescCopy; + QueryDesc *queryDescCopy; + + /* + * We need to use this order as ExecutorEnd invalidates the + * queryDesc's tuple descriptor + */ + tupDescCopy = CreateTupleDescCopy(queryDesc->tupDesc); + + ExecutorEnd(queryDesc); + + queryDescCopy = palloc(sizeof(*queryDescCopy)); + + /* + * This doesn't copy all the dependant data in the QueryDesc, + * but that's okay -- the only complex field we need to keep is + * the query's tupledesc, which we've copied ourselves. + */ + memcpy(queryDescCopy, queryDesc, sizeof(*queryDesc)); + + FreeQueryDesc(queryDesc); + + queryDescCopy->tupDesc = tupDescCopy; + portal->queryDesc = queryDescCopy; + } /* - * This should be unnecessary since the querydesc should be in the - * portal's memory context, but do it anyway for symmetry. + * We no longer need the portal's short-term memory context. */ - FreeQueryDesc(PortalGetQueryDesc(portal)); + MemoryContextDelete(PortalGetHeapMemory(portal)); + + PortalGetHeapMemory(portal) = NULL; } |