diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2005-11-17 22:14:56 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2005-11-17 22:14:56 +0000 |
commit | cecb6075594a407b7adcd9c9a0c243ca4b43c9a3 (patch) | |
tree | d3febb775476b082255aa6122b0ba80a8ba79b37 /src/backend/utils/adt/arrayfuncs.c | |
parent | c859308aba7edef428994e6de90ff35f35a328c5 (diff) | |
download | postgresql-cecb6075594a407b7adcd9c9a0c243ca4b43c9a3.tar.gz postgresql-cecb6075594a407b7adcd9c9a0c243ca4b43c9a3.zip |
Make SQL arrays support null elements. This commit fixes the core array
functionality, but I still need to make another pass looking at places
that incidentally use arrays (such as ACL manipulation) to make sure they
are null-safe. Contrib needs work too.
I have not changed the behaviors that are still under discussion about
array comparison and what to do with lower bounds.
Diffstat (limited to 'src/backend/utils/adt/arrayfuncs.c')
-rw-r--r-- | src/backend/utils/adt/arrayfuncs.c | 1932 |
1 files changed, 1363 insertions, 569 deletions
diff --git a/src/backend/utils/adt/arrayfuncs.c b/src/backend/utils/adt/arrayfuncs.c index 5304d47fa8a..3818b181904 100644 --- a/src/backend/utils/adt/arrayfuncs.c +++ b/src/backend/utils/adt/arrayfuncs.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/adt/arrayfuncs.c,v 1.123 2005/10/15 02:49:27 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/utils/adt/arrayfuncs.c,v 1.124 2005/11/17 22:14:52 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -31,84 +31,73 @@ #include "utils/typcache.h" -/*---------- - * A standard varlena array has the following internal structure: - * <size> - total number of bytes (also, TOAST info flags) - * <ndim> - number of dimensions of the array - * <flags> - bit mask of flags - * <elemtype> - element type OID - * <dim> - size of each array axis (C array of int) - * <dim_lower> - lower boundary of each dimension (C array of int) - * <actual data> - whatever is the stored data - * The actual data starts on a MAXALIGN boundary. Individual items in the - * array are aligned as specified by the array element type. - * - * NOTE: it is important that array elements of toastable datatypes NOT be - * toasted, since the tupletoaster won't know they are there. (We could - * support compressed toasted items; only out-of-line items are dangerous. - * However, it seems preferable to store such items uncompressed and allow - * the toaster to compress the whole array as one input.) - * - * There is currently no support for NULL elements in arrays, either. - * A reasonable (and backwards-compatible) way to add support would be to - * add a nulls bitmap following the <dim_lower> array, which would be present - * if needed; and its presence would be signaled by a bit in the flags word. - * - * - * There are also some "fixed-length array" datatypes, such as NAME and - * POINT. These are simply a sequence of a fixed number of items each - * of a fixed-length datatype, with no overhead; the item size must be - * a multiple of its alignment requirement, because we do no padding. - * We support subscripting on these types, but array_in() and array_out() - * only work with varlena arrays. - *---------- +/* + * GUC parameter */ +bool Array_nulls = true; - -/* ---------- +/* * Local definitions - * ---------- */ #define ASSGN "=" -#define RETURN_NULL(type) do { *isNull = true; return (type) 0; } while (0) +typedef enum +{ + ARRAY_NO_LEVEL, + ARRAY_LEVEL_STARTED, + ARRAY_ELEM_STARTED, + ARRAY_ELEM_COMPLETED, + ARRAY_QUOTED_ELEM_STARTED, + ARRAY_QUOTED_ELEM_COMPLETED, + ARRAY_ELEM_DELIMITED, + ARRAY_LEVEL_COMPLETED, + ARRAY_LEVEL_DELIMITED +} ArrayParseState; -static int ArrayCount(char *str, int *dim, char typdelim); -static Datum *ReadArrayStr(char *arrayStr, const char *origStr, +static int ArrayCount(const char *str, int *dim, char typdelim); +static void ReadArrayStr(char *arrayStr, const char *origStr, int nitems, int ndim, int *dim, FmgrInfo *inputproc, Oid typioparam, int32 typmod, char typdelim, int typlen, bool typbyval, char typalign, - int *nbytes); -static Datum *ReadArrayBinary(StringInfo buf, int nitems, + Datum *values, bool *nulls, + bool *hasnulls, int32 *nbytes); +static void ReadArrayBinary(StringInfo buf, int nitems, FmgrInfo *receiveproc, Oid typioparam, int32 typmod, int typlen, bool typbyval, char typalign, - int *nbytes); -static void CopyArrayEls(char *p, Datum *values, int nitems, - int typlen, bool typbyval, char typalign, - bool freedata); + Datum *values, bool *nulls, + bool *hasnulls, int32 *nbytes); +static void CopyArrayEls(ArrayType *array, + Datum *values, bool *nulls, int nitems, + int typlen, bool typbyval, char typalign, + bool freedata); +static bool array_get_isnull(const bits8 *nullbitmap, int offset); +static void array_set_isnull(bits8 *nullbitmap, int offset, bool isNull); static Datum ArrayCast(char *value, bool byval, int len); static int ArrayCastAndSet(Datum src, int typlen, bool typbyval, char typalign, char *dest); -static int array_nelems_size(char *ptr, int nitems, - int typlen, bool typbyval, char typalign); -static char *array_seek(char *ptr, int nitems, - int typlen, bool typbyval, char typalign); -static int array_copy(char *destptr, int nitems, char *srcptr, - int typlen, bool typbyval, char typalign); -static int array_slice_size(int ndim, int *dim, int *lb, char *arraydataptr, - int *st, int *endp, - int typlen, bool typbyval, char typalign); -static void array_extract_slice(int ndim, int *dim, int *lb, - char *arraydataptr, - int *st, int *endp, char *destPtr, - int typlen, bool typbyval, char typalign); -static void array_insert_slice(int ndim, int *dim, int *lb, - char *origPtr, int origdatasize, - char *destPtr, - int *st, int *endp, char *srcPtr, - int typlen, bool typbyval, char typalign); +static char *array_seek(char *ptr, int offset, bits8 *nullbitmap, int nitems, + int typlen, bool typbyval, char typalign); +static int array_nelems_size(char *ptr, int offset, bits8 *nullbitmap, + int nitems, int typlen, bool typbyval, char typalign); +static int array_copy(char *destptr, int nitems, + char *srcptr, int offset, bits8 *nullbitmap, + int typlen, bool typbyval, char typalign); +static int array_slice_size(char *arraydataptr, bits8 *arraynullsptr, + int ndim, int *dim, int *lb, + int *st, int *endp, + int typlen, bool typbyval, char typalign); +static void array_extract_slice(ArrayType *newarray, + int ndim, int *dim, int *lb, + char *arraydataptr, bits8 *arraynullsptr, + int *st, int *endp, + int typlen, bool typbyval, char typalign); +static void array_insert_slice(ArrayType *destArray, ArrayType *origArray, + ArrayType *srcArray, + int ndim, int *dim, int *lb, + int *st, int *endp, + int typlen, bool typbyval, char typalign); static int array_cmp(FunctionCallInfo fcinfo); static Datum array_type_length_coerce_internal(ArrayType *src, int32 desttypmod, @@ -116,13 +105,13 @@ static Datum array_type_length_coerce_internal(ArrayType *src, FmgrInfo *fmgr_info); -/*--------------------------------------------------------------------- +/* * array_in : * converts an array from the external format in "string" to * its internal format. + * * return value : * the internal representation of the input array - *-------------------------------------------------------------------- */ Datum array_in(PG_FUNCTION_ARGS) @@ -140,8 +129,11 @@ array_in(PG_FUNCTION_ARGS) *p; int i, nitems; - int32 nbytes; Datum *dataPtr; + bool *nullsPtr; + bool hasnulls; + int32 nbytes; + int32 dataoffset; ArrayType *retval; int ndim, dim[MAXDIM], @@ -189,8 +181,8 @@ array_in(PG_FUNCTION_ARGS) * Otherwise, we require the input to be in curly-brace style, and we * prescan the input to determine dimensions. * - * Dimension info takes the form of one or more [n] or [m:n] items. The outer - * loop iterates once per dimension item. + * Dimension info takes the form of one or more [n] or [m:n] items. + * The outer loop iterates once per dimension item. */ p = string_save; ndim = 0; @@ -310,60 +302,60 @@ array_in(PG_FUNCTION_ARGS) printf(") for %s\n", string); #endif + /* This checks for overflow of the array dimensions */ nitems = ArrayGetNItems(ndim, dim); + /* Empty array? */ if (nitems == 0) + PG_RETURN_ARRAYTYPE_P(construct_empty_array(element_type)); + + dataPtr = (Datum *) palloc(nitems * sizeof(Datum)); + nullsPtr = (bool *) palloc(nitems * sizeof(bool)); + ReadArrayStr(p, string, + nitems, ndim, dim, + &my_extra->proc, typioparam, typmod, + typdelim, + typlen, typbyval, typalign, + dataPtr, nullsPtr, + &hasnulls, &nbytes); + if (hasnulls) { - /* Return empty array */ - retval = (ArrayType *) palloc0(sizeof(ArrayType)); - retval->size = sizeof(ArrayType); - retval->elemtype = element_type; - PG_RETURN_ARRAYTYPE_P(retval); + dataoffset = ARR_OVERHEAD_WITHNULLS(ndim, nitems); + nbytes += dataoffset; } - - if (*p != '{') - ereport(ERROR, - (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), - errmsg("missing left brace"))); - dataPtr = ReadArrayStr(p, string, - nitems, ndim, dim, &my_extra->proc, typioparam, - typmod, typdelim, typlen, typbyval, typalign, - &nbytes); - nbytes += ARR_OVERHEAD(ndim); - retval = (ArrayType *) palloc0(nbytes); + else + { + dataoffset = 0; /* marker for no null bitmap */ + nbytes += ARR_OVERHEAD_NONULLS(ndim); + } + retval = (ArrayType *) palloc(nbytes); retval->size = nbytes; retval->ndim = ndim; + retval->dataoffset = dataoffset; retval->elemtype = element_type; memcpy(ARR_DIMS(retval), dim, ndim * sizeof(int)); memcpy(ARR_LBOUND(retval), lBound, ndim * sizeof(int)); - CopyArrayEls(ARR_DATA_PTR(retval), dataPtr, nitems, - typlen, typbyval, typalign, true); + CopyArrayEls(retval, + dataPtr, nullsPtr, nitems, + typlen, typbyval, typalign, + true); + pfree(dataPtr); + pfree(nullsPtr); pfree(string_save); + PG_RETURN_ARRAYTYPE_P(retval); } -/*----------------------------------------------------------------------------- +/* * ArrayCount - * Counts the number of dimensions and the *dim array for an array string. - * The syntax for array input is C-like nested curly braces - *----------------------------------------------------------------------------- + * Determines the dimensions for an array string. + * + * Returns number of dimensions as function result. The axis lengths are + * returned in dim[], which must be of size MAXDIM. */ -typedef enum -{ - ARRAY_NO_LEVEL, - ARRAY_LEVEL_STARTED, - ARRAY_ELEM_STARTED, - ARRAY_ELEM_COMPLETED, - ARRAY_QUOTED_ELEM_STARTED, - ARRAY_QUOTED_ELEM_COMPLETED, - ARRAY_ELEM_DELIMITED, - ARRAY_LEVEL_COMPLETED, - ARRAY_LEVEL_DELIMITED -} ArrayParseState; - static int -ArrayCount(char *str, int *dim, char typdelim) +ArrayCount(const char *str, int *dim, char typdelim) { int nest_level = 0, i; @@ -374,7 +366,7 @@ ArrayCount(char *str, int *dim, char typdelim) bool in_quotes = false; bool eoArray = false; bool empty_array = true; - char *ptr; + const char *ptr; ArrayParseState parse_state = ARRAY_NO_LEVEL; for (i = 0; i < MAXDIM; ++i) @@ -383,10 +375,6 @@ ArrayCount(char *str, int *dim, char typdelim) nelems_last[i] = nelems[i] = 1; } - /* special case for an empty array */ - if (strcmp(str, "{}") == 0) - return 0; - ptr = str; while (!eoArray) { @@ -588,24 +576,35 @@ ArrayCount(char *str, int *dim, char typdelim) return ndim; } -/*--------------------------------------------------------------------------- +/* * ReadArrayStr : - * parses the array string pointed by "arrayStr" and converts it to - * internal format. The external format expected is like C array - * declaration. Unspecified elements are initialized to zero for fixed length - * base types and to empty varlena structures for variable length base - * types. (This is pretty bogus; NULL would be much safer.) + * parses the array string pointed to by "arrayStr" and converts the values + * to internal format. Unspecified elements are initialized to nulls. + * The array dimensions must already have been determined. * - * result : - * returns a palloc'd array of Datum representations of the array elements. - * If element type is pass-by-ref, the Datums point to palloc'd values. - * *nbytes is set to the amount of data space needed for the array, - * including alignment padding but not including array header overhead. + * Inputs: + * arrayStr: the string to parse. + * CAUTION: the contents of "arrayStr" will be modified! + * origStr: the unmodified input string, used only in error messages. + * nitems: total number of array elements, as already determined. + * ndim: number of array dimensions + * dim[]: array axis lengths + * inputproc: type-specific input procedure for element datatype. + * typioparam, typmod: auxiliary values to pass to inputproc. + * typdelim: the value delimiter (type-specific). + * typlen, typbyval, typalign: storage parameters of element datatype. * - * CAUTION: the contents of "arrayStr" will be modified! - *--------------------------------------------------------------------------- + * Outputs: + * values[]: filled with converted data values. + * nulls[]: filled with is-null markers. + * *hasnulls: set TRUE iff there are any null elements. + * *nbytes: set to total size of data area needed (including alignment + * padding but not including array header overhead). + * + * Note that values[] and nulls[] are allocated by the caller, and must have + * nitems elements. */ -static Datum * +static void ReadArrayStr(char *arrayStr, const char *origStr, int nitems, @@ -618,31 +617,36 @@ ReadArrayStr(char *arrayStr, int typlen, bool typbyval, char typalign, - int *nbytes) + Datum *values, + bool *nulls, + bool *hasnulls, + int32 *nbytes) { int i, nest_level = 0; - Datum *values; char *srcptr; bool in_quotes = false; bool eoArray = false; - int totbytes; + bool hasnull; + int32 totbytes; int indx[MAXDIM], prod[MAXDIM]; mda_get_prod(ndim, dim, prod); - values = (Datum *) palloc0(nitems * sizeof(Datum)); MemSet(indx, 0, sizeof(indx)); + /* Initialize is-null markers to true */ + memset(nulls, true, nitems * sizeof(bool)); + /* * We have to remove " and \ characters to create a clean item value to * pass to the datatype input routine. We overwrite each item value * in-place within arrayStr to do this. srcptr is the current scan point, * and dstptr is where we are copying to. * - * We also want to suppress leading and trailing unquoted whitespace. We use - * the leadingspace flag to suppress leading space. Trailing space is - * tracked by using dstendptr to point to the last significant output + * We also want to suppress leading and trailing unquoted whitespace. + * We use the leadingspace flag to suppress leading space. Trailing space + * is tracked by using dstendptr to point to the last significant output * character. * * The error checking in this routine is mostly pro-forma, since we expect @@ -653,6 +657,7 @@ ReadArrayStr(char *arrayStr, { bool itemdone = false; bool leadingspace = true; + bool hasquoting = false; char *itemstart; char *dstptr; char *dstendptr; @@ -683,6 +688,7 @@ ReadArrayStr(char *arrayStr, /* Treat the escaped character as non-whitespace */ leadingspace = false; dstendptr = dstptr; + hasquoting = true; /* can't be a NULL marker */ break; case '\"': in_quotes = !in_quotes; @@ -697,6 +703,7 @@ ReadArrayStr(char *arrayStr, */ dstendptr = dstptr; } + hasquoting = true; /* can't be a NULL marker */ srcptr++; break; case '{': @@ -776,66 +783,57 @@ ReadArrayStr(char *arrayStr, errmsg("malformed array literal: \"%s\"", origStr))); - values[i] = FunctionCall3(inputproc, - CStringGetDatum(itemstart), - ObjectIdGetDatum(typioparam), - Int32GetDatum(typmod)); + if (Array_nulls && !hasquoting && + pg_strcasecmp(itemstart, "NULL") == 0) + { + /* it's a NULL item */ + nulls[i] = true; + } + else + { + values[i] = FunctionCall3(inputproc, + CStringGetDatum(itemstart), + ObjectIdGetDatum(typioparam), + Int32GetDatum(typmod)); + nulls[i] = false; + } } /* - * Initialize any unset items and compute total data space needed + * Check for nulls, compute total data space needed */ - if (typlen > 0) - { - totbytes = nitems * att_align(typlen, typalign); - if (!typbyval) - for (i = 0; i < nitems; i++) - if (values[i] == (Datum) 0) - values[i] = PointerGetDatum(palloc0(typlen)); - } - else + hasnull = false; + totbytes = 0; + for (i = 0; i < nitems; i++) { - Assert(!typbyval); - totbytes = 0; - for (i = 0; i < nitems; i++) + if (nulls[i]) + hasnull = true; + else { - if (values[i] != (Datum) 0) - { - /* let's just make sure data is not toasted */ - if (typlen == -1) - values[i] = PointerGetDatum(PG_DETOAST_DATUM(values[i])); - totbytes = att_addlength(totbytes, typlen, values[i]); - totbytes = att_align(totbytes, typalign); - } - else if (typlen == -1) - { - /* dummy varlena value (XXX bogus, see notes above) */ - values[i] = PointerGetDatum(palloc(sizeof(int32))); - VARATT_SIZEP(DatumGetPointer(values[i])) = sizeof(int32); - totbytes += sizeof(int32); - totbytes = att_align(totbytes, typalign); - } - else - { - /* dummy cstring value */ - Assert(typlen == -2); - values[i] = PointerGetDatum(palloc(1)); - *((char *) DatumGetPointer(values[i])) = '\0'; - totbytes += 1; - totbytes = att_align(totbytes, typalign); - } + /* let's just make sure data is not toasted */ + if (typlen == -1) + values[i] = PointerGetDatum(PG_DETOAST_DATUM(values[i])); + totbytes = att_addlength(totbytes, typlen, values[i]); + totbytes = att_align(totbytes, typalign); + /* check for overflow of total request */ + if (!AllocSizeIsValid(totbytes)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("array size exceeds the maximum allowed (%d)", + (int) MaxAllocSize))); } } + *hasnulls = hasnull; *nbytes = totbytes; - return values; } -/*---------- +/* * Copy data into an array object from a temporary array of Datums. * - * p: pointer to start of array data area + * array: array object (with header fields already filled in) * values: array of Datums to be copied + * nulls: array of is-null flags (can be NULL if no nulls) * nitems: number of Datums to be copied * typbyval, typlen, typalign: info about element datatype * freedata: if TRUE and element type is pass-by-ref, pfree data values @@ -844,17 +842,21 @@ ReadArrayStr(char *arrayStr, * If the input data is of varlena type, the caller must have ensured that * the values are not toasted. (Doing it here doesn't work since the * caller has already allocated space for the array...) - *---------- */ static void -CopyArrayEls(char *p, +CopyArrayEls(ArrayType *array, Datum *values, + bool *nulls, int nitems, int typlen, bool typbyval, char typalign, bool freedata) { + char *p = ARR_DATA_PTR(array); + bits8 *bitmap = ARR_NULLBITMAP(array); + int bitval = 0; + int bitmask = 1; int i; if (typbyval) @@ -862,23 +864,45 @@ CopyArrayEls(char *p, for (i = 0; i < nitems; i++) { - p += ArrayCastAndSet(values[i], typlen, typbyval, typalign, p); - if (freedata) - pfree(DatumGetPointer(values[i])); + if (nulls && nulls[i]) + { + if (!bitmap) /* shouldn't happen */ + elog(ERROR, "null array element where not supported"); + /* bitmap bit stays 0 */ + } + else + { + bitval |= bitmask; + p += ArrayCastAndSet(values[i], typlen, typbyval, typalign, p); + if (freedata) + pfree(DatumGetPointer(values[i])); + } + if (bitmap) + { + bitmask <<= 1; + if (bitmask == 0x100) + { + *bitmap++ = bitval; + bitval = 0; + bitmask = 1; + } + } } + + if (bitmap && bitmask != 1) + *bitmap = bitval; } -/*------------------------------------------------------------------------- +/* * array_out : * takes the internal representation of an array and returns a string * containing the array in its external format. - *------------------------------------------------------------------------- */ Datum array_out(PG_FUNCTION_ARGS) { ArrayType *v = PG_GETARG_ARRAYTYPE_P(0); - Oid element_type; + Oid element_type = ARR_ELEMTYPE(v); int typlen; bool typbyval; char typalign; @@ -887,13 +911,14 @@ array_out(PG_FUNCTION_ARGS) *tmp, *retval, **values, - + dims_str[(MAXDIM * 33) + 2]; /* * 33 per dim since we assume 15 digits per number + ':' +'[]' * * +2 allows for assignment operator + trailing null */ - dims_str[(MAXDIM * 33) + 2]; + bits8 *bitmap; + int bitmask; bool *needquotes, needdims = false; int nitems, @@ -907,8 +932,6 @@ array_out(PG_FUNCTION_ARGS) *lb; ArrayMetaState *my_extra; - element_type = ARR_ELEMTYPE(v); - /* * We arrange to look up info about element type, including its output * conversion proc, only once per series of calls, assuming the element @@ -920,7 +943,7 @@ array_out(PG_FUNCTION_ARGS) fcinfo->flinfo->fn_extra = MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, sizeof(ArrayMetaState)); my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra; - my_extra->element_type = InvalidOid; + my_extra->element_type = ~element_type; } if (my_extra->element_type != element_type) @@ -972,41 +995,57 @@ array_out(PG_FUNCTION_ARGS) */ values = (char **) palloc(nitems * sizeof(char *)); needquotes = (bool *) palloc(nitems * sizeof(bool)); - p = ARR_DATA_PTR(v); overall_length = 1; /* don't forget to count \0 at end. */ + p = ARR_DATA_PTR(v); + bitmap = ARR_NULLBITMAP(v); + bitmask = 1; + for (i = 0; i < nitems; i++) { - Datum itemvalue; bool needquote; - itemvalue = fetch_att(p, typbyval, typlen); - values[i] = DatumGetCString(FunctionCall1(&my_extra->proc, - itemvalue)); - p = att_addlength(p, typlen, PointerGetDatum(p)); - p = (char *) att_align(p, typalign); - - /* count data plus backslashes; detect chars needing quotes */ - if (values[i][0] == '\0') - needquote = true; /* force quotes for empty string */ - else + /* Get source element, checking for NULL */ + if (bitmap && (*bitmap & bitmask) == 0) + { + values[i] = pstrdup("NULL"); + overall_length += 4; needquote = false; - - for (tmp = values[i]; *tmp != '\0'; tmp++) + } + else { - char ch = *tmp; + Datum itemvalue; + + itemvalue = fetch_att(p, typbyval, typlen); + values[i] = DatumGetCString(FunctionCall1(&my_extra->proc, + itemvalue)); + p = att_addlength(p, typlen, PointerGetDatum(p)); + p = (char *) att_align(p, typalign); + + /* count data plus backslashes; detect chars needing quotes */ + if (values[i][0] == '\0') + needquote = true; /* force quotes for empty string */ + else if (pg_strcasecmp(values[i], "NULL") == 0) + needquote = true; /* force quotes for literal NULL */ + else + needquote = false; - overall_length += 1; - if (ch == '"' || ch == '\\') + for (tmp = values[i]; *tmp != '\0'; tmp++) { - needquote = true; -#ifndef TCL_ARRAYS + char ch = *tmp; + overall_length += 1; + if (ch == '"' || ch == '\\') + { + needquote = true; +#ifndef TCL_ARRAYS + overall_length += 1; #endif + } + else if (ch == '{' || ch == '}' || ch == typdelim || + isspace((unsigned char) ch)) + needquote = true; } - else if (ch == '{' || ch == '}' || ch == typdelim || - isspace((unsigned char) ch)) - needquote = true; } needquotes[i] = needquote; @@ -1014,9 +1053,19 @@ array_out(PG_FUNCTION_ARGS) /* Count the pair of double quotes, if needed */ if (needquote) overall_length += 2; - /* and the comma */ overall_length += 1; + + /* advance bitmap pointer if any */ + if (bitmap) + { + bitmask <<= 1; + if (bitmask == 0x100) + { + bitmap++; + bitmask = 1; + } + } } /* @@ -1104,13 +1153,13 @@ array_out(PG_FUNCTION_ARGS) PG_RETURN_CSTRING(retval); } -/*--------------------------------------------------------------------- +/* * array_recv : * converts an array from the external binary format to * its internal format. + * * return value : * the internal representation of the input array - *-------------------------------------------------------------------- */ Datum array_recv(PG_FUNCTION_ARGS) @@ -1126,8 +1175,11 @@ array_recv(PG_FUNCTION_ARGS) Oid typioparam; int i, nitems; - int32 nbytes; Datum *dataPtr; + bool *nullsPtr; + bool hasnulls; + int32 nbytes; + int32 dataoffset; ArrayType *retval; int ndim, flags, @@ -1148,7 +1200,7 @@ array_recv(PG_FUNCTION_ARGS) ndim, MAXDIM))); flags = pq_getmsgint(buf, 4); - if (flags != 0) + if (flags != 0 && flags != 1) ereport(ERROR, (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), errmsg("invalid array flags"))); @@ -1167,6 +1219,8 @@ array_recv(PG_FUNCTION_ARGS) dim[i] = pq_getmsgint(buf, 4); lBound[i] = pq_getmsgint(buf, 4); } + + /* This checks for overflow of array dimensions */ nitems = ArrayGetNItems(ndim, dim); /* @@ -1203,10 +1257,7 @@ array_recv(PG_FUNCTION_ARGS) if (nitems == 0) { /* Return empty array ... but not till we've validated element_type */ - retval = (ArrayType *) palloc0(sizeof(ArrayType)); - retval->size = sizeof(ArrayType); - retval->elemtype = element_type; - PG_RETURN_ARRAYTYPE_P(retval); + PG_RETURN_ARRAYTYPE_P(construct_empty_array(element_type)); } typlen = my_extra->typlen; @@ -1214,37 +1265,64 @@ array_recv(PG_FUNCTION_ARGS) typalign = my_extra->typalign; typioparam = my_extra->typioparam; - dataPtr = ReadArrayBinary(buf, nitems, &my_extra->proc, - typioparam, typmod, - typlen, typbyval, typalign, - &nbytes); - nbytes += ARR_OVERHEAD(ndim); - - retval = (ArrayType *) palloc0(nbytes); + dataPtr = (Datum *) palloc(nitems * sizeof(Datum)); + nullsPtr = (bool *) palloc(nitems * sizeof(bool)); + ReadArrayBinary(buf, nitems, + &my_extra->proc, typioparam, typmod, + typlen, typbyval, typalign, + dataPtr, nullsPtr, + &hasnulls, &nbytes); + if (hasnulls) + { + dataoffset = ARR_OVERHEAD_WITHNULLS(ndim, nitems); + nbytes += dataoffset; + } + else + { + dataoffset = 0; /* marker for no null bitmap */ + nbytes += ARR_OVERHEAD_NONULLS(ndim); + } + retval = (ArrayType *) palloc(nbytes); retval->size = nbytes; retval->ndim = ndim; + retval->dataoffset = dataoffset; retval->elemtype = element_type; memcpy(ARR_DIMS(retval), dim, ndim * sizeof(int)); memcpy(ARR_LBOUND(retval), lBound, ndim * sizeof(int)); - CopyArrayEls(ARR_DATA_PTR(retval), dataPtr, nitems, - typlen, typbyval, typalign, true); + CopyArrayEls(retval, + dataPtr, nullsPtr, nitems, + typlen, typbyval, typalign, + true); + pfree(dataPtr); + pfree(nullsPtr); PG_RETURN_ARRAYTYPE_P(retval); } -/*--------------------------------------------------------------------------- +/* * ReadArrayBinary: * collect the data elements of an array being read in binary style. - * result : - * returns a palloc'd array of Datum representations of the array elements. - * If element type is pass-by-ref, the Datums point to palloc'd values. - * *nbytes is set to the amount of data space needed for the array, - * including alignment padding but not including array header overhead. - *--------------------------------------------------------------------------- + * + * Inputs: + * buf: the data buffer to read from. + * nitems: total number of array elements (already read). + * receiveproc: type-specific receive procedure for element datatype. + * typioparam, typmod: auxiliary values to pass to receiveproc. + * typlen, typbyval, typalign: storage parameters of element datatype. + * + * Outputs: + * values[]: filled with converted data values. + * nulls[]: filled with is-null markers. + * *hasnulls: set TRUE iff there are any null elements. + * *nbytes: set to total size of data area needed (including alignment + * padding but not including array header overhead). + * + * Note that values[] and nulls[] are allocated by the caller, and must have + * nitems elements. */ -static Datum * +static void ReadArrayBinary(StringInfo buf, int nitems, FmgrInfo *receiveproc, @@ -1253,12 +1331,14 @@ ReadArrayBinary(StringInfo buf, int typlen, bool typbyval, char typalign, - int *nbytes) + Datum *values, + bool *nulls, + bool *hasnulls, + int32 *nbytes) { - Datum *values; int i; - - values = (Datum *) palloc(nitems * sizeof(Datum)); + bool hasnull; + int32 totbytes; for (i = 0; i < nitems; i++) { @@ -1268,11 +1348,18 @@ ReadArrayBinary(StringInfo buf, /* Get and check the item length */ itemlen = pq_getmsgint(buf, 4); - if (itemlen < 0 || itemlen > (buf->len - buf->cursor)) + if (itemlen < -1 || itemlen > (buf->len - buf->cursor)) ereport(ERROR, (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), errmsg("insufficient data left in message"))); + if (itemlen == -1) + { + /* -1 length means NULL */ + nulls[i] = true; + continue; + } + /* * Rather than copying data around, we just set up a phony StringInfo * pointing to the correct portion of the input buffer. We assume we @@ -1294,6 +1381,7 @@ ReadArrayBinary(StringInfo buf, PointerGetDatum(&elem_buf), ObjectIdGetDatum(typioparam), Int32GetDatum(typmod)); + nulls[i] = false; /* Trouble if it didn't eat the whole buffer */ if (elem_buf.cursor != itemlen) @@ -1306,43 +1394,50 @@ ReadArrayBinary(StringInfo buf, } /* - * Compute total data space needed + * Check for nulls, compute total data space needed */ - if (typlen > 0) - *nbytes = nitems * att_align(typlen, typalign); - else + hasnull = false; + totbytes = 0; + for (i = 0; i < nitems; i++) { - Assert(!typbyval); - *nbytes = 0; - for (i = 0; i < nitems; i++) + if (nulls[i]) + hasnull = true; + else { /* let's just make sure data is not toasted */ if (typlen == -1) values[i] = PointerGetDatum(PG_DETOAST_DATUM(values[i])); - *nbytes = att_addlength(*nbytes, typlen, values[i]); - *nbytes = att_align(*nbytes, typalign); + totbytes = att_addlength(totbytes, typlen, values[i]); + totbytes = att_align(totbytes, typalign); + /* check for overflow of total request */ + if (!AllocSizeIsValid(totbytes)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("array size exceeds the maximum allowed (%d)", + (int) MaxAllocSize))); } } - - return values; + *hasnulls = hasnull; + *nbytes = totbytes; } -/*------------------------------------------------------------------------- +/* * array_send : - * takes the internal representation of an array and returns a bytea + * takes the internal representation of an array and returns a bytea * containing the array in its external binary format. - *------------------------------------------------------------------------- */ Datum array_send(PG_FUNCTION_ARGS) { ArrayType *v = PG_GETARG_ARRAYTYPE_P(0); - Oid element_type; + Oid element_type = ARR_ELEMTYPE(v); int typlen; bool typbyval; char typalign; char *p; + bits8 *bitmap; + int bitmask; int nitems, i; int ndim, @@ -1350,9 +1445,6 @@ array_send(PG_FUNCTION_ARGS) StringInfoData buf; ArrayMetaState *my_extra; - /* Get information about the element type and the array dimensions */ - element_type = ARR_ELEMTYPE(v); - /* * We arrange to look up info about element type, including its send * conversion proc, only once per series of calls, assuming the element @@ -1364,7 +1456,7 @@ array_send(PG_FUNCTION_ARGS) fcinfo->flinfo->fn_extra = MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, sizeof(ArrayMetaState)); my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra; - my_extra->element_type = InvalidOid; + my_extra->element_type = ~element_type; } if (my_extra->element_type != element_type) @@ -1395,7 +1487,7 @@ array_send(PG_FUNCTION_ARGS) /* Send the array header information */ pq_sendint(&buf, ndim, 4); - pq_sendint(&buf, v->flags, 4); + pq_sendint(&buf, ARR_HASNULL(v) ? 1 : 0, 4); pq_sendint(&buf, element_type, sizeof(Oid)); for (i = 0; i < ndim; i++) { @@ -1405,32 +1497,54 @@ array_send(PG_FUNCTION_ARGS) /* Send the array elements using the element's own sendproc */ p = ARR_DATA_PTR(v); + bitmap = ARR_NULLBITMAP(v); + bitmask = 1; + for (i = 0; i < nitems; i++) { - Datum itemvalue; - bytea *outputbytes; + /* Get source element, checking for NULL */ + if (bitmap && (*bitmap & bitmask) == 0) + { + /* -1 length means a NULL */ + pq_sendint(&buf, -1, 4); + } + else + { + Datum itemvalue; + bytea *outputbytes; - itemvalue = fetch_att(p, typbyval, typlen); + itemvalue = fetch_att(p, typbyval, typlen); - outputbytes = DatumGetByteaP(FunctionCall1(&my_extra->proc, - itemvalue)); - /* We assume the result will not have been toasted */ - pq_sendint(&buf, VARSIZE(outputbytes) - VARHDRSZ, 4); - pq_sendbytes(&buf, VARDATA(outputbytes), - VARSIZE(outputbytes) - VARHDRSZ); - pfree(outputbytes); + outputbytes = DatumGetByteaP(FunctionCall1(&my_extra->proc, + itemvalue)); + /* We assume the result will not have been toasted */ + pq_sendint(&buf, VARSIZE(outputbytes) - VARHDRSZ, 4); + pq_sendbytes(&buf, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + pfree(outputbytes); - p = att_addlength(p, typlen, PointerGetDatum(p)); - p = (char *) att_align(p, typalign); + p = att_addlength(p, typlen, PointerGetDatum(p)); + p = (char *) att_align(p, typalign); + } + + /* advance bitmap pointer if any */ + if (bitmap) + { + bitmask <<= 1; + if (bitmask == 0x100) + { + bitmap++; + bitmask = 1; + } + } } PG_RETURN_BYTEA_P(pq_endtypsend(&buf)); } -/*----------------------------------------------------------------------------- +/* * array_dims : * returns the dimensions of the array pointed to by "v", as a "text" - *---------------------------------------------------------------------------- */ Datum array_dims(PG_FUNCTION_ARGS) @@ -1471,11 +1585,10 @@ array_dims(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(result); } -/*----------------------------------------------------------------------------- +/* * array_lower : * returns the lower dimension, of the DIM requested, for * the array pointed to by "v", as an int4 - *---------------------------------------------------------------------------- */ Datum array_lower(PG_FUNCTION_ARGS) @@ -1499,11 +1612,10 @@ array_lower(PG_FUNCTION_ARGS) PG_RETURN_INT32(result); } -/*----------------------------------------------------------------------------- +/* * array_upper : * returns the upper dimension, of the DIM requested, for * the array pointed to by "v", as an int4 - *---------------------------------------------------------------------------- */ Datum array_upper(PG_FUNCTION_ARGS) @@ -1530,18 +1642,32 @@ array_upper(PG_FUNCTION_ARGS) PG_RETURN_INT32(result); } -/*--------------------------------------------------------------------------- +/* * array_ref : - * This routine takes an array pointer and an index array and returns + * This routine takes an array pointer and a subscript array and returns * the referenced item as a Datum. Note that for a pass-by-reference * datatype, the returned Datum is a pointer into the array object. - *--------------------------------------------------------------------------- + * + * This handles both ordinary varlena arrays and fixed-length arrays. + * + * Inputs: + * array: the array object (mustn't be NULL) + * nSubscripts: number of subscripts supplied + * indx[]: the subscript values + * arraytyplen: pg_type.typlen for the array type + * elmlen: pg_type.typlen for the array's element type + * elmbyval: pg_type.typbyval for the array's element type + * elmalign: pg_type.typalign for the array's element type + * + * Outputs: + * The return value is the element Datum. + * *isNull is set to indicate whether the element is NULL. */ Datum array_ref(ArrayType *array, int nSubscripts, int *indx, - int arraylen, + int arraytyplen, int elmlen, bool elmbyval, char elmalign, @@ -1556,21 +1682,20 @@ array_ref(ArrayType *array, fixedLb[1]; char *arraydataptr, *retptr; + bits8 *arraynullsptr; - if (array == NULL) - RETURN_NULL(Datum); - - if (arraylen > 0) + if (arraytyplen > 0) { /* * fixed-length arrays -- these are assumed to be 1-d, 0-based */ ndim = 1; - fixedDim[0] = arraylen / elmlen; + fixedDim[0] = arraytyplen / elmlen; fixedLb[0] = 0; dim = fixedDim; lb = fixedLb; arraydataptr = (char *) array; + arraynullsptr = NULL; } else { @@ -1581,49 +1706,84 @@ array_ref(ArrayType *array, dim = ARR_DIMS(array); lb = ARR_LBOUND(array); arraydataptr = ARR_DATA_PTR(array); + arraynullsptr = ARR_NULLBITMAP(array); } /* * Return NULL for invalid subscript */ if (ndim != nSubscripts || ndim <= 0 || ndim > MAXDIM) - RETURN_NULL(Datum); + { + *isNull = true; + return (Datum) 0; + } for (i = 0; i < ndim; i++) + { if (indx[i] < lb[i] || indx[i] >= (dim[i] + lb[i])) - RETURN_NULL(Datum); + { + *isNull = true; + return (Datum) 0; + } + } /* - * OK, get the element + * Calculate the element number */ offset = ArrayGetOffset(nSubscripts, dim, lb, indx); - retptr = array_seek(arraydataptr, offset, elmlen, elmbyval, elmalign); + /* + * Check for NULL array element + */ + if (array_get_isnull(arraynullsptr, offset)) + { + *isNull = true; + return (Datum) 0; + } + /* + * OK, get the element + */ *isNull = false; + retptr = array_seek(arraydataptr, 0, arraynullsptr, offset, + elmlen, elmbyval, elmalign); return ArrayCast(retptr, elmbyval, elmlen); } -/*----------------------------------------------------------------------------- +/* * array_get_slice : * This routine takes an array and a range of indices (upperIndex and * lowerIndx), creates a new array structure for the referred elements * and returns a pointer to it. * - * NOTE: we assume it is OK to scribble on the provided index arrays + * This handles both ordinary varlena arrays and fixed-length arrays. + * + * Inputs: + * array: the array object (mustn't be NULL) + * nSubscripts: number of subscripts supplied (must be same for upper/lower) + * upperIndx[]: the upper subscript values + * lowerIndx[]: the lower subscript values + * arraytyplen: pg_type.typlen for the array type + * elmlen: pg_type.typlen for the array's element type + * elmbyval: pg_type.typbyval for the array's element type + * elmalign: pg_type.typalign for the array's element type + * + * Outputs: + * The return value is the new array Datum (it's never NULL) + * + * NOTE: we assume it is OK to scribble on the provided subscript arrays * lowerIndx[] and upperIndx[]. These are generally just temporaries. - *----------------------------------------------------------------------------- */ ArrayType * array_get_slice(ArrayType *array, int nSubscripts, int *upperIndx, int *lowerIndx, - int arraylen, + int arraytyplen, int elmlen, bool elmbyval, - char elmalign, - bool *isNull) + char elmalign) { + ArrayType *newarray; int i, ndim, *dim, @@ -1631,15 +1791,14 @@ array_get_slice(ArrayType *array, *newlb; int fixedDim[1], fixedLb[1]; + Oid elemtype; char *arraydataptr; - ArrayType *newarray; + bits8 *arraynullsptr; + int32 dataoffset; int bytes, span[MAXDIM]; - if (array == NULL) - RETURN_NULL(ArrayType *); - - if (arraylen > 0) + if (arraytyplen > 0) { /* * fixed-length arrays -- currently, cannot slice these because parser @@ -1652,15 +1811,18 @@ array_get_slice(ArrayType *array, errmsg("slices of fixed-length arrays not implemented"))); /* - * fixed-length arrays -- these are assumed to be 1-d, 0-based XXX - * where would we get the correct ELEMTYPE from? + * fixed-length arrays -- these are assumed to be 1-d, 0-based + * + * XXX where would we get the correct ELEMTYPE from? */ ndim = 1; - fixedDim[0] = arraylen / elmlen; + fixedDim[0] = arraytyplen / elmlen; fixedLb[0] = 0; dim = fixedDim; lb = fixedLb; + elemtype = InvalidOid; /* XXX */ arraydataptr = (char *) array; + arraynullsptr = NULL; } else { @@ -1670,16 +1832,18 @@ array_get_slice(ArrayType *array, ndim = ARR_NDIM(array); dim = ARR_DIMS(array); lb = ARR_LBOUND(array); + elemtype = ARR_ELEMTYPE(array); arraydataptr = ARR_DATA_PTR(array); + arraynullsptr = ARR_NULLBITMAP(array); } /* * Check provided subscripts. A slice exceeding the current array limits * is silently truncated to the array limits. If we end up with an empty - * slice, return NULL (should it be an empty array instead?) + * slice, return an empty array. */ if (ndim < nSubscripts || ndim <= 0 || ndim > MAXDIM) - RETURN_NULL(ArrayType *); + return construct_empty_array(elemtype); for (i = 0; i < nSubscripts; i++) { @@ -1688,7 +1852,7 @@ array_get_slice(ArrayType *array, if (upperIndx[i] >= (dim[i] + lb[i])) upperIndx[i] = dim[i] + lb[i] - 1; if (lowerIndx[i] > upperIndx[i]) - RETURN_NULL(ArrayType *); + return construct_empty_array(elemtype); } /* fill any missing subscript positions with full array range */ for (; i < ndim; i++) @@ -1696,21 +1860,36 @@ array_get_slice(ArrayType *array, lowerIndx[i] = lb[i]; upperIndx[i] = dim[i] + lb[i] - 1; if (lowerIndx[i] > upperIndx[i]) - RETURN_NULL(ArrayType *); + return construct_empty_array(elemtype); } mda_get_range(ndim, span, lowerIndx, upperIndx); - bytes = array_slice_size(ndim, dim, lb, arraydataptr, + bytes = array_slice_size(arraydataptr, arraynullsptr, + ndim, dim, lb, lowerIndx, upperIndx, elmlen, elmbyval, elmalign); - bytes += ARR_OVERHEAD(ndim); + + /* + * Currently, we put a null bitmap in the result if the source has one; + * could be smarter ... + */ + if (arraynullsptr) + { + dataoffset = ARR_OVERHEAD_WITHNULLS(ndim, ArrayGetNItems(ndim, span)); + bytes += dataoffset; + } + else + { + dataoffset = 0; /* marker for no null bitmap */ + bytes += ARR_OVERHEAD_NONULLS(ndim); + } newarray = (ArrayType *) palloc(bytes); newarray->size = bytes; newarray->ndim = ndim; - newarray->flags = 0; - newarray->elemtype = ARR_ELEMTYPE(array); + newarray->dataoffset = dataoffset; + newarray->elemtype = elemtype; memcpy(ARR_DIMS(newarray), span, ndim * sizeof(int)); /* @@ -1721,63 +1900,77 @@ array_get_slice(ArrayType *array, for (i = 0; i < ndim; i++) newlb[i] = 1; - array_extract_slice(ndim, dim, lb, arraydataptr, - lowerIndx, upperIndx, ARR_DATA_PTR(newarray), + array_extract_slice(newarray, + ndim, dim, lb, + arraydataptr, arraynullsptr, + lowerIndx, upperIndx, elmlen, elmbyval, elmalign); return newarray; } -/*----------------------------------------------------------------------------- +/* * array_set : - * This routine sets the value of an array location (specified by - * an index array) to a new value specified by "dataValue". - * result : + * This routine sets the value of an array element (specified by + * a subscript array) to a new value specified by "dataValue". + * + * This handles both ordinary varlena arrays and fixed-length arrays. + * + * Inputs: + * array: the initial array object (mustn't be NULL) + * nSubscripts: number of subscripts supplied + * indx[]: the subscript values + * dataValue: the datum to be inserted at the given position + * isNull: whether dataValue is NULL + * arraytyplen: pg_type.typlen for the array type + * elmlen: pg_type.typlen for the array's element type + * elmbyval: pg_type.typbyval for the array's element type + * elmalign: pg_type.typalign for the array's element type + * + * Result: * A new array is returned, just like the old except for the one - * modified entry. + * modified entry. The original array object is not changed. * * For one-dimensional arrays only, we allow the array to be extended * by assigning to the position one above or one below the existing range. - * (We could be more flexible if we had a way to represent NULL elements.) + * (XXX we could be more flexible: perhaps allow NULL fill?) * * NOTE: For assignments, we throw an error for invalid subscripts etc, - * rather than returning a NULL as the fetch operations do. The reasoning - * is that returning a NULL would cause the user's whole array to be replaced - * with NULL, which will probably not make him happy. - *----------------------------------------------------------------------------- + * rather than returning a NULL as the fetch operations do. */ ArrayType * array_set(ArrayType *array, int nSubscripts, int *indx, Datum dataValue, - int arraylen, + bool isNull, + int arraytyplen, int elmlen, bool elmbyval, - char elmalign, - bool *isNull) + char elmalign) { + ArrayType *newarray; int i, ndim, dim[MAXDIM], lb[MAXDIM], offset; - ArrayType *newarray; char *elt_ptr; bool extendbefore = false; bool extendafter = false; - int olddatasize, + bool newhasnulls; + bits8 *oldnullbitmap; + int oldnitems, + olddatasize, newsize, olditemlen, newitemlen, overheadlen, + oldoverheadlen, lenbefore, lenafter; - if (array == NULL) - RETURN_NULL(ArrayType *); - - if (arraylen > 0) + if (arraytyplen > 0) { /* * fixed-length arrays -- these are assumed to be 1-d, 0-based. We @@ -1788,20 +1981,30 @@ array_set(ArrayType *array, (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), errmsg("invalid array subscripts"))); - if (indx[0] < 0 || indx[0] * elmlen >= arraylen) + if (indx[0] < 0 || indx[0] * elmlen >= arraytyplen) ereport(ERROR, (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), errmsg("invalid array subscripts"))); - newarray = (ArrayType *) palloc(arraylen); - memcpy(newarray, array, arraylen); + if (isNull) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("cannot assign NULL to an element of a fixed-length array"))); + + newarray = (ArrayType *) palloc(arraytyplen); + memcpy(newarray, array, arraytyplen); elt_ptr = (char *) newarray + indx[0] * elmlen; ArrayCastAndSet(dataValue, elmlen, elmbyval, elmalign, elt_ptr); return newarray; } + if (nSubscripts <= 0 || nSubscripts > MAXDIM) + ereport(ERROR, + (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), + errmsg("invalid array subscripts"))); + /* make sure item to be inserted is not toasted */ - if (elmlen == -1) + if (elmlen == -1 && !isNull) dataValue = PointerGetDatum(PG_DETOAST_DATUM(dataValue)); /* detoast input array if necessary */ @@ -1824,11 +2027,12 @@ array_set(ArrayType *array, lb[i] = indx[i]; } - return construct_md_array(&dataValue, nSubscripts, dim, lb, elmtype, + return construct_md_array(&dataValue, &isNull, nSubscripts, + dim, lb, elmtype, elmlen, elmbyval, elmalign); } - if (ndim != nSubscripts || ndim <= 0 || ndim > MAXDIM) + if (ndim != nSubscripts) ereport(ERROR, (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), errmsg("invalid array subscripts"))); @@ -1872,16 +2076,31 @@ array_set(ArrayType *array, /* * Compute sizes of items and areas to copy */ - overheadlen = ARR_OVERHEAD(ndim); - olddatasize = ARR_SIZE(array) - overheadlen; + if (ARR_HASNULL(array) || isNull) + { + newhasnulls = true; + overheadlen = ARR_OVERHEAD_WITHNULLS(ndim, + ArrayGetNItems(ndim, dim)); + } + else + { + newhasnulls = false; + overheadlen = ARR_OVERHEAD_NONULLS(ndim); + } + oldnitems = ArrayGetNItems(ndim, ARR_DIMS(array)); + oldnullbitmap = ARR_NULLBITMAP(array); + oldoverheadlen = ARR_DATA_OFFSET(array); + olddatasize = ARR_SIZE(array) - oldoverheadlen; if (extendbefore) { + offset = 0; lenbefore = 0; olditemlen = 0; lenafter = olddatasize; } else if (extendafter) { + offset = oldnitems; lenbefore = olddatasize; olditemlen = 0; lenafter = 0; @@ -1889,59 +2108,112 @@ array_set(ArrayType *array, else { offset = ArrayGetOffset(nSubscripts, dim, lb, indx); - elt_ptr = array_seek(ARR_DATA_PTR(array), offset, + elt_ptr = array_seek(ARR_DATA_PTR(array), 0, oldnullbitmap, offset, elmlen, elmbyval, elmalign); lenbefore = (int) (elt_ptr - ARR_DATA_PTR(array)); - olditemlen = att_addlength(0, elmlen, PointerGetDatum(elt_ptr)); - olditemlen = att_align(olditemlen, elmalign); + if (array_get_isnull(oldnullbitmap, offset)) + olditemlen = 0; + else + { + olditemlen = att_addlength(0, elmlen, PointerGetDatum(elt_ptr)); + olditemlen = att_align(olditemlen, elmalign); + } lenafter = (int) (olddatasize - lenbefore - olditemlen); } - newitemlen = att_addlength(0, elmlen, dataValue); - newitemlen = att_align(newitemlen, elmalign); + if (isNull) + newitemlen = 0; + else + { + newitemlen = att_addlength(0, elmlen, dataValue); + newitemlen = att_align(newitemlen, elmalign); + } newsize = overheadlen + lenbefore + newitemlen + lenafter; /* - * OK, do the assignment + * OK, create the new array and fill in header/dimensions */ newarray = (ArrayType *) palloc(newsize); newarray->size = newsize; newarray->ndim = ndim; - newarray->flags = 0; + newarray->dataoffset = newhasnulls ? overheadlen : 0; newarray->elemtype = ARR_ELEMTYPE(array); memcpy(ARR_DIMS(newarray), dim, ndim * sizeof(int)); memcpy(ARR_LBOUND(newarray), lb, ndim * sizeof(int)); + + /* + * Fill in data + */ memcpy((char *) newarray + overheadlen, - (char *) array + overheadlen, + (char *) array + oldoverheadlen, lenbefore); + if (!isNull) + ArrayCastAndSet(dataValue, elmlen, elmbyval, elmalign, + (char *) newarray + overheadlen + lenbefore); memcpy((char *) newarray + overheadlen + lenbefore + newitemlen, - (char *) array + overheadlen + lenbefore + olditemlen, + (char *) array + oldoverheadlen + lenbefore + olditemlen, lenafter); - ArrayCastAndSet(dataValue, elmlen, elmbyval, elmalign, - (char *) newarray + overheadlen + lenbefore); + /* + * Fill in nulls bitmap if needed + * + * Note: it's possible we just replaced the last NULL with a non-NULL, + * and could get rid of the bitmap. Seems not worth testing for though. + */ + if (newhasnulls) + { + bits8 *newnullbitmap = ARR_NULLBITMAP(newarray); + + array_set_isnull(newnullbitmap, offset, isNull); + if (extendbefore) + array_bitmap_copy(newnullbitmap, 1, + oldnullbitmap, 0, + oldnitems); + else + { + array_bitmap_copy(newnullbitmap, 0, + oldnullbitmap, 0, + offset); + if (!extendafter) + array_bitmap_copy(newnullbitmap, offset+1, + oldnullbitmap, offset+1, + oldnitems - offset - 1); + } + } return newarray; } -/*---------------------------------------------------------------------------- +/* * array_set_slice : * This routine sets the value of a range of array locations (specified - * by upper and lower index values ) to new values passed as - * another array - * result : + * by upper and lower subscript values) to new values passed as + * another array. + * + * This handles both ordinary varlena arrays and fixed-length arrays. + * + * Inputs: + * array: the initial array object (mustn't be NULL) + * nSubscripts: number of subscripts supplied (must be same for upper/lower) + * upperIndx[]: the upper subscript values + * lowerIndx[]: the lower subscript values + * srcArray: the source for the inserted values + * isNull: indicates whether srcArray is NULL + * arraytyplen: pg_type.typlen for the array type + * elmlen: pg_type.typlen for the array's element type + * elmbyval: pg_type.typbyval for the array's element type + * elmalign: pg_type.typalign for the array's element type + * + * Result: * A new array is returned, just like the old except for the - * modified range. + * modified range. The original array object is not changed. * * NOTE: we assume it is OK to scribble on the provided index arrays * lowerIndx[] and upperIndx[]. These are generally just temporaries. * * NOTE: For assignments, we throw an error for silly subscripts etc, - * rather than returning a NULL as the fetch operations do. The reasoning - * is that returning a NULL would cause the user's whole array to be replaced - * with NULL, which will probably not make him happy. - *---------------------------------------------------------------------------- + * rather than returning a NULL or empty array as the fetch operations do. */ ArrayType * array_set_slice(ArrayType *array, @@ -1949,33 +2221,38 @@ array_set_slice(ArrayType *array, int *upperIndx, int *lowerIndx, ArrayType *srcArray, - int arraylen, + bool isNull, + int arraytyplen, int elmlen, bool elmbyval, - char elmalign, - bool *isNull) + char elmalign) { + ArrayType *newarray; int i, ndim, dim[MAXDIM], lb[MAXDIM], span[MAXDIM]; - ArrayType *newarray; - int nsrcitems, + bool newhasnulls; + int nitems, + nsrcitems, olddatasize, newsize, olditemsize, newitemsize, overheadlen, + oldoverheadlen, lenbefore, - lenafter; + lenafter, + itemsbefore, + itemsafter, + nolditems; - if (array == NULL) - RETURN_NULL(ArrayType *); - if (srcArray == NULL) + /* Currently, assignment from a NULL source array is a no-op */ + if (isNull) return array; - if (arraylen > 0) + if (arraytyplen > 0) { /* * fixed-length arrays -- not got round to doing this... @@ -2001,11 +2278,12 @@ array_set_slice(ArrayType *array, if (ndim == 0) { Datum *dvalues; + bool *dnulls; int nelems; Oid elmtype = ARR_ELEMTYPE(array); deconstruct_array(srcArray, elmtype, elmlen, elmbyval, elmalign, - &dvalues, &nelems); + &dvalues, &dnulls, &nelems); for (i = 0; i < nSubscripts; i++) { @@ -2019,7 +2297,8 @@ array_set_slice(ArrayType *array, (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), errmsg("source array too small"))); - return construct_md_array(dvalues, nSubscripts, dim, lb, elmtype, + return construct_md_array(dvalues, dnulls, nSubscripts, + dim, lb, elmtype, elmlen, elmbyval, elmalign); } @@ -2076,6 +2355,9 @@ array_set_slice(ArrayType *array, errmsg("invalid array subscripts"))); } + /* Do this mainly to check for overflow */ + nitems = ArrayGetNItems(ndim, dim); + /* * Make sure source array has enough entries. Note we ignore the shape of * the source array and just read entries serially. @@ -2091,20 +2373,34 @@ array_set_slice(ArrayType *array, * Compute space occupied by new entries, space occupied by replaced * entries, and required space for new array. */ - newitemsize = array_nelems_size(ARR_DATA_PTR(srcArray), nsrcitems, + if (ARR_HASNULL(array) || ARR_HASNULL(srcArray)) + { + newhasnulls = true; + overheadlen = ARR_OVERHEAD_WITHNULLS(ndim, nitems); + } + else + { + newhasnulls = false; + overheadlen = ARR_OVERHEAD_NONULLS(ndim); + } + newitemsize = array_nelems_size(ARR_DATA_PTR(srcArray), 0, + ARR_NULLBITMAP(srcArray), nsrcitems, elmlen, elmbyval, elmalign); - overheadlen = ARR_OVERHEAD(ndim); - olddatasize = ARR_SIZE(array) - overheadlen; + oldoverheadlen = ARR_DATA_OFFSET(array); + olddatasize = ARR_SIZE(array) - oldoverheadlen; if (ndim > 1) { /* * here we do not need to cope with extension of the array; it would * be a lot more complicated if we had to do so... */ - olditemsize = array_slice_size(ndim, dim, lb, ARR_DATA_PTR(array), + olditemsize = array_slice_size(ARR_DATA_PTR(array), + ARR_NULLBITMAP(array), + ndim, dim, lb, lowerIndx, upperIndx, elmlen, elmbyval, elmalign); lenbefore = lenafter = 0; /* keep compiler quiet */ + itemsbefore = itemsafter = nolditems = 0; } else { @@ -2116,15 +2412,26 @@ array_set_slice(ArrayType *array, int slicelb = Max(oldlb, lowerIndx[0]); int sliceub = Min(oldub, upperIndx[0]); char *oldarraydata = ARR_DATA_PTR(array); + bits8 *oldarraybitmap = ARR_NULLBITMAP(array); - lenbefore = array_nelems_size(oldarraydata, slicelb - oldlb, + itemsbefore = slicelb - oldlb; + lenbefore = array_nelems_size(oldarraydata, 0, oldarraybitmap, + itemsbefore, elmlen, elmbyval, elmalign); if (slicelb > sliceub) + { + nolditems = 0; olditemsize = 0; + } else + { + nolditems = sliceub - slicelb + 1; olditemsize = array_nelems_size(oldarraydata + lenbefore, - sliceub - slicelb + 1, + itemsbefore, oldarraybitmap, + nolditems, elmlen, elmbyval, elmalign); + } + itemsafter = oldub - sliceub; lenafter = olddatasize - lenbefore - olditemsize; } @@ -2133,7 +2440,7 @@ array_set_slice(ArrayType *array, newarray = (ArrayType *) palloc(newsize); newarray->size = newsize; newarray->ndim = ndim; - newarray->flags = 0; + newarray->dataoffset = newhasnulls ? overheadlen : 0; newarray->elemtype = ARR_ELEMTYPE(array); memcpy(ARR_DIMS(newarray), dim, ndim * sizeof(int)); memcpy(ARR_LBOUND(newarray), lb, ndim * sizeof(int)); @@ -2144,22 +2451,39 @@ array_set_slice(ArrayType *array, * here we do not need to cope with extension of the array; it would * be a lot more complicated if we had to do so... */ - array_insert_slice(ndim, dim, lb, ARR_DATA_PTR(array), olddatasize, - ARR_DATA_PTR(newarray), - lowerIndx, upperIndx, ARR_DATA_PTR(srcArray), + array_insert_slice(newarray, array, srcArray, + ndim, dim, lb, + lowerIndx, upperIndx, elmlen, elmbyval, elmalign); } else { + /* fill in data */ memcpy((char *) newarray + overheadlen, - (char *) array + overheadlen, + (char *) array + oldoverheadlen, lenbefore); memcpy((char *) newarray + overheadlen + lenbefore, ARR_DATA_PTR(srcArray), newitemsize); memcpy((char *) newarray + overheadlen + lenbefore + newitemsize, - (char *) array + overheadlen + lenbefore + olditemsize, + (char *) array + oldoverheadlen + lenbefore + olditemsize, lenafter); + /* fill in nulls bitmap if needed */ + if (newhasnulls) + { + bits8 *newnullbitmap = ARR_NULLBITMAP(newarray); + bits8 *oldnullbitmap = ARR_NULLBITMAP(array); + + array_bitmap_copy(newnullbitmap, 0, + oldnullbitmap, 0, + itemsbefore); + array_bitmap_copy(newnullbitmap, itemsbefore, + ARR_NULLBITMAP(srcArray), 0, + nsrcitems); + array_bitmap_copy(newnullbitmap, itemsbefore+nsrcitems, + oldnullbitmap, itemsbefore+nolditems, + itemsafter); + } } return newarray; @@ -2192,9 +2516,8 @@ array_set_slice(ArrayType *array, * but better performance can be had if the state can be preserved across * a series of calls. * - * NB: caller must assure that input array is not NULL. Currently, - * any additional parameters passed to fn() may not be specified as NULL - * either. + * NB: caller must assure that input array is not NULL. NULL elements in + * the array are OK however. */ Datum array_map(FunctionCallInfo fcinfo, Oid inpType, Oid retType, @@ -2203,12 +2526,15 @@ array_map(FunctionCallInfo fcinfo, Oid inpType, Oid retType, ArrayType *v; ArrayType *result; Datum *values; + bool *nulls; Datum elt; int *dim; int ndim; int nitems; int i; - int nbytes = 0; + int32 nbytes = 0; + int32 dataoffset; + bool hasnulls; int inp_typlen; bool inp_typbyval; char inp_typalign; @@ -2216,6 +2542,8 @@ array_map(FunctionCallInfo fcinfo, Oid inpType, Oid retType, bool typbyval; char typalign; char *s; + bits8 *bitmap; + int bitmask; ArrayMetaState *inp_extra; ArrayMetaState *ret_extra; @@ -2236,10 +2564,7 @@ array_map(FunctionCallInfo fcinfo, Oid inpType, Oid retType, if (nitems <= 0) { /* Return empty array */ - result = (ArrayType *) palloc0(sizeof(ArrayType)); - result->size = sizeof(ArrayType); - result->elemtype = retType; - PG_RETURN_ARRAYTYPE_P(result); + PG_RETURN_ARRAYTYPE_P(construct_empty_array(retType)); } /* @@ -2274,79 +2599,137 @@ array_map(FunctionCallInfo fcinfo, Oid inpType, Oid retType, typbyval = ret_extra->typbyval; typalign = ret_extra->typalign; - /* Allocate temporary array for new values */ + /* Allocate temporary arrays for new values */ values = (Datum *) palloc(nitems * sizeof(Datum)); + nulls = (bool *) palloc(nitems * sizeof(bool)); /* Loop over source data */ - s = (char *) ARR_DATA_PTR(v); + s = ARR_DATA_PTR(v); + bitmap = ARR_NULLBITMAP(v); + bitmask = 1; + hasnulls = false; + for (i = 0; i < nitems; i++) { - /* Get source element */ - elt = fetch_att(s, inp_typbyval, inp_typlen); + bool callit = true; - s = att_addlength(s, inp_typlen, PointerGetDatum(s)); - s = (char *) att_align(s, inp_typalign); + /* Get source element, checking for NULL */ + if (bitmap && (*bitmap & bitmask) == 0) + { + fcinfo->argnull[0] = true; + } + else + { + elt = fetch_att(s, inp_typbyval, inp_typlen); + s = att_addlength(s, inp_typlen, elt); + s = (char *) att_align(s, inp_typalign); + fcinfo->arg[0] = elt; + fcinfo->argnull[0] = false; + } /* * Apply the given function to source elt and extra args. - * - * We assume the extra args are non-NULL, so need not check whether fn() - * is strict. Would need to do more work here to support arrays - * containing nulls, too. */ - fcinfo->arg[0] = elt; - fcinfo->argnull[0] = false; - fcinfo->isnull = false; - values[i] = FunctionCallInvoke(fcinfo); - if (fcinfo->isnull) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("null array elements not supported"))); + if (fcinfo->flinfo->fn_strict) + { + int j; - /* Ensure data is not toasted */ - if (typlen == -1) - values[i] = PointerGetDatum(PG_DETOAST_DATUM(values[i])); + for (j = 0; j < fcinfo->nargs; j++) + { + if (fcinfo->argnull[j]) + { + callit = false; + break; + } + } + } + + if (callit) + { + fcinfo->isnull = false; + values[i] = FunctionCallInvoke(fcinfo); + } + else + fcinfo->isnull = true; - /* Update total result size */ - nbytes = att_addlength(nbytes, typlen, values[i]); - nbytes = att_align(nbytes, typalign); + nulls[i] = fcinfo->isnull; + if (fcinfo->isnull) + hasnulls = true; + else + { + /* Ensure data is not toasted */ + if (typlen == -1) + values[i] = PointerGetDatum(PG_DETOAST_DATUM(values[i])); + /* Update total result size */ + nbytes = att_addlength(nbytes, typlen, values[i]); + nbytes = att_align(nbytes, typalign); + /* check for overflow of total request */ + if (!AllocSizeIsValid(nbytes)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("array size exceeds the maximum allowed (%d)", + (int) MaxAllocSize))); + } + + /* advance bitmap pointer if any */ + if (bitmap) + { + bitmask <<= 1; + if (bitmask == 0x100) + { + bitmap++; + bitmask = 1; + } + } } /* Allocate and initialize the result array */ - nbytes += ARR_OVERHEAD(ndim); - result = (ArrayType *) palloc0(nbytes); - + if (hasnulls) + { + dataoffset = ARR_OVERHEAD_WITHNULLS(ndim, nitems); + nbytes += dataoffset; + } + else + { + dataoffset = 0; /* marker for no null bitmap */ + nbytes += ARR_OVERHEAD_NONULLS(ndim); + } + result = (ArrayType *) palloc(nbytes); result->size = nbytes; result->ndim = ndim; + result->dataoffset = dataoffset; result->elemtype = retType; memcpy(ARR_DIMS(result), ARR_DIMS(v), 2 * ndim * sizeof(int)); /* * Note: do not risk trying to pfree the results of the called function */ - CopyArrayEls(ARR_DATA_PTR(result), values, nitems, - typlen, typbyval, typalign, false); + CopyArrayEls(result, + values, nulls, nitems, + typlen, typbyval, typalign, + false); + pfree(values); + pfree(nulls); PG_RETURN_ARRAYTYPE_P(result); } -/*---------- +/* * construct_array --- simple method for constructing an array object * * elems: array of Datum items to become the array contents + * (NULL element values are not supported). * nelems: number of items * elmtype, elmlen, elmbyval, elmalign: info for the datatype of the items * * A palloc'd 1-D array object is constructed and returned. Note that * elem values will be copied into the object even if pass-by-ref type. - * NULL element values are not supported. * * NOTE: it would be cleaner to look up the elmlen/elmbval/elmalign info * from the system catalogs, given the elmtype. However, the caller is * in a better position to cache this info across multiple uses, or even * to hard-wire values if the element type is hard-wired. - *---------- */ ArrayType * construct_array(Datum *elems, int nelems, @@ -2359,15 +2742,16 @@ construct_array(Datum *elems, int nelems, dims[0] = nelems; lbs[0] = 1; - return construct_md_array(elems, 1, dims, lbs, + return construct_md_array(elems, NULL, 1, dims, lbs, elmtype, elmlen, elmbyval, elmalign); } -/*---------- +/* * construct_md_array --- simple method for constructing an array object - * with arbitrary dimensions + * with arbitrary dimensions and possible NULLs * * elems: array of Datum items to become the array contents + * nulls: array of is-null flags (can be NULL if no nulls) * ndims: number of dimensions * dims: integer array with size of each dimension * lbs: integer array with lower bound of each dimension @@ -2375,23 +2759,24 @@ construct_array(Datum *elems, int nelems, * * A palloc'd ndims-D array object is constructed and returned. Note that * elem values will be copied into the object even if pass-by-ref type. - * NULL element values are not supported. * * NOTE: it would be cleaner to look up the elmlen/elmbval/elmalign info * from the system catalogs, given the elmtype. However, the caller is * in a better position to cache this info across multiple uses, or even * to hard-wire values if the element type is hard-wired. - *---------- */ ArrayType * construct_md_array(Datum *elems, + bool *nulls, int ndims, int *dims, int *lbs, Oid elmtype, int elmlen, bool elmbyval, char elmalign) { ArrayType *result; - int nbytes; + bool hasnulls; + int32 nbytes; + int32 dataoffset; int i; int nelems; @@ -2407,57 +2792,89 @@ construct_md_array(Datum *elems, /* fast track for empty array */ if (ndims == 0) - { - /* Allocate and initialize 0-D result array */ - result = (ArrayType *) palloc0(sizeof(ArrayType)); - result->size = sizeof(ArrayType); - result->elemtype = elmtype; - return result; - } + return construct_empty_array(elmtype); nelems = ArrayGetNItems(ndims, dims); /* compute required space */ - if (elmlen > 0) - nbytes = nelems * att_align(elmlen, elmalign); - else + nbytes = 0; + hasnulls = false; + for (i = 0; i < nelems; i++) { - Assert(!elmbyval); - nbytes = 0; - for (i = 0; i < nelems; i++) + if (nulls && nulls[i]) { - /* make sure data is not toasted */ - if (elmlen == -1) - elems[i] = PointerGetDatum(PG_DETOAST_DATUM(elems[i])); - nbytes = att_addlength(nbytes, elmlen, elems[i]); - nbytes = att_align(nbytes, elmalign); + hasnulls = true; + continue; } + /* make sure data is not toasted */ + if (elmlen == -1) + elems[i] = PointerGetDatum(PG_DETOAST_DATUM(elems[i])); + nbytes = att_addlength(nbytes, elmlen, elems[i]); + nbytes = att_align(nbytes, elmalign); + /* check for overflow of total request */ + if (!AllocSizeIsValid(nbytes)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("array size exceeds the maximum allowed (%d)", + (int) MaxAllocSize))); } - /* Allocate and initialize ndims-D result array */ - nbytes += ARR_OVERHEAD(ndims); + /* Allocate and initialize result array */ + if (hasnulls) + { + dataoffset = ARR_OVERHEAD_WITHNULLS(ndims, nelems); + nbytes += dataoffset; + } + else + { + dataoffset = 0; /* marker for no null bitmap */ + nbytes += ARR_OVERHEAD_NONULLS(ndims); + } result = (ArrayType *) palloc(nbytes); - result->size = nbytes; result->ndim = ndims; - result->flags = 0; + result->dataoffset = dataoffset; result->elemtype = elmtype; memcpy(ARR_DIMS(result), dims, ndims * sizeof(int)); memcpy(ARR_LBOUND(result), lbs, ndims * sizeof(int)); - CopyArrayEls(ARR_DATA_PTR(result), elems, nelems, - elmlen, elmbyval, elmalign, false); + + CopyArrayEls(result, + elems, nulls, nelems, + elmlen, elmbyval, elmalign, + false); return result; } -/*---------- +/* + * construct_empty_array --- make a zero-dimensional array of given type + */ +ArrayType * +construct_empty_array(Oid elmtype) +{ + ArrayType *result; + + result = (ArrayType *) palloc(sizeof(ArrayType)); + result->size = sizeof(ArrayType); + result->ndim = 0; + result->dataoffset = 0; + result->elemtype = elmtype; + return result; +} + +/* * deconstruct_array --- simple method for extracting data from an array * * array: array object to examine (must not be NULL) * elmtype, elmlen, elmbyval, elmalign: info for the datatype of the items * elemsp: return value, set to point to palloc'd array of Datum values + * nullsp: return value, set to point to palloc'd array of isnull markers * nelemsp: return value, set to number of extracted values * + * The caller may pass nullsp == NULL if it does not support NULLs in the + * array. Note that this produces a very uninformative error message, + * so do it only in cases where a NULL is really not expected. + * * If array elements are pass-by-ref data type, the returned Datums will * be pointers into the array object. * @@ -2465,42 +2882,72 @@ construct_md_array(Datum *elems, * from the system catalogs, given the elmtype. However, in most current * uses the type is hard-wired into the caller and so we can save a lookup * cycle by hard-wiring the type info as well. - *---------- */ void deconstruct_array(ArrayType *array, Oid elmtype, int elmlen, bool elmbyval, char elmalign, - Datum **elemsp, int *nelemsp) + Datum **elemsp, bool **nullsp, int *nelemsp) { Datum *elems; + bool *nulls; int nelems; char *p; + bits8 *bitmap; + int bitmask; int i; Assert(ARR_ELEMTYPE(array) == elmtype); nelems = ArrayGetNItems(ARR_NDIM(array), ARR_DIMS(array)); - if (nelems <= 0) - { - *elemsp = NULL; - *nelemsp = 0; - return; - } *elemsp = elems = (Datum *) palloc(nelems * sizeof(Datum)); + if (nullsp) + *nullsp = nulls = (bool *) palloc(nelems * sizeof(bool)); + else + nulls = NULL; *nelemsp = nelems; p = ARR_DATA_PTR(array); + bitmap = ARR_NULLBITMAP(array); + bitmask = 1; + for (i = 0; i < nelems; i++) { - elems[i] = fetch_att(p, elmbyval, elmlen); - p = att_addlength(p, elmlen, PointerGetDatum(p)); - p = (char *) att_align(p, elmalign); + /* Get source element, checking for NULL */ + if (bitmap && (*bitmap & bitmask) == 0) + { + elems[i] = (Datum) 0; + if (nulls) + nulls[i] = true; + else + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("NULL array element not allowed in this context"))); + } + else + { + elems[i] = fetch_att(p, elmbyval, elmlen); + if (nulls) + nulls[i] = false; + p = att_addlength(p, elmlen, PointerGetDatum(p)); + p = (char *) att_align(p, elmalign); + } + + /* advance bitmap pointer if any */ + if (bitmap) + { + bitmask <<= 1; + if (bitmask == 0x100) + { + bitmap++; + bitmask = 1; + } + } } } -/*----------------------------------------------------------------------------- +/* * array_eq : * compares two arrays for equality * result : @@ -2508,15 +2955,12 @@ deconstruct_array(ArrayType *array, * * Note: we do not use array_cmp here, since equality may be meaningful in * datatypes that don't have a total ordering (and hence no btree support). - *----------------------------------------------------------------------------- */ Datum array_eq(PG_FUNCTION_ARGS) { ArrayType *array1 = PG_GETARG_ARRAYTYPE_P(0); ArrayType *array2 = PG_GETARG_ARRAYTYPE_P(1); - char *p1 = (char *) ARR_DATA_PTR(array1); - char *p2 = (char *) ARR_DATA_PTR(array2); int ndims1 = ARR_NDIM(array1); int ndims2 = ARR_NDIM(array2); int *dims1 = ARR_DIMS(array1); @@ -2529,6 +2973,11 @@ array_eq(PG_FUNCTION_ARGS) int typlen; bool typbyval; char typalign; + char *ptr1; + char *ptr2; + bits8 *bitmap1; + bits8 *bitmap2; + int bitmask; int i; FunctionCallInfoData locfcinfo; @@ -2572,21 +3021,68 @@ array_eq(PG_FUNCTION_ARGS) NULL, NULL); /* Loop over source data */ + ptr1 = ARR_DATA_PTR(array1); + ptr2 = ARR_DATA_PTR(array2); + bitmap1 = ARR_NULLBITMAP(array1); + bitmap2 = ARR_NULLBITMAP(array2); + bitmask = 1; /* use same bitmask for both arrays */ + for (i = 0; i < nitems1; i++) { Datum elt1; Datum elt2; + bool isnull1; + bool isnull2; bool oprresult; - /* Get element pair */ - elt1 = fetch_att(p1, typbyval, typlen); - elt2 = fetch_att(p2, typbyval, typlen); + /* Get elements, checking for NULL */ + if (bitmap1 && (*bitmap1 & bitmask) == 0) + { + isnull1 = true; + elt1 = (Datum) 0; + } + else + { + isnull1 = false; + elt1 = fetch_att(ptr1, typbyval, typlen); + ptr1 = att_addlength(ptr1, typlen, PointerGetDatum(ptr1)); + ptr1 = (char *) att_align(ptr1, typalign); + } - p1 = att_addlength(p1, typlen, PointerGetDatum(p1)); - p1 = (char *) att_align(p1, typalign); + if (bitmap2 && (*bitmap2 & bitmask) == 0) + { + isnull2 = true; + elt2 = (Datum) 0; + } + else + { + isnull2 = false; + elt2 = fetch_att(ptr2, typbyval, typlen); + ptr2 = att_addlength(ptr2, typlen, PointerGetDatum(ptr2)); + ptr2 = (char *) att_align(ptr2, typalign); + } - p2 = att_addlength(p2, typlen, PointerGetDatum(p2)); - p2 = (char *) att_align(p2, typalign); + /* advance bitmap pointers if any */ + bitmask <<= 1; + if (bitmask == 0x100) + { + if (bitmap1) + bitmap1++; + if (bitmap2) + bitmap2++; + bitmask = 1; + } + + /* + * We consider two NULLs equal; NULL and not-NULL are unequal. + */ + if (isnull1 && isnull2) + continue; + if (isnull1 || isnull2) + { + result = false; + break; + } /* * Apply the operator to the element pair @@ -2621,6 +3117,7 @@ array_eq(PG_FUNCTION_ARGS) * character-by-character. *---------------------------------------------------------------------------- */ + Datum array_ne(PG_FUNCTION_ARGS) { @@ -2668,8 +3165,6 @@ array_cmp(FunctionCallInfo fcinfo) { ArrayType *array1 = PG_GETARG_ARRAYTYPE_P(0); ArrayType *array2 = PG_GETARG_ARRAYTYPE_P(1); - char *p1 = (char *) ARR_DATA_PTR(array1); - char *p2 = (char *) ARR_DATA_PTR(array2); int ndims1 = ARR_NDIM(array1); int ndims2 = ARR_NDIM(array2); int *dims1 = ARR_DIMS(array1); @@ -2683,6 +3178,11 @@ array_cmp(FunctionCallInfo fcinfo) bool typbyval; char typalign; int min_nitems; + char *ptr1; + char *ptr2; + bits8 *bitmap1; + bits8 *bitmap2; + int bitmask; int i; FunctionCallInfoData locfcinfo; @@ -2721,22 +3221,76 @@ array_cmp(FunctionCallInfo fcinfo) NULL, NULL); /* Loop over source data */ + ptr1 = ARR_DATA_PTR(array1); + ptr2 = ARR_DATA_PTR(array2); + bitmap1 = ARR_NULLBITMAP(array1); + bitmap2 = ARR_NULLBITMAP(array2); + bitmask = 1; /* use same bitmask for both arrays */ + min_nitems = Min(nitems1, nitems2); for (i = 0; i < min_nitems; i++) { Datum elt1; Datum elt2; + bool isnull1; + bool isnull2; int32 cmpresult; - /* Get element pair */ - elt1 = fetch_att(p1, typbyval, typlen); - elt2 = fetch_att(p2, typbyval, typlen); + /* Get elements, checking for NULL */ + if (bitmap1 && (*bitmap1 & bitmask) == 0) + { + isnull1 = true; + elt1 = (Datum) 0; + } + else + { + isnull1 = false; + elt1 = fetch_att(ptr1, typbyval, typlen); + ptr1 = att_addlength(ptr1, typlen, PointerGetDatum(ptr1)); + ptr1 = (char *) att_align(ptr1, typalign); + } - p1 = att_addlength(p1, typlen, PointerGetDatum(p1)); - p1 = (char *) att_align(p1, typalign); + if (bitmap2 && (*bitmap2 & bitmask) == 0) + { + isnull2 = true; + elt2 = (Datum) 0; + } + else + { + isnull2 = false; + elt2 = fetch_att(ptr2, typbyval, typlen); + ptr2 = att_addlength(ptr2, typlen, PointerGetDatum(ptr2)); + ptr2 = (char *) att_align(ptr2, typalign); + } - p2 = att_addlength(p2, typlen, PointerGetDatum(p2)); - p2 = (char *) att_align(p2, typalign); + /* advance bitmap pointers if any */ + bitmask <<= 1; + if (bitmask == 0x100) + { + if (bitmap1) + bitmap1++; + if (bitmap2) + bitmap2++; + bitmask = 1; + } + + /* + * We consider two NULLs equal; NULL > not-NULL. + */ + if (isnull1 && isnull2) + continue; + if (isnull1) + { + /* arg1 is greater than arg2 */ + result = 1; + break; + } + if (isnull2) + { + /* arg1 is less than arg2 */ + result = -1; + break; + } /* Compare the pair of elements */ locfcinfo.arg[0] = elt1; @@ -2779,7 +3333,45 @@ array_cmp(FunctionCallInfo fcinfo) /***************************************************************************/ /* + * Check whether a specific array element is NULL + * + * nullbitmap: pointer to array's null bitmap (NULL if none) + * offset: 0-based linear element number of array element + */ +static bool +array_get_isnull(const bits8 *nullbitmap, int offset) +{ + if (nullbitmap == NULL) + return false; /* assume not null */ + if (nullbitmap[offset / 8] & (1 << (offset % 8))) + return false; /* not null */ + return true; +} + +/* + * Set a specific array element's null-bitmap entry + * + * nullbitmap: pointer to array's null bitmap (mustn't be NULL) + * offset: 0-based linear element number of array element + * isNull: null status to set + */ +static void +array_set_isnull(bits8 *nullbitmap, int offset, bool isNull) +{ + int bitmask; + + nullbitmap += offset / 8; + bitmask = 1 << (offset % 8); + if (isNull) + *nullbitmap &= ~bitmask; + else + *nullbitmap |= bitmask; +} + +/* * Fetch array element at pointer, converted correctly to a Datum + * + * Caller must have handled case of NULL element */ static Datum ArrayCast(char *value, bool byval, int len) @@ -2789,6 +3381,8 @@ ArrayCast(char *value, bool byval, int len) /* * Copy datum to *dest and return total space used (including align padding) + * + * Caller must have handled case of NULL element */ static int ArrayCastAndSet(Datum src, @@ -2819,67 +3413,194 @@ ArrayCastAndSet(Datum src, } /* - * Compute total size of the nitems array elements starting at *ptr + * Advance ptr over nitems array elements + * + * ptr: starting location in array + * offset: 0-based linear element number of first element (the one at *ptr) + * nullbitmap: start of array's null bitmap, or NULL if none + * nitems: number of array elements to advance over (>= 0) + * typlen, typbyval, typalign: storage parameters of array element datatype + * + * It is caller's responsibility to ensure that nitems is within range */ -static int -array_nelems_size(char *ptr, int nitems, - int typlen, bool typbyval, char typalign) +static char * +array_seek(char *ptr, int offset, bits8 *nullbitmap, int nitems, + int typlen, bool typbyval, char typalign) { - char *origptr; + int bitmask; int i; - /* fixed-size elements? */ - if (typlen > 0) - return nitems * att_align(typlen, typalign); + /* easy if fixed-size elements and no NULLs */ + if (typlen > 0 && !nullbitmap) + return ptr + nitems * ((Size) att_align(typlen, typalign)); - Assert(!typbyval); - origptr = ptr; - for (i = 0; i < nitems; i++) + /* seems worth having separate loops for NULL and no-NULLs cases */ + if (nullbitmap) + { + nullbitmap += offset / 8; + bitmask = 1 << (offset % 8); + + for (i = 0; i < nitems; i++) + { + if (*nullbitmap & bitmask) + { + ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); + ptr = (char *) att_align(ptr, typalign); + } + bitmask <<= 1; + if (bitmask == 0x100) + { + nullbitmap++; + bitmask = 1; + } + } + } + else { - ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); - ptr = (char *) att_align(ptr, typalign); + for (i = 0; i < nitems; i++) + { + ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); + ptr = (char *) att_align(ptr, typalign); + } } - return ptr - origptr; + return ptr; } /* - * Advance ptr over nitems array elements + * Compute total size of the nitems array elements starting at *ptr + * + * Parameters same as for array_seek */ -static char * -array_seek(char *ptr, int nitems, - int typlen, bool typbyval, char typalign) +static int +array_nelems_size(char *ptr, int offset, bits8 *nullbitmap, int nitems, + int typlen, bool typbyval, char typalign) { - return ptr + array_nelems_size(ptr, nitems, - typlen, typbyval, typalign); + return array_seek(ptr, offset, nullbitmap, nitems, + typlen, typbyval, typalign) - ptr; } /* * Copy nitems array elements from srcptr to destptr * + * destptr: starting destination location (must be enough room!) + * nitems: number of array elements to copy (>= 0) + * srcptr: starting location in source array + * offset: 0-based linear element number of first element (the one at *srcptr) + * nullbitmap: start of source array's null bitmap, or NULL if none + * typlen, typbyval, typalign: storage parameters of array element datatype + * * Returns number of bytes copied + * + * NB: this does not take care of setting up the destination's null bitmap! */ static int -array_copy(char *destptr, int nitems, char *srcptr, +array_copy(char *destptr, int nitems, + char *srcptr, int offset, bits8 *nullbitmap, int typlen, bool typbyval, char typalign) { - int numbytes = array_nelems_size(srcptr, nitems, - typlen, typbyval, typalign); + int numbytes; - memmove(destptr, srcptr, numbytes); + numbytes = array_nelems_size(srcptr, offset, nullbitmap, nitems, + typlen, typbyval, typalign); + memcpy(destptr, srcptr, numbytes); return numbytes; } /* + * Copy nitems null-bitmap bits from source to destination + * + * destbitmap: start of destination array's null bitmap (mustn't be NULL) + * destoffset: 0-based linear element number of first dest element + * srcbitmap: start of source array's null bitmap, or NULL if none + * srcoffset: 0-based linear element number of first source element + * nitems: number of bits to copy (>= 0) + * + * If srcbitmap is NULL then we assume the source is all-non-NULL and + * fill 1's into the destination bitmap. Note that only the specified + * bits in the destination map are changed, not any before or after. + * + * Note: this could certainly be optimized using standard bitblt methods. + * However, it's not clear that the typical Postgres array has enough elements + * to make it worth worrying too much. For the moment, KISS. + */ +void +array_bitmap_copy(bits8 *destbitmap, int destoffset, + const bits8 *srcbitmap, int srcoffset, + int nitems) +{ + int destbitmask, + destbitval, + srcbitmask, + srcbitval; + + Assert(destbitmap); + if (nitems <= 0) + return; /* don't risk fetch off end of memory */ + destbitmap += destoffset / 8; + destbitmask = 1 << (destoffset % 8); + destbitval = *destbitmap; + if (srcbitmap) + { + srcbitmap += srcoffset / 8; + srcbitmask = 1 << (srcoffset % 8); + srcbitval = *srcbitmap; + while (nitems-- > 0) + { + if (srcbitval & srcbitmask) + destbitval |= destbitmask; + else + destbitval &= ~destbitmask; + destbitmask <<= 1; + if (destbitmask == 0x100) + { + *destbitmap++ = destbitval; + destbitmask = 1; + if (nitems > 0) + destbitval = *destbitmap; + } + srcbitmask <<= 1; + if (srcbitmask == 0x100) + { + srcbitmap++; + srcbitmask = 1; + if (nitems > 0) + srcbitval = *srcbitmap; + } + } + if (destbitmask != 1) + *destbitmap = destbitval; + } + else + { + while (nitems-- > 0) + { + destbitval |= destbitmask; + destbitmask <<= 1; + if (destbitmask == 0x100) + { + *destbitmap++ = destbitval; + destbitmask = 1; + if (nitems > 0) + destbitval = *destbitmap; + } + } + if (destbitmask != 1) + *destbitmap = destbitval; + } +} + +/* * Compute space needed for a slice of an array * * We assume the caller has verified that the slice coordinates are valid. */ static int -array_slice_size(int ndim, int *dim, int *lb, char *arraydataptr, +array_slice_size(char *arraydataptr, bits8 *arraynullsptr, + int ndim, int *dim, int *lb, int *st, int *endp, int typlen, bool typbyval, char typalign) { - int st_pos, + int src_offset, span[MAXDIM], prod[MAXDIM], dist[MAXDIM], @@ -2892,13 +3613,13 @@ array_slice_size(int ndim, int *dim, int *lb, char *arraydataptr, mda_get_range(ndim, span, st, endp); - /* Pretty easy for fixed element length ... */ - if (typlen > 0) + /* Pretty easy for fixed element length without nulls ... */ + if (typlen > 0 && !arraynullsptr) return ArrayGetNItems(ndim, span) * att_align(typlen, typalign); /* Else gotta do it the hard way */ - st_pos = ArrayGetOffset(ndim, dim, lb, st); - ptr = array_seek(arraydataptr, st_pos, + src_offset = ArrayGetOffset(ndim, dim, lb, st); + ptr = array_seek(arraydataptr, 0, arraynullsptr, src_offset, typlen, typbyval, typalign); mda_get_prod(ndim, dim, prod); mda_get_offset_values(ndim, dist, prod, span); @@ -2907,131 +3628,197 @@ array_slice_size(int ndim, int *dim, int *lb, char *arraydataptr, j = ndim - 1; do { - ptr = array_seek(ptr, dist[j], - typlen, typbyval, typalign); - inc = att_addlength(0, typlen, PointerGetDatum(ptr)); - inc = att_align(inc, typalign); - ptr += inc; - count += inc; + if (dist[j]) + { + ptr = array_seek(ptr, src_offset, arraynullsptr, dist[j], + typlen, typbyval, typalign); + src_offset += dist[j]; + } + if (!array_get_isnull(arraynullsptr, src_offset)) + { + inc = att_addlength(0, typlen, PointerGetDatum(ptr)); + inc = att_align(inc, typalign); + ptr += inc; + count += inc; + } + src_offset++; } while ((j = mda_next_tuple(ndim, indx, span)) != -1); return count; } /* - * Extract a slice of an array into consecutive elements at *destPtr. + * Extract a slice of an array into consecutive elements in the destination + * array. * - * We assume the caller has verified that the slice coordinates are valid - * and allocated enough storage at *destPtr. + * We assume the caller has verified that the slice coordinates are valid, + * allocated enough storage for the result, and initialized the header + * of the new array. */ static void -array_extract_slice(int ndim, +array_extract_slice(ArrayType *newarray, + int ndim, int *dim, int *lb, char *arraydataptr, + bits8 *arraynullsptr, int *st, int *endp, - char *destPtr, int typlen, bool typbyval, char typalign) { - int st_pos, + char *destdataptr = ARR_DATA_PTR(newarray); + bits8 *destnullsptr = ARR_NULLBITMAP(newarray); + char *srcdataptr; + int src_offset, + dest_offset, prod[MAXDIM], span[MAXDIM], dist[MAXDIM], indx[MAXDIM]; - char *srcPtr; int i, j, inc; - st_pos = ArrayGetOffset(ndim, dim, lb, st); - srcPtr = array_seek(arraydataptr, st_pos, + src_offset = ArrayGetOffset(ndim, dim, lb, st); + srcdataptr = array_seek(arraydataptr, 0, arraynullsptr, src_offset, typlen, typbyval, typalign); mda_get_prod(ndim, dim, prod); mda_get_range(ndim, span, st, endp); mda_get_offset_values(ndim, dist, prod, span); for (i = 0; i < ndim; i++) indx[i] = 0; + dest_offset = 0; j = ndim - 1; do { - srcPtr = array_seek(srcPtr, dist[j], - typlen, typbyval, typalign); - inc = array_copy(destPtr, 1, srcPtr, + if (dist[j]) + { + /* skip unwanted elements */ + srcdataptr = array_seek(srcdataptr, src_offset, arraynullsptr, + dist[j], + typlen, typbyval, typalign); + src_offset += dist[j]; + } + inc = array_copy(destdataptr, 1, + srcdataptr, src_offset, arraynullsptr, typlen, typbyval, typalign); - destPtr += inc; - srcPtr += inc; + if (destnullsptr) + array_bitmap_copy(destnullsptr, dest_offset, + arraynullsptr, src_offset, + 1); + destdataptr += inc; + srcdataptr += inc; + src_offset++; + dest_offset++; } while ((j = mda_next_tuple(ndim, indx, span)) != -1); } /* * Insert a slice into an array. * - * ndim/dim/lb are dimensions of the dest array, which has data area - * starting at origPtr. A new array with those same dimensions is to - * be constructed; its data area starts at destPtr. + * ndim/dim[]/lb[] are dimensions of the original array. A new array with + * those same dimensions is to be constructed. destArray must already + * have been allocated and its header initialized. * - * Elements within the slice volume are taken from consecutive locations - * at srcPtr; elements outside it are copied from origPtr. + * st[]/endp[] identify the slice to be replaced. Elements within the slice + * volume are taken from consecutive elements of the srcArray; elements + * outside it are copied from origArray. * - * We assume the caller has verified that the slice coordinates are valid - * and allocated enough storage at *destPtr. + * We assume the caller has verified that the slice coordinates are valid. */ static void -array_insert_slice(int ndim, +array_insert_slice(ArrayType *destArray, + ArrayType *origArray, + ArrayType *srcArray, + int ndim, int *dim, int *lb, - char *origPtr, - int origdatasize, - char *destPtr, int *st, int *endp, - char *srcPtr, int typlen, bool typbyval, char typalign) { - int st_pos, + char *destPtr = ARR_DATA_PTR(destArray); + char *origPtr = ARR_DATA_PTR(origArray); + char *srcPtr = ARR_DATA_PTR(srcArray); + bits8 *destBitmap = ARR_NULLBITMAP(destArray); + bits8 *origBitmap = ARR_NULLBITMAP(origArray); + bits8 *srcBitmap = ARR_NULLBITMAP(srcArray); + int orignitems = ArrayGetNItems(ARR_NDIM(origArray), + ARR_DIMS(origArray)); + int dest_offset, + orig_offset, + src_offset, prod[MAXDIM], span[MAXDIM], dist[MAXDIM], indx[MAXDIM]; - char *origEndpoint = origPtr + origdatasize; int i, j, inc; - st_pos = ArrayGetOffset(ndim, dim, lb, st); - inc = array_copy(destPtr, st_pos, origPtr, + dest_offset = ArrayGetOffset(ndim, dim, lb, st); + /* copy items before the slice start */ + inc = array_copy(destPtr, dest_offset, + origPtr, 0, origBitmap, typlen, typbyval, typalign); destPtr += inc; origPtr += inc; + if (destBitmap) + array_bitmap_copy(destBitmap, 0, origBitmap, 0, dest_offset); + orig_offset = dest_offset; mda_get_prod(ndim, dim, prod); mda_get_range(ndim, span, st, endp); mda_get_offset_values(ndim, dist, prod, span); for (i = 0; i < ndim; i++) indx[i] = 0; + src_offset = 0; j = ndim - 1; do { /* Copy/advance over elements between here and next part of slice */ - inc = array_copy(destPtr, dist[j], origPtr, - typlen, typbyval, typalign); - destPtr += inc; - origPtr += inc; + if (dist[j]) + { + inc = array_copy(destPtr, dist[j], + origPtr, orig_offset, origBitmap, + typlen, typbyval, typalign); + destPtr += inc; + origPtr += inc; + if (destBitmap) + array_bitmap_copy(destBitmap, dest_offset, + origBitmap, orig_offset, + dist[j]); + dest_offset += dist[j]; + orig_offset += dist[j]; + } /* Copy new element at this slice position */ - inc = array_copy(destPtr, 1, srcPtr, + inc = array_copy(destPtr, 1, + srcPtr, src_offset, srcBitmap, typlen, typbyval, typalign); + if (destBitmap) + array_bitmap_copy(destBitmap, dest_offset, + srcBitmap, src_offset, + 1); destPtr += inc; srcPtr += inc; + dest_offset++; + src_offset++; /* Advance over old element at this slice position */ - origPtr = array_seek(origPtr, 1, + origPtr = array_seek(origPtr, orig_offset, origBitmap, 1, typlen, typbyval, typalign); + orig_offset++; } while ((j = mda_next_tuple(ndim, indx, span)) != -1); /* don't miss any data at the end */ - memcpy(destPtr, origPtr, origEndpoint - origPtr); + array_copy(destPtr, orignitems - orig_offset, + origPtr, orig_offset, origBitmap, + typlen, typbyval, typalign); + if (destBitmap) + array_bitmap_copy(destBitmap, dest_offset, + origBitmap, orig_offset, + orignitems - orig_offset); } /* @@ -3280,6 +4067,8 @@ accumArrayResult(ArrayBuildState *astate, astate->mcontext = arr_context; astate->dvalues = (Datum *) palloc(ARRAY_ELEMS_CHUNKSIZE * sizeof(Datum)); + astate->dnulls = (bool *) + palloc(ARRAY_ELEMS_CHUNKSIZE * sizeof(bool)); astate->nelems = 0; astate->element_type = element_type; get_typlenbyvalalign(element_type, @@ -3291,21 +4080,25 @@ accumArrayResult(ArrayBuildState *astate, { oldcontext = MemoryContextSwitchTo(astate->mcontext); Assert(astate->element_type == element_type); - /* enlarge dvalues[] if needed */ + /* enlarge dvalues[]/dnulls[] if needed */ if ((astate->nelems % ARRAY_ELEMS_CHUNKSIZE) == 0) + { astate->dvalues = (Datum *) repalloc(astate->dvalues, (astate->nelems + ARRAY_ELEMS_CHUNKSIZE) * sizeof(Datum)); + astate->dnulls = (bool *) + repalloc(astate->dnulls, + (astate->nelems + ARRAY_ELEMS_CHUNKSIZE) * sizeof(bool)); + } } - if (disnull) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("null array elements not supported"))); - /* Use datumCopy to ensure pass-by-ref stuff is copied into mcontext */ - astate->dvalues[astate->nelems++] = - datumCopy(dvalue, astate->typbyval, astate->typlen); + if (!disnull && !astate->typbyval) + dvalue = datumCopy(dvalue, astate->typbyval, astate->typlen); + + astate->dvalues[astate->nelems] = dvalue; + astate->dnulls[astate->nelems] = disnull; + astate->nelems++; MemoryContextSwitchTo(oldcontext); @@ -3354,6 +4147,7 @@ makeMdArrayResult(ArrayBuildState *astate, oldcontext = MemoryContextSwitchTo(rcontext); result = construct_md_array(astate->dvalues, + astate->dnulls, ndims, dims, lbs, |