Diffstat (limited to 'src/backend/executor/nodeGatherMerge.c')
-rw-r--r--  src/backend/executor/nodeGatherMerge.c  | 159
1 file changed, 105 insertions(+), 54 deletions(-)
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 67da5ff71ff..b8bb4f8eb04 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -55,8 +55,10 @@ static int32 heap_compare_slots(Datum a, Datum b, void *arg);
static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
bool nowait, bool *done);
-static void gather_merge_init(GatherMergeState *gm_state);
static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
+static void gather_merge_setup(GatherMergeState *gm_state);
+static void gather_merge_init(GatherMergeState *gm_state);
+static void gather_merge_clear_tuples(GatherMergeState *gm_state);
static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
bool nowait);
static void load_tuple_array(GatherMergeState *gm_state, int reader);
@@ -149,14 +151,17 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
}
/*
- * store the tuple descriptor into gather merge state, so we can use it
- * later while initializing the gather merge slots.
+ * Store the tuple descriptor into gather merge state, so we can use it
+ * while initializing the gather merge slots.
*/
if (!ExecContextForcesOids(&gm_state->ps, &hasoid))
hasoid = false;
tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
gm_state->tupDesc = tupDesc;
+ /* Now allocate the workspace for gather merge */
+ gather_merge_setup(gm_state);
+
return gm_state;
}
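
The hunk above moves all of the node's workspace allocation into ExecInitGatherMerge(), leaving the per-scan path with nothing to allocate. A rough standalone sketch of that allocate-once, reset-per-scan split (hypothetical MergeWorkspace type and helpers, not PostgreSQL code):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical miniature of the pattern: sized once from an upper bound,
 * then reset to empty at the start of every (re)scan. */
typedef struct MergeWorkspace
{
    int     max_sources;        /* upper bound, known at plan time */
    int    *pending;            /* per-source count of buffered items */
} MergeWorkspace;

/* One-time allocation, analogous to gather_merge_setup(). */
static void
workspace_setup(MergeWorkspace *ws, int max_sources)
{
    ws->max_sources = max_sources;
    ws->pending = calloc(max_sources, sizeof(int));
}

/* Per-scan reset, analogous to gather_merge_init(): no allocation here. */
static void
workspace_reset(MergeWorkspace *ws)
{
    memset(ws->pending, 0, ws->max_sources * sizeof(int));
}

int
main(void)
{
    MergeWorkspace ws;

    workspace_setup(&ws, 4);        /* node-initialization time */
    for (int rescan = 0; rescan < 3; rescan++)
    {
        workspace_reset(&ws);       /* start of each scan */
        ws.pending[0] = rescan;     /* pretend work happened */
    }
    printf("scans done without reallocating\n");
    free(ws.pending);
    return 0;
}

Because the arrays are sized from the planner's worker count (an upper bound), rescans can simply reuse them, which is what lets the rescan hunk below get away with only clearing tuples.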
@@ -340,6 +345,9 @@ ExecReScanGatherMerge(GatherMergeState *node)
/* Make sure any existing workers are gracefully shut down */
ExecShutdownGatherMergeWorkers(node);
+ /* Free any unused tuples, so we don't leak memory across rescans */
+ gather_merge_clear_tuples(node);
+
/* Mark node so that shared state will be rebuilt at next call */
node->initialized = false;
node->gm_initialized = false;
@@ -370,49 +378,93 @@ ExecReScanGatherMerge(GatherMergeState *node)
}
/*
- * Initialize the Gather merge tuple read.
+ * Set up the data structures that we'll need for Gather Merge.
+ *
+ * We allocate these once on the basis of gm->num_workers, which is an
+ * upper bound for the number of workers we'll actually have. During
+ * a rescan, we reset the structures to empty. This approach makes it
+ * easy to avoid leaking memory across rescans.
*
- * Pull at least a single tuple from each worker + leader and set up the heap.
+ * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
+ * are for workers. The values placed into gm_heap correspond to indexes
+ * in gm_slots[]. The gm_tuple_buffers[] array, however, is indexed from
+ * 0 to n-1; it has no entry for the leader.
*/
static void
-gather_merge_init(GatherMergeState *gm_state)
+gather_merge_setup(GatherMergeState *gm_state)
{
- int nreaders = gm_state->nreaders;
- bool nowait = true;
+ GatherMerge *gm = castNode(GatherMerge, gm_state->ps.plan);
+ int nreaders = gm->num_workers;
int i;
/*
* Allocate gm_slots for the number of workers + one more slot for leader.
- * Last slot is always for leader. Leader always calls ExecProcNode() to
- * read the tuple which will return the TupleTableSlot. Later it will
- * directly get assigned to gm_slot. So just initialize leader gm_slot
- * with NULL. For other slots, code below will call
- * ExecInitExtraTupleSlot() to create a slot for the worker's results.
+ * Slot 0 is always for the leader. Leader always calls ExecProcNode() to
+ * read the tuple, and then stores it directly into its gm_slots entry.
+ * For other slots, code below will call ExecInitExtraTupleSlot() to
+ * create a slot for the worker's results. Note that during any single
+ * scan, we might have fewer than num_workers available workers, in which
+ * case the extra array entries go unused.
*/
- gm_state->gm_slots =
- palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
- gm_state->gm_slots[gm_state->nreaders] = NULL;
-
- /* Initialize the tuple slot and tuple array for each worker */
- gm_state->gm_tuple_buffers =
- (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
- gm_state->nreaders);
- for (i = 0; i < gm_state->nreaders; i++)
+ gm_state->gm_slots = (TupleTableSlot **)
+ palloc0((nreaders + 1) * sizeof(TupleTableSlot *));
+
+ /* Allocate the tuple slot and tuple array for each worker */
+ gm_state->gm_tuple_buffers = (GMReaderTupleBuffer *)
+ palloc0(nreaders * sizeof(GMReaderTupleBuffer));
+
+ for (i = 0; i < nreaders; i++)
{
/* Allocate the tuple array with length MAX_TUPLE_STORE */
gm_state->gm_tuple_buffers[i].tuple =
(HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
- /* Initialize slot for worker */
- gm_state->gm_slots[i] = ExecInitExtraTupleSlot(gm_state->ps.state);
- ExecSetSlotDescriptor(gm_state->gm_slots[i],
+ /* Initialize tuple slot for worker */
+ gm_state->gm_slots[i + 1] = ExecInitExtraTupleSlot(gm_state->ps.state);
+ ExecSetSlotDescriptor(gm_state->gm_slots[i + 1],
gm_state->tupDesc);
}
/* Allocate the resources for the merge */
- gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1,
+ gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
heap_compare_slots,
gm_state);
+}
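
A note on the indexing convention this new gather_merge_setup() establishes: gm_slots[] and the heap number sources 0..n, with 0 meaning the leader, while the worker-only arrays (gm_tuple_buffers[], and gm_state->reader[] later in the patch) drop the leader and run from 0 to n-1. A tiny standalone illustration of the two index spaces (worker_array_index() is hypothetical, not from the patch):

#include <assert.h>
#include <stdio.h>

#define NUM_WORKERS 3           /* stand-in for gm->num_workers */

/* Source index 0..n maps directly to slots/heap entries (0 is the leader);
 * sources 1..n map to worker-only arrays at [source - 1]. */
static int
worker_array_index(int source)
{
    assert(source >= 1 && source <= NUM_WORKERS);   /* leader has no buffer */
    return source - 1;
}

int
main(void)
{
    for (int source = 0; source <= NUM_WORKERS; source++)
    {
        if (source == 0)
            printf("source %d: leader, slot %d, no tuple buffer\n",
                   source, source);
        else
            printf("source %d: worker, slot %d, tuple buffer %d\n",
                   source, source, worker_array_index(source));
    }
    return 0;
}

This is why the rest of the patch consistently writes gm_slots[i + 1] but gm_tuple_buffers[i], and reader - 1 wherever a source index is used to reach a worker-only array.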
+
+/*
+ * Initialize the Gather Merge.
+ *
+ * Reset data structures to ensure they're empty. Then pull at least one
+ * tuple from leader + each worker (or set its "done" indicator), and set up
+ * the heap.
+ */
+static void
+gather_merge_init(GatherMergeState *gm_state)
+{
+ int nreaders = gm_state->nreaders;
+ bool nowait = true;
+ int i;
+
+ /* Assert that gather_merge_setup made enough space */
+ Assert(nreaders <= castNode(GatherMerge, gm_state->ps.plan)->num_workers);
+
+ /* Reset leader's tuple slot to empty */
+ gm_state->gm_slots[0] = NULL;
+
+ /* Reset the tuple slot and tuple array for each worker */
+ for (i = 0; i < nreaders; i++)
+ {
+ /* Reset tuple array to empty */
+ gm_state->gm_tuple_buffers[i].nTuples = 0;
+ gm_state->gm_tuple_buffers[i].readCounter = 0;
+ /* Reset done flag to not-done */
+ gm_state->gm_tuple_buffers[i].done = false;
+ /* Ensure output slot is empty */
+ ExecClearTuple(gm_state->gm_slots[i + 1]);
+ }
+
+ /* Reset binary heap to empty */
+ binaryheap_reset(gm_state->gm_heap);
/*
* First, try to read a tuple from each worker (including leader) in
@@ -422,14 +474,13 @@ gather_merge_init(GatherMergeState *gm_state)
* least one tuple) to the heap.
*/
reread:
- for (i = 0; i < nreaders + 1; i++)
+ for (i = 0; i <= nreaders; i++)
{
CHECK_FOR_INTERRUPTS();
- /* ignore this source if already known done */
- if ((i < nreaders) ?
- !gm_state->gm_tuple_buffers[i].done :
- gm_state->need_to_scan_locally)
+ /* skip this source if already known done */
+ if ((i == 0) ? gm_state->need_to_scan_locally :
+ !gm_state->gm_tuple_buffers[i - 1].done)
{
if (TupIsNull(gm_state->gm_slots[i]))
{
@@ -450,9 +501,9 @@ reread:
}
/* need not recheck leader, since nowait doesn't matter for it */
- for (i = 0; i < nreaders; i++)
+ for (i = 1; i <= nreaders; i++)
{
- if (!gm_state->gm_tuple_buffers[i].done &&
+ if (!gm_state->gm_tuple_buffers[i - 1].done &&
TupIsNull(gm_state->gm_slots[i]))
{
nowait = false;
@@ -467,23 +518,23 @@ reread:
}
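
The reread loop above primes the merge: it pulls at least one tuple from every still-live source and adds each such source to the heap that gather_merge_setup() allocated with binaryheap_allocate(). A self-contained sketch of that k-way merge shape, using plain ints and a hand-rolled min-heap instead of tuples and binaryheap.c (all names below are hypothetical):

#include <stdio.h>

#define NSOURCES 3

static const int source_data[NSOURCES][4] = {
    {1, 4, 7, 10},      /* "leader" */
    {2, 5, 8, 11},      /* worker 1 */
    {3, 6, 9, 12},      /* worker 2 */
};
static int cursor[NSOURCES];        /* next unread position per source */

static int heap[NSOURCES];          /* heap of source indexes */
static int heap_size;

static int current(int src)   { return source_data[src][cursor[src]]; }
static int exhausted(int src) { return cursor[src] >= 4; }

static void
heap_swap(int a, int b)
{
    int tmp = heap[a]; heap[a] = heap[b]; heap[b] = tmp;
}

/* Sift a newly added source up until the heap is ordered by current value. */
static void
heap_push(int src)
{
    int i = heap_size++;

    heap[i] = src;
    while (i > 0 && current(heap[i]) < current(heap[(i - 1) / 2]))
    {
        heap_swap(i, (i - 1) / 2);
        i = (i - 1) / 2;
    }
}

/* Restore heap order after the root's current value changed or was replaced. */
static void
heap_sift_down(void)
{
    int i = 0;

    for (;;)
    {
        int smallest = i, l = 2 * i + 1, r = 2 * i + 2;

        if (l < heap_size && current(heap[l]) < current(heap[smallest]))
            smallest = l;
        if (r < heap_size && current(heap[r]) < current(heap[smallest]))
            smallest = r;
        if (smallest == i)
            break;
        heap_swap(i, smallest);
        i = smallest;
    }
}

int
main(void)
{
    /* "Pull at least one tuple from each source and build the heap." */
    for (int src = 0; src < NSOURCES; src++)
        if (!exhausted(src))
            heap_push(src);

    /* Pop the minimum, emit it, advance that source, restore heap order. */
    while (heap_size > 0)
    {
        int src = heap[0];

        printf("%d ", current(src));
        cursor[src]++;
        if (exhausted(src))
            heap[0] = heap[--heap_size];    /* drop this source */
        heap_sift_down();
    }
    printf("\n");
    return 0;
}

Popping the smallest source, advancing it, and re-sifting is essentially what gather_merge_getnext() does with the real binary heap once the loop above has seeded it.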
/*
- * Clear out the tuple table slots for each gather merge input.
+ * Clear out the tuple table slot, and any unused pending tuples,
+ * for each gather merge input.
*/
static void
-gather_merge_clear_slots(GatherMergeState *gm_state)
+gather_merge_clear_tuples(GatherMergeState *gm_state)
{
int i;
for (i = 0; i < gm_state->nreaders; i++)
{
- pfree(gm_state->gm_tuple_buffers[i].tuple);
- ExecClearTuple(gm_state->gm_slots[i]);
- }
+ GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
- /* Free tuple array as we don't need it any more */
- pfree(gm_state->gm_tuple_buffers);
- /* Free the binaryheap, which was created for sort */
- binaryheap_free(gm_state->gm_heap);
+ while (tuple_buffer->readCounter < tuple_buffer->nTuples)
+ heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
+
+ ExecClearTuple(gm_state->gm_slots[i + 1]);
+ }
}
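
The rewritten function no longer frees the tuple arrays or the heap (those now live for the lifetime of the node); it only releases tuples that were read ahead but never consumed, then clears each worker's slot. A standalone miniature of that free-only-the-unread-tail idea (hypothetical ReadAheadBuffer, not the GMReaderTupleBuffer API):

#include <stdio.h>
#include <stdlib.h>

/* Items between readCounter and nItems were fetched but not yet consumed;
 * they must be freed explicitly when the scan is cut short, e.g. by a rescan
 * or an early stop higher in the plan. */
typedef struct ReadAheadBuffer
{
    char   *item[8];
    int     nItems;         /* number of items loaded into the array */
    int     readCounter;    /* number of items already consumed */
} ReadAheadBuffer;

static void
buffer_clear_pending(ReadAheadBuffer *buf)
{
    while (buf->readCounter < buf->nItems)
        free(buf->item[buf->readCounter++]);
}

int
main(void)
{
    ReadAheadBuffer buf = {{NULL}, 0, 0};

    /* Read ahead four items but consume only one of them. */
    for (int i = 0; i < 4; i++)
        buf.item[buf.nItems++] = malloc(16);
    free(buf.item[buf.readCounter++]);

    buffer_clear_pending(&buf);         /* frees the three unread items */
    printf("no pending items leaked\n");
    return 0;
}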
/*
@@ -526,7 +577,7 @@ gather_merge_getnext(GatherMergeState *gm_state)
if (binaryheap_empty(gm_state->gm_heap))
{
/* All the queues are exhausted, and so is the heap */
- gather_merge_clear_slots(gm_state);
+ gather_merge_clear_tuples(gm_state);
return NULL;
}
else
@@ -548,10 +599,10 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
int i;
/* Don't do anything if this is the leader. */
- if (reader == gm_state->nreaders)
+ if (reader == 0)
return;
- tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+ tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];
/* If there's nothing in the array, reset the counters to zero. */
if (tuple_buffer->nTuples == tuple_buffer->readCounter)
@@ -590,7 +641,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
* If we're being asked to generate a tuple from the leader, then we just
* call ExecProcNode as normal to produce one.
*/
- if (gm_state->nreaders == reader)
+ if (reader == 0)
{
if (gm_state->need_to_scan_locally)
{
@@ -601,7 +652,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
if (!TupIsNull(outerTupleSlot))
{
- gm_state->gm_slots[reader] = outerTupleSlot;
+ gm_state->gm_slots[0] = outerTupleSlot;
return true;
}
/* need_to_scan_locally serves as "done" flag for leader */
@@ -611,7 +662,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
}
/* Otherwise, check the state of the relevant tuple buffer. */
- tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+ tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];
if (tuple_buffer->nTuples > tuple_buffer->readCounter)
{
@@ -621,8 +672,8 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
else if (tuple_buffer->done)
{
/* Reader is known to be exhausted. */
- DestroyTupleQueueReader(gm_state->reader[reader]);
- gm_state->reader[reader] = NULL;
+ DestroyTupleQueueReader(gm_state->reader[reader - 1]);
+ gm_state->reader[reader - 1] = NULL;
return false;
}
else
@@ -649,14 +700,14 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
ExecStoreTuple(tup, /* tuple to store */
gm_state->gm_slots[reader], /* slot in which to store the
* tuple */
- InvalidBuffer, /* buffer associated with this tuple */
- true); /* pfree this pointer if not from heap */
+ InvalidBuffer, /* no buffer associated with tuple */
+ true); /* pfree tuple when done with it */
return true;
}
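
Summing up the dispatch in gather_merge_readnext(): source 0 is produced locally via ExecProcNode(), while every other source is served from its read-ahead array, which load_tuple_array() refills from the worker's tuple queue when it runs dry. A hypothetical standalone sketch of that shape (fake queues and buffers, not the tqueue.c API):

#include <stdbool.h>
#include <stdio.h>

#define NSOURCES 3
#define BATCH    2

static int next_local = 0;              /* leader's own counter */
static int queue_next[NSOURCES - 1];    /* per-worker fake queues */

typedef struct { int vals[BATCH]; int n, read; } Buffer;
static Buffer buffers[NSOURCES - 1];    /* workers only, like gm_tuple_buffers */

/* Pretend to read one value from a worker queue; stop after 6 values. */
static bool
queue_read(int worker, int *out)
{
    if (queue_next[worker] >= 6)
        return false;
    *out = 100 * (worker + 1) + queue_next[worker]++;
    return true;
}

static bool
read_next(int source, int *out)
{
    if (source == 0)                    /* leader: produce locally */
    {
        if (next_local >= 4)
            return false;
        *out = next_local++;
        return true;
    }

    Buffer *buf = &buffers[source - 1]; /* worker arrays are shifted by one */

    if (buf->read == buf->n)            /* buffer empty: refill a batch */
    {
        buf->n = buf->read = 0;
        while (buf->n < BATCH && queue_read(source - 1, &buf->vals[buf->n]))
            buf->n++;
        if (buf->n == 0)
            return false;               /* worker exhausted */
    }
    *out = buf->vals[buf->read++];
    return true;
}

int
main(void)
{
    int v;

    for (int source = 0; source < NSOURCES; source++)
        while (read_next(source, &v))
            printf("source %d -> %d\n", source, v);
    return 0;
}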
/*
- * Attempt to read a tuple from given reader.
+ * Attempt to read a tuple from given worker.
*/
static HeapTuple
gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
@@ -671,7 +722,7 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
CHECK_FOR_INTERRUPTS();
/* Attempt to read a tuple. */
- reader = gm_state->reader[nreader];
+ reader = gm_state->reader[nreader - 1];
/* Run TupleQueueReaders in per-tuple context */
tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
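
The section ends inside gm_readnext_tuple(), where the queue read runs in the expression context's per-tuple memory so that transient allocations are reclaimed when that context is reset between tuples. A rough standalone analogy for the pattern (a hypothetical bump arena, not PostgreSQL's MemoryContext API):

#include <stdio.h>
#include <stdlib.h>

/* Transient allocations made while processing one item go into a scratch
 * arena that is reset, not individually freed, before the next item. */
typedef struct Arena { char *base; size_t size, used; } Arena;

static void  arena_init(Arena *a, size_t size) { a->base = malloc(size); a->size = size; a->used = 0; }
static void  arena_reset(Arena *a)             { a->used = 0; }
static void *arena_alloc(Arena *a, size_t n)
{
    if (a->used + n > a->size)
        return NULL;                    /* keep the sketch simple: no growth */
    void *p = a->base + a->used;
    a->used += n;
    return p;
}

int
main(void)
{
    Arena scratch;

    arena_init(&scratch, 1024);
    for (int tuple = 0; tuple < 1000; tuple++)
    {
        arena_reset(&scratch);                  /* reset once per tuple */
        char *work = arena_alloc(&scratch, 64); /* transient workspace */
        work[0] = (char) tuple;                 /* pretend to process the tuple */
    }
    printf("scratch usage stayed bounded\n");
    free(scratch.base);
    return 0;
}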