-rw-r--r--  src/backend/access/hash/hashinsert.c  |  41
-rw-r--r--  src/backend/access/hash/hashovfl.c    | 189
-rw-r--r--  src/backend/access/hash/hashpage.c    |   1
-rw-r--r--  src/backend/storage/page/bufpage.c    |  27
-rw-r--r--  src/include/access/hash.h             |   7
-rw-r--r--  src/include/storage/bufpage.h         |   1
6 files changed, 196 insertions(+), 70 deletions(-)
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index dc63063ac1f..354e7339cf4 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -228,3 +228,44 @@ _hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
return itup_off;
}
+
+/*
+ * _hash_pgaddmultitup() -- add a tuple vector to a particular page in the
+ * index.
+ *
+ * This routine has the same requirements for locking and tuple ordering
+ * as _hash_pgaddtup().
+ *
+ * The offsets of the inserted tuples are returned via itup_offsets.
+ */
+void
+_hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
+ OffsetNumber *itup_offsets, uint16 nitups)
+{
+ OffsetNumber itup_off;
+ Page page;
+ uint32 hashkey;
+ int i;
+
+ _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+ page = BufferGetPage(buf);
+
+ for (i = 0; i < nitups; i++)
+ {
+ Size itemsize;
+
+ itemsize = IndexTupleDSize(*itups[i]);
+ itemsize = MAXALIGN(itemsize);
+
+ /* Find where to insert the tuple (preserving page's hashkey ordering) */
+ hashkey = _hash_get_indextuple_hashkey(itups[i]);
+ itup_off = _hash_binsearch(page, hashkey);
+
+ itup_offsets[i] = itup_off;
+
+ if (PageAddItem(page, (Item) itups[i], itemsize, itup_off, false, false)
+ == InvalidOffsetNumber)
+ elog(ERROR, "failed to add index item to \"%s\"",
+ RelationGetRelationName(rel));
+ }
+}
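
A note on the new helper: the caller owns both arrays and the tuple memory, and wbuf must already be write-locked, as for _hash_pgaddtup(). A minimal sketch of the calling pattern (move_tuples_example is a hypothetical name, not part of the patch; error handling elided):

	/*
	 * Sketch only: copy a batch of tuples and place them on a write page
	 * in a single call.
	 */
	static void
	move_tuples_example(Relation rel, Buffer wbuf,
						IndexTuple *src, uint16 ntups)
	{
		IndexTuple	 itups[MaxIndexTuplesPerPage];
		OffsetNumber itup_offsets[MaxIndexTuplesPerPage];
		uint16		 i;

		/* copy first: the originals may live on a page about to be freed */
		for (i = 0; i < ntups; i++)
			itups[i] = CopyIndexTuple(src[i]);

		/* one call inserts them all, preserving the page's hashkey order */
		_hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, ntups);
		MarkBufferDirty(wbuf);

		for (i = 0; i < ntups; i++)
			pfree(itups[i]);
	}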
diff --git a/src/backend/access/hash/hashovfl.c b/src/backend/access/hash/hashovfl.c
index 33340893291..ff6c4e295c5 100644
--- a/src/backend/access/hash/hashovfl.c
+++ b/src/backend/access/hash/hashovfl.c
@@ -391,6 +391,8 @@ _hash_firstfreebit(uint32 map)
* Remove this overflow page from its bucket's chain, and mark the page as
* free. On entry, ovflbuf is write-locked; it is released before exiting.
*
+ * In addition, the passed-in tuples (itups) are added to wbuf.
+ *
* Since this function is invoked in VACUUM, we provide an access strategy
* parameter that controls fetches of the bucket pages.
*
@@ -403,13 +405,16 @@ _hash_firstfreebit(uint32 map)
* has a lock on same.
*/
BlockNumber
-_hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
+_hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
+ Buffer wbuf, IndexTuple *itups, OffsetNumber *itup_offsets,
+ Size *tups_size, uint16 nitups,
BufferAccessStrategy bstrategy)
{
HashMetaPage metap;
Buffer metabuf;
Buffer mapbuf;
Buffer prevbuf = InvalidBuffer;
+ Buffer nextbuf = InvalidBuffer;
BlockNumber ovflblkno;
BlockNumber prevblkno;
BlockNumber blkno;
@@ -435,15 +440,6 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
bucket = ovflopaque->hasho_bucket;
/*
- * Zero the page for debugging's sake; then write and release it. (Note:
- * if we failed to zero the page here, we'd have problems with the Assert
- * in _hash_pageinit() when the page is reused.)
- */
- MemSet(ovflpage, 0, BufferGetPageSize(ovflbuf));
- MarkBufferDirty(ovflbuf);
- _hash_relbuf(rel, ovflbuf);
-
- /*
* Fix up the bucket chain. This is a doubly-linked list, so we must fix
* up the bucket chain members behind and ahead of the overflow page being
* deleted. Concurrency issues are avoided by using lock chaining as
@@ -451,9 +447,6 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
*/
if (BlockNumberIsValid(prevblkno))
{
- Page prevpage;
- HashPageOpaque prevopaque;
-
if (prevblkno == writeblkno)
prevbuf = wbuf;
else
@@ -462,32 +455,13 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
HASH_WRITE,
LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
bstrategy);
-
- prevpage = BufferGetPage(prevbuf);
- prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
-
- Assert(prevopaque->hasho_bucket == bucket);
- prevopaque->hasho_nextblkno = nextblkno;
-
- MarkBufferDirty(prevbuf);
- if (prevblkno != writeblkno)
- _hash_relbuf(rel, prevbuf);
}
if (BlockNumberIsValid(nextblkno))
- {
- Buffer nextbuf = _hash_getbuf_with_strategy(rel,
- nextblkno,
- HASH_WRITE,
- LH_OVERFLOW_PAGE,
- bstrategy);
- Page nextpage = BufferGetPage(nextbuf);
- HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
-
- Assert(nextopaque->hasho_bucket == bucket);
- nextopaque->hasho_prevblkno = prevblkno;
- MarkBufferDirty(nextbuf);
- _hash_relbuf(rel, nextbuf);
- }
+ nextbuf = _hash_getbuf_with_strategy(rel,
+ nextblkno,
+ HASH_WRITE,
+ LH_OVERFLOW_PAGE,
+ bstrategy);
/* Note: bstrategy is intentionally not used for metapage and bitmap */
@@ -508,24 +482,71 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
/* Release metapage lock while we access the bitmap page */
LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
- /* Clear the bitmap bit to indicate that this overflow page is free */
+ /* read the bitmap page to clear the bitmap bit */
mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE, LH_BITMAP_PAGE);
mappage = BufferGetPage(mapbuf);
freep = HashPageGetBitmap(mappage);
Assert(ISSET(freep, bitmapbit));
- CLRBIT(freep, bitmapbit);
- MarkBufferDirty(mapbuf);
- _hash_relbuf(rel, mapbuf);
/* Get write-lock on metapage to update firstfree */
LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+ /*
+ * We have to insert tuples on the "write" page, being careful to preserve
+ * hashkey ordering.  (If we insert many tuples into the same "write" page,
+ * it would be worth qsort'ing them.)
+ */
+ if (nitups > 0)
+ {
+ _hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups);
+ MarkBufferDirty(wbuf);
+ }
+
+ /* Initialize the freed overflow page. */
+ _hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
+ MarkBufferDirty(ovflbuf);
+
+ if (BufferIsValid(prevbuf))
+ {
+ Page prevpage = BufferGetPage(prevbuf);
+ HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
+
+ Assert(prevopaque->hasho_bucket == bucket);
+ prevopaque->hasho_nextblkno = nextblkno;
+ MarkBufferDirty(prevbuf);
+ }
+ if (BufferIsValid(nextbuf))
+ {
+ Page nextpage = BufferGetPage(nextbuf);
+ HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
+
+ Assert(nextopaque->hasho_bucket == bucket);
+ nextopaque->hasho_prevblkno = prevblkno;
+ MarkBufferDirty(nextbuf);
+ }
+
+ /* Clear the bitmap bit to indicate that this overflow page is free */
+ CLRBIT(freep, bitmapbit);
+ MarkBufferDirty(mapbuf);
+
/* if this is now the first free page, update hashm_firstfree */
if (ovflbitno < metap->hashm_firstfree)
{
metap->hashm_firstfree = ovflbitno;
MarkBufferDirty(metabuf);
}
+
+ /* release the previous buffer if it is not the same as the write buffer */
+ if (BufferIsValid(prevbuf) && prevblkno != writeblkno)
+ _hash_relbuf(rel, prevbuf);
+
+ if (BufferIsValid(ovflbuf))
+ _hash_relbuf(rel, ovflbuf);
+
+ if (BufferIsValid(nextbuf))
+ _hash_relbuf(rel, nextbuf);
+
+ _hash_relbuf(rel, mapbuf);
_hash_relbuf(rel, metabuf);
return nextblkno;
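
Taken together, the hunks above restructure _hash_freeovflpage into a read/lock phase followed by a single modify phase: every buffer is acquired first, every page is changed while all the locks are held, and only then is anything released. A condensed sketch of the new ordering, with details elided:

	/*
	 * 1. Read the overflow page's links; lock prevbuf/nextbuf as needed.
	 * 2. Lock the metapage; locate and lock the bitmap page.
	 * 3. Perform all modifications together:
	 *      _hash_pgaddmultitup(...)   -- move the tuples onto wbuf
	 *      _hash_pageinit(...)        -- reinitialize the freed page
	 *      update prevbuf/nextbuf chain links
	 *      CLRBIT(freep, bitmapbit)   -- mark the page free
	 *      update metap->hashm_firstfree if needed
	 * 4. Release all buffers.
	 */

Grouping the modifications this way is what later allows them to be covered by one WAL record; the new comment in _hash_squeezebucket below points at the same goal.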
@@ -640,7 +661,6 @@ _hash_squeezebucket(Relation rel,
Page rpage;
HashPageOpaque wopaque;
HashPageOpaque ropaque;
- bool wbuf_dirty;
/*
* start squeezing into the primary bucket page.
@@ -686,15 +706,21 @@ _hash_squeezebucket(Relation rel,
/*
* squeeze the tuples.
*/
- wbuf_dirty = false;
for (;;)
{
OffsetNumber roffnum;
OffsetNumber maxroffnum;
OffsetNumber deletable[MaxOffsetNumber];
- int ndeletable = 0;
+ IndexTuple itups[MaxIndexTuplesPerPage];
+ Size tups_size[MaxIndexTuplesPerPage];
+ OffsetNumber itup_offsets[MaxIndexTuplesPerPage];
+ uint16 ndeletable = 0;
+ uint16 nitups = 0;
+ Size all_tups_size = 0;
+ int i;
bool retain_pin = false;
+readpage:
/* Scan each tuple in "read" page */
maxroffnum = PageGetMaxOffsetNumber(rpage);
for (roffnum = FirstOffsetNumber;
@@ -715,11 +741,13 @@ _hash_squeezebucket(Relation rel,
/*
* Walk up the bucket chain, looking for a page big enough for
- * this item. Exit if we reach the read page.
+ * this item and all other accumulated items. Exit if we reach
+ * the read page.
*/
- while (PageGetFreeSpace(wpage) < itemsz)
+ while (PageGetFreeSpaceForMultipleTuples(wpage, nitups + 1) < (all_tups_size + itemsz))
{
Buffer next_wbuf = InvalidBuffer;
+ bool tups_moved = false;
Assert(!PageIsEmpty(wpage));
@@ -737,12 +765,30 @@ _hash_squeezebucket(Relation rel,
LH_OVERFLOW_PAGE,
bstrategy);
+ if (nitups > 0)
+ {
+ Assert(nitups == ndeletable);
+
+ /*
+ * We have to insert tuples on the "write" page, being
+ * careful to preserve hashkey ordering.  (If we insert
+ * many tuples into the same "write" page, it would be
+ * worth qsort'ing them.)
+ */
+ _hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups);
+ MarkBufferDirty(wbuf);
+
+ /* Delete tuples we already moved off read page */
+ PageIndexMultiDelete(rpage, deletable, ndeletable);
+ MarkBufferDirty(rbuf);
+
+ tups_moved = true;
+ }
+
/*
* release the lock on previous page after acquiring the lock
* on next page
*/
- if (wbuf_dirty)
- MarkBufferDirty(wbuf);
if (retain_pin)
LockBuffer(wbuf, BUFFER_LOCK_UNLOCK);
else
@@ -751,12 +797,6 @@ _hash_squeezebucket(Relation rel,
/* nothing more to do if we reached the read page */
if (rblkno == wblkno)
{
- if (ndeletable > 0)
- {
- /* Delete tuples we already moved off read page */
- PageIndexMultiDelete(rpage, deletable, ndeletable);
- MarkBufferDirty(rbuf);
- }
_hash_relbuf(rel, rbuf);
return;
}
@@ -765,21 +805,34 @@ _hash_squeezebucket(Relation rel,
wpage = BufferGetPage(wbuf);
wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
Assert(wopaque->hasho_bucket == bucket);
- wbuf_dirty = false;
retain_pin = false;
- }
- /*
- * we have found room so insert on the "write" page, being careful
- * to preserve hashkey ordering. (If we insert many tuples into
- * the same "write" page it would be worth qsort'ing instead of
- * doing repeated _hash_pgaddtup.)
- */
- (void) _hash_pgaddtup(rel, wbuf, itemsz, itup);
- wbuf_dirty = true;
+ /* be tidy */
+ for (i = 0; i < nitups; i++)
+ pfree(itups[i]);
+ nitups = 0;
+ all_tups_size = 0;
+ ndeletable = 0;
+
+ /*
+ * After moving the tuples, rpage has been compacted, so we
+ * need to rescan it.
+ */
+ if (tups_moved)
+ goto readpage;
+ }
/* remember tuple for deletion from "read" page */
deletable[ndeletable++] = roffnum;
+
+ /*
+ * We need copies of the index tuples, as the originals can be freed
+ * along with the overflow page; we still need them to write a WAL
+ * record in _hash_freeovflpage.
+ */
+ itups[nitups] = CopyIndexTuple(itup);
+ tups_size[nitups++] = itemsz;
+ all_tups_size += itemsz;
}
/*
@@ -797,10 +850,12 @@ _hash_squeezebucket(Relation rel,
Assert(BlockNumberIsValid(rblkno));
/* free this overflow page (releases rbuf) */
- _hash_freeovflpage(rel, rbuf, wbuf, bstrategy);
+ _hash_freeovflpage(rel, bucket_buf, rbuf, wbuf, itups, itup_offsets,
+ tups_size, nitups, bstrategy);
- if (wbuf_dirty)
- MarkBufferDirty(wbuf);
+ /* be tidy */
+ for (i = 0; i < nitups; i++)
+ pfree(itups[i]);
/* are we freeing the page adjacent to wbuf? */
if (rblkno == wblkno)
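
The net effect on _hash_squeezebucket: rather than moving tuples one at a time with _hash_pgaddtup, the loop now accumulates a batch (itups, deletable, all_tups_size) and flushes it whenever the write page changes, with _hash_freeovflpage absorbing the final batch. A condensed picture of the new control flow (pseudo-structure, not compilable):

	readpage:
		for each tuple on the read page:
			while the pending batch plus this tuple will not fit on wpage:
				lock the next write page
				if the batch is non-empty:
					_hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups)
					PageIndexMultiDelete(rpage, deletable, ndeletable)
				advance wbuf and reset the batch
				if tuples were just moved:
					goto readpage		/* rpage was compacted; rescan */
			append a copy of the tuple to the batch
		/* any batch still pending rides into _hash_freeovflpage() */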
diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c
index 9485978bfb0..00f3ea81a78 100644
--- a/src/backend/access/hash/hashpage.c
+++ b/src/backend/access/hash/hashpage.c
@@ -470,7 +470,6 @@ _hash_metapinit(Relation rel, double num_tuples, ForkNumber forkNum)
void
_hash_pageinit(Page page, Size size)
{
- Assert(PageIsNew(page));
PageInit(page, size, sizeof(HashPageOpaqueData));
}
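
The Assert removal follows from the hashovfl.c change above: _hash_freeovflpage no longer zeroes the freed overflow page via MemSet but calls _hash_pageinit on it directly, so the page still holds old data at that call site and the test the Assert relied on no longer passes:

	/* from bufpage.h: a page is "new" only if it was never initialized */
	#define PageIsNew(page) (((PageHeader) (page))->pd_upper == 0)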
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index 6fc5fa4d05b..fdf045a45b0 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -598,6 +598,33 @@ PageGetFreeSpace(Page page)
}
/*
+ * PageGetFreeSpaceForMultipleTuples
+ * Returns the size of the free (allocatable) space on a page,
+ * reduced by the space needed for multiple new line pointers.
+ *
+ * Note: this should usually only be used on index pages. Use
+ * PageGetHeapFreeSpace on heap pages.
+ */
+Size
+PageGetFreeSpaceForMultipleTuples(Page page, int ntups)
+{
+ int space;
+
+ /*
+ * Use signed arithmetic here so that we behave sensibly if pd_lower >
+ * pd_upper.
+ */
+ space = (int) ((PageHeader) page)->pd_upper -
+ (int) ((PageHeader) page)->pd_lower;
+
+ if (space < (int) (ntups * sizeof(ItemIdData)))
+ return 0;
+ space -= ntups * sizeof(ItemIdData);
+
+ return (Size) space;
+}
+
+/*
* PageGetExactFreeSpace
* Returns the size of the free (allocatable) space on a page,
* without any consideration for adding/removing line pointers.
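
The new helper differs from PageGetFreeSpace only in charging for ntups line pointers instead of one. A sketch of the intended usage, mirroring the test added to _hash_squeezebucket above (tuples_fit is a hypothetical wrapper, not part of the patch):

	/*
	 * Will "ntups" new tuples with a combined data size of "tups_size"
	 * bytes fit on the page?  Free space is taken net of the ItemIdData
	 * line pointers the new tuples will consume.
	 */
	static bool
	tuples_fit(Page page, int ntups, Size tups_size)
	{
		return PageGetFreeSpaceForMultipleTuples(page, ntups) >= tups_size;
	}

_hash_squeezebucket performs this test with ntups = nitups + 1 and tups_size = all_tups_size + itemsz, costing the pending batch and the candidate tuple together.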
diff --git a/src/include/access/hash.h b/src/include/access/hash.h
index 3bf587b1b70..5767deb0295 100644
--- a/src/include/access/hash.h
+++ b/src/include/access/hash.h
@@ -303,11 +303,14 @@ extern Datum hash_uint32(uint32 k);
extern void _hash_doinsert(Relation rel, IndexTuple itup);
extern OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
Size itemsize, IndexTuple itup);
+extern void _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
+ OffsetNumber *itup_offsets, uint16 nitups);
/* hashovfl.c */
extern Buffer _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf, bool retain_pin);
-extern BlockNumber _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
- BufferAccessStrategy bstrategy);
+extern BlockNumber _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
+ Buffer wbuf, IndexTuple *itups, OffsetNumber *itup_offsets,
+ Size *tups_size, uint16 nitups, BufferAccessStrategy bstrategy);
extern void _hash_initbitmap(Relation rel, HashMetaPage metap,
BlockNumber blkno, ForkNumber forkNum);
extern void _hash_squeezebucket(Relation rel,
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 294f9cb85ac..e956dc33860 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -425,6 +425,7 @@ extern Page PageGetTempPageCopySpecial(Page page);
extern void PageRestoreTempPage(Page tempPage, Page oldPage);
extern void PageRepairFragmentation(Page page);
extern Size PageGetFreeSpace(Page page);
+extern Size PageGetFreeSpaceForMultipleTuples(Page page, int ntups);
extern Size PageGetExactFreeSpace(Page page);
extern Size PageGetHeapFreeSpace(Page page);
extern void PageIndexTupleDelete(Page page, OffsetNumber offset);