diff options
author | drh <drh@noemail.net> | 2015-06-23 02:37:30 +0000 |
---|---|---|
committer | drh <drh@noemail.net> | 2015-06-23 02:37:30 +0000 |
commit | 1ffd247c0c9e01226a6803d130d00a2321daeb75 (patch) | |
tree | 48f3f838c5639a70e236521d287630cc8887cf42 /src/btree.c | |
parent | 658873bdb343e7750863b4292f53d468821c3ded (diff) | |
download | sqlite-1ffd247c0c9e01226a6803d130d00a2321daeb75.tar.gz sqlite-1ffd247c0c9e01226a6803d130d00a2321daeb75.zip |
Avoid computing cell sizes in balance_nonroot() until they are really needed.
This gives an overall 1.7% performance gain for about 1000 extra bytes of
code space.
FossilOrigin-Name: 43844537e8a372953386663f8177202901ba7566
Diffstat (limited to 'src/btree.c')
-rw-r--r-- | src/btree.c | 229 |
1 files changed, 147 insertions, 82 deletions
diff --git a/src/btree.c b/src/btree.c index ad86c36e7..9693b0d39 100644 --- a/src/btree.c +++ b/src/btree.c @@ -6284,6 +6284,52 @@ static void insertCell( } /* +** A CellArray object contains a cache of pointers and sizes for a +** consecutive sequence of cells that might be held multiple pages. +*/ +typedef struct CellArray CellArray; +struct CellArray { + int nCell; /* Number of cells in apCell[] */ + MemPage *pRef; /* Reference page */ + u8 **apCell; /* All cells begin balanced */ + u16 *szCell; /* Local size of all cells in apCell[] */ +}; + +/* +** Make sure the cell sizes at idx, idx+1, ..., idx+N-1 have been +** computed. +*/ +static void populateCellCache(CellArray *p, int idx, int N){ + assert( idx>=0 && idx+N<=p->nCell ); + while( N>0 ){ + assert( p->apCell[idx]!=0 ); + if( p->szCell[idx]==0 ){ + p->szCell[idx] = p->pRef->xCellSize(p->pRef, p->apCell[idx]); + }else{ + assert( CORRUPT_DB || + p->szCell[idx]==p->pRef->xCellSize(p->pRef, p->apCell[idx]) ); + } + idx++; + N--; + } +} + +/* +** Return the size of the Nth element of the cell array +*/ +static SQLITE_NOINLINE u16 computeCellSize(CellArray *p, int N){ + assert( N>=0 && N<p->nCell ); + assert( p->szCell[N]==0 ); + p->szCell[N] = p->pRef->xCellSize(p->pRef, p->apCell[N]); + return p->szCell[N]; +} +static u16 cachedCellSize(CellArray *p, int N){ + assert( N>=0 && N<p->nCell ); + if( p->szCell[N] ) return p->szCell[N]; + return computeCellSize(p, N); +} + +/* ** Array apCell[] contains pointers to nCell b-tree page cells. The ** szCell[] array contains the size in bytes of each cell. This function ** replaces the current contents of page pPg with the contents of the cell @@ -6462,8 +6508,7 @@ static int editPage( int iOld, /* Index of first cell currently on page */ int iNew, /* Index of new first cell on page */ int nNew, /* Final number of cells on page */ - u8 **apCell, /* Array of cells */ - u16 *szCell /* Array of cell sizes */ + CellArray *pCArray /* Array of cells and sizes */ ){ u8 * const aData = pPg->aData; const int hdr = pPg->hdrOffset; @@ -6482,15 +6527,19 @@ static int editPage( /* Remove cells from the start and end of the page */ if( iOld<iNew ){ - int nShift = pageFreeArray( - pPg, iNew-iOld, &apCell[iOld], &szCell[iOld] + int nShift; + populateCellCache(pCArray, iOld, iNew-iOld); + nShift = pageFreeArray( + pPg, iNew-iOld, &pCArray->apCell[iOld], &pCArray->szCell[iOld] ); memmove(pPg->aCellIdx, &pPg->aCellIdx[nShift*2], nCell*2); nCell -= nShift; } if( iNewEnd < iOldEnd ){ + populateCellCache(pCArray, iNewEnd, iOldEnd-iNewEnd); nCell -= pageFreeArray( - pPg, iOldEnd-iNewEnd, &apCell[iNewEnd], &szCell[iNewEnd] + pPg, iOldEnd-iNewEnd, + &pCArray->apCell[iNewEnd], &pCArray->szCell[iNewEnd] ); } @@ -6503,9 +6552,10 @@ static int editPage( assert( (iOld-iNew)<nNew || nCell==0 || CORRUPT_DB ); pCellptr = pPg->aCellIdx; memmove(&pCellptr[nAdd*2], pCellptr, nCell*2); + populateCellCache(pCArray, iNew, nAdd); if( pageInsertArray( pPg, pBegin, &pData, pCellptr, - nAdd, &apCell[iNew], &szCell[iNew] + nAdd, &pCArray->apCell[iNew], &pCArray->szCell[iNew] ) ) goto editpage_fail; nCell += nAdd; } @@ -6517,18 +6567,20 @@ static int editPage( pCellptr = &pPg->aCellIdx[iCell * 2]; memmove(&pCellptr[2], pCellptr, (nCell - iCell) * 2); nCell++; + (void)cachedCellSize(pCArray, iCell + iNew); if( pageInsertArray( pPg, pBegin, &pData, pCellptr, - 1, &apCell[iCell + iNew], &szCell[iCell + iNew] + 1, &pCArray->apCell[iCell + iNew], &pCArray->szCell[iCell + iNew] ) ) goto editpage_fail; } } /* Append cells to the end of the page */ pCellptr = &pPg->aCellIdx[nCell*2]; + populateCellCache(pCArray, iNew+nCell, nNew-nCell); if( pageInsertArray( pPg, pBegin, &pData, pCellptr, - nNew-nCell, &apCell[iNew+nCell], &szCell[iNew+nCell] + nNew-nCell, &pCArray->apCell[iNew+nCell], &pCArray->szCell[iNew+nCell] ) ) goto editpage_fail; pPg->nCell = nNew; @@ -6539,19 +6591,21 @@ static int editPage( #ifdef SQLITE_DEBUG for(i=0; i<nNew && !CORRUPT_DB; i++){ - u8 *pCell = apCell[i+iNew]; + u8 *pCell = pCArray->apCell[i+iNew]; int iOff = get2byte(&pPg->aCellIdx[i*2]); if( pCell>=aData && pCell<&aData[pPg->pBt->usableSize] ){ pCell = &pTmp[pCell - aData]; } - assert( 0==memcmp(pCell, &aData[iOff], szCell[i+iNew]) ); + assert( 0==memcmp(pCell, &aData[iOff], + pCArray->pRef->xCellSize(pCArray->pRef, pCArray->apCell[i+iNew])) ); } #endif return SQLITE_OK; editpage_fail: /* Unable to edit this page. Rebuild it from scratch instead. */ - return rebuildPage(pPg, nNew, &apCell[iNew], &szCell[iNew]); + populateCellCache(pCArray, iNew, nNew); + return rebuildPage(pPg, nNew, &pCArray->apCell[iNew], &pCArray->szCell[iNew]); } /* @@ -6828,7 +6882,6 @@ static int balance_nonroot( int bBulk /* True if this call is part of a bulk load */ ){ BtShared *pBt; /* The whole database */ - int nCell = 0; /* Number of cells in apCell[] */ int nMaxCells = 0; /* Allocated size of apCell, szCell, aFrom. */ int nNew = 0; /* Number of pages in apNew[] */ int nOld; /* Number of pages in apOld[] */ @@ -6846,19 +6899,20 @@ static int balance_nonroot( MemPage *apNew[NB+2]; /* pPage and up to NB siblings after balancing */ u8 *pRight; /* Location in parent of right-sibling pointer */ u8 *apDiv[NB-1]; /* Divider cells in pParent */ - int cntNew[NB+2]; /* Index in aCell[] of cell after i-th page */ - int cntOld[NB+2]; /* Old index in aCell[] after i-th page */ + int cntNew[NB+2]; /* Index in b.paCell[] of cell after i-th page */ + int cntOld[NB+2]; /* Old index in b.apCell[] */ int szNew[NB+2]; /* Combined size of cells placed on i-th page */ - u8 **apCell = 0; /* All cells begin balanced */ - u16 *szCell; /* Local size of all cells in apCell[] */ u8 *aSpace1; /* Space for copies of dividers cells */ Pgno pgno; /* Temp var to store a page number in */ u8 abDone[NB+2]; /* True after i'th new page is populated */ Pgno aPgno[NB+2]; /* Page numbers of new pages before shuffling */ Pgno aPgOrder[NB+2]; /* Copy of aPgno[] used for sorting pages */ u16 aPgFlags[NB+2]; /* flags field of new pages before shuffling */ + CellArray b; /* Parsed information on cells being balanced */ memset(abDone, 0, sizeof(abDone)); + b.nCell = 0; + b.apCell = 0; pBt = pParent->pBt; assert( sqlite3_mutex_held(pBt->mutex) ); assert( sqlite3PagerIswriteable(pParent->pDbPage) ); @@ -6967,40 +7021,41 @@ static int balance_nonroot( ** Allocate space for memory structures */ szScratch = - nMaxCells*sizeof(u8*) /* apCell */ - + nMaxCells*sizeof(u16) /* szCell */ + nMaxCells*sizeof(u8*) /* b.apCell */ + + nMaxCells*sizeof(u16) /* b.szCell */ + pBt->pageSize; /* aSpace1 */ /* EVIDENCE-OF: R-28375-38319 SQLite will never request a scratch buffer ** that is more than 6 times the database page size. */ assert( szScratch<=6*(int)pBt->pageSize ); - apCell = sqlite3ScratchMalloc( szScratch ); - if( apCell==0 ){ + b.apCell = sqlite3ScratchMalloc( szScratch ); + if( b.apCell==0 ){ rc = SQLITE_NOMEM; goto balance_cleanup; } - szCell = (u16*)&apCell[nMaxCells]; - aSpace1 = (u8*)&szCell[nMaxCells]; + b.szCell = (u16*)&b.apCell[nMaxCells]; + aSpace1 = (u8*)&b.szCell[nMaxCells]; assert( EIGHT_BYTE_ALIGNMENT(aSpace1) ); /* ** Load pointers to all cells on sibling pages and the divider cells - ** into the local apCell[] array. Make copies of the divider cells + ** into the local b.apCell[] array. Make copies of the divider cells ** into space obtained from aSpace1[]. The divider cells have already ** been removed from pParent. ** ** If the siblings are on leaf pages, then the child pointers of the ** divider cells are stripped from the cells before they are copied - ** into aSpace1[]. In this way, all cells in apCell[] are without + ** into aSpace1[]. In this way, all cells in b.apCell[] are without ** child pointers. If siblings are not leaves, then all cell in - ** apCell[] include child pointers. Either way, all cells in apCell[] + ** b.apCell[] include child pointers. Either way, all cells in b.apCell[] ** are alike. ** ** leafCorrection: 4 if pPage is a leaf. 0 if pPage is not a leaf. ** leafData: 1 if pPage holds key+data and pParent holds only keys. */ - leafCorrection = apOld[0]->leaf*4; - leafData = apOld[0]->intKeyLeaf; + b.pRef = apOld[0]; + leafCorrection = b.pRef->leaf*4; + leafData = b.pRef->intKeyLeaf; for(i=0; i<nOld; i++){ int limit; MemPage *pOld = apOld[i]; @@ -7014,71 +7069,70 @@ static int balance_nonroot( } limit = pOld->nCell+pOld->nOverflow; + memset(&b.szCell[b.nCell], 0, sizeof(b.szCell[0])*limit); if( pOld->nOverflow>0 ){ for(j=0; j<limit; j++){ - assert( nCell<nMaxCells ); - apCell[nCell] = findOverflowCell(pOld, j); - szCell[nCell] = pOld->xCellSize(pOld, apCell[nCell]); - nCell++; + assert( b.nCell<nMaxCells ); + b.apCell[b.nCell] = findOverflowCell(pOld, j); + b.nCell++; } }else{ u8 *aData = pOld->aData; u16 maskPage = pOld->maskPage; u16 cellOffset = pOld->cellOffset; for(j=0; j<limit; j++){ - assert( nCell<nMaxCells ); - apCell[nCell] = findCellv2(aData, maskPage, cellOffset, j); - szCell[nCell] = pOld->xCellSize(pOld, apCell[nCell]); - nCell++; + assert( b.nCell<nMaxCells ); + b.apCell[b.nCell] = findCellv2(aData, maskPage, cellOffset, j); + b.nCell++; } - } - cntOld[i] = nCell; + } + cntOld[i] = b.nCell; if( i<nOld-1 && !leafData){ u16 sz = (u16)szNew[i]; u8 *pTemp; - assert( nCell<nMaxCells ); - szCell[nCell] = sz; + assert( b.nCell<nMaxCells ); + b.szCell[b.nCell] = sz; pTemp = &aSpace1[iSpace1]; iSpace1 += sz; assert( sz<=pBt->maxLocal+23 ); assert( iSpace1 <= (int)pBt->pageSize ); memcpy(pTemp, apDiv[i], sz); - apCell[nCell] = pTemp+leafCorrection; + b.apCell[b.nCell] = pTemp+leafCorrection; assert( leafCorrection==0 || leafCorrection==4 ); - szCell[nCell] = szCell[nCell] - leafCorrection; + b.szCell[b.nCell] = b.szCell[b.nCell] - leafCorrection; if( !pOld->leaf ){ assert( leafCorrection==0 ); assert( pOld->hdrOffset==0 ); /* The right pointer of the child page pOld becomes the left ** pointer of the divider cell */ - memcpy(apCell[nCell], &pOld->aData[8], 4); + memcpy(b.apCell[b.nCell], &pOld->aData[8], 4); }else{ assert( leafCorrection==4 ); - while( szCell[nCell]<4 ){ + while( b.szCell[b.nCell]<4 ){ /* Do not allow any cells smaller than 4 bytes. If a smaller cell ** does exist, pad it with 0x00 bytes. */ - assert( szCell[nCell]==3 || CORRUPT_DB ); - assert( apCell[nCell]==&aSpace1[iSpace1-3] || CORRUPT_DB ); + assert( b.szCell[b.nCell]==3 || CORRUPT_DB ); + assert( b.apCell[b.nCell]==&aSpace1[iSpace1-3] || CORRUPT_DB ); aSpace1[iSpace1++] = 0x00; - szCell[nCell]++; + b.szCell[b.nCell]++; } } - nCell++; + b.nCell++; } } /* - ** Figure out the number of pages needed to hold all nCell cells. + ** Figure out the number of pages needed to hold all b.nCell cells. ** Store this number in "k". Also compute szNew[] which is the total ** size of all cells on the i-th page and cntNew[] which is the index - ** in apCell[] of the cell that divides page i from page i+1. - ** cntNew[k] should equal nCell. + ** in b.apCell[] of the cell that divides page i from page i+1. + ** cntNew[k] should equal b.nCell. ** ** Values computed by this block: ** ** k: The total number of sibling pages ** szNew[i]: Spaced used on the i-th sibling page. - ** cntNew[i]: Index in apCell[] and szCell[] for the first cell to + ** cntNew[i]: Index in b.apCell[] and b.szCell[] for the first cell to ** the right of the i-th sibling page. ** usableSpace: Number of bytes of space available on each sibling. ** @@ -7101,27 +7155,35 @@ static int balance_nonroot( k = i+2; if( k>NB+2 ){ rc = SQLITE_CORRUPT_BKPT; goto balance_cleanup; } szNew[k-1] = 0; - cntNew[k-1] = nCell; + cntNew[k-1] = b.nCell; } - sz = 2+szCell[cntNew[i]-1]; + sz = 2 + cachedCellSize(&b, cntNew[i]-1); szNew[i] -= sz; if( !leafData ){ - sz = cntNew[i]<nCell ? 2+szCell[cntNew[i]] : 0; + if( cntNew[i]<b.nCell ){ + sz = 2 + cachedCellSize(&b, cntNew[i]); + }else{ + sz = 0; + } } szNew[i+1] += sz; cntNew[i]--; } - while( cntNew[i]<nCell ){ - sz = 2+szCell[cntNew[i]]; + while( cntNew[i]<b.nCell ){ + sz = 2 + cachedCellSize(&b, cntNew[i]); if( szNew[i]+sz>usableSpace ) break; szNew[i] += sz; cntNew[i]++; if( !leafData ){ - sz = cntNew[i]<nCell ? 2+szCell[cntNew[i]] : 0; + if( cntNew[i]<b.nCell ){ + sz = 2 + cachedCellSize(&b, cntNew[i]); + }else{ + sz = 0; + } } szNew[i+1] -= sz; } - if( cntNew[i]>=nCell ){ + if( cntNew[i]>=b.nCell ){ k = i+1; }else if( cntNew[i] - (i>0 ? cntNew[i-1] : 0) <= 0 ){ rc = SQLITE_CORRUPT_BKPT; @@ -7146,22 +7208,24 @@ static int balance_nonroot( int r; /* Index of right-most cell in left sibling */ int d; /* Index of first cell to the left of right sibling */ - r = cntNew[i-1] - 1; - d = r + 1 - leafData; - assert( d<nMaxCells ); - assert( r<nMaxCells ); - while( szRight==0 - || (!bBulk && szRight+szCell[d]+2<=szLeft-(szCell[r]+2)) - ){ - szRight += szCell[d] + 2; - szLeft -= szCell[r] + 2; + while(1){ + r = cntNew[i-1] - 1; + d = r + 1 - leafData; + assert( d<nMaxCells ); + assert( r<nMaxCells ); + (void)cachedCellSize(&b, d); + (void)cachedCellSize(&b, r); + if( szRight!=0 + && (bBulk || szRight+b.szCell[d]+2 > szLeft-(b.szCell[r]+2)) ){ + break; + } + szRight += b.szCell[d] + 2; + szLeft -= b.szCell[r] + 2; cntNew[i-1]--; if( cntNew[i-1] <= 0 ){ rc = SQLITE_CORRUPT_BKPT; goto balance_cleanup; } - r = cntNew[i-1] - 1; - d = r + 1 - leafData; } szNew[i] = szRight; szNew[i-1] = szLeft; @@ -7200,7 +7264,7 @@ static int balance_nonroot( zeroPage(pNew, pageFlags); apNew[i] = pNew; nNew++; - cntOld[i] = nCell; + cntOld[i] = b.nCell; /* Set the pointer-map entry for the new sibling page. */ if( ISAUTOVACUUM ){ @@ -7305,8 +7369,8 @@ static int balance_nonroot( int iNew = 0; int iOld = 0; - for(i=0; i<nCell; i++){ - u8 *pCell = apCell[i]; + for(i=0; i<b.nCell; i++){ + u8 *pCell = b.apCell[i]; if( i==cntOldNext ){ MemPage *pOld = (++iOld)<nNew ? apNew[iOld] : apOld[iOld]; cntOldNext += pOld->nCell + pOld->nOverflow + !leafData; @@ -7332,7 +7396,7 @@ static int balance_nonroot( ptrmapPut(pBt, get4byte(pCell), PTRMAP_BTREE, pNew->pgno, &rc); if( rc ) goto balance_cleanup; } - if( szCell[i]>pNew->minLocal ){ + if( cachedCellSize(&b,i)>pNew->minLocal ){ ptrmapPutOvflPtr(pNew, pCell, &rc); if( rc ) goto balance_cleanup; } @@ -7349,20 +7413,21 @@ static int balance_nonroot( j = cntNew[i]; assert( j<nMaxCells ); - pCell = apCell[j]; - sz = szCell[j] + leafCorrection; + assert( b.apCell[j]!=0 ); + pCell = b.apCell[j]; + sz = b.szCell[j] + leafCorrection; pTemp = &aOvflSpace[iOvflSpace]; if( !pNew->leaf ){ memcpy(&pNew->aData[8], pCell, 4); }else if( leafData ){ /* If the tree is a leaf-data tree, and the siblings are leaves, - ** then there is no divider cell in apCell[]. Instead, the divider + ** then there is no divider cell in b.apCell[]. Instead, the divider ** cell consists of the integer key for the right-most cell of ** the sibling-page assembled above only. */ CellInfo info; j--; - pNew->xParseCell(pNew, apCell[j], &info); + pNew->xParseCell(pNew, b.apCell[j], &info); pCell = pTemp; sz = 4 + putVarint(&pCell[4], info.nKey); pTemp = 0; @@ -7379,7 +7444,7 @@ static int balance_nonroot( ** cells are at least 4 bytes. It only happens in b-trees used ** to evaluate "IN (SELECT ...)" and similar clauses. */ - if( szCell[j]==4 ){ + if( b.szCell[j]==4 ){ assert(leafCorrection==4); sz = pParent->xCellSize(pParent, pCell); } @@ -7437,12 +7502,12 @@ static int balance_nonroot( iNew = iOld = 0; nNewCell = cntNew[0]; }else{ - iOld = iPg<nOld ? (cntOld[iPg-1] + !leafData) : nCell; + iOld = iPg<nOld ? (cntOld[iPg-1] + !leafData) : b.nCell; iNew = cntNew[iPg-1] + !leafData; nNewCell = cntNew[iPg] - iNew; } - rc = editPage(apNew[iPg], iOld, iNew, nNewCell, apCell, szCell); + rc = editPage(apNew[iPg], iOld, iNew, nNewCell, &b); if( rc ) goto balance_cleanup; abDone[iPg]++; apNew[iPg]->nFree = usableSpace-szNew[iPg]; @@ -7494,7 +7559,7 @@ static int balance_nonroot( assert( pParent->isInit ); TRACE(("BALANCE: finished: old=%d new=%d cells=%d\n", - nOld, nNew, nCell)); + nOld, nNew, b.nCell)); /* Free any old pages that were not reused as new pages. */ @@ -7517,7 +7582,7 @@ static int balance_nonroot( ** Cleanup before returning. */ balance_cleanup: - sqlite3ScratchFree(apCell); + sqlite3ScratchFree(b.apCell); for(i=0; i<nOld; i++){ releasePage(apOld[i]); } |