aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/nodeGather.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/nodeGather.c')
-rw-r--r--src/backend/executor/nodeGather.c108
1 files changed, 52 insertions, 56 deletions
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index c689a4d17a0..7e2272f634b 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -16,6 +16,7 @@
#include "postgres.h"
#include "access/relscan.h"
+#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
@@ -45,7 +46,6 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
gatherstate = makeNode(GatherState);
gatherstate->ps.plan = (Plan *) node;
gatherstate->ps.state = estate;
- gatherstate->need_to_scan_workers = false;
gatherstate->need_to_scan_locally = !node->single_copy;
/*
@@ -106,52 +106,57 @@ ExecGather(GatherState *node)
* needs to allocate large dynamic segement, so it is better to do if it
* is really needed.
*/
- if (!node->pei)
+ if (!node->initialized)
{
EState *estate = node->ps.state;
-
- /* Initialize the workers required to execute Gather node. */
- node->pei = ExecInitParallelPlan(node->ps.lefttree,
- estate,
- ((Gather *) (node->ps.plan))->num_workers);
+ Gather *gather = (Gather *) node->ps.plan;
/*
- * Register backend workers. If the required number of workers are not
- * available then we perform the scan with available workers and if
- * there are no more workers available, then the Gather node will just
- * scan locally.
+ * Sometimes we might have to run without parallelism; but if
+ * parallel mode is active then we can try to fire up some workers.
*/
- LaunchParallelWorkers(node->pei->pcxt);
-
- node->funnel = CreateTupleQueueFunnel();
-
- for (i = 0; i < node->pei->pcxt->nworkers; ++i)
+ if (gather->num_workers > 0 && IsInParallelMode())
{
- if (node->pei->pcxt->worker[i].bgwhandle)
+ bool got_any_worker = false;
+
+ /* Initialize the workers required to execute Gather node. */
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ gather->num_workers);
+
+ /*
+ * Register backend workers. We might not get as many as we
+ * requested, or indeed any at all.
+ */
+ LaunchParallelWorkers(node->pei->pcxt);
+
+ /* Set up a tuple queue to collect the results. */
+ node->funnel = CreateTupleQueueFunnel();
+ for (i = 0; i < node->pei->pcxt->nworkers; ++i)
{
- shm_mq_set_handle(node->pei->tqueue[i],
- node->pei->pcxt->worker[i].bgwhandle);
- RegisterTupleQueueOnFunnel(node->funnel, node->pei->tqueue[i]);
- node->need_to_scan_workers = true;
+ if (node->pei->pcxt->worker[i].bgwhandle)
+ {
+ shm_mq_set_handle(node->pei->tqueue[i],
+ node->pei->pcxt->worker[i].bgwhandle);
+ RegisterTupleQueueOnFunnel(node->funnel,
+ node->pei->tqueue[i]);
+ got_any_worker = true;
+ }
}
+
+ /* No workers? Then never mind. */
+ if (!got_any_worker)
+ ExecShutdownGather(node);
}
- /* If no workers are available, we must always scan locally. */
- if (!node->need_to_scan_workers)
- node->need_to_scan_locally = true;
+ /* Run plan locally if no workers or not single-copy. */
+ node->need_to_scan_locally = (node->funnel == NULL)
+ || !gather->single_copy;
+ node->initialized = true;
}
slot = gather_getnext(node);
- if (TupIsNull(slot))
- {
- /*
- * Destroy the parallel context once we complete fetching all the
- * tuples. Otherwise, the DSM and workers will stick around for the
- * lifetime of the entire statement.
- */
- ExecShutdownGather(node);
- }
return slot;
}
@@ -194,10 +199,9 @@ gather_getnext(GatherState *gatherstate)
*/
slot = gatherstate->ps.ps_ProjInfo->pi_slot;
- while (gatherstate->need_to_scan_workers ||
- gatherstate->need_to_scan_locally)
+ while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
{
- if (gatherstate->need_to_scan_workers)
+ if (gatherstate->funnel != NULL)
{
bool done = false;
@@ -206,7 +210,7 @@ gather_getnext(GatherState *gatherstate)
gatherstate->need_to_scan_locally,
&done);
if (done)
- gatherstate->need_to_scan_workers = false;
+ ExecShutdownGather(gatherstate);
if (HeapTupleIsValid(tup))
{
@@ -247,30 +251,20 @@ gather_getnext(GatherState *gatherstate)
void
ExecShutdownGather(GatherState *node)
{
- Gather *gather;
-
- if (node->pei == NULL || node->pei->pcxt == NULL)
- return;
-
- /*
- * Ensure all workers have finished before destroying the parallel context
- * to ensure a clean exit.
- */
- if (node->funnel)
+ /* Shut down tuple queue funnel before shutting down workers. */
+ if (node->funnel != NULL)
{
DestroyTupleQueueFunnel(node->funnel);
node->funnel = NULL;
}
- ExecParallelFinish(node->pei);
-
- /* destroy parallel context. */
- DestroyParallelContext(node->pei->pcxt);
- node->pei->pcxt = NULL;
-
- gather = (Gather *) node->ps.plan;
- node->need_to_scan_locally = !gather->single_copy;
- node->need_to_scan_workers = false;
+ /* Now shut down the workers. */
+ if (node->pei != NULL)
+ {
+ ExecParallelFinish(node->pei);
+ ExecParallelCleanup(node->pei);
+ node->pei = NULL;
+ }
}
/* ----------------------------------------------------------------
@@ -295,5 +289,7 @@ ExecReScanGather(GatherState *node)
*/
ExecShutdownGather(node);
+ node->initialized = false;
+
ExecReScan(node->ps.lefttree);
}