Diffstat (limited to 'src/backend/executor/nodeHash.c')
-rw-r--r-- | src/backend/executor/nodeHash.c | 175
1 file changed, 170 insertions, 5 deletions
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 748c9b00243..a45bd3a3156 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2072,6 +2072,69 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 }
 
 /*
+ * Decide if this process is allowed to run the unmatched scan.  If so, the
+ * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned.
+ * Otherwise the batch is detached and false is returned.
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+    HashJoinTable hashtable = hjstate->hj_HashTable;
+    int         curbatch = hashtable->curbatch;
+    ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+
+    Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
+
+    /*
+     * It would not be deadlock-free to wait on the batch barrier, because it
+     * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
+     * already emitted tuples.  Therefore, we'll hold a wait-free election:
+     * only one process can continue to the next phase, and all others detach
+     * from this batch.  They can still do any work on other batches, if there
+     * are any.
+     */
+    if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
+    {
+        /* This process considers the batch to be done. */
+        hashtable->batches[hashtable->curbatch].done = true;
+
+        /* Make sure any temporary files are closed. */
+        sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+        sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+        /*
+         * Track largest batch we've seen, which would normally happen in
+         * ExecHashTableDetachBatch().
+         */
+        hashtable->spacePeak =
+            Max(hashtable->spacePeak,
+                batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+        hashtable->curbatch = -1;
+        return false;
+    }
+
+    /* Now we are alone with this batch. */
+    Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+    Assert(BarrierParticipants(&batch->batch_barrier) == 1);
+
+    /*
+     * Has another process decided to give up early and command all processes
+     * to skip the unmatched scan?
+     */
+    if (batch->skip_unmatched)
+    {
+        hashtable->batches[hashtable->curbatch].done = true;
+        ExecHashTableDetachBatch(hashtable);
+        return false;
+    }
+
+    /* Now prepare the process local state, just as for non-parallel join. */
+    ExecPrepHashTableForUnmatched(hjstate);
+
+    return true;
+}
+
+/*
  * ExecScanHashTableForUnmatched
  *		scan the hash table for unmatched inner tuples
  *
@@ -2146,6 +2209,72 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 }
 
 /*
+ * ExecParallelScanHashTableForUnmatched
+ *		scan the hash table for unmatched inner tuples, in parallel join
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+                                      ExprContext *econtext)
+{
+    HashJoinTable hashtable = hjstate->hj_HashTable;
+    HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+
+    for (;;)
+    {
+        /*
+         * hj_CurTuple is the address of the tuple last returned from the
+         * current bucket, or NULL if it's time to start scanning a new
+         * bucket.
+         */
+        if (hashTuple != NULL)
+            hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+        else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+            hashTuple = ExecParallelHashFirstTuple(hashtable,
+                                                   hjstate->hj_CurBucketNo++);
+        else
+            break;              /* finished all buckets */
+
+        while (hashTuple != NULL)
+        {
+            if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+            {
+                TupleTableSlot *inntuple;
+
+                /* insert hashtable's tuple into exec slot */
+                inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+                                                 hjstate->hj_HashTupleSlot,
+                                                 false);    /* do not pfree */
+                econtext->ecxt_innertuple = inntuple;
+
+                /*
+                 * Reset temp memory each time; although this function doesn't
+                 * do any qual eval, the caller will, so let's keep it
+                 * parallel to ExecScanHashBucket.
+                 */
+                ResetExprContext(econtext);
+
+                hjstate->hj_CurTuple = hashTuple;
+                return true;
+            }
+
+            hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+        }
+
+        /* allow this loop to be cancellable */
+        CHECK_FOR_INTERRUPTS();
+    }
+
+    /*
+     * no more unmatched tuples
+     */
+    return false;
+}
+
+/*
  * ExecHashTableReset
  *
  *		reset hash table header for new batch
@@ -3088,6 +3217,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
         accessor->shared = shared;
         accessor->preallocated = 0;
         accessor->done = false;
+        accessor->outer_eof = false;
         accessor->inner_tuples =
             sts_attach(ParallelHashJoinBatchInner(shared),
                        ParallelWorkerNumber + 1,
@@ -3133,18 +3263,53 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
     {
         int         curbatch = hashtable->curbatch;
         ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+        bool        attached = true;
 
         /* Make sure any temporary files are closed. */
         sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
         sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
 
-        /* Detach from the batch we were last working on. */
-        if (BarrierArriveAndDetach(&batch->batch_barrier))
+        /* After attaching we always get at least to PHJ_BATCH_PROBE. */
+        Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
+               BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+
+        /*
+         * If we're abandoning the PHJ_BATCH_PROBE phase early without having
+         * reached the end of it, it means the plan doesn't want any more
+         * tuples, and it is happy to abandon any tuples buffered in this
+         * process's subplans.  For correctness, we can't allow any process to
+         * execute the PHJ_BATCH_SCAN phase, because we will never have the
+         * complete set of match bits.  Therefore we skip emitting unmatched
+         * tuples in all backends (if this is a full/right join), as if those
+         * tuples were all due to be emitted by this process and it has
+         * abandoned them too.
+         */
+        if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
+            !hashtable->batches[curbatch].outer_eof)
+        {
+            /*
+             * This flag may be written to by multiple backends during
+             * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
+             * phase so requires no extra locking.
+             */
+            batch->skip_unmatched = true;
+        }
+
+        /*
+         * Even if we aren't doing a full/right outer join, we'll step through
+         * the PHJ_BATCH_SCAN phase just to maintain the invariant that
+         * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
+         */
+        if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
+            attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+        if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
         {
             /*
-             * Technically we shouldn't access the barrier because we're no
-             * longer attached, but since there is no way it's moving after
-             * this point it seems safe to make the following assertion.
+             * We are no longer attached to the batch barrier, but we're the
+             * process that was chosen to free resources and it's safe to
+             * assert the current phase.  The ParallelHashJoinBatch can't go
+             * away underneath us while we are attached to the build barrier,
+             * making this access safe.
              */
             Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
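
The synchronization primitive that makes the election work is BarrierArriveAndDetachExceptLast() (src/backend/storage/ipc/barrier.c): every participant that arrives while others are still attached detaches immediately and gets false back, while the last one stays attached, the barrier advances one phase, and it gets true, which is exactly what the assertions in ExecParallelPrepHashTableForUnmatched() above rely on. The following is a minimal sketch of that pattern, not PostgreSQL source; do_parallel_phase() and do_serial_phase() are made-up placeholders, and only the Barrier calls are real API from storage/barrier.h.

/*
 * Minimal sketch of the wait-free election pattern used by
 * ExecParallelPrepHashTableForUnmatched().  do_parallel_phase() and
 * do_serial_phase() are hypothetical stand-ins.
 */
static void
run_with_serial_tail(Barrier *barrier)
{
    BarrierAttach(barrier);

    do_parallel_phase();        /* all participants; may emit tuples */

    /*
     * No one may wait here: consumers could already be blocked on tuples
     * emitted during the parallel phase.  All but the last arriver detach
     * and move on; exactly one process survives to run the serial tail.
     */
    if (BarrierArriveAndDetachExceptLast(barrier))
    {
        do_serial_phase();      /* runs in exactly one process */
        BarrierDetach(barrier);
    }
}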
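The callers of the two new entry points live in the hash join state machine in nodeHashjoin.c, which this file-limited view does not show (the skip_unmatched and outer_eof fields referenced above are likewise presumably added to the hash join structs by other hunks of the same commit). Purely as an illustration of the intended calling pattern, a simplified, hypothetical driver might look like the sketch below; emit_null_extended_tuple() is a placeholder, not executor code.

/*
 * Hypothetical sketch of how a parallel full/right hash join could drive the
 * unmatched-inner scan once the outer side of the current batch is
 * exhausted.  The real driver is the HJ_* state machine in nodeHashjoin.c.
 */
static void
fill_unmatched_inner_tuples(HashJoinState *hjstate, ExprContext *econtext)
{
    /*
     * Wait-free election: only one process per batch advances the batch
     * barrier to PHJ_BATCH_SCAN; all others detach and look for work in
     * other batches.
     */
    if (!ExecParallelPrepHashTableForUnmatched(hjstate))
        return;                 /* lost the election, or scan was skipped */

    /*
     * We are alone with this batch, so we can walk every bucket and emit
     * each inner tuple whose match bit was never set.
     */
    while (ExecParallelScanHashTableForUnmatched(hjstate, econtext))
        emit_null_extended_tuple(hjstate, econtext);    /* placeholder */

    /* Detach; with the scan done, this advances the batch to PHJ_BATCH_FREE. */
    ExecHashTableDetachBatch(hjstate->hj_HashTable);
}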