diff options
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/execQual.c | 63 | ||||
-rw-r--r-- | src/backend/executor/execUtils.c | 5 | ||||
-rw-r--r-- | src/backend/executor/nodeAgg.c | 1315 |
3 files changed, 1033 insertions, 350 deletions
diff --git a/src/backend/executor/execQual.c b/src/backend/executor/execQual.c index e5994112a42..d414e20f120 100644 --- a/src/backend/executor/execQual.c +++ b/src/backend/executor/execQual.c @@ -181,6 +181,9 @@ static Datum ExecEvalArrayCoerceExpr(ArrayCoerceExprState *astate, bool *isNull, ExprDoneCond *isDone); static Datum ExecEvalCurrentOfExpr(ExprState *exprstate, ExprContext *econtext, bool *isNull, ExprDoneCond *isDone); +static Datum ExecEvalGroupingFuncExpr(GroupingFuncExprState *gstate, + ExprContext *econtext, + bool *isNull, ExprDoneCond *isDone); /* ---------------------------------------------------------------- @@ -3016,6 +3019,44 @@ ExecEvalCaseTestExpr(ExprState *exprstate, return econtext->caseValue_datum; } +/* + * ExecEvalGroupingFuncExpr + * + * Return a bitmask with a bit for each (unevaluated) argument expression + * (rightmost arg is least significant bit). + * + * A bit is set if the corresponding expression is NOT part of the set of + * grouping expressions in the current grouping set. 
+ */ +static Datum +ExecEvalGroupingFuncExpr(GroupingFuncExprState *gstate, + ExprContext *econtext, + bool *isNull, + ExprDoneCond *isDone) +{ + int result = 0; + int attnum = 0; + Bitmapset *grouped_cols = gstate->aggstate->grouped_cols; + ListCell *lc; + + if (isDone) + *isDone = ExprSingleResult; + + *isNull = false; + + foreach(lc, (gstate->clauses)) + { + attnum = lfirst_int(lc); + + result = result << 1; + + if (!bms_is_member(attnum, grouped_cols)) + result = result | 1; + } + + return (Datum) result; +} + /* ---------------------------------------------------------------- * ExecEvalArray - ARRAY[] expressions * ---------------------------------------------------------------- @@ -4482,6 +4523,28 @@ ExecInitExpr(Expr *node, PlanState *parent) state = (ExprState *) astate; } break; + case T_GroupingFunc: + { + GroupingFunc *grp_node = (GroupingFunc *) node; + GroupingFuncExprState *grp_state = makeNode(GroupingFuncExprState); + Agg *agg = NULL; + + if (!parent || !IsA(parent, AggState) || !IsA(parent->plan, Agg)) + elog(ERROR, "parent of GROUPING is not Agg node"); + + grp_state->aggstate = (AggState *) parent; + + agg = (Agg *) (parent->plan); + + if (agg->groupingSets) + grp_state->clauses = grp_node->cols; + else + grp_state->clauses = NIL; + + state = (ExprState *) grp_state; + state->evalfunc = (ExprStateEvalFunc) ExecEvalGroupingFuncExpr; + } + break; case T_WindowFunc: { WindowFunc *wfunc = (WindowFunc *) node; diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index 0da8e53e816..3963408b18c 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -642,9 +642,10 @@ get_last_attnums(Node *node, ProjectionInfo *projInfo) /* * Don't examine the arguments or filters of Aggrefs or WindowFuncs, * because those do not represent expressions to be evaluated within the - * overall targetlist's econtext. + * overall targetlist's econtext. GroupingFunc arguments are never + * evaluated at all. 
*/ - if (IsA(node, Aggref)) + if (IsA(node, Aggref) || IsA(node, GroupingFunc)) return false; if (IsA(node, WindowFunc)) return false; diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index fcb61177c51..01a1e67f09e 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -45,15 +45,19 @@ * needed to allow resolution of a polymorphic aggregate's result type. * * We compute aggregate input expressions and run the transition functions - * in a temporary econtext (aggstate->tmpcontext). This is reset at - * least once per input tuple, so when the transvalue datatype is + * in a temporary econtext (aggstate->tmpcontext). This is reset at least + * once per input tuple, so when the transvalue datatype is * pass-by-reference, we have to be careful to copy it into a longer-lived - * memory context, and free the prior value to avoid memory leakage. - * We store transvalues in the memory context aggstate->aggcontext, - * which is also used for the hashtable structures in AGG_HASHED mode. - * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) - * is used to run finalize functions and compute the output tuple; - * this context can be reset once per output tuple. + * memory context, and free the prior value to avoid memory leakage. We + * store transvalues in another set of econtexts, aggstate->aggcontexts + * (one per grouping set, see below), which are also used for the hashtable + * structures in AGG_HASHED mode. These econtexts are rescanned, not just + * reset, at group boundaries so that aggregate transition functions can + * register shutdown callbacks via AggRegisterCallback. + * + * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to + * run finalize functions and compute the output tuple; this context can be + * reset once per output tuple. * * The executor's AggState node is passed as the fmgr "context" value in * all transfunc and finalfunc calls. 
It is not recommended that the @@ -84,6 +88,36 @@ * need some fallback logic to use this, since there's no Aggref node * for a window function.) * + * Grouping sets: + * + * A list of grouping sets which is structurally equivalent to a ROLLUP + * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over + * ordered data. We do this by keeping a separate set of transition values + * for each grouping set being concurrently processed; for each input tuple + * we update them all, and on group boundaries we reset those states + * (starting at the front of the list) whose grouping values have changed + * (the list of grouping sets is ordered from most specific to least + * specific). + * + * Where more complex grouping sets are used, we break them down into + * "phases", where each phase has a different sort order. During each + * phase but the last, the input tuples are additionally stored in a + * tuplesort which is keyed to the next phase's sort order; during each + * phase but the first, the input tuples are drawn from the previously + * sorted data. (The sorting of the data for the first phase is handled by + * the planner, as it might be satisfied by underlying nodes.) + * + * From the perspective of aggregate transition and final functions, the + * only issue regarding grouping sets is this: a single call site (flinfo) + * of an aggregate function may be used for updating several different + * transition values in turn. So the function must not cache in the flinfo + * anything which logically belongs as part of the transition value (most + * importantly, the memory context in which the transition value exists). + * The support API functions (AggCheckCallContext, AggRegisterCallback) are + * sensitive to the grouping set for which the aggregate function is + * currently being called. + * + * TODO: AGG_HASHED doesn't support multiple grouping sets yet. 
* * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -241,9 +275,11 @@ typedef struct AggStatePerAggData * then at completion of the input tuple group, we scan the sorted values, * eliminate duplicates if needed, and run the transition function on the * rest. + * + * We need a separate tuplesort for each grouping set. */ - Tuplesortstate *sortstate; /* sort object, if DISTINCT or ORDER BY */ + Tuplesortstate **sortstates; /* sort objects, if DISTINCT or ORDER BY */ /* * This field is a pre-initialized FunctionCallInfo struct used for @@ -287,6 +323,27 @@ typedef struct AggStatePerGroupData } AggStatePerGroupData; /* + * AggStatePerPhaseData - per-grouping-set-phase state + * + * Grouping sets are divided into "phases", where a single phase can be + * processed in one pass over the input. If there is more than one phase, then + * at the end of input from the current phase, state is reset and another pass + * taken over the data which has been re-sorted in the mean time. + * + * Accordingly, each phase specifies a list of grouping sets and group clause + * information, plus each phase after the first also has a sort order. + */ +typedef struct AggStatePerPhaseData +{ + int numsets; /* number of grouping sets (or 0) */ + int *gset_lengths; /* lengths of grouping sets */ + Bitmapset **grouped_cols; /* column groupings for rollup */ + FmgrInfo *eqfunctions; /* per-grouping-field equality fns */ + Agg *aggnode; /* Agg node for phase data */ + Sort *sortnode; /* Sort node for input ordering for phase */ +} AggStatePerPhaseData; + +/* * To implement hashed aggregation, we need a hashtable that stores a * representative tuple and an array of AggStatePerGroup structs for each * distinct set of GROUP BY column values. 
We compute the hash key from @@ -302,9 +359,12 @@ typedef struct AggHashEntryData } AggHashEntryData; +static void initialize_phase(AggState *aggstate, int newphase); +static TupleTableSlot *fetch_input_tuple(AggState *aggstate); static void initialize_aggregates(AggState *aggstate, AggStatePerAgg peragg, - AggStatePerGroup pergroup); + AggStatePerGroup pergroup, + int numReset); static void advance_transition_function(AggState *aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate); @@ -319,6 +379,14 @@ static void finalize_aggregate(AggState *aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull); +static void prepare_projection_slot(AggState *aggstate, + TupleTableSlot *slot, + int currentSet); +static void finalize_aggregates(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroup, + int currentSet); +static TupleTableSlot *project_aggregates(AggState *aggstate); static Bitmapset *find_unaggregated_cols(AggState *aggstate); static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos); static void build_hash_table(AggState *aggstate); @@ -331,46 +399,135 @@ static Datum GetAggInitVal(Datum textInitVal, Oid transtype); /* - * Initialize all aggregates for a new group of input values. - * - * When called, CurrentMemoryContext should be the per-query context. + * Switch to phase "newphase", which must either be 0 (to reset) or + * current_phase + 1. Juggle the tuplesorts accordingly. */ static void -initialize_aggregates(AggState *aggstate, - AggStatePerAgg peragg, - AggStatePerGroup pergroup) +initialize_phase(AggState *aggstate, int newphase) { - int aggno; + Assert(newphase == 0 || newphase == aggstate->current_phase + 1); - for (aggno = 0; aggno < aggstate->numaggs; aggno++) + /* + * Whatever the previous state, we're now done with whatever input + * tuplesort was in use. 
+ */ + if (aggstate->sort_in) { - AggStatePerAgg peraggstate = &peragg[aggno]; - AggStatePerGroup pergroupstate = &pergroup[aggno]; + tuplesort_end(aggstate->sort_in); + aggstate->sort_in = NULL; + } + if (newphase == 0) + { /* - * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate. + * Discard any existing output tuplesort. */ - if (peraggstate->numSortCols > 0) + if (aggstate->sort_out) { - /* - * In case of rescan, maybe there could be an uncompleted sort - * operation? Clean it up if so. - */ - if (peraggstate->sortstate) - tuplesort_end(peraggstate->sortstate); + tuplesort_end(aggstate->sort_out); + aggstate->sort_out = NULL; + } + } + else + { + /* + * The old output tuplesort becomes the new input one, and this is the + * right time to actually sort it. + */ + aggstate->sort_in = aggstate->sort_out; + aggstate->sort_out = NULL; + Assert(aggstate->sort_in); + tuplesort_performsort(aggstate->sort_in); + } - /* - * We use a plain Datum sorter when there's a single input column; - * otherwise sort the full tuple. (See comments for - * process_ordered_aggregate_single.) - */ - peraggstate->sortstate = - (peraggstate->numInputs == 1) ? + /* + * If this isn't the last phase, we need to sort appropriately for the next + * phase in sequence. + */ + if (newphase < aggstate->numphases - 1) + { + Sort *sortnode = aggstate->phases[newphase+1].sortnode; + PlanState *outerNode = outerPlanState(aggstate); + TupleDesc tupDesc = ExecGetResultType(outerNode); + + aggstate->sort_out = tuplesort_begin_heap(tupDesc, + sortnode->numCols, + sortnode->sortColIdx, + sortnode->sortOperators, + sortnode->collations, + sortnode->nullsFirst, + work_mem, + false); + } + + aggstate->current_phase = newphase; + aggstate->phase = &aggstate->phases[newphase]; +} + +/* + * Fetch a tuple from either the outer plan (for phase 0) or from the sorter + * populated by the previous phase. Copy it to the sorter for the next phase + * if any. 
+ */ +static TupleTableSlot * +fetch_input_tuple(AggState *aggstate) +{ + TupleTableSlot *slot; + + if (aggstate->sort_in) + { + if (!tuplesort_gettupleslot(aggstate->sort_in, true, aggstate->sort_slot)) + return NULL; + slot = aggstate->sort_slot; + } + else + slot = ExecProcNode(outerPlanState(aggstate)); + + if (!TupIsNull(slot) && aggstate->sort_out) + tuplesort_puttupleslot(aggstate->sort_out, slot); + + return slot; +} + +/* + * (Re)Initialize an individual aggregate. + * + * This function handles only one grouping set (already set in + * aggstate->current_set). + * + * When called, CurrentMemoryContext should be the per-query context. + */ +static void +initialize_aggregate(AggState *aggstate, AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate) +{ + /* + * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate. + */ + if (peraggstate->numSortCols > 0) + { + /* + * In case of rescan, maybe there could be an uncompleted sort + * operation? Clean it up if so. + */ + if (peraggstate->sortstates[aggstate->current_set]) + tuplesort_end(peraggstate->sortstates[aggstate->current_set]); + + + /* + * We use a plain Datum sorter when there's a single input column; + * otherwise sort the full tuple. (See comments for + * process_ordered_aggregate_single.) + */ + if (peraggstate->numInputs == 1) + peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_datum(peraggstate->evaldesc->attrs[0]->atttypid, peraggstate->sortOperators[0], peraggstate->sortCollations[0], peraggstate->sortNullsFirst[0], - work_mem, false) : + work_mem, false); + else + peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_heap(peraggstate->evaldesc, peraggstate->numSortCols, peraggstate->sortColIdx, @@ -378,41 +535,83 @@ initialize_aggregates(AggState *aggstate, peraggstate->sortCollations, peraggstate->sortNullsFirst, work_mem, false); - } + } - /* - * (Re)set transValue to the initial value. 
- * - * Note that when the initial value is pass-by-ref, we must copy it - * (into the aggcontext) since we will pfree the transValue later. - */ - if (peraggstate->initValueIsNull) - pergroupstate->transValue = peraggstate->initValue; - else + /* + * (Re)set transValue to the initial value. + * + * Note that when the initial value is pass-by-ref, we must copy + * it (into the aggcontext) since we will pfree the transValue + * later. + */ + if (peraggstate->initValueIsNull) + pergroupstate->transValue = peraggstate->initValue; + else + { + MemoryContext oldContext; + + oldContext = MemoryContextSwitchTo( + aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory); + pergroupstate->transValue = datumCopy(peraggstate->initValue, + peraggstate->transtypeByVal, + peraggstate->transtypeLen); + MemoryContextSwitchTo(oldContext); + } + pergroupstate->transValueIsNull = peraggstate->initValueIsNull; + + /* + * If the initial value for the transition state doesn't exist in + * the pg_aggregate table then we will let the first non-NULL + * value returned from the outer procNode become the initial + * value. (This is useful for aggregates like max() and min().) + * The noTransValue flag signals that we still need to do this. + */ + pergroupstate->noTransValue = peraggstate->initValueIsNull; +} + +/* + * Initialize all aggregates for a new group of input values. + * + * If there are multiple grouping sets, we initialize only the first numReset + * of them (the grouping sets are ordered so that the most specific one, which + * is reset most often, is first). As a convenience, if numReset is < 1, we + * reinitialize all sets. + * + * When called, CurrentMemoryContext should be the per-query context. 
+ */ +static void +initialize_aggregates(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroup, + int numReset) +{ + int aggno; + int numGroupingSets = Max(aggstate->phase->numsets, 1); + int setno = 0; + + if (numReset < 1) + numReset = numGroupingSets; + + for (aggno = 0; aggno < aggstate->numaggs; aggno++) + { + AggStatePerAgg peraggstate = &peragg[aggno]; + + for (setno = 0; setno < numReset; setno++) { - MemoryContext oldContext; + AggStatePerGroup pergroupstate; - oldContext = MemoryContextSwitchTo(aggstate->aggcontext); - pergroupstate->transValue = datumCopy(peraggstate->initValue, - peraggstate->transtypeByVal, - peraggstate->transtypeLen); - MemoryContextSwitchTo(oldContext); - } - pergroupstate->transValueIsNull = peraggstate->initValueIsNull; + pergroupstate = &pergroup[aggno + (setno * (aggstate->numaggs))]; - /* - * If the initial value for the transition state doesn't exist in the - * pg_aggregate table then we will let the first non-NULL value - * returned from the outer procNode become the initial value. (This is - * useful for aggregates like max() and min().) The noTransValue flag - * signals that we still need to do this. - */ - pergroupstate->noTransValue = peraggstate->initValueIsNull; + aggstate->current_set = setno; + + initialize_aggregate(aggstate, peraggstate, pergroupstate); + } } } /* - * Given new input value(s), advance the transition function of an aggregate. + * Given new input value(s), advance the transition function of one aggregate + * within one grouping set only (already set in aggstate->current_set) * * The new values (and null flags) have been preloaded into argument positions * 1 and up in peraggstate->transfn_fcinfo, so that we needn't copy them again @@ -455,7 +654,8 @@ advance_transition_function(AggState *aggstate, * We must copy the datum into aggcontext if it is pass-by-ref. We * do not need to pfree the old transValue, since it's NULL. 
*/ - oldContext = MemoryContextSwitchTo(aggstate->aggcontext); + oldContext = MemoryContextSwitchTo( + aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory); pergroupstate->transValue = datumCopy(fcinfo->arg[1], peraggstate->transtypeByVal, peraggstate->transtypeLen); @@ -503,7 +703,7 @@ advance_transition_function(AggState *aggstate, { if (!fcinfo->isnull) { - MemoryContextSwitchTo(aggstate->aggcontext); + MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory); newVal = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); @@ -530,11 +730,13 @@ static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) { int aggno; + int setno = 0; + int numGroupingSets = Max(aggstate->phase->numsets, 1); + int numAggs = aggstate->numaggs; - for (aggno = 0; aggno < aggstate->numaggs; aggno++) + for (aggno = 0; aggno < numAggs; aggno++) { AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; - AggStatePerGroup pergroupstate = &pergroup[aggno]; ExprState *filter = peraggstate->aggrefstate->aggfilter; int numTransInputs = peraggstate->numTransInputs; int i; @@ -578,13 +780,16 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) continue; } - /* OK, put the tuple into the tuplesort object */ - if (peraggstate->numInputs == 1) - tuplesort_putdatum(peraggstate->sortstate, - slot->tts_values[0], - slot->tts_isnull[0]); - else - tuplesort_puttupleslot(peraggstate->sortstate, slot); + for (setno = 0; setno < numGroupingSets; setno++) + { + /* OK, put the tuple into the tuplesort object */ + if (peraggstate->numInputs == 1) + tuplesort_putdatum(peraggstate->sortstates[setno], + slot->tts_values[0], + slot->tts_isnull[0]); + else + tuplesort_puttupleslot(peraggstate->sortstates[setno], slot); + } } else { @@ -600,7 +805,14 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) fcinfo->argnull[i + 1] = slot->tts_isnull[i]; } - advance_transition_function(aggstate, 
peraggstate, pergroupstate); + for (setno = 0; setno < numGroupingSets; setno++) + { + AggStatePerGroup pergroupstate = &pergroup[aggno + (setno * numAggs)]; + + aggstate->current_set = setno; + + advance_transition_function(aggstate, peraggstate, pergroupstate); + } } } } @@ -623,6 +835,9 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) * is around 300% faster. (The speedup for by-reference types is less * but still noticeable.) * + * This function handles only one grouping set (already set in + * aggstate->current_set). + * * When called, CurrentMemoryContext should be the per-query context. */ static void @@ -642,7 +857,7 @@ process_ordered_aggregate_single(AggState *aggstate, Assert(peraggstate->numDistinctCols < 2); - tuplesort_performsort(peraggstate->sortstate); + tuplesort_performsort(peraggstate->sortstates[aggstate->current_set]); /* Load the column into argument 1 (arg 0 will be transition value) */ newVal = fcinfo->arg + 1; @@ -654,8 +869,8 @@ process_ordered_aggregate_single(AggState *aggstate, * pfree them when they are no longer needed. */ - while (tuplesort_getdatum(peraggstate->sortstate, true, - newVal, isNull)) + while (tuplesort_getdatum(peraggstate->sortstates[aggstate->current_set], + true, newVal, isNull)) { /* * Clear and select the working context for evaluation of the equality @@ -698,8 +913,8 @@ process_ordered_aggregate_single(AggState *aggstate, if (!oldIsNull && !peraggstate->inputtypeByVal) pfree(DatumGetPointer(oldVal)); - tuplesort_end(peraggstate->sortstate); - peraggstate->sortstate = NULL; + tuplesort_end(peraggstate->sortstates[aggstate->current_set]); + peraggstate->sortstates[aggstate->current_set] = NULL; } /* @@ -709,6 +924,9 @@ process_ordered_aggregate_single(AggState *aggstate, * sort, read out the values in sorted order, and run the transition * function on each value (applying DISTINCT if appropriate). * + * This function handles only one grouping set (already set in + * aggstate->current_set). 
+ * + * When called, CurrentMemoryContext should be the per-query context. */ static void @@ -725,13 +943,14 @@ process_ordered_aggregate_multi(AggState *aggstate, bool haveOldValue = false; int i; - tuplesort_performsort(peraggstate->sortstate); + tuplesort_performsort(peraggstate->sortstates[aggstate->current_set]); ExecClearTuple(slot1); if (slot2) ExecClearTuple(slot2); - while (tuplesort_gettupleslot(peraggstate->sortstate, true, slot1)) + while (tuplesort_gettupleslot(peraggstate->sortstates[aggstate->current_set], + true, slot1)) { /* * Extract the first numTransInputs columns as datums to pass to the @@ -779,13 +998,16 @@ process_ordered_aggregate_multi(AggState *aggstate, if (slot2) ExecClearTuple(slot2); - tuplesort_end(peraggstate->sortstate); - peraggstate->sortstate = NULL; + tuplesort_end(peraggstate->sortstates[aggstate->current_set]); + peraggstate->sortstates[aggstate->current_set] = NULL; } /* * Compute the final value of one aggregate. * + * This function handles only one grouping set (already set in + * aggstate->current_set). + * * The finalfunction will be run, and the result delivered, in the * output-tuple context; caller's CurrentMemoryContext does not matter. */ @@ -832,7 +1054,7 @@ finalize_aggregate(AggState *aggstate, /* set up aggstate->curperagg for AggGetAggref() */ aggstate->curperagg = peraggstate; - InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), + InitFunctionCallInfoData(fcinfo, &peraggstate->finalfn, numFinalArgs, peraggstate->aggCollation, (void *) aggstate, NULL); @@ -882,6 +1104,154 @@ finalize_aggregate(AggState *aggstate, MemoryContextSwitchTo(oldContext); } + +/* + * Prepare to finalize and project based on the specified representative tuple + * slot and grouping set. + * + * In the specified tuple slot, force to null all attributes that should be + * read as null in the context of the current grouping set. Also stash the + * current group bitmap where ExecEvalGroupingFuncExpr can get at it.
+ * + * This relies on three conditions: + * + * 1) Nothing is ever going to try and extract the whole tuple from this slot, + * only reference it in evaluations, which will only access individual + * attributes. + * + * 2) No system columns are going to need to be nulled. (If a system column is + * referenced in a group clause, it is actually projected in the outer plan + * tlist.) + * + * 3) Within a given phase, we never need to recover the value of an attribute + * once it has been set to null. + * + * Poking into the slot this way is a bit ugly, but the consensus is that the + * alternative was worse. + */ +static void +prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet) +{ + if (aggstate->phase->grouped_cols) + { + Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet]; + + aggstate->grouped_cols = grouped_cols; + + if (slot->tts_isempty) + { + /* + * Force all values to be NULL if working on an empty input tuple + * (i.e. an empty grouping set for which no input rows were + * supplied). + */ + ExecStoreAllNullTuple(slot); + } + else if (aggstate->all_grouped_cols) + { + ListCell *lc; + + /* all_grouped_cols is arranged in desc order */ + slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols)); + + foreach(lc, aggstate->all_grouped_cols) + { + int attnum = lfirst_int(lc); + + if (!bms_is_member(attnum, grouped_cols)) + slot->tts_isnull[attnum - 1] = true; + } + } + } +} + +/* + * Compute the final value of all aggregates for one group. + * + * This function handles only one grouping set at a time. + * + * Results are stored in the output econtext aggvalues/aggnulls. 
+ */ +static void +finalize_aggregates(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroup, + int currentSet) +{ + ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; + Datum *aggvalues = econtext->ecxt_aggvalues; + bool *aggnulls = econtext->ecxt_aggnulls; + int aggno; + + Assert(currentSet == 0 || + ((Agg *) aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); + + aggstate->current_set = currentSet; + + for (aggno = 0; aggno < aggstate->numaggs; aggno++) + { + AggStatePerAgg peraggstate = &peragg[aggno]; + AggStatePerGroup pergroupstate; + + pergroupstate = &pergroup[aggno + (currentSet * (aggstate->numaggs))]; + + if (peraggstate->numSortCols > 0) + { + Assert(((Agg *) aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); + + if (peraggstate->numInputs == 1) + process_ordered_aggregate_single(aggstate, + peraggstate, + pergroupstate); + else + process_ordered_aggregate_multi(aggstate, + peraggstate, + pergroupstate); + } + + finalize_aggregate(aggstate, peraggstate, pergroupstate, + &aggvalues[aggno], &aggnulls[aggno]); + } +} + +/* + * Project the result of a group (whose aggs have already been calculated by + * finalize_aggregates). Returns the result slot, or NULL if no row is + * projected (suppressed by qual or by an empty SRF). + */ +static TupleTableSlot * +project_aggregates(AggState *aggstate) +{ + ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; + + /* + * Check the qual (HAVING clause); if the group does not match, ignore + * it. + */ + if (ExecQual(aggstate->ss.ps.qual, econtext, false)) + { + /* + * Form and return or store a projection tuple using the aggregate + * results and the representative input tuple. 
+ */ + ExprDoneCond isDone; + TupleTableSlot *result; + + result = ExecProject(aggstate->ss.ps.ps_ProjInfo, &isDone); + + if (isDone != ExprEndResult) + { + aggstate->ss.ps.ps_TupFromTlist = + (isDone == ExprMultipleResult); + return result; + } + } + else + InstrCountFiltered1(aggstate, 1); + + return NULL; +} + /* * find_unaggregated_cols * Construct a bitmapset of the column numbers of un-aggregated Vars @@ -916,8 +1286,11 @@ find_unaggregated_cols_walker(Node *node, Bitmapset **colnos) *colnos = bms_add_member(*colnos, var->varattno); return false; } - if (IsA(node, Aggref)) /* do not descend into aggregate exprs */ + if (IsA(node, Aggref) || IsA(node, GroupingFunc)) + { + /* do not descend into aggregate exprs */ return false; + } return expression_tree_walker(node, find_unaggregated_cols_walker, (void *) colnos); } @@ -942,11 +1315,11 @@ build_hash_table(AggState *aggstate) aggstate->hashtable = BuildTupleHashTable(node->numCols, node->grpColIdx, - aggstate->eqfunctions, + aggstate->phase->eqfunctions, aggstate->hashfunctions, node->numGroups, entrysize, - aggstate->aggcontext, + aggstate->aggcontexts[0]->ecxt_per_tuple_memory, tmpmem); } @@ -1057,7 +1430,7 @@ lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot) if (isnew) { /* initialize aggregates for new tuple group */ - initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup); + initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup, 0); } return entry; @@ -1079,6 +1452,8 @@ lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot) TupleTableSlot * ExecAgg(AggState *node) { + TupleTableSlot *result; + /* * Check to see if we're still projecting out tuples from a previous agg * tuple (because there is a function-returning-set in the projection @@ -1086,7 +1461,6 @@ ExecAgg(AggState *node) */ if (node->ss.ps.ps_TupFromTlist) { - TupleTableSlot *result; ExprDoneCond isDone; result = ExecProject(node->ss.ps.ps_ProjInfo, &isDone); @@ -1097,22 +1471,30 @@ ExecAgg(AggState 
*node) } /* - * Exit if nothing left to do. (We must do the ps_TupFromTlist check - * first, because in some cases agg_done gets set before we emit the final - * aggregate tuple, and we have to finish running SRFs for it.) + * (We must do the ps_TupFromTlist check first, because in some cases + * agg_done gets set before we emit the final aggregate tuple, and we have + * to finish running SRFs for it.) */ - if (node->agg_done) - return NULL; - - /* Dispatch based on strategy */ - if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) + if (!node->agg_done) { - if (!node->table_filled) - agg_fill_hash_table(node); - return agg_retrieve_hash_table(node); + /* Dispatch based on strategy */ + switch (node->phase->aggnode->aggstrategy) + { + case AGG_HASHED: + if (!node->table_filled) + agg_fill_hash_table(node); + result = agg_retrieve_hash_table(node); + break; + default: + result = agg_retrieve_direct(node); + break; + } + + if (!TupIsNull(result)) + return result; } - else - return agg_retrieve_direct(node); + + return NULL; } /* @@ -1121,28 +1503,30 @@ ExecAgg(AggState *node) static TupleTableSlot * agg_retrieve_direct(AggState *aggstate) { - Agg *node = (Agg *) aggstate->ss.ps.plan; - PlanState *outerPlan; + Agg *node = aggstate->phase->aggnode; ExprContext *econtext; ExprContext *tmpcontext; - Datum *aggvalues; - bool *aggnulls; AggStatePerAgg peragg; AggStatePerGroup pergroup; TupleTableSlot *outerslot; TupleTableSlot *firstSlot; - int aggno; + TupleTableSlot *result; + bool hasGroupingSets = aggstate->phase->numsets > 0; + int numGroupingSets = Max(aggstate->phase->numsets, 1); + int currentSet; + int nextSetSize; + int numReset; + int i; /* * get state info from node + * + * econtext is the per-output-tuple expression context + * tmpcontext is the per-input-tuple expression context */ - outerPlan = outerPlanState(aggstate); - /* econtext is the per-output-tuple expression context */ econtext = aggstate->ss.ps.ps_ExprContext; - aggvalues = 
econtext->ecxt_aggvalues; - aggnulls = econtext->ecxt_aggnulls; - /* tmpcontext is the per-input-tuple expression context */ tmpcontext = aggstate->tmpcontext; + peragg = aggstate->peragg; pergroup = aggstate->pergroup; firstSlot = aggstate->ss.ss_ScanTupleSlot; @@ -1150,172 +1534,281 @@ agg_retrieve_direct(AggState *aggstate) /* * We loop retrieving groups until we find one matching * aggstate->ss.ps.qual + * + * For grouping sets, we have the invariant that aggstate->projected_set + * is either -1 (initial call) or the index (starting from 0) in + * gset_lengths for the group we just completed (either by projecting a + * row or by discarding it in the qual). */ while (!aggstate->agg_done) { /* - * If we don't already have the first tuple of the new group, fetch it - * from the outer plan. + * Clear the per-output-tuple context for each group, as well as the + * aggcontexts of the grouping sets being reset (which contain any + * pass-by-ref transvalues of the old group). Some aggregate functions + * store working state in child contexts; those now get reset + * automatically without us needing to do anything special. + * + * We use ReScanExprContext not just ResetExprContext because we want + * any registered shutdown callbacks to be called. That allows + * aggregate functions to ensure they've cleaned up any non-memory + * resources. + */ + ReScanExprContext(econtext); + + /* + * Determine how many grouping sets need to be reset at this boundary. */ - if (aggstate->grp_firstTuple == NULL) + if (aggstate->projected_set >= 0 && + aggstate->projected_set < numGroupingSets) + numReset = aggstate->projected_set + 1; + else + numReset = numGroupingSets; + + /* + * numReset can change on a phase boundary, but that's OK; we want to + * reset the contexts used in _this_ phase, and later, after possibly + * changing phase, initialize the right number of aggregates for the + * _new_ phase.
+ */ + + for (i = 0; i < numReset; i++) + { + ReScanExprContext(aggstate->aggcontexts[i]); + } + + /* + * Check if input is complete and there are no more groups to project + * in this phase; move to next phase or mark as done. + */ + if (aggstate->input_done == true && + aggstate->projected_set >= (numGroupingSets - 1)) { - outerslot = ExecProcNode(outerPlan); - if (!TupIsNull(outerslot)) + if (aggstate->current_phase < aggstate->numphases - 1) { - /* - * Make a copy of the first input tuple; we will use this for - * comparisons (in group mode) and for projection. - */ - aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); + initialize_phase(aggstate, aggstate->current_phase + 1); + aggstate->input_done = false; + aggstate->projected_set = -1; + numGroupingSets = Max(aggstate->phase->numsets, 1); + node = aggstate->phase->aggnode; + numReset = numGroupingSets; } else { - /* outer plan produced no tuples at all */ aggstate->agg_done = true; - /* If we are grouping, we should produce no tuples too */ - if (node->aggstrategy != AGG_PLAIN) - return NULL; + break; } } /* - * Clear the per-output-tuple context for each group, as well as - * aggcontext (which contains any pass-by-ref transvalues of the old - * group). We also clear any child contexts of the aggcontext; some - * aggregate functions store working state in such contexts. - * - * We use ReScanExprContext not just ResetExprContext because we want - * any registered shutdown callbacks to be called. That allows - * aggregate functions to ensure they've cleaned up any non-memory - * resources. + * Get the number of columns in the next grouping set after the last + * projected one (if any). This is the number of columns to compare to + * see if we reached the boundary of that set too. 
*/ - ReScanExprContext(econtext); - - MemoryContextResetAndDeleteChildren(aggstate->aggcontext); + if (aggstate->projected_set >= 0 && + aggstate->projected_set < (numGroupingSets - 1)) + nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1]; + else + nextSetSize = 0; - /* - * Initialize working state for a new input tuple group + /*- + * If a subgroup for the current grouping set is present, project it. + * + * We have a new group if: + * - we're out of input but haven't projected all grouping sets + * (checked above) + * OR + * - we already projected a row that wasn't from the last grouping + * set + * AND + * - the next grouping set has at least one grouping column (since + * empty grouping sets project only once input is exhausted) + * AND + * - the previous and pending rows differ on the grouping columns + * of the next grouping set */ - initialize_aggregates(aggstate, peragg, pergroup); + if (aggstate->input_done || + (node->aggstrategy == AGG_SORTED && + aggstate->projected_set != -1 && + aggstate->projected_set < (numGroupingSets - 1) && + nextSetSize > 0 && + !execTuplesMatch(econtext->ecxt_outertuple, + tmpcontext->ecxt_outertuple, + nextSetSize, + node->grpColIdx, + aggstate->phase->eqfunctions, + tmpcontext->ecxt_per_tuple_memory))) + { + aggstate->projected_set += 1; - if (aggstate->grp_firstTuple != NULL) + Assert(aggstate->projected_set < numGroupingSets); + Assert(nextSetSize > 0 || aggstate->input_done); + } + else { /* - * Store the copied first input tuple in the tuple table slot - * reserved for it. The tuple will be deleted when it is cleared - * from the slot. + * We no longer care what group we just projected, the next + * projection will always be the first (or only) grouping set + * (unless the input proves to be empty). 
*/ - ExecStoreTuple(aggstate->grp_firstTuple, - firstSlot, - InvalidBuffer, - true); - aggstate->grp_firstTuple = NULL; /* don't keep two pointers */ - - /* set up for first advance_aggregates call */ - tmpcontext->ecxt_outertuple = firstSlot; + aggstate->projected_set = 0; /* - * Process each outer-plan tuple, and then fetch the next one, - * until we exhaust the outer plan or cross a group boundary. + * If we don't already have the first tuple of the new group, + * fetch it from the outer plan. */ - for (;;) + if (aggstate->grp_firstTuple == NULL) { - advance_aggregates(aggstate, pergroup); - - /* Reset per-input-tuple context after each tuple */ - ResetExprContext(tmpcontext); - - outerslot = ExecProcNode(outerPlan); - if (TupIsNull(outerslot)) + outerslot = fetch_input_tuple(aggstate); + if (!TupIsNull(outerslot)) { - /* no more outer-plan tuples available */ - aggstate->agg_done = true; - break; + /* + * Make a copy of the first input tuple; we will use this + * for comparisons (in group mode) and for projection. + */ + aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); } - /* set up for next advance_aggregates call */ - tmpcontext->ecxt_outertuple = outerslot; - - /* - * If we are grouping, check whether we've crossed a group - * boundary. - */ - if (node->aggstrategy == AGG_SORTED) + else { - if (!execTuplesMatch(firstSlot, - outerslot, - node->numCols, node->grpColIdx, - aggstate->eqfunctions, - tmpcontext->ecxt_per_tuple_memory)) + /* outer plan produced no tuples at all */ + if (hasGroupingSets) { /* - * Save the first input tuple of the next group. + * If there was no input at all, we need to project + * rows only if there are grouping sets of size 0. + * Note that this implies that there can't be any + * references to ungrouped Vars, which would otherwise + * cause issues with the empty output slot. + * + * XXX: This is no longer true, we currently deal with + * this in finalize_aggregates(). 
*/ - aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); - break; + aggstate->input_done = true; + + while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0) + { + aggstate->projected_set += 1; + if (aggstate->projected_set >= numGroupingSets) + { + /* + * We can't set agg_done here because we might + * have more phases to do, even though the + * input is empty. So we need to restart the + * whole outer loop. + */ + break; + } + } + + if (aggstate->projected_set >= numGroupingSets) + continue; + } + else + { + aggstate->agg_done = true; + /* If we are grouping, we should produce no tuples too */ + if (node->aggstrategy != AGG_PLAIN) + return NULL; } } } - } - /* - * Use the representative input tuple for any references to - * non-aggregated input columns in aggregate direct args, the node - * qual, and the tlist. (If we are not grouping, and there are no - * input rows at all, we will come here with an empty firstSlot ... - * but if not grouping, there can't be any references to - * non-aggregated input columns, so no problem.) - */ - econtext->ecxt_outertuple = firstSlot; - - /* - * Done scanning input tuple group. Finalize each aggregate - * calculation, and stash results in the per-output-tuple context. - */ - for (aggno = 0; aggno < aggstate->numaggs; aggno++) - { - AggStatePerAgg peraggstate = &peragg[aggno]; - AggStatePerGroup pergroupstate = &pergroup[aggno]; + /* + * Initialize working state for a new input tuple group. + */ + initialize_aggregates(aggstate, peragg, pergroup, numReset); - if (peraggstate->numSortCols > 0) + if (aggstate->grp_firstTuple != NULL) { - if (peraggstate->numInputs == 1) - process_ordered_aggregate_single(aggstate, - peraggstate, - pergroupstate); - else - process_ordered_aggregate_multi(aggstate, - peraggstate, - pergroupstate); - } + /* + * Store the copied first input tuple in the tuple table slot + * reserved for it. The tuple will be deleted when it is + * cleared from the slot. 
+ */ + ExecStoreTuple(aggstate->grp_firstTuple, + firstSlot, + InvalidBuffer, + true); + aggstate->grp_firstTuple = NULL; /* don't keep two pointers */ - finalize_aggregate(aggstate, peraggstate, pergroupstate, - &aggvalues[aggno], &aggnulls[aggno]); - } + /* set up for first advance_aggregates call */ + tmpcontext->ecxt_outertuple = firstSlot; - /* - * Check the qual (HAVING clause); if the group does not match, ignore - * it and loop back to try to process another group. - */ - if (ExecQual(aggstate->ss.ps.qual, econtext, false)) - { - /* - * Form and return a projection tuple using the aggregate results - * and the representative input tuple. - */ - TupleTableSlot *result; - ExprDoneCond isDone; + /* + * Process each outer-plan tuple, and then fetch the next one, + * until we exhaust the outer plan or cross a group boundary. + */ + for (;;) + { + advance_aggregates(aggstate, pergroup); - result = ExecProject(aggstate->ss.ps.ps_ProjInfo, &isDone); + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(tmpcontext); - if (isDone != ExprEndResult) - { - aggstate->ss.ps.ps_TupFromTlist = - (isDone == ExprMultipleResult); - return result; + outerslot = fetch_input_tuple(aggstate); + if (TupIsNull(outerslot)) + { + /* no more outer-plan tuples available */ + if (hasGroupingSets) + { + aggstate->input_done = true; + break; + } + else + { + aggstate->agg_done = true; + break; + } + } + /* set up for next advance_aggregates call */ + tmpcontext->ecxt_outertuple = outerslot; + + /* + * If we are grouping, check whether we've crossed a group + * boundary. 
+ */ + if (node->aggstrategy == AGG_SORTED) + { + if (!execTuplesMatch(firstSlot, + outerslot, + node->numCols, + node->grpColIdx, + aggstate->phase->eqfunctions, + tmpcontext->ecxt_per_tuple_memory)) + { + aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot); + break; + } + } + } } + + /* + * Use the representative input tuple for any references to + * non-aggregated input columns in aggregate direct args, the node + * qual, and the tlist. (If we are not grouping, and there are no + * input rows at all, we will come here with an empty firstSlot ... + * but if not grouping, there can't be any references to + * non-aggregated input columns, so no problem.) + */ + econtext->ecxt_outertuple = firstSlot; } - else - InstrCountFiltered1(aggstate, 1); + + Assert(aggstate->projected_set >= 0); + + currentSet = aggstate->projected_set; + + prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet); + + finalize_aggregates(aggstate, peragg, pergroup, currentSet); + + /* + * If there's no row to project right now, we must continue rather than + * returning a null since there might be more groups. 
+ */ + result = project_aggregates(aggstate); + if (result) + return result; } /* No more groups */ @@ -1328,16 +1821,15 @@ agg_retrieve_direct(AggState *aggstate) static void agg_fill_hash_table(AggState *aggstate) { - PlanState *outerPlan; ExprContext *tmpcontext; AggHashEntry entry; TupleTableSlot *outerslot; /* * get state info from node + * + * tmpcontext is the per-input-tuple expression context */ - outerPlan = outerPlanState(aggstate); - /* tmpcontext is the per-input-tuple expression context */ tmpcontext = aggstate->tmpcontext; /* @@ -1346,7 +1838,7 @@ agg_fill_hash_table(AggState *aggstate) */ for (;;) { - outerslot = ExecProcNode(outerPlan); + outerslot = fetch_input_tuple(aggstate); if (TupIsNull(outerslot)) break; /* set up for advance_aggregates call */ @@ -1374,21 +1866,17 @@ static TupleTableSlot * agg_retrieve_hash_table(AggState *aggstate) { ExprContext *econtext; - Datum *aggvalues; - bool *aggnulls; AggStatePerAgg peragg; AggStatePerGroup pergroup; AggHashEntry entry; TupleTableSlot *firstSlot; - int aggno; + TupleTableSlot *result; /* * get state info from node */ /* econtext is the per-output-tuple expression context */ econtext = aggstate->ss.ps.ps_ExprContext; - aggvalues = econtext->ecxt_aggvalues; - aggnulls = econtext->ecxt_aggnulls; peragg = aggstate->peragg; firstSlot = aggstate->ss.ss_ScanTupleSlot; @@ -1428,19 +1916,7 @@ agg_retrieve_hash_table(AggState *aggstate) pergroup = entry->pergroup; - /* - * Finalize each aggregate calculation, and stash results in the - * per-output-tuple context. 
- */ - for (aggno = 0; aggno < aggstate->numaggs; aggno++) - { - AggStatePerAgg peraggstate = &peragg[aggno]; - AggStatePerGroup pergroupstate = &pergroup[aggno]; - - Assert(peraggstate->numSortCols == 0); - finalize_aggregate(aggstate, peraggstate, pergroupstate, - &aggvalues[aggno], &aggnulls[aggno]); - } + finalize_aggregates(aggstate, peragg, pergroup, 0); /* * Use the representative input tuple for any references to @@ -1448,30 +1924,9 @@ agg_retrieve_hash_table(AggState *aggstate) */ econtext->ecxt_outertuple = firstSlot; - /* - * Check the qual (HAVING clause); if the group does not match, ignore - * it and loop back to try to process another group. - */ - if (ExecQual(aggstate->ss.ps.qual, econtext, false)) - { - /* - * Form and return a projection tuple using the aggregate results - * and the representative input tuple. - */ - TupleTableSlot *result; - ExprDoneCond isDone; - - result = ExecProject(aggstate->ss.ps.ps_ProjInfo, &isDone); - - if (isDone != ExprEndResult) - { - aggstate->ss.ps.ps_TupFromTlist = - (isDone == ExprMultipleResult); - return result; - } - } - else - InstrCountFiltered1(aggstate, 1); + result = project_aggregates(aggstate); + if (result) + return result; } /* No more groups */ @@ -1494,7 +1949,14 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) ExprContext *econtext; int numaggs, aggno; + int phase; ListCell *l; + Bitmapset *all_grouped_cols = NULL; + int numGroupingSets = 1; + int numPhases; + int currentsortno = 0; + int i = 0; + int j = 0; /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); @@ -1508,38 +1970,68 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->aggs = NIL; aggstate->numaggs = 0; - aggstate->eqfunctions = NULL; + aggstate->maxsets = 0; aggstate->hashfunctions = NULL; + aggstate->projected_set = -1; + aggstate->current_set = 0; aggstate->peragg = NULL; aggstate->curperagg = NULL; aggstate->agg_done = false; + aggstate->input_done = false; aggstate->pergroup 
= NULL; aggstate->grp_firstTuple = NULL; aggstate->hashtable = NULL; + aggstate->sort_in = NULL; + aggstate->sort_out = NULL; /* - * Create expression contexts. We need two, one for per-input-tuple - * processing and one for per-output-tuple processing. We cheat a little - * by using ExecAssignExprContext() to build both. + * Calculate the maximum number of grouping sets in any phase; this + * determines the size of some allocations. */ - ExecAssignExprContext(estate, &aggstate->ss.ps); - aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext; - ExecAssignExprContext(estate, &aggstate->ss.ps); + if (node->groupingSets) + { + Assert(node->aggstrategy != AGG_HASHED); + + numGroupingSets = list_length(node->groupingSets); + + foreach(l, node->chain) + { + Agg *agg = lfirst(l); + + numGroupingSets = Max(numGroupingSets, + list_length(agg->groupingSets)); + } + } + + aggstate->maxsets = numGroupingSets; + aggstate->numphases = numPhases = 1 + list_length(node->chain); + + aggstate->aggcontexts = (ExprContext **) + palloc0(sizeof(ExprContext *) * numGroupingSets); /* - * We also need a long-lived memory context for holding hashtable data - * structures and transition values. NOTE: the details of what is stored - * in aggcontext and what is stored in the regular per-query memory - * context are driven by a simple decision: we want to reset the - * aggcontext at group boundaries (if not hashing) and in ExecReScanAgg to - * recover no-longer-wanted space. + * Create expression contexts. We need three or more, one for + * per-input-tuple processing, one for per-output-tuple processing, and + * one for each grouping set. The per-tuple memory context of the + * per-grouping-set ExprContexts (aggcontexts) replaces the standalone + * memory context formerly used to hold transition values. We cheat a + * little by using ExecAssignExprContext() to build all of them. 
+ * + * NOTE: the details of what is stored in aggcontexts and what is stored + * in the regular per-query memory context are driven by a simple + * decision: we want to reset the aggcontext at group boundaries (if not + * hashing) and in ExecReScanAgg to recover no-longer-wanted space. */ - aggstate->aggcontext = - AllocSetContextCreate(CurrentMemoryContext, - "AggContext", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + ExecAssignExprContext(estate, &aggstate->ss.ps); + aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext; + + for (i = 0; i < numGroupingSets; ++i) + { + ExecAssignExprContext(estate, &aggstate->ss.ps); + aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext; + } + + ExecAssignExprContext(estate, &aggstate->ss.ps); /* * tuple table initialization @@ -1547,6 +2039,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) ExecInitScanTupleSlot(estate, &aggstate->ss); ExecInitResultTupleSlot(estate, &aggstate->ss.ps); aggstate->hashslot = ExecInitExtraTupleSlot(estate); + aggstate->sort_slot = ExecInitExtraTupleSlot(estate); /* * initialize child expressions @@ -1565,7 +2058,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) (PlanState *) aggstate); /* - * initialize child nodes + * Initialize child nodes. * * If we are doing a hashed aggregation then the child plan does not need * to handle REWIND efficiently; see ExecReScanAgg. @@ -1579,6 +2072,9 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * initialize source tuple type. */ ExecAssignScanTypeFromOuterPlan(&aggstate->ss); + if (node->chain) + ExecSetSlotDescriptor(aggstate->sort_slot, + aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor); /* * Initialize result tuple type and projection info. @@ -1606,24 +2102,105 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) } /* - * If we are grouping, precompute fmgr lookup data for inner loop. We need - * both equality and hashing functions to do it by hashing, but only - * equality if not hashing. 
+ * For each phase, prepare grouping set data and fmgr lookup data for + * compare functions. Accumulate all_grouped_cols in passing. */ - if (node->numCols > 0) + + aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData)); + + for (phase = 0; phase < numPhases; ++phase) { - if (node->aggstrategy == AGG_HASHED) - execTuplesHashPrepare(node->numCols, - node->grpOperators, - &aggstate->eqfunctions, - &aggstate->hashfunctions); + AggStatePerPhase phasedata = &aggstate->phases[phase]; + Agg *aggnode; + Sort *sortnode; + int num_sets; + + if (phase > 0) + { + aggnode = list_nth(node->chain, phase-1); + sortnode = (Sort *) aggnode->plan.lefttree; + Assert(IsA(sortnode, Sort)); + } + else + { + aggnode = node; + sortnode = NULL; + } + + phasedata->numsets = num_sets = list_length(aggnode->groupingSets); + + if (num_sets) + { + phasedata->gset_lengths = palloc(num_sets * sizeof(int)); + phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *)); + + i = 0; + foreach(l, aggnode->groupingSets) + { + int current_length = list_length(lfirst(l)); + Bitmapset *cols = NULL; + + /* planner forces this to be correct */ + for (j = 0; j < current_length; ++j) + cols = bms_add_member(cols, aggnode->grpColIdx[j]); + + phasedata->grouped_cols[i] = cols; + phasedata->gset_lengths[i] = current_length; + ++i; + } + + all_grouped_cols = bms_add_members(all_grouped_cols, + phasedata->grouped_cols[0]); + } else - aggstate->eqfunctions = - execTuplesMatchPrepare(node->numCols, - node->grpOperators); + { + Assert(phase == 0); + + phasedata->gset_lengths = NULL; + phasedata->grouped_cols = NULL; + } + + /* + * If we are grouping, precompute fmgr lookup data for inner loop. 
+ */ + if (aggnode->aggstrategy == AGG_SORTED) + { + Assert(aggnode->numCols > 0); + + phasedata->eqfunctions = + execTuplesMatchPrepare(aggnode->numCols, + aggnode->grpOperators); + } + + phasedata->aggnode = aggnode; + phasedata->sortnode = sortnode; } /* + * Convert all_grouped_cols to a descending-order list. + */ + i = -1; + while ((i = bms_next_member(all_grouped_cols, i)) >= 0) + aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols); + + /* + * Hashing can only appear in the initial phase. + */ + + if (node->aggstrategy == AGG_HASHED) + execTuplesHashPrepare(node->numCols, + node->grpOperators, + &aggstate->phases[0].eqfunctions, + &aggstate->hashfunctions); + + /* + * Initialize current phase-dependent values to initial phase + */ + + aggstate->current_phase = 0; + initialize_phase(aggstate, 0); + + /* * Set up aggregate-result storage in the output expr context, and also * allocate my private per-agg working storage */ @@ -1645,7 +2222,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) { AggStatePerGroup pergroup; - pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs); + pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) + * numaggs + * numGroupingSets); + aggstate->pergroup = pergroup; } @@ -1708,7 +2288,11 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* Begin filling in the peraggstate data */ peraggstate->aggrefstate = aggrefstate; peraggstate->aggref = aggref; - peraggstate->sortstate = NULL; + peraggstate->sortstates =(Tuplesortstate**) + palloc0(sizeof(Tuplesortstate*) * numGroupingSets); + + for (currentsortno = 0; currentsortno < numGroupingSets; currentsortno++) + peraggstate->sortstates[currentsortno] = NULL; /* Fetch the pg_aggregate row */ aggTuple = SearchSysCache1(AGGFNOID, @@ -2016,31 +2600,41 @@ ExecEndAgg(AggState *node) { PlanState *outerPlan; int aggno; + int numGroupingSets = Max(node->maxsets, 1); + int setno; /* Make sure we have closed any open tuplesorts */ + + 
if (node->sort_in) + tuplesort_end(node->sort_in); + if (node->sort_out) + tuplesort_end(node->sort_out); + for (aggno = 0; aggno < node->numaggs; aggno++) { AggStatePerAgg peraggstate = &node->peragg[aggno]; - if (peraggstate->sortstate) - tuplesort_end(peraggstate->sortstate); + for (setno = 0; setno < numGroupingSets; setno++) + { + if (peraggstate->sortstates[setno]) + tuplesort_end(peraggstate->sortstates[setno]); + } } /* And ensure any agg shutdown callbacks have been called */ - ReScanExprContext(node->ss.ps.ps_ExprContext); + for (setno = 0; setno < numGroupingSets; setno++) + ReScanExprContext(node->aggcontexts[setno]); /* - * Free both the expr contexts. + * We don't actually free any ExprContexts here (see comment in + * ExecFreeExprContext), just unlinking the output one from the plan node + * suffices. */ ExecFreeExprContext(&node->ss.ps); - node->ss.ps.ps_ExprContext = node->tmpcontext; - ExecFreeExprContext(&node->ss.ps); /* clean up tuple table */ ExecClearTuple(node->ss.ss_ScanTupleSlot); - MemoryContextDelete(node->aggcontext); - outerPlan = outerPlanState(node); ExecEndNode(outerPlan); } @@ -2050,13 +2644,16 @@ ExecReScanAgg(AggState *node) { ExprContext *econtext = node->ss.ps.ps_ExprContext; PlanState *outerPlan = outerPlanState(node); + Agg *aggnode = (Agg *) node->ss.ps.plan; int aggno; + int numGroupingSets = Max(node->maxsets, 1); + int setno; node->agg_done = false; node->ss.ps.ps_TupFromTlist = false; - if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) + if (aggnode->aggstrategy == AGG_HASHED) { /* * In the hashed case, if we haven't yet built the hash table then we @@ -2082,14 +2679,34 @@ ExecReScanAgg(AggState *node) /* Make sure we have closed any open tuplesorts */ for (aggno = 0; aggno < node->numaggs; aggno++) { - AggStatePerAgg peraggstate = &node->peragg[aggno]; + for (setno = 0; setno < numGroupingSets; setno++) + { + AggStatePerAgg peraggstate = &node->peragg[aggno]; - if (peraggstate->sortstate) - 
tuplesort_end(peraggstate->sortstate); - peraggstate->sortstate = NULL; + if (peraggstate->sortstates[setno]) + { + tuplesort_end(peraggstate->sortstates[setno]); + peraggstate->sortstates[setno] = NULL; + } + } } - /* We don't need to ReScanExprContext here; ExecReScan already did it */ + /* + * We don't need to ReScanExprContext the output tuple context here; + * ExecReScan already did it. But we do need to reset our per-grouping-set + * contexts, which may have transvalues stored in them. (We use rescan + * rather than just reset because transfns may have registered callbacks + * that need to be run now.) + * + * Note that with AGG_HASHED, the hash table is allocated in a sub-context + * of the aggcontext. This used to be an issue, but now, resetting a + * context automatically deletes sub-contexts too. + */ + + for (setno = 0; setno < numGroupingSets; setno++) + { + ReScanExprContext(node->aggcontexts[setno]); + } /* Release first tuple of group, if we have made a copy */ if (node->grp_firstTuple != NULL) @@ -2097,21 +2714,13 @@ ExecReScanAgg(AggState *node) heap_freetuple(node->grp_firstTuple); node->grp_firstTuple = NULL; } + ExecClearTuple(node->ss.ss_ScanTupleSlot); /* Forget current agg values */ MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs); MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs); - /* - * Release all temp storage. Note that with AGG_HASHED, the hash table is - * allocated in a sub-context of the aggcontext. We're going to rebuild - * the hash table from scratch, so we need to use - * MemoryContextResetAndDeleteChildren() to avoid leaking the old hash - * table's memory context header. 
- */ - MemoryContextResetAndDeleteChildren(node->aggcontext); - - if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) + if (aggnode->aggstrategy == AGG_HASHED) { /* Rebuild an empty hash table */ build_hash_table(node); @@ -2123,13 +2732,15 @@ ExecReScanAgg(AggState *node) * Reset the per-group state (in particular, mark transvalues null) */ MemSet(node->pergroup, 0, - sizeof(AggStatePerGroupData) * node->numaggs); + sizeof(AggStatePerGroupData) * node->numaggs * numGroupingSets); + + /* reset to phase 0 */ + initialize_phase(node, 0); + + node->input_done = false; + node->projected_set = -1; } - /* - * if chgParam of subnode is not null then plan will be re-scanned by - * first ExecProcNode. - */ if (outerPlan->chgParam == NULL) ExecReScan(outerPlan); } @@ -2151,8 +2762,11 @@ ExecReScanAgg(AggState *node) * values could conceivably appear in future.) * * If aggcontext isn't NULL, the function also stores at *aggcontext the - * identity of the memory context that aggregate transition values are - * being stored in. + * identity of the memory context that aggregate transition values are being + * stored in. Note that the same aggregate call site (flinfo) may be called + * interleaved on different transition values in different contexts, so it's + * not kosher to cache aggcontext under fn_extra. It is, however, kosher to + * cache it in the transvalue itself (for internal-type transvalues). 
*/ int AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext) @@ -2160,7 +2774,11 @@ AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext) if (fcinfo->context && IsA(fcinfo->context, AggState)) { if (aggcontext) - *aggcontext = ((AggState *) fcinfo->context)->aggcontext; + { + AggState *aggstate = ((AggState *) fcinfo->context); + ExprContext *cxt = aggstate->aggcontexts[aggstate->current_set]; + *aggcontext = cxt->ecxt_per_tuple_memory; + } return AGG_CONTEXT_AGGREGATE; } if (fcinfo->context && IsA(fcinfo->context, WindowAggState)) @@ -2244,8 +2862,9 @@ AggRegisterCallback(FunctionCallInfo fcinfo, if (fcinfo->context && IsA(fcinfo->context, AggState)) { AggState *aggstate = (AggState *) fcinfo->context; + ExprContext *cxt = aggstate->aggcontexts[aggstate->current_set]; - RegisterExprContextCallback(aggstate->ss.ps.ps_ExprContext, func, arg); + RegisterExprContextCallback(cxt, func, arg); return; } |