diff options
Diffstat (limited to 'tool/sqlite3_rsync.c')
-rw-r--r-- | tool/sqlite3_rsync.c | 99 |
1 files changed, 59 insertions, 40 deletions
diff --git a/tool/sqlite3_rsync.c b/tool/sqlite3_rsync.c index bf8a8dfb9..62f751ad3 100644 --- a/tool/sqlite3_rsync.c +++ b/tool/sqlite3_rsync.c @@ -1298,12 +1298,12 @@ static void originSide(SQLiteRsync *p){ unsigned int nPage = 0; unsigned int iHash = 1; /* Pgno for next hash to receive */ unsigned int nHash = 1; /* Number of pages per hash received */ + unsigned int mxHash = 0; /* Maximum hash value received */ unsigned int lockBytePage = 0; unsigned int szPg = 0; sqlite3_stmt *pCkHash = 0; /* Verify hash on a single page */ sqlite3_stmt *pCkHashN = 0; /* Verify a multi-page hash */ sqlite3_stmt *pInsHash = 0; /* Record a bad hash */ - unsigned int nMulti = 0; /* Multi-page hashes not matched */ char buf[200]; p->isReplica = 0; @@ -1387,38 +1387,37 @@ static void originSide(SQLiteRsync *p){ break; } case REPLICA_HASH: { + int bMatch = 0; if( pCkHash==0 ){ runSql(p, "CREATE TEMP TABLE badHash(" " pgno INTEGER PRIMARY KEY," " sz INT)"); pCkHash = prepareStmt(p, - "SELECT pgno FROM sqlite_dbpage('main')" - " WHERE pgno=?1 AND hash(data)!=?3" + "SELECT hash(data)==?3 FROM sqlite_dbpage('main')" + " WHERE pgno=?1" ); if( pCkHash==0 ) break; pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?1,?2)"); if( pInsHash==0 ) break; } p->nHashSent++; - if( p->zDebugFile ){ - debugMessage(p, "<- REPLICA_HASH %u %u\n", iHash, nHash); - } + readBytes(p, 20, buf); if( nHash>1 ){ if( pCkHashN==0 ){ pCkHashN = prepareStmt(p, - "WITH a1(pgno) AS " - "(VALUES(?1) UNION ALL SELECT pgno+1 FROM a1 WHERE pgno<?2)" - "SELECT count(*) FROM a1 CROSS JOIN sqlite_dbpage('main')" - " USING(pgno)" - " HAVING agghash(hash(data))!=?3"); + "WITH c(n) AS " + " (VALUES(?1) UNION ALL SELECT n+1 FROM c WHERE n<?2)" + "SELECT agghash(hash(data))==?3" + " FROM c CROSS JOIN sqlite_dbpage('main') ON pgno=n" + ); if( pCkHashN==0 ) break; } sqlite3_bind_int64(pCkHashN, 1, iHash); - sqlite3_bind_int64(pCkHashN, 2, iHash + nHash); - readBytes(p, 20, buf); - sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC); + sqlite3_bind_int64(pCkHashN, 2, iHash + nHash - 1); + sqlite3_bind_blob(pCkHashN, 3, buf, 20, SQLITE_STATIC); + rc = sqlite3_step(pCkHashN); if( rc==SQLITE_ROW ){ - nMulti++; + bMatch = sqlite3_column_int(pCkHashN,0); }else if( rc==SQLITE_ERROR ){ reportError(p, "SQL statement [%s] failed: %s", sqlite3_sql(pCkHashN), sqlite3_errmsg(p->db)); @@ -1426,16 +1425,24 @@ static void originSide(SQLiteRsync *p){ sqlite3_reset(pCkHashN); }else{ sqlite3_bind_int64(pCkHash, 1, iHash); - readBytes(p, 20, buf); sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC); rc = sqlite3_step(pCkHash); if( rc==SQLITE_ERROR ){ reportError(p, "SQL statement [%s] failed: %s", sqlite3_sql(pCkHash), sqlite3_errmsg(p->db)); + }else if( rc==SQLITE_ROW && sqlite3_column_int(pCkHash,0) ){ + bMatch = 1; } sqlite3_reset(pCkHash); } - if( rc==SQLITE_ROW ){ + if( p->zDebugFile ){ + debugMessage(p, "<- REPLICA_HASH %u %u %s %08x...\n", + iHash, nHash, + bMatch ? "match" : "fail", + *(unsigned int*)buf + ); + } + if( !bMatch ){ sqlite3_bind_int64(pInsHash, 1, iHash); sqlite3_bind_int64(pInsHash, 2, nHash); rc = sqlite3_step(pInsHash); @@ -1445,30 +1452,32 @@ static void originSide(SQLiteRsync *p){ } sqlite3_reset(pInsHash); } + if( iHash+nHash>mxHash ) mxHash = iHash+nHash; iHash += nHash; break; } case REPLICA_READY: { + int nMulti = 0; + sqlite3_stmt *pStmt; if( p->zDebugFile ){ debugMessage(p, "<- REPLICA_READY\n"); } - if( nMulti>0 ){ - sqlite3_stmt *pStmt; - pStmt = prepareStmt(p,"SELECT pgno, sz FROM badHash WHERE sz>1"); - if( pStmt==0 ) break; - while( sqlite3_step(pStmt)==SQLITE_ROW ){ - unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0); - unsigned int cnt = (unsigned int)sqlite3_column_int64(pStmt,1); - writeByte(p, ORIGIN_DETAIL); - writeUint32(p, pgno); - writeUint32(p, cnt); - if( p->zDebugFile ){ - debugMessage(p, "-> ORIGIN_DETAIL %u %u\n", pgno, cnt); - } + pStmt = prepareStmt(p,"SELECT pgno, sz FROM badHash WHERE sz>1"); + if( pStmt==0 ) break; + while( sqlite3_step(pStmt)==SQLITE_ROW ){ + unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0); + unsigned int cnt = (unsigned int)sqlite3_column_int64(pStmt,1); + writeByte(p, ORIGIN_DETAIL); + writeUint32(p, pgno); + writeUint32(p, cnt); + nMulti++; + if( p->zDebugFile ){ + debugMessage(p, "-> ORIGIN_DETAIL %u %u\n", pgno, cnt); } - sqlite3_finalize(pStmt); + } + sqlite3_finalize(pStmt); + if( nMulti ){ runSql(p, "DELETE FROM badHash WHERE sz>1"); - nMulti = 0; writeByte(p, ORIGIN_READY); if( p->zDebugFile ) debugMessage(p, "-> ORIGIN_READY\n"); }else{ @@ -1478,11 +1487,11 @@ static void originSide(SQLiteRsync *p){ sqlite3_finalize(pInsHash); pCkHash = 0; pInsHash = 0; - if( iHash+1<p->nPage ){ + if( mxHash<p->nPage ){ runSql(p, "WITH RECURSIVE c(n) AS" " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)" " INSERT INTO badHash SELECT n, 1 FROM c", - iHash+1, p->nPage); + mxHash, p->nPage); } runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage); pStmt = prepareStmt(p, @@ -1551,7 +1560,7 @@ static void sendHashMessages( "SELECT if(npg==1," " (SELECT hash(data) FROM sqlite_dbpage('replica') WHERE pgno=fpg)," " (WITH RECURSIVE c(n) AS" - " (SELECT fpg UNION ALL SELECT n+1 FROM c WHERE n<fpg+npg)" + " (SELECT fpg UNION ALL SELECT n+1 FROM c WHERE n<fpg+npg-1)" " SELECT agghash(hash(data))" " FROM c CROSS JOIN sqlite_dbpage('replica') ON pgno=n)) AS hash," " fpg," @@ -1570,9 +1579,18 @@ static void sendHashMessages( debugMessage(p, "-> REPLICA_CONFIG %u %u\n", pgno, npg); } } - writeByte(p, REPLICA_HASH); - writeBytes(p, 20, a); - if( p->zDebugFile ) debugMessage(p, "-> REPLICA_HASH %u\n", iHash); + if( a==0 ){ + if( p->zDebugFile ){ + debugMessage(p, "# Oops: No hash for %u %u\n", pgno, npg); + } + }else{ + writeByte(p, REPLICA_HASH); + writeBytes(p, 20, a); + if( p->zDebugFile ){ + debugMessage(p, "-> REPLICA_HASH %u %u (%08x...)\n", + pgno, npg, *(unsigned int*)a); + } + } p->nHashSent++; iHash = pgno + npg; nHash = npg; @@ -1770,14 +1788,15 @@ static void replicaSide(SQLiteRsync *p){ "replica is %d bytes", szOPage, szRPage); break; } - if( p->iProtocol<2 ){ + if( p->iProtocol<2 || nRPage<=100 ){ runSql(p, "WITH RECURSIVE c(n) AS" "(VALUES(1) UNION ALL SELECT n+1 FROM c WHERE n<%d)" "INSERT INTO sendHash(fpg, npg) SELECT n, 1 FROM c", nRPage); }else{ - subdivideHashRange(p, 1, nRPage); + runSql(p,"INSERT INTO sendHash VALUES(1,1)"); + subdivideHashRange(p, 2, nRPage); } sendHashMessages(p, 1, 1); runSql(p, "PRAGMA writable_schema=ON"); |