-rw-r--r--	src/backend/storage/page/bufpage.c | 284
1 file changed, 252 insertions, 32 deletions
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index d708117a406..4bc2bf955df 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -411,51 +411,250 @@ PageRestoreTempPage(Page tempPage, Page oldPage)
 }
 
 /*
- * sorting support for PageRepairFragmentation and PageIndexMultiDelete
+ * Tuple defrag support for PageRepairFragmentation and PageIndexMultiDelete
  */
-typedef struct itemIdSortData
+typedef struct itemIdCompactData
 {
 	uint16		offsetindex;	/* linp array index */
 	int16		itemoff;		/* page offset of item data */
 	uint16		alignedlen;		/* MAXALIGN(item data len) */
-} itemIdSortData;
-typedef itemIdSortData *itemIdSort;
-
-static int
-itemoffcompare(const void *itemidp1, const void *itemidp2)
-{
-	/* Sort in decreasing itemoff order */
-	return ((itemIdSort) itemidp2)->itemoff -
-		((itemIdSort) itemidp1)->itemoff;
-}
+} itemIdCompactData;
+typedef itemIdCompactData *itemIdCompact;
 
 /*
  * After removing or marking some line pointers unused, move the tuples to
- * remove the gaps caused by the removed items.
+ * remove the gaps caused by the removed items and reorder them back into
+ * reverse line pointer order in the page.
+ *
+ * This function can often be fairly hot, so it pays to take some measures to
+ * make it as optimal as possible.
+ *
+ * Callers may pass 'presorted' as true if the 'itemidbase' array is sorted in
+ * descending order of itemoff.  When this is true we can just memmove()
+ * tuples towards the end of the page.  This is quite a common case as it's
+ * the order that tuples are initially inserted into pages.  When we call this
+ * function to defragment the tuples in the page then any new line pointers
+ * added to the page will keep that presorted order, so hitting this case is
+ * still very common for tables that are commonly updated.
+ *
+ * When the 'itemidbase' array is not presorted then we're unable to just
+ * memmove() tuples around freely.  Doing so could cause us to overwrite the
+ * memory belonging to a tuple we've not moved yet.  In this case, we copy all
+ * the tuples that need to be moved into a temporary buffer.  We can then
+ * simply memcpy() out of that temp buffer back into the page at the correct
+ * location.  Tuples are copied back into the page in the same order as the
+ * 'itemidbase' array, so we end up reordering the tuples back into reverse
+ * line pointer order.  This will increase the chances of hitting the
+ * presorted case the next time around.
+ *
+ * Callers must ensure that nitems is > 0
  */
 static void
-compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
+compactify_tuples(itemIdCompact itemidbase, int nitems, Page page, bool presorted)
 {
 	PageHeader	phdr = (PageHeader) page;
 	Offset		upper;
+	Offset		copy_tail;
+	Offset		copy_head;
+	itemIdCompact itemidptr;
 	int			i;
 
-	/* sort itemIdSortData array into decreasing itemoff order */
-	qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
-		  itemoffcompare);
+	/* Code within will not work correctly if nitems == 0 */
+	Assert(nitems > 0);
 
-	upper = phdr->pd_special;
-	for (i = 0; i < nitems; i++)
+	if (presorted)
 	{
-		itemIdSort	itemidptr = &itemidbase[i];
-		ItemId		lp;
 
-		lp = PageGetItemId(page, itemidptr->offsetindex + 1);
-		upper -= itemidptr->alignedlen;
+#ifdef USE_ASSERT_CHECKING
+		{
+			/*
+			 * Verify we've not gotten any new callers that are incorrectly
+			 * passing a true presorted value.
+			 */
+			Offset		lastoff = phdr->pd_special;
+
+			for (i = 0; i < nitems; i++)
+			{
+				itemidptr = &itemidbase[i];
+
+				Assert(lastoff > itemidptr->itemoff);
+
+				lastoff = itemidptr->itemoff;
+			}
+		}
+#endif							/* USE_ASSERT_CHECKING */
+
+		/*
+		 * 'itemidbase' is already in the optimal order, i.e, lower item
+		 * pointers have a higher offset.  This allows us to memmove() the
+		 * tuples up to the end of the page without having to worry about
+		 * overwriting other tuples that have not been moved yet.
+		 *
+		 * There's a good chance that there are tuples already right at the
+		 * end of the page that we can simply skip over because they're
+		 * already in the correct location within the page.  We'll do that
+		 * first...
+		 */
+		upper = phdr->pd_special;
+		i = 0;
+		do
+		{
+			itemidptr = &itemidbase[i];
+			if (upper != itemidptr->itemoff + itemidptr->alignedlen)
+				break;
+			upper -= itemidptr->alignedlen;
+
+			i++;
+		} while (i < nitems);
+
+		/*
+		 * Now that we've found the first tuple that needs to be moved, we can
+		 * do the tuple compactification.  We try and make the least number of
+		 * memmove() calls and only call memmove() when there's a gap.  When
+		 * we see a gap we just move all tuples after the gap up until the
+		 * point of the last move operation.
+		 */
+		copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
+		for (; i < nitems; i++)
+		{
+			ItemId		lp;
+
+			itemidptr = &itemidbase[i];
+			lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+			if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+			{
+				memmove((char *) page + upper,
+						page + copy_head,
+						copy_tail - copy_head);
+
+				/*
+				 * We've now moved all tuples already seen, but not the
+				 * current tuple, so we set the copy_tail to the end of this
+				 * tuple so it can be moved in another iteration of the loop.
+				 */
+				copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+			}
+			/* shift the target offset down by the length of this tuple */
+			upper -= itemidptr->alignedlen;
+			/* point the copy_head to the start of this tuple */
+			copy_head = itemidptr->itemoff;
+
+			/* update the line pointer to reference the new offset */
+			lp->lp_off = upper;
+
+		}
+
+		/* move the remaining tuples. */
 		memmove((char *) page + upper,
-				(char *) page + itemidptr->itemoff,
-				itemidptr->alignedlen);
-		lp->lp_off = upper;
+				page + copy_head,
+				copy_tail - copy_head);
+	}
+	else
+	{
+		PGAlignedBlock scratch;
+		char	   *scratchptr = scratch.data;
+
+		/*
+		 * Non-presorted case:  The tuples in the itemidbase array may be in
+		 * any order.  So, in order to move these to the end of the page we
+		 * must make a temp copy of each tuple that needs to be moved before
+		 * we copy them back into the page at the new offset.
+		 *
+		 * If a large percentage of tuples have been pruned (>75%) then we'll
+		 * copy these into the temp buffer tuple-by-tuple, otherwise, we'll
+		 * just do a single memcpy() for all tuples that need to be moved.
+		 * When so many tuples have been removed there's likely to be a lot of
+		 * gaps and it's unlikely that many non-movable tuples remain at the
+		 * end of the page.
+		 */
+		if (nitems < PageGetMaxOffsetNumber(page) / 4)
+		{
+			i = 0;
+			do
+			{
+				itemidptr = &itemidbase[i];
+				memcpy(scratchptr + itemidptr->itemoff, page + itemidptr->itemoff,
+					   itemidptr->alignedlen);
+				i++;
+			} while (i < nitems);
+
+			/* Set things up for the compactification code below */
+			i = 0;
+			itemidptr = &itemidbase[0];
+			upper = phdr->pd_special;
+		}
+		else
+		{
+			upper = phdr->pd_special;
+
+			/*
+			 * Many tuples are likely to already be in the correct location.
+			 * There's no need to copy these into the temp buffer.  Instead
+			 * we'll just skip forward in the itemidbase array to the position
+			 * that we do need to move tuples from so that the code below just
+			 * leaves these ones alone.
+			 */
+			i = 0;
+			do
+			{
+				itemidptr = &itemidbase[i];
+				if (upper != itemidptr->itemoff + itemidptr->alignedlen)
+					break;
+				upper -= itemidptr->alignedlen;
+
+				i++;
+			} while (i < nitems);
+
+			/* Copy all tuples that need to be moved into the temp buffer */
+			memcpy(scratchptr + phdr->pd_upper,
+				   page + phdr->pd_upper,
+				   upper - phdr->pd_upper);
+		}
+
+		/*
+		 * Do the tuple compactification.  itemidptr is already pointing to
+		 * the first tuple that we're going to move.  Here we collapse the
+		 * memcpy calls for adjacent tuples into a single call.  This is done
+		 * by delaying the memcpy call until we find a gap that needs to be
+		 * closed.
+		 */
+		copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
+		for (; i < nitems; i++)
+		{
+			ItemId		lp;
+
+			itemidptr = &itemidbase[i];
+			lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+			/* copy pending tuples when we detect a gap */
+			if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+			{
+				memcpy((char *) page + upper,
+					   scratchptr + copy_head,
+					   copy_tail - copy_head);
+
+				/*
+				 * We've now copied all tuples already seen, but not the
+				 * current tuple, so we set the copy_tail to the end of this
+				 * tuple.
+				 */
+				copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+			}
+			/* shift the target offset down by the length of this tuple */
+			upper -= itemidptr->alignedlen;
+			/* point the copy_head to the start of this tuple */
+			copy_head = itemidptr->itemoff;
+
+			/* update the line pointer to reference the new offset */
+			lp->lp_off = upper;
+
+		}
+
+		/* Copy the remaining chunk */
+		memcpy((char *) page + upper,
+			   scratchptr + copy_head,
+			   copy_tail - copy_head);
 	}
 
 	phdr->pd_upper = upper;
@@ -477,14 +676,16 @@ PageRepairFragmentation(Page page)
 	Offset		pd_lower = ((PageHeader) page)->pd_lower;
 	Offset		pd_upper = ((PageHeader) page)->pd_upper;
 	Offset		pd_special = ((PageHeader) page)->pd_special;
-	itemIdSortData itemidbase[MaxHeapTuplesPerPage];
-	itemIdSort	itemidptr;
+	Offset		last_offset;
+	itemIdCompactData itemidbase[MaxHeapTuplesPerPage];
+	itemIdCompact itemidptr;
 	ItemId		lp;
 	int			nline,
 				nstorage,
 				nunused;
 	int			i;
 	Size		totallen;
+	bool		presorted = true;	/* For now */
 
 	/*
 	 * It's worth the trouble to be more paranoid here than in most places,
@@ -509,6 +710,7 @@ PageRepairFragmentation(Page page)
 	nline = PageGetMaxOffsetNumber(page);
 	itemidptr = itemidbase;
 	nunused = totallen = 0;
+	last_offset = pd_special;
 	for (i = FirstOffsetNumber; i <= nline; i++)
 	{
 		lp = PageGetItemId(page, i);
@@ -518,6 +720,12 @@ PageRepairFragmentation(Page page)
 		{
 			itemidptr->offsetindex = i - 1;
 			itemidptr->itemoff = ItemIdGetOffset(lp);
+
+			if (last_offset > itemidptr->itemoff)
+				last_offset = itemidptr->itemoff;
+			else
+				presorted = false;
+
 			if (unlikely(itemidptr->itemoff < (int) pd_upper ||
 						 itemidptr->itemoff >= (int) pd_special))
 				ereport(ERROR,
@@ -552,7 +760,7 @@ PageRepairFragmentation(Page page)
 					 errmsg("corrupted item lengths: total %u, available space %u",
 							(unsigned int) totallen, pd_special - pd_lower)));
 
-		compactify_tuples(itemidbase, nstorage, page);
+		compactify_tuples(itemidbase, nstorage, page, presorted);
 	}
 
 	/* Set hint bit for PageAddItem */
@@ -831,9 +1039,10 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 	Offset		pd_lower = phdr->pd_lower;
 	Offset		pd_upper = phdr->pd_upper;
 	Offset		pd_special = phdr->pd_special;
-	itemIdSortData itemidbase[MaxIndexTuplesPerPage];
+	Offset		last_offset;
+	itemIdCompactData itemidbase[MaxIndexTuplesPerPage];
 	ItemIdData	newitemids[MaxIndexTuplesPerPage];
-	itemIdSort	itemidptr;
+	itemIdCompact itemidptr;
 	ItemId		lp;
 	int			nline,
 				nused;
@@ -842,6 +1051,7 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 	unsigned	offset;
 	int			nextitm;
 	OffsetNumber offnum;
+	bool		presorted = true;	/* For now */
 
 	Assert(nitems <= MaxIndexTuplesPerPage);
 
@@ -883,6 +1093,7 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 	totallen = 0;
 	nused = 0;
 	nextitm = 0;
+	last_offset = pd_special;
 	for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
 	{
 		lp = PageGetItemId(page, offnum);
@@ -906,6 +1117,12 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 		{
 			itemidptr->offsetindex = nused; /* where it will go */
 			itemidptr->itemoff = offset;
+
+			if (last_offset > itemidptr->itemoff)
+				last_offset = itemidptr->itemoff;
+			else
+				presorted = false;
+
 			itemidptr->alignedlen = MAXALIGN(size);
 			totallen += itemidptr->alignedlen;
 			newitemids[nused] = *lp;
@@ -932,7 +1149,10 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 	phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);
 
 	/* and compactify the tuple data */
-	compactify_tuples(itemidbase, nused, page);
+	if (nused > 0)
+		compactify_tuples(itemidbase, nused, page, presorted);
+	else
+		phdr->pd_upper = pd_special;
 }
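
The heart of the patch is the gap-coalescing loop in the presorted branch. To make that concrete, here is a minimal, self-contained sketch of the technique; it is not PostgreSQL code. The Item struct, PAGE_SIZE, and compact_presorted() are invented stand-ins (a real page tracks positions through line pointers and pd_special), and the function counts memmove() calls so the batching effect is visible.

#include <stdio.h>
#include <string.h>

#define PAGE_SIZE 64			/* toy stand-in for a page */

typedef struct Item
{
	int			off;			/* current offset of item data in "page" */
	int			len;			/* length of item data */
} Item;

/*
 * Compact items toward the end of "page", assuming items[] is sorted by
 * descending off (the presorted case).  Only issues a memmove() when a gap
 * is found, so runs of adjacent items are moved with a single call.
 * Returns the number of memmove() calls made.
 */
static int
compact_presorted(char *page, Item *items, int nitems)
{
	int			upper = PAGE_SIZE;	/* stands in for pd_special */
	int			copy_head;
	int			copy_tail;
	int			nmoves = 0;
	int			i = 0;

	/* skip items already flush against the current end of the page */
	while (i < nitems && upper == items[i].off + items[i].len)
		upper -= items[i++].len;

	if (i == nitems)
		return 0;				/* everything was already in place */

	copy_tail = copy_head = items[i].off + items[i].len;
	for (; i < nitems; i++)
	{
		/* gap found: flush the pending run of adjacent items */
		if (copy_head != items[i].off + items[i].len)
		{
			memmove(page + upper, page + copy_head, copy_tail - copy_head);
			nmoves++;
			copy_tail = items[i].off + items[i].len;
		}
		upper -= items[i].len;
		copy_head = items[i].off;
		items[i].off = upper;	/* stands in for lp->lp_off = upper */
	}

	/* flush the final pending run */
	memmove(page + upper, page + copy_head, copy_tail - copy_head);
	return nmoves + 1;
}

int
main(void)
{
	char		page[PAGE_SIZE + 1];
	Item		items[] = {{56, 8}, {48, 8}, {32, 8}};	/* hole at 40..47 */

	memset(page, '.', PAGE_SIZE);
	page[PAGE_SIZE] = '\0';
	memcpy(page + 56, "AAAAAAAA", 8);
	memcpy(page + 48, "BBBBBBBB", 8);
	memcpy(page + 32, "CCCCCCCC", 8);

	/* A and B are already in place; only C moves, with one memmove() */
	printf("memmove calls: %d\n", compact_presorted(page, items, 3));

	/* bytes below the new upper keep stale data, like a real pd_upper */
	printf("page: %s\n", page);
	return 0;
}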
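For the non-presorted branch, the key point is why a scratch buffer makes arbitrary orderings safe to move. A simplified sketch follows, reusing the toy Item type and PAGE_SIZE from the sketch above; it omits the patch's gap-coalescing and its >75%-pruned heuristic for choosing between tuple-by-tuple and single-range copy-out, and compact_unsorted() is an invented name.

/*
 * Items may be in any order, so moving data within the page directly could
 * overwrite a tuple that has not been moved yet.  Copying the data out to a
 * scratch buffer first (the patch uses a PGAlignedBlock) makes every write
 * back into the page safe.
 */
static void
compact_unsorted(char *page, Item *items, int nitems)
{
	char		scratch[PAGE_SIZE];
	int			upper = PAGE_SIZE;
	int			i;

	/* copy-out step, tuple-by-tuple variant */
	for (i = 0; i < nitems; i++)
		memcpy(scratch + items[i].off, page + items[i].off, items[i].len);

	/*
	 * Copy back in items[] order.  The items land in reverse line pointer
	 * order, so the next compaction of this page can take the cheaper
	 * presorted path.
	 */
	for (i = 0; i < nitems; i++)
	{
		upper -= items[i].len;
		memcpy(page + upper, scratch + items[i].off, items[i].len);
		items[i].off = upper;
	}
}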
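Finally, both callers compute 'presorted' essentially for free while filling the itemidbase array: the item offsets must strictly decrease for the array to be in reverse line pointer order. An equivalent check as a standalone function, again using the toy Item type; the real code tracks last_offset inline (and keeps scanning rather than returning early) because it must walk the line pointers anyway.

/* true if offsets strictly decrease, i.e. reverse line pointer order */
/* (bool/true/false come from <stdbool.h> outside PostgreSQL) */
static bool
offsets_presorted(const Item *items, int nitems, int pd_special)
{
	int			last_offset = pd_special;
	int			i;

	for (i = 0; i < nitems; i++)
	{
		if (last_offset > items[i].off)
			last_offset = items[i].off;
		else
			return false;		/* out of order, or a duplicate offset */
	}
	return true;
}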