diff options
Diffstat (limited to 'src/backend/access/gin/ginentrypage.c')
-rw-r--r-- | src/backend/access/gin/ginentrypage.c | 134 |
1 files changed, 83 insertions, 51 deletions
diff --git a/src/backend/access/gin/ginentrypage.c b/src/backend/access/gin/ginentrypage.c index e60c708b01e..6f4e727b3e9 100644 --- a/src/backend/access/gin/ginentrypage.c +++ b/src/backend/access/gin/ginentrypage.c @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * ginentrypage.c - * page utilities routines for the postgres inverted index access method. + * routines for handling GIN entry tree pages. * * * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group @@ -15,8 +15,15 @@ #include "postgres.h" #include "access/gin_private.h" +#include "miscadmin.h" #include "utils/rel.h" +static void entrySplitPage(GinBtree btree, Buffer origbuf, + GinBtreeStack *stack, + void *insertPayload, + BlockNumber updateblkno, XLogRecData **prdata, + Page *newlpage, Page *newrpage); + /* * Form a tuple for entry tree. * @@ -27,15 +34,15 @@ * format that is being built here. We build on the assumption that we * are making a leaf-level key entry containing a posting list of nipd items. * If the caller is actually trying to make a posting-tree entry, non-leaf - * entry, or pending-list entry, it should pass nipd = 0 and then overwrite - * the t_tid fields as necessary. In any case, ipd can be NULL to skip - * copying any itempointers into the posting list; the caller is responsible - * for filling the posting list afterwards, if ipd = NULL and nipd > 0. + * entry, or pending-list entry, it should pass dataSize = 0 and then overwrite + * the t_tid fields as necessary. In any case, 'data' can be NULL to skip + * filling in the posting list; the caller is responsible for filling it + * afterwards if data = NULL and nipd > 0. */ IndexTuple GinFormTuple(GinState *ginstate, OffsetNumber attnum, Datum key, GinNullCategory category, - ItemPointerData *ipd, uint32 nipd, + Pointer data, Size dataSize, int nipd, bool errorTooBig) { Datum datums[2]; @@ -80,27 +87,25 @@ GinFormTuple(GinState *ginstate, newsize = Max(newsize, minsize); } - newsize = SHORTALIGN(newsize); - GinSetPostingOffset(itup, newsize); - GinSetNPosting(itup, nipd); /* * Add space needed for posting list, if any. Then check that the tuple * won't be too big to store. */ - newsize += sizeof(ItemPointerData) * nipd; + newsize += dataSize; + newsize = MAXALIGN(newsize); - if (newsize > Min(INDEX_SIZE_MASK, GinMaxItemSize)) + + if (newsize > GinMaxItemSize) { if (errorTooBig) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("index row size %lu exceeds maximum %lu for index \"%s\"", (unsigned long) newsize, - (unsigned long) Min(INDEX_SIZE_MASK, - GinMaxItemSize), + (unsigned long) GinMaxItemSize, RelationGetRelationName(ginstate->index)))); pfree(itup); return NULL; @@ -119,13 +124,21 @@ GinFormTuple(GinState *ginstate, */ memset((char *) itup + IndexTupleSize(itup), 0, newsize - IndexTupleSize(itup)); - /* set new size in tuple header */ itup->t_info &= ~INDEX_SIZE_MASK; itup->t_info |= newsize; } /* + * Copy in the posting list, if provided + */ + if (data) + { + char *ptr = GinGetPosting(itup); + memcpy(ptr, data, dataSize); + } + + /* * Insert category byte, if needed */ if (category != GIN_CAT_NORM_KEY) @@ -133,37 +146,45 @@ GinFormTuple(GinState *ginstate, Assert(IndexTupleHasNulls(itup)); GinSetNullCategory(itup, ginstate, category); } - - /* - * Copy in the posting list, if provided - */ - if (ipd) - memcpy(GinGetPosting(itup), ipd, sizeof(ItemPointerData) * nipd); - return itup; } /* - * Sometimes we reduce the number of posting list items in a tuple after - * having built it with GinFormTuple. This function adjusts the size - * fields to match. + * Read item pointers from leaf entry tuple. + * + * Returns a palloc'd array of ItemPointers. The number of items is returned + * in *nitems. */ -void -GinShortenTuple(IndexTuple itup, uint32 nipd) +ItemPointer +ginReadTuple(GinState *ginstate, OffsetNumber attnum, IndexTuple itup, + int *nitems) { - uint32 newsize; - - Assert(nipd <= GinGetNPosting(itup)); + Pointer ptr = GinGetPosting(itup); + int nipd = GinGetNPosting(itup); + ItemPointer ipd; + int ndecoded; - newsize = GinGetPostingOffset(itup) + sizeof(ItemPointerData) * nipd; - newsize = MAXALIGN(newsize); - - Assert(newsize <= (itup->t_info & INDEX_SIZE_MASK)); - - itup->t_info &= ~INDEX_SIZE_MASK; - itup->t_info |= newsize; - - GinSetNPosting(itup, nipd); + if (GinItupIsCompressed(itup)) + { + if (nipd > 0) + { + ipd = ginPostingListDecode((GinPostingList *) ptr, &ndecoded); + if (nipd != ndecoded) + elog(ERROR, "number of items mismatch in GIN entry tuple, %d in tuple header, %d decoded", + nipd, ndecoded); + } + else + { + ipd = palloc(0); + } + } + else + { + ipd = (ItemPointer) palloc(sizeof(ItemPointerData) * nipd); + memcpy(ipd, ptr, sizeof(ItemPointerData) * nipd); + } + *nitems = nipd; + return ipd; } /* @@ -492,13 +513,14 @@ entryPreparePage(GinBtree btree, Page page, OffsetNumber off, * the downlink of the existing item at 'off' is updated to point to * 'updateblkno'. */ -static bool -entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, +static GinPlaceToPageRC +entryPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack, void *insertPayload, BlockNumber updateblkno, - XLogRecData **prdata) + XLogRecData **prdata, Page *newlpage, Page *newrpage) { GinBtreeEntryInsertData *insertData = insertPayload; Page page = BufferGetPage(buf); + OffsetNumber off = stack->off; OffsetNumber placed; int cnt = 0; @@ -508,7 +530,13 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, /* quick exit if it doesn't fit */ if (!entryIsEnoughSpace(btree, buf, off, insertData)) - return false; + { + entrySplitPage(btree, buf, stack, insertPayload, updateblkno, + prdata, newlpage, newrpage); + return SPLIT; + } + + START_CRIT_SECTION(); *prdata = rdata; entryPreparePage(btree, page, off, insertData, updateblkno); @@ -522,6 +550,7 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, RelationGetRelationName(btree->index)); data.isDelete = insertData->isDelete; + data.offset = off; rdata[cnt].buffer = buf; rdata[cnt].buffer_std = false; @@ -536,21 +565,24 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, rdata[cnt].len = IndexTupleSize(insertData->entry); rdata[cnt].next = NULL; - return true; + return INSERTED; } /* * Place tuple and split page, original buffer(lbuf) leaves untouched, - * returns shadow page of lbuf filled new data. + * returns shadow pages filled with new data. * Tuples are distributed between pages by equal size on its, not * an equal number! */ -static Page -entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, +static void +entrySplitPage(GinBtree btree, Buffer origbuf, + GinBtreeStack *stack, void *insertPayload, - BlockNumber updateblkno, XLogRecData **prdata) + BlockNumber updateblkno, XLogRecData **prdata, + Page *newlpage, Page *newrpage) { GinBtreeEntryInsertData *insertData = insertPayload; + OffsetNumber off = stack->off; OffsetNumber i, maxoff, separator = InvalidOffsetNumber; @@ -561,8 +593,8 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, char *ptr; IndexTuple itup; Page page; - Page lpage = PageGetTempPageCopy(BufferGetPage(lbuf)); - Page rpage = BufferGetPage(rbuf); + Page lpage = PageGetTempPageCopy(BufferGetPage(origbuf)); + Page rpage = PageGetTempPageCopy(BufferGetPage(origbuf)); Size pageSize = PageGetPageSize(lpage); /* these must be static so they can be returned to caller */ @@ -651,7 +683,8 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, rdata[1].len = tupstoresize; rdata[1].next = NULL; - return lpage; + *newlpage = lpage; + *newrpage = rpage; } /* @@ -719,7 +752,6 @@ ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum, btree->findItem = entryLocateLeafEntry; btree->findChildPtr = entryFindChildPtr; btree->placeToPage = entryPlaceToPage; - btree->splitPage = entrySplitPage; btree->fillRoot = ginEntryFillRoot; btree->prepareDownlink = entryPrepareDownlink; |