aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils/adt/varlena.c
diff options
context:
space:
mode:
author David Rowley <drowley@postgresql.org> 2023-01-23 17:35:01 +1300
committer David Rowley <drowley@postgresql.org> 2023-01-23 17:35:01 +1300
commit 16fd03e956540d1b47b743f6a84f37c54ac93dd4 (patch)
tree 5d4e04184fcc5e119b92d48529b60bc160f99633 /src/backend/utils/adt/varlena.c
parent 5a3a95385bd5a8f1a4fd50545b7efe9338581899 (diff)
downloadpostgresql-16fd03e956540d1b47b743f6a84f37c54ac93dd4.tar.gz
postgresql-16fd03e956540d1b47b743f6a84f37c54ac93dd4.zip
Allow parallel aggregate on string_agg and array_agg
This adds combine, serialization and deserialization functions for the array_agg() and string_agg() aggregate functions, thus allowing these aggregates to take part in partial aggregation. This both allows parallel aggregation to take place when these aggregates are present and allows additional partition-wise aggregation plan shapes, including plans that require further aggregation once the partially aggregated results from the partitions have been combined.

Author: David Rowley
Reviewed-by: Andres Freund, Tomas Vondra, Stephen Frost, Tom Lane
Discussion: https://postgr.es/m/CAKJS1f9sx_6GTcvd6TMuZnNtCh0VhBzhX6FZqw17TgVFH-ga_A@mail.gmail.com
Diffstat (limited to 'src/backend/utils/adt/varlena.c')
-rw-r--r--src/backend/utils/adt/varlena.c207
1 files changed, 191 insertions, 16 deletions
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index 33ffdb013a6..fd81c474745 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -506,29 +506,50 @@ bytea_string_agg_transfn(PG_FUNCTION_ARGS)
state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
- /* Append the value unless null. */
+ /* Append the value unless null, preceding it with the delimiter. */
if (!PG_ARGISNULL(1))
{
bytea *value = PG_GETARG_BYTEA_PP(1);
+ bool isfirst = false;
- /* On the first time through, we ignore the delimiter. */
+ /*
+ * You might think we can just throw away the first delimiter, however
+ * we must keep it as we may be a parallel worker doing partial
+ * aggregation building a state to send to the main process. We need
+ * to keep the delimiter of every aggregation so that the combine
+ * function can properly join up the strings of two separately
+ * partially aggregated results. The first delimiter is only stripped
+ * off in the final function. To know how much to strip off the front
+ * of the string, we store the length of the first delimiter in the
+ * StringInfo's cursor field, which we don't otherwise need here.
+ */
if (state == NULL)
+ {
state = makeStringAggState(fcinfo);
- else if (!PG_ARGISNULL(2))
+ isfirst = true;
+ }
+
+ if (!PG_ARGISNULL(2))
{
bytea *delim = PG_GETARG_BYTEA_PP(2);
- appendBinaryStringInfo(state, VARDATA_ANY(delim), VARSIZE_ANY_EXHDR(delim));
+ appendBinaryStringInfo(state, VARDATA_ANY(delim),
+ VARSIZE_ANY_EXHDR(delim));
+ if (isfirst)
+ state->cursor = VARSIZE_ANY_EXHDR(delim);
}
- appendBinaryStringInfo(state, VARDATA_ANY(value), VARSIZE_ANY_EXHDR(value));
+ appendBinaryStringInfo(state, VARDATA_ANY(value),
+ VARSIZE_ANY_EXHDR(value));
}
/*
* The transition type for string_agg() is declared to be "internal",
* which is a pass-by-value type the same size as a pointer.
*/
- PG_RETURN_POINTER(state);
+ if (state)
+ PG_RETURN_POINTER(state);
+ PG_RETURN_NULL();
}
Datum
@@ -543,11 +564,13 @@ bytea_string_agg_finalfn(PG_FUNCTION_ARGS)
if (state != NULL)
{
+ /* As per comment in transfn, strip data before the cursor position */
bytea *result;
+ int strippedlen = state->len - state->cursor;
- result = (bytea *) palloc(state->len + VARHDRSZ);
- SET_VARSIZE(result, state->len + VARHDRSZ);
- memcpy(VARDATA(result), state->data, state->len);
+ result = (bytea *) palloc(strippedlen + VARHDRSZ);
+ SET_VARSIZE(result, strippedlen + VARHDRSZ);
+ memcpy(VARDATA(result), &state->data[state->cursor], strippedlen);
PG_RETURN_BYTEA_P(result);
}
else
@@ -5372,23 +5395,171 @@ string_agg_transfn(PG_FUNCTION_ARGS)
state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
- /* Append the value unless null. */
+ /* Append the value unless null, preceding it with the delimiter. */
if (!PG_ARGISNULL(1))
{
- /* On the first time through, we ignore the delimiter. */
+ text *value = PG_GETARG_TEXT_PP(1);
+ bool isfirst = false;
+
+ /*
+ * You might think we can just throw away the first delimiter, however
+ * we must keep it as we may be a parallel worker doing partial
+ * aggregation building a state to send to the main process. We need
+ * to keep the delimiter of every aggregation so that the combine
+ * function can properly join up the strings of two separately
+ * partially aggregated results. The first delimiter is only stripped
+ * off in the final function. To know how much to strip off the front
+ * of the string, we store the length of the first delimiter in the
+ * StringInfo's cursor field, which we don't otherwise need here.
+ */
if (state == NULL)
+ {
state = makeStringAggState(fcinfo);
- else if (!PG_ARGISNULL(2))
- appendStringInfoText(state, PG_GETARG_TEXT_PP(2)); /* delimiter */
+ isfirst = true;
+ }
- appendStringInfoText(state, PG_GETARG_TEXT_PP(1)); /* value */
+ if (!PG_ARGISNULL(2))
+ {
+ text *delim = PG_GETARG_TEXT_PP(2);
+
+ appendStringInfoText(state, delim);
+ if (isfirst)
+ state->cursor = VARSIZE_ANY_EXHDR(delim);
+ }
+
+ appendStringInfoText(state, value);
}
/*
* The transition type for string_agg() is declared to be "internal",
* which is a pass-by-value type the same size as a pointer.
*/
- PG_RETURN_POINTER(state);
+ if (state)
+ PG_RETURN_POINTER(state);
+ PG_RETURN_NULL();
+}
+
+/*
+ * string_agg_combine
+ * Aggregate combine function for string_agg(text) and string_agg(bytea)
+ */
+Datum
+string_agg_combine(PG_FUNCTION_ARGS)
+{
+ StringInfo state1;
+ StringInfo state2;
+ MemoryContext agg_context;
+
+ if (!AggCheckCallContext(fcinfo, &agg_context))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state1 = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
+ state2 = PG_ARGISNULL(1) ? NULL : (StringInfo) PG_GETARG_POINTER(1);
+
+ if (state2 == NULL)
+ {
+ /*
+ * NULL state2 is easy, just return state1, which we know is already
+ * in the agg_context
+ */
+ if (state1 == NULL)
+ PG_RETURN_NULL();
+ PG_RETURN_POINTER(state1);
+ }
+
+ if (state1 == NULL)
+ {
+ /* We must copy state2's data into the agg_context */
+ MemoryContext old_context;
+
+ old_context = MemoryContextSwitchTo(agg_context);
+ state1 = makeStringAggState(fcinfo);
+ appendBinaryStringInfo(state1, state2->data, state2->len);
+ state1->cursor = state2->cursor;
+ MemoryContextSwitchTo(old_context);
+ }
+ else if (state2->len > 0)
+ {
+ /* Combine ... state1->cursor does not change in this case */
+ appendBinaryStringInfo(state1, state2->data, state2->len);
+ }
+
+ PG_RETURN_POINTER(state1);
+}
+
+/*
+ * string_agg_serialize
+ * Aggregate serialize function for string_agg(text) and string_agg(bytea)
+ *
+ * This is strict, so we need not handle NULL input
+ */
+Datum
+string_agg_serialize(PG_FUNCTION_ARGS)
+{
+ StringInfo state;
+ StringInfoData buf;
+ bytea *result;
+
+ /* cannot be called directly because of internal-type argument */
+ Assert(AggCheckCallContext(fcinfo, NULL));
+
+ state = (StringInfo) PG_GETARG_POINTER(0);
+
+ pq_begintypsend(&buf);
+
+ /* cursor */
+ pq_sendint(&buf, state->cursor, 4);
+
+ /* data */
+ pq_sendbytes(&buf, state->data, state->len);
+
+ result = pq_endtypsend(&buf);
+
+ PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * string_agg_deserialize
+ * Aggregate deserial function for string_agg(text) and string_agg(bytea)
+ *
+ * This is strict, so we need not handle NULL input
+ */
+Datum
+string_agg_deserialize(PG_FUNCTION_ARGS)
+{
+ bytea *sstate;
+ StringInfo result;
+ StringInfoData buf;
+ char *data;
+ int datalen;
+
+ /* cannot be called directly because of internal-type argument */
+ Assert(AggCheckCallContext(fcinfo, NULL));
+
+ sstate = PG_GETARG_BYTEA_PP(0);
+
+ /*
+ * Copy the bytea into a StringInfo so that we can "receive" it using the
+ * standard recv-function infrastructure.
+ */
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf,
+ VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+ result = makeStringAggState(fcinfo);
+
+ /* cursor */
+ result->cursor = pq_getmsgint(&buf, 4);
+
+ /* data */
+ datalen = VARSIZE_ANY_EXHDR(sstate) - 4;
+ data = (char *) pq_getmsgbytes(&buf, datalen);
+ appendBinaryStringInfo(result, data, datalen);
+
+ pq_getmsgend(&buf);
+ pfree(buf.data);
+
+ PG_RETURN_POINTER(result);
}
Datum
@@ -5402,7 +5573,11 @@ string_agg_finalfn(PG_FUNCTION_ARGS)
state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
if (state != NULL)
- PG_RETURN_TEXT_P(cstring_to_text_with_len(state->data, state->len));
+ {
+ /* As per comment in transfn, strip data before the cursor position */
+ PG_RETURN_TEXT_P(cstring_to_text_with_len(&state->data[state->cursor],
+ state->len - state->cursor));
+ }
else
PG_RETURN_NULL();
}