Diffstat (limited to 'src/backend/executor')

 src/backend/executor/execParallel.c | 33
 src/backend/executor/nodeGather.c   | 54
 2 files changed, 66 insertions(+), 21 deletions(-)
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index efcbaef416c..99a9de3cdc3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -84,7 +84,8 @@ static bool ExecParallelEstimate(PlanState *node,
ExecParallelEstimateContext *e);
static bool ExecParallelInitializeDSM(PlanState *node,
ExecParallelInitializeDSMContext *d);
-static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt);
+static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
+ bool reinitialize);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation);
@@ -217,7 +218,7 @@ ExecParallelInitializeDSM(PlanState *planstate,
* to the main backend and start the workers.
*/
static shm_mq_handle **
-ExecParallelSetupTupleQueues(ParallelContext *pcxt)
+ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
{
shm_mq_handle **responseq;
char *tqueuespace;
@@ -231,9 +232,16 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
responseq = (shm_mq_handle **)
palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
- /* Allocate space from the DSM for the queues themselves. */
- tqueuespace = shm_toc_allocate(pcxt->toc,
- PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+ /*
+ * If not reinitializing, allocate space from the DSM for the queues;
+ * otherwise, find the already allocated space.
+ */
+ if (!reinitialize)
+ tqueuespace =
+ shm_toc_allocate(pcxt->toc,
+ PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+ else
+ tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
/* Create the queues, and become the receiver for each. */
for (i = 0; i < pcxt->nworkers; ++i)
@@ -248,13 +256,24 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
}
/* Add array of queues to shm_toc, so others can find it. */
- shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
+ if (!reinitialize)
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
/* Return array of handles. */
return responseq;
}
/*
+ * Re-initialize the response queues that backend workers use to return
+ * tuples to the main backend.
+ */
+shm_mq_handle **
+ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+{
+ return ExecParallelSetupTupleQueues(pcxt, true);
+}
+
+/*
* Sets up the required infrastructure for backend workers to perform
* execution and return results to the main backend.
*/
@@ -363,7 +382,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
pei->buffer_usage = bufusage_space;
/* Set up tuple queues. */
- pei->tqueue = ExecParallelSetupTupleQueues(pcxt);
+ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
/*
* If instrumentation options were supplied, allocate space for the
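
The core of the execParallel.c change is the allocate-or-look-up split above: on first initialization the queue space is carved out of the DSM segment and registered in the table of contents under PARALLEL_KEY_TUPLE_QUEUE; on reinitialization the already-registered space is looked up and the queues are rebuilt in place. Below is a minimal standalone C sketch of that pattern; the toy_toc type and the setup_queues function are illustrative stand-ins for PostgreSQL's shm_toc API, not code from the tree.

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define TOC_MAX          8
#define KEY_TUPLE_QUEUE  1
#define QUEUE_SIZE       64
#define NWORKERS         3

/* Toy stand-in for shm_toc: maps integer keys to allocations. */
typedef struct { int key; void *ptr; } toc_entry;
typedef struct { toc_entry entries[TOC_MAX]; int n; } toy_toc;

static void *toy_toc_allocate(toy_toc *toc, size_t size)
{
    (void) toc;                 /* real code carves this out of the DSM */
    return calloc(1, size);
}

static void toy_toc_insert(toy_toc *toc, int key, void *ptr)
{
    toc->entries[toc->n].key = key;
    toc->entries[toc->n].ptr = ptr;
    toc->n++;
}

static void *toy_toc_lookup(toy_toc *toc, int key)
{
    for (int i = 0; i < toc->n; i++)
        if (toc->entries[i].key == key)
            return toc->entries[i].ptr;
    return NULL;
}

/* Mirrors ExecParallelSetupTupleQueues: allocate on the first call,
 * look the same space up again on reinitialization. */
static char *setup_queues(toy_toc *toc, bool reinitialize)
{
    char *space;

    if (!reinitialize)
        space = toy_toc_allocate(toc, QUEUE_SIZE * NWORKERS);
    else
        space = toy_toc_lookup(toc, KEY_TUPLE_QUEUE);

    /* (Re)create one queue per worker inside the shared space. */
    memset(space, 0, QUEUE_SIZE * NWORKERS);

    /* Register the space only once, so others can find it later. */
    if (!reinitialize)
        toy_toc_insert(toc, KEY_TUPLE_QUEUE, space);

    return space;
}

int main(void)
{
    toy_toc toc = {0};
    char *first = setup_queues(&toc, false);    /* initial setup */
    char *again = setup_queues(&toc, true);     /* rescan path */

    printf("same space reused: %s\n", first == again ? "yes" : "no");
    return 0;
}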
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 9c1533e3113..5f589614dc2 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -41,6 +41,7 @@
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+static void ExecShutdownGatherWorkers(GatherState *node);
/* ----------------------------------------------------------------
@@ -150,9 +151,10 @@ ExecGather(GatherState *node)
bool got_any_worker = false;
/* Initialize the workers required to execute the Gather node. */
- node->pei = ExecInitParallelPlan(node->ps.lefttree,
- estate,
- gather->num_workers);
+ if (!node->pei)
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ gather->num_workers);
/*
* Register backend workers. We might not get as many as we
@@ -279,7 +281,7 @@ gather_getnext(GatherState *gatherstate)
gatherstate->need_to_scan_locally,
&done);
if (done)
- ExecShutdownGather(gatherstate);
+ ExecShutdownGatherWorkers(gatherstate);
if (HeapTupleIsValid(tup))
{
@@ -308,15 +310,15 @@ gather_getnext(GatherState *gatherstate)
}
/* ----------------------------------------------------------------
- * ExecShutdownGather
+ * ExecShutdownGatherWorkers
*
- * Destroy the setup for parallel workers. Collect all the
- * stats after workers are stopped, else some work done by
- * workers won't be accounted.
+ * Destroy the parallel workers. Collect all the stats after
+ * workers are stopped, else some work done by workers won't be
+ * accounted for.
* ----------------------------------------------------------------
*/
void
-ExecShutdownGather(GatherState *node)
+ExecShutdownGatherWorkers(GatherState *node)
{
/* Shut down tuple queue funnel before shutting down workers. */
if (node->funnel != NULL)
@@ -327,8 +329,25 @@ ExecShutdownGather(GatherState *node)
/* Now shut down the workers. */
if (node->pei != NULL)
- {
ExecParallelFinish(node->pei);
+}
+
+/* ----------------------------------------------------------------
+ * ExecShutdownGather
+ *
+ * Destroy the setup for parallel workers, including the parallel
+ * context itself. Collect all the stats after workers are stopped,
+ * else some work done by workers won't be accounted for.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGather(GatherState *node)
+{
+ ExecShutdownGatherWorkers(node);
+
+ /* Now destroy the parallel context. */
+ if (node->pei != NULL)
+ {
ExecParallelCleanup(node->pei);
node->pei = NULL;
}
@@ -349,14 +368,21 @@ void
ExecReScanGather(GatherState *node)
{
/*
- * Re-initialize the parallel context and workers to perform a rescan of
- * the relation. We want to shut down all the workers gracefully so that they
+ * Re-initialize the parallel workers to perform a rescan of the relation.
+ * We want to shut down all the workers gracefully so that they
* can propagate any error or other information to the master
- * backend before dying.
+ * backend before dying. The parallel context will be reused for the rescan.
*/
- ExecShutdownGather(node);
+ ExecShutdownGatherWorkers(node);
node->initialized = false;
+ if (node->pei)
+ {
+ ReinitializeParallelDSM(node->pei->pcxt);
+ node->pei->tqueue =
+ ExecParallelReinitializeTupleQueues(node->pei->pcxt);
+ }
+
ExecReScan(node->ps.lefttree);
}
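
Taken together, the nodeGather.c changes split teardown into two levels: ExecShutdownGatherWorkers stops the workers and collects their statistics, while ExecShutdownGather additionally destroys the parallel context. ExecReScanGather now calls only the former, then reinitializes the DSM and the tuple queues so the same context can launch a fresh batch of workers on the next ExecGather call. The self-contained sketch below mimics that lifecycle; the lower-case names and toy types (Context, ParallelExecInfo, Gather) are hypothetical stand-ins, with the corresponding PostgreSQL functions noted in comments.

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Toy stand-ins; in PostgreSQL these are ParallelContext,
 * ParallelExecutorInfo and GatherState. */
typedef struct { int generation; } Context;
typedef struct { Context *pcxt; bool workers_running; } ParallelExecInfo;
typedef struct { ParallelExecInfo *pei; bool initialized; } Gather;

static ParallelExecInfo *init_parallel_plan(void)   /* ExecInitParallelPlan */
{
    ParallelExecInfo *pei = malloc(sizeof *pei);
    pei->pcxt = malloc(sizeof *pei->pcxt);
    pei->pcxt->generation = 0;
    pei->workers_running = false;
    printf("parallel context created\n");
    return pei;
}

static void shutdown_workers(Gather *g)        /* ExecShutdownGatherWorkers */
{
    if (g->pei != NULL && g->pei->workers_running)
    {
        g->pei->workers_running = false;
        printf("workers stopped, stats collected\n");
    }
}

static void shutdown_gather(Gather *g)              /* ExecShutdownGather */
{
    shutdown_workers(g);
    if (g->pei != NULL)
    {
        free(g->pei->pcxt);                         /* ExecParallelCleanup */
        free(g->pei);
        g->pei = NULL;
        printf("parallel context destroyed\n");
    }
}

static void exec_gather(Gather *g)                  /* ExecGather */
{
    if (!g->initialized)
    {
        if (g->pei == NULL)                         /* first scan only */
            g->pei = init_parallel_plan();
        g->pei->workers_running = true;             /* LaunchParallelWorkers */
        g->initialized = true;
        printf("scan %d: workers launched\n", g->pei->pcxt->generation);
    }
}

static void rescan_gather(Gather *g)                /* ExecReScanGather */
{
    shutdown_workers(g);                            /* context is kept */
    g->initialized = false;
    if (g->pei != NULL)
        g->pei->pcxt->generation++;                 /* ReinitializeParallelDSM
                                                       plus queue re-setup */
}

int main(void)
{
    Gather g = { NULL, false };

    exec_gather(&g);     /* first scan: context created, workers launched */
    rescan_gather(&g);   /* workers go away, context survives */
    exec_gather(&g);     /* second scan reuses the same context */
    shutdown_gather(&g); /* final teardown destroys the context */
    return 0;
}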