aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2015-10-30 10:43:00 +0100
committerRobert Haas <rhaas@postgresql.org>2015-10-30 10:44:54 +0100
commit3a1f8611f2582df0a16bcd35caed2e1526387643 (patch)
treea246c057e24fd1a8870194a1d1ff3bf3e15ebb58 /src/backend/executor
parentc6baec92fc48387da8164d50f5699a7162267718 (diff)
downloadpostgresql-3a1f8611f2582df0a16bcd35caed2e1526387643.tar.gz
postgresql-3a1f8611f2582df0a16bcd35caed2e1526387643.zip
Update parallel executor support to reuse the same DSM.
Commit b0b0d84b3d663a148022e900ebfc164284a95f55 purported to make it possible to relaunch workers using the same parallel context, but it had an unpleasant race condition: we might reinitialize after the workers have sent their last control message but before they have dettached the DSM, leaving to crashes. Repair by introducing a new ParallelContext operation, ReinitializeParallelDSM. Adjust execParallel.c to use this new support, so that we can rescan a Gather node by relaunching workers but without needing to recreate the DSM. Amit Kapila, with some adjustments by me. Extracted from latest parallel sequential scan patch.
Diffstat (limited to 'src/backend/executor')
-rw-r--r--src/backend/executor/execParallel.c33
-rw-r--r--src/backend/executor/nodeGather.c54
2 files changed, 66 insertions, 21 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index efcbaef416c..99a9de3cdc3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -84,7 +84,8 @@ static bool ExecParallelEstimate(PlanState *node,
ExecParallelEstimateContext *e);
static bool ExecParallelInitializeDSM(PlanState *node,
ExecParallelInitializeDSMContext *d);
-static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt);
+static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
+ bool reinitialize);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation);
@@ -217,7 +218,7 @@ ExecParallelInitializeDSM(PlanState *planstate,
* to the main backend and start the workers.
*/
static shm_mq_handle **
-ExecParallelSetupTupleQueues(ParallelContext *pcxt)
+ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
{
shm_mq_handle **responseq;
char *tqueuespace;
@@ -231,9 +232,16 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
responseq = (shm_mq_handle **)
palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
- /* Allocate space from the DSM for the queues themselves. */
- tqueuespace = shm_toc_allocate(pcxt->toc,
- PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+ /*
+ * If not reinitializing, allocate space from the DSM for the queues;
+ * otherwise, find the already allocated space.
+ */
+ if (!reinitialize)
+ tqueuespace =
+ shm_toc_allocate(pcxt->toc,
+ PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+ else
+ tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
/* Create the queues, and become the receiver for each. */
for (i = 0; i < pcxt->nworkers; ++i)
@@ -248,13 +256,24 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
}
/* Add array of queues to shm_toc, so others can find it. */
- shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
+ if (!reinitialize)
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
/* Return array of handles. */
return responseq;
}
/*
+ * Re-initialize the response queues for backend workers to return tuples
+ * to the main backend and start the workers.
+ */
+shm_mq_handle **
+ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+{
+ return ExecParallelSetupTupleQueues(pcxt, true);
+}
+
+/*
* Sets up the required infrastructure for backend workers to perform
* execution and return results to the main backend.
*/
@@ -363,7 +382,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
pei->buffer_usage = bufusage_space;
/* Set up tuple queues. */
- pei->tqueue = ExecParallelSetupTupleQueues(pcxt);
+ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
/*
* If instrumentation options were supplied, allocate space for the
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 9c1533e3113..5f589614dc2 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -41,6 +41,7 @@
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+static void ExecShutdownGatherWorkers(GatherState *node);
/* ----------------------------------------------------------------
@@ -150,9 +151,10 @@ ExecGather(GatherState *node)
bool got_any_worker = false;
/* Initialize the workers required to execute Gather node. */
- node->pei = ExecInitParallelPlan(node->ps.lefttree,
- estate,
- gather->num_workers);
+ if (!node->pei)
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ gather->num_workers);
/*
* Register backend workers. We might not get as many as we
@@ -279,7 +281,7 @@ gather_getnext(GatherState *gatherstate)
gatherstate->need_to_scan_locally,
&done);
if (done)
- ExecShutdownGather(gatherstate);
+ ExecShutdownGatherWorkers(gatherstate);
if (HeapTupleIsValid(tup))
{
@@ -308,15 +310,15 @@ gather_getnext(GatherState *gatherstate)
}
/* ----------------------------------------------------------------
- * ExecShutdownGather
+ * ExecShutdownGatherWorkers
*
- * Destroy the setup for parallel workers. Collect all the
- * stats after workers are stopped, else some work done by
- * workers won't be accounted.
+ * Destroy the parallel workers. Collect all the stats after
+ * workers are stopped, else some work done by workers won't be
+ * accounted.
* ----------------------------------------------------------------
*/
void
-ExecShutdownGather(GatherState *node)
+ExecShutdownGatherWorkers(GatherState *node)
{
/* Shut down tuple queue funnel before shutting down workers. */
if (node->funnel != NULL)
@@ -327,8 +329,25 @@ ExecShutdownGather(GatherState *node)
/* Now shut down the workers. */
if (node->pei != NULL)
- {
ExecParallelFinish(node->pei);
+}
+
+/* ----------------------------------------------------------------
+ * ExecShutdownGather
+ *
+ * Destroy the setup for parallel workers including parallel context.
+ * Collect all the stats after workers are stopped, else some work
+ * done by workers won't be accounted.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGather(GatherState *node)
+{
+ ExecShutdownGatherWorkers(node);
+
+ /* Now destroy the parallel context. */
+ if (node->pei != NULL)
+ {
ExecParallelCleanup(node->pei);
node->pei = NULL;
}
@@ -349,14 +368,21 @@ void
ExecReScanGather(GatherState *node)
{
/*
- * Re-initialize the parallel context and workers to perform rescan of
- * relation. We want to gracefully shutdown all the workers so that they
+ * Re-initialize the parallel workers to perform rescan of relation.
+ * We want to gracefully shutdown all the workers so that they
* should be able to propagate any error or other information to master
- * backend before dying.
+ * backend before dying. Parallel context will be reused for rescan.
*/
- ExecShutdownGather(node);
+ ExecShutdownGatherWorkers(node);
node->initialized = false;
+ if (node->pei)
+ {
+ ReinitializeParallelDSM(node->pei->pcxt);
+ node->pei->tqueue =
+ ExecParallelReinitializeTupleQueues(node->pei->pcxt);
+ }
+
ExecReScan(node->ps.lefttree);
}