aboutsummaryrefslogtreecommitdiff
path: root/ext/wasm
diff options
context:
space:
mode:
Diffstat (limited to 'ext/wasm')
-rw-r--r--ext/wasm/api/EXPORTED_FUNCTIONS.sqlite3-api1
-rw-r--r--ext/wasm/api/extern-post-js.js3
-rw-r--r--ext/wasm/api/sqlite3-api-opfs.js12
-rw-r--r--ext/wasm/api/sqlite3-api-prologue.js1
-rw-r--r--ext/wasm/api/sqlite3-opfs-async-proxy.js141
-rw-r--r--ext/wasm/index.html3
-rw-r--r--ext/wasm/tests/opfs/concurrency/index.html34
-rw-r--r--ext/wasm/tests/opfs/concurrency/test.js97
-rw-r--r--ext/wasm/tests/opfs/concurrency/worker.js95
9 files changed, 326 insertions, 61 deletions
diff --git a/ext/wasm/api/EXPORTED_FUNCTIONS.sqlite3-api b/ext/wasm/api/EXPORTED_FUNCTIONS.sqlite3-api
index b903bedee..1f7908e3b 100644
--- a/ext/wasm/api/EXPORTED_FUNCTIONS.sqlite3-api
+++ b/ext/wasm/api/EXPORTED_FUNCTIONS.sqlite3-api
@@ -7,6 +7,7 @@ _sqlite3_bind_null
_sqlite3_bind_parameter_count
_sqlite3_bind_parameter_index
_sqlite3_bind_text
+_sqlite3_busy_timeout
_sqlite3_changes
_sqlite3_changes64
_sqlite3_clear_bindings
diff --git a/ext/wasm/api/extern-post-js.js b/ext/wasm/api/extern-post-js.js
index cace6ed51..b32783781 100644
--- a/ext/wasm/api/extern-post-js.js
+++ b/ext/wasm/api/extern-post-js.js
@@ -59,6 +59,9 @@ const toExportForES6 =
li.pop();
initModuleState.sqlite3Dir = li.join('/') + '/';
}
+ if(initModuleState.sqlite3Dir){
+ initModuleState.sqlite3Dir = initModuleState.sqlite3Dir.replace(/[/]{2,}/g,'/');
+ }
self.sqlite3InitModule = (...args)=>{
//console.warn("Using replaced sqlite3InitModule()",self.location);
diff --git a/ext/wasm/api/sqlite3-api-opfs.js b/ext/wasm/api/sqlite3-api-opfs.js
index a3f73cc7b..1fd50dcc6 100644
--- a/ext/wasm/api/sqlite3-api-opfs.js
+++ b/ext/wasm/api/sqlite3-api-opfs.js
@@ -92,7 +92,8 @@ const installOpfsVfs = function callee(options){
}
const urlParams = new URL(self.location.href).searchParams;
if(undefined===options.verbose){
- options.verbose = urlParams.has('opfs-verbose') ? 3 : 2;
+ options.verbose = urlParams.has('opfs-verbose')
+ ? (+urlParams.get('opfs-verbose') || 2) : 1;
}
if(undefined===options.sanityChecks){
options.sanityChecks = urlParams.has('opfs-sanity-check');
@@ -101,6 +102,8 @@ const installOpfsVfs = function callee(options){
options.proxyUri = callee.defaultProxyUri;
}
+ //console.warn("OPFS options =",options,self.location);
+
if('function' === typeof options.proxyUri){
options.proxyUri = options.proxyUri();
}
@@ -1154,7 +1157,10 @@ const installOpfsVfs = function callee(options){
[
/* Truncate journal mode is faster than delete or wal for
this vfs, per speedtest1. */
- "pragma journal_mode=truncate;"
+ "pragma journal_mode=truncate;",
+ /* Set a default busy-timeout handler to help OPFS dbs
+ deal with multi-tab/multi-worker contention. */
+ "pragma busy_timeout=2000;",
/*
This vfs benefits hugely from cache on moderate/large
speedtest1 --size 50 and --size 100 workloads. We currently
@@ -1162,7 +1168,7 @@ const installOpfsVfs = function callee(options){
sqlite3.wasm. If that policy changes, the cache can
be set here.
*/
- //"pragma cache_size=-8388608;"
+ //"pragma cache_size=-16384;"
].join("")
);
}
diff --git a/ext/wasm/api/sqlite3-api-prologue.js b/ext/wasm/api/sqlite3-api-prologue.js
index fed1c5666..8b2ce0936 100644
--- a/ext/wasm/api/sqlite3-api-prologue.js
+++ b/ext/wasm/api/sqlite3-api-prologue.js
@@ -897,6 +897,7 @@ self.sqlite3ApiBootstrap = function sqlite3ApiBootstrap(
the lines of sqlite3_prepare_v3(). The slightly problematic
part is the final argument (text destructor). */
],
+ ["sqlite3_busy_timeout","int", "sqlite3*", "int"],
["sqlite3_close_v2", "int", "sqlite3*"],
["sqlite3_changes", "int", "sqlite3*"],
["sqlite3_clear_bindings","int", "sqlite3_stmt*"],
diff --git a/ext/wasm/api/sqlite3-opfs-async-proxy.js b/ext/wasm/api/sqlite3-opfs-async-proxy.js
index e4657484e..3701e8c30 100644
--- a/ext/wasm/api/sqlite3-opfs-async-proxy.js
+++ b/ext/wasm/api/sqlite3-opfs-async-proxy.js
@@ -53,7 +53,7 @@ const state = Object.create(null);
2 = warnings and errors
3 = debug, warnings, and errors
*/
-state.verbose = 2;
+state.verbose = 1;
const loggers = {
0:console.error.bind(console),
@@ -151,6 +151,57 @@ const getDirForFilename = async function f(absFilename, createDirs = false){
};
/**
+ If the given file-holding object has a sync handle attached to it,
+ that handle is removed and asynchronously closed. Though it may
+ sound sensible to continue work as soon as the close() returns
+ (noting that it's asynchronous), doing so can cause operations
+ performed soon afterwards, e.g. a call to getSyncHandle() to fail
+ because they may happen out of order from the close(). OPFS does
+ not guarantee that the actual order of operations is retained in
+ such cases. i.e. always "await" on the result of this function.
+*/
+const closeSyncHandle = async (fh)=>{
+ if(fh.syncHandle){
+ log("Closing sync handle for",fh.filenameAbs);
+ const h = fh.syncHandle;
+ delete fh.syncHandle;
+ delete fh.xLock;
+ __autoLocks.delete(fh.fid);
+ return h.close();
+ }
+};
+
+/**
+ A proxy for closeSyncHandle() which is guaranteed to not throw.
+
+ This function is part of a lock/unlock step in functions which
+ require a sync access handle but may be called without xLock()
+ having been called first. Such calls need to release that
+ handle to avoid locking the file for all of time. This is an
+ _attempt_ at reducing cross-tab contention but it may prove
+ to be more of a problem than a solution and may need to be
+ removed.
+*/
+const closeSyncHandleNoThrow = async (fh)=>{
+ try{await closeSyncHandle(fh)}
+ catch(e){
+ warn("closeSyncHandleNoThrow() ignoring:",e,fh);
+ }
+};
+
+/* Release all auto-locks. */
+const closeAutoLocks = async ()=>{
+ if(__autoLocks.size){
+ /* Release all auto-locks. */
+ for(const fid of __autoLocks){
+ const fh = __openFiles[fid];
+ await closeSyncHandleNoThrow(fh);
+ log("Auto-unlocked",fid,fh.filenameAbs);
+ }
+ }
+};
+
+/**
An error class specifically for use with getSyncHandle(), the goal
of which is to eventually be able to distinguish unambiguously
between locking-related failures and other types, noting that we
@@ -168,7 +219,25 @@ class GetSyncHandleError extends Error {
this.name = 'GetSyncHandleError';
}
};
-
+GetSyncHandleError.convertRc = (e,rc)=>{
+ if(1){
+ /* This approach returns SQLITE_LOCKED to the C API
+ when getSyncHandle() fails but makes the very
+ wild assumption that such a failure _is_ a locking
+ error. In practice that appears to be the most
+ common error, by far, but we cannot unambiguously
+ distinguish that from other errors.
+
+ This approach demonstrably reduces concurrency-related
+ errors but is highly questionable.
+ */
+ return (e instanceof GetSyncHandleError)
+ ? state.sq3Codes.SQLITE_LOCKED
+ : rc;
+ }else{
+ return rc;
+ }
+}
/**
Returns the sync access handle associated with the given file
handle object (which must be a valid handle object, as created by
@@ -201,7 +270,8 @@ const getSyncHandle = async (fh)=>{
);
}
warn("Error getting sync handle. Waiting",ms,
- "ms and trying again.",fh.filenameAbs,e);
+ "ms and trying again.",fh.filenameAbs,e);
+ //await closeAutoLocks();
Atomics.wait(state.sabOPView, state.opIds.retry, 0, ms);
}
}
@@ -215,45 +285,6 @@ const getSyncHandle = async (fh)=>{
};
/**
- If the given file-holding object has a sync handle attached to it,
- that handle is remove and asynchronously closed. Though it may
- sound sensible to continue work as soon as the close() returns
- (noting that it's asynchronous), doing so can cause operations
- performed soon afterwards, e.g. a call to getSyncHandle() to fail
- because they may happen out of order from the close(). OPFS does
- not guaranty that the actual order of operations is retained in
- such cases. i.e. always "await" on the result of this function.
-*/
-const closeSyncHandle = async (fh)=>{
- if(fh.syncHandle){
- log("Closing sync handle for",fh.filenameAbs);
- const h = fh.syncHandle;
- delete fh.syncHandle;
- delete fh.xLock;
- __autoLocks.delete(fh.fid);
- return h.close();
- }
-};
-
-/**
- A proxy for closeSyncHandle() which is guaranteed to not throw.
-
- This function is part of a lock/unlock step in functions which
- require a sync access handle but may be called without xLock()
- having been called first. Such calls need to release that
- handle to avoid locking the file for all of time. This is an
- _attempt_ at reducing cross-tab contention but it may prove
- to be more of a problem than a solution and may need to be
- removed.
-*/
-const closeSyncHandleNoThrow = async (fh)=>{
- try{await closeSyncHandle(fh)}
- catch(e){
- warn("closeSyncHandleNoThrow() ignoring:",e,fh);
- }
-};
-
-/**
Stores the given value at state.sabOPView[state.opIds.rc] and then
Atomics.notify()'s it.
*/
@@ -451,7 +482,7 @@ const vfsAsyncImpls = {
rc = 0;
}catch(e){
state.s11n.storeException(2,e);
- rc = state.sq3Codes.SQLITE_IOERR;
+ rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR);
}
wTimeEnd();
storeAndNotify('xFileSize', rc);
@@ -471,7 +502,7 @@ const vfsAsyncImpls = {
__autoLocks.delete(fid);
}catch(e){
state.s11n.storeException(1,e);
- rc = state.sq3Codes.SQLITE_IOERR_LOCK;
+ rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR_LOCK);
fh.xLock = oldLockType;
}
wTimeEnd();
@@ -545,7 +576,7 @@ const vfsAsyncImpls = {
if(undefined===nRead) wTimeEnd();
error("xRead() failed",e,fh);
state.s11n.storeException(1,e);
- rc = state.sq3Codes.SQLITE_IOERR_READ;
+ rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR_READ);
}
storeAndNotify('xRead',rc);
mTimeEnd();
@@ -579,7 +610,7 @@ const vfsAsyncImpls = {
}catch(e){
error("xTruncate():",e,fh);
state.s11n.storeException(2,e);
- rc = state.sq3Codes.SQLITE_IOERR_TRUNCATE;
+ rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR_TRUNCATE);
}
wTimeEnd();
storeAndNotify('xTruncate',rc);
@@ -619,7 +650,7 @@ const vfsAsyncImpls = {
}catch(e){
error("xWrite():",e,fh);
state.s11n.storeException(1,e);
- rc = state.sq3Codes.SQLITE_IOERR_WRITE;
+ rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR_WRITE);
}
wTimeEnd();
storeAndNotify('xWrite',rc);
@@ -746,22 +777,16 @@ const waitLoop = async function f(){
/**
waitTime is how long (ms) to wait for each Atomics.wait().
We need to wake up periodically to give the thread a chance
- to do other things.
+ to do other things. If this is too high (e.g. 500ms) then
+ even two workers/tabs can easily run into locking errors.
*/
- const waitTime = 500;
+ const waitTime = 150;
while(!flagAsyncShutdown){
try {
if('timed-out'===Atomics.wait(
state.sabOPView, state.opIds.whichOp, 0, waitTime
)){
- if(__autoLocks.size){
- /* Release all auto-locks. */
- for(const fid of __autoLocks){
- const fh = __openFiles[fid];
- await closeSyncHandleNoThrow(fh);
- log("Auto-unlocked",fid,fh.filenameAbs);
- }
- }
+ await closeAutoLocks();
continue;
}
const opId = Atomics.load(state.sabOPView, state.opIds.whichOp);
@@ -791,7 +816,7 @@ navigator.storage.getDirectory().then(function(d){
const opt = data.args;
state.littleEndian = opt.littleEndian;
state.asyncS11nExceptions = opt.asyncS11nExceptions;
- state.verbose = opt.verbose ?? 2;
+ state.verbose = opt.verbose ?? 1;
state.fileBufferSize = opt.fileBufferSize;
state.sabS11nOffset = opt.sabS11nOffset;
state.sabS11nSize = opt.sabS11nSize;
diff --git a/ext/wasm/index.html b/ext/wasm/index.html
index 37d66603f..9fa5bbdf4 100644
--- a/ext/wasm/index.html
+++ b/ext/wasm/index.html
@@ -104,6 +104,9 @@
synchronous sqlite3_vfs interface and the async OPFS
impl.
</li>
+ <li><a href='tests/opfs/concurrency/index.html'>OPFS concurrency</a>
+ tests using multiple workers.
+ </li>
</ul>
</li>
<!--li><a href='x.html'></a></li-->
diff --git a/ext/wasm/tests/opfs/concurrency/index.html b/ext/wasm/tests/opfs/concurrency/index.html
new file mode 100644
index 000000000..79a46692c
--- /dev/null
+++ b/ext/wasm/tests/opfs/concurrency/index.html
@@ -0,0 +1,34 @@
+<!doctype html>
+<html lang="en-us">
+ <head>
+ <meta charset="utf-8">
+ <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
+ <link rel="shortcut icon" href="data:image/x-icon;," type="image/x-icon">
+ <link rel="stylesheet" href="../../../common/testing.css"/>
+ <title>sqlite3 OPFS Worker concurrency tester</title>
+ <style>
+ body { display: revert; }
+ body > * {}
+ #test-output {
+ font-family: monospace;
+ }
+ </style>
+ </head>
+ <body>
+ <h1></h1>
+ <p>
+ OPFS concurrency tester using multiple independent Workers.
+ This app is incomplete.
+ </p>
+ <div class='input-wrapper'>
+ <input type='checkbox' id='cb-log-reverse'>
+ <label for='cb-log-reverse'>Reverse log order?</label>
+ </div>
+ <div id='test-output'></div>
+ <script>(function(){
+ document.querySelector('h1').innerHTML =
+ document.querySelector('title').innerHTML;
+ })();</script>
+ <script src="test.js?sqlite3.dir=../../../jswasm"></script>
+ </body>
+</html>
diff --git a/ext/wasm/tests/opfs/concurrency/test.js b/ext/wasm/tests/opfs/concurrency/test.js
new file mode 100644
index 000000000..d045f3271
--- /dev/null
+++ b/ext/wasm/tests/opfs/concurrency/test.js
@@ -0,0 +1,97 @@
+(async function(self){
+
+ const logClass = (function(){
+ const mapToString = (v)=>{
+ switch(typeof v){
+ case 'number': case 'string': case 'boolean':
+ case 'undefined': case 'bigint':
+ return ''+v;
+ default: break;
+ }
+ if(null===v) return 'null';
+ if(v instanceof Error){
+ v = {
+ message: v.message,
+ stack: v.stack,
+ errorClass: v.name
+ };
+ }
+ return JSON.stringify(v,undefined,2);
+ };
+ const normalizeArgs = (args)=>args.map(mapToString);
+ const logTarget = document.querySelector('#test-output');
+ const logClass = function(cssClass,...args){
+ const ln = document.createElement('div');
+ if(cssClass){
+ for(const c of (Array.isArray(cssClass) ? cssClass : [cssClass])){
+ ln.classList.add(c);
+ }
+ }
+ ln.append(document.createTextNode(normalizeArgs(args).join(' ')));
+ logTarget.append(ln);
+ };
+ const cbReverse = document.querySelector('#cb-log-reverse');
+ const cbReverseKey = 'tester1:cb-log-reverse';
+ const cbReverseIt = ()=>{
+ logTarget.classList[cbReverse.checked ? 'add' : 'remove']('reverse');
+ localStorage.setItem(cbReverseKey, cbReverse.checked ? 1 : 0);
+ };
+ cbReverse.addEventListener('change', cbReverseIt, true);
+ if(localStorage.getItem(cbReverseKey)){
+ cbReverse.checked = !!(+localStorage.getItem(cbReverseKey));
+ }
+ cbReverseIt();
+ return logClass;
+ })();
+ const stdout = (...args)=>logClass('',...args);
+ const stderr = (...args)=>logClass('error',...args);
+
+ const wait = async (ms)=>{
+ return new Promise((resolve)=>setTimeout(resolve,ms));
+ };
+
+ const urlArgsJs = new URL(document.currentScript.src).searchParams;
+ const urlArgsHtml = new URL(self.location.href).searchParams;
+ const options = Object.create(null);
+ options.sqlite3Dir = urlArgsJs.get('sqlite3.dir');
+ options.workerCount = (
+ urlArgsHtml.has('workers') ? +urlArgsHtml.get('workers') : 3
+ ) || 3;
+ const workers = [];
+ workers.post = (type,...args)=>{
+ for(const w of workers) w.postMessage({type, payload:args});
+ };
+ workers.loadedCount = 0;
+ workers.onmessage = function(msg){
+ msg = msg.data;
+ const wName = msg.worker;
+ const prefix = 'Worker ['+wName+']:';
+ switch(msg.type){
+ case 'stdout': stdout(prefix,...msg.payload); break;
+ case 'stderr': stderr(prefix,...msg.payload); break;
+ case 'error': stderr(prefix,"ERROR:",...msg.payload); break;
+ case 'loaded':
+ stdout(prefix,"loaded");
+ if(++workers.loadedCount === workers.length){
+ stdout("All workers loaded. Telling them to run...");
+ workers.post('run');
+ }
+ break;
+ default: logClass('error',"Unhandled message type:",msg); break;
+ }
+ };
+
+ stdout("Launching",options.workerCount,"workers...");
+ workers.uri = (
+ 'worker.js?'
+ + 'sqlite3.dir='+options.sqlite3Dir
+ + '&opfs-verbose=2'
+ );
+ for(let i = 0; i < options.workerCount; ++i){
+ stdout("Launching worker...");
+ workers.push(new Worker(workers.uri+(i ? '' : '&unlink-db')));
+ }
+ // Have to delay onmessage assignment until after the loop
+ // to avoid that early workers get an undue head start.
+ workers.forEach((w)=>w.onmessage = workers.onmessage);
+})(self);
diff --git a/ext/wasm/tests/opfs/concurrency/worker.js b/ext/wasm/tests/opfs/concurrency/worker.js
new file mode 100644
index 000000000..7ba15bf8c
--- /dev/null
+++ b/ext/wasm/tests/opfs/concurrency/worker.js
@@ -0,0 +1,95 @@
+importScripts(
+ (new URL(self.location.href).searchParams).get('sqlite3.dir') + '/sqlite3.js'
+);
+self.sqlite3InitModule().then(async function(sqlite3){
+ const wName = Math.round(Math.random()*10000);
+ const wPost = (type,...payload)=>{
+ postMessage({type, worker: wName, payload});
+ };
+ const stdout = (...args)=>wPost('stdout',...args);
+ const stderr = (...args)=>wPost('stderr',...args);
+ const postErr = (...args)=>wPost('error',...args);
+ if(!sqlite3.opfs){
+ stderr("OPFS support not detected. Aborting.");
+ return;
+ }
+
+ const wait = async (ms)=>{
+ return new Promise((resolve)=>setTimeout(resolve,ms));
+ };
+
+ const dbName = 'concurrency-tester.db';
+ if((new URL(self.location.href).searchParams).has('unlink-db')){
+ await sqlite3.opfs.unlink(dbName);
+ stdout("Unlinked",dbName);
+ }
+ wPost('loaded');
+
+ const run = async function(){
+ const db = new sqlite3.opfs.OpfsDb(dbName);
+ //sqlite3.capi.sqlite3_busy_timeout(db.pointer, 2000);
+ db.transaction((db)=>{
+ db.exec([
+ "create table if not exists t1(w TEXT UNIQUE ON CONFLICT REPLACE,v);",
+ "create table if not exists t2(w TEXT UNIQUE ON CONFLICT REPLACE,v);"
+ ]);
+ });
+
+ const maxIterations = 10;
+ const interval = Object.assign(Object.create(null),{
+ delay: 300,
+ handle: undefined,
+ count: 0
+ });
+ stdout("Starting interval-based db updates with delay of",interval.delay,"ms.");
+ const doWork = async ()=>{
+ const tm = new Date().getTime();
+ ++interval.count;
+ const prefix = "v(#"+interval.count+")";
+ stdout("Setting",prefix,"=",tm);
+ try{
+ db.exec({
+ sql:"INSERT OR REPLACE INTO t1(w,v) VALUES(?,?)",
+ bind: [wName, new Date().getTime()]
+ });
+ //stdout("Set",prefix);
+ }catch(e){
+ interval.error = e;
+ }
+ };
+ const finish = ()=>{
+ if(interval.error) stderr("Ending work due to error:",interval.error.message);
+ else stdout("Ending work after",interval.count,"interval(s)");
+ db.close();
+ };
+ if(1){/*use setInterval()*/
+ interval.handle = setInterval(async ()=>{
+ await doWork();
+ if(interval.error || maxIterations === interval.count){
+ clearInterval(interval.handle);
+ finish();
+ }
+ }, interval.delay);
+ }else{
+ /*This approach provides no concurrency whatsoever: each worker
+ is run to completion before any others can work.*/
+ let i;
+ for(i = 0; i < maxIterations; ++i){
+ await doWork();
+ if(interval.error) break;
+ await wait(interval.delay);
+ }
+ finish();
+ }
+ }/*run()*/;
+
+ self.onmessage = function({data}){
+ switch(data.type){
+ case 'run': run().catch((e)=>postErr(e.message));
+ break;
+ default:
+ stderr("Unhandled message type '"+data.type+"'.");
+ break;
+ }
+ };
+});