Diffstat (limited to 'src/backend/executor/nodeHashjoin.c')
-rw-r--r--  src/backend/executor/nodeHashjoin.c | 617
1 file changed, 597 insertions(+), 20 deletions(-)
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index ab1632cc13d..5d1dc1f401e 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -10,18 +10,112 @@
* IDENTIFICATION
* src/backend/executor/nodeHashjoin.c
*
+ * PARALLELISM
+ *
+ * Hash joins can participate in parallel query execution in several ways. A
+ * parallel-oblivious hash join is one where the node is unaware that it is
+ * part of a parallel plan. In this case, a copy of the inner plan is used to
+ * build a copy of the hash table in every backend, and the outer plan can be
+ * built from either a partial or a complete path, so that the results of the
+ * hash join are correspondingly either partial or complete. A parallel-aware
+ * hash join is one that behaves differently, coordinating work between
+ * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel
+ * Hash Join always appears with a Parallel Hash node.
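+ *
+ * For example, a Parallel Hash Join typically appears in EXPLAIN output
+ * with a plan shape like this (an illustrative sketch; relation names are
+ * arbitrary):
+ *
+ *   Gather
+ *     ->  Parallel Hash Join
+ *           ->  Parallel Seq Scan on big
+ *           ->  Parallel Hash
+ *                 ->  Parallel Seq Scan on small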
+ *
+ * Parallel-aware hash joins use the same per-backend state machine to track
+ * progress through the hash join algorithm as parallel-oblivious hash joins.
+ * In a parallel-aware hash join, there is also a shared state machine that
+ * co-operating backends use to synchronize their local state machines and
+ * program counters. The shared state machine is managed with a Barrier IPC
+ * primitive. When all attached participants arrive at a barrier, the phase
+ * advances and all waiting participants are released.
+ *
+ * When a participant begins working on a parallel hash join, it must first
+ * figure out how much progress has already been made, because participants
+ * don't wait for each other to begin. For this reason there are switch
+ * statements at key points in the code where we have to synchronize our local
+ * state machine with the phase, and then jump to the correct part of the
+ * algorithm so that we can get started.
+ *
+ * One barrier called build_barrier is used to coordinate the hashing phases.
+ * The phase is represented by an integer which begins at zero and increments
+ * one by one, but in the code it is referred to by symbolic names as follows:
+ *
+ * PHJ_BUILD_ELECTING -- initial state
+ * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0
+ * PHJ_BUILD_HASHING_INNER -- all hash the inner rel
+ * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer
+ * PHJ_BUILD_DONE -- building done, probing can begin
+ *
+ * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
+ * be used repeatedly as required to coordinate expansions in the number of
+ * batches or buckets. Their phases are as follows:
+ *
+ * PHJ_GROW_BATCHES_ELECTING -- initial state
+ * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches
+ * PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
+ * PHJ_GROW_BATCHES_FINISHING -- one cleans up, detects skew
+ *
+ * PHJ_GROW_BUCKETS_ELECTING -- initial state
+ * PHJ_GROW_BUCKETS_ALLOCATING -- one allocates new buckets
+ * PHJ_GROW_BUCKETS_REINSERTING -- all insert tuples
+ *
+ * If the planner got the number of batches and buckets right, those won't be
+ * necessary, but on the other hand we might end up needing to expand the
+ * buckets or batches multiple times while hashing the inner relation to stay
+ * within our memory budget and load factor target. For that reason it's a
+ * separate pair of barriers using circular phases.
+ *
+ * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
+ * because we need to divide the outer relation into batches up front in order
+ * to be able to process batches entirely independently. In contrast, the
+ * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
+ * batches whenever it encounters them while scanning and probing, which it
+ * can do because it processes batches in serial order.
+ *
+ * Once PHJ_BUILD_DONE is reached, backends then split up and process
+ * different batches, or gang up and work together on probing batches if there
+ * aren't enough to go around. For each batch there is a separate barrier
+ * with the following phases:
+ *
+ * PHJ_BATCH_ELECTING -- initial state
+ * PHJ_BATCH_ALLOCATING -- one allocates buckets
+ * PHJ_BATCH_LOADING -- all load the hash table from disk
+ * PHJ_BATCH_PROBING -- all probe
+ * PHJ_BATCH_DONE -- end
+ *
+ * Batch 0 is a special case, because it starts out in phase
+ * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
+ * PHJ_BUILD_HASHING_INNER so we can skip loading.
+ *
+ * Initially we try to plan for a single-batch hash join using the combined
+ * work_mem of all participants to create a large shared hash table. If that
+ * turns out either at planning or execution time to be impossible then we
+ * fall back to regular work_mem sized hash tables.
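+ *
+ * For example (illustrative numbers only): with work_mem = 4MB and a
+ * leader plus two workers, the shared hash table is initially budgeted at
+ * 3 * 4MB = 12MB, which may allow a single batch where a private 4MB
+ * table would have had to be split into multiple batches.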
+ *
+ * To avoid deadlocks, we never wait for any barrier unless it is known that
+ * all other backends attached to it are actively executing the node or have
+ * already arrived. Practically, that means that we never return a tuple
+ * while attached to a barrier, unless the barrier has reached its final
+ * state. In the slightly special case of the per-batch barrier, we return
+ * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
+ * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
+ *
*-------------------------------------------------------------------------
*/
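
As a concrete illustration of the switch-on-phase pattern described in the
comment above, here is a minimal sketch (not code from this patch). It uses
the real Barrier API from storage/barrier.h, but the function name and the
zero wait-event arguments are placeholders; the actual build logic lives in
nodeHash.c.

    static void
    help_build_hash_table(ParallelHashJoinState *pstate)
    {
        Barrier    *build_barrier = &pstate->build_barrier;

        /*
         * BarrierAttach() returns the current phase, i.e. how much progress
         * the other participants have already made.
         */
        switch (BarrierAttach(build_barrier))
        {
            case PHJ_BUILD_ELECTING:
                /* BarrierArriveAndWait() returns true in exactly one backend. */
                if (BarrierArriveAndWait(build_barrier, 0))
                {
                    /* Elected: set up the batches and the batch 0 hash table. */
                }
                /* Fall through. */

            case PHJ_BUILD_ALLOCATING:
                /* Wait for the elected backend to finish allocating. */
                BarrierArriveAndWait(build_barrier, 0);
                /* Fall through. */

            case PHJ_BUILD_HASHING_INNER:
                /* ... all attached participants help hash the inner rel ... */
                BarrierArriveAndWait(build_barrier, 0);
                break;

            default:
                /* Arrived too late to help; the table is already built. */
                break;
        }

        /*
         * Waiting was safe above because every attached participant is known
         * to be executing this node; to leave a barrier without waiting, as
         * the per-batch probing code must, use BarrierArriveAndDetach().
         */
    }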
#include "postgres.h"
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "utils/memutils.h"
+#include "utils/sharedtuplestore.h"
/*
@@ -42,24 +136,34 @@
static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
HashJoinState *hjstate,
uint32 *hashvalue);
+static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
+ HashJoinState *hjstate,
+ uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
BufFile *file,
uint32 *hashvalue,
TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
+static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
+static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);
/* ----------------------------------------------------------------
- * ExecHashJoin
+ * ExecHashJoinImpl
*
- * This function implements the Hybrid Hashjoin algorithm.
+ * This function implements the Hybrid Hashjoin algorithm. It is marked
+ * with an always-inline attribute so that ExecHashJoin() and
+ * ExecParallelHashJoin() can inline it. Compilers that respect the
+ * attribute should create versions specialized for parallel == true and
+ * parallel == false with unnecessary branches removed.
*
* Note: the relation we build hash table on is the "inner"
* the other one is "outer".
* ----------------------------------------------------------------
*/
-static TupleTableSlot * /* return: a tuple or NULL */
-ExecHashJoin(PlanState *pstate)
+pg_attribute_always_inline
+static inline TupleTableSlot *
+ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
HashJoinState *node = castNode(HashJoinState, pstate);
PlanState *outerNode;
@@ -71,6 +175,7 @@ ExecHashJoin(PlanState *pstate)
TupleTableSlot *outerTupleSlot;
uint32 hashvalue;
int batchno;
+ ParallelHashJoinState *parallel_state;
/*
* get information from HashJoin node
@@ -81,6 +186,7 @@ ExecHashJoin(PlanState *pstate)
outerNode = outerPlanState(node);
hashtable = node->hj_HashTable;
econtext = node->js.ps.ps_ExprContext;
+ parallel_state = hashNode->parallel_state;
/*
* Reset per-tuple memory context to free any expression evaluation
@@ -138,6 +244,18 @@ ExecHashJoin(PlanState *pstate)
/* no chance to not build the hash table */
node->hj_FirstOuterTupleSlot = NULL;
}
+ else if (parallel)
+ {
+ /*
+ * The empty-outer optimization is not implemented for
+ * shared hash tables, because no one participant can
+ * determine that there are no outer tuples, and it's not
+ * yet clear that it's worth the synchronization overhead
+ * of reaching consensus to figure that out. So we have
+ * to build the hash table.
+ */
+ node->hj_FirstOuterTupleSlot = NULL;
+ }
else if (HJ_FILL_OUTER(node) ||
(outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
!node->hj_OuterNotEmpty))
@@ -155,15 +273,19 @@ ExecHashJoin(PlanState *pstate)
node->hj_FirstOuterTupleSlot = NULL;
/*
- * create the hash table
+ * Create the hash table. If using Parallel Hash, then
+ * whoever gets here first will create the hash table and any
+ * later arrivals will merely attach to it.
*/
- hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan,
+ hashtable = ExecHashTableCreate(hashNode,
node->hj_HashOperators,
HJ_FILL_INNER(node));
node->hj_HashTable = hashtable;
/*
- * execute the Hash node, to build the hash table
+ * Execute the Hash node, to build the hash table. If using
+ * Parallel Hash, then we'll try to help hashing unless we
+ * arrived too late.
*/
hashNode->hashtable = hashtable;
(void) MultiExecProcNode((PlanState *) hashNode);
@@ -189,7 +311,34 @@ ExecHashJoin(PlanState *pstate)
*/
node->hj_OuterNotEmpty = false;
- node->hj_JoinState = HJ_NEED_NEW_OUTER;
+ if (parallel)
+ {
+ Barrier *build_barrier;
+
+ build_barrier = &parallel_state->build_barrier;
+ Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
+ BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+ if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
+ {
+ /*
+ * If multi-batch, we need to hash the outer relation
+ * up front.
+ */
+ if (hashtable->nbatch > 1)
+ ExecParallelHashJoinPartitionOuter(node);
+ BarrierArriveAndWait(build_barrier,
+ WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
+ }
+ Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+
+ /* Each backend should now select a batch to work on. */
+ hashtable->curbatch = -1;
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+
+ continue;
+ }
+ else
+ node->hj_JoinState = HJ_NEED_NEW_OUTER;
/* FALL THRU */
@@ -198,9 +347,14 @@ ExecHashJoin(PlanState *pstate)
/*
* We don't have an outer tuple, try to get the next one
*/
- outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode,
- node,
- &hashvalue);
+ if (parallel)
+ outerTupleSlot =
+ ExecParallelHashJoinOuterGetTuple(outerNode, node,
+ &hashvalue);
+ else
+ outerTupleSlot =
+ ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
+
if (TupIsNull(outerTupleSlot))
{
/* end of batch, or maybe whole join */
@@ -240,10 +394,12 @@ ExecHashJoin(PlanState *pstate)
* Need to postpone this outer tuple to a later batch.
* Save it in the corresponding outer-batch file.
*/
+ Assert(parallel_state == NULL);
Assert(batchno > hashtable->curbatch);
ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
hashvalue,
&hashtable->outerBatchFile[batchno]);
+
/* Loop around, staying in HJ_NEED_NEW_OUTER state */
continue;
}
@@ -258,11 +414,23 @@ ExecHashJoin(PlanState *pstate)
/*
* Scan the selected hash bucket for matches to current outer
*/
- if (!ExecScanHashBucket(node, econtext))
+ if (parallel)
{
- /* out of matches; check for possible outer-join fill */
- node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
- continue;
+ if (!ExecParallelScanHashBucket(node, econtext))
+ {
+ /* out of matches; check for possible outer-join fill */
+ node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
+ continue;
+ }
+ }
+ else
+ {
+ if (!ExecScanHashBucket(node, econtext))
+ {
+ /* out of matches; check for possible outer-join fill */
+ node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
+ continue;
+ }
}
/*
@@ -362,8 +530,16 @@ ExecHashJoin(PlanState *pstate)
/*
* Try to advance to next batch. Done if there are no more.
*/
- if (!ExecHashJoinNewBatch(node))
- return NULL; /* end of join */
+ if (parallel)
+ {
+ if (!ExecParallelHashJoinNewBatch(node))
+ return NULL; /* end of parallel-aware join */
+ }
+ else
+ {
+ if (!ExecHashJoinNewBatch(node))
+ return NULL; /* end of parallel-oblivious join */
+ }
node->hj_JoinState = HJ_NEED_NEW_OUTER;
break;
@@ -375,6 +551,38 @@ ExecHashJoin(PlanState *pstate)
}
/* ----------------------------------------------------------------
+ * ExecHashJoin
+ *
+ * Parallel-oblivious version.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot * /* return: a tuple or NULL */
+ExecHashJoin(PlanState *pstate)
+{
+ /*
+ * On sufficiently smart compilers this should be inlined with the
+ * parallel-aware branches removed.
+ */
+ return ExecHashJoinImpl(pstate, false);
+}
+
+/* ----------------------------------------------------------------
+ * ExecParallelHashJoin
+ *
+ * Parallel-aware version.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot * /* return: a tuple or NULL */
+ExecParallelHashJoin(PlanState *pstate)
+{
+ /*
+ * On sufficiently smart compilers this should be inlined with the
+ * parallel-oblivious branches removed.
+ */
+ return ExecHashJoinImpl(pstate, true);
+}
+
+/* ----------------------------------------------------------------
* ExecInitHashJoin
*
* Init routine for HashJoin node.
@@ -400,6 +608,12 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
hjstate = makeNode(HashJoinState);
hjstate->js.ps.plan = (Plan *) node;
hjstate->js.ps.state = estate;
+
+ /*
+ * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
+ * where this function may be replaced with a parallel version, if we
+ * managed to launch a parallel query.
+ */
hjstate->js.ps.ExecProcNode = ExecHashJoin;
/*
@@ -581,9 +795,9 @@ ExecEndHashJoin(HashJoinState *node)
/*
* ExecHashJoinOuterGetTuple
*
- * get the next outer tuple for hashjoin: either by
- * executing the outer plan node in the first pass, or from
- * the temp files for the hashjoin batches.
+ * get the next outer tuple for a parallel-oblivious hashjoin: either by
+ * executing the outer plan node in the first pass, or from the temp
+ * files for the hashjoin batches.
*
* Returns a null slot if no more outer tuples (within the current batch).
*
@@ -662,6 +876,67 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
}
/*
+ * ExecHashJoinOuterGetTuple variant for the parallel case.
+ */
+static TupleTableSlot *
+ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
+ HashJoinState *hjstate,
+ uint32 *hashvalue)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ TupleTableSlot *slot;
+
+ /*
+ * In the Parallel Hash case we only run the outer plan directly for
+ * single-batch hash joins. Otherwise we have to go to batch files, even
+ * for batch 0.
+ */
+ if (curbatch == 0 && hashtable->nbatch == 1)
+ {
+ slot = ExecProcNode(outerNode);
+
+ while (!TupIsNull(slot))
+ {
+ ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+
+ econtext->ecxt_outertuple = slot;
+ if (ExecHashGetHashValue(hashtable, econtext,
+ hjstate->hj_OuterHashKeys,
+ true, /* outer tuple */
+ HJ_FILL_OUTER(hjstate),
+ hashvalue))
+ return slot;
+
+ /*
+ * That tuple couldn't match because of a NULL, so discard it and
+ * continue with the next one.
+ */
+ slot = ExecProcNode(outerNode);
+ }
+ }
+ else if (curbatch < hashtable->nbatch)
+ {
+ MinimalTuple tuple;
+
+ tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
+ hashvalue);
+ if (tuple != NULL)
+ {
+ slot = ExecStoreMinimalTuple(tuple,
+ hjstate->hj_OuterTupleSlot,
+ false);
+ return slot;
+ }
+ else
+ ExecClearTuple(hjstate->hj_OuterTupleSlot);
+ }
+
+ /* End of this batch */
+ return NULL;
+}
+
+/*
* ExecHashJoinNewBatch
* switch to a new hashjoin batch
*
@@ -804,6 +1079,135 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
}
/*
+ * Choose a batch to work on, and attach to it. Returns true if successful,
+ * false if there are no more batches.
+ */
+static bool
+ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int start_batchno;
+ int batchno;
+
+ /*
+ * If we started up so late that the batch tracking array has been freed
+ * already by ExecHashTableDetach(), then we are finished. See also
+ * ExecParallelHashEnsureBatchAccessors().
+ */
+ if (hashtable->batches == NULL)
+ return false;
+
+ /*
+ * If we were already attached to a batch, remember not to bother checking
+ * it again, and detach from it (possibly freeing the hash table if we are
+ * last to detach).
+ */
+ if (hashtable->curbatch >= 0)
+ {
+ hashtable->batches[hashtable->curbatch].done = true;
+ ExecHashTableDetachBatch(hashtable);
+ }
+
+ /*
+ * Search for a batch that isn't done. We use an atomic counter to start
+ * our search at a different batch in every participant when there are
+ * more batches than participants.
+ */
+ batchno = start_batchno =
+ pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
+ hashtable->nbatch;
+ do
+ {
+ uint32 hashvalue;
+ MinimalTuple tuple;
+ TupleTableSlot *slot;
+
+ if (!hashtable->batches[batchno].done)
+ {
+ SharedTuplestoreAccessor *inner_tuples;
+ Barrier *batch_barrier =
+ &hashtable->batches[batchno].shared->batch_barrier;
+
+ switch (BarrierAttach(batch_barrier))
+ {
+ case PHJ_BATCH_ELECTING:
+
+ /* One backend allocates the hash table. */
+ if (BarrierArriveAndWait(batch_barrier,
+ WAIT_EVENT_HASH_BATCH_ELECTING))
+ ExecParallelHashTableAlloc(hashtable, batchno);
+ /* Fall through. */
+
+ case PHJ_BATCH_ALLOCATING:
+ /* Wait for allocation to complete. */
+ BarrierArriveAndWait(batch_barrier,
+ WAIT_EVENT_HASH_BATCH_ALLOCATING);
+ /* Fall through. */
+
+ case PHJ_BATCH_LOADING:
+ /* Start (or join in) loading tuples. */
+ ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+ inner_tuples = hashtable->batches[batchno].inner_tuples;
+ sts_begin_parallel_scan(inner_tuples);
+ while ((tuple = sts_parallel_scan_next(inner_tuples,
+ &hashvalue)))
+ {
+ slot = ExecStoreMinimalTuple(tuple,
+ hjstate->hj_HashTupleSlot,
+ false);
+ ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
+ hashvalue);
+ }
+ sts_end_parallel_scan(inner_tuples);
+ BarrierArriveAndWait(batch_barrier,
+ WAIT_EVENT_HASH_BATCH_LOADING);
+ /* Fall through. */
+
+ case PHJ_BATCH_PROBING:
+
+ /*
+ * This batch is ready to probe. Return control to
+ * caller. We stay attached to batch_barrier so that the
+ * hash table stays alive until everyone's finished
+ * probing it, but no participant is allowed to wait at
+ * this barrier again (or else a deadlock could occur).
+ * All attached participants must eventually call
+ * BarrierArriveAndDetach() so that the final phase
+ * PHJ_BATCH_DONE can be reached.
+ */
+ ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+ sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
+ return true;
+
+ case PHJ_BATCH_DONE:
+
+ /*
+ * Already done. Detach and go around again (if any
+ * remain).
+ */
+ BarrierDetach(batch_barrier);
+
+ /*
+ * We didn't work on this batch, but we need to observe
+ * its size for EXPLAIN.
+ */
+ ExecParallelHashUpdateSpacePeak(hashtable, batchno);
+ hashtable->batches[batchno].done = true;
+ hashtable->curbatch = -1;
+ break;
+
+ default:
+ elog(ERROR, "unexpected batch phase %d",
+ BarrierPhase(batch_barrier));
+ }
+ }
+ batchno = (batchno + 1) % hashtable->nbatch;
+ } while (batchno != start_batchno);
+
+ return false;
+}
+
+/*
* ExecHashJoinSaveTuple
* save a tuple to a batch file.
*
@@ -964,3 +1368,176 @@ ExecReScanHashJoin(HashJoinState *node)
if (node->js.ps.lefttree->chgParam == NULL)
ExecReScan(node->js.ps.lefttree);
}
+
+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+ if (node->hj_HashTable)
+ {
+ /*
+ * Detach from shared state before DSM memory goes away. This makes
+ * sure that we don't have any pointers into DSM memory by the time
+ * ExecEndHashJoin runs.
+ */
+ ExecHashTableDetachBatch(node->hj_HashTable);
+ ExecHashTableDetach(node->hj_HashTable);
+ }
+}
+
+static void
+ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
+{
+ PlanState *outerState = outerPlanState(hjstate);
+ ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ TupleTableSlot *slot;
+ uint32 hashvalue;
+ int i;
+
+ Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
+
+ /* Execute outer plan, writing all tuples to shared tuplestores. */
+ for (;;)
+ {
+ slot = ExecProcNode(outerState);
+ if (TupIsNull(slot))
+ break;
+ econtext->ecxt_outertuple = slot;
+ if (ExecHashGetHashValue(hashtable, econtext,
+ hjstate->hj_OuterHashKeys,
+ true, /* outer tuple */
+ false, /* outer join, currently unsupported */
+ &hashvalue))
+ {
+ int batchno;
+ int bucketno;
+
+ ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
+ &batchno);
+ sts_puttuple(hashtable->batches[batchno].outer_tuples,
+ &hashvalue, ExecFetchSlotMinimalTuple(slot));
+ }
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /* Make sure all outer partitions are readable by any backend. */
+ for (i = 0; i < hashtable->nbatch; ++i)
+ sts_end_write(hashtable->batches[i].outer_tuples);
+}
+
+void
+ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
+{
+ shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+void
+ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
+{
+ int plan_node_id = state->js.ps.plan->plan_node_id;
+ HashState *hashNode;
+ ParallelHashJoinState *pstate;
+
+ /*
+ * Disable shared hash table mode if we failed to create a real DSM
+ * segment, because that means that we don't have a DSA area to work with.
+ */
+ if (pcxt->seg == NULL)
+ return;
+
+ ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
+
+ /*
+ * Set up the state needed to coordinate access to the shared hash
+ * table(s), using the plan node ID as the toc key.
+ */
+ pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
+ shm_toc_insert(pcxt->toc, plan_node_id, pstate);
+
+ /*
+ * Set up the shared hash join state with no batches initially.
+ * ExecHashTableCreate() will prepare at least one later and set nbatch
+ * and space_allowed.
+ */
+ pstate->nbatch = 0;
+ pstate->space_allowed = 0;
+ pstate->batches = InvalidDsaPointer;
+ pstate->old_batches = InvalidDsaPointer;
+ pstate->nbuckets = 0;
+ pstate->growth = PHJ_GROWTH_OK;
+ pstate->chunk_work_queue = InvalidDsaPointer;
+ pg_atomic_init_u32(&pstate->distributor, 0);
+ pstate->nparticipants = pcxt->nworkers + 1;
+ pstate->total_tuples = 0;
+ LWLockInitialize(&pstate->lock,
+ LWTRANCHE_PARALLEL_HASH_JOIN);
+ BarrierInit(&pstate->build_barrier, 0);
+ BarrierInit(&pstate->grow_batches_barrier, 0);
+ BarrierInit(&pstate->grow_buckets_barrier, 0);
+
+ /* Set up the space we'll use for shared temporary files. */
+ SharedFileSetInit(&pstate->fileset, pcxt->seg);
+
+ /* Initialize the shared state in the hash node. */
+ hashNode = (HashState *) innerPlanState(state);
+ hashNode->parallel_state = pstate;
+}
+
+/* ----------------------------------------------------------------
+ * ExecHashJoinReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
+{
+ int plan_node_id = state->js.ps.plan->plan_node_id;
+ ParallelHashJoinState *pstate =
+ shm_toc_lookup(cxt->toc, plan_node_id, false);
+
+ /*
+ * It would be possible to reuse the shared hash table in single-batch
+ * cases by resetting and then fast-forwarding build_barrier to
+ * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
+ * currently shared hash tables are already freed by now (by the last
+ * participant to detach from the batch). We could consider keeping it
+ * around for single-batch joins. We'd also need to adjust
+ * finalize_plan() so that it doesn't record a dummy dependency for
+ * Parallel Hash nodes, preventing the rescan optimization. For now we
+ * don't try.
+ */
+
+ /* Detach, freeing any remaining shared memory. */
+ if (state->hj_HashTable != NULL)
+ {
+ ExecHashTableDetachBatch(state->hj_HashTable);
+ ExecHashTableDetach(state->hj_HashTable);
+ }
+
+ /* Clear any shared batch files. */
+ SharedFileSetDeleteAll(&pstate->fileset);
+
+ /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
+ BarrierInit(&pstate->build_barrier, 0);
+}
+
+void
+ExecHashJoinInitializeWorker(HashJoinState *state,
+ ParallelWorkerContext *pwcxt)
+{
+ HashState *hashNode;
+ int plan_node_id = state->js.ps.plan->plan_node_id;
+ ParallelHashJoinState *pstate =
+ shm_toc_lookup(pwcxt->toc, plan_node_id, false);
+
+ /* Attach to the space for shared temporary files. */
+ SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
+
+ /* Attach to the shared state in the hash node. */
+ hashNode = (HashState *) innerPlanState(state);
+ hashNode->parallel_state = pstate;
+
+ ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
+}