aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/nodeHashjoin.c
diff options
context:
space:
mode:
authorThomas Munro <tmunro@postgresql.org>2023-03-31 11:01:51 +1300
committerThomas Munro <tmunro@postgresql.org>2023-03-31 11:34:03 +1300
commit11c2d6fdf5af1aacec9ca2005543f1b0fc4cc364 (patch)
tree24f7dcba5bd58fbf207e8e58b3f97291b2a873b4 /src/backend/executor/nodeHashjoin.c
parentca7b3c4c00042038ba9c282c4807e05c0a527e42 (diff)
downloadpostgresql-11c2d6fdf5af1aacec9ca2005543f1b0fc4cc364.tar.gz
postgresql-11c2d6fdf5af1aacec9ca2005543f1b0fc4cc364.zip
Parallel Hash Full Join.
Full and right outer joins were not supported in the initial implementation of Parallel Hash Join because of deadlock hazards (see discussion). Therefore FULL JOIN inhibited parallelism, as the other join strategies can't do that in parallel either. Add a new PHJ phase PHJ_BATCH_SCAN that scans for unmatched tuples on the inner side of one batch's hash table. For now, sidestep the deadlock problem by terminating parallelism there. The last process to arrive at that phase emits the unmatched tuples, while others detach and are free to go and work on other batches, if there are any, but otherwise they finish the join early. That unfairness is considered acceptable for now, because it's better than no parallelism at all. The build and probe phases are run in parallel, and the new scan-for-unmatched phase, while serial, is usually applied to the smaller of the two relations and is either limited by some multiple of work_mem, or it's too big and is partitioned into batches and then the situation is improved by batch-level parallelism. Author: Melanie Plageman <melanieplageman@gmail.com> Author: Thomas Munro <thomas.munro@gmail.com> Reviewed-by: Thomas Munro <thomas.munro@gmail.com> Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
Diffstat (limited to 'src/backend/executor/nodeHashjoin.c')
-rw-r--r--src/backend/executor/nodeHashjoin.c81
1 files changed, 54 insertions, 27 deletions
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 2fc80808e30..52ed05c6f5a 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -86,6 +86,7 @@
* PHJ_BATCH_ALLOCATE* -- one allocates buckets
* PHJ_BATCH_LOAD -- all load the hash table from disk
* PHJ_BATCH_PROBE -- all probe
+ * PHJ_BATCH_SCAN* -- one does full/right unmatched scan
* PHJ_BATCH_FREE* -- one frees memory
*
* Batch 0 is a special case, because it starts out in phase
@@ -103,9 +104,10 @@
* to a barrier, unless the barrier has reached a phase that means that no
* process will wait on it again. We emit tuples while attached to the build
* barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
- * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_FREE
- * respectively without waiting, using BarrierArriveAndDetach(). The last to
- * detach receives a different return value so that it knows that it's safe to
+ * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
+ * respectively without waiting, using BarrierArriveAndDetach() and
+ * BarrierArriveAndDetachExceptLast() respectively. The last to detach
+ * receives a different return value so that it knows that it's safe to
* clean up. Any straggler process that attaches after that phase is reached
* will see that it's too late to participate or access the relevant shared
* memory objects.
@@ -393,8 +395,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
if (HJ_FILL_INNER(node))
{
/* set up to scan for unmatched inner tuples */
- ExecPrepHashTableForUnmatched(node);
- node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ if (parallel)
+ {
+ /*
+ * Only one process is currently allow to handle
+ * each batch's unmatched tuples, in a parallel
+ * join.
+ */
+ if (ExecParallelPrepHashTableForUnmatched(node))
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ else
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+ }
+ else
+ {
+ ExecPrepHashTableForUnmatched(node);
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ }
}
else
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -487,25 +504,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
node->hj_MatchedOuter = true;
- if (parallel)
- {
- /*
- * Full/right outer joins are currently not supported
- * for parallel joins, so we don't need to set the
- * match bit. Experiments show that it's worth
- * avoiding the shared memory traffic on large
- * systems.
- */
- Assert(!HJ_FILL_INNER(node));
- }
- else
- {
- /*
- * This is really only needed if HJ_FILL_INNER(node),
- * but we'll avoid the branch and just set it always.
- */
+
+ /*
+ * This is really only needed if HJ_FILL_INNER(node), but
+ * we'll avoid the branch and just set it always.
+ */
+ if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
- }
/* In an antijoin, we never return a matched tuple */
if (node->js.jointype == JOIN_ANTI)
@@ -563,7 +568,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
* so any unmatched inner tuples in the hashtable have to be
* emitted before we continue to the next batch.
*/
- if (!ExecScanHashTableForUnmatched(node, econtext))
+ if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+ : ExecScanHashTableForUnmatched(node, econtext)))
{
/* no more unmatched tuples */
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -966,6 +972,8 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
}
/* End of this batch */
+ hashtable->batches[curbatch].outer_eof = true;
+
return NULL;
}
@@ -1197,13 +1205,32 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
* hash table stays alive until everyone's finished
* probing it, but no participant is allowed to wait at
* this barrier again (or else a deadlock could occur).
- * All attached participants must eventually call
- * BarrierArriveAndDetach() so that the final phase
- * PHJ_BATCH_FREE can be reached.
+ * All attached participants must eventually detach from
+ * the barrier and one worker must advance the phase so
+ * that the final phase is reached.
*/
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
+
return true;
+ case PHJ_BATCH_SCAN:
+
+ /*
+ * In principle, we could help scan for unmatched tuples,
+ * since that phase is already underway (the thing we
+ * can't do under current deadlock-avoidance rules is wait
+ * for others to arrive at PHJ_BATCH_SCAN, because
+ * PHJ_BATCH_PROBE emits tuples, but in this case we just
+ * got here without waiting). That is not yet done. For
+ * now, we just detach and go around again. We have to
+ * use ExecHashTableDetachBatch() because there's a small
+ * chance we'll be the last to detach, and then we're
+ * responsible for freeing memory.
+ */
+ ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+ hashtable->batches[batchno].done = true;
+ ExecHashTableDetachBatch(hashtable);
+ break;
case PHJ_BATCH_FREE: