diff options
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/execParallel.c | 21 | ||||
-rw-r--r-- | src/backend/executor/execProcnode.c | 3 | ||||
-rw-r--r-- | src/backend/executor/nodeHash.c | 1647 | ||||
-rw-r--r-- | src/backend/executor/nodeHashjoin.c | 617 |
4 files changed, 2198 insertions, 90 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 604f4f5b613..b344d4b589d 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -31,6 +31,7 @@ #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" #include "executor/nodeHash.h" +#include "executor/nodeHashjoin.h" #include "executor/nodeIndexscan.h" #include "executor/nodeIndexonlyscan.h" #include "executor/nodeSeqscan.h" @@ -266,6 +267,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate, e->pcxt); break; + case T_HashJoinState: + if (planstate->plan->parallel_aware) + ExecHashJoinEstimate((HashJoinState *) planstate, + e->pcxt); + break; case T_HashState: /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecHashEstimate((HashState *) planstate, e->pcxt); @@ -474,6 +480,11 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate, d->pcxt); break; + case T_HashJoinState: + if (planstate->plan->parallel_aware) + ExecHashJoinInitializeDSM((HashJoinState *) planstate, + d->pcxt); + break; case T_HashState: /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecHashInitializeDSM((HashState *) planstate, d->pcxt); @@ -898,6 +909,11 @@ ExecParallelReInitializeDSM(PlanState *planstate, ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate, pcxt); break; + case T_HashJoinState: + if (planstate->plan->parallel_aware) + ExecHashJoinReInitializeDSM((HashJoinState *) planstate, + pcxt); + break; case T_HashState: case T_SortState: /* these nodes have DSM state, but no reinitialization is required */ @@ -1196,6 +1212,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, pwcxt); break; + case T_HashJoinState: + if (planstate->plan->parallel_aware) + ExecHashJoinInitializeWorker((HashJoinState *) planstate, + pwcxt); + break; case T_HashState: /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecHashInitializeWorker((HashState *) planstate, pwcxt); diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index fcb8b569999..699dc691792 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -770,6 +770,9 @@ ExecShutdownNode(PlanState *node) case T_HashState: ExecShutdownHash((HashState *) node); break; + case T_HashJoinState: + ExecShutdownHashJoin((HashJoinState *) node); + break; default: break; } diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index afd7384e945..4284e8682a0 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -10,6 +10,8 @@ * IDENTIFICATION * src/backend/executor/nodeHash.c * + * See note on parallelism in nodeHashjoin.c. + * *------------------------------------------------------------------------- */ /* @@ -25,6 +27,7 @@ #include <limits.h> #include "access/htup_details.h" +#include "access/parallel.h" #include "catalog/pg_statistic.h" #include "commands/tablespace.h" #include "executor/execdebug.h" @@ -32,6 +35,8 @@ #include "executor/nodeHash.h" #include "executor/nodeHashjoin.h" #include "miscadmin.h" +#include "pgstat.h" +#include "port/atomics.h" #include "utils/dynahash.h" #include "utils/memutils.h" #include "utils/lsyscache.h" @@ -40,6 +45,8 @@ static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable); +static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable); +static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable); static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse); static void ExecHashSkewTableInsert(HashJoinTable hashtable, @@ -49,6 +56,30 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable, static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable); static void *dense_alloc(HashJoinTable hashtable, Size size); +static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, + size_t size, + dsa_pointer *shared); +static void MultiExecPrivateHash(HashState *node); +static void MultiExecParallelHash(HashState *node); +static inline HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable table, + int bucketno); +static inline HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table, + HashJoinTuple tuple); +static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head, + HashJoinTuple tuple, + dsa_pointer tuple_shared); +static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch); +static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable); +static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable); +static void ExecParallelHashRepartitionRest(HashJoinTable hashtable); +static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable table, + dsa_pointer *shared); +static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, + int batchno, + size_t size); +static void ExecParallelHashMergeCounters(HashJoinTable hashtable); +static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable); + /* ---------------------------------------------------------------- * ExecHash @@ -73,6 +104,39 @@ ExecHash(PlanState *pstate) Node * MultiExecHash(HashState *node) { + /* must provide our own instrumentation support */ + if (node->ps.instrument) + InstrStartNode(node->ps.instrument); + + if (node->parallel_state != NULL) + MultiExecParallelHash(node); + else + MultiExecPrivateHash(node); + + /* must provide our own instrumentation support */ + if (node->ps.instrument) + InstrStopNode(node->ps.instrument, node->hashtable->partialTuples); + + /* + * We do not return the hash table directly because it's not a subtype of + * Node, and so would violate the MultiExecProcNode API. Instead, our + * parent Hashjoin node is expected to know how to fish it out of our node + * state. Ugly but not really worth cleaning up, since Hashjoin knows + * quite a bit more about Hash besides that. + */ + return NULL; +} + +/* ---------------------------------------------------------------- + * MultiExecPrivateHash + * + * parallel-oblivious version, building a backend-private + * hash table and (if necessary) batch files. + * ---------------------------------------------------------------- + */ +static void +MultiExecPrivateHash(HashState *node) +{ PlanState *outerNode; List *hashkeys; HashJoinTable hashtable; @@ -80,10 +144,6 @@ MultiExecHash(HashState *node) ExprContext *econtext; uint32 hashvalue; - /* must provide our own instrumentation support */ - if (node->ps.instrument) - InstrStartNode(node->ps.instrument); - /* * get state info from node */ @@ -138,18 +198,147 @@ MultiExecHash(HashState *node) if (hashtable->spaceUsed > hashtable->spacePeak) hashtable->spacePeak = hashtable->spaceUsed; - /* must provide our own instrumentation support */ - if (node->ps.instrument) - InstrStopNode(node->ps.instrument, hashtable->totalTuples); + hashtable->partialTuples = hashtable->totalTuples; +} + +/* ---------------------------------------------------------------- + * MultiExecParallelHash + * + * parallel-aware version, building a shared hash table and + * (if necessary) batch files using the combined effort of + * a set of co-operating backends. + * ---------------------------------------------------------------- + */ +static void +MultiExecParallelHash(HashState *node) +{ + ParallelHashJoinState *pstate; + PlanState *outerNode; + List *hashkeys; + HashJoinTable hashtable; + TupleTableSlot *slot; + ExprContext *econtext; + uint32 hashvalue; + Barrier *build_barrier; + int i; /* - * We do not return the hash table directly because it's not a subtype of - * Node, and so would violate the MultiExecProcNode API. Instead, our - * parent Hashjoin node is expected to know how to fish it out of our node - * state. Ugly but not really worth cleaning up, since Hashjoin knows - * quite a bit more about Hash besides that. + * get state info from node */ - return NULL; + outerNode = outerPlanState(node); + hashtable = node->hashtable; + + /* + * set expression context + */ + hashkeys = node->hashkeys; + econtext = node->ps.ps_ExprContext; + + /* + * Synchronize the parallel hash table build. At this stage we know that + * the shared hash table has been or is being set up by + * ExecHashTableCreate(), but we don't know if our peers have returned + * from there or are here in MultiExecParallelHash(), and if so how far + * through they are. To find out, we check the build_barrier phase then + * and jump to the right step in the build algorithm. + */ + pstate = hashtable->parallel_state; + build_barrier = &pstate->build_barrier; + Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING); + switch (BarrierPhase(build_barrier)) + { + case PHJ_BUILD_ALLOCATING: + + /* + * Either I just allocated the initial hash table in + * ExecHashTableCreate(), or someone else is doing that. Either + * way, wait for everyone to arrive here so we can proceed. + */ + BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING); + /* Fall through. */ + + case PHJ_BUILD_HASHING_INNER: + + /* + * It's time to begin hashing, or if we just arrived here then + * hashing is already underway, so join in that effort. While + * hashing we have to be prepared to help increase the number of + * batches or buckets at any time, and if we arrived here when + * that was already underway we'll have to help complete that work + * immediately so that it's safe to access batches and buckets + * below. + */ + if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) != + PHJ_GROW_BATCHES_ELECTING) + ExecParallelHashIncreaseNumBatches(hashtable); + if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) != + PHJ_GROW_BUCKETS_ELECTING) + ExecParallelHashIncreaseNumBuckets(hashtable); + ExecParallelHashEnsureBatchAccessors(hashtable); + ExecParallelHashTableSetCurrentBatch(hashtable, 0); + for (;;) + { + slot = ExecProcNode(outerNode); + if (TupIsNull(slot)) + break; + econtext->ecxt_innertuple = slot; + if (ExecHashGetHashValue(hashtable, econtext, hashkeys, + false, hashtable->keepNulls, + &hashvalue)) + ExecParallelHashTableInsert(hashtable, slot, hashvalue); + hashtable->partialTuples++; + } + BarrierDetach(&pstate->grow_buckets_barrier); + BarrierDetach(&pstate->grow_batches_barrier); + + /* + * Make sure that any tuples we wrote to disk are visible to + * others before anyone tries to load them. + */ + for (i = 0; i < hashtable->nbatch; ++i) + sts_end_write(hashtable->batches[i].inner_tuples); + + /* + * Update shared counters. We need an accurate total tuple count + * to control the empty table optimization. + */ + ExecParallelHashMergeCounters(hashtable); + + /* + * Wait for everyone to finish building and flushing files and + * counters. + */ + if (BarrierArriveAndWait(build_barrier, + WAIT_EVENT_HASH_BUILD_HASHING_INNER)) + { + /* + * Elect one backend to disable any further growth. Batches + * are now fixed. While building them we made sure they'd fit + * in our memory budget when we load them back in later (or we + * tried to do that and gave up because we detected extreme + * skew). + */ + pstate->growth = PHJ_GROWTH_DISABLED; + } + } + + /* + * We're not yet attached to a batch. We all agree on the dimensions and + * number of inner tuples (for the empty table optimization). + */ + hashtable->curbatch = -1; + hashtable->nbuckets = pstate->nbuckets; + hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); + hashtable->totalTuples = pstate->total_tuples; + ExecParallelHashEnsureBatchAccessors(hashtable); + + /* + * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE + * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't + * there already). + */ + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER || + BarrierPhase(build_barrier) == PHJ_BUILD_DONE); } /* ---------------------------------------------------------------- @@ -240,12 +429,15 @@ ExecEndHash(HashState *node) * ---------------------------------------------------------------- */ HashJoinTable -ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) +ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) { + Hash *node; HashJoinTable hashtable; Plan *outerNode; + size_t space_allowed; int nbuckets; int nbatch; + double rows; int num_skew_mcvs; int log2_nbuckets; int nkeys; @@ -258,10 +450,22 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) * "outer" subtree of this node, but the inner relation of the hashjoin). * Compute the appropriate size of the hash table. */ + node = (Hash *) state->ps.plan; outerNode = outerPlan(node); - ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width, + /* + * If this is shared hash table with a partial plan, then we can't use + * outerNode->plan_rows to estimate its size. We need an estimate of the + * total number of rows across all copies of the partial plan. + */ + rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows; + + ExecChooseHashTableSize(rows, outerNode->plan_width, OidIsValid(node->skewTable), + state->parallel_state != NULL, + state->parallel_state != NULL ? + state->parallel_state->nparticipants - 1 : 0, + &space_allowed, &nbuckets, &nbatch, &num_skew_mcvs); /* nbuckets must be a power of 2 */ @@ -280,7 +484,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) hashtable->nbuckets_optimal = nbuckets; hashtable->log2_nbuckets = log2_nbuckets; hashtable->log2_nbuckets_optimal = log2_nbuckets; - hashtable->buckets = NULL; + hashtable->buckets.unshared = NULL; hashtable->keepNulls = keepNulls; hashtable->skewEnabled = false; hashtable->skewBucket = NULL; @@ -293,16 +497,21 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) hashtable->nbatch_outstart = nbatch; hashtable->growEnabled = true; hashtable->totalTuples = 0; + hashtable->partialTuples = 0; hashtable->skewTuples = 0; hashtable->innerBatchFile = NULL; hashtable->outerBatchFile = NULL; hashtable->spaceUsed = 0; hashtable->spacePeak = 0; - hashtable->spaceAllowed = work_mem * 1024L; + hashtable->spaceAllowed = space_allowed; hashtable->spaceUsedSkew = 0; hashtable->spaceAllowedSkew = hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100; hashtable->chunks = NULL; + hashtable->current_chunk = NULL; + hashtable->parallel_state = state->parallel_state; + hashtable->area = state->ps.state->es_query_dsa; + hashtable->batches = NULL; #ifdef HJDEBUG printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n", @@ -351,10 +560,11 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); - if (nbatch > 1) + if (nbatch > 1 && hashtable->parallel_state == NULL) { /* - * allocate and initialize the file arrays in hashCxt + * allocate and initialize the file arrays in hashCxt (not needed for + * parallel case which uses shared tuplestores instead of raw files) */ hashtable->innerBatchFile = (BufFile **) palloc0(nbatch * sizeof(BufFile *)); @@ -365,23 +575,77 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) PrepareTempTablespaces(); } - /* - * Prepare context for the first-scan space allocations; allocate the - * hashbucket array therein, and set each bucket "empty". - */ - MemoryContextSwitchTo(hashtable->batchCxt); + MemoryContextSwitchTo(oldcxt); - hashtable->buckets = (HashJoinTuple *) - palloc0(nbuckets * sizeof(HashJoinTuple)); + if (hashtable->parallel_state) + { + ParallelHashJoinState *pstate = hashtable->parallel_state; + Barrier *build_barrier; - /* - * Set up for skew optimization, if possible and there's a need for more - * than one batch. (In a one-batch join, there's no point in it.) - */ - if (nbatch > 1) - ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); + /* + * Attach to the build barrier. The corresponding detach operation is + * in ExecHashTableDetach. Note that we won't attach to the + * batch_barrier for batch 0 yet. We'll attach later and start it out + * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front + * and then loaded while hashing (the standard hybrid hash join + * algorithm), and we'll coordinate that using build_barrier. + */ + build_barrier = &pstate->build_barrier; + BarrierAttach(build_barrier); - MemoryContextSwitchTo(oldcxt); + /* + * So far we have no idea whether there are any other participants, + * and if so, what phase they are working on. The only thing we care + * about at this point is whether someone has already created the + * SharedHashJoinBatch objects and the hash table for batch 0. One + * backend will be elected to do that now if necessary. + */ + if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING && + BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECTING)) + { + pstate->nbatch = nbatch; + pstate->space_allowed = space_allowed; + pstate->growth = PHJ_GROWTH_OK; + + /* Set up the shared state for coordinating batches. */ + ExecParallelHashJoinSetUpBatches(hashtable, nbatch); + + /* + * Allocate batch 0's hash table up front so we can load it + * directly while hashing. + */ + pstate->nbuckets = nbuckets; + ExecParallelHashTableAlloc(hashtable, 0); + } + + /* + * The next Parallel Hash synchronization point is in + * MultiExecParallelHash(), which will progress it all the way to + * PHJ_BUILD_DONE. The caller must not return control from this + * executor node between now and then. + */ + } + else + { + /* + * Prepare context for the first-scan space allocations; allocate the + * hashbucket array therein, and set each bucket "empty". + */ + MemoryContextSwitchTo(hashtable->batchCxt); + + hashtable->buckets.unshared = (HashJoinTuple *) + palloc0(nbuckets * sizeof(HashJoinTuple)); + + /* + * Set up for skew optimization, if possible and there's a need for + * more than one batch. (In a one-batch join, there's no point in + * it.) + */ + if (nbatch > 1) + ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); + + MemoryContextSwitchTo(oldcxt); + } return hashtable; } @@ -399,6 +663,9 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, + bool try_combined_work_mem, + int parallel_workers, + size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs) @@ -434,6 +701,16 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, hash_table_bytes = work_mem * 1024L; /* + * Parallel Hash tries to use the combined work_mem of all workers to + * avoid the need to batch. If that won't work, it falls back to work_mem + * per worker and tries to process batches in parallel. + */ + if (try_combined_work_mem) + hash_table_bytes += hash_table_bytes * parallel_workers; + + *space_allowed = hash_table_bytes; + + /* * If skew optimization is possible, estimate the number of skew buckets * that will fit in the memory allowed, and decrement the assumed space * available for the main hash table accordingly. @@ -478,7 +755,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, * Note that both nbuckets and nbatch must be powers of 2 to make * ExecHashGetBucketAndBatch fast. */ - max_pointers = (work_mem * 1024L) / sizeof(HashJoinTuple); + max_pointers = *space_allowed / sizeof(HashJoinTuple); max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple)); /* If max_pointers isn't a power of 2, must round it down to one */ mppow2 = 1L << my_log2(max_pointers); @@ -511,6 +788,21 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, long bucket_size; /* + * If Parallel Hash with combined work_mem would still need multiple + * batches, we'll have to fall back to regular work_mem budget. + */ + if (try_combined_work_mem) + { + ExecChooseHashTableSize(ntuples, tupwidth, useskew, + false, parallel_workers, + space_allowed, + numbuckets, + numbatches, + num_skew_mcvs); + return; + } + + /* * Estimate the number of buckets we'll want to have when work_mem is * entirely full. Each bucket will contain a bucket pointer plus * NTUP_PER_BUCKET tuples, whose projected size already includes @@ -564,14 +856,17 @@ ExecHashTableDestroy(HashJoinTable hashtable) /* * Make sure all the temp files are closed. We skip batch 0, since it * can't have any temp files (and the arrays might not even exist if - * nbatch is only 1). + * nbatch is only 1). Parallel hash joins don't use these files. */ - for (i = 1; i < hashtable->nbatch; i++) + if (hashtable->innerBatchFile != NULL) { - if (hashtable->innerBatchFile[i]) - BufFileClose(hashtable->innerBatchFile[i]); - if (hashtable->outerBatchFile[i]) - BufFileClose(hashtable->outerBatchFile[i]); + for (i = 1; i < hashtable->nbatch; i++) + { + if (hashtable->innerBatchFile[i]) + BufFileClose(hashtable->innerBatchFile[i]); + if (hashtable->outerBatchFile[i]) + BufFileClose(hashtable->outerBatchFile[i]); + } } /* Release working memory (batchCxt is a child, so it goes away too) */ @@ -657,8 +952,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) hashtable->nbuckets = hashtable->nbuckets_optimal; hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; - hashtable->buckets = repalloc(hashtable->buckets, - sizeof(HashJoinTuple) * hashtable->nbuckets); + hashtable->buckets.unshared = + repalloc(hashtable->buckets.unshared, + sizeof(HashJoinTuple) * hashtable->nbuckets); } /* @@ -666,14 +962,15 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) * buckets now and not have to keep track which tuples in the buckets have * already been processed. We will free the old chunks as we go. */ - memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets); + memset(hashtable->buckets.unshared, 0, + sizeof(HashJoinTuple) * hashtable->nbuckets); oldchunks = hashtable->chunks; hashtable->chunks = NULL; /* so, let's scan through the old chunks, and all tuples in each chunk */ while (oldchunks != NULL) { - HashMemoryChunk nextchunk = oldchunks->next; + HashMemoryChunk nextchunk = oldchunks->next.unshared; /* position within the buffer (up to oldchunks->used) */ size_t idx = 0; @@ -700,8 +997,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) memcpy(copyTuple, hashTuple, hashTupleSize); /* and add it back to the appropriate bucket */ - copyTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = copyTuple; + copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; + hashtable->buckets.unshared[bucketno] = copyTuple; } else { @@ -751,6 +1048,380 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } /* + * ExecParallelHashIncreaseNumBatches + * Every participant attached to grow_barrier must run this function + * when it observes growth == PHJ_GROWTH_NEED_MORE_BATCHES. + */ +static void +ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + int i; + + Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); + + /* + * It's unlikely, but we need to be prepared for new participants to show + * up while we're in the middle of this operation so we need to switch on + * barrier phase here. + */ + switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier))) + { + case PHJ_GROW_BATCHES_ELECTING: + + /* + * Elect one participant to prepare to grow the number of batches. + * This involves reallocating or resetting the buckets of batch 0 + * in preparation for all participants to begin repartitioning the + * tuples. + */ + if (BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_ELECTING)) + { + dsa_pointer_atomic *buckets; + ParallelHashJoinBatch *old_batch0; + int new_nbatch; + int i; + + /* Move the old batch out of the way. */ + old_batch0 = hashtable->batches[0].shared; + pstate->old_batches = pstate->batches; + pstate->old_nbatch = hashtable->nbatch; + pstate->batches = InvalidDsaPointer; + + /* Free this backend's old accessors. */ + ExecParallelHashCloseBatchAccessors(hashtable); + + /* Figure out how many batches to use. */ + if (hashtable->nbatch == 1) + { + /* + * We are going from single-batch to multi-batch. We need + * to switch from one large combined memory budget to the + * regular work_mem budget. + */ + pstate->space_allowed = work_mem * 1024L; + + /* + * The combined work_mem of all participants wasn't + * enough. Therefore one batch per participant would be + * approximately equivalent and would probably also be + * insufficient. So try two batches per particiant, + * rounded up to a power of two. + */ + new_nbatch = 1 << my_log2(pstate->nparticipants * 2); + } + else + { + /* + * We were already multi-batched. Try doubling the number + * of batches. + */ + new_nbatch = hashtable->nbatch * 2; + } + + /* Allocate new larger generation of batches. */ + Assert(hashtable->nbatch == pstate->nbatch); + ExecParallelHashJoinSetUpBatches(hashtable, new_nbatch); + Assert(hashtable->nbatch == pstate->nbatch); + + /* Replace or recycle batch 0's bucket array. */ + if (pstate->old_nbatch == 1) + { + double dtuples; + double dbuckets; + int new_nbuckets; + + /* + * We probably also need a smaller bucket array. How many + * tuples do we expect per batch, assuming we have only + * half of them so far? Normally we don't need to change + * the bucket array's size, because the size of each batch + * stays the same as we add more batches, but in this + * special case we move from a large batch to many smaller + * batches and it would be wasteful to keep the large + * array. + */ + dtuples = (old_batch0->ntuples * 2.0) / new_nbatch; + dbuckets = ceil(dtuples / NTUP_PER_BUCKET); + dbuckets = Min(dbuckets, + MaxAllocSize / sizeof(dsa_pointer_atomic)); + new_nbuckets = (int) dbuckets; + new_nbuckets = Max(new_nbuckets, 1024); + new_nbuckets = 1 << my_log2(new_nbuckets); + dsa_free(hashtable->area, old_batch0->buckets); + hashtable->batches[0].shared->buckets = + dsa_allocate(hashtable->area, + sizeof(dsa_pointer_atomic) * new_nbuckets); + buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, + hashtable->batches[0].shared->buckets); + for (i = 0; i < new_nbuckets; ++i) + dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); + pstate->nbuckets = new_nbuckets; + } + else + { + /* Recycle the existing bucket array. */ + hashtable->batches[0].shared->buckets = old_batch0->buckets; + buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, old_batch0->buckets); + for (i = 0; i < hashtable->nbuckets; ++i) + dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer); + } + + /* Move all chunks to the work queue for parallel processing. */ + pstate->chunk_work_queue = old_batch0->chunks; + + /* Disable further growth temporarily while we're growing. */ + pstate->growth = PHJ_GROWTH_DISABLED; + } + else + { + /* All other participants just flush their tuples to disk. */ + ExecParallelHashCloseBatchAccessors(hashtable); + } + /* Fall through. */ + + case PHJ_GROW_BATCHES_ALLOCATING: + /* Wait for the above to be finished. */ + BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING); + /* Fall through. */ + + case PHJ_GROW_BATCHES_REPARTITIONING: + /* Make sure that we have the current dimensions and buckets. */ + ExecParallelHashEnsureBatchAccessors(hashtable); + ExecParallelHashTableSetCurrentBatch(hashtable, 0); + /* Then partition, flush counters. */ + ExecParallelHashRepartitionFirst(hashtable); + ExecParallelHashRepartitionRest(hashtable); + ExecParallelHashMergeCounters(hashtable); + /* Wait for the above to be finished. */ + BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING); + /* Fall through. */ + + case PHJ_GROW_BATCHES_DECIDING: + + /* + * Elect one participant to clean up and decide whether further + * repartitioning is needed, or should be disabled because it's + * not helping. + */ + if (BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_DECIDING)) + { + bool space_exhausted = false; + bool extreme_skew_detected = false; + + /* Make sure that we have the current dimensions and buckets. */ + ExecParallelHashEnsureBatchAccessors(hashtable); + ExecParallelHashTableSetCurrentBatch(hashtable, 0); + + /* Are any of the new generation of batches exhausted? */ + for (i = 0; i < hashtable->nbatch; ++i) + { + ParallelHashJoinBatch *batch = hashtable->batches[i].shared; + + if (batch->space_exhausted || + batch->estimated_size > pstate->space_allowed) + { + int parent; + + space_exhausted = true; + + /* + * Did this batch receive ALL of the tuples from its + * parent batch? That would indicate that further + * repartitioning isn't going to help (the hash values + * are probably all the same). + */ + parent = i % pstate->old_nbatch; + if (batch->ntuples == hashtable->batches[parent].shared->old_ntuples) + extreme_skew_detected = true; + } + } + + /* Don't keep growing if it's not helping or we'd overflow. */ + if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2) + pstate->growth = PHJ_GROWTH_DISABLED; + else if (space_exhausted) + pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; + else + pstate->growth = PHJ_GROWTH_OK; + + /* Free the old batches in shared memory. */ + dsa_free(hashtable->area, pstate->old_batches); + pstate->old_batches = InvalidDsaPointer; + } + /* Fall through. */ + + case PHJ_GROW_BATCHES_FINISHING: + /* Wait for the above to complete. */ + BarrierArriveAndWait(&pstate->grow_batches_barrier, + WAIT_EVENT_HASH_GROW_BATCHES_FINISHING); + } +} + +/* + * Repartition the tuples currently loaded into memory for inner batch 0 + * because the number of batches has been increased. Some tuples are retained + * in memory and some are written out to a later batch. + */ +static void +ExecParallelHashRepartitionFirst(HashJoinTable hashtable) +{ + dsa_pointer chunk_shared; + HashMemoryChunk chunk; + + Assert(hashtable->nbatch = hashtable->parallel_state->nbatch); + + while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared))) + { + size_t idx = 0; + + /* Repartition all tuples in this chunk. */ + while (idx < chunk->used) + { + HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx); + MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple); + HashJoinTuple copyTuple; + dsa_pointer shared; + int bucketno; + int batchno; + + ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, + &bucketno, &batchno); + + Assert(batchno < hashtable->nbatch); + if (batchno == 0) + { + /* It still belongs in batch 0. Copy to a new chunk. */ + copyTuple = + ExecParallelHashTupleAlloc(hashtable, + HJTUPLE_OVERHEAD + tuple->t_len, + &shared); + copyTuple->hashvalue = hashTuple->hashvalue; + memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len); + ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], + copyTuple, shared); + } + else + { + size_t tuple_size = + MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); + + /* It belongs in a later batch. */ + hashtable->batches[batchno].estimated_size += tuple_size; + sts_puttuple(hashtable->batches[batchno].inner_tuples, + &hashTuple->hashvalue, tuple); + } + + /* Count this tuple. */ + ++hashtable->batches[0].old_ntuples; + ++hashtable->batches[batchno].ntuples; + + idx += MAXALIGN(HJTUPLE_OVERHEAD + + HJTUPLE_MINTUPLE(hashTuple)->t_len); + } + + /* Free this chunk. */ + dsa_free(hashtable->area, chunk_shared); + + CHECK_FOR_INTERRUPTS(); + } +} + +/* + * Help repartition inner batches 1..n. + */ +static void +ExecParallelHashRepartitionRest(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + int old_nbatch = pstate->old_nbatch; + SharedTuplestoreAccessor **old_inner_tuples; + ParallelHashJoinBatch *old_batches; + int i; + + /* Get our hands on the previous generation of batches. */ + old_batches = (ParallelHashJoinBatch *) + dsa_get_address(hashtable->area, pstate->old_batches); + old_inner_tuples = palloc0(sizeof(SharedTuplestoreAccessor *) * old_nbatch); + for (i = 1; i < old_nbatch; ++i) + { + ParallelHashJoinBatch *shared = + NthParallelHashJoinBatch(old_batches, i); + + old_inner_tuples[i] = sts_attach(ParallelHashJoinBatchInner(shared), + ParallelWorkerNumber + 1, + &pstate->fileset); + } + + /* Join in the effort to repartition them. */ + for (i = 1; i < old_nbatch; ++i) + { + MinimalTuple tuple; + uint32 hashvalue; + + /* Scan one partition from the previous generation. */ + sts_begin_parallel_scan(old_inner_tuples[i]); + while ((tuple = sts_parallel_scan_next(old_inner_tuples[i], &hashvalue))) + { + size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); + int bucketno; + int batchno; + + /* Decide which partition it goes to in the new generation. */ + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, + &batchno); + + hashtable->batches[batchno].estimated_size += tuple_size; + ++hashtable->batches[batchno].ntuples; + ++hashtable->batches[i].old_ntuples; + + /* Store the tuple its new batch. */ + sts_puttuple(hashtable->batches[batchno].inner_tuples, + &hashvalue, tuple); + + CHECK_FOR_INTERRUPTS(); + } + sts_end_parallel_scan(old_inner_tuples[i]); + } + + pfree(old_inner_tuples); +} + +/* + * Transfer the backend-local per-batch counters to the shared totals. + */ +static void +ExecParallelHashMergeCounters(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + int i; + + LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); + pstate->total_tuples = 0; + for (i = 0; i < hashtable->nbatch; ++i) + { + ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i]; + + batch->shared->size += batch->size; + batch->shared->estimated_size += batch->estimated_size; + batch->shared->ntuples += batch->ntuples; + batch->shared->old_ntuples += batch->old_ntuples; + batch->size = 0; + batch->estimated_size = 0; + batch->ntuples = 0; + batch->old_ntuples = 0; + pstate->total_tuples += batch->shared->ntuples; + } + LWLockRelease(&pstate->lock); +} + +/* * ExecHashIncreaseNumBuckets * increase the original number of buckets in order to reduce * number of tuples per bucket @@ -782,14 +1453,15 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) * ExecHashIncreaseNumBatches, but without all the copying into new * chunks) */ - hashtable->buckets = - (HashJoinTuple *) repalloc(hashtable->buckets, + hashtable->buckets.unshared = + (HashJoinTuple *) repalloc(hashtable->buckets.unshared, hashtable->nbuckets * sizeof(HashJoinTuple)); - memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinTuple)); + memset(hashtable->buckets.unshared, 0, + hashtable->nbuckets * sizeof(HashJoinTuple)); /* scan through all tuples in all chunks to rebuild the hash table */ - for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next) + for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared) { /* process all tuples stored in this chunk */ size_t idx = 0; @@ -804,8 +1476,8 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) &bucketno, &batchno); /* add the tuple to the proper bucket */ - hashTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = hashTuple; + hashTuple->next.unshared = hashtable->buckets.unshared[bucketno]; + hashtable->buckets.unshared[bucketno] = hashTuple; /* advance index past the tuple */ idx += MAXALIGN(HJTUPLE_OVERHEAD + @@ -817,6 +1489,93 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) } } +static void +ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + int i; + HashMemoryChunk chunk; + dsa_pointer chunk_s; + + Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); + + /* + * It's unlikely, but we need to be prepared for new participants to show + * up while we're in the middle of this operation so we need to switch on + * barrier phase here. + */ + switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier))) + { + case PHJ_GROW_BUCKETS_ELECTING: + /* Elect one participant to prepare to increase nbuckets. */ + if (BarrierArriveAndWait(&pstate->grow_buckets_barrier, + WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING)) + { + size_t size; + dsa_pointer_atomic *buckets; + + /* Double the size of the bucket array. */ + pstate->nbuckets *= 2; + size = pstate->nbuckets * sizeof(dsa_pointer_atomic); + hashtable->batches[0].shared->size += size / 2; + dsa_free(hashtable->area, hashtable->batches[0].shared->buckets); + hashtable->batches[0].shared->buckets = + dsa_allocate(hashtable->area, size); + buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, + hashtable->batches[0].shared->buckets); + for (i = 0; i < pstate->nbuckets; ++i) + dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); + + /* Put the chunk list onto the work queue. */ + pstate->chunk_work_queue = hashtable->batches[0].shared->chunks; + + /* Clear the flag. */ + pstate->growth = PHJ_GROWTH_OK; + } + /* Fall through. */ + + case PHJ_GROW_BUCKETS_ALLOCATING: + /* Wait for the above to complete. */ + BarrierArriveAndWait(&pstate->grow_buckets_barrier, + WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING); + /* Fall through. */ + + case PHJ_GROW_BUCKETS_REINSERTING: + /* Reinsert all tuples into the hash table. */ + ExecParallelHashEnsureBatchAccessors(hashtable); + ExecParallelHashTableSetCurrentBatch(hashtable, 0); + while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s))) + { + size_t idx = 0; + + while (idx < chunk->used) + { + HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx); + dsa_pointer shared = chunk_s + HASH_CHUNK_HEADER_SIZE + idx; + int bucketno; + int batchno; + + ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, + &bucketno, &batchno); + Assert(batchno == 0); + + /* add the tuple to the proper bucket */ + ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], + hashTuple, shared); + + /* advance index past the tuple */ + idx += MAXALIGN(HJTUPLE_OVERHEAD + + HJTUPLE_MINTUPLE(hashTuple)->t_len); + } + + /* allow this loop to be cancellable */ + CHECK_FOR_INTERRUPTS(); + } + BarrierArriveAndWait(&pstate->grow_buckets_barrier, + WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING); + } +} /* * ExecHashTableInsert @@ -869,8 +1628,8 @@ ExecHashTableInsert(HashJoinTable hashtable, HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); /* Push it onto the front of the bucket's list */ - hashTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = hashTuple; + hashTuple->next.unshared = hashtable->buckets.unshared[bucketno]; + hashtable->buckets.unshared[bucketno] = hashTuple; /* * Increase the (optimal) number of buckets if we just exceeded the @@ -911,6 +1670,94 @@ ExecHashTableInsert(HashJoinTable hashtable, } /* + * ExecHashTableParallelInsert + * insert a tuple into a shared hash table or shared batch tuplestore + */ +void +ExecParallelHashTableInsert(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue) +{ + MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot); + dsa_pointer shared; + int bucketno; + int batchno; + +retry: + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); + + if (batchno == 0) + { + HashJoinTuple hashTuple; + + /* Try to load it into memory. */ + Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) == + PHJ_BUILD_HASHING_INNER); + hashTuple = ExecParallelHashTupleAlloc(hashtable, + HJTUPLE_OVERHEAD + tuple->t_len, + &shared); + if (hashTuple == NULL) + goto retry; + + /* Store the hash value in the HashJoinTuple header. */ + hashTuple->hashvalue = hashvalue; + memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); + + /* Push it onto the front of the bucket's list */ + ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], + hashTuple, shared); + } + else + { + size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); + + Assert(batchno > 0); + + /* Try to preallocate space in the batch if necessary. */ + if (hashtable->batches[batchno].preallocated < tuple_size) + { + if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size)) + goto retry; + } + + Assert(hashtable->batches[batchno].preallocated >= tuple_size); + hashtable->batches[batchno].preallocated -= tuple_size; + sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue, + tuple); + } + ++hashtable->batches[batchno].ntuples; +} + +/* + * Insert a tuple into the current hash table. Unlike + * ExecParallelHashTableInsert, this version is not prepared to send the tuple + * to other batches or to run out of memory, and should only be called with + * tuples that belong in the current batch once growth has been disabled. + */ +void +ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue) +{ + MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot); + HashJoinTuple hashTuple; + dsa_pointer shared; + int batchno; + int bucketno; + + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); + Assert(batchno == hashtable->curbatch); + hashTuple = ExecParallelHashTupleAlloc(hashtable, + HJTUPLE_OVERHEAD + tuple->t_len, + &shared); + hashTuple->hashvalue = hashvalue; + memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); + HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); + ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], + hashTuple, shared); +} + +/* * ExecHashGetHashValue * Compute the hash value for a tuple * @@ -1076,11 +1923,11 @@ ExecScanHashBucket(HashJoinState *hjstate, * otherwise scan the standard hashtable bucket. */ if (hashTuple != NULL) - hashTuple = hashTuple->next; + hashTuple = hashTuple->next.unshared; else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO) hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples; else - hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; + hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; while (hashTuple != NULL) { @@ -1104,7 +1951,67 @@ ExecScanHashBucket(HashJoinState *hjstate, } } - hashTuple = hashTuple->next; + hashTuple = hashTuple->next.unshared; + } + + /* + * no match + */ + return false; +} + +/* + * ExecParallelScanHashBucket + * scan a hash bucket for matches to the current outer tuple + * + * The current outer tuple must be stored in econtext->ecxt_outertuple. + * + * On success, the inner tuple is stored into hjstate->hj_CurTuple and + * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot + * for the latter. + */ +bool +ExecParallelScanHashBucket(HashJoinState *hjstate, + ExprContext *econtext) +{ + ExprState *hjclauses = hjstate->hashclauses; + HashJoinTable hashtable = hjstate->hj_HashTable; + HashJoinTuple hashTuple = hjstate->hj_CurTuple; + uint32 hashvalue = hjstate->hj_CurHashValue; + + /* + * hj_CurTuple is the address of the tuple last returned from the current + * bucket, or NULL if it's time to start scanning a new bucket. + */ + if (hashTuple != NULL) + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); + else + hashTuple = ExecParallelHashFirstTuple(hashtable, + hjstate->hj_CurBucketNo); + + while (hashTuple != NULL) + { + if (hashTuple->hashvalue == hashvalue) + { + TupleTableSlot *inntuple; + + /* insert hashtable's tuple into exec slot so ExecQual sees it */ + inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), + hjstate->hj_HashTupleSlot, + false); /* do not pfree */ + econtext->ecxt_innertuple = inntuple; + + /* reset temp memory each time to avoid leaks from qual expr */ + ResetExprContext(econtext); + + if (ExecQual(hjclauses, econtext)) + { + hjstate->hj_CurTuple = hashTuple; + return true; + } + } + + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); } /* @@ -1155,10 +2062,10 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) * bucket. */ if (hashTuple != NULL) - hashTuple = hashTuple->next; + hashTuple = hashTuple->next.unshared; else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) { - hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; + hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; hjstate->hj_CurBucketNo++; } else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets) @@ -1194,7 +2101,7 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) return true; } - hashTuple = hashTuple->next; + hashTuple = hashTuple->next.unshared; } /* allow this loop to be cancellable */ @@ -1226,7 +2133,7 @@ ExecHashTableReset(HashJoinTable hashtable) oldcxt = MemoryContextSwitchTo(hashtable->batchCxt); /* Reallocate and reinitialize the hash bucket headers. */ - hashtable->buckets = (HashJoinTuple *) + hashtable->buckets.unshared = (HashJoinTuple *) palloc0(nbuckets * sizeof(HashJoinTuple)); hashtable->spaceUsed = 0; @@ -1250,7 +2157,8 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable) /* Reset all flags in the main table ... */ for (i = 0; i < hashtable->nbuckets; i++) { - for (tuple = hashtable->buckets[i]; tuple != NULL; tuple = tuple->next) + for (tuple = hashtable->buckets.unshared[i]; tuple != NULL; + tuple = tuple->next.unshared) HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); } @@ -1260,7 +2168,7 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable) int j = hashtable->skewBucketNums[i]; HashSkewBucket *skewBucket = hashtable->skewBucket[j]; - for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next) + for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared) HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); } } @@ -1505,8 +2413,9 @@ ExecHashSkewTableInsert(HashJoinTable hashtable, HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); /* Push it onto the front of the skew bucket's list */ - hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples; + hashTuple->next.unshared = hashtable->skewBucket[bucketNumber]->tuples; hashtable->skewBucket[bucketNumber]->tuples = hashTuple; + Assert(hashTuple != hashTuple->next.unshared); /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; @@ -1554,7 +2463,7 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) hashTuple = bucket->tuples; while (hashTuple != NULL) { - HashJoinTuple nextHashTuple = hashTuple->next; + HashJoinTuple nextHashTuple = hashTuple->next.unshared; MinimalTuple tuple; Size tupleSize; @@ -1580,8 +2489,8 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) memcpy(copyTuple, hashTuple, tupleSize); pfree(hashTuple); - copyTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = copyTuple; + copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; + hashtable->buckets.unshared[bucketno] = copyTuple; /* We have reduced skew space, but overall space doesn't change */ hashtable->spaceUsedSkew -= tupleSize; @@ -1760,11 +2669,11 @@ dense_alloc(HashJoinTable hashtable, Size size) if (hashtable->chunks != NULL) { newChunk->next = hashtable->chunks->next; - hashtable->chunks->next = newChunk; + hashtable->chunks->next.unshared = newChunk; } else { - newChunk->next = hashtable->chunks; + newChunk->next.unshared = hashtable->chunks; hashtable->chunks = newChunk; } @@ -1789,7 +2698,7 @@ dense_alloc(HashJoinTable hashtable, Size size) newChunk->used = size; newChunk->ntuples = 1; - newChunk->next = hashtable->chunks; + newChunk->next.unshared = hashtable->chunks; hashtable->chunks = newChunk; return newChunk->data; @@ -1803,3 +2712,601 @@ dense_alloc(HashJoinTable hashtable, Size size) /* return pointer to the start of the tuple memory */ return ptr; } + +/* + * Allocate space for a tuple in shared dense storage. This is equivalent to + * dense_alloc but for Parallel Hash using shared memory. + * + * While loading a tuple into shared memory, we might run out of memory and + * decide to repartition, or determine that the load factor is too high and + * decide to expand the bucket array, or discover that another participant has + * commanded us to help do that. Return NULL if number of buckets or batches + * has changed, indicating that the caller must retry (considering the + * possibility that the tuple no longer belongs in the same batch). + */ +static HashJoinTuple +ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, + dsa_pointer *shared) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + dsa_pointer chunk_shared; + HashMemoryChunk chunk; + Size chunk_size; + HashJoinTuple result; + int curbatch = hashtable->curbatch; + + size = MAXALIGN(size); + + /* + * Fast path: if there is enough space in this backend's current chunk, + * then we can allocate without any locking. + */ + chunk = hashtable->current_chunk; + if (chunk != NULL && + size < HASH_CHUNK_THRESHOLD && + chunk->maxlen - chunk->used >= size) + { + + chunk_shared = hashtable->current_chunk_shared; + Assert(chunk == dsa_get_address(hashtable->area, chunk_shared)); + *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE + chunk->used; + result = (HashJoinTuple) (chunk->data + chunk->used); + chunk->used += size; + + Assert(chunk->used <= chunk->maxlen); + Assert(result == dsa_get_address(hashtable->area, *shared)); + + return result; + } + + /* Slow path: try to allocate a new chunk. */ + LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); + + /* + * Check if we need to help increase the number of buckets or batches. + */ + if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || + pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) + { + ParallelHashGrowth growth = pstate->growth; + + hashtable->current_chunk = NULL; + LWLockRelease(&pstate->lock); + + /* Another participant has commanded us to help grow. */ + if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) + ExecParallelHashIncreaseNumBatches(hashtable); + else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) + ExecParallelHashIncreaseNumBuckets(hashtable); + + /* The caller must retry. */ + return NULL; + } + + /* Oversized tuples get their own chunk. */ + if (size > HASH_CHUNK_THRESHOLD) + chunk_size = size + HASH_CHUNK_HEADER_SIZE; + else + chunk_size = HASH_CHUNK_SIZE; + + /* Check if it's time to grow batches or buckets. */ + if (pstate->growth != PHJ_GROWTH_DISABLED) + { + Assert(curbatch == 0); + Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); + + /* + * Check if our space limit would be exceeded. To avoid choking on + * very large tuples or very low work_mem setting, we'll always allow + * each backend to allocate at least one chunk. + */ + if (hashtable->batches[0].at_least_one_chunk && + hashtable->batches[0].shared->size + + chunk_size > pstate->space_allowed) + { + pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; + hashtable->batches[0].shared->space_exhausted = true; + LWLockRelease(&pstate->lock); + + return NULL; + } + + /* Check if our load factor limit would be exceeded. */ + if (hashtable->nbatch == 1) + { + hashtable->batches[0].shared->ntuples += hashtable->batches[0].ntuples; + hashtable->batches[0].ntuples = 0; + if (hashtable->batches[0].shared->ntuples + 1 > + hashtable->nbuckets * NTUP_PER_BUCKET && + hashtable->nbuckets < (INT_MAX / 2)) + { + pstate->growth = PHJ_GROWTH_NEED_MORE_BUCKETS; + LWLockRelease(&pstate->lock); + + return NULL; + } + } + } + + /* We are cleared to allocate a new chunk. */ + chunk_shared = dsa_allocate(hashtable->area, chunk_size); + hashtable->batches[curbatch].shared->size += chunk_size; + hashtable->batches[curbatch].at_least_one_chunk = true; + + /* Set up the chunk. */ + chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared); + *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE; + chunk->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE; + chunk->used = size; + + /* + * Push it onto the list of chunks, so that it can be found if we need to + * increase the number of buckets or batches (batch 0 only) and later for + * freeing the memory (all batches). + */ + chunk->next.shared = hashtable->batches[curbatch].shared->chunks; + hashtable->batches[curbatch].shared->chunks = chunk_shared; + + if (size <= HASH_CHUNK_THRESHOLD) + { + /* + * Make this the current chunk so that we can use the fast path to + * fill the rest of it up in future calls. + */ + hashtable->current_chunk = chunk; + hashtable->current_chunk_shared = chunk_shared; + } + LWLockRelease(&pstate->lock); + + Assert(chunk->data == dsa_get_address(hashtable->area, *shared)); + result = (HashJoinTuple) chunk->data; + + return result; +} + +/* + * One backend needs to set up the shared batch state including tuplestores. + * Other backends will ensure they have correctly configured accessors by + * called ExecParallelHashEnsureBatchAccessors(). + */ +static void +ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + ParallelHashJoinBatch *batches; + MemoryContext oldcxt; + int i; + + Assert(hashtable->batches == NULL); + + /* Allocate space. */ + pstate->batches = + dsa_allocate0(hashtable->area, + EstimateParallelHashJoinBatch(hashtable) * nbatch); + pstate->nbatch = nbatch; + batches = dsa_get_address(hashtable->area, pstate->batches); + + /* Use hash join memory context. */ + oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); + + /* Allocate this backend's accessor array. */ + hashtable->nbatch = nbatch; + hashtable->batches = (ParallelHashJoinBatchAccessor *) + palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch); + + /* Set up the shared state, tuplestores and backend-local accessors. */ + for (i = 0; i < hashtable->nbatch; ++i) + { + ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; + ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); + char name[MAXPGPATH]; + + /* + * All members of shared were zero-initialized. We just need to set + * up the Barrier. + */ + BarrierInit(&shared->batch_barrier, 0); + if (i == 0) + { + /* Batch 0 doesn't need to be loaded. */ + BarrierAttach(&shared->batch_barrier); + while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING) + BarrierArriveAndWait(&shared->batch_barrier, 0); + BarrierDetach(&shared->batch_barrier); + } + + /* Initialize accessor state. All members were zero-initialized. */ + accessor->shared = shared; + + /* Initialize the shared tuplestores. */ + snprintf(name, sizeof(name), "i%dof%d", i, hashtable->nbatch); + accessor->inner_tuples = + sts_initialize(ParallelHashJoinBatchInner(shared), + pstate->nparticipants, + ParallelWorkerNumber + 1, + sizeof(uint32), + SHARED_TUPLESTORE_SINGLE_PASS, + &pstate->fileset, + name); + snprintf(name, sizeof(name), "o%dof%d", i, hashtable->nbatch); + accessor->outer_tuples = + sts_initialize(ParallelHashJoinBatchOuter(shared, + pstate->nparticipants), + pstate->nparticipants, + ParallelWorkerNumber + 1, + sizeof(uint32), + SHARED_TUPLESTORE_SINGLE_PASS, + &pstate->fileset, + name); + } + + MemoryContextSwitchTo(oldcxt); +} + +/* + * Free the current set of ParallelHashJoinBatchAccessor objects. + */ +static void +ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable) +{ + int i; + + for (i = 0; i < hashtable->nbatch; ++i) + { + /* Make sure no files are left open. */ + sts_end_write(hashtable->batches[i].inner_tuples); + sts_end_write(hashtable->batches[i].outer_tuples); + sts_end_parallel_scan(hashtable->batches[i].inner_tuples); + sts_end_parallel_scan(hashtable->batches[i].outer_tuples); + } + pfree(hashtable->batches); + hashtable->batches = NULL; +} + +/* + * Make sure this backend has up-to-date accessors for the current set of + * batches. + */ +static void +ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + ParallelHashJoinBatch *batches; + MemoryContext oldcxt; + int i; + + if (hashtable->batches != NULL) + { + if (hashtable->nbatch == pstate->nbatch) + return; + ExecParallelHashCloseBatchAccessors(hashtable); + } + + /* + * It's possible for a backend to start up very late so that the whole + * join is finished and the shm state for tracking batches has already + * been freed by ExecHashTableDetach(). In that case we'll just leave + * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives + * up early. + */ + if (!DsaPointerIsValid(pstate->batches)) + return; + + /* Use hash join memory context. */ + oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); + + /* Allocate this backend's accessor array. */ + hashtable->nbatch = pstate->nbatch; + hashtable->batches = (ParallelHashJoinBatchAccessor *) + palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch); + + /* Find the base of the pseudo-array of ParallelHashJoinBatch objects. */ + batches = (ParallelHashJoinBatch *) + dsa_get_address(hashtable->area, pstate->batches); + + /* Set up the accessor array and attach to the tuplestores. */ + for (i = 0; i < hashtable->nbatch; ++i) + { + ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; + ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); + + accessor->shared = shared; + accessor->preallocated = 0; + accessor->done = false; + accessor->inner_tuples = + sts_attach(ParallelHashJoinBatchInner(shared), + ParallelWorkerNumber + 1, + &pstate->fileset); + accessor->outer_tuples = + sts_attach(ParallelHashJoinBatchOuter(shared, + pstate->nparticipants), + ParallelWorkerNumber + 1, + &pstate->fileset); + } + + MemoryContextSwitchTo(oldcxt); +} + +/* + * Allocate an empty shared memory hash table for a given batch. + */ +void +ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno) +{ + ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared; + dsa_pointer_atomic *buckets; + int nbuckets = hashtable->parallel_state->nbuckets; + int i; + + batch->buckets = + dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets); + buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, batch->buckets); + for (i = 0; i < nbuckets; ++i) + dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); +} + +/* + * If we are currently attached to a shared hash join batch, detach. If we + * are last to detach, clean up. + */ +void +ExecHashTableDetachBatch(HashJoinTable hashtable) +{ + if (hashtable->parallel_state != NULL && + hashtable->curbatch >= 0) + { + int curbatch = hashtable->curbatch; + ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; + + /* Make sure any temporary files are closed. */ + sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); + sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); + + /* Detach from the batch we were last working on. */ + if (BarrierArriveAndDetach(&batch->batch_barrier)) + { + /* + * Technically we shouldn't access the barrier because we're no + * longer attached, but since there is no way it's moving after + * this point it seems safe to make the following assertion. + */ + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE); + + /* Free shared chunks and buckets. */ + while (DsaPointerIsValid(batch->chunks)) + { + HashMemoryChunk chunk = + dsa_get_address(hashtable->area, batch->chunks); + dsa_pointer next = chunk->next.shared; + + dsa_free(hashtable->area, batch->chunks); + batch->chunks = next; + } + if (DsaPointerIsValid(batch->buckets)) + { + dsa_free(hashtable->area, batch->buckets); + batch->buckets = InvalidDsaPointer; + } + } + ExecParallelHashUpdateSpacePeak(hashtable, curbatch); + /* Remember that we are not attached to a batch. */ + hashtable->curbatch = -1; + } +} + +/* + * Detach from all shared resources. If we are last to detach, clean up. + */ +void +ExecHashTableDetach(HashJoinTable hashtable) +{ + if (hashtable->parallel_state) + { + ParallelHashJoinState *pstate = hashtable->parallel_state; + int i; + + /* Make sure any temporary files are closed. */ + if (hashtable->batches) + { + for (i = 0; i < hashtable->nbatch; ++i) + { + sts_end_write(hashtable->batches[i].inner_tuples); + sts_end_write(hashtable->batches[i].outer_tuples); + sts_end_parallel_scan(hashtable->batches[i].inner_tuples); + sts_end_parallel_scan(hashtable->batches[i].outer_tuples); + } + } + + /* If we're last to detach, clean up shared memory. */ + if (BarrierDetach(&pstate->build_barrier)) + { + if (DsaPointerIsValid(pstate->batches)) + { + dsa_free(hashtable->area, pstate->batches); + pstate->batches = InvalidDsaPointer; + } + } + + hashtable->parallel_state = NULL; + } +} + +/* + * Get the first tuple in a given bucket identified by number. + */ +static inline HashJoinTuple +ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno) +{ + HashJoinTuple tuple; + dsa_pointer p; + + Assert(hashtable->parallel_state); + p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]); + tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p); + + return tuple; +} + +/* + * Get the next tuple in the same bucket as 'tuple'. + */ +static inline HashJoinTuple +ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple) +{ + HashJoinTuple next; + + Assert(hashtable->parallel_state); + next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared); + + return next; +} + +/* + * Insert a tuple at the front of a chain of tuples in DSA memory atomically. + */ +static inline void +ExecParallelHashPushTuple(dsa_pointer_atomic *head, + HashJoinTuple tuple, + dsa_pointer tuple_shared) +{ + for (;;) + { + tuple->next.shared = dsa_pointer_atomic_read(head); + if (dsa_pointer_atomic_compare_exchange(head, + &tuple->next.shared, + tuple_shared)) + break; + } +} + +/* + * Prepare to work on a given batch. + */ +void +ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno) +{ + Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer); + + hashtable->curbatch = batchno; + hashtable->buckets.shared = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, + hashtable->batches[batchno].shared->buckets); + hashtable->nbuckets = hashtable->parallel_state->nbuckets; + hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); + hashtable->current_chunk = NULL; + hashtable->current_chunk_shared = InvalidDsaPointer; + hashtable->batches[batchno].at_least_one_chunk = false; +} + +/* + * Take the next available chunk from the queue of chunks being worked on in + * parallel. Return NULL if there are none left. Otherwise return a pointer + * to the chunk, and set *shared to the DSA pointer to the chunk. + */ +static HashMemoryChunk +ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + HashMemoryChunk chunk; + + LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); + if (DsaPointerIsValid(pstate->chunk_work_queue)) + { + *shared = pstate->chunk_work_queue; + chunk = (HashMemoryChunk) + dsa_get_address(hashtable->area, *shared); + pstate->chunk_work_queue = chunk->next.shared; + } + else + chunk = NULL; + LWLockRelease(&pstate->lock); + + return chunk; +} + +/* + * Increase the space preallocated in this backend for a given inner batch by + * at least a given amount. This allows us to track whether a given batch + * would fit in memory when loaded back in. Also increase the number of + * batches or buckets if required. + * + * This maintains a running estimation of how much space will be taken when we + * load the batch back into memory by simulating the way chunks will be handed + * out to workers. It's not perfectly accurate because the tuples will be + * packed into memory chunks differently by ExecParallelHashTupleAlloc(), but + * it should be pretty close. It tends to overestimate by a fraction of a + * chunk per worker since all workers gang up to preallocate during hashing, + * but workers tend to reload batches alone if there are enough to go around, + * leaving fewer partially filled chunks. This effect is bounded by + * nparticipants. + * + * Return false if the number of batches or buckets has changed, and the + * caller should reconsider which batch a given tuple now belongs in and call + * again. + */ +static bool +ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size) +{ + ParallelHashJoinState *pstate = hashtable->parallel_state; + ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno]; + size_t want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE); + + Assert(batchno > 0); + Assert(batchno < hashtable->nbatch); + + LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); + + /* Has another participant commanded us to help grow? */ + if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || + pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) + { + ParallelHashGrowth growth = pstate->growth; + + LWLockRelease(&pstate->lock); + if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) + ExecParallelHashIncreaseNumBatches(hashtable); + else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) + ExecParallelHashIncreaseNumBuckets(hashtable); + + return false; + } + + if (pstate->growth != PHJ_GROWTH_DISABLED && + batch->at_least_one_chunk && + (batch->shared->estimated_size + size > pstate->space_allowed)) + { + /* + * We have determined that this batch would exceed the space budget if + * loaded into memory. Command all participants to help repartition. + */ + batch->shared->space_exhausted = true; + pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; + LWLockRelease(&pstate->lock); + + return false; + } + + batch->at_least_one_chunk = true; + batch->shared->estimated_size += want + HASH_CHUNK_HEADER_SIZE; + batch->preallocated = want; + LWLockRelease(&pstate->lock); + + return true; +} + +/* + * Update this backend's copy of hashtable->spacePeak to account for a given + * batch. This is called at the end of hashing for batch 0, and then for each + * batch when it is done or discovered to be already done. The result is used + * for EXPLAIN output. + */ +void +ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno) +{ + size_t size; + + size = hashtable->batches[batchno].shared->size; + size += sizeof(dsa_pointer_atomic) * hashtable->nbuckets; + hashtable->spacePeak = Max(hashtable->spacePeak, size); +} diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index ab1632cc13d..5d1dc1f401e 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -10,18 +10,112 @@ * IDENTIFICATION * src/backend/executor/nodeHashjoin.c * + * PARALLELISM + * + * Hash joins can participate in parallel query execution in several ways. A + * parallel-oblivious hash join is one where the node is unaware that it is + * part of a parallel plan. In this case, a copy of the inner plan is used to + * build a copy of the hash table in every backend, and the outer plan could + * either be built from a partial or complete path, so that the results of the + * hash join are correspondingly either partial or complete. A parallel-aware + * hash join is one that behaves differently, coordinating work between + * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel + * Hash Join always appears with a Parallel Hash node. + * + * Parallel-aware hash joins use the same per-backend state machine to track + * progress through the hash join algorithm as parallel-oblivious hash joins. + * In a parallel-aware hash join, there is also a shared state machine that + * co-operating backends use to synchronize their local state machines and + * program counters. The shared state machine is managed with a Barrier IPC + * primitive. When all attached participants arrive at a barrier, the phase + * advances and all waiting participants are released. + * + * When a participant begins working on a parallel hash join, it must first + * figure out how much progress has already been made, because participants + * don't wait for each other to begin. For this reason there are switch + * statements at key points in the code where we have to synchronize our local + * state machine with the phase, and then jump to the correct part of the + * algorithm so that we can get started. + * + * One barrier called build_barrier is used to coordinate the hashing phases. + * The phase is represented by an integer which begins at zero and increments + * one by one, but in the code it is referred to by symbolic names as follows: + * + * PHJ_BUILD_ELECTING -- initial state + * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0 + * PHJ_BUILD_HASHING_INNER -- all hash the inner rel + * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer + * PHJ_BUILD_DONE -- building done, probing can begin + * + * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may + * be used repeatedly as required to coordinate expansions in the number of + * batches or buckets. Their phases are as follows: + * + * PHJ_GROW_BATCHES_ELECTING -- initial state + * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches + * PHJ_GROW_BATCHES_REPARTITIONING -- all repartition + * PHJ_GROW_BATCHES_FINISHING -- one cleans up, detects skew + * + * PHJ_GROW_BUCKETS_ELECTING -- initial state + * PHJ_GROW_BUCKETS_ALLOCATING -- one allocates new buckets + * PHJ_GROW_BUCKETS_REINSERTING -- all insert tuples + * + * If the planner got the number of batches and buckets right, those won't be + * necessary, but on the other hand we might finish up needing to expand the + * buckets or batches multiple times while hashing the inner relation to stay + * within our memory budget and load factor target. For that reason it's a + * separate pair of barriers using circular phases. + * + * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins, + * because we need to divide the outer relation into batches up front in order + * to be able to process batches entirely independently. In contrast, the + * parallel-oblivious algorithm simply throws tuples 'forward' to 'later' + * batches whenever it encounters them while scanning and probing, which it + * can do because it processes batches in serial order. + * + * Once PHJ_BUILD_DONE is reached, backends then split up and process + * different batches, or gang up and work together on probing batches if there + * aren't enough to go around. For each batch there is a separate barrier + * with the following phases: + * + * PHJ_BATCH_ELECTING -- initial state + * PHJ_BATCH_ALLOCATING -- one allocates buckets + * PHJ_BATCH_LOADING -- all load the hash table from disk + * PHJ_BATCH_PROBING -- all probe + * PHJ_BATCH_DONE -- end + * + * Batch 0 is a special case, because it starts out in phase + * PHJ_BATCH_PROBING; populating batch 0's hash table is done during + * PHJ_BUILD_HASHING_INNER so we can skip loading. + * + * Initially we try to plan for a single-batch hash join using the combined + * work_mem of all participants to create a large shared hash table. If that + * turns out either at planning or execution time to be impossible then we + * fall back to regular work_mem sized hash tables. + * + * To avoid deadlocks, we never wait for any barrier unless it is known that + * all other backends attached to it are actively executing the node or have + * already arrived. Practically, that means that we never return a tuple + * while attached to a barrier, unless the barrier has reached its final + * state. In the slightly special case of the per-batch barrier, we return + * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use + * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting. + * *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "executor/executor.h" #include "executor/hashjoin.h" #include "executor/nodeHash.h" #include "executor/nodeHashjoin.h" #include "miscadmin.h" +#include "pgstat.h" #include "utils/memutils.h" +#include "utils/sharedtuplestore.h" /* @@ -42,24 +136,34 @@ static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue); +static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, + HashJoinState *hjstate, + uint32 *hashvalue); static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot); static bool ExecHashJoinNewBatch(HashJoinState *hjstate); +static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate); +static void ExecParallelHashJoinPartitionOuter(HashJoinState *node); /* ---------------------------------------------------------------- - * ExecHashJoin + * ExecHashJoinImpl * - * This function implements the Hybrid Hashjoin algorithm. + * This function implements the Hybrid Hashjoin algorithm. It is marked + * with an always-inline attribute so that ExecHashJoin() and + * ExecParallelHashJoin() can inline it. Compilers that respect the + * attribute should create versions specialized for parallel == true and + * parallel == false with unnecessary branches removed. * * Note: the relation we build hash table on is the "inner" * the other one is "outer". * ---------------------------------------------------------------- */ -static TupleTableSlot * /* return: a tuple or NULL */ -ExecHashJoin(PlanState *pstate) +pg_attribute_always_inline +static inline TupleTableSlot * +ExecHashJoinImpl(PlanState *pstate, bool parallel) { HashJoinState *node = castNode(HashJoinState, pstate); PlanState *outerNode; @@ -71,6 +175,7 @@ ExecHashJoin(PlanState *pstate) TupleTableSlot *outerTupleSlot; uint32 hashvalue; int batchno; + ParallelHashJoinState *parallel_state; /* * get information from HashJoin node @@ -81,6 +186,7 @@ ExecHashJoin(PlanState *pstate) outerNode = outerPlanState(node); hashtable = node->hj_HashTable; econtext = node->js.ps.ps_ExprContext; + parallel_state = hashNode->parallel_state; /* * Reset per-tuple memory context to free any expression evaluation @@ -138,6 +244,18 @@ ExecHashJoin(PlanState *pstate) /* no chance to not build the hash table */ node->hj_FirstOuterTupleSlot = NULL; } + else if (parallel) + { + /* + * The empty-outer optimization is not implemented for + * shared hash tables, because no one participant can + * determine that there are no outer tuples, and it's not + * yet clear that it's worth the synchronization overhead + * of reaching consensus to figure that out. So we have + * to build the hash table. + */ + node->hj_FirstOuterTupleSlot = NULL; + } else if (HJ_FILL_OUTER(node) || (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost && !node->hj_OuterNotEmpty)) @@ -155,15 +273,19 @@ ExecHashJoin(PlanState *pstate) node->hj_FirstOuterTupleSlot = NULL; /* - * create the hash table + * Create the hash table. If using Parallel Hash, then + * whoever gets here first will create the hash table and any + * later arrivals will merely attach to it. */ - hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan, + hashtable = ExecHashTableCreate(hashNode, node->hj_HashOperators, HJ_FILL_INNER(node)); node->hj_HashTable = hashtable; /* - * execute the Hash node, to build the hash table + * Execute the Hash node, to build the hash table. If using + * Parallel Hash, then we'll try to help hashing unless we + * arrived too late. */ hashNode->hashtable = hashtable; (void) MultiExecProcNode((PlanState *) hashNode); @@ -189,7 +311,34 @@ ExecHashJoin(PlanState *pstate) */ node->hj_OuterNotEmpty = false; - node->hj_JoinState = HJ_NEED_NEW_OUTER; + if (parallel) + { + Barrier *build_barrier; + + build_barrier = ¶llel_state->build_barrier; + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER || + BarrierPhase(build_barrier) == PHJ_BUILD_DONE); + if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER) + { + /* + * If multi-batch, we need to hash the outer relation + * up front. + */ + if (hashtable->nbatch > 1) + ExecParallelHashJoinPartitionOuter(node); + BarrierArriveAndWait(build_barrier, + WAIT_EVENT_HASH_BUILD_HASHING_OUTER); + } + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE); + + /* Each backend should now select a batch to work on. */ + hashtable->curbatch = -1; + node->hj_JoinState = HJ_NEED_NEW_BATCH; + + continue; + } + else + node->hj_JoinState = HJ_NEED_NEW_OUTER; /* FALL THRU */ @@ -198,9 +347,14 @@ ExecHashJoin(PlanState *pstate) /* * We don't have an outer tuple, try to get the next one */ - outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode, - node, - &hashvalue); + if (parallel) + outerTupleSlot = + ExecParallelHashJoinOuterGetTuple(outerNode, node, + &hashvalue); + else + outerTupleSlot = + ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue); + if (TupIsNull(outerTupleSlot)) { /* end of batch, or maybe whole join */ @@ -240,10 +394,12 @@ ExecHashJoin(PlanState *pstate) * Need to postpone this outer tuple to a later batch. * Save it in the corresponding outer-batch file. */ + Assert(parallel_state == NULL); Assert(batchno > hashtable->curbatch); ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot), hashvalue, &hashtable->outerBatchFile[batchno]); + /* Loop around, staying in HJ_NEED_NEW_OUTER state */ continue; } @@ -258,11 +414,23 @@ ExecHashJoin(PlanState *pstate) /* * Scan the selected hash bucket for matches to current outer */ - if (!ExecScanHashBucket(node, econtext)) + if (parallel) { - /* out of matches; check for possible outer-join fill */ - node->hj_JoinState = HJ_FILL_OUTER_TUPLE; - continue; + if (!ExecParallelScanHashBucket(node, econtext)) + { + /* out of matches; check for possible outer-join fill */ + node->hj_JoinState = HJ_FILL_OUTER_TUPLE; + continue; + } + } + else + { + if (!ExecScanHashBucket(node, econtext)) + { + /* out of matches; check for possible outer-join fill */ + node->hj_JoinState = HJ_FILL_OUTER_TUPLE; + continue; + } } /* @@ -362,8 +530,16 @@ ExecHashJoin(PlanState *pstate) /* * Try to advance to next batch. Done if there are no more. */ - if (!ExecHashJoinNewBatch(node)) - return NULL; /* end of join */ + if (parallel) + { + if (!ExecParallelHashJoinNewBatch(node)) + return NULL; /* end of parallel-aware join */ + } + else + { + if (!ExecHashJoinNewBatch(node)) + return NULL; /* end of parallel-oblivious join */ + } node->hj_JoinState = HJ_NEED_NEW_OUTER; break; @@ -375,6 +551,38 @@ ExecHashJoin(PlanState *pstate) } /* ---------------------------------------------------------------- + * ExecHashJoin + * + * Parallel-oblivious version. + * ---------------------------------------------------------------- + */ +static TupleTableSlot * /* return: a tuple or NULL */ +ExecHashJoin(PlanState *pstate) +{ + /* + * On sufficiently smart compilers this should be inlined with the + * parallel-aware branches removed. + */ + return ExecHashJoinImpl(pstate, false); +} + +/* ---------------------------------------------------------------- + * ExecParallelHashJoin + * + * Parallel-aware version. + * ---------------------------------------------------------------- + */ +static TupleTableSlot * /* return: a tuple or NULL */ +ExecParallelHashJoin(PlanState *pstate) +{ + /* + * On sufficiently smart compilers this should be inlined with the + * parallel-oblivious branches removed. + */ + return ExecHashJoinImpl(pstate, true); +} + +/* ---------------------------------------------------------------- * ExecInitHashJoin * * Init routine for HashJoin node. @@ -400,6 +608,12 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) hjstate = makeNode(HashJoinState); hjstate->js.ps.plan = (Plan *) node; hjstate->js.ps.state = estate; + + /* + * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker() + * where this function may be replaced with a parallel version, if we + * managed to launch a parallel query. + */ hjstate->js.ps.ExecProcNode = ExecHashJoin; /* @@ -581,9 +795,9 @@ ExecEndHashJoin(HashJoinState *node) /* * ExecHashJoinOuterGetTuple * - * get the next outer tuple for hashjoin: either by - * executing the outer plan node in the first pass, or from - * the temp files for the hashjoin batches. + * get the next outer tuple for a parallel oblivious hashjoin: either by + * executing the outer plan node in the first pass, or from the temp + * files for the hashjoin batches. * * Returns a null slot if no more outer tuples (within the current batch). * @@ -662,6 +876,67 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, } /* + * ExecHashJoinOuterGetTuple variant for the parallel case. + */ +static TupleTableSlot * +ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, + HashJoinState *hjstate, + uint32 *hashvalue) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + TupleTableSlot *slot; + + /* + * In the Parallel Hash case we only run the outer plan directly for + * single-batch hash joins. Otherwise we have to go to batch files, even + * for batch 0. + */ + if (curbatch == 0 && hashtable->nbatch == 1) + { + slot = ExecProcNode(outerNode); + + while (!TupIsNull(slot)) + { + ExprContext *econtext = hjstate->js.ps.ps_ExprContext; + + econtext->ecxt_outertuple = slot; + if (ExecHashGetHashValue(hashtable, econtext, + hjstate->hj_OuterHashKeys, + true, /* outer tuple */ + HJ_FILL_OUTER(hjstate), + hashvalue)) + return slot; + + /* + * That tuple couldn't match because of a NULL, so discard it and + * continue with the next one. + */ + slot = ExecProcNode(outerNode); + } + } + else if (curbatch < hashtable->nbatch) + { + MinimalTuple tuple; + + tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples, + hashvalue); + if (tuple != NULL) + { + slot = ExecStoreMinimalTuple(tuple, + hjstate->hj_OuterTupleSlot, + false); + return slot; + } + else + ExecClearTuple(hjstate->hj_OuterTupleSlot); + } + + /* End of this batch */ + return NULL; +} + +/* * ExecHashJoinNewBatch * switch to a new hashjoin batch * @@ -804,6 +1079,135 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) } /* + * Choose a batch to work on, and attach to it. Returns true if successful, + * false if there are no more batches. + */ +static bool +ExecParallelHashJoinNewBatch(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int start_batchno; + int batchno; + + /* + * If we started up so late that the batch tracking array has been freed + * already by ExecHashTableDetach(), then we are finished. See also + * ExecParallelHashEnsureBatchAccessors(). + */ + if (hashtable->batches == NULL) + return false; + + /* + * If we were already attached to a batch, remember not to bother checking + * it again, and detach from it (possibly freeing the hash table if we are + * last to detach). + */ + if (hashtable->curbatch >= 0) + { + hashtable->batches[hashtable->curbatch].done = true; + ExecHashTableDetachBatch(hashtable); + } + + /* + * Search for a batch that isn't done. We use an atomic counter to start + * our search at a different batch in every participant when there are + * more batches than participants. + */ + batchno = start_batchno = + pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) % + hashtable->nbatch; + do + { + uint32 hashvalue; + MinimalTuple tuple; + TupleTableSlot *slot; + + if (!hashtable->batches[batchno].done) + { + SharedTuplestoreAccessor *inner_tuples; + Barrier *batch_barrier = + &hashtable->batches[batchno].shared->batch_barrier; + + switch (BarrierAttach(batch_barrier)) + { + case PHJ_BATCH_ELECTING: + + /* One backend allocates the hash table. */ + if (BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_ELECTING)) + ExecParallelHashTableAlloc(hashtable, batchno); + /* Fall through. */ + + case PHJ_BATCH_ALLOCATING: + /* Wait for allocation to complete. */ + BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_ALLOCATING); + /* Fall through. */ + + case PHJ_BATCH_LOADING: + /* Start (or join in) loading tuples. */ + ExecParallelHashTableSetCurrentBatch(hashtable, batchno); + inner_tuples = hashtable->batches[batchno].inner_tuples; + sts_begin_parallel_scan(inner_tuples); + while ((tuple = sts_parallel_scan_next(inner_tuples, + &hashvalue))) + { + slot = ExecStoreMinimalTuple(tuple, + hjstate->hj_HashTupleSlot, + false); + ExecParallelHashTableInsertCurrentBatch(hashtable, slot, + hashvalue); + } + sts_end_parallel_scan(inner_tuples); + BarrierArriveAndWait(batch_barrier, + WAIT_EVENT_HASH_BATCH_LOADING); + /* Fall through. */ + + case PHJ_BATCH_PROBING: + + /* + * This batch is ready to probe. Return control to + * caller. We stay attached to batch_barrier so that the + * hash table stays alive until everyone's finished + * probing it, but no participant is allowed to wait at + * this barrier again (or else a deadlock could occur). + * All attached participants must eventually call + * BarrierArriveAndDetach() so that the final phase + * PHJ_BATCH_DONE can be reached. + */ + ExecParallelHashTableSetCurrentBatch(hashtable, batchno); + sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples); + return true; + + case PHJ_BATCH_DONE: + + /* + * Already done. Detach and go around again (if any + * remain). + */ + BarrierDetach(batch_barrier); + + /* + * We didn't work on this batch, but we need to observe + * its size for EXPLAIN. + */ + ExecParallelHashUpdateSpacePeak(hashtable, batchno); + hashtable->batches[batchno].done = true; + hashtable->curbatch = -1; + break; + + default: + elog(ERROR, "unexpected batch phase %d", + BarrierPhase(batch_barrier)); + } + } + batchno = (batchno + 1) % hashtable->nbatch; + } while (batchno != start_batchno); + + return false; +} + +/* * ExecHashJoinSaveTuple * save a tuple to a batch file. * @@ -964,3 +1368,176 @@ ExecReScanHashJoin(HashJoinState *node) if (node->js.ps.lefttree->chgParam == NULL) ExecReScan(node->js.ps.lefttree); } + +void +ExecShutdownHashJoin(HashJoinState *node) +{ + if (node->hj_HashTable) + { + /* + * Detach from shared state before DSM memory goes away. This makes + * sure that we don't have any pointers into DSM memory by the time + * ExecEndHashJoin runs. + */ + ExecHashTableDetachBatch(node->hj_HashTable); + ExecHashTableDetach(node->hj_HashTable); + } +} + +static void +ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate) +{ + PlanState *outerState = outerPlanState(hjstate); + ExprContext *econtext = hjstate->js.ps.ps_ExprContext; + HashJoinTable hashtable = hjstate->hj_HashTable; + TupleTableSlot *slot; + uint32 hashvalue; + int i; + + Assert(hjstate->hj_FirstOuterTupleSlot == NULL); + + /* Execute outer plan, writing all tuples to shared tuplestores. */ + for (;;) + { + slot = ExecProcNode(outerState); + if (TupIsNull(slot)) + break; + econtext->ecxt_outertuple = slot; + if (ExecHashGetHashValue(hashtable, econtext, + hjstate->hj_OuterHashKeys, + true, /* outer tuple */ + false, /* outer join, currently unsupported */ + &hashvalue)) + { + int batchno; + int bucketno; + + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, + &batchno); + sts_puttuple(hashtable->batches[batchno].outer_tuples, + &hashvalue, ExecFetchSlotMinimalTuple(slot)); + } + CHECK_FOR_INTERRUPTS(); + } + + /* Make sure all outer partitions are readable by any backend. */ + for (i = 0; i < hashtable->nbatch; ++i) + sts_end_write(hashtable->batches[i].outer_tuples); +} + +void +ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt) +{ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState)); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +void +ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt) +{ + int plan_node_id = state->js.ps.plan->plan_node_id; + HashState *hashNode; + ParallelHashJoinState *pstate; + + /* + * Disable shared hash table mode if we failed to create a real DSM + * segment, because that means that we don't have a DSA area to work with. + */ + if (pcxt->seg == NULL) + return; + + ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); + + /* + * Set up the state needed to coordinate access to the shared hash + * table(s), using the plan node ID as the toc key. + */ + pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState)); + shm_toc_insert(pcxt->toc, plan_node_id, pstate); + + /* + * Set up the shared hash join state with no batches initially. + * ExecHashTableCreate() will prepare at least one later and set nbatch + * and space_allowed. + */ + pstate->nbatch = 0; + pstate->space_allowed = 0; + pstate->batches = InvalidDsaPointer; + pstate->old_batches = InvalidDsaPointer; + pstate->nbuckets = 0; + pstate->growth = PHJ_GROWTH_OK; + pstate->chunk_work_queue = InvalidDsaPointer; + pg_atomic_init_u32(&pstate->distributor, 0); + pstate->nparticipants = pcxt->nworkers + 1; + pstate->total_tuples = 0; + LWLockInitialize(&pstate->lock, + LWTRANCHE_PARALLEL_HASH_JOIN); + BarrierInit(&pstate->build_barrier, 0); + BarrierInit(&pstate->grow_batches_barrier, 0); + BarrierInit(&pstate->grow_buckets_barrier, 0); + + /* Set up the space we'll use for shared temporary files. */ + SharedFileSetInit(&pstate->fileset, pcxt->seg); + + /* Initialize the shared state in the hash node. */ + hashNode = (HashState *) innerPlanState(state); + hashNode->parallel_state = pstate; +} + +/* ---------------------------------------------------------------- + * ExecHashJoinReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ +void +ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt) +{ + int plan_node_id = state->js.ps.plan->plan_node_id; + ParallelHashJoinState *pstate = + shm_toc_lookup(cxt->toc, plan_node_id, false); + + /* + * It would be possible to reuse the shared hash table in single-batch + * cases by resetting and then fast-forwarding build_barrier to + * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but + * currently shared hash tables are already freed by now (by the last + * participant to detach from the batch). We could consider keeping it + * around for single-batch joins. We'd also need to adjust + * finalize_plan() so that it doesn't record a dummy dependency for + * Parallel Hash nodes, preventing the rescan optimization. For now we + * don't try. + */ + + /* Detach, freeing any remaining shared memory. */ + if (state->hj_HashTable != NULL) + { + ExecHashTableDetachBatch(state->hj_HashTable); + ExecHashTableDetach(state->hj_HashTable); + } + + /* Clear any shared batch files. */ + SharedFileSetDeleteAll(&pstate->fileset); + + /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */ + BarrierInit(&pstate->build_barrier, 0); +} + +void +ExecHashJoinInitializeWorker(HashJoinState *state, + ParallelWorkerContext *pwcxt) +{ + HashState *hashNode; + int plan_node_id = state->js.ps.plan->plan_node_id; + ParallelHashJoinState *pstate = + shm_toc_lookup(pwcxt->toc, plan_node_id, false); + + /* Attach to the space for shared temporary files. */ + SharedFileSetAttach(&pstate->fileset, pwcxt->seg); + + /* Attach to the shared state in the hash node. */ + hashNode = (HashState *) innerPlanState(state); + hashNode->parallel_state = pstate; + + ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); +} |