diff options
Diffstat (limited to 'src/backend/executor/nodeHash.c')
-rw-r--r-- | src/backend/executor/nodeHash.c | 445 |
1 files changed, 435 insertions, 10 deletions
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 83be33b999a..dd97ef45725 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/executor/nodeHash.c,v 1.117 2009/01/01 17:23:41 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/executor/nodeHash.c,v 1.118 2009/03/21 00:04:38 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -24,6 +24,7 @@ #include <math.h> #include <limits.h> +#include "catalog/pg_statistic.h" #include "commands/tablespace.h" #include "executor/execdebug.h" #include "executor/hashjoin.h" @@ -35,9 +36,17 @@ #include "utils/dynahash.h" #include "utils/memutils.h" #include "utils/lsyscache.h" +#include "utils/syscache.h" static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); +static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, + int mcvsToUse); +static void ExecHashSkewTableInsert(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue, + int bucketNumber); +static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable); /* ---------------------------------------------------------------- @@ -99,7 +108,20 @@ MultiExecHash(HashState *node) if (ExecHashGetHashValue(hashtable, econtext, hashkeys, false, false, &hashvalue)) { - ExecHashTableInsert(hashtable, slot, hashvalue); + int bucketNumber; + + bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue); + if (bucketNumber != INVALID_SKEW_BUCKET_NO) + { + /* It's a skew tuple, so put it into that hash table */ + ExecHashSkewTableInsert(hashtable, slot, hashvalue, + bucketNumber); + } + else + { + /* Not subject to skew optimization, so insert normally */ + ExecHashTableInsert(hashtable, slot, hashvalue); + } hashtable->totalTuples += 1; } } @@ -225,6 +247,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators) Plan *outerNode; int nbuckets; int nbatch; + int num_skew_mcvs; int log2_nbuckets; int nkeys; int i; @@ -239,7 +262,8 @@ ExecHashTableCreate(Hash *node, List *hashOperators) outerNode = outerPlan(node); ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width, - &nbuckets, &nbatch); + OidIsValid(node->skewTable), + &nbuckets, &nbatch, &num_skew_mcvs); #ifdef HJDEBUG printf("nbatch = %d, nbuckets = %d\n", nbatch, nbuckets); @@ -259,6 +283,11 @@ ExecHashTableCreate(Hash *node, List *hashOperators) hashtable->nbuckets = nbuckets; hashtable->log2_nbuckets = log2_nbuckets; hashtable->buckets = NULL; + hashtable->skewEnabled = false; + hashtable->skewBucket = NULL; + hashtable->skewBucketLen = 0; + hashtable->nSkewBuckets = 0; + hashtable->skewBucketNums = NULL; hashtable->nbatch = nbatch; hashtable->curbatch = 0; hashtable->nbatch_original = nbatch; @@ -269,6 +298,9 @@ ExecHashTableCreate(Hash *node, List *hashOperators) hashtable->outerBatchFile = NULL; hashtable->spaceUsed = 0; hashtable->spaceAllowed = work_mem * 1024L; + hashtable->spaceUsedSkew = 0; + hashtable->spaceAllowedSkew = + hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100; /* * Get info about the hash functions to be used for each hash key. Also @@ -339,6 +371,13 @@ ExecHashTableCreate(Hash *node, List *hashOperators) hashtable->buckets = (HashJoinTuple *) palloc0(nbuckets * sizeof(HashJoinTuple)); + /* + * Set up for skew optimization, if possible and there's a need for more + * than one batch. (In a one-batch join, there's no point in it.) + */ + if (nbatch > 1) + ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); + MemoryContextSwitchTo(oldcxt); return hashtable; @@ -356,13 +395,15 @@ ExecHashTableCreate(Hash *node, List *hashOperators) #define NTUP_PER_BUCKET 10 void -ExecChooseHashTableSize(double ntuples, int tupwidth, +ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, int *numbuckets, - int *numbatches) + int *numbatches, + int *num_skew_mcvs) { int tupsize; double inner_rel_bytes; long hash_table_bytes; + long skew_table_bytes; int nbatch; int nbuckets; int i; @@ -387,6 +428,41 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, hash_table_bytes = work_mem * 1024L; /* + * If skew optimization is possible, estimate the number of skew buckets + * that will fit in the memory allowed, and decrement the assumed space + * available for the main hash table accordingly. + * + * We make the optimistic assumption that each skew bucket will contain + * one inner-relation tuple. If that turns out to be low, we will recover + * at runtime by reducing the number of skew buckets. + * + * hashtable->skewBucket will have up to 8 times as many HashSkewBucket + * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash + * will round up to the next power of 2 and then multiply by 4 to reduce + * collisions. + */ + if (useskew) + { + skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100; + + *num_skew_mcvs = skew_table_bytes / ( + /* size of a hash tuple */ + tupsize + + /* worst-case size of skewBucket[] per MCV */ + (8 * sizeof(HashSkewBucket *)) + + /* size of skewBucketNums[] entry */ + sizeof(int) + + /* size of skew bucket struct itself */ + SKEW_BUCKET_OVERHEAD + ); + + if (*num_skew_mcvs > 0) + hash_table_bytes -= skew_table_bytes; + } + else + *num_skew_mcvs = 0; + + /* * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when * memory is filled. Set nbatch to the smallest power of 2 that appears * sufficient. @@ -813,13 +889,18 @@ ExecScanHashBucket(HashJoinState *hjstate, uint32 hashvalue = hjstate->hj_CurHashValue; /* - * hj_CurTuple is NULL to start scanning a new bucket, or the address of - * the last tuple returned from the current bucket. + * hj_CurTuple is the address of the tuple last returned from the current + * bucket, or NULL if it's time to start scanning a new bucket. + * + * If the tuple hashed to a skew bucket then scan the skew bucket + * otherwise scan the standard hashtable bucket. */ - if (hashTuple == NULL) - hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; - else + if (hashTuple != NULL) hashTuple = hashTuple->next; + else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO) + hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples; + else + hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; while (hashTuple != NULL) { @@ -889,3 +970,347 @@ ExecReScanHash(HashState *node, ExprContext *exprCtxt) if (((PlanState *) node)->lefttree->chgParam == NULL) ExecReScan(((PlanState *) node)->lefttree, exprCtxt); } + + +/* + * ExecHashBuildSkewHash + * + * Set up for skew optimization if we can identify the most common values + * (MCVs) of the outer relation's join key. We make a skew hash bucket + * for the hash value of each MCV, up to the number of slots allowed + * based on available memory. + */ +static void +ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse) +{ + HeapTupleData *statsTuple; + Datum *values; + int nvalues; + float4 *numbers; + int nnumbers; + + /* Do nothing if planner didn't identify the outer relation's join key */ + if (!OidIsValid(node->skewTable)) + return; + /* Also, do nothing if we don't have room for at least one skew bucket */ + if (mcvsToUse <= 0) + return; + + /* + * Try to find the MCV statistics for the outer relation's join key. + */ + statsTuple = SearchSysCache(STATRELATT, + ObjectIdGetDatum(node->skewTable), + Int16GetDatum(node->skewColumn), + 0, 0); + if (!HeapTupleIsValid(statsTuple)) + return; + + if (get_attstatsslot(statsTuple, node->skewColType, node->skewColTypmod, + STATISTIC_KIND_MCV, InvalidOid, + &values, &nvalues, + &numbers, &nnumbers)) + { + double frac; + int nbuckets; + FmgrInfo *hashfunctions; + int i; + + if (mcvsToUse > nvalues) + mcvsToUse = nvalues; + + /* + * Calculate the expected fraction of outer relation that will + * participate in the skew optimization. If this isn't at least + * SKEW_MIN_OUTER_FRACTION, don't use skew optimization. + */ + frac = 0; + for (i = 0; i < mcvsToUse; i++) + frac += numbers[i]; + if (frac < SKEW_MIN_OUTER_FRACTION) + { + free_attstatsslot(node->skewColType, + values, nvalues, numbers, nnumbers); + ReleaseSysCache(statsTuple); + return; + } + + /* + * Okay, set up the skew hashtable. + * + * skewBucket[] is an open addressing hashtable with a power of 2 size + * that is greater than the number of MCV values. (This ensures there + * will be at least one null entry, so searches will always terminate.) + * + * Note: this code could fail if mcvsToUse exceeds INT_MAX/8, but + * that is not currently possible since we limit pg_statistic entries + * to much less than that. + */ + nbuckets = 2; + while (nbuckets <= mcvsToUse) + nbuckets <<= 1; + /* use two more bits just to help avoid collisions */ + nbuckets <<= 2; + + hashtable->skewEnabled = true; + hashtable->skewBucketLen = nbuckets; + + /* + * We allocate the bucket memory in the hashtable's batch context. + * It is only needed during the first batch, and this ensures it + * will be automatically removed once the first batch is done. + */ + hashtable->skewBucket = (HashSkewBucket **) + MemoryContextAllocZero(hashtable->batchCxt, + nbuckets * sizeof(HashSkewBucket *)); + hashtable->skewBucketNums = (int *) + MemoryContextAllocZero(hashtable->batchCxt, + mcvsToUse * sizeof(int)); + + hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *) + + mcvsToUse * sizeof(int); + hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *) + + mcvsToUse * sizeof(int); + + /* + * Create a skew bucket for each MCV hash value. + * + * Note: it is very important that we create the buckets in order + * of decreasing MCV frequency. If we have to remove some buckets, + * they must be removed in reverse order of creation (see notes in + * ExecHashRemoveNextSkewBucket) and we want the least common MCVs + * to be removed first. + */ + hashfunctions = hashtable->outer_hashfunctions; + + for (i = 0; i < mcvsToUse; i++) + { + uint32 hashvalue; + int bucket; + + hashvalue = DatumGetUInt32(FunctionCall1(&hashfunctions[0], + values[i])); + + /* + * While we have not hit a hole in the hashtable and have not hit + * the desired bucket, we have collided with some previous hash + * value, so try the next bucket location. NB: this code must + * match ExecHashGetSkewBucket. + */ + bucket = hashvalue & (nbuckets - 1); + while (hashtable->skewBucket[bucket] != NULL && + hashtable->skewBucket[bucket]->hashvalue != hashvalue) + bucket = (bucket + 1) & (nbuckets - 1); + + /* + * If we found an existing bucket with the same hashvalue, + * leave it alone. It's okay for two MCVs to share a hashvalue. + */ + if (hashtable->skewBucket[bucket] != NULL) + continue; + + /* Okay, create a new skew bucket for this hashvalue. */ + hashtable->skewBucket[bucket] = (HashSkewBucket *) + MemoryContextAlloc(hashtable->batchCxt, + sizeof(HashSkewBucket)); + hashtable->skewBucket[bucket]->hashvalue = hashvalue; + hashtable->skewBucket[bucket]->tuples = NULL; + hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket; + hashtable->nSkewBuckets++; + hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD; + hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD; + } + + free_attstatsslot(node->skewColType, + values, nvalues, numbers, nnumbers); + } + + ReleaseSysCache(statsTuple); +} + +/* + * ExecHashGetSkewBucket + * + * Returns the index of the skew bucket for this hashvalue, + * or INVALID_SKEW_BUCKET_NO if the hashvalue is not + * associated with any active skew bucket. + */ +int +ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue) +{ + int bucket; + + /* + * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization + * (in particular, this happens after the initial batch is done). + */ + if (!hashtable->skewEnabled) + return INVALID_SKEW_BUCKET_NO; + + /* + * Since skewBucketLen is a power of 2, we can do a modulo by ANDing. + */ + bucket = hashvalue & (hashtable->skewBucketLen - 1); + + /* + * While we have not hit a hole in the hashtable and have not hit the + * desired bucket, we have collided with some other hash value, so try + * the next bucket location. + */ + while (hashtable->skewBucket[bucket] != NULL && + hashtable->skewBucket[bucket]->hashvalue != hashvalue) + bucket = (bucket + 1) & (hashtable->skewBucketLen - 1); + + /* + * Found the desired bucket? + */ + if (hashtable->skewBucket[bucket] != NULL) + return bucket; + + /* + * There must not be any hashtable entry for this hash value. + */ + return INVALID_SKEW_BUCKET_NO; +} + +/* + * ExecHashSkewTableInsert + * + * Insert a tuple into the skew hashtable. + * + * This should generally match up with the current-batch case in + * ExecHashTableInsert. + */ +static void +ExecHashSkewTableInsert(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue, + int bucketNumber) +{ + MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot); + HashJoinTuple hashTuple; + int hashTupleSize; + + /* Create the HashJoinTuple */ + hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; + hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt, + hashTupleSize); + hashTuple->hashvalue = hashvalue; + memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); + + /* Push it onto the front of the skew bucket's list */ + hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples; + hashtable->skewBucket[bucketNumber]->tuples = hashTuple; + + /* Account for space used, and back off if we've used too much */ + hashtable->spaceUsed += hashTupleSize; + hashtable->spaceUsedSkew += hashTupleSize; + while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew) + ExecHashRemoveNextSkewBucket(hashtable); + + /* Check we are not over the total spaceAllowed, either */ + if (hashtable->spaceUsed > hashtable->spaceAllowed) + ExecHashIncreaseNumBatches(hashtable); +} + +/* + * ExecHashRemoveNextSkewBucket + * + * Remove the least valuable skew bucket by pushing its tuples into + * the main hash table. + */ +static void +ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) +{ + int bucketToRemove; + HashSkewBucket *bucket; + uint32 hashvalue; + int bucketno; + int batchno; + HashJoinTuple hashTuple; + + /* Locate the bucket to remove */ + bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1]; + bucket = hashtable->skewBucket[bucketToRemove]; + + /* + * Calculate which bucket and batch the tuples belong to in the main + * hashtable. They all have the same hash value, so it's the same for all + * of them. Also note that it's not possible for nbatch to increase + * while we are processing the tuples. + */ + hashvalue = bucket->hashvalue; + ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); + + /* Process all tuples in the bucket */ + hashTuple = bucket->tuples; + while (hashTuple != NULL) + { + HashJoinTuple nextHashTuple = hashTuple->next; + MinimalTuple tuple; + Size tupleSize; + + /* + * This code must agree with ExecHashTableInsert. We do not use + * ExecHashTableInsert directly as ExecHashTableInsert expects a + * TupleTableSlot while we already have HashJoinTuples. + */ + tuple = HJTUPLE_MINTUPLE(hashTuple); + tupleSize = HJTUPLE_OVERHEAD + tuple->t_len; + + /* Decide whether to put the tuple in the hash table or a temp file */ + if (batchno == hashtable->curbatch) + { + /* Move the tuple to the main hash table */ + hashTuple->next = hashtable->buckets[bucketno]; + hashtable->buckets[bucketno] = hashTuple; + /* We have reduced skew space, but overall space doesn't change */ + hashtable->spaceUsedSkew -= tupleSize; + } + else + { + /* Put the tuple into a temp file for later batches */ + Assert(batchno > hashtable->curbatch); + ExecHashJoinSaveTuple(tuple, hashvalue, + &hashtable->innerBatchFile[batchno]); + pfree(hashTuple); + hashtable->spaceUsed -= tupleSize; + hashtable->spaceUsedSkew -= tupleSize; + } + + hashTuple = nextHashTuple; + } + + /* + * Free the bucket struct itself and reset the hashtable entry to NULL. + * + * NOTE: this is not nearly as simple as it looks on the surface, because + * of the possibility of collisions in the hashtable. Suppose that hash + * values A and B collide at a particular hashtable entry, and that A + * was entered first so B gets shifted to a different table entry. If + * we were to remove A first then ExecHashGetSkewBucket would mistakenly + * start reporting that B is not in the hashtable, because it would hit + * the NULL before finding B. However, we always remove entries in the + * reverse order of creation, so this failure cannot happen. + */ + hashtable->skewBucket[bucketToRemove] = NULL; + hashtable->nSkewBuckets--; + pfree(bucket); + hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD; + hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD; + + /* + * If we have removed all skew buckets then give up on skew optimization. + * Release the arrays since they aren't useful any more. + */ + if (hashtable->nSkewBuckets == 0) + { + hashtable->skewEnabled = false; + pfree(hashtable->skewBucket); + pfree(hashtable->skewBucketNums); + hashtable->skewBucket = NULL; + hashtable->skewBucketNums = NULL; + hashtable->spaceUsed -= hashtable->spaceUsedSkew; + hashtable->spaceUsedSkew = 0; + } +} |