diff options
Diffstat (limited to 'src/backend/access/spgist')
-rw-r--r-- | src/backend/access/spgist/Makefile | 3 | ||||
-rw-r--r-- | src/backend/access/spgist/README | 6 | ||||
-rw-r--r-- | src/backend/access/spgist/spgkdtreeproc.c | 82 | ||||
-rw-r--r-- | src/backend/access/spgist/spgproc.c | 88 | ||||
-rw-r--r-- | src/backend/access/spgist/spgquadtreeproc.c | 115 | ||||
-rw-r--r-- | src/backend/access/spgist/spgscan.c | 875 | ||||
-rw-r--r-- | src/backend/access/spgist/spgutils.c | 92 | ||||
-rw-r--r-- | src/backend/access/spgist/spgvalidate.c | 27 |
8 files changed, 980 insertions, 308 deletions
diff --git a/src/backend/access/spgist/Makefile b/src/backend/access/spgist/Makefile index 14948a531ee..5be3df59926 100644 --- a/src/backend/access/spgist/Makefile +++ b/src/backend/access/spgist/Makefile @@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global OBJS = spgutils.o spginsert.o spgscan.o spgvacuum.o spgvalidate.o \ spgdoinsert.o spgxlog.o \ - spgtextproc.o spgquadtreeproc.o spgkdtreeproc.o + spgtextproc.o spgquadtreeproc.o spgkdtreeproc.o \ + spgproc.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/spgist/README b/src/backend/access/spgist/README index 09ab21af264..b55b0738320 100644 --- a/src/backend/access/spgist/README +++ b/src/backend/access/spgist/README @@ -41,7 +41,11 @@ contain exactly one inner tuple. When the search traversal algorithm reaches an inner tuple, it chooses a set of nodes to continue tree traverse in depth. If it reaches a leaf page it -scans a list of leaf tuples to find the ones that match the query. +scans a list of leaf tuples to find the ones that match the query. SP-GiST +also supports ordered (nearest-neighbor) searches - that is during scan pending +nodes are put into priority queue, so traversal is performed by the +closest-first model. + The insertion algorithm descends the tree similarly, except it must choose just one node to descend to from each inner tuple. Insertion might also have diff --git a/src/backend/access/spgist/spgkdtreeproc.c b/src/backend/access/spgist/spgkdtreeproc.c index 556f3a4e076..105fc72c7a2 100644 --- a/src/backend/access/spgist/spgkdtreeproc.c +++ b/src/backend/access/spgist/spgkdtreeproc.c @@ -16,9 +16,11 @@ #include "postgres.h" #include "access/spgist.h" +#include "access/spgist_private.h" #include "access/stratnum.h" #include "catalog/pg_type.h" #include "utils/builtins.h" +#include "utils/float.h" #include "utils/geo_decls.h" @@ -162,6 +164,7 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS) double coord; int which; int i; + BOX bboxes[2]; Assert(in->hasPrefix); coord = DatumGetFloat8(in->prefixDatum); @@ -248,12 +251,87 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS) } /* We must descend into the children identified by which */ - out->nodeNumbers = (int *) palloc(sizeof(int) * 2); out->nNodes = 0; + + /* Fast-path for no matching children */ + if (!which) + PG_RETURN_VOID(); + + out->nodeNumbers = (int *) palloc(sizeof(int) * 2); + + /* + * When ordering scan keys are specified, we've to calculate distance for + * them. In order to do that, we need calculate bounding boxes for both + * children nodes. Calculation of those bounding boxes on non-zero level + * require knowledge of bounding box of upper node. So, we save bounding + * boxes to traversalValues. + */ + if (in->norderbys > 0) + { + BOX infArea; + BOX *area; + + out->distances = (double **) palloc(sizeof(double *) * in->nNodes); + out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes); + + if (in->level == 0) + { + float8 inf = get_float8_infinity(); + + infArea.high.x = inf; + infArea.high.y = inf; + infArea.low.x = -inf; + infArea.low.y = -inf; + area = &infArea; + } + else + { + area = (BOX *) in->traversalValue; + Assert(area); + } + + bboxes[0].low = area->low; + bboxes[1].high = area->high; + + if (in->level % 2) + { + /* split box by x */ + bboxes[0].high.x = bboxes[1].low.x = coord; + bboxes[0].high.y = area->high.y; + bboxes[1].low.y = area->low.y; + } + else + { + /* split box by y */ + bboxes[0].high.y = bboxes[1].low.y = coord; + bboxes[0].high.x = area->high.x; + bboxes[1].low.x = area->low.x; + } + } + for (i = 1; i <= 2; i++) { if (which & (1 << i)) - out->nodeNumbers[out->nNodes++] = i - 1; + { + out->nodeNumbers[out->nNodes] = i - 1; + + if (in->norderbys > 0) + { + MemoryContext oldCtx = MemoryContextSwitchTo( + in->traversalMemoryContext); + BOX *box = box_copy(&bboxes[i - 1]); + + MemoryContextSwitchTo(oldCtx); + + out->traversalValues[out->nNodes] = box; + + out->distances[out->nNodes] = spg_key_orderbys_distances( + BoxPGetDatum(box), false, + in->orderbys, in->norderbys); + } + + out->nNodes++; + } } /* Set up level increments, too */ diff --git a/src/backend/access/spgist/spgproc.c b/src/backend/access/spgist/spgproc.c new file mode 100644 index 00000000000..0bf80015e12 --- /dev/null +++ b/src/backend/access/spgist/spgproc.c @@ -0,0 +1,88 @@ +/*------------------------------------------------------------------------- + * + * spgproc.c + * Common supporting procedures for SP-GiST opclasses. + * + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/spgist/spgproc.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <math.h> + +#include "access/spgist_private.h" +#include "utils/builtins.h" +#include "utils/float.h" +#include "utils/geo_decls.h" + +#define point_point_distance(p1,p2) \ + DatumGetFloat8(DirectFunctionCall2(point_distance, \ + PointPGetDatum(p1), PointPGetDatum(p2))) + +/* Point-box distance in the assumption that box is aligned by axis */ +static double +point_box_distance(Point *point, BOX *box) +{ + double dx, + dy; + + if (isnan(point->x) || isnan(box->low.x) || + isnan(point->y) || isnan(box->low.y)) + return get_float8_nan(); + + if (point->x < box->low.x) + dx = box->low.x - point->x; + else if (point->x > box->high.x) + dx = point->x - box->high.x; + else + dx = 0.0; + + if (point->y < box->low.y) + dy = box->low.y - point->y; + else if (point->y > box->high.y) + dy = point->y - box->high.y; + else + dy = 0.0; + + return HYPOT(dx, dy); +} + +/* + * Returns distances from given key to array of ordering scan keys. Leaf key + * is expected to be point, non-leaf key is expected to be box. Scan key + * arguments are expected to be points. + */ +double * +spg_key_orderbys_distances(Datum key, bool isLeaf, + ScanKey orderbys, int norderbys) +{ + int sk_num; + double *distances = (double *) palloc(norderbys * sizeof(double)), + *distance = distances; + + for (sk_num = 0; sk_num < norderbys; ++sk_num, ++orderbys, ++distance) + { + Point *point = DatumGetPointP(orderbys->sk_argument); + + *distance = isLeaf ? point_point_distance(point, DatumGetPointP(key)) + : point_box_distance(point, DatumGetBoxP(key)); + } + + return distances; +} + +BOX * +box_copy(BOX *orig) +{ + BOX *result = palloc(sizeof(BOX)); + + *result = *orig; + return result; +} diff --git a/src/backend/access/spgist/spgquadtreeproc.c b/src/backend/access/spgist/spgquadtreeproc.c index 8700ff35731..dee438a307d 100644 --- a/src/backend/access/spgist/spgquadtreeproc.c +++ b/src/backend/access/spgist/spgquadtreeproc.c @@ -17,8 +17,10 @@ #include "access/spgist.h" #include "access/stratnum.h" +#include "access/spgist_private.h" #include "catalog/pg_type.h" #include "utils/builtins.h" +#include "utils/float.h" #include "utils/geo_decls.h" @@ -77,6 +79,38 @@ getQuadrant(Point *centroid, Point *tst) return 0; } +/* Returns bounding box of a given quadrant inside given bounding box */ +static BOX * +getQuadrantArea(BOX *bbox, Point *centroid, int quadrant) +{ + BOX *result = (BOX *) palloc(sizeof(BOX)); + + switch (quadrant) + { + case 1: + result->high = bbox->high; + result->low = *centroid; + break; + case 2: + result->high.x = bbox->high.x; + result->high.y = centroid->y; + result->low.x = centroid->x; + result->low.y = bbox->low.y; + break; + case 3: + result->high = *centroid; + result->low = bbox->low; + break; + case 4: + result->high.x = centroid->x; + result->high.y = bbox->high.y; + result->low.x = bbox->low.x; + result->low.y = centroid->y; + break; + } + + return result; +} Datum spg_quad_choose(PG_FUNCTION_ARGS) @@ -196,19 +230,68 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS) spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0); spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1); Point *centroid; + BOX infbbox; + BOX *bbox = NULL; int which; int i; Assert(in->hasPrefix); centroid = DatumGetPointP(in->prefixDatum); + /* + * When ordering scan keys are specified, we've to calculate distance for + * them. In order to do that, we need calculate bounding boxes for all + * children nodes. Calculation of those bounding boxes on non-zero level + * require knowledge of bounding box of upper node. So, we save bounding + * boxes to traversalValues. + */ + if (in->norderbys > 0) + { + out->distances = (double **) palloc(sizeof(double *) * in->nNodes); + out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes); + + if (in->level == 0) + { + double inf = get_float8_infinity(); + + infbbox.high.x = inf; + infbbox.high.y = inf; + infbbox.low.x = -inf; + infbbox.low.y = -inf; + bbox = &infbbox; + } + else + { + bbox = in->traversalValue; + Assert(bbox); + } + } + if (in->allTheSame) { /* Report that all nodes should be visited */ out->nNodes = in->nNodes; out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes); for (i = 0; i < in->nNodes; i++) + { out->nodeNumbers[i] = i; + + if (in->norderbys > 0) + { + MemoryContext oldCtx = MemoryContextSwitchTo( + in->traversalMemoryContext); + + /* Use parent quadrant box as traversalValue */ + BOX *quadrant = box_copy(bbox); + + MemoryContextSwitchTo(oldCtx); + + out->traversalValues[i] = quadrant; + out->distances[i] = spg_key_orderbys_distances( + BoxPGetDatum(quadrant), false, + in->orderbys, in->norderbys); + } + } PG_RETURN_VOID(); } @@ -286,13 +369,37 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS) break; /* no need to consider remaining conditions */ } + out->levelAdds = palloc(sizeof(int) * 4); + for (i = 0; i < 4; ++i) + out->levelAdds[i] = 1; + /* We must descend into the quadrant(s) identified by which */ out->nodeNumbers = (int *) palloc(sizeof(int) * 4); out->nNodes = 0; + for (i = 1; i <= 4; i++) { if (which & (1 << i)) - out->nodeNumbers[out->nNodes++] = i - 1; + { + out->nodeNumbers[out->nNodes] = i - 1; + + if (in->norderbys > 0) + { + MemoryContext oldCtx = MemoryContextSwitchTo( + in->traversalMemoryContext); + BOX *quadrant = getQuadrantArea(bbox, centroid, i); + + MemoryContextSwitchTo(oldCtx); + + out->traversalValues[out->nNodes] = quadrant; + + out->distances[out->nNodes] = spg_key_orderbys_distances( + BoxPGetDatum(quadrant), false, + in->orderbys, in->norderbys); + } + + out->nNodes++; + } } PG_RETURN_VOID(); @@ -356,5 +463,11 @@ spg_quad_leaf_consistent(PG_FUNCTION_ARGS) break; } + if (res && in->norderbys > 0) + /* ok, it passes -> let's compute the distances */ + out->distances = spg_key_orderbys_distances( + BoxPGetDatum(in->leafDatum), true, + in->orderbys, in->norderbys); + PG_RETURN_BOOL(res); } diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c index 5260d5017d1..a63fde2c8af 100644 --- a/src/backend/access/spgist/spgscan.c +++ b/src/backend/access/spgist/spgscan.c @@ -15,64 +15,136 @@ #include "postgres.h" +#include "access/genam.h" #include "access/relscan.h" #include "access/spgist_private.h" #include "miscadmin.h" #include "storage/bufmgr.h" #include "utils/datum.h" +#include "utils/float.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/rel.h" - typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr, - Datum leafValue, bool isnull, bool recheck); + Datum leafValue, bool isNull, bool recheck, + bool recheckDistances, double *distances); -typedef struct ScanStackEntry +/* + * Pairing heap comparison function for the SpGistSearchItem queue. + * KNN-searches currently only support NULLS LAST. So, preserve this logic + * here. + */ +static int +pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a, + const pairingheap_node *b, void *arg) { - Datum reconstructedValue; /* value reconstructed from parent */ - void *traversalValue; /* opclass-specific traverse value */ - int level; /* level of items on this page */ - ItemPointerData ptr; /* block and offset to scan from */ -} ScanStackEntry; + const SpGistSearchItem *sa = (const SpGistSearchItem *) a; + const SpGistSearchItem *sb = (const SpGistSearchItem *) b; + SpGistScanOpaque so = (SpGistScanOpaque) arg; + int i; + + if (sa->isNull) + { + if (!sb->isNull) + return -1; + } + else if (sb->isNull) + { + return 1; + } + else + { + /* Order according to distance comparison */ + for (i = 0; i < so->numberOfOrderBys; i++) + { + if (isnan(sa->distances[i]) && isnan(sb->distances[i])) + continue; /* NaN == NaN */ + if (isnan(sa->distances[i])) + return -1; /* NaN > number */ + if (isnan(sb->distances[i])) + return 1; /* number < NaN */ + if (sa->distances[i] != sb->distances[i]) + return (sa->distances[i] < sb->distances[i]) ? 1 : -1; + } + } + /* Leaf items go before inner pages, to ensure a depth-first search */ + if (sa->isLeaf && !sb->isLeaf) + return 1; + if (!sa->isLeaf && sb->isLeaf) + return -1; + + return 0; +} -/* Free a ScanStackEntry */ static void -freeScanStackEntry(SpGistScanOpaque so, ScanStackEntry *stackEntry) +spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem * item) { if (!so->state.attLeafType.attbyval && - DatumGetPointer(stackEntry->reconstructedValue) != NULL) - pfree(DatumGetPointer(stackEntry->reconstructedValue)); - if (stackEntry->traversalValue) - pfree(stackEntry->traversalValue); + DatumGetPointer(item->value) != NULL) + pfree(DatumGetPointer(item->value)); - pfree(stackEntry); + if (item->traversalValue) + pfree(item->traversalValue); + + pfree(item); } -/* Free the entire stack */ +/* + * Add SpGistSearchItem to queue + * + * Called in queue context + */ static void -freeScanStack(SpGistScanOpaque so) +spgAddSearchItemToQueue(SpGistScanOpaque so, SpGistSearchItem * item) { - ListCell *lc; + pairingheap_add(so->scanQueue, &item->phNode); +} - foreach(lc, so->scanStack) - { - freeScanStackEntry(so, (ScanStackEntry *) lfirst(lc)); - } - list_free(so->scanStack); - so->scanStack = NIL; +static SpGistSearchItem * +spgAllocSearchItem(SpGistScanOpaque so, bool isnull, double *distances) +{ + /* allocate distance array only for non-NULL items */ + SpGistSearchItem *item = + palloc(SizeOfSpGistSearchItem(isnull ? 0 : so->numberOfOrderBys)); + + item->isNull = isnull; + + if (!isnull && so->numberOfOrderBys > 0) + memcpy(item->distances, distances, + so->numberOfOrderBys * sizeof(double)); + + return item; +} + +static void +spgAddStartItem(SpGistScanOpaque so, bool isnull) +{ + SpGistSearchItem *startEntry = + spgAllocSearchItem(so, isnull, so->zeroDistances); + + ItemPointerSet(&startEntry->heapPtr, + isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO, + FirstOffsetNumber); + startEntry->isLeaf = false; + startEntry->level = 0; + startEntry->value = (Datum) 0; + startEntry->traversalValue = NULL; + startEntry->recheck = false; + startEntry->recheckDistances = false; + + spgAddSearchItemToQueue(so, startEntry); } /* - * Initialize scanStack to search the root page, resetting + * Initialize queue to search the root page, resetting * any previously active scan */ static void resetSpGistScanOpaque(SpGistScanOpaque so) { - ScanStackEntry *startEntry; - - freeScanStack(so); + MemoryContext oldCtx; /* * clear traversal context before proceeding to the next scan; this must @@ -81,20 +153,29 @@ resetSpGistScanOpaque(SpGistScanOpaque so) */ MemoryContextReset(so->traversalCxt); + oldCtx = MemoryContextSwitchTo(so->traversalCxt); + + /* initialize queue only for distance-ordered scans */ + so->scanQueue = pairingheap_allocate(pairingheap_SpGistSearchItem_cmp, so); + if (so->searchNulls) - { - /* Stack a work item to scan the null index entries */ - startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry)); - ItemPointerSet(&startEntry->ptr, SPGIST_NULL_BLKNO, FirstOffsetNumber); - so->scanStack = lappend(so->scanStack, startEntry); - } + /* Add a work item to scan the null index entries */ + spgAddStartItem(so, true); if (so->searchNonNulls) + /* Add a work item to scan the non-null index entries */ + spgAddStartItem(so, false); + + MemoryContextSwitchTo(oldCtx); + + if (so->numberOfOrderBys > 0) { - /* Stack a work item to scan the non-null index entries */ - startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry)); - ItemPointerSet(&startEntry->ptr, SPGIST_ROOT_BLKNO, FirstOffsetNumber); - so->scanStack = lappend(so->scanStack, startEntry); + /* Must pfree distances to avoid memory leak */ + int i; + + for (i = 0; i < so->nPtrs; i++) + if (so->distances[i]) + pfree(so->distances[i]); } if (so->want_itup) @@ -129,6 +210,9 @@ spgPrepareScanKeys(IndexScanDesc scan) int nkeys; int i; + so->numberOfOrderBys = scan->numberOfOrderBys; + so->orderByData = scan->orderByData; + if (scan->numberOfKeys <= 0) { /* If no quals, whole-index scan is required */ @@ -189,8 +273,9 @@ spgbeginscan(Relation rel, int keysz, int orderbysz) { IndexScanDesc scan; SpGistScanOpaque so; + int i; - scan = RelationGetIndexScan(rel, keysz, 0); + scan = RelationGetIndexScan(rel, keysz, orderbysz); so = (SpGistScanOpaque) palloc0(sizeof(SpGistScanOpaqueData)); if (keysz > 0) @@ -198,6 +283,7 @@ spgbeginscan(Relation rel, int keysz, int orderbysz) else so->keyData = NULL; initSpGistState(&so->state, scan->indexRelation); + so->tempCxt = AllocSetContextCreate(CurrentMemoryContext, "SP-GiST search temporary context", ALLOCSET_DEFAULT_SIZES); @@ -208,6 +294,32 @@ spgbeginscan(Relation rel, int keysz, int orderbysz) /* Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan */ so->indexTupDesc = scan->xs_hitupdesc = RelationGetDescr(rel); + if (scan->numberOfOrderBys > 0) + { + so->zeroDistances = palloc(sizeof(double) * scan->numberOfOrderBys); + so->infDistances = palloc(sizeof(double) * scan->numberOfOrderBys); + + for (i = 0; i < scan->numberOfOrderBys; i++) + { + so->zeroDistances[i] = 0.0; + so->infDistances[i] = get_float8_infinity(); + } + + scan->xs_orderbyvals = palloc0(sizeof(Datum) * scan->numberOfOrderBys); + scan->xs_orderbynulls = palloc(sizeof(bool) * scan->numberOfOrderBys); + memset(scan->xs_orderbynulls, true, sizeof(bool) * scan->numberOfOrderBys); + } + + fmgr_info_copy(&so->innerConsistentFn, + index_getprocinfo(rel, 1, SPGIST_INNER_CONSISTENT_PROC), + CurrentMemoryContext); + + fmgr_info_copy(&so->leafConsistentFn, + index_getprocinfo(rel, 1, SPGIST_LEAF_CONSISTENT_PROC), + CurrentMemoryContext); + + so->indexCollation = rel->rd_indcollation[0]; + scan->opaque = so; return scan; @@ -221,15 +333,42 @@ spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, /* copy scankeys into local storage */ if (scankey && scan->numberOfKeys > 0) - { memmove(scan->keyData, scankey, scan->numberOfKeys * sizeof(ScanKeyData)); + + if (orderbys && scan->numberOfOrderBys > 0) + { + int i; + + memmove(scan->orderByData, orderbys, + scan->numberOfOrderBys * sizeof(ScanKeyData)); + + so->orderByTypes = (Oid *) palloc(sizeof(Oid) * scan->numberOfOrderBys); + + for (i = 0; i < scan->numberOfOrderBys; i++) + { + ScanKey skey = &scan->orderByData[i]; + + /* + * Look up the datatype returned by the original ordering + * operator. SP-GiST always uses a float8 for the distance + * function, but the ordering operator could be anything else. + * + * XXX: The distance function is only allowed to be lossy if the + * ordering operator's result type is float4 or float8. Otherwise + * we don't know how to return the distance to the executor. But + * we cannot check that here, as we won't know if the distance + * function is lossy until it returns *recheck = true for the + * first time. + */ + so->orderByTypes[i] = get_func_rettype(skey->sk_func.fn_oid); + } } /* preprocess scankeys, set up the representation in *so */ spgPrepareScanKeys(scan); - /* set up starting stack entries */ + /* set up starting queue entries */ resetSpGistScanOpaque(so); } @@ -240,65 +379,332 @@ spgendscan(IndexScanDesc scan) MemoryContextDelete(so->tempCxt); MemoryContextDelete(so->traversalCxt); + + if (scan->numberOfOrderBys > 0) + { + pfree(so->zeroDistances); + pfree(so->infDistances); + } +} + +/* + * Leaf SpGistSearchItem constructor, called in queue context + */ +static SpGistSearchItem * +spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr, + Datum leafValue, bool recheck, bool recheckDistances, + bool isnull, double *distances) +{ + SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances); + + item->level = level; + item->heapPtr = *heapPtr; + /* copy value to queue cxt out of tmp cxt */ + item->value = isnull ? (Datum) 0 : + datumCopy(leafValue, so->state.attLeafType.attbyval, + so->state.attLeafType.attlen); + item->traversalValue = NULL; + item->isLeaf = true; + item->recheck = recheck; + item->recheckDistances = recheckDistances; + + return item; } /* * Test whether a leaf tuple satisfies all the scan keys * - * *leafValue is set to the reconstructed datum, if provided - * *recheck is set true if any of the operators are lossy + * *reportedSome is set to true if: + * the scan is not ordered AND the item satisfies the scankeys */ static bool -spgLeafTest(Relation index, SpGistScanOpaque so, +spgLeafTest(SpGistScanOpaque so, SpGistSearchItem * item, SpGistLeafTuple leafTuple, bool isnull, - int level, Datum reconstructedValue, - void *traversalValue, - Datum *leafValue, bool *recheck) + bool *reportedSome, storeRes_func storeRes) { + Datum leafValue; + double *distances; bool result; - Datum leafDatum; - spgLeafConsistentIn in; - spgLeafConsistentOut out; - FmgrInfo *procinfo; - MemoryContext oldCtx; + bool recheck; + bool recheckDistances; if (isnull) { /* Should not have arrived on a nulls page unless nulls are wanted */ Assert(so->searchNulls); - *leafValue = (Datum) 0; - *recheck = false; - return true; + leafValue = (Datum) 0; + distances = NULL; + recheck = false; + recheckDistances = false; + result = true; + } + else + { + spgLeafConsistentIn in; + spgLeafConsistentOut out; + + /* use temp context for calling leaf_consistent */ + MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt); + + in.scankeys = so->keyData; + in.nkeys = so->numberOfKeys; + in.orderbys = so->orderByData; + in.norderbys = so->numberOfOrderBys; + in.reconstructedValue = item->value; + in.traversalValue = item->traversalValue; + in.level = item->level; + in.returnData = so->want_itup; + in.leafDatum = SGLTDATUM(leafTuple, &so->state); + + out.leafValue = (Datum) 0; + out.recheck = false; + out.distances = NULL; + out.recheckDistances = false; + + result = DatumGetBool(FunctionCall2Coll(&so->leafConsistentFn, + so->indexCollation, + PointerGetDatum(&in), + PointerGetDatum(&out))); + recheck = out.recheck; + recheckDistances = out.recheckDistances; + leafValue = out.leafValue; + distances = out.distances; + + MemoryContextSwitchTo(oldCxt); } - leafDatum = SGLTDATUM(leafTuple, &so->state); + if (result) + { + /* item passes the scankeys */ + if (so->numberOfOrderBys > 0) + { + /* the scan is ordered -> add the item to the queue */ + MemoryContext oldCxt = MemoryContextSwitchTo(so->traversalCxt); + SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level, + &leafTuple->heapPtr, + leafValue, + recheck, + recheckDistances, + isnull, + distances); + + spgAddSearchItemToQueue(so, heapItem); + + MemoryContextSwitchTo(oldCxt); + } + else + { + /* non-ordered scan, so report the item right away */ + Assert(!recheckDistances); + storeRes(so, &leafTuple->heapPtr, leafValue, isnull, + recheck, false, NULL); + *reportedSome = true; + } + } - /* use temp context for calling leaf_consistent */ - oldCtx = MemoryContextSwitchTo(so->tempCxt); + return result; +} - in.scankeys = so->keyData; - in.nkeys = so->numberOfKeys; - in.reconstructedValue = reconstructedValue; - in.traversalValue = traversalValue; - in.level = level; - in.returnData = so->want_itup; - in.leafDatum = leafDatum; +/* A bundle initializer for inner_consistent methods */ +static void +spgInitInnerConsistentIn(spgInnerConsistentIn *in, + SpGistScanOpaque so, + SpGistSearchItem * item, + SpGistInnerTuple innerTuple) +{ + in->scankeys = so->keyData; + in->orderbys = so->orderByData; + in->nkeys = so->numberOfKeys; + in->norderbys = so->numberOfOrderBys; + in->reconstructedValue = item->value; + in->traversalMemoryContext = so->traversalCxt; + in->traversalValue = item->traversalValue; + in->level = item->level; + in->returnData = so->want_itup; + in->allTheSame = innerTuple->allTheSame; + in->hasPrefix = (innerTuple->prefixSize > 0); + in->prefixDatum = SGITDATUM(innerTuple, &so->state); + in->nNodes = innerTuple->nNodes; + in->nodeLabels = spgExtractNodeLabels(&so->state, innerTuple); +} - out.leafValue = (Datum) 0; - out.recheck = false; +static SpGistSearchItem * +spgMakeInnerItem(SpGistScanOpaque so, + SpGistSearchItem * parentItem, + SpGistNodeTuple tuple, + spgInnerConsistentOut *out, int i, bool isnull, + double *distances) +{ + SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances); - procinfo = index_getprocinfo(index, 1, SPGIST_LEAF_CONSISTENT_PROC); - result = DatumGetBool(FunctionCall2Coll(procinfo, - index->rd_indcollation[0], - PointerGetDatum(&in), - PointerGetDatum(&out))); + item->heapPtr = tuple->t_tid; + item->level = out->levelAdds ? parentItem->level + out->levelAdds[i] + : parentItem->level; - *leafValue = out.leafValue; - *recheck = out.recheck; + /* Must copy value out of temp context */ + item->value = out->reconstructedValues + ? datumCopy(out->reconstructedValues[i], + so->state.attLeafType.attbyval, + so->state.attLeafType.attlen) + : (Datum) 0; - MemoryContextSwitchTo(oldCtx); + /* + * Elements of out.traversalValues should be allocated in + * in.traversalMemoryContext, which is actually a long lived context of + * index scan. + */ + item->traversalValue = + out->traversalValues ? out->traversalValues[i] : NULL; - return result; + item->isLeaf = false; + item->recheck = false; + item->recheckDistances = false; + + return item; +} + +static void +spgInnerTest(SpGistScanOpaque so, SpGistSearchItem * item, + SpGistInnerTuple innerTuple, bool isnull) +{ + MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt); + spgInnerConsistentOut out; + int nNodes = innerTuple->nNodes; + int i; + + memset(&out, 0, sizeof(out)); + + if (!isnull) + { + spgInnerConsistentIn in; + + spgInitInnerConsistentIn(&in, so, item, innerTuple); + + /* use user-defined inner consistent method */ + FunctionCall2Coll(&so->innerConsistentFn, + so->indexCollation, + PointerGetDatum(&in), + PointerGetDatum(&out)); + } + else + { + /* force all children to be visited */ + out.nNodes = nNodes; + out.nodeNumbers = (int *) palloc(sizeof(int) * nNodes); + for (i = 0; i < nNodes; i++) + out.nodeNumbers[i] = i; + } + + /* If allTheSame, they should all or none of them match */ + if (innerTuple->allTheSame && out.nNodes != 0 && out.nNodes != nNodes) + elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple"); + + if (out.nNodes) + { + /* collect node pointers */ + SpGistNodeTuple node; + SpGistNodeTuple *nodes = (SpGistNodeTuple *) palloc( + sizeof(SpGistNodeTuple) * nNodes); + + SGITITERATE(innerTuple, i, node) + { + nodes[i] = node; + } + + MemoryContextSwitchTo(so->traversalCxt); + + for (i = 0; i < out.nNodes; i++) + { + int nodeN = out.nodeNumbers[i]; + SpGistSearchItem *innerItem; + double *distances; + + Assert(nodeN >= 0 && nodeN < nNodes); + + node = nodes[nodeN]; + + if (!ItemPointerIsValid(&node->t_tid)) + continue; + + /* + * Use infinity distances if innerConsistent() failed to return + * them or if is a NULL item (their distances are really unused). + */ + distances = out.distances ? out.distances[i] : so->infDistances; + + innerItem = spgMakeInnerItem(so, item, node, &out, i, isnull, + distances); + + spgAddSearchItemToQueue(so, innerItem); + } + } + + MemoryContextSwitchTo(oldCxt); +} + +/* Returns a next item in an (ordered) scan or null if the index is exhausted */ +static SpGistSearchItem * +spgGetNextQueueItem(SpGistScanOpaque so) +{ + if (pairingheap_is_empty(so->scanQueue)) + return NULL; /* Done when both heaps are empty */ + + /* Return item; caller is responsible to pfree it */ + return (SpGistSearchItem *) pairingheap_remove_first(so->scanQueue); +} + +enum SpGistSpecialOffsetNumbers +{ + SpGistBreakOffsetNumber = InvalidOffsetNumber, + SpGistRedirectOffsetNumber = MaxOffsetNumber + 1, + SpGistErrorOffsetNumber = MaxOffsetNumber + 2 +}; + +static OffsetNumber +spgTestLeafTuple(SpGistScanOpaque so, + SpGistSearchItem * item, + Page page, OffsetNumber offset, + bool isnull, bool isroot, + bool *reportedSome, + storeRes_func storeRes) +{ + SpGistLeafTuple leafTuple = (SpGistLeafTuple) + PageGetItem(page, PageGetItemId(page, offset)); + + if (leafTuple->tupstate != SPGIST_LIVE) + { + if (!isroot) /* all tuples on root should be live */ + { + if (leafTuple->tupstate == SPGIST_REDIRECT) + { + /* redirection tuple should be first in chain */ + Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr)); + /* transfer attention to redirect point */ + item->heapPtr = ((SpGistDeadTuple) leafTuple)->pointer; + Assert(ItemPointerGetBlockNumber(&item->heapPtr) != SPGIST_METAPAGE_BLKNO); + return SpGistRedirectOffsetNumber; + } + + if (leafTuple->tupstate == SPGIST_DEAD) + { + /* dead tuple should be first in chain */ + Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr)); + /* No live entries on this page */ + Assert(leafTuple->nextOffset == InvalidOffsetNumber); + return SpGistBreakOffsetNumber; + } + } + + /* We should not arrive at a placeholder */ + elog(ERROR, "unexpected SPGiST tuple state: %d", leafTuple->tupstate); + return SpGistErrorOffsetNumber; + } + + Assert(ItemPointerIsValid(&leafTuple->heapPtr)); + + spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes); + + return leafTuple->nextOffset; } /* @@ -317,247 +723,101 @@ spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex, while (scanWholeIndex || !reportedSome) { - ScanStackEntry *stackEntry; - BlockNumber blkno; - OffsetNumber offset; - Page page; - bool isnull; - - /* Pull next to-do item from the list */ - if (so->scanStack == NIL) - break; /* there are no more pages to scan */ + SpGistSearchItem *item = spgGetNextQueueItem(so); - stackEntry = (ScanStackEntry *) linitial(so->scanStack); - so->scanStack = list_delete_first(so->scanStack); + if (item == NULL) + break; /* No more items in queue -> done */ redirect: /* Check for interrupts, just in case of infinite loop */ CHECK_FOR_INTERRUPTS(); - blkno = ItemPointerGetBlockNumber(&stackEntry->ptr); - offset = ItemPointerGetOffsetNumber(&stackEntry->ptr); - - if (buffer == InvalidBuffer) + if (item->isLeaf) { - buffer = ReadBuffer(index, blkno); - LockBuffer(buffer, BUFFER_LOCK_SHARE); + /* We store heap items in the queue only in case of ordered search */ + Assert(so->numberOfOrderBys > 0); + storeRes(so, &item->heapPtr, item->value, item->isNull, + item->recheck, item->recheckDistances, item->distances); + reportedSome = true; } - else if (blkno != BufferGetBlockNumber(buffer)) + else { - UnlockReleaseBuffer(buffer); - buffer = ReadBuffer(index, blkno); - LockBuffer(buffer, BUFFER_LOCK_SHARE); - } - /* else new pointer points to the same page, no work needed */ + BlockNumber blkno = ItemPointerGetBlockNumber(&item->heapPtr); + OffsetNumber offset = ItemPointerGetOffsetNumber(&item->heapPtr); + Page page; + bool isnull; - page = BufferGetPage(buffer); - TestForOldSnapshot(snapshot, index, page); + if (buffer == InvalidBuffer) + { + buffer = ReadBuffer(index, blkno); + LockBuffer(buffer, BUFFER_LOCK_SHARE); + } + else if (blkno != BufferGetBlockNumber(buffer)) + { + UnlockReleaseBuffer(buffer); + buffer = ReadBuffer(index, blkno); + LockBuffer(buffer, BUFFER_LOCK_SHARE); + } - isnull = SpGistPageStoresNulls(page) ? true : false; + /* else new pointer points to the same page, no work needed */ - if (SpGistPageIsLeaf(page)) - { - SpGistLeafTuple leafTuple; - OffsetNumber max = PageGetMaxOffsetNumber(page); - Datum leafValue = (Datum) 0; - bool recheck = false; + page = BufferGetPage(buffer); + TestForOldSnapshot(snapshot, index, page); + + isnull = SpGistPageStoresNulls(page) ? true : false; - if (SpGistBlockIsRoot(blkno)) + if (SpGistPageIsLeaf(page)) { - /* When root is a leaf, examine all its tuples */ - for (offset = FirstOffsetNumber; offset <= max; offset++) - { - leafTuple = (SpGistLeafTuple) - PageGetItem(page, PageGetItemId(page, offset)); - if (leafTuple->tupstate != SPGIST_LIVE) - { - /* all tuples on root should be live */ - elog(ERROR, "unexpected SPGiST tuple state: %d", - leafTuple->tupstate); - } + /* Page is a leaf - that is, all it's tuples are heap items */ + OffsetNumber max = PageGetMaxOffsetNumber(page); - Assert(ItemPointerIsValid(&leafTuple->heapPtr)); - if (spgLeafTest(index, so, - leafTuple, isnull, - stackEntry->level, - stackEntry->reconstructedValue, - stackEntry->traversalValue, - &leafValue, - &recheck)) - { - storeRes(so, &leafTuple->heapPtr, - leafValue, isnull, recheck); - reportedSome = true; - } + if (SpGistBlockIsRoot(blkno)) + { + /* When root is a leaf, examine all its tuples */ + for (offset = FirstOffsetNumber; offset <= max; offset++) + (void) spgTestLeafTuple(so, item, page, offset, + isnull, true, + &reportedSome, storeRes); } - } - else - { - /* Normal case: just examine the chain we arrived at */ - while (offset != InvalidOffsetNumber) + else { - Assert(offset >= FirstOffsetNumber && offset <= max); - leafTuple = (SpGistLeafTuple) - PageGetItem(page, PageGetItemId(page, offset)); - if (leafTuple->tupstate != SPGIST_LIVE) + /* Normal case: just examine the chain we arrived at */ + while (offset != InvalidOffsetNumber) { - if (leafTuple->tupstate == SPGIST_REDIRECT) - { - /* redirection tuple should be first in chain */ - Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr)); - /* transfer attention to redirect point */ - stackEntry->ptr = ((SpGistDeadTuple) leafTuple)->pointer; - Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO); + Assert(offset >= FirstOffsetNumber && offset <= max); + offset = spgTestLeafTuple(so, item, page, offset, + isnull, false, + &reportedSome, storeRes); + if (offset == SpGistRedirectOffsetNumber) goto redirect; - } - if (leafTuple->tupstate == SPGIST_DEAD) - { - /* dead tuple should be first in chain */ - Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr)); - /* No live entries on this page */ - Assert(leafTuple->nextOffset == InvalidOffsetNumber); - break; - } - /* We should not arrive at a placeholder */ - elog(ERROR, "unexpected SPGiST tuple state: %d", - leafTuple->tupstate); - } - - Assert(ItemPointerIsValid(&leafTuple->heapPtr)); - if (spgLeafTest(index, so, - leafTuple, isnull, - stackEntry->level, - stackEntry->reconstructedValue, - stackEntry->traversalValue, - &leafValue, - &recheck)) - { - storeRes(so, &leafTuple->heapPtr, - leafValue, isnull, recheck); - reportedSome = true; } - - offset = leafTuple->nextOffset; - } - } - } - else /* page is inner */ - { - SpGistInnerTuple innerTuple; - spgInnerConsistentIn in; - spgInnerConsistentOut out; - FmgrInfo *procinfo; - SpGistNodeTuple *nodes; - SpGistNodeTuple node; - int i; - MemoryContext oldCtx; - - innerTuple = (SpGistInnerTuple) PageGetItem(page, - PageGetItemId(page, offset)); - - if (innerTuple->tupstate != SPGIST_LIVE) - { - if (innerTuple->tupstate == SPGIST_REDIRECT) - { - /* transfer attention to redirect point */ - stackEntry->ptr = ((SpGistDeadTuple) innerTuple)->pointer; - Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO); - goto redirect; } - elog(ERROR, "unexpected SPGiST tuple state: %d", - innerTuple->tupstate); - } - - /* use temp context for calling inner_consistent */ - oldCtx = MemoryContextSwitchTo(so->tempCxt); - - in.scankeys = so->keyData; - in.nkeys = so->numberOfKeys; - in.reconstructedValue = stackEntry->reconstructedValue; - in.traversalMemoryContext = so->traversalCxt; - in.traversalValue = stackEntry->traversalValue; - in.level = stackEntry->level; - in.returnData = so->want_itup; - in.allTheSame = innerTuple->allTheSame; - in.hasPrefix = (innerTuple->prefixSize > 0); - in.prefixDatum = SGITDATUM(innerTuple, &so->state); - in.nNodes = innerTuple->nNodes; - in.nodeLabels = spgExtractNodeLabels(&so->state, innerTuple); - - /* collect node pointers */ - nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * in.nNodes); - SGITITERATE(innerTuple, i, node) - { - nodes[i] = node; - } - - memset(&out, 0, sizeof(out)); - - if (!isnull) - { - /* use user-defined inner consistent method */ - procinfo = index_getprocinfo(index, 1, SPGIST_INNER_CONSISTENT_PROC); - FunctionCall2Coll(procinfo, - index->rd_indcollation[0], - PointerGetDatum(&in), - PointerGetDatum(&out)); - } - else - { - /* force all children to be visited */ - out.nNodes = in.nNodes; - out.nodeNumbers = (int *) palloc(sizeof(int) * in.nNodes); - for (i = 0; i < in.nNodes; i++) - out.nodeNumbers[i] = i; } - - MemoryContextSwitchTo(oldCtx); - - /* If allTheSame, they should all or none of 'em match */ - if (innerTuple->allTheSame) - if (out.nNodes != 0 && out.nNodes != in.nNodes) - elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple"); - - for (i = 0; i < out.nNodes; i++) + else /* page is inner */ { - int nodeN = out.nodeNumbers[i]; + SpGistInnerTuple innerTuple = (SpGistInnerTuple) + PageGetItem(page, PageGetItemId(page, offset)); - Assert(nodeN >= 0 && nodeN < in.nNodes); - if (ItemPointerIsValid(&nodes[nodeN]->t_tid)) + if (innerTuple->tupstate != SPGIST_LIVE) { - ScanStackEntry *newEntry; - - /* Create new work item for this node */ - newEntry = palloc(sizeof(ScanStackEntry)); - newEntry->ptr = nodes[nodeN]->t_tid; - if (out.levelAdds) - newEntry->level = stackEntry->level + out.levelAdds[i]; - else - newEntry->level = stackEntry->level; - /* Must copy value out of temp context */ - if (out.reconstructedValues) - newEntry->reconstructedValue = - datumCopy(out.reconstructedValues[i], - so->state.attLeafType.attbyval, - so->state.attLeafType.attlen); - else - newEntry->reconstructedValue = (Datum) 0; - - /* - * Elements of out.traversalValues should be allocated in - * in.traversalMemoryContext, which is actually a long - * lived context of index scan. - */ - newEntry->traversalValue = (out.traversalValues) ? - out.traversalValues[i] : NULL; - - so->scanStack = lcons(newEntry, so->scanStack); + if (innerTuple->tupstate == SPGIST_REDIRECT) + { + /* transfer attention to redirect point */ + item->heapPtr = ((SpGistDeadTuple) innerTuple)->pointer; + Assert(ItemPointerGetBlockNumber(&item->heapPtr) != + SPGIST_METAPAGE_BLKNO); + goto redirect; + } + elog(ERROR, "unexpected SPGiST tuple state: %d", + innerTuple->tupstate); } + + spgInnerTest(so, item, innerTuple, isnull); } } - /* done with this scan stack entry */ - freeScanStackEntry(so, stackEntry); + /* done with this scan item */ + spgFreeSearchItem(so, item); /* clear temp context before proceeding to the next one */ MemoryContextReset(so->tempCxt); } @@ -566,11 +826,14 @@ redirect: UnlockReleaseBuffer(buffer); } + /* storeRes subroutine for getbitmap case */ static void storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr, - Datum leafValue, bool isnull, bool recheck) + Datum leafValue, bool isnull, bool recheck, bool recheckDistances, + double *distances) { + Assert(!recheckDistances && !distances); tbm_add_tuples(so->tbm, heapPtr, 1, recheck); so->ntids++; } @@ -594,11 +857,26 @@ spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm) /* storeRes subroutine for gettuple case */ static void storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr, - Datum leafValue, bool isnull, bool recheck) + Datum leafValue, bool isnull, bool recheck, bool recheckDistances, + double *distances) { Assert(so->nPtrs < MaxIndexTuplesPerPage); so->heapPtrs[so->nPtrs] = *heapPtr; so->recheck[so->nPtrs] = recheck; + so->recheckDistances[so->nPtrs] = recheckDistances; + + if (so->numberOfOrderBys > 0) + { + if (isnull) + so->distances[so->nPtrs] = NULL; + else + { + Size size = sizeof(double) * so->numberOfOrderBys; + + so->distances[so->nPtrs] = memcpy(palloc(size), distances, size); + } + } + if (so->want_itup) { /* @@ -627,14 +905,29 @@ spggettuple(IndexScanDesc scan, ScanDirection dir) { if (so->iPtr < so->nPtrs) { - /* continuing to return tuples from a leaf page */ + /* continuing to return reported tuples */ scan->xs_ctup.t_self = so->heapPtrs[so->iPtr]; scan->xs_recheck = so->recheck[so->iPtr]; scan->xs_hitup = so->reconTups[so->iPtr]; + + if (so->numberOfOrderBys > 0) + index_store_float8_orderby_distances(scan, so->orderByTypes, + so->distances[so->iPtr], + so->recheckDistances[so->iPtr]); so->iPtr++; return true; } + if (so->numberOfOrderBys > 0) + { + /* Must pfree distances to avoid memory leak */ + int i; + + for (i = 0; i < so->nPtrs; i++) + if (so->distances[i]) + pfree(so->distances[i]); + } + if (so->want_itup) { /* Must pfree reconstructed tuples to avoid memory leak */ diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c index 6d59b316ae3..9919e6f0d72 100644 --- a/src/backend/access/spgist/spgutils.c +++ b/src/backend/access/spgist/spgutils.c @@ -15,17 +15,26 @@ #include "postgres.h" +#include "access/amvalidate.h" +#include "access/htup_details.h" #include "access/reloptions.h" #include "access/spgist_private.h" #include "access/transam.h" #include "access/xact.h" +#include "catalog/pg_amop.h" +#include "optimizer/paths.h" #include "storage/bufmgr.h" #include "storage/indexfsm.h" #include "storage/lmgr.h" #include "utils/builtins.h" +#include "utils/catcache.h" #include "utils/index_selfuncs.h" #include "utils/lsyscache.h" +#include "utils/syscache.h" +extern Expr *spgcanorderbyop(IndexOptInfo *index, + PathKey *pathkey, int pathkeyno, + Expr *orderby_clause, int *indexcol_p); /* * SP-GiST handler function: return IndexAmRoutine with access method parameters @@ -39,7 +48,7 @@ spghandler(PG_FUNCTION_ARGS) amroutine->amstrategies = 0; amroutine->amsupport = SPGISTNProc; amroutine->amcanorder = false; - amroutine->amcanorderbyop = false; + amroutine->amcanorderbyop = true; amroutine->amcanbackward = false; amroutine->amcanunique = false; amroutine->amcanmulticol = false; @@ -61,7 +70,7 @@ spghandler(PG_FUNCTION_ARGS) amroutine->amcanreturn = spgcanreturn; amroutine->amcostestimate = spgcostestimate; amroutine->amoptions = spgoptions; - amroutine->amproperty = NULL; + amroutine->amproperty = spgproperty; amroutine->amvalidate = spgvalidate; amroutine->ambeginscan = spgbeginscan; amroutine->amrescan = spgrescan; @@ -949,3 +958,82 @@ SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, return offnum; } + +/* + * spgproperty() -- Check boolean properties of indexes. + * + * This is optional for most AMs, but is required for SP-GiST because the core + * property code doesn't support AMPROP_DISTANCE_ORDERABLE. + */ +bool +spgproperty(Oid index_oid, int attno, + IndexAMProperty prop, const char *propname, + bool *res, bool *isnull) +{ + Oid opclass, + opfamily, + opcintype; + CatCList *catlist; + int i; + + /* Only answer column-level inquiries */ + if (attno == 0) + return false; + + switch (prop) + { + case AMPROP_DISTANCE_ORDERABLE: + break; + default: + return false; + } + + /* + * Currently, SP-GiST distance-ordered scans require that there be a + * distance operator in the opclass with the default types. So we assume + * that if such a operator exists, then there's a reason for it. + */ + + /* First we need to know the column's opclass. */ + opclass = get_index_column_opclass(index_oid, attno); + if (!OidIsValid(opclass)) + { + *isnull = true; + return true; + } + + /* Now look up the opclass family and input datatype. */ + if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype)) + { + *isnull = true; + return true; + } + + /* And now we can check whether the operator is provided. */ + catlist = SearchSysCacheList1(AMOPSTRATEGY, + ObjectIdGetDatum(opfamily)); + + *res = false; + + for (i = 0; i < catlist->n_members; i++) + { + HeapTuple amoptup = &catlist->members[i]->tuple; + Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup); + + if (amopform->amoppurpose == AMOP_ORDER && + (amopform->amoplefttype == opcintype || + amopform->amoprighttype == opcintype) && + opfamily_can_sort_type(amopform->amopsortfamily, + get_op_rettype(amopform->amopopr))) + { + *res = true; + break; + } + } + + ReleaseSysCacheList(catlist); + + *isnull = false; + + return true; +} diff --git a/src/backend/access/spgist/spgvalidate.c b/src/backend/access/spgist/spgvalidate.c index c7acc7fc025..8ba6c26f0c6 100644 --- a/src/backend/access/spgist/spgvalidate.c +++ b/src/backend/access/spgist/spgvalidate.c @@ -187,6 +187,7 @@ spgvalidate(Oid opclassoid) { HeapTuple oprtup = &oprlist->members[i]->tuple; Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup); + Oid op_rettype; /* TODO: Check that only allowed strategy numbers exist */ if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63) @@ -200,20 +201,26 @@ spgvalidate(Oid opclassoid) result = false; } - /* spgist doesn't support ORDER BY operators */ - if (oprform->amoppurpose != AMOP_SEARCH || - OidIsValid(oprform->amopsortfamily)) + /* spgist supports ORDER BY operators */ + if (oprform->amoppurpose != AMOP_SEARCH) { - ereport(INFO, - (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s", - opfamilyname, "spgist", - format_operator(oprform->amopopr)))); - result = false; + /* ... and operator result must match the claimed btree opfamily */ + op_rettype = get_op_rettype(oprform->amopopr); + if (!opfamily_can_sort_type(oprform->amopsortfamily, op_rettype)) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s", + opfamilyname, "spgist", + format_operator(oprform->amopopr)))); + result = false; + } } + else + op_rettype = BOOLOID; /* Check operator signature --- same for all spgist strategies */ - if (!check_amop_signature(oprform->amopopr, BOOLOID, + if (!check_amop_signature(oprform->amopopr, op_rettype, oprform->amoplefttype, oprform->amoprighttype)) { |