Diffstat (limited to 'src/backend/executor/nodeHash.c')
-rw-r--r--  src/backend/executor/nodeHash.c | 175
1 file changed, 170 insertions(+), 5 deletions(-)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 748c9b00243..a45bd3a3156 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2072,6 +2072,69 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
}
/*
+ * Decide if this process is allowed to run the unmatched scan. If so, the
+ * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned.
+ * Otherwise the batch is detached and false is returned.
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+
+ Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
+
+ /*
+ * It would not be deadlock-free to wait on the batch barrier, because it
+ * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
+ * already emitted tuples. Therefore, we'll hold a wait-free election:
+ * only one process can continue to the next phase, and all others detach
+ * from this batch. They can still do any work on other batches, if there
+ * are any.
+ */
+ if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
+ {
+ /* This process considers the batch to be done. */
+ hashtable->batches[hashtable->curbatch].done = true;
+
+ /* Make sure any temporary files are closed. */
+ sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+ /*
+ * Track largest batch we've seen, which would normally happen in
+ * ExecHashTableDetachBatch().
+ */
+ hashtable->spacePeak =
+ Max(hashtable->spacePeak,
+ batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+ hashtable->curbatch = -1;
+ return false;
+ }
+
+ /* Now we are alone with this batch. */
+ Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+ Assert(BarrierParticipants(&batch->batch_barrier) == 1);
+
+ /*
+ * Has another process decided to give up early and command all processes
+ * to skip the unmatched scan?
+ */
+ if (batch->skip_unmatched)
+ {
+ hashtable->batches[hashtable->curbatch].done = true;
+ ExecHashTableDetachBatch(hashtable);
+ return false;
+ }
+
+ /* Now prepare the process-local state, just as for non-parallel join. */
+ ExecPrepHashTableForUnmatched(hjstate);
+
+ return true;
+}
+
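For orientation, here is a hedged sketch (not part of this patch) of how the caller in nodeHashjoin.c's join state machine might use this election once a batch's probe phase ends; the state names HJ_FILL_INNER_TUPLES and HJ_NEED_NEW_BATCH are assumed from that state machine rather than taken from this hunk.

	/*
	 * Sketch only: the winner of the wait-free election goes on to scan the
	 * shared hash table for unmatched inner tuples; every other process has
	 * already been detached from the batch and can move on to other batches.
	 */
	if (ExecParallelPrepHashTableForUnmatched(hjstate))
		hjstate->hj_JoinState = HJ_FILL_INNER_TUPLES;	/* we do the scan */
	else
		hjstate->hj_JoinState = HJ_NEED_NEW_BATCH;		/* someone else, or nobody */
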
+/*
* ExecScanHashTableForUnmatched
* scan the hash table for unmatched inner tuples
*
@@ -2146,6 +2209,72 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
}
/*
+ * ExecParallelScanHashTableForUnmatched
+ * scan the hash table for unmatched inner tuples, in parallel join
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+
+ for (;;)
+ {
+ /*
+ * hj_CurTuple is the address of the tuple last returned from the
+ * current bucket, or NULL if it's time to start scanning a new
+ * bucket.
+ */
+ if (hashTuple != NULL)
+ hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+ else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+ hashTuple = ExecParallelHashFirstTuple(hashtable,
+ hjstate->hj_CurBucketNo++);
+ else
+ break; /* finished all buckets */
+
+ while (hashTuple != NULL)
+ {
+ if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+ {
+ TupleTableSlot *inntuple;
+
+ /* insert hashtable's tuple into exec slot */
+ inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot,
+ false); /* do not pfree */
+ econtext->ecxt_innertuple = inntuple;
+
+ /*
+ * Reset temp memory each time; although this function doesn't
+ * do any qual eval, the caller will, so let's keep it
+ * parallel to ExecScanHashBucket.
+ */
+ ResetExprContext(econtext);
+
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
+ }
+
+ hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+ }
+
+ /* allow this loop to be cancellable */
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /*
+ * no more unmatched tuples
+ */
+ return false;
+}
+
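The following is a hedged sketch (simplified, not the committed nodeHashjoin.c code) of how the HJ_FILL_INNER_TUPLES state might consume this scan for a full/right join, null-extending the outer side of each unmatched inner tuple; otherqual stands in for the join's remaining quals.

	if (!ExecParallelScanHashTableForUnmatched(node, econtext))
	{
		/* no more unmatched inner tuples: move on to the next batch */
		node->hj_JoinState = HJ_NEED_NEW_BATCH;
	}
	else
	{
		/* null-extend the outer side and apply any remaining quals */
		econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
		if (otherqual == NULL || ExecQual(otherqual, econtext))
			return ExecProject(node->js.ps.ps_ProjInfo);
	}
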
+/*
* ExecHashTableReset
*
* reset hash table header for new batch
@@ -3088,6 +3217,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
accessor->shared = shared;
accessor->preallocated = 0;
accessor->done = false;
+ accessor->outer_eof = false;
accessor->inner_tuples =
sts_attach(ParallelHashJoinBatchInner(shared),
ParallelWorkerNumber + 1,
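The outer_eof flag initialized above is presumably set elsewhere in the patch once a worker exhausts the batch's shared outer tuple store; a hedged sketch of that point, assuming the usual sts_parallel_scan_next() fetch path in nodeHashjoin.c:

	/* Sketch only: mark end-of-outer once the shared scan returns no tuple. */
	tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
								   &hashvalue);
	if (tuple == NULL)
		hashtable->batches[curbatch].outer_eof = true;
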
@@ -3133,18 +3263,53 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
{
int curbatch = hashtable->curbatch;
ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+ bool attached = true;
/* Make sure any temporary files are closed. */
sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
- /* Detach from the batch we were last working on. */
- if (BarrierArriveAndDetach(&batch->batch_barrier))
+ /* After attaching we always get at least to PHJ_BATCH_PROBE. */
+ Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
+ BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+
+ /*
+ * If we're abandoning the PHJ_BATCH_PROBE phase early without having
+ * reached the end of it, it means the plan doesn't want any more
+ * tuples, and it is happy to abandon any tuples buffered in this
+ * process's subplans. For correctness, we can't allow any process to
+ * execute the PHJ_BATCH_SCAN phase, because we will never have the
+ * complete set of match bits. Therefore we skip emitting unmatched
+ * tuples in all backends (if this is a full/right join), as if those
+ * tuples were all due to be emitted by this process and it has
+ * abandoned them too.
+ */
+ if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
+ !hashtable->batches[curbatch].outer_eof)
+ {
+ /*
+ * This flag may be written to by multiple backends during
+ * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
+ * phase so requires no extra locking.
+ */
+ batch->skip_unmatched = true;
+ }
+
+ /*
+ * Even if we aren't doing a full/right outer join, we'll step through
+ * the PHJ_BATCH_SCAN phase just to maintain the invariant that
+ * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
+ */
+ if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
+ attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+ if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
{
/*
- * Technically we shouldn't access the barrier because we're no
- * longer attached, but since there is no way it's moving after
- * this point it seems safe to make the following assertion.
+ * We are no longer attached to the batch barrier, but we're the
+ * process that was chosen to free resources and it's safe to
+ * assert the current phase. The ParallelHashJoinBatch can't go
+ * away underneath us while we are attached to the build barrier,
+ * making this access safe.
*/
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
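
To summarize the lifecycle this hunk relies on, a hedged sketch of the per-batch barrier phases (the names of the phases before PROBE are recalled from hashjoin.h and may not match the tree exactly):

	/*
	 * Sketch only:
	 *
	 *   ELECT -> ALLOCATE -> LOAD -> PHJ_BATCH_PROBE -> PHJ_BATCH_SCAN -> PHJ_BATCH_FREE
	 *
	 * All probers but one detach at the end of PHJ_BATCH_PROBE; the single
	 * elected process passes through PHJ_BATCH_SCAN (emitting unmatched inner
	 * tuples for full/right joins, or skipping them if skip_unmatched is set)
	 * and then detaches, so the final detach always observes PHJ_BATCH_FREE
	 * and releases the batch's shared memory.
	 */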