aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/btree.c852
1 files changed, 429 insertions, 423 deletions
diff --git a/src/btree.c b/src/btree.c
index 75b1d440c..c7688fd01 100644
--- a/src/btree.c
+++ b/src/btree.c
@@ -9,7 +9,7 @@
** May you share freely, never taking more than you give.
**
*************************************************************************
-** $Id: btree.c,v 1.629 2009/06/16 04:31:49 danielk1977 Exp $
+** $Id: btree.c,v 1.630 2009/06/16 16:50:22 danielk1977 Exp $
**
** This file implements a external (disk-based) database using BTrees.
** See the header comment on "btreeInt.h" for additional information.
@@ -638,7 +638,6 @@ static int ptrmapGet(BtShared *pBt, Pgno key, u8 *pEType, Pgno *pPgno){
#else /* if defined SQLITE_OMIT_AUTOVACUUM */
#define ptrmapPut(w,x,y,z) SQLITE_OK
#define ptrmapGet(w,x,y,z) SQLITE_OK
- #define ptrmapPutOvfl(y,z) SQLITE_OK
#endif
/*
@@ -830,23 +829,12 @@ static int ptrmapPutOvflPtr(MemPage *pPage, u8 *pCell){
assert( pCell!=0 );
sqlite3BtreeParseCellPtr(pPage, pCell, &info);
assert( (info.nData+(pPage->intKey?0:info.nKey))==info.nPayload );
- if( (info.nData+(pPage->intKey?0:info.nKey))>info.nLocal ){
+ if( info.iOverflow ){
Pgno ovfl = get4byte(&pCell[info.iOverflow]);
return ptrmapPut(pPage->pBt, ovfl, PTRMAP_OVERFLOW1, pPage->pgno);
}
return SQLITE_OK;
}
-/*
-** If the cell with index iCell on page pPage contains a pointer
-** to an overflow page, insert an entry into the pointer-map
-** for the overflow page.
-*/
-static int ptrmapPutOvfl(MemPage *pPage, int iCell){
- u8 *pCell;
- assert( sqlite3_mutex_held(pPage->pBt->mutex) );
- pCell = findOverflowCell(pPage, iCell);
- return ptrmapPutOvflPtr(pPage, pCell);
-}
#endif
@@ -4573,7 +4561,7 @@ static int allocateBtreePage(
Pgno nPage;
*pPgno = iPage;
nPage = pagerPagecount(pBt);
- if( *pPgno>nPage ){
+ if( iPage>nPage ){
/* Free page off the end of the file */
rc = SQLITE_CORRUPT_BKPT;
goto end_allocate_page;
@@ -5063,7 +5051,7 @@ static int insertCell(
u8 *pCell, /* Content of the new cell */
int sz, /* Bytes of content in pCell */
u8 *pTemp, /* Temp storage space for pCell, if needed */
- u8 nSkip /* Do not write the first nSkip bytes of the cell */
+ Pgno iChild /* If non-zero, replace first 4 bytes with this value */
){
int idx; /* Where to write new cell content in data[] */
int j; /* Loop counter */
@@ -5075,6 +5063,8 @@ static int insertCell(
u8 *data; /* The content of the whole page */
u8 *ptr; /* Used for moving information around in data[] */
+ int nSkip = (iChild ? 4 : 0);
+
assert( i>=0 && i<=pPage->nCell+pPage->nOverflow );
assert( pPage->nCell<=MX_CELL(pPage->pBt) && MX_CELL(pPage->pBt)<=5460 );
assert( pPage->nOverflow<=ArraySize(pPage->aOvfl) );
@@ -5085,6 +5075,9 @@ static int insertCell(
memcpy(pTemp+nSkip, pCell+nSkip, sz-nSkip);
pCell = pTemp;
}
+ if( iChild ){
+ put4byte(pCell, iChild);
+ }
j = pPage->nOverflow++;
assert( j<(int)(sizeof(pPage->aOvfl)/sizeof(pPage->aOvfl[0])) );
pPage->aOvfl[j].pCell = pCell;
@@ -5118,6 +5111,9 @@ static int insertCell(
pPage->nCell++;
pPage->nFree -= 2;
memcpy(&data[idx+nSkip], pCell+nSkip, sz-nSkip);
+ if( iChild ){
+ put4byte(&data[idx], iChild);
+ }
for(j=end-2, ptr=&data[j]; j>ins; j-=2, ptr-=2){
ptr[0] = ptr[-2];
ptr[1] = ptr[-1];
@@ -5129,14 +5125,7 @@ static int insertCell(
/* The cell may contain a pointer to an overflow page. If so, write
** the entry for the overflow page into the pointer map.
*/
- CellInfo info;
- sqlite3BtreeParseCellPtr(pPage, pCell, &info);
- assert( (info.nData+(pPage->intKey?0:info.nKey))==info.nPayload );
- if( info.iOverflow ){
- Pgno pgnoOvfl = get4byte(&pCell[info.iOverflow]);
- rc = ptrmapPut(pPage->pBt, pgnoOvfl, PTRMAP_OVERFLOW1, pPage->pgno);
- if( rc!=SQLITE_OK ) return rc;
- }
+ rc = ptrmapPutOvflPtr(pPage, pCell);
}
#endif
}
@@ -5226,7 +5215,7 @@ static void assemblePage(
*/
static int balance_quick(MemPage *pParent, MemPage *pPage, u8 *pSpace){
BtShared *const pBt = pPage->pBt; /* B-Tree Database */
- MemPage *pNew = 0; /* Newly allocated page */
+ MemPage *pNew; /* Newly allocated page */
int rc; /* Return Code */
Pgno pgnoNew; /* Page number of pNew */
@@ -5241,6 +5230,7 @@ static int balance_quick(MemPage *pParent, MemPage *pPage, u8 *pSpace){
** may be inserted. If both these operations are successful, proceed.
*/
rc = allocateBtreePage(pBt, &pNew, &pgnoNew, 0, 0);
+
if( rc==SQLITE_OK ){
u8 *pOut = &pSpace[4];
@@ -5252,6 +5242,22 @@ static int balance_quick(MemPage *pParent, MemPage *pPage, u8 *pSpace){
assert( pPage->aData[0]==(PTF_INTKEY|PTF_LEAFDATA|PTF_LEAF) );
zeroPage(pNew, PTF_INTKEY|PTF_LEAFDATA|PTF_LEAF);
assemblePage(pNew, 1, &pCell, &szCell);
+
+ /* If this is an auto-vacuum database, update the pointer map
+ ** with entries for the new page, and any pointer from the
+ ** cell on the page to an overflow page. If either of these
+ ** operations fails, the return code is set, but the contents
+ ** of the parent page are still manipulated by thh code below.
+ ** That is Ok, at this point the parent page is guaranteed to
+ ** be marked as dirty. Returning an error code will cause a
+ ** rollback, undoing any changes made to the parent page.
+ */
+ if( ISAUTOVACUUM ){
+ rc = ptrmapPut(pBt, pgnoNew, PTRMAP_BTREE, pParent->pgno);
+ if( szCell>pNew->minLocal && rc==SQLITE_OK ){
+ rc = ptrmapPutOvflPtr(pNew, pCell);
+ }
+ }
/* Create a divider cell to insert into pParent. The divider cell
** consists of a 4-byte page number (the page number of pPage) and
@@ -5266,30 +5272,18 @@ static int balance_quick(MemPage *pParent, MemPage *pPage, u8 *pSpace){
** field. The second while(...) loop copies the key value from the
** cell on pPage into the pSpace buffer.
*/
- put4byte(pSpace, pPage->pgno);
pCell = findCell(pPage, pPage->nCell-1);
pStop = &pCell[9];
while( (*(pCell++)&0x80) && pCell<pStop );
pStop = &pCell[9];
while( ((*(pOut++) = *(pCell++))&0x80) && pCell<pStop );
- /* Insert the new divider cell into pParent */
- insertCell(pParent, pParent->nCell, pSpace, (int)(pOut-pSpace), 0, 0);
+ /* Insert the new divider cell into pParent. */
+ insertCell(pParent,pParent->nCell,pSpace,(int)(pOut-pSpace),0,pPage->pgno);
/* Set the right-child pointer of pParent to point to the new page. */
put4byte(&pParent->aData[pParent->hdrOffset+8], pgnoNew);
- /* If this is an auto-vacuum database, update the pointer map
- ** with entries for the new page, and any pointer from the
- ** cell on the page to an overflow page.
- */
- if( ISAUTOVACUUM ){
- rc = ptrmapPut(pBt, pgnoNew, PTRMAP_BTREE, pParent->pgno);
- if( rc==SQLITE_OK ){
- rc = ptrmapPutOvfl(pNew, 0);
- }
- }
-
/* Release the reference to the new page. */
releasePage(pNew);
}
@@ -5298,41 +5292,99 @@ static int balance_quick(MemPage *pParent, MemPage *pPage, u8 *pSpace){
}
#endif /* SQLITE_OMIT_QUICKBALANCE */
+#if 0
/*
-** This routine redistributes Cells on pPage and up to NN*2 siblings
-** of pPage so that all pages have about the same amount of free space.
-** Usually NN siblings on either side of pPage is used in the balancing,
-** though more siblings might come from one side if pPage is the first
-** or last child of its parent. If pPage has fewer than 2*NN siblings
-** (something which can only happen if pPage is the root page or a
-** child of root) then all available siblings participate in the balancing.
+** This function does not contribute anything to the operation of SQLite.
+** it is sometimes activated temporarily while debugging code responsible
+** for setting pointer-map entries.
+*/
+static int ptrmapCheckPages(MemPage **apPage, int nPage){
+ int i, j;
+ for(i=0; i<nPage; i++){
+ Pgno n;
+ u8 e;
+ MemPage *pPage = apPage[i];
+ BtShared *pBt = pPage->pBt;
+ assert( pPage->isInit );
+
+ for(j=0; j<pPage->nCell; j++){
+ CellInfo info;
+ u8 *z;
+
+ z = findCell(pPage, j);
+ sqlite3BtreeParseCellPtr(pPage, z, &info);
+ if( info.iOverflow ){
+ Pgno ovfl = get4byte(&z[info.iOverflow]);
+ ptrmapGet(pBt, ovfl, &e, &n);
+ assert( n==pPage->pgno && e==PTRMAP_OVERFLOW1 );
+ }
+ if( !pPage->leaf ){
+ Pgno child = get4byte(z);
+ ptrmapGet(pBt, child, &e, &n);
+ assert( n==pPage->pgno && e==PTRMAP_BTREE );
+ }
+ }
+ if( !pPage->leaf ){
+ Pgno child = get4byte(&pPage->aData[pPage->hdrOffset+8]);
+ ptrmapGet(pBt, child, &e, &n);
+ assert( n==pPage->pgno && e==PTRMAP_BTREE );
+ }
+ }
+ return 1;
+}
+#endif
+
+
+/*
+** This routine redistributes cells on the iParentIdx'th child of pParent
+** (hereafter "the page") and up to 2 siblings so that all pages have about the
+** same amount of free space. Usually a single sibling on either side of the
+** page are used in the balancing, though both siblings might come from one
+** side if the page is the first or last child of its parent. If the page
+** has fewer than 2 siblings (something which can only happen if the page
+** is a root page or a child of a root page) then all available siblings
+** participate in the balancing.
**
-** The number of siblings of pPage might be increased or decreased by one or
-** two in an effort to keep pages nearly full but not over full. The root page
-** is special and is allowed to be nearly empty. If pPage is
-** the root page, then the depth of the tree might be increased
-** or decreased by one, as necessary, to keep the root page from being
-** overfull or completely empty.
+** The number of siblings of the page might be increased or decreased by
+** one or two in an effort to keep pages nearly full but not over full.
**
-** Note that when this routine is called, some of the Cells on pPage
-** might not actually be stored in pPage->aData[]. This can happen
-** if the page is overfull. Part of the job of this routine is to
-** make sure all Cells for pPage once again fit in pPage->aData[].
+** Note that when this routine is called, some of the cells on the page
+** might not actually be stored in MemPage.aData[]. This can happen
+** if the page is overfull. This routine ensures that all cells allocated
+** to the page and its siblings fit into MemPage.aData[] before returning.
**
-** In the course of balancing the siblings of pPage, the parent of pPage
-** might become overfull or underfull. If that happens, then this routine
-** is called recursively on the parent.
+** In the course of balancing the page and its siblings, cells may be
+** inserted into or removed from the parent page (pParent). Doing so
+** may cause the parent page to become overfull or underfull. If this
+** happens, it is the responsibility of the caller to invoke the correct
+** balancing routine to fix this problem (see the balance() routine).
**
** If this routine fails for any reason, it might leave the database
** in a corrupted state. So if this routine fails, the database should
** be rolled back.
-*/
-static int balance_nonroot(MemPage *pParent, int iParentIdx, u8 *aOvflSpace){
+**
+** The third argument to this function, aOvflSpace, is a pointer to a
+** buffer page-size bytes in size. If, in inserting cells into the parent
+** page (pParent), the parent page becomes overfull, this buffer is
+** used to store the parents overflow cells. Because this function inserts
+** a maximum of four divider cells into the parent page, and the maximum
+** size of a cell stored within an internal node is always less than 1/4
+** of the page-size, the aOvflSpace[] buffer is guaranteed to be large
+** enough for all overflow cells.
+**
+** If aOvflSpace is set to a null pointer, this function returns
+** SQLITE_NOMEM.
+*/
+static int balance_nonroot(
+ MemPage *pParent, /* Parent page of siblings being balanced */
+ int iParentIdx, /* Index of "the page" in pParent */
+ u8 *aOvflSpace /* page-size bytes of space for parent ovfl */
+){
BtShared *pBt; /* The whole database */
int nCell = 0; /* Number of cells in apCell[] */
int nMaxCells = 0; /* Allocated size of apCell, szCell, aFrom. */
- int nOld = 0; /* Number of pages in apOld[] */
int nNew = 0; /* Number of pages in apNew[] */
+ int nOld; /* Number of pages in apOld[] */
int i, j, k; /* Loop counters */
int nxDiv; /* Next divider slot in pParent->aCell[] */
int rc = SQLITE_OK; /* The return code */
@@ -5345,18 +5397,16 @@ static int balance_nonroot(MemPage *pParent, int iParentIdx, u8 *aOvflSpace){
int iOvflSpace = 0; /* First unused byte of aOvflSpace[] */
int szScratch; /* Size of scratch memory requested */
MemPage *apOld[NB]; /* pPage and up to two siblings */
- Pgno pgnoOld[NB]; /* Page numbers for each page in apOld[] */
MemPage *apCopy[NB]; /* Private copies of apOld[] pages */
MemPage *apNew[NB+2]; /* pPage and up to NB siblings after balancing */
- Pgno pgnoNew[NB+2]; /* Page numbers for each page in apNew[] */
- u8 *apDiv[NB]; /* Divider cells in pParent */
+ u8 *pRight; /* Location in parent of right-sibling pointer */
+ u8 *apDiv[NB-1]; /* Divider cells in pParent */
int cntNew[NB+2]; /* Index in aCell[] of cell after i-th page */
int szNew[NB+2]; /* Combined size of cells place on i-th page */
u8 **apCell = 0; /* All cells begin balanced */
u16 *szCell; /* Local size of all cells in apCell[] */
- u8 *aCopy[NB]; /* Space for holding data of apCopy[] */
- u8 *aSpace1; /* Space for copies of dividers cells before balance */
- u8 *aFrom = 0;
+ u8 *aSpace1; /* Space for copies of dividers cells */
+ Pgno pgno; /* Temp var to store a page number in */
pBt = pParent->pBt;
assert( sqlite3_mutex_held(pBt->mutex) );
@@ -5364,35 +5414,71 @@ static int balance_nonroot(MemPage *pParent, int iParentIdx, u8 *aOvflSpace){
TRACE(("BALANCE: begin page %d child of %d\n", pPage->pgno, pParent->pgno));
+ /* At this point pParent may have at most one overflow cell. And if
+ ** this overflow cell is present, it must be the cell with
+ ** index iParentIdx. This scenario comes about when this function
+ ** is called (indirectly) from sqlite3BtreeDelete(). */
+ assert( pParent->nOverflow==0 || pParent->nOverflow==1 );
+ assert( pParent->nOverflow==0 || pParent->aOvfl[0].idx==iParentIdx );
+
/* Find the sibling pages to balance. Also locate the cells in pParent
** that divide the siblings. An attempt is made to find NN siblings on
** either side of pPage. More siblings are taken from one side, however,
** if there are fewer than NN siblings on the other side. If pParent
- ** has NB or fewer children then all children of pParent are taken.
- */
- nxDiv = iParentIdx - NN;
- if( nxDiv + NB > pParent->nCell ){
- nxDiv = pParent->nCell - NB + 1;
- }
- if( nxDiv<0 ){
+ ** has NB or fewer children then all children of pParent are taken.
+ **
+ ** This loop also drops the divider cells from the parent page. This
+ ** way, the remainder of the function does not have to deal with any
+ ** overflow cells in the parent page, as if one existed it has already
+ ** been removed. */
+ i = pParent->nOverflow + pParent->nCell;
+ if( i<2 ){
nxDiv = 0;
- }
- for(i=0, k=nxDiv; i<NB; i++, k++){
- if( k<pParent->nCell ){
- apDiv[i] = findCell(pParent, k);
- assert( !pParent->leaf );
- pgnoOld[i] = get4byte(apDiv[i]);
- }else if( k==pParent->nCell ){
- pgnoOld[i] = get4byte(&pParent->aData[pParent->hdrOffset+8]);
+ nOld = i+1;
+ }else{
+ nOld = 3;
+ if( iParentIdx==0 ){
+ nxDiv = 0;
+ }else if( iParentIdx==i ){
+ nxDiv = i-2;
}else{
- break;
+ nxDiv = iParentIdx-1;
+ }
+ i = 2;
+ }
+ if( (i+nxDiv-pParent->nOverflow)==pParent->nCell ){
+ pRight = &pParent->aData[pParent->hdrOffset+8];
+ }else{
+ pRight = findCell(pParent, i+nxDiv-pParent->nOverflow);
+ }
+ pgno = get4byte(pRight);
+ while( 1 ){
+ rc = getAndInitPage(pBt, pgno, &apOld[i]);
+ if( rc ){
+ memset(apOld, 0, i*sizeof(MemPage*));
+ goto balance_cleanup;
}
- rc = getAndInitPage(pBt, pgnoOld[i], &apOld[i]);
- if( rc ) goto balance_cleanup;
- apCopy[i] = 0;
- assert( i==nOld );
- nOld++;
nMaxCells += 1+apOld[i]->nCell+apOld[i]->nOverflow;
+ if( (i--)==0 ) break;
+
+ if( pParent->nOverflow && i+nxDiv==pParent->aOvfl[0].idx ){
+ apDiv[i] = pParent->aOvfl[0].pCell;
+ pgno = get4byte(apDiv[i]);
+ szNew[i] = cellSizePtr(pParent, apDiv[i]);
+ pParent->nOverflow = 0;
+ }else{
+ apDiv[i] = findCell(pParent, i+nxDiv-pParent->nOverflow);
+ pgno = get4byte(apDiv[i]);
+ szNew[i] = cellSizePtr(pParent, apDiv[i]);
+
+ /* Drop the cell from the parent page. apDiv[i] still points to
+ ** the cell within the parent, even though it has been dropped.
+ ** This is safe because dropping a cell only overwrites the first
+ ** four bytes of it, and this function does not need the first
+ ** four bytes of the divider cell. So the pointer is safe to use
+ ** later on. */
+ dropCell(pParent, i+nxDiv-pParent->nOverflow, szNew[i]);
+ }
}
/* Make nMaxCells a multiple of 4 in order to preserve 8-byte
@@ -5402,47 +5488,25 @@ static int balance_nonroot(MemPage *pParent, int iParentIdx, u8 *aOvflSpace){
/*
** Allocate space for memory structures
*/
+ k = pBt->pageSize + ROUND8(sizeof(MemPage));
szScratch =
nMaxCells*sizeof(u8*) /* apCell */
+ nMaxCells*sizeof(u16) /* szCell */
- + (ROUND8(sizeof(MemPage))+pBt->pageSize)*NB /* aCopy */
+ pBt->pageSize /* aSpace1 */
- + (ISAUTOVACUUM ? nMaxCells : 0); /* aFrom */
+ + k*nOld; /* Page copies (apCopy) */
apCell = sqlite3ScratchMalloc( szScratch );
if( apCell==0 || aOvflSpace==0 ){
rc = SQLITE_NOMEM;
goto balance_cleanup;
}
szCell = (u16*)&apCell[nMaxCells];
- aCopy[0] = (u8*)&szCell[nMaxCells];
- assert( EIGHT_BYTE_ALIGNMENT(aCopy[0]) );
- for(i=1; i<NB; i++){
- aCopy[i] = &aCopy[i-1][pBt->pageSize+ROUND8(sizeof(MemPage))];
- assert( ((aCopy[i] - (u8*)0) & 7)==0 ); /* 8-byte alignment required */
- }
- aSpace1 = &aCopy[NB-1][pBt->pageSize+ROUND8(sizeof(MemPage))];
+ aSpace1 = (u8*)&szCell[nMaxCells];
assert( EIGHT_BYTE_ALIGNMENT(aSpace1) );
- if( ISAUTOVACUUM ){
- aFrom = &aSpace1[pBt->pageSize];
- }
-
- /*
- ** Make copies of the content of pPage and its siblings into aOld[].
- ** The rest of this function will use data from the copies rather
- ** that the original pages since the original pages will be in the
- ** process of being overwritten.
- */
- for(i=0; i<nOld; i++){
- MemPage *p = apCopy[i] = (MemPage*)aCopy[i];
- memcpy(p, apOld[i], sizeof(MemPage));
- p->aData = (void*)&p[1];
- memcpy(p->aData, apOld[i]->aData, pBt->pageSize);
- }
/*
** Load pointers to all cells on sibling pages and the divider cells
** into the local apCell[] array. Make copies of the divider cells
- ** into space obtained form aSpace1[] and remove the the divider Cells
+ ** into space obtained from aSpace1[] and remove the the divider Cells
** from pParent.
**
** If the siblings are on leaf pages, then the child pointers of the
@@ -5455,68 +5519,54 @@ static int balance_nonroot(MemPage *pParent, int iParentIdx, u8 *aOvflSpace){
** leafCorrection: 4 if pPage is a leaf. 0 if pPage is not a leaf.
** leafData: 1 if pPage holds key+data and pParent holds only keys.
*/
- nCell = 0;
leafCorrection = apOld[0]->leaf*4;
leafData = apOld[0]->hasData;
for(i=0; i<nOld; i++){
- MemPage *pOld = apCopy[i];
- int limit = pOld->nCell+pOld->nOverflow;
+ int limit;
+
+ /* Before doing anything else, take a copy of the i'th original sibling
+ ** The rest of this function will use data from the copies rather
+ ** that the original pages since the original pages will be in the
+ ** process of being overwritten. */
+ MemPage *pOld = apCopy[i] = (MemPage*)&aSpace1[pBt->pageSize + k*i];
+ memcpy(pOld, apOld[i], sizeof(MemPage));
+ pOld->aData = (void*)&pOld[1];
+ memcpy(pOld->aData, apOld[i]->aData, pBt->pageSize);
+
+ limit = pOld->nCell+pOld->nOverflow;
for(j=0; j<limit; j++){
assert( nCell<nMaxCells );
apCell[nCell] = findOverflowCell(pOld, j);
szCell[nCell] = cellSizePtr(pOld, apCell[nCell]);
- if( ISAUTOVACUUM ){
- int a;
- aFrom[nCell] = (u8)i; assert( i>=0 && i<6 );
- for(a=0; a<pOld->nOverflow; a++){
- if( pOld->aOvfl[a].pCell==apCell[nCell] ){
- aFrom[nCell] = 0xFF;
- break;
- }
- }
- }
nCell++;
}
- if( i<nOld-1 ){
- u16 sz = cellSizePtr(pParent, apDiv[i]);
- if( leafData ){
- /* With the LEAFDATA flag, pParent cells hold only INTKEYs that
- ** are duplicates of keys on the child pages. We need to remove
- ** the divider cells from pParent, but the dividers cells are not
- ** added to apCell[] because they are duplicates of child cells.
- */
- dropCell(pParent, nxDiv, sz);
+ if( i<nOld-1 && !leafData){
+ u16 sz = szNew[i];
+ u8 *pTemp;
+ assert( nCell<nMaxCells );
+ szCell[nCell] = sz;
+ pTemp = &aSpace1[iSpace1];
+ iSpace1 += sz;
+ assert( sz<=pBt->pageSize/4 );
+ assert( iSpace1<=pBt->pageSize );
+ memcpy(pTemp, apDiv[i], sz);
+ apCell[nCell] = pTemp+leafCorrection;
+ assert( leafCorrection==0 || leafCorrection==4 );
+ szCell[nCell] -= (u16)leafCorrection;
+ if( !pOld->leaf ){
+ assert( leafCorrection==0 );
+ assert( pOld->hdrOffset==0 );
+ /* The right pointer of the child page pOld becomes the left
+ ** pointer of the divider cell */
+ memcpy(apCell[nCell], &pOld->aData[8], 4);
}else{
- u8 *pTemp;
- assert( nCell<nMaxCells );
- szCell[nCell] = sz;
- pTemp = &aSpace1[iSpace1];
- iSpace1 += sz;
- assert( sz<=pBt->pageSize/4 );
- assert( iSpace1<=pBt->pageSize );
- memcpy(pTemp, apDiv[i], sz);
- apCell[nCell] = pTemp+leafCorrection;
- if( ISAUTOVACUUM ){
- aFrom[nCell] = 0xFF;
+ assert( leafCorrection==4 );
+ if( szCell[nCell]<4 ){
+ /* Do not allow any cells smaller than 4 bytes. */
+ szCell[nCell] = 4;
}
- dropCell(pParent, nxDiv, sz);
- assert( leafCorrection==0 || leafCorrection==4 );
- szCell[nCell] -= (u16)leafCorrection;
- assert( get4byte(pTemp)==pgnoOld[i] );
- if( !pOld->leaf ){
- assert( leafCorrection==0 );
- /* The right pointer of the child page pOld becomes the left
- ** pointer of the divider cell */
- memcpy(apCell[nCell], &pOld->aData[pOld->hdrOffset+8], 4);
- }else{
- assert( leafCorrection==4 );
- if( szCell[nCell]<4 ){
- /* Do not allow any cells smaller than 4 bytes. */
- szCell[nCell] = 4;
- }
- }
- nCell++;
}
+ nCell++;
}
}
@@ -5602,17 +5652,24 @@ static int balance_nonroot(MemPage *pParent, int iParentIdx, u8 *aOvflSpace){
MemPage *pNew;
if( i<nOld ){
pNew = apNew[i] = apOld[i];
- pgnoNew[i] = pgnoOld[i];
apOld[i] = 0;
rc = sqlite3PagerWrite(pNew->pDbPage);
nNew++;
if( rc ) goto balance_cleanup;
}else{
assert( i>0 );
- rc = allocateBtreePage(pBt, &pNew, &pgnoNew[i], pgnoNew[i-1], 0);
+ rc = allocateBtreePage(pBt, &pNew, &pgno, pgno, 0);
if( rc ) goto balance_cleanup;
apNew[i] = pNew;
nNew++;
+
+ /* Set the pointer-map entry for the new sibling page. */
+ if( ISAUTOVACUUM ){
+ rc = ptrmapPut(pBt, pNew->pgno, PTRMAP_BTREE, pParent->pgno);
+ if( rc!=SQLITE_OK ){
+ goto balance_cleanup;
+ }
+ }
}
}
@@ -5641,34 +5698,35 @@ static int balance_nonroot(MemPage *pParent, int iParentIdx, u8 *aOvflSpace){
** about 25% faster for large insertions and deletions.
*/
for(i=0; i<k-1; i++){
- int minV = pgnoNew[i];
+ int minV = apNew[i]->pgno;
int minI = i;
for(j=i+1; j<k; j++){
- if( pgnoNew[j]<(unsigned)minV ){
+ if( apNew[j]->pgno<(unsigned)minV ){
minI = j;
- minV = pgnoNew[j];
+ minV = apNew[j]->pgno;
}
}
if( minI>i ){
int t;
MemPage *pT;
- t = pgnoNew[i];
+ t = apNew[i]->pgno;
pT = apNew[i];
- pgnoNew[i] = pgnoNew[minI];
apNew[i] = apNew[minI];
- pgnoNew[minI] = t;
apNew[minI] = pT;
}
}
TRACE(("BALANCE: old: %d %d %d new: %d(%d) %d(%d) %d(%d) %d(%d) %d(%d)\n",
- pgnoOld[0],
- nOld>=2 ? pgnoOld[1] : 0,
- nOld>=3 ? pgnoOld[2] : 0,
- pgnoNew[0], szNew[0],
- nNew>=2 ? pgnoNew[1] : 0, nNew>=2 ? szNew[1] : 0,
- nNew>=3 ? pgnoNew[2] : 0, nNew>=3 ? szNew[2] : 0,
- nNew>=4 ? pgnoNew[3] : 0, nNew>=4 ? szNew[3] : 0,
- nNew>=5 ? pgnoNew[4] : 0, nNew>=5 ? szNew[4] : 0));
+ apOld[0]->pgno,
+ nOld>=2 ? apOld[1]->pgno : 0,
+ nOld>=3 ? apOld[2]->pgno : 0,
+ apNew[0]->pgno, szNew[0],
+ nNew>=2 ? apNew[1]->pgno : 0, nNew>=2 ? szNew[1] : 0,
+ nNew>=3 ? apNew[2]->pgno : 0, nNew>=3 ? szNew[2] : 0,
+ nNew>=4 ? apNew[3]->pgno : 0, nNew>=4 ? szNew[3] : 0,
+ nNew>=5 ? apNew[4]->pgno : 0, nNew>=5 ? szNew[4] : 0));
+
+ assert( sqlite3PagerIswriteable(pParent->pDbPage) );
+ put4byte(pRight, apNew[nNew-1]->pgno);
/*
** Evenly distribute the data in apCell[] across the new pages.
@@ -5679,32 +5737,11 @@ static int balance_nonroot(MemPage *pParent, int iParentIdx, u8 *aOvflSpace){
/* Assemble the new sibling page. */
MemPage *pNew = apNew[i];
assert( j<nMaxCells );
- assert( pNew->pgno==pgnoNew[i] );
zeroPage(pNew, pageFlags);
assemblePage(pNew, cntNew[i]-j, &apCell[j], &szCell[j]);
assert( pNew->nCell>0 || (nNew==1 && cntNew[0]==0) );
assert( pNew->nOverflow==0 );
- /* If this is an auto-vacuum database, update the pointer map entries
- ** that point to the siblings that were rearranged. These can be: left
- ** children of cells, the right-child of the page, or overflow pages
- ** pointed to by cells.
- */
- if( ISAUTOVACUUM ){
- for(k=j; k<cntNew[i]; k++){
- assert( k<nMaxCells );
- if( aFrom[k]==0xFF || apCopy[aFrom[k]]->pgno!=pNew->pgno ){
- rc = ptrmapPutOvfl(pNew, k-j);
- if( rc==SQLITE_OK && leafCorrection==0 ){
- rc = ptrmapPut(pBt, get4byte(apCell[k]), PTRMAP_BTREE, pNew->pgno);
- }
- if( rc!=SQLITE_OK ){
- goto balance_cleanup;
- }
- }
- }
- }
-
j = cntNew[i];
/* If the sibling page assembled above was not the right-most sibling,
@@ -5721,14 +5758,6 @@ static int balance_nonroot(MemPage *pParent, int iParentIdx, u8 *aOvflSpace){
pTemp = &aOvflSpace[iOvflSpace];
if( !pNew->leaf ){
memcpy(&pNew->aData[8], pCell, 4);
- if( ISAUTOVACUUM
- && (aFrom[j]==0xFF || apCopy[aFrom[j]]->pgno!=pNew->pgno)
- ){
- rc = ptrmapPut(pBt, get4byte(pCell), PTRMAP_BTREE, pNew->pgno);
- if( rc!=SQLITE_OK ){
- goto balance_cleanup;
- }
- }
}else if( leafData ){
/* If the tree is a leaf-data tree, and the siblings are leaves,
** then there is no divider cell in apCell[]. Instead, the divider
@@ -5739,7 +5768,7 @@ static int balance_nonroot(MemPage *pParent, int iParentIdx, u8 *aOvflSpace){
j--;
sqlite3BtreeParseCellPtr(pNew, apCell[j], &info);
pCell = pTemp;
- sz = 4 + putVarint(&pCell[4], info.nKey);
+ sz = 4 + putVarint(&pCell[4], info.nKey);
pTemp = 0;
}else{
pCell -= 4;
@@ -5762,32 +5791,13 @@ static int balance_nonroot(MemPage *pParent, int iParentIdx, u8 *aOvflSpace){
iOvflSpace += sz;
assert( sz<=pBt->pageSize/4 );
assert( iOvflSpace<=pBt->pageSize );
- rc = insertCell(pParent, nxDiv, pCell, sz, pTemp, 4);
+ rc = insertCell(pParent, nxDiv, pCell, sz, pTemp, pNew->pgno);
if( rc!=SQLITE_OK ) goto balance_cleanup;
assert( sqlite3PagerIswriteable(pParent->pDbPage) );
- put4byte(findOverflowCell(pParent,nxDiv), pNew->pgno);
- /* If this is an auto-vacuum database, and not a leaf-data tree,
- ** then update the pointer map with an entry for the overflow page
- ** that the cell just inserted points to (if any).
- */
- if( ISAUTOVACUUM && !leafData ){
- rc = ptrmapPutOvfl(pParent, nxDiv);
- if( rc!=SQLITE_OK ){
- goto balance_cleanup;
- }
- }
j++;
nxDiv++;
}
-
- /* Set the pointer-map entry for the new sibling page. */
- if( ISAUTOVACUUM ){
- rc = ptrmapPut(pBt, pNew->pgno, PTRMAP_BTREE, pParent->pgno);
- if( rc!=SQLITE_OK ){
- goto balance_cleanup;
- }
- }
}
assert( j==nCell );
assert( nOld>0 );
@@ -5795,34 +5805,118 @@ static int balance_nonroot(MemPage *pParent, int iParentIdx, u8 *aOvflSpace){
if( (pageFlags & PTF_LEAF)==0 ){
u8 *zChild = &apCopy[nOld-1]->aData[8];
memcpy(&apNew[nNew-1]->aData[8], zChild, 4);
- if( ISAUTOVACUUM ){
- rc = ptrmapPut(pBt, get4byte(zChild), PTRMAP_BTREE, apNew[nNew-1]->pgno);
- if( rc!=SQLITE_OK ){
- goto balance_cleanup;
+ }
+
+ /* Fix the pointer-map entries for all the cells that were shifted around.
+ ** There are several different types of pointer-map entries that need to
+ ** be dealt with by this routine. Some of these have been set already, but
+ ** many have not. The following is a summary:
+ **
+ ** 1) The entries associated with new sibling pages that were not
+ ** siblings when this function was called. These have already
+ ** been set. We don't need to worry about old siblings that were
+ ** moved to the free-list - the freePage() code has taken care
+ ** of those.
+ **
+ ** 2) The pointer-map entries associated with the first overflow
+ ** page in any overflow chains used by new divider cells. These
+ ** have also already been taken care of by the insertCell() code.
+ **
+ ** 3) If the sibling pages are not leaves, then the child pages of
+ ** cells stored on the sibling pages may need to be updated.
+ **
+ ** 4) If the sibling pages are not internal intkey nodes, then any
+ ** overflow pages used by these cells may need to be updated
+ ** (internal intkey nodes never contain pointers to overflow pages).
+ **
+ ** 5) If the sibling pages are not leaves, then the pointer-map
+ ** entries for the right-child pages of each sibling may need
+ ** to be updated.
+ **
+ ** Cases 1 and 2 are dealt with above by other code. The following
+ ** block deals with cases 3 and 4. Since setting a pointer map entry
+ ** is a relatively expensive operation, this code only sets pointer
+ ** map entries for child or overflow pages that have actually moved
+ ** between pages. */
+ if( ISAUTOVACUUM ){
+ MemPage *pNew = apNew[0];
+ MemPage *pOld = apCopy[0];
+ int nOverflow = pOld->nOverflow;
+ int iNextOld = pOld->nCell + nOverflow;
+ int iOverflow = (nOverflow ? pOld->aOvfl[0].idx : -1);
+ j = 0; /* Current 'old' sibling page */
+ k = 0; /* Current 'new' sibling page */
+ for(i=0; i<nCell && rc==SQLITE_OK; i++){
+ int isDivider = 0;
+ while( i==iNextOld ){
+ /* Cell i is the cell immediately following the last cell on old
+ ** sibling page j. If the siblings are not leaf pages of an
+ ** intkey b-tree, then cell i was a divider cell. */
+ pOld = apCopy[++j];
+ iNextOld = i + !leafData + pOld->nCell + pOld->nOverflow;
+ if( pOld->nOverflow ){
+ nOverflow = pOld->nOverflow;
+ iOverflow = i + !leafData + pOld->aOvfl[0].idx;
+ }
+ isDivider = !leafData;
+ }
+
+ assert(nOverflow>0 || iOverflow<i );
+ assert(nOverflow<2 || pOld->aOvfl[0].idx==pOld->aOvfl[1].idx-1);
+ assert(nOverflow<3 || pOld->aOvfl[1].idx==pOld->aOvfl[2].idx-1);
+ if( i==iOverflow ){
+ isDivider = 1;
+ if( (--nOverflow)>0 ){
+ iOverflow++;
+ }
+ }
+
+ if( i==cntNew[k] ){
+ /* Cell i is the cell immediately following the last cell on new
+ ** sibling page k. If the siblings are not leaf pages of an
+ ** intkey b-tree, then cell i is a divider cell. */
+ pNew = apNew[++k];
+ if( !leafData ) continue;
+ }
+ assert( rc==SQLITE_OK );
+ assert( j<nOld );
+ assert( k<nNew );
+
+ /* If the cell was originally divider cell (and is not now) or
+ ** an overflow cell, or if the cell was located on a different sibling
+ ** page before the balancing, then the pointer map entries associated
+ ** with any child or overflow pages need to be updated. */
+ if( isDivider || pOld->pgno!=pNew->pgno ){
+ if( !leafCorrection ){
+ rc = ptrmapPut(pBt, get4byte(apCell[i]), PTRMAP_BTREE, pNew->pgno);
+ }
+ if( szCell[i]>pNew->minLocal && rc==SQLITE_OK ){
+ rc = ptrmapPutOvflPtr(pNew, apCell[i]);
+ }
}
}
- }
- assert( sqlite3PagerIswriteable(pParent->pDbPage) );
- if( nxDiv==pParent->nCell+pParent->nOverflow ){
- /* Right-most sibling is the right-most child of pParent */
- put4byte(&pParent->aData[pParent->hdrOffset+8], pgnoNew[nNew-1]);
- }else{
- /* Right-most sibling is the left child of the first entry in pParent
- ** past the right-most divider entry */
- put4byte(findOverflowCell(pParent, nxDiv), pgnoNew[nNew-1]);
+
+ if( !leafCorrection ){
+ for(i=0; rc==SQLITE_OK && i<nNew; i++){
+ rc = ptrmapPut(
+ pBt, get4byte(&apNew[i]->aData[8]), PTRMAP_BTREE, apNew[i]->pgno);
+ }
+ }
+
+#if 0
+ /* The ptrmapCheckPages() contains assert() statements that verify that
+ ** all pointer map pages are set correctly. This is helpful while
+ ** debugging. This is usually disabled because a corrupt database may
+ ** cause an assert() statement to fail. */
+ ptrmapCheckPages(apNew, nNew);
+ ptrmapCheckPages(&pParent, 1);
+#endif
}
- /*
- ** Balance the parent page. Note that the current page (pPage) might
- ** have been added to the freelist so it might no longer be initialized.
- ** But the parent page will always be initialized.
- */
assert( pParent->isInit );
- sqlite3ScratchFree(apCell);
- apCell = 0;
TRACE(("BALANCE: finished with %d: old=%d new=%d cells=%d\n",
pPage->pgno, nOld, nNew, nCell));
-
+
/*
** Cleanup before returning.
*/
@@ -6123,9 +6217,9 @@ static int balance(BtCursor *pCur){
sqlite3PageFree(pFree);
}
- /* The pSpace buffer will be freed after the next call to
- ** balance_nonroot(), or just before this function returns, whichever
- ** comes first. */
+ /* The pSpace buffer will be freed after the next call to
+ ** balance_nonroot(), or just before this function returns, whichever
+ ** comes first. */
pFree = pSpace;
VVA_ONLY( pCur->pagesShuffled = 1 );
}
@@ -6367,198 +6461,110 @@ end_insert:
** is left pointing at a arbitrary location.
*/
int sqlite3BtreeDelete(BtCursor *pCur){
- MemPage *pPage = pCur->apPage[pCur->iPage];
- int idx;
- unsigned char *pCell;
- int rc;
- Pgno pgnoChild = 0;
Btree *p = pCur->pBtree;
- BtShared *pBt = p->pBt;
+ BtShared *pBt = p->pBt;
+ int rc; /* Return code */
+ MemPage *pPage; /* Page to delete cell from */
+ unsigned char *pCell; /* Pointer to cell to delete */
+ int iCellIdx; /* Index of cell to delete */
+ int iCellDepth; /* Depth of node containing pCell */
assert( cursorHoldsMutex(pCur) );
- assert( pPage->isInit );
assert( pBt->inTransaction==TRANS_WRITE );
assert( !pBt->readOnly );
- if( pCur->eState==CURSOR_FAULT ){
- return pCur->skip;
- }
- if( NEVER(pCur->aiIdx[pCur->iPage]>=pPage->nCell) ){
- return SQLITE_ERROR; /* The cursor is not pointing to anything */
- }
assert( pCur->wrFlag );
+ if( NEVER(pCur->aiIdx[pCur->iPage]>=pCur->apPage[pCur->iPage]->nCell)
+ || NEVER(pCur->eState!=CURSOR_VALID)
+ ){
+ return SQLITE_ERROR; /* Something has gone awry. */
+ }
+
rc = checkForReadConflicts(p, pCur->pgnoRoot, pCur, pCur->info.nKey);
if( rc!=SQLITE_OK ){
- /* The table pCur points to has a read lock */
assert( rc==SQLITE_LOCKED_SHAREDCACHE );
- return rc;
+ return rc; /* The table pCur points to has a read lock */
}
- /* Restore the current cursor position (a no-op if the cursor is not in
- ** CURSOR_REQUIRESEEK state) and save the positions of any other cursors
- ** open on the same table. Then call sqlite3PagerWrite() on the page
- ** that the entry will be deleted from.
- */
- if(
- (rc = restoreCursorPosition(pCur))!=0 ||
- (rc = saveAllCursors(pBt, pCur->pgnoRoot, pCur))!=0 ||
- (rc = sqlite3PagerWrite(pPage->pDbPage))!=0
- ){
- return rc;
- }
+ iCellDepth = pCur->iPage;
+ iCellIdx = pCur->aiIdx[iCellDepth];
+ pPage = pCur->apPage[iCellDepth];
+ pCell = findCell(pPage, iCellIdx);
- /* Locate the cell within its page and leave pCell pointing to the
- ** data. The clearCell() call frees any overflow pages associated with the
- ** cell. The cell itself is still intact.
- */
- idx = pCur->aiIdx[pCur->iPage];
- pCell = findCell(pPage, idx);
+ /* If the page containing the entry to delete is not a leaf page, move
+ ** the cursor to the largest entry in the tree that is smaller than
+ ** the entry being deleted. This cell will replace the cell being deleted
+ ** from the internal node. The 'previous' entry is used for this instead
+ ** of the 'next' entry, as the previous entry is always a part of the
+ ** sub-tree headed by the child page of the cell being deleted. This makes
+ ** balancing the tree following the delete operation easier. */
if( !pPage->leaf ){
- pgnoChild = get4byte(pCell);
+ int notUsed;
+ if( SQLITE_OK!=(rc = sqlite3BtreePrevious(pCur, &notUsed)) ){
+ return rc;
+ }
}
- rc = clearCell(pPage, pCell);
- if( rc ){
+
+ /* Save the positions of any other cursors open on this table before
+ ** making any modifications. Make the page containing the entry to be
+ ** deleted writable. Then free any overflow pages associated with the
+ ** entry and finally remove the cell itself from within the page. */
+ if( SQLITE_OK!=(rc = saveAllCursors(pBt, pCur->pgnoRoot, pCur))
+ || SQLITE_OK!=(rc = sqlite3PagerWrite(pPage->pDbPage))
+ || SQLITE_OK!=(rc = clearCell(pPage, pCell))
+ || SQLITE_OK!=(rc = dropCell(pPage, iCellIdx, cellSizePtr(pPage, pCell)))
+ ){
return rc;
}
+ /* If the cell deleted was not located on a leaf page, then the cursor
+ ** is currently pointing to the largest entry in the sub-tree headed
+ ** by the child-page of the cell that was just deleted from an internal
+ ** node. The cell from the leaf node needs to be moved to the internal
+ ** node to replace the deleted cell. */
if( !pPage->leaf ){
- /*
- ** The entry we are about to delete is not a leaf so if we do not
- ** do something we will leave a hole on an internal page.
- ** We have to fill the hole by moving in a cell from a leaf. The
- ** next Cell after the one to be deleted is guaranteed to exist and
- ** to be a leaf so we can use it.
- */
- BtCursor leafCur;
- MemPage *pLeafPage = 0;
-
- unsigned char *pNext;
- int notUsed;
- unsigned char *tempCell = 0;
- assert( !pPage->intKey );
- sqlite3BtreeGetTempCursor(pCur, &leafCur);
- rc = sqlite3BtreeNext(&leafCur, &notUsed);
- if( rc==SQLITE_OK ){
- assert( leafCur.aiIdx[leafCur.iPage]==0 );
- pLeafPage = leafCur.apPage[leafCur.iPage];
- rc = sqlite3PagerWrite(pLeafPage->pDbPage);
- }
- if( rc==SQLITE_OK ){
- int leafCursorInvalid = 0;
- u16 szNext;
- TRACE(("DELETE: table=%d delete internal from %d replace from leaf %d\n",
- pCur->pgnoRoot, pPage->pgno, pLeafPage->pgno));
- dropCell(pPage, idx, cellSizePtr(pPage, pCell));
- pNext = findCell(pLeafPage, 0);
- szNext = cellSizePtr(pLeafPage, pNext);
- assert( MX_CELL_SIZE(pBt)>=szNext+4 );
- allocateTempSpace(pBt);
- tempCell = pBt->pTmpSpace;
- if( tempCell==0 ){
- rc = SQLITE_NOMEM;
- }
- if( rc==SQLITE_OK ){
- rc = insertCell(pPage, idx, pNext-4, szNext+4, tempCell, 0);
- }
-
+ MemPage *pLeaf = pCur->apPage[pCur->iPage];
+ int nCell;
+ Pgno n = pCur->apPage[iCellDepth+1]->pgno;
+ unsigned char *pTmp;
- /* The "if" statement in the next code block is critical. The
- ** slightest error in that statement would allow SQLite to operate
- ** correctly most of the time but produce very rare failures. To
- ** guard against this, the following macros help to verify that
- ** the "if" statement is well tested.
- */
- testcase( pPage->nOverflow==0 && pPage->nFree<pBt->usableSize*2/3
- && pLeafPage->nFree+2+szNext > pBt->usableSize*2/3 );
- testcase( pPage->nOverflow==0 && pPage->nFree==pBt->usableSize*2/3
- && pLeafPage->nFree+2+szNext > pBt->usableSize*2/3 );
- testcase( pPage->nOverflow==0 && pPage->nFree==pBt->usableSize*2/3+1
- && pLeafPage->nFree+2+szNext > pBt->usableSize*2/3 );
- testcase( pPage->nOverflow>0 && pPage->nFree<=pBt->usableSize*2/3
- && pLeafPage->nFree+2+szNext > pBt->usableSize*2/3 );
- testcase( (pPage->nOverflow>0 || (pPage->nFree > pBt->usableSize*2/3))
- && pLeafPage->nFree+2+szNext == pBt->usableSize*2/3 );
-
-
- if( (pPage->nOverflow>0 || (pPage->nFree > pBt->usableSize*2/3)) &&
- (pLeafPage->nFree+2+szNext > pBt->usableSize*2/3)
- ){
- /* This branch is taken if the internal node is now either overflowing
- ** or underfull and the leaf node will be underfull after the just cell
- ** copied to the internal node is deleted from it. This is a special
- ** case because the call to balance() to correct the internal node
- ** may change the tree structure and invalidate the contents of
- ** the leafCur.apPage[] and leafCur.aiIdx[] arrays, which will be
- ** used by the balance() required to correct the underfull leaf
- ** node.
- **
- ** The formula used in the expression above are based on facets of
- ** the SQLite file-format that do not change over time.
- */
- testcase( pPage->nFree==pBt->usableSize*2/3+1 );
- testcase( pLeafPage->nFree+2+szNext==pBt->usableSize*2/3+1 );
- leafCursorInvalid = 1;
- }
+ pCell = findCell(pLeaf, pLeaf->nCell-1);
+ nCell = cellSizePtr(pLeaf, pCell);
+ assert( MX_CELL_SIZE(pBt)>=nCell );
- if( rc==SQLITE_OK ){
- assert( sqlite3PagerIswriteable(pPage->pDbPage) );
- put4byte(findOverflowCell(pPage, idx), pgnoChild);
- VVA_ONLY( pCur->pagesShuffled = 0 );
- rc = balance(pCur);
- }
+ allocateTempSpace(pBt);
+ pTmp = pBt->pTmpSpace;
- if( rc==SQLITE_OK && leafCursorInvalid ){
- /* The leaf-node is now underfull and so the tree needs to be
- ** rebalanced. However, the balance() operation on the internal
- ** node above may have modified the structure of the B-Tree and
- ** so the current contents of leafCur.apPage[] and leafCur.aiIdx[]
- ** may not be trusted.
- **
- ** It is not possible to copy the ancestry from pCur, as the same
- ** balance() call has invalidated the pCur->apPage[] and aiIdx[]
- ** arrays.
- **
- ** The call to saveCursorPosition() below internally saves the
- ** key that leafCur is currently pointing to. Currently, there
- ** are two copies of that key in the tree - one here on the leaf
- ** page and one on some internal node in the tree. The copy on
- ** the leaf node is always the next key in tree-order after the
- ** copy on the internal node. So, the call to sqlite3BtreeNext()
- ** calls restoreCursorPosition() to point the cursor to the copy
- ** stored on the internal node, then advances to the next entry,
- ** which happens to be the copy of the key on the internal node.
- ** Net effect: leafCur is pointing back to the duplicate cell
- ** that needs to be removed, and the leafCur.apPage[] and
- ** leafCur.aiIdx[] arrays are correct.
- */
- VVA_ONLY( Pgno leafPgno = pLeafPage->pgno );
- rc = saveCursorPosition(&leafCur);
- if( rc==SQLITE_OK ){
- rc = sqlite3BtreeNext(&leafCur, &notUsed);
- }
- pLeafPage = leafCur.apPage[leafCur.iPage];
- assert( rc!=SQLITE_OK || pLeafPage->pgno==leafPgno );
- assert( rc!=SQLITE_OK || leafCur.aiIdx[leafCur.iPage]==0 );
- }
-
- if( SQLITE_OK==rc
- && SQLITE_OK==(rc = sqlite3PagerWrite(pLeafPage->pDbPage))
- ){
- dropCell(pLeafPage, 0, szNext);
- VVA_ONLY( leafCur.pagesShuffled = 0 );
- rc = balance(&leafCur);
- assert( leafCursorInvalid || !leafCur.pagesShuffled
- || !pCur->pagesShuffled );
- }
+ if( SQLITE_OK!=(rc = sqlite3PagerWrite(pLeaf->pDbPage))
+ || SQLITE_OK!=(rc = insertCell(pPage, iCellIdx, pCell-4, nCell+4, pTmp, n))
+ || SQLITE_OK!=(rc = dropCell(pLeaf, pLeaf->nCell-1, nCell))
+ ){
+ return rc;
}
- sqlite3BtreeReleaseTempCursor(&leafCur);
- }else{
- TRACE(("DELETE: table=%d delete from leaf %d\n",
- pCur->pgnoRoot, pPage->pgno));
- rc = dropCell(pPage, idx, cellSizePtr(pPage, pCell));
- if( rc==SQLITE_OK ){
- rc = balance(pCur);
+ }
+
+ /* Balance the tree. If the entry deleted was located on a leaf page,
+ ** then the cursor still points to that page. In this case the first
+ ** call to balance() repairs the tree, and the if(...) condition is
+ ** never true.
+ **
+ ** Otherwise, if the entry deleted was on an internal node page, then
+ ** pCur is pointing to the leaf page from which a cell was removed to
+ ** replace the cell deleted from the internal node. This is slightly
+ ** tricky as the leaf node may be underfull, and the internal node may
+ ** be either under or overfull. In this case run the balancing algorithm
+ ** on the leaf node first. If the balance proceeds far enough up the
+ ** tree that we can be sure that any problem in the internal node has
+ ** been corrected, so be it. Otherwise, after balancing the leaf node,
+ ** walk the cursor up the tree to the internal node and balance it as
+ ** well. */
+ rc = balance(pCur);
+ if( rc==SQLITE_OK && pCur->iPage>iCellDepth ){
+ while( pCur->iPage>iCellDepth ){
+ releasePage(pCur->apPage[pCur->iPage--]);
}
+ rc = balance(pCur);
}
+
if( rc==SQLITE_OK ){
moveToRoot(pCur);
}