diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/nbtree/nbtsort.c | 2 | ||||
-rw-r--r-- | src/backend/access/transam/parallel.c | 18 | ||||
-rw-r--r-- | src/backend/access/transam/xact.c | 7 | ||||
-rw-r--r-- | src/backend/executor/execParallel.c | 2 | ||||
-rw-r--r-- | src/backend/optimizer/plan/planner.c | 11 | ||||
-rw-r--r-- | src/backend/storage/lmgr/lwlock.c | 1 | ||||
-rw-r--r-- | src/backend/storage/lmgr/predicate.c | 237 | ||||
-rw-r--r-- | src/backend/utils/resowner/resowner.c | 2 | ||||
-rw-r--r-- | src/include/access/parallel.h | 3 | ||||
-rw-r--r-- | src/include/storage/lwlock.h | 1 | ||||
-rw-r--r-- | src/include/storage/predicate.h | 11 | ||||
-rw-r--r-- | src/include/storage/predicate_internals.h | 10 | ||||
-rw-r--r-- | src/test/isolation/expected/serializable-parallel-2.out | 44 | ||||
-rw-r--r-- | src/test/isolation/expected/serializable-parallel.out | 44 | ||||
-rw-r--r-- | src/test/isolation/isolation_schedule | 2 | ||||
-rw-r--r-- | src/test/isolation/specs/serializable-parallel-2.spec | 30 | ||||
-rw-r--r-- | src/test/isolation/specs/serializable-parallel.spec | 47 |
17 files changed, 423 insertions, 49 deletions
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 28c1aeefabb..363dceb5b1c 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -1265,7 +1265,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) EnterParallelMode(); Assert(request > 0); pcxt = CreateParallelContext("postgres", "_bt_parallel_build_main", - request, true); + request); scantuplesortstates = leaderparticipates ? request + 1 : request; /* diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index ce2b61631db..55d129a64f7 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -31,6 +31,7 @@ #include "optimizer/optimizer.h" #include "pgstat.h" #include "storage/ipc.h" +#include "storage/predicate.h" #include "storage/sinval.h" #include "storage/spin.h" #include "tcop/tcopprot.h" @@ -91,6 +92,7 @@ typedef struct FixedParallelState BackendId parallel_master_backend_id; TimestampTz xact_ts; TimestampTz stmt_ts; + SerializableXactHandle serializable_xact_handle; /* Mutex protects remaining fields. */ slock_t mutex; @@ -155,7 +157,7 @@ static void ParallelWorkerShutdown(int code, Datum arg); */ ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, - int nworkers, bool serializable_okay) + int nworkers) { MemoryContext oldcontext; ParallelContext *pcxt; @@ -166,16 +168,6 @@ CreateParallelContext(const char *library_name, const char *function_name, /* Number of workers should be non-negative. */ Assert(nworkers >= 0); - /* - * If we are running under serializable isolation, we can't use parallel - * workers, at least not until somebody enhances that mechanism to be - * parallel-aware. Utility statement callers may ask us to ignore this - * restriction because they're always able to safely ignore the fact that - * SIREAD locks do not work with parallelism. - */ - if (IsolationIsSerializable() && !serializable_okay) - nworkers = 0; - /* We might be running in a short-lived memory context. */ oldcontext = MemoryContextSwitchTo(TopTransactionContext); @@ -327,6 +319,7 @@ InitializeParallelDSM(ParallelContext *pcxt) fps->parallel_master_backend_id = MyBackendId; fps->xact_ts = GetCurrentTransactionStartTimestamp(); fps->stmt_ts = GetCurrentStatementStartTimestamp(); + fps->serializable_xact_handle = ShareSerializableXact(); SpinLockInit(&fps->mutex); fps->last_xlog_end = 0; shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps); @@ -1422,6 +1415,9 @@ ParallelWorkerMain(Datum main_arg) false); RestoreEnumBlacklist(enumblacklistspace); + /* Attach to the leader's serializable transaction, if SERIALIZABLE. */ + AttachSerializableXact(fps->serializable_xact_handle); + /* * We've initialized all of our state now; nothing should change * hereafter. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index e93262975d3..6e5891749b4 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2024,9 +2024,12 @@ CommitTransaction(void) /* * Mark serializable transaction as complete for predicate locking * purposes. This should be done as late as we can put it and still allow - * errors to be raised for failure patterns found at commit. + * errors to be raised for failure patterns found at commit. This is not + * appropriate in a parallel worker however, because we aren't committing + * the leader's transaction and its serializable state will live on. */ - PreCommit_CheckForSerializationFailure(); + if (!is_parallel_worker) + PreCommit_CheckForSerializationFailure(); /* * Insert notifications sent by NOTIFY commands into the queue. This diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index b79be91655b..3d4b01cb4d6 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -604,7 +604,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, pstmt_data = ExecSerializePlan(planstate->plan, estate); /* Create a parallel context. */ - pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers, false); + pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers); pei->pcxt = pcxt; /* diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 9bb068a52e9..e408e77d6fb 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -337,22 +337,13 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * parallel worker. We might eventually be able to relax this * restriction, but for now it seems best not to have parallel workers * trying to create their own parallel workers. - * - * We can't use parallelism in serializable mode because the predicate - * locking code is not parallel-aware. It's not catastrophic if someone - * tries to run a parallel plan in serializable mode; it just won't get - * any workers and will run serially. But it seems like a good heuristic - * to assume that the same serialization level will be in effect at plan - * time and execution time, so don't generate a parallel plan if we're in - * serializable mode. */ if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 && IsUnderPostmaster && parse->commandType == CMD_SELECT && !parse->hasModifyingCTE && max_parallel_workers_per_gather > 0 && - !IsParallelWorker() && - !IsolationIsSerializable()) + !IsParallelWorker()) { /* all the cheap tests pass, so scan the query tree */ glob->maxParallelHazard = max_parallel_hazard(parse); diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 81dac45ae57..bc1aa88322b 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -521,6 +521,7 @@ RegisterLWLockTranches(void) LWLockRegisterTranche(LWTRANCHE_TBM, "tbm"); LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append"); LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN, "parallel_hash_join"); + LWLockRegisterTranche(LWTRANCHE_SXACT, "serializable_xact"); /* Register named tranches. */ for (i = 0; i < NamedLWLockTrancheRequests; i++) diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index 6fc11f26f0e..92beaab5663 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -97,7 +97,9 @@ * - All transactions share this single lock (with no partitioning). * - There is never a need for a process other than the one running * an active transaction to walk the list of locks held by that - * transaction. + * transaction, except parallel query workers sharing the leader's + * transaction. In the parallel case, an extra per-sxact lock is + * taken; see below. * - It is relatively infrequent that another process needs to * modify the list for a transaction, but it does happen for such * things as index page splits for pages with predicate locks and @@ -116,6 +118,12 @@ * than its own active transaction must acquire an exclusive * lock. * + * SERIALIZABLEXACT's member 'predicateLockListLock' + * - Protects the linked list of locks held by a transaction. Only + * needed for parallel mode, where multiple backends share the + * same SERIALIZABLEXACT object. Not needed if + * SerializablePredicateLockListLock is held exclusively. + * * PredicateLockHashPartitionLock(hashcode) * - The same lock protects a target, all locks on that target, and * the linked list of locks on the target. @@ -162,7 +170,7 @@ * PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, * BlockNumber newblkno) * TransferPredicateLocksToHeapRelation(Relation relation) - * ReleasePredicateLocks(bool isCommit) + * ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * * conflict detection (may also trigger rollback) * CheckForSerializableConflictOut(bool visible, Relation relation, @@ -187,6 +195,7 @@ #include "access/heapam.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "access/slru.h" #include "access/subtrans.h" #include "access/transam.h" @@ -279,6 +288,7 @@ #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0) #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0) #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0) +#define SxactIsPartiallyReleased(sxact) (((sxact)->flags & SXACT_FLAG_PARTIALLY_RELEASED) != 0) /* * Compute the hash code associated with a PREDICATELOCKTARGETTAG. @@ -409,6 +419,15 @@ static HTAB *LocalPredicateLockHash = NULL; static SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact; static bool MyXactDidWrite = false; +/* + * The SXACT_FLAG_RO_UNSAFE optimization might lead us to release + * MySerializableXact early. If that happens in a parallel query, the leader + * needs to defer the destruction of the SERIALIZABLEXACT until end of + * transaction, because the workers still have a reference to it. In that + * case, the leader stores it here. + */ +static SERIALIZABLEXACT *SavedSerializableXact = InvalidSerializableXact; + /* local functions */ static SERIALIZABLEXACT *CreatePredXact(void); @@ -465,6 +484,8 @@ static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag); static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer); static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer); +static void CreateLocalPredicateLockHash(void); +static void ReleasePredicateLocksLocal(void); /*------------------------------------------------------------------------*/ @@ -521,7 +542,7 @@ SerializationNeededForRead(Relation relation, Snapshot snapshot) */ if (SxactIsROSafe(MySerializableXact)) { - ReleasePredicateLocks(false); + ReleasePredicateLocks(false, true); return false; } @@ -1168,6 +1189,8 @@ InitPredicateLocks(void) memset(PredXact->element, 0, requestSize); for (i = 0; i < max_table_size; i++) { + LWLockInitialize(&PredXact->element[i].sxact.predicateLockListLock, + LWTRANCHE_SXACT); SHMQueueInsertBefore(&(PredXact->availableList), &(PredXact->element[i].link)); } @@ -1513,14 +1536,14 @@ GetSafeSnapshot(Snapshot origSnapshot) ereport(DEBUG2, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("deferrable snapshot was unsafe; trying a new one"))); - ReleasePredicateLocks(false); + ReleasePredicateLocks(false, false); } /* * Now we have a safe snapshot, so we don't need to do any further checks. */ Assert(SxactIsROSafe(MySerializableXact)); - ReleasePredicateLocks(false); + ReleasePredicateLocks(false, true); return snapshot; } @@ -1634,6 +1657,17 @@ SetSerializableTransactionSnapshot(Snapshot snapshot, Assert(IsolationIsSerializable()); /* + * If this is called by parallel.c in a parallel worker, we don't want to + * create a SERIALIZABLEXACT just yet because the leader's + * SERIALIZABLEXACT will be installed with AttachSerializableXact(). We + * also don't want to reject SERIALIZABLE READ ONLY DEFERRABLE in this + * case, because the leader has already determined that the snapshot it + * has passed us is safe. So there is nothing for us to do. + */ + if (IsParallelWorker()) + return; + + /* * We do not allow SERIALIZABLE READ ONLY DEFERRABLE transactions to * import snapshots, since there's no way to wait for a safe snapshot when * we're using the snap we're told to. (XXX instead of throwing an error, @@ -1666,7 +1700,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, VirtualTransactionId vxid; SERIALIZABLEXACT *sxact, *othersxact; - HASHCTL hash_ctl; /* We only do this for serializable transactions. Once. */ Assert(MySerializableXact == InvalidSerializableXact); @@ -1813,6 +1846,16 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, LWLockRelease(SerializableXactHashLock); + CreateLocalPredicateLockHash(); + + return snapshot; +} + +static void +CreateLocalPredicateLockHash(void) +{ + HASHCTL hash_ctl; + /* Initialize the backend-local hash table of parent locks */ Assert(LocalPredicateLockHash == NULL); MemSet(&hash_ctl, 0, sizeof(hash_ctl)); @@ -1822,8 +1865,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, max_predicate_locks_per_xact, &hash_ctl, HASH_ELEM | HASH_BLOBS); - - return snapshot; } /* @@ -2078,7 +2119,9 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash) * This implementation is assuming that the usage of each target tag field * is uniform. No need to make this hard if we don't have to. * - * We aren't acquiring lightweight locks for the predicate lock or lock + * We acquire an LWLock in the case of parallel mode, because worker + * backends have access to the leader's SERIALIZABLEXACT. Otherwise, + * we aren't acquiring LWLocks for the predicate lock or lock * target structures associated with this transaction unless we're going * to modify them, because no other process is permitted to modify our * locks. @@ -2091,6 +2134,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED); sxact = MySerializableXact; + if (IsInParallelMode()) + LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE); predlock = (PREDICATELOCK *) SHMQueueNext(&(sxact->predicateLocks), &(sxact->predicateLocks), @@ -2144,6 +2189,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) predlock = nextpredlock; } + if (IsInParallelMode()) + LWLockRelease(&sxact->predicateLockListLock); LWLockRelease(SerializablePredicateLockListLock); } @@ -2342,6 +2389,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, partitionLock = PredicateLockHashPartitionLock(targettaghash); LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED); + if (IsInParallelMode()) + LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE); LWLockAcquire(partitionLock, LW_EXCLUSIVE); /* Make sure that the target is represented. */ @@ -2379,6 +2428,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, } LWLockRelease(partitionLock); + if (IsInParallelMode()) + LWLockRelease(&sxact->predicateLockListLock); LWLockRelease(SerializablePredicateLockListLock); } @@ -2566,7 +2617,8 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash) PREDICATELOCK *nextpredlock; bool found; - Assert(LWLockHeldByMe(SerializablePredicateLockListLock)); + Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock, + LW_EXCLUSIVE)); Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash))); predlock = (PREDICATELOCK *) @@ -2626,7 +2678,7 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash) * covers it, or if we are absolutely certain that no one will need to * refer to that lock in the future. * - * Caller must hold SerializablePredicateLockListLock. + * Caller must hold SerializablePredicateLockListLock exclusively. */ static bool TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, @@ -2641,7 +2693,8 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, bool found; bool outOfShmem = false; - Assert(LWLockHeldByMe(SerializablePredicateLockListLock)); + Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock, + LW_EXCLUSIVE)); oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag); newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag); @@ -3217,9 +3270,17 @@ SetNewSxactGlobalXmin(void) * If this transaction is committing and is holding any predicate locks, * it must be added to a list of completed serializable transactions still * holding locks. + * + * If isReadOnlySafe is true, then predicate locks are being released before + * the end of the transaction because MySerializableXact has been determined + * to be RO_SAFE. In non-parallel mode we can release it completely, but it + * in parallel mode we partially release the SERIALIZABLEXACT and keep it + * around until the end of the transaction, allowing each backend to clear its + * MySerializableXact variable and benefit from the optimization in its own + * time. */ void -ReleasePredicateLocks(bool isCommit) +ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) { bool needToClear; RWConflict conflict, @@ -3238,6 +3299,44 @@ ReleasePredicateLocks(bool isCommit) */ bool topLevelIsDeclaredReadOnly; + /* We can't be both committing and releasing early due to RO_SAFE. */ + Assert(!(isCommit && isReadOnlySafe)); + + /* Are we at the end of a transaction, that is, a commit or abort? */ + if (!isReadOnlySafe) + { + /* + * Parallel workers mustn't release predicate locks at the end of + * their transaction. The leader will do that at the end of its + * transaction. + */ + if (IsParallelWorker()) + { + ReleasePredicateLocksLocal(); + return; + } + + /* + * By the time the leader in a parallel query reaches end of + * transaction, it has waited for all workers to exit. + */ + Assert(!ParallelContextActive()); + + /* + * If the leader in a parallel query earlier stashed a partially + * released SERIALIZABLEXACT for final clean-up at end of transaction + * (because workers might still have been accessing it), then it's + * time to restore it. + */ + if (SavedSerializableXact != InvalidSerializableXact) + { + Assert(MySerializableXact == InvalidSerializableXact); + MySerializableXact = SavedSerializableXact; + SavedSerializableXact = InvalidSerializableXact; + Assert(SxactIsPartiallyReleased(MySerializableXact)); + } + } + if (MySerializableXact == InvalidSerializableXact) { Assert(LocalPredicateLockHash == NULL); @@ -3246,10 +3345,51 @@ ReleasePredicateLocks(bool isCommit) LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); + /* + * If the transaction is committing, but it has been partially released + * already, then treat this as a roll back. It was marked as rolled back. + */ + if (isCommit && SxactIsPartiallyReleased(MySerializableXact)) + isCommit = false; + + /* + * If we're called in the middle of a transaction because we discovered + * that the SXACT_FLAG_RO_SAFE flag was set, then we'll partially release + * it (that is, release the predicate locks and conflicts, but not the + * SERIALIZABLEXACT itself) if we're the first backend to have noticed. + */ + if (isReadOnlySafe && IsInParallelMode()) + { + /* + * The leader needs to stash a pointer to it, so that it can + * completely release it at end-of-transaction. + */ + if (!IsParallelWorker()) + SavedSerializableXact = MySerializableXact; + + /* + * The first backend to reach this condition will partially release + * the SERIALIZABLEXACT. All others will just clear their + * backend-local state so that they stop doing SSI checks for the rest + * of the transaction. + */ + if (SxactIsPartiallyReleased(MySerializableXact)) + { + LWLockRelease(SerializableXactHashLock); + ReleasePredicateLocksLocal(); + return; + } + else + { + MySerializableXact->flags |= SXACT_FLAG_PARTIALLY_RELEASED; + /* ... and proceed to perform the partial release below. */ + } + } Assert(!isCommit || SxactIsPrepared(MySerializableXact)); Assert(!isCommit || !SxactIsDoomed(MySerializableXact)); Assert(!SxactIsCommitted(MySerializableXact)); - Assert(!SxactIsRolledBack(MySerializableXact)); + Assert(SxactIsPartiallyReleased(MySerializableXact) + || !SxactIsRolledBack(MySerializableXact)); /* may not be serializable during COMMIT/ROLLBACK PREPARED */ Assert(MySerializableXact->pid == 0 || IsolationIsSerializable()); @@ -3273,8 +3413,8 @@ ReleasePredicateLocks(bool isCommit) MySerializableXact->finishedBefore = ShmemVariableCache->nextXid; /* - * If it's not a commit it's a rollback, and we can clear our locks - * immediately. + * If it's not a commit it's either a rollback or a read-only transaction + * flagged SXACT_FLAG_RO_SAFE, and we can clear our locks immediately. */ if (isCommit) { @@ -3298,7 +3438,8 @@ ReleasePredicateLocks(bool isCommit) * cleanup. This means it should not be considered when calculating * SxactGlobalXmin. */ - MySerializableXact->flags |= SXACT_FLAG_DOOMED; + if (!isReadOnlySafe) + MySerializableXact->flags |= SXACT_FLAG_DOOMED; MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK; /* @@ -3494,7 +3635,8 @@ ReleasePredicateLocks(bool isCommit) * was launched. */ needToClear = false; - if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin)) + if (!isReadOnlySafe && + TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin)) { Assert(PredXact->SxactGlobalXminCount > 0); if (--(PredXact->SxactGlobalXminCount) == 0) @@ -3513,14 +3655,28 @@ ReleasePredicateLocks(bool isCommit) SHMQueueInsertBefore(FinishedSerializableTransactions, &MySerializableXact->finishedLink); + /* + * If we're releasing a RO_SAFE transaction in parallel mode, we'll only + * partially release it. That's necessary because other backends may have + * a reference to it. The leader will release the SERIALIZABLEXACT itself + * at the end of the transaction after workers have stopped running. + */ if (!isCommit) - ReleaseOneSerializableXact(MySerializableXact, false, false); + ReleaseOneSerializableXact(MySerializableXact, + isReadOnlySafe && IsInParallelMode(), + false); LWLockRelease(SerializableFinishedListLock); if (needToClear) ClearOldPredicateLocks(); + ReleasePredicateLocksLocal(); +} + +static void +ReleasePredicateLocksLocal(void) +{ MySerializableXact = InvalidSerializableXact; MyXactDidWrite = false; @@ -3712,6 +3868,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, * them to OldCommittedSxact if summarize is true) */ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED); + if (IsInParallelMode()) + LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE); predlock = (PREDICATELOCK *) SHMQueueNext(&(sxact->predicateLocks), &(sxact->predicateLocks), @@ -3791,6 +3949,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, */ SHMQueueInit(&sxact->predicateLocks); + if (IsInParallelMode()) + LWLockRelease(&sxact->predicateLockListLock); LWLockRelease(SerializablePredicateLockListLock); sxidtag.xid = sxact->topXid; @@ -4213,6 +4373,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) PREDICATELOCK *rmpredlock; LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED); + if (IsInParallelMode()) + LWLockAcquire(&MySerializableXact->predicateLockListLock, LW_EXCLUSIVE); LWLockAcquire(partitionLock, LW_EXCLUSIVE); LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); @@ -4247,6 +4409,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) LWLockRelease(SerializableXactHashLock); LWLockRelease(partitionLock); + if (IsInParallelMode()) + LWLockRelease(&MySerializableXact->predicateLockListLock); LWLockRelease(SerializablePredicateLockListLock); if (rmpredlock != NULL) @@ -4677,6 +4841,7 @@ PreCommit_CheckForSerializationFailure(void) /* Check if someone else has already decided that we need to die */ if (SxactIsDoomed(MySerializableXact)) { + Assert(!SxactIsPartiallyReleased(MySerializableXact)); LWLockRelease(SerializableXactHashLock); ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), @@ -4795,6 +4960,13 @@ AtPrepare_PredicateLocks(void) */ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED); + /* + * No need to take sxact->predicateLockListLock in parallel mode because + * there cannot be any parallel workers running while we are preparing a + * transaction. + */ + Assert(!IsParallelWorker() && !ParallelContextActive()); + predlock = (PREDICATELOCK *) SHMQueueNext(&(sxact->predicateLocks), &(sxact->predicateLocks), @@ -4867,7 +5039,7 @@ PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit) MySerializableXact = sxid->myXact; MyXactDidWrite = true; /* conservatively assume that we wrote * something */ - ReleasePredicateLocks(isCommit); + ReleasePredicateLocks(isCommit, false); } /* @@ -5003,3 +5175,28 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info, CreatePredicateLock(&lockRecord->target, targettaghash, sxact); } } + +/* + * Prepare to share the current SERIALIZABLEXACT with parallel workers. + * Return a handle object that can be used by AttachSerializableXact() in a + * parallel worker. + */ +SerializableXactHandle +ShareSerializableXact(void) +{ + return MySerializableXact; +} + +/* + * Allow parallel workers to import the leader's SERIALIZABLEXACT. + */ +void +AttachSerializableXact(SerializableXactHandle handle) +{ + + Assert(MySerializableXact == InvalidSerializableXact); + + MySerializableXact = (SERIALIZABLEXACT *) handle; + if (MySerializableXact != InvalidSerializableXact) + CreateLocalPredicateLockHash(); +} diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c index f7597b0991b..64aafef3114 100644 --- a/src/backend/utils/resowner/resowner.c +++ b/src/backend/utils/resowner/resowner.c @@ -566,7 +566,7 @@ ResourceOwnerReleaseInternal(ResourceOwner owner, if (owner == TopTransactionResourceOwner) { ProcReleaseLocks(isCommit); - ReleasePredicateLocks(isCommit); + ReleasePredicateLocks(isCommit, false); } } else diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index fc220df533d..e650bb2eef1 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -60,8 +60,7 @@ extern PGDLLIMPORT bool InitializingParallelWorker; #define IsParallelWorker() (ParallelWorkerNumber >= 0) extern ParallelContext *CreateParallelContext(const char *library_name, - const char *function_name, int nworkers, - bool serializable_okay); + const char *function_name, int nworkers); extern void InitializeParallelDSM(ParallelContext *pcxt); extern void ReinitializeParallelDSM(ParallelContext *pcxt); extern void LaunchParallelWorkers(ParallelContext *pcxt); diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 96c77320066..08e0dc8144b 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -219,6 +219,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_SHARED_TUPLESTORE, LWTRANCHE_TBM, LWTRANCHE_PARALLEL_APPEND, + LWTRANCHE_SXACT, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h index 3d87a631db9..23980c6ede7 100644 --- a/src/include/storage/predicate.h +++ b/src/include/storage/predicate.h @@ -30,6 +30,11 @@ extern int max_predicate_locks_per_page; /* Number of SLRU buffers to use for predicate locking */ #define NUM_OLDSERXID_BUFFERS 16 +/* + * A handle used for sharing SERIALIZABLEXACT objects between the participants + * in a parallel query. + */ +typedef void *SerializableXactHandle; /* * function prototypes @@ -56,7 +61,7 @@ extern void PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snap extern void PredicateLockPageSplit(Relation relation, BlockNumber oldblkno, BlockNumber newblkno); extern void PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, BlockNumber newblkno); extern void TransferPredicateLocksToHeapRelation(Relation relation); -extern void ReleasePredicateLocks(bool isCommit); +extern void ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe); /* conflict detection (may also trigger rollback) */ extern void CheckForSerializableConflictOut(bool valid, Relation relation, HeapTuple tuple, @@ -74,4 +79,8 @@ extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit); extern void predicatelock_twophase_recover(TransactionId xid, uint16 info, void *recdata, uint32 len); +/* parallel query support */ +extern SerializableXactHandle ShareSerializableXact(void); +extern void AttachSerializableXact(SerializableXactHandle handle); + #endif /* PREDICATE_H */ diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h index 7814d7e7219..451ec0eecdf 100644 --- a/src/include/storage/predicate_internals.h +++ b/src/include/storage/predicate_internals.h @@ -15,6 +15,7 @@ #define PREDICATE_INTERNALS_H #include "storage/lock.h" +#include "storage/lwlock.h" /* * Commit number. @@ -91,6 +92,9 @@ typedef struct SERIALIZABLEXACT SHM_QUEUE finishedLink; /* list link in * FinishedSerializableTransactions */ + LWLock predicateLockListLock; /* protects predicateLocks in parallel + * mode */ + /* * for r/o transactions: list of concurrent r/w transactions that we could * potentially have conflicts with, and vice versa for r/w transactions @@ -123,6 +127,12 @@ typedef struct SERIALIZABLEXACT #define SXACT_FLAG_RO_UNSAFE 0x00000100 #define SXACT_FLAG_SUMMARY_CONFLICT_IN 0x00000200 #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400 +/* + * The following flag means the transaction has been partially released + * already, but is being preserved because parallel workers might have a + * reference to it. It'll be recycled by the leader at end-of-transaction. + */ +#define SXACT_FLAG_PARTIALLY_RELEASED 0x00000800 /* * The following types are used to provide an ad hoc list for holding diff --git a/src/test/isolation/expected/serializable-parallel-2.out b/src/test/isolation/expected/serializable-parallel-2.out new file mode 100644 index 00000000000..9a693c4dc62 --- /dev/null +++ b/src/test/isolation/expected/serializable-parallel-2.out @@ -0,0 +1,44 @@ +Parsed test spec with 2 sessions + +starting permutation: s1r s2r1 s1c s2r2 s2c +step s1r: SELECT * FROM foo; +a + +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +step s2r1: SELECT * FROM foo; +a + +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +step s1c: COMMIT; +step s2r2: SELECT * FROM foo; +a + +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +step s2c: COMMIT; diff --git a/src/test/isolation/expected/serializable-parallel.out b/src/test/isolation/expected/serializable-parallel.out new file mode 100644 index 00000000000..f43aa6a2990 --- /dev/null +++ b/src/test/isolation/expected/serializable-parallel.out @@ -0,0 +1,44 @@ +Parsed test spec with 3 sessions + +starting permutation: s2rx s2ry s1ry s1wy s1c s2wx s2c s3c +step s2rx: SELECT balance FROM bank_account WHERE id = 'X'; +balance + +0 +step s2ry: SELECT balance FROM bank_account WHERE id = 'Y'; +balance + +0 +step s1ry: SELECT balance FROM bank_account WHERE id = 'Y'; +balance + +0 +step s1wy: UPDATE bank_account SET balance = 20 WHERE id = 'Y'; +step s1c: COMMIT; +step s2wx: UPDATE bank_account SET balance = -11 WHERE id = 'X'; +step s2c: COMMIT; +step s3c: COMMIT; + +starting permutation: s2rx s2ry s1ry s1wy s1c s3r s3c s2wx +step s2rx: SELECT balance FROM bank_account WHERE id = 'X'; +balance + +0 +step s2ry: SELECT balance FROM bank_account WHERE id = 'Y'; +balance + +0 +step s1ry: SELECT balance FROM bank_account WHERE id = 'Y'; +balance + +0 +step s1wy: UPDATE bank_account SET balance = 20 WHERE id = 'Y'; +step s1c: COMMIT; +step s3r: SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id; +id balance + +X 0 +Y 20 +step s3c: COMMIT; +step s2wx: UPDATE bank_account SET balance = -11 WHERE id = 'X'; +ERROR: could not serialize access due to read/write dependencies among transactions diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule index 91d9d90135b..70d47b3e687 100644 --- a/src/test/isolation/isolation_schedule +++ b/src/test/isolation/isolation_schedule @@ -78,3 +78,5 @@ test: partition-key-update-3 test: partition-key-update-4 test: plpgsql-toast test: truncate-conflict +test: serializable-parallel +test: serializable-parallel-2 diff --git a/src/test/isolation/specs/serializable-parallel-2.spec b/src/test/isolation/specs/serializable-parallel-2.spec new file mode 100644 index 00000000000..7f90f75d882 --- /dev/null +++ b/src/test/isolation/specs/serializable-parallel-2.spec @@ -0,0 +1,30 @@ +# Exercise the case where a read-only serializable transaction has +# SXACT_FLAG_RO_SAFE set in a parallel query. + +setup +{ + CREATE TABLE foo AS SELECT generate_series(1, 10)::int a; + ALTER TABLE foo SET (parallel_workers = 2); +} + +teardown +{ + DROP TABLE foo; +} + +session "s1" +setup { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; } +step "s1r" { SELECT * FROM foo; } +step "s1c" { COMMIT; } + +session "s2" +setup { + BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY; + SET parallel_setup_cost = 0; + SET parallel_tuple_cost = 0; + } +step "s2r1" { SELECT * FROM foo; } +step "s2r2" { SELECT * FROM foo; } +step "s2c" { COMMIT; } + +permutation "s1r" "s2r1" "s1c" "s2r2" "s2c" diff --git a/src/test/isolation/specs/serializable-parallel.spec b/src/test/isolation/specs/serializable-parallel.spec new file mode 100644 index 00000000000..a4f488adfc8 --- /dev/null +++ b/src/test/isolation/specs/serializable-parallel.spec @@ -0,0 +1,47 @@ +# The example from the paper "A read-only transaction anomaly under snapshot +# isolation"[1]. +# +# Here we test that serializable snapshot isolation (SERIALIZABLE) doesn't +# suffer from the anomaly, because s2 is aborted upon detection of a cycle. +# In this case the read only query s3 happens to be running in a parallel +# worker. +# +# [1] http://www.cs.umb.edu/~poneil/ROAnom.pdf + +setup +{ + CREATE TABLE bank_account (id TEXT PRIMARY KEY, balance DECIMAL NOT NULL); + INSERT INTO bank_account (id, balance) VALUES ('X', 0), ('Y', 0); +} + +teardown +{ + DROP TABLE bank_account; +} + +session "s1" +setup { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; } +step "s1ry" { SELECT balance FROM bank_account WHERE id = 'Y'; } +step "s1wy" { UPDATE bank_account SET balance = 20 WHERE id = 'Y'; } +step "s1c" { COMMIT; } + +session "s2" +setup { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; } +step "s2rx" { SELECT balance FROM bank_account WHERE id = 'X'; } +step "s2ry" { SELECT balance FROM bank_account WHERE id = 'Y'; } +step "s2wx" { UPDATE bank_account SET balance = -11 WHERE id = 'X'; } +step "s2c" { COMMIT; } + +session "s3" +setup { + BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; + SET force_parallel_mode = on; + } +step "s3r" { SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id; } +step "s3c" { COMMIT; } + +# without s3, s1 and s2 commit +permutation "s2rx" "s2ry" "s1ry" "s1wy" "s1c" "s2wx" "s2c" "s3c" + +# once s3 observes the data committed by s1, a cycle is created and s2 aborts +permutation "s2rx" "s2ry" "s1ry" "s1wy" "s1c" "s3r" "s3c" "s2wx" |