diff options
Diffstat (limited to 'src/backend/access/spgist/spgutils.c')
-rw-r--r-- | src/backend/access/spgist/spgutils.c | 284 |
1 files changed, 232 insertions, 52 deletions
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c index f2ba72b5aa3..72cbde7e0b6 100644 --- a/src/backend/access/spgist/spgutils.c +++ b/src/backend/access/spgist/spgutils.c @@ -19,6 +19,7 @@ #include "access/htup_details.h" #include "access/reloptions.h" #include "access/spgist_private.h" +#include "access/toast_compression.h" #include "access/transam.h" #include "access/xact.h" #include "catalog/pg_amop.h" @@ -58,7 +59,7 @@ spghandler(PG_FUNCTION_ARGS) amroutine->amclusterable = false; amroutine->ampredlocks = false; amroutine->amcanparallel = false; - amroutine->amcaninclude = false; + amroutine->amcaninclude = true; amroutine->amusemaintenanceworkmem = false; amroutine->amparallelvacuumoptions = VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP; @@ -154,8 +155,19 @@ GetIndexInputType(Relation index, AttrNumber indexcol) static void fillTypeDesc(SpGistTypeDesc *desc, Oid type) { + HeapTuple tp; + Form_pg_type typtup; + desc->type = type; - get_typlenbyval(type, &desc->attlen, &desc->attbyval); + tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for type %u", type); + typtup = (Form_pg_type) GETSTRUCT(tp); + desc->attlen = typtup->typlen; + desc->attbyval = typtup->typbyval; + desc->attstorage = typtup->typstorage; + desc->attalign = typtup->typalign; + ReleaseSysCache(tp); } /* @@ -178,22 +190,23 @@ spgGetCache(Relation index) cache = MemoryContextAllocZero(index->rd_indexcxt, sizeof(SpGistCache)); - /* SPGiST doesn't support multi-column indexes */ - Assert(index->rd_att->natts == 1); + /* SPGiST must have one key column and can also have INCLUDE columns */ + Assert(IndexRelationGetNumberOfKeyAttributes(index) == 1); + Assert(IndexRelationGetNumberOfAttributes(index) <= INDEX_MAX_KEYS); /* - * Get the actual (well, nominal) data type of the column being - * indexed. We pass this to the opclass config function so that - * polymorphic opclasses are possible. + * Get the actual (well, nominal) data type of the key column. We + * pass this to the opclass config function so that polymorphic + * opclasses are possible. */ - atttype = GetIndexInputType(index, 1); + atttype = GetIndexInputType(index, spgKeyColumn + 1); /* Call the config function to get config info for the opclass */ in.attType = atttype; procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC); FunctionCall2Coll(procinfo, - index->rd_indcollation[0], + index->rd_indcollation[spgKeyColumn], PointerGetDatum(&in), PointerGetDatum(&cache->config)); @@ -206,7 +219,7 @@ spgGetCache(Relation index) */ if (!OidIsValid(cache->config.leafType)) cache->config.leafType = - TupleDescAttr(RelationGetDescr(index), 0)->atttypid; + TupleDescAttr(RelationGetDescr(index), spgKeyColumn)->atttypid; /* Get the information we need about each relevant datatype */ fillTypeDesc(&cache->attType, atttype); @@ -254,12 +267,60 @@ spgGetCache(Relation index) return cache; } +/* + * Compute a tuple descriptor for leaf tuples or index-only-scan result tuples. + * + * We can use the relcache's tupdesc as-is in many cases, and it's always + * OK so far as any INCLUDE columns are concerned. However, the entry for + * the key column has to match leafType in the first case or attType in the + * second case. While the relcache's tupdesc *should* show leafType, this + * might not hold for legacy user-defined opclasses, since before v14 they + * were not allowed to declare their true storage type in CREATE OPCLASS. + * Also, attType can be different from what is in the relcache. + * + * This function gives back either a pointer to the relcache's tupdesc + * if that is suitable, or a palloc'd copy that's been adjusted to match + * the specified key column type. We can avoid doing any catalog lookups + * here by insisting that the caller pass an SpGistTypeDesc not just an OID. + */ +TupleDesc +getSpGistTupleDesc(Relation index, SpGistTypeDesc *keyType) +{ + TupleDesc outTupDesc; + Form_pg_attribute att; + + if (keyType->type == + TupleDescAttr(RelationGetDescr(index), spgKeyColumn)->atttypid) + outTupDesc = RelationGetDescr(index); + else + { + outTupDesc = CreateTupleDescCopy(RelationGetDescr(index)); + att = TupleDescAttr(outTupDesc, spgKeyColumn); + /* It's sufficient to update the type-dependent fields of the column */ + att->atttypid = keyType->type; + att->atttypmod = -1; + att->attlen = keyType->attlen; + att->attbyval = keyType->attbyval; + att->attalign = keyType->attalign; + att->attstorage = keyType->attstorage; + /* We shouldn't need to bother with making these valid: */ + att->attcollation = InvalidOid; + att->attcompression = InvalidCompressionMethod; + /* In case we changed typlen, we'd better reset following offsets */ + for (int i = spgFirstIncludeColumn; i < outTupDesc->natts; i++) + TupleDescAttr(outTupDesc, i)->attcacheoff = -1; + } + return outTupDesc; +} + /* Initialize SpGistState for working with the given index */ void initSpGistState(SpGistState *state, Relation index) { SpGistCache *cache; + state->index = index; + /* Get cached static information about index */ cache = spgGetCache(index); @@ -269,6 +330,9 @@ initSpGistState(SpGistState *state, Relation index) state->attPrefixType = cache->attPrefixType; state->attLabelType = cache->attLabelType; + /* Ensure we have a valid descriptor for leaf tuples */ + state->leafTupDesc = getSpGistTupleDesc(state->index, &state->attLeafType); + /* Make workspace for constructing dead tuples */ state->deadTupleStorage = palloc0(SGDTSIZE); @@ -697,24 +761,6 @@ SpGistGetInnerTypeSize(SpGistTypeDesc *att, Datum datum) } /* - * Get the space needed to store a non-null datum of the indicated type - * in a leaf tuple. This is just the usual storage space for the type, - * but rounded up to a MAXALIGN boundary. - */ -unsigned int -SpGistGetLeafTypeSize(SpGistTypeDesc *att, Datum datum) -{ - unsigned int size; - - if (att->attlen > 0) - size = att->attlen; - else - size = VARSIZE_ANY(datum); - - return MAXALIGN(size); -} - -/* * Copy the given non-null datum to *target, in the inner-tuple case */ static void @@ -734,42 +780,111 @@ memcpyInnerDatum(void *target, SpGistTypeDesc *att, Datum datum) } /* - * Copy the given non-null datum to *target, in the leaf-tuple case + * Compute space required for a leaf tuple holding the given data. + * + * This must match the size-calculation portion of spgFormLeafTuple. */ -static void -memcpyLeafDatum(void *target, SpGistTypeDesc *att, Datum datum) +Size +SpGistGetLeafTupleSize(TupleDesc tupleDescriptor, + Datum *datums, bool *isnulls) { - unsigned int size; + Size size; + Size data_size; + bool needs_null_mask = false; + int natts = tupleDescriptor->natts; - if (att->attbyval) - { - store_att_byval(target, datum, att->attlen); - } - else + /* + * Decide whether we need a nulls bitmask. + * + * If there is only a key attribute (natts == 1), never use a bitmask, for + * compatibility with the pre-v14 layout of leaf tuples. Otherwise, we + * need one if any attribute is null. + */ + if (natts > 1) { - size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum); - memcpy(target, DatumGetPointer(datum), size); + for (int i = 0; i < natts; i++) + { + if (isnulls[i]) + { + needs_null_mask = true; + break; + } + } } + + /* + * Calculate size of the data part; same as for heap tuples. + */ + data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls); + + /* + * Compute total size. + */ + size = SGLTHDRSZ(needs_null_mask); + size += data_size; + size = MAXALIGN(size); + + /* + * Ensure that we can replace the tuple with a dead tuple later. This test + * is unnecessary when there are any non-null attributes, but be safe. + */ + if (size < SGDTSIZE) + size = SGDTSIZE; + + return size; } /* - * Construct a leaf tuple containing the given heap TID and datum value + * Construct a leaf tuple containing the given heap TID and datum values */ SpGistLeafTuple spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, - Datum datum, bool isnull) + Datum *datums, bool *isnulls) { SpGistLeafTuple tup; - unsigned int size; + TupleDesc tupleDescriptor = state->leafTupDesc; + Size size; + Size hoff; + Size data_size; + bool needs_null_mask = false; + int natts = tupleDescriptor->natts; + char *tp; /* ptr to tuple data */ + uint16 tupmask = 0; /* unused heap_fill_tuple output */ - /* compute space needed (note result is already maxaligned) */ - size = SGLTHDRSZ; - if (!isnull) - size += SpGistGetLeafTypeSize(&state->attLeafType, datum); + /* + * Decide whether we need a nulls bitmask. + * + * If there is only a key attribute (natts == 1), never use a bitmask, for + * compatibility with the pre-v14 layout of leaf tuples. Otherwise, we + * need one if any attribute is null. + */ + if (natts > 1) + { + for (int i = 0; i < natts; i++) + { + if (isnulls[i]) + { + needs_null_mask = true; + break; + } + } + } /* - * Ensure that we can replace the tuple with a dead tuple later. This - * test is unnecessary when !isnull, but let's be safe. + * Calculate size of the data part; same as for heap tuples. + */ + data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls); + + /* + * Compute total size. + */ + hoff = SGLTHDRSZ(needs_null_mask); + size = hoff + data_size; + size = MAXALIGN(size); + + /* + * Ensure that we can replace the tuple with a dead tuple later. This test + * is unnecessary when there are any non-null attributes, but be safe. */ if (size < SGDTSIZE) size = SGDTSIZE; @@ -778,10 +893,29 @@ spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, tup = (SpGistLeafTuple) palloc0(size); tup->size = size; - tup->nextOffset = InvalidOffsetNumber; + SGLT_SET_NEXTOFFSET(tup, InvalidOffsetNumber); tup->heapPtr = *heapPtr; - if (!isnull) - memcpyLeafDatum(SGLTDATAPTR(tup), &state->attLeafType, datum); + + tp = (char *) tup + hoff; + + if (needs_null_mask) + { + bits8 *bp; /* ptr to null bitmap in tuple */ + + /* Set nullmask presence bit in SpGistLeafTuple header */ + SGLT_SET_HASNULLMASK(tup, true); + /* Fill the data area and null mask */ + bp = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData)); + heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size, + &tupmask, bp); + } + else if (natts > 1 || !isnulls[spgKeyColumn]) + { + /* Fill data area only */ + heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size, + &tupmask, (bits8 *) NULL); + } + /* otherwise we have no data, nor a bitmap, to fill */ return tup; } @@ -925,7 +1059,7 @@ spgFormDeadTuple(SpGistState *state, int tupstate, tuple->tupstate = tupstate; tuple->size = SGDTSIZE; - tuple->nextOffset = InvalidOffsetNumber; + SGLT_SET_NEXTOFFSET(tuple, InvalidOffsetNumber); if (tupstate == SPGIST_REDIRECT) { @@ -943,6 +1077,52 @@ spgFormDeadTuple(SpGistState *state, int tupstate, } /* + * Convert an SPGiST leaf tuple into Datum/isnull arrays. + * + * The caller must allocate sufficient storage for the output arrays. + * (INDEX_MAX_KEYS entries should be enough.) + */ +void +spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor, + Datum *datums, bool *isnulls, bool keyColumnIsNull) +{ + bool hasNullsMask = SGLT_GET_HASNULLMASK(tup); + char *tp; /* ptr to tuple data */ + bits8 *bp; /* ptr to null bitmap in tuple */ + + if (keyColumnIsNull && tupleDescriptor->natts == 1) + { + /* + * Trivial case: there is only the key attribute and we're in a nulls + * tree. The hasNullsMask bit in the tuple header should not be set + * (and thus we can't use index_deform_tuple_internal), but + * nonetheless the result is NULL. + * + * Note: currently this is dead code, because noplace calls this when + * there is only the key attribute. But we should cover the case. + */ + Assert(!hasNullsMask); + + datums[spgKeyColumn] = (Datum) 0; + isnulls[spgKeyColumn] = true; + return; + } + + tp = (char *) tup + SGLTHDRSZ(hasNullsMask); + bp = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData)); + + index_deform_tuple_internal(tupleDescriptor, + datums, isnulls, + tp, bp, hasNullsMask); + + /* + * Key column isnull value from the tuple should be consistent with + * keyColumnIsNull flag from the caller. + */ + Assert(keyColumnIsNull == isnulls[spgKeyColumn]); +} + +/* * Extract the label datums of the nodes within innerTuple * * Returns NULL if label datums are NULLs |