aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/btree.c9
-rw-r--r--src/build.c4
-rw-r--r--src/global.c1
-rw-r--r--src/main.c17
-rw-r--r--src/mutex.c2
-rw-r--r--src/mutex_noop.c2
-rw-r--r--src/mutex_unix.c8
-rw-r--r--src/mutex_w32.c10
-rw-r--r--src/os_win.c11
-rw-r--r--src/select.c146
-rw-r--r--src/shell.c3
-rw-r--r--src/sqlite.h.in23
-rw-r--r--src/sqliteInt.h40
-rw-r--r--src/test1.c164
-rw-r--r--src/test_config.c4
-rw-r--r--src/test_malloc.c27
-rw-r--r--src/threads.c235
-rw-r--r--src/vdbe.c33
-rw-r--r--src/vdbeInt.h6
-rw-r--r--src/vdbeaux.c61
-rw-r--r--src/vdbesort.c2366
21 files changed, 2554 insertions, 618 deletions
diff --git a/src/btree.c b/src/btree.c
index 9032a9304..a5531f59e 100644
--- a/src/btree.c
+++ b/src/btree.c
@@ -4603,7 +4603,7 @@ int sqlite3BtreeMovetoUnpacked(
if( pIdxKey ){
xRecordCompare = sqlite3VdbeFindCompare(pIdxKey);
- pIdxKey->isCorrupt = 0;
+ pIdxKey->errCode = 0;
assert( pIdxKey->default_rc==1
|| pIdxKey->default_rc==0
|| pIdxKey->default_rc==-1
@@ -4727,7 +4727,10 @@ int sqlite3BtreeMovetoUnpacked(
c = xRecordCompare(nCell, pCellKey, pIdxKey, 0);
sqlite3_free(pCellKey);
}
- assert( pIdxKey->isCorrupt==0 || c==0 );
+ assert(
+ (pIdxKey->errCode!=SQLITE_CORRUPT || c==0)
+ && (pIdxKey->errCode!=SQLITE_NOMEM || pCur->pBtree->db->mallocFailed)
+ );
if( c<0 ){
lwr = idx+1;
}else if( c>0 ){
@@ -4737,7 +4740,7 @@ int sqlite3BtreeMovetoUnpacked(
*pRes = 0;
rc = SQLITE_OK;
pCur->aiIdx[pCur->iPage] = (u16)idx;
- if( pIdxKey->isCorrupt ) rc = SQLITE_CORRUPT;
+ if( pIdxKey->errCode ) rc = SQLITE_CORRUPT;
goto moveto_finish;
}
if( lwr>upr ) break;
diff --git a/src/build.c b/src/build.c
index 27c9671da..3113c35c8 100644
--- a/src/build.c
+++ b/src/build.c
@@ -2686,7 +2686,7 @@ static void sqlite3RefillIndex(Parse *pParse, Index *pIndex, int memRootPage){
/* Open the sorter cursor if we are to use one. */
iSorter = pParse->nTab++;
- sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0, 0, (char*)
+ sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0, pIndex->nKeyCol, (char*)
sqlite3KeyInfoRef(pKey), P4_KEYINFO);
/* Open the table. Loop through all rows of the table, inserting index
@@ -3035,7 +3035,7 @@ Index *sqlite3CreateIndex(
pParse->checkSchema = 1;
goto exit_create_index;
}
- assert( pTab->nCol<=0x7fff && j<=0x7fff );
+ assert( j<=0x7fff );
pIndex->aiColumn[i] = (i16)j;
if( pListItem->pExpr ){
int nColl;
diff --git a/src/global.c b/src/global.c
index 2c14b58ab..f152c3b5d 100644
--- a/src/global.c
+++ b/src/global.c
@@ -167,6 +167,7 @@ SQLITE_WSD struct Sqlite3Config sqlite3Config = {
0, /* nPage */
0, /* mxParserStack */
0, /* sharedCacheEnabled */
+ SQLITE_DEFAULT_WORKER_THREADS, /* nWorker */
/* All the rest should always be initialized to zero */
0, /* isInit */
0, /* inProgress */
diff --git a/src/main.c b/src/main.c
index 904e0a4fc..639491778 100644
--- a/src/main.c
+++ b/src/main.c
@@ -515,6 +515,15 @@ int sqlite3_config(int op, ...){
}
#endif
+ case SQLITE_CONFIG_WORKER_THREADS: {
+#if SQLITE_MAX_WORKER_THREADS>0
+ int n = va_arg(ap, int);
+ if( n>SQLITE_MAX_WORKER_THREADS ) n = SQLITE_MAX_WORKER_THREADS;
+ if( n>=0 ) sqlite3GlobalConfig.nWorker = n;
+#endif
+ break;
+ }
+
default: {
rc = SQLITE_ERROR;
break;
@@ -2497,6 +2506,7 @@ static int openDatabase(
db->nextAutovac = -1;
db->szMmap = sqlite3GlobalConfig.szMmap;
db->nextPagesize = 0;
+ db->nMaxSorterMmap = 0x7FFFFFFF;
db->flags |= SQLITE_ShortColNames | SQLITE_EnableTrigger | SQLITE_CacheSpill
#if !defined(SQLITE_DEFAULT_AUTOMATIC_INDEX) || SQLITE_DEFAULT_AUTOMATIC_INDEX
| SQLITE_AutoIndex
@@ -3361,6 +3371,13 @@ int sqlite3_test_control(int op, ...){
break;
}
+ /* sqlite3_test_control(SQLITE_TESTCTRL_SORTER_MMAP, db, nMax); */
+ case SQLITE_TESTCTRL_SORTER_MMAP: {
+ sqlite3 *db = va_arg(ap, sqlite3*);
+ db->nMaxSorterMmap = va_arg(ap, int);
+ break;
+ }
+
}
va_end(ap);
#endif /* SQLITE_OMIT_BUILTIN_TEST */
diff --git a/src/mutex.c b/src/mutex.c
index b567e7c27..bad5a7c11 100644
--- a/src/mutex.c
+++ b/src/mutex.c
@@ -81,7 +81,7 @@ int sqlite3MutexEnd(void){
*/
sqlite3_mutex *sqlite3_mutex_alloc(int id){
#ifndef SQLITE_OMIT_AUTOINIT
- if( sqlite3_initialize() ) return 0;
+ if( id<=SQLITE_MUTEX_RECURSIVE && sqlite3_initialize() ) return 0;
#endif
return sqlite3GlobalConfig.mutex.xMutexAlloc(id);
}
diff --git a/src/mutex_noop.c b/src/mutex_noop.c
index 456e82a25..1a900c225 100644
--- a/src/mutex_noop.c
+++ b/src/mutex_noop.c
@@ -107,7 +107,7 @@ static int debugMutexEnd(void){ return SQLITE_OK; }
** that means that a mutex could not be allocated.
*/
static sqlite3_mutex *debugMutexAlloc(int id){
- static sqlite3_debug_mutex aStatic[6];
+ static sqlite3_debug_mutex aStatic[SQLITE_MUTEX_STATIC_APP3 - 1];
sqlite3_debug_mutex *pNew = 0;
switch( id ){
case SQLITE_MUTEX_FAST:
diff --git a/src/mutex_unix.c b/src/mutex_unix.c
index eca729583..c8663144e 100644
--- a/src/mutex_unix.c
+++ b/src/mutex_unix.c
@@ -96,10 +96,13 @@ static int pthreadMutexEnd(void){ return SQLITE_OK; }
** <li> SQLITE_MUTEX_RECURSIVE
** <li> SQLITE_MUTEX_STATIC_MASTER
** <li> SQLITE_MUTEX_STATIC_MEM
-** <li> SQLITE_MUTEX_STATIC_MEM2
+** <li> SQLITE_MUTEX_STATIC_OPEN
** <li> SQLITE_MUTEX_STATIC_PRNG
** <li> SQLITE_MUTEX_STATIC_LRU
** <li> SQLITE_MUTEX_STATIC_PMEM
+** <li> SQLITE_MUTEX_STATIC_APP1
+** <li> SQLITE_MUTEX_STATIC_APP2
+** <li> SQLITE_MUTEX_STATIC_APP3
** </ul>
**
** The first two constants cause sqlite3_mutex_alloc() to create
@@ -133,6 +136,9 @@ static sqlite3_mutex *pthreadMutexAlloc(int iType){
SQLITE3_MUTEX_INITIALIZER,
SQLITE3_MUTEX_INITIALIZER,
SQLITE3_MUTEX_INITIALIZER,
+ SQLITE3_MUTEX_INITIALIZER,
+ SQLITE3_MUTEX_INITIALIZER,
+ SQLITE3_MUTEX_INITIALIZER,
SQLITE3_MUTEX_INITIALIZER
};
sqlite3_mutex *p;
diff --git a/src/mutex_w32.c b/src/mutex_w32.c
index f43a15238..1f5775cce 100644
--- a/src/mutex_w32.c
+++ b/src/mutex_w32.c
@@ -100,7 +100,10 @@ static int winMutexNotheld(sqlite3_mutex *p){
/*
** Initialize and deinitialize the mutex subsystem.
*/
-static sqlite3_mutex winMutex_staticMutexes[6] = {
+static sqlite3_mutex winMutex_staticMutexes[] = {
+ SQLITE3_MUTEX_INITIALIZER,
+ SQLITE3_MUTEX_INITIALIZER,
+ SQLITE3_MUTEX_INITIALIZER,
SQLITE3_MUTEX_INITIALIZER,
SQLITE3_MUTEX_INITIALIZER,
SQLITE3_MUTEX_INITIALIZER,
@@ -166,10 +169,13 @@ static int winMutexEnd(void){
** <li> SQLITE_MUTEX_RECURSIVE
** <li> SQLITE_MUTEX_STATIC_MASTER
** <li> SQLITE_MUTEX_STATIC_MEM
-** <li> SQLITE_MUTEX_STATIC_MEM2
+** <li> SQLITE_MUTEX_STATIC_OPEN
** <li> SQLITE_MUTEX_STATIC_PRNG
** <li> SQLITE_MUTEX_STATIC_LRU
** <li> SQLITE_MUTEX_STATIC_PMEM
+** <li> SQLITE_MUTEX_STATIC_APP1
+** <li> SQLITE_MUTEX_STATIC_APP2
+** <li> SQLITE_MUTEX_STATIC_APP3
** </ul>
**
** The first two constants cause sqlite3_mutex_alloc() to create
diff --git a/src/os_win.c b/src/os_win.c
index 015b6c61d..5c246a858 100644
--- a/src/os_win.c
+++ b/src/os_win.c
@@ -948,11 +948,7 @@ static struct win_syscall {
#define osWaitForSingleObject ((DWORD(WINAPI*)(HANDLE, \
DWORD))aSyscall[63].pCurrent)
-#if SQLITE_OS_WINRT
{ "WaitForSingleObjectEx", (SYSCALL)WaitForSingleObjectEx, 0 },
-#else
- { "WaitForSingleObjectEx", (SYSCALL)0, 0 },
-#endif
#define osWaitForSingleObjectEx ((DWORD(WINAPI*)(HANDLE,DWORD, \
BOOL))aSyscall[64].pCurrent)
@@ -1279,6 +1275,13 @@ void sqlite3_win32_sleep(DWORD milliseconds){
#endif
}
+DWORD sqlite3Win32Wait(HANDLE hObject){
+ DWORD rc;
+ while( (rc = osWaitForSingleObjectEx(hObject, INFINITE,
+ TRUE))==WAIT_IO_COMPLETION ){}
+ return rc;
+}
+
/*
** Return true (non-zero) if we are running under WinNT, Win2K, WinXP,
** or WinCE. Return false (zero) for Win95, Win98, or WinME.
diff --git a/src/select.c b/src/select.c
index 6ceb3fe94..74064db10 100644
--- a/src/select.c
+++ b/src/select.c
@@ -455,28 +455,43 @@ static KeyInfo *keyInfoFromExprList(
);
/*
-** Insert code into "v" that will push the record in register regData
-** into the sorter.
+** Generate code that will push the record in registers regData
+** through regData+nData-1 onto the sorter.
*/
static void pushOntoSorter(
Parse *pParse, /* Parser context */
SortCtx *pSort, /* Information about the ORDER BY clause */
Select *pSelect, /* The whole SELECT statement */
- int regData /* Register holding data to be sorted */
+ int regData, /* First register holding data to be sorted */
+ int nData, /* Number of elements in the data array */
+ int nPrefixReg /* No. of reg prior to regData available for use */
){
- Vdbe *v = pParse->pVdbe;
- int nExpr = pSort->pOrderBy->nExpr;
- int regRecord = ++pParse->nMem;
- int regBase = pParse->nMem+1;
- int nOBSat = pSort->nOBSat;
- int op;
+ Vdbe *v = pParse->pVdbe; /* Stmt under construction */
+ int bSeq = ((pSort->sortFlags & SORTFLAG_UseSorter)==0);
+ int nExpr = pSort->pOrderBy->nExpr; /* No. of ORDER BY terms */
+ int nBase = nExpr + bSeq + nData; /* Fields in sorter record */
+ int regBase; /* Regs for sorter record */
+ int regRecord = ++pParse->nMem; /* Assembled sorter record */
+ int nOBSat = pSort->nOBSat; /* ORDER BY terms to skip */
+ int op; /* Opcode to add sorter record to sorter */
+
+ assert( bSeq==0 || bSeq==1 );
+ if( nPrefixReg ){
+ assert( nPrefixReg==nExpr+bSeq );
+ regBase = regData - nExpr - bSeq;
+ }else{
+ regBase = pParse->nMem + 1;
+ pParse->nMem += nBase;
+ }
+ sqlite3ExprCodeExprList(pParse, pSort->pOrderBy, regBase, SQLITE_ECEL_DUP);
+ if( bSeq ){
+ sqlite3VdbeAddOp2(v, OP_Sequence, pSort->iECursor, regBase+nExpr);
+ }
+ if( nPrefixReg==0 ){
+ sqlite3VdbeAddOp3(v, OP_Move, regData, regBase+nExpr+bSeq, nData);
+ }
- pParse->nMem += nExpr+2; /* nExpr+2 registers allocated at regBase */
- sqlite3ExprCacheClear(pParse);
- sqlite3ExprCodeExprList(pParse, pSort->pOrderBy, regBase, 0);
- sqlite3VdbeAddOp2(v, OP_Sequence, pSort->iECursor, regBase+nExpr);
- sqlite3ExprCodeMove(pParse, regData, regBase+nExpr+1, 1);
- sqlite3VdbeAddOp3(v, OP_MakeRecord, regBase+nOBSat, nExpr+2-nOBSat,regRecord);
+ sqlite3VdbeAddOp3(v, OP_MakeRecord, regBase+nOBSat, nBase-nOBSat, regRecord);
if( nOBSat>0 ){
int regPrevKey; /* The first nOBSat columns of the previous row */
int addrFirst; /* Address of the OP_IfNot opcode */
@@ -487,12 +502,17 @@ static void pushOntoSorter(
regPrevKey = pParse->nMem+1;
pParse->nMem += pSort->nOBSat;
- nKey = nExpr - pSort->nOBSat + 1;
- addrFirst = sqlite3VdbeAddOp1(v, OP_IfNot, regBase+nExpr); VdbeCoverage(v);
+ nKey = nExpr - pSort->nOBSat + bSeq;
+ if( bSeq ){
+ addrFirst = sqlite3VdbeAddOp1(v, OP_IfNot, regBase+nExpr);
+ }else{
+ addrFirst = sqlite3VdbeAddOp1(v, OP_SequenceTest, pSort->iECursor);
+ }
+ VdbeCoverage(v);
sqlite3VdbeAddOp3(v, OP_Compare, regPrevKey, regBase, pSort->nOBSat);
pOp = sqlite3VdbeGetOp(v, pSort->addrSortIndex);
if( pParse->db->mallocFailed ) return;
- pOp->p2 = nKey + 1;
+ pOp->p2 = nKey + nData;
pKI = pOp->p4.pKeyInfo;
memset(pKI->aSortOrder, 0, pKI->nField); /* Makes OP_Jump below testable */
sqlite3VdbeChangeP4(v, -1, (char*)pKI, P4_KEYINFO);
@@ -627,6 +647,7 @@ static void selectInnerLoop(
int eDest = pDest->eDest; /* How to dispose of results */
int iParm = pDest->iSDParm; /* First argument to disposal method */
int nResultCol; /* Number of result columns */
+ int nPrefixReg = 0; /* Number of extra registers before regResult */
assert( v );
assert( pEList!=0 );
@@ -642,6 +663,11 @@ static void selectInnerLoop(
nResultCol = pEList->nExpr;
if( pDest->iSdst==0 ){
+ if( pSort ){
+ nPrefixReg = pSort->pOrderBy->nExpr;
+ if( !(pSort->sortFlags & SORTFLAG_UseSorter) ) nPrefixReg++;
+ pParse->nMem += nPrefixReg;
+ }
pDest->iSdst = pParse->nMem+1;
pParse->nMem += nResultCol;
}else if( pDest->iSdst+nResultCol > pParse->nMem ){
@@ -758,10 +784,10 @@ static void selectInnerLoop(
case SRT_DistFifo:
case SRT_Table:
case SRT_EphemTab: {
- int r1 = sqlite3GetTempReg(pParse);
+ int r1 = sqlite3GetTempRange(pParse, nPrefixReg+1);
testcase( eDest==SRT_Table );
testcase( eDest==SRT_EphemTab );
- sqlite3VdbeAddOp3(v, OP_MakeRecord, regResult, nResultCol, r1);
+ sqlite3VdbeAddOp3(v, OP_MakeRecord, regResult, nResultCol, r1+nPrefixReg);
#ifndef SQLITE_OMIT_CTE
if( eDest==SRT_DistFifo ){
/* If the destination is DistFifo, then cursor (iParm+1) is open
@@ -776,7 +802,7 @@ static void selectInnerLoop(
}
#endif
if( pSort ){
- pushOntoSorter(pParse, pSort, p, r1);
+ pushOntoSorter(pParse, pSort, p, r1+nPrefixReg, 1, nPrefixReg);
}else{
int r2 = sqlite3GetTempReg(pParse);
sqlite3VdbeAddOp2(v, OP_NewRowid, iParm, r2);
@@ -784,7 +810,7 @@ static void selectInnerLoop(
sqlite3VdbeChangeP5(v, OPFLAG_APPEND);
sqlite3ReleaseTempReg(pParse, r2);
}
- sqlite3ReleaseTempReg(pParse, r1);
+ sqlite3ReleaseTempRange(pParse, r1, nPrefixReg+1);
break;
}
@@ -802,7 +828,7 @@ static void selectInnerLoop(
** ORDER BY in this case since the order of entries in the set
** does not matter. But there might be a LIMIT clause, in which
** case the order does matter */
- pushOntoSorter(pParse, pSort, p, regResult);
+ pushOntoSorter(pParse, pSort, p, regResult, 1, nPrefixReg);
}else{
int r1 = sqlite3GetTempReg(pParse);
sqlite3VdbeAddOp4(v, OP_MakeRecord, regResult,1,r1, &pDest->affSdst, 1);
@@ -828,7 +854,7 @@ static void selectInnerLoop(
case SRT_Mem: {
assert( nResultCol==1 );
if( pSort ){
- pushOntoSorter(pParse, pSort, p, regResult);
+ pushOntoSorter(pParse, pSort, p, regResult, 1, nPrefixReg);
}else{
sqlite3ExprCodeMove(pParse, regResult, iParm, 1);
/* The LIMIT clause will jump out of the loop for us */
@@ -842,10 +868,7 @@ static void selectInnerLoop(
testcase( eDest==SRT_Coroutine );
testcase( eDest==SRT_Output );
if( pSort ){
- int r1 = sqlite3GetTempReg(pParse);
- sqlite3VdbeAddOp3(v, OP_MakeRecord, regResult, nResultCol, r1);
- pushOntoSorter(pParse, pSort, p, r1);
- sqlite3ReleaseTempReg(pParse, r1);
+ pushOntoSorter(pParse, pSort, p, regResult, nResultCol, nPrefixReg);
}else if( eDest==SRT_Coroutine ){
sqlite3VdbeAddOp1(v, OP_Yield, pDest->iSDParm);
}else{
@@ -1125,46 +1148,62 @@ static void generateSortTail(
int addr;
int addrOnce = 0;
int iTab;
- int pseudoTab = 0;
ExprList *pOrderBy = pSort->pOrderBy;
int eDest = pDest->eDest;
int iParm = pDest->iSDParm;
int regRow;
int regRowid;
int nKey;
+ int iSortTab; /* Sorter cursor to read from */
+ int nSortData; /* Trailing values to read from sorter */
+ u8 p5; /* p5 parameter for 1st OP_Column */
+ int i;
+ int bSeq; /* True if sorter record includes seq. no. */
+#ifdef SQLITE_ENABLE_EXPLAIN_COMMENTS
+ struct ExprList_item *aOutEx = p->pEList->a;
+#endif
if( pSort->labelBkOut ){
sqlite3VdbeAddOp2(v, OP_Gosub, pSort->regReturn, pSort->labelBkOut);
sqlite3VdbeAddOp2(v, OP_Goto, 0, addrBreak);
sqlite3VdbeResolveLabel(v, pSort->labelBkOut);
- addrOnce = sqlite3CodeOnce(pParse); VdbeCoverage(v);
}
iTab = pSort->iECursor;
- regRow = sqlite3GetTempReg(pParse);
if( eDest==SRT_Output || eDest==SRT_Coroutine ){
- pseudoTab = pParse->nTab++;
- sqlite3VdbeAddOp3(v, OP_OpenPseudo, pseudoTab, regRow, nColumn);
regRowid = 0;
+ regRow = pDest->iSdst;
+ nSortData = nColumn;
}else{
regRowid = sqlite3GetTempReg(pParse);
+ regRow = sqlite3GetTempReg(pParse);
+ nSortData = 1;
}
nKey = pOrderBy->nExpr - pSort->nOBSat;
if( pSort->sortFlags & SORTFLAG_UseSorter ){
int regSortOut = ++pParse->nMem;
- int ptab2 = pParse->nTab++;
- sqlite3VdbeAddOp3(v, OP_OpenPseudo, ptab2, regSortOut, nKey+2);
+ iSortTab = pParse->nTab++;
+ if( pSort->labelBkOut ){
+ addrOnce = sqlite3CodeOnce(pParse); VdbeCoverage(v);
+ }
+ sqlite3VdbeAddOp3(v, OP_OpenPseudo, iSortTab, regSortOut, nKey+1+nSortData);
if( addrOnce ) sqlite3VdbeJumpHere(v, addrOnce);
addr = 1 + sqlite3VdbeAddOp2(v, OP_SorterSort, iTab, addrBreak);
VdbeCoverage(v);
codeOffset(v, p->iOffset, addrContinue);
sqlite3VdbeAddOp2(v, OP_SorterData, iTab, regSortOut);
- sqlite3VdbeAddOp3(v, OP_Column, ptab2, nKey+1, regRow);
- sqlite3VdbeChangeP5(v, OPFLAG_CLEARCACHE);
+ p5 = OPFLAG_CLEARCACHE;
+ bSeq = 0;
}else{
- if( addrOnce ) sqlite3VdbeJumpHere(v, addrOnce);
addr = 1 + sqlite3VdbeAddOp2(v, OP_Sort, iTab, addrBreak); VdbeCoverage(v);
codeOffset(v, p->iOffset, addrContinue);
- sqlite3VdbeAddOp3(v, OP_Column, iTab, nKey+1, regRow);
+ iSortTab = iTab;
+ p5 = 0;
+ bSeq = 1;
+ }
+ for(i=0; i<nSortData; i++){
+ sqlite3VdbeAddOp3(v, OP_Column, iSortTab, nKey+bSeq+i, regRow+i);
+ if( i==0 ) sqlite3VdbeChangeP5(v, p5);
+ VdbeComment((v, "%s", aOutEx[i].zName ? aOutEx[i].zName : aOutEx[i].zSpan));
}
switch( eDest ){
case SRT_Table:
@@ -1193,17 +1232,9 @@ static void generateSortTail(
}
#endif
default: {
- int i;
assert( eDest==SRT_Output || eDest==SRT_Coroutine );
testcase( eDest==SRT_Output );
testcase( eDest==SRT_Coroutine );
- for(i=0; i<nColumn; i++){
- assert( regRow!=pDest->iSdst+i );
- sqlite3VdbeAddOp3(v, OP_Column, pseudoTab, i, pDest->iSdst+i);
- if( i==0 ){
- sqlite3VdbeChangeP5(v, OPFLAG_CLEARCACHE);
- }
- }
if( eDest==SRT_Output ){
sqlite3VdbeAddOp2(v, OP_ResultRow, pDest->iSdst, nColumn);
sqlite3ExprCacheAffinityChange(pParse, pDest->iSdst, nColumn);
@@ -1213,9 +1244,10 @@ static void generateSortTail(
break;
}
}
- sqlite3ReleaseTempReg(pParse, regRow);
- sqlite3ReleaseTempReg(pParse, regRowid);
-
+ if( regRowid ){
+ sqlite3ReleaseTempReg(pParse, regRow);
+ sqlite3ReleaseTempReg(pParse, regRowid);
+ }
/* The bottom of the loop
*/
sqlite3VdbeResolveLabel(v, addrContinue);
@@ -4756,8 +4788,9 @@ int sqlite3Select(
sSort.iECursor = pParse->nTab++;
sSort.addrSortIndex =
sqlite3VdbeAddOp4(v, OP_OpenEphemeral,
- sSort.iECursor, sSort.pOrderBy->nExpr+2, 0,
- (char*)pKeyInfo, P4_KEYINFO);
+ sSort.iECursor, sSort.pOrderBy->nExpr+1+pEList->nExpr, 0,
+ (char*)pKeyInfo, P4_KEYINFO
+ );
}else{
sSort.addrSortIndex = -1;
}
@@ -4888,7 +4921,7 @@ int sqlite3Select(
sNC.pSrcList = pTabList;
sNC.pAggInfo = &sAggInfo;
sAggInfo.mnReg = pParse->nMem+1;
- sAggInfo.nSortingColumn = pGroupBy ? pGroupBy->nExpr+1 : 0;
+ sAggInfo.nSortingColumn = pGroupBy ? pGroupBy->nExpr : 0;
sAggInfo.pGroupBy = pGroupBy;
sqlite3ExprAnalyzeAggList(&sNC, pEList);
sqlite3ExprAnalyzeAggList(&sNC, sSort.pOrderBy);
@@ -4981,8 +5014,8 @@ int sqlite3Select(
groupBySort = 1;
nGroupBy = pGroupBy->nExpr;
- nCol = nGroupBy + 1;
- j = nGroupBy+1;
+ nCol = nGroupBy;
+ j = nGroupBy;
for(i=0; i<sAggInfo.nColumn; i++){
if( sAggInfo.aCol[i].iSorterColumn>=j ){
nCol++;
@@ -4992,8 +5025,7 @@ int sqlite3Select(
regBase = sqlite3GetTempRange(pParse, nCol);
sqlite3ExprCacheClear(pParse);
sqlite3ExprCodeExprList(pParse, pGroupBy, regBase, 0);
- sqlite3VdbeAddOp2(v, OP_Sequence, sAggInfo.sortingIdx,regBase+nGroupBy);
- j = nGroupBy+1;
+ j = nGroupBy;
for(i=0; i<sAggInfo.nColumn; i++){
struct AggInfo_col *pCol = &sAggInfo.aCol[i];
if( pCol->iSorterColumn>=j ){
diff --git a/src/shell.c b/src/shell.c
index 371efa024..76bb89970 100644
--- a/src/shell.c
+++ b/src/shell.c
@@ -3814,7 +3814,8 @@ static void main_init(struct callback_data *data) {
sqlite3_config(SQLITE_CONFIG_LOG, shellLog, data);
sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> ");
sqlite3_snprintf(sizeof(continuePrompt), continuePrompt," ...> ");
- sqlite3_config(SQLITE_CONFIG_SINGLETHREAD);
+ sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
+ sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 4);
}
/*
diff --git a/src/sqlite.h.in b/src/sqlite.h.in
index 67113dc41..6bad046d8 100644
--- a/src/sqlite.h.in
+++ b/src/sqlite.h.in
@@ -1726,6 +1726,16 @@ struct sqlite3_mem_methods {
** SQLITE_CONFIG_WIN32_HEAPSIZE takes a 32-bit unsigned integer value
** that specifies the maximum size of the created heap.
** </dl>
+**
+** [[SQLITE_CONFIG_WORKER_THREADS]]
+** <dt>SQLITE_CONFIG_WORKER_THREADS
+** <dd>^SQLITE_CONFIG_WORKER_THREADS takes a single argument of type int.
+** It is used to set the number of background worker threads that may be
+** launched when sorting large amounts of data. A value of 0 means launch
+** no background threads at all. The maximum number of background threads
+** allowed is configured at build-time by the SQLITE_MAX_WORKER_THREADS
+** pre-processor option.
+** </dl>
*/
#define SQLITE_CONFIG_SINGLETHREAD 1 /* nil */
#define SQLITE_CONFIG_MULTITHREAD 2 /* nil */
@@ -1750,6 +1760,7 @@ struct sqlite3_mem_methods {
#define SQLITE_CONFIG_SQLLOG 21 /* xSqllog, void* */
#define SQLITE_CONFIG_MMAP_SIZE 22 /* sqlite3_int64, sqlite3_int64 */
#define SQLITE_CONFIG_WIN32_HEAPSIZE 23 /* int nByte */
+#define SQLITE_CONFIG_WORKER_THREADS 24 /* int nWorker */
/*
** CAPI3REF: Database Connection Configuration Options
@@ -5875,10 +5886,12 @@ int sqlite3_vfs_unregister(sqlite3_vfs*);
** <li> SQLITE_MUTEX_RECURSIVE
** <li> SQLITE_MUTEX_STATIC_MASTER
** <li> SQLITE_MUTEX_STATIC_MEM
-** <li> SQLITE_MUTEX_STATIC_MEM2
+** <li> SQLITE_MUTEX_STATIC_OPEN
** <li> SQLITE_MUTEX_STATIC_PRNG
** <li> SQLITE_MUTEX_STATIC_LRU
-** <li> SQLITE_MUTEX_STATIC_LRU2
+** <li> SQLITE_MUTEX_STATIC_PMEM
+** <li> SQLITE_MUTEX_STATIC_APP1
+** <li> SQLITE_MUTEX_STATIC_APP2
** </ul>)^
**
** ^The first two constants (SQLITE_MUTEX_FAST and SQLITE_MUTEX_RECURSIVE)
@@ -6082,6 +6095,9 @@ int sqlite3_mutex_notheld(sqlite3_mutex*);
#define SQLITE_MUTEX_STATIC_LRU 6 /* lru page list */
#define SQLITE_MUTEX_STATIC_LRU2 7 /* NOT USED */
#define SQLITE_MUTEX_STATIC_PMEM 7 /* sqlite3PageMalloc() */
+#define SQLITE_MUTEX_STATIC_APP1 8 /* For use by application */
+#define SQLITE_MUTEX_STATIC_APP2 9 /* For use by application */
+#define SQLITE_MUTEX_STATIC_APP3 10 /* For use by application */
/*
** CAPI3REF: Retrieve the mutex for a database connection
@@ -6177,7 +6193,8 @@ int sqlite3_test_control(int op, ...);
#define SQLITE_TESTCTRL_NEVER_CORRUPT 20
#define SQLITE_TESTCTRL_VDBE_COVERAGE 21
#define SQLITE_TESTCTRL_BYTEORDER 22
-#define SQLITE_TESTCTRL_LAST 22
+#define SQLITE_TESTCTRL_SORTER_MMAP 23
+#define SQLITE_TESTCTRL_LAST 23
/*
** CAPI3REF: SQLite Runtime Status
diff --git a/src/sqliteInt.h b/src/sqliteInt.h
index 7a5d225b8..f8427cd23 100644
--- a/src/sqliteInt.h
+++ b/src/sqliteInt.h
@@ -423,6 +423,26 @@
#endif
/*
+** If no value has been provided for SQLITE_MAX_WORKER_THREADS, or if
+** SQLITE_TEMP_STORE is set to 3 (never use temporary files), set it
+** to zero.
+*/
+#if SQLITE_TEMP_STORE==3
+# undef SQLITE_MAX_WORKER_THREADS
+#endif
+#ifndef SQLITE_MAX_WORKER_THREADS
+# define SQLITE_MAX_WORKER_THREADS 0
+#endif
+#ifndef SQLITE_DEFAULT_WORKER_THREADS
+# define SQLITE_DEFAULT_WORKER_THREADS 0
+#endif
+#if SQLITE_DEFAULT_WORKER_THREADS>SQLITE_MAX_WORKER_THREADS
+# undef SQLITE_MAX_WORKER_THREADS
+# define SQLITE_MAX_WORKER_THREADS SQLITE_DEFAULT_WORKER_THREADS
+#endif
+
+
+/*
** GCC does not define the offsetof() macro so we'll have to do it
** ourselves.
*/
@@ -805,6 +825,7 @@ typedef struct PrintfArguments PrintfArguments;
typedef struct RowSet RowSet;
typedef struct Savepoint Savepoint;
typedef struct Select Select;
+typedef struct SQLiteThread SQLiteThread;
typedef struct SelectDest SelectDest;
typedef struct SrcList SrcList;
typedef struct StrAccum StrAccum;
@@ -984,6 +1005,7 @@ struct sqlite3 {
int nChange; /* Value returned by sqlite3_changes() */
int nTotalChange; /* Value returned by sqlite3_total_changes() */
int aLimit[SQLITE_N_LIMIT]; /* Limits */
+ int nMaxSorterMmap; /* Maximum size of regions mapped by sorter */
struct sqlite3InitInfo { /* Information used during initialization */
int newTnum; /* Rootpage of table being initialized */
u8 iDb; /* Which db file is being initialized */
@@ -1647,7 +1669,7 @@ struct UnpackedRecord {
KeyInfo *pKeyInfo; /* Collation and sort-order information */
u16 nField; /* Number of entries in apMem[] */
i8 default_rc; /* Comparison result if keys are equal */
- u8 isCorrupt; /* Corruption detected by xRecordCompare() */
+ u8 errCode; /* Error detected by xRecordCompare (CORRUPT or NOMEM) */
Mem *aMem; /* Values */
int r1; /* Value to return if (lhs > rhs) */
int r2; /* Value to return if (rhs < lhs) */
@@ -2726,6 +2748,7 @@ struct Sqlite3Config {
int nPage; /* Number of pages in pPage[] */
int mxParserStack; /* maximum depth of the parser stack */
int sharedCacheEnabled; /* true if shared-cache mode enabled */
+ int nWorker; /* Number of worker threads to use */
/* The above might be initialized to non-zero. The following need to always
** initially be zero, however. */
int isInit; /* True after initialization has finished */
@@ -3692,4 +3715,19 @@ SQLITE_EXTERN void (*sqlite3IoTrace)(const char*,...);
#define MEMTYPE_PCACHE 0x08 /* Page cache allocations */
#define MEMTYPE_DB 0x10 /* Uses sqlite3DbMalloc, not sqlite_malloc */
+/*
+** Threading interface
+*/
+#if SQLITE_MAX_WORKER_THREADS>0
+int sqlite3ThreadCreate(SQLiteThread**,void*(*)(void*),void*);
+int sqlite3ThreadJoin(SQLiteThread*, void**);
+#endif
+
+/*
+** Win32 interface
+*/
+#if SQLITE_OS_WIN
+ DWORD sqlite3Win32Wait(HANDLE hObject);
+#endif
+
#endif /* _SQLITEINT_H_ */
diff --git a/src/test1.c b/src/test1.c
index 8d2249cbb..5ea0e38b9 100644
--- a/src/test1.c
+++ b/src/test1.c
@@ -2718,6 +2718,46 @@ bad_args:
}
/*
+** Usage: add_test_utf16bin_collate <db ptr>
+**
+** Add a utf-16 collation sequence named "utf16bin" to the database
+** handle. This collation sequence compares arguments in the same way as the
+** built-in collation "binary".
+*/
+static int test_utf16bin_collate_func(
+ void *pCtx,
+ int nA, const void *zA,
+ int nB, const void *zB
+){
+ int nCmp = (nA>nB ? nB : nA);
+ int res = memcmp(zA, zB, nCmp);
+ if( res==0 ) res = nA - nB;
+ return res;
+}
+static int test_utf16bin_collate(
+ void * clientData,
+ Tcl_Interp *interp,
+ int objc,
+ Tcl_Obj *CONST objv[]
+){
+ sqlite3 *db;
+ int rc;
+
+ if( objc!=2 ) goto bad_args;
+ if( getDbPointer(interp, Tcl_GetString(objv[1]), &db) ) return TCL_ERROR;
+
+ rc = sqlite3_create_collation(db, "utf16bin", SQLITE_UTF16, 0,
+ test_utf16bin_collate_func
+ );
+ if( sqlite3TestErrCode(interp, db, rc) ) return TCL_ERROR;
+ return TCL_OK;
+
+bad_args:
+ Tcl_WrongNumArgs(interp, 1, objv, "DB");
+ return TCL_ERROR;
+}
+
+/*
** When the collation needed callback is invoked, record the name of
** the requested collating function here. The recorded name is linked
** to a TCL variable and used to make sure that the requested collation
@@ -5895,6 +5935,7 @@ static int test_test_control(
int i;
} aVerb[] = {
{ "SQLITE_TESTCTRL_LOCALTIME_FAULT", SQLITE_TESTCTRL_LOCALTIME_FAULT },
+ { "SQLITE_TESTCTRL_SORTER_MMAP", SQLITE_TESTCTRL_SORTER_MMAP },
};
int iVerb;
int iFlag;
@@ -5922,6 +5963,19 @@ static int test_test_control(
sqlite3_test_control(SQLITE_TESTCTRL_LOCALTIME_FAULT, val);
break;
}
+
+ case SQLITE_TESTCTRL_SORTER_MMAP: {
+ int val;
+ sqlite3 *db;
+ if( objc!=4 ){
+ Tcl_WrongNumArgs(interp, 2, objv, "DB LIMIT");
+ return TCL_ERROR;
+ }
+ if( getDbPointer(interp, Tcl_GetString(objv[2]), &db) ) return TCL_ERROR;
+ if( Tcl_GetIntFromObj(interp, objv[3], &val) ) return TCL_ERROR;
+ sqlite3_test_control(SQLITE_TESTCTRL_SORTER_MMAP, db, val);
+ break;
+ }
}
Tcl_ResetResult(interp);
@@ -6335,6 +6389,113 @@ static int tclLoadStaticExtensionCmd(
return TCL_OK;
}
+/*
+** sorter_test_fakeheap BOOL
+**
+*/
+static int sorter_test_fakeheap(
+ void * clientData,
+ Tcl_Interp *interp,
+ int objc,
+ Tcl_Obj *CONST objv[]
+){
+ int bArg;
+ if( objc!=2 ){
+ Tcl_WrongNumArgs(interp, 1, objv, "BOOL");
+ return TCL_ERROR;
+ }
+
+ if( Tcl_GetBooleanFromObj(interp, objv[1], &bArg) ){
+ return TCL_ERROR;
+ }
+
+ if( bArg ){
+ if( sqlite3GlobalConfig.pHeap==0 ){
+ sqlite3GlobalConfig.pHeap = SQLITE_INT_TO_PTR(-1);
+ }
+ }else{
+ if( sqlite3GlobalConfig.pHeap==SQLITE_INT_TO_PTR(-1) ){
+ sqlite3GlobalConfig.pHeap = 0;
+ }
+ }
+
+ Tcl_ResetResult(interp);
+ return TCL_OK;
+}
+
+/*
+** sorter_test_sort4_helper DB SQL1 NSTEP SQL2
+**
+** Compile SQL statement $SQL1 and step it $NSTEP times. For each row,
+** check that the leftmost and rightmost columns returned are both integers,
+** and that both contain the same value.
+**
+** Then execute statement $SQL2. Check that the statement returns the same
+** set of integers in the same order as in the previous step (using $SQL1).
+*/
+static int sorter_test_sort4_helper(
+ void * clientData,
+ Tcl_Interp *interp,
+ int objc,
+ Tcl_Obj *CONST objv[]
+){
+ const char *zSql1;
+ const char *zSql2;
+ int nStep;
+ int iStep;
+ int iCksum1 = 0;
+ int iCksum2 = 0;
+ int rc;
+ int iB;
+ sqlite3 *db;
+ sqlite3_stmt *pStmt;
+
+ if( objc!=5 ){
+ Tcl_WrongNumArgs(interp, 1, objv, "DB SQL1 NSTEP SQL2");
+ return TCL_ERROR;
+ }
+
+ if( getDbPointer(interp, Tcl_GetString(objv[1]), &db) ) return TCL_ERROR;
+ zSql1 = Tcl_GetString(objv[2]);
+ if( Tcl_GetIntFromObj(interp, objv[3], &nStep) ) return TCL_ERROR;
+ zSql2 = Tcl_GetString(objv[4]);
+
+ rc = sqlite3_prepare_v2(db, zSql1, -1, &pStmt, 0);
+ if( rc!=SQLITE_OK ) goto sql_error;
+
+ iB = sqlite3_column_count(pStmt)-1;
+ for(iStep=0; iStep<nStep && SQLITE_ROW==sqlite3_step(pStmt); iStep++){
+ int a = sqlite3_column_int(pStmt, 0);
+ if( a!=sqlite3_column_int(pStmt, iB) ){
+ Tcl_AppendResult(interp, "data error: (a!=b)", 0);
+ return TCL_ERROR;
+ }
+
+ iCksum1 += (iCksum1 << 3) + a;
+ }
+ rc = sqlite3_finalize(pStmt);
+ if( rc!=SQLITE_OK ) goto sql_error;
+
+ rc = sqlite3_prepare_v2(db, zSql2, -1, &pStmt, 0);
+ if( rc!=SQLITE_OK ) goto sql_error;
+ for(iStep=0; SQLITE_ROW==sqlite3_step(pStmt); iStep++){
+ int a = sqlite3_column_int(pStmt, 0);
+ iCksum2 += (iCksum2 << 3) + a;
+ }
+ rc = sqlite3_finalize(pStmt);
+ if( rc!=SQLITE_OK ) goto sql_error;
+
+ if( iCksum1!=iCksum2 ){
+ Tcl_AppendResult(interp, "checksum mismatch", 0);
+ return TCL_ERROR;
+ }
+
+ return TCL_OK;
+ sql_error:
+ Tcl_AppendResult(interp, "sql error: ", sqlite3_errmsg(db), 0);
+ return TCL_ERROR;
+}
+
/*
** Register commands with the TCL interpreter.
@@ -6537,6 +6698,7 @@ int Sqlitetest1_Init(Tcl_Interp *interp){
{ "add_test_collate", test_collate, 0 },
{ "add_test_collate_needed", test_collate_needed, 0 },
{ "add_test_function", test_function, 0 },
+ { "add_test_utf16bin_collate", test_utf16bin_collate, 0 },
#endif
{ "sqlite3_test_errstr", test_errstr, 0 },
{ "tcl_variable_type", tcl_variable_type, 0 },
@@ -6570,6 +6732,8 @@ int Sqlitetest1_Init(Tcl_Interp *interp){
{ "getrusage", test_getrusage },
#endif
{ "load_static_extension", tclLoadStaticExtensionCmd },
+ { "sorter_test_fakeheap", sorter_test_fakeheap },
+ { "sorter_test_sort4_helper", sorter_test_sort4_helper },
};
static int bitmask_size = sizeof(Bitmask)*8;
int i;
diff --git a/src/test_config.c b/src/test_config.c
index 9ce7f6fdf..78c65a8e5 100644
--- a/src/test_config.c
+++ b/src/test_config.c
@@ -103,6 +103,10 @@ static void set_options(Tcl_Interp *interp){
Tcl_SetVar2(interp, "sqlite_options", "mmap", "0", TCL_GLOBAL_ONLY);
#endif
+ Tcl_SetVar2(interp, "sqlite_options", "worker_threads",
+ STRINGVALUE(SQLITE_MAX_WORKER_THREADS), TCL_GLOBAL_ONLY
+ );
+
#if 1 /* def SQLITE_MEMDEBUG */
Tcl_SetVar2(interp, "sqlite_options", "memdebug", "1", TCL_GLOBAL_ONLY);
#else
diff --git a/src/test_malloc.c b/src/test_malloc.c
index e3cfcaa9f..6ac030f23 100644
--- a/src/test_malloc.c
+++ b/src/test_malloc.c
@@ -1254,6 +1254,32 @@ static int test_config_cis(
}
/*
+** Usage: sqlite3_config_worker_threads N
+*/
+static int test_config_worker_threads(
+ void * clientData,
+ Tcl_Interp *interp,
+ int objc,
+ Tcl_Obj *CONST objv[]
+){
+ int rc;
+ int nThread;
+
+ if( objc!=2 ){
+ Tcl_WrongNumArgs(interp, 1, objv, "N");
+ return TCL_ERROR;
+ }
+ if( Tcl_GetIntFromObj(interp, objv[1], &nThread) ){
+ return TCL_ERROR;
+ }
+
+ rc = sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, nThread);
+ Tcl_SetResult(interp, (char *)sqlite3ErrName(rc), TCL_VOLATILE);
+
+ return TCL_OK;
+}
+
+/*
** Usage: sqlite3_dump_memsys3 FILENAME
** sqlite3_dump_memsys5 FILENAME
**
@@ -1506,6 +1532,7 @@ int Sqlitetest_malloc_Init(Tcl_Interp *interp){
{ "sqlite3_config_error", test_config_error ,0 },
{ "sqlite3_config_uri", test_config_uri ,0 },
{ "sqlite3_config_cis", test_config_cis ,0 },
+ { "sqlite3_config_worker_threads", test_config_worker_threads ,0 },
{ "sqlite3_db_config_lookaside",test_db_config_lookaside ,0 },
{ "sqlite3_dump_memsys3", test_dump_memsys3 ,3 },
{ "sqlite3_dump_memsys5", test_dump_memsys3 ,5 },
diff --git a/src/threads.c b/src/threads.c
new file mode 100644
index 000000000..1cfe2cc6c
--- /dev/null
+++ b/src/threads.c
@@ -0,0 +1,235 @@
+/*
+** 2012 July 21
+**
+** The author disclaims copyright to this source code. In place of
+** a legal notice, here is a blessing:
+**
+** May you do good and not evil.
+** May you find forgiveness for yourself and forgive others.
+** May you share freely, never taking more than you give.
+**
+******************************************************************************
+**
+** This file presents a simple cross-platform threading interface for
+** use internally by SQLite.
+**
+** A "thread" can be created using sqlite3ThreadCreate(). This thread
+** runs independently of its creator until it is joined using
+** sqlite3ThreadJoin(), at which point it terminates.
+**
+** Threads do not have to be real. It could be that the work of the
+** "thread" is done by the main thread at either the sqlite3ThreadCreate()
+** or sqlite3ThreadJoin() call. This is, in fact, what happens in
+** single threaded systems. Nothing in SQLite requires multiple threads.
+** This interface exists so that applications that want to take advantage
+** of multiple cores can do so, while also allowing applications to stay
+** single-threaded if desired.
+*/
+#include "sqliteInt.h"
+
+#if SQLITE_MAX_WORKER_THREADS>0
+
+/********************************* Unix Pthreads ****************************/
+#if SQLITE_OS_UNIX && defined(SQLITE_MUTEX_PTHREADS) && SQLITE_THREADSAFE>0
+
+#define SQLITE_THREADS_IMPLEMENTED 1 /* Prevent the single-thread code below */
+#include <pthread.h>
+
+/* A running thread */
+struct SQLiteThread {
+ pthread_t tid; /* Thread ID */
+ int done; /* Set to true when thread finishes */
+ void *pOut; /* Result returned by the thread */
+ void *(*xTask)(void*); /* The thread routine */
+ void *pIn; /* Argument to the thread */
+};
+
+/* Create a new thread */
+int sqlite3ThreadCreate(
+ SQLiteThread **ppThread, /* OUT: Write the thread object here */
+ void *(*xTask)(void*), /* Routine to run in a separate thread */
+ void *pIn /* Argument passed into xTask() */
+){
+ SQLiteThread *p;
+
+ assert( ppThread!=0 );
+ assert( xTask!=0 );
+ /* This routine is never used in single-threaded mode */
+ assert( sqlite3GlobalConfig.bCoreMutex!=0 );
+
+ *ppThread = 0;
+ p = sqlite3Malloc(sizeof(*p));
+ if( p==0 ) return SQLITE_NOMEM;
+ memset(p, 0, sizeof(*p));
+ p->xTask = xTask;
+ p->pIn = pIn;
+ if( sqlite3FaultSim(200) ? 1 : pthread_create(&p->tid, 0, xTask, pIn) ){
+ p->done = 1;
+ p->pOut = xTask(pIn);
+ }
+ *ppThread = p;
+ return SQLITE_OK;
+}
+
+/* Get the results of the thread */
+int sqlite3ThreadJoin(SQLiteThread *p, void **ppOut){
+ int rc;
+
+ assert( ppOut!=0 );
+ if( p==0 ) return SQLITE_NOMEM;
+ if( p->done ){
+ *ppOut = p->pOut;
+ rc = SQLITE_OK;
+ }else{
+ rc = pthread_join(p->tid, ppOut) ? SQLITE_ERROR : SQLITE_OK;
+ }
+ sqlite3_free(p);
+ return rc;
+}
+
+#endif /* SQLITE_OS_UNIX && defined(SQLITE_MUTEX_PTHREADS) */
+/******************************** End Unix Pthreads *************************/
+
+
+/********************************* Win32 Threads ****************************/
+#if SQLITE_OS_WIN && !SQLITE_OS_WINRT && SQLITE_THREADSAFE>0
+
+#define SQLITE_THREADS_IMPLEMENTED 1 /* Prevent the single-thread code below */
+#include <process.h>
+
+/* A running thread */
+struct SQLiteThread {
+ uintptr_t tid; /* The thread handle */
+ void *(*xTask)(void*); /* The routine to run as a thread */
+ void *pIn; /* Argument to xTask */
+ void *pResult; /* Result of xTask */
+};
+
+/* Thread procedure Win32 compatibility shim */
+static void sqlite3ThreadProc(
+ void *pArg /* IN: Pointer to the SQLiteThread structure */
+){
+ SQLiteThread *p = (SQLiteThread *)pArg;
+
+ assert( p!=0 );
+ assert( p->xTask!=0 );
+ p->pResult = p->xTask(p->pIn);
+ _endthread();
+}
+
+/* Create a new thread */
+int sqlite3ThreadCreate(
+ SQLiteThread **ppThread, /* OUT: Write the thread object here */
+ void *(*xTask)(void*), /* Routine to run in a separate thread */
+ void *pIn /* Argument passed into xTask() */
+){
+ SQLiteThread *p;
+
+ assert( ppThread!=0 );
+ assert( xTask!=0 );
+ *ppThread = 0;
+ p = sqlite3Malloc(sizeof(*p));
+ if( p==0 ) return SQLITE_NOMEM;
+ if( sqlite3GlobalConfig.bCoreMutex==0 ){
+ memset(p, 0, sizeof(*p));
+ }else{
+ p->xTask = xTask;
+ p->pIn = pIn;
+ p->tid = _beginthread(sqlite3ThreadProc, 0, p);
+ if( p->tid==(uintptr_t)-1 ){
+ memset(p, 0, sizeof(*p));
+ }
+ }
+ if( p->xTask==0 ){
+ p->pResult = xTask(pIn);
+ }
+ *ppThread = p;
+ return SQLITE_OK;
+}
+
+/* Get the results of the thread */
+int sqlite3ThreadJoin(SQLiteThread *p, void **ppOut){
+ DWORD rc;
+
+ assert( ppOut!=0 );
+ if( p==0 ) return SQLITE_NOMEM;
+ if( p->xTask==0 ){
+ rc = WAIT_OBJECT_0;
+ }else{
+ rc = sqlite3Win32Wait((HANDLE)p->tid);
+ assert( rc!=WAIT_IO_COMPLETION );
+ }
+ if( rc==WAIT_OBJECT_0 ) *ppOut = p->pResult;
+ sqlite3_free(p);
+ return (rc==WAIT_OBJECT_0) ? SQLITE_OK : SQLITE_ERROR;
+}
+
+#endif /* SQLITE_OS_WIN && !SQLITE_OS_WINRT */
+/******************************** End Win32 Threads *************************/
+
+
+/********************************* Single-Threaded **************************/
+#ifndef SQLITE_THREADS_IMPLEMENTED
+/*
+** This implementation does not actually create a new thread. It does the
+** work of the thread in the main thread, when either the thread is created
+** or when it is joined
+*/
+
+/* A running thread */
+struct SQLiteThread {
+ void *(*xTask)(void*); /* The routine to run as a thread */
+ void *pIn; /* Argument to xTask */
+ void *pResult; /* Result of xTask */
+};
+
+/* Create a new thread */
+int sqlite3ThreadCreate(
+ SQLiteThread **ppThread, /* OUT: Write the thread object here */
+ void *(*xTask)(void*), /* Routine to run in a separate thread */
+ void *pIn /* Argument passed into xTask() */
+){
+ SQLiteThread *p;
+
+ assert( ppThread!=0 );
+ assert( xTask!=0 );
+ *ppThread = 0;
+ p = sqlite3Malloc(sizeof(*p));
+ if( p==0 ) return SQLITE_NOMEM;
+ if( (SQLITE_PTR_TO_INT(p)/17)&1 ){
+ p->xTask = xTask;
+ p->pIn = pIn;
+ }else{
+ p->xTask = 0;
+ p->pResult = xTask(pIn);
+ }
+ *ppThread = p;
+ return SQLITE_OK;
+}
+
+/* Get the results of the thread */
+int sqlite3ThreadJoin(SQLiteThread *p, void **ppOut){
+
+ assert( ppOut!=0 );
+ if( p==0 ) return SQLITE_NOMEM;
+ if( p->xTask ){
+ *ppOut = p->xTask(p->pIn);
+ }else{
+ *ppOut = p->pResult;
+ }
+ sqlite3_free(p);
+
+#if defined(SQLITE_TEST)
+ {
+ void *pTstAlloc = sqlite3Malloc(10);
+ if (!pTstAlloc) return SQLITE_NOMEM;
+ sqlite3_free(pTstAlloc);
+ }
+#endif
+
+ return SQLITE_OK;
+}
+
+#endif /* !defined(SQLITE_THREADS_IMPLEMENTED) */
+/****************************** End Single-Threaded *************************/
+#endif /* SQLITE_MAX_WORKER_THREADS>0 */
diff --git a/src/vdbe.c b/src/vdbe.c
index 093669a82..df470d403 100644
--- a/src/vdbe.c
+++ b/src/vdbe.c
@@ -1128,7 +1128,7 @@ case OP_Move: {
assert( pIn1<=&aMem[(p->nMem-p->nCursor)] );
assert( memIsValid(pIn1) );
memAboutToChange(p, pOut);
- VdbeMemRelease(pOut);
+ sqlite3VdbeMemRelease(pOut);
zMalloc = pOut->zMalloc;
memcpy(pOut, pIn1, sizeof(Mem));
#ifdef SQLITE_DEBUG
@@ -3419,11 +3419,15 @@ case OP_OpenEphemeral: {
break;
}
-/* Opcode: SorterOpen P1 P2 * P4 *
+/* Opcode: SorterOpen P1 P2 P3 P4 *
**
** This opcode works like OP_OpenEphemeral except that it opens
** a transient index that is specifically designed to sort large
** tables using an external merge-sort algorithm.
+**
+** If argument P3 is non-zero, then it indicates that the sorter may
+** assume that a stable sort considering the first P3 fields of each
+** key is sufficient to produce the required results.
*/
case OP_SorterOpen: {
VdbeCursor *pCx;
@@ -3435,7 +3439,25 @@ case OP_SorterOpen: {
pCx->pKeyInfo = pOp->p4.pKeyInfo;
assert( pCx->pKeyInfo->db==db );
assert( pCx->pKeyInfo->enc==ENC(db) );
- rc = sqlite3VdbeSorterInit(db, pCx);
+ rc = sqlite3VdbeSorterInit(db, pOp->p3, pCx);
+ break;
+}
+
+/* Opcode: SequenceTest P1 P2 * * *
+** Synopsis: if( cursor[P1].ctr++ ) pc = P2
+**
+** P1 is a sorter cursor. If the sequence counter is currently zero, jump
+** to P2. Regardless of whether or not the jump is taken, increment the
+** the sequence value.
+*/
+case OP_SequenceTest: {
+ VdbeCursor *pC;
+ assert( pOp->p1>=0 && pOp->p1<p->nCursor );
+ pC = p->apCsr[pOp->p1];
+ assert( pC->pSorter );
+ if( (pC->seqCount++)==0 ){
+ pc = pOp->p2 - 1;
+ }
break;
}
@@ -4241,6 +4263,7 @@ case OP_SorterCompare: {
assert( pOp->p4type==P4_INT32 );
pIn3 = &aMem[pOp->p3];
nIgnore = pOp->p4.i;
+ res = 0;
rc = sqlite3VdbeSorterCompare(pC, pIn3, nIgnore, &res);
VdbeBranchTaken(res!=0,2);
if( res ){
@@ -4491,7 +4514,7 @@ case OP_Rewind: { /* jump */
assert( isSorter(pC)==(pOp->opcode==OP_SorterSort) );
res = 1;
if( isSorter(pC) ){
- rc = sqlite3VdbeSorterRewind(db, pC, &res);
+ rc = sqlite3VdbeSorterRewind(pC, &res);
}else{
pCrsr = pC->pCursor;
assert( pCrsr );
@@ -4650,7 +4673,7 @@ case OP_IdxInsert: { /* in2 */
rc = ExpandBlob(pIn2);
if( rc==SQLITE_OK ){
if( isSorter(pC) ){
- rc = sqlite3VdbeSorterWrite(db, pC, pIn2);
+ rc = sqlite3VdbeSorterWrite(pC, pIn2);
}else{
nKey = pIn2->n;
zKey = pIn2->z;
diff --git a/src/vdbeInt.h b/src/vdbeInt.h
index f541aa717..da0ae9f88 100644
--- a/src/vdbeInt.h
+++ b/src/vdbeInt.h
@@ -437,13 +437,13 @@ void sqlite3VdbeFrameDelete(VdbeFrame*);
int sqlite3VdbeFrameRestore(VdbeFrame *);
int sqlite3VdbeTransferError(Vdbe *p);
-int sqlite3VdbeSorterInit(sqlite3 *, VdbeCursor *);
+int sqlite3VdbeSorterInit(sqlite3 *, int, VdbeCursor *);
void sqlite3VdbeSorterReset(sqlite3 *, VdbeSorter *);
void sqlite3VdbeSorterClose(sqlite3 *, VdbeCursor *);
int sqlite3VdbeSorterRowkey(const VdbeCursor *, Mem *);
int sqlite3VdbeSorterNext(sqlite3 *, const VdbeCursor *, int *);
-int sqlite3VdbeSorterRewind(sqlite3 *, const VdbeCursor *, int *);
-int sqlite3VdbeSorterWrite(sqlite3 *, const VdbeCursor *, Mem *);
+int sqlite3VdbeSorterRewind(const VdbeCursor *, int *);
+int sqlite3VdbeSorterWrite(const VdbeCursor *, Mem *);
int sqlite3VdbeSorterCompare(const VdbeCursor *, Mem *, int, int *);
#if !defined(SQLITE_OMIT_SHARED_CACHE) && SQLITE_THREADSAFE>0
diff --git a/src/vdbeaux.c b/src/vdbeaux.c
index ac945df59..325fdb5c6 100644
--- a/src/vdbeaux.c
+++ b/src/vdbeaux.c
@@ -3135,10 +3135,14 @@ void sqlite3VdbeRecordUnpack(
** sqlite3VdbeSerialGet() and sqlite3MemCompare() functions. It is used
** in assert() statements to ensure that the optimized code in
** sqlite3VdbeRecordCompare() returns results with these two primitives.
+**
+** Return true if the result of comparison is equivalent to desiredResult.
+** Return false if there is a disagreement.
*/
static int vdbeRecordCompareDebug(
int nKey1, const void *pKey1, /* Left key */
- const UnpackedRecord *pPKey2 /* Right key */
+ const UnpackedRecord *pPKey2, /* Right key */
+ int desiredResult /* Correct answer */
){
u32 d1; /* Offset into aKey[] of next data element */
u32 idx1; /* Offset into aKey[] of next header element */
@@ -3150,6 +3154,7 @@ static int vdbeRecordCompareDebug(
Mem mem1;
pKeyInfo = pPKey2->pKeyInfo;
+ if( pKeyInfo->db==0 ) return 1;
mem1.enc = pKeyInfo->enc;
mem1.db = pKeyInfo->db;
/* mem1.flags = 0; // Will be initialized by sqlite3VdbeSerialGet() */
@@ -3200,7 +3205,7 @@ static int vdbeRecordCompareDebug(
if( pKeyInfo->aSortOrder[i] ){
rc = -rc; /* Invert the result for DESC sort order. */
}
- return rc;
+ goto debugCompareEnd;
}
i++;
}while( idx1<szHdr1 && i<pPKey2->nField );
@@ -3214,7 +3219,15 @@ static int vdbeRecordCompareDebug(
/* rc==0 here means that one of the keys ran out of fields and
** all the fields up to that point were equal. Return the the default_rc
** value. */
- return pPKey2->default_rc;
+ rc = pPKey2->default_rc;
+
+debugCompareEnd:
+ if( desiredResult==0 && rc==0 ) return 1;
+ if( desiredResult<0 && rc<0 ) return 1;
+ if( desiredResult>0 && rc>0 ) return 1;
+ if( CORRUPT_DB ) return 1;
+ if( pKeyInfo->db->mallocFailed ) return 1;
+ return 0;
}
#endif
@@ -3227,7 +3240,8 @@ static int vdbeRecordCompareDebug(
static int vdbeCompareMemString(
const Mem *pMem1,
const Mem *pMem2,
- const CollSeq *pColl
+ const CollSeq *pColl,
+ u8 *prcErr /* If an OOM occurs, set to SQLITE_NOMEM */
){
if( pMem1->enc==pColl->enc ){
/* The strings are already in the correct encoding. Call the
@@ -3250,6 +3264,7 @@ static int vdbeCompareMemString(
rc = pColl->xCmp(pColl->pUser, n1, v1, n2, v2);
sqlite3VdbeMemRelease(&c1);
sqlite3VdbeMemRelease(&c2);
+ if( (v1==0 || v2==0) && prcErr ) *prcErr = SQLITE_NOMEM;
return rc;
}
}
@@ -3332,7 +3347,7 @@ int sqlite3MemCompare(const Mem *pMem1, const Mem *pMem2, const CollSeq *pColl){
assert( !pColl || pColl->xCmp );
if( pColl ){
- return vdbeCompareMemString(pMem1, pMem2, pColl);
+ return vdbeCompareMemString(pMem1, pMem2, pColl, 0);
}
/* If a NULL pointer was passed as the collate function, fall through
** to the blob case and use memcmp(). */
@@ -3404,8 +3419,10 @@ static i64 vdbeRecordDecodeInt(u32 serial_type, const u8 *aKey){
** fields that appear in both keys are equal, then pPKey2->default_rc is
** returned.
**
-** If database corruption is discovered, set pPKey2->isCorrupt to non-zero
-** and return 0.
+** If database corruption is discovered, set pPKey2->errCode to
+** SQLITE_CORRUPT and return 0. If an OOM error is encountered,
+** pPKey2->errCode is set to SQLITE_NOMEM and, if it is not NULL, the
+** malloc-failed flag set on database handle (pPKey2->pKeyInfo->db).
*/
int sqlite3VdbeRecordCompare(
int nKey1, const void *pKey1, /* Left key */
@@ -3436,7 +3453,7 @@ int sqlite3VdbeRecordCompare(
idx1 = getVarint32(aKey1, szHdr1);
d1 = szHdr1;
if( d1>(unsigned)nKey1 ){
- pPKey2->isCorrupt = (u8)SQLITE_CORRUPT_BKPT;
+ pPKey2->errCode = (u8)SQLITE_CORRUPT_BKPT;
return 0; /* Corruption */
}
i = 0;
@@ -3515,14 +3532,16 @@ int sqlite3VdbeRecordCompare(
testcase( (d1+mem1.n)==(unsigned)nKey1 );
testcase( (d1+mem1.n+1)==(unsigned)nKey1 );
if( (d1+mem1.n) > (unsigned)nKey1 ){
- pPKey2->isCorrupt = (u8)SQLITE_CORRUPT_BKPT;
+ pPKey2->errCode = (u8)SQLITE_CORRUPT_BKPT;
return 0; /* Corruption */
}else if( pKeyInfo->aColl[i] ){
mem1.enc = pKeyInfo->enc;
mem1.db = pKeyInfo->db;
mem1.flags = MEM_Str;
mem1.z = (char*)&aKey1[d1];
- rc = vdbeCompareMemString(&mem1, pRhs, pKeyInfo->aColl[i]);
+ rc = vdbeCompareMemString(
+ &mem1, pRhs, pKeyInfo->aColl[i], &pPKey2->errCode
+ );
}else{
int nCmp = MIN(mem1.n, pRhs->n);
rc = memcmp(&aKey1[d1], pRhs->z, nCmp);
@@ -3542,7 +3561,7 @@ int sqlite3VdbeRecordCompare(
testcase( (d1+nStr)==(unsigned)nKey1 );
testcase( (d1+nStr+1)==(unsigned)nKey1 );
if( (d1+nStr) > (unsigned)nKey1 ){
- pPKey2->isCorrupt = (u8)SQLITE_CORRUPT_BKPT;
+ pPKey2->errCode = (u8)SQLITE_CORRUPT_BKPT;
return 0; /* Corruption */
}else{
int nCmp = MIN(nStr, pRhs->n);
@@ -3562,11 +3581,7 @@ int sqlite3VdbeRecordCompare(
if( pKeyInfo->aSortOrder[i] ){
rc = -rc;
}
- assert( CORRUPT_DB
- || (rc<0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)<0)
- || (rc>0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)>0)
- || pKeyInfo->db->mallocFailed
- );
+ assert( vdbeRecordCompareDebug(nKey1, pKey1, pPKey2, rc) );
assert( mem1.zMalloc==0 ); /* See comment below */
return rc;
}
@@ -3586,7 +3601,7 @@ int sqlite3VdbeRecordCompare(
** all the fields up to that point were equal. Return the the default_rc
** value. */
assert( CORRUPT_DB
- || pPKey2->default_rc==vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)
+ || vdbeRecordCompareDebug(nKey1, pKey1, pPKey2, pPKey2->default_rc)
|| pKeyInfo->db->mallocFailed
);
return pPKey2->default_rc;
@@ -3685,11 +3700,7 @@ static int vdbeRecordCompareInt(
res = pPKey2->default_rc;
}
- assert( (res==0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)==0)
- || (res<0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)<0)
- || (res>0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)>0)
- || CORRUPT_DB
- );
+ assert( vdbeRecordCompareDebug(nKey1, pKey1, pPKey2, res) );
return res;
}
@@ -3723,7 +3734,7 @@ static int vdbeRecordCompareString(
nStr = (serial_type-12) / 2;
if( (szHdr + nStr) > nKey1 ){
- pPKey2->isCorrupt = (u8)SQLITE_CORRUPT_BKPT;
+ pPKey2->errCode = (u8)SQLITE_CORRUPT_BKPT;
return 0; /* Corruption */
}
nCmp = MIN( pPKey2->aMem[0].n, nStr );
@@ -3749,9 +3760,7 @@ static int vdbeRecordCompareString(
}
}
- assert( (res==0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)==0)
- || (res<0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)<0)
- || (res>0 && vdbeRecordCompareDebug(nKey1, pKey1, pPKey2)>0)
+ assert( vdbeRecordCompareDebug(nKey1, pKey1, pPKey2, res)
|| CORRUPT_DB
|| pPKey2->pKeyInfo->db->mallocFailed
);
diff --git a/src/vdbesort.c b/src/vdbesort.c
index d3690244c..8065f6d98 100644
--- a/src/vdbesort.c
+++ b/src/vdbesort.c
@@ -1,5 +1,5 @@
/*
-** 2011 July 9
+** 2011-07-09
**
** The author disclaims copyright to this source code. In place of
** a legal notice, here is a blessing:
@@ -10,44 +10,197 @@
**
*************************************************************************
** This file contains code for the VdbeSorter object, used in concert with
-** a VdbeCursor to sort large numbers of keys (as may be required, for
-** example, by CREATE INDEX statements on tables too large to fit in main
-** memory).
+** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
+** or by SELECT statements with ORDER BY clauses that cannot be satisfied
+** using indexes and without LIMIT clauses.
+**
+** The VdbeSorter object implements a multi-threaded external merge sort
+** algorithm that is efficient even if the number of elements being sorted
+** exceeds the available memory.
+**
+** Here is the (internal, non-API) interface between this module and the
+** rest of the SQLite system:
+**
+** sqlite3VdbeSorterInit() Create a new VdbeSorter object.
+**
+** sqlite3VdbeSorterWrite() Add a single new row to the VdbeSorter
+** object. The row is a binary blob in the
+** OP_MakeRecord format that contains both
+** the ORDER BY key columns and result columns
+** in the case of a SELECT w/ ORDER BY, or
+** the complete record for an index entry
+** in the case of a CREATE INDEX.
+**
+** sqlite3VdbeSorterRewind() Sort all content previously added.
+** Position the read cursor on the
+** first sorted element.
+**
+** sqlite3VdbeSorterNext() Advance the read cursor to the next sorted
+** element.
+**
+** sqlite3VdbeSorterRowkey() Return the complete binary blob for the
+** row currently under the read cursor.
+**
+** sqlite3VdbeSorterCompare() Compare the binary blob for the row
+** currently under the read cursor against
+** another binary blob X and report if
+** X is strictly less than the read cursor.
+** Used to enforce uniqueness in a
+** CREATE UNIQUE INDEX statement.
+**
+** sqlite3VdbeSorterClose() Close the VdbeSorter object and reclaim
+** all resources.
+**
+** sqlite3VdbeSorterReset() Refurbish the VdbeSorter for reuse. This
+** is like Close() followed by Init() only
+** much faster.
+**
+** The interfaces above must be called in a particular order. Write() can
+** only occur in between Init()/Reset() and Rewind(). Next(), Rowkey(), and
+** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
+**
+** Init()
+** for each record: Write()
+** Rewind()
+** Rowkey()/Compare()
+** Next()
+** Close()
+**
+** Algorithm:
+**
+** Records passed to the sorter via calls to Write() are initially held
+** unsorted in main memory. Assuming the amount of memory used never exceeds
+** a threshold, when Rewind() is called the set of records is sorted using
+** an in-memory merge sort. In this case, no temporary files are required
+** and subsequent calls to Rowkey(), Next() and Compare() read records
+** directly from main memory.
+**
+** If the amount of space used to store records in main memory exceeds the
+** threshold, then the set of records currently in memory are sorted and
+** written to a temporary file in "Packed Memory Array" (PMA) format.
+** A PMA created at this point is known as a "level-0 PMA". Higher levels
+** of PMAs may be created by merging existing PMAs together - for example
+** merging two or more level-0 PMAs together creates a level-1 PMA.
+**
+** The threshold for the amount of main memory to use before flushing
+** records to a PMA is roughly the same as the limit configured for the
+** page-cache of the main database. Specifically, the threshold is set to
+** the value returned multiplied by "PRAGMA main.page_size" multipled by
+** that returned by "PRAGMA main.cache_size", in bytes.
+**
+** If the sorter is running in single-threaded mode, then all PMAs generated
+** are appended to a single temporary file. Or, if the sorter is running in
+** multi-threaded mode then up to (N+1) temporary files may be opened, where
+** N is the configured number of worker threads. In this case, instead of
+** sorting the records and writing the PMA to a temporary file itself, the
+** calling thread usually launches a worker thread to do so. Except, if
+** there are already N worker threads running, the main thread does the work
+** itself.
+**
+** The sorter is running in multi-threaded mode if (a) the library was built
+** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
+** than zero, and (b) worker threads have been enabled at runtime by calling
+** sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, ...).
+**
+** When Rewind() is called, any data remaining in memory is flushed to a
+** final PMA. So at this point the data is stored in some number of sorted
+** PMAs within temporary files on disk.
+**
+** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
+** sorter is running in single-threaded mode, then these PMAs are merged
+** incrementally as keys are retreived from the sorter by the VDBE. The
+** MergeEngine object, described in further detail below, performs this
+** merge.
+**
+** Or, if running in multi-threaded mode, then a background thread is
+** launched to merge the existing PMAs. Once the background thread has
+** merged T bytes of data into a single sorted PMA, the main thread
+** begins reading keys from that PMA while the background thread proceeds
+** with merging the next T bytes of data. And so on.
+**
+** Parameter T is set to half the value of the memory threshold used
+** by Write() above to determine when to create a new PMA.
+**
+** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when
+** Rewind() is called, then a hierarchy of incremental-merges is used.
+** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on
+** disk are merged together. Then T bytes of data from the second set, and
+** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
+** PMAs at a time. This done is to improve locality.
+**
+** If running in multi-threaded mode and there are more than
+** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
+** than one background thread may be created. Specifically, there may be
+** one background thread for each temporary file on disk, and one background
+** thread to merge the output of each of the others to a single PMA for
+** the main thread to read from.
*/
-
#include "sqliteInt.h"
#include "vdbeInt.h"
+/*
+** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
+** messages to stderr that may be helpful in understanding the performance
+** characteristics of the sorter in multi-threaded mode.
+*/
+#if 0
+# define SQLITE_DEBUG_SORTER_THREADS 1
+#endif
-typedef struct VdbeSorterIter VdbeSorterIter;
-typedef struct SorterRecord SorterRecord;
-typedef struct FileWriter FileWriter;
+/*
+** Private objects used by the sorter
+*/
+typedef struct MergeEngine MergeEngine; /* Merge PMAs together */
+typedef struct PmaReader PmaReader; /* Incrementally read one PMA */
+typedef struct PmaWriter PmaWriter; /* Incrementally write one PMA */
+typedef struct SorterRecord SorterRecord; /* A record being sorted */
+typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */
+typedef struct SorterFile SorterFile; /* Temporary file object wrapper */
+typedef struct SorterList SorterList; /* In-memory list of records */
+typedef struct IncrMerger IncrMerger; /* Read & merge multiple PMAs */
+
+/*
+** A container for a temp file handle and the current amount of data
+** stored in the file.
+*/
+struct SorterFile {
+ sqlite3_file *pFd; /* File handle */
+ i64 iEof; /* Bytes of data stored in pFd */
+};
/*
-** NOTES ON DATA STRUCTURE USED FOR N-WAY MERGES:
+** An in-memory list of objects to be sorted.
**
-** As keys are added to the sorter, they are written to disk in a series
-** of sorted packed-memory-arrays (PMAs). The size of each PMA is roughly
-** the same as the cache-size allowed for temporary databases. In order
-** to allow the caller to extract keys from the sorter in sorted order,
-** all PMAs currently stored on disk must be merged together. This comment
-** describes the data structure used to do so. The structure supports
-** merging any number of arrays in a single pass with no redundant comparison
-** operations.
+** If aMemory==0 then each object is allocated separately and the objects
+** are connected using SorterRecord.u.pNext. If aMemory!=0 then all objects
+** are stored in the aMemory[] bulk memory, one right after the other, and
+** are connected using SorterRecord.u.iNext.
+*/
+struct SorterList {
+ SorterRecord *pList; /* Linked list of records */
+ u8 *aMemory; /* If non-NULL, bulk memory to hold pList */
+ int szPMA; /* Size of pList as PMA in bytes */
+};
+
+/*
+** The MergeEngine object is used to combine two or more smaller PMAs into
+** one big PMA using a merge operation. Separate PMAs all need to be
+** combined into one big PMA in order to be able to step through the sorted
+** records in order.
**
-** The aIter[] array contains an iterator for each of the PMAs being merged.
-** An aIter[] iterator either points to a valid key or else is at EOF. For
-** the purposes of the paragraphs below, we assume that the array is actually
-** N elements in size, where N is the smallest power of 2 greater to or equal
-** to the number of iterators being merged. The extra aIter[] elements are
-** treated as if they are empty (always at EOF).
+** The aReadr[] array contains a PmaReader object for each of the PMAs being
+** merged. An aReadr[] object either points to a valid key or else is at EOF.
+** For the purposes of the paragraphs below, we assume that the array is
+** actually N elements in size, where N is the smallest power of 2 greater
+** to or equal to the number of PMAs being merged. The extra aReadr[] elements
+** are treated as if they are empty (always at EOF).
**
** The aTree[] array is also N elements in size. The value of N is stored in
-** the VdbeSorter.nTree variable.
+** the MergeEngine.nTree variable.
**
** The final (N/2) elements of aTree[] contain the results of comparing
-** pairs of iterator keys together. Element i contains the result of
-** comparing aIter[2*i-N] and aIter[2*i-N+1]. Whichever key is smaller, the
+** pairs of PMA keys together. Element i contains the result of
+** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the
** aTree element is set to the index of it.
**
** For the purposes of this comparison, EOF is considered greater than any
@@ -55,34 +208,34 @@ typedef struct FileWriter FileWriter;
** values), it doesn't matter which index is stored.
**
** The (N/4) elements of aTree[] that precede the final (N/2) described
-** above contains the index of the smallest of each block of 4 iterators.
-** And so on. So that aTree[1] contains the index of the iterator that
+** above contains the index of the smallest of each block of 4 PmaReaders
+** And so on. So that aTree[1] contains the index of the PmaReader that
** currently points to the smallest key value. aTree[0] is unused.
**
** Example:
**
-** aIter[0] -> Banana
-** aIter[1] -> Feijoa
-** aIter[2] -> Elderberry
-** aIter[3] -> Currant
-** aIter[4] -> Grapefruit
-** aIter[5] -> Apple
-** aIter[6] -> Durian
-** aIter[7] -> EOF
+** aReadr[0] -> Banana
+** aReadr[1] -> Feijoa
+** aReadr[2] -> Elderberry
+** aReadr[3] -> Currant
+** aReadr[4] -> Grapefruit
+** aReadr[5] -> Apple
+** aReadr[6] -> Durian
+** aReadr[7] -> EOF
**
** aTree[] = { X, 5 0, 5 0, 3, 5, 6 }
**
** The current element is "Apple" (the value of the key indicated by
-** iterator 5). When the Next() operation is invoked, iterator 5 will
+** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will
** be advanced to the next key in its segment. Say the next key is
** "Eggplant":
**
-** aIter[5] -> Eggplant
+** aReadr[5] -> Eggplant
**
-** The contents of aTree[] are updated first by comparing the new iterator
-** 5 key to the current key of iterator 4 (still "Grapefruit"). The iterator
+** The contents of aTree[] are updated first by comparing the new PmaReader
+** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader
** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree.
-** The value of iterator 6 - "Durian" - is now smaller than that of iterator
+** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader
** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian),
** so the value written into element 1 of the array is 0. As follows:
**
@@ -92,44 +245,150 @@ typedef struct FileWriter FileWriter;
** key comparison operations are required, where N is the number of segments
** being merged (rounded up to the next power of 2).
*/
+struct MergeEngine {
+ int nTree; /* Used size of aTree/aReadr (power of 2) */
+ int *aTree; /* Current state of incremental merge */
+ PmaReader *aReadr; /* Array of PmaReaders to merge data from */
+};
+
+/*
+** Exactly VdbeSorter.nTask instances of this object are allocated
+** as part of each VdbeSorter object. Instances are never allocated any
+** other way. VdbeSorter.nTask is set to the number of worker threads allowed
+** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
+**
+** Essentially, this structure contains all those fields of the VdbeSorter
+** structure for which each thread requires a separate instance. For example,
+** each thread requries its own UnpackedRecord object to unpack records in
+** as part of comparison operations.
+**
+** Before a background thread is launched, variable bDone is set to 0. Then,
+** right before it exits, the thread itself sets bDone to 1. This is used for
+** two purposes:
+**
+** 1. When flushing the contents of memory to a level-0 PMA on disk, to
+** attempt to select a SortSubtask for which there is not already an
+** active background thread (since doing so causes the main thread
+** to block until it finishes).
+**
+** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
+** to sqlite3ThreadJoin() is likely to block. Cases that are likely to
+** block provoke debugging output.
+**
+** In both cases, the effects of the main thread seeing (bDone==0) even
+** after the thread has finished are not dire. So we don't worry about
+** memory barriers and such here.
+*/
+struct SortSubtask {
+ SQLiteThread *pThread; /* Background thread, if any */
+ int bDone; /* Set if thread is finished but not joined */
+ VdbeSorter *pSorter; /* Sorter that owns this sub-task */
+ UnpackedRecord *pUnpacked; /* Space to unpack a record */
+ SorterList list; /* List for thread to write to a PMA */
+ int nPMA; /* Number of PMAs currently in file */
+ SorterFile file; /* Temp file for level-0 PMAs */
+ SorterFile file2; /* Space for other PMAs */
+};
+
+/*
+** Main sorter structure. A single instance of this is allocated for each
+** sorter cursor created by the VDBE.
+**
+** mxKeysize:
+** As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
+** this variable is updated so as to be set to the size on disk of the
+** largest record in the sorter.
+*/
struct VdbeSorter {
- i64 iWriteOff; /* Current write offset within file pTemp1 */
- i64 iReadOff; /* Current read offset within file pTemp1 */
- int nInMemory; /* Current size of pRecord list as PMA */
- int nTree; /* Used size of aTree/aIter (power of 2) */
- int nPMA; /* Number of PMAs stored in pTemp1 */
int mnPmaSize; /* Minimum PMA size, in bytes */
int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */
- VdbeSorterIter *aIter; /* Array of iterators to merge */
- int *aTree; /* Current state of incremental merge */
- sqlite3_file *pTemp1; /* PMA file 1 */
- SorterRecord *pRecord; /* Head of in-memory record list */
- UnpackedRecord *pUnpacked; /* Used to unpack keys */
+ int mxKeysize; /* Largest serialized key seen so far */
+ int pgsz; /* Main database page size */
+ PmaReader *pReader; /* Readr data from here after Rewind() */
+ MergeEngine *pMerger; /* Or here, if bUseThreads==0 */
+ sqlite3 *db; /* Database connection */
+ KeyInfo *pKeyInfo; /* How to compare records */
+ UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */
+ SorterList list; /* List of in-memory records */
+ int iMemory; /* Offset of free space in list.aMemory */
+ int nMemory; /* Size of list.aMemory allocation in bytes */
+ u8 bUsePMA; /* True if one or more PMAs created */
+ u8 bUseThreads; /* True to use background threads */
+ u8 iPrev; /* Previous thread used to flush PMA */
+ u8 nTask; /* Size of aTask[] array */
+ SortSubtask aTask[1]; /* One or more subtasks */
};
/*
-** The following type is an iterator for a PMA. It caches the current key in
-** variables nKey/aKey. If the iterator is at EOF, pFile==0.
+** An instance of the following object is used to read records out of a
+** PMA, in sorted order. The next key to be read is cached in nKey/aKey.
+** pFile==0 at EOF.
*/
-struct VdbeSorterIter {
+struct PmaReader {
i64 iReadOff; /* Current read offset */
- i64 iEof; /* 1 byte past EOF for this iterator */
+ i64 iEof; /* 1 byte past EOF for this PmaReader */
int nAlloc; /* Bytes of space at aAlloc */
int nKey; /* Number of bytes in key */
- sqlite3_file *pFile; /* File iterator is reading from */
+ sqlite3_file *pFile; /* File we are reading from */
u8 *aAlloc; /* Allocated space */
u8 *aKey; /* Pointer to current key */
u8 *aBuffer; /* Current read buffer */
int nBuffer; /* Size of read buffer in bytes */
+ u8 *aMap; /* Pointer to mapping of entire file */
+ IncrMerger *pIncr; /* Incremental merger */
+};
+
+/*
+** Normally, a PmaReader object iterates through an existing PMA stored
+** within a temp file. However, if the PmaReader.pIncr variable points to
+** an object of the following type, it may be used to iterate/merge through
+** multiple PMAs simultaneously.
+**
+** There are two types of IncrMerger object - single (bUseThread==0) and
+** multi-threaded (bUseThread==1).
+**
+** A multi-threaded IncrMerger object uses two temporary files - aFile[0]
+** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in
+** size. When the IncrMerger is initialized, it reads enough data from
+** pMerger to populate aFile[0]. It then sets variables within the
+** corresponding PmaReader object to read from that file and kicks off
+** a background thread to populate aFile[1] with the next mxSz bytes of
+** sorted record data from pMerger.
+**
+** When the PmaReader reaches the end of aFile[0], it blocks until the
+** background thread has finished populating aFile[1]. It then exchanges
+** the contents of the aFile[0] and aFile[1] variables within this structure,
+** sets the PmaReader fields to read from the new aFile[0] and kicks off
+** another background thread to populate the new aFile[1]. And so on, until
+** the contents of pMerger are exhausted.
+**
+** A single-threaded IncrMerger does not open any temporary files of its
+** own. Instead, it has exclusive access to mxSz bytes of space beginning
+** at offset iStartOff of file pTask->file2. And instead of using a
+** background thread to prepare data for the PmaReader, with a single
+** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with
+** keys from pMerger by the calling thread whenever the PmaReader runs out
+** of data.
+*/
+struct IncrMerger {
+ SortSubtask *pTask; /* Task that owns this merger */
+ MergeEngine *pMerger; /* Merge engine thread reads data from */
+ i64 iStartOff; /* Offset to start writing file at */
+ int mxSz; /* Maximum bytes of data to store */
+ int bEof; /* Set to true when merge is finished */
+ int bUseThread; /* True to use a bg thread for this object */
+ SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */
};
/*
-** An instance of this structure is used to organize the stream of records
-** being written to files by the merge-sort code into aligned, page-sized
-** blocks. Doing all I/O in aligned page-sized blocks helps I/O to go
-** faster on many operating systems.
+** An instance of this object is used for writing a PMA.
+**
+** The PMA is written one record at a time. Each record is of an arbitrary
+** size. But I/O is more efficient if it occurs in page-sized blocks where
+** each block is aligned on a page boundary. This object caches writes to
+** the PMA so that aligned, page-size blocks are written.
*/
-struct FileWriter {
+struct PmaWriter {
int eFWErr; /* Non-zero if in an error state */
u8 *aBuffer; /* Pointer to write buffer */
int nBuffer; /* Size of write buffer in bytes */
@@ -140,30 +399,59 @@ struct FileWriter {
};
/*
-** A structure to store a single record. All in-memory records are connected
-** together into a linked list headed at VdbeSorter.pRecord using the
-** SorterRecord.pNext pointer.
+** This object is the header on a single record while that record is being
+** held in memory and prior to being written out as part of a PMA.
+**
+** How the linked list is connected depends on how memory is being managed
+** by this module. If using a separate allocation for each in-memory record
+** (VdbeSorter.list.aMemory==0), then the list is always connected using the
+** SorterRecord.u.pNext pointers.
+**
+** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
+** then while records are being accumulated the list is linked using the
+** SorterRecord.u.iNext offset. This is because the aMemory[] array may
+** be sqlite3Realloc()ed while records are being accumulated. Once the VM
+** has finished passing records to the sorter, or when the in-memory buffer
+** is full, the list is sorted. As part of the sorting process, it is
+** converted to use the SorterRecord.u.pNext pointers. See function
+** vdbeSorterSort() for details.
*/
struct SorterRecord {
- void *pVal;
- int nVal;
- SorterRecord *pNext;
+ int nVal; /* Size of the record in bytes */
+ union {
+ SorterRecord *pNext; /* Pointer to next record in list */
+ int iNext; /* Offset within aMemory of next record */
+ } u;
+ /* The data for the record immediately follows this header */
};
-/* Minimum allowable value for the VdbeSorter.nWorking variable */
+/* Return a pointer to the buffer containing the record data for SorterRecord
+** object p. Should be used as if:
+**
+** void *SRVAL(SorterRecord *p) { return (void*)&p[1]; }
+*/
+#define SRVAL(p) ((void*)((SorterRecord*)(p) + 1))
+
+/* The minimum PMA size is set to this value multiplied by the database
+** page size in bytes. */
#define SORTER_MIN_WORKING 10
-/* Maximum number of segments to merge in a single pass. */
+/* Maximum number of PMAs that a single MergeEngine can merge */
#define SORTER_MAX_MERGE_COUNT 16
+static int vdbeIncrSwap(IncrMerger*);
+static void vdbeIncrFree(IncrMerger *);
+
/*
-** Free all memory belonging to the VdbeSorterIter object passed as the second
+** Free all memory belonging to the PmaReader object passed as the second
** argument. All structure fields are set to zero before returning.
*/
-static void vdbeSorterIterZero(sqlite3 *db, VdbeSorterIter *pIter){
- sqlite3DbFree(db, pIter->aAlloc);
- sqlite3DbFree(db, pIter->aBuffer);
- memset(pIter, 0, sizeof(VdbeSorterIter));
+static void vdbePmaReaderClear(PmaReader *pReadr){
+ sqlite3_free(pReadr->aAlloc);
+ sqlite3_free(pReadr->aBuffer);
+ if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFile, 0, pReadr->aMap);
+ vdbeIncrFree(pReadr->pIncr);
+ memset(pReadr, 0, sizeof(PmaReader));
}
/*
@@ -175,14 +463,20 @@ static void vdbeSorterIterZero(sqlite3 *db, VdbeSorterIter *pIter){
** The buffer indicated by *ppOut may only be considered valid until the
** next call to this function.
*/
-static int vdbeSorterIterRead(
- sqlite3 *db, /* Database handle (for malloc) */
- VdbeSorterIter *p, /* Iterator */
+static int vdbePmaReadBlob(
+ PmaReader *p, /* PmaReader from which to take the blob */
int nByte, /* Bytes of data to read */
u8 **ppOut /* OUT: Pointer to buffer containing data */
){
int iBuf; /* Offset within buffer to read from */
int nAvail; /* Bytes of data available in buffer */
+
+ if( p->aMap ){
+ *ppOut = &p->aMap[p->iReadOff];
+ p->iReadOff += nByte;
+ return SQLITE_OK;
+ }
+
assert( p->aBuffer );
/* If there is no more data to be read from the buffer, read the next
@@ -201,7 +495,7 @@ static int vdbeSorterIterRead(
}
assert( nRead>0 );
- /* Read data from the file. Return early if an error occurs. */
+ /* Readr data from the file. Return early if an error occurs. */
rc = sqlite3OsRead(p->pFile, p->aBuffer, nRead, p->iReadOff);
assert( rc!=SQLITE_IOERR_SHORT_READ );
if( rc!=SQLITE_OK ) return rc;
@@ -222,11 +516,13 @@ static int vdbeSorterIterRead(
/* Extend the p->aAlloc[] allocation if required. */
if( p->nAlloc<nByte ){
- int nNew = p->nAlloc*2;
+ u8 *aNew;
+ int nNew = MAX(128, p->nAlloc*2);
while( nByte>nNew ) nNew = nNew*2;
- p->aAlloc = sqlite3DbReallocOrFree(db, p->aAlloc, nNew);
- if( !p->aAlloc ) return SQLITE_NOMEM;
+ aNew = sqlite3Realloc(p->aAlloc, nNew);
+ if( !aNew ) return SQLITE_NOMEM;
p->nAlloc = nNew;
+ p->aAlloc = aNew;
}
/* Copy as much data as is available in the buffer into the start of
@@ -238,13 +534,13 @@ static int vdbeSorterIterRead(
/* The following loop copies up to p->nBuffer bytes per iteration into
** the p->aAlloc[] buffer. */
while( nRem>0 ){
- int rc; /* vdbeSorterIterRead() return code */
+ int rc; /* vdbePmaReadBlob() return code */
int nCopy; /* Number of bytes to copy */
u8 *aNext; /* Pointer to buffer to copy data from */
nCopy = nRem;
if( nRem>p->nBuffer ) nCopy = p->nBuffer;
- rc = vdbeSorterIterRead(db, p, nCopy, &aNext);
+ rc = vdbePmaReadBlob(p, nCopy, &aNext);
if( rc!=SQLITE_OK ) return rc;
assert( aNext!=p->aAlloc );
memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy);
@@ -261,108 +557,165 @@ static int vdbeSorterIterRead(
** Read a varint from the stream of data accessed by p. Set *pnOut to
** the value read.
*/
-static int vdbeSorterIterVarint(sqlite3 *db, VdbeSorterIter *p, u64 *pnOut){
+static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){
int iBuf;
- iBuf = p->iReadOff % p->nBuffer;
- if( iBuf && (p->nBuffer-iBuf)>=9 ){
- p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut);
+ if( p->aMap ){
+ p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut);
}else{
- u8 aVarint[16], *a;
- int i = 0, rc;
- do{
- rc = vdbeSorterIterRead(db, p, 1, &a);
- if( rc ) return rc;
- aVarint[(i++)&0xf] = a[0];
- }while( (a[0]&0x80)!=0 );
- sqlite3GetVarint(aVarint, pnOut);
+ iBuf = p->iReadOff % p->nBuffer;
+ if( iBuf && (p->nBuffer-iBuf)>=9 ){
+ p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut);
+ }else{
+ u8 aVarint[16], *a;
+ int i = 0, rc;
+ do{
+ rc = vdbePmaReadBlob(p, 1, &a);
+ if( rc ) return rc;
+ aVarint[(i++)&0xf] = a[0];
+ }while( (a[0]&0x80)!=0 );
+ sqlite3GetVarint(aVarint, pnOut);
+ }
}
return SQLITE_OK;
}
+/*
+** Attempt to memory map file pFile. If successful, set *pp to point to the
+** new mapping and return SQLITE_OK. If the mapping is not attempted
+** (because the file is too large or the VFS layer is configured not to use
+** mmap), return SQLITE_OK and set *pp to NULL.
+**
+** Or, if an error occurs, return an SQLite error code. The final value of
+** *pp is undefined in this case.
+*/
+static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
+ int rc = SQLITE_OK;
+ if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
+ rc = sqlite3OsFetch(pFile->pFd, 0, (int)pFile->iEof, (void**)pp);
+ }
+ return rc;
+}
/*
-** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if
-** no error occurs, or an SQLite error code if one does.
+** Seek PmaReader pReadr to offset iOff within file pFile. Return SQLITE_OK
+** if successful, or an SQLite error code if an error occurs.
*/
-static int vdbeSorterIterNext(
- sqlite3 *db, /* Database handle (for sqlite3DbMalloc() ) */
- VdbeSorterIter *pIter /* Iterator to advance */
+static int vdbePmaReaderSeek(
+ SortSubtask *pTask, /* Task context */
+ PmaReader *pReadr, /* Iterate to populate */
+ SorterFile *pFile, /* Sorter file to read from */
+ i64 iOff /* Offset in pFile */
){
- int rc; /* Return Code */
+ int rc = SQLITE_OK;
+
+ assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 );
+
+ if( pReadr->aMap ){
+ sqlite3OsUnfetch(pReadr->pFile, 0, pReadr->aMap);
+ pReadr->aMap = 0;
+ }
+ pReadr->iReadOff = iOff;
+ pReadr->iEof = pFile->iEof;
+ pReadr->pFile = pFile->pFd;
+
+ rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap);
+ if( rc==SQLITE_OK && pReadr->aMap==0 ){
+ int pgsz = pTask->pSorter->pgsz;
+ int iBuf = pReadr->iReadOff % pgsz;
+ if( pReadr->aBuffer==0 ){
+ pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz);
+ if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM;
+ pReadr->nBuffer = pgsz;
+ }
+ if( rc==SQLITE_OK && iBuf ){
+ int nRead = pgsz - iBuf;
+ if( (pReadr->iReadOff + nRead) > pReadr->iEof ){
+ nRead = (int)(pReadr->iEof - pReadr->iReadOff);
+ }
+ rc = sqlite3OsRead(
+ pReadr->pFile, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff
+ );
+ }
+ }
+
+ return rc;
+}
+
+/*
+** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if
+** no error occurs, or an SQLite error code if one does.
+*/
+static int vdbePmaReaderNext(PmaReader *pReadr){
+ int rc = SQLITE_OK; /* Return Code */
u64 nRec = 0; /* Size of record in bytes */
- if( pIter->iReadOff>=pIter->iEof ){
- /* This is an EOF condition */
- vdbeSorterIterZero(db, pIter);
- return SQLITE_OK;
+
+ if( pReadr->iReadOff>=pReadr->iEof ){
+ IncrMerger *pIncr = pReadr->pIncr;
+ int bEof = 1;
+ if( pIncr ){
+ rc = vdbeIncrSwap(pIncr);
+ if( rc==SQLITE_OK && pIncr->bEof==0 ){
+ rc = vdbePmaReaderSeek(
+ pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff
+ );
+ bEof = 0;
+ }
+ }
+
+ if( bEof ){
+ /* This is an EOF condition */
+ vdbePmaReaderClear(pReadr);
+ return rc;
+ }
}
- rc = vdbeSorterIterVarint(db, pIter, &nRec);
if( rc==SQLITE_OK ){
- pIter->nKey = (int)nRec;
- rc = vdbeSorterIterRead(db, pIter, (int)nRec, &pIter->aKey);
+ rc = vdbePmaReadVarint(pReadr, &nRec);
+ }
+ if( rc==SQLITE_OK ){
+ pReadr->nKey = (int)nRec;
+ rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey);
}
return rc;
}
/*
-** Initialize iterator pIter to scan through the PMA stored in file pFile
+** Initialize PmaReader pReadr to scan through the PMA stored in file pFile
** starting at offset iStart and ending at offset iEof-1. This function
-** leaves the iterator pointing to the first key in the PMA (or EOF if the
+** leaves the PmaReader pointing to the first key in the PMA (or EOF if the
** PMA is empty).
+**
+** If the pnByte parameter is NULL, then it is assumed that the file
+** contains a single PMA, and that that PMA omits the initial length varint.
*/
-static int vdbeSorterIterInit(
- sqlite3 *db, /* Database handle */
- const VdbeSorter *pSorter, /* Sorter object */
+static int vdbePmaReaderInit(
+ SortSubtask *pTask, /* Task context */
+ SorterFile *pFile, /* Sorter file to read from */
i64 iStart, /* Start offset in pFile */
- VdbeSorterIter *pIter, /* Iterator to populate */
+ PmaReader *pReadr, /* PmaReader to populate */
i64 *pnByte /* IN/OUT: Increment this value by PMA size */
){
- int rc = SQLITE_OK;
- int nBuf;
-
- nBuf = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
-
- assert( pSorter->iWriteOff>iStart );
- assert( pIter->aAlloc==0 );
- assert( pIter->aBuffer==0 );
- pIter->pFile = pSorter->pTemp1;
- pIter->iReadOff = iStart;
- pIter->nAlloc = 128;
- pIter->aAlloc = (u8 *)sqlite3DbMallocRaw(db, pIter->nAlloc);
- pIter->nBuffer = nBuf;
- pIter->aBuffer = (u8 *)sqlite3DbMallocRaw(db, nBuf);
-
- if( !pIter->aBuffer ){
- rc = SQLITE_NOMEM;
- }else{
- int iBuf;
+ int rc;
- iBuf = iStart % nBuf;
- if( iBuf ){
- int nRead = nBuf - iBuf;
- if( (iStart + nRead) > pSorter->iWriteOff ){
- nRead = (int)(pSorter->iWriteOff - iStart);
- }
- rc = sqlite3OsRead(
- pSorter->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
- );
- }
+ assert( pFile->iEof>iStart );
+ assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 );
+ assert( pReadr->aBuffer==0 );
+ assert( pReadr->aMap==0 );
- if( rc==SQLITE_OK ){
- u64 nByte; /* Size of PMA in bytes */
- pIter->iEof = pSorter->iWriteOff;
- rc = vdbeSorterIterVarint(db, pIter, &nByte);
- pIter->iEof = pIter->iReadOff + nByte;
- *pnByte += nByte;
- }
+ rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart);
+ if( rc==SQLITE_OK ){
+ u64 nByte; /* Size of PMA in bytes */
+ rc = vdbePmaReadVarint(pReadr, &nByte);
+ pReadr->iEof = pReadr->iReadOff + nByte;
+ *pnByte += nByte;
}
if( rc==SQLITE_OK ){
- rc = vdbeSorterIterNext(db, pIter);
+ rc = vdbePmaReaderNext(pReadr);
}
return rc;
}
@@ -370,75 +723,57 @@ static int vdbeSorterIterInit(
/*
** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2,
-** size nKey2 bytes). Argument pKeyInfo supplies the collation functions
-** used by the comparison. If an error occurs, return an SQLite error code.
-** Otherwise, return SQLITE_OK and set *pRes to a negative, zero or positive
-** value, depending on whether key1 is smaller, equal to or larger than key2.
-**
-** If the bOmitRowid argument is non-zero, assume both keys end in a rowid
-** field. For the purposes of the comparison, ignore it. Also, if bOmitRowid
-** is true and key1 contains even a single NULL value, it is considered to
-** be less than key2. Even if key2 also contains NULL values.
-**
-** If pKey2 is passed a NULL pointer, then it is assumed that the pCsr->aSpace
-** has been allocated and contains an unpacked record that is used as key2.
-*/
-static void vdbeSorterCompare(
- const VdbeCursor *pCsr, /* Cursor object (for pKeyInfo) */
- int nIgnore, /* Ignore the last nIgnore fields */
+** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences
+** used by the comparison. Return the result of the comparison.
+**
+** Before returning, object (pTask->pUnpacked) is populated with the
+** unpacked version of key2. Or, if pKey2 is passed a NULL pointer, then it
+** is assumed that the (pTask->pUnpacked) structure already contains the
+** unpacked key to use as key2.
+**
+** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set
+** to SQLITE_NOMEM.
+*/
+static int vdbeSorterCompare(
+ SortSubtask *pTask, /* Subtask context (for pKeyInfo) */
const void *pKey1, int nKey1, /* Left side of comparison */
- const void *pKey2, int nKey2, /* Right side of comparison */
- int *pRes /* OUT: Result of comparison */
+ const void *pKey2, int nKey2 /* Right side of comparison */
){
- KeyInfo *pKeyInfo = pCsr->pKeyInfo;
- VdbeSorter *pSorter = pCsr->pSorter;
- UnpackedRecord *r2 = pSorter->pUnpacked;
- int i;
-
+ UnpackedRecord *r2 = pTask->pUnpacked;
if( pKey2 ){
- sqlite3VdbeRecordUnpack(pKeyInfo, nKey2, pKey2, r2);
+ sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
}
-
- if( nIgnore ){
- r2->nField = pKeyInfo->nField - nIgnore;
- assert( r2->nField>0 );
- for(i=0; i<r2->nField; i++){
- if( r2->aMem[i].flags & MEM_Null ){
- *pRes = -1;
- return;
- }
- }
- assert( r2->default_rc==0 );
- }
-
- *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0);
+ return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0);
}
/*
-** This function is called to compare two iterator keys when merging
+** This function is called to compare two PmaReader keys when merging
** multiple b-tree segments. Parameter iOut is the index of the aTree[]
** value to recalculate.
*/
-static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){
- VdbeSorter *pSorter = pCsr->pSorter;
+static int vdbeSorterDoCompare(
+ SortSubtask *pTask,
+ MergeEngine *pMerger,
+ int iOut
+){
int i1;
int i2;
int iRes;
- VdbeSorterIter *p1;
- VdbeSorterIter *p2;
+ PmaReader *p1;
+ PmaReader *p2;
- assert( iOut<pSorter->nTree && iOut>0 );
+ assert( iOut<pMerger->nTree && iOut>0 );
- if( iOut>=(pSorter->nTree/2) ){
- i1 = (iOut - pSorter->nTree/2) * 2;
+ if( iOut>=(pMerger->nTree/2) ){
+ i1 = (iOut - pMerger->nTree/2) * 2;
i2 = i1 + 1;
}else{
- i1 = pSorter->aTree[iOut*2];
- i2 = pSorter->aTree[iOut*2+1];
+ i1 = pMerger->aTree[iOut*2];
+ i2 = pMerger->aTree[iOut*2+1];
}
- p1 = &pSorter->aIter[i1];
- p2 = &pSorter->aIter[i2];
+ p1 = &pMerger->aReadr[i1];
+ p2 = &pMerger->aReadr[i2];
if( p1->pFile==0 ){
iRes = i2;
@@ -446,9 +781,9 @@ static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){
iRes = i1;
}else{
int res;
- assert( pCsr->pSorter->pUnpacked!=0 ); /* allocated in vdbeSorterMerge() */
- vdbeSorterCompare(
- pCsr, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res
+ assert( pTask->pUnpacked!=0 ); /* allocated in vdbeSortSubtaskMain() */
+ res = vdbeSorterCompare(
+ pTask, p1->aKey, p1->nKey, p2->aKey, p2->nKey
);
if( res<=0 ){
iRes = i1;
@@ -457,39 +792,94 @@ static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){
}
}
- pSorter->aTree[iOut] = iRes;
+ pMerger->aTree[iOut] = iRes;
return SQLITE_OK;
}
/*
** Initialize the temporary index cursor just opened as a sorter cursor.
+**
+** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nField)
+** to determine the number of fields that should be compared from the
+** records being sorted. However, if the value passed as argument nField
+** is non-zero and the sorter is able to guarantee a stable sort, nField
+** is used instead. This is used when sorting records for a CREATE INDEX
+** statement. In this case, keys are always delivered to the sorter in
+** order of the primary key, which happens to be make up the final part
+** of the records being sorted. So if the sort is stable, there is never
+** any reason to compare PK fields and they can be ignored for a small
+** performance boost.
+**
+** The sorter can guarantee a stable sort when running in single-threaded
+** mode, but not in multi-threaded mode.
+**
+** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
*/
-int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){
+int sqlite3VdbeSorterInit(
+ sqlite3 *db, /* Database connection (for malloc()) */
+ int nField, /* Number of key fields in each record */
+ VdbeCursor *pCsr /* Cursor that holds the new sorter */
+){
int pgsz; /* Page size of main database */
+ int i; /* Used to iterate through aTask[] */
int mxCache; /* Cache size */
VdbeSorter *pSorter; /* The new sorter */
- char *d; /* Dummy */
+ KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */
+ int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */
+ int sz; /* Size of pSorter in bytes */
+ int rc = SQLITE_OK;
+#if SQLITE_MAX_WORKER_THREADS==0
+# define nWorker 0
+#else
+ int nWorker = (sqlite3GlobalConfig.bCoreMutex?sqlite3GlobalConfig.nWorker:0);
+#endif
assert( pCsr->pKeyInfo && pCsr->pBt==0 );
- pCsr->pSorter = pSorter = sqlite3DbMallocZero(db, sizeof(VdbeSorter));
+ szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);
+ sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);
+
+ pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
+ pCsr->pSorter = pSorter;
if( pSorter==0 ){
- return SQLITE_NOMEM;
- }
-
- pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pCsr->pKeyInfo, 0, 0, &d);
- if( pSorter->pUnpacked==0 ) return SQLITE_NOMEM;
- assert( pSorter->pUnpacked==(UnpackedRecord *)d );
+ rc = SQLITE_NOMEM;
+ }else{
+ pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
+ memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
+ pKeyInfo->db = 0;
+ if( nField && nWorker==0 ) pKeyInfo->nField = nField;
+ pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
+ pSorter->nTask = nWorker + 1;
+ pSorter->bUseThreads = (pSorter->nTask>1);
+ pSorter->db = db;
+ for(i=0; i<pSorter->nTask; i++){
+ SortSubtask *pTask = &pSorter->aTask[i];
+ pTask->pSorter = pSorter;
+ }
- if( !sqlite3TempInMemory(db) ){
- pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
- pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
- mxCache = db->aDb[0].pSchema->cache_size;
- if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
- pSorter->mxPmaSize = mxCache * pgsz;
+ if( !sqlite3TempInMemory(db) ){
+ pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
+ mxCache = db->aDb[0].pSchema->cache_size;
+ if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
+ pSorter->mxPmaSize = mxCache * pgsz;
+
+ /* If the application has not configure scratch memory using
+ ** SQLITE_CONFIG_SCRATCH then we assume it is OK to do large memory
+ ** allocations. If scratch memory has been configured, then assume
+ ** large memory allocations should be avoided to prevent heap
+ ** fragmentation.
+ */
+ if( sqlite3GlobalConfig.pScratch==0 ){
+ assert( pSorter->iMemory==0 );
+ pSorter->nMemory = pgsz;
+ pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
+ if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM;
+ }
+ }
}
- return SQLITE_OK;
+ return rc;
}
+#undef nWorker /* Defined at the top of this function */
/*
** Free the list of sorted records starting at pRecord.
@@ -498,37 +888,227 @@ static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){
SorterRecord *p;
SorterRecord *pNext;
for(p=pRecord; p; p=pNext){
- pNext = p->pNext;
+ pNext = p->u.pNext;
sqlite3DbFree(db, p);
}
}
/*
-** Reset a sorting cursor back to its original empty state.
+** Free all resources owned by the object indicated by argument pTask. All
+** fields of *pTask are zeroed before returning.
*/
-void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
- if( pSorter->aIter ){
- int i;
- for(i=0; i<pSorter->nTree; i++){
- vdbeSorterIterZero(db, &pSorter->aIter[i]);
+static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
+ sqlite3DbFree(db, pTask->pUnpacked);
+ pTask->pUnpacked = 0;
+#if SQLITE_MAX_WORKER_THREADS>0
+ /* pTask->list.aMemory can only be non-zero if it was handed memory
+ ** from the main thread. That only occurs SQLITE_MAX_WORKER_THREADS>0 */
+ if( pTask->list.aMemory ){
+ sqlite3_free(pTask->list.aMemory);
+ pTask->list.aMemory = 0;
+ }else
+#endif
+ {
+ assert( pTask->list.aMemory==0 );
+ vdbeSorterRecordFree(0, pTask->list.pList);
+ }
+ pTask->list.pList = 0;
+ if( pTask->file.pFd ){
+ sqlite3OsCloseFree(pTask->file.pFd);
+ pTask->file.pFd = 0;
+ pTask->file.iEof = 0;
+ }
+ if( pTask->file2.pFd ){
+ sqlite3OsCloseFree(pTask->file2.pFd);
+ pTask->file2.pFd = 0;
+ pTask->file2.iEof = 0;
+ }
+}
+
+#ifdef SQLITE_DEBUG_SORTER_THREADS
+static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
+ i64 t;
+ int iTask = (pTask - pTask->pSorter->aTask);
+ sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
+ fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
+}
+static void vdbeSorterRewindDebug(const char *zEvent){
+ i64 t;
+ sqlite3OsCurrentTimeInt64(sqlite3_vfs_find(0), &t);
+ fprintf(stderr, "%lld:X %s\n", t, zEvent);
+}
+static void vdbeSorterPopulateDebug(
+ SortSubtask *pTask,
+ const char *zEvent
+){
+ i64 t;
+ int iTask = (pTask - pTask->pSorter->aTask);
+ sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
+ fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
+}
+static void vdbeSorterBlockDebug(
+ SortSubtask *pTask,
+ int bBlocked,
+ const char *zEvent
+){
+ if( bBlocked ){
+ i64 t;
+ sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
+ fprintf(stderr, "%lld:main %s\n", t, zEvent);
+ }
+}
+#else
+# define vdbeSorterWorkDebug(x,y)
+# define vdbeSorterRewindDebug(y)
+# define vdbeSorterPopulateDebug(x,y)
+# define vdbeSorterBlockDebug(x,y,z)
+#endif
+
+#if SQLITE_MAX_WORKER_THREADS>0
+/*
+** Join thread pTask->thread.
+*/
+static int vdbeSorterJoinThread(SortSubtask *pTask){
+ int rc = SQLITE_OK;
+ if( pTask->pThread ){
+#ifdef SQLITE_DEBUG_SORTER_THREADS
+ int bDone = pTask->bDone;
+#endif
+ void *pRet;
+ vdbeSorterBlockDebug(pTask, !bDone, "enter");
+ rc = sqlite3ThreadJoin(pTask->pThread, &pRet);
+ vdbeSorterBlockDebug(pTask, !bDone, "exit");
+ if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
+ assert( pTask->bDone==1 );
+ pTask->bDone = 0;
+ pTask->pThread = 0;
+ }
+ return rc;
+}
+
+/*
+** Launch a background thread to run xTask(pIn).
+*/
+static int vdbeSorterCreateThread(
+ SortSubtask *pTask, /* Thread will use this task object */
+ void *(*xTask)(void*), /* Routine to run in a separate thread */
+ void *pIn /* Argument passed into xTask() */
+){
+ assert( pTask->pThread==0 && pTask->bDone==0 );
+ return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn);
+}
+
+/*
+** Join all outstanding threads launched by SorterWrite() to create
+** level-0 PMAs.
+*/
+static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
+ int rc = rcin;
+ int i;
+
+ /* This function is always called by the main user thread.
+ **
+ ** If this function is being called after SorterRewind() has been called,
+ ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread
+ ** is currently attempt to join one of the other threads. To avoid a race
+ ** condition where this thread also attempts to join the same object, join
+ ** thread pSorter->aTask[pSorter->nTask-1].pThread first. */
+ for(i=pSorter->nTask-1; i>=0; i--){
+ SortSubtask *pTask = &pSorter->aTask[i];
+ int rc2 = vdbeSorterJoinThread(pTask);
+ if( rc==SQLITE_OK ) rc = rc2;
+ }
+ return rc;
+}
+#else
+# define vdbeSorterJoinAll(x,rcin) (rcin)
+# define vdbeSorterJoinThread(pTask) SQLITE_OK
+#endif
+
+/*
+** Allocate a new MergeEngine object with space for nReader PmaReaders.
+*/
+static MergeEngine *vdbeMergeEngineNew(int nReader){
+ int N = 2; /* Smallest power of two >= nReader */
+ int nByte; /* Total bytes of space to allocate */
+ MergeEngine *pNew; /* Pointer to allocated object to return */
+
+ assert( nReader<=SORTER_MAX_MERGE_COUNT );
+
+ while( N<nReader ) N += N;
+ nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
+
+ pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte);
+ if( pNew ){
+ pNew->nTree = N;
+ pNew->aReadr = (PmaReader*)&pNew[1];
+ pNew->aTree = (int*)&pNew->aReadr[N];
+ }
+ return pNew;
+}
+
+/*
+** Free the MergeEngine object passed as the only argument.
+*/
+static void vdbeMergeEngineFree(MergeEngine *pMerger){
+ int i;
+ if( pMerger ){
+ for(i=0; i<pMerger->nTree; i++){
+ vdbePmaReaderClear(&pMerger->aReadr[i]);
}
- sqlite3DbFree(db, pSorter->aIter);
- pSorter->aIter = 0;
- }
- if( pSorter->pTemp1 ){
- sqlite3OsCloseFree(pSorter->pTemp1);
- pSorter->pTemp1 = 0;
- }
- vdbeSorterRecordFree(db, pSorter->pRecord);
- pSorter->pRecord = 0;
- pSorter->iWriteOff = 0;
- pSorter->iReadOff = 0;
- pSorter->nInMemory = 0;
- pSorter->nTree = 0;
- pSorter->nPMA = 0;
- pSorter->aTree = 0;
+ }
+ sqlite3_free(pMerger);
}
+/*
+** Free all resources associated with the IncrMerger object indicated by
+** the first argument.
+*/
+static void vdbeIncrFree(IncrMerger *pIncr){
+ if( pIncr ){
+#if SQLITE_MAX_WORKER_THREADS>0
+ if( pIncr->bUseThread ){
+ vdbeSorterJoinThread(pIncr->pTask);
+ if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
+ if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
+ }
+#endif
+ vdbeMergeEngineFree(pIncr->pMerger);
+ sqlite3_free(pIncr);
+ }
+}
+
+/*
+** Reset a sorting cursor back to its original empty state.
+*/
+void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
+ int i;
+ (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
+ assert( pSorter->bUseThreads || pSorter->pReader==0 );
+#if SQLITE_MAX_WORKER_THREADS>0
+ if( pSorter->pReader ){
+ vdbePmaReaderClear(pSorter->pReader);
+ sqlite3DbFree(db, pSorter->pReader);
+ pSorter->pReader = 0;
+ }
+#endif
+ vdbeMergeEngineFree(pSorter->pMerger);
+ pSorter->pMerger = 0;
+ for(i=0; i<pSorter->nTask; i++){
+ SortSubtask *pTask = &pSorter->aTask[i];
+ vdbeSortSubtaskCleanup(db, pTask);
+ }
+ if( pSorter->list.aMemory==0 ){
+ vdbeSorterRecordFree(0, pSorter->list.pList);
+ }
+ pSorter->list.pList = 0;
+ pSorter->list.szPMA = 0;
+ pSorter->bUsePMA = 0;
+ pSorter->iMemory = 0;
+ pSorter->mxKeysize = 0;
+ sqlite3DbFree(db, pSorter->pUnpacked);
+ pSorter->pUnpacked = 0;
+}
/*
** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
@@ -537,54 +1117,110 @@ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
VdbeSorter *pSorter = pCsr->pSorter;
if( pSorter ){
sqlite3VdbeSorterReset(db, pSorter);
- sqlite3DbFree(db, pSorter->pUnpacked);
+ sqlite3_free(pSorter->list.aMemory);
sqlite3DbFree(db, pSorter);
pCsr->pSorter = 0;
}
}
+#if SQLITE_MAX_MMAP_SIZE>0
+/*
+** The first argument is a file-handle open on a temporary file. The file
+** is guaranteed to be nByte bytes or smaller in size. This function
+** attempts to extend the file to nByte bytes in size and to ensure that
+** the VFS has memory mapped it.
+**
+** Whether or not the file does end up memory mapped of course depends on
+** the specific VFS implementation.
+*/
+static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFile, i64 nByte){
+ if( nByte<=(i64)(db->nMaxSorterMmap) ){
+ int rc = sqlite3OsTruncate(pFile, nByte);
+ if( rc==SQLITE_OK ){
+ void *p = 0;
+ sqlite3OsFetch(pFile, 0, (int)nByte, &p);
+ sqlite3OsUnfetch(pFile, 0, p);
+ }
+ }
+}
+#else
+# define vdbeSorterExtendFile(x,y,z) SQLITE_OK
+#endif
+
/*
** Allocate space for a file-handle and open a temporary file. If successful,
** set *ppFile to point to the malloc'd file-handle and return SQLITE_OK.
** Otherwise, set *ppFile to 0 and return an SQLite error code.
*/
-static int vdbeSorterOpenTempFile(sqlite3 *db, sqlite3_file **ppFile){
- int dummy;
- return sqlite3OsOpenMalloc(db->pVfs, 0, ppFile,
+static int vdbeSorterOpenTempFile(
+ sqlite3 *db, /* Database handle doing sort */
+ i64 nExtend, /* Attempt to extend file to this size */
+ sqlite3_file **ppFile
+){
+ int rc;
+ rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFile,
SQLITE_OPEN_TEMP_JOURNAL |
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE |
- SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &dummy
+ SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &rc
);
+ if( rc==SQLITE_OK ){
+ i64 max = SQLITE_MAX_MMAP_SIZE;
+ sqlite3OsFileControlHint(*ppFile, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
+ if( nExtend>0 ){
+ vdbeSorterExtendFile(db, *ppFile, nExtend);
+ }
+ }
+ return rc;
}
/*
+** If it has not already been allocated, allocate the UnpackedRecord
+** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or
+** if no allocation was required), or SQLITE_NOMEM otherwise.
+*/
+static int vdbeSortAllocUnpacked(SortSubtask *pTask){
+ if( pTask->pUnpacked==0 ){
+ char *pFree;
+ pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
+ pTask->pSorter->pKeyInfo, 0, 0, &pFree
+ );
+ assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
+ if( pFree==0 ) return SQLITE_NOMEM;
+ pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField;
+ pTask->pUnpacked->errCode = 0;
+ }
+ return SQLITE_OK;
+}
+
+
+/*
** Merge the two sorted lists p1 and p2 into a single list.
** Set *ppOut to the head of the new list.
*/
static void vdbeSorterMerge(
- const VdbeCursor *pCsr, /* For pKeyInfo */
+ SortSubtask *pTask, /* Calling thread context */
SorterRecord *p1, /* First list to merge */
SorterRecord *p2, /* Second list to merge */
SorterRecord **ppOut /* OUT: Head of merged list */
){
SorterRecord *pFinal = 0;
SorterRecord **pp = &pFinal;
- void *pVal2 = p2 ? p2->pVal : 0;
+ void *pVal2 = p2 ? SRVAL(p2) : 0;
while( p1 && p2 ){
int res;
- vdbeSorterCompare(pCsr, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res);
+ res = vdbeSorterCompare(pTask, SRVAL(p1), p1->nVal, pVal2, p2->nVal);
if( res<=0 ){
*pp = p1;
- pp = &p1->pNext;
- p1 = p1->pNext;
+ pp = &p1->u.pNext;
+ p1 = p1->u.pNext;
pVal2 = 0;
}else{
*pp = p2;
- pp = &p2->pNext;
- p2 = p2->pNext;
+ pp = &p2->u.pNext;
+ p2 = p2->u.pNext;
if( p2==0 ) break;
- pVal2 = p2->pVal;
+ pVal2 = SRVAL(p2);
}
}
*pp = p1 ? p1 : p2;
@@ -592,27 +1228,41 @@ static void vdbeSorterMerge(
}
/*
-** Sort the linked list of records headed at pCsr->pRecord. Return SQLITE_OK
-** if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if an error
-** occurs.
+** Sort the linked list of records headed at pTask->pList. Return
+** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if
+** an error occurs.
*/
-static int vdbeSorterSort(const VdbeCursor *pCsr){
+static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
int i;
SorterRecord **aSlot;
SorterRecord *p;
- VdbeSorter *pSorter = pCsr->pSorter;
+ int rc;
+
+ rc = vdbeSortAllocUnpacked(pTask);
+ if( rc!=SQLITE_OK ) return rc;
aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
if( !aSlot ){
return SQLITE_NOMEM;
}
- p = pSorter->pRecord;
+ p = pList->pList;
while( p ){
- SorterRecord *pNext = p->pNext;
- p->pNext = 0;
+ SorterRecord *pNext;
+ if( pList->aMemory ){
+ if( (u8*)p==pList->aMemory ){
+ pNext = 0;
+ }else{
+ assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
+ pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
+ }
+ }else{
+ pNext = p->u.pNext;
+ }
+
+ p->u.pNext = 0;
for(i=0; aSlot[i]; i++){
- vdbeSorterMerge(pCsr, p, aSlot[i], &p);
+ vdbeSorterMerge(pTask, p, aSlot[i], &p);
aSlot[i] = 0;
}
aSlot[i] = p;
@@ -621,27 +1271,28 @@ static int vdbeSorterSort(const VdbeCursor *pCsr){
p = 0;
for(i=0; i<64; i++){
- vdbeSorterMerge(pCsr, p, aSlot[i], &p);
+ vdbeSorterMerge(pTask, p, aSlot[i], &p);
}
- pSorter->pRecord = p;
+ pList->pList = p;
sqlite3_free(aSlot);
- return SQLITE_OK;
+ assert( pTask->pUnpacked->errCode==SQLITE_OK
+ || pTask->pUnpacked->errCode==SQLITE_NOMEM
+ );
+ return pTask->pUnpacked->errCode;
}
/*
-** Initialize a file-writer object.
+** Initialize a PMA-writer object.
*/
-static void fileWriterInit(
- sqlite3 *db, /* Database (for malloc) */
+static void vdbePmaWriterInit(
sqlite3_file *pFile, /* File to write to */
- FileWriter *p, /* Object to populate */
+ PmaWriter *p, /* Object to populate */
+ int nBuf, /* Buffer size */
i64 iStart /* Offset of pFile to begin writing at */
){
- int nBuf = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
-
- memset(p, 0, sizeof(FileWriter));
- p->aBuffer = (u8 *)sqlite3DbMallocRaw(db, nBuf);
+ memset(p, 0, sizeof(PmaWriter));
+ p->aBuffer = (u8*)sqlite3Malloc(nBuf);
if( !p->aBuffer ){
p->eFWErr = SQLITE_NOMEM;
}else{
@@ -653,10 +1304,10 @@ static void fileWriterInit(
}
/*
-** Write nData bytes of data to the file-write object. Return SQLITE_OK
+** Write nData bytes of data to the PMA. Return SQLITE_OK
** if successful, or an SQLite error code if an error occurs.
*/
-static void fileWriterWrite(FileWriter *p, u8 *pData, int nData){
+static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){
int nRem = nData;
while( nRem>0 && p->eFWErr==0 ){
int nCopy = nRem;
@@ -681,15 +1332,15 @@ static void fileWriterWrite(FileWriter *p, u8 *pData, int nData){
}
/*
-** Flush any buffered data to disk and clean up the file-writer object.
-** The results of using the file-writer after this call are undefined.
+** Flush any buffered data to disk and clean up the PMA-writer object.
+** The results of using the PMA-writer after this call are undefined.
** Return SQLITE_OK if flushing the buffered data succeeds or is not
** required. Otherwise, return an SQLite error code.
**
** Before returning, set *piEof to the offset immediately following the
** last byte written to the file.
*/
-static int fileWriterFinish(sqlite3 *db, FileWriter *p, i64 *piEof){
+static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){
int rc;
if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){
p->eFWErr = sqlite3OsWrite(p->pFile,
@@ -698,26 +1349,27 @@ static int fileWriterFinish(sqlite3 *db, FileWriter *p, i64 *piEof){
);
}
*piEof = (p->iWriteOff + p->iBufEnd);
- sqlite3DbFree(db, p->aBuffer);
+ sqlite3_free(p->aBuffer);
rc = p->eFWErr;
- memset(p, 0, sizeof(FileWriter));
+ memset(p, 0, sizeof(PmaWriter));
return rc;
}
/*
-** Write value iVal encoded as a varint to the file-write object. Return
+** Write value iVal encoded as a varint to the PMA. Return
** SQLITE_OK if successful, or an SQLite error code if an error occurs.
*/
-static void fileWriterWriteVarint(FileWriter *p, u64 iVal){
+static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){
int nByte;
u8 aByte[10];
nByte = sqlite3PutVarint(aByte, iVal);
- fileWriterWrite(p, aByte, nByte);
+ vdbePmaWriteBlob(p, aByte, nByte);
}
/*
-** Write the current contents of the in-memory linked-list to a PMA. Return
-** SQLITE_OK if successful, or an SQLite error code otherwise.
+** Write the current contents of in-memory linked-list pList to a level-0
+** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if
+** successful, or an SQLite error code otherwise.
**
** The format of a PMA is:
**
@@ -728,76 +1380,246 @@ static void fileWriterWriteVarint(FileWriter *p, u64 iVal){
** Each record consists of a varint followed by a blob of data (the
** key). The varint is the number of bytes in the blob of data.
*/
-static int vdbeSorterListToPMA(sqlite3 *db, const VdbeCursor *pCsr){
+static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
+ sqlite3 *db = pTask->pSorter->db;
int rc = SQLITE_OK; /* Return code */
- VdbeSorter *pSorter = pCsr->pSorter;
- FileWriter writer;
+ PmaWriter writer; /* Object used to write to the file */
- memset(&writer, 0, sizeof(FileWriter));
+#ifdef SQLITE_DEBUG
+ /* Set iSz to the expected size of file pTask->file after writing the PMA.
+ ** This is used by an assert() statement at the end of this function. */
+ i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
+#endif
- if( pSorter->nInMemory==0 ){
- assert( pSorter->pRecord==0 );
- return rc;
+ vdbeSorterWorkDebug(pTask, "enter");
+ memset(&writer, 0, sizeof(PmaWriter));
+ assert( pList->szPMA>0 );
+
+ /* If the first temporary PMA file has not been opened, open it now. */
+ if( pTask->file.pFd==0 ){
+ rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd);
+ assert( rc!=SQLITE_OK || pTask->file.pFd );
+ assert( pTask->file.iEof==0 );
+ assert( pTask->nPMA==0 );
}
- rc = vdbeSorterSort(pCsr);
+ /* Try to get the file to memory map */
+ if( rc==SQLITE_OK ){
+ vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
+ }
- /* If the first temporary PMA file has not been opened, open it now. */
- if( rc==SQLITE_OK && pSorter->pTemp1==0 ){
- rc = vdbeSorterOpenTempFile(db, &pSorter->pTemp1);
- assert( rc!=SQLITE_OK || pSorter->pTemp1 );
- assert( pSorter->iWriteOff==0 );
- assert( pSorter->nPMA==0 );
+ /* Sort the list */
+ if( rc==SQLITE_OK ){
+ rc = vdbeSorterSort(pTask, pList);
}
if( rc==SQLITE_OK ){
SorterRecord *p;
SorterRecord *pNext = 0;
- fileWriterInit(db, pSorter->pTemp1, &writer, pSorter->iWriteOff);
- pSorter->nPMA++;
- fileWriterWriteVarint(&writer, pSorter->nInMemory);
- for(p=pSorter->pRecord; p; p=pNext){
- pNext = p->pNext;
- fileWriterWriteVarint(&writer, p->nVal);
- fileWriterWrite(&writer, p->pVal, p->nVal);
- sqlite3DbFree(db, p);
+ vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
+ pTask->file.iEof);
+ pTask->nPMA++;
+ vdbePmaWriteVarint(&writer, pList->szPMA);
+ for(p=pList->pList; p; p=pNext){
+ pNext = p->u.pNext;
+ vdbePmaWriteVarint(&writer, p->nVal);
+ vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
+ if( pList->aMemory==0 ) sqlite3_free(p);
+ }
+ pList->pList = p;
+ rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
+ }
+
+ vdbeSorterWorkDebug(pTask, "exit");
+ assert( rc!=SQLITE_OK || pList->pList==0 );
+ assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
+ return rc;
+}
+
+/*
+** Advance the MergeEngine PmaReader passed as the second argument to
+** the next entry. Set *pbEof to true if this means the PmaReader has
+** reached EOF.
+**
+** Return SQLITE_OK if successful or an error code if an error occurs.
+*/
+static int vdbeSorterNext(
+ SortSubtask *pTask,
+ MergeEngine *pMerger,
+ int *pbEof
+){
+ int rc;
+ int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */
+
+ /* Advance the current PmaReader */
+ rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);
+
+ /* Update contents of aTree[] */
+ if( rc==SQLITE_OK ){
+ int i; /* Index of aTree[] to recalculate */
+ PmaReader *pReadr1; /* First PmaReader to compare */
+ PmaReader *pReadr2; /* Second PmaReader to compare */
+ u8 *pKey2; /* To pReadr2->aKey, or 0 if record cached */
+
+ /* Find the first two PmaReaders to compare. The one that was just
+ ** advanced (iPrev) and the one next to it in the array. */
+ pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)];
+ pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)];
+ pKey2 = pReadr2->aKey;
+
+ for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
+ /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */
+ int iRes;
+ if( pReadr1->pFile==0 ){
+ iRes = +1;
+ }else if( pReadr2->pFile==0 ){
+ iRes = -1;
+ }else{
+ iRes = vdbeSorterCompare(pTask,
+ pReadr1->aKey, pReadr1->nKey, pKey2, pReadr2->nKey
+ );
+ }
+
+ /* If pReadr1 contained the smaller value, set aTree[i] to its index.
+ ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this
+ ** case there is no cache of pReadr2 in pTask->pUnpacked, so set
+ ** pKey2 to point to the record belonging to pReadr2.
+ **
+ ** Alternatively, if pReadr2 contains the smaller of the two values,
+ ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare()
+ ** was actually called above, then pTask->pUnpacked now contains
+ ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent
+ ** vdbeSorterCompare() from decoding pReadr2 again.
+ **
+ ** If the two values were equal, then the value from the oldest
+ ** PMA should be considered smaller. The VdbeSorter.aReadr[] array
+ ** is sorted from oldest to newest, so pReadr1 contains older values
+ ** than pReadr2 iff (pReadr1<pReadr2). */
+ if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){
+ pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr);
+ pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
+ pKey2 = pReadr2->aKey;
+ }else{
+ if( pReadr1->pFile ) pKey2 = 0;
+ pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
+ pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
+ }
+ }
+ *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFile==0);
+ }
+
+ return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
+}
+
+#if SQLITE_MAX_WORKER_THREADS>0
+/*
+** The main routine for background threads that write level-0 PMAs.
+*/
+static void *vdbeSorterFlushThread(void *pCtx){
+ SortSubtask *pTask = (SortSubtask*)pCtx;
+ int rc; /* Return code */
+ assert( pTask->bDone==0 );
+ rc = vdbeSorterListToPMA(pTask, &pTask->list);
+ pTask->bDone = 1;
+ return SQLITE_INT_TO_PTR(rc);
+}
+#endif /* SQLITE_MAX_WORKER_THREADS>0 */
+
+/*
+** Flush the current contents of VdbeSorter.list to a new PMA, possibly
+** using a background thread.
+*/
+static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
+#if SQLITE_MAX_WORKER_THREADS==0
+ pSorter->bUsePMA = 1;
+ return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
+#else
+ int rc = SQLITE_OK;
+ int i;
+ SortSubtask *pTask = 0; /* Thread context used to create new PMA */
+ int nWorker = (pSorter->nTask-1);
+
+ /* Set the flag to indicate that at least one PMA has been written.
+ ** Or will be, anyhow. */
+ pSorter->bUsePMA = 1;
+
+ /* Select a sub-task to sort and flush the current list of in-memory
+ ** records to disk. If the sorter is running in multi-threaded mode,
+ ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
+ ** the background thread from a sub-tasks previous turn is still running,
+ ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
+ ** fall back to using the final sub-task. The first (pSorter->nTask-1)
+ ** sub-tasks are prefered as they use background threads - the final
+ ** sub-task uses the main thread. */
+ for(i=0; i<nWorker; i++){
+ int iTest = (pSorter->iPrev + i + 1) % nWorker;
+ pTask = &pSorter->aTask[iTest];
+ if( pTask->bDone ){
+ rc = vdbeSorterJoinThread(pTask);
+ }
+ if( rc!=SQLITE_OK || pTask->pThread==0 ) break;
+ }
+
+ if( rc==SQLITE_OK ){
+ if( i==nWorker ){
+ /* Use the foreground thread for this operation */
+ rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
+ }else{
+ /* Launch a background thread for this operation */
+ u8 *aMem = pTask->list.aMemory;
+ void *pCtx = (void*)pTask;
+
+ assert( pTask->pThread==0 && pTask->bDone==0 );
+ assert( pTask->list.pList==0 );
+ assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
+
+ pSorter->iPrev = (pTask - pSorter->aTask);
+ pTask->list = pSorter->list;
+ pSorter->list.pList = 0;
+ pSorter->list.szPMA = 0;
+ if( aMem ){
+ pSorter->list.aMemory = aMem;
+ pSorter->nMemory = sqlite3MallocSize(aMem);
+ }else if( pSorter->list.aMemory ){
+ pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
+ if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
+ }
+
+ rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
}
- pSorter->pRecord = p;
- rc = fileWriterFinish(db, &writer, &pSorter->iWriteOff);
}
return rc;
+#endif /* SQLITE_MAX_WORKER_THREADS!=0 */
}
/*
** Add a record to the sorter.
*/
int sqlite3VdbeSorterWrite(
- sqlite3 *db, /* Database handle */
- const VdbeCursor *pCsr, /* Sorter cursor */
+ const VdbeCursor *pCsr, /* Sorter cursor */
Mem *pVal /* Memory cell containing record */
){
VdbeSorter *pSorter = pCsr->pSorter;
int rc = SQLITE_OK; /* Return Code */
SorterRecord *pNew; /* New list element */
- assert( pSorter );
- pSorter->nInMemory += sqlite3VarintLen(pVal->n) + pVal->n;
+ int bFlush; /* True to flush contents of memory to PMA */
+ int nReq; /* Bytes of memory required */
+ int nPMA; /* Bytes of PMA space required */
- pNew = (SorterRecord *)sqlite3DbMallocRaw(db, pVal->n + sizeof(SorterRecord));
- if( pNew==0 ){
- rc = SQLITE_NOMEM;
- }else{
- pNew->pVal = (void *)&pNew[1];
- memcpy(pNew->pVal, pVal->z, pVal->n);
- pNew->nVal = pVal->n;
- pNew->pNext = pSorter->pRecord;
- pSorter->pRecord = pNew;
- }
+ assert( pSorter );
- /* See if the contents of the sorter should now be written out. They
- ** are written out when either of the following are true:
+ /* Figure out whether or not the current contents of memory should be
+ ** flushed to a PMA before continuing. If so, do so.
+ **
+ ** If using the single large allocation mode (pSorter->aMemory!=0), then
+ ** flush the contents of memory to a new PMA if (a) at least one value is
+ ** already in memory and (b) the new value will not fit in memory.
+ **
+ ** Or, if using separate allocations for each record, flush the contents
+ ** of memory to a PMA if either of the following are true:
**
** * The total memory allocated for the in-memory list is greater
** than (page-size * cache-size), or
@@ -805,161 +1627,693 @@ int sqlite3VdbeSorterWrite(
** * The total memory allocated for the in-memory list is greater
** than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
*/
- if( rc==SQLITE_OK && pSorter->mxPmaSize>0 && (
- (pSorter->nInMemory>pSorter->mxPmaSize)
- || (pSorter->nInMemory>pSorter->mnPmaSize && sqlite3HeapNearlyFull())
- )){
-#ifdef SQLITE_DEBUG
- i64 nExpect = pSorter->iWriteOff
- + sqlite3VarintLen(pSorter->nInMemory)
- + pSorter->nInMemory;
+ nReq = pVal->n + sizeof(SorterRecord);
+ nPMA = pVal->n + sqlite3VarintLen(pVal->n);
+ if( pSorter->mxPmaSize ){
+ if( pSorter->list.aMemory ){
+ bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
+ }else{
+ bFlush = (
+ (pSorter->list.szPMA > pSorter->mxPmaSize)
+ || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
+ );
+ }
+ if( bFlush ){
+ rc = vdbeSorterFlushPMA(pSorter);
+ pSorter->list.szPMA = 0;
+ pSorter->iMemory = 0;
+ assert( rc!=SQLITE_OK || pSorter->list.pList==0 );
+ }
+ }
+
+ pSorter->list.szPMA += nPMA;
+ if( nPMA>pSorter->mxKeysize ){
+ pSorter->mxKeysize = nPMA;
+ }
+
+ if( pSorter->list.aMemory ){
+ int nMin = pSorter->iMemory + nReq;
+
+ if( nMin>pSorter->nMemory ){
+ u8 *aNew;
+ int nNew = pSorter->nMemory * 2;
+ while( nNew < nMin ) nNew = nNew*2;
+ if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
+ if( nNew < nMin ) nNew = nMin;
+
+ aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
+ if( !aNew ) return SQLITE_NOMEM;
+ pSorter->list.pList = (SorterRecord*)(
+ aNew + ((u8*)pSorter->list.pList - pSorter->list.aMemory)
+ );
+ pSorter->list.aMemory = aNew;
+ pSorter->nMemory = nNew;
+ }
+
+ pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
+ pSorter->iMemory += ROUND8(nReq);
+ pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory);
+ }else{
+ pNew = (SorterRecord *)sqlite3Malloc(nReq);
+ if( pNew==0 ){
+ return SQLITE_NOMEM;
+ }
+ pNew->u.pNext = pSorter->list.pList;
+ }
+
+ memcpy(SRVAL(pNew), pVal->z, pVal->n);
+ pNew->nVal = pVal->n;
+ pSorter->list.pList = pNew;
+
+ return rc;
+}
+
+/*
+** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
+** of the data stored in aFile[1] is the same as that used by regular PMAs,
+** except that the number-of-bytes varint is omitted from the start.
+*/
+static int vdbeIncrPopulate(IncrMerger *pIncr){
+ int rc = SQLITE_OK;
+ int rc2;
+ i64 iStart = pIncr->iStartOff;
+ SorterFile *pOut = &pIncr->aFile[1];
+ SortSubtask *pTask = pIncr->pTask;
+ MergeEngine *pMerger = pIncr->pMerger;
+ PmaWriter writer;
+ assert( pIncr->bEof==0 );
+
+ vdbeSorterPopulateDebug(pTask, "enter");
+
+ vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);
+ while( rc==SQLITE_OK ){
+ int dummy;
+ PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ];
+ int nKey = pReader->nKey;
+ i64 iEof = writer.iWriteOff + writer.iBufEnd;
+
+ /* Check if the output file is full or if the input has been exhausted.
+ ** In either case exit the loop. */
+ if( pReader->pFile==0 ) break;
+ if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
+
+ /* Write the next key to the output. */
+ vdbePmaWriteVarint(&writer, nKey);
+ vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
+ rc = vdbeSorterNext(pTask, pIncr->pMerger, &dummy);
+ }
+
+ rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
+ if( rc==SQLITE_OK ) rc = rc2;
+ vdbeSorterPopulateDebug(pTask, "exit");
+ return rc;
+}
+
+#if SQLITE_MAX_WORKER_THREADS>0
+/*
+** The main routine for background threads that populate aFile[1] of
+** multi-threaded IncrMerger objects.
+*/
+static void *vdbeIncrPopulateThread(void *pCtx){
+ IncrMerger *pIncr = (IncrMerger*)pCtx;
+ void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
+ pIncr->pTask->bDone = 1;
+ return pRet;
+}
+
+/*
+** Launch a background thread to populate aFile[1] of pIncr.
+*/
+static int vdbeIncrBgPopulate(IncrMerger *pIncr){
+ void *p = (void*)pIncr;
+ assert( pIncr->bUseThread );
+ return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p);
+}
+#endif
+
+/*
+** This function is called when the PmaReader corresponding to pIncr has
+** finished reading the contents of aFile[0]. Its purpose is to "refill"
+** aFile[0] such that the PmaReader should start rereading it from the
+** beginning.
+**
+** For single-threaded objects, this is accomplished by literally reading
+** keys from pIncr->pMerger and repopulating aFile[0].
+**
+** For multi-threaded objects, all that is required is to wait until the
+** background thread is finished (if it is not already) and then swap
+** aFile[0] and aFile[1] in place. If the contents of pMerger have not
+** been exhausted, this function also launches a new background thread
+** to populate the new aFile[1].
+**
+** SQLITE_OK is returned on success, or an SQLite error code otherwise.
+*/
+static int vdbeIncrSwap(IncrMerger *pIncr){
+ int rc = SQLITE_OK;
+
+#if SQLITE_MAX_WORKER_THREADS>0
+ if( pIncr->bUseThread ){
+ rc = vdbeSorterJoinThread(pIncr->pTask);
+
+ if( rc==SQLITE_OK ){
+ SorterFile f0 = pIncr->aFile[0];
+ pIncr->aFile[0] = pIncr->aFile[1];
+ pIncr->aFile[1] = f0;
+ }
+
+ if( rc==SQLITE_OK ){
+ if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
+ pIncr->bEof = 1;
+ }else{
+ rc = vdbeIncrBgPopulate(pIncr);
+ }
+ }
+ }else
#endif
- rc = vdbeSorterListToPMA(db, pCsr);
- pSorter->nInMemory = 0;
- assert( rc!=SQLITE_OK || (nExpect==pSorter->iWriteOff) );
+ {
+ rc = vdbeIncrPopulate(pIncr);
+ pIncr->aFile[0] = pIncr->aFile[1];
+ if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
+ pIncr->bEof = 1;
+ }
}
return rc;
}
/*
-** Helper function for sqlite3VdbeSorterRewind().
+** Allocate and return a new IncrMerger object to read data from pMerger.
+**
+** If an OOM condition is encountered, return NULL. In this case free the
+** pMerger argument before returning.
*/
-static int vdbeSorterInitMerge(
- sqlite3 *db, /* Database handle */
- const VdbeCursor *pCsr, /* Cursor handle for this sorter */
- i64 *pnByte /* Sum of bytes in all opened PMAs */
+static int vdbeIncrNew(
+ SortSubtask *pTask,
+ MergeEngine *pMerger,
+ IncrMerger **ppOut
+){
+ int rc = SQLITE_OK;
+ IncrMerger *pIncr = *ppOut = (IncrMerger*)
+ (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr)));
+ if( pIncr ){
+ pIncr->pMerger = pMerger;
+ pIncr->pTask = pTask;
+ pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
+ pTask->file2.iEof += pIncr->mxSz;
+ }else{
+ vdbeMergeEngineFree(pMerger);
+ rc = SQLITE_NOMEM;
+ }
+ return rc;
+}
+
+#if SQLITE_MAX_WORKER_THREADS>0
+/*
+** Set the "use-threads" flag on object pIncr.
+*/
+static void vdbeIncrSetThreads(IncrMerger *pIncr){
+ pIncr->bUseThread = 1;
+ pIncr->pTask->file2.iEof -= pIncr->mxSz;
+}
+#endif /* SQLITE_MAX_WORKER_THREADS>0 */
+
+#define INCRINIT_NORMAL 0
+#define INCRINIT_TASK 1
+#define INCRINIT_ROOT 2
+static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode);
+
+/*
+** Initialize the merger argument passed as the second argument. Once this
+** function returns, the first key of merged data may be read from the merger
+** object in the usual fashion.
+**
+** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
+** objects attached to the PmaReader objects that the merger reads from have
+** already been populated, but that they have not yet populated aFile[0] and
+** set the PmaReader objects up to read from it. In this case all that is
+** required is to call vdbePmaReaderNext() on each PmaReader to point it at
+** its first key.
+**
+** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use
+** vdbePmaReaderIncrInit() to initialize each PmaReader that feeds data
+** to pMerger.
+**
+** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
+*/
+static int vdbeIncrInitMerger(
+ SortSubtask *pTask,
+ MergeEngine *pMerger,
+ int eMode /* One of the INCRINIT_XXX constants */
){
- VdbeSorter *pSorter = pCsr->pSorter;
int rc = SQLITE_OK; /* Return code */
- int i; /* Used to iterator through aIter[] */
- i64 nByte = 0; /* Total bytes in all opened PMAs */
+ int i; /* For looping over PmaReader objects */
+ int nTree = pMerger->nTree;
+
+ for(i=0; rc==SQLITE_OK && i<nTree; i++){
+ if( eMode==INCRINIT_ROOT ){
+ /* PmaReaders should be normally initialized in order, as if they are
+ ** reading from the same temp file this makes for more linear file IO.
+ ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is
+ ** in use it will block the vdbePmaReaderNext() call while it uses
+ ** the main thread to fill its buffer. So calling PmaReaderNext()
+ ** on this PmaReader before any of the multi-threaded PmaReaders takes
+ ** better advantage of multi-processor hardware. */
+ rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
+ }else{
+ rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
+ }
+ }
- /* Initialize the iterators. */
- for(i=0; i<SORTER_MAX_MERGE_COUNT; i++){
- VdbeSorterIter *pIter = &pSorter->aIter[i];
- rc = vdbeSorterIterInit(db, pSorter, pSorter->iReadOff, pIter, &nByte);
- pSorter->iReadOff = pIter->iEof;
- assert( rc!=SQLITE_OK || pSorter->iReadOff<=pSorter->iWriteOff );
- if( rc!=SQLITE_OK || pSorter->iReadOff>=pSorter->iWriteOff ) break;
+ for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
+ rc = vdbeSorterDoCompare(pTask, pMerger, i);
}
- /* Initialize the aTree[] array. */
- for(i=pSorter->nTree-1; rc==SQLITE_OK && i>0; i--){
- rc = vdbeSorterDoCompare(pCsr, i);
+ return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
+}
+
+/*
+** If the PmaReader passed as the first argument is not an incremental-reader
+** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it serves
+** to open and/or initialize the temp file related fields of the IncrMerge
+** object at (pReadr->pIncr).
+**
+** If argument eMode is set to INCRINIT_NORMAL, then PmaReader PmaReaders
+** in the sub-tree headed by pReadr are also initialized. Data is then loaded
+** into the buffers belonging to this PmaReader, pReadr, and it is set to
+** point to the first key in its range.
+**
+** If argument eMode is set to INCRINIT_TASK, then PmaReader is guaranteed
+** to be a multi-threaded PmaReader and this function is being called in a
+** background thread. In this case all PmaReaders in the sub-tree are
+** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
+** pReadr is populated. However, the PmaReader itself is not set up to point
+** to its first key. A call to vdbePmaReaderNext() is still required to do
+** that.
+**
+** The reason this function does not call vdbePmaReaderNext() immediately
+** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that has
+** to block on thread (pTask->thread) before accessing aFile[1]. But, since
+** this entire function is being run by thread (pTask->thread), that will
+** lead to the current background thread attempting to join itself.
+**
+** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
+** that pReadr->pIncr is a multi-threaded IncrMerge objects, and that all
+** child-trees have already been initialized using IncrInit(INCRINIT_TASK).
+** In this case vdbePmaReaderNext() is called on all child PmaReaders and
+** the current PmaReader set to point to the first key in its range.
+**
+** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
+*/
+static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){
+ int rc = SQLITE_OK;
+ IncrMerger *pIncr = pReadr->pIncr;
+ if( pIncr ){
+ SortSubtask *pTask = pIncr->pTask;
+ sqlite3 *db = pTask->pSorter->db;
+
+ rc = vdbeIncrInitMerger(pTask, pIncr->pMerger, eMode);
+
+ /* Set up the required files for pIncr. A multi-theaded IncrMerge object
+ ** requires two temp files to itself, whereas a single-threaded object
+ ** only requires a region of pTask->file2. */
+ if( rc==SQLITE_OK ){
+ int mxSz = pIncr->mxSz;
+#if SQLITE_MAX_WORKER_THREADS>0
+ if( pIncr->bUseThread ){
+ rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
+ if( rc==SQLITE_OK ){
+ rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
+ }
+ }else
+#endif
+ /*if( !pIncr->bUseThread )*/{
+ if( pTask->file2.pFd==0 ){
+ assert( pTask->file2.iEof>0 );
+ rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
+ pTask->file2.iEof = 0;
+ }
+ if( rc==SQLITE_OK ){
+ pIncr->aFile[1].pFd = pTask->file2.pFd;
+ pIncr->iStartOff = pTask->file2.iEof;
+ pTask->file2.iEof += mxSz;
+ }
+ }
+ }
+
+#if SQLITE_MAX_WORKER_THREADS>0
+ if( rc==SQLITE_OK && pIncr->bUseThread ){
+ /* Use the current thread to populate aFile[1], even though this
+ ** PmaReader is multi-threaded. The reason being that this function
+ ** is already running in background thread pIncr->pTask->thread. */
+ assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
+ rc = vdbeIncrPopulate(pIncr);
+ }
+#endif
+
+ if( rc==SQLITE_OK && eMode!=INCRINIT_TASK ){
+ rc = vdbePmaReaderNext(pReadr);
+ }
+ }
+ return rc;
+}
+
+#if SQLITE_MAX_WORKER_THREADS>0
+/*
+** The main routine for vdbePmaReaderIncrInit() operations run in
+** background threads.
+*/
+static void *vdbePmaReaderBgInit(void *pCtx){
+ PmaReader *pReader = (PmaReader*)pCtx;
+ void *pRet = SQLITE_INT_TO_PTR(vdbePmaReaderIncrInit(pReader,INCRINIT_TASK));
+ pReader->pIncr->pTask->bDone = 1;
+ return pRet;
+}
+
+/*
+** Use a background thread to invoke vdbePmaReaderIncrInit(INCRINIT_TASK)
+** on the the PmaReader object passed as the first argument.
+**
+** This call will initialize the various fields of the pReadr->pIncr
+** structure and, if it is a multi-threaded IncrMerger, launch a
+** background thread to populate aFile[1].
+*/
+static int vdbePmaReaderBgIncrInit(PmaReader *pReadr){
+ void *pCtx = (void*)pReadr;
+ return vdbeSorterCreateThread(pReadr->pIncr->pTask, vdbePmaReaderBgInit, pCtx);
+}
+#endif
+
+/*
+** Allocate a new MergeEngine object to merge the contents of nPMA level-0
+** PMAs from pTask->file. If no error occurs, set *ppOut to point to
+** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
+** to NULL and return an SQLite error code.
+**
+** When this function is called, *piOffset is set to the offset of the
+** first PMA to read from pTask->file. Assuming no error occurs, it is
+** set to the offset immediately following the last byte of the last
+** PMA before returning. If an error does occur, then the final value of
+** *piOffset is undefined.
+*/
+static int vdbeMergeEngineLevel0(
+ SortSubtask *pTask, /* Sorter task to read from */
+ int nPMA, /* Number of PMAs to read */
+ i64 *piOffset, /* IN/OUT: Readr offset in pTask->file */
+ MergeEngine **ppOut /* OUT: New merge-engine */
+){
+ MergeEngine *pNew; /* Merge engine to return */
+ i64 iOff = *piOffset;
+ int i;
+ int rc = SQLITE_OK;
+
+ *ppOut = pNew = vdbeMergeEngineNew(nPMA);
+ if( pNew==0 ) rc = SQLITE_NOMEM;
+
+ for(i=0; i<nPMA && rc==SQLITE_OK; i++){
+ i64 nDummy;
+ PmaReader *pReadr = &pNew->aReadr[i];
+ rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy);
+ iOff = pReadr->iEof;
}
- *pnByte = nByte;
+ if( rc!=SQLITE_OK ){
+ vdbeMergeEngineFree(pNew);
+ *ppOut = 0;
+ }
+ *piOffset = iOff;
return rc;
}
/*
-** Once the sorter has been populated, this function is called to prepare
-** for iterating through its contents in sorted order.
+** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of
+** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes.
+**
+** i.e.
+**
+** nPMA<=16 -> TreeDepth() == 0
+** nPMA<=256 -> TreeDepth() == 1
+** nPMA<=65536 -> TreeDepth() == 2
+*/
+static int vdbeSorterTreeDepth(int nPMA){
+ int nDepth = 0;
+ i64 nDiv = SORTER_MAX_MERGE_COUNT;
+ while( nDiv < (i64)nPMA ){
+ nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
+ nDepth++;
+ }
+ return nDepth;
+}
+
+/*
+** pRoot is the root of an incremental merge-tree with depth nDepth (according
+** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the
+** tree, counting from zero. This function adds pLeaf to the tree.
+**
+** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error
+** code is returned and pLeaf is freed.
*/
-int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
- VdbeSorter *pSorter = pCsr->pSorter;
- int rc; /* Return code */
- sqlite3_file *pTemp2 = 0; /* Second temp file to use */
- i64 iWrite2 = 0; /* Write offset for pTemp2 */
- int nIter; /* Number of iterators used */
- int nByte; /* Bytes of space required for aIter/aTree */
- int N = 2; /* Power of 2 >= nIter */
+static int vdbeSorterAddToTree(
+ SortSubtask *pTask, /* Task context */
+ int nDepth, /* Depth of tree according to TreeDepth() */
+ int iSeq, /* Sequence number of leaf within tree */
+ MergeEngine *pRoot, /* Root of tree */
+ MergeEngine *pLeaf /* Leaf to add to tree */
+){
+ int rc = SQLITE_OK;
+ int nDiv = 1;
+ int i;
+ MergeEngine *p = pRoot;
+ IncrMerger *pIncr;
- assert( pSorter );
+ rc = vdbeIncrNew(pTask, pLeaf, &pIncr);
- /* If no data has been written to disk, then do not do so now. Instead,
- ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
- ** from the in-memory list. */
- if( pSorter->nPMA==0 ){
- *pbEof = !pSorter->pRecord;
- assert( pSorter->aTree==0 );
- return vdbeSorterSort(pCsr);
+ for(i=1; i<nDepth; i++){
+ nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
}
- /* Write the current in-memory list to a PMA. */
- rc = vdbeSorterListToPMA(db, pCsr);
- if( rc!=SQLITE_OK ) return rc;
+ for(i=1; i<nDepth && rc==SQLITE_OK; i++){
+ int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT;
+ PmaReader *pReadr = &p->aReadr[iIter];
- /* Allocate space for aIter[] and aTree[]. */
- nIter = pSorter->nPMA;
- if( nIter>SORTER_MAX_MERGE_COUNT ) nIter = SORTER_MAX_MERGE_COUNT;
- assert( nIter>0 );
- while( N<nIter ) N += N;
- nByte = N * (sizeof(int) + sizeof(VdbeSorterIter));
- pSorter->aIter = (VdbeSorterIter *)sqlite3DbMallocZero(db, nByte);
- if( !pSorter->aIter ) return SQLITE_NOMEM;
- pSorter->aTree = (int *)&pSorter->aIter[N];
- pSorter->nTree = N;
-
- do {
- int iNew; /* Index of new, merged, PMA */
-
- for(iNew=0;
- rc==SQLITE_OK && iNew*SORTER_MAX_MERGE_COUNT<pSorter->nPMA;
- iNew++
- ){
- int rc2; /* Return code from fileWriterFinish() */
- FileWriter writer; /* Object used to write to disk */
- i64 nWrite; /* Number of bytes in new PMA */
-
- memset(&writer, 0, sizeof(FileWriter));
-
- /* If there are SORTER_MAX_MERGE_COUNT or less PMAs in file pTemp1,
- ** initialize an iterator for each of them and break out of the loop.
- ** These iterators will be incrementally merged as the VDBE layer calls
- ** sqlite3VdbeSorterNext().
- **
- ** Otherwise, if pTemp1 contains more than SORTER_MAX_MERGE_COUNT PMAs,
- ** initialize interators for SORTER_MAX_MERGE_COUNT of them. These PMAs
- ** are merged into a single PMA that is written to file pTemp2.
- */
- rc = vdbeSorterInitMerge(db, pCsr, &nWrite);
- assert( rc!=SQLITE_OK || pSorter->aIter[ pSorter->aTree[1] ].pFile );
- if( rc!=SQLITE_OK || pSorter->nPMA<=SORTER_MAX_MERGE_COUNT ){
- break;
+ if( pReadr->pIncr==0 ){
+ MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
+ if( pNew==0 ){
+ rc = SQLITE_NOMEM;
+ }else{
+ rc = vdbeIncrNew(pTask, pNew, &pReadr->pIncr);
+ }
+ }
+ if( rc==SQLITE_OK ){
+ p = pReadr->pIncr->pMerger;
+ nDiv = nDiv / SORTER_MAX_MERGE_COUNT;
+ }
+ }
+
+ if( rc==SQLITE_OK ){
+ p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr;
+ }else{
+ vdbeIncrFree(pIncr);
+ }
+ return rc;
+}
+
+/*
+** This function is called as part of a SorterRewind() operation on a sorter
+** that has already written two or more level-0 PMAs to one or more temp
+** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that
+** can be used to incrementally merge all PMAs on disk.
+**
+** If successful, SQLITE_OK is returned and *ppOut set to point to the
+** MergeEngine object at the root of the tree before returning. Or, if an
+** error occurs, an SQLite error code is returned and the final value
+** of *ppOut is undefined.
+*/
+static int vdbeSorterMergeTreeBuild(VdbeSorter *pSorter, MergeEngine **ppOut){
+ MergeEngine *pMain = 0;
+ int rc = SQLITE_OK;
+ int iTask;
+
+#if SQLITE_MAX_WORKER_THREADS>0
+ /* If the sorter uses more than one task, then create the top-level
+ ** MergeEngine here. This MergeEngine will read data from exactly
+ ** one PmaReader per sub-task. */
+ assert( pSorter->bUseThreads || pSorter->nTask==1 );
+ if( pSorter->nTask>1 ){
+ pMain = vdbeMergeEngineNew(pSorter->nTask);
+ if( pMain==0 ) rc = SQLITE_NOMEM;
+ }
+#endif
+
+ for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
+ SortSubtask *pTask = &pSorter->aTask[iTask];
+ if( pTask->nPMA ){
+ MergeEngine *pRoot = 0; /* Root node of tree for this task */
+ int nDepth = vdbeSorterTreeDepth(pTask->nPMA);
+ i64 iReadOff = 0;
+
+ if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){
+ rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot);
+ }else{
+ int i;
+ int iSeq = 0;
+ pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
+ if( pRoot==0 ) rc = SQLITE_NOMEM;
+ for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){
+ MergeEngine *pMerger = 0; /* New level-0 PMA merger */
+ int nReader; /* Number of level-0 PMAs to merge */
+
+ nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT);
+ rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
+ if( rc==SQLITE_OK ){
+ rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger);
+ }
+ }
}
- /* Open the second temp file, if it is not already open. */
- if( pTemp2==0 ){
- assert( iWrite2==0 );
- rc = vdbeSorterOpenTempFile(db, &pTemp2);
+ if( rc==SQLITE_OK ){
+#if SQLITE_MAX_WORKER_THREADS>0
+ if( pMain!=0 ){
+ rc = vdbeIncrNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr);
+ }else
+#endif
+ {
+ assert( pMain==0 );
+ pMain = pRoot;
+ }
+ }else{
+ vdbeMergeEngineFree(pRoot);
}
+ }
+ }
+
+ if( rc!=SQLITE_OK ){
+ vdbeMergeEngineFree(pMain);
+ pMain = 0;
+ }
+ *ppOut = pMain;
+ return rc;
+}
+
+/*
+** This function is called as part of an sqlite3VdbeSorterRewind() operation
+** on a sorter that has written two or more PMAs to temporary files. It sets
+** up either VdbeSorter.pMerger (for single threaded sorters) or pReader
+** (for multi-threaded sorters) so that it can be used to iterate through
+** all records stored in the sorter.
+**
+** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
+*/
+static int vdbeSorterSetupMerge(VdbeSorter *pSorter){
+ int rc; /* Return code */
+ SortSubtask *pTask0 = &pSorter->aTask[0];
+ MergeEngine *pMain = 0;
+#if SQLITE_MAX_WORKER_THREADS
+ sqlite3 *db = pTask0->pSorter->db;
+#endif
+ rc = vdbeSorterMergeTreeBuild(pSorter, &pMain);
+ if( rc==SQLITE_OK ){
+#if SQLITE_MAX_WORKER_THREADS
+ assert( pSorter->bUseThreads==0 || pSorter->nTask>1 );
+ if( pSorter->bUseThreads ){
+ int iTask;
+ PmaReader *pReadr;
+ SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
+ rc = vdbeSortAllocUnpacked(pLast);
if( rc==SQLITE_OK ){
- int bEof = 0;
- fileWriterInit(db, pTemp2, &writer, iWrite2);
- fileWriterWriteVarint(&writer, nWrite);
- while( rc==SQLITE_OK && bEof==0 ){
- VdbeSorterIter *pIter = &pSorter->aIter[ pSorter->aTree[1] ];
- assert( pIter->pFile );
-
- fileWriterWriteVarint(&writer, pIter->nKey);
- fileWriterWrite(&writer, pIter->aKey, pIter->nKey);
- rc = sqlite3VdbeSorterNext(db, pCsr, &bEof);
+ pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
+ pSorter->pReader = pReadr;
+ if( pReadr==0 ) rc = SQLITE_NOMEM;
+ }
+ if( rc==SQLITE_OK ){
+ rc = vdbeIncrNew(pLast, pMain, &pReadr->pIncr);
+ if( rc==SQLITE_OK ){
+ vdbeIncrSetThreads(pReadr->pIncr);
+ for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
+ IncrMerger *pIncr;
+ if( (pIncr = pMain->aReadr[iTask].pIncr) ){
+ vdbeIncrSetThreads(pIncr);
+ assert( pIncr->pTask!=pLast );
+ }
+ }
+ for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
+ PmaReader *p = &pMain->aReadr[iTask];
+ assert( p->pIncr==0 || p->pIncr->pTask==&pSorter->aTask[iTask] );
+ if( p->pIncr ){
+ if( iTask==pSorter->nTask-1 ){
+ rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK);
+ }else{
+ rc = vdbePmaReaderBgIncrInit(p);
+ }
+ }
+ }
}
- rc2 = fileWriterFinish(db, &writer, &iWrite2);
- if( rc==SQLITE_OK ) rc = rc2;
+ pMain = 0;
+ }
+ if( rc==SQLITE_OK ){
+ rc = vdbePmaReaderIncrInit(pReadr, INCRINIT_ROOT);
}
+ }else
+#endif
+ {
+ rc = vdbeIncrInitMerger(pTask0, pMain, INCRINIT_NORMAL);
+ pSorter->pMerger = pMain;
+ pMain = 0;
}
+ }
+
+ if( rc!=SQLITE_OK ){
+ vdbeMergeEngineFree(pMain);
+ }
+ return rc;
+}
+
+
+/*
+** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
+** this function is called to prepare for iterating through the records
+** in sorted order.
+*/
+int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){
+ VdbeSorter *pSorter = pCsr->pSorter;
+ int rc = SQLITE_OK; /* Return code */
+
+ assert( pSorter );
- if( pSorter->nPMA<=SORTER_MAX_MERGE_COUNT ){
- break;
+ /* If no data has been written to disk, then do not do so now. Instead,
+ ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
+ ** from the in-memory list. */
+ if( pSorter->bUsePMA==0 ){
+ if( pSorter->list.pList ){
+ *pbEof = 0;
+ rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
}else{
- sqlite3_file *pTmp = pSorter->pTemp1;
- pSorter->nPMA = iNew;
- pSorter->pTemp1 = pTemp2;
- pTemp2 = pTmp;
- pSorter->iWriteOff = iWrite2;
- pSorter->iReadOff = 0;
- iWrite2 = 0;
+ *pbEof = 1;
}
- }while( rc==SQLITE_OK );
+ return rc;
+ }
+
+ /* Write the current in-memory list to a PMA. When the VdbeSorterWrite()
+ ** function flushes the contents of memory to disk, it immediately always
+ ** creates a new list consisting of a single key immediately afterwards.
+ ** So the list is never empty at this point. */
+ assert( pSorter->list.pList );
+ rc = vdbeSorterFlushPMA(pSorter);
+
+ /* Join all threads */
+ rc = vdbeSorterJoinAll(pSorter, rc);
+
+ vdbeSorterRewindDebug("rewind");
- if( pTemp2 ){
- sqlite3OsCloseFree(pTemp2);
+ /* Assuming no errors have occurred, set up a merger structure to
+ ** incrementally read and merge all remaining PMAs. */
+ assert( pSorter->pReader==0 );
+ if( rc==SQLITE_OK ){
+ rc = vdbeSorterSetupMerge(pSorter);
+ *pbEof = 0;
}
- *pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0);
+
+ vdbeSorterRewindDebug("rewinddone");
return rc;
}
@@ -970,63 +2324,26 @@ int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
VdbeSorter *pSorter = pCsr->pSorter;
int rc; /* Return code */
- if( pSorter->aTree ){
- int iPrev = pSorter->aTree[1];/* Index of iterator to advance */
- rc = vdbeSorterIterNext(db, &pSorter->aIter[iPrev]);
- if( rc==SQLITE_OK ){
- int i; /* Index of aTree[] to recalculate */
- VdbeSorterIter *pIter1; /* First iterator to compare */
- VdbeSorterIter *pIter2; /* Second iterator to compare */
- u8 *pKey2; /* To pIter2->aKey, or 0 if record cached */
-
- /* Find the first two iterators to compare. The one that was just
- ** advanced (iPrev) and the one next to it in the array. */
- pIter1 = &pSorter->aIter[(iPrev & 0xFFFE)];
- pIter2 = &pSorter->aIter[(iPrev | 0x0001)];
- pKey2 = pIter2->aKey;
-
- for(i=(pSorter->nTree+iPrev)/2; i>0; i=i/2){
- /* Compare pIter1 and pIter2. Store the result in variable iRes. */
- int iRes;
- if( pIter1->pFile==0 ){
- iRes = +1;
- }else if( pIter2->pFile==0 ){
- iRes = -1;
- }else{
- vdbeSorterCompare(pCsr, 0,
- pIter1->aKey, pIter1->nKey, pKey2, pIter2->nKey, &iRes
- );
- }
-
- /* If pIter1 contained the smaller value, set aTree[i] to its index.
- ** Then set pIter2 to the next iterator to compare to pIter1. In this
- ** case there is no cache of pIter2 in pSorter->pUnpacked, so set
- ** pKey2 to point to the record belonging to pIter2.
- **
- ** Alternatively, if pIter2 contains the smaller of the two values,
- ** set aTree[i] to its index and update pIter1. If vdbeSorterCompare()
- ** was actually called above, then pSorter->pUnpacked now contains
- ** a value equivalent to pIter2. So set pKey2 to NULL to prevent
- ** vdbeSorterCompare() from decoding pIter2 again. */
- if( iRes<=0 ){
- pSorter->aTree[i] = (int)(pIter1 - pSorter->aIter);
- pIter2 = &pSorter->aIter[ pSorter->aTree[i ^ 0x0001] ];
- pKey2 = pIter2->aKey;
- }else{
- if( pIter1->pFile ) pKey2 = 0;
- pSorter->aTree[i] = (int)(pIter2 - pSorter->aIter);
- pIter1 = &pSorter->aIter[ pSorter->aTree[i ^ 0x0001] ];
- }
-
- }
- *pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0);
+ assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) );
+ if( pSorter->bUsePMA ){
+ assert( pSorter->pReader==0 || pSorter->pMerger==0 );
+ assert( pSorter->bUseThreads==0 || pSorter->pReader );
+ assert( pSorter->bUseThreads==1 || pSorter->pMerger );
+#if SQLITE_MAX_WORKER_THREADS>0
+ if( pSorter->bUseThreads ){
+ rc = vdbePmaReaderNext(pSorter->pReader);
+ *pbEof = (pSorter->pReader->pFile==0);
+ }else
+#endif
+ /*if( !pSorter->bUseThreads )*/ {
+ rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof);
}
}else{
- SorterRecord *pFree = pSorter->pRecord;
- pSorter->pRecord = pFree->pNext;
- pFree->pNext = 0;
- vdbeSorterRecordFree(db, pFree);
- *pbEof = !pSorter->pRecord;
+ SorterRecord *pFree = pSorter->list.pList;
+ pSorter->list.pList = pFree->u.pNext;
+ pFree->u.pNext = 0;
+ if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
+ *pbEof = !pSorter->list.pList;
rc = SQLITE_OK;
}
return rc;
@@ -1041,14 +2358,21 @@ static void *vdbeSorterRowkey(
int *pnKey /* OUT: Size of current key in bytes */
){
void *pKey;
- if( pSorter->aTree ){
- VdbeSorterIter *pIter;
- pIter = &pSorter->aIter[ pSorter->aTree[1] ];
- *pnKey = pIter->nKey;
- pKey = pIter->aKey;
+ if( pSorter->bUsePMA ){
+ PmaReader *pReader;
+#if SQLITE_MAX_WORKER_THREADS>0
+ if( pSorter->bUseThreads ){
+ pReader = pSorter->pReader;
+ }else
+#endif
+ /*if( !pSorter->bUseThreads )*/{
+ pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]];
+ }
+ *pnKey = pReader->nKey;
+ pKey = pReader->aKey;
}else{
- *pnKey = pSorter->pRecord->nVal;
- pKey = pSorter->pRecord->pVal;
+ *pnKey = pSorter->list.pList->nVal;
+ pKey = SRVAL(pSorter->list.pList);
}
return pKey;
}
@@ -1076,10 +2400,16 @@ int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){
** passed as the first argument currently points to. For the purposes of
** the comparison, ignore the rowid field at the end of each record.
**
+** If the sorter cursor key contains any NULL values, consider it to be
+** less than pVal. Even if pVal also contains NULL values.
+**
** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM).
** Otherwise, set *pRes to a negative, zero or positive value if the
** key in pVal is smaller than, equal to or larger than the current sorter
** key.
+**
+** This routine forms the core of the OP_SorterCompare opcode, which in
+** turn is used to verify uniqueness when constructing a UNIQUE INDEX.
*/
int sqlite3VdbeSorterCompare(
const VdbeCursor *pCsr, /* Sorter cursor */
@@ -1088,9 +2418,29 @@ int sqlite3VdbeSorterCompare(
int *pRes /* OUT: Result of comparison */
){
VdbeSorter *pSorter = pCsr->pSorter;
+ UnpackedRecord *r2 = pSorter->pUnpacked;
+ KeyInfo *pKeyInfo = pCsr->pKeyInfo;
+ int i;
void *pKey; int nKey; /* Sorter key to compare pVal with */
+ if( r2==0 ){
+ char *p;
+ r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo,0,0,&p);
+ assert( pSorter->pUnpacked==(UnpackedRecord*)p );
+ if( r2==0 ) return SQLITE_NOMEM;
+ r2->nField = pKeyInfo->nField-nIgnore;
+ }
+ assert( r2->nField>=pKeyInfo->nField-nIgnore );
+
pKey = vdbeSorterRowkey(pSorter, &nKey);
- vdbeSorterCompare(pCsr, nIgnore, pVal->z, pVal->n, pKey, nKey, pRes);
+ sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
+ for(i=0; i<r2->nField; i++){
+ if( r2->aMem[i].flags & MEM_Null ){
+ *pRes = -1;
+ return SQLITE_OK;
+ }
+ }
+
+ *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2, 0);
return SQLITE_OK;
}