diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/btree.c | 1026 |
1 files changed, 410 insertions, 616 deletions
diff --git a/src/btree.c b/src/btree.c index 5bec869c5..7a0ff65f2 100644 --- a/src/btree.c +++ b/src/btree.c @@ -21,7 +21,7 @@ ** http://www.hwaci.com/drh/ ** ************************************************************************* -** $Id: btree.c,v 1.11 2001/06/08 00:21:52 drh Exp $ +** $Id: btree.c,v 1.12 2001/06/10 19:56:59 drh Exp $ ** ** This file implements a external (disk-based) database using BTrees. ** For a detailed discussion of BTrees, refer to @@ -69,8 +69,10 @@ /* ** Primitive data types. u32 must be 4 bytes and u16 must be 2 bytes. +** The uptr type must be big enough to hold a pointer. ** Change these typedefs when porting to new architectures. */ +typedef unsigned int uptr; typedef unsigned int u32; typedef unsigned short int u16; typedef unsigned char u8; @@ -128,12 +130,12 @@ struct PageOne { ** structure. ** ** PageHdr.firstFree is 0 if there is no free space on this page. -** Otherwise, PageHdr.firstFree is the index in MemPage.aDisk[] of a +** Otherwise, PageHdr.firstFree is the index in MemPage.u.aDisk[] of a ** FreeBlk structure that describes the first block of free space. ** All free space is defined by a linked list of FreeBlk structures. ** ** Data is stored in a linked list of Cell structures. PageHdr.firstCell -** is the index into MemPage.aDisk[] of the first cell on the page. The +** is the index into MemPage.u.aDisk[] of the first cell on the page. The ** Cells are kept in sorted order. ** ** A Cell contains all information about a database entry and a pointer @@ -143,8 +145,8 @@ struct PageOne { */ struct PageHdr { Pgno rightChild; /* Child page that comes after all cells on this page */ - u16 firstCell; /* Index in MemPage.aDisk[] of the first cell */ - u16 firstFree; /* Index in MemPage.aDisk[] of the first free block */ + u16 firstCell; /* Index in MemPage.u.aDisk[] of the first cell */ + u16 firstFree; /* Index in MemPage.u.aDisk[] of the first free block */ }; /* @@ -160,7 +162,7 @@ struct PageHdr { struct CellHdr { Pgno leftChild; /* Child page that comes before this cell */ u16 nKey; /* Number of bytes in the key */ - u16 iNext; /* Index in MemPage.aDisk[] of next cell in sorted order */ + u16 iNext; /* Index in MemPage.u.aDisk[] of next cell in sorted order */ u32 nData; /* Number of bytes of data */ } @@ -214,11 +216,11 @@ struct Cell { */ struct FreeBlk { u16 iSize; /* Number of bytes in this block of free space */ - u16 iNext; /* Index in MemPage.aDisk[] of the next free block */ + u16 iNext; /* Index in MemPage.u.aDisk[] of the next free block */ }; /* -** Number of bytes on a single overflow page. +** The number of bytes of payload that will fit on a single overflow page. */ #define OVERFLOW_SIZE (SQLITE_PAGE_SIZE-sizeof(Pgno)) @@ -234,35 +236,48 @@ struct FreeBlk { ** pages. */ struct OverflowPage { - Pgno next; + Pgno iNext; char aPayload[OVERFLOW_SIZE]; }; /* ** For every page in the database file, an instance of the following structure -** is stored in memory. The aDisk[] array contains the raw bits read from +** is stored in memory. The u.aDisk[] array contains the raw bits read from ** the disk. The rest is auxiliary information that held in memory only. The ** auxiliary info is only valid for regular database pages - it is not ** used for overflow pages and pages on the freelist. ** ** Of particular interest in the auxiliary info is the apCell[] entry. Each -** apCell[] entry is a pointer to a Cell structure in aDisk[]. The cells are +** apCell[] entry is a pointer to a Cell structure in u.aDisk[]. The cells are ** put in this array so that they can be accessed in constant time, rather ** than in linear time which would be needed if we had to walk the linked ** list on every access. ** +** Note that apCell[] contains enough space to hold up to two more Cells +** than can possibly fit on one page. In the steady state, every apCell[] +** points to memory inside u.aDisk[]. But in the middle of an insert +** operation, some apCell[] entries may temporarily point to data space +** outside of u.aDisk[]. This is a transient situation that is quickly +** resolved. But while it is happening, it is possible for a database +** page to hold as many as two more cells than it might otherwise hold. +** The extra too entries in apCell[] are an allowance for this situation. +** ** The pParent field points back to the parent page. This allows us to ** walk up the BTree from any leaf to the root. Care must be taken to ** unref() the parent page pointer when this page is no longer referenced. ** The pageDestructor() routine handles that chore. */ struct MemPage { - char aDisk[SQLITE_PAGE_SIZE]; /* Page data stored on disk */ + union { + char aDisk[SQLITE_PAGE_SIZE]; /* Page data stored on disk */ + PageHdr hdr; /* Overlay page header */ + } u; int isInit; /* True if auxiliary data is initialized */ MemPage *pParent; /* The parent of this page. NULL for root */ - int nFree; /* Number of free bytes in aDisk[] */ + int nFree; /* Number of free bytes in u.aDisk[] */ int nCell; /* Number of entries on this page */ - Cell *apCell[MX_CELL+1]; /* All data entires in sorted order */ + int isOverfull; /* Some apCell[] points outside u.aDisk[] */ + Cell *apCell[MX_CELL+2]; /* All data entires in sorted order */ } /* @@ -290,7 +305,7 @@ typedef Btree Bt; */ struct BtCursor { Btree *pBt; /* The Btree to which this cursor belongs */ - BtCursor *pPrev, *pNext; /* List of all cursors */ + BtCursor *pNext, *pPrev; /* Forms a linked list of all cursors */ Pgno pgnoRoot; /* The root page of this tree */ MemPage *pPage; /* Page that contains the entry */ u16 idx; /* Index of the entry in pPage->apCell[] */ @@ -322,28 +337,27 @@ static int cellSize(Cell *pCell){ ** into one big FreeBlk at the end of the page. */ static void defragmentPage(MemPage *pPage){ - int pc; - int i, n; + int pc, i, n; FreeBlk *pFBlk; char newPage[SQLITE_PAGE_SIZE]; pc = sizeof(PageHdr); - ((PageHdr*)pPage)->firstCell = pc; - memcpy(newPage, pPage->aDisk, pc); + pPage->u.hdr.firstCell = pc; + memcpy(newPage, pPage->u.aDisk, pc); for(i=0; i<pPage->nCell; i++){ Cell *pCell = &pPage->apCell[i]; n = cellSize(pCell); - pCell->h.iNext = i<pPage->nCell ? pc + n : 0; + pCell->h.iNext = i<pPage->nCell-1 ? pc + n : 0; memcpy(&newPage[pc], pCell, n); - pPage->apCell[i] = (Cell*)&pPage->aDisk[pc]; + pPage->apCell[i] = (Cell*)&pPage->u.aDisk[pc]; pc += n; } assert( pPage->nFree==SQLITE_PAGE_SIZE-pc ); - memcpy(pPage->aDisk, newPage, pc); - pFBlk = &pPage->aDisk[pc]; + memcpy(pPage->u.aDisk, newPage, pc); + pFBlk = &pPage->u.aDisk[pc]; pFBlk->iSize = SQLITE_PAGE_SIZE - pc; pFBlk->iNext = 0; - ((PageHdr*)pPage)->firstFree = pc; + pPage->u.hdr.firstFree = pc; memset(&pFBlk[1], 0, SQLITE_PAGE_SIZE - pc - sizeof(FreeBlk)); } @@ -351,7 +365,7 @@ static void defragmentPage(MemPage *pPage){ ** Allocate nByte bytes of space on a page. nByte must be a ** multiple of 4. ** -** Return the index into pPage->aDisk[] of the first byte of +** Return the index into pPage->u.aDisk[] of the first byte of ** the new allocation. Or return 0 if there is not enough free ** space on the page to satisfy the allocation request. ** @@ -366,24 +380,24 @@ static int allocateSpace(MemPage *pPage, int nByte){ int start; assert( nByte==ROUNDUP(nByte) ); - if( pPage->nFree<nByte ) return 0; - pIdx = &((PageHdr*)pPage)->firstFree; - p = (FreeBlk*)&pPage->aDisk[*pIdx]; + if( pPage->nFree<nByte || pPage->isOverfull ) return 0; + pIdx = &pPage->u.hdr.firstFree; + p = (FreeBlk*)&pPage->u.aDisk[*pIdx]; while( p->iSize<nByte ){ if( p->iNext==0 ){ defragmentPage(pPage); - pIdx = &((PageHdr*)pPage)->firstFree; + pIdx = &pPage->u.hdr.firstFree; }else{ pIdx = &p->iNext; } - p = (FreeBlk*)&pPage->aDisk[*pIdx]; + p = (FreeBlk*)&pPage->u.aDisk[*pIdx]; } if( p->iSize==nByte ){ start = *pIdx; *pIdx = p->iNext; }else{ start = *pIdx; - FreeBlk *pNew = (FreeBlk*)&pPage->aDisk[start + nByte]; + FreeBlk *pNew = (FreeBlk*)&pPage->u.aDisk[start + nByte]; pNew->iNext = p->iNext; pNew->iSize = p->iSize - nByte; *pIdx = start + nByte; @@ -393,9 +407,10 @@ static int allocateSpace(MemPage *pPage, int nByte){ } /* -** Return a section of the MemPage.aDisk[] to the freelist. -** The first byte of the new free block is pPage->aDisk[start] -** and the size of the block is "size" bytes. +** Return a section of the MemPage.u.aDisk[] to the freelist. +** The first byte of the new free block is pPage->u.aDisk[start] +** and the size of the block is "size" bytes. Size must be +** a multiple of 4. ** ** Most of the effort here is involved in coalesing adjacent ** free blocks into a single big free block. @@ -409,14 +424,14 @@ static void freeSpace(MemPage *pPage, int start, int size){ assert( size == ROUNDUP(size) ); assert( start == ROUNDUP(start) ); - pIdx = &((PageHdr*)pPage)->firstFree; + pIdx = &pPage->u.hdr.firstFree; idx = *pIdx; while( idx!=0 && idx<start ){ - pFBlk = (FreeBlk*)&pPage->aDisk[idx]; + pFBlk = (FreeBlk*)&pPage->u.aDisk[idx]; if( idx + pFBlk->iSize == start ){ pFBlk->iSize += size; if( idx + pFBlk->iSize == pFBlk->iNext ){ - pNext = (FreeBlk*)&pPage->aDisk[pFblk->iNext]; + pNext = (FreeBlk*)&pPage->u.aDisk[pFblk->iNext]; pFBlk->iSize += pNext->iSize; pFBlk->iNext = pNext->iNext; } @@ -426,12 +441,12 @@ static void freeSpace(MemPage *pPage, int start, int size){ pIdx = &pFBlk->iNext; idx = *pIdx; } - pNew = (FreeBlk*)&pPage->aDisk[start]; + pNew = (FreeBlk*)&pPage->u.aDisk[start]; if( idx != end ){ pNew->iSize = size; pNew->iNext = idx; }else{ - pNext = (FreeBlk*)&pPage->aDisk[idx]; + pNext = (FreeBlk*)&pPage->u.aDisk[idx]; pNew->iSize = size + pNext->iSize; pNew->iNext = pNext->iNext; } @@ -454,9 +469,9 @@ static void freeSpace(MemPage *pPage, int start, int size){ ** we failed to detect any corruption. */ static int initPage(MemPage *pPage, Pgno pgnoThis, MemPage *pParent){ - int idx; /* An index into pPage->aDisk[] */ - Cell *pCell; /* A pointer to a Cell in pPage->aDisk[] */ - FreeBlk *pFBlk; /* A pointer to a free block in pPage->aDisk[] */ + int idx; /* An index into pPage->u.aDisk[] */ + Cell *pCell; /* A pointer to a Cell in pPage->u.aDisk[] */ + FreeBlk *pFBlk; /* A pointer to a free block in pPage->u.aDisk[] */ int sz; /* The size of a Cell in bytes */ int freeSpace; /* Amount of free space on the page */ @@ -472,11 +487,11 @@ static int initPage(MemPage *pPage, Pgno pgnoThis, MemPage *pParent){ pPage->isInit = 1; pPage->nCell = 0; freeSpace = SQLITE_PAGE_SIZE - sizeof(PageHdr); - idx = ((PageHdr*)pPage)->firstCell; + idx = pPage->u.hdr.firstCell; while( idx!=0 ){ if( idx>SQLITE_PAGE_SIZE-MN_CELL_SIZE ) goto page_format_error; if( idx<sizeof(PageHdr) ) goto page_format_error; - pCell = (Cell*)&pPage->aDisk[idx]; + pCell = (Cell*)&pPage->u.aDisk[idx]; sz = cellSize(pCell); if( idx+sz > SQLITE_PAGE_SIZE ) goto page_format_error; freeSpace -= sz; @@ -484,11 +499,11 @@ static int initPage(MemPage *pPage, Pgno pgnoThis, MemPage *pParent){ idx = pCell->h.iNext; } pPage->nFree = 0; - idx = ((PageHdr*)pPage)->firstFree; + idx = pPage->u.hdr.firstFree; while( idx!=0 ){ if( idx>SQLITE_PAGE_SIZE-sizeof(FreeBlk) ) goto page_format_error; if( idx<sizeof(PageHdr) ) goto page_format_error; - pFBlk = (FreeBlk*)&pPage->aDisk[idx]; + pFBlk = (FreeBlk*)&pPage->u.aDisk[idx]; pPage->nFree += pFBlk->iSize; if( pFBlk->iNext <= idx ) goto page_format_error; idx = pFBlk->iNext; @@ -506,36 +521,6 @@ page_format_error: } /* -** Recompute the MemPage.apCell[], MemPage.nCell, and MemPage.nFree parameters -** for a cell after the MemPage.aDisk[] content has be changed significantly. -** -** The computation here is similar to initPage() except that in this case -** the MemPage.aDisk[] field has been set up internally (instead of -** having been read from disk) so we do not need to do as much error -** checking. -*/ -static void reinitPage(MemPage *pPage){ - Cell *pCell; - - pPage->nCell = 0; - idx = ((PageHdr*)pPage)->firstCell; - while( idx!=0 ){ - pCell = (Cell*)&pPage->aDisk[idx]; - sz = cellSize(pCell); - pPage->apCell[pPage->nCell++] = pCell; - idx = pCell->h.iNext; - } - pPage->nFree = 0; - idx = ((PageHdr*)pPage)->firstFree; - while( idx!=0 ){ - pFBlk = (FreeBlk*)&pPage->aDisk[idx]; - pPage->nFree += pFBlk->iSize; - idx = pFBlk->iNext; - } - return SQLITE_OK; -} - -/* ** Set up a raw page so that it looks like a database page holding ** no entries. */ @@ -543,7 +528,7 @@ static void zeroPage(MemPage *pPage){ PageHdr *pHdr; FreeBlk *pFBlk; memset(pPage, 0, SQLITE_PAGE_SIZE); - pHdr = (PageHdr*)pPage; + pHdr = &pPage->u.hdr; pHdr->firstCell = 0; pHdr->firstFree = sizeof(*pHdr); pFBlk = (FreeBlk*)&pHdr[1]; @@ -757,14 +742,14 @@ int sqliteBtreeCursor(Btree *pBt, int iTable, BtCursor **ppCur){ if( rc!=SQLITE_OK ){ goto create_cursor_exception; } - pCur->pPrev = 0; + pCur->pBt = pBt; + pCur->idx = 0; pCur->pNext = pBt->pCursor; if( pCur->pNext ){ pCur->pNext->pPrev = pCur; } + pCur->pPrev = 0; pBt->pCursor = pCur; - pCur->pBt = pBt; - pCur->idx = 0; *ppCur = pCur; return SQLITE_OK; @@ -802,7 +787,7 @@ int sqliteBtreeCloseCursor(BtCursor *pCur){ ** Make a temporary cursor by filling in the fields of pTempCur. ** The temporary cursor is not on the cursor list for the Btree. */ -static void CreateTemporaryCursor(BtCursor *pCur, BtCursor *pTempCur){ +static void getTempCursor(BtCursor *pCur, BtCursor *pTempCur){ memcpy(pTempCur, pCur, sizeof(*pCur)); pTempCur->pNext = 0; pTempCur->pPrev = 0; @@ -813,7 +798,7 @@ static void CreateTemporaryCursor(BtCursor *pCur, BtCursor *pTempCur){ ** Delete a temporary cursor such as was made by the CreateTemporaryCursor() ** function above. */ -static void DestroyTemporaryCursor(BeCursor *pCur){ +static void releaseTempCursor(BtCursor *pCur){ sqlitepager_unref(pCur->pPage); } @@ -875,7 +860,7 @@ static int getPayload(BtCursor *pCur, int offset, int amt, char *zBuf){ if( rc!=0 ){ return rc; } - nextPage = pOvfl->next; + nextPage = pOvfl->iNext; if( offset<OVERFLOW_SIZE ){ int a = amt; if( a + offset > OVERFLOW_SIZE ){ @@ -1009,7 +994,7 @@ static int compareKey(BtCursor *pCur, char *pKey, int nKeyOrig, int *pResult){ if( rc ){ return rc; } - nextPage = pOvfl->next; + nextPage = pOvfl->iNext; n = nKey; if( n>OVERFLOW_SIZE ){ n = OVERFLOW_SIZE; @@ -1155,7 +1140,7 @@ int sqliteBtreeMoveto(BtCursor *pCur, void *pKey, int nKey, int *pRes){ } assert( lwr==upr+1 ); if( lwr>=pPage->nCell ){ - chldPg = ((PageHdr*)pPage)->rightChild; + chldPg = pPage->u.hdr.rightChild; }else{ chldPg = pPage->apCell[lwr]->h.leftChild; } @@ -1185,8 +1170,8 @@ int sqliteBtreeNext(BtCursor *pCur, int *pRes){ } pCur->idx++; if( pCur->idx>=pCur->pPage->nCell ){ - if( ((PageHdr*)pPage)->rightChild ){ - rc = moveToChild(pCur, ((PageHdr*)pPage)->rightChild); + if( pPage->u.hdr.rightChild ){ + rc = moveToChild(pCur, pPage->u.hdr.rightChild); if( rc ) return rc; rc = moveToLeftmost(pCur); if( rc ) return rc; @@ -1236,7 +1221,7 @@ static int allocatePage(Btree *pBt, MemPage **ppPage, Pgno *pPgno){ sqlitepager_unref(pOvfl); return rc; } - pPage1->freeList = pOvfl->next; + pPage1->freeList = pOvfl->iNext; *ppPage = (MemPage*)pOvfl; }else{ *pPgno = sqlitepager_pagecount(pBt->pPager); @@ -1279,7 +1264,7 @@ static int freePage(Btree *pBt, void *pPage, Pgno pgno){ if( needOvflUnref ) sqlitepager_unref(pOvfl); return rc; } - pOvfl->next = pPage1->freeList; + pOvfl->iNext = pPage1->freeList; pPage1->freeList = pgno; memset(pOvfl->aPayload, 0, OVERFLOW_SIZE); pPage->isInit = 0; @@ -1306,7 +1291,7 @@ static int clearCell(Btree *pBt, Cell *pCell){ while( ovfl ){ rc = sqlitepager_get(pPager, ovfl, &pOvfl); if( rc ) return rc; - nextOvfl = pOvfl->next; + nextOvfl = pOvfl->iNext; rc = freePage(pBt, pOvfl, ovfl); if( rc ) return rc; ovfl = nextOvfl; @@ -1354,7 +1339,7 @@ static int fillInCell( } spaceLeft = OVERFLOW_SIZE; pSpace = pOvfl->aPayload; - pNextPg = &pOvfl->next; + pNextPg = &pOvfl->iNext; } n = nPayload; if( n>spaceLeft ) n = spaceLeft; @@ -1403,17 +1388,115 @@ static void reparentChildPages(Pager *pPager, Page *pPage){ for(i=0; i<pPage->nCell; i++){ reparentPage(pPager, pPage->apCell[i]->leftChild, pPage); } - reparentPage(pPager, ((PageHdr*)pPage)->rightChild, pPage); + reparentPage(pPager, pPage->u.hdr.rightChild, pPage); +} + +/* +** Remove the i-th cell from pPage. This routine effects pPage only. +** The cell content is not freed or deallocated. It is assumed that +** the cell content has been copied someplace else. This routine just +** removes the reference to the cell from pPage. +** +** "sz" must be the number of bytes in the cell. +** +** Do not bother maintaining the integrity of the linked list of Cells. +** Only pPage->apCell[] is important. The relinkCellList() routine +** will be called soon after this routine in order to rebuild the +** linked list. +*/ +static void dropCell(MemPage *pPage, int i, int sz){ + int j; + assert( i>=0 && i<pPage->nCell ); + assert( sz==cellSize(pPage->apCell[i]); + freeSpace(pPage, idx, sz); + for(j=i, j<pPage->nCell-2; j++){ + pPage->apCell[j] = pPage->apCell[j+1]; + } + pPage->nCell--; +} + +/* +** Insert a new cell on pPage at cell index "i". pCell points to the +** content of the cell. +** +** If the cell content will fit on the page, then put it there. If it +** will not fit, then just make pPage->apCell[i] point to the content +** and set pPage->isOverfull. +** +** Do not bother maintaining the integrity of the linked list of Cells. +** Only pPage->apCell[] is important. The relinkCellList() routine +** will be called soon after this routine in order to rebuild the +** linked list. +*/ +static void insertCell(MemPage *pPage, int i, Cell *pCell, int sz){ + int idx, j; + assert( i>=0 && i<=pPage->nCell ); + assert( sz==cellSize(pCell) ); + for(j=pPage->nCell; j>i; j--){ + pPage->apCell[j] = pPage->apCell[j-1]; + } + pPage->nCell++; + idx = allocateSpace(pPage, sz); + if( idx<=0 ){ + pPage->isOverfull = 1; + pPage->apCell[i] = pCell; + }else{ + memcpy(&pPage->u.aDisk[idx], pCell, sz); + pPage->apCell[i] = (Cell*)&pPage->u.aDisk[idx]); + } +} + +/* +** Rebuild the linked list of cells on a page so that the cells +** occur in the order specified by pPage->apCell[]. Invoke this +** routine once to repair damage after one or more invocations +** of either insertCell() or dropCell(). +*/ +static void relinkCellList(MemPage *pPage){ + int i; + u16 *pIdx; + pIdx = &pPage->u.hdr.firstCell; + for(i=0; i<pPage->nCell; i++){ + int idx = ((uptr)pPage->apCell[i]) - (uptr)pPage; + *pIdx = idx; + pIdx = &pPage->apCell[i]->h.iNext; + } + *pIdx = 0; +} + +/* +** Make a copy of the contents of pFrom into pTo. The pFrom->apCell[] +** pointers that point intto pFrom->u.aDisk[] must be adjusted to point +** intto pTo->u.aDisk[] instead. But some pFrom->apCell[] entries might +** not point to pFrom->u.aDisk[]. Those are unchanged. +*/ +static void copyPage(MemPage *pTo, MemPage *pFrom){ + uptr from, to; + int i; + memcpy(pTo->u.aDisk, pFrom->u.aDisk, SQLITE_PAGE_SIZE); + pTo->pParent = pFrom->pParent; + pTo->isInit = 1; + pTo->nCell = pFrom->nCell; + pTo->nFree = pFrom->nFree; + pTo->isOverfull = pFrom->isOverfull; + to = (unsigned int)pTo; + from = (unsigned int)pFrom; + for(i=0; i<pTo->nCell; i++){ + uptr addr = (uptr)(pFrom->apCell[i]); + if( addr>from && addr<from+SQLITE_PAGE_SIZE ){ + *((uptr*)&pTo->apCell[i]) = addr + to - from; + } + } } /* ** This routine redistributes Cells on pPage and up to two siblings ** of pPage so that all pages have about the same amount of free space. -** Usually one siblings on either side of pPage are used in the repack, +** Usually one sibling on either side of pPage is used in the balancing, ** though both siblings might come from one side if pPage is the first ** or last child of its parent. If pPage has fewer than two siblings ** (something which can only happen if pPage is the root page or a -** child of root) then all siblings participate in the repack. +** child of root) then all available siblings participate in the balancing. ** ** The number of siblings of pPage might be increased or decreased by ** one in order to keep all pages between 2/3 and completely full. If @@ -1421,16 +1504,29 @@ static void reparentChildPages(Pager *pPager, Page *pPage){ ** or decreased by one, as necessary, to keep the root page from being ** overfull or empty. ** +** This routine calls relinkCellList() on its input page regardless of +** whether or not it does any real balancing. Client routines will typically +** invoke insertCell() or dropCell() before calling this routine, so we +** need to call relinkCellList() to clean up the mess that those other +** routines left behind. +** +** pCur is left pointing to the same cell as when this routine was called +** event if that cell gets moved to a different page. pCur may be NULL. +** ** Note that when this routine is called, some of the Cells on pPage -** might not actually be stored in pPage->aDisk[]. This can happen +** might not actually be stored in pPage->u.aDisk[]. This can happen ** if the page is overfull. Part of the job of this routine is to -** make sure all Cells for pPage once again fit in pPage->aDisk[]. +** make sure all Cells for pPage once again fit in pPage->u.aDisk[]. +** +** If this routine fails for any reason, it means the database may have +** been left in a corrupted state and should be rolled back. */ -static int repack(Btree *pBt, MemPage *pPage){ +static int balance(Btree *pBt, MemPage *pPage, BtCursor *pCur){ MemPage *pParent; /* The parent of pPage */ - MemPage *apOld[3]; /* pPage and up to two siblings before repack */ + MemPage *apOld[3]; /* pPage and up to two siblings */ Pgno pgnoOld[3]; /* Page numbers for each page in apOld[] */ - MemPage *apNew[4]; /* pPage and up to 3 siblings after repack */ + MemPage *apNew[4]; /* pPage and up to 3 siblings after balancing */ + Pgno pgnoNew[4]; /* Page numbers for each page in apNew[] */ int idxDiv[3]; /* Indices of divider cells in pParent */ Cell *apDiv[3]; /* Divider cells in pParent */ int nCell; /* Number of cells in apCell[] */ @@ -1438,35 +1534,46 @@ static int repack(Btree *pBt, MemPage *pPage){ int nNew; /* Number of pages in apNew[] */ int perPage; /* Approximate number of bytes per page */ int nDiv; /* Number of cells in apDiv[] */ - Cell *apCell[MX_CELL*3+5]; /* All cells from pages being repacked */ - int unrefPage = 0; /* If true, then unref pPage when done */ - - /* - ** Early out if no repacking is needed. + int i, j, k; /* Loop counters */ + int idx; /* Index of pPage in pParent->apCell[] */ + int nxDiv; /* Next divider slot in pParent->apCell[] */ + int rc; /* The return code */ + int iCur; /* apCell[iCur] is the cell of the cursor */ + int usedPerPage; /* Memory needed for each page */ + int freePerPage; /* Average free space per page */ + Cell *apCell[MX_CELL*3+5]; /* All cells from pages being balanceed */ + int szCell[MX_CELL*3+5]; /* Local size of all cells */ + Cell aTemp[2]; /* Temporary holding area for apDiv[] */ + MemPage aOld[3]; /* Temporary copies of pPage and its siblings */ + + /* + ** Return without doing any work if pPage is neither overfull nor + ** underfull. */ - if( pPage->nFree>=0 && pPage->nFree<SQLITE_PAGE_SIZE/2 ){ + if( !pPage->isOverfull && pPage->nFree<SQLITE_PAGE_SIZE/2 ){ + relinkCellList(pPage); return SQLITE_OK; } /* - ** Find the parent of the page to be repacked. - */ - pParent = pPage->pParent; - - /* - ** If there is no parent, it means the page is the root page. + ** Find the parent of the page to be balanceed. + ** If there is no parent, it means this page is the root page and ** special rules apply. */ + pParent = pPage->pParent; if( pParent==0 ){ Pgno pgnoChild; Page *pChild; if( pPage->nCell==0 ){ - if( ((PageHdr*)pPage)->rightChild ){ - /* The root page is under full. Copy the one child page + if( pPage->u.hdr.rightChild ){ + /* + ** The root page is empty. Copy the one child page ** into the root page and return. This reduces the depth ** of the BTree by one. */ - pgnoChild = ((PageHdr*)pPage->rightChild; + rc = sqlitepager_write(pPage); + if( rc ) return rc; + pgnoChild = pPage->u.hdr.rightChild; rc = sqlitepager_get(pBt, pgnoChild, &pChild); if( rc ) return rc; memcpy(pPage, pChild, SQLITE_PAGE_SIZE); @@ -1478,43 +1585,45 @@ static int repack(Btree *pBt, MemPage *pPage){ } return SQLITE_OK; } - if( pPage->nFree>=0 ){ + if( !pPage->isOverfull ){ /* It is OK for the root page to be less than half full. */ + relinkCellList(pPage); return SQLITE_OK; } - /* If we get to here, it means the root page is over full. + /* + ** If we get to here, it means the root page is overfull. ** When this happens, Create a new child page and copy the ** contents of the root into the child. Then make the root - ** page and empty page with rightChild pointing to the new + ** page an empty page with rightChild pointing to the new ** child. Then fall thru to the code below which will cause ** the overfull child page to be split. */ + rc = sqlitepager_write(pPage); + if( rc ) return rc; rc = allocatePage(pBt, &pChild, &pgnoChild); if( rc ) return rc; - memcpy(pChild, pPage, SQLITE_PAGE_SIZE); - for(i=0; i<pPage->nCell; i++){ - if( pPage->apCell[i]>(Cell*)pPage && pPage->apCell[i]<(Cell*)&pPage[1] ){ - int offset = (int)pPage->apCell[i] - (int)pPage; - pChild->apCell[i] = (Cell*)((int)pChild + offset); - }else{ - pChild->apCell[i] = pPage->apCell[i]; - } + copyPage(pChild, pPage); + pChild->pParent = pPage; + pChild->isOverfull = 1; + if( pCur ){ + sqlitepager_ref(pChild); + sqlitepager_unref(pCur->pPage); + pCur->pPage = pChild; } - pChild->isInit = 1; - pChild->nCell = pPage->nCell; - pChild->nFree = pPage->nFree; - /* reparentChildPages(pBt->pPager, pChild); */ zeroPage(pPage); - ((PageHdr*)pPage)->rightChild = pgnoChild; + pPage->u.hdr.rightChild = pgnoChild; pParent = pPage; pPage = pChild; - unrefPage = 1; + }else{ + rc = sqlitepager_write(pPage); + if( rc ) return rc; } - + /* - ** Find the Cell in the parent page that refers to the page - ** to be repacked. + ** Find the Cell in the parent page whose h.leftChild points back + ** to pPage. The "idx" variable is the index of that cell. If pPage + ** is the rightmost child of pParent then set idx to pParent->nCell */ idx = -1; pgno = sqlitepager_pagenumber(pPage); @@ -1524,453 +1633,197 @@ static int repack(Btree *pBt, MemPage *pPage){ break; } } - if( idx<0 && ((PageHdr*)pPage)->rightChild==pgno ){ + if( idx<0 && pPage->u.hdr.rightChild==pgno ){ idx = pPage->nCell; } if( idx<0 ){ - rc = SQLITE_CORRUPT; - goto end_of_repack; + return SQLITE_CORRUPT; } /* - ** Get sibling pages and their dividers + ** Initialize variables so that it will be safe to jump + ** directory to balance_cleanup at any moment. */ - if( idx==pPage->nCell ){ - idx -= 2; + nOld = nNew = 0; + sqlitepager_ref(pParent); + + /* + ** Find sibling pages to pPage and the Cells in pParent that divide + ** the siblings. An attempt is made to find one sibling on either + ** side of pPage. Both siblings are taken from one side, however, if + ** pPage is either the first or last child of its parent. If pParent + ** has 3 or fewer children then all children of pParent are taken. + */ + if( idx==pParent->nCell ){ + nxDiv = idx - 2; }else{ - idx--; + nxDiv = idx - 1; } - if( idx<0 ) idx = 0; + if( nxDiv<0 ) nxDiv = 0; nDiv = 0; - nOld = 0; - for(i=0; i<3; i++){ - if( i+idx<pParent->nCell ){ - idxDiv[i] = i+idx; - apDiv[i] = pParent->apCell[i+idx]; + for(i=0, k=nxDiv; i<3; i++, k++){ + if( k<pParent->nCell ){ + idxDiv[i] = k; + apDiv[i] = pParent->apCell[k]; nDiv++; pgnoOld[i] = apDiv[i]->h.leftChild; - rc = sqlitepager_get(pBt, pgnoOld[i], &apOld[i]); - if( rc ) goto end_of_repack; - nOld++; - } - if( i+idx==pParent->nCell ){ + }else if( k==pParent->nCell ){ pgnoOld[i] = pParent->rightChild; - rc = sqlitepager_get(pBt, pgnoOld[i], &apOld[i]); - if( rc ) goto end_of_repack; - nOld++; + }else{ + break; + } + rc = sqlitepager_get(pBt, pgnoOld[i], &apOld[i]); + if( rc ) goto balance_cleanup; + nOld++; + } + + /* + ** Set iCur to be the index in apCell[] of the cell that the cursor + ** is pointing to. We will need this later on in order to keep the + ** cursor pointing at the same cell. + */ + if( pCur ){ + iCur = pCur->idx; + for(i=0; idxDiv[i]<idx; i++){ + iCur += apOld[i]->nCell + 1; } + sqlitepager_unref(pCur->pPage); + pCur->pPage = 0; } /* - ** Get all cells + ** Make copies of the content of pPage and its siblings into aOld[]. + ** The rest of this function will use data from the copies rather + ** that the original pages since the original pages will be in the + ** process of being overwritten. + */ + for(i=0; i<nOld; i++){ + copyPage(&aOld[i], apOld[i]); + rc = freePage(pBt, apOld[i], pgnoOld[i]); + if( rc ) goto balance_cleanup; + apOld[i] = &aOld[i]; + } + + /* + ** Load pointers to all cells on sibling pages and the divider cells + ** into the local apCell[] array. Make copies of the divider cells + ** into aTemp[] and remove the the divider Cells from pParent. */ nCell = 0; for(i=0; i<nOld; i++){ MemPage *pOld = apOld[i]; for(j=0; j<pOld->nCell; j++){ - apCell[nCell++] = pOld->apCell[j]; + apCell[nCell] = pOld->apCell[j]; + szCell[nCell] = cellSize(apCell[nCell]); + nCell++; } if( i<nOld-1 ){ - apCell[nCell++] = apDiv[i]; + szCell[nCell] = cellSize(apDiv[i]); + memcpy(aTemp[i], apDiv[i], szCell[nCell]); + apCell[nCell] = &aTemp[i]; + dropCell(pParent, nxDiv, szCell[nCell]); + assert( apCell[nCell]->h.leftChild==pgnoOld[i] ); + apCell[nCell]->h.leftChild = pOld->u.hdr.rightChild; + nCell++; } } /* - ** Estimate the number of pages needed + ** Estimate the number of pages needed. Record this number in "k" + ** for now. It will get transferred to nNew as we allocate the + ** new pages. */ totalSize = 0; for(i=0; i<nCell; i++){ - totalSize += cellSize(apCell[i]); + totalSize += szCell[i]; } - nNew = (totalSize + (SQLITE_PAGE_SIZE - sizeof(PageHdr) - 1)) / + k = (totalSize + (SQLITE_PAGE_SIZE - sizeof(PageHdr) - 1)) / (SQLITE_PAGE_SIZE - sizeof(PageHdr)); - perPage = totalSize/nNew; + usedPerPage = (totalSize+k-1)/k; + freePerPage = SQLITE_PAGE_SIZE - usedPerPage; /* ** Allocate new pages */ - for(i=0; i<nNew; i++){ + for(i=0; i<k; i++){ rc = allocatePage(pBt, &apNew[i], &pgnoNew[i]); - if( rc ) goto end_of_repack; + if( rc ) goto balance_cleanup; + nNew++; zeroPage(apNew[i]); } /* - ** Copy data into the new pages + ** Evenly distribute the data in apCell[] across the new pages. + ** Insert divider cells into pParent as necessary. */ + j = 0; for(i=0; i<nNew; i++){ + MemPage *pNew = apNew[i]; + while( j<nCell && pNew->nFree<freePerPage && szCell[j]<=pNew->nFree ){ + if( pCur && iCur==j ){ pCur->pPage = pNew; pCur->idx = pNew->nCell; } + insertCell(pNew, pNew->nCell, apCell[j], szCell[j]); + j++; + } + assert( !pNew->isOverfull ); + relinkCellList(pNew); + if( i<nNew-1 && j<nCell ){ + pNew->u.hdr.rightChild = apCell[j]->h.leftChild; + apCell[j]->h.leftChild = pgnoNew[i]; + if( pCur && iCur==j ){ pCur->pPage = pParent; pCur->idx = nxDiv; } + insertCell(pParent, nxDiv, apCell[j], szCell[j]); + j++; + nxDiv++; + } } - - /* - ** Reparent children of all cells - */ - - /* - ** Release the old pages - */ - for(i=0; i<nOld; i++){ - releasePage(pBt, apOld[i], 0); - } - - /* - ** Repack the parent page, if necessary - */ - if( needToRepackParent ){ - return repack(pParent); - } - rc = SQLITE_OK; - -end_of_repack: - if( unrefPage ){ - sqlitepager_unref(pPage); - } - return rc; -} - -/* -** Attempt to move N or more bytes out of the page that the cursor -** points to into the left sibling page. (The left sibling page -** contains cells that are less than the cells on this page.) The -** entry that the cursor is pointing to cannot be moved. Return -** TRUE if successful and FALSE if not. -** -** Reasons for not being successful include: -** -** (1) there is no left sibling, -** (2) we could only move N-1 bytes or less, -** (3) some kind of file I/O error occurred -** -** Note that a partial rotation may have occurred even if this routine -** returns FALSE. Failure means we could not rotation a full N bytes. -** If it is possible to rotation some smaller number M, then the -** rotation occurs but we still return false. -** -** Example: Consider a segment of the Btree that looks like the -** figure below prior to rotation. The cursor is pointing to the -** entry *. The sort order of the entries is A B C D E * F Y. -** -** -** ------------------------- -** ... | C | Y | ... -** ------------------------- -** / \ -** --------- ----------------- -** | A | B | | D | E | * | F | -** --------- ----------------- -** -** After rotation of two cells (D and E), the same Btree segment -** looks like this: -** -** ------------------------- -** ... | E | Y | ... -** ------------------------- -** / \ -** ----------------- --------- -** | A | B | C | D | | * | F | -** ----------------- --------- -** -** The size of this rotation is the size by which the page containing -** the cursor was reduced. In this case, the size of D and E. -** -*/ -static int rotateLeft(BtCursor *pCur, int N){ - return 0; -} - -/* -** This routine is the same as rotateLeft() except that it move data -** to the right instead of to the left. See comments on the rotateLeft() -** routine for additional information. -*/ -static int rotateRight(BtCursor *pCur, int N){ - return 0; -} - -/* -** Append a cell onto the end of a page. -** -** The child page of the cell is reparented if pPager!=NULL. -*/ -static void appendCell( - Pager *pPager, /* The page cache. Needed for reparenting */ - Cell *pSrc, /* The Cell to be copied onto a new page */ - MemPage *pPage /* The page into which the cell is copied */ -){ - int pc; - int sz; - Cell *pDest; - - sz = cellSize(pSrc); - pc = allocateSpace(pPage, sz); - assert( pc>0 ){ - pDest = pPage->apCell[pPage->nCell] = &pPage->aDisk[pc]; - memcpy(pDest, pSrc, sz); - pDest->h.iNext = 0; - if( pPage->nCell>0 ){ - pPage->apCell[pPage->nCell-1]->h.iNext = pc; + apNew[nNew-1]->u.hdr.rightChild = apOld[nOld-1]->u.hdr.rightChild; + if( nxDiv==pParent->nCell ){ + pParent->u.hdr.rightChild = pgnoNew[nNew-1]; }else{ - ((PageHdr*)pPage)->firstCell = pc; + pParent->apCell[nxDiv]->h.leftChild = pgnoNew[nNew-1]; } - if( pPager && pDest->h.leftChild ){ - reparentPage(pPager, pDest->h.leftChild, pPage); + if( pCur ){ + assert( pCur->pPage!=0 ); + sqlitepager_ref(pCur->pPage); } -} - -/* -** Split a single database page into two roughly equal-sized pages. -** -** The input is an existing page and a new Cell. The Cell might contain -** a valid Cell.h.leftChild field pointing to a child page. -** -** The output is the Cell that divides the two new pages. The content -** of this divider Cell is written into *pCenter. pCenter->h.leftChild -** holds the page number of the new page that was created to hold the -** smaller of the cells from the divided page. The larger cells from -** the divided page are written to a newly allocated page and *ppOut -** is made to point to that page. Or if ppOut==NULL then the larger cells -** remain on pIn. -** -** Upon return, pCur should be pointing to the same cell, even if that -** cell has moved to a new page. The cell that pCur points to cannot -** be the pCenter cell. -*/ -static int split( - BtCursor *pCur, /* A cursor pointing at a cell on the page to be split */ - Cell *pNewCell, /* A new cell to add to pIn before dividing it up */ - Cell *pCenter, /* Write the cell that divides the two pages here */ - MemPage **ppOut /* If not NULL, put larger cells in new page at *ppOut */ -){ - MemPage *pLeft, *pRight; - Pgno pgnoLeft, pgnoRight; - PageHdr *pHdr; - int rc; - Pager *pPager = pCur->pBt->pPager; - MemPage tempPage; - /* Allocate pages to hold cells after the split and make pRight and - ** pLeft point to the newly allocated pages. + /* + ** Reparent children of all cells. */ - rc = allocatePage(pCur->pBt, &pLeft, &pgnoLeft); - if( rc ) return rc; - if( ppOut ){ - rc = allocatePage(pCur->pBt, &pRight, &pgnoRight); - if( rc ){ - freePage(pCur->pBt, pLeft, pgnoLeft); - return rc; - } - *ppOut = pRight; - }else{ - *ppOut = tempPage; + for(i=0; i<nNew; i++){ + reparentChildPages(pBt->pPager, apNew[i]); } + reparentChildPages(pBt->pPager, pParent); - /* Copy the smaller cells from the original page into the left page - ** of the split. - */ - zeroPage(pLeft); - if( pCur->idx==0 && pCur->match>0 ){ - appendCell(pPager, pNewCell, pLeft); - } - do{ - assert( i<pPage->nCell ); - appendCell(pPager, pPage->apCell[i++], pLeft); - if( pCur->idx==i && pCur->iMatch>0 ){ - appendCell(pPager, pNewCell, Left); - } - }while( pc < SQLITE_PAGE_SIZE/2 ); - - /* Copy the middle entry into *pCenter - */ - assert( i<pPage->nCell ); - memcpy(pCenter, pPage->aCell[i], cellSize(pPage->aCell[i])); - i++; - pHdr = (PageHdr*)pLeft; - pHdr->rightChild = pCenter->h.leftChild; - if( pHdr->rightChild ){ - reparentPage(pPager, pHdr->rightChild, pLeft); - } - pCenter->h.leftChild = pgnoLeft; - - /* Copy the larger cells from the original page into the right - ** page of the split + /* + ** balance the parent page. */ - zeroPage(pRight); - while( i<pPage->nCell ){ - appendCell(0, pPage->apCell[i++], pRight); - } + rc = balance(pBt, pParent, 0); - /* If ppOut==NULL then copy the temporary right page over top of - ** the original input page. + /* + ** Cleanup before returning. */ - if( ppOut==0 ){ - pRight->pParent = pPage->pParent; - pRight->isInit = 1; - memcpy(pPage, pRight, sizeof(*pPage)); - } - reparentChildPages(pPager, pPage); -} - -/* -** Unlink a cell from a database page. Add the space used by the cell -** back to the freelist for the database page on which the cell used to -** reside. -** -** This operation overwrites the cell header and content. -*/ -static void unlinkCell(BtCursor *pCur){ - MemPage *pPage; /* Page containing cell to be unlinked */ - int idx; /* The index of the cell to be unlinked */ - Cell *pCell; /* Pointer to the cell to be unlinked */ - u16 *piCell; /* iNext pointer from prior cell */ - int iCell; /* Index in pPage->aDisk[] of cell to be unlinked */ - int i; /* Loop counter */ - - pPage = pCur->pPage; - sqlitepager_write(pPage); - idx = pCur->idx; - pCell = pPage->apCell[idx]; - if( idx==0 ){ - piCell = &pPage->pHdr->firstCell; - }else{ - piCell = &pPage->apCell[idx-1]->h.iNext; +balance_cleanup: + for(i=0; i<nOld; i++){ + sqlitepager_unref(apOld[i]); } - iCell = *piCell; - *piCell = pCell->h.iNext; - freeSpace(pPage, iCell, cellSize(pCell)); - pPage->nCell--; - for(i=idx; i<pPage->nCell; i++){ - pPage->apCell[i] = pPage->apCell[i+1]; + for(i=0; i<nNew; i++){ + sqlitepager_unref(apNew[i]); } -} - -/* -** Add a Cell to a database page at the spot indicated by the cursor. -** -** With this routine, we know that the Cell pNewCell will fit into the -** database page that pCur points to. The calling routine has made -** sure it will fit. All this routine needs to do is add the Cell -** to the page. The addToPage() routine should be used for cases -** were it is not known if the new cell will fit. -** -** The new cell is added to the page either before or after the cell -** to which the cursor is pointing. The new cell is added before -** the cursor cell if pCur->iMatch>0 and the new cell is added after -** the cursor cell if pCur->iMatch<0. pCur->iMatch should have been set -** by a prior call to sqliteBtreeMoveto() where the key was the key -** of the cell being inserted. If sqliteBtreeMoveto() ended up on a -** cell that is larger than the key, then pCur->iMatch was set to a -** positive number, hence we insert the new record before the pointer -** if pCur->iMatch is positive. If sqliteBtreeMaveto() ended up on a -** cell that is smaller than the key then pCur->iMatch was set to a -** negative number, hence we insert the new record after then pointer -** if pCur->iMatch is negative. -*/ -static int insertCell(BtCursor *pCur, Cell *pNewCell){ - int sz; - int idx; - int i; - Cell *pCell, *pIdx; - MemPage *pPage; - - pPage = pCur->pPage; - sz = cellSize(pNewCell); - idx = allocateSpace(pPage, sz); - assert( idx>0 && idx<=SQLITE_PAGE_SIZE - sz ); - pCell = (Cell*)&pPage->aDisk[idx]; - memcpy(pCell, pNewCell, sz); - pIdx = pPage->aDisk[pCur->idx]; - if( pCur->iMatch<0 ){ - /* Insert the new cell after the cell pCur points to */ - pCell->h.iNext = pIdx->h.iNext; - pIdx->h.iNext = idx; - for(i=pPage->nCell-1; i>pCur->idx; i--){ - pPage->apCell[i+1] = pPage->apCell[i]; - } - pPage->apCell[pCur->idx+1] = pCell; + if( pCur && pCur->pPage==0 ){ + pCur->pPage = pParent; + pCur->idx = 0; }else{ - /* Insert the new cell before the cell pCur points to */ - pCell->h.iNext = pPage->pHdr->firstCell; - pPage->pHdr->firstCell = idx; - for(i=pPage->nCell; i>0; i++){ - pPage->apCell[i] = pPage->apCell[i-1]; - } - pPage->apCell[0] = pCell; - } - pPage->nCell++; - if( pCell->h.leftChild ){ - MemPage *pChild = sqlitepager_lookup(pCur->pBt, pCell->h.leftChild); - if( pChild && pChild->pParent ){ - sqlitepager_unref(pChild->pParent); - pChild->pParent = pPage; - sqlitepager_ref(pChild->pParent); - } - } - return SQLITE_OK; -} - -/* -** Insert pNewCell into the database page that pCur is pointing to at -** the place where pCur is pointing. -** -** This routine works just like insertCell() except that the cell -** to be inserted need not fit on the page. If the new cell does -** not fit, then the page sheds data to its siblings to try to get -** down to a size where the new cell will fit. If that effort fails, -** then the page is split. -*/ -static int addToPage(BtCursor *pCur, Cell *pNewCell){ - Cell tempCell; - Cell centerCell; - - for(;;){ - MemPage *pPage = pCur->pPage; - rc = sqlitepager_write(pPage); - if( rc ) return rc; - int sz = cellSize(pNewCell); - if( sz<=pPage->nFree ){ - insertCell(pCur, pNewCell); - return SQLITE_OK; - } - if( pPage->pParent==0 ){ - MemPage *pRight; - PageHdr *pHdr; - FreeBlk *pFBlk; - int pc; - rc = split(pCur, pNewCell, ¢erCell, &pRight); - if( rc ) return rc; - pHdr = pPage->pHdr; - pHdr->right = sqlitepager_pagenumber(pRight); - sqlitepager_unref(pRight); - pHdr->firstCell = pc = sizeof(*pHdr); - sz = cellSize(¢erCell); - memcpy(&pPage->aDisk[pc], ¢erCell, sz); - pc += sz; - pHdr->firstFree = pc; - pFBlk = (FreeBlk*)&pPage->aDisk[pc]; - pFBlk->iSize = SQLITE_PAGE_SIZE - pc; - pFBlk->iNext = 0; - memset(&pFBlk[1], 0, pFBlk->iSize-sizeof(*pFBlk)); - return SQLITE_OK; - } - if( rotateLeft(pCur, sz - pPage->nFree) - || rotateRight(pCur, sz - pPage->nFree) ){ - insertCell(pCur, pNewCell); - return SQLITE_OK; - } - rc = split(pCur, pNewCell, ¢erCell, 0); - if( rc ) return rc; - moveToParent(pCur); - tempCell = centerCell; - pNewPage = &tempCell; + sqlitepager_unref(pParent); } - /* NOT REACHED */ + return rc; } /* ** Insert a new record into the BTree. The key is given by (pKey,nKey) ** and the data is given by (pData,nData). The cursor is used only to ** define what database the record should be inserted into. The cursor -** is NOT left pointing at the new record. +** is left pointing at the new record. */ int sqliteBtreeInsert( BtCursor *pCur, /* Insert data into the table of this cursor */ @@ -1980,96 +1833,34 @@ int sqliteBtreeInsert( Cell newCell; int rc; int loc; + int szNew; MemPage *pPage; Btree *pBt = pCur->pBt; if( !pCur->pBt->inTrans ){ return SQLITE_ERROR; /* Must start a transaction first */ } - rc = sqliteBtreeMoveTo(pCur, pKey, nKey, &loc); + rc = sqliteBtreeMoveto(pCur, pKey, nKey, &loc); if( rc ) return rc; - rc = sqlitepager_write(pCur->pPage); + pPage = pCur->pPage; + rc = sqlitepager_write(pPage); if( rc ) return rc; rc = fillInCell(pBt, &newCell, pKey, nKey, pData, nData); if( rc ) return rc; + szNew = cellSize(&newCell); if( loc==0 ){ - newCell.h.leftChild = pCur->pPage->apCell[pCur->idx]->h.leftChild; - rc = clearCell(pBt, pCur->pPage->apCell[pCur->idx]); + newCell.h.leftChild = pPage->apCell[pCur->idx]->h.leftChild; + rc = clearCell(pBt, pPage->apCell[pCur->idx]); if( rc ) return rc; - unlinkCell(pCur); - } - return addToPage(pCur, &newCell); -} - -/* -** Check the page at which the cursor points to see if it is less than -** half full. If it is less than half full, then try to increase -** its fill factor by grabbing cells from siblings or by merging -** the page with siblings. -*/ -static int refillPage(BtCursor *pCur){ - MemPage *pPage; - BtCursor tempCur; - int rc; - Pager *pPager; - - pPage = pCur->pPage; - if( pPage->nFree < SQLITE_PAGE_SIZE/2 ){ - return SQLITE_OK; - } - rc = sqlitepager_write(pPage); - if( rc ) return rc; - pPager = pCur->pBt->pPager; - - if( pPage->nCell==0 ){ - /* The page being refilled is the root of the BTree and it has - ** no entries of its own. If there is a child page, then make the - ** child become the new root. - */ - MemPage *pChild; - Pgno pgnoChild; - assert( pPage->pParent==0 ); - assert( sqlitepager_pagenumber(pPage)==pCur->pgnoRoot ); - pgnoChild = ((PageHdr*)pPage)->rightChild; - if( pgnoChild==0 ){ - return SQLITE_OK; - } - rc = sqlitepager_get(pPager, pgno, &pChild); - if( rc ) return rc; - memcpy(pPage, pChild, SQLITE_PAGE_SIZE); - memset(&pPage->aDisk[SQLITE_PAGE_SIZE], 0, EXTRA_SIZE); - freePage(pCur->pBt, pChild, pgnoChild); - sqlitepager_unref(pChild); - rc = initPage(pPage, pCur->pgnoRoot, 0); - reparentChildPages(pPager, pPage); - return SQLITE_OK; + dropCell(pPage, pCur->idx, cellSize(pPage->apCell[pCur->idx])); + }else if( loc>0 ){ + assert( pPage->u.hdr.rightChild==0 ); /* Must be a leaf page */ + pCur->idx++; + }else{ + assert( pPage->u.hdr.rightChild==0 ); /* Must be a leaf page */ } - - /** merge with siblings **/ - - /** borrow from siblings **/ -} - -/* -** Replace the content of the cell that pCur is pointing to with the content -** in pNewContent. The pCur cell is not unlinked or moved in the Btree, -** its content is just replaced. -** -** If the size of pNewContent is greater than the current size of the -** cursor cell then the page that cursor points to might have to split. -*/ -static int ReplaceContentOfCell(BtCursor *pCur, Cell *pNewContent){ - Cell *pCell; /* The cell whose content will be changed */ - Pgno pgno; /* Temporary storage for a page number */ - - pCell = pCur->pPage->apCell[pCur->idx]; - rc = clearCell(pCur->pBt, pCell); - if( rc ) return rc; - pgno = pNewCell->h.leftChild; - pNewCell->h.leftChild = pCell->h.leftChild; - unlinkCell(pCur); - rc = addToPage(pCur, pNewCell); - pNewCell->h.leftChild = pgno; + insertCell(pPage, pCur->idx, &newCell, cellSize(&newCell)); + rc = balance(pCur->pBt, pPage, pCur); return rc; } @@ -2097,34 +1888,37 @@ int sqliteBtreeDelete(BtCursor *pCur){ rc = sqlitepager_write(pPage); if( rc ) return rc; pCell = pPage->apCell[pCur->idx]; - if( pPage->pHdr->rightChild ){ - /* The entry to be deleted is not on a leaf page. Non-leaf entries - ** cannot be deleted directly because they have to be present to - ** hold pointers to subpages. So what we do is look at the next - ** entry in sequence. The next entry is guaranteed to exist and - ** be a leaf. We copy the payload from the next entry into this - ** entry, then delete the next entry. + pgnoChild = pCell->h.leftChild; + clearCell(pCell); + dropCell(pPage, pCur->idx, cellSize(pCell)); + if( pgnoChild ){ + /* + ** If the entry we just deleted is not a leaf, then we've left a + ** whole in an internal page. We have to fill the whole by moving + ** in a page from a leaf. The next Cell after the one just deleted + ** is guaranteed to exist and to be a leaf so we can use it. */ - BtCursor origCur; - CreateTemporaryCursor(pCur, &origCur); - rc = sqliteBtreeNext(pCur, 0); - if( rc==SQLITE_OK ){ - pPage = pCur->pPage; - pCell = pPage->apCell[pCur->idx]; - rc = ReplaceContentOfCell(&origCur, pCell); + BtCursor leafCur; + Cell *pNext; + int szNext; + getTempCursor(pCur, &leafCur); + rc = sqliteBtreeNext(&leafCur, 0); + if( rc!=SQLITE_OK ){ + return SQLITE_CORRUPT; } - DestroyTemporaryCursor(&origCur); + pNext = leafCur.pPage->apCell[leafCur.idx] + szNext = cellSize(pNext); + insertCell(pPage, pCur->idx, pNext, szNext); + rc = balance(pCur->pBt, pPage, pCur); if( rc ) return rc; - } - rc = clearCell(pCell); - if( rc ) return rc; - unlinkCell(pCur->pBt, pCell); - if( pCur->idx == 0 ){ pCur->bSkipNext = 1; + dropCell(leafCur.pPage, leafCur.idx, szNext); + rc = balance(pCur->pBt, leafCur.pPage, 0); + releaseTempCur(&leafCur); }else{ - pCur->idx--; + rc = balance(pCur->pBt, pPage, pCur); + pCur->bSkipNext = 1; } - rc = refillPage(pCur); return rc; } @@ -2161,9 +1955,9 @@ static int clearDatabasePage(Btree *pBt, Pgno pgno){ rc = sqlitepager_get(pBt->pPager, pgno, &pPage); if( rc ) return rc; - idx = ((PageHdr*)pPage)->firstCell; + idx = pPage->u.hdr.firstCell; while( idx>0 ){ - pCell = (Cell*)&pPage->aDisk[idx]; + pCell = (Cell*)&pPage->u.aDisk[idx]; idx = pCell->h.iNext; if( pCell->h.leftChild ){ rc = clearDatabasePage(pBt, pCell->h.leftChild); |