diff options
Diffstat (limited to 'src/backend/executor/nodeGather.c')
-rw-r--r-- | src/backend/executor/nodeGather.c | 54 |
1 files changed, 40 insertions, 14 deletions
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 9c1533e3113..5f589614dc2 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -41,6 +41,7 @@ static TupleTableSlot *gather_getnext(GatherState *gatherstate); +static void ExecShutdownGatherWorkers(GatherState *node); /* ---------------------------------------------------------------- @@ -150,9 +151,10 @@ ExecGather(GatherState *node) bool got_any_worker = false; /* Initialize the workers required to execute Gather node. */ - node->pei = ExecInitParallelPlan(node->ps.lefttree, - estate, - gather->num_workers); + if (!node->pei) + node->pei = ExecInitParallelPlan(node->ps.lefttree, + estate, + gather->num_workers); /* * Register backend workers. We might not get as many as we @@ -279,7 +281,7 @@ gather_getnext(GatherState *gatherstate) gatherstate->need_to_scan_locally, &done); if (done) - ExecShutdownGather(gatherstate); + ExecShutdownGatherWorkers(gatherstate); if (HeapTupleIsValid(tup)) { @@ -308,15 +310,15 @@ gather_getnext(GatherState *gatherstate) } /* ---------------------------------------------------------------- - * ExecShutdownGather + * ExecShutdownGatherWorkers * - * Destroy the setup for parallel workers. Collect all the - * stats after workers are stopped, else some work done by - * workers won't be accounted. + * Destroy the parallel workers. Collect all the stats after + * workers are stopped, else some work done by workers won't be + * accounted. * ---------------------------------------------------------------- */ void -ExecShutdownGather(GatherState *node) +ExecShutdownGatherWorkers(GatherState *node) { /* Shut down tuple queue funnel before shutting down workers. */ if (node->funnel != NULL) @@ -327,8 +329,25 @@ ExecShutdownGather(GatherState *node) /* Now shut down the workers. */ if (node->pei != NULL) - { ExecParallelFinish(node->pei); +} + +/* ---------------------------------------------------------------- + * ExecShutdownGather + * + * Destroy the setup for parallel workers including parallel context. + * Collect all the stats after workers are stopped, else some work + * done by workers won't be accounted. + * ---------------------------------------------------------------- + */ +void +ExecShutdownGather(GatherState *node) +{ + ExecShutdownGatherWorkers(node); + + /* Now destroy the parallel context. */ + if (node->pei != NULL) + { ExecParallelCleanup(node->pei); node->pei = NULL; } @@ -349,14 +368,21 @@ void ExecReScanGather(GatherState *node) { /* - * Re-initialize the parallel context and workers to perform rescan of - * relation. We want to gracefully shutdown all the workers so that they + * Re-initialize the parallel workers to perform rescan of relation. + * We want to gracefully shutdown all the workers so that they * should be able to propagate any error or other information to master - * backend before dying. + * backend before dying. Parallel context will be reused for rescan. */ - ExecShutdownGather(node); + ExecShutdownGatherWorkers(node); node->initialized = false; + if (node->pei) + { + ReinitializeParallelDSM(node->pei->pcxt); + node->pei->tqueue = + ExecParallelReinitializeTupleQueues(node->pei->pcxt); + } + ExecReScan(node->ps.lefttree); } |