diff options
Diffstat (limited to 'ext/session/sqlite3session.c')
-rw-r--r-- | ext/session/sqlite3session.c | 213 |
1 files changed, 175 insertions, 38 deletions
diff --git a/ext/session/sqlite3session.c b/ext/session/sqlite3session.c index 256d5f089..62adb8de2 100644 --- a/ext/session/sqlite3session.c +++ b/ext/session/sqlite3session.c @@ -35,6 +35,7 @@ struct sqlite3_session { struct sqlite3_changeset_iter { u8 *aChangeset; /* Pointer to buffer containing changeset */ int nChangeset; /* Number of bytes in aChangeset */ + int bPatchset; /* True if this is a patchset */ u8 *pNext; /* Pointer to next change within aChangeset */ int rc; /* Iterator error code */ sqlite3_stmt *pConflict; /* Points to conflicting row, if any */ @@ -122,6 +123,7 @@ struct SessionTable { ** ** 1 byte: Constant 0x54 (capital 'T') ** Varint: Big-endian integer set to the number of columns in the table. +** nCol bytes: 0x01 for PK columns, 0x00 otherwise. ** N bytes: Unqualified table name (encoded using UTF-8). Nul-terminated. ** ** Followed by one or more changes to the table. @@ -130,6 +132,25 @@ struct SessionTable { ** 1 byte: The "indirect-change" flag. ** old.* record: (delete and update only) ** new.* record: (insert and update only) +** +** PATCHSET FORMAT: +** +** A patchset is also a collection of changes. It is similar to a changeset, +** but omits those fields that are not useful if no conflict resolution +** is required when applying the changeset. +** +** Each group of changes begins with a table header: +** +** 1 byte: Constant 0x50 (capital 'P') +** Varint: Big-endian integer set to the number of columns in the table. +** nCol bytes: 0x01 for PK columns, 0x00 otherwise. +** N bytes: Unqualified table name (encoded using UTF-8). Nul-terminated. +** +** Followed by one or more changes to the table. +** +** 1 byte: Either SQLITE_INSERT, UPDATE or DELETE. +** 1 byte: The "indirect-change" flag. +** single record: (PK fields for DELETE, or full record for INSERT/UPDATE). */ /* @@ -1449,6 +1470,7 @@ static void sessionAppendCol( */ static int sessionAppendUpdate( SessionBuffer *pBuf, /* Buffer to append to */ + int bPatchset, /* True for "patchset", 0 for "changeset" */ sqlite3_stmt *pStmt, /* Statement handle pointing at new row */ SessionChange *p, /* Object containing old values */ u8 *abPK /* Boolean array - true for PK columns */ @@ -1506,15 +1528,23 @@ static int sessionAppendUpdate( } } - if( bChanged || abPK[i] ){ - sessionAppendBlob(pBuf, pCsr, nAdvance, &rc); - }else{ - sessionAppendByte(pBuf, 0, &rc); + /* If at least one field has been modified, this is not a no-op. */ + if( bChanged ) bNoop = 0; + + /* Add a field to the old.* record. This is omitted if this modules is + ** currently generating a patchset. */ + if( bPatchset==0 ){ + if( bChanged || abPK[i] ){ + sessionAppendBlob(pBuf, pCsr, nAdvance, &rc); + }else{ + sessionAppendByte(pBuf, 0, &rc); + } } - if( bChanged ){ + /* Add a field to the new.* record. Or the only record if currently + ** generating a patchset. */ + if( bChanged || (bPatchset && abPK[i]) ){ sessionAppendCol(&buf2, pStmt, i, &rc); - bNoop = 0; }else{ sessionAppendByte(&buf2, 0, &rc); } @@ -1532,6 +1562,56 @@ static int sessionAppendUpdate( return rc; } +static int sessionAppendDelete( + SessionBuffer *pBuf, /* Buffer to append to */ + int bPatchset, /* True for "patchset", 0 for "changeset" */ + sqlite3_stmt *pStmt, /* Statement handle pointing at new row */ + SessionChange *p, /* Object containing old values */ + u8 *abPK /* Boolean array - true for PK columns */ +){ + int rc = SQLITE_OK; + + sessionAppendByte(pBuf, SQLITE_DELETE, &rc); + sessionAppendByte(pBuf, p->bIndirect, &rc); + + if( bPatchset==0 ){ + sessionAppendBlob(pBuf, p->aRecord, p->nRecord, &rc); + }else{ + int nCol = sqlite3_column_count(pStmt); + int i; + u8 *a = p->aRecord; + for(i=0; i<nCol; i++){ + u8 *pStart = a; + int eType = *a++; + + switch( eType ){ + case 0: + case SQLITE_NULL: + assert( abPK[i]==0 ); + break; + + case SQLITE_FLOAT: + case SQLITE_INTEGER: + a += 8; + break; + + default: { + int n; + a += sessionVarintGet(a, &n); + a += n; + break; + } + } + if( abPK[i] ){ + sessionAppendBlob(pBuf, pStart, a-pStart, &rc); + } + } + assert( (a - p->aRecord)==p->nRecord ); + } + + return rc; +} + /* ** Formulate and prepare a SELECT statement to retrieve a row from table ** zTab in database zDb based on its primary key. i.e. @@ -1654,25 +1734,20 @@ static int sessionSelectBind( */ static void sessionAppendTableHdr( SessionBuffer *pBuf, + int bPatchset, SessionTable *pTab, int *pRc ){ /* Write a table header */ - sessionAppendByte(pBuf, 'T', pRc); + sessionAppendByte(pBuf, (bPatchset ? 'P' : 'T'), pRc); sessionAppendVarint(pBuf, pTab->nCol, pRc); sessionAppendBlob(pBuf, pTab->abPK, pTab->nCol, pRc); sessionAppendBlob(pBuf, (u8 *)pTab->zName, (int)strlen(pTab->zName)+1, pRc); } -/* -** Obtain a changeset object containing all changes recorded by the -** session object passed as the first argument. -** -** It is the responsibility of the caller to eventually free the buffer -** using sqlite3_free(). -*/ -int sqlite3session_changeset( +int sessionGenerateChangeset( sqlite3_session *pSession, /* Session object */ + int bPatchset, /* True for patchset, false for changeset */ int *pnChangeset, /* OUT: Size of buffer at *ppChangeset */ void **ppChangeset /* OUT: Buffer containing changeset */ ){ @@ -1711,7 +1786,7 @@ int sqlite3session_changeset( } /* Write a table header */ - sessionAppendTableHdr(&buf, pTab, &rc); + sessionAppendTableHdr(&buf, bPatchset, pTab, &rc); /* Build and compile a statement to execute: */ if( rc==SQLITE_OK ){ @@ -1735,13 +1810,10 @@ int sqlite3session_changeset( sessionAppendCol(&buf, pSel, iCol, &rc); } }else{ - rc = sessionAppendUpdate(&buf, pSel, p, abPK); + rc = sessionAppendUpdate(&buf, bPatchset, pSel, p, abPK); } }else if( p->op!=SQLITE_INSERT ){ - /* A DELETE change */ - sessionAppendByte(&buf, SQLITE_DELETE, &rc); - sessionAppendByte(&buf, p->bIndirect, &rc); - sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc); + rc = sessionAppendDelete(&buf, bPatchset, pSel, p, abPK); } if( rc==SQLITE_OK ){ rc = sqlite3_reset(pSel); @@ -1770,6 +1842,36 @@ int sqlite3session_changeset( } /* +** Obtain a changeset object containing all changes recorded by the +** session object passed as the first argument. +** +** It is the responsibility of the caller to eventually free the buffer +** using sqlite3_free(). +*/ +int sqlite3session_changeset( + sqlite3_session *pSession, /* Session object */ + int *pnChangeset, /* OUT: Size of buffer at *ppChangeset */ + void **ppChangeset /* OUT: Buffer containing changeset */ +){ + return sessionGenerateChangeset(pSession, 0, pnChangeset, ppChangeset); +} + +/* +** Obtain a patchset object containing all changes recorded by the +** session object passed as the first argument. +** +** It is the responsibility of the caller to eventually free the buffer +** using sqlite3_free(). +*/ +int sqlite3session_patchset( + sqlite3_session *pSession, /* Session object */ + int *pnPatchset, /* OUT: Size of buffer at *ppChangeset */ + void **ppPatchset /* OUT: Buffer containing changeset */ +){ + return sessionGenerateChangeset(pSession, 1, pnPatchset, ppPatchset); +} + +/* ** Enable or disable the session object passed as the first argument. */ int sqlite3session_enable(sqlite3_session *pSession, int bEnable){ @@ -1866,13 +1968,16 @@ int sqlite3changeset_start( static int sessionReadRecord( u8 **paChange, /* IN/OUT: Pointer to binary record */ int nCol, /* Number of values in record */ + u8 *abPK, /* Array of primary key flags, or NULL */ sqlite3_value **apOut /* Write values to this array */ ){ int i; /* Used to iterate through columns */ u8 *aRec = *paChange; /* Cursor for the serialized record */ for(i=0; i<nCol; i++){ - int eType = *aRec++; /* Type of value (SQLITE_NULL, TEXT etc.) */ + int eType; + if( abPK && abPK[i]==0 ) continue; + eType = *aRec++; /* Type of value (SQLITE_NULL, TEXT etc.) */ assert( !apOut || apOut[i]==0 ); if( eType ){ if( apOut ){ @@ -1952,8 +2057,9 @@ static int sessionChangesetNext( } aChange = p->pNext; - if( aChange[0]=='T' ){ + if( aChange[0]=='T' || aChange[0]=='P' ){ int nByte; /* Bytes to allocate for apValue */ + p->bPatchset = (aChange[0]=='P'); aChange++; aChange += sessionVarintGet(aChange, &p->nCol); p->abPK = (u8 *)aChange; @@ -1981,18 +2087,36 @@ static int sessionChangesetNext( if( paRec ){ *paRec = aChange; } /* If this is an UPDATE or DELETE, read the old.* record. */ - if( p->op!=SQLITE_INSERT ){ - p->rc = sessionReadRecord(&aChange, p->nCol, paRec?0:p->apValue); + if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){ + u8 *abPK = p->bPatchset ? p->abPK : 0; + p->rc = sessionReadRecord(&aChange, p->nCol, abPK, paRec?0:p->apValue); if( p->rc!=SQLITE_OK ) return p->rc; } /* If this is an INSERT or UPDATE, read the new.* record. */ if( p->op!=SQLITE_DELETE ){ - p->rc = sessionReadRecord(&aChange, p->nCol, paRec?0:&p->apValue[p->nCol]); + sqlite3_value **apOut = (paRec ? 0 : &p->apValue[p->nCol]); + p->rc = sessionReadRecord(&aChange, p->nCol, 0, apOut); if( p->rc!=SQLITE_OK ) return p->rc; } - if( pnRec ){ *pnRec = (int)(aChange - *paRec); } + if( pnRec ){ + *pnRec = (int)(aChange - *paRec); + }else if( p->bPatchset && p->op==SQLITE_UPDATE ){ + /* If this is an UPDATE that is part of a patchset, then all PK and + ** modified fields are present in the new.* record. The old.* record + ** is currently completely empty. This block shifts the PK fields from + ** new.* to old.*, to accommodate the code that reads these arrays. */ + int i; + for(i=0; i<p->nCol; i++){ + assert( p->apValue[i]==0 ); + assert( p->abPK[i]==0 || p->apValue[i+p->nCol] ); + if( p->abPK[i] ){ + p->apValue[i] = p->apValue[i+p->nCol]; + p->apValue[i+p->nCol] = 0; + } + } + } p->pNext = aChange; return SQLITE_ROW; } @@ -2225,7 +2349,7 @@ int sqlite3changeset_invert( int nByte; u8 *aEnd = &aIn[i+2]; - sessionReadRecord(&aEnd, nCol, 0); + sessionReadRecord(&aEnd, nCol, 0, 0); aOut[i] = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE); aOut[i+1] = aIn[i+1]; nByte = (int)(aEnd - &aIn[i+2]); @@ -2249,9 +2373,9 @@ int sqlite3changeset_invert( } /* Read the old.* and new.* records for the update change. */ - rc = sessionReadRecord(&aEnd, nCol, &apVal[0]); + rc = sessionReadRecord(&aEnd, nCol, 0, &apVal[0]); if( rc==SQLITE_OK ){ - rc = sessionReadRecord(&aEnd, nCol, &apVal[nCol]); + rc = sessionReadRecord(&aEnd, nCol, 0, &apVal[nCol]); } /* Write the header for the new UPDATE change. Same as the original. */ @@ -2781,10 +2905,21 @@ static int sessionApplyOneOp( if( op==SQLITE_DELETE ){ - /* Bind values to the DELETE statement. */ - rc = sessionBindRow(pIter, sqlite3changeset_old, nCol, 0, p->pDelete); + /* Bind values to the DELETE statement. If conflict handling is required, + ** bind values for all columns and set bound variable (nCol+1) to true. + ** Or, if conflict handling is not required, bind just the PK column + ** values and, if it exists, set (nCol+1) to false. Conflict handling + ** is not required if: + ** + ** * this is a patchset, or + ** * (pbRetry==0), or + ** * all columns of the table are PK columns (in this case there is + ** no (nCol+1) variable to bind to). + */ + u8 *abPK = (pIter->bPatchset ? p->abPK : 0); + rc = sessionBindRow(pIter, sqlite3changeset_old, nCol, abPK, p->pDelete); if( rc==SQLITE_OK && sqlite3_bind_parameter_count(p->pDelete)>nCol ){ - rc = sqlite3_bind_int(p->pDelete, nCol+1, pbRetry==0); + rc = sqlite3_bind_int(p->pDelete, nCol+1, (pbRetry==0 || abPK)); } if( rc!=SQLITE_OK ) return rc; @@ -2816,7 +2951,9 @@ static int sessionApplyOneOp( rc = sessionBindValue(p->pUpdate, i*3+3, pNew); } } - if( rc==SQLITE_OK ) sqlite3_bind_int(p->pUpdate, nCol*3+1, pbRetry==0); + if( rc==SQLITE_OK ){ + sqlite3_bind_int(p->pUpdate, nCol*3+1, pbRetry==0 || pIter->bPatchset); + } if( rc!=SQLITE_OK ) return rc; /* Attempt the UPDATE. In the case of a NOTFOUND or DATA conflict, @@ -3099,7 +3236,7 @@ static int sessionChangeMerge( u8 *a1 = aRec; assert( op2==SQLITE_UPDATE ); pNew->op = SQLITE_INSERT; - sessionReadRecord(&a1, pTab->nCol, 0); + sessionReadRecord(&a1, pTab->nCol, 0, 0); sessionMergeRecord(&aCsr, pTab->nCol, pExist->aRecord, a1); }else if( op1==SQLITE_DELETE ){ /* DELETE + INSERT */ assert( op2==SQLITE_INSERT ); @@ -3112,8 +3249,8 @@ static int sessionChangeMerge( u8 *a1 = pExist->aRecord; u8 *a2 = aRec; assert( op1==SQLITE_UPDATE ); - sessionReadRecord(&a1, pTab->nCol, 0); - sessionReadRecord(&a2, pTab->nCol, 0); + sessionReadRecord(&a1, pTab->nCol, 0, 0); + sessionReadRecord(&a2, pTab->nCol, 0, 0); pNew->op = SQLITE_UPDATE; if( 0==sessionMergeUpdate(&aCsr, pTab, aRec, pExist->aRecord, a1, a2) ){ sqlite3_free(pNew); @@ -3274,7 +3411,7 @@ int sqlite3changeset_concat( int i; if( pTab->nEntry==0 ) continue; - sessionAppendTableHdr(&buf, pTab, &rc); + sessionAppendTableHdr(&buf, 0, pTab, &rc); for(i=0; i<pTab->nChange; i++){ SessionChange *p; for(p=pTab->apChange[i]; p; p=p->pNext){ |