about | summary | refs | log | tree | commit | diff
path: root/src/backend/executor/nodeAgg.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/nodeAgg.c')
-rw-r--r-- src/backend/executor/nodeAgg.c 187
1 file changed, 49 insertions, 138 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 39bea204d16..c99a0de4ddb 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -208,7 +208,16 @@
*
* Spilled data is written to logical tapes. These provide better control
* over memory usage, disk space, and the number of files than if we were
- * to use a BufFile for each spill.
+ * to use a BufFile for each spill. We don't know the number of tapes needed
+ * at the start of the algorithm (because it can recurse), so a tape set is
+ * allocated at the beginning, and individual tapes are created as needed.
+ * As a particular tape is read, logtape.c recycles its disk space. When a
+ * tape is read to completion, it is destroyed entirely.
+ *
+ * Tapes' buffers can take up substantial memory when many tapes are open at
+ * once. We only need one tape open at a time in read mode (using a buffer
+ * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
+ * requiring a buffer of size BLCKSZ) for each partition.
*
* Note that it's possible for transition states to start small but then
* grow very large; for instance in the case of ARRAY_AGG. In such cases,
@@ -312,27 +321,6 @@
#define CHUNKHDRSZ 16
/*
- * Track all tapes needed for a HashAgg that spills. We don't know the maximum
- * number of tapes needed at the start of the algorithm (because it can
- * recurse), so one tape set is allocated and extended as needed for new
- * tapes. When a particular tape is already read, rewind it for write mode and
- * put it in the free list.
- *
- * Tapes' buffers can take up substantial memory when many tapes are open at
- * once. We only need one tape open at a time in read mode (using a buffer
- * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
- * requiring a buffer of size BLCKSZ) for each partition.
- */
-typedef struct HashTapeInfo
-{
- LogicalTapeSet *tapeset;
- int ntapes;
- int *freetapes;
- int nfreetapes;
- int freetapes_alloc;
-} HashTapeInfo;
-
-/*
* Represents partitioned spill data for a single hashtable. Contains the
* necessary information to route tuples to the correct partition, and to
* transform the spilled data into new batches.
@@ -343,9 +331,8 @@ typedef struct HashTapeInfo
*/
typedef struct HashAggSpill
{
- LogicalTapeSet *tapeset; /* borrowed reference to tape set */
int npartitions; /* number of partitions */
- int *partitions; /* spill partition tape numbers */
+ LogicalTape **partitions; /* spill partition tapes */
int64 *ntuples; /* number of tuples in each partition */
uint32 mask; /* mask to find partition from hash value */
int shift; /* after masking, shift by this amount */
@@ -365,8 +352,7 @@ typedef struct HashAggBatch
{
int setno; /* grouping set */
int used_bits; /* number of bits of hash already used */
- LogicalTapeSet *tapeset; /* borrowed reference to tape set */
- int input_tapenum; /* input partition tape */
+ LogicalTape *input_tape; /* input partition tape */
int64 input_tuples; /* number of tuples in this batch */
double input_card; /* estimated group cardinality */
} HashAggBatch;
@@ -442,22 +428,17 @@ static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
int npartitions);
static void hashagg_finish_initial_spills(AggState *aggstate);
static void hashagg_reset_spill_state(AggState *aggstate);
-static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
- int input_tapenum, int setno,
+static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno,
int64 input_tuples, double input_card,
int used_bits);
static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
-static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
+static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *lts,
int used_bits, double input_groups,
double hashentrysize);
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
TupleTableSlot *slot, uint32 hash);
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
int setno);
-static void hashagg_tapeinfo_init(AggState *aggstate);
-static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
- int ndest);
-static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
AggState *aggstate, EState *estate,
@@ -1887,12 +1868,12 @@ hash_agg_enter_spill_mode(AggState *aggstate)
if (!aggstate->hash_ever_spilled)
{
- Assert(aggstate->hash_tapeinfo == NULL);
+ Assert(aggstate->hash_tapeset == NULL);
Assert(aggstate->hash_spills == NULL);
aggstate->hash_ever_spilled = true;
- hashagg_tapeinfo_init(aggstate);
+ aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1);
aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
@@ -1901,7 +1882,7 @@ hash_agg_enter_spill_mode(AggState *aggstate)
AggStatePerHash perhash = &aggstate->perhash[setno];
HashAggSpill *spill = &aggstate->hash_spills[setno];
- hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+ hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
perhash->aggnode->numGroups,
aggstate->hashentrysize);
}
@@ -1943,9 +1924,9 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
aggstate->hash_mem_peak = total_mem;
/* update disk usage */
- if (aggstate->hash_tapeinfo != NULL)
+ if (aggstate->hash_tapeset != NULL)
{
- uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
+ uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024);
if (aggstate->hash_disk_used < disk_used)
aggstate->hash_disk_used = disk_used;
@@ -2132,7 +2113,7 @@ lookup_hash_entries(AggState *aggstate)
TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
if (spill->partitions == NULL)
- hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+ hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
perhash->aggnode->numGroups,
aggstate->hashentrysize);
@@ -2597,7 +2578,7 @@ agg_refill_hash_table(AggState *aggstate)
HashAggBatch *batch;
AggStatePerHash perhash;
HashAggSpill spill;
- HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
+ LogicalTapeSet *tapeset = aggstate->hash_tapeset;
bool spill_initialized = false;
if (aggstate->hash_batches == NIL)
@@ -2693,7 +2674,7 @@ agg_refill_hash_table(AggState *aggstate)
* that we don't assign tapes that will never be used.
*/
spill_initialized = true;
- hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
+ hashagg_spill_init(&spill, tapeset, batch->used_bits,
batch->input_card, aggstate->hashentrysize);
}
/* no memory for a new group, spill */
@@ -2709,7 +2690,7 @@ agg_refill_hash_table(AggState *aggstate)
ResetExprContext(aggstate->tmpcontext);
}
- hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
+ LogicalTapeClose(batch->input_tape);
/* change back to phase 0 */
aggstate->current_phase = 0;
@@ -2885,74 +2866,13 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate)
}
/*
- * Initialize HashTapeInfo
- */
-static void
-hashagg_tapeinfo_init(AggState *aggstate)
-{
- HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
- int init_tapes = 16; /* expanded dynamically */
-
- tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
- tapeinfo->ntapes = init_tapes;
- tapeinfo->nfreetapes = init_tapes;
- tapeinfo->freetapes_alloc = init_tapes;
- tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
- for (int i = 0; i < init_tapes; i++)
- tapeinfo->freetapes[i] = i;
-
- aggstate->hash_tapeinfo = tapeinfo;
-}
-
-/*
- * Assign unused tapes to spill partitions, extending the tape set if
- * necessary.
- */
-static void
-hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
- int npartitions)
-{
- int partidx = 0;
-
- /* use free tapes if available */
- while (partidx < npartitions && tapeinfo->nfreetapes > 0)
- partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
-
- if (partidx < npartitions)
- {
- LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
-
- while (partidx < npartitions)
- partitions[partidx++] = tapeinfo->ntapes++;
- }
-}
-
-/*
- * After a tape has already been written to and then read, this function
- * rewinds it for writing and adds it to the free list.
- */
-static void
-hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
-{
- /* rewinding frees the buffer while not in use */
- LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
- if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
- {
- tapeinfo->freetapes_alloc <<= 1;
- tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
- tapeinfo->freetapes_alloc * sizeof(int));
- }
- tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
-}
-
-/*
* hashagg_spill_init
*
* Called after we determined that spilling is necessary. Chooses the number
* of partitions to create, and initializes them.
*/
static void
-hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
+hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
double input_groups, double hashentrysize)
{
int npartitions;
@@ -2961,13 +2881,13 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
used_bits, &partition_bits);
- spill->partitions = palloc0(sizeof(int) * npartitions);
+ spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
spill->ntuples = palloc0(sizeof(int64) * npartitions);
spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
- hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
+ for (int i = 0; i < npartitions; i++)
+ spill->partitions[i] = LogicalTapeCreate(tapeset);
- spill->tapeset = tapeinfo->tapeset;
spill->shift = 32 - used_bits - partition_bits;
spill->mask = (npartitions - 1) << spill->shift;
spill->npartitions = npartitions;
@@ -2986,11 +2906,10 @@ static Size
hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
TupleTableSlot *inputslot, uint32 hash)
{
- LogicalTapeSet *tapeset = spill->tapeset;
TupleTableSlot *spillslot;
int partition;
MinimalTuple tuple;
- int tapenum;
+ LogicalTape *tape;
int total_written = 0;
bool shouldFree;
@@ -3029,12 +2948,12 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
*/
addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
- tapenum = spill->partitions[partition];
+ tape = spill->partitions[partition];
- LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
+ LogicalTapeWrite(tape, (void *) &hash, sizeof(uint32));
total_written += sizeof(uint32);
- LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
+ LogicalTapeWrite(tape, (void *) tuple, tuple->t_len);
total_written += tuple->t_len;
if (shouldFree)
@@ -3050,15 +2969,14 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
* be done.
*/
static HashAggBatch *
-hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
+hashagg_batch_new(LogicalTape *input_tape, int setno,
int64 input_tuples, double input_card, int used_bits)
{
HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
batch->setno = setno;
batch->used_bits = used_bits;
- batch->tapeset = tapeset;
- batch->input_tapenum = tapenum;
+ batch->input_tape = input_tape;
batch->input_tuples = input_tuples;
batch->input_card = input_card;
@@ -3072,42 +2990,41 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
static MinimalTuple
hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
{
- LogicalTapeSet *tapeset = batch->tapeset;
- int tapenum = batch->input_tapenum;
+ LogicalTape *tape = batch->input_tape;
MinimalTuple tuple;
uint32 t_len;
size_t nread;
uint32 hash;
- nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
+ nread = LogicalTapeRead(tape, &hash, sizeof(uint32));
if (nread == 0)
return NULL;
if (nread != sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
- tapenum, sizeof(uint32), nread)));
+ errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+ tape, sizeof(uint32), nread)));
if (hashp != NULL)
*hashp = hash;
- nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
+ nread = LogicalTapeRead(tape, &t_len, sizeof(t_len));
if (nread != sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
- tapenum, sizeof(uint32), nread)));
+ errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+ tape, sizeof(uint32), nread)));
tuple = (MinimalTuple) palloc(t_len);
tuple->t_len = t_len;
- nread = LogicalTapeRead(tapeset, tapenum,
+ nread = LogicalTapeRead(tape,
(void *) ((char *) tuple + sizeof(uint32)),
t_len - sizeof(uint32));
if (nread != t_len - sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
- tapenum, t_len - sizeof(uint32), nread)));
+ errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+ tape, t_len - sizeof(uint32), nread)));
return tuple;
}
@@ -3164,8 +3081,7 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
for (i = 0; i < spill->npartitions; i++)
{
- LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset;
- int tapenum = spill->partitions[i];
+ LogicalTape *tape = spill->partitions[i];
HashAggBatch *new_batch;
double cardinality;
@@ -3177,10 +3093,9 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
freeHyperLogLog(&spill->hll_card[i]);
/* rewinding frees the buffer while not in use */
- LogicalTapeRewindForRead(tapeset, tapenum,
- HASHAGG_READ_BUFFER_SIZE);
+ LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE);
- new_batch = hashagg_batch_new(tapeset, tapenum, setno,
+ new_batch = hashagg_batch_new(tape, setno,
spill->ntuples[i], cardinality,
used_bits);
aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
@@ -3227,14 +3142,10 @@ hashagg_reset_spill_state(AggState *aggstate)
aggstate->hash_batches = NIL;
/* close tape set */
- if (aggstate->hash_tapeinfo != NULL)
+ if (aggstate->hash_tapeset != NULL)
{
- HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
-
- LogicalTapeSetClose(tapeinfo->tapeset);
- pfree(tapeinfo->freetapes);
- pfree(tapeinfo);
- aggstate->hash_tapeinfo = NULL;
+ LogicalTapeSetClose(aggstate->hash_tapeset);
+ aggstate->hash_tapeset = NULL;
}
}