Diffstat (limited to 'src/backend/executor')
-rw-r--r--  src/backend/executor/nodeHash.c      585
-rw-r--r--  src/backend/executor/nodeHashjoin.c  606
2 files changed, 409 insertions(+), 782 deletions(-)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index b36a2ba4051..4589da32bc1 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -6,7 +6,7 @@
* Copyright (c) 1994, Regents of the University of California
*
*
- * $Id: nodeHash.c,v 1.34 1999/05/09 00:53:20 tgl Exp $
+ * $Id: nodeHash.c,v 1.35 1999/05/18 21:33:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -22,11 +22,6 @@
#include <stdio.h>
#include <math.h>
#include <string.h>
-#include <sys/file.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-
-#include <unistd.h>
#include "postgres.h"
#include "miscadmin.h"
@@ -34,17 +29,12 @@
#include "executor/executor.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
-#include "storage/ipc.h"
#include "utils/hsearch.h"
+#include "utils/portal.h"
-extern int NBuffers;
+extern int SortMem;
static int hashFunc(Datum key, int len, bool byVal);
-static RelativeAddr hashTableAlloc(int size, HashJoinTable hashtable);
-static void * absHashTableAlloc(int size, HashJoinTable hashtable);
-static void ExecHashOverflowInsert(HashJoinTable hashtable,
- HashBucket bucket,
- HeapTuple heapTuple);
/* ----------------------------------------------------------------
* ExecHash
@@ -63,11 +53,7 @@ ExecHash(Hash *node)
HashJoinTable hashtable;
TupleTableSlot *slot;
ExprContext *econtext;
-
int nbatch;
- File *batches = NULL;
- RelativeAddr *batchPos;
- int *batchSizes;
int i;
/* ----------------
@@ -79,27 +65,25 @@ ExecHash(Hash *node)
estate = node->plan.state;
outerNode = outerPlan(node);
- hashtable = node->hashtable;
+ hashtable = hashstate->hashtable;
if (hashtable == NULL)
elog(ERROR, "ExecHash: hash table is NULL.");
nbatch = hashtable->nbatch;
if (nbatch > 0)
- { /* if needs hash partition */
- /* --------------
- * allocate space for the file descriptors of batch files
- * then open the batch files in the current processes.
- * --------------
+ {
+ /* ----------------
+ * Open temp files for inner batches, if needed.
+ * Note that file buffers are palloc'd in regular executor context.
+ * ----------------
*/
- batches = (File *) palloc(nbatch * sizeof(File));
for (i = 0; i < nbatch; i++)
{
- batches[i] = OpenTemporaryFile();
+ File tfile = OpenTemporaryFile();
+ Assert(tfile >= 0);
+ hashtable->innerBatchFile[i] = BufFileCreate(tfile);
}
- hashstate->hashBatches = batches;
- batchPos = (RelativeAddr *) ABSADDR(hashtable->innerbatchPos);
- batchSizes = (int *) ABSADDR(hashtable->innerbatchSizes);
}
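
For reference, a minimal sketch of the BufFile lifecycle this commit adopts
for batch temp files, using only the calls that appear in this diff
(OpenTemporaryFile, BufFileCreate, BufFileWrite, BufFileSeek, BufFileRead,
BufFileClose). It assumes a backend context where those symbols are in
scope, with error handling trimmed to the same Assert/elog style:

    File     tfile = OpenTemporaryFile();
    BufFile *bf;
    long     val = 42L;

    Assert(tfile >= 0);
    bf = BufFileCreate(tfile);

    /* write phase: append raw bytes to the temp file */
    if (BufFileWrite(bf, (void *) &val, sizeof(val)) != sizeof(val))
        elog(ERROR, "Write to temp file failed");

    /* switch to read phase: rewind to the start */
    if (BufFileSeek(bf, 0L, SEEK_SET) != 0L)
        elog(ERROR, "Failed to rewind temp file");

    if (BufFileRead(bf, (void *) &val, sizeof(val)) != sizeof(val))
        elog(ERROR, "Read from temp file failed");

    BufFileClose(bf);           /* releases buffers and the underlying File */
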
/* ----------------
@@ -110,7 +94,7 @@ ExecHash(Hash *node)
econtext = hashstate->cstate.cs_ExprContext;
/* ----------------
- * get tuple and insert into the hash table
+ * get all inner tuples and insert into the hash table (or temp files)
* ----------------
*/
for (;;)
@@ -118,26 +102,11 @@ ExecHash(Hash *node)
slot = ExecProcNode(outerNode, (Plan *) node);
if (TupIsNull(slot))
break;
-
econtext->ecxt_innertuple = slot;
- ExecHashTableInsert(hashtable, econtext, hashkey,
- hashstate->hashBatches);
-
+ ExecHashTableInsert(hashtable, econtext, hashkey);
ExecClearTuple(slot);
}
- /*
- * end of build phase, flush all the last pages of the batches.
- */
- for (i = 0; i < nbatch; i++)
- {
- if (FileSeek(batches[i], 0L, SEEK_END) < 0)
- perror("FileSeek");
- if (FileWrite(batches[i], ABSADDR(hashtable->batch) + i * BLCKSZ, BLCKSZ) < 0)
- perror("FileWrite");
- NDirectFileWrite++;
- }
-
/* ---------------------
* Return the slot so that we have the tuple descriptor
* when we need to save/restore them. -Jeff 11 July 1991
@@ -173,10 +142,10 @@ ExecInitHash(Hash *node, EState *estate, Plan *parent)
*/
hashstate = makeNode(HashState);
node->hashstate = hashstate;
- hashstate->hashBatches = NULL;
+ hashstate->hashtable = NULL;
/* ----------------
- * Miscellanious initialization
+ * Miscellaneous initialization
*
* + assign node's base_id
* + assign debugging hooks and
@@ -186,7 +155,6 @@ ExecInitHash(Hash *node, EState *estate, Plan *parent)
ExecAssignNodeBaseInfo(estate, &hashstate->cstate, parent);
ExecAssignExprContext(estate, &hashstate->cstate);
-#define HASH_NSLOTS 1
/* ----------------
* initialize our result slot
* ----------------
@@ -214,6 +182,7 @@ ExecInitHash(Hash *node, EState *estate, Plan *parent)
int
ExecCountSlotsHash(Hash *node)
{
+#define HASH_NSLOTS 1
return ExecCountSlotsNode(outerPlan(node)) +
ExecCountSlotsNode(innerPlan(node)) +
HASH_NSLOTS;
@@ -230,16 +199,12 @@ ExecEndHash(Hash *node)
{
HashState *hashstate;
Plan *outerPlan;
- File *batches;
/* ----------------
* get info from the hash state
* ----------------
*/
hashstate = node->hashstate;
- batches = hashstate->hashBatches;
- if (batches != NULL)
- pfree(batches);
/* ----------------
* free projection info. no need to free result type info
@@ -256,21 +221,6 @@ ExecEndHash(Hash *node)
ExecEndNode(outerPlan, (Plan *) node);
}
-static RelativeAddr
-hashTableAlloc(int size, HashJoinTable hashtable)
-{
- RelativeAddr p = hashtable->top;
- hashtable->top += MAXALIGN(size);
- return p;
-}
-
-static void *
-absHashTableAlloc(int size, HashJoinTable hashtable)
-{
- RelativeAddr p = hashTableAlloc(size, hashtable);
- return ABSADDR(p);
-}
-
/* ----------------------------------------------------------------
* ExecHashTableCreate
@@ -285,22 +235,19 @@ HashJoinTable
ExecHashTableCreate(Hash *node)
{
Plan *outerNode;
- int HashTBSize;
- int nbatch;
int ntuples;
int tupsize;
- int pages;
- int sqrtpages;
- IpcMemoryId shmid;
+ double inner_rel_bytes;
+ double hash_table_bytes;
+ int nbatch;
HashJoinTable hashtable;
- HashBucket bucket;
int nbuckets;
int totalbuckets;
int bucketsize;
int i;
- RelativeAddr *outerbatchPos;
- RelativeAddr *innerbatchPos;
- int *innerbatchSizes;
+ Portal myPortal;
+ char myPortalName[64];
+ MemoryContext oldcxt;
/* ----------------
* Get information about the size of the relation to be hashed
@@ -314,38 +261,48 @@ ExecHashTableCreate(Hash *node)
ntuples = outerNode->plan_size;
if (ntuples <= 0) /* force a plausible size if no info */
ntuples = 1000;
- tupsize = outerNode->plan_width + sizeof(HeapTupleData);
- pages = (int) ceil((double) ntuples * tupsize * FUDGE_FAC / BLCKSZ);
+ /* estimate tupsize based on footprint of tuple in hashtable...
+ * but what about palloc overhead?
+ */
+ tupsize = MAXALIGN(outerNode->plan_width) +
+ MAXALIGN(sizeof(HashJoinTupleData));
+ inner_rel_bytes = (double) ntuples * tupsize * FUDGE_FAC;
/*
- * Max hashtable size is NBuffers pages, but not less than
+ * Target hashtable size is SortMem kilobytes, but not less than
* sqrt(estimated inner rel size), so as to avoid horrible performance.
- * XXX since the hashtable is not allocated in shared mem anymore,
- * it would probably be more appropriate to drive this from -S than -B.
*/
- sqrtpages = (int) ceil(sqrt((double) pages));
- HashTBSize = NBuffers;
- if (sqrtpages > HashTBSize)
- HashTBSize = sqrtpages;
+ hash_table_bytes = sqrt(inner_rel_bytes);
+ if (hash_table_bytes < (SortMem * 1024L))
+ hash_table_bytes = SortMem * 1024L;
/*
* Count the number of hash buckets we want for the whole relation,
- * and the number we can actually fit in the allowed memory.
+ * for an average bucket load of NTUP_PER_BUCKET (per virtual bucket!).
+ */
+ totalbuckets = (int) ceil((double) ntuples * FUDGE_FAC / NTUP_PER_BUCKET);
+
+ /*
+ * Count the number of buckets we think will actually fit in the
+ * target memory size, at a loading of NTUP_PER_BUCKET (physical buckets).
* NOTE: FUDGE_FAC here determines the fraction of the hashtable space
- * saved for overflow records. Need a better approach...
+ * reserved to allow for nonuniform distribution of hash values.
+ * Perhaps this should be a different number from the other uses of
+ * FUDGE_FAC, but since we have no real good way to pick either one...
*/
- totalbuckets = (int) ceil((double) ntuples / NTUP_PER_BUCKET);
- bucketsize = MAXALIGN(NTUP_PER_BUCKET * tupsize + sizeof(*bucket));
- nbuckets = (int) ((HashTBSize * BLCKSZ) / (bucketsize * FUDGE_FAC));
+ bucketsize = NTUP_PER_BUCKET * tupsize;
+ nbuckets = (int) (hash_table_bytes / (bucketsize * FUDGE_FAC));
+ if (nbuckets <= 0)
+ nbuckets = 1;
if (totalbuckets <= nbuckets)
{
/* We have enough space, so no batching. In theory we could
- * even reduce HashTBSize, but as long as we don't have a way
- * to deal with overflow-space overrun, best to leave the
- * extra space available for overflow.
+ * even reduce nbuckets, but since that could lead to poor
+ * behavior if estimated ntuples is much less than reality,
+ * it seems better to make more buckets instead of fewer.
*/
- nbuckets = totalbuckets;
+ totalbuckets = nbuckets;
nbatch = 0;
}
else
@@ -356,7 +313,8 @@ ExecHashTableCreate(Hash *node)
* of groups we will use for the part of the data that doesn't
* fall into the first nbuckets hash buckets.
*/
- nbatch = (int) ceil((double) (pages - HashTBSize) / HashTBSize);
+ nbatch = (int) ceil((inner_rel_bytes - hash_table_bytes) /
+ hash_table_bytes);
if (nbatch <= 0)
nbatch = 1;
}
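
To make the sizing arithmetic concrete, here is a standalone sketch with
illustrative inputs. SortMem, ntuples, tupsize, FUDGE_FAC, and
NTUP_PER_BUCKET are all stand-in values chosen for the example; the real
definitions live in the executor headers and planner estimates, not in
this diff:

    #include <stdio.h>
    #include <math.h>

    #define FUDGE_FAC       2.0     /* assumed value */
    #define NTUP_PER_BUCKET 10      /* assumed value */

    int
    main(void)
    {
        int     SortMem = 512;      /* -S setting, in kilobytes */
        int     ntuples = 100000;   /* planner's inner-rel estimate */
        int     tupsize = 64;       /* estimated per-tuple bytes */
        double  inner_rel_bytes = (double) ntuples * tupsize * FUDGE_FAC;
        double  hash_table_bytes = sqrt(inner_rel_bytes);
        int     totalbuckets, nbuckets, nbatch;

        /* target size: SortMem kilobytes, but at least sqrt(inner size) */
        if (hash_table_bytes < SortMem * 1024.0)
            hash_table_bytes = SortMem * 1024.0;

        /* virtual buckets for the whole relation */
        totalbuckets = (int) ceil((double) ntuples * FUDGE_FAC / NTUP_PER_BUCKET);

        /* physical buckets that fit in the target memory */
        nbuckets = (int) (hash_table_bytes / (NTUP_PER_BUCKET * tupsize * FUDGE_FAC));
        if (nbuckets <= 0)
            nbuckets = 1;

        if (totalbuckets <= nbuckets)
        {
            totalbuckets = nbuckets;    /* everything fits: single pass */
            nbatch = 0;
        }
        else
        {
            nbatch = (int) ceil((inner_rel_bytes - hash_table_bytes) /
                                hash_table_bytes);
            if (nbatch <= 0)
                nbatch = 1;
        }

        /* with these inputs: nbuckets=409 totalbuckets=20000 nbatch=24 */
        printf("nbuckets=%d totalbuckets=%d nbatch=%d\n",
               nbuckets, totalbuckets, nbatch);
        return 0;
    }
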
@@ -374,90 +332,117 @@ ExecHashTableCreate(Hash *node)
#endif
/* ----------------
- * in non-parallel machines, we don't need to put the hash table
- * in the shared memory. We just palloc it. The space needed
- * is the hash area itself plus nbatch+1 I/O buffer pages.
- * ----------------
- */
- hashtable = (HashJoinTable) palloc((HashTBSize + nbatch + 1) * BLCKSZ);
- shmid = 0;
-
- if (hashtable == NULL)
- elog(ERROR, "not enough memory for hashjoin.");
- /* ----------------
- * initialize the hash table header
+ * Initialize the hash table control block.
+ * The hashtable control block is just palloc'd from executor memory.
* ----------------
*/
+ hashtable = (HashJoinTable) palloc(sizeof(HashTableData));
hashtable->nbuckets = nbuckets;
hashtable->totalbuckets = totalbuckets;
- hashtable->bucketsize = bucketsize;
- hashtable->shmid = shmid;
- hashtable->top = MAXALIGN(sizeof(HashTableData));
- hashtable->bottom = HashTBSize * BLCKSZ;
- /*
- * hashtable->readbuf has to be maxaligned!!!
- * Note there are nbatch additional pages available after readbuf;
- * these are used for buffering the outgoing batch data.
- */
- hashtable->readbuf = hashtable->bottom;
- hashtable->batch = hashtable->bottom + BLCKSZ;
+ hashtable->buckets = NULL;
hashtable->nbatch = nbatch;
hashtable->curbatch = 0;
- hashtable->pcount = hashtable->nprocess = 0;
+ hashtable->innerBatchFile = NULL;
+ hashtable->outerBatchFile = NULL;
+ hashtable->innerBatchSize = NULL;
+ hashtable->outerBatchSize = NULL;
+
+ /* ----------------
+ * Create a named portal in which to keep the hashtable working storage.
+ * Each hashjoin must have its own portal, so be wary of name conflicts.
+ * ----------------
+ */
+ i = 0;
+ do {
+ i++;
+ sprintf(myPortalName, "<hashtable %d>", i);
+ myPortal = GetPortalByName(myPortalName);
+ } while (PortalIsValid(myPortal));
+ myPortal = CreatePortal(myPortalName);
+ Assert(PortalIsValid(myPortal));
+ hashtable->myPortal = (void*) myPortal; /* kluge for circular includes */
+ hashtable->hashCxt = (MemoryContext) PortalGetVariableMemory(myPortal);
+ hashtable->batchCxt = (MemoryContext) PortalGetHeapMemory(myPortal);
+
+ /* Allocate data that will live for the life of the hashjoin */
+
+ oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+
if (nbatch > 0)
{
/* ---------------
- * allocate and initialize the outer batches
+ * allocate and initialize the file arrays in hashCxt
* ---------------
*/
- outerbatchPos = (RelativeAddr *)
- absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
+ hashtable->innerBatchFile = (BufFile **)
+ palloc(nbatch * sizeof(BufFile *));
+ hashtable->outerBatchFile = (BufFile **)
+ palloc(nbatch * sizeof(BufFile *));
+ hashtable->innerBatchSize = (long *)
+ palloc(nbatch * sizeof(long));
+ hashtable->outerBatchSize = (long *)
+ palloc(nbatch * sizeof(long));
for (i = 0; i < nbatch; i++)
{
- outerbatchPos[i] = -1;
+ hashtable->innerBatchFile[i] = NULL;
+ hashtable->outerBatchFile[i] = NULL;
+ hashtable->innerBatchSize[i] = 0;
+ hashtable->outerBatchSize[i] = 0;
}
- hashtable->outerbatchPos = RELADDR(outerbatchPos);
- /* ---------------
- * allocate and initialize the inner batches
- * ---------------
- */
- innerbatchPos = (RelativeAddr *)
- absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
- innerbatchSizes = (int *)
- absHashTableAlloc(nbatch * sizeof(int), hashtable);
- for (i = 0; i < nbatch; i++)
- {
- innerbatchPos[i] = -1;
- innerbatchSizes[i] = 0;
- }
- hashtable->innerbatchPos = RELADDR(innerbatchPos);
- hashtable->innerbatchSizes = RELADDR(innerbatchSizes);
- }
- else
- {
- hashtable->outerbatchPos = (RelativeAddr) NULL;
- hashtable->innerbatchPos = (RelativeAddr) NULL;
- hashtable->innerbatchSizes = (RelativeAddr) NULL;
+ /* The files will not be opened until later... */
}
- hashtable->overflownext = hashtable->top + bucketsize * nbuckets;
- Assert(hashtable->overflownext < hashtable->bottom);
- /* ----------------
- * initialize each hash bucket
- * ----------------
+ /* Prepare portal for the first-scan space allocations;
+ * allocate the hashbucket array therein, and set each bucket "empty".
*/
- bucket = (HashBucket) ABSADDR(hashtable->top);
+ MemoryContextSwitchTo(hashtable->batchCxt);
+ StartPortalAllocMode(DefaultAllocMode, 0);
+
+ hashtable->buckets = (HashJoinTuple *)
+ palloc(nbuckets * sizeof(HashJoinTuple));
+
+ if (hashtable->buckets == NULL)
+ elog(ERROR, "Insufficient memory for hash table.");
+
for (i = 0; i < nbuckets; i++)
{
- bucket->top = RELADDR((char *) bucket + MAXALIGN(sizeof(*bucket)));
- bucket->bottom = bucket->top;
- bucket->firstotuple = bucket->lastotuple = -1;
- bucket = (HashBucket) ((char *) bucket + bucketsize);
+ hashtable->buckets[i] = NULL;
}
+
+ MemoryContextSwitchTo(oldcxt);
+
return hashtable;
}
/* ----------------------------------------------------------------
+ * ExecHashTableDestroy
+ *
+ * destroy a hash table
+ * ----------------------------------------------------------------
+ */
+void
+ExecHashTableDestroy(HashJoinTable hashtable)
+{
+ int i;
+
+ /* Make sure all the temp files are closed */
+ for (i = 0; i < hashtable->nbatch; i++)
+ {
+ if (hashtable->innerBatchFile[i])
+ BufFileClose(hashtable->innerBatchFile[i]);
+ if (hashtable->outerBatchFile[i])
+ BufFileClose(hashtable->outerBatchFile[i]);
+ }
+
+ /* Destroy the portal to release all working memory */
+ /* cast here is a kluge for circular includes... */
+ PortalDestroy((Portal*) & hashtable->myPortal);
+
+ /* And drop the control block */
+ pfree(hashtable);
+}
+
+/* ----------------------------------------------------------------
* ExecHashTableInsert
*
* insert a tuple into the hash table depending on the hash value
@@ -467,32 +452,11 @@ ExecHashTableCreate(Hash *node)
void
ExecHashTableInsert(HashJoinTable hashtable,
ExprContext *econtext,
- Var *hashkey,
- File *batches)
+ Var *hashkey)
{
- TupleTableSlot *slot;
- HeapTuple heapTuple;
- HashBucket bucket;
- int bucketno;
- int nbatch;
- int batchno;
- char *buffer;
- RelativeAddr *batchPos;
- int *batchSizes;
- char *pos;
-
- nbatch = hashtable->nbatch;
- batchPos = (RelativeAddr *) ABSADDR(hashtable->innerbatchPos);
- batchSizes = (int *) ABSADDR(hashtable->innerbatchSizes);
-
- slot = econtext->ecxt_innertuple;
- heapTuple = slot->val;
-
-#ifdef HJDEBUG
- printf("Inserting ");
-#endif
-
- bucketno = ExecHashGetBucket(hashtable, econtext, hashkey);
+ int bucketno = ExecHashGetBucket(hashtable, econtext, hashkey);
+ TupleTableSlot *slot = econtext->ecxt_innertuple;
+ HeapTuple heapTuple = slot->val;
/* ----------------
* decide whether to put the tuple in the hash table or a tmp file
@@ -504,22 +468,24 @@ ExecHashTableInsert(HashJoinTable hashtable,
* put the tuple in hash table
* ---------------
*/
- bucket = (HashBucket)
- (ABSADDR(hashtable->top) + bucketno * hashtable->bucketsize);
- if (((char *) MAXALIGN(ABSADDR(bucket->bottom)) - (char *) bucket)
- + heapTuple->t_len + HEAPTUPLESIZE > hashtable->bucketsize)
- ExecHashOverflowInsert(hashtable, bucket, heapTuple);
- else
- {
- memmove((char *) MAXALIGN(ABSADDR(bucket->bottom)),
- heapTuple,
- HEAPTUPLESIZE);
- memmove((char *) MAXALIGN(ABSADDR(bucket->bottom)) + HEAPTUPLESIZE,
- heapTuple->t_data,
- heapTuple->t_len);
- bucket->bottom = ((RelativeAddr) MAXALIGN(bucket->bottom) +
- heapTuple->t_len + HEAPTUPLESIZE);
- }
+ HashJoinTuple hashTuple;
+ int hashTupleSize;
+
+ hashTupleSize = MAXALIGN(sizeof(*hashTuple)) + heapTuple->t_len;
+ hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
+ hashTupleSize);
+ if (hashTuple == NULL)
+ elog(ERROR, "Insufficient memory for hash table.");
+ memcpy((char *) & hashTuple->htup,
+ (char *) heapTuple,
+ sizeof(hashTuple->htup));
+ hashTuple->htup.t_data = (HeapTupleHeader)
+ (((char *) hashTuple) + MAXALIGN(sizeof(*hashTuple)));
+ memcpy((char *) hashTuple->htup.t_data,
+ (char *) heapTuple->t_data,
+ heapTuple->t_len);
+ hashTuple->next = hashtable->buckets[bucketno];
+ hashtable->buckets[bucketno] = hashTuple;
}
else
{
@@ -527,32 +493,15 @@ ExecHashTableInsert(HashJoinTable hashtable,
* put the tuple into a tmp file for other batches
* -----------------
*/
- batchno = (nbatch * (bucketno - hashtable->nbuckets)) /
+ int batchno = (hashtable->nbatch * (bucketno - hashtable->nbuckets)) /
(hashtable->totalbuckets - hashtable->nbuckets);
- buffer = ABSADDR(hashtable->batch) + batchno * BLCKSZ;
- batchSizes[batchno]++;
- pos = (char *)
- ExecHashJoinSaveTuple(heapTuple,
- buffer,
- batches[batchno],
- (char *) ABSADDR(batchPos[batchno]));
- batchPos[batchno] = RELADDR(pos);
+ hashtable->innerBatchSize[batchno]++;
+ ExecHashJoinSaveTuple(heapTuple,
+ hashtable->innerBatchFile[batchno]);
}
}
/* ----------------------------------------------------------------
- * ExecHashTableDestroy
- *
- * destroy a hash table
- * ----------------------------------------------------------------
- */
-void
-ExecHashTableDestroy(HashJoinTable hashtable)
-{
- pfree(hashtable);
-}
-
-/* ----------------------------------------------------------------
* ExecHashGetBucket
*
* Get the hash value for a tuple
@@ -567,12 +516,12 @@ ExecHashGetBucket(HashJoinTable hashtable,
Datum keyval;
bool isNull;
-
/* ----------------
* Get the join attribute value of the tuple
- * ----------------
+ *
* ...It's a quick hack - use ExecEvalExpr instead of ExecEvalVar:
* hashkey may be T_ArrayRef, not just T_Var. - vadim 04/22/97
+ * ----------------
*/
keyval = ExecEvalExpr((Node *) hashkey, econtext, &isNull, NULL);
@@ -604,62 +553,6 @@ ExecHashGetBucket(HashJoinTable hashtable,
}
/* ----------------------------------------------------------------
- * ExecHashOverflowInsert
- *
- * insert into the overflow area of a hash bucket
- * ----------------------------------------------------------------
- */
-static void
-ExecHashOverflowInsert(HashJoinTable hashtable,
- HashBucket bucket,
- HeapTuple heapTuple)
-{
- OverflowTuple otuple;
- RelativeAddr newend;
- OverflowTuple firstotuple;
- OverflowTuple lastotuple;
-
- firstotuple = (OverflowTuple) ABSADDR(bucket->firstotuple);
- lastotuple = (OverflowTuple) ABSADDR(bucket->lastotuple);
- /* ----------------
- * see if we run out of overflow space
- * ----------------
- */
- newend = (RelativeAddr) MAXALIGN(hashtable->overflownext + sizeof(*otuple)
- + heapTuple->t_len + HEAPTUPLESIZE);
- if (newend > hashtable->bottom)
- elog(ERROR,
- "hash table out of memory. Use -B parameter to increase buffers.");
-
- /* ----------------
- * establish the overflow chain
- * ----------------
- */
- otuple = (OverflowTuple) ABSADDR(hashtable->overflownext);
- hashtable->overflownext = newend;
- if (firstotuple == NULL)
- bucket->firstotuple = bucket->lastotuple = RELADDR(otuple);
- else
- {
- lastotuple->next = RELADDR(otuple);
- bucket->lastotuple = RELADDR(otuple);
- }
-
- /* ----------------
- * copy the tuple into the overflow area
- * ----------------
- */
- otuple->next = -1;
- otuple->tuple = RELADDR(MAXALIGN(((char *) otuple + sizeof(*otuple))));
- memmove(ABSADDR(otuple->tuple),
- heapTuple,
- HEAPTUPLESIZE);
- memmove(ABSADDR(otuple->tuple) + HEAPTUPLESIZE,
- heapTuple->t_data,
- heapTuple->t_len);
-}
-
-/* ----------------------------------------------------------------
* ExecScanHashBucket
*
* scan a hash bucket of matches
@@ -667,95 +560,46 @@ ExecHashOverflowInsert(HashJoinTable hashtable,
*/
HeapTuple
ExecScanHashBucket(HashJoinState *hjstate,
- HashBucket bucket,
- HeapTuple curtuple,
List *hjclauses,
ExprContext *econtext)
{
- HeapTuple heapTuple;
- bool qualResult;
- OverflowTuple otuple = NULL;
- OverflowTuple curotuple;
- TupleTableSlot *inntuple;
- OverflowTuple firstotuple;
- OverflowTuple lastotuple;
- HashJoinTable hashtable;
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ HashJoinTuple hashTuple = hjstate->hj_CurTuple;
- hashtable = hjstate->hj_HashTable;
- firstotuple = (OverflowTuple) ABSADDR(bucket->firstotuple);
- lastotuple = (OverflowTuple) ABSADDR(bucket->lastotuple);
-
- /* ----------------
- * search the hash bucket
- * ----------------
+ /* hj_CurTuple is NULL to start scanning a new bucket, or the address
+ * of the last tuple returned from the current bucket.
*/
- if (curtuple == NULL || curtuple < (HeapTuple) ABSADDR(bucket->bottom))
+ if (hashTuple == NULL)
{
- if (curtuple == NULL)
- heapTuple = (HeapTuple)
- MAXALIGN(ABSADDR(bucket->top));
- else
- heapTuple = (HeapTuple)
- MAXALIGN(((char *) curtuple + curtuple->t_len + HEAPTUPLESIZE));
-
- while (heapTuple < (HeapTuple) ABSADDR(bucket->bottom))
- {
-
- heapTuple->t_data = (HeapTupleHeader)
- ((char *) heapTuple + HEAPTUPLESIZE);
-
- inntuple = ExecStoreTuple(heapTuple, /* tuple to store */
- hjstate->hj_HashTupleSlot, /* slot */
- InvalidBuffer, /* tuple has no buffer */
- false); /* do not pfree this tuple */
-
- econtext->ecxt_innertuple = inntuple;
- qualResult = ExecQual((List *) hjclauses, econtext);
-
- if (qualResult)
- return heapTuple;
-
- heapTuple = (HeapTuple)
- MAXALIGN(((char *) heapTuple + heapTuple->t_len + HEAPTUPLESIZE));
- }
-
- if (firstotuple == NULL)
- return NULL;
- otuple = firstotuple;
+ hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
}
-
- /* ----------------
- * search the overflow area of the hash bucket
- * ----------------
- */
- if (otuple == NULL)
+ else
{
- curotuple = hjstate->hj_CurOTuple;
- otuple = (OverflowTuple) ABSADDR(curotuple->next);
+ hashTuple = hashTuple->next;
}
- while (otuple != NULL)
+ while (hashTuple != NULL)
{
- heapTuple = (HeapTuple) ABSADDR(otuple->tuple);
- heapTuple->t_data = (HeapTupleHeader)
- ((char *) heapTuple + HEAPTUPLESIZE);
+ HeapTuple heapTuple = & hashTuple->htup;
+ TupleTableSlot *inntuple;
+ bool qualResult;
+ /* insert hashtable's tuple into exec slot so ExecQual sees it */
inntuple = ExecStoreTuple(heapTuple, /* tuple to store */
hjstate->hj_HashTupleSlot, /* slot */
- InvalidBuffer, /* SP?? this tuple has
- * no buffer */
+ InvalidBuffer,
false); /* do not pfree this tuple */
-
econtext->ecxt_innertuple = inntuple;
- qualResult = ExecQual((List *) hjclauses, econtext);
+
+ qualResult = ExecQual(hjclauses, econtext);
if (qualResult)
{
- hjstate->hj_CurOTuple = otuple;
+ hjstate->hj_CurTuple = hashTuple;
return heapTuple;
}
- otuple = (OverflowTuple) ABSADDR(otuple->next);
+ hashTuple = hashTuple->next;
}
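
The hj_CurTuple protocol above boils down to a resumable linked-list scan:
a NULL cursor means "start at the bucket head", anything else means
"continue after the last match". A simplified standalone sketch, where
Node and the key comparison are stand-ins for HashJoinTuple and ExecQual:

    #include <stddef.h>

    typedef struct Node
    {
        struct Node *next;
        int          key;
    } Node;

    static Node *
    scan_bucket(Node *bucket_head, Node **cursor, int probe_key)
    {
        Node *n = (*cursor == NULL) ? bucket_head : (*cursor)->next;

        for (; n != NULL; n = n->next)
        {
            if (n->key == probe_key)    /* stands in for ExecQual() */
            {
                *cursor = n;            /* remember the resume point */
                return n;
            }
        }
        return NULL;                    /* no more matches in this bucket */
    }
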
/* ----------------
@@ -819,60 +663,57 @@ hashFunc(Datum key, int len, bool byVal)
* reset hash table header for new batch
*
* ntuples is the number of tuples in the inner relation's batch
+ * (which we currently don't actually use...)
* ----------------------------------------------------------------
*/
void
-ExecHashTableReset(HashJoinTable hashtable, int ntuples)
+ExecHashTableReset(HashJoinTable hashtable, long ntuples)
{
+ MemoryContext oldcxt;
+ int nbuckets = hashtable->nbuckets;
int i;
- HashBucket bucket;
/*
- * We can reset the number of hashbuckets since we are going to
- * recalculate the hash values of all the tuples in the new batch
- * anyway. We might as well spread out the hash values as much as
- * we can within the available space. Note we must set nbuckets
- * equal to totalbuckets since we will NOT generate any new output
- * batches after this point.
+ * Release all the hash buckets and tuples acquired in the prior pass,
+ * and reinitialize the portal for a new pass.
*/
- hashtable->nbuckets = hashtable->totalbuckets =
- (int) (hashtable->bottom / (hashtable->bucketsize * FUDGE_FAC));
+ oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
+ EndPortalAllocMode();
+ StartPortalAllocMode(DefaultAllocMode, 0);
/*
- * reinitialize the overflow area to empty, and reinit each hash bucket.
+ * We still use the same number of physical buckets as in the first pass.
+ * (It could be different; but we already decided how many buckets would
+ * be appropriate for the allowed memory, so stick with that number.)
+ * We MUST set totalbuckets to equal nbuckets, because from now on
+ * no tuples will go out to temp files; there are no more virtual buckets,
+ * only real buckets. (This implies that tuples will go into different
+ * bucket numbers than they did on the first pass, but that's OK.)
*/
- hashtable->overflownext = hashtable->top + hashtable->bucketsize *
- hashtable->nbuckets;
- Assert(hashtable->overflownext < hashtable->bottom);
+ hashtable->totalbuckets = nbuckets;
+
+ /* Reallocate and reinitialize the hash bucket headers. */
+ hashtable->buckets = (HashJoinTuple *)
+ palloc(nbuckets * sizeof(HashJoinTuple));
+
+ if (hashtable->buckets == NULL)
+ elog(ERROR, "Insufficient memory for hash table.");
- bucket = (HashBucket) ABSADDR(hashtable->top);
- for (i = 0; i < hashtable->nbuckets; i++)
+ for (i = 0; i < nbuckets; i++)
{
- bucket->top = RELADDR((char *) bucket + MAXALIGN(sizeof(*bucket)));
- bucket->bottom = bucket->top;
- bucket->firstotuple = bucket->lastotuple = -1;
- bucket = (HashBucket) ((char *) bucket + hashtable->bucketsize);
+ hashtable->buckets[i] = NULL;
}
- hashtable->pcount = hashtable->nprocess;
+ MemoryContextSwitchTo(oldcxt);
}
void
ExecReScanHash(Hash *node, ExprContext *exprCtxt, Plan *parent)
{
- HashState *hashstate = node->hashstate;
-
- if (hashstate->hashBatches != NULL)
- {
- pfree(hashstate->hashBatches);
- hashstate->hashBatches = NULL;
- }
-
/*
* if chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode.
*/
if (((Plan *) node)->lefttree->chgParam == NULL)
ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
-
}
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 10e4cfb44fc..b3808fab367 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -7,15 +7,12 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/executor/nodeHashjoin.c,v 1.19 1999/05/09 00:53:21 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/executor/nodeHashjoin.c,v 1.20 1999/05/18 21:33:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include <sys/types.h>
#include <string.h>
-#include <sys/file.h>
-#include <sys/stat.h>
-#include <fcntl.h>
#include "postgres.h"
@@ -25,19 +22,15 @@
#include "executor/nodeHashjoin.h"
#include "optimizer/clauses.h" /* for get_leftop */
-static TupleTableSlot *
- ExecHashJoinOuterGetTuple(Plan *node, Plan *parent, HashJoinState *hjstate);
-
-static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, char *buffer,
- File file, TupleTableSlot *tupleSlot, int *block, char **position);
-
-static int ExecHashJoinGetBatch(int bucketno, HashJoinTable hashtable,
- int nbatch);
-
+static TupleTableSlot *ExecHashJoinOuterGetTuple(Plan *node, Plan *parent,
+ HashJoinState *hjstate);
+static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
+ BufFile *file,
+ TupleTableSlot *tupleSlot);
+static int ExecHashJoinGetBatch(int bucketno, HashJoinTable hashtable);
static int ExecHashJoinNewBatch(HashJoinState *hjstate);
-
/* ----------------------------------------------------------------
* ExecHashJoin
*
@@ -61,27 +54,14 @@ ExecHashJoin(HashJoin *node)
TupleTableSlot *inntuple;
Var *outerVar;
ExprContext *econtext;
-
HashJoinTable hashtable;
- int bucketno;
- HashBucket bucket;
HeapTuple curtuple;
-
bool qualResult;
-
TupleTableSlot *outerTupleSlot;
TupleTableSlot *innerTupleSlot;
- int nbatch;
- int curbatch;
- File *outerbatches;
- RelativeAddr *outerbatchPos;
Var *innerhashkey;
- int batch;
- int batchno;
- char *buffer;
int i;
bool hashPhaseDone;
- char *pos;
/* ----------------
* get information from HashJoin node
@@ -103,8 +83,6 @@ ExecHashJoin(HashJoin *node)
* -----------------
*/
hashtable = hjstate->hj_HashTable;
- bucket = hjstate->hj_CurBucket;
- curtuple = hjstate->hj_CurTuple;
/* --------------------
* initialize expression context
@@ -121,13 +99,13 @@ ExecHashJoin(HashJoin *node)
if (!isDone)
return result;
}
+
/* ----------------
* if this is the first call, build the hash table for inner relation
* ----------------
*/
if (!hashPhaseDone)
{ /* if the hash phase not completed */
- hashtable = node->hashjointable;
if (hashtable == NULL)
{ /* if the hash table has not been created */
/* ----------------
@@ -143,44 +121,24 @@ ExecHashJoin(HashJoin *node)
* execute the Hash node, to build the hash table
* ----------------
*/
- hashNode->hashtable = hashtable;
+ hashNode->hashstate->hashtable = hashtable;
innerTupleSlot = ExecProcNode((Plan *) hashNode, (Plan *) node);
}
- bucket = NULL;
- curtuple = NULL;
- curbatch = 0;
node->hashdone = true;
- }
- else if (hashtable == NULL)
- return NULL;
-
- nbatch = hashtable->nbatch;
- outerbatches = hjstate->hj_OuterBatches;
- if (nbatch > 0 && outerbatches == NULL)
- { /* if needs hash partition */
- /* -----------------
- * allocate space for file descriptors of outer batch files
- * then open the batch files in the current process
- * -----------------
+ /* ----------------
+ * Open temp files for outer batches, if needed.
+ * Note that file buffers are palloc'd in regular executor context.
+ * ----------------
*/
- innerhashkey = hashNode->hashkey;
- hjstate->hj_InnerHashKey = innerhashkey;
- outerbatches = (File *) palloc(nbatch * sizeof(File));
- for (i = 0; i < nbatch; i++)
+ for (i = 0; i < hashtable->nbatch; i++)
{
- outerbatches[i] = OpenTemporaryFile();
+ File tfile = OpenTemporaryFile();
+ Assert(tfile >= 0);
+ hashtable->outerBatchFile[i] = BufFileCreate(tfile);
}
- hjstate->hj_OuterBatches = outerbatches;
-
- /* ------------------
- * get the inner batch file descriptors from the
- * hash node
- * ------------------
- */
- hjstate->hj_InnerBatches = hashNode->hashstate->hashBatches;
}
- outerbatchPos = (RelativeAddr *) ABSADDR(hashtable->outerbatchPos);
- curbatch = hashtable->curbatch;
+ else if (hashtable == NULL)
+ return NULL;
/* ----------------
* Now get an outer tuple and probe into the hash table for matches
@@ -189,185 +147,106 @@ ExecHashJoin(HashJoin *node)
outerTupleSlot = hjstate->jstate.cs_OuterTupleSlot;
outerVar = get_leftop(clause);
- bucketno = -1; /* if bucketno remains -1, means use old
- * outer tuple */
- if (TupIsNull(outerTupleSlot))
+ for (;;)
{
-
/*
* if the current outer tuple is nil, get a new one
*/
- outerTupleSlot = (TupleTableSlot *)
- ExecHashJoinOuterGetTuple(outerNode, (Plan *) node, hjstate);
-
- while (curbatch <= nbatch && TupIsNull(outerTupleSlot))
+ if (TupIsNull(outerTupleSlot))
{
-
- /*
- * if the current batch runs out, switch to new batch
- */
- curbatch = ExecHashJoinNewBatch(hjstate);
- if (curbatch > nbatch)
+ outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode,
+ (Plan *) node,
+ hjstate);
+ if (TupIsNull(outerTupleSlot))
{
-
/*
- * when the last batch runs out, clean up
+ * when the last batch runs out, clean up and exit
*/
ExecHashTableDestroy(hashtable);
hjstate->hj_HashTable = NULL;
return NULL;
}
- else
- outerTupleSlot = (TupleTableSlot *)
- ExecHashJoinOuterGetTuple(outerNode, (Plan *) node, hjstate);
- }
-
- /*
- * now we get an outer tuple, find the corresponding bucket for
- * this tuple from the hash table
- */
- econtext->ecxt_outertuple = outerTupleSlot;
-
-#ifdef HJDEBUG
- printf("Probing ");
-#endif
- bucketno = ExecHashGetBucket(hashtable, econtext, outerVar);
- bucket = (HashBucket) (ABSADDR(hashtable->top)
- + bucketno * hashtable->bucketsize);
- }
-
- for (;;)
- {
- /* ----------------
- * Now we've got an outer tuple and the corresponding hash bucket,
- * but this tuple may not belong to the current batch.
- * ----------------
- */
- if (curbatch == 0 && bucketno != -1) /* if this is the first
- * pass */
- batch = ExecHashJoinGetBatch(bucketno, hashtable, nbatch);
- else
- batch = 0;
- if (batch > 0)
- {
/*
- * if the current outer tuple does not belong to the current
- * batch, save to the tmp file for the corresponding batch.
+ * now we have an outer tuple, find the corresponding bucket for
+ * this tuple from the hash table
*/
- buffer = ABSADDR(hashtable->batch) + (batch - 1) * BLCKSZ;
- batchno = batch - 1;
- pos = ExecHashJoinSaveTuple(outerTupleSlot->val,
- buffer,
- outerbatches[batchno],
- ABSADDR(outerbatchPos[batchno]));
-
- outerbatchPos[batchno] = RELADDR(pos);
- }
- else if (bucket != NULL)
- {
- do
- {
+ econtext->ecxt_outertuple = outerTupleSlot;
+ hjstate->hj_CurBucketNo = ExecHashGetBucket(hashtable, econtext,
+ outerVar);
+ hjstate->hj_CurTuple = NULL;
- /*
- * scan the hash bucket for matches
- */
- curtuple = ExecScanHashBucket(hjstate,
- bucket,
- curtuple,
- hjclauses,
- econtext);
-
- if (curtuple != NULL)
+ /* ----------------
+ * Now we've got an outer tuple and the corresponding hash bucket,
+ * but this tuple may not belong to the current batch.
+ * This need only be checked in the first pass.
+ * ----------------
+ */
+ if (hashtable->curbatch == 0)
+ {
+ int batch = ExecHashJoinGetBatch(hjstate->hj_CurBucketNo,
+ hashtable);
+ if (batch > 0)
{
-
/*
- * we've got a match, but still need to test qpqual
- */
- inntuple = ExecStoreTuple(curtuple,
- hjstate->hj_HashTupleSlot,
- InvalidBuffer,
- false); /* don't pfree this
- * tuple */
-
- econtext->ecxt_innertuple = inntuple;
-
- /* ----------------
- * test to see if we pass the qualification
- * ----------------
- */
- qualResult = ExecQual((List *) qual, econtext);
-
- /* ----------------
- * if we pass the qual, then save state for next call and
- * have ExecProject form the projection, store it
- * in the tuple table, and return the slot.
- * ----------------
+ * Need to postpone this outer tuple to a later batch.
+ * Save it in the corresponding outer-batch file.
*/
- if (qualResult)
- {
- ProjectionInfo *projInfo;
- TupleTableSlot *result;
- bool isDone;
-
- hjstate->hj_CurBucket = bucket;
- hjstate->hj_CurTuple = curtuple;
- hashtable->curbatch = curbatch;
- hjstate->jstate.cs_OuterTupleSlot = outerTupleSlot;
-
- projInfo = hjstate->jstate.cs_ProjInfo;
- result = ExecProject(projInfo, &isDone);
- hjstate->jstate.cs_TupFromTlist = !isDone;
- return result;
- }
+ int batchno = batch - 1;
+ hashtable->outerBatchSize[batchno]++;
+ ExecHashJoinSaveTuple(outerTupleSlot->val,
+ hashtable->outerBatchFile[batchno]);
+ ExecClearTuple(outerTupleSlot);
+ continue; /* loop around for a new outer tuple */
}
}
- while (curtuple != NULL);
}
- /* ----------------
- * Now the current outer tuple has run out of matches,
- * so we free it and get a new outer tuple.
- * ----------------
+ /*
+ * OK, scan the selected hash bucket for matches
*/
- outerTupleSlot = (TupleTableSlot *)
- ExecHashJoinOuterGetTuple(outerNode, (Plan *) node, hjstate);
-
- while (curbatch <= nbatch && TupIsNull(outerTupleSlot))
+ for (;;)
{
-
+ curtuple = ExecScanHashBucket(hjstate,
+ hjclauses,
+ econtext);
+ if (curtuple == NULL)
+ break; /* out of matches */
/*
- * if the current batch runs out, switch to new batch
+ * we've got a match, but still need to test qpqual
*/
- curbatch = ExecHashJoinNewBatch(hjstate);
- if (curbatch > nbatch)
+ inntuple = ExecStoreTuple(curtuple,
+ hjstate->hj_HashTupleSlot,
+ InvalidBuffer,
+ false); /* don't pfree this tuple */
+ econtext->ecxt_innertuple = inntuple;
+ qualResult = ExecQual(qual, econtext);
+ /* ----------------
+ * if we pass the qual, then save state for next call and
+ * have ExecProject form the projection, store it
+ * in the tuple table, and return the slot.
+ * ----------------
+ */
+ if (qualResult)
{
-
- /*
- * when the last batch runs out, clean up
- */
- ExecHashTableDestroy(hashtable);
- hjstate->hj_HashTable = NULL;
- return NULL;
+ ProjectionInfo *projInfo;
+ TupleTableSlot *result;
+ bool isDone;
+
+ hjstate->jstate.cs_OuterTupleSlot = outerTupleSlot;
+ projInfo = hjstate->jstate.cs_ProjInfo;
+ result = ExecProject(projInfo, &isDone);
+ hjstate->jstate.cs_TupFromTlist = !isDone;
+ return result;
}
- else
- outerTupleSlot = (TupleTableSlot *)
- ExecHashJoinOuterGetTuple(outerNode, (Plan *) node, hjstate);
}
/* ----------------
- * Now get the corresponding hash bucket for the new
- * outer tuple.
+ * Now the current outer tuple has run out of matches,
+ * so we free it and loop around to get a new outer tuple.
* ----------------
*/
- econtext->ecxt_outertuple = outerTupleSlot;
-#ifdef HJDEBUG
- printf("Probing ");
-#endif
- bucketno = ExecHashGetBucket(hashtable, econtext, outerVar);
- bucket = (HashBucket) (ABSADDR(hashtable->top)
- + bucketno * hashtable->bucketsize);
- curtuple = NULL;
+ ExecClearTuple(outerTupleSlot);
}
}
@@ -399,7 +278,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, Plan *parent)
node->hashjoinstate = hjstate;
/* ----------------
- * Miscellanious initialization
+ * Miscellaneous initialization
*
* + assign node's base_id
* + assign debugging hooks and
@@ -456,22 +335,16 @@ ExecInitHashJoin(HashJoin *node, EState *estate, Plan *parent)
ExecAssignProjectionInfo((Plan *) node, &hjstate->jstate);
/* ----------------
- * XXX comment me
+ * initialize hash-specific info
* ----------------
*/
node->hashdone = false;
hjstate->hj_HashTable = (HashJoinTable) NULL;
- hjstate->hj_HashTableShmId = (IpcMemoryId) 0;
- hjstate->hj_CurBucket = (HashBucket) NULL;
- hjstate->hj_CurTuple = (HeapTuple) NULL;
- hjstate->hj_CurOTuple = (OverflowTuple) NULL;
+ hjstate->hj_CurBucketNo = 0;
+ hjstate->hj_CurTuple = (HashJoinTuple) NULL;
hjstate->hj_InnerHashKey = (Var *) NULL;
- hjstate->hj_OuterBatches = (File *) NULL;
- hjstate->hj_InnerBatches = (File *) NULL;
- hjstate->hj_OuterReadPos = (char *) NULL;
- hjstate->hj_OuterReadBlk = (int) 0;
hjstate->jstate.cs_OuterTupleSlot = (TupleTableSlot *) NULL;
hjstate->jstate.cs_TupFromTlist = (bool) false;
@@ -554,93 +427,69 @@ ExecEndHashJoin(HashJoin *node)
static TupleTableSlot *
ExecHashJoinOuterGetTuple(Plan *node, Plan *parent, HashJoinState *hjstate)
{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
TupleTableSlot *slot;
- HashJoinTable hashtable;
- int curbatch;
- File *outerbatches;
- char *outerreadPos;
- int batchno;
- char *outerreadBuf;
- int outerreadBlk;
-
- hashtable = hjstate->hj_HashTable;
- curbatch = hashtable->curbatch;
if (curbatch == 0)
{ /* if it is the first pass */
slot = ExecProcNode(node, parent);
- return slot;
+ if (! TupIsNull(slot))
+ return slot;
+ /*
+ * We have just reached the end of the first pass.
+ * Try to switch to a saved batch.
+ */
+ curbatch = ExecHashJoinNewBatch(hjstate);
}
/*
- * otherwise, read from the tmp files
+ * Try to read from a temp file.
+ * Loop allows us to advance to new batch as needed.
*/
- outerbatches = hjstate->hj_OuterBatches;
- outerreadPos = hjstate->hj_OuterReadPos;
- outerreadBlk = hjstate->hj_OuterReadBlk;
- outerreadBuf = ABSADDR(hashtable->readbuf);
- batchno = curbatch - 1;
-
- slot = ExecHashJoinGetSavedTuple(hjstate,
- outerreadBuf,
- outerbatches[batchno],
- hjstate->hj_OuterTupleSlot,
- &outerreadBlk,
- &outerreadPos);
-
- hjstate->hj_OuterReadPos = outerreadPos;
- hjstate->hj_OuterReadBlk = outerreadBlk;
-
- return slot;
+ while (curbatch <= hashtable->nbatch)
+ {
+ slot = ExecHashJoinGetSavedTuple(hjstate,
+ hashtable->outerBatchFile[curbatch-1],
+ hjstate->hj_OuterTupleSlot);
+ if (! TupIsNull(slot))
+ return slot;
+ curbatch = ExecHashJoinNewBatch(hjstate);
+ }
+
+ /* Out of batches... */
+ return NULL;
}
/* ----------------------------------------------------------------
* ExecHashJoinGetSavedTuple
*
- * read the next tuple from a tmp file using a certain buffer
+ * read the next tuple from a tmp file
* ----------------------------------------------------------------
*/
static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
- char *buffer,
- File file,
- TupleTableSlot *tupleSlot,
- int *block, /* return parameter */
- char **position) /* return parameter */
+ BufFile *file,
+ TupleTableSlot *tupleSlot)
{
- char *bufstart;
- char *bufend;
- int cc;
- HeapTuple heapTuple;
- HashJoinTable hashtable;
-
- hashtable = hjstate->hj_HashTable;
- bufend = buffer + *(long *) buffer;
- bufstart = (char *) (buffer + sizeof(long));
- if ((*position == NULL) || (*position >= bufend))
- {
- if (*position == NULL)
- (*block) = 0;
- else
- (*block)++;
- FileSeek(file, *block * BLCKSZ, SEEK_SET);
- cc = FileRead(file, buffer, BLCKSZ);
- NDirectFileRead++;
- if (cc < 0)
- perror("FileRead");
- if (cc == 0) /* end of file */
- return NULL;
- else
- (*position) = bufstart;
- }
- heapTuple = (HeapTuple) (*position);
+ HeapTupleData htup;
+ size_t nread;
+ HeapTuple heapTuple;
+
+ nread = BufFileRead(file, (void *) &htup, sizeof(HeapTupleData));
+ if (nread == 0)
+ return NULL; /* end of file */
+ if (nread != sizeof(HeapTupleData))
+ elog(ERROR, "Read from hashjoin temp file failed");
+ heapTuple = palloc(HEAPTUPLESIZE + htup.t_len);
+ memcpy((char *) heapTuple, (char *) &htup, sizeof(HeapTupleData));
heapTuple->t_data = (HeapTupleHeader)
((char *) heapTuple + HEAPTUPLESIZE);
- (*position) = (char *) MAXALIGN(*position +
- heapTuple->t_len + HEAPTUPLESIZE);
-
- return ExecStoreTuple(heapTuple, tupleSlot, InvalidBuffer, false);
+ nread = BufFileRead(file, (void *) heapTuple->t_data, htup.t_len);
+ if (nread != (size_t) htup.t_len)
+ elog(ERROR, "Read from hashjoin temp file failed");
+ return ExecStoreTuple(heapTuple, tupleSlot, InvalidBuffer, true);
}
/* ----------------------------------------------------------------
@@ -652,116 +501,80 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
static int
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
- File *innerBatches;
- File *outerBatches;
- int *innerBatchSizes;
- Var *innerhashkey;
- HashJoinTable hashtable;
- int nbatch;
- char *readPos;
- int readBlk;
- char *readBuf;
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int nbatch = hashtable->nbatch;
+ int newbatch = hashtable->curbatch + 1;
+ long *innerBatchSize = hashtable->innerBatchSize;
+ long *outerBatchSize = hashtable->outerBatchSize;
+ BufFile *innerFile;
TupleTableSlot *slot;
ExprContext *econtext;
- int i;
- int cc;
- int newbatch;
-
- hashtable = hjstate->hj_HashTable;
- outerBatches = hjstate->hj_OuterBatches;
- innerBatches = hjstate->hj_InnerBatches;
- nbatch = hashtable->nbatch;
- newbatch = hashtable->curbatch + 1;
-
- /* ------------------
- * this is the last process, so it will do the cleanup and
- * batch-switching.
- * ------------------
- */
- if (newbatch == 1)
- {
+ Var *innerhashkey;
- /*
- * if it is end of the first pass, flush all the last pages for
- * the batches.
- */
- outerBatches = hjstate->hj_OuterBatches;
- for (i = 0; i < nbatch; i++)
- {
- cc = FileSeek(outerBatches[i], 0L, SEEK_END);
- if (cc < 0)
- perror("FileSeek");
- cc = FileWrite(outerBatches[i],
- ABSADDR(hashtable->batch) + i * BLCKSZ, BLCKSZ);
- NDirectFileWrite++;
- if (cc < 0)
- perror("FileWrite");
- }
- }
if (newbatch > 1)
{
-
/*
- * remove the previous outer batch
+ * We no longer need the previous outer batch file;
+ * close it right away to free disk space.
*/
- FileUnlink(outerBatches[newbatch - 2]);
+ BufFileClose(hashtable->outerBatchFile[newbatch - 2]);
+ hashtable->outerBatchFile[newbatch - 2] = NULL;
}
- /*
- * rebuild the hash table for the new inner batch
- */
- innerBatchSizes = (int *) ABSADDR(hashtable->innerbatchSizes);
/* --------------
- * skip over empty inner batches
+ * We can skip over any batches that are empty on either side.
+ * Release associated temp files right away.
* --------------
*/
- while (newbatch <= nbatch && innerBatchSizes[newbatch - 1] == 0)
+ while (newbatch <= nbatch &&
+ (innerBatchSize[newbatch - 1] == 0L ||
+ outerBatchSize[newbatch - 1] == 0L))
{
- FileUnlink(outerBatches[newbatch - 1]);
- FileUnlink(innerBatches[newbatch - 1]);
+ BufFileClose(hashtable->innerBatchFile[newbatch - 1]);
+ hashtable->innerBatchFile[newbatch - 1] = NULL;
+ BufFileClose(hashtable->outerBatchFile[newbatch - 1]);
+ hashtable->outerBatchFile[newbatch - 1] = NULL;
newbatch++;
}
+
if (newbatch > nbatch)
- {
- hashtable->pcount = hashtable->nprocess;
+ return newbatch; /* no more batches */
- return newbatch;
- }
- ExecHashTableReset(hashtable, innerBatchSizes[newbatch - 1]);
+ /*
+ * Rewind inner and outer batch files for this batch,
+ * so that we can start reading them.
+ */
+ if (BufFileSeek(hashtable->outerBatchFile[newbatch - 1], 0L,
+ SEEK_SET) != 0L)
+ elog(ERROR, "Failed to rewind hash temp file");
+
+ innerFile = hashtable->innerBatchFile[newbatch - 1];
+ if (BufFileSeek(innerFile, 0L, SEEK_SET) != 0L)
+ elog(ERROR, "Failed to rewind hash temp file");
+
+ /*
+ * Reload the hash table with the new inner batch
+ */
+ ExecHashTableReset(hashtable, innerBatchSize[newbatch - 1]);
econtext = hjstate->jstate.cs_ExprContext;
innerhashkey = hjstate->hj_InnerHashKey;
- readPos = NULL;
- readBlk = 0;
- readBuf = ABSADDR(hashtable->readbuf);
while ((slot = ExecHashJoinGetSavedTuple(hjstate,
- readBuf,
- innerBatches[newbatch - 1],
- hjstate->hj_HashTupleSlot,
- &readBlk,
- &readPos))
+ innerFile,
+ hjstate->hj_HashTupleSlot))
&& !TupIsNull(slot))
{
econtext->ecxt_innertuple = slot;
- ExecHashTableInsert(hashtable, econtext, innerhashkey, NULL);
- /* possible bug - glass */
+ ExecHashTableInsert(hashtable, econtext, innerhashkey);
}
-
- /* -----------------
- * only the last process comes to this branch
- * now all the processes have finished the build phase
- * ----------------
- */
-
/*
- * after we build the hash table, the inner batch is no longer needed
+ * after we build the hash table, the inner batch file is no longer needed
*/
- FileUnlink(innerBatches[newbatch - 1]);
- hjstate->hj_OuterReadPos = NULL;
- hashtable->pcount = hashtable->nprocess;
+ BufFileClose(innerFile);
+ hashtable->innerBatchFile[newbatch - 1] = NULL;
hashtable->curbatch = newbatch;
return newbatch;
@@ -777,63 +590,41 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* ----------------------------------------------------------------
*/
static int
-ExecHashJoinGetBatch(int bucketno, HashJoinTable hashtable, int nbatch)
+ExecHashJoinGetBatch(int bucketno, HashJoinTable hashtable)
{
int b;
- if (bucketno < hashtable->nbuckets || nbatch == 0)
+ if (bucketno < hashtable->nbuckets || hashtable->nbatch == 0)
return 0;
- b = (float) (bucketno - hashtable->nbuckets) /
- (float) (hashtable->totalbuckets - hashtable->nbuckets) *
- nbatch;
+ b = (hashtable->nbatch * (bucketno - hashtable->nbuckets)) /
+ (hashtable->totalbuckets - hashtable->nbuckets);
return b + 1;
}
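
A worked instance of this mapping, reusing the illustrative numbers from
the sizing sketch in the nodeHash.c half of the diff (nbuckets = 409,
totalbuckets = 20000, nbatch = 24 are assumptions, not values from this
commit). Note the integer arithmetic now matches the inner-side
computation in ExecHashTableInsert exactly, where the old float version
could disagree by a bucket at the boundaries:

    #include <stdio.h>

    int
    main(void)
    {
        int nbuckets = 409, totalbuckets = 20000, nbatch = 24;
        int bucketno = 5000;
        int batch;

        if (bucketno < nbuckets || nbatch == 0)
            batch = 0;          /* tuple belongs to the in-memory batch */
        else
            batch = (nbatch * (bucketno - nbuckets)) /
                    (totalbuckets - nbuckets) + 1;

        printf("bucket %d -> batch %d\n", bucketno, batch);  /* batch 6 */
        return 0;
    }
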
/* ----------------------------------------------------------------
* ExecHashJoinSaveTuple
*
- * save a tuple to a tmp file using a buffer.
- * the first few bytes in a page is an offset to the end
- * of the page.
+ * save a tuple to a tmp file.
+ *
+ * The data recorded in the file for each tuple is an image of its
+ * HeapTupleData (with meaningless t_data pointer) followed by the
+ * HeapTupleHeader and tuple data.
* ----------------------------------------------------------------
*/
-char *
+void
ExecHashJoinSaveTuple(HeapTuple heapTuple,
- char *buffer,
- File file,
- char *position)
+ BufFile *file)
{
- long *pageend;
- char *pagestart;
- char *pagebound;
- int cc;
-
- pageend = (long *) buffer;
- pagestart = (char *) (buffer + sizeof(long));
- pagebound = buffer + BLCKSZ;
- if (position == NULL)
- position = pagestart;
-
- if (position + heapTuple->t_len + HEAPTUPLESIZE >= pagebound)
- {
- cc = FileSeek(file, 0L, SEEK_END);
- if (cc < 0)
- perror("FileSeek");
- cc = FileWrite(file, buffer, BLCKSZ);
- NDirectFileWrite++;
- if (cc < 0)
- perror("FileWrite");
- position = pagestart;
- *pageend = 0;
- }
- memmove(position, heapTuple, HEAPTUPLESIZE);
- memmove(position + HEAPTUPLESIZE, heapTuple->t_data, heapTuple->t_len);
- position = (char *) MAXALIGN(position + heapTuple->t_len + HEAPTUPLESIZE);
- *pageend = position - buffer;
-
- return position;
+ size_t written;
+
+ written = BufFileWrite(file, (void *) heapTuple, sizeof(HeapTupleData));
+ if (written != sizeof(HeapTupleData))
+ elog(ERROR, "Write to hashjoin temp file failed");
+ written = BufFileWrite(file, (void *) heapTuple->t_data, heapTuple->t_len);
+ if (written != (size_t) heapTuple->t_len)
+ elog(ERROR, "Write to hashjoin temp file failed");
}
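
Putting the two halves together, the round trip through a temp file looks
like this: a condensed sketch of ExecHashJoinSaveTuple and
ExecHashJoinGetSavedTuple with the length checks omitted, assuming file
and heapTuple are declared as in the functions above:

    /* writer: raw HeapTupleData image, then header+data bytes */
    BufFileWrite(file, (void *) heapTuple, sizeof(HeapTupleData));
    BufFileWrite(file, (void *) heapTuple->t_data, heapTuple->t_len);

    /* reader: rebuild an in-memory tuple and re-point t_data,
     * since the pointer stored on disk is meaningless */
    HeapTupleData htup;
    HeapTuple     tuple;

    BufFileRead(file, (void *) &htup, sizeof(HeapTupleData));
    tuple = (HeapTuple) palloc(HEAPTUPLESIZE + htup.t_len);
    memcpy((char *) tuple, (char *) &htup, sizeof(HeapTupleData));
    tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
    BufFileRead(file, (void *) tuple->t_data, htup.t_len);
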
void
@@ -855,14 +646,10 @@ ExecReScanHashJoin(HashJoin *node, ExprContext *exprCtxt, Plan *parent)
ExecHashTableDestroy(hjstate->hj_HashTable);
hjstate->hj_HashTable = NULL;
}
- hjstate->hj_CurBucket = (HashBucket) NULL;
- hjstate->hj_CurTuple = (HeapTuple) NULL;
- hjstate->hj_CurOTuple = (OverflowTuple) NULL;
+
+ hjstate->hj_CurBucketNo = 0;
+ hjstate->hj_CurTuple = (HashJoinTuple) NULL;
hjstate->hj_InnerHashKey = (Var *) NULL;
- hjstate->hj_OuterBatches = (File *) NULL;
- hjstate->hj_InnerBatches = (File *) NULL;
- hjstate->hj_OuterReadPos = (char *) NULL;
- hjstate->hj_OuterReadBlk = (int) 0;
hjstate->jstate.cs_OuterTupleSlot = (TupleTableSlot *) NULL;
hjstate->jstate.cs_TupFromTlist = (bool) false;
@@ -875,5 +662,4 @@ ExecReScanHashJoin(HashJoin *node, ExprContext *exprCtxt, Plan *parent)
ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
if (((Plan *) node)->righttree->chgParam == NULL)
ExecReScan(((Plan *) node)->righttree, exprCtxt, (Plan *) node);
-
}