aboutsummaryrefslogtreecommitdiff
path: root/ext/session/sqlite3session.c
diff options
context:
space:
mode:
authordan <dan@noemail.net>2018-03-16 18:02:47 +0000
committerdan <dan@noemail.net>2018-03-16 18:02:47 +0000
commitf01d3a7ef77ea7fbddc95befa13c78062168fcf7 (patch)
tree935559c512b0116f157533c94831bc0916745b95 /ext/session/sqlite3session.c
parentf1b40e830544fa64f065d06787c9fa9b73a2ff48 (diff)
downloadsqlite-f01d3a7ef77ea7fbddc95befa13c78062168fcf7.tar.gz
sqlite-f01d3a7ef77ea7fbddc95befa13c78062168fcf7.zip
Fix a problem with handling rebasing UPDATE changes for REPLACE conflict
resolution. FossilOrigin-Name: f7bf71f1d47044e3cbc74018294b8af5ad52c2bb84954e99bbd4e9b8c36fc077
Diffstat (limited to 'ext/session/sqlite3session.c')
-rw-r--r--ext/session/sqlite3session.c165
1 files changed, 134 insertions, 31 deletions
diff --git a/ext/session/sqlite3session.c b/ext/session/sqlite3session.c
index 45803c7b2..005b2308d 100644
--- a/ext/session/sqlite3session.c
+++ b/ext/session/sqlite3session.c
@@ -232,8 +232,8 @@ struct SessionTable {
** statement.
**
** For a DELETE change, all fields within the record except those associated
-** with PRIMARY KEY columns are set to "undefined". The PRIMARY KEY fields
-** contain the values identifying the row to delete.
+** with PRIMARY KEY columns are omitted. The PRIMARY KEY fields contain the
+** values identifying the row to delete.
**
** For an UPDATE change, all fields except those associated with PRIMARY KEY
** columns and columns that are modified by the UPDATE are set to "undefined".
@@ -3816,34 +3816,18 @@ static int sessionRebaseAdd(
assert( eType==SQLITE_CHANGESET_REPLACE||eType==SQLITE_CHANGESET_OMIT );
assert( eOp==SQLITE_DELETE || eOp==SQLITE_INSERT || eOp==SQLITE_UPDATE );
- if( eType==SQLITE_CHANGESET_REPLACE ){
- sessionAppendByte(&p->rebase, SQLITE_DELETE, &rc);
- sessionAppendByte(&p->rebase, 0, &rc);
- for(i=0; i<p->nCol; i++){
- if( p->abPK[i]==0 ){
- sessionAppendByte(&p->rebase, 0, &rc);
- }else{
- sqlite3_value *pVal = 0;
- if( eOp==SQLITE_INSERT ){
- sqlite3changeset_new(pIter, i, &pVal);
- }else{
- sqlite3changeset_old(pIter, i, &pVal);
- }
- sessionAppendValue(&p->rebase, pVal, &rc);
- }
- }
- }else{
- sessionAppendByte(&p->rebase, SQLITE_INSERT, &rc);
- sessionAppendByte(&p->rebase, eOp==SQLITE_DELETE, &rc);
- for(i=0; i<p->nCol; i++){
- sqlite3_value *pVal = 0;
- if( eOp!=SQLITE_INSERT && p->abPK[i] ){
- sqlite3changeset_old(pIter, i, &pVal);
- }else{
- sqlite3changeset_new(pIter, i, &pVal);
- }
- sessionAppendValue(&p->rebase, pVal, &rc);
+ sessionAppendByte(&p->rebase,
+ (eOp==SQLITE_DELETE ? SQLITE_DELETE : SQLITE_INSERT), &rc
+ );
+ sessionAppendByte(&p->rebase, (eType==SQLITE_CHANGESET_REPLACE), &rc);
+ for(i=0; i<p->nCol; i++){
+ sqlite3_value *pVal = 0;
+ if( eOp==SQLITE_DELETE || (eOp==SQLITE_UPDATE && p->abPK[i]) ){
+ sqlite3changeset_old(pIter, i, &pVal);
+ }else{
+ sqlite3changeset_new(pIter, i, &pVal);
}
+ sessionAppendValue(&p->rebase, pVal, &rc);
}
return rc;
@@ -5026,6 +5010,56 @@ static void sessionAppendRecordMerge(
}
}
+static void sessionAppendPartialUpdate(
+ SessionBuffer *pBuf,
+ sqlite3_changeset_iter *pIter,
+ u8 *aRec, int nRec,
+ u8 *aChange, int nChange,
+ int *pRc
+){
+ sessionBufferGrow(pBuf, 2+nRec+nChange, pRc);
+ if( *pRc==SQLITE_OK ){
+ int bData = 0;
+ u8 *pOut = &pBuf->aBuf[pBuf->nBuf];
+ int i;
+ u8 *a1 = aRec;
+ u8 *a2 = aChange;
+
+ *pOut++ = SQLITE_UPDATE;
+ *pOut++ = pIter->bIndirect;
+ for(i=0; i<pIter->nCol; i++){
+ int n1 = sessionSerialLen(a1);
+ int n2 = sessionSerialLen(a2);
+ if( pIter->abPK[i] || a2[0]==0 ){
+ if( !pIter->abPK[i] ) bData = 1;
+ memcpy(pOut, a1, n1);
+ pOut += n1;
+ }else{
+ *pOut++ = '\0';
+ }
+ a1 += n1;
+ a2 += n2;
+ }
+ if( bData ){
+ a2 = aChange;
+ for(i=0; i<pIter->nCol; i++){
+ int n1 = sessionSerialLen(a1);
+ int n2 = sessionSerialLen(a2);
+ if( pIter->abPK[i] || a2[0]==0 ){
+ memcpy(pOut, a1, n1);
+ pOut += n1;
+ }else{
+ *pOut++ = '\0';
+ }
+ a1 += n1;
+ a2 += n2;
+ }
+ pBuf->nBuf = (pOut - pBuf->aBuf);
+ }
+ }
+}
+
+
static int sessionRebase(
sqlite3_rebaser *p, /* Rebaser hash table */
sqlite3_changeset_iter *pIter, /* Input data */
@@ -5043,6 +5077,7 @@ static int sessionRebase(
while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec, &bNew) ){
SessionChange *pChange = 0;
+ int bDone = 0;
if( bNew ){
const char *zTab = pIter->zTab;
@@ -5071,6 +5106,70 @@ static int sessionRebase(
if( pChange ){
assert( pChange->op==SQLITE_DELETE || pChange->op==SQLITE_INSERT );
+ switch( pIter->op ){
+ case SQLITE_INSERT:
+ if( pChange->op==SQLITE_INSERT ){
+ bDone = 1;
+ if( pChange->bIndirect==0 ){
+ sessionAppendByte(&sOut, SQLITE_UPDATE, &rc);
+ sessionAppendByte(&sOut, pIter->bIndirect, &rc);
+ sessionAppendBlob(&sOut, pChange->aRecord, pChange->nRecord, &rc);
+ sessionAppendBlob(&sOut, aRec, nRec, &rc);
+ }
+ }
+ break;
+
+ case SQLITE_UPDATE:
+ bDone = 1;
+ if( pChange->op==SQLITE_DELETE ){
+ if( pChange->bIndirect==0 ){
+ u8 *pCsr = aRec;
+ sessionSkipRecord(&pCsr, pIter->nCol);
+ sessionAppendByte(&sOut, SQLITE_INSERT, &rc);
+ sessionAppendByte(&sOut, pIter->bIndirect, &rc);
+ sessionAppendRecordMerge(&sOut, pIter->nCol, 1,
+ pCsr, nRec-(pCsr-aRec),
+ pChange->aRecord, pChange->nRecord, &rc
+ );
+ }
+ }else{
+ if( pChange->bIndirect==0 ){
+ u8 *pCsr = aRec;
+ sessionAppendByte(&sOut, SQLITE_UPDATE, &rc);
+ sessionAppendByte(&sOut, pIter->bIndirect, &rc);
+ sessionAppendRecordMerge(&sOut, pIter->nCol, 0,
+ aRec, nRec, pChange->aRecord, pChange->nRecord, &rc
+ );
+ sessionSkipRecord(&pCsr, pIter->nCol);
+ sessionAppendBlob(&sOut, pCsr, nRec - (pCsr-aRec), &rc);
+ }else{
+ sessionAppendPartialUpdate(&sOut, pIter,
+ aRec, nRec, pChange->aRecord, pChange->nRecord, &rc
+ );
+ }
+ }
+ break;
+
+ default:
+ assert( pIter->op==SQLITE_DELETE );
+ bDone = 1;
+ if( pChange->op==SQLITE_INSERT ){
+ sessionAppendByte(&sOut, SQLITE_DELETE, &rc);
+ sessionAppendByte(&sOut, pIter->bIndirect, &rc);
+ sessionAppendRecordMerge(&sOut, pIter->nCol, 1,
+ pChange->aRecord, pChange->nRecord, aRec, nRec, &rc
+ );
+ }
+ break;
+ }
+ }
+
+ if( bDone==0 ){
+ sessionAppendByte(&sOut, pIter->op, &rc);
+ sessionAppendByte(&sOut, pIter->bIndirect, &rc);
+ sessionAppendBlob(&sOut, aRec, nRec, &rc);
+ }
+#if 0
/* If pChange is an INSERT, then rebase the change. If it is a
** DELETE, omit the change from the output altogether. */
if( pChange->op==SQLITE_INSERT ){
@@ -5097,12 +5196,15 @@ static int sessionRebase(
}
}
}else{
- sessionAppendByte(&sOut, pIter->op, &rc);
- sessionAppendByte(&sOut, pIter->bIndirect, &rc);
if( pIter->op==SQLITE_INSERT ){
+ sessionAppendByte(&sOut, SQLITE_UPDATE, &rc);
+ sessionAppendByte(&sOut, pIter->bIndirect, &rc);
+ sessionAppendBlob(&sOut, pChange->aRecord, pChange->nRecord, &rc);
sessionAppendBlob(&sOut, aRec, nRec, &rc);
}else{
u8 *pCsr = aRec;
+ sessionAppendByte(&sOut, pIter->op, &rc);
+ sessionAppendByte(&sOut, pIter->bIndirect, &rc);
sessionAppendRecordMerge(&sOut, pIter->nCol, 0,
aRec, nRec, pChange->aRecord, pChange->nRecord, &rc
);
@@ -5118,6 +5220,7 @@ static int sessionRebase(
sessionAppendByte(&sOut, pIter->bIndirect, &rc);
sessionAppendBlob(&sOut, aRec, nRec, &rc);
}
+#endif
if( rc==SQLITE_OK && xOutput && sOut.nBuf>SESSIONS_STRM_CHUNK_SIZE ){
rc = xOutput(pOut, sOut.aBuf, sOut.nBuf);