diff options
Diffstat (limited to 'src/test_async.c')
-rw-r--r-- | src/test_async.c | 104 |
1 files changed, 74 insertions, 30 deletions
diff --git a/src/test_async.c b/src/test_async.c index 376058356..422a0007b 100644 --- a/src/test_async.c +++ b/src/test_async.c @@ -244,6 +244,8 @@ static struct TestAsyncStaticData { volatile int ioDelay; /* Extra delay between write operations */ volatile int writerHaltWhenIdle; /* Writer thread halts when queue empty */ volatile int writerHaltNow; /* Writer thread halts after next op */ + int ioError; /* True if an IO error has occured */ + int nFile; /* Number of open files (from sqlite pov) */ } async = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER, @@ -332,7 +334,6 @@ struct AsyncWrite { */ struct AsyncFile { IoMethod *pMethod; /* Must be first */ - int ioError; /* Value of any asychronous error we have seen */ i64 iOffset; /* Current seek() offset in file */ char *zName; /* Underlying OS filename - used for debugging */ int nName; /* Number of characters in zName */ @@ -366,6 +367,13 @@ static void addAsyncWrite(AsyncWrite *pWrite){ TRACE(("PUSH %p (%s %s)\n", pWrite, azOpcodeName[pWrite->op], pWrite->pFile ? pWrite->pFile->zName : "-")); + if( pWrite->op==ASYNC_CLOSE ){ + async.nFile--; + if( async.nFile==0 ){ + async.ioError = SQLITE_OK; + } + } + /* Drop the queue mutex */ pthread_mutex_unlock(&async.queueMutex); @@ -375,6 +383,19 @@ static void addAsyncWrite(AsyncWrite *pWrite){ } /* +** Increment async.nFile in a thread-safe manner. +*/ +static void incrOpenFileCount(){ + /* We must hold the queue mutex in order to modify async.nFile */ + pthread_mutex_lock(&async.queueMutex); + if( async.nFile==0 ){ + async.ioError = SQLITE_OK; + } + async.nFile++; + pthread_mutex_unlock(&async.queueMutex); +} + +/* ** This is a utility function to allocate and populate a new AsyncWrite ** structure and insert it (via addAsyncWrite() ) into the global list. */ @@ -386,8 +407,8 @@ static int addNewAsyncWrite( const char *zByte ){ AsyncWrite *p; - if( pFile && pFile->ioError!=SQLITE_OK ){ - return pFile->ioError; + if( op!=ASYNC_CLOSE && async.ioError ){ + return async.ioError; } p = sqlite3OsMalloc(sizeof(AsyncWrite) + (zByte?nByte:0)); if( !p ){ @@ -484,8 +505,8 @@ static int asyncRead(OsFile *id, void *obuf, int amt){ /* If an I/O error has previously occurred on this file, then all ** subsequent operations fail. */ - if( pFile->ioError!=SQLITE_OK ){ - return pFile->ioError; + if( async.ioError!=SQLITE_OK ){ + return async.ioError; } /* Grab the write queue mutex for the duration of the call */ @@ -708,7 +729,6 @@ static int asyncOpenFile( p->pMethod = &iomethod; p->pBaseRead = pBaseRead; p->pBaseWrite = pBaseWrite; - p->ioError = SQLITE_OK; *pFile = (OsFile *)p; return SQLITE_OK; @@ -738,6 +758,9 @@ static int asyncOpenExclusive(const char *z, OsFile **ppFile, int delFlag){ *ppFile = 0; } } + if( rc==SQLITE_OK ){ + incrOpenFileCount(); + } return rc; } static int asyncOpenReadOnly(const char *z, OsFile **ppFile){ @@ -746,6 +769,9 @@ static int asyncOpenReadOnly(const char *z, OsFile **ppFile){ if( rc==SQLITE_OK ){ rc = asyncOpenFile(z, ppFile, pBase, 0); } + if( rc==SQLITE_OK ){ + incrOpenFileCount(); + } return rc; } static int asyncOpenReadWrite(const char *z, OsFile **ppFile, int *pReadOnly){ @@ -754,6 +780,9 @@ static int asyncOpenReadWrite(const char *z, OsFile **ppFile, int *pReadOnly){ if( rc==SQLITE_OK ){ rc = asyncOpenFile(z, ppFile, pBase, (*pReadOnly ? 0 : 1)); } + if( rc==SQLITE_OK ){ + incrOpenFileCount(); + } return rc; } @@ -869,16 +898,17 @@ static void asyncEnable(int enable){ static void *asyncWriterThread(void *NotUsed){ AsyncWrite *p = 0; int rc = SQLITE_OK; + int holdingMutex = 0; if( pthread_mutex_trylock(&async.writerMutex) ){ return 0; } while( async.writerHaltNow==0 ){ - int holdingMutex; OsFile *pBase = 0; - pthread_mutex_lock(&async.queueMutex); - holdingMutex = 1; + if( !holdingMutex ){ + pthread_mutex_lock(&async.queueMutex); + } while( (p = async.pQueueFirst)==0 ){ pthread_cond_broadcast(&async.emptySignal); if( async.writerHaltWhenIdle ){ @@ -891,6 +921,7 @@ static void *asyncWriterThread(void *NotUsed){ } } if( p==0 ) break; + holdingMutex = 1; /* Right now this thread is holding the mutex on the write-op queue. ** Variable 'p' points to the first entry in the write-op queue. In @@ -911,11 +942,11 @@ static void *asyncWriterThread(void *NotUsed){ ** SQLITE_ASYNC_TWO_FILEHANDLES was set at compile time and two ** file-handles are open for the particular file being "synced". */ + if( async.ioError!=SQLITE_OK && p->op!=ASYNC_CLOSE ){ + p->op = ASYNC_NOOP; + } if( p->pFile ){ pBase = p->pFile->pBaseWrite; - if( p->pFile->ioError!=SQLITE_OK && p->op!=ASYNC_CLOSE ){ - p->op = ASYNC_NOOP; - } if( p->op==ASYNC_CLOSE || p->op==ASYNC_OPENEXCLUSIVE || @@ -1003,19 +1034,6 @@ static void *asyncWriterThread(void *NotUsed){ default: assert(!"Illegal value for AsyncWrite.op"); } - /* If an error happens, store the error code in the pFile.ioError - ** field. This will prevent any future operations on that file, - ** other than closing it. - ** - ** We cannot report the error back to the connection that requested - ** the I/O since the error happened asynchronously. The connection has - ** already moved on. There really is nobody to report the error to. - */ - if( rc!=SQLITE_OK ){ - p->pFile->ioError = rc; - rc = SQLITE_OK; - } - /* If we didn't hang on to the mutex during the IO op, obtain it now ** so that the AsyncWrite structure can be safely removed from the ** global write-op queue. @@ -1032,16 +1050,42 @@ static void *asyncWriterThread(void *NotUsed){ sqlite3OsFree(p); assert( holdingMutex ); + /* An IO error has occured. We cannot report the error back to the + ** connection that requested the I/O since the error happened + ** asynchronously. The connection has already moved on. There + ** really is nobody to report the error to. + ** + ** The file for which the error occured may have been a database or + ** journal file. Regardless, none of the currently queued operations + ** associated with the same database should now be performed. Nor should + ** any subsequently requested IO on either a database or journal file + ** handle for the same database be accepted until the main database + ** file handle has been closed and reopened. + ** + ** Furthermore, no further IO should be queued or performed on any file + ** handle associated with a database that may have been part of a + ** multi-file transaction that included the database associated with + ** the IO error (i.e. a database ATTACHed to the same handle at some + ** point in time). + */ + if( rc!=SQLITE_OK ){ + async.ioError = rc; + } + /* Drop the queue mutex before continuing to the next write operation ** in order to give other threads a chance to work with the write queue. */ - pthread_mutex_unlock(&async.queueMutex); - if( async.ioDelay>0 ){ - sqlite3OsSleep(async.ioDelay); - }else{ - sched_yield(); + if( !async.pQueueFirst || !async.ioError ){ + pthread_mutex_unlock(&async.queueMutex); + holdingMutex = 0; + if( async.ioDelay>0 ){ + sqlite3OsSleep(async.ioDelay); + }else{ + sched_yield(); + } } } + pthread_mutex_unlock(&async.writerMutex); return 0; } |