about summary refs log tree commit diff
path: root/src/wal.c
diff options
context:
space:
mode:
author dan <dan@noemail.net> 2020-07-25 20:16:27 +0000
committer dan <dan@noemail.net> 2020-07-25 20:16:27 +0000
commit d3e38b7c0ea40acedc5fc3d95bb86baebd77cb82 (patch)
tree 111f99aa4f0cc2e58a3bba749d1884f68401024d /src/wal.c
parent 8adc8f1ecb394f2780bfcc1507671424c753e6fa (diff)
download sqlite-d3e38b7c0ea40acedc5fc3d95bb86baebd77cb82.tar.gz
sqlite-d3e38b7c0ea40acedc5fc3d95bb86baebd77cb82.zip
Allow a wal mode recovery to proceed even if there are readers.
FossilOrigin-Name: 74374aebf9abf3d6b6a3920967a079ceaa4c6276dc6c177682742c2be405b7b7
Diffstat (limited to 'src/wal.c')
-rw-r--r-- src/wal.c 98
1 files changed, 61 insertions, 37 deletions
diff --git a/src/wal.c b/src/wal.c
index c6d4476e7..9ad0b496e 100644
--- a/src/wal.c
+++ b/src/wal.c
@@ -1161,12 +1161,6 @@ static int walIndexRecover(Wal *pWal){
assert( pWal->writeLock );
iLock = WAL_ALL_BUT_WRITE + pWal->ckptLock;
rc = walLockExclusive(pWal, iLock, WAL_READ_LOCK(0)-iLock);
- if( rc==SQLITE_OK ){
- rc = walLockExclusive(pWal, WAL_READ_LOCK(1), WAL_NREADER-1);
- if( rc!=SQLITE_OK ){
- walUnlockExclusive(pWal, iLock, WAL_READ_LOCK(0)-iLock);
- }
- }
if( rc ){
return rc;
}
@@ -1182,15 +1176,16 @@ static int walIndexRecover(Wal *pWal){
if( nSize>WAL_HDRSIZE ){
u8 aBuf[WAL_HDRSIZE]; /* Buffer to load WAL header into */
+ u32 *aPrivate = 0; /* Heap copy of *-shm hash being populated */
u8 *aFrame = 0; /* Malloc'd buffer to load entire frame */
int szFrame; /* Number of bytes in buffer aFrame[] */
u8 *aData; /* Pointer to data part of aFrame buffer */
- int iFrame; /* Index of last frame read */
- i64 iOffset; /* Next offset to read from log file */
int szPage; /* Page size according to the log */
u32 magic; /* Magic value read from WAL header */
u32 version; /* Magic value read from WAL header */
int isValid; /* True if this frame is valid */
+ int iPg; /* Current 32KB wal-index page */
+ int iLastFrame; /* Last frame in wal, based on nSize alone */
/* Read in the WAL header. */
rc = sqlite3OsRead(pWal->pWalFd, aBuf, WAL_HDRSIZE, 0);
@@ -1237,38 +1232,59 @@ static int walIndexRecover(Wal *pWal){
/* Malloc a buffer to read frames into. */
szFrame = szPage + WAL_FRAME_HDRSIZE;
- aFrame = (u8 *)sqlite3_malloc64(szFrame);
+ aFrame = (u8 *)sqlite3_malloc64(szFrame + WALINDEX_PGSZ);
if( !aFrame ){
rc = SQLITE_NOMEM_BKPT;
goto recovery_error;
}
aData = &aFrame[WAL_FRAME_HDRSIZE];
+ aPrivate = (u32*)&aData[szPage];
/* Read all frames from the log file. */
- iFrame = 0;
- for(iOffset=WAL_HDRSIZE; (iOffset+szFrame)<=nSize; iOffset+=szFrame){
- u32 pgno; /* Database page number for frame */
- u32 nTruncate; /* dbsize field from frame header */
-
- /* Read and decode the next log frame. */
- iFrame++;
- rc = sqlite3OsRead(pWal->pWalFd, aFrame, szFrame, iOffset);
- if( rc!=SQLITE_OK ) break;
- isValid = walDecodeFrame(pWal, &pgno, &nTruncate, aData, aFrame);
- if( !isValid ) break;
- rc = walIndexAppend(pWal, iFrame, pgno);
- if( rc!=SQLITE_OK ) break;
-
- /* If nTruncate is non-zero, this is a commit record. */
- if( nTruncate ){
- pWal->hdr.mxFrame = iFrame;
- pWal->hdr.nPage = nTruncate;
- pWal->hdr.szPage = (u16)((szPage&0xff00) | (szPage>>16));
- testcase( szPage<=32768 );
- testcase( szPage>=65536 );
- aFrameCksum[0] = pWal->hdr.aFrameCksum[0];
- aFrameCksum[1] = pWal->hdr.aFrameCksum[1];
+ iLastFrame = (nSize - WAL_HDRSIZE) / szFrame;
+ for(iPg=0; iPg<=walFramePage(iLastFrame); iPg++){
+ u32 *aShare;
+ int iFrame; /* Index of last frame read */
+ int iLast = MIN(iLastFrame, HASHTABLE_NPAGE_ONE+iPg*HASHTABLE_NPAGE);
+ int iFirst = 1 + (iPg==0?0:HASHTABLE_NPAGE_ONE+(iPg-1)*HASHTABLE_NPAGE);
+ rc = walIndexPage(pWal, iPg, (volatile u32**)&aShare);
+ if( rc ) break;
+ pWal->apWiData[iPg] = aPrivate;
+
+ for(iFrame=iFirst; iFrame<=iLast; iFrame++){
+ i64 iOffset = walFrameOffset(iFrame, szPage);
+ u32 pgno; /* Database page number for frame */
+ u32 nTruncate; /* dbsize field from frame header */
+
+ /* Read and decode the next log frame. */
+ rc = sqlite3OsRead(pWal->pWalFd, aFrame, szFrame, iOffset);
+ if( rc!=SQLITE_OK ) break;
+ isValid = walDecodeFrame(pWal, &pgno, &nTruncate, aData, aFrame);
+ if( !isValid ) break;
+ rc = walIndexAppend(pWal, iFrame, pgno);
+ if( rc!=SQLITE_OK ) break;
+
+ /* If nTruncate is non-zero, this is a commit record. */
+ if( nTruncate ){
+ pWal->hdr.mxFrame = iFrame;
+ pWal->hdr.nPage = nTruncate;
+ pWal->hdr.szPage = (u16)((szPage&0xff00) | (szPage>>16));
+ testcase( szPage<=32768 );
+ testcase( szPage>=65536 );
+ aFrameCksum[0] = pWal->hdr.aFrameCksum[0];
+ aFrameCksum[1] = pWal->hdr.aFrameCksum[1];
+ }
}
+ pWal->apWiData[iPg] = aShare;
+
+ {
+ int nHdr = (iPg==0 ? WALINDEX_HDR_SIZE : 0);
+ int nHdr32 = nHdr / sizeof(u32);
+ if( memcpy(&aShare[nHdr32], &aPrivate[nHdr32], WALINDEX_PGSZ-nHdr) ){
+ memcpy(&aShare[nHdr32], &aPrivate[nHdr32], WALINDEX_PGSZ-nHdr);
+ }
+ }
+ if( iFrame<=iLast ) break;
}
sqlite3_free(aFrame);
@@ -1283,15 +1299,24 @@ finished:
walIndexWriteHdr(pWal);
/* Reset the checkpoint-header. This is safe because this thread is
- ** currently holding locks that exclude all other readers, writers and
- ** checkpointers.
+ ** currently holding locks that exclude all other writers and
+ ** checkpointers. Then set the values of read-mark slots 1 through N.
*/
pInfo = walCkptInfo(pWal);
pInfo->nBackfill = 0;
pInfo->nBackfillAttempted = pWal->hdr.mxFrame;
pInfo->aReadMark[0] = 0;
- for(i=1; i<WAL_NREADER; i++) pInfo->aReadMark[i] = READMARK_NOT_USED;
- if( pWal->hdr.mxFrame ) pInfo->aReadMark[1] = pWal->hdr.mxFrame;
+ for(i=1; i<WAL_NREADER; i++){
+ rc = walLockExclusive(pWal, WAL_READ_LOCK(i), 1);
+ if( rc==SQLITE_OK ){
+ if( i==1 && pWal->hdr.mxFrame ){
+ pInfo->aReadMark[i] = pWal->hdr.mxFrame;
+ }else{
+ pInfo->aReadMark[i] = READMARK_NOT_USED;
+ }
+ walUnlockExclusive(pWal, WAL_READ_LOCK(i), 1);
+ }
+ }
/* If more than one frame was recovered from the log file, report an
** event via sqlite3_log(). This is to help with identifying performance
@@ -1309,7 +1334,6 @@ finished:
recovery_error:
WALTRACE(("WAL%p: recovery %s\n", pWal, rc ? "failed" : "ok"));
walUnlockExclusive(pWal, iLock, WAL_READ_LOCK(0)-iLock);
- walUnlockExclusive(pWal, WAL_READ_LOCK(1), WAL_NREADER-1);
return rc;
}