diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2012-03-11 16:29:04 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2012-03-11 16:29:59 -0400 |
commit | c6a11b89e48dfb47b305cea405924333dabc20b6 (patch) | |
tree | 1ef16196fa824d0515789c59f34e46e829a43966 /src/backend/access/spgist | |
parent | fc227a4e3b84f7bc243c4606780dde28aea257ee (diff) | |
download | postgresql-c6a11b89e48dfb47b305cea405924333dabc20b6.tar.gz postgresql-c6a11b89e48dfb47b305cea405924333dabc20b6.zip |
Teach SPGiST to store nulls and do whole-index scans.
This patch fixes the other major compatibility-breaking limitation of
SPGiST, that it didn't store anything for null values of the indexed
column, and so could not support whole-index scans or "x IS NULL"
tests. The approach is to create a wholly separate search tree for
the null entries, and use fixed "allTheSame" insertion and search
rules when processing this tree, instead of calling the index opclass
methods. This way the opclass methods do not need to worry about
dealing with nulls.
Catversion bump is for pg_am updates as well as the change in on-disk
format of SPGiST indexes; there are some tweaks in SPGiST WAL records
as well.
Heavily rewritten version of a patch by Oleg Bartunov and Teodor Sigaev.
(The original also stored nulls separately, but it reused GIN code to do
so; which required undesirable compromises in the on-disk format, and
would likely lead to bugs due to the GIN code being required to work in
two very different contexts.)
Diffstat (limited to 'src/backend/access/spgist')
-rw-r--r-- | src/backend/access/spgist/README | 32 | ||||
-rw-r--r-- | src/backend/access/spgist/spgdoinsert.c | 191 | ||||
-rw-r--r-- | src/backend/access/spgist/spginsert.c | 48 | ||||
-rw-r--r-- | src/backend/access/spgist/spgscan.c | 77 | ||||
-rw-r--r-- | src/backend/access/spgist/spgutils.c | 76 | ||||
-rw-r--r-- | src/backend/access/spgist/spgvacuum.c | 17 | ||||
-rw-r--r-- | src/backend/access/spgist/spgxlog.c | 37 |
7 files changed, 330 insertions, 148 deletions
diff --git a/src/backend/access/spgist/README b/src/backend/access/spgist/README index 4ff0e357cb4..d20ad17a4b6 100644 --- a/src/backend/access/spgist/README +++ b/src/backend/access/spgist/README @@ -11,6 +11,7 @@ should have a high fanout to minimize I/O. The challenge is to map tree nodes to disk pages in such a way that the search algorithm accesses only a few disk pages, even if it traverses many nodes. + COMMON STRUCTURE DESCRIPTION Logically, an SP-GiST tree is a set of tuples, each of which can be either @@ -71,6 +72,21 @@ Leaf tuple consists of: ItemPointer to the heap + +NULLS HANDLING + +We assume that SPGiST-indexable operators are strict (can never succeed for +null inputs). It is still desirable to index nulls, so that whole-table +indexscans are possible and so that "x IS NULL" can be implemented by an +SPGiST indexscan. However, we prefer that SPGiST index opclasses not have +to cope with nulls. Therefore, the main tree of an SPGiST index does not +include any null entries. We store null entries in a separate SPGiST tree +occupying a disjoint set of pages (in particular, its own root page). +Insertions and searches in the nulls tree do not use any of the +opclass-supplied functions, but just use hardwired logic comparable to +AllTheSame cases in the normal tree. + + INSERTION ALGORITHM Insertion algorithm is designed to keep the tree in a consistent state at @@ -181,6 +197,7 @@ described in (5). and a new tuple to another page, if the list is short enough. This improves space utilization, but doesn't change the basis of the algorithm. + CONCURRENCY While descending the tree, the insertion algorithm holds exclusive lock on @@ -218,6 +235,7 @@ scan that had already visited the parent level could possibly reach such a redirect tuple, so we can remove redirects once all active transactions have been flushed out of the system. 
+ DEAD TUPLES Tuples on leaf pages can be in one of four states: @@ -269,6 +287,7 @@ to PLACEHOLDER status by VACUUM, and are then candidates for replacement. DEAD state is not currently possible, since VACUUM does not attempt to remove unused inner tuples. + VACUUM VACUUM (or more precisely, spgbulkdelete) performs a single sequential scan @@ -302,13 +321,16 @@ performed; otherwise, it does an spgbulkdelete scan with an empty target list, so as to clean up redirections and placeholders, update the free space map, and gather statistics. + LAST USED PAGE MANAGEMENT -List of last used pages contains four pages - a leaf page and three inner -pages, one from each "triple parity" group. This list is stored between -calls on the index meta page, but updates are never WAL-logged to decrease -WAL traffic. Incorrect data on meta page isn't critical, because we could -allocate a new page at any moment. +The list of last used pages contains four pages - a leaf page and three +inner pages, one from each "triple parity" group. (Actually, there's one +such list for the main tree and a separate one for the nulls tree.) This +list is stored between calls on the index meta page, but updates are never +WAL-logged to decrease WAL traffic. Incorrect data on meta page isn't +critical, because we could allocate a new page at any moment. 
+ AUTHORS diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c index 85704762a6f..5ddb6672c5c 100644 --- a/src/backend/access/spgist/spgdoinsert.c +++ b/src/backend/access/spgist/spgdoinsert.c @@ -200,7 +200,7 @@ saveNodeLink(Relation index, SPPageDesc *parent, */ static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, - SPPageDesc *current, SPPageDesc *parent, bool isNew) + SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew) { XLogRecData rdata[4]; spgxlogAddLeaf xlrec; @@ -208,6 +208,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, xlrec.node = index->rd_node; xlrec.blknoLeaf = current->blkno; xlrec.newPage = isNew; + xlrec.storesNulls = isNulls; /* these will be filled below as needed */ xlrec.offnumLeaf = InvalidOffsetNumber; @@ -224,7 +225,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, START_CRIT_SECTION(); if (current->offnum == InvalidOffsetNumber || - current->blkno == SPGIST_HEAD_BLKNO) + SpGistBlockIsRoot(current->blkno)) { /* Tuple is not part of a chain */ leafTuple->nextOffset = InvalidOffsetNumber; @@ -337,7 +338,7 @@ checkSplitConditions(Relation index, SpGistState *state, n = 0, totalSize = 0; - if (current->blkno == SPGIST_HEAD_BLKNO) + if (SpGistBlockIsRoot(current->blkno)) { /* return impossible values to force split */ *nToSplit = BLCKSZ; @@ -386,7 +387,7 @@ checkSplitConditions(Relation index, SpGistState *state, static void moveLeafs(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, - SpGistLeafTuple newLeafTuple) + SpGistLeafTuple newLeafTuple, bool isNulls) { int i, nDelete, @@ -451,7 +452,8 @@ moveLeafs(Relation index, SpGistState *state, } /* Find a leaf page that will hold them */ - nbuf = SpGistGetBuffer(index, GBUF_LEAF, size, &xlrec.newPage); + nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? 
GBUF_NULLS : 0), + size, &xlrec.newPage); npage = BufferGetPage(nbuf); nblkno = BufferGetBlockNumber(nbuf); Assert(nblkno != current->blkno); @@ -464,6 +466,7 @@ moveLeafs(Relation index, SpGistState *state, xlrec.blknoDst = nblkno; xlrec.nMoves = nDelete; xlrec.replaceDead = replaceDead; + xlrec.storesNulls = isNulls; xlrec.blknoParent = parent->blkno; xlrec.offnumParent = parent->offnum; @@ -584,6 +587,8 @@ setRedirectionTuple(SPPageDesc *current, OffsetNumber position, * If so, randomly divide the tuples into several nodes (all with the same * label) and return TRUE to select allTheSame mode for this inner tuple. * + * (This code is also used to forcibly select allTheSame mode for nulls.) + * * If we know that the leaf tuples wouldn't all fit on one page, then we * exclude the last tuple (which is the incoming new tuple that forced a split) * from the check to see if more than one node is used. The reason for this @@ -674,7 +679,8 @@ checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, static bool doPickSplit(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, - SpGistLeafTuple newLeafTuple, int level, bool isNew) + SpGistLeafTuple newLeafTuple, + int level, bool isNulls, bool isNew) { bool insertedNew = false; spgPickSplitIn in; @@ -733,11 +739,18 @@ doPickSplit(Relation index, SpGistState *state, * also, count up the amount of space that will be freed from current. * (Note that in the non-root case, we won't actually delete the old * tuples, only replace them with redirects or placeholders.) + * + * Note: the SGLTDATUM calls here are safe even when dealing with a nulls + * page. For a pass-by-value data type we will fetch a word that must + * exist even though it may contain garbage (because of the fact that leaf + * tuples must have size at least SGDTSIZE). For a pass-by-reference type + * we are just computing a pointer that isn't going to get dereferenced. 
+ * So it's not worth guarding the calls with isNulls checks. */ nToInsert = 0; nToDelete = 0; spaceToDelete = 0; - if (current->blkno == SPGIST_HEAD_BLKNO) + if (SpGistBlockIsRoot(current->blkno)) { /* * We are splitting the root (which up to now is also a leaf page). @@ -813,26 +826,53 @@ doPickSplit(Relation index, SpGistState *state, heapPtrs[in.nTuples] = newLeafTuple->heapPtr; in.nTuples++; - /* - * Perform split using user-defined method. - */ memset(&out, 0, sizeof(out)); - procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC); - FunctionCall2Coll(procinfo, - index->rd_indcollation[0], - PointerGetDatum(&in), - PointerGetDatum(&out)); + if (!isNulls) + { + /* + * Perform split using user-defined method. + */ + procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC); + FunctionCall2Coll(procinfo, + index->rd_indcollation[0], + PointerGetDatum(&in), + PointerGetDatum(&out)); - /* - * Form new leaf tuples and count up the total space needed. - */ - totalLeafSizes = 0; - for (i = 0; i < in.nTuples; i++) + /* + * Form new leaf tuples and count up the total space needed. + */ + totalLeafSizes = 0; + for (i = 0; i < in.nTuples; i++) + { + newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, + out.leafTupleDatums[i], + false); + totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData); + } + } + else { - newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, - out.leafTupleDatums[i]); - totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData); + /* + * Perform dummy split that puts all tuples into one node. + * checkAllTheSame will override this and force allTheSame mode. + */ + out.hasPrefix = false; + out.nNodes = 1; + out.nodeLabels = NULL; + out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples); + + /* + * Form new leaf tuples and count up the total space needed. 
+ */ + totalLeafSizes = 0; + for (i = 0; i < in.nTuples; i++) + { + newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, + (Datum) 0, + true); + totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData); + } } /* @@ -872,11 +912,11 @@ doPickSplit(Relation index, SpGistState *state, for (i = 0; i < out.nNodes; i++) { Datum label = (Datum) 0; - bool isnull = (out.nodeLabels == NULL); + bool labelisnull = (out.nodeLabels == NULL); - if (!isnull) + if (!labelisnull) label = out.nodeLabels[i]; - nodes[i] = spgFormNodeTuple(state, label, isnull); + nodes[i] = spgFormNodeTuple(state, label, labelisnull); } innerTuple = spgFormInnerTuple(state, out.hasPrefix, out.prefixDatum, @@ -914,7 +954,7 @@ doPickSplit(Relation index, SpGistState *state, */ xlrec.initInner = false; if (parent->buffer != InvalidBuffer && - parent->blkno != SPGIST_HEAD_BLKNO && + !SpGistBlockIsRoot(parent->blkno) && (SpGistPageGetFreeSpace(parent->page, 1) >= innerTuple->size + sizeof(ItemIdData))) { @@ -925,7 +965,8 @@ doPickSplit(Relation index, SpGistState *state, { /* Send tuple to page with next triple parity (see README) */ newInnerBuffer = SpGistGetBuffer(index, - GBUF_INNER_PARITY(parent->blkno + 1), + GBUF_INNER_PARITY(parent->blkno + 1) | + (isNulls ? GBUF_NULLS : 0), innerTuple->size + sizeof(ItemIdData), &xlrec.initInner); } @@ -935,7 +976,7 @@ doPickSplit(Relation index, SpGistState *state, newInnerBuffer = InvalidBuffer; } - /*---------- + /* * Because a WAL record can't involve more than four buffers, we can * only afford to deal with two leaf pages in each picksplit action, * ie the current page and at most one other. @@ -956,9 +997,8 @@ doPickSplit(Relation index, SpGistState *state, * If we are splitting the root page (turning it from a leaf page into an * inner page), then no leaf tuples can go back to the current page; they * must all go somewhere else. 
- *---------- */ - if (current->blkno != SPGIST_HEAD_BLKNO) + if (!SpGistBlockIsRoot(current->blkno)) currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete; else currentFreeSpace = 0; /* prevent assigning any tuples to current */ @@ -996,7 +1036,8 @@ doPickSplit(Relation index, SpGistState *state, int curspace; int newspace; - newLeafBuffer = SpGistGetBuffer(index, GBUF_LEAF, + newLeafBuffer = SpGistGetBuffer(index, + GBUF_LEAF | (isNulls ? GBUF_NULLS : 0), Min(totalLeafSizes, SPGIST_PAGE_CAPACITY), &xlrec.initDest); @@ -1076,6 +1117,7 @@ doPickSplit(Relation index, SpGistState *state, xlrec.blknoDest = InvalidBlockNumber; xlrec.nDelete = 0; xlrec.initSrc = isNew; + xlrec.storesNulls = isNulls; leafdata = leafptr = (char *) palloc(totalLeafSizes); @@ -1091,7 +1133,7 @@ doPickSplit(Relation index, SpGistState *state, * the root; in that case there's no need because we'll re-init the page * below. We do this first to make room for reinserting new leaf tuples. */ - if (current->blkno != SPGIST_HEAD_BLKNO) + if (!SpGistBlockIsRoot(current->blkno)) { /* * Init buffer instead of deleting individual tuples, but only if @@ -1102,7 +1144,8 @@ doPickSplit(Relation index, SpGistState *state, nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder == PageGetMaxOffsetNumber(current->page)) { - SpGistInitBuffer(current->buffer, SPGIST_LEAF); + SpGistInitBuffer(current->buffer, + SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0)); xlrec.initSrc = true; } else if (isNew) @@ -1317,10 +1360,10 @@ doPickSplit(Relation index, SpGistState *state, * Splitting root page, which was a leaf but now becomes inner page * (and so "current" continues to point at it) */ - Assert(current->blkno == SPGIST_HEAD_BLKNO); + Assert(SpGistBlockIsRoot(current->blkno)); Assert(redirectTuplePos == InvalidOffsetNumber); - SpGistInitBuffer(current->buffer, 0); + SpGistInitBuffer(current->buffer, (isNulls ? 
SPGIST_NULLS : 0)); xlrec.initInner = true; xlrec.blknoInner = current->blkno; @@ -1461,6 +1504,9 @@ spgAddNodeAction(Relation index, SpGistState *state, XLogRecData rdata[5]; spgxlogAddNode xlrec; + /* Should not be applied to nulls */ + Assert(!SpGistPageStoresNulls(current->page)); + /* Construct new inner tuple with additional node */ newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN); @@ -1527,7 +1573,7 @@ spgAddNodeAction(Relation index, SpGistState *state, * allow only one inner tuple on the root page, and spgFormInnerTuple * always checks that inner tuples don't exceed the size of a page. */ - if (current->blkno == SPGIST_HEAD_BLKNO) + if (SpGistBlockIsRoot(current->blkno)) elog(ERROR, "cannot enlarge root tuple any more"); Assert(parent->buffer != InvalidBuffer); @@ -1657,6 +1703,9 @@ spgSplitNodeAction(Relation index, SpGistState *state, spgxlogSplitTuple xlrec; Buffer newBuffer = InvalidBuffer; + /* Should not be applied to nulls */ + Assert(!SpGistPageStoresNulls(current->page)); + /* * Construct new prefix tuple, containing a single node with the * specified label. (We'll update the node's downlink to point to the @@ -1709,7 +1758,7 @@ spgSplitNodeAction(Relation index, SpGistState *state, * For the space calculation, note that prefixTuple replaces innerTuple * but postfixTuple will be a new entry. */ - if (current->blkno == SPGIST_HEAD_BLKNO || + if (SpGistBlockIsRoot(current->blkno) || SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size < prefixTuple->size + postfixTuple->size + sizeof(ItemIdData)) { @@ -1804,7 +1853,7 @@ spgSplitNodeAction(Relation index, SpGistState *state, */ void spgdoinsert(Relation index, SpGistState *state, - ItemPointer heapPtr, Datum datum) + ItemPointer heapPtr, Datum datum, bool isnull) { int level = 0; Datum leafDatum; @@ -1817,7 +1866,7 @@ spgdoinsert(Relation index, SpGistState *state, * value to be inserted is not toasted; FormIndexDatum doesn't guarantee * that. 
*/ - if (state->attType.attlen == -1) + if (!isnull && state->attType.attlen == -1) datum = PointerGetDatum(PG_DETOAST_DATUM(datum)); leafDatum = datum; @@ -1828,8 +1877,11 @@ spgdoinsert(Relation index, SpGistState *state, * If it isn't gonna fit, and the opclass can't reduce the datum size by * suffixing, bail out now rather than getting into an endless loop. */ - leafSize = SGLTHDRSZ + sizeof(ItemIdData) + - SpGistGetTypeSize(&state->attType, leafDatum); + if (!isnull) + leafSize = SGLTHDRSZ + sizeof(ItemIdData) + + SpGistGetTypeSize(&state->attType, leafDatum); + else + leafSize = SGDTSIZE + sizeof(ItemIdData); if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK) ereport(ERROR, @@ -1840,8 +1892,8 @@ spgdoinsert(Relation index, SpGistState *state, RelationGetRelationName(index)), errhint("Values larger than a buffer page cannot be indexed."))); - /* Initialize "current" to the root page */ - current.blkno = SPGIST_HEAD_BLKNO; + /* Initialize "current" to the appropriate root page */ + current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO; current.buffer = InvalidBuffer; current.page = NULL; current.offnum = FirstOffsetNumber; @@ -1873,10 +1925,11 @@ spgdoinsert(Relation index, SpGistState *state, * for doPickSplit to always have a leaf page at hand; so just * quietly limit our request to a page size. */ - current.buffer = SpGistGetBuffer(index, GBUF_LEAF, - Min(leafSize, - SPGIST_PAGE_CAPACITY), - &isNew); + current.buffer = + SpGistGetBuffer(index, + GBUF_LEAF | (isnull ? GBUF_NULLS : 0), + Min(leafSize, SPGIST_PAGE_CAPACITY), + &isNew); current.blkno = BufferGetBlockNumber(current.buffer); } else if (parent.buffer == InvalidBuffer || @@ -1892,19 +1945,25 @@ spgdoinsert(Relation index, SpGistState *state, } current.page = BufferGetPage(current.buffer); + /* should not arrive at a page of the wrong type */ + if (isnull ? 
!SpGistPageStoresNulls(current.page) : + SpGistPageStoresNulls(current.page)) + elog(ERROR, "SPGiST index page %u has wrong nulls flag", + current.blkno); + if (SpGistPageIsLeaf(current.page)) { SpGistLeafTuple leafTuple; int nToSplit, sizeToSplit; - leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum); + leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull); if (leafTuple->size + sizeof(ItemIdData) <= SpGistPageGetFreeSpace(current.page, 1)) { /* it fits on page, so insert it and we're done */ addLeafTuple(index, state, leafTuple, - &current, &parent, isNew); + &current, &parent, isnull, isNew); break; } else if ((sizeToSplit = @@ -1918,14 +1977,14 @@ spgdoinsert(Relation index, SpGistState *state, * chain to another leaf page rather than splitting it. */ Assert(!isNew); - moveLeafs(index, state, &current, &parent, leafTuple); + moveLeafs(index, state, &current, &parent, leafTuple, isnull); break; /* we're done */ } else { /* picksplit */ if (doPickSplit(index, state, &current, &parent, - leafTuple, level, isNew)) + leafTuple, level, isnull, isNew)) break; /* doPickSplit installed new tuples */ /* leaf tuple will not be inserted yet */ @@ -1972,11 +2031,20 @@ spgdoinsert(Relation index, SpGistState *state, memset(&out, 0, sizeof(out)); - procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC); - FunctionCall2Coll(procinfo, - index->rd_indcollation[0], - PointerGetDatum(&in), - PointerGetDatum(&out)); + if (!isnull) + { + /* use user-defined choose method */ + procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC); + FunctionCall2Coll(procinfo, + index->rd_indcollation[0], + PointerGetDatum(&in), + PointerGetDatum(&out)); + } + else + { + /* force "match" action (to insert to random subnode) */ + out.resultType = spgMatchNode; + } if (innerTuple->allTheSame) { @@ -2001,9 +2069,12 @@ spgdoinsert(Relation index, SpGistState *state, /* Adjust level as per opclass request */ level += out.result.matchNode.levelAdd; /* Replace leafDatum and recompute leafSize */ - leafDatum =
out.result.matchNode.restDatum; - leafSize = SGLTHDRSZ + sizeof(ItemIdData) + - SpGistGetTypeSize(&state->attType, leafDatum); + if (!isnull) + { + leafDatum = out.result.matchNode.restDatum; + leafSize = SGLTHDRSZ + sizeof(ItemIdData) + + SpGistGetTypeSize(&state->attType, leafDatum); + } /* * Loop around and attempt to insert the new leafDatum diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c index cbcf655674a..8ff9245e179 100644 --- a/src/backend/access/spgist/spginsert.c +++ b/src/backend/access/spgist/spginsert.c @@ -38,18 +38,15 @@ spgistBuildCallback(Relation index, HeapTuple htup, Datum *values, bool *isnull, bool tupleIsAlive, void *state) { SpGistBuildState *buildstate = (SpGistBuildState *) state; + MemoryContext oldCtx; - /* SPGiST doesn't index nulls */ - if (*isnull == false) - { - /* Work in temp context, and reset it after each tuple */ - MemoryContext oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); + /* Work in temp context, and reset it after each tuple */ + oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); - spgdoinsert(index, &buildstate->spgstate, &htup->t_self, *values); + spgdoinsert(index, &buildstate->spgstate, &htup->t_self, *values, *isnull); - MemoryContextSwitchTo(oldCtx); - MemoryContextReset(buildstate->tmpCtx); - } + MemoryContextSwitchTo(oldCtx); + MemoryContextReset(buildstate->tmpCtx); } /* @@ -65,20 +62,23 @@ spgbuild(PG_FUNCTION_ARGS) double reltuples; SpGistBuildState buildstate; Buffer metabuffer, - rootbuffer; + rootbuffer, + nullbuffer; if (RelationGetNumberOfBlocks(index) != 0) elog(ERROR, "index \"%s\" already contains data", RelationGetRelationName(index)); /* - * Initialize the meta page and root page + * Initialize the meta page and root pages */ metabuffer = SpGistNewBuffer(index); rootbuffer = SpGistNewBuffer(index); + nullbuffer = SpGistNewBuffer(index); Assert(BufferGetBlockNumber(metabuffer) == SPGIST_METAPAGE_BLKNO); - Assert(BufferGetBlockNumber(rootbuffer) == 
SPGIST_HEAD_BLKNO); + Assert(BufferGetBlockNumber(rootbuffer) == SPGIST_ROOT_BLKNO); + Assert(BufferGetBlockNumber(nullbuffer) == SPGIST_NULL_BLKNO); START_CRIT_SECTION(); @@ -86,6 +86,8 @@ spgbuild(PG_FUNCTION_ARGS) MarkBufferDirty(metabuffer); SpGistInitBuffer(rootbuffer, SPGIST_LEAF); MarkBufferDirty(rootbuffer); + SpGistInitBuffer(nullbuffer, SPGIST_LEAF | SPGIST_NULLS); + MarkBufferDirty(nullbuffer); if (RelationNeedsWAL(index)) { @@ -104,12 +106,15 @@ spgbuild(PG_FUNCTION_ARGS) PageSetTLI(BufferGetPage(metabuffer), ThisTimeLineID); PageSetLSN(BufferGetPage(rootbuffer), recptr); PageSetTLI(BufferGetPage(rootbuffer), ThisTimeLineID); + PageSetLSN(BufferGetPage(nullbuffer), recptr); + PageSetTLI(BufferGetPage(nullbuffer), ThisTimeLineID); } END_CRIT_SECTION(); UnlockReleaseBuffer(metabuffer); UnlockReleaseBuffer(rootbuffer); + UnlockReleaseBuffer(nullbuffer); /* * Now insert all the heap data into the index @@ -159,11 +164,20 @@ spgbuildempty(PG_FUNCTION_ARGS) /* Likewise for the root page. */ SpGistInitPage(page, SPGIST_LEAF); - smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_HEAD_BLKNO, + smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_ROOT_BLKNO, + (char *) page, true); + if (XLogIsNeeded()) + log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM, + SPGIST_ROOT_BLKNO, page); + + /* Likewise for the null-tuples root page. 
*/ + SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS); + + smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_NULL_BLKNO, (char *) page, true); if (XLogIsNeeded()) log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM, - SPGIST_HEAD_BLKNO, page); + SPGIST_NULL_BLKNO, page); /* * An immediate sync is required even if we xlog'd the pages, because the @@ -194,10 +208,6 @@ spginsert(PG_FUNCTION_ARGS) MemoryContext oldCtx; MemoryContext insertCtx; - /* SPGiST doesn't index nulls */ - if (*isnull) - PG_RETURN_BOOL(false); - insertCtx = AllocSetContextCreate(CurrentMemoryContext, "SP-GiST insert temporary context", ALLOCSET_DEFAULT_MINSIZE, @@ -207,7 +217,7 @@ spginsert(PG_FUNCTION_ARGS) initSpGistState(&spgstate, index); - spgdoinsert(index, &spgstate, ht_ctid, *values); + spgdoinsert(index, &spgstate, ht_ctid, *values, *isnull); SpGistUpdateMetaPage(index); diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c index 99b0852611f..7a3a96230d1 100644 --- a/src/backend/access/spgist/spgscan.c +++ b/src/backend/access/spgist/spgscan.c @@ -23,6 +23,9 @@ #include "utils/memutils.h" +typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr, + Datum leafValue, bool isnull, bool recheck); + typedef struct ScanStackEntry { Datum reconstructedValue; /* value reconstructed from parent */ @@ -66,14 +69,20 @@ resetSpGistScanOpaque(SpGistScanOpaque so) freeScanStack(so); - Assert(!so->searchNulls); /* XXX fixme */ + if (so->searchNulls) + { + /* Stack a work item to scan the null index entries */ + startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry)); + ItemPointerSet(&startEntry->ptr, SPGIST_NULL_BLKNO, FirstOffsetNumber); + so->scanStack = lappend(so->scanStack, startEntry); + } if (so->searchNonNulls) { /* Stack a work item to scan the non-null index entries */ startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry)); - ItemPointerSet(&startEntry->ptr, SPGIST_HEAD_BLKNO, FirstOffsetNumber); - so->scanStack = 
list_make1(startEntry); + ItemPointerSet(&startEntry->ptr, SPGIST_ROOT_BLKNO, FirstOffsetNumber); + so->scanStack = lappend(so->scanStack, startEntry); } if (so->want_itup) @@ -243,22 +252,35 @@ spgrestrpos(PG_FUNCTION_ARGS) } /* - * Test whether a leaf datum satisfies all the scan keys + * Test whether a leaf tuple satisfies all the scan keys * * *leafValue is set to the reconstructed datum, if provided * *recheck is set true if any of the operators are lossy */ static bool -spgLeafTest(Relation index, SpGistScanOpaque so, Datum leafDatum, +spgLeafTest(Relation index, SpGistScanOpaque so, + SpGistLeafTuple leafTuple, bool isnull, int level, Datum reconstructedValue, Datum *leafValue, bool *recheck) { bool result; + Datum leafDatum; spgLeafConsistentIn in; spgLeafConsistentOut out; FmgrInfo *procinfo; MemoryContext oldCtx; + if (isnull) + { + /* Should not have arrived on a nulls page unless nulls are wanted */ + Assert(so->searchNulls); + *leafValue = (Datum) 0; + *recheck = false; + return true; + } + + leafDatum = SGLTDATUM(leafTuple, &so->state); + /* use temp context for calling leaf_consistent */ oldCtx = MemoryContextSwitchTo(so->tempCxt); @@ -295,7 +317,7 @@ spgLeafTest(Relation index, SpGistScanOpaque so, Datum leafDatum, */ static void spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex, - void (*storeRes) (SpGistScanOpaque, ItemPointer, Datum, bool)) + storeRes_func storeRes) { Buffer buffer = InvalidBuffer; bool reportedSome = false; @@ -306,6 +328,7 @@ spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex, BlockNumber blkno; OffsetNumber offset; Page page; + bool isnull; /* Pull next to-do item from the list */ if (so->scanStack == NIL) @@ -336,6 +359,8 @@ redirect: page = BufferGetPage(buffer); + isnull = SpGistPageStoresNulls(page) ? 
true : false; + if (SpGistPageIsLeaf(page)) { SpGistLeafTuple leafTuple; @@ -343,7 +368,7 @@ redirect: Datum leafValue = (Datum) 0; bool recheck = false; - if (blkno == SPGIST_HEAD_BLKNO) + if (SpGistBlockIsRoot(blkno)) { /* When root is a leaf, examine all its tuples */ for (offset = FirstOffsetNumber; offset <= max; offset++) @@ -359,13 +384,14 @@ redirect: Assert(ItemPointerIsValid(&leafTuple->heapPtr)); if (spgLeafTest(index, so, - SGLTDATUM(leafTuple, &so->state), + leafTuple, isnull, stackEntry->level, stackEntry->reconstructedValue, &leafValue, &recheck)) { - storeRes(so, &leafTuple->heapPtr, leafValue, recheck); + storeRes(so, &leafTuple->heapPtr, + leafValue, isnull, recheck); reportedSome = true; } } @@ -404,13 +430,14 @@ redirect: Assert(ItemPointerIsValid(&leafTuple->heapPtr)); if (spgLeafTest(index, so, - SGLTDATUM(leafTuple, &so->state), + leafTuple, isnull, stackEntry->level, stackEntry->reconstructedValue, &leafValue, &recheck)) { - storeRes(so, &leafTuple->heapPtr, leafValue, recheck); + storeRes(so, &leafTuple->heapPtr, + leafValue, isnull, recheck); reportedSome = true; } @@ -468,11 +495,23 @@ redirect: memset(&out, 0, sizeof(out)); - procinfo = index_getprocinfo(index, 1, SPGIST_INNER_CONSISTENT_PROC); - FunctionCall2Coll(procinfo, - index->rd_indcollation[0], - PointerGetDatum(&in), - PointerGetDatum(&out)); + if (!isnull) + { + /* use user-defined inner consistent method */ + procinfo = index_getprocinfo(index, 1, SPGIST_INNER_CONSISTENT_PROC); + FunctionCall2Coll(procinfo, + index->rd_indcollation[0], + PointerGetDatum(&in), + PointerGetDatum(&out)); + } + else + { + /* force all children to be visited */ + out.nNodes = in.nNodes; + out.nodeNumbers = (int *) palloc(sizeof(int) * in.nNodes); + for (i = 0; i < in.nNodes; i++) + out.nodeNumbers[i] = i; + } MemoryContextSwitchTo(oldCtx); @@ -524,7 +563,7 @@ redirect: /* storeRes subroutine for getbitmap case */ static void storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr, - Datum leafValue, 
bool recheck) + Datum leafValue, bool isnull, bool recheck) { tbm_add_tuples(so->tbm, heapPtr, 1, recheck); so->ntids++; @@ -551,7 +590,7 @@ spggetbitmap(PG_FUNCTION_ARGS) /* storeRes subroutine for gettuple case */ static void storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr, - Datum leafValue, bool recheck) + Datum leafValue, bool isnull, bool recheck) { Assert(so->nPtrs < MaxIndexTuplesPerPage); so->heapPtrs[so->nPtrs] = *heapPtr; @@ -562,8 +601,6 @@ storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr, * Reconstruct desired IndexTuple. We have to copy the datum out of * the temp context anyway, so we may as well create the tuple here. */ - bool isnull = false; - so->indexTups[so->nPtrs] = index_form_tuple(so->indexTupDesc, &leafValue, &isnull); diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c index 1f88562be78..46a10f6a206 100644 --- a/src/backend/access/spgist/spgutils.c +++ b/src/backend/access/spgist/spgutils.c @@ -148,10 +148,10 @@ SpGistNewBuffer(Relation index) break; /* nothing known to FSM */ /* - * The root page shouldn't ever be listed in FSM, but just in case it - * is, ignore it. + * The fixed pages shouldn't ever be listed in FSM, but just in case + * one is, ignore it. */ - if (blkno == SPGIST_HEAD_BLKNO) + if (SpGistBlockIsFixed(blkno)) continue; buffer = ReadBuffer(index, blkno); @@ -226,9 +226,8 @@ SpGistUpdateMetaPage(Relation index) } /* Macro to select proper element of lastUsedPages cache depending on flags */ -#define GET_LUP(c, f) (((f) & GBUF_LEAF) ? 
\ - &(c)->lastUsedPages.leafPage : \ - &(c)->lastUsedPages.innerPage[(f) & GBUF_PARITY_MASK]) +/* Masking flags with SPGIST_CACHED_PAGES is just for paranoia's sake */ +#define GET_LUP(c, f) (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES]) /* * Allocate and initialize a new buffer of the type and parity specified by @@ -254,15 +253,21 @@ static Buffer allocNewBuffer(Relation index, int flags) { SpGistCache *cache = spgGetCache(index); + uint16 pageflags = 0; + + if (GBUF_REQ_LEAF(flags)) + pageflags |= SPGIST_LEAF; + if (GBUF_REQ_NULLS(flags)) + pageflags |= SPGIST_NULLS; for (;;) { Buffer buffer; buffer = SpGistNewBuffer(index); - SpGistInitBuffer(buffer, (flags & GBUF_LEAF) ? SPGIST_LEAF : 0); + SpGistInitBuffer(buffer, pageflags); - if (flags & GBUF_LEAF) + if (pageflags & SPGIST_LEAF) { /* Leaf pages have no parity concerns, so just use it */ return buffer; @@ -270,9 +275,9 @@ allocNewBuffer(Relation index, int flags) else { BlockNumber blkno = BufferGetBlockNumber(buffer); - int blkParity = blkno % 3; + int blkFlags = GBUF_INNER_PARITY(blkno); - if ((flags & GBUF_PARITY_MASK) == blkParity) + if ((flags & GBUF_PARITY_MASK) == blkFlags) { /* Page has right parity, use it */ return buffer; @@ -280,8 +285,10 @@ allocNewBuffer(Relation index, int flags) else { /* Page has wrong parity, record it in cache and try again */ - cache->lastUsedPages.innerPage[blkParity].blkno = blkno; - cache->lastUsedPages.innerPage[blkParity].freeSpace = + if (pageflags & SPGIST_NULLS) + blkFlags |= GBUF_NULLS; + cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno; + cache->lastUsedPages.cachedPage[blkFlags].freeSpace = PageGetExactFreeSpace(BufferGetPage(buffer)); UnlockReleaseBuffer(buffer); } @@ -329,8 +336,8 @@ SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew) return allocNewBuffer(index, flags); } - /* root page should never be in cache */ - Assert(lup->blkno != SPGIST_HEAD_BLKNO); + /* fixed pages should never be in cache */ + 
Assert(!SpGistBlockIsFixed(lup->blkno)); /* If cached freeSpace isn't enough, don't bother looking at the page */ if (lup->freeSpace >= needSpace) @@ -355,7 +362,13 @@ SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew) if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page)) { /* OK to initialize the page */ - SpGistInitBuffer(buffer, (flags & GBUF_LEAF) ? SPGIST_LEAF : 0); + uint16 pageflags = 0; + + if (GBUF_REQ_LEAF(flags)) + pageflags |= SPGIST_LEAF; + if (GBUF_REQ_NULLS(flags)) + pageflags |= SPGIST_NULLS; + SpGistInitBuffer(buffer, pageflags); lup->freeSpace = PageGetExactFreeSpace(page) - needSpace; *isNew = true; return buffer; @@ -365,8 +378,8 @@ SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew) * Check that page is of right type and has enough space. We must * recheck this since our cache isn't necessarily up to date. */ - if ((flags & GBUF_LEAF) ? SpGistPageIsLeaf(page) : - !SpGistPageIsLeaf(page)) + if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) && + (GBUF_REQ_NULLS(flags) ? 
SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page))) { int freeSpace = PageGetExactFreeSpace(page); @@ -407,14 +420,16 @@ SpGistSetLastUsedPage(Relation index, Buffer buffer) BlockNumber blkno = BufferGetBlockNumber(buffer); int flags; - /* Never enter the root page in cache, though */ - if (blkno == SPGIST_HEAD_BLKNO) + /* Never enter fixed pages (root pages) in cache, though */ + if (SpGistBlockIsFixed(blkno)) return; if (SpGistPageIsLeaf(page)) flags = GBUF_LEAF; else flags = GBUF_INNER_PARITY(blkno); + if (SpGistPageStoresNulls(page)) + flags |= GBUF_NULLS; lup = GET_LUP(cache, flags); @@ -459,6 +474,7 @@ void SpGistInitMetapage(Page page) { SpGistMetaPageData *metadata; + int i; SpGistInitPage(page, SPGIST_META); metadata = SpGistPageGetMeta(page); @@ -466,10 +482,8 @@ SpGistInitMetapage(Page page) metadata->magicNumber = SPGIST_MAGIC_NUMBER; /* initialize last-used-page cache to empty */ - metadata->lastUsedPages.innerPage[0].blkno = InvalidBlockNumber; - metadata->lastUsedPages.innerPage[1].blkno = InvalidBlockNumber; - metadata->lastUsedPages.innerPage[2].blkno = InvalidBlockNumber; - metadata->lastUsedPages.leafPage.blkno = InvalidBlockNumber; + for (i = 0; i < SPGIST_CACHED_PAGES; i++) + metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber; } /* @@ -490,7 +504,7 @@ spgoptions(PG_FUNCTION_ARGS) } /* - * Get the space needed to store a datum of the indicated type. + * Get the space needed to store a non-null datum of the indicated type. * Note the result is already rounded up to a MAXALIGN boundary. * Also, we follow the SPGiST convention that pass-by-val types are * just stored in their Datum representation (compare memcpyDatum). 
@@ -511,7 +525,7 @@ SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum) } /* - * Copy the given datum to *target + * Copy the given non-null datum to *target */ static void memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum) @@ -533,17 +547,20 @@ memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum) * Construct a leaf tuple containing the given heap TID and datum value */ SpGistLeafTuple -spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, Datum datum) +spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, + Datum datum, bool isnull) { SpGistLeafTuple tup; unsigned int size; /* compute space needed (note result is already maxaligned) */ - size = SGLTHDRSZ + SpGistGetTypeSize(&state->attType, datum); + size = SGLTHDRSZ; + if (!isnull) + size += SpGistGetTypeSize(&state->attType, datum); /* * Ensure that we can replace the tuple with a dead tuple later. This - * test is unnecessary given current tuple layouts, but let's be safe. + * test is unnecessary when !isnull, but let's be safe. 
*/ if (size < SGDTSIZE) size = SGDTSIZE; @@ -554,7 +571,8 @@ spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, Datum datum) tup->size = size; tup->nextOffset = InvalidOffsetNumber; tup->heapPtr = *heapPtr; - memcpyDatum(SGLTDATAPTR(tup), &state->attType, datum); + if (!isnull) + memcpyDatum(SGLTDATAPTR(tup), &state->attType, datum); return tup; } diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c index 4598ea8d67f..a09da84a2aa 100644 --- a/src/backend/access/spgist/spgvacuum.c +++ b/src/backend/access/spgist/spgvacuum.c @@ -307,7 +307,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer) } /* - * Vacuum the root page when it is a leaf + * Vacuum a root page when it is also a leaf * * On the root, we just delete any dead leaf tuples; no fancy business */ @@ -321,6 +321,7 @@ vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer) OffsetNumber i, max = PageGetMaxOffsetNumber(page); + xlrec.blkno = BufferGetBlockNumber(buffer); xlrec.nDelete = 0; /* Scan page, identify tuples to delete, accumulate stats */ @@ -537,7 +538,7 @@ spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno) } else if (SpGistPageIsLeaf(page)) { - if (blkno == SPGIST_HEAD_BLKNO) + if (SpGistBlockIsRoot(blkno)) { vacuumLeafRoot(bds, index, buffer); /* no need for vacuumRedirectAndPlaceholder */ @@ -560,7 +561,7 @@ spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno) * put a new tuple. Otherwise, check for empty/deletable page, and * make sure FSM knows about it. 
*/ - if (blkno != SPGIST_HEAD_BLKNO) + if (!SpGistBlockIsRoot(blkno)) { /* If page is now empty, mark it deleted */ if (PageIsEmpty(page) && !SpGistPageIsDeleted(page)) @@ -598,7 +599,7 @@ spgvacuumscan(spgBulkDeleteState *bds) /* Finish setting up spgBulkDeleteState */ initSpGistState(&bds->spgstate, index); bds->OldestXmin = GetOldestXmin(true, false); - bds->lastFilledBlock = SPGIST_HEAD_BLKNO; + bds->lastFilledBlock = SPGIST_LAST_FIXED_BLKNO; /* * Reset counts that will be incremented during the scan; needed in case @@ -619,7 +620,7 @@ spgvacuumscan(spgBulkDeleteState *bds) * delete some deletable tuples. See more extensive comments about * this in btvacuumscan(). */ - blkno = SPGIST_HEAD_BLKNO; + blkno = SPGIST_METAPAGE_BLKNO + 1; for (;;) { /* Get the current relation length */ @@ -648,6 +649,12 @@ spgvacuumscan(spgBulkDeleteState *bds) * XXX disabled because it's unsafe due to possible concurrent inserts. * We'd have to rescan the pages to make sure they're still empty, and it * doesn't seem worth it. Note that btree doesn't do this either. + * + * Another reason not to truncate is that it could invalidate the cached + * pages-with-freespace pointers in the metapage and other backends' + * relation caches, that is leave them pointing to nonexistent pages. + * Adding RelationGetNumberOfBlocks calls to protect the places that use + * those pointers would be unduly expensive. 
*/ #ifdef NOT_USED if (num_pages > bds->lastFilledBlock + 1) diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index daa8ae300ba..8e87e2adc90 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -84,7 +84,7 @@ spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); - buffer = XLogReadBuffer(*node, SPGIST_HEAD_BLKNO, true); + buffer = XLogReadBuffer(*node, SPGIST_ROOT_BLKNO, true); Assert(BufferIsValid(buffer)); SpGistInitBuffer(buffer, SPGIST_LEAF); page = (Page) BufferGetPage(buffer); @@ -92,6 +92,15 @@ spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); + + buffer = XLogReadBuffer(*node, SPGIST_NULL_BLKNO, true); + Assert(BufferIsValid(buffer)); + SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS); + page = (Page) BufferGetPage(buffer); + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); + UnlockReleaseBuffer(buffer); } static void @@ -116,7 +125,8 @@ spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record) page = BufferGetPage(buffer); if (xldata->newPage) - SpGistInitBuffer(buffer, SPGIST_LEAF); + SpGistInitBuffer(buffer, + SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); if (!XLByteLE(lsn, PageGetLSN(page))) { @@ -218,7 +228,8 @@ spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record) page = BufferGetPage(buffer); if (xldata->newPage) - SpGistInitBuffer(buffer, SPGIST_LEAF); + SpGistInitBuffer(buffer, + SPGIST_LEAF | (xldata->storesNulls ? 
SPGIST_NULLS : 0)); if (!XLByteLE(lsn, PageGetLSN(page))) { @@ -344,6 +355,7 @@ spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record) { page = BufferGetPage(buffer); + /* AddNode is not used for nulls pages */ if (xldata->newPage) SpGistInitBuffer(buffer, 0); @@ -464,6 +476,7 @@ spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record) { page = BufferGetPage(buffer); + /* SplitTuple is not used for nulls pages */ if (xldata->newPage) SpGistInitBuffer(buffer, 0); @@ -545,7 +558,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) */ bbi = 0; - if (xldata->blknoSrc == SPGIST_HEAD_BLKNO) + if (SpGistBlockIsRoot(xldata->blknoSrc)) { /* when splitting root, we touch it only in the guise of new inner */ srcBuffer = InvalidBuffer; @@ -557,7 +570,8 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) Assert(BufferIsValid(srcBuffer)); page = (Page) BufferGetPage(srcBuffer); - SpGistInitBuffer(srcBuffer, SPGIST_LEAF); + SpGistInitBuffer(srcBuffer, + SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); /* don't update LSN etc till we're done with it */ } else @@ -612,7 +626,8 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) Assert(BufferIsValid(destBuffer)); page = (Page) BufferGetPage(destBuffer); - SpGistInitBuffer(destBuffer, SPGIST_LEAF); + SpGistInitBuffer(destBuffer, + SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); /* don't update LSN etc till we're done with it */ } else @@ -678,7 +693,8 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) page = BufferGetPage(buffer); if (xldata->initInner) - SpGistInitBuffer(buffer, 0); + SpGistInitBuffer(buffer, + (xldata->storesNulls ? 
SPGIST_NULLS : 0)); if (!XLByteLE(lsn, PageGetLSN(page))) { @@ -709,7 +725,7 @@ spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record) if (xldata->blknoParent == InvalidBlockNumber) { /* no parent cause we split the root */ - Assert(xldata->blknoInner == SPGIST_HEAD_BLKNO); + Assert(SpGistBlockIsRoot(xldata->blknoInner)); } else if (xldata->blknoInner != xldata->blknoParent) { @@ -842,7 +858,7 @@ spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record) if (!(record->xl_info & XLR_BKP_BLOCK_1)) { - buffer = XLogReadBuffer(xldata->node, SPGIST_HEAD_BLKNO, false); + buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); if (BufferIsValid(buffer)) { page = BufferGetPage(buffer); @@ -1039,7 +1055,8 @@ spg_desc(StringInfo buf, uint8 xl_info, char *rec) break; case XLOG_SPGIST_VACUUM_ROOT: out_target(buf, ((spgxlogVacuumRoot *) rec)->node); - appendStringInfo(buf, "vacuum leaf tuples on root page"); + appendStringInfo(buf, "vacuum leaf tuples on root page %u", + ((spgxlogVacuumRoot *) rec)->blkno); break; case XLOG_SPGIST_VACUUM_REDIRECT: out_target(buf, ((spgxlogVacuumRedirect *) rec)->node); |