aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/backend/access/nbtree/nbtsort.c | 2
-rw-r--r-- src/backend/access/transam/parallel.c | 18
-rw-r--r-- src/backend/access/transam/xact.c | 7
-rw-r--r-- src/backend/executor/execParallel.c | 2
-rw-r--r-- src/backend/optimizer/plan/planner.c | 11
-rw-r--r-- src/backend/storage/lmgr/lwlock.c | 1
-rw-r--r-- src/backend/storage/lmgr/predicate.c | 237
-rw-r--r-- src/backend/utils/resowner/resowner.c | 2
-rw-r--r-- src/include/access/parallel.h | 3
-rw-r--r-- src/include/storage/lwlock.h | 1
-rw-r--r-- src/include/storage/predicate.h | 11
-rw-r--r-- src/include/storage/predicate_internals.h | 10
-rw-r--r-- src/test/isolation/expected/serializable-parallel-2.out | 44
-rw-r--r-- src/test/isolation/expected/serializable-parallel.out | 44
-rw-r--r-- src/test/isolation/isolation_schedule | 2
-rw-r--r-- src/test/isolation/specs/serializable-parallel-2.spec | 30
-rw-r--r-- src/test/isolation/specs/serializable-parallel.spec | 47
17 files changed, 423 insertions, 49 deletions
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 28c1aeefabb..363dceb5b1c 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1265,7 +1265,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
EnterParallelMode();
Assert(request > 0);
pcxt = CreateParallelContext("postgres", "_bt_parallel_build_main",
- request, true);
+ request);
scantuplesortstates = leaderparticipates ? request + 1 : request;
/*
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index ce2b61631db..55d129a64f7 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -31,6 +31,7 @@
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "storage/ipc.h"
+#include "storage/predicate.h"
#include "storage/sinval.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
@@ -91,6 +92,7 @@ typedef struct FixedParallelState
BackendId parallel_master_backend_id;
TimestampTz xact_ts;
TimestampTz stmt_ts;
+ SerializableXactHandle serializable_xact_handle;
/* Mutex protects remaining fields. */
slock_t mutex;
@@ -155,7 +157,7 @@ static void ParallelWorkerShutdown(int code, Datum arg);
*/
ParallelContext *
CreateParallelContext(const char *library_name, const char *function_name,
- int nworkers, bool serializable_okay)
+ int nworkers)
{
MemoryContext oldcontext;
ParallelContext *pcxt;
@@ -166,16 +168,6 @@ CreateParallelContext(const char *library_name, const char *function_name,
/* Number of workers should be non-negative. */
Assert(nworkers >= 0);
- /*
- * If we are running under serializable isolation, we can't use parallel
- * workers, at least not until somebody enhances that mechanism to be
- * parallel-aware. Utility statement callers may ask us to ignore this
- * restriction because they're always able to safely ignore the fact that
- * SIREAD locks do not work with parallelism.
- */
- if (IsolationIsSerializable() && !serializable_okay)
- nworkers = 0;
-
/* We might be running in a short-lived memory context. */
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
@@ -327,6 +319,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
fps->parallel_master_backend_id = MyBackendId;
fps->xact_ts = GetCurrentTransactionStartTimestamp();
fps->stmt_ts = GetCurrentStatementStartTimestamp();
+ fps->serializable_xact_handle = ShareSerializableXact();
SpinLockInit(&fps->mutex);
fps->last_xlog_end = 0;
shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
@@ -1422,6 +1415,9 @@ ParallelWorkerMain(Datum main_arg)
false);
RestoreEnumBlacklist(enumblacklistspace);
+ /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
+ AttachSerializableXact(fps->serializable_xact_handle);
+
/*
* We've initialized all of our state now; nothing should change
* hereafter.
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e93262975d3..6e5891749b4 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2024,9 +2024,12 @@ CommitTransaction(void)
/*
* Mark serializable transaction as complete for predicate locking
* purposes. This should be done as late as we can put it and still allow
- * errors to be raised for failure patterns found at commit.
+ * errors to be raised for failure patterns found at commit. This is not
+ * appropriate in a parallel worker however, because we aren't committing
+ * the leader's transaction and its serializable state will live on.
*/
- PreCommit_CheckForSerializationFailure();
+ if (!is_parallel_worker)
+ PreCommit_CheckForSerializationFailure();
/*
* Insert notifications sent by NOTIFY commands into the queue. This
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index b79be91655b..3d4b01cb4d6 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -604,7 +604,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
pstmt_data = ExecSerializePlan(planstate->plan, estate);
/* Create a parallel context. */
- pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers, false);
+ pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
pei->pcxt = pcxt;
/*
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 9bb068a52e9..e408e77d6fb 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -337,22 +337,13 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
* parallel worker. We might eventually be able to relax this
* restriction, but for now it seems best not to have parallel workers
* trying to create their own parallel workers.
- *
- * We can't use parallelism in serializable mode because the predicate
- * locking code is not parallel-aware. It's not catastrophic if someone
- * tries to run a parallel plan in serializable mode; it just won't get
- * any workers and will run serially. But it seems like a good heuristic
- * to assume that the same serialization level will be in effect at plan
- * time and execution time, so don't generate a parallel plan if we're in
- * serializable mode.
*/
if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
IsUnderPostmaster &&
parse->commandType == CMD_SELECT &&
!parse->hasModifyingCTE &&
max_parallel_workers_per_gather > 0 &&
- !IsParallelWorker() &&
- !IsolationIsSerializable())
+ !IsParallelWorker())
{
/* all the cheap tests pass, so scan the query tree */
glob->maxParallelHazard = max_parallel_hazard(parse);
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 81dac45ae57..bc1aa88322b 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -521,6 +521,7 @@ RegisterLWLockTranches(void)
LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN, "parallel_hash_join");
+ LWLockRegisterTranche(LWTRANCHE_SXACT, "serializable_xact");
/* Register named tranches. */
for (i = 0; i < NamedLWLockTrancheRequests; i++)
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 6fc11f26f0e..92beaab5663 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -97,7 +97,9 @@
* - All transactions share this single lock (with no partitioning).
* - There is never a need for a process other than the one running
* an active transaction to walk the list of locks held by that
- * transaction.
+ * transaction, except parallel query workers sharing the leader's
+ * transaction. In the parallel case, an extra per-sxact lock is
+ * taken; see below.
* - It is relatively infrequent that another process needs to
* modify the list for a transaction, but it does happen for such
* things as index page splits for pages with predicate locks and
@@ -116,6 +118,12 @@
* than its own active transaction must acquire an exclusive
* lock.
*
+ * SERIALIZABLEXACT's member 'predicateLockListLock'
+ * - Protects the linked list of locks held by a transaction. Only
+ * needed for parallel mode, where multiple backends share the
+ * same SERIALIZABLEXACT object. Not needed if
+ * SerializablePredicateLockListLock is held exclusively.
+ *
* PredicateLockHashPartitionLock(hashcode)
* - The same lock protects a target, all locks on that target, and
* the linked list of locks on the target.
@@ -162,7 +170,7 @@
* PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
* BlockNumber newblkno)
* TransferPredicateLocksToHeapRelation(Relation relation)
- * ReleasePredicateLocks(bool isCommit)
+ * ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
*
* conflict detection (may also trigger rollback)
* CheckForSerializableConflictOut(bool visible, Relation relation,
@@ -187,6 +195,7 @@
#include "access/heapam.h"
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "access/slru.h"
#include "access/subtrans.h"
#include "access/transam.h"
@@ -279,6 +288,7 @@
#define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
#define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
#define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
+#define SxactIsPartiallyReleased(sxact) (((sxact)->flags & SXACT_FLAG_PARTIALLY_RELEASED) != 0)
/*
* Compute the hash code associated with a PREDICATELOCKTARGETTAG.
@@ -409,6 +419,15 @@ static HTAB *LocalPredicateLockHash = NULL;
static SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
static bool MyXactDidWrite = false;
+/*
+ * The SXACT_FLAG_RO_SAFE optimization might lead us to release
+ * MySerializableXact early. If that happens in a parallel query, the leader
+ * needs to defer the destruction of the SERIALIZABLEXACT until end of
+ * transaction, because the workers still have a reference to it. In that
+ * case, the leader stores it here.
+ */
+static SERIALIZABLEXACT *SavedSerializableXact = InvalidSerializableXact;
+
/* local functions */
static SERIALIZABLEXACT *CreatePredXact(void);
@@ -465,6 +484,8 @@ static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
SERIALIZABLEXACT *writer);
+static void CreateLocalPredicateLockHash(void);
+static void ReleasePredicateLocksLocal(void);
/*------------------------------------------------------------------------*/
@@ -521,7 +542,7 @@ SerializationNeededForRead(Relation relation, Snapshot snapshot)
*/
if (SxactIsROSafe(MySerializableXact))
{
- ReleasePredicateLocks(false);
+ ReleasePredicateLocks(false, true);
return false;
}
@@ -1168,6 +1189,8 @@ InitPredicateLocks(void)
memset(PredXact->element, 0, requestSize);
for (i = 0; i < max_table_size; i++)
{
+ LWLockInitialize(&PredXact->element[i].sxact.predicateLockListLock,
+ LWTRANCHE_SXACT);
SHMQueueInsertBefore(&(PredXact->availableList),
&(PredXact->element[i].link));
}
@@ -1513,14 +1536,14 @@ GetSafeSnapshot(Snapshot origSnapshot)
ereport(DEBUG2,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("deferrable snapshot was unsafe; trying a new one")));
- ReleasePredicateLocks(false);
+ ReleasePredicateLocks(false, false);
}
/*
* Now we have a safe snapshot, so we don't need to do any further checks.
*/
Assert(SxactIsROSafe(MySerializableXact));
- ReleasePredicateLocks(false);
+ ReleasePredicateLocks(false, true);
return snapshot;
}
@@ -1634,6 +1657,17 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
Assert(IsolationIsSerializable());
/*
+ * If this is called by parallel.c in a parallel worker, we don't want to
+ * create a SERIALIZABLEXACT just yet because the leader's
+ * SERIALIZABLEXACT will be installed with AttachSerializableXact(). We
+ * also don't want to reject SERIALIZABLE READ ONLY DEFERRABLE in this
+ * case, because the leader has already determined that the snapshot it
+ * has passed us is safe. So there is nothing for us to do.
+ */
+ if (IsParallelWorker())
+ return;
+
+ /*
* We do not allow SERIALIZABLE READ ONLY DEFERRABLE transactions to
* import snapshots, since there's no way to wait for a safe snapshot when
* we're using the snap we're told to. (XXX instead of throwing an error,
@@ -1666,7 +1700,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
VirtualTransactionId vxid;
SERIALIZABLEXACT *sxact,
*othersxact;
- HASHCTL hash_ctl;
/* We only do this for serializable transactions. Once. */
Assert(MySerializableXact == InvalidSerializableXact);
@@ -1813,6 +1846,16 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
LWLockRelease(SerializableXactHashLock);
+ CreateLocalPredicateLockHash();
+
+ return snapshot;
+}
+
+static void
+CreateLocalPredicateLockHash(void)
+{
+ HASHCTL hash_ctl;
+
/* Initialize the backend-local hash table of parent locks */
Assert(LocalPredicateLockHash == NULL);
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
@@ -1822,8 +1865,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
max_predicate_locks_per_xact,
&hash_ctl,
HASH_ELEM | HASH_BLOBS);
-
- return snapshot;
}
/*
@@ -2078,7 +2119,9 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
* This implementation is assuming that the usage of each target tag field
* is uniform. No need to make this hard if we don't have to.
*
- * We aren't acquiring lightweight locks for the predicate lock or lock
+ * We acquire an LWLock in the case of parallel mode, because worker
+ * backends have access to the leader's SERIALIZABLEXACT. Otherwise,
+ * we aren't acquiring LWLocks for the predicate lock or lock
* target structures associated with this transaction unless we're going
* to modify them, because no other process is permitted to modify our
* locks.
@@ -2091,6 +2134,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
sxact = MySerializableXact;
+ if (IsInParallelMode())
+ LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
predlock = (PREDICATELOCK *)
SHMQueueNext(&(sxact->predicateLocks),
&(sxact->predicateLocks),
@@ -2144,6 +2189,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
predlock = nextpredlock;
}
+ if (IsInParallelMode())
+ LWLockRelease(&sxact->predicateLockListLock);
LWLockRelease(SerializablePredicateLockListLock);
}
@@ -2342,6 +2389,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
partitionLock = PredicateLockHashPartitionLock(targettaghash);
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ if (IsInParallelMode())
+ LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/* Make sure that the target is represented. */
@@ -2379,6 +2428,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
}
LWLockRelease(partitionLock);
+ if (IsInParallelMode())
+ LWLockRelease(&sxact->predicateLockListLock);
LWLockRelease(SerializablePredicateLockListLock);
}
@@ -2566,7 +2617,8 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
PREDICATELOCK *nextpredlock;
bool found;
- Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+ Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock,
+ LW_EXCLUSIVE));
Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
predlock = (PREDICATELOCK *)
@@ -2626,7 +2678,7 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
* covers it, or if we are absolutely certain that no one will need to
* refer to that lock in the future.
*
- * Caller must hold SerializablePredicateLockListLock.
+ * Caller must hold SerializablePredicateLockListLock exclusively.
*/
static bool
TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
@@ -2641,7 +2693,8 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
bool found;
bool outOfShmem = false;
- Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+ Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock,
+ LW_EXCLUSIVE));
oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
@@ -3217,9 +3270,17 @@ SetNewSxactGlobalXmin(void)
* If this transaction is committing and is holding any predicate locks,
* it must be added to a list of completed serializable transactions still
* holding locks.
+ *
+ * If isReadOnlySafe is true, then predicate locks are being released before
+ * the end of the transaction because MySerializableXact has been determined
+ * to be RO_SAFE. In non-parallel mode we can release it completely, but
+ * in parallel mode we partially release the SERIALIZABLEXACT and keep it
+ * around until the end of the transaction, allowing each backend to clear its
+ * MySerializableXact variable and benefit from the optimization in its own
+ * time.
*/
void
-ReleasePredicateLocks(bool isCommit)
+ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
{
bool needToClear;
RWConflict conflict,
@@ -3238,6 +3299,44 @@ ReleasePredicateLocks(bool isCommit)
*/
bool topLevelIsDeclaredReadOnly;
+ /* We can't be both committing and releasing early due to RO_SAFE. */
+ Assert(!(isCommit && isReadOnlySafe));
+
+ /* Are we at the end of a transaction, that is, a commit or abort? */
+ if (!isReadOnlySafe)
+ {
+ /*
+ * Parallel workers mustn't release predicate locks at the end of
+ * their transaction. The leader will do that at the end of its
+ * transaction.
+ */
+ if (IsParallelWorker())
+ {
+ ReleasePredicateLocksLocal();
+ return;
+ }
+
+ /*
+ * By the time the leader in a parallel query reaches end of
+ * transaction, it has waited for all workers to exit.
+ */
+ Assert(!ParallelContextActive());
+
+ /*
+ * If the leader in a parallel query earlier stashed a partially
+ * released SERIALIZABLEXACT for final clean-up at end of transaction
+ * (because workers might still have been accessing it), then it's
+ * time to restore it.
+ */
+ if (SavedSerializableXact != InvalidSerializableXact)
+ {
+ Assert(MySerializableXact == InvalidSerializableXact);
+ MySerializableXact = SavedSerializableXact;
+ SavedSerializableXact = InvalidSerializableXact;
+ Assert(SxactIsPartiallyReleased(MySerializableXact));
+ }
+ }
+
if (MySerializableXact == InvalidSerializableXact)
{
Assert(LocalPredicateLockHash == NULL);
@@ -3246,10 +3345,51 @@ ReleasePredicateLocks(bool isCommit)
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ /*
+ * If the transaction is committing, but it has been partially released
+ * already, then treat this as a rollback. It was marked as rolled back.
+ */
+ if (isCommit && SxactIsPartiallyReleased(MySerializableXact))
+ isCommit = false;
+
+ /*
+ * If we're called in the middle of a transaction because we discovered
+ * that the SXACT_FLAG_RO_SAFE flag was set, then we'll partially release
+ * it (that is, release the predicate locks and conflicts, but not the
+ * SERIALIZABLEXACT itself) if we're the first backend to have noticed.
+ */
+ if (isReadOnlySafe && IsInParallelMode())
+ {
+ /*
+ * The leader needs to stash a pointer to it, so that it can
+ * completely release it at end-of-transaction.
+ */
+ if (!IsParallelWorker())
+ SavedSerializableXact = MySerializableXact;
+
+ /*
+ * The first backend to reach this condition will partially release
+ * the SERIALIZABLEXACT. All others will just clear their
+ * backend-local state so that they stop doing SSI checks for the rest
+ * of the transaction.
+ */
+ if (SxactIsPartiallyReleased(MySerializableXact))
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ReleasePredicateLocksLocal();
+ return;
+ }
+ else
+ {
+ MySerializableXact->flags |= SXACT_FLAG_PARTIALLY_RELEASED;
+ /* ... and proceed to perform the partial release below. */
+ }
+ }
Assert(!isCommit || SxactIsPrepared(MySerializableXact));
Assert(!isCommit || !SxactIsDoomed(MySerializableXact));
Assert(!SxactIsCommitted(MySerializableXact));
- Assert(!SxactIsRolledBack(MySerializableXact));
+ Assert(SxactIsPartiallyReleased(MySerializableXact)
+ || !SxactIsRolledBack(MySerializableXact));
/* may not be serializable during COMMIT/ROLLBACK PREPARED */
Assert(MySerializableXact->pid == 0 || IsolationIsSerializable());
@@ -3273,8 +3413,8 @@ ReleasePredicateLocks(bool isCommit)
MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
/*
- * If it's not a commit it's a rollback, and we can clear our locks
- * immediately.
+ * If it's not a commit it's either a rollback or a read-only transaction
+ * flagged SXACT_FLAG_RO_SAFE, and we can clear our locks immediately.
*/
if (isCommit)
{
@@ -3298,7 +3438,8 @@ ReleasePredicateLocks(bool isCommit)
* cleanup. This means it should not be considered when calculating
* SxactGlobalXmin.
*/
- MySerializableXact->flags |= SXACT_FLAG_DOOMED;
+ if (!isReadOnlySafe)
+ MySerializableXact->flags |= SXACT_FLAG_DOOMED;
MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
/*
@@ -3494,7 +3635,8 @@ ReleasePredicateLocks(bool isCommit)
* was launched.
*/
needToClear = false;
- if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
+ if (!isReadOnlySafe &&
+ TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
{
Assert(PredXact->SxactGlobalXminCount > 0);
if (--(PredXact->SxactGlobalXminCount) == 0)
@@ -3513,14 +3655,28 @@ ReleasePredicateLocks(bool isCommit)
SHMQueueInsertBefore(FinishedSerializableTransactions,
&MySerializableXact->finishedLink);
+ /*
+ * If we're releasing a RO_SAFE transaction in parallel mode, we'll only
+ * partially release it. That's necessary because other backends may have
+ * a reference to it. The leader will release the SERIALIZABLEXACT itself
+ * at the end of the transaction after workers have stopped running.
+ */
if (!isCommit)
- ReleaseOneSerializableXact(MySerializableXact, false, false);
+ ReleaseOneSerializableXact(MySerializableXact,
+ isReadOnlySafe && IsInParallelMode(),
+ false);
LWLockRelease(SerializableFinishedListLock);
if (needToClear)
ClearOldPredicateLocks();
+ ReleasePredicateLocksLocal();
+}
+
+static void
+ReleasePredicateLocksLocal(void)
+{
MySerializableXact = InvalidSerializableXact;
MyXactDidWrite = false;
@@ -3712,6 +3868,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
* them to OldCommittedSxact if summarize is true)
*/
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ if (IsInParallelMode())
+ LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
predlock = (PREDICATELOCK *)
SHMQueueNext(&(sxact->predicateLocks),
&(sxact->predicateLocks),
@@ -3791,6 +3949,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
*/
SHMQueueInit(&sxact->predicateLocks);
+ if (IsInParallelMode())
+ LWLockRelease(&sxact->predicateLockListLock);
LWLockRelease(SerializablePredicateLockListLock);
sxidtag.xid = sxact->topXid;
@@ -4213,6 +4373,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
PREDICATELOCK *rmpredlock;
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ if (IsInParallelMode())
+ LWLockAcquire(&MySerializableXact->predicateLockListLock, LW_EXCLUSIVE);
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
@@ -4247,6 +4409,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
LWLockRelease(SerializableXactHashLock);
LWLockRelease(partitionLock);
+ if (IsInParallelMode())
+ LWLockRelease(&MySerializableXact->predicateLockListLock);
LWLockRelease(SerializablePredicateLockListLock);
if (rmpredlock != NULL)
@@ -4677,6 +4841,7 @@ PreCommit_CheckForSerializationFailure(void)
/* Check if someone else has already decided that we need to die */
if (SxactIsDoomed(MySerializableXact))
{
+ Assert(!SxactIsPartiallyReleased(MySerializableXact));
LWLockRelease(SerializableXactHashLock);
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
@@ -4795,6 +4960,13 @@ AtPrepare_PredicateLocks(void)
*/
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ /*
+ * No need to take sxact->predicateLockListLock in parallel mode because
+ * there cannot be any parallel workers running while we are preparing a
+ * transaction.
+ */
+ Assert(!IsParallelWorker() && !ParallelContextActive());
+
predlock = (PREDICATELOCK *)
SHMQueueNext(&(sxact->predicateLocks),
&(sxact->predicateLocks),
@@ -4867,7 +5039,7 @@ PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
MySerializableXact = sxid->myXact;
MyXactDidWrite = true; /* conservatively assume that we wrote
* something */
- ReleasePredicateLocks(isCommit);
+ ReleasePredicateLocks(isCommit, false);
}
/*
@@ -5003,3 +5175,28 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
}
}
+
+/*
+ * Prepare to share the current SERIALIZABLEXACT with parallel workers.
+ * Return a handle object that can be used by AttachSerializableXact() in a
+ * parallel worker.
+ */
+SerializableXactHandle
+ShareSerializableXact(void)
+{
+ return MySerializableXact;
+}
+
+/*
+ * Allow parallel workers to import the leader's SERIALIZABLEXACT.
+ */
+void
+AttachSerializableXact(SerializableXactHandle handle)
+{
+
+ Assert(MySerializableXact == InvalidSerializableXact);
+
+ MySerializableXact = (SERIALIZABLEXACT *) handle;
+ if (MySerializableXact != InvalidSerializableXact)
+ CreateLocalPredicateLockHash();
+}
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index f7597b0991b..64aafef3114 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -566,7 +566,7 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
if (owner == TopTransactionResourceOwner)
{
ProcReleaseLocks(isCommit);
- ReleasePredicateLocks(isCommit);
+ ReleasePredicateLocks(isCommit, false);
}
}
else
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index fc220df533d..e650bb2eef1 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -60,8 +60,7 @@ extern PGDLLIMPORT bool InitializingParallelWorker;
#define IsParallelWorker() (ParallelWorkerNumber >= 0)
extern ParallelContext *CreateParallelContext(const char *library_name,
- const char *function_name, int nworkers,
- bool serializable_okay);
+ const char *function_name, int nworkers);
extern void InitializeParallelDSM(ParallelContext *pcxt);
extern void ReinitializeParallelDSM(ParallelContext *pcxt);
extern void LaunchParallelWorkers(ParallelContext *pcxt);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 96c77320066..08e0dc8144b 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -219,6 +219,7 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_SHARED_TUPLESTORE,
LWTRANCHE_TBM,
LWTRANCHE_PARALLEL_APPEND,
+ LWTRANCHE_SXACT,
LWTRANCHE_FIRST_USER_DEFINED
} BuiltinTrancheIds;
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index 3d87a631db9..23980c6ede7 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -30,6 +30,11 @@ extern int max_predicate_locks_per_page;
/* Number of SLRU buffers to use for predicate locking */
#define NUM_OLDSERXID_BUFFERS 16
+/*
+ * A handle used for sharing SERIALIZABLEXACT objects between the participants
+ * in a parallel query.
+ */
+typedef void *SerializableXactHandle;
/*
* function prototypes
@@ -56,7 +61,7 @@ extern void PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snap
extern void PredicateLockPageSplit(Relation relation, BlockNumber oldblkno, BlockNumber newblkno);
extern void PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, BlockNumber newblkno);
extern void TransferPredicateLocksToHeapRelation(Relation relation);
-extern void ReleasePredicateLocks(bool isCommit);
+extern void ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe);
/* conflict detection (may also trigger rollback) */
extern void CheckForSerializableConflictOut(bool valid, Relation relation, HeapTuple tuple,
@@ -74,4 +79,8 @@ extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
extern void predicatelock_twophase_recover(TransactionId xid, uint16 info,
void *recdata, uint32 len);
+/* parallel query support */
+extern SerializableXactHandle ShareSerializableXact(void);
+extern void AttachSerializableXact(SerializableXactHandle handle);
+
#endif /* PREDICATE_H */
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 7814d7e7219..451ec0eecdf 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -15,6 +15,7 @@
#define PREDICATE_INTERNALS_H
#include "storage/lock.h"
+#include "storage/lwlock.h"
/*
* Commit number.
@@ -91,6 +92,9 @@ typedef struct SERIALIZABLEXACT
SHM_QUEUE finishedLink; /* list link in
* FinishedSerializableTransactions */
+ LWLock predicateLockListLock; /* protects predicateLocks in parallel
+ * mode */
+
/*
* for r/o transactions: list of concurrent r/w transactions that we could
* potentially have conflicts with, and vice versa for r/w transactions
@@ -123,6 +127,12 @@ typedef struct SERIALIZABLEXACT
#define SXACT_FLAG_RO_UNSAFE 0x00000100
#define SXACT_FLAG_SUMMARY_CONFLICT_IN 0x00000200
#define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400
+/*
+ * The following flag means the transaction has been partially released
+ * already, but is being preserved because parallel workers might have a
+ * reference to it. It'll be recycled by the leader at end-of-transaction.
+ */
+#define SXACT_FLAG_PARTIALLY_RELEASED 0x00000800
/*
* The following types are used to provide an ad hoc list for holding
diff --git a/src/test/isolation/expected/serializable-parallel-2.out b/src/test/isolation/expected/serializable-parallel-2.out
new file mode 100644
index 00000000000..9a693c4dc62
--- /dev/null
+++ b/src/test/isolation/expected/serializable-parallel-2.out
@@ -0,0 +1,44 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1r s2r1 s1c s2r2 s2c
+step s1r: SELECT * FROM foo;
+a
+
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+step s2r1: SELECT * FROM foo;
+a
+
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+step s1c: COMMIT;
+step s2r2: SELECT * FROM foo;
+a
+
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+step s2c: COMMIT;
diff --git a/src/test/isolation/expected/serializable-parallel.out b/src/test/isolation/expected/serializable-parallel.out
new file mode 100644
index 00000000000..f43aa6a2990
--- /dev/null
+++ b/src/test/isolation/expected/serializable-parallel.out
@@ -0,0 +1,44 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s2rx s2ry s1ry s1wy s1c s2wx s2c s3c
+step s2rx: SELECT balance FROM bank_account WHERE id = 'X';
+balance
+
+0
+step s2ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance
+
+0
+step s1ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance
+
+0
+step s1wy: UPDATE bank_account SET balance = 20 WHERE id = 'Y';
+step s1c: COMMIT;
+step s2wx: UPDATE bank_account SET balance = -11 WHERE id = 'X';
+step s2c: COMMIT;
+step s3c: COMMIT;
+
+starting permutation: s2rx s2ry s1ry s1wy s1c s3r s3c s2wx
+step s2rx: SELECT balance FROM bank_account WHERE id = 'X';
+balance
+
+0
+step s2ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance
+
+0
+step s1ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance
+
+0
+step s1wy: UPDATE bank_account SET balance = 20 WHERE id = 'Y';
+step s1c: COMMIT;
+step s3r: SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id;
+id balance
+
+X 0
+Y 20
+step s3c: COMMIT;
+step s2wx: UPDATE bank_account SET balance = -11 WHERE id = 'X';
+ERROR: could not serialize access due to read/write dependencies among transactions
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 91d9d90135b..70d47b3e687 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -78,3 +78,5 @@ test: partition-key-update-3
test: partition-key-update-4
test: plpgsql-toast
test: truncate-conflict
+test: serializable-parallel
+test: serializable-parallel-2
diff --git a/src/test/isolation/specs/serializable-parallel-2.spec b/src/test/isolation/specs/serializable-parallel-2.spec
new file mode 100644
index 00000000000..7f90f75d882
--- /dev/null
+++ b/src/test/isolation/specs/serializable-parallel-2.spec
@@ -0,0 +1,30 @@
+# Exercise the case where a read-only serializable transaction has
+# SXACT_FLAG_RO_SAFE set in a parallel query.
+
+setup
+{
+ CREATE TABLE foo AS SELECT generate_series(1, 10)::int a;
+ ALTER TABLE foo SET (parallel_workers = 2);
+}
+
+teardown
+{
+ DROP TABLE foo;
+}
+
+session "s1"
+setup { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; }
+step "s1r" { SELECT * FROM foo; }
+step "s1c" { COMMIT; }
+
+session "s2"
+setup {
+ BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY;
+ SET parallel_setup_cost = 0;
+ SET parallel_tuple_cost = 0;
+ }
+step "s2r1" { SELECT * FROM foo; }
+step "s2r2" { SELECT * FROM foo; }
+step "s2c" { COMMIT; }
+
+permutation "s1r" "s2r1" "s1c" "s2r2" "s2c"
diff --git a/src/test/isolation/specs/serializable-parallel.spec b/src/test/isolation/specs/serializable-parallel.spec
new file mode 100644
index 00000000000..a4f488adfc8
--- /dev/null
+++ b/src/test/isolation/specs/serializable-parallel.spec
@@ -0,0 +1,47 @@
+# The example from the paper "A read-only transaction anomaly under snapshot
+# isolation"[1].
+#
+# Here we test that serializable snapshot isolation (SERIALIZABLE) doesn't
+# suffer from the anomaly, because s2 is aborted upon detection of a cycle.
+# In this case the read only query s3 happens to be running in a parallel
+# worker.
+#
+# [1] http://www.cs.umb.edu/~poneil/ROAnom.pdf
+
+setup
+{
+ CREATE TABLE bank_account (id TEXT PRIMARY KEY, balance DECIMAL NOT NULL);
+ INSERT INTO bank_account (id, balance) VALUES ('X', 0), ('Y', 0);
+}
+
+teardown
+{
+ DROP TABLE bank_account;
+}
+
+session "s1"
+setup { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; }
+step "s1ry" { SELECT balance FROM bank_account WHERE id = 'Y'; }
+step "s1wy" { UPDATE bank_account SET balance = 20 WHERE id = 'Y'; }
+step "s1c" { COMMIT; }
+
+session "s2"
+setup { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; }
+step "s2rx" { SELECT balance FROM bank_account WHERE id = 'X'; }
+step "s2ry" { SELECT balance FROM bank_account WHERE id = 'Y'; }
+step "s2wx" { UPDATE bank_account SET balance = -11 WHERE id = 'X'; }
+step "s2c" { COMMIT; }
+
+session "s3"
+setup {
+ BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
+ SET force_parallel_mode = on;
+ }
+step "s3r" { SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id; }
+step "s3c" { COMMIT; }
+
+# without s3, s1 and s2 commit
+permutation "s2rx" "s2ry" "s1ry" "s1wy" "s1c" "s2wx" "s2c" "s3c"
+
+# once s3 observes the data committed by s1, a cycle is created and s2 aborts
+permutation "s2rx" "s2ry" "s1ry" "s1wy" "s1c" "s3r" "s3c" "s2wx"