diff options
author | Andres Freund <andres@anarazel.de> | 2015-05-16 03:40:59 +0200 |
---|---|---|
committer | Andres Freund <andres@anarazel.de> | 2015-05-16 03:46:31 +0200 |
commit | f3d3118532175541a9a96ed78881a3b04a057128 (patch) | |
tree | d06e7177843c563491f3a132d29fec0f60f69bd3 /src/backend/executor | |
parent | 6e4415c6aa428132dd41c8bf23a0885fca0f2271 (diff) | |
download | postgresql-f3d3118532175541a9a96ed78881a3b04a057128.tar.gz postgresql-f3d3118532175541a9a96ed78881a3b04a057128.zip |
Support GROUPING SETS, CUBE and ROLLUP.
This SQL standard functionality allows to aggregate data by different
GROUP BY clauses at once. Each grouping set returns rows with columns
grouped by in other sets set to NULL.
This could previously be achieved by doing each grouping as a separate
query, conjoined by UNION ALLs. Besides being considerably more concise,
grouping sets will in many cases be faster, requiring only one scan over
the underlying data.
The current implementation of grouping sets only supports using sorting
for input. Individual sets that share a sort order are computed in one
pass. If there are sets that don't share a sort order, additional sort &
aggregation steps are performed. These additional passes are sourced by
the previous sort step; thus avoiding repeated scans of the source data.
The code is structured in a way that adding support for purely using
hash aggregation or a mix of hashing and sorting is possible. Sorting
was chosen to be supported first, as it is the most generic method of
implementation.
Instead of, as in an earlier versions of the patch, representing the
chain of sort and aggregation steps as full blown planner and executor
nodes, all but the first sort are performed inside the aggregation node
itself. This avoids the need to do some unusual gymnastics to handle
having to return aggregated and non-aggregated tuples from underlying
nodes, as well as having to shut down underlying nodes early to limit
memory usage. The optimizer still builds Sort/Agg node to describe each
phase, but they're not part of the plan tree, but instead additional
data for the aggregation node. They're a convenient and preexisting way
to describe aggregation and sorting. The first (and possibly only) sort
step is still performed as a separate execution step. That retains
similarity with existing group by plans, makes rescans fairly simple,
avoids very deep plans (leading to slow explains) and easily allows to
avoid the sorting step if the underlying data is sorted by other means.
A somewhat ugly side of this patch is having to deal with a grammar
ambiguity between the new CUBE keyword and the cube extension/functions
named cube (and rollup). To avoid breaking existing deployments of the
cube extension it has not been renamed, neither has cube been made a
reserved keyword. Instead precedence hacking is used to make GROUP BY
cube(..) refer to the CUBE grouping sets feature, and not the function
cube(). To actually group by a function cube(), unlikely as that might
be, the function name has to be quoted.
Needs a catversion bump because stored rules may change.
Author: Andrew Gierth and Atri Sharma, with contributions from Andres Freund
Reviewed-By: Andres Freund, Noah Misch, Tom Lane, Svenne Krap, Tomas
Vondra, Erik Rijkers, Marti Raudsepp, Pavel Stehule
Discussion: CAOeZVidmVRe2jU6aMk_5qkxnB7dfmPROzM7Ur8JPW5j8Y5X-Lw@mail.gmail.com
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/execQual.c | 63 | ||||
-rw-r--r-- | src/backend/executor/execUtils.c | 5 | ||||
-rw-r--r-- | src/backend/executor/nodeAgg.c | 1315 |
3 files changed, 1033 insertions, 350 deletions
diff --git a/src/backend/executor/execQual.c b/src/backend/executor/execQual.c index e5994112a42..d414e20f120 100644 --- a/src/backend/executor/execQual.c +++ b/src/backend/executor/execQual.c @@ -181,6 +181,9 @@ static Datum ExecEvalArrayCoerceExpr(ArrayCoerceExprState *astate, bool *isNull, ExprDoneCond *isDone); static Datum ExecEvalCurrentOfExpr(ExprState *exprstate, ExprContext *econtext, bool *isNull, ExprDoneCond *isDone); +static Datum ExecEvalGroupingFuncExpr(GroupingFuncExprState *gstate, + ExprContext *econtext, + bool *isNull, ExprDoneCond *isDone); /* ---------------------------------------------------------------- @@ -3016,6 +3019,44 @@ ExecEvalCaseTestExpr(ExprState *exprstate, return econtext->caseValue_datum; } +/* + * ExecEvalGroupingFuncExpr + * + * Return a bitmask with a bit for each (unevaluated) argument expression + * (rightmost arg is least significant bit). + * + * A bit is set if the corresponding expression is NOT part of the set of + * grouping expressions in the current grouping set. + */ +static Datum +ExecEvalGroupingFuncExpr(GroupingFuncExprState *gstate, + ExprContext *econtext, + bool *isNull, + ExprDoneCond *isDone) +{ + int result = 0; + int attnum = 0; + Bitmapset *grouped_cols = gstate->aggstate->grouped_cols; + ListCell *lc; + + if (isDone) + *isDone = ExprSingleResult; + + *isNull = false; + + foreach(lc, (gstate->clauses)) + { + attnum = lfirst_int(lc); + + result = result << 1; + + if (!bms_is_member(attnum, grouped_cols)) + result = result | 1; + } + + return (Datum) result; +} + /* ---------------------------------------------------------------- * ExecEvalArray - ARRAY[] expressions * ---------------------------------------------------------------- @@ -4482,6 +4523,28 @@ ExecInitExpr(Expr *node, PlanState *parent) state = (ExprState *) astate; } break; + case T_GroupingFunc: + { + GroupingFunc *grp_node = (GroupingFunc *) node; + GroupingFuncExprState *grp_state = makeNode(GroupingFuncExprState); + Agg *agg = NULL; + + if (!parent || !IsA(parent, AggState) || !IsA(parent->plan, Agg)) + elog(ERROR, "parent of GROUPING is not Agg node"); + + grp_state->aggstate = (AggState *) parent; + + agg = (Agg *) (parent->plan); + + if (agg->groupingSets) + grp_state->clauses = grp_node->cols; + else + grp_state->clauses = NIL; + + state = (ExprState *) grp_state; + state->evalfunc = (ExprStateEvalFunc) ExecEvalGroupingFuncExpr; + } + break; case T_WindowFunc: { WindowFunc *wfunc = (WindowFunc *) node; diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index 0da8e53e816..3963408b18c 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -642,9 +642,10 @@ get_last_attnums(Node *node, ProjectionInfo *projInfo) /* * Don't examine the arguments or filters of Aggrefs or WindowFuncs, * because those do not represent expressions to be evaluated within the - * overall targetlist's econtext. + * overall targetlist's econtext. GroupingFunc arguments are never + * evaluated at all. */ - if (IsA(node, Aggref)) + if (IsA(node, Aggref) || IsA(node, GroupingFunc)) return false; if (IsA(node, WindowFunc)) return false; diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index fcb61177c51..01a1e67f09e 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -45,15 +45,19 @@ * needed to allow resolution of a polymorphic aggregate's result type. * * We compute aggregate input expressions and run the transition functions - * in a temporary econtext (aggstate->tmpcontext). This is reset at - * least once per input tuple, so when the transvalue datatype is + * in a temporary econtext (aggstate->tmpcontext). This is reset at least + * once per input tuple, so when the transvalue datatype is * pass-by-reference, we have to be careful to copy it into a longer-lived - * memory context, and free the prior value to avoid memory leakage. - * We store transvalues in the memory context aggstate->aggcontext, - * which is also used for the hashtable structures in AGG_HASHED mode. - * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) - * is used to run finalize functions and compute the output tuple; - * this context can be reset once per output tuple. + * memory context, and free the prior value to avoid memory leakage. We + * store transvalues in another set of econtexts, aggstate->aggcontexts + * (one per grouping set, see below), which are also used for the hashtable + * structures in AGG_HASHED mode. These econtexts are rescanned, not just + * reset, at group boundaries so that aggregate transition functions can + * register shutdown callbacks via AggRegisterCallback. + * + * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to + * run finalize functions and compute the output tuple; this context can be + * reset once per output tuple. * * The executor's AggState node is passed as the fmgr "context" value in * all transfunc and finalfunc calls. It is not recommended that the @@ -84,6 +88,36 @@ * need some fallback logic to use this, since there's no Aggref node * for a window function.) * + * Grouping sets: + * + * A list of grouping sets which is structurally equivalent to a ROLLUP + * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over + * ordered data. We do this by keeping a separate set of transition values + * for each grouping set being concurrently processed; for each input tuple + * we update them all, and on group boundaries we reset those states + * (starting at the front of the list) whose grouping values have changed + * (the list of grouping sets is ordered from most specific to least + * specific). + * + * Where more complex grouping sets are used, we break them down into + * "phases", where each phase has a different sort order. During each + * phase but the last, the input tuples are additionally stored in a + * tuplesort which is keyed to the next phase's sort order; during each + * phase but the first, the input tuples are drawn from the previously + * sorted data. (The sorting of the data for the first phase is handled by + * the planner, as it might be satisfied by underlying nodes.) + * + * From the perspective of aggregate transition and final functions, the + * only issue regarding grouping sets is this: a single call site (flinfo) + * of an aggregate function may be used for updating several different + * transition values in turn. So the function must not cache in the flinfo + * anything which logically belongs as part of the transition value (most + * importantly, the memory context in which the transition value exists). + * The support API functions (AggCheckCallContext, AggRegisterCallback) are + * sensitive to the grouping set for which the aggregate function is + * currently being called. + * + * TODO: AGG_HASHED doesn't support multiple grouping sets yet. * * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -241,9 +275,11 @@ typedef struct AggStatePerAggData * then at completion of the input tuple group, we scan the sorted values, * eliminate duplicates if needed, and run the transition function on the * rest. + * + * We need a separate tuplesort for each grouping set. */ - Tuplesortstate *sortstate; /* sort object, if DISTINCT or ORDER BY */ + Tuplesortstate **sortstates; /* sort objects, if DISTINCT or ORDER BY */ /* * This field is a pre-initialized FunctionCallInfo struct used for @@ -287,6 +323,27 @@ typedef struct AggStatePerGroupData } AggStatePerGroupData; /* + * AggStatePerPhaseData - per-grouping-set-phase state + * + * Grouping sets are divided into "phases", where a single phase can be + * processed in one pass over the input. If there is more than one phase, then + * at the end of input from the current phase, state is reset and another pass + * taken over the data which has been re-sorted in the mean time. + * + * Accordingly, each phase specifies a list of grouping sets and group clause + * information, plus each phase after the first also has a sort order. + */ +typedef struct AggStatePerPhaseData +{ + int numsets; /* number of grouping sets (or 0) */ + int *gset_lengths; /* lengths of grouping sets */ + Bitmapset **grouped_cols; /* column groupings for rollup */ + FmgrInfo *eqfunctions; /* per-grouping-field equality fns */ + Agg *aggnode; /* Agg node for phase data */ + Sort *sortnode; /* Sort node for input ordering for phase */ +} AggStatePerPhaseData; + +/* * To implement hashed aggregation, we need a hashtable that stores a * representative tuple and an array of AggStatePerGroup structs for each * distinct set of GROUP BY column values. We compute the hash key from @@ -302,9 +359,12 @@ typedef struct AggHashEntryData } AggHashEntryData; +static void initialize_phase(AggState *aggstate, int newphase); +static TupleTableSlot *fetch_input_tuple(AggState *aggstate); static void initialize_aggregates(AggState *aggstate, AggStatePerAgg peragg, - AggStatePerGroup pergroup); + AggStatePerGroup pergroup, + int numReset); static void advance_transition_function(AggState *aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate); @@ -319,6 +379,14 @@ static void finalize_aggregate(AggState *aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull); +static void prepare_projection_slot(AggState *aggstate, + TupleTableSlot *slot, + int currentSet); +static void finalize_aggregates(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroup, + int currentSet); +static TupleTableSlot *project_aggregates(AggState *aggstate); static Bitmapset *find_unaggregated_cols(AggState *aggstate); static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos); static void build_hash_table(AggState *aggstate); @@ -331,46 +399,135 @@ static Datum GetAggInitVal(Datum textInitVal, Oid transtype); /* - * Initialize all aggregates for a new group of input values. - * - * When called, CurrentMemoryContext should be the per-query context. + * Switch to phase "newphase", which must either be 0 (to reset) or + * current_phase + 1. Juggle the tuplesorts accordingly. */ static void -initialize_aggregates(AggState *aggstate, - AggStatePerAgg peragg, - AggStatePerGroup pergroup) +initialize_phase(AggState *aggstate, int newphase) { - int aggno; + Assert(newphase == 0 || newphase == aggstate->current_phase + 1); - for (aggno = 0; aggno < aggstate->numaggs; aggno++) + /* + * Whatever the previous state, we're now done with whatever input + * tuplesort was in use. + */ + if (aggstate->sort_in) { - AggStatePerAgg peraggstate = &peragg[aggno]; - AggStatePerGroup pergroupstate = &pergroup[aggno]; + tuplesort_end(aggstate->sort_in); + aggstate->sort_in = NULL; + } + if (newphase == 0) + { /* - * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate. + * Discard any existing output tuplesort. */ - if (peraggstate->numSortCols > 0) + if (aggstate->sort_out) { - /* - * In case of rescan, maybe there could be an uncompleted sort - * operation? Clean it up if so. - */ - if (peraggstate->sortstate) - tuplesort_end(peraggstate->sortstate); + tuplesort_end(aggstate->sort_out); + aggstate->sort_out = NULL; + } + } + else + { + /* + * The old output tuplesort becomes the new input one, and this is the + * right time to actually sort it. + */ + aggstate->sort_in = aggstate->sort_out; + aggstate->sort_out = NULL; + Assert(aggstate->sort_in); + tuplesort_performsort(aggstate->sort_in); + } - /* - * We use a plain Datum sorter when there's a single input column; - * otherwise sort the full tuple. (See comments for - * process_ordered_aggregate_single.) - */ - peraggstate->sortstate = - (peraggstate->numInputs == 1) ? + /* + * If this isn't the last phase, we need to sort appropriately for the next + * phase in sequence. + */ + if (newphase < aggstate->numphases - 1) + { + Sort *sortnode = aggstate->phases[newphase+1].sortnode; + PlanState *outerNode = outerPlanState(aggstate); + TupleDesc tupDesc = ExecGetResultType(outerNode); + + aggstate->sort_out = tuplesort_begin_heap(tupDesc, + sortnode->numCols, + sortnode->sortColIdx, + sortnode->sortOperators, + sortnode->collations, + sortnode->nullsFirst, + work_mem, + false); + } + + aggstate->current_phase = newphase; + aggstate->phase = &aggstate->phases[newphase]; +} + +/* + * Fetch a tuple from either the outer plan (for phase 0) or from the sorter + * populated by the previous phase. Copy it to the sorter for the next phase + * if any. + */ +static TupleTableSlot * +fetch_input_tuple(AggState *aggstate) +{ + TupleTableSlot *slot; + + if (aggstate->sort_in) + { + if (!tuplesort_gettupleslot(aggstate->sort_in, true, aggstate->sort_slot)) + return NULL; + slot = aggstate->sort_slot; + } + else + slot = ExecProcNode(outerPlanState(aggstate)); + + if (!TupIsNull(slot) && aggstate->sort_out) + tuplesort_puttupleslot(aggstate->sort_out, slot); + + return slot; +} + +/* + * (Re)Initialize an individual aggregate. + * + * This function handles only one grouping set (already set in + * aggstate->current_set). + * + * When called, CurrentMemoryContext should be the per-query context. + */ +static void +initialize_aggregate(AggState *aggstate, AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate) +{ + /* + * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate. + */ + if (peraggstate->numSortCols > 0) + { + /* + * In case of rescan, maybe there could be an uncompleted sort + * operation? Clean it up if so. + */ + if (peraggstate->sortstates[aggstate->current_set]) + tuplesort_end(peraggstate->sortstates[aggstate->current_set]); + + + /* + * We use a plain Datum sorter when there's a single input column; + * otherwise sort the full tuple. (See comments for + * process_ordered_aggregate_single.) + */ + if (peraggstate->numInputs == 1) + peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_datum(peraggstate->evaldesc->attrs[0]->atttypid, peraggstate->sortOperators[0], peraggstate->sortCollations[0], peraggstate->sortNullsFirst[0], - work_mem, false) : + work_mem, false); + else + peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_heap(peraggstate->evaldesc, peraggstate->numSortCols, peraggstate->sortColIdx, @@ -378,41 +535,83 @@ initialize_aggregates(AggState *aggstate, peraggstate->sortCollations, peraggstate->sortNullsFirst, work_mem, false); - } + } - /* - * (Re)set transValue to the initial value. - * - * Note that when the initial value is pass-by-ref, we must copy it - * (into the aggcontext) since we will pfree the transValue later. - */ - if (peraggstate->initValueIsNull) - pergroupstate->transValue = peraggstate->initValue; - else + /* + * (Re)set transValue to the initial value. + * + * Note that when the initial value is pass-by-ref, we must copy + * it (into the aggcontext) since we will pfree the transValue + * later. + */ + if (peraggstate->initValueIsNull) + pergroupstate->transValue = peraggstate->initValue; + else + { + MemoryContext oldContext; + + oldContext = MemoryContextSwitchTo( + aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory); + pergroupstate->transValue = datumCopy(peraggstate->initValue, + peraggstate->transtypeByVal, + peraggstate->transtypeLen); + MemoryContextSwitchTo(oldContext); + } + pergroupstate->transValueIsNull = peraggstate->initValueIsNull; + + /* + * If the initial value for the transition state doesn't exist in + * the pg_aggregate table then we will let the first non-NULL + * value returned from the outer procNode become the initial + * value. (This is useful for aggregates like max() and min().) + * The noTransValue flag signals that we still need to do this. + */ + pergroupstate->noTransValue = peraggstate->initValueIsNull; +} + +/* + * Initialize all aggregates for a new group of input values. + * + * If there are multiple grouping sets, we initialize only the first numReset + * of them (the grouping sets are ordered so that the most specific one, which + * is reset most often, is first). As a convenience, if numReset is < 1, we + * reinitialize all sets. + * + * When called, CurrentMemoryContext should be the per-query context. + */ +static void +initialize_aggregates(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroup, + int numReset) +{ + int aggno; + int numGroupingSets = Max(aggstate->phase->numsets, 1); + int setno = 0; + + if (numReset < 1) + numReset = numGroupingSets; + + for (aggno = 0; aggno < aggstate->numaggs; aggno++) + { + AggStatePerAgg peraggstate = &peragg[aggno]; + + for (setno = 0; setno < numReset; setno++) { - MemoryContext oldContext; + AggStatePerGroup pergroupstate; - oldContext = MemoryContextSwitchTo(aggstate->aggcontext); - pergroupstate->transValue = datumCopy(peraggstate->initValue, - peraggstate->transtypeByVal, - peraggstate->transtypeLen); - MemoryContextSwitchTo(oldContext); - } - pergroupstate->transValueIsNull = peraggstate->initValueIsNull; + pergroupstate = &pergroup[aggno + (setno * (aggstate->numaggs))]; - /* - * If the initial value for the transition state doesn't exist in the - * pg_aggregate table then we will let the first non-NULL value - * returned from the outer procNode become the initial value. (This is - * useful for aggregates like max() and min().) The noTransValue flag - * signals that we still need to do this. - */ - pergroupstate->noTransValue = peraggstate->initValueIsNull; + aggstate->current_set = setno; + + initialize_aggregate(aggstate, peraggstate, pergroupstate); + } } } /* - * Given new input value(s), advance the transition function of an aggregate. + * Given new input value(s), advance the transition function of one aggregate + * within one grouping set only (already set in aggstate->current_set) * * The new values (and null flags) have been preloaded into argument positions * 1 and up in peraggstate->transfn_fcinfo, so that we needn't copy them again @@ -455,7 +654,8 @@ advance_transition_function(AggState *aggstate, * We must copy the datum into aggcontext if it is pass-by-ref. We * do not need to pfree the old transValue, since it's NULL. */ - oldContext = MemoryContextSwitchTo(aggstate->aggcontext); + oldContext = MemoryContextSwitchTo( + aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory); pergroupstate->transValue = datumCopy(fcinfo->arg[1], peraggstate->transtypeByVal, peraggstate->transtypeLen); @@ -503,7 +703,7 @@ advance_transition_function(AggState *aggstate, { if (!fcinfo->isnull) { - MemoryContextSwitchTo(aggstate->aggcontext); + MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory); newVal = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); @@ -530,11 +730,13 @@ static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) { int aggno; + int setno = 0; + int numGroupingSets = Max(aggstate->phase->numsets, 1); + int numAggs = aggstate->numaggs; - for (aggno = 0; aggno < aggstate->numaggs; aggno++) + for (aggno = 0; aggno < numAggs; aggno++) { AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; - AggStatePerGroup pergroupstate = &pergroup[aggno]; ExprState *filter = peraggstate->aggrefstate->aggfilter; int numTransInputs = peraggstate->numTransInputs; int i; @@ -578,13 +780,16 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) continue; } - /* OK, put the tuple into the tuplesort object */ - if (peraggstate->numInputs == 1) - tuplesort_putdatum(peraggstate->sortstate, - slot->tts_values[0], - slot->tts_isnull[0]); - else - tuplesort_puttupleslot(peraggstate->sortstate, slot); + for (setno = 0; setno < numGroupingSets; setno++) + { + /* OK, put the tuple into the tuplesort object */ + if (peraggstate->numInputs == 1) + tuplesort_putdatum(peraggstate->sortstates[setno], + slot->tts_values[0], + slot->tts_isnull[0]); + else + tuplesort_puttupleslot(peraggstate->sortstates[setno], slot); + } } else { @@ -600,7 +805,14 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) fcinfo->argnull[i + 1] = slot->tts_isnull[i]; } - advance_transition_function(aggstate, peraggstate, pergroupstate); + for (setno = 0; setno < numGroupingSets; setno++) + { + AggStatePerGroup pergroupstate = &pergroup[aggno + (setno * numAggs)]; + + aggstate->current_set = setno; + + advance_transition_function(aggstate, peraggstate, pergroupstate); + } } } } @@ -623,6 +835,9 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) * is around 300% faster. (The speedup for by-reference types is less * but still noticeable.) * + * This function handles only one grouping set (already set in + * aggstate->current_set). + * * When called, CurrentMemoryContext should be the per-query context. */ static void @@ -642,7 +857,7 @@ process_ordered_aggregate_single(AggState *aggstate, Assert(peraggstate->numDistinctCols < 2); - tuplesort_performsort(peraggstate->sortstate); + tuplesort_performsort(peraggstate->sortstates[aggstate->current_set]); /* Load the column into argument 1 (arg 0 will be transition value) */ newVal = fcinfo->arg + 1; @@ -654,8 +869,8 @@ process_ordered_aggregate_single(AggState *aggstate, * pfree them when they are no longer needed. */ - while (tuplesort_getdatum(peraggstate->sortstate, true, - newVal, isNull)) + while (tuplesort_getdatum(peraggstate->sortstates[aggstate->current_set], + true, newVal, isNull)) { /* * Clear and select the working context for evaluation of the equality @@ -698,8 +913,8 @@ process_ordered_aggregate_single(AggState *aggstate, if (!oldIsNull && !peraggstate->inputtypeByVal) pfree(DatumGetPointer(oldVal)); - tuplesort_end(peraggstate->sortstate); - peraggstate->sortstate = NULL; + tuplesort_end(peraggstate->sortstates[aggstate->current_set]); + peraggstate->sortstates[aggstate->current_set] = NULL; } /* @@ -709,6 +924,9 @@ process_ordered_aggregate_single(AggState *aggstate, * sort, read out the values in sorted order, and run the transition * function on each value (applying DISTINCT if appropriate). * + * This function handles only one grouping set (already set in + * aggstate->current_set). + * * When called, CurrentMemoryContext should be the per-query context. */ static void @@ -725,13 +943,14 @@ process_ordered_aggregate_multi(AggState *aggstate, bool haveOldValue = false; int i; - tuplesort_performsort(peraggstate->sortstate); + tuplesort_performsort(peraggstate->sortstates[aggstate->current_set]); ExecClearTuple(slot1); if (slot2) ExecClearTuple(slot2); - while (tuplesort_gettupleslot(peraggstate->sortstate, true, slot1)) + while (tuplesort_gettupleslot(peraggstate->sortstates[aggstate->current_set], + true, slot1)) { /* * Extract the first numTransInputs columns as datums to pass to the @@ -779,13 +998,16 @@ process_ordered_aggregate_multi(AggState *aggstate, if (slot2) ExecClearTuple(slot2); - tuplesort_end(peraggstate->sortstate); - peraggstate->sortstate = NULL; + tuplesort_end(peraggstate->sortstates[aggstate->current_set]); + peraggstate->sortstates[aggstate->current_set] = NULL; } /* * Compute the final value of one aggregate. * + * This function handles only one grouping set (already set in + * aggstate->current_set). + * * The finalfunction will be run, and the result delivered, in the * output-tuple context; caller's CurrentMemoryContext does not matter. */ @@ -832,7 +1054,7 @@ finalize_aggregate(AggState *aggstate, /* set up aggstate->curperagg for AggGetAggref() */ aggstate->curperagg = peraggstate; - InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), + InitFunctionCallInfoData(fcinfo, &peraggstate->finalfn, numFinalArgs, peraggstate->aggCollation, (void *) aggstate, NULL); @@ -882,6 +1104,154 @@ finalize_aggregate(AggState *aggstate, MemoryContextSwitchTo(oldContext); } + +/* + * Prepare to finalize and project based on the specified representative tuple + * slot and grouping set. + * + * In the specified tuple slot, force to null all attributes that should be + * read as null in the context of the current grouping set. Also stash the + * current group bitmap where GroupingExpr can get at it. + * + * This relies on three conditions: + * + * 1) Nothing is ever going to try and extract the whole tuple from this slot, + * only reference it in evaluations, which will only access individual + * attributes. + * + * 2) No system columns are going to need to be nulled. (If a system column is + * referenced in a group clause, it is actually projected in the outer plan + * tlist.) + * + * 3) Within a given phase, we never need to recover the value of an attribute + * once it has been set to null. + * + * Poking into the slot this way is a bit ugly, but the consensus is that the + * alternative was worse. + */ +static void +prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet) +{ + if (aggstate->phase->grouped_cols) + { + Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet]; + + aggstate->grouped_cols = grouped_cols; + + if (slot->tts_isempty) + { + /* + * Force all values to be NULL if working on an empty input tuple + * (i.e. an empty grouping set for which no input rows were + * supplied). + */ + ExecStoreAllNullTuple(slot); + } + else if (aggstate->all_grouped_cols) + { + ListCell *lc; + + /* all_grouped_cols is arranged in desc order */ + slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols)); + + foreach(lc, aggstate->all_grouped_cols) + { + int attnum = lfirst_int(lc); + + if (!bms_is_member(attnum, grouped_cols)) + slot->tts_isnull[attnum - 1] = true; + } + } + } +} + +/* + * Compute the final value of all aggregates for one group. + * + * This function handles only one grouping set at a time. + * + * Results are stored in the output econtext aggvalues/aggnulls. + */ +static void +finalize_aggregates(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroup, + int currentSet) +{ + ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; + Datum *aggvalues = econtext->ecxt_aggvalues; + bool *aggnulls = econtext->ecxt_aggnulls; + int aggno; + + Assert(currentSet == 0 || + ((Agg *) aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); + + aggstate->current_set = currentSet; + + for (aggno = 0; aggno < aggstate->numaggs; aggno++) + { + AggStatePerAgg peraggstate = &peragg[aggno]; + AggStatePerGroup pergroupstate; + + pergroupstate = &pergroup[aggno + (currentSet * (aggstate->numaggs))]; + + if (peraggstate->numSortCols > 0) + { + Assert(((Agg *) aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); + + if (peraggstate->numInputs == 1) + process_ordered_aggregate_single(aggstate, + peraggstate, + pergroupstate); + else + process_ordered_aggregate_multi(aggstate, + peraggstate, + pergroupstate); + } + + finalize_aggregate(aggstate, peraggstate, pergroupstate, + &aggvalues[aggno], &aggnulls[aggno]); + } +} + +/* + * Project the result of a group (whose aggs have already been calculated by + * finalize_aggregates). Returns the result slot, or NULL if no row is + * projected (suppressed by qual or by an empty SRF). + */ +static TupleTableSlot * +project_aggregates(AggState *aggstate) +{ + ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; + + /* + * Check the qual (HAVING clause); if the group does not match, ignore + * it. + */ + if (ExecQual(aggstate->ss.ps.qual, econtext, false)) + { + /* + * Form and return or store a projection tuple using the aggregate + * results and the representative input tuple. + */ + ExprDoneCond isDone; + TupleTableSlot *result; + + result = ExecProject(aggstate->ss.ps.ps_ProjInfo, &isDone); + + if (isDone != ExprEndResult) + { + aggstate->ss.ps.ps_TupFromTlist = + (isDone == ExprMultipleResult); + return result; + } + } + else + InstrCountFiltered1(aggstate, 1); + + return NULL; +} + /* * find_unaggregated_cols * Construct a bitmapset of the column numbers of un-aggregated Vars @@ -916,8 +1286,11 @@ find_unaggregated_cols_walker(Node *node, Bitmapset **colnos) *colnos = bms_add_member(*colnos, var->varattno); return false; } - if (IsA(node, Aggref)) /* do not descend into aggregate exprs */ + if (IsA(node, Aggref) || IsA(node, GroupingFunc)) + { + /* do not descend into aggregate exprs */ return false; + } return expression_tree_walker(node, find_unaggregated_cols_walker, (void *) colnos); } @@ -942,11 +1315,11 @@ build_hash_table(AggState *aggstate) aggstate->hashtable = BuildTupleHashTable(node->numCols, node->grpColIdx, - aggstate->eqfunctions, + aggstate->phase->eqfunctions, aggstate->hashfunctions, node->numGroups, entrysize, - aggstate->aggcontext, + aggstate->aggcontexts[0]->ecxt_per_tuple_memory, tmpmem); } @@ -1057,7 +1430,7 @@ lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot) if (isnew) { /* initialize aggregates for new tuple group */ - initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup); + initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup, 0); } return entry; @@ -1079,6 +1452,8 @@ lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot) TupleTableSlot * ExecAgg(AggState *node) { + TupleTableSlot *result; + /* * Check to see if we're still projecting out tuples from a previous agg * tuple (because there is a function-returning-set in the projection @@ -1086,7 +1461,6 @@ ExecAgg(AggState *node) */ if (node->ss.ps.ps_TupFromTlist) { - TupleTableSlot *result; ExprDoneCond isDone; result = ExecProject(node->ss.ps.ps_ProjInfo, &isDone); @@ -1097,22 +1471,30 @@ ExecAgg(AggState *node) } /* - * Exit if nothing left to do. (We must do the ps_TupFromTlist check - * first, because in some cases agg_done gets set before we emit the final - * aggregate tuple, and we have to finish running SRFs for it.) + * (We must do the ps_TupFromTlist check first, because in some cases + * agg_done gets set before we emit the final aggregate tuple, and we have + * to finish running SRFs for it.) */ - if (node->agg_done) - return NULL; - - /* Dispatch based on strategy */ - if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) + if (!node->agg_done) { - if (!node->table_filled) - agg_fill_hash_table(node); - return agg_retrieve_hash_table(node); + /* Dispatch based on strategy */ + switch (node->phase->aggnode->aggstrategy) + { + case AGG_HASHED: + if (!node->table_filled) + agg_fill_hash_table(node); + result = agg_retrieve_hash_table(node); + break; + default: + result = agg_retrieve_direct(node); + break; + } + + if (!TupIsNull(result)) + return result; } - else - return agg_retrieve_direct(node); + + return NULL; } /* @@ -1121,28 +1503,30 @@ ExecAgg(AggState *node) static TupleTableSlot * agg_retrieve_direct(AggState *aggstate) { - Agg *node = (Agg *) aggstate->ss.ps.plan; - PlanState *outerPlan; + Agg *node = aggstate->phase->aggnode; ExprContext *econtext; ExprContext *tmpcontext; - Datum *aggvalues; - bool *aggnulls; AggStatePerAgg peragg; AggStatePerGroup pergroup; TupleTableSlot *outerslot; TupleTableSlot *firstSlot; - int aggno; + TupleTableSlot *result; + bool hasGroupingSets = aggstate->phase->numsets > 0; + int numGroupingSets = Max(aggstate->phase->numsets, 1); + int currentSet; + int nextSetSize; + int numReset; + int i; /* * get state info from node + * + * econtext is the per-output-tuple expression context + * tmpcontext is the per-input-tuple expression context */ - outerPlan = outerPlanState(aggstate); - /* econtext is the per-output-tuple expression context */ econtext = aggstate->ss.ps.ps_ExprContext; - aggvalues = econtext->ecxt_aggvalues; - aggnulls = econtext->ecxt_aggnulls; - /* tmpcontext is the per-input-tuple expression context */ tmpcontext = aggstate->tmpcontext; + peragg = aggstate->peragg; pergroup = aggstate->pergroup; firstSlot = aggstate->ss.ss_ScanTupleSlot; @@ -1150,172 +1534,281 @@ agg_retrieve_direct(AggState *aggstate) /* * We loop retrieving groups until we find one matching * aggstate->ss.ps.qual + * + * For grouping sets, we have the invariant that aggstate->projected_set + * is either -1 (initial call) or the index (starting from 0) in + * gset_lengths for the group we just completed (either by projecting a + * row or by discarding it in the qual). */ while (!aggstate->agg_done) { /* - * If we don't already have the first tuple of the new group, fetch it - * from the outer plan. + * Clear the per-output-tuple context for each group, as well as + * aggcontext (which contains any pass-by-ref transvalues of the old + * group). Some aggregate functions store working state in child + * contexts; those now get reset automatically without us needing to + * do anything special. + * + * We use ReScanExprContext not just ResetExprContext because we want + * any registered shutdown callbacks to be called. That allows + * aggregate functions to ensure they've cleaned up any non-memory + * resources. + */ + ReScanExprContext(econtext); + + /* + * Determine how many grouping sets need to be reset at this boundary. */ - if (aggstate->grp_firstTuple == NULL) + if (aggstate->projected_set >= 0 && + aggstate->projected_set < numGroupingSets) + numReset = aggstate->projected_set + 1; + else + numReset = numGroupingSets; + + /* + * numReset can change on a phase boundary, but that's OK; we want to + * reset the contexts used in _this_ phase, and later, after possibly + * changing phase, initialize the right number of aggregates for the + * _new_ phase. + */ + + for (i = 0; i < numReset; i++) + { + ReScanExprContext(aggstate->aggcontexts[i]); + } + + /* + * Check if input is complete and there are no more groups to project + * in this phase; move to next phase or mark as done. + */ + if (aggstate->input_done == true && + aggstate->projected_set >= (numGroupingSets - 1)) { - outerslot = ExecProcNode(outerPlan); - if (!TupIsNull(outerslot)) + if (aggstate->current_phase < aggstate->numphases - 1) { - /* - * Make a copy of the first input tuple; we will use this for - * comparisons (in group mode) and for projection. - */ - aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); + initialize_phase(aggstate, aggstate->current_phase + 1); + aggstate->input_done = false; + aggstate->projected_set = -1; + numGroupingSets = Max(aggstate->phase->numsets, 1); + node = aggstate->phase->aggnode; + numReset = numGroupingSets; } else { - /* outer plan produced no tuples at all */ aggstate->agg_done = true; - /* If we are grouping, we should produce no tuples too */ - if (node->aggstrategy != AGG_PLAIN) - return NULL; + break; } } /* - * Clear the per-output-tuple context for each group, as well as - * aggcontext (which contains any pass-by-ref transvalues of the old - * group). We also clear any child contexts of the aggcontext; some - * aggregate functions store working state in such contexts. - * - * We use ReScanExprContext not just ResetExprContext because we want - * any registered shutdown callbacks to be called. That allows - * aggregate functions to ensure they've cleaned up any non-memory - * resources. + * Get the number of columns in the next grouping set after the last + * projected one (if any). This is the number of columns to compare to + * see if we reached the boundary of that set too. */ - ReScanExprContext(econtext); - - MemoryContextResetAndDeleteChildren(aggstate->aggcontext); + if (aggstate->projected_set >= 0 && + aggstate->projected_set < (numGroupingSets - 1)) + nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1]; + else + nextSetSize = 0; - /* - * Initialize working state for a new input tuple group + /*- + * If a subgroup for the current grouping set is present, project it. + * + * We have a new group if: + * - we're out of input but haven't projected all grouping sets + * (checked above) + * OR + * - we already projected a row that wasn't from the last grouping + * set + * AND + * - the next grouping set has at least one grouping column (since + * empty grouping sets project only once input is exhausted) + * AND + * - the previous and pending rows differ on the grouping columns + * of the next grouping set */ - initialize_aggregates(aggstate, peragg, pergroup); + if (aggstate->input_done || + (node->aggstrategy == AGG_SORTED && + aggstate->projected_set != -1 && + aggstate->projected_set < (numGroupingSets - 1) && + nextSetSize > 0 && + !execTuplesMatch(econtext->ecxt_outertuple, + tmpcontext->ecxt_outertuple, + nextSetSize, + node->grpColIdx, + aggstate->phase->eqfunctions, + tmpcontext->ecxt_per_tuple_memory))) + { + aggstate->projected_set += 1; - if (aggstate->grp_firstTuple != NULL) + Assert(aggstate->projected_set < numGroupingSets); + Assert(nextSetSize > 0 || aggstate->input_done); + } + else { /* - * Store the copied first input tuple in the tuple table slot - * reserved for it. The tuple will be deleted when it is cleared - * from the slot. + * We no longer care what group we just projected, the next + * projection will always be the first (or only) grouping set + * (unless the input proves to be empty). */ - ExecStoreTuple(aggstate->grp_firstTuple, - firstSlot, - InvalidBuffer, - true); - aggstate->grp_firstTuple = NULL; /* don't keep two pointers */ - - /* set up for first advance_aggregates call */ - tmpcontext->ecxt_outertuple = firstSlot; + aggstate->projected_set = 0; /* - * Process each outer-plan tuple, and then fetch the next one, - * until we exhaust the outer plan or cross a group boundary. + * If we don't already have the first tuple of the new group, + * fetch it from the outer plan. */ - for (;;) + if (aggstate->grp_firstTuple == NULL) { - advance_aggregates(aggstate, pergroup); - - /* Reset per-input-tuple context after each tuple */ - ResetExprContext(tmpcontext); - - outerslot = ExecProcNode(outerPlan); - if (TupIsNull(outerslot)) + outerslot = fetch_input_tuple(aggstate); + if (!TupIsNull(outerslot)) { - /* no more outer-plan tuples available */ - aggstate->agg_done = true; - break; + /* + * Make a copy of the first input tuple; we will use this + * for comparisons (in group mode) and for projection. + */ + aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); } - /* set up for next advance_aggregates call */ - tmpcontext->ecxt_outertuple = outerslot; - - /* - * If we are grouping, check whether we've crossed a group - * boundary. - */ - if (node->aggstrategy == AGG_SORTED) + else { - if (!execTuplesMatch(firstSlot, - outerslot, - node->numCols, node->grpColIdx, - aggstate->eqfunctions, - tmpcontext->ecxt_per_tuple_memory)) + /* outer plan produced no tuples at all */ + if (hasGroupingSets) { /* - * Save the first input tuple of the next group. + * If there was no input at all, we need to project + * rows only if there are grouping sets of size 0. + * Note that this implies that there can't be any + * references to ungrouped Vars, which would otherwise + * cause issues with the empty output slot. + * + * XXX: This is no longer true, we currently deal with + * this in finalize_aggregates(). */ - aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); - break; + aggstate->input_done = true; + + while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0) + { + aggstate->projected_set += 1; + if (aggstate->projected_set >= numGroupingSets) + { + /* + * We can't set agg_done here because we might + * have more phases to do, even though the + * input is empty. So we need to restart the + * whole outer loop. + */ + break; + } + } + + if (aggstate->projected_set >= numGroupingSets) + continue; + } + else + { + aggstate->agg_done = true; + /* If we are grouping, we should produce no tuples too */ + if (node->aggstrategy != AGG_PLAIN) + return NULL; } } } - } - /* - * Use the representative input tuple for any references to - * non-aggregated input columns in aggregate direct args, the node - * qual, and the tlist. (If we are not grouping, and there are no - * input rows at all, we will come here with an empty firstSlot ... - * but if not grouping, there can't be any references to - * non-aggregated input columns, so no problem.) - */ - econtext->ecxt_outertuple = firstSlot; - - /* - * Done scanning input tuple group. Finalize each aggregate - * calculation, and stash results in the per-output-tuple context. - */ - for (aggno = 0; aggno < aggstate->numaggs; aggno++) - { - AggStatePerAgg peraggstate = &peragg[aggno]; - AggStatePerGroup pergroupstate = &pergroup[aggno]; + /* + * Initialize working state for a new input tuple group. + */ + initialize_aggregates(aggstate, peragg, pergroup, numReset); - if (peraggstate->numSortCols > 0) + if (aggstate->grp_firstTuple != NULL) { - if (peraggstate->numInputs == 1) - process_ordered_aggregate_single(aggstate, - peraggstate, - pergroupstate); - else - process_ordered_aggregate_multi(aggstate, - peraggstate, - pergroupstate); - } + /* + * Store the copied first input tuple in the tuple table slot + * reserved for it. The tuple will be deleted when it is + * cleared from the slot. + */ + ExecStoreTuple(aggstate->grp_firstTuple, + firstSlot, + InvalidBuffer, + true); + aggstate->grp_firstTuple = NULL; /* don't keep two pointers */ - finalize_aggregate(aggstate, peraggstate, pergroupstate, - &aggvalues[aggno], &aggnulls[aggno]); - } + /* set up for first advance_aggregates call */ + tmpcontext->ecxt_outertuple = firstSlot; - /* - * Check the qual (HAVING clause); if the group does not match, ignore - * it and loop back to try to process another group. - */ - if (ExecQual(aggstate->ss.ps.qual, econtext, false)) - { - /* - * Form and return a projection tuple using the aggregate results - * and the representative input tuple. - */ - TupleTableSlot *result; - ExprDoneCond isDone; + /* + * Process each outer-plan tuple, and then fetch the next one, + * until we exhaust the outer plan or cross a group boundary. + */ + for (;;) + { + advance_aggregates(aggstate, pergroup); - result = ExecProject(aggstate->ss.ps.ps_ProjInfo, &isDone); + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(tmpcontext); - if (isDone != ExprEndResult) - { - aggstate->ss.ps.ps_TupFromTlist = - (isDone == ExprMultipleResult); - return result; + outerslot = fetch_input_tuple(aggstate); + if (TupIsNull(outerslot)) + { + /* no more outer-plan tuples available */ + if (hasGroupingSets) + { + aggstate->input_done = true; + break; + } + else + { + aggstate->agg_done = true; + break; + } + } + /* set up for next advance_aggregates call */ + tmpcontext->ecxt_outertuple = outerslot; + + /* + * If we are grouping, check whether we've crossed a group + * boundary. + */ + if (node->aggstrategy == AGG_SORTED) + { + if (!execTuplesMatch(firstSlot, + outerslot, + node->numCols, + node->grpColIdx, + aggstate->phase->eqfunctions, + tmpcontext->ecxt_per_tuple_memory)) + { + aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); + break; + } + } + } } + + /* + * Use the representative input tuple for any references to + * non-aggregated input columns in aggregate direct args, the node + * qual, and the tlist. (If we are not grouping, and there are no + * input rows at all, we will come here with an empty firstSlot ... + * but if not grouping, there can't be any references to + * non-aggregated input columns, so no problem.) + */ + econtext->ecxt_outertuple = firstSlot; } - else - InstrCountFiltered1(aggstate, 1); + + Assert(aggstate->projected_set >= 0); + + currentSet = aggstate->projected_set; + + prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet); + + finalize_aggregates(aggstate, peragg, pergroup, currentSet); + + /* + * If there's no row to project right now, we must continue rather than + * returning a null since there might be more groups. + */ + result = project_aggregates(aggstate); + if (result) + return result; } /* No more groups */ @@ -1328,16 +1821,15 @@ agg_retrieve_direct(AggState *aggstate) static void agg_fill_hash_table(AggState *aggstate) { - PlanState *outerPlan; ExprContext *tmpcontext; AggHashEntry entry; TupleTableSlot *outerslot; /* * get state info from node + * + * tmpcontext is the per-input-tuple expression context */ - outerPlan = outerPlanState(aggstate); - /* tmpcontext is the per-input-tuple expression context */ tmpcontext = aggstate->tmpcontext; /* @@ -1346,7 +1838,7 @@ agg_fill_hash_table(AggState *aggstate) */ for (;;) { - outerslot = ExecProcNode(outerPlan); + outerslot = fetch_input_tuple(aggstate); if (TupIsNull(outerslot)) break; /* set up for advance_aggregates call */ @@ -1374,21 +1866,17 @@ static TupleTableSlot * agg_retrieve_hash_table(AggState *aggstate) { ExprContext *econtext; - Datum *aggvalues; - bool *aggnulls; AggStatePerAgg peragg; AggStatePerGroup pergroup; AggHashEntry entry; TupleTableSlot *firstSlot; - int aggno; + TupleTableSlot *result; /* * get state info from node */ /* econtext is the per-output-tuple expression context */ econtext = aggstate->ss.ps.ps_ExprContext; - aggvalues = econtext->ecxt_aggvalues; - aggnulls = econtext->ecxt_aggnulls; peragg = aggstate->peragg; firstSlot = aggstate->ss.ss_ScanTupleSlot; @@ -1428,19 +1916,7 @@ agg_retrieve_hash_table(AggState *aggstate) pergroup = entry->pergroup; - /* - * Finalize each aggregate calculation, and stash results in the - * per-output-tuple context. - */ - for (aggno = 0; aggno < aggstate->numaggs; aggno++) - { - AggStatePerAgg peraggstate = &peragg[aggno]; - AggStatePerGroup pergroupstate = &pergroup[aggno]; - - Assert(peraggstate->numSortCols == 0); - finalize_aggregate(aggstate, peraggstate, pergroupstate, - &aggvalues[aggno], &aggnulls[aggno]); - } + finalize_aggregates(aggstate, peragg, pergroup, 0); /* * Use the representative input tuple for any references to @@ -1448,30 +1924,9 @@ agg_retrieve_hash_table(AggState *aggstate) */ econtext->ecxt_outertuple = firstSlot; - /* - * Check the qual (HAVING clause); if the group does not match, ignore - * it and loop back to try to process another group. - */ - if (ExecQual(aggstate->ss.ps.qual, econtext, false)) - { - /* - * Form and return a projection tuple using the aggregate results - * and the representative input tuple. - */ - TupleTableSlot *result; - ExprDoneCond isDone; - - result = ExecProject(aggstate->ss.ps.ps_ProjInfo, &isDone); - - if (isDone != ExprEndResult) - { - aggstate->ss.ps.ps_TupFromTlist = - (isDone == ExprMultipleResult); - return result; - } - } - else - InstrCountFiltered1(aggstate, 1); + result = project_aggregates(aggstate); + if (result) + return result; } /* No more groups */ @@ -1494,7 +1949,14 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) ExprContext *econtext; int numaggs, aggno; + int phase; ListCell *l; + Bitmapset *all_grouped_cols = NULL; + int numGroupingSets = 1; + int numPhases; + int currentsortno = 0; + int i = 0; + int j = 0; /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); @@ -1508,38 +1970,68 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->aggs = NIL; aggstate->numaggs = 0; - aggstate->eqfunctions = NULL; + aggstate->maxsets = 0; aggstate->hashfunctions = NULL; + aggstate->projected_set = -1; + aggstate->current_set = 0; aggstate->peragg = NULL; aggstate->curperagg = NULL; aggstate->agg_done = false; + aggstate->input_done = false; aggstate->pergroup = NULL; aggstate->grp_firstTuple = NULL; aggstate->hashtable = NULL; + aggstate->sort_in = NULL; + aggstate->sort_out = NULL; /* - * Create expression contexts. We need two, one for per-input-tuple - * processing and one for per-output-tuple processing. We cheat a little - * by using ExecAssignExprContext() to build both. + * Calculate the maximum number of grouping sets in any phase; this + * determines the size of some allocations. */ - ExecAssignExprContext(estate, &aggstate->ss.ps); - aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext; - ExecAssignExprContext(estate, &aggstate->ss.ps); + if (node->groupingSets) + { + Assert(node->aggstrategy != AGG_HASHED); + + numGroupingSets = list_length(node->groupingSets); + + foreach(l, node->chain) + { + Agg *agg = lfirst(l); + + numGroupingSets = Max(numGroupingSets, + list_length(agg->groupingSets)); + } + } + + aggstate->maxsets = numGroupingSets; + aggstate->numphases = numPhases = 1 + list_length(node->chain); + + aggstate->aggcontexts = (ExprContext **) + palloc0(sizeof(ExprContext *) * numGroupingSets); /* - * We also need a long-lived memory context for holding hashtable data - * structures and transition values. NOTE: the details of what is stored - * in aggcontext and what is stored in the regular per-query memory - * context are driven by a simple decision: we want to reset the - * aggcontext at group boundaries (if not hashing) and in ExecReScanAgg to - * recover no-longer-wanted space. + * Create expression contexts. We need three or more, one for + * per-input-tuple processing, one for per-output-tuple processing, and + * one for each grouping set. The per-tuple memory context of the + * per-grouping-set ExprContexts (aggcontexts) replaces the standalone + * memory context formerly used to hold transition values. We cheat a + * little by using ExecAssignExprContext() to build all of them. + * + * NOTE: the details of what is stored in aggcontexts and what is stored + * in the regular per-query memory context are driven by a simple + * decision: we want to reset the aggcontext at group boundaries (if not + * hashing) and in ExecReScanAgg to recover no-longer-wanted space. */ - aggstate->aggcontext = - AllocSetContextCreate(CurrentMemoryContext, - "AggContext", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + ExecAssignExprContext(estate, &aggstate->ss.ps); + aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext; + + for (i = 0; i < numGroupingSets; ++i) + { + ExecAssignExprContext(estate, &aggstate->ss.ps); + aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext; + } + + ExecAssignExprContext(estate, &aggstate->ss.ps); /* * tuple table initialization @@ -1547,6 +2039,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) ExecInitScanTupleSlot(estate, &aggstate->ss); ExecInitResultTupleSlot(estate, &aggstate->ss.ps); aggstate->hashslot = ExecInitExtraTupleSlot(estate); + aggstate->sort_slot = ExecInitExtraTupleSlot(estate); /* * initialize child expressions @@ -1565,7 +2058,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) (PlanState *) aggstate); /* - * initialize child nodes + * Initialize child nodes. * * If we are doing a hashed aggregation then the child plan does not need * to handle REWIND efficiently; see ExecReScanAgg. @@ -1579,6 +2072,9 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * initialize source tuple type. */ ExecAssignScanTypeFromOuterPlan(&aggstate->ss); + if (node->chain) + ExecSetSlotDescriptor(aggstate->sort_slot, + aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor); /* * Initialize result tuple type and projection info. @@ -1606,24 +2102,105 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) } /* - * If we are grouping, precompute fmgr lookup data for inner loop. We need - * both equality and hashing functions to do it by hashing, but only - * equality if not hashing. + * For each phase, prepare grouping set data and fmgr lookup data for + * compare functions. Accumulate all_grouped_cols in passing. */ - if (node->numCols > 0) + + aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData)); + + for (phase = 0; phase < numPhases; ++phase) { - if (node->aggstrategy == AGG_HASHED) - execTuplesHashPrepare(node->numCols, - node->grpOperators, - &aggstate->eqfunctions, - &aggstate->hashfunctions); + AggStatePerPhase phasedata = &aggstate->phases[phase]; + Agg *aggnode; + Sort *sortnode; + int num_sets; + + if (phase > 0) + { + aggnode = list_nth(node->chain, phase-1); + sortnode = (Sort *) aggnode->plan.lefttree; + Assert(IsA(sortnode, Sort)); + } + else + { + aggnode = node; + sortnode = NULL; + } + + phasedata->numsets = num_sets = list_length(aggnode->groupingSets); + + if (num_sets) + { + phasedata->gset_lengths = palloc(num_sets * sizeof(int)); + phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *)); + + i = 0; + foreach(l, aggnode->groupingSets) + { + int current_length = list_length(lfirst(l)); + Bitmapset *cols = NULL; + + /* planner forces this to be correct */ + for (j = 0; j < current_length; ++j) + cols = bms_add_member(cols, aggnode->grpColIdx[j]); + + phasedata->grouped_cols[i] = cols; + phasedata->gset_lengths[i] = current_length; + ++i; + } + + all_grouped_cols = bms_add_members(all_grouped_cols, + phasedata->grouped_cols[0]); + } else - aggstate->eqfunctions = - execTuplesMatchPrepare(node->numCols, - node->grpOperators); + { + Assert(phase == 0); + + phasedata->gset_lengths = NULL; + phasedata->grouped_cols = NULL; + } + + /* + * If we are grouping, precompute fmgr lookup data for inner loop. + */ + if (aggnode->aggstrategy == AGG_SORTED) + { + Assert(aggnode->numCols > 0); + + phasedata->eqfunctions = + execTuplesMatchPrepare(aggnode->numCols, + aggnode->grpOperators); + } + + phasedata->aggnode = aggnode; + phasedata->sortnode = sortnode; } /* + * Convert all_grouped_cols to a descending-order list. + */ + i = -1; + while ((i = bms_next_member(all_grouped_cols, i)) >= 0) + aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols); + + /* + * Hashing can only appear in the initial phase. + */ + + if (node->aggstrategy == AGG_HASHED) + execTuplesHashPrepare(node->numCols, + node->grpOperators, + &aggstate->phases[0].eqfunctions, + &aggstate->hashfunctions); + + /* + * Initialize current phase-dependent values to initial phase + */ + + aggstate->current_phase = 0; + initialize_phase(aggstate, 0); + + /* * Set up aggregate-result storage in the output expr context, and also * allocate my private per-agg working storage */ @@ -1645,7 +2222,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) { AggStatePerGroup pergroup; - pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs); + pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) + * numaggs + * numGroupingSets); + aggstate->pergroup = pergroup; } @@ -1708,7 +2288,11 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* Begin filling in the peraggstate data */ peraggstate->aggrefstate = aggrefstate; peraggstate->aggref = aggref; - peraggstate->sortstate = NULL; + peraggstate->sortstates =(Tuplesortstate**) + palloc0(sizeof(Tuplesortstate*) * numGroupingSets); + + for (currentsortno = 0; currentsortno < numGroupingSets; currentsortno++) + peraggstate->sortstates[currentsortno] = NULL; /* Fetch the pg_aggregate row */ aggTuple = SearchSysCache1(AGGFNOID, @@ -2016,31 +2600,41 @@ ExecEndAgg(AggState *node) { PlanState *outerPlan; int aggno; + int numGroupingSets = Max(node->maxsets, 1); + int setno; /* Make sure we have closed any open tuplesorts */ + + if (node->sort_in) + tuplesort_end(node->sort_in); + if (node->sort_out) + tuplesort_end(node->sort_out); + for (aggno = 0; aggno < node->numaggs; aggno++) { AggStatePerAgg peraggstate = &node->peragg[aggno]; - if (peraggstate->sortstate) - tuplesort_end(peraggstate->sortstate); + for (setno = 0; setno < numGroupingSets; setno++) + { + if (peraggstate->sortstates[setno]) + tuplesort_end(peraggstate->sortstates[setno]); + } } /* And ensure any agg shutdown callbacks have been called */ - ReScanExprContext(node->ss.ps.ps_ExprContext); + for (setno = 0; setno < numGroupingSets; setno++) + ReScanExprContext(node->aggcontexts[setno]); /* - * Free both the expr contexts. + * We don't actually free any ExprContexts here (see comment in + * ExecFreeExprContext), just unlinking the output one from the plan node + * suffices. */ ExecFreeExprContext(&node->ss.ps); - node->ss.ps.ps_ExprContext = node->tmpcontext; - ExecFreeExprContext(&node->ss.ps); /* clean up tuple table */ ExecClearTuple(node->ss.ss_ScanTupleSlot); - MemoryContextDelete(node->aggcontext); - outerPlan = outerPlanState(node); ExecEndNode(outerPlan); } @@ -2050,13 +2644,16 @@ ExecReScanAgg(AggState *node) { ExprContext *econtext = node->ss.ps.ps_ExprContext; PlanState *outerPlan = outerPlanState(node); + Agg *aggnode = (Agg *) node->ss.ps.plan; int aggno; + int numGroupingSets = Max(node->maxsets, 1); + int setno; node->agg_done = false; node->ss.ps.ps_TupFromTlist = false; - if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) + if (aggnode->aggstrategy == AGG_HASHED) { /* * In the hashed case, if we haven't yet built the hash table then we @@ -2082,14 +2679,34 @@ ExecReScanAgg(AggState *node) /* Make sure we have closed any open tuplesorts */ for (aggno = 0; aggno < node->numaggs; aggno++) { - AggStatePerAgg peraggstate = &node->peragg[aggno]; + for (setno = 0; setno < numGroupingSets; setno++) + { + AggStatePerAgg peraggstate = &node->peragg[aggno]; - if (peraggstate->sortstate) - tuplesort_end(peraggstate->sortstate); - peraggstate->sortstate = NULL; + if (peraggstate->sortstates[setno]) + { + tuplesort_end(peraggstate->sortstates[setno]); + peraggstate->sortstates[setno] = NULL; + } + } } - /* We don't need to ReScanExprContext here; ExecReScan already did it */ + /* + * We don't need to ReScanExprContext the output tuple context here; + * ExecReScan already did it. But we do need to reset our per-grouping-set + * contexts, which may have transvalues stored in them. (We use rescan + * rather than just reset because transfns may have registered callbacks + * that need to be run now.) + * + * Note that with AGG_HASHED, the hash table is allocated in a sub-context + * of the aggcontext. This used to be an issue, but now, resetting a + * context automatically deletes sub-contexts too. + */ + + for (setno = 0; setno < numGroupingSets; setno++) + { + ReScanExprContext(node->aggcontexts[setno]); + } /* Release first tuple of group, if we have made a copy */ if (node->grp_firstTuple != NULL) @@ -2097,21 +2714,13 @@ ExecReScanAgg(AggState *node) heap_freetuple(node->grp_firstTuple); node->grp_firstTuple = NULL; } + ExecClearTuple(node->ss.ss_ScanTupleSlot); /* Forget current agg values */ MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs); MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs); - /* - * Release all temp storage. Note that with AGG_HASHED, the hash table is - * allocated in a sub-context of the aggcontext. We're going to rebuild - * the hash table from scratch, so we need to use - * MemoryContextResetAndDeleteChildren() to avoid leaking the old hash - * table's memory context header. - */ - MemoryContextResetAndDeleteChildren(node->aggcontext); - - if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) + if (aggnode->aggstrategy == AGG_HASHED) { /* Rebuild an empty hash table */ build_hash_table(node); @@ -2123,13 +2732,15 @@ ExecReScanAgg(AggState *node) * Reset the per-group state (in particular, mark transvalues null) */ MemSet(node->pergroup, 0, - sizeof(AggStatePerGroupData) * node->numaggs); + sizeof(AggStatePerGroupData) * node->numaggs * numGroupingSets); + + /* reset to phase 0 */ + initialize_phase(node, 0); + + node->input_done = false; + node->projected_set = -1; } - /* - * if chgParam of subnode is not null then plan will be re-scanned by - * first ExecProcNode. - */ if (outerPlan->chgParam == NULL) ExecReScan(outerPlan); } @@ -2151,8 +2762,11 @@ ExecReScanAgg(AggState *node) * values could conceivably appear in future.) * * If aggcontext isn't NULL, the function also stores at *aggcontext the - * identity of the memory context that aggregate transition values are - * being stored in. + * identity of the memory context that aggregate transition values are being + * stored in. Note that the same aggregate call site (flinfo) may be called + * interleaved on different transition values in different contexts, so it's + * not kosher to cache aggcontext under fn_extra. It is, however, kosher to + * cache it in the transvalue itself (for internal-type transvalues). */ int AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext) @@ -2160,7 +2774,11 @@ AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext) if (fcinfo->context && IsA(fcinfo->context, AggState)) { if (aggcontext) - *aggcontext = ((AggState *) fcinfo->context)->aggcontext; + { + AggState *aggstate = ((AggState *) fcinfo->context); + ExprContext *cxt = aggstate->aggcontexts[aggstate->current_set]; + *aggcontext = cxt->ecxt_per_tuple_memory; + } return AGG_CONTEXT_AGGREGATE; } if (fcinfo->context && IsA(fcinfo->context, WindowAggState)) @@ -2244,8 +2862,9 @@ AggRegisterCallback(FunctionCallInfo fcinfo, if (fcinfo->context && IsA(fcinfo->context, AggState)) { AggState *aggstate = (AggState *) fcinfo->context; + ExprContext *cxt = aggstate->aggcontexts[aggstate->current_set]; - RegisterExprContextCallback(aggstate->ss.ps.ps_ExprContext, func, arg); + RegisterExprContextCallback(cxt, func, arg); return; } |