diff options
author | drh <drh@noemail.net> | 2018-04-14 20:24:36 +0000 |
---|---|---|
committer | drh <drh@noemail.net> | 2018-04-14 20:24:36 +0000 |
commit | 096fd476c1ebb53a8add1f89176bad80eadbd880 (patch) | |
tree | 454b5696689d777c9b792c58ffe15f216976f904 /src | |
parent | 0b30a116454e307e0a74d472aa2dee302fa89014 (diff) | |
download | sqlite-096fd476c1ebb53a8add1f89176bad80eadbd880.tar.gz sqlite-096fd476c1ebb53a8add1f89176bad80eadbd880.zip |
Make sure constraint checks occur in the correct order, even in the
presence of upserts.
FossilOrigin-Name: 07fb30c3de7ff396ae2ce8a0d20352b56f17a5db0af99a921c7bfe9bd4018115
Diffstat (limited to 'src')
-rw-r--r-- | src/insert.c | 138 |
1 files changed, 119 insertions, 19 deletions
diff --git a/src/insert.c b/src/insert.c index c36d50fb3..d98f127c5 100644 --- a/src/insert.c +++ b/src/insert.c @@ -1158,6 +1158,42 @@ static int checkConstraintUnchanged(Expr *pExpr, int *aiChng, int chngRowid){ } /* +** An instance of the ConstraintAddr object remembers the byte-code addresses +** for sections of the constraint checks that deal with uniqueness constraints +** on the rowid and on the upsert constraint. +** +** This information is passed into checkReorderConstraintChecks() to insert +** some OP_Goto operations so that the rowid and upsert constraints occur +** in the correct order relative to other constraints. +*/ +typedef struct ConstraintAddr ConstraintAddr; +struct ConstraintAddr { + int ipkTop; /* Subroutine for rowid constraint check */ + int upsertTop; /* Label for upsert constraint check subroutine */ + int ipkBtm; /* Return opcode rowid constraint check */ + int upsertBtm; /* upsert constraint returns to this label */ +}; + +/* +** Generate any OP_Goto operations needed to cause constraints to be +** run that haven't already been run. +*/ +static void reorderConstraintChecks(Vdbe *v, ConstraintAddr *p){ + if( p->upsertTop ){ + sqlite3VdbeGoto(v, p->upsertTop); + VdbeComment((v, "call upsert subroutine")); + sqlite3VdbeResolveLabel(v, p->upsertBtm); + p->upsertTop = 0; + } + if( p->ipkTop ){ + sqlite3VdbeGoto(v, p->ipkTop); + VdbeComment((v, "call rowid constraint-check subroutine")); + sqlite3VdbeJumpHere(v, p->ipkBtm); + p->ipkTop = 0; + } +} + +/* ** Generate code to do constraint checks prior to an INSERT or an UPDATE ** on table pTab. ** @@ -1266,10 +1302,11 @@ void sqlite3GenerateConstraintChecks( int addr1; /* Address of jump instruction */ int seenReplace = 0; /* True if REPLACE is used to resolve INT PK conflict */ int nPkField; /* Number of fields in PRIMARY KEY. 1 for ROWID tables */ - int ipkTop = 0; /* Top of the rowid change constraint check */ - int ipkBottom = 0; /* Bottom of the rowid change constraint check */ + ConstraintAddr sAddr;/* Address information for constraint reordering */ + Index *pUpIdx = 0; /* Index to which to apply the upsert */ u8 isUpdate; /* True if this is an UPDATE operation */ u8 bAffinityDone = 0; /* True if the OP_Affinity operation has been run */ + int upsertBypass = 0; /* Address of Goto to bypass upsert subroutine */ isUpdate = regOldData!=0; db = pParse->db; @@ -1277,6 +1314,8 @@ void sqlite3GenerateConstraintChecks( assert( v!=0 ); assert( pTab->pSelect==0 ); /* This table is not a VIEW */ nCol = pTab->nCol; + sAddr.ipkTop = 0; + sAddr.upsertTop = 0; /* pPk is the PRIMARY KEY index for WITHOUT ROWID tables and NULL for ** normal rowid tables. nPkField is the number of key fields in the @@ -1376,6 +1415,53 @@ void sqlite3GenerateConstraintChecks( } #endif /* !defined(SQLITE_OMIT_CHECK) */ + /* UNIQUE and PRIMARY KEY constraints should be handled in the following + ** order: + ** + ** (1) OE_Abort, OE_Fail, OE_Rollback, OE_Ignore + ** (2) OE_Update + ** (3) OE_Replace + ** + ** OE_Fail and OE_Ignore must happen before any changes are made. + ** OE_Update guarantees that only a single row will change, so it + ** must happen before OE_Replace. Technically, OE_Abort and OE_Rollback + ** could happen in any order, but they are grouped up front for + ** convenience. + ** + ** Constraint checking code is generated in this order: + ** (A) The rowid constraint + ** (B) Unique index constraints that do not have OE_Replace as their + ** default conflict resolution strategy + ** (C) Unique index that do use OE_Replace by default. + ** + ** The ordering of (2) and (3) is accomplished by making sure the linked + ** list of indexes attached to a table puts all OE_Replace indexes last + ** in the list. See sqlite3CreateIndex() for where that happens. + */ + + /* If there is an ON CONFLICT clause without a constraint-target + ** (In other words, one of "ON CONFLICT DO NOTHING" or + ** "ON DUPLICATION KEY UPDATE") then change the overrideError to + ** whichever is appropriate. + */ + if( pUpsert ){ + if( pUpsert->pUpsertTarget==0 ){ + if( pUpsert->pUpsertSet==0 ){ + /* An ON CONFLICT DO NOTHING clause, without a constraint-target. + ** Make all unique constraint resolution be OE_Ignore */ + overrideError = OE_Ignore; + pUpsert = 0; + }else{ + /* An ON DUPLICATE KEY UPDATE clause. All unique constraints + ** do upsert processing */ + overrideError = OE_Update; + } + }else if( (pUpIdx = pUpsert->pUpsertIdx)!=0 ){ + sAddr.upsertTop = sqlite3VdbeMakeLabel(v); + sAddr.upsertBtm = sqlite3VdbeMakeLabel(v); + } + } + /* If rowid is changing, make sure the new rowid does not previously ** exist in the table. */ @@ -1400,7 +1486,7 @@ void sqlite3GenerateConstraintChecks( } /* figure out whether or not upsert applies in this case */ - if( pUpsert && (pUpsert->pUpsertTarget==0 || pUpsert->pUpsertIdx==0) ){ + if( pUpsert && pUpsert->pUpsertIdx==0 ){ if( pUpsert->pUpsertSet==0 ){ onError = OE_Ignore; /* DO NOTHING is the same as INSERT OR IGNORE */ }else{ @@ -1413,17 +1499,21 @@ void sqlite3GenerateConstraintChecks( ** to defer the running of the rowid conflict checking until after ** the UNIQUE constraints have run. */ - if( onError==OE_Replace && overrideError!=OE_Replace ){ - for(pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext){ - if( pIdx->onError==OE_Ignore || pIdx->onError==OE_Fail ){ - ipkTop = sqlite3VdbeAddOp0(v, OP_Goto); - break; - } - } + assert( OE_Update>OE_Replace ); + assert( OE_Ignore<OE_Replace ); + assert( OE_Fail<OE_Replace ); + assert( OE_Abort<OE_Replace ); + assert( OE_Rollback<OE_Replace ); + if( onError>=OE_Replace + && onError!=overrideError + && pTab->pIndex + ){ + sAddr.ipkTop = sqlite3VdbeAddOp0(v, OP_Goto)+1; } /* Check to see if the new rowid already exists in the table. Skip ** the following conflict logic if it does not. */ + VdbeNoopComment((v, "constraint checks for ROWID")); sqlite3VdbeAddOp3(v, OP_NotExists, iDataCur, addrRowidOk, regNewData); VdbeCoverage(v); @@ -1499,9 +1589,9 @@ void sqlite3GenerateConstraintChecks( } } sqlite3VdbeResolveLabel(v, addrRowidOk); - if( ipkTop ){ - ipkBottom = sqlite3VdbeAddOp0(v, OP_Goto); - sqlite3VdbeJumpHere(v, ipkTop); + if( sAddr.ipkTop ){ + sAddr.ipkBtm = sqlite3VdbeAddOp0(v, OP_Goto); + sqlite3VdbeJumpHere(v, sAddr.ipkTop-1); } } @@ -1519,12 +1609,17 @@ void sqlite3GenerateConstraintChecks( int addrUniqueOk; /* Jump here if the UNIQUE constraint is satisfied */ if( aRegIdx[ix]==0 ) continue; /* Skip indices that do not change */ + VdbeNoopComment((v, "constraint checks for %s", pIdx->zName)); if( bAffinityDone==0 ){ sqlite3TableAffinity(v, pTab, regNewData+1); bAffinityDone = 1; } iThisCur = iIdxCur+ix; - addrUniqueOk = sqlite3VdbeMakeLabel(v); + if( pUpIdx==pIdx ){ + addrUniqueOk = sAddr.upsertBtm; + }else{ + addrUniqueOk = sqlite3VdbeMakeLabel(v); + } /* Skip partial indices for which the WHERE clause is not true */ if( pIdx->pPartIdxWhere ){ @@ -1583,14 +1678,20 @@ void sqlite3GenerateConstraintChecks( }else if( onError==OE_Default ){ onError = OE_Abort; } + if( onError==OE_Replace ){ + reorderConstraintChecks(v, &sAddr); + } /* Figure out if the upsert clause applies to this index */ - if( pUpsert && (pUpsert->pUpsertTarget==0 || pUpsert->pUpsertIdx==pIdx) ){ + if( pUpIdx==pIdx ){ if( pUpsert->pUpsertSet==0 ){ onError = OE_Ignore; /* DO NOTHING is the same as INSERT OR IGNORE */ }else{ onError = OE_Update; /* DO UPDATE */ } + upsertBypass = sqlite3VdbeGoto(v, 0); + VdbeComment((v, "Upsert bypass")); + sqlite3VdbeResolveLabel(v, sAddr.upsertTop); } /* Collision detection may be omitted if all of the following are true: @@ -1710,11 +1811,10 @@ void sqlite3GenerateConstraintChecks( sqlite3VdbeResolveLabel(v, addrUniqueOk); sqlite3ExprCachePop(pParse); if( regR!=regIdx ) sqlite3ReleaseTempRange(pParse, regR, nPkField); + if( pUpIdx==pIdx ) sqlite3VdbeJumpHere(v, upsertBypass); + } - if( ipkTop ){ - sqlite3VdbeGoto(v, ipkTop+1); - sqlite3VdbeJumpHere(v, ipkBottom); - } + reorderConstraintChecks(v, &sAddr); *pbMayReplace = seenReplace; VdbeModuleComment((v, "END: GenCnstCks(%d)", seenReplace)); |