aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/executor/nodeAgg.c186
-rw-r--r--src/include/nodes/execnodes.h6
-rw-r--r--src/test/regress/expected/aggregates.out6
-rw-r--r--src/test/regress/sql/aggregates.sql2
4 files changed, 130 insertions, 70 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index f62b3b22ba9..8a6dfd64e8b 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -269,20 +269,22 @@ typedef struct AggStatePerTransData
int numInputs;
/*
- * At each input row, we evaluate all argument expressions needed for all
- * the aggregates in this Agg node in a single ExecProject call. inputoff
- * is the starting index of this aggregate's argument expressions in the
- * resulting tuple (in AggState->evalslot).
- */
- int inputoff;
-
- /*
* Number of aggregated input columns to pass to the transfn. This
* includes the ORDER BY columns for ordered-set aggs, but not for plain
* aggs. (This doesn't count the transition state value!)
*/
int numTransInputs;
+ /*
+ * At each input row, we perform a single ExecProject call to evaluate all
+ * argument expressions that will certainly be needed at this row; that
+ * includes this aggregate's filter expression if it has one, or its
+ * regular argument expressions (including any ORDER BY columns) if it
+ * doesn't. inputoff is the starting index of this aggregate's required
+ * expressions in the resulting tuple.
+ */
+ int inputoff;
+
/* Oid of the state transition or combine function */
Oid transfn_oid;
@@ -295,9 +297,8 @@ typedef struct AggStatePerTransData
/* Oid of state value's datatype */
Oid aggtranstype;
- /* ExprStates of the FILTER and argument expressions. */
- ExprState *aggfilter; /* state of FILTER expression, if any */
- List *aggdirectargs; /* states of direct-argument expressions */
+ /* ExprStates for any direct-argument expressions */
+ List *aggdirectargs;
/*
* fmgr lookup data for transition function or combine function. Note in
@@ -353,20 +354,21 @@ typedef struct AggStatePerTransData
transtypeByVal;
/*
- * Stuff for evaluation of aggregate inputs in cases where the aggregate
- * requires sorted input. The arguments themselves will be evaluated via
- * AggState->evalslot/evalproj for all aggregates at once, but we only
- * want to sort the relevant columns for individual aggregates.
+ * Stuff for evaluation of aggregate inputs, when they must be evaluated
+ * separately because there's a FILTER expression. In such cases we will
+ * create a sortslot and the result will be stored there, whether or not
+ * we're actually sorting.
*/
- TupleDesc sortdesc; /* descriptor of input tuples */
+ ProjectionInfo *evalproj; /* projection machinery */
/*
* Slots for holding the evaluated input arguments. These are set up
- * during ExecInitAgg() and then used for each input row requiring
- * processing besides what's done in AggState->evalproj.
+ * during ExecInitAgg() and then used for each input row requiring either
+ * FILTER or ORDER BY/DISTINCT processing.
*/
TupleTableSlot *sortslot; /* current input tuple */
TupleTableSlot *uniqslot; /* used for multi-column DISTINCT */
+ TupleDesc sortdesc; /* descriptor of input tuples */
/*
* These values are working state that is initialized at the start of an
@@ -983,30 +985,36 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGro
int numGroupingSets = Max(aggstate->phase->numsets, 1);
int numHashes = aggstate->num_hashes;
int numTrans = aggstate->numtrans;
- TupleTableSlot *slot = aggstate->evalslot;
+ TupleTableSlot *combinedslot;
- /* compute input for all aggregates */
- if (aggstate->evalproj)
- aggstate->evalslot = ExecProject(aggstate->evalproj);
+ /* compute required inputs for all aggregates */
+ combinedslot = ExecProject(aggstate->combinedproj);
for (transno = 0; transno < numTrans; transno++)
{
AggStatePerTrans pertrans = &aggstate->pertrans[transno];
- ExprState *filter = pertrans->aggfilter;
int numTransInputs = pertrans->numTransInputs;
- int i;
int inputoff = pertrans->inputoff;
+ TupleTableSlot *slot;
+ int i;
/* Skip anything FILTERed out */
- if (filter)
+ if (pertrans->aggref->aggfilter)
{
- Datum res;
- bool isnull;
-
- res = ExecEvalExprSwitchContext(filter, aggstate->tmpcontext,
- &isnull);
- if (isnull || !DatumGetBool(res))
+ /* Check the result of the filter expression */
+ if (combinedslot->tts_isnull[inputoff] ||
+ !DatumGetBool(combinedslot->tts_values[inputoff]))
continue;
+
+ /* Now it's safe to evaluate this agg's arguments */
+ slot = ExecProject(pertrans->evalproj);
+ /* There's no offset needed in this slot, of course */
+ inputoff = 0;
+ }
+ else
+ {
+ /* arguments are already evaluated into combinedslot @ inputoff */
+ slot = combinedslot;
}
if (pertrans->numSortCols > 0)
@@ -1040,11 +1048,21 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGro
tuplesort_putdatum(pertrans->sortstates[setno],
slot->tts_values[inputoff],
slot->tts_isnull[inputoff]);
+ else if (pertrans->aggref->aggfilter)
+ {
+ /*
+ * When filtering and ordering, we already have a slot
+ * containing just the argument columns.
+ */
+ Assert(slot == pertrans->sortslot);
+ tuplesort_puttupleslot(pertrans->sortstates[setno], slot);
+ }
else
{
/*
- * Copy slot contents, starting from inputoff, into sort
- * slot.
+ * Copy argument columns from combined slot, starting at
+ * inputoff, into sortslot, so that we can store just the
+ * columns we want.
*/
ExecClearTuple(pertrans->sortslot);
memcpy(pertrans->sortslot->tts_values,
@@ -1053,9 +1071,9 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGro
memcpy(pertrans->sortslot->tts_isnull,
&slot->tts_isnull[inputoff],
pertrans->numInputs * sizeof(bool));
- pertrans->sortslot->tts_nvalid = pertrans->numInputs;
ExecStoreVirtualTuple(pertrans->sortslot);
- tuplesort_puttupleslot(pertrans->sortstates[setno], pertrans->sortslot);
+ tuplesort_puttupleslot(pertrans->sortstates[setno],
+ pertrans->sortslot);
}
}
}
@@ -1127,7 +1145,7 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
Assert(aggstate->phase->numsets <= 1);
/* compute input for all aggregates */
- slot = ExecProject(aggstate->evalproj);
+ slot = ExecProject(aggstate->combinedproj);
for (transno = 0; transno < numTrans; transno++)
{
@@ -2691,6 +2709,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
int phase;
int phaseidx;
List *combined_inputeval;
+ TupleDesc combineddesc;
+ TupleTableSlot *combinedslot;
ListCell *l;
Bitmapset *all_grouped_cols = NULL;
int numGroupingSets = 1;
@@ -3366,19 +3386,17 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggstate->numtrans = transno + 1;
/*
- * Build a single projection computing the aggregate arguments for all
+ * Build a single projection computing the required arguments for all
* aggregates at once; if there's more than one, that's considerably
* faster than doing it separately for each.
*
- * First create a targetlist combining the targetlists of all the
- * per-trans states.
+ * First create a targetlist representing the values to compute.
*/
combined_inputeval = NIL;
column_offset = 0;
for (transno = 0; transno < aggstate->numtrans; transno++)
{
AggStatePerTrans pertrans = &pertransstates[transno];
- ListCell *arg;
/*
* Mark this per-trans state with its starting column in the combined
@@ -3387,38 +3405,70 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
pertrans->inputoff = column_offset;
/*
- * Adjust resnos in the copied target entries to match the combined
- * slot.
+ * If the aggregate has a FILTER, we can only evaluate the filter
+ * expression, not the actual input expressions, during the combined
+ * eval step --- unless we're ignoring the filter because this node is
+ * running combinefns not transfns.
*/
- foreach(arg, pertrans->aggref->args)
+ if (pertrans->aggref->aggfilter &&
+ !DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
{
- TargetEntry *source_tle = lfirst_node(TargetEntry, arg);
TargetEntry *tle;
- tle = flatCopyTargetEntry(source_tle);
- tle->resno += column_offset;
-
+ tle = makeTargetEntry(pertrans->aggref->aggfilter,
+ column_offset + 1, NULL, false);
combined_inputeval = lappend(combined_inputeval, tle);
+ column_offset++;
+
+ /*
+ * We'll need separate projection machinery for the real args.
+ * Arrange to evaluate them into the sortslot previously created.
+ */
+ Assert(pertrans->sortslot);
+ pertrans->evalproj = ExecBuildProjectionInfo(pertrans->aggref->args,
+ aggstate->tmpcontext,
+ pertrans->sortslot,
+ &aggstate->ss.ps,
+ NULL);
}
+ else
+ {
+ /*
+ * Add agg's input expressions to combined_inputeval, adjusting
+ * resnos in the copied target entries to match the combined slot.
+ */
+ ListCell *arg;
+
+ foreach(arg, pertrans->aggref->args)
+ {
+ TargetEntry *source_tle = lfirst_node(TargetEntry, arg);
+ TargetEntry *tle;
+
+ tle = flatCopyTargetEntry(source_tle);
+ tle->resno += column_offset;
- column_offset += list_length(pertrans->aggref->args);
+ combined_inputeval = lappend(combined_inputeval, tle);
+ }
+
+ column_offset += list_length(pertrans->aggref->args);
+ }
}
/* Now create a projection for the combined targetlist */
- aggstate->evaldesc = ExecTypeFromTL(combined_inputeval, false);
- aggstate->evalslot = ExecInitExtraTupleSlot(estate);
- aggstate->evalproj = ExecBuildProjectionInfo(combined_inputeval,
- aggstate->tmpcontext,
- aggstate->evalslot,
- &aggstate->ss.ps,
- NULL);
- ExecSetSlotDescriptor(aggstate->evalslot, aggstate->evaldesc);
+ combineddesc = ExecTypeFromTL(combined_inputeval, false);
+ combinedslot = ExecInitExtraTupleSlot(estate);
+ ExecSetSlotDescriptor(combinedslot, combineddesc);
+ aggstate->combinedproj = ExecBuildProjectionInfo(combined_inputeval,
+ aggstate->tmpcontext,
+ combinedslot,
+ &aggstate->ss.ps,
+ NULL);
/*
* Last, check whether any more aggregates got added onto the node while
* we processed the expressions for the aggregate arguments (including not
- * only the regular arguments handled immediately above, but any FILTER
- * expressions and direct arguments we might've handled earlier). If so,
+ * only the regular arguments and FILTER expressions handled immediately
+ * above, but any direct arguments we might've handled earlier). If so,
* we have nested aggregate functions, which is semantically nonsensical,
* so complain. (This should have been caught by the parser, so we don't
* need to work hard on a helpful error message; but we defend against it
@@ -3483,6 +3533,8 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
else
pertrans->numTransInputs = numArguments;
+ /* inputoff and evalproj will be set up later, in ExecInitAgg */
+
/*
* When combining states, we have no use at all for the aggregate
* function's transfn. Instead we use the combinefn. In this case, the
@@ -3598,9 +3650,7 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
}
- /* Initialize the input and FILTER expressions */
- pertrans->aggfilter = ExecInitExpr(aggref->aggfilter,
- (PlanState *) aggstate);
+ /* Initialize any direct-argument expressions */
pertrans->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
(PlanState *) aggstate);
@@ -3634,16 +3684,20 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
pertrans->numSortCols = numSortCols;
pertrans->numDistinctCols = numDistinctCols;
- if (numSortCols > 0)
+ /*
+ * If we have either sorting or filtering to do, create a tupledesc and
+ * slot corresponding to the aggregated inputs (including sort
+ * expressions) of the agg.
+ */
+ if (numSortCols > 0 || aggref->aggfilter)
{
- /*
- * Get a tupledesc and slot corresponding to the aggregated inputs
- * (including sort expressions) of the agg.
- */
pertrans->sortdesc = ExecTypeFromTL(aggref->args, false);
pertrans->sortslot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(pertrans->sortslot, pertrans->sortdesc);
+ }
+ if (numSortCols > 0)
+ {
/*
* We don't implement DISTINCT or ORDER BY aggs in the HASHED case
* (yet)
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 01ceeef39c7..52d35325806 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1830,10 +1830,8 @@ typedef struct AggState
int num_hashes;
AggStatePerHash perhash;
AggStatePerGroup *hash_pergroup; /* array of per-group pointers */
- /* support for evaluation of agg inputs */
- TupleTableSlot *evalslot; /* slot for agg inputs */
- ProjectionInfo *evalproj; /* projection machinery */
- TupleDesc evaldesc; /* descriptor of input tuples */
+ /* support for evaluation of agg input expressions: */
+ ProjectionInfo *combinedproj; /* projection machinery */
} AggState;
/* ----------------
diff --git a/src/test/regress/expected/aggregates.out b/src/test/regress/expected/aggregates.out
index 82ede655aa9..c4ea86ff050 100644
--- a/src/test/regress/expected/aggregates.out
+++ b/src/test/regress/expected/aggregates.out
@@ -1388,6 +1388,12 @@ select min(unique1) filter (where unique1 > 100) from tenk1;
101
(1 row)
+select sum(1/ten) filter (where ten > 0) from tenk1;
+ sum
+------
+ 1000
+(1 row)
+
select ten, sum(distinct four) filter (where four::text ~ '123') from onek a
group by ten;
ten | sum
diff --git a/src/test/regress/sql/aggregates.sql b/src/test/regress/sql/aggregates.sql
index 77314522eb9..fefbef89e08 100644
--- a/src/test/regress/sql/aggregates.sql
+++ b/src/test/regress/sql/aggregates.sql
@@ -524,6 +524,8 @@ drop table bytea_test_table;
select min(unique1) filter (where unique1 > 100) from tenk1;
+select sum(1/ten) filter (where ten > 0) from tenk1;
+
select ten, sum(distinct four) filter (where four::text ~ '123') from onek a
group by ten;