aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/spgist/spgdoinsert.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/spgist/spgdoinsert.c')
-rw-r--r--src/backend/access/spgist/spgdoinsert.c191
1 files changed, 131 insertions, 60 deletions
diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c
index 85704762a6f..5ddb6672c5c 100644
--- a/src/backend/access/spgist/spgdoinsert.c
+++ b/src/backend/access/spgist/spgdoinsert.c
@@ -200,7 +200,7 @@ saveNodeLink(Relation index, SPPageDesc *parent,
*/
static void
addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
- SPPageDesc *current, SPPageDesc *parent, bool isNew)
+ SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
{
XLogRecData rdata[4];
spgxlogAddLeaf xlrec;
@@ -208,6 +208,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
xlrec.node = index->rd_node;
xlrec.blknoLeaf = current->blkno;
xlrec.newPage = isNew;
+ xlrec.storesNulls = isNulls;
/* these will be filled below as needed */
xlrec.offnumLeaf = InvalidOffsetNumber;
@@ -224,7 +225,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
START_CRIT_SECTION();
if (current->offnum == InvalidOffsetNumber ||
- current->blkno == SPGIST_HEAD_BLKNO)
+ SpGistBlockIsRoot(current->blkno))
{
/* Tuple is not part of a chain */
leafTuple->nextOffset = InvalidOffsetNumber;
@@ -337,7 +338,7 @@ checkSplitConditions(Relation index, SpGistState *state,
n = 0,
totalSize = 0;
- if (current->blkno == SPGIST_HEAD_BLKNO)
+ if (SpGistBlockIsRoot(current->blkno))
{
/* return impossible values to force split */
*nToSplit = BLCKSZ;
@@ -386,7 +387,7 @@ checkSplitConditions(Relation index, SpGistState *state,
static void
moveLeafs(Relation index, SpGistState *state,
SPPageDesc *current, SPPageDesc *parent,
- SpGistLeafTuple newLeafTuple)
+ SpGistLeafTuple newLeafTuple, bool isNulls)
{
int i,
nDelete,
@@ -451,7 +452,8 @@ moveLeafs(Relation index, SpGistState *state,
}
/* Find a leaf page that will hold them */
- nbuf = SpGistGetBuffer(index, GBUF_LEAF, size, &xlrec.newPage);
+ nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
+ size, &xlrec.newPage);
npage = BufferGetPage(nbuf);
nblkno = BufferGetBlockNumber(nbuf);
Assert(nblkno != current->blkno);
@@ -464,6 +466,7 @@ moveLeafs(Relation index, SpGistState *state,
xlrec.blknoDst = nblkno;
xlrec.nMoves = nDelete;
xlrec.replaceDead = replaceDead;
+ xlrec.storesNulls = isNulls;
xlrec.blknoParent = parent->blkno;
xlrec.offnumParent = parent->offnum;
@@ -584,6 +587,8 @@ setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
* If so, randomly divide the tuples into several nodes (all with the same
* label) and return TRUE to select allTheSame mode for this inner tuple.
*
+ * (This code is also used to forcibly select allTheSame mode for nulls.)
+ *
* If we know that the leaf tuples wouldn't all fit on one page, then we
* exclude the last tuple (which is the incoming new tuple that forced a split)
* from the check to see if more than one node is used. The reason for this
@@ -674,7 +679,8 @@ checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
static bool
doPickSplit(Relation index, SpGistState *state,
SPPageDesc *current, SPPageDesc *parent,
- SpGistLeafTuple newLeafTuple, int level, bool isNew)
+ SpGistLeafTuple newLeafTuple,
+ int level, bool isNulls, bool isNew)
{
bool insertedNew = false;
spgPickSplitIn in;
@@ -733,11 +739,18 @@ doPickSplit(Relation index, SpGistState *state,
* also, count up the amount of space that will be freed from current.
* (Note that in the non-root case, we won't actually delete the old
* tuples, only replace them with redirects or placeholders.)
+ *
+ * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
+ * page. For a pass-by-value data type we will fetch a word that must
+ * exist even though it may contain garbage (because of the fact that leaf
+ * tuples must have size at least SGDTSIZE). For a pass-by-reference type
+ * we are just computing a pointer that isn't going to get dereferenced.
+ * So it's not worth guarding the calls with isNulls checks.
*/
nToInsert = 0;
nToDelete = 0;
spaceToDelete = 0;
- if (current->blkno == SPGIST_HEAD_BLKNO)
+ if (SpGistBlockIsRoot(current->blkno))
{
/*
* We are splitting the root (which up to now is also a leaf page).
@@ -813,26 +826,53 @@ doPickSplit(Relation index, SpGistState *state,
heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
in.nTuples++;
- /*
- * Perform split using user-defined method.
- */
memset(&out, 0, sizeof(out));
- procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
- FunctionCall2Coll(procinfo,
- index->rd_indcollation[0],
- PointerGetDatum(&in),
- PointerGetDatum(&out));
+ if (!isNulls)
+ {
+ /*
+ * Perform split using user-defined method.
+ */
+ procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
+ FunctionCall2Coll(procinfo,
+ index->rd_indcollation[0],
+ PointerGetDatum(&in),
+ PointerGetDatum(&out));
- /*
- * Form new leaf tuples and count up the total space needed.
- */
- totalLeafSizes = 0;
- for (i = 0; i < in.nTuples; i++)
+ /*
+ * Form new leaf tuples and count up the total space needed.
+ */
+ totalLeafSizes = 0;
+ for (i = 0; i < in.nTuples; i++)
+ {
+ newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
+ out.leafTupleDatums[i],
+ false);
+ totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+ }
+ }
+ else
{
- newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
- out.leafTupleDatums[i]);
- totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+ /*
+ * Perform dummy split that puts all tuples into one node.
+ * checkAllTheSame will override this and force allTheSame mode.
+ */
+ out.hasPrefix = false;
+ out.nNodes = 1;
+ out.nodeLabels = NULL;
+ out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
+
+ /*
+ * Form new leaf tuples and count up the total space needed.
+ */
+ totalLeafSizes = 0;
+ for (i = 0; i < in.nTuples; i++)
+ {
+ newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
+ (Datum) 0,
+ true);
+ totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
+ }
}
/*
@@ -872,11 +912,11 @@ doPickSplit(Relation index, SpGistState *state,
for (i = 0; i < out.nNodes; i++)
{
Datum label = (Datum) 0;
- bool isnull = (out.nodeLabels == NULL);
+ bool labelisnull = (out.nodeLabels == NULL);
- if (!isnull)
+ if (!labelisnull)
label = out.nodeLabels[i];
- nodes[i] = spgFormNodeTuple(state, label, isnull);
+ nodes[i] = spgFormNodeTuple(state, label, labelisnull);
}
innerTuple = spgFormInnerTuple(state,
out.hasPrefix, out.prefixDatum,
@@ -914,7 +954,7 @@ doPickSplit(Relation index, SpGistState *state,
*/
xlrec.initInner = false;
if (parent->buffer != InvalidBuffer &&
- parent->blkno != SPGIST_HEAD_BLKNO &&
+ !SpGistBlockIsRoot(parent->blkno) &&
(SpGistPageGetFreeSpace(parent->page, 1) >=
innerTuple->size + sizeof(ItemIdData)))
{
@@ -925,7 +965,8 @@ doPickSplit(Relation index, SpGistState *state,
{
/* Send tuple to page with next triple parity (see README) */
newInnerBuffer = SpGistGetBuffer(index,
- GBUF_INNER_PARITY(parent->blkno + 1),
+ GBUF_INNER_PARITY(parent->blkno + 1) |
+ (isNulls ? GBUF_NULLS : 0),
innerTuple->size + sizeof(ItemIdData),
&xlrec.initInner);
}
@@ -935,7 +976,7 @@ doPickSplit(Relation index, SpGistState *state,
newInnerBuffer = InvalidBuffer;
}
- /*----------
+ /*
* Because a WAL record can't involve more than four buffers, we can
* only afford to deal with two leaf pages in each picksplit action,
* ie the current page and at most one other.
@@ -956,9 +997,8 @@ doPickSplit(Relation index, SpGistState *state,
* If we are splitting the root page (turning it from a leaf page into an
* inner page), then no leaf tuples can go back to the current page; they
* must all go somewhere else.
- *----------
*/
- if (current->blkno != SPGIST_HEAD_BLKNO)
+ if (!SpGistBlockIsRoot(current->blkno))
currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
else
currentFreeSpace = 0; /* prevent assigning any tuples to current */
@@ -996,7 +1036,8 @@ doPickSplit(Relation index, SpGistState *state,
int curspace;
int newspace;
- newLeafBuffer = SpGistGetBuffer(index, GBUF_LEAF,
+ newLeafBuffer = SpGistGetBuffer(index,
+ GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
Min(totalLeafSizes,
SPGIST_PAGE_CAPACITY),
&xlrec.initDest);
@@ -1076,6 +1117,7 @@ doPickSplit(Relation index, SpGistState *state,
xlrec.blknoDest = InvalidBlockNumber;
xlrec.nDelete = 0;
xlrec.initSrc = isNew;
+ xlrec.storesNulls = isNulls;
leafdata = leafptr = (char *) palloc(totalLeafSizes);
@@ -1091,7 +1133,7 @@ doPickSplit(Relation index, SpGistState *state,
* the root; in that case there's no need because we'll re-init the page
* below. We do this first to make room for reinserting new leaf tuples.
*/
- if (current->blkno != SPGIST_HEAD_BLKNO)
+ if (!SpGistBlockIsRoot(current->blkno))
{
/*
* Init buffer instead of deleting individual tuples, but only if
@@ -1102,7 +1144,8 @@ doPickSplit(Relation index, SpGistState *state,
nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
PageGetMaxOffsetNumber(current->page))
{
- SpGistInitBuffer(current->buffer, SPGIST_LEAF);
+ SpGistInitBuffer(current->buffer,
+ SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
xlrec.initSrc = true;
}
else if (isNew)
@@ -1317,10 +1360,10 @@ doPickSplit(Relation index, SpGistState *state,
* Splitting root page, which was a leaf but now becomes inner page
* (and so "current" continues to point at it)
*/
- Assert(current->blkno == SPGIST_HEAD_BLKNO);
+ Assert(SpGistBlockIsRoot(current->blkno));
Assert(redirectTuplePos == InvalidOffsetNumber);
- SpGistInitBuffer(current->buffer, 0);
+ SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
xlrec.initInner = true;
xlrec.blknoInner = current->blkno;
@@ -1461,6 +1504,9 @@ spgAddNodeAction(Relation index, SpGistState *state,
XLogRecData rdata[5];
spgxlogAddNode xlrec;
+ /* Should not be applied to nulls */
+ Assert(!SpGistPageStoresNulls(current->page));
+
/* Construct new inner tuple with additional node */
newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
@@ -1527,7 +1573,7 @@ spgAddNodeAction(Relation index, SpGistState *state,
* allow only one inner tuple on the root page, and spgFormInnerTuple
* always checks that inner tuples don't exceed the size of a page.
*/
- if (current->blkno == SPGIST_HEAD_BLKNO)
+ if (SpGistBlockIsRoot(current->blkno))
elog(ERROR, "cannot enlarge root tuple any more");
Assert(parent->buffer != InvalidBuffer);
@@ -1657,6 +1703,9 @@ spgSplitNodeAction(Relation index, SpGistState *state,
spgxlogSplitTuple xlrec;
Buffer newBuffer = InvalidBuffer;
+ /* Should not be applied to nulls */
+ Assert(!SpGistPageStoresNulls(current->page));
+
/*
* Construct new prefix tuple, containing a single node with the
* specified label. (We'll update the node's downlink to point to the
@@ -1709,7 +1758,7 @@ spgSplitNodeAction(Relation index, SpGistState *state,
* For the space calculation, note that prefixTuple replaces innerTuple
* but postfixTuple will be a new entry.
*/
- if (current->blkno == SPGIST_HEAD_BLKNO ||
+ if (SpGistBlockIsRoot(current->blkno) ||
SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
{
@@ -1804,7 +1853,7 @@ spgSplitNodeAction(Relation index, SpGistState *state,
*/
void
spgdoinsert(Relation index, SpGistState *state,
- ItemPointer heapPtr, Datum datum)
+ ItemPointer heapPtr, Datum datum, bool isnull)
{
int level = 0;
Datum leafDatum;
@@ -1817,7 +1866,7 @@ spgdoinsert(Relation index, SpGistState *state,
* value to be inserted is not toasted; FormIndexDatum doesn't guarantee
* that.
*/
- if (state->attType.attlen == -1)
+ if (!isnull && state->attType.attlen == -1)
datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
leafDatum = datum;
@@ -1828,8 +1877,11 @@ spgdoinsert(Relation index, SpGistState *state,
* If it isn't gonna fit, and the opclass can't reduce the datum size by
* suffixing, bail out now rather than getting into an endless loop.
*/
- leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
- SpGistGetTypeSize(&state->attType, leafDatum);
+ if (!isnull)
+ leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
+ SpGistGetTypeSize(&state->attType, leafDatum);
+ else
+ leafSize = SGDTSIZE + sizeof(ItemIdData);
if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
ereport(ERROR,
@@ -1840,8 +1892,8 @@ spgdoinsert(Relation index, SpGistState *state,
RelationGetRelationName(index)),
errhint("Values larger than a buffer page cannot be indexed.")));
- /* Initialize "current" to the root page */
- current.blkno = SPGIST_HEAD_BLKNO;
+ /* Initialize "current" to the appropriate root page */
+ current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
current.buffer = InvalidBuffer;
current.page = NULL;
current.offnum = FirstOffsetNumber;
@@ -1873,10 +1925,11 @@ spgdoinsert(Relation index, SpGistState *state,
* for doPickSplit to always have a leaf page at hand; so just
* quietly limit our request to a page size.
*/
- current.buffer = SpGistGetBuffer(index, GBUF_LEAF,
- Min(leafSize,
- SPGIST_PAGE_CAPACITY),
- &isNew);
+ current.buffer =
+ SpGistGetBuffer(index,
+ GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
+ Min(leafSize, SPGIST_PAGE_CAPACITY),
+ &isNew);
current.blkno = BufferGetBlockNumber(current.buffer);
}
else if (parent.buffer == InvalidBuffer ||
@@ -1892,19 +1945,25 @@ spgdoinsert(Relation index, SpGistState *state,
}
current.page = BufferGetPage(current.buffer);
+ /* should not arrive at a page of the wrong type */
+ if (isnull ? !SpGistPageStoresNulls(current.page) :
+ SpGistPageStoresNulls(current.page))
+ elog(ERROR, "SPGiST index page %u has wrong nulls flag",
+ current.blkno);
+
if (SpGistPageIsLeaf(current.page))
{
SpGistLeafTuple leafTuple;
int nToSplit,
sizeToSplit;
- leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum);
+ leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
if (leafTuple->size + sizeof(ItemIdData) <=
SpGistPageGetFreeSpace(current.page, 1))
{
/* it fits on page, so insert it and we're done */
addLeafTuple(index, state, leafTuple,
- &current, &parent, isNew);
+ &current, &parent, isnull, isNew);
break;
}
else if ((sizeToSplit =
@@ -1918,14 +1977,14 @@ spgdoinsert(Relation index, SpGistState *state,
* chain to another leaf page rather than splitting it.
*/
Assert(!isNew);
- moveLeafs(index, state, &current, &parent, leafTuple);
+ moveLeafs(index, state, &current, &parent, leafTuple, isnull);
break; /* we're done */
}
else
{
/* picksplit */
if (doPickSplit(index, state, &current, &parent,
- leafTuple, level, isNew))
+ leafTuple, level, isnull, isNew))
break; /* doPickSplit installed new tuples */
/* leaf tuple will not be inserted yet */
@@ -1972,11 +2031,20 @@ spgdoinsert(Relation index, SpGistState *state,
memset(&out, 0, sizeof(out));
- procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
- FunctionCall2Coll(procinfo,
- index->rd_indcollation[0],
- PointerGetDatum(&in),
- PointerGetDatum(&out));
+ if (!isnull)
+ {
+ /* use user-defined choose method */
+ procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
+ FunctionCall2Coll(procinfo,
+ index->rd_indcollation[0],
+ PointerGetDatum(&in),
+ PointerGetDatum(&out));
+ }
+ else
+ {
+ /* force "match" action (to insert to random subnode) */
+ out.resultType = spgMatchNode;
+ }
if (innerTuple->allTheSame)
{
@@ -2001,9 +2069,12 @@ spgdoinsert(Relation index, SpGistState *state,
/* Adjust level as per opclass request */
level += out.result.matchNode.levelAdd;
/* Replace leafDatum and recompute leafSize */
- leafDatum = out.result.matchNode.restDatum;
- leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
- SpGistGetTypeSize(&state->attType, leafDatum);
+ if (!isnull)
+ {
+ leafDatum = out.result.matchNode.restDatum;
+ leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
+ SpGistGetTypeSize(&state->attType, leafDatum);
+ }
/*
* Loop around and attempt to insert the new leafDatum