diff options
author | Andres Freund <andres@anarazel.de> | 2017-12-20 23:39:21 -0800 |
---|---|---|
committer | Andres Freund <andres@anarazel.de> | 2017-12-21 00:43:41 -0800 |
commit | 1804284042e659e7d16904e7bbb0ad546394b6a3 (patch) | |
tree | d1980f7d94cb31aed8880007e5a41ede08d0cb84 /src/backend/executor/nodeHashjoin.c | |
parent | f94eec490b2671399c102b89c9fa0311aea3a39f (diff) | |
download | postgresql-1804284042e659e7d16904e7bbb0ad546394b6a3.tar.gz postgresql-1804284042e659e7d16904e7bbb0ad546394b6a3.zip |
Add parallel-aware hash joins.
Introduce parallel-aware hash joins that appear in EXPLAIN plans as Parallel
Hash Join with Parallel Hash. While hash joins could already appear in
parallel queries, they were previously always parallel-oblivious and had a
partial subplan only on the outer side, meaning that the work of the inner
subplan was duplicated in every worker.
After this commit, the planner will consider using a partial subplan on the
inner side too, using the Parallel Hash node to divide the work over the
available CPU cores and combine its results in shared memory. If the join
needs to be split into multiple batches in order to respect work_mem, then
workers process different batches as much as possible and then work together
on the remaining batches.
The advantages of a parallel-aware hash join over a parallel-oblivious hash
join used in a parallel query are that it:
* avoids wasting memory on duplicated hash tables
* avoids wasting disk space on duplicated batch files
* divides the work of building the hash table over the CPUs
One disadvantage is that there is some communication between the participating
CPUs which might outweigh the benefits of parallelism in the case of small
hash tables. This is avoided by the planner's existing reluctance to supply
partial plans for small scans, but it may be necessary to estimate
synchronization costs in future if that situation changes. Another is that
outer batch 0 must be written to disk if multiple batches are required.
A potential future advantage of parallel-aware hash joins is that right and
full outer joins could be supported, since there is a single set of matched
bits for each hashtable, but that is not yet implemented.
A new GUC enable_parallel_hash is defined to control the feature, defaulting
to on.
Author: Thomas Munro
Reviewed-By: Andres Freund, Robert Haas
Tested-By: Rafia Sabih, Prabhat Sahu
Discussion:
https://postgr.es/m/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com
https://postgr.es/m/CAEepm=37HKyJ4U6XOLi=JgfSHM3o6B-GaeO-6hkOmneTDkH+Uw@mail.gmail.com
Diffstat (limited to 'src/backend/executor/nodeHashjoin.c')
-rw-r--r-- | src/backend/executor/nodeHashjoin.c | 617 |
1 files changed, 597 insertions, 20 deletions
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index ab1632cc13d..5d1dc1f401e 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -10,18 +10,112 @@ * IDENTIFICATION * src/backend/executor/nodeHashjoin.c * + * PARALLELISM + * + * Hash joins can participate in parallel query execution in several ways. A + * parallel-oblivious hash join is one where the node is unaware that it is + * part of a parallel plan. In this case, a copy of the inner plan is used to + * build a copy of the hash table in every backend, and the outer plan could + * either be built from a partial or complete path, so that the results of the + * hash join are correspondingly either partial or complete. A parallel-aware + * hash join is one that behaves differently, coordinating work between + * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel + * Hash Join always appears with a Parallel Hash node. + * + * Parallel-aware hash joins use the same per-backend state machine to track + * progress through the hash join algorithm as parallel-oblivious hash joins. + * In a parallel-aware hash join, there is also a shared state machine that + * co-operating backends use to synchronize their local state machines and + * program counters. The shared state machine is managed with a Barrier IPC + * primitive. When all attached participants arrive at a barrier, the phase + * advances and all waiting participants are released. + * + * When a participant begins working on a parallel hash join, it must first + * figure out how much progress has already been made, because participants + * don't wait for each other to begin. For this reason there are switch + * statements at key points in the code where we have to synchronize our local + * state machine with the phase, and then jump to the correct part of the + * algorithm so that we can get started. + * + * One barrier called build_barrier is used to coordinate the hashing phases. + * The phase is represented by an integer which begins at zero and increments + * one by one, but in the code it is referred to by symbolic names as follows: + * + * PHJ_BUILD_ELECTING -- initial state + * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0 + * PHJ_BUILD_HASHING_INNER -- all hash the inner rel + * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer + * PHJ_BUILD_DONE -- building done, probing can begin + * + * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may + * be used repeatedly as required to coordinate expansions in the number of + * batches or buckets. Their phases are as follows: + * + * PHJ_GROW_BATCHES_ELECTING -- initial state + * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches + * PHJ_GROW_BATCHES_REPARTITIONING -- all repartition + * PHJ_GROW_BATCHES_FINISHING -- one cleans up, detects skew + * + * PHJ_GROW_BUCKETS_ELECTING -- initial state + * PHJ_GROW_BUCKETS_ALLOCATING -- one allocates new buckets + * PHJ_GROW_BUCKETS_REINSERTING -- all insert tuples + * + * If the planner got the number of batches and buckets right, those won't be + * necessary, but on the other hand we might finish up needing to expand the + * buckets or batches multiple times while hashing the inner relation to stay + * within our memory budget and load factor target. For that reason it's a + * separate pair of barriers using circular phases. + * + * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins, + * because we need to divide the outer relation into batches up front in order + * to be able to process batches entirely independently. In contrast, the + * parallel-oblivious algorithm simply throws tuples 'forward' to 'later' + * batches whenever it encounters them while scanning and probing, which it + * can do because it processes batches in serial order. + * + * Once PHJ_BUILD_DONE is reached, backends then split up and process + * different batches, or gang up and work together on probing batches if there + * aren't enough to go around. For each batch there is a separate barrier + * with the following phases: + * + * PHJ_BATCH_ELECTING -- initial state + * PHJ_BATCH_ALLOCATING -- one allocates buckets + * PHJ_BATCH_LOADING -- all load the hash table from disk + * PHJ_BATCH_PROBING -- all probe + * PHJ_BATCH_DONE -- end + * + * Batch 0 is a special case, because it starts out in phase + * PHJ_BATCH_PROBING; populating batch 0's hash table is done during + * PHJ_BUILD_HASHING_INNER so we can skip loading. + * + * Initially we try to plan for a single-batch hash join using the combined + * work_mem of all participants to create a large shared hash table. If that + * turns out either at planning or execution time to be impossible then we + * fall back to regular work_mem sized hash tables. + * + * To avoid deadlocks, we never wait for any barrier unless it is known that + * all other backends attached to it are actively executing the node or have + * already arrived. Practically, that means that we never return a tuple + * while attached to a barrier, unless the barrier has reached its final + * state. In the slightly special case of the per-batch barrier, we return + * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use + * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting. + * *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "executor/executor.h" #include "executor/hashjoin.h" #include "executor/nodeHash.h" #include "executor/nodeHashjoin.h" #include "miscadmin.h" +#include "pgstat.h" #include "utils/memutils.h" +#include "utils/sharedtuplestore.h" /* @@ -42,24 +136,34 @@ static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue); +static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, + HashJoinState *hjstate, + uint32 *hashvalue); static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot); static bool ExecHashJoinNewBatch(HashJoinState *hjstate); +static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate); +static void ExecParallelHashJoinPartitionOuter(HashJoinState *node); /* ---------------------------------------------------------------- - * ExecHashJoin + * ExecHashJoinImpl * - * This function implements the Hybrid Hashjoin algorithm. + * This function implements the Hybrid Hashjoin algorithm. It is marked + * with an always-inline attribute so that ExecHashJoin() and + * ExecParallelHashJoin() can inline it. Compilers that respect the + * attribute should create versions specialized for parallel == true and + * parallel == false with unnecessary branches removed. * * Note: the relation we build hash table on is the "inner" * the other one is "outer". * ---------------------------------------------------------------- */ -static TupleTableSlot * /* return: a tuple or NULL */ -ExecHashJoin(PlanState *pstate) +pg_attribute_always_inline +static inline TupleTableSlot * +ExecHashJoinImpl(PlanState *pstate, bool parallel) { HashJoinState *node = castNode(HashJoinState, pstate); PlanState *outerNode; @@ -71,6 +175,7 @@ ExecHashJoin(PlanState *pstate) TupleTableSlot *outerTupleSlot; uint32 hashvalue; int batchno; + ParallelHashJoinState *parallel_state; /* * get information from HashJoin node @@ -81,6 +186,7 @@ ExecHashJoin(PlanState *pstate) outerNode = outerPlanState(node); hashtable = node->hj_HashTable; econtext = node->js.ps.ps_ExprContext; + parallel_state = hashNode->parallel_state; /* * Reset per-tuple memory context to free any expression evaluation @@ -138,6 +244,18 @@ ExecHashJoin(PlanState *pstate) /* no chance to not build the hash table */ node->hj_FirstOuterTupleSlot = NULL; } + else if (parallel) + { + /* + * The empty-outer optimization is not implemented for + * shared hash tables, because no one participant can + * determine that there are no outer tuples, and it's not + * yet clear that it's worth the synchronization overhead + * of reaching consensus to figure that out. So we have + * to build the hash table. + */ + node->hj_FirstOuterTupleSlot = NULL; + } else if (HJ_FILL_OUTER(node) || (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost && !node->hj_OuterNotEmpty)) @@ -155,15 +273,19 @@ ExecHashJoin(PlanState *pstate) node->hj_FirstOuterTupleSlot = NULL; /* - * create the hash table + * Create the hash table. If using Parallel Hash, then + * whoever gets here first will create the hash table and any + * later arrivals will merely attach to it. */ - hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan, + hashtable = ExecHashTableCreate(hashNode, node->hj_HashOperators, HJ_FILL_INNER(node)); node->hj_HashTable = hashtable; /* - * execute the Hash node, to build the hash table + * Execute the Hash node, to build the hash table. If using + * Parallel Hash, then we'll try to help hashing unless we + * arrived too late. */ hashNode->hashtable = hashtable; (void) MultiExecProcNode((PlanState *) hashNode); @@ -189,7 +311,34 @@ ExecHashJoin(PlanState *pstate) */ node->hj_OuterNotEmpty = false; - node->hj_JoinState = HJ_NEED_NEW_OUTER; + if (parallel) + { + Barrier *build_barrier; + + build_barrier = ¶llel_state->build_barrier; + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER || + BarrierPhase(build_barrier) == PHJ_BUILD_DONE); + if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER) + { + /* + * If multi-batch, we need to hash the outer relation + * up front. + */ + if (hashtable->nbatch > 1) + ExecParallelHashJoinPartitionOuter(node); + BarrierArriveAndWait(build_barrier, + WAIT_EVENT_HASH_BUILD_HASHING_OUTER); + } + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE); + + /* Each backend should now select a batch to work on. */ + hashtable->curbatch = -1; + node->hj_JoinState = HJ_NEED_NEW_BATCH; + + continue; + } + else + node->hj_JoinState = HJ_NEED_NEW_OUTER; /* FALL THRU */ @@ -198,9 +347,14 @@ ExecHashJoin(PlanState *pstate) /* * We don't have an outer tuple, try to get the next one */ - outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode, - node, - &hashvalue); + if (parallel) + outerTupleSlot = + ExecParallelHashJoinOuterGetTuple(outerNode, node, + &hashvalue); + else + outerTupleSlot = + ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue); + if (TupIsNull(outerTupleSlot)) { /* end of batch, or maybe whole join */ @@ -240,10 +394,12 @@ ExecHashJoin(PlanState *pstate) * Need to postpone this outer tuple to a later batch. * Save it in the corresponding outer-batch file. */ + Assert(parallel_state == NULL); Assert(batchno > hashtable->curbatch); ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot), hashvalue, &hashtable->outerBatchFile[batchno]); + /* Loop around, staying in HJ_NEED_NEW_OUTER state */ continue; } @@ -258,11 +414,23 @@ ExecHashJoin(PlanState *pstate) /* * Scan the selected hash bucket for matches to current outer */ - if (!ExecScanHashBucket(node, econtext)) + if (parallel) { - /* out of matches; check for possible outer-join fill */ - node->hj_JoinState = HJ_FILL_OUTER_TUPLE; - continue; + if (!ExecParallelScanHashBucket(node, econtext)) + { + /* out of matches; check for possible outer-join fill */ + node->hj_JoinState = HJ_FILL_OUTER_TUPLE; + continue; + } + } + else + { + if (!ExecScanHashBucket(node, econtext)) + { + /* out of matches; check for possible outer-join fill */ + node->hj_JoinState = HJ_FILL_OUTER_TUPLE; + continue; + } } /* @@ -362,8 +530,16 @@ ExecHashJoin(PlanState *pstate) /* * Try to advance to next batch. Done if there are no more. */ - if (!ExecHashJoinNewBatch(node)) - return NULL; /* end of join */ + if (parallel) + { + if (!ExecParallelHashJoinNewBatch(node)) + return NULL; /* end of parallel-aware join */ + } + else + { + if (!ExecHashJoinNewBatch(node)) + return NULL; /* end of parallel-oblivious join */ + } node->hj_JoinState = HJ_NEED_NEW_OUTER; break; @@ -375,6 +551,38 @@ ExecHashJoin(PlanState *pstate) } /* ---------------------------------------------------------------- + * ExecHashJoin + * + * Parallel-oblivious version. + * ---------------------------------------------------------------- + */ +static TupleTableSlot * /* return: a tuple or NULL */ +ExecHashJoin(PlanState *pstate) +{ + /* + * On sufficiently smart compilers this should be inlined with the + * parallel-aware branches removed. + */ + return ExecHashJoinImpl(pstate, false); +} + +/* ---------------------------------------------------------------- + * ExecParallelHashJoin + * + * Parallel-aware version. + * ---------------------------------------------------------------- + */ +static TupleTableSlot * /* return: a tuple or NULL */ +ExecParallelHashJoin(PlanState *pstate) +{ + /* + * On sufficiently smart compilers this should be inlined with the + * parallel-oblivious branches removed. + */ + return ExecHashJoinImpl(pstate, true); +} + +/* ---------------------------------------------------------------- * ExecInitHashJoin * * Init routine for HashJoin node. @@ -400,6 +608,12 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) hjstate = makeNode(HashJoinState); hjstate->js.ps.plan = (Plan *) node; hjstate->js.ps.state = estate; + + /* + * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker() + * where this function may be replaced with a parallel version, if we + * managed to launch a parallel query. + */ hjstate->js.ps.ExecProcNode = ExecHashJoin; /* @@ -581,9 +795,9 @@ ExecEndHashJoin(HashJoinState *node) /* * ExecHashJoinOuterGetTuple * - * get the next outer tuple for hashjoin: either by - * executing the outer plan node in the first pass, or from - * the temp files for the hashjoin batches. + * get the next outer tuple for a parallel oblivious hashjoin: either by + * executing the outer plan node in the first pass, or from the temp + * files for the hashjoin batches. * * Returns a null slot if no more outer tuples (within the current batch). * @@ -662,6 +876,67 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, } /* + * ExecHashJoinOuterGetTuple variant for the parallel case. + */ +static TupleTableSlot * +ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, + HashJoinState *hjstate, + uint32 *hashvalue) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + TupleTableSlot *slot; + + /* + * In the Parallel Hash case we only run the outer plan directly for + * single-batch hash joins. Otherwise we have to go to batch files, even + * for batch 0. + */ + if (curbatch == 0 && hashtable->nbatch == 1) + { + slot = ExecProcNode(outerNode); + + while (!TupIsNull(slot)) + { + ExprContext *econtext = hjstate->js.ps.ps_ExprContext; + + econtext->ecxt_outertuple = slot; + if (ExecHashGetHashValue(hashtable, econtext, + hjstate->hj_OuterHashKeys, + true, /* outer tuple */ + HJ_FILL_OUTER(hjstate), + hashvalue)) + return slot; + + /* + * That tuple couldn't match because of a NULL, so discard it and + * continue with the next one. + */ + slot = ExecProcNode(outerNode); + } + } + else if (curbatch < hashtable->nbatch) + { + MinimalTuple tuple; + + tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples, + hashvalue); + if (tuple != NULL) + { + slot = ExecStoreMinimalTuple(tuple, + hjstate->hj_OuterTupleSlot, + false); + return slot; + } + else + ExecClearTuple(hjstate->hj_OuterTupleSlot); + } + + /* End of this batch */ + return NULL; +} + +/* * ExecHashJoinNewBatch * switch to a new hashjoin batch * @@ -804,6 +1079,135 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) } /* + * Choose a batch to work on, and attach to it. Returns true if successful, + * false if there are no more batches. + */ +static bool +ExecParallelHashJoinNewBatch(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int start_batchno; + int batchno; + + /* + * If we started up so late that the batch tracking array has been freed + * already by ExecHashTableDetach(), then we are finished. See also + * ExecParallelHashEnsureBatchAccessors(). + */ + if (hashtable->batches == NULL) + return false; + + /* + * If we were already attached to a batch, remember not to bother checking + * it again, and detach from it (possibly freeing the hash table if we are + * last to detach). + */ + if (hashtable->curbatch >= 0) + { + hashtable->batches[hashtable->curbatch].done = true; + ExecHashTableDetachBatch(hashtable); + } + + /* + * Search for a batch that isn't done. We use an atomic counter to start + * our search at a different batch in every participant when there are + * more batches than participants. + */ + batchno = start_batchno = + pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) % + hashtable->nbatch; + do + { + uint32 hashvalue; + MinimalTuple tuple; + TupleTableSlot *slot; + + if (!hashtable->batches[batchno].done) + { + SharedTuplestoreAccessor *inner_tuples; + Barrier *batch_barrier = + &hashtable->batches[batchno].shared->batch_barrier; + + switch (BarrierAttach(batch_barrier)) + { + case PHJ_BATCH_ELECTING: + + /* One backend allocates the hash table. */ + if (BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_ELECTING)) + ExecParallelHashTableAlloc(hashtable, batchno); + /* Fall through. */ + + case PHJ_BATCH_ALLOCATING: + /* Wait for allocation to complete. */ + BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_ALLOCATING); + /* Fall through. */ + + case PHJ_BATCH_LOADING: + /* Start (or join in) loading tuples. */ + ExecParallelHashTableSetCurrentBatch(hashtable, batchno); + inner_tuples = hashtable->batches[batchno].inner_tuples; + sts_begin_parallel_scan(inner_tuples); + while ((tuple = sts_parallel_scan_next(inner_tuples, + &hashvalue))) + { + slot = ExecStoreMinimalTuple(tuple, + hjstate->hj_HashTupleSlot, + false); + ExecParallelHashTableInsertCurrentBatch(hashtable, slot, + hashvalue); + } + sts_end_parallel_scan(inner_tuples); + BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_LOADING); + /* Fall through. */ + + case PHJ_BATCH_PROBING: + + /* + * This batch is ready to probe. Return control to + * caller. We stay attached to batch_barrier so that the + * hash table stays alive until everyone's finished + * probing it, but no participant is allowed to wait at + * this barrier again (or else a deadlock could occur). + * All attached participants must eventually call + * BarrierArriveAndDetach() so that the final phase + * PHJ_BATCH_DONE can be reached. + */ + ExecParallelHashTableSetCurrentBatch(hashtable, batchno); + sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples); + return true; + + case PHJ_BATCH_DONE: + + /* + * Already done. Detach and go around again (if any + * remain). + */ + BarrierDetach(batch_barrier); + + /* + * We didn't work on this batch, but we need to observe + * its size for EXPLAIN. + */ + ExecParallelHashUpdateSpacePeak(hashtable, batchno); + hashtable->batches[batchno].done = true; + hashtable->curbatch = -1; + break; + + default: + elog(ERROR, "unexpected batch phase %d", + BarrierPhase(batch_barrier)); + } + } + batchno = (batchno + 1) % hashtable->nbatch; + } while (batchno != start_batchno); + + return false; +} + +/* * ExecHashJoinSaveTuple * save a tuple to a batch file. * @@ -964,3 +1368,176 @@ ExecReScanHashJoin(HashJoinState *node) if (node->js.ps.lefttree->chgParam == NULL) ExecReScan(node->js.ps.lefttree); } + +void +ExecShutdownHashJoin(HashJoinState *node) +{ + if (node->hj_HashTable) + { + /* + * Detach from shared state before DSM memory goes away. This makes + * sure that we don't have any pointers into DSM memory by the time + * ExecEndHashJoin runs. + */ + ExecHashTableDetachBatch(node->hj_HashTable); + ExecHashTableDetach(node->hj_HashTable); + } +} + +static void +ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate) +{ + PlanState *outerState = outerPlanState(hjstate); + ExprContext *econtext = hjstate->js.ps.ps_ExprContext; + HashJoinTable hashtable = hjstate->hj_HashTable; + TupleTableSlot *slot; + uint32 hashvalue; + int i; + + Assert(hjstate->hj_FirstOuterTupleSlot == NULL); + + /* Execute outer plan, writing all tuples to shared tuplestores. */ + for (;;) + { + slot = ExecProcNode(outerState); + if (TupIsNull(slot)) + break; + econtext->ecxt_outertuple = slot; + if (ExecHashGetHashValue(hashtable, econtext, + hjstate->hj_OuterHashKeys, + true, /* outer tuple */ + false, /* outer join, currently unsupported */ + &hashvalue)) + { + int batchno; + int bucketno; + + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, + &batchno); + sts_puttuple(hashtable->batches[batchno].outer_tuples, + &hashvalue, ExecFetchSlotMinimalTuple(slot)); + } + CHECK_FOR_INTERRUPTS(); + } + + /* Make sure all outer partitions are readable by any backend. */ + for (i = 0; i < hashtable->nbatch; ++i) + sts_end_write(hashtable->batches[i].outer_tuples); +} + +void +ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt) +{ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState)); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +void +ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt) +{ + int plan_node_id = state->js.ps.plan->plan_node_id; + HashState *hashNode; + ParallelHashJoinState *pstate; + + /* + * Disable shared hash table mode if we failed to create a real DSM + * segment, because that means that we don't have a DSA area to work with. + */ + if (pcxt->seg == NULL) + return; + + ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); + + /* + * Set up the state needed to coordinate access to the shared hash + * table(s), using the plan node ID as the toc key. + */ + pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState)); + shm_toc_insert(pcxt->toc, plan_node_id, pstate); + + /* + * Set up the shared hash join state with no batches initially. + * ExecHashTableCreate() will prepare at least one later and set nbatch + * and space_allowed. + */ + pstate->nbatch = 0; + pstate->space_allowed = 0; + pstate->batches = InvalidDsaPointer; + pstate->old_batches = InvalidDsaPointer; + pstate->nbuckets = 0; + pstate->growth = PHJ_GROWTH_OK; + pstate->chunk_work_queue = InvalidDsaPointer; + pg_atomic_init_u32(&pstate->distributor, 0); + pstate->nparticipants = pcxt->nworkers + 1; + pstate->total_tuples = 0; + LWLockInitialize(&pstate->lock, + LWTRANCHE_PARALLEL_HASH_JOIN); + BarrierInit(&pstate->build_barrier, 0); + BarrierInit(&pstate->grow_batches_barrier, 0); + BarrierInit(&pstate->grow_buckets_barrier, 0); + + /* Set up the space we'll use for shared temporary files. */ + SharedFileSetInit(&pstate->fileset, pcxt->seg); + + /* Initialize the shared state in the hash node. */ + hashNode = (HashState *) innerPlanState(state); + hashNode->parallel_state = pstate; +} + +/* ---------------------------------------------------------------- + * ExecHashJoinReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ +void +ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt) +{ + int plan_node_id = state->js.ps.plan->plan_node_id; + ParallelHashJoinState *pstate = + shm_toc_lookup(cxt->toc, plan_node_id, false); + + /* + * It would be possible to reuse the shared hash table in single-batch + * cases by resetting and then fast-forwarding build_barrier to + * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but + * currently shared hash tables are already freed by now (by the last + * participant to detach from the batch). We could consider keeping it + * around for single-batch joins. We'd also need to adjust + * finalize_plan() so that it doesn't record a dummy dependency for + * Parallel Hash nodes, preventing the rescan optimization. For now we + * don't try. + */ + + /* Detach, freeing any remaining shared memory. */ + if (state->hj_HashTable != NULL) + { + ExecHashTableDetachBatch(state->hj_HashTable); + ExecHashTableDetach(state->hj_HashTable); + } + + /* Clear any shared batch files. */ + SharedFileSetDeleteAll(&pstate->fileset); + + /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */ + BarrierInit(&pstate->build_barrier, 0); +} + +void +ExecHashJoinInitializeWorker(HashJoinState *state, + ParallelWorkerContext *pwcxt) +{ + HashState *hashNode; + int plan_node_id = state->js.ps.plan->plan_node_id; + ParallelHashJoinState *pstate = + shm_toc_lookup(pwcxt->toc, plan_node_id, false); + + /* Attach to the space for shared temporary files. */ + SharedFileSetAttach(&pstate->fileset, pwcxt->seg); + + /* Attach to the shared state in the hash node. */ + hashNode = (HashState *) innerPlanState(state); + hashNode->parallel_state = pstate; + + ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); +} |