Diffstat (limited to 'src/backend/executor/nodeGather.c')
-rw-r--r--  src/backend/executor/nodeGather.c  |  54
1 file changed, 40 insertions(+), 14 deletions(-)
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 9c1533e3113..5f589614dc2 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -41,6 +41,7 @@
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+static void ExecShutdownGatherWorkers(GatherState *node);
/* ----------------------------------------------------------------
@@ -150,9 +151,10 @@ ExecGather(GatherState *node)
bool got_any_worker = false;
/* Initialize the workers required to execute Gather node. */
- node->pei = ExecInitParallelPlan(node->ps.lefttree,
- estate,
- gather->num_workers);
+ if (!node->pei)
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ gather->num_workers);
/*
* Register backend workers. We might not get as many as we
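
The new guard above means ExecInitParallelPlan runs only the first time through ExecGather; on a rescan the existing ParallelExecutorInfo in node->pei is reused rather than rebuilt. A minimal sketch of that lazy-initialization pattern, using made-up DemoParallelInfo/demo_init_once names rather than the executor's real API:

#include <stdio.h>
#include <stdlib.h>

/* Illustrative only: a stand-in for the executor's per-node parallel state. */
typedef struct DemoParallelInfo
{
    int     nworkers;
} DemoParallelInfo;

/* Build the parallel state on the first call; later calls (rescans) reuse it. */
DemoParallelInfo *
demo_init_once(DemoParallelInfo **cached, int nworkers)
{
    if (*cached == NULL)
    {
        *cached = malloc(sizeof(DemoParallelInfo));
        (*cached)->nworkers = nworkers;
    }
    return *cached;
}

int
main(void)
{
    DemoParallelInfo *pei = NULL;

    demo_init_once(&pei, 4);    /* first scan: allocates                */
    demo_init_once(&pei, 4);    /* rescan: reuses the same object       */
    printf("workers: %d\n", pei->nworkers);
    free(pei);
    return 0;
}
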
@@ -279,7 +281,7 @@ gather_getnext(GatherState *gatherstate)
gatherstate->need_to_scan_locally,
&done);
if (done)
- ExecShutdownGather(gatherstate);
+ ExecShutdownGatherWorkers(gatherstate);
if (HeapTupleIsValid(tup))
{
@@ -308,15 +310,15 @@ gather_getnext(GatherState *gatherstate)
}
/* ----------------------------------------------------------------
- * ExecShutdownGather
+ * ExecShutdownGatherWorkers
*
- * Destroy the setup for parallel workers. Collect all the
- * stats after workers are stopped, else some work done by
- * workers won't be accounted.
+ * Destroy the parallel workers. Collect all the stats after
+ * workers are stopped, else some work done by workers won't be
+ * accounted for.
* ----------------------------------------------------------------
*/
void
-ExecShutdownGather(GatherState *node)
+ExecShutdownGatherWorkers(GatherState *node)
{
/* Shut down tuple queue funnel before shutting down workers. */
if (node->funnel != NULL)
@@ -327,8 +329,25 @@ ExecShutdownGather(GatherState *node)
/* Now shut down the workers. */
if (node->pei != NULL)
- {
ExecParallelFinish(node->pei);
+}
+
+/* ----------------------------------------------------------------
+ * ExecShutdownGather
+ *
+ * Destroy the setup for parallel workers, including the parallel
+ * context. Collect all the stats after workers are stopped, else
+ * some work done by workers won't be accounted for.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGather(GatherState *node)
+{
+ ExecShutdownGatherWorkers(node);
+
+ /* Now destroy the parallel context. */
+ if (node->pei != NULL)
+ {
ExecParallelCleanup(node->pei);
node->pei = NULL;
}
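
Taken together, the shutdown path is now layered: ExecShutdownGatherWorkers stops the tuple queue funnel and then the workers (so their stats are collected), while ExecShutdownGather additionally destroys the parallel context, and only the latter throws away state needed for a rescan. A rough, self-contained sketch of that layering; the DemoGatherState type and demo_* functions are placeholders, not the executor API:

#include <stdlib.h>

/* Illustrative stand-ins; not the executor's real types or calls. */
typedef struct DemoParallelCtx  { int workers_running; } DemoParallelCtx;
typedef struct DemoGatherState
{
    char            *funnel;    /* reader side of the tuple queues */
    DemoParallelCtx *pei;       /* shared parallel-execution state */
} DemoGatherState;

/* Layer 1: stop the funnel and the workers; shared state stays usable. */
void
demo_shutdown_workers(DemoGatherState *node)
{
    if (node->funnel != NULL)               /* close the reader side first  */
    {
        free(node->funnel);
        node->funnel = NULL;
    }
    if (node->pei != NULL)
        node->pei->workers_running = 0;     /* wait for workers, keep stats */
}

/* Layer 2: full teardown; only this releases the shared state. */
void
demo_shutdown_gather(DemoGatherState *node)
{
    demo_shutdown_workers(node);
    if (node->pei != NULL)
    {
        free(node->pei);
        node->pei = NULL;
    }
}

int
main(void)
{
    DemoGatherState node = { malloc(8), malloc(sizeof(DemoParallelCtx)) };

    demo_shutdown_workers(&node);   /* rescan-safe: node.pei survives */
    demo_shutdown_gather(&node);    /* final cleanup: node.pei freed  */
    return 0;
}

gather_getnext and ExecReScanGather call only the worker-level shutdown, which is what keeps the parallel context around for the next scan.
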
@@ -349,14 +368,21 @@ void
ExecReScanGather(GatherState *node)
{
/*
- * Re-initialize the parallel context and workers to perform rescan of
- * relation. We want to gracefully shutdown all the workers so that they
+ * Re-initialize the parallel workers to perform a rescan of the
+ * relation. We want to gracefully shut down all the workers so that they
* should be able to propagate any error or other information to master
- * backend before dying.
+ * backend before dying. The parallel context will be reused for the rescan.
*/
- ExecShutdownGather(node);
+ ExecShutdownGatherWorkers(node);
node->initialized = false;
+ if (node->pei)
+ {
+ ReinitializeParallelDSM(node->pei->pcxt);
+ node->pei->tqueue =
+ ExecParallelReinitializeTupleQueues(node->pei->pcxt);
+ }
+
ExecReScan(node->ps.lefttree);
}
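
Putting the rescan path together: the workers are shut down gracefully, the node is marked uninitialized, the surviving parallel context is reset and its tuple queues are re-created, and the child plan is rescanned; new workers are then launched on the next ExecGather call. A compact sketch of that control flow, with hypothetical demo_* helpers standing in for ReinitializeParallelDSM, ExecParallelReinitializeTupleQueues, and the other executor entry points:

#include <stdlib.h>

/* Illustrative types and helpers; placeholders for the real executor calls. */
typedef struct DemoRescanNode
{
    int   initialized;      /* have workers been launched for this scan? */
    void *pei;              /* reused shared parallel state, or NULL     */
} DemoRescanNode;

static void demo_shutdown_workers(DemoRescanNode *node) { (void) node; }
static void demo_reset_shared_state(void *pei) { (void) pei; }  /* cf. ReinitializeParallelDSM */
static void demo_reset_tuple_queues(void *pei) { (void) pei; }  /* cf. ExecParallelReinitializeTupleQueues */
static void demo_rescan_child(DemoRescanNode *node) { (void) node; }

/* Rescan: keep the shared state, reset it, defer relaunch to the next fetch. */
static void
demo_rescan(DemoRescanNode *node)
{
    demo_shutdown_workers(node);    /* let workers report errors/stats, then exit */
    node->initialized = 0;          /* the next fetch will launch a new batch     */

    if (node->pei != NULL)
    {
        demo_reset_shared_state(node->pei);   /* keep the DSM, reset per-scan state */
        demo_reset_tuple_queues(node->pei);   /* fresh tuple queues for new workers */
    }

    demo_rescan_child(node);        /* reposition the underlying plan             */
}

int
main(void)
{
    DemoRescanNode node = { 1, malloc(8) };

    demo_rescan(&node);
    free(node.pei);
    return 0;
}
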