aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execParallel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r--src/backend/executor/execParallel.c33
1 files changed, 26 insertions, 7 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index efcbaef416c..99a9de3cdc3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -84,7 +84,8 @@ static bool ExecParallelEstimate(PlanState *node,
ExecParallelEstimateContext *e);
static bool ExecParallelInitializeDSM(PlanState *node,
ExecParallelInitializeDSMContext *d);
-static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt);
+static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
+ bool reinitialize);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation);
@@ -217,7 +218,7 @@ ExecParallelInitializeDSM(PlanState *planstate,
* to the main backend and start the workers.
*/
static shm_mq_handle **
-ExecParallelSetupTupleQueues(ParallelContext *pcxt)
+ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
{
shm_mq_handle **responseq;
char *tqueuespace;
@@ -231,9 +232,16 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
responseq = (shm_mq_handle **)
palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
- /* Allocate space from the DSM for the queues themselves. */
- tqueuespace = shm_toc_allocate(pcxt->toc,
- PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+ /*
+ * If not reinitializing, allocate space from the DSM for the queues;
+ * otherwise, find the already allocated space.
+ */
+ if (!reinitialize)
+ tqueuespace =
+ shm_toc_allocate(pcxt->toc,
+ PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+ else
+ tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
/* Create the queues, and become the receiver for each. */
for (i = 0; i < pcxt->nworkers; ++i)
@@ -248,13 +256,24 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
}
/* Add array of queues to shm_toc, so others can find it. */
- shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
+ if (!reinitialize)
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
/* Return array of handles. */
return responseq;
}
/*
+ * Re-initialize the response queues for backend workers to return tuples
+ * to the main backend and start the workers.
+ */
+shm_mq_handle **
+ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+{
+ return ExecParallelSetupTupleQueues(pcxt, true);
+}
+
+/*
* Sets up the required infrastructure for backend workers to perform
* execution and return results to the main backend.
*/
@@ -363,7 +382,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
pei->buffer_usage = bufusage_space;
/* Set up tuple queues. */
- pei->tqueue = ExecParallelSetupTupleQueues(pcxt);
+ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
/*
* If instrumentation options were supplied, allocate space for the