aboutsummaryrefslogtreecommitdiff
path: root/tool/sqlite3_rsync.c
diff options
context:
space:
mode:
authordrh <>2025-05-02 17:39:21 +0000
committerdrh <>2025-05-02 17:39:21 +0000
commitfbafaa561906e6c4ec38d225f33f33e3080782ad (patch)
treef96c4d2eb51d5dce71a6e6d10abe3a71d7d91633 /tool/sqlite3_rsync.c
parent5db695197b74580c777b37ab1b787531f15f7f9f (diff)
downloadsqlite-fbafaa561906e6c4ec38d225f33f33e3080782ad.tar.gz
sqlite-fbafaa561906e6c4ec38d225f33f33e3080782ad.zip
This is the start of an experiment in getting sqlite3_rsync to use less
bandwidth when the two databases are very similar, by sending hashes over blocks of pages initially, rather than over individual pages, then requesting more detail when hashes do not match. FossilOrigin-Name: 266b4b8f0104bd4b1cff87ed78b0223006bf661a9650294a2b330d50c7ee8a0c
Diffstat (limited to 'tool/sqlite3_rsync.c')
-rw-r--r--tool/sqlite3_rsync.c378
1 files changed, 305 insertions, 73 deletions
diff --git a/tool/sqlite3_rsync.c b/tool/sqlite3_rsync.c
index 34faaf0fd..6fb365094 100644
--- a/tool/sqlite3_rsync.c
+++ b/tool/sqlite3_rsync.c
@@ -75,20 +75,26 @@ struct SQLiteRsync {
/* Magic numbers to identify particular messages sent over the wire.
*/
+/**** Baseline: protocol version 1 ****/
#define ORIGIN_BEGIN 0x41 /* Initial message */
#define ORIGIN_END 0x42 /* Time to quit */
#define ORIGIN_ERROR 0x43 /* Error message from the remote */
#define ORIGIN_PAGE 0x44 /* New page data */
#define ORIGIN_TXN 0x45 /* Transaction commit */
#define ORIGIN_MSG 0x46 /* Informational message */
+/**** Added in protocol version 2 ****/
+#define ORIGIN_DETAIL 0x47 /* Request finer-grain hash info */
+#define ORIGIN_READY 0x48 /* Ready for next round of hash exchanges */
+/**** Baseline: protocol version 1 ****/
#define REPLICA_BEGIN 0x61 /* Welcome message */
#define REPLICA_ERROR 0x62 /* Error. Report and quit. */
#define REPLICA_END 0x63 /* Replica wants to stop */
#define REPLICA_HASH 0x64 /* One or more pages hashes to report */
#define REPLICA_READY 0x65 /* Read to receive page content */
#define REPLICA_MSG 0x66 /* Informational message */
-
+/**** Added in protocol version 2 ****/
+#define REPLICA_CONFIG 0x67 /* Hash exchange configuration */
/****************************************************************************
** Beginning of the popen2() implementation copied from Fossil *************
@@ -796,11 +802,49 @@ static void hashFunc(
sqlite3_result_blob(context, HashFinal(&cx), 160/8, SQLITE_TRANSIENT);
}
+/*
+** Implementation of the agghash(X) function.
+**
+** Return a 160-bit BLOB which is the hash of the concatenation
+** of all X inputs.
+*/
+static void agghashStep(
+ sqlite3_context *context,
+ int argc,
+ sqlite3_value **argv
+){
+ HashContext *pCx;
+ int eType = sqlite3_value_type(argv[0]);
+ int nByte = sqlite3_value_bytes(argv[0]);
+ if( eType==SQLITE_NULL ) return;
+ pCx = (HashContext*)sqlite3_aggregate_context(context, sizeof(*pCx));
+ if( pCx==0 ) return;
+ if( pCx->iSize==0 ) HashInit(pCx, 160);
+ if( eType==SQLITE_BLOB ){
+ HashUpdate(pCx, sqlite3_value_blob(argv[0]), nByte);
+ }else{
+ HashUpdate(pCx, sqlite3_value_text(argv[0]), nByte);
+ }
+}
+static void agghashFinal(sqlite3_context *context){
+ HashContext *pCx = (HashContext*)sqlite3_aggregate_context(context, 0);
+ if( pCx ){
+ sqlite3_result_blob(context, HashFinal(pCx), 160/8, SQLITE_TRANSIENT);
+ }
+}
+
/* Register the hash function */
static int hashRegister(sqlite3 *db){
- return sqlite3_create_function(db, "hash", 1,
+ int rc;
+ rc = sqlite3_create_function(db, "hash", 1,
SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC,
0, hashFunc, 0, 0);
+ if( rc==SQLITE_OK ){
+ rc = sqlite3_create_function(db, "agghash", 1,
+ SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC,
+ 0, 0, agghashStep, agghashFinal);
+ }
+ return rc;
}
/* End of the hashing logic
@@ -1192,6 +1236,13 @@ static void closeDb(SQLiteRsync *p){
** nPage, and szPage. Then enter a loop responding to message from
** the replica:
**
+** REPLICA_BEGIN iProtocol
+**
+** An optional message sent by the replica in response to the
+** prior ORIGIN_BEGIN with a counter-proposal for the protocol
+** level. If seen, try to reduce the protocol level to what is
+** requested and send a new ORGIN_BEGIN.
+**
** REPLICA_ERROR size text
**
** Report an error from the replica and quit
@@ -1202,24 +1253,36 @@ static void closeDb(SQLiteRsync *p){
**
** REPLICA_HASH hash
**
-** The argument is the 20-byte SHA1 hash for the next page
-** page hashes appear in sequential order with no gaps.
+** The argument is the 20-byte SHA1 hash for the next page or
+** block of pages. Hashes appear in sequential order with no gaps,
+** unless there is an intervening REPLICA_CONFIG message.
+**
+** REPLICA_CONFIG pgno cnt
+**
+** Set counters used by REPLICA_HASH. The next hash will start
+** on page pgno and all subsequent hashes will cover cnt pages
+** each. Note that for a multi-page hash, the hash value is
+** actually a hash of the individual page hashes.
**
** REPLICA_READY
**
** The replica has sent all the hashes that it intends to send.
** This side (the origin) can now start responding with page
-** content for pages that do not have a matching hash.
+** content for pages that do not have a matching hash or with
+** ORIGIN_DETAIL messages with requests for more detail.
*/
static void originSide(SQLiteRsync *p){
int rc = 0;
int c = 0;
unsigned int nPage = 0;
- unsigned int iPage = 0;
+ unsigned int iHash = 1; /* Pgno for next hash to receive */
+ unsigned int nHash = 1; /* Number of pages per hash received */
unsigned int lockBytePage = 0;
unsigned int szPg = 0;
- sqlite3_stmt *pCkHash = 0;
- sqlite3_stmt *pInsHash = 0;
+ sqlite3_stmt *pCkHash = 0; /* Verify hash on a single page */
+ sqlite3_stmt *pCkHashN = 0; /* Verify a multi-page hash */
+ sqlite3_stmt *pInsHash = 0; /* Record a bad hash */
+ unsigned int nMulti = 0; /* Multi-page hashes not matched */
char buf[200];
p->isReplica = 0;
@@ -1270,11 +1333,16 @@ static void originSide(SQLiteRsync *p){
** that is larger than what it knows about. The replica sends back
** a counter-proposal of an earlier protocol which the origin can
** accept by resending a new ORIGIN_BEGIN. */
- p->iProtocol = readByte(p);
- writeByte(p, ORIGIN_BEGIN);
- writeByte(p, p->iProtocol);
- writePow2(p, p->szPage);
- writeUint32(p, p->nPage);
+ u8 newProtocol = readByte(p);
+ if( newProtocol < p->iProtocol ){
+ p->iProtocol = newProtocol;
+ writeByte(p, ORIGIN_BEGIN);
+ writeByte(p, p->iProtocol);
+ writePow2(p, p->szPage);
+ writeUint32(p, p->nPage);
+ }else{
+ reportError(p, "Invalid REPLICA_BEGIN reply");
+ }
break;
}
case REPLICA_MSG:
@@ -1282,25 +1350,60 @@ static void originSide(SQLiteRsync *p){
readAndDisplayMessage(p, c);
break;
}
+ case REPLICA_CONFIG: {
+ readUint32(p, &iHash);
+ readUint32(p, &nHash);
+ break;
+ }
case REPLICA_HASH: {
if( pCkHash==0 ){
- runSql(p, "CREATE TEMP TABLE badHash(pgno INTEGER PRIMARY KEY)");
+ runSql(p, "CREATE TEMP TABLE badHash("
+ " pgno INTEGER PRIMARY KEY,"
+ " sz INT)");
pCkHash = prepareStmt(p,
"SELECT pgno FROM sqlite_dbpage('main')"
- " WHERE pgno=?1 AND hash(data)!=?2"
+ " WHERE pgno=?1 AND hash(data)!=?3"
);
if( pCkHash==0 ) break;
- pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?)");
+ pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?1,?2)");
if( pInsHash==0 ) break;
}
p->nHashSent++;
- iPage++;
- sqlite3_bind_int64(pCkHash, 1, iPage);
- readBytes(p, 20, buf);
- sqlite3_bind_blob(pCkHash, 2, buf, 20, SQLITE_STATIC);
- rc = sqlite3_step(pCkHash);
+ if( nHash>1 ){
+ if( pCkHashN==0 ){
+ pCkHashN = prepareStmt(p,
+ "WITH a1(pgno) AS "
+ "(VALUES(?1) UNION ALL SELECT pgno+1 FROM a1 WHERE pgno<?2)"
+ "SELECT 1 FROM a1 CROSS JOIN sqlite_dbpage('main')"
+ " USING(pgno)"
+ " WHERE agghash(hash(data))!=?3");
+ if( pCkHashN==0 ) break;
+ }
+ sqlite3_bind_int64(pCkHashN, 1, iHash);
+ sqlite3_bind_int64(pCkHashN, 2, iHash + nHash);
+ readBytes(p, 20, buf);
+ sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC);
+ if( rc==SQLITE_ROW ){
+ nMulti++;
+ }else if( rc==SQLITE_ERROR ){
+ reportError(p, "SQL statement [%s] failed: %s",
+ sqlite3_sql(pCkHashN), sqlite3_errmsg(p->db));
+ }
+ sqlite3_reset(pCkHashN);
+ }else{
+ sqlite3_bind_int64(pCkHash, 1, iHash);
+ readBytes(p, 20, buf);
+ sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC);
+ rc = sqlite3_step(pCkHash);
+ if( rc==SQLITE_ERROR ){
+ reportError(p, "SQL statement [%s] failed: %s",
+ sqlite3_sql(pCkHash), sqlite3_errmsg(p->db));
+ }
+ sqlite3_reset(pCkHash);
+ }
if( rc==SQLITE_ROW ){
- sqlite3_bind_int64(pInsHash, 1, sqlite3_column_int64(pCkHash, 0));
+ sqlite3_bind_int64(pInsHash, 1, iHash);
+ sqlite3_bind_int64(pInsHash, 2, nHash);
rc = sqlite3_step(pInsHash);
if( rc!=SQLITE_DONE ){
reportError(p, "SQL statement [%s] failed: %s",
@@ -1308,42 +1411,57 @@ static void originSide(SQLiteRsync *p){
}
sqlite3_reset(pInsHash);
}
- else if( rc!=SQLITE_DONE ){
- reportError(p, "SQL statement [%s] failed: %s",
- sqlite3_sql(pCkHash), sqlite3_errmsg(p->db));
- }
- sqlite3_reset(pCkHash);
+ iHash += nHash;
break;
}
case REPLICA_READY: {
- sqlite3_stmt *pStmt;
- sqlite3_finalize(pCkHash);
- sqlite3_finalize(pInsHash);
- pCkHash = 0;
- pInsHash = 0;
- if( iPage+1<p->nPage ){
- runSql(p, "WITH RECURSIVE c(n) AS"
- " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
- " INSERT INTO badHash SELECT n FROM c",
- iPage+1, p->nPage);
- }
- runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage);
- pStmt = prepareStmt(p,
- "SELECT pgno, data"
- " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)");
- if( pStmt==0 ) break;
- while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
- unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0);
- const void *pContent = sqlite3_column_blob(pStmt, 1);
- writeByte(p, ORIGIN_PAGE);
- writeUint32(p, pgno);
- writeBytes(p, szPg, pContent);
- p->nPageSent++;
+ if( nMulti>0 ){
+ sqlite3_stmt *pStmt;
+ pStmt = prepareStmt(p,"SELECT pgno, sz FROM badHash WHERE sz>1");
+ if( pStmt==0 ) break;
+ while( sqlite3_step(pStmt)==SQLITE_ROW ){
+ writeByte(p, ORIGIN_DETAIL);
+ writeUint32(p, sqlite3_column_int(pStmt, 0));
+ writeUint32(p, sqlite3_column_int(pStmt, 1));
+ }
+ sqlite3_finalize(pStmt);
+ runSql(p, "DELETE FROM badHash WHERE sz>1");
+ nMulti = 0;
+ writeByte(p, ORIGIN_READY);
+ }else{
+ sqlite3_stmt *pStmt;
+ sqlite3_finalize(pCkHash);
+ sqlite3_finalize(pCkHashN);
+ sqlite3_finalize(pInsHash);
+ pCkHash = 0;
+ pInsHash = 0;
+ if( iHash+1<p->nPage ){
+ runSql(p, "WITH RECURSIVE c(n) AS"
+ " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
+ " INSERT INTO badHash SELECT n, 1 FROM c",
+ iHash+1, p->nPage);
+ }
+ runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage);
+ pStmt = prepareStmt(p,
+ "SELECT pgno, data"
+ " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)");
+ if( pStmt==0 ) break;
+ while( sqlite3_step(pStmt)==SQLITE_ROW
+ && p->nErr==0
+ && p->nWrErr==0
+ ){
+ unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0);
+ const void *pContent = sqlite3_column_blob(pStmt, 1);
+ writeByte(p, ORIGIN_PAGE);
+ writeUint32(p, pgno);
+ writeBytes(p, szPg, pContent);
+ p->nPageSent++;
+ }
+ sqlite3_finalize(pStmt);
+ writeByte(p, ORIGIN_TXN);
+ writeUint32(p, nPage);
+ writeByte(p, ORIGIN_END);
}
- sqlite3_finalize(pStmt);
- writeByte(p, ORIGIN_TXN);
- writeUint32(p, nPage);
- writeByte(p, ORIGIN_END);
fflush(p->pOut);
break;
}
@@ -1361,6 +1479,88 @@ static void originSide(SQLiteRsync *p){
}
/*
+** Send a REPLICA_HASH message for each entry in the sendHash table.
+** The sendHash table looks like this:
+**
+** CREATE TABLE sendHash(
+** fpg INTEGER PRIMARY KEY, -- Page number of the hash
+** npg INT -- Number of pages in this hash
+** );
+**
+** If iHash is page number for the next page that the origin will
+** be expecting, and nHash is the number of pages that the origin will
+** be expecting in the hash that follows. Send a REPLICA_CONFIG message
+** if either of these values if not correct.
+*/
+static void sendHashMessages(
+ SQLiteRsync *p, /* The replica-side of the sync */
+ unsigned int iHash, /* Next page expected by origin */
+ unsigned int nHash /* Next number of pages expected by origin */
+){
+ sqlite3_stmt *pStmt;
+ pStmt = prepareStmt(p,
+ "SELECT if(npg==1,"
+ " (SELECT hash(data) FROM sqlite_dbpage('replica') WHERE pgno=fpg),"
+ " (WITH RECURSIVE c(n) AS"
+ " (SELECT fpg UNION ALL SELECT n+1 FROM c WHERE n<fpg+npg)"
+ " SELECT agghash(hash(data))"
+ " FROM c CROSS JOIN sqlite_dbpage('replica') ON pgno=n)) AS hash,"
+ " fpg,"
+ " npg"
+ " FROM sendHash ORDER BY fpg"
+ );
+ while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
+ const unsigned char *a = sqlite3_column_blob(pStmt, 0);
+ unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt, 1);
+ unsigned int npg = (unsigned int)sqlite3_column_int64(pStmt, 2);
+ if( pgno!=iHash || npg!=nHash ){
+ writeByte(p, REPLICA_CONFIG);
+ writeUint32(p, pgno);
+ writeUint32(p, npg);
+ }
+ writeByte(p, REPLICA_HASH);
+ writeBytes(p, 20, a);
+ p->nHashSent++;
+ iHash = pgno + npg;
+ nHash = npg;
+ }
+ sqlite3_finalize(pStmt);
+ runSql(p, "DELETE FROM sendHash");
+ writeByte(p, REPLICA_READY);
+ fflush(p->pOut);
+}
+
+/*
+** Make entries in the sendHash table to send hashes for
+** npg (mnemonic: Number of PaGes) pages starting with fpg
+** (mnemonic: First PaGe).
+*/
+static void subdivideHashRange(
+ SQLiteRsync *p, /* The replica-side of the sync */
+ unsigned int fpg, /* First page of the range */
+ unsigned int npg /* Number of pages */
+){
+ unsigned int nChunk; /* How many pages to request per hash */
+ sqlite3_uint64 iEnd; /* One more than the last page */
+ if( npg<=30 ){
+ nChunk = 1;
+ }else if( npg<=1000 ){
+ nChunk = 30;
+ }else{
+ nChunk = 1000;
+ }
+ iEnd = fpg;
+ iEnd += npg;
+ runSql(p,
+ "WITH RECURSIVE c(n) AS"
+ " (VALUES(%u) UNION ALL SELECT n+%u FROM c WHERE n<%llu)"
+ "REPLACE INTO sendHash(fpg,npg)"
+ " SELECT n, min(%llu-fpg,%u) FROM c",
+ fpg, nChunk, iEnd, iEnd, nChunk
+ );
+}
+
+/*
** Run the replica-side protocol. The protocol is passive in the sense
** that it only response to message from the origin side.
**
@@ -1370,15 +1570,35 @@ static void originSide(SQLiteRsync *p){
** each page in the origin database (sent as a single-byte power-of-2),
** and the number of pages in the origin database.
** This procedure checks compatibility, and if everything is ok,
-** it starts sending hashes of pages already present back to the origin.
+** it starts sending hashes back to the origin using REPLICA_HASH
+** and/or REPLICA_CONFIG message, followed by a single REPLICA_READY.
+** REPLICA_CONFIG is only sent if the protocol is 2 or greater.
+**
+** ORIGIN_ERROR size text
+**
+** Report an error and quit.
**
-** ORIGIN_ERROR size text
+** ORIGIN_DETAIL pgno cnt
**
-** Report the received error and quit.
+** The origin reports that a multi-page hash starting at pgno and
+** spanning cnt pages failed to match. The origin is requesting
+** details (more REPLICA_HASH message with a smaller cnt). The
+** replica must wait on ORIGIN_READY before sending its reply.
**
-** ORIGIN_PAGE pgno content
+** ORIGIN_READY
**
-** Update the content of the given page.
+** After sending one or more ORIGIN_DETAIL messages, the ORIGIN_READY
+** is sent by the origin to indicate that it has finished sending
+** requests for detail and is ready for the replicate to reply
+** with a new round of REPLICA_CONFIG and REPLICA_HASH messages.
+**
+** ORIGIN_PAGE pgno content
+**
+** Once the origin believes it knows exactly which pages need to be
+** updated in the replica, it starts sending those pages using these
+** messages. These messages will only appear immediately after
+** REPLICA_READY. The origin never mixes ORIGIN_DETAIL and
+** ORIGIN_PAGE messages in the same batch.
**
** ORIGIN_TXN pgno
**
@@ -1418,7 +1638,6 @@ static void replicaSide(SQLiteRsync *p){
unsigned int nOPage = 0;
unsigned int nRPage = 0, szRPage = 0;
int rc = 0;
- sqlite3_stmt *pStmt = 0;
closeDb(p);
p->iProtocol = readByte(p);
@@ -1458,6 +1677,12 @@ static void replicaSide(SQLiteRsync *p){
closeDb(p);
break;
}
+ runSql(p,
+ "CREATE TABLE sendHash("
+ " fpg INTEGER PRIMARY KEY," /* The page number of hash to send */
+ " npg INT" /* Number of pages in this hash */
+ ")"
+ );
hashRegister(p->db);
if( runSqlReturnUInt(p, &nRPage, "PRAGMA replica.page_count") ){
break;
@@ -1484,23 +1709,30 @@ static void replicaSide(SQLiteRsync *p){
"replica is %d bytes", szOPage, szRPage);
break;
}
-
- pStmt = prepareStmt(p,
- "SELECT hash(data) FROM sqlite_dbpage('replica')"
- " WHERE pgno<=min(%d,%d)"
- " ORDER BY pgno", nRPage, nOPage);
- while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
- const unsigned char *a = sqlite3_column_blob(pStmt, 0);
- writeByte(p, REPLICA_HASH);
- writeBytes(p, 20, a);
- p->nHashSent++;
+ if( p->iProtocol<2 ){
+ runSql(p,
+ "WITH RECURSIVE c(n) AS"
+ "(VALUES(1) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
+ "INSERT INTO sendHash(fpg, npg) SELECT n, 1 FROM c",
+ nRPage);
+ }else{
+ subdivideHashRange(p, 1, nRPage);
}
- sqlite3_finalize(pStmt);
- writeByte(p, REPLICA_READY);
- fflush(p->pOut);
+ sendHashMessages(p, 1, 1);
runSql(p, "PRAGMA writable_schema=ON");
break;
}
+ case ORIGIN_DETAIL: {
+ unsigned int fpg, npg;
+ readUint32(p, &fpg);
+ readUint32(p, &npg);
+ subdivideHashRange(p, fpg, npg);
+ break;
+ }
+ case ORIGIN_READY: {
+ sendHashMessages(p, 0, 0);
+ break;
+ }
case ORIGIN_TXN: {
unsigned int nOPage = 0;
readUint32(p, &nOPage);