diff options
Diffstat (limited to 'src/backend/executor/nodeGather.c')
-rw-r--r-- | src/backend/executor/nodeGather.c | 108 |
1 files changed, 52 insertions, 56 deletions
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index c689a4d17a0..7e2272f634b 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -16,6 +16,7 @@ #include "postgres.h" #include "access/relscan.h" +#include "access/xact.h" #include "executor/execdebug.h" #include "executor/execParallel.h" #include "executor/nodeGather.h" @@ -45,7 +46,6 @@ ExecInitGather(Gather *node, EState *estate, int eflags) gatherstate = makeNode(GatherState); gatherstate->ps.plan = (Plan *) node; gatherstate->ps.state = estate; - gatherstate->need_to_scan_workers = false; gatherstate->need_to_scan_locally = !node->single_copy; /* @@ -106,52 +106,57 @@ ExecGather(GatherState *node) * needs to allocate large dynamic segement, so it is better to do if it * is really needed. */ - if (!node->pei) + if (!node->initialized) { EState *estate = node->ps.state; - - /* Initialize the workers required to execute Gather node. */ - node->pei = ExecInitParallelPlan(node->ps.lefttree, - estate, - ((Gather *) (node->ps.plan))->num_workers); + Gather *gather = (Gather *) node->ps.plan; /* - * Register backend workers. If the required number of workers are not - * available then we perform the scan with available workers and if - * there are no more workers available, then the Gather node will just - * scan locally. + * Sometimes we might have to run without parallelism; but if + * parallel mode is active then we can try to fire up some workers. */ - LaunchParallelWorkers(node->pei->pcxt); - - node->funnel = CreateTupleQueueFunnel(); - - for (i = 0; i < node->pei->pcxt->nworkers; ++i) + if (gather->num_workers > 0 && IsInParallelMode()) { - if (node->pei->pcxt->worker[i].bgwhandle) + bool got_any_worker = false; + + /* Initialize the workers required to execute Gather node. */ + node->pei = ExecInitParallelPlan(node->ps.lefttree, + estate, + gather->num_workers); + + /* + * Register backend workers. We might not get as many as we + * requested, or indeed any at all. + */ + LaunchParallelWorkers(node->pei->pcxt); + + /* Set up a tuple queue to collect the results. */ + node->funnel = CreateTupleQueueFunnel(); + for (i = 0; i < node->pei->pcxt->nworkers; ++i) { - shm_mq_set_handle(node->pei->tqueue[i], - node->pei->pcxt->worker[i].bgwhandle); - RegisterTupleQueueOnFunnel(node->funnel, node->pei->tqueue[i]); - node->need_to_scan_workers = true; + if (node->pei->pcxt->worker[i].bgwhandle) + { + shm_mq_set_handle(node->pei->tqueue[i], + node->pei->pcxt->worker[i].bgwhandle); + RegisterTupleQueueOnFunnel(node->funnel, + node->pei->tqueue[i]); + got_any_worker = true; + } } + + /* No workers? Then never mind. */ + if (!got_any_worker) + ExecShutdownGather(node); } - /* If no workers are available, we must always scan locally. */ - if (!node->need_to_scan_workers) - node->need_to_scan_locally = true; + /* Run plan locally if no workers or not single-copy. */ + node->need_to_scan_locally = (node->funnel == NULL) + || !gather->single_copy; + node->initialized = true; } slot = gather_getnext(node); - if (TupIsNull(slot)) - { - /* - * Destroy the parallel context once we complete fetching all the - * tuples. Otherwise, the DSM and workers will stick around for the - * lifetime of the entire statement. - */ - ExecShutdownGather(node); - } return slot; } @@ -194,10 +199,9 @@ gather_getnext(GatherState *gatherstate) */ slot = gatherstate->ps.ps_ProjInfo->pi_slot; - while (gatherstate->need_to_scan_workers || - gatherstate->need_to_scan_locally) + while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally) { - if (gatherstate->need_to_scan_workers) + if (gatherstate->funnel != NULL) { bool done = false; @@ -206,7 +210,7 @@ gather_getnext(GatherState *gatherstate) gatherstate->need_to_scan_locally, &done); if (done) - gatherstate->need_to_scan_workers = false; + ExecShutdownGather(gatherstate); if (HeapTupleIsValid(tup)) { @@ -247,30 +251,20 @@ gather_getnext(GatherState *gatherstate) void ExecShutdownGather(GatherState *node) { - Gather *gather; - - if (node->pei == NULL || node->pei->pcxt == NULL) - return; - - /* - * Ensure all workers have finished before destroying the parallel context - * to ensure a clean exit. - */ - if (node->funnel) + /* Shut down tuple queue funnel before shutting down workers. */ + if (node->funnel != NULL) { DestroyTupleQueueFunnel(node->funnel); node->funnel = NULL; } - ExecParallelFinish(node->pei); - - /* destroy parallel context. */ - DestroyParallelContext(node->pei->pcxt); - node->pei->pcxt = NULL; - - gather = (Gather *) node->ps.plan; - node->need_to_scan_locally = !gather->single_copy; - node->need_to_scan_workers = false; + /* Now shut down the workers. */ + if (node->pei != NULL) + { + ExecParallelFinish(node->pei); + ExecParallelCleanup(node->pei); + node->pei = NULL; + } } /* ---------------------------------------------------------------- @@ -295,5 +289,7 @@ ExecReScanGather(GatherState *node) */ ExecShutdownGather(node); + node->initialized = false; + ExecReScan(node->ps.lefttree); } |