aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/gist/gist.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/gist/gist.c')
-rw-r--r--src/backend/access/gist/gist.c256
1 files changed, 148 insertions, 108 deletions
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index cb10cbc35bd..54ac45ee2fe 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.136 2006/05/19 16:15:17 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.137 2006/05/24 11:01:39 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@@ -181,32 +181,13 @@ gistbuildCallback(Relation index,
{
GISTBuildState *buildstate = (GISTBuildState *) state;
IndexTuple itup;
- GISTENTRY tmpcentry;
- int i;
MemoryContext oldCtx;
- /* GiST cannot index tuples with leading NULLs */
- if (isnull[0])
- return;
-
oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
- /* immediately compress keys to normalize */
- for (i = 0; i < buildstate->numindexattrs; i++)
- {
- if (isnull[i])
- values[i] = (Datum) 0;
- else
- {
- gistcentryinit(&buildstate->giststate, i, &tmpcentry, values[i],
- NULL, NULL, (OffsetNumber) 0,
- -1 /* size is currently bogus */ , TRUE, FALSE);
- values[i] = tmpcentry.key;
- }
- }
-
/* form an index tuple and point it at the heap tuple */
- itup = index_form_tuple(buildstate->giststate.tupdesc, values, isnull);
+ itup = gistFormTuple(&buildstate->giststate, index,
+ values, NULL /* size is currently bogus */, isnull);
itup->t_tid = htup->t_self;
/*
@@ -243,34 +224,16 @@ gistinsert(PG_FUNCTION_ARGS)
#endif
IndexTuple itup;
GISTSTATE giststate;
- GISTENTRY tmpentry;
- int i;
MemoryContext oldCtx;
MemoryContext insertCtx;
- /* GiST cannot index tuples with leading NULLs */
- if (isnull[0])
- PG_RETURN_BOOL(false);
-
insertCtx = createTempGistContext();
oldCtx = MemoryContextSwitchTo(insertCtx);
initGISTstate(&giststate, r);
- /* immediately compress keys to normalize */
- for (i = 0; i < r->rd_att->natts; i++)
- {
- if (isnull[i])
- values[i] = (Datum) 0;
- else
- {
- gistcentryinit(&giststate, i, &tmpentry, values[i],
- NULL, NULL, (OffsetNumber) 0,
- -1 /* size is currently bogus */ , TRUE, FALSE);
- values[i] = tmpentry.key;
- }
- }
- itup = index_form_tuple(giststate.tupdesc, values, isnull);
+ itup = gistFormTuple(&giststate, r,
+ values, NULL /* size is currently bogus */, isnull);
itup->t_tid = *ht_ctid;
gistdoinsert(r, itup, &giststate);
@@ -937,7 +900,147 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
}
/*
- * gistSplit -- split a page in the tree.
+ * simple split page
+ */
+static void
+gistSplitHalf(GIST_SPLITVEC *v, int len) {
+ int i;
+
+ v->spl_nright = v->spl_nleft = 0;
+ v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+ v->spl_right= (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+ for(i = 1; i <= len; i++)
+ if ( i<len/2 )
+ v->spl_right[ v->spl_nright++ ] = i;
+ else
+ v->spl_left[ v->spl_nleft++ ] = i;
+}
+
+/*
+ * if it was invalid tuple then we need special processing.
+ * We move all invalid tuples on right page.
+ *
+ * if there is no place on left page, gistSplit will be called one more
+ * time for left page.
+ *
+ * Normally, we never exec this code, but after crash replay it's possible
+ * to get 'invalid' tuples (probability is low enough)
+ */
+static void
+gistSplitByInvalid(GISTSTATE *giststate, GIST_SPLITVEC *v, IndexTuple *itup, int len) {
+ int i;
+ static OffsetNumber offInvTuples[ MaxOffsetNumber ];
+ int nOffInvTuples = 0;
+
+ for (i = 1; i <= len; i++)
+ if ( GistTupleIsInvalid(itup[i - 1]) )
+ offInvTuples[ nOffInvTuples++ ] = i;
+
+ if ( nOffInvTuples == len ) {
+ /* corner case, all tuples are invalid */
+ v->spl_rightvalid= v->spl_leftvalid = false;
+ gistSplitHalf( v, len );
+ } else {
+ GistSplitVec gsvp;
+
+ v->spl_right = offInvTuples;
+ v->spl_nright = nOffInvTuples;
+ v->spl_rightvalid = false;
+
+ v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+ v->spl_nleft = 0;
+ for(i = 1; i <= len; i++)
+ if ( !GistTupleIsInvalid(itup[i - 1]) )
+ v->spl_left[ v->spl_nleft++ ] = i;
+ v->spl_leftvalid = true;
+
+ gsvp.idgrp = NULL;
+ gsvp.attrsize = v->spl_lattrsize;
+ gsvp.attr = v->spl_lattr;
+ gsvp.len = v->spl_nleft;
+ gsvp.entries = v->spl_left;
+ gsvp.isnull = v->spl_lisnull;
+
+ gistunionsubkeyvec(giststate, itup, &gsvp, 0);
+ }
+}
+
+/*
+ * trys to split page by attno key, in a case of null
+ * values move its to separate page.
+ */
+static void
+gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate,
+ GIST_SPLITVEC *v, GistEntryVector *entryvec, int attno) {
+ int i;
+ static OffsetNumber offNullTuples[ MaxOffsetNumber ];
+ int nOffNullTuples = 0;
+
+
+ for (i = 1; i <= len; i++) {
+ Datum datum;
+ bool IsNull;
+
+ if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) {
+ gistSplitByInvalid(giststate, v, itup, len);
+ return;
+ }
+
+ datum = index_getattr(itup[i - 1], attno+1, giststate->tupdesc, &IsNull);
+ gistdentryinit(giststate, attno, &(entryvec->vector[i]),
+ datum, r, page, i,
+ ATTSIZE(datum, giststate->tupdesc, attno+1, IsNull),
+ FALSE, IsNull);
+ if ( IsNull )
+ offNullTuples[ nOffNullTuples++ ] = i;
+ }
+
+ v->spl_leftvalid = v->spl_rightvalid = true;
+
+ if ( nOffNullTuples == len ) {
+ /*
+ * Corner case: All keys in attno column are null, we should try to
+ * by keys in next column. It all keys in all columns
+ * are NULL just split page half by half
+ */
+ v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE;
+ if ( attno+1 == r->rd_att->natts )
+ gistSplitHalf( v, len );
+ else
+ gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1);
+ } else if ( nOffNullTuples > 0 ) {
+ int j=0;
+
+ /*
+ * We don't want to mix NULLs and not-NULLs keys
+ * on one page, so move nulls to right page
+ */
+ v->spl_right = offNullTuples;
+ v->spl_nright = nOffNullTuples;
+ v->spl_risnull[attno] = TRUE;
+
+ v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+ v->spl_nleft = 0;
+ for(i = 1; i <= len; i++)
+ if ( j<v->spl_nright && offNullTuples[j] == i )
+ j++;
+ else
+ v->spl_left[ v->spl_nleft++ ] = i;
+
+ v->spl_idgrp = NULL;
+ gistunionsubkey(giststate, itup, v, 0);
+ } else {
+ /*
+ * all keys are not-null
+ */
+ gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate);
+ }
+}
+
+/*
+ * gistSplit -- split a page in the tree and fill struct
+ * used for XLOG and real writes buffers. Function is recursive, ie
+ * it will split page until keys will fit in every page.
*/
SplitedPageLayout *
gistSplit(Relation r,
@@ -951,77 +1054,14 @@ gistSplit(Relation r,
GIST_SPLITVEC v;
GistEntryVector *entryvec;
int i;
- OffsetNumber offInvTuples[ MaxOffsetNumber ];
- int nOffInvTuples = 0;
SplitedPageLayout *res = NULL;
/* generate the item array */
entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
entryvec->n = len + 1;
- for (i = 1; i <= len; i++)
- {
- Datum datum;
- bool IsNull;
-
- if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
- /* remember position of invalid tuple */
- offInvTuples[ nOffInvTuples++ ] = i;
-
- if ( nOffInvTuples > 0 )
- /* we can safely do not decompress other keys, because
- we will do splecial processing, but
- it's needed to find another invalid tuples */
- continue;
-
- datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
- gistdentryinit(giststate, 0, &(entryvec->vector[i]),
- datum, r, page, i,
- ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
- FALSE, IsNull);
- }
-
- /*
- * if it was invalid tuple then we need special processing.
- * We move all invalid tuples on right page.
- *
- * if there is no place on left page, gistSplit will be called one more
- * time for left page.
- *
- * Normally, we never exec this code, but after crash replay it's possible
- * to get 'invalid' tuples (probability is low enough)
- */
- if (nOffInvTuples > 0)
- {
- GistSplitVec gsvp;
-
- v.spl_right = offInvTuples;
- v.spl_nright = nOffInvTuples;
- v.spl_rightvalid = false;
-
- v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
- v.spl_nleft = 0;
- for(i = 1; i <= len; i++)
- if ( !GistTupleIsInvalid(itup[i - 1]) )
- v.spl_left[ v.spl_nleft++ ] = i;
- v.spl_leftvalid = true;
-
- gsvp.idgrp = NULL;
- gsvp.attrsize = v.spl_lattrsize;
- gsvp.attr = v.spl_lattr;
- gsvp.len = v.spl_nleft;
- gsvp.entries = v.spl_left;
- gsvp.isnull = v.spl_lisnull;
-
- gistunionsubkeyvec(giststate, itup, &gsvp, true);
- }
- else
- {
- /* there is no invalid tuples, so usial processing */
- gistUserPicksplit(r, entryvec, &v, itup, len, giststate);
- v.spl_leftvalid = v.spl_rightvalid = true;
- }
-
+ gistSplitByKey(r, page, itup, len, giststate,
+ &v, entryvec, 0);
/* form left and right vector */
lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));