aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/executor/execAmi.c13
-rw-r--r--src/backend/executor/nodeFunctionscan.c41
-rw-r--r--src/backend/executor/nodeMaterial.c33
-rw-r--r--src/backend/utils/sort/tuplestore.c506
-rw-r--r--src/include/executor/nodeFunctionscan.h4
-rw-r--r--src/include/nodes/execnodes.h4
-rw-r--r--src/include/utils/tuplestore.h17
7 files changed, 382 insertions, 236 deletions
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 1381a4a4f09..afddf4d3920 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/executor/execAmi.c,v 1.97 2008/08/05 21:28:29 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/executor/execAmi.c,v 1.98 2008/10/01 19:51:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -239,10 +239,6 @@ ExecMarkPos(PlanState *node)
ExecTidMarkPos((TidScanState *) node);
break;
- case T_FunctionScanState:
- ExecFunctionMarkPos((FunctionScanState *) node);
- break;
-
case T_ValuesScanState:
ExecValuesMarkPos((ValuesScanState *) node);
break;
@@ -296,10 +292,6 @@ ExecRestrPos(PlanState *node)
ExecTidRestrPos((TidScanState *) node);
break;
- case T_FunctionScanState:
- ExecFunctionRestrPos((FunctionScanState *) node);
- break;
-
case T_ValuesScanState:
ExecValuesRestrPos((ValuesScanState *) node);
break;
@@ -332,7 +324,7 @@ ExecRestrPos(PlanState *node)
* (However, since the only present use of mark/restore is in mergejoin,
* there is no need to support mark/restore in any plan type that is not
* capable of generating ordered output. So the seqscan, tidscan,
- * functionscan, and valuesscan support is actually useless code at present.)
+ * and valuesscan support is actually useless code at present.)
*/
bool
ExecSupportsMarkRestore(NodeTag plantype)
@@ -342,7 +334,6 @@ ExecSupportsMarkRestore(NodeTag plantype)
case T_SeqScan:
case T_IndexScan:
case T_TidScan:
- case T_FunctionScan:
case T_ValuesScan:
case T_Material:
case T_Sort:
diff --git a/src/backend/executor/nodeFunctionscan.c b/src/backend/executor/nodeFunctionscan.c
index 6bbb5b139b6..6113a2c9067 100644
--- a/src/backend/executor/nodeFunctionscan.c
+++ b/src/backend/executor/nodeFunctionscan.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/executor/nodeFunctionscan.c,v 1.46 2008/02/29 02:49:39 neilc Exp $
+ * $PostgreSQL: pgsql/src/backend/executor/nodeFunctionscan.c,v 1.47 2008/10/01 19:51:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -131,6 +131,9 @@ ExecInitFunctionScan(FunctionScan *node, EState *estate, int eflags)
TypeFuncClass functypclass;
TupleDesc tupdesc = NULL;
+ /* check for unsupported flags */
+ Assert(!(eflags & EXEC_FLAG_MARK));
+
/*
* FunctionScan should not have any children.
*/
@@ -274,42 +277,6 @@ ExecEndFunctionScan(FunctionScanState *node)
}
/* ----------------------------------------------------------------
- * ExecFunctionMarkPos
- *
- * Calls tuplestore to save the current position in the stored file.
- * ----------------------------------------------------------------
- */
-void
-ExecFunctionMarkPos(FunctionScanState *node)
-{
- /*
- * if we haven't materialized yet, just return.
- */
- if (!node->tuplestorestate)
- return;
-
- tuplestore_markpos(node->tuplestorestate);
-}
-
-/* ----------------------------------------------------------------
- * ExecFunctionRestrPos
- *
- * Calls tuplestore to restore the last saved file position.
- * ----------------------------------------------------------------
- */
-void
-ExecFunctionRestrPos(FunctionScanState *node)
-{
- /*
- * if we haven't materialized yet, just return.
- */
- if (!node->tuplestorestate)
- return;
-
- tuplestore_restorepos(node->tuplestorestate);
-}
-
-/* ----------------------------------------------------------------
* ExecFunctionReScan
*
* Rescans the relation.
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index 3c096356a37..494560d0f46 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/executor/nodeMaterial.c,v 1.62 2008/03/23 00:54:04 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/executor/nodeMaterial.c,v 1.63 2008/10/01 19:51:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -51,7 +51,7 @@ ExecMaterial(MaterialState *node)
estate = node->ss.ps.state;
dir = estate->es_direction;
forward = ScanDirectionIsForward(dir);
- tuplestorestate = (Tuplestorestate *) node->tuplestorestate;
+ tuplestorestate = node->tuplestorestate;
/*
* If first time through, and we need a tuplestore, initialize it.
@@ -60,7 +60,19 @@ ExecMaterial(MaterialState *node)
{
tuplestorestate = tuplestore_begin_heap(true, false, work_mem);
tuplestore_set_eflags(tuplestorestate, node->eflags);
- node->tuplestorestate = (void *) tuplestorestate;
+ if (node->eflags & EXEC_FLAG_MARK)
+ {
+ /*
+ * Allocate a second read pointer to serve as the mark.
+ * We know it must have index 1, so needn't store that.
+ */
+ int ptrn;
+
+ ptrn = tuplestore_alloc_read_pointer(tuplestorestate,
+ node->eflags);
+ Assert(ptrn == 1);
+ }
+ node->tuplestorestate = tuplestorestate;
}
/*
@@ -236,7 +248,7 @@ ExecEndMaterial(MaterialState *node)
* Release tuplestore resources
*/
if (node->tuplestorestate != NULL)
- tuplestore_end((Tuplestorestate *) node->tuplestorestate);
+ tuplestore_end(node->tuplestorestate);
node->tuplestorestate = NULL;
/*
@@ -262,7 +274,10 @@ ExecMaterialMarkPos(MaterialState *node)
if (!node->tuplestorestate)
return;
- tuplestore_markpos((Tuplestorestate *) node->tuplestorestate);
+ /*
+ * copy the active read pointer to the mark.
+ */
+ tuplestore_copy_read_pointer(node->tuplestorestate, 0, 1);
}
/* ----------------------------------------------------------------
@@ -283,9 +298,9 @@ ExecMaterialRestrPos(MaterialState *node)
return;
/*
- * restore the scan to the previously marked position
+ * copy the mark to the active read pointer.
*/
- tuplestore_restorepos((Tuplestorestate *) node->tuplestorestate);
+ tuplestore_copy_read_pointer(node->tuplestorestate, 1, 0);
}
/* ----------------------------------------------------------------
@@ -322,14 +337,14 @@ ExecMaterialReScan(MaterialState *node, ExprContext *exprCtxt)
if (((PlanState *) node)->lefttree->chgParam != NULL ||
(node->eflags & EXEC_FLAG_REWIND) == 0)
{
- tuplestore_end((Tuplestorestate *) node->tuplestorestate);
+ tuplestore_end(node->tuplestorestate);
node->tuplestorestate = NULL;
if (((PlanState *) node)->lefttree->chgParam == NULL)
ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
node->eof_underlying = false;
}
else
- tuplestore_rescan((Tuplestorestate *) node->tuplestorestate);
+ tuplestore_rescan(node->tuplestorestate);
}
else
{
diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c
index 6d28a0eba50..3b53ad28a5e 100644
--- a/src/backend/utils/sort/tuplestore.c
+++ b/src/backend/utils/sort/tuplestore.c
@@ -11,6 +11,8 @@
* before it has all been written. This is particularly useful for cursors,
* because it allows random access within the already-scanned portion of
* a query without having to process the underlying scan to completion.
+ * Also, it is possible to support multiple independent read pointers.
+ *
* A temporary file is used to handle the data if it exceeds the
* space limit specified by the caller.
*
@@ -20,25 +22,31 @@
* maxKBytes, we dump all the tuples into a temp file and then read from that
* when needed.
*
+ * Upon creation, a tuplestore supports a single read pointer, numbered 0.
+ * Additional read pointers can be created using tuplestore_alloc_read_pointer.
+ * Mark/restore behavior is supported by copying read pointers.
+ *
* When the caller requests backward-scan capability, we write the temp file
* in a format that allows either forward or backward scan. Otherwise, only
- * forward scan is allowed. Rewind and markpos/restorepos are normally allowed
- * but can be turned off via tuplestore_set_eflags; turning off both backward
- * scan and rewind enables truncation of the tuplestore at the mark point
- * (if any) for minimal memory usage.
+ * forward scan is allowed. A request for backward scan must be made before
+ * putting any tuples into the tuplestore. Rewind is normally allowed but
+ * can be turned off via tuplestore_set_eflags; turning off both backward
+ * scan and rewind for all read pointers enables truncation of the tuplestore
+ * at the oldest read point for minimal memory usage.
*
- * Because we allow reading before writing is complete, there are two
- * interesting positions in the temp file: the current read position and
- * the current write position. At any given instant, the temp file's seek
- * position corresponds to one of these, and the other one is remembered in
- * the Tuplestore's state.
+ * Note: in TSS_WRITEFILE state, the temp file's seek position is the
+ * current write position, and the write-position variables in the tuplestore
+ * aren't kept up to date. Similarly, in TSS_READFILE state the temp file's
+ * seek position is the active read pointer's position, and that read pointer
+ * isn't kept up to date. We update the appropriate variables using ftell()
+ * before switching to the other state or activating a different read pointer.
*
*
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.39 2008/05/12 00:00:53 alvherre Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.40 2008/10/01 19:51:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -64,12 +72,34 @@ typedef enum
} TupStoreStatus;
/*
+ * State for a single read pointer. If we are in state INMEM then all the
+ * read pointers' "current" fields denote the read positions. In state
+ * WRITEFILE, the file/offset fields denote the read positions. In state
+ * READFILE, inactive read pointers have valid file/offset, but the active
+ * read pointer implicitly has position equal to the temp file's seek position.
+ *
+ * Special case: if eof_reached is true, then the pointer's read position is
+ * implicitly equal to the write position, and current/file/offset aren't
+ * maintained. This way we need not update all the read pointers each time
+ * we write.
+ */
+typedef struct
+{
+ int eflags; /* capability flags */
+ bool eof_reached; /* read reached EOF */
+ int current; /* next array index to read */
+ int file; /* temp file# */
+ off_t offset; /* byte offset in file */
+} TSReadPointer;
+
+/*
* Private state of a Tuplestore operation.
*/
struct Tuplestorestate
{
TupStoreStatus status; /* enumerated value as shown above */
- int eflags; /* capability flags */
+ int eflags; /* capability flags (OR of pointers' flags) */
+ bool backward; /* store extra length words in file? */
bool interXact; /* keep open through transactions? */
long availMem; /* remaining memory available, in bytes */
BufFile *myfile; /* underlying file, or NULL if none */
@@ -116,31 +146,20 @@ struct Tuplestorestate
int memtupsize; /* allocated length of memtuples array */
/*
- * These variables are used to keep track of the current position.
+ * These variables are used to keep track of the current positions.
*
- * In state WRITEFILE, the current file seek position is the write point,
- * and the read position is remembered in readpos_xxx; in state READFILE,
- * the current file seek position is the read point, and the write
- * position is remembered in writepos_xxx. (The write position is the
- * same as EOF, but since BufFileSeek doesn't currently implement
- * SEEK_END, we have to remember it explicitly.)
- *
- * Special case: if we are in WRITEFILE state and eof_reached is true,
- * then the read position is implicitly equal to the write position (and
- * hence to the file seek position); this way we need not update the
- * readpos_xxx variables on each write.
+ * In state WRITEFILE, the current file seek position is the write point;
+ * in state READFILE, the write position is remembered in writepos_xxx.
+ * (The write position is the same as EOF, but since BufFileSeek doesn't
+ * currently implement SEEK_END, we have to remember it explicitly.)
*/
- bool eof_reached; /* read reached EOF (always valid) */
- int current; /* next array index (valid if INMEM) */
- int readpos_file; /* file# (valid if WRITEFILE and not eof) */
- off_t readpos_offset; /* offset (valid if WRITEFILE and not eof) */
- int writepos_file; /* file# (valid if READFILE) */
- off_t writepos_offset; /* offset (valid if READFILE) */
-
- /* markpos_xxx holds marked position for mark and restore */
- int markpos_current; /* saved "current" */
- int markpos_file; /* saved "readpos_file" */
- off_t markpos_offset; /* saved "readpos_offset" */
+ TSReadPointer *readptrs; /* array of read pointers */
+ int activeptr; /* index of the active read pointer */
+ int readptrcount; /* number of pointers currently valid */
+ int readptrsize; /* allocated length of readptrs array */
+
+ int writepos_file; /* file# (valid if READFILE state) */
+ off_t writepos_offset; /* offset (valid if READFILE state) */
};
#define COPYTUP(state,tup) ((*(state)->copytup) (state, tup))
@@ -160,11 +179,11 @@ struct Tuplestorestate
* may or may not match the in-memory representation of the tuple ---
* any conversion needed is the job of the writetup and readtup routines.
*
- * If state->eflags & EXEC_FLAG_BACKWARD, then the stored representation of
+ * If state->backward is true, then the stored representation of
* the tuple must be followed by another "unsigned int" that is a copy of the
* length --- so the total tape space used is actually sizeof(unsigned int)
* more than the stored length value. This allows read-backwards. When
- * EXEC_FLAG_BACKWARD is not set, the write/read routines may omit the extra
+ * state->backward is not set, the write/read routines may omit the extra
* length word.
*
* writetup is expected to write both length words as well as the tuple
@@ -184,6 +203,7 @@ struct Tuplestorestate
* We count space allocated for tuples against the maxKBytes limit,
* plus the space used by the variable-size array memtuples.
* Fixed-size space (primarily the BufFile I/O buffer) is not counted.
+ * We don't worry about the size of the read pointer array, either.
*
* Note that we count actual space used (as shown by GetMemoryChunkSpace)
* rather than the originally-requested size. This is important since
@@ -200,7 +220,7 @@ static Tuplestorestate *tuplestore_begin_common(int eflags,
int maxKBytes);
static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
static void dumptuples(Tuplestorestate *state);
-static void tuplestore_trim(Tuplestorestate *state, int ntuples);
+static void tuplestore_trim(Tuplestorestate *state);
static unsigned int getlen(Tuplestorestate *state, bool eofOK);
static void *copytup_heap(Tuplestorestate *state, void *tup);
static void writetup_heap(Tuplestorestate *state, void *tup);
@@ -231,8 +251,15 @@ tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
- state->eof_reached = false;
- state->current = 0;
+ state->activeptr = 0;
+ state->readptrcount = 1;
+ state->readptrsize = 8; /* arbitrary */
+ state->readptrs = (TSReadPointer *)
+ palloc(state->readptrsize * sizeof(TSReadPointer));
+
+ state->readptrs[0].eflags = eflags;
+ state->readptrs[0].eof_reached = false;
+ state->readptrs[0].current = 0;
return state;
}
@@ -267,8 +294,8 @@ tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
* the pre-8.3 behavior of tuplestores.
*/
eflags = randomAccess ?
- (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND | EXEC_FLAG_MARK) :
- (EXEC_FLAG_REWIND | EXEC_FLAG_MARK);
+ (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
+ (EXEC_FLAG_REWIND);
state = tuplestore_begin_common(eflags, interXact, maxKBytes);
@@ -282,28 +309,71 @@ tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
/*
* tuplestore_set_eflags
*
- * Set capability flags at a finer grain than is allowed by
- * tuplestore_begin_xxx. This must be called before inserting any data
- * into the tuplestore.
+ * Set the capability flags for read pointer 0 at a finer grain than is
+ * allowed by tuplestore_begin_xxx. This must be called before inserting
+ * any data into the tuplestore.
*
* eflags is a bitmask following the meanings used for executor node
* startup flags (see executor.h). tuplestore pays attention to these bits:
* EXEC_FLAG_REWIND need rewind to start
* EXEC_FLAG_BACKWARD need backward fetch
- * EXEC_FLAG_MARK need mark/restore
- * If tuplestore_set_eflags is not called, REWIND and MARK are allowed,
- * and BACKWARD is set per "randomAccess" in the tuplestore_begin_xxx call.
+ * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
+ * is set per "randomAccess" in the tuplestore_begin_xxx call.
*/
void
tuplestore_set_eflags(Tuplestorestate *state, int eflags)
{
- Assert(state->status == TSS_INMEM);
- Assert(state->memtupcount == 0);
+ int i;
+
+ if (state->status != TSS_INMEM || state->memtupcount != 0)
+ elog(ERROR, "too late to call tuplestore_set_eflags");
+ state->readptrs[0].eflags = eflags;
+ for (i = 1; i < state->readptrcount; i++)
+ eflags |= state->readptrs[i].eflags;
state->eflags = eflags;
}
/*
+ * tuplestore_alloc_read_pointer - allocate another read pointer.
+ *
+ * Returns the pointer's index.
+ *
+ * The new pointer initially copies the position of read pointer 0.
+ * It can have its own eflags, but if any data has been inserted into
+ * the tuplestore, these eflags must not represent an increase in
+ * requirements.
+ */
+int
+tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
+{
+ /* Check for possible increase of requirements */
+ if (state->status != TSS_INMEM || state->memtupcount != 0)
+ {
+ if ((state->eflags | eflags) != state->eflags)
+ elog(ERROR, "too late to require new tuplestore eflags");
+ }
+
+ /* Make room for another read pointer if needed */
+ if (state->readptrcount >= state->readptrsize)
+ {
+ int newcnt = state->readptrsize * 2;
+
+ state->readptrs = (TSReadPointer *)
+ repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
+ state->readptrsize = newcnt;
+ }
+
+ /* And set it up */
+ state->readptrs[state->readptrcount] = state->readptrs[0];
+ state->readptrs[state->readptrcount].eflags = eflags;
+
+ state->eflags |= eflags;
+
+ return state->readptrcount++;
+}
+
+/*
* tuplestore_end
*
* Release resources and clean up.
@@ -321,18 +391,71 @@ tuplestore_end(Tuplestorestate *state)
pfree(state->memtuples[i]);
pfree(state->memtuples);
}
+ pfree(state->readptrs);
pfree(state);
}
/*
+ * tuplestore_select_read_pointer - make the specified read pointer active
+ */
+void
+tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
+{
+ TSReadPointer *readptr = &state->readptrs[ptr];
+
+ Assert(ptr >= 0 && ptr < state->readptrcount);
+
+ /* No work if already active */
+ if (ptr == state->activeptr)
+ return;
+
+ switch (state->status)
+ {
+ case TSS_INMEM:
+ case TSS_WRITEFILE:
+ /* no work */
+ break;
+ case TSS_READFILE:
+ /*
+ * We have to make the temp file's seek position equal to the
+ * logical position of the read pointer. In eof_reached state,
+ * that's the EOF, which we have available from the saved
+ * write position.
+ */
+ if (readptr->eof_reached)
+ {
+ if (BufFileSeek(state->myfile,
+ state->writepos_file,
+ state->writepos_offset,
+ SEEK_SET) != 0)
+ elog(ERROR, "tuplestore seek failed");
+ }
+ else
+ {
+ if (BufFileSeek(state->myfile,
+ readptr->file,
+ readptr->offset,
+ SEEK_SET) != 0)
+ elog(ERROR, "tuplestore seek failed");
+ }
+ break;
+ default:
+ elog(ERROR, "invalid tuplestore state");
+ break;
+ }
+
+ state->activeptr = ptr;
+}
+
+/*
* tuplestore_ateof
*
- * Returns the current eof_reached state.
+ * Returns the active read pointer's eof_reached state.
*/
bool
tuplestore_ateof(Tuplestorestate *state)
{
- return state->eof_reached;
+ return state->readptrs[state->activeptr].eof_reached;
}
/*
@@ -340,8 +463,8 @@ tuplestore_ateof(Tuplestorestate *state)
*
* Note that the input tuple is always copied; the caller need not save it.
*
- * If the read status is currently "AT EOF" then it remains so (the read
- * pointer advances along with the write pointer); otherwise the read
+ * Any read pointer that is currently "AT EOF" remains so (the read pointer
+ * implicitly advances along with the write pointer); otherwise the read
* pointer is unchanged. This is for the convenience of nodeMaterial.c.
*
* tuplestore_puttupleslot() is a convenience routine to collect data from
@@ -427,10 +550,6 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
/* Stash the tuple in the in-memory array */
state->memtuples[state->memtupcount++] = tuple;
- /* If eof_reached, keep read position in sync */
- if (state->eof_reached)
- state->current = state->memtupcount;
-
/*
* Done if we still fit in available memory and have array slots.
*/
@@ -443,6 +562,12 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
*/
PrepareTempTablespaces();
state->myfile = BufFileCreateTemp(state->interXact);
+ /*
+ * Freeze the decision about whether trailing length words
+ * will be used. We can't change this choice once data is on
+ * tape, even though callers might drop the requirement.
+ */
+ state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
state->status = TSS_WRITEFILE;
dumptuples(state);
break;
@@ -454,13 +579,14 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
/*
* Switch from reading to writing.
*/
- if (!state->eof_reached)
+ if (!state->readptrs[state->activeptr].eof_reached)
BufFileTell(state->myfile,
- &state->readpos_file, &state->readpos_offset);
+ &state->readptrs[state->activeptr].file,
+ &state->readptrs[state->activeptr].offset);
if (BufFileSeek(state->myfile,
state->writepos_file, state->writepos_offset,
SEEK_SET) != 0)
- elog(ERROR, "seek to EOF failed");
+ elog(ERROR, "tuplestore seek to EOF failed");
state->status = TSS_WRITEFILE;
WRITETUP(state, tuple);
break;
@@ -482,10 +608,11 @@ static void *
tuplestore_gettuple(Tuplestorestate *state, bool forward,
bool *should_free)
{
+ TSReadPointer *readptr = &state->readptrs[state->activeptr];
unsigned int tuplen;
void *tup;
- Assert(forward || (state->eflags & EXEC_FLAG_BACKWARD));
+ Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
switch (state->status)
{
@@ -493,35 +620,47 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
*should_free = false;
if (forward)
{
- if (state->current < state->memtupcount)
- return state->memtuples[state->current++];
- state->eof_reached = true;
+ if (readptr->eof_reached)
+ return NULL;
+ if (readptr->current < state->memtupcount)
+ {
+ /*
+ * We have another tuple, so return it. Note: in
+ * principle we could try tuplestore_trim() here after
+ * advancing current, but this would cost cycles with
+ * little chance of success, so we don't bother.
+ */
+ return state->memtuples[readptr->current++];
+ }
+ readptr->eof_reached = true;
return NULL;
}
else
{
- if (state->current <= 0)
- return NULL;
-
/*
* if all tuples are fetched already then we return last
* tuple, else - tuple before last returned.
*/
- if (state->eof_reached)
- state->eof_reached = false;
+ if (readptr->eof_reached)
+ {
+ readptr->current = state->memtupcount;
+ readptr->eof_reached = false;
+ }
else
{
- state->current--; /* last returned tuple */
- if (state->current <= 0)
+ if (readptr->current <= 0)
return NULL;
+ readptr->current--; /* last returned tuple */
}
- return state->memtuples[state->current - 1];
+ if (readptr->current <= 0)
+ return NULL;
+ return state->memtuples[readptr->current - 1];
}
break;
case TSS_WRITEFILE:
/* Skip state change if we'll just return NULL */
- if (state->eof_reached && forward)
+ if (readptr->eof_reached && forward)
return NULL;
/*
@@ -529,11 +668,11 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
*/
BufFileTell(state->myfile,
&state->writepos_file, &state->writepos_offset);
- if (!state->eof_reached)
+ if (!readptr->eof_reached)
if (BufFileSeek(state->myfile,
- state->readpos_file, state->readpos_offset,
+ readptr->file, readptr->offset,
SEEK_SET) != 0)
- elog(ERROR, "seek failed");
+ elog(ERROR, "tuplestore seek failed");
state->status = TSS_READFILE;
/* FALL THRU into READFILE case */
@@ -548,7 +687,7 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
}
else
{
- state->eof_reached = true;
+ readptr->eof_reached = true;
return NULL;
}
}
@@ -564,12 +703,16 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
*/
if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
SEEK_CUR) != 0)
+ {
+ /* even a failed backwards fetch gets you out of eof state */
+ readptr->eof_reached = false;
return NULL;
+ }
tuplen = getlen(state, false);
- if (state->eof_reached)
+ if (readptr->eof_reached)
{
- state->eof_reached = false;
+ readptr->eof_reached = false;
/* We will return the tuple returned before returning NULL */
}
else
@@ -670,9 +813,9 @@ tuplestore_advance(Tuplestorestate *state, bool forward)
/*
* dumptuples - remove tuples from memory and write to tape
*
- * As a side effect, we must set readpos and markpos to the value
- * corresponding to "current"; otherwise, a dump would lose the current read
- * position.
+ * As a side effect, we must convert each read pointer's position from
+ * "current" to file/offset format. But eof_reached pointers don't
+ * need to change state.
*/
static void
dumptuples(Tuplestorestate *state)
@@ -681,12 +824,15 @@ dumptuples(Tuplestorestate *state)
for (i = 0;; i++)
{
- if (i == state->current)
- BufFileTell(state->myfile,
- &state->readpos_file, &state->readpos_offset);
- if (i == state->markpos_current)
- BufFileTell(state->myfile,
- &state->markpos_file, &state->markpos_offset);
+ TSReadPointer *readptr = state->readptrs;
+ int j;
+
+ for (j = 0; j < state->readptrcount; readptr++, j++)
+ {
+ if (i == readptr->current && !readptr->eof_reached)
+ BufFileTell(state->myfile,
+ &readptr->file, &readptr->offset);
+ }
if (i >= state->memtupcount)
break;
WRITETUP(state, state->memtuples[i]);
@@ -695,28 +841,30 @@ dumptuples(Tuplestorestate *state)
}
/*
- * tuplestore_rescan - rewind and replay the scan
+ * tuplestore_rescan - rewind the active read pointer to start
*/
void
tuplestore_rescan(Tuplestorestate *state)
{
- Assert(state->eflags & EXEC_FLAG_REWIND);
+ TSReadPointer *readptr = &state->readptrs[state->activeptr];
+
+ Assert(readptr->eflags & EXEC_FLAG_REWIND);
switch (state->status)
{
case TSS_INMEM:
- state->eof_reached = false;
- state->current = 0;
+ readptr->eof_reached = false;
+ readptr->current = 0;
break;
case TSS_WRITEFILE:
- state->eof_reached = false;
- state->readpos_file = 0;
- state->readpos_offset = 0L;
+ readptr->eof_reached = false;
+ readptr->file = 0;
+ readptr->offset = 0L;
break;
case TSS_READFILE:
- state->eof_reached = false;
+ readptr->eof_reached = false;
if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
- elog(ERROR, "seek to start failed");
+ elog(ERROR, "tuplestore seek to start failed");
break;
default:
elog(ERROR, "invalid tuplestore state");
@@ -725,85 +873,78 @@ tuplestore_rescan(Tuplestorestate *state)
}
/*
- * tuplestore_markpos - saves current position in the tuple sequence
+ * tuplestore_copy_read_pointer - copy a read pointer's state to another
*/
void
-tuplestore_markpos(Tuplestorestate *state)
+tuplestore_copy_read_pointer(Tuplestorestate *state,
+ int srcptr, int destptr)
{
- Assert(state->eflags & EXEC_FLAG_MARK);
+ TSReadPointer *sptr = &state->readptrs[srcptr];
+ TSReadPointer *dptr = &state->readptrs[destptr];
- switch (state->status)
- {
- case TSS_INMEM:
- state->markpos_current = state->current;
+ Assert(srcptr >= 0 && srcptr < state->readptrcount);
+ Assert(destptr >= 0 && destptr < state->readptrcount);
- /*
- * We can truncate the tuplestore if neither backward scan nor
- * rewind capability are required by the caller. There will never
- * be a need to back up past the mark point.
- *
- * Note: you might think we could remove all the tuples before
- * "current", since that one is the next to be returned. However,
- * since tuplestore_gettuple returns a direct pointer to our
- * internal copy of the tuple, it's likely that the caller has
- * still got the tuple just before "current" referenced in a slot.
- * Don't free it yet.
- */
- if (!(state->eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND)))
- tuplestore_trim(state, 1);
- break;
- case TSS_WRITEFILE:
- if (state->eof_reached)
- {
- /* Need to record the implicit read position */
- BufFileTell(state->myfile,
- &state->markpos_file,
- &state->markpos_offset);
- }
- else
- {
- state->markpos_file = state->readpos_file;
- state->markpos_offset = state->readpos_offset;
- }
- break;
- case TSS_READFILE:
- BufFileTell(state->myfile,
- &state->markpos_file,
- &state->markpos_offset);
- break;
- default:
- elog(ERROR, "invalid tuplestore state");
- break;
- }
-}
+ /* Assigning to self is a no-op */
+ if (srcptr == destptr)
+ return;
-/*
- * tuplestore_restorepos - restores current position in tuple sequence to
- * last saved position
- */
-void
-tuplestore_restorepos(Tuplestorestate *state)
-{
- Assert(state->eflags & EXEC_FLAG_MARK);
+ if (dptr->eflags != sptr->eflags)
+ {
+ /* Possible change of overall eflags, so copy and then recompute */
+ int eflags;
+ int i;
+
+ *dptr = *sptr;
+ eflags = state->readptrs[0].eflags;
+ for (i = 1; i < state->readptrcount; i++)
+ eflags |= state->readptrs[i].eflags;
+ state->eflags = eflags;
+ }
+ else
+ *dptr = *sptr;
switch (state->status)
{
case TSS_INMEM:
- state->eof_reached = false;
- state->current = state->markpos_current;
+ /* We might be able to truncate the tuplestore */
+ tuplestore_trim(state);
break;
case TSS_WRITEFILE:
- state->eof_reached = false;
- state->readpos_file = state->markpos_file;
- state->readpos_offset = state->markpos_offset;
break;
case TSS_READFILE:
- state->eof_reached = false;
- if (BufFileSeek(state->myfile,
- state->markpos_file,
- state->markpos_offset,
- SEEK_SET) != 0)
- elog(ERROR, "tuplestore_restorepos failed");
+ /*
+ * This case is a bit tricky since the active read pointer's
+ * position corresponds to the seek point, not what is in its
+ * variables. Assigning to the active requires a seek, and
+ * assigning from the active requires a tell, except when
+ * eof_reached.
+ */
+ if (destptr == state->activeptr)
+ {
+ if (dptr->eof_reached)
+ {
+ if (BufFileSeek(state->myfile,
+ state->writepos_file,
+ state->writepos_offset,
+ SEEK_SET) != 0)
+ elog(ERROR, "tuplestore seek failed");
+ }
+ else
+ {
+ if (BufFileSeek(state->myfile,
+ dptr->file, dptr->offset,
+ SEEK_SET) != 0)
+ elog(ERROR, "tuplestore seek failed");
+ }
+ }
+ else if (srcptr == state->activeptr)
+ {
+ if (!dptr->eof_reached)
+ BufFileTell(state->myfile,
+ &dptr->file,
+ &dptr->offset);
+ }
break;
default:
elog(ERROR, "invalid tuplestore state");
@@ -812,22 +953,46 @@ tuplestore_restorepos(Tuplestorestate *state)
}
/*
- * tuplestore_trim - remove all but ntuples tuples before current
+ * tuplestore_trim - remove all no-longer-needed tuples
*/
static void
-tuplestore_trim(Tuplestorestate *state, int ntuples)
+tuplestore_trim(Tuplestorestate *state)
{
+ int oldest;
int nremove;
int i;
/*
+ * We can truncate the tuplestore if neither backward scan nor
+ * rewind capability are required by any read pointer.
+ */
+ if (state->eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND))
+ return;
+
+ /*
* We don't bother trimming temp files since it usually would mean more
* work than just letting them sit in kernel buffers until they age out.
*/
if (state->status != TSS_INMEM)
return;
- nremove = state->current - ntuples;
+ /* Find the oldest read pointer */
+ oldest = state->memtupcount;
+ for (i = 0; i < state->readptrcount; i++)
+ {
+ if (!state->readptrs[i].eof_reached)
+ oldest = Min(oldest, state->readptrs[i].current);
+ }
+
+ /*
+ * Note: you might think we could remove all the tuples before the oldest
+ * "current", since that one is the next to be returned. However,
+ * since tuplestore_gettuple returns a direct pointer to our
+ * internal copy of the tuple, it's likely that the caller has
+ * still got the tuple just before "current" referenced in a slot.
+ * So we keep one extra tuple before the oldest "current".
+ */
+ nremove = oldest - 1;
if (nremove <= 0)
return; /* nothing to do */
Assert(nremove <= state->memtupcount);
@@ -856,8 +1021,11 @@ tuplestore_trim(Tuplestorestate *state, int ntuples)
(state->memtupcount - nremove) * sizeof(void *));
state->memtupcount -= nremove;
- state->current -= nremove;
- state->markpos_current -= nremove;
+ for (i = 0; i < state->readptrcount; i++)
+ {
+ if (!state->readptrs[i].eof_reached)
+ state->readptrs[i].current -= nremove;
+ }
}
@@ -910,7 +1078,7 @@ writetup_heap(Tuplestorestate *state, void *tup)
if (BufFileWrite(state->myfile, (void *) tuple, tuplen) != (size_t) tuplen)
elog(ERROR, "write failed");
- if (state->eflags & EXEC_FLAG_BACKWARD) /* need trailing length word? */
+ if (state->backward) /* need trailing length word? */
if (BufFileWrite(state->myfile, (void *) &tuplen,
sizeof(tuplen)) != sizeof(tuplen))
elog(ERROR, "write failed");
@@ -931,7 +1099,7 @@ readtup_heap(Tuplestorestate *state, unsigned int len)
if (BufFileRead(state->myfile, (void *) ((char *) tuple + sizeof(int)),
len - sizeof(int)) != (size_t) (len - sizeof(int)))
elog(ERROR, "unexpected end of data");
- if (state->eflags & EXEC_FLAG_BACKWARD) /* need trailing length word? */
+ if (state->backward) /* need trailing length word? */
if (BufFileRead(state->myfile, (void *) &tuplen,
sizeof(tuplen)) != sizeof(tuplen))
elog(ERROR, "unexpected end of data");
diff --git a/src/include/executor/nodeFunctionscan.h b/src/include/executor/nodeFunctionscan.h
index dd499a73c69..d83e9a4f864 100644
--- a/src/include/executor/nodeFunctionscan.h
+++ b/src/include/executor/nodeFunctionscan.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/executor/nodeFunctionscan.h,v 1.11 2008/01/01 19:45:57 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/executor/nodeFunctionscan.h,v 1.12 2008/10/01 19:51:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -20,8 +20,6 @@ extern int ExecCountSlotsFunctionScan(FunctionScan *node);
extern FunctionScanState *ExecInitFunctionScan(FunctionScan *node, EState *estate, int eflags);
extern TupleTableSlot *ExecFunctionScan(FunctionScanState *node);
extern void ExecEndFunctionScan(FunctionScanState *node);
-extern void ExecFunctionMarkPos(FunctionScanState *node);
-extern void ExecFunctionRestrPos(FunctionScanState *node);
extern void ExecFunctionReScan(FunctionScanState *node, ExprContext *exprCtxt);
#endif /* NODEFUNCTIONSCAN_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index ee8c4a2bef7..eb187ce4597 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.187 2008/08/22 00:16:04 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.188 2008/10/01 19:51:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -1324,7 +1324,7 @@ typedef struct MaterialState
ScanState ss; /* its first field is NodeTag */
int eflags; /* capability flags to pass to tuplestore */
bool eof_underlying; /* reached end of underlying plan? */
- void *tuplestorestate; /* private state of tuplestore.c */
+ Tuplestorestate *tuplestorestate;
} MaterialState;
/* ----------------
diff --git a/src/include/utils/tuplestore.h b/src/include/utils/tuplestore.h
index 37f99fea3bd..3fe32f682b1 100644
--- a/src/include/utils/tuplestore.h
+++ b/src/include/utils/tuplestore.h
@@ -11,6 +11,8 @@
* before it has all been written. This is particularly useful for cursors,
* because it allows random access within the already-scanned portion of
* a query without having to process the underlying scan to completion.
+ * Also, it is possible to support multiple independent read pointers.
+ *
* A temporary file is used to handle the data if it exceeds the
* space limit specified by the caller.
*
@@ -22,7 +24,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/utils/tuplestore.h,v 1.23 2008/03/25 19:26:53 neilc Exp $
+ * $PostgreSQL: pgsql/src/include/utils/tuplestore.h,v 1.24 2008/10/01 19:51:50 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -57,16 +59,21 @@ extern void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
/* tuplestore_donestoring() used to be required, but is no longer used */
#define tuplestore_donestoring(state) ((void) 0)
+extern int tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags);
+
+extern void tuplestore_select_read_pointer(Tuplestorestate *state, int ptr);
+
+extern void tuplestore_copy_read_pointer(Tuplestorestate *state,
+ int srcptr, int destptr);
+
extern bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
TupleTableSlot *slot);
extern bool tuplestore_advance(Tuplestorestate *state, bool forward);
-extern void tuplestore_end(Tuplestorestate *state);
-
extern bool tuplestore_ateof(Tuplestorestate *state);
extern void tuplestore_rescan(Tuplestorestate *state);
-extern void tuplestore_markpos(Tuplestorestate *state);
-extern void tuplestore_restorepos(Tuplestorestate *state);
+
+extern void tuplestore_end(Tuplestorestate *state);
#endif /* TUPLESTORE_H */