diff options
author | Teodor Sigaev <teodor@sigaev.ru> | 2005-06-27 12:45:23 +0000 |
---|---|---|
committer | Teodor Sigaev <teodor@sigaev.ru> | 2005-06-27 12:45:23 +0000 |
commit | e8cab5fe49c45e9dc2990e36ecd6a42bf01dc4bc (patch) | |
tree | b4950c8a1550ce26aa276a157b8680732ae3ac9b /src/backend/access/gist/gistxlog.c | |
parent | c3be085ab7a21e01f530357d962fa22f74a637ef (diff) | |
download | postgresql-e8cab5fe49c45e9dc2990e36ecd6a42bf01dc4bc.tar.gz postgresql-e8cab5fe49c45e9dc2990e36ecd6a42bf01dc4bc.zip |
Concurrency for GiST
- full concurrency for insert/update/select/vacuum:
- select and vacuum never locks more than one page simultaneously
- select (gettuple) hasn't any lock across it's calls
- insert never locks more than two page simultaneously:
- during search of leaf to insert it locks only one page
simultaneously
- while walk upward to the root it locked only parent (may be
non-direct parent) and child. One of them X-lock, another may
be S- or X-lock
- 'vacuum full' locks index
- improve gistgetmulti
- simplify XLOG records
Fix bug in index_beginscan_internal: LockRelation may clean
rd_aminfo structure, so move GET_REL_PROCEDURE after LockRelation
Diffstat (limited to 'src/backend/access/gist/gistxlog.c')
-rw-r--r-- | src/backend/access/gist/gistxlog.c | 101 |
1 files changed, 42 insertions, 59 deletions
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index d2f2697affa..de897894960 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.3 2005/06/20 15:22:37 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.4 2005/06/27 12:45:22 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -27,7 +27,6 @@ typedef struct { gistxlogEntryUpdate *data; int len; IndexTuple *itup; - BlockNumber *path; OffsetNumber *todelete; } EntryUpdateRecord; @@ -39,7 +38,6 @@ typedef struct { typedef struct { gistxlogPageSplit *data; NewPage *page; - BlockNumber *path; } PageSplitRecord; /* track for incomplete inserts, idea was taken from nbtxlog.c */ @@ -49,9 +47,9 @@ typedef struct gistIncompleteInsert { ItemPointerData key; int lenblk; BlockNumber *blkno; - int pathlen; - BlockNumber *path; XLogRecPtr lsn; + BlockNumber *path; + int pathlen; } gistIncompleteInsert; @@ -69,7 +67,6 @@ static List *incomplete_inserts; static void pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, BlockNumber *blkno, int lenblk, - BlockNumber *path, int pathlen, PageSplitRecord *xlinfo /* to extract blkno info */ ) { MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx); gistIncompleteInsert *ninsert = (gistIncompleteInsert*)palloc( sizeof(gistIncompleteInsert) ); @@ -93,15 +90,6 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, } Assert( ninsert->lenblk>0 ); - if ( path && pathlen ) { - ninsert->pathlen = pathlen; - ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen ); - memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen); - } else { - ninsert->pathlen = 0; - ninsert->path = NULL; - } - incomplete_inserts = lappend(incomplete_inserts, ninsert); MemoryContextSwitchTo(oldCxt); } @@ -116,7 +104,6 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key) { if ( RelFileNodeEquals(node, insert->node) && ItemPointerEQ( &(insert->key), &(key) ) ) { /* found */ - if ( insert->path ) pfree( insert->path ); pfree( insert->blkno ); incomplete_inserts = list_delete_ptr(incomplete_inserts, insert); pfree( insert ); @@ -132,15 +119,9 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) { decoded->data = (gistxlogEntryUpdate*)begin; - if ( decoded->data->pathlen ) { - addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen ); - decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate )); - } else - decoded->path = NULL; - if ( decoded->data->ntodelete ) { decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogEntryUpdate ) + addpath); - addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete ); + addpath = MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete ); } else decoded->todelete = NULL; @@ -244,7 +225,6 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO ) pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, &(xlrec.data->blkno), 1, - xlrec.path, xlrec.data->pathlen, NULL); } } @@ -252,18 +232,12 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { static void decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) { char *begin = XLogRecGetData(record), *ptr; - int j,i=0, addpath = 0; + int j,i=0; decoded->data = (gistxlogPageSplit*)begin; decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage ); - if ( decoded->data->pathlen ) { - addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen ); - decoded->path = (BlockNumber*)(begin+sizeof( gistxlogPageSplit )); - } else - decoded->path = NULL; - - ptr=begin+sizeof( gistxlogPageSplit ) + addpath; + ptr=begin+sizeof( gistxlogPageSplit ); for(i=0;i<decoded->data->npage;i++) { Assert( ptr - begin < record->xl_len ); decoded->page[i].header = (gistxlogPage*)ptr; @@ -342,7 +316,6 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) { pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, NULL, 0, - xlrec.path, xlrec.data->pathlen, &xlrec); } } @@ -500,6 +473,36 @@ gist_form_invalid_tuple(BlockNumber blkno) { } static void +gixtxlogFindPath( Relation index, gistIncompleteInsert *insert ) { + int i; + GISTInsertStack *top; + + insert->pathlen = 0; + insert->path = NULL; + + for(i=0;insert->lenblk;i++) { + if ( (top=gistFindPath(index, insert->blkno[i], XLogReadBuffer)) != NULL ) { + GISTInsertStack *ptr=top; + while(ptr) { + insert->pathlen++; + ptr = ptr->parent; + } + + insert->path=(BlockNumber*)palloc( sizeof(BlockNumber) * insert->pathlen ); + + i=0; + ptr = top; + while(ptr) { + insert->path[i] = ptr->blkno; + i++; + ptr = ptr->parent; + } + break; + } + } +} + +static void gistContinueInsert(gistIncompleteInsert *insert) { IndexTuple *itup; int i, lenitup; @@ -523,6 +526,9 @@ gistContinueInsert(gistIncompleteInsert *insert) { for(i=0;i<insert->lenblk;i++) itup[i] = gist_form_invalid_tuple( insert->blkno[i] ); + /* construct path */ + gixtxlogFindPath( index, insert ); + if ( insert->pathlen==0 ) { /*it was split root, so we should only make new root*/ Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO); @@ -662,8 +668,7 @@ gist_xlog_cleanup(void) { XLogRecData * formSplitRdata(RelFileNode node, BlockNumber blkno, - ItemPointer key, - BlockNumber *path, int pathlen, SplitedPageLayout *dist ) { + ItemPointer key, SplitedPageLayout *dist ) { XLogRecData *rdata; gistxlogPageSplit *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit)); @@ -681,7 +686,6 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, xlrec->node = node; xlrec->origblkno = blkno; xlrec->npage = (uint16)npage; - xlrec->pathlen = (uint16)pathlen; if ( key ) xlrec->key = *key; else @@ -692,15 +696,6 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, rdata[0].len = sizeof( gistxlogPageSplit ); rdata[0].next = NULL; - if ( pathlen ) { - rdata[cur-1].next = &(rdata[cur]); - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char*)path; - rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen); - rdata[cur].next = NULL; - cur++; - } - ptr=dist; while(ptr) { rdata[cur].buffer = InvalidBuffer; @@ -725,8 +720,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, XLogRecData * formUpdateRdata(RelFileNode node, BlockNumber blkno, OffsetNumber *todelete, int ntodelete, bool emptypage, - IndexTuple *itup, int ituplen, ItemPointer key, - BlockNumber *path, int pathlen) { + IndexTuple *itup, int ituplen, ItemPointer key ) { XLogRecData *rdata; gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate*)palloc(sizeof(gistxlogEntryUpdate)); @@ -740,7 +734,6 @@ formUpdateRdata(RelFileNode node, BlockNumber blkno, if ( emptypage ) { xlrec->isemptypage = true; xlrec->ntodelete = 0; - xlrec->pathlen = 0; rdata = (XLogRecData*)palloc( sizeof(XLogRecData) ); rdata->buffer = InvalidBuffer; @@ -752,24 +745,14 @@ formUpdateRdata(RelFileNode node, BlockNumber blkno, xlrec->isemptypage = false; xlrec->ntodelete = ntodelete; - xlrec->pathlen = pathlen; - rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 3 + ituplen ) ); + rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 2 + ituplen ) ); rdata->buffer = InvalidBuffer; rdata->data = (char*)xlrec; rdata->len = sizeof(gistxlogEntryUpdate); rdata->next = NULL; - if ( pathlen ) { - rdata[cur-1].next = &(rdata[cur]); - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char*)path; - rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen); - rdata[cur].next = NULL; - cur++; - } - if ( ntodelete ) { rdata[cur-1].next = &(rdata[cur]); rdata[cur].buffer = InvalidBuffer; |