diff options
Diffstat (limited to 'tool/sqlite3_rsync.c')
-rw-r--r-- | tool/sqlite3_rsync.c | 378 |
1 files changed, 305 insertions, 73 deletions
diff --git a/tool/sqlite3_rsync.c b/tool/sqlite3_rsync.c index 34faaf0fd..6fb365094 100644 --- a/tool/sqlite3_rsync.c +++ b/tool/sqlite3_rsync.c @@ -75,20 +75,26 @@ struct SQLiteRsync { /* Magic numbers to identify particular messages sent over the wire. */ +/**** Baseline: protocol version 1 ****/ #define ORIGIN_BEGIN 0x41 /* Initial message */ #define ORIGIN_END 0x42 /* Time to quit */ #define ORIGIN_ERROR 0x43 /* Error message from the remote */ #define ORIGIN_PAGE 0x44 /* New page data */ #define ORIGIN_TXN 0x45 /* Transaction commit */ #define ORIGIN_MSG 0x46 /* Informational message */ +/**** Added in protocol version 2 ****/ +#define ORIGIN_DETAIL 0x47 /* Request finer-grain hash info */ +#define ORIGIN_READY 0x48 /* Ready for next round of hash exchanges */ +/**** Baseline: protocol version 1 ****/ #define REPLICA_BEGIN 0x61 /* Welcome message */ #define REPLICA_ERROR 0x62 /* Error. Report and quit. */ #define REPLICA_END 0x63 /* Replica wants to stop */ #define REPLICA_HASH 0x64 /* One or more pages hashes to report */ #define REPLICA_READY 0x65 /* Read to receive page content */ #define REPLICA_MSG 0x66 /* Informational message */ - +/**** Added in protocol version 2 ****/ +#define REPLICA_CONFIG 0x67 /* Hash exchange configuration */ /**************************************************************************** ** Beginning of the popen2() implementation copied from Fossil ************* @@ -796,11 +802,49 @@ static void hashFunc( sqlite3_result_blob(context, HashFinal(&cx), 160/8, SQLITE_TRANSIENT); } +/* +** Implementation of the agghash(X) function. +** +** Return a 160-bit BLOB which is the hash of the concatenation +** of all X inputs. 
+*/ +static void agghashStep( + sqlite3_context *context, + int argc, + sqlite3_value **argv +){ + HashContext *pCx; + int eType = sqlite3_value_type(argv[0]); + int nByte = sqlite3_value_bytes(argv[0]); + if( eType==SQLITE_NULL ) return; + pCx = (HashContext*)sqlite3_aggregate_context(context, sizeof(*pCx)); + if( pCx==0 ) return; + if( pCx->iSize==0 ) HashInit(pCx, 160); + if( eType==SQLITE_BLOB ){ + HashUpdate(pCx, sqlite3_value_blob(argv[0]), nByte); + }else{ + HashUpdate(pCx, sqlite3_value_text(argv[0]), nByte); + } +} +static void agghashFinal(sqlite3_context *context){ + HashContext *pCx = (HashContext*)sqlite3_aggregate_context(context, 0); + if( pCx ){ + sqlite3_result_blob(context, HashFinal(pCx), 160/8, SQLITE_TRANSIENT); + } +} + /* Register the hash function */ static int hashRegister(sqlite3 *db){ - return sqlite3_create_function(db, "hash", 1, + int rc; + rc = sqlite3_create_function(db, "hash", 1, SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC, 0, hashFunc, 0, 0); + if( rc==SQLITE_OK ){ + rc = sqlite3_create_function(db, "agghash", 1, + SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC, + 0, 0, agghashStep, agghashFinal); + } + return rc; } /* End of the hashing logic @@ -1192,6 +1236,13 @@ static void closeDb(SQLiteRsync *p){ ** nPage, and szPage. Then enter a loop responding to message from ** the replica: ** +** REPLICA_BEGIN iProtocol +** +** An optional message sent by the replica in response to the +** prior ORIGIN_BEGIN with a counter-proposal for the protocol +** level. If seen, try to reduce the protocol level to what is +** requested and send a new ORIGIN_BEGIN. +** ** REPLICA_ERROR size text ** ** Report an error from the replica and quit @@ -1202,24 +1253,36 @@ static void closeDb(SQLiteRsync *p){ ** ** REPLICA_HASH hash ** -** The argument is the 20-byte SHA1 hash for the next page -** page hashes appear in sequential order with no gaps. +** The argument is the 20-byte SHA1 hash for the next page or +** block of pages. 
Hashes appear in sequential order with no gaps, +** unless there is an intervening REPLICA_CONFIG message. +** +** REPLICA_CONFIG pgno cnt +** +** Set counters used by REPLICA_HASH. The next hash will start +** on page pgno and all subsequent hashes will cover cnt pages +** each. Note that for a multi-page hash, the hash value is +** actually a hash of the individual page hashes. ** ** REPLICA_READY ** ** The replica has sent all the hashes that it intends to send. ** This side (the origin) can now start responding with page -** content for pages that do not have a matching hash. +** content for pages that do not have a matching hash or with +** ORIGIN_DETAIL messages with requests for more detail. */ static void originSide(SQLiteRsync *p){ int rc = 0; int c = 0; unsigned int nPage = 0; - unsigned int iPage = 0; + unsigned int iHash = 1; /* Pgno for next hash to receive */ + unsigned int nHash = 1; /* Number of pages per hash received */ unsigned int lockBytePage = 0; unsigned int szPg = 0; - sqlite3_stmt *pCkHash = 0; - sqlite3_stmt *pInsHash = 0; + sqlite3_stmt *pCkHash = 0; /* Verify hash on a single page */ + sqlite3_stmt *pCkHashN = 0; /* Verify a multi-page hash */ + sqlite3_stmt *pInsHash = 0; /* Record a bad hash */ + unsigned int nMulti = 0; /* Multi-page hashes not matched */ char buf[200]; p->isReplica = 0; @@ -1270,11 +1333,16 @@ static void originSide(SQLiteRsync *p){ ** that is larger than what it knows about. The replica sends back ** a counter-proposal of an earlier protocol which the origin can ** accept by resending a new ORIGIN_BEGIN. 
*/ - p->iProtocol = readByte(p); - writeByte(p, ORIGIN_BEGIN); - writeByte(p, p->iProtocol); - writePow2(p, p->szPage); - writeUint32(p, p->nPage); + u8 newProtocol = readByte(p); + if( newProtocol < p->iProtocol ){ + p->iProtocol = newProtocol; + writeByte(p, ORIGIN_BEGIN); + writeByte(p, p->iProtocol); + writePow2(p, p->szPage); + writeUint32(p, p->nPage); + }else{ + reportError(p, "Invalid REPLICA_BEGIN reply"); + } break; } case REPLICA_MSG: @@ -1282,25 +1350,60 @@ static void originSide(SQLiteRsync *p){ readAndDisplayMessage(p, c); break; } + case REPLICA_CONFIG: { + readUint32(p, &iHash); + readUint32(p, &nHash); + break; + } case REPLICA_HASH: { if( pCkHash==0 ){ - runSql(p, "CREATE TEMP TABLE badHash(pgno INTEGER PRIMARY KEY)"); + runSql(p, "CREATE TEMP TABLE badHash(" + " pgno INTEGER PRIMARY KEY," + " sz INT)"); pCkHash = prepareStmt(p, "SELECT pgno FROM sqlite_dbpage('main')" - " WHERE pgno=?1 AND hash(data)!=?2" + " WHERE pgno=?1 AND hash(data)!=?3" ); if( pCkHash==0 ) break; - pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?)"); + pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?1,?2)"); if( pInsHash==0 ) break; } p->nHashSent++; - iPage++; - sqlite3_bind_int64(pCkHash, 1, iPage); - readBytes(p, 20, buf); - sqlite3_bind_blob(pCkHash, 2, buf, 20, SQLITE_STATIC); - rc = sqlite3_step(pCkHash); + if( nHash>1 ){ + if( pCkHashN==0 ){ + pCkHashN = prepareStmt(p, + "WITH a1(pgno) AS " + "(VALUES(?1) UNION ALL SELECT pgno+1 FROM a1 WHERE pgno<?2)" + "SELECT 1 FROM a1 CROSS JOIN sqlite_dbpage('main')" + " USING(pgno)" + " WHERE agghash(hash(data))!=?3"); + if( pCkHashN==0 ) break; + } + sqlite3_bind_int64(pCkHashN, 1, iHash); + sqlite3_bind_int64(pCkHashN, 2, iHash + nHash); + readBytes(p, 20, buf); + sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC); + if( rc==SQLITE_ROW ){ + nMulti++; + }else if( rc==SQLITE_ERROR ){ + reportError(p, "SQL statement [%s] failed: %s", + sqlite3_sql(pCkHashN), sqlite3_errmsg(p->db)); + } + sqlite3_reset(pCkHashN); 
+ }else{ + sqlite3_bind_int64(pCkHash, 1, iHash); + readBytes(p, 20, buf); + sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC); + rc = sqlite3_step(pCkHash); + if( rc==SQLITE_ERROR ){ + reportError(p, "SQL statement [%s] failed: %s", + sqlite3_sql(pCkHash), sqlite3_errmsg(p->db)); + } + sqlite3_reset(pCkHash); + } if( rc==SQLITE_ROW ){ - sqlite3_bind_int64(pInsHash, 1, sqlite3_column_int64(pCkHash, 0)); + sqlite3_bind_int64(pInsHash, 1, iHash); + sqlite3_bind_int64(pInsHash, 2, nHash); rc = sqlite3_step(pInsHash); if( rc!=SQLITE_DONE ){ reportError(p, "SQL statement [%s] failed: %s", @@ -1308,42 +1411,57 @@ static void originSide(SQLiteRsync *p){ } sqlite3_reset(pInsHash); } - else if( rc!=SQLITE_DONE ){ - reportError(p, "SQL statement [%s] failed: %s", - sqlite3_sql(pCkHash), sqlite3_errmsg(p->db)); - } - sqlite3_reset(pCkHash); + iHash += nHash; break; } case REPLICA_READY: { - sqlite3_stmt *pStmt; - sqlite3_finalize(pCkHash); - sqlite3_finalize(pInsHash); - pCkHash = 0; - pInsHash = 0; - if( iPage+1<p->nPage ){ - runSql(p, "WITH RECURSIVE c(n) AS" - " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)" - " INSERT INTO badHash SELECT n FROM c", - iPage+1, p->nPage); - } - runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage); - pStmt = prepareStmt(p, - "SELECT pgno, data" - " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)"); - if( pStmt==0 ) break; - while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){ - unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0); - const void *pContent = sqlite3_column_blob(pStmt, 1); - writeByte(p, ORIGIN_PAGE); - writeUint32(p, pgno); - writeBytes(p, szPg, pContent); - p->nPageSent++; + if( nMulti>0 ){ + sqlite3_stmt *pStmt; + pStmt = prepareStmt(p,"SELECT pgno, sz FROM badHash WHERE sz>1"); + if( pStmt==0 ) break; + while( sqlite3_step(pStmt)==SQLITE_ROW ){ + writeByte(p, ORIGIN_DETAIL); + writeUint32(p, sqlite3_column_int(pStmt, 0)); + writeUint32(p, sqlite3_column_int(pStmt, 1)); + } 
+ sqlite3_finalize(pStmt); + runSql(p, "DELETE FROM badHash WHERE sz>1"); + nMulti = 0; + writeByte(p, ORIGIN_READY); + }else{ + sqlite3_stmt *pStmt; + sqlite3_finalize(pCkHash); + sqlite3_finalize(pCkHashN); + sqlite3_finalize(pInsHash); + pCkHash = 0; + pInsHash = 0; + if( iHash+1<p->nPage ){ + runSql(p, "WITH RECURSIVE c(n) AS" + " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)" + " INSERT INTO badHash SELECT n, 1 FROM c", + iHash+1, p->nPage); + } + runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage); + pStmt = prepareStmt(p, + "SELECT pgno, data" + " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)"); + if( pStmt==0 ) break; + while( sqlite3_step(pStmt)==SQLITE_ROW + && p->nErr==0 + && p->nWrErr==0 + ){ + unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0); + const void *pContent = sqlite3_column_blob(pStmt, 1); + writeByte(p, ORIGIN_PAGE); + writeUint32(p, pgno); + writeBytes(p, szPg, pContent); + p->nPageSent++; + } + sqlite3_finalize(pStmt); + writeByte(p, ORIGIN_TXN); + writeUint32(p, nPage); + writeByte(p, ORIGIN_END); } - sqlite3_finalize(pStmt); - writeByte(p, ORIGIN_TXN); - writeUint32(p, nPage); - writeByte(p, ORIGIN_END); fflush(p->pOut); break; } @@ -1361,6 +1479,88 @@ static void originSide(SQLiteRsync *p){ } /* +** Send a REPLICA_HASH message for each entry in the sendHash table. +** The sendHash table looks like this: +** +** CREATE TABLE sendHash( +** fpg INTEGER PRIMARY KEY, -- Page number of the hash +** npg INT -- Number of pages in this hash +** ); +** +** iHash is the page number for the next page that the origin will +** be expecting, and nHash is the number of pages that the origin will +** be expecting in the hash that follows. Send a REPLICA_CONFIG message +** if either of these values is not correct. 
+*/ +static void sendHashMessages( + SQLiteRsync *p, /* The replica-side of the sync */ + unsigned int iHash, /* Next page expected by origin */ + unsigned int nHash /* Next number of pages expected by origin */ +){ + sqlite3_stmt *pStmt; + pStmt = prepareStmt(p, + "SELECT if(npg==1," + " (SELECT hash(data) FROM sqlite_dbpage('replica') WHERE pgno=fpg)," + " (WITH RECURSIVE c(n) AS" + " (SELECT fpg UNION ALL SELECT n+1 FROM c WHERE n<fpg+npg)" + " SELECT agghash(hash(data))" + " FROM c CROSS JOIN sqlite_dbpage('replica') ON pgno=n)) AS hash," + " fpg," + " npg" + " FROM sendHash ORDER BY fpg" + ); + while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){ + const unsigned char *a = sqlite3_column_blob(pStmt, 0); + unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt, 1); + unsigned int npg = (unsigned int)sqlite3_column_int64(pStmt, 2); + if( pgno!=iHash || npg!=nHash ){ + writeByte(p, REPLICA_CONFIG); + writeUint32(p, pgno); + writeUint32(p, npg); + } + writeByte(p, REPLICA_HASH); + writeBytes(p, 20, a); + p->nHashSent++; + iHash = pgno + npg; + nHash = npg; + } + sqlite3_finalize(pStmt); + runSql(p, "DELETE FROM sendHash"); + writeByte(p, REPLICA_READY); + fflush(p->pOut); +} + +/* +** Make entries in the sendHash table to send hashes for +** npg (mnemonic: Number of PaGes) pages starting with fpg +** (mnemonic: First PaGe). 
+*/ +static void subdivideHashRange( + SQLiteRsync *p, /* The replica-side of the sync */ + unsigned int fpg, /* First page of the range */ + unsigned int npg /* Number of pages */ +){ + unsigned int nChunk; /* How many pages to request per hash */ + sqlite3_uint64 iEnd; /* One more than the last page */ + if( npg<=30 ){ + nChunk = 1; + }else if( npg<=1000 ){ + nChunk = 30; + }else{ + nChunk = 1000; + } + iEnd = fpg; + iEnd += npg; + runSql(p, + "WITH RECURSIVE c(n) AS" + " (VALUES(%u) UNION ALL SELECT n+%u FROM c WHERE n<%llu)" + "REPLACE INTO sendHash(fpg,npg)" + " SELECT n, min(%llu-fpg,%u) FROM c", + fpg, nChunk, iEnd, iEnd, nChunk + ); +} + +/* ** Run the replica-side protocol. The protocol is passive in the sense ** that it only response to message from the origin side. ** @@ -1370,15 +1570,35 @@ static void originSide(SQLiteRsync *p){ ** each page in the origin database (sent as a single-byte power-of-2), ** and the number of pages in the origin database. ** This procedure checks compatibility, and if everything is ok, -** it starts sending hashes of pages already present back to the origin. +** it starts sending hashes back to the origin using REPLICA_HASH +** and/or REPLICA_CONFIG messages, followed by a single REPLICA_READY. +** REPLICA_CONFIG is only sent if the protocol is 2 or greater. +** +** ORIGIN_ERROR size text +** +** Report an error and quit. ** -** ORIGIN_ERROR size text +** ORIGIN_DETAIL pgno cnt ** -** Report the received error and quit. +** The origin reports that a multi-page hash starting at pgno and +** spanning cnt pages failed to match. The origin is requesting +** details (more REPLICA_HASH messages with a smaller cnt). The +** replica must wait on ORIGIN_READY before sending its reply. ** -** ORIGIN_PAGE pgno content +** ORIGIN_READY ** -** Update the content of the given page. 
+** After sending one or more ORIGIN_DETAIL messages, the ORIGIN_READY +** is sent by the origin to indicate that it has finished sending +** requests for detail and is ready for the replica to reply +** with a new round of REPLICA_CONFIG and REPLICA_HASH messages. +** +** ORIGIN_PAGE pgno content +** +** Once the origin believes it knows exactly which pages need to be +** updated in the replica, it starts sending those pages using these +** messages. These messages will only appear immediately after +** REPLICA_READY. The origin never mixes ORIGIN_DETAIL and +** ORIGIN_PAGE messages in the same batch. ** ** ORIGIN_TXN pgno ** @@ -1418,7 +1638,6 @@ static void replicaSide(SQLiteRsync *p){ unsigned int nOPage = 0; unsigned int nRPage = 0, szRPage = 0; int rc = 0; - sqlite3_stmt *pStmt = 0; closeDb(p); p->iProtocol = readByte(p); @@ -1458,6 +1677,12 @@ static void replicaSide(SQLiteRsync *p){ closeDb(p); break; } + runSql(p, + "CREATE TABLE sendHash(" + " fpg INTEGER PRIMARY KEY," /* The page number of hash to send */ + " npg INT" /* Number of pages in this hash */ + ")" + ); hashRegister(p->db); if( runSqlReturnUInt(p, &nRPage, "PRAGMA replica.page_count") ){ break; @@ -1484,23 +1709,30 @@ static void replicaSide(SQLiteRsync *p){ "replica is %d bytes", szOPage, szRPage); break; } - - pStmt = prepareStmt(p, - "SELECT hash(data) FROM sqlite_dbpage('replica')" - " WHERE pgno<=min(%d,%d)" - " ORDER BY pgno", nRPage, nOPage); - while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){ - const unsigned char *a = sqlite3_column_blob(pStmt, 0); - writeByte(p, REPLICA_HASH); - writeBytes(p, 20, a); - p->nHashSent++; + if( p->iProtocol<2 ){ + runSql(p, + "WITH RECURSIVE c(n) AS" + "(VALUES(1) UNION ALL SELECT n+1 FROM c WHERE n<%d)" + "INSERT INTO sendHash(fpg, npg) SELECT n, 1 FROM c", + nRPage); + }else{ + subdivideHashRange(p, 1, nRPage); } - sqlite3_finalize(pStmt); - writeByte(p, REPLICA_READY); - fflush(p->pOut); + sendHashMessages(p, 1, 1); runSql(p, 
"PRAGMA writable_schema=ON"); break; } + case ORIGIN_DETAIL: { + unsigned int fpg, npg; + readUint32(p, &fpg); + readUint32(p, &npg); + subdivideHashRange(p, fpg, npg); + break; + } + case ORIGIN_READY: { + sendHashMessages(p, 0, 0); + break; + } case ORIGIN_TXN: { unsigned int nOPage = 0; readUint32(p, &nOPage); |