aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2016-06-26 14:33:38 -0400
committerTom Lane <tgl@sss.pgh.pa.us>2016-06-26 14:33:38 -0400
commit19e972d5580c655423572e3c870e47b5b7c346f6 (patch)
tree0e403e5090688a2947b4aec6d4e6e50a345fec27 /src/backend/executor
parent59a3795c2589a0e6dfe4d9a886de9423b3f8b057 (diff)
downloadpostgresql-19e972d5580c655423572e3c870e47b5b7c346f6.tar.gz
postgresql-19e972d5580c655423572e3c870e47b5b7c346f6.zip
Rethink node-level representation of partial-aggregation modes.
The original coding had three separate booleans representing partial aggregation behavior, which was confusing, unreadable, and error-prone, not least because the booleans weren't always listed in the same order. It was also inadequate for the allegedly-desirable future extension to support intermediate partial aggregation, because we'd need separate markers for serialization and deserialization in such a case. Merge these bools into an enum "AggSplit" to provide symbolic names for the supported operating modes (and document what those are). By assigning the values of the enum constants carefully, we can treat AggSplit values as options bitmasks so that tests of what to do aren't noticeably more expensive than before. While at it, get rid of Aggref.aggoutputtype. That's not needed since commit 59a3795c2 got rid of setrefs.c's special-purpose Aggref comparison code, and it likewise seemed more confusing than helpful. Assorted comment cleanup as well (there's still more that I want to do in that line). catversion bump for change in Aggref node contents. Should be the last one for partial-aggregation changes. Discussion: <29309.1466699160@sss.pgh.pa.us>
Diffstat (limited to 'src/backend/executor')
-rw-r--r--src/backend/executor/execQual.c29
-rw-r--r--src/backend/executor/nodeAgg.c173
2 files changed, 85 insertions, 117 deletions
diff --git a/src/backend/executor/execQual.c b/src/backend/executor/execQual.c
index 01e04d3b14e..d04d1a89a7f 100644
--- a/src/backend/executor/execQual.c
+++ b/src/backend/executor/execQual.c
@@ -4510,35 +4510,20 @@ ExecInitExpr(Expr *node, PlanState *parent)
case T_Aggref:
{
AggrefExprState *astate = makeNode(AggrefExprState);
- AggState *aggstate = (AggState *) parent;
- Aggref *aggref = (Aggref *) node;
astate->xprstate.evalfunc = (ExprStateEvalFunc) ExecEvalAggref;
- if (!aggstate || !IsA(aggstate, AggState))
+ if (parent && IsA(parent, AggState))
{
- /* planner messed up */
- elog(ERROR, "Aggref found in non-Agg plan node");
- }
- if (aggref->aggpartial == aggstate->finalizeAggs)
- {
- /* planner messed up */
- if (aggref->aggpartial)
- elog(ERROR, "partial Aggref found in finalize agg plan node");
- else
- elog(ERROR, "non-partial Aggref found in non-finalize agg plan node");
- }
+ AggState *aggstate = (AggState *) parent;
- if (aggref->aggcombine != aggstate->combineStates)
+ aggstate->aggs = lcons(astate, aggstate->aggs);
+ aggstate->numaggs++;
+ }
+ else
{
/* planner messed up */
- if (aggref->aggcombine)
- elog(ERROR, "combine Aggref found in non-combine agg plan node");
- else
- elog(ERROR, "non-combine Aggref found in combine agg plan node");
+ elog(ERROR, "Aggref found in non-Agg plan node");
}
-
- aggstate->aggs = lcons(astate, aggstate->aggs);
- aggstate->numaggs++;
state = (ExprState *) astate;
}
break;
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index a4479646129..b3187e66681 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -10,51 +10,33 @@
* transvalue = transfunc(transvalue, input_value(s))
* result = finalfunc(transvalue, direct_argument(s))
*
- * If a finalfunc is not supplied or finalizeAggs is false, then the result
- * is just the ending value of transvalue.
- *
- * Other behavior is also supported and is controlled by the 'combineStates'
- * and 'finalizeAggs'. 'combineStates' controls whether the trans func or
- * the combine func is used during aggregation. When 'combineStates' is
- * true we expect other (previously) aggregated states as input rather than
- * input tuples. This mode facilitates multiple aggregate stages which
- * allows us to support pushing aggregation down deeper into the plan rather
- * than leaving it for the final stage. For example with a query such as:
- *
- * SELECT count(*) FROM (SELECT * FROM a UNION ALL SELECT * FROM b);
- *
- * with this functionality the planner has the flexibility to generate a
- * plan which performs count(*) on table a and table b separately and then
- * add a combine phase to combine both results. In this case the combine
- * function would simply add both counts together.
- *
- * When multiple aggregate stages exist the planner should have set the
- * 'finalizeAggs' to true only for the final aggregtion state, and each
- * stage, apart from the very first one should have 'combineStates' set to
- * true. This permits plans such as:
- *
- * Finalize Aggregate
- * -> Partial Aggregate
- * -> Partial Aggregate
- *
- * Combine functions which use pass-by-ref states should be careful to
- * always update the 1st state parameter by adding the 2nd parameter to it,
- * rather than the other way around. If the 1st state is NULL, then it's not
- * sufficient to simply return the 2nd state, as the memory context is
- * incorrect. Instead a new state should be created in the correct aggregate
- * memory context and the 2nd state should be copied over.
- *
- * The 'serialStates' option can be used to allow multi-stage aggregation
- * for aggregates with an INTERNAL state type. When this mode is disabled
- * only a pointer to the INTERNAL aggregate states are passed around the
- * executor. When enabled, INTERNAL states are serialized and deserialized
- * as required; this is useful when data must be passed between processes.
+ * If a finalfunc is not supplied then the result is just the ending
+ * value of transvalue.
+ *
+ * Other behaviors can be selected by the "aggsplit" mode, which exists
+ * to support partial aggregation. It is possible to:
+ * * Skip running the finalfunc, so that the output is always the
+ * final transvalue state.
+ * * Substitute the combinefunc for the transfunc, so that transvalue
+ * states (propagated up from a child partial-aggregation step) are merged
+ * rather than processing raw input rows. (The statements below about
+ * the transfunc apply equally to the combinefunc, when it's selected.)
+ * * Apply the serializefunc to the output values (this only makes sense
+ * when skipping the finalfunc, since the serializefunc works on the
+ * transvalue data type).
+ * * Apply the deserializefunc to the input values (this only makes sense
+ * when using the combinefunc, for similar reasons).
+ * It is the planner's responsibility to connect up Agg nodes using these
+ * alternate behaviors in a way that makes sense, with partial aggregation
+ * results being fed to nodes that expect them.
*
* If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
* input tuples and eliminate duplicates (if required) before performing
* the above-depicted process. (However, we don't do that for ordered-set
* aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
- * so far as this module is concerned.)
+ * so far as this module is concerned.) Note that partial aggregation
+ * is not supported in these cases, since we couldn't ensure global
+ * ordering or distinctness of the inputs.
*
* If transfunc is marked "strict" in pg_proc and initcond is NULL,
* then the first non-NULL input_value is assigned directly to transvalue,
@@ -862,8 +844,6 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
int numGroupingSets = Max(aggstate->phase->numsets, 1);
int numTrans = aggstate->numtrans;
- Assert(!aggstate->combineStates);
-
for (transno = 0; transno < numTrans; transno++)
{
AggStatePerTrans pertrans = &aggstate->pertrans[transno];
@@ -948,9 +928,11 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
}
/*
- * combine_aggregates is used when running in 'combineState' mode. This
- * advances each aggregate transition state by adding another transition state
- * to it.
+ * combine_aggregates replaces advance_aggregates in DO_AGGSPLIT_COMBINE
+ * mode. The principal difference is that here we may need to apply the
+ * deserialization function before running the transfn (which, in this mode,
+ * is actually the aggregate's combinefn). Also, we know we don't need to
+ * handle FILTER, DISTINCT, ORDER BY, or grouping sets.
*/
static void
combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
@@ -960,14 +942,13 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
/* combine not supported with grouping sets */
Assert(aggstate->phase->numsets == 0);
- Assert(aggstate->combineStates);
for (transno = 0; transno < numTrans; transno++)
{
AggStatePerTrans pertrans = &aggstate->pertrans[transno];
+ AggStatePerGroup pergroupstate = &pergroup[transno];
TupleTableSlot *slot;
FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
- AggStatePerGroup pergroupstate = &pergroup[transno];
/* Evaluate the current input expressions for this aggregate */
slot = ExecProject(pertrans->evalproj, NULL);
@@ -979,15 +960,12 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
*/
if (OidIsValid(pertrans->deserialfn_oid))
{
- /*
- * Don't call a strict deserialization function with NULL input. A
- * strict deserialization function and a null value means we skip
- * calling the combine function for this state. We assume that
- * this would be a waste of time and effort anyway so just skip
- * it.
- */
+ /* Don't call a strict deserialization function with NULL input */
if (pertrans->deserialfn.fn_strict && slot->tts_isnull[0])
- continue;
+ {
+ fcinfo->arg[1] = slot->tts_values[0];
+ fcinfo->argnull[1] = slot->tts_isnull[0];
+ }
else
{
FunctionCallInfo dsinfo = &pertrans->deserialfn_fcinfo;
@@ -1110,7 +1088,6 @@ advance_combine_function(AggState *aggstate,
pergroupstate->transValueIsNull = fcinfo->isnull;
MemoryContextSwitchTo(oldContext);
-
}
@@ -1415,7 +1392,7 @@ finalize_aggregate(AggState *aggstate,
}
/*
- * Compute the final value of one partial aggregate.
+ * Compute the output value of one partial aggregate.
*
* The serialization function will be run, and the result delivered, in the
* output-tuple context; caller's CurrentMemoryContext does not matter.
@@ -1432,8 +1409,8 @@ finalize_partialaggregate(AggState *aggstate,
oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
/*
- * serialfn_oid will be set if we must serialize the input state before
- * calling the combine function on the state.
+ * serialfn_oid will be set if we must serialize the transvalue before
+ * returning it
*/
if (OidIsValid(pertrans->serialfn_oid))
{
@@ -1577,12 +1554,12 @@ finalize_aggregates(AggState *aggstate,
pergroupstate);
}
- if (aggstate->finalizeAggs)
- finalize_aggregate(aggstate, peragg, pergroupstate,
- &aggvalues[aggno], &aggnulls[aggno]);
- else
+ if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
finalize_partialaggregate(aggstate, peragg, pergroupstate,
&aggvalues[aggno], &aggnulls[aggno]);
+ else
+ finalize_aggregate(aggstate, peragg, pergroupstate,
+ &aggvalues[aggno], &aggnulls[aggno]);
}
}
@@ -2114,10 +2091,10 @@ agg_retrieve_direct(AggState *aggstate)
*/
for (;;)
{
- if (!aggstate->combineStates)
- advance_aggregates(aggstate, pergroup);
- else
+ if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
combine_aggregates(aggstate, pergroup);
+ else
+ advance_aggregates(aggstate, pergroup);
/* Reset per-input-tuple context after each tuple */
ResetExprContext(tmpcontext);
@@ -2225,10 +2202,10 @@ agg_fill_hash_table(AggState *aggstate)
entry = lookup_hash_entry(aggstate, outerslot);
/* Advance the aggregates */
- if (!aggstate->combineStates)
- advance_aggregates(aggstate, entry->pergroup);
- else
+ if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
combine_aggregates(aggstate, entry->pergroup);
+ else
+ advance_aggregates(aggstate, entry->pergroup);
/* Reset per-input-tuple context after each tuple */
ResetExprContext(tmpcontext);
@@ -2352,6 +2329,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggstate->aggs = NIL;
aggstate->numaggs = 0;
aggstate->numtrans = 0;
+ aggstate->aggsplit = node->aggsplit;
aggstate->maxsets = 0;
aggstate->hashfunctions = NULL;
aggstate->projected_set = -1;
@@ -2359,11 +2337,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggstate->peragg = NULL;
aggstate->pertrans = NULL;
aggstate->curpertrans = NULL;
- aggstate->agg_done = false;
- aggstate->combineStates = node->combineStates;
- aggstate->finalizeAggs = node->finalizeAggs;
- aggstate->serialStates = node->serialStates;
aggstate->input_done = false;
+ aggstate->agg_done = false;
aggstate->pergroup = NULL;
aggstate->grp_firstTuple = NULL;
aggstate->hashtable = NULL;
@@ -2681,6 +2656,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
/* Planner should have assigned aggregate to correct level */
Assert(aggref->agglevelsup == 0);
+ /* ... and the split mode should match */
+ Assert(aggref->aggsplit == aggstate->aggsplit);
/* 1. Check for already processed aggs which can be re-used */
existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
@@ -2724,7 +2701,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* If this aggregation is performing state combines, then instead of
* using the transition function, we'll use the combine function
*/
- if (aggstate->combineStates)
+ if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
{
transfn_oid = aggform->aggcombinefn;
@@ -2736,39 +2713,45 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
transfn_oid = aggform->aggtransfn;
/* Final function only required if we're finalizing the aggregates */
- if (aggstate->finalizeAggs)
- peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
- else
+ if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
peragg->finalfn_oid = finalfn_oid = InvalidOid;
+ else
+ peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
serialfn_oid = InvalidOid;
deserialfn_oid = InvalidOid;
/*
- * Determine if we require serialization or deserialization of the
- * aggregate states. This is only required if the aggregate state is
- * internal.
+ * Check if serialization/deserialization is required. We only do it
+ * for aggregates that have transtype INTERNAL.
*/
- if (aggstate->serialStates && aggtranstype == INTERNALOID)
+ if (aggtranstype == INTERNALOID)
{
/*
- * The planner should only have generated an agg node with
- * serialStates if every aggregate with an INTERNAL state has
- * serialization/deserialization functions. Verify that.
+ * The planner should only have generated a serialize agg node if
+ * every aggregate with an INTERNAL state has a serialization
+ * function. Verify that.
*/
- if (!OidIsValid(aggform->aggserialfn))
- elog(ERROR, "serialfunc not set during serialStates aggregation step");
-
- if (!OidIsValid(aggform->aggdeserialfn))
- elog(ERROR, "deserialfunc not set during serialStates aggregation step");
+ if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
+ {
+ /* serialization only valid when not running finalfn */
+ Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
- /* serialization func only required when not finalizing aggs */
- if (!aggstate->finalizeAggs)
+ if (!OidIsValid(aggform->aggserialfn))
+ elog(ERROR, "serialfunc not provided for serialization aggregation");
serialfn_oid = aggform->aggserialfn;
+ }
+
+ /* Likewise for deserialization functions */
+ if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
+ {
+ /* deserialization only valid when combining states */
+ Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
- /* deserialization func only required when combining states */
- if (aggstate->combineStates)
+ if (!OidIsValid(aggform->aggdeserialfn))
+ elog(ERROR, "deserialfunc not provided for deserialization aggregation");
deserialfn_oid = aggform->aggdeserialfn;
+ }
}
/* Check that aggregate owner has permission to call component fns */
@@ -2853,7 +2836,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
}
/* get info about the output value's datatype */
- get_typlenbyval(aggref->aggoutputtype,
+ get_typlenbyval(aggref->aggtype,
&peragg->resulttypeLen,
&peragg->resulttypeByVal);
@@ -2972,7 +2955,7 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
* transfn and transfn_oid fields of pertrans refer to the combine
* function rather than the transition function.
*/
- if (aggstate->combineStates)
+ if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
{
Expr *combinefnexpr;