diff options
Diffstat (limited to 'src/backend/executor/nodeHashjoin.c')
-rw-r--r-- | src/backend/executor/nodeHashjoin.c | 617 |
1 files changed, 597 insertions, 20 deletions
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index ab1632cc13d..5d1dc1f401e 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -10,18 +10,112 @@ * IDENTIFICATION * src/backend/executor/nodeHashjoin.c * + * PARALLELISM + * + * Hash joins can participate in parallel query execution in several ways. A + * parallel-oblivious hash join is one where the node is unaware that it is + * part of a parallel plan. In this case, a copy of the inner plan is used to + * build a copy of the hash table in every backend, and the outer plan could + * either be built from a partial or complete path, so that the results of the + * hash join are correspondingly either partial or complete. A parallel-aware + * hash join is one that behaves differently, coordinating work between + * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel + * Hash Join always appears with a Parallel Hash node. + * + * Parallel-aware hash joins use the same per-backend state machine to track + * progress through the hash join algorithm as parallel-oblivious hash joins. + * In a parallel-aware hash join, there is also a shared state machine that + * co-operating backends use to synchronize their local state machines and + * program counters. The shared state machine is managed with a Barrier IPC + * primitive. When all attached participants arrive at a barrier, the phase + * advances and all waiting participants are released. + * + * When a participant begins working on a parallel hash join, it must first + * figure out how much progress has already been made, because participants + * don't wait for each other to begin. For this reason there are switch + * statements at key points in the code where we have to synchronize our local + * state machine with the phase, and then jump to the correct part of the + * algorithm so that we can get started. + * + * One barrier called build_barrier is used to coordinate the hashing phases. + * The phase is represented by an integer which begins at zero and increments + * one by one, but in the code it is referred to by symbolic names as follows: + * + * PHJ_BUILD_ELECTING -- initial state + * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0 + * PHJ_BUILD_HASHING_INNER -- all hash the inner rel + * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer + * PHJ_BUILD_DONE -- building done, probing can begin + * + * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may + * be used repeatedly as required to coordinate expansions in the number of + * batches or buckets. Their phases are as follows: + * + * PHJ_GROW_BATCHES_ELECTING -- initial state + * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches + * PHJ_GROW_BATCHES_REPARTITIONING -- all repartition + * PHJ_GROW_BATCHES_FINISHING -- one cleans up, detects skew + * + * PHJ_GROW_BUCKETS_ELECTING -- initial state + * PHJ_GROW_BUCKETS_ALLOCATING -- one allocates new buckets + * PHJ_GROW_BUCKETS_REINSERTING -- all insert tuples + * + * If the planner got the number of batches and buckets right, those won't be + * necessary, but on the other hand we might finish up needing to expand the + * buckets or batches multiple times while hashing the inner relation to stay + * within our memory budget and load factor target. For that reason it's a + * separate pair of barriers using circular phases. + * + * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins, + * because we need to divide the outer relation into batches up front in order + * to be able to process batches entirely independently. In contrast, the + * parallel-oblivious algorithm simply throws tuples 'forward' to 'later' + * batches whenever it encounters them while scanning and probing, which it + * can do because it processes batches in serial order. + * + * Once PHJ_BUILD_DONE is reached, backends then split up and process + * different batches, or gang up and work together on probing batches if there + * aren't enough to go around. For each batch there is a separate barrier + * with the following phases: + * + * PHJ_BATCH_ELECTING -- initial state + * PHJ_BATCH_ALLOCATING -- one allocates buckets + * PHJ_BATCH_LOADING -- all load the hash table from disk + * PHJ_BATCH_PROBING -- all probe + * PHJ_BATCH_DONE -- end + * + * Batch 0 is a special case, because it starts out in phase + * PHJ_BATCH_PROBING; populating batch 0's hash table is done during + * PHJ_BUILD_HASHING_INNER so we can skip loading. + * + * Initially we try to plan for a single-batch hash join using the combined + * work_mem of all participants to create a large shared hash table. If that + * turns out either at planning or execution time to be impossible then we + * fall back to regular work_mem sized hash tables. + * + * To avoid deadlocks, we never wait for any barrier unless it is known that + * all other backends attached to it are actively executing the node or have + * already arrived. Practically, that means that we never return a tuple + * while attached to a barrier, unless the barrier has reached its final + * state. In the slightly special case of the per-batch barrier, we return + * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use + * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting. + * *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "executor/executor.h" #include "executor/hashjoin.h" #include "executor/nodeHash.h" #include "executor/nodeHashjoin.h" #include "miscadmin.h" +#include "pgstat.h" #include "utils/memutils.h" +#include "utils/sharedtuplestore.h" /* @@ -42,24 +136,34 @@ static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue); +static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, + HashJoinState *hjstate, + uint32 *hashvalue); static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot); static bool ExecHashJoinNewBatch(HashJoinState *hjstate); +static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate); +static void ExecParallelHashJoinPartitionOuter(HashJoinState *node); /* ---------------------------------------------------------------- - * ExecHashJoin + * ExecHashJoinImpl * - * This function implements the Hybrid Hashjoin algorithm. + * This function implements the Hybrid Hashjoin algorithm. It is marked + * with an always-inline attribute so that ExecHashJoin() and + * ExecParallelHashJoin() can inline it. Compilers that respect the + * attribute should create versions specialized for parallel == true and + * parallel == false with unnecessary branches removed. * * Note: the relation we build hash table on is the "inner" * the other one is "outer". * ---------------------------------------------------------------- */ -static TupleTableSlot * /* return: a tuple or NULL */ -ExecHashJoin(PlanState *pstate) +pg_attribute_always_inline +static inline TupleTableSlot * +ExecHashJoinImpl(PlanState *pstate, bool parallel) { HashJoinState *node = castNode(HashJoinState, pstate); PlanState *outerNode; @@ -71,6 +175,7 @@ ExecHashJoin(PlanState *pstate) TupleTableSlot *outerTupleSlot; uint32 hashvalue; int batchno; + ParallelHashJoinState *parallel_state; /* * get information from HashJoin node @@ -81,6 +186,7 @@ ExecHashJoin(PlanState *pstate) outerNode = outerPlanState(node); hashtable = node->hj_HashTable; econtext = node->js.ps.ps_ExprContext; + parallel_state = hashNode->parallel_state; /* * Reset per-tuple memory context to free any expression evaluation @@ -138,6 +244,18 @@ ExecHashJoin(PlanState *pstate) /* no chance to not build the hash table */ node->hj_FirstOuterTupleSlot = NULL; } + else if (parallel) + { + /* + * The empty-outer optimization is not implemented for + * shared hash tables, because no one participant can + * determine that there are no outer tuples, and it's not + * yet clear that it's worth the synchronization overhead + * of reaching consensus to figure that out. So we have + * to build the hash table. + */ + node->hj_FirstOuterTupleSlot = NULL; + } else if (HJ_FILL_OUTER(node) || (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost && !node->hj_OuterNotEmpty)) @@ -155,15 +273,19 @@ ExecHashJoin(PlanState *pstate) node->hj_FirstOuterTupleSlot = NULL; /* - * create the hash table + * Create the hash table. If using Parallel Hash, then + * whoever gets here first will create the hash table and any + * later arrivals will merely attach to it. */ - hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan, + hashtable = ExecHashTableCreate(hashNode, node->hj_HashOperators, HJ_FILL_INNER(node)); node->hj_HashTable = hashtable; /* - * execute the Hash node, to build the hash table + * Execute the Hash node, to build the hash table. If using + * Parallel Hash, then we'll try to help hashing unless we + * arrived too late. */ hashNode->hashtable = hashtable; (void) MultiExecProcNode((PlanState *) hashNode); @@ -189,7 +311,34 @@ ExecHashJoin(PlanState *pstate) */ node->hj_OuterNotEmpty = false; - node->hj_JoinState = HJ_NEED_NEW_OUTER; + if (parallel) + { + Barrier *build_barrier; + + build_barrier = ¶llel_state->build_barrier; + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER || + BarrierPhase(build_barrier) == PHJ_BUILD_DONE); + if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER) + { + /* + * If multi-batch, we need to hash the outer relation + * up front. + */ + if (hashtable->nbatch > 1) + ExecParallelHashJoinPartitionOuter(node); + BarrierArriveAndWait(build_barrier, + WAIT_EVENT_HASH_BUILD_HASHING_OUTER); + } + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE); + + /* Each backend should now select a batch to work on. */ + hashtable->curbatch = -1; + node->hj_JoinState = HJ_NEED_NEW_BATCH; + + continue; + } + else + node->hj_JoinState = HJ_NEED_NEW_OUTER; /* FALL THRU */ @@ -198,9 +347,14 @@ ExecHashJoin(PlanState *pstate) /* * We don't have an outer tuple, try to get the next one */ - outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode, - node, - &hashvalue); + if (parallel) + outerTupleSlot = + ExecParallelHashJoinOuterGetTuple(outerNode, node, + &hashvalue); + else + outerTupleSlot = + ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue); + if (TupIsNull(outerTupleSlot)) { /* end of batch, or maybe whole join */ @@ -240,10 +394,12 @@ ExecHashJoin(PlanState *pstate) * Need to postpone this outer tuple to a later batch. * Save it in the corresponding outer-batch file. */ + Assert(parallel_state == NULL); Assert(batchno > hashtable->curbatch); ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot), hashvalue, &hashtable->outerBatchFile[batchno]); + /* Loop around, staying in HJ_NEED_NEW_OUTER state */ continue; } @@ -258,11 +414,23 @@ ExecHashJoin(PlanState *pstate) /* * Scan the selected hash bucket for matches to current outer */ - if (!ExecScanHashBucket(node, econtext)) + if (parallel) { - /* out of matches; check for possible outer-join fill */ - node->hj_JoinState = HJ_FILL_OUTER_TUPLE; - continue; + if (!ExecParallelScanHashBucket(node, econtext)) + { + /* out of matches; check for possible outer-join fill */ + node->hj_JoinState = HJ_FILL_OUTER_TUPLE; + continue; + } + } + else + { + if (!ExecScanHashBucket(node, econtext)) + { + /* out of matches; check for possible outer-join fill */ + node->hj_JoinState = HJ_FILL_OUTER_TUPLE; + continue; + } } /* @@ -362,8 +530,16 @@ ExecHashJoin(PlanState *pstate) /* * Try to advance to next batch. Done if there are no more. */ - if (!ExecHashJoinNewBatch(node)) - return NULL; /* end of join */ + if (parallel) + { + if (!ExecParallelHashJoinNewBatch(node)) + return NULL; /* end of parallel-aware join */ + } + else + { + if (!ExecHashJoinNewBatch(node)) + return NULL; /* end of parallel-oblivious join */ + } node->hj_JoinState = HJ_NEED_NEW_OUTER; break; @@ -375,6 +551,38 @@ ExecHashJoin(PlanState *pstate) } /* ---------------------------------------------------------------- + * ExecHashJoin + * + * Parallel-oblivious version. + * ---------------------------------------------------------------- + */ +static TupleTableSlot * /* return: a tuple or NULL */ +ExecHashJoin(PlanState *pstate) +{ + /* + * On sufficiently smart compilers this should be inlined with the + * parallel-aware branches removed. + */ + return ExecHashJoinImpl(pstate, false); +} + +/* ---------------------------------------------------------------- + * ExecParallelHashJoin + * + * Parallel-aware version. + * ---------------------------------------------------------------- + */ +static TupleTableSlot * /* return: a tuple or NULL */ +ExecParallelHashJoin(PlanState *pstate) +{ + /* + * On sufficiently smart compilers this should be inlined with the + * parallel-oblivious branches removed. + */ + return ExecHashJoinImpl(pstate, true); +} + +/* ---------------------------------------------------------------- * ExecInitHashJoin * * Init routine for HashJoin node. @@ -400,6 +608,12 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) hjstate = makeNode(HashJoinState); hjstate->js.ps.plan = (Plan *) node; hjstate->js.ps.state = estate; + + /* + * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker() + * where this function may be replaced with a parallel version, if we + * managed to launch a parallel query. + */ hjstate->js.ps.ExecProcNode = ExecHashJoin; /* @@ -581,9 +795,9 @@ ExecEndHashJoin(HashJoinState *node) /* * ExecHashJoinOuterGetTuple * - * get the next outer tuple for hashjoin: either by - * executing the outer plan node in the first pass, or from - * the temp files for the hashjoin batches. + * get the next outer tuple for a parallel oblivious hashjoin: either by + * executing the outer plan node in the first pass, or from the temp + * files for the hashjoin batches. * * Returns a null slot if no more outer tuples (within the current batch). * @@ -662,6 +876,67 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, } /* + * ExecHashJoinOuterGetTuple variant for the parallel case. + */ +static TupleTableSlot * +ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, + HashJoinState *hjstate, + uint32 *hashvalue) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + TupleTableSlot *slot; + + /* + * In the Parallel Hash case we only run the outer plan directly for + * single-batch hash joins. Otherwise we have to go to batch files, even + * for batch 0. + */ + if (curbatch == 0 && hashtable->nbatch == 1) + { + slot = ExecProcNode(outerNode); + + while (!TupIsNull(slot)) + { + ExprContext *econtext = hjstate->js.ps.ps_ExprContext; + + econtext->ecxt_outertuple = slot; + if (ExecHashGetHashValue(hashtable, econtext, + hjstate->hj_OuterHashKeys, + true, /* outer tuple */ + HJ_FILL_OUTER(hjstate), + hashvalue)) + return slot; + + /* + * That tuple couldn't match because of a NULL, so discard it and + * continue with the next one. + */ + slot = ExecProcNode(outerNode); + } + } + else if (curbatch < hashtable->nbatch) + { + MinimalTuple tuple; + + tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples, + hashvalue); + if (tuple != NULL) + { + slot = ExecStoreMinimalTuple(tuple, + hjstate->hj_OuterTupleSlot, + false); + return slot; + } + else + ExecClearTuple(hjstate->hj_OuterTupleSlot); + } + + /* End of this batch */ + return NULL; +} + +/* * ExecHashJoinNewBatch * switch to a new hashjoin batch * @@ -804,6 +1079,135 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) } /* + * Choose a batch to work on, and attach to it. Returns true if successful, + * false if there are no more batches. + */ +static bool +ExecParallelHashJoinNewBatch(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int start_batchno; + int batchno; + + /* + * If we started up so late that the batch tracking array has been freed + * already by ExecHashTableDetach(), then we are finished. See also + * ExecParallelHashEnsureBatchAccessors(). + */ + if (hashtable->batches == NULL) + return false; + + /* + * If we were already attached to a batch, remember not to bother checking + * it again, and detach from it (possibly freeing the hash table if we are + * last to detach). + */ + if (hashtable->curbatch >= 0) + { + hashtable->batches[hashtable->curbatch].done = true; + ExecHashTableDetachBatch(hashtable); + } + + /* + * Search for a batch that isn't done. We use an atomic counter to start + * our search at a different batch in every participant when there are + * more batches than participants. + */ + batchno = start_batchno = + pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) % + hashtable->nbatch; + do + { + uint32 hashvalue; + MinimalTuple tuple; + TupleTableSlot *slot; + + if (!hashtable->batches[batchno].done) + { + SharedTuplestoreAccessor *inner_tuples; + Barrier *batch_barrier = + &hashtable->batches[batchno].shared->batch_barrier; + + switch (BarrierAttach(batch_barrier)) + { + case PHJ_BATCH_ELECTING: + + /* One backend allocates the hash table. */ + if (BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_ELECTING)) + ExecParallelHashTableAlloc(hashtable, batchno); + /* Fall through. */ + + case PHJ_BATCH_ALLOCATING: + /* Wait for allocation to complete. */ + BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_ALLOCATING); + /* Fall through. */ + + case PHJ_BATCH_LOADING: + /* Start (or join in) loading tuples. */ + ExecParallelHashTableSetCurrentBatch(hashtable, batchno); + inner_tuples = hashtable->batches[batchno].inner_tuples; + sts_begin_parallel_scan(inner_tuples); + while ((tuple = sts_parallel_scan_next(inner_tuples, + &hashvalue))) + { + slot = ExecStoreMinimalTuple(tuple, + hjstate->hj_HashTupleSlot, + false); + ExecParallelHashTableInsertCurrentBatch(hashtable, slot, + hashvalue); + } + sts_end_parallel_scan(inner_tuples); + BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_LOADING); + /* Fall through. */ + + case PHJ_BATCH_PROBING: + + /* + * This batch is ready to probe. Return control to + * caller. We stay attached to batch_barrier so that the + * hash table stays alive until everyone's finished + * probing it, but no participant is allowed to wait at + * this barrier again (or else a deadlock could occur). + * All attached participants must eventually call + * BarrierArriveAndDetach() so that the final phase + * PHJ_BATCH_DONE can be reached. + */ + ExecParallelHashTableSetCurrentBatch(hashtable, batchno); + sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples); + return true; + + case PHJ_BATCH_DONE: + + /* + * Already done. Detach and go around again (if any + * remain). + */ + BarrierDetach(batch_barrier); + + /* + * We didn't work on this batch, but we need to observe + * its size for EXPLAIN. + */ + ExecParallelHashUpdateSpacePeak(hashtable, batchno); + hashtable->batches[batchno].done = true; + hashtable->curbatch = -1; + break; + + default: + elog(ERROR, "unexpected batch phase %d", + BarrierPhase(batch_barrier)); + } + } + batchno = (batchno + 1) % hashtable->nbatch; + } while (batchno != start_batchno); + + return false; +} + +/* * ExecHashJoinSaveTuple * save a tuple to a batch file. * @@ -964,3 +1368,176 @@ ExecReScanHashJoin(HashJoinState *node) if (node->js.ps.lefttree->chgParam == NULL) ExecReScan(node->js.ps.lefttree); } + +void +ExecShutdownHashJoin(HashJoinState *node) +{ + if (node->hj_HashTable) + { + /* + * Detach from shared state before DSM memory goes away. This makes + * sure that we don't have any pointers into DSM memory by the time + * ExecEndHashJoin runs. + */ + ExecHashTableDetachBatch(node->hj_HashTable); + ExecHashTableDetach(node->hj_HashTable); + } +} + +static void +ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate) +{ + PlanState *outerState = outerPlanState(hjstate); + ExprContext *econtext = hjstate->js.ps.ps_ExprContext; + HashJoinTable hashtable = hjstate->hj_HashTable; + TupleTableSlot *slot; + uint32 hashvalue; + int i; + + Assert(hjstate->hj_FirstOuterTupleSlot == NULL); + + /* Execute outer plan, writing all tuples to shared tuplestores. */ + for (;;) + { + slot = ExecProcNode(outerState); + if (TupIsNull(slot)) + break; + econtext->ecxt_outertuple = slot; + if (ExecHashGetHashValue(hashtable, econtext, + hjstate->hj_OuterHashKeys, + true, /* outer tuple */ + false, /* outer join, currently unsupported */ + &hashvalue)) + { + int batchno; + int bucketno; + + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, + &batchno); + sts_puttuple(hashtable->batches[batchno].outer_tuples, + &hashvalue, ExecFetchSlotMinimalTuple(slot)); + } + CHECK_FOR_INTERRUPTS(); + } + + /* Make sure all outer partitions are readable by any backend. */ + for (i = 0; i < hashtable->nbatch; ++i) + sts_end_write(hashtable->batches[i].outer_tuples); +} + +void +ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt) +{ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState)); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +void +ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt) +{ + int plan_node_id = state->js.ps.plan->plan_node_id; + HashState *hashNode; + ParallelHashJoinState *pstate; + + /* + * Disable shared hash table mode if we failed to create a real DSM + * segment, because that means that we don't have a DSA area to work with. + */ + if (pcxt->seg == NULL) + return; + + ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); + + /* + * Set up the state needed to coordinate access to the shared hash + * table(s), using the plan node ID as the toc key. + */ + pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState)); + shm_toc_insert(pcxt->toc, plan_node_id, pstate); + + /* + * Set up the shared hash join state with no batches initially. + * ExecHashTableCreate() will prepare at least one later and set nbatch + * and space_allowed. + */ + pstate->nbatch = 0; + pstate->space_allowed = 0; + pstate->batches = InvalidDsaPointer; + pstate->old_batches = InvalidDsaPointer; + pstate->nbuckets = 0; + pstate->growth = PHJ_GROWTH_OK; + pstate->chunk_work_queue = InvalidDsaPointer; + pg_atomic_init_u32(&pstate->distributor, 0); + pstate->nparticipants = pcxt->nworkers + 1; + pstate->total_tuples = 0; + LWLockInitialize(&pstate->lock, + LWTRANCHE_PARALLEL_HASH_JOIN); + BarrierInit(&pstate->build_barrier, 0); + BarrierInit(&pstate->grow_batches_barrier, 0); + BarrierInit(&pstate->grow_buckets_barrier, 0); + + /* Set up the space we'll use for shared temporary files. */ + SharedFileSetInit(&pstate->fileset, pcxt->seg); + + /* Initialize the shared state in the hash node. */ + hashNode = (HashState *) innerPlanState(state); + hashNode->parallel_state = pstate; +} + +/* ---------------------------------------------------------------- + * ExecHashJoinReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ +void +ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt) +{ + int plan_node_id = state->js.ps.plan->plan_node_id; + ParallelHashJoinState *pstate = + shm_toc_lookup(cxt->toc, plan_node_id, false); + + /* + * It would be possible to reuse the shared hash table in single-batch + * cases by resetting and then fast-forwarding build_barrier to + * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but + * currently shared hash tables are already freed by now (by the last + * participant to detach from the batch). We could consider keeping it + * around for single-batch joins. We'd also need to adjust + * finalize_plan() so that it doesn't record a dummy dependency for + * Parallel Hash nodes, preventing the rescan optimization. For now we + * don't try. + */ + + /* Detach, freeing any remaining shared memory. */ + if (state->hj_HashTable != NULL) + { + ExecHashTableDetachBatch(state->hj_HashTable); + ExecHashTableDetach(state->hj_HashTable); + } + + /* Clear any shared batch files. */ + SharedFileSetDeleteAll(&pstate->fileset); + + /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */ + BarrierInit(&pstate->build_barrier, 0); +} + +void +ExecHashJoinInitializeWorker(HashJoinState *state, + ParallelWorkerContext *pwcxt) +{ + HashState *hashNode; + int plan_node_id = state->js.ps.plan->plan_node_id; + ParallelHashJoinState *pstate = + shm_toc_lookup(pwcxt->toc, plan_node_id, false); + + /* Attach to the space for shared temporary files. */ + SharedFileSetAttach(&pstate->fileset, pwcxt->seg); + + /* Attach to the shared state in the hash node. */ + hashNode = (HashState *) innerPlanState(state); + hashNode->parallel_state = pstate; + + ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); +} |