diff options
author | drh <> | 2025-05-03 08:17:46 +0000 |
---|---|---|
committer | drh <> | 2025-05-03 08:17:46 +0000 |
commit | f4d435dc0dd409167dd5cc28d3643052e91993a7 (patch) | |
tree | 7a591e08e73f0fd7ea4d797a048591171cd2f4dc | |
parent | 5db695197b74580c777b37ab1b787531f15f7f9f (diff) | |
parent | e2826db73ab4595754ab13d057099b705401272c (diff) | |
download | sqlite-f4d435dc0dd409167dd5cc28d3643052e91993a7.tar.gz sqlite-f4d435dc0dd409167dd5cc28d3643052e91993a7.zip |
Enhance sqlite3_rsync (in a backwards-compatible way) so that it has the
ability to send hashes for blocks of pages in addition to individual pages.
By judicious use of this capability, network bandwidth requirement to sync two
similar databases is reduced.
FossilOrigin-Name: e5d87aaa8fe1e8c8cb63813d26851183e77809a3d36e4c16c37f88b4b4724d6d
-rw-r--r-- | manifest | 13 | ||||
-rw-r--r-- | manifest.uuid | 2 | ||||
-rw-r--r-- | tool/sqlite3_rsync.c | 577 |
3 files changed, 481 insertions, 111 deletions
@@ -1,5 +1,5 @@ -C Do\snot\sallow\ssqlite3_rsync\sto\sconvert\sthe\sreplica\sfrom\sWAL-mode\sinto\nDELETE-mode,\sas\sthat\scan\sdisrupt\sexisting\sclients\son\sthe\sreplica\sside.\nDELETE-mode\sto\sWAL-mode\sconversions\sare\sallowed,\showever.\s\sSee\n[forum:/forumpost/6b575b66156673ee|forum\sthread\s6b575b66156]. -D 2025-05-02T11:18:09.023 +C Enhance\ssqlite3_rsync\s(in\sa\sbackwards-compatible\sway)\sso\sthat\sit\shas\sthe\nability\sto\ssend\shashes\sfor\sblocks\sof\spages\sin\saddition\sto\sindividual\spages.\nBy\sjudicious\suse\sof\sthis\scapability,\snetwork\sbandwidth\srequirement\sto\ssync\stwo\nsimilar\sdatabases\sis\sreduced. +D 2025-05-03T08:17:46.240 F .fossil-settings/binary-glob 61195414528fb3ea9693577e1980230d78a1f8b0a54c78cf1b9b24d0a409ed6a x F .fossil-settings/empty-dirs dbb81e8fc0401ac46a1491ab34a7f2c7c0452f2f06b54ebb845d024ca8283ef1 F .fossil-settings/ignore-glob 35175cdfcf539b2318cb04a9901442804be81cd677d8b889fcc9149c21f239ea @@ -2189,7 +2189,7 @@ F tool/spellsift.tcl 52b4b04dc4333c7ab024f09d9d66ed6b6f7c6eb00b38497a09f338fa55d F tool/split-sqlite3c.tcl 07e18a1d8cc3f6b3a4a1f3528e63c9b29a5c8a7bca0b8d394b231da464ce1247 F tool/sqldiff.c 134be7866be19f8beb32043d5aea5657f01aaeae2df8d33d758ff722c78666b9 F tool/sqlite3_analyzer.c.in 14f02cb5ec3c264cd6107d1f1dad77092b1cf440fc196c30b69ae87b56a1a43b -F tool/sqlite3_rsync.c a8e1962d9e0418b37d6865e483640c49498efe64bf542022e845b056f6eb9cce +F tool/sqlite3_rsync.c 756ad79858feaf050a9ff60ffdb201a7a2b6a1ee620ede5743a0cef51defcdc0 F tool/sqltclsh.c.in 1bcc2e9da58fadf17b0bf6a50e68c1159e602ce057210b655d50bad5aaaef898 F tool/sqltclsh.tcl 862f4cf1418df5e1315b5db3b5ebe88969e2a784525af5fbf9596592f14ed848 F tool/src-verify.c d00f93263aa2fa6ba0cba0106d95458e6effb94fdb5fc634f56834f90c05bbb4 @@ -2207,8 +2207,9 @@ F tool/version-info.c 3b36468a90faf1bbd59c65fd0eb66522d9f941eedd364fabccd7227350 F tool/warnings-clang.sh bbf6a1e685e534c92ec2bfba5b1745f34fb6f0bc2a362850723a9ee87c1b31a7 F tool/warnings.sh 49a486c5069de041aedcbde4de178293e0463ae9918ecad7539eedf0ec77a139 F tool/win/sqlite.vsix deb315d026cc8400325c5863eef847784a219a2f -P 4b53603fe468c0c28b818762917e41bdd870de6d4cc143688f1cdea3136c81a4 -R 77c0a71cca6be2a9f28e992f31255bba +P 660a035b6ce6684d429b882133e032181cc1664f4efadf1bc0e4ae27d45071c4 4f5a06e42010c3e047429f736ffb8e2e89a1eb599277c176945b57710f6713ca +R 2857adf0e10b56883ce6697a283afd6e +T +closed 4f5a06e42010c3e047429f736ffb8e2e89a1eb599277c176945b57710f6713ca U drh -Z 4ae06cab268d08247b608c21e7458e1c +Z c3005ffc7ac3fefab06e6e18950a9819 # Remove this line to create a well-formed Fossil manifest. diff --git a/manifest.uuid b/manifest.uuid index 0d00d48c7..b8cdd4302 100644 --- a/manifest.uuid +++ b/manifest.uuid @@ -1 +1 @@ -660a035b6ce6684d429b882133e032181cc1664f4efadf1bc0e4ae27d45071c4 +e5d87aaa8fe1e8c8cb63813d26851183e77809a3d36e4c16c37f88b4b4724d6d diff --git a/tool/sqlite3_rsync.c b/tool/sqlite3_rsync.c index 34faaf0fd..760559aec 100644 --- a/tool/sqlite3_rsync.c +++ b/tool/sqlite3_rsync.c @@ -46,9 +46,11 @@ struct SQLiteRsync { const char *zOrigin; /* Name of the origin */ const char *zReplica; /* Name of the replica */ const char *zErrFile; /* Append error messages to this file */ + const char *zDebugFile; /* Append debugging messages to this file */ FILE *pOut; /* Transmit to the other side */ FILE *pIn; /* Receive from the other side */ FILE *pLog; /* Duplicate output here if not NULL */ + FILE *pDebug; /* Write debug info here if not NULL */ sqlite3 *db; /* Database connection */ int nErr; /* Number of errors encountered */ int nWrErr; /* Number of failed attempts to write on the pipe */ @@ -70,25 +72,31 @@ struct SQLiteRsync { /* The version number of the protocol. Sent in the *_BEGIN message ** to verify that both sides speak the same dialect. */ -#define PROTOCOL_VERSION 1 +#define PROTOCOL_VERSION 2 /* Magic numbers to identify particular messages sent over the wire. */ +/**** Baseline: protocol version 1 ****/ #define ORIGIN_BEGIN 0x41 /* Initial message */ #define ORIGIN_END 0x42 /* Time to quit */ #define ORIGIN_ERROR 0x43 /* Error message from the remote */ #define ORIGIN_PAGE 0x44 /* New page data */ #define ORIGIN_TXN 0x45 /* Transaction commit */ #define ORIGIN_MSG 0x46 /* Informational message */ +/**** Added in protocol version 2 ****/ +#define ORIGIN_DETAIL 0x47 /* Request finer-grain hash info */ +#define ORIGIN_READY 0x48 /* Ready for next round of hash exchanges */ +/**** Baseline: protocol version 1 ****/ #define REPLICA_BEGIN 0x61 /* Welcome message */ #define REPLICA_ERROR 0x62 /* Error. Report and quit. */ #define REPLICA_END 0x63 /* Replica wants to stop */ #define REPLICA_HASH 0x64 /* One or more pages hashes to report */ #define REPLICA_READY 0x65 /* Read to receive page content */ #define REPLICA_MSG 0x66 /* Informational message */ - +/**** Added in protocol version 2 ****/ +#define REPLICA_CONFIG 0x67 /* Hash exchange configuration */ /**************************************************************************** ** Beginning of the popen2() implementation copied from Fossil ************* @@ -796,11 +804,49 @@ static void hashFunc( sqlite3_result_blob(context, HashFinal(&cx), 160/8, SQLITE_TRANSIENT); } +/* +** Implementation of the agghash(X) function. +** +** Return a 160-bit BLOB which is the hash of the concatenation +** of all X inputs. +*/ +static void agghashStep( + sqlite3_context *context, + int argc, + sqlite3_value **argv +){ + HashContext *pCx; + int eType = sqlite3_value_type(argv[0]); + int nByte = sqlite3_value_bytes(argv[0]); + if( eType==SQLITE_NULL ) return; + pCx = (HashContext*)sqlite3_aggregate_context(context, sizeof(*pCx)); + if( pCx==0 ) return; + if( pCx->iSize==0 ) HashInit(pCx, 160); + if( eType==SQLITE_BLOB ){ + HashUpdate(pCx, sqlite3_value_blob(argv[0]), nByte); + }else{ + HashUpdate(pCx, sqlite3_value_text(argv[0]), nByte); + } +} +static void agghashFinal(sqlite3_context *context){ + HashContext *pCx = (HashContext*)sqlite3_aggregate_context(context, 0); + if( pCx ){ + sqlite3_result_blob(context, HashFinal(pCx), 160/8, SQLITE_TRANSIENT); + } +} + /* Register the hash function */ static int hashRegister(sqlite3 *db){ - return sqlite3_create_function(db, "hash", 1, + int rc; + rc = sqlite3_create_function(db, "hash", 1, SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC, 0, hashFunc, 0, 0); + if( rc==SQLITE_OK ){ + rc = sqlite3_create_function(db, "agghash", 1, + SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC, + 0, 0, agghashStep, agghashFinal); + } + return rc; } /* End of the hashing logic @@ -838,6 +884,25 @@ static void logError(SQLiteRsync *p, const char *zFormat, ...){ p->nErr++; } +/* +** Append text to the debugging mesage file, if an that file is +** specified. +*/ +static void debugMessage(SQLiteRsync *p, const char *zFormat, ...){ + if( p->zDebugFile ){ + if( p->pDebug==0 ){ + p->pDebug = fopen(p->zDebugFile, "wb"); + } + if( p->pDebug ){ + va_list ap; + va_start(ap, zFormat); + vfprintf(p->pDebug, zFormat, ap); + va_end(ap); + fflush(p->pDebug); + } + } +} + /* Read a single big-endian 32-bit unsigned integer from the input ** stream. Return 0 on success and 1 if there are any errors. @@ -1192,6 +1257,13 @@ static void closeDb(SQLiteRsync *p){ ** nPage, and szPage. Then enter a loop responding to message from ** the replica: ** +** REPLICA_BEGIN iProtocol +** +** An optional message sent by the replica in response to the +** prior ORIGIN_BEGIN with a counter-proposal for the protocol +** level. If seen, try to reduce the protocol level to what is +** requested and send a new ORGIN_BEGIN. +** ** REPLICA_ERROR size text ** ** Report an error from the replica and quit @@ -1202,30 +1274,42 @@ static void closeDb(SQLiteRsync *p){ ** ** REPLICA_HASH hash ** -** The argument is the 20-byte SHA1 hash for the next page -** page hashes appear in sequential order with no gaps. +** The argument is the 20-byte SHA1 hash for the next page or +** block of pages. Hashes appear in sequential order with no gaps, +** unless there is an intervening REPLICA_CONFIG message. +** +** REPLICA_CONFIG pgno cnt +** +** Set counters used by REPLICA_HASH. The next hash will start +** on page pgno and all subsequent hashes will cover cnt pages +** each. Note that for a multi-page hash, the hash value is +** actually a hash of the individual page hashes. ** ** REPLICA_READY ** ** The replica has sent all the hashes that it intends to send. ** This side (the origin) can now start responding with page -** content for pages that do not have a matching hash. +** content for pages that do not have a matching hash or with +** ORIGIN_DETAIL messages with requests for more detail. */ static void originSide(SQLiteRsync *p){ int rc = 0; int c = 0; unsigned int nPage = 0; - unsigned int iPage = 0; + unsigned int iHash = 1; /* Pgno for next hash to receive */ + unsigned int nHash = 1; /* Number of pages per hash received */ + unsigned int mxHash = 0; /* Maximum hash value received */ unsigned int lockBytePage = 0; unsigned int szPg = 0; - sqlite3_stmt *pCkHash = 0; - sqlite3_stmt *pInsHash = 0; + sqlite3_stmt *pCkHash = 0; /* Verify hash on a single page */ + sqlite3_stmt *pCkHashN = 0; /* Verify a multi-page hash */ + sqlite3_stmt *pInsHash = 0; /* Record a bad hash */ char buf[200]; p->isReplica = 0; if( p->bCommCheck ){ infoMsg(p, "origin zOrigin=%Q zReplica=%Q isRemote=%d protocol=%d", - p->zOrigin, p->zReplica, p->isRemote, PROTOCOL_VERSION); + p->zOrigin, p->zReplica, p->isRemote, p->iProtocol); writeByte(p, ORIGIN_END); fflush(p->pOut); }else{ @@ -1251,13 +1335,15 @@ static void originSide(SQLiteRsync *p){ if( p->nErr==0 ){ /* Send the ORIGIN_BEGIN message */ writeByte(p, ORIGIN_BEGIN); - writeByte(p, PROTOCOL_VERSION); + writeByte(p, p->iProtocol); writePow2(p, szPg); writeUint32(p, nPage); fflush(p->pOut); + if( p->zDebugFile ){ + debugMessage(p, "-> ORIGIN_BEGIN %u %u %u\n", p->iProtocol,szPg,nPage); + } p->nPage = nPage; p->szPage = szPg; - p->iProtocol = PROTOCOL_VERSION; lockBytePage = (1<<30)/szPg + 1; } } @@ -1270,11 +1356,24 @@ static void originSide(SQLiteRsync *p){ ** that is larger than what it knows about. The replica sends back ** a counter-proposal of an earlier protocol which the origin can ** accept by resending a new ORIGIN_BEGIN. */ - p->iProtocol = readByte(p); - writeByte(p, ORIGIN_BEGIN); - writeByte(p, p->iProtocol); - writePow2(p, p->szPage); - writeUint32(p, p->nPage); + u8 newProtocol = readByte(p); + if( p->zDebugFile ){ + debugMessage(p, "<- REPLICA_BEGIN %d\n", (int)newProtocol); + } + if( newProtocol < p->iProtocol ){ + p->iProtocol = newProtocol; + writeByte(p, ORIGIN_BEGIN); + writeByte(p, p->iProtocol); + writePow2(p, p->szPage); + writeUint32(p, p->nPage); + fflush(p->pOut); + if( p->zDebugFile ){ + debugMessage(p, "-> ORIGIN_BEGIN %d %d %u\n", p->iProtocol, + p->szPage, p->nPage); + } + }else{ + reportError(p, "Invalid REPLICA_BEGIN reply"); + } break; } case REPLICA_MSG: @@ -1282,25 +1381,73 @@ static void originSide(SQLiteRsync *p){ readAndDisplayMessage(p, c); break; } + case REPLICA_CONFIG: { + readUint32(p, &iHash); + readUint32(p, &nHash); + if( p->zDebugFile ){ + debugMessage(p, "<- REPLICA_CONFIG %u %u\n", iHash, nHash); + } + break; + } case REPLICA_HASH: { + int bMatch = 0; if( pCkHash==0 ){ - runSql(p, "CREATE TEMP TABLE badHash(pgno INTEGER PRIMARY KEY)"); + runSql(p, "CREATE TEMP TABLE badHash(" + " pgno INTEGER PRIMARY KEY," + " sz INT)"); pCkHash = prepareStmt(p, - "SELECT pgno FROM sqlite_dbpage('main')" - " WHERE pgno=?1 AND hash(data)!=?2" + "SELECT hash(data)==?3 FROM sqlite_dbpage('main')" + " WHERE pgno=?1" ); if( pCkHash==0 ) break; - pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?)"); + pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?1,?2)"); if( pInsHash==0 ) break; } p->nHashSent++; - iPage++; - sqlite3_bind_int64(pCkHash, 1, iPage); readBytes(p, 20, buf); - sqlite3_bind_blob(pCkHash, 2, buf, 20, SQLITE_STATIC); - rc = sqlite3_step(pCkHash); - if( rc==SQLITE_ROW ){ - sqlite3_bind_int64(pInsHash, 1, sqlite3_column_int64(pCkHash, 0)); + if( nHash>1 ){ + if( pCkHashN==0 ){ + pCkHashN = prepareStmt(p, + "WITH c(n) AS " + " (VALUES(?1) UNION ALL SELECT n+1 FROM c WHERE n<?2)" + "SELECT agghash(hash(data))==?3" + " FROM c CROSS JOIN sqlite_dbpage('main') ON pgno=n" + ); + if( pCkHashN==0 ) break; + } + sqlite3_bind_int64(pCkHashN, 1, iHash); + sqlite3_bind_int64(pCkHashN, 2, iHash + nHash - 1); + sqlite3_bind_blob(pCkHashN, 3, buf, 20, SQLITE_STATIC); + rc = sqlite3_step(pCkHashN); + if( rc==SQLITE_ROW ){ + bMatch = sqlite3_column_int(pCkHashN,0); + }else if( rc==SQLITE_ERROR ){ + reportError(p, "SQL statement [%s] failed: %s", + sqlite3_sql(pCkHashN), sqlite3_errmsg(p->db)); + } + sqlite3_reset(pCkHashN); + }else{ + sqlite3_bind_int64(pCkHash, 1, iHash); + sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC); + rc = sqlite3_step(pCkHash); + if( rc==SQLITE_ERROR ){ + reportError(p, "SQL statement [%s] failed: %s", + sqlite3_sql(pCkHash), sqlite3_errmsg(p->db)); + }else if( rc==SQLITE_ROW && sqlite3_column_int(pCkHash,0) ){ + bMatch = 1; + } + sqlite3_reset(pCkHash); + } + if( p->zDebugFile ){ + debugMessage(p, "<- REPLICA_HASH %u %u %s %08x...\n", + iHash, nHash, + bMatch ? "match" : "fail", + *(unsigned int*)buf + ); + } + if( !bMatch ){ + sqlite3_bind_int64(pInsHash, 1, iHash); + sqlite3_bind_int64(pInsHash, 2, nHash); rc = sqlite3_step(pInsHash); if( rc!=SQLITE_DONE ){ reportError(p, "SQL statement [%s] failed: %s", @@ -1308,42 +1455,74 @@ static void originSide(SQLiteRsync *p){ } sqlite3_reset(pInsHash); } - else if( rc!=SQLITE_DONE ){ - reportError(p, "SQL statement [%s] failed: %s", - sqlite3_sql(pCkHash), sqlite3_errmsg(p->db)); - } - sqlite3_reset(pCkHash); + if( iHash+nHash>mxHash ) mxHash = iHash+nHash; + iHash += nHash; break; } case REPLICA_READY: { + int nMulti = 0; sqlite3_stmt *pStmt; - sqlite3_finalize(pCkHash); - sqlite3_finalize(pInsHash); - pCkHash = 0; - pInsHash = 0; - if( iPage+1<p->nPage ){ - runSql(p, "WITH RECURSIVE c(n) AS" - " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)" - " INSERT INTO badHash SELECT n FROM c", - iPage+1, p->nPage); + if( p->zDebugFile ){ + debugMessage(p, "<- REPLICA_READY\n"); } - runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage); - pStmt = prepareStmt(p, - "SELECT pgno, data" - " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)"); + pStmt = prepareStmt(p,"SELECT pgno, sz FROM badHash WHERE sz>1"); if( pStmt==0 ) break; - while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){ + while( sqlite3_step(pStmt)==SQLITE_ROW ){ unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0); - const void *pContent = sqlite3_column_blob(pStmt, 1); - writeByte(p, ORIGIN_PAGE); + unsigned int cnt = (unsigned int)sqlite3_column_int64(pStmt,1); + writeByte(p, ORIGIN_DETAIL); writeUint32(p, pgno); - writeBytes(p, szPg, pContent); - p->nPageSent++; + writeUint32(p, cnt); + nMulti++; + if( p->zDebugFile ){ + debugMessage(p, "-> ORIGIN_DETAIL %u %u\n", pgno, cnt); + } } sqlite3_finalize(pStmt); - writeByte(p, ORIGIN_TXN); - writeUint32(p, nPage); - writeByte(p, ORIGIN_END); + if( nMulti ){ + runSql(p, "DELETE FROM badHash WHERE sz>1"); + writeByte(p, ORIGIN_READY); + if( p->zDebugFile ) debugMessage(p, "-> ORIGIN_READY\n"); + }else{ + sqlite3_stmt *pStmt; + sqlite3_finalize(pCkHash); + sqlite3_finalize(pCkHashN); + sqlite3_finalize(pInsHash); + pCkHash = 0; + pInsHash = 0; + if( mxHash<p->nPage ){ + runSql(p, "WITH RECURSIVE c(n) AS" + " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)" + " INSERT INTO badHash SELECT n, 1 FROM c", + mxHash, p->nPage); + } + runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage); + pStmt = prepareStmt(p, + "SELECT pgno, data" + " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)"); + if( pStmt==0 ) break; + while( sqlite3_step(pStmt)==SQLITE_ROW + && p->nErr==0 + && p->nWrErr==0 + ){ + unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0); + const void *pContent = sqlite3_column_blob(pStmt, 1); + writeByte(p, ORIGIN_PAGE); + writeUint32(p, pgno); + writeBytes(p, szPg, pContent); + p->nPageSent++; + if( p->zDebugFile ){ + debugMessage(p, "-> ORIGIN_PAGE %u\n", pgno); + } + } + sqlite3_finalize(pStmt); + writeByte(p, ORIGIN_TXN); + writeUint32(p, nPage); + if( p->zDebugFile ){ + debugMessage(p, "-> ORIGIN_TXN %u\n", nPage); + } + writeByte(p, ORIGIN_END); + } fflush(p->pOut); break; } @@ -1361,6 +1540,102 @@ static void originSide(SQLiteRsync *p){ } /* +** Send a REPLICA_HASH message for each entry in the sendHash table. +** The sendHash table looks like this: +** +** CREATE TABLE sendHash( +** fpg INTEGER PRIMARY KEY, -- Page number of the hash +** npg INT -- Number of pages in this hash +** ); +** +** If iHash is page number for the next page that the origin will +** be expecting, and nHash is the number of pages that the origin will +** be expecting in the hash that follows. Send a REPLICA_CONFIG message +** if either of these values if not correct. +*/ +static void sendHashMessages( + SQLiteRsync *p, /* The replica-side of the sync */ + unsigned int iHash, /* Next page expected by origin */ + unsigned int nHash /* Next number of pages expected by origin */ +){ + sqlite3_stmt *pStmt; + pStmt = prepareStmt(p, + "SELECT if(npg==1," + " (SELECT hash(data) FROM sqlite_dbpage('replica') WHERE pgno=fpg)," + " (WITH RECURSIVE c(n) AS" + " (SELECT fpg UNION ALL SELECT n+1 FROM c WHERE n<fpg+npg-1)" + " SELECT agghash(hash(data))" + " FROM c CROSS JOIN sqlite_dbpage('replica') ON pgno=n)) AS hash," + " fpg," + " npg" + " FROM sendHash ORDER BY fpg" + ); + while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){ + const unsigned char *a = sqlite3_column_blob(pStmt, 0); + unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt, 1); + unsigned int npg = (unsigned int)sqlite3_column_int64(pStmt, 2); + if( pgno!=iHash || npg!=nHash ){ + writeByte(p, REPLICA_CONFIG); + writeUint32(p, pgno); + writeUint32(p, npg); + if( p->zDebugFile ){ + debugMessage(p, "-> REPLICA_CONFIG %u %u\n", pgno, npg); + } + } + if( a==0 ){ + if( p->zDebugFile ){ + debugMessage(p, "# Oops: No hash for %u %u\n", pgno, npg); + } + }else{ + writeByte(p, REPLICA_HASH); + writeBytes(p, 20, a); + if( p->zDebugFile ){ + debugMessage(p, "-> REPLICA_HASH %u %u (%08x...)\n", + pgno, npg, *(unsigned int*)a); + } + } + p->nHashSent++; + iHash = pgno + npg; + nHash = npg; + } + sqlite3_finalize(pStmt); + runSql(p, "DELETE FROM sendHash"); + writeByte(p, REPLICA_READY); + fflush(p->pOut); + if( p->zDebugFile ) debugMessage(p, "-> REPLICA_READY\n", iHash); +} + +/* +** Make entries in the sendHash table to send hashes for +** npg (mnemonic: Number of PaGes) pages starting with fpg +** (mnemonic: First PaGe). +*/ +static void subdivideHashRange( + SQLiteRsync *p, /* The replica-side of the sync */ + unsigned int fpg, /* First page of the range */ + unsigned int npg /* Number of pages */ +){ + unsigned int nChunk; /* How many pages to request per hash */ + sqlite3_uint64 iEnd; /* One more than the last page */ + if( npg<=30 ){ + nChunk = 1; + }else if( npg<=1000 ){ + nChunk = 30; + }else{ + nChunk = 1000; + } + iEnd = fpg; + iEnd += npg; + runSql(p, + "WITH RECURSIVE c(n) AS" + " (VALUES(%u) UNION ALL SELECT n+%u FROM c WHERE n<%llu)" + "REPLACE INTO sendHash(fpg,npg)" + " SELECT n, min(%llu-n,%u) FROM c", + fpg, nChunk, iEnd-nChunk, iEnd, nChunk + ); +} + +/* ** Run the replica-side protocol. The protocol is passive in the sense ** that it only response to message from the origin side. ** @@ -1370,15 +1645,35 @@ static void originSide(SQLiteRsync *p){ ** each page in the origin database (sent as a single-byte power-of-2), ** and the number of pages in the origin database. ** This procedure checks compatibility, and if everything is ok, -** it starts sending hashes of pages already present back to the origin. +** it starts sending hashes back to the origin using REPLICA_HASH +** and/or REPLICA_CONFIG message, followed by a single REPLICA_READY. +** REPLICA_CONFIG is only sent if the protocol is 2 or greater. +** +** ORIGIN_ERROR size text +** +** Report an error and quit. +** +** ORIGIN_DETAIL pgno cnt +** +** The origin reports that a multi-page hash starting at pgno and +** spanning cnt pages failed to match. The origin is requesting +** details (more REPLICA_HASH message with a smaller cnt). The +** replica must wait on ORIGIN_READY before sending its reply. ** -** ORIGIN_ERROR size text +** ORIGIN_READY ** -** Report the received error and quit. +** After sending one or more ORIGIN_DETAIL messages, the ORIGIN_READY +** is sent by the origin to indicate that it has finished sending +** requests for detail and is ready for the replicate to reply +** with a new round of REPLICA_CONFIG and REPLICA_HASH messages. ** -** ORIGIN_PAGE pgno content +** ORIGIN_PAGE pgno content ** -** Update the content of the given page. +** Once the origin believes it knows exactly which pages need to be +** updated in the replica, it starts sending those pages using these +** messages. These messages will only appear immediately after +** REPLICA_READY. The origin never mixes ORIGIN_DETAIL and +** ORIGIN_PAGE messages in the same batch. ** ** ORIGIN_TXN pgno ** @@ -1399,10 +1694,11 @@ static void replicaSide(SQLiteRsync *p){ p->isReplica = 1; if( p->bCommCheck ){ infoMsg(p, "replica zOrigin=%Q zReplica=%Q isRemote=%d protocol=%d", - p->zOrigin, p->zReplica, p->isRemote, PROTOCOL_VERSION); + p->zOrigin, p->zReplica, p->isRemote, p->iProtocol); writeByte(p, REPLICA_END); fflush(p->pOut); } + if( p->iProtocol<=0 ) p->iProtocol = PROTOCOL_VERSION; /* Respond to message from the origin. The origin will initiate the ** the conversation with an ORIGIN_BEGIN message. @@ -1418,22 +1714,31 @@ static void replicaSide(SQLiteRsync *p){ unsigned int nOPage = 0; unsigned int nRPage = 0, szRPage = 0; int rc = 0; - sqlite3_stmt *pStmt = 0; + u8 iProtocol; closeDb(p); - p->iProtocol = readByte(p); + iProtocol = readByte(p); szOPage = readPow2(p); readUint32(p, &nOPage); + if( p->zDebugFile ){ + debugMessage(p, "<- ORIGIN_BEGIN %d %d %u\n", iProtocol, szOPage, + nOPage); + } if( p->nErr ) break; - if( p->iProtocol>PROTOCOL_VERSION ){ + if( iProtocol>p->iProtocol ){ /* If the protocol version on the origin side is larger, send back ** a REPLICA_BEGIN message with the protocol version number of the ** replica side. This gives the origin an opportunity to resend ** a new ORIGIN_BEGIN with a reduced protocol version. */ writeByte(p, REPLICA_BEGIN); - writeByte(p, PROTOCOL_VERSION); + writeByte(p, p->iProtocol); + fflush(p->pOut); + if( p->zDebugFile ){ + debugMessage(p, "-> REPLICA_BEGIN %u\n", p->iProtocol); + } break; } + p->iProtocol = iProtocol; p->nPage = nOPage; p->szPage = szOPage; rc = sqlite3_open(":memory:", &p->db); @@ -1458,6 +1763,12 @@ static void replicaSide(SQLiteRsync *p){ closeDb(p); break; } + runSql(p, + "CREATE TABLE sendHash(" + " fpg INTEGER PRIMARY KEY," /* The page number of hash to send */ + " npg INT" /* Number of pages in this hash */ + ")" + ); hashRegister(p->db); if( runSqlReturnUInt(p, &nRPage, "PRAGMA replica.page_count") ){ break; @@ -1484,26 +1795,43 @@ static void replicaSide(SQLiteRsync *p){ "replica is %d bytes", szOPage, szRPage); break; } - - pStmt = prepareStmt(p, - "SELECT hash(data) FROM sqlite_dbpage('replica')" - " WHERE pgno<=min(%d,%d)" - " ORDER BY pgno", nRPage, nOPage); - while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){ - const unsigned char *a = sqlite3_column_blob(pStmt, 0); - writeByte(p, REPLICA_HASH); - writeBytes(p, 20, a); - p->nHashSent++; + if( p->iProtocol<2 || nRPage<=100 ){ + runSql(p, + "WITH RECURSIVE c(n) AS" + "(VALUES(1) UNION ALL SELECT n+1 FROM c WHERE n<%d)" + "INSERT INTO sendHash(fpg, npg) SELECT n, 1 FROM c", + nRPage); + }else{ + runSql(p,"INSERT INTO sendHash VALUES(1,1)"); + subdivideHashRange(p, 2, nRPage); } - sqlite3_finalize(pStmt); - writeByte(p, REPLICA_READY); - fflush(p->pOut); + sendHashMessages(p, 1, 1); runSql(p, "PRAGMA writable_schema=ON"); break; } + case ORIGIN_DETAIL: { + unsigned int fpg, npg; + readUint32(p, &fpg); + readUint32(p, &npg); + if( p->zDebugFile ){ + debugMessage(p, "<- ORIGIN_DETAIL %u %u\n", fpg, npg); + } + subdivideHashRange(p, fpg, npg); + break; + } + case ORIGIN_READY: { + if( p->zDebugFile ){ + debugMessage(p, "<- ORIGIN_READY\n"); + } + sendHashMessages(p, 0, 0); + break; + } case ORIGIN_TXN: { unsigned int nOPage = 0; readUint32(p, &nOPage); + if( p->zDebugFile ){ + debugMessage(p, "<- ORIGIN_TXN %u\n", nOPage); + } if( pIns==0 ){ /* Nothing has changed */ runSql(p, "COMMIT"); @@ -1531,6 +1859,9 @@ static void replicaSide(SQLiteRsync *p){ unsigned int pgno = 0; int rc; readUint32(p, &pgno); + if( p->zDebugFile ){ + debugMessage(p, "<- ORIGIN_PAGE %u\n", pgno); + } if( p->nErr ) break; if( pIns==0 ){ pIns = prepareStmt(p, @@ -1678,9 +2009,11 @@ int main(int argc, char const * const *argv){ sqlite3_int64 tmEnd; sqlite3_int64 tmElapse; const char *zRemoteErrFile = 0; + const char *zRemoteDebugFile = 0; #define cli_opt_val cmdline_option_value(argc, argv, ++i) memset(&ctx, 0, sizeof(ctx)); + ctx.iProtocol = PROTOCOL_VERSION; for(i=1; i<argc; i++){ const char *z = argv[i]; if( z[0]=='-' && z[1]=='-' && z[2]!=0 ) z++; @@ -1704,6 +2037,20 @@ int main(int argc, char const * const *argv){ zExe = cli_opt_val; continue; } + if( strcmp(z, "-wal-only")==0 ){ + ctx.bWalOnly = 1; + continue; + } + if( strcmp(z, "-version")==0 ){ + printf("%s\n", sqlite3_sourceid()); + return 0; + } + if( strcmp(z, "-help")==0 || strcmp(z, "--help")==0 + || strcmp(z, "-?")==0 + ){ + printf("%s", zUsage); + return 0; + } if( strcmp(z, "-logfile")==0 ){ /* DEBUG OPTION: --logfile FILENAME ** Cause all local output traffic to be duplicated in FILENAME */ @@ -1729,39 +2076,49 @@ int main(int argc, char const * const *argv){ zRemoteErrFile = cli_opt_val; continue; } - if( strcmp(z, "-wal-only")==0 ){ - ctx.bWalOnly = 1; + if( strcmp(z, "-debugfile")==0 ){ + /* DEBUG OPTION: --debugfile FILENAME + ** Debugging messages on the local side are written into FILENAME */ + ctx.zDebugFile = cli_opt_val; continue; } - if( strcmp(z, "-help")==0 || strcmp(z, "--help")==0 - || strcmp(z, "-?")==0 - ){ - printf("%s", zUsage); - return 0; - } - if( strcmp(z, "-version")==0 ){ - printf("%s\n", sqlite3_sourceid()); - return 0; + if( strcmp(z, "-remote-debugfile")==0 ){ + /* DEBUG OPTION: --remote-debugfile FILENAME + ** Error messages on the remote side are written into FILENAME on + ** the remote side. */ + zRemoteDebugFile = cli_opt_val; + continue; } - if( z[0]=='-' ){ - if( strcmp(z,"-commcheck")==0 ){ /* DEBUG ONLY */ - /* Run a communication check with the remote side. Do not attempt - ** to exchange any database connection */ - ctx.bCommCheck = 1; - continue; + if( strcmp(z, "-protocol")==0 ){ + /* DEBUG OPTION: --protocool N + ** Set the protocol version to N */ + ctx.iProtocol = atoi(cli_opt_val); + if( ctx.iProtocol<1 ){ + ctx.iProtocol = 1; + }else if( ctx.iProtocol>PROTOCOL_VERSION ){ + ctx.iProtocol = PROTOCOL_VERSION; } - if( strcmp(z,"-arg-escape-check")==0 ){ /* DEBUG ONLY */ - /* Test the append_escaped_arg() routine by using it to render a - ** copy of the input command-line, assuming all arguments except - ** this one are filenames. */ - sqlite3_str *pStr = sqlite3_str_new(0); - int k; - for(k=0; k<argc; k++){ - append_escaped_arg(pStr, argv[k], i!=k); - } - printf("%s\n", sqlite3_str_value(pStr)); - return 0; + continue; + } + if( strcmp(z,"-commcheck")==0 ){ /* DEBUG ONLY */ + /* Run a communication check with the remote side. Do not attempt + ** to exchange any database connection */ + ctx.bCommCheck = 1; + continue; + } + if( strcmp(z,"-arg-escape-check")==0 ){ /* DEBUG ONLY */ + /* Test the append_escaped_arg() routine by using it to render a + ** copy of the input command-line, assuming all arguments except + ** this one are filenames. */ + sqlite3_str *pStr = sqlite3_str_new(0); + int k; + for(k=0; k<argc; k++){ + append_escaped_arg(pStr, argv[k], i!=k); } + printf("%s\n", sqlite3_str_value(pStr)); + return 0; + } + if( z[i]=='-' ){ fprintf(stderr, "unknown option: \"%s\". Use --help for more detail.\n", z); return 1; @@ -1838,6 +2195,10 @@ int main(int argc, char const * const *argv){ append_escaped_arg(pStr, "--errorfile", 0); append_escaped_arg(pStr, zRemoteErrFile, 1); } + if( zRemoteDebugFile ){ + append_escaped_arg(pStr, "--debugfile", 0); + append_escaped_arg(pStr, zRemoteDebugFile, 1); + } if( ctx.bWalOnly ){ append_escaped_arg(pStr, "--wal-only", 0); } @@ -1867,6 +2228,10 @@ int main(int argc, char const * const *argv){ append_escaped_arg(pStr, "--errorfile", 0); append_escaped_arg(pStr, zRemoteErrFile, 1); } + if( zRemoteDebugFile ){ + append_escaped_arg(pStr, "--debugfile", 0); + append_escaped_arg(pStr, zRemoteDebugFile, 1); + } append_escaped_arg(pStr, file_tail(ctx.zOrigin), 1); append_escaped_arg(pStr, zDiv, 1); zCmd = sqlite3_str_finish(pStr); @@ -1888,6 +2253,10 @@ int main(int argc, char const * const *argv){ append_escaped_arg(pStr, "--errorfile", 0); append_escaped_arg(pStr, zRemoteErrFile, 1); } + if( zRemoteDebugFile ){ + append_escaped_arg(pStr, "--debugfile", 0); + append_escaped_arg(pStr, zRemoteDebugFile, 1); + } append_escaped_arg(pStr, ctx.zOrigin, 1); append_escaped_arg(pStr, ctx.zReplica, 1); zCmd = sqlite3_str_finish(pStr); |