diff options
-rw-r--r-- | doc/src/sgml/ref/create_aggregate.sgml | 9 | ||||
-rw-r--r-- | src/backend/catalog/pg_aggregate.c | 5 | ||||
-rw-r--r-- | src/backend/commands/aggregatecmds.c | 21 | ||||
-rw-r--r-- | src/backend/commands/functioncmds.c | 5 | ||||
-rw-r--r-- | src/backend/optimizer/util/clauses.c | 7 | ||||
-rw-r--r-- | src/include/catalog/pg_aggregate.h | 3 | ||||
-rw-r--r-- | src/test/regress/expected/create_aggregate.out | 12 | ||||
-rw-r--r-- | src/test/regress/sql/create_aggregate.sql | 12 |
8 files changed, 63 insertions, 11 deletions
diff --git a/doc/src/sgml/ref/create_aggregate.sgml b/doc/src/sgml/ref/create_aggregate.sgml index 7a6f8a97fda..3df330393de 100644 --- a/doc/src/sgml/ref/create_aggregate.sgml +++ b/doc/src/sgml/ref/create_aggregate.sgml @@ -40,6 +40,7 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ <replacea [ , MFINALFUNC_EXTRA ] [ , MINITCOND = <replaceable class="PARAMETER">minitial_condition</replaceable> ] [ , SORTOP = <replaceable class="PARAMETER">sort_operator</replaceable> ] + [ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ] ) CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ [ <replaceable class="parameter">argmode</replaceable> ] [ <replaceable class="parameter">argname</replaceable> ] <replaceable class="parameter">arg_data_type</replaceable> [ , ... ] ] @@ -55,6 +56,8 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ [ <replac [ , SERIALTYPE = <replaceable class="PARAMETER">serialtype</replaceable> ] [ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ] [ , HYPOTHETICAL ] + [ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ] + ) <phrase>or the old syntax</phrase> @@ -684,6 +687,12 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1; Currently, ordered-set aggregates do not need to support moving-aggregate mode, since they cannot be used as window functions. </para> + + <para> + The meaning of <literal>PARALLEL SAFE</>, <literal>PARALLEL RESTRICTED</>, + and <literal>PARALLEL UNSAFE</> is the same as for + <xref linkend="sql-createfunction">. + </para> </refsect1> <refsect1> diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c index b420349835b..bcc941104f5 100644 --- a/src/backend/catalog/pg_aggregate.c +++ b/src/backend/catalog/pg_aggregate.c @@ -72,7 +72,8 @@ AggregateCreate(const char *aggName, Oid aggmTransType, int32 aggmTransSpace, const char *agginitval, - const char *aggminitval) + const char *aggminitval, + char proparallel) { Relation aggdesc; HeapTuple tup; @@ -622,7 +623,7 @@ AggregateCreate(const char *aggName, false, /* isStrict (not needed for agg) */ PROVOLATILE_IMMUTABLE, /* volatility (not * needed for agg) */ - PROPARALLEL_UNSAFE, + proparallel, parameterTypes, /* paramTypes */ allParameterTypes, /* allParamTypes */ parameterModes, /* parameterModes */ diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c index 3424f842b9c..5c4d576b866 100644 --- a/src/backend/commands/aggregatecmds.c +++ b/src/backend/commands/aggregatecmds.c @@ -78,6 +78,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, int32 mtransSpace = 0; char *initval = NULL; char *minitval = NULL; + char *parallel = NULL; int numArgs; int numDirectArgs = 0; oidvector *parameterTypes; @@ -91,6 +92,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, Oid mtransTypeId = InvalidOid; char transTypeType; char mtransTypeType = 0; + char proparallel = PROPARALLEL_UNSAFE; ListCell *pl; /* Convert list of names to a name and namespace */ @@ -178,6 +180,8 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, initval = defGetString(defel); else if (pg_strcasecmp(defel->defname, "minitcond") == 0) minitval = defGetString(defel); + else if (pg_strcasecmp(defel->defname, "parallel") == 0) + parallel = defGetString(defel); else ereport(WARNING, (errcode(ERRCODE_SYNTAX_ERROR), @@ -449,6 +453,20 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, (void) OidInputFunctionCall(typinput, minitval, typioparam, -1); } + if (parallel) + { + if (pg_strcasecmp(parallel, "safe") == 0) + proparallel = PROPARALLEL_SAFE; + else if (pg_strcasecmp(parallel, "restricted") == 0) + proparallel = PROPARALLEL_RESTRICTED; + else if (pg_strcasecmp(parallel, "unsafe") == 0) + proparallel = PROPARALLEL_UNSAFE; + else + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("parameter \"parallel\" must be SAFE, RESTRICTED, or UNSAFE"))); + } + /* * Most of the argument-checking is done inside of AggregateCreate */ @@ -480,5 +498,6 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, mtransTypeId, /* transition data type */ mtransSpace, /* transition space */ initval, /* initial condition */ - minitval); /* initial condition */ + minitval, /* initial condition */ + proparallel); /* parallel safe? */ } diff --git a/src/backend/commands/functioncmds.c b/src/backend/commands/functioncmds.c index a745d73c7a5..748c8f75d48 100644 --- a/src/backend/commands/functioncmds.c +++ b/src/backend/commands/functioncmds.c @@ -566,9 +566,8 @@ interpret_func_parallel(DefElem *defel) else { ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("parallel option \"%s\" not recognized", - str))); + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("parameter \"parallel\" must be SAFE, RESTRICTED, or UNSAFE"))); return PROPARALLEL_UNSAFE; /* keep compiler quiet */ } } diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index c615717dea3..5674a73dfe0 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -1419,6 +1419,13 @@ has_parallel_hazard_walker(Node *node, has_parallel_hazard_arg *context) if (parallel_too_dangerous(func_parallel(expr->funcid), context)) return true; } + else if (IsA(node, Aggref)) + { + Aggref *aggref = (Aggref *) node; + + if (parallel_too_dangerous(func_parallel(aggref->aggfnoid), context)) + return true; + } else if (IsA(node, OpExpr)) { OpExpr *expr = (OpExpr *) node; diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h index 101d073a04a..7d5015a1cf3 100644 --- a/src/include/catalog/pg_aggregate.h +++ b/src/include/catalog/pg_aggregate.h @@ -349,6 +349,7 @@ extern ObjectAddress AggregateCreate(const char *aggName, Oid aggmTransType, int32 aggmTransSpace, const char *agginitval, - const char *aggminitval); + const char *aggminitval, + char proparallel); #endif /* PG_AGGREGATE_H */ diff --git a/src/test/regress/expected/create_aggregate.out b/src/test/regress/expected/create_aggregate.out index dac26982bca..1aba0c62669 100644 --- a/src/test/regress/expected/create_aggregate.out +++ b/src/test/regress/expected/create_aggregate.out @@ -20,9 +20,9 @@ CREATE AGGREGATE newsum ( -- zero-argument aggregate CREATE AGGREGATE newcnt (*) ( sfunc = int8inc, stype = int8, - initcond = '0' + initcond = '0', parallel = safe ); --- old-style spelling of same +-- old-style spelling of same (except without parallel-safe; that's too new) CREATE AGGREGATE oldcnt ( sfunc = int8inc, basetype = 'ANY', stype = int8, initcond = '0' @@ -188,6 +188,14 @@ WHERE aggfnoid = 'myavg'::REGPROC; (1 row) DROP AGGREGATE myavg (numeric); +-- invalid: bad parallel-safety marking +CREATE AGGREGATE mysum (int) +( + stype = int, + sfunc = int4pl, + parallel = pear +); +ERROR: parameter "parallel" must be SAFE, RESTRICTED, or UNSAFE -- invalid: nonstrict inverse with strict forward function CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS $$ SELECT $1 - $2; $$ diff --git a/src/test/regress/sql/create_aggregate.sql b/src/test/regress/sql/create_aggregate.sql index a7da31e5943..c98c154a829 100644 --- a/src/test/regress/sql/create_aggregate.sql +++ b/src/test/regress/sql/create_aggregate.sql @@ -23,10 +23,10 @@ CREATE AGGREGATE newsum ( -- zero-argument aggregate CREATE AGGREGATE newcnt (*) ( sfunc = int8inc, stype = int8, - initcond = '0' + initcond = '0', parallel = safe ); --- old-style spelling of same +-- old-style spelling of same (except without parallel-safe; that's too new) CREATE AGGREGATE oldcnt ( sfunc = int8inc, basetype = 'ANY', stype = int8, initcond = '0' @@ -201,6 +201,14 @@ WHERE aggfnoid = 'myavg'::REGPROC; DROP AGGREGATE myavg (numeric); +-- invalid: bad parallel-safety marking +CREATE AGGREGATE mysum (int) +( + stype = int, + sfunc = int4pl, + parallel = pear +); + -- invalid: nonstrict inverse with strict forward function CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS |