diff options
Diffstat (limited to 'src/backend/access/spgist/spgscan.c')
-rw-r--r-- | src/backend/access/spgist/spgscan.c | 92 |
1 files changed, 65 insertions, 27 deletions
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c index 24a13d89df8..e14b9fa573a 100644 --- a/src/backend/access/spgist/spgscan.c +++ b/src/backend/access/spgist/spgscan.c @@ -27,7 +27,8 @@ #include "utils/rel.h" typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr, - Datum leafValue, bool isNull, bool recheck, + Datum leafValue, bool isNull, + SpGistLeafTuple leafTuple, bool recheck, bool recheckDistances, double *distances); /* @@ -88,6 +89,9 @@ spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item) DatumGetPointer(item->value) != NULL) pfree(DatumGetPointer(item->value)); + if (item->leafTuple) + pfree(item->leafTuple); + if (item->traversalValue) pfree(item->traversalValue); @@ -133,6 +137,7 @@ spgAddStartItem(SpGistScanOpaque so, bool isnull) startEntry->isLeaf = false; startEntry->level = 0; startEntry->value = (Datum) 0; + startEntry->leafTuple = NULL; startEntry->traversalValue = NULL; startEntry->recheck = false; startEntry->recheckDistances = false; @@ -299,7 +304,6 @@ spgbeginscan(Relation rel, int keysz, int orderbysz) { IndexScanDesc scan; SpGistScanOpaque so; - TupleDesc outTupDesc; int i; scan = RelationGetIndexScan(rel, keysz, orderbysz); @@ -319,20 +323,13 @@ spgbeginscan(Relation rel, int keysz, int orderbysz) ALLOCSET_DEFAULT_SIZES); /* - * Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan. + * Set up reconTupDesc and xs_hitupdesc in case it's an index-only scan, + * making sure that the key column is shown as being of type attType. * (It's rather annoying to do this work when it might be wasted, but for * most opclasses we can re-use the index reldesc instead of making one.) */ - if (so->state.attType.type == - TupleDescAttr(RelationGetDescr(rel), 0)->atttypid) - outTupDesc = RelationGetDescr(rel); - else - { - outTupDesc = CreateTemplateTupleDesc(1); - TupleDescInitEntry(outTupDesc, 1, NULL, - so->state.attType.type, -1, 0); - } - so->indexTupDesc = scan->xs_hitupdesc = outTupDesc; + so->reconTupDesc = scan->xs_hitupdesc = + getSpGistTupleDesc(rel, &so->state.attType); /* Allocate various arrays needed for order-by scans */ if (scan->numberOfOrderBys > 0) @@ -435,6 +432,10 @@ spgendscan(IndexScanDesc scan) if (so->keyData) pfree(so->keyData); + if (so->state.leafTupDesc && + so->state.leafTupDesc != RelationGetDescr(so->state.index)) + FreeTupleDesc(so->state.leafTupDesc); + if (so->state.deadTupleStorage) pfree(so->state.deadTupleStorage); @@ -455,14 +456,14 @@ spgendscan(IndexScanDesc scan) * Leaf SpGistSearchItem constructor, called in queue context */ static SpGistSearchItem * -spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr, +spgNewHeapItem(SpGistScanOpaque so, int level, SpGistLeafTuple leafTuple, Datum leafValue, bool recheck, bool recheckDistances, bool isnull, double *distances) { SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances); item->level = level; - item->heapPtr = *heapPtr; + item->heapPtr = leafTuple->heapPtr; /* * If we need the reconstructed value, copy it to queue cxt out of tmp @@ -471,11 +472,28 @@ spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr, * the wrong type. The correct leafValue type is attType not leafType. */ if (so->want_itup) + { item->value = isnull ? (Datum) 0 : datumCopy(leafValue, so->state.attType.attbyval, so->state.attType.attlen); + + /* + * If we're going to need to reconstruct INCLUDE attributes, store the + * whole leaf tuple so we can get the INCLUDE attributes out of it. + */ + if (so->state.leafTupDesc->natts > 1) + { + item->leafTuple = palloc(leafTuple->size); + memcpy(item->leafTuple, leafTuple, leafTuple->size); + } + else + item->leafTuple = NULL; + } else + { item->value = (Datum) 0; + item->leafTuple = NULL; + } item->traversalValue = NULL; item->isLeaf = true; item->recheck = recheck; @@ -555,7 +573,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item, /* the scan is ordered -> add the item to the queue */ MemoryContext oldCxt = MemoryContextSwitchTo(so->traversalCxt); SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level, - &leafTuple->heapPtr, + leafTuple, leafValue, recheck, recheckDistances, @@ -571,7 +589,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item, /* non-ordered scan, so report the item right away */ Assert(!recheckDistances); storeRes(so, &leafTuple->heapPtr, leafValue, isnull, - recheck, false, NULL); + leafTuple, recheck, false, NULL); *reportedSome = true; } } @@ -624,6 +642,8 @@ spgMakeInnerItem(SpGistScanOpaque so, so->state.attLeafType.attlen) : (Datum) 0; + item->leafTuple = NULL; + /* * Elements of out.traversalValues should be allocated in * in.traversalMemoryContext, which is actually a long lived context of @@ -765,7 +785,7 @@ spgTestLeafTuple(SpGistScanOpaque so, /* dead tuple should be first in chain */ Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr)); /* No live entries on this page */ - Assert(leafTuple->nextOffset == InvalidOffsetNumber); + Assert(SGLT_GET_NEXTOFFSET(leafTuple) == InvalidOffsetNumber); return SpGistBreakOffsetNumber; } } @@ -779,7 +799,7 @@ spgTestLeafTuple(SpGistScanOpaque so, spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes); - return leafTuple->nextOffset; + return SGLT_GET_NEXTOFFSET(leafTuple); } /* @@ -812,7 +832,8 @@ redirect: /* We store heap items in the queue only in case of ordered search */ Assert(so->numberOfNonNullOrderBys > 0); storeRes(so, &item->heapPtr, item->value, item->isNull, - item->recheck, item->recheckDistances, item->distances); + item->leafTuple, item->recheck, + item->recheckDistances, item->distances); reportedSome = true; } else @@ -905,8 +926,9 @@ redirect: /* storeRes subroutine for getbitmap case */ static void storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr, - Datum leafValue, bool isnull, bool recheck, bool recheckDistances, - double *distances) + Datum leafValue, bool isnull, + SpGistLeafTuple leafTuple, bool recheck, + bool recheckDistances, double *distances) { Assert(!recheckDistances && !distances); tbm_add_tuples(so->tbm, heapPtr, 1, recheck); @@ -932,8 +954,9 @@ spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm) /* storeRes subroutine for gettuple case */ static void storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr, - Datum leafValue, bool isnull, bool recheck, bool recheckDistances, - double *nonNullDistances) + Datum leafValue, bool isnull, + SpGistLeafTuple leafTuple, bool recheck, + bool recheckDistances, double *nonNullDistances) { Assert(so->nPtrs < MaxIndexTuplesPerPage); so->heapPtrs[so->nPtrs] = *heapPtr; @@ -978,9 +1001,20 @@ storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr, * Reconstruct index data. We have to copy the datum out of the temp * context anyway, so we may as well create the tuple here. */ - so->reconTups[so->nPtrs] = heap_form_tuple(so->indexTupDesc, - &leafValue, - &isnull); + Datum leafDatums[INDEX_MAX_KEYS]; + bool leafIsnulls[INDEX_MAX_KEYS]; + + /* We only need to deform the old tuple if it has INCLUDE attributes */ + if (so->state.leafTupDesc->natts > 1) + spgDeformLeafTuple(leafTuple, so->state.leafTupDesc, + leafDatums, leafIsnulls, isnull); + + leafDatums[spgKeyColumn] = leafValue; + leafIsnulls[spgKeyColumn] = isnull; + + so->reconTups[so->nPtrs] = heap_form_tuple(so->reconTupDesc, + leafDatums, + leafIsnulls); } so->nPtrs++; } @@ -1048,6 +1082,10 @@ spgcanreturn(Relation index, int attno) { SpGistCache *cache; + /* INCLUDE attributes can always be fetched for index-only scans */ + if (attno > 1) + return true; + /* We can do it if the opclass config function says so */ cache = spgGetCache(index); |