diff options
Diffstat (limited to 'ext/session/sqlite3session.c')
-rw-r--r-- | ext/session/sqlite3session.c | 165 |
1 files changed, 134 insertions, 31 deletions
diff --git a/ext/session/sqlite3session.c b/ext/session/sqlite3session.c index 45803c7b2..005b2308d 100644 --- a/ext/session/sqlite3session.c +++ b/ext/session/sqlite3session.c @@ -232,8 +232,8 @@ struct SessionTable { ** statement. ** ** For a DELETE change, all fields within the record except those associated -** with PRIMARY KEY columns are set to "undefined". The PRIMARY KEY fields -** contain the values identifying the row to delete. +** with PRIMARY KEY columns are omitted. The PRIMARY KEY fields contain the +** values identifying the row to delete. ** ** For an UPDATE change, all fields except those associated with PRIMARY KEY ** columns and columns that are modified by the UPDATE are set to "undefined". @@ -3816,34 +3816,18 @@ static int sessionRebaseAdd( assert( eType==SQLITE_CHANGESET_REPLACE||eType==SQLITE_CHANGESET_OMIT ); assert( eOp==SQLITE_DELETE || eOp==SQLITE_INSERT || eOp==SQLITE_UPDATE ); - if( eType==SQLITE_CHANGESET_REPLACE ){ - sessionAppendByte(&p->rebase, SQLITE_DELETE, &rc); - sessionAppendByte(&p->rebase, 0, &rc); - for(i=0; i<p->nCol; i++){ - if( p->abPK[i]==0 ){ - sessionAppendByte(&p->rebase, 0, &rc); - }else{ - sqlite3_value *pVal = 0; - if( eOp==SQLITE_INSERT ){ - sqlite3changeset_new(pIter, i, &pVal); - }else{ - sqlite3changeset_old(pIter, i, &pVal); - } - sessionAppendValue(&p->rebase, pVal, &rc); - } - } - }else{ - sessionAppendByte(&p->rebase, SQLITE_INSERT, &rc); - sessionAppendByte(&p->rebase, eOp==SQLITE_DELETE, &rc); - for(i=0; i<p->nCol; i++){ - sqlite3_value *pVal = 0; - if( eOp!=SQLITE_INSERT && p->abPK[i] ){ - sqlite3changeset_old(pIter, i, &pVal); - }else{ - sqlite3changeset_new(pIter, i, &pVal); - } - sessionAppendValue(&p->rebase, pVal, &rc); + sessionAppendByte(&p->rebase, + (eOp==SQLITE_DELETE ? SQLITE_DELETE : SQLITE_INSERT), &rc + ); + sessionAppendByte(&p->rebase, (eType==SQLITE_CHANGESET_REPLACE), &rc); + for(i=0; i<p->nCol; i++){ + sqlite3_value *pVal = 0; + if( eOp==SQLITE_DELETE || (eOp==SQLITE_UPDATE && p->abPK[i]) ){ + sqlite3changeset_old(pIter, i, &pVal); + }else{ + sqlite3changeset_new(pIter, i, &pVal); } + sessionAppendValue(&p->rebase, pVal, &rc); } return rc; @@ -5026,6 +5010,56 @@ static void sessionAppendRecordMerge( } } +static void sessionAppendPartialUpdate( + SessionBuffer *pBuf, + sqlite3_changeset_iter *pIter, + u8 *aRec, int nRec, + u8 *aChange, int nChange, + int *pRc +){ + sessionBufferGrow(pBuf, 2+nRec+nChange, pRc); + if( *pRc==SQLITE_OK ){ + int bData = 0; + u8 *pOut = &pBuf->aBuf[pBuf->nBuf]; + int i; + u8 *a1 = aRec; + u8 *a2 = aChange; + + *pOut++ = SQLITE_UPDATE; + *pOut++ = pIter->bIndirect; + for(i=0; i<pIter->nCol; i++){ + int n1 = sessionSerialLen(a1); + int n2 = sessionSerialLen(a2); + if( pIter->abPK[i] || a2[0]==0 ){ + if( !pIter->abPK[i] ) bData = 1; + memcpy(pOut, a1, n1); + pOut += n1; + }else{ + *pOut++ = '\0'; + } + a1 += n1; + a2 += n2; + } + if( bData ){ + a2 = aChange; + for(i=0; i<pIter->nCol; i++){ + int n1 = sessionSerialLen(a1); + int n2 = sessionSerialLen(a2); + if( pIter->abPK[i] || a2[0]==0 ){ + memcpy(pOut, a1, n1); + pOut += n1; + }else{ + *pOut++ = '\0'; + } + a1 += n1; + a2 += n2; + } + pBuf->nBuf = (pOut - pBuf->aBuf); + } + } +} + + static int sessionRebase( sqlite3_rebaser *p, /* Rebaser hash table */ sqlite3_changeset_iter *pIter, /* Input data */ @@ -5043,6 +5077,7 @@ static int sessionRebase( while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec, &bNew) ){ SessionChange *pChange = 0; + int bDone = 0; if( bNew ){ const char *zTab = pIter->zTab; @@ -5071,6 +5106,70 @@ static int sessionRebase( if( pChange ){ assert( pChange->op==SQLITE_DELETE || pChange->op==SQLITE_INSERT ); + switch( pIter->op ){ + case SQLITE_INSERT: + if( pChange->op==SQLITE_INSERT ){ + bDone = 1; + if( pChange->bIndirect==0 ){ + sessionAppendByte(&sOut, SQLITE_UPDATE, &rc); + sessionAppendByte(&sOut, pIter->bIndirect, &rc); + sessionAppendBlob(&sOut, pChange->aRecord, pChange->nRecord, &rc); + sessionAppendBlob(&sOut, aRec, nRec, &rc); + } + } + break; + + case SQLITE_UPDATE: + bDone = 1; + if( pChange->op==SQLITE_DELETE ){ + if( pChange->bIndirect==0 ){ + u8 *pCsr = aRec; + sessionSkipRecord(&pCsr, pIter->nCol); + sessionAppendByte(&sOut, SQLITE_INSERT, &rc); + sessionAppendByte(&sOut, pIter->bIndirect, &rc); + sessionAppendRecordMerge(&sOut, pIter->nCol, 1, + pCsr, nRec-(pCsr-aRec), + pChange->aRecord, pChange->nRecord, &rc + ); + } + }else{ + if( pChange->bIndirect==0 ){ + u8 *pCsr = aRec; + sessionAppendByte(&sOut, SQLITE_UPDATE, &rc); + sessionAppendByte(&sOut, pIter->bIndirect, &rc); + sessionAppendRecordMerge(&sOut, pIter->nCol, 0, + aRec, nRec, pChange->aRecord, pChange->nRecord, &rc + ); + sessionSkipRecord(&pCsr, pIter->nCol); + sessionAppendBlob(&sOut, pCsr, nRec - (pCsr-aRec), &rc); + }else{ + sessionAppendPartialUpdate(&sOut, pIter, + aRec, nRec, pChange->aRecord, pChange->nRecord, &rc + ); + } + } + break; + + default: + assert( pIter->op==SQLITE_DELETE ); + bDone = 1; + if( pChange->op==SQLITE_INSERT ){ + sessionAppendByte(&sOut, SQLITE_DELETE, &rc); + sessionAppendByte(&sOut, pIter->bIndirect, &rc); + sessionAppendRecordMerge(&sOut, pIter->nCol, 1, + pChange->aRecord, pChange->nRecord, aRec, nRec, &rc + ); + } + break; + } + } + + if( bDone==0 ){ + sessionAppendByte(&sOut, pIter->op, &rc); + sessionAppendByte(&sOut, pIter->bIndirect, &rc); + sessionAppendBlob(&sOut, aRec, nRec, &rc); + } +#if 0 /* If pChange is an INSERT, then rebase the change. If it is a ** DELETE, omit the change from the output altogether. */ if( pChange->op==SQLITE_INSERT ){ @@ -5097,12 +5196,15 @@ static int sessionRebase( } } }else{ - sessionAppendByte(&sOut, pIter->op, &rc); - sessionAppendByte(&sOut, pIter->bIndirect, &rc); if( pIter->op==SQLITE_INSERT ){ + sessionAppendByte(&sOut, SQLITE_UPDATE, &rc); + sessionAppendByte(&sOut, pIter->bIndirect, &rc); + sessionAppendBlob(&sOut, pChange->aRecord, pChange->nRecord, &rc); sessionAppendBlob(&sOut, aRec, nRec, &rc); }else{ u8 *pCsr = aRec; + sessionAppendByte(&sOut, pIter->op, &rc); + sessionAppendByte(&sOut, pIter->bIndirect, &rc); sessionAppendRecordMerge(&sOut, pIter->nCol, 0, aRec, nRec, pChange->aRecord, pChange->nRecord, &rc ); @@ -5118,6 +5220,7 @@ static int sessionRebase( sessionAppendByte(&sOut, pIter->bIndirect, &rc); sessionAppendBlob(&sOut, aRec, nRec, &rc); } +#endif if( rc==SQLITE_OK && xOutput && sOut.nBuf>SESSIONS_STRM_CHUNK_SIZE ){ rc = xOutput(pOut, sOut.aBuf, sOut.nBuf); |