path: root/src/backend/executor/nodeGather.c
author     Robert Haas <rhaas@postgresql.org>    2015-10-16 11:56:02 -0400
committer  Robert Haas <rhaas@postgresql.org>    2015-10-16 11:56:02 -0400
commit     bfc78d7196eb28cd4e3d6c24f7e607bacecf1129 (patch)
tree       e13e9937fcfc2666def5dfad1aa8cdd6038fdd78 /src/backend/executor/nodeGather.c
parent     816e336f12ecabdc834d4cc31bcf966b2dd323dc (diff)
Rewrite interaction of parallel mode with parallel executor support.
In the previous coding, before returning from ExecutorRun, we'd shut down all parallel workers. This was dead wrong if ExecutorRun was called with a non-zero tuple count; it had the effect of truncating the query output. To fix, give ExecutePlan control over whether to enter parallel mode, and have it refuse to do so if the tuple count is non-zero. Rewrite the Gather logic so that it can cope with being called outside parallel mode.

Commit 7aea8e4f2daa4b39ca9d1309a0c4aadb0f7ed81b is largely to blame for this problem, though this patch modifies some subsequently-committed code which relied on the guarantees it purported to make.
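The ExecutorRun/ExecutePlan part of the fix described above lives in execMain.c and is not part of the diff below, which is limited to nodeGather.c. The idea is roughly the sketch that follows; the parameter list and names here are illustrative, not necessarily the exact code committed:

    /*
     * Sketch only: ExecutePlan now receives the planner's parallel-mode
     * decision and overrides it when the caller asked for a bounded number
     * of tuples, since stopping early would otherwise shut down the
     * workers and truncate the query output.
     */
    static void
    ExecutePlan(EState *estate, PlanState *planstate, bool use_parallel_mode,
                CmdType operation, bool sendTuples, long numberTuples,
                ScanDirection direction, DestReceiver *dest)
    {
        /* A non-zero tuple count means we may stop early: stay serial. */
        if (numberTuples != 0)
            use_parallel_mode = false;

        if (use_parallel_mode)
            EnterParallelMode();

        /* ... usual per-tuple execution loop ... */

        if (use_parallel_mode)
            ExitParallelMode();
    }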
Diffstat (limited to 'src/backend/executor/nodeGather.c')
-rw-r--r--    src/backend/executor/nodeGather.c    108
1 file changed, 52 insertions, 56 deletions
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index c689a4d17a0..7e2272f634b 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -16,6 +16,7 @@
#include "postgres.h"
#include "access/relscan.h"
+#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
@@ -45,7 +46,6 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
gatherstate = makeNode(GatherState);
gatherstate->ps.plan = (Plan *) node;
gatherstate->ps.state = estate;
- gatherstate->need_to_scan_workers = false;
gatherstate->need_to_scan_locally = !node->single_copy;
/*
@@ -106,52 +106,57 @@ ExecGather(GatherState *node)
* needs to allocate a large dynamic segment, so it is better to do it
* only if it is really needed.
*/
- if (!node->pei)
+ if (!node->initialized)
{
EState *estate = node->ps.state;
-
- /* Initialize the workers required to execute Gather node. */
- node->pei = ExecInitParallelPlan(node->ps.lefttree,
- estate,
- ((Gather *) (node->ps.plan))->num_workers);
+ Gather *gather = (Gather *) node->ps.plan;
/*
- * Register backend workers. If the required number of workers are not
- * available then we perform the scan with available workers and if
- * there are no more workers available, then the Gather node will just
- * scan locally.
+ * Sometimes we might have to run without parallelism; but if
+ * parallel mode is active then we can try to fire up some workers.
*/
- LaunchParallelWorkers(node->pei->pcxt);
-
- node->funnel = CreateTupleQueueFunnel();
-
- for (i = 0; i < node->pei->pcxt->nworkers; ++i)
+ if (gather->num_workers > 0 && IsInParallelMode())
{
- if (node->pei->pcxt->worker[i].bgwhandle)
+ bool got_any_worker = false;
+
+ /* Initialize the workers required to execute Gather node. */
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ gather->num_workers);
+
+ /*
+ * Register backend workers. We might not get as many as we
+ * requested, or indeed any at all.
+ */
+ LaunchParallelWorkers(node->pei->pcxt);
+
+ /* Set up a tuple queue to collect the results. */
+ node->funnel = CreateTupleQueueFunnel();
+ for (i = 0; i < node->pei->pcxt->nworkers; ++i)
{
- shm_mq_set_handle(node->pei->tqueue[i],
- node->pei->pcxt->worker[i].bgwhandle);
- RegisterTupleQueueOnFunnel(node->funnel, node->pei->tqueue[i]);
- node->need_to_scan_workers = true;
+ if (node->pei->pcxt->worker[i].bgwhandle)
+ {
+ shm_mq_set_handle(node->pei->tqueue[i],
+ node->pei->pcxt->worker[i].bgwhandle);
+ RegisterTupleQueueOnFunnel(node->funnel,
+ node->pei->tqueue[i]);
+ got_any_worker = true;
+ }
}
+
+ /* No workers? Then never mind. */
+ if (!got_any_worker)
+ ExecShutdownGather(node);
}
- /* If no workers are available, we must always scan locally. */
- if (!node->need_to_scan_workers)
- node->need_to_scan_locally = true;
+ /* Run plan locally if no workers or not single-copy. */
+ node->need_to_scan_locally = (node->funnel == NULL)
+ || !gather->single_copy;
+ node->initialized = true;
}
slot = gather_getnext(node);
- if (TupIsNull(slot))
- {
- /*
- * Destroy the parallel context once we complete fetching all the
- * tuples. Otherwise, the DSM and workers will stick around for the
- * lifetime of the entire statement.
- */
- ExecShutdownGather(node);
- }
return slot;
}
@@ -194,10 +199,9 @@ gather_getnext(GatherState *gatherstate)
*/
slot = gatherstate->ps.ps_ProjInfo->pi_slot;
- while (gatherstate->need_to_scan_workers ||
- gatherstate->need_to_scan_locally)
+ while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
{
- if (gatherstate->need_to_scan_workers)
+ if (gatherstate->funnel != NULL)
{
bool done = false;
@@ -206,7 +210,7 @@ gather_getnext(GatherState *gatherstate)
gatherstate->need_to_scan_locally,
&done);
if (done)
- gatherstate->need_to_scan_workers = false;
+ ExecShutdownGather(gatherstate);
if (HeapTupleIsValid(tup))
{
@@ -247,30 +251,20 @@ gather_getnext(GatherState *gatherstate)
void
ExecShutdownGather(GatherState *node)
{
- Gather *gather;
-
- if (node->pei == NULL || node->pei->pcxt == NULL)
- return;
-
- /*
- * Ensure all workers have finished before destroying the parallel context
- * to ensure a clean exit.
- */
- if (node->funnel)
+ /* Shut down tuple queue funnel before shutting down workers. */
+ if (node->funnel != NULL)
{
DestroyTupleQueueFunnel(node->funnel);
node->funnel = NULL;
}
- ExecParallelFinish(node->pei);
-
- /* destroy parallel context. */
- DestroyParallelContext(node->pei->pcxt);
- node->pei->pcxt = NULL;
-
- gather = (Gather *) node->ps.plan;
- node->need_to_scan_locally = !gather->single_copy;
- node->need_to_scan_workers = false;
+ /* Now shut down the workers. */
+ if (node->pei != NULL)
+ {
+ ExecParallelFinish(node->pei);
+ ExecParallelCleanup(node->pei);
+ node->pei = NULL;
+ }
}
/* ----------------------------------------------------------------
@@ -295,5 +289,7 @@ ExecReScanGather(GatherState *node)
*/
ExecShutdownGather(node);
+ node->initialized = false;
+
ExecReScan(node->ps.lefttree);
}