aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordanielk1977 <danielk1977@noemail.net>2007-05-02 13:16:30 +0000
committerdanielk1977 <danielk1977@noemail.net>2007-05-02 13:16:30 +0000
commitd04417963fddd8b027e43d8d6b24f9498223606d (patch)
tree177a2d33432fe6e33c6f559d87f9d41056237dcf /src
parent8cff382e7db1fe07e5a2975a24687084b9bc8eb4 (diff)
downloadsqlite-d04417963fddd8b027e43d8d6b24f9498223606d.tar.gz
sqlite-d04417963fddd8b027e43d8d6b24f9498223606d.zip
Use the pointer-map pages to make the incremental blob API more efficient. (CVS 3896)
FossilOrigin-Name: 93a3bf71d576096f4b5a3db256ca6f9b5521d137
Diffstat (limited to 'src')
-rw-r--r--src/btree.c295
-rw-r--r--src/tclsqlite.c66
2 files changed, 256 insertions, 105 deletions
diff --git a/src/btree.c b/src/btree.c
index 2ad735805..484b52879 100644
--- a/src/btree.c
+++ b/src/btree.c
@@ -9,7 +9,7 @@
** May you share freely, never taking more than you give.
**
*************************************************************************
-** $Id: btree.c,v 1.364 2007/05/02 01:34:31 drh Exp $
+** $Id: btree.c,v 1.365 2007/05/02 13:16:30 danielk1977 Exp $
**
** This file implements a external (disk-based) database using BTrees.
** For a detailed discussion of BTrees, refer to
@@ -3025,6 +3025,91 @@ int sqlite3BtreeDataSize(BtCursor *pCur, u32 *pSize){
}
/*
+** Given the page number of an overflow page in the database (parameter
+** ovfl), this function finds the page number of the next page in the
+** linked list of overflow pages. If possible, it uses the auto-vacuum
+** pointer-map data instead of reading the content of page ovfl to do so.
+**
+** If an error occurs an SQLite error code is returned. Otherwise:
+**
+** Unless pPgnoNext is NULL, the page number of the next overflow
+** page in the linked list is written to *pPgnoNext. If page ovfl
+** is the last page in it's linked list, *pPgnoNext is set to zero.
+**
+** If ppPage is not NULL, *ppPage is set to the MemPage* handle
+** for page ovfl. The underlying pager page may have been requested
+** with the noContent flag set, so the page data accessable via
+** this handle may not be trusted.
+*/
+static int getOverflowPage(
+ BtShared *pBt,
+ Pgno ovfl, /* Overflow page */
+ MemPage **ppPage, /* OUT: MemPage handle */
+ Pgno *pPgnoNext /* OUT: Next overflow page number */
+){
+ Pgno next = 0;
+ int rc;
+
+ /* One of these must not be NULL. Otherwise, why call this function? */
+ assert(ppPage || pPgnoNext);
+
+ /* If pPgnoNext is NULL, then this function is being called to obtain
+ ** a MemPage* reference only. No page-data is required in this case.
+ */
+ if( !pPgnoNext ){
+ return getPage(pBt, ovfl, ppPage, 1);
+ }
+
+#ifndef SQLITE_OMIT_AUTOVACUUM
+ /* Try to find the next page in the overflow list using the
+ ** autovacuum pointer-map pages. Guess that the next page in
+ ** the overflow list is page number (ovfl+1). If that guess turns
+ ** out to be wrong, fall back to loading the data of page
+ ** number ovfl to determine the next page number.
+ */
+ if( pBt->autoVacuum ){
+ Pgno pgno;
+ Pgno iGuess = ovfl+1;
+ u8 eType;
+
+ while( PTRMAP_ISPAGE(pBt, iGuess) || iGuess==PENDING_BYTE_PAGE(pBt) ){
+ iGuess++;
+ }
+
+ if( iGuess<sqlite3PagerPagecount(pBt->pPager) ){
+ rc = ptrmapGet(pBt, iGuess, &eType, &pgno);
+ if( rc!=SQLITE_OK ){
+ return rc;
+ }
+ if( eType==PTRMAP_OVERFLOW2 && pgno==ovfl ){
+ next = iGuess;
+ }
+ }
+ }
+#endif
+
+ if( next==0 || ppPage ){
+ MemPage *pPage = 0;
+
+ rc = getPage(pBt, ovfl, &pPage, next!=0);
+ assert(rc==SQLITE_OK || pPage==0);
+ if( next==0 && rc==SQLITE_OK ){
+ next = get4byte(pPage->aData);
+ }
+
+ if( ppPage ){
+ *ppPage = pPage;
+ }else{
+ releasePage(pPage);
+ }
+ }
+ *pPgnoNext = next;
+
+ return rc;
+}
+
+
+/*
** Read payload information from the entry that the pCur cursor is
** pointing to. Begin reading the payload at "offset" and read
** a total of "amt" bytes. Put the result in zBuf.
@@ -3086,15 +3171,28 @@ static int getPayload(
if( amt>0 ){
nextPage = get4byte(&aPayload[pCur->info.nLocal]);
while( amt>0 && nextPage ){
- DbPage *pDbPage;
- rc = sqlite3PagerGet(pBt->pPager, nextPage, &pDbPage);
- if( rc!=0 ){
- return rc;
- }
- aPayload = sqlite3PagerGetData(pDbPage);
- nextPage = get4byte(aPayload);
- if( offset<ovflSize ){
+ if( offset>=ovflSize ){
+ /* The only reason to read this page is to obtain the page
+ ** number for the next page in the overflow chain. So try
+ ** the getOverflowPage() shortcut.
+ */
+ rc = getOverflowPage(pBt, nextPage, 0, &nextPage);
+ if( rc!=SQLITE_OK ){
+ return rc;
+ }
+ offset -= ovflSize;
+ }else{
+ /* Need to read this page properly, to obtain data to copy into
+ ** the caller's buffer.
+ */
+ DbPage *pDbPage;
int a = amt;
+ rc = sqlite3PagerGet(pBt->pPager, nextPage, &pDbPage);
+ if( rc!=0 ){
+ return rc;
+ }
+ aPayload = sqlite3PagerGetData(pDbPage);
+ nextPage = get4byte(aPayload);
if( a + offset > ovflSize ){
a = ovflSize - offset;
}
@@ -3102,10 +3200,8 @@ static int getPayload(
offset = 0;
amt -= a;
pBuf += a;
- }else{
- offset -= ovflSize;
+ sqlite3PagerUnref(pDbPage);
}
- sqlite3PagerUnref(pDbPage);
}
}
@@ -4047,60 +4143,6 @@ static int freePage(MemPage *pPage){
}
/*
-** Get a MemPage structure for overflow page number ovfl. If it
-** is not zero, the page number of the overflow page following the
-** one retrieved is written to *pPgnoNext.
-**
-** If it is possible to figure out the page-number of the next
-** overflow page without reading the data of page ovfl, then
-** sqlite3PagerAcquire() is passed the "noContent" flag when
-** page ovfl is retrieved.
-*/
-static int getOverflowPage(
- BtShared *pBt,
- Pgno ovfl,
- MemPage **ppPage,
- Pgno *pPgnoNext
-){
- Pgno next = 0;
- int rc;
-
- if( !pPgnoNext ){
- return getPage(pBt, ovfl, ppPage, 1);
- }
-
-#ifndef SQLITE_OMIT_AUTOVACUUM
- if( pBt->autoVacuum ){
- Pgno pgno;
- Pgno iGuess = ovfl+1;
- u8 eType;
-
- while( PTRMAP_ISPAGE(pBt, iGuess) || iGuess==PENDING_BYTE_PAGE(pBt) ){
- iGuess++;
- }
-
- if( iGuess<sqlite3PagerPagecount(pBt->pPager) ){
- rc = ptrmapGet(pBt, iGuess, &eType, &pgno);
- if( rc!=SQLITE_OK ){
- return rc;
- }
- if( eType==PTRMAP_OVERFLOW2 && pgno==ovfl ){
- next = iGuess;
- }
- }
- }
-#endif
-
- rc = getPage(pBt, ovfl, ppPage, 0);
- if( rc==SQLITE_OK ){
- assert(next==0 || next==get4byte((*ppPage)->aData));
- next = get4byte((*ppPage)->aData);
- }
- *pPgnoNext = next;
- return rc;
-}
-
-/*
** Free any overflow pages associated with the given Cell.
*/
static int clearCell(MemPage *pPage, unsigned char *pCell){
@@ -6825,49 +6867,108 @@ int sqlite3BtreeLockTable(Btree *p, int iTab, u8 isWriteLock){
** to change the length of the data stored.
*/
int sqlite3BtreePutData(BtCursor *pCsr, u32 offset, u32 amt, const void *z){
- /* TODO: The following is only a stop-gap implementation. It needs
- ** to be made efficient using the optimistic overflow page trick.
- ** Similar changes need to be made to sqlite3BtreeData().
- */
- i64 iKey;
+ BtShared *pBt = pCsr->pBtree->pBt;
int rc;
- int nCopy;
- u32 nData;
- char *zData;
+ u32 iRem = amt; /* Remaining bytes to write */
+ u8 *zRem = (u8 *)z; /* Pointer to data not yet written */
+ u32 iOffset = offset; /* Offset from traversal point to start of write */
- rc = sqlite3BtreeKeySize(pCsr, &iKey);
- if( rc!=SQLITE_OK ){
- return rc;
- }
+ Pgno iOvfl; /* Page number for next overflow page */
+ int ovflSize; /* Bytes of data per overflow page. */
- rc = sqlite3BtreeDataSize(pCsr, &nData);
- if( rc!=SQLITE_OK ){
- return rc;
- }
+ CellInfo *pInfo;
- zData = sqliteMalloc(nData);
- if( !zData ){
- return SQLITE_NOMEM;
+ /* Check some preconditions:
+ ** (a) a write-transaction is open,
+ ** (b) the cursor is open for writing,
+ ** (c) there is no read-lock on the table being modified and
+ ** (d) the cursor points at a valid row of an intKey table.
+ */
+ if( pBt->inTransaction!=TRANS_WRITE ){
+ /* Must start a transaction before doing an insert */
+ return pBt->readOnly ? SQLITE_READONLY : SQLITE_ERROR;
+ }
+ assert( !pBt->readOnly );
+ if( !pCsr->wrFlag ){
+ return SQLITE_PERM; /* Cursor not open for writing */
+ }
+ if( checkReadLocks(pCsr->pBtree, pCsr->pgnoRoot, pCsr) ){
+ return SQLITE_LOCKED; /* The table pCur points to has a read lock */
+ }
+ if( pCsr->eState==CURSOR_INVALID || !pCsr->pPage->intKey ){
+ return SQLITE_ERROR;
}
- rc = sqlite3BtreeData(pCsr, 0, nData, (void *)zData);
- if( rc!=SQLITE_OK ){
- sqliteFree(zData);
- return rc;
+ /* Parse the cell-info. Check that the cell-data area is large
+ ** enough for the proposed write operation.
+ */
+ getCellInfo(pCsr);
+ pInfo = &pCsr->info;
+ if( pInfo->nData<(offset+amt) ){
+ return SQLITE_ERROR;
}
- nCopy = amt;
- if( nCopy>(nData-offset) ){
- nCopy = nData-offset;
+ if( pInfo->nLocal>iOffset ){
+ /* In this case data must be written to the b-tree page. */
+ int iWrite = pInfo->nLocal - offset;
+ if( iWrite>iRem ){
+ iWrite = iRem;
+ }
+ rc = sqlite3PagerWrite(pCsr->pPage->pDbPage);
+ if( rc!=SQLITE_OK ){
+ return rc;
+ }
+ memcpy(&pInfo->pCell[iOffset+pInfo->nHeader], zRem, iWrite);
+
+ zRem += iWrite;
+ iRem -= iWrite;
}
- if( nCopy>0 ){
- memcpy(&zData[offset], z, amt);
- rc = sqlite3BtreeInsert(pCsr, 0, iKey, zData, nData, 0, 0);
+ iOffset = ((iOffset<pInfo->nLocal)?0:(iOffset-pInfo->nLocal));
+
+ ovflSize = pBt->usableSize - 4;
+ assert(pInfo->iOverflow>0 || iRem==0);
+ iOvfl = get4byte(&pInfo->pCell[pInfo->iOverflow]);
+ while( iRem>0 ){
+ if( iOffset>ovflSize ){
+ /* The only reason to read this page is to obtain the page
+ ** number for the next page in the overflow chain. So try
+ ** the getOverflowPage() shortcut. */
+ rc = getOverflowPage(pBt, iOvfl, 0, &iOvfl);
+ if( rc!=SQLITE_OK ){
+ return rc;
+ }
+ iOffset -= ovflSize;
+ }else{
+ int iWrite = ovflSize - iOffset;
+ DbPage *pOvfl; /* The overflow page. */
+ u8 *aData; /* Page data */
+
+ rc = sqlite3PagerGet(pBt->pPager, iOvfl, &pOvfl);
+ if( rc!=SQLITE_OK ){
+ return rc;
+ }
+ rc = sqlite3PagerWrite(pOvfl);
+ if( rc!=SQLITE_OK ){
+ sqlite3PagerUnref(pOvfl);
+ return rc;
+ }
+
+ aData = sqlite3PagerGetData(pOvfl);
+ iOvfl = get4byte(aData);
+ if( iWrite>iRem ){
+ iWrite = iRem;
+ }
+ memcpy(&aData[iOffset+4], zRem, iWrite);
+ sqlite3PagerUnref(pOvfl);
+
+ zRem += iWrite;
+ iRem -= iWrite;
+ iOffset = ((iOffset<ovflSize)?0:(iOffset-ovflSize));
+ }
}
- sqliteFree(zData);
- return rc;
+ return SQLITE_OK;
}
#endif
diff --git a/src/tclsqlite.c b/src/tclsqlite.c
index 936406183..70481a75a 100644
--- a/src/tclsqlite.c
+++ b/src/tclsqlite.c
@@ -12,7 +12,7 @@
** A TCL Interface to SQLite. Append this file to sqlite3.c and
** compile the whole thing to build a TCL-enabled version of SQLite.
**
-** $Id: tclsqlite.c,v 1.180 2007/05/01 17:49:49 danielk1977 Exp $
+** $Id: tclsqlite.c,v 1.181 2007/05/02 13:16:31 danielk1977 Exp $
*/
#include "tcl.h"
#include <errno.h>
@@ -89,6 +89,8 @@ struct SqlPreparedStmt {
char zSql[1]; /* Text of the SQL statement */
};
+typedef struct IncrblobChannel IncrblobChannel;
+
/*
** There is one instance of this structure for each SQLite database
** that has been opened by the SQLite TCL interface.
@@ -114,20 +116,56 @@ struct SqliteDb {
SqlPreparedStmt *stmtLast; /* Last statement in the list */
int maxStmt; /* The next maximum number of stmtList */
int nStmt; /* Number of statements in stmtList */
+ IncrblobChannel *pIncrblob;/* Linked list of open incrblob channels */
};
-typedef struct IncrblobChannel IncrblobChannel;
struct IncrblobChannel {
- sqlite3_blob *pBlob;
- int iSeek; /* Current seek offset */
+ SqliteDb *pDb; /* Associated database connection */
+ sqlite3_blob *pBlob; /* sqlite3 blob handle */
+ int iSeek; /* Current seek offset */
+
+ Tcl_Channel channel; /* Channel identifier */
+ IncrblobChannel *pNext; /* Linked list of all open incrblob channels */
+ IncrblobChannel *pPrev; /* Linked list of all open incrblob channels */
};
/*
+** Close all incrblob channels opened using database connection pDb.
+** This is called when shutting down the database connection.
+*/
+static void closeIncrblobChannels(SqliteDb *pDb){
+ IncrblobChannel *p;
+ IncrblobChannel *pNext;
+
+ for(p=pDb->pIncrblob; p; p=pNext){
+ pNext = p->pNext;
+
+ /* Note: Calling unregister here call Tcl_Close on the incrblob channel,
+ ** which deletes the IncrblobChannel structure at *p. So do not
+ ** call Tcl_Free() here.
+ */
+ Tcl_UnregisterChannel(pDb->interp, p->channel);
+ }
+}
+
+/*
** Close an incremental blob channel.
*/
static int incrblobClose(ClientData instanceData, Tcl_Interp *interp){
IncrblobChannel *p = (IncrblobChannel *)instanceData;
sqlite3_blob_close(p->pBlob);
+
+ /* Remove the channel from the SqliteDb.pIncrblob list. */
+ if( p->pNext ){
+ p->pNext->pPrev = p->pPrev;
+ }
+ if( p->pPrev ){
+ p->pPrev->pNext = p->pNext;
+ }
+ if( p->pDb->pIncrblob==p ){
+ p->pDb->pIncrblob = p->pNext;
+ }
+
Tcl_Free((char *)p);
return TCL_OK;
}
@@ -164,6 +202,9 @@ static int incrblobInput(
return nRead;
}
+/*
+** Write data to an incremental blob channel.
+*/
static int incrblobOutput(
ClientData instanceData,
CONST char *buf,
@@ -263,7 +304,6 @@ static int createIncrblobChannel(
IncrblobChannel *p;
sqlite3_blob *pBlob;
int rc;
- Tcl_Channel channel;
int flags = TCL_READABLE|TCL_WRITABLE;
/* This variable is used to name the channels: "incrblob_[incr count]" */
@@ -281,10 +321,19 @@ static int createIncrblobChannel(
p->pBlob = pBlob;
sprintf(zChannel, "incrblob_%d", ++count);
- channel = Tcl_CreateChannel(&IncrblobChannelType, zChannel, p, flags);
- Tcl_RegisterChannel(interp, channel);
+ p->channel = Tcl_CreateChannel(&IncrblobChannelType, zChannel, p, flags);
+ Tcl_RegisterChannel(interp, p->channel);
+
+ /* Link the new channel into the SqliteDb.pIncrblob list. */
+ p->pNext = pDb->pIncrblob;
+ p->pPrev = 0;
+ if( p->pNext ){
+ p->pNext->pPrev = p;
+ }
+ pDb->pIncrblob = p;
+ p->pDb = pDb;
- Tcl_SetResult(interp, (char *)Tcl_GetChannelName(channel), TCL_VOLATILE);
+ Tcl_SetResult(interp, (char *)Tcl_GetChannelName(p->channel), TCL_VOLATILE);
return TCL_OK;
}
@@ -363,6 +412,7 @@ static void flushStmtCache( SqliteDb *pDb ){
static void DbDeleteCmd(void *db){
SqliteDb *pDb = (SqliteDb*)db;
flushStmtCache(pDb);
+ closeIncrblobChannels(pDb);
sqlite3_close(pDb->db);
while( pDb->pFunc ){
SqlFunc *pFunc = pDb->pFunc;