diff options
author | David Rowley <drowley@postgresql.org> | 2023-01-23 17:35:01 +1300 |
---|---|---|
committer | David Rowley <drowley@postgresql.org> | 2023-01-23 17:35:01 +1300 |
commit | 16fd03e956540d1b47b743f6a84f37c54ac93dd4 (patch) | |
tree | 5d4e04184fcc5e119b92d48529b60bc160f99633 /src/backend/utils/adt/varlena.c | |
parent | 5a3a95385bd5a8f1a4fd50545b7efe9338581899 (diff) | |
download | postgresql-16fd03e956540d1b47b743f6a84f37c54ac93dd4.tar.gz postgresql-16fd03e956540d1b47b743f6a84f37c54ac93dd4.zip |
Allow parallel aggregate on string_agg and array_agg
This adds combine, serial and deserial functions for the array_agg() and
string_agg() aggregate functions, thus allowing these aggregates to
partake in partial aggregations. This allows both parallel aggregation to
take place when these aggregates are present and also allows additional
partition-wise aggregation plan shapes to include plans that require
additional aggregation once the partially aggregated results from the
partitions have been combined.
Author: David Rowley
Reviewed-by: Andres Freund, Tomas Vondra, Stephen Frost, Tom Lane
Discussion: https://postgr.es/m/CAKJS1f9sx_6GTcvd6TMuZnNtCh0VhBzhX6FZqw17TgVFH-ga_A@mail.gmail.com
Diffstat (limited to 'src/backend/utils/adt/varlena.c')
-rw-r--r-- | src/backend/utils/adt/varlena.c | 207 |
1 files changed, 191 insertions, 16 deletions
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c index 33ffdb013a6..fd81c474745 100644 --- a/src/backend/utils/adt/varlena.c +++ b/src/backend/utils/adt/varlena.c @@ -506,29 +506,50 @@ bytea_string_agg_transfn(PG_FUNCTION_ARGS) state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); - /* Append the value unless null. */ + /* Append the value unless null, preceding it with the delimiter. */ if (!PG_ARGISNULL(1)) { bytea *value = PG_GETARG_BYTEA_PP(1); + bool isfirst = false; - /* On the first time through, we ignore the delimiter. */ + /* + * You might think we can just throw away the first delimiter, however + * we must keep it as we may be a parallel worker doing partial + * aggregation building a state to send to the main process. We need + * to keep the delimiter of every aggregation so that the combine + * function can properly join up the strings of two separately + * partially aggregated results. The first delimiter is only stripped + * off in the final function. To know how much to strip off the front + * of the string, we store the length of the first delimiter in the + * StringInfo's cursor field, which we don't otherwise need here. + */ if (state == NULL) + { state = makeStringAggState(fcinfo); - else if (!PG_ARGISNULL(2)) + isfirst = true; + } + + if (!PG_ARGISNULL(2)) { bytea *delim = PG_GETARG_BYTEA_PP(2); - appendBinaryStringInfo(state, VARDATA_ANY(delim), VARSIZE_ANY_EXHDR(delim)); + appendBinaryStringInfo(state, VARDATA_ANY(delim), + VARSIZE_ANY_EXHDR(delim)); + if (isfirst) + state->cursor = VARSIZE_ANY_EXHDR(delim); } - appendBinaryStringInfo(state, VARDATA_ANY(value), VARSIZE_ANY_EXHDR(value)); + appendBinaryStringInfo(state, VARDATA_ANY(value), + VARSIZE_ANY_EXHDR(value)); } /* * The transition type for string_agg() is declared to be "internal", * which is a pass-by-value type the same size as a pointer. */ - PG_RETURN_POINTER(state); + if (state) + PG_RETURN_POINTER(state); + PG_RETURN_NULL(); } Datum @@ -543,11 +564,13 @@ bytea_string_agg_finalfn(PG_FUNCTION_ARGS) if (state != NULL) { + /* As per comment in transfn, strip data before the cursor position */ bytea *result; + int strippedlen = state->len - state->cursor; - result = (bytea *) palloc(state->len + VARHDRSZ); - SET_VARSIZE(result, state->len + VARHDRSZ); - memcpy(VARDATA(result), state->data, state->len); + result = (bytea *) palloc(strippedlen + VARHDRSZ); + SET_VARSIZE(result, strippedlen + VARHDRSZ); + memcpy(VARDATA(result), &state->data[state->cursor], strippedlen); PG_RETURN_BYTEA_P(result); } else @@ -5372,23 +5395,171 @@ string_agg_transfn(PG_FUNCTION_ARGS) state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); - /* Append the value unless null. */ + /* Append the value unless null, preceding it with the delimiter. */ if (!PG_ARGISNULL(1)) { - /* On the first time through, we ignore the delimiter. */ + text *value = PG_GETARG_TEXT_PP(1); + bool isfirst = false; + + /* + * You might think we can just throw away the first delimiter, however + * we must keep it as we may be a parallel worker doing partial + * aggregation building a state to send to the main process. We need + * to keep the delimiter of every aggregation so that the combine + * function can properly join up the strings of two separately + * partially aggregated results. The first delimiter is only stripped + * off in the final function. To know how much to strip off the front + * of the string, we store the length of the first delimiter in the + * StringInfo's cursor field, which we don't otherwise need here. + */ if (state == NULL) + { state = makeStringAggState(fcinfo); - else if (!PG_ARGISNULL(2)) - appendStringInfoText(state, PG_GETARG_TEXT_PP(2)); /* delimiter */ + isfirst = true; + } - appendStringInfoText(state, PG_GETARG_TEXT_PP(1)); /* value */ + if (!PG_ARGISNULL(2)) + { + text *delim = PG_GETARG_TEXT_PP(2); + + appendStringInfoText(state, delim); + if (isfirst) + state->cursor = VARSIZE_ANY_EXHDR(delim); + } + + appendStringInfoText(state, value); } /* * The transition type for string_agg() is declared to be "internal", * which is a pass-by-value type the same size as a pointer. */ - PG_RETURN_POINTER(state); + if (state) + PG_RETURN_POINTER(state); + PG_RETURN_NULL(); +} + +/* + * string_agg_combine + * Aggregate combine function for string_agg(text) and string_agg(bytea) + */ +Datum +string_agg_combine(PG_FUNCTION_ARGS) +{ + StringInfo state1; + StringInfo state2; + MemoryContext agg_context; + + if (!AggCheckCallContext(fcinfo, &agg_context)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + state1 = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); + state2 = PG_ARGISNULL(1) ? NULL : (StringInfo) PG_GETARG_POINTER(1); + + if (state2 == NULL) + { + /* + * NULL state2 is easy, just return state1, which we know is already + * in the agg_context + */ + if (state1 == NULL) + PG_RETURN_NULL(); + PG_RETURN_POINTER(state1); + } + + if (state1 == NULL) + { + /* We must copy state2's data into the agg_context */ + MemoryContext old_context; + + old_context = MemoryContextSwitchTo(agg_context); + state1 = makeStringAggState(fcinfo); + appendBinaryStringInfo(state1, state2->data, state2->len); + state1->cursor = state2->cursor; + MemoryContextSwitchTo(old_context); + } + else if (state2->len > 0) + { + /* Combine ... state1->cursor does not change in this case */ + appendBinaryStringInfo(state1, state2->data, state2->len); + } + + PG_RETURN_POINTER(state1); +} + +/* + * string_agg_serialize + * Aggregate serialize function for string_agg(text) and string_agg(bytea) + * + * This is strict, so we need not handle NULL input + */ +Datum +string_agg_serialize(PG_FUNCTION_ARGS) +{ + StringInfo state; + StringInfoData buf; + bytea *result; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + state = (StringInfo) PG_GETARG_POINTER(0); + + pq_begintypsend(&buf); + + /* cursor */ + pq_sendint(&buf, state->cursor, 4); + + /* data */ + pq_sendbytes(&buf, state->data, state->len); + + result = pq_endtypsend(&buf); + + PG_RETURN_BYTEA_P(result); +} + +/* + * string_agg_deserialize + * Aggregate deserial function for string_agg(text) and string_agg(bytea) + * + * This is strict, so we need not handle NULL input + */ +Datum +string_agg_deserialize(PG_FUNCTION_ARGS) +{ + bytea *sstate; + StringInfo result; + StringInfoData buf; + char *data; + int datalen; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + sstate = PG_GETARG_BYTEA_PP(0); + + /* + * Copy the bytea into a StringInfo so that we can "receive" it using the + * standard recv-function infrastructure. + */ + initStringInfo(&buf); + appendBinaryStringInfo(&buf, + VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + + result = makeStringAggState(fcinfo); + + /* cursor */ + result->cursor = pq_getmsgint(&buf, 4); + + /* data */ + datalen = VARSIZE_ANY_EXHDR(sstate) - 4; + data = (char *) pq_getmsgbytes(&buf, datalen); + appendBinaryStringInfo(result, data, datalen); + + pq_getmsgend(&buf); + pfree(buf.data); + + PG_RETURN_POINTER(result); } Datum @@ -5402,7 +5573,11 @@ string_agg_finalfn(PG_FUNCTION_ARGS) state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); if (state != NULL) - PG_RETURN_TEXT_P(cstring_to_text_with_len(state->data, state->len)); + { + /* As per comment in transfn, strip data before the cursor position */ + PG_RETURN_TEXT_P(cstring_to_text_with_len(&state->data[state->cursor], + state->len - state->cursor)); + } else PG_RETURN_NULL(); } |