aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/portalcmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/portalcmds.c')
-rw-r--r--src/backend/commands/portalcmds.c262
1 files changed, 237 insertions, 25 deletions
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 1ba72437ad7..7eabc58d495 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.10 2003/03/11 19:40:22 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.11 2003/03/27 16:51:27 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -17,18 +17,23 @@
#include <limits.h>
+#include "miscadmin.h"
#include "commands/portalcmds.h"
#include "executor/executor.h"
#include "optimizer/planner.h"
#include "rewrite/rewriteHandler.h"
-
+#include "utils/memutils.h"
static long DoRelativeFetch(Portal portal,
bool forward,
long count,
CommandDest dest);
+static long DoRelativeStoreFetch(Portal portal,
+ bool forward,
+ long count,
+ CommandDest dest);
static void DoPortalRewind(Portal portal);
-static Portal PreparePortal(char *portalName);
+static Portal PreparePortal(DeclareCursorStmt *stmt);
/*
@@ -46,8 +51,15 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest)
char *cursorName;
QueryDesc *queryDesc;
- /* Check for invalid context (must be in transaction block) */
- RequireTransactionChain((void *) stmt, "DECLARE CURSOR");
+ /*
+ * If this is a non-holdable cursor, we ensure that this statement
+ * has been executed inside a transaction block (or else, it would
+ * have no user-visible effect).
+ *
+ * XXX: surely there is a better way to check this?
+ */
+ if (!(stmt->options & CURSOR_OPT_HOLD))
+ RequireTransactionChain((void *) stmt, "DECLARE CURSOR");
/*
* The query has been through parse analysis, but not rewriting or
@@ -76,7 +88,7 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest)
/*
* Create a portal and copy the query and plan into its memory context.
*/
- portal = PreparePortal(stmt->portalname);
+ portal = PreparePortal(stmt);
oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
query = copyObject(query);
@@ -130,6 +142,7 @@ PerformPortalFetch(FetchStmt *stmt,
portal = GetPortalByName(stmt->portalname);
if (!PortalIsValid(portal))
{
+ /* FIXME: shouldn't this be an ERROR? */
elog(WARNING, "PerformPortalFetch: portal \"%s\" not found",
stmt->portalname);
return;
@@ -343,6 +356,9 @@ DoRelativeFetch(Portal portal,
ScanDirection direction;
QueryDesc temp_queryDesc;
+ if (portal->holdStore)
+ return DoRelativeStoreFetch(portal, forward, count, dest);
+
queryDesc = PortalGetQueryDesc(portal);
estate = queryDesc->estate;
@@ -407,7 +423,7 @@ DoRelativeFetch(Portal portal,
}
else
{
- if (!portal->backwardOK)
+ if (portal->scrollType == DISABLE_SCROLL)
elog(ERROR, "Cursor can only scan forward"
"\n\tDeclare it with SCROLL option to enable backward scan");
@@ -453,16 +469,84 @@ DoRelativeFetch(Portal portal,
}
/*
+ * DoRelativeStoreFetch
+ * Do fetch for a simple N-rows-forward-or-backward case, getting
+ * the results from the portal's tuple store.
+ */
+static long
+DoRelativeStoreFetch(Portal portal,
+ bool forward,
+ long count,
+ CommandDest dest)
+{
+ DestReceiver *destfunc;
+ QueryDesc *queryDesc = portal->queryDesc;
+ long rows_fetched = 0;
+
+ if (!forward && portal->scrollType == DISABLE_SCROLL)
+ elog(ERROR, "Cursor can only scan forward"
+ "\n\tDeclare it with SCROLL option to enable backward scan");
+
+ destfunc = DestToFunction(dest);
+ (*destfunc->setup) (destfunc, queryDesc->operation,
+ portal->name, queryDesc->tupDesc);
+
+ for (;;)
+ {
+ HeapTuple tup;
+ bool should_free;
+
+ if (rows_fetched >= count)
+ break;
+ if (portal->atEnd && forward)
+ break;
+ if (portal->atStart && !forward)
+ break;
+
+ tup = tuplestore_getheaptuple(portal->holdStore, forward, &should_free);
+
+ if (tup == NULL)
+ {
+ if (forward)
+ portal->atEnd = true;
+ else
+ portal->atStart = true;
+
+ break;
+ }
+
+ (*destfunc->receiveTuple) (tup, queryDesc->tupDesc, destfunc);
+
+ rows_fetched++;
+ if (forward)
+ portal->portalPos++;
+ else
+ portal->portalPos--;
+
+ if (forward && portal->atStart)
+ portal->atStart = false;
+ if (!forward && portal->atEnd)
+ portal->atEnd = false;
+
+ if (should_free)
+ pfree(tup);
+ }
+
+ (*destfunc->cleanup) (destfunc);
+
+ return rows_fetched;
+}
+
+/*
* DoPortalRewind - rewind a Portal to starting point
*/
static void
DoPortalRewind(Portal portal)
{
- QueryDesc *queryDesc;
-
- queryDesc = PortalGetQueryDesc(portal);
-
- ExecutorRewind(queryDesc);
+ if (portal->holdStore)
+ tuplestore_rescan(portal->holdStore);
+ else
+ ExecutorRewind(PortalGetQueryDesc(portal));
portal->atStart = true;
portal->atEnd = false;
@@ -493,22 +577,25 @@ PerformPortalClose(char *name)
/*
* Note: PortalCleanup is called as a side-effect
*/
- PortalDrop(portal);
+ PortalDrop(portal, false);
}
-
/*
* PreparePortal
+ * Given a DECLARE CURSOR statement, returns the Portal data
+ * structure based on that statement that is used to manage the
+ * Portal internally. If a portal with specified name already
+ * exists, it is replaced.
*/
static Portal
-PreparePortal(char *portalName)
+PreparePortal(DeclareCursorStmt *stmt)
{
Portal portal;
/*
* Check for already-in-use portal name.
*/
- portal = GetPortalByName(portalName);
+ portal = GetPortalByName(stmt->portalname);
if (PortalIsValid(portal))
{
/*
@@ -516,19 +603,30 @@ PreparePortal(char *portalName)
* portal?
*/
elog(WARNING, "Closing pre-existing portal \"%s\"",
- portalName);
- PortalDrop(portal);
+ stmt->portalname);
+ PortalDrop(portal, false);
}
/*
* Create the new portal.
*/
- portal = CreatePortal(portalName);
+ portal = CreatePortal(stmt->portalname);
+
+ /*
+ * Modify the newly created portal based on the options specified in
+ * the DECLARE CURSOR statement.
+ */
+ if (stmt->options & CURSOR_OPT_SCROLL)
+ portal->scrollType = ENABLE_SCROLL;
+ else if (stmt->options & CURSOR_OPT_NO_SCROLL)
+ portal->scrollType = DISABLE_SCROLL;
+
+ if (stmt->options & CURSOR_OPT_HOLD)
+ portal->holdOpen = true;
return portal;
}
-
/*
* PortalCleanup
*
@@ -545,14 +643,128 @@ PortalCleanup(Portal portal)
AssertArg(PortalIsValid(portal));
AssertArg(portal->cleanup == PortalCleanup);
+ if (portal->holdStore)
+ tuplestore_end(portal->holdStore);
+ else
+ ExecutorEnd(PortalGetQueryDesc(portal));
+
+}
+
+/*
+ * PersistHoldablePortal
+ *
+ * Prepare the specified Portal for access outside of the current
+ * transaction. When this function returns, all future accesses to the
+ * portal must be done via the Tuplestore (not by invoking the
+ * executor).
+ */
+void
+PersistHoldablePortal(Portal portal)
+{
+ MemoryContext oldcxt;
+ QueryDesc *queryDesc = PortalGetQueryDesc(portal);
+
+ /*
+ * If we're preserving a holdable portal, we had better be
+ * inside the transaction that originally created it.
+ */
+ Assert(portal->createXact == GetCurrentTransactionId());
+ Assert(portal->holdStore == NULL);
+
+ /*
+ * This context is used to store portal data that needs to persist
+ * between transactions.
+ */
+ oldcxt = MemoryContextSwitchTo(portal->holdContext);
+
+ /* XXX: Should SortMem be used for this? */
+ portal->holdStore = tuplestore_begin_heap(true, true, SortMem);
+
+ /* Set the destination to output to the tuplestore */
+ queryDesc->dest = Tuplestore;
+
+ /*
+ * Rewind the executor: we need to store the entire result set in
+ * the tuplestore, so that subsequent backward FETCHs can be
+ * processed.
+ */
+ ExecutorRewind(queryDesc);
+
+ /* Fetch the result set into the tuplestore */
+ ExecutorRun(queryDesc, ForwardScanDirection, 0);
+
+ /*
+ * Reset the position in the result set: ideally, this could be
+ * implemented by just skipping straight to the tuple # that we need
+ * to be at, but the tuplestore API doesn't support that. So we
+ * start at the beginning of the tuplestore and iterate through it
+ * until we reach where we need to be.
+ */
+ if (!portal->atEnd)
+ {
+ int store_pos = 0;
+ bool should_free;
+
+ tuplestore_rescan(portal->holdStore);
+
+ while (store_pos < portal->portalPos)
+ {
+ HeapTuple tmp = tuplestore_gettuple(portal->holdStore,
+ true, &should_free);
+
+ if (tmp == NULL)
+ elog(ERROR,
+ "PersistHoldablePortal: unexpected end of tuple stream");
+
+ store_pos++;
+
+ /*
+ * This could probably be optimized by creating and then
+ * deleting a separate memory context for this series of
+ * operations.
+ */
+ if (should_free)
+ pfree(tmp);
+ }
+ }
+
/*
- * tell the executor to shutdown the query
+ * The current Portal structure contains some data that will be
+ * needed by the holdable cursor, but it has been allocated in a
+ * memory context that is not sufficiently long-lived: we need to
+ * copy it into the portal's long-term memory context.
*/
- ExecutorEnd(PortalGetQueryDesc(portal));
+ {
+ TupleDesc tupDescCopy;
+ QueryDesc *queryDescCopy;
+
+ /*
+ * We need to use this order as ExecutorEnd invalidates the
+ * queryDesc's tuple descriptor
+ */
+ tupDescCopy = CreateTupleDescCopy(queryDesc->tupDesc);
+
+ ExecutorEnd(queryDesc);
+
+ queryDescCopy = palloc(sizeof(*queryDescCopy));
+
+ /*
+ * This doesn't copy all the dependant data in the QueryDesc,
+ * but that's okay -- the only complex field we need to keep is
+ * the query's tupledesc, which we've copied ourselves.
+ */
+ memcpy(queryDescCopy, queryDesc, sizeof(*queryDesc));
+
+ FreeQueryDesc(queryDesc);
+
+ queryDescCopy->tupDesc = tupDescCopy;
+ portal->queryDesc = queryDescCopy;
+ }
/*
- * This should be unnecessary since the querydesc should be in the
- * portal's memory context, but do it anyway for symmetry.
+ * We no longer need the portal's short-term memory context.
*/
- FreeQueryDesc(PortalGetQueryDesc(portal));
+ MemoryContextDelete(PortalGetHeapMemory(portal));
+
+ PortalGetHeapMemory(portal) = NULL;
}