aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/gist
diff options
context:
space:
mode:
authorTeodor Sigaev <teodor@sigaev.ru>2006-05-19 11:10:25 +0000
committerTeodor Sigaev <teodor@sigaev.ru>2006-05-19 11:10:25 +0000
commit5890790b4a510ada1e6e00eb01de759f1dbe9ab3 (patch)
tree8c772cbed66f78657f58489151203243d2802439 /src/backend/access/gist
parent19892feb3c86eda36769f19b8ad868cf40d9a10f (diff)
downloadpostgresql-5890790b4a510ada1e6e00eb01de759f1dbe9ab3.tar.gz
postgresql-5890790b4a510ada1e6e00eb01de759f1dbe9ab3.zip
Rework completion of incomplete inserts. Now it writes
WAL log during inserts.
Diffstat (limited to 'src/backend/access/gist')
-rw-r--r--src/backend/access/gist/gistvacuum.c20
-rw-r--r--src/backend/access/gist/gistxlog.c270
2 files changed, 184 insertions, 106 deletions
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index 9b32304d1ae..a47d81db78e 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.21 2006/05/17 16:34:59 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.22 2006/05/19 11:10:25 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@@ -104,19 +104,25 @@ gistDeleteSubtree( GistVacuum *gv, BlockNumber blkno ) {
if (!gv->index->rd_istemp)
{
- XLogRecData rdata;
+ XLogRecData rdata[2];
XLogRecPtr recptr;
gistxlogPageDelete xlrec;
xlrec.node = gv->index->rd_node;
xlrec.blkno = blkno;
- rdata.buffer = InvalidBuffer;
- rdata.data = (char *) &xlrec;
- rdata.len = sizeof(gistxlogPageDelete);
- rdata.next = NULL;
+ rdata[0].buffer = buffer;
+ rdata[0].buffer_std = true;
+ rdata[0].data = NULL;
+ rdata[0].len = 0;
+ rdata[0].next = &(rdata[1]);
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, &rdata);
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *) &xlrec;
+ rdata[1].len = sizeof(gistxlogPageDelete);
+ rdata[1].next = NULL;
+
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
}
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 01dab119b2e..1126727cd97 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.17 2006/05/17 16:34:59 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.18 2006/05/19 11:10:25 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
@@ -73,8 +73,18 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
BlockNumber *blkno, int lenblk,
PageSplitRecord *xlinfo /* to extract blkno info */ )
{
- MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
- gistIncompleteInsert *ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
+ MemoryContext oldCxt;
+ gistIncompleteInsert *ninsert;
+
+ if ( !ItemPointerIsValid(&key) )
+ /*
+ * if key is invalid then we should not store the insertion as incomplete,
+ * because it's a vacuum operation.
+ */
+ return;
+
+ oldCxt = MemoryContextSwitchTo(insertCtx);
+ ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
ninsert->node = node;
ninsert->key = key;
@@ -115,6 +125,12 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
{
ListCell *l;
+ if ( !ItemPointerIsValid(&key) )
+ return;
+
+ if (incomplete_inserts==NIL)
+ return;
+
foreach(l, incomplete_inserts)
{
gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
@@ -180,16 +196,13 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
Page page;
/* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */
- if (ItemPointerIsValid(&(xldata->key)))
- {
- if (incomplete_inserts != NIL)
- forgetIncompleteInsert(xldata->node, xldata->key);
+ forgetIncompleteInsert(xldata->node, xldata->key);
- if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
- pushIncompleteInsert(xldata->node, lsn, xldata->key,
- &(xldata->blkno), 1,
- NULL);
- }
+ if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
+ /* operation with root always finalizes insertion */
+ pushIncompleteInsert(xldata->node, lsn, xldata->key,
+ &(xldata->blkno), 1,
+ NULL);
/* nothing else to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1)
@@ -252,12 +265,15 @@ gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
+ /* nothing else to do if page was backed up (and no info to do it with) */
+ if (record->xl_info & XLR_BKP_BLOCK_1)
+ return;
+
reln = XLogOpenRelation(xldata->node);
buffer = XLogReadBuffer(reln, xldata->blkno, false);
if (!BufferIsValid(buffer))
return;
- GISTInitBuffer( buffer, 0 );
page = (Page) BufferGetPage(buffer);
GistPageSetDeleted(page);
@@ -333,15 +349,11 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer);
}
- if (ItemPointerIsValid(&(xlrec.data->key)))
- {
- if (incomplete_inserts != NIL)
- forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
+ forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
- pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
- NULL, 0,
- &xlrec);
- }
+ pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
+ NULL, 0,
+ &xlrec);
}
static void
@@ -536,7 +548,43 @@ gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
insert->path[i++] = ptr->blkno;
}
else
- elog(LOG, "lost parent for block %u", insert->origblkno);
+ elog(ERROR, "lost parent for block %u", insert->origblkno);
+}
+
+static SplitedPageLayout*
+gistMakePageLayout(Buffer *buffers, int nbuffers) {
+ SplitedPageLayout *res=NULL, *resptr;
+
+ while( nbuffers-- > 0 ) {
+ Page page = BufferGetPage( buffers[ nbuffers ] );
+ IndexTuple idxtup;
+ OffsetNumber i;
+ char *ptr;
+
+ resptr = (SplitedPageLayout*)palloc0( sizeof(SplitedPageLayout) );
+
+ resptr->block.blkno = BufferGetBlockNumber( buffers[ nbuffers ] );
+ resptr->block.num = PageGetMaxOffsetNumber( page );
+
+ for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
+ idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+ resptr->lenlist += IndexTupleSize(idxtup);
+ }
+
+ resptr->list = (IndexTupleData*)palloc( resptr->lenlist );
+ ptr = (char*)(resptr->list);
+
+ for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
+ idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+ memcpy( ptr, idxtup, IndexTupleSize(idxtup) );
+ ptr += IndexTupleSize(idxtup);
+ }
+
+ resptr->next = res;
+ res = resptr;
+ }
+
+ return res;
}
/*
@@ -548,11 +596,11 @@ gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
* Note that we assume the index is now in a valid state, except for the
* unfinished insertion. In particular it's safe to invoke gistFindPath();
* there shouldn't be any garbage pages for it to run into.
- *
- * Although stored LSN in gistIncompleteInsert is a LSN of child page,
- * we can compare it with LSN of parent, because parent is always locked
- * while we change child page (look at gistmakedeal). So if parent's LSN is
- * less than stored lsn then changes in parent aren't done yet.
+ *
+ * To complete an insert we can't use the basic insertion algorithm, because
+ * during insertion we can't call user-defined support functions of the opclass.
+ * So we insert 'invalid' tuples without a real key, using a separate algorithm.
+ * An 'invalid' tuple should be updated by vacuum full.
*/
static void
gistContinueInsert(gistIncompleteInsert *insert)
@@ -574,39 +622,27 @@ gistContinueInsert(gistIncompleteInsert *insert)
for (i = 0; i < insert->lenblk; i++)
itup[i] = gist_form_invalid_tuple(insert->blkno[i]);
+ /*
+ * any insertion of itup[] should produce a LOG message about it
+ */
+
if (insert->origblkno == GIST_ROOT_BLKNO)
{
/*
* it was split root, so we should only make new root. it can't be
- * simple insert into root, look at call pushIncompleteInsert in
- * gistRedoPageSplitRecord
+ * simple insert into root, we should replace all content of root.
*/
Buffer buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true);
- Page page;
-
- Assert(BufferIsValid(buffer));
- page = BufferGetPage(buffer);
- GISTInitBuffer(buffer, 0);
- gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
-
- PageSetLSN(page, insert->lsn);
- PageSetTLI(page, ThisTimeLineID);
-
- MarkBufferDirty(buffer);
+ gistnewroot(index, buffer, itup, lenitup, NULL);
UnlockReleaseBuffer(buffer);
-
- /*
- * XXX fall out to avoid making LOG message at bottom of routine.
- * I think the logic for when to emit that message is all wrong...
- */
- return;
}
else
{
Buffer *buffers;
Page *pages;
int numbuffer;
+ OffsetNumber *todelete;
/* construct path */
gistxlogFindPath(index, insert);
@@ -615,49 +651,60 @@ gistContinueInsert(gistIncompleteInsert *insert)
buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
+ todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ ));
for (i = 0; i < insert->pathlen; i++)
{
int j,
k,
- pituplen = 0,
- childfound = 0;
+ pituplen = 0;
+ XLogRecData *rdata;
+ XLogRecPtr recptr;
+ Buffer tempbuffer = InvalidBuffer;
+ int ntodelete = 0;
numbuffer = 1;
- buffers[numbuffer - 1] = ReadBuffer(index, insert->path[i]);
- LockBuffer(buffers[numbuffer - 1], GIST_EXCLUSIVE);
- pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]);
+ buffers[0] = ReadBuffer(index, insert->path[i]);
+ LockBuffer(buffers[0], GIST_EXCLUSIVE);
+ /*
+ * we check buffer, because we restored page earlier
+ */
+ gistcheckpage(index, buffers[0]);
- if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1])))
- {
- UnlockReleaseBuffer(buffers[numbuffer - 1]);
- return;
- }
+ pages[0] = BufferGetPage(buffers[0]);
+ Assert( !GistPageIsLeaf(pages[0]) );
- pituplen = PageGetMaxOffsetNumber(pages[numbuffer - 1]);
+ pituplen = PageGetMaxOffsetNumber(pages[0]);
- /* remove old IndexTuples */
- for (j = 0; j < pituplen && childfound < lenitup; j++)
+ /* find old IndexTuples to remove */
+ for (j = 0; j < pituplen && ntodelete < lenitup; j++)
{
BlockNumber blkno;
- ItemId iid = PageGetItemId(pages[numbuffer - 1], j + FirstOffsetNumber);
- IndexTuple idxtup = (IndexTuple) PageGetItem(pages[numbuffer - 1], iid);
+ ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber);
+ IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid);
blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));
for (k = 0; k < lenitup; k++)
if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
{
- PageIndexTupleDelete(pages[numbuffer - 1], j + FirstOffsetNumber);
- j--;
- pituplen--;
- childfound++;
+ todelete[ntodelete] = j + FirstOffsetNumber - ntodelete;
+ ntodelete++;
break;
}
}
- if (gistnospace(pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber))
+ if ( ntodelete == 0 )
+ elog(PANIC,"gistContinueInsert: can't find pointer to page(s)");
+
+ /*
+ * we check space subtracting only the first tuple to delete; hopefully
+ * that will be enough space.
+ */
+
+ if (gistnospace(pages[0], itup, lenitup, *todelete))
{
+
/* no space left on page, so we must split */
buffers[numbuffer] = ReadBuffer(index, P_NEW);
LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
@@ -668,62 +715,86 @@ gistContinueInsert(gistIncompleteInsert *insert)
if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
{
- IndexTuple *parentitup;
+ Buffer tmp;
/*
- * we split root, just copy tuples from old root to new
- * page
+ * we split root, just copy content from root to new page
*/
- parentitup = gistextractpage(pages[numbuffer - 1],
- &pituplen);
/* sanity check */
if (i + 1 != insert->pathlen)
elog(PANIC, "unexpected pathlen in index \"%s\"",
RelationGetRelationName(index));
- /* fill new page */
- buffers[numbuffer] = ReadBuffer(index, P_NEW);
- LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
- GISTInitBuffer(buffers[numbuffer], 0);
- pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
- gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
- numbuffer++;
-
- /* fill root page */
- GISTInitBuffer(buffers[0], 0);
- for (j = 1; j < numbuffer; j++)
- {
- IndexTuple tuple = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
-
- if (PageAddItem(pages[0],
- (Item) tuple,
- IndexTupleSize(tuple),
- (OffsetNumber) j,
- LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "failed to add item to index page in \"%s\"",
- RelationGetRelationName(index));
- }
+ /* fill new page, root will be changed later */
+ tempbuffer = ReadBuffer(index, P_NEW);
+ LockBuffer(tempbuffer, GIST_EXCLUSIVE);
+ memcpy( BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer) );
+
+ /* swap buffers[0] (was root) and temp buffer */
+ tmp = buffers[0];
+ buffers[0] = tempbuffer;
+ tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO, it is still unchanged */
+
+ pages[0] = BufferGetPage(buffers[0]);
}
+
+ START_CRIT_SECTION();
+
+ for(j=0;j<ntodelete;j++)
+ PageIndexTupleDelete(pages[0], todelete[j]);
+
+ rdata = formSplitRdata(index->rd_node, insert->path[i],
+ false, &(insert->key),
+ gistMakePageLayout( buffers, numbuffer ) );
+
+ } else {
+ START_CRIT_SECTION();
+
+ for(j=0;j<ntodelete;j++)
+ PageIndexTupleDelete(pages[0], todelete[j]);
+ gistfillbuffer(index, pages[0], itup, lenitup, InvalidOffsetNumber);
+
+ rdata = formUpdateRdata(index->rd_node, buffers[0],
+ todelete, ntodelete,
+ itup, lenitup, &(insert->key));
}
- else
- gistfillbuffer(index, pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber);
- lenitup = numbuffer;
+ /*
+ * use insert->key as mark for completion of insert (form*Rdata() above)
+ * for following possible replays
+ */
+
+ /* write pages with XLOG LSN */
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
for (j = 0; j < numbuffer; j++)
{
- itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
- PageSetLSN(pages[j], insert->lsn);
+ PageSetLSN(pages[j], recptr);
PageSetTLI(pages[j], ThisTimeLineID);
GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
MarkBufferDirty(buffers[j]);
+ }
+
+ END_CRIT_SECTION();
+
+ lenitup = numbuffer;
+ for (j = 0; j < numbuffer; j++) {
+ itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
UnlockReleaseBuffer(buffers[j]);
}
+
+ if ( tempbuffer != InvalidBuffer ) {
+ /*
+ * it was a root split, so fill it by new values
+ */
+ gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key));
+ UnlockReleaseBuffer(tempbuffer);
+ }
}
}
ereport(LOG,
- (errmsg("index %u/%u/%u needs VACUUM or REINDEX to finish crash recovery",
+ (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
errdetail("Incomplete insertion detected during crash replay.")));
}
@@ -747,6 +818,7 @@ gist_xlog_cleanup(void)
MemoryContext oldCxt;
oldCxt = MemoryContextSwitchTo(opCtx);
+
foreach(l, incomplete_inserts)
{
gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);