aboutsummaryrefslogtreecommitdiff
path: root/ext/session/sqlite3session.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/session/sqlite3session.c')
-rw-r--r--ext/session/sqlite3session.c199
1 files changed, 169 insertions, 30 deletions
diff --git a/ext/session/sqlite3session.c b/ext/session/sqlite3session.c
index a8dd1d9be..df40fdc1c 100644
--- a/ext/session/sqlite3session.c
+++ b/ext/session/sqlite3session.c
@@ -3396,14 +3396,15 @@ int sqlite3changeset_start_v2_strm(
** object and the buffer is full, discard some data to free up space.
*/
static void sessionDiscardData(SessionInput *pIn){
- if( pIn->xInput && pIn->iNext>=sessions_strm_chunk_size ){
- int nMove = pIn->buf.nBuf - pIn->iNext;
+ if( pIn->xInput && pIn->iCurrent>=sessions_strm_chunk_size ){
+ int nMove = pIn->buf.nBuf - pIn->iCurrent;
assert( nMove>=0 );
if( nMove>0 ){
- memmove(pIn->buf.aBuf, &pIn->buf.aBuf[pIn->iNext], nMove);
+ memmove(pIn->buf.aBuf, &pIn->buf.aBuf[pIn->iCurrent], nMove);
}
- pIn->buf.nBuf -= pIn->iNext;
- pIn->iNext = 0;
+ pIn->buf.nBuf -= pIn->iCurrent;
+ pIn->iNext -= pIn->iCurrent;
+ pIn->iCurrent = 0;
pIn->nData = pIn->buf.nBuf;
}
}
@@ -3757,8 +3758,8 @@ static int sessionChangesetNextOne(
p->rc = sessionInputBuffer(&p->in, 2);
if( p->rc!=SQLITE_OK ) return p->rc;
- sessionDiscardData(&p->in);
p->in.iCurrent = p->in.iNext;
+ sessionDiscardData(&p->in);
/* If the iterator is already at the end of the changeset, return DONE. */
if( p->in.iNext>=p->in.nData ){
@@ -5181,6 +5182,10 @@ static int sessionChangesetApply(
void *pCtx, /* Copy of sixth arg to _apply() */
const char *zTab /* Table name */
),
+ int(*xFilterIter)(
+ void *pCtx, /* Copy of sixth arg to _apply() */
+ sqlite3_changeset_iter *p
+ ),
int(*xConflict)(
void *pCtx, /* Copy of fifth arg to _apply() */
int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
@@ -5321,6 +5326,9 @@ static int sessionChangesetApply(
** next change. A log message has already been issued. */
if( schemaMismatch ) continue;
+ /* If this is a call to apply_v3(), invoke xFilterIter here. */
+ if( xFilterIter && 0==xFilterIter(pCtx, pIter) ) continue;
+
rc = sessionApplyOneWithRetry(db, pIter, &sApply, xConflict, pCtx);
}
@@ -5389,17 +5397,64 @@ static int sessionChangesetApply(
}
/*
-** Apply the changeset passed via pChangeset/nChangeset to the main
-** database attached to handle "db".
+** This function is called by all six sqlite3changeset_apply() variants:
+**
+** + sqlite3changeset_apply()
+** + sqlite3changeset_apply_v2()
+** + sqlite3changeset_apply_v3()
+** + sqlite3changeset_apply_strm()
+** + sqlite3changeset_apply_strm_v2()
+** + sqlite3changeset_apply_strm_v3()
+**
+** Arguments passed to this function are as follows:
+**
+** db:
+** Database handle to apply changeset to main database of.
+**
+** nChangeset/pChangeset:
+** These are both passed zero for the streaming variants. For the normal
+** apply() functions, these are passed the size of and the buffer containing
+** the changeset, respectively.
+**
+** xInput/pIn:
+** These are both passed zero for the normal variants. For the streaming
+** apply() functions, these are passed the input callback and context
+** pointer, respectively.
+**
+** xFilter:
+** The filter function as passed to apply() or apply_v2() (to filter by
+** table name), if any. This is always NULL for apply_v3() calls.
+**
+** xFilterIter:
+** The filter function as passed to apply_v3(), if any.
+**
+** xConflict:
+** The conflict handler callback (must not be NULL).
+**
+** pCtx:
+** The context pointer passed to the xFilter and xConflict handler callbacks.
+**
+** ppRebase, pnRebase:
+** Zero for apply(). The rebase changeset output pointers, if any, for
+** apply_v2() and apply_v3().
+**
+** flags:
+** Zero for apply(). The flags parameter for apply_v2() and apply_v3().
*/
-int sqlite3changeset_apply_v2(
+static int sessionChangesetApplyV23(
sqlite3 *db, /* Apply change to "main" db of this handle */
int nChangeset, /* Size of changeset in bytes */
void *pChangeset, /* Changeset blob */
+ int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
+ void *pIn, /* First arg for xInput */
int(*xFilter)(
void *pCtx, /* Copy of sixth arg to _apply() */
const char *zTab /* Table name */
),
+ int(*xFilterIter)(
+ void *pCtx, /* Copy of sixth arg to _apply() */
+ sqlite3_changeset_iter *p /* Handle describing current change */
+ ),
int(*xConflict)(
void *pCtx, /* Copy of sixth arg to _apply() */
int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
@@ -5410,19 +5465,75 @@ int sqlite3changeset_apply_v2(
int flags
){
sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
- int bInv = !!(flags & SQLITE_CHANGESETAPPLY_INVERT);
- int rc = sessionChangesetStart(&pIter, 0, 0, nChangeset, pChangeset, bInv, 1);
-
+ int bInverse = !!(flags & SQLITE_CHANGESETAPPLY_INVERT);
+ int rc = sessionChangesetStart(
+ &pIter, xInput, pIn, nChangeset, pChangeset, bInverse, 1
+ );
if( rc==SQLITE_OK ){
- rc = sessionChangesetApply(
- db, pIter, xFilter, xConflict, pCtx, ppRebase, pnRebase, flags
+ rc = sessionChangesetApply(db, pIter,
+ xFilter, xFilterIter, xConflict, pCtx, ppRebase, pnRebase, flags
);
}
-
return rc;
}
/*
+** Apply the changeset passed via pChangeset/nChangeset to the main
+** database attached to handle "db".
+*/
+int sqlite3changeset_apply_v2(
+ sqlite3 *db, /* Apply change to "main" db of this handle */
+ int nChangeset, /* Size of changeset in bytes */
+ void *pChangeset, /* Changeset blob */
+ int(*xFilter)(
+ void *pCtx, /* Copy of sixth arg to _apply() */
+ const char *zTab /* Table name */
+ ),
+ int(*xConflict)(
+ void *pCtx, /* Copy of sixth arg to _apply() */
+ int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
+ sqlite3_changeset_iter *p /* Handle describing change and conflict */
+ ),
+ void *pCtx, /* First argument passed to xConflict */
+ void **ppRebase, int *pnRebase,
+ int flags
+){
+ return sessionChangesetApplyV23(db,
+ nChangeset, pChangeset, 0, 0,
+ xFilter, 0, xConflict, pCtx,
+ ppRebase, pnRebase, flags
+ );
+}
+
+/*
+** Apply the changeset passed via pChangeset/nChangeset to the main
+** database attached to handle "db".
+*/
+int sqlite3changeset_apply_v3(
+ sqlite3 *db, /* Apply change to "main" db of this handle */
+ int nChangeset, /* Size of changeset in bytes */
+ void *pChangeset, /* Changeset blob */
+ int(*xFilter)(
+ void *pCtx, /* Copy of sixth arg to _apply() */
+ sqlite3_changeset_iter *p /* Handle describing current change */
+ ),
+ int(*xConflict)(
+ void *pCtx, /* Copy of sixth arg to _apply() */
+ int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
+ sqlite3_changeset_iter *p /* Handle describing change and conflict */
+ ),
+ void *pCtx, /* First argument passed to xConflict */
+ void **ppRebase, int *pnRebase,
+ int flags
+){
+ return sessionChangesetApplyV23(db,
+ nChangeset, pChangeset, 0, 0,
+ 0, xFilter, xConflict, pCtx,
+ ppRebase, pnRebase, flags
+ );
+}
+
+/*
** Apply the changeset passed via pChangeset/nChangeset to the main database
** attached to handle "db". Invoke the supplied conflict handler callback
** to resolve any conflicts encountered while applying the change.
@@ -5442,8 +5553,10 @@ int sqlite3changeset_apply(
),
void *pCtx /* First argument passed to xConflict */
){
- return sqlite3changeset_apply_v2(
- db, nChangeset, pChangeset, xFilter, xConflict, pCtx, 0, 0, 0
+ return sessionChangesetApplyV23(db,
+ nChangeset, pChangeset, 0, 0,
+ xFilter, 0, xConflict, pCtx,
+ 0, 0, 0
);
}
@@ -5452,6 +5565,29 @@ int sqlite3changeset_apply(
** attached to handle "db". Invoke the supplied conflict handler callback
** to resolve any conflicts encountered while applying the change.
*/
+int sqlite3changeset_apply_v3_strm(
+ sqlite3 *db, /* Apply change to "main" db of this handle */
+ int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
+ void *pIn, /* First arg for xInput */
+ int(*xFilter)(
+ void *pCtx, /* Copy of sixth arg to _apply() */
+ sqlite3_changeset_iter *p
+ ),
+ int(*xConflict)(
+ void *pCtx, /* Copy of sixth arg to _apply() */
+ int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
+ sqlite3_changeset_iter *p /* Handle describing change and conflict */
+ ),
+ void *pCtx, /* First argument passed to xConflict */
+ void **ppRebase, int *pnRebase,
+ int flags
+){
+ return sessionChangesetApplyV23(db,
+ 0, 0, xInput, pIn,
+ 0, xFilter, xConflict, pCtx,
+ ppRebase, pnRebase, flags
+ );
+}
int sqlite3changeset_apply_v2_strm(
sqlite3 *db, /* Apply change to "main" db of this handle */
int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
@@ -5469,15 +5605,11 @@ int sqlite3changeset_apply_v2_strm(
void **ppRebase, int *pnRebase,
int flags
){
- sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
- int bInverse = !!(flags & SQLITE_CHANGESETAPPLY_INVERT);
- int rc = sessionChangesetStart(&pIter, xInput, pIn, 0, 0, bInverse, 1);
- if( rc==SQLITE_OK ){
- rc = sessionChangesetApply(
- db, pIter, xFilter, xConflict, pCtx, ppRebase, pnRebase, flags
- );
- }
- return rc;
+ return sessionChangesetApplyV23(db,
+ 0, 0, xInput, pIn,
+ xFilter, 0, xConflict, pCtx,
+ ppRebase, pnRebase, flags
+ );
}
int sqlite3changeset_apply_strm(
sqlite3 *db, /* Apply change to "main" db of this handle */
@@ -5494,8 +5626,10 @@ int sqlite3changeset_apply_strm(
),
void *pCtx /* First argument passed to xConflict */
){
- return sqlite3changeset_apply_v2_strm(
- db, xInput, pIn, xFilter, xConflict, pCtx, 0, 0, 0
+ return sessionChangesetApplyV23(db,
+ 0, 0, xInput, pIn,
+ xFilter, 0, xConflict, pCtx,
+ 0, 0, 0
);
}
@@ -6117,14 +6251,19 @@ int sqlite3changegroup_add_change(
sqlite3_changegroup *pGrp,
sqlite3_changeset_iter *pIter
){
+ int rc = SQLITE_OK;
+
if( pIter->in.iCurrent==pIter->in.iNext
|| pIter->rc!=SQLITE_OK
|| pIter->bInvert
){
/* Iterator does not point to any valid entry or is an INVERT iterator. */
- return SQLITE_ERROR;
+ rc = SQLITE_ERROR;
+ }else{
+ pIter->in.bNoDiscard = 1;
+ rc = sessionOneChangeToHash(pGrp, pIter, 0);
}
- return sessionOneChangeToHash(pGrp, pIter, 0);
+ return rc;
}
/*