diff options
Diffstat (limited to 'src/backend/executor/nodeWindowAgg.c')
-rw-r--r-- | src/backend/executor/nodeWindowAgg.c | 648 |
1 files changed, 526 insertions, 122 deletions
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c index 0b90992b80c..c2c0af3cde2 100644 --- a/src/backend/executor/nodeWindowAgg.c +++ b/src/backend/executor/nodeWindowAgg.c @@ -27,7 +27,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/executor/nodeWindowAgg.c,v 1.9 2010/01/02 16:57:45 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/executor/nodeWindowAgg.c,v 1.10 2010/02/12 17:33:19 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -165,6 +165,7 @@ static void release_partition(WindowAggState *winstate); static bool row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot); +static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot); static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot); static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate, @@ -193,7 +194,7 @@ initialize_windowaggregate(WindowAggState *winstate, peraggstate->transValue = peraggstate->initValue; else { - oldContext = MemoryContextSwitchTo(winstate->wincontext); + oldContext = MemoryContextSwitchTo(winstate->aggcontext); peraggstate->transValue = datumCopy(peraggstate->initValue, peraggstate->transtypeByVal, peraggstate->transtypeLen); @@ -258,10 +259,10 @@ advance_windowaggregate(WindowAggState *winstate, * already checked that the agg's input type is binary-compatible * with its transtype, so straight copy here is OK.) * - * We must copy the datum into wincontext if it is pass-by-ref. We + * We must copy the datum into aggcontext if it is pass-by-ref. We * do not need to pfree the old transValue, since it's NULL. */ - MemoryContextSwitchTo(winstate->wincontext); + MemoryContextSwitchTo(winstate->aggcontext); peraggstate->transValue = datumCopy(fcinfo->arg[1], peraggstate->transtypeByVal, peraggstate->transtypeLen); @@ -294,7 +295,7 @@ advance_windowaggregate(WindowAggState *winstate, newVal = FunctionCallInvoke(fcinfo); /* - * If pass-by-ref datatype, must copy the new value into wincontext and + * If pass-by-ref datatype, must copy the new value into aggcontext and * pfree the prior transValue. But if transfn returned a pointer to its * first input, we don't need to do anything. */ @@ -303,7 +304,7 @@ advance_windowaggregate(WindowAggState *winstate, { if (!fcinfo->isnull) { - MemoryContextSwitchTo(winstate->wincontext); + MemoryContextSwitchTo(winstate->aggcontext); newVal = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); @@ -390,6 +391,7 @@ eval_windowaggregates(WindowAggState *winstate) int i; MemoryContext oldContext; ExprContext *econtext; + WindowObject agg_winobj; TupleTableSlot *agg_row_slot; numaggs = winstate->numaggs; @@ -398,10 +400,14 @@ eval_windowaggregates(WindowAggState *winstate) /* final output execution is in ps_ExprContext */ econtext = winstate->ss.ps.ps_ExprContext; + agg_winobj = winstate->agg_winobj; + agg_row_slot = winstate->agg_row_slot; /* * Currently, we support only a subset of the SQL-standard window framing - * rules. In all the supported cases, the window frame always consists of + * rules. + * + * If the frame start is UNBOUNDED_PRECEDING, the window frame consists of * a contiguous group of rows extending forward from the start of the * partition, and rows only enter the frame, never exit it, as the current * row advances forward. This makes it possible to use an incremental @@ -413,6 +419,10 @@ eval_windowaggregates(WindowAggState *winstate) * damage the running transition value, but we have the same assumption * in nodeAgg.c too (when it rescans an existing hash table). * + * For other frame start rules, we discard the aggregate state and re-run + * the aggregates whenever the frame head row moves. We can still + * optimize as above whenever successive rows share the same frame head. + * * In many common cases, multiple rows share the same frame and hence the * same aggregate value. (In particular, if there's no ORDER BY in a RANGE * window, then all rows are peers and so they all have window frame equal @@ -424,63 +434,90 @@ eval_windowaggregates(WindowAggState *winstate) * accumulated into the aggregate transition values. Whenever we start a * new peer group, we accumulate forward to the end of the peer group. * - * TODO: In the future, we should implement the full SQL-standard set of - * framing rules. We could implement the other cases by recalculating the - * aggregates whenever a row exits the frame. That would be pretty slow, - * though. For aggregates like SUM and COUNT we could implement a - * "negative transition function" that would be called for each row as it - * exits the frame. We'd have to think about avoiding recalculation of - * volatile arguments of aggregate functions, too. + * TODO: Rerunning aggregates from the frame start can be pretty slow. + * For some aggregates like SUM and COUNT we could avoid that by + * implementing a "negative transition function" that would be called for + * each row as it exits the frame. We'd have to think about avoiding + * recalculation of volatile arguments of aggregate functions, too. */ /* - * If we've already aggregated up through current row, reuse the saved - * result values. NOTE: this test works for the currently supported - * framing rules, but will need fixing when more are added. + * First, update the frame head position. */ - if (winstate->aggregatedupto > winstate->currentpos) + update_frameheadpos(agg_winobj, winstate->temp_slot_1); + + /* + * Initialize aggregates on first call for partition, or if the frame + * head position moved since last time. + */ + if (winstate->currentpos == 0 || + winstate->frameheadpos != winstate->aggregatedbase) { + /* + * Discard transient aggregate values + */ + MemoryContextResetAndDeleteChildren(winstate->aggcontext); + for (i = 0; i < numaggs; i++) { peraggstate = &winstate->peragg[i]; wfuncno = peraggstate->wfuncno; - econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue; - econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull; + initialize_windowaggregate(winstate, + &winstate->perfunc[wfuncno], + peraggstate); } - return; + + /* + * If we created a mark pointer for aggregates, keep it pushed up + * to frame head, so that tuplestore can discard unnecessary rows. + */ + if (agg_winobj->markptr >= 0) + WinSetMarkPosition(agg_winobj, winstate->frameheadpos); + + /* + * Initialize for loop below + */ + ExecClearTuple(agg_row_slot); + winstate->aggregatedbase = winstate->frameheadpos; + winstate->aggregatedupto = winstate->frameheadpos; } - /* Initialize aggregates on first call for partition */ - if (winstate->currentpos == 0) + /* + * In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates + * except when the frame head moves. In END_CURRENT_ROW mode, we only + * have to recalculate when the frame head moves or currentpos has advanced + * past the place we'd aggregated up to. Check for these cases and if + * so, reuse the saved result values. + */ + if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | + FRAMEOPTION_END_CURRENT_ROW)) && + winstate->aggregatedbase <= winstate->currentpos && + winstate->aggregatedupto > winstate->currentpos) { for (i = 0; i < numaggs; i++) { peraggstate = &winstate->peragg[i]; wfuncno = peraggstate->wfuncno; - initialize_windowaggregate(winstate, - &winstate->perfunc[wfuncno], - peraggstate); + econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue; + econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull; } + return; } /* * Advance until we reach a row not in frame (or end of partition). * * Note the loop invariant: agg_row_slot is either empty or holds the row - * at position aggregatedupto. The agg_ptr read pointer must always point - * to the next row to read into agg_row_slot. + * at position aggregatedupto. We advance aggregatedupto after processing + * a row. */ - agg_row_slot = winstate->agg_row_slot; for (;;) { /* Fetch next row if we didn't already */ if (TupIsNull(agg_row_slot)) { - spool_tuples(winstate, winstate->aggregatedupto); - tuplestore_select_read_pointer(winstate->buffer, - winstate->agg_ptr); - if (!tuplestore_gettupleslot(winstate->buffer, true, true, - agg_row_slot)) + if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto, + agg_row_slot)) break; /* must be end of partition */ } @@ -544,11 +581,11 @@ eval_windowaggregates(WindowAggState *winstate) pfree(DatumGetPointer(peraggstate->resultValue)); /* - * If pass-by-ref, copy it into our global context. + * If pass-by-ref, copy it into our aggregate context. */ if (!*isnull) { - oldContext = MemoryContextSwitchTo(winstate->wincontext); + oldContext = MemoryContextSwitchTo(winstate->aggcontext); peraggstate->resultValue = datumCopy(*result, peraggstate->resulttypeByVal, @@ -624,11 +661,12 @@ begin_partition(WindowAggState *winstate) int i; winstate->partition_spooled = false; + winstate->framehead_valid = false; winstate->frametail_valid = false; winstate->spooled_rows = 0; winstate->currentpos = 0; + winstate->frameheadpos = 0; winstate->frametailpos = -1; - winstate->aggregatedupto = 0; ExecClearTuple(winstate->agg_row_slot); /* @@ -654,18 +692,39 @@ begin_partition(WindowAggState *winstate) winstate->buffer = tuplestore_begin_heap(false, false, work_mem); /* - * Set up read pointers for the tuplestore. The current and agg pointers - * don't need BACKWARD capability, but the per-window-function read - * pointers do. + * Set up read pointers for the tuplestore. The current pointer doesn't + * need BACKWARD capability, but the per-window-function read pointers do, + * and the aggregate pointer does if frame start is movable. */ winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */ /* reset default REWIND capability bit for current ptr */ tuplestore_set_eflags(winstate->buffer, 0); - /* create a read pointer for aggregates, if needed */ + /* create read pointers for aggregates, if needed */ if (winstate->numaggs > 0) - winstate->agg_ptr = tuplestore_alloc_read_pointer(winstate->buffer, 0); + { + WindowObject agg_winobj = winstate->agg_winobj; + int readptr_flags = 0; + + /* If the frame head is potentially movable ... */ + if (!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)) + { + /* ... create a mark pointer to track the frame head */ + agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0); + /* and the read pointer will need BACKWARD capability */ + readptr_flags |= EXEC_FLAG_BACKWARD; + } + + agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer, + readptr_flags); + agg_winobj->markpos = -1; + agg_winobj->seekpos = -1; + + /* Also reset the row counters for aggregates */ + winstate->aggregatedbase = 0; + winstate->aggregatedupto = 0; + } /* create mark and read pointers for each real window function */ for (i = 0; i < numfuncs; i++) @@ -694,8 +753,8 @@ begin_partition(WindowAggState *winstate) } /* - * Read tuples from the outer node, up to position 'pos', and store them - * into the tuplestore. If pos is -1, reads the whole partition. + * Read tuples from the outer node, up to and including position 'pos', and + * store them into the tuplestore. If pos is -1, reads the whole partition. */ static void spool_tuples(WindowAggState *winstate, int64 pos) @@ -789,7 +848,8 @@ release_partition(WindowAggState *winstate) * any aggregate temp data). We don't rely on retail pfree because some * aggregates might have allocated data we don't have direct pointers to. */ - MemoryContextResetAndDeleteChildren(winstate->wincontext); + MemoryContextResetAndDeleteChildren(winstate->partcontext); + MemoryContextResetAndDeleteChildren(winstate->aggcontext); if (winstate->buffer) tuplestore_end(winstate->buffer); @@ -809,108 +869,303 @@ release_partition(WindowAggState *winstate) static bool row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot) { - WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; - int frameOptions = node->frameOptions; + int frameOptions = winstate->frameOptions; Assert(pos >= 0); /* else caller error */ - /* We only support frame start mode UNBOUNDED PRECEDING for now */ - Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING); + /* First, check frame starting conditions */ + if (frameOptions & FRAMEOPTION_START_CURRENT_ROW) + { + if (frameOptions & FRAMEOPTION_ROWS) + { + /* rows before current row are out of frame */ + if (pos < winstate->currentpos) + return false; + } + else if (frameOptions & FRAMEOPTION_RANGE) + { + /* preceding row that is not peer is out of frame */ + if (pos < winstate->currentpos && + !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot)) + return false; + } + else + Assert(false); + } + else if (frameOptions & FRAMEOPTION_START_VALUE) + { + if (frameOptions & FRAMEOPTION_ROWS) + { + int64 offset = DatumGetInt64(winstate->startOffsetValue); - /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */ - if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING) - return true; + /* rows before current row + offset are out of frame */ + if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING) + offset = -offset; - /* Else frame tail mode must be CURRENT ROW */ - Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW); + if (pos < winstate->currentpos + offset) + return false; + } + else if (frameOptions & FRAMEOPTION_RANGE) + { + /* parser should have rejected this */ + elog(ERROR, "window frame with value offset is not implemented"); + } + else + Assert(false); + } - /* if row is current row or a predecessor, it must be in frame */ - if (pos <= winstate->currentpos) - return true; + /* Okay so far, now check frame ending conditions */ + if (frameOptions & FRAMEOPTION_END_CURRENT_ROW) + { + if (frameOptions & FRAMEOPTION_ROWS) + { + /* rows after current row are out of frame */ + if (pos > winstate->currentpos) + return false; + } + else if (frameOptions & FRAMEOPTION_RANGE) + { + /* following row that is not peer is out of frame */ + if (pos > winstate->currentpos && + !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot)) + return false; + } + else + Assert(false); + } + else if (frameOptions & FRAMEOPTION_END_VALUE) + { + if (frameOptions & FRAMEOPTION_ROWS) + { + int64 offset = DatumGetInt64(winstate->endOffsetValue); - /* In ROWS mode, *only* such rows are in frame */ - if (frameOptions & FRAMEOPTION_ROWS) - return false; + /* rows after current row + offset are out of frame */ + if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING) + offset = -offset; - /* Else must be RANGE mode */ - Assert(frameOptions & FRAMEOPTION_RANGE); + if (pos > winstate->currentpos + offset) + return false; + } + else if (frameOptions & FRAMEOPTION_RANGE) + { + /* parser should have rejected this */ + elog(ERROR, "window frame with value offset is not implemented"); + } + else + Assert(false); + } - /* In frame iff it's a peer of current row */ - return are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot); + /* If we get here, it's in frame */ + return true; } /* - * update_frametailpos - * make frametailpos valid for the current row + * update_frameheadpos + * make frameheadpos valid for the current row * - * Uses the winobj's read pointer for any required fetches; the winobj's - * mark must not be past the currently known frame tail. Also uses the - * specified slot for any required fetches. + * Uses the winobj's read pointer for any required fetches; hence, if the + * frame mode is one that requires row comparisons, the winobj's mark must + * not be past the currently known frame head. Also uses the specified slot + * for any required fetches. */ static void -update_frametailpos(WindowObject winobj, TupleTableSlot *slot) +update_frameheadpos(WindowObject winobj, TupleTableSlot *slot) { WindowAggState *winstate = winobj->winstate; WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; - int frameOptions = node->frameOptions; - int64 ftnext; + int frameOptions = winstate->frameOptions; - if (winstate->frametail_valid) + if (winstate->framehead_valid) return; /* already known for current row */ - /* We only support frame start mode UNBOUNDED PRECEDING for now */ - Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING); - - /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */ - if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING) + if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) { - spool_tuples(winstate, -1); - winstate->frametailpos = winstate->spooled_rows - 1; - winstate->frametail_valid = true; - return; + /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */ + winstate->frameheadpos = 0; + winstate->framehead_valid = true; } + else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW) + { + if (frameOptions & FRAMEOPTION_ROWS) + { + /* In ROWS mode, frame head is the same as current */ + winstate->frameheadpos = winstate->currentpos; + winstate->framehead_valid = true; + } + else if (frameOptions & FRAMEOPTION_RANGE) + { + int64 fhprev; - /* Else frame tail mode must be CURRENT ROW */ - Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW); + /* If no ORDER BY, all rows are peers with each other */ + if (node->ordNumCols == 0) + { + winstate->frameheadpos = 0; + winstate->framehead_valid = true; + return; + } - /* In ROWS mode, exactly the rows up to current are in frame */ - if (frameOptions & FRAMEOPTION_ROWS) + /* + * In RANGE START_CURRENT mode, frame head is the first row that + * is a peer of current row. We search backwards from current, + * which could be a bit inefficient if peer sets are large. + * Might be better to have a separate read pointer that moves + * forward tracking the frame head. + */ + fhprev = winstate->currentpos - 1; + for (;;) + { + /* assume the frame head can't go backwards */ + if (fhprev < winstate->frameheadpos) + break; + if (!window_gettupleslot(winobj, fhprev, slot)) + break; /* start of partition */ + if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot)) + break; /* not peer of current row */ + fhprev--; + } + winstate->frameheadpos = fhprev + 1; + winstate->framehead_valid = true; + } + else + Assert(false); + } + else if (frameOptions & FRAMEOPTION_START_VALUE) { - winstate->frametailpos = winstate->currentpos; - winstate->frametail_valid = true; - return; + if (frameOptions & FRAMEOPTION_ROWS) + { + /* In ROWS mode, bound is physically n before/after current */ + int64 offset = DatumGetInt64(winstate->startOffsetValue); + + if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING) + offset = -offset; + + winstate->frameheadpos = winstate->currentpos + offset; + /* frame head can't go before first row */ + if (winstate->frameheadpos < 0) + winstate->frameheadpos = 0; + else if (winstate->frameheadpos > winstate->currentpos) + { + /* make sure frameheadpos is not past end of partition */ + spool_tuples(winstate, winstate->frameheadpos - 1); + if (winstate->frameheadpos > winstate->spooled_rows) + winstate->frameheadpos = winstate->spooled_rows; + } + winstate->framehead_valid = true; + } + else if (frameOptions & FRAMEOPTION_RANGE) + { + /* parser should have rejected this */ + elog(ERROR, "window frame with value offset is not implemented"); + } + else + Assert(false); } + else + Assert(false); +} - /* Else must be RANGE mode */ - Assert(frameOptions & FRAMEOPTION_RANGE); +/* + * update_frametailpos + * make frametailpos valid for the current row + * + * Uses the winobj's read pointer for any required fetches; hence, if the + * frame mode is one that requires row comparisons, the winobj's mark must + * not be past the currently known frame tail. Also uses the specified slot + * for any required fetches. + */ +static void +update_frametailpos(WindowObject winobj, TupleTableSlot *slot) +{ + WindowAggState *winstate = winobj->winstate; + WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; + int frameOptions = winstate->frameOptions; - /* If no ORDER BY, all rows are peers with each other */ - if (node->ordNumCols == 0) + if (winstate->frametail_valid) + return; /* already known for current row */ + + if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING) { + /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */ spool_tuples(winstate, -1); winstate->frametailpos = winstate->spooled_rows - 1; winstate->frametail_valid = true; - return; } + else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW) + { + if (frameOptions & FRAMEOPTION_ROWS) + { + /* In ROWS mode, exactly the rows up to current are in frame */ + winstate->frametailpos = winstate->currentpos; + winstate->frametail_valid = true; + } + else if (frameOptions & FRAMEOPTION_RANGE) + { + int64 ftnext; - /* - * Else we have to search for the first non-peer of the current row. We - * assume the current value of frametailpos is a lower bound on the - * possible frame tail location, ie, frame tail never goes backward, and - * that currentpos is also a lower bound, ie, current row is always in - * frame. - */ - ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1; - for (;;) + /* If no ORDER BY, all rows are peers with each other */ + if (node->ordNumCols == 0) + { + spool_tuples(winstate, -1); + winstate->frametailpos = winstate->spooled_rows - 1; + winstate->frametail_valid = true; + return; + } + + /* + * Else we have to search for the first non-peer of the current + * row. We assume the current value of frametailpos is a lower + * bound on the possible frame tail location, ie, frame tail never + * goes backward, and that currentpos is also a lower bound, ie, + * frame end always >= current row. + */ + ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1; + for (;;) + { + if (!window_gettupleslot(winobj, ftnext, slot)) + break; /* end of partition */ + if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot)) + break; /* not peer of current row */ + ftnext++; + } + winstate->frametailpos = ftnext - 1; + winstate->frametail_valid = true; + } + else + Assert(false); + } + else if (frameOptions & FRAMEOPTION_END_VALUE) { - if (!window_gettupleslot(winobj, ftnext, slot)) - break; /* end of partition */ - if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot)) - break; /* not peer of current row */ - ftnext++; + if (frameOptions & FRAMEOPTION_ROWS) + { + /* In ROWS mode, bound is physically n before/after current */ + int64 offset = DatumGetInt64(winstate->endOffsetValue); + + if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING) + offset = -offset; + + winstate->frametailpos = winstate->currentpos + offset; + /* smallest allowable value of frametailpos is -1 */ + if (winstate->frametailpos < 0) + winstate->frametailpos = -1; + else if (winstate->frametailpos > winstate->currentpos) + { + /* make sure frametailpos is not past last row of partition */ + spool_tuples(winstate, winstate->frametailpos); + if (winstate->frametailpos >= winstate->spooled_rows) + winstate->frametailpos = winstate->spooled_rows - 1; + } + winstate->frametail_valid = true; + } + else if (frameOptions & FRAMEOPTION_RANGE) + { + /* parser should have rejected this */ + elog(ERROR, "window frame with value offset is not implemented"); + } + else + Assert(false); } - winstate->frametailpos = ftnext - 1; - winstate->frametail_valid = true; + else + Assert(false); } @@ -953,6 +1208,73 @@ ExecWindowAgg(WindowAggState *winstate) winstate->ss.ps.ps_TupFromTlist = false; } + /* + * Compute frame offset values, if any, during first call. + */ + if (winstate->all_first) + { + int frameOptions = winstate->frameOptions; + ExprContext *econtext = winstate->ss.ps.ps_ExprContext; + Datum value; + bool isnull; + int16 len; + bool byval; + + if (frameOptions & FRAMEOPTION_START_VALUE) + { + Assert(winstate->startOffset != NULL); + value = ExecEvalExprSwitchContext(winstate->startOffset, + econtext, + &isnull, + NULL); + if (isnull) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("frame starting offset must not be NULL"))); + /* copy value into query-lifespan context */ + get_typlenbyval(exprType((Node *) winstate->startOffset->expr), + &len, &byval); + winstate->startOffsetValue = datumCopy(value, byval, len); + if (frameOptions & FRAMEOPTION_ROWS) + { + /* value is known to be int8 */ + int64 offset = DatumGetInt64(value); + + if (offset < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("frame starting offset must not be negative"))); + } + } + if (frameOptions & FRAMEOPTION_END_VALUE) + { + Assert(winstate->endOffset != NULL); + value = ExecEvalExprSwitchContext(winstate->endOffset, + econtext, + &isnull, + NULL); + if (isnull) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("frame ending offset must not be NULL"))); + /* copy value into query-lifespan context */ + get_typlenbyval(exprType((Node *) winstate->endOffset->expr), + &len, &byval); + winstate->endOffsetValue = datumCopy(value, byval, len); + if (frameOptions & FRAMEOPTION_ROWS) + { + /* value is known to be int8 */ + int64 offset = DatumGetInt64(value); + + if (offset < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("frame ending offset must not be negative"))); + } + } + winstate->all_first = false; + } + restart: if (winstate->buffer == NULL) { @@ -964,7 +1286,8 @@ restart: { /* Advance current row within partition */ winstate->currentpos++; - /* This might mean that the frame tail moves, too */ + /* This might mean that the frame moves, too */ + winstate->framehead_valid = false; winstate->frametail_valid = false; } @@ -1099,10 +1422,18 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) winstate->tmpcontext = tmpcontext; ExecAssignExprContext(estate, &winstate->ss.ps); - /* Create long-lived context for storage of aggregate transvalues etc */ - winstate->wincontext = + /* Create long-lived context for storage of partition-local memory etc */ + winstate->partcontext = AllocSetContextCreate(CurrentMemoryContext, - "WindowAggContext", + "WindowAgg_Partition", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* Create mid-lived context for aggregate trans values etc */ + winstate->aggcontext = + AllocSetContextCreate(CurrentMemoryContext, + "WindowAgg_Aggregates", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); @@ -1229,7 +1560,7 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) perfuncstate->numArguments = list_length(wfuncstate->args); fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo, - tmpcontext->ecxt_per_query_memory); + econtext->ecxt_per_query_memory); perfuncstate->flinfo.fn_expr = (Node *) wfunc; get_typlenbyval(wfunc->wintype, &perfuncstate->resulttypeLen, @@ -1264,6 +1595,30 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) winstate->numfuncs = wfuncno + 1; winstate->numaggs = aggno + 1; + /* Set up WindowObject for aggregates, if needed */ + if (winstate->numaggs > 0) + { + WindowObject agg_winobj = makeNode(WindowObjectData); + + agg_winobj->winstate = winstate; + agg_winobj->argstates = NIL; + agg_winobj->localmem = NULL; + /* make sure markptr = -1 to invalidate. It may not get used */ + agg_winobj->markptr = -1; + agg_winobj->readptr = -1; + winstate->agg_winobj = agg_winobj; + } + + /* copy frame options to state node for easy access */ + winstate->frameOptions = node->frameOptions; + + /* initialize frame bound offset expressions */ + winstate->startOffset = ExecInitExpr((Expr *) node->startOffset, + (PlanState *) winstate); + winstate->endOffset = ExecInitExpr((Expr *) node->endOffset, + (PlanState *) winstate); + + winstate->all_first = true; winstate->partition_spooled = false; winstate->more_partitions = false; @@ -1297,7 +1652,8 @@ ExecEndWindowAgg(WindowAggState *node) node->ss.ps.ps_ExprContext = node->tmpcontext; ExecFreeExprContext(&node->ss.ps); - MemoryContextDelete(node->wincontext); + MemoryContextDelete(node->partcontext); + MemoryContextDelete(node->aggcontext); outerPlan = outerPlanState(node); ExecEndNode(outerPlan); @@ -1315,6 +1671,7 @@ ExecReScanWindowAgg(WindowAggState *node, ExprContext *exprCtxt) node->all_done = false; node->ss.ps.ps_TupFromTlist = false; + node->all_first = true; /* release tuplestore et al */ release_partition(node); @@ -1566,7 +1923,7 @@ window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot) * There's no API to refetch the tuple at the current position. We have to * move one tuple forward, and then one backward. (We don't do it the * other way because we might try to fetch the row before our mark, which - * isn't allowed.) + * isn't allowed.) XXX this case could stand to be optimized. */ if (winobj->seekpos == pos) { @@ -1616,8 +1973,8 @@ WinGetPartitionLocalMemory(WindowObject winobj, Size sz) { Assert(WindowObjectIsValid(winobj)); if (winobj->localmem == NULL) - winobj->localmem = MemoryContextAllocZero(winobj->winstate->wincontext, - sz); + winobj->localmem = + MemoryContextAllocZero(winobj->winstate->partcontext, sz); return winobj->localmem; } @@ -1791,7 +2148,30 @@ WinGetFuncArgInPartition(WindowObject winobj, int argno, if (isout) *isout = false; if (set_mark) - WinSetMarkPosition(winobj, abs_pos); + { + int frameOptions = winstate->frameOptions; + int64 mark_pos = abs_pos; + + /* + * In RANGE mode with a moving frame head, we must not let the + * mark advance past frameheadpos, since that row has to be + * fetchable during future update_frameheadpos calls. + * + * XXX it is very ugly to pollute window functions' marks with + * this consideration; it could for instance mask a logic bug + * that lets a window function fetch rows before what it had + * claimed was its mark. Perhaps use a separate mark for + * frame head probes? + */ + if ((frameOptions & FRAMEOPTION_RANGE) && + !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)) + { + update_frameheadpos(winobj, winstate->temp_slot_2); + if (mark_pos > winstate->frameheadpos) + mark_pos = winstate->frameheadpos; + } + WinSetMarkPosition(winobj, mark_pos); + } econtext->ecxt_outertuple = slot; return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), econtext, isnull, NULL); @@ -1838,7 +2218,8 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno, abs_pos = winstate->currentpos + relpos; break; case WINDOW_SEEK_HEAD: - abs_pos = relpos; + update_frameheadpos(winobj, slot); + abs_pos = winstate->frameheadpos + relpos; break; case WINDOW_SEEK_TAIL: update_frametailpos(winobj, slot); @@ -1866,7 +2247,30 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno, if (isout) *isout = false; if (set_mark) - WinSetMarkPosition(winobj, abs_pos); + { + int frameOptions = winstate->frameOptions; + int64 mark_pos = abs_pos; + + /* + * In RANGE mode with a moving frame head, we must not let the + * mark advance past frameheadpos, since that row has to be + * fetchable during future update_frameheadpos calls. + * + * XXX it is very ugly to pollute window functions' marks with + * this consideration; it could for instance mask a logic bug + * that lets a window function fetch rows before what it had + * claimed was its mark. Perhaps use a separate mark for + * frame head probes? + */ + if ((frameOptions & FRAMEOPTION_RANGE) && + !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)) + { + update_frameheadpos(winobj, winstate->temp_slot_2); + if (mark_pos > winstate->frameheadpos) + mark_pos = winstate->frameheadpos; + } + WinSetMarkPosition(winobj, mark_pos); + } econtext->ecxt_outertuple = slot; return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), econtext, isnull, NULL); |