author     Marc G. Fournier <scrappy@hub.org>   1996-07-09 06:22:35 +0000
committer  Marc G. Fournier <scrappy@hub.org>   1996-07-09 06:22:35 +0000
commit     d31084e9d1118b25fd16580d9d8c2924b5740dff (patch)
tree       3179e66307d54df9c7b966543550e601eb55e668 /src/backend/executor/nodeHash.c
Postgres95 1.01 Distribution - Virgin Sources (tag: PG95-1_01)
Diffstat (limited to 'src/backend/executor/nodeHash.c')
-rw-r--r--   src/backend/executor/nodeHash.c   828
1 file changed, 828 insertions, 0 deletions
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
new file mode 100644
index 00000000000..55a5e1f0276
--- /dev/null
+++ b/src/backend/executor/nodeHash.c
@@ -0,0 +1,828 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeHash.c--
+ * Routines to hash relations for hashjoin
+ *
+ * Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * $Header: /cvsroot/pgsql/src/backend/executor/nodeHash.c,v 1.1.1.1 1996/07/09 06:21:26 scrappy Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+/*
+ * INTERFACE ROUTINES
+ * ExecHash - generate an in-memory hash table of the relation
+ *	ExecInitHash	- initialize node and subnodes
+ * ExecEndHash - shutdown node and subnodes
+ *
+ */
+
+#include <stdio.h> /* for sprintf() */
+#include <math.h>
+#include <sys/file.h>
+#include "storage/fd.h" /* for SEEK_ */
+#include "storage/ipc.h"
+#include "storage/bufmgr.h" /* for BLCKSZ */
+#include "executor/executor.h"
+#include "executor/nodeHash.h"
+#include "executor/nodeHashjoin.h"
+#include "utils/palloc.h"
+
+extern int NBuffers;		/* number of buffer pool pages */
+static int HashTBSize;		/* hash table size, in BLCKSZ-sized blocks */
+
+static void mk_hj_temp(char *tempname);
+static int hashFunc(char *key, int len);
+
+/* ----------------------------------------------------------------
+ * ExecHash
+ *
+ *		build hash table for hashjoin, also do partitioning if more
+ *		than one batch is required.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecHash(Hash *node)
+{
+ EState *estate;
+ HashState *hashstate;
+ Plan *outerNode;
+ Var *hashkey;
+ HashJoinTable hashtable;
+ TupleTableSlot *slot;
+ ExprContext *econtext;
+
+ int nbatch;
+ File *batches;
+ RelativeAddr *batchPos;
+ int *batchSizes;
+ int i;
+ RelativeAddr *innerbatchNames;
+
+ /* ----------------
+ * get state info from node
+ * ----------------
+ */
+
+ hashstate = node->hashstate;
+ estate = node->plan.state;
+ outerNode = outerPlan(node);
+
+ hashtable = node->hashtable;
+ if (hashtable == NULL)
+ elog(WARN, "ExecHash: hash table is NULL.");
+
+ nbatch = hashtable->nbatch;
+
+    if (nbatch > 0) {	/* if we need hash partitioning */
+ innerbatchNames = (RelativeAddr *) ABSADDR(hashtable->innerbatchNames);
+
+ /* --------------
+ * allocate space for the file descriptors of batch files
+	 * then open the batch files in the current process.
+ * --------------
+ */
+ batches = (File*)palloc(nbatch * sizeof(File));
+ for (i=0; i<nbatch; i++) {
+ batches[i] = FileNameOpenFile(ABSADDR(innerbatchNames[i]),
+ O_CREAT | O_RDWR, 0600);
+ }
+ hashstate->hashBatches = batches;
+ batchPos = (RelativeAddr*) ABSADDR(hashtable->innerbatchPos);
+ batchSizes = (int*) ABSADDR(hashtable->innerbatchSizes);
+ }
+
+ /* ----------------
+ * set expression context
+ * ----------------
+ */
+ hashkey = node->hashkey;
+ econtext = hashstate->cstate.cs_ExprContext;
+
+ /* ----------------
+ * get tuple and insert into the hash table
+ * ----------------
+ */
+ for (;;) {
+ slot = ExecProcNode(outerNode, (Plan*)node);
+ if (TupIsNull(slot))
+ break;
+
+ econtext->ecxt_innertuple = slot;
+ ExecHashTableInsert(hashtable, econtext, hashkey,
+ hashstate->hashBatches);
+
+ ExecClearTuple(slot);
+ }
+
+ /*
+	 * end of build phase, flush out the last partial page of each batch.
+ */
+ for (i=0; i<nbatch; i++) {
+ if (FileSeek(batches[i], 0L, SEEK_END) < 0)
+ perror("FileSeek");
+	if (FileWrite(batches[i], ABSADDR(hashtable->batch) + i * BLCKSZ, BLCKSZ) < 0)
+ perror("FileWrite");
+ NDirectFileWrite++;
+ }
+
+ /* ---------------------
+ * Return the slot so that we have the tuple descriptor
+ * when we need to save/restore them. -Jeff 11 July 1991
+ * ---------------------
+ */
+ return slot;
+}
+
+/* ----------------------------------------------------------------
+ * ExecInitHash
+ *
+ * Init routine for Hash node
+ * ----------------------------------------------------------------
+ */
+bool
+ExecInitHash(Hash *node, EState *estate, Plan *parent)
+{
+ HashState *hashstate;
+ Plan *outerPlan;
+
+ SO1_printf("ExecInitHash: %s\n",
+ "initializing hash node");
+
+ /* ----------------
+ * assign the node's execution state
+ * ----------------
+ */
+ node->plan.state = estate;
+
+ /* ----------------
+ * create state structure
+ * ----------------
+ */
+ hashstate = makeNode(HashState);
+ node->hashstate = hashstate;
+ hashstate->hashBatches = NULL;
+
+ /* ----------------
+	 *	Miscellaneous initialization
+ *
+ * + assign node's base_id
+ * + assign debugging hooks and
+ * + create expression context for node
+ * ----------------
+ */
+ ExecAssignNodeBaseInfo(estate, &hashstate->cstate, parent);
+ ExecAssignExprContext(estate, &hashstate->cstate);
+
+#define HASH_NSLOTS 1
+ /* ----------------
+ * initialize our result slot
+ * ----------------
+ */
+ ExecInitResultTupleSlot(estate, &hashstate->cstate);
+
+ /* ----------------
+	 *	initialize child nodes
+ * ----------------
+ */
+ outerPlan = outerPlan(node);
+ ExecInitNode(outerPlan, estate, (Plan *)node);
+
+ /* ----------------
+ * initialize tuple type. no need to initialize projection
+ * info because this node doesn't do projections
+ * ----------------
+ */
+ ExecAssignResultTypeFromOuterPlan((Plan *) node, &hashstate->cstate);
+ hashstate->cstate.cs_ProjInfo = NULL;
+
+ return TRUE;
+}
+
+int
+ExecCountSlotsHash(Hash *node)
+{
+ return ExecCountSlotsNode(outerPlan(node)) +
+ ExecCountSlotsNode(innerPlan(node)) +
+ HASH_NSLOTS;
+}
+
+/* ---------------------------------------------------------------
+ * ExecEndHash
+ *
+ * clean up routine for Hash node
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndHash(Hash *node)
+{
+ HashState *hashstate;
+ Plan *outerPlan;
+ File *batches;
+
+ /* ----------------
+ * get info from the hash state
+ * ----------------
+ */
+ hashstate = node->hashstate;
+ batches = hashstate->hashBatches;
+ if (batches != NULL)
+ pfree(batches);
+
+ /* ----------------
+ * free projection info. no need to free result type info
+ * because that came from the outer plan...
+ * ----------------
+ */
+ ExecFreeProjectionInfo(&hashstate->cstate);
+
+ /* ----------------
+ * shut down the subplan
+ * ----------------
+ */
+ outerPlan = outerPlan(node);
+ ExecEndNode(outerPlan, (Plan*)node);
+}
+
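+/* ----------------------------------------------------------------
+ *	hashTableAlloc
+ *
+ *	bump-allocate 'size' bytes out of the hash table's workspace.
+ *	Addresses within the table are stored as RelativeAddr offsets
+ *	from its start (the ABSADDR/RELADDR macros convert between
+ *	offsets and pointers), so the contents stay valid even if the
+ *	table is mapped at a different address, e.g. in shared memory.
+ * ----------------------------------------------------------------
+ */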
+RelativeAddr
+hashTableAlloc(int size, HashJoinTable hashtable)
+{
+ RelativeAddr p;
+ p = hashtable->top;
+ hashtable->top += size;
+ return p;
+}
+
+/* ----------------------------------------------------------------
+ * ExecHashTableCreate
+ *
+ * create a hashtable in shared memory for hashjoin.
+ * ----------------------------------------------------------------
+ */
+#define NTUP_PER_BUCKET 10
+#define FUDGE_FAC 1.5
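+
+/*
+ * NTUP_PER_BUCKET is the number of tuples each bucket is sized to
+ * hold; FUDGE_FAC pads the space estimates below, since plan_size
+ * and plan_width are only optimizer guesses.
+ */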
+
+HashJoinTable
+ExecHashTableCreate(Hash *node)
+{
+ Plan *outerNode;
+ int nbatch;
+ int ntuples;
+ int tupsize;
+ IpcMemoryId shmid;
+ HashJoinTable hashtable;
+ HashBucket bucket;
+ int nbuckets;
+ int totalbuckets;
+ int bucketsize;
+ int i;
+ RelativeAddr *outerbatchNames;
+ RelativeAddr *outerbatchPos;
+ RelativeAddr *innerbatchNames;
+ RelativeAddr *innerbatchPos;
+ int *innerbatchSizes;
+ RelativeAddr tempname;
+
+ nbatch = -1;
+ HashTBSize = NBuffers/2;
+ while (nbatch < 0) {
+ /*
+ * determine number of batches for the hashjoin
+ */
+ HashTBSize *= 2;
+ nbatch = ExecHashPartition(node);
+ }
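+    /*
+     * note: HashTBSize starts at NBuffers/2 but is doubled to NBuffers
+     * before the first ExecHashPartition call; each -1 return (table
+     * too small to pass the sqrt test there) doubles it again.
+     */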
+ /* ----------------
+ * get information about the size of the relation
+ * ----------------
+ */
+ outerNode = outerPlan(node);
+ ntuples = outerNode->plan_size;
+ if (ntuples <= 0)
+ ntuples = 1000; /* XXX just a hack */
+ tupsize = outerNode->plan_width + sizeof(HeapTupleData);
+
+ /*
+ * totalbuckets is the total number of hash buckets needed for
+ * the entire relation
+ */
+ totalbuckets = ceil((double)ntuples/NTUP_PER_BUCKET);
+ bucketsize = LONGALIGN (NTUP_PER_BUCKET * tupsize + sizeof(*bucket));
+
+ /*
+ * nbuckets is the number of hash buckets for the first pass
+ * of hybrid hashjoin
+ */
+ nbuckets = (HashTBSize - nbatch) * BLCKSZ / (bucketsize * FUDGE_FAC);
+ if (totalbuckets < nbuckets)
+ totalbuckets = nbuckets;
+ if (nbatch == 0)
+ nbuckets = totalbuckets;
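+    /*
+     * For instance (illustrative numbers only): with HashTBSize = 100
+     * pages, nbatch = 4, BLCKSZ = 8192 and bucketsize = 1024, the 4
+     * pages reserved as in-memory output buffers for the batches leave
+     *		nbuckets = (100 - 4) * 8192 / (1024 * 1.5) = 512
+     * first-pass buckets.
+     */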
+#ifdef HJDEBUG
+ printf("nbatch = %d, totalbuckets = %d, nbuckets = %d\n", nbatch, totalbuckets, nbuckets);
+#endif
+
+ /* ----------------
+     *	on non-parallel machines, we don't need to put the hash table
+     *	in shared memory.  We just palloc it.
+ * ----------------
+ */
+ hashtable = (HashJoinTable)palloc((HashTBSize+1)*BLCKSZ);
+ shmid = 0;
+
+ if (hashtable == NULL) {
+ elog(WARN, "not enough memory for hashjoin.");
+ }
+ /* ----------------
+ * initialize the hash table header
+ * ----------------
+ */
+ hashtable->nbuckets = nbuckets;
+ hashtable->totalbuckets = totalbuckets;
+ hashtable->bucketsize = bucketsize;
+ hashtable->shmid = shmid;
+ hashtable->top = sizeof(HashTableData);
+ hashtable->bottom = HashTBSize * BLCKSZ;
+ /*
+ * hashtable->readbuf has to be long aligned!!!
+ */
+ hashtable->readbuf = hashtable->bottom;
+ hashtable->nbatch = nbatch;
+ hashtable->curbatch = 0;
+ hashtable->pcount = hashtable->nprocess = 0;
+ if (nbatch > 0) {
+ /* ---------------
+ * allocate and initialize the outer batches
+ * ---------------
+ */
+ outerbatchNames = (RelativeAddr*)ABSADDR(
+ hashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable));
+ outerbatchPos = (RelativeAddr*)ABSADDR(
+ hashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable));
+ for (i=0; i<nbatch; i++) {
+ tempname = hashTableAlloc(12, hashtable);
+ mk_hj_temp(ABSADDR(tempname));
+ outerbatchNames[i] = tempname;
+ outerbatchPos[i] = -1;
+ }
+ hashtable->outerbatchNames = RELADDR(outerbatchNames);
+ hashtable->outerbatchPos = RELADDR(outerbatchPos);
+ /* ---------------
+ * allocate and initialize the inner batches
+ * ---------------
+ */
+ innerbatchNames = (RelativeAddr*)ABSADDR(
+ hashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable));
+ innerbatchPos = (RelativeAddr*)ABSADDR(
+ hashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable));
+ innerbatchSizes = (int*)ABSADDR(
+ hashTableAlloc(nbatch * sizeof(int), hashtable));
+ for (i=0; i<nbatch; i++) {
+ tempname = hashTableAlloc(12, hashtable);
+ mk_hj_temp(ABSADDR(tempname));
+ innerbatchNames[i] = tempname;
+ innerbatchPos[i] = -1;
+ innerbatchSizes[i] = 0;
+ }
+ hashtable->innerbatchNames = RELADDR(innerbatchNames);
+ hashtable->innerbatchPos = RELADDR(innerbatchPos);
+ hashtable->innerbatchSizes = RELADDR(innerbatchSizes);
+ }
+ else {
+ hashtable->outerbatchNames = (RelativeAddr)NULL;
+ hashtable->outerbatchPos = (RelativeAddr)NULL;
+ hashtable->innerbatchNames = (RelativeAddr)NULL;
+ hashtable->innerbatchPos = (RelativeAddr)NULL;
+ hashtable->innerbatchSizes = (RelativeAddr)NULL;
+ }
+
+ hashtable->batch = (RelativeAddr)LONGALIGN(hashtable->top +
+ bucketsize * nbuckets);
+    hashtable->overflownext = hashtable->batch + nbatch * BLCKSZ;
+ /* ----------------
+ * initialize each hash bucket
+ * ----------------
+ */
+ bucket = (HashBucket)ABSADDR(hashtable->top);
+ for (i=0; i<nbuckets; i++) {
+ bucket->top = RELADDR((char*)bucket + sizeof(*bucket));
+ bucket->bottom = bucket->top;
+ bucket->firstotuple = bucket->lastotuple = -1;
+ bucket = (HashBucket)LONGALIGN(((char*)bucket + bucketsize));
+ }
+ return(hashtable);
+}
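+
+/*
+ * The workspace built above, as offsets from the start of the table
+ * (one extra BLCKSZ page past 'bottom' is the 'readbuf' used when
+ * batches are read back in):
+ *
+ *	header | batch name/pos arrays | nbuckets buckets |
+ *	nbatch page buffers | overflow area ... 'bottom' | readbuf
+ */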
+
+/* ----------------------------------------------------------------
+ * ExecHashTableInsert
+ *
+ *	insert a tuple into the hash table depending on the hash value;
+ *	it may instead go to a tmp file for a later batch
+ * ----------------------------------------------------------------
+ */
+void
+ExecHashTableInsert(HashJoinTable hashtable,
+ ExprContext *econtext,
+ Var *hashkey,
+ File *batches)
+{
+ TupleTableSlot *slot;
+ HeapTuple heapTuple;
+ HashBucket bucket;
+ int bucketno;
+ int nbatch;
+ int batchno;
+ char *buffer;
+ RelativeAddr *batchPos;
+ int *batchSizes;
+ char *pos;
+
+ nbatch = hashtable->nbatch;
+ batchPos = (RelativeAddr*)ABSADDR(hashtable->innerbatchPos);
+ batchSizes = (int*)ABSADDR(hashtable->innerbatchSizes);
+
+ slot = econtext->ecxt_innertuple;
+ heapTuple = slot->val;
+
+#ifdef HJDEBUG
+ printf("Inserting ");
+#endif
+
+ bucketno = ExecHashGetBucket(hashtable, econtext, hashkey);
+
+ /* ----------------
+ * decide whether to put the tuple in the hash table or a tmp file
+ * ----------------
+ */
+ if (bucketno < hashtable->nbuckets) {
+ /* ---------------
+ * put the tuple in hash table
+ * ---------------
+ */
+ bucket = (HashBucket)
+ (ABSADDR(hashtable->top) + bucketno * hashtable->bucketsize);
+ if ((char*)LONGALIGN(ABSADDR(bucket->bottom))
+	    - (char*)bucket + heapTuple->t_len > hashtable->bucketsize)
+ ExecHashOverflowInsert(hashtable, bucket, heapTuple);
+ else {
+ memmove((char*)LONGALIGN(ABSADDR(bucket->bottom)),
+ heapTuple,
+ heapTuple->t_len);
+ bucket->bottom =
+ ((RelativeAddr)LONGALIGN(bucket->bottom) + heapTuple->t_len);
+ }
+ }
+ else {
+ /* -----------------
+ * put the tuple into a tmp file for other batches
+ * -----------------
+ */
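+	/* -----------------
+	 * buckets nbuckets..totalbuckets-1 are spread evenly across the
+	 * nbatch batches: the float expression below maps bucketno
+	 * linearly onto 0..nbatch-1 and the int assignment truncates.
+	 * E.g. with nbuckets = 100, totalbuckets = 500, nbatch = 4,
+	 * bucketno 250 gives (250-100)/400 * 4 = 1.5, i.e. batch 1.
+	 * -----------------
+	 */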
+ batchno = (float)(bucketno - hashtable->nbuckets)/
+ (float)(hashtable->totalbuckets - hashtable->nbuckets)
+ * nbatch;
+ buffer = ABSADDR(hashtable->batch) + batchno * BLCKSZ;
+ batchSizes[batchno]++;
+	pos = (char *)
+ ExecHashJoinSaveTuple(heapTuple,
+ buffer,
+ batches[batchno],
+ (char*)ABSADDR(batchPos[batchno]));
+ batchPos[batchno] = RELADDR(pos);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecHashTableDestroy
+ *
+ * destroy a hash table
+ * ----------------------------------------------------------------
+ */
+void
+ExecHashTableDestroy(HashJoinTable hashtable)
+{
+ pfree(hashtable);
+}
+
+/* ----------------------------------------------------------------
+ * ExecHashGetBucket
+ *
+ *	Compute the hash bucket number for a tuple
+ * ----------------------------------------------------------------
+ */
+int
+ExecHashGetBucket(HashJoinTable hashtable,
+ ExprContext *econtext,
+ Var *hashkey)
+{
+ int bucketno;
+ Datum keyval;
+ bool isNull;
+
+
+ /* ----------------
+ * Get the join attribute value of the tuple
+ * ----------------
+ */
+ keyval = ExecEvalVar(hashkey, econtext, &isNull);
+
+ /* ------------------
+ * compute the hash function
+ * ------------------
+ */
+ if (execConstByVal)
+ bucketno =
+ hashFunc((char *) &keyval, execConstLen) % hashtable->totalbuckets;
+ else
+ bucketno =
+ hashFunc((char *) keyval, execConstLen) % hashtable->totalbuckets;
+#ifdef HJDEBUG
+ if (bucketno >= hashtable->nbuckets)
+ printf("hash(%d) = %d SAVED\n", keyval, bucketno);
+ else
+ printf("hash(%d) = %d\n", keyval, bucketno);
+#endif
+
+ return(bucketno);
+}
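+
+/*
+ * bucketno ranges over all totalbuckets buckets; a value >= nbuckets
+ * (the "SAVED" case in the debug output above) means the tuple belongs
+ * to a later batch, and ExecHashTableInsert spools it to a temp file.
+ */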
+
+/* ----------------------------------------------------------------
+ * ExecHashOverflowInsert
+ *
+ * insert into the overflow area of a hash bucket
+ * ----------------------------------------------------------------
+ */
+void
+ExecHashOverflowInsert(HashJoinTable hashtable,
+ HashBucket bucket,
+ HeapTuple heapTuple)
+{
+ OverflowTuple otuple;
+ RelativeAddr newend;
+ OverflowTuple firstotuple;
+ OverflowTuple lastotuple;
+
+ firstotuple = (OverflowTuple)ABSADDR(bucket->firstotuple);
+ lastotuple = (OverflowTuple)ABSADDR(bucket->lastotuple);
+ /* ----------------
+     *	see if we have run out of overflow space
+ * ----------------
+ */
+ newend = (RelativeAddr)LONGALIGN(hashtable->overflownext + sizeof(*otuple)
+ + heapTuple->t_len);
+ if (newend > hashtable->bottom) {
+ elog(DEBUG, "hash table out of memory. expanding.");
+ /* ------------------
+ * XXX this is a temporary hack
+ * eventually, recursive hash partitioning will be
+ * implemented
+ * ------------------
+ */
+ hashtable->readbuf = hashtable->bottom = 2 * hashtable->bottom;
+ hashtable =
+ (HashJoinTable)repalloc(hashtable, hashtable->bottom+BLCKSZ);
+ if (hashtable == NULL) {
+ perror("repalloc");
+ elog(WARN, "can't expand hashtable.");
+ }
+ }
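+	/* ----------------
+	 * note: if repalloc has to move the block, the caller's copy of
+	 * 'hashtable' and the 'bucket' pointer passed in still reference
+	 * the old block -- one more reason this expansion is only a
+	 * temporary hack.
+	 * ----------------
+	 */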
+
+ /* ----------------
+ * establish the overflow chain
+ * ----------------
+ */
+ otuple = (OverflowTuple)ABSADDR(hashtable->overflownext);
+ hashtable->overflownext = newend;
+ if (firstotuple == NULL)
+ bucket->firstotuple = bucket->lastotuple = RELADDR(otuple);
+ else {
+ lastotuple->next = RELADDR(otuple);
+ bucket->lastotuple = RELADDR(otuple);
+ }
+
+ /* ----------------
+ * copy the tuple into the overflow area
+ * ----------------
+ */
+ otuple->next = -1;
+ otuple->tuple = RELADDR(LONGALIGN(((char*)otuple + sizeof(*otuple))));
+ memmove(ABSADDR(otuple->tuple),
+ heapTuple,
+ heapTuple->t_len);
+}
+
+/* ----------------------------------------------------------------
+ * ExecScanHashBucket
+ *
+ *	scan a hash bucket for matches
+ * ----------------------------------------------------------------
+ */
+HeapTuple
+ExecScanHashBucket(HashJoinState *hjstate,
+ HashBucket bucket,
+ HeapTuple curtuple,
+ List *hjclauses,
+ ExprContext *econtext)
+{
+ HeapTuple heapTuple;
+ bool qualResult;
+ OverflowTuple otuple = NULL;
+ OverflowTuple curotuple;
+ TupleTableSlot *inntuple;
+ OverflowTuple firstotuple;
+ OverflowTuple lastotuple;
+ HashJoinTable hashtable;
+
+ hashtable = hjstate->hj_HashTable;
+ firstotuple = (OverflowTuple)ABSADDR(bucket->firstotuple);
+ lastotuple = (OverflowTuple)ABSADDR(bucket->lastotuple);
+
+ /* ----------------
+ * search the hash bucket
+ * ----------------
+ */
+ if (curtuple == NULL || curtuple < (HeapTuple)ABSADDR(bucket->bottom)) {
+ if (curtuple == NULL)
+ heapTuple = (HeapTuple)
+ LONGALIGN(ABSADDR(bucket->top));
+ else
+ heapTuple = (HeapTuple)
+ LONGALIGN(((char*)curtuple+curtuple->t_len));
+
+ while (heapTuple < (HeapTuple)ABSADDR(bucket->bottom)) {
+
+ inntuple = ExecStoreTuple(heapTuple, /* tuple to store */
+ hjstate->hj_HashTupleSlot, /* slot */
+ InvalidBuffer,/* tuple has no buffer */
+ false); /* do not pfree this tuple */
+
+ econtext->ecxt_innertuple = inntuple;
+ qualResult = ExecQual((List*)hjclauses, econtext);
+
+ if (qualResult)
+ return heapTuple;
+
+ heapTuple = (HeapTuple)
+ LONGALIGN(((char*)heapTuple+heapTuple->t_len));
+ }
+
+ if (firstotuple == NULL)
+ return NULL;
+ otuple = firstotuple;
+ }
+
+ /* ----------------
+ * search the overflow area of the hash bucket
+ * ----------------
+ */
+ if (otuple == NULL) {
+ curotuple = hjstate->hj_CurOTuple;
+ otuple = (OverflowTuple)ABSADDR(curotuple->next);
+ }
+
+ while (otuple != NULL) {
+ heapTuple = (HeapTuple)ABSADDR(otuple->tuple);
+
+ inntuple = ExecStoreTuple(heapTuple, /* tuple to store */
+ hjstate->hj_HashTupleSlot, /* slot */
+				  InvalidBuffer,	/* tuple has no buffer */
+ false); /* do not pfree this tuple */
+
+ econtext->ecxt_innertuple = inntuple;
+ qualResult = ExecQual((List*)hjclauses, econtext);
+
+ if (qualResult) {
+ hjstate->hj_CurOTuple = otuple;
+ return heapTuple;
+ }
+
+ otuple = (OverflowTuple)ABSADDR(otuple->next);
+ }
+
+ /* ----------------
+ * no match
+ * ----------------
+ */
+ return NULL;
+}
+
+/* ----------------------------------------------------------------
+ * hashFunc
+ *
+ * the hash function, copied from Margo
+ * ----------------------------------------------------------------
+ */
+static int
+hashFunc(char *key, int len)
+{
+ register unsigned int h;
+ register int l;
+ register unsigned char *k;
+
+    /*
+     * If this is a variable length type, then 'key' points
+     * to a "struct varlena" and len == -1.
+     * NOTE:
+     * VARSIZE returns the "real" data length plus the sizeof the
+     * "vl_len" attribute of varlena (the length information).
+     * 'key' points to the beginning of the varlena struct, so
+     * we have to use "VARDATA" to find the beginning of the "real"
+     * data.
+     */
+ if (len == -1) {
+ l = VARSIZE(key) - VARHDRSZ;
+ k = (unsigned char*) VARDATA(key);
+ } else {
+ l = len;
+ k = (unsigned char *) key;
+ }
+
+ h = 0;
+
+ /*
+ * Convert string to integer
+ */
+ while (l--) h = h * PRIME1 ^ (*k++);
+ h %= PRIME2;
+
+ return (h);
+}
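+
+/*
+ * Unrolled for a two-byte key k0,k1, the loop above computes
+ *	h = ((((0 * PRIME1) ^ k0) * PRIME1) ^ k1) % PRIME2
+ * i.e. a multiply-and-xor polynomial in PRIME1 reduced mod PRIME2
+ * (both primes are defined elsewhere in the tree).
+ */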
+
+/* ----------------------------------------------------------------
+ * ExecHashPartition
+ *
+ * determine the number of batches needed for a hashjoin
+ * ----------------------------------------------------------------
+ */
+int
+ExecHashPartition(Hash *node)
+{
+ Plan *outerNode;
+ int b;
+ int pages;
+ int ntuples;
+ int tupsize;
+
+ /*
+ * get size information for plan node
+ */
+ outerNode = outerPlan(node);
+ ntuples = outerNode->plan_size;
+ if (ntuples == 0) ntuples = 1000;
+ tupsize = outerNode->plan_width + sizeof(HeapTupleData);
+ pages = ceil((double)ntuples * tupsize * FUDGE_FAC / BLCKSZ);
+
+ /*
+     * if the amount of buffer space is below the hashjoin threshold,
+     * return negative so the caller will retry with a larger table
+ */
+ if (ceil(sqrt((double)pages)) > HashTBSize)
+ return -1;
+ if (pages <= HashTBSize)
+ b = 0; /* fit in memory, no partitioning */
+ else
+ b = ceil((double)(pages - HashTBSize)/(double)(HashTBSize - 1));
+
+ return b;
+}
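+
+/*
+ * For example (illustrative numbers only): with pages = 100 and
+ * HashTBSize = 20, the sqrt test passes (ceil(sqrt(100)) = 10 <= 20)
+ * and b = ceil((100 - 20) / (20 - 1)) = ceil(4.21...) = 5 batches are
+ * spooled out, leaving HashTBSize - 5 pages of buckets for the first
+ * pass.
+ */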
+
+/* ----------------------------------------------------------------
+ * ExecHashTableReset
+ *
+ * reset hash table header for new batch
+ * ----------------------------------------------------------------
+ */
+void
+ExecHashTableReset(HashJoinTable hashtable, int ntuples)
+{
+ int i;
+ HashBucket bucket;
+
+ hashtable->nbuckets = hashtable->totalbuckets
+ = ceil((double)ntuples/NTUP_PER_BUCKET);
+
+ hashtable->overflownext = hashtable->top + hashtable->bucketsize *
+ hashtable->nbuckets;
+
+ bucket = (HashBucket)ABSADDR(hashtable->top);
+ for (i=0; i<hashtable->nbuckets; i++) {
+ bucket->top = RELADDR((char*)bucket + sizeof(*bucket));
+ bucket->bottom = bucket->top;
+ bucket->firstotuple = bucket->lastotuple = -1;
+ bucket = (HashBucket)((char*)bucket + hashtable->bucketsize);
+ }
+ hashtable->pcount = hashtable->nprocess;
+}
+
+static int hjtmpcnt = 0;
+
+static void
+mk_hj_temp(char *tempname)
+{
+ sprintf(tempname, "HJ%d.%d", getpid(), hjtmpcnt);
+ hjtmpcnt = (hjtmpcnt + 1) % 1000;
+}
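+
+/*
+ * These names fit the 12-byte slots carved out by hashTableAlloc(12, ...)
+ * in ExecHashTableCreate: "HJ" (2) + "." (1) + a counter of at most
+ * 3 digits + the terminating NUL (1) leaves room for a pid of up to
+ * 5 digits.
+ */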
+
+
+