diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 1999-12-13 01:27:21 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 1999-12-13 01:27:21 +0000 |
commit | a8ae19ec3d13452de931736126d0786a148ee643 (patch) | |
tree | 3dbdbba249c27b5362865f9ff1c7b2675c136574 /src/backend/executor/nodeAgg.c | |
parent | efb36d2be8272d03167fe4205b640129ffe583fb (diff) | |
download | postgresql-a8ae19ec3d13452de931736126d0786a148ee643.tar.gz postgresql-a8ae19ec3d13452de931736126d0786a148ee643.zip |
aggregate(DISTINCT ...) works, per SQL spec.
Note this forces initdb because of change of Aggref node in stored rules.
Diffstat (limited to 'src/backend/executor/nodeAgg.c')
-rw-r--r-- | src/backend/executor/nodeAgg.c | 562 |
1 files changed, 364 insertions, 198 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 0956af455f1..0a95c92347f 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -3,15 +3,35 @@ * nodeAgg.c * Routines to handle aggregate nodes. * - * Copyright (c) 1994, Regents of the University of California + * ExecAgg evaluates each aggregate in the following steps: (initcond1, + * initcond2 are the initial values and sfunc1, sfunc2, and finalfunc are + * the transition functions.) + * + * value1 = initcond1 + * value2 = initcond2 + * foreach input_value do + * value1 = sfunc1(value1, input_value) + * value2 = sfunc2(value2) + * value1 = finalfunc(value1, value2) + * + * If initcond1 is NULL then the first non-NULL input_value is + * assigned directly to value1. sfunc1 isn't applied until value1 + * is non-NULL. + * + * sfunc1 is never applied when the current tuple's input_value is NULL. + * sfunc2 is applied for each tuple if the aggref is marked 'usenulls', + * otherwise it is only applied when input_value is not NULL. + * (usenulls was formerly used for COUNT(*), but is no longer needed for + * that purpose; as of 10/1999 the support for usenulls is dead code. + * I have not removed it because it seems like a potentially useful + * feature for user-defined aggregates. We'd just need to add a + * flag column to pg_aggregate and a parameter to CREATE AGGREGATE...) * * - * NOTE - * The implementation of Agg node has been reworked to handle legal - * SQL aggregates. (Do not expect POSTQUEL semantics.) -- ay 2/95 + * Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.59 1999/10/30 02:35:14 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.60 1999/12/13 01:26:52 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -20,11 +40,15 @@ #include "access/heapam.h" #include "catalog/pg_aggregate.h" +#include "catalog/pg_operator.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "optimizer/clauses.h" +#include "parser/parse_expr.h" +#include "parser/parse_oper.h" #include "parser/parse_type.h" #include "utils/syscache.h" +#include "utils/tuplesort.h" /* * AggStatePerAggData - per-aggregate working state for the Agg scan @@ -36,6 +60,9 @@ typedef struct AggStatePerAggData * thereafter: */ + /* Link to Aggref node this working state is for */ + Aggref *aggref; + /* Oids of transfer functions */ Oid xfn1_oid; Oid xfn2_oid; @@ -48,6 +75,18 @@ typedef struct AggStatePerAggData FmgrInfo xfn2; FmgrInfo finalfn; /* + * Type of input data and Oid of sort operator to use for it; + * only set/used when aggregate has DISTINCT flag. (These are not + * used directly by nodeAgg, but must be passed to the Tuplesort object.) + */ + Oid inputType; + Oid sortOperator; + /* + * fmgr lookup data for input type's equality operator --- only set/used + * when aggregate has DISTINCT flag. + */ + FmgrInfo equalfn; + /* * initial values from pg_aggregate entry */ Datum initValue1; /* for transtype1 */ @@ -55,19 +94,29 @@ typedef struct AggStatePerAggData bool initValue1IsNull, initValue2IsNull; /* - * We need the len and byval info for the agg's transition status types - * in order to know how to copy/delete values. + * We need the len and byval info for the agg's input and transition + * data types in order to know how to copy/delete values. */ - int transtype1Len, + int inputtypeLen, + transtype1Len, transtype2Len; - bool transtype1ByVal, + bool inputtypeByVal, + transtype1ByVal, transtype2ByVal; /* * These values are working state that is initialized at the start - * of an input tuple group and updated for each input tuple: + * of an input tuple group and updated for each input tuple. + * + * For a simple (non DISTINCT) aggregate, we just feed the input values + * straight to the transition functions. If it's DISTINCT, we pass the + * input values into a Tuplesort object; then at completion of the input + * tuple group, we scan the sorted values, eliminate duplicates, and run + * the transition functions on the rest. */ + Tuplesortstate *sortstate; /* sort object, if a DISTINCT agg */ + Datum value1, /* current transfer values 1 and 2 */ value2; bool value1IsNull, @@ -82,28 +131,248 @@ typedef struct AggStatePerAggData } AggStatePerAggData; +static void initialize_aggregate (AggStatePerAgg peraggstate); +static void advance_transition_functions (AggStatePerAgg peraggstate, + Datum newVal, bool isNull); +static void finalize_aggregate (AggStatePerAgg peraggstate, + Datum *resultVal, bool *resultIsNull); +static Datum copyDatum(Datum val, int typLen, bool typByVal); + + /* - * Helper routine to make a copy of a Datum. - * - * NB: input had better not be a NULL; might cause null-pointer dereference. + * Initialize one aggregate for a new set of input values. */ -static Datum -copyDatum(Datum val, int typLen, bool typByVal) +static void +initialize_aggregate (AggStatePerAgg peraggstate) { - if (typByVal) - return val; + Aggref *aggref = peraggstate->aggref; + + /* + * Start a fresh sort operation for each DISTINCT aggregate. + */ + if (aggref->aggdistinct) + { + /* In case of rescan, maybe there could be an uncompleted + * sort operation? Clean it up if so. + */ + if (peraggstate->sortstate) + tuplesort_end(peraggstate->sortstate); + + peraggstate->sortstate = + tuplesort_begin_datum(peraggstate->inputType, + peraggstate->sortOperator, + false); + } + + /* + * (Re)set value1 and value2 to their initial values. + */ + if (OidIsValid(peraggstate->xfn1_oid) && + ! peraggstate->initValue1IsNull) + peraggstate->value1 = copyDatum(peraggstate->initValue1, + peraggstate->transtype1Len, + peraggstate->transtype1ByVal); + else + peraggstate->value1 = (Datum) NULL; + peraggstate->value1IsNull = peraggstate->initValue1IsNull; + + if (OidIsValid(peraggstate->xfn2_oid) && + ! peraggstate->initValue2IsNull) + peraggstate->value2 = copyDatum(peraggstate->initValue2, + peraggstate->transtype2Len, + peraggstate->transtype2ByVal); else + peraggstate->value2 = (Datum) NULL; + peraggstate->value2IsNull = peraggstate->initValue2IsNull; + + /* ------------------------------------------ + * If the initial value for the first transition function + * doesn't exist in the pg_aggregate table then we will let + * the first value returned from the outer procNode become + * the initial value. (This is useful for aggregates like + * max{} and min{}.) The noInitValue flag signals that we + * still need to do this. + * ------------------------------------------ + */ + peraggstate->noInitValue = peraggstate->initValue1IsNull; +} + +/* + * Given a new input value, advance the transition functions of an aggregate. + * + * Note: if the agg does not have usenulls set, null inputs will be filtered + * out before reaching here. + */ +static void +advance_transition_functions (AggStatePerAgg peraggstate, + Datum newVal, bool isNull) +{ + Datum args[2]; + + if (OidIsValid(peraggstate->xfn1_oid) && !isNull) { - char *newVal; + if (peraggstate->noInitValue) + { + /* + * value1 has not been initialized. This is the first non-NULL + * input value. We use it as the initial value for value1. + * + * XXX We assume, without having checked, that the agg's input + * type is binary-compatible with its transtype1! + * + * We have to copy the datum since the tuple from which it came + * will be freed on the next iteration of the scan. + */ + peraggstate->value1 = copyDatum(newVal, + peraggstate->transtype1Len, + peraggstate->transtype1ByVal); + peraggstate->value1IsNull = false; + peraggstate->noInitValue = false; + } + else + { + /* apply transition function 1 */ + args[0] = peraggstate->value1; + args[1] = newVal; + newVal = (Datum) fmgr_c(&peraggstate->xfn1, + (FmgrValues *) args, + &isNull); + if (! peraggstate->transtype1ByVal) + pfree(peraggstate->value1); + peraggstate->value1 = newVal; + } + } - if (typLen == -1) /* variable length type? */ - typLen = VARSIZE((struct varlena *) DatumGetPointer(val)); - newVal = (char *) palloc(typLen); - memcpy(newVal, DatumGetPointer(val), typLen); - return PointerGetDatum(newVal); + if (OidIsValid(peraggstate->xfn2_oid)) + { + /* apply transition function 2 */ + args[0] = peraggstate->value2; + isNull = false; /* value2 cannot be null, currently */ + newVal = (Datum) fmgr_c(&peraggstate->xfn2, + (FmgrValues *) args, + &isNull); + if (! peraggstate->transtype2ByVal) + pfree(peraggstate->value2); + peraggstate->value2 = newVal; } } +/* + * Compute the final value of one aggregate. + */ +static void +finalize_aggregate (AggStatePerAgg peraggstate, + Datum *resultVal, bool *resultIsNull) +{ + Aggref *aggref = peraggstate->aggref; + char *args[2]; + + /* + * If it's a DISTINCT aggregate, all we've done so far is to stuff the + * input values into the sort object. Complete the sort, then run + * the transition functions on the non-duplicate values. Note that + * DISTINCT always suppresses nulls, per SQL spec, regardless of usenulls. + */ + if (aggref->aggdistinct) + { + Datum oldVal = (Datum) 0; + bool haveOldVal = false; + Datum newVal; + bool isNull; + + tuplesort_performsort(peraggstate->sortstate); + while (tuplesort_getdatum(peraggstate->sortstate, true, + &newVal, &isNull)) + { + if (isNull) + continue; + if (haveOldVal) + { + Datum equal; + + equal = (Datum) (*fmgr_faddr(&peraggstate->equalfn)) (oldVal, + newVal); + if (DatumGetInt32(equal) != 0) + { + if (! peraggstate->inputtypeByVal) + pfree(DatumGetPointer(newVal)); + continue; + } + } + advance_transition_functions(peraggstate, newVal, false); + if (haveOldVal && ! peraggstate->inputtypeByVal) + pfree(DatumGetPointer(oldVal)); + oldVal = newVal; + haveOldVal = true; + } + if (haveOldVal && ! peraggstate->inputtypeByVal) + pfree(DatumGetPointer(oldVal)); + tuplesort_end(peraggstate->sortstate); + peraggstate->sortstate = NULL; + } + + /* + * Now apply the agg's finalfn, or substitute the appropriate transition + * value if there is no finalfn. + * + * XXX For now, only apply finalfn if we got at least one + * non-null input value. This prevents zero divide in AVG(). + * If we had cleaner handling of null inputs/results in functions, + * we could probably take out this hack and define the result + * for no inputs as whatever finalfn returns for null input. + */ + if (OidIsValid(peraggstate->finalfn_oid) && + ! peraggstate->noInitValue) + { + if (peraggstate->finalfn.fn_nargs > 1) + { + args[0] = (char *) peraggstate->value1; + args[1] = (char *) peraggstate->value2; + } + else if (OidIsValid(peraggstate->xfn1_oid)) + args[0] = (char *) peraggstate->value1; + else if (OidIsValid(peraggstate->xfn2_oid)) + args[0] = (char *) peraggstate->value2; + else + elog(ERROR, "ExecAgg: no valid transition functions??"); + *resultIsNull = false; + *resultVal = (Datum) fmgr_c(&peraggstate->finalfn, + (FmgrValues *) args, + resultIsNull); + } + else if (OidIsValid(peraggstate->xfn1_oid)) + { + /* Return value1 */ + *resultVal = peraggstate->value1; + *resultIsNull = peraggstate->value1IsNull; + /* prevent pfree below */ + peraggstate->value1IsNull = true; + } + else if (OidIsValid(peraggstate->xfn2_oid)) + { + /* Return value2 */ + *resultVal = peraggstate->value2; + *resultIsNull = peraggstate->value2IsNull; + /* prevent pfree below */ + peraggstate->value2IsNull = true; + } + else + elog(ERROR, "ExecAgg: no valid transition functions??"); + + /* + * Release any per-group working storage, unless we're passing + * it back as the result of the aggregate. + */ + if (OidIsValid(peraggstate->xfn1_oid) && + ! peraggstate->value1IsNull && + ! peraggstate->transtype1ByVal) + pfree(peraggstate->value1); + + if (OidIsValid(peraggstate->xfn2_oid) && + ! peraggstate->value2IsNull && + ! peraggstate->transtype2ByVal) + pfree(peraggstate->value2); +} /* --------------------------------------- * @@ -118,30 +387,6 @@ copyDatum(Datum val, int typLen, bool typByVal) * the expression context to be used when ExecProject evaluates the * result tuple. * - * ExecAgg evaluates each aggregate in the following steps: (initcond1, - * initcond2 are the initial values and sfunc1, sfunc2, and finalfunc are - * the transition functions.) - * - * value1 = initcond1 - * value2 = initcond2 - * foreach tuple do - * value1 = sfunc1(value1, aggregated_value) - * value2 = sfunc2(value2) - * value1 = finalfunc(value1, value2) - * - * If initcond1 is NULL then the first non-NULL aggregated_value is - * assigned directly to value1. sfunc1 isn't applied until value1 - * is non-NULL. - * - * sfunc1 is never applied when the current tuple's aggregated_value - * is NULL. sfunc2 is applied for each tuple if the aggref is marked - * 'usenulls', otherwise it is only applied when aggregated_value is - * not NULL. (usenulls was formerly used for COUNT(*), but is no longer - * needed for that purpose; as of 10/1999 the support for usenulls is - * dead code. I have not removed it because it seems like a potentially - * useful feature for user-defined aggregates. We'd just need to add a - * flag column to pg_aggregate and a parameter to CREATE AGGREGATE...) - * * If the outer subplan is a Group node, ExecAgg returns as many tuples * as there are groups. * @@ -161,7 +406,6 @@ ExecAgg(Agg *node) TupleTableSlot *resultSlot; HeapTuple inputTuple; int aggno; - List *alist; bool isDone; bool isNull; @@ -190,42 +434,11 @@ ExecAgg(Agg *node) /* * Initialize working state for a new input tuple group */ - aggno = -1; - foreach(alist, aggstate->aggs) + for (aggno = 0; aggno < aggstate->numaggs; aggno++) { - AggStatePerAgg peraggstate = &peragg[++aggno]; + AggStatePerAgg peraggstate = &peragg[aggno]; - /* - * (Re)set value1 and value2 to their initial values. - */ - if (OidIsValid(peraggstate->xfn1_oid) && - ! peraggstate->initValue1IsNull) - peraggstate->value1 = copyDatum(peraggstate->initValue1, - peraggstate->transtype1Len, - peraggstate->transtype1ByVal); - else - peraggstate->value1 = (Datum) NULL; - peraggstate->value1IsNull = peraggstate->initValue1IsNull; - - if (OidIsValid(peraggstate->xfn2_oid) && - ! peraggstate->initValue2IsNull) - peraggstate->value2 = copyDatum(peraggstate->initValue2, - peraggstate->transtype2Len, - peraggstate->transtype2ByVal); - else - peraggstate->value2 = (Datum) NULL; - peraggstate->value2IsNull = peraggstate->initValue2IsNull; - - /* ------------------------------------------ - * If the initial value for the first transition function - * doesn't exist in the pg_aggregate table then we will let - * the first value returned from the outer procNode become - * the initial value. (This is useful for aggregates like - * max{} and min{}.) The noInitValue flag signals that we - * still need to do this. - * ------------------------------------------ - */ - peraggstate->noInitValue = peraggstate->initValue1IsNull; + initialize_aggregate(peraggstate); } inputTuple = NULL; /* no saved input tuple yet */ @@ -243,13 +456,11 @@ ExecAgg(Agg *node) break; econtext->ecxt_scantuple = outerslot; - aggno = -1; - foreach(alist, aggstate->aggs) + for (aggno = 0; aggno < aggstate->numaggs; aggno++) { - Aggref *aggref = (Aggref *) lfirst(alist); - AggStatePerAgg peraggstate = &peragg[++aggno]; + AggStatePerAgg peraggstate = &peragg[aggno]; + Aggref *aggref = peraggstate->aggref; Datum newVal; - Datum args[2]; newVal = ExecEvalExpr(aggref->target, econtext, &isNull, &isDone); @@ -257,53 +468,12 @@ ExecAgg(Agg *node) if (isNull && !aggref->usenulls) continue; /* ignore this tuple for this agg */ - if (OidIsValid(peraggstate->xfn1_oid) && !isNull) - { - if (peraggstate->noInitValue) - { - /* - * value1 has not been initialized. This is the - * first non-NULL input value. We use it as the - * initial value for value1. XXX We assume, - * without having checked, that the agg's input type - * is binary-compatible with its transtype1! - * - * We have to copy the datum since the tuple from - * which it came will be freed on the next iteration - * of the scan. - */ - peraggstate->value1 = copyDatum(newVal, - peraggstate->transtype1Len, - peraggstate->transtype1ByVal); - peraggstate->value1IsNull = false; - peraggstate->noInitValue = false; - } - else - { - /* apply transition function 1 */ - args[0] = peraggstate->value1; - args[1] = newVal; - newVal = (Datum) fmgr_c(&peraggstate->xfn1, - (FmgrValues *) args, - &isNull); - if (! peraggstate->transtype1ByVal) - pfree(peraggstate->value1); - peraggstate->value1 = newVal; - } - } - - if (OidIsValid(peraggstate->xfn2_oid)) - { - /* apply transition function 2 */ - args[0] = peraggstate->value2; - isNull = false; /* value2 cannot be null, currently */ - newVal = (Datum) fmgr_c(&peraggstate->xfn2, - (FmgrValues *) args, - &isNull); - if (! peraggstate->transtype2ByVal) - pfree(peraggstate->value2); - peraggstate->value2 = newVal; - } + if (aggref->aggdistinct) + tuplesort_putdatum(peraggstate->sortstate, + newVal, isNull); + else + advance_transition_functions(peraggstate, + newVal, isNull); } /* @@ -320,70 +490,12 @@ ExecAgg(Agg *node) * Done scanning input tuple group. * Finalize each aggregate calculation. */ - aggno = -1; - foreach(alist, aggstate->aggs) + for (aggno = 0; aggno < aggstate->numaggs; aggno++) { - AggStatePerAgg peraggstate = &peragg[++aggno]; - char *args[2]; - - /* - * XXX For now, only apply finalfn if we got at least one - * non-null input value. This prevents zero divide in AVG(). - * If we had cleaner handling of null inputs/results in functions, - * we could probably take out this hack and define the result - * for no inputs as whatever finalfn returns for null input. - */ - if (OidIsValid(peraggstate->finalfn_oid) && - ! peraggstate->noInitValue) - { - if (peraggstate->finalfn.fn_nargs > 1) - { - args[0] = (char *) peraggstate->value1; - args[1] = (char *) peraggstate->value2; - } - else if (OidIsValid(peraggstate->xfn1_oid)) - args[0] = (char *) peraggstate->value1; - else if (OidIsValid(peraggstate->xfn2_oid)) - args[0] = (char *) peraggstate->value2; - else - elog(ERROR, "ExecAgg: no valid transition functions??"); - aggnulls[aggno] = false; - aggvalues[aggno] = (Datum) fmgr_c(&peraggstate->finalfn, - (FmgrValues *) args, - &(aggnulls[aggno])); - } - else if (OidIsValid(peraggstate->xfn1_oid)) - { - /* Return value1 */ - aggvalues[aggno] = peraggstate->value1; - aggnulls[aggno] = peraggstate->value1IsNull; - /* prevent pfree below */ - peraggstate->value1IsNull = true; - } - else if (OidIsValid(peraggstate->xfn2_oid)) - { - /* Return value2 */ - aggvalues[aggno] = peraggstate->value2; - aggnulls[aggno] = peraggstate->value2IsNull; - /* prevent pfree below */ - peraggstate->value2IsNull = true; - } - else - elog(ERROR, "ExecAgg: no valid transition functions??"); - - /* - * Release any per-group working storage, unless we're passing - * it back as the result of the aggregate. - */ - if (OidIsValid(peraggstate->xfn1_oid) && - ! peraggstate->value1IsNull && - ! peraggstate->transtype1ByVal) - pfree(peraggstate->value1); + AggStatePerAgg peraggstate = &peragg[aggno]; - if (OidIsValid(peraggstate->xfn2_oid) && - ! peraggstate->value2IsNull && - ! peraggstate->transtype2ByVal) - pfree(peraggstate->value2); + finalize_aggregate(peraggstate, + & aggvalues[aggno], & aggnulls[aggno]); } /* @@ -458,14 +570,14 @@ ExecAgg(Agg *node) /* * Form a projection tuple using the aggregate results and the - * representative input tuple. Store it in the result tuple slot, - * and return it if it meets my qual condition. + * representative input tuple. Store it in the result tuple slot. */ resultSlot = ExecProject(projInfo, &isDone); /* * If the completed tuple does not match the qualifications, * it is ignored and we loop back to try to process another group. + * Otherwise, return the tuple. */ } while (! ExecQual(node->plan.qual, econtext)); @@ -505,6 +617,11 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) /* * find aggregates in targetlist and quals + * + * Note: pull_agg_clauses also checks that no aggs contain other agg + * calls in their arguments. This would make no sense under SQL semantics + * anyway (and it's forbidden by the spec). Because that is true, we + * don't need to worry about evaluating the aggs in any particular order. */ aggstate->aggs = nconc(pull_agg_clause((Node *) node->plan.targetlist), pull_agg_clause((Node *) node->plan.qual)); @@ -588,6 +705,9 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) /* Mark Aggref node with its associated index in the result array */ aggref->aggno = aggno; + /* Fill in the peraggstate data */ + peraggstate->aggref = aggref; + aggTuple = SearchSysCacheTuple(AGGNAME, PointerGetDatum(aggname), ObjectIdGetDatum(aggref->basetype), @@ -644,6 +764,29 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) { fmgr_info(finalfn_oid, &peraggstate->finalfn); } + + if (aggref->aggdistinct) + { + Oid inputType = exprType(aggref->target); + Operator eq_operator; + Form_pg_operator pgopform; + + peraggstate->inputType = inputType; + typeInfo = typeidType(inputType); + peraggstate->inputtypeLen = typeLen(typeInfo); + peraggstate->inputtypeByVal = typeByVal(typeInfo); + + eq_operator = oper("=", inputType, inputType, true); + if (!HeapTupleIsValid(eq_operator)) + { + elog(ERROR, "Unable to identify an equality operator for type '%s'", + typeidTypeName(inputType)); + } + pgopform = (Form_pg_operator) GETSTRUCT(eq_operator); + fmgr_info(pgopform->oprcode, &(peraggstate->equalfn)); + peraggstate->sortOperator = any_ordering_op(inputType); + peraggstate->sortstate = NULL; + } } return TRUE; @@ -690,3 +833,26 @@ ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent) ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node); } + + +/* + * Helper routine to make a copy of a Datum. + * + * NB: input had better not be a NULL; might cause null-pointer dereference. + */ +static Datum +copyDatum(Datum val, int typLen, bool typByVal) +{ + if (typByVal) + return val; + else + { + char *newVal; + + if (typLen == -1) /* variable length type? */ + typLen = VARSIZE((struct varlena *) DatumGetPointer(val)); + newVal = (char *) palloc(typLen); + memcpy(newVal, DatumGetPointer(val), typLen); + return PointerGetDatum(newVal); + } +} |