aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xact.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xact.c')
-rw-r--r--src/backend/access/transam/xact.c486
1 files changed, 469 insertions, 17 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 511bcbbc519..a8f78d63762 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -22,6 +22,7 @@
#include "access/commit_ts.h"
#include "access/multixact.h"
+#include "access/parallel.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/twophase.h"
@@ -51,6 +52,7 @@
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
+#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/combocid.h"
#include "utils/guc.h"
@@ -78,6 +80,33 @@ bool XactDeferrable;
int synchronous_commit = SYNCHRONOUS_COMMIT_ON;
/*
+ * When running as a parallel worker, we place only a single
+ * TransactionStateData on the parallel worker's state stack, and the XID
+ * reflected there will be that of the *innermost* currently-active
+ * subtransaction in the backend that initiated paralllelism. However,
+ * GetTopTransactionId() and TransactionIdIsCurrentTransactionId()
+ * need to return the same answers in the parallel worker as they would have
+ * in the user backend, so we need some additional bookkeeping.
+ *
+ * XactTopTransactionId stores the XID of our toplevel transaction, which
+ * will be the same as TopTransactionState.transactionId in an ordinary
+ * backend; but in a parallel backend, which does not have the entire
+ * transaction state, it will instead be copied from the backend that started
+ * the parallel operation.
+ *
+ * nParallelCurrentXids will be 0 and ParallelCurrentXids NULL in an ordinary
+ * backend, but in a parallel backend, nParallelCurrentXids will contain the
+ * number of XIDs that need to be considered current, and ParallelCurrentXids
+ * will contain the XIDs themselves. This includes all XIDs that were current
+ * or sub-committed in the parent at the time the parallel operation began.
+ * The XIDs are stored sorted in numerical order (not logical order) to make
+ * lookups as fast as possible.
+ */
+TransactionId XactTopTransactionId = InvalidTransactionId;
+int nParallelCurrentXids = 0;
+TransactionId *ParallelCurrentXids;
+
+/*
* MyXactAccessedTempRel is set when a temporary relation is accessed.
* We don't allow PREPARE TRANSACTION in that case. (This is global
* so that it can be set from heapam.c.)
@@ -113,6 +142,7 @@ typedef enum TBlockState
/* transaction block states */
TBLOCK_BEGIN, /* starting transaction block */
TBLOCK_INPROGRESS, /* live transaction */
+ TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker */
TBLOCK_END, /* COMMIT received */
TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */
TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */
@@ -154,6 +184,7 @@ typedef struct TransactionStateData
bool prevXactReadOnly; /* entry-time xact r/o state */
bool startedInRecovery; /* did we start in recovery? */
bool didLogXid; /* has xid been included in WAL record? */
+ int parallelModeLevel; /* Enter/ExitParallelMode counter */
struct TransactionStateData *parent; /* back link to parent */
} TransactionStateData;
@@ -184,6 +215,7 @@ static TransactionStateData TopTransactionStateData = {
false, /* entry-time xact r/o state */
false, /* startedInRecovery */
false, /* didLogXid */
+ 0, /* parallelMode */
NULL /* link to parent state block */
};
@@ -353,9 +385,9 @@ IsAbortedTransactionBlockState(void)
TransactionId
GetTopTransactionId(void)
{
- if (!TransactionIdIsValid(TopTransactionStateData.transactionId))
+ if (!TransactionIdIsValid(XactTopTransactionId))
AssignTransactionId(&TopTransactionStateData);
- return TopTransactionStateData.transactionId;
+ return XactTopTransactionId;
}
/*
@@ -368,7 +400,7 @@ GetTopTransactionId(void)
TransactionId
GetTopTransactionIdIfAny(void)
{
- return TopTransactionStateData.transactionId;
+ return XactTopTransactionId;
}
/*
@@ -462,6 +494,13 @@ AssignTransactionId(TransactionState s)
Assert(s->state == TRANS_INPROGRESS);
/*
+ * Workers synchronize transaction state at the beginning of each
+ * parallel operation, so we can't account for new XIDs at this point.
+ */
+ if (IsInParallelMode())
+ elog(ERROR, "cannot assign XIDs during a parallel operation");
+
+ /*
* Ensure parent(s) have XIDs, so that a child always has an XID later
* than its parent. Musn't recurse here, or we might get a stack overflow
* if we're at the bottom of a huge stack of subtransactions none of which
@@ -513,6 +552,8 @@ AssignTransactionId(TransactionState s)
* the Xid as "running". See GetNewTransactionId.
*/
s->transactionId = GetNewTransactionId(isSubXact);
+ if (!isSubXact)
+ XactTopTransactionId = s->transactionId;
if (isSubXact)
SubTransSetParent(s->transactionId, s->parent->transactionId, false);
@@ -644,7 +685,16 @@ GetCurrentCommandId(bool used)
{
/* this is global to a transaction, not subtransaction-local */
if (used)
+ {
+ /*
+ * Forbid setting currentCommandIdUsed in parallel mode, because we
+ * have no provision for communicating this back to the master. We
+ * could relax this restriction when currentCommandIdUsed was already
+ * true at the start of the parallel operation.
+ */
+ Assert(CurrentTransactionState->parallelModeLevel == 0);
currentCommandIdUsed = true;
+ }
return currentCommandId;
}
@@ -738,6 +788,36 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
return false;
/*
+ * In parallel workers, the XIDs we must consider as current are stored
+ * in ParallelCurrentXids rather than the transaction-state stack. Note
+ * that the XIDs in this array are sorted numerically rather than
+ * according to transactionIdPrecedes order.
+ */
+ if (nParallelCurrentXids > 0)
+ {
+ int low,
+ high;
+
+ low = 0;
+ high = nParallelCurrentXids - 1;
+ while (low <= high)
+ {
+ int middle;
+ TransactionId probe;
+
+ middle = low + (high - low) / 2;
+ probe = ParallelCurrentXids[middle];
+ if (probe == xid)
+ return true;
+ else if (probe < xid)
+ low = middle + 1;
+ else
+ high = middle - 1;
+ }
+ return false;
+ }
+
+ /*
* We will return true for the Xid of the current subtransaction, any of
* its subcommitted children, any of its parents, or any of their
* previously subcommitted children. However, a transaction being aborted
@@ -791,6 +871,48 @@ TransactionStartedDuringRecovery(void)
}
/*
+ * EnterParallelMode
+ */
+void
+EnterParallelMode(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ Assert(s->parallelModeLevel >= 0);
+
+ ++s->parallelModeLevel;
+}
+
+/*
+ * ExitParallelMode
+ */
+void
+ExitParallelMode(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ Assert(s->parallelModeLevel > 0);
+ Assert(s->parallelModeLevel > 1 || !ParallelContextActive());
+
+ --s->parallelModeLevel;
+}
+
+/*
+ * IsInParallelMode
+ *
+ * Are we in a parallel operation, as either the master or a worker? Check
+ * this to prohibit operations that change backend-local state expected to
+ * match across all workers. Mere caches usually don't require such a
+ * restriction. State modified in a strict push/pop fashion, such as the
+ * active snapshot stack, is often fine.
+ */
+bool
+IsInParallelMode(void)
+{
+ return CurrentTransactionState->parallelModeLevel != 0;
+}
+
+/*
* CommandCounterIncrement
*/
void
@@ -804,6 +926,14 @@ CommandCounterIncrement(void)
*/
if (currentCommandIdUsed)
{
+ /*
+ * Workers synchronize transaction state at the beginning of each
+ * parallel operation, so we can't account for new commands after that
+ * point.
+ */
+ if (IsInParallelMode())
+ elog(ERROR, "cannot start commands during a parallel operation");
+
currentCommandId += 1;
if (currentCommandId == InvalidCommandId)
{
@@ -1650,6 +1780,8 @@ StartTransaction(void)
s = &TopTransactionStateData;
CurrentTransactionState = s;
+ Assert(XactTopTransactionId == InvalidTransactionId);
+
/*
* check the current transaction state
*/
@@ -1779,6 +1911,9 @@ CommitTransaction(void)
{
TransactionState s = CurrentTransactionState;
TransactionId latestXid;
+ bool is_parallel_worker;
+
+ is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS);
ShowTransactionState("CommitTransaction");
@@ -1812,7 +1947,8 @@ CommitTransaction(void)
break;
}
- CallXactCallbacks(XACT_EVENT_PRE_COMMIT);
+ CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
+ : XACT_EVENT_PRE_COMMIT);
/*
* The remaining actions cannot call any user-defined code, so it's safe
@@ -1821,6 +1957,13 @@ CommitTransaction(void)
* the transaction-abort path.
*/
+ /* If we might have parallel workers, clean them up now. */
+ if (IsInParallelMode())
+ {
+ AtEOXact_Parallel(true);
+ s->parallelModeLevel = 0;
+ }
+
/* Shut down the deferred-trigger manager */
AfterTriggerEndXact(true);
@@ -1859,10 +2002,28 @@ CommitTransaction(void)
*/
s->state = TRANS_COMMIT;
- /*
- * Here is where we really truly commit.
- */
- latestXid = RecordTransactionCommit();
+ if (!is_parallel_worker)
+ {
+ /*
+ * We need to mark our XIDs as commited in pg_clog. This is where we
+ * durably commit.
+ */
+ latestXid = RecordTransactionCommit();
+ }
+ else
+ {
+ /*
+ * We must not mark our XID committed; the parallel master is
+ * responsible for that.
+ */
+ latestXid = InvalidTransactionId;
+
+ /*
+ * Make sure the master will know about any WAL we wrote before it
+ * commits.
+ */
+ ParallelWorkerReportLastRecEnd(XactLastRecEnd);
+ }
TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
@@ -1889,7 +2050,8 @@ CommitTransaction(void)
* state.
*/
- CallXactCallbacks(XACT_EVENT_COMMIT);
+ CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT
+ : XACT_EVENT_COMMIT);
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_BEFORE_LOCKS,
@@ -1937,7 +2099,7 @@ CommitTransaction(void)
AtEOXact_GUC(true, 1);
AtEOXact_SPI(true);
AtEOXact_on_commit_actions(true);
- AtEOXact_Namespace(true);
+ AtEOXact_Namespace(true, is_parallel_worker);
AtEOXact_SMgr();
AtEOXact_Files();
AtEOXact_ComboCid();
@@ -1962,6 +2124,9 @@ CommitTransaction(void)
s->nChildXids = 0;
s->maxChildXids = 0;
+ XactTopTransactionId = InvalidTransactionId;
+ nParallelCurrentXids = 0;
+
/*
* done with commit processing, set current transaction state back to
* default
@@ -1985,6 +2150,8 @@ PrepareTransaction(void)
GlobalTransaction gxact;
TimestampTz prepared_at;
+ Assert(!IsInParallelMode());
+
ShowTransactionState("PrepareTransaction");
/*
@@ -2204,7 +2371,7 @@ PrepareTransaction(void)
AtEOXact_GUC(true, 1);
AtEOXact_SPI(true);
AtEOXact_on_commit_actions(true);
- AtEOXact_Namespace(true);
+ AtEOXact_Namespace(true, false);
AtEOXact_SMgr();
AtEOXact_Files();
AtEOXact_ComboCid();
@@ -2229,6 +2396,9 @@ PrepareTransaction(void)
s->nChildXids = 0;
s->maxChildXids = 0;
+ XactTopTransactionId = InvalidTransactionId;
+ nParallelCurrentXids = 0;
+
/*
* done with 1st phase commit processing, set current transaction state
* back to default
@@ -2247,6 +2417,7 @@ AbortTransaction(void)
{
TransactionState s = CurrentTransactionState;
TransactionId latestXid;
+ bool is_parallel_worker;
/* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS();
@@ -2295,6 +2466,7 @@ AbortTransaction(void)
/*
* check the current transaction state
*/
+ is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS);
if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE)
elog(WARNING, "AbortTransaction while in %s state",
TransStateAsString(s->state));
@@ -2318,6 +2490,13 @@ AbortTransaction(void)
*/
SetUserIdAndSecContext(s->prevUser, s->prevSecContext);
+ /* If in parallel mode, clean up workers and exit parallel mode. */
+ if (IsInParallelMode())
+ {
+ AtEOXact_Parallel(false);
+ s->parallelModeLevel = 0;
+ }
+
/*
* do abort processing
*/
@@ -2330,9 +2509,23 @@ AbortTransaction(void)
/*
* Advertise the fact that we aborted in pg_clog (assuming that we got as
- * far as assigning an XID to advertise).
+ * far as assigning an XID to advertise). But if we're inside a parallel
+ * worker, skip this; the user backend must be the one to write the abort
+ * record.
*/
- latestXid = RecordTransactionAbort(false);
+ if (!is_parallel_worker)
+ latestXid = RecordTransactionAbort(false);
+ else
+ {
+ latestXid = InvalidTransactionId;
+
+ /*
+ * Since the parallel master won't get our value of XactLastRecEnd in this
+ * case, we nudge WAL-writer ourselves in this case. See related comments in
+ * RecordTransactionAbort for why this matters.
+ */
+ XLogSetAsyncXactLSN(XactLastRecEnd);
+ }
TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid);
@@ -2350,7 +2543,10 @@ AbortTransaction(void)
*/
if (TopTransactionResourceOwner != NULL)
{
- CallXactCallbacks(XACT_EVENT_ABORT);
+ if (is_parallel_worker)
+ CallXactCallbacks(XACT_EVENT_PARALLEL_ABORT);
+ else
+ CallXactCallbacks(XACT_EVENT_ABORT);
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_BEFORE_LOCKS,
@@ -2371,7 +2567,7 @@ AbortTransaction(void)
AtEOXact_GUC(false, 1);
AtEOXact_SPI(false);
AtEOXact_on_commit_actions(false);
- AtEOXact_Namespace(false);
+ AtEOXact_Namespace(false, is_parallel_worker);
AtEOXact_SMgr();
AtEOXact_Files();
AtEOXact_ComboCid();
@@ -2423,6 +2619,10 @@ CleanupTransaction(void)
s->childXids = NULL;
s->nChildXids = 0;
s->maxChildXids = 0;
+ s->parallelModeLevel = 0;
+
+ XactTopTransactionId = InvalidTransactionId;
+ nParallelCurrentXids = 0;
/*
* done with abort processing, set current transaction state back to
@@ -2476,6 +2676,7 @@ StartTransactionCommand(void)
/* These cases are invalid. */
case TBLOCK_STARTED:
case TBLOCK_BEGIN:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBBEGIN:
case TBLOCK_END:
case TBLOCK_SUBRELEASE:
@@ -2511,11 +2712,13 @@ CommitTransactionCommand(void)
switch (s->blockState)
{
/*
- * This shouldn't happen, because it means the previous
+ * These shouldn't happen. TBLOCK_DEFAULT means the previous
* StartTransactionCommand didn't set the STARTED state
- * appropriately.
+ * appropriately, while TBLOCK_PARALLEL_INPROGRESS should be ended
+ * by EndParallelWorkerTranaction(), not this function.
*/
case TBLOCK_DEFAULT:
+ case TBLOCK_PARALLEL_INPROGRESS:
elog(FATAL, "CommitTransactionCommand: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -2797,6 +3000,7 @@ AbortCurrentTransaction(void)
* ABORT state. We will stay in ABORT until we get a ROLLBACK.
*/
case TBLOCK_INPROGRESS:
+ case TBLOCK_PARALLEL_INPROGRESS:
AbortTransaction();
s->blockState = TBLOCK_ABORT;
/* CleanupTransaction happens when we exit TBLOCK_ABORT_END */
@@ -3186,6 +3390,7 @@ BeginTransactionBlock(void)
* Already a transaction block in progress.
*/
case TBLOCK_INPROGRESS:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBINPROGRESS:
case TBLOCK_ABORT:
case TBLOCK_SUBABORT:
@@ -3363,6 +3568,16 @@ EndTransactionBlock(void)
result = true;
break;
+ /*
+ * The user issued a COMMIT that somehow ran inside a parallel
+ * worker. We can't cope with that.
+ */
+ case TBLOCK_PARALLEL_INPROGRESS:
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot commit during a parallel operation")));
+ break;
+
/* These cases are invalid. */
case TBLOCK_DEFAULT:
case TBLOCK_BEGIN:
@@ -3456,6 +3671,16 @@ UserAbortTransactionBlock(void)
s->blockState = TBLOCK_ABORT_PENDING;
break;
+ /*
+ * The user issued an ABORT that somehow ran inside a parallel
+ * worker. We can't cope with that.
+ */
+ case TBLOCK_PARALLEL_INPROGRESS:
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot abort during a parallel operation")));
+ break;
+
/* These cases are invalid. */
case TBLOCK_DEFAULT:
case TBLOCK_BEGIN:
@@ -3485,6 +3710,18 @@ DefineSavepoint(char *name)
{
TransactionState s = CurrentTransactionState;
+ /*
+ * Workers synchronize transaction state at the beginning of each parallel
+ * operation, so we can't account for new subtransactions after that
+ * point. (Note that this check will certainly error out if s->blockState
+ * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case
+ * below.)
+ */
+ if (IsInParallelMode())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot define savepoints during a parallel operation")));
+
switch (s->blockState)
{
case TBLOCK_INPROGRESS:
@@ -3505,6 +3742,7 @@ DefineSavepoint(char *name)
case TBLOCK_DEFAULT:
case TBLOCK_STARTED:
case TBLOCK_BEGIN:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBBEGIN:
case TBLOCK_END:
case TBLOCK_SUBRELEASE:
@@ -3539,6 +3777,18 @@ ReleaseSavepoint(List *options)
ListCell *cell;
char *name = NULL;
+ /*
+ * Workers synchronize transaction state at the beginning of each parallel
+ * operation, so we can't account for transaction state change after that
+ * point. (Note that this check will certainly error out if s->blockState
+ * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case
+ * below.)
+ */
+ if (IsInParallelMode())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot release savepoints during a parallel operation")));
+
switch (s->blockState)
{
/*
@@ -3562,6 +3812,7 @@ ReleaseSavepoint(List *options)
case TBLOCK_DEFAULT:
case TBLOCK_STARTED:
case TBLOCK_BEGIN:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBBEGIN:
case TBLOCK_END:
case TBLOCK_SUBRELEASE:
@@ -3639,6 +3890,18 @@ RollbackToSavepoint(List *options)
ListCell *cell;
char *name = NULL;
+ /*
+ * Workers synchronize transaction state at the beginning of each parallel
+ * operation, so we can't account for transaction state change after that
+ * point. (Note that this check will certainly error out if s->blockState
+ * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case
+ * below.)
+ */
+ if (IsInParallelMode())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot rollback to savepoints during a parallel operation")));
+
switch (s->blockState)
{
/*
@@ -3663,6 +3926,7 @@ RollbackToSavepoint(List *options)
case TBLOCK_DEFAULT:
case TBLOCK_STARTED:
case TBLOCK_BEGIN:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBBEGIN:
case TBLOCK_END:
case TBLOCK_SUBRELEASE:
@@ -3751,6 +4015,20 @@ BeginInternalSubTransaction(char *name)
{
TransactionState s = CurrentTransactionState;
+ /*
+ * Workers synchronize transaction state at the beginning of each parallel
+ * operation, so we can't account for new subtransactions after that point.
+ * We might be able to make an exception for the type of subtransaction
+ * established by this function, which is typically used in contexts where
+ * we're going to release or roll back the subtransaction before proceeding
+ * further, so that no enduring change to the transaction state occurs.
+ * For now, however, we prohibit this case along with all the others.
+ */
+ if (IsInParallelMode())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot start subtransactions during a parallel operation")));
+
switch (s->blockState)
{
case TBLOCK_STARTED:
@@ -3773,6 +4051,7 @@ BeginInternalSubTransaction(char *name)
/* These cases are invalid. */
case TBLOCK_DEFAULT:
case TBLOCK_BEGIN:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBBEGIN:
case TBLOCK_SUBRELEASE:
case TBLOCK_SUBCOMMIT:
@@ -3805,6 +4084,18 @@ ReleaseCurrentSubTransaction(void)
{
TransactionState s = CurrentTransactionState;
+ /*
+ * Workers synchronize transaction state at the beginning of each parallel
+ * operation, so we can't account for commit of subtransactions after that
+ * point. This should not happen anyway. Code calling this would
+ * typically have called BeginInternalSubTransaction() first, failing
+ * there.
+ */
+ if (IsInParallelMode())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot commit subtransactions during a parallel operation")));
+
if (s->blockState != TBLOCK_SUBINPROGRESS)
elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s",
BlockStateAsString(s->blockState));
@@ -3827,6 +4118,14 @@ RollbackAndReleaseCurrentSubTransaction(void)
{
TransactionState s = CurrentTransactionState;
+ /*
+ * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted
+ * during parallel operations. That's because we may be in the master,
+ * recovering from an error thrown while we were in parallel mode. We
+ * won't reach here in a worker, because BeginInternalSubTransaction()
+ * will have failed.
+ */
+
switch (s->blockState)
{
/* Must be in a subtransaction */
@@ -3838,6 +4137,7 @@ RollbackAndReleaseCurrentSubTransaction(void)
case TBLOCK_DEFAULT:
case TBLOCK_STARTED:
case TBLOCK_BEGIN:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBBEGIN:
case TBLOCK_INPROGRESS:
case TBLOCK_END:
@@ -3913,6 +4213,7 @@ AbortOutOfAnyTransaction(void)
case TBLOCK_STARTED:
case TBLOCK_BEGIN:
case TBLOCK_INPROGRESS:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_END:
case TBLOCK_ABORT_PENDING:
case TBLOCK_PREPARE:
@@ -4004,6 +4305,7 @@ TransactionBlockStatusCode(void)
case TBLOCK_BEGIN:
case TBLOCK_SUBBEGIN:
case TBLOCK_INPROGRESS:
+ case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBINPROGRESS:
case TBLOCK_END:
case TBLOCK_SUBRELEASE:
@@ -4107,6 +4409,13 @@ CommitSubTransaction(void)
CallSubXactCallbacks(SUBXACT_EVENT_PRE_COMMIT_SUB, s->subTransactionId,
s->parent->subTransactionId);
+ /* If in parallel mode, clean up workers and exit parallel mode. */
+ if (IsInParallelMode())
+ {
+ AtEOSubXact_Parallel(true, s->subTransactionId);
+ s->parallelModeLevel = 0;
+ }
+
/* Do the actual "commit", such as it is */
s->state = TRANS_COMMIT;
@@ -4260,6 +4569,13 @@ AbortSubTransaction(void)
*/
SetUserIdAndSecContext(s->prevUser, s->prevSecContext);
+ /* Exit from parallel mode, if necessary. */
+ if (IsInParallelMode())
+ {
+ AtEOSubXact_Parallel(false, s->subTransactionId);
+ s->parallelModeLevel = 0;
+ }
+
/*
* We can skip all this stuff if the subxact failed before creating a
* ResourceOwner...
@@ -4400,6 +4716,7 @@ PushTransaction(void)
s->blockState = TBLOCK_SUBBEGIN;
GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
s->prevXactReadOnly = XactReadOnly;
+ s->parallelModeLevel = 0;
CurrentTransactionState = s;
@@ -4447,6 +4764,139 @@ PopTransaction(void)
}
/*
+ * EstimateTransactionStateSpace
+ * Estimate the amount of space that will be needed by
+ * SerializeTransactionState. It would be OK to overestimate slightly,
+ * but it's simple for us to work out the precise value, so we do.
+ */
+Size
+EstimateTransactionStateSpace(void)
+{
+ TransactionState s;
+ Size nxids = 5; /* iso level, deferrable, top & current XID, XID count */
+
+ for (s = CurrentTransactionState; s != NULL; s = s->parent)
+ {
+ if (TransactionIdIsValid(s->transactionId))
+ nxids = add_size(nxids, 1);
+ nxids = add_size(nxids, s->nChildXids);
+ }
+
+ nxids = add_size(nxids, nParallelCurrentXids);
+ return mul_size(nxids, sizeof(TransactionId));
+}
+
+/*
+ * SerializeTransactionState
+ * Write out relevant details of our transaction state that will be
+ * needed by a parallel worker.
+ *
+ * We need to save and restore XactDeferrable, XactIsoLevel, and the XIDs
+ * associated with this transaction. The first eight bytes of the result
+ * contain XactDeferrable and XactIsoLevel; the next eight bytes contain the
+ * XID of the top-level transaction and the XID of the current transaction
+ * (or, in each case, InvalidTransactionId if none). After that, the next 4
+ * bytes contain a count of how many additional XIDs follow; this is followed
+ * by all of those XIDs one after another. We emit the XIDs in sorted order
+ * for the convenience of the receiving process.
+ */
+void
+SerializeTransactionState(Size maxsize, char *start_address)
+{
+ TransactionState s;
+ Size nxids = 0;
+ Size i = 0;
+ TransactionId *workspace;
+ TransactionId *result = (TransactionId *) start_address;
+
+ Assert(maxsize >= 5 * sizeof(TransactionId));
+ result[0] = (TransactionId) XactIsoLevel;
+ result[1] = (TransactionId) XactDeferrable;
+ result[2] = XactTopTransactionId;
+ result[3] = CurrentTransactionState->transactionId;
+
+ /*
+ * If we're running in a parallel worker and launching a parallel worker
+ * of our own, we can just pass along the information that was passed to
+ * us.
+ */
+ if (nParallelCurrentXids > 0)
+ {
+ Assert(maxsize > (nParallelCurrentXids + 4) * sizeof(TransactionId));
+ result[4] = nParallelCurrentXids;
+ memcpy(&result[5], ParallelCurrentXids,
+ nParallelCurrentXids * sizeof(TransactionId));
+ return;
+ }
+
+ /*
+ * OK, we need to generate a sorted list of XIDs that our workers
+ * should view as current. First, figure out how many there are.
+ */
+ for (s = CurrentTransactionState; s != NULL; s = s->parent)
+ {
+ if (TransactionIdIsValid(s->transactionId))
+ nxids = add_size(nxids, 1);
+ nxids = add_size(nxids, s->nChildXids);
+ }
+ Assert(nxids * sizeof(TransactionId) < maxsize);
+
+ /* Copy them to our scratch space. */
+ workspace = palloc(nxids * sizeof(TransactionId));
+ for (s = CurrentTransactionState; s != NULL; s = s->parent)
+ {
+ if (TransactionIdIsValid(s->transactionId))
+ workspace[i++] = s->transactionId;
+ memcpy(&workspace[i], s->childXids,
+ s->nChildXids * sizeof(TransactionId));
+ i += s->nChildXids;
+ }
+ Assert(i == nxids);
+
+ /* Sort them. */
+ qsort(workspace, nxids, sizeof(TransactionId), xidComparator);
+
+ /* Copy data into output area. */
+ result[4] = (TransactionId) nxids;
+ memcpy(&result[5], workspace, nxids * sizeof(TransactionId));
+}
+
+/*
+ * StartParallelWorkerTransaction
+ * Start a parallel worker transaction, restoring the relevant
+ * transaction state serialized by SerializeTransactionState.
+ */
+void
+StartParallelWorkerTransaction(char *tstatespace)
+{
+ TransactionId *tstate = (TransactionId *) tstatespace;
+
+ Assert(CurrentTransactionState->blockState == TBLOCK_DEFAULT);
+ StartTransaction();
+
+ XactIsoLevel = (int) tstate[0];
+ XactDeferrable = (bool) tstate[1];
+ XactTopTransactionId = tstate[2];
+ CurrentTransactionState->transactionId = tstate[3];
+ nParallelCurrentXids = (int) tstate[4];
+ ParallelCurrentXids = &tstate[5];
+
+ CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS;
+}
+
+/*
+ * EndParallelWorkerTransaction
+ * End a parallel worker transaction.
+ */
+void
+EndParallelWorkerTransaction(void)
+{
+ Assert(CurrentTransactionState->blockState == TBLOCK_PARALLEL_INPROGRESS);
+ CommitTransaction();
+ CurrentTransactionState->blockState = TBLOCK_DEFAULT;
+}
+
+/*
* ShowTransactionState
* Debug support
*/
@@ -4516,6 +4966,8 @@ BlockStateAsString(TBlockState blockState)
return "BEGIN";
case TBLOCK_INPROGRESS:
return "INPROGRESS";
+ case TBLOCK_PARALLEL_INPROGRESS:
+ return "PARALLEL_INPROGRESS";
case TBLOCK_END:
return "END";
case TBLOCK_ABORT: