Diffstat (limited to 'src/backend/executor/nodeHashjoin.c')
 src/backend/executor/nodeHashjoin.c | 81
 1 file changed, 54 insertions(+), 27 deletions(-)
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 2fc80808e30..52ed05c6f5a 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -86,6 +86,7 @@
* PHJ_BATCH_ALLOCATE* -- one allocates buckets
* PHJ_BATCH_LOAD -- all load the hash table from disk
* PHJ_BATCH_PROBE -- all probe
+ * PHJ_BATCH_SCAN* -- one does full/right unmatched scan
* PHJ_BATCH_FREE* -- one frees memory
*
* Batch 0 is a special case, because it starts out in phase
@@ -103,9 +104,10 @@
* to a barrier, unless the barrier has reached a phase that means that no
* process will wait on it again. We emit tuples while attached to the build
* barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
- * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_FREE
- * respectively without waiting, using BarrierArriveAndDetach(). The last to
- * detach receives a different return value so that it knows that it's safe to
+ * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN,
+ * respectively, without waiting, using BarrierArriveAndDetach() and
+ * BarrierArriveAndDetachExceptLast(). The last to detach
+ * receives a different return value so that it knows that it's safe to
* clean up. Any straggler process that attaches after that phase is reached
* will see that it's too late to participate or access the relevant shared
* memory objects.
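
As an aside, the probe-to-scan hand-off described above can be sketched in a few lines. This is not code from the patch; it only illustrates, under the stated assumptions, how BarrierArriveAndDetachExceptLast() lets every probing process leave PHJ_BATCH_PROBE without waiting, while exactly one of them (the last to arrive) stays attached to perform the serial PHJ_BATCH_SCAN work. The helper emit_unmatched_inner_tuples() is hypothetical.

#include "storage/barrier.h"

static void emit_unmatched_inner_tuples(void);	/* hypothetical helper */

/*
 * Illustrative sketch only: leave the probe phase without waiting, which
 * is required because processes attached here have already emitted tuples.
 */
static void
leave_probe_phase(Barrier *batch_barrier)
{
	if (BarrierArriveAndDetachExceptLast(batch_barrier))
	{
		/*
		 * We were the last to arrive, so we are still attached and the
		 * barrier has advanced to the scan phase; do the serial unmatched
		 * scan, then detach for real.
		 */
		emit_unmatched_inner_tuples();
		BarrierArriveAndDetach(batch_barrier);
	}
	/* Otherwise we detached already and can go work on another batch. */
}
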
@@ -393,8 +395,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
if (HJ_FILL_INNER(node))
{
/* set up to scan for unmatched inner tuples */
- ExecPrepHashTableForUnmatched(node);
- node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ if (parallel)
+ {
+ /*
+ * Only one process is currently allowed to
+ * handle each batch's unmatched tuples, in a
+ * parallel join.
+ */
+ if (ExecParallelPrepHashTableForUnmatched(node))
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ else
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+ }
+ else
+ {
+ ExecPrepHashTableForUnmatched(node);
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ }
}
else
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -487,25 +504,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
node->hj_MatchedOuter = true;
- if (parallel)
- {
- /*
- * Full/right outer joins are currently not supported
- * for parallel joins, so we don't need to set the
- * match bit. Experiments show that it's worth
- * avoiding the shared memory traffic on large
- * systems.
- */
- Assert(!HJ_FILL_INNER(node));
- }
- else
- {
- /*
- * This is really only needed if HJ_FILL_INNER(node),
- * but we'll avoid the branch and just set it always.
- */
+
+ /*
+ * This is really only needed if HJ_FILL_INNER(node), but
+ * we'll avoid the branch and just set it always.
+ */
+ if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
- }
/* In an antijoin, we never return a matched tuple */
if (node->js.jointype == JOIN_ANTI)
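
The unconditional HeapTupleHeaderSetMatch() call that the serial path used is now guarded by HeapTupleHeaderHasMatch(), which presumably serves the same goal as the removed parallel-mode special case: avoiding unnecessary memory traffic once the hash table can live in shared memory. A minimal sketch of that check-before-set idiom follows; the function name is made up for illustration.

#include "postgres.h"
#include "access/htup_details.h"

/*
 * Sketch of the check-before-set idiom: only write the match bit when it
 * is not already set, so repeated matches against the same inner tuple do
 * not generate redundant writes to (possibly shared) memory.
 */
static inline void
mark_tuple_matched(MinimalTuple tuple)
{
	if (!HeapTupleHeaderHasMatch(tuple))
		HeapTupleHeaderSetMatch(tuple);
}
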
@@ -563,7 +568,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
* so any unmatched inner tuples in the hashtable have to be
* emitted before we continue to the next batch.
*/
- if (!ExecScanHashTableForUnmatched(node, econtext))
+ if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+ : ExecScanHashTableForUnmatched(node, econtext)))
{
/* no more unmatched tuples */
node->hj_JoinState = HJ_NEED_NEW_BATCH;
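
For readers unfamiliar with the unmatched-tuple scan that both functions perform, here is a self-contained conceptual sketch using hypothetical types (not the executor's actual data structures): each call returns the next inner tuple whose match flag was never set during probing, and NULL once the table is exhausted, which corresponds to the "no more unmatched tuples" condition handled above.

#include <stddef.h>
#include <stdbool.h>

/* Hypothetical, simplified stand-ins for the hash table's bucket chains. */
typedef struct SketchTuple
{
	struct SketchTuple *next;	/* next tuple in the same bucket */
	bool		matched;		/* set when a probe tuple joined with it */
} SketchTuple;

typedef struct SketchScan
{
	SketchTuple **buckets;		/* array of bucket chain heads */
	int			nbuckets;
	int			curbucket;		/* next bucket to visit */
	SketchTuple *curtuple;		/* position within the current chain */
} SketchScan;

/* Return the next never-matched inner tuple, or NULL when exhausted. */
static SketchTuple *
sketch_next_unmatched(SketchScan *scan)
{
	for (;;)
	{
		if (scan->curtuple == NULL)
		{
			if (scan->curbucket >= scan->nbuckets)
				return NULL;	/* caller advances to HJ_NEED_NEW_BATCH */
			scan->curtuple = scan->buckets[scan->curbucket++];
		}
		else
			scan->curtuple = scan->curtuple->next;

		if (scan->curtuple != NULL && !scan->curtuple->matched)
			return scan->curtuple;
	}
}
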
@@ -966,6 +972,8 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
}
/* End of this batch */
+ hashtable->batches[curbatch].outer_eof = true;
+
return NULL;
}
@@ -1197,13 +1205,32 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
* hash table stays alive until everyone's finished
* probing it, but no participant is allowed to wait at
* this barrier again (or else a deadlock could occur).
- * All attached participants must eventually call
- * BarrierArriveAndDetach() so that the final phase
- * PHJ_BATCH_FREE can be reached.
+ * All attached participants must eventually detach from
+ * the barrier and one worker must advance the phase so
+ * that the final phase is reached.
*/
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
+
return true;
+ case PHJ_BATCH_SCAN:
+
+ /*
+ * In principle, we could help scan for unmatched tuples,
+ * since that phase is already underway (the thing we
+ * can't do under current deadlock-avoidance rules is wait
+ * for others to arrive at PHJ_BATCH_SCAN, because
+ * PHJ_BATCH_PROBE emits tuples, but in this case we just
+ * got here without waiting). That is not yet done. For
+ * now, we just detach and go around again. We have to
+ * use ExecHashTableDetachBatch() because there's a small
+ * chance we'll be the last to detach, and then we're
+ * responsible for freeing memory.
+ */
+ ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+ hashtable->batches[batchno].done = true;
+ ExecHashTableDetachBatch(hashtable);
+ break;
case PHJ_BATCH_FREE:
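
Finally, the "small chance we'll be the last to detach, and then we're responsible for freeing memory" rule that the PHJ_BATCH_SCAN case relies on can be sketched as below. The actual logic is in ExecHashTableDetachBatch() and does considerably more; free_batch_shared_state() is a hypothetical placeholder.

#include "storage/barrier.h"

static void free_batch_shared_state(void);	/* hypothetical cleanup helper */

/*
 * Sketch only: BarrierArriveAndDetach() tells exactly one caller, the last
 * to detach, that it is now safe to clean up, because no other process can
 * still be attached to the batch's shared state.
 */
static void
detach_from_batch(Barrier *batch_barrier)
{
	if (BarrierArriveAndDetach(batch_barrier))
		free_batch_shared_state();
}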