about summary refs log tree commit diff
path: root/src/backend/access/gin/ginxlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/gin/ginxlog.c')
-rw-r--r-- src/backend/access/gin/ginxlog.c 544
1 files changed, 544 insertions, 0 deletions
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
new file mode 100644
index 00000000000..bc6a458e5fc
--- /dev/null
+++ b/src/backend/access/gin/ginxlog.c
@@ -0,0 +1,544 @@
+/*-------------------------------------------------------------------------
+ *
+ * ginxlog.c
+ * WAL replay logic for inverted index.
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginxlog.c,v 1.1 2006/05/02 11:28:54 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gin.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "utils/memutils.h"
+
+/*
+ * opCtx is created in gin_xlog_startup(), made the current context for the
+ * duration of each gin_redo() call, and reset once the record is replayed.
+ */
+static MemoryContext opCtx; /* working memory for operations */
+/*
+ * Context that was current when gin_redo() was entered. pushIncompleteSplit()
+ * allocates list entries here so that they survive the reset of opCtx.
+ */
+static MemoryContext topCtx;
+
+/*
+ * Bookkeeping entry for a non-root page split replayed by ginRedoSplit().
+ * The entry is dropped by forgetIncompleteSplit() when a later record with a
+ * matching left/right block pair is replayed; entries still present at the
+ * end of recovery are handed to ginContinueSplit().
+ */
+typedef struct ginIncompleteSplit {
+ RelFileNode node; /* relation the split pages belong to */
+ BlockNumber leftBlkno; /* left page of the split */
+ BlockNumber rightBlkno; /* right page of the split */
+ BlockNumber rootBlkno; /* root of the tree; GIN_ROOT_BLKNO selects the entry tree */
+} ginIncompleteSplit;
+
+/*
+ * List of ginIncompleteSplit entries: appended by pushIncompleteSplit(),
+ * removed by forgetIncompleteSplit(), walked by gin_xlog_cleanup().
+ */
+static List *incomplete_splits;
+
+/*
+ * Record a page split whose completion has not been seen yet.
+ *
+ * The entry is allocated in topCtx rather than opCtx, because opCtx is reset
+ * at the end of every gin_redo() call while the list has to persist across
+ * records. On exit the current context is set back to opCtx, i.e. this
+ * function expects to be called from within gin_redo().
+ */
+static void
+pushIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber rightBlkno, BlockNumber rootBlkno)
+{
+	ginIncompleteSplit *entry;
+
+	MemoryContextSwitchTo(topCtx);
+
+	entry = (ginIncompleteSplit *) palloc(sizeof(ginIncompleteSplit));
+	entry->node = node;
+	entry->rootBlkno = rootBlkno;
+	entry->leftBlkno = leftBlkno;
+	entry->rightBlkno = rightBlkno;
+
+	/* the list cell is allocated in topCtx as well */
+	incomplete_splits = lappend(incomplete_splits, entry);
+
+	MemoryContextSwitchTo(opCtx);
+}
+
+/*
+ * Drop the remembered split, if any, that matches the given relation and
+ * left/right block pair.
+ *
+ * node        - relation the pages belong to
+ * leftBlkno   - left page of the split
+ * updateBlkno - right page of the split (compared against rightBlkno)
+ *
+ * At most one entry is removed. It is a no-op when nothing matches.
+ */
+static void
+forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
+{
+	ListCell   *l;
+
+	foreach(l, incomplete_splits)
+	{
+		ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
+
+		if (RelFileNodeEquals(node, split->node) &&
+			leftBlkno == split->leftBlkno &&
+			updateBlkno == split->rightBlkno)
+		{
+			incomplete_splits = list_delete_ptr(incomplete_splits, split);
+
+			/*
+			 * The entry was palloc'd in topCtx by pushIncompleteSplit() and
+			 * that context is not reset per record, so release it explicitly
+			 * instead of leaking it for the rest of recovery.
+			 */
+			pfree(split);
+
+			/* the list was modified, so stop iterating */
+			break;
+		}
+	}
+}
+
+/*
+ * Replay XLOG_GIN_CREATE_INDEX: (re)initialize the root block of the index
+ * as an empty leaf page. The record payload is just the RelFileNode.
+ */
+static void
+ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
+{
+	RelFileNode *rnode = (RelFileNode *) XLogRecGetData(record);
+	Relation	index = XLogOpenRelation(*rnode);
+	Buffer		rootBuf;
+	Page		rootPage;
+
+	/* init = true: the page contents are rebuilt from scratch */
+	rootBuf = XLogReadBuffer(index, GIN_ROOT_BLKNO, true);
+	Assert(BufferIsValid(rootBuf));
+	rootPage = (Page) BufferGetPage(rootBuf);
+
+	GinInitBuffer(rootBuf, GIN_LEAF);
+
+	PageSetLSN(rootPage, lsn);
+	PageSetTLI(rootPage, ThisTimeLineID);
+
+	MarkBufferDirty(rootBuf);
+	UnlockReleaseBuffer(rootBuf);
+}
+
+/*
+ * Replay XLOG_GIN_CREATE_PTREE: initialize a data leaf page and fill it with
+ * the item pointers that follow the ginxlogCreatePostingTree header in the
+ * record.
+ */
+static void
+ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
+{
+	ginxlogCreatePostingTree *xlrec = (ginxlogCreatePostingTree *) XLogRecGetData(record);
+	/* the item array starts right behind the fixed-size header */
+	ItemPointerData *tids = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
+	Relation	index = XLogOpenRelation(xlrec->node);
+	Buffer		buf;
+	Page		pg;
+
+	buf = XLogReadBuffer(index, xlrec->blkno, true);
+	Assert(BufferIsValid(buf));
+	pg = (Page) BufferGetPage(buf);
+
+	GinInitBuffer(buf, GIN_DATA | GIN_LEAF);
+	memcpy(GinDataPageGetData(pg), tids, sizeof(ItemPointerData) * xlrec->nitem);
+	GinPageGetOpaque(pg)->maxoff = xlrec->nitem;
+
+	PageSetLSN(pg, lsn);
+	PageSetTLI(pg, ThisTimeLineID);
+
+	MarkBufferDirty(buf);
+	UnlockReleaseBuffer(buf);
+}
+
+/*
+ * Replay XLOG_GIN_INSERT: insert the item(s) that follow the ginxlogInsert
+ * header into the page data->blkno.
+ *
+ * Three page kinds are handled: data leaf pages (array of ItemPointerData),
+ * data internal pages (a single PostingItem) and entry-tree pages (a single
+ * IndexTuple). When data->updateBlkno is valid, the existing item at
+ * data->offset is first redirected to updateBlkno and, for non-leaf pages,
+ * the matching remembered split is forgotten.
+ *
+ * NOTE(review): when the page was backed up we return before reaching the
+ * forgetIncompleteSplit() calls below, so a split completed by such a record
+ * stays in incomplete_splits. Confirm whether the record payload is available
+ * in that case before moving the bookkeeping ahead of the early return.
+ */
+static void
+ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) {
+ ginxlogInsert *data = (ginxlogInsert*)XLogRecGetData(record);
+ Relation reln;
+ Buffer buffer;
+ Page page;
+
+ /* nothing else to do if page was backed up (and no info to do it with) */
+ if (record->xl_info & XLR_BKP_BLOCK_1)
+ return;
+
+ reln = XLogOpenRelation(data->node);
+ buffer = XLogReadBuffer(reln, data->blkno, false);
+ Assert(BufferIsValid(buffer));
+ page = (Page) BufferGetPage(buffer);
+
+ if ( data->isData ) {
+ /* posting-tree (data) page: deletions are never logged through this record */
+ Assert( data->isDelete == FALSE );
+ Assert( GinPageIsData( page ) );
+
+ if ( data->isLeaf ) {
+ OffsetNumber i;
+ /* payload is an array of data->nitem item pointers */
+ ItemPointerData *items = (ItemPointerData*)( XLogRecGetData(record) + sizeof(ginxlogInsert) );
+
+ Assert( GinPageIsLeaf( page ) );
+ Assert( data->updateBlkno == InvalidBlockNumber );
+
+ /* items go to consecutive offsets starting at data->offset */
+ for(i=0;i<data->nitem;i++)
+ GinDataPageAddItem( page, items+i, data->offset + i );
+ } else {
+ PostingItem *pitem;
+
+ Assert( !GinPageIsLeaf( page ) );
+
+ if ( data->updateBlkno != InvalidBlockNumber ) {
+ /* update link to right page after split */
+ pitem = (PostingItem*)GinDataPageGetItem(page, data->offset);
+ PostingItemSetBlockNumber( pitem, data->updateBlkno );
+ }
+
+ /* payload is a single PostingItem, inserted at data->offset */
+ pitem = (PostingItem*)( XLogRecGetData(record) + sizeof(ginxlogInsert) );
+
+ GinDataPageAddItem( page, pitem, data->offset );
+
+ /* the inserted item points at the left page of the split being completed */
+ if ( data->updateBlkno != InvalidBlockNumber )
+ forgetIncompleteSplit(data->node, PostingItemGetBlockNumber( pitem ), data->updateBlkno);
+ }
+ } else {
+ IndexTuple itup;
+
+ /* entry-tree page: payload is a single IndexTuple */
+ Assert( !GinPageIsData( page ) );
+
+ if ( data->updateBlkno != InvalidBlockNumber ) {
+ /* update link to right page after split */
+ Assert( !GinPageIsLeaf( page ) );
+ Assert( data->offset>=FirstOffsetNumber && data->offset<=PageGetMaxOffsetNumber(page) );
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
+ ItemPointerSet(&itup->t_tid, data->updateBlkno, InvalidOffsetNumber);
+ }
+
+ if ( data->isDelete ) {
+ /* replace: remove the old tuple at data->offset before adding the new one */
+ Assert( GinPageIsLeaf( page ) );
+ Assert( data->offset>=FirstOffsetNumber && data->offset<=PageGetMaxOffsetNumber(page) );
+ PageIndexTupleDelete(page, data->offset);
+ }
+
+ itup = (IndexTuple)( XLogRecGetData(record) + sizeof(ginxlogInsert) );
+
+ if ( PageAddItem( page, (Item)itup, IndexTupleSize(itup), data->offset, LP_USED) == InvalidOffsetNumber )
+ elog(ERROR, "failed to add item to index page in %u/%u/%u",
+ data->node.spcNode, data->node.dbNode, data->node.relNode );
+
+ /* the inserted tuple points at the left page of the split being completed */
+ if ( !data->isLeaf && data->updateBlkno != InvalidBlockNumber )
+ forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber( &itup->t_tid ), data->updateBlkno);
+ }
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * Replay XLOG_GIN_SPLIT: rebuild both halves of a split page from the items
+ * that follow the ginxlogSplit header.
+ *
+ * The first data->separator items go to the left page, the remaining
+ * (data->nitem - data->separator) items go to the right page. For a root
+ * split the root page is additionally re-initialized as an internal page and
+ * filled with links to the two halves; otherwise the split is remembered via
+ * pushIncompleteSplit() until the record that inserts the parent link is
+ * replayed.
+ */
+static void
+ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
+{
+	ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
+	Relation	reln;
+	Buffer		lbuffer,
+				rbuffer;
+	Page		lpage,
+				rpage;
+	uint32		flags = 0;
+
+	reln = XLogOpenRelation(data->node);
+
+	if (data->isLeaf)
+		flags |= GIN_LEAF;
+	if (data->isData)
+		flags |= GIN_DATA;
+
+	/* both halves are rebuilt from scratch, so (re)initialize them */
+	lbuffer = XLogReadBuffer(reln, data->lblkno, data->isRootSplit);
+	Assert(BufferIsValid(lbuffer));
+	lpage = (Page) BufferGetPage(lbuffer);
+	GinInitBuffer(lbuffer, flags);
+
+	rbuffer = XLogReadBuffer(reln, data->rblkno, true);
+	Assert(BufferIsValid(rbuffer));
+	rpage = (Page) BufferGetPage(rbuffer);
+	GinInitBuffer(rbuffer, flags);
+
+	GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer);
+	GinPageGetOpaque(rpage)->rightlink = data->rrlink;
+
+	if (data->isData)
+	{
+		/* posting-tree page: payload is an array of fixed-size items */
+		char	   *ptr = XLogRecGetData(record) + sizeof(ginxlogSplit);
+		Size		sizeofitem = GinSizeOfItem(lpage);
+		OffsetNumber i;
+		ItemPointer bound;
+
+		for (i = 0; i < data->separator; i++)
+		{
+			GinDataPageAddItem(lpage, ptr, InvalidOffsetNumber);
+			ptr += sizeofitem;
+		}
+
+		for (i = data->separator; i < data->nitem; i++)
+		{
+			GinDataPageAddItem(rpage, ptr, InvalidOffsetNumber);
+			ptr += sizeofitem;
+		}
+
+		/* set up right key: the left bound is the key of its last item */
+		bound = GinDataPageGetRightBound(lpage);
+		if (data->isLeaf)
+			*bound = *(ItemPointerData *) GinDataPageGetItem(lpage, GinPageGetOpaque(lpage)->maxoff);
+		else
+			*bound = ((PostingItem *) GinDataPageGetItem(lpage, GinPageGetOpaque(lpage)->maxoff))->key;
+
+		bound = GinDataPageGetRightBound(rpage);
+		*bound = data->rightbound;
+	}
+	else
+	{
+		/* entry-tree page: payload is a sequence of MAXALIGNed index tuples */
+		IndexTuple	itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogSplit));
+		OffsetNumber i;
+
+		for (i = 0; i < data->separator; i++)
+		{
+			if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, LP_USED) == InvalidOffsetNumber)
+				elog(ERROR, "failed to add item to index page in %u/%u/%u",
+					 data->node.spcNode, data->node.dbNode, data->node.relNode);
+			itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+		}
+
+		for (i = data->separator; i < data->nitem; i++)
+		{
+			if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, LP_USED) == InvalidOffsetNumber)
+				elog(ERROR, "failed to add item to index page in %u/%u/%u",
+					 data->node.spcNode, data->node.dbNode, data->node.relNode);
+			itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+		}
+	}
+
+	/*
+	 * Stamp both halves. The timeline ID must be set on the right page
+	 * itself; stamping lpage twice would leave rpage without a TLI.
+	 */
+	PageSetLSN(rpage, lsn);
+	PageSetTLI(rpage, ThisTimeLineID);
+	MarkBufferDirty(rbuffer);
+
+	PageSetLSN(lpage, lsn);
+	PageSetTLI(lpage, ThisTimeLineID);
+	MarkBufferDirty(lbuffer);
+
+	/* splitting an internal page may itself complete a child's split */
+	if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+		forgetIncompleteSplit(data->node, data->leftChildBlkno, data->updateBlkno);
+
+	if (data->isRootSplit)
+	{
+		Buffer		rootBuf = XLogReadBuffer(reln, data->rootBlkno, false);
+		Page		rootPage = BufferGetPage(rootBuf);
+
+		/* the new root is never a leaf */
+		GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
+
+		if (data->isData)
+		{
+			Assert(data->rootBlkno != GIN_ROOT_BLKNO);
+			dataFillRoot(NULL, rootBuf, lbuffer, rbuffer);
+		}
+		else
+		{
+			Assert(data->rootBlkno == GIN_ROOT_BLKNO);
+			entryFillRoot(NULL, rootBuf, lbuffer, rbuffer);
+		}
+
+		PageSetLSN(rootPage, lsn);
+		PageSetTLI(rootPage, ThisTimeLineID);
+
+		MarkBufferDirty(rootBuf);
+		UnlockReleaseBuffer(rootBuf);
+	}
+	else
+		pushIncompleteSplit(data->node, data->lblkno, data->rblkno, data->rootBlkno);
+
+	UnlockReleaseBuffer(rbuffer);
+	UnlockReleaseBuffer(lbuffer);
+}
+
+/*
+ * Replay XLOG_GIN_VACUUM_PAGE: replace the whole contents of a page with the
+ * data->nitem items that follow the ginxlogVacuumPage header.
+ *
+ * Data pages are overwritten with one memcpy. On entry-tree pages all
+ * existing tuples are deleted and the tuples from the record are appended.
+ */
+static void
+ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
+{
+	ginxlogVacuumPage *xlrec = (ginxlogVacuumPage *) XLogRecGetData(record);
+	Relation	index;
+	Buffer		buf;
+	Page		pg;
+
+	/* nothing else to do if page was backed up (and no info to do it with) */
+	if (record->xl_info & XLR_BKP_BLOCK_1)
+		return;
+
+	index = XLogOpenRelation(xlrec->node);
+	buf = XLogReadBuffer(index, xlrec->blkno, false);
+	Assert(BufferIsValid(buf));
+	pg = (Page) BufferGetPage(buf);
+
+	if (!GinPageIsData(pg))
+	{
+		IndexTuple	tuple = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
+		OffsetNumber oldCount = PageGetMaxOffsetNumber(pg);
+		OffsetNumber *victims;
+		OffsetNumber off;
+		OffsetNumber n;
+
+		/* build the list 1..oldCount so that every existing tuple is removed */
+		victims = (OffsetNumber *) palloc(sizeof(OffsetNumber) * oldCount);
+		for (off = FirstOffsetNumber; off <= oldCount; off++)
+			victims[off - 1] = off;
+
+		PageIndexMultiDelete(pg, victims, oldCount);
+
+		/* tuples are stored back to back at MAXALIGN boundaries */
+		for (n = 0; n < xlrec->nitem; n++)
+		{
+			if (PageAddItem(pg, (Item) tuple, IndexTupleSize(tuple), InvalidOffsetNumber, LP_USED) == InvalidOffsetNumber)
+				elog(ERROR, "failed to add item to index page in %u/%u/%u",
+					 xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode);
+			tuple = (IndexTuple) (((char *) tuple) + MAXALIGN(IndexTupleSize(tuple)));
+		}
+	}
+	else
+	{
+		memcpy(GinDataPageGetData(pg),
+			   XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
+			   GinSizeOfItem(pg) * xlrec->nitem);
+		GinPageGetOpaque(pg)->maxoff = xlrec->nitem;
+	}
+
+	PageSetLSN(pg, lsn);
+	PageSetTLI(pg, ThisTimeLineID);
+
+	MarkBufferDirty(buf);
+	UnlockReleaseBuffer(buf);
+}
+
+/*
+ * Replay XLOG_GIN_DELETE_PAGE. Up to three pages are touched, each guarded
+ * by its own backup-block flag:
+ *
+ *	block 1: the page being deleted (data->blkno) is flagged GIN_DELETED
+ *	block 2: the parent (data->parentBlkno) loses the posting item at
+ *			 data->parentOffset
+ *	block 3: the left sibling (data->leftBlkno), if there is one, gets its
+ *			 rightlink set to data->rightLink
+ *
+ * A page whose backup-block flag is set is skipped here.
+ */
+static void
+ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
+{
+	ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
+	Relation	reln;
+	Buffer		buffer;
+	Page		page;
+
+	reln = XLogOpenRelation(data->node);
+
+	if (!(record->xl_info & XLR_BKP_BLOCK_1))
+	{
+		buffer = XLogReadBuffer(reln, data->blkno, false);
+		page = BufferGetPage(buffer);
+		Assert(GinPageIsData(page));
+		GinPageGetOpaque(page)->flags = GIN_DELETED;
+		PageSetLSN(page, lsn);
+		PageSetTLI(page, ThisTimeLineID);
+		MarkBufferDirty(buffer);
+		UnlockReleaseBuffer(buffer);
+	}
+
+	if (!(record->xl_info & XLR_BKP_BLOCK_2))
+	{
+		buffer = XLogReadBuffer(reln, data->parentBlkno, false);
+		page = BufferGetPage(buffer);
+		Assert(GinPageIsData(page));
+		Assert(!GinPageIsLeaf(page));
+		PageDeletePostingItem(page, data->parentOffset);
+		PageSetLSN(page, lsn);
+		PageSetTLI(page, ThisTimeLineID);
+		MarkBufferDirty(buffer);
+		UnlockReleaseBuffer(buffer);
+	}
+
+	/*
+	 * The left sibling is the third page of the record, so its guard is
+	 * XLR_BKP_BLOCK_3. Testing the parent's flag (block 2) here would skip
+	 * this update when only the parent was backed up, and would apply it on
+	 * top of a restored image when only the left sibling was backed up.
+	 */
+	if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
+	{
+		buffer = XLogReadBuffer(reln, data->leftBlkno, false);
+		page = BufferGetPage(buffer);
+		Assert(GinPageIsData(page));
+		GinPageGetOpaque(page)->rightlink = data->rightLink;
+		PageSetLSN(page, lsn);
+		PageSetTLI(page, ThisTimeLineID);
+		MarkBufferDirty(buffer);
+		UnlockReleaseBuffer(buffer);
+	}
+}
+
+/*
+ * Entry point for replaying one GIN WAL record.
+ *
+ * The record is dispatched on its op code with opCtx as the current memory
+ * context; the caller's context is remembered in topCtx (used by
+ * pushIncompleteSplit) and restored afterwards, and opCtx is reset.
+ * An unknown op code raises PANIC.
+ */
+void
+gin_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+	uint8		info = record->xl_info & ~XLR_INFO_MASK;
+
+	topCtx = MemoryContextSwitchTo(opCtx);
+
+	switch (info)
+	{
+		case XLOG_GIN_CREATE_INDEX:
+			ginRedoCreateIndex(lsn, record);
+			break;
+
+		case XLOG_GIN_CREATE_PTREE:
+			ginRedoCreatePTree(lsn, record);
+			break;
+
+		case XLOG_GIN_INSERT:
+			ginRedoInsert(lsn, record);
+			break;
+
+		case XLOG_GIN_SPLIT:
+			ginRedoSplit(lsn, record);
+			break;
+
+		case XLOG_GIN_VACUUM_PAGE:
+			ginRedoVacuumPage(lsn, record);
+			break;
+
+		case XLOG_GIN_DELETE_PAGE:
+			ginRedoDeletePage(lsn, record);
+			break;
+
+		default:
+			elog(PANIC, "gin_redo: unknown op code %u", info);
+	}
+
+	/* drop everything the redo routine allocated */
+	MemoryContextSwitchTo(topCtx);
+	MemoryContextReset(opCtx);
+}
+
+/*
+ * Append "node: spc/db/rel blkno: N" for the given relation and block to buf.
+ */
+static void
+desc_node(StringInfo buf, RelFileNode node, BlockNumber blkno)
+{
+	appendStringInfo(buf,
+					 "node: %u/%u/%u blkno: %u",
+					 node.spcNode,
+					 node.dbNode,
+					 node.relNode,
+					 blkno);
+}
+
+/*
+ * Append a human-readable description of a GIN WAL record to buf.
+ *
+ * xl_info carries the op code (after masking out XLR_INFO_MASK bits) and rec
+ * points at the record payload, interpreted according to the op code.
+ * An unknown op code raises PANIC.
+ */
+void
+gin_desc(StringInfo buf, uint8 xl_info, char *rec)
+{
+	uint8		info = xl_info & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_GIN_CREATE_INDEX:
+			{
+				RelFileNode *rnode = (RelFileNode *) rec;
+
+				appendStringInfo(buf, "Create index, ");
+				desc_node(buf, *rnode, GIN_ROOT_BLKNO);
+				break;
+			}
+		case XLOG_GIN_CREATE_PTREE:
+			{
+				ginxlogCreatePostingTree *xlrec = (ginxlogCreatePostingTree *) rec;
+
+				appendStringInfo(buf, "Create posting tree, ");
+				desc_node(buf, xlrec->node, xlrec->blkno);
+				break;
+			}
+		case XLOG_GIN_INSERT:
+			{
+				ginxlogInsert *xlrec = (ginxlogInsert *) rec;
+
+				appendStringInfo(buf, "Insert item, ");
+				desc_node(buf, xlrec->node, xlrec->blkno);
+				appendStringInfo(buf, " offset: %u nitem: %u isdata: %c isleaf %c isdelete %c updateBlkno:%u",
+								 xlrec->offset,
+								 xlrec->nitem,
+								 (xlrec->isData) ? 'T' : 'F',
+								 (xlrec->isLeaf) ? 'T' : 'F',
+								 (xlrec->isDelete) ? 'T' : 'F',
+								 xlrec->updateBlkno);
+				break;
+			}
+		case XLOG_GIN_SPLIT:
+			{
+				ginxlogSplit *xlrec = (ginxlogSplit *) rec;
+
+				appendStringInfo(buf, "Page split, ");
+				/* only the left block number is reported */
+				desc_node(buf, xlrec->node, xlrec->lblkno);
+				appendStringInfo(buf, " isrootsplit: %c", (xlrec->isRootSplit) ? 'T' : 'F');
+				break;
+			}
+		case XLOG_GIN_VACUUM_PAGE:
+			{
+				ginxlogVacuumPage *xlrec = (ginxlogVacuumPage *) rec;
+
+				appendStringInfo(buf, "Vacuum page, ");
+				desc_node(buf, xlrec->node, xlrec->blkno);
+				break;
+			}
+		case XLOG_GIN_DELETE_PAGE:
+			{
+				ginxlogDeletePage *xlrec = (ginxlogDeletePage *) rec;
+
+				appendStringInfo(buf, "Delete page, ");
+				desc_node(buf, xlrec->node, xlrec->blkno);
+				break;
+			}
+		default:
+			elog(PANIC, "gin_desc: unknown op code %u", info);
+	}
+}
+
+/*
+ * Set up GIN replay state: start with an empty list of incomplete splits and
+ * create opCtx, the scratch context used by gin_redo() and
+ * gin_xlog_cleanup(), as a child of the current memory context.
+ */
+void
+gin_xlog_startup(void)
+{
+	incomplete_splits = NIL;
+
+	opCtx = AllocSetContextCreate(
+								  CurrentMemoryContext,
+								  "GIN recovery temporary context",
+								  ALLOCSET_DEFAULT_MINSIZE,
+								  ALLOCSET_DEFAULT_INITSIZE,
+								  ALLOCSET_DEFAULT_MAXSIZE
+		);
+}
+
+/*
+ * Finish one split that was still pending at the end of replay.
+ *
+ * Reads the left page of the split, prepares a GinBtreeData describing the
+ * item to insert (an index tuple for the entry tree, a PostingItem for a
+ * posting tree), builds a one-element stack for the left page, and then calls
+ * findParents() followed by ginInsertValue() on the parent stack entry.
+ * NOTE(review): presumably ginInsertValue() inserts the missing parent link
+ * and findParents() fills in stack.parent; both are defined elsewhere, so
+ * confirm against their definitions.
+ */
+static void
+ginContinueSplit( ginIncompleteSplit *split ) {
+ GinBtreeData btree;
+ Relation reln;
+ Buffer buffer;
+ GinBtreeStack stack;
+
+ /* elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno, split->leftBlkno, split->rightBlkno); */
+ reln = XLogOpenRelation(split->node);
+
+ buffer = XLogReadBuffer(reln, split->leftBlkno, false);
+
+ /* a root at GIN_ROOT_BLKNO means the entry tree; any other root is a posting tree */
+ if ( split->rootBlkno == GIN_ROOT_BLKNO ) {
+ prepareEntryScan( &btree, reln, (Datum)0, NULL );
+ btree.entry = ginPageGetLinkItup( buffer );
+ } else {
+ Page page = BufferGetPage( buffer );
+
+ prepareDataScan( &btree, reln );
+
+ /* item to insert: points at the left page, keyed by that page's last item */
+ PostingItemSetBlockNumber( &(btree.pitem), split->leftBlkno );
+ if ( GinPageIsLeaf(page) )
+ btree.pitem.key = *(ItemPointerData*)GinDataPageGetItem(page,
+ GinPageGetOpaque(page)->maxoff);
+ else
+ btree.pitem.key = ((PostingItem*)GinDataPageGetItem(page,
+ GinPageGetOpaque(page)->maxoff))->key;
+ }
+
+ btree.rightblkno = split->rightBlkno;
+
+ /* stack entry for the left page itself; its parent is not known yet */
+ stack.blkno = split->leftBlkno;
+ stack.buffer = buffer;
+ stack.off = InvalidOffsetNumber;
+ stack.parent = NULL;
+
+ findParents( &btree, &stack, split->rootBlkno);
+ ginInsertValue( &btree, stack.parent );
+
+ UnlockReleaseBuffer( buffer );
+}
+
+/*
+ * End-of-recovery hook: finish every split still listed in
+ * incomplete_splits, then delete the scratch context.
+ *
+ * Each split is processed with opCtx as the current context, and opCtx is
+ * reset after every split.
+ */
+void
+gin_xlog_cleanup(void)
+{
+	MemoryContext callerCtx = MemoryContextSwitchTo(opCtx);
+	ListCell   *cell;
+
+	foreach(cell, incomplete_splits)
+	{
+		ginContinueSplit((ginIncompleteSplit *) lfirst(cell));
+		MemoryContextReset(opCtx);
+	}
+
+	MemoryContextSwitchTo(callerCtx);
+	MemoryContextDelete(opCtx);
+}
+