aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/aggregatecmds.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2014-04-12 11:58:53 -0400
committerTom Lane <tgl@sss.pgh.pa.us>2014-04-12 12:03:30 -0400
commita9d9acbf219b9e96585779cd5f99d674d4ccba74 (patch)
tree4bd26a78fa7f6f0bc558c611278e42a9f41d4875 /src/backend/commands/aggregatecmds.c
parent3c41b812c5578fd7bd5c2de42941012d7d56dde2 (diff)
downloadpostgresql-a9d9acbf219b9e96585779cd5f99d674d4ccba74.tar.gz
postgresql-a9d9acbf219b9e96585779cd5f99d674d4ccba74.zip
Create infrastructure for moving-aggregate optimization.
Until now, when executing an aggregate function as a window function within a window with moving frame start (that is, any frame start mode except UNBOUNDED PRECEDING), we had to recalculate the aggregate from scratch each time the frame head moved. This patch allows an aggregate definition to include an alternate "moving aggregate" implementation that includes an inverse transition function for removing rows from the aggregate's running state. As long as this can be done successfully, runtime is proportional to the total number of input rows, rather than to the number of input rows times the average frame length. This commit includes the core infrastructure, documentation, and regression tests using user-defined aggregates. Follow-on commits will update some of the built-in aggregates to use this feature. David Rowley and Florian Pflug, reviewed by Dean Rasheed; additional hacking by me
Diffstat (limited to 'src/backend/commands/aggregatecmds.c')
-rw-r--r--src/backend/commands/aggregatecmds.c101
1 files changed, 100 insertions, 1 deletions
diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c
index 640e19cf120..9714112f6d4 100644
--- a/src/backend/commands/aggregatecmds.c
+++ b/src/backend/commands/aggregatecmds.c
@@ -61,11 +61,17 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
char aggKind = AGGKIND_NORMAL;
List *transfuncName = NIL;
List *finalfuncName = NIL;
+ List *mtransfuncName = NIL;
+ List *minvtransfuncName = NIL;
+ List *mfinalfuncName = NIL;
List *sortoperatorName = NIL;
TypeName *baseType = NULL;
TypeName *transType = NULL;
+ TypeName *mtransType = NULL;
int32 transSpace = 0;
+ int32 mtransSpace = 0;
char *initval = NULL;
+ char *minitval = NULL;
int numArgs;
int numDirectArgs = 0;
oidvector *parameterTypes;
@@ -75,7 +81,9 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
List *parameterDefaults;
Oid variadicArgType;
Oid transTypeId;
+ Oid mtransTypeId = InvalidOid;
char transTypeType;
+ char mtransTypeType = 0;
ListCell *pl;
/* Convert list of names to a name and namespace */
@@ -114,6 +122,12 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
transfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "finalfunc") == 0)
finalfuncName = defGetQualifiedName(defel);
+ else if (pg_strcasecmp(defel->defname, "msfunc") == 0)
+ mtransfuncName = defGetQualifiedName(defel);
+ else if (pg_strcasecmp(defel->defname, "minvfunc") == 0)
+ minvtransfuncName = defGetQualifiedName(defel);
+ else if (pg_strcasecmp(defel->defname, "mfinalfunc") == 0)
+ mfinalfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "sortop") == 0)
sortoperatorName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "basetype") == 0)
@@ -135,10 +149,16 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
transType = defGetTypeName(defel);
else if (pg_strcasecmp(defel->defname, "sspace") == 0)
transSpace = defGetInt32(defel);
+ else if (pg_strcasecmp(defel->defname, "mstype") == 0)
+ mtransType = defGetTypeName(defel);
+ else if (pg_strcasecmp(defel->defname, "msspace") == 0)
+ mtransSpace = defGetInt32(defel);
else if (pg_strcasecmp(defel->defname, "initcond") == 0)
initval = defGetString(defel);
else if (pg_strcasecmp(defel->defname, "initcond1") == 0)
initval = defGetString(defel);
+ else if (pg_strcasecmp(defel->defname, "minitcond") == 0)
+ minitval = defGetString(defel);
else
ereport(WARNING,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -159,6 +179,46 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
errmsg("aggregate sfunc must be specified")));
/*
+ * if mtransType is given, mtransfuncName and minvtransfuncName must be as
+ * well; if not, then none of the moving-aggregate options should have
+ * been given.
+ */
+ if (mtransType != NULL)
+ {
+ if (mtransfuncName == NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate msfunc must be specified when mstype is specified")));
+ if (minvtransfuncName == NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate minvfunc must be specified when mstype is specified")));
+ }
+ else
+ {
+ if (mtransfuncName != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate msfunc must not be specified without mstype")));
+ if (minvtransfuncName != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate minvfunc must not be specified without mstype")));
+ if (mfinalfuncName != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate mfinalfunc must not be specified without mstype")));
+ if (mtransSpace != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate msspace must not be specified without mstype")));
+ if (minitval != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate minitcond must not be specified without mstype")));
+ }
+
+ /*
* look up the aggregate's input datatype(s).
*/
if (oldstyle)
@@ -251,6 +311,27 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
}
/*
+ * If a moving-aggregate transtype is specified, look that up. Same
+ * restrictions as for transtype.
+ */
+ if (mtransType)
+ {
+ mtransTypeId = typenameTypeId(NULL, mtransType);
+ mtransTypeType = get_typtype(mtransTypeId);
+ if (mtransTypeType == TYPTYPE_PSEUDO &&
+ !IsPolymorphicType(mtransTypeId))
+ {
+ if (mtransTypeId == INTERNALOID && superuser())
+ /* okay */ ;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate transition data type cannot be %s",
+ format_type_be(mtransTypeId))));
+ }
+ }
+
+ /*
* If we have an initval, and it's not for a pseudotype (particularly a
* polymorphic type), make sure it's acceptable to the type's input
* function. We will store the initval as text, because the input
@@ -269,6 +350,18 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
}
/*
+ * Likewise for moving-aggregate initval.
+ */
+ if (minitval && mtransTypeType != TYPTYPE_PSEUDO)
+ {
+ Oid typinput,
+ typioparam;
+
+ getTypeInputInfo(mtransTypeId, &typinput, &typioparam);
+ (void) OidInputFunctionCall(typinput, minitval, typioparam, -1);
+ }
+
+ /*
* Most of the argument-checking is done inside of AggregateCreate
*/
return AggregateCreate(aggName, /* aggregate name */
@@ -284,8 +377,14 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
variadicArgType,
transfuncName, /* step function name */
finalfuncName, /* final function name */
+ mtransfuncName, /* fwd trans function name */
+ minvtransfuncName, /* inv trans function name */
+ mfinalfuncName, /* final function name */
sortoperatorName, /* sort operator name */
transTypeId, /* transition data type */
transSpace, /* transition space */
- initval); /* initial condition */
+ mtransTypeId, /* transition data type */
+ mtransSpace, /* transition space */
+ initval, /* initial condition */
+ minitval); /* initial condition */
}