aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor')
-rw-r--r--src/backend/executor/execQual.c63
-rw-r--r--src/backend/executor/execUtils.c5
-rw-r--r--src/backend/executor/nodeAgg.c1315
3 files changed, 1033 insertions, 350 deletions
diff --git a/src/backend/executor/execQual.c b/src/backend/executor/execQual.c
index e5994112a42..d414e20f120 100644
--- a/src/backend/executor/execQual.c
+++ b/src/backend/executor/execQual.c
@@ -181,6 +181,9 @@ static Datum ExecEvalArrayCoerceExpr(ArrayCoerceExprState *astate,
bool *isNull, ExprDoneCond *isDone);
static Datum ExecEvalCurrentOfExpr(ExprState *exprstate, ExprContext *econtext,
bool *isNull, ExprDoneCond *isDone);
+static Datum ExecEvalGroupingFuncExpr(GroupingFuncExprState *gstate,
+ ExprContext *econtext,
+ bool *isNull, ExprDoneCond *isDone);
/* ----------------------------------------------------------------
@@ -3016,6 +3019,44 @@ ExecEvalCaseTestExpr(ExprState *exprstate,
return econtext->caseValue_datum;
}
+/*
+ * ExecEvalGroupingFuncExpr
+ *
+ * Return a bitmask with a bit for each (unevaluated) argument expression
+ * (rightmost arg is least significant bit).
+ *
+ * A bit is set if the corresponding expression is NOT part of the set of
+ * grouping expressions in the current grouping set.
+ */
+static Datum
+ExecEvalGroupingFuncExpr(GroupingFuncExprState *gstate,
+ ExprContext *econtext,
+ bool *isNull,
+ ExprDoneCond *isDone)
+{
+ int result = 0;
+ int attnum = 0;
+ Bitmapset *grouped_cols = gstate->aggstate->grouped_cols;
+ ListCell *lc;
+
+ if (isDone)
+ *isDone = ExprSingleResult;
+
+ *isNull = false;
+
+ foreach(lc, (gstate->clauses))
+ {
+ attnum = lfirst_int(lc);
+
+ result = result << 1;
+
+ if (!bms_is_member(attnum, grouped_cols))
+ result = result | 1;
+ }
+
+ return (Datum) result;
+}
+
/* ----------------------------------------------------------------
* ExecEvalArray - ARRAY[] expressions
* ----------------------------------------------------------------
@@ -4482,6 +4523,28 @@ ExecInitExpr(Expr *node, PlanState *parent)
state = (ExprState *) astate;
}
break;
+ case T_GroupingFunc:
+ {
+ GroupingFunc *grp_node = (GroupingFunc *) node;
+ GroupingFuncExprState *grp_state = makeNode(GroupingFuncExprState);
+ Agg *agg = NULL;
+
+ if (!parent || !IsA(parent, AggState) || !IsA(parent->plan, Agg))
+ elog(ERROR, "parent of GROUPING is not Agg node");
+
+ grp_state->aggstate = (AggState *) parent;
+
+ agg = (Agg *) (parent->plan);
+
+ if (agg->groupingSets)
+ grp_state->clauses = grp_node->cols;
+ else
+ grp_state->clauses = NIL;
+
+ state = (ExprState *) grp_state;
+ state->evalfunc = (ExprStateEvalFunc) ExecEvalGroupingFuncExpr;
+ }
+ break;
case T_WindowFunc:
{
WindowFunc *wfunc = (WindowFunc *) node;
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 0da8e53e816..3963408b18c 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -642,9 +642,10 @@ get_last_attnums(Node *node, ProjectionInfo *projInfo)
/*
* Don't examine the arguments or filters of Aggrefs or WindowFuncs,
* because those do not represent expressions to be evaluated within the
- * overall targetlist's econtext.
+ * overall targetlist's econtext. GroupingFunc arguments are never
+ * evaluated at all.
*/
- if (IsA(node, Aggref))
+ if (IsA(node, Aggref) || IsA(node, GroupingFunc))
return false;
if (IsA(node, WindowFunc))
return false;
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index fcb61177c51..01a1e67f09e 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -45,15 +45,19 @@
* needed to allow resolution of a polymorphic aggregate's result type.
*
* We compute aggregate input expressions and run the transition functions
- * in a temporary econtext (aggstate->tmpcontext). This is reset at
- * least once per input tuple, so when the transvalue datatype is
+ * in a temporary econtext (aggstate->tmpcontext). This is reset at least
+ * once per input tuple, so when the transvalue datatype is
* pass-by-reference, we have to be careful to copy it into a longer-lived
- * memory context, and free the prior value to avoid memory leakage.
- * We store transvalues in the memory context aggstate->aggcontext,
- * which is also used for the hashtable structures in AGG_HASHED mode.
- * The node's regular econtext (aggstate->ss.ps.ps_ExprContext)
- * is used to run finalize functions and compute the output tuple;
- * this context can be reset once per output tuple.
+ * memory context, and free the prior value to avoid memory leakage. We
+ * store transvalues in another set of econtexts, aggstate->aggcontexts
+ * (one per grouping set, see below), which are also used for the hashtable
+ * structures in AGG_HASHED mode. These econtexts are rescanned, not just
+ * reset, at group boundaries so that aggregate transition functions can
+ * register shutdown callbacks via AggRegisterCallback.
+ *
+ * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
+ * run finalize functions and compute the output tuple; this context can be
+ * reset once per output tuple.
*
* The executor's AggState node is passed as the fmgr "context" value in
* all transfunc and finalfunc calls. It is not recommended that the
@@ -84,6 +88,36 @@
* need some fallback logic to use this, since there's no Aggref node
* for a window function.)
*
+ * Grouping sets:
+ *
+ * A list of grouping sets which is structurally equivalent to a ROLLUP
+ * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
+ * ordered data. We do this by keeping a separate set of transition values
+ * for each grouping set being concurrently processed; for each input tuple
+ * we update them all, and on group boundaries we reset those states
+ * (starting at the front of the list) whose grouping values have changed
+ * (the list of grouping sets is ordered from most specific to least
+ * specific).
+ *
+ * Where more complex grouping sets are used, we break them down into
+ * "phases", where each phase has a different sort order. During each
+ * phase but the last, the input tuples are additionally stored in a
+ * tuplesort which is keyed to the next phase's sort order; during each
+ * phase but the first, the input tuples are drawn from the previously
+ * sorted data. (The sorting of the data for the first phase is handled by
+ * the planner, as it might be satisfied by underlying nodes.)
+ *
+ * From the perspective of aggregate transition and final functions, the
+ * only issue regarding grouping sets is this: a single call site (flinfo)
+ * of an aggregate function may be used for updating several different
+ * transition values in turn. So the function must not cache in the flinfo
+ * anything which logically belongs as part of the transition value (most
+ * importantly, the memory context in which the transition value exists).
+ * The support API functions (AggCheckCallContext, AggRegisterCallback) are
+ * sensitive to the grouping set for which the aggregate function is
+ * currently being called.
+ *
+ * TODO: AGG_HASHED doesn't support multiple grouping sets yet.
*
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@@ -241,9 +275,11 @@ typedef struct AggStatePerAggData
* then at completion of the input tuple group, we scan the sorted values,
* eliminate duplicates if needed, and run the transition function on the
* rest.
+ *
+ * We need a separate tuplesort for each grouping set.
*/
- Tuplesortstate *sortstate; /* sort object, if DISTINCT or ORDER BY */
+ Tuplesortstate **sortstates; /* sort objects, if DISTINCT or ORDER BY */
/*
* This field is a pre-initialized FunctionCallInfo struct used for
@@ -287,6 +323,27 @@ typedef struct AggStatePerGroupData
} AggStatePerGroupData;
/*
+ * AggStatePerPhaseData - per-grouping-set-phase state
+ *
+ * Grouping sets are divided into "phases", where a single phase can be
+ * processed in one pass over the input. If there is more than one phase, then
+ * at the end of input from the current phase, state is reset and another pass
+ * is taken over the data, which has been re-sorted in the meantime.
+ *
+ * Accordingly, each phase specifies a list of grouping sets and group clause
+ * information, plus each phase after the first also has a sort order.
+ */
+typedef struct AggStatePerPhaseData
+{
+ int numsets; /* number of grouping sets (or 0) */
+ int *gset_lengths; /* lengths of grouping sets */
+ Bitmapset **grouped_cols; /* column groupings for rollup */
+ FmgrInfo *eqfunctions; /* per-grouping-field equality fns */
+ Agg *aggnode; /* Agg node for phase data */
+ Sort *sortnode; /* Sort node for input ordering for phase */
+} AggStatePerPhaseData;
+
+/*
* To implement hashed aggregation, we need a hashtable that stores a
* representative tuple and an array of AggStatePerGroup structs for each
* distinct set of GROUP BY column values. We compute the hash key from
@@ -302,9 +359,12 @@ typedef struct AggHashEntryData
} AggHashEntryData;
+static void initialize_phase(AggState *aggstate, int newphase);
+static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
static void initialize_aggregates(AggState *aggstate,
AggStatePerAgg peragg,
- AggStatePerGroup pergroup);
+ AggStatePerGroup pergroup,
+ int numReset);
static void advance_transition_function(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate);
@@ -319,6 +379,14 @@ static void finalize_aggregate(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate,
Datum *resultVal, bool *resultIsNull);
+static void prepare_projection_slot(AggState *aggstate,
+ TupleTableSlot *slot,
+ int currentSet);
+static void finalize_aggregates(AggState *aggstate,
+ AggStatePerAgg peragg,
+ AggStatePerGroup pergroup,
+ int currentSet);
+static TupleTableSlot *project_aggregates(AggState *aggstate);
static Bitmapset *find_unaggregated_cols(AggState *aggstate);
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
static void build_hash_table(AggState *aggstate);
@@ -331,46 +399,135 @@ static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
/*
- * Initialize all aggregates for a new group of input values.
- *
- * When called, CurrentMemoryContext should be the per-query context.
+ * Switch to phase "newphase", which must either be 0 (to reset) or
+ * current_phase + 1. Juggle the tuplesorts accordingly.
*/
static void
-initialize_aggregates(AggState *aggstate,
- AggStatePerAgg peragg,
- AggStatePerGroup pergroup)
+initialize_phase(AggState *aggstate, int newphase)
{
- int aggno;
+ Assert(newphase == 0 || newphase == aggstate->current_phase + 1);
- for (aggno = 0; aggno < aggstate->numaggs; aggno++)
+ /*
+ * Whatever the previous state, we're now done with whatever input
+ * tuplesort was in use.
+ */
+ if (aggstate->sort_in)
{
- AggStatePerAgg peraggstate = &peragg[aggno];
- AggStatePerGroup pergroupstate = &pergroup[aggno];
+ tuplesort_end(aggstate->sort_in);
+ aggstate->sort_in = NULL;
+ }
+ if (newphase == 0)
+ {
/*
- * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
+ * Discard any existing output tuplesort.
*/
- if (peraggstate->numSortCols > 0)
+ if (aggstate->sort_out)
{
- /*
- * In case of rescan, maybe there could be an uncompleted sort
- * operation? Clean it up if so.
- */
- if (peraggstate->sortstate)
- tuplesort_end(peraggstate->sortstate);
+ tuplesort_end(aggstate->sort_out);
+ aggstate->sort_out = NULL;
+ }
+ }
+ else
+ {
+ /*
+ * The old output tuplesort becomes the new input one, and this is the
+ * right time to actually sort it.
+ */
+ aggstate->sort_in = aggstate->sort_out;
+ aggstate->sort_out = NULL;
+ Assert(aggstate->sort_in);
+ tuplesort_performsort(aggstate->sort_in);
+ }
- /*
- * We use a plain Datum sorter when there's a single input column;
- * otherwise sort the full tuple. (See comments for
- * process_ordered_aggregate_single.)
- */
- peraggstate->sortstate =
- (peraggstate->numInputs == 1) ?
+ /*
+ * If this isn't the last phase, we need to sort appropriately for the next
+ * phase in sequence.
+ */
+ if (newphase < aggstate->numphases - 1)
+ {
+ Sort *sortnode = aggstate->phases[newphase+1].sortnode;
+ PlanState *outerNode = outerPlanState(aggstate);
+ TupleDesc tupDesc = ExecGetResultType(outerNode);
+
+ aggstate->sort_out = tuplesort_begin_heap(tupDesc,
+ sortnode->numCols,
+ sortnode->sortColIdx,
+ sortnode->sortOperators,
+ sortnode->collations,
+ sortnode->nullsFirst,
+ work_mem,
+ false);
+ }
+
+ aggstate->current_phase = newphase;
+ aggstate->phase = &aggstate->phases[newphase];
+}
+
+/*
+ * Fetch a tuple from either the outer plan (for phase 0) or from the sorter
+ * populated by the previous phase. Copy it to the sorter for the next phase
+ * if any.
+ */
+static TupleTableSlot *
+fetch_input_tuple(AggState *aggstate)
+{
+ TupleTableSlot *slot;
+
+ if (aggstate->sort_in)
+ {
+ if (!tuplesort_gettupleslot(aggstate->sort_in, true, aggstate->sort_slot))
+ return NULL;
+ slot = aggstate->sort_slot;
+ }
+ else
+ slot = ExecProcNode(outerPlanState(aggstate));
+
+ if (!TupIsNull(slot) && aggstate->sort_out)
+ tuplesort_puttupleslot(aggstate->sort_out, slot);
+
+ return slot;
+}
+
+/*
+ * (Re)Initialize an individual aggregate.
+ *
+ * This function handles only one grouping set (already set in
+ * aggstate->current_set).
+ *
+ * When called, CurrentMemoryContext should be the per-query context.
+ */
+static void
+initialize_aggregate(AggState *aggstate, AggStatePerAgg peraggstate,
+ AggStatePerGroup pergroupstate)
+{
+ /*
+ * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
+ */
+ if (peraggstate->numSortCols > 0)
+ {
+ /*
+ * In case of rescan, maybe there could be an uncompleted sort
+ * operation? Clean it up if so.
+ */
+ if (peraggstate->sortstates[aggstate->current_set])
+ tuplesort_end(peraggstate->sortstates[aggstate->current_set]);
+
+
+ /*
+ * We use a plain Datum sorter when there's a single input column;
+ * otherwise sort the full tuple. (See comments for
+ * process_ordered_aggregate_single.)
+ */
+ if (peraggstate->numInputs == 1)
+ peraggstate->sortstates[aggstate->current_set] =
tuplesort_begin_datum(peraggstate->evaldesc->attrs[0]->atttypid,
peraggstate->sortOperators[0],
peraggstate->sortCollations[0],
peraggstate->sortNullsFirst[0],
- work_mem, false) :
+ work_mem, false);
+ else
+ peraggstate->sortstates[aggstate->current_set] =
tuplesort_begin_heap(peraggstate->evaldesc,
peraggstate->numSortCols,
peraggstate->sortColIdx,
@@ -378,41 +535,83 @@ initialize_aggregates(AggState *aggstate,
peraggstate->sortCollations,
peraggstate->sortNullsFirst,
work_mem, false);
- }
+ }
- /*
- * (Re)set transValue to the initial value.
- *
- * Note that when the initial value is pass-by-ref, we must copy it
- * (into the aggcontext) since we will pfree the transValue later.
- */
- if (peraggstate->initValueIsNull)
- pergroupstate->transValue = peraggstate->initValue;
- else
+ /*
+ * (Re)set transValue to the initial value.
+ *
+ * Note that when the initial value is pass-by-ref, we must copy
+ * it (into the aggcontext) since we will pfree the transValue
+ * later.
+ */
+ if (peraggstate->initValueIsNull)
+ pergroupstate->transValue = peraggstate->initValue;
+ else
+ {
+ MemoryContext oldContext;
+
+ oldContext = MemoryContextSwitchTo(
+ aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
+ pergroupstate->transValue = datumCopy(peraggstate->initValue,
+ peraggstate->transtypeByVal,
+ peraggstate->transtypeLen);
+ MemoryContextSwitchTo(oldContext);
+ }
+ pergroupstate->transValueIsNull = peraggstate->initValueIsNull;
+
+ /*
+ * If the initial value for the transition state doesn't exist in
+ * the pg_aggregate table then we will let the first non-NULL
+ * value returned from the outer procNode become the initial
+ * value. (This is useful for aggregates like max() and min().)
+ * The noTransValue flag signals that we still need to do this.
+ */
+ pergroupstate->noTransValue = peraggstate->initValueIsNull;
+}
+
+/*
+ * Initialize all aggregates for a new group of input values.
+ *
+ * If there are multiple grouping sets, we initialize only the first numReset
+ * of them (the grouping sets are ordered so that the most specific one, which
+ * is reset most often, is first). As a convenience, if numReset is < 1, we
+ * reinitialize all sets.
+ *
+ * When called, CurrentMemoryContext should be the per-query context.
+ */
+static void
+initialize_aggregates(AggState *aggstate,
+ AggStatePerAgg peragg,
+ AggStatePerGroup pergroup,
+ int numReset)
+{
+ int aggno;
+ int numGroupingSets = Max(aggstate->phase->numsets, 1);
+ int setno = 0;
+
+ if (numReset < 1)
+ numReset = numGroupingSets;
+
+ for (aggno = 0; aggno < aggstate->numaggs; aggno++)
+ {
+ AggStatePerAgg peraggstate = &peragg[aggno];
+
+ for (setno = 0; setno < numReset; setno++)
{
- MemoryContext oldContext;
+ AggStatePerGroup pergroupstate;
- oldContext = MemoryContextSwitchTo(aggstate->aggcontext);
- pergroupstate->transValue = datumCopy(peraggstate->initValue,
- peraggstate->transtypeByVal,
- peraggstate->transtypeLen);
- MemoryContextSwitchTo(oldContext);
- }
- pergroupstate->transValueIsNull = peraggstate->initValueIsNull;
+ pergroupstate = &pergroup[aggno + (setno * (aggstate->numaggs))];
- /*
- * If the initial value for the transition state doesn't exist in the
- * pg_aggregate table then we will let the first non-NULL value
- * returned from the outer procNode become the initial value. (This is
- * useful for aggregates like max() and min().) The noTransValue flag
- * signals that we still need to do this.
- */
- pergroupstate->noTransValue = peraggstate->initValueIsNull;
+ aggstate->current_set = setno;
+
+ initialize_aggregate(aggstate, peraggstate, pergroupstate);
+ }
}
}
/*
- * Given new input value(s), advance the transition function of an aggregate.
+ * Given new input value(s), advance the transition function of one aggregate
+ * within one grouping set only (already set in aggstate->current_set)
*
* The new values (and null flags) have been preloaded into argument positions
* 1 and up in peraggstate->transfn_fcinfo, so that we needn't copy them again
@@ -455,7 +654,8 @@ advance_transition_function(AggState *aggstate,
* We must copy the datum into aggcontext if it is pass-by-ref. We
* do not need to pfree the old transValue, since it's NULL.
*/
- oldContext = MemoryContextSwitchTo(aggstate->aggcontext);
+ oldContext = MemoryContextSwitchTo(
+ aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
pergroupstate->transValue = datumCopy(fcinfo->arg[1],
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
@@ -503,7 +703,7 @@ advance_transition_function(AggState *aggstate,
{
if (!fcinfo->isnull)
{
- MemoryContextSwitchTo(aggstate->aggcontext);
+ MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
newVal = datumCopy(newVal,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
@@ -530,11 +730,13 @@ static void
advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
{
int aggno;
+ int setno = 0;
+ int numGroupingSets = Max(aggstate->phase->numsets, 1);
+ int numAggs = aggstate->numaggs;
- for (aggno = 0; aggno < aggstate->numaggs; aggno++)
+ for (aggno = 0; aggno < numAggs; aggno++)
{
AggStatePerAgg peraggstate = &aggstate->peragg[aggno];
- AggStatePerGroup pergroupstate = &pergroup[aggno];
ExprState *filter = peraggstate->aggrefstate->aggfilter;
int numTransInputs = peraggstate->numTransInputs;
int i;
@@ -578,13 +780,16 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
continue;
}
- /* OK, put the tuple into the tuplesort object */
- if (peraggstate->numInputs == 1)
- tuplesort_putdatum(peraggstate->sortstate,
- slot->tts_values[0],
- slot->tts_isnull[0]);
- else
- tuplesort_puttupleslot(peraggstate->sortstate, slot);
+ for (setno = 0; setno < numGroupingSets; setno++)
+ {
+ /* OK, put the tuple into the tuplesort object */
+ if (peraggstate->numInputs == 1)
+ tuplesort_putdatum(peraggstate->sortstates[setno],
+ slot->tts_values[0],
+ slot->tts_isnull[0]);
+ else
+ tuplesort_puttupleslot(peraggstate->sortstates[setno], slot);
+ }
}
else
{
@@ -600,7 +805,14 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
fcinfo->argnull[i + 1] = slot->tts_isnull[i];
}
- advance_transition_function(aggstate, peraggstate, pergroupstate);
+ for (setno = 0; setno < numGroupingSets; setno++)
+ {
+ AggStatePerGroup pergroupstate = &pergroup[aggno + (setno * numAggs)];
+
+ aggstate->current_set = setno;
+
+ advance_transition_function(aggstate, peraggstate, pergroupstate);
+ }
}
}
}
@@ -623,6 +835,9 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
* is around 300% faster. (The speedup for by-reference types is less
* but still noticeable.)
*
+ * This function handles only one grouping set (already set in
+ * aggstate->current_set).
+ *
* When called, CurrentMemoryContext should be the per-query context.
*/
static void
@@ -642,7 +857,7 @@ process_ordered_aggregate_single(AggState *aggstate,
Assert(peraggstate->numDistinctCols < 2);
- tuplesort_performsort(peraggstate->sortstate);
+ tuplesort_performsort(peraggstate->sortstates[aggstate->current_set]);
/* Load the column into argument 1 (arg 0 will be transition value) */
newVal = fcinfo->arg + 1;
@@ -654,8 +869,8 @@ process_ordered_aggregate_single(AggState *aggstate,
* pfree them when they are no longer needed.
*/
- while (tuplesort_getdatum(peraggstate->sortstate, true,
- newVal, isNull))
+ while (tuplesort_getdatum(peraggstate->sortstates[aggstate->current_set],
+ true, newVal, isNull))
{
/*
* Clear and select the working context for evaluation of the equality
@@ -698,8 +913,8 @@ process_ordered_aggregate_single(AggState *aggstate,
if (!oldIsNull && !peraggstate->inputtypeByVal)
pfree(DatumGetPointer(oldVal));
- tuplesort_end(peraggstate->sortstate);
- peraggstate->sortstate = NULL;
+ tuplesort_end(peraggstate->sortstates[aggstate->current_set]);
+ peraggstate->sortstates[aggstate->current_set] = NULL;
}
/*
@@ -709,6 +924,9 @@ process_ordered_aggregate_single(AggState *aggstate,
* sort, read out the values in sorted order, and run the transition
* function on each value (applying DISTINCT if appropriate).
*
+ * This function handles only one grouping set (already set in
+ * aggstate->current_set).
+ *
* When called, CurrentMemoryContext should be the per-query context.
*/
static void
@@ -725,13 +943,14 @@ process_ordered_aggregate_multi(AggState *aggstate,
bool haveOldValue = false;
int i;
- tuplesort_performsort(peraggstate->sortstate);
+ tuplesort_performsort(peraggstate->sortstates[aggstate->current_set]);
ExecClearTuple(slot1);
if (slot2)
ExecClearTuple(slot2);
- while (tuplesort_gettupleslot(peraggstate->sortstate, true, slot1))
+ while (tuplesort_gettupleslot(peraggstate->sortstates[aggstate->current_set],
+ true, slot1))
{
/*
* Extract the first numTransInputs columns as datums to pass to the
@@ -779,13 +998,16 @@ process_ordered_aggregate_multi(AggState *aggstate,
if (slot2)
ExecClearTuple(slot2);
- tuplesort_end(peraggstate->sortstate);
- peraggstate->sortstate = NULL;
+ tuplesort_end(peraggstate->sortstates[aggstate->current_set]);
+ peraggstate->sortstates[aggstate->current_set] = NULL;
}
/*
* Compute the final value of one aggregate.
*
+ * This function handles only one grouping set (already set in
+ * aggstate->current_set).
+ *
* The finalfunction will be run, and the result delivered, in the
* output-tuple context; caller's CurrentMemoryContext does not matter.
*/
@@ -832,7 +1054,7 @@ finalize_aggregate(AggState *aggstate,
/* set up aggstate->curperagg for AggGetAggref() */
aggstate->curperagg = peraggstate;
- InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn),
+ InitFunctionCallInfoData(fcinfo, &peraggstate->finalfn,
numFinalArgs,
peraggstate->aggCollation,
(void *) aggstate, NULL);
@@ -882,6 +1104,154 @@ finalize_aggregate(AggState *aggstate,
MemoryContextSwitchTo(oldContext);
}
+
+/*
+ * Prepare to finalize and project based on the specified representative tuple
+ * slot and grouping set.
+ *
+ * In the specified tuple slot, force to null all attributes that should be
+ * read as null in the context of the current grouping set. Also stash the
+ * current group bitmap where GroupingFunc evaluation can get at it.
+ *
+ * This relies on three conditions:
+ *
+ * 1) Nothing is ever going to try and extract the whole tuple from this slot,
+ * only reference it in evaluations, which will only access individual
+ * attributes.
+ *
+ * 2) No system columns are going to need to be nulled. (If a system column is
+ * referenced in a group clause, it is actually projected in the outer plan
+ * tlist.)
+ *
+ * 3) Within a given phase, we never need to recover the value of an attribute
+ * once it has been set to null.
+ *
+ * Poking into the slot this way is a bit ugly, but the consensus is that the
+ * alternative was worse.
+ */
+static void
+prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
+{
+ if (aggstate->phase->grouped_cols)
+ {
+ Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet];
+
+ aggstate->grouped_cols = grouped_cols;
+
+ if (slot->tts_isempty)
+ {
+ /*
+ * Force all values to be NULL if working on an empty input tuple
+ * (i.e. an empty grouping set for which no input rows were
+ * supplied).
+ */
+ ExecStoreAllNullTuple(slot);
+ }
+ else if (aggstate->all_grouped_cols)
+ {
+ ListCell *lc;
+
+ /* all_grouped_cols is in descending order, so the first entry is the largest */
+ slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
+
+ foreach(lc, aggstate->all_grouped_cols)
+ {
+ int attnum = lfirst_int(lc);
+
+ if (!bms_is_member(attnum, grouped_cols))
+ slot->tts_isnull[attnum - 1] = true;
+ }
+ }
+ }
+}
+
+/*
+ * Compute the final value of all aggregates for one group.
+ *
+ * This function handles only one grouping set at a time.
+ *
+ * Results are stored in the output econtext aggvalues/aggnulls.
+ */
+static void
+finalize_aggregates(AggState *aggstate,
+ AggStatePerAgg peragg,
+ AggStatePerGroup pergroup,
+ int currentSet)
+{
+ ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
+ Datum *aggvalues = econtext->ecxt_aggvalues;
+ bool *aggnulls = econtext->ecxt_aggnulls;
+ int aggno;
+
+ Assert(currentSet == 0 ||
+ ((Agg *) aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED);
+
+ aggstate->current_set = currentSet;
+
+ for (aggno = 0; aggno < aggstate->numaggs; aggno++)
+ {
+ AggStatePerAgg peraggstate = &peragg[aggno];
+ AggStatePerGroup pergroupstate;
+
+ pergroupstate = &pergroup[aggno + (currentSet * (aggstate->numaggs))];
+
+ if (peraggstate->numSortCols > 0)
+ {
+ Assert(((Agg *) aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED);
+
+ if (peraggstate->numInputs == 1)
+ process_ordered_aggregate_single(aggstate,
+ peraggstate,
+ pergroupstate);
+ else
+ process_ordered_aggregate_multi(aggstate,
+ peraggstate,
+ pergroupstate);
+ }
+
+ finalize_aggregate(aggstate, peraggstate, pergroupstate,
+ &aggvalues[aggno], &aggnulls[aggno]);
+ }
+}
+
+/*
+ * Project the result of a group (whose aggs have already been calculated by
+ * finalize_aggregates). Returns the result slot, or NULL if no row is
+ * projected (suppressed by qual or by an empty SRF).
+ */
+static TupleTableSlot *
+project_aggregates(AggState *aggstate)
+{
+ ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
+
+ /*
+ * Check the qual (HAVING clause); if the group does not match, ignore
+ * it.
+ */
+ if (ExecQual(aggstate->ss.ps.qual, econtext, false))
+ {
+ /*
+ * Form and return or store a projection tuple using the aggregate
+ * results and the representative input tuple.
+ */
+ ExprDoneCond isDone;
+ TupleTableSlot *result;
+
+ result = ExecProject(aggstate->ss.ps.ps_ProjInfo, &isDone);
+
+ if (isDone != ExprEndResult)
+ {
+ aggstate->ss.ps.ps_TupFromTlist =
+ (isDone == ExprMultipleResult);
+ return result;
+ }
+ }
+ else
+ InstrCountFiltered1(aggstate, 1);
+
+ return NULL;
+}
+
/*
* find_unaggregated_cols
* Construct a bitmapset of the column numbers of un-aggregated Vars
@@ -916,8 +1286,11 @@ find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
*colnos = bms_add_member(*colnos, var->varattno);
return false;
}
- if (IsA(node, Aggref)) /* do not descend into aggregate exprs */
+ if (IsA(node, Aggref) || IsA(node, GroupingFunc))
+ {
+ /* do not descend into aggregate or GROUPING exprs */
return false;
+ }
return expression_tree_walker(node, find_unaggregated_cols_walker,
(void *) colnos);
}
@@ -942,11 +1315,11 @@ build_hash_table(AggState *aggstate)
aggstate->hashtable = BuildTupleHashTable(node->numCols,
node->grpColIdx,
- aggstate->eqfunctions,
+ aggstate->phase->eqfunctions,
aggstate->hashfunctions,
node->numGroups,
entrysize,
- aggstate->aggcontext,
+ aggstate->aggcontexts[0]->ecxt_per_tuple_memory,
tmpmem);
}
@@ -1057,7 +1430,7 @@ lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot)
if (isnew)
{
/* initialize aggregates for new tuple group */
- initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup);
+ initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup, 0);
}
return entry;
@@ -1079,6 +1452,8 @@ lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot)
TupleTableSlot *
ExecAgg(AggState *node)
{
+ TupleTableSlot *result;
+
/*
* Check to see if we're still projecting out tuples from a previous agg
* tuple (because there is a function-returning-set in the projection
@@ -1086,7 +1461,6 @@ ExecAgg(AggState *node)
*/
if (node->ss.ps.ps_TupFromTlist)
{
- TupleTableSlot *result;
ExprDoneCond isDone;
result = ExecProject(node->ss.ps.ps_ProjInfo, &isDone);
@@ -1097,22 +1471,30 @@ ExecAgg(AggState *node)
}
/*
- * Exit if nothing left to do. (We must do the ps_TupFromTlist check
- * first, because in some cases agg_done gets set before we emit the final
- * aggregate tuple, and we have to finish running SRFs for it.)
+ * Skip the dispatch if nothing is left to do. (The ps_TupFromTlist check
+ * must come first, because in some cases agg_done gets set before we emit
+ * the final aggregate tuple, and we have to finish running SRFs for it.)
*/
- if (node->agg_done)
- return NULL;
-
- /* Dispatch based on strategy */
- if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
+ if (!node->agg_done)
{
- if (!node->table_filled)
- agg_fill_hash_table(node);
- return agg_retrieve_hash_table(node);
+ /* Dispatch based on strategy */
+ switch (node->phase->aggnode->aggstrategy)
+ {
+ case AGG_HASHED:
+ if (!node->table_filled)
+ agg_fill_hash_table(node);
+ result = agg_retrieve_hash_table(node);
+ break;
+ default:
+ result = agg_retrieve_direct(node);
+ break;
+ }
+
+ if (!TupIsNull(result))
+ return result;
}
- else
- return agg_retrieve_direct(node);
+
+ return NULL;
}
/*
@@ -1121,28 +1503,30 @@ ExecAgg(AggState *node)
static TupleTableSlot *
agg_retrieve_direct(AggState *aggstate)
{
- Agg *node = (Agg *) aggstate->ss.ps.plan;
- PlanState *outerPlan;
+ Agg *node = aggstate->phase->aggnode;
ExprContext *econtext;
ExprContext *tmpcontext;
- Datum *aggvalues;
- bool *aggnulls;
AggStatePerAgg peragg;
AggStatePerGroup pergroup;
TupleTableSlot *outerslot;
TupleTableSlot *firstSlot;
- int aggno;
+ TupleTableSlot *result;
+ bool hasGroupingSets = aggstate->phase->numsets > 0;
+ int numGroupingSets = Max(aggstate->phase->numsets, 1);
+ int currentSet;
+ int nextSetSize;
+ int numReset;
+ int i;
/*
* get state info from node
+ *
+ * econtext is the per-output-tuple expression context
+ * tmpcontext is the per-input-tuple expression context
*/
- outerPlan = outerPlanState(aggstate);
- /* econtext is the per-output-tuple expression context */
econtext = aggstate->ss.ps.ps_ExprContext;
- aggvalues = econtext->ecxt_aggvalues;
- aggnulls = econtext->ecxt_aggnulls;
- /* tmpcontext is the per-input-tuple expression context */
tmpcontext = aggstate->tmpcontext;
+
peragg = aggstate->peragg;
pergroup = aggstate->pergroup;
firstSlot = aggstate->ss.ss_ScanTupleSlot;
@@ -1150,172 +1534,281 @@ agg_retrieve_direct(AggState *aggstate)
/*
* We loop retrieving groups until we find one matching
* aggstate->ss.ps.qual
+ *
+ * For grouping sets, we have the invariant that aggstate->projected_set
+ * is either -1 (initial call) or the index (starting from 0) in
+ * gset_lengths for the group we just completed (either by projecting a
+ * row or by discarding it in the qual).
*/
while (!aggstate->agg_done)
{
/*
- * If we don't already have the first tuple of the new group, fetch it
- * from the outer plan.
+ * Clear the per-output-tuple context for each group, as well as
+ * aggcontext (which contains any pass-by-ref transvalues of the old
+ * group). Some aggregate functions store working state in child
+ * contexts; those now get reset automatically without us needing to
+ * do anything special.
+ *
+ * We use ReScanExprContext not just ResetExprContext because we want
+ * any registered shutdown callbacks to be called. That allows
+ * aggregate functions to ensure they've cleaned up any non-memory
+ * resources.
+ */
+ ReScanExprContext(econtext);
+
+ /*
+ * Determine how many grouping sets need to be reset at this boundary.
*/
- if (aggstate->grp_firstTuple == NULL)
+ if (aggstate->projected_set >= 0 &&
+ aggstate->projected_set < numGroupingSets)
+ numReset = aggstate->projected_set + 1;
+ else
+ numReset = numGroupingSets;
+
+ /*
+ * numReset can change on a phase boundary, but that's OK; we want to
+ * reset the contexts used in _this_ phase, and later, after possibly
+ * changing phase, initialize the right number of aggregates for the
+ * _new_ phase.
+ */
+
+ for (i = 0; i < numReset; i++)
+ {
+ ReScanExprContext(aggstate->aggcontexts[i]);
+ }
+
+ /*
+ * Check if input is complete and there are no more groups to project
+ * in this phase; move to next phase or mark as done.
+ */
+ if (aggstate->input_done == true &&
+ aggstate->projected_set >= (numGroupingSets - 1))
{
- outerslot = ExecProcNode(outerPlan);
- if (!TupIsNull(outerslot))
+ if (aggstate->current_phase < aggstate->numphases - 1)
{
- /*
- * Make a copy of the first input tuple; we will use this for
- * comparisons (in group mode) and for projection.
- */
- aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
+ initialize_phase(aggstate, aggstate->current_phase + 1);
+ aggstate->input_done = false;
+ aggstate->projected_set = -1;
+ numGroupingSets = Max(aggstate->phase->numsets, 1);
+ node = aggstate->phase->aggnode;
+ numReset = numGroupingSets;
}
else
{
- /* outer plan produced no tuples at all */
aggstate->agg_done = true;
- /* If we are grouping, we should produce no tuples too */
- if (node->aggstrategy != AGG_PLAIN)
- return NULL;
+ break;
}
}
/*
- * Clear the per-output-tuple context for each group, as well as
- * aggcontext (which contains any pass-by-ref transvalues of the old
- * group). We also clear any child contexts of the aggcontext; some
- * aggregate functions store working state in such contexts.
- *
- * We use ReScanExprContext not just ResetExprContext because we want
- * any registered shutdown callbacks to be called. That allows
- * aggregate functions to ensure they've cleaned up any non-memory
- * resources.
+ * Get the number of columns in the next grouping set after the last
+ * projected one (if any). This is the number of columns to compare to
+ * see if we reached the boundary of that set too.
*/
- ReScanExprContext(econtext);
-
- MemoryContextResetAndDeleteChildren(aggstate->aggcontext);
+ if (aggstate->projected_set >= 0 &&
+ aggstate->projected_set < (numGroupingSets - 1))
+ nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
+ else
+ nextSetSize = 0;
- /*
- * Initialize working state for a new input tuple group
+ /*-
+ * If a subgroup for the current grouping set is present, project it.
+ *
+ * We have a new group if:
+ * - we're out of input but haven't projected all grouping sets
+ * (checked above)
+ * OR
+ * - we already projected a row that wasn't from the last grouping
+ * set
+ * AND
+ * - the next grouping set has at least one grouping column (since
+ * empty grouping sets project only once input is exhausted)
+ * AND
+ * - the previous and pending rows differ on the grouping columns
+ * of the next grouping set
*/
- initialize_aggregates(aggstate, peragg, pergroup);
+ if (aggstate->input_done ||
+ (node->aggstrategy == AGG_SORTED &&
+ aggstate->projected_set != -1 &&
+ aggstate->projected_set < (numGroupingSets - 1) &&
+ nextSetSize > 0 &&
+ !execTuplesMatch(econtext->ecxt_outertuple,
+ tmpcontext->ecxt_outertuple,
+ nextSetSize,
+ node->grpColIdx,
+ aggstate->phase->eqfunctions,
+ tmpcontext->ecxt_per_tuple_memory)))
+ {
+ aggstate->projected_set += 1;
- if (aggstate->grp_firstTuple != NULL)
+ Assert(aggstate->projected_set < numGroupingSets);
+ Assert(nextSetSize > 0 || aggstate->input_done);
+ }
+ else
{
/*
- * Store the copied first input tuple in the tuple table slot
- * reserved for it. The tuple will be deleted when it is cleared
- * from the slot.
+ * We no longer care what group we just projected; the next
+ * projection will always be the first (or only) grouping set
+ * (unless the input proves to be empty).
*/
- ExecStoreTuple(aggstate->grp_firstTuple,
- firstSlot,
- InvalidBuffer,
- true);
- aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
-
- /* set up for first advance_aggregates call */
- tmpcontext->ecxt_outertuple = firstSlot;
+ aggstate->projected_set = 0;
/*
- * Process each outer-plan tuple, and then fetch the next one,
- * until we exhaust the outer plan or cross a group boundary.
+ * If we don't already have the first tuple of the new group,
+ * fetch it from the outer plan.
*/
- for (;;)
+ if (aggstate->grp_firstTuple == NULL)
{
- advance_aggregates(aggstate, pergroup);
-
- /* Reset per-input-tuple context after each tuple */
- ResetExprContext(tmpcontext);
-
- outerslot = ExecProcNode(outerPlan);
- if (TupIsNull(outerslot))
+ outerslot = fetch_input_tuple(aggstate);
+ if (!TupIsNull(outerslot))
{
- /* no more outer-plan tuples available */
- aggstate->agg_done = true;
- break;
+ /*
+ * Make a copy of the first input tuple; we will use this
+ * for comparisons (in group mode) and for projection.
+ */
+ aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
}
- /* set up for next advance_aggregates call */
- tmpcontext->ecxt_outertuple = outerslot;
-
- /*
- * If we are grouping, check whether we've crossed a group
- * boundary.
- */
- if (node->aggstrategy == AGG_SORTED)
+ else
{
- if (!execTuplesMatch(firstSlot,
- outerslot,
- node->numCols, node->grpColIdx,
- aggstate->eqfunctions,
- tmpcontext->ecxt_per_tuple_memory))
+ /* outer plan produced no tuples at all */
+ if (hasGroupingSets)
{
/*
- * Save the first input tuple of the next group.
+ * If there was no input at all, we need to project
+ * rows only if there are grouping sets of size 0.
+ * Note that this implies that there can't be any
+ * references to ungrouped Vars, which would otherwise
+ * cause issues with the empty output slot.
+ *
+ * XXX: This is no longer true, we currently deal with
+ * this in finalize_aggregates().
*/
- aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
- break;
+ aggstate->input_done = true;
+
+ while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
+ {
+ aggstate->projected_set += 1;
+ if (aggstate->projected_set >= numGroupingSets)
+ {
+ /*
+ * We can't set agg_done here because we might
+ * have more phases to do, even though the
+ * input is empty. So we need to restart the
+ * whole outer loop.
+ */
+ break;
+ }
+ }
+
+ if (aggstate->projected_set >= numGroupingSets)
+ continue;
+ }
+ else
+ {
+ aggstate->agg_done = true;
+ /* If we are grouping, we should produce no tuples too */
+ if (node->aggstrategy != AGG_PLAIN)
+ return NULL;
}
}
}
- }
- /*
- * Use the representative input tuple for any references to
- * non-aggregated input columns in aggregate direct args, the node
- * qual, and the tlist. (If we are not grouping, and there are no
- * input rows at all, we will come here with an empty firstSlot ...
- * but if not grouping, there can't be any references to
- * non-aggregated input columns, so no problem.)
- */
- econtext->ecxt_outertuple = firstSlot;
-
- /*
- * Done scanning input tuple group. Finalize each aggregate
- * calculation, and stash results in the per-output-tuple context.
- */
- for (aggno = 0; aggno < aggstate->numaggs; aggno++)
- {
- AggStatePerAgg peraggstate = &peragg[aggno];
- AggStatePerGroup pergroupstate = &pergroup[aggno];
+ /*
+ * Initialize working state for a new input tuple group.
+ */
+ initialize_aggregates(aggstate, peragg, pergroup, numReset);
- if (peraggstate->numSortCols > 0)
+ if (aggstate->grp_firstTuple != NULL)
{
- if (peraggstate->numInputs == 1)
- process_ordered_aggregate_single(aggstate,
- peraggstate,
- pergroupstate);
- else
- process_ordered_aggregate_multi(aggstate,
- peraggstate,
- pergroupstate);
- }
+ /*
+ * Store the copied first input tuple in the tuple table slot
+ * reserved for it. The tuple will be deleted when it is
+ * cleared from the slot.
+ */
+ ExecStoreTuple(aggstate->grp_firstTuple,
+ firstSlot,
+ InvalidBuffer,
+ true);
+ aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
- finalize_aggregate(aggstate, peraggstate, pergroupstate,
- &aggvalues[aggno], &aggnulls[aggno]);
- }
+ /* set up for first advance_aggregates call */
+ tmpcontext->ecxt_outertuple = firstSlot;
- /*
- * Check the qual (HAVING clause); if the group does not match, ignore
- * it and loop back to try to process another group.
- */
- if (ExecQual(aggstate->ss.ps.qual, econtext, false))
- {
- /*
- * Form and return a projection tuple using the aggregate results
- * and the representative input tuple.
- */
- TupleTableSlot *result;
- ExprDoneCond isDone;
+ /*
+ * Process each outer-plan tuple, and then fetch the next one,
+ * until we exhaust the outer plan or cross a group boundary.
+ */
+ for (;;)
+ {
+ advance_aggregates(aggstate, pergroup);
- result = ExecProject(aggstate->ss.ps.ps_ProjInfo, &isDone);
+ /* Reset per-input-tuple context after each tuple */
+ ResetExprContext(tmpcontext);
- if (isDone != ExprEndResult)
- {
- aggstate->ss.ps.ps_TupFromTlist =
- (isDone == ExprMultipleResult);
- return result;
+ outerslot = fetch_input_tuple(aggstate);
+ if (TupIsNull(outerslot))
+ {
+ /* no more outer-plan tuples available */
+ if (hasGroupingSets)
+ {
+ aggstate->input_done = true;
+ break;
+ }
+ else
+ {
+ aggstate->agg_done = true;
+ break;
+ }
+ }
+ /* set up for next advance_aggregates call */
+ tmpcontext->ecxt_outertuple = outerslot;
+
+ /*
+ * If we are grouping, check whether we've crossed a group
+ * boundary.
+ */
+ if (node->aggstrategy == AGG_SORTED)
+ {
+ if (!execTuplesMatch(firstSlot,
+ outerslot,
+ node->numCols,
+ node->grpColIdx,
+ aggstate->phase->eqfunctions,
+ tmpcontext->ecxt_per_tuple_memory))
+ {
+ aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
+ break;
+ }
+ }
+ }
}
+
+ /*
+ * Use the representative input tuple for any references to
+ * non-aggregated input columns in aggregate direct args, the node
+ * qual, and the tlist. (If we are not grouping, and there are no
+ * input rows at all, we will come here with an empty firstSlot ...
+ * but if not grouping, there can't be any references to
+ * non-aggregated input columns, so no problem.)
+ */
+ econtext->ecxt_outertuple = firstSlot;
}
- else
- InstrCountFiltered1(aggstate, 1);
+
+ Assert(aggstate->projected_set >= 0);
+
+ currentSet = aggstate->projected_set;
+
+ prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
+
+ finalize_aggregates(aggstate, peragg, pergroup, currentSet);
+
+ /*
+ * If there's no row to project right now, we must continue rather than
+ * returning a null since there might be more groups.
+ */
+ result = project_aggregates(aggstate);
+ if (result)
+ return result;
}
/* No more groups */
@@ -1328,16 +1821,15 @@ agg_retrieve_direct(AggState *aggstate)
static void
agg_fill_hash_table(AggState *aggstate)
{
- PlanState *outerPlan;
ExprContext *tmpcontext;
AggHashEntry entry;
TupleTableSlot *outerslot;
/*
* get state info from node
+ *
+ * tmpcontext is the per-input-tuple expression context
*/
- outerPlan = outerPlanState(aggstate);
- /* tmpcontext is the per-input-tuple expression context */
tmpcontext = aggstate->tmpcontext;
/*
@@ -1346,7 +1838,7 @@ agg_fill_hash_table(AggState *aggstate)
*/
for (;;)
{
- outerslot = ExecProcNode(outerPlan);
+ outerslot = fetch_input_tuple(aggstate);
if (TupIsNull(outerslot))
break;
/* set up for advance_aggregates call */
@@ -1374,21 +1866,17 @@ static TupleTableSlot *
agg_retrieve_hash_table(AggState *aggstate)
{
ExprContext *econtext;
- Datum *aggvalues;
- bool *aggnulls;
AggStatePerAgg peragg;
AggStatePerGroup pergroup;
AggHashEntry entry;
TupleTableSlot *firstSlot;
- int aggno;
+ TupleTableSlot *result;
/*
* get state info from node
*/
/* econtext is the per-output-tuple expression context */
econtext = aggstate->ss.ps.ps_ExprContext;
- aggvalues = econtext->ecxt_aggvalues;
- aggnulls = econtext->ecxt_aggnulls;
peragg = aggstate->peragg;
firstSlot = aggstate->ss.ss_ScanTupleSlot;
@@ -1428,19 +1916,7 @@ agg_retrieve_hash_table(AggState *aggstate)
pergroup = entry->pergroup;
- /*
- * Finalize each aggregate calculation, and stash results in the
- * per-output-tuple context.
- */
- for (aggno = 0; aggno < aggstate->numaggs; aggno++)
- {
- AggStatePerAgg peraggstate = &peragg[aggno];
- AggStatePerGroup pergroupstate = &pergroup[aggno];
-
- Assert(peraggstate->numSortCols == 0);
- finalize_aggregate(aggstate, peraggstate, pergroupstate,
- &aggvalues[aggno], &aggnulls[aggno]);
- }
+ finalize_aggregates(aggstate, peragg, pergroup, 0);
/*
* Use the representative input tuple for any references to
@@ -1448,30 +1924,9 @@ agg_retrieve_hash_table(AggState *aggstate)
*/
econtext->ecxt_outertuple = firstSlot;
- /*
- * Check the qual (HAVING clause); if the group does not match, ignore
- * it and loop back to try to process another group.
- */
- if (ExecQual(aggstate->ss.ps.qual, econtext, false))
- {
- /*
- * Form and return a projection tuple using the aggregate results
- * and the representative input tuple.
- */
- TupleTableSlot *result;
- ExprDoneCond isDone;
-
- result = ExecProject(aggstate->ss.ps.ps_ProjInfo, &isDone);
-
- if (isDone != ExprEndResult)
- {
- aggstate->ss.ps.ps_TupFromTlist =
- (isDone == ExprMultipleResult);
- return result;
- }
- }
- else
- InstrCountFiltered1(aggstate, 1);
+ result = project_aggregates(aggstate);
+ if (result)
+ return result;
}
/* No more groups */
@@ -1494,7 +1949,14 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
ExprContext *econtext;
int numaggs,
aggno;
+ int phase;
ListCell *l;
+ Bitmapset *all_grouped_cols = NULL;
+ int numGroupingSets = 1;
+ int numPhases;
+ int currentsortno = 0;
+ int i = 0;
+ int j = 0;
/* check for unsupported flags */
Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@@ -1508,38 +1970,68 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggstate->aggs = NIL;
aggstate->numaggs = 0;
- aggstate->eqfunctions = NULL;
+ aggstate->maxsets = 0;
aggstate->hashfunctions = NULL;
+ aggstate->projected_set = -1;
+ aggstate->current_set = 0;
aggstate->peragg = NULL;
aggstate->curperagg = NULL;
aggstate->agg_done = false;
+ aggstate->input_done = false;
aggstate->pergroup = NULL;
aggstate->grp_firstTuple = NULL;
aggstate->hashtable = NULL;
+ aggstate->sort_in = NULL;
+ aggstate->sort_out = NULL;
/*
- * Create expression contexts. We need two, one for per-input-tuple
- * processing and one for per-output-tuple processing. We cheat a little
- * by using ExecAssignExprContext() to build both.
+ * Calculate the maximum number of grouping sets in any phase; this
+ * determines the size of some allocations.
*/
- ExecAssignExprContext(estate, &aggstate->ss.ps);
- aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
- ExecAssignExprContext(estate, &aggstate->ss.ps);
+ if (node->groupingSets)
+ {
+ Assert(node->aggstrategy != AGG_HASHED);
+
+ numGroupingSets = list_length(node->groupingSets);
+
+ foreach(l, node->chain)
+ {
+ Agg *agg = lfirst(l);
+
+ numGroupingSets = Max(numGroupingSets,
+ list_length(agg->groupingSets));
+ }
+ }
+
+ aggstate->maxsets = numGroupingSets;
+ aggstate->numphases = numPhases = 1 + list_length(node->chain);
+
+ aggstate->aggcontexts = (ExprContext **)
+ palloc0(sizeof(ExprContext *) * numGroupingSets);
/*
- * We also need a long-lived memory context for holding hashtable data
- * structures and transition values. NOTE: the details of what is stored
- * in aggcontext and what is stored in the regular per-query memory
- * context are driven by a simple decision: we want to reset the
- * aggcontext at group boundaries (if not hashing) and in ExecReScanAgg to
- * recover no-longer-wanted space.
+ * Create expression contexts. We need three or more, one for
+ * per-input-tuple processing, one for per-output-tuple processing, and
+ * one for each grouping set. The per-tuple memory context of the
+ * per-grouping-set ExprContexts (aggcontexts) replaces the standalone
+ * memory context formerly used to hold transition values. We cheat a
+ * little by using ExecAssignExprContext() to build all of them.
+ *
+ * NOTE: the details of what is stored in aggcontexts and what is stored
+ * in the regular per-query memory context are driven by a simple
+ * decision: we want to reset the aggcontexts at group boundaries (if not
+ * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
*/
- aggstate->aggcontext =
- AllocSetContextCreate(CurrentMemoryContext,
- "AggContext",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
+ ExecAssignExprContext(estate, &aggstate->ss.ps);
+ aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
+
+ for (i = 0; i < numGroupingSets; ++i)
+ {
+ ExecAssignExprContext(estate, &aggstate->ss.ps);
+ aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
+ }
+
+ ExecAssignExprContext(estate, &aggstate->ss.ps);
/*
* tuple table initialization
@@ -1547,6 +2039,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
ExecInitScanTupleSlot(estate, &aggstate->ss);
ExecInitResultTupleSlot(estate, &aggstate->ss.ps);
aggstate->hashslot = ExecInitExtraTupleSlot(estate);
+ aggstate->sort_slot = ExecInitExtraTupleSlot(estate);
/*
* initialize child expressions
@@ -1565,7 +2058,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
(PlanState *) aggstate);
/*
- * initialize child nodes
+ * Initialize child nodes.
*
* If we are doing a hashed aggregation then the child plan does not need
* to handle REWIND efficiently; see ExecReScanAgg.
@@ -1579,6 +2072,9 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* initialize source tuple type.
*/
ExecAssignScanTypeFromOuterPlan(&aggstate->ss);
+ if (node->chain)
+ ExecSetSlotDescriptor(aggstate->sort_slot,
+ aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
/*
* Initialize result tuple type and projection info.
@@ -1606,24 +2102,105 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
}
/*
- * If we are grouping, precompute fmgr lookup data for inner loop. We need
- * both equality and hashing functions to do it by hashing, but only
- * equality if not hashing.
+ * For each phase, prepare grouping set data and fmgr lookup data for
+ * compare functions. Accumulate all_grouped_cols in passing.
*/
- if (node->numCols > 0)
+
+ aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
+
+ for (phase = 0; phase < numPhases; ++phase)
{
- if (node->aggstrategy == AGG_HASHED)
- execTuplesHashPrepare(node->numCols,
- node->grpOperators,
- &aggstate->eqfunctions,
- &aggstate->hashfunctions);
+ AggStatePerPhase phasedata = &aggstate->phases[phase];
+ Agg *aggnode;
+ Sort *sortnode;
+ int num_sets;
+
+ if (phase > 0)
+ {
+ aggnode = list_nth(node->chain, phase-1);
+ sortnode = (Sort *) aggnode->plan.lefttree;
+ Assert(IsA(sortnode, Sort));
+ }
+ else
+ {
+ aggnode = node;
+ sortnode = NULL;
+ }
+
+ phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
+
+ if (num_sets)
+ {
+ phasedata->gset_lengths = palloc(num_sets * sizeof(int));
+ phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
+
+ i = 0;
+ foreach(l, aggnode->groupingSets)
+ {
+ int current_length = list_length(lfirst(l));
+ Bitmapset *cols = NULL;
+
+ /* planner orders grpColIdx so that each set is a prefix of it */
+ for (j = 0; j < current_length; ++j)
+ cols = bms_add_member(cols, aggnode->grpColIdx[j]);
+
+ phasedata->grouped_cols[i] = cols;
+ phasedata->gset_lengths[i] = current_length;
+ ++i;
+ }
+
+ all_grouped_cols = bms_add_members(all_grouped_cols,
+ phasedata->grouped_cols[0]);
+ }
else
- aggstate->eqfunctions =
- execTuplesMatchPrepare(node->numCols,
- node->grpOperators);
+ {
+ Assert(phase == 0);
+
+ phasedata->gset_lengths = NULL;
+ phasedata->grouped_cols = NULL;
+ }
+
+ /*
+ * If we are grouping, precompute fmgr lookup data for inner loop.
+ */
+ if (aggnode->aggstrategy == AGG_SORTED)
+ {
+ Assert(aggnode->numCols > 0);
+
+ phasedata->eqfunctions =
+ execTuplesMatchPrepare(aggnode->numCols,
+ aggnode->grpOperators);
+ }
+
+ phasedata->aggnode = aggnode;
+ phasedata->sortnode = sortnode;
}
/*
+ * Convert all_grouped_cols to a descending-order list.
+ */
+ i = -1;
+ while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
+ aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
+
+ /*
+ * Hashing can only appear in the initial phase.
+ */
+
+ if (node->aggstrategy == AGG_HASHED)
+ execTuplesHashPrepare(node->numCols,
+ node->grpOperators,
+ &aggstate->phases[0].eqfunctions,
+ &aggstate->hashfunctions);
+
+ /*
+ * Initialize current phase-dependent values to initial phase
+ */
+
+ aggstate->current_phase = 0;
+ initialize_phase(aggstate, 0);
+
+ /*
* Set up aggregate-result storage in the output expr context, and also
* allocate my private per-agg working storage
*/
@@ -1645,7 +2222,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
{
AggStatePerGroup pergroup;
- pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs);
+ pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
+ * numaggs
+ * numGroupingSets);
+
aggstate->pergroup = pergroup;
}
@@ -1708,7 +2288,11 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
/* Begin filling in the peraggstate data */
peraggstate->aggrefstate = aggrefstate;
peraggstate->aggref = aggref;
- peraggstate->sortstate = NULL;
+ peraggstate->sortstates =(Tuplesortstate**)
+ palloc0(sizeof(Tuplesortstate*) * numGroupingSets);
+
+ for (currentsortno = 0; currentsortno < numGroupingSets; currentsortno++)
+ peraggstate->sortstates[currentsortno] = NULL;
/* Fetch the pg_aggregate row */
aggTuple = SearchSysCache1(AGGFNOID,
@@ -2016,31 +2600,41 @@ ExecEndAgg(AggState *node)
{
PlanState *outerPlan;
int aggno;
+ int numGroupingSets = Max(node->maxsets, 1);
+ int setno;
/* Make sure we have closed any open tuplesorts */
+
+ if (node->sort_in)
+ tuplesort_end(node->sort_in);
+ if (node->sort_out)
+ tuplesort_end(node->sort_out);
+
for (aggno = 0; aggno < node->numaggs; aggno++)
{
AggStatePerAgg peraggstate = &node->peragg[aggno];
- if (peraggstate->sortstate)
- tuplesort_end(peraggstate->sortstate);
+ for (setno = 0; setno < numGroupingSets; setno++)
+ {
+ if (peraggstate->sortstates[setno])
+ tuplesort_end(peraggstate->sortstates[setno]);
+ }
}
/* And ensure any agg shutdown callbacks have been called */
- ReScanExprContext(node->ss.ps.ps_ExprContext);
+ for (setno = 0; setno < numGroupingSets; setno++)
+ ReScanExprContext(node->aggcontexts[setno]);
/*
- * Free both the expr contexts.
+ * We don't actually free any ExprContexts here (see comment in
+ * ExecFreeExprContext); just unlinking the output one from the plan node
+ * suffices.
*/
ExecFreeExprContext(&node->ss.ps);
- node->ss.ps.ps_ExprContext = node->tmpcontext;
- ExecFreeExprContext(&node->ss.ps);
/* clean up tuple table */
ExecClearTuple(node->ss.ss_ScanTupleSlot);
- MemoryContextDelete(node->aggcontext);
-
outerPlan = outerPlanState(node);
ExecEndNode(outerPlan);
}
@@ -2050,13 +2644,16 @@ ExecReScanAgg(AggState *node)
{
ExprContext *econtext = node->ss.ps.ps_ExprContext;
PlanState *outerPlan = outerPlanState(node);
+ Agg *aggnode = (Agg *) node->ss.ps.plan;
int aggno;
+ int numGroupingSets = Max(node->maxsets, 1);
+ int setno;
node->agg_done = false;
node->ss.ps.ps_TupFromTlist = false;
- if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
+ if (aggnode->aggstrategy == AGG_HASHED)
{
/*
* In the hashed case, if we haven't yet built the hash table then we
@@ -2082,14 +2679,34 @@ ExecReScanAgg(AggState *node)
/* Make sure we have closed any open tuplesorts */
for (aggno = 0; aggno < node->numaggs; aggno++)
{
- AggStatePerAgg peraggstate = &node->peragg[aggno];
+ for (setno = 0; setno < numGroupingSets; setno++)
+ {
+ AggStatePerAgg peraggstate = &node->peragg[aggno];
- if (peraggstate->sortstate)
- tuplesort_end(peraggstate->sortstate);
- peraggstate->sortstate = NULL;
+ if (peraggstate->sortstates[setno])
+ {
+ tuplesort_end(peraggstate->sortstates[setno]);
+ peraggstate->sortstates[setno] = NULL;
+ }
+ }
}
- /* We don't need to ReScanExprContext here; ExecReScan already did it */
+ /*
+ * We don't need to ReScanExprContext the output tuple context here;
+ * ExecReScan already did it. But we do need to reset our per-grouping-set
+ * contexts, which may have transvalues stored in them. (We use rescan
+ * rather than just reset because transfns may have registered callbacks
+ * that need to be run now.)
+ *
+ * Note that with AGG_HASHED, the hash table is allocated in a sub-context
+ * of the aggcontext. This used to be an issue, but now, resetting a
+ * context automatically deletes sub-contexts too.
+ */
+
+ for (setno = 0; setno < numGroupingSets; setno++)
+ {
+ ReScanExprContext(node->aggcontexts[setno]);
+ }
/* Release first tuple of group, if we have made a copy */
if (node->grp_firstTuple != NULL)
@@ -2097,21 +2714,13 @@ ExecReScanAgg(AggState *node)
heap_freetuple(node->grp_firstTuple);
node->grp_firstTuple = NULL;
}
+ ExecClearTuple(node->ss.ss_ScanTupleSlot);
/* Forget current agg values */
MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
- /*
- * Release all temp storage. Note that with AGG_HASHED, the hash table is
- * allocated in a sub-context of the aggcontext. We're going to rebuild
- * the hash table from scratch, so we need to use
- * MemoryContextResetAndDeleteChildren() to avoid leaking the old hash
- * table's memory context header.
- */
- MemoryContextResetAndDeleteChildren(node->aggcontext);
-
- if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
+ if (aggnode->aggstrategy == AGG_HASHED)
{
/* Rebuild an empty hash table */
build_hash_table(node);
@@ -2123,13 +2732,15 @@ ExecReScanAgg(AggState *node)
* Reset the per-group state (in particular, mark transvalues null)
*/
MemSet(node->pergroup, 0,
- sizeof(AggStatePerGroupData) * node->numaggs);
+ sizeof(AggStatePerGroupData) * node->numaggs * numGroupingSets);
+
+ /* reset to phase 0 */
+ initialize_phase(node, 0);
+
+ node->input_done = false;
+ node->projected_set = -1;
}
- /*
- * if chgParam of subnode is not null then plan will be re-scanned by
- * first ExecProcNode.
- */
if (outerPlan->chgParam == NULL)
ExecReScan(outerPlan);
}
@@ -2151,8 +2762,11 @@ ExecReScanAgg(AggState *node)
* values could conceivably appear in future.)
*
* If aggcontext isn't NULL, the function also stores at *aggcontext the
- * identity of the memory context that aggregate transition values are
- * being stored in.
+ * identity of the memory context that aggregate transition values are being
+ * stored in. Note that the same aggregate call site (flinfo) may be called
+ * interleaved on different transition values in different contexts, so it's
+ * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
+ * cache it in the transvalue itself (for internal-type transvalues).
*/
int
AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
@@ -2160,7 +2774,11 @@ AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
if (fcinfo->context && IsA(fcinfo->context, AggState))
{
if (aggcontext)
- *aggcontext = ((AggState *) fcinfo->context)->aggcontext;
+ {
+ AggState *aggstate = ((AggState *) fcinfo->context);
+ ExprContext *cxt = aggstate->aggcontexts[aggstate->current_set];
+ *aggcontext = cxt->ecxt_per_tuple_memory;
+ }
return AGG_CONTEXT_AGGREGATE;
}
if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
@@ -2244,8 +2862,9 @@ AggRegisterCallback(FunctionCallInfo fcinfo,
if (fcinfo->context && IsA(fcinfo->context, AggState))
{
AggState *aggstate = (AggState *) fcinfo->context;
+ ExprContext *cxt = aggstate->aggcontexts[aggstate->current_set];
- RegisterExprContextCallback(aggstate->ss.ps.ps_ExprContext, func, arg);
+ RegisterExprContextCallback(cxt, func, arg);
return;
}