Diffstat (limited to 'src/backend/executor/nodeHashjoin.c')
-rw-r--r-- | src/backend/executor/nodeHashjoin.c | 81
1 file changed, 54 insertions, 27 deletions
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 2fc80808e30..52ed05c6f5a 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -86,6 +86,7 @@
  *   PHJ_BATCH_ALLOCATE*      -- one allocates buckets
  *   PHJ_BATCH_LOAD           -- all load the hash table from disk
  *   PHJ_BATCH_PROBE          -- all probe
+ *   PHJ_BATCH_SCAN*          -- one does full/right unmatched scan
  *   PHJ_BATCH_FREE*          -- one frees memory
  *
  * Batch 0 is a special case, because it starts out in phase
@@ -103,9 +104,10 @@
  * to a barrier, unless the barrier has reached a phase that means that no
  * process will wait on it again.  We emit tuples while attached to the build
  * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
- * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_FREE
- * respectively without waiting, using BarrierArriveAndDetach().  The last to
- * detach receives a different return value so that it knows that it's safe to
+ * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
+ * respectively without waiting, using BarrierArriveAndDetach() and
+ * BarrierArriveAndDetachExceptLast() respectively.  The last to detach
+ * receives a different return value so that it knows that it's safe to
  * clean up.  Any straggler process that attaches after that phase is reached
  * will see that it's too late to participate or access the relevant shared
  * memory objects.
@@ -393,8 +395,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                 if (HJ_FILL_INNER(node))
                 {
                     /* set up to scan for unmatched inner tuples */
-                    ExecPrepHashTableForUnmatched(node);
-                    node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+                    if (parallel)
+                    {
+                        /*
+                         * Only one process is currently allowed to handle
+                         * each batch's unmatched tuples, in a parallel
+                         * join.
+                         */
+                        if (ExecParallelPrepHashTableForUnmatched(node))
+                            node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+                        else
+                            node->hj_JoinState = HJ_NEED_NEW_BATCH;
+                    }
+                    else
+                    {
+                        ExecPrepHashTableForUnmatched(node);
+                        node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+                    }
                 }
                 else
                     node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -487,25 +504,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                 {
                     node->hj_MatchedOuter = true;
 
-                    if (parallel)
-                    {
-                        /*
-                         * Full/right outer joins are currently not supported
-                         * for parallel joins, so we don't need to set the
-                         * match bit.  Experiments show that it's worth
-                         * avoiding the shared memory traffic on large
-                         * systems.
-                         */
-                        Assert(!HJ_FILL_INNER(node));
-                    }
-                    else
-                    {
-                        /*
-                         * This is really only needed if HJ_FILL_INNER(node),
-                         * but we'll avoid the branch and just set it always.
-                         */
+
+                    /*
+                     * This is really only needed if HJ_FILL_INNER(node), but
+                     * we'll avoid the branch and just set it always.
+                     */
+                    if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
                         HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
-                    }
 
                     /* In an antijoin, we never return a matched tuple */
                     if (node->js.jointype == JOIN_ANTI)
@@ -563,7 +568,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                  * so any unmatched inner tuples in the hashtable have to be
                  * emitted before we continue to the next batch.
                  */
-                if (!ExecScanHashTableForUnmatched(node, econtext))
+                if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+                      : ExecScanHashTableForUnmatched(node, econtext)))
                 {
                     /* no more unmatched tuples */
                     node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -966,6 +972,8 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
     }
 
     /* End of this batch */
+    hashtable->batches[curbatch].outer_eof = true;
+
     return NULL;
 }
@@ -1197,13 +1205,32 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
                      * hash table stays alive until everyone's finished
                      * probing it, but no participant is allowed to wait at
                      * this barrier again (or else a deadlock could occur).
-                     * All attached participants must eventually call
-                     * BarrierArriveAndDetach() so that the final phase
-                     * PHJ_BATCH_FREE can be reached.
+                     * All attached participants must eventually detach from
+                     * the barrier and one worker must advance the phase so
+                     * that the final phase is reached.
                      */
                     ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
                     sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
                     return true;
+
+                case PHJ_BATCH_SCAN:
+
+                    /*
+                     * In principle, we could help scan for unmatched tuples,
+                     * since that phase is already underway (the thing we
+                     * can't do under current deadlock-avoidance rules is wait
+                     * for others to arrive at PHJ_BATCH_SCAN, because
+                     * PHJ_BATCH_PROBE emits tuples, but in this case we just
+                     * got here without waiting).  That is not yet done.  For
+                     * now, we just detach and go around again.  We have to
+                     * use ExecHashTableDetachBatch() because there's a small
+                     * chance we'll be the last to detach, and then we're
+                     * responsible for freeing memory.
+                     */
+                    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+                    hashtable->batches[batchno].done = true;
+                    ExecHashTableDetachBatch(hashtable);
+                    break;
 
                 case PHJ_BATCH_FREE:
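
Editor's note: the hand-off this patch relies on is that every worker arrives at the per-batch barrier after probing, all but one detach immediately, and the single remaining worker alone performs the unmatched-inner scan (PHJ_BATCH_SCAN). The sketch below is a standalone analogy of that choreography using plain pthreads; it is not PostgreSQL code, and arrive_and_detach_except_last() is only a hypothetical stand-in for the role BarrierArriveAndDetachExceptLast() plays, without the real barrier's phase bookkeeping.

/* Build with: cc -pthread sketch.c -o sketch */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

#define NWORKERS 4

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static int still_attached = NWORKERS;

/*
 * Returns true for exactly one caller: the last worker to arrive stays
 * "attached" and owns the serial phase; everyone else detaches.
 */
static bool
arrive_and_detach_except_last(void)
{
    bool last;

    pthread_mutex_lock(&lock);
    last = (--still_attached == 0);
    pthread_mutex_unlock(&lock);
    return last;
}

static void *
worker(void *arg)
{
    int id = (int) (long) arg;

    /* All workers probe the batch in parallel (stand-in for PHJ_BATCH_PROBE). */
    printf("worker %d: probing batch\n", id);

    if (arrive_and_detach_except_last())
    {
        /* Only one worker reaches here: the unmatched-inner scan (PHJ_BATCH_SCAN). */
        printf("worker %d: scanning for unmatched inner tuples\n", id);
    }
    else
    {
        /* The others detach without waiting and can move on to another batch. */
        printf("worker %d: detached, moving to next batch\n", id);
    }
    return NULL;
}

int
main(void)
{
    pthread_t threads[NWORKERS];

    for (long i = 0; i < NWORKERS; i++)
        pthread_create(&threads[i], NULL, worker, (void *) i);
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(threads[i], NULL);
    return 0;
}

The detach-without-waiting part is what keeps the deadlock-avoidance rule intact: no process that has already emitted tuples ever blocks on the barrier, which is why stragglers that find the batch already in PHJ_BATCH_SCAN simply detach and try another batch rather than joining the scan.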