diff options
Diffstat (limited to 'src/backend/executor/nodeAgg.c')
-rw-r--r-- | src/backend/executor/nodeAgg.c | 187 |
1 files changed, 49 insertions, 138 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 39bea204d16..c99a0de4ddb 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -208,7 +208,16 @@ * * Spilled data is written to logical tapes. These provide better control * over memory usage, disk space, and the number of files than if we were - * to use a BufFile for each spill. + * to use a BufFile for each spill. We don't know the number of tapes needed + * at the start of the algorithm (because it can recurse), so a tape set is + * allocated at the beginning, and individual tapes are created as needed. + * As a particular tape is read, logtape.c recycles its disk space. When a + * tape is read to completion, it is destroyed entirely. + * + * Tapes' buffers can take up substantial memory when many tapes are open at + * once. We only need one tape open at a time in read mode (using a buffer + * that's a multiple of BLCKSZ); but we need one tape open in write mode (each + * requiring a buffer of size BLCKSZ) for each partition. * * Note that it's possible for transition states to start small but then * grow very large; for instance in the case of ARRAY_AGG. In such cases, @@ -312,27 +321,6 @@ #define CHUNKHDRSZ 16 /* - * Track all tapes needed for a HashAgg that spills. We don't know the maximum - * number of tapes needed at the start of the algorithm (because it can - * recurse), so one tape set is allocated and extended as needed for new - * tapes. When a particular tape is already read, rewind it for write mode and - * put it in the free list. - * - * Tapes' buffers can take up substantial memory when many tapes are open at - * once. We only need one tape open at a time in read mode (using a buffer - * that's a multiple of BLCKSZ); but we need one tape open in write mode (each - * requiring a buffer of size BLCKSZ) for each partition. - */ -typedef struct HashTapeInfo -{ - LogicalTapeSet *tapeset; - int ntapes; - int *freetapes; - int nfreetapes; - int freetapes_alloc; -} HashTapeInfo; - -/* * Represents partitioned spill data for a single hashtable. Contains the * necessary information to route tuples to the correct partition, and to * transform the spilled data into new batches. @@ -343,9 +331,8 @@ typedef struct HashTapeInfo */ typedef struct HashAggSpill { - LogicalTapeSet *tapeset; /* borrowed reference to tape set */ int npartitions; /* number of partitions */ - int *partitions; /* spill partition tape numbers */ + LogicalTape **partitions; /* spill partition tapes */ int64 *ntuples; /* number of tuples in each partition */ uint32 mask; /* mask to find partition from hash value */ int shift; /* after masking, shift by this amount */ @@ -365,8 +352,7 @@ typedef struct HashAggBatch { int setno; /* grouping set */ int used_bits; /* number of bits of hash already used */ - LogicalTapeSet *tapeset; /* borrowed reference to tape set */ - int input_tapenum; /* input partition tape */ + LogicalTape *input_tape; /* input partition tape */ int64 input_tuples; /* number of tuples in this batch */ double input_card; /* estimated group cardinality */ } HashAggBatch; @@ -442,22 +428,17 @@ static void hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions); static void hashagg_finish_initial_spills(AggState *aggstate); static void hashagg_reset_spill_state(AggState *aggstate); -static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset, - int input_tapenum, int setno, +static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno, int64 input_tuples, double input_card, int used_bits); static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp); -static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, +static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *lts, int used_bits, double input_groups, double hashentrysize); static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, TupleTableSlot *slot, uint32 hash); static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno); -static void hashagg_tapeinfo_init(AggState *aggstate); -static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest, - int ndest); -static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum); static Datum GetAggInitVal(Datum textInitVal, Oid transtype); static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggstate, EState *estate, @@ -1887,12 +1868,12 @@ hash_agg_enter_spill_mode(AggState *aggstate) if (!aggstate->hash_ever_spilled) { - Assert(aggstate->hash_tapeinfo == NULL); + Assert(aggstate->hash_tapeset == NULL); Assert(aggstate->hash_spills == NULL); aggstate->hash_ever_spilled = true; - hashagg_tapeinfo_init(aggstate); + aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1); aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes); @@ -1901,7 +1882,7 @@ hash_agg_enter_spill_mode(AggState *aggstate) AggStatePerHash perhash = &aggstate->perhash[setno]; HashAggSpill *spill = &aggstate->hash_spills[setno]; - hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0, + hashagg_spill_init(spill, aggstate->hash_tapeset, 0, perhash->aggnode->numGroups, aggstate->hashentrysize); } @@ -1943,9 +1924,9 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions) aggstate->hash_mem_peak = total_mem; /* update disk usage */ - if (aggstate->hash_tapeinfo != NULL) + if (aggstate->hash_tapeset != NULL) { - uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024); + uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024); if (aggstate->hash_disk_used < disk_used) aggstate->hash_disk_used = disk_used; @@ -2132,7 +2113,7 @@ lookup_hash_entries(AggState *aggstate) TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple; if (spill->partitions == NULL) - hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0, + hashagg_spill_init(spill, aggstate->hash_tapeset, 0, perhash->aggnode->numGroups, aggstate->hashentrysize); @@ -2597,7 +2578,7 @@ agg_refill_hash_table(AggState *aggstate) HashAggBatch *batch; AggStatePerHash perhash; HashAggSpill spill; - HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo; + LogicalTapeSet *tapeset = aggstate->hash_tapeset; bool spill_initialized = false; if (aggstate->hash_batches == NIL) @@ -2693,7 +2674,7 @@ agg_refill_hash_table(AggState *aggstate) * that we don't assign tapes that will never be used. */ spill_initialized = true; - hashagg_spill_init(&spill, tapeinfo, batch->used_bits, + hashagg_spill_init(&spill, tapeset, batch->used_bits, batch->input_card, aggstate->hashentrysize); } /* no memory for a new group, spill */ @@ -2709,7 +2690,7 @@ agg_refill_hash_table(AggState *aggstate) ResetExprContext(aggstate->tmpcontext); } - hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum); + LogicalTapeClose(batch->input_tape); /* change back to phase 0 */ aggstate->current_phase = 0; @@ -2885,74 +2866,13 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate) } /* - * Initialize HashTapeInfo - */ -static void -hashagg_tapeinfo_init(AggState *aggstate) -{ - HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo)); - int init_tapes = 16; /* expanded dynamically */ - - tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1); - tapeinfo->ntapes = init_tapes; - tapeinfo->nfreetapes = init_tapes; - tapeinfo->freetapes_alloc = init_tapes; - tapeinfo->freetapes = palloc(init_tapes * sizeof(int)); - for (int i = 0; i < init_tapes; i++) - tapeinfo->freetapes[i] = i; - - aggstate->hash_tapeinfo = tapeinfo; -} - -/* - * Assign unused tapes to spill partitions, extending the tape set if - * necessary. - */ -static void -hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions, - int npartitions) -{ - int partidx = 0; - - /* use free tapes if available */ - while (partidx < npartitions && tapeinfo->nfreetapes > 0) - partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes]; - - if (partidx < npartitions) - { - LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx); - - while (partidx < npartitions) - partitions[partidx++] = tapeinfo->ntapes++; - } -} - -/* - * After a tape has already been written to and then read, this function - * rewinds it for writing and adds it to the free list. - */ -static void -hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum) -{ - /* rewinding frees the buffer while not in use */ - LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum); - if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes) - { - tapeinfo->freetapes_alloc <<= 1; - tapeinfo->freetapes = repalloc(tapeinfo->freetapes, - tapeinfo->freetapes_alloc * sizeof(int)); - } - tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum; -} - -/* * hashagg_spill_init * * Called after we determined that spilling is necessary. Chooses the number * of partitions to create, and initializes them. */ static void -hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, +hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits, double input_groups, double hashentrysize) { int npartitions; @@ -2961,13 +2881,13 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, npartitions = hash_choose_num_partitions(input_groups, hashentrysize, used_bits, &partition_bits); - spill->partitions = palloc0(sizeof(int) * npartitions); + spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions); spill->ntuples = palloc0(sizeof(int64) * npartitions); spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions); - hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions); + for (int i = 0; i < npartitions; i++) + spill->partitions[i] = LogicalTapeCreate(tapeset); - spill->tapeset = tapeinfo->tapeset; spill->shift = 32 - used_bits - partition_bits; spill->mask = (npartitions - 1) << spill->shift; spill->npartitions = npartitions; @@ -2986,11 +2906,10 @@ static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, TupleTableSlot *inputslot, uint32 hash) { - LogicalTapeSet *tapeset = spill->tapeset; TupleTableSlot *spillslot; int partition; MinimalTuple tuple; - int tapenum; + LogicalTape *tape; int total_written = 0; bool shouldFree; @@ -3029,12 +2948,12 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, */ addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash)); - tapenum = spill->partitions[partition]; + tape = spill->partitions[partition]; - LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32)); + LogicalTapeWrite(tape, (void *) &hash, sizeof(uint32)); total_written += sizeof(uint32); - LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len); + LogicalTapeWrite(tape, (void *) tuple, tuple->t_len); total_written += tuple->t_len; if (shouldFree) @@ -3050,15 +2969,14 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, * be done. */ static HashAggBatch * -hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno, +hashagg_batch_new(LogicalTape *input_tape, int setno, int64 input_tuples, double input_card, int used_bits) { HashAggBatch *batch = palloc0(sizeof(HashAggBatch)); batch->setno = setno; batch->used_bits = used_bits; - batch->tapeset = tapeset; - batch->input_tapenum = tapenum; + batch->input_tape = input_tape; batch->input_tuples = input_tuples; batch->input_card = input_card; @@ -3072,42 +2990,41 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno, static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp) { - LogicalTapeSet *tapeset = batch->tapeset; - int tapenum = batch->input_tapenum; + LogicalTape *tape = batch->input_tape; MinimalTuple tuple; uint32 t_len; size_t nread; uint32 hash; - nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32)); + nread = LogicalTapeRead(tape, &hash, sizeof(uint32)); if (nread == 0) return NULL; if (nread != sizeof(uint32)) ereport(ERROR, (errcode_for_file_access(), - errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes", - tapenum, sizeof(uint32), nread))); + errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes", + tape, sizeof(uint32), nread))); if (hashp != NULL) *hashp = hash; - nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len)); + nread = LogicalTapeRead(tape, &t_len, sizeof(t_len)); if (nread != sizeof(uint32)) ereport(ERROR, (errcode_for_file_access(), - errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes", - tapenum, sizeof(uint32), nread))); + errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes", + tape, sizeof(uint32), nread))); tuple = (MinimalTuple) palloc(t_len); tuple->t_len = t_len; - nread = LogicalTapeRead(tapeset, tapenum, + nread = LogicalTapeRead(tape, (void *) ((char *) tuple + sizeof(uint32)), t_len - sizeof(uint32)); if (nread != t_len - sizeof(uint32)) ereport(ERROR, (errcode_for_file_access(), - errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes", - tapenum, t_len - sizeof(uint32), nread))); + errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes", + tape, t_len - sizeof(uint32), nread))); return tuple; } @@ -3164,8 +3081,7 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno) for (i = 0; i < spill->npartitions; i++) { - LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset; - int tapenum = spill->partitions[i]; + LogicalTape *tape = spill->partitions[i]; HashAggBatch *new_batch; double cardinality; @@ -3177,10 +3093,9 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno) freeHyperLogLog(&spill->hll_card[i]); /* rewinding frees the buffer while not in use */ - LogicalTapeRewindForRead(tapeset, tapenum, - HASHAGG_READ_BUFFER_SIZE); + LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE); - new_batch = hashagg_batch_new(tapeset, tapenum, setno, + new_batch = hashagg_batch_new(tape, setno, spill->ntuples[i], cardinality, used_bits); aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches); @@ -3227,14 +3142,10 @@ hashagg_reset_spill_state(AggState *aggstate) aggstate->hash_batches = NIL; /* close tape set */ - if (aggstate->hash_tapeinfo != NULL) + if (aggstate->hash_tapeset != NULL) { - HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo; - - LogicalTapeSetClose(tapeinfo->tapeset); - pfree(tapeinfo->freetapes); - pfree(tapeinfo); - aggstate->hash_tapeinfo = NULL; + LogicalTapeSetClose(aggstate->hash_tapeset); + aggstate->hash_tapeset = NULL; } } |