diff options
Diffstat (limited to 'src/backend/access/transam/xact.c')
-rw-r--r-- | src/backend/access/transam/xact.c | 162 |
1 files changed, 102 insertions, 60 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 1d930752c57..df5a67e4c31 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -183,6 +183,10 @@ typedef enum TBlockState /* * transaction state structure + * + * Note: parallelModeLevel counts the number of unmatched EnterParallelMode + * calls done at this transaction level. parallelChildXact is true if any + * upper transaction level has nonzero parallelModeLevel. */ typedef struct TransactionStateData { @@ -205,6 +209,7 @@ typedef struct TransactionStateData bool startedInRecovery; /* did we start in recovery? */ bool didLogXid; /* has xid been included in WAL record? */ int parallelModeLevel; /* Enter/ExitParallelMode counter */ + bool parallelChildXact; /* is any parent transaction parallel? */ bool chain; /* start a new block after this one */ bool topXidLogged; /* for a subxact: is top-level XID logged? */ struct TransactionStateData *parent; /* back link to parent */ @@ -639,7 +644,9 @@ AssignTransactionId(TransactionState s) * operation, so we can't account for new XIDs at this point. */ if (IsInParallelMode() || IsParallelWorker()) - elog(ERROR, "cannot assign XIDs during a parallel operation"); + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot assign XIDs during a parallel operation"))); /* * Ensure parent(s) have XIDs, so that a child always has an XID later @@ -827,7 +834,11 @@ GetCurrentCommandId(bool used) * could relax this restriction when currentCommandIdUsed was already * true at the start of the parallel operation. */ - Assert(!IsParallelWorker()); + if (IsParallelWorker()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot modify data in a parallel worker"))); + currentCommandIdUsed = true; } return currentCommandId; @@ -1052,7 +1063,8 @@ ExitParallelMode(void) TransactionState s = CurrentTransactionState; Assert(s->parallelModeLevel > 0); - Assert(s->parallelModeLevel > 1 || !ParallelContextActive()); + Assert(s->parallelModeLevel > 1 || s->parallelChildXact || + !ParallelContextActive()); --s->parallelModeLevel; } @@ -1065,11 +1077,17 @@ ExitParallelMode(void) * match across all workers. Mere caches usually don't require such a * restriction. State modified in a strict push/pop fashion, such as the * active snapshot stack, is often fine. + * + * We say we are in parallel mode if we are in a subxact of a transaction + * that's initiated a parallel operation; for most purposes that context + * has all the same restrictions. */ bool IsInParallelMode(void) { - return CurrentTransactionState->parallelModeLevel != 0; + TransactionState s = CurrentTransactionState; + + return s->parallelModeLevel != 0 || s->parallelChildXact; } /* @@ -1092,7 +1110,9 @@ CommandCounterIncrement(void) * point. */ if (IsInParallelMode() || IsParallelWorker()) - elog(ERROR, "cannot start commands during a parallel operation"); + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot start commands during a parallel operation"))); currentCommandId += 1; if (currentCommandId == InvalidCommandId) @@ -2210,9 +2230,26 @@ CommitTransaction(void) CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT : XACT_EVENT_PRE_COMMIT); - /* If we might have parallel workers, clean them up now. */ - if (IsInParallelMode()) - AtEOXact_Parallel(true); + /* + * If this xact has started any unfinished parallel operation, clean up + * its workers, warning about leaked resources. (But we don't actually + * reset parallelModeLevel till entering TRANS_COMMIT, a bit below. This + * keeps parallel mode restrictions active as long as possible in a + * parallel worker.) + */ + AtEOXact_Parallel(true); + if (is_parallel_worker) + { + if (s->parallelModeLevel != 1) + elog(WARNING, "parallelModeLevel is %d not 1 at end of parallel worker transaction", + s->parallelModeLevel); + } + else + { + if (s->parallelModeLevel != 0) + elog(WARNING, "parallelModeLevel is %d not 0 at end of transaction", + s->parallelModeLevel); + } /* Shut down the deferred-trigger manager */ AfterTriggerEndXact(true); @@ -2263,6 +2300,7 @@ CommitTransaction(void) */ s->state = TRANS_COMMIT; s->parallelModeLevel = 0; + s->parallelChildXact = false; /* should be false already */ /* Disable transaction timeout */ if (TransactionTimeout > 0) @@ -2804,12 +2842,13 @@ AbortTransaction(void) /* Reset snapshot export state. */ SnapBuildResetExportedSnapshotState(); - /* If in parallel mode, clean up workers and exit parallel mode. */ - if (IsInParallelMode()) - { - AtEOXact_Parallel(false); - s->parallelModeLevel = 0; - } + /* + * If this xact has started any unfinished parallel operation, clean up + * its workers and exit parallel mode. Don't warn about leaked resources. + */ + AtEOXact_Parallel(false); + s->parallelModeLevel = 0; + s->parallelChildXact = false; /* should be false already */ /* * do abort processing @@ -2937,6 +2976,7 @@ CleanupTransaction(void) s->nChildXids = 0; s->maxChildXids = 0; s->parallelModeLevel = 0; + s->parallelChildXact = false; XactTopFullTransactionId = InvalidFullTransactionId; nParallelCurrentXids = 0; @@ -4303,7 +4343,7 @@ DefineSavepoint(const char *name) * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case * below.) */ - if (IsInParallelMode()) + if (IsInParallelMode() || IsParallelWorker()) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot define savepoints during a parallel operation"))); @@ -4390,7 +4430,7 @@ ReleaseSavepoint(const char *name) * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case * below.) */ - if (IsInParallelMode()) + if (IsInParallelMode() || IsParallelWorker()) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot release savepoints during a parallel operation"))); @@ -4499,7 +4539,7 @@ RollbackToSavepoint(const char *name) * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case * below.) */ - if (IsInParallelMode()) + if (IsInParallelMode() || IsParallelWorker()) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot rollback to savepoints during a parallel operation"))); @@ -4605,38 +4645,40 @@ RollbackToSavepoint(const char *name) /* * BeginInternalSubTransaction * This is the same as DefineSavepoint except it allows TBLOCK_STARTED, - * TBLOCK_IMPLICIT_INPROGRESS, TBLOCK_END, and TBLOCK_PREPARE states, - * and therefore it can safely be used in functions that might be called - * when not inside a BEGIN block or when running deferred triggers at - * COMMIT/PREPARE time. Also, it automatically does - * CommitTransactionCommand/StartTransactionCommand instead of expecting - * the caller to do it. + * TBLOCK_IMPLICIT_INPROGRESS, TBLOCK_PARALLEL_INPROGRESS, TBLOCK_END, + * and TBLOCK_PREPARE states, and therefore it can safely be used in + * functions that might be called when not inside a BEGIN block or when + * running deferred triggers at COMMIT/PREPARE time. Also, it + * automatically does CommitTransactionCommand/StartTransactionCommand + * instead of expecting the caller to do it. */ void BeginInternalSubTransaction(const char *name) { TransactionState s = CurrentTransactionState; + bool save_ExitOnAnyError = ExitOnAnyError; /* - * Workers synchronize transaction state at the beginning of each parallel - * operation, so we can't account for new subtransactions after that - * point. We might be able to make an exception for the type of - * subtransaction established by this function, which is typically used in - * contexts where we're going to release or roll back the subtransaction - * before proceeding further, so that no enduring change to the - * transaction state occurs. For now, however, we prohibit this case along - * with all the others. - */ - if (IsInParallelMode()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot start subtransactions during a parallel operation"))); + * Errors within this function are improbable, but if one does happen we + * force a FATAL exit. Callers generally aren't prepared to handle losing + * control, and moreover our transaction state is probably corrupted if we + * fail partway through; so an ordinary ERROR longjmp isn't okay. + */ + ExitOnAnyError = true; + + /* + * We do not check for parallel mode here. It's permissible to start and + * end "internal" subtransactions while in parallel mode, so long as no + * new XIDs or command IDs are assigned. Enforcement of that occurs in + * AssignTransactionId() and CommandCounterIncrement(). + */ switch (s->blockState) { case TBLOCK_STARTED: case TBLOCK_INPROGRESS: case TBLOCK_IMPLICIT_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_END: case TBLOCK_PREPARE: case TBLOCK_SUBINPROGRESS: @@ -4655,7 +4697,6 @@ BeginInternalSubTransaction(const char *name) /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: - case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_SUBRELEASE: case TBLOCK_SUBCOMMIT: @@ -4674,6 +4715,8 @@ BeginInternalSubTransaction(const char *name) CommitTransactionCommand(); StartTransactionCommand(); + + ExitOnAnyError = save_ExitOnAnyError; } /* @@ -4689,16 +4732,10 @@ ReleaseCurrentSubTransaction(void) TransactionState s = CurrentTransactionState; /* - * Workers synchronize transaction state at the beginning of each parallel - * operation, so we can't account for commit of subtransactions after that - * point. This should not happen anyway. Code calling this would - * typically have called BeginInternalSubTransaction() first, failing - * there. + * We do not check for parallel mode here. It's permissible to start and + * end "internal" subtransactions while in parallel mode, so long as no + * new XIDs or command IDs are assigned. */ - if (IsInParallelMode()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot commit subtransactions during a parallel operation"))); if (s->blockState != TBLOCK_SUBINPROGRESS) elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s", @@ -4723,11 +4760,9 @@ RollbackAndReleaseCurrentSubTransaction(void) TransactionState s = CurrentTransactionState; /* - * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted - * during parallel operations. That's because we may be in the leader, - * recovering from an error thrown while we were in parallel mode. We - * won't reach here in a worker, because BeginInternalSubTransaction() - * will have failed. + * We do not check for parallel mode here. It's permissible to start and + * end "internal" subtransactions while in parallel mode, so long as no + * new XIDs or command IDs are assigned. */ switch (s->blockState) @@ -4774,6 +4809,7 @@ RollbackAndReleaseCurrentSubTransaction(void) Assert(s->blockState == TBLOCK_SUBINPROGRESS || s->blockState == TBLOCK_INPROGRESS || s->blockState == TBLOCK_IMPLICIT_INPROGRESS || + s->blockState == TBLOCK_PARALLEL_INPROGRESS || s->blockState == TBLOCK_STARTED); } @@ -5037,10 +5073,15 @@ CommitSubTransaction(void) CallSubXactCallbacks(SUBXACT_EVENT_PRE_COMMIT_SUB, s->subTransactionId, s->parent->subTransactionId); - /* If in parallel mode, clean up workers and exit parallel mode. */ - if (IsInParallelMode()) + /* + * If this subxact has started any unfinished parallel operation, clean up + * its workers and exit parallel mode. Warn about leaked resources. + */ + AtEOSubXact_Parallel(true, s->subTransactionId); + if (s->parallelModeLevel != 0) { - AtEOSubXact_Parallel(true, s->subTransactionId); + elog(WARNING, "parallelModeLevel is %d not 0 at end of subtransaction", + s->parallelModeLevel); s->parallelModeLevel = 0; } @@ -5213,12 +5254,12 @@ AbortSubTransaction(void) * exports are not supported in subtransactions. */ - /* Exit from parallel mode, if necessary. */ - if (IsInParallelMode()) - { - AtEOSubXact_Parallel(false, s->subTransactionId); - s->parallelModeLevel = 0; - } + /* + * If this subxact has started any unfinished parallel operation, clean up + * its workers and exit parallel mode. Don't warn about leaked resources. + */ + AtEOSubXact_Parallel(false, s->subTransactionId); + s->parallelModeLevel = 0; /* * We can skip all this stuff if the subxact failed before creating a @@ -5377,6 +5418,7 @@ PushTransaction(void) s->prevXactReadOnly = XactReadOnly; s->startedInRecovery = p->startedInRecovery; s->parallelModeLevel = 0; + s->parallelChildXact = (p->parallelModeLevel != 0 || p->parallelChildXact); s->topXidLogged = false; CurrentTransactionState = s; |