diff options
author | dan <dan@noemail.net> | 2014-03-31 19:57:34 +0000 |
---|---|---|
committer | dan <dan@noemail.net> | 2014-03-31 19:57:34 +0000 |
commit | b3f56fdb69201329c96ef9bb904a5df46fc62c0d (patch) | |
tree | 22e2c276f7d3152a78a4871fce8d028b12b5d5c8 /src | |
parent | 853c4a7621b37ecb3c4cab8ca1fb7fc06fb1a533 (diff) | |
download | sqlite-b3f56fdb69201329c96ef9bb904a5df46fc62c0d.tar.gz sqlite-b3f56fdb69201329c96ef9bb904a5df46fc62c0d.zip |
Add the SQLITE_MAX_WORKER_THREADS compile time option. And the SQLITE_CONFIG_WORKER_THREADS sqlite3_config() switch.
FossilOrigin-Name: 2774710df8cd2bfaca49888c69f1b01c0ddadf9a
Diffstat (limited to 'src')
-rw-r--r-- | src/global.c | 1 | ||||
-rw-r--r-- | src/main.c | 7 | ||||
-rw-r--r-- | src/sqlite.h.in | 11 | ||||
-rw-r--r-- | src/sqliteInt.h | 14 | ||||
-rw-r--r-- | src/threads.c | 3 | ||||
-rw-r--r-- | src/vdbesort.c | 72 |
6 files changed, 78 insertions, 30 deletions
diff --git a/src/global.c b/src/global.c index 1ee3f6436..4ed597d29 100644 --- a/src/global.c +++ b/src/global.c @@ -167,6 +167,7 @@ SQLITE_WSD struct Sqlite3Config sqlite3Config = { 0, /* nPage */ 0, /* mxParserStack */ 0, /* sharedCacheEnabled */ + SQLITE_MAX_WORKER_THREADS, /* nWorker */ /* All the rest should always be initialized to zero */ 0, /* isInit */ 0, /* inProgress */ diff --git a/src/main.c b/src/main.c index 9e83d4963..c1eaa6849 100644 --- a/src/main.c +++ b/src/main.c @@ -515,6 +515,13 @@ int sqlite3_config(int op, ...){ } #endif + case SQLITE_CONFIG_WORKER_THREADS: { + int n = va_arg(ap, int); + if( n>SQLITE_MAX_WORKER_THREADS ) n = SQLITE_MAX_WORKER_THREADS; + if( n>=0 ) sqlite3GlobalConfig.nWorker = n; + break; + } + default: { rc = SQLITE_ERROR; break; diff --git a/src/sqlite.h.in b/src/sqlite.h.in index 5d2c87552..78aa9c36e 100644 --- a/src/sqlite.h.in +++ b/src/sqlite.h.in @@ -1715,6 +1715,16 @@ struct sqlite3_mem_methods { ** SQLITE_CONFIG_WIN32_HEAPSIZE takes a 32-bit unsigned integer value ** that specifies the maximum size of the created heap. ** </dl> +** +** [[SQLITE_CONFIG_WORKER_THREADS]] +** <dt>SQLITE_CONFIG_WORKER_THREADS +** <dd>^SQLITE_CONFIG_WORKER_THREADS takes a single argument of type int. +** It is used to set the number of background worker threads that may be +** launched when sorting large amounts of data. A value of 0 means launch +** no background threads at all. The maximum number of background threads +** allowed is configured at build-time by the SQLITE_MAX_WORKER_THREADS +** pre-processor option. +** </dl> */ #define SQLITE_CONFIG_SINGLETHREAD 1 /* nil */ #define SQLITE_CONFIG_MULTITHREAD 2 /* nil */ @@ -1739,6 +1749,7 @@ struct sqlite3_mem_methods { #define SQLITE_CONFIG_SQLLOG 21 /* xSqllog, void* */ #define SQLITE_CONFIG_MMAP_SIZE 22 /* sqlite3_int64, sqlite3_int64 */ #define SQLITE_CONFIG_WIN32_HEAPSIZE 23 /* int nByte */ +#define SQLITE_CONFIG_WORKER_THREADS 24 /* int nWorker */ /* ** CAPI3REF: Database Connection Configuration Options diff --git a/src/sqliteInt.h b/src/sqliteInt.h index 41219c710..5f85e80d8 100644 --- a/src/sqliteInt.h +++ b/src/sqliteInt.h @@ -423,6 +423,19 @@ #endif /* +** If no value has been provided for SQLITE_MAX_WORKER_THREADS, or if +** SQLITE_TEMP_STORE is set to 3 (never use temporary files), set it +** to zero. +*/ +#if SQLITE_TEMP_STORE==3 +# undef SQLITE_MAX_WORKER_THREADS +#endif +#ifndef SQLITE_MAX_WORKER_THREADS +# define SQLITE_MAX_WORKER_THREADS 0 +#endif + + +/* ** GCC does not define the offsetof() macro so we'll have to do it ** ourselves. */ @@ -2685,6 +2698,7 @@ struct Sqlite3Config { int nPage; /* Number of pages in pPage[] */ int mxParserStack; /* maximum depth of the parser stack */ int sharedCacheEnabled; /* true if shared-cache mode enabled */ + int nWorker; /* Number of worker threads to use */ /* The above might be initialized to non-zero. The following need to always ** initially be zero, however. */ int isInit; /* True after initialization has finished */ diff --git a/src/threads.c b/src/threads.c index cb148b6c6..96cb17cef 100644 --- a/src/threads.c +++ b/src/threads.c @@ -27,6 +27,8 @@ */ #include "sqliteInt.h" +#if SQLITE_MAX_WORKER_THREADS>0 + /********************************* Unix Pthreads ****************************/ #if SQLITE_OS_UNIX && defined(SQLITE_MUTEX_PTHREADS) && SQLITE_THREADSAFE>0 @@ -215,3 +217,4 @@ int sqlite3ThreadJoin(SQLiteThread *p, void **ppOut){ #endif /* !defined(SQLITE_THREADS_IMPLEMENTED) */ /****************************** End Single-Threaded *************************/ +#endif /* SQLITE_MAX_WORKER_THREADS>0 */ diff --git a/src/vdbesort.c b/src/vdbesort.c index c6927f87a..84c6fadc9 100644 --- a/src/vdbesort.c +++ b/src/vdbesort.c @@ -27,14 +27,6 @@ typedef struct FileWriter FileWriter; /* -** Maximum number of threads to use. Setting this value to 1 forces all -** operations to be single-threaded. -*/ -#ifndef SQLITE_MAX_SORTER_THREAD -# define SQLITE_MAX_SORTER_THREAD 4 -#endif - -/* ** Candidate values for SorterThread.eWork */ #define SORTER_THREAD_SORT 1 @@ -48,9 +40,10 @@ typedef struct FileWriter FileWriter; ** is configured and passed to vdbeSorterThreadMain() - either directly by ** the main thread or via a background thread. ** -** Exactly SQLITE_MAX_SORTER_THREAD instances of this structure are allocated +** Exactly SorterThread.nThread instances of this structure are allocated ** as part of each VdbeSorter object. Instances are never allocated any other -** way. +** way. SorterThread.nThread is set to the number of worker threads allowed +** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). ** ** When a background thread is launched to perform work, SorterThread.bDone ** is set to 0 and the SorterThread.pThread variable set to point to the @@ -59,7 +52,7 @@ typedef struct FileWriter FileWriter; ** exits. SorterThread.pThread and bDone are always cleared after the ** background thread has been joined. ** -** One object (specifically, VdbeSorter.aThread[SQLITE_MAX_SORTER_THREAD-1]) +** One object (specifically, VdbeSorter.aThread[SorterThread.nThread-1]) ** is reserved for the foreground thread. ** ** The nature of the work performed is determined by SorterThread.eWork, @@ -187,7 +180,8 @@ struct VdbeSorter { u8 *aMemory; /* Block of memory to alloc records from */ int iMemory; /* Offset of first free byte in aMemory */ int nMemory; /* Size of aMemory allocation in bytes */ - SorterThread aThread[SQLITE_MAX_SORTER_THREAD]; + int nThread; /* Size of aThread[] array */ + SorterThread aThread[1]; }; /* @@ -572,7 +566,7 @@ static int vdbeSorterDoCompare( iRes = i1; }else{ int res; - assert( pThread->pUnpacked!=0 ); /* allocated in vdbeSorterMerge() */ + assert( pThread->pUnpacked!=0 ); /* allocated in vdbeSorterThreadMain() */ vdbeSorterCompare( pThread, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res ); @@ -597,21 +591,26 @@ int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){ VdbeSorter *pSorter; /* The new sorter */ KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */ int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */ + int sz; /* Size of pSorter in bytes */ int rc = SQLITE_OK; + int nWorker = (sqlite3GlobalConfig.bCoreMutex?sqlite3GlobalConfig.nWorker:0); assert( pCsr->pKeyInfo && pCsr->pBt==0 ); szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*); - pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sizeof(VdbeSorter)+szKeyInfo); + sz = sizeof(VdbeSorter) + nWorker * sizeof(SorterThread); + + pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo); pCsr->pSorter = pSorter; if( pSorter==0 ){ rc = SQLITE_NOMEM; }else{ - pKeyInfo = (KeyInfo*)&pSorter[1]; + pKeyInfo = (KeyInfo*)((u8*)pSorter + sz); memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo); pKeyInfo->db = 0; pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); - for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){ + pSorter->nThread = nWorker + 1; + for(i=0; i<pSorter->nThread; i++){ SorterThread *pThread = &pSorter->aThread[i]; pThread->pKeyInfo = pKeyInfo; pThread->pVfs = db->pVfs; @@ -674,10 +673,11 @@ static void vdbeSorterThreadCleanup(sqlite3 *db, SorterThread *pThread){ /* ** Join all threads. */ +#if SQLITE_MAX_WORKER_THREADS>0 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ int rc = rcin; int i; - for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){ + for(i=0; i<pSorter->nThread; i++){ SorterThread *pThread = &pSorter->aThread[i]; if( pThread->pThread ){ void *pRet; @@ -690,6 +690,9 @@ static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ } return rc; } +#else +# define vdbeSorterJoinAll(x,rcin) (rcin) +#endif /* ** Allocate a new SorterMerger object with space for nIter iterators. @@ -739,7 +742,7 @@ static void vdbeSorterMergerFree(SorterMerger *pMerger){ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ int i; vdbeSorterJoinAll(pSorter, SQLITE_OK); - for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){ + for(i=0; i<pSorter->nThread; i++){ SorterThread *pThread = &pSorter->aThread[i]; vdbeSorterThreadCleanup(db, pThread); } @@ -1246,8 +1249,9 @@ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ SorterThread *pThread; /* Thread context used to create new PMA */ pSorter->bUsePMA = 1; - for(i=0; ALWAYS( i<SQLITE_MAX_SORTER_THREAD ); i++){ + for(i=0; ALWAYS( i<pSorter->nThread ); i++){ pThread = &pSorter->aThread[i]; +#if SQLITE_MAX_WORKER_THREADS>0 if( pThread->bDone ){ void *pRet; assert( pThread->pThread ); @@ -1258,11 +1262,12 @@ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ rc = SQLITE_PTR_TO_INT(pRet); } } +#endif if( pThread->pThread==0 ) break; } if( rc==SQLITE_OK ){ - int bUseFg = (bFg || i==(SQLITE_MAX_SORTER_THREAD-1)); + int bUseFg = (bFg || i==(pSorter->nThread-1)); assert( pThread->pThread==0 && pThread->bDone==0 ); pThread->eWork = SORTER_THREAD_TO_PMA; @@ -1277,6 +1282,7 @@ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ pSorter->aMemory = aMem; } +#if SQLITE_MAX_WORKER_THREADS>0 if( bUseFg==0 ){ /* Launch a background thread for this operation */ void *pCtx = (void*)pThread; @@ -1290,7 +1296,9 @@ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ } } rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSorterThreadMain, pCtx); - }else{ + }else +#endif + { /* Use the foreground thread for this operation */ u8 *aMem; rc = vdbeSorterRunThread(pThread); @@ -1370,7 +1378,9 @@ int sqlite3VdbeSorterWrite( aNew = sqlite3Realloc(pSorter->aMemory, nNew); if( !aNew ) return SQLITE_NOMEM; - pSorter->pRecord = aNew + ((u8*)pSorter->pRecord - pSorter->aMemory); + pSorter->pRecord = (SorterRecord*)( + aNew + ((u8*)pSorter->pRecord - pSorter->aMemory) + ); pSorter->aMemory = aNew; pSorter->nMemory = nNew; } @@ -1399,7 +1409,7 @@ int sqlite3VdbeSorterWrite( static int vdbeSorterCountPMA(VdbeSorter *pSorter){ int nPMA = 0; int i; - for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){ + for(i=0; i<pSorter->nThread; i++){ nPMA += pSorter->aThread[i].nPMA; } return nPMA; @@ -1446,19 +1456,21 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge ** some of them together so that this is no longer the case. */ - assert( SORTER_MAX_MERGE_COUNT>=SQLITE_MAX_SORTER_THREAD ); if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){ int i; - for(i=0; rc==SQLITE_OK && i<SQLITE_MAX_SORTER_THREAD; i++){ + for(i=0; rc==SQLITE_OK && i<pSorter->nThread; i++){ SorterThread *pThread = &pSorter->aThread[i]; if( pThread->pTemp1 ){ - pThread->nConsolidate = SORTER_MAX_MERGE_COUNT/SQLITE_MAX_SORTER_THREAD; + pThread->nConsolidate = SORTER_MAX_MERGE_COUNT/pSorter->nThread; pThread->eWork = SORTER_THREAD_CONS; - if( i<(SQLITE_MAX_SORTER_THREAD-1) ){ +#if SQLITE_MAX_WORKER_THREADS>0 + if( i<(pSorter->nThread-1) ){ void *pCtx = (void*)pThread; rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSorterThreadMain,pCtx); - }else{ + }else +#endif + { rc = vdbeSorterRunThread(pThread); } } @@ -1475,7 +1487,7 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ int nIter = 0; /* Number of iterators used */ int i; SorterMerger *pMerger; - for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){ + for(i=0; i<pSorter->nThread; i++){ nIter += pSorter->aThread[i].nPMA; } @@ -1485,7 +1497,7 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ }else{ int iIter = 0; int iThread = 0; - for(iThread=0; iThread<SQLITE_MAX_SORTER_THREAD; iThread++){ + for(iThread=0; iThread<pSorter->nThread; iThread++){ int iPMA; i64 iReadOff = 0; SorterThread *pThread = &pSorter->aThread[iThread]; |