aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2010-02-12 17:33:21 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2010-02-12 17:33:21 +0000
commitec4be2ee6827b6bd85e0813c7a8993cfbb0e6fa7 (patch)
treef4f98eb0f5ff45dbcd23778a1c683a1f597431b7 /src/backend/executor
parenta5348fafd182d5b84c89b43af3746711ce28f319 (diff)
downloadpostgresql-ec4be2ee6827b6bd85e0813c7a8993cfbb0e6fa7.tar.gz
postgresql-ec4be2ee6827b6bd85e0813c7a8993cfbb0e6fa7.zip
Extend the set of frame options supported for window functions.
This patch allows the frame to start from CURRENT ROW (in either RANGE or ROWS mode), and it also adds support for ROWS n PRECEDING and ROWS n FOLLOWING start and end points. (RANGE value PRECEDING/FOLLOWING isn't there yet --- the grammar works, but that's all.) Hitoshi Harada, reviewed by Pavel Stehule
Diffstat (limited to 'src/backend/executor')
-rw-r--r--src/backend/executor/nodeAgg.c4
-rw-r--r--src/backend/executor/nodeWindowAgg.c648
2 files changed, 528 insertions, 124 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index ea722f1ee3b..9f748ca6c0a 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -71,7 +71,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/executor/nodeAgg.c,v 1.172 2010/02/08 20:39:51 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/executor/nodeAgg.c,v 1.173 2010/02/12 17:33:19 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -1999,7 +1999,7 @@ AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
{
if (aggcontext)
- *aggcontext = ((WindowAggState *) fcinfo->context)->wincontext;
+ *aggcontext = ((WindowAggState *) fcinfo->context)->aggcontext;
return AGG_CONTEXT_WINDOW;
}
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 0b90992b80c..c2c0af3cde2 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -27,7 +27,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/executor/nodeWindowAgg.c,v 1.9 2010/01/02 16:57:45 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/executor/nodeWindowAgg.c,v 1.10 2010/02/12 17:33:19 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -165,6 +165,7 @@ static void release_partition(WindowAggState *winstate);
static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
TupleTableSlot *slot);
+static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
@@ -193,7 +194,7 @@ initialize_windowaggregate(WindowAggState *winstate,
peraggstate->transValue = peraggstate->initValue;
else
{
- oldContext = MemoryContextSwitchTo(winstate->wincontext);
+ oldContext = MemoryContextSwitchTo(winstate->aggcontext);
peraggstate->transValue = datumCopy(peraggstate->initValue,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
@@ -258,10 +259,10 @@ advance_windowaggregate(WindowAggState *winstate,
* already checked that the agg's input type is binary-compatible
* with its transtype, so straight copy here is OK.)
*
- * We must copy the datum into wincontext if it is pass-by-ref. We
+ * We must copy the datum into aggcontext if it is pass-by-ref. We
* do not need to pfree the old transValue, since it's NULL.
*/
- MemoryContextSwitchTo(winstate->wincontext);
+ MemoryContextSwitchTo(winstate->aggcontext);
peraggstate->transValue = datumCopy(fcinfo->arg[1],
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
@@ -294,7 +295,7 @@ advance_windowaggregate(WindowAggState *winstate,
newVal = FunctionCallInvoke(fcinfo);
/*
- * If pass-by-ref datatype, must copy the new value into wincontext and
+ * If pass-by-ref datatype, must copy the new value into aggcontext and
* pfree the prior transValue. But if transfn returned a pointer to its
* first input, we don't need to do anything.
*/
@@ -303,7 +304,7 @@ advance_windowaggregate(WindowAggState *winstate,
{
if (!fcinfo->isnull)
{
- MemoryContextSwitchTo(winstate->wincontext);
+ MemoryContextSwitchTo(winstate->aggcontext);
newVal = datumCopy(newVal,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
@@ -390,6 +391,7 @@ eval_windowaggregates(WindowAggState *winstate)
int i;
MemoryContext oldContext;
ExprContext *econtext;
+ WindowObject agg_winobj;
TupleTableSlot *agg_row_slot;
numaggs = winstate->numaggs;
@@ -398,10 +400,14 @@ eval_windowaggregates(WindowAggState *winstate)
/* final output execution is in ps_ExprContext */
econtext = winstate->ss.ps.ps_ExprContext;
+ agg_winobj = winstate->agg_winobj;
+ agg_row_slot = winstate->agg_row_slot;
/*
* Currently, we support only a subset of the SQL-standard window framing
- * rules. In all the supported cases, the window frame always consists of
+ * rules.
+ *
+ * If the frame start is UNBOUNDED_PRECEDING, the window frame consists of
* a contiguous group of rows extending forward from the start of the
* partition, and rows only enter the frame, never exit it, as the current
* row advances forward. This makes it possible to use an incremental
@@ -413,6 +419,10 @@ eval_windowaggregates(WindowAggState *winstate)
* damage the running transition value, but we have the same assumption
* in nodeAgg.c too (when it rescans an existing hash table).
*
+ * For other frame start rules, we discard the aggregate state and re-run
+ * the aggregates whenever the frame head row moves. We can still
+ * optimize as above whenever successive rows share the same frame head.
+ *
* In many common cases, multiple rows share the same frame and hence the
* same aggregate value. (In particular, if there's no ORDER BY in a RANGE
* window, then all rows are peers and so they all have window frame equal
@@ -424,63 +434,90 @@ eval_windowaggregates(WindowAggState *winstate)
* accumulated into the aggregate transition values. Whenever we start a
* new peer group, we accumulate forward to the end of the peer group.
*
- * TODO: In the future, we should implement the full SQL-standard set of
- * framing rules. We could implement the other cases by recalculating the
- * aggregates whenever a row exits the frame. That would be pretty slow,
- * though. For aggregates like SUM and COUNT we could implement a
- * "negative transition function" that would be called for each row as it
- * exits the frame. We'd have to think about avoiding recalculation of
- * volatile arguments of aggregate functions, too.
+ * TODO: Rerunning aggregates from the frame start can be pretty slow.
+ * For some aggregates like SUM and COUNT we could avoid that by
+ * implementing a "negative transition function" that would be called for
+ * each row as it exits the frame. We'd have to think about avoiding
+ * recalculation of volatile arguments of aggregate functions, too.
*/
/*
- * If we've already aggregated up through current row, reuse the saved
- * result values. NOTE: this test works for the currently supported
- * framing rules, but will need fixing when more are added.
+ * First, update the frame head position.
*/
- if (winstate->aggregatedupto > winstate->currentpos)
+ update_frameheadpos(agg_winobj, winstate->temp_slot_1);
+
+ /*
+ * Initialize aggregates on first call for partition, or if the frame
+ * head position moved since last time.
+ */
+ if (winstate->currentpos == 0 ||
+ winstate->frameheadpos != winstate->aggregatedbase)
{
+ /*
+ * Discard transient aggregate values
+ */
+ MemoryContextResetAndDeleteChildren(winstate->aggcontext);
+
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
wfuncno = peraggstate->wfuncno;
- econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
- econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
+ initialize_windowaggregate(winstate,
+ &winstate->perfunc[wfuncno],
+ peraggstate);
}
- return;
+
+ /*
+ * If we created a mark pointer for aggregates, keep it pushed up
+ * to frame head, so that tuplestore can discard unnecessary rows.
+ */
+ if (agg_winobj->markptr >= 0)
+ WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
+
+ /*
+ * Initialize for loop below
+ */
+ ExecClearTuple(agg_row_slot);
+ winstate->aggregatedbase = winstate->frameheadpos;
+ winstate->aggregatedupto = winstate->frameheadpos;
}
- /* Initialize aggregates on first call for partition */
- if (winstate->currentpos == 0)
+ /*
+ * In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates
+ * except when the frame head moves. In END_CURRENT_ROW mode, we only
+ * have to recalculate when the frame head moves or currentpos has advanced
+ * past the place we'd aggregated up to. Check for these cases and if
+ * so, reuse the saved result values.
+ */
+ if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
+ FRAMEOPTION_END_CURRENT_ROW)) &&
+ winstate->aggregatedbase <= winstate->currentpos &&
+ winstate->aggregatedupto > winstate->currentpos)
{
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
wfuncno = peraggstate->wfuncno;
- initialize_windowaggregate(winstate,
- &winstate->perfunc[wfuncno],
- peraggstate);
+ econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
+ econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
}
+ return;
}
/*
* Advance until we reach a row not in frame (or end of partition).
*
* Note the loop invariant: agg_row_slot is either empty or holds the row
- * at position aggregatedupto. The agg_ptr read pointer must always point
- * to the next row to read into agg_row_slot.
+ * at position aggregatedupto. We advance aggregatedupto after processing
+ * a row.
*/
- agg_row_slot = winstate->agg_row_slot;
for (;;)
{
/* Fetch next row if we didn't already */
if (TupIsNull(agg_row_slot))
{
- spool_tuples(winstate, winstate->aggregatedupto);
- tuplestore_select_read_pointer(winstate->buffer,
- winstate->agg_ptr);
- if (!tuplestore_gettupleslot(winstate->buffer, true, true,
- agg_row_slot))
+ if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
+ agg_row_slot))
break; /* must be end of partition */
}
@@ -544,11 +581,11 @@ eval_windowaggregates(WindowAggState *winstate)
pfree(DatumGetPointer(peraggstate->resultValue));
/*
- * If pass-by-ref, copy it into our global context.
+ * If pass-by-ref, copy it into our aggregate context.
*/
if (!*isnull)
{
- oldContext = MemoryContextSwitchTo(winstate->wincontext);
+ oldContext = MemoryContextSwitchTo(winstate->aggcontext);
peraggstate->resultValue =
datumCopy(*result,
peraggstate->resulttypeByVal,
@@ -624,11 +661,12 @@ begin_partition(WindowAggState *winstate)
int i;
winstate->partition_spooled = false;
+ winstate->framehead_valid = false;
winstate->frametail_valid = false;
winstate->spooled_rows = 0;
winstate->currentpos = 0;
+ winstate->frameheadpos = 0;
winstate->frametailpos = -1;
- winstate->aggregatedupto = 0;
ExecClearTuple(winstate->agg_row_slot);
/*
@@ -654,18 +692,39 @@ begin_partition(WindowAggState *winstate)
winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
/*
- * Set up read pointers for the tuplestore. The current and agg pointers
- * don't need BACKWARD capability, but the per-window-function read
- * pointers do.
+ * Set up read pointers for the tuplestore. The current pointer doesn't
+ * need BACKWARD capability, but the per-window-function read pointers do,
+ * and the aggregate pointer does if frame start is movable.
*/
winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
/* reset default REWIND capability bit for current ptr */
tuplestore_set_eflags(winstate->buffer, 0);
- /* create a read pointer for aggregates, if needed */
+ /* create read pointers for aggregates, if needed */
if (winstate->numaggs > 0)
- winstate->agg_ptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
+ {
+ WindowObject agg_winobj = winstate->agg_winobj;
+ int readptr_flags = 0;
+
+ /* If the frame head is potentially movable ... */
+ if (!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
+ {
+ /* ... create a mark pointer to track the frame head */
+ agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
+ /* and the read pointer will need BACKWARD capability */
+ readptr_flags |= EXEC_FLAG_BACKWARD;
+ }
+
+ agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
+ readptr_flags);
+ agg_winobj->markpos = -1;
+ agg_winobj->seekpos = -1;
+
+ /* Also reset the row counters for aggregates */
+ winstate->aggregatedbase = 0;
+ winstate->aggregatedupto = 0;
+ }
/* create mark and read pointers for each real window function */
for (i = 0; i < numfuncs; i++)
@@ -694,8 +753,8 @@ begin_partition(WindowAggState *winstate)
}
/*
- * Read tuples from the outer node, up to position 'pos', and store them
- * into the tuplestore. If pos is -1, reads the whole partition.
+ * Read tuples from the outer node, up to and including position 'pos', and
+ * store them into the tuplestore. If pos is -1, reads the whole partition.
*/
static void
spool_tuples(WindowAggState *winstate, int64 pos)
@@ -789,7 +848,8 @@ release_partition(WindowAggState *winstate)
* any aggregate temp data). We don't rely on retail pfree because some
* aggregates might have allocated data we don't have direct pointers to.
*/
- MemoryContextResetAndDeleteChildren(winstate->wincontext);
+ MemoryContextResetAndDeleteChildren(winstate->partcontext);
+ MemoryContextResetAndDeleteChildren(winstate->aggcontext);
if (winstate->buffer)
tuplestore_end(winstate->buffer);
@@ -809,108 +869,303 @@ release_partition(WindowAggState *winstate)
static bool
row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
{
- WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
- int frameOptions = node->frameOptions;
+ int frameOptions = winstate->frameOptions;
Assert(pos >= 0); /* else caller error */
- /* We only support frame start mode UNBOUNDED PRECEDING for now */
- Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING);
+ /* First, check frame starting conditions */
+ if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
+ {
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* rows before current row are out of frame */
+ if (pos < winstate->currentpos)
+ return false;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ /* preceding row that is not peer is out of frame */
+ if (pos < winstate->currentpos &&
+ !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
+ return false;
+ }
+ else
+ Assert(false);
+ }
+ else if (frameOptions & FRAMEOPTION_START_VALUE)
+ {
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ int64 offset = DatumGetInt64(winstate->startOffsetValue);
- /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
- if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
- return true;
+ /* rows before current row + offset are out of frame */
+ if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
+ offset = -offset;
- /* Else frame tail mode must be CURRENT ROW */
- Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW);
+ if (pos < winstate->currentpos + offset)
+ return false;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ /* parser should have rejected this */
+ elog(ERROR, "window frame with value offset is not implemented");
+ }
+ else
+ Assert(false);
+ }
- /* if row is current row or a predecessor, it must be in frame */
- if (pos <= winstate->currentpos)
- return true;
+ /* Okay so far, now check frame ending conditions */
+ if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
+ {
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* rows after current row are out of frame */
+ if (pos > winstate->currentpos)
+ return false;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ /* following row that is not peer is out of frame */
+ if (pos > winstate->currentpos &&
+ !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
+ return false;
+ }
+ else
+ Assert(false);
+ }
+ else if (frameOptions & FRAMEOPTION_END_VALUE)
+ {
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ int64 offset = DatumGetInt64(winstate->endOffsetValue);
- /* In ROWS mode, *only* such rows are in frame */
- if (frameOptions & FRAMEOPTION_ROWS)
- return false;
+ /* rows after current row + offset are out of frame */
+ if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
+ offset = -offset;
- /* Else must be RANGE mode */
- Assert(frameOptions & FRAMEOPTION_RANGE);
+ if (pos > winstate->currentpos + offset)
+ return false;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ /* parser should have rejected this */
+ elog(ERROR, "window frame with value offset is not implemented");
+ }
+ else
+ Assert(false);
+ }
- /* In frame iff it's a peer of current row */
- return are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot);
+ /* If we get here, it's in frame */
+ return true;
}
/*
- * update_frametailpos
- * make frametailpos valid for the current row
+ * update_frameheadpos
+ * make frameheadpos valid for the current row
*
- * Uses the winobj's read pointer for any required fetches; the winobj's
- * mark must not be past the currently known frame tail. Also uses the
- * specified slot for any required fetches.
+ * Uses the winobj's read pointer for any required fetches; hence, if the
+ * frame mode is one that requires row comparisons, the winobj's mark must
+ * not be past the currently known frame head. Also uses the specified slot
+ * for any required fetches.
*/
static void
-update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
+update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
{
WindowAggState *winstate = winobj->winstate;
WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
- int frameOptions = node->frameOptions;
- int64 ftnext;
+ int frameOptions = winstate->frameOptions;
- if (winstate->frametail_valid)
+ if (winstate->framehead_valid)
return; /* already known for current row */
- /* We only support frame start mode UNBOUNDED PRECEDING for now */
- Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING);
-
- /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
- if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
+ if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
{
- spool_tuples(winstate, -1);
- winstate->frametailpos = winstate->spooled_rows - 1;
- winstate->frametail_valid = true;
- return;
+ /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
+ winstate->frameheadpos = 0;
+ winstate->framehead_valid = true;
}
+ else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
+ {
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* In ROWS mode, frame head is the same as current */
+ winstate->frameheadpos = winstate->currentpos;
+ winstate->framehead_valid = true;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ int64 fhprev;
- /* Else frame tail mode must be CURRENT ROW */
- Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW);
+ /* If no ORDER BY, all rows are peers with each other */
+ if (node->ordNumCols == 0)
+ {
+ winstate->frameheadpos = 0;
+ winstate->framehead_valid = true;
+ return;
+ }
- /* In ROWS mode, exactly the rows up to current are in frame */
- if (frameOptions & FRAMEOPTION_ROWS)
+ /*
+ * In RANGE START_CURRENT mode, frame head is the first row that
+ * is a peer of current row. We search backwards from current,
+ * which could be a bit inefficient if peer sets are large.
+ * Might be better to have a separate read pointer that moves
+ * forward tracking the frame head.
+ */
+ fhprev = winstate->currentpos - 1;
+ for (;;)
+ {
+ /* assume the frame head can't go backwards */
+ if (fhprev < winstate->frameheadpos)
+ break;
+ if (!window_gettupleslot(winobj, fhprev, slot))
+ break; /* start of partition */
+ if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
+ break; /* not peer of current row */
+ fhprev--;
+ }
+ winstate->frameheadpos = fhprev + 1;
+ winstate->framehead_valid = true;
+ }
+ else
+ Assert(false);
+ }
+ else if (frameOptions & FRAMEOPTION_START_VALUE)
{
- winstate->frametailpos = winstate->currentpos;
- winstate->frametail_valid = true;
- return;
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* In ROWS mode, bound is physically n before/after current */
+ int64 offset = DatumGetInt64(winstate->startOffsetValue);
+
+ if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
+ offset = -offset;
+
+ winstate->frameheadpos = winstate->currentpos + offset;
+ /* frame head can't go before first row */
+ if (winstate->frameheadpos < 0)
+ winstate->frameheadpos = 0;
+ else if (winstate->frameheadpos > winstate->currentpos)
+ {
+ /* make sure frameheadpos is not past end of partition */
+ spool_tuples(winstate, winstate->frameheadpos - 1);
+ if (winstate->frameheadpos > winstate->spooled_rows)
+ winstate->frameheadpos = winstate->spooled_rows;
+ }
+ winstate->framehead_valid = true;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ /* parser should have rejected this */
+ elog(ERROR, "window frame with value offset is not implemented");
+ }
+ else
+ Assert(false);
}
+ else
+ Assert(false);
+}
- /* Else must be RANGE mode */
- Assert(frameOptions & FRAMEOPTION_RANGE);
+/*
+ * update_frametailpos
+ * make frametailpos valid for the current row
+ *
+ * Uses the winobj's read pointer for any required fetches; hence, if the
+ * frame mode is one that requires row comparisons, the winobj's mark must
+ * not be past the currently known frame tail. Also uses the specified slot
+ * for any required fetches.
+ */
+static void
+update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
+{
+ WindowAggState *winstate = winobj->winstate;
+ WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
+ int frameOptions = winstate->frameOptions;
- /* If no ORDER BY, all rows are peers with each other */
- if (node->ordNumCols == 0)
+ if (winstate->frametail_valid)
+ return; /* already known for current row */
+
+ if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
{
+ /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
spool_tuples(winstate, -1);
winstate->frametailpos = winstate->spooled_rows - 1;
winstate->frametail_valid = true;
- return;
}
+ else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
+ {
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* In ROWS mode, exactly the rows up to current are in frame */
+ winstate->frametailpos = winstate->currentpos;
+ winstate->frametail_valid = true;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ int64 ftnext;
- /*
- * Else we have to search for the first non-peer of the current row. We
- * assume the current value of frametailpos is a lower bound on the
- * possible frame tail location, ie, frame tail never goes backward, and
- * that currentpos is also a lower bound, ie, current row is always in
- * frame.
- */
- ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
- for (;;)
+ /* If no ORDER BY, all rows are peers with each other */
+ if (node->ordNumCols == 0)
+ {
+ spool_tuples(winstate, -1);
+ winstate->frametailpos = winstate->spooled_rows - 1;
+ winstate->frametail_valid = true;
+ return;
+ }
+
+ /*
+ * Else we have to search for the first non-peer of the current
+ * row. We assume the current value of frametailpos is a lower
+ * bound on the possible frame tail location, ie, frame tail never
+ * goes backward, and that currentpos is also a lower bound, ie,
+ * frame end always >= current row.
+ */
+ ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
+ for (;;)
+ {
+ if (!window_gettupleslot(winobj, ftnext, slot))
+ break; /* end of partition */
+ if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
+ break; /* not peer of current row */
+ ftnext++;
+ }
+ winstate->frametailpos = ftnext - 1;
+ winstate->frametail_valid = true;
+ }
+ else
+ Assert(false);
+ }
+ else if (frameOptions & FRAMEOPTION_END_VALUE)
{
- if (!window_gettupleslot(winobj, ftnext, slot))
- break; /* end of partition */
- if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
- break; /* not peer of current row */
- ftnext++;
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* In ROWS mode, bound is physically n before/after current */
+ int64 offset = DatumGetInt64(winstate->endOffsetValue);
+
+ if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
+ offset = -offset;
+
+ winstate->frametailpos = winstate->currentpos + offset;
+ /* smallest allowable value of frametailpos is -1 */
+ if (winstate->frametailpos < 0)
+ winstate->frametailpos = -1;
+ else if (winstate->frametailpos > winstate->currentpos)
+ {
+ /* make sure frametailpos is not past last row of partition */
+ spool_tuples(winstate, winstate->frametailpos);
+ if (winstate->frametailpos >= winstate->spooled_rows)
+ winstate->frametailpos = winstate->spooled_rows - 1;
+ }
+ winstate->frametail_valid = true;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ /* parser should have rejected this */
+ elog(ERROR, "window frame with value offset is not implemented");
+ }
+ else
+ Assert(false);
}
- winstate->frametailpos = ftnext - 1;
- winstate->frametail_valid = true;
+ else
+ Assert(false);
}
@@ -953,6 +1208,73 @@ ExecWindowAgg(WindowAggState *winstate)
winstate->ss.ps.ps_TupFromTlist = false;
}
+ /*
+ * Compute frame offset values, if any, during first call.
+ */
+ if (winstate->all_first)
+ {
+ int frameOptions = winstate->frameOptions;
+ ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
+ Datum value;
+ bool isnull;
+ int16 len;
+ bool byval;
+
+ if (frameOptions & FRAMEOPTION_START_VALUE)
+ {
+ Assert(winstate->startOffset != NULL);
+ value = ExecEvalExprSwitchContext(winstate->startOffset,
+ econtext,
+ &isnull,
+ NULL);
+ if (isnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("frame starting offset must not be NULL")));
+ /* copy value into query-lifespan context */
+ get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
+ &len, &byval);
+ winstate->startOffsetValue = datumCopy(value, byval, len);
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* value is known to be int8 */
+ int64 offset = DatumGetInt64(value);
+
+ if (offset < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("frame starting offset must not be negative")));
+ }
+ }
+ if (frameOptions & FRAMEOPTION_END_VALUE)
+ {
+ Assert(winstate->endOffset != NULL);
+ value = ExecEvalExprSwitchContext(winstate->endOffset,
+ econtext,
+ &isnull,
+ NULL);
+ if (isnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("frame ending offset must not be NULL")));
+ /* copy value into query-lifespan context */
+ get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
+ &len, &byval);
+ winstate->endOffsetValue = datumCopy(value, byval, len);
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* value is known to be int8 */
+ int64 offset = DatumGetInt64(value);
+
+ if (offset < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("frame ending offset must not be negative")));
+ }
+ }
+ winstate->all_first = false;
+ }
+
restart:
if (winstate->buffer == NULL)
{
@@ -964,7 +1286,8 @@ restart:
{
/* Advance current row within partition */
winstate->currentpos++;
- /* This might mean that the frame tail moves, too */
+ /* This might mean that the frame moves, too */
+ winstate->framehead_valid = false;
winstate->frametail_valid = false;
}
@@ -1099,10 +1422,18 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
winstate->tmpcontext = tmpcontext;
ExecAssignExprContext(estate, &winstate->ss.ps);
- /* Create long-lived context for storage of aggregate transvalues etc */
- winstate->wincontext =
+ /* Create long-lived context for storage of partition-local memory etc */
+ winstate->partcontext =
AllocSetContextCreate(CurrentMemoryContext,
- "WindowAggContext",
+ "WindowAgg_Partition",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /* Create mid-lived context for aggregate trans values etc */
+ winstate->aggcontext =
+ AllocSetContextCreate(CurrentMemoryContext,
+ "WindowAgg_Aggregates",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
@@ -1229,7 +1560,7 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
perfuncstate->numArguments = list_length(wfuncstate->args);
fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
- tmpcontext->ecxt_per_query_memory);
+ econtext->ecxt_per_query_memory);
perfuncstate->flinfo.fn_expr = (Node *) wfunc;
get_typlenbyval(wfunc->wintype,
&perfuncstate->resulttypeLen,
@@ -1264,6 +1595,30 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
winstate->numfuncs = wfuncno + 1;
winstate->numaggs = aggno + 1;
+ /* Set up WindowObject for aggregates, if needed */
+ if (winstate->numaggs > 0)
+ {
+ WindowObject agg_winobj = makeNode(WindowObjectData);
+
+ agg_winobj->winstate = winstate;
+ agg_winobj->argstates = NIL;
+ agg_winobj->localmem = NULL;
+ /* make sure markptr = -1 to invalidate. It may not get used */
+ agg_winobj->markptr = -1;
+ agg_winobj->readptr = -1;
+ winstate->agg_winobj = agg_winobj;
+ }
+
+ /* copy frame options to state node for easy access */
+ winstate->frameOptions = node->frameOptions;
+
+ /* initialize frame bound offset expressions */
+ winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
+ (PlanState *) winstate);
+ winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
+ (PlanState *) winstate);
+
+ winstate->all_first = true;
winstate->partition_spooled = false;
winstate->more_partitions = false;
@@ -1297,7 +1652,8 @@ ExecEndWindowAgg(WindowAggState *node)
node->ss.ps.ps_ExprContext = node->tmpcontext;
ExecFreeExprContext(&node->ss.ps);
- MemoryContextDelete(node->wincontext);
+ MemoryContextDelete(node->partcontext);
+ MemoryContextDelete(node->aggcontext);
outerPlan = outerPlanState(node);
ExecEndNode(outerPlan);
@@ -1315,6 +1671,7 @@ ExecReScanWindowAgg(WindowAggState *node, ExprContext *exprCtxt)
node->all_done = false;
node->ss.ps.ps_TupFromTlist = false;
+ node->all_first = true;
/* release tuplestore et al */
release_partition(node);
@@ -1566,7 +1923,7 @@ window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
* There's no API to refetch the tuple at the current position. We have to
* move one tuple forward, and then one backward. (We don't do it the
* other way because we might try to fetch the row before our mark, which
- * isn't allowed.)
+ * isn't allowed.) XXX this case could stand to be optimized.
*/
if (winobj->seekpos == pos)
{
@@ -1616,8 +1973,8 @@ WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
{
Assert(WindowObjectIsValid(winobj));
if (winobj->localmem == NULL)
- winobj->localmem = MemoryContextAllocZero(winobj->winstate->wincontext,
- sz);
+ winobj->localmem =
+ MemoryContextAllocZero(winobj->winstate->partcontext, sz);
return winobj->localmem;
}
@@ -1791,7 +2148,30 @@ WinGetFuncArgInPartition(WindowObject winobj, int argno,
if (isout)
*isout = false;
if (set_mark)
- WinSetMarkPosition(winobj, abs_pos);
+ {
+ int frameOptions = winstate->frameOptions;
+ int64 mark_pos = abs_pos;
+
+ /*
+ * In RANGE mode with a moving frame head, we must not let the
+ * mark advance past frameheadpos, since that row has to be
+ * fetchable during future update_frameheadpos calls.
+ *
+ * XXX it is very ugly to pollute window functions' marks with
+ * this consideration; it could for instance mask a logic bug
+ * that lets a window function fetch rows before what it had
+ * claimed was its mark. Perhaps use a separate mark for
+ * frame head probes?
+ */
+ if ((frameOptions & FRAMEOPTION_RANGE) &&
+ !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
+ {
+ update_frameheadpos(winobj, winstate->temp_slot_2);
+ if (mark_pos > winstate->frameheadpos)
+ mark_pos = winstate->frameheadpos;
+ }
+ WinSetMarkPosition(winobj, mark_pos);
+ }
econtext->ecxt_outertuple = slot;
return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
econtext, isnull, NULL);
@@ -1838,7 +2218,8 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno,
abs_pos = winstate->currentpos + relpos;
break;
case WINDOW_SEEK_HEAD:
- abs_pos = relpos;
+ update_frameheadpos(winobj, slot);
+ abs_pos = winstate->frameheadpos + relpos;
break;
case WINDOW_SEEK_TAIL:
update_frametailpos(winobj, slot);
@@ -1866,7 +2247,30 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno,
if (isout)
*isout = false;
if (set_mark)
- WinSetMarkPosition(winobj, abs_pos);
+ {
+ int frameOptions = winstate->frameOptions;
+ int64 mark_pos = abs_pos;
+
+ /*
+ * In RANGE mode with a moving frame head, we must not let the
+ * mark advance past frameheadpos, since that row has to be
+ * fetchable during future update_frameheadpos calls.
+ *
+ * XXX it is very ugly to pollute window functions' marks with
+ * this consideration; it could for instance mask a logic bug
+ * that lets a window function fetch rows before what it had
+ * claimed was its mark. Perhaps use a separate mark for
+ * frame head probes?
+ */
+ if ((frameOptions & FRAMEOPTION_RANGE) &&
+ !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
+ {
+ update_frameheadpos(winobj, winstate->temp_slot_2);
+ if (mark_pos > winstate->frameheadpos)
+ mark_pos = winstate->frameheadpos;
+ }
+ WinSetMarkPosition(winobj, mark_pos);
+ }
econtext->ecxt_outertuple = slot;
return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
econtext, isnull, NULL);